kafka 資料不丟失

2021-08-21 18:26:29 字數 2153 閱讀 3897

設定引數

props.put("bootstrap.servers", "10.176.2.170:9092,10.176.1.97:9092,10.176.7.57:9092");

//producer用於壓縮資料的壓縮型別。預設是無壓縮

props.put("compression.type", "gzip");

//增加延遲

props.put("linger.ms", "50");

//這意味著leader需要等待所有備份都成功寫入日誌,這種策略會保證只要有乙個備份存活就不會丟失資料。這是最強的保證。

props.put("acks", "all");

props.put("batch.size","16384");

props.put("buffer.memory", "33554432");

//設定大於0的值將使客戶端重新傳送任何資料,一旦這些資料傳送失敗。注意,這些重試與客戶端接收到傳送錯誤時的重試沒有什麼不同。允許重試將潛在的改變資料的順序,如果這兩個訊息記錄都是傳送到同乙個partition,則第乙個訊息失敗第二個傳送成功,則第二條訊息會比第一條訊息出現要早。

props.put("retries ", 30);

props.put("reconnect.backoff.ms", 20000);

props.put("retry.backoff.ms", 20000);

props.put("transactional.id", "my-transactional-id");

props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");

props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");

while(i<100000)

else

}});

system.out.println("producer傳送第 "+i+" 資料:" + personinfo);

/*try catch (interruptedexception e) */

i++;

}很關鍵

producer.flush();

//列出topic的相關資訊

listpartitions = new arraylist() ;

partitions = producer.partitionsfor(topic);

for(partitioninfo p:partitions)

system.out.println("send message over.");

producer.close(100,timeunit.milliseconds);

所以,造成上面生產不成功的原因就是雖然呼叫了producer.send(),但是資料還沒來得及生產到 kafka 集群 主程式就掛掉了,於是資料就沒有生產到 kafka 集群中了~~ 

如果對效能要求不高的話,可以再producer.send()方法呼叫後再呼叫producer.flush()方法,該方法會將資料全部生產到kafka,否則就會阻塞。對於producer.flush()方法,原始碼原話如下:

"flush any accumulated records form the producer. blocks until all sends are complete."
但是這個方法有一點侷限性,就是對效能的影響有點大,這個是要注意的地方~

如果對效能要求比較高,同時也想把資料確切的生產到集群的話,推薦將linger.ms引數設定乙個比0大的值(預設是0),batch.size也可以設定一下(預設是16384),同時用producer.send(producerrecord, callback)來將資料生產到集群中,其中 callback 匿名內部類中的oncompletion()方法用來處理 「確認生產到集群」 的邏輯~~

Kafka重複消費,不丟失資料

kafka0.11.0.0版本正式支援精確一次處理語義exactly once semantic eos kafka冪等性參考 1 冪等producer 保證單個分割槽的只會傳送一次,不會出現重複訊息 2 事務 transation 保證原子性的寫入多個分割槽,即寫入到多個分割槽的訊息要麼全部成功,...

Kafka如何保證資料不丟失

kafka的ack機制 在kafka傳送資料的時候,每次傳送訊息都會有乙個確認反饋機制,確保訊息正常的能夠被收到,其中狀態有0,1,1。producer.type sync request.required.acks 1 producer.type async request.required.ac...

Kafka資料不丟失的策略權衡

一 會丟資料的情況 1 生產端 可通過 producer.type 來選擇傳送模式,預設為 producer.type sync 同步 非同步設定為 async 1 同步模式下 producer 在傳送訊息之後,在得到返回結果前阻塞。這是一種犧牲效能的辦法,而且對於不同的配置,效能的損失程度不同 可...