最近專案中接觸到zeromq, 內部實現挺複雜的,沒時間深入了解,簡單記錄下使用方法吧,有時間會來填坑。 官方指導文件
專案主要用zeromq在多個ip主機上的服務間進行專案通訊,直接用scoket也可以實現,但比較費時費力,zeromq建立在socket的基礎上,提供了一套更加簡單強大的api,可以快速搭建起跨程序,跨ip等的通訊網路。很多文章中都提到了socket只能實現一對一的通訊,zeromq可以實現多對多的連線,而且有三種模式供選擇,可以根據業務需要,進行選擇和使用。
zeromq的三種通訊模式分別是:request-reply, publisher-subscriber, parallel pipeline
python安裝zmq模組:pip install pyzmq
1. request-reply(應答模式)
應答模式特點:
1. 客戶端提出請求,服務端必須回答請求,每個請求只回答一次
2. 客戶端沒有收到答覆前,不能再次進行請求
3. 可以有多個客戶端提出請求,服務端能保證各個客戶端只接收到自己的答覆
4. 如果服務端斷掉或者客戶端斷掉會產生怎樣的影響?
如果是客戶端斷掉,對服務端沒有任何影響,如果客戶端隨後又重新啟動,那麼兩方繼續一問一答,但是如果是服務端斷掉了,就可能會產生一些問題,這要看服務端是在什麼情況下斷掉的,如果服務端收是在回答完問題後斷掉的,那麼沒影響,重啟服務端後,雙發繼續一問一答,但如果服務端是在收到問題後斷掉了,還沒來得及回答問題,這就有問題了,那個提問的客戶端遲遲得不到答案,就會一直等待答案,因此不會再傳送新的提問,服務端重啟後,客戶端遲遲不發問題,所以也就一直等待提問。
python 實現客戶端和服務端**如下:
zmq_server.py
import zmq
context=zmq.context() #建立上下文
socket=context.socket(zmq.rep) #建立response服務端socket
socket.bind("tcp://*:5555") #socket繫結,*表示本機ip,埠號為5555,採用tcp協議通訊whiletrue:
message=socket.recv()
print(type(message)) #接收到的訊息也會bytes型別(位元組)
print("收到訊息:{}".format(message))
socket.send(b"new message") #傳送訊息,位元組碼訊息
zmq_client.py
#coding:utf-8import zmq
context=zmq.context()
socket=context.socket(zmq.req)
socket.connect("tcp://localhost:5555")
socket.send(b"a message")
response=socket.recv()
print(response)
常用資料傳送api如下:
#傳送資料
socket.send_json(data) #data 會被json序列化後進行傳輸 (json.dumps)
socket.send_string(data, encoding="utf-8") #data為unicode字串,會進行編碼成子節再傳輸
socket.send_pyobj(obj) #obj為python物件,採用pickle進行序列化後傳輸
socket.send_multipart(msg_parts) # msg_parts, 傳送多條訊息組成的迭代器序列,每條訊息是子節型別,
# 如[b"message1", b"message2", b"message2"]
#接收資料
socket.recv_json()
socket.recv_string()
socket.recv_pyobj()
socket.recv_multipart()
2. publisher-subscriber (發布-訂閱模式)
publiser廣播訊息到所有客戶端,客戶端根據訂閱主題過濾訊息
python實現**如下, 其中publisher發布兩條訊息,第一條訊息的topic為client1, 被第乙個subscriber接收到;第二條訊息的topic為client2, 被第二個subscriber接收到。
注意的是subscriber在匹配時,並不是完全匹配的,訊息的topic為client1開頭的字串都會被匹配到,如果topic為"client1cient2", 也會被第乙個subscriber接收到
zmq_server.py
#coding:utf-8import zmq
context=zmq.context()
socket=context.socket(zmq.pub)
socket.bind("tcp://*:5555")
topic= ["client1", "client2"]whiletrue:for t intopic:
data= "message for {}".format(t)
msg= [t.encode("utf-8"), data.encode("utf-8")] #列表中的第一項作為訊息的topic,sub根據topic過濾訊息
print(msg)
socket.send_multipart(msg)
zmq_client1.py
#coding:utf-8import zmq
context=zmq.context()
socket=context.socket(zmq.sub)
socket.subscribe("client1") #訂閱主題topic為:client1
socket.connect("tcp://localhost:5555")
msg=socket.recv_multipart()
print(msg)
結果:zmq_client2.py
import zmq
context = zmq.context()
socket = context.socket(zmq.sub)
socket.subscribe("client2") #訂閱主題topic為:client2
socket.connect("tcp://localhost:5555")
msg = socket.recv_multipart()
print(msg)
結果:3. parallel pipeline(並行管道模式)
管道模式有三部分組成,如下圖所示,最左邊的producer通過push產生任務, 中間的consumer接收任務處理後**,最後result collector接收所有任務的結果。 相比於publisher-subscriber,多了乙個資料快取和處理負載的部分,當連線斷開,資料不會丟失,重連後資料繼續傳送到客戶端。
python實現producer, consumer, resultcollector
producer.py
import zmq
context=zmq.context()
socket=context.socket(zmq.push)
socket.bind("tcp://*:5577")for num in range(2000):
work_message=
socket.send_json(work_message)
consumer.py
import random
import zmq
context=zmq.context()
consumer_id= random.randint(1, 1000)
#接收工作
consumer_receiver=context.socket(zmq.pull)
consumer_receiver.connect("tcp://localhost:5577")
#**結果
consumer_sender=context.socket(zmq.push)
consumer_sender.bind("tcp://*:5578")whiletrue:
msg=consumer_receiver.recv_json()
data= msg["num"]
result=
consumer_sender.send_json(result)
resultcollector.py
#coding:utf-8import zmq
context=zmq.context()
result_receiver=context.socket(zmq.pull)
result_receiver.connect("tcp://localhost:5578")
result=result_receiver.recv_json()
collecter_data={}for x in range(1000):if result['consumer_id'] incollecter_data:
collecter_data[result['consumer_id']] = collecter_data[result['consumer_id']] + 1
else:
collecter_data[result['consumer_id']] = 1
if x == 999:
print(collecter_data)
執行順序:
python producer.py
python consumer.py
python resultcollector.py
程式設計實現基於tcp的socket程式設計
server端 public class server socket.shutdowninput 關閉輸入流 4 獲取輸出流,響應客戶端的請求 outputstream os socket.getoutputstream printwriter pw new printwriter os 包裝為列印...
用C 實現基於用C 實現基於TCP協議的網路通訊
tcp 協議是乙個基本的網路 協議,基本上所有的網路服務都是基於 tcp協議的,如http,ftp等等,所以要了解網路程式設計就必須了解基於 tcp協議的程式設計。然而 tcp協議是乙個龐雜的體系,要徹底的弄清楚它的實現不是一天兩天的功夫,所幸的是在.net framework環境下,我們不必要去追...
TCP是怎麼實現可靠傳輸的
tcp協議傳輸的特點主要是面向位元組流 傳輸可靠 面向連線。答 tcp協議保證資料傳輸可靠性的方式主要有 序列號 tcp傳輸時將每個位元組的資料都進行了編號,即序列號。確認應答 tcp傳輸過程中,每次接收方收到資料後,都會對傳輸方進行確認應答。也就是傳送ack報文。這個ack報文當中帶有對應的確認序...