并发模式(Patterns):Worker Pool 与 Pipeline 模式实现
模式适用场景核心组件任务数量巨大,需要控制并发度,防止资源耗尽。Pipeline数据流处理(ETL),步骤之间解耦。Fan-Out/In某个处理步骤是性能瓶颈,需要并行加速。Merge 函数, WaitGroup。
"Primitives are easy; Architecture is hard." —— 学会了
go关键字只是第一步,如何组织它们才是工程能力的体现。
在前面的章节中,我们已经掌握了 Channel、Mutex、Atomic 和 Context 等基础原语。但在实际的微服务开发中,我们很少直接裸写 go func(),因为不受控的并发是生产环境的噩梦。
-
场景一:你需要处理 100 万张图片。如果你直接开 100 万个 Goroutine,服务器内存会瞬间被撑爆。
-
场景二:你需要从数据库读取数据,进行清洗,然后写入 Kafka。这是一个典型的流式处理,如何让代码解耦且高效?
今天,我们用基础原语构建出两个强大的并发模式:Worker Pool(工作池) 和 Pipeline(流水线)。
1. Worker Pool:控制并发的洪峰
Worker Pool 的核心思想是:限制 Goroutine 的数量,复用 Worker 来处理源源不断的任务。 这本质上是一种生产者-消费者模型。
1.1 架构图
graph LR
P[Producer] -->|Jobs Channel| W1[Worker 1]
P -->|Jobs Channel| W2[Worker 2]
P -->|Jobs Channel| W3[Worker 3]
W1 -->|Results Channel| C[Consumer]
W2 -->|Results Channel| C
W3 -->|Results Channel| C
1.2 核心实现代码
这是一个工业级的 Worker Pool 模板,它包含三个部分:Job(任务)、Result(结果)、以及核心的调度逻辑。
package main
import (
"fmt"
"sync"
"time"
)
// Job 代表一个任务
type Job struct {
ID int
Data string
}
// Result 代表任务的处理结果
type Result struct {
JobID int
Err error
}
// Worker:消费者逻辑
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
// 不断从 jobs channel 中读取任务
for job := range jobs {
fmt.Printf("Worker %d started job %d\n", id, job.ID)
// 模拟耗时操作
time.Sleep(time.Millisecond * 500)
// 发送结果
results <- Result{
JobID: job.ID,
Err: nil,
}
}
}
func main() {
const numJobs = 10
const numWorkers = 3 // 限制最大并发数为 3
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// 1. 启动固定数量的 Workers
for w := 1; w <= numWorkers; w++ {
wg.Add(1)
go worker(w, jobs, results, &wg)
}
// 2. 生产者:发送任务
for j := 1; j <= numJobs; j++ {
jobs <- Job{ID: j, Data: "image.jpg"}
}
close(jobs) // 【关键】发送完必须关闭 channel,否则 worker 永远不会退出
// 3. 等待所有 worker 完成 (另起一个 goroutine 等待,为了不阻塞 main 读取结果)
go func() {
wg.Wait()
close(results) // 【关键】所有 worker 走完了,才能关闭 results
}()
// 4. 收集结果
for res := range results {
fmt.Printf("Result received for job %d\n", res.JobID)
}
}
关键细节解析:
-
Backpressure (背压):通过限制
numWorkers,我们保证了系统不会因为任务过多而崩溃。 -
Graceful Shutdown:通过
close(jobs)通知 Worker 退出range循环。 -
双重 Close:
-
主程关闭
jobs-> Worker 退出。 -
监控协程等待
wg.Wait()-> 关闭results-> 主程读取完毕退出。
-
2. Pipeline:数据流水的艺术
Pipeline 模式将一个复杂的任务拆分为多个步骤(Stage),每个步骤由一组 Goroutine 负责,通过 Channel 连接。这就像工厂的流水线。
场景:
-
Generator: 生成一组数字。
-
Squarer: 计算平方。
-
Printer: 打印结果。
2.1 核心实现代码
每个 Stage 都是一个函数,接收一个输入 Channel,返回一个输出 Channel。
package main
import (
"fmt"
)
// Stage 1: 生成器 (Generator)
// 将一组整数转换为 channel 流
func gen(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out) // 任务发完,关闭输出
}()
return out
}
// Stage 2: 处理逻辑 (Squarer)
// 从 in 读取,计算平方,写入 out
func sq(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out) // 上游关闭导致循环结束,我也关闭下游
}()
return out
}
func main() {
// 建立流水线
// 2 -> 3
// 4 -> 16
c := gen(2, 4, 6, 8) // c 是 Stage 1 的输出
out := sq(c) // out 是 Stage 2 的输出
// Stage 3: 消费结果
for res := range out {
fmt.Println(res)
}
}
3. Fan-Out / Fan-In:并行化流水线
如果 Pipeline 中某个步骤(比如 sq)特别慢,我们可以通过 Fan-Out(扇出) 启动多个 sq 协程来并行处理,然后再通过 Fan-In(扇入) 将结果合并到一个 Channel。
3.1 Fan-In (合并) 实现
这是面试中常考的高级模式:如何将多个 Channel 合并成一个?
func merge(cs ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个输入 channel 启动一个 output goroutine
output := func(c <-chan int) {
defer wg.Done()
for n := range c {
out <- n
}
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
// 启动一个监控协程,等所有输入都处理完,关闭 out
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := gen(2, 4, 6, 8)
// Fan-Out: 启动两个 sq worker 读取同一个 in
c1 := sq(in)
c2 := sq(in)
// Fan-In: 合并结果
for n := range merge(c1, c2) {
fmt.Println(n) // 顺序可能是乱的,但处理速度翻倍
}
}
4. 结合 Context:一键停止流水线
上面的 Pipeline 有一个隐患:如果下游(Consumer)因为出错退出了,不再读取 out channel,那么上游的 Goroutine 会因为试图写入满的 channel 而永久阻塞(Goroutine Leak)。
解决方案:引入 context.Context 或 done channel。
func sq(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for n := range in {
select {
case out <- n * n: // 正常写入
case <-ctx.Done(): // 上游喊停
return // 立即退出,防止泄露
}
}
}()
return out
}
工程原则:任何涉及 Channel 写入的 Goroutine,都必须考虑接收方提前退出的情况。
5. 总结
| 模式 | 适用场景 | 核心组件 |
|
Worker Pool |
任务数量巨大,需要控制并发度,防止资源耗尽。 |
Job Chan, Result Chan, WaitGroup |
|
Pipeline |
数据流处理(ETL),步骤之间解耦。 |
Input Chan -> Output Chan |
|
Fan-Out/In |
某个处理步骤是性能瓶颈,需要并行加速。 |
Merge 函数, WaitGroup |
更多推荐



所有评论(0)