Work Stealing Pool執行緒池

2021-10-24 11:41:35 字數 4634 閱讀 1526

work-stealing演算法的理念在於讓空閒的執行緒從忙碌的執行緒的雙端佇列中偷取任務.

預設情況下, 乙個工作執行緒從它自己內部的雙端佇列的頭部獲取任務. 當執行緒的的佇列中沒有任務,它從另外的繁忙的執行緒的雙端佇列(或者全域性的雙端佇列)的尾部獲取任務,因為佇列的尾部是最有可能存在還未執行的任務.

這種方式減小了執行緒之間對任務的競爭的可能性,它也使得執行緒以最大可能性去獲取可執行的執行緒,因為它們總是在最有可能存在還未執行的任務的地方尋找任務.

一般來說實現乙個執行緒池主要包括以下幾個組成部分:

建立和啟動乙個task類,但是不同的是執行緒池中的每乙個執行緒都有乙個本地佇列。執行緒池通過乙個任務排程器來分配任務,當主程式建立了乙個task後,由於建立這個task的執行緒不是執行緒池中的執行緒,則任務排程器會把該task放入全域性佇列中。

下面的演示圖,task1和task2都是主程式建立的,因此都是放在全域性佇列中,當工作者執行緒處理task2時,建立了乙個task3,此時task3被放入本地佇列

為什麼要設計本地佇列?這樣做的優勢是充分利用並行。隨著越來越多執行緒競爭工作項,所有的執行緒訪問單一的佇列並不是最優的,並且也不安全。所以,將任務放入本地佇列,並且由同乙個執行緒處理,這就避免了競爭。

本地佇列中的task,執行緒會按照lifo的方式去處理。這是因為在大多數場景下,最後建立的task可能仍然在cache中,處理它能夠提供快取命中率。顯然這意味放棄部分公平性而保證效能。如下面的演示圖,

工作者執行緒1建立了task2,task2建立了task3,task4,task5,但最先處理的還是task5。

執行緒竊取work stealing

當 a執行緒開始執行的時候,優先總是處理本地佇列中的任務,當它發現本地佇列已經空了,那麼它會去全域性佇列中獲取task,當全域性佇列中也是空的,那麼就會發 生工作竊取(work stealing)。任務排程器會把該執行緒池中額外的任務分配給a執行緒處理,其效果就好比該執行緒會才從其他執行緒的佇列中「竊取」乙個task來執行。這樣的目的是提高了cpu的使用效率

/** * 建構函式,建立指定數量的執行緒

* @param n 指定的執行緒數

*/explicit

stealthreadpool

(size_t n)

:m_done

(false),

m_init_finished

(false),

m_worker_num

(n)}

catch(.

..) m_init_finished =

true;}

/** * 析構函式,等待並阻塞至執行緒池內所有任務完成

*/~stealthreadpool()

}/** 獲取工作執行緒數 */

size_t worker_num()

const

/** 先執行緒池提交任務後返回的對應 future 的型別 */

template

<

typename resulttype>

using task_handle = std::future

;/** 向執行緒池提交任務 */

template

<

typename functiontype>

task_handle<

typename std::result_of<

functiontype()

>

::type>

submit

(functiontype f)

else

m_cv.

notify_one()

;return res;

}/** 返回執行緒池結束狀態 */

bool

done()

const

/** * 等待各執行緒完成當前執行的任務後立即結束退出

*/void

stop()

m_cv.

notify_all()

;// 喚醒所有工作執行緒

for(size_t i =

0; i < m_worker_num; i++)}

}/**

* 等待並阻塞至執行緒池內所有任務完成

* @note 至此執行緒池能工作執行緒結束不可再使用

*/void

join()

// 喚醒所有工作執行緒

m_cv.

notify_all()

;// 等待執行緒結束

for(size_t i =

0; i < m_worker_num; i++)}

m_done =

true;}

private

: std::atomic_bool m_done;

// 執行緒池全域性需終止指示

bool m_init_finished;

// 執行緒池是否初始化完畢

size_t m_worker_num;

// 工作執行緒數量

std::condition_variable m_cv;

// 訊號量,無任務時阻塞執行緒並等待

std::mutex m_cv_mutex;

// 配合訊號量的互斥量

threadsafequeue m_master_work_queue;

// 主線程任務佇列

std::vector<:unique_ptr>

> m_queues;

// 任務佇列(每個工作執行緒乙個)

std::vector<:thread> m_threads;

// 工作執行緒

// 執行緒本地變數

inline

static

thread_local workstealqueue* m_local_work_queue =

nullptr

;// 本地任務佇列

inline

static

thread_local size_t m_index =0;

inline

static

thread_local

bool m_thread_need_stop =

false

;// 執行緒停止執行指示

void

worker_thread

(size_t index)

}void

run_pending_task()

elseif(

pop_task_from_master_queue

(task)

)else

}elseif(

pop_task_from_other_thread_queue

(task)

)else);

}}bool

pop_task_from_master_queue

(task_type& task)

bool

pop_task_from_local_queue

(task_type& task)

bool

pop_task_from_other_thread_queue

(task_type& task)

for(size_t i =

0; i < m_worker_num;

++i)

}return

false;}

};// namespace hku

}/* namespace hku */

#endif

/* hikyuu_utilities_thread_stealthreadpool_h */

references:[1].

[2].

[3]. github中work stealing pool原始碼實現

執行緒!執行緒!!執行緒!!! Delphi版

以下內容僅供初學者參考 看到有的同學對delphi的執行緒認識不夠深,特開一貼給同學們講講。主要給出兩種常用的執行緒形式。1 長等待型執行緒示例,等待命令,執行不定長的工作,但每個工作的時間不會太長。2 長工作型執行緒示例,執行乙個很長時間的工作,但可以很快響應取消操作。以下程式所用的知識為 訊息機...

執行緒 執行緒控制代碼 執行緒ID

什麼是控制代碼 控制代碼是一種指向指標的指標。我們知道,所謂指標是一種記憶體位址。應用程式啟動後,組成這個程式的各物件是住留在記憶體的。如果簡單地理解,似乎我們只要獲知這個記憶體的首位址,那麼就可以隨時用這個位址訪問物件。但是,如果您真的這樣認為,那麼您就大錯特錯了。我們知道,windows是乙個以...

執行緒 執行緒控制代碼 執行緒ID

什麼是控制代碼 控制代碼是一種指向指標的指標。我們知道,所謂指標是一種記憶體位址。應用程式啟動後,組成這個程式的各物件是住留在記憶體的。如果簡單地理解,似乎我們只要獲知這個記憶體的首位址,那麼就可以隨時用這個位址訪問物件。但是,如果您真的這樣認為,那麼您就大錯特錯了。我們知道,windows是乙個以...