kafka通過生產者kafkaproducer的send()方法將訊息傳送到broker中,但在傳送過程中需要經過***(interceptor)、序列化器(serializer)和分割槽器(partitioner)的一系列作用之後才能被真正地發往broker。訊息在經過序列化後需要確定它發往的分割槽,如果訊息producerrecord中指定了partition欄位,那麼就不需要分割槽器的作用,因為partition代表的就是所要發往的分割槽號。
如果訊息producerrecord中沒有指定partition欄位,那麼需要依賴分割槽器,根據key這個欄位來計算partition的值。分割槽器的作用就是為訊息分配分割槽。
/**
* computes partition for given record.
* if the record has partition returns the value otherwise
* calls configured partitioner class to compute the partition.
*/private int partition(producerrecordrecord, byte serializedkey, byte serializedvalue, cluster cluster)
kafka中提供的預設器是org.apache.kafka.clients.producer.internals.defaultpartitioner,它實現了partitioner介面,這介面中定義了2個方法:
public int partition(string topic, object key, byte keybytes, object value, byte valuebytes, cluster cluster);
public void close();
其中partition()方法用來計算分割槽號,返回值為int型別。
在預設分割槽器defaultpartitioner在實現中,close()是空方法,而在partition()方法中定義了主要的分割槽分配邏輯。如果key不為null,那麼預設的分割槽器會對key進行雜湊(採用murmurhash2演算法,具備高運算效能及低碰撞率),最終根據得到的雜湊值來計算分割槽號,擁有相同key的訊息會被寫入同乙個分割槽。如果key為null,那麼訊息將會以輪詢的方式發往主題內的各個可用分割槽。
public class defaultpartitioner implements partitioner
/*** compute the partition for the given record.
** @param topic the topic name
* @param key the key to partition on (or null if no key)
* @param keybytes serialized key to partition on (or null if no key)
* @param value the value to partition on or null
* @param valuebytes serialized value to partition on or null
* @param cluster the current cluster metadata
*/public int partition(string topic, object key, byte keybytes, object value, byte valuebytes, cluster cluster) else
} else
}private int nextvalue(string topic)
}return counter.getandincrement();
}public void close() {}
}
注意:如果key不為null,那麼計算得到的分割槽號會是所有分割槽中的任意乙個;如果key為null並且有可用分割槽時,那麼計算得到的分割槽號僅為可用分割槽中的任意一中,注意兩者之間的區別在不改變主題分割槽數量的情況下,key與分割槽之間的對映可以保持不變。不過,一旦主題中增加了分割槽,那麼就難以保證key與分割槽之間的對映關係了。
除了使用kafka提供的預設分割槽器進行分割槽分配,還可以使用自定義的分割槽器,只需跟defaultpartitioner一樣實現partitioner介面即可。預設的分割槽器在key為null時不會選擇非可用的分割槽,我們可以通過自定義的分割槽器selfpartitioner來打破這一限制,具體的實現可以參考下面的示例**:
public class selfpartitioner implements partitioner else
}@override
public void close()
@override
public void configure(mapconfigs)
}
實現自定義的分割槽器後,需要通過配置引數partitioner.class來顯式指定這個分割槽器。求全如下:
properties properties = new properties();
properties.put(producerconfig.partitioner_class_config, selfpartitioner.class.getname());
flink寫入kafka之自定義分割槽器
直入正題,flink寫入kafka根據某個資料中的字段做分割槽傳送到kafka的指定分割槽,如果你在sink中每次要手動寫producer,那麼你可以略過此文章 接著上篇文章flink寫入kafka之預設序列化類和預設分割槽器 直接上 自定義分割槽 suppresswarnings unchecke...
Kafka分割槽策略及自定義
預設分割槽策略是 取正 bytearray生成32位hash值 numpartitions 這個公式的結果是得到0 numpartitions 1 間正整數的個數大致相等,也就是說kafka的預設分割槽策略是無論我們給定多少個分割槽,我們存放的資料基本上會平均的分到各個分割槽上。private in...
自定義分割槽器外掛程式
官方只提供了取模分割槽器,如果有其他分割槽需求,可通過自定義分割槽外掛程式擴充套件。自定義分割槽器外掛程式需要實現以下兩個介面 partitioner 基類basepartitioner 說明 維護配置 訪問對話方塊 執行時分配資料到分割槽 stepdialoginte ce 基類basestepd...