Queue訊息佇列實戰python

2021-10-19 17:27:52 字數 1680 閱讀 6572

網上很關於queue的教程都只是個簡單的demo,看完也還是不知道怎麼在實戰中運用,下面的**是在用flask開發自動化測試平台時,用到的有關queue的**;

首先,在使用者介面所在的user.py檔案中匯入queue庫,並例項化為q  ,  同時新建乙個thread子執行緒,這個執行緒提供功能是消費執行緒,可以是自己定義的任務或功能,(這個過程在flask啟動的時候就自動執行)

其次,寫乙個/duilie的介面,在介面中進行q.put 操作,相當於生產線程,使用者每訪問一次這個介面,就相當於生產了乙個執行緒,提交了乙個等待執行的任務,並加入任務佇列中去,比如測試工程師下班前提交自己的自動化測試任務,多人提交,晚上按順序執行的場景;

###########queue訊息佇列練習開始############

from queue import queue

q = queue(maxsize = 5)  # 建立佇列物件

#消費執行緒

def start_test(*args):

while true:

# print('當前佇列長度為:%s ' % q.qsize())

if q.get() == 'version 1.0':

i = 1

while true:

print("check " + 'version 1.0' + '_'  +  str(i))

i += 1

time.sleep(3)

if i > 10:

print("check_01 over")

break

q.task_done()     

# 如果用了q.join(),那就一定要寫q.task_done() ,來通知q.join,當前事件已完成, 不寫q.task_done()的話,q.join收不到本事件完成時的訊號,還會阻塞在那裡等訊號,join後的**得不到正常執行

#  如果同時不寫q.join,和q.task_done() ,也是可以的,可以正常執行

#  只寫q.task_done()不寫q.join也是可以的,可以正常執行

time.sleep(20)

# print("佇列已經為空")

from  threading import thread

import time

thread_check = thread(target=start_test, args=(none,))

thread_check.start()

@user.route('/duilie')

def duilie():

#可以從http請求中獲取version

version = "version 1.0"

try:

#生產線程

q.put(version,block=false)

# q.join()    

return "hello world"

except  exception:

print("佇列已滿,當前最多同時執行%s個任務"% q.maxsize)

return  "佇列已滿,當前最多同時執行%s個任務" %  q.maxsize

##################queue練習結束###############

python訊息佇列Queue

例項1 訊息佇列queue,不要將檔案命名為 queue.py 否則會報異常 importerror cannot import name queue coding utf 8 from multiprocessing import queue q queue 3 初始化乙個queue物件,最多可接...

queue之 單向訊息佇列

import queue q queue.queue 建立乙個單項佇列 qsize 檢視這個單項佇列元素的個數 empty 與 clear功能是一樣的 full 是用來檢視這個佇列是否填滿了,佇列可以定義佇列的個數,所以可以通過full來檢視佇列是否填滿了 put 插入 資料 get 取資料 imp...

ThinkPHP5 0 Queue訊息佇列

thinkphp官方團隊開發的乙個專門支援佇列服務的擴充套件包,使用composer管理,使用起來非常方便 1.queue內建了 redis,database,topthink sync這四種驅動,本文使用redis驅動 3.queue訊息訊息可進行發布,獲取,執行,刪除,重發,失敗處理,延遲執行,...