在軟體開發的過程中,經常碰到這樣的場景:
某些模組負責生產資料,這些資料由其他模組來負責處理(此處的模組可能是:函式、執行緒、程序等)。
1. 產生資料的模組稱為生產者,
2. 處理資料的模組稱為消費者。
3. 在生產者與消費者之間的緩衝區稱之為倉庫。
4. 生產者負責往倉庫運輸商品,而消費者負責從倉庫裡取出商品,這就構成了生產者消費者模式。
結構圖如下:
1. 你把信寫好——>> 相當於生產者生產資料。
2. 你把信放入郵箱——>> 相當於生產者把資料放入緩衝區
3. 郵遞員把信從郵箱取出,做相應處理——>> 相當於消費者把資料取出緩衝區,處理資料
舉個例子,我們去郵局投遞信件,如果不使用郵箱(也就是緩衝區),你必須得把信直接交給郵遞員。有同學會說,直接給郵遞員不是挺簡單的嘛?其實不簡單,你必須 得認識誰是郵遞員,才能把信給他。這就產生了你和郵遞員之間的依賴(相當於生產者和消費者的強耦合)。萬一哪天郵遞員 換人了,你還要重新認識一下(相當於消費者變化導致修改生產者**)。而郵箱相對來說比較固定,你依賴它的成本就比較低(相當於和緩衝區之間的弱耦合)。
繼續上面的例子,如果我們不使用郵箱,就得在郵局等郵遞員,直到他回來,把信件交給他,這期間我們啥事兒都不能幹(也就是生產者阻塞)。或者郵遞員得挨家挨戶問,誰要寄信(相當於消費者輪詢)。
我們再拿寄信的例子,假設郵遞員一次只能帶走1000封信,萬一碰上情人節(或是聖誕節)送賀卡,需要寄出去的信超過了1000封,這時候郵箱這個緩衝區就派上用場了。郵遞員把來不及帶走的信暫存在郵箱中,等下次過來時再拿走。通過上面的介紹大家應該已經明白了生產者消費者模式。
在實現生產者消費者模式之前,我們先學習下python中的多執行緒程式設計。
執行緒是作業系統直接支援的執行單元,高階語言通常都內建多執行緒的支援,python也不例外,並且python的執行緒是真正的posix thread,而不是模擬出來的執行緒。
python的標準庫提供了兩個模組:_thread和threading,_thread是低階模組,threading是高階模組,對_thread進行了封裝。絕大多數情況下,我們只需要使用threading這個高階模組。
下面我們先看一段在python中實現多執行緒的**。
import time,threading
#執行緒**
class taskthread(threading.thread):
def __init__(self,name):
threading.thread.__init__(self,name=name)
def run(self):
print('thread %s is running...' % self.getname())
for i in range(6):
print('thread %s >>> %s' % (self.getname(), i))
time.sleep(1)
print('thread %s finished.' % self.getname())
taskthread = taskthread('taskthread')
taskthread.start()
taskthread.join()
下面是程式的執行結果:thread taskthread is running...
thread taskthread >>> 0
thread taskthread >>> 1
thread taskthread >>> 2
thread taskthread >>> 3
thread taskthread >>> 4
thread taskthread >>> 5
thread taskthread finished.
taskthread類繼承自threading模組中的thread執行緒類。建構函式的name引數指定執行緒的名字,通過過載基類run函式實現具體任務。在簡單熟悉了python的執行緒後,下面我們實現乙個生產者消費者模shi。
from queue import queue
import random,threading,time
#生產者類
class producer(threading.thread):
def __init__(self, name,queue):
threading.thread.__init__(self, name=name)
self.data=queue
def run(self):
for i in range(5):
print("%s is producing %d to the queue!" % (self.getname(), i))
self.data.put(i)
time.sleep(random.randrange(10)/5)
print("%s finished!" % self.getname())
#消費者類
class consumer(threading.thread):
def __init__(self,name,queue):
threading.thread.__init__(self,name=name)
self.data=queue
def run(self):
for i in range(5):
val = self.data.get()
print("%s is consuming. %d in the queue is consumed!" % (self.getname(),val))
time.sleep(random.randrange(10))
print("%s finished!" % self.getname())
def main():
queue = queue()
producer = producer('producer',queue)
consumer = consumer('consumer',queue)
producer.start()
consumer.start()
producer.join()
consumer.join()
print 'all threads finished!'
if __name__ == '__main__':
main()
執行結果可能如下:
producer is producing 0 to the queue!
consumer is consuming. 0 in the queue is consumed!
producer is producing 1 to the queue!
producer is producing 2 to the queue!
consumer is consuming. 1 in the queue is consumed!
consumer is consuming. 2 in the queue is consumed!
producer is producing 3 to the queue!
producer is producing 4 to the queue!
producer finished!
consumer is consuming. 3 in the queue is consumed!
consumer is consuming. 4 in the queue is consumed!
consumer finished!
all threads finished!
因為多執行緒是搶占式執行的,所以列印出的執行結果不一定和上面的完全一致。本例通過python實現了乙個簡單的生產者消費者模型。python中的queue模組已經提供了對執行緒同步的支援,所以本文並沒有涉及鎖、同步、死鎖等多執行緒問題。
python 生產者 消費者
from bs4 import beautifulsoup import requests import time import multiprocessing as mp import re from multiprocessing import queue from multiprocessin...
生產者消費者 生產者與消費者模式
一 什麼是生產者與消費者模式 其實生產者與消費者模式就是乙個多執行緒併發協作的模式,在這個模式中呢,一部分執行緒被用於去生產資料,另一部分執行緒去處理資料,於是便有了形象的生產者與消費者了。而為了更好的優化生產者與消費者的關係,便設立乙個緩衝區,也就相當於乙個資料倉儲,當生產者生產資料時鎖住倉庫,不...
redis stream 實現生產者消費者模式
test public void producer throws interruptedexception test public void consumer1 throws interruptedexception list msg jedis.xreadgroup groupname,consu...