spark資料處理方式主要有三種:rdd、dataframe、spark sql
三者的主要差異在於是否定義schema
rdd的資料未定義schema(也就是未定義欄位名及資料型別)。使用上必須有map/reduce的概念,需要高階別的程式設計能力。但是功能也最強,能完成所有spark功能。
spark dataframe建立時必須定義schema(定義每乙個欄位名與資料型別)
spark sql是由dataframe衍生出來的,我們必須先建立dataframe,然後通過登入spark sql temp table,就可以使用spark sql語法了。
易使用度:spark sql>dataframe>rdd
阿里天池智聯招聘處理資料舉例:
rdd1 = sc.textfile("/bigdata")
rdd1.count()
rdd2=rdd1.map(lambda line:line.split(","))
#通過rdd2建立dataframe,定義dataframe的每乙個欄位名與資料型別
from pyspark.sql import row
zhilian_rows = rdd2.map(lambda p:
row(
num_people=p[0],
company_name=p[1],
job_name=p[8],
work_place=p[9],
experience=p[14],
) )
2建立了zhilian_rows之後,使用sqlcontext.createdataframe()方法寫入zhilian_rows資料
from pyspark.sql import sqlcontext
sqlcontext = sqlcontext(sc)
zhilian_df = sqlcontext.createdataframe(zhilian_rows) #sqlcontext.createdataframe建立
zhilian_df.printschema() #檢視dataframes的schema
zhilian_df.show() #.show()方法來檢視前5行資料
zhilian_df.alias("df") #建立別名
df.select("company_type").groupby("company_type").count().show() #使用dataframe統計公司性質及數量
3建立pyspark sql
sqlcontext.registerdataframeastable(df, "zhilian_table") #使用registertemptable方法將df轉換為表
sqlcontext.sql("select count(*) counts from zhilian_table").show() #sqlcontext.sql查詢
其他**:
from pyspark.mllib.feature import word2vec
from pyspark.ml.feature import word2vec
from pyspark.ml.feature import countvectorizer, countvectorizermodel, tokenizer, regextokenizer, stopwordsremover
#1.從資料庫中提取資料
org_df = spark.sql("select label,username from *** ")
#2.將提取的資料轉換成dataframe格式
res_rdd = org_df.rdd.map(list).map(lambda x:[x[0],' '.join([i for i in x[1]])]).map(tuple)
#print(res_rdd.take(100))
res_df = spark.createdataframe(res_rdd,['label','username'])
#3.使用tokenizer分詞
tokenizer = tokenizer(inputcol="username", outputcol="words")
t_words = tokenizer.transform(res_df)
2 特徵處理
from pyspark.sql import sqlcontext
from pyspark import sparkcontext
sc =sparkcontext()
sqlcontext = sqlcontext(sc)
data = sqlcontext.read.format('spark.csv').options(header='true',inferschema='true').load('train.csv')
data = data.select([column for column in data.columns if column not in drop_list])
data.show(5)
data.printschema() #顯示結構
from pyspark.sql.functions import col
data.groupby("descrip").count().orderby(col("count").desc()).show() #包含犯罪數量最多的20個描述
#使用pipeline
dataset.show(5)
3文字分類各種模型
(trainingdata, testdata) = dataset.randomsplit([0.7, 0.3], seed = 100)
lr = logisticregression(maxiter=20, regparam=0.3, elasticnetparam=0)
lrmodel = lr.fit(trainingdata)
predictions = lrmodel.transform(testdata)
predictions.filter(predictions['prediction'] == 0) \
.select("descript","category","probability","label","prediction") \
.orderby("probability", ascending=false) \.show(n = 10, truncate = 30)
from pyspark.ml.evaluation import multiclassclassificationevaluator
evaluator = multiclassclassificationevaluator(predictioncol="prediction")
evaluator.evaluate(predictions)
其他參考
利用pyspark 資料預處理(特徵化))
特徵處理 數值特徵處理
專案工作流程 標準化標籤,將標籤值統一轉換成range 標籤值個數 1 範圍內 簡單來說 labelencoder 是對不連續的數字或者文字進行編號 one hot的基本思想 將離散型特徵的每一種取值都看成一種狀態,若你的這一特徵中有n個不相同的取值,那麼我們就可以將該特徵抽象成n種不同的狀態,on...
特徵預處理和特徵生成 三 缺失值的處理
填充nan的方法要取決於特定情況!填充缺失值常用的方法有以下三種 需要注意的是有時候缺失值已經被組織者替換了!通常情況下,在特徵生成之前要避免填充nans!1,二值特徵isnull可能會很有用,它指明了哪些特徵是缺失值,可以看做乙個類別特徵。在計算平均值或中位數時,這種方法可以解決樹和神經網路的問題...
特徵預處理,特徵選擇
統一量綱 特徵規格不一樣,不能放在一起比較。主要看模型,比如樹模型就不太需要,而計算距離之類的模型,或者神經網路就需要 主要有標準化,區間放縮,歸一化。標準化 標準化,均值為0,方差為1 from sklearn.preprocessing import standardscaler 標準化,返回值...