topn 是統計報表和大屏非常常見的功能,主要用來實時計算排行榜。流式的topn可以使業務方在記憶體中按照某個統計指標(如出現次數)計算排名並快速出發出更新後的排行榜。
我們以統計詞頻為例展示一下如何快速開發乙個計算topn的flink程式。
flink支援各種各樣的流資料介面作為資料的資料來源,本次demo我們採用內建的sockettextstream作為資料資料來源。
streamexecutionenvironment env =streamexecutionenvironment.getexecutionenvironment();env.setstreamtimecharacteristic(timecharacteristic.processingtime);
//以processtime作為時間語義
datastream
text = env.sockettextstream(hostname, port); //
監聽指定socket埠作為輸入
與離線wordcount類似,程式首先需要把輸入的整句文字按照分隔符split成乙個乙個單詞,然後按照單詞為key實現累加
datastream> ds =text.flatmap(
new linesplitter()); //
將輸入語句split成乙個乙個單詞並初始化count值為1的tuple2型別
private
static
final
class linesplitter implements
flatmapfunction
>}}
}
datastream> wcount =ds.keyby(0) //
按照tuple2的第乙個元素為key,也就是單詞
.window(slidingprocessingtimewindows.of(time.seconds(600),time.seconds(20)))
//key之後的元素進入乙個總時間長度為600s,每20s向後滑動一次的滑動視窗
.sum(1);//
將相同的key的元素第二個count值相加
全域性topn
資料流經過前面的處理後會每20s計算一次各個單詞的count值併發送到下游視窗
datastream> ret =wcount.windowall(tumblingprocessingtimewindows.of(time.seconds(20)))
//所有key元素進入乙個20s長的視窗(選20秒是因為上游視窗每20s計算一輪資料,topn視窗一次計算只統計乙個視窗時間內的變化)
.process(new topnallfunction(5));//
計算該視窗topn
windowall是乙個全域性併發為1的特殊操作,也就是所有元素都會進入到乙個視窗內進行計算。
privatestatic
class
topnallfunction
extends
processallwindowfunction
, tuple2, timewindow>
@override
public
void
process(
processallwindowfunction
, tuple2, timewindow>.context arg0,
iterable
>input,
collector
> out) throws
exception
});
//treemap按照key降序排列,相同count值不覆蓋
for (tuple2element : input)
}for (entry>entry : treemap
.entryset())
}}
分組topn
在部分場景下,使用者希望根據不同的分組進行排序,計算出每個分組的乙個排行榜。
wcount.keyby(new tuplekeyselectorbystart()) //按照首字母分組
.window(tumblingprocessingtimewindows.of(time.seconds(20))) //
20s視窗統計上游資料
.process(new topnfunction(5)) //
分組topn統計
privatestatic
class tuplekeyselectorbystart implements
keyselector
, string>
}
/*** *針對keyby window的topn函式,繼承自processwindowfunction
**/private
static
class
topnfunction
extends
processwindowfunction
, tuple2, string, timewindow>
@override
public
void
process(
string arg0,
processwindowfunction
, tuple2, string, timewindow>.context arg1,
iterable
>input,
collector
> out) throws
exception
});for (tuple2element : input)
}for (entry>entry : treemap
.entryset())
}}
上面的**實現了按照首字母分組,取每組元素count最高的topn方法。
巢狀topn
全域性topn的缺陷是,由於windowall是乙個全域性併發為1的操作,所有的資料只能匯集到乙個節點進行 topn 的計算,那麼計算能力就會受限於單台機器,容易產生資料熱點問題。
解決思路就是使用巢狀 topn,或者說兩層 topn。在原先的 topn 前面,再加一層 topn,用於分散熱點。例如可以先加一層分組 topn,第一層會計算出每一組的 topn,而後在第二層中進行合併彙總,得到最終的全網topn。第二層雖然仍是單點,但是大量的計算量由第一層分擔了,而第一層是可以水平擴充套件的。
攜程基於Flink的實時特徵平台
1.1 選擇實時計算平台 依據專案的效能指標要求 latency,throughput等 在已有的實時計算平台 storm spark flink進行選擇 1.2主要的開發運維過程 現在的架構是標準lamda架構,離線部分由spark sql datax組成。現在使用的是kv儲存系統aerospik...
基於FLINK搭建實時數倉技術調研
資料倉儲 data warehouse 是做大資料基本都會去涉及的專案。簡單來說,數倉是資料結構化儲存和查詢,並利用分布式計算引擎進行計算得到業務需要的指標,以支援企業商業智慧型,通過充分挖掘資料價值,形成資料資產。傳統的資料倉儲偏離線處理,通過定時排程實現資料的etl,指標的更新依賴於排程的頻率,...
基於 Flink 的實時數倉生產實踐
資料倉儲的建設是 資料智慧型 必不可少的一環,也是大規模資料應用中必然面臨的挑戰。在智慧型商業中,資料的結果代表了使用者反饋 獲取資料的及時性尤為重要。快速獲取資料反饋能夠幫助公司更快地做出決策,更好地進行產品迭代,實時數倉在這一過程中起到了不可替代的作用。如何更好的建設實時數倉 有哪些優秀的生產實...