Go 语言中 Channel 的实现

Go 语言中 Channel 的实现

Nov 6, 2020
Go, 源码分析, 数据结构

关于 Go 并发设计的哲学中,最常见的一句话就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存1。Go 语言采用 CSP(Communicating sequential processes)并发模型,底层运用 Channel 实现其特性,如果不对 Channel 有一番了解,也就很难真正掌握 Go 语言独特的并发技术。

使用方法 #

先复习下 Channel 的使用方法。

定义 #

var ch chan int

通过 var 声明一个变量,不会申请内存空间,显然也无法向通道中写入或读取任何数据,所以在使用 Channel 时我们一般用 make

// 定义一个无缓冲的队列
ch := make(chan int)

// 定义一个有缓冲的队列
ch := make(chan int, 10)

写入通道数据 #

通道在符号 <- 左边就是写入。

ch := make(chan int)
// 写入数据
ch <- 1

读取通道数据 #

使用 i <- ch 读取通道数据。

package main

import "fmt"

func main() {
	ch := make(chan int, 10)
	defer close(ch)
	ch <- 1

	// 读取数据
	i := <-ch
	fmt.Println(i)
}
// 1

其中也可以 i, ok := <-ch 实现,第二个参数 ok 是 bool 类型表示是否读取了数据,需要注意的是,他不能用于表示通道的关闭状态。

作为参数和返回值 #

通道作为一等公民,可以作为参数和返回值。通道是协程之间交流的方式,不管是将数据读取还是写入通道,都需要将代表通道的变量通过函数传递到所在的协程中去。

func worker(ch chan int) {
	for n := range ch {
		fmt.Println("get data:", n)
	}
}

func producer(length int) chan int {
	return make(chan int, length)
}

单向通道 #

有时我们经常限定一个 Channel 在一个场景下只读或只写,从而防止滥用。Go 语言的类型系统为此提供了单方向的通道类型。

// 限制只读
func worker(ch <-chan int) {
	for n := range ch {
		fmt.Println("get data:", n)
	}
}

// 限制只写
func writer(ch chan<- int) {
	for i := 0; i < 10; i++ {
		ch <- i
	}
}

这种用法一般用在函数的形参上面,使用 make 并不能这样定义,而且需要注意的是,试图在只能写入的通道中读取数据,编译时会报错,同样试图在只读的通道写入数据也会报错。

关闭通道 #

ch := make(chan int, 1)
defer close(ch)

执行关闭时需要注意,当 Channel 是一个空指针或者已经被关闭时,执行 close 都会直接崩溃并抛出异常。

for range #

Go 提供了range关键字,将其使用在 channel 上时,会自动等待 channel 的动作一直到 channel 被关闭,如下:

ticker := time.NewTicker(time.Minute * 5)
for range ticker.C {
	doSomeThing()
}

select #

Go 中的 select 可以让 goroutine 同时等待多个 channel 可读或可写,它有几个特性需要注意:

  1. 除 default 外,如果只有一个 case 语句评估通过,那么就执行这个case里的语句;
  2. 除 default 外,如果有多个 case 语句评估通过,那么通过伪随机的方式随机选一个;
  3. 如果 default 外的 case 语句都没有通过评估,那么执行 default 里的语句;
  4. 如果没有 default,那么代码块会被阻塞,直到有一个 case 通过评估;否则一直阻塞
  5. case 分支永远不会进入 nil 通道;
  6. 空的 select 语句会直接阻塞当前 Goroutine,导致 Goroutine 进入无法被唤醒的永久休眠状态。
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
ch := make(chan bool)
go func() {
  defer close(ch)
  //具体的任务,这里模拟做的任务需要1秒完成
  time.Sleep(time.Second * 1)
}()
select {
  case <- ch:
  fmt.Println("ch bool", <- ch)
  case <- ctx.Done():
  fmt.Println("ctx done")
}
// output:
// ch bool false

len 和 cap 使用 #

len() 方法可以查询缓冲区数据的个数,cap() 可以查看缓冲区的大小。

底层原理 #

现在开始从源码角度分析上面的 Channel 提供的特性,源码集中在 runtime/chan.go 里面,可以自行进行翻阅查找,另外说明一点,从关键字的语法到编译再到运行时进入真正源码的转换逻辑这里先不介绍了,因为我还没搞懂,后面再专门说。

数据结构 #

type hchan struct {
	qcount   uint           // total data in the queue
	dataqsiz uint           // size of the circular queue
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	elemsize uint16
	closed   uint32
	elemtype *_type // element type
	sendx    uint   // send index
	recvx    uint   // receive index
	recvq    waitq  // list of recv waiters
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	lock mutex
}

type waitq struct {
	first *sudog
	last  *sudog
}

源码中有英文注释,这里再详细解释下:

  • qcount 已经在队列中的数据的个数
  • dataqsize 循环队列的长度
  • buf 存放实际数据的缓冲区的指针,这段缓冲区会被当作数据的环形队列使用。
  • elemsize 能够收发的元素的大小
  • closed 标记队列关闭状态
  • elemtype 能在收发的元素类型
  • sendx 写入者处理到的 buf 中元素的序号
  • recvx 接受者处理到的 buf 中的元素的序号
  • recvq 被阻塞的接收者 Goroutine 列表
  • sendq 由于缓冲区空间不足而阻塞的写入者 Goroutine 列表
  • lock 互斥锁,可见一个管道同时仅允许被一个协程读写。

初始化 #

make 定义的 channel 经过编译会在运行时被翻译成调用 makechan 函数生成一个 *hchan。其中第一个参数是 channel 元素的类型,第二个参数是元素的大小。

func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	return c
}

上面省略了一些非核心的校验代码,在 switch 片段中会,会根据函数传入的参数进行不同逻辑的创建:

  • 当分配的大小为 0 时,即不存在缓冲区的 channel,那么只在内存分配一个 hchan 结构体大小的空间即可;
  • 如果 channel 的元素类型不为指针,则在内存分配 hchan 结构体加 size 大小的连续内存空间。
  • 进入 default 为默认情况,即为指针的元素,需要为 hchan 和数据两部分单独分别分配内存。

在函数的最后统一配置创建的 hchan 其他字段的值并返回。

写入消息 #

当在代码里使用 ch <- i 写入数据时,经过编译会被翻译成调用 chansend1,继而调用 chansend

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
	chansend(c, elem, true, getcallerpc())
}

其中 chansend 的参数有四个:

  • hchan 即将要写入数据的 Channel
  • ep 要写入的数据。
  • block 是否阻塞,chansend1 的入参固定是 true
  • callerpc 程序计数器 pc,用于存放下一条指令所在单元的地址的地方。

代码中 chansend 函数比较长,整体实现了三种情况的消息写入,下面分开来说。

1. 可直接发送的 #

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}
  ...
}

满足条件:

  1. 目标 Channel 没有被关闭;
  2. 已经有处于读等待的 Goroutine(c.recvq可以取到值)。

执行操作:

  1. 从正在等待的写入协程队列 c.recvq 中取出最先进入队列的 Goroutine 。
  2. 然后通过 send 函数直接向这个 Goroutine 发送数据。

2. 向缓冲区发送的 #

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
...
}

满足条件:

  1. 创建的 Channel 有缓冲区;
  2. 缓冲区数据没有装满。

执行操作:

  1. 将向 Channel 发送的数据存储在 buf 空间里。
    1. 数据会存储在 buf 里的 c.sendx +1 的位置。
    2. 因为 buf 是一个循环数组,所以注意当 sendx 等于 dataqsiz 时会重新回到数组开始的位置。

3. 发送被阻塞的 #

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
  ...
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0

	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
  
  // Puts the current goroutine into a waiting state and calls unlockf on the system stack.
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)

	gp.waiting = nil
	gp.activeStackChans = false
	closed := !mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)

	return true
}

如果不满足上面的两种情况,发送就会被阻塞。比如 Channel 的缓冲区空间被装满,执行的操作如下:

  1. 创建一个 sudog 并将其加入 Channel 的 sendq 队列中;
  2. 当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据。

接收消息 #

当在代码里使用 i <- chi, ok <- ch 接收数据时,经过编译会被翻译成调用 chanrecv1chanrecv2,最终还是会调用运行时的 chanrecv 函数。

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
	chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
	_, received = chanrecv(c, elem, true)
	return
}

chanrecv 接收三个参数,分别是:

  • hchan 读取的队列
  • elem 存放读取到的数据的内存地址;
  • block 是否阻塞。

同写入 channel 一样,读取 channel 基本还是覆盖了之前的三种常见情况,但是有两种特殊情况:

  1. Channel 为 nil,这时会让出当前 Goroutine 的处理器使用权。
  2. Channel 已被关闭并且缓冲区没有数据,此时会清空 ep 中的数据并立刻返回。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
  // case 1
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	lock(&c.lock)
	// case 2
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		unlock(&c.lock)
		if ep != nil {
			typedmemclr(c.elemtype, ep)
		}
		return true, false
	}
}

1. 可以直接接收的 #

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
}

满足条件:

  1. c.sendq 可以取出等待的 Goroutine 时。

执行操作:

  1. 使用 recv 函数进行从 Channel 接收数据并写入 ep
recv 函数 #
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if c.dataqsiz == 0 {
		if ep != nil {
			// copy data from sender
			recvDirect(c.elemtype, sg, ep)
		}
	} else {
		// Queue is full. Take the item at the
		// head of the queue. Make the sender enqueue
		// its item at the tail of the queue. Since the
		// queue is full, those are both the same slot.
		qp := chanbuf(c, c.recvx)
		// copy data from queue to receiver
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// copy data from sender to queue
		typedmemmove(c.elemtype, qp, sg.elem)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
	}
	sg.elem = nil
	gp := sg.g
	unlockf()
	gp.param = unsafe.Pointer(sg)
	sg.success = true
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	goready(gp, skip+1)
}

recv 函数中会判断 Channel 是否有缓冲区,然后进行不一样的处理:

  1. Channel 没有缓冲区,将发送者 Goroutine 的 sg.elem 存储到 ep 中。
  2. Channel 有缓冲区:
    1. 将队列中的数据拷贝到接收方的内存地址;
    2. 然后将发送队列头的数据拷贝到缓冲区中,并释放一个阻塞的发送方

2. 可以从缓冲区接收的 #

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
}

满足条件:

  1. c.qcount > 0 说明 Channel 中的缓冲区不为空。

执行操作:

  1. hchanbufrecvx 位置取出数据给 ep

3. 接收被阻塞的 #

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0

	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// Signal to anyone trying to shrink our stack that we're about
	// to park on a channel. The window between when this G's status
	// changes and when we set gp.activeStackChans is not safe for
	// stack shrinking.
	atomic.Store8(&gp.parkingOnChan, 1)
	gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

	gp.waiting = nil
	gp.activeStackChans = false

	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success
}

默认情况,不满足上面两个条件时,即发送队列不存在等待的 Goroutine ,并且缓冲区也没有数据时,进入这种情况,执行的操作如下:

  1. 将当前的 Goroutine 置为等待状态,并放入 Channel 的 recvq 队列。
  2. 让出当前 Goroutine 的处理器使用权。

参考 #

文章结构上基本参考了 Go 设计与实现的 这篇文章,但是文章的作者写了更多更细节和更全面的东西,本篇就是学习笔记了。


  1. Do not communicate by sharing memory; instead, share memory by communicating https://go.dev/doc/effective_go ↩︎

本文共 3726 字,上次修改于 Oct 10, 2022,以 CC 署名-非商业性使用-禁止演绎 4.0 国际 协议进行许可。

相关文章

» Gin Web 框架中 Middleware 的实现原理

» 了解下 Protobuf 相关概念

» Panic:assignment to entry in nil map

» Go 语言中 Goroutine 的并发数量控制

» Gin Web 框架中 Validate 使用总结