正在加载,请稍候…

Go 并发:Goroutines、Channels 与真正可扩展的模式

掌握 Go 并发:goroutines 与线程对比、channel(缓冲/非缓冲)、select 语句、同步原语、常见模式(扇出、管道、工作池)以及避免竞态条件

Go 并发:Goroutines、Channels 与真正可扩展的模式

为什么 Go 的并发模型独树一帜

Go 从设计之初就将并发作为一等公民。它不是事后添加的库,也不是框架的附加组件——它被内置于语言语法和运行时中。

结果是一个与基于线程的语言截然不同的并发模型。其口号是:“不要通过共享内存来通信;而要通过通信来共享内存。”

本指南假设你已熟悉 Go 的基础知识。我们将深入探讨 goroutines、channels 以及经验丰富的 Go 开发者在生产环境中使用的模式。

Go 并发:Goroutines、Channels 与真正可扩展的模式 插图

Goroutines:不是线程

Goroutines 看起来像线程,但它们有本质区别:

package main

import (
    "fmt"
    "time"
)

func doWork(id int) {
    fmt.Printf("Worker %d starting\n", id)
    time.Sleep(100 * time.Millisecond) // 模拟 I/O
    fmt.Printf("Worker %d done\n", id)
}

func main() {
    // 启动 1000 个 goroutines——没问题
    for i := 0; i < 1000; i++ {
        go doWork(i)
    }
    
    time.Sleep(500 * time.Millisecond) // 等待所有完成(坏模式——见 WaitGroup)
    fmt.Println("All done")
}

Goroutines 与线程的区别:

操作系统线程 Goroutines
栈大小 1-8MB(固定或增长) 2-8KB(按需增长)
创建开销 ~1ms,系统调用 ~1μs,仅运行时
典型限制 ~1,000-10,000 数百万
调度 操作系统内核(抢占式) Go 运行时(协作式 + 抢占式)

Go 运行时将 goroutines 多路复用到操作系统线程上(M:N 调度)。当一个 goroutine 在 I/O 上阻塞时,运行时将其挂起并运行另一个——无需你编写任何回调或 async/await。

Channels:通信原语

Channel 是一个类型化的管道,goroutines 通过它进行通信:

// 创建 channels
ch := make(chan int)         // 非缓冲——同步
ch := make(chan int, 100)    // 缓冲——最多 100 个元素而不阻塞

// 发送和接收
ch <- 42        // 发送(如果非缓冲且没有接收者就绪则阻塞)
value := <-ch   // 接收(阻塞直到有值可用)

// 关闭 channel(发送者信号不再有值)
close(ch)

// 遍历 channel(当 channel 关闭时退出)
for value := range ch {
    fmt.Println(value)
}

// 检查 channel 是否已关闭
value, ok := <-ch
if !ok {
    fmt.Println("Channel closed")
}

非缓冲 vs 缓冲

// 非缓冲:发送者和接收者必须同时就绪
func unbufferedExample() {
    ch := make(chan int)
    
    go func() {
        fmt.Println("Sending...")
        ch <- 42 // 阻塞直到接收者就绪
        fmt.Println("Sent!")
    }()
    
    time.Sleep(1 * time.Second) // 模拟延迟
    value := <-ch // 现在两者都就绪——解除发送者阻塞
    fmt.Println("Received:", value)
}

// 缓冲:发送者可以继续而无需等待接收者
func bufferedExample() {
    ch := make(chan int, 3) // 可容纳 3 个值
    
    ch <- 1 // 立即返回(缓冲区未满)
    ch <- 2
    ch <- 3
    // ch <- 4 // 会阻塞——缓冲区已满
    
    fmt.Println(<-ch) // 1
    fmt.Println(<-ch) // 2
    fmt.Println(<-ch) // 3
}

select 语句

select 就像 channel 的 switch——它处理多个 channel 操作:

func main() {
    ch1 := make(chan string)
    ch2 := make(chan string)
    
    go func() {
        time.Sleep(1 * time.Second)
        ch1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        ch2 <- "two"
    }()
    
    // 等待任一 channel 发送
    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-ch1:
            fmt.Println("Received from ch1:", msg1)
        case msg2 := <-ch2:
            fmt.Println("Received from ch2:", msg2)
        }
    }
}

Go 并发:Goroutines、Channels 与真正可扩展的模式 插图

使用 select 进行非阻塞操作:

// 非阻塞接收
select {
case msg := <-ch:
    fmt.Println("Received:", msg)
default:
    fmt.Println("No message ready")
}

// 非阻塞发送
select {
case ch <- value:
    fmt.Println("Sent successfully")
default:
    fmt.Println("Channel full or no receiver")
}

超时模式:

func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
    resultCh := make(chan string, 1)
    
    go func() {
        result := fetchURL(url) // 可能很慢的操作
        resultCh <- result
    }()
    
    select {
    case result := <-resultCh:
        return result, nil
    case <-time.After(timeout):
        return "", fmt.Errorf("request timed out after %v", timeout)
    }
}

用于取消的 Context

context.Context 是 Go 中用于取消和截止时间的标准机制:

import (
    "context"
    "fmt"
    "time"
)

func slowOperation(ctx context.Context) (string, error) {
    select {
    case <-time.After(5 * time.Second): // 模拟慢速工作
        return "result", nil
    case <-ctx.Done():
        return "", ctx.Err() // context.Canceled 或 context.DeadlineExceeded
    }
}

func main() {
    // 带超时
    ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    defer cancel() // 始终调用 cancel 以释放资源
    
    result, err := slowOperation(ctx)
    if err != nil {
        fmt.Println("Error:", err) // "context deadline exceeded"
        return
    }
    fmt.Println("Result:", result)
}

// 手动取消
func withCancel() {
    ctx, cancel := context.WithCancel(context.Background())
    
    go func() {
        time.Sleep(1 * time.Second)
        cancel() // 触发取消
    }()
    
    <-ctx.Done()
    fmt.Println("Cancelled:", ctx.Err())
}

通过请求传播 context:

// HTTP 处理器将 context 传递给所有下游操作
func userHandler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context() // 请求 context——当客户端断开连接时取消
    
    user, err := db.GetUser(ctx, userID)
    if err != nil {
        if errors.Is(err, context.Canceled) {
            // 客户端断开连接——无需响应
            return
        }
        http.Error(w, err.Error(), 500)
        return
    }
    
    json.NewEncoder(w).Encode(user)
}

// 数据库函数尊重 context
func (db *DB) GetUser(ctx context.Context, id int) (*User, error) {
    var user User
    err := db.pool.QueryRowContext(ctx, 
        "SELECT * FROM users WHERE id = $1", id,
    ).Scan(&user.ID, &user.Name, &user.Email)
    
    return &user, err
}

Go 并发:Goroutines、Channels 与真正可扩展的模式 插图

并发模式

工作池

func workerPool(jobs <-chan Job, results chan<- Result, numWorkers int) {
    var wg sync.WaitGroup
    
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func(workerID int) {
            defer wg.Done()
            for job := range jobs { // 接收任务直到 channel 关闭
                result := processJob(job)
                results <- result
            }
        }(i)
    }
    
    // 当所有 worker 完成时关闭 results
    go func() {
        wg.Wait()
        close(results)
    }()
}

func main() {
    jobs := make(chan Job, 100)
    results := make(chan Result, 100)
    
    // 启动工作池
    workerPool(jobs, results, 10) // 10 个并发 worker
    
    // 发送任务
    go func() {
        for _, job := range getJobs() {
            jobs <- job
        }
        close(jobs) // 信号:没有更多任务
    }()
    
    // 收集结果
    for result := range results {
        fmt.Println(result)
    }
}

管道模式

// 管道:由 channel 连接的阶段链
func generate(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

func square(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

func filter(in <-chan int, predicate func(int) bool) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            if predicate(n) {
                out <- n
            }
        }
        close(out)
    }()
    return out
}

func main() {
    // 管道:generate → square → filter even → print
    numbers := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
    squared := square(numbers)
    evenSquares := filter(squared, func(n int) bool { return n%2 == 0 })
    
    for n := range evenSquares {
        fmt.Println(n) // 4, 16, 36, 64, 100
    }
}

扇出、扇入

// 扇出:一个输入 channel,多个处理 goroutines
// 扇入:合并多个 channel 为一个

func fanOut(in <-chan Work, numWorkers int) []<-chan Result {
    outputs := make([]<-chan Result, numWorkers)
    for i := 0; i < numWorkers; i++ {
        outputs[i] = worker(in) // 每个 worker 从同一输入读取
    }
    return outputs
}

func fanIn(channels ...<-chan Result) <-chan Result {
    var wg sync.WaitGroup
    merged := make(chan Result, len(channels))
    
    output := func(ch <-chan Result) {
        defer wg.Done()
        for result := range ch {
            merged <- result
        }
    }
    
    wg.Add(len(channels))
    for _, ch := range channels {
        go output(ch)
    }
    
    go func() {
        wg.Wait()
        close(merged)
    }()
    
    return merged
}

同步原语

有时共享状态是正确的工具:

import "sync"

// 用于保护共享数据的 Mutex
type SafeCounter struct {
    mu    sync.Mutex
    count int
}

func (c *SafeCounter) Increment() {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count++
}

func (c *SafeCounter) Value() int {
    c.mu.RLock() // 读多写少场景使用 RWMutex
    defer c.mu.RUnlock()
    return c.count
}

// sync.Once——确保初始化只执行一次
var (
    instance *Database
    once     sync.Once
)

func GetDB() *Database {
    once.Do(func() {
        instance = connectToDatabase() // 只调用一次
    })
    return instance
}

// sync.Map——并发 map(读多写少工作负载)
var cache sync.Map

cache.Store("key", "value")
value, ok := cache.Load("key")
cache.LoadOrStore("key", "default") // 原子检查并设置
cache.Delete("key")

// WaitGroup——等待 goroutine 完成
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
    wg.Add(1)
    go func(n int) {
        defer wg.Done()
        doWork(n)
    }(i)
}
wg.Wait() // 阻塞直到所有 goroutine 调用 Done()

竞态条件检测

Go 内置了竞态检测器:

go run -race main.go
go test -race ./...
// 这存在竞态条件:
counter := 0

for i := 0; i < 1000; i++ {
    go func() {
        counter++ // 竞态!多个 goroutines 同时写入
    }()
}

// 修复 1:使用 sync/atomic
import "sync/atomic"
var counter int64

go func() {
    atomic.AddInt64(&counter, 1) // 原子递增
}()

// 修复 2:使用 channel
counterCh := make(chan struct{}, 1)
go func() {
    counterCh <- struct{}{}
    counter++
    <-counterCh
}()

// 修复 3:使用 mutex(如上所示)

Go 的并发模型鼓励以通信和所有权而非锁的角度思考。当你与模型对抗时——使用大量 mutex、挣扎于 goroutine 生命周期——这通常是一个信号,表明你应该退后一步,围绕 channel 重新设计。

→ 使用 Hash Text 工具计算哈希值以进行验证和安全保护。