Kafka Connect is very powerful, but it has limitations: it does not allow fully customized behavior. If you need that, see my other blog post (blog link).
Implementing this in Python is actually quite simple: use a consumer to export the data and a producer to import it, and the throughput is decent as well.
Below is the complete code for reading data from one partition of a topic and writing it to another topic.
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import sys
import time
from kafka import KafkaConsumer, TopicPartition, KafkaProducer


class Kafka_producer(object):
    '''
    Wraps kafka-python's producer to send JSON-serialized messages.
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.producer = KafkaProducer(
            bootstrap_servers=["{}:{}".format(kafkahost, kafkaport)],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        self.kafkatopic = kafkatopic

    def sendjsondata(self, msg):
        try:
            self.producer.send(topic=self.kafkatopic, value=msg)
        except KeyboardInterrupt as e:
            print(e)


class Kafka_consumer(object):
    '''
    Wraps kafka-python's consumer; optionally starts from a given offset.
    '''

    def __init__(self, kafkahost, kafkaport, kafkatopic, partition_num,
                 groupid, start=0, end=0, consumer_timeout_ms=10000):
        self.consumer = KafkaConsumer(
            group_id=groupid,
            bootstrap_servers=["{}:{}".format(kafkahost, kafkaport)],
            value_deserializer=json.loads,
            consumer_timeout_ms=consumer_timeout_ms)
        self.end = sys.maxsize
        # consume from a specific offset
        if start > 0:
            self.end = end
            partition = TopicPartition(kafkatopic, partition_num)
            self.consumer.assign([partition])
            self.consumer.seek(partition, int(start))

    def consume_data(self):
        try:
            for message in self.consumer:
                if message.offset > self.end:
                    break
                yield message
        except KeyboardInterrupt as e:
            print(e)


def getsysmills():
    # current time in milliseconds
    return int(round(time.time() * 1000))


def main():
    start_offset = 2127489  # starting offset
    end_offset = 4044141    # ending offset
    consumer_timeout_ms_global = 10000
    source_hosts = "***x"   # host only, no port needed
    source_topic = "topic_***x"
    source_partition_num = 0
    group = source_topic + '_***x_group'
    dest_hosts = "******"
    dest_topic = "******xx"
    print("source_hosts:{},source_topic:{},source_partition_num:{},start_offset:{},end_offset:{}"
          .format(source_hosts, source_topic, source_partition_num,
                  start_offset, end_offset))
    start_time = getsysmills()
    consumer = Kafka_consumer(source_hosts, 9092, source_topic,
                              source_partition_num, group,
                              start=start_offset, end=end_offset,
                              consumer_timeout_ms=consumer_timeout_ms_global)
    messages = consumer.consume_data()  # export
    print("dest_hosts:{},dest_topic:{}".format(dest_hosts, dest_topic))
    producer = Kafka_producer(dest_hosts, 9092, dest_topic)
    count = 0
    for message in messages:
        producer.sendjsondata(message.value)  # import
        print(message.value)
        count = count + 1
        if count % 1000 == 0:
            print("migrate already completed count:{}".format(count))
    producer.producer.close()
    consumer.consumer.close()
    end_time = getsysmills()
    print("migrate complete,cost:{},count:{}".format(end_time - start_time, count))


if __name__ == '__main__':
    main()
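If you do not know the offset range in advance, kafka-python can report it for you. Below is a minimal sketch (the host and topic names are placeholders matching the masked values above, not real endpoints) that prints the earliest and latest offsets of the source partition; beginning_offsets/end_offsets are available in kafka-python 1.3+ and require a 0.10+ broker.

from kafka import KafkaConsumer, TopicPartition

# placeholder host/topic; substitute your own source cluster and topic
tp = TopicPartition("topic_***x", 0)
offset_probe = KafkaConsumer(bootstrap_servers=["***x:9092"])
print(offset_probe.beginning_offsets([tp]))  # earliest available offset for the partition
print(offset_probe.end_offsets([tp]))        # offset just past the last message
offset_probe.close()

Run the probe first, plug the two numbers into start_offset and end_offset, then run the migration script.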