#-*- coding: utf-8 -*-
'''''
使用kafka-python 1.3.3模組
# pip install kafka==1.3.5
# pip install kafka-python==1.3.5
'''import
sysimport
time
import
json
from kafka import
kafkaproducer
from kafka import
kafkaconsumer
from kafka.errors import
kafkaerror
kafaka_host = "
101.236.51.235
"kafaka_port = 9092kafaka_topic = "
test
"class
kafka_producer():
'''''
生產模組:根據不同的key,區分訊息
'''def
__init__
(self, kafkahost,kafkaport, kafkatopic, key):
self.kafkahost =kafkahost
self.kafkaport =kafkaport
self.kafkatopic =kafkatopic
self.key =key
print("
producer:h,p,t,k
",kafkahost,kafkaport,kafkatopic,key)
bootstrap_servers = ':'
.format(
kafka_host=self.kafkahost,
kafka_port=self.kafkaport
)print("
boot svr:
",bootstrap_servers)
self.producer = kafkaproducer(bootstrap_servers =bootstrap_servers
)defsendjsondata(self, params):
try:
parmas_message = json.dumps(params,ensure_ascii=false)
producer =self.producer
(parmas_message)
v = parmas_message.encode('
utf-8')
k = key.encode('
utf-8')
print("
send msg:(k,v)
",k,v)
producer.send(self.kafkatopic, key=k, value=v)
producer.flush()
except
kafkaerror as e:
(e)class
kafka_consumer():
'''''
消費模組: 通過不同groupid消費topic裡面的訊息
'''def
__init__
(self, kafkahost, kafkaport, kafkatopic, groupid):
self.kafkahost =kafkahost
self.kafkaport =kafkaport
self.kafkatopic =kafkatopic
self.groupid =groupid
self.key =key
self.consumer = kafkaconsumer(self.kafkatopic, group_id =self.groupid,
bootstrap_servers = ':'
.format(
kafka_host=self.kafkahost,
kafka_port=self.kafkaport )
)defconsume_data(self):
try:
for message in
self.consumer:
yield
message
except
keyboardinterrupt as e:
(e)def
main(xtype, group, key):
'''''
測試consumer和producer
'''if xtype == "p"
:
#生產模組
producer =kafka_producer(kafaka_host, kafaka_port, kafaka_topic, key)
print ("
**********=> producer:
", producer)
for _id in range(100):
params = '
' %str(_id)
params=[,]
producer.sendjsondata(params)
time.sleep(1)
if xtype == 'c'
:
#消費模組
consumer =kafka_consumer(kafaka_host, kafaka_port, kafaka_topic, group)
print ("
**********=> consumer:
", consumer)
message =consumer.consume_data()
for msg in
message:
print ('
msg---------------->k,v
', msg.key,msg.value)
print ('
offset---------------->
', msg.offset)
if__name__ == '
__main__':
xtype = sys.argv[1]
group = sys.argv[2]
key = sys.argv[3]
main(xtype, group, key)
使用方式
生產訊息
python testkafka.py p g k
消費訊息
python testkafka.py c g k
使用python連線kafka
kafka是高吞吐的訊息佇列系統,輕鬆支援每秒百萬級的寫入請求,這種特性也使得kafka在日誌處理等海量資料場景廣泛應用。kafka依賴於zookeeper執行,zookeeper充當了協調和管理kafka集群的任務,並且儲存一些meta資訊。此處,因作者能力有限,不詳細討論kafka與zookee...
python3 連線kafka生產測試資料
log pip3 install kafka pip3 install kafka python 1.通過指令碼實現讓kafka生產測試資料,測試下游業務服務效能 2.可以增加執行緒池,讓多執行緒併發執行,效果更好 usr bin env python encoding utf 8 author y...
Kafka生產過程
1.寫入方式 寫磁碟效率比隨機寫記憶體要高,保障kafka吞吐率 2.分割槽 partition kafka集群有多個訊息 伺服器 broker server 組成,發布到kafka集群的每條訊息都有乙個類別,用主題 topic 來表示。通常,不同應用產生不同型別的資料,可以設定不同的主題。乙個主題...