导航菜单

Goroutine

Goroutine

Goroutine 是 Go 语言中轻量级的并发执行单元。它是 Go 运行时管理的用户态线程(协程),初始栈大小仅 2KB,可动态伸缩至 GB 级别。使用 go 关键字即可启动一个 Goroutine,创建成本极低,一个程序中可以轻松运行数十万甚至数百万个 Goroutine。

go 关键字

基本用法

在函数或方法调用前加上 go 关键字,即可将其作为 Goroutine 并发执行:

package main

import (
    "fmt"
    "time"
)

func sayHello(name string) {
    for i := 0; i < 3; i++ {
        fmt.Printf("Hello, %s! (%d)\n", name, i)
        time.Sleep(100 * time.Millisecond)
    }
}

func main() {
    // 启动一个 Goroutine
    go sayHello("Alice")
    go sayHello("Bob")

    // 主 Goroutine 也需要等待,否则程序会直接退出
    time.Sleep(1 * time.Second)
    fmt.Println("主函数结束")
}

方法的 Goroutine

go 关键字也可以用于启动方法的 Goroutine:

package main

import (
    "fmt"
    "sync"
    "time"
)

type Worker struct {
    id int
}

func (w *Worker) doTask(task string) {
    fmt.Printf("Worker %d 开始处理: %s\n", w.id, task)
    time.Sleep(200 * time.Millisecond)
    fmt.Printf("Worker %d 完成: %s\n", w.id, task)
}

func main() {
    var wg sync.WaitGroup
    workers := []*Worker{{1}, {2}, {3}}

    for _, w := range workers {
        wg.Add(1)
        go func(worker *Worker) {
            defer wg.Done()
            worker.doTask("数据处理")
        }(w)
    }

    wg.Wait()
    fmt.Println("所有任务完成")
}

匿名函数启动 Goroutine

基本形式

使用匿名函数是启动 Goroutine 的常用方式,特别适合一次性任务:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Goroutine %d 开始\n", id)
        }(i) // 将 i 作为参数传入
    }

    wg.Wait()
    fmt.Println("完成")
}

使用闭包传递参数

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    tasks := []string{"任务A", "任务B", "任务C"}

    for _, task := range tasks {
        wg.Add(1)
        go func(t string) {
            defer wg.Done()
            fmt.Printf("开始处理: %s\n", t)
            // 模拟工作
            // process(t)
            fmt.Printf("完成处理: %s\n", t)
        }(task) // 将 task 作为参数传入闭包
    }

    wg.Wait()
}

Goroutine 的生命周期

Goroutine 生命周期

Goroutine 从 go func() 调用开始创建,经历以下状态:创建 → 就绪(Runnable)→ 运行(Running)→ 等待(Waiting)→ 结束。当 Goroutine 的函数执行完毕(return)或调用了 runtime.Goexit() 时,Goroutine 终止。主 Goroutine(main 函数)退出会导致整个程序退出。

状态流转

     go func()              被调度器选中
  ┌──────────┐           ┌──────────┐
  │  Created  │────────→  │ Runnable │
  └──────────┘           └────┬─────┘

                         获得 P 和 M

                         ┌──────────┐
                         │ Running  │
                         └──┬───┬───┘
                            │   │
               Channel/锁    │   │   函数返回
               操作阻塞       │   │   runtime.Goexit()
                            ↓   ↓
                      ┌──────────┐    ┌──────────┐
                      │ Waiting  │    │  Dead    │
                      └──────────┘    └──────────┘

控制 Goroutine 退出

package main

import (
    "fmt"
    "sync"
    "time"
)

func worker(id int, stop <-chan struct{}, wg *sync.WaitGroup) {
    defer wg.Done()
    for {
        select {
        case <-stop:
            fmt.Printf("Worker %d 收到停止信号,退出\n", id)
            return
        default:
            fmt.Printf("Worker %d 工作中...\n", id)
            time.Sleep(300 * time.Millisecond)
        }
    }
}

func main() {
    stop := make(chan struct{})
    var wg sync.WaitGroup

    for i := 1; i <= 3; i++ {
        wg.Add(1)
        go worker(i, stop, &wg)
    }

    // 运行 2 秒后停止
    time.Sleep(2 * time.Second)
    close(stop)

    wg.Wait()
    fmt.Println("所有 Worker 已退出")
}

runtime.Goexit()

runtime.Goexit() 终止当前 Goroutine 的执行,但不会影响其他 Goroutine。与 os.Exit() 不同,Goexit() 只退出当前 Goroutine,且会执行 defer 语句:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    go func() {
        defer fmt.Println("defer: Goroutine 退出前执行")
        fmt.Println("Goroutine 开始工作")
        runtime.Goexit() // 终止当前 Goroutine
        fmt.Println("这行不会执行") // 不会打印
    }()

    // 主 Goroutine 等待
    runtime.Goexit() // 终止主 Goroutine,程序退出

    fmt.Println("这行也不会执行")
}
// 输出:
// Goroutine 开始工作
// defer: Goroutine 退出前执行

runtime.Gosched()

runtime.Gosched()

runtime.Gosched() 让当前 Goroutine 放弃当前时间片,将 CPU 让给其他等待运行的 Goroutine。它不会挂起当前 Goroutine,当前 Goroutine 仍然在运行队列中,会在调度器下次选中时继续执行。

package main

import (
    "fmt"
    "runtime"
)

func main() {
    go func() {
        for i := 0; i < 5; i++ {
            fmt.Println("Goroutine:", i)
        }
    }()

    // 让出 CPU,让上面的 Goroutine 有机会执行
    for i := 0; i < 5; i++ {
        runtime.Gosched()
        fmt.Println("Main:", i)
    }
}
// 可能的输出(交替进行):
// Goroutine: 0
// Main: 0
// Goroutine: 1
// Main: 1
// ...

runtime.GOMAXPROCS()

runtime.GOMAXPROCS(n)

runtime.GOMAXPROCS(n) 设置 Go 程序能同时使用的最大 CPU 核心数(即 P 的数量)。默认值为 runtime.NumCPU()。返回上一次的设置值。传入 0 表示只查询不修改。

package main

import (
    "fmt"
    "runtime"
    "time"
)

func cpuIntensive(id int) {
    start := time.Now()
    // CPU 密集型计算
    sum := 0
    for i := 0; i < 100_000_000; i++ {
        sum += i
    }
    fmt.Printf("Goroutine %d 完成, sum=%d, 耗时: %v\n", id, sum, time.Since(start))
}

func main() {
    fmt.Println("CPU 核心数:", runtime.NumCPU())

    // 查询当前设置(传入 0)
    fmt.Println("当前 GOMAXPROCS:", runtime.GOMAXPROCS(0))

    // 设置为 1(所有 Goroutine 串行执行)
    runtime.GOMAXPROCS(1)
    fmt.Println("设置为 1:", runtime.GOMAXPROCS(0))

    // 测试
    var wg sync.WaitGroup
    start := time.Now()

    for i := 0; i < 4; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            cpuIntensive(id)
        }(i)
    }
    wg.Wait()
    fmt.Printf("总耗时 (GOMAXPROCS=1): %v\n\n", time.Since(start))

    // 恢复默认值
    runtime.GOMAXPROCS(runtime.NumCPU())
}

容器环境中的 GOMAXPROCS

// 在容器中使用 uber-go/automaxprocs 自动设置
import _ "go.uber.org/automaxprocs"

func main() {
    // automaxprocs 会自动根据容器 cgroup 的 CPU 限制设置 GOMAXPROCS
    fmt.Println("GOMAXPROCS:", runtime.GOMAXPROCS(0))
}

Goroutine 泄漏问题

Goroutine 泄漏(Goroutine Leak)

Goroutine 泄漏是指 Goroutine 被阻塞且永远无法继续执行或退出的情况。泄漏的 Goroutine 会持续占用内存(至少 2KB 栈空间)和调度器资源。如果不加以控制,长时间运行的服务会因大量泄漏的 Goroutine 导致 OOM(内存溢出)。

常见的泄漏场景

1. Channel 无接收者导致阻塞

// ❌ Goroutine 泄漏:ch 永远不会被读取
func leak() chan int {
    ch := make(chan int)
    go func() {
        ch <- doWork() // 如果没有人读 ch,这个 Goroutine 永远阻塞
    }()
    return ch
}

2. Channel 无发送者导致阻塞

// ❌ Goroutine 泄漏:ch 永远不会有数据
func leak2() {
    ch := make(chan int)
    go func() {
        for val := range ch { // 如果没有人写 ch,这个 Goroutine 永远阻塞
            process(val)
        }
    }()
    // 忘记关闭 ch 或向 ch 发送数据
}

3. 向 nil Channel 操作

// ❌ 向 nil channel 发送,永远阻塞
var ch chan int // nil channel
ch <- 1        // 永远阻塞

// ❌ 从 nil channel 接收,永远阻塞
<-ch           // 永远阻塞

4. select 中只有一个永远不就绪的 case

// ❌ 永远阻塞
ch := make(chan int)
select {
case <-ch:
    // 只有这个 case,且 ch 永远不会有数据
}

5. 死锁

// ❌ 经典死锁
ch1 := make(chan int)
ch2 := make(chan int)

go func() {
    <-ch1  // 等待 ch1
    ch2 <- 1
}()

ch1 <- <-ch2  // 等待 ch2 的同时等待 ch1 发送,双方互相等待

修复泄漏的方法

// ✅ 正确做法:使用 context 控制生命周期
func safeWorker(ctx context.Context, results chan<- int, id int) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("Worker %d 退出: %v\n", id, ctx.Err())
            return
        default:
            results <- doWork(id)
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    results := make(chan int, 10)
    for i := 0; i < 3; i++ {
        go safeWorker(ctx, results, i)
    }

    // 5 秒后 context 超时,所有 Worker 会自动退出
    <-ctx.Done()
    fmt.Println("所有 Worker 已退出")
}

使用 runtime 检测泄漏

package main

import (
    "fmt"
    "runtime"
    "time"
)

func main() {
    fmt.Println("初始 Goroutine 数量:", runtime.NumGoroutine())

    // 模拟泄漏
    for i := 0; i < 10; i++ {
        go leakyFunction()
    }

    time.Sleep(100 * time.Millisecond)
    fmt.Println("泄漏后 Goroutine 数量:", runtime.NumGoroutine())
    // 应该看到数量增加了 10
}

func leakyFunction() {
    ch := make(chan int)
    go func() {
        <-ch // 永远阻塞
    }()
    // ch 从未被发送数据,也未被关闭
}

使用 pprof 排查泄漏

import (
    "net/http"
    _ "net/http/pprof"
)

func main() {
    // 启动 pprof HTTP 服务
    go http.ListenAndServe(":6060", nil)

    // 你的程序代码...
}

然后访问 http://localhost:6060/debug/pprof/goroutine?debug=1 可以查看所有 Goroutine 的堆栈信息。

练习题

练习 1:Goroutine 同步与控制

编写一个程序,启动 10 个 Goroutine,每个 Goroutine 打印自己的编号(0-9)。要求:

  1. 使用 sync.WaitGroup 等待所有 Goroutine 完成
  2. 打印顺序不一定按编号顺序(体现并发性)
  3. 使用匿名函数并正确处理循环变量
参考答案

代码

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func(id int) {
            defer wg.Done()
            fmt.Printf("Goroutine %d 正在执行\n", id)
        }(i) // 将 i 作为参数传入,避免闭包捕获问题
    }

    wg.Wait()
    fmt.Println("所有 Goroutine 完成")
}

可能输出(每次运行顺序不同):

Goroutine 3 正在执行
Goroutine 0 正在执行
Goroutine 7 正在执行
Goroutine 1 正在执行
Goroutine 9 正在执行
Goroutine 4 正在执行
Goroutine 2 正在执行
Goroutine 5 正在执行
Goroutine 8 正在执行
Goroutine 6 正在执行
所有 Goroutine 完成

关键点:将循环变量 i 作为参数 id 传入匿名函数,确保每个 Goroutine 捕获到的是当前迭代的值,而不是循环结束后变量的最终值。

练习 2:Goroutine 泄漏检测与修复

以下代码存在 Goroutine 泄漏问题。请指出泄漏原因,并使用 context 修复它,使 Goroutine 在超时后能够正确退出。

func monitor(ch <-chan string) {
    for msg := range ch {
        fmt.Println("收到消息:", msg)
    }
}

func main() {
    ch := make(chan string)
    go monitor(ch)
    time.Sleep(2 * time.Second)
    fmt.Println("主函数结束")
}
参考答案

泄漏原因monitor 函数中使用了 for range ch,这个循环会在 channel 被关闭时自动退出。但 main 函数中没有关闭 channel(也没有向 channel 发送任何数据),导致 monitor Goroutine 永远阻塞在 range ch,造成泄漏。

修复代码

package main

import (
    "context"
    "fmt"
    "time"
)

func monitor(ctx context.Context, ch <-chan string) {
    for {
        select {
        case msg, ok := <-ch:
            if !ok {
                fmt.Println("Channel 已关闭,monitor 退出")
                return
            }
            fmt.Println("收到消息:", msg)
        case <-ctx.Done():
            fmt.Println("超时退出:", ctx.Err())
            return
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel() // 确保资源释放

    ch := make(chan string)
    go monitor(ctx, ch)

    // 模拟发送一些消息
    go func() {
        for i := 0; i < 5; i++ {
            ch <- fmt.Sprintf("消息 %d", i)
            time.Sleep(500 * time.Millisecond)
        }
    }()

    // 等待 context 超时
    <-ctx.Done()
    fmt.Println("程序结束")
}

修复要点

  1. 引入 context.WithTimeout 设置超时
  2. monitor 函数中使用 select 同时监听 channel 和 context
  3. 当 context 超时或取消时,Goroutine 能够正确退出
  4. 使用 defer cancel() 确保资源释放

练习 3:Goroutine 数量监控

编写一个监控函数,每隔 1 秒打印当前的 Goroutine 数量,持续监控 10 秒。同时启动一批 Goroutine(100 个),每个 Goroutine 执行一个耗时 3 秒的任务。观察 Goroutine 数量的变化趋势。

参考答案

代码

package main

import (
    "fmt"
    "runtime"
    "sync"
    "time"
)

func monitorGoroutines(ctx context.Context) {
    ticker := time.NewTicker(1 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ticker.C:
            count := runtime.NumGoroutine()
            fmt.Printf("[监控] 当前 Goroutine 数量: %d\n", count)
        case <-ctx.Done():
            count := runtime.NumGoroutine()
            fmt.Printf("[监控] 结束,最终 Goroutine 数量: %d\n", count)
            return
        }
    }
}

func worker(id int, wg *sync.WaitGroup) {
    defer wg.Done()
    time.Sleep(3 * time.Second) // 模拟耗时任务
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    // 启动监控
    go monitorGoroutines(ctx)

    fmt.Println("初始 Goroutine 数量:", runtime.NumGoroutine())

    // 批量启动 Worker
    var wg sync.WaitGroup
    fmt.Println("启动 100 个 Worker...")
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go worker(i, &wg)
    }

    fmt.Printf("启动后 Goroutine 数量: %d\n", runtime.NumGoroutine())

    // 等待所有 Worker 完成
    wg.Wait()
    fmt.Printf("所有 Worker 完成后 Goroutine 数量: %d\n", runtime.NumGoroutine())

    // 等待监控结束
    <-ctx.Done()
}

典型输出

初始 Goroutine 数量: 2
启动 100 个 Worker...
启动后 Goroutine 数量: 103
[监控] 当前 Goroutine 数量: 103
[监控] 当前 Goroutine 数量: 103
[监控] 当前 Goroutine 数量: 102   ← Worker 陆续完成
[监控] 当前 Goroutine 数量: 5     ← 大部分已完成
所有 Worker 完成后 Goroutine 数量: 3
[监控] 当前 Goroutine 数量: 2     ← 只剩监控和主 Goroutine
[监控] 当前 Goroutine 数量: 2
...
[监控] 结束,最终 Goroutine 数量: 1

分析:可以观察到 Goroutine 数量在 Worker 启动后急剧上升,在 3 秒后(Worker 任务完成)逐渐回落,验证了 Goroutine 的生命周期特性。

搜索