#coding=utf-8import pika
import time
connection = pika.blockingconnection(pika.connectionparameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hellow',#這裡為什麼會重新宣告乙個佇列,是因為不確定是客戶端先啟動還是服務端先啟動,所以先確認佇列存在
durable=true) #防止rabbitmq伺服器掛掉資料丟失了,佇列持久化,durable只儲存了佇列,不儲存隊裡裡面的資料
def callback(ch,method,properties,body): #**函式
print '收到訊息:',body
time.sleep(30)
print '訊息處理完畢:', body
channel.basic_consume(callback,queue='hellow',
#no_ack=true 需要客戶端確認,如果正在處理訊息的時候客戶端掛掉就會轉到下乙個客戶端,會等待訊息完整的處理完
)#如果收到訊息就呼叫callback來處理訊息
print '等待接收訊息。。。。。'
channel.start_consuming() #迴圈持續執行下去
#訊息持久化
已經定義的佇列,再次定義是無效的,這就是冪次原理。rabbitmq不允許重新定義乙個已有的佇列資訊,也就是說不允許修改已經存在的佇列的引數。如果非要這樣做,只會返回異常。
生產者 消費者例項
package com.rx.thread.produce.consume public class product public void setid string id public product string id public string tostring public static v...
Kafka消費者生產者例項
它允許發布和訂閱記錄流,類似於訊息佇列或企業訊息傳遞系統。它可以容錯的方式儲存記錄流。它可以處理記錄發生時的流。由於主要介紹如何使用kafka快速構建生產者消費者例項,所以不會涉及kafka內部的原理。乙個基於kafka的生產者消費者過程通常是這樣的 來自官網 cd kafka 2.11 0.11....
Kafka消費者生產者例項
2017年07月30日18 22 56 rhwayfunn 閱讀數 13818標籤 kafka 更多 個人分類 分布式系統 為了更為直觀展示卡夫卡的訊息生產消費的過程,我會從基於控制台和基於應用兩個方面介紹使用例項.kafka是乙個分布式流處理平台,具體來說有三層含義 它允許發布和訂閱記錄流,類似於...