Go 语言中 Goroutine 的并发数量控制

Go 语言中 Goroutine 的并发数量控制

6月 19, 2020
Golang

虽然 Goroutine 号称占用的内存非常小,初始化只有几 K 大小,但是很多时候确实还是需要控制一台机器的 Goroutine 的并发数量,以进行业务上的可控并发场景的需要,或防止异常情况下的CPU、内存过载进而导致服务不可用的情况。

通过 Channel 控制 #

利用带缓冲区的 Channel 可以实现一个分发队列,队列里有值时就可以给 Goroutine 使用,没有值时 Goroutine 阻塞进行等待。

package main

import (
	"log"
	"time"
)

func main() {
	ch := make(chan struct{}, 3)
	for i := 0; i < 10; i++ {
		ch <- struct{}{}
		go func(i int) {
			log.Println(i)
			time.Sleep(time.Second)
			<-ch
		}(i)
	}
}
// 2022/03/22 19:53:55 2
// 2022/03/22 19:53:55 1
// 2022/03/22 19:53:55 0
// 2022/03/22 19:53:56 5
// 2022/03/22 19:53:56 4
// 2022/03/22 19:53:56 3
// 2022/03/22 19:53:57 6
// 2022/03/22 19:53:57 7
// 2022/03/22 19:53:57 8

协程池 - ants #

更加比较成熟的方案是使用开源的协程池实现,这里使用 ants 来介绍一下协程池的概念。ants 是一个高性能且低损耗的 goroutine 池,实现了对大规模 goroutine 的调度管理、goroutine 复用,允许使用者在开发并发程序的时候限制 goroutine 数量,复用资源,达到更高效执行任务的效果。

默认 pool #

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func demoFunc() {
	time.Sleep(10 * time.Millisecond)
	fmt.Println("Hello World!")
}

func main() {
	defer ants.Release()
	runTimes := 1000
	// Use the common pool.
	var wg sync.WaitGroup
	syncCalculateSum := func() {
		demoFunc()
		wg.Done()
	}
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = ants.Submit(syncCalculateSum)
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", ants.Running())
	fmt.Printf("finish all tasks.\n")
}

自定义 pool #

package main

import (
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/panjf2000/ants/v2"
)

var sum int32

func myFunc(i interface{}) {
	n := i.(int32)
	atomic.AddInt32(&sum, n)
	fmt.Printf("run with %d\n", n)
}

func main() {
	defer ants.Release()
	runTimes := 1000
	// Use the pool with a function,
	// set 10 to the capacity of goroutine pool and 1 second for expired duration.
	p, _ := ants.NewPoolWithFunc(10, func(i interface{}) {
		myFunc(i)
		wg.Done()
	})
	defer p.Release()
	// Submit tasks one by one.
	for i := 0; i < runTimes; i++ {
		wg.Add(1)
		_ = p.Invoke(int32(i))
	}
	wg.Wait()
	fmt.Printf("running goroutines: %d\n", p.Running())
	fmt.Printf("finish all tasks, result is %d\n", sum)
}

ants 也提供了很多自定义的选项来支持各种场景需要。

type Options struct {
	// ExpiryDuration is a period for the scavenger goroutine to clean up those expired workers,
	// the scavenger scans all workers every `ExpiryDuration` and clean up those workers that haven't been
	// used for more than `ExpiryDuration`.
	ExpiryDuration time.Duration

	// PreAlloc indicates whether to make memory pre-allocation when initializing Pool.
	PreAlloc bool

	// Max number of goroutine blocking on pool.Submit.
	// 0 (default value) means no such limit.
	MaxBlockingTasks int

	// When Nonblocking is true, Pool.Submit will never be blocked.
	// ErrPoolOverload will be returned when Pool.Submit cannot be done at once.
	// When Nonblocking is true, MaxBlockingTasks is inoperative.
	Nonblocking bool

	// PanicHandler is used to handle panics from each worker goroutine.
	// if nil, panics will be thrown out again from worker goroutines.
	PanicHandler func(interface{})

	// Logger is the customized logger for logging info, if it is not set,
	// default standard logger from log package is used.
	Logger Logger
}

具体的使用方法可以参考官方文档了,因为 ants 确实非常好用且性能出众,后面可能会专门从源码角度分析一下 ants。

全局变量加锁 #

当然还有一个最传统的方案,就是使用互斥锁维护一个全局变量的并发安全。变量可以是一个 struct,记录当前的并发数量和需要控制的并发数量,然后在 goroutine 出去方法之前,对全局变量的数量值进行校验。不过这种使用场景可能比较局限,慎用。

package main

import (
	"fmt"
	"sync"
	"time"
)

type GoNum struct {
	Max   int
	Curr  int
	Mutex sync.Mutex
}

var GONUM = &GoNum{
	Max:  10,
	Curr: 0,
}

func gofunc() {
	if GONUM.Curr < GONUM.Max {
		GONUM.Mutex.Lock()
    defer GONUM.Mutex.Unlock()
		fmt.Println("curr:", GONUM.Curr)
		GONUM.Curr++
	}
}

func main() {
	for {
		go gofunc()
		time.Sleep(time.Second)
	}
}

通过 Context 进行控制 #

通过 Context 主要就是指通过 Context.WithValue 方法生成根节点,同时维护一个变量,后面的故事就可以参考上面的“全局变量”小节了。

通过 WaitGroup 进行控制 #

说起同步原语,就不得不说使用 sync.WaitGroup ,可能更加简单,都不用手动加锁了,Go 封装的 WaitGroup 已经解决了并发安全问题,只需要通过 Add() 设置 WaitGroup 的计数器数量、通过 Done() 表示一个 Goroutine 的完成、通过 Wait() 进行阻塞,等待所有 Goroutine 的执行完成,就可以进行并发数量的限制了。

package main

import (
	"fmt"
	"sync"
)

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func(v int) {
			defer wg.Done()
			fmt.Println(v)
		}(i)
	}
	wg.Wait()
	fmt.Println("work done!")
}
// output:
// 9
// 5
// 1
// 7
// 6
// 3
// 8
// 4
// 2
// 0
// work done!

总结 #

上面所有的方法,都有适合的场景,可以根据业务的需要灵活选择和结合使用,但有一个原则需要注意,也是 Go 语言里的一句名言:如果你不知道如何退出一个协程,那么就不要创建这个协程。

本文共 1258 字,上次修改于 Dec 6, 2024,以 CC 署名-非商业性使用-禁止演绎 4.0 国际 协议进行许可。

相关文章

» Gin Web 框架中 Validate 使用总结

» Go 程序取消子 Goroutine 的几种方式

» Go 标准库中涉及 I/O 操作的几个包的区别