Go 语言中 Goroutine 的并发数量控制
6月 19, 2020
虽然 Goroutine 号称占用的内存非常小,初始化只有几 K 大小,但是很多时候确实还是需要控制一台机器的 Goroutine 的并发数量,以进行业务上的可控并发场景的需要,或防止异常情况下的CPU、内存过载进而导致服务不可用的情况。
通过 Channel 控制 #
利用带缓冲区的 Channel 可以实现一个分发队列,队列里有值时就可以给 Goroutine 使用,没有值时 Goroutine 阻塞进行等待。
package main
import (
"log"
"time"
)
func main() {
ch := make(chan struct{}, 3)
for i := 0; i < 10; i++ {
ch <- struct{}{}
go func(i int) {
log.Println(i)
time.Sleep(time.Second)
<-ch
}(i)
}
}
// 2022/03/22 19:53:55 2
// 2022/03/22 19:53:55 1
// 2022/03/22 19:53:55 0
// 2022/03/22 19:53:56 5
// 2022/03/22 19:53:56 4
// 2022/03/22 19:53:56 3
// 2022/03/22 19:53:57 6
// 2022/03/22 19:53:57 7
// 2022/03/22 19:53:57 8
协程池 - ants #
更加比较成熟的方案是使用开源的协程池实现,这里使用 ants 来介绍一下协程池的概念。ants 是一个高性能且低损耗的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。
默认 pool #
package main
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/panjf2000/ants/v2"
)
var sum int32
func demoFunc() {
time.Sleep(10 * time.Millisecond)
fmt.Println("Hello World!")
}
func main() {
defer ants.Release()
runTimes := 1000
// Use the common pool.
var wg sync.WaitGroup
syncCalculateSum := func() {
demoFunc()
wg.Done()
}
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = ants.Submit(syncCalculateSum)
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", ants.Running())
fmt.Printf("finish all tasks.\n")
}
自定义 pool #
package main
import (
"fmt"
"sync"
"sync/atomic"
"github.com/panjf2000/ants/v2"
)
var sum int32
func myFunc(i interface{}) {
n := i.(int32)
atomic.AddInt32(&sum, n)
fmt.Printf("run with %d\n", n)
}
func main() {
defer ants.Release()
runTimes := 1000
// Use the pool with a function,
// set 10 to the capacity of goroutine pool and 1 second for expired duration.
p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
myFunc(i)
wg.Done()
})
defer p.Release()
// Submit tasks one by one.
for i := 0; i < runTimes; i++ {
wg.Add(1)
_ = p.Invoke(int32(i))
}
wg.Wait()
fmt.Printf("running goroutines: %d\n", p.Running())
fmt.Printf("finish all tasks, result is %d\n", sum)
}
ants 也提供了很多自定义的选项来支持各种场景需要。
type Options struct {
// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
// the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
// used for more than `ExpiryDuration`.
ExpiryDuration time.Duration
// PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
PreAlloc bool
// Max number of goroutine blocking on pool.Submit.
// 0 (default value) means no such limit.
MaxBlockingTasks int
// When Nonblocking is true, Pool.Submit will never be blocked.
// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
// When Nonblocking is true, MaxBlockingTasks is inoperative.
Nonblocking bool
// PanicHandler is used to handle panics from each worker goroutine.
// if nil, panics will be thrown out again from worker goroutines.
PanicHandler func(interface{})
// Logger is the customized logger for logging info, if it is not set,
// default standard logger from log package is used.
Logger Logger
}
具体的使用方法可以参考官方文档了,因为 ants 确实非常好用且性能出众,后面可能会专门从源码角度分析一下 ants。
全局变量加锁 #
当然还有一个最传统的方案,就是使用互斥锁维护一个全局变量的并发安全。变量可以是一个 struct
,记录当前的并发数量和需要控制的并发数量,然后在 goroutine 出去方法之前,对全局变量的数量值进行校验。不过这种使用场景可能比较局限,慎用。
package main
import (
"fmt"
"sync"
"time"
)
type GoNum struct {
Max int
Curr int
Mutex sync.Mutex
}
var GONUM = &GoNum{
Max: 10,
Curr: 0,
}
func gofunc() {
if GONUM.Curr < GONUM.Max {
GONUM.Mutex.Lock()
defer GONUM.Mutex.Unlock()
fmt.Println("curr:", GONUM.Curr)
GONUM.Curr++
}
}
func main() {
for {
go gofunc()
time.Sleep(time.Second)
}
}
通过 Context 进行控制 #
通过 Context 主要就是指通过 Context.WithValue
方法生成根节点,同时维护一个变量,后面的故事就可以参考上面的“全局变量”小节了。
通过 WaitGroup 进行控制 #
说起同步原语,就不得不说使用 sync.WaitGroup
,可能更加简单,都不用手动加锁了,Go 封装的 WaitGroup
已经解决了并发安全问题,只需要通过 Add()
设置 WaitGroup 的计数器数量、通过 Done()
表示一个 Goroutine 的完成、通过 Wait()
进行阻塞,等待所有 Goroutine 的执行完成,就可以进行并发数量的限制了。
package main
import (
"fmt"
"sync"
)
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func(v int) {
defer wg.Done()
fmt.Println(v)
}(i)
}
wg.Wait()
fmt.Println("work done!")
}
// output:
// 9
// 5
// 1
// 7
// 6
// 3
// 8
// 4
// 2
// 0
// work done!
总结 #
上面所有的方法,都有适合的场景,可以根据业务的需要灵活选择和结合使用,但有一个原则需要注意,也是 Go 语言里的一句名言:如果你不知道如何退出一个协程,那么就不要创建这个协程。