一定結合自己的專案檔案對應的模組來看, 關於專案名下方會有標註,本篇只介紹celery作為非同步任務呼叫功能的呼叫,定時任務我用的flask-apscheduler庫先介紹下我的模組劃分
- tasks.py # celery函式放在對應應用下
manage.py # 使用flask-script啟動, 啟動檔案
mycelery.py # celery繼承,整合flask上下文等等
需要安裝的庫
flask-celery-helper
celery==
3.1.17
上**,本篇不介紹celery原理/使用方法, 直接告訴你怎麼寫
# mycelery.py
from flask_celery import celery
from flask import has_request_context, make_response, request
class
celerywithcontext
(celery)
:def
:super
(celerywithcontext, self)
task_base = self.task
class
contexttask
(task_base)
:#: name of the additional parameter passed to tasks
#: that contains information about the original flask request context.
context_arg_name =
'_flask_request_context'
def__call__
(self,
*args,
**kwargs)
:"""execute task code with given arguments."""
call =
lambda
:super
(contexttask, self)
.__call__(
*args,
**kwargs)
context = kwargs.pop(self.context_arg_name,
none
)if context is
none
or has_request_context():
return call(
)**context)
: result = call(
)# process a fake "response" so that
# ``@after_request`` hooks are executed'')
)return result
def(self, args=
none
, kwargs=
none
,**rest)
: self._include_request_context(kwargs)
return
super
(contexttask, self)
**rest)
def(self, args=
none
, kwargs=
none
,**rest)
: self._include_request_context(kwargs)
return
super
(contexttask, self)
.(args, kwargs,
**rest)
defretry
(self, args=
none
, kwargs=
none
,**rest)
: self._include_request_context(kwargs)
return
super
(contexttask, self)
.retry(args, kwargs,
**rest)
def_include_request_context
(self, kwargs)
:"""includes all the information about current flask request context
as an additional argument to the task.
"""ifnot has_request_context():
return
# keys correspond to arguments of :meth:`flask.test_request_context`
context =
if'?'
in request.url:
context[
'query_string'
]= request.url[
(request.url.find(
'?')+1
):] kwargs[self.context_arg_name]
= context
setattr
(contexttask,
'abstract'
,true
)setattr
(self,
'task'
, contexttask)
celery = celerywithcontext(
)from flask import flask
from mycelery import celery
def(config_name):)
# 中間部分是你的其他庫的初始化部分,省略..
. # manage.py
from mycelery import celery
base_dir = os.path.abspath(os.path.dirname(__file__)))
sys.path.insert(
0from flask_script import manager
if __name__ ==
'__main__'
: manager.run(
)# config.py
class
config
(object):
...# 檔名也只能是tasks
celery_imports =(,
) celery_broker_url =
'redis://localhost'
celery_result_backend =
'redis://localhost'
@staticmethod
def:
pass
config =
# tasks.py
from mycelery import celery
@celery.task(
)def
add(a, b)
:# 並不是一定要寫add,就是你封裝的耗時比較久的任務..
. do something
return
str(a+b)
# 呼叫
deffunc
(a,b):.
..add.delay(a,b)
return
配置好後啟動, 命令列輸入
celery -a manage.celery worker -l info
啟動成功介面
想看celery原理更多應用方式戳它➡️非同步神器celery
Flask工廠函式使用記錄
flask工廠函式 我的目錄結構flask project config init py config.py settings.py manage.py基本示例定義工廠函式 from flask import flask from config.settings import config from...
flask 工廠模式與celery結合
簡單介紹一下celery celery 是乙個非同步任務佇列。你可以使用它在你的應用上下文之外執行任務。總的想法就是你的應用程式可能需要執行任何消耗資源的任務都可以交給任務佇列,讓你的應用程式自由和快速地響應客戶端請求。官方文件 中文文件 flask 中使用 celery 別人寫的,大家可以參考著來...
python中的工廠函式
工廠函式 乙個能夠記住巢狀作用域的變數值的函式,儘管那個作用域已經不存在了。defmaker n defaction x return x n return action這定義了乙個外部函式,這個函式簡單地生成並且返回了乙個巢狀的函式,卻並不呼叫這個內嵌函式。如果呼叫外部函式就得到乙個生成的內嵌函式...