生產者與消費者的協調問題

2021-08-31 01:46:09 字數 3870 閱讀 2178

生產者-消費者(producer-consumer)問題,也稱作有界緩衝區(bounded-buffer)問題,兩個程序共享乙個公共的固定大小的緩衝區。其中乙個是生產者,用於將訊息放入緩衝區;另外乙個是消費者,用於從緩衝區中取出訊息。問題出現在當緩衝區已經滿了,而此時生產者還想向其中放入乙個新的資料項的情形,其解決方法是讓生產者此時進行休眠,等待消費者從緩衝區中取走了乙個或者多個資料後再去喚醒它。同樣地,當緩衝區已經空了,而消費者還想去取訊息,此時也可以讓消費者進行休眠,等待生產者放入乙個或者多個資料時再喚醒它。

聽起來好像蠻對的,無懈可擊似的,但其實在實現時會有乙個競爭條件存在的。為了跟蹤緩衝區中的訊息數目,需要乙個變數 count。如果緩衝區最多存放 n 個訊息,則生產者的**會首先檢查 count 是否達到 n,如果是,則生產者休眠;否則,生產者向緩衝區中放入乙個訊息,並增加 count 的值。

消費者的**也與此類似,首先檢測 count 是否為 0,如果是,則休眠;否則,從緩衝區中取出訊息並遞減 count 的值。同時,每個程序也需要檢查是否需要喚醒另乙個程序。**可能如下:

// 緩衝區大小

#define n 100

int count = 0; // 跟蹤緩衝區的記錄數

/* 生產者程序 */

void procedure(void)

insert_item(item); // 將新資料項放入緩衝區

count = count + 1; // 計數器加 1

if (count == 1) // 表明插入之前為空,}}

/* 消費者程序 */

void consumer(void)

item = remove_item(); // 從緩衝區中取出乙個資料項

count = count - 1; // 計數器減 1

if (count == n -1) // 緩衝區有空槽

consume_item(item); // 列印出資料項}}

看上去很美,**出了問題,這裡對 count 的訪問是有可能出現競爭條件的:緩衝區為空,消費者剛剛讀取 count 的值為 0,而此時排程程式決定暫停消費者並啟動執行生產者。生產者向緩衝區中加入乙個資料項,count 加 1。現在 count 的值變成了 1.它推斷剛才 count 為 0,所以此時消費者一定在休眠,於是生產者開始呼叫 wakeup(consumer) 來喚醒消費者。但是,此時消費者在邏輯上並沒有休眠,所以 wakeup 訊號就丟失了。當消費者下次執行時,它將測試先前讀到的 count 值,發現為 0(注意,其實這個時刻 count 已經為 1 了),於是開始休眠(邏輯上)。而生產者下次執行的時候,count 會繼續遞增,並且不會喚醒 consumer 了,所以遲早會填滿緩衝區的,然後生產者也休眠,這樣兩個程序就都永遠的休眠下去了。

1,使用訊號量解決生產者-消費者問題

首先了解一下訊號量吧,訊號量是 e.w.dijkstra 在 1965 年提出的一種方法,它是使用乙個整型變數來累計喚醒的次數,供以後使用。在他的建議中,引入了乙個新的變數型別,稱為訊號量(semaphore),乙個訊號量的取值可以為 0(表示沒有儲存下來的喚醒操作)或者為正值(表示有乙個或多個喚醒操作)。

並且設立了兩種操作:down 和 up(分別為一般化後的 sleep 和 wakeup,其實也是一般教科書上說的 p/v 向量)。對乙個訊號量執行 down 操作,表示檢查其值是否大於 0,如果該值大於 0,則將其值減 1(即用掉乙個儲存的喚醒訊號)並繼續;如果為 0,則程序休眠,而且此時 down 操作並未結束。另外,就是檢查數值,修改變數值以及可能發生的休眠操作都作為單一的,不可分割的 原子操作 來完成。

下面開始考慮用訊號量來解決生產者-消費者問題了,不過在此之前,再次分析一下這個問題的本質會更清晰點:問題的實質在於發給乙個(尚)未休眠程序(如上的消費者程序在只判斷了 count == 0 後即被排程出來,還未休眠)的 wakeup 訊號丟失(如上的生產者程序在判斷了 count == 1 後以為消費者程序休眠,而喚醒它)了。如果它沒有丟失,則一切都會很好。

#define n 100 // 緩衝區中的槽數目

typedef int semaphore; // 訊號量一般被定義為特殊的整型資料

semaphore mutex = 1; // 控制對臨界區的訪問

semaphore empty = n; // 計數緩衝區中的空槽數目

semaphore full = 0; // 計數緩衝區中的滿槽數目

/* 生產者程序 */

void proceducer(void)

}/* 消費者程序 */

void consumer(voi)

}該解決方案使用了三個訊號量:乙個為 full,用來記錄充滿的緩衝槽的數目,乙個為 empty,記錄空的緩衝槽總數,乙個為 mutex,用來確保生產者和消費者不會同時訪問緩衝區。mutex 的初始值為 1,供兩個或者多個程序使用的訊號量,保證同乙個時刻只有乙個程序可以進入臨界區,稱為二元訊號量(binary semaphore)。如果每乙個程序在進入臨界區前都執行乙個 down(...),在剛剛退出臨界區時執行乙個 up(...),就能夠實現互斥。

另外,通常是將 down 和 up 操作作為系統呼叫來實現,而且 os 只需要在執行以下操作時暫時禁止全部中斷:測試訊號量,更新訊號量以及在需要時使某個程序休眠。

這裡使用了三個訊號量,但是它們的目的卻不相同,其中 full 和 empty 用來同步(synchronization),而 mutex 用來實現互斥。

2,使用訊息傳遞解決生產者-消費者問題

這種 ipc 方式使用兩條原語 send 和 receive,也是系統呼叫。如:

send(dest, &msg) // 將訊息 msg 傳送到目標(程序)dest 中

receive(src, &msg) // 接收由 src 過來的 msg,如果沒有訊息可用,則可能阻塞接收者

訊息傳遞系統會面臨位於網路中不同機器上的通訊程序的情形,所以會更加的複雜。如:訊息可能被網路丟失,一般使用確認(ack)訊息。如果傳送方在一定的時間段內沒有收到確認訊息,則重發訊息。

如果訊息本身被正確接收,但是返回的 ack 訊息丟失,傳送方則重發訊息,這樣接收方就會收到兩份同樣的訊息。一般使用在每條原始訊息的頭部嵌入乙個連續的序號來解決這個問題。

另外,訊息傳遞系統還需要解決程序命名的問題,在 send 和 receive 系統呼叫中指定的程序必須沒有二義性的。還有其他的一些問題,如效能問題,身份認證等等,不過那個就會扯多了,還是看看如果解決這個生產者-消費者的問題吧:

#define n 100 // 緩衝區中的槽數目

/* 生產者程序 */

void proceducer(void)

}/* 消費者程序 */

void consumer(voi)

}在這個解決方案中,共使用了 n 條訊息,有點類似於上乙個的共享記憶體緩衝區的 n 個槽,消費者程序這邊首先通過乙個 for 迴圈將 n條空訊息傳送給生產者。當生產者向消費者傳遞乙個資料項時,是通過取走每一條接收到的空訊息,然後送回填充了內容的訊息給消費者的。通過這種方式,整個訊息傳遞系統中的總的訊息數(包括空的訊息 + 存了資料項的訊息 == n)是不變的。

如果執行過程中,生產者程序的速度比消費者快,則所有的訊息最終都會塞滿,然後生產者程序就會等待消費者(即使呼叫 procedure 也是阻塞在 receive 處),直到消費者返回一條空的訊息;反之亦然。

下面再來看一下訊息傳遞方式的兩種變體。一種是:為每乙個程序分配乙個唯一的位址,讓訊息按照這個程序的位址進行編址。也就是 send 和 receive 呼叫的第乙個引數指定為具體的程序位址。另一種是:引入信箱(mailbox,現在正在做的乙個專案,就是這種方式),可以信箱就像乙個盒子,裡面裝了很多的信件,這個信件就是我們要傳遞的訊息,當然信箱是有容量限制的(現在 yahoo 好像推出無限容量的,呵呵)。當使用信箱時,send 和 receive 系統呼叫中的位址引數就是信箱的位址,而不是程序的位址。當乙個程序嘗試向乙個容量爆滿的信箱傳送訊息時,它將會被掛起,直到信箱中有訊息被取走。

生產者消費者 生產者與消費者模式

一 什麼是生產者與消費者模式 其實生產者與消費者模式就是乙個多執行緒併發協作的模式,在這個模式中呢,一部分執行緒被用於去生產資料,另一部分執行緒去處理資料,於是便有了形象的生產者與消費者了。而為了更好的優化生產者與消費者的關係,便設立乙個緩衝區,也就相當於乙個資料倉儲,當生產者生產資料時鎖住倉庫,不...

生產者與消費者問題

使用synchronized同步鎖機制,執行緒先獲得物件的鎖,先上鎖後執行執行緒內容,執行完成後釋放鎖。使用wait 和notifyall 簡單實現生產者與消費者 public class test1 class producer implements runnablecatch interrupt...

生產者與消費者問題

知識點 生產者與消費者問題 涉及到的執行緒間通訊的方法 wait 當前執行緒掛起並放棄cpu,同步資源,使別的執行緒可訪問並修改共享資源,當前執行緒排隊等候再次對資源訪問 notify 喚醒正在排隊等待同步資源的執行緒中優先順序最高者結束等待 notifyall 喚醒正在排隊等待資源的所有執行緒結束...