目录

7.1 为什么要使用 goroutine

7.2 进程、线程以及并行、并发

7.2.1 关于进程和线程

7.2.2 关于并行和并发

7.2.3 关于协程

7.3 Golang 中的goroutine以及主线程

7.3.1 Golang 中的goroutine以及主线程介绍

7.3.2 Goroutine 的使用以及 sync.WaitGroup

7.3.3 启动多个 goroutine

7.3.4 设置 Golang 并行运行的时候占用的 CPU 数量

7.3.5 案例:Goroutine 统计素数

7.4 Channel 管道

7.4.1 channel 类型

7.4.2 创建 channel

7.4.3 channel 操作

7.4.4 管道阻塞

7.4.5 for... := range... 从管道循环取值

7.5 Goroutine 结合 Channel 管道

7.6 单向管道

7.7 select 多路复用

7.8 Golang 并发安全和锁

7.8.1 互斥锁

7.8.2 读写互斥锁

7.9 Goroutine Recover 解决协程中出现的 Panic

7.10 练习题:Web3.0并发编程应用


7.1 为什么要使用 goroutine

需求:要统计 1-10000000 的数字中那些是素数,并打印这些素数?

素数:就是除了 1 和它本身不能被其他数整除的数。

实现方法:

  1、传统方法,通过一个 for 循环判断各个数是不是素数;

  2、使用并发或者并行的方式,将统计素数的任务分配给多个 goroutine 去完成,这个时候就用到了 goroutine;

  3、goroutine 结合 channel;

想象一下你要同时做几件事:比如煮汤、回复微信、洗衣服。传统方式就像你雇佣三个不同的人分别做,但雇佣和管理每个人的成本很高;而Go语言中的goroutine就像你拥有了“分身术”,你可以轻松地同时处理很多任务,每个任务只占用一点点精力,而且Go语言会自动帮你协调这些任务,让你写代码时就像在写普通的顺序任务一样简单。这样就能高效利用时间,快速完成所有事情。

7.2 进程、线程以及并行、并发

7.2.1 关于进程和线程

进程(Process)就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位,进程是一个动态概念,是程序在执行过程中分配和管理资源的基本单位,每一个进程都有一个自己的地址空间。一个进程至少有 5 种基本状态,它们是:初始态,执行态,等待状态,就绪状态,终止状态。通俗地讲进程就是一个正在执行的程序。

线程 是进程的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基本单位一个进程可以创建多个线程,同一个进程中的多个线程可以并发执行,一个程序要运行的话至少有一个进程。

7.2.2 关于并行和并发

并发:多个线程同时竞争一个位置,竞争到的才可以执行,每一个时间段只有一个线程在执行。

并行:多个线程可以同时执行,每一个时间段,可以有多个线程同时执行。

通俗的讲多线程程序在单核 CPU 上面运行就是并发,多线程程序在多核 CUP 上运行就是并行如果线程数大于 CPU 核数,则多线程程序在多个 CPU 上面运行既有并行又有并发

7.2.3 关于协程

协程(Coroutine)是一种轻量级的“虚拟线程”,可以理解为一种特殊的函数,它允许你在执行到一半的时候暂停,然后从暂停的地方继续执行。它和普通函数最大的区别是:普通函数一旦开始就会一直执行直到结束,而协程可以主动让出控制权,稍后再回来接着执行。

举个例子:就像你看书时,突然想喝水,你夹个书签标记位置,然后去倒水,回来继续从书签处阅读。协程就是这样一个可以“暂停+恢复”的任务,非常适合处理需要等待的操作(比如网络请求、文件读写),在等待期间让协程暂停,去执行其他任务,等结果返回再恢复,从而高效利用CPU。

在Go语言中,goroutine就是一种协程实现,但Go的运行时还做了优化:它把多个协程(goroutine)自动分配到多个操作系统线程上,让你既能享受协程的轻量,又能利用多核并行。简单来说,协程就是帮你“一心多用”的工具,而Go让这个工具用起来特别顺手。

7.3 Golang 中的goroutine以及主线程

7.3.1 Golang 中的goroutine以及主线程介绍

Golang 中的主线程:(可以理解为线程/也可以理解为进程),在一个Golang 程序的主线程上可以起多个协程Golang 中多协程可以实现并行或者并发。

协程:可以理解为用户级线程,这是对内核透明的,也就是系统并不知道有协程的存在,是完全由用户自己的程序进行调度的。Golang 的一大特色就是从语言层面原生支持协程,在函数或者方法前面加 go 关键字就可创建一个协程。可以说 Golang 中的协程就是goroutine 。

Golang 中的多协程有点类似其他语言中的多线程

多协程和多线程Golang 中每个 goroutine (协程) 默认占用内存远比 Java 、C 的线程少。

OS 线程(操作系统线程)一般都有固定的栈内存(通常为 2MB 左右),一个 goroutine (协程) 占用内存非常小,只有 2KB 左右,多协程 goroutine 切换调度开销方面远比线程要少。

这也是为什么越来越多的大公司使用 Golang 的原因之一。

7.3.2 Goroutine 的使用以及 sync.WaitGroup

1、并行执行需求

在主线程(可以理解成进程)中,开启一个 goroutine, 该协程每隔 50 毫秒秒输出 "你好 goroutine" 在主线程中也每隔 50 毫秒输出"你好 golang", 输出 10 次后,退出程序,要求主线程和 goroutine 同时执行。

package main

import (
    "fmt"
    "strconv"
    "time"
)

// test 函数作为一个独立的 goroutine 运行,用于演示并发执行。
// 它会循环10次,每次输出一行信息,然后暂停1秒。
func test() {
    for i := 1; i <= 10; i++ {
        // strconv.Itoa(i) 将整数 i 转换为十进制字符串形式,便于拼接输出。
        fmt.Println("test()输出:你好 goroutine:", strconv.Itoa(i))
        // time.Sleep(time.Second) 让当前 goroutine 暂停执行1秒钟,
        // 模拟耗时操作,并让出 CPU 给其他 goroutine 执行。
        time.Sleep(time.Second)
    }
}

func main() {
    // 使用 go 关键字启动一个新的 goroutine 来并发执行 test() 函数。
    // 此时 main() 函数(主 goroutine)和 test() 会并发运行。
    go test()

    // 主 goroutine 中的循环,执行9次,每次输出后也暂停1秒。
    for i := 1; i < 10; i++ {
        fmt.Println("main() 你好 golang:", strconv.Itoa(i))
        time.Sleep(time.Second)
    }
    // 当 main() 函数执行完毕时,整个程序就会退出,
}

上面代码看上去没有问题,但是要注意主线程执行完毕后即使协程没有执行完毕,程序会退出,所以我们需要对上面代码进行改造。

sync.WaitGroup 可以实现主线程等待协程执行完毕。

package main

import (
        "fmt"
        "strconv"
        "sync"
        "time"
)

// wg 是 sync.WaitGroup 类型的变量,用于等待一组 goroutine 完成。
// 它通过计数器来跟踪启动的 goroutine 数量,当计数器归零时,Wait() 方法会解除阻塞。
var wg sync.WaitGroup

// test 函数是一个独立的 goroutine 入口,它会循环 10 次输出信息,并模拟耗时操作。
func test() {
        // 循环 10 次
        for i := 0; i < 10; i++ {
                // 输出当前循环次数,使用 strconv.Itoa 将整数转换为字符串
                fmt.Println("test() 你好 golang " + strconv.Itoa(i))
                // 暂停 50 毫秒,模拟任务执行时间,并让出 CPU 给其他 goroutine
                time.Sleep(time.Millisecond * 50)
        }
        // wg.Done() 表示当前 goroutine 已完成,将 WaitGroup 计数器减 1。
        // 通常使用 defer wg.Done() 来确保在函数退出时调用,但此处显式调用也正确。
        wg.Done()
}

func main() {
        // wg.Add(1) 将 WaitGroup 计数器增加 1,表示有一个 goroutine 需要等待。
        // 必须在启动 goroutine 之前调用 Add,以避免竞态条件。
        wg.Add(1)
        // 使用 go 关键字启动一个新的 goroutine 来并发执行 test() 函数。
        go test()

        // 主 goroutine 中的循环,执行 2 次,每次输出后也暂停 50 毫秒。
        for i := 0; i < 2; i++ {
                fmt.Println("main() 你好 golang " + strconv.Itoa(i))
                time.Sleep(time.Millisecond * 50)
        }

        // wg.Wait() 会阻塞当前 goroutine(这里是主 goroutine),
        // 直到 WaitGroup 的计数器变为 0(即所有通过 Add 添加的 goroutine 都调用了 Done)。
        // 这样可以确保主函数不会提前退出,从而让 test() goroutine 有机会执行完毕。
        wg.Wait()
}

2、sync.WaitGroup 详解

sync.WaitGroup 是 Go 语言标准库 sync 包中提供的并发同步工具,用于等待一组 goroutine 全部完成。它通过一个内部计数器来跟踪正在运行的 goroutine 数量,当计数器归零时,被阻塞的 Wait() 方法返回,主程序或其他 goroutine 即可安全地继续执行。

(1)核心方法

  • Add(delta int)给计数器增加指定的 delta(可以为负数,但通常不这样做)。通常在启动每个 goroutine 前调用 Add(1)

  • Done()等价于 Add(-1),在 goroutine 完成时调用,表示该 goroutine 已结束。

  • Wait()阻塞当前 goroutine,直到计数器变为 0。如果计数器已经是 0,则立即返回。

(2)工作原理

  • 内部维护一个计数器,初始值为 0。

  • Add 增加计数器,Done 减少计数器。

  • Wait 会一直阻塞,直到计数器为 0。

  • 计数器不能小于 0,否则会引发 panic。

(3)典型使用模式

var wg sync.WaitGroup

func worker() {
    defer wg.Done() // 确保 Done 被调用
    // 执行任务...
}

func main() {
    wg.Add(1)
    go worker()

    wg.Add(1)
    go anotherWorker()

    wg.Wait() // 等待所有 worker 完成
}

关键点Add 必须在启动 goroutine 之前调用,否则可能导致 Wait 提前结束或引发数据竞争。

(4)注意事项

  • 计数器不能为负Done 调用次数不能超过 Add 的总和,否则会 panic。

  • 不要在 AddWait 之间复制 WaitGroupWaitGroup 应通过指针传递,直接复制会导致计数器状态不共享。

  • 复用 WaitGroup:可以在 Wait 返回后重置计数器再次使用(通过 Add),但要注意清除已完成的状态。

  • 与 defer 配合:通常使用 defer wg.Done() 确保在函数返回时计数器减一,避免因 panic 或早期返回导致的死锁。

(5)完整示例(这个案例没看懂~~~~)

package main

import (
    "fmt"
    "sync"
    "time"
)

// worker 是一个模拟工作的函数,接收一个 worker ID 和一个指向 sync.WaitGroup 的指针。
// 函数内部通过 defer 语句确保在函数返回时调用 wg.Done(),从而通知 WaitGroup 该 goroutine 已完成。
func worker(id int, wg *sync.WaitGroup) {
    // defer wg.Done() 会在函数退出时执行,无论正常返回还是发生 panic,都能保证计数器减一。
    defer wg.Done()
    // 打印开始信息
    fmt.Printf("Worker %d starting\n", id)
    // 模拟耗时工作(例如 I/O 操作或计算),让当前 goroutine 睡眠 1 秒
    time.Sleep(time.Second)
    // 打印完成信息
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    // 声明一个 sync.WaitGroup 变量,用于等待一组 goroutine 完成。
    var wg sync.WaitGroup

    // 启动 3 个 worker goroutine
    for i := 1; i <= 3; i++ {
        // 在启动每个 goroutine 之前,调用 wg.Add(1) 将 WaitGroup 的内部计数器加 1。
        // 这样做可以确保 WaitGroup 知道有一个新的 goroutine 需要等待。
        wg.Add(1)
        // 使用 go 关键字启动一个新的 goroutine,执行 worker 函数。
        // 注意:必须传递 wg 的指针,因为 WaitGroup 内部状态需要被所有 goroutine 共享。
        go worker(i, &wg)
    }

    // wg.Wait() 会阻塞当前 goroutine(即主 goroutine),直到 WaitGroup 的计数器归零。
    // 也就是说,它会等待所有通过 wg.Add(1) 注册的 goroutine 都调用 wg.Done() 完成。
    wg.Wait()
    // 所有 worker 完成后,打印最终信息
    fmt.Println("All workers finished")
}

(6)与其他同步原语对比

工具

适用场景

sync.WaitGroup

等待一组 goroutine 完成

channel

可用于等待完成(如使用无缓冲 channel),但更擅长数据传递

sync.Cond

复杂的条件等待

sync.Once

确保某个函数只执行一次

总结:sync.WaitGroup 是 Go 并发编程中最常用的同步工具之一,它简单、高效,非常适合用于“主 goroutine 等待子任务完成”的场景。掌握其正确使用方式(特别是 Add 的位置和 Done 的配对)是编写健壮并发代码的基础。

7.3.3 启动多个 goroutine

在 Go 语言中实现并发就是这样简单,我们还可以启动多个 goroutine。让我们再来一个例子:(这里使用了 sync.WaitGroup 来实现等待 goroutine 执行完毕)。

package main

import (
        "fmt"
        "sync"
)

// wg 是全局的 sync.WaitGroup 变量,用于等待一组 goroutine 完成。
// WaitGroup 内部维护一个计数器,Add 方法增加计数,Done 方法减少计数,
// Wait 方法会阻塞直到计数器归零。
var wg sync.WaitGroup

// hello 是一个被多个 goroutine 执行的函数。
// 它接收一个整数参数 i,输出一条带有该数字的问候信息。
// 通过 defer wg.Done() 确保函数执行完毕后,无论正常返回还是发生 panic,
// 都会将 WaitGroup 的计数器减 1,表示该 goroutine 已完成。
func hello(i int) {
        // wg.Done() 等价于 wg.Add(-1),在函数返回前执行。
        defer wg.Done()

        // 打印当前 goroutine 的编号。
        fmt.Println("Hello Goroutine!", i)
}

func main() {
        // 循环启动 10 个 goroutine。
        for i := 0; i < 10; i++ {
                // 在启动每个 goroutine 之前,调用 wg.Add(1) 将计数器加 1,
                // 表示有一个新的 goroutine 需要等待。这一步必须放在 go 语句之前,
                // 避免因调度延迟导致 Wait 过早执行。
                wg.Add(1)
                // 使用 go 关键字启动一个新的 goroutine 来执行 hello(i)。
                // 每个 goroutine 并发运行,i 的值被复制传递。
                go hello(i)
        }

        // wg.Wait() 会阻塞 main goroutine,直到 WaitGroup 的计数器归零。
        // 也就是说,它会等待所有通过 wg.Add(1) 注册的 goroutine 都执行完 wg.Done()。
        // 这确保了所有 hello goroutine 完成后再继续执行 main 函数的后续代码。
        wg.Wait()

        // 所有 goroutine 完成后,程序正常退出。
        // 如果没有 wg.Wait(),main 函数可能会提前结束,导致部分 goroutine 来不及执行。
}

多次执行上面的代码,会发现每次打印的数字的顺序都不一致。这是因为 10 个 goroutine 是并发执行的,而 goroutine 的调度是随机的。

Go 语言的 goroutine 是并发执行的,这 10 个 goroutine 并不是按照 for 循环的顺序一个一个执行,而是被顺序启动后,由 Go 调度器决定它们的执行时机。for 循环本身在主 goroutine 中按顺序执行,每次迭代会立即启动一个新的 goroutine,但这些 goroutine 一旦启动,就会进入就绪队列,调度器会根据 CPU 核心数和调度策略让它们同时运行或交错运行,因此它们的输出顺序是不确定的,可能会看到任意编号的打印结果。

启动是顺序的,执行是并行的

注意:

在这个 for 循环中,执行顺序是严格顺序的:

1. 主 goroutine 执行 wg.Add(1):将 WaitGroup 计数器增加 1,表示有一个新的 goroutine 需要等待。这一步是同步完成的,立即生效。

2. 紧接着执行 go hello(i):启动一个新的 goroutine 来运行 hello(i)函数。这个启动操作本身也是同步完成的(即主 goroutine 发出启动指令后立即继续执行下一行),但新 goroutine 中的代码何时真正开始运行由 Go 调度器决定,可能立即运行,也可能稍后运行。

3. 循环继续下一次迭代:主 goroutine 继续执行 `i++` 并重复上述两步,直到循环结束。

关键点

(1)wg.Add(1)必须在 go语句之前调用,这是正确的做法,可以避免因调度延迟导致 wg.Wait() 在 goroutine 启动前就错误地开始等待。

(2)每个 goroutine 获得的 i 值是当前迭代时的值的副本(因为参数传递是值拷贝),所以不会有循环变量共享问题。

(3)所有 10 个 goroutine 启动后,主 goroutine 才会执行到 wg.Wait(),并阻塞直到所有 goroutine 调用 wg.Done()。

输出顺序:由于 goroutine 调度不确定,各个 `hello` 函数中的 `fmt.Println` 打印顺序可能是不固定的,但所有打印最终都会在 `wg.Wait()` 返回前完成。

7.3.4 设置 Golang 并行运行的时候占用的 CPU 数量

Go 运行时的调度器使用 GOMAXPROCS 参数来确定需要使用多少个 OS 线程来同时执行 Go 代码。默认值是机器上的 CPU 核心数。例如在一个 8 核心的机器上,调度器会把 Go 代码同时调度到 8 个 OS 线程上。

Go 语言中可以通过 runtime.GOMAXPROCS() 函数设置当前程序并发时占用的 CPU 逻辑核心数。

Go1.5 版本之前,默认使用的是单核心执行。Go1.5 版本之后,默认使用全部的 CPU 逻辑核心数。

package main

import (
    "fmt"
    "runtime" // 导入 runtime 包,提供与 Go 运行时环境交互的功能,包括 CPU 核心数查询和设置
)

func main() {

    // 获取当前计算机上面的逻辑 CPU 个数
    // runtime.NumCPU() 返回当前操作系统可用的逻辑 CPU 数量(包括超线程)
    cpuNum := runtime.NumCPU()

    // 打印 CPU 核心数到控制台
    fmt.Println("cpuNum=", cpuNum)

    // 设置 Go 程序可同时使用的 CPU 核心数(最大并行数)
    // runtime.GOMAXPROCS(n) 用于设置可同时执行的最大 CPU 数,并返回之前的设置
    // 这里传入 cpuNum-1,表示留出一个核心给操作系统和其他进程,避免全部占用
    // 如果传入的参数小于 1,则 GOMAXPROCS 不会改变;通常传入值应大于等于 1
    runtime.GOMAXPROCS(cpuNum - 1)

    // 输出提示信息,表示设置完成
    fmt.Println("ok")
}

7.3.5 案例:Goroutine 统计素数

需求:要统计 1-1000000 的数字中那些是素数?

(1)通过传统的 for 循环来统计

package main

import (
    "fmt"
    "time"
)

func main() {
    // 获取程序开始执行时的 Unix 时间戳(秒)
    start := time.Now().Unix()

    // 从 0 循环到 1000000,依次判断每个数是否为素数
    for num := 0; num <= 1000000; num++ {

        // 先假设当前数 num 是素数,flag 为 true
        flag := true

        // 内层循环:从 2 开始到 num-1,检查是否存在能整除 num 的数
        // 注意:对于 num=0 和 1,循环条件 i<num 不成立,直接跳过,flag 保持 true,但 0 和 1 不是素数,后面会修正逻辑
        for i := 2; i < num; i++ {
            if num % i == 0 { // 如果 num 能被 i 整除,说明 num 不是素数
                flag = false // 将 flag 设为 false
                break        // 退出内层循环,无需继续检查
            }
        }

        // 如果 flag 仍为 true,说明 num 是素数(实际上对于 0 和 1 会误判,此处是示例代码,暂不修正)
        if flag {
            fmt.Println(num) // 输出素数
        }
    }

    // 获取程序结束时的 Unix 时间戳(秒)
    end := time.Now().Unix()
    // 计算并输出耗时
    fmt.Println("普通的方法耗时=", end-start)
}

(2)goroutine 开启多个协程统计

package main

import (
        "fmt"
        "sync"
        "time"
)

// 定义全局的 sync.WaitGroup,用于等待所有 goroutine 完成
var wg sync.WaitGroup

// fun1 是一个并发执行的函数,负责计算并输出指定区间内的素数
// 参数 n 表示区间编号(1 到 4),每个区间覆盖 30000 个数:
// 区间1:1~30000,区间2:30001~60000,区间3:60001~90000,区间4:90001~120000
func fun1(n int) {
        // 计算当前区间的起始和结束数字
        // (n-1)*30000 + 1 是区间起始值,n*30000 是区间结束值
        for num := (n-1)*30000 + 1; num <= n*30000; num++ {
                flag := true // 先假设当前数 num 是素数

                // 内层循环:检查从 2 到 num-1 是否有能整除 num 的数
                // 如果存在,则 num 不是素数,将 flag 置为 false 并跳出循环
                for i := 2; i < num; i++ {
                        if num%i == 0 {
                                flag = false
                                break
                        }
                }

                // 如果 flag 仍为 true,说明 num 是素数,输出它
                // 注意:该逻辑会错误地将 1 判断为素数(因为内层循环不会执行),且 0 不在区间内
                if flag {
                        fmt.Println(num)
                }
        }
        // 当前 goroutine 工作完成,调用 wg.Done() 将 WaitGroup 计数器减 1
        wg.Done()
}

func main() {
        // 记录程序开始执行的时间戳(秒)
        start := time.Now().Unix()

        // 启动 4 个 goroutine,分别处理不同的数字区间
        for i := 1; i <= 4; i++ {
                // 在启动每个 goroutine 前,调用 wg.Add(1) 将计数器加 1,表示需要等待一个 goroutine
                wg.Add(1)
                // 启动 goroutine,传入区间编号 i
                go fun1(i)
        }

        // wg.Wait() 阻塞主 goroutine,直到所有 4 个 goroutine 都调用了 wg.Done(),即全部完成
        wg.Wait()

        // 记录程序结束时间戳
        end := time.Now().Unix()

        // 输出并行计算所消耗的总时间(秒)
        fmt.Println("并行方法耗时:", end-start)
}

问题:上面我们使用了 goroutine 已经能大大的提升性能,但是如果我们想统计数据打印数据同时进行,这个时候如何实现呢,这个时候我们就可以使用管道。

7.4 Channel 管道

管道是 Golang 在语言级别上提供的 goroutine 间的通讯方式,我们可以使用 channel 在多个 goroutine 之间传递消息。如果说 goroutine 是 Go 程序并发的执行体,channel 就是它们之间的连接。channel 是可以让一个 goroutine 发送特定值到另一个 goroutine 的通信机制。

Golang 的并发模型是 CSP(Communicating Sequential Processes),提倡通过通信共享内存而不是通过共享内存而实现通信。

Go 语言中的管道(channel)是一种特殊的类型。管道像一个传送带或者队列,总是遵循先入先出(First In First Out)的规则,保证收发数据的顺序。每一个管道都是一个具体类型的导管,也就是声明 channel 的时候需要为其指定元素类型。

7.4.1 channel 类型

channel 是一种类型,一种引用类型。声明管道类型的格式如下:

var 变量 chan 元素类型

举几个例子:

var ch1 chan int   // 声明一个传递整型的管道 
var ch2 chan bool  // 声明一个传递布尔型的管道 
var ch3 chan []int // 声明一个传递 int 切片的管道

7.4.2 创建 channel

声明的管道后需要使用 make 函数初始化之后才能使用。

创建 channel 的格式如下:

make(chan 元素类型, 容量)

举几个例子:

//创建一个能存储 10 个 int 类型数据的管道 
ch1 := make(chan int, 10) 
//创建一个能存储 4 个 bool 类型数据的管道 
ch2 := make(chan bool, 4) 
//创建一个能存储 3 个[]int 切片类型数据的管道 
ch3 := make(chan []int, 3)

7.4.3 channel 操作

管道有发送(send)、接收(receive)和关闭(close)三种操作。

发送和接收都使用<-符号。

现在我们先使用以下语句定义一个管道:

ch := make(chan int, 3)

(1)发送(将数据放在管道内)

将一个值发送到管道中。

ch <- 10 // 把 10 发送到 ch 中

(2)接收(从管道内取值)

从一个管道中接收值。

x := <- ch // 从 ch 中接收值并赋值给变量 x 
<-ch       // 从 ch 中接收值,忽略结果

(3)关闭管道

我们通过调用内置的 close 函数来关闭管道。

close(ch)

关闭管道需要注意,只有在通知接收方 goroutine 所有的数据都发送完毕的时候才需要关闭管道。管道是可以被垃圾回收机制回收的,它和关闭文件是不一样的,在结束操作之后关闭文件是必须要做的,但关闭管道不是必须的

关闭后的管道有以下特点:

(1)对一个关闭的管道再发送值就会导致 panic。

(2)对一个关闭的管道进行接收会一直获取值直到管道为空。

(3)对一个关闭的并且没有值的管道执行接收操作会得到对应类型的零值。

(4)关闭一个已经关闭的管道会导致 panic。

7.4.4 管道阻塞

(1)无缓冲的管道:如果创建管道的时候没有指定容量,那么我们可以叫这个管道为无缓冲的管道。无缓冲的管道又称为阻塞的管道。我们来看一下下面的代码:

package main

import "fmt"

func main() {
    ch := make(chan int)
    ch <- 10

    fmt.Println("发送成功")
}

上面这段代码能够通过编译,但是执行的时候会出现以下错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        D:/golang_learning/study01/main.go:7 +0x28
exit status 2

(2)有缓冲的管道:解决上面问题的方法还有一种就是使用有缓冲区的管道。我们可以在使用 make 函数初始化管道的时候为其指定管道的容量,例如:

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    ch <- 10

    fmt.Println("发送成功")
}

只要管道的容量大于零,那么该管道就是有缓冲的管道,管道的容量表示管道中能存放元素的数量。就像你小区的快递柜只有那么个多格子,格子满了就装不下了,就阻塞了,等到别人取走一个快递员就能往里面放一个。

管道阻塞具体代码如下:

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    ch <- 10
    ch <- 12
    fmt.Println("发送成功")
}

解决办法:

package main

import "fmt"

func main() {
    ch := make(chan int, 1)
    ch <- 10 // 放进去
    <-ch     // 取走
    ch <- 12 // 放进去
    <-ch     // 取走
    ch <- 17 // 还可以放进去
    fmt.Println("发送成功")
}

7.4.5 for... := range... 从管道循环取值

当向管道中发送完数据时,我们可以通过 close 函数来关闭管道。当管道被关闭时,再往该管道发送值会引发 panic,从该管道取值的操作会先取完管道中的值,再然后取到的值一直都是对应类型的零值。那如何判断一个管道是否被关闭了呢?

我们来看下面这个例子:

package main

import "fmt"

// 循环遍历管道数据
func main() {
    var ch1 = make(chan int, 5)

    for i := 0; i < 5; i++ {
        ch1 <- i + 1
    }
    close(ch1) // 关闭管道

    // 使用for range 遍历管道,当管道被关闭的时候就会退出 for range,如果没有关闭管道就会报错
    // 错误:fatal error:all goroutines are asleep - deadlock!

    // 通过for range 来遍历管道数据 管道没有key
    for val := range ch1 {
        fmt.Println(val)
    }
}

从上面的例子中我们看到有两种方式在接收值的时候判断该管道是否被关闭,不过我们通常使用的是 for range 的方式。使用for range 遍历管道时,并不是在管道被关闭的瞬间就立即退出,而是会先读完管道中所有已缓冲的值,然后才会退出循环。也就是说,只有当管道被关闭且缓冲区为空时,for range 才会自动结束。这是 Go 语言中处理管道关闭的惯用方式,确实比手动判断更简洁安全。

注意:这段代码是正确的,行为符合预期。虽然管道在遍历前已经通过 close(ch1) 关闭,但关闭管道并不会丢失缓冲区中已存在的数据。for range 会先读完管道缓冲区中的所有值(即 1,2,3,4,5),然后当管道被关闭且缓冲区为空时,才会自动退出循环。因此,代码正常输出 1 到 5,不会报错。

7.5 Goroutine 结合 Channel 管道

需求 1:定义两个方法,一个方法给管道里面写数据,一个给管道里面读取数据。要求同步进行。

  1、开启一个 fn1 的的协程给向管道 inChan 中写入 100 条数据

  2、开启一个 fn2 的协程读取 inChan 中写入的数据

  3、注意:fn1 和 fn2 同时操作一个管道

  4、主线程必须等待操作完成后才可以退出

package main

import (
    "fmt"
    "sync"
    "time"
)

var wg sync.WaitGroup

func fn1(intChan chan int) {
    for i := 0; i < 100; i++ {
        intChan <- i + 1
        fmt.Printf("writeData写入数据-%v\n", i+1)
        time.Sleep(time.Millisecond * 100)
    }

    close(intChan)
    wg.Done()
}

func fn2(intChan chan int) {
    for v := range intChan {
        fmt.Printf("readData 读到数据=%v\n", v)
        time.Sleep(time.Millisecond * 50)
    }
    wg.Done()
}
func main() {
    allChan := make(chan int, 100)

    wg.Add(1)
    go fn1(allChan)
    
    wg.Add(1)
    go fn2(allChan)

    wg.Wait()
    fmt.Println("读取完毕...")
}

Go 通过通道(channel)的机制,从语言和运行时层面保证了数据在 goroutine 之间传递的顺序一致性完整性

可以从以下几个层面来理解为什么“能确保”:

(1)通道的本质:先进先出的队列

在逻辑上,通道就是一个先进先出的队列。这意味着:

  • 发送顺序:当多个数据被发送到同一个通道时,它们在通道内部会按照发送的顺序排列。

  • 接收顺序:从通道接收数据时,一定是按照这个顺序取出。所以,fn1 发送的 1, 2, 3... 的顺序,会被 fn2 严格地以 1, 2, 3... 的顺序接收到。不会出现后发送的数据被先读到的情况。

(2)同步保证:发送与接收的配对

  Go 的通道操作(无缓冲或缓冲未满/未空时)是同步的:

  • 发送操作 (ch <- data) 会一直等待,直到有另一个 goroutine 准备好接收这个数据,或者数据被成功放入缓冲区。

  • 接收操作 (<-ch) 会一直等待,直到有另一个 goroutine 发送了数据,或者缓冲区中有数据可取。

  这种天然的同步机制确保了数据在被安全地“递到”接收方之前,发送方不会“反悔”或覆盖。

(3)内存可见性: happens-before 关系

  这是并发编程中一个更底层的保证。Go 的内存模型明确规定:

  • 对一个通道的发送操作,在另一个 goroutine 中对应的接收操作完成之前,就确保了“发生在前”(happens-before)的关系。

  • 这意味着,fn1 在写入 i+1 之前所做的所有内存修改,在 fn2 成功读取到这个值之后,都是完全可见的。你不会读到某个数据“写到一半”的状态。

(4)无数据竞争

  因为通道的这种同步特性,当通过通道进行通信时,两个 goroutine 实际上是在进行数据拷贝(值传递)或所有权转移(指针传递)。这使得它们不会同时对同一块内存进行非同步的读写,从而避免了数据竞争(data race)。数据不会因为并发访问而损坏或错乱。

(5)在带缓冲通道中的具体表现

  在你给出的代码中,通道 allChan 是有缓冲的(容量为 100):

  1. 写入fn1 负责往缓冲区里放数据。只要缓冲区没满,它就可以一直放,不会阻塞。

  2. 读取fn2 负责从缓冲区里取数据。只要缓冲区不为空,它就可以一直取。

  3. 保证:即使 fn1 已经写完了 100 个数据并关闭了通道,这 100 个数据依然安全地待在缓冲区里。fn2for range 循环会继续从缓冲区中按顺序读取,直到缓冲区被清空。这就像两个工人面对一个传送带,一个负责往传送带上放打包好的箱子,另一个负责从传送带上取箱子。只要箱子是好的,传送带是顺序的,那么取到的自然就是之前放进去的。

总结一下,能确保读到之前写进去的数据,是因为 Go 语言的通道被设计为一个并发安全的、先进先出的、同步的通信原语。它不仅仅是数据传输,更包含了同步和内存可见性的保证,这是 Go 并发哲学 “不要通过共享内存来通信,而要通过通信来共享内存” 的基石。

需求 2:goroutine 结合 channel 实现统计 1-120000 的数字中那些是素数?

package main

import (
        "fmt"
        "sync"
        "time"
)

// wg 是全局的 sync.WaitGroup,用于等待所有 goroutine 完成
var wg sync.WaitGroup

// putNum 向 intChan 通道中写入 0 到 119999 共 120000 个整数
func putNum(intChan chan int) {
        // 循环写入 120000 个数(i 从 0 到 119999)
        for i := 0; i < 120000; i++ {
                intChan <- i
        }

        // 数据写入完成后关闭 intChan,通知接收方不会再发送新数据
        close(intChan)
        // 标记该 goroutine 完成,WaitGroup 计数器减 1
        wg.Done()
}

// primeNum 从 intChan 读取数据,判断是否为素数,如果是则写入 primeChan
// 参数:
//   intChan   - 存放待判断整数的通道
//   primeChan - 存放判断结果为素数的通道
//   exitChan  - 用于通知主监控 goroutine 本任务已完成的通道
func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
        // 使用 for range 从 intChan 中循环读取数据,直到 intChan 关闭且数据读完
        for num := range intChan {
                // 假设 num 是素数
                var flag bool = true
                // 从 2 到 num-1 依次检查是否能整除
                // 注意:当 num <= 1 时,循环不会执行,flag 保持 true,因此会错误地将 0 和 1 判断为素数(此处作为示例暂不修正)
                for i := 2; i < num; i++ {
                        if num%i == 0 {
                                flag = false
                                break
                        }
                }
                // 如果 flag 仍为 true,说明 num 是素数,将其写入 primeChan
                if flag {
                        primeChan <- num
                }
        }
        // 当前 primeNum goroutine 完成任务,向 exitChan 发送一个信号
        exitChan <- true
        // 标记该 goroutine 完成
        wg.Done()
}

// printPrime 从 primeChan 读取素数并打印,直到 primeChan 关闭
func printPrime(primeChan chan int) {
        // for range 循环会一直读取,直到 primeChan 被关闭且缓冲区为空
        for v := range primeChan {
                fmt.Println(v)
        }
        // 标记该 goroutine 完成
        wg.Done()
}

func main() {
        // 记录程序开始时间
        start := time.Now().Unix()

        // 创建带缓冲的通道:
        // intChan   用于传递待判断的整数,容量 120000 可容纳所有输入
        // primeChan 用于传递素数结果,容量 200000 留有余量
        // exitChan  用于 primeNum goroutine 发送完成信号,容量与 primeNum 数量相同(8个)
        intChan := make(chan int, 120000)
        primeChan := make(chan int, 200000)
        exitChan := make(chan bool, 8)

        // 启动一个 goroutine 负责写入数据
        wg.Add(1)
        go putNum(intChan)

        // 启动 8 个 goroutine 并发执行素数判断
        for i := 0; i < 8; i++ {
                wg.Add(1)
                go primeNum(intChan, primeChan, exitChan)
        }

        // 启动一个 goroutine 负责打印素数结果
        wg.Add(1)
        go printPrime(primeChan)

        // 启动一个监控 goroutine,等待所有 primeNum goroutine 结束,然后关闭 primeChan
        wg.Add(1)
        go func() {
                // 循环从 exitChan 接收 8 个信号(对应 8 个 primeNum goroutine)
                for i := 0; i < 8; i++ {
                        <-exitChan
                }
                // 所有 primeNum 已完成,可以安全地关闭 primeChan
                // 关闭 primeChan 会使得 printPrime 中的 for range 循环结束
                close(primeChan)
                // 监控 goroutine 完成
                wg.Done()
        }()

        // 等待所有 goroutine 执行完毕
        wg.Wait()

        // 记录结束时间并计算耗时
        end := time.Now().Unix()
        fmt.Println("耗时:", end-start)
        fmt.Printf("main 线程退出")
}

7.6 单向管道

有的时候我们会将管道作为参数在多个任务函数间传递,很多时候我们在不同的任务函数中使用管道都会对其进行限制,比如限制管道在函数中只能发送或只能接收。

项目案例1:

package main

import "fmt"

func main() {
    // 默认情况下,管道是双向的
    // var chan1 chan int  // 可读可写

    // 声明为只写
    var chan2 chan <- int
    chan2 = make(chan int, 3)

    chan2 <- 20

    // num := <-chan2 // error

    fmt.Println("chan2=", chan2)

    // 3. 声明为只读
    var chan3 <- chan int
    num2 := <-chan3
    // chan3<- 30 //err
    fmt.Println("num2", num2)

}

7.7 select 多路复用

在某些场景下我们需要同时从多个通道接收数据。这个时候就可以用到 golang 中给我们提供的 select 多路复用。 通常情况通道在接收数据时,如果没有数据可以接收将会发生阻塞。 比如说下面代码来实现从多个通道接收数据的时候就会发生阻塞:

for{ 
    // 尝试从 ch1 接收值 
    data, ok := <-ch1 
    // 尝试从 ch2 接收值 
    data, ok := <-ch2 
    … 
}

这种方式虽然可以实现从多个管道接收值,但是运行性能会差很多。为了应对这种场景,Go 内置了 select 关键字,可以同时响应多个管道的操作。

select 的使用类似于 switch 语句,它有一系列 case 分支和一个默认的分支。每个 case 会对应一个管道的通信(接收或发送)过程。select 会一直等待,直到某个 case 的通信操作完成时,就会执行 case 分支对应的语句。具体格式如下:

select{ 
    case <-ch1: 
        ... 
    case data := <-ch2: 
        ... 
    case ch3<-data: 
        ...
    default: 
        默认操作 
}

举个小例子来演示下 select 的使用:

package main

import (
        "fmt"
        "time"
)

func main() {
        // 创建一个整型通道 intChan,缓冲区大小为 10
        intChan := make(chan int, 10)
        // 向 intChan 中写入 10 个整数(0 到 9)
        for i := 0; i < 10; i++ {
                intChan <- i
        }

        // 创建一个字符串通道 stringChan,缓冲区大小为 5
        stringChan := make(chan string, 5)
        // 向 stringChan 中写入 5 个字符串,格式为 "hello0", "hello1", ... "hello4"
        for i := 0; i < 5; i++ {
                stringChan <- "hello" + fmt.Sprintf("%d", i) // fmt.Sprintf 将整数 i 转换为字符串
        }

        // 无限循环,使用 select 多路复用从两个通道中读取数据
        for {
                select {
                // 如果 intChan 可读,则读取一个值赋给 v,并执行该分支
                case v := <-intChan:
                        fmt.Printf("从intChan读取的数据%d\n", v)
                // 如果 stringChan 可读,则读取一个值赋给 v,并执行该分支
                case v := <-stringChan:
                        fmt.Printf("从stringChan读取的数据%s\n", v)
                // 如果所有 case 都阻塞(即所有通道当前都无数据可读),则执行 default 分支
                default:
                        fmt.Printf("都取不到了,不玩了。程序员可以加入逻辑\n")
                        time.Sleep(time.Second) // 休眠一秒(模拟做一些其他工作)
                        return                   // 退出程序
                }
        }
}

为了帮助你更直观地理解 selectcase 的运行机制,我们可以把它想象成一个多路复用器,同时监听多个通道的通信状态。

(1)核心行为:

  1. 同时监听select 语句会同时关注所有 case 后面的通道操作(无论读写)。它不会按代码顺序依次检查,而是一视同仁地等待。

  2. 随机选择:当多个通道同时就绪(即有数据可读或可写)时,select随机选择一个 case 来执行。这保证了公平性,避免某个通道因为位置靠前而被“偏爱”。

  3. 阻塞等待:如果所有通道都未就绪,且没有 default,那么当前 goroutine 会阻塞,直到至少有一个通道就绪。

  4. default 保底:如果存在 default,当所有通道都未就绪时,会立即执行 default,从而避免阻塞。

(2)类比说明

想象你在同时听两个对讲机(通道 A 和 B):

  • 同时听着两个对讲机,一旦有声音(数据到达),你就会接听。

  • 如果两个对讲机同时响起,你会随机选择其中一个先接听。

  • 如果都没有声音,但你有其他事情可做(default),你就去做其他事;否则,你就一直等下去,直到有声音。

(3)代码示例剖析

示例1:多个就绪,随机选择

ch1 := make(chan int, 1)
ch2 := make(chan int, 1)
ch1 <- 1
ch2 <- 2

select {
case v := <-ch1:
    fmt.Println("ch1:", v)
case v := <-ch2:
    fmt.Println("ch2:", v)
}

由于两个通道都已存有数据,两个 case 同时就绪,select 会随机输出 "ch1:1" 或 "ch2:2"。你可以多次运行,结果可能不同。

示例2:一个就绪,执行那个

ch1 := make(chan int)

go func() {
    time.Sleep(1 * time.Second)
    ch1 <- 1
}()

select {
case v := <-ch1:
    fmt.Println("ch1:", v)  // 1秒后执行
case v := <-ch2:            // ch2 无数据,永远阻塞
    fmt.Println("ch2:", v)
}

此时只有 ch1 会在1秒后就绪,所以 select 会等待1秒后执行第一个 case,第二个 case 因为一直未就绪而被忽略。

示例3:default 的作用

ch := make(chan int)
select {
case v := <-ch:
    fmt.Println(v)
default:
    fmt.Println("通道无数据,执行默认逻辑")
}

这里 ch 无数据且未关闭,所有 case 阻塞,所以立即执行 default,输出 "通道无数据,执行默认逻辑",程序不会阻塞。

总结:

  • select 中的 case同时监听,不是顺序判断。

  • 多个就绪时随机选择

  • 全部阻塞时,有 defaultdefault,无 default 则阻塞等待。

  • 这种机制让你能够优雅地处理多路通道通信,而不必担心某个通道被忽略。

7.8 Golang 并发安全和锁

7.8.1 互斥锁

互斥锁是传统并发编程中对共享资源进行访问控制的主要手段,它由标准库 sync 中的 Mutex 结构体类型表示。sync.Mutex 类型只有两个公开的指针方法,Lock 和 Unlock。Lock 锁定当前的共享资源,Unlock 进行解锁。

有问题代码:

package main

import (
    "fmt"
    "time"
)

var count = 0

func test() {
    count++
    fmt.Println("the count is : ", count)
    time.Sleep(time.Millisecond)
}
func main() {
    for r := 0; r < 100; r++ {
        go test()
    }
    time.Sleep(time.Second)
}

go build -race main.go 然后我们运行 main.exe 就知道到底哪里存在互斥

使用互斥锁能够保证同一时间有且只有一个 goroutine 进入临界区,其他的 goroutine 则在等待锁;当互斥锁释放后,等待的 goroutine 才可以获取锁进入临界区,多个 goroutine 同时等待一个锁时,唤醒的策略是随机的。

虽然使用互斥锁能解决资源争夺问题,但是并不完美,通过全局变量加锁同步来实现通讯,并不利于多个协程对全局变量的读写操作。这个时候我们也可以通过另一种方式来实现上面的功能管道(Channel)。

7.8.2 读写互斥锁

互斥锁的本质是当一个 goroutine 访问的时候,其他 goroutine 都不能访问。这样在资源同步,避免竞争的同时也降低了程序的并发性能。程序由原来的并行执行变成了串行执行。

其实,当我们对一个不会变化的数据只做“读”操作的话,是不存在资源竞争的问题的。因为数据是不变的,不管怎么读取,多少 goroutine 同时读取,都是可以的。 所以问题不是出在“读”上,主要是修改,也就是“写”。修改的数据要同步,这样其他 goroutine 才可以感知到。所以真正的互斥应该是读取和修改、修改和修改之间,读和读是没有互斥操作的必要的。 因此,衍生出另外一种锁,叫做读写锁

读写锁可以让多个读操作并发,同时读取,但是对于写操作是完全互斥的。也就是说,当一个 goroutine 进行写操作的时候,其他 goroutine 既不能进行读操作,也不能进行写操作。

GO 中的读写锁由结构体类型 sync.RWMutex 表示。此类型的方法集合中包含两对方法:一组是对写操作的锁定和解锁,简称“写锁定”和“写解锁”:

func (*RWMutex)Lock() 
func (*RWMutex)Unlock()

另一组表示对读操作的锁定和解锁,简称为“读锁定”与“读解锁”:

func (*RWMutex)RLock() 
func (*RWMutex)RUnlock()读写锁示例:
package main

import (
    "fmt"
    "sync"
    "time"
)

var count int
var mutex sync.RWMutex
var wg sync.WaitGroup

// 写的方法
func write() {
    mutex.Lock()
    fmt.Println("执行写操作")
    time.Sleep(time.Second * 3)

    mutex.Unlock()

    wg.Done()
}

// 读的方法
func read() {
    mutex.RLock()
    fmt.Println("执行读操作")
    time.Sleep(time.Second * 3)
    mutex.RUnlock()
    wg.Done()
}

func main() {
    // 开启 10个 协程执行写操作
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go read()
    }

    // 开启 10个协程执行读操作
    for i := 0; i < 10; i++ {
        // wg.Add(1) 把 WaitGroup 计数器加 1,表示“新增 1 个待完成的任务”,
        // 后面必须对应一次 wg.Done() 来减 1,否则 wg.Wait() 会永远阻塞。
        wg.Add(1)
        go write() // 启动一个新的 goroutine 去执行函数 write(),与当前线程并发运行,不阻塞调用者。
    }

    wg.Wait() // 等所有后台任务跑完再往下走
}

7.9 Goroutine Recover 解决协程中出现的 Panic

package main

import (
    "fmt"
    "time"
)

// sayHello 是一个简单的函数,循环 10 次,每秒打印一次 "hello world"
func sayHello() {
    for i := 0; i < 10; i++ {
        time.Sleep(time.Second) // 暂停 1 秒,模拟耗时操作
        fmt.Println("hello world")
    }
}

// test 函数演示了如何使用 defer + recover 捕获 panic,避免程序崩溃
func test() {
    // 使用 defer 定义一个匿名函数,在 test 函数返回前执行
    defer func() {
        // recover 用于捕获 panic 信息,如果没有 panic 则返回 nil
        if err := recover(); err != nil {
            // 捕获到 panic,打印错误信息,但程序不会终止
            fmt.Println("test()发生错误", err)
        }
    }()

    // 定义一个 map 变量,但没有使用 make 初始化,因此 myMap 为 nil
    var myMap map[int]string
    // 向 nil map 赋值会引发 panic: assignment to entry in nil map
    // 但由于上面的 defer 中使用了 recover,panic 被捕获,程序不会崩溃
    myMap[0] = "golang" // error
}

func main() {
    // 启动一个 goroutine 并发执行 sayHello 函数
    go sayHello()
    // 启动另一个 goroutine 并发执行 test 函数
    go test()

    // 主 goroutine 循环 10 次,每秒打印一次 "main() ok=" 及其索引
    for i := 0; i < 10; i++ {
        fmt.Println("main() ok=", i)
        time.Sleep(time.Second) // 暂停 1 秒,保持与子 goroutine 同步
    }
    // 主 goroutine 结束,程序退出,但其他 goroutine 尚未完成时也会被强制终止
    // 不过由于循环次数相同(10次),所有 goroutine 大约同时结束
}

7.10 练习题:Web3.0并发编程应用

练习1:多节点区块链数据同步

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// BlockchainNode 结构体表示一个区块链节点
type BlockchainNode struct {
    NodeID       string       // 节点唯一标识
    BlockData    []string     // 节点存储的区块数据(字符串切片)
    DataSyncChan chan string  // 用于节点间直接同步数据的通道
    Running      bool         // 节点运行状态标志
    Mutex        sync.RWMutex // 读写互斥锁,保护对 BlockData 的并发访问
}

// NewNode 创建一个新的区块链节点
func NewNode(nodeID string) *BlockchainNode {
    return &BlockchainNode{
        NodeID:       nodeID,
        BlockData:    []string{"创世区块"},       // 初始化时包含创世区块
        DataSyncChan: make(chan string, 100), // 带缓冲的同步通道
        Running:      true,
    }
}

// StartSync 启动节点的同步循环,监听来自网络或直接同步的数据
func (node *BlockchainNode) StartSync(networkChan chan string, wg *sync.WaitGroup) {
    defer wg.Done() // 函数结束时通知 WaitGroup 计数器减一

    fmt.Printf("🟢 节点 %s 开始同步数据\n", node.NodeID)

    for node.Running { // 只要节点还在运行,就持续监听
        select {
        case newBlock := <-node.DataSyncChan:
            // 从其他节点直接同步过来的新区块
            node.Mutex.Lock()
            node.BlockData = append(node.BlockData, newBlock) // 追加到本地链
            node.Mutex.Unlock()

            fmt.Printf("📥 节点 %s 接收到区块: %s\n", node.NodeID, newBlock)

            // 模拟处理新区块所需的时间(随机延迟)
            time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond)

        case newBlock := <-networkChan:
            // 从网络广播中接收到的新区块
            node.Mutex.Lock()
            node.BlockData = append(node.BlockData, newBlock)
            node.Mutex.Unlock()

            fmt.Printf("🌐 节点 %s 从网络接收到: %s\n", node.NodeID, newBlock)

        case <-time.After(2 * time.Second):
            // 超时检测:如果2秒内没有数据,检查节点是否应该停止
            if !node.Running {
                return
            }
            // 否则继续循环
        }
    }

    fmt.Printf("🔴 节点 %s 停止同步\n", node.NodeID)
}

// GenerateBlock 生成一个新的区块,并广播到网络
func (node *BlockchainNode) GenerateBlock(blockHeight int, networkChan chan string) {
    // 读取当前最新区块(只读锁)
    node.Mutex.RLock()
    latestBlock := node.BlockData[len(node.BlockData)-1]
    node.Mutex.RUnlock()

    // 构造新区块的描述字符串
    newBlock := fmt.Sprintf("区块%d(基于%s)", blockHeight, latestBlock)

    // 将新区块追加到本地链(写锁)
    node.Mutex.Lock()
    node.BlockData = append(node.BlockData, newBlock)
    node.Mutex.Unlock()

    fmt.Printf("⛏️  节点 %s 生成新区块: %s\n", node.NodeID, newBlock)

    // 尝试将新区块广播到网络(非阻塞,带超时)
    select {
    case networkChan <- newBlock:
        fmt.Printf("📤 节点 %s 广播区块到网络\n", node.NodeID)
    case <-time.After(100 * time.Millisecond):
        fmt.Printf("⚠️  节点 %s 广播超时\n", node.NodeID)
    }
}

// SyncToOtherNode 直接将数据同步到另一个节点
func (node *BlockchainNode) SyncToOtherNode(targetNode *BlockchainNode, data string) {
    select {
    case targetNode.DataSyncChan <- data: // 将数据发送到目标节点的同步通道
        fmt.Printf("🔁 节点 %s → 节点 %s: %s\n", node.NodeID, targetNode.NodeID, data)
    case <-time.After(50 * time.Millisecond):
        fmt.Printf("⏰ 节点 %s 同步到 %s 超时\n", node.NodeID, targetNode.NodeID)
    }
}

// NetworkSimulator 模拟区块链网络的传播行为
func NetworkSimulator(networkChan chan string, stopSignal chan bool) {
    fmt.Println("🌐 区块链网络启动")

    for {
        select {
        case <-networkChan: // 从网络通道接收数据,但不关心具体内容
            // 模拟网络传播延迟(随机 0~200ms)
            time.Sleep(time.Duration(rand.Intn(200)) * time.Millisecond)
            // 实际数据已通过 networkChan 分发到各个节点的网络接收分支

        case <-stopSignal: // 收到停止信号,退出
            fmt.Println("🌐 区块链网络停止")
            return

        case <-time.After(5 * time.Second): // 每5秒输出一次网络状态
            fmt.Println("🌐 网络运行中...")
        }
    }
}

// PerformConsistencyCheck 检查所有节点的区块数据是否达成共识
func PerformConsistencyCheck(nodes []*BlockchainNode) {
    consensusCount := 0 // 达成共识的区块数量

    // 以第一个节点为基准,逐个区块检查其他节点是否一致
    for i := 0; i < len(nodes[0].BlockData); i++ {
        block := nodes[0].BlockData[i]
        consistent := true

        // 检查其余节点是否包含该区块且内容相同
        for _, node := range nodes[1:] {
            if i >= len(node.BlockData) || node.BlockData[i] != block {
                consistent = false
                break
            }
        }

        if consistent {
            consensusCount++
            fmt.Printf("✅ 区块 %d 达成共识: %s\n", i, block)
        } else {
            fmt.Printf("❌ 区块 %d 未达成共识\n", i)
            break // 一旦出现不一致,停止检查(因为之后的区块必然不一致)
        }
    }

    totalBlocks := len(nodes[0].BlockData) // 总区块数
    consensusRate := float64(consensusCount) / float64(totalBlocks) * 100
    fmt.Printf("共识率: %.1f%% (%d/%d)\n", consensusRate, consensusCount, totalBlocks)
}

func main() {
    fmt.Println("🔗 多节点区块链数据同步")
    fmt.Println("========================")

    rand.Seed(time.Now().UnixNano()) // 初始化随机数种子

    // 创建网络通道和停止信号通道
    networkChan := make(chan string, 1000) // 广播网络通道,缓冲区较大
    stopSignal := make(chan bool)          // 用于停止网络模拟器

    // 创建多个节点
    nodes := []*BlockchainNode{
        NewNode("北京节点"),
        NewNode("上海节点"),
        NewNode("广州节点"),
        NewNode("深圳节点"),
        NewNode("杭州节点"),
    }

    var wg sync.WaitGroup // 等待组,用于等待所有节点 goroutine 完成

    // 启动网络模拟器 goroutine
    go NetworkSimulator(networkChan, stopSignal)

    // 启动所有节点的同步 goroutine
    for _, node := range nodes {
        wg.Add(1)
        go node.StartSync(networkChan, &wg)
    }

    // 模拟区块生成和网络活动(在另一个 goroutine 中执行)
    go func() {
        blockHeight := 1

        for i := 0; i < 10; i++ { // 生成10个区块
            // 随机选择一个节点生成区块
            randomNode := nodes[rand.Intn(len(nodes))]
            randomNode.GenerateBlock(blockHeight, networkChan)
            blockHeight++

            // 每两个循环一次,模拟节点间直接同步
            if i%2 == 0 && len(nodes) > 1 {
                sourceNode := nodes[rand.Intn(len(nodes))]
                targetNode := nodes[rand.Intn(len(nodes))]
                if sourceNode != targetNode {
                    // 获取源节点的最新区块数据
                    sourceNode.Mutex.RLock()
                    if len(sourceNode.BlockData) > 0 {
                        latestData := sourceNode.BlockData[len(sourceNode.BlockData)-1]
                        sourceNode.Mutex.RUnlock()
                        // 同步到目标节点
                        sourceNode.SyncToOtherNode(targetNode, latestData)
                    } else {
                        sourceNode.Mutex.RUnlock()
                    }
                }
            }

            time.Sleep(500 * time.Millisecond) // 每次生成间隔500ms
        }

        // 停止所有节点
        fmt.Println("\n🛑 停止所有节点...")
        for _, node := range nodes {
            node.Running = false // 设置运行标志为 false,节点将退出循环
        }

        // 停止网络模拟器
        stopSignal <- true
    }()

    // 等待所有节点 goroutine 结束
    wg.Wait()

    // 显示各节点的最终状态
    fmt.Println("\n📊 各节点最终状态:")
    for _, node := range nodes {
        node.Mutex.RLock()
        fmt.Printf("节点 %s: %d 个区块\n", node.NodeID, len(node.BlockData))
        if len(node.BlockData) > 0 {
            fmt.Printf("  最新区块: %s\n", node.BlockData[len(node.BlockData)-1])
        }
        node.Mutex.RUnlock()
    }

    // 执行一致性检查
    fmt.Println("\n🔍 一致性检查:")
    PerformConsistencyCheck(nodes)
}

练习2:并发加密货币价格聚合器

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// PriceData 表示一个价格数据点
type PriceData struct {
    Exchange  string    // 交易所名称
    Token     string    // 代币符号
    Price     float64   // 价格
    Timestamp time.Time // 时间戳
    Source    string    // 数据来源(API / DeFi)
}

// PriceAggregator 是价格聚合器,负责收集和汇总价格
type PriceAggregator struct {
    tokenPrices     map[string][]PriceData // 按代币存储的价格历史
    mu              sync.RWMutex            // 读写锁保护并发访问
    priceUpdateChan chan PriceData          // 价格更新通道
}

// NewPriceAggregator 创建一个新的价格聚合器
func NewPriceAggregator() *PriceAggregator {
    return &PriceAggregator{
        tokenPrices:     make(map[string][]PriceData),
        priceUpdateChan: make(chan PriceData, 100),
    }
}

// simulateExchangeAPI 模拟一个交易所的价格数据流
func simulateExchangeAPI(exchange string, token string, aggregator *PriceAggregator, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("📡 启动 %s %s 价格流\n", exchange, token)

    // 随机间隔 1~3 秒推送一次价格
    ticker := time.NewTicker(time.Duration(1+rand.Intn(3)) * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        // 模拟不同代币的基础价格
        basePrice := 3000.0
        if token == "BTC" {
            basePrice = 45000.0
        } else if token == "SOL" {
            basePrice = 100.0
        }

        // 价格随机波动 ±100
        price := basePrice + (rand.Float64()*200 - 100)
        data := PriceData{
            Exchange:  exchange,
            Token:     token,
            Price:     price,
            Timestamp: time.Now(),
            Source:    "API",
        }

        // 尝试发送到聚合器,超时则放弃
        select {
        case aggregator.priceUpdateChan <- data:
            // 发送成功
        case <-time.After(100 * time.Millisecond):
            fmt.Printf("⏰ %s 价格更新超时\n", exchange)
        }
    }
}

// simulateDefiProtocol 模拟一个 DeFi 协议的价格数据流
func simulateDefiProtocol(protocol string, token string, aggregator *PriceAggregator, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("🔄 启动 %s %s 价格流\n", protocol, token)

    ticker := time.NewTicker(time.Duration(2+rand.Intn(2)) * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        basePrice := 3000.0
        if token == "BTC" {
            basePrice = 45000.0
        }

        // DeFi 价格波动更大 ±200
        price := basePrice + (rand.Float64()*400 - 200)
        data := PriceData{
            Exchange:  protocol,
            Token:     token,
            Price:     price,
            Timestamp: time.Now(),
            Source:    "DeFi",
        }

        aggregator.priceUpdateChan <- data
    }
}

// processPriceUpdates 持续处理价格更新通道中的数据
func (aggregator *PriceAggregator) processPriceUpdates(wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Println("🔄 启动价格更新处理器")

    for data := range aggregator.priceUpdateChan {
        aggregator.mu.Lock()

        // 初始化该代币的价格列表(如果需要)
        if aggregator.tokenPrices[data.Token] == nil {
            aggregator.tokenPrices[data.Token] = []PriceData{}
        }
        // 追加新数据
        aggregator.tokenPrices[data.Token] = append(aggregator.tokenPrices[data.Token], data)

        // 只保留最近的100条记录
        if len(aggregator.tokenPrices[data.Token]) > 100 {
            aggregator.tokenPrices[data.Token] = aggregator.tokenPrices[data.Token][1:]
        }

        aggregator.mu.Unlock()

        fmt.Printf("💰 %s %s: $%.2f\n", data.Exchange, data.Token, data.Price)
    }
}

// calculateAggregatedPrice 计算指定代币的聚合价格(基于最近30秒的有效数据)
func (aggregator *PriceAggregator) calculateAggregatedPrice(token string) (float64, int, error) {
    aggregator.mu.RLock()
    defer aggregator.mu.RUnlock()

    prices := aggregator.tokenPrices[token]
    if len(prices) == 0 {
        return 0, 0, fmt.Errorf("没有价格数据")
    }

    // 只考虑最近30秒内的数据
    var validPrices []float64
    cutoff := time.Now().Add(-30 * time.Second)

    for _, data := range prices {
        if data.Timestamp.After(cutoff) {
            validPrices = append(validPrices, data.Price)
        }
    }

    if len(validPrices) == 0 {
        return 0, 0, fmt.Errorf("没有有效价格数据")
    }

    // 使用平均值作为聚合价格(原代码注释中写的是中位数,但实际实现是平均值)
    return calculateAverage(validPrices), len(validPrices), nil
}

// calculateAverage 计算浮点数切片的平均值
func calculateAverage(values []float64) float64 {
    sum := 0.0
    for _, v := range values {
        sum += v
    }
    return sum / float64(len(values))
}

// priceMonitor 定时监控并输出聚合价格
func priceMonitor(aggregator *PriceAggregator, tokens []string, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Println("👀 启动价格监控器")

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        fmt.Println("\n📊 价格监控报告:")

        for _, token := range tokens {
            aggPrice, dataPoints, err := aggregator.calculateAggregatedPrice(token)
            if err != nil {
                fmt.Printf("  %s: %v\n", token, err)
                continue
            }

            fmt.Printf("  %s: $%.2f (基于%d个数据源)\n", token, aggPrice, dataPoints)

            // 检测价格异常
            aggregator.detectPriceAnomaly(token, aggPrice)
        }
    }
}

// detectPriceAnomaly 检测价格异常波动(变化超过5%)
func (aggregator *PriceAggregator) detectPriceAnomaly(token string, currentPrice float64) {
    aggregator.mu.RLock()
    defer aggregator.mu.RUnlock()

    prices := aggregator.tokenPrices[token]
    if len(prices) < 5 {
        return
    }

    // 与上一个价格比较
    if len(prices) >= 2 {
        prevPrice := prices[len(prices)-2].Price
        changePercent := (currentPrice - prevPrice) / prevPrice * 100

        if abs(changePercent) > 5 {
            fmt.Printf("    🚨 %s 价格异常波动: %.2f%%\n", token, changePercent)
        }
    }
}

// abs 返回浮点数的绝对值
func abs(x float64) float64 {
    if x < 0 {
        return -x
    }
    return x
}

// generateFinalReport 生成最终的统计报告
func generateFinalReport(aggregator *PriceAggregator, tokens []string) {
    for _, token := range tokens {
        aggregator.mu.RLock()
        prices := aggregator.tokenPrices[token]
        aggregator.mu.RUnlock()

        if len(prices) == 0 {
            fmt.Printf("%s: 无数据\n", token)
            continue
        }

        // 统计各个数据源的贡献数量
        sourceCount := make(map[string]int)
        for _, data := range prices {
            sourceCount[data.Exchange]++
        }

        aggPrice, validPoints, _ := aggregator.calculateAggregatedPrice(token)

        fmt.Printf("\n%s 报告:\n", token)
        fmt.Printf("  聚合价格: $%.2f\n", aggPrice)
        fmt.Printf("  总数据点: %d\n", len(prices))
        fmt.Printf("  有效数据点: %d\n", validPoints)
        fmt.Printf("  数据源: %d 个\n", len(sourceCount))

        fmt.Println("  数据源分布:")
        for source, count := range sourceCount {
            fmt.Printf("    %s: %d\n", source, count)
        }
    }
}

func main() {
    fmt.Println("💰 并发加密货币价格聚合器")
    fmt.Println("========================")

    rand.Seed(time.Now().UnixNano())

    // 创建聚合器实例
    aggregator := NewPriceAggregator()

    var wg sync.WaitGroup

    // 启动价格更新处理 goroutine
    wg.Add(1)
    go aggregator.processPriceUpdates(&wg)

    tokens := []string{"ETH", "BTC", "SOL"}
    exchanges := []string{"Binance", "Coinbase", "OKX", "Kraken"}
    defiProtocols := []string{"Uniswap", "Curve", "Balancer"}

    // 启动所有交易所的数据源
    for _, exchange := range exchanges {
        for _, token := range tokens {
            wg.Add(1)
            go simulateExchangeAPI(exchange, token, aggregator, &wg)
        }
    }

    // 启动所有 DeFi 协议的数据源
    for _, protocol := range defiProtocols {
        for _, token := range tokens {
            wg.Add(1)
            go simulateDefiProtocol(protocol, token, aggregator, &wg)
        }
    }

    // 启动价格监控器
    wg.Add(1)
    go priceMonitor(aggregator, tokens, &wg)

    // 让程序运行30秒,收集数据
    time.Sleep(30 * time.Second)

    // 输出最终报告
    fmt.Println("\n📈 最终价格报告:")
    generateFinalReport(aggregator, tokens)

    // 关闭价格更新通道,通知处理 goroutine 退出
    close(aggregator.priceUpdateChan)

    // 等待所有 goroutine 结束
    wg.Wait()
}

练习3:DeFi协议并发用户操作模拟

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// UserOperation 表示一个用户操作
type UserOperation struct {
    UserID        string    // 用户ID
    OperationType string    // 操作类型
    Token         string    // 代币
    Amount        float64   // 数量
    Timestamp     time.Time // 时间
}

// DeFiProtocol 表示一个 DeFi 协议
type DeFiProtocol struct {
    Name           string             // 协议名称
    TotalLiquidity float64            // 总流动性
    UserBalances   map[string]float64 // 用户余额
    Operations     []UserOperation    // 操作记录
    Mu             sync.RWMutex       // 读写锁
    OperationChan  chan UserOperation // 操作通道
}

// NewDeFiProtocol 创建一个新的 DeFi 协议
func NewDeFiProtocol(name string, initialLiquidity float64) *DeFiProtocol {
    return &DeFiProtocol{
        Name:           name,
        TotalLiquidity: initialLiquidity,
        UserBalances:   make(map[string]float64),
        OperationChan:  make(chan UserOperation, 100),
    }
}

// ProcessOperations 处理用户操作
func (protocol *DeFiProtocol) ProcessOperations(wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("🔄 %s 开始处理用户操作\n", protocol.Name)

    for op := range protocol.OperationChan {
        protocol.Mu.Lock()

        switch op.OperationType {
        case "存款":
            protocol.handleDeposit(op)
        case "取款":
            protocol.handleWithdraw(op)
        case "交换":
            protocol.handleSwap(op)
        case "质押":
            protocol.handleStake(op)
        }

        protocol.Operations = append(protocol.Operations, op)
        protocol.Mu.Unlock()

        // 模拟处理时间
        time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
    }

    fmt.Printf("🛑 %s 停止处理用户操作\n", protocol.Name)
}

// handleDeposit 处理存款操作
func (protocol *DeFiProtocol) handleDeposit(op UserOperation) {
    protocol.UserBalances[op.UserID] += op.Amount
    protocol.TotalLiquidity += op.Amount

    fmt.Printf("💰 %s %s 存款 %.2f %s | 余额: %.2f\n",
        protocol.Name, op.UserID, op.Amount, op.Token, protocol.UserBalances[op.UserID])
}

// handleWithdraw 处理取款操作
func (protocol *DeFiProtocol) handleWithdraw(op UserOperation) {
    if protocol.UserBalances[op.UserID] >= op.Amount {
        protocol.UserBalances[op.UserID] -= op.Amount
        protocol.TotalLiquidity -= op.Amount

        fmt.Printf("💸 %s %s 取款 %.2f %s | 余额: %.2f\n",
            protocol.Name, op.UserID, op.Amount, op.Token, protocol.UserBalances[op.UserID])
    } else {
        fmt.Printf("❌ %s %s 取款失败 | 余额不足: %.2f < %.2f\n",
            protocol.Name, op.UserID, protocol.UserBalances[op.UserID], op.Amount)
    }
}

// handleSwap 处理交换操作
func (protocol *DeFiProtocol) handleSwap(op UserOperation) {
    if protocol.UserBalances[op.UserID] >= op.Amount {
        // 模拟交换逻辑(简化)
        protocol.UserBalances[op.UserID] -= op.Amount
        swapOutput := op.Amount * 0.99 // 1%手续费

        fmt.Printf("🔄 %s %s 交换 %.2f %s → %.2f USDC\n",
            protocol.Name, op.UserID, op.Amount, op.Token, swapOutput)
    } else {
        fmt.Printf("❌ %s %s 交换失败 | 余额不足\n", protocol.Name, op.UserID)
    }
}

// handleStake 处理质押操作
func (protocol *DeFiProtocol) handleStake(op UserOperation) {
    if protocol.UserBalances[op.UserID] >= op.Amount {
        protocol.UserBalances[op.UserID] -= op.Amount

        fmt.Printf("🔒 %s %s 质押 %.2f %s\n",
            protocol.Name, op.UserID, op.Amount, op.Token)
    } else {
        fmt.Printf("❌ %s %s 质押失败 | 余额不足\n", protocol.Name, op.UserID)
    }
}

// SimulateUser 模拟一个用户的操作行为
func SimulateUser(userID string, protocols []*DeFiProtocol, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("👤 用户 %s 开始操作\n", userID)

    operationTypes := []string{"存款", "取款", "交换", "质押"}
    tokens := []string{"ETH", "USDC", "BTC"}

    for i := 0; i < 8; i++ {
        // 随机选择协议和操作
        protocol := protocols[rand.Intn(len(protocols))]
        opType := operationTypes[rand.Intn(len(operationTypes))]
        token := tokens[rand.Intn(len(tokens))]
        amount := 50.0 + rand.Float64()*200 // 50-250

        op := UserOperation{
            UserID:        userID,
            OperationType: opType,
            Token:         token,
            Amount:        amount,
            Timestamp:     time.Now(),
        }

        // 发送操作到协议
        select {
        case protocol.OperationChan <- op:
            // 发送成功
        case <-time.After(100 * time.Millisecond):
            fmt.Printf("⏰ 用户 %s 操作超时\n", userID)
        }

        time.Sleep(time.Duration(500+rand.Intn(500)) * time.Millisecond)
    }

    fmt.Printf("👤 用户 %s 完成操作\n", userID)
}

// MonitorProtocols 监控所有协议的状态
func MonitorProtocols(protocols []*DeFiProtocol, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Println("📊 启动协议监控器")

    ticker := time.NewTicker(3 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        fmt.Println("\n📈 协议状态监控:")

        for _, protocol := range protocols {
            protocol.Mu.RLock()

            fmt.Printf("%s:\n", protocol.Name)
            fmt.Printf("  总流动性: $%.2f\n", protocol.TotalLiquidity)
            fmt.Printf("  用户数量: %d\n", len(protocol.UserBalances))
            fmt.Printf("  操作记录: %d 条\n", len(protocol.Operations))

            // 计算活跃用户
            activeUsers := 0
            for _, balance := range protocol.UserBalances {
                if balance > 0 {
                    activeUsers++
                }
            }
            fmt.Printf("  活跃用户: %d\n", activeUsers)

            protocol.Mu.RUnlock()
        }
    }
}

// GenerateReport 生成所有协议的最终报告
func GenerateReport(protocols []*DeFiProtocol) {
    totalLiquidity := 0.0
    totalUsers := 0
    totalOps := 0

    for _, protocol := range protocols {
        protocol.Mu.RLock()

        totalLiquidity += protocol.TotalLiquidity

        userCount := len(protocol.UserBalances)
        totalUsers += userCount

        opCount := len(protocol.Operations)
        totalOps += opCount

        fmt.Printf("\n%s 详细报告:\n", protocol.Name)
        fmt.Printf("  最终流动性: $%.2f\n", protocol.TotalLiquidity)
        fmt.Printf("  用户数量: %d\n", userCount)
        fmt.Printf("  操作数量: %d\n", opCount)

        // 用户余额排名
        fmt.Println("  用户余额排名:")
        rank := 0
        for userID, balance := range protocol.UserBalances {
            if rank < 3 { // 只显示前3名
                fmt.Printf("    %s: $%.2f\n", userID, balance)
                rank++
            }
        }

        protocol.Mu.RUnlock()
    }

    fmt.Printf("\n📊 总体统计:\n")
    fmt.Printf("  总流动性: $%.2f\n", totalLiquidity)
    fmt.Printf("  总用户数: %d\n", totalUsers)
    fmt.Printf("  总操作数: %d\n", totalOps)
    fmt.Printf("  平均每用户操作数: %.1f\n", float64(totalOps)/float64(totalUsers))
}

func main() {
    fmt.Println("🔄 DeFi协议并发用户操作模拟")
    fmt.Println("========================")

    rand.Seed(time.Now().UnixNano())

    // 创建多个DeFi协议
    protocols := []*DeFiProtocol{
        NewDeFiProtocol("Uniswap V3", 1000000),
        NewDeFiProtocol("Compound", 500000),
        NewDeFiProtocol("Aave V3", 800000),
        NewDeFiProtocol("Curve Finance", 600000),
    }

    var wg sync.WaitGroup

    // 启动所有协议的操作处理器
    for _, protocol := range protocols {
        wg.Add(1)
        go protocol.ProcessOperations(&wg)
    }

    // 启动协议监控器
    wg.Add(1)
    go MonitorProtocols(protocols, &wg)

    // 模拟多个用户并发操作
    users := []string{"Alice", "Bob", "Charlie", "David", "Eve", "Frank"}

    for _, user := range users {
        wg.Add(1)
        go SimulateUser(user, protocols, &wg)
    }

    // 等待用户操作完成
    time.Sleep(15 * time.Second)

    // 关闭所有协议的操作通道
    for _, protocol := range protocols {
        close(protocol.OperationChan)
    }

    // 等待所有操作完成
    wg.Wait()

    // 生成最终报告
    fmt.Println("\n📋 最终协议报告:")
    GenerateReport(protocols)
}

练习4:智能合约事件并发处理

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// ContractEvent 表示一个智能合约事件
type ContractEvent struct {
    EventType       string                 // 事件类型
    ContractAddress string                 // 合约地址
    TxHash          string                 // 交易哈希
    BlockNumber     uint64                 // 块高
    LogIndex        uint                   // 日志索引
    Data            map[string]interface{} // 事件数据
    Timestamp       time.Time              // 时间戳
}

// EventProcessor 事件处理器
type EventProcessor struct {
    Name            string             // 处理器名称
    ProcessedEvents map[string]int     // 处理的事件统计
    Mu              sync.RWMutex       // 读写锁
    EventChan       chan ContractEvent // 事件通道
}

// NewEventProcessor 创建一个新的事件处理器
func NewEventProcessor(name string) *EventProcessor {
    return &EventProcessor{
        Name:            name,
        ProcessedEvents: make(map[string]int),
        EventChan:       make(chan ContractEvent, 100),
    }
}

// StartProcessing 开始处理事件
func (p *EventProcessor) StartProcessing(wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("🔄 %s 开始处理事件\n", p.Name)

    for event := range p.EventChan {
        // 模拟处理时间
        processingTime := time.Duration(50+rand.Intn(100)) * time.Millisecond
        time.Sleep(processingTime)

        p.Mu.Lock()
        p.ProcessedEvents[event.EventType]++
        p.Mu.Unlock()

        fmt.Printf("📨 %s 处理 %s 事件 | 块高: %d | 耗时: %v\n",
            p.Name, event.EventType, event.BlockNumber, processingTime)

        // 根据事件类型执行特定逻辑
        p.handleSpecificEvent(event)
    }

    fmt.Printf("🛑 %s 停止处理事件\n", p.Name)
}

// handleSpecificEvent 根据事件类型执行特定处理
func (p *EventProcessor) handleSpecificEvent(event ContractEvent) {
    switch event.EventType {
    case "Transfer":
        from, _ := event.Data["from"].(string)
        to, _ := event.Data["to"].(string)
        value, _ := event.Data["value"].(float64)

        fmt.Printf("   🔄 转账: %s → %s (%.2f)\n",
            hideAddress(from), hideAddress(to), value)

    case "Swap":
        amountIn, _ := event.Data["amountIn"].(float64)
        amountOut, _ := event.Data["amountOut"].(float64)

        fmt.Printf("   🔀 交换: %.2f → %.2f\n", amountIn, amountOut)

    case "LiquidityAdded":
        provider, _ := event.Data["provider"].(string)
        amount, _ := event.Data["amount"].(float64)

        fmt.Printf("   💧 添加流动性: %s (%.2f)\n",
            hideAddress(provider), amount)

    case "LiquidityRemoved":
        provider, _ := event.Data["provider"].(string)
        amount, _ := event.Data["amount"].(float64)

        fmt.Printf("   🚰 移除流动性: %s (%.2f)\n",
            hideAddress(provider), amount)
    }
}

// EventDispatcher 事件分发器
type EventDispatcher struct {
    Processors []*EventProcessor  // 处理器列表
    EventChan  chan ContractEvent // 事件通道
}

// NewEventDispatcher 创建一个新的事件分发器
func NewEventDispatcher() *EventDispatcher {
    return &EventDispatcher{
        EventChan: make(chan ContractEvent, 1000),
    }
}

// AddProcessor 添加处理器
func (d *EventDispatcher) AddProcessor(p *EventProcessor) {
    d.Processors = append(d.Processors, p)
}

// StartDispatching 开始分发事件
func (d *EventDispatcher) StartDispatching(wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Println("🌐 事件分发器启动")

    for event := range d.EventChan {
        // 根据事件类型路由到不同的处理器
        for _, proc := range d.Processors {
            if d.shouldHandle(proc.Name, event.EventType) {
                select {
                case proc.EventChan <- event:
                    // 成功发送
                case <-time.After(50 * time.Millisecond):
                    fmt.Printf("⏰ %s 事件发送超时\n", proc.Name)
                }
            }
        }
    }

    // 关闭所有处理器的事件通道
    for _, proc := range d.Processors {
        close(proc.EventChan)
    }

    fmt.Println("🌐 事件分发器停止")
}

// shouldHandle 判断处理器是否应该处理该事件
func (d *EventDispatcher) shouldHandle(processorName string, eventType string) bool {
    switch processorName {
    case "转账处理器":
        return eventType == "Transfer"
    case "交换处理器":
        return eventType == "Swap"
    case "流动性处理器":
        return eventType == "LiquidityAdded" || eventType == "LiquidityRemoved"
    case "统计处理器":
        return true // 处理所有事件
    default:
        return false
    }
}

// EventGenerator 事件生成器
func EventGenerator(dispatcher *EventDispatcher, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Println("🎲 事件生成器启动")

    eventTypes := []string{"Transfer", "Swap", "LiquidityAdded", "LiquidityRemoved"}

    for i := 1; i <= 50; i++ {
        et := eventTypes[rand.Intn(len(eventTypes))]

        event := ContractEvent{
            EventType:       et,
            ContractAddress: fmt.Sprintf("0x%x", rand.Int63()),
            TxHash:          fmt.Sprintf("0x%x", rand.Int63()),
            BlockNumber:     uint64(15000000 + i),
            LogIndex:        uint(rand.Intn(10)),
            Timestamp:       time.Now(),
            Data:            generateEventData(et),
        }

        dispatcher.EventChan <- event
        time.Sleep(time.Duration(100+rand.Intn(200)) * time.Millisecond)
    }

    close(dispatcher.EventChan)
    fmt.Println("🎲 事件生成器停止")
}

// generateEventData 生成事件数据
func generateEventData(eventType string) map[string]interface{} {
    data := make(map[string]interface{})

    switch eventType {
    case "Transfer":
        data["from"] = fmt.Sprintf("0x%x", rand.Int63())
        data["to"] = fmt.Sprintf("0x%x", rand.Int63())
        data["value"] = 100.0 + rand.Float64()*1000

    case "Swap":
        data["amountIn"] = 50.0 + rand.Float64()*500
        data["amountOut"] = 45.0 + rand.Float64()*450

    case "LiquidityAdded", "LiquidityRemoved":
        data["provider"] = fmt.Sprintf("0x%x", rand.Int63())
        data["amount"] = 1000.0 + rand.Float64()*5000
    }

    return data
}

// Monitor 监控器,定期输出处理器统计
func Monitor(processors []*EventProcessor, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Println("📊 启动事件监控器")

    ticker := time.NewTicker(2 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        fmt.Println("\n📈 事件处理统计:")

        for _, proc := range processors {
            proc.Mu.RLock()

            fmt.Printf("%s:\n", proc.Name)
            totalEvents := 0
            for evType, count := range proc.ProcessedEvents {
                fmt.Printf("  %s: %d\n", evType, count)
                totalEvents += count
            }
            fmt.Printf("  总计: %d\n", totalEvents)

            proc.Mu.RUnlock()
        }
    }
}

// hideAddress 隐藏地址中间部分,只显示首尾
func hideAddress(addr string) string {
    if len(addr) > 10 {
        return addr[:6] + "..." + addr[len(addr)-4:]
    }
    return addr
}

func main() {
    fmt.Println("📝 智能合约事件并发处理")
    fmt.Println("========================")

    rand.Seed(time.Now().UnixNano())

    // 创建事件分发器
    dispatcher := NewEventDispatcher()

    // 创建多个事件处理器
    processors := []*EventProcessor{
        NewEventProcessor("转账处理器"),
        NewEventProcessor("交换处理器"),
        NewEventProcessor("流动性处理器"),
        NewEventProcessor("统计处理器"),
    }

    // 注册处理器到分发器
    for _, proc := range processors {
        dispatcher.AddProcessor(proc)
    }

    var wg sync.WaitGroup

    // 启动事件分发器
    wg.Add(1)
    go dispatcher.StartDispatching(&wg)

    // 启动所有事件处理器
    for _, proc := range processors {
        wg.Add(1)
        go proc.StartProcessing(&wg)
    }

    // 启动监控器
    wg.Add(1)
    go Monitor(processors, &wg)

    // 启动事件生成器
    wg.Add(1)
    go EventGenerator(dispatcher, &wg)

    // 等待所有任务完成
    wg.Wait()

    // 生成最终报告
    fmt.Println("\n📋 最终事件处理报告:")
    generateFinalReport(processors)
}

// generateFinalReport 生成最终统计报告
func generateFinalReport(processors []*EventProcessor) {
    totalProcessed := 0

    for _, proc := range processors {
        proc.Mu.RLock()

        procTotal := 0
        for _, count := range proc.ProcessedEvents {
            procTotal += count
        }
        totalProcessed += procTotal

        fmt.Printf("\n%s:\n", proc.Name)
        fmt.Printf("  处理事件总数: %d\n", procTotal)
        fmt.Printf("  处理事件类型: %d 种\n", len(proc.ProcessedEvents))

        // 显示事件类型分布
        for evType, count := range proc.ProcessedEvents {
            percentage := float64(count) / float64(procTotal) * 100
            fmt.Printf("    %s: %d (%.1f%%)\n", evType, count, percentage)
        }

        proc.Mu.RUnlock()
    }

    fmt.Printf("\n📊 总体统计:\n")
    fmt.Printf("  总处理事件: %d\n", totalProcessed)
    fmt.Printf("  平均每处理器: %d\n", totalProcessed/len(processors))

    // 性能分析
    if totalProcessed > 0 {
        fmt.Printf("  处理效率: 约 %.1f 事件/秒\n",
            float64(totalProcessed)/30.0) // 假设运行30秒
    }
}

练习5:跨链桥并发资产转移

package main

import (
    "fmt"
    "math/rand"
    "sync"
    "time"
)

// CrossChainTransferRequest represents a cross-chain transfer request
type CrossChainTransferRequest struct {
    TransferID  string    // 转移ID
    SourceChain string    // 源链
    TargetChain string    // 目标链
    Asset       string    // 资产
    Amount      float64   // 数量
    UserAddress string    // 用户地址
    Status      string    // 状态
    CreatedAt   time.Time // 创建时间
    UpdatedAt   time.Time // 更新时间
}

// BridgeService represents a cross-chain bridge service
type BridgeService struct {
    Name              string                                // 服务名称
    SupportedPairs    map[string]bool                       // 支持链对 "ETH-POLYGON": true
    PendingRequests   map[string]*CrossChainTransferRequest // 处理中请求
    CompletedRequests map[string]*CrossChainTransferRequest // 完成请求
    Mu                sync.RWMutex                          // 互斥锁
    RequestChan       chan *CrossChainTransferRequest       // 请求通道
    StatusChan        chan *CrossChainTransferRequest       // 状态更新通道
}

// NewBridgeService creates a new bridge service
func NewBridgeService(name string) *BridgeService {
    return &BridgeService{
        Name:              name,
        SupportedPairs:    make(map[string]bool),
        PendingRequests:   make(map[string]*CrossChainTransferRequest),
        CompletedRequests: make(map[string]*CrossChainTransferRequest),
        RequestChan:       make(chan *CrossChainTransferRequest, 100),
        StatusChan:        make(chan *CrossChainTransferRequest, 100),
    }
}

// AddSupportedPair adds a supported chain pair to the bridge
func (b *BridgeService) AddSupportedPair(sourceChain, targetChain string) {
    pair := sourceChain + "-" + targetChain
    b.SupportedPairs[pair] = true
    fmt.Printf("🌉 %s 支持链对: %s → %s\n", b.Name, sourceChain, targetChain)
}

// ProcessTransferRequests handles incoming transfer requests
func (b *BridgeService) ProcessTransferRequests(wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("🔄 %s 开始处理转移请求\n", b.Name)

    for req := range b.RequestChan {
        // validate request
        if !b.validateRequest(req) {
            continue
        }

        // start processing
        b.Mu.Lock()
        req.Status = "处理中"
        req.UpdatedAt = time.Now()
        b.PendingRequests[req.TransferID] = req
        b.Mu.Unlock()

        b.StatusChan <- req

        // simulate cross-chain transfer process
        go b.executeTransfer(req)
    }

    fmt.Printf("🛑 %s 停止处理转移请求\n", b.Name)
}

// validateRequest validates a transfer request
func (b *BridgeService) validateRequest(req *CrossChainTransferRequest) bool {
    pair := req.SourceChain + "-" + req.TargetChain

    b.Mu.RLock()
    defer b.Mu.RUnlock()

    if !b.SupportedPairs[pair] {
        fmt.Printf("❌ %s 不支持链对: %s\n", b.Name, pair)
        return false
    }

    if b.PendingRequests[req.TransferID] != nil {
        fmt.Printf("❌ %s 请求已存在: %s\n", b.Name, req.TransferID)
        return false
    }

    return true
}

// executeTransfer performs the cross-chain transfer steps
func (b *BridgeService) executeTransfer(req *CrossChainTransferRequest) {
    fmt.Printf("🚀 %s 开始执行转移: %s\n", b.Name, req.TransferID)

    // step1: lock assets on source chain
    b.updateRequestStatus(req, "锁定中")
    time.Sleep(time.Duration(1+rand.Intn(2)) * time.Second)

    // step2: wait for cross-chain validation
    b.updateRequestStatus(req, "验证中")
    time.Sleep(time.Duration(2+rand.Intn(3)) * time.Second)

    // step3: mint assets on target chain
    b.updateRequestStatus(req, "铸造中")
    time.Sleep(time.Duration(1+rand.Intn(2)) * time.Second)

    // step4: complete
    b.updateRequestStatus(req, "完成")

    b.Mu.Lock()
    delete(b.PendingRequests, req.TransferID)
    b.CompletedRequests[req.TransferID] = req
    b.Mu.Unlock()

    fmt.Printf("✅ %s 转移完成: %s\n", b.Name, req.TransferID)
}

// updateRequestStatus updates the status of a request and sends to status channel
func (b *BridgeService) updateRequestStatus(req *CrossChainTransferRequest, status string) {
    req.Status = status
    req.UpdatedAt = time.Now()
    b.StatusChan <- req
}

// StatusMonitor monitors status updates
func (b *BridgeService) StatusMonitor(wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("👀 %s 状态监控器启动\n", b.Name)

    for req := range b.StatusChan {
        fmt.Printf("📊 %s 状态更新: %s - %s\n",
            b.Name, req.TransferID, req.Status)
    }

    fmt.Printf("👀 %s 状态监控器停止\n", b.Name)
}

// SubmitTransferRequest submits a new transfer request to the bridge
func (b *BridgeService) SubmitTransferRequest(req *CrossChainTransferRequest) bool {
    req.CreatedAt = time.Now()
    req.Status = "已提交"

    select {
    case b.RequestChan <- req:
        fmt.Printf("📨 提交转移请求: %s (%s → %s, %.2f %s)\n",
            req.TransferID, req.SourceChain, req.TargetChain, req.Amount, req.Asset)
        return true
    case <-time.After(100 * time.Millisecond):
        fmt.Printf("⏰ 提交请求超时: %s\n", req.TransferID)
        return false
    }
}

// SimulateUser simulates a user using the bridge
func SimulateUser(userID string, bridge *BridgeService, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Printf("👤 用户 %s 开始使用跨链桥\n", userID)

    pairList := [][2]string{
        {"Ethereum", "Polygon"},
        {"Polygon", "Ethereum"},
        {"BSC", "Avalanche"},
        {"Avalanche", "BSC"},
    }
    assetList := []string{"ETH", "USDC", "USDT", "MATIC"}

    for i := 0; i < 4; i++ {
        pair := pairList[rand.Intn(len(pairList))]
        asset := assetList[rand.Intn(len(assetList))]
        amount := 10.0 + rand.Float64()*100

        req := &CrossChainTransferRequest{
            TransferID:  fmt.Sprintf("TX-%s-%d", userID, i+1),
            SourceChain: pair[0],
            TargetChain: pair[1],
            Asset:       asset,
            Amount:      amount,
            UserAddress: userID,
        }

        if bridge.SubmitTransferRequest(req) {
            time.Sleep(time.Duration(500+rand.Intn(1000)) * time.Millisecond)
        } else {
            time.Sleep(100 * time.Millisecond)
        }
    }

    fmt.Printf("👤 用户 %s 完成跨链操作\n", userID)
}

// BridgeManager manages multiple bridge services
type BridgeManager struct {
    BridgeServices []*BridgeService // 桥服务列表
}

// NewBridgeManager creates a new bridge manager
func NewBridgeManager() *BridgeManager {
    return &BridgeManager{
        BridgeServices: []*BridgeService{},
    }
}

// AddBridgeService adds a bridge service to the manager
func (m *BridgeManager) AddBridgeService(bridge *BridgeService) {
    m.BridgeServices = append(m.BridgeServices, bridge)
}

// GlobalMonitor monitors all bridges
func GlobalMonitor(bridges []*BridgeService, wg *sync.WaitGroup) {
    defer wg.Done()

    fmt.Println("🌐 全局监控器启动")

    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for range ticker.C {
        fmt.Println("\n📈 跨链桥全局状态:")

        totalPending := 0
        totalCompleted := 0

        for _, b := range bridges {
            b.Mu.RLock()

            pending := len(b.PendingRequests)
            completed := len(b.CompletedRequests)

            totalPending += pending
            totalCompleted += completed

            fmt.Printf("%s: %d 处理中, %d 完成\n",
                b.Name, pending, completed)

            b.Mu.RUnlock()
        }

        fmt.Printf("总计: %d 处理中, %d 完成\n", totalPending, totalCompleted)

        // performance metrics
        if totalCompleted > 0 {
            completionRate := float64(totalCompleted) / float64(totalPending+totalCompleted) * 100
            fmt.Printf("完成率: %.1f%%\n", completionRate)
        }
    }
}

func main() {
    fmt.Println("🌉 跨链桥并发资产转移")
    fmt.Println("========================")

    rand.Seed(time.Now().UnixNano())

    // create bridge manager
    manager := NewBridgeManager()

    // create multiple bridge services
    polygonBridge := NewBridgeService("Polygon Bridge")
    polygonBridge.AddSupportedPair("Ethereum", "Polygon")
    polygonBridge.AddSupportedPair("Polygon", "Ethereum")

    avalancheBridge := NewBridgeService("Avalanche Bridge")
    avalancheBridge.AddSupportedPair("Ethereum", "Avalanche")
    avalancheBridge.AddSupportedPair("BSC", "Avalanche")
    avalancheBridge.AddSupportedPair("Avalanche", "BSC")

    arbitrumBridge := NewBridgeService("Arbitrum Bridge")
    arbitrumBridge.AddSupportedPair("Ethereum", "Arbitrum")
    arbitrumBridge.AddSupportedPair("Arbitrum", "Ethereum")

    manager.AddBridgeService(polygonBridge)
    manager.AddBridgeService(avalancheBridge)
    manager.AddBridgeService(arbitrumBridge)

    var wg sync.WaitGroup

    // start processors and monitors for all bridges
    for _, b := range manager.BridgeServices {
        wg.Add(1)
        go b.ProcessTransferRequests(&wg)

        wg.Add(1)
        go b.StatusMonitor(&wg)
    }

    // start global monitor
    wg.Add(1)
    go GlobalMonitor(manager.BridgeServices, &wg)

    // simulate multiple users using bridges concurrently
    users := []string{"Alice", "Bob", "Charlie", "David", "Eve"}

    for _, user := range users {
        wg.Add(1)
        // user randomly chooses a bridge service
        bridge := manager.BridgeServices[rand.Intn(len(manager.BridgeServices))]
        go SimulateUser(user, bridge, &wg)
    }

    // run for a while
    time.Sleep(20 * time.Second)

    // close all bridge channels
    for _, b := range manager.BridgeServices {
        close(b.RequestChan)
        close(b.StatusChan)
    }

    // wait for all goroutines to finish
    wg.Wait()

    // generate final report
    fmt.Println("\n📋 跨链桥最终报告:")
    generateBridgeReport(manager.BridgeServices)
}

func generateBridgeReport(bridges []*BridgeService) {
    totalVolume := 0.0
    totalRequests := 0
    successfulRequests := 0

    for _, b := range bridges {
        b.Mu.RLock()

        bridgeVolume := 0.0
        for _, req := range b.CompletedRequests {
            bridgeVolume += req.Amount
        }

        reqCount := len(b.CompletedRequests) + len(b.PendingRequests)
        successCount := len(b.CompletedRequests)

        totalVolume += bridgeVolume
        totalRequests += reqCount
        successfulRequests += successCount

        fmt.Printf("\n%s 报告:\n", b.Name)
        fmt.Printf("  总转移量: %.2f\n", bridgeVolume)
        fmt.Printf("  总请求数: %d\n", reqCount)
        fmt.Printf("  成功请求: %d\n", successCount)

        if reqCount > 0 {
            successRate := float64(successCount) / float64(reqCount) * 100
            fmt.Printf("  成功率: %.1f%%\n", successRate)
        }

        // chain pair statistics
        fmt.Println("  链对分布:")
        pairStats := make(map[string]int)
        for _, req := range b.CompletedRequests {
            pair := req.SourceChain + "→" + req.TargetChain
            pairStats[pair]++
        }
        for pair, count := range pairStats {
            fmt.Printf("    %s: %d\n", pair, count)
        }

        b.Mu.RUnlock()
    }

    fmt.Printf("\n📊 总体统计:\n")
    fmt.Printf("  总转移资产: %.2f\n", totalVolume)
    fmt.Printf("  总请求数: %d\n", totalRequests)
    fmt.Printf("  总成功数: %d\n", successfulRequests)

    if totalRequests > 0 {
        overallSuccessRate := float64(successfulRequests) / float64(totalRequests) * 100
        fmt.Printf("  总成功率: %.1f%%\n", overallSuccessRate)
        fmt.Printf("  平均每请求: %.2f\n", totalVolume/float64(successfulRequests))
    }
}

这些练习涵盖了并发编程在Web3.0中的主要应用场景,包括区块链数据同步、价格聚合、用户操作处理、事件处理和跨链转移。通过这些练习,你可以掌握Go语言并发编程的核心概念和在Web3.0开发中的实际应用。

每个练习都包含了:

  1. 详细的生活案例解释,帮助理解并发概念

  2. 完整的Web3.0应用场景,贴近实际开发需求

  3. 详细的代码实现,包含完整的错误处理和性能考虑

  4. 并发模式的最佳实践,如Channel使用、锁管理、WaitGroup等

这些知识将帮助你在实际的Web3.0项目中构建高性能、可靠的并发系统。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐