隨筆記錄zmq中的poller 用法

2021-08-10 02:21:45 字數 3585 閱讀 4071

乙個執行緒中如果有多個sokect,同時需要收發資料時,zmq提供polling sockets實現,不用在send()或者recv()時阻塞socket。

下面是乙個在recv()端接受資訊的poller()輪詢接受**。

#!/usr/bin/python

# -*- coding: utf-8

import zmq

import random

import time

from multiprocessing import process

def server_push(port="5556"):

context = zmq.context()

socket = context.socket(zmq.push)

socket.bind("tcp://*:%s" % port)

print "running server on port: ", port

# serves only 5 request and dies

for reqnum in range(10):

if reqnum < 6:

socket.send("continue")

else:

socket.send("exit")

break

time.sleep (1)

def server_pub(port="5558"):

context = zmq.context()

socket = context.socket(zmq.pub)

socket.bind("tcp://*:%s" % port)

publisher_id = random.randrange(0,9999)

print "running server on port: ", port

# serves only 5 request and dies

for reqnum in range(10):

# wait for next request from client

topic = random.randrange(8,10)

messagedata = "server#%s" % publisher_id

print "%s %s" % (topic, messagedata)

socket.send("%d %s" % (topic, messagedata))

time.sleep(1)

def client(port_push, port_sub):

context = zmq.context()

socket_pull = context.socket(zmq.pull)

socket_pull.connect ("tcp://localhost:%s" % port_push)

print "connected to server with port %s" % port_push

socket_sub = context.socket(zmq.sub)

socket_sub.connect ("tcp://localhost:%s" % port_sub)

socket_sub.setsockopt(zmq.subscribe, "9")

#zmq.subscribe建立乙個訊息過濾標誌,訂閱以9為字首的訊息

print "connected to publisher with port %s" % port_sub

# 初始化poller

poller = zmq.poller()

poller.register(socket_pull, zmq.pollin)

poller.register(socket_sub, zmq.pollin)

# work on requests from both server and publisher

#如果設定為pollin則重新整理recv,與之對應的是pollout重新整理send傳送事件,也可以同時設定兩個標誌

should_continue = true

while should_continue:

socks = dict(poller.poll())

if socket_pull in socks and socks[socket_pull] == zmq.pollin:

message = socket_pull.recv()

print "recieved control command: %s" % message

if message == "exit":

print "recieved exit command, client will stop recieving messages"

should_continue = false

if socket_sub in socks and socks[socket_sub] == zmq.pollin:

string = socket_sub.recv()

topic, messagedata = string.split()

#python split()通過指定分隔符對字串進行切片,如果引數num 有指定值,則僅分隔 num 個子字串

#str.split(str="", num=string.count(str)).

# str -- 分隔符,預設為所有的空字元,包括空格、換行(\n)、製表符(\t)等,num -- 分割次數

print "processing ... ", topic, messagedata

if __name__ == "__main__":

# now we can run a few servers

server_push_port = "5556"

server_pub_port = "5558"

process(target=server_push, args=(server_push_port,)).start()

process(target=server_pub, args=(server_pub_port,)).start()

process(target=client, args=(server_push_port,server_pub_port,)).start()

在poller中pollin和pollout的區別是pollin在recv()端,負責重新整理recv埠,來接受資訊,pollout在send()埠,負責重新整理send端,來傳送訊息。

flag (int, default=pollin|pollout) – 0mqpoll flags. if flag|pollin, recv events will be flushed. if flag|pollout, sendevents will be flushed. both flags can be set at once, which is the default.

這是程式執行結果。

rabbitMQ的隨筆記錄

rabbitmq windows 安裝好erlang 需要配置系統環境變數 erlang home erlang 安裝根目錄 path erlang home bin rabbitmq server home bin rabbitmq安裝完成開啟外掛程式 rabbitmq managemen是管理後...

hive的隨筆記錄

1,當hive的sql語句select有資料,但count顯示為0 答 這個主要是元資料裡沒有統計,所以取不出來統計後的條數,解決方法有兩種 set hive.compute.query.using.stats false 這個配置一般在hive site.xml有配置,不從元資料裡取值,重新計算,...

Android 隨筆記錄

安卓環境變數配置 系統變數 變數名 android home 變數值 d android android sdk windows path中增加 android home platform tools android home tools 返回 override public boolean onk...