import sched
import threading
import time
defnew_task
(function, delay_time, args):
""" 定時任務函式
:param function: 需要執行的函式
:param delay_time: 延遲多少時間執行
:param args: 需要給function函式傳遞的引數,是個tuple
:return:
"""scheduler = sched.scheduler(time.time, time.sleep)
scheduler.enter(delay_time, 10, function, args)
thread = threading.thread(target=scheduler.run)
thread.start()
return thread
# 我們寫乙個初始function函式
defhelloworld
(what):
print
'this is {}'.format(what)
# 如何呼叫定時執行這個helloworld函式
new_task(hellowrold, 3 ('what', )) # 3秒後執行helloworld函式
import threading
l = threading.lock()
defhelloworld
(what, threadname):
l.acquire() # 執行緒鎖
print
'this is {}, and this is threadname: {}'.format(what, threadname)
l.release() # 釋放鎖
# for 迴圈多次執行這個函式
for i in range(20):
new_task(hellowrold, 3 ('what', 'thread-name-{}'.format(i))
# 這樣每次就會執行一次函式,當函式釋放鎖之後才執行下一次,
# l.acquire() 和 l.release() 是成對使用的
import multiprocessing
defprint_time
(s, threadnname, delay, **kwargs):
if kwargs.get('lock', none) is
notnone:
print('get lock')
kwargs.get('lock').acquire()
count = 0
while count < 5:
time.sleep(delay)
count += 1
print('this thread name is and 當前時間是 '.format(timename=time.ctime(time.time()),
threadname=threadnname))
if kwargs.get('lock', none) is
notnone:
kwargs.get('lock').release()
pass
s = multiprocessing.semaphore(2) #設定執行緒池的大小,每次只可以而同事啟動2個執行緒
for i in range(20):
"""入口函式
"""p = multiprocessing.process(target=print_time, args=(s, 'xiancheng{}'.format(i), 3), kwargs=)
p.start()
python使用多執行緒
做測試的時候,我們不得不接觸下多執行緒,雖然python不能發揮cpu多核的優勢,但是在測試的時候依然十分必要,比如在做介面測試的時候,發出請求之後,在等待伺服器端給予回應的時候,我們不應該傻傻地等,其它執行緒可以在等待的同時發出請求。這樣,我們就能更快地完成我們的測試任務。coding utf 8...
python 多執行緒使用
一 python中的執行緒使用 python中使用執行緒有兩種方式 函式或者用類來包裝執行緒物件。1 函式式 呼叫thread模組中的start new thread 函式來產生新執行緒。如下例 python view plain copy import time import thread def...
python多執行緒使用
一 簡介 由於python2逐漸不被維護,以及python更優越的效能。後面介紹的python相關知識都是用python3版本寫。這裡介紹python3的多執行緒相關知識,執行緒的建立使用threading包。二 簡單執行緒建立 簡介執行緒的建立,先定義執行任務的函式,然後呼叫threading.t...