kafka是乙個高吞吐量的分布式的發布訂閱系統。
一、生產者大概執行流程
1.一條訊息過來首先會被封裝成乙個producerrecord物件
2.接下來要對這個物件進行序列化,因為kafka的訊息要從客戶端傳到伺服器端,涉及到網路傳輸,所以需要實現序列化。kafka提供了預設的序列化方式,也支援自定義序列化。
3.訊息序列化完了之後,對訊息要進行分割槽,分割槽的時候需要獲取集群的元資料。分割槽的這個過程很關鍵,因為這個時候就決定了我們這條訊息會被傳送到kafka服務端的哪個主題哪個分割槽了。
4.分好區的訊息不是直接被傳送到服務端,而是放入了生產者的乙個快取裡。在這個快取裡,多條訊息會被封裝成乙個批次,預設的乙個批次大小為16k。
5.sender執行緒啟動後會從快取裡去獲取可以傳送的批次。
6.sender執行緒把乙個乙個批次傳送到服務端。
1)producer:訊息生產者,發布訊息到kafka集群的終端或服務
2)broker:kafaka集群中包含的伺服器
3)topic:每條發布到kafka集群的訊息屬於的類別,即kafka是面向topic的
4)partition:每個topic包含乙個或者多個partition,kafka的分配單位是partition
5)comsumer:從kafka集群中消費訊息的終端或服務
6)comsumer group:每個consumer都屬於乙個consumer group,每條訊息只能被consumer group中的乙個comsumer group消費
7)replica:partition的副本,保障partition的高可用
8)leader:replica中的乙個角色,producer和consumer只跟leader互動
9)follower:replica中的乙個角色,從leader中複製資料
10)controller:kafka集群中的乙個伺服器,用來進行leader election以及各種failover
11)zookeeper:kafka通過zookeeper來儲存集群的meta資訊
對於傳統的message queue而言,一般會刪除已經被消費的訊息,而kafka集群會保留所有的訊息,無論其被消費與否。當然,因為磁碟的限制,不可能永久保留所有的資料,因此kafka提供兩種策略去刪除舊資料。一是基於時間,二是基於partition檔案大小。
kafka會為每乙個consumer group保留一些metadata資訊——當前消費的訊息的position,即offset。這個offect由consumer控制。正常情況下consumer會在消費完一條訊息後線性增加這個offset。當然,consumer也可將offset設成乙個較小的值,重新消費一些訊息。因為offset是無狀態的,它不需要標記哪些訊息被消費過,不需要通過broker去保證同乙個consumer group只有乙個consumer能消費某一條訊息,因此也就不需要鎖機制,這也為kafka的高吞吐率提供了有力保障。
c++使用librdkafka庫實現kafka的消費例項
1) 建立kafka配置
rdkafka::conf *conf = nullptr;
conf = rdkafka::conf::create(rdkafka::conf::conf_global);
2) 設定kafka各項引數
conf->set(「bootstrap.servers」, brokers_, errstr); //設定broker list
conf->set(「group.id」, groupid_, errstr); //設定consumer group
conf->set(「max.partition.fetch.bytes」, strfetch_num, errstr); //每次從單個分割槽中拉取訊息的最大尺寸
3) 建立kafka topic配置
rdkafka::conf *tconf = nullptr;
tconf = rdkafka::conf::create(rdkafka::conf::conf_topic);
4) 設定kafka topic引數
if(tconf->set(「auto.offset.reset」, 「smallest」, errstr))
5) 建立kafka consumer例項
kafka_consumer_=rdkafka::consumer::create(conf, errstr);
6) 建立kafka topic
rdkafka::topic::create(kafka_consumer_, topics_, tconf, errstr);
7) 啟動kafka consumer例項
rdkafka::errorcode resp = kafka_consumer_->start(topic_, partition_, offset_);
8) 消費kafka
kafka_consumer_->consume(topic_, partition_, timeout_ms);
9) 阻塞等待訊息
kafka_consumer_->poll(0);
10)停止消費
kafka_consumer_->stop(topic_, partition_);
11)銷毀consumer例項
rdkafka::wait_destroyed(5000);
乙個典型的kafka集群中包含若干個producer(可以是web前端產生的page view,或者是伺服器日誌,系統cpu、memory等),若干broker(kafka支援水平擴充套件,一般broker數量越多,集群吞吐率越高),若干consumer group,以及乙個zookeeper集群。kafka通過zookeeper管理集群配置,選舉leader,以及在consumer group發生變化時進行rebalance。producer使用push模式將訊息發布到broker,consumer使用pull模式從broker訂閱並消費訊息。
kafka相關知識點總結
1 kafka是什麼 類jms訊息佇列,結合jms中的兩種模式 點對點模型,發布者 訂閱者模型 可以有多個消費者主動拉取資料,在jms中只有點對點模式才有消費者主動拉取資料。kafka是乙個生產 消費模型。producer 生產者,只負責資料生產,生產者的 可以整合到任務系統中。資料的分發策略由pr...
kafka相關命令
1.群起指令碼 bin bash case 1 in start stop esac 2.檢視主題 bin kafka topics.sh list zookeeper k8smaster 2181 3.建立主題 kafka topics.sh create zookeeper k8smaster ...
kafka知識總結
1 kafka是什麼 類jms訊息佇列,結合jms中的兩種模式,可以有多個消費者主動拉取資料,在jms中只有點對點模式才有消費者主動拉取資料。kafka是乙個生產 消費模型。producer 生產者,只負責資料生產,生產者的 可以整合到任務系統中。資料的分發策略由producer決定,預設是defa...