這次的例子是計算航空公司的平均延遲時間,並畫圖
直接上**:
import csv
import matplotlib.pyplot as plt
import matplotlib as mpl
mpl.use(
"tkagg"
)# use tkagg to show figures
from stringio import stringio
from datetime import datetime
from collections import namedtuple
from operator import add, itemgetter
from pyspark import sparkconf, sparkcontext
"flight delay analysis"
date_fmt =
"%y-%m-%d"
time_fmt =
"%h%m"
fields =
('date'
,'airline'
,'flightnum'
,'origin'
,'dest'
,'dep'
,'dep_delay'
,'arv'
,'arv_delay'
,'airtime'
,'distance'
)flight = namedtuple(
'flight'
, fields)
defparse
(row)
:# 將每一行解析為乙個元祖
row[0]
= datetime.strptime(row[0]
, date_fmt)
.date(
) row[5]
= datetime.strptime(row[5]
, time_fmt)
.time(
) row[6]
=float
(row[6]
) row[7]
= datetime.strptime(row[7]
, time_fmt)
.time(
) row[8]
=float
(row[8]
) row[9]
=float
(row[9]
) row[10]
=float
(row[10]
)return flight(
*row[:11
])defsplit
(line)
:# 將一行內容作為檔案給csv.reader
reader = csv.reader(stringio(line)
)return reader.
next()
defplot
(delays)
: airlines =
[d[0
]for d in delays]
minutes =
[d[1
]for d in delays]
index =
list
(xrange
(len
(airlines)))
fig, axe = plt.subplots(
) bars = axe.barh(index, minutes)
for idx, air,
minin
zip(index, airlines, minutes):if
min>0:
bars[idx]
.set_color(
'#d9230f'
) axe.annotate(
"%0.0f min"
%min
, xy=
(min+1
, idx+
0.5)
, va=
'center'
)else
: bars[idx]
.set_color(
'#469408'
) axe.annotate(
"%0.0f min"
%min
, xy=
(min+1
, idx+
0.5)
, va=
'center'
) ticks = plt.yticks(
[idx+
0.5for idx in index]
, airlines)
xt = plt.xticks()[
0]plt.xticks(xt,
[' ']*
len(xt)
) plt.grid(axis=
'x', color =
'white'
, linestyle=
'-')
plt.title(
"total minutes delayed per airline"
) plt.show(
)def
main
(sc)
:# 開啟查詢airline全程的檔案,作為乙個字典用於查詢
airlines =
dict
(sc.textfile(
"flight/airlines.csv").
map(split)
.collect())
# 將這個字典傳送給所有worker節點
airline_lookup = sc.broadcast(airlines)
# 開啟航班資訊
flights = sc.textfile(
"flight/flights.csv").
map(split)
.map
(parse)
# 計算延誤時間
delays = flights.
map(
lambda f:
(airline_lookup.value[f.airline]
,add(f.dep_delay, f.arv_delay)))
# 對同個航空公司的延誤時間相加
delays = delays.reducebykey(add)
.collect(
)# 根據延誤時間排序
delays =
sorted
(delays, key=itemgetter(1)
)for d in delays:
print
"%0.0f minutes delayed\t%s"
%(d[1]
, d[0]
)# 畫圖
plot(delays)
if __name__==
"__main__"
:# spark配置
conf = sparkconf(
).setmaster(
"local[*]"
) sc = sparkcontext(conf=conf)
main(sc)
就可以執行了
Spark灰度發布在十萬級節點上的實踐
持續整合是指,及時地將最新開發的且經過測試的 整合到主幹分支中。持續整合的優點 目前主流的 管理工具有,github gitlab等。本文所介紹的內容中,所有 均託管於私有的 gitlab 中。鑑於 jenkins 幾乎是 ci 事實上的標準,本文介紹的 spark ci cd u0026amp c...
Spark灰度發布在十萬級節點上的實踐
持續整合是指,及時地將最新開發的且經過測試的 整合到主幹分支中。持續整合的優點 目前主流的 管理工具有,github gitlab等。本文所介紹的內容中,所有 均託管於私有的 gitlab 中。鑑於 jenkins 幾乎是 ci 事實上的標準,本文介紹的 spark ci cd u0026amp c...
Spark灰度發布在十萬級節點上的實踐
持續整合是指,及時地將最新開發的且經過測試的 整合到主幹分支中。持續整合的優點 目前主流的 管理工具有,github gitlab等。本文所介紹的內容中,所有 均託管於私有的 gitlab 中。鑑於 jenkins 幾乎是 ci 事實上的標準,本文介紹的 spark ci cd u0026amp c...