NSQ 原始碼分析之NSQD ProtocolV2

2021-10-03 20:56:10 字數 1813 閱讀 5693

今天來說說nsqd.tcpserver中的核心函式ioloop的具體實現,ioloop主要的工作是接收和響應客戶的命令。同時開啟messagepump goroutine 進行心跳檢查,給訂閱者發生訊息等操作。

詳細流程參考  中的邏輯流程圖。

主要**檔案:

1.nsqd/protocol_v2.go

ioloop函式

func (p *protocolv2) ioloop(conn net.conn) error  else 

//以\n 為分隔符作為一條命令的結束(協議)

line, err = client.reader.readslice('\n')

...//每條命令 以separatorbytes(這裡定義的是空格) 作為命令引數分解符(協議)

params := bytes.split(line, separatorbytes)

var response byte

//執行命令

response, err = p.exec(client, params)

if err != nil

//傳送響應給客戶端

if response != nil

} ....

}

messagepump函式

func (p *protocolv2) messagepump(client *clientv2, startedchan chan bool)  else if flushed  else 

select

case b := <-backendmsgchan: //消費磁碟channel的訊息,

....

case msg := <-memorymsgchan: //消費記憶體channel的訊息

if samplerate > 0 && rand.int31n(100) > samplerate

msg.attempts++ //嘗試重新消費的次數,如果超過這個次數,不應該再重新消費

//加入等待消費確認的佇列,如果超過msgtimeout,就把被重新被消費,加入這個隊裡。

subchannel.startinflighttimeout(msg, client.id, msgtimeout)

//增加總共傳送的訊息總數 和 待消費確認的訊息總數

client.sendingmessage()

//傳送訊息到客戶快取

err = p.sendmessage(client, msg)

if err != nil

flushed = false

case <-client.exitchan:

goto exit

} }...

}

exec函式

func (p *protocolv2) exec(client *clientv2, params byte) (byte, error) 

err := enforcetlspolicy(client, p, params[0]) //tls 加密

if err != nil

switch

return nil, protocol.newfatalclienterr(nil, "e_invalid", fmt.sprintf("invalid command %s", params[0]))

}

總結

1.ioloop 主要實現兩個迴圈處理,乙個是messagepump,主要是對心跳檢測,訂閱的消費的訊息輸出等。另乙個是對客戶端請求命令的處理exec。

NSQ原始碼分析之概述

目前,看了nsqlookupd的 寫的真的很精美,我覺得 可以和redis相媲美,這等後續分析 時再詳說 關於nsq的特性,可以檢視nsq官網 這篇文章主要分析以下幾點 nsq提供了三大元件以及一些工具,三大元件為 nqsd nsq主要元件,用於儲存訊息以及分發訊息 nsqlookupd 用於管理n...

NSQ 原始碼分析之NSQD Topic

今天主要講的是nsq topic 的 實現,topic作為mq的重要概念,了解它的實現,對我們理解其他mq的topic,有很多的益處。主要 檔案 1.nsqd topic.go topic結構體 type topic structnewtopic 函式 主要做三件事,一是例項化topic,二是開啟m...

NSQ 原始碼分析之NSQD Channel

今天主要講的是nsq channel 的 實現,channel 作為topic的重要組成部分,主要的作用是通過佇列的形式傳遞訊息,並等待訂閱者消費。主要 檔案 1.nsqd channel.go channel結構體 type channel structnewchannel 主要實現channel...