from apscheduler.schedulers.blocking importblockingscheduler
import
datetime
from apscheduler.events import
event_job_error, event_job_executed
deftest_1(a, b):
(a, b)
deftest_2(a, b):
print('
*'*16)
(a) c =0
#修改c的值,結束異常
if datetime.datetime(2020, 5, 26, 17, 19, 30) c = 1
print(b/c)
defjob_listener(event):
job =sched.get_job(event.job_id)
args =job.args
#正常結束任務
ifnot
event.exception:
#恢復原先的任務定時時間
sched.reschedule_job(event.job_id, trigger='
cron
', hour='
00', minute='
10', second='00'
)
print('
*'*20,'
成功', '
*'*20)
for job in
sched.get_jobs():
(job.name)
(job.trigger)
else
:
#計算當前時間5秒後的時間
next_datetime = datetime.datetime.now() + datetime.timedelta(seconds=5)
#修改出現異常的任務的定時,重新計算下次執行時間,本例為5秒後
sched.reschedule_job(event.job_id, trigger='
cron
', hour=next_datetime.hour, minute=next_datetime.minute, second=next_datetime.second)
msg = f"
jobname=|jobtrigger=|errcode=|exception=|traceback=|scheduled_time="if
__name__ == "
__main__":
service = 1seach_date_list = 2job_defaults =
#建立定時任務例項
sched =blockingscheduler()
sched.configure(job_defaults=job_defaults)
#新增任務1
sched.add_job(test_1,args=(service, seach_date_list,), trigger='
cron',
hour='
14', minute='
37', second='
00', id="
out_warehouse_order")
#新增任務2
sched.add_job(test_2,args=(service, seach_date_list,), trigger='
cron',
hour='
17', minute='
19', second='
00', id='
sale_after')
#建立監聽,任務出錯和任務正常結束都會執行job_listener函式
sched.add_listener(job_listener, event_job_error |\
event_job_executed)
#開始定時任務
sched.start()
apscheduler執行定時任務框架
最簡單用法 匯入模組 from apscheduler.schedulers.blocking import blockingscheduler from datetime import datetime 建立物件 scheduler blockingscheduler 建立定時任務 觸發器為 in...
使用apScheduler執行定時任務
從這篇博文可以了解到apscheduler的詳細解釋,也很簡潔優雅,改寫後可以直接用於生產環境!從這篇博文可以了解cron表示式的詳細解釋 coding utf 8 from apscheduler.schedulers.blocking import blockingscheduler impor...
APScheduler執行定時任務 簡單使用
本例目的 每天17 19 07列印 hello scheduler 版本3.3.1 tar zxvf 原始碼包名安裝 python setup.py installfrom apscheduler.schedulers.blocking import blockingscheduler schedu...