使用pykafka操作kafka踩的乙個坑

2021-07-31 12:38:29 字數 1126 閱讀 3743

一、關於如何使用pykafka,請看這裡

我想說的主要是pykafka消費訊息時的問題- 消費訊息時我們很多時候希望不要重複消費,對已經消費過的資訊進行捨棄

我查了很多解決方法都是這樣的:

from pykafka import kafkaclient

client = kafkaclient(hosts="localhost:9092")

topic = client.topics['test']

consumer = topic.get_******_consumer(

consumer_group='test1',

auto_commit_enable=true,

auto_commit_interval_ms=1,

consumer_id='test'

)for x in consumer:

if x is

notnone:

print(x.value.decode('utf-8'))

其實是因為kafka在傳輸的時候需要bytes,而不是str,所以在str上加上b標識就可以,如下

from pykafka import kafkaclient

client = kafkaclient(hosts="localhost:9092")

topic = client.topics[b'test']

consumer = topic.get_******_consumer(

consumer_group=b'test1',

auto_commit_enable=true,

auto_commit_interval_ms=1,

consumer_id=b'test'

)for x in consumer:

if x is

notnone:

print(x.value.decode('utf-8'))

這樣就可以實現我們想要的不重複消費訊息了

單台機器使用docker安裝啟動kafka

kafaka執行需要zookeeper,docker拉取映象 docker pull wurstmeister zookeeper docker pull wurstmeister kafka啟動 docker run d name zookeeper p 2181 2181 t wurstmeis...

使用shell讀取文字檔案傳送到kafka

bin sh 引數定義 dt date y m d d 1 days outpath x log txt brokerlist 192.168.1 100 9092,192.168.1.101 9092,192.168.1.102 9092 echo dt outpath brokerlist 查詢...

fso使用操作

fso方法使用說明 set fso server.createobject scripting.filesystemobject fso檔案 顯示檔案列表 set f fso.getfolder folderspec set fc f.files for each f1 in fc s s f1.n...