項
pandas
spark
工作方式
單機,無法處理大量資料
分布式,能處理大量資料
儲存方式
單機快取
可以呼叫 persist/cache 分布式快取
是否可變
是 否
index索引
自動建立
無索引
行結構pandas.series
pyspark.sql.row
列結構pandas.series
pyspark.sql.column
允許列重名
否 是pandas dataframe 無法支援大量資料的計算,可以嘗試 spark df 來解決這個問題。
優化前import xgboost as xgb
import pandas as pd
import numpy as np
# 載入模型
bst = xgb.booster()
bst.load_model("***.model")
# 變數列表
var_list=[...]
df.rdd.map(lambda x : cal_xgb_score(x,var_list,ntree_limit=304)).write.todf()
# 計算分數
def cal_xgb_score(x,var_list,ntree_limit=50):
feature_count = len(var_list)
x1 = pd.dataframe(np.array(x).reshape(1,feature_count),columns=var_list)
# 資料變化操作
y1 = transformfun(x1)
test_x = xgb.dmatrix(y1.drop(['mobile','mobile_md5'],xais=1),missing=float('nan'))
y1['score'] = bst.predict(test_x,ntree_limit=ntree_limit)
y2 = y1[['mobile','mobile_md5','score']]
return 每條資料都轉化為 pd,增加了額外開銷。
優化後:def cal_xgb_score(x,var_list,ntree_limit=50):
feature_count = len(var_list)
//將 iterator 轉為list
x1 = pd.dataframe(list(x),columns=var_list)
...//將 pdf 轉為字典
return y1[['mobile','mobile_md5','score']].to_dict(orient='record')優化前:df.topandas()優化後:import pandas as pd
def _map_to_pandas(rdds):
return [pd.dataframe(list(rdds))]
def topandas(df, n_partitions=none):
if n_partitions is not none: df = df.repartition(n_partitions)
df_pand = df.rdd.mappartitions(_map_to_pandas).collect()
df_pand = pd.concat(df_pand)
df_pand.columns = df.columns
return df_pand
# 98列,22w行,型別 array/string/long/int,分割槽 200
df = spark.sql("...").sample(false,0.002)
df.cache()
df.count()
# 原生的 topandas 方法
%timeit df.topandas()
# 分布式的 topandas
%timeit topandas(df)
#使用 apache arrow,spark 版本2.3以上
spark.sql("set spark.sql.execution.arrow.enabled=true")
%timeit df.topandas()一. xgboost **
單條資料處理速度從 120 record / min 提高到 3278 record / min
tips: 如果乙個分割槽資料量過大將會導致 executor oom
二. spark dataframe 轉 pandas dataframe
type
cost (seconds)
native topandas
12 distributed topandas
5.91
arrow topandas
2.52
topandas 返回的資料歸根結底還是快取在 driver 的記憶體中的,不建議返回過大的資料。
Spark簡單使用
spark的乙個主要特點就是可以在記憶體中使用,因此他的計算速度比較快。在初學之前按照 quick start.html 中的示例來做一遍。先來初步理解一下操作流程。1.首先是搭建spark,網上有很多教程,cmd中最後執行pyspark 我們首先來分析spark資料夾中的 readme.md 檔案...
spark基本使用
啟動pysparkcd usr local spark bin pyspark統計文字的行數lines sc.textfile file usr local spark readme.md lines.count rdd的persisit方法會將該rdd物件持久化到記憶體中,對於可能會被重複呼叫的r...
spark使用parallelize方法建立RDD
通過呼叫sparkcontext的parallelize方法,在乙個已經存在的scala集合上建立的 乙個seq物件 集合的物件將會被拷貝,建立出乙個可以被並行操作的分布式資料集。python view plain copy data 1,2,3,4,5 distdata sc.paralleliz...