在我們現在開發的專案中,經常會用到kafka訊息中介軟體。一般情況下,單執行緒(單分割槽)的配置已經可以滿足需求,但是在某些大資料和資料併發量要求較高的應用場景下經常會遇到訊息來不及處理,出現訊息積壓的情況。因此,該文章主要針對這種應用場景提供了乙個多執行緒消費的解決方案
自己在平時使用kafka訊息中介軟體的時候開始也並沒有分割槽的概念,都是像傳統的mq訊息中介軟體一樣,直接從topic裡消費訊息就行了。但是在有次專案現場發現有時候topic裡的訊息往往會積壓一部分無法消費。後來經過網上查閱資料和閱讀kafka官方文件,了解到可以使用kafka提供的多分割槽能力來解決這個問題。
官網上關於kafka的分割槽概念介紹很多,我這裡總結一下就是:
kafka的分割槽,相當於把乙個topic再細分成了多個通道,乙個消費者應用可以從乙個通道或者多個通道中獲取資料。例如:
生產者隨機分割槽提交資料
這也是乙個比較關鍵步驟,只有隨機提交到不同的分割槽,才能實現多分割槽消費;可以自定義自己的分割槽策略,如下:
public
intpartition
(string topic, object key,
byte
keybytes, object var,
byte
valuebytes, cluster cluster)
catch
(exception e)
return math.
abs(partitionsnum % numpartitions)
;}
然後在初始化kafka生產者配置的時候修改如下配置:
props.
put(
"partitioner.class"
,properties.
getproperty
(com.mykafka.mypartition)
);
這樣就實現了kafka生產者隨機分割槽提交資料。
消費者多執行緒消費資料
最後一步就是消費者,修改單執行緒模式為多執行緒,這裡的多執行緒實現方式有很多,這裡就以最簡單的固定執行緒模式:
executorservice fixedthreadpool = executors.
newfixedthreadpool(3
);for(
int i =
0; i <
3; i++)}
);}
在消費時需要注意,這裡得遍歷所有的分割槽,否則還是只消費了乙個分割槽:
consumerrecordsrecords = cosumer.poll(1000);
for (topicpartition partition : records.partitions()) else
}}
注意上面的執行緒設定為固定的3個,因為這裡得跟上面kafka的分割槽個數相對應起來,否則如果執行緒超過了分割槽數量,那麼只會浪費執行緒,因為即使使用3個以上的執行緒也只會消費三個分割槽,而少了則無法消費完全。所以建議分割槽的數量和執行緒數設定為一致的。
通過上面的步驟,生產者和消費者就支援多分割槽和多執行緒的應用場景了。
需要注意的是,僅僅是消費者做了多執行緒用處不大的, 必須生產者生產資料的時候將資料發到不同的分割槽才適用大的應用場景。否則也只是治標不治本的加快消費速度而已。
在實際使用過程中,也遇到了一些問題,比如生產者隨機分配資料到分割槽時,分配並不均勻。我在topic上設定了四個分割槽,壓測過程中,發現每個分割槽的資料量差別挺大的,極端的時候,只有乙個分割槽有資料,其餘三個分割槽空閒。解決方法就是在用生產者生產資料時,send方法需要指定key。kafka會根據key的值,通過一定的演算法,如hash,將資料平均的傳送到不同的分割槽上。
高併發業務場景下常見的解決方案
由於系統都是連線資料庫的,但是一般最多資料庫每秒只能支撐幾千的並非,如果業務量激增,會導致系統宕機 因此需要從一下幾點入手設計 系統拆分 快取 mq 分庫分表 讀寫分離 搜尋 將乙個系統進行功能拆分,如現在流行的微服務,每個服務連線的資料庫分開,分開部署。這樣可以將壓力進行拆分,緩解因為網路和資料庫...
高併發場景下的限流策略
目錄快取 降級 限流 漏桶演算法 令牌桶演算法 漏桶演算法與令牌桶演算法的區別 有效提公升熱點資料的訪問效率,在高併發 大流量的場景降低服務端壓力。當訪問量快速增長 服務可能會出現一些問題 響應超時 或者會存在非核心服務影響到核心流程的效能時,仍然需要保證服務的可用性,即便是有損服務。所以意味著我們...
高併發場景下的請求合併
一.在專案中,我們經常用到如下方式進行介面呼叫 有多少請求訪問,就會呼叫多少次第三方介面或資料庫,這樣的情況在高併發場景下很容易出現執行緒被打滿,返回結果慢。為了優化這個介面,後台可以將相同的請求進行合併,然後呼叫批量的查詢介面。請求合併 下面上 已查詢資料庫舉例 1.建立請求類 data buil...