Go语言的context是常用的并发控制技术.与waitGroup最大的不同点是context对

于派生的goroutine有更强的控制力.可以控制多级goroutine.

context翻译成中文是'上下文'.即它可以控制一组呈树状结构的goroutine.每个

goroutine拥有相同上下文.如下图.

上图由于goroutine派生出子goroutine.子goroutine又继续派生出新的gorou

tine.因为goroutine个数不容易确定.使用context就很容易实现.

1.context实现原理:

context实际上只定义了接口.凡是实现了该接口的类都可以称为一种context.官方

实现了几个常用的context.分

别用于不同的场景.

接口定义:

源码位置src/context/context.go:Context:

type Context interface {

    Deadline() (deadline time.Time, ok bool)

    Done() <-chan struct{}
   
    Err() error

    Value(key any) any
}

2.基础的context定义了4个方法:

1)Deadline():

该方法返回一个deadline和标识是否已设置deadline的bool值.如果没有设置

deadline.如果没有设置deadline.则ok=false.此时deadline为一个初始值的

time.Time值.

2)Done():

该方法返回一个用于探测Context是否取消的channel.当Context取消时会自动将

该channel关闭.

注:对于不支持取消的context.该方法可能返回nil.例如.context.Background().

3)Err():

该方法描述context关闭的原因.关闭原因由context实现控制.不需要用户设置.比如

Deadline context.关闭原因

可能是因为deadline.也可能被提前主动关闭.那么关闭原因就会不同.

deadline关闭:context deadline exceeded.

主动关闭:context canceled.

当context关闭后.Err()返回context的关闭原因.当context还未关闭时.Err()返回nil.

4)Value:

有一种context.它不是用于控制呈树状分布的goroutine.而是用于在树状分布的

goroutine间传递信息.Value()就是用于此种类型的context.该方法根据key值查

询map中的value.

3.空context:

context包中定义了一个context.名为emptyCtx.用于context的根节点.空的con

text只是简单的实现了Context.本身不包含任何值.仅用于其他context的父节点.定

义如下:

type emptyCtx struct{}

func (emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (emptyCtx) Done() <-chan struct{} {
    return nil
}

func (emptyCtx) Err() error {
    return nil
}

func (emptyCtx) Value(key any) any {
    return nil
}

context包中定义了一个公用的emptyCtx全局变量.名为background.可以使用

Background()获取它.实现如下:

type backgroundCtx struct{ emptyCtx }


func Background() Context {
    return backgroundCtx{}
}

WithCancel():

func withCancel(parent Context) *cancelCtx {
    if parent == nil {
       panic("cannot create context from nil parent")
    }
    c := &cancelCtx{}
    c.propagateCancel(parent, c)
    return c
}

WithDeadlind():

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    return WithDeadlineCause(parent, d, nil)
}

WithTimeout():

func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

WithValue():

func WithValue(parent Context, key, val any) Context {
    if parent == nil {
       panic("cannot create context from nil parent")
    }
    if key == nil {
       panic("nil key")
    }
    if !reflectlite.TypeOf(key).Comparable() {
       panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}

4.cancelCtx:

源码位置src/context/context.go:cancelCtx.

type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
    cause    error                 // set to non-nil by the first cancel call
}

children中记录了由此context派生的所有child.这个context被"cancel"时会把

其中所有的child都"cancel"掉.

cancelCtx与deadline和value无关.所以只实现了Done和Err()外漏接口即可.

Done方法实现:

按照context的定义.Done方法只需返回一个channel即可.源码实现如下.

func (c *cancelCtx) Done() <-chan struct{} {
    d := c.done.Load()
    if d != nil {
       return d.(chan struct{})
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    d = c.done.Load()
    if d == nil {
       d = make(chan struct{})
       c.done.Store(d)
    }
    return d.(chan struct{})
}

因为done是原子类.会先进行获取.如果不为空.进行类型转换返回.如果为空.就进行

加锁.再次获取.还是为空的话进行创建赋值.然后返回.

Err()方法实现:

按照context定义Err()只需要返回一个error告知context被返回的原因.

func (c *cancelCtx) Err() error {
    c.mu.Lock()
    err := c.err
    c.mu.Unlock()
    return err
}

源码很简单.就是通过加锁.然后获取err然后进行返回.

cancel()方法:

func (c *cancelCtx) cancel(removeFromParent bool, err, cause error) {
    if err == nil {
       panic("context: internal error: missing cancel error")
    }
    if cause == nil {
       cause = err
    }
    c.mu.Lock()
    if c.err != nil {
       c.mu.Unlock()
       return // already canceled
    }
    c.err = err
    c.cause = cause
    d, _ := c.done.Load().(chan struct{})
    if d == nil {
       c.done.Store(closedchan)
    } else {
       close(d)
    }
    for child := range c.children {
       // NOTE: acquiring the child's lock while holding parent's lock.
       child.cancel(false, err, cause)
    }
    c.children = nil
    c.mu.Unlock()

    if removeFromParent {
       removeChild(c.Context, c)
    }
}

removeFromParent这块会在正常情况下.需要将自己从parent中移除.

WithCancel()实现:

WithCancel做了三件事.

1).初始化一个cancelCtx实例.

2).将cancelCtx实例添加到其父节点children中.

3).返回cancelCtx实例和cancel()方法.

源码如下:

// WithCancel returns a derived context that points to the parent context
// but has a new Done channel. The returned context's Done channel is closed
// when the returned cancel function is called or when the parent context's
// Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this [Context] complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    c := withCancel(parent)
    return c, func() { c.cancel(true, Canceled, nil) }
}

withCancel(parent)方法源码:

func withCancel(parent Context) *cancelCtx {
    if parent == nil {
       panic("cannot create context from nil parent")
    }
    c := &cancelCtx{}
    //将自身添加到父节点.
    c.propagateCancel(parent, c)
    return c
}

propagateCancel方法源码:

// propagateCancel arranges for child to be canceled when parent is.
// It sets the parent context of cancelCtx.
func (c *cancelCtx) propagateCancel(parent Context, child canceler) {
    c.Context = parent

    done := parent.Done()
    if done == nil {
       return // parent is never canceled
    }

    select {
    case <-done:
       // parent is already canceled
       child.cancel(false, parent.Err(), Cause(parent))
       return
    default:
    }

    if p, ok := parentCancelCtx(parent); ok {
       // parent is a *cancelCtx, or derives from one.
       p.mu.Lock()
       if p.err != nil {
          // parent has already been canceled
          child.cancel(false, p.err, p.cause)
       } else {
          if p.children == nil {
             p.children = make(map[canceler]struct{})
          }
          p.children[child] = struct{}{}
       }
       p.mu.Unlock()
       return
    }

    if a, ok := parent.(afterFuncer); ok {
       // parent implements an AfterFunc method.
       c.mu.Lock()
       stop := a.AfterFunc(func() {
          child.cancel(false, parent.Err(), Cause(parent))
       })
       c.Context = stopCtx{
          Context: parent,
          stop:    stop,
       }
       c.mu.Unlock()
       return
    }

    goroutines.Add(1)
    go func() {
       select {
       case <-parent.Done():
          child.cancel(false, parent.Err(), Cause(parent))
       case <-child.Done():
       }
    }()
}

说明:

1).如果父节点也支持cancel.父节点肯定有children成员.把新context添加到children中即可.

2).如果父节点不支持cancel.就继续向上查询.直到找到一个支持cancel的节点.把新

context加到children.

3).所有父节点均不支持cancel.则启动一个协程等待父节点结束.然后把当前con

text结束.

cancelContext示例:

方法:

package Concurrent

import (
    "context"
    "fmt"
    "time"
)

func WriteRedis(ctx context.Context) {
    for {
       select {
       case <-ctx.Done():
          fmt.Println("WriteRedis Done.")
          return
       default:
          fmt.Println("WriteRedis...")
          time.Sleep(2 * time.Second)
       }
    }
}

func WriteDatabase(ctx context.Context) {
    for {
       select {
       case <-ctx.Done():
          fmt.Println("WriteDatabase Done.")
          return
       default:
          fmt.Println("WriteDatabase...")
          time.Sleep(2 * time.Second)
       }
    }
}

func HandleRequest(ctx context.Context) {
    go WriteDatabase(ctx)
    go WriteRedis(ctx)
    for false {
       select {
       case <-ctx.Done():
          fmt.Println("HandleRequest Done.")
          return
       default:
          fmt.Println("HandleRequest...")
          time.Sleep(2 * time.Second)
       }
    }
}

main方法:

func main() {
    ctx, cancelFunc := context.WithCancel(context.Background())
    go Concurrent.HandleRequest(ctx)

    time.Sleep(5 * time.Second)
    fmt.Println("stop all goroutine")

    cancelFunc()
    time.Sleep(5 * time.Second)
}

执行结果:

5.timerCtx:

源码位置src/context/context.go:timerCtx:

// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}

timerCtx在cancelCtx基础上增加了deadline.用于标示自动cancel的最终时间.而

timer就是一个触发自动cancel的定时器.由此衍生出WithDeadline()和

WithTimeout().两种类型实现原理一样.不过使用语境不一样.

deadline:指定最后期限.

Timeout:指定最长存活时间.

Deadline()实现:
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
    return c.deadline, true
}

Deadline方法只是返回了timeCtx.deadline.

cancel()实现:
func (c *timerCtx) cancel(removeFromParent bool, err, cause error) {
    c.cancelCtx.cancel(false, err, cause)
    if removeFromParent {
       // Remove this timerCtx from its parent cancelCtx's children.
       removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
       c.timer.Stop()
       c.timer = nil
    }
    c.mu.Unlock()
}

cancel()方法基本继承cancelCtx.只是需要把timer关闭.

timerCtx被关闭后.timerCtx.cancelCtx.err将存储关闭原因.

如果deadline到来之前手动关闭.关闭原因与cancelCtx显示一致.

如果deadline到来时自动关闭.关闭原因为context deadline exceeded.

WithDeadline()实现:
// WithDeadline returns a derived context that points to the parent context
// but has the deadline adjusted to be no later than d. If the parent's
// deadline is already earlier than d, WithDeadline(parent, d) is semantically
// equivalent to parent. The returned [Context.Done] channel is closed when
// the deadline expires, when the returned cancel function is called,
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this [Context] complete.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    return WithDeadlineCause(parent, d, nil)
}
// WithDeadlineCause behaves like [WithDeadline] but also sets the cause of the
// returned Context when the deadline is exceeded. The returned [CancelFunc] does
// not set the cause.
func WithDeadlineCause(parent Context, d time.Time, cause error) (Context, CancelFunc) {
    if parent == nil {
       panic("cannot create context from nil parent")
    }
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
       // The current deadline is already sooner than the new one.
       return WithCancel(parent)
    }
    c := &timerCtx{
       deadline: d,
    }
    c.cancelCtx.propagateCancel(parent, c)
    dur := time.Until(d)
    if dur <= 0 {
       c.cancel(true, DeadlineExceeded, cause) // deadline has already passed
       return c, func() { c.cancel(false, Canceled, nil) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    if c.err == nil {
       c.timer = time.AfterFunc(dur, func() {
          c.cancel(true, DeadlineExceeded, cause)
       })
    }
    return c, func() { c.cancel(true, Canceled, nil) }
}

1).初始化一个timerCtx实例.

2).将timerCtx实例添加到父节点的children中.

3).启动定时器.定时器到期会自动cancel本context.

4).返回timerCtx实例和concel()方法.

WithTimeout()实现:
// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this [Context] complete:
//
//  func slowOperationWithTimeout(ctx context.Context) (Result, error) {
//     ctx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
//     defer cancel()  // releases resources if slowOperation completes before timeout elapses
//     return slowOperation(ctx)
//  }
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

WithTimeout()实际上调用了WithDeadline.二者实现原理一致.具体可以参考上面.

timerCtx示例:

方法:

package Concurrent

import (
    "context"
    "fmt"
    "time"
)

func WriteRedis(ctx context.Context) {
    for {
       select {
       case <-ctx.Done():
          fmt.Println("WriteRedis Done.")
          return
       default:
          fmt.Println("WriteRedis...")
          time.Sleep(2 * time.Second)
       }
    }
}

func WriteDatabase(ctx context.Context) {
    for {
       select {
       case <-ctx.Done():
          fmt.Println("WriteDatabase Done.")
          return
       default:
          fmt.Println("WriteDatabase...")
          time.Sleep(2 * time.Second)
       }
    }
}

func HandleRequest(ctx context.Context) {
    go WriteDatabase(ctx)
    go WriteRedis(ctx)
    for false {
       select {
       case <-ctx.Done():
          fmt.Println("HandleRequest Done.")
          return
       default:
          fmt.Println("HandleRequest...")
          time.Sleep(2 * time.Second)
       }
    }
}

main方法:

func main() {
    ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
    go Concurrent.HandleRequest(ctx)
    time.Sleep(10 * time.Second)
}

执行结果:

创建了一个5s超时的context.传给子协程.context超时后会引发子协程退出.

6.valueCtx:

源码位置:src/context/context.go:valueCtx

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
    Context
    key, val any
}

valueCtx只是在Context基础上增加了一个key-value结构.用于在各级协程间传递

一些数据.由于valueCtx既不需要cancel.也不需要deadline.只需要实现Value()方

法即可.

Value()实现:

由于valueCtx数据结构定义的可见.valueCtx.key和valueCtx.val分别代表其key

和value值.实现如下.

func (c *valueCtx) Value(key any) any {
    if c.key == key {
       return c.val
    }
    return value(c.Context, key)
}

当前context查找不到key时.会向父节点查找.如果查询不到则会返回一个any.

查找源码如下:

func value(c Context, key any) any {
    for {
       switch ctx := c.(type) {
       case *valueCtx:
          if key == ctx.key {
             return ctx.val
          }
          c = ctx.Context
       case *cancelCtx:
          if key == &cancelCtxKey {
             return c
          }
          c = ctx.Context
       case withoutCancelCtx:
          if key == &cancelCtxKey {
             // This implements Cause(ctx) == nil
             // when ctx is created using WithoutCancel.
             return nil
          }
          c = ctx.c
       case *timerCtx:
          if key == &cancelCtxKey {
             return &ctx.cancelCtx
          }
          c = ctx.Context
       case backgroundCtx, todoCtx:
          return nil
       default:
          return c.Value(key)
       }
    }
}
WithValue()实现:
// WithValue returns a derived context that points to the parent Context.
// In the derived context, the value associated with key is val.
//
// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The provided key must be comparable and should not be of type
// string or any other built-in type to avoid collisions between
// packages using context. Users of WithValue should define their own
// types for keys. To avoid allocating when assigning to an
// interface{}, context keys often have concrete type
// struct{}. Alternatively, exported context key variables' static
// type should be a pointer or interface.
func WithValue(parent Context, key, val any) Context {
    if parent == nil {
       panic("cannot create context from nil parent")
    }
    if key == nil {
       panic("nil key")
    }
    if !reflectlite.TypeOf(key).Comparable() {
       panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}

最终就是通过一些校验.然后返回了一个valueCtx.

valueCtx示例:

方法:

package Concurrent

import (
    "context"
    "fmt"
    "time"
)

func HandlerValueCtx(ctx context.Context) {
    for {
       select {
       case <-ctx.Done():
          fmt.Println("HandlerValueCtx Done.")
          return
       default:
          fmt.Println("HandlerValueCtx...,key:", ctx.Value("key"))
          time.Sleep(2 * time.Second)
       }
    }
}

main方法:

func main() {
    valueCtx := context.WithValue(context.Background(), "key", "我是key")
    go Concurrent.HandlerValueCtx(valueCtx)
    time.Sleep(10 * time.Second)
}

执行结果:

注:

本例子中子协程无法自动结束.因为valueCtx不支持cancel.因此需要在指定一个可

以执行cancel的父context.

小结:

1.Context仅仅是定义一个接口.根据实现不同.可以衍生出不同的context类型.

2.cancelCtx实现了Context接口.通过WithCancel()创建实例.

3.timerCtx实现了Context接口.通过WithDeadline和WithTimeout创建实例.

4.valueCtx实现了Context接口.通过WithValue创建实例.

5.三种context实例互为父节点.从而可以组合成不同的应用形式.

我以过客之名.曾经心舟赴彼岸远行.

如果大家喜欢我的分享的话.可以关注我的微信公众号

念何架构之路

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐