python3 非同步訊息佇列 RQ 處理

2021-10-10 17:13:21 字數 2992 閱讀 6907

rqredis queue的縮寫, 乙個基於redis的簡單、輕量的非同步訊息佇列工具。

如果在**中使用者發起乙個用時很久的請求,如果用同步的方式,伺服器就會返回超時。這時候就需要用非同步請求,使用者發起請求後,服務端把作業扔給另乙個程序去執行,然後立刻返回給使用者,使用者再通過輪詢或者其他方式來獲取作業的執行進度和執行結果。

rqworker作用就相當於啟動乙個新的守護程序,監聽到有新的任務進來就會執行,而使用者發起的請求則立即返回。

worker

任務監聽,守護程序。

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

from redis import redis

from rq import worker, connection

queues = ['default']

redis_url = 'redis:'

redis_connection = redis.from_url(redis_url)

def run_worker():

with connection(redis_connection):

worker = worker(queues)

worker.work()

if __name__ == '__main__':

run_worker()

send

傳送訊息佇列。

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

from rq import queue

from task import task

from redis import redis

redis_url = 'redis:'

redis_connection = redis.from_url(redis_url)

queue = queue(connection=redis_connection)

if __name__ == '__main__':

job = queue.enqueue(task, name='非同步佇列')

print(job.get_status())

task

實際執行的任務。

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

import time

def task(name):

time.sleep(2)

print(name)

time.sleep(2)

return name

啟動worker

(demo) macbook:rq zhangyi$ python worker.py 

12:30:57 worker rq:worker:3f2e384e636941939f7a054f20ad3ba5: started, version 1.6.1

12:30:57 subscribing to channel rq:pubsub:3f2e384e636941939f7a054f20ad3ba5

12:30:57 *** listening on default...

12:31:00 default: task.task(name='非同步佇列') (b539426a-9776-4b11-810d-a2d73c779c18)

非同步佇列

12:31:04 default: job ok (b539426a-9776-4b11-810d-a2d73c779c18)

12:31:04 result is kept for 500 seconds

執行傳送任務。

(demo) macbook:rq zhangyi$ python send.py 

queued

(demo) macbook:rq zhangyi$

檢視作業執行的情況。

如果函式執行正常,返回作業的return,如果有異常,返回none,如果作業沒執行,也是返回none。

print(job.result)
獲取作業的狀態 queued 還在佇列中,failed:執行失敗,finished 完成。

print(job.get_status())
為作業設定乙個 id,如果沒有set_id 的操作,作業的id會是乙個隨機的唯一的字串。

job.set_id('my_id')
獲取作業的 id。

my_id = job.get_id()
把job例項轉化成乙個字典。

print(job.to_dict())
從redis中把該作業刪除掉。

print(job.delete())
取消作業,儘管作業已經被執行,也可以取消。

print(job.cancel())
返回是否存在該id的作業。

from rq import job

job.job.exists(my_id, redis_conn)

建立該作業id的例項。

from rq import job

my_job = job.job(my_id, redis_conn)

如果我們專案使用的是flask,有專門的元件flask-rq2可供使用。

Python3 實現 Redis 訊息佇列

廢話不多說,我們先封裝乙個類。redis message queue.py usr bin env python3 coding utf 8 from redis import redis,connectionpool class rmq object def init self,url,name ...

python3 非同步 async with 用法

非同步上下文管理器指的是在enter和exit方法處能夠暫停執行的上下文管理器。為了實現這樣的功能,需要加入兩個新的方法 aenter 和 aexit 這兩個方法都要返回乙個 awaitable型別的值。非同步上下文管理器的一種使用方法是 class asynccontextmanager asyn...

python3 非同步模組asyncio

yield方法引入,這裡存在的問題是,如果你想建立從0到1,000,000這樣乙個很大的序列,你不得不建立能容納1,000,000個整數的列表。但是當加入了生成器之後,你可以不用建立完整的序列,你只需要能夠每次儲存乙個整數的記憶體即可。import asyncio asyncio.coroutine...