2、傳送原理刨析
3、其他生產者引數
訊息要到網路上進行傳輸,必須進行序列化,而序列號器的作用就是入磁。
kafka提供了預設的字串序列化器(org.apache.kafka.common.serialization.stringserializer),還有整型(integerserializer)和位元組陣列(bytesserializer)序列化器,這些序列化器都實現了介面(org.apache.kafka.common.serialization.serializer)基本上能夠滿足大部分場景的需求。
/**
* 自定義序列化器
*/public
class
companyserializer
implements
serializer
@override
public
byte
serializer
(string topic,company data)
byte
name,address;
tryelse
if(data.
getaddress()
!=null)
else
bytebuffer buffer = bytebuffer.
allocate(4
+4+name.length+address.length)
; buffer.
putint
(name.length)
; buffer.
put(name)
; buffer.
putint
(address.length)
; buffer.
put(address)
;return buffer.
array()
;}catch
(unsupportedencodinfexception e)
return
newbyte[0
];}@override
public
void
close()
}
本身kafka有自己的分割槽策略的,如果未指定,就會使用預設的分割槽策略
kafka根據傳遞訊息的key來進行分割槽的分配,即hash(key)%nnumpartitions。如果key相同的化,那麼就會分配到同一分割槽。
原始碼:org.apache.kafka.clients.producer.internals.defaultpartitioner
public
intpartition
(string topic,object key,
byte
keybytes,object value,
byte
valuebytes,cluster cluster)
else
}else
}
/**
* 自定義分割槽器
*/public
class
definepartitioner
implements
partitioner
else
}@override
public
void
close()
}
//自定義分割槽器的使用
props.
put(producerconfig.partitioner_class_config,definepartitioner.
class
.getname()
);
producer***(interceptor)是相當新的功能,它和consumer端interceptor是在kafka0.10版本被引入的,主要用於實現client端的定製化控制邏輯。
生產者***可以用在訊息傳送前做一些準備工作。
使用場景
按照某個規則過濾掉不符合要求的訊息
修改訊息的內容
統計類需求
/**
* 自定義***
訊息傳送的過程中,設計到兩個執行緒協調工作,主線程首先將業務資料封裝成producerrecord物件,之後呼叫send()方法將訊息方入recordaccumulator(訊息收集器,也可以理解為主執行緒與sender執行緒直接的緩衝區)中暫存,sender執行緒負責將訊息構成請求,並最終執行網路i/o的執行緒,它從recordaccumulator中取出訊息並批量傳送出去,需要注意的是,kafkaproducer是執行緒安全的,多個執行緒間可以共享使用同乙個kafkaproducer物件
這個引數用來指定分割槽中必須有多少個副本收到這條訊息,之後生產者才會認為這條訊息是寫入成功的。acks是生產者客戶端中非常重要的乙個引數,它涉及到訊息的可靠性和吞吐量之間的權衡。
注意:acks引數配置的時乙個字串型別,而不是整數型別,如果配置為證書型別會丟擲異常
生產者從伺服器收到的錯誤有可能時臨時性的錯誤(比如分割槽找不到首領),在這種情況下,如果達到了retries
設定的次數,生產者會放棄重試並返回錯誤,預設情況下,生產者會在每次重試之間等待100ms,可以通過retry.backoff.ms
引數來修改這個時間間隔。
當有多個訊息被傳送到同乙個分割槽時,生產者會把他們放在同乙個批次裡,該引數指定了乙個批次可能使用的記憶體大小,按照位元組數計算,而不是訊息個數,當批次被填滿,批次裡的所有訊息會被傳送出去。不過生產者並不一定都會等到批次被填滿才傳送,半滿的批次,甚至只包含乙個訊息的批次也可能被傳送。所以就算把batch.size
設定的很大,也不會造成延遲,只會占用更多的記憶體而已,如果設定的大小,生產者會因為頻繁傳送訊息而增加一些額外的開銷。
該引數用於控制生產者傳送的請求大小,它可以指定能傳送的單個訊息的最大值,也可以指單個請求裡所有訊息的總大小。broker
對可接收的訊息最大值也有自己的限制(message.max.size
),所以兩邊的配置最好匹配,避免生產者傳送的訊息被broker
拒絕。
kafka學習筆記(二) 生產者介紹
生產者傳送訊息的過程 分割槽原則 資料可靠性保證 acsk 0 producer不等待broker的ack,這樣可以提供最低的延遲,broker一接收到還沒有寫入到磁碟就返回ack,當broker出現故障的時候可能會丟失資料 1 producer等待broker返回ack,partition的lea...
kafka學習總結006 生產者事務
前面提到了,kafka0.11.0版本引入的冪等性只能保證分割槽級別的at exactly once語義 如圖,producer向三個分割槽分別生產10條資料,前兩個生產成功,寫第三個分割槽時,producer掛掉 producer重啟後,重新向三個分割槽寫入資料 此時producer的pid變化,...
kafka 生產者(二)
想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...