Go 语言中 Channel 的实现
11月 6, 2020
关于 Go 并发设计的哲学中,最常见的一句话就是:不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存1。Go 语言采用 CSP(Communicating sequential processes)并发模型,底层运用 Channel 实现其特性,如果不对 Channel 有一番了解,也就很难真正掌握 Go 语言独特的并发技术。
使用方法 #
先复习下 Channel 的使用方法。
定义 #
var ch chan int
通过 var
声明一个变量,不会申请内存空间,显然也无法向通道中写入或读取任何数据,所以在使用 Channel 时我们一般用 make
。
// 定义一个无缓冲的队列
ch := make(chan int)
// 定义一个有缓冲的队列
ch := make(chan int, 10)
写入通道数据 #
通道在符号 <-
左边就是写入。
ch := make(chan int)
// 写入数据
ch <- 1
读取通道数据 #
使用 i <- ch
读取通道数据。
package main
import "fmt"
func main() {
ch := make(chan int, 10)
defer close(ch)
ch <- 1
// 读取数据
i := <-ch
fmt.Println(i)
}
// 1
其中也可以 i, ok := <-ch
实现,第二个参数 ok
是 bool 类型表示是否读取了数据,需要注意的是,他不能用于表示通道的关闭状态。
作为参数和返回值 #
通道作为一等公民,可以作为参数和返回值。通道是协程之间交流的方式,不管是将数据读取还是写入通道,都需要将代表通道的变量通过函数传递到所在的协程中去。
func worker(ch chan int) {
for n := range ch {
fmt.Println("get data:", n)
}
}
func producer(length int) chan int {
return make(chan int, length)
}
单向通道 #
有时我们经常限定一个 Channel 在一个场景下只读或只写,从而防止滥用。Go 语言的类型系统为此提供了单方向的通道类型。
// 限制只读
func worker(ch <-chan int) {
for n := range ch {
fmt.Println("get data:", n)
}
}
// 限制只写
func writer(ch chan<- int) {
for i := 0; i < 10; i++ {
ch <- i
}
}
这种用法一般用在函数的形参上面,使用 make
并不能这样定义,而且需要注意的是,试图在只能写入的通道中读取数据,编译时会报错,同样试图在只读的通道写入数据也会报错。
关闭通道 #
ch := make(chan int, 1)
defer close(ch)
执行关闭时需要注意,当 Channel 是一个空指针或者已经被关闭时,执行 close
都会直接崩溃并抛出异常。
for range #
Go 提供了range
关键字,将其使用在 channel 上时,会自动等待 channel 的动作一直到 channel 被关闭,如下:
ticker := time.NewTicker(time.Minute * 5)
for range ticker.C {
doSomeThing()
}
select #
Go 中的 select
可以让 goroutine 同时等待多个 channel 可读或可写,它有几个特性需要注意:
- 除 default 外,如果只有一个 case 语句评估通过,那么就执行这个case里的语句;
- 除 default 外,如果有多个 case 语句评估通过,那么通过伪随机的方式随机选一个;
- 如果 default 外的 case 语句都没有通过评估,那么执行 default 里的语句;
- 如果没有 default,那么代码块会被阻塞,直到有一个 case 通过评估;否则一直阻塞
- case 分支永远不会进入
nil
通道; - 空的
select
语句会直接阻塞当前 Goroutine,导致 Goroutine 进入无法被唤醒的永久休眠状态。
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
ch := make(chan bool)
go func() {
defer close(ch)
//具体的任务,这里模拟做的任务需要1秒完成
time.Sleep(time.Second * 1)
}()
select {
case <- ch:
fmt.Println("ch bool", <- ch)
case <- ctx.Done():
fmt.Println("ctx done")
}
// output:
// ch bool false
len 和 cap 使用 #
len()
方法可以查询缓冲区数据的个数,cap()
可以查看缓冲区的大小。
底层原理 #
现在开始从源码角度分析上面的 Channel 提供的特性,源码集中在 runtime/chan.go
里面,可以自行进行翻阅查找,另外说明一点,从关键字的语法到编译再到运行时进入真正源码的转换逻辑这里先不介绍了,因为我还没搞懂,后面再专门说。
数据结构 #
type hchan struct {
qcount uint // total data in the queue
dataqsiz uint // size of the circular queue
buf unsafe.Pointer // points to an array of dataqsiz elements
elemsize uint16
closed uint32
elemtype *_type // element type
sendx uint // send index
recvx uint // receive index
recvq waitq // list of recv waiters
sendq waitq // list of send waiters
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
type waitq struct {
first *sudog
last *sudog
}
源码中有英文注释,这里再详细解释下:
qcount
已经在队列中的数据的个数dataqsize
循环队列的长度buf
存放实际数据的缓冲区的指针,这段缓冲区会被当作数据的环形队列使用。elemsize
能够收发的元素的大小closed
标记队列关闭状态elemtype
能在收发的元素类型sendx
写入者处理到的 buf 中元素的序号recvx
接受者处理到的 buf 中的元素的序号recvq
被阻塞的接收者 Goroutine 列表sendq
由于缓冲区空间不足而阻塞的写入者 Goroutine 列表lock
互斥锁,可见一个管道同时仅允许被一个协程读写。
初始化 #
make
定义的 channel 经过编译会在运行时被翻译成调用 makechan
函数生成一个 *hchan
。其中第一个参数是 channel 元素的类型,第二个参数是元素的大小。
func makechan(t *chantype, size int) *hchan {
elem := t.elem
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
// buf points into the same allocation, elemtype is persistent.
// SudoG's are referenced from their owning thread so they can't be collected.
// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
var c *hchan
switch {
case mem == 0:
// Queue or element size is zero.
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector uses this location for synchronization.
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// Elements do not contain pointers.
// Allocate hchan and buf in one call.
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// Elements contain pointers.
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
lockInit(&c.lock, lockRankHchan)
return c
}
上面省略了一些非核心的校验代码,在 switch
片段中会,会根据函数传入的参数进行不同逻辑的创建:
- 当分配的大小为 0 时,即不存在缓冲区的 channel,那么只在内存分配一个
hchan
结构体大小的空间即可; - 如果 channel 的元素类型不为指针,则在内存分配
hchan
结构体加 size 大小的连续内存空间。 - 进入
default
为默认情况,即为指针的元素,需要为hchan
和数据两部分单独分别分配内存。
在函数的最后统一配置创建的 hchan
其他字段的值并返回。
写入消息 #
当在代码里使用 ch <- i
写入数据时,经过编译会被翻译成调用 chansend1
,继而调用 chansend
。
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
其中 chansend
的参数有四个:
hchan
即将要写入数据的 Channelep
要写入的数据。block
是否阻塞,chansend1
的入参固定是true
callerpc
程序计数器 pc,用于存放下一条指令所在单元的地址的地方。
代码中 chansend
函数比较长,整体实现了三种情况的消息写入,下面分开来说。
1. 可直接发送的 #
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
}
满足条件:
- 目标 Channel 没有被关闭;
- 已经有处于读等待的 Goroutine(
c.recvq
可以取到值)。
执行操作:
- 从正在等待的写入协程队列
c.recvq
中取出最先进入队列的 Goroutine 。 - 然后通过
send
函数直接向这个 Goroutine 发送数据。
2. 向缓冲区发送的 #
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
qp := chanbuf(c, c.sendx)
if raceenabled {
racenotify(c, c.sendx, nil)
}
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
...
}
满足条件:
- 创建的 Channel 有缓冲区;
- 缓冲区数据没有装满。
执行操作:
- 将向 Channel 发送的数据存储在
buf
空间里。- 数据会存储在
buf
里的c.sendx
+1 的位置。 - 因为
buf
是一个循环数组,所以注意当sendx
等于dataqsiz
时会重新回到数组开始的位置。
- 数据会存储在
3. 发送被阻塞的 #
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// Block on the channel. Some receiver will complete our operation for us.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
c.sendq.enqueue(mysg)
atomic.Store8(&gp.parkingOnChan, 1)
// Puts the current goroutine into a waiting state and calls unlockf on the system stack.
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
gp.waiting = nil
gp.activeStackChans = false
closed := !mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true
}
如果不满足上面的两种情况,发送就会被阻塞。比如 Channel 的缓冲区空间被装满,执行的操作如下:
- 创建一个
sudog
并将其加入 Channel 的sendq
队列中; - 当前 Goroutine 也会陷入阻塞等待其他的协程从 Channel 接收数据。
接收消息 #
当在代码里使用 i <- ch
或 i, ok <- ch
接收数据时,经过编译会被翻译成调用 chanrecv1
和 chanrecv2
,最终还是会调用运行时的 chanrecv
函数。
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
chanrecv
接收三个参数,分别是:
hchan
读取的队列elem
存放读取到的数据的内存地址;block
是否阻塞。
同写入 channel 一样,读取 channel 基本还是覆盖了之前的三种常见情况,但是有两种特殊情况:
- Channel 为 nil,这时会让出当前 Goroutine 的处理器使用权。
- Channel 已被关闭并且缓冲区没有数据,此时会清空
ep
中的数据并立刻返回。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// case 1
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
lock(&c.lock)
// case 2
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
}
1. 可以直接接收的 #
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if sg := c.sendq.dequeue(); sg != nil {
// Found a waiting sender. If buffer is size 0, receive value
// directly from sender. Otherwise, receive from head of queue
// and add sender's value to the tail of the queue (both map to
// the same buffer slot because the queue is full).
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
}
满足条件:
c.sendq
可以取出等待的 Goroutine 时。
执行操作:
- 使用
recv
函数进行从 Channel 接收数据并写入ep
。
recv 函数 #
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if c.dataqsiz == 0 {
if ep != nil {
// copy data from sender
recvDirect(c.elemtype, sg, ep)
}
} else {
// Queue is full. Take the item at the
// head of the queue. Make the sender enqueue
// its item at the tail of the queue. Since the
// queue is full, those are both the same slot.
qp := chanbuf(c, c.recvx)
// copy data from queue to receiver
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// copy data from sender to queue
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
sg.success = true
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
在 recv
函数中会判断 Channel 是否有缓冲区,然后进行不一样的处理:
- Channel 没有缓冲区,将发送者 Goroutine 的
sg.elem
存储到ep
中。 - Channel 有缓冲区:
- 将队列中的数据拷贝到接收方的内存地址;
- 然后将发送队列头的数据拷贝到缓冲区中,并释放一个阻塞的发送方;
2. 可以从缓冲区接收的 #
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
racenotify(c, c.recvx, nil)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
}
满足条件:
c.qcount > 0
说明 Channel 中的缓冲区不为空。
执行操作:
- 从
hchan
的buf
的recvx
位置取出数据给ep
。
3. 接收被阻塞的 #
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
// no sender available: block on this channel.
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
// Signal to anyone trying to shrink our stack that we're about
// to park on a channel. The window between when this G's status
// changes and when we set gp.activeStackChans is not safe for
// stack shrinking.
atomic.Store8(&gp.parkingOnChan, 1)
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
gp.waiting = nil
gp.activeStackChans = false
success := mysg.success
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, success
}
默认情况,不满足上面两个条件时,即发送队列不存在等待的 Goroutine ,并且缓冲区也没有数据时,进入这种情况,执行的操作如下:
- 将当前的 Goroutine 置为等待状态,并放入 Channel 的
recvq
队列。 - 让出当前 Goroutine 的处理器使用权。
参考 #
文章结构上基本参考了 Go 设计与实现的 这篇文章,但是文章的作者写了更多更细节和更全面的东西,本篇就是学习笔记了。
Do not communicate by sharing memory; instead, share memory by communicating https://go.dev/doc/effective_go ↩︎