worker pool其實就是執行緒池thread pool。對於go來說,直接使用的是goroutine而非執行緒,不過這裡仍然以執行緒來解釋執行緒池。
只要有任務進來,就會放進任務佇列中。只要執行緒執行完了乙個任務,就將任務放進已完成任務佇列,有時候還會將任務的處理結果也放進已完成佇列中。
worker pool中包含了一堆的執行緒(worker,對go而言每個worker就是乙個goroutine),這些執行緒嗷嗷待哺,等待著為它們分配任務,或者自己去任務佇列中取任務。取得任務後更新任務佇列,然後執行任務,並將執行完成的任務放進已完成佇列。
下圖來自wiki:
在go中有兩種方式可以實現工作池:傳統的互斥鎖、channel。
假設go中的任務的定義形式為:
type task struct
每次有任務進來時,都將任務放在任務佇列中。
使用傳統的互斥鎖方式實現,任務佇列的定義結構大概如下:
type queue struct
然後在執行任務的函式中加上lock()和unlock()。例如:
func worker(queue *queue)
}
上面只是給出了一點主要的**段,要實現完整的執行緒池,還有很多額外的**。
通過互斥鎖,上面的一切操作都是執行緒安全的。但問題在於加鎖/解鎖的機制比較重量級,當worker(即goroutine)的數量足夠多,鎖機制的實現將出現瓶頸。
在go中,也能用buffered channel實現工作池。
示例**很長,所以這裡先拆分解釋每一部分,最後給出完整的**段。
在下面的示例中,每個worker的工作都是計算每個數值的位數相加之和。例如給定乙個數值234,worker則計算2+3+4=9
。這裡交給worker的數值是隨機生成的[0,999)範圍內的數值。
這個示例有幾個核心功能需要先解釋,也是通過channel實現執行緒池的一般功能:
首先,建立task和result兩個結構,並建立它們的通道:
type task struct
type result struct
var tasks = make(chan task, 10)
var results = make(chan result, 10)
這裡,每個task都有自己的id,以及該任務將要被worker計算的隨機數。每個result都包含了worker的計算結果result以及這個結果對應的task,這樣從result中就可以取出任務資訊以及計算結果。
另外,兩個通道都是buffered channel,容量都是10。每個worker都會監聽tasks通道,並取出其中的任務進行計算,然後將計算結果和任務自身放進results通道中。
然後是計算位數之和的函式process(),它將作為worker的工作任務之一。
func process(num int) int
time.sleep(2 * time.second)
return sum
}
這個計算過程其實很簡單,但隨後還睡眠了2秒,用來假裝執行乙個計算任務是需要一點時間的。
然後是worker(),它監聽tasks通道並取出任務進行計算,並將結果放進results通道。
func worker(wg *waitgroup)
results }}
上面的**很容易理解,只要tasks channel不關閉,就會一直監聽該channel。需要注意的是,該函式使用指標型別的*waitgroup
作為引數,不能直接使用值型別的waitgroup
作為引數,這樣會使得每個worker都有乙個自己的waitgroup。
然後是建立工作池的函式createworkerpool(),它有乙個數值引數,表示要建立多少個worker。
func createworkerpool(numofworkers int)
wg.wait()
close(results)
}
建立工作池時,首先建立乙個waitgroup的值wg,這個wg被工作池中的所有goroutine共享,每建立乙個goroutine都wg.add(1)。建立完所有的goroutine後等待所有的groutine都執行完它們的任務,只要有乙個任務還沒有執行完,這個函式就會被wait()阻塞。當所有任務都執行完成後,關閉results通道,因為沒有結果再需要向該通道寫了。
當然,這裡是否需要關閉results通道,是由稍後的range迭代這個通道決定的,不關閉這個通道會一直阻塞range,最終導致死鎖。
工作池部分已經完成了。現在需要使用allocate()函式分配任務:生成一大堆的隨機數,然後將task放進tasks通道。該函式有乙個代表建立任務數量的數值引數:
func allocate(numoftasks int)
tasks
}close(tasks)
}
注意,最後需要關閉tasks通道,因為所有任務都分配完之後,沒有任務再需要分配。當然,這裡之所以需要關閉tasks通道,是因為worker()中使用了range迭代tasks通道,如果不關閉這個通道,worker將在取完所有任務後一直阻塞,最終導致死鎖。
再接著的是取出results通道中的結果進行輸出,函式名為getresult():
func getresult(done chan bool)
done
}
getresult()中使用了乙個done引數,這個引數是乙個訊號通道,用來表示results中的所有結果都取出來並處理完成了,這個通道不一定要用bool型別,任何型別皆可,它不用來傳資料,僅用來返回可讀,所以上面直接close(done)的效果也一樣。通過下面的main()函式,就能理解done訊號通道的作用。
最後還差main()函式:
func main()
上面分配了20個worker,這20個worker總共需要處理的任務數量為100。但注意,無論是tasks還是results通道,容量都是10,意味著任務佇列最長只能是10個任務。
下面是完整的**段:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)type task struct
type result struct
var tasks = make(chan task, 10)
var results = make(chan result, 10)
func process(num int) int
time.sleep(2 * time.second)
return sum
}func worker(wg *sync.waitgroup)
results
}}func createworkerpool(numofworkers int)
wg.wait()
close(results)
}func allocate(numoftasks int)
tasks
}close(tasks)
}func getresult(done chan bool)
done
}func main()
執行結果:
task id 19, randnum 914 , sum 14
task id 9, randnum 150 , sum 6
task id 15, randnum 215 , sum 8
............
task id 97, randnum 315 , sum 9
task id 99, randnum 641 , sum 11
total time taken 10.0174705 seconds
總共花費10秒。
可以試著將任務數量、worker數量修改修改,看看它們的效能比例情況。例如,將worker數量設定為99,將需要4秒,將worker數量設定為10,將需要20秒。
Go實現設計模式系列(8) Go實現物件池模式
物件池模式,物件被預先建立並初始化後放入物件池中,物件提供者,物件提供者就能利用已有的物件來處理請求,減少物件頻繁建立所浪費的資源。例如資料庫的連線池等等,基本都是建立後就被放入連線池中,後續的查詢請求使用的都是連線池中的物件,從而加快了查詢速度 不然每次查詢都需要重新建立資料庫連線物件,比較浪費 ...
Go實現資源池
pool.go packagepool import sync io errors log var errsizetoosmall errors.new 池大小必須為正整數 errpoolclosed errors.new 池已關閉 pool 管理一組可以安全地在多個goroutine間共享的資源,...
GO 開發系列 基礎 Go 併發程式設計
併發和並行說明 併發特點 並行特點 go 協程 說明 乙個 go 執行緒上可以起多個協程,協程可以理解為是輕量級的執行緒 go 協程特點 go 併發原理 mpg 模型 詳見部落格 示例 package main import fmt time 向 intchan放入 1 8000 個數 func p...