Kafka Batch Import/Export in Python


Kafka Connect is very powerful, but it has its limitations: it cannot be customized for individual requirements. If you need background on that, see my other blog post.

Implementing this in Python is actually quite simple: use a consumer to export the data and a producer to import it. The throughput is also quite good.
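Stripped of offset handling and error handling, the gist looks like the minimal sketch below. It uses the kafka-python client (the same library the full script imports from); the broker address and topic names here are placeholders, not values from the original setup.

from kafka import KafkaConsumer, KafkaProducer

# Read every message from the source topic and re-publish it to the destination
# topic. consumer_timeout_ms makes the iteration stop once no new messages arrive.
consumer = KafkaConsumer("source_topic",
                         bootstrap_servers=["broker:9092"],
                         consumer_timeout_ms=10000)
producer = KafkaProducer(bootstrap_servers=["broker:9092"])

for message in consumer:
    producer.send("dest_topic", message.value)

producer.close()   # close() flushes buffered messages first
consumer.close()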

Below is the complete code for reading data from one partition of a topic and writing it to another topic.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import json
import sys
import time

from kafka import KafkaConsumer, KafkaProducer, TopicPartition


class Kafka_producer:
    '''
    Kafka producer wrapper: serializes each message as JSON and
    sends it to the target topic.
    '''
    def __init__(self, kafkahost, kafkaport, kafkatopic):
        self.producer = KafkaProducer(
            bootstrap_servers=["{}:{}".format(kafkahost, kafkaport)],
            value_serializer=lambda v: json.dumps(v).encode('utf-8'))
        self.kafkatopic = kafkatopic

    def sendjsondata(self, msg):
        try:
            self.producer.send(topic=self.kafkatopic, value=msg)
        except KeyboardInterrupt as e:
            print(e)


class Kafka_consumer:
    '''
    Kafka consumer wrapper: reads one partition of a topic, optionally
    starting from a given offset and stopping at an end offset.
    '''
    def __init__(self, kafkahost, kafkaport, kafkatopic, partition_num, groupid,
                 start=0, end=0, consumer_timeout_ms=10000):
        self.consumer = KafkaConsumer(
            group_id=groupid,
            bootstrap_servers=["{}:{}".format(kafkahost, kafkaport)],
            value_deserializer=json.loads,
            consumer_timeout_ms=consumer_timeout_ms)
        # Manually assign the single partition we want to read.
        partition = TopicPartition(kafkatopic, partition_num)
        self.consumer.assign([partition])
        self.end = sys.maxsize
        # Consume from a specific offset range if one was given.
        if start > 0:
            self.end = end
            self.consumer.seek(partition, int(start))

    def consume_data(self):
        try:
            for message in self.consumer:
                if message.offset > self.end:
                    break
                yield message
        except KeyboardInterrupt as e:
            print(e)


def getsysmills():
    # Current time in milliseconds, used to measure how long the migration takes.
    return int(round(time.time() * 1000))


def main():
    start_offset = 2127489                # starting offset
    end_offset = 4044141                  # ending offset
    consumer_timeout_ms_global = 10000
    source_hosts = "***x"                 # host only, no port needed
    source_topic = "topic_***x"
    source_partition_num = 0
    group = source_topic + '_***x_group'
    dest_hosts = "******"
    dest_topic = "******xx"

    print("source_hosts:{},source_topic:{},source_partition_num:{},start_offset:{},end_offset:{}"
          .format(source_hosts, source_topic, source_partition_num, start_offset, end_offset))
    start_time = getsysmills()
    consumer = Kafka_consumer(source_hosts, 9092, source_topic, source_partition_num, group,
                              start=start_offset, end=end_offset,
                              consumer_timeout_ms=consumer_timeout_ms_global)
    messages = consumer.consume_data()    # export

    print("dest_hosts:{},dest_topic:{}".format(dest_hosts, dest_topic))
    producer = Kafka_producer(dest_hosts, 9092, dest_topic)

    count = 0
    for message in messages:
        producer.sendjsondata(message.value)   # import
        print(message.value)
        count = count + 1
        if count % 1000 == 0:
            print("migrate already completed count:{}".format(count))

    producer.producer.close()             # close() flushes buffered messages first
    consumer.consumer.close()
    end_time = getsysmills()
    print("migrate complete,cost:{},count:{}".format(end_time - start_time, count))


if __name__ == '__main__':
    main()
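One practical note: start_offset and end_offset are hardcoded in main(). If you do not already know the offset range of the source partition, kafka-python can report it. The helper below is my own sketch (the function name and arguments are illustrative, not part of the original script); it uses the library's beginning_offsets and end_offsets calls.

from kafka import KafkaConsumer, TopicPartition

def offset_range(host, port, topic, partition_num):
    # Return (earliest, latest) offsets for one partition so start_offset and
    # end_offset do not have to be guessed.
    consumer = KafkaConsumer(bootstrap_servers=["{}:{}".format(host, port)])
    tp = TopicPartition(topic, partition_num)
    earliest = consumer.beginning_offsets([tp])[tp]
    latest = consumer.end_offsets([tp])[tp]   # offset the next produced message will get
    consumer.close()
    return earliest, latest

You could call offset_range(source_hosts, 9092, source_topic, source_partition_num) before constructing the consumer and feed the result into start_offset and end_offset.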
