nsq原始碼之生產者發布訊息

2021-09-13 18:19:57 字數 2681 閱讀 3099

研究完了消費者,那麼我們就要看看生產者是如何工作的

// producer 生產者

func producer()

i := 1

for

time.sleep(time.second * 5)

i++}}

首先通過nsq的newproducer的方法構建了乙個producer,注意在這裡傳入的adder是nsqd的位址,接下來就是呼叫producer的publish將我們要傳送的topic和訊息發布出去

func (w *producer) publish(topic string, body byte) error
呼叫publish方法,將topic和要發布的訊息封裝程command:

func publish(topic string, body byte) *command 

return &command

}

func (w *producer) sendcommand(cmd *command) error 

t := <-donechan

return t.error

}

首先建立了乙個chan用來接收發布的結果,將這個chan和cmd一起傳入sendcommandasync方法進行訊息的傳送

func (w *producer) sendcommandasync(cmd *command, donechan chan *producertransaction,

args inte***ce{}) error

} t := &producertransaction

select

return nil

}

記錄正在傳送的訊息的數目,之後檢查produer是否和nsqd建立了連線,沒有的話先建立連線,最後將傳入的引數封裝成producertransaction放到transactionchan 中等待被傳送

func (w *producer) connect() error 

switch state := atomic.loadint32(&w.state); state

w.log(loglevelinfo, "(%s) connecting to nsqd", w.addr)

logger, loglvl := w.getlogger()

w.conn = newconn(w.addr, &w.config, &producerconndelegate)

w.conn.setlogger(logger, loglvl, fmt.sprintf("%3d (%%s)", w.id))

_, err := w.conn.connect()

if err != nil

atomic.storeint32(&w.state, stateconnected)

w.closechan = make(chan int)

w.wg.add(1)

go w.router()

return nil

}

在connect中,先檢查producer的連線狀態是否可以進行連線,依舊是通過newconn方法建立conn,並且將producer進行了委託包裝,建立成功後建立連線,最後啟動了乙個goroutine。

連線方法w.conn.connect()和consumer的是一樣的,就是在建立完連線後會啟動兩個goroutine分別用來讀和寫。我們重點看一下router方法:

func (w *producer) router() 

case data := <-w.responsechan:

w.poptransaction(frametyperesponse, data)

case data := <-w.errorchan:

w.poptransaction(frametypeerror, data)

case <-w.closechan:

goto exit

case <-w.exitchan:

goto exit

} }exit:

w.transactioncleanup()

w.wg.done()

w.log(loglevelinfo, "exiting router")

}

在這個方法裡就是監聽多個chan,分別是:是否有需要傳送的訊息,是否有收到的響應,是否有錯誤,是否有退出訊息

當成功或者失敗發布訊息的時候,data都會得到資料並呼叫poptransaction方法

func (w *producer) poptransaction(frametype int32, data byte) 

} t.finish()

}

首先獲取第乙個transactions中的元素,如果是錯誤的響應,那麼給他的error上設定錯誤資訊,最後呼叫finish方法

func (t *producertransaction) finish() 

}

就是向傳送訊息是建立的donechan中傳入傳送結果,那麼使用者就可以通過donechan知道訊息是否傳送成功了。

這就是生產者發布訊息的大體流程

Kafka生產者原始碼解析

1.生產者客戶端如何獲取要生產資料的topic元資料 2.生產者如何組裝訊息 3.生產者組裝好訊息後是直接傳送到broker端嗎?4.訊息是如何傳送到broker端的呢?5.生產者中,如果配置了訊息壓縮策略,同時服務端也配置了壓縮策略,並且兩個地方設定的策略不同,那採用哪個策略呢?6.如果多個客戶端...

kafka原始碼 Kafka生產者原始碼分析

我們kafka原始碼的doc文件拿下來乙個demo,這個demo用於向服務端傳送訊息,從這個例子我們可以知道傳送訊息是通過生產者的kafkaproducer來完成的,這一篇文章將先完成kafkaproducer的分析 properties props new properties props.put...

kafka原始碼分析 生產者 消費者

kafka 2.5 kafka測試 位址 producer consumer 因為是單執行緒模型,當處理接收到的返回訊息時是不能傳送心跳 執行緒不安全 乙個partition只能分配給乙個consumer,乙個consumer可以處理多個partition 新版本的將kafka consumer的消...