在工作中我們經常面對各種缺失值的處理,當使用pandas,缺失值可以使用fillna,指定method=ffill或bfill就能實現
缺失值的前向或後向填充。但是在spark應用中,需要稍微做一些改變。比如說我們先建立乙個dataframe:
df = spark.createdataframe([(
"a",1,
'2019-06-15 13:20'),
("a",2
,none),
("a",3
,none),
("a",4
,'2019-06-15 13:40'),
("a",5
,'2019-06-15 14:40'),
("a",6
,none),
("b",1
,'2019-06-15 13:42'),
("b",2
,none),
("b",3
,none)]
,["id"
,"num"
,"time"])
df.show(
)
+--
-+--
-+--
----
----
----
--+|
id|num| time|+-
--+-
--+-
----
----
----
---+
| a|1|
2019-06
-1513:
20|| a|
2| null|
| a|
3| null|
| a|4|
2019-06
-1513:
40|| a|5|
2019-06
-1514:
40|| a|
6| null|
| b|1|
2019-06
-1513:
42|| b|
2| null|
| b|
3| null|+-
--+-
--+-
----
----
----
---+
假如有這樣的場景:對於不同的使用者"a"和"b",它有乙個維度num,還有乙個時間列。但是time列存在很多缺失值,現在想要填充這些null值,要求是向前填,
也就是說如果當前有值不作處理,若為空值,就向前找離他最近的值填充。
要實現這個功能就要借助spark的windows函式,**如下:
from pyspark.sql import window
from pyspark.sql.functions import last
import sys
# 開窗函式,以id做分組,指定排序方式,設定視窗大小
window = window.partitionby(
"id"
).orderby(
"num"
).rowsbetween(
-sys.maxsize,0)
# last函式,返回分組中的最後乙個值。ignorenulls為true表示只對null值應用
filled = last(df[
"time"
], ignorenulls=
true
).over(window)
spark_df = df.withcolumn(
"tmp_time"
, filled)
spark_df.orderby(
"id"
,"num"
).show(
).show(
)
結果如下:
+--
-+--
-+--
----
----
----
--+-
----
----
----
---+
|id|num| time| tmp_time|+-
--+-
--+-
----
----
----
---+
----
----
----
----
+| a|1|
2019-06
-1513:
20|2019-06
-1513:
20|| a|
2| null|
2019-06
-1513:
20|| a|
3| null|
2019-06
-1513:
20|| a|4|
2019-06
-1513:
40|2019-06
-1513:
40|| a|5|
2019-06
-1514:
40|2019-06
-1514:
40|| a|
6| null|
2019-06
-1514:
40|| b|1|
2019-06
-1513:
42|2019-06
-1513:
42|| b|
2| null|
2019-06
-1513:
42|| b|
3| null|
2019-06
-1513:
42|+-
--+-
--+-
----
----
----
---+
----
----
----
----
+
可以發現確實實現了這個功能。
原文參考這個部落格,有興趣可以了解下
Spark 解析XML檔案到DataFrame
公司遇到一點需求,平時load檔案基本上都是csv格式的檔案,可是就有那麼乙個檔案是xml檔案,這也正常,因為檔案是別的team推過來的,自然要遵循他們的格式,於是就要想辦法解析xml檔案。目標是把xml檔案轉換為dataframe,然後寫到表中。可是spark.reader並沒有讀取xml格式檔案...
PySpark學習資源
環境搭建及基礎 子雨大資料及spark入門教程 python版 apache spark中國技術社群 spark structured streaming structured streaming using python dataframes api 超讚 structured streaming...
pyspark動作函式
本文列舉幾個常見的pyspark動作函式,幾個常見的轉換函式點這裡 count 返回資料集中的元素個數 collect 以列表的形式返回資料集中的所有元素 first 返回資料集中的第乙個元素 take n 以陣列的形式返回資料集中的前n個元素 reduce func 通過函式func 輸入兩個引數...