Python3 實現 Redis 訊息佇列

2021-10-22 13:24:53 字數 1738 閱讀 2943

廢話不多說,我們先封裝乙個類。

redis_message_queue.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

from redis import redis, connectionpool

class rmq(object):

def __init__(self, url, name):

# self.client = redis(host=url)

pool = connectionpool.from_url(url=url, decode_responses=true)

self.client = redis(connection_pool=pool)

self.queue_name = name

def publish(self, data):

""" 發布 """

self.client.publish(self.queue_name, data)

return true

def subscribe(self):

""" 訂閱 """

pub = self.client.pubsub()

pub.subscribe(self.queue_name)

return pub

def run_subscribe(self):

""" 啟動訂閱 """

pub = self.subscribe()

while true:

_, queue_name, message = pub.parse_response()

if _ == 'subscribe':

print('... 佇列啟動,開始接受訊息 ...')

continue

data =

print(data)

publish.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

from redis_message_queue import rmq

rmq = rmq(url='redis:', name='main')

if __name__ == '__main__':

print(rmq.publish('nice!'))

執行結果:

(demo) macbook:zhangyi$ python publish.py 

true

(demo) macbook: zhangyi$

subscribe.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

from redis_message_queue import rmq

rmq = rmq(url='redis:', name='main')

if __name__ == '__main__':

rmq.run_subscribe()

執行結果:

(demo) macbook:zhangyi$ python subscribe.py 

... 佇列啟動,開始接受訊息 ...

參考文章:

python3實現CryptoJS AES加密演算法

from crypto.cipher import aes from binascii import b2a hex,a2b hex import base64 class aescrypt def init self,key self.key key.encode utf8 self.mode a...

Python3 實現選擇排序

選擇排序 selection sort 原理很簡單,就是依次在未排序資料段中選擇出乙個最小的數,然後將其排列在已排序資料段末端,直到整個資料段排序完成演算法結束。程式如下,第乙個迴圈依次縮小未排序資料段的長度,並且每次將最小值暫定為未排序中第一位索引。第二個迴圈依次將該最小值與未排序資料段作比較,選...

python3實現線性單元

理論知識見 直接上python3的 coding utf 8 import matplotlib.pyplot as plt from functools import reduce class perceptron object 初始化,輸入訓練數目,啟用函式 def init self,inpu...