先說個題外話,如何給hive表增加乙個列,並且該把該列的所有欄位設為』china』?
如果僅僅是增加一列倒是很簡單:
alter table test add columns(flag string)
可要把這個flag欄位全部設定為china,看起來的確是有點難度,因為往hive表中裝載資料的唯一途徑就是使用一種「大量」的資料裝載操作(如何往hive表載入資料請參考),這個時候,如果資料集中本來就沒有flag對應的資料,難道非要手動把china新增上去?這種情況,可以通過靜態分割槽就能夠解決:
load data local inpath '/data/test.txt' overwrite into table test partition (flag = 'china')
有人說,這不扯淡嗎?如果這個china欄位,並不是我們經常需要訪問的字段,何須作為分割槽欄位呢?的確是這樣的,這個時候還可以通過下面的方式來解決這個問題:
insert into table test1 select id, name,'china' as flag from test;
好了步入正題:如何向spark的dataframe增加一列資料
準備資料集:
張三,23
李四,24
王五,25
趙六,26
程式入口sparksession和載入資料**這裡不再描述:
val spark = sparksession
.builder()
.master(master = "local[*]")
.getorcreate()
import spark.implicits._
val df = spark.read.textfile("./data/clm")
.map(_.split(","))
.map(x => (x(0), x(1)))
.todf("name", "age")
.cache()
通過新增列或替換具有相同名稱的現有列來返回新的資料集新的列只能通過現有列轉換得到,這個就有點侷限,不過也能解決一部分問題:column的表示式只能引用此資料集提供的屬性。 新增引用其他資料集的列是錯誤的
比如,我想再增加一列為所有age增加1作為新的一列:
df.withcolumn("new_age", col = df("age") + 1).show()
結果:
+----+---+-------+
|name|age|new_age|
+----+---+-------+
|張三| 23| 24.0|
|李四| 24| 25.0|
|王五| 25| 26.0|
|趙六| 26| 27.0|
+----+---+-------+
那麼如果我想像前言中做那樣的操作怎麼辦?
lit函式的作用:creates a [[column]] of literal value. 建立[[column]]的字面量值
df.withcolumn("class",lit("一班")).show()
結果:
+----+---+-----+
|name|age|class|
+----+---+-----+
|張三| 23| 一班|
|李四| 24| 一班|
|王五| 25| 一班|
|趙六| 26| 一班|
+----+---+-----+
df.createtempview(viewname = "view1")
import spark.sql
sql(sqltext = "select name,age,'一班' as class from view1").show()
結果:
+----+---+-----+
|name|age|class|
+----+---+-----+
|張三| 23| 一班|
|李四| 24| 一班|
|王五| 25| 一班|
|趙六| 26| 一班|
+----+---+-----+
sql(sqltext = "select name,age,concat('','一班') as class from view1").show()
結果:
+----+---+-----+
|name|age|class|
+----+---+-----+
|張三| 23| 一班|
|李四| 24| 一班|
|王五| 25| 一班|
|趙六| 26| 一班|
+----+---+-----+
該函式官網的描述是:乙個列表示式,用於生成單調遞增的64位整數。但是請注意:這個自增列在分區內是連續的,但是分區間並不連續先來個簡單的使用案例:
import org.apache.spark.sql.functions._
df.withcolumn("id", monotonically_increasing_id()).show()
結果:
+----+---+---+
|name|age| id|
+----+---+---+
|張三| 23| 0|
|李四| 24| 1|
|王五| 25| 2|
|趙六| 26| 3|
+----+---+---+
但是,monotonically_increasing_id() 方法生成單調遞增僅僅是針對同乙個分割槽,儘管不同分割槽之間生成的id都是不同的,可不同分區間id不連續,也會造成使用上面的困難,下面進行詳細講解
df.repartition(2)
.withcolumn("id", monotonically_increasing_id())
.show()
結果:
+----+---+----------+
|name|age| id|
+----+---+----------+
|李四| 24| 0|
|趙六| 26| 1|
|張三| 23|8589934592|
|王五| 25|8589934593|
+----+---+----------+
顯然,可以看出李四和趙六為同一分割槽,張三和王五為另乙個分割槽,這兩個分區間id雖然不同,但是並不連續
val tmprdd: rdd[(row, long)] = df.rdd.repartition(2).zipwithindex()
val record: rdd[row] = tmprdd.map(x => )
val schema = new structtype().add("name", "string")
.add("age", "string")
.add("id", "long")
spark.createdataframe(record, schema).show()
結果:
+----+---+---+
|name|age| id|
+----+---+---+
|張三| 23| 0|
|王五| 25| 1|
|李四| 24| 2|
|趙六| 26| 3|
+----+---+---+
val w = window.orderby("age")
df.repartition(2).withcolumn("id", row_number().over(w)).show()
結果:
+----+---+---+
|name|age| id|
+----+---+---+
|張三| 23| 1|
|李四| 24| 2|
|王五| 25| 3|
|趙六| 26| 4|
+----+---+---+
df.repartition(1)
.withcolumn("id", monotonically_increasing_id())
.repartition(2)
.show()
結果:
+----+---+---+
|name|age| id|
+----+---+---+
|張三| 23| 0|
|李四| 24| 1|
|王五| 25| 2|
|趙六| 26| 3|
+----+---+---+
pandas的資料結構之DataFrame
dataframe是乙個 型的資料結構,它含有一組有序的列,每列可以是不同資料型別的資料。dataframe既有行索引也有列索引,可以將它看作為乙個由series組成的字典 共用同乙個索引 dataframe中的資料是以乙個或多個二維塊儲存的,而不是列表 字典或別的一維資料結構。a 通過字典建立,字...
pandas中的資料結構 DataFrame
型的資料結構 修改某一行 frame.values 0 d 2 frame name1 pay2 x d 2 y b 6000 z c 9000 修改某一行的值 frame.values 1 1 9000 frame name1 pay2 x d 2 y b 9000 z c 9000 獲取某行資料...
關於Spark和Spark的學習資料
hadoop社群依然發展迅速,2014年推出了2.3,2.4,2.5 的社群版本,比如增強 resource manager ha,yarn rest api,acl on hdfs,改進 hdfs 的 web ui hadoop roadmap 根據我的觀察,主要更新在yarn,hdfs,而map...