kafka python client收發訊息示例

2021-10-12 06:06:37 字數 2414 閱讀 4916

python有成熟的kafka python client

github以及官文都有示例,這邊做個簡單&快速的總結:

1. 首先需要更新pip

pip install --upgrade pip -i 

pip install kafka-python -i

python**示例

**無誤(非常推薦)

(consumer.py很方便使用,包含多個維度。producer.py可根據需求自行調整)

2. 簡單示例

producer持續輸入一列資料producer.py——自動輸入1~1000

import pickle

import

time

from kafka import kafkaproducer

producer = kafkaproducer(bootstrap_servers=

["bootstrap_servers:port","bootstrap_servers:port", "bootstrap_servers:port"

], key_serializer=lambda k: pickle.dumps(k),

value_serializer=lambda v: pickle.dumps(v))

start_time = time.time(

)for i in range(0, 1000):

print(

'------{}---------'.format(i))

future = producer.send(

'test_topic', key=

'num', value=i, partition=0)

producer.flush(

)producer.close(

)end_time = time.time(

)time_counts = end_time - start_time

print(time_counts)

簡單的producer

# -*- coding: utf-8 -*-

from kafka import kafkaproducer

producer = kafkaproducer(bootstrap_servers=

['10.0.18.75:6667','10.0.18.31:6667','10.0.18.228:6667'])

for i in range(3):

msg =

"msg %d" % i

print msg

producer.send(

'test_topic', msg)

producer.close(

)

其中關於auto_offset_reset=『earliest』,從而實現區分group consumer從頭開始消費

例如:

#!/usr/bin/env python

# encoding: utf-8

import socket

from kafka import kafkaconsumer

from kafka.errors import kafkaerror

import setting

conf = setting.kafka_setting

consumer = kafkaconsumer(auto_offset_reset=

'earliest',bootstrap_servers=conf[

'bootstrap_servers'

], group_id=conf[

'consumer_id'

], api_version =

(0,10,2),

session_timeout_ms=25000,

max_poll_records=100,

fetch_max_bytes=1 * 1024 * 1024)

print 'consumer start to consuming...'

consumer.subscribe((conf[

'topic_name'

], ))

for message in consumer:

print message.topic, message.offset , message.value

Mbed OS CAN匯流排收發訊息

can controller area network 是博世公司發明的。是乙個多主訊息廣播系統,最高速率可達1mbps,和傳統的usb,乙太網介面不同。can 不能點對點傳送大資料塊。只能以廣播方式傳送短訊息 8個位元組 和ethernet類似,can採取 csma cd匯流排通訊方式。can已經...

socket實現UDP收發訊息

import socket while true 建立udp套接字 udp socket socket.socket socket.af inet,socket.sock dgram 設定接收方的位址和埠 根據具體情況更改 dest addr 255.255.255.255 8080 從鍵盤輸入資料...

socket收發訊息底層原理

服務端和客戶端想要通訊,底層需要internet物理連線,網絡卡配備有ip位址和mac位址,網絡卡收發的訊息是位元組流。服務端程式和客戶端程式工作中應用層,服務端程式要想發包,必須一層一層往下走,走到網絡卡那一層,將資料轉化成二進位制才能送到客戶端 客戶端網絡卡收到包,一層一層往上送,然後客戶端程式...