一、topic
1.檢視全部topic
kafka-topics.sh --zookeeper localhost:2181 --list
2.建立topic,指定分割槽數目和備份因子
kafka-topics.sh --zookeeper localhost:2181 --create --topic topic_name --partitions 3 --replication-factor 1
3.描述當前topic
kafka-topics.sh --zookeeper localhost:2181 --describe --topic topic_name
4.指定topic進行刪除
kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
二、生產與消費
1.指定topic,開啟乙個生產視窗
kafka-console-producer.sh --broker-list localhost:9092 --topic topic_name
2.指定topic,開啟乙個消費視窗,--from-beginning表從頭消費,沒有beginning則監聽實時消費
kafka-console-consumer.sh --zookeeper localhost:2181 --topic topic_name --from-beginning
3.新的消費命令,zookeeper換成bootstrap-server,也可以給出group的名稱
kafka-console-consumer.sh --bootstrap-server borker_host1:port1,borker_host2:port2 --topic topic_name --group group_name --from-beginning --max-messages 100
三、消費組
1.檢視所有的消費組
kafka-consumer-groups.sh --bootstrap-server borker_host:9092 --list
2.檢視當前consumer_group下的消費情況
kafka-consumer-groups.sh --bootstrap-server borker_host:9092 --describe --group kafka_consumer_group_v1
3.重置offset的工具
a. kafka-consumer-groups.sh --bootstrap-server borker_host1:port1 --group group_name --reset-offsets --execute --to-offset new_offset --topic topic_name
b. kafka-consumer-groups.sh --bootstrap-server borker_host1:port1 --group group_name --reset-offsets --execute --to-earliest/--to-latest --topic topic_name
第一條,指定group和topic的offset修改到new_offset的位置,下次消費啟動後,消費將從指定的offset處消費。所有的分割槽都將使用這個值。
第二條,指定group和topic的offset修改到earliest或者latest位置,使得消費者從頭或者從尾部消費。
例如,消費者將test_topic的資訊全部都消費了,假設此時我想將每個分割槽的offset都拉到200去,從200往後重新消費,則通過命令:
c. kafka-consumer-groups.sh --bootstrap-server localhost:6067 --group yan --reset-offsets --execute --to-offset 200 --topic shxx_qq_info_old --max-messages 10
對設定的offset進行檢查
d. kafka-consumer-groups.sh --bootstrap-server localhost:6067 --group yan --describe
4.刪除group
kafka-consumer-groups.sh --bootstrap-server localhost:6067 --group test-1 --delete
四、kafka版本檢視方法
kafka沒有像別的軟體一樣有,kafka -verison的命令,但是你可以檢視kafka/libs 下的庫檔案,知道kafka的版本號:
命令一find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
前面2.9.2是scala 的版本,後面0.8.1.1就是你kafka的版本。
命令二find / -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
kafka基本操作例項
1 啟動命令 任意節點建立 nohup kafka server start.sh 2 建立topic kafka topics.sh create zookeeper hadoop1 2181,hadoop2 2181,hadoop3 2181 replication factor 3 parti...
Kafka 基本概述 操作
點對點 一對一,消費者消費後立刻清除訊息 發布 訂閱模式 一對多,消費者消費資料之後不會清除訊息 producer 訊息生產者,想kafka broker傳送訊息的客戶端 consumer 訊息消費者,想kafka broker取訊息的客戶端 consumer group cg 消費者組,多個con...
Kafka 基本介紹
源自小夥伴的分享,我本身也不會使用這個東西,但是通過她的介紹,對kafka有乙個簡單的了解,基於此做乙個整理。1.什麼是kafka?kafka是乙個分布式 分割槽的 多副本的 多訂閱者,基於zookeeper協調的分布式日誌系統 也可以當做mq系統 常見可以用於web nginx日誌 訪問日誌,訊息...