# coding=utf-8
import os
os.environ[
'pyspark_submit_args']=
'--jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar pyspark-shell'
from pyspark.sql import sparksession, sqlcontext
from pyspark import sparkconf, sparkcontext
conf = sparkconf(
).setmaster(
"yarn"
)"pyspark_xgboost_yarn"
)sc = sparkcontext(conf=conf)
'calculatinggeodistances'
).getorcreate(
)sqlcontext = sqlcontext(sparkcontext=sc)
from pyspark.sql.types import
*from pyspark.ml.feature import stringindexer, vectorassembler
from pyspark.ml import pipeline
# spark.sparkcontext.addpyfile("hdfs:///tmp/rd/lp/sparkxgb.zip")
schema = structtype(
[structfield(
"passengerid"
, doubletype())
, structfield(
"survived"
, doubletype())
, structfield(
"pclass"
, doubletype())
, structfield(
"name"
, stringtype())
, structfield(
"***"
, stringtype())
, structfield(
"age"
, doubletype())
, structfield(
"sibsp"
, doubletype())
, structfield(
"parch"
, doubletype())
, structfield(
"ticket"
, stringtype())
, structfield(
"fare"
, doubletype())
, structfield(
"cabin"
, stringtype())
, structfield(
"embarked"
, stringtype())
])df_raw = spark \
.read \
.option(
"header"
,"true"
) \ .schema(schema) \
.csv(
"train.csv"
)df_raw.show(20)
df = df_raw.na.fill(0)
vectorassembler = vectorassembler(
) \ .setinputcols(
["pclass"
,"age"
,"sibsp"
,"parch"
,"fare"
]) \
.setoutputcol(
"features"
)from sparkxgb import xgboostclassifier
xgboost = xgboostclassifier(
featurescol=
"features"
, labelcol=
"survived"
, predictioncol=
"prediction"
, missing=
0.0)
pipeline = pipeline(stages=
[vectorassembler, xgboost]
)# randomsplit 隨機分為測試集合訓練集
traindf, testdf = df.randomsplit(
[0.8
,0.2
], seed=24)
traindf.show(2)
print
("************************開始訓練****************************"
)# 擬合模型
model = pipeline.fit(traindf)
print
("************************訓練結束****************************"
)print
("************************開始******************************"
)model.transform(testdf)
.select(
"passengerid"
,"survived"
,"prediction"
).show(
)print
("**************************結束*****************************"
)# 輸出的所有結果
model.transform(testdf)
.show(
)print
("程式結束"
1.上面兩個jar包必須放到spark-submit提交引數裡面。os.environ['pyspark_submit_args'] = '--jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar pyspark-shell
2.將sparkxgb.zip 解壓到python3 的包的安裝目錄裡面,linux裡面預設安裝路徑如下/usr/local/python3/lib/python3.6/site-packages
3.如果不想將sparkxgb.zip解壓到python包的安裝目錄,不想把jar包放到python**裡面可以。那麼就可以使用spark shell首先要注釋:
os.environ['pyspark_submit_args'] = '--jars /data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar,/data/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar pyspark-shell
然後在linux裡面執行如下spark shell命令:
spark-submit --master yarn --py-files /
data
/pycharm/zhanglong/pysparkxgboostnew/sparkxgb.zip --jars /
data
/pycharm/zhanglon/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-0.90.jar,
/data
/pycharm/zhanglong/pysparkxgboostnew/xgboost4j-spark-0.90.jar /
data
/pycharm/zhanglong/pysparkxgboostnew/test.py
zip包和jar包需要指定到具體的位置。
4.spark 預設讀取的csv檔案在hdfs的 /user/root/ 目錄下,執行前需要提前將train.csv檔案上傳到該目錄下面。
如需sparkxgb.zip包和兩個jar包和訓練集可以q:2316352792
分布式系統 Spark
特點 粗粒度的變換。如 map,filter,join 行為 需要得出結果時呼叫 5部分操作意義 資料分割槽集 partitions partition是資料集的最小單位,即乙個shard 位置preferredlocations 輸入partition,輸出是該資料所在的位置 此分割槽在哪台機器上...
Jenkins 分布式執行
master sl e jenkins部署到linux伺服器,執行在windows本地 1 sl e向master報道 jenkins配置 節點管理 配置節點 通過launch,安裝jar包連線主機 2.正常配置jenkins任務 區別點 在general中設定 restrict配置,label是在...
Spark 偽分布式安裝教程
mr跑迭代演算法的侷限性太大,後續想將一部分任務轉移到spark上。公司其他組每天有提交spark任務在yarn上執行。但是他們的客戶機,我們組沒有許可權登入,而且他們也沒有相應的測試機器。於是一咬牙,一跺腳,算了,自己搭環境吧。找了臺我們自己的測試機開幹。給大家上個spark版本資訊的圖 基本每隔...