今天主要講的是nsq topic 的**實現,topic作為mq的重要概念,了解它的實現,對我們理解其他mq的topic,有很多的益處。
主要**檔案:
1.nsqd/topic.go
topic結構體
type topic struct
newtopic 函式 主要做三件事,一是例項化topic, 二是開啟messagepump goroutine 進行訊息處理,三是通知 nsqd 有新的 topic建立,讓 nsqd 上報 lookupd
func newtopic(topicname string, ctx *context, deletecallback func(*topic)) *topic
// 建立記憶體佇列
if ctx.nsqd.getopts().memqueuesize > 0
//ephemeral 有特殊的用途,暫時還不知道幹啥?
if strings.hassuffix(topicname, "#ephemeral") else
//messagepump 主要作用是,傳送msg給所有訂閱了這個 topic 下的 channel。(channelmap)
t.waitgroup.wrap(t.messagepump)
//通知 nsqd 有新的 topi c建立。
t.ctx.nsqd.notify(t)
return t
}
messagepump 函式,主要處理 topic/channel 的變動及發布訊息給 channel
func (t *topic) messagepump()
break
} t.rlock()
//收集訂閱的channel
for _, c := range t.channelmap
t.runlock()
if len(chans) > 0 && !t.ispaused()
// main message loop
for
//將 msg 傳送給所有訂閱的 channel
for i, channel := range chans
if chanmsg.deferred != 0
//傳送 msg 到 channel
err := channel.putmessage(chanmsg)
...} }
exit:
t.ctx.nsqd.logf(log_info, "topic(%s): closing ... messagepump", t.name)
}
putmessage/putmessages 函式都是將訊息傳送(put)到topic的佇列(記憶體/磁碟)中,流程基本相同,都是要累計訊息條數和累計訊息的位元組總數。
func (t *topic) putmessage(m *message) error
//傳送訊息
err := t.put(m)
if err != nil
atomic.adduint64(&t.messagecount, 1) //累計訊息條數
atomic.adduint64(&t.messagebytes, uint64(len(m.body))) //累計位元組總數
return nil
}
func (t *topic) put(m *message) error
return nil
}
其他函式:
delete/close: topic 退出結束
pause/unpause:topic 暫停/重啟
flush:將記憶體佇列的訊息,全部重新整理到磁碟進行持久化(exit 操作的時候)
總結:今天主要分析了topic的**實現,在這裡主要需要關注的是 topic 如何接收訊息(pub),又如何將訊息傳送給 channel(messagepump),最後需要關注的是什麼時候將訊息儲存在記憶體,什麼時候儲存在磁碟。
下次分享:channel的**實現
NSQ原始碼分析之概述
目前,看了nsqlookupd的 寫的真的很精美,我覺得 可以和redis相媲美,這等後續分析 時再詳說 關於nsq的特性,可以檢視nsq官網 這篇文章主要分析以下幾點 nsq提供了三大元件以及一些工具,三大元件為 nqsd nsq主要元件,用於儲存訊息以及分發訊息 nsqlookupd 用於管理n...
NSQ 原始碼分析之NSQD ProtocolV2
今天來說說nsqd.tcpserver中的核心函式ioloop的具體實現,ioloop主要的工作是接收和響應客戶的命令。同時開啟messagepump goroutine 進行心跳檢查,給訂閱者發生訊息等操作。詳細流程參考 中的邏輯流程圖。主要 檔案 1.nsqd protocol v2.go io...
NSQ 原始碼分析之NSQD Channel
今天主要講的是nsq channel 的 實現,channel 作為topic的重要組成部分,主要的作用是通過佇列的形式傳遞訊息,並等待訂閱者消費。主要 檔案 1.nsqd channel.go channel結構體 type channel structnewchannel 主要實現channel...