consumer 採用 pull(拉)模式從 broker 中讀取資料。
push(推)模式很難適應消費速率不同的消費者,因為訊息傳送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成 consumer 來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而 pull 模式則可以根據 consumer 的消費能力以適當的速率消費訊息。
pull 模式不足之處是,如果 kafka 沒有資料,消費者可能會陷入迴圈中,一直返回空資料。針對這一點,kafka 的消費者在消費資料時會傳入乙個時長引數 timeout,如果當前沒有資料可供消費,consumer 會等待一段時間之後再返回,這段時長即為 timeout。
同一時刻,一條訊息只能被組中的乙個消費者例項消費,消費者組訂閱這個主題,意味著主題下的所有分割槽都會被組中的消費者消費到,如果按照從屬關係來說的話就是,主題下的每個分割槽只從屬於組中的乙個消費者,不可能出現組中的兩個消費者負責同乙個分割槽。如果分割槽數大於或者等於組中的消費者例項數,那自然沒有什麼問題,無非乙個消費者會負責多個分割槽,(ps:當然,最理想的情況是二者數量相等,這樣就相當於乙個消費者負責乙個分割槽);但是,如果消費者例項的數量大於分割槽數,那麼按照預設的策略(ps:之所以強調預設策略是因為你也可以自定義策略),有一些消費者是多餘的,一直接不到訊息而處於空閒狀態。
kafka 有兩種分配策略,一是 roundrobin,一是 range。
roundronbin分配策略的具體實現是org.apache.kafka.clients.consumer.roundrobinassignor
輪詢分配策略是基於所有可用的消費者和所有可用的分割槽的,如果所有的消費者例項的訂閱都是相同的,那麼這樣最好了,可用統一分配,均衡分配
例如,假設有兩個消費者c0和c1,兩個主題t0和t1,每個主題有3個分割槽,分別是t0p0,t0p1,t0p2,t1p0,t1p1,t1p2
那麼,最終分配的結果是這樣的:
c0: [t0p0, t0p2, t1p1]
c1: [t0p1, t1p0, t1p2]
uploading-image-126242.png
假設,組中每個消費者訂閱的主題不一樣,分配過程仍然以輪詢的方式考慮每個消費者例項,但是如果沒有訂閱主題,則跳過例項。當然,這樣的話分配肯定不均衡。也就是說,消費者組是乙個邏輯概念,同組意味著同一時刻分割槽只能被乙個消費者例項消費,換句話說,同組意味著乙個分割槽只能分配給組中的乙個消費者。事實上,同組也可以不同訂閱,這就是說雖然屬於同乙個組,但是它們訂閱的主題可以是不一樣的。
例如,假設有3個主題t0,t1,t2;其中,t0有1個分割槽p0,t1有2個分割槽p0和p1,t2有3個分割槽p0,p1和p2;有3個消費者c0,c1和c2;c0訂閱t0,c1訂閱t0和t1,c2訂閱t0,t1和t2。那麼,按照輪詢分配的話,c0應該負責
首先,肯定是輪詢的方式,其次,比如說有主題t0,t1,t2,它們分別有1,2,3個分割槽,也就是t0有1個分割槽,t1有2個分割槽,t2有3個分割槽;有3個消費者分別從屬於3個組,c0訂閱t0,c1訂閱t0和t1,c2訂閱t0,t1,t2;那麼,按照輪詢分配的話,c0應該負責t0p0,c1應該負責t1p0,其餘均由c2負責。
最後的結果是
c0: [t0p0]
c1: [t1p0]
c2: [t1p1, t2p0, t2p1, t2p2]
這是因為,按照輪詢t0p1由c0負責,t1p0由c1負責,由於同組,c2只能負責t1p1,由於只有c2訂閱了t2,所以t2所有分割槽由c2負責,綜合起來就是這個結果
range策略對應的實現類是org.apache.kafka.clients.consumer.rangeassignor,這是預設的分配策略
對於每個主題,我們以數字順序排列可用分割槽,以字典順序排列消費者。然後,將分割槽數量除以消費者總數,以確定分配給每個消費者的分割槽數量。如果沒有平均劃分(ps:除不盡),那麼最初的幾個消費者將有乙個額外的分割槽。
簡而言之,就是,
1、range分配策略針對的是主題(ps:也就是說,這裡所說的分割槽指的某個主題的分割槽,消費者值的是訂閱這個主題的消費者組中的消費者例項)
2、首先,將分割槽按數字順序排行序,消費者按消費者名稱的字典序排好序
3、然後,用分割槽總數除以消費者總數。如果能夠除盡,則皆大歡喜,平均分配;若除不盡,則位於排序前面的消費者將多負責乙個分割槽
例如,假設有兩個消費者c0和c1,兩個主題t0和t1,並且每個主題有3個分割槽,分割槽的情況是這樣的:t0p0,t0p1,t0p2,t1p0,t1p1,t1p2
那麼,基於以上資訊,最終消費者分配分割槽的情況是這樣的:
kafka 0.9 版本之前,consumer 預設將 offset 儲存在 zookeeper 中,從 0.9 版本開始,consumer 預設將 offset 儲存在 kafka 乙個內建的 topic 中,該 topic 為__consumer_offsets。
為了使我們能夠專注於自己的業務邏輯,kafka 提供了自動提交 offset 的功能。
自動提交 offset 的相關引數:
enable.auto.commit:是否開啟自動提交 offset 功能
auto.commit.interval.ms:自動提交 offset 的時間間隔
雖然自動提交 offset 十分簡介便利,但由於其是基於時間提交的,開發人員難以把握offset 提交的時機。因此 kafka 還提供了手動提交 offset 的 api。
手動提交 offset 的方法有兩種:分別是 commitsync(同步提交)和 commitasync(非同步提交)。兩者的相同點是,都會將本次 poll 的一批資料最高的偏移量提交;不同點是,commitsync 阻塞當前執行緒,一直到提交成功,並且會自動失敗重試(由不可控因素導致,也會出現提交失敗);而 commitasync 則沒有失敗重試機制,故有可能提交失敗。
kafka 主動消費 Kafka基礎原理
1 訊息中介軟體作用 解耦 非同步 削峰 2 訊息中介軟體通訊模式 如上圖所示,點對點模式通常是基於拉取或者輪詢的訊息傳送模型,這個模型的特點是傳送到佇列的訊息被乙個且只有乙個消費者進行處理。生產者將訊息放入訊息佇列後,由消費者主動的去拉取訊息進行消費。點對點模型的的優點是消費者拉取訊息的頻率可以由...
kafka 主動消費 Kafka消費者的使用和原理
publicstaticvoidmain string args finally 前兩步和生產者類似,配置引數然後根據引數建立例項,區別在於消費者使用的是反序列化器,以及多了乙個必填引數 group.id,用於指定消費者所屬的消費組。關於消費組的概念在 kafka中的基本概念 中介紹過了,消費組使得...
kafka重複消費 漏消費情況
kafka重複消費的情況 資料沒有丟,只是資料重複消費了。丟不丟資料指的是producer到broker的過程,以及broker儲存資料的過程。重複消費 漏消費指的是消費結果,所以我們記憶這些過程的時候,或者定位問題的時候,首先應該明確,是丟資料了還是重複消費了。重複消費 ack 1 produce...