# 低階api:
props.
put(
"group.id"
,"01");
# offset自動重置,offset可能因為快取刪除,序號不一定從0開始
props.
put(consumerconfig.auto_offset_reset_config,
"earliest");
# 高階api:
consumer.
seek
(new
topicpartition
(topic, partititon, offset)
);
消費者組id、topic和partition唯一確定乙個offset。
可以檢視_consumer_offsets這個topic裡的資料。
intercetpor的實現介面是org.apache.kafka.clients.producer.producerinterceptor。
onsend(producerrecord)方法在訊息被序列號及計算分割槽之前呼叫。
onacknowledgement(recordmetadata, exception)在訊息被應答或傳送失敗時呼叫。
// 配置檔案新增***鏈
// props.put(producerconfig.interceptor_classes_config, list);
public
class
interceptor
implements
producerinterceptor
@override
public
void
onacknowledgement
(recordmetadata metadata, exception exception)
else
}@override
public
void
close()
@override
public
void
configure
(map?> configs)
}
輕量級(功能性弱) ,實時性(非微批次處理,視窗允許亂序資料,允許資料遲到),一條條資料處理。
>
>
org.apache.kafkagroupid
>
>
kafka-streamsartifactid
>
>
$version
>
dependency
>
// kafka streams處理案例
public
class
streams
,"source").
addsink
("sink"
,"second"
,"processor");
// 建立配置檔案
properties props =
newproperties()
; props.
put(
"bootstrap.servers"
,"hadoop01:9092");
props.
put(
,"kafkastreams");
// 建立kafka streams物件
kafkastreams kafkastreams =
newkafkastreams
(topologybuilder, props)
;// 開啟流處理
kafkastreams.
start()
;}}// topologybuilder.addprocessor中的prossorsupplier返回的processor類這裡做定製
public
class
myprocessor
implements
processor
<
byte
,byte
>
// 處理邏輯
@override
public
void
process
(byte
bytes,
byte
bytes2)
@override
public
void
punctuate
(long l)
@override
public
void
close()
}
資料傳輸層。
flume:cloudera公司研發,適合多個生產者,適合下游資料消費者不多的情況(費記憶體),適合資料安全性要求不高的操作,適合與hadoop生態圈對接的操作。
kafka:linkedin公司研發,適合資料下游消費眾多的情況(快取資料跟消費者個數無關),適合資料安全性較高的操作(資料在磁碟,備份),支援relication。
多個agent後台資料,交由乙個agent彙總,對接kafka,離線/實時兩條線消費。
物件擴充套件內容
const function def es6可以簡潔如下表示,上下是一樣的 const function def 我們知道訪問屬性的方式 點運算子和中括號運算子,區別在於點運算子後面不可以是變數或者數字,而中括號卻可以!let obj obj.name 3 obj 1 kangkang obj ad...
如何擴充套件Kafka的broker
背景 因為公司收集終端盒子資料的kafka服務偶爾會倒,所以考慮使用kafka的分布式,增加broker節點,來提高系統的可用性。當然,zookeeper服務節點也是可以增加的,但不在本文範圍內。具體步驟如下 1.新加kafka服務,並啟動 如果是同一伺服器,則可以拷貝新建server.proper...
oldboyshell程式設計擴充套件內容
oldboyshell程式設計擴充套件內容 一 命令的優先順序 命令分為 alias compound commands function build in hash path error command not found 獲取乙個命令會按照上述優先順序取尋找,先找同名的alias命令,再找com...