kafka提供了兩套consumer api:高階consumer api和低階consumer api
1、高階api
1)高階api優點
高階api 寫起來簡單
不需要自行去管理offset,系統通過zookeeper自行管理。
不需要管理分割槽,副本等情況,系統自動管理。
消費者斷線會自動根據上一次記錄在zookeeper中的offset去接著獲取資料(預設設定1分鐘更新一下zookeeper中存的offset)
可以使用group來區分對同乙個topic 的不同程式訪問,分離開來(不同的group記錄不同的offset,這樣不同程式讀取同乙個topic才不會因為offset互相影響)
2)高階api缺點
不能自行控制offset(對於某些特殊需求來說)
不能細化控制如分割槽、副本、zk等
2、低階api
1)低階 api 優點
能夠讓開發者自己控制offset,想從**讀取就從**讀取。
自行控制連線分割槽,對分割槽自定義進行負載均衡
對zookeeper的依賴性降低(如:offset不一定非要靠zk儲存,自行儲存offset即可,比如存在檔案或者記憶體中)
2)低階api缺點
太過複雜,需要自行控制offset,連線哪個分割槽,找到分割槽leader 等。
3、消費者組
消費者是以consumer group消費者組的方式工作,由乙個或者多個消費者組成乙個組,共同消費乙個topic。每個分割槽在同一時間只能由group中的乙個消費者讀取,但是多個group可以同時消費這個partition。在圖中,有乙個由三個消費者組成的group,有乙個消費者讀取主題中的兩個分割槽,另外兩個分別讀取乙個分割槽。某個消費者讀取某個分割槽,也可以叫做某個消費者是某個分割槽的擁有者。在這種情況下,消費者可以通過水平擴充套件的方式同時讀取大量的訊息。另外,如果乙個消費者失敗了,那麼其他的group成員會自動負載均衡讀取之前失敗的消費者讀取的分割槽。
4、消費方式
consumer採用pull(拉)模式從broker中讀取資料。
push(推)模式很難適應消費速率不同的消費者,因為訊息傳送速率是由broker決定的。它的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成consumer來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而pull模式則可以根據consumer的消費能力以適當的速率消費訊息。
對於kafka而言,pull模式更合適,它可簡化broker的設計,consumer可自主控制消費訊息的速率,同時consumer可以自己控制消費方式——即可批量消費也可逐條消費,同時還能選擇不同的提交方式從而實現不同的傳輸語義。
5、消費者組案例
1)測試同乙個消費者組中的消費者,同一時刻只能有乙個消費者消費
2)在node2、node3上修改/opt/module/kafka/config/consumer.properties配置檔案中的group.id屬性為任意組名
[victor@node2 config]$ vim consumer.properties
group.id=victor
[victor@node3 config]$ vim consumer.properties
group.id=victor
3)在node2、node3上分別啟動消費者
[victor@node1 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper node1:2181 \
--topic first \
--consumer.config config/consumer.properties
[victor@node2 kafka]$ bin/kafka-console-consumer.sh \
--zookeeper node1:2181 \
--topic first \
--consumer.config config/consumer.properties
4)在node3上啟動生產者
[victor@node3 kafka]$ bin/kafka-console-producer.sh \
--broker-list node1:9092 \
--topic first
>hello world
>hello
>world
黑猴子的家 Hadoop Checkpoint機制
fsimage和edit log合併的過程如下圖所示 其實這個合併過程是乙個很耗i o與cpu的操作,並且在進行合併的過程中肯定也會有其他應用繼續訪問和修改hdfs檔案。所以,這個過程一般不是在單一的namenode節點上進行從。如果hdfs沒有做ha的話,checkpoint由secondname...
黑猴子的家 FileInputFormat切片機制
1 job提交流程原始碼詳解 waitforcompletion submit 1 建立連線 connect 1 建立提交job的 new cluster getconfiguration 2 判斷是本地yarn還是遠端 initialize jobtrackaddr,conf 2 提交job su...
黑猴子的家 Zookeeper Java API
1 code github 2 環境準備 1 建立乙個工程 2 解壓zookeeper 3.4.10.tar.gz檔案 3 拷貝zookeeper 3.4.10.jar jline 0.9.94.jar log4j 1.2.16.jar netty 3.10.5.final.jar slf4j ap...