// 建立乙個kafkaprodecer物件,傳入上面建立的properties物件
kafkaproducerproducer = new kafkaproducer(mykafkaprops);
/** * 使用prodecerrecord(string topic,string key,string value)建構函式建立訊息
* 建構函式接受三個引數:
* topic--告訴kafkaproducer訊息傳送到哪個topic;
* key--告訴kafkaproducer,所傳送訊息的key值,需要和配置檔案中的型別一直
* value--告訴kafkaproducer,訊息所傳送的value值,同上
*/producerrecordrecord = new producerrecord("mysecondtopic","messagekey1","hello kafka");
//立即傳送
producer.send(record);
//同步傳送
futurefuture = producer.send(record);
recordmetadata recordmetadata = future.get();
//非同步傳送
producer.send(record,new callback ()
else
}});
訊息的傳送方式是站在訊息生產者的角度巨集**其訊息資料傳送的情況的。而訊息資料是儲存在分割槽中的,而分割槽又有多個副本,所以一條訊息在被傳送到broker之後何時算投遞成功呢?kafka提供了三種模式。
生產者提供了自動重試機制,當從broker接收到的是臨時可恢復的異常時,生產者會向broker重發訊息,但不能無限制重發,如果重發次數達到限制值,生產者將不再重試並返回錯誤。這裡的限制值是由初始化生產者物件時的retries屬性決定的,在預設情況下生產者會在重試後等待100ms,可通過retry.backoff.ms屬性進行修改。建議在設定這兩個引數前測試節點恢復所用時間,重試時間要比節點恢復時間要長,否則生產者會過早地放棄重試動作。
生產者傳送多個訊息到同乙個分割槽的時候,為了減少網路帶來的系能開銷,kafka會對訊息進行批量傳送
kafka訊息傳送模式
在kafka 0.8.2之後,producer不再區分同步 sync 和非同步方式 async 所有的請求以非同步方式傳送,這樣提公升了客戶端效率。producer請求會返回乙個應答物件,包括偏移量或者錯誤信。這種非同步方地批量的傳送訊息到kafka broker節點,因而可以減少server端資源...
Kafka 傳送訊息流程
客戶端的幾個元件 一條訊息首先需要確定要被儲存到那個 partition 對應的雙端佇列上 其次,儲存訊息的雙端佇列是以批的維度儲存的,即 n 條訊息組成一批,一批訊息最多儲存 n 條,超過後則新建乙個組來儲存新訊息 其次,新來的訊息總是從左側寫入,即越靠左側的訊息產生的時間越晚 最後,只有當一批訊...
Kafka傳送訊息到HDFS
本文採用的是kafka0.7.2,安裝好kafka後在kafka的contrib目錄下有關於kafka與hadoop的一系列檔案,我們可以使用hadoop consumer目錄下的指令碼及配置檔案將kafka中某topic的資訊傳送到hdfs中。1.修改test目錄下的test.properties...