"Primitives are easy; Architecture is hard." —— 学会了 go 关键字只是第一步,如何组织它们才是工程能力的体现。

在前面的章节中,我们已经掌握了 Channel、Mutex、Atomic 和 Context 等基础原语。但在实际的微服务开发中,我们很少直接裸写 go func(),因为不受控的并发是生产环境的噩梦

  • 场景一:你需要处理 100 万张图片。如果你直接开 100 万个 Goroutine,服务器内存会瞬间被撑爆。

  • 场景二:你需要从数据库读取数据,进行清洗,然后写入 Kafka。这是一个典型的流式处理,如何让代码解耦且高效?

今天,我们用基础原语构建出两个强大的并发模式:Worker Pool(工作池)Pipeline(流水线)

1. Worker Pool:控制并发的洪峰

Worker Pool 的核心思想是:限制 Goroutine 的数量,复用 Worker 来处理源源不断的任务。 这本质上是一种生产者-消费者模型。

1.1 架构图

graph LR
    P[Producer] -->|Jobs Channel| W1[Worker 1]
    P -->|Jobs Channel| W2[Worker 2]
    P -->|Jobs Channel| W3[Worker 3]
    W1 -->|Results Channel| C[Consumer]
    W2 -->|Results Channel| C
    W3 -->|Results Channel| C

1.2 核心实现代码

这是一个工业级的 Worker Pool 模板,它包含三个部分:Job(任务)、Result(结果)、以及核心的调度逻辑。

package main

import (
    "fmt"
    "sync"
    "time"
)

// Job 代表一个任务
type Job struct {
    ID  int
    Data string
}

// Result 代表任务的处理结果
type Result struct {
    JobID int
    Err   error
}

// Worker:消费者逻辑
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
    defer wg.Done()
    
    // 不断从 jobs channel 中读取任务
    for job := range jobs {
        fmt.Printf("Worker %d started job %d\n", id, job.ID)
        
        // 模拟耗时操作
        time.Sleep(time.Millisecond * 500)
        
        // 发送结果
        results <- Result{
            JobID: job.ID,
            Err:   nil,
        }
    }
}

func main() {
    const numJobs = 10
    const numWorkers = 3 // 限制最大并发数为 3

    jobs := make(chan Job, numJobs)
    results := make(chan Result, numJobs)
    
    var wg sync.WaitGroup

    // 1. 启动固定数量的 Workers
    for w := 1; w <= numWorkers; w++ {
        wg.Add(1)
        go worker(w, jobs, results, &wg)
    }

    // 2. 生产者:发送任务
    for j := 1; j <= numJobs; j++ {
        jobs <- Job{ID: j, Data: "image.jpg"}
    }
    close(jobs) // 【关键】发送完必须关闭 channel,否则 worker 永远不会退出

    // 3. 等待所有 worker 完成 (另起一个 goroutine 等待,为了不阻塞 main 读取结果)
    go func() {
        wg.Wait()
        close(results) // 【关键】所有 worker 走完了,才能关闭 results
    }()

    // 4. 收集结果
    for res := range results {
        fmt.Printf("Result received for job %d\n", res.JobID)
    }
}

关键细节解析

  1. Backpressure (背压):通过限制 numWorkers,我们保证了系统不会因为任务过多而崩溃。

  2. Graceful Shutdown:通过 close(jobs) 通知 Worker 退出 range 循环。

  3. 双重 Close

    • 主程关闭 jobs -> Worker 退出。

    • 监控协程等待 wg.Wait() -> 关闭 results -> 主程读取完毕退出。

2. Pipeline:数据流水的艺术

Pipeline 模式将一个复杂的任务拆分为多个步骤(Stage),每个步骤由一组 Goroutine 负责,通过 Channel 连接。这就像工厂的流水线。

场景

  1. Generator: 生成一组数字。

  2. Squarer: 计算平方。

  3. Printer: 打印结果。

2.1 核心实现代码

每个 Stage 都是一个函数,接收一个输入 Channel,返回一个输出 Channel

package main

import (
    "fmt"
)

// Stage 1: 生成器 (Generator)
// 将一组整数转换为 channel 流
func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out) // 任务发完,关闭输出
    }()
    return out
}

// Stage 2: 处理逻辑 (Squarer)
// 从 in 读取,计算平方,写入 out
func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out) // 上游关闭导致循环结束,我也关闭下游
    }()
    return out
}

func main() {
    // 建立流水线
    // 2 -> 3
    // 4 -> 16
    c := gen(2, 4, 6, 8) // c 是 Stage 1 的输出
    out := sq(c)         // out 是 Stage 2 的输出

    // Stage 3: 消费结果
    for res := range out {
        fmt.Println(res)
    }
}

3. Fan-Out / Fan-In:并行化流水线

如果 Pipeline 中某个步骤(比如 sq)特别慢,我们可以通过 Fan-Out(扇出) 启动多个 sq 协程来并行处理,然后再通过 Fan-In(扇入) 将结果合并到一个 Channel。

3.1 Fan-In (合并) 实现

这是面试中常考的高级模式:如何将多个 Channel 合并成一个?

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 为每个输入 channel 启动一个 output goroutine
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            out <- n
        }
    }

    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 启动一个监控协程,等所有输入都处理完,关闭 out
    go func() {
        wg.Wait()
        close(out)
    }()

    return out
}

func main() {
    in := gen(2, 4, 6, 8)

    // Fan-Out: 启动两个 sq worker 读取同一个 in
    c1 := sq(in)
    c2 := sq(in)

    // Fan-In: 合并结果
    for n := range merge(c1, c2) {
        fmt.Println(n) // 顺序可能是乱的,但处理速度翻倍
    }
}

4. 结合 Context:一键停止流水线

上面的 Pipeline 有一个隐患:如果下游(Consumer)因为出错退出了,不再读取 out channel,那么上游的 Goroutine 会因为试图写入满的 channel 而永久阻塞(Goroutine Leak)

解决方案:引入 context.Contextdone channel。

func sq(ctx context.Context, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n: // 正常写入
            case <-ctx.Done(): // 上游喊停
                return // 立即退出,防止泄露
            }
        }
    }()
    return out
}

工程原则:任何涉及 Channel 写入的 Goroutine,都必须考虑接收方提前退出的情况。

5. 总结

模式 适用场景 核心组件

Worker Pool

任务数量巨大,需要控制并发度,防止资源耗尽。

Job Chan, Result Chan, WaitGroup

Pipeline

数据流处理(ETL),步骤之间解耦。

Input Chan -> Output Chan

Fan-Out/In

某个处理步骤是性能瓶颈,需要并行加速。

Merge 函数, WaitGroup

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐