pull wurstmeister/zookeeper
sudo docker pull wurstmeister/zookeeper
pull wurstmeister/kafka
sudo docker pull wurstmeister/kafka
啟動zookeeper
sudo docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
啟動kafka
sudo docker run -d -t --name kafka -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=192.168.18.166:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.18.166:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
server.properties 修改num.partitions=2 ,表示2個分割槽
num.partitions=2
重啟kafka container
producer
import json

from kafka import KafkaProducer


def sendmsg(topic, msg_dict):
    """Send one message to the given Kafka *topic*, JSON-encoded as UTF-8.

    :param topic: Kafka topic name to publish to.
    :param msg_dict: any JSON-serializable value (dict, str, ...).
    """
    producer = KafkaProducer(
        bootstrap_servers=["192.168.18.166:9092"],
        # serialize every value to a UTF-8 JSON byte string
        value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    )
    producer.send(topic, msg_dict)
    # send() is asynchronous: flush before closing so the buffered
    # message is actually delivered instead of being dropped.
    producer.flush()
    producer.close()


if __name__ == '__main__':
    for i in range(10):
        sendmsg("peter.test1", str(str(i) + '11'))
        print("over" + str(str(i) + '10'))
    # NOTE(review): the original also called sendmsg("json", msg_dict) here,
    # but msg_dict is undefined at module level (NameError) — removed.
兩個consumer指定分割槽消費,,如果不指定分割槽,則消費全部訊息
# consumer 1
import json
import logging
import traceback

from kafka import KafkaConsumer, TopicPartition


def main():
    """Consume JSON messages from partition 0 of topic 'peter.test1'.

    Uses a manual partition assignment (consumer.assign) rather than a
    subscription, so no group rebalancing is involved; without assigning
    a partition the consumer would receive all partitions' messages.
    """
    # consumer = KafkaConsumer("peter.test_cluser", group_id="peter_consumer_cluser3", max_poll_records=5, max_poll_interval_ms=600000,
    consumer = KafkaConsumer(
        group_id="peter_consumer_cluser1",
        max_poll_records=5,
        max_poll_interval_ms=600000,
        # enable_auto_commit=False,
        bootstrap_servers=["192.168.18.166:9092"],
        value_deserializer=json.loads,
    )
    print("start consumer", str(consumer))
    # Pin this consumer to topic 'peter.test1', partition 0.
    consumer.assign([TopicPartition('peter.test1', 0)])
    for message in consumer:
        # print(str(message.offset()))
        print("receive label message")
        if message:
            try:
                print("@@@@@ ---> consumer_cluser1 get new message ", str(message.value))
                # consumer.commit()
            except Exception as e:
                logging.error("@@----> exception : ")
                logging.error(e)
                traceback.print_exc()


if __name__ == '__main__':
    main()
# consumer 2
import json
import logging
import traceback

from kafka import KafkaConsumer, TopicPartition


def main():
    """Consume JSON messages from partition 1 of topic 'peter.test1'.

    Companion to consumer 1: each consumer is manually assigned one
    partition, so together they split the topic's two partitions.
    """
    # consumer = KafkaConsumer("peter.test_cluser", group_id="peter_consumer_cluser3", max_poll_records=5, max_poll_interval_ms=600000,
    consumer = KafkaConsumer(
        group_id="peter_consumer_cluser2",
        max_poll_records=5,
        max_poll_interval_ms=600000,
        # enable_auto_commit=False,
        bootstrap_servers=["192.168.18.166:9092"],
        value_deserializer=json.loads,
    )
    print("start consumer", str(consumer))
    # Pin this consumer to topic 'peter.test1', partition 1.
    consumer.assign([TopicPartition('peter.test1', 1)])
    for message in consumer:
        # print(str(message.offset()))
        print("receive label message")
        if message:
            try:
                # fixed: the original copy-pasted log line said "consumer_cluser1"
                print("@@@@@ ---> consumer_cluser2 get new message ", str(message.value))
                # consumer.commit()
            except Exception as e:
                logging.error("@@----> exception : ")
                logging.error(e)
                traceback.print_exc()


if __name__ == '__main__':
    main()
kafka 修改分割槽 kafka分割槽
一 topic下引入partition的作用 topic是邏輯的概念,partition是物理的概念。為了效能考慮,如果topic內的訊息只存於乙個broker,那這個broker會成為瓶頸,無法做到水平擴充套件。kafka通過演算法盡可能的把partition分配到集群的不同伺服器上。partit...
基於docker環境搭建kafka集群(單機版)
如果沒有 zookeeper 映象,則拉去zookeeper映象 docker pull wurstmeister zookeeper拉取 kafka映象docker pull wurstmeister kafka啟動zookeeper映象 docker run d name zookeeper p...
kafka 分割槽數
kafka的分割槽,相當於把乙個topic再細分成了多個通道 對應 多個執行緒 部署的時候盡量做到乙個消費者 執行緒 對應乙個分割槽。如果你的分割槽數是n,那麼最好執行緒數也保持為n。kafkastream 它是consumer的關鍵類,提供了遍歷方法用於consumer程式呼叫實現資料的消費。其底...