生產者消費者執行緒在Queue中實現多執行緒同步

2022-04-29 04:54:10 字數 4569 閱讀 2099

使用c#進行多執行緒程式設計經常會用佇列池進行執行緒同步的方法,實現就用到queue。queue是執行緒安全的(thread safe),但不是泛型的,物件出列時需要進行拆箱轉換。也有人會馬上想到

queue,但

可惜的是泛型queue卻不是執行緒安全,我們需要用其它程式設計方法來實現它。

下面介紹一種方法,它能夠使用泛型queue進行執行緒同步,但是需要用到lock關鍵字以及autoreseteventmanualresetevent類對主線程和兩個輔助線程進行執行緒同步。

該示例建立兩個輔助線程。乙個執行緒生成元素並將它們儲存在非執行緒安全的泛型佇列中。另乙個執行緒使用此佇列中的項。另外,主線程定期顯示佇列的內容,以便該佇列可由三個執行緒進行訪問。lock 關鍵字用於同步對佇列的訪問,以確保佇列的狀態不會被破壞。

除了只是使用 lock 關鍵字來防止同時訪問以外,還可以用兩個事件物件提供進一步的同步。乙個事件物件用來通知輔助線程終止,另乙個事件物件由製造者執行緒用來在有新項新增到隊 列中時通知使用者執行緒。這兩個事件物件封裝在乙個名為 syncevents 的類中。這使事件可以輕鬆傳遞給表示製造者執行緒和使用者執行緒的物件。syncevents 類按如下方式定義:

c#

public

class syncevents

public eventwaithandle exitthreadevent

}public eventwaithandle newitemevent

}public waithandle eventarray

}private eventwaithandle _newitemevent;

private eventwaithandle _exitthreadevent;

private waithandle _eventarray;

}

對「新項」事件使用autoresetevent類,因為您希望每當使用者執行緒響應此事件後,此事件都能自動重置。或者,將manualresetevent類用於「退出」事件,因為您希望當此事件終止時有多個執行緒響應。如果您改為使用 autoresetevent,則僅在乙個執行緒響應該事件以後,該事件就還原到非終止狀態。另乙個執行緒不會響應,因此在這種情況下,將無法終止。

syncevents 類建立兩個事件,並將它們以兩種不同的形式儲存:一種是作為eventwaithandle(它是 autoresetevent 和 manualresetevent 的基類),一種是作為基於waithandle的陣列。如關於使用者執行緒的討論中所述,此陣列是必需的,因為它使使用者執行緒可以響應兩個事件中的任何乙個。

使用者執行緒和製造者執行緒分別由名為 consumer 和 producer 的類表示。這兩個類都定義了乙個名為 threadrun 的方法。這些方法用作 main 方法建立的輔助線程的入口點。

producer 類定義的 threadrun 方法如下所示:

c#

// producer.threadrun

public

void threadrun()}}

console.writeline("producer thread: produced items"

, count);

}

此方法一直迴圈,直到「退出執行緒」事件變為終止狀態。此事件的狀態由waitone方法使用 syncevents 類定義的 exitthreadevent 屬性進行測試。在這種情況下,檢查該事件的狀態不會阻止當前執行緒,因為 waitone 使用的第乙個引數為零,這表示該方法應立即返回。如果 waitone 返回 true,則說明該事件當前處於終止狀態。如果是這樣,threadrun 方法將返回,其效果相當於終止執行此方法的輔助線程。

在「退出執行緒」事件終止前,producer.threadstart 方法將嘗試在佇列中保留 20 項。項只是 0 到 100 之間的乙個整數。在新增新項前,必須鎖定該集合,以防止使用者執行緒和主線程同時訪問該集合。這一點是使用 lock 關鍵字完成的。傳遞給 lock 的引數是通過 icollection 介面公開的 syncroot 字段。此欄位專門為同步執行緒訪問而提供。對該集合的獨佔訪問許可權被授予 lock 後面的**塊中包含的所有指令。對於製造者新增到佇列中的每個新項,都將呼叫「新項」事件的set

方法。這將通知使用者執行緒離開掛起狀態並開始處理新項。

consumer 物件還定義名為 threadrun 的方法。與製造者的 threadrun 類似,此方法由 main 方法建立的輔助線程執行。然而,使用者的 threadstart 必須響應兩個事件。consumer.threadrun 方法如下所示:

c#

// consumer.threadrun

public

void threadrun()

count++;

} console.writeline("consumer thread: consumed items"

, count);

}

此方法使用waitany來阻止使用者執行緒,直到所提供的陣列中的任意乙個等待控制代碼變為終止狀態。在這種情況下,陣列中有兩個控制代碼,乙個用來終止輔助線程,另乙個用來指示有新項新增到集合中。waitany 返回變為終止狀態的事件的索引。「新項」事件是陣列中的第乙個事件,因此索引零表示新項。在這種情況下,檢查索引 1(它指示「退出執行緒」事件),並使用它來確定此方法是否繼續使用項。如果「新項」事件處於終止狀態,您將通過 lock 獲得對集合的獨佔訪問許可權並使用新項。因為此示例生成並使用數千個項,所以不顯示使用的每個項,而是使用 main 定期顯示佇列中的內容,如下面所演示的那樣。

main 方法首先建立乙個佇列(該佇列的內容將被生成和使用)和 syncevents 的乙個例項(已在前面演示):

c#

queue queue = new queue();

syncevents syncevents = new syncevents();

然後,main 配置 producer 和 consumer 物件以供輔助線程使用。然而,此步驟並不建立或啟動實際的輔助線程:

c#

producer producer = new producer(queue, syncevents);

consumer consumer = new consumer(queue, syncevents);

thread producerthread = new thread(producer.threadrun);

thread consumerthread = new thread(consumer.threadrun);

請注意,佇列和同步事件物件作為構造函式引數同時傳遞給 consumer 和 producer 執行緒。這提供了兩個物件,它們具有執行各自任務所需的共享資源。然後建立兩個新的 thread 物件,並使用每個物件的 threadrun 方法作為引數。每個輔助線程在啟動時都將此引數用作執行緒的入口點。

接著,main 通過呼叫 start 方法來啟動兩個輔助線程,如下所示:

c#

producerthread.start();

consumerthread.start();

此時,建立了兩個新的輔助線程,它們獨立於當前正在執行 main 方法的主線程開始非同步執行過程。事實上,main 接下來要做的事情是通過呼叫 sleep 方法將主線程掛起。該方法將當前正在執行的執行緒掛起指定的時間(毫秒)。在此時間間隔過後,main 將重新啟用,這時它將顯示佇列的內容。main 重複此過程四次,如下所示:

c#

for (int i=0; i<4; i++)

最後,main 通過呼叫「退出執行緒」事件的 set 方法通知輔助線程終止,然後對每個輔助線程呼叫 join 方法以阻止主線程,直到每個輔助線程都響應該事件並終止。

有乙個執行緒同步的最終示例:showqueuecontents 方法。與製造者執行緒和使用者執行緒類似,此方法使用 lock 獲得對佇列的獨佔訪問許可權。然而在這種情況下,獨佔訪問非常重要,因為 showqueuecontents 對整個集合進行列舉。對集合進行列舉是乙個特別容易由於非同步操作而造成資料損壞的操作,因為它需要遍歷整個集合的內容。

請注意,showqueuecontents 是由主線程執行的,因為它被 main 呼叫。這意味著,當此方法獲得對項佇列的獨佔訪問許可權時,既阻止了製造者執行緒訪問佇列,也阻止了使用者執行緒訪問佇列。showqueuecontents 鎖定佇列並列舉其內容:

c#

private

static

void showqueuecontents(queue q)

", item);}}

console.writeline();

生產者消費者執行緒在Queue中實現多執行緒同步

使用c 進行多執行緒程式設計經常會用佇列池進行執行緒同步的方法,實現就用到queue。queue是執行緒安全的 thread safe 但不是泛型的,物件出列時需要進行拆箱轉換。也有人會馬上想到 queue,但 可惜的是泛型queue卻不是執行緒安全,我們需要用其它程式設計方法來實現它。下面介紹一種...

生產者消費者執行緒

include include include includeusing namespace std typedef int semaphore 訊號量是一種特殊的整型變數 const int size of buffer 5 緩衝區長度 const unsigned short producers...

生產者消費者執行緒

該簡單生產者 消費者執行緒,屬於本人學習過程中的一段練習 如有不足,請指點 package com.lanqiao.demo3 author 大廣子 類說明 簡單的生產者,消費者執行緒 public class threadptcs catch interruptedexception e 退出 s...