文章目录

【GO】并发编程

Go语言在并发编程方面的能力特别强大,实现典型的协程的概念。内置的Go调度器在调度Goroutine时开销非常低。

Go并发编程的理念是:

通过通信来共享内存,而不是共享内存来通信

在Go中,每个并发执行的单元称为goroutine。通常称为Go协程。

主goroutine结束了,内部调用的goroutine也会随之结束。

使用sync.WaitGroup实现协同调度

func GoroutineWG() {
    // 1. 初始化 WaitGroup
    wg := sync.WaitGroup{}
    // 定义输出奇数的函数
    printOdd := func() {
        // 3.并发执行结束后,计数器-1
        defer wg.Done()
        for i := 1; i <= 10; i += 2 {
            fmt.Println(i)
            time.Sleep(100 * time.Millisecond)
        }
    }

    // 定义输出偶数的函数
    printEven := func() {
        // 3.并发执行结束后,计数器-1
        defer wg.Done()
        for i := 2; i <= 10; i += 2 {
            fmt.Println(i)
            time.Sleep(100 * time.Millisecond)
        }
    }
    // 在 main goroutine 中,开启新的goroutine
    // 并发调用
    // 2, 累加WG的计数器
    wg.Add(2)
    go printOdd()
    go printEven()

    // main goroutine 运行结束
    // 内部调用的goroutine也就结束
    // 4. 主goroutine等待
    wg.Wait()
    fmt.Println("after main wait")
}

底层代码

使用 go1.20 2025 年 1 月发布的版本

// Copyright 2024 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

type WaitGroup struct {
	noCopy noCopy

	// Bits (high to low):
	//   bits[0:32]  counter 任务计数器,表示待等待的goroutine数量
	//   bits[32]    flag: synctest bubble membership 用于synctest的标记
	//   bits[33:64] wait count 等待者数量,表示正在Wait的goroutine数量
	state atomic.Uint64
	sema  uint32 // 信号量,用于阻塞/唤醒等待的goroutine
}

// waitGroupBubbleFlag indicates that a WaitGroup is associated with a synctest bubble.
const waitGroupBubbleFlag = 0x8000_0000

// Add adds delta, which may be negative, to the [WaitGroup] task counter.
// 如果计数器变为零,释放所有因调用Wait而阻塞的goroutine。
// 如果计数器变为负数,Add会触发panic。
//
// 调用者应优先使用 [WaitGroup.Go]。
//
// 注意:当计数器为零时,带正数delta的Add调用必须在Wait之前执行。
// 带负数delta的调用,或当计数器大于零时才开始执行的正数Add调用,则可以在任意时刻发生。
// 通常这意味着Add的调用应先于创建goroutine或其他需等待的事件。
// 如果WaitGroup被复用以等待多个独立的事件集,新的Add调用必须在前一次所有Wait返回之后执行。
// 请参阅WaitGroup示例。
func (wg *WaitGroup) Add(delta int) {
	// race.Enabled 用于数据竞争检测,在正式代码中通常为false
	if race.Enabled {
		if delta < 0 {
			// 同步递减操作与Wait之间的竞争
			race.ReleaseMerge(unsafe.Pointer(wg))
		}
		race.Disable()
		defer race.Enable()
	}

	// bubbled 标记当前Add调用是否在synctest bubble内执行
	bubbled := false

	// synctest.IsInBubble() 检查当前是否在bubble测试环境中
	if synctest.IsInBubble() {
		// 如果Add从bubble内调用,则所有Add调用必须来自同一个bubble
		switch synctest.Associate(wg) {
		case synctest.Unbubbled:
			// 无特殊关联,正常处理
		case synctest.OtherBubble:
			// wg已与其他bubble关联,panic
			fatal("sync: WaitGroup.Add called from multiple synctest bubbles")
		case synctest.CurrentBubble:
			// 当前bubble匹配,设置bubbled标记
			bubbled = true
			// 设置waitGroupBubbleFlag标记
			state := wg.state.Or(waitGroupBubbleFlag)
			// 检查是否有从bubble外调用的Add
			if state != 0 && state&waitGroupBubbleFlag == 0 {
				fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
			}
		}
	}

	// state的高32位存储counter,低32位存储waiter count
	// 将delta左移32位后累加到state的高32位
	state := wg.state.Add(uint64(delta) << 32)

	// 检查Add调用是否在synctest bubble中一致
	if state&waitGroupBubbleFlag != 0 && !bubbled {
		fatal("sync: WaitGroup.Add called from inside and outside synctest bubble")
	}

	// 提取counter值(高32位,有符号)
	v := int32(state >> 32)
	// 提取waiter count(低32位,忽略最高位标记位)
	w := uint32(state & 0x7fffffff)

	// race检测:第一个递增必须与Wait同步
	if race.Enabled && delta > 0 && v == int32(delta) {
		// 需要建模为读操作,因为可能有多个并发的counter从0开始的转变
		race.Read(unsafe.Pointer(&wg.sema))
	}

	// counter为负数,panic
	if v < 0 {
		panic("sync: negative WaitGroup counter")
	}

	// waiter不为0且delta>0且是第一个增加操作,说明Add和Wait并发调用
	if w != 0 && delta > 0 && v == int32(delta) {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// 如果counter还没减到0,或者没有等待者,直接返回
	if v > 0 || w == 0 {
		return
	}

	// 执行到这里说明:counter变为0,且有等待者(w > 0)
	// 此时不能再有并发的Add或Wait操作
	// 做一次廉价的状态检查以检测误用
	if wg.state.Load() != state {
		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
	}

	// 将waiter count重置为0
	wg.state.Store(0)

	// 如果有关联的bubble,断开关联
	if bubbled {
		// counter为0时,不能有并发的Add操作,所以可以安全断开wg与bubble的关联
		synctest.Disassociate(wg)
	}

	// 唤醒所有等待的goroutine
	for ; w != 0; w-- {
		// 释放信号量,唤醒一个等待者
		runtime_Semrelease(&wg.sema, false, 0)
	}
}

// Done decrements the [WaitGroup] task counter by one.
// 它相当于 Add(-1)。
//
// 调用者应优先使用 [WaitGroup.Go]。
//
// 根据 Go 内存模型,Done 的调用 "同步先于" 它所解除阻塞的任何 Wait 调用的返回。
func (wg *WaitGroup) Done() {
	wg.Add(-1)
}

// Wait blocks until the [WaitGroup] task counter is zero.
// 阻塞直到WaitGroup的任务计数器变为零
func (wg *WaitGroup) Wait() {
	if race.Enabled {
		race.Disable()
	}

	// 自旋等待
	for {
		// 读取当前状态
		state := wg.state.Load()

		// 提取counter(高32位)
		v := int32(state >> 32)
		// 提取waiter count(低32位)
		w := uint32(state & 0x7fffffff)

		// counter为0,无需等待
		if v == 0 {
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}

			// 处理synctest bubble相关逻辑
			if w == 0 && state&waitGroupBubbleFlag != 0 && synctest.IsAssociated(wg) {
				// counter为0时不能有并发的Add,可以断开wg与bubble的关联
				if wg.state.CompareAndSwap(state, 0) {
					synctest.Disassociate(wg)
				}
			}
			return
		}

		// counter不为0,需要等待
		// 使用CAS原子地增加waiter count
		if wg.state.CompareAndSwap(state, state+1) {
			// race检测:第一个waiter需要与第一个Add同步
			if race.Enabled && w == 0 {
				// 需要建模为写操作,与Add中的读操作竞争
				// 只有第一个waiter执行写操作,避免并发Wait之间的竞争
				race.Write(unsafe.Pointer(&wg.sema))
			}

			// 标记是否为durable blocking(用于synctest)
			synctestDurable := false
			if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() {
				if race.Enabled {
					race.Enable()
				}
				if synctest.IsAssociated(wg) {
					// Add在当前bubble内调用,所以这个Wait是durable blocking
					synctestDurable = true
				}
				if race.Enabled {
					race.Disable()
				}
			}

			// 阻塞等待信号量,被唤醒时counter已经变为0
			runtime_SemacquireWaitGroup(&wg.sema, synctestDurable)

			// 被唤醒后检查wg状态
			if wg.state.Load() != 0 {
				panic("sync: WaitGroup is reused before previous Wait has returned")
			}
			if race.Enabled {
				race.Enable()
				race.Acquire(unsafe.Pointer(wg))
			}
			return
		}
		// CAS失败,说明状态被其他线程修改,重新循环
	}
}



cursor 总结 WaitGroup

架构
┌─────────────────────────────────────────────────────────────────┐
│                         用户层                                   │
│  Add(delta)Done()Wait()                                   │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                      sync 包层                                   │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ WaitGroup 结构                                           │   │
│  │   - state (atomic.Uint64): counter + waiter + flag      │   │
│  │   - sema (uint32): 信号量                               │   │
│  └─────────────────────────────────────────────────────────┘   │
│  ┌─────────────────────────────────────────────────────────┐   │
│  │ 方法逻辑                                                │   │
│  │   - Add: 修改counter,counter=0时唤醒waiter             │   │
│  │   - Wait: 自旋+CAS增加waiter,阻塞等待sema               │   │
│  │   - Done: Add(-1) 的语法糖                              │   │
│  └─────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                     runtime 包层 (汇编实现)                      │
│  runtime_SemacquireWaitGroup(): P原子队列等待                   │
│  runtime_Semrelease(): 唤醒等待的P                              │
│  执行信号量的 PV 操作 (睡眠/唤醒 goroutine)                     │
└─────────────────────────────────────────────────────────────────┘
WaitGroup 结构
type WaitGroup struct {
    noCopy noCopy                          // 禁止复制
    state  atomic.Uint64                   // 64位原子整数
    sema   uint32                          // 信号量
}
// state 的位布局 (64位):
┌────────────────────────────────────────────────────────────────┐
│  63                        33    32            0              │
│  [      counter (32)       ][标记][   waiter count (31) ]  │
│         高32位                    ↑       低32(除最高位)      │
│                          waitGroupBubbleFlag                   │
│                          = 0x8000_0000                          │
└────────────────────────────────────────────────────────────────┘

为什么这样设计?

设计点 原因
atomic.Uint64 用一个 64 位整数存储多个字段,减少内存占用,保证原子性
counter 和 waiter 合并 counter 归零时需要知道 waiter 数量,一起读取避免竞态
标记位 用于 synctest bubble 测试(Go 1.24 新增)
sema 分离 信号量是独立的资源,不能合并到 state
Add
func (wg *WaitGroup) Add(delta int) {
    // ========== 步骤1: race 检测干预 ==========
    if race.Enabled {
        if delta < 0 {                    // Done() 场景
            race.ReleaseMerge(unsafe.Pointer(wg))  // 建立同步点
        }
        race.Disable()                    // 内部操作无需检测
    }
    // ========== 步骤2: synctest bubble 处理 ==========
    bubbled := false
    if synctest.IsInBubble() {
        // 确保同一 wg 的所有 Add 来自同一 bubble
        switch synctest.Associate(wg) {
        case synctest.CurrentBubble:
            bubbled = true
            state := wg.state.Or(waitGroupBubbleFlag)  // 设置标记位
            // 检查是否有从 bubble 外调用的 Add
            if state != 0 && state&waitGroupBubbleFlag == 0 {
                panic("...")
            }
        }
    }
    // ========== 步骤3: 核心 - 修改 counter ==========
    // delta 左移32位,只修改高32位的counter
    state := wg.state.Add(uint64(delta) << 32)
    // ========== 步骤4: 合法性检查 ==========
    v := int32(state >> 32)               // 提取 counter
    w := uint32(state & 0x7fffffff)       // 提取 waiter count
    if v < 0 {
        panic("sync: negative WaitGroup counter")
    }
    if w != 0 && delta > 0 && v == int32(delta) {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    if v > 0 || w == 0 {
        return  // counter 还没到 0,或没有等待者,直接返回
    }
    // ========== 步骤5: counter 归零,唤醒所有 waiter ==========
    // 双重检查:确保状态没有被并发修改
    if wg.state.Load() != state {
        panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    }
    // 重置 waiter count
    wg.state.Store(0)
    // 断开 bubble 关联(如果有)
    if bubbled {
        synctest.Disassociate(wg)
    }
    // 循环唤醒所有等待的 goroutine
    for ; w != 0; w-- {
        runtime_Semrelease(&wg.sema, false, 0)  // 唤醒一个
    }
}

Add 流程图

Add(delta) 被调用
        │
        ▼
┌───────────────────┐
│ 提取 counter 和   │
│ waiter count      │
└─────────┬─────────┘
          │
          ▼
    ┌─────────────┐
    │ v = counter │────── v < 0? ────→ panic(负数)
    └──────┬──────┘
           │
           ▼
    ┌─────────────┐
    │ v > 0 ?     │─── 是 ───→ return (还有未完成的任务)
    └──────┬──────┘
           │ 否 (v == 0)
           ▼
    ┌─────────────┐
    │ w = waiter  │
    │ w == 0 ?    │─── 是 ───→ return (没人等待)
    └──────┬──────┘
           │ 否 (w > 0,有人等待)
           ▼
┌───────────────────────────────┐
│ counter 归零,唤醒所有 waiter │
│ for w 次:                     │
│   runtime_Semrelease()        │
└───────────────────────────────┘
Wait
func (wg *WaitGroup) Wait() {
    // ========== 步骤1: race 检测干预 ==========
    if race.Enabled {
        race.Disable()
    }
    // ========== 步骤2: 自旋等待 ==========
    for {
        state := wg.state.Load()
        v := int32(state >> 32)           // counter
        w := uint32(state & 0x7fffffff)   // waiter count
        // ========== 步骤3: 检查是否需要等待 ==========
        if v == 0 {
            // counter 已归零,无需等待
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            // 处理 bubble 关联
            if w == 0 && state&waitGroupBubbleFlag != 0 && synctest.IsAssociated(wg) {
                if wg.state.CompareAndSwap(state, 0) {
                    synctest.Disassociate(wg)
                }
            }
            return
        }
        // ========== 步骤4: CAS 增加 waiter count ==========
        // 使用 CAS 确保原子性,防止多个 waiter 竞争
        if wg.state.CompareAndSwap(state, state+1) {
            // race 检测:第一个 waiter 需要同步
            if race.Enabled && w == 0 {
                race.Write(unsafe.Pointer(&wg.sema))
            }
            // ========== 步骤5: 阻塞等待 ==========
            synctestDurable := false
            if state&waitGroupBubbleFlag != 0 && synctest.IsInBubble() {
                if race.Enabled {
                    race.Enable()
                }
                if synctest.IsAssociated(wg) {
                    synctestDurable = true
                }
                if race.Enabled {
                    race.Disable()
                }
            }
            // 核心阻塞调用:被唤醒时 counter 已经归零
            runtime_SemacquireWaitGroup(&wg.sema, synctestDurable)
            // ========== 步骤6: 被唤醒后检查 ==========
            if wg.state.Load() != 0 {
                panic("sync: WaitGroup is reused before previous Wait has returned")
            }
            if race.Enabled {
                race.Enable()
                race.Acquire(unsafe.Pointer(wg))
            }
            return
        }
        // CAS 失败,说明状态被修改,重新循环
    }
}

Wait 流程图

Wait() 被调用
        │
        ▼
┌───────────────────┐
│ 读取 state        │
│ v = counter       │
│ w = waiter count  │
└─────────┬─────────┘
          │
          ▼
    ┌─────────────┐
    │ v == 0 ?    │─── 是 ───→ return (已完成)
    └──────┬──────┘
           │ 否
           ▼
    ┌───────────────────┐
    │ CAS: state+1      │──── 失败 ──→ 重新循环
    └─────────┬─────────┘
              │ 成功
              ▼
    ┌───────────────────┐
    │ race 检测处理     │
    └─────────┬─────────┘
              │
              ▼
    ┌───────────────────┐
    │ runtime_Semacquire│
    │ 阻塞等待信号量     │
    └─────────┬─────────┘
              │ 被 Add 唤醒
              ▼
    ┌───────────────────┐
    │ 检查 wg 是否复用  │─── 复用 ──→ panic
    └─────────┬─────────┘
              │ 没复用
              ▼
          return
信号量底层 (runtime)
// 核心数据结构:semaTable (信号量表)
// 每个 goroutine M 有一个等待队列
// runtime_SemacquireWaitGroup 内部实现(伪代码)
func runtime_SemacquireWaitGroup(sema *uint32, durable bool) {
    // 1. 将当前 goroutine 加入等待队列
    gp := getg()                          // 获取当前 goroutine
    gp.waitSema = sema
    // 2. 原子操作:如果 sema 仍为0,睡眠当前 goroutine
    for !atomic.Cas(sema, 0, 1) {
        // 放入等待队列
        enqueue_waiter(sema, gp)
        // 调度器:当前 goroutine 睡眠,切换到其他 goroutine
        gopark(unlock, sema, "semacquirewaitgroup")
    }
    // 3. 被唤醒时继续执行
}
// runtime_Semrelease 内部实现(伪代码)
func runtime_Semrelease(sema *uint32, handoff bool, traceskip int) {
    // 1. 从等待队列取出一个 waiter
    gp := dequeue_waiter(sema)
    // 2. 唤醒该 goroutine
    // 设置为可运行状态,等待调度器调度
    goready(gp, traceskip)
}
信号量流程图
goroutine A (Wait)              goroutine B (Add)
        │                              │
        │   runtime_Semacquire()       │
        │─────────────────────┐        │
        │                     │        │
        │   加入等待队列       │        │
        │   gopark 睡眠       │        │
        │                     │        │
        ▼                     │        ▼
    [睡眠等待]                │   counter == 0
        │                     │   runtime_Semrelease()
        │                     │          │
        │                     │          ▼
        │                     │   [唤醒 waiter]
        │                     │          │
        │                     │          ▼
        │◀────────────────────│   goready(gp)
        │                     │
   [被唤醒,继续执行]            │
        │                     │
        ▼                     ▼
   return
为什么这样设计?
1. 为什么用自旋 + CAS + 阻塞?
// Wait 中的 for 循环
for {
    if wg.state.CompareAndSwap(state, state+1) {
        runtime_SemacquireWaitGroup(...)
    }
}

原因:

  • 自旋:如果 counter 很快归零,不需要真正阻塞,节省一次上下文切换
  • CAS:确保 waiter count 的增加是原子的
  • 阻塞:counter 长时间不归零时,休眠 goroutine 让出 CPU

}

2.为什么需要用前 31 位存储 waiter 数量,既然 Add 对于是否唤醒操作是判断是否有 waiter,那我只设置 1bit 位标识是否有 waiter 不就好了,第一个 waiter 商场把 flag 置为 1
for ; w != 0; w-- {
    runtime_Semrelease(&wg.sema, false, 0)
}

所以需要知道有多少个 waiter

单个唤醒 vs 全部唤醒

因为信号量进入等待的 goruntine 在一个队列里,唤醒就是出队列的过程,时间复杂度都是 O(N)

单个唤醒你就可以

  • 让第一个被唤醒的 goroutine 很快就能运行
  • 不需要等待所有 1000 个都被调度,调度器可以并行调度这些 goroutine

如果用 broadcast

  • 必须先遍历整个链表
  • 然后一次性标记所有 goroutine 为 ready
  • 再让调度器去调度
  • 开销更大,因为是连续操作
// 场景:Add 唤醒 1000 个 waiter
for i := 0; i < 1000; i++ {
    runtime_Semrelease(&wg.sema, false, 0)
}

3. 为什么用信号量而不是 channel?
// ❌ 如果用 channel
ch := make(chan struct{}, 100)  // 缓冲多大?
wg.Wait() <- struct{}{}         // Add 发送
// 问题:不知道有多少个 Wait,无法确定缓冲大小

信号量的优势:

  • 不需要预先知道 waiter 数量
  • 动态队列,自动扩容
  • 直接对接 runtime 调度器,性能更好

总结

组件 作用 为什么
state (atomic.Uint64) 存储 counter + waiter + flag 一个原子操作完成多个字段更新
sema (uint32) 信号量句柄 对接 runtime 调度器
Add 修改 counter,counter=0 时唤醒 任务完成通知
Wait 自旋+CAS+阻塞等待 等待任务完成
runtime_Semacquire goroutine 睡眠 让出 CPU
runtime_Semrelease goroutine 唤醒 恢复执行

一句话总结:WaitGroup 用一个 64 位原子整数记录任务数量和等待者数量,用信号量实现 goroutine 的睡眠与唤醒,通过 CAS 保证并发安全。

goroutine的并发规模

Goroutine 的并发数量有上限吗?

  • 受goroutine占用的栈内存限制
  • 受内部操作资源限制
  • goroutine本身无上限

函数 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">runtime.NumGoroutine()</font> 可以获取当前存在的Goroutine数量。

goroutine的最小为2KB

之所以支持百万级的goroutine并发,核心原因是因为每个goroutine的初始栈内存为2KB,用于保持goroutine中的执行数据,例如局部变量等。相对来说,线程线程的栈内存通常为2MB。除了比较小的初始栈内存外,goroutine的栈内存可扩容的,也就是说支持按需增大或缩小,一个goroutine最大的栈内存当前限制为1GB。

goroutine内部资源竞争溢出

在goroutine内增加,fmt.Println() 测试:

panic: too many concurrent operations on a single file or socket (max 1048575)

goruntine 池

pool, err = ants.NewPool(size)

defer pool.Release()

err := pool.Submit(funct)

$ go get -u github.com/panjf2000/ants/v2

func GoroutineAnts() {
    // 1. 统计当前存在的goroutine的数量
    go func() {
        for {
            fmt.Println("NumGoroutine:", runtime.NumGoroutine())
            time.Sleep(500 * time.Millisecond)
        }
    }()

    // 2. 初始化协程池,goroutine pool
    size := 1024
    pool, err := ants.NewPool(size)
    if err != nil {
        log.Fatalln(err)
    }
    // 保证pool被关闭
    defer pool.Release()

    // 3. 利用 pool,调度需要并发的大量goroutine
    for {
        // 向pool中提交一个执行的goroutine
        err := pool.Submit(func() {
            time.Sleep(100 * time.Second)
        })
        if err != nil {
            log.Fatalln(err)
        }
    }
}

// ======
> go test -run TestGoroutineAnts
runtime.NumGoroutine():  8
runtime.NumGoroutine():  1031
runtime.NumGoroutine():  1031
runtime.NumGoroutine():  1031
runtime.NumGoroutine():  1031

并发调度

GMP 底层源码

数据结构
type g struct {
    // ...
    m         *m   // 在 p 的代理,负责执行当前 g 的 m;   
    // ...
    sched     gobuf
    // ...
}


type gobuf struct {
    sp   uintptr // 保存 CPU 的 rsp 寄存器的值,指向函数调用栈栈顶;
    pc   uintptr // 保存 CPU 的 rip 寄存器的值,指向程序下一条执行指令的地址;
    ret  uintptr // 保存系统调用的返回值;
    bp   uintptr // 保存 CPU 的 rbp 寄存器的值,存储函数栈帧的起始位置.
}

type m struct {
    g0      *g      // 系统 goroutine,用于调度(始终不变)
    curg    *g      // 当前正在运行的 g(会变化)
    // ...
    tls           [tlsSlots]uintptr // thread-local storage (for x86 extern register)
    // ...
        gsignal   *g    // 每个 m 有一个 gsignal 把信号处理隔离,避免打断持锁/内存分配
    
    // gsignal 的特殊之处:
    // - 有独立的信号栈 (很小,通常 64KB)
    // - 不受用户信号处理函数影响
    // - 用于运行时内部信号处理
    // m.tls[0] 存储的是当前运行的 g,因此线程可以通过 g 找到当前的 m、p、g0 等信息.
}

type p struct {
    // ...
    runqhead uint32
    runqtail uint32
    runq     [256]guintptr
    
    runnext guintptr // 下一个可执行的 goroutine.
    // ...
}

type schedt struct {
    // ...
    lock mutex
    // ...
    runq     gQueue
    runqsize int32
    // ...
}
状态

const(
  _Gidle = itoa // 0协程开始创建时的状态,此时尚未初始化完成;
  _Grunnable // 1协程在待执行队列中,等待被执行;
  _Grunning // 2协程正在执行,同一时刻一个 p 中只有一个 g 处于此状态;
  _Gsyscall // 3协程正在执行系统调用;
  _Gwaiting // 4 	协程处于挂起态,需要等待被唤醒. gc、
    //				channel 通信或者锁操作时经常会进入这种状态;
  _Gdead // 6 刚初始化完成或者已经被销毁,会处于此状态;
  _Gcopystack // 8 协程正在栈扩容流程中;
  _Gpreempted // 9 协程被抢占后的状态.
)
两种 G 的切换

m 通过 p 调度执行的 goroutine 永远在普通 g 和 g0 之间进行切换,当 g0 找到可执行的 g 时,会调用 gogo 方法,调度 g 执行用户定义的任务;当 g 需要主动让渡或被动调度时,会触发 mcall 方法,将执行权重新交还给 g0.

gogo 和 mcall 可以理解为对偶关系,其定义位于 runtime/stubs.go 文件中.

func gogo(buf *gobuf)
// ...
func mcall(fn func(*g))
调度(此处指切换 P)类型

主动调度

func Gosched() {
    checkTimeouts()
    mcall(gosched_m)
}
被动调度

g 可能会陷入阻塞态无法被调度,直到关注的条件达成后,g才从阻塞中被唤醒,重新进入可执行队列等待被调度.

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    // ...
    mcall(park_m)
}

goready 方法通常与 gopark 方法成对出现,能够将 g 从阻塞态中恢复,重新进入等待执行的状态.

func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}
正常调度

g 中的执行任务已完成,g0 会将当前 g 置为死亡状态,发起新一轮调度.(go 执行 schedule)

抢占调度

g 执行超过指定的时长或者被系统调用阻塞(非阻塞的不会)

值得一提的是,前 3 种调度方式都由 m 下的 g0 完成,唯独抢占调度不同.

因为发起系统调用时需要打破用户态的边界进入内核态,此时 m 陷入僵直,无法主动完成抢占调度的行为.

因此,在 Golang 进程会有一个全局监控协程 monitor g 的存在,这个 g 会越过 p 直接与一个 m 进行绑定,不断轮询对所有 p 的执行状况进行监控. 倘若发现满足抢占调度的条件,则会从第三方的角度出手干预,主动发起该动作

  • 当M的执行因为G进行了系统调用时,M会释放与之绑定的P,把P转移给其他的M去执行。称为P抢占
  • 当M执行完的系统调用阻塞的G后,M会尝试获取新的空闲P,同时将G放入P的本地队列执行。若没有空闲的P,则将G放入全局G队列,M进入休眠,等待被唤醒或被垃圾回收
总结

集齐各部分理论碎片之后,我们可以尝试对 gmp 的宏观调度流程进行整体串联:

(1)以 g0 -> g -> g0 的一轮循环为例进行串联;

(2)g0 执行 schedule() 函数,寻找到用于执行的 g;

(3)g0 执行 execute() 方法,更新当前 g、p 的状态信息(g 由 runnable->running),调度器调度次数+1,并调用 gogo() 方法,将执行权交给 g;

(4)g 因主动让渡( gosche_m() )、被动调度( park_m() )、正常结束( goexit0() )等原因,调用 m_call 函数,执行权重新回到 g0 手中;

(5)g0 执行 schedule() 函数,开启新一轮循环.

findRunnable

源码
func findRunnable() (gp *g, inheritTime, tryWakeP bool) {
    _g_ := getg()


    top:
    _p_ := _g_.m.p.ptr()
    // ...
    if _p_.schedtick%61 == 0 && sched.runqsize > 0 {
        lock(&sched.lock)
        gp = globrunqget(_p_, 1)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false, false
        }
    }

    // ...
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime, false
    }

    // ...
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false, false
        }
    }




    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            return gp, false, false
        }
    }


    // ...
    procs := uint32(gomaxprocs)
    if _g_.m.spinning || 2*atomic.Load(&sched.nmspinning) < procs-atomic.Load(&sched.npidle) {
        if !_g_.m.spinning {
            _g_.m.spinning = true
            atomic.Xadd(&sched.nmspinning, 1)
        }




        gp, inheritTime, tnow, w, newWork := stealWork(now)
        now = tnow
        if gp != nil {
            // Successfully stole.
            return gp, inheritTime, false
        }
        if newWork {
            // There may be new timer or GC work; restart to
            // discover.
            goto top
        }
        if w != 0 && (pollUntil == 0 || w < pollUntil) {
            // Earlier timer to wait for.
            pollUntil = w
        }
    }


    //
过程

p 每执行 61 次调度,会从全局队列中获取一个 goroutine 进行执行

多对多的协程调度

整体来说就是一个函数一旦执行,不能被打断去执行别的函数。

而协程Coroutine的设计就是在某个函数执行的过程中,可以主动(Python的yield)和被动(go 的goroutine)的被终止执行,转而去执行其他函数。

GMP模型结构

Goroutine 就是Go语言实现的协程模型。其核心结构有三个,称为GMP,也叫GMP模型。分别是:

  • G,Goroutine,我们使用关键字go调用的函数。存储于P的本地队列或者是全局队列中。
  • M,Machine,就是Work Thread,就是传统意义的线程,用于执行Goroutine,G。只有在M与具体的P绑定后,才能执行P中的G。
  • P,Processor,处理器,主要用于协调G和M之间的关系,存储需要执行的G队列,与特定的M绑定后,执行Go程序,也就是G。

GMP整体结构逻辑图:

P的数量

P的数量通常是固定的,当程序启动时由 **<font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">$GOMAXPROCS</font>**环境变量决定创建P的数量。默认的值为当前CPU的核心数所有的 P 都在程序启动时创建。

这意味着程序的执行过程中,最多同时有$GOMAXPROCS个Goroutine同时运行,默认与CPU核数保持一致,可以最大程度利用多核CPU并行执行的能力。

程序运行时,runtime.GOMAXPROCS()函数可以动态改变P的数量,但通常不建议修改,或者即使修改也不建议数量超过CPU的核数。调动该函数的典型场景是控制程序的并行规模,例如:

// 最多利用一半的CPU
runtime.GOMAXPROCS(runtime.NumCPU() / 2)

// 获取当前CPU的核数
runtime.NumCPU()

我们知道Go没有限定G的数量,那M的数量呢?

  • Go对M的数量做了一个上限,10000个,但通常不会到达这个规模,因为操作系统很难支持这么多的线程。
  • M的数量是由P决定的。
  • 当P需要执行时,会去找可用的M,若没有可用的M,就会创建新的M,这就会导致M的数量不断增加
  • 当M线程长时间空闲,也就是长时间没有新任务时,GC会将线程资源回收,这会导致M的数量减少
  • 整体来说,M的数量一定会多于P的数量,取决于空闲(没有G可执行的)的,和完成其他任务(例如CGO操作,GC操作等)的M的数量多少

P与G关联的流程

go 创建的待执行的Goroutine与P建立关联的核心流程

  • 新创建的G会优先保持在P的本地队列中。例如A函数中执行了 go B(),那么B这个Goroutine会优先保存在A所属的P的本地队列中。
  • 若G加入P的本地队列时本地队列已满,那么G会被加入到全局G队列中。新G加入全局队列时,会把P本地队列中一半的G也同时移动到全局队列中(是乱序入队列),以保证P的本地队列可以继续加入新的G。
  • 当P要执行G时
    • 会从P的本地队列查找G。
    • 若本地队列中没有G,则会尝试从其他的P中偷取(Steal)G来执行,通常会偷取一半的G。
    • 若无法从其他的P中偷取G,则从全局G队列中获取G,会一次获取多个G。
    • 整体:本地G队列->其他P的本地G队列->全局G队列
  • 当全局运行队列中有待执行的 G 时,还会有固定几率(每61个调度时钟周期 schedtick)会从全局的运行队列中查找对应的 G,为了保证全局G队列一定可以被调度。

核心流程图例:

A 中调用了 go B(), P的本地队列未满时:

A 中调用了 go B(), P的本地队列已满时:

当P要执行G时:

P与M关联的流程

P中关联了大量待执行的G,若需要执行G,P要去找可用的M。P不能执行G,只有M才能真正执行G。

P与M建立关联的核心过程:

  • 当P需要执行时,P要寻找可用的M,优先从空闲M池中找,若没有空闲的,则新建M来执行
  • 在创建G时,G会尝试唤醒空闲的M
  • 当M的执行因为G进行了系统调用(无论是否阻塞)时,M会释放与之绑定的P,把P转移给其他的M去执行。称为P抢占。如果是用户态阻塞,M 不会解除 P,会继续执行 G
  • 当M执行完的系统调用阻塞的G后,M会尝试获取新的空闲P,同时将G放入P的本地队列执行。若没有空闲的P,则将G放入全局G队列,M进入休眠,等待被唤醒或被垃圾回收

如图所示:

P要寻找可用的M:

G执行了阻塞式系统调用,M与P解绑(释放),P转移到新的M上执行:

M1执行完G的系统调用后,G不一定结束,还要继续执行,则M1会尝试获取空闲的P(没有与M绑定的P),若没有空闲的P可用,将M1执行的G放入全局G队列,M1进入空闲状态:

M0 和 G0

  • M0, 启动程序后的编号为 0 的主线程,负责执行初始化操作和启动第一个 G,也就是 main Goroutine。之后与其他M一样调度。
  • G0,每个 M 创建的第一个 Goroutine。G0 仅用于负责调度的 G,G0 不指向任何可执行的函数,每个 M 都会有一个自己的 G0。在调度或系统调用时会使用 G0 的栈空间。
  • 当一个 M 被创建时,会自动同步初始化一个专属的 G0;G0 从诞生到销毁,始终与所属 M 绑定,不会被调度到其他 M 上,也不会有多个 M 共用一个 G0 的情况
  • M 是独立的操作系统线程,拥有自己的内核上下文,而调度操作(如切换 G、处理系统调用)需要专属、稳定的栈空间和上下文,如果多个 M 共用 G0,会导致栈数据混乱、调度逻辑冲突(比如两个 M 同时用 G0 的栈执行不同的调度操作)
  • G0 的「工作内容」是 Go 运行时内置的调度核心逻辑(如保存 G 上下文、查找待运行 G、绑定 P 与 M、处理系统调用等)

理解 G0

切换动作本身(保存当前 G 的上下文,查找下一个要运行的 G,恢复该 G 的上下文)是由 schedule() 和 execute() 等函数完成的。这些代码不能运行在普通的用户 G 栈上,因为:

  1. 大小限制:普通 G 的栈一开始很小(2KB),调度逻辑可能需要更多空间。
  2. 隔离性:如果调度逻辑在用户 G 的栈上运行,用户 G 的栈空间随时可能动态增长/收缩,这会破坏调度代码的稳定性。

g0 存在的意义就是提供一个“工作台”:

  • 它有一个硕大且动态增长的栈(通常比用户 G 的初始栈大得多)。
  • M 在执行调度逻辑时,会从“用户 G”切换到“自己的 g0”,在 g0 的栈上运行调度代码。
  • 当 M 执行用户 G 时:
    • 当前 CPU 的栈指针(SP)指向用户 G 的栈。
    • M 的 g0 字段依然指向那个 g0 结构体,但 CPU 并没有在执行 g0 的代码。
    • g0 此时就像一个“工具”,被放在一旁(存在内存中),并没有被调度运行。
  • 当需要调度时:
    • M 触发上下文切换。
    • CPU 切换到 g0 的栈(由 g0 结构体中的 sched.sp 和 sched.pc 决定)。
    • M 开始执行调度代码(在 g0 的栈上)。

如图:

M的G0:

协作和抢占调度

当某个 G 执行时间过长,其他的 G 如何调度。通常有两种方案:

  • 协作式,主动让出执行权,让其他G执行。通过runtime.Gosched()可以让出执行权。
  • 抢占式,被动让出执行权,也就是调度器将G的执行权取消,分配给其他的G。Go目前默认的方式。在Go中一个G最多可以执行10ms,超时就会被让出调度权。

函数:

runtime.Gosched()

方法可以要求Go主动调度该goroutine,去执行其他的goroutine。这种是典型的协作调度模式,类似于 py 的 yield。

示例:

func GoroutineSched() {
    runtime.GOMAXPROCS(1)
    wg := sync.WaitGroup{}
    wg.Add(1)
    max := 100
    go func() {
        defer wg.Done()
        for i := 1; i <= max; i += 2 {
            fmt.Print(i, " ")
            runtime.Gosched()
            //time.Sleep(time.Millisecond)
        }
    }()

    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 2; i <= max; i += 2 {
            fmt.Print(i, " ")
            runtime.Gosched()
            //time.Sleep(time.Millisecond)
        }
    }()

    wg.Wait()
}

我们采用1个P来进行模拟,看主动让出交替执行的情况。

上面代码中,若goroutine中,没有runtime.GoSched,则会先执行完一个,再执行另一个。若存在runtime.GoSched,则会交替执行。这就是协作式。

除此之外,增加sleep时间1ms,不增加runtime.GoSched调用,也会出现交替执行的情况,这种情况就是调度器主动调度Goroutine了,是抢占式。

小结

  • Goroutine:Go语言中实现的协程。
  • go 关键字:使用go 关键字调用函数,可以让函数独立运行在Goroutine中。
  • main 函数也是运行在Goroutine中
  • 通常 main 函数需要等待其他Goroutine运行结束
  • 典型的并发等待使用 sync.WaitGroup 类型。
  • 并发Goroutine的调度在应用层面可以认为是随机的
  • 支持海量gouroutine的特点:
    • goroutine语法层面没有限制,但使用时通常要限制,避免并发的goroutine过多,资源占用过大
    • 更小的goroutine栈内存
    • 强大的GMP调度
  • GMP
    • G,Goroutine,独立并发执行的代码段
    • M,mechine, 系统线程
    • P,Processor,逻辑处理器,用于联系G和M。
    • G存在与P的本地队列或全局队列中
    • M要与P绑定,P中的G才会执行
    • M执行G中的系统调用时,会解绑M和P,P会找到新的M执行

Channel通信

Channel概述

不要通过共享内存的方式进行通信,而是应该通过通信的方式共享内存

这是Go语言最核心的设计模式之一。

Go语言中多Goroutine通信的主要方案是Channel。Go语言也可以使用共享内存的方式支持Goroutine通信。

Go语言实现了CSP通信模式,CSP是Communicating Sequential Processes的缩写,通信顺序进程。Goroutine和Channel分别对应CSP中的实体和传递信息的媒介。

  • channel类型的元素是先进先出的,先发送到channel的value会先被receive
  • 先向Channel发送数据的Goroutine会先执行
  • 先从Channel接收数据的Goroutine会先执行
  • 上面三句话从底层理解

Channel类型

package main

import "fmt"

// 生产者:仅能向通道发送int,参数为chan<- int
func producer(ch chan<- int) {
	for i := 1; i <= 3; i++ {
		ch <- i // 合法:发送操作
		fmt.Printf("生产者发送:%d\n", i)
	}
	close(ch) // 发送方负责关闭通道,单向发送通道支持关闭
}

// 消费者:仅能从通道接收int,参数为<-chan int
func consumer(ch <-chan int) {
	for num := range ch { // 合法:接收操作
		fmt.Printf("消费者接收:%d\n", num)
	}
}

func main() {
	// 调用方创建双向通道,桥接生产者和消费者
	ch := make(chan int)
	go producer(ch) // 双向通道隐式转换为仅发送通道(兼容)
	consumer(ch)    // 双向通道隐式转换为仅接收通道(兼容)
}

初始化Channel值

内建函数make()可用于初始化Channel值。支持两个参数:

make(ChannelType, Capacity)

其中:

  • ChannelType是channel类型
  • Capacity是缓冲容量。可以省略或为0,表示无缓冲Channel

channel是引用类型,类似于map和slice。

示例:

ch := make(chan int)
var ch = make(chan int)
ch := make(chan int, 10)
ch := make(<-chan int)
ch := make(chan<- int, 10)

未使用make()初始化的channel为nil。nil channel不能执行收发通信操作,例如:

var ch chan int

ch就是nil channel。 nil 通道仅关闭操作会触发 panic,发送、接收、遍历均只阻塞不 panic。

关闭channel

内置函数close()用于关闭channel。

关闭Channel的意思是记录该Channel不能再被发送任何元素了,而不是销毁该Channel的意思。也就意味着关闭的Channel是可以继续接收值的。因此:

  • 向已关闭的Channel发送会引发runtime panic
  • 关闭nil Channel会引发runtime panic
  • 不能关闭仅接收Channel

当从已关闭的Channel接收时:

  • 可以接收关闭前发送的全部值
  • 若没有已发送的值会返回类型的零值,不会被阻塞

使用接收操作符的多值返回结构,可以判断Channel是否已经关闭:

var x, ok = <-ch
x, ok := <-ch
  • ok为true,channel未关闭
  • ok为false,channel已关闭

for range channel

for语句的range子句可以持续地从Channel中接收元素,语法如下:

for e := range ch {
    // e是ch中元素值
}

持续接收操作与接收操作<-行为一致:

  • 若ch为nil channel会阻塞
  • 若ch没有已发送元素会阻塞

for会持续执行到channel被关闭,关闭后,若channel中存在已发送元素,for会全部读取完毕。

缓冲与无冲 channel

Channel区别于是否存在缓冲区,分为:

  • 缓冲Channel,make(chan T, cap),cap是大于0的值。
  • 无缓冲Channel, make(chan T), make(chan T, 0)

无缓冲channel

也称为同步Channel,只有当发送方和接收方都准备就绪时,通信才会成功。

同步操作示例:

func ChannelSync() {
    // 初始化数据
    ch := make(chan int)
    wg := sync.WaitGroup{}

    // 间隔发送
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 5; i++ {
            ch <- i
            println("Send ", i, ".\tNow:", time.Now().Format("15:04:05.999999999"))
            // 间隔时间
            time.Sleep(1 * time.Second)
        }
        close(ch)
    }()

    // 间隔接收
    wg.Add(1)
    go func() {
        defer wg.Done()
        for v := range ch {
            println("Received ", v, ".\tNow:", time.Now().Format("15:04:05.999999999"))
            // 间隔时间,注意与send的间隔时间不同
            time.Sleep(3 * time.Second)
        }
    }()

    wg.Wait()
}

代码中,采用同步channel,使用两个goroutine完成发送和接收。每次发送和接收的时间间隔不同。我们分别打印发送和接收的值和时间。注意结果:

  • 发送和接收时间一致
  • 间隔以长的为准,可见发送和接收操作为同步操作

同步Channel适合在gotoutine间做同步信号!

缓冲Channel

缓冲Channel也称为异步Channel,接收和发送方不用等待双方就绪即可成功。缓冲Channel会存在一个容量为cap的缓冲空间。当使用缓冲Channel通信时,接收和发送操作是在操作Channel的Buffer:

func ChannelASync() {
    // 初始化数据
    ch := make(chan int, 5)
    wg := sync.WaitGroup{}

    // 间隔发送
    wg.Add(1)
    go func() {
        defer wg.Done()
        for i := 0; i < 5; i++ {
            ch <- i
            println("Send ", i, ".\tNow:", time.Now().Format("15:04:05.999999999"))
            // 间隔时间
            time.Sleep(1 * time.Second)
        }
    }()

    // 间隔接收
    wg.Add(1)
    go func() {
        defer wg.Done()
        for v := range ch {
            println("Received ", v, ".\tNow:", time.Now().Format("15:04:05.999999999"))
            // 间隔时间,注意与send的间隔时间不同
            time.Sleep(3 * time.Second)
        }
    }()

    wg.Wait()
}

代码中,与同步channel一致,只是采用了容量为5的缓冲channel,使用两个goroutine完成发送和接收。每次发送和接收的时间间隔不同。我们分别打印发送和接收的值和时间。注意结果:

  • 发送和接收时间不同
  • 发送和接收操作不会阻塞,可见发送和接收操作为异步操作

缓冲channel非常适合做goroutine的数据通信了。

长度和容量,len()和cap()

内置函数 len() 和 cap() 可以分别获取:

  • len()长度,缓冲中元素个数。
  • cap()容量,缓冲的总大小。cap()返回0,意味着是无缓冲通道

单向Channel

使用单向通道约束双向通道的操作。

语法上来说,就是我们会将双向Channel转换为单向Channel来使用。典型使用在函数参数或返回值类型中。

示例代码:

func ChannelDirectional() {
    // 初始化数据
    ch := make(chan int)
    wg := &sync.WaitGroup{}

    // send and receive
    wg.Add(2)
    go setElement(ch, 42, wg)
    go getElement(ch, wg)

    wg.Wait()
}

// only receive channel
func getElement(ch <-chan int, wg *sync.WaitGroup) {
    defer wg.Done()

    println("received from ch, element is ", <-ch)
}

// only send channel
func setElement(ch chan<- int, v int, wg *sync.WaitGroup) {
    defer wg.Done()

    ch <- v
    println("send to ch, element is ", v)
}

函数getElement和setElement,分别使用了单向的接收和发送channel,在语义上表示只能接收和只能发送操作,同时程序上限定了操作。

典型的单向Channel的标准库例子:

// signal.Notify()
func Notify(c chan<- os.Signal, sig ...os.Signal)

// time.After
func After(d Duration) <-chan Time

以上两个示例分别展示了单向Channel作为函数参数和函数返回值的语法。

Channel结构

channel 的阻塞机制

:::info
抛出问题:go的channel的阻塞机制怎么实现的。整个channel消费唤醒是用了信号通知么。等待的goruntine进入等待和唤醒都是通过系统调用么

:::

问题解答:

Go 的 Channel 阻塞与唤醒机制是 Go 调度器(GMP 模型)与底层操作系统(主要是 futex(负责把 G 由运行态转为阻塞态,GO 的线程没有阻塞态))协同工作的结果。

它没有直接使用 POSIX 信号或条件变量(pthread condition variable)那种“通知-回调”模式。而是采用了基于内存状态的自旋 + 阻塞队列 + futex 系统调用的组合策略。

核心数据结构:hchan 和 sudog

Channel的结构定义为 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">runtime.hchan</font>

// GOROOT/src/runtime/chan.go
type hchan struct {
    qcount   uint           // 元素个数。len()
    dataqsiz uint           // 缓冲队列的长度。cap()
    buf      unsafe.Pointer // 缓冲队列指针,无缓冲队列为nil
    elemsize uint16 // 元素大小
    
    closed   uint32
    
    elemtype *_type // 元素类型
    
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}
  • hchan (Channel Header): 这是 Channel 在内存中的对象。
    • sendq 和 recvq: 这两个是核心的等待队列。它们存储的是 sudog (Suspended Goroutine) 的指针。
      • sendq: 在 channel 无人接收数据而阻塞的发送者队列。
      • recvq: 在 channel 无数据可取而阻塞的接收者队列。
    • buf: 环形缓冲区(如果是 buffered channel)。
    • mutex: 一个互斥锁,保护上述所有字段。
    • 若为无缓冲channel,不分配channel.buf空间
  • make()初始化的核心操作就是分配内存空间

缓冲数组

缓冲为数组结构,channel记录发送和接收元素的索引:

sendx    uint   // 发送索引
 recvx    uint   // 接收索引

缓冲数组是循环使用的,也就是若数组的最后一个元素存储了元素,那么下一次会尝试存储在第一个元素位置。

Channel与Goroutine的关系

Channel记录两个属性,由于记录等待接收和发送的goroutine队列:

recvq    waitq  // 等待接收goroutine队列
sendq    waitq  // 等待发送goroutine队列

当基于某channel的接收或发送的goroutine需要阻塞时,会被记录到Channel的等待队列中。当channel可以完成相应的接收或发送操作时,从等待队列中唤醒goroutine进行操作。

其中等待队列是 runtime.waitq 类型,是一个双向链表结构,具体的某个链表节点存在两个指针,指向前后节点:

// GOROOT/src/runtime/chan.go
type waitq struct {
    first *sudog
    last  *sudog
}
  • sudog (Suspended Goroutine):
    • 这是 Go 运行时用来表示“等待中 goroutine”的结构体。
    • 它不是 goroutine 本身(G 结构体),而是一个中介。它持有:
      • 指向被阻塞的 G(goroutine)的指针。
      • 等待发送 / 接收的数据指针(val)。 elem Unsafe.Pointer
      • 指向下一个和上一个 sudog 的指针(用于链表)。
      • 等等

初始化channel流程

make()初始化channel时,会根据是否存在缓冲,选择:

  • 存在缓冲,为channel和buffer分别分配内存,同时channel.buf指向buffer地址
  • 不存在缓冲,仅为channel分配内存,channel.buf为nil。
  • 初始化channel中其他属性

向 channel 发送流程

语句 ch <- element 向 channel 发送元素时,大体的执行流程如下:

  • 加锁: 先获取 hchan 的 mutex。
  • 直接发送:如果 recvq 不为空(说明有人在等数据),直接从 recvq 头部取出一个 sudog,把数据拷贝给它(写入接收者的变量),然后唤醒这个接收者(goready),将其放在M的 runnext 位置,下次调度立即执行
  • 直接写缓冲区,当缓冲区存在空间时,将发送元素直接写入缓冲区,调整 channel.sendx 的位置
  • 阻塞发送,当缓冲区已满或无缓冲区时,创建 sudog: 创建一个新的 sudog,把要发送的数据 x 存在里面,关联当前的 G 后进入 channel.sendq 队列,调用 gopark,将当前的 G 状态设置为 等待态 (Waiting),注意了,G 进入等待,M 不会,整个过程不调用 futex,并把它从 P 的运行队列中移除。等待其他 goroutine 从 channel 中接收元素,进而唤醒发送 goroutine

gopark 函数与线程协程状态讲解

暂停 G,M 的 P 制空,触发调度,然后判断是否需要挂起线程(需要的话要调用 futex)

只有当 M 找不到任何可运行的 G 时:

  1. M 自旋 (Spinning) 一会儿,试图找 G。
  2. 如果全局都没有 G,M 会调用 futex 彻底休眠。
  3. 直到有新的 G 被创建(例如新建一个协程),或者有 G 被唤醒,才会把 M 唤醒。
  • M (线程) 有三种常见状态:
    • Running: 正在执行 G。
    • Spinning (自旋): M 空闲,但在努力找 G 跑(为了避免频繁休眠/唤醒,浪费 CPU 缓存)。
    • Idle (空闲): M 彻底没事干,进入休眠(调用 futex 等待被唤醒)。
    • 问:怎么进入的空闲态,futex 和 gopark 调用会进入内核态么,是系统调用么,线程 M 携带执行了系统调用的 G 是什么状态?
    • futex 会让线程进入空闲态,futex 是系统调用,运行态。

从channel接收流程

当另一个 goroutine 执行 <- ch 时:

  • 加锁。

关闭channel流程

close(ch)关闭channel,主要工作是:

  • 取消channel关联的sendq和recvq队列
  • 调度阻塞在sendq和recvq中的goroutine

select 语句

执行流程

for + select

select 匹配到可操作的case或者是defaultcase后,就执行完毕了。实操时,我们通常需要持续监听某些channel的操作,因此典型的select使用会配合for完成。

阻塞select

以下典型的情况会直接导致阻塞goroutine:

  • 不存在任何case的
  • case监听都是nil channel

示例:

func SelectBlock() {
    // 空select阻塞
    println("before select")
    select {}
    println("after select")

    // nil select阻塞
    var ch chan int
    go func() {
        ch <- 1024
    }()
    println("before select")
    select {
    case <-ch:
    case ch <- 42:
    }
    println("after select")
}

go test 测试时,会一直阻塞。若上面的代码出现在常规执行流程中,会导致 deadlock。

nil channel的case

nil channel 不能读写,因此通过将channel设置为nil,可以控制某个case不再被执行。

例如,3秒后,不再接受ch的数据:

func SelectNilChannel() {
    ch := make(chan int)
    // 写channel
    go func() {
        // 随机写入int
        rand.Seed(time.Now().Unix())
        for {
            ch <- rand.Intn(10)
            time.Sleep(400 * time.Millisecond)
        }
    }()

    // 读channel
    go func() {
        sum := 0
        t := time.After(3 * time.Second)
        for {
            select {
            case v := <-ch:
                println("received value: ", v)
                sum += v
            case <-t:
                // 将channel设置为nil,不再读写
                ch = nil
                println("ch was set nil, sum is ", sum)
            }
        }

    }()

    // sleep 5 秒
    time.Sleep(5 * time.Second)
}

带有default的select,非阻塞收发

当select语句存在default case时:

  • 若没有可操作的channel,会执行default case
  • 若有可操作的channel,会执行对应的case

这样select语句不会进入block状态,称之为非阻塞(non-block)的收发(channel 的接收和发送)。

示例:多人猜数字游戏,我们在乎是否有人猜中数字:

func SelectNonBlock() {
    // 初始化数据
    counter := 10 // 参与人数
    max := 20     // [0, 19] // 最大范围
    rand.Seed(time.Now().UnixMilli())
    answer := rand.Intn(max) // 随机答案
    println("The answer is ", answer)
    println("------------------------------")

    // 正确答案channel
    bingoCh := make(chan int, counter)
    // wg
    wg := sync.WaitGroup{}
    wg.Add(counter)
    for i := 0; i < counter; i++ {
        // 每个goroutine代表一个猜数字的人
        go func() {
            defer wg.Done()
            result := rand.Intn(max)
            println("someone guess ", result)
            // 答案争取,写入channel
            if result == answer {
                bingoCh <- result
            }
        }()
    }
    wg.Wait()

    println("------------------------------")
    // 是否有人发送了正确结果
    // 可以是0或多个人
    // 核心问题是是否有人猜中,而不是几个人
    select {
    case result := <-bingoCh:
        println("some one hint the answer ", result)
    default:
        println("no one hint the answer")
    }
}

特别的情况是存在两个case,其中一个是default,另一个是channel case,那么go的优化器会优化内部这个select。内部会以if结构完成处理。因为这种情况,不用考虑随机性的问题。类似于:

select {
    case result := <-bingoCh:
    println("some one hint the answer ", result)
    default:
    // 非阻塞的保证,存在default case
    println("no one hint the answer")
}

// 优化伪代码
if selectnbrecv(bingoCh) {
    println("some one hint the answer ", result)
} else {
    println("no one hint the answer")
}

Race模式

Race模式,典型的并发执行模式之一,多路同时操作资源,哪路先操作成功,优先使用,同时放弃其他路的等待。简而言之,从多个操作中选择一个最快的。核心工作:

  • 选择最快的
  • 停止其他未完成的

示例代码,示例从多个查询器同时读取数据,使用最先反返回结果的,其他查询器结束:

func SelectRace() {
    // 一,初始化数据
    // 模拟查询结果,需要与具体的querier建立联系
    type Rows struct {
        // 数据字段

        // 索引标识
        Index int
    }
    // 模拟的querier数量
    const QuerierNum = 8
    // 用于通信的channel,数据,停止信号
    ch := make(chan Rows, 1)
    stopChs := [QuerierNum]chan struct{}{}
    for i := range stopChs {
        stopChs[i] = make(chan struct{})
    }
    // wg,rand
    wg := sync.WaitGroup{}
    rand.Seed(time.Now().UnixMilli())

    // 二,模拟querier查询,每个查询持续不同的时间
    wg.Add(QuerierNum)
    for i := 0; i < QuerierNum; i++ {
        // 每一个 querier
        go func(i int) {
            defer wg.Done()
            // 模拟执行时间
            randD := rand.Intn(1000)
            println("querier ", i, " start fetch data, need duration is ", randD, " ms.")
            // 查询结果的channel
            chRst := make(chan Rows, 1)

            // 执行查询工作
            go func() {
                // 模拟时长
                time.Sleep(time.Duration(randD) * time.Millisecond)
                chRst <- Rows{
                    Index: i,
                }
            }()

            // 监听查询结果和停止信号channel
            select {
            // 查询结果
            case rows := <-chRst:
                println("querier ", i, " get result.")
                // 保证没有其他结果写入,才写入结果
                if len(ch) == 0 {
                    ch <- rows
                }
            // stop信号
            case <-stopChs[i]:
                println("querier ", i, " is stopping.")
                return
            }

        }(i)
    }

    // 三,等待第一个查询结果的反馈
    wg.Add(1)
    go func() {
        defer wg.Done()
        // 等待ch中传递的结果
        select {
        // 等待第一个查询结果
        case rows := <-ch:
            println("get first result from ", rows.Index, ". stop other querier.")
            // 循环结构,全部通知querier结束
            for i := range stopChs {
                // 当前返回结果的goroutine不需要了,因为已经结束
                if i == rows.Index {
                    continue
                }
                stopChs[i] <- struct{}{}
            }

        // 计划一个超时时间
        case <-time.After(5 * time.Second):
            println("all querier timeout.")
            // 循环结构,全部通知querier结束
            for i := range stopChs {
                stopChs[i] <- struct{}{}
            }
        }
    }()

    wg.Wait()
}

其中核心点:

  • 获取了结果,通知结束
  • 通过多个无缓冲channel通知goroutine结束
  • 通过缓冲channel传递结果

执行结果示例:

querier  2  start fetch data, Need duration is  674  ms.
querier  6  start fetch data, Need duration is  695  ms.
querier  1  start fetch data, Need duration is  484  ms.
querier  4  start fetch data, Need duration is  544  ms.
querier  0  start fetch data, Need duration is  101  ms.
querier  7  start fetch data, Need duration is  233  ms.
querier  5  start fetch data, Need duration is  721  ms.
querier  3  start fetch data, Need duration is  727  ms.
querier  0  get result.
get first result from  0 . stop other querier.
querier  7  is stopping.
querier  2  is stopping.
querier  4  is stopping.
querier  6  is stopping.
querier  5  is stopping.
querier  1  is stopping.
querier  3  is stopping.

All 模式

Race模式是多个Goroutine获取相同的结果,优先使用快速响应的。

而All模式是多个Goroutine分别获取结果的各个部分,全部获取完毕后,组合成完整的数据,要保证全部的Goroutine都响应后,继续执行。

无缓冲Channel+关闭作典型同步信号

基于:

  • 无缓冲Channel是同步的
  • closed 的channel是可以接收内容的

以上两点原因,经常使用关闭无缓冲channel的方案来作为信号传递使用。前提是,信号纯粹是信号,没有其他含义,比如关闭时间等。

示例代码:

func SelectChannelCloseSignal() {
	wg := sync.WaitGroup{}
	// 定义无缓冲channel
	// 作为一个终止信号使用(啥功能的信号都可以,信号本身不分功能)
	ch := make(chan struct{})

	// goroutine,用来close, 表示
发出信号
	wg.Add(1)
	go func() {
		defer wg.Done()
		time.Sleep(2 * time.Second)
		fmt.Println("发出信号, close(ch)")
		close(ch)
	}()

	// goroutine,接收ch,表示接收信号
	wg.Add(1)
	go func() {
		defer wg.Done()
		// 先正常处理,等待ch的信号到来
		for {
			select {
			case <-ch:
				fmt.Println("收到信号, <-ch")
				return
			default:

			}
			// 正常的业务逻辑
			fmt.Println("业务逻辑处理中....")
			time.Sleep(300 * time.Millisecond)
		}
	}()

	wg.Wait()
}

// ====
> go test -run TestSelectChannelCloseSignal
业务逻辑处理中....
业务逻辑处理中....
业务逻辑处理中....
业务逻辑处理中....
业务逻辑处理中....
业务逻辑处理中....
业务逻辑处理中....
发出信号, close(ch)
收到信号, <-ch
PASS
ok      goConcurrency   2.168s

ps: close channel 后,<-ch 得到零值

:::info
那for range阻塞读取channel,此时关闭channel怎么办,需要在for range的处理逻辑里添加针对于零值的处理么

不需要,for range 会自动检测关闭并退出。

:::

signal.Notify 信号通知监控

系统信号也是通过channel与应用程序交互,例如典型的 ctrl+c 中断程序, <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">os.Interrupt</font>,若不监控系统信号,ctrl+c后程序会直接终止,而如果监控了信号,那么可以在ctrl+c后,执行一系列的关闭处理,例如:

func SelectSignal() {
    // 一:模拟一段长时间运行的goroutine
    go func() {
        for {
            fmt.Println(time.Now().Format(".15.04.05.000"))
            time.Sleep(300 * time.Millisecond)
        }
    }()

    // 要求主goroutine等待上面的goroutine,方案:
    // 1. wg.Wait()
    // 2. time.Sleep()
    // 3. select{}

    // 持久阻塞
    //select {}

    // 二,监控系统的中断信号,interrupt
    // 1 创建channel,用于传递信号
    chSignal := make(chan os.Signal, 1)
    // 2 设置该channel可以监控哪些信号
    signal.Notify(chSignal, os.Interrupt)
    //signal.Notify(chSignal, os.Interrupt, os.Kill)
    //signal.Notify(chSignal) // 全部类型的信号都可以使用该channel
    // 3 监控channel
    select {
    case <-chSignal:
        fmt.Println("received os signal: Interrupt")
    }
}

定时器与断续器,Timer&Ticker

Timer&Ticker是Go标准包time中定义的类型,通过Channel与程序进行通信。

time包中两个与Channel紧密关联的结构:

// 定时器
time.Timer
// 断续器
time.Ticker
  • 定时器Timer类似于一次性闹钟
  • 断续器Ticker类似于重复性闹钟,循环定时器

无论是一次性还是重复性计时器,都是通过Channel与应用程序交互的。我们通过监控Timer和Ticker返回的Channel,来确定是否到时的需求。

定时器

使用语法:

// time.NewTimer
func NewTimer(d Duration) *Timer

创建定时器。参数是Duration时间。返回为 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">*Timer</font><font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">*Timer.C</font>是用来接收到期通知的单向Channel。

type Timer struct {
    C <-chan Time
}

因此我们只要可从 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">*Timer.C</font>上接收数据,就意味着定时器时间到。接收到的元素是 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">time.Time</font> 类型数据,为到时时间。

示例:

func TimerA() {
    t := time.NewTimer(time.Second)
    println("Set the timer, \ttime is ", time.Now().String())

    now := <-t.C
    println("The time is up, time is ", now.String())
}

Timer除了C之外,还有两个方法:

// 停止计时器
// 返回值bool类型,返回false,表示该定时器早已经停止,返回true表示由本次调用停止
func (t *Timer) Stop() bool

// 重置定时器
// 返回值bool类型,返回false,表示该定时器早已经停止,返回true表示由本次调用重置
func (t *Timer) Reset(d Duration) bool

使用这两个方法,可以完整定时器的业务逻辑。

示例代码,简单的猜数字游戏,共猜5次,每次有超时时间3秒钟:

func TimerB() {
    ch := make(chan int)

    // 写channel
    go func() {
        // 随机写入int
        for {
            ch <- rand.Intn(10)
            time.Sleep(400 * time.Millisecond)
        }
    }()

    // 每局时间
    t := time.NewTimer(time.Second * 3)
    hint, miss := 0, 0
    // 统计结果,共玩5次
    for i := 0; i < 5; i++ {
    guess:
        for {
            select {
            case v := <-ch:
                println("guess value: ", v)
                if v == 4 {
                    println("Bingo! some one hint the answer.")
                    // 新游戏,重置定时器
                    t.Reset(time.Second * 3)
                    hint++
                    break guess
                }
            case <-t.C:
                println("The time is up, no one hint.")
                miss++
                // 重新创建定时器
                t = time.NewTimer(time.Second * 3)
                break guess
            }
        }
    }
    println("Game Over! Hint ", hint, ", Miss ", miss)
}
// 可能出现 Timer 已经超时,但 t.C 的值还没被读取。
// // 3秒后,Timer 触发
// 此时 t.C 中已经有了一个值(等待被读取)

// 如果在这个状态下调用 t.Reset()
// 旧值还在 t.C 里!
// <-t.C 仍然会立即返回(读取到 3秒超时的那个旧值)

代码在猜中或者时间到时,要重置或新建定时器。

如果不需要定时器的关闭和重置操作,可以使用函数:

func After(d Duration) <-chan Time

直接返回定时器到期的通知Channel。

func TimerC() {
    ch := time.After(time.Second)
    println("Set the timer, \ttime is ", time.Now().String())

    now := <-ch
    println("The time is up, time is ", now.String())
}

如果希望在定时器到期时执行特定函数,可以使用如下函数:

func AfterFunc(d Duration, f func()) *Timer

该函数返回*Timer用于控制定时器,例如Stop或Reset.

断续器

也叫循环定时器。

使用语法:

func NewTicker(d Duration) *Ticker

创建断续器。参数是Duration时间。返回为 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">*Ticker</font><font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">*Ticker.C</font>是用来接收到期通知的单向Channel。

type Ticker struct {
    C <-chan Time // The channel on which the ticks are delivered.
}

因此我们只要可从 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">*Ticker.C</font>上接收数据,就意味着断续器时间到。接收到的元素是 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">time.Time</font> 类型数据,为到时时间。当接收到到期时间后,间隔下一个Duration还会再次接收到到期时间。

<font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">*Ticker</font>也有方法:

// 停止断续器
func (t *Ticker) Stop()
// 重置断续器间隔时间
func (t *Ticker) Reset(d Duration)

示例:

func TickerA() {
    // 断续器
    ticker := time.NewTicker(time.Second)

    // 定时器
    timer := time.After(5 * time.Second)
loop: // 持续心跳
    for now := range ticker.C {
        println("now is ", now.String())
        // heart beat
        println("http.Get(\"/ping\")")

        // 非阻塞读timer,到时结束断续器
        select {
        case <-timer:
            ticker.Stop()
            break loop
        default:
        }
    }
}

代码模拟了一个心跳程序,间隔1秒,发送ping操作。整体到时,运行结束。

计时器底层

不是创建单独的 goroutine 来计时!Go 使用全局计时器管理器,所有 Timer 共享资源。

创建 Timer 时:

用户代码
    ↓
t = &timer{
    C: make(chan Time, 1),   // 这里的 C 是普通的 channel
    when: 某时间戳,           // 触发时间点
    period: 0,               // 周期(0表示不重复)
}
    ↓
runtime 将 Timer 注册到全局计时器堆
┌─────────────────────────────────────────────────────┐
│              全局计时器堆 (小根堆)                    │
│                                                      │
│   Timer结构: { when: 时间戳, C: channel }            │
│                                                      │
│     when=100  ─────────┐                             │
│     when=150  ──────────────┐                       │
│     when=200  ───────────────────────┐              │
│     when=250  ───────────────────────────────┐      │
│     when=300  ─────────────────────────────────────┤
└─────────────────────────────────────────────────────┘
                      ↓
              runtime 监控最小 when
绝对时间戳

不需要遍历,不需要做减法

时间线:
─────────────────────────────────────────────────────────────►

now=50:
  堆顶 when=100
  sleep(100-50) = 50毫秒
         │
         │  50ms 后 now=100
         ▼
now=100:
  堆顶 when=100 <= now ✓ 到期!
  pop 处理
  新的堆顶 when=150
  sleep(150-100) = 50ms
         │
         │  50ms 后 now=150
         ▼
now=150:
  堆顶 when=150 <= now ✓ 到期!
  pop 处理
  ...继续循环

小结

Channel的分类

  • nil channel
  • 缓冲Channel
  • 无缓冲Channel
  • 单向Channel

Channel的操作

  • 初始化,make(channel type[, cap])
  • 发送,ch <- expression
  • 接收, v, ok := <- ch
  • 遍历接收,for e := range ch {}
  • 关闭, close(ch)

select语句

  • channel的多路复用
  • 执行第一个可以操作channel的case
  • 若同时多个channel可操作随机选择case避免饥饿case的出现
  • 增加default case可以达到非阻塞channel操作的目的
  • 经常配合for select使用循环多路监听
  • 典型的多路模式有:Race和All

timer和ticker

  • 定时器,到时执行一次,可以在到时前,重置或提前结束
  • 断续器,配置间隔重复执行,重复定时器,可以重置间隔时间和提前结束

Context上下文

Context概述

Go 1.7 标准库引入 context,译作“上下文”,准确说它是 goroutine 的上下文,包含 goroutine 的运行状态、环境、现场等信息。

context 主要用来在 goroutine 之间传递上下文信息,包括:取消信号、超时时间、截止时间、k-v 等。

随着 context 包的引入,标准库中很多接口因此加上了 context 参数,例如 database/sql 包。context 几乎成为了并发控制和超时控制的标准做法。

在一组goroutine 之间传递共享的值、取消信号、deadline是Context的作用

以典型的HTTPServer为例:

我们以 Context II为例,若没有上下文信号,当其中一个goroutine出现问题时,其他的goroutine不知道,还会继续工作。这样的无效的goroutine积攒起来,就会导致goroutine雪崩,进而导致服务宕机!

Context 核心结构

<font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.Context</font> 是 Go 语言在 1.7 版本中引入标准库的接口,该接口定义了四个需要实现的方法:

type Context interface {
    // 返回被取消的时间
	Deadline() (deadline time.Time, ok bool)
    // 返回用于通知Context完结的channel
    // 当这个 channel 被关闭时,说明 context 被取消了
    // 在子协程里读这个 channel,除非被关闭,否则读不出来任何东西
	Done() <-chan struct{}
    // 返回Context取消的错误
    Err() error
    // 返回key对应的value
	Value(key any) any
}

除了Context接口,还存在一个canceler接口,用于实现Context可以被取消:

type canceler interface {
	cancel(removeFromParent bool, err error)
	Done() <-chan struct{}
}

除了以上两个接口,还有4个预定义的Context类型:

// 空Context
type emptyCtx int

// 取消Context
type cancelCtx struct {
	Context
	mu       sync.Mutex            // protects following fields
	done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
	children map[canceler]struct{} // set to nil by the first cancel call
	err      error                 // set to non-nil by the first cancel call
}

// 定时取消Context
type timerCtx struct {
	cancelCtx
	timer *time.Timer // Under cancelCtx.mu.

	deadline time.Time
}

// KV值Context
type valueCtx struct {
	Context
	key, val any
}

默认(空)Context的使用

context 包中最常用的方法是 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.Background</font><font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.TODO</font>,这两个方法都会返回预先初始化好的私有变量 background 和 todo,它们会在同一个 Go 程序中被复用:

  • context.Background, 是上下文的默认值,所有其他的上下文都应该从它衍生出来,在多数情况下,如果当前函数没有上下文作为入参,我们都会使用 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.Background</font> 作为起始的上下文向下传递。
  • context.TODO,是一个备用,一个context占位,通常用在并不知道传递什么 context的情形。

使用示例,<font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">database/sql</font>包中的执行:

func (db *DB) PingContext(ctx context.Context) error
func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)
func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)
func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row

方法,其中第一个参数就是context.Context。

例如:操作时:

db, _ := sql.Open("", "")
query := "DELETE FROM `table_name` WHERE `id` = ?"
db.ExecContext(context.Background(), query, 42)

当然,单独 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">database.sql</font>包中,也支持不传递context.Context的方法。功能一致,但缺失了context.Context相关功能。

func (db *DB) Exec(query string, args ...any) (Result, error)

context.Background 和 context.TODO 返回的都是预定义好的 emptyCtx 类型数据,其结构如下:

// 创建方法
func Background() Context {
    return background
}
func TODO() Context {
    return todo
}

// 预定义变量
var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)

// emptyCtx 定义
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key any) any {
    return nil
}

func (e *emptyCtx) String() string {
    switch e {
    case background:
        return "context.Background"
    case todo:
        return "context.TODO"
    }
    return "unknown empty Context"
}

可见,emptyCtx 是不具备取消、KV值和Deadline的相关功能的,称为空Context,没有任何功能。

Context传递取消信号

context.WithCancel 函数能够从 context.Context 中衍生出一个新的子上下文并返回用于取消该上下文的函数。一旦我们执行返回的取消函数,当前上下文以及它的子上下文都会被取消,所有的 Goroutine 都会同步收到这一取消信号。取消操作通常分为主动取消,定时取消两类。

主动取消

需要的操作为:

  • 创建带有cancel函数的Context,func WithCancel(parent Context) (ctx Context, cancel CancelFunc)
  • 接收cancel的Channel,ctx.Done()
  • 主动Cancel的函数,cancel CancelFunc

示例代码:

func ContextCancelCall() {
    // 1. 创建cancelContext
    ctx, cancel := context.WithCancel(context.Background()) // 会帮助创建Context

    wg := sync.WaitGroup{}
    wg.Add(4)
    // 2. 启动goroutine,携带cancelCtx
    for i := 0; i < 4; i++ {
        // 启动goroutine,携带ctx参数
        go func(c context.Context, n int) {
            defer wg.Done()
            // 监听context的取消完成channel,来确定是否执行了主动cancel操作
            for {
                select {
                // 等待接收c.Done()这个channel
                case <-c.Done():
                    fmt.Println("Cancel")
                    return
                default:

                }
                fmt.Println(strings.Repeat("  ", n), n)
                time.Sleep(300 * time.Millisecond)
            }
        }(ctx, i)
    }

    // 3. 主动取消 cancel()
    // 3s后取消
    select {
    case <-time.NewTimer(2 * time.Second).C:
        cancel() // ctx.Done() <- struct{}
    }

    select {
    case <-ctx.Done():
        fmt.Println("main Cancel")
    }

    wg.Wait()

}

// ======
> go test -run TestContextCancelCall
       3
   1  
 0  
     2
   1
       3
     2  
 0  
 0
   1  
       3
     2  
     2
   1
       3
 0
 0
   1
       3
     2
     2
   1
 0
       3
       3
 0
   1
     2
main Cancel
Cancel
Cancel
Cancel
Cancel
PASS
ok      goConcurrency   2.219s

当调用cancel()时,全部的goroutine会从 ctx.Done() 接收到内容,进而完成后续控制操作。

<font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">func WithCancel(parent Context) (ctx Context, cancel CancelFunc)</font> 函数返回的Context是 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.cancelCtx</font> 结构体对象,以及一个CancelFunc。

其中 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.cancelCtx</font> 结构如下:

// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
    Context

    mu       sync.Mutex            // protects following fields
    done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}

其中:

  • Context,上级Context对象
  • mu, 互斥锁
  • done,用于处理cancel通知信号的channel。懒惰模式创建,调用cancel时关闭。
  • children,以该context为parent的可cancel的context们
  • err,error

Deadline和Timeout定时取消

与主动调用 CancelFunc 的差异在于,定时取消,增加了一个到时自动取消的机制:

  • Deadline,某个时间点后,使用 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">func WithDeadline(parent Context, d time.Time) (Context, CancelFunc)</font>创建
  • Timeout,某个时间段后,使用 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc)</font> 创建

示例代码如下,与主动cancel的代码类似:

// 1s后cancel
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)

// 每天 20:30 cancel
curr := time.Now()
t := time.Date(curr.Year(), curr.Month(), curr.Day(), 20, 30, 0, 0, time.Local)
ctx, cancel := context.WithDeadline(context.Background(), t)

其他代码一致,当时间到时,ctx.Done() 可以接收内容,进而控制goroutine停止。

不论WithDeadline和WithTimeout都会构建 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">*timerCtx</font> 类型的Context,结构如下:

// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
   cancelCtx
   timer *time.Timer // Under cancelCtx.mu.

   deadline time.Time
}

其中:

  • cancelCtx,基于parent构建的cancelCtx
  • deadline,cancel时间
  • timer,定时器,用于自动cancel

Cancel操作的向下传递

当父上下文被取消时,子上下文也会被取消。Context 结构如下:

ctxOne
  |    \
ctxTwo    ctxThree
  |
ctxFour

示例代码:

func ContextCancelDeep() {
    ctxOne, cancel := context.WithCancel(context.Background())
    ctxTwo, _ := context.WithCancel(ctxOne)
    ctxThree, _ := context.WithCancel(ctxOne)
    ctxFour, _ := context.WithCancel(ctxTwo)

    // 带有timeout的cancel
    //ctxOne, _ := context.WithTimeout(context.Background(), 1*time.Second)
    //ctxTwo, cancel := context.WithTimeout(ctxOne, 1*time.Second)
    //ctxThree, _ := context.WithTimeout(ctxOne, 1*time.Second)
    //ctxFour, _ := context.WithTimeout(ctxTwo, 1*time.Second)

    cancel()
    wg := sync.WaitGroup{}
    wg.Add(4)
    go func() {
        defer wg.Done()
        select {
        case <-ctxOne.Done():
            fmt.Println("one cancel")
        }
    }()
    go func() {
        defer wg.Done()
        select {
        case <-ctxTwo.Done():
            fmt.Println("two cancel")
        }
    }()
    go func() {
        defer wg.Done()
        select {
        case <-ctxThree.Done():
            fmt.Println("three cancel")
        }
    }()
    go func() {
        defer wg.Done()
        select {
        case <-ctxFour.Done():
            fmt.Println("four cancel")
        }
    }()
    wg.Wait()
}

我们调用 ctxOne 的 cancel, 其后续的context都会接收到取消的信号。

如果调用了其他的cancel,例如ctxTwo,那么ctxOne和ctxThree是不会接收到信号的。

取消操作流程

创建cancelCtx的流程

使用 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.WithCancel</font>, <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.WithDeadlime</font>, <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.WithTimeout</font> 创建cancelCtx或timerCtx的核心过程基本一致,以 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.WithCancel</font> 为例:

func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    // 构建cancelCtx对象
    c := newCancelCtx(parent)
    // 传播Cancel操作
    propagateCancel(parent, &c)
    // 返回值,注意第二个cancel函数的实现
    return &c, func() { c.cancel(true, Canceled) }
}

func newCancelCtx(parent Context) cancelCtx {
    return cancelCtx{Context: parent}
}

由此可见,核心过程有两个:

  • newCancelCtx, 使用 parent 构建 cancelCtx
  • propagateCancel, 传播Cancel操作,用来构建父子Context的关联,用于保证在父级Context取消时可以同步取消子级Context

核心的propagateCancel 的实现如下:

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
    // parent不会触发cancel操作
    done := parent.Done()
    if done == nil {
        return // parent is never canceled
    }

    // parent已经触发了cancel操作
    select {
    case <-done:
        // parent is already canceled
        child.cancel(false, parent.Err())
        return
    default:
    }

    // parent还没有触发cancel操作
    if p, ok := parentCancelCtx(parent); ok {
        // 内置cancelCtx类型
        p.mu.Lock()
        if p.err != nil {
            // parent has already been canceled
            child.cancel(false, p.err)
        } else {
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            // 将当前context放入parent.children中
            p.children[child] = struct{}{}
        }
        p.mu.Unlock()
    } else {
        // 非内置cancelCtx类型
        atomic.AddInt32(&goroutines, +1)
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}

以上代码在建立child和parent的cancelCtx联系时,处理了下面情况:

  • parent不会触发cancel操作,不做任何操作,直接返回
  • parent已经触发了cancel操作,执行child的cancel操作,返回
  • parent还没有触发cancel操作,<font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">child</font> 会被加入 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">parent</font><font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">children</font> 列表中,等待 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">parent</font> 释放取消信号
  • 如果是自定义Context实现了可用的Done(),那么开启goroutine来监听parent.Done()和child.Done(),同样在parent.Done()时取消child。

如果是WithDeadline构建的timerCtx,构建的过程多了两步:

  • 对截至时间的判定,判定是否已经截至
  • 设置定时器

示例代码:

func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        // The current deadline is already sooner than the new one.
        return WithCancel(parent)
    }
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
    propagateCancel(parent, c)

    dur := time.Until(d)
    // 已过时
    if dur <= 0 {
        c.cancel(true, DeadlineExceeded) // deadline has already passed
        return c, func() { c.cancel(false, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
    // 设置定时器
    if c.err == nil {
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}
ctx.Done() 初始信号channel流程

以 cancelCtx 为例:

func (c *cancelCtx) Done() <-chan struct{} {
    // 加载已经存在的
    d := c.done.Load()
    if d != nil {
        return d.(chan struct{})
    }
    c.mu.Lock()
    defer c.mu.Unlock()

    // 初始化新的
    d = c.done.Load()
    if d == nil {
        d = make(chan struct{})
        c.done.Store(d)
    }
    return d.(chan struct{})
}

其中两个步骤:

  1. 先尝试加载已经存在的
  2. 后初始化新的

核心要点是,当调用Done()时,初始化chan struct{}, 而不是在上限文cancelCtx创建时,就初始化完成了。称为懒惰初始化。

cancel()操作流程

取消流程,我们以 cancelCtx 的主动取消函数cancel的实现为例:

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
        panic("context: internal error: missing cancel error")
    }
    c.mu.Lock()
    if c.err != nil {
        c.mu.Unlock()
        return // already canceled
    }
    // 设置 err
    c.err = err
    // 关闭channel
    d, _ := c.done.Load().(chan struct{})
    if d == nil {
        c.done.Store(closedchan)
    } else {
        close(d)
    }
    // 遍历全部可取消的子context
    for child := range c.children {
        // NOTE: acquiring the child's lock while holding parent's lock.
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()

    // 从parent的children删除自己
    if removeFromParent {
        removeChild(c.Context, c)
    }
}

以上流程的核心操作:

  • 关闭channel,用来通知全部使用该ctx的goroutine
  • 遍历全部可取消的子context,执行child的取消操作
  • 从parent的children删除自己

Context传值

若希望在使用context时,携带额外的Key-Value数据,可以使用 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.WithValue</font> 方法,构建带有值的context。并使用 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">Value(key any) any</font> 方法获取值。带有值

对应方法的签名如下:

func WithValue(parent Context, key, val any) Context

type Context interface {
    Value(key any) any
}

需要三个参数:

  • 上级 Context
  • key 要求是comparable的(可比较的),实操时,推荐使用特定的Key类型,避免直接使用string或其他内置类型而带来package之间的冲突。
  • val any

示例代码

type MyContextKey string

func ContextValue() {
    wg := sync.WaitGroup{}

    ctx := context.WithValue(context.Background(), MyContextKey("title"), "Go")

    wg.Add(1)
    go func(c context.Context) {
        defer wg.Done()
        if v := c.Value(MyContextKey("title")); v != nil {
            fmt.Println("found value:", v)
            return
        }
        fmt.Println("key not found:", MyContextKey("title"))
    }(ctx)

    wg.Wait()
}

<font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.WithValue</font> 方法返回 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.valueCtx</font> 结构体类型。<font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">context.valueCtx</font> 结构体包含了上级Context和key、value:

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
    Context
    key, val any
}


func (c *valueCtx) Value(key any) any {
    if c.key == key {
        return c.val
    }
    return value(c.Context, key)
}

也就是除了 value 功能,其他Contenxt功能都由parent Context实现。

如果 context.valueCtx.Value 方法查询的 key 不存在于当前 valueCtx 中,就会从父上下文中查找该键对应的值直到某个父上下文中返回 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">nil</font> 或者查找到对应的值。例如:

func ContextValueDeep() {
    wgOne := sync.WaitGroup{}

    ctxOne := context.WithValue(context.Background(), MyContextKey("title"), "One")
    //ctxOne := context.WithValue(context.Background(), MyContextKey("key"), "Value")
    //ctxTwo := context.WithValue(ctxOne, MyContextKey("title"), "Two")
    ctxTwo := context.WithValue(ctxOne, MyContextKey("key"), "Value")
    //ctxThree := context.WithValue(ctxTwo, MyContextKey("title"), "Three")
    ctxThree := context.WithValue(ctxTwo, MyContextKey("key"), "Value")

    wgOne.Add(1)
    go func(c context.Context) {
        defer wgOne.Done()
        if v := c.Value(MyContextKey("title")); v != nil {
            fmt.Println("found value:", v)
            return
        }
        fmt.Println("key not found:", MyContextKey("title"))
    }(ctxThree)

    wgOne.Wait()
}

小结

特定的结构体类型:

  • emptyCtx,函数 context.Background, context.TODO
  • cancelCtx,函数 context.WithCancel
  • timerCtx, 函数 context.WithDeadline, context.WithTimeout
  • valueCtx, 函数 context.WithValue

官方博客对Context使用的建议:

  • 直接将 Context 类型作为函数的第一参数,而且一般都命名为 ctx。
  • 如果你实在不知道传什么,标准库给你准备好了一个 context.TODO。
  • context 存储的应该是一些goroutine共同的数据。
  • context 是并发安全的。

同步和锁

概述

除了channel外,我们还可以使用go的官方同步包sync,sync/atomic 完成一些基础的同步功能。主要包含同步数据、锁、原子操作等。

Lock解决方案:

func SyncLock() {
    n := 0
    wg := sync.WaitGroup{}

    lk := sync.Mutex{}
    for i := 0; i < 1000; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for i := 0; i < 100; i++ {
                lk.Lock()
                n++
                lk.Unlock()
            }
        }()
    }
    wg.Wait()
    fmt.Println("n:", n)
}

// run
n: 100000

互斥锁Mutex的使用

type Mutex
// 锁定锁m, 若锁m已是锁定状态,调用的goroutine会被阻塞,直到可以锁定
func (m *Mutex) Lock()
// 解锁锁m,若m不是锁定状态,会导致运行时错误
func (m *Mutex) Unlock()
// 尝试是否可以加锁,返回是否成功
func (m *Mutex) TryLock() bool

注意:锁与goroutine没有关联,意味着允许一个goroutine加锁,在另一个goroutine中解锁。但是不是最典型的用法。

读写RWMutex的使用

Go 的读写锁是写优先策略

手撕读写锁

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <queue>
#include <chrono>

// 读写锁基类
class RWLock {
public:
    virtual void RLock() = 0;
    virtual void RUnlock() = 0;
    virtual void WLock() = 0;
    virtual void WUnlock() = 0;
    virtual ~RWLock() = default;
};

// ==================== 读优先策略 ====================
// 读者优先:读者不等待,直接进入;写者可能饥饿
class RWLockReadFirst : public RWLock {
private:
    std::mutex mtx_;
    std::condition_variable read_cond_;
    std::condition_variable write_cond_;
    int read_count_ = 0;      // 当前读者数量
    int write_pending_ = 0;   // 等待中的写者数量
    bool writing_ = false;    // 是否有写者正在写

public:
    void RLock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        // 有写者等待或正在写,读者需要等待吗?
        // 读优先策略:读者不等待,直接进入
        // 如果想严格读优先,去掉下面的条件
        while (writing_ || write_pending_ > 0) {
            read_cond_.wait(lock);
        }
        read_count_++;
    }

    void RUnlock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        read_count_--;
        if (read_count_ == 0) {
            // 所有读者离开,唤醒等待的写者
            write_cond_.notify_one();
        }
    }

    void WLock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        write_pending_++;
        // 等待所有读者离开
        while (read_count_ > 0 || writing_) {
            write_cond_.wait(lock);
        }
        write_pending_--;
        writing_ = true;
    }

    void WUnlock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        writing_ = false;
        // 优先唤醒写者(如果写者优先策略)
        // 读优先:这里可以改成 notify_all 让读者和写者竞争
        if (write_pending_ > 0) {
            write_cond_.notify_one();
        } else {
            read_cond_.notify_all();
        }
    }
};

// ==================== 写优先策略 ====================
// 写者优先:新读者需要等待已有写者完成后,但新写者可以插队到新读者前面
class RWLockWriteFirst : public RWLock {
private:
    std::mutex mtx_;
    std::condition_variable read_cond_;
    std::condition_variable write_cond_;
    int read_count_ = 0;       // 当前读者数量
    int write_pending_ = 0;    // 等待中的写者数量
    bool writing_ = false;     // 是否有写者正在写
    int active_readers_ = 0;   // 正在读的读者(在写者等待期间进入的)

public:
    void RLock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        
        // 有写者等待时,新读者需要等待
        // 这是写优先的关键!
        while (writing_ || write_pending_ > 0) {
            read_cond_.wait(lock);
        }
        active_readers_++;
        read_count_++;
    }

    void RUnlock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        read_count_--;
        active_readers_--;
        
        if (active_readers_ == 0 && write_pending_ > 0) {
            // 所有读者完成,唤醒写者
            write_cond_.notify_one();
        }
    }

    void WLock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        write_pending_++;
        
        // 等待所有读者离开
        while (read_count_ > 0 || writing_) {
            write_cond_.wait(lock);
        }
        write_pending_--;
        writing_ = true;
    }

    void WUnlock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        writing_ = false;
        
        // 写优先:优先唤醒写者
        if (write_pending_ > 0) {
            write_cond_.notify_one();
        } else if (read_count_ > active_readers_) {
            // 还有读者在等待,让它们继续
            read_cond_.notify_all();
        }
    }
};

// ==================== 公平策略(先来先服务) ====================
// 读者和写者按照请求顺序排队
class RWLockFair : public RWLock {
private:
    std::mutex mtx_;
    std::condition_variable read_cond_;
    std::condition_variable write_cond_;
    int read_count_ = 0;
    int write_count_ = 0;       // 等待的写者数量
    int active_writers_ = 0;    // 正在写的写者数量
    bool writer_waiting_ = false;  // 是否有写者在等待

public:
    void RLock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        
        // 如果有写者在等待,读者需要等待
        // 这样保证先来的写者优先
        while (active_writers_ > 0 || writer_waiting_) {
            read_cond_.wait(lock);
        }
        read_count_++;
    }

    void RUnlock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        read_count_--;
        
        if (read_count_ == 0 && write_count_ > 0) {
            // 所有读者完成,唤醒一个写者
            write_cond_.notify_one();
        }
    }

    void WLock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        write_count_++;
        writer_waiting_ = true;
        
        while (active_writers_ > 0 || read_count_ > 0) {
            write_cond_.wait(lock);
        }
        writer_waiting_ = false;
        write_count_--;
        active_writers_++;
    }

    void WUnlock() override {
        std::unique_lock<std::mutex> lock(mtx_);
        active_writers_--;
        
        // 按照顺序唤醒:先写者(如果还有),再读者
        if (write_count_ > 0) {
            write_cond_.notify_one();
        } else {
            read_cond_.notify_all();
        }
    }
};

// ==================== 测试代码 ====================
int shared_data = 0;

template<typename RWLockType>
void testRWLock(const std::string& name) {
    RWLockType rwlock;
    shared_data = 0;
    
    std::cout << "\n=== 测试 " << name << " ===\n";
    
    // 读者线程
    auto reader = [&]() {
        for (int i = 0; i < 5; i++) {
            rwlock.RLock();
            int data = shared_data;
            std::cout << "读线程 " << std::this_thread::get_id() 
                      << " 读取数据: " << data << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
            rwlock.RUnlock();
        }
    };
    
    // 写者线程
    auto writer = [&]() {
        for (int i = 0; i < 5; i++) {
            rwlock.WLock();
            shared_data++;
            std::cout << "写线程 " << std::this_thread::get_id() 
                      << " 写入数据: " << shared_data << "\n";
            std::this_thread::sleep_for(std::chrono::milliseconds(150));
            rwlock.WUnlock();
        }
    };
    
    std::thread writers[] = {
        std::thread(writer), std::thread(writer)
    };
    std::thread readers[] = {
        std::thread(reader), std::thread(reader), std::thread(reader)
    };
    
    for (auto& w : writers) w.join();
    for (auto& r : readers) r.join();
}

int main() {
    testRWLock<RWLockReadFirst>("读优先策略");
    testRWLock<RWLockWriteFirst>("写优先策略");
    testRWLock<RWLockFair>("公平策略");
    
    return 0;
}

接口说明

type RWMutex
// 写锁定
func (rw *RWMutex) Lock()
// 写解锁
func (rw *RWMutex) Unlock()

// 读锁定
func (rw *RWMutex) RLock()
// 读解锁
func (rw *RWMutex) RUnlock()

// 尝试加写锁定
func (rw *RWMutex) TryLock() bool
// 尝试加读锁定
func (rw *RWMutex) TryRLock() bool

使用样例

func SyncRLock() {
    wg := sync.WaitGroup{}
    // 模拟多个goroutine
    var rwlck sync.RWMutex
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            //
            //rwlck.Lock()
            rwlck.RLock()
            // 输出一段内容
            fmt.Println(time.Now())
            time.Sleep(1 * time.Second)
            //
            //rwlck.Unlock()
            rwlck.RUnlock()
        }()
    }

    wg.Add(1)
    go func() {
        defer wg.Done()
        //
        rwlck.Lock()
        //rwlck.RLock()
        // 输出一段内容
        fmt.Println(time.Now(), "Lock")
        time.Sleep(1 * time.Second)
        //
        rwlck.Unlock()
        //rwlck.RUnlock()
    }()

    wg.Wait()
}

RLock 底层

结构
type RWMutex struct {
    w           Mutex      // 底层互斥锁,写操作使用
    writerSem   uint32     // 写者信号量
    readerSem   uint32     // 读者信号量
    readerCount int32      // 当前读者数量 ⚠️ 关键!多个 goroutine 同时修改这里 → 原子竞争!  
    readerWait  int32      // 写者需要等待的读者数量
}
实现原理
func (rw *RWMutex) RLock() {
    // 步骤1: 原子增加 readerCount
    if atomic.AddInt32(&rw.readerCount, 1) < 0 {
        // 步骤2: 如果有写者持有锁,加入等待队列
        runtime_SemacquireRWMutexR(&rw.readerSem)
    }
}

Lock 源码

func (rw *RWMutex) Lock() {
    // 步骤1: 与其他写者竞争
    rw.w.Lock()  // ← 关键!保证只有一个写者能进入
                 //         多个 goroutine 同时 Lock(),只有一个能获得 w
    
    // 步骤2: 标记"有写者在等待"
    // 原来的 readerCount 加上 (-rwmutexMaxReaders)
    // 例如:readerCount = 5 → 变成 5 - 2^30 = 负数
    r := rw.readerCount.Add(-rwmutexMaxReaders) + rwmutexMaxReaders
    //             ↓
    //    readerCount 变成负数
    //    向所有后续 RLock() 发出信号:"有写者在等待!"
    
    // 步骤3: 等待现有读者完成
    if r != 0 && rw.readerWait.Add(r) != 0 {
        // readerWait = 需要等待的读者数量
        // 阻塞直到所有读者完成 RUnlock()
        runtime_SemacquireRWMutex(&rw.writerSem, false, 0)
    }
}

RLock 源码

func (rw *RWMutex) RLock() {
    // 步骤1: 原子增加 readerCount
    if rw.readerCount.Add(1) < 0 {
        // 步骤2: readerCount 是负数 → 有写者在等待
        //       当前 goroutine 加入读者等待队列
        runtime_SemacquireRWMutexR(&rw.readerSem, false, 0)
    }
}

UnLock 源码

func (rw *RWMutex) Unlock() {
    if race.Enabled {
        race.Read(unsafe.Pointer(&rw.w))
        race.Release(unsafe.Pointer(&rw.readerSem))
        race.Disable()
    }

    // 步骤1: 恢复 readerCount
    // 把负数变回正数
    r := rw.readerCount.Add(rwmutexMaxReaders)
    if r >= rwmutexMaxReaders {
        race.Enable()
        fatal("sync: Unlock of unlocked RWMutex")
    }
    
    // 步骤2: 唤醒所有等待的读者
    for i := 0; i < int(r); i++ {
        runtime_Semrelease(&rw.readerSem, false, 0)
    }
    
    // 步骤3: 释放 w 锁,允许其他写者
    rw.w.Unlock()
}

RUnLock 源码

func (rw *RWMutex) RUnlock() {
    if race.Enabled {
        race.Read(unsafe.Pointer(&rw.w))
        race.ReleaseMerge(unsafe.Pointer(&rw.writerSem))
        race.Disable()
    }
    
    // 步骤1: 原子减少 readerCount
    if r := rw.readerCount.Add(-1); r < 0 {
        // 步骤2: 慢路径 - 有写者在等待
        rw.rUnlockSlow(r)
    }
}

func (rw *RWMutex) rUnlockSlow(r int32) {
    if r+1 == 0 || r+1 == -rwmutexMaxReaders {
        race.Enable()
        fatal("sync: RUnlock of unlocked RWMutex")
    }
    
    // 步骤3: readerWait 减 1
    if rw.readerWait.Add(-1) == 0 {
        // 步骤4: 最后一个读者,唤醒写者!
        runtime_Semrelease(&rw.writerSem, false, 1)
    }
}

同步Map sync.Map

Go中Map是非线程(goroutine)安全的。并发操作 Map 类型时,会导致 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">fatal error: concurrent map read and map write</font>错误:

func SyncMapErr() {
    m := map[string]int{}
    // 并发map写
    go func() {
        for {
            m["key"] = 0
        }
    }()
    // 并发map读
    go func() {
        for {
            _ = m["key"]
        }
    }()
    // 阻塞
    select {}
}

之所以Go不支持Map的并发安全,是因为Go认为Map的典型使用场景不需要在多个Goroutine间并发安全操作Map。

并发安全操作Map的方案:

  • 锁 + Map,自定义Map操作,增加锁的控制,可以选择 Mutex和RWMutex。
  • sync.Map,sync包提供的安全Map.

锁+Map示例,在结构体内嵌入sync.Mutex:

func SyncMapLock() {
    myMap := struct {
        sync.RWMutex
        Data map[string]int
    }{
        Data: map[string]int{},
    }

    // write
    myMap.Lock()
    myMap.Data["key"] = 0
    myMap.Unlock()

    // read
    myMap.RLock()
    _ = myMap.Data["key"]
    myMap.RUnlock()
}

sync.Map 的使用

type Map
// 最常用的4个方法:
// 存储
func (m *Map) Store(key, value any)
// 遍历 map
func (m *Map) Range(f func(key, value any) bool)
// 删除某个key元素
func (m *Map) Delete(key any)
// 返回key的值。存在key,返回value,true,不存在返回 nil, false
func (m *Map) Load(key any) (value any, ok bool)

// 若m[key]==old,执行删除。key不存在,返回false
func (m *Map) CompareAndDelete(key, old any) (deleted bool)
// 若m[key]==old,执行交换, m[key] = new
func (m *Map) CompareAndSwap(key, old, new any) bool

// 返回值后删除元素。loaded 表示是否load成功,key不存在,loaded为false
func (m *Map) LoadAndDelete(key any) (value any, loaded bool)
// 加载,若加载失败则存储。返回加载或存储的值和是否加载
func (m *Map) LoadOrStore(key, value any) (actual any, loaded bool)

// 存储新值,返回之前的值。loaded表示key是否存在
func (m *Map) Swap(key, value any) (previous any, loaded bool)

sync.Map 不需要类型初始化,即可使用,可以理解为map[comparable]any。

使用示例,不会触发 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">fatal error: concurrent map read and map write</font>

func SyncSyncMap() {
    var m sync.Map
    go func() {
        for {
            m.Store("key", 0)
        }
    }()
    go func() {
        for {
            _, _ = m.Load("key")
        }
    }()
    select {}
}

sync.Map 底层实现

sync.Map 使用了分段锁 + 读写分离的设计:

type Map struct {
    mu     Mutex      // 保护 dirty 字段
    read   atomic.Pointer<readMap>  // 读map(无锁)
    dirty  map[interface{}]*entry   // 写map(需要加锁)
    misses int                       // 读miss计数
}

readMap(只读层):

  • 包含大部分数据
  • 使用 atomic 操作,无需加锁
  • 存储 *entry,支持标记删除

dirtyMap(写层):

  • 只包含新写入的数据
  • 需要 mu 锁保护
  • 数据不完整

为什么锁+Map写快读慢?

对于读多写少的场景,sync.Map 反而更快,原因如下:

读操作对比

特性 RWMutex + Map sync.Map
锁类型 共享读锁 无锁(原子操作)
并发度 多个读共享锁 完全并发
竞争 读操作竞争同一把 CAS 锁(修改读用户数量是原子修改,需要竞争) 无竞争(读取 read 指针原子操作)
缓存行 多线程争用 每个线程独立访问

sync.Map 读快的原因:

  1. 读操作完全无锁 - 使用 atomic.LoadPointer 读取 readMap

:::info
2. 读缓存友好 - 没有锁竞争,CPU缓存更稳定
3. 无伪共享 - 没有锁的缓存行争用问题

:::

写操作对比

特性 RWMutex + Map sync.Map
锁类型 独占写锁 Mutex + 双map维护
开销 单次加锁 read+dirty同步、misses计数
删除 直接删除 标记删除 + 提升dirty

sync.Map 写慢的原因:

  1. 需要维护两个map
  2. 删除不能直接删除,只能标记
  3. dirty提升到read时需要锁住整个map(这个 map 指的是什么)

sync.Map 的工作流程 TODO

读操作(Read)
┌─────────────────────────────────────────┐
│         尝试从 readMap 读取              │
│         (无锁,原子操作)                 │
└────────────────┬────────────────────────┘
                 │
        ┌───────┴───────┐
        ↓               ↓
     找到 ✓           未找到 ✗
        │               │
        ↓               ↓
    返回value     ┌────┴────┐
                  ↓         ↓
            检查是否标记     直接返回nil
            删除(soft delete)   │
                  │             │
                  ↓             ↓
            从dirty读取    记录miss
                  │         (mu锁)
                  └─────┬───────┘
                        ↓
              dirty非空时提升readMap
写操作(Store)
┌─────────────────────────────────────────┐
│         加锁 (mu.Lock)                  │
└────────────────┬────────────────────────┘
                 │
        ┌────────┴────────┐
        ↓                 ↓
   read中不存在?      read中存在?
        │                 │
        ↓                 ↓
   写入dirty       更新/标记删除
   (创建如果为空)   (read是copy-on-write)
        │                 │
        └────────┬────────┘
                 │
        ┌────────┴────────┐
        ↓                 ↓
   misses >= len(dirty) → 提升dirty到read
        │                 │
        ↓                 ↓
   清空dirty         释放锁
   重置misses

何时使用 sync.Map

✅ 适合:

  • 读多写少(读>90%)
  • 并发读远多于并发写
  • key固定不变或很少删除

❌ 不适合:

  • 写多读少
  • 需要遍历所有数据(Range 需要锁)
  • 需要精确统计长度

简单来说,sync.Map 用空间换时间,用额外的内存维护 read/dirty 双map,换取读操作的极致性能。

原子操作 sync/atomic

原子操作是CPU指令级别实现的,比如在Intel的CPU上主要是使用总线锁的方式,AMD的CPU架构机器上就是使用MESI一致性协议的方式来保证原子操作。

go中 sync/atomic 包提供了原子操作的支持

除了预定义的整型的支持,还可以使用 atomic.Value 类型,完成其他类型的原子操作:

type Value
func (v *Value) CompareAndSwap(old, new any) (swapped bool)
func (v *Value) Load() (val any)
func (v *Value) Store(val any)
func (v *Value) Swap(new any) (old any)

使用方法:

func SyncAtomicValue() {

    var loadConfig = func() map[string]string {
        return map[string]string{
            // some config
            "title":   "马士兵Go并发编程",
            "varConf": fmt.Sprintf("%d", rand.Int63()),
        }
    }

    var config atomic.Value

    // 每N秒加载一次配置文件
    go func() {
        for {
            config.Store(loadConfig())
            fmt.Println("latest config was loaded", time.Now().Format("15:04:05.99999999"))
            time.Sleep(time.Second)
        }
    }()

    // 使用配置
    // 不能在加载的过程中使用配置
    for {
        go func() {
            c := config.Load()
            fmt.Println(c, time.Now().Format("15:04:05.99999999"))
        }()

        time.Sleep(400 * time.Millisecond)
    }

    select {}
}

sync.Pool 并发安全池

池是一组可以单独保存和检索的可以复用的临时对象。存储在池中的任何项目可随时自动删除,无需通知。一个池可以安全地同时被多个goroutine使用。

典型特征:

  • sync.Pool 是并发安全的
  • 池中的对象由Go负责删除,内存由Go自己回收
  • 池中元素的数量由Go负责管理,用户无法干预
  • 池中元素应该是临时的,不应该是持久的。例如长连接不适合放入 sync.Pool 中

池的目的是缓存已分配但未使用的项目以供以后重用,从而减轻垃圾收集器的压力。也就是说,它使构建高效、线程安全的自由元素变得容易。

池的一个适当用途是管理一组临时项,这些临时项在包的并发独立客户端之间默默共享,并可能被其重用。池提供了一种在许多客户机上分摊分配开销的方法。

一个很好地使用池的例子是fmt包,它维护了临时输出缓冲区的动态大小存储。

池由 sync.Pool类型实现,具体三个操作:

  • 初始化Pool实例,需要提供池中缓存元素的New方法。
  • 申请元素,func (p *Pool) Get() any
  • 交回对象,func (p *Pool) Put(x any)

操作示例:

func SyncPool() {
    // 原子的计数器
    var counter int32 = 0

    // 定义元素的Newer,创建器
    elementNewer := func() any {
        // 原子的计数器累加
        atomic.AddInt32(&counter, 1)

        // 池中元素推荐(强烈)是指针类型
        return new(bytes.Buffer)
    }

    // Pool的初始化
    pool := sync.Pool{
        New: elementNewer,
    }

    // 并发的申请和交回元素
    workerNum := 1024 * 1024
    wg := sync.WaitGroup{}
    wg.Add(workerNum)
    for i := 0; i < workerNum; i++ {
        go func() {
            defer wg.Done()
            // 申请元素,通常需要断言为特定类型
            buffer := pool.Get().(*bytes.Buffer)
            // 不用Pool
            //buffer := elementNewer().(*bytes.Buffer)
            // 交回元素
            defer pool.Put(buffer)
            // 使用元素
            _ = buffer.String()
        }()
    }

    //
    wg.Wait()

    // 测试创建元素的次数
    fmt.Println("elements number is :", counter)
}

// elements number is : 12

测试的时候,大家可以发现创建的元素数量远远低于goroutine的数量。

DATA RACE 现象

当程序运行时,由于并发的原因会导致数据竞争使用,有时在编写代码时很难发现,要经过大量测试才会发现。可以用 <font style="color:rgb(36, 41, 46);background-color:rgba(27, 31, 35, 0.05);">go run -race </font> ,增加-race选项,检测运行时可能出现的竞争问题。

测试之前的计数器累加代码:本例子需要 main.main 来演示,因为是 go run:

package main

import (
    "fmt"
    "sync"
)

func main() {
    wg := sync.WaitGroup{}
    // 计数器
    counter := 0
    // 多个goroutine并发的累加计数器
    gs := 1000
    wg.Add(gs)
    for i := 0; i < gs; i++ {
        go func() {
            defer wg.Done()
            // 累加
            for k := 0; k < 100; k++ {
                counter++
                // ++ 操作不是原子的
                // counter = counter + 1
                // 1. 获取当前的counter变量
                // 2. +1
                // 3. 赋值新值到counter
            }
        }()
    }
    // 统计计数结果
    wg.Wait()
    fmt.Println("Counter:", counter)
}

结果:

# 没有使用 -race
PS D:\apps\goExample\concurrency> go run .\syncRace.go
n: 94077

# 使用 -race
PS D:\apps\goExample\concurrency> go run -race .\syncRace.go
==================
WARNING: DATA RACE  
Read at 0x00c00000e0f8 by goroutine 9:
  main.main.func1()
      D:/apps/goExample/concurrency/syncMain.go:16 +0xa8

Previous write at 0x00c00000e0f8 by goroutine 7:
Goroutine 9 (running) created at:
  main.main()
      D:/apps/goExample/concurrency/syncMain.go:13 +0x84

Goroutine 7 (finished) created at:
  main.main()
      D:/apps/goExample/concurrency/syncMain.go:13 +0x84
==================
n: 98807
Found 1 data race(s)
exit status 66

该选项用于在开发阶段,检测数据竞争情况。

出现 data race情况,可以使用锁,或原子操作的来解决。

sync.Once

若需要保证多个并发goroutine中,某段代码仅仅执行一次,就可以使用 sync.Once 结构实现。

例如,在获取配置的时候,往往仅仅需要获取一次,然后去使用。在多个goroutine并发时,要保证能够获取到配置,同时仅获取一次配置,就可以使用sync.Once结构:

func SyncOnce() {

    // 初始化config变量
    config := make(map[string]string)

    // 1. 初始化 sync.Once
    once := sync.Once{}

    // 加载配置的函数
    loadConfig := func() {
        // 2. 利用 once.Do() 来执行
        once.Do(func() {
            // 保证执行一次
            config = map[string]string{
                "varInt": fmt.Sprintf("%d", rand.Int31()),
            }
            fmt.Println("config loaded")
        })
    }

    // 模拟多个goroutine,多次调用加载配置
    // 测试加载配置操作,执行了几次
    workers := 10
    wg := sync.WaitGroup{}
    wg.Add(workers)
    for i := 0; i < workers; i++ {
        go func() {
            defer wg.Done()
            // 并发的多次加载配置
            loadConfig()
            // 使用配置
            _ = config

        }()
    }
    wg.Wait()
}

核心逻辑:

  1. 初始化 sync.Once
  2. once.Do(func()) 可以确保func()仅仅执行一次

sync.Once 的实现很简单:

type Once struct {
    // 是否已处理,保证一次
    done uint32
    // 锁,保证并发安全
    m    Mutex
}

sync.Cond

sync.Cond是sync包提供的基于条件(Condition)的通知结构。

该结构提供了4个方法:

// 创建Cond
func NewCond(l Locker) *Cond
// 全部唤醒
func (c *Cond) Broadcast()
// 唤醒1个
func (c *Cond) Signal()
// 等待唤醒
func (c *Cond) Wait()

其中,创建时,需要1个Locker作为参数,通常是 sync.Mutext或sync.RWMutex。然后两个方法用来通知,一个方法用来等待。

使用逻辑很简单,通常是一个goroutine负责通知,多个goroutine等待处理,如图:

cond := sync.NewCond(&sync.Mutex{})
cond := sync.NewCond(&sync.RWMutex{})

cond的广播和信号通知操作是并发安全的,可以重复调用的。

要注意Wait()操作,是会先解锁,等到广播信号后,再加锁。因此,Wait()操作前,要加锁。

示例代码:

  • 一个goroutine负责接收数据,完毕后,广播给处理数据的goroutine
  • 多个goroutine处理数据,在数据未处理完前,等待广播信号。信号来了,处理数据
func SyncCond() {
    wg := sync.WaitGroup{}
    dataCap := 1024 * 1024
    var data []int
    cond := sync.NewCond(&sync.Mutex{})
    for i := 0; i < 8; i++ {
        wg.Add(1)
        go func(c *sync.Cond) {
            defer wg.Done()
            c.L.Lock()
            for len(data) < dataCap {
                c.Wait()
            }
            fmt.Println("listen", len(data), time.Now())
            c.L.Unlock()
        }(cond)
    }

    wg.Add(1)
    go func(c *sync.Cond) {
        defer wg.Done()
        c.L.Lock()
        defer c.L.Unlock()
        for i := 0; i < dataCap; i++ {
            data = append(data, i*i)
        }
        fmt.Println("Broadcast")
        c.Broadcast()
        //c.Signal()
    }(cond)

    // 为什么 for { wait() }
    // 另外的广播goroutine
    //wg.Add(1)
    //go func(c *sync.Cond) {
    //    defer wg.Done()
    //    c.Broadcast()
    //}(cond)

    wg.Wait()
}

示例代码要点:

  • 因为调度问题,不能决定哪个协程先执行。所以会出现,先 broadcast 再 wait,有部分协程没有被唤醒。所以一定要在 wait 前判断条件。这个条件判断使用 for,为了让 wait 返回后再判断,防止其他地方 boradcast
  • wait要使用for进行条件判定,因为在wait返回后,条件不一定成立。因为Broadcast()操作可能被提前调用(通常是在其他的goroutine中。
  • Broadcast() 操作可选的是否加锁解锁
  • Wait() 操作前,一定要加锁。因为Wait()操作,会先解锁,接收到信号后,再加锁。

sync.Cond 基本原理

sync.Cond结构:

type Cond struct {
    // 锁
    L Locker
    // 等待通知goroutine列表
    notify  notifyList

    // 限制不能被拷贝
    noCopy noCopy
    checker copyChecker
}

结构上可见,Cond记录了等待的goroutine列表,这样就可以做到,广播到全部的等待goroutine。这也是Cond应该被复制的原因,否则这些goroutine可能会被意外唤醒。

Wait() 操作:

func (c *Cond) Wait() {
   // 检查是否被复制
   c.checker.check()
   // 更新 notifyList 中需要等待的 waiter 的数量
   // 返回当前需要插入 notifyList 的编号
   t := runtime_notifyListAdd(&c.notify)
   // 解锁
   c.L.Unlock()
   // 挂起,直到被唤醒
   runtime_notifyListWait(&c.notify, t)
   // 唤醒之后,重新加锁。
   // 因为阻塞之前解锁了。
   c.L.Lock()
}

核心工作就是,记录当goroutine到Cond的notifyList。之后解锁,挂起,加锁。因此要在Wait()前加锁,后边要解锁。

Broadcast()操作:

func (c *Cond) Broadcast() {
   // 检查 sync.Cond 是否被复制了
   c.checker.check()
   // 唤醒 notifyList 中的所有 goroutine
   runtime_notifyListNotifyAll(&c.notify)
}

核心工作就是唤醒 notifyList 中全部的 goroutine。

小结

同步类型:

  • 数据同步,保证数据操作的原子性
    • sync/atomic
    • sync.Map
    • sync.Mutex, sync.RWMutex
  • 操作同步
    • sync.Mutex, sync.RWMutex

锁的类型:

  • 互斥锁 sync.Mutex,完全独占
  • 读写互斥锁 sync.RWMutex,可以共享读操作

锁的不锁资源,只是锁定申请锁本身的操作。

sync包总结

  • 锁:sync.Mutex, sync.RWMutex
  • 数据:sync.Map, sync/atomic
  • sync.Pool
  • sync.Once
  • sync.Cond

使用Channel完成数据和信号的同步!

案例

并发目录大小统计

业务逻辑

统计目录的文件数量和大小(或其他信息)。示例输出:

// 某个目录:
2637 files 1149.87 MB

实现思路

  • 给定一个或多个目录,并发的统计每个目录的size,最后累加到一起。
  • 当目录中存在子目录时,递归的统计。
  • 每个目录的统计都由独立的Goroutine完成
  • 累计总Size由独立的Goroutine完成
  • 使用Channel传递获取的文件大小
  • 使用WaitGroup调度

核心代码

// 读取目录内容
// os.ReadDir
func ReadDir(name string) ([]DirEntry, error)
entries, err := os.ReadDir(dir)

// 取得文件信息
info, err := entry.Info()

//判定是否为目录
entry.IsDir()

编码实现

package main

import (
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
	"sync"
)

// WalkDir 遍历指定目录列表,计算所有文件的总大小和数量
// 参数 dirs: 可变数量的目录路径,如果不传则默认为当前目录"."
// 返回: 格式化字符串,包含文件数量和总大小(MB)
func WalkDir(dirs ...string) string {
	// 如果没有传入任何目录,使用当前目录作为默认值
	if len(dirs) == 0 {
		dirs = []string{"."}
	}

	// 创建带缓冲的通道,缓冲大小为1
	// 缓冲可以避免在等待接收时阻塞发送方,提高并发效率
	filesizeCh := make(chan int64, 1)

	// 初始化等待组,用于等待所有goroutine完成
	wg := &sync.WaitGroup{}

	// 遍历每个待处理的目录
	for _, dir := range dirs {
		// 等待组计数加1,表示有一个新的goroutine开始工作
		wg.Add(1)
		// 启动goroutine异步遍历子目录
		go walkDir(dir, filesizeCh, wg)
	}

	// 单独启动一个goroutine等待所有walkDir任务完成
	// 当所有goroutine完成后,关闭filesizeCh通道
	// 这样range循环就能正确结束
	go func(wg *sync.WaitGroup) {
		// 阻塞等待,直到所有goroutine调用wg.Done()
		wg.Wait()
		// 关闭通道,通知接收方没有更多数据了
		close(filesizeCh)
	}(wg)

	// 初始化计数器
	var fileNum, sizeTotal int64

	// 从通道中读取所有文件大小数据
	// 当通道被关闭时,range循环会自动结束
	for filesize := range filesizeCh {
		fileNum++      // 文件数量加1
		sizeTotal += filesize  // 累加文件大小
	}

	// 格式化输出结果
	// 文件数量取整,总大小转换为MB(1e6字节 = 1MB)
	return fmt.Sprintf("%d files %.2f MB\n", fileNum, float64(sizeTotal)/1e6)
}

// walkDir 递归遍历目录,发送所有文件的大小到通道
// 参数 dir: 当前要遍历的目录路径
// 参数 fileSizes: 用于发送文件大小的通道
// 参数 wg: 等待组引用,用于标记任务完成
func walkDir(dir string, fileSizes chan<- int64, wg *sync.WaitGroup) {
	// defer确保在函数返回时调用wg.Done()
	// 即使发生panic,也会执行,保证计数器正确递减
	defer wg.Done()

	// 获取目录下所有文件和子目录的信息
	for _, fileinfo := range fileInfos(dir) {
		// 判断当前条目是否是目录
		if fileinfo.IsDir() {
			// 构建子目录的完整路径
			subDir := filepath.Join(dir, fileinfo.Name())

			// 遇到子目录,需要递归遍历
			// 先增加等待组计数
			wg.Add(1)
			// 启动新的goroutine处理子目录
			go walkDir(subDir, fileSizes, wg)
		} else {
			// 如果是文件,将文件大小发送到通道
			// 非阻塞发送,因为通道有1个缓冲
			fileSizes <- fileinfo.Size()
		}
	}
}

// fileInfos 读取指定目录下的所有文件信息
// 参数 dir: 目录路径
// 返回: 文件信息切片,读取失败时返回空切片

测试执行

> go test -run=WalkDir
70 files 0.09 MB

PASS
ok      goConcurrency   0.321s

快速排序的并发编程实现

典型的单线程快速排序实现

func QuickSortSingle(arr []int) []int {
    // 确保arr中至少存在2个或以上元素
    if arr == nil || len(arr) < 2 {
        return arr
    }
    // 执行排序
    quickSortSingle(arr, 0, len(arr)-1)
    return arr
}

func quickSortSingle(arr []int, l, r int) {
    // 判定待排序范围是否合法
    if l < r {
        // 获取参考元素位置索引
        mid := partition(arr, l, r)
        // 递归排序左边
        quickSortSingle(arr, l, mid-1)
        // 递归排序右边
        quickSortSingle(arr, mid+1, r)
    }
}

// 大小分区,返回参考元素索引
func partition(arr []int, l, r int) int {
    p := l - 1
    for i := l; i <= r; i++ {
        if arr[i] <= arr[r] {
            p++
            swap(arr, p, i)
        }
    }
    return p
}

// 交换arr中i和j元素
func swap(arr []int, i, j int) {
    t := arr[i]
    arr[i] = arr[j]
    arr[j] = t
}

并发编程实现思路

  • 使用独立的Goroutine完成arr中某部分的排序
  • WaitGroup 完成等待阻塞同步

编码实现

// QuickSortConcurrency 快速排序调用函数
func QuickSortConcurrency(arr []int) []int {
	// 一:校验arr是否满足排序需要,至少要有2个元素
	if arr == nil || len(arr) < 2 {
		return arr
	}

	// 四:同步的控制
	wg := &sync.WaitGroup{}
	// 二:执行排序
	// 初始排序整体[0, len(arr)-1]
	wg.Add(1)
	go quickSortConcurrency(arr, 0, len(arr)-1, wg)
	wg.Wait()

	// 三:返回结果
	return arr
}

// 实现递归快排的核心函数
// 接收arr,和排序区间的索引位置[l, r]
func quickSortConcurrency(arr []int, l, r int, wg *sync.WaitGroup) {
	// 一:-1wg的计数器
	defer wg.Done()

	// 二:判定是否需要排序, l < r
	if l < r {
		// 三:大小分区元素,并获取参考元素索引
		mid := partition(arr, l, r)

		// 四:并发对左部分排序
		wg.Add(1)
		go quickSortConcurrency(arr, l, mid-1, wg)

		// 五:并发的对右部分排序
		wg.Add(1)
		go quickSortConcurrency(arr, mid+1, r, wg)
	}
}

partition 和 swap 部分不变。

测试执行

func TestQuickSortConcurrency(t *testing.T) {
	randArr := GenerateRandArr(1000)
	sortArr := QuickSortConcurrency(randArr)
	fmt.Println(sortArr)
}


// 生成大的随机数组
func GenerateRandArr(l int) []int {
	// 生产大量的随机数
	arr := make([]int, l)
	rand.Seed(time.Now().UnixMilli())
	for i := 0; i < l; i++ {
		arr[i] = int(rand.Int31n(int32(l * 5)))
	}

	return arr
}
> go test -run=QuickSortConcurrency

参考文件:src/runtime 源码。
参考文章链接:https://mp.weixin.qq.com/s?__biz=MzkxMjQzMjA0OQ==&mid=2247483889&idx=1&sn=dd5066f7c27a6b29f57ff9fecb699d77&poc_token=HCXfiWmjPrPdgc_HuGVO_V67h8uXKTdcTVzBc6b6

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐