Go 高階併發

2021-10-08 11:38:30 字數 3437 閱讀 3944

譯文出處:

譯者:咔嘰咔嘰

校對者:fivezh

如果你曾經使用過 go 一段時間,那麼你可能了解一些 go 中的併發原語:

這些語言特性和包組合在一起,為構建高併發的應用程式提供了豐富的工具集。你可能還沒有發現在擴充套件庫 golang.org/x/sync 中,提供了一系列更高階別的併發原語。我們將在本文中來談談這些內容。

正如文件中所描述,這個包提供了乙個重複函式呼叫抑制的機制。

如果你正在處理計算量大(也可能僅僅是慢,比如網路訪問)的使用者請求時,這個包就很有用。例如,你的資料庫中包含每個城市的天氣資訊,並且你想將這些資料以 api 的形式提供服務。在某些情況下,可能同時有多個使用者想查詢同一城市的天氣。

在這種場景下,如果你只查詢一次資料庫然後將結果共享給所有等待的請求,這樣不是更好嗎?這就是singleflight提供的功能。

在使用的時候,我們需要要建立乙個singleflight.group。它需要在所有請求中共享才能工作。然後將緩慢或者開銷大的操作包裝到group.do(key, fn)的呼叫中。對同乙個key的多個併發請求將僅呼叫fn一次,並且將fn的結果返回給所有呼叫者。

實際中的使用如下:

package weather

type info struct

var group singleflight.group

func city(city string) (*info, error) , error) )

if err != nil

return results.(*info), nil

}

需要注意的是,我們傳遞給group.do的閉包必須返回(inte***ce{}, error)才能和 go 型別系統一起使用。上面的例子中忽略了group.do的第三個返回值,該值是用來表示結果是否在多個呼叫方之間共享。

另乙個有用的包是 errgroup package。它和sync.waitgroup比較相似,但是會將任務返回的錯誤回傳給阻塞的呼叫方。

當你有多個等待的操作,但又想知道它們是否都已經成功完成時,這個包就很有用。還是以上面的天氣為例,假如你要一次查詢多個城市的天氣,並且要確保其中所有的查詢都成功返回。

首先定義乙個errgroup.group,然後為每個城市都使用group.go(fn func() error)方法。該方法會生成乙個 goroutine 來執行這個任務。當生成你想執行的所有任務時,使用group.wait()等待它們完成。需要注意和sync.waitgroup有一點不同的是,該方法會返回錯誤。當且僅當所有任務都返回nil時,才會返回乙個nil錯誤。

實際中的使用如下:

func cities(cities ...string) (*info, error) )

}if err := g.wait(); err != nil

return res, nil

}

這裡我們使用乙個res切片來儲存每個 goroutine 執行的結果。儘管上面的**沒有使用mu互斥鎖也是執行緒安全的,但是每個 goroutine 都是在切片中自己的位置寫入結果,因此我們不得不使用乙個切片,以防**變化。

上面的**將同時查詢給定城市的天氣資訊。如果城市數量比較少,那還不錯,但是如果城市數量很多,可能會導致效能問題。在這種情況下,就應該引入限制併發了。

在 go 中使用 semaphores 訊號量讓實現限制併發變得非常簡單。訊號量是你學習電腦科學中可能已經遇到過的併發原語,如果沒有遇到也不用擔心。你可以出於多種目的來使用訊號量,但是這裡我們只使用它來追蹤執行中的任務的數量,並阻塞直到有空間可以執行其他任務。

在 go 中,我們可以使用channel來實現訊號量的功能。如果我們一次需要最多執行 10 個任務,則需要建立乙個容量為 10 的channelsemaphore := make(chan struct{}, 10)。你可以想象它為乙個可以容納 10 個球的管道。

如果想執行乙個新的任務,我們只需要給channel傳送乙個值:semaphore <- struct{}{},如果已經有很多任務在執行的話,將會阻塞。這類似於將乙個球推入管道,如果管道已滿,則需要等待直到有空間為止。

當通過<-semaphore能從該channel中取出乙個值時,這表示乙個任務完成了。這類似於在管道另一端拿出乙個球,這將為塞入下乙個球提供了空間。

如描述一樣,我們修改後的cities**如下:

func cities(cities ...string) (*info, error) , 10)

for i, city := range cities {}

g.go(func() error )

}if err := g.wait(); err != nil

return res, nil

}

最後,當你想要限制併發的時候,並不是所有任務優先順序都一樣。在這種情況下,我們消耗的資源將依據高、低優先順序任務的分布以及它們如何開始執行而發生變化。

在這種場景下使用加權限制併發是一種不錯的解決方式。它的工作原理很簡單:我們不需要為同時執行的任務數量做預估,而是為每個任務提供乙個 "cost",並從訊號量中獲取和釋放它。

我們不再使用channel來做這件事,因為我們需要立即獲取並釋放 "cost"。幸運的是,"擴充套件庫" golang.org/x/sync/sempahore 實現了加權訊號量。

sem <- struct{}{}操作叫 "獲取",<-sem操作叫 "釋放"。你可能會注意到semaphore.acquire方法會返回錯誤,那是因為它可以和context包一起使用來控制提前結束。在這個例子中,我們將忽略它。

實際上,天氣查詢的例子比較簡單,不適用加權訊號量,但是為了簡單起見,我們假設cost變數隨城市名稱長度而變化。然後,我們修改如下:

func cities(cities ...string) (*info, error) 

g.go(func() error )

}if err := g.wait(); err != nil else if err := ctx.err(); err != nil

return res, nil

}

上面的例子展示了在 go 中通過微調來實現需要的併發模式是多麼簡單。

Go語言併發

協程 本質上是一種使用者態執行緒,不需要作業系統來進行搶占式排程,且在真正的實現重寄存於執行緒中,因此,系統開銷極小,可以有效提高執行緒的任務併發性,從而避免多執行緒的缺點。使用協程的優點是程式設計簡單,結構清晰 缺點是需要語言的支援。協程最大優勢 輕量級 可以輕鬆建立上百萬個而不會導致系統資源衰竭...

go總結 併發

主要學習了菜鳥程式設計 go 併發 總結 go func 可以起乙個執行緒,輕量級的執行緒,執行緒之間採用chan進行傳值。package main import fmt func sum s int c chan int c sum 把 sum 傳送到通道 c func main 緩衝區大小為2 ...

Go控制併發

目錄非同步返回結果 多路復用和超時控制 channel的關閉和廣播 任務的取消 關聯任務的取消 在 go 中可以使用 sync.mutex 或者 sync.rwmutex 來實現 sync.mutex 互斥鎖 sync.rwmutex 一種特殊型別的鎖,其允許多個唯讀操作並行執行,但寫操作會完全互斥...