背景
目前我們執行的實時任務基本上都是使用sparkstreaming,當然後面考慮使用最近比較火的flink,看了部分資料介紹後,我感覺sparkstreaming相對於flink,唯一的不足是,sparkstreaming在task排程上損耗了不少效能。flink還沒有深入研究內部實現,flink現在這麼火,後面找找相關資料學習一番。今天主要記錄一下當前我所用到的sparkstreaming任務是如何管理offset的。
實現方式
原理:每處理完乙個批次的資料,手動提交offset。
km =
newkafkamanager
(kafkaparam)
inputdstream = km.createdirectstream[string, string]
(ssc, kafkaparam, topics)
calculate.
loadcalculatefromconf
(spark)
logger.
info
"streamingframework initialized!"
)
override def calculate =
".temptable)
df.createorreplacetempview
.temptable)
calculate
(df)
}else
//更新 consumer offsets
val offsetranges = rdd.asinstanceof[hasoffsetranges]
.offsetranges //得到該 rdd 對應 kafka 的訊息的 offset
km.updatezkoffsets
(offsetranges)
//清除臨時表
val temptables = spark.sqlcontext.
tablenames()
temptables.
foreach
(table =
>
", table)
spark.sqlcontext.
droptemptable
(table)})
}catch
", e.getmessage)}}
})ssc.
start()
ssc.
awaittermination()
}
上面**是實時計算框架的一部分,實時計算框架主要還是依賴sparksql來實現,基於外部sql來達到快速開發,後面再介紹實時計算框架的思路,和實現。咱們回歸正題,offset的管理主要集中在上面kafkamanager類裡面。
在建立inputdstream 前,我們首先通過當前消費的groupid和topic資訊去獲取消費資訊,如果獲取不到,說明是乙個新的消費者,此時,根據auto.offset.reset配置來設定要開始處理的訊息的offset,如果是等於smallest,就從該topic最早的offset開始消費,否則就從最新的offset開始消費。如果存在消費記錄,就需要判斷儲存的offset是否大於該topic最早的offset,如果小於,則表示儲存的offset已經過時,因為kafka有清理歷史資料的機制,下面是**實現:
private def setorupdateoffsets
(topics: set[string]
, groupid: string)
: unit =")
val partitions: set[topicandpartition]
= partitionse.right.get
val consumeroffsetse = kc.
getconsumeroffsets
(groupid, partitions)
if(consumeroffsetse.isleft) hasconsumed =
false
logger.
info
("consumeroffsetse.isleft: "
+ consumeroffsetse.isleft)
if(hasconsumed)")
val earliestleaderoffsets = earliestleaderoffsetse.right.get
val consumeroffsets = consumeroffsetse.right.get
// 可能只是存在部分分割槽consumeroffsets過時,所以只更新過時分割槽的consumeroffsets為earliestleaderoffsets
var offsets: map[topicandpartition, long]
=map()
consumeroffsets.
foreach(}
) logger.
warn
("offsets: "
+ consumeroffsets)if(
!offsets.isempty)
}else
else
val offsets = leaderoffsets.map
logger.
warn
("offsets: "
+ offsets)
kc.setconsumeroffsets
(groupid, offsets)}}
)}
獲取到offset後,就可以呼叫kafkautils提供的介面建立inputdstream
kafkautils.createdirectstream[string, string](
ssc, locationstrategies.preferconsistent, consumerstrategies.subscribe[string, string]( topics, kafkaparams, offsets))
實時任務頻寬控制
proc sys kernel sched rt runtimes us,預設 950000 proc sys kernel sched rt period us,預設 1000000 在使用該功能時,當實時任務的頻寬用盡時 sched rt runtime us 核心會將對應的實時執行佇列rt r...
實時任務資料丟失
flink實時任務 從kafka集群讀取源資料 從redis定期全量拉取使用者白名單,然後進行廣播 源資料connect白名單資料,源資料根據白名單資料進行過濾處理 過濾處理完後的資料,http推送 寫redis 寫log等 上線驗證的時候,有些資料丟失,而且比較頻繁,分析可能原因 kafka源資料...
linux實時任務排程演算法分析
鑑於最近有關cpu占有率的一些問題涉及到linux核心的排程演算法,有必要進行了解。因此,寫了這篇文章。linux常見的任務有兩種,實時任務與非實時任務。實時任務的排程演算法是大家都非常熟悉的優先順序搶占或優先順序搶占加時間片兩種,其主要思想是效率優先。非實時任務的排程演算法是cfs 完全公平演算法...