目前,看了nsqlookupd的**,寫的真的很精美,我覺得**可以和redis相媲美,這等後續分析**時再詳說;關於nsq的特性,可以檢視nsq官網;這篇文章主要分析以下幾點:
nsq提供了三大元件以及一些工具,三大元件為:
nqsd nsq主要元件,用於儲存訊息以及分發訊息;
nsqlookupd 用於管理nsqd集群拓撲,提供查詢nsqd主機位址的服務以及服務最終一致性;
nsqadmin 用於管理以及檢視集群中的topic,channel,node等等;
對於單機版,只需要用到nsqd就夠了,但是單機會出現單點問題以及沒有監控,因此如果是線上環境,都會部署nsqlookupd,nsqadmin以及nsqd集群;這裡先給出我手繪的nsq拓撲圖:
nsq的拓撲結構和檔案系統的拓撲結構類似,有乙個中心節點來管理集群節點;我們從圖中可以看出:
nsqlookupd服務同時開啟tcp和http兩個監聽服務,nsqd會作為客戶端,連上nsqlookupd的tcp服務,並上報自己的topic和channel資訊,以及通過心跳機制判斷nsqd狀態;還有個http服務提供給nsqadmin獲取集群資訊;
nsqadmin只開啟http服務,其實就是乙個web服務,提供給客戶端查詢集群資訊;
nsqd也會同時開啟tcp和http服務,兩個服務都可以提供給生產者和消費者,http服務還提供給nsqadmin獲取該nsqd本地topic和channel資訊;
以上就是nsq集群服務整體拓撲資訊,下面來看下客戶端方面;nsq提供了多種語言的支援,比如go-nsq for golang,pynsq for python等;上述拓撲資訊,我也是看了pynsq才弄懂了客戶端是如何和nsq集群連線的;我們來看下:
生產者會同時連上nsq集群中所有nsqd節點,當然這些節點的位址是在writer初始化時,通過外界傳遞進去;當發布訊息時,writer會隨機選擇乙個nsqd節點發布某個topic的訊息;
消費者也會同時連上nsq集群中所有nsqd節點,reader首先會連上nsqlookupd,獲取集群中topic的所有producer,然後通過tcp連上所有producer節點,並在本地用tornado輪詢每個連線,當某個連線有可讀事件時,即有訊息達到,處理即可;
根據我自己的理解,說說nsq優點:
ok,分析了nsq集群整體的拓撲結構之後,我們來看下單個nsqd節點是如何處理訊息的,下面給出官網提供的**:
當向某個topic發布乙個訊息時,該訊息會被複製到所有的channel,如果channel只有乙個客戶端,那麼channel就將訊息投遞給這個客戶端;如果channel的客戶端不止乙個,那麼channel將把訊息隨機投遞給任何乙個客戶端,這也可以看做是客戶端的負載均衡;
pip3 install pynsq
然後需要在三個終端分別開啟nsqd,nsqlookupd和nsqadmin服務
三個服務開啟之後,我們就可以編寫生產者和消費者;生產者**如下:
importnsqimporttornado.ioloopimporttimedefpub_message():
writer.pub('test', time.strftime('%h:%m:%s').encode('utf-8'), finish_pub)deffinish_pub(conn, data):
print(data)
writer = nsq.writer(['127.0.0.1:4150'])
tornado.ioloop.periodiccallback(pub_message, 5000).start()
nsq.run()
消費者**如下:
importnsqdefhandler(message):
print(message.body)returntruer = nsq.reader(message_handler=handler,
nsqd_tcp_addresses=['127.0.0.1:4150'],
topic='test', channel='test', lookupd_poll_interval=15)
nsq.run()
這裡消費者,暫時連線到具體的nsqd例項,因為連線到nsqlookupd會報錯,根據出錯提示,應該是pynsq包對nsqlookupd返回的結果處理出錯;
這裡的生產者每隔5秒向'test' topic傳送時間字串,消費者可以得到這個時間字串,我們可以看下上述生產者和消費的輸出;
//生產者輸出,返回ok,表示訊息投遞成功
charles@charles-aspire-4741:~/mydir/pydir$ python3 nsq_producer.py
b'ok'
b'ok'
b'ok'
b'ok'
.....
//消費者輸出
charles@charles-aspire-4741:~/mydir/pydir$ python3 nsq_consume.py
b'19:36:11'
b'19:36:16'
b'19:36:21'
b'19:36:26'
b'19:36:31'
.....
上述就是python操作nsq最簡單的示例程式;
這篇文章分析了nsq的架構設計,並通過乙個簡單的例子說明了nsq如何使用;當然nsq還有很多配置引數,例如每隔訊息佇列的長度,以及記憶體使用上限等等;後續文章,將繼續分析nsqlookupd和nsqd的原始碼。
NSQ 原始碼分析之NSQD ProtocolV2
今天來說說nsqd.tcpserver中的核心函式ioloop的具體實現,ioloop主要的工作是接收和響應客戶的命令。同時開啟messagepump goroutine 進行心跳檢查,給訂閱者發生訊息等操作。詳細流程參考 中的邏輯流程圖。主要 檔案 1.nsqd protocol v2.go io...
NSQ 原始碼分析之NSQD Topic
今天主要講的是nsq topic 的 實現,topic作為mq的重要概念,了解它的實現,對我們理解其他mq的topic,有很多的益處。主要 檔案 1.nsqd topic.go topic結構體 type topic structnewtopic 函式 主要做三件事,一是例項化topic,二是開啟m...
NSQ 原始碼分析之NSQD Channel
今天主要講的是nsq channel 的 實現,channel 作為topic的重要組成部分,主要的作用是通過佇列的形式傳遞訊息,並等待訂閱者消費。主要 檔案 1.nsqd channel.go channel結構體 type channel structnewchannel 主要實現channel...