今天介紹通過celery實現乙個非同步任務。有這樣乙個需求,前端發起乙個查詢的請求,但是發起查詢後,查詢可能不會立即返回結果。這時候,發起查詢後,後端可以把這次查詢當作乙個task,並立即返回乙個能唯一表明該task的值,如taskid(使用者後面可以通過這個taskid 隨時檢視結果),使用者收到這個taskid後,可以轉去處理其他任務,而不必一直等待查詢結果。後端api呼叫celery來處理這個task,並將結果值儲存在乙個csv檔案中,後面使用者通過taskid 查詢時返回結果。
def
(environ,start_response):
"""部分**省略"""
query_string = environ['query_string']
servicegroupname = ""
for getparam in query_string.split("&"):
params = getparam.split("=")
resultinfo = ""
if params[0] == "type":
alerttype = params[1]
elif params[0] == "projectname":
projectname = params[1]
elif params[0] == "servicegroupname":
servicegroupname = params[1]
else:
resultinfo = error_info(-1, "get引數只能為type=<?>&projectname=<?>&servicegroupname=<?>;必須指定三個引數", {})
return [resultinfo]
taskid = 1
result_file_name = '/var/www/dba_api/api/test/'+ str(taskid) + '.csv'
contentinfo = json.dumps()
taskinfo = "任務已經建立,詳情請檢視:"% (taskid)
return [resultinfo]
在 celery 中執行任務的方法一共有三種:
在python的庫存放路徑中(一般是/usr/lib/python2.6/site-packages),建立乙個資料夾proj,進入proj目錄,建立三個檔案,init,將proj宣告乙個python包,celepy,其內容如下:
#_*_ coding:utf-8 _*_這裡我們定義了模組名稱proj以及celery 路由。還有乙個檔案,task.pyfrom __future__ import absolute_import
from celery import celery
broker="amqp://user:password@localhost//",
backend="amqp",
include=["proj.tasks"]
)celery_routes=,})
if __name__=="__main__":
#_*_ coding:utf-8 _*_i啟動celeryfrom __future__ import absolute_import
import random
import ******json as json
import types
import time
import mysqldb
import urllib2
import configparser as cparser
import hmac
import hashlib
import base64
defgetserviceinfo
(contentinfo):
contentinfo = json.loads(contentinfo)
servicegroupname = contentinfo['servicegroupname']
dbhost = contentinfo['dbhost']
dbport = int(contentinfo['dbport'])
dbuser = contentinfo['dbuser']
dbpasswd = contentinfo['dbpasswd']
msglib = messagelib.messagelib()
sql = "your sql"
#第三步:連線資料庫,執行**邏輯
try:
db_connection = mysqldb.connect(host=dbhost, port=dbport, passwd=dbpasswd, db="cmdb", user=dbuser, connect_timeout=2, charset="utf8")
cursor = db_connection.cursor()
cursor.execute(getservicegrouphostsql)
row = cursor.fetchall()
result =
for line in row:
...resultinfo = msglib.success_info(result)
return resultinfo
except exception, e:
raise
errorinfo = "dbhost:%s, port:%s, error:%s" % (dbhost, dbport, str(e))
#return getservicegrouphostsql,errorinfo
return msglib.error_info(-1, errorinfo, {})
celery -a proj worker -q getserviceinfo -l debug -c
6
最後,寫乙個結果,專門獲取查詢結果的結果,傳入的引數為taskid,部分**如下:
def
(environ,start_response):
status = '400 error'
start_response(status, response_headers)
status = '200 ok'
start_response(status, response_headers)
if environ['request_method'] != "get":
resultinfo = msglib.error_info(-1, "http請求型別不是get", {})
return [resultinfo]
query_string = environ['query_string']
servicegroupname = ""
for getparam in query_string.split("&"):
params = getparam.split("=")
resultinfo = ""
if params[0] == "taskid":
taskid = params[1]
else:
resultinfo = msglib.error_info(-1, "get引數無比指定taskid這個引數", {})
return [resultinfo]
logging.info(query_string)
result_file_name = '/var/www/dba_api/api/test/'+ str(taskid) + '.csv'
result =
try:
with open (result_file_name,'rb') as fp:
lines = csv.reader(fp)
for line in lines :
resultinfo = msglib.success_info(result)
return resultinfo
except exception, e:
errorinfo = "some thing wrong"
return msglib.error_info(-1, errorinfo, {})
HandlerThread 建立乙個非同步的後台執行緒
使用handlerthread幾大優點 1 製作乙個後台非同步執行緒,需要的時候就可以丟乙個任務給它,使用比較靈活 2 android系統提供的,使用簡單方便,內部自己封裝了looper handler機制 3 可以代替thread looper handler的寫法 4 可以避免專案中隨處可見的 ...
乙個非同步任務的梳理
開篇語略 下圖來引出話題 場景 個人使用者投遞簡歷,由於繁多的業務邏輯判斷和資料庫的不斷讀取 這裡就不用多說了吧 當個人使用者單個 批量投遞簡歷的時候,點選投遞按鈕之後會很長一段時間等待返回結果。需求 提公升使用者體驗,解決點選投遞按鈕之後的漫長等待,給人一種流暢感,當然不能去掉哪些業務邏輯判斷。設...
celery 重複執行同乙個task
今天用celery 執行 task的時候碰到了 重複執行的情況,而且是重複執行了8次 電腦是8核的 谷歌了一下,celery 在執行task時有個機制,就是任務時長超過了 visibility timeout 時還沒執行完,就會指定其他worker重新開始task,預設的時長是一小時.但是我這個肯定...