flink寫入kafka之自定義分割槽器

2021-09-19 15:41:50 字數 1170 閱讀 5501

直入正題,flink寫入kafka根據某個資料中的字段做分割槽傳送到kafka的指定分割槽,如果你在sink中每次要手動寫producer,那麼你可以略過此文章

接著上篇文章flink寫入kafka之預設序列化類和預設分割槽器

直接上**

/*自定義分割槽*/

@suppresswarnings

("unchecked"

)flinkkafkaproducer010

flinkproducer =

newflinkkafkaproducer010

("topic名字"

,//自定義元資料

newmyschema()

,//kafka producer的屬性

pro,

//自定義分割槽器

newmypartitioner()

);flinkdatastream.

addsink

(flinkproducer)

.setparallelism(2

);

我們自己定義的元資料類

public

static

class

myschema

implements

keyedserializationschema

/** * 要傳送的value

** @param element 原資料

* @return value.getbytes

*/@override

public

byte

serializevalue

(object element)

@override

public string gettargettopic

(object element)

}

我們自己定義的分割槽器

public

static

class

mypartitioner

extends

flinkkafkapartitioner

}

這樣flink傳送到kafka的資料就不是隨便發的了,是根據你定義的key傳送的

flink寫入HDFS中文亂碼

客戶端埋點日誌進行解析時需要獲取地區編碼和名稱,程式是通過flink分布式快取將地區編碼和名稱資料傳到每個task節點進行讀取。本地測試時沒有問題,但是部署到集群資料寫入hdfs後發現中文亂碼,部分 如下 設定分布式快取檔案位址 streamexecutionenvironment bsenv st...

kafka順序寫入 ZeroCopy

1.為何kafka把訊息存在磁碟上,但可以輕鬆支援每秒百萬級的寫入請求 kafka高吞吐率的原因?kafka為了防止丟失資料,將收到的訊息寫入磁碟中,但仍能保證高吞吐率,超過了大部分的訊息中介軟體,使得kafka在日誌處理等海量資料場景廣泛應用。為了優化寫入速度kafka採用了順序寫入和mmfile...

Kafka生產者 向Kafka寫入資料

目錄 前言 1 kafka生產者概覽 2 建立kafka生產者 3 傳送訊息到kafka 4 生產者的配置 5 序列化器 6 分割槽 kafka不管是作為訊息佇列 訊息匯流排還是資料儲存平台來使用,都需要有乙個可以往kafka寫入資料的生產者和乙個可以從kafka讀取資料的消費者,或者乙個兼具兩種角...