使用eventlet併發consumer指令碼:
eventlet.monkey_patch(all=true)
msg_per_queue = 50
queue_num = 10
rabbit_host = '10.23.54.150:5672'
class consumer():
def __init__(self, count):
self.queue_name = 'test_queue%d' % count
self.i=0
self.a=none
self.b=none
server =
self.conn = amqp.connection( server['host'],
userid=server['userid'],
password=server['password'],
ssl=server['ssl'])
self.ch = self.conn.channel()
self.ch.access_request('/data', active=true, read=true)
self.ch.exchange_declare(exchange='rabbit_test_concurrent',
type='topic', durable=true,
auto_delete=false)
self.ch.queue_declare(queue=self.queue_name, durable=true,
exclusive=false, auto_delete=false)
self.ch.queue_bind(queue=self.queue_name,
exchange='rabbit_test_concurrent',
routing_key=self.queue_name)
def showmsg(self, msg):
if self.i == 0:
self.a = time.time()
self.i = self.i + 1
msg.channel.basic_ack(msg.delivery_tag)
if self.i == msg_per_queue:
self.b =time.time()
msg.channel.basic_cancel(msg.consumer_tag)
if msg.body == 'quit':
msg.channel.basic_cancel(msg.consumer_tag)
def __call__(self):
self.ch.basic_consume(self.queue_name, callback=self.showmsg)
try:
while self.ch.callbacks:
self.ch.wait()
self.ch.close()
self.conn.close()
except keyboardinterrupt, e:
print 'hello'
raise e
if __name__ == '__main__':
consumers =
pool = eventlet.greenpool()
try:
for i in range(1, queue_num+1):
consumer = consumer(i)
pool.spawn_n(consumer)
pool.waitall()
except keyboardinterrupt :
msg_sum = sum(consumer.i for consumer in consumers)
print 'the sum of received
messages is %d' % msg_sum
exit()
a = min(consumer.a for consumer in consumers)
b = max(consumer.b for consumer in consumers)
if a and b:
print 'the time used is %f' % (b-a)
執行出現下面問題:
traceback (most recent call last):
file "consumer_concurrent_eventlet.py", line 68, in
consumer = consumer(i)
file "consumer_concurrent_eventlet.py", line 27, in __init__
ssl=server['ssl'])
file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/connection.py", line 138, in __init__
self._x_start_ok(d, login_method, login_response, locale)
file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/connection.py", line 719, in _x_start_ok
self._send_method((10, 11), args)
file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/abstract_channel.py", line 76, in _send_method
method_sig, args, content)
file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/method_framing.py", line 252, in write_method
self.dest.write_frame(1, channel, payload)
file "/usr/lib/python2.6/site-packages/amqplib-1.0.2-py2.6.egg/amqplib/client_0_8/transport.py", line 165, in write_frame
frame_type, channel, size, payload, 0xce))
file "/usr/lib/python2.6/site-packages/eventlet-0.13.0-py2.6.egg/eventlet/greenio.py", line 307, in sendall
tail = self.send(data, flags)
file "/usr/lib/python2.6/site-packages/eventlet-0.13.0-py2.6.egg/eventlet/greenio.py", line 293, in send
total_sent += fd.send(data[total_sent:], flags)
socket.error: [errno 104] connection reset by peer
將**進行如下修改,貌似可解決問題:
if __name__ == '__main__':
consumers =
pool = eventlet.greenpool()
try:
for i in range(1, queue_num+1):
consumer = consumer(i)
pool.spawn_n(consumer)
pool.waitall()
except keyboardinterrupt :
msg_sum = sum(consumer.i for consumer in consumers)
print 'the sum of received
messages is %d' % msg_sum
exit()
改為如下:
if __name__ == '__main__':
consumers =
pool = eventlet.greenpool()
try:
for i in range(1, queue_num+1):
consumer = consumer(i)
print '%d connection is open' % i
for consumer in consumers:
pool.spawn_n(consumer)
pool.waitall()
except keyboardinterrupt :
msg_sum = sum(consumer.i for consumer in consumers)
print 'the sum of received
messages is %d' % msg_sum
exit()
當發現問題時,第一反映是伺服器端可能出現了tcp連線數限制問題,覺得自己指令碼正常。通過列印標誌得到原來好多鏈結並沒有connection成功。
RabbitMQ的面試問題
為什麼使用訊息佇列啊?訊息佇列都有什麼優點和缺點?有點有上面說的解耦,非同步,削峰 缺點呢?因為插入了mq這個訊息中介軟體,如果mq訊息中介軟體掛掉了,那麼像上面說的解耦的情況,服務a的訊息給bcdef等伺服器就傳送不出去了,如果mq沒有掛掉bc都從a中獲取了訊息,修改了資料庫的資料,但是d服務的監...
RabbitMQ常見的面試問題
rabitmq是什麼?rabbitmq是一款使用erlang語言開發,基於amqp協議的訊息中介軟體.rabbitmq的應用場景是什麼?1.非同步通知 發簡訊,郵件的時候,採用非同步處理的方式,客戶無需等待通知結果 2.流量削鋒 在電商中大秒殺活動中,採用佇列長度來控制請求的數量,超過佇列的長度,則...
壓力測試問題
環境 兩台虛機配置,千m網絡卡 a 8cpu,32g記憶體 應用伺服器 b 2cpu,8g記憶體 資料庫 壓力測試資料 200使用者,每秒響應160 170次左右,cpu占用10 左右,記憶體占用穩定,始終無法提公升,網路使用率4 資料庫伺服器,cpu 2 記憶體占用穩定 資料庫一切正常,伺服器cp...