zmq pub-sub, push-pull模式沒有客服端服務端啟動先後順序的限制,與普通的socket通訊不一樣,必須先啟動服務端。
以下是測試程式,pub.py為服務端,sub.py客戶端。
pub.py
# coding: utf-8
import zmq
import time
import threading
import os
import stat
# 分類後的日誌的zmq的pub位址
log_type_pub_path = "ipc:///tmp/log_types.ipc"
# simulator 日誌的zmq的sub位址
log_sub_path = "ipc:///tmp/log_lator.ipc"
topic_list = ["lator", "att"]
def unlink_ipc(path):
index = path.rfind('ipc://')
if index < 0:
return
fpath = path[len('ipc://'):]
#if os.path.exists(fpath):
os.unlink(fpath)
def pub(pubaddr, topic):
context = zmq.context()
sock = context.socket(zmq.pub)
sock.set_hwm(100)
#unlink_ipc(pubaddr)
sock.bind(pubaddr)
counter = 1
os.chmod(pubaddr[len('ipc://'):], stat.s_irwxo + stat.s_irwxg + stat.s_irwxu)
zpath = sock.getsockopt(zmq.last_endpoint)
print zpath
while true:
messagedata = "this is msg fro topic one %s" % counter
print "%s %s" % (topic, messagedata)
sock.send("%s %s" % (topic, messagedata))
counter = counter + 1
time.sleep(1)
if __name__ == "__main__":
t1 = threading.thread(target=pub, args=(log_type_pub_path, "lator"))
t2 = threading.thread(target=pub, args=(log_sub_path, "att"))
t1.start()
t2.start()
t1.join()
t2.join()
sub.py
# coding: utf-8
import os
import zmq
from zmq.eventloop.ioloop import ioloop
from zmq.eventloop.zmqstream import zmqstream
# 分類後的日誌的zmq的pub位址
log_type_pub_path = "ipc:///tmp/log_types.ipc"
# simulator 日誌的zmq的sub位址
log_sub_path = "ipc:///tmp/log_lator.ipc"
topic_list = ["lator", "att"]
def unlink_ipc(path):
index = path.rfind('ipc://')
if index < 0:
return
fpath = path[len('ipc://'):]
if os.path.exists(fpath):
os.unlink(fpath)
def recv_func(msg):
print msg
def main2():
loop_instance = ioloop.instance()
ctx = zmq.context.instance()
sock = ctx.socket(zmq.sub)
sock.set_hwm(100)
sock.connect(log_type_pub_path)
sock.connect(log_sub_path)
for key in topic_list:
if isinstance(key, str):
sock.setsockopt(zmq.subscribe, key)
elif isinstance(key, unicode):
sock.setsockopt_string(zmq.subscribe, key)
else:
print("log_broker to set subscribe error:%s" % key)
sock = zmqstream(sock, loop_instance)
sock.on_recv(recv_func)
loop_instance.start()
if __name__ == "__main__":
main2()
ipc通訊之管道
首先 一 無名管道pipe 1,沒有名字的 2,半雙工 讀寫不能同時進行 3,通過直系親屬訪問繼承 4,管道缺省會阻塞 5,不能用lseek定位 6,操作沒有原子性 示例 include include include include include include include void sig...
程序間通訊之 IPC
有三種稱做xsi ipc的ipc 訊息佇列 message queues 訊號量 semaphores 以及共享記憶體 shared memory 每個核心中的ipc結構 訊息佇列,訊號量和共享儲存段 都用乙個非負整數的識別符號來加以引用。要向乙個佇列中傳送訊息或讀取訊息只需要知道其佇列識別符號即可...
IPC通訊之共享記憶體
共享記憶體就是使得多個程序可以訪問同一塊記憶體空間,是最快的可用 ipc形式。是針對其他通訊機制執行效率較低而設計的。往往與其它通訊機制,如訊號量結合使用,來達到程序間的同步及互斥。我們通過一張圖來表示這個關係 共享記憶體和訊息佇列,訊號量一樣都屬於xsi ipc。核心都為他們維護了一套資料結構 同...