rq
是redis queue
的縮寫, 乙個基於redis
的簡單、輕量的非同步訊息佇列工具。
如果在**中使用者發起乙個用時很久的請求,如果用同步的方式,伺服器就會返回超時。這時候就需要用非同步請求,使用者發起請求後,服務端把作業扔給另乙個程序去執行,然後立刻返回給使用者,使用者再通過輪詢或者其他方式來獲取作業的執行進度和執行結果。
rq
的worker
作用就相當於啟動乙個新的守護程序,監聽到有新的任務進來就會執行,而使用者發起的請求則立即返回。
worker
任務監聽,守護程序。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from redis import redis
from rq import worker, connection
queues = ['default']
redis_url = 'redis:'
redis_connection = redis.from_url(redis_url)
def run_worker():
with connection(redis_connection):
worker = worker(queues)
worker.work()
if __name__ == '__main__':
run_worker()
send
傳送訊息佇列。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from rq import queue
from task import task
from redis import redis
redis_url = 'redis:'
redis_connection = redis.from_url(redis_url)
queue = queue(connection=redis_connection)
if __name__ == '__main__':
job = queue.enqueue(task, name='非同步佇列')
print(job.get_status())
task
實際執行的任務。
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import time
def task(name):
time.sleep(2)
print(name)
time.sleep(2)
return name
啟動worker
。
(demo) macbook:rq zhangyi$ python worker.py
12:30:57 worker rq:worker:3f2e384e636941939f7a054f20ad3ba5: started, version 1.6.1
12:30:57 subscribing to channel rq:pubsub:3f2e384e636941939f7a054f20ad3ba5
12:30:57 *** listening on default...
12:31:00 default: task.task(name='非同步佇列') (b539426a-9776-4b11-810d-a2d73c779c18)
非同步佇列
12:31:04 default: job ok (b539426a-9776-4b11-810d-a2d73c779c18)
12:31:04 result is kept for 500 seconds
執行傳送任務。
(demo) macbook:rq zhangyi$ python send.py
queued
(demo) macbook:rq zhangyi$
檢視作業執行的情況。
如果函式執行正常,返回作業的return,如果有異常,返回none,如果作業沒執行,也是返回none。
print(job.result)
獲取作業的狀態 queued 還在佇列中,failed:執行失敗,finished 完成。
print(job.get_status())
為作業設定乙個 id,如果沒有set_id 的操作,作業的id會是乙個隨機的唯一的字串。
job.set_id('my_id')
獲取作業的 id。
my_id = job.get_id()
把job例項轉化成乙個字典。
print(job.to_dict())
從redis中把該作業刪除掉。
print(job.delete())
取消作業,儘管作業已經被執行,也可以取消。
print(job.cancel())
返回是否存在該id的作業。
from rq import job
job.job.exists(my_id, redis_conn)
建立該作業id的例項。
from rq import job
my_job = job.job(my_id, redis_conn)
如果我們專案使用的是flask
,有專門的元件flask-rq2
可供使用。 Python3 實現 Redis 訊息佇列
廢話不多說,我們先封裝乙個類。redis message queue.py usr bin env python3 coding utf 8 from redis import redis,connectionpool class rmq object def init self,url,name ...
python3 非同步 async with 用法
非同步上下文管理器指的是在enter和exit方法處能夠暫停執行的上下文管理器。為了實現這樣的功能,需要加入兩個新的方法 aenter 和 aexit 這兩個方法都要返回乙個 awaitable型別的值。非同步上下文管理器的一種使用方法是 class asynccontextmanager asyn...
python3 非同步模組asyncio
yield方法引入,這裡存在的問題是,如果你想建立從0到1,000,000這樣乙個很大的序列,你不得不建立能容納1,000,000個整數的列表。但是當加入了生成器之後,你可以不用建立完整的序列,你只需要能夠每次儲存乙個整數的記憶體即可。import asyncio asyncio.coroutine...