Go errgroup 源码深度剖析:从原理到 Context 取消机制全解析
本文深入剖析 Go 语言 errgroup 包的源码实现,对比原生 sync.WaitGroup 的不足,详解 errgroup 如何实现自动错误收集、goroutine 数量限制和 Context 联动取消。同时深入 context.WithCancelCause 源码,揭示取消原因传递机制,帮助你彻底掌握 Go 并发编程中的协程管理最佳实践。
Go errgroup
专门用于简化一组 goroutine 的生命周期管理和错误收集,解决了普通WaitGroup无法便捷收集错误、无法快速取消其他 goroutine 的痛点。
errgroup vs 原生 WaitGroup
| 特性 | sync.WaitGroup | errgroup |
|---|---|---|
| 等待 goroutine 结束 | 需要手动 Add/Done | 自动管理,无需手动调用 |
| 收集错误 | 需要自己用 channel 实现 | 自动收集第一个非 nil 错误 |
| 取消其他 goroutine | 需要自己结合 Context 实现 | 内置 Context,自动取消 |
| 使用复杂度 | 低(但扩展功能复杂) | 中(一站式解决多 goroutine 问题) |
使用示例
下面代码是 通过开启三个 goroutine 并发执行,能够感知错误,并且便捷收集第一个错误
context.Cause(ctx) 这个函数是专门用来获取 Context 取消原因的,后续会详细介绍
package main
import (
"context"
"errors"
"fmt"
"time"
"golang.org/x/sync/errgroup"
)
func main() {
eg, ctx := errgroup.WithContext(context.Background())
eg.Go(func() error {
fmt.Println("doing task1")
time.Sleep(5 * time.Second)
return errors.New("task1 error") // 业务错误:作为取消原因 这边取消 其他 goroutine 就会走 <-ctx.Done()
})
eg.Go(func() error {
for {
select {
case <-time.After(time.Second):
fmt.Println("doing task2")
case <-ctx.Done():
fmt.Println("task2 canceled")
// 可以调用 context.Cause(),获取取消的具体原因
cause := context.Cause(ctx)
fmt.Printf("task2 被取消的原因:%v\n", cause)
return ctx.Err() //这时候 return 的错误 是第二次或者第三次了 我们只记录第一次的
}
}
})
eg.Go(func() error {
for {
select {
case <-time.After(time.Second):
fmt.Println("doing task3")
case <-ctx.Done():
fmt.Println("task3 canceled")
// 可以调用 context.Cause(),获取取消的具体原因
cause := context.Cause(ctx)
fmt.Printf("task3 被取消的原因:%v\n", cause)
return ctx.Err()
}
}
})
err := eg.Wait()
if err != nil {
fmt.Println("task failed")
// 在 Wait() 后, 返回第一次错误
fmt.Printf("整体任务被取消的原因:%v\n", err)
} else {
fmt.Println("task success")
}
}
errgroup 源码解析
内部结构体
// 空结构体在 Go 中不占用字节内存,是标准用于通知的
type token struct{}
type Group struct {
cancel func(error) // 取消下级关联 Context 的函数
wg sync.WaitGroup // 管理 goroutine 的等待,替代手动 Add/Done
sem chan token // 带缓冲的通道 限制最大能有几个 goroutine 并行 用 SetLimit 函数去指明 不用这个函数那这个字段就不用管
errOnce sync.Once // 原子语句 无论多少 goroutine 写入 都只执行一次
err error
}
入口函数
新版本使用了 WithCancelCause 换掉了传统的 context.WithCancel
对于 context.WithCancelCause 与 context.WithCancel 的 源码 放在文章最后,深入理解 context 可阅读
func WithContext(ctx context.Context) (*Group, context.Context) {
//返回 子Context 和 可以取消的函数 这里覆盖了主 ctx 无所谓
ctx, cancel := context.WithCancelCause(ctx)
// 返回 errgroup 内置 结构体 和 context
return &Group{cancel: cancel}, ctx
}
(g *Group) SetLimit(n int) 函数
用于控制同时活跃的 goroutine 数量,实现并发限制,小于0,就没有限制,也就不用 g.sem 字段
必须是在初始化的时候去调用 SetLimit 函数,必须保证当前无活跃 goroutine,否则直接 panic 为了安全
func (g *Group) SetLimit(n int) {
if n < 0 {
g.sem = nil
return
}
if active := len(g.sem); active != 0 {
panic(fmt.Errorf("errgroup: modify limit while %v goroutines in the group are still active", active))
}
// 启动 容量为 n 的 cannel
g.sem = make(chan token, n)
}
(g *Group) done() 函数
本质对 WaitGroup 封装了一个 对 goroutine 数量限制 不设置的话 就相当于 WaitGroup.Done()
func (g *Group) done() {
if g.sem != nil { // 如果设置了goroutine数量限制
<-g.sem // 释放信号量,从sem通道取出一个token,允许新的goroutine启动
}
g.wg.Done() // 通知 WaitGroup:一个goroutine执行完成
}
(g *Group) Wait() 函数
同样也是多封装了一下 调用 cancel(g.err) 方法 去记录错误(如果有的话) 内部具体细节可以看最后源码
func (g *Group) Wait() error {
g.wg.Wait() // 阻塞,直到所有goroutine调用了wg.Done()
if g.cancel != nil { // 如果绑定了Context(通过WithContext创建)
g.cancel(g.err) // 取消Context,传递错误作为取消原因
}
return g.err // 返回第一个非nil错误(无错误则返回nil)
}
核心 (g *Group) Go 函数
这里并没有捕获 panic ,如果捕捉的话可能导致 panic 时机延迟,调试困难,panic 栈会被转为普通值,无法被监控工具捕获,而且还可能导致死锁
比如 两个goroutine被创建 第一个发生 panic 第二个在阻塞等待,Wait 需要等待两个任务结束才返回,死锁
func (g *Group) Go(f func() error) {
// 若设置了goroutine数量限制,先获取信号量 如果已经满了 阻塞直到有空闲token
if g.sem != nil {
g.sem <- token{} // 发送空token到sem通道,占用一个并发名额
}
// 增加WaitGroup计数
g.wg.Add(1)
// 启动新goroutine执行任务
go func() {
defer g.done() // 最后执行无论错误还是正常
// 执行用户传入的函数,获取错误
if err := f(); err != nil {
// 仅第一次执行 保证只存第一个错误
g.errOnce.Do(func() {
g.err = err // 存储第一个错误
if g.cancel != nil { // 若绑定了Context,取消所有关联 goroutine 大部分都会有的 因为我们在入口传了
g.cancel(g.err)
}
})
}
}()
}
// 这个函数和 Go 的唯一区别就是 它不阻塞 一旦数量满了 直接退出
func (g *Group) TryGo(f func() error) bool {
if g.sem != nil {
select {
case g.sem <- token{}: // 成功获取token
default: // 无空闲token 并发数达上限,直接返回false
return false
}
}
... //逻辑一样
}
Context 部分源码
context.WithCancelCause 与 context.WithCancel 的 源码 区别
简单理解就是在返回 cancel函数的时候多了一个参数而已
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
c := withCancel(parent)
return c, func() { c.cancel(true, Canceled, nil) }
}
func WithCancelCause(parent Context) (ctx Context, cancel CancelCauseFunc) {
c := withCancel(parent)
return c, func(cause error) { c.cancel(true, Canceled, cause) }
}
先介绍核心结构体 cancelCtx 和 cancelCtx.cancel 方法
context.WithCancel 和 context.WithCancelCause 以及 context.WithTimeout 都有它的影子
// 核心结构体 cancelCtx
type cancelCtx struct {
Context // 嵌入父Context,继承父Context的所有方法(Done/Err/Value等) 该字段是个接口
mu sync.Mutex // 保护以下字段的并发读写安全
done atomic.Value // 存储chan struct{},负责通知外部阻塞函数 查看 Done方法 和 Cancel 方法 之后就明白
children map[canceler]struct{} // 存储当前Context的子canceler,取消时会遍历取消所有子Context
err atomic.Value // 存储取消时的错误(如context.Canceled),原子操作避免锁竞争
cause error // WithCancelCause的核心 :存储取消原因,仅在第一次取消时赋值
}
// context包内部的接口,定义了「可取消」的行为
type canceler interface {
cancel(removeFromParent bool, err error, cause error)
Done() <-chan struct{}
}
核心方法 cancelCtx.cancel
这个函数 是写方法,使用频率是极低的 可能就是手动 cancel 或者 父 context 进行 cancel,为了防止并发问题也就是同时进行 c.err.Load() 发现没有,然后都执行了 c.err.Store(err) 是有问题的 我们只记录第一次 而且后面 close(d) 会报错,这里必须用锁,也不会影响性能
读方法 Err()、Done() 方法 常见 for + select 可能每个 goroutine 每毫秒跑几万次,这种就不能加锁,太影响性能 这个时候就体现出来 atomic.Value 这个字段的好处,防止一个 goroutine 在 cancel() 里写 c.err,另一个 goroutine 同时在 Err() 里读 c.err
func (c *cancelCtx) cancel(removeFromParent bool, err, cause error) {
// Context 设计原则:取消必须关联错误
if err == nil {
panic("context: internal error: missing cancel error")
}
// 旧版本 并没有 WithCancelCause 也就没有这个字段 兼容 旧逻辑
if cause == nil {
cause = err
}
// 上面已经解释
c.mu.Lock()
// 看一下 里面有没有数据
if c.err.Load() != nil { //原子地把值拿出来
c.mu.Unlock()
return // 说明有数据 已经写过了 只要写过了 就一定是在这个函数中写的 说明运行过一次 直接return
}
// 到这说明第一次进 cancel 方法 写入错误下一个 goroutine 再来调用 直接 return
c.err.Store(err) // 原子操作:高频读取时无需加锁,性能更优
c.cause = cause // 保证了仅第一次赋值有效 因为第二次直接 return
// 关闭取消信号通道,用于通知外部 select { case <-ctx.Done(): } 消除阻塞
d, _ := c.done.Load().(chan struct{})
if d == nil {
// 懒创建 下面会详细讲
c.done.Store(closedchan)
} else {
// 若 done 通道已创建,关闭通道 消除阻塞
close(d)
}
// 递归取消所有子 Context
for child := range c.children {
// 持有父锁时获取子锁会有嵌套锁风险,但 Context 取消是低频操作,可接受
child.cancel(false, err, cause)
}
// 清空子列表,释放内存,避免泄漏
c.children = nil
//解锁
c.mu.Unlock()
// 基本上都是 True 默认移除 避免内存泄漏
if removeFromParent {
// 从父 Context 移除自身
removeChild(c.Context, c)
}
}
懒创建
我们在写代码的时候,很少去写 ctx.Done() 方法,非常常见的两种,要么不用,要么只用 cancel
// 第一种 不用
func work(ctx context.Context) {
time.Sleep(10 * time.Second)
}
// 第二种 只 cancel
ctx, cancel := context.WithCancel(parent)
defer cancel()
doSomething(ctx) // ctx 只是被传来传去
正常逻辑是 建完 ctx 要使用 Done()方法
它进行了两次判断 其实可以进行一次 在刚开始进来的时候加锁就可以直接判断是否有值,可以一样的效果,但是这个函数读极多,就写一次,所以大多数情况下都直接返回,根本不需要锁,如果按刚开始就加锁,性能直线下降
func (c *cancelCtx) Done() <-chan struct{} { //返回一个只能接收的 channel
d := c.done.Load() //第一次读取
if d != nil {
//之前已经调用过 Done()
return d.(chan struct{})
}
c.mu.Lock()
defer c.mu.Unlock()
d = c.done.Load() // 第二次重新读取
if d == nil {
// 第一次 初始化并存入
d = make(chan struct{})
c.done.Store(d)
}
return d.(chan struct{})
}
解释 Done() 是 极高频函数
go语言非常常见的 阻塞循环,每次执行 ctx.Done() 函数 返回的没有数据的 channel 只能走 default 逻辑,default 啥也不干 然后再回到 select 语句中 在执行 ctx.Done() 函数 直到 调用 concel 函数 ,可能每毫秒跑几万次,只第一次进行了修改,进行了两次判断,其他的都是直接返回,所以不能在刚开始进行加锁
for {
select {
case <-ctx.Done():
return
default:
}
}
深入 withCancel 函数
我们都知道 parent 参数一般情况就是我们自己传的 ctx 可能是 context.WithCancel 或者 context.Background
在标准项目或者大型项目中,在主要内部结构体中会定义 cancel 字段用于 优雅退出
如果 parent 是 context.Background 没啥意义,
func withCancel(parent Context) *cancelCtx {
if parent == nil {
panic("cannot create context from nil parent")
}
// 1. 创建空的cancelCtx实例
c := &cancelCtx{}
// 2. 建立当前 cancelCtx 和父 Context 的关联,作用是父取消时,子也会被取消
c.propagateCancel(parent, c)
return c
}
propagateCancel 函数
func (c *cancelCtx) propagateCancel(parent Context, child canceler) {
// 挂载到字段
c.Context = parent
// 查看 父Context是否可取消
done := parent.Done()
if done == nil {
return // 如果是 context.Background 无需关联,父永远不会取消,子只能手动取消
}
// 检查父Context是否已经取消 非阻塞
select {
case <-done:
// 父已取消:立即取消子Context,继承父的错误和原因 这里的child 也就是我们刚创建的 空的cancelCtx实例
child.cancel(false, parent.Err(), Cause(parent))
return
default: // 父没取消 正常情况,在代码健壮的情况下 不太可能刚创建就取消了
}
// 断言 将父Context转为 *cancelCtx 几乎所有的Context 底层都有它的影子 这是最常见的
if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock() // 加锁保证并发安全
if err := p.err.Load(); err != nil {
// 父已取消:立即取消刚创建的 cancelCtx实例
child.cancel(false, err.(error), p.cause)
} else {
// 父未取消:将子Context加入父的children列表 当父取消时会遍历取消子
if p.children == nil { //第一次需要创建
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
return // 大多数到这返回了
}
// 特殊情况:父Context实现了afterFuncer接口(比如WithDeadline/WithTimeout)
if a, ok := parent.(afterFuncer); ok {
c.mu.Lock()
// 注册回调:父取消时,触发子取消
stop := a.AfterFunc(func() {
child.cancel(false, parent.Err(), Cause(parent))
})
// 包装父Context,记录stop函数(子取消时可停止回调)
c.Context = stopCtx{
Context: parent,
stop: stop,
}
c.mu.Unlock()
return
}
// 兜底方案:父Context不是cancelCtx也不是afterFuncer,但可取消(Done()≠nil) 自己实现的 Context
goroutines.Add(1)
go func() {
select {
case <-parent.Done():
// 父取消时,取消子Context
child.cancel(false, parent.Err(), Cause(parent))
case <-child.Done():
// 子先取消,无需处理
}
}()
}
核心函数 context.Cause
Cause 是用户获取 Context 取消原因的 唯一入口, errgroup 在 wait 判断时,可以拿到第一个错误,你也可以调用这个函数拿到错误,阅读源码我们知道,errgroup 内部主动 cancel(err) 传入了错误,并存储在了 cause 当然可以用该函数拿出
后续可以通过 cause := context.Cause(ctx) 拿到错误
func Cause(c Context) error {
// 每个 Context 都有 Value(key any) any 这个函数
if cc, ok := c.Value(&cancelCtxKey).(*cancelCtx); ok {
cc.mu.Lock()
cause := cc.cause
cc.mu.Unlock()
if cause != nil {
return cause
}
}
return c.Err()
}
Value 函数
其实就是递归向上找,我们这种情况直接返回,需要想上找的情况是自己存的业务 key ,后面会举例
func (c *cancelCtx) Value(key any) any {
if key == &cancelCtxKey {
return c
}
return value(c.Context, key)
}
// switch 所有的 已知的 context 找不到继续 向上找
func value(c Context, key any) any {
for {
switch ctx := c.(type) {
case *valueCtx:
if key == ctx.key {
return ctx.val
}
c = ctx.Context
case *cancelCtx:
if key == &cancelCtxKey {
return c
}
c = ctx.Context
case withoutCancelCtx:
if key == &cancelCtxKey {
return nil
}
c = ctx.c
case *timerCtx:
if key == &cancelCtxKey {
return &ctx.cancelCtx
}
c = ctx.Context
case backgroundCtx, todoCtx: // 空 emptyCtx
return nil
default:
return c.Value(key)
}
}
}
自定义业务 key
// 自定义业务 key
type userKey struct{}
func main() {
// 根 context,存入业务数据
ctx1 := context.WithValue(context.Background(), userKey{}, "张三")
// 在它基础上创建 cancelCtx
ctx2, cancel := context.WithCancel(ctx1)
// 再套一层 cancelCtx
ctx3, cancel2 := context.WithCancel(ctx2)
// 现在的结构:
// Background (空)
// └── ctx1 (valueCtx, 存了 userKey="张三")
// └── ctx2 (cancelCtx)
// └── ctx3 (cancelCtx) ← 我们拿着这个
// 查询业务数据:ctx3 本身没有,会递归向上找
user := ctx3.Value(userKey{})
fmt.Println("找到用户:", user) // 输出: 找到用户: 张三
// 即使 cancel 了,依然能找到
cancel2()
cancel()
user2 := ctx3.Value(userKey{})
fmt.Println("cancel后:", user2) // 输出: cancel后: 张三
}
总结
errgroup 是对 sync.WaitGroup 的增强封装,可以对 goroutine 数量限制 ,并且不需要显式的写WaitGroup..Add(1),内部自动管理,防止漏写,出错自动 cancel
如果自己相加业务逻辑 直接可以复制代码,按自己的需求添加字段或者函数
对于 context 源代码,要搞清楚当 cancel 后切断的是父对子的连接,但子有个字段专门存放父,这个并没有切断,才使得我们可以调用 context.Cause 方法一直向上找
更多推荐


所有评论(0)