直入正題,flink寫入kafka根據某個資料中的字段做分割槽傳送到kafka的指定分割槽,如果你在sink中每次要手動寫producer,那麼你可以略過此文章接著上篇文章flink寫入kafka之預設序列化類和預設分割槽器
直接上**
/*自定義分割槽*/
@suppresswarnings
("unchecked"
)flinkkafkaproducer010
flinkproducer =
newflinkkafkaproducer010
("topic名字"
,//自定義元資料
newmyschema()
,//kafka producer的屬性
pro,
//自定義分割槽器
newmypartitioner()
);flinkdatastream.
addsink
(flinkproducer)
.setparallelism(2
);
我們自己定義的元資料類
public
static
class
myschema
implements
keyedserializationschema
/** * 要傳送的value
** @param element 原資料
* @return value.getbytes
*/@override
public
byte
serializevalue
(object element)
@override
public string gettargettopic
(object element)
}
我們自己定義的分割槽器
public
static
class
mypartitioner
extends
flinkkafkapartitioner
}
這樣flink傳送到kafka的資料就不是隨便發的了,是根據你定義的key傳送的 flink寫入HDFS中文亂碼
客戶端埋點日誌進行解析時需要獲取地區編碼和名稱,程式是通過flink分布式快取將地區編碼和名稱資料傳到每個task節點進行讀取。本地測試時沒有問題,但是部署到集群資料寫入hdfs後發現中文亂碼,部分 如下 設定分布式快取檔案位址 streamexecutionenvironment bsenv st...
kafka順序寫入 ZeroCopy
1.為何kafka把訊息存在磁碟上,但可以輕鬆支援每秒百萬級的寫入請求 kafka高吞吐率的原因?kafka為了防止丟失資料,將收到的訊息寫入磁碟中,但仍能保證高吞吐率,超過了大部分的訊息中介軟體,使得kafka在日誌處理等海量資料場景廣泛應用。為了優化寫入速度kafka採用了順序寫入和mmfile...
Kafka生產者 向Kafka寫入資料
目錄 前言 1 kafka生產者概覽 2 建立kafka生產者 3 傳送訊息到kafka 4 生產者的配置 5 序列化器 6 分割槽 kafka不管是作為訊息佇列 訊息匯流排還是資料儲存平台來使用,都需要有乙個可以往kafka寫入資料的生產者和乙個可以從kafka讀取資料的消費者,或者乙個兼具兩種角...