bin 目錄下包含了服務的啟動指令碼
啟動 zookeeper
./bin/zookeeper-server-start.sh config/zookeeper.properties啟動 kafka server
./bin/kafka-server-start.sh config/server.properties建立乙個主題
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cctv1(主題名)啟動生產者
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cctv1啟動消費者
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1如果要消費前面的資料,在啟動時新增 --from-beginning 引數
./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1 --from-beginning檢視 kafka 程序命令:
安裝 sarama 包
go get github.com/shopify/saramademo:建立生產者,往單節點的 kafka 上傳送資料
)//訊息寫入kafka
func main() , config)
if err != nil
defer client.close()
for i:=0; i<5; i++
msg.topic = "cctv1"
msg.value = sarama.stringencoder("this is a good test,hello kai")
//傳送訊息
pid, offset, err := client.sendmessage(msg)
if err != nil
fmt.printf("pid:%v offset:%v\n", pid, offset)
time.sleep(time.second)}}
執行結果:(上述**執行一下,就會往 kafka 中 cctv1 的主題發布 5 條訊息)
然後通過 kafka 自帶的消費者終端檢視傳送的資料
./bin/kafka-console-consumer.sh --bootstrap-server 10.10.4.35:9092 --topic cctv1 --from-beginningdemo:建立消費者,從單節點的 kafka 中消費資料
//獲取 kafka 主題
partitions, err := consumer.partitions("cctv1")
if err != nil
for _, p := range partitions
wg.add(1)
go func()
wg.done()
}()}
wg.wait()
}consumer 版本2
上述**執行起來,就會開啟 kafka 消費者持續監聽,然後通過 kafka 自帶的生產者終端傳送 2 條測試資料,消費結果如下:
Kafka消費者生產者例項
它允許發布和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。它可以容錯的方式儲存記錄流。它可以處理記錄發生時的流。由於主要介紹如何使用kafka快速構建生產者消費者例項,所以不會涉及kafka內部的原理。乙個基於kafka的生產者消費者過程通常是這樣的 來自官網 cd kafka 2.11 0.11....
Kafka消費者生產者例項
2017年07月30日18 22 56 rhwayfunn 閱讀數 13818標籤 kafka 更多 個人分類 分布式系統 為了更為直觀展示卡夫卡的訊息生產消費的過程,我會從基於控制台和基於應用兩個方面介紹使用例項.kafka是乙個分布式流處理平台,具體來說有三層含義 它允許發布和訂閱記錄流,類似於...
Kafka命令 生產者 消費者
檢視當前伺服器中的所有 topic bin kafka topics.sh list zookeeper zk01 2181 建立 topic bin kafka topics.sh create zookeeper zk01 2181 replication factor 1 partitions...