集群中建立主題
bin/kafka-topics.sh --create --zookeeper 192.168.0.130:2181,192.168.0.131:2181,192.168.0.132:2181 --replication-factor 3 --partitions 3 --topic topictest
集群中檢視主題
bin/kafka-topics.sh --list --zookeeper 192.168.0.130:2181,192.168.0.131:2181,192.168.0.132:2181
生產者
bin/kafka-console-producer.sh --broker-list 192.168.0.130:9092,192.168.0.131:9092,192.168.0.132:9092 --topic topictest
消費者
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic topictest
表示從 latest 位移位置開始消費該主題的所有分割槽訊息,即僅消費正在寫入的訊息。
從開始位置消費
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092,hadoop3:9092 --from-beginning --topic topictest
表示從指定主題中有效的起始位移位置開始消費所有分割槽的訊息。
顯示key消費
bin/kafka-console-consumer.sh --bootstrap-server hadoop1:9092,hadoop2:9092,hadoop3:9092 --property print.key=true --topic topictest
消費出的訊息結果將列印出訊息體的 key 和 value。 kafka刪除新建topic
原理翻譯可參考 刪除kafka的topic,參照 1 刪除kafka儲存目錄 server.properties檔案log.dirs配置,預設為 tmp kafka logs 相關topic目錄 2 kafka 刪除topic的命令是 bin kafka topics delete zookeepe...
kafka集群擴容後的topic分割槽遷移
kafka集群擴容後,新的broker上面不會資料進入這些節點,也就是說,這些節點是空閒的 它只有在建立新的topic時才會參與工作。除非將已有的partition遷移到新的伺服器上面 所以需要將一些topic的分割槽遷移到新的broker上。kafka reassign partitions.sh...
kafka 集群管理 建立乙個topic流程
topiccommand.createtopic def createtopic zkutils zkutils,opts topiccommandoptions else 往目錄裡面寫topic的分割槽的分配方案 writetopicpartitionassignment zkutils,topi...