原文:Go - graceful shutdown of worker goroutines
在这篇博文中,我们将看看 Go 程序的优雅关闭。这类 Go 程序有一些执行任务的工作 goroutine,要求在程序关闭之前,这些工作 goroutine 必须完成任务。
介绍
在一个最近的项目中,我们有一个使用场景:一个基于 Go 的微服务不断地消费另一个第三方库发出的事件。这些事件在调用外部服务之前,会进行一些处理。而外部服务处理每个请求的速度都相当慢,但另一方面,它能够处理许多并发请求。因此,我们实现了一个简单的 worker 池,将输入事件扇出为几个并发执行的 goroutine。
总的来说,它看起来像这样:
然而,我们需要保证在该微服务关闭的时候,当前任何正在运行的对外部服务的请求必须完成,并且请求结果在我们的内部后端持久化。
worker 池和终止信号处理
worker 池模式是一个有名的关于 worker 池的 Go 模式。此外,还有大量关于如何进行基于 SIGTERM 通知的优雅关闭的例子。但我们意识到,我们的一些需求使得使用场景有点更复杂。
当程序接收到 SIGTERM 或者 SIGINT 信号(例如,因容器编排器缩容到一定数目的副本数而产生的)时,在终止整个程序之前,必须允许当前任何工作中的 worker goroutine 完成它们长期运行的工作。
让事情稍微复杂些的是,我们对生产者端的库没有任何控制权。一开始我们会注册一个回调函数,每当生产端的库有了一个(我们需要的)事件,就会调用这个回调函数。该库会处于阻塞状态,直到回调函数结束执行。然后,当有更多事件产生时,库会再次调用这个函数。
worker-pool 的诸多 goroutine 通过使用标准的“对 channel 进行 range 操作”结构,来不断处理事件,例如:
1 | func workerFunc() { |
这意味着,让一个 worker “结束”最干净的方式是关闭名为 “jobsChan” 的 channel。
在生产者端进行关闭
你首先学到的关于在 Go 中关闭 channel 的第一件事情之一是,如果向已关闭的 channel 发送数据,程序就会 panic。这归结于一个非常简单的规则:
“总是在生产者端关闭一个 channel(Always close a channel on the producer side)”
不管怎样,什么是生产者端呢?嗯,一般是那个将事件发送到 channel 里的 goroutine: 1
2
3
4func callbackFunc(event int) {
jobsChan<-event
}
你要如何 安全地 保护上面的代码免于给已关闭的 channel 发送数据呢?一路沿着 Mutex、布尔型标志和 if 语句以确定是否一些_其他_ goroutine 关闭了 channel,以及控制是否应该允许发送数据,这并不简单。多留心潜在的竞争条件和不确定行为。
我们的解决方法是引入一个中间 channel 和一个内部的“消费者”,后者作为回调和任务 channel 之间的代理:
消费者函数看起来像这样:
1 | func startConsumer(ctx context.Context) { |
好了,等下。这个 “select” 和 “ctx.Done()” 是啥?
恕我直言,select 语句是 Go 最神奇的东西之一。它允许多个 channel 的等待和协同。在这种情况下,我们或者会从中间 channel 那里收到事件,然后将其传到 jobsChan,又或者会从 context.Context 接收到取消信号。
关闭 jobsChan 之后的 return 语句将让我们离开 for 循环和函数,这确保了 不会有新事件被传递给 jobsChan,并且不会从 intermediateChan 消费到 任何事件。
所以,要么是传递事件到 jobsChan(worker 从这里消费),要么在作为生产者的 同一个 goroutine 中 关闭 jobsChan。
关闭 jobsChan 意味着消费端的所有 worker 将会停止遍历 jobsChan:
1 | for event := range jobsChan { // <- on the close(jobsChan), all goroutines waiting for jobs here will exit the for-loop |
发出取消信号
等待 Go 程序退出是一种有名的模式:
1 | func main() { |
在 “接下来呢?” 这一部分,捕获到 SIGINT 或者 SIGTERM 后,主 goroutine 恢复执行。我们需要告诉将事件从 intermediateChan 传到 jobsChan 的消费者,跨 goroutine 边界关闭 jobsChan。
再次,使用 Mutex 和条件语句来解决这个问题,技术上是可行的,但是相当难搞并且容易出错。作为替代,我们会利用前面提及的 context.Context 的取消支持。
在 func main() 的某个地方,我们设置了一个带取消支持的根 background context:
1 | func main() { |
这就是 < -ctx.Done() 这一 select case 如何被调用的,它开始优雅拆卸 channel 和 worker。
使用 WaitGroup
上面这个方法只有一个问题:调用 cancelFunc() 后,程序会立即退出,这意味着,正在动态调用中的工作 goroutine 将没有时间执行完毕,这使得我们系统中的处理有可能处于中间态。
我们需要停止关闭,直到所有的 worker 都报告说它们完成了工作。现在,我们进入 sync.WaitGroup,它允许我们等待任意数目的 goroutine 结束!
当启动 worker 时,我们传递一个指向在 func main() 中创建的 WaitGroup 的指针:
1 | const numberOfWorkers = 4 |
这会稍微改变我们的 worker 启动函数: 1
2
3
4
5
6
7func workerFunc(wg *sync.WaitGroup) {
defer wg.Done() // Mark this goroutine as done! once the function exits
for event := range jobsChan {
// handle the event...
}
}
运行
最终程序的源代码在下一个部分。在此其中,我添加了一些日志,这样就能看看该过程发生了什么。
下面是一个带有 4 个工作 goroutine 的程序的执行输出,这里,我使用 Ctrl+C 来停止程序: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28$ go run main.go
Worker 3 starting
Worker 2 starting
Worker 1 starting
Worker 0 starting
Worker 3 finished processing job 0
Worker 0 finished processing job 3
^C********************************* <-- HERE I PRESS CTRL+C
Shutdown signal received
*********************************
Worker 3 finished processing job 4
Worker 2 finished processing job 1
Worker 1 finished processing job 2
Consumer received cancellation signal, closing jobsChan! <-- Here, the consumer receives the <-ctx.Done()
Worker 3 finished processing job 6
Worker 0 finished processing job 5
Worker 1 finished processing job 8
Worker 2 finished processing job 7
Worker 0 finished processing job 10
Worker 0 interrupted <-- Worker 0 has finished job #10, 3 left
Worker 2 finished processing job 12
Worker 2 interrupted <-- Worker 2 has finished job #12, 2 left
Worker 3 finished processing job 9
Worker 3 interrupted <-- Worker 3 has finished job #9, 1 left
Worker 1 finished processing job 11
Worker 1 interrupted <-- Worker 1 has finished job #11, all done
All workers done, shutting down!1
2“如果可以处理一个或多个通信,那么选择进行处理的那个 chanel 是通过统一的伪随机选择的。(If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection)”。
另一个特别的事情是,似乎即使在关闭了 jobsChan 之后,任务(任务 9-12)还是被传给 worker 了。恩,它们实际是在该 channel 被关闭 之前 被传给 worker 的。这个现象会发生是因为我们使用了一个带有 4 个“槽” 的缓存 channel。这意味着,假定我们第三方生产者以比我们的 worker 可以处理的速度更快地不断传递新事件,如果所有四个 worker 都从 channel 中消费了一个任务并且处理它们,那么该 channel 里就可能会有四个新的事件正等待被消费。关闭 channel 并不会影响那些已经缓存到 channel 里的数据 —— Go 允许消费者消费它们。
如果我们将 jobsChan 修改为无缓存的: 1
2jobsChan := make(chan int)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21$ go run main.go
.... omitted for brevity ....
^C*********************************
Shutdown signal received
*********************************
Worker 3 finished processing job 3
Worker 3 started job 5
Worker 0 finished processing job 4
Worker 0 started job 6
Consumer received cancellation signal, closing jobsChan! <-- again, it may take some time until the consumer is handed <-ctx.Done()
Consumer closed jobsChan
Worker 1 finished processing job 1 <-- From here on, we see that each worker finishes exactly one job before being interrupted.
Worker 1 interrupted
Worker 2 finished processing job 2
Worker 2 interrupted
Worker 0 finished processing job 6
Worker 0 interrupted
Worker 3 finished processing job 5
Worker 3 interrupted
All workers done, shutting down!
完整的程序
上面的代码片段在某些地方进行了简化,以使得它们尽可能简洁。带有某些结构以封装和模拟第三方生产者的完整程序如下:
1 | package main |
消费者结构: 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40// -- 从这里起,下面是 Consumer!
type Consumer struct {
ingestChan chan int
jobsChan chan int
}
// 每当外部库传递一个事件给我们,就会调用 callbackFunc。
func (c Consumer) callbackFunc(event int) {
c.ingestChan <- event
}
// workerFunc 启动一个 worker 函数,它会遍历 jobsChan,直到该 channel 关闭。
func (c Consumer) workerFunc(wg *sync.WaitGroup, index int) {
defer wg.Done()
fmt.Printf("Worker %d starting\n", index)
for eventIndex := range c.jobsChan {
// 模拟工作执行 1 ~ 3 秒
fmt.Printf("Worker %d started job %d\n", index, eventIndex)
time.Sleep(time.Millisecond * time.Duration(1000+rand.Intn(2000)))
fmt.Printf("Worker %d finished processing job %d\n", index, eventIndex)
}
fmt.Printf("Worker %d interrupted\n", index)
}
// startConsumer 作为 ingestChan 和 jobsChan 之间的代理,使用 select 语句以支持优雅关闭。
func (c Consumer) startConsumer(ctx context.Context) {
for {
select {
case job := <-c.ingestChan:
c.jobsChan <- job
case <-ctx.Done():
fmt.Println("Consumer received cancellation signal, closing jobsChan!")
close(c.jobsChan)
fmt.Println("Consumer closed jobsChan")
return
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14// -- Producer 模拟一个外部库,每 100ms 有新数据时simulates an external library that invokes the
// 它会调用注册的回调函数。
type Producer struct {
callbackFunc func(event int)
}
func (p Producer) start() {
eventIndex := 0
for {
p.callbackFunc(eventIndex)
eventIndex++
time.Sleep(time.Millisecond * 100)
}
}
我希望这篇小博文提供了一个简单的例子,说明了基于 goroutine 的 worker 池,以及如何使用基于 context 的取消、WaitGroup 和生产端 channel 关闭,来优雅关闭这些 goroutine。