1、首先把redis包引入工程,這樣就不需要在集群裡每台機器上安裝redis客戶端。
$pip install redis
$cd /usr/local/lib/python3.6/dist-packages/ 找到自己環境的路徑
$zip -r redis.zip redis/*
$hdfs dfs -put redis.zip /user/data/
2、在**裡使用 addpyfile載入redis.zip
sc = sparkcontext(conf=conf)
sc.addpyfile("hdfs:///user/data/redis.zip")
#定義乙個寫入redis函式
def datatoredis(data):
r = redis.strictredis(host='ip', port=6379, password='passwd')
for i in data:
r.set(str(i[0]), str(i[1]))
#讀取hive資料
sqlcontext = hivecontext(sc)
read_hive_score = sqlcontext.sql("select id,item from recom.result limit 10")
hiverdd_score =read_hive_score.rdd
result_dataset = hiverdd_score.map(lambda x: (x['id'], x['item'])).collect()
#呼叫函式
datatoredis(result_dataset)
參考:
write data to redis from pyspark
spark-redis
pyspark例項-spark on yarn將hdfs的資料寫入redis
python redis.connectionerror() 例子
redis操作 + strictredis使用
Pyspark讀取parquet資料過程解析
parquet資料 列式儲存結構,由twitter和cloudera合作開發,相比於行式儲存,其特點是 可以跳過不符合條件的資料,只讀取需要的資料,降低io資料量 壓縮編碼可以降低磁碟儲存空間,使用更高效的壓縮編碼節約儲存空間 只讀取需要的列,支援向量運算,能夠獲取更好的掃瞄效能。那麼我們怎麼在py...
使用pyspark讀寫hive資料表
pyspark提供了操作hive的介面,可以直接使用sql語句從hive裡面查詢需要的資料,如下 coding utf 8 import sys from pyspark.sql import sparksession,hivecontext reload sys sys.setdefaultenc...
pyspark讀取csv檔案建立DataFrame
mark一下,感謝作者分享!方法一 用pandas輔助 from pyspark import sparkcontext from pyspark.sql import sqlcontext import pandas as pd sc sparkcontext sqlcontext sqlcont...