NSQ原始碼分析之概述

2021-08-20 17:43:10 字數 3314 閱讀 3188

目前,看了nsqlookupd的**,寫的真的很精美,我覺得**可以和redis相媲美,這等後續分析**時再詳說;關於nsq的特性,可以檢視nsq官網;這篇文章主要分析以下幾點:

nsq提供了三大元件以及一些工具,三大元件為:

nqsd nsq主要元件,用於儲存訊息以及分發訊息;

nsqlookupd 用於管理nsqd集群拓撲,提供查詢nsqd主機位址的服務以及服務最終一致性;

nsqadmin 用於管理以及檢視集群中的topic,channel,node等等;

對於單機版,只需要用到nsqd就夠了,但是單機會出現單點問題以及沒有監控,因此如果是線上環境,都會部署nsqlookupd,nsqadmin以及nsqd集群;這裡先給出我手繪的nsq拓撲圖:

nsq的拓撲結構和檔案系統的拓撲結構類似,有乙個中心節點來管理集群節點;我們從圖中可以看出:

nsqlookupd服務同時開啟tcp和http兩個監聽服務,nsqd會作為客戶端,連上nsqlookupd的tcp服務,並上報自己的topic和channel資訊,以及通過心跳機制判斷nsqd狀態;還有個http服務提供給nsqadmin獲取集群資訊;

nsqadmin只開啟http服務,其實就是乙個web服務,提供給客戶端查詢集群資訊;

nsqd也會同時開啟tcp和http服務,兩個服務都可以提供給生產者和消費者,http服務還提供給nsqadmin獲取該nsqd本地topic和channel資訊;

以上就是nsq集群服務整體拓撲資訊,下面來看下客戶端方面;nsq提供了多種語言的支援,比如go-nsq for golang,pynsq for python等;上述拓撲資訊,我也是看了pynsq才弄懂了客戶端是如何和nsq集群連線的;我們來看下:

生產者會同時連上nsq集群中所有nsqd節點,當然這些節點的位址是在writer初始化時,通過外界傳遞進去;當發布訊息時,writer會隨機選擇乙個nsqd節點發布某個topic的訊息;

消費者也會同時連上nsq集群中所有nsqd節點,reader首先會連上nsqlookupd,獲取集群中topic的所有producer,然後通過tcp連上所有producer節點,並在本地用tornado輪詢每個連線,當某個連線有可讀事件時,即有訊息達到,處理即可;

根據我自己的理解,說說nsq優點:

ok,分析了nsq集群整體的拓撲結構之後,我們來看下單個nsqd節點是如何處理訊息的,下面給出官網提供的**:

當向某個topic發布乙個訊息時,該訊息會被複製到所有的channel,如果channel只有乙個客戶端,那麼channel就將訊息投遞給這個客戶端;如果channel的客戶端不止乙個,那麼channel將把訊息隨機投遞給任何乙個客戶端,這也可以看做是客戶端的負載均衡;

pip3 install pynsq
然後需要在三個終端分別開啟nsqd,nsqlookupd和nsqadmin服務

三個服務開啟之後,我們就可以編寫生產者和消費者;生產者**如下:

importnsqimporttornado.ioloopimporttimedefpub_message():

writer.pub('test', time.strftime('%h:%m:%s').encode('utf-8'), finish_pub)deffinish_pub(conn, data):

print(data)

writer = nsq.writer(['127.0.0.1:4150'])

tornado.ioloop.periodiccallback(pub_message, 5000).start()

nsq.run()

消費者**如下:

importnsqdefhandler(message):

print(message.body)returntruer = nsq.reader(message_handler=handler,

nsqd_tcp_addresses=['127.0.0.1:4150'],

topic='test', channel='test', lookupd_poll_interval=15)

nsq.run()

這裡消費者,暫時連線到具體的nsqd例項,因為連線到nsqlookupd會報錯,根據出錯提示,應該是pynsq包對nsqlookupd返回的結果處理出錯;

這裡的生產者每隔5秒向'test' topic傳送時間字串,消費者可以得到這個時間字串,我們可以看下上述生產者和消費的輸出;

//生產者輸出,返回ok,表示訊息投遞成功

charles@charles-aspire-4741:~/mydir/pydir$ python3 nsq_producer.py

b'ok'

b'ok'

b'ok'

b'ok'

.....

//消費者輸出

charles@charles-aspire-4741:~/mydir/pydir$ python3 nsq_consume.py

b'19:36:11'

b'19:36:16'

b'19:36:21'

b'19:36:26'

b'19:36:31'

.....

上述就是python操作nsq最簡單的示例程式;

這篇文章分析了nsq的架構設計,並通過乙個簡單的例子說明了nsq如何使用;當然nsq還有很多配置引數,例如每隔訊息佇列的長度,以及記憶體使用上限等等;後續文章,將繼續分析nsqlookupd和nsqd的原始碼。

NSQ 原始碼分析之NSQD ProtocolV2

今天來說說nsqd.tcpserver中的核心函式ioloop的具體實現,ioloop主要的工作是接收和響應客戶的命令。同時開啟messagepump goroutine 進行心跳檢查,給訂閱者發生訊息等操作。詳細流程參考 中的邏輯流程圖。主要 檔案 1.nsqd protocol v2.go io...

NSQ 原始碼分析之NSQD Topic

今天主要講的是nsq topic 的 實現,topic作為mq的重要概念,了解它的實現,對我們理解其他mq的topic,有很多的益處。主要 檔案 1.nsqd topic.go topic結構體 type topic structnewtopic 函式 主要做三件事,一是例項化topic,二是開啟m...

NSQ 原始碼分析之NSQD Channel

今天主要講的是nsq channel 的 實現,channel 作為topic的重要組成部分,主要的作用是通過佇列的形式傳遞訊息,並等待訂閱者消費。主要 檔案 1.nsqd channel.go channel結構體 type channel structnewchannel 主要實現channel...