引言
最近在用 sanic 寫東西,所有涉及到io阻塞的**都需要用 aio 的模組,好在近年來 asyncio 生態圈發展的還算不錯,該有的都有 ~
近期業務中 登入/註冊 業務涉及的很複雜(涉及到邀請),需要解鎖、傳送簡訊等操作,想來這麼個模組整的很繁瑣,以後加個滑動驗證那還了得。
於是乎,想整乙個類似於celery 的模組,進行任務解耦,但是目前 celery 還目前不支援非同步(官方將在 celery5 支援非同步)。
所以目前查閱資料發現了乙個 python 實現的 arq 模組,已經應用在了生產環境,效果還算不錯 ~
官方是這麼介紹它的:
首先先安裝一下它:
$ pip install arq
那麼接下來,快速了解下它的使用吧 ~
先看下面編寫的這段**
import asyncio
from arq import create_pool
from arq.connections import redissettings
''''''
async
defsay_hello
(ctx, name)
->
none
:"""任務函式
parameters
----------
ctx: dict
工作者上下文
name: string
returns
-------
dict
"""print
(ctx)
print
(f"hello "
)async
defstartup
(ctx)
:print
("starting..."
)async
defshutdown
(ctx)
:print
("ending..."
)async
defmain()
:# 建立
redis =
await create_pool(redissettings(password=
"root123456"))
# 分配任務
await redis.enqueue_job(
'say_hello'
, name=
"liuzhichao"
)# workersettings定義了建立工作時要使用的設定,
# 它被arq cli使用
class
workersettings
:# 佇列使用 `redis` 配置, 可以配置相關引數
# 例如我的密碼是 `rooot123456`
redis_settings = redissettings(password=
"root123456"
)# 被監聽的函式
functions =
[say_hello]
# 開啟 `worker` 執行
on_startup = startup
# 終止 `worker` 後執行
on_shutdown = shutdown
if __name__ ==
'__main__'
: loop = asyncio.get_event_loop(
) loop.run_until_complete(main(
))
1、接下來看我們怎麼執行它
$ arq tasks.workersettings
maybe you can see 10:
56:25: starting worker for
1 functions: say_hello10:
56:25: redis_version=
4.0.1 mem_usage=
32.00m clients_connected=
6 db_keys=
19189
starting.
..
2、執行 tasks.py 檔案
$ python3 tasks.py
maybe you can see 11:
01:04:
0.29s → 5a5ac0edd5ad4b318b9848637b1ae800
:say_hello(name=
'liuzhichao'
)hello liuzhichao11:
01:04:
0.00s ← 5a5ac0edd5ad4b318b9848637b1ae800
:say_hello ●
3、那麼這個簡單任務就執行完成了,是不是特別簡單 ~
定時任務
#! /usr/bin/env python
# -*- coding: utf-8 -*-
''''''
from arq import cron
from arq.connections import redissettings
async
defrun_regularly
(ctx)
:# 表示在 10、11、12 分 50秒的時候列印
print
('run job at 26:05, 27:05 and 28:05'
)class
workersettings
: redis_settings = redissettings(password=
"root123456"
) cron_jobs =
[ cron(run_regularly, minute=
, second=50)
]
1、執行它
$ arq tasks.workersettings
if run out of the time,maybe you can see11:
10:25: starting worker for
1 functions: cron:run_regularly11:
10:25: redis_version=
4.0.1 mem_usage=
32.00m clients_connected=
6 db_keys=
1919011:
10:51:
0.51s → cron:run_regularly(
)run foo job at 26:05
,27:05
and28:05
11:10:
51:0.00s ← cron:run_regularly ● 11:
11:51:
0.51s → cron:run_regularly(
)run foo job at 26:05
,27:05
and28:05
11:11:
51:0.00s ← cron:run_regularly ● 11:
12:50:
0.50s → cron:run_regularly(
)run foo job at 26:05
,27:05
and28:05
11:12:
50:0.00s ← cron:run_regularly ●
按照此時間線,然後會一直進行無限迴圈下去 python非同步任務佇列示例
很多場景為了不阻塞,都需要非同步 機制。這是乙個簡單的例子,大家參考使用吧 複製 如下 usr bin env python coding utf 8 import logging import queue import threading def func a a,b return a b def...
celery 非同步任務佇列
celery是基於python開發的分布式任務佇列。它可以讓任務的執行完全脫離主程式,甚至可以被分配到其他主機上執行。我們通常使用它來實現非同步任務 async task 和定時任務 crontab 它的架構組成如下圖 celery 4.x以上版本不安裝該模組,新增任務時會報錯 使用celery包含...
非同步任務佇列Celery在Django中的使用
前段時間在django web平台開發中,碰到一些請求執行的任務時間較長 幾分鐘 為了加快使用者的響應時間,因此決定採用非同步任務的方式在後台執行這些任務。在同事的指引下接觸了celery這個非同步任務佇列框架,鑑於網上關於celery和django結合的文件較少,大部分也只是粗粗介紹了大概的流程,...