預設分割槽策略是:取正(bytearray生成32位hash值)%numpartitions
這個公式的結果是得到0-(numpartitions-1)間正整數的個數大致相等,也就是說kafka的預設分割槽策略是無論我們給定多少個分割槽,我們存放的資料基本上會平均的分到各個分割槽上。
private
intdefaultpartition(string topic, object key, byte keybytes, object value, byte valuebytes, cluster cluster) else
} else
return partition;
}private
static
inttopositive(int number)
//生成32位的hash 值
public
static
intmurmur2(final
byte data)
實際開發中會遇到不讓資料均勻分布,如按照範圍放到不同的分割槽中,這樣就得使用自定義的分割槽策略了
int partition = 0;
if(key<100)else
if(key<200)else
producerrecordrecords = new producerrecord(topic,partition,key,value);
kafkaproducer.send(records);
public
class
kafkacustompartitioner
implements
partitioner
public
void
close() {}
public
intpartition(string topic, object arg1, byte keybytes, object arg3, byte arg4, cluster arg5) else
if(key<200)else
return partition;
}}
2.新增配置
partitioner.class值為自定義分割槽類的完整包名,這樣生產者就會選擇自定義的分割槽策略。
props.put("partitioner.class", "xx.xx.kafkacustompartitioner");
說明:1.客戶端測試環境中,自定義分割槽類跟生產者類在乙個專案中,不需要其他操作;2.想要自定義的分割槽放到kafka的伺服器端環境時,需要將自定義的分割槽類生成jar包放到kafka環境的lib下,同樣配置檔案中指定完整包名。 Kafka自定義分割槽器
kafka通過生產者kafkaproducer的send 方法將訊息傳送到broker中,但在傳送過程中需要經過 interceptor 序列化器 serializer 和分割槽器 partitioner 的一系列作用之後才能被真正地發往broker。訊息在經過序列化後需要確定它發往的分割槽,如果訊...
flink寫入kafka之自定義分割槽器
直入正題,flink寫入kafka根據某個資料中的字段做分割槽傳送到kafka的指定分割槽,如果你在sink中每次要手動寫producer,那麼你可以略過此文章 接著上篇文章flink寫入kafka之預設序列化類和預設分割槽器 直接上 自定義分割槽 suppresswarnings unchecke...
Kafka 自定義分割槽的生產者
1,實現 介面 patitioner public class kafkapartitioner implements partitioner override public void close override public void configure mapmap 2,預設的分割槽器 def...