在併發程式設計領域,乙個非常讓程式設計師興奮,感到有成就感的事情就是做效能優化,譬如發現某個執行緒成為了單點瓶頸,然後上多執行緒。
提到了上多執行緒,那自然就會引入 thread pool,也就是我們通常說的執行緒池,我們會將任務扔給執行緒池,然後執行緒池裡面自己會負責將任務派發到不同的執行緒去執行,除開任務自身執行的開銷,如何高效的派發也會決定乙個執行緒池是否有足夠好的效能。下面,我們就來聊聊幾種常見的執行緒池的實現。
在 rust 裡面,我們可以通過標準庫提供的 channel 進行通訊,但 channel 其實是乙個 multi-producer,single-consumer 的結構,也就是我們俗稱的 mpsc。但對於執行緒池來說,我們需要的是乙個 mpmc 的 channel,也就是說,我們需要有乙個佇列,這個佇列可以支援多個執行緒同時新增,同時獲取任務。
雖然單獨的 channel 沒法支援,但如果我們給 channel 的 receiver 套上乙個 mutex,在加上 arc,其實就可以了。通過 mutex 我們能保證多個執行緒同時只能有乙個執行緒搶到 lock,然後從佇列裡面拿到資料。而加上 arc 主要是能在多個執行緒共享了,這裡就不說明了。
所以實現也就比較簡單了,如下:
pub struct threadpool
impl threadpool
});handlers.push(handle);
}threadpool
}}
task 其實就是乙個 fnbox,因為只有 nightly 版本支援 fnbox,所以我們自定義了一下
pub trait fnbox
impl: fnonce()> fnbox for f
}pub type task = box+ send>;
上面的**邏輯非常的簡單,建立乙個 channel,然後使用 arc + mutex 包上 receiver,建立多個執行緒,每個執行緒嘗試去獲取 channel 任務然後執行,如果 channel 裡面沒任務,recv **就會等著,而其他的執行緒這時候因為沒法拿到 lock 也會等著。
拋開 channel,我們還有一種更通用的做法,可以用在不同的語言,譬如 c 上面,也就是使用 condition variable。關於 condition variable 的使用,大家可以 google,因為在使用 condition variable 的時候,都會配套有乙個 mutex,所以我們可以通過這個 mutex 同時控制 condition variable 以及任務佇列。
首先我們定義乙個 state,用來處理任務佇列
struct state
對於不同執行緒獲取任務,我們可以通過
fn next_task(notifer: &arc<(mutex>, condvar)>) -> option>
match state.queue.pop_front()
none => }}
}
首先就是嘗試用 mutex 拿到 state,如果外面沒有結束,那麼就嘗試從佇列裡面獲取任務,如果沒有,就呼叫 condition variable 的 wait 進行等待了。
任務的新增也比較簡單
let &(ref lock, ref cvar) = &*self.notifer;
也是通過 lock 拿到 state,然後放到佇列裡面,在通知 condition variable。對於執行緒池的建立,也是比較容易的:
let s = state ;
let notifer = arc::new((mutex::new(s), condvar::new()));
for _ in 0..number
});handlers.push(handle);
}
上面提到的兩種做法,雖然都非常的通用,但有乙個明顯的問題,就在於他是有全域性 lock 的,在併發系統裡面,lock 如果使用不當,會造成非常嚴重的效能開銷,尤其是在出現 contention 的時候,所以多數時候,我們希望使用的是乙個 lock-free 的資料結構。
幸運的是,在 rust 裡面,已經有乙個非常穩定的庫來提供相關的支援了,這個就是 crossbeam,關於 crossbeam 的相關知識,後面可以再開一篇文章來詳細說明,這裡我們直接使用 crossbeam 的 channel,不同於標準庫的 channel,crossbeam 的 channel 是乙個 mpmc 的實現,所以我們能非常方便的用到執行緒池上面,簡單**如下:
let (tx, rx) = channel::unbounded::>();
let mut handlers = vec!;
for _ in 0..number
});handlers.push(handle);
}
可以看到,crossbeam 的 channel 使用比標準庫的更簡單,它甚至不需要 arc 來包一層,而且還是 lock-free 的。
參考這個 benchmark,分別對不同的 threadpool 進行測試,在我的機器上面會發現 crossbeam 的效能會明顯好很多,標準庫 channel 其次,最後才是 condition variable。
test thread_pool::benchmark_condvar_thread_pool ... bench: 128,924,340 ns/iter (+/- 39,853,735)
test thread_pool::benchmark_crossbeam_channel_thread_pool ... bench: 1,497,272 ns/iter (+/- 355,120)
test thread_pool::benchmark_std_channel_thread_pool ... bench: 50,925,087 ns/iter (+/- 6,753,377)
可以看到,使用 crossbeam 的效果已經非常好了,但這種實現其實還有乙個問題,主要在於它有乙個全域性的佇列,當併發嚴重的時候,多個執行緒對這個全域性佇列的爭搶,可能成為瓶頸。另外,還有乙個問題在於,它的派發機制是任意的,也就是那個執行緒搶到了任務就執行,在某些時候,我們希望一些任務其實是在某個執行緒上面執行的,這樣對於 cpu 的 cache 來說會更加友好,譬如有乙個任務在執行的時候,又會產生乙個後續任務,自然,我們希望這個後續任務在同乙個執行緒執行。
為了解決上面的問題,最直觀的做法就是每個執行緒乙個佇列,這樣我們就能夠顯示的控制任務派發了。乙個非常簡單的例子
let mut handlers = vec!;
let mut txs = vec!;
for _ in 0..number
});txs.push(tx);
handlers.push(handle);
}
上面我們為每個執行緒建立了乙個 channel,這樣每個執行緒就不用去爭搶全域性的 channel 了。
派發的時候我們也可以手動派發,譬如根據某個 id hash 到乙個對應的 thread 上面,通過 sender 傳送 訊息。
雖然每個執行緒乙個 channel 解決了全域性爭搶問題,也提公升了 cpu cache 的使用,但它引入了另乙個問題,就是任務的不均衡。直觀的來說,就是會導致某些執行緒一直忙碌,在不斷的處理任務,而另一些執行緒則沒有任務處理,一直很閒。為了解決這個問題,就有了 work stealing 的執行緒池。
work stealing 的原理其實很簡單,當乙個執行緒執行完自己執行緒佇列裡面的所有任務之後,它會嘗試去其它執行緒的佇列裡面偷一點任務執行。
因為 work stealing 的實現過於複雜,這裡就不描述了,rust 的 tokio 庫提供了乙個 tokio-threadpool,就是基於 work stealing 來做的,不過現在只提供了 future 的支援。
上面簡單的列舉了一些執行緒池的實現方式,如果你只是單純的想用乙個比較簡單的派發功能,基於 crossbeam 的就可以了,複雜一點的可以使用 work stealing 的。當然,這裡只是大概列舉了一些,如果有更好的實現,麻煩跟我聯絡討論,我的郵箱 [email protected]。
Rust的併發程式設計(一)多程序併發
併發,是指在巨集觀意義上同一時間處理多個任務。併發的方式一般包含為三種 多程序 多執行緒以及最近幾年剛剛火起來的協程。首先,我們建立兩個專案,乙個為子程序,乙個為主程序。在子程序的main.rs中,編寫如下 use std thread sleep use std time duration fn ...
Rust中堆疊與併發
對不同語言,堆和棧是相通的,只不過,這裡是以rust語言為例,來理解堆和棧。巨集觀上講,這兩個術語是關於記憶體管理的。棧和堆是幫助你決定何時分配和釋放記憶體的抽象 概念 首先先列出堆與棧的不同之處,沒有列全 舉個例子 fn main fn main box可以理解為裝箱吧,參考 乙個引用的值僅僅是乙...
Rust 通用程式設計概念
變數 基本型別 函式 控制流 rust中的變數預設是不可變的,這樣是為了能夠讓你安全並且方便地寫出複雜 甚至並行的 當乙個變數是不可變時,一旦它繫結到了某個值上面,這個值就再也無法被改變了。1 let x 5 m,2 println the value of x is x 3 x 6 4 canno...