python有成熟的kafka python client
github以及官文都有示例,這邊做個簡單&快速的總結:
1. 首先需要更新pip
pip install --upgrade pip -i
pip install kafka-python -i
python**示例
**無誤(非常推薦)
(consumer.py很方便使用,包含多個維度。producer.py可根據需求自行調整)
2. 簡單示例
producer持續輸入一列資料producer.py——自動輸入1~1000
import pickle
import
time
from kafka import kafkaproducer
producer = kafkaproducer(bootstrap_servers=
["bootstrap_servers:port","bootstrap_servers:port", "bootstrap_servers:port"
], key_serializer=lambda k: pickle.dumps(k),
value_serializer=lambda v: pickle.dumps(v))
start_time = time.time(
)for i in range(0, 1000):
print(
'------{}---------'.format(i))
future = producer.send(
'test_topic', key=
'num', value=i, partition=0)
producer.flush(
)producer.close(
)end_time = time.time(
)time_counts = end_time - start_time
print(time_counts)
簡單的producer
# -*- coding: utf-8 -*-
from kafka import kafkaproducer
producer = kafkaproducer(bootstrap_servers=
['10.0.18.75:6667','10.0.18.31:6667','10.0.18.228:6667'])
for i in range(3):
msg =
"msg %d" % i
print msg
producer.send(
'test_topic', msg)
producer.close(
)
其中關於auto_offset_reset=『earliest』,從而實現區分group consumer從頭開始消費
例如:
#!/usr/bin/env python
# encoding: utf-8
import socket
from kafka import kafkaconsumer
from kafka.errors import kafkaerror
import setting
conf = setting.kafka_setting
consumer = kafkaconsumer(auto_offset_reset=
'earliest',bootstrap_servers=conf[
'bootstrap_servers'
], group_id=conf[
'consumer_id'
], api_version =
(0,10,2),
session_timeout_ms=25000,
max_poll_records=100,
fetch_max_bytes=1 * 1024 * 1024)
print 'consumer start to consuming...'
consumer.subscribe((conf[
'topic_name'
], ))
for message in consumer:
print message.topic, message.offset , message.value
Mbed OS CAN匯流排收發訊息
can controller area network 是博世公司發明的。是乙個多主訊息廣播系統,最高速率可達1mbps,和傳統的usb,乙太網介面不同。can 不能點對點傳送大資料塊。只能以廣播方式傳送短訊息 8個位元組 和ethernet類似,can採取 csma cd匯流排通訊方式。can已經...
socket實現UDP收發訊息
import socket while true 建立udp套接字 udp socket socket.socket socket.af inet,socket.sock dgram 設定接收方的位址和埠 根據具體情況更改 dest addr 255.255.255.255 8080 從鍵盤輸入資料...
socket收發訊息底層原理
服務端和客戶端想要通訊,底層需要internet物理連線,網絡卡配備有ip位址和mac位址,網絡卡收發的訊息是位元組流。服務端程式和客戶端程式工作中應用層,服務端程式要想發包,必須一層一層往下走,走到網絡卡那一層,將資料轉化成二進位制才能送到客戶端 客戶端網絡卡收到包,一層一層往上送,然後客戶端程式...