
使用 gRPC 构建 Go 微服务:流式传输、拦截器与负载均衡
gRPC 已成为 Go 微服务间通信的标准协议,通过 Protocol Buffers 提供强类型、双向流式传输和卓越性能。本指南涵盖从定义第一个 protobuf 服务到生产级负载均衡的所有内容。
为什么选择 gRPC 构建 Go 微服务
gRPC 相对于 REST 的优势:
- 强类型:Protocol Buffers 在编译时强制执行模式(schema)约束
- 性能:二进制序列化 vs JSON 文本
- 流式传输:原生支持服务端、客户端和双向流
- 代码生成:从 .proto 文件生成客户端和服务端
- 拦截器:类似于 HTTP 中间件的机制,用于处理认证、日志等横切关注点

定义 Protobuf 服务
// user.proto
syntax = "proto3";
package user.v1;
option go_package = "github.com/example/app/gen/user/v1;userv1";
import "google/protobuf/timestamp.proto";
// User is the user record returned by GetUser and CreateUser and embedded in
// ListUsersResponse and UserEvent.
message User {
string id = 1;
string name = 2;
string email = 3;
// created_at uses the well-known Timestamp type from google/protobuf/timestamp.proto.
google.protobuf.Timestamp created_at = 4;
}
// GetUserRequest is the request message for UserService.GetUser.
message GetUserRequest {
// id of the user to fetch.
string id = 1;
}
// CreateUserRequest is the request message for UserService.CreateUser and the
// element type of the BatchCreateUsers client stream.
message CreateUserRequest {
string name = 1;
string email = 2;
}
// ListUsersRequest is the request message for UserService.ListUsers.
message ListUsersRequest {
// NOTE(review): presumably the maximum number of users per page — confirm against the server.
int32 page_size = 1;
// NOTE(review): presumably the next_page_token from a previous ListUsersResponse — confirm.
string page_token = 2;
}
// ListUsersResponse is returned by ListUsers and, once the client stream ends,
// by BatchCreateUsers.
message ListUsersResponse {
repeated User users = 1;
// NOTE(review): presumably empty when there are no further pages — confirm.
string next_page_token = 2;
}
// Streaming: watch user updates.
// WatchUsersRequest is the request message for UserService.WatchUsers.
message WatchUsersRequest {
// ids of the users to watch.
repeated string ids = 1;
}
// UserEvent is one element of the WatchUsers server stream.
message UserEvent {
// Type says what happened to the user.
// NOTE(review): CREATED is the zero value, so an event whose type was never
// set is indistinguishable from CREATED; renumbering would break wire
// compatibility, so this is flagged rather than changed.
enum Type {
CREATED = 0;
UPDATED = 1;
DELETED = 2;
}
Type type = 1;
User user = 2;
}
// ChatMessage is the element type of both directions of the Chat stream.
// It was referenced by UserService.Chat but never declared, which makes
// protoc reject the file.
// NOTE(review): the field set is a minimal placeholder — adjust to the real
// chat payload.
message ChatMessage {
  string user_id = 1;
  string text = 2;
}

// UserService exposes unary, server-streaming, client-streaming and
// bidirectional-streaming RPCs for managing users.
service UserService {
  // Unary RPCs.
  rpc GetUser(GetUserRequest) returns (User);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  // Server streaming: the server sends multiple responses.
  rpc WatchUsers(WatchUsersRequest) returns (stream UserEvent);
  // Client streaming: the client sends multiple requests.
  rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
  // Bidirectional streaming.
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
生成 Go 代码:
# Generate the Go message types (--go_out) and the gRPC client/server stubs
# (--go-grpc_out) from user.proto.
protoc --go_out=. --go_opt=paths=source_relative \
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
user.proto
实现服务端
package main
import (
	"context"
	"errors"
	"io"
	"log"
	"net"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	userv1 "github.com/example/app/gen/user/v1"
)
// UserServer is the UserService implementation registered with the gRPC
// server in main.
type UserServer struct {
// Embedded generated type; presumably supplies default implementations for
// RPCs this type does not define (e.g. ListUsers, Chat) — confirm against
// the generated code.
userv1.UnimplementedUserServiceServer
// store is the backend the handlers call (Get, Create, Subscribe, Unsubscribe).
store UserStore
}
// GetUser fetches the user identified by req.Id from the store.
//
// It returns codes.NotFound when the store reports ErrNotFound and
// codes.Internal for any other store error.
func (s *UserServer) GetUser(ctx context.Context, req *userv1.GetUserRequest) (*userv1.User, error) {
	found, err := s.store.Get(ctx, req.Id)
	switch {
	case err == nil:
		return userToProto(found), nil
	case errors.Is(err, ErrNotFound):
		return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id)
	default:
		return nil, status.Errorf(codes.Internal, "failed to get user: %v", err)
	}
}
// WatchUsers streams an event to the client for every item delivered on the
// store subscription for req.Ids.
//
// The handler returns nil when the subscription channel is closed, the Send
// error when writing to the stream fails, and the context error when the
// stream's context is done. The subscription is released on every exit path.
//
// NOTE(review): every event is sent as UserEvent_UPDATED regardless of what
// the store reported — confirm this is intended.
func (s *UserServer) WatchUsers(req *userv1.WatchUsersRequest, stream userv1.UserService_WatchUsersServer) error {
	streamCtx := stream.Context()

	events := s.store.Subscribe(req.Ids)
	defer s.store.Unsubscribe(events)

	for {
		select {
		case <-streamCtx.Done():
			return streamCtx.Err()

		case ev, open := <-events:
			if !open {
				return nil
			}
			msg := &userv1.UserEvent{
				Type: userv1.UserEvent_UPDATED,
				User: userToProto(ev.User),
			}
			if sendErr := stream.Send(msg); sendErr != nil {
				return sendErr
			}
		}
	}
}
// BatchCreateUsers reads CreateUserRequest messages from the client stream,
// creates one user per message, and replies with all created users once the
// client half-closes the stream.
//
// A non-EOF receive error is returned unchanged: the stream is already
// aborted at that point and the error carries the RPC status (for example
// Canceled or DeadlineExceeded), which re-wrapping as codes.Internal would
// hide. A store failure aborts the batch with codes.Internal; users created
// before the failure are not reported back to the client.
func (s *UserServer) BatchCreateUsers(stream userv1.UserService_BatchCreateUsersServer) error {
	ctx := stream.Context()

	var created []*userv1.User
	for {
		req, err := stream.Recv()
		if errors.Is(err, io.EOF) {
			// The client finished sending; fall through to the reply.
			break
		}
		if err != nil {
			return err
		}

		user, err := s.store.Create(ctx, req.Name, req.Email)
		if err != nil {
			return status.Errorf(codes.Internal, "failed to create user: %v", err)
		}
		created = append(created, userToProto(user))
	}

	return stream.SendAndClose(&userv1.ListUsersResponse{Users: created})
}
// main listens on TCP port 50051, builds a gRPC server with the unary and
// stream interceptor chains, registers the user service and serves until
// Serve returns. Listen and Serve failures terminate the process.
//
// NOTE(review): &UserServer{} leaves the store field at its zero value while
// the handlers call s.store.* — a real UserStore has to be injected here.
func main() {
	listener, err := net.Listen("tcp", ":50051")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	unaryChain := grpc.ChainUnaryInterceptor(
		loggingInterceptor,
		authInterceptor,
		recoveryInterceptor,
	)
	streamChain := grpc.ChainStreamInterceptor(
		streamLoggingInterceptor,
	)
	srv := grpc.NewServer(unaryChain, streamChain)

	userv1.RegisterUserServiceServer(srv, &UserServer{})

	log.Println("gRPC server listening on :50051")
	if serveErr := srv.Serve(listener); serveErr != nil {
		log.Fatalf("failed to serve: %v", serveErr)
	}
}

gRPC 拦截器
拦截器为 gRPC 提供中间件功能:
// loggingInterceptor is a unary server interceptor that logs the full method
// name, the handler's duration and its error after the handler returns.
// The handler's response and error are passed through unchanged.
func loggingInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	began := time.Now()

	resp, err := handler(ctx, req)

	elapsed := time.Since(began)
	log.Printf("%s completed in %v, err: %v", info.FullMethod, elapsed, err)
	return resp, err
}
// authInterceptor is a unary server interceptor that rejects requests without
// a valid "authorization" metadata value.
//
// It returns codes.Unauthenticated when the incoming context has no metadata,
// when the "authorization" key is absent, or when validateToken rejects the
// first value; otherwise it invokes the handler.
func authInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (interface{}, error) {
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return nil, status.Error(codes.Unauthenticated, "missing metadata")
	}

	switch tokens := md.Get("authorization"); {
	case len(tokens) == 0:
		return nil, status.Error(codes.Unauthenticated, "missing authorization token")
	case !validateToken(tokens[0]):
		return nil, status.Error(codes.Unauthenticated, "invalid token")
	}

	return handler(ctx, req)
}
// recoveryInterceptor is a unary server interceptor that converts a panic in
// the handler into a codes.Internal error.
//
// The results are named so the deferred function can overwrite err after a
// panic; the panic value is logged together with the full method name.
func recoveryInterceptor(
	ctx context.Context,
	req interface{},
	info *grpc.UnaryServerInfo,
	handler grpc.UnaryHandler,
) (resp interface{}, err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		log.Printf("panic in %s: %v", info.FullMethod, recovered)
		err = status.Error(codes.Internal, "internal server error")
	}()

	resp, err = handler(ctx, req)
	return resp, err
}
使用状态码处理错误
import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/genproto/googleapis/rpc/errdetails"
)
// CreateUser validates the request and creates a user in the store.
//
// It returns codes.InvalidArgument (with a BadRequest field violation
// attached) when the email is empty, codes.AlreadyExists when the store
// reports ErrEmailConflict, and codes.Internal for any other store error.
func (s *UserServer) CreateUser(ctx context.Context, req *userv1.CreateUserRequest) (*userv1.User, error) {
	// Validation with a detailed error payload.
	if req.Email == "" {
		invalid := status.New(codes.InvalidArgument, "email is required")
		detailed, err := invalid.WithDetails(&errdetails.BadRequest{
			FieldViolations: []*errdetails.BadRequest_FieldViolation{
				{Field: "email", Description: "email cannot be empty"},
			},
		})
		if err != nil {
			// WithDetails returns a nil status on failure, and Err() on a nil
			// status is nil, which would report success. Fall back to the
			// plain status so the request is still rejected.
			return nil, invalid.Err()
		}
		return nil, detailed.Err()
	}

	user, err := s.store.Create(ctx, req.Name, req.Email)
	if err != nil {
		if errors.Is(err, ErrEmailConflict) {
			return nil, status.Errorf(codes.AlreadyExists, "email %s already exists", req.Email)
		}
		return nil, status.Errorf(codes.Internal, "create user: %v", err)
	}
	return userToProto(user), nil
}

带指数退避的重试逻辑
import "google.golang.org/grpc/backoff"
// newClientWithRetry dials addr with a default service config that enables
// retries for every method of user.v1.UserService.
//
// The policy allows up to 4 attempts with backoff starting at 0.1s, doubling
// each time and capped at 1s, and retries only UNAVAILABLE and
// RESOURCE_EXHAUSTED. The connection uses insecure transport credentials.
func newClientWithRetry(addr string) (*grpc.ClientConn, error) {
	const serviceConfig = `{
"methodConfig": [{
"name": [{"service": "user.v1.UserService"}],
"retryPolicy": {
"maxAttempts": 4,
"initialBackoff": "0.1s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED"]
}
}]
}`

	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(serviceConfig),
	}
	return grpc.Dial(addr, dialOpts...)
}
负载均衡
// newLoadBalancedClient dials the user service through DNS-based discovery
// and balances RPCs across the resolved backends with the round_robin policy.
// The connection uses insecure transport credentials.
//
// NOTE(review): the endpoints parameter is not used; the target is the
// hard-coded cluster DNS name below.
func newLoadBalancedClient(endpoints []string) (*grpc.ClientConn, error) {
	const (
		target        = "dns:///user-service.default.svc.cluster.local:50051"
		serviceConfig = `{"loadBalancingPolicy":"round_robin"}`
	)

	dialOpts := []grpc.DialOption{
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultServiceConfig(serviceConfig),
	}
	return grpc.Dial(target, dialOpts...)
}
// StaticResolver is a resolver for custom service discovery that always
// reports a fixed list of endpoints. It acts as both the resolver.Builder and
// the resolver.Resolver it builds, since it holds no per-connection state.
type StaticResolver struct {
	endpoints []string
}

// Compile-time checks that StaticResolver satisfies both gRPC interfaces.
var (
	_ resolver.Builder  = (*StaticResolver)(nil)
	_ resolver.Resolver = (*StaticResolver)(nil)
)

// Build pushes the static address list to cc and returns the receiver as the
// resolver. The list never changes, so a rejected update cannot be retried
// later; the UpdateState error is therefore returned to the caller.
func (r *StaticResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
	addrs := make([]resolver.Address, len(r.endpoints))
	for i, ep := range r.endpoints {
		addrs[i] = resolver.Address{Addr: ep}
	}
	if err := cc.UpdateState(resolver.State{Addresses: addrs}); err != nil {
		return nil, err
	}
	return r, nil
}

// Scheme returns the URI scheme this builder is registered under, so targets
// take the form "static:///<name>".
func (r *StaticResolver) Scheme() string { return "static" }

// ResolveNow is a no-op: the endpoint list is fixed at construction time.
func (r *StaticResolver) ResolveNow(resolver.ResolveNowOptions) {}

// Close is a no-op: the resolver owns no goroutines or other resources.
func (r *StaticResolver) Close() {}
健康检查
import "google.golang.org/grpc/health/grpc_health_v1"
type HealthServer struct{}
// Check reports SERVING when checkDependencies succeeds and NOT_SERVING when
// it returns an error. The dependency error itself is not propagated; the
// returned error is always nil.
func (h *HealthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
	// Check the database and other dependencies.
	current := grpc_health_v1.HealthCheckResponse_SERVING
	if depErr := checkDependencies(ctx); depErr != nil {
		current = grpc_health_v1.HealthCheckResponse_NOT_SERVING
	}
	return &grpc_health_v1.HealthCheckResponse{Status: current}, nil
}
// Watch streams health status updates to the client.
//
// The current status is sent immediately and then re-evaluated and re-sent
// every five seconds. The status comes from Check, so Watch and Check always
// agree. A failed Send ends the stream with that error; the method returns
// nil once the stream's context is done.
func (h *HealthServer) Watch(req *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error {
	ctx := stream.Context()

	// report evaluates the current status and writes it to the stream.
	report := func() error {
		resp, err := h.Check(ctx, req)
		if err != nil {
			return err
		}
		return stream.Send(resp)
	}

	// Clients expect the first message without waiting for a tick.
	if err := report(); err != nil {
		return err
	}

	ticker := time.NewTicker(5 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			if err := report(); err != nil {
				return err
			}
		case <-ctx.Done():
			return nil
		}
	}
}
结论
gRPC 与 Go 结合为微服务通信提供了强大的基础。Protocol Buffers 提供了类型安全和高效的序列化。拦截器处理认证和日志等横切关注点。内置的流式传输在大多数情况下消除了对 WebSocket 的需求。结合正确的错误码、重试逻辑和健康检查,您拥有构建生产级微服务基础设施所需的一切。