業務層:
newconsumer(client.mbrokers, group, topic, config)cluster-comsuer.go
構造consumer:
consumer, err := sarama.newconsumerfromclient(client.client)
c.client.refreshcoordinator(groupid)//獲取組協調者起迴圈
for {
c.nexttick
nexttick:
重平衡->訂閱主題->fetchoffsets->針對該主題分割槽下的offset起乙個分割槽消費者->然後呼叫分割槽消費者的consumepartition
comsumer.go
consumepartition(topic string, partition int32, offset int64)->
c.refbrokerconsumer(leader)->
c.newbrokerconsumer(broker)->
go withrecover(bc.subscriptionconsumer)->
bc.fetchnewmessages()->
bc.broker.fetch(request)
broker.go
b.sendandreceive(request, response)->
versioneddecode(buf, res, req.version()) and res is
respons, response is fetchresponse type
encode_decoder.go
in.decode(&helper, version) fetchresponse type decode is:
fetch_repsonse.go
decode(pd packetdecoder, version int16)->
records.decode(recordsdecoder);
records.go
decode(pd packetdecoder)->
record_batch.go
decode(pd packetdecoder) include 解壓縮
生產消費者
producer consumer model include include define buffer size 100 緩衝區數量 define max seq 200 define n consumer 10 消費者數量 define n producer 3 生產者數量 define t ...
生產者消費者 生產者與消費者模式
一 什麼是生產者與消費者模式 其實生產者與消費者模式就是乙個多執行緒併發協作的模式,在這個模式中呢,一部分執行緒被用於去生產資料,另一部分執行緒去處理資料,於是便有了形象的生產者與消費者了。而為了更好的優化生產者與消費者的關係,便設立乙個緩衝區,也就相當於乙個資料倉儲,當生產者生產資料時鎖住倉庫,不...
生成者消費者問題
自己實踐的 生成者消費者問題 public class threaddemo 生產者 class producer implements runnable public void run 消費者 class consumer implements runnable public void run c...