通過celery非同步處理以乙個查詢任務

2021-08-16 22:19:15 字數 4186 閱讀 5650

今天介紹通過celery實現乙個非同步任務。有這樣乙個需求,前端發起乙個查詢的請求,但是發起查詢後,查詢可能不會立即返回結果。這時候,發起查詢後,後端可以把這次查詢當作乙個task,並立即返回乙個能唯一表明該task的值,如taskid(使用者後面可以通過這個taskid 隨時檢視結果),使用者收到這個taskid後,可以轉去處理其他任務,而不必一直等待查詢結果。後端api呼叫celery來處理這個task,並將結果值儲存在乙個csv檔案中,後面使用者通過taskid 查詢時返回結果。

def

(environ,start_response):

"""部分**省略"""

query_string = environ['query_string']

servicegroupname = ""

for getparam in query_string.split("&"):

params = getparam.split("=")

resultinfo = ""

if params[0] == "type":

alerttype = params[1]

elif params[0] == "projectname":

projectname = params[1]

elif params[0] == "servicegroupname":

servicegroupname = params[1]

else:

resultinfo = error_info(-1, "get引數只能為type=<?>&projectname=<?>&servicegroupname=<?>;必須指定三個引數", {})

return [resultinfo]

taskid = 1

result_file_name = '/var/www/dba_api/api/test/'+ str(taskid) + '.csv'

contentinfo = json.dumps()

taskinfo = "任務已經建立,詳情請檢視:"% (taskid)

return [resultinfo]

在 celery 中執行任務的方法一共有三種:
在python的庫存放路徑中(一般是/usr/lib/python2.6/site-packages),建立乙個資料夾proj,進入proj目錄,建立三個檔案,init,將proj宣告乙個python包,celepy,其內容如下:

#_*_ coding:utf-8 _*_

from __future__ import absolute_import

from celery import celery

broker="amqp://user:password@localhost//",

backend="amqp",

include=["proj.tasks"]

)celery_routes=,})

if __name__=="__main__":

這裡我們定義了模組名稱proj以及celery 路由。還有乙個檔案,task.py

#_*_ coding:utf-8 _*_i

from __future__ import absolute_import

import random

import ******json as json

import types

import time

import mysqldb

import urllib2

import configparser as cparser

import hmac

import hashlib

import base64

defgetserviceinfo

(contentinfo):

contentinfo = json.loads(contentinfo)

servicegroupname = contentinfo['servicegroupname']

dbhost = contentinfo['dbhost']

dbport = int(contentinfo['dbport'])

dbuser = contentinfo['dbuser']

dbpasswd = contentinfo['dbpasswd']

msglib = messagelib.messagelib()

sql = "your sql"

#第三步:連線資料庫,執行**邏輯

try:

db_connection = mysqldb.connect(host=dbhost, port=dbport, passwd=dbpasswd, db="cmdb", user=dbuser, connect_timeout=2, charset="utf8")

cursor = db_connection.cursor()

cursor.execute(getservicegrouphostsql)

row = cursor.fetchall()

result =

for line in row:

...resultinfo = msglib.success_info(result)

return resultinfo

except exception, e:

raise

errorinfo = "dbhost:%s, port:%s, error:%s" % (dbhost, dbport, str(e))

#return getservicegrouphostsql,errorinfo

return msglib.error_info(-1, errorinfo, {})

啟動celery

celery -a proj worker -q getserviceinfo -l debug -c

6

最後,寫乙個結果,專門獲取查詢結果的結果,傳入的引數為taskid,部分**如下:

def

(environ,start_response):

status = '400 error'

start_response(status, response_headers)

status = '200 ok'

start_response(status, response_headers)

if environ['request_method'] != "get":

resultinfo = msglib.error_info(-1, "http請求型別不是get", {})

return [resultinfo]

query_string = environ['query_string']

servicegroupname = ""

for getparam in query_string.split("&"):

params = getparam.split("=")

resultinfo = ""

if params[0] == "taskid":

taskid = params[1]

else:

resultinfo = msglib.error_info(-1, "get引數無比指定taskid這個引數", {})

return [resultinfo]

logging.info(query_string)

result_file_name = '/var/www/dba_api/api/test/'+ str(taskid) + '.csv'

result =

try:

with open (result_file_name,'rb') as fp:

lines = csv.reader(fp)

for line in lines :

resultinfo = msglib.success_info(result)

return resultinfo

except exception, e:

errorinfo = "some thing wrong"

return msglib.error_info(-1, errorinfo, {})

HandlerThread 建立乙個非同步的後台執行緒

使用handlerthread幾大優點 1 製作乙個後台非同步執行緒,需要的時候就可以丟乙個任務給它,使用比較靈活 2 android系統提供的,使用簡單方便,內部自己封裝了looper handler機制 3 可以代替thread looper handler的寫法 4 可以避免專案中隨處可見的 ...

乙個非同步任務的梳理

開篇語略 下圖來引出話題 場景 個人使用者投遞簡歷,由於繁多的業務邏輯判斷和資料庫的不斷讀取 這裡就不用多說了吧 當個人使用者單個 批量投遞簡歷的時候,點選投遞按鈕之後會很長一段時間等待返回結果。需求 提公升使用者體驗,解決點選投遞按鈕之後的漫長等待,給人一種流暢感,當然不能去掉哪些業務邏輯判斷。設...

celery 重複執行同乙個task

今天用celery 執行 task的時候碰到了 重複執行的情況,而且是重複執行了8次 電腦是8核的 谷歌了一下,celery 在執行task時有個機制,就是任務時長超過了 visibility timeout 時還沒執行完,就會指定其他worker重新開始task,預設的時長是一小時.但是我這個肯定...