今天來說說nsqd.tcpserver中的核心函式ioloop的具體實現,ioloop主要的工作是接收和響應客戶的命令。同時開啟messagepump goroutine 進行心跳檢查,給訂閱者發生訊息等操作。
詳細流程參考 中的邏輯流程圖。
主要**檔案:
1.nsqd/protocol_v2.go
ioloop函式
func (p *protocolv2) ioloop(conn net.conn) error else
//以\n 為分隔符作為一條命令的結束(協議)
line, err = client.reader.readslice('\n')
...//每條命令 以separatorbytes(這裡定義的是空格) 作為命令引數分解符(協議)
params := bytes.split(line, separatorbytes)
var response byte
//執行命令
response, err = p.exec(client, params)
if err != nil
//傳送響應給客戶端
if response != nil
} ....
}
messagepump函式
func (p *protocolv2) messagepump(client *clientv2, startedchan chan bool) else if flushed else
select
case b := <-backendmsgchan: //消費磁碟channel的訊息,
....
case msg := <-memorymsgchan: //消費記憶體channel的訊息
if samplerate > 0 && rand.int31n(100) > samplerate
msg.attempts++ //嘗試重新消費的次數,如果超過這個次數,不應該再重新消費
//加入等待消費確認的佇列,如果超過msgtimeout,就把被重新被消費,加入這個隊裡。
subchannel.startinflighttimeout(msg, client.id, msgtimeout)
//增加總共傳送的訊息總數 和 待消費確認的訊息總數
client.sendingmessage()
//傳送訊息到客戶快取
err = p.sendmessage(client, msg)
if err != nil
flushed = false
case <-client.exitchan:
goto exit
} }...
}
exec函式
func (p *protocolv2) exec(client *clientv2, params byte) (byte, error)
err := enforcetlspolicy(client, p, params[0]) //tls 加密
if err != nil
switch
return nil, protocol.newfatalclienterr(nil, "e_invalid", fmt.sprintf("invalid command %s", params[0]))
}
總結
1.ioloop 主要實現兩個迴圈處理,乙個是messagepump,主要是對心跳檢測,訂閱的消費的訊息輸出等。另乙個是對客戶端請求命令的處理exec。
NSQ原始碼分析之概述
目前,看了nsqlookupd的 寫的真的很精美,我覺得 可以和redis相媲美,這等後續分析 時再詳說 關於nsq的特性,可以檢視nsq官網 這篇文章主要分析以下幾點 nsq提供了三大元件以及一些工具,三大元件為 nqsd nsq主要元件,用於儲存訊息以及分發訊息 nsqlookupd 用於管理n...
NSQ 原始碼分析之NSQD Topic
今天主要講的是nsq topic 的 實現,topic作為mq的重要概念,了解它的實現,對我們理解其他mq的topic,有很多的益處。主要 檔案 1.nsqd topic.go topic結構體 type topic structnewtopic 函式 主要做三件事,一是例項化topic,二是開啟m...
NSQ 原始碼分析之NSQD Channel
今天主要講的是nsq channel 的 實現,channel 作為topic的重要組成部分,主要的作用是通過佇列的形式傳遞訊息,並等待訂閱者消費。主要 檔案 1.nsqd channel.go channel結構體 type channel structnewchannel 主要實現channel...