網上很關於queue的教程都只是個簡單的demo,看完也還是不知道怎麼在實戰中運用,下面的**是在用flask開發自動化測試平台時,用到的有關queue的**;
首先,在使用者介面所在的user.py檔案中匯入queue庫,並例項化為q , 同時新建乙個thread子執行緒,這個執行緒提供功能是消費執行緒,可以是自己定義的任務或功能,(這個過程在flask啟動的時候就自動執行)
其次,寫乙個/duilie的介面,在介面中進行q.put 操作,相當於生產線程,使用者每訪問一次這個介面,就相當於生產了乙個執行緒,提交了乙個等待執行的任務,並加入任務佇列中去,比如測試工程師下班前提交自己的自動化測試任務,多人提交,晚上按順序執行的場景;
###########queue訊息佇列練習開始############
from queue import queue
q = queue(maxsize = 5) # 建立佇列物件
#消費執行緒
def start_test(*args):
while true:
# print('當前佇列長度為:%s ' % q.qsize())
if q.get() == 'version 1.0':
i = 1
while true:
print("check " + 'version 1.0' + '_' + str(i))
i += 1
time.sleep(3)
if i > 10:
print("check_01 over")
break
q.task_done()
# 如果用了q.join(),那就一定要寫q.task_done() ,來通知q.join,當前事件已完成, 不寫q.task_done()的話,q.join收不到本事件完成時的訊號,還會阻塞在那裡等訊號,join後的**得不到正常執行
# 如果同時不寫q.join,和q.task_done() ,也是可以的,可以正常執行
# 只寫q.task_done()不寫q.join也是可以的,可以正常執行
time.sleep(20)
# print("佇列已經為空")
from threading import thread
import time
thread_check = thread(target=start_test, args=(none,))
thread_check.start()
@user.route('/duilie')
def duilie():
#可以從http請求中獲取version
version = "version 1.0"
try:
#生產線程
q.put(version,block=false)
# q.join()
return "hello world"
except exception:
print("佇列已滿,當前最多同時執行%s個任務"% q.maxsize)
return "佇列已滿,當前最多同時執行%s個任務" % q.maxsize
##################queue練習結束###############
python訊息佇列Queue
例項1 訊息佇列queue,不要將檔案命名為 queue.py 否則會報異常 importerror cannot import name queue coding utf 8 from multiprocessing import queue q queue 3 初始化乙個queue物件,最多可接...
queue之 單向訊息佇列
import queue q queue.queue 建立乙個單項佇列 qsize 檢視這個單項佇列元素的個數 empty 與 clear功能是一樣的 full 是用來檢視這個佇列是否填滿了,佇列可以定義佇列的個數,所以可以通過full來檢視佇列是否填滿了 put 插入 資料 get 取資料 imp...
ThinkPHP5 0 Queue訊息佇列
thinkphp官方團隊開發的乙個專門支援佇列服務的擴充套件包,使用composer管理,使用起來非常方便 1.queue內建了 redis,database,topthink sync這四種驅動,本文使用redis驅動 3.queue訊息訊息可進行發布,獲取,執行,刪除,重發,失敗處理,延遲執行,...