使用如下目錄結構:
celery.py
建立應用例項
config.py
引數配置檔案
tasks.py
執行任務檔案(生產者檔案)
from __future__ import absolute_import
from celery import celery
'celery_proj'
, include=
['celery_proj.tasks'])
'celery_proj.config'
)if __name__ ==
'__main__'
:)
from __future__ import absolute_import
from datetime import timedelta
celery_result_backend =
'redis:'
broker_url =
'redis:'
celery_timezone =
'asia/shanghai'
celerybeat_schedule =
,}
from __future__ import absolute_import
import requests
import sys
".."
)#呼叫封裝好的redis資料庫記錄推送狀態 已推送?未推送? 可以通過設定key的過期時間設定最小推送時間間隔
from model.redis_models import redis_db
from model.ord_models import
*import datetime
template_id =
'wjuawz1dghzjcok0w2bevwkzdorxitbnze5gufher9m'
'wx64118b44bbd3bfa3'
secret_key =
defget_access_token
:try
: payload =
req = requests.get(
'', params=payload,
timeout=3,
verify=
false
) access_token = req.json(
).get(
'access_token',""
)print
('access_token'
, access_token)
return access_token
except exception as e:
print
(e)def
push
( openid,
formid,
item_info:
dict
, access_token,
template_id=template_id)
:'''推送'''
data ='.
format
( item_info.get(
'item_pass_id'
,'未知'))
,"data":,
'keyword2':,
'keyword3':}
,"emphasis_keyword":''
} push_url =
''.format
( access_token)
result = requests.post(push_url, json=data, timeout=
3, verify=
false
)return result
@ap.task
defpullmsg
(delta=60)
:'''推送訊息'''
access_token = get_access_token(
) now = datetime.datetime.now(
) timedel = datetime.datetime.now(
)+ datetime.timedelta(minutes=delta)
# 搜尋所有快要開始的活動
objlist = db.session.query(ordobject.obj_id)
.filter
( ordobject.startord_time <= timedel,
ordobject.startord_time >= now)
# 搜尋相關訂單
ordlist = db.session.query(orderinfo.ordnum)
.filter
( orderinfo.objid.in_(objlist)
)# 查詢可推送的使用者和例項 此處為專案/活動
# 搜尋所有符合的使用者,預定資訊,專案名稱,位址, 並且逐個推送
# all (1, 'jhon', 'xixixi', 'bbbui0egbjy1zhbyw2khdufwvjje', datetime.datetime(2019, 5, 5, 14, 0))
result = db.session.query(
item.item_id,
item.item_name,
item.item_address,
item.pass_id,
order.ord_usid,
db.func.
min(
ordobject.startord_time)
).join(
orderinfo,
order.ord_num == orderinfo.ordnum)
.outerjoin(ordobject)
.filter
( ordobject.obj_id == orderinfo.objid,
ordobject.startord_time <= timedel,
ordobject.startord_time <= now)
.group_by(
item.item_id,
order.ord_usid)
.all()
# 按狀態推送 已推送的不推送
status_db = redis_db(
'pushmsg_status'
) formid_db = redis_db(
'formid'
)if result and access_token:
for i in result:
item_id, openid = i[0]
, i[4]
# 檢查是否已推送
response = status_db.check_pushmsg_status(item_id, openid)
if response:
# 已推送 下個
continue
else
:# 未推送
# 記錄狀態為已推送 假設未推送成功也最少24小時後再試
response = status_db.set_pushmsg_status(item_id, openid)
formid = formid_db.get_formid(openid)
if formid:
# 推送 無法推送的情況,需要 改進redis過期時間
item_info =
.. :'
.format
( i[5]
.year,
i[5]
.month,
i[5]
.day,
i[5]
.hour,
i[5]
.minute)
}return push(openid, formid, item_info, access_token)
else
:# 無法推送
return
'without formid!'
return
'no result for!'
else
:return
'no result!'
微信小程式傳送 模板訊息
實現步驟 1.先在前端獲取fromid,openid 2.將fromid,openid存入對應使用者的資料庫 3.下來就是寫模板訊息,查詢對應使用者的fromid和openid,將key值對應寫上 4.獲取access token,儲存時間7200 5.呼叫模板方法即可 前端 js獲取fromid存...
微信小程式客服自動回覆訊息後端實現
註冊賬號並授權 ngrok authtoken yourtoken 如果不註冊就會過期 8h之後生成的鏈結即會失效 本地埠內網穿透 ngrok http 8553 訪問 重新啟動之後更新網域名稱 可 ngrok http 8553 subdomain 4c4e91f7 如何使用ngrok生成固定的u...
微信小程式 幾步實現模版訊息的傳送
1.首先先建立乙個簡單的wxml頁面,包括乙個簡單的form表單 特別注意必須加上report submit true 2.獲取使用者 要傳送的使用者 的openid js 生命週期函式 監聽頁面載入 onload function options success function res php後...