Spark SQL和特徵處理

2021-10-03 13:08:36 字數 4621 閱讀 4585

spark資料處理方式主要有三種:rdd、dataframe、spark sql

三者的主要差異在於是否定義schema

rdd的資料未定義schema(也就是未定義欄位名及資料型別)。使用上必須有map/reduce的概念,需要高階別的程式設計能力。但是功能也最強,能完成所有spark功能。

spark dataframe建立時必須定義schema(定義每乙個欄位名與資料型別)

spark sql是由dataframe衍生出來的,我們必須先建立dataframe,然後通過登入spark sql temp table,就可以使用spark sql語法了。

易使用度:spark sql>dataframe>rdd

阿里天池智聯招聘處理資料舉例:

rdd1 = sc.textfile("/bigdata")  

rdd1.count()

rdd2=rdd1.map(lambda line:line.split(","))

#通過rdd2建立dataframe,定義dataframe的每乙個欄位名與資料型別

from pyspark.sql import row

zhilian_rows = rdd2.map(lambda p:

row(

num_people=p[0],

company_name=p[1],

job_name=p[8],

work_place=p[9],

experience=p[14],

) )

2建立了zhilian_rows之後,使用sqlcontext.createdataframe()方法寫入zhilian_rows資料

from pyspark.sql import sqlcontext

sqlcontext = sqlcontext(sc)

zhilian_df = sqlcontext.createdataframe(zhilian_rows) #sqlcontext.createdataframe建立

zhilian_df.printschema() #檢視dataframes的schema

zhilian_df.show() #.show()方法來檢視前5行資料

zhilian_df.alias("df") #建立別名

df.select("company_type").groupby("company_type").count().show() #使用dataframe統計公司性質及數量

3建立pyspark sql

sqlcontext.registerdataframeastable(df, "zhilian_table") #使用registertemptable方法將df轉換為表

sqlcontext.sql("select count(*) counts from zhilian_table").show() #sqlcontext.sql查詢

其他**:

from pyspark.mllib.feature import word2vec

from pyspark.ml.feature import word2vec

from pyspark.ml.feature import countvectorizer, countvectorizermodel, tokenizer, regextokenizer, stopwordsremover

#1.從資料庫中提取資料

org_df = spark.sql("select label,username from *** ")

#2.將提取的資料轉換成dataframe格式

res_rdd = org_df.rdd.map(list).map(lambda x:[x[0],' '.join([i for i in x[1]])]).map(tuple)

#print(res_rdd.take(100))

res_df = spark.createdataframe(res_rdd,['label','username'])

#3.使用tokenizer分詞

tokenizer = tokenizer(inputcol="username", outputcol="words")

t_words = tokenizer.transform(res_df)

2 特徵處理

from pyspark.sql import sqlcontext

from pyspark import sparkcontext

sc =sparkcontext()

sqlcontext = sqlcontext(sc)

data = sqlcontext.read.format('spark.csv').options(header='true',inferschema='true').load('train.csv')

data = data.select([column for column in data.columns if column not in drop_list])

data.show(5)

data.printschema() #顯示結構

from pyspark.sql.functions import col

data.groupby("descrip").count().orderby(col("count").desc()).show() #包含犯罪數量最多的20個描述

#使用pipeline

dataset.show(5)

3文字分類各種模型

(trainingdata, testdata) = dataset.randomsplit([0.7, 0.3], seed = 100)

lr = logisticregression(maxiter=20, regparam=0.3, elasticnetparam=0)

lrmodel = lr.fit(trainingdata)

predictions = lrmodel.transform(testdata)

predictions.filter(predictions['prediction'] == 0) \

.select("descript","category","probability","label","prediction") \

.orderby("probability", ascending=false) \.show(n = 10, truncate = 30)

from pyspark.ml.evaluation import multiclassclassificationevaluator

evaluator = multiclassclassificationevaluator(predictioncol="prediction")

evaluator.evaluate(predictions)

其他參考

利用pyspark 資料預處理(特徵化))

特徵處理 數值特徵處理

專案工作流程 標準化標籤,將標籤值統一轉換成range 標籤值個數 1 範圍內 簡單來說 labelencoder 是對不連續的數字或者文字進行編號 one hot的基本思想 將離散型特徵的每一種取值都看成一種狀態,若你的這一特徵中有n個不相同的取值,那麼我們就可以將該特徵抽象成n種不同的狀態,on...

特徵預處理和特徵生成 三 缺失值的處理

填充nan的方法要取決於特定情況!填充缺失值常用的方法有以下三種 需要注意的是有時候缺失值已經被組織者替換了!通常情況下,在特徵生成之前要避免填充nans!1,二值特徵isnull可能會很有用,它指明了哪些特徵是缺失值,可以看做乙個類別特徵。在計算平均值或中位數時,這種方法可以解決樹和神經網路的問題...

特徵預處理,特徵選擇

統一量綱 特徵規格不一樣,不能放在一起比較。主要看模型,比如樹模型就不太需要,而計算距離之類的模型,或者神經網路就需要 主要有標準化,區間放縮,歸一化。標準化 標準化,均值為0,方差為1 from sklearn.preprocessing import standardscaler 標準化,返回值...