廢話不多說,我們先封裝乙個類。
redis_message_queue.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from redis import redis, connectionpool
class rmq(object):
def __init__(self, url, name):
# self.client = redis(host=url)
pool = connectionpool.from_url(url=url, decode_responses=true)
self.client = redis(connection_pool=pool)
self.queue_name = name
def publish(self, data):
""" 發布 """
self.client.publish(self.queue_name, data)
return true
def subscribe(self):
""" 訂閱 """
pub = self.client.pubsub()
pub.subscribe(self.queue_name)
return pub
def run_subscribe(self):
""" 啟動訂閱 """
pub = self.subscribe()
while true:
_, queue_name, message = pub.parse_response()
if _ == 'subscribe':
print('... 佇列啟動,開始接受訊息 ...')
continue
data =
print(data)
publish.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from redis_message_queue import rmq
rmq = rmq(url='redis:', name='main')
if __name__ == '__main__':
print(rmq.publish('nice!'))
執行結果:
(demo) macbook:zhangyi$ python publish.py
true
(demo) macbook: zhangyi$
subscribe.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from redis_message_queue import rmq
rmq = rmq(url='redis:', name='main')
if __name__ == '__main__':
rmq.run_subscribe()
執行結果:
(demo) macbook:zhangyi$ python subscribe.py
... 佇列啟動,開始接受訊息 ...
參考文章: python3實現CryptoJS AES加密演算法
from crypto.cipher import aes from binascii import b2a hex,a2b hex import base64 class aescrypt def init self,key self.key key.encode utf8 self.mode a...
Python3 實現選擇排序
選擇排序 selection sort 原理很簡單,就是依次在未排序資料段中選擇出乙個最小的數,然後將其排列在已排序資料段末端,直到整個資料段排序完成演算法結束。程式如下,第乙個迴圈依次縮小未排序資料段的長度,並且每次將最小值暫定為未排序中第一位索引。第二個迴圈依次將該最小值與未排序資料段作比較,選...
python3實現線性單元
理論知識見 直接上python3的 coding utf 8 import matplotlib.pyplot as plt from functools import reduce class perceptron object 初始化,輸入訓練數目,啟用函式 def init self,inpu...