
为什么 Go 的并发模型独树一帜
Go 从设计之初就将并发作为一等公民。它不是事后添加的库,也不是框架的附加组件——它被内置于语言语法和运行时中。
结果是一个与基于线程的语言截然不同的并发模型。其口号是:“不要通过共享内存来通信;而要通过通信来共享内存。”
本指南假设你已熟悉 Go 的基础知识。我们将深入探讨 goroutines、channels 以及经验丰富的 Go 开发者在生产环境中使用的模式。

Goroutines:不是线程
Goroutines 看起来像线程,但它们有本质区别:
package main
import (
"fmt"
"time"
)
func doWork(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(100 * time.Millisecond) // 模拟 I/O
fmt.Printf("Worker %d done\n", id)
}
func main() {
// 启动 1000 个 goroutines——没问题
for i := 0; i < 1000; i++ {
go doWork(i)
}
time.Sleep(500 * time.Millisecond) // 等待所有完成(坏模式——见 WaitGroup)
fmt.Println("All done")
}
Goroutines 与线程的区别:
| 操作系统线程 | Goroutines | |
|---|---|---|
| 栈大小 | 1-8MB(固定或增长) | 2-8KB(按需增长) |
| 创建开销 | ~1ms,系统调用 | ~1μs,仅运行时 |
| 典型限制 | ~1,000-10,000 | 数百万 |
| 调度 | 操作系统内核(抢占式) | Go 运行时(协作式 + 抢占式) |
Go 运行时将 goroutines 多路复用到操作系统线程上(M:N 调度)。当一个 goroutine 在 I/O 上阻塞时,运行时将其挂起并运行另一个——无需你编写任何回调或 async/await。
Channels:通信原语
Channel 是一个类型化的管道,goroutines 通过它进行通信:
// 创建 channels
ch := make(chan int) // 非缓冲——同步
ch := make(chan int, 100) // 缓冲——最多 100 个元素而不阻塞
// 发送和接收
ch <- 42 // 发送(如果非缓冲且没有接收者就绪则阻塞)
value := <-ch // 接收(阻塞直到有值可用)
// 关闭 channel(发送者信号不再有值)
close(ch)
// 遍历 channel(当 channel 关闭时退出)
for value := range ch {
fmt.Println(value)
}
// 检查 channel 是否已关闭
value, ok := <-ch
if !ok {
fmt.Println("Channel closed")
}
非缓冲 vs 缓冲
// 非缓冲:发送者和接收者必须同时就绪
func unbufferedExample() {
ch := make(chan int)
go func() {
fmt.Println("Sending...")
ch <- 42 // 阻塞直到接收者就绪
fmt.Println("Sent!")
}()
time.Sleep(1 * time.Second) // 模拟延迟
value := <-ch // 现在两者都就绪——解除发送者阻塞
fmt.Println("Received:", value)
}
// 缓冲:发送者可以继续而无需等待接收者
func bufferedExample() {
ch := make(chan int, 3) // 可容纳 3 个值
ch <- 1 // 立即返回(缓冲区未满)
ch <- 2
ch <- 3
// ch <- 4 // 会阻塞——缓冲区已满
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
fmt.Println(<-ch) // 3
}
select 语句
select 就像 channel 的 switch——它处理多个 channel 操作:
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(1 * time.Second)
ch1 <- "one"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "two"
}()
// 等待任一 channel 发送
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println("Received from ch1:", msg1)
case msg2 := <-ch2:
fmt.Println("Received from ch2:", msg2)
}
}
}
使用 select 进行非阻塞操作:
// 非阻塞接收
select {
case msg := <-ch:
fmt.Println("Received:", msg)
default:
fmt.Println("No message ready")
}
// 非阻塞发送
select {
case ch <- value:
fmt.Println("Sent successfully")
default:
fmt.Println("Channel full or no receiver")
}
超时模式:
func fetchWithTimeout(url string, timeout time.Duration) (string, error) {
resultCh := make(chan string, 1)
go func() {
result := fetchURL(url) // 可能很慢的操作
resultCh <- result
}()
select {
case result := <-resultCh:
return result, nil
case <-time.After(timeout):
return "", fmt.Errorf("request timed out after %v", timeout)
}
}
用于取消的 Context
context.Context 是 Go 中用于取消和截止时间的标准机制:
import (
"context"
"fmt"
"time"
)
func slowOperation(ctx context.Context) (string, error) {
select {
case <-time.After(5 * time.Second): // 模拟慢速工作
return "result", nil
case <-ctx.Done():
return "", ctx.Err() // context.Canceled 或 context.DeadlineExceeded
}
}
func main() {
// 带超时
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel() // 始终调用 cancel 以释放资源
result, err := slowOperation(ctx)
if err != nil {
fmt.Println("Error:", err) // "context deadline exceeded"
return
}
fmt.Println("Result:", result)
}
// 手动取消
func withCancel() {
ctx, cancel := context.WithCancel(context.Background())
go func() {
time.Sleep(1 * time.Second)
cancel() // 触发取消
}()
<-ctx.Done()
fmt.Println("Cancelled:", ctx.Err())
}
通过请求传播 context:
// HTTP 处理器将 context 传递给所有下游操作
func userHandler(w http.ResponseWriter, r *http.Request) {
ctx := r.Context() // 请求 context——当客户端断开连接时取消
user, err := db.GetUser(ctx, userID)
if err != nil {
if errors.Is(err, context.Canceled) {
// 客户端断开连接——无需响应
return
}
http.Error(w, err.Error(), 500)
return
}
json.NewEncoder(w).Encode(user)
}
// 数据库函数尊重 context
func (db *DB) GetUser(ctx context.Context, id int) (*User, error) {
var user User
err := db.pool.QueryRowContext(ctx,
"SELECT * FROM users WHERE id = $1", id,
).Scan(&user.ID, &user.Name, &user.Email)
return &user, err
}
并发模式
工作池
func workerPool(jobs <-chan Job, results chan<- Result, numWorkers int) {
var wg sync.WaitGroup
for i := 0; i < numWorkers; i++ {
wg.Add(1)
go func(workerID int) {
defer wg.Done()
for job := range jobs { // 接收任务直到 channel 关闭
result := processJob(job)
results <- result
}
}(i)
}
// 当所有 worker 完成时关闭 results
go func() {
wg.Wait()
close(results)
}()
}
func main() {
jobs := make(chan Job, 100)
results := make(chan Result, 100)
// 启动工作池
workerPool(jobs, results, 10) // 10 个并发 worker
// 发送任务
go func() {
for _, job := range getJobs() {
jobs <- job
}
close(jobs) // 信号:没有更多任务
}()
// 收集结果
for result := range results {
fmt.Println(result)
}
}
管道模式
// 管道:由 channel 连接的阶段链
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func filter(in <-chan int, predicate func(int) bool) <-chan int {
out := make(chan int)
go func() {
for n := range in {
if predicate(n) {
out <- n
}
}
close(out)
}()
return out
}
func main() {
// 管道:generate → square → filter even → print
numbers := generate(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
squared := square(numbers)
evenSquares := filter(squared, func(n int) bool { return n%2 == 0 })
for n := range evenSquares {
fmt.Println(n) // 4, 16, 36, 64, 100
}
}
扇出、扇入
// 扇出:一个输入 channel,多个处理 goroutines
// 扇入:合并多个 channel 为一个
func fanOut(in <-chan Work, numWorkers int) []<-chan Result {
outputs := make([]<-chan Result, numWorkers)
for i := 0; i < numWorkers; i++ {
outputs[i] = worker(in) // 每个 worker 从同一输入读取
}
return outputs
}
func fanIn(channels ...<-chan Result) <-chan Result {
var wg sync.WaitGroup
merged := make(chan Result, len(channels))
output := func(ch <-chan Result) {
defer wg.Done()
for result := range ch {
merged <- result
}
}
wg.Add(len(channels))
for _, ch := range channels {
go output(ch)
}
go func() {
wg.Wait()
close(merged)
}()
return merged
}
同步原语
有时共享状态是正确的工具:
import "sync"
// 用于保护共享数据的 Mutex
type SafeCounter struct {
mu sync.Mutex
count int
}
func (c *SafeCounter) Increment() {
c.mu.Lock()
defer c.mu.Unlock()
c.count++
}
func (c *SafeCounter) Value() int {
c.mu.RLock() // 读多写少场景使用 RWMutex
defer c.mu.RUnlock()
return c.count
}
// sync.Once——确保初始化只执行一次
var (
instance *Database
once sync.Once
)
func GetDB() *Database {
once.Do(func() {
instance = connectToDatabase() // 只调用一次
})
return instance
}
// sync.Map——并发 map(读多写少工作负载)
var cache sync.Map
cache.Store("key", "value")
value, ok := cache.Load("key")
cache.LoadOrStore("key", "default") // 原子检查并设置
cache.Delete("key")
// WaitGroup——等待 goroutine 完成
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func(n int) {
defer wg.Done()
doWork(n)
}(i)
}
wg.Wait() // 阻塞直到所有 goroutine 调用 Done()
竞态条件检测
Go 内置了竞态检测器:
go run -race main.go
go test -race ./...
// 这存在竞态条件:
counter := 0
for i := 0; i < 1000; i++ {
go func() {
counter++ // 竞态!多个 goroutines 同时写入
}()
}
// 修复 1:使用 sync/atomic
import "sync/atomic"
var counter int64
go func() {
atomic.AddInt64(&counter, 1) // 原子递增
}()
// 修复 2:使用 channel
counterCh := make(chan struct{}, 1)
go func() {
counterCh <- struct{}{}
counter++
<-counterCh
}()
// 修复 3:使用 mutex(如上所示)
Go 的并发模型鼓励以通信和所有权而非锁的角度思考。当你与模型对抗时——使用大量 mutex、挣扎于 goroutine 生命周期——这通常是一个信号,表明你应该退后一步,围绕 channel 重新设计。
→ 使用 Hash Text 工具计算哈希值以进行验证和安全保护。