多執行緒 併發工具類之CyclicBarrier詳解

2021-10-02 05:21:51 字數 4849 閱讀 2888

從字面意思理解,cyclicbarrier是回環屏障的意思,它可以讓一組執行緒全部達到乙個狀態後再全部同時執行。這裡之所以叫作回環是因為當所有等待執行緒執行完畢,並重置cyclicbarrier 的狀態後它可以被重用。之所以叫作屏障是因為執行緒呼叫await 方法後就會被阻塞,這個阻塞點就稱為屏障點,等所有執行緒都呼叫了await方法後,執行緒們就會衝破屏障,繼續向下執行。

cyclicbarrier是一種同步輔助工具,允許一組執行緒相互等待,直到達到共同的障礙點.

經常用於一組固定數量的執行緒必須相互等待的程式.

假如計數器值為n,那麼隨後呼叫await方法的n-1個執行緒都會因為到達屏障點而被阻塞,當第n個執行緒呼叫await後,計數器值為 0了,這時候第n個執行緒才會發出通知喚醒前面的n-1個執行緒。也就是當全部執行緒都到達屏障點時才能一塊繼續向下執行。

執行緒進入屏障通過cyclicbarrier的await()方法。

cyclicbarrier例項是可重複使用的:所有等待執行緒被喚醒的時候,任何執行緒再次執行cyclicbarrier.await()又會被暫停,直到這些執行緒中的最後乙個執行緒執行了cyclicbarrier.await().

如下例子,新建10個執行緒,直到10個執行緒都呼叫了await方法,即都到達屏障點後,就呼叫cyclicbarrier初始化時定義的方法(召喚神龍).

public static void main(string args) throws interruptedexception );

for (int i = 0; i < 10; i++) catch (exception e)

}, thread.currentthread().getname()+":"+i).start();

}

如下例子:假設乙個任務由階段1、階段2和階段3組成,每個執行緒要序列地執行階段1、階段2和階段3,當多個執行緒執行該任務時,必須要保證所有執行緒的階段1全部完成後才能進入階段2執行,當所有執行緒的階段2全部完成後才能進入階段3執行。該例子利用了cyclicbarrier的可復用性.

public static void main(string args) throws exception  catch (exception e) 

});}

executorservice.shutdown();

}輸出結果:

pool-1-thread-1 step1

pool-1-thread-3 step1

pool-1-thread-2 step1

pool-1-thread-2 step2

pool-1-thread-1 step2

pool-1-thread-3 step2

pool-1-thread-3 step3

pool-1-thread-1 step3

pool-1-thread-2 step3

在如上**中,每個子執行緒在執行完階段1後都呼叫了await方法,等到所有執行緒都到達屏障點後才會一塊往下執行,這就保證了所有執行緒都完成了階段 1後才會開始執行階段2。然後在階段 2後面呼叫了await方法,這保證了所有執行緒都完成了階段2後 ,才能開始階段3的執行。這個功能使用單個countdownlatch是無法完成的 。

private static class generation 

/** the lock for guarding barrier entry */

private final reentrantlock lock = new reentrantlock();

/** condition to wait on until tripped */

private final condition trip = lock.newcondition();

/** the number of parties */

private final int parties;

/* the command to run when tripped */

private final runnable barriercommand;

/** the current generation */

private generation generation = new generation();

/** * number of parties still waiting. counts down from parties to 0

* on each generation. it is reset to parties on each new

* generation or when broken.

*/private int count;

cyclicbarrier基於獨佔鎖實現,本質底層還是基於aqs的。

parties用來記錄執行緒個數,這裡表示多少執行緒呼叫await後,所有執行緒才會衝破屏障繼續往下執行。而count一開始等於parties,每當有執行緒呼叫await方法就遞減1,當count為0時就表示所有執行緒都到了屏障點。

你可能會疑惑,為何維護parties和count兩個變數,只使用

count不就可以了?另外別忘了cycliebarrier是可以被復用的,使用兩個變數的原因是,parties始終用來記錄總的執行緒個數,當count計數器值變為0後,會將parties的值賦給count,從而進行復用。這兩個變數是在構造cyclicbarrier物件時傳遞的.如下所示:

public cyclicbarrier(int parties, runnable barrieraction)
還有乙個變數barriercommand也通過建構函式傳遞,這是乙個任務,這個任務的執行時機是當所有執行緒都到達屏障點後。使用lock首先保證了更新計數器count的原子性。另外使用lock 的條件變數trip支援執行緒間使用await和signal操作進行同步。

最後,在變數generation內部有乙個變數broken,其用來記錄當前屏障是否被打破。注意,這裡的broken並沒有被宣告為volatile的,因為是在鎖內使用變數,所以不需要宣告。

private static class generation
幾個重要方法

int await()方法

當前執行緒呼叫cyclicbarrier的該方法時會被阻塞,直到滿足 下面條件之一才會返回:

由如下**可知,在內部呼叫了dowait方法。第乙個引數為false,則說明不設定超時時間,這時候第二個引數沒有意義。

public int await() throws interruptedexception, brokenbarrierexception  catch (timeoutexception toe) 

}

int dowait(boolean timed, long nanos)方法

該方法實現了cyclicbarrier的核心功能,其**如下:

private int dowait(boolean timed, long nanos)

throws interruptedexception, brokenbarrierexception,

timeoutexception

//(1)如果index==o則說明所有執行緒都到了屏障點,此時執行初始化時傳遞的任務

int index = --count;

if (index == 0) finally

}// loop until tripped, broken, interrupted, or timed out

//(4)如果index不為0

for (;;) catch (interruptedexception ie) else

}if (g.broken)

throw new brokenbarrierexception();

if (g != generation)

return index;

if (timed && nanos <= 0l)

}} finally

}private void nextgeneration()

當乙個執行緒呼叫了dowait方法後,首先會獲取獨佔鎖lock,如果建立cyclebarrier時傳遞的引數為10,那麼後面9個呼叫錢程會被阻塞。然後當前獲取到鎖的執行緒會對計數器count進行遞減操作,遞減後count=index=9,因為index!=o所以當前執行緒會執行**(4)。如果當前執行緒呼叫的是無引數的await() 方法 ,則這裡timed=false,所以當前執行緒會被放入條件變數 的trip的條件阻塞佇列,當前執行緒會被掛起並釋放獲取的lock 鎖。如果呼叫的是有引數的await方法則timed=true,然後當前執行緒也會被放入條件變數的條件佇列並釋放鎖資源,不同的是當前執行緒會在指定時間超時後自動被啟用。

當第乙個獲取鎖的執行緒由於被阻塞釋放鎖後,被阻塞的9個執行緒中有乙個會競爭到lock鎖,然後執行與第乙個執行緒同樣的操作,直到最後乙個執行緒獲取到lock鎖,此時己經有9個執行緒被放入了條件變數trip的條件佇列裡面。最後count=index等於 0,所以執行**(2),如果建立cyclicbarrier時傳遞了任務,則在其他執行緒被喚醒前先執行任務,任務執行完畢後再執行**(3),喚醒其他9個執行緒,並重置 cyclicbarrier,然後這 10個執行緒就可以繼續向下執行了。

cyclebarrier與countdownlatch的不同在於,前者是可以復用 的,並且前者特別適合分段任務有序執行的場景。

cyclebarrier其底層通過獨佔鎖reentrantlock實現計數器原子性更新,並使用條件變數佇列來實現執行緒同步。cyclicbarrier內部使用了乙個條件變數trip來實現等待/通知.使用了分代(generation)的概念用於表示cyclicbarrier例項是可以重複使用的.

多執行緒併發工具類

一 fork join 什麼是分而治之?規模為n的問題,n 閾值,直接解決,n 閾值,將n分解為k個小規模子問題,子問題互相對立,與原問題形式相同,將子問題的解合併得到原問題的解。fork join使用兩個類來完成以上兩件事情 forkjointask 我們要使用forkjoin框架,必須首先建立乙...

多執行緒併發工具類

countdownlatch,cyclicbarrier,semaphore,exchanger countdownlatch,用於乙個或多個執行緒等待其他執行緒完成操作。構造器中的計數值 count 實際上就是閉鎖需要等待的執行緒數量。這個值只能被設定一次,主線程在啟動其他執行緒後立即呼叫coun...

多執行緒 多執行緒之併發工具類

jdk 1.5 後,為我們提供的併發工具類有 名稱描述詳細 countdownlatch同步計數器 初始化時,傳入需要計數的執行緒等待數,並用 await 阻塞當前執行緒,其他執行緒中可以呼叫 countdown 方法讓計數器減一,當計數器為 0 時,則放行 cyclicbarrier柵欄 讓一組執...