properties props = new properties();
//集群位址,多個伺服器用","分隔
props.put(
"bootstrap.servers", "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092");
//重新傳送訊息次數,到達次數返回錯誤
props.put(
"retries", 0)
;//producer會嘗試去把發往同乙個partition的多個requests進行合併,batch.size指明了一次batch合併後requests總大小的上限。如果這個值設定的太小,可能會導致所有的request都不進行batch。
props.put(
"batch.size", 163840)
;//producer缺省會把兩次傳送時間間隔內收集到的所有requests進行一次聚合然後再傳送,以此提高吞吐量,而linger.ms則更進一步,這個引數為每次傳送增加一些delay,以此來聚合更多的message。
props.put(
"linger.ms", 1)
;//在producer端用來存放尚未傳送出去的message的緩衝區大小
props.put(
"buffer.memory", 33554432)
;//key、value的序列化,此處以字串為例,使用kafka已有的序列化類
props.put(
"key.serializer", "org.apache.kafka.common.serialization.stringserializer");
props.put(
"value.serializer", "org.apache.kafka.common.serialization.stringserializer");
"partitioner.class", "com.kafka.demo.partitioner"
);//分割槽操作,此處未寫
props.put(
"acks", "1");
props.put(
"request.timeout.ms", "60000");
props.put(
"compression.type","lz4"
);
1、producer 引數acks設定
在訊息被認為是「已提交」之前,producer需要leader確認的produce請求的應答數。該引數用於控制訊息的永續性,目前提供了3個取值:
配置推薦:
如果要較高的永續性要求以及無資料丟失的需求,設定acks = -1。其他情況下設定acks = 1。
2、producer引數 buffer.memory 設定(吞吐量)
該引數用於指定producer端用於快取訊息的緩衝區大小,單位為位元組,預設值為:33554432合計為32m。kafka採用的是非同步傳送的訊息架構,prducer啟動時會首先建立一塊記憶體緩衝區用於儲存待傳送的訊息,然後由乙個專屬執行緒負責從緩衝區讀取訊息進行真正的傳送。
訊息持續傳送過程中,當緩衝區被填滿後,producer立即進入阻塞狀態直到空閒記憶體被釋放出來,這段時間不能超過max.blocks.ms設定的值,一旦超過,producer則會丟擲timeoutexception 異常,因為producer是執行緒安全的,若一直報timeoutexception,需要考慮調高buffer.memory 了。
使用者在使用多個執行緒共享kafka producer時,很容易把 buffer.memory 打滿。
3、 producer引數 compression.type 設定
4、 producer引數 retries設定
producer重試的次數設定。重試時producer會重新傳送之前由於瞬時原因出現失敗的訊息。瞬時失敗的原因可能包括:元資料資訊失效、副本數量不足、超時、位移越界或未知分割槽等。倘若設定了retries > 0,那麼這些情況下producer會嘗試重試。
5、 producer引數batch.size設定
producer都是按照batch進行傳送的,因此batch大小的選擇對於producer效能至關重要。producer會把發往同一分割槽的多條訊息封裝進乙個batch中,當batch滿了後,producer才會把訊息傳送出去。但是也不一定等到滿了,這和另外乙個引數linger.ms有關。預設值為16k,合計為16384.
6、 producer引數linger.ms設定
producer是按照batch進行傳送的,但是還要看linger.ms的值,預設是0,表示不做停留。這種情況下,可能有的batch中沒有包含足夠多的produce請求就被傳送出去了,造成了大量的小batch,給網路io帶來的極大的壓力。
配置推薦:
為了減少了網路io,提公升了整體的tps。假設設定linger.ms=5,表示producer請求可能會延時5ms才會被傳送。
kafka生產者例項配置引數
kafkaproducer中有三個引數是必填的 bootstrap.servers 指定生產者客戶端連線kafka集群所需的broker位址列表,格式為host1 port1,host2 port2,可以設定乙個或多個。這裡並非需要所有的broker位址,因為生產者會從給定的broker裡尋找其它的...
Kafka之生產者
1 方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器 2 可以提高併發,因為可以以partition為單位讀寫 我們需要將生產者傳送的資料封裝成乙個producerrecord物件。1 指明partition的情況下,直接將...
kafka 生產者(二)
想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...