傳送端
import csv
import time
from kafka import kafkaproducer
from kafka import kafkaconsumer
import json
# 例項化乙個kafkaproducer示例,用於向kafka投遞訊息
producer = kafkaproducer(value_serializer=
lambda v: json.dumps(v)
.encode(
'utf-8'
),bootstrap_servers=
'192.168.130.28:9092'
)for x in
range(0
,1000):
time.sleep(
0.1)
# 每隔0.1秒傳送一行資料
# 傳送資料,topic為'test_data'
data =
print
(data)
producer.send(
'test_data'
, data)
我們再做乙個接收端
from kafka import kafkaconsumer
consumer = kafkaconsumer(
'test_data'
, bootstrap_servers=
['192.168.130.29:9092'])
for msg in consumer:
print
(msg.value)
recv =
"%s:%d:%d: key=%s value=%s"
%(msg.topic, msg.partition, msg.offset, msg.key, msg.value)
print
(recv)
很簡單的例子
錯誤syntaxerror: invalid syntax的解決方法總結
python -m pip install kafka-python
這裡有個問題就是消費者,如果不配置的話,消費者每次開啟後都會從最新的讀取,導致歷史資料沒辦法讀取出來,我們需要配置一下kafkaconsumer。
auto_offset_reset = earliest,只配置這個,會從kafka初始的資料消費,重複消費之前的資料。
我們需要再配置group_id=『my_group_new』。這樣就可以了,
bootstrap_servers,可以配置集群
consumer = kafkaconsumer(
'filestorage'
, group_id=
'my_group_new'
,auto_offset_reset=
'earliest'
,bootstrap_servers=
['xx:9092'
,'xx:9092'
,'xx:9092'],
)
Kafka 訊息傳送
建立乙個kafkaprodecer物件,傳入上面建立的properties物件 kafkaproducerproducer new kafkaproducer mykafkaprops 使用prodecerrecord string topic,string key,string value 建構函...
kafka訊息傳送模式
在kafka 0.8.2之後,producer不再區分同步 sync 和非同步方式 async 所有的請求以非同步方式傳送,這樣提公升了客戶端效率。producer請求會返回乙個應答物件,包括偏移量或者錯誤信。這種非同步方地批量的傳送訊息到kafka broker節點,因而可以減少server端資源...
Kafka 傳送訊息流程
客戶端的幾個元件 一條訊息首先需要確定要被儲存到那個 partition 對應的雙端佇列上 其次,儲存訊息的雙端佇列是以批的維度儲存的,即 n 條訊息組成一批,一批訊息最多儲存 n 條,超過後則新建乙個組來儲存新訊息 其次,新來的訊息總是從左側寫入,即越靠左側的訊息產生的時間越晚 最後,只有當一批訊...