一、關於如何使用pykafka,請看這裡我想說的主要是pykafka消費訊息時的問題- 消費訊息時我們很多時候希望不要重複消費,對已經消費過的資訊進行捨棄
我查了很多解決方法都是這樣的:
from pykafka import kafkaclient
client = kafkaclient(hosts="localhost:9092")
topic = client.topics['test']
consumer = topic.get_******_consumer(
consumer_group='test1',
auto_commit_enable=true,
auto_commit_interval_ms=1,
consumer_id='test'
)for x in consumer:
if x is
notnone:
print(x.value.decode('utf-8'))
其實是因為kafka在傳輸的時候需要bytes,而不是str,所以在str上加上b標識就可以,如下
from pykafka import kafkaclient
client = kafkaclient(hosts="localhost:9092")
topic = client.topics[b'test']
consumer = topic.get_******_consumer(
consumer_group=b'test1',
auto_commit_enable=true,
auto_commit_interval_ms=1,
consumer_id=b'test'
)for x in consumer:
if x is
notnone:
print(x.value.decode('utf-8'))
這樣就可以實現我們想要的不重複消費訊息了 單台機器使用docker安裝啟動kafka
kafaka執行需要zookeeper,docker拉取映象 docker pull wurstmeister zookeeper docker pull wurstmeister kafka啟動 docker run d name zookeeper p 2181 2181 t wurstmeis...
使用shell讀取文字檔案傳送到kafka
bin sh 引數定義 dt date y m d d 1 days outpath x log txt brokerlist 192.168.1 100 9092,192.168.1.101 9092,192.168.1.102 9092 echo dt outpath brokerlist 查詢...
fso使用操作
fso方法使用說明 set fso server.createobject scripting.filesystemobject fso檔案 顯示檔案列表 set f fso.getfolder folderspec set fc f.files for each f1 in fc s s f1.n...