spark在rdd上提供pipe()方法。spark的pipe()方法可以讓我們使用任意一種語言實現spark作業中的部分邏輯,只要它能讀寫unix標準的流就行。通過pipe(),你可以將rdd中的各元素從標準輸入流中以字串形式讀出,並對這些元素執行任何你需要的操作,然後把結果以字串的形式寫入標準輸出------這個過程就是rdd的轉化操作過程。有了pipe()這個管道我們就可以通過這個管道與r、c++、python以及shell指令碼等程式進行互動,使其能夠進行更快的計算。
本部落格只介紹spark通過pipe()與python,shell進行互動,感興趣的小夥伴可以嘗試一下其他的。
1.首先寫乙個python程式,在本地執行保證沒問題。
import sys
for line in sys.stdin:
d=line.strip().split(',')
if len(d) !=2:
continue
label=d.pop(0)
hit_id=d.pop(0)
features=
print(features)
注意:要用stdin或者raw_input獲取輸入。
2.把python程式寫成指令碼
#!/usr/bin/python
import sys
for line in sys.stdin:
d=line.strip().split(',')
if len(d) !=2:
continue
label=d.pop(0)
hit_id=d.pop(0)
features=
print(features)
注意:指令碼編寫後要新增指令碼的執行許可權
3.編寫spark程式呼叫pipe()
scala> val rdddata=sc.textfile("hdfs://ip/tmp/wordcount.txt")
scala> val scriptpath="/tmp/test/test.py"
scriptpath: string = /tmp/test/test.py
scala> println(rdddata.pipe(scriptpath).collect().tolist)
執行結果如下:
list([('item.id,spark', 1), ('item.id,hive', 1)], [('item.id,hadoop', 1), ('item.id,spark', 1)], [('item.id,zookeeper', 1), ('item.id,kylin', 1)], [('item.id,kylin', 1), ('item.id,hue', 1)], [('item.id,spark', 1), ('item.id,hue', 1)], [('item.id,hadoop', 1), ('item.id,spark', 1)], [('item.id,spark', 1), ('item.id,redis', 1)], [('item.id,spark', 1), ('item.id,hbase', 1)], [('item.id,hive', 1), ('item.id,hbase', 1)])
1.編寫shell指令碼
#!/bin/sh
while read line; do
echo $line | awk ''
done
2.在spark程式中呼叫
scala> val rdddata=sc.textfile("hdfs://ip/tmp/wordcount.txt")
scala> val scriptpath="/tmp/test/test.sh"
scriptpath: string = /tmp/test/test.sh
scala> println(rdddata.pipe(scriptpath).collect().tolist)
list((item.id,spark,1)(item.id,hive,1)(item.id,hive.hadoop,1)(item.id,hadoop,1)(item.id,spark,1)(item.id,zookeeper,1)(item.id,kylin,1)(item.id,kylin,1)(item.id,hue,1), (item.id,spark,1)(item.id,hue,1)(item.id,hadoop,1)(item.id,spark,1)(item.id,spark,1)(item.id,redis,1)(item.id,spark,1)(item.id,hbase,1)(item.id,hive,1)(item.id,hbase,1))
例項 Linux管道pipe的使用
例項 linux管道pipe的使用 moakap總結 函式 include int pipe int filedes 2 描述 pipe 函式建立乙個管道和指向該管道的一對檔案描述符,並且將檔案描述符儲存到檔案描述符陣列filedes中。其中filedes 0 為讀端,filedes 1 為寫端。返...
Spark在Yarn上的效能調優
1 process local 程序本地化 推薦使用 和資料在同乙個 executor 程序中,資料在executor的blockmanager中,效能最好 2 node local 節點本地化 推薦使用 和資料在乙個節點中,主要分兩種情況 i 資料在節點上,task在節點上的executor中執行...
Spark的基本使用
啟動spark shell 開啟命令列或終端 pyspark import pyspark 匯入pyspark 檢視spark context資訊 讀入檔案 列印檔案內容 可利用collect 函式,它能夠以陣列的形式,返回rdd資料集的所有元素 lines spark.read.text file...