Jason Pan

【Go】WaitGroup x Channel

潘忠显 / 2023-06-24


WaitGroup 和 Channel 是使用 goroutine 时常见的两个概念,两者都可以用于 goroutine 的同步,不同场景下选择 WaitGroup 或 Channel 会显得更简洁;而复杂场景下,又会结合使用。小文介绍下 WaitGroup 和 Channel 的基本使用、常见错误,以及结合使用时候的注意事项。

1. WaitGroup 的使用

WaitGroup 用于等待所有启动的 goroutine 完成,是用来 goroutine 同步的。一个非常简单的例子(公式):

func main() {
    var wg sync.WaitGroup                         // 0 - 创建wg
    for i := 1; i <= 5; i++ {
        wg.Add(1)                                 // 1 - Add
        i := i                                    //
        go func() {                               // 2 - goroutine
            defer wg.Done()                       // 3 - Done
            fmt.Printf("Worker %d starting\n", i) // 4 - 主要逻辑
        }()                                       // 5 - 记得启动 goroutine
    }
    wg.Wait()                                     // 6 - Wait
}

上边例子中,有几点需要注意的:

Go 新手会经常漏掉类似的 i := i 的那句表达式,它表示:在 for 循环里边将 i 赋值给新变量 i,是为了避免在后边的闭包中重复使用相同的一个值

下边再说说 WaitGroup 的常见使用错误

2. Channel 的使用

Channel 是连接并发 goroutine 的管道,可以将值从一个 goroutine 发送到通道,并将这些值接收到另一个 goroutine 。比如下边的代码,channel 无需显式锁或条件变量即可起到了同步的作用,只有等到 msg 准备好接收,且 goroutine 往管道里写入了 “ping”,才能被 msg 接收到,然后进行后续的逻辑:

func main() {
    messages := make(chan string)  // 使用 channel 之前,必须先进行创建
    go func() { messages <- "ping" }()
    msg := <-messages
    fmt.Println(msg)
}

上边创建的 channel 没有指定缓存大小,就是无缓存的 channel:发送和接收会阻塞,直到另一方准备好为止

具体地,messages := make(chan string)messages := make(chan string, 1) 是不一样的,前者没有缓存,只有接收者,才能执行 messages <- "ping",不然会阻塞在 <-

Channel 有方向,可以被关闭

如果有多个 goroutine,写入 channel 的可以认为是生产者,读取的则可认为是消费者。读取 channel 中的元素,可以有两个参数,第二个说明是否还会有后续,这样生产者可以在结束生产的时候,通知到消费者结束任务:

go func() {
    for {
        j, more := <-jobs
        if more {
            fmt.Println("received job", j)
        } else {
            fmt.Println("received all jobs")
            done <- true
            return
        }
    }
}()

for j := 1; j <= 3; j++ {
    jobs <- j
    fmt.Println("sent job", j)
}

Channel 可以有缓存,使用 range 遍历

channel 可以指定一个长度,缓存那么多的元素,再多就有阻塞。可以用在多 goroutine 的高效处理,也可以用于单 goroutine 中作为一个简单的队列来缓存一些内容。另外,可以使用 range 像遍历切片一样来遍历 channel:

func main() {
    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue) 
    for elem := range queue {
        fmt.Println(elem)
    }
}

并非只有 close() 之后才能 range,但两者确实是有联系:上边的 close(queue) 是为了避免死锁,如果队列没关闭,range 会一直的阻塞等待新内容。

关闭的 Channel 可以接收

一个在关闭的通道上的接收操作总是可以立即进行,接收到之前发送的值后,将返回元素类型的零值。在 Go 规范中有详细描述:

A receive operation on a closed channel can always proceed immediately, yielding the element type’s zero value after any previously sent values have been received.

多次对一个关闭的 Channel 使用接收操作符,每次都会返回零值。这个在之前《【Go】透彻理解 context.Context》一文中有提到 Context.Done() 中的应用,可以参考。

3. WaitGroup x Channel

同时使用 WaitGroup 和 Channel 的例子很常见,比如我们要并发去请求一些服务,然后对服务返回进行处理,使用 WaitGroup 等待所有任务都完成,使用 Channel 将返回传递给处理逻辑。

来看一个死锁的例子:

func main() {
    ch := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            ch <- 1
            wg.Done()
        }()
    }

    wg.Wait()
    close(ch) // closes the channel

    for element := range ch {
        fmt.Println(element) // prints each value
    }
}

很容易看出为什么会死锁,因为 ch 是无缓存的:

另外需要注意的是,上边会同时阻塞在3个位置,两个 channel <- 1 以及 wg.Wait(),因为 channel 是无缓存的。

所以上边程序改进,应该让 Channel 的接收代码及时的运行。将两行放在另外一个 goroutine 中运行:

func main() {
    ch := make(chan int)
    var wg sync.WaitGroup

    for i := 0; i < 2; i++ {
        wg.Add(1)
        go func() {
            ch <- 1
            wg.Done()
        }()
    }

    go func() {
        wg.Wait()
        close(ch) // closes the channel
    }()

    for element := range ch {
        fmt.Println(element) // prints each value
    }
}

对上边的程序做下简单的解释:

  1. 启动了 3 个 goroutine
  2. 主程序阻塞在 range ch,只有 close(ch) 之后才会取消阻塞
  3. 2 个 goroutine 完成往 channel 中各发送一个值,然后执行 wg.Done()
  4. for 循环打印出了 channel 中的两个值,仍然阻塞着
  5. wg.Wait() 等待完成,执行 close(ch)
  6. 主函数中 for 循环阻塞结束

上边的 4/5/6 也可能是别的情况:还没有遍历完打印之前,就完成了 wg.Waitclose(ch),顺序不确定,但是执行的结果都是一样的。

多个 channel 场景

更实际的场景汇总,可能会有多个 Channel,比如请求多个服务的时候,除了使用1个 Channel 来接收返回,另外还会使用一个 Channel 来接收异常,同时使用 WaitGroup 进行 goroutine 的同步。

如果还是像上述那样使用无缓存的Channel的话,处理起来就非常麻烦了。这样的场景下,可以直接使用有缓存的Channelfor- 循环有几个,就可以使用多大的 Channel。上述问题就很简单了,甚至那个 wg.Wait() 所在的 goroutine 都可以直接放在主函数中。