kafka訊息傳送模式

2021-09-10 04:39:30 字數 2723 閱讀 7015

在kafka-0.8.2之後,producer不再區分同步(sync)和非同步方式(async),所有的請求以非同步方式傳送,這樣提公升了客戶端效率。producer請求會返回乙個應答物件,包括偏移量或者錯誤信。這種非同步方地批量的傳送訊息到kafka broker節點,因而可以減少server端資源的開銷。新的producer和所有的伺服器網路通訊都是非同步地,在ack=-1模式下需要等待所有的replica副本完成複製時,可以大幅減少等待時間。

在kafka傳送端有3中不同的傳送模式,針對不同的場景可以選擇不同的模式。

1、fire-and-forget

只傳送訊息,不關心訊息是否傳送成功。本質上也是一種非同步傳送的方式,訊息先儲存在緩衝區中,達到設定條件後批量傳送。當然這是kafka吞吐量最高的一種方式,並配合引數acks=0,這樣生產者不需要等待伺服器的響應,以網路能支援的最大速度傳送訊息。但是也是訊息最不可靠的一種方式,因為對於傳送失敗的訊息沒有做任何處理。

producerrecordrecord = new producerrecord("the-topic", key, value);

try catch (exception e) catch (exception e) else

}});

4、非同步傳送相關引數

非同步傳送時,kafka會先把訊息儲存在緩衝池中,當到達設定條件觸發緩衝池訊息傳送。

(1)訊息快取達到batch.size;

(2)距離上一次訊息傳送時間間隔linger.ms;

(3)呼叫flush()方法,會立刻觸發傳送,並阻塞到當前緩衝區傳送完畢;

(4)呼叫close(),觸發傳送,完畢後關閉。

4.1 buffer.memory

此配置設定生產者可用於緩衝等待傳送給brokers訊息的總記憶體位元組數,預設為33554432=32mb。如果訊息傳送到快取區的速度比傳送到broker的速度快,那麼生產者會被阻塞(根據max.block.ms配置的時間,預設為60000ms=1分鐘,在0.9.0.0版本之前使用block.on.buffer.full配置),之後會丟擲異常。

4.3 compression.type

4.4 retries

預設值為0,當設定為大於零的值,客戶端會重新傳送任何傳送失敗的訊息。注意,此重試與客戶端收到錯誤時重新傳送訊息是沒有區別的。在配置max.in.flight.requests.per.connection不等於1的情況下,允許重試可能會改變訊息的順序,因為如果兩個批次的訊息被傳送到同乙個分割槽,第一批訊息傳送失敗但第二批成功,而第一批訊息會被重新傳送,則第二批訊息會先被寫入。注意此引數可能會改變訊息的順序性。

4.5 batch.size

當多個訊息被傳送到同乙個分割槽時,生產者會把它們一起處理。此配置設定用於每批處理使用的記憶體位元組數,預設為16384=16kb。當使用的記憶體滿的時候,生產者會傳送當前批次的所有訊息。但是,這並不意味著生產者會一直等待使用的記憶體變滿,根據下面linger.ms配置的時間也會觸發訊息傳送。設定較小的值會增加傳送的頻率,從而可能會減少吞吐量;設定較大的值會使用較多的記憶體,設定為0會關閉批處理的功能。

4.6 linger.ms

此配置設定在傳送當前批次訊息之前等待新訊息的時間量,預設值為0。kafkaproducer會在當前批次使用的記憶體已滿或等待時間到達linger.ms配置時間的時候傳送訊息。當linger.ms>0時,延時性會增加,但會提高吞吐量,因為會減少訊息傳送頻率。

4.7 client.id

用於標識傳送訊息的客戶端,通常用於日誌和效能指標以及配額。

4.8 max.in.flight.requests.per.connection

此配置設定客戶端在單個連線上能夠傳送的未確認請求的最大數量,預設為5,超過此數量會造成阻塞。設定大的值可以提高吞吐量但會增加記憶體使用,但是需要注意的是,當設定值大於1而且傳送失敗時,如果啟用了重試配置,有可能會改變訊息的順序。設定為1時,即使重新傳送訊息,也可以保證傳送的順序和寫入的順序一致。

4.9 request.timeout.ms

此配置設定客戶端等待請求響應的最長時間,預設為30000ms=30秒,如果在這個時間內沒有收到響應,客戶端將重發請求,如果超過重試次數將拋異常。此配置應該比replica.lag.time.max.ms(broker配置,預設10秒)大,以減少由於生產者不必要的重試造成訊息重複的可能性。

4.10 max.block.ms

當傳送緩衝區已滿或者元資料不可用時,生產者呼叫send()和partitionsfor()方法會被阻塞,預設阻塞時間為60000ms=1分鐘。由於使用使用者自定義的序列化器和分割槽器造成的阻塞將不會計入此時間。

4.11 max.request.size

此配置設定生產者在單個請求中能夠傳送的最大位元組數,預設為1048576位元組=1mb。例如,你可以傳送單個大小為1mb的訊息或者1000個大小為1kb的訊息。注意,broker也有接收訊息的大小限制,使用的配置是message.max.bytes=1000012位元組(好奇怪的數字,約等於1mb)。

4.12 receive.buffer.bytes和send.buffer.bytes

receive.buffer.bytes:讀取資料時使用的tcp接收緩衝區(so_rcvbuf)的大小,預設值為32768位元組=32kb。如果設定為-1,則將使用作業系統的預設值。

send.buffer.bytes:傳送資料時使用的tcp傳送緩衝區(so_sndbuf)的大小,預設值為131072位元組=128kb。如果設定為-1,則將使用作業系統的預設值。

Kafka 訊息傳送

建立乙個kafkaprodecer物件,傳入上面建立的properties物件 kafkaproducerproducer new kafkaproducer mykafkaprops 使用prodecerrecord string topic,string key,string value 建構函...

Kafka 傳送訊息流程

客戶端的幾個元件 一條訊息首先需要確定要被儲存到那個 partition 對應的雙端佇列上 其次,儲存訊息的雙端佇列是以批的維度儲存的,即 n 條訊息組成一批,一批訊息最多儲存 n 條,超過後則新建乙個組來儲存新訊息 其次,新來的訊息總是從左側寫入,即越靠左側的訊息產生的時間越晚 最後,只有當一批訊...

Kafka傳送訊息到HDFS

本文採用的是kafka0.7.2,安裝好kafka後在kafka的contrib目錄下有關於kafka與hadoop的一系列檔案,我們可以使用hadoop consumer目錄下的指令碼及配置檔案將kafka中某topic的資訊傳送到hdfs中。1.修改test目錄下的test.properties...