20200911 -
本部分只是記錄一下,很久之前的利用python進行hadoop map和reduce的方法。
map**:
#! /bin/python
import sys
import time
defconvert_time
(struct_time)
:return struct_time.tm_hour *
60+ struct_time.tm_min
defmain()
:
stream_time =
"" src_ip =
"" dst_ip =
"" dst_port =
"" c2s_len =
"" payload=
""for line in sys.stdin:
if line.startswith(
"stream"):
data = line.strip(
).split(
"\t"
) stream_time = convert_time(time.strptime(data[1]
,"%y-%m-%d %h:%m:%s"))
src_ip = data[5]
dst_ip = data[7]
dst_port = data[8]
if line.startswith(
"length"):
c2s_len =
" ".join(line.strip()[
7:].split())
if line.startswith(
"payload"):
payload =
" ".join(line.strip()[
8:].split())
print
"%s\t%s\t%s\t%s\t%s\t%s"
%(stream_time,src_ip,dst_ip,dst_port,c2s_len,payload)
if __name__ ==
"__main__"
: main(
)
這裡處理的方式是針對多行的文字進行處理,但是這個指令碼有異常,就是當時的時候,處理另外的資料,發現不對。可能是需要新增異常處理的**。
reduce**:
#! /usr/bin/python
import sys
for line in sys.stdin:
print line.strip(
)
hadoop jar $hadoop_home/share/hadoop/tools/lib/hadoop-streaming-2.9.0.jar
mul_map.py -combiner mul_reduce.py -reducer mul_reduce.py
這個命令是不對的,沒有針對這個資料,但是基本上框架已經清楚的。
map**是用於處理多行的文字(這個是當時日誌記錄過程的時候導致的),然後直接利用key彙總,reduce輸出。
python 如何呼叫帶引數的shell指令碼
舉例 shell的指令碼 t.sh內容 echo this is a test shell with arguments echo arg1 1 arg2 2 執行指令碼.t.sh zhao 結果如下 noncode gnode108 knockdown workflow t.sh zhao1 zh...
Python爬蟲 scrapy定時執行的指令碼
由於伺服器的crontab莫名掛掉了,還沒找到解決的辦法,於是找了另乙個方法 原理 1個程序 多個子程序 scrapy程序 將以下 檔案放入scrapy專案中任意位置即可 from multiprocessing import process from scrapy import cmdline i...
python 中執行linux shell指令碼
subprocess.popen command,shell true 如果command不是乙個可執行檔案,shell true不可省。最簡單的方法是使用class subprocess.popen command,shell true popen類有popen.stdin,popen.stdou...