spark-submit --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 --num-executors 3 --properties-file /etc/spark/conf/spark-defaults.conf test.py
from helper.util_helper import sub_name

data_converted = data.map(
    lambda x: (sub_name(x[2][1]), sub_name(x[1][1]), sub_name(x[2][1]))
)
zip -r helper.zip helper/
spark-submit --deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 --num-executors 3 --properties-file /etc/spark/conf/spark-defaults.conf --py-files ./helper.zip test.py
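If re-running spark-submit for every change is inconvenient, the zip can also be attached from inside the driver at runtime. A minimal sketch, assuming an already-created SparkContext named sc and helper.zip sitting in the working directory:

sc.addPyFile("./helper.zip")  # ships the zip to the executors and adds it to the import path
from helper.util_helper import sub_name  # import only after addPyFile has run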
def get_bd_res():
    import sys
    # return sys.path
    import requests
    url = ""
    payload = {}
    headers = {}
    return str(sys.path) + requests.request("get", url, headers=headers, data=payload).text
# Test the dependency packaged and uploaded to the cluster
from helper.util_helper import sub_name

data_converted = data.map(
    lambda x: (get_bd_res(), get_bd_res(), sub_name(x[2][1]))
)
wget
bash Anaconda3-5.2.0-Linux-x86_64.sh
conda create -n python3.6 python==3.6 anaconda
zip -r anaconda3.zip anaconda3/*
hadoop fs -put ./anaconda3.zip /data/ai
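A quick check that the archive actually landed on HDFS before pointing spark-submit at it:

hadoop fs -ls /data/ai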
spark-submit
--deploy-mode client --driver-memory 2g --executor-memory 2g --executor-cores 3 --num-executors 3
--properties-file /etc/spark/conf/spark-defaults.conf
--py-files ./helper.zip
--archives hdfs:///data/ai/anaconda3.zip#anaconda3 (do not omit the #anaconda3 part: it is the directory name the zip is unpacked into after the cluster pulls it down)
--conf spark.pyspark.python=./anaconda3/anaconda3/envs/python3.6/bin/python3 (the cluster workers use the Python 3.6 interpreter unpacked from the zip pulled down from HDFS)
test.py
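Note that spark.pyspark.python selects the interpreter for both the driver and the executors, while in client mode the driver runs on the submitting machine rather than inside a YARN container. If the driver should use a different interpreter than the archived one, Spark exposes a separate setting; a sketch with a hypothetical local path:

--conf spark.pyspark.driver.python=/opt/anaconda3/envs/python3.6/bin/python3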
spark-submit --deploy-mode cluster
--driver-memory 2g --executor-memory 2g --executor-cores 3 --num-executors 3
--properties-file /etc/spark/conf/spark-defaults.conf
--py-files ./helper.zip --archives hdfs:///data/ai/anaconda3.zip#anaconda3
--queue root.default
--name my.test.py
--conf spark.pyspark.python=./anaconda3/anaconda3/envs/python3.6/bin/python3
test.py
test.py
from pyspark.conf import SparkConf
from pyspark.context import SparkContext
from pyspark.sql import SparkSession

default_yarn_queue = "root.default"
default_master = "yarn"


def open_spark_session(app_name="ai-train", executor_memory="2g", executor_instances="3",
                       executor_cores="2", driver_memory="2g"):
    conf = get_spark_config(master=default_master, app_name=app_name, executor_memory=executor_memory,
                            executor_instances=executor_instances, executor_cores=executor_cores,
                            driver_memory=driver_memory, yarn_queue=default_yarn_queue)
    spark = SparkSession.builder.config(conf=conf).getOrCreate()
    return spark


def get_spark_config(master="yarn-client", app_name="ai-train", executor_memory="2g",
                     executor_instances="3", executor_cores="2", driver_memory="2g",
                     yarn_queue="root.ai"):
    conf = (SparkConf()
            .setMaster(master)
            .setAppName(app_name)
            .set("spark.executor.memory", executor_memory)
            .set("spark.executor.instances", executor_instances)
            .set("spark.executor.cores", executor_cores)
            .set("spark.driver.memory", driver_memory)
            .set("spark.yarn.queue", yarn_queue))
    return conf


spark = open_spark_session("test")
data = spark.sparkContext.parallelize([
    [('id', 'a0w1a0000003xb1a'), ('packsize', 1.0), ('name', 'a')],
    [('id', 'a0w1a0000003xaai'), ('packsize', 1.0), ('name', 'b')],
    [('id', 'a0w1a00000xb3aai'), ('packsize', 30.0), ('name', 'c')],
])
data_converted = data.map(lambda x: (x[2][1], x[1][1], x[2][1]))
print(data_converted.take(3))
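To confirm which interpreter the executors actually pick up (for example after switching to the archived Python 3.6 above), a small probe can be appended to test.py; a minimal sketch reusing the spark session created above:

import sys

# collect the Python executable and version reported by the executors
interpreters = (spark.sparkContext
                .parallelize(range(3), 3)
                .map(lambda _: (sys.executable, sys.version))
                .distinct()
                .collect())
print(interpreters)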
helper/util_helper.py
def sub_name(name):
    return "testaaa" + str(name)
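Before zipping, the helper module can be smoke-tested locally, assuming the command is run from the directory that contains helper/:

python -c "from helper.util_helper import sub_name; print(sub_name('a'))"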
Spark configuration takes effect in this order of precedence, from highest to lowest:
the SparkConf configured in code
the parameters specified on the spark-submit command line
the parameters in configuration files such as spark-defaults.conf
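A minimal sketch illustrating the first rule, where a value set on SparkConf in code wins over an --executor-memory 2g flag passed to spark-submit:

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession

# set in code: takes precedence over spark-submit flags and spark-defaults.conf
conf = SparkConf().set("spark.executor.memory", "4g")
spark = SparkSession.builder.config(conf=conf).getOrCreate()
print(spark.sparkContext.getConf().get("spark.executor.memory"))  # expected: 4g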