Go 语言的 MPG 并发调度模型

Go 语言的 MPG 并发调度模型

12月 21, 2020
源码分析, 系统设计, 数据结构, Golang

Golang 的高并发能力是通过协程 goroutine 实现的,在 Go 语言的开发中,每涉及到 goroutine 的相关功能实现时,都会意识到自己需要对 MPG 的模型有一个大概的了解,在此基础上才能做好开发上的决策,本篇也即为 MPG 并发调度模型的学习笔记了。

进程、线程和协程 #

要对 goroutine 的调度器深入了解,需要先了解一下进程、线程的基础知识。

进程 #

进程作为拥有资源分配的最小单位,它为每个程序维护着运行时的各种资源,比如进程ID、进程的页表、进程执行现场的寄存器值、进程各个段地址空间分布信息以及进程执行时的维护信息等,它们在程序的运行期间会被经常或实时更新。

进程作为用户操作的实体,它贯穿操作系统的整个生命周期,而程序是由若干段二进制码组成的。进程可以说是程序的运行态抽象,即运行于处理器中的二进制码叫作进程,保存在存储介质中的二进制码叫作程序。

Linux 用 fork 方法创建进程,不同的进程具有不同的进程 ID(PID)。

线程 #

一个进程会至少包含一个线程,线程是操作系统调度时的最基本单元,每个线程都会占用 1M 以上的内存空间,而且在线程切换时也会消耗更多的内存和时间。操作系统的线程状态一般分为就绪态、运行态、睡眠态、僵尸态和终止态。线程之间一般通过同步原语进行并发资源访问的管理。

Linux 用 clone 方法创建线程,线程也称为是轻量级的进程。

协程 #

协程并不是 Go 独有的概念,在很多其他语言也存在,但是只有 Go 在语言层面对协程直接提供如此优雅的支持。每个协程初始化后的内存大小只有 2k,比线程小的多,而且可以动态扩容。线程和协程是 M : N 的关系,即 M 线程上可能有 N 个协程。

调度模型 #

图中 G 是 goroutine,它是一个待执行的任务, M 是线程,由操作系统调度和管理,P 是运行在线程上的逻辑处理器,它们的代码定义在 runtime.runtime2.go 中,下面详细介绍。

G #

G,就是 Goroutine,即我们在 Go 程序中使用 go 关键字创建的执行体,Go 程序中每个 go 关键字都会创建一个 goroutine,是调度器中待执行的任务。

因为 runtime.g 中涉及了非常多的字段,在一起不太好说,下面开始拆开逐个分析。

栈内存 stack #

type g struct {
	stack       stack
	stackguard0 uintptr
}

type stack struct {
	lo uintptr
	hi uintptr
}

g.stack 字段描述了当前 goroutine 的栈内存范围 [stack.lo, stack.hi)

stackguard0 用于调度器抢占式调度。

调度器 sched #

type g struct {
  sched gobuf
}

sched 存储 goroutine 的调度相关的数据,即 gobug 格式的内容;

执行现场 gobuf #

当协程进行上下文切换时,上一个协程的执行现场需要存储在 g.gobuf 结构体中,g.gobuf 结构体主要保存 CPU 中几个重要的寄存器值,分别是 rsp、rip、rbp。

type g struct {
  sched gobuf
}

type gobuf struct {
	sp   uintptr     // CPU 的 rsp 寄存器的值
	pc   uintptr     // CPU 的 rip 寄存器的值
	g    guintptr    // 记录当前的 gobuf 属于哪个 goroutine
	ret  sys.Uintreg // 保存系统调用的返回值
	bp   uintptr     // 记录 CPU 的 rbp 寄存器的值
}

其中:

  • rsp 寄存器始终指向函数调用栈顶;
  • rip 寄存器执行程序要执行的下一条指令的地址;
  • rbp 存储了函数栈帧的起始位置。

当 goroutine 被恢复时,调度器代码就会把这个 G 对象里的变量恢复到寄存器中让 CPU 执行。

panic 和 defer #

type g struct {
	_panic       *_panic // 最内侧的 panic 结构体
	_defer       *_defer // 最内侧的延迟函数结构体
}

我们知道 panic 和 defer 在程序中的声明和调用是入栈和出栈的形式,它们在底层是以链表来实现的,每一个 goroutine 上都持有分别存储 deferpanic 对应结构体的链表。

关联 M #

type g struct {
	m    *m
}

m 为 g 所占用的线程。

状态流转 #

type g struct {
	atomicstatus uint32
}

g.atomicstatus 中记录了 goroutine 当前的状态,主要是以下的几种:

状态描述
_Gidle刚刚被分配并且还没有被初始化,初始化以后会变为 _Gdead 状态,_Gdead 也是协程被销毁时的状态。
_Grunnable没有执行代码,没有栈的所有权,存储在运行队列中,等待被运行。
_Grunning表示当前协程正在被运行,拥有栈的所有权,被赋予了内核线程 M 和处理器 P。
_Gsyscall正在执行系统调用,拥有栈的所有权,没有执行用户代码,被赋予了内核线程 M 但是不在运行队列上。
_Gwaiting由于运行时而被阻塞,没有执行用户代码并且不在运行队列上,但是可能存在于 Channel 的等待队列上。
_Gdead没有被使用,没有执行代码,可能有分配的栈。
_Gcopystack栈正在被拷贝,没有执行代码,不在运行队列上。
_Gpreempted由于抢占而被阻塞,没有执行用户代码并且不在运行队列上,等待唤醒。
_GscanGC 正在扫描栈空间,没有执行代码,可以与其他状态同时存在。

goroutine 的状态流转非常复杂,他们之间大体如下图。

M #

M 是 Machine,即传统意义上进程的线程,由操作系统调度和管理,M 需要持有 P 才能执行任务 G,注意 M 并不是线程本身,而是与运行时为 M 创建的线程相绑定。M 本身是无状态的,是否为空闲仅以它是否存在与调度器的空闲 M 列表中为依据。

除了 M0 是由 main goroutine 创建的,创建另外 M 的原因主要是没有足够的 M 来关联 P 并运行其中的 G,另外运行时系统执行系统监控和垃圾回收也会导致创建新的 M。

g0 #

type m struct {
	g0   *g  // goroutine with scheduling stack
}

每个线程中都有一个特殊的协程 g0,g0 负责执行协程调度的一系列运行时代码,而一般的协程用来执行用户代码,当用户协程退出或被抢占时,需要重新运行协程调度,这是就会从用户协程 g 切换到 g0,g0 会调度出另一个用户协程 g,从而开始执行新的用户协程,循环往复。

curg #

type m struct {
	curg *g  // current running goroutine
}

curg 就是 M 当前执行的用户协程。

线程本地存储 tls #

type m struct {
	tls [6]uintptr // thread-local storage (for x86 extern register)
}

操作系统中,线程中存储的局部变量只对当前线程可见,Go 语言的运行时的调度器使用线程本地存储将线程与运行时的 M 结构体绑定在一起。结构体 M 中 m.tls 的是线程本地存储的地址,同时 tls[0] 存储的是当前线程正在运行的协程 g 的地址。因此,在任何一个线程内部,都可以获取到当前线程上的协程 g、结构体 m、和处理器 p、特殊协程 g0 等。

持有 P #

type m struct {
	p             puintptr
	nextp         puintptr
	oldp          puintptr
}

M 被创建之后就会关联一个 P,M 与 P 的关系体现在以上三个字段中,其中:

  • p:表示正在运行代码的处理器。
  • nextp:表示暂存于当前 M 的、有潜在关系的处理器 P,M 创建之初,这个 P 就会被关联。
  • oldp:执行调用栈之前使用 M 的处理器。

P #

处理器(Processor)P 不是指 CPU,而是运行在线程上的调度管理器,是 G 能够在 M 上运行的关键,具有调度 goroutine 的能力。只有当 M 与一个 P 关联后才能执行 Go 代码,runtime.p 是处理器的运行时表示。P 的数量由环境变量 GOMAXPROCSruntime.GOMAXPROCS() 函数指定。

Go 语言的运行时会适时地让 P 与不同的 M 建立或断开关联,以使 P 中那些可运行的 G 能够在需要的时候获得运行时机,当 P 不再与 M 关联时,它会被加入全局的空闲 P 列表 runtime.sched.pidle

运行队列 #

type p struct {
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	runnext  guintptr
}

处理器 P 除了持有线程,还有一个 goroutine 运行队列,源码中用 runq 相关的字段表示,而 runnext 是线程下一个要执行的 goroutine。除了这个队列外,还有一个全局的 runq 队列,由多个处理器共享。

一般来说,P 中的 G 创建的协程会加入本地的 runq 中,如果本地已满,则会加入全局的队列,处理器 P 除了调度本地队列中的协程,还会周期地从全局队列中获取 goroutine 来调度。

GOMAXPROCS #

通过 GOMAXPROCS 可以设置 Go 程序最多运行的 P 的数量,默认情况下它会是主机 CPU 的核数。在确定了这个值以后,运行时会根据这个值初始化全局的 P 列表 runtime.allp

被 M 持有 #

type p struct {
  m muintptr
}

即当前 P 被这个 M 持有,如果当前的 M 被系统调用阻塞(即它运行的 G 进入了系统调用),运行时会将 M 与 P 分离开,重新找一个空闲的 M 或新建一个 M。

状态流转 #

type p struct {
	status      uint32
}

runtime.p 结构体中的状态 status 字段会是以下五种中的一种:

状态描述
_Pidle处理器没有运行用户代码或者调度器,被空闲队列或者改变其状态的结构持有,运行队列为空。
_Prunning被线程 M 持有,并且正在执行用户代码或者调度器。
_Psyscall没有执行用户代码,当前线程陷入系统调用。
_Pgcstop被线程 M 持有,当前处理器由于垃圾回收被停止,是 P 的初始状态。
_Pdead当前处理器已经不被使用。

P 在各个状态之间的流转如图所示:

调度器 schedt #

type schedt struct {
	lock mutex

	midle        muintptr // idle m's waiting for work
	nmidle       int32    // number of idle m's waiting for work
	nmidlelocked int32    // number of locked m's waiting for work
	mnext        int64    // number of m's that have been created and next M ID
	maxmcount    int32    // maximum number of m's allowed (or die)
	nmsys        int32    // number of system m's not counted for deadlock
	nmfreed      int64    // cumulative number of freed m's

	ngsys uint32 // number of system goroutines; updated atomically

	pidle      puintptr // idle p's
	npidle     uint32

	// Global runnable queue.
	runq     gQueue
	runqsize int32

	// Global cache of dead G's.
	gFree struct {
		lock    mutex
		stack   gList // Gs with stacks
		noStack gList // Gs without stacks
		n       int32
	}
  
	// freem is the list of m's waiting to be freed when their
	// m.exited is set. Linked through m.freelink.
	freem *m
}

调度器是所有 goroutine 被调度的核心,存放了调度器持有的全局资源,其中访问这些资源需要持有锁:

  • 管理了能够将 G 和 M 进行绑定的 M 队列
  • 管理了空闲的 P 链表(队列)
  • 管理了 G 的全局队列
  • 管理了可被复用的 G 的全局缓存
  • 管理了 defer 池

调度策略 #

Go 语言经历过多次版本迭代后,最终构成现在基于信号的抢占式调度器,其详细的演变历史可参考 Go 语言设计与实现 - 6.5 调度器

调度器启动 #

schedinit #

Go 语言的运行时通过 schedinit 初始化调度器。

func schedinit() {
	// raceinit must be the first call to race detector.
	// In particular, it must be done before mallocinit below calls racemapshadow.
	_g_ := getg()

	sched.maxmcount = 10000

	// The world starts stopped.
	worldStopped()

	moduledataverify()
	stackinit()
	mallocinit()
	fastrandinit() // must run before mcommoninit
	mcommoninit(_g_.m, -1)
	cpuinit()       // must run before alginit
	alginit()       // maps must not be used before this call
	modulesinit()   // provides activeModules
	typelinksinit() // uses maps, activeModules
	itabsinit()     // uses activeModules

	sigsave(&_g_.m.sigmask)

	goargs()
	goenvs()
	parsedebugvars()
	gcinit()

	lock(&sched.lock)
	sched.lastpoll = uint64(nanotime())
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
	unlock(&sched.lock)
	// World is effectively started now, as P's can run.
	worldStarted()
}

在初始函数执行的过程中会将能够创建的最大线程数 maxmcount 设置成 10000,可以使用 SetMaxThreads 函数设置 maxmcount,但是注意这个值不能小于 M 的数量,否则会引发 Panic,所以这个值越早设置风险越低。M 被创建之后会被加入全局 M 列表 runtime.allm

procresize #

schedinit 最后调用了 procresize 来更新程序中处理器的数量,这个过程中不会运行任何用户 goroutine,调度器也会进入锁定状态。

func procresize(nprocs int32) *p {
	assertLockHeld(&sched.lock)
	assertWorldStopped()

	old := gomaxprocs
	maskWords := (nprocs + 31) / 32

	// initialize new P's
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		if pp == nil {
			pp = new(p)
		}
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
	}
	_g_ := getg()
	// g.m.p is now set, so we no longer need mcache0 for bootstrapping.
	mcache0 = nil
	// release resources from unused P's
	for i := nprocs; i < old; i++ {
		p := allp[i]
		p.destroy()
		// can't free P itself because it can be referenced by an M in syscall
	}
	var runnablePs *p
	var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
	atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
	return runnablePs
}

procresize 函数会完成相应数量的处理器的启动,等待用户创建新的 goroutine 并为其调度处理器资源。

创建 goroutine #

goroutine 是通过 go 关键字创建的,编译器会将其在运行时转换为 newproc 函数调用。

newproc #

func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	gp := getg()
	pc := getcallerpc()
	systemstack(func() {
		newg := newproc1(fn, argp, siz, gp, pc)

		_p_ := getg().m.p.ptr()
		runqput(_p_, newg, true)

		if mainStarted {
			wakep()
		}
	})
}

newproc 的入参数,然后调用 newproc1 函数获取新的 goroutine 结构体,调用 runqput 将其加入处理器的运行队列并在满足条件时调用 wakep 唤醒新的处理器执行 goroutine。

newproc1 #

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	_g_ := getg()

	acquirem() // disable preemption because it can be holding p in a local var

	_p_ := _g_.m.p.ptr()
	newg := gfget(_p_)
	if newg == nil {
		newg = malg(_StackMin)
		casgstatus(newg, _Gidle, _Gdead)
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}

	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	casgstatus(newg, _Gdead, _Grunnable)
	releasem(_g_.m)

	return newg
}

newproc1会根据传入参数初始化一个 g 结构体,它实现的功能如下:

  1. 获取或者创建新的 goroutine 结构体;
  2. 将传入的参数移到 goroutine 的栈上;
  3. 更新 goroutine 调度相关的属性。

newproc1 会从处理器或者调度器的缓存中获取新的结构体,也可以调用 malg 函数创建。

运行队列 #

func runqput(_p_ *p, gp *g, next bool) {
	if next {
	retryNext:
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		}
		// Kick the old runnext out to the regular run queue.
		gp = oldnext.ptr()
	}

retry:
	h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
	t := _p_.runqtail
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
		return
	}
	if runqputslow(_p_, gp, h, t) {
		return
	}
	// the queue is not full, now the put above must succeed
	goto retry
}

newproc 调用完 newproc1 获取完 goroutine 结构体会再调用 runqput 函数将 goroutine 放到运行队列上。Go 语言有两个运行队列,其中一个是处理器本地的运行队列,另一个是调度器持有的全局运行队列,只有在本地运行队列没有剩余空间时才会使用全局队列。

调度循环 #

调度器启动后会进入调度循环,经过 mstartmstart1,最后的核心函数是 schedule,调度循环也是调度器工作的最核心逻辑。

schedule #

func schedule() {
	_g_ := getg()

top:
	var gp *g
	var inheritTime bool

	if sched.disable.user && !schedEnabled(gp) {
		// Scheduling of this goroutine is disabled. Put it on
		// the list of pending runnable goroutines for when we
		// re-enable user scheduling and look again.
		lock(&sched.lock)
		if schedEnabled(gp) {
			// Something re-enabled scheduling while we
			// were acquiring the lock.
			unlock(&sched.lock)
		} else {
			sched.disable.runnable.pushBack(gp)
			sched.disable.n++
			unlock(&sched.lock)
			goto top
		}
	}

	execute(gp, inheritTime)
}

schedule 通过以下几种方式查找可运行的 goroutine 并使用 execute 执行:

  1. 从全局运行队列查找待执行的 goroutine;
  2. 从 P 本地的运行队列查找待执行的 goroutine;
  3. 两种方法都没找到,会通过 findrunnable 函数阻塞地进行查找 goroutine。

execute #

func execute(gp *g, inheritTime bool) {
	_g_ := getg()

	// Assign gp.m before entering _Grunning so running Gs have an
	// M.
	_g_.m.curg = gp
	gp.m = _g_.m
	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	if !inheritTime {
		_g_.m.p.ptr().schedtick++
	}

	// Check whether the profiler needs to be turned on or off.
	hz := sched.profilehz
	if _g_.m.profilehz != hz {
		setThreadCPUProfiler(hz)
	}

	if trace.enabled {
		// GoSysExit has to happen when we have a P, but before GoStart.
		// So we emit it here.
		if gp.syscallsp != 0 && gp.sysblocktraced {
			traceGoSysExit(gp.sysexitticks)
		}
		traceGoStart()
	}

	gogo(&gp.sched)
}

找到可运行的 goroutine 后,交给 execute 函数准备执行 goroutine,然后 execute 会通过 gogo 函数将 goroutine 调度到当前线程上进行执行。

gogo #

gogo 是真正执行 goroutine,由汇编代码实现,在每种 CPU 平台的实现不一样,代码集中在 runtime.asm_xxx 里面。

// void gogo(Gobuf*)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $8-4
	MOVL buf+0(FP), BX     // 获取调度信息
	MOVL gobuf_g(BX), DX
	MOVL 0(DX), CX         // 保证 Goroutine 不为空
	get_tls(CX)
	MOVL DX, g(CX)
	MOVL gobuf_sp(BX), SP  // 将 runtime.goexit 函数的 PC 恢复到 SP 中
	MOVL gobuf_ret(BX), AX
	MOVL gobuf_ctxt(BX), DX
	MOVL $0, gobuf_sp(BX)
	MOVL $0, gobuf_ret(BX)
	MOVL $0, gobuf_ctxt(BX)
	MOVL gobuf_pc(BX), BX  // 获取待执行函数的程序计数器
	JMP  BX  

触发调度 #

触发 schedule 重新调度 goroutine 进行执行的场景主要有以下四种:

  • 主动挂起,比如 channel 阻塞。
  • 系统调用。
  • 协作式调度,比如使用 Gosched 函数。
  • 系统监控。

线程调度 #

Go 语言的运行时会通过 runtime.startm 启动线程来执行处理器 P,如果我们在该函数中没能从闲置列表中获取到线程 M 就会调用 runtime.newm 创建新的线程。

总结 #

本文从操作系统的进程和线程的介绍开始,进而重新认识了协程,然后通过源码的相关数据结构对 MPG 调度模型的研究来了解 Groutine 运行时调度机制,除了源码,文章也参考了很多资料,列在了下面。另外本篇是着重参考了 Go 语言设计与实现 - 6.5 调度器 的研究思路和框架结构边学习边做笔记,捡我暂时能理解的内容产出了本文。

本文共 5248 字,上次修改于 Dec 6, 2024,以 CC 署名-非商业性使用-禁止演绎 4.0 国际 协议进行许可。

相关文章

» Go 语言中 Channel 的实现

» Go 语言的 Context 源码分析

» 了解下 Protobuf 相关概念

» Gin Web 框架中 Middleware 的实现原理

» Redis 实现布隆过滤器