spark在kafka讀數併發問題

2021-09-11 11:00:17 字數 1314 閱讀 7861

也就是修改了 kafkardd 類的 getpartitions 方法:

原實現:

override def getpartitions: array[partition] = .toarray

}修改後的實現:

override def getpartitions: array[partition] =

suboffsetranges(i * subconcurrency + j) = offsetrange.create(offsetrange.topic, offsetrange.partition, from, until)

}suboffsetranges.zipwithindex.map.toarray

}這個方法的實現思想還是很簡單的,就是通過設定 topic.partition.subconcurrency 引數,如果這個引數等於1,整個函式的執行效果和之前一樣。但是如果這個引數大於1,則之前乙個 kafka 分割槽由乙個 spark 分割槽消費的資料變成由 topic.partition.subconcurrency 個 spark 分割槽去消費,每個 spark 分割槽消費的資料量相等。這個無疑會加快 kafka 資料的消費,但是這種方法的問題也很明顯:

如果資料的順序很重要,這種方法會存在亂序的問題。

spark 設計的 kafkardd 目的是讓 kafka partition 和 spark rdd partition 一一對應,這樣可以保證同乙個分割槽裡面的資料順序,但是這種方法實現變成了 kafka partition 和 spark rdd partition 一對多的關係,無疑破壞了官方的原有設計。

到目前為止,上述 pr 被關閉,而且 spark-22056 一直處於 in progress 狀態,我猜這個最後可能也會被關閉掉。

那除了上面實現,我們還有其他實現嗎?當然有,我們可以在處理資料之前通過 repartition 或 coalease 對資料進行重分割槽:

但是這個方法的使用前提是資料重分割槽+後續處理的時間比沒有重分割槽直接處理資料的時間要短,否則重分割槽的開銷過大導致總的處理時間過長那就沒意義了。

但是上面兩種方法無法解決 kafka 端資料傾斜導致的資料處理過慢的問題(也就是有些分割槽資料量相比其他分割槽大很多,光是從這些分割槽消費資料的時間就比其他分割槽要長很多)。針對這種情況,我們需要考慮 kafka 分割槽設定是否合理?是否能夠通過修改 kafka 分割槽的實現來解決資料傾斜的問題。

如果不是 kafka 資料傾斜導致的資料處理過慢的問題,而是所有 kafka 分割槽的整體資料量就比較大,那這種情況我們可以考慮是否可以增加 kafka 分割槽數?是否需要增加 spark 的處理資源等。建議最好還是別使用多個執行緒處理同乙個 kafka 分割槽裡面的資料。

spark併發度控制

並行度可以通過如下三種方式來設定,可以根據實際的記憶體 cpu 資料以及應用程式邏輯的情況調整並行度引數,增加任務的並行度,充分利用集群機器的計算能力,一般並行度設定為集群cpu總和的2 3倍。1 在會產生shuffle的操作函式內設定並行度引數,優先順序最高 1.1 testrdd.groupby...

kafka在高併發場景下的解決方案

在我們現在開發的專案中,經常會用到kafka訊息中介軟體。一般情況下,單執行緒 單分割槽 的配置已經可以滿足需求,但是在某些大資料和資料併發量要求較高的應用場景下經常會遇到訊息來不及處理,出現訊息積壓的情況。因此,該文章主要針對這種應用場景提供了乙個多執行緒消費的解決方案 自己在平時使用kafka訊...

Spark讀取Kafka 高低階API

1 kafkautils.createdstream 建構函式為kafkautils.createdstream ssc,zk consumer group id per topic,partitions 使用了receivers來接收資料,利用的是kafka高層次的消費者api,對於所有的rece...