【Go】WaitGroup x Channel
潘忠显 / 2023-06-24
WaitGroup 和 Channel 是使用 goroutine 时常见的两个概念,两者都可以用于 goroutine 的同步,不同场景下选择 WaitGroup 或 Channel 会显得更简洁;而复杂场景下,又会结合使用。小文介绍下 WaitGroup 和 Channel 的基本使用、常见错误,以及结合使用时候的注意事项。
1. WaitGroup 的使用
WaitGroup 用于等待所有启动的 goroutine 完成,是用来 goroutine 同步的。一个非常简单的例子(公式):
func main() {
var wg sync.WaitGroup // 0 - 创建wg
for i := 1; i <= 5; i++ {
wg.Add(1) // 1 - Add
i := i //
go func() { // 2 - goroutine
defer wg.Done() // 3 - Done
fmt.Printf("Worker %d starting\n", i) // 4 - 主要逻辑
}() // 5 - 记得启动 goroutine
}
wg.Wait() // 6 - Wait
}
上边例子中,有几点需要注意的:
- 每准启动一个 goroutine 就给
wg.Add(1)
,也可以在外边一次性增加 - 每个 goroutine 结束的时候调用
wg.Done()
- 在所有 goroutine 被调用的后边,调用
wg.Wait()
Go 新手会经常漏掉类似的 i := i
的那句表达式,它表示:在 for 循环里边将 i
赋值给新变量 i
,是为了避免在后边的闭包中重复使用相同的一个值。
下边再说说 WaitGroup 的常见使用错误:
wg.Add(1)
放在了 goroutine 中:wg.Add(1)
是为了在后边wg.Wait()
进行计数的,放在 goroutine 中如果它没有启动,可以直接走到wg.Wait()
,这时候计数也能达到满足,然后进行之后的逻辑。- 传递
wg
时引起了值复制:WaitGroup
的定义是一个struct
,其中存储着用于同步的变量,应该使用地址传递,不然两个 wg 并不是相同的一个,会造成死锁。 - 不是错误的错误——多次
wg.Wait()
:因为wg.Wait()
是阻塞等待计数变成 0 的函数,如果多写几个,其实是没影响的,只要一个计数变成0了,后边依然会通过。但是这样的方式,会在自己编码或阅读代码时引起困惑。 - 忘记执行 go 函数:编码时容易忘记在
go func(){}
之后加()
,不过这个会在格式化代码或者编译的时候提示错误,能及时发现。
2. Channel 的使用
Channel 是连接并发 goroutine 的管道,可以将值从一个 goroutine 发送到通道,并将这些值接收到另一个 goroutine 。比如下边的代码,channel 无需显式锁或条件变量即可起到了同步的作用,只有等到 msg
准备好接收,且 goroutine 往管道里写入了 “ping”,才能被 msg
接收到,然后进行后续的逻辑:
func main() {
messages := make(chan string) // 使用 channel 之前,必须先进行创建
go func() { messages <- "ping" }()
msg := <-messages
fmt.Println(msg)
}
上边创建的 channel 没有指定缓存大小,就是无缓存的 channel:发送和接收会阻塞,直到另一方准备好为止。
具体地,messages := make(chan string)
和 messages := make(chan string, 1)
是不一样的,前者没有缓存,只有接收者,才能执行 messages <- "ping"
,不然会阻塞在 <-
。
Channel 有方向,可以被关闭
如果有多个 goroutine,写入 channel 的可以认为是生产者,读取的则可认为是消费者。读取 channel 中的元素,可以有两个参数,第二个说明是否还会有后续,这样生产者可以在结束生产的时候,通知到消费者结束任务:
go func() {
for {
j, more := <-jobs
if more {
fmt.Println("received job", j)
} else {
fmt.Println("received all jobs")
done <- true
return
}
}
}()
for j := 1; j <= 3; j++ {
jobs <- j
fmt.Println("sent job", j)
}
Channel 可以有缓存,使用 range
遍历
channel 可以指定一个长度,缓存那么多的元素,再多就有阻塞。可以用在多 goroutine 的高效处理,也可以用于单 goroutine 中作为一个简单的队列来缓存一些内容。另外,可以使用 range
像遍历切片一样来遍历 channel:
func main() {
queue := make(chan string, 2)
queue <- "one"
queue <- "two"
close(queue)
for elem := range queue {
fmt.Println(elem)
}
}
并非只有 close()
之后才能 range
,但两者确实是有联系:上边的 close(queue)
是为了避免死锁,如果队列没关闭,range 会一直的阻塞等待新内容。
关闭的 Channel 可以接收
一个在关闭的通道上的接收操作总是可以立即进行,接收到之前发送的值后,将返回元素类型的零值。在 Go 规范中有详细描述:
A receive operation on a closed channel can always proceed immediately, yielding the element type’s zero value after any previously sent values have been received.
多次对一个关闭的 Channel 使用接收操作符,每次都会返回零值。这个在之前《【Go】透彻理解 context.Context》一文中有提到 Context.Done()
中的应用,可以参考。
3. WaitGroup x Channel
同时使用 WaitGroup 和 Channel 的例子很常见,比如我们要并发去请求一些服务,然后对服务返回进行处理,使用 WaitGroup 等待所有任务都完成,使用 Channel 将返回传递给处理逻辑。
来看一个死锁的例子:
func main() {
ch := make(chan int)
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
ch <- 1
wg.Done()
}()
}
wg.Wait()
close(ch) // closes the channel
for element := range ch {
fmt.Println(element) // prints each value
}
}
很容易看出为什么会死锁,因为 ch
是无缓存的:
- 闭包中会卡在
ch <- 1
,不会执行wg.Done()
- 主函数中则会阻塞在
wg.Wait()
另外需要注意的是,上边会同时阻塞在3个位置,两个 channel <- 1
以及 wg.Wait()
,因为 channel 是无缓存的。
所以上边程序改进,应该让 Channel 的接收代码及时的运行。将两行放在另外一个 goroutine 中运行:
func main() {
ch := make(chan int)
var wg sync.WaitGroup
for i := 0; i < 2; i++ {
wg.Add(1)
go func() {
ch <- 1
wg.Done()
}()
}
go func() {
wg.Wait()
close(ch) // closes the channel
}()
for element := range ch {
fmt.Println(element) // prints each value
}
}
对上边的程序做下简单的解释:
- 启动了 3 个 goroutine
- 主程序阻塞在
range ch
,只有 close(ch) 之后才会取消阻塞 - 2 个 goroutine 完成往 channel 中各发送一个值,然后执行
wg.Done()
- for 循环打印出了 channel 中的两个值,仍然阻塞着
wg.Wait()
等待完成,执行close(ch)
- 主函数中 for 循环阻塞结束
上边的 4/5/6 也可能是别的情况:还没有遍历完打印之前,就完成了 wg.Wait
和 close(ch)
,顺序不确定,但是执行的结果都是一样的。
多个 channel 场景
更实际的场景汇总,可能会有多个 Channel,比如请求多个服务的时候,除了使用1个 Channel 来接收返回,另外还会使用一个 Channel 来接收异常,同时使用 WaitGroup 进行 goroutine 的同步。
如果还是像上述那样使用无缓存的Channel的话,处理起来就非常麻烦了。这样的场景下,可以直接使用有缓存的Channel,for-
循环有几个,就可以使用多大的 Channel。上述问题就很简单了,甚至那个 wg.Wait()
所在的 goroutine 都可以直接放在主函数中。