設定引數
props.put("bootstrap.servers", "10.176.2.170:9092,10.176.1.97:9092,10.176.7.57:9092");
//producer用於壓縮資料的壓縮型別。預設是無壓縮
props.put("compression.type", "gzip");
//增加延遲
props.put("linger.ms", "50");
//這意味著leader需要等待所有備份都成功寫入日誌,這種策略會保證只要有乙個備份存活就不會丟失資料。這是最強的保證。
props.put("acks", "all");
props.put("batch.size","16384");
props.put("buffer.memory", "33554432");
//設定大於0的值將使客戶端重新傳送任何資料,一旦這些資料傳送失敗。注意,這些重試與客戶端接收到傳送錯誤時的重試沒有什麼不同。允許重試將潛在的改變資料的順序,如果這兩個訊息記錄都是傳送到同乙個partition,則第乙個訊息失敗第二個傳送成功,則第二條訊息會比第一條訊息出現要早。
props.put("retries ", 30);
props.put("reconnect.backoff.ms", 20000);
props.put("retry.backoff.ms", 20000);
props.put("transactional.id", "my-transactional-id");
props.put("key.serializer", "org.apache.kafka.common.serialization.stringserializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.stringserializer");
while(i<100000)
else
}});
system.out.println("producer傳送第 "+i+" 資料:" + personinfo);
/*try catch (interruptedexception e) */
i++;
}很關鍵
producer.flush();
//列出topic的相關資訊
listpartitions = new arraylist() ;
partitions = producer.partitionsfor(topic);
for(partitioninfo p:partitions)
system.out.println("send message over.");
producer.close(100,timeunit.milliseconds);
所以,造成上面生產不成功的原因就是雖然呼叫了producer.send()
,但是資料還沒來得及生產到 kafka 集群 主程式就掛掉了,於是資料就沒有生產到 kafka 集群中了~~
如果對效能要求不高的話,可以再producer.send()
方法呼叫後再呼叫producer.flush()
方法,該方法會將資料全部生產到kafka,否則就會阻塞。對於producer.flush()
方法,原始碼原話如下:
"flush any accumulated records form the producer. blocks until all sends are complete."
但是這個方法有一點侷限性,就是對效能的影響有點大,這個是要注意的地方~
如果對效能要求比較高,同時也想把資料確切的生產到集群的話,推薦將linger.ms
引數設定乙個比0
大的值(預設是0
),batch.size
也可以設定一下(預設是16384),同時用producer.send(producerrecord, callback)
來將資料生產到集群中,其中 callback 匿名內部類中的oncompletion()
方法用來處理 「確認生產到集群」 的邏輯~~
Kafka重複消費,不丟失資料
kafka0.11.0.0版本正式支援精確一次處理語義exactly once semantic eos kafka冪等性參考 1 冪等producer 保證單個分割槽的只會傳送一次,不會出現重複訊息 2 事務 transation 保證原子性的寫入多個分割槽,即寫入到多個分割槽的訊息要麼全部成功,...
Kafka如何保證資料不丟失
kafka的ack機制 在kafka傳送資料的時候,每次傳送訊息都會有乙個確認反饋機制,確保訊息正常的能夠被收到,其中狀態有0,1,1。producer.type sync request.required.acks 1 producer.type async request.required.ac...
Kafka資料不丟失的策略權衡
一 會丟資料的情況 1 生產端 可通過 producer.type 來選擇傳送模式,預設為 producer.type sync 同步 非同步設定為 async 1 同步模式下 producer 在傳送訊息之後,在得到返回結果前阻塞。這是一種犧牲效能的辦法,而且對於不同的配置,效能的損失程度不同 可...