目錄spring-kafka生產端
producer向broker傳送訊息資料,需要有一定的可靠性,至少要保證資料:
1、不丟失
2、不重複
producer提供了一些引數,在編寫producer是進行合理設定和編寫,就可以保證資料的可靠性。
acks 引數配置
為保證producer傳送的資料能夠可靠的傳送到指定topic,topic的每個partition收到訊息後,都需要向producer傳送ack(acknowledgement確認收到),如果producer收到 ack,就會進行下一輪的傳送,否則重新傳送資料。
0: producer 不等待 broker 的 ack,這一操作提供了乙個最低的延遲, broker 一接收到還沒有寫入磁碟就已經返回,當 broker 故障時有可能丟失資料;
1: producer 等待 broker 的 ack, partition 的 leader 落盤成功後返回 ack,如果在 follower同步成功之前 leader 故障,那麼將會丟失資料;
-1(all) : producer 等待 broker 的 ack, partition 的 leader 和 follower 全部落盤成功後才返回 ack。但是如果在 follower 同步完成後, broker 傳送 ack 之前leader 發生故障,那麼會造成資料重複。
exactly once 語義
當ack級別設定為-1的時候,可以保證producer到broker之間不會丟失資料,即at
least once 語義 。相對的,將伺服器ack級別設定為0,可以保證生產者每條訊息只會被傳送一次,即at most once 語義 。
at least once 可以保證資料不丟失,但是不能保證資料不重複;相對的, at least once可以保證資料不重複,但是不能保證資料不丟失。
對於一些重要資訊,我們要求既不能重複也不能丟失,這時我們需要使用exactly once 語義 。0.11 版本的 kafka,引入了一項重大特性:冪等性。 所謂冪等性就是producer無論向broker傳送了多少次重複資料,broker都只會持久化一條。冪等性結合at least once語義,就結合成了kafka的exactly once語義。
at least once + 冪等性 = exactly once
啟動冪等性,只需要將producer的引數enable.idompotence 設定為true,ack設定為-1即可。
開啟冪等性的producer在初始化的時候會被分配乙個pid,發往同乙個分割槽的訊息會附帶sequence number(自動增長)。broker端會對做快取,當具有相同主鍵的訊息提交的時候,broker只會持久化一條訊息。
msg1
msg2
msg2
但是,pid重啟就會變化,同時不同分割槽也會有不同主鍵,所以冪等性無法保證跨分割槽跨會話。這裡我們就需要引進kafka事務。
事務kafka 從 0.11 版本開始引入了事務支援。事務可以保證 kafka 在 exactly once 語義的基礎上,生產和消費可以跨分割槽和會話,要麼全部成功,要麼全部失敗 。為了實現跨分割槽跨會話事務,引入乙個全域性唯一的transaction id ,將pproducer的pid和transaction id進行繫結,這樣,當producer重啟後,就可以通過transaction id 獲得原來的 pid。這個引數通過客戶端程式來進行設定 。
我們使用kafka訊息事務的場景有以下兩種:
在一次業務中,存在消費訊息,又存在生產訊息。此時如果訊息生產失敗,那麼消費者需要回滾。這種情況稱為consumer-transform-producer
在一次業務中,存在多次生產訊息,其中後續生產的訊息丟擲異常,前置生產的訊息需要回滾。
事務要求生產者開啟冪等性特性,因此通過將transactional.id引數設定為非空從而開啟事務特性的同時
需要將producerconfig.enable_idempotence_config設定為true(預設值為true),如果顯示設
置為false,則會丟擲異常。
以上是保證producer傳送資料可靠性保證的相關引數,結合spring-kafka的具體使用如下。
spring-kafkaproducer.xml配置:
<?xml version="1.0" encoding="utf-8"?>
部分重要引數詳解:
acks:
這個引數用來指定分割槽中必須有多少個副本收到這條訊息,之後生產者才會認為這條訊息時寫入成功
的。retries :
生產者從伺服器收到的錯誤有可能是臨時性的錯誤(比如分割槽找不到首領)。在這種情況下,如果達到
了 retires 設定的次數,生產者會放棄重試並返回錯誤。預設情況下,生產者會在每次重試之間等待
100ms,可以通過 retry.backoff.ms 引數來修改這個時間間隔。
batch.size :
當有多個訊息要被傳送到同乙個分割槽時,生產者會把它們放在同乙個批次裡。該引數指定了乙個批次可
以使用的記憶體大小,按照位元組數計算,而不是訊息個數。當批次被填滿,批次裡的所有訊息會被傳送出
去。不過生產者並不一定都會等到批次被填滿才傳送,半滿的批次,甚至只包含乙個訊息的批次也可能
被傳送。所以就算把 batch.size 設定的很大,也不會造成延遲,只會占用更多的記憶體而已,如果設定
的太小,生產者會因為頻繁傳送訊息而增加一些額外的開銷。
max.request.size :
該引數用於控制生產者傳送的請求大小,它可以指定能傳送的單個訊息的最大值,也可以指單個請求裡
所有訊息的總大小。 broker 對可接收的訊息最大值也有自己的限制( message.max.size ),所以兩
邊的配置最好匹配,避免生產者傳送的訊息被 broker 拒絕。
linger.ms:批處理延遲時間上限
buffer.memory:批處理緩衝區
enable.idempotence:是否開啟冪等性
producerlistener類
訊息傳送後的**方法,注意的是,這裡的監聽回顯的資料時要傳送的資料,不是返回的資料,可以通過日誌來觀察傳送資料是否正確。
public class kafkasendresulthandler implements producerlistener
public void onerror(string topic, integer partition, object key, object value, exception e)
public boolean isinterestedinsuccess()
}
producerclient類
對kafkatemplate的再一次封裝,kafka在訊息傳送的時候傳送方式可以分為同步傳送和非同步傳送。
同步傳送:
同步傳送的意思就是,一條訊息傳送之後,會阻塞當前執行緒, 直至返回 ack。
//同步傳送
public void syncsend()
非同步傳送:
//非同步傳送
public void asyncsend()
@override
public void onfailure(throwable ex)
});}
producerclient對kafkatemplate的封裝(不帶事務)
這裡只封裝了最簡單的傳送方法,同時可對其他傳送方法進行封裝,只需要修改傳參即可。
public class producerclient else
}else
} catch (interruptedexception e) catch (executionexception e)
system.out.println("kafkaservers response : "+m);
}}
public class kafkamesconstant
測試一下public class excuter
}
控制台結果:(我這裡沒有使用日誌輸出,在實際開發中需要使用日誌開發)
producerlistener started
kafka message send successful : ---topic:topic2---partition:null---key:null---value:2019-11-19 02:57:07---recordmetadata:topic2-2@4928
kafkaservers response :
Kafka之生產者
1 方便在集群中擴充套件,乙個topic可以有多個partition組成,而每個partition可以通過調整以適應它所在的機器 2 可以提高併發,因為可以以partition為單位讀寫 我們需要將生產者傳送的資料封裝成乙個producerrecord物件。1 指明partition的情況下,直接將...
kafka 生產者(二)
想要提高生產者的吞吐量可以通過調整一下4個引數來實現 batch.size 批次大小,預設16k linger.ms 等待時間,修改為5 100ms recordaccumulator 緩衝區大小,修改為64m 實現 public class customproducerparameters 關閉資...
kafka生產者分割槽策略
kafka生產者 分割槽策略 分割槽的原因 1 方便在集群中擴充套件,每個partition可以通過調整以適應它所在的機器,而乙個topic又 可以有多個partition組成,因此整個集群就可以適應任意大小的資料了 2 可以提高併發,因為可以以partition為單位讀寫了。分割槽的原則 1 指明...