想要提高生產者的吞吐量可以通過調整一下4個引數來實現
batch.size:批次大小,預設16k
linger.ms:等待時間,修改為5-100ms
recordaccumulator:緩衝區大小,修改為64m
**實現
public傳送流程ack 應答級別class
customproducerparameters
//關閉資源
kafkaproducer.close();}}
資料完全可靠條件=ack級別設定為-1+分割槽副本大於等於2+isr裡應答的最小副本數量大於等於2
可靠性總結:
測試
public3.1、資料傳輸語義class
customproduceracks
//3、關閉資源
kafkaproducer.close();}}
kafka0.11版本以後,引入了一項重大特性:冪等性和事務
3.2、冪等性
冪等性指producer不論向broker傳送多少次重複資料,broker端都只會持久化一條,保證了不重複。
精確一次(exactlyonce)=冪等性+至少一次(ack=-1+分割槽副本數》=2+isr最小副本數量》=2)
重複資料的判斷標準:具有相同主鍵的訊息提交時,broker只會持久化一條。其中pid是kafka每次重啟都會分配乙個新的;partition表示分割槽號;sequencenumber是單調自增的。所以冪等性只能保證的是在單分割槽單會話內不重複
如何使用冪等性:開啟引數enable.idempotence 預設為true,false關閉
3.3、事務
說明:開啟事務,必須開啟冪等性。
producer 在使用事務功能前,必須先自定義乙個唯一的transactional.id。有了transactional.id,即使客戶端掛掉了,它重啟後也能繼續處理未完成的事務
事務測試
public單分區內,有序(有條件的,詳見下節);多分割槽,分割槽與分區間無序; kafka在1.x版本之前保證資料單分割槽有序的條件 max.in.flight.requests.per.connection=1(不需要考慮是否開啟冪等性)class
customproducertranaction
//提交事務
kafkaproducer.committransaction();
} catch
(exception e)
finally}}
kafka在1.x及以後版本保證資料單分割槽有序分兩者情況
未開啟冪等性 max.in.flight.requests.per.connection需要設定為1
開啟冪等性 max.in.flight.requests.per.connection需要設定小於等於5
原因說明:因為在kafka1.x以後,啟用冪等後,kafka服務端會快取producer發來的最近5個request的元資料,故無論如何,都可以保證最近5個request的資料都是有序的。
Kafka之生產者
1 方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器 2 可以提高併發,因為可以以partition為單位讀寫 我們需要將生產者傳送的資料封裝成乙個producerrecord物件。1 指明partition的情況下,直接將...
kafka生產者分割槽策略
kafka生產者 分割槽策略 分割槽的原因 1 方便在集群中擴充套件,每個partition可以通過調整以適應它所在的機器,而乙個topic又 可以有多個partition組成,因此整個集群就可以適應任意大小的資料了 2 可以提高併發,因為可以以partition為單位讀寫了。分割槽的原則 1 指明...
kafka生產者API操作
建立producer傳送資訊給消費者 設定配置相關 private val prop new properties prop.setproperty bootstrap.servers mypc01 9092,mypc02 9092,mypc03 9092 prop.setproperty acks...