导航菜单

并发模式

Go 语言以其简洁的并发原语(goroutine + channel)著称,但如何将它们组合成可靠、高效的并发程序,则需要掌握一些经典的并发模式。这些模式是从大量实践中提炼出来的通用解决方案,能够帮助开发者避免常见的并发陷阱(竞态条件、goroutine 泄漏、死锁等),并构建出结构清晰、易于维护的并发系统。

本文将系统讲解 Go 中最常用的七种并发模式,每种模式都配有完整的代码示例。

Worker Pool(工作池模式)

Worker Pool(工作池)

工作池是一种限制并发 goroutine 数量的模式。系统预先创建固定数量的 worker goroutine,它们从一个共享的任务 channel 中读取任务并执行。当所有 worker 都忙时,新任务会在 channel 中排队等待。

package main

import (
	"fmt"
	"sync"
	"time"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d 开始处理任务 %d\n", id, job)
		time.Sleep(time.Millisecond * 500) // 模拟工作耗时
		results <- job * 2
		fmt.Printf("Worker %d 完成任务 %d,结果: %d\n", id, job, job*2)
	}
}

func main() {
	const numJobs = 10
	const numWorkers = 3

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	var wg sync.WaitGroup

	// 启动固定数量的 worker
	for w := 1; w <= numWorkers; w++ {
		wg.Add(1)
		go worker(w, jobs, results, &wg)
	}

	// 发送任务
	for j := 1; j <= numJobs; j++ {
		jobs <- j
	}
	close(jobs) // 关闭任务 channel,worker 处理完后会自动退出 range 循环

	// 启动一个 goroutine 等待所有 worker 完成后关闭 results channel
	go func() {
		wg.Wait()
		close(results)
	}()

	// 收集结果
	for result := range results {
		fmt.Printf("收到结果: %d\n", result)
	}

	fmt.Println("所有任务处理完毕")
}

工作池模式要点

  • 固定 worker 数量:通常设置为 runtime.NumCPU() 或根据任务类型手动调整
  • 带缓冲的 channel:任务 channel 设置缓冲区大小,避免发送方阻塞
  • 优雅关闭:关闭 jobs channel 后,worker 的 range 循环会自动退出;使用 sync.WaitGroup 确保所有 worker 完成后再关闭 results channel
  • 适用场景:HTTP 请求处理、数据库批量操作、文件并行处理等

Pipeline(流水线模式)

Pipeline(流水线)

流水线模式将数据处理过程拆分为多个独立的阶段,每个阶段由一个或多个 goroutine 负责。阶段之间通过 channel 连接,前一个阶段的输出作为后一个阶段的输入,数据像流水线一样依次流经各阶段。

package main

import (
	"fmt"
)

// 阶段1:生成器——产生整数序列
func generator(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		for _, n := range nums {
			out <- n
		}
		close(out)
	}()
	return out
}

// 阶段2:平方运算
func square(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			out <- n * n
		}
		close(out)
	}()
	return out
}

// 阶段3:过滤奇数
func filterOdd(in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		for n := range in {
			if n%2 == 0 {
				out <- n
			}
		}
		close(out)
	}()
	return out
}

func main() {
	// 构建流水线:生成 → 平方 → 过滤奇数
	nums := generator(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
	squared := square(nums)
	filtered := filterOdd(squared)

	// 消费最终结果
	for result := range filtered {
		fmt.Printf("结果: %d\n", result)
	}
	// 输出: 4, 16, 36, 64, 100
}

流水线模式要点

  • 单向 channel:每个阶段的输入输出 channel 使用 <-chanchan<- 明确方向,增强类型安全
  • 自动传播关闭:当上游 channel 关闭且数据全部读取完毕后,当前阶段处理完毕并关闭自己的输出 channel
  • 背压处理:无缓冲 channel 天然提供背压——下游处理慢时,上游会自动阻塞等待
  • 适用场景:数据 ETL 流程、图像处理管道、日志分析等

Fan-In / Fan-Out(扇入扇出模式)

Fan-In / Fan-Out(扇入扇出)
  • Fan-Out(扇出):将一个输入 channel 的数据分发到多个 worker goroutine 并行处理
  • Fan-In(扇入):将多个 goroutine 的输出 channel 合并为一个统一的输出 channel
package main

import (
	"fmt"
	"sync"
	"time"
)

// 模拟耗时计算
func heavyCompute(n int) int {
	time.Sleep(time.Millisecond * 200)
	return n * n
}

// Fan-Out: 从输入 channel 读取并分发给多个 worker
func fanOut(in <-chan int, numWorkers int) []<-chan int {
	var workers []<-chan int
	for i := 0; i < numWorkers; i++ {
		ch := make(chan int)
		workers = append(workers, ch)
		go func(workerCh chan<- int) {
			defer close(workerCh)
			for n := range in {
				result := heavyCompute(n)
				workerCh <- result
			}
		}(ch)
	}
	return workers
}

// Fan-In: 将多个 channel 合并为一个
func fanIn(channels ...<-chan int) <-chan int {
	var wg sync.WaitGroup
	out := make(chan int)

	// 为每个输入 channel 启动一个 goroutine
	multiplexer := func(ch <-chan int) {
		defer wg.Done()
		for v := range ch {
			out <- v
		}
	}

	wg.Add(len(channels))
	for _, ch := range channels {
		go multiplexer(ch)
	}

	// 所有输入 channel 关闭后,关闭输出 channel
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func main() {
	// 生成数据
	in := make(chan int)
	go func() {
		defer close(in)
		for i := 1; i <= 10; i++ {
			in <- i
		}
	}()

	// Fan-Out: 启动 4 个 worker 并行处理
	workers := fanOut(in, 4)

	// Fan-In: 合并所有 worker 的结果
	results := fanIn(workers...)

	// 消费结果
	for result := range results {
		fmt.Printf("处理结果: %d\n", result)
	}
}

扇入扇出模式要点

  • 并行加速:Fan-Out 的 worker 数量应根据任务类型调整——CPU 密集型任务适合 GOMAXPROCS,IO 密集型可以更多
  • 结果无序:多个 worker 并行处理,结果到达顺序与输入顺序不一定一致。如需保序,需要额外的排序或编号机制
  • 适用场景:并行网络爬虫、批量 API 调用、分布式任务处理

Future / Promise 模式

Future / Promise 模式

Future/Promise 模式是一种异步编程范式。发起一个异步操作后立即获得一个”Future”(代表未来某个时刻的结果),调用方可以在需要结果时阻塞等待,或继续做其他事情后再获取结果。在 Go 中,通常通过 channel 来实现这一模式。

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Future 是一个只读 channel,代表未来的结果
type Future[T any] <-chan T

// Promise 是一个可写的 channel,用于写入结果
type Promise[T any] chan T

// NewPromise 创建一个 Promise
func NewPromise[T any]() Promise[T] {
	return make(chan T, 1) // 缓冲为 1,写入不会阻塞
}

// Future 获取 Future 视图
func (p Promise[T]) Future() Future[T] {
	return p
}

// Resolve 写入结果
func (p Promise[T]) Resolve(value T) {
	p <- value
	close(p)
}

// AsyncCompute 异步执行计算,返回 Future
func AsyncCompute(a, b int) Future[int] {
	p := NewPromise[int]()
	go func() {
		// 模拟耗时计算
		time.Sleep(time.Duration(rand.Intn(500)+100) * time.Millisecond)
		result := a + b
		p.Resolve(result)
	}()
	return p.Future()
}

func main() {
	// 发起多个异步计算
	f1 := AsyncCompute(10, 20)
	f2 := AsyncCompute(30, 40)
	f3 := AsyncCompute(50, 60)

	fmt.Println("异步计算已发起,继续做其他事情...")

	// 需要结果时,从 Future 中读取(会阻塞直到结果就绪)
	fmt.Printf("结果1: %d\n", <-f1)
	fmt.Printf("结果2: %d\n", <-f2)
	fmt.Printf("结果3: %d\n", <-f3)
}

Future/Promise 模式要点

  • 带缓冲的 channel:缓冲区大小为 1,确保写入方不会阻塞
  • 泛型支持:Go 1.18+ 的泛型让 Future/Promise 可以用于任意类型
  • 组合使用:Future 可以与 select 组合实现超时、取消等高级功能
  • 适用场景:异步 HTTP 请求、并发数据库查询、任务编排

Timeout 与 Cancellation 模式

Timeout 与 Cancellation 模式

通过 select 语句配合 time.After(或 time.NewTimer)和 context.Context,实现对长时间运行操作的超时控制主动取消,防止 goroutine 无限期阻塞。

基于 select + time.After 的超时控制

package main

import (
	"fmt"
	"time"
)

func slowOperation() string {
	time.Sleep(2 * time.Second) // 模拟耗时操作
	return "操作完成"
}

func main() {
	ch := make(chan string)

	go func() {
		ch <- slowOperation()
	}()

	select {
	case result := <-ch:
		fmt.Println(result)
	case <-time.After(1 * time.Second):
		fmt.Println("操作超时!")
	}

	// 注意:此示例中 slowOperation 的 goroutine 仍然在运行
	// 生产环境中应使用 context.Context 来实现真正的取消
}

基于 context.Context 的优雅取消

package main

import (
	"context"
	"fmt"
	"time"
)

func slowOperationWithContext(ctx context.Context) (string, error) {
	resultCh := make(chan string, 1)
	errorCh := make(chan error, 1)

	go func() {
		// 模拟耗时操作,同时检查 context 是否已取消
		select {
		case <-time.After(3 * time.Second):
			resultCh <- "操作完成"
		case <-ctx.Done():
			errorCh <- ctx.Err()
		}
	}()

	select {
	case result := <-resultCh:
		return result, nil
	case err := <-errorCh:
		return "", err
	}
}

func main() {
	// 带超时的 context
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()

	result, err := slowOperationWithContext(ctx)
	if err != nil {
		fmt.Printf("操作失败: %v\n", err) // 输出: 操作失败: context deadline exceeded
		return
	}
	fmt.Println(result)
}

Timeout 与 Cancellation 模式要点

  • time.After 的隐患:每次调用都会创建新的 Timer,在循环中使用可能导致内存泄漏,推荐使用 time.NewTimer 并手动 Stop()
  • context.Context 是标准方案:Go 社区的推荐做法,支持超时(WithTimeout)、手动取消(WithCancel)、值传递(WithValue)和截止时间(WithDeadline
  • 传播取消信号:将 ctx 作为第一个参数传递给所有函数,形成取消信号的传播链
  • 适用场景:HTTP 请求超时、数据库查询取消、分布式任务协调

Rate Limiting(限流模式)

Rate Limiting(限流)

限流模式控制单位时间内的操作频率,防止系统因请求过多而过载。常见的限流算法有令牌桶(Token Bucket)、**滑动窗口(Sliding Window)漏桶(Leaky Bucket)**等。

基于 time.Ticker 的简单限流

package main

import (
	"fmt"
	"time"
)

func main() {
	// 每秒最多处理 3 个请求
	limiter := time.NewTicker(time.Second / 3)
	defer limiter.Stop()

	requests := []string{"A", "B", "C", "D", "E", "F", "G", "H"}

	for _, req := range requests {
		<-limiter.C // 等待令牌
		fmt.Printf("[%s] 处理请求 %s\n", time.Now().Format("15:04:05.000"), req)
	}
}

基于令牌桶的限流(使用 golang.org/x/time/rate)

package main

import (
	"context"
	"fmt"
	"golang.org/x/time/rate"
	"time"
)

func main() {
	// 每秒产生 2 个令牌,桶容量为 5(允许突发 5 个请求)
	limiter := rate.NewLimiter(2, 5)

	requests := []string{"A", "B", "C", "D", "E", "F", "G", "H"}

	for _, req := range requests {
		// Wait 阻塞等待直到获取到令牌
		err := limiter.Wait(context.Background())
		if err != nil {
			fmt.Printf("请求 %s 被拒绝: %v\n", req, err)
			continue
		}
		fmt.Printf("[%s] 处理请求 %s\n", time.Now().Format("15:04:05.000"), req)
	}

	// 使用 Allow 检查但不阻塞
	if limiter.Allow() {
		fmt.Println("请求被允许")
	} else {
		fmt.Println("请求被限流")
	}
}

限流模式要点

  • 令牌桶算法:以固定速率向桶中放入令牌,请求需要获取令牌才能执行。桶有最大容量,允许一定程度的突发流量
  • golang.org/x/time/rate:Go 官方扩展库提供的限流器,支持令牌桶算法,生产环境推荐使用
  • Wait() vs Allow()Wait() 阻塞等待令牌,Allow() 非阻塞地检查是否可以获得令牌
  • 适用场景:API 网关限流、数据库连接池控制、消息队列消费速率限制

生产者-消费者模型

生产者-消费者模型(Producer-Consumer)

生产者-消费者模型通过一个共享的缓冲队列(在 Go 中通常是 buffered channel)将数据的产生和消费解耦。生产者向 channel 写入数据,消费者从 channel 读取数据。两者可以独立地以不同的速率运行。

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// Task 表示要处理的任务
type Task struct {
	ID   int
	Data string
}

// Producer 生产者:生成任务并发送到 channel
func producer(id int, tasks chan<- Task, wg *sync.WaitGroup) {
	defer wg.Done()
	for i := 0; i < 5; i++ {
		task := Task{
			ID:   id*100 + i,
			Data: fmt.Sprintf("Producer-%d 的第 %d 个任务", id, i+1),
		}
		tasks <- task
		fmt.Printf("[Producer-%d] 生产任务: %d\n", id, task.ID)
		time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
	}
}

// Consumer 消费者:从 channel 读取任务并处理
func consumer(id int, tasks <-chan Task, wg *sync.WaitGroup) {
	defer wg.Done()
	for task := range tasks {
		fmt.Printf("[Consumer-%d] 消费任务: %d, 数据: %s\n", id, task.ID, task.Data)
		time.Sleep(time.Duration(rand.Intn(500)+200) * time.Millisecond) // 模拟处理耗时
	}
}

func main() {
	const bufferSize = 10
	tasks := make(chan Task, bufferSize) // 带缓冲的 channel 充当队列

	var wg sync.WaitGroup

	// 启动 2 个生产者
	for i := 1; i <= 2; i++ {
		wg.Add(1)
		go producer(i, tasks, &wg)
	}

	// 启动 3 个消费者
	for i := 1; i <= 3; i++ {
		wg.Add(1)
		go consumer(i, tasks, &wg)
	}

	// 等待所有生产者完成
	// 注意:这里需要分离生产者和消费者的 WaitGroup
	// 上面的简化示例中,消费者依赖 tasks channel 的关闭来退出
	// 实际项目推荐使用两个独立的 WaitGroup
	go func() {
		wg.Wait()
		close(tasks)
	}()

	// 等待消费者处理完所有任务
	// (此处简化处理,实际需要单独追踪消费者的完成状态)
	time.Sleep(3 * time.Second)
	fmt.Println("程序结束")
}

更健壮的生产者-消费者实现

package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

type Task struct {
	ID   int
	Data string
}

func main() {
	const bufferSize = 10
	tasks := make(chan Task, bufferSize)

	var producerWg, consumerWg sync.WaitGroup

	// 启动 2 个生产者
	for i := 1; i <= 2; i++ {
		producerWg.Add(1)
		go func(id int) {
			defer producerWg.Done()
			for j := 0; j < 5; j++ {
				tasks <- Task{ID: id*100 + j, Data: fmt.Sprintf("Task-%d", id*100+j)}
				time.Sleep(time.Duration(rand.Intn(200)+50) * time.Millisecond)
			}
		}(i)
	}

	// 生产者全部完成后关闭 channel
	go func() {
		producerWg.Wait()
		close(tasks)
	}()

	// 启动 3 个消费者
	for i := 1; i <= 3; i++ {
		consumerWg.Add(1)
		go func(id int) {
			defer consumerWg.Done()
			for task := range tasks {
				fmt.Printf("[Consumer-%d] 处理: %+v\n", id, task)
				time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
			}
		}(i)
	}

	// 等待所有消费者完成
	consumerWg.Wait()
	fmt.Println("所有任务处理完毕")
}

生产者-消费者模型要点

  • 缓冲 channel 解耦:缓冲区大小决定了生产者和消费者之间的解耦程度——缓冲区越大,生产者越不容易被阻塞
  • 关闭 channel 的时机只应由生产者关闭 channel,且必须确保所有生产者都已完成
  • 独立 WaitGroup:生产者和消费者应使用各自独立的 WaitGroup,避免混淆
  • 适用场景:日志收集系统、消息队列、任务调度系统、事件驱动架构

练习题

练习 1:实现一个带限流的工作池

结合 Worker Pool 和 Rate Limiting 模式,实现一个工作池:启动 5 个 worker,同时限制任务启动速率不超过每秒 3 个,总共处理 20 个任务。

参考答案
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

func worker(id int, jobs <-chan int, results chan<- int, wg *sync.WaitGroup) {
	defer wg.Done()
	for job := range jobs {
		fmt.Printf("Worker %d 处理任务 %d\n", id, job)
		time.Sleep(300 * time.Millisecond)
		results <- job * 2
	}
}

func main() {
	const numJobs = 20
	const numWorkers = 5

	jobs := make(chan int, numJobs)
	results := make(chan int, numJobs)

	// 限流器:每秒 3 个任务
	limiter := rate.NewLimiter(3, 3)

	// 启动 workers
	var workerWg sync.WaitGroup
	for w := 1; w <= numWorkers; w++ {
		workerWg.Add(1)
		go worker(w, jobs, results, &workerWg)
	}

	go func() {
		workerWg.Wait()
		close(results)
	}()

	// 以限流速率发送任务
	go func() {
		for j := 1; j <= numJobs; j++ {
			limiter.Wait(context.Background())
			jobs <- j
			fmt.Printf("发送任务 %d\n", j)
		}
		close(jobs)
	}()

	for r := range results {
		fmt.Printf("收到结果: %d\n", r)
	}
	fmt.Println("全部完成")
}

这个实现将 Worker Pool 和 Rate Limiting 结合:limiter.Wait() 确保任务发送速率不超过每秒 3 个,而 5 个 worker 可以并行处理这些任务。

练习 2:实现带超时的 Pipeline

构建一个三阶段 Pipeline:生成器产生 1~20 的整数 → 计算阶段对每个数执行耗时 200ms 的平方运算 → 消费阶段打印结果。要求使用 context.WithTimeout 设置总超时时间为 2 秒,超时后优雅停止所有阶段。

参考答案
package main

import (
	"context"
	"fmt"
	"time"
)

func generator(ctx context.Context, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case out <- n:
			case <-ctx.Done():
				fmt.Println("生成器:收到取消信号")
				return
			}
		}
	}()
	return out
}

func square(ctx context.Context, in <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range in {
			select {
			case <-ctx.Done():
				fmt.Println("计算阶段:收到取消信号")
				return
			case <-time.After(200 * time.Millisecond):
				select {
				case out <- n * n:
				case <-ctx.Done():
					fmt.Println("计算阶段:发送时收到取消信号")
					return
				}
			}
		}
	}()
	return out
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	nums := make([]int, 20)
	for i := range nums {
		nums[i] = i + 1
	}

	stage1 := generator(ctx, nums...)
	stage2 := square(ctx, stage1)

	count := 0
	for result := range stage2 {
		fmt.Printf("结果: %d\n", result)
		count++
	}
	fmt.Printf("共处理 %d 个结果\n", count)
}

关键点:每个阶段都通过 select 监听 ctx.Done(),超时后逐级传播取消信号,实现优雅关闭。

练习 3:实现 Future 的错误处理

扩展示例中的 AsyncCompute Future/Promise 模式,使其能够处理错误情况。定义一个 Result[T] 类型,同时包含结果值和错误信息。实现 AsyncDivide(a, b float64) 异步除法函数,当除数为 0 时返回错误。

参考答案
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Result 包含值和错误
type Result[T any] struct {
	Value T
	Err   error
}

type Future[T any] <-chan Result[T]
type Promise[T any] chan Result[T]

func NewPromise[T any]() Promise[T] {
	return make(chan Result[T], 1)
}

func (p Promise[T]) Future() Future[T] {
	return p
}

func (p Promise[T]) Resolve(value T, err error) {
	p <- Result[T]{Value: value, Err: err}
	close(p)
}

// 异步除法
func AsyncDivide(a, b float64) Future[float64] {
	p := NewPromise[float64]()
	go func() {
		time.Sleep(time.Duration(rand.Intn(300)+100) * time.Millisecond)
		if b == 0 {
			p.Resolve(0, fmt.Errorf("除数不能为零"))
			return
		}
		p.Resolve(a/b, nil)
	}()
	return p.Future()
}

func main() {
	divisions := []struct{ a, b float64 }{
		{10, 3}, {20, 4}, {5, 0}, {100, 10}, {8, 0},
	}

	var futures []Future[float64]
	for _, d := range divisions {
		futures = append(futures, AsyncDivide(d.a, d.b))
	}

	for i, f := range futures {
		result := <-f
		if result.Err != nil {
			fmt.Printf("除法 %d 失败: %v\n", i+1, result.Err)
		} else {
			fmt.Printf("除法 %d 结果: %.2f\n", i+1, result.Value)
		}
	}
}

通过泛型 Result[T] 结构体统一封装值和错误,使 Future 模式能正确处理异常情况,符合 Go 的错误处理惯例。

Go 的并发原语虽然简单(goroutine + channel),但通过组合它们可以实现强大而灵活的并发模式。掌握这些模式不仅能让代码更健壮、更高效,还能帮助你更好地理解并发编程的核心思想。

搜索