1 . 執行緒佇列
from multiprocessing queue , joinablequeue #程序ipc佇列
from queue import queue #執行緒佇列 先進先出
from queue import lifoqueue #後進先出的
方法都是一樣的 : put , get , put_nowait , get_nowait , full , empty , qsize
佇列 queue : 先進先出 , 自帶鎖 , 資料安全
棧 lifoqueue : 後進先出 , 自帶鎖 , 資料安全
q = lifoqueue(5)
q.put(3)
q.put_nowait(4)
print(q.get()) #誰最後進的,就會先取誰
q.get_nowait()
print(q.full())
pirnt(q.empty())
print(q.qsize())
from queue import priorityqueue #優先順序佇列
q = priorityqueue()
q.put((10,"aaa"))
q.put((4,"bbb"))
print(q.get())
執行緒池threading 沒有執行緒池的
multiprocessing pool
concurrent.futures 幫助管理執行緒池和程序池
import time
from threading import currentthread,get_ident
from concurrent.futures import threadpoolexecutor #幫助啟動執行緒池
from concurrent.futures import processpoolexecutor #幫助啟動程序池
def func(i):
time.sleep(1)
print("in%s%s"%(i,currentthread()))
return i**2
def back(fn):
print(fn.result(),currentthread())
#map啟動多執行緒任務
t = threadpoolexecutor(5)
t.map(func,range(20)) == for i in range(20):
t.submit(func,i)
#submit非同步提交任務
t = threadpoolexecutor(5)
for i in range(20):
t.submit(func,i)
t.shutdown()
print("main:",currentthread()) #起多少個執行緒池 , 5*cpu的個數
#獲取任務結果
t = threadpoolexecutor(5)
li =
for i in range(20):
ret = t.submit(func,i)
t.shutdown()
for i in li:
print(i.result())
print("main:",currentthread())
#**函式
t = threadpoolexecutor(5)
for i in range(20):
t.submit(func,i).add_done_callback(back)
#**函式(程序版)
import os,time
from concurrent.futures import processpoolexecutor
def func(i):
print("in%s%s"%(i,os.getpid()))
return i**2
def back(fn):
print(fn.result(),os.getpid())
if __name__ == "__main__":
p = processpoolexecutor(5)
for i in range(20):
p.submit(func,i).add_done_callback(back)
print("main:",os.getpid())
multiprocessing模組自帶程序池的
threading模組是沒有執行緒池的
concurrent.futures 程序池和執行緒池 : 高度封裝 , 程序池/執行緒池的統一的使用方式
建立執行緒池/程序池 processpoolexecutor threadpoolexecutor
ret .result()獲取結果,如果想實現非同步效果,應該使用列表
shutdown == close + join 同步控制
程序 : 資源分配的最小單位 , 班級
執行緒 : cpu排程最小單位 , 人
cpython執行緒不能利用多核的 ,多執行緒無法利用多核 ,乙個執行緒能同時執行多個任務.
協程 : 能在一條執行緒的基礎上 ,再過個任務之間互相切換 .節省了執行緒開啟的消耗.
協程是從python**的級別排程的 , 正常的執行緒是cpu排程的最小單位 ; 協程的排程並不是由作業系統來完成的.
之前學過的協程在兩個任務之間相互切換的是生成器函式:yield
def func():
print(1)
x = yield "aaa"
print(x)
yield "bbb"
g = func()
print(next(g))
print(g.send("***"))
在多個函式之間互相切換的功能 --- 協程
def consumer():
while true:
x = yield
print(x)
def producer():
g = consumer()
next(g) #預激
for i in range(20):
g.send(i)
producer()
yield只有程式之間的切換,沒有重利用任何io操作的時間
模組的安裝 :
pip3 install 要安裝的模組的名字
使用協程減少io操作帶來的時間消耗
from gevent import monkey ; monkey.patch_all()
import gevent,time
def eat():
print("吃")
time.sleep(2)
print("吃完了")
def play():
print("玩")
time.sleep(1)
print("玩完了")
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
沒有執行 , 為什麼沒執行???是需要開啟麼?
沒有開啟但是切換了
gevent幫我們做了切換,做切換是有條件的,遇到io才切換
gevent不認識除了gevent這個模組內以外的io操作
使用join可以一直阻塞直到協程任務完成
幫助gevent來認識其他模組中的阻塞
from gevent import monkey;monkey.patch_all() 寫在其他模組匯入之前.
使用協程來實現tcp協議 :
伺服器 :
from gevent import monkey;monkey.patch_all()
import gevent,socket
def func(conn):
while 1:
conn.send(b"hello")
print(conn.recv(1024))
sk = socket.socket()
sk.bind(("127.0.0.1",9090))
sk.listen()
while 1:
conn,addr = sk.accept()
gevent.spawn(func,conn)
客戶端 :
import socket
from threading import thread
def client():
sk = socket.socket()
sk.connect(("127.0.0.1",9090))
while 1:
print(sk.recv(1024))
sk.send(b"bye")
for i in range(20):
thread(target=client).start()
4c 併發50000 qps
5個程序
20個執行緒
500個協程
協程能夠在單核的情況極大地提高cpu的利用率
執行緒的切換 :
時間片到了 降低了cpu的效率
io會切 提高了cpu的效率
執行緒佇列,執行緒池,協程
執行緒的queue,類似於程序 作用也是類似,queue n 規範放入值的數量 這個和之前一樣是為了實現先進先出 import queue q queue.queue 2 括號內可加入數字規範放入值的數量,不加則不會規範 q.put 123 q.put qweqwe q.put 111 print ...
python執行緒池 協程
python的執行緒雖然是真正的執行緒,但直譯器執行 時,有乙個gil鎖 global interpreter lock,任何python執行緒執行前,必須先獲得gil鎖,然後,每執行100條位元組碼,直譯器就自動釋放gil鎖,讓別的執行緒有機會執行。這個gil全域性鎖實際上把所有執行緒的執行 都給...
併發程式設計 執行緒池程序池協程
1.socket服務端實現併發 現在回來想網路程式設計服務端需要滿足哪幾點需求 2.程序池執行緒池介紹 執行緒不可能無限制的開下去,總要消耗和占用資源 程序池執行緒池概念 硬體有極限,為了減輕硬體壓力,所以有了池的概念 注意協程這個概念完全是程式設計師自己想出來的東西,它對於作業系統來說根本不存在。...