實現使用低階api讀取指定topic,指定partition,指定offset的資料。
1)消費者使用低階api 的主要步驟:
步驟主要工作
1根據指定的分割槽從主題元資料中找到主副本
2獲取分割槽最新的消費進度
3從主副本拉取分割槽的訊息
4識別主副本的變化,重試
指定分割槽,指定offset
(1)根據partition找到leader
(2)找到offset
(3)找到leader中的副本,進行儲存,方便識別主副本的變化
2)方法描述:
findleader()
客戶端向種子節點傳送主題元資料,將副本集加入備用節點
getlastoffset()
消費者客戶端傳送偏移量請求,獲取分割槽最近的偏移量
run()
消費者低階ap i拉取訊息的主要方法
findnewleader()
當分割槽的主副本節點發生故障,客戶將要找出新的主副本
public class ******example
public static void main(string args) catch (exception e)
}public void run(long a_maxreads, string a_topic, int a_partition, lista_seedbrokers, int a_port) throws exception
if (metadata.leader() == null)
string leadbroker = metadata.leader().host();
string clientname = "client_" + a_topic + "_" + a_partition;
//獲取資料的消費者物件
******consumer consumer = new ******consumer(leadbroker, a_port, 100000, 64 * 1024, clientname);
long readoffset = getlastoffset(consumer, a_topic, a_partition, kafka.api.offsetrequest.earliesttime(), clientname);
int numerrors = 0;
while (a_maxreads > 0)
//可以有多個addfetch方法,也就是這裡可以傳入多個topic和partition
fetchrequest req = new fetchrequestbuilder().clientid(clientname).addfetch(a_topic, a_partition, readoffset, 100000).build();
fetchresponse fetchresponse = consumer.fetch(req);
if (fetchresponse.haserror())
consumer.close();
consumer = null;
leadbroker = findnewleader(leadbroker, a_topic, a_partition, a_port);
continue;
}numerrors = 0;
long numread = 0;
for (messageandoffset messageandoffset :fetchresponse.messageset(a_topic, a_partition))
readoffset = messageandoffset.nextoffset();
bytebuffer payload = messageandoffset.message().payload();
byte bytes = new byte[payload.limit()];
payload.get(bytes);
system.out.println(string.valueof(messageandoffset.offset()) + ": " + new string(bytes, "utf-8"));
numread++;
a_maxreads--;
}if (numread == 0) catch (interruptedexception ie) }}
if (consumer != null)
consumer.close();
}public static long getlastoffset(******consumer consumer, string topic, int partition, long whichtime, string clientname)
long offsets = response.offsets(topic, partition);
return offsets[0];
}//當主leader掛了後,只需要從follower獲取變成leader的主機
private string findnewleader(string a_oldleader, string a_topic, int a_partition, int a_port)throws exception else if (metadata.leader() == null) else if (a_oldleader.equalsignorecase(metadata.leader().host()) && i == 0) else
if (gotosleep)
}system.out.println("unable to find new leader after broker failure. exiting");
throw new exception("unable to find new leader after broker failure. exiting");
}private partitionmetadatafindleader(lista_seedbrokers, int a_port, string a_topic, int a_partition) }}
} catch (exception e) finally
}if (returnmetadata != null)
}return returnmetadata;}}
kafka消費者無法消費異常
今天被乙個kafka消費異常折磨了一天,頭差點炸了,還好最後解決了它 異常 伺服器 record is corrupt 記錄損壞 不明原因 有可能磁碟空間不足導致 導致消費者無法正常消費訊息 卡在某乙個offset 不能繼續消費 解決辦法 先停掉消費者程式 殺掉程序 不可關閉kafka服務 然後手動...
kafka 主動消費 Kafka消費者的使用和原理
publicstaticvoidmain string args finally 前兩步和生產者類似,配置引數然後根據引數建立例項,區別在於消費者使用的是反序列化器,以及多了乙個必填引數 group.id,用於指定消費者所屬的消費組。關於消費組的概念在 kafka中的基本概念 中介紹過了,消費組使得...
010 消費者消費訊息API
一 概述 在前面講述api的時候,沒有說明消費者的api,本次在這裡需要重點的進行說明,二 建立消費者 我們使用現在推薦使用的方式來建立乙個消費者.下面展示建立乙個消費者具體的 現在推薦使用的就是建立乙個defaultconsumer的子類,重寫其中對應的方法,這是一種面向事件的程式設計模型.con...