一、簡介:
詳見:二、安裝
詳見部落格:
三、按照官網的樣例,先跑乙個應用
1、生產者:
from kafka import kafkaproducer
producer = kafkaproducer(bootstrap_servers=['172.21.10.136:9092']) #此處ip可以是多個['0.0.0.1:9092','0.0.0.2:9092','0.0.0.3:9092' ]
for i in range(3):
msg = "msg%d" % i
producer.send('test', msg)
producer.close()
2、消費者(簡單demo):
from kafka import kafkaconsumer
consumer = kafkaconsumer('test',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
啟動後生產者、消費者可以正常消費。
3、消費者(消費群組)
from kafka import kafkaconsumer
consumer = kafkaconsumer('test',
group_id='my-group',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
啟動多個消費者,只有其中可以可以消費到,滿足要求,消費組可以橫向擴充套件提高處理能力
4、消費者(讀取目前最早可讀的訊息)
from kafka import kafkaconsumer
consumer = kafkaconsumer('test',
auto_offset_reset='earliest',
bootstrap_servers=['172.21.10.136:9092'])
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
auto_offset_reset:重置偏移量,earliest移到最早的可用訊息,latest最新的訊息,預設為latest
原始碼定義:
5、消費者(手動設定偏移量)
from kafka import kafkaconsumer
from kafka.structs import topicpartition
consumer = kafkaconsumer('test',
bootstrap_servers=['172.21.10.136:9092'])
print consumer.partitions_for_topic("test") #獲取test主題的分割槽資訊
print consumer.topics() #獲取主題列表
print consumer.subscription() #獲取當前消費者訂閱的主題
print consumer.assignment() #獲取當前消費者topic、分割槽資訊
print consumer.beginning_offsets(consumer.assignment()) #獲取當前消費者可消費的偏移量
consumer.seek(topicpartition(topic=u'test', partition=0), 5) #重置偏移量,從第5個偏移量消費
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
6、消費者(訂閱多個主題)
from kafka import kafkaconsumer
from kafka.structs import topicpartition
consumer = kafkaconsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0')) #訂閱要消費的主題
print consumer.topics()
print consumer.position(topicpartition(topic=u'test', partition=0)) #獲取當前主題的最新偏移量
for message in consumer:
print ("%s:%d:%d: key=%s value=%s" % (message.topic, message.partition,
message.offset, message.key,
message.value))
7、消費者(手動拉取訊息)
from kafka import kafkaconsumer
import time
consumer = kafkaconsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test','test0'))
while true:
msg = consumer.poll(timeout_ms=5) #從kafka獲取訊息
print msg
time.sleep(1)
8、消費者(訊息掛起與恢復)
from kafka import kafkaconsumer
from kafka.structs import topicpartition
import time
consumer = kafkaconsumer(bootstrap_servers=['172.21.10.136:9092'])
consumer.subscribe(topics=('test'))
consumer.topics()
consumer.pause(topicpartition(topic=u'test', partition=0))
num = 0
while true:
print num
print consumer.paused() #獲取當前掛起的消費者
msg = consumer.poll(timeout_ms=5)
print msg
time.sleep(2)
num = num + 1
if num == 10:
print "resume..."
consumer.resume(topicpartition(topic=u'test', partition=0))
print "resume......"
pause執行後,consumer不能讀取,直到呼叫resume後恢復。
如果對您有幫助,記得給我點贊諾
如果對您有幫助,記得給我點贊諾
Python指令碼消費kafka資料
一 簡介 詳見 二 安裝 詳見部落格 三 按照官網的樣例,先跑乙個應用 1 生產者 from kafka import kafkaproducer producer kafkaproducer bootstrap servers 172.21.10.136 9092 此處ip可以是多個 0.0.0....
kafka消費原理
consumer 採用 pull 拉 模式從 broker 中讀取資料。push 推 模式很難適應消費速率不同的消費者,因為訊息傳送速率是由 broker 決定的。它的目標是盡可能以最快速度傳遞訊息,但是這樣很容易造成 consumer 來不及處理訊息,典型的表現就是拒絕服務以及網路擁塞。而 pul...
kafka 主動消費 Kafka消費者的使用和原理
publicstaticvoidmain string args finally 前兩步和生產者類似,配置引數然後根據引數建立例項,區別在於消費者使用的是反序列化器,以及多了乙個必填引數 group.id,用於指定消費者所屬的消費組。關於消費組的概念在 kafka中的基本概念 中介紹過了,消費組使得...