kafka 訊息傳送和接收

2021-09-11 02:07:09 字數 2643 閱讀 4783

傳送**例項

public class kafkaproducerdemo extends thread

@override

public void run()

}});

}else catch (interruptedexception e) catch (executionexception e)

}num++;

try catch (interruptedexception e) }}

public static void main(string args)

訊息消費**示例

public class kafkaconsumerdemo extends thread

@override

public void run() }}

public static void main(string args)

}

傳送端重要引數說明:

acks :表示 producer 傳送訊息到 broker 上以後的確認值。有三個可選項

0:表示 producer 不需要等待 broker 的訊息確認。這個選項時延最小但同

時風險最大(因為當 server 宕機時,資料將會丟失)。

1:表示 producer 只需要獲得 kafka 集群中的 leader 節點確認即可,這個

選擇時延較小同時確保了 leader 節點確認接收成功。

all(-1):需要 isr 中所有的 replica 給予接收確認,速度最慢,安全性最高,

但是由於 isr 可能會縮小到僅包含乙個 replica,所以設定引數為 all 並不能一

定避免資料丟失

batch.size

生產者傳送多個訊息到 broker 上的同乙個分割槽時,為了減少網路請求帶來的

效能開銷,通過批量的方式來提交訊息,可以通過這個引數來控制批量提交的

位元組數大小,預設大小是 16384byte,也就是 16kb,意味著當一批訊息大小達

到指定的 batch.size 的時候會統一傳送

linger.ms

producer 缺省會把兩次傳送時間間隔內收集到的所有 requests 進行一次聚合

然後再傳送,以此提高吞吐量,而 linger.ms 就是為每次傳送到 broker 的請求

增加一些 delay,以此來聚合更多的 message 請求。 這個有點像tcp 裡面的

nagle 演算法,在 tcp 協議的傳輸中,為了減少大量小資料報的傳送,採用了

nagle 演算法,也就是基於小包的等-停協議

batch.size 和 linger.ms 這兩個引數是 kafka 效能優化的關鍵引數,很多同

學會發現 batch.size 和 linger.ms 這兩者的作用是一樣的,如果兩個都配置

了,那麼怎麼工作的呢?實際上,當二者都配置的時候,只要滿足其中乙個要

求,就會傳送請求到 broker 上

max.request.size

設定請求的資料的最大位元組數,為了防止發生較大的資料報影響到吞吐量,默

認值為 1mb

消費端重要引數說明:

group.id

組 是 kafka 提供的可擴充套件且具有容錯性的消費者機制。乙個組內可以有多個消費者或消費者例項consumer instance),它們共享乙個公共的 id,即 group id。組內的所有消費者協調一起來消費訂閱主題(subscribed topics)的所有分割槽(partition)。每個分割槽只能由同乙個消費組內的乙個 consumer 來消費.如下圖所示,分別有三個消費者,屬於兩個不同的 group,那麼對於 firsttopic 這個 topic 來說,這兩個組的消費者都能同時消費這個 topic 中的訊息,此時,這個 firsttopic 就類似於 activemq 中的 topic 概念(一條訊息可以被多個組同時消費)。如右圖所示,如果 3 個消費者都屬於同乙個group,那麼此時 firsttopic 就是乙個 queue 的概念。

消費者消費訊息以後自動提交. 當訊息提交以後,該訊息才不會被再次接收到, 該配置可以配合 auto.commit.interval.ms 控制自動提交的頻率.也可以通過 consumer.commitsync()的方式實現手動提交。

auto.offset.reset

對於這個引數是針對新的 groupid 中的消費者而言的,當有新 groupid 的消費者來消費指定的 topic 時,對於該引數的配置,會有不同的意義

auto.offset.reset=latest 情況下,新的消費者將會從其他消費者最後消費的offset 處開始消費 topic 下的訊息

auto.offset.reset= earliest 情況下,新的消費者會從該 topic 最早的訊息開始消費

auto.offset.reset=none 情況下,新的消費者加入以後,由於之前不存在offset,則會直接丟擲異常

max.po ll.records

此設定限制每次呼叫 poll 返回的訊息數,這樣可以更容易的**每次 poll 間隔要處理的最大值。通過調整此值,可以減少 poll 間隔

通過kafka傳送和接收訊息

生產者配置類 configuration enablekafka public class kafkaproducerconfig private string address value private string batchsize value private string linger pu...

go 實現 kafka 訊息傳送 接收

kafka是訊息中介軟體的一種,是一種分布式流平台,是用於構建實時資料管道和流應用程式。具有橫向擴充套件,容錯,wicked fast 快 等優點。生產者 producer 將訊息記錄 record 傳送到kafka中的主題中 topic 乙個主題可以有多個分割槽 partition 訊息最終儲存在...

接收kafka訊息

kafka server 127.0.0.1 8081,127.0.0.1 8082,127.0.0.1 8083,127.0.0.1 8084 topics eseal hr test 2 gourp id hthr value string servers value string groupi...