正在加载,请稍候…

Go 并发模式:Goroutines、Channels 和 sync 原语

掌握 Go 并发编程:goroutines、channels、select、WaitGroup、Mutex 和 context 取消。学习扇出/扇入、工作池和管

Go 并发模式:Goroutines、Channels 和 sync 原语

Go 并发模式

Go 的并发模型基于 goroutines 和 channels,使并发编程变得平易近人。本指南涵盖了生产级 Go 代码的基本模式。

Goroutines 和 Channels

// 基本 goroutine
go func() {
    fmt.Println("Running in background")
}()

// 通道通信
ch := make(chan int, 10) // 缓冲通道

go func() {
    for i := 0; i < 5; i++ {
        ch <- i
    }
    close(ch) // 信号:不再有值
}()

for v := range ch {
    fmt.Println(v)
}

Go 并发模式:Goroutines、Channels 和 sync 原语 插图

工作池模式

func workerPool(jobs []Job, numWorkers int) []Result {
    jobCh := make(chan Job, len(jobs))
    resultCh := make(chan Result, len(jobs))

    // 启动 workers
    var wg sync.WaitGroup
    for i := 0; i < numWorkers; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            for job := range jobCh {
                result := processJob(job)
                resultCh <- result
            }
        }()
    }

    // 发送任务
    for _, job := range jobs {
        jobCh <- job
    }
    close(jobCh)

    // 等待并关闭结果通道
    go func() {
        wg.Wait()
        close(resultCh)
    }()

    // 收集结果
    var results []Result
    for r := range resultCh {
        results = append(results, r)
    }
    return results
}

扇出 / 扇入

func fanOut(input <-chan int, numWorkers int) []<-chan int {
    outputs := make([]<-chan int, numWorkers)
    for i := 0; i < numWorkers; i++ {
        outputs[i] = process(input)
    }
    return outputs
}

func fanIn(channels ...<-chan int) <-chan int {
    merged := make(chan int)
    var wg sync.WaitGroup

    for _, ch := range channels {
        wg.Add(1)
        go func(c <-chan int) {
            defer wg.Done()
            for v := range c {
                merged <- v
            }
        }(ch)
    }

    go func() {
        wg.Wait()
        close(merged)
    }()

    return merged
}

Go 并发模式:Goroutines、Channels 和 sync 原语 插图

使用 Context 进行取消

func fetchWithTimeout(url string) ([]byte, error) {
    ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
    defer cancel()

    req, _ := http.NewRequestWithContext(ctx, "GET", url, nil)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()
    return io.ReadAll(resp.Body)
}

// 可取消的服务器处理函数
func handler(w http.ResponseWriter, r *http.Request) {
    ctx := r.Context()

    result := make(chan string, 1)
    go func() {
        // 模拟慢速工作
        time.Sleep(2 * time.Second)
        result <- "data"
    }()

    select {
    case data := <-result:
        fmt.Fprintln(w, data)
    case <-ctx.Done():
        http.Error(w, "Request cancelled", 499)
    }
}

Select 语句

func prioritizedSelect(highPriority, lowPriority <-chan Message) {
    for {
        // 先检查高优先级
        select {
        case msg := <-highPriority:
            handleUrgent(msg)
            continue
        default:
        }

        // 然后处理任何可用的
        select {
        case msg := <-highPriority:
            handleUrgent(msg)
        case msg := <-lowPriority:
            handleNormal(msg)
        case <-time.After(100 * time.Millisecond):
            // 超时:执行内务处理
            doHousekeeping()
        }
    }
}

Go 并发模式:Goroutines、Channels 和 sync 原语 插图

sync.Mutex 和 sync.RWMutex

type SafeCounter struct {
    mu    sync.RWMutex
    count map[string]int
}

func (c *SafeCounter) Increment(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.count[key]++
}

func (c *SafeCounter) Get(key string) int {
    c.mu.RLock() // 允许多个读取者
    defer c.mu.RUnlock()
    return c.count[key]
}

使用 errgroup 进行错误传播

import "golang.org/x/sync/errgroup"

func fetchAll(urls []string) ([][]byte, error) {
    g, ctx := errgroup.WithContext(context.Background())
    results := make([][]byte, len(urls))

    for i, url := range urls {
        i, url := i, url // 捕获循环变量
        g.Go(func() error {
            data, err := fetchURL(ctx, url)
            if err != nil {
                return fmt.Errorf("fetching %s: %w", url, err)
            }
            results[i] = data
            return nil
        })
    }

    if err := g.Wait(); err != nil {
        return nil, err
    }
    return results, nil
}

总结

Go 并发最佳实践:

  • 自由使用 goroutines,它们很轻量(约 2KB 栈)
  • 使用 channels 进行通信,使用 mutexes 保护共享状态
  • 始终使用 WaitGroup 或 context 管理 goroutine 生命周期
  • 使用 errgroup 在并发代码中传播错误
  • 仅从发送端关闭 channels