
Go 并发模式
Go 的并发模型基于 goroutines 和 channels，使并发编程变得简单易用。本指南涵盖了生产级 Go 代码中常用的基本并发模式。
Goroutines 和 Channels
// Basic goroutine: the function literal runs concurrently with the code below.
go func() {
	fmt.Println("Running in background")
}()

// Channel communication: a producer goroutine feeds a buffered channel.
values := make(chan int, 10) // capacity 10, so the five sends below never block
go func() {
	// Closing tells the consumer that no more values will arrive.
	defer close(values)
	for n := 0; n < 5; n++ {
		values <- n
	}
}()

// Ranging over the channel receives until it has been closed and drained.
for got := range values {
	fmt.Println(got)
}
工作池模式
// workerPool runs processJob on every job using a fixed number of worker
// goroutines and returns one Result per job.
//
// Results are collected in completion order, so their order is not
// guaranteed to match the order of jobs. The worker count is clamped to the
// range [1, len(jobs)]: a non-positive numWorkers would otherwise start no
// workers and silently drop every job. An empty jobs slice yields nil.
func workerPool(jobs []Job, numWorkers int) []Result {
	if len(jobs) == 0 {
		return nil
	}
	if numWorkers < 1 {
		numWorkers = 1
	}
	if numWorkers > len(jobs) {
		numWorkers = len(jobs) // extra workers would only sit idle
	}

	// Both channels are buffered to len(jobs), so neither queuing the jobs
	// nor publishing a result can block.
	jobCh := make(chan Job, len(jobs))
	resultCh := make(chan Result, len(jobs))

	// Start the workers.
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			defer wg.Done()
			for job := range jobCh {
				resultCh <- processJob(job)
			}
		}()
	}

	// Queue the jobs, then close the channel so each worker's range loop ends.
	for _, job := range jobs {
		jobCh <- job
	}
	close(jobCh)

	// Close resultCh once every worker has finished so the collection loop
	// below terminates.
	go func() {
		wg.Wait()
		close(resultCh)
	}()

	// Collect the results.
	results := make([]Result, 0, len(jobs))
	for r := range resultCh {
		results = append(results, r)
	}
	return results
}
扇出 / 扇入
// fanOut calls process(input) numWorkers times, handing the same input
// channel to every call, and returns the resulting output channels in call
// order. Each call presumably starts one worker that consumes from input;
// that depends on process, which is defined elsewhere.
func fanOut(input <-chan int, numWorkers int) []<-chan int {
	workers := make([]<-chan int, numWorkers)
	for slot := range workers {
		workers[slot] = process(input)
	}
	return workers
}
// fanIn merges any number of input channels into a single output channel.
// Every value received from an input is forwarded to the returned channel,
// which is closed once all inputs have been closed and drained.
func fanIn(channels ...<-chan int) <-chan int {
	out := make(chan int)

	var pending sync.WaitGroup
	pending.Add(len(channels))

	// forward copies one input into out until that input is closed.
	forward := func(src <-chan int) {
		defer pending.Done()
		for v := range src {
			out <- v
		}
	}
	for _, src := range channels {
		go forward(src)
	}

	// Close out only after every forwarder has finished; a send on a closed
	// channel would panic.
	go func() {
		pending.Wait()
		close(out)
	}()

	return out
}
使用 Context 进行取消
// fetchWithTimeout performs an HTTP GET on url and returns the full response
// body. The whole exchange, including reading the body, is bound to a
// context that expires after 5 seconds.
//
// An error is returned if the request cannot be built (for example, url does
// not parse), if the request fails or times out, or if the body cannot be
// read. The HTTP status code is not inspected, so error responses are
// returned as ordinary bodies.
func fetchWithTimeout(url string) ([]byte, error) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel() // release the timeout's resources as soon as we return

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
	if err != nil {
		// Without this check a malformed URL leaves req nil and the client
		// call below panics.
		return nil, err
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	return io.ReadAll(resp.Body)
}
// handler is a cancellable HTTP handler. It runs a simulated slow job in the
// background and writes the job's result to the response, unless the
// request's context is cancelled first, in which case it replies with
// status 499.
func handler(w http.ResponseWriter, r *http.Request) {
	// 499 is a non-standard "client closed request" status; net/http
	// defines no constant for it.
	const statusClientClosedRequest = 499

	ctx := r.Context()

	// Buffered so the worker can always deliver its result and exit, even if
	// nobody is receiving any more.
	result := make(chan string, 1)
	go func() {
		// Simulate slow work, but stop as soon as the request is cancelled
		// instead of occupying the goroutine for the full duration.
		work := time.NewTimer(2 * time.Second)
		defer work.Stop()
		select {
		case <-work.C:
			result <- "data"
		case <-ctx.Done():
		}
	}()

	select {
	case data := <-result:
		fmt.Fprintln(w, data)
	case <-ctx.Done():
		http.Error(w, "Request cancelled", statusClientClosedRequest)
	}
}
Select 语句
// prioritizedSelect dispatches messages from two channels, always preferring
// highPriority: urgent messages go to handleUrgent, the rest to
// handleNormal. When nothing arrives for 100ms it calls doHousekeeping.
//
// A closed channel is treated as exhausted and is no longer polled;
// receiving from it would otherwise yield zero-value messages forever and
// spin the loop. The function returns once neither channel can deliver
// another message (both closed or nil).
func prioritizedSelect(highPriority, lowPriority <-chan Message) {
	for highPriority != nil || lowPriority != nil {
		// Non-blocking check first, so a pending urgent message always wins
		// over a pending normal one.
		select {
		case msg, ok := <-highPriority:
			if !ok {
				highPriority = nil // a nil channel is never selected again
				continue
			}
			handleUrgent(msg)
			continue
		default:
		}

		// Nothing urgent is pending: block for whichever message comes first.
		select {
		case msg, ok := <-highPriority:
			if !ok {
				highPriority = nil
				continue
			}
			handleUrgent(msg)
		case msg, ok := <-lowPriority:
			if !ok {
				lowPriority = nil
				continue
			}
			handleNormal(msg)
		case <-time.After(100 * time.Millisecond):
			// Idle timeout: use the quiet period for housekeeping.
			doHousekeeping()
		}
	}
}
sync.Mutex 和 sync.RWMutex
// SafeCounter is a set of named integer counters guarded by a mutex so that
// it can be used from multiple goroutines. The zero value is ready to use.
// Because it contains a mutex, a SafeCounter must not be copied after first
// use; all methods take a pointer receiver.
type SafeCounter struct {
	mu    sync.RWMutex   // guards count
	count map[string]int // per-key tallies; allocated lazily by Increment
}

// Increment adds one to the counter stored under key.
func (c *SafeCounter) Increment(key string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.count == nil {
		// Writing to a nil map panics, so allocate on first use. This keeps
		// the zero-value SafeCounter usable.
		c.count = make(map[string]int)
	}
	c.count[key]++
}

// Get returns the current value of the counter stored under key, or 0 if
// the key has never been incremented.
func (c *SafeCounter) Get(key string) int {
	c.mu.RLock() // shared lock: concurrent readers do not block one another
	defer c.mu.RUnlock()
	return c.count[key] // a nil map or missing key reads as 0
}
使用 errgroup 进行错误传播
import "golang.org/x/sync/errgroup"
// fetchAll fetches every URL concurrently via fetchURL, one goroutine per
// URL, and returns the bodies in the same order as urls.
//
// The goroutines share a context obtained from errgroup.WithContext. If any
// fetch fails, fetchAll returns nil and the error reported by the group's
// Wait, wrapped with the URL that produced it.
func fetchAll(urls []string) ([][]byte, error) {
	group, ctx := errgroup.WithContext(context.Background())
	bodies := make([][]byte, len(urls))

	for idx := range urls {
		// Per-iteration copies, so each closure sees its own index and URL
		// (required before Go 1.22).
		idx, target := idx, urls[idx]
		group.Go(func() error {
			body, err := fetchURL(ctx, target)
			if err != nil {
				return fmt.Errorf("fetching %s: %w", target, err)
			}
			// Each goroutine writes only its own slot, so no lock is needed.
			bodies[idx] = body
			return nil
		})
	}

	err := group.Wait()
	if err != nil {
		return nil, err
	}
	return bodies, nil
}
总结
Go 并发最佳实践:
- 自由使用 goroutines，它们很轻量（初始栈约 2KB）
- 使用 channels 进行通信，使用 mutexes 保护共享状态
- 始终使用 WaitGroup 或 context 管理 goroutine 生命周期
- 使用 errgroup 在并发代码中传播错误
- 仅从发送端关闭 channels