***關於kafka producer 分割槽策略的思考
from:
今天跑了乙個簡單的kafka produce程式,如下所示
public class kafkaproducer extends thread
@override
public void run()
try catch (interruptedexception e)
} }
private producer createproducer()
public static void main(string args)
}
發現其只向topic:user11中的某乙個partiton中寫資料。一下子感覺不對啊,kafka不是號稱可以實現producer的訊息均發嗎?後來查了一下相關的引數:partitioner.class
# 分割槽的策略
# 預設為kafka.producer.defaultpartitioner,取模
partitioner.class = kafka.producer.defaultpartitioner
在上面的程式中,我在producer中沒有定義分割槽策略,也就是說程式採用預設的kafka.producer.defaultpartitioner,來看看原始碼中是怎麼定義的:
class defaultpartitioner(props: verifiableproperties = null) extends partitioner
}
其核心思想就是對每個訊息的key的hash值對partition數取模得到。再來看看我的程式中有這麼一段:
producer.send(new keyedmessage(topic,string))
來看看keymessage:
case class keyedmessage[k, v](val topic: string, val key: k, val partkey: any, val message: v)
def haskey = key != null
}
由於上面生產者**中沒有傳入key,所以程式呼叫:
def this(topic: string, message: v) = this(topic, null.asinstanceof[k], null, message)
但是如果key為null時會傳送到哪個分割槽?我在實驗的時候發現,每次執行生產者執行緒好像傳送的分割槽都不太相同。具體的解釋可以參考博文:
好的問題發現了該怎麼解決呢?只需要在生產者執行緒中對每條訊息指定key,如下:
producer.send(new keyedmessage(topic,string.valueof(i),string));
如下所示為自定義的分割槽函式,分割槽函式實現了partitioner介面
public class personalpartition implements partitioner
public int partition(object arg0, int arg1)
else
}}
然後修改配置即可:
properties.put("partitioner.class", "com.xx.kafka.personalpartition");
當然,也可以向topic中指定的partition中寫資料,如下**為向」user11」中partition 1中寫入資料:
public class kafkaproducer extends thread
@override
public void run()
try catch (interruptedexception e)
} }
private kafkaproducer createproducer()
}
kafka負載均衡相關資料收集(二)
關於kafka producer 分割槽策略的思考 from 今天跑了乙個簡單的kafka produce程式,如下所示 public class kafkaproducer extends thread override public void run try catch interruptede...
kafka手動負載均衡
針對執行中kafka的集群,因為特定原因,部分kafka節點負荷量超載,可以進行手動topic重新分配 還可以根據需求只重新分配特定topic到特定kafka節點,以實現只想在固定節點使用特定topic的目的。vim topics.json version 1 usr local kafka bin...
負載均衡相關
現在記下關閉linux防火牆的方法 1.即時生效,重啟後失效 開啟 service iptables start 關閉 service iptables stop 2 重啟後生效 開啟 chkconfig iptables on 關閉 chkconfig iptables off 關閉selinux...