Kafka訊息重新傳送

2022-09-15 09:51:08 字數 2443 閱讀 5643

kafka訊息重新傳送

1、使用kafka訊息佇列做訊息的發布、訂閱,如果consumer端消費出問題,導致資料並沒有消費,此時不需要擔心,資料並不會立刻丟失,kafka會把資料在伺服器的磁碟上預設儲存7天,或者自己指定有兩種方式:1)指定時間,log.retention.hours=1682)指定大小,log.segment.bytes=1073741824。此時就可以通過重置某個topicoffset來是訊息重新傳送,進行消費

2、檢視topic的offset的範圍

1)使用下面的命令可以檢視topicuserlogbrokerspark:9092offset的最小值:

#./kafka-run-class.sh kafka.tools.getoffsetshell --broker-list spark:9092 -topic userlog --time -2

2)offset的最大值:

#./kafka-run-class.sh kafka.tools.getoffsetshell --broker-list spark:9092 -topic userlog --time -1

3、設定consumer group的offset

1)啟動zookeeper,如果使用的是kafka內建的zookeeper,直接啟動bin目錄下的zookeeper-shell.sh,進行登入:

#./zookeeper-shell.sh  localhost:2181

通過如下命令,來重置offset值,比如topic:userlog,group:userlogs,partition:0 ,offset:2181

set /consumers/userlogs/offsets/userlog/0 1288

如果有多個partition都執行上輸的命令,並將0換為相對應的分割槽號就可以了。

###如果建立分割槽的時候設定了zk的根目錄,如localhost:2181/kafka

則重置的命令為:

set /kafka/consumers/userlogs/offsets/userlog/0 1288

2)如果使用單獨安裝的zookeeper,直接使用bin目錄下的zkcli.sh登入後,執行上述命令即可。

4、設定完成後需要重啟相關的服務,就可以從設定offset的地方開始消費。

重啟服務:consumer服務。

Kafka 訊息傳送

建立乙個kafkaprodecer物件,傳入上面建立的properties物件 kafkaproducerproducer new kafkaproducer mykafkaprops 使用prodecerrecord string topic,string key,string value 建構函...

使KafKa每次讀取訊息到最新傳送訊息的解決方案

情景是使kafka每次讀取訊息到最新傳送訊息,查了很多資料,對kafka的消費組和偏移量也有些研究,但本地與集群,不同版本都有不少不同之處。即使目前解決了該問題,仍有不少坑待填 之前想在這邊放下關於消費組和偏移量的東西,但比較多比較雜,就開了乙個新坑 考慮直接去掉偏移量,但清空偏移量想要reset乙...

kafka訊息傳送模式

在kafka 0.8.2之後,producer不再區分同步 sync 和非同步方式 async 所有的請求以非同步方式傳送,這樣提公升了客戶端效率。producer請求會返回乙個應答物件,包括偏移量或者錯誤信。這種非同步方地批量的傳送訊息到kafka broker節點,因而可以減少server端資源...