Kafka2 0消費者客戶端使用

2021-09-25 06:31:06 字數 4031 閱讀 1157

kafka 通過 kafkaconsumer 構造器初始化生產者客戶端的配置。

常用的重要配置,詳見官網。

group.id:消費組 id

key.serializer:實現了 kafka 序列化介面的類,用來序列化 key。

value.serializer:實現了 kafka 序列化介面的類,用來序列化 value。

enable.auto.commit:預設 true,表示消費者偏移量會定期提交到後台。

auto.offset.reset:kafka 的偏移量。

earliest:自動重置為最早的偏移量。

latest:自動重置為最新的偏移量。

none:如果沒有找到消費組之前的那個偏移量,則消費者丟擲異常。

其他:消費者丟擲異常。

fetch.min.bytes/fetch.max.bytes:消費者一次拉取的最小/最大值。

max.poll.interval.ms:消費者拉取的最大間隔時間,超時後從組中移除消費者。

heartbeat.interval.ms:心跳傳送間隔的超時時間,超時後從組中移除消費者。

isolation.level:事務的隔離級別。

read_uncommitted:預設,可以消費到所有訊息,包括被中止的訊息。

read_committed:只能消費到事務提交過的訊息。

非事務性訊息無條件返回。

// 基礎配置

mapconfigs = new hashmap<>();

configs.put(consumerconfig.bootstrap_servers_config, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");

configs.put(consumerconfig.group_id_config, "my_test");

configs.put(consumerconfig.enable_auto_commit_config, true);

configs.put(consumerconfig.auto_offset_reset_config, "earliest");

configs.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class);

configs.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class);

kafkaconsumerconsumer = new kafkaconsumer<>(configs);

kafka 消費者提供4種方式訂閱主題,1種方式指定分割槽。

// 指定主題

public void subscribe(collectiontopics, consumerrebalancelistener listener)

public void subscribe(collectiontopics)

public void subscribe(pattern pattern, consumerrebalancelistener listener)

public void subscribe(pattern pattern)

// 指定分割槽

public void assign(collectionpartitions)

kafkaconsumerconsumer = new kafkaconsumer<>(configs);

consumer.subscribe(collections.singletonlist("test")); // 指定主題

consumerrecordsrecords = consumer.poll(duration.ofseconds(3));

topicpartition tp = new topicpartition("test", 0);

consumer.assign(collections.singletonlist(tp)); // 訂閱指定分割槽

consumer.seek(tp, 4l); // 指定分割槽偏移量值為4

consumerrecordsrecords = consumer.poll(duration.ofseconds(3));

topicpartition tp = new topicpartition("test", 0);

consumer.assign(collections.singletonlist(tp)); // 訂閱指定分割槽

maptptime = new hashmap<>();

tptime.put(tp, 1563027475113l); // 指定時間戳

maptpoffsetandtime = consumer.offsetsfortimes(tptime);

long offset = tpoffsetandtime.get(tp).offset(); // 獲取偏移量

consumer.seek(tp, offset); // 指定偏移量

consumerrecordsrecords = consumer.poll(duration.ofseconds(3));

引數說明

public void commitsync()

public void commitsync(duration timeout)

public void commitsync(final mapoffsets)

public void commitsync(final mapoffsets, final duration timeout)

引數說明

public void commitasync()

public void commitasync(offsetcommitcallback callback)

public void commitasync(final mapoffsets, offsetcommitcallback callback)

// 獲取分配給當前消費者的分割槽集合

public setassignment()

// 取消訂閱

public void unsubscribe()

// 找到指定分割槽的第乙個偏移量

public void seektobeginning(collectionpartitions)

// 找到指定分割槽的最後乙個偏移量

public void seektoend(collectionpartitions)

// 獲取指定分割槽即將消費的下乙個偏移量

public long position(topicpartition partition)

// 獲取指定分割槽最後提交的偏移量

public offsetandmetadata committed(topicpartition partition)

// 獲取指定主題的分割槽列表

public listpartitionsfor(string topic)

// 獲取所有主題的資訊

public map> listtopics()

// 暫停消費

public void pause(collectionpartitions)

// 恢復被暫停的消費

public void resume(collectionpartitions)

// 獲取暫停的分割槽列表

public setpaused()

// 獲取指定分割槽第乙個偏移量

public mapbeginningoffsets(collectionpartitions)

// 獲取指定分割槽最後乙個偏移量

public mapendoffsets(collectionpartitions)

// 喚醒消費者

public void wakeup()

Kafka2 0消費者客戶端使用

kafka 通過 kafkaconsumer 構造器初始化生產者客戶端的配置。常用的重要配置,詳見官網。group.id 消費組 id key.serializer 實現了 kafka 序列化介面的類,用來序列化 key。value.serializer 實現了 kafka 序列化介面的類,用來序列...

kafka消費者客戶端重要引數

kafka消費者客戶端重要引數說明 bootstrap.serverskafka集群broker位址列表 key.deserializer訊息中key對應的反序列化類 value.deserializer訊息中value對應的反序列化類 group.id消費者所屬消費者組的唯一標識 client.i...

Kafka2 0生產者客戶端原始碼分析

初始化引數配置。初始化記錄累加器 recordaccumulator。初始化 kafka 連線 kafkaclient,發現集群的所有節點加入快取。初始化實現了 runnable 介面的 sender 物件,並在 iothread 中啟動執行緒。執行訊息 查詢 kafka 集群元資料 序列化 key...