Go 關於 kafka 的生產者 消費者例項

2022-08-17 23:57:13 字數 2983 閱讀 2978

bin 目錄下包含了服務的啟動指令碼

啟動 zookeeper

./bin/zookeeper-server-start.sh config/zookeeper.properties
啟動 kafka server

./bin/kafka-server-start.sh config/server.properties
建立乙個主題

./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cctv1(主題名)
啟動生產者

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic cctv1
啟動消費者

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1
如果要消費前面的資料,在啟動時新增 --from-beginning 引數

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cctv1 --from-beginning
檢視 kafka 程序命令:

安裝 sarama 包

go get github.com/shopify/sarama
demo:建立生產者,往單節點的 kafka 上傳送資料

)//訊息寫入kafka

func main() , config)

if err != nil

defer client.close()

for i:=0; i<5; i++

msg.topic = "cctv1"

msg.value = sarama.stringencoder("this is a good test,hello kai")

//傳送訊息

pid, offset, err := client.sendmessage(msg)

if err != nil

fmt.printf("pid:%v offset:%v\n", pid, offset)

time.sleep(time.second)}}

執行結果:(上述**執行一下,就會往 kafka 中 cctv1 的主題發布 5 條訊息)

然後通過 kafka 自帶的消費者終端檢視傳送的資料

./bin/kafka-console-consumer.sh --bootstrap-server 10.10.4.35:9092 --topic cctv1 --from-beginning
demo:建立消費者,從單節點的 kafka 中消費資料

//獲取 kafka 主題

partitions, err := consumer.partitions("cctv1")

if err != nil

for _, p := range partitions

wg.add(1)

go func()

wg.done()

}()}

wg.wait()

}consumer 版本2

上述**執行起來,就會開啟 kafka 消費者持續監聽,然後通過 kafka 自帶的生產者終端傳送 2 條測試資料,消費結果如下:

Kafka消費者生產者例項

它允許發布和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。它可以容錯的方式儲存記錄流。它可以處理記錄發生時的流。由於主要介紹如何使用kafka快速構建生產者消費者例項,所以不會涉及kafka內部的原理。乙個基於kafka的生產者消費者過程通常是這樣的 來自官網 cd kafka 2.11 0.11....

Kafka消費者生產者例項

2017年07月30日18 22 56 rhwayfunn 閱讀數 13818標籤 kafka 更多 個人分類 分布式系統 為了更為直觀展示卡夫卡的訊息生產消費的過程,我會從基於控制台和基於應用兩個方面介紹使用例項.kafka是乙個分布式流處理平台,具體來說有三層含義 它允許發布和訂閱記錄流,類似於...

Kafka命令 生產者 消費者

檢視當前伺服器中的所有 topic bin kafka topics.sh list zookeeper zk01 2181 建立 topic bin kafka topics.sh create zookeeper zk01 2181 replication factor 1 partitions...