正在加载,请稍候…

使用 gRPC 构建 Go 微服务:流式传输、拦截器与负载均衡

使用 gRPC 构建生产级 Go 微服务:定义 protobuf 服务、实现一元和流式 RPC、编写认证和日志拦截器、使用状态码处理错误以及配置负载均衡。

使用 gRPC 构建 Go 微服务:流式传输、拦截器与负载均衡

使用 gRPC 构建 Go 微服务:流式传输、拦截器与负载均衡

gRPC 已成为 Go 微服务间通信的标准协议,通过 Protocol Buffers 提供强类型、双向流式传输和卓越性能。本指南涵盖从定义第一个 protobuf 服务到生产级负载均衡的所有内容。

为什么选择 gRPC 构建 Go 微服务

gRPC 相对于 REST 的优势:

  • 强类型:Protocol Buffers 在编译时强制模式
  • 性能:二进制序列化 vs JSON 文本
  • 流式传输:原生支持服务端、客户端和双向流
  • 代码生成:从 .proto 文件生成客户端和服务端
  • 拦截器:类似于 HTTP 中间件的中间件

使用 gRPC 构建 Go 微服务:流式传输、拦截器与负载均衡插图

定义 Protobuf 服务

// user.proto
syntax = "proto3";
package user.v1;
option go_package = "github.com/example/app/gen/user/v1;userv1";

import "google/protobuf/timestamp.proto";

message User {
  string id = 1;
  string name = 2;
  string email = 3;
  google.protobuf.Timestamp created_at = 4;
}

message GetUserRequest {
  string id = 1;
}

message CreateUserRequest {
  string name = 1;
  string email = 2;
}

message ListUsersRequest {
  int32 page_size = 1;
  string page_token = 2;
}

message ListUsersResponse {
  repeated User users = 1;
  string next_page_token = 2;
}

// 流式传输:监听用户更新
message WatchUsersRequest {
  repeated string ids = 1;
}

message UserEvent {
  enum Type {
    CREATED = 0;
    UPDATED = 1;
    DELETED = 2;
  }
  Type type = 1;
  User user = 2;
}

service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc CreateUser(CreateUserRequest) returns (User);
  rpc ListUsers(ListUsersRequest) returns (ListUsersResponse);
  
  // 服务端流式传输:服务端发送多个响应
  rpc WatchUsers(WatchUsersRequest) returns (stream UserEvent);
  
  // 客户端流式传输:客户端发送多个请求
  rpc BatchCreateUsers(stream CreateUserRequest) returns (ListUsersResponse);
  
  // 双向流式传输
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

生成 Go 代码:

protoc --go_out=. --go_opt=paths=source_relative \
       --go-grpc_out=. --go-grpc_opt=paths=source_relative \
       user.proto

实现服务端

package main

import (
    "context"
    "net"
    "log"
    
    "google.golang.org/grpc"
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    
    userv1 "github.com/example/app/gen/user/v1"
)

type UserServer struct {
    userv1.UnimplementedUserServiceServer
    store UserStore
}

func (s *UserServer) GetUser(ctx context.Context, req *userv1.GetUserRequest) (*userv1.User, error) {
    user, err := s.store.Get(ctx, req.Id)
    if err != nil {
        if errors.Is(err, ErrNotFound) {
            return nil, status.Errorf(codes.NotFound, "user %s not found", req.Id)
        }
        return nil, status.Errorf(codes.Internal, "failed to get user: %v", err)
    }
    return userToProto(user), nil
}

func (s *UserServer) WatchUsers(req *userv1.WatchUsersRequest, stream userv1.UserService_WatchUsersServer) error {
    ctx := stream.Context()
    eventCh := s.store.Subscribe(req.Ids)
    defer s.store.Unsubscribe(eventCh)
    
    for {
        select {
        case event, ok := <-eventCh:
            if !ok {
                return nil
            }
            if err := stream.Send(&userv1.UserEvent{
                Type: userv1.UserEvent_UPDATED,
                User: userToProto(event.User),
            }); err != nil {
                return err
            }
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

func (s *UserServer) BatchCreateUsers(stream userv1.UserService_BatchCreateUsersServer) error {
    var created []*userv1.User
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return status.Errorf(codes.Internal, "failed to receive: %v", err)
        }
        user, err := s.store.Create(stream.Context(), req.Name, req.Email)
        if err != nil {
            return status.Errorf(codes.Internal, "failed to create user: %v", err)
        }
        created = append(created, userToProto(user))
    }
    return stream.SendAndClose(&userv1.ListUsersResponse{Users: created})
}

func main() {
    lis, err := net.Listen("tcp", ":50051")
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    
    server := grpc.NewServer(
        grpc.ChainUnaryInterceptor(
            loggingInterceptor,
            authInterceptor,
            recoveryInterceptor,
        ),
        grpc.ChainStreamInterceptor(
            streamLoggingInterceptor,
        ),
    )
    
    userv1.RegisterUserServiceServer(server, &UserServer{})
    
    log.Println("gRPC server listening on :50051")
    if err := server.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

使用 gRPC 构建 Go 微服务:流式传输、拦截器与负载均衡插图

gRPC 拦截器

拦截器为 gRPC 提供中间件功能:

// 一元拦截器用于日志记录
func loggingInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    start := time.Now()
    resp, err := handler(ctx, req)
    log.Printf("%s completed in %v, err: %v", info.FullMethod, time.Since(start), err)
    return resp, err
}

// 认证拦截器
func authInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (interface{}, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return nil, status.Error(codes.Unauthenticated, "missing metadata")
    }
    
    tokens := md.Get("authorization")
    if len(tokens) == 0 {
        return nil, status.Error(codes.Unauthenticated, "missing authorization token")
    }
    
    if !validateToken(tokens[0]) {
        return nil, status.Error(codes.Unauthenticated, "invalid token")
    }
    
    return handler(ctx, req)
}

// 恢复拦截器用于处理 panic
func recoveryInterceptor(
    ctx context.Context,
    req interface{},
    info *grpc.UnaryServerInfo,
    handler grpc.UnaryHandler,
) (resp interface{}, err error) {
    defer func() {
        if r := recover(); r != nil {
            log.Printf("panic in %s: %v", info.FullMethod, r)
            err = status.Errorf(codes.Internal, "internal server error")
        }
    }()
    return handler(ctx, req)
}

使用状态码处理错误

import (
    "google.golang.org/grpc/codes"
    "google.golang.org/grpc/status"
    "google.golang.org/genproto/googleapis/rpc/errdetails"
)

func (s *UserServer) CreateUser(ctx context.Context, req *userv1.CreateUserRequest) (*userv1.User, error) {
    // 带有详细错误信息的验证
    if req.Email == "" {
        st, _ := status.New(codes.InvalidArgument, "email is required").
            WithDetails(&errdetails.BadRequest{
                FieldViolations: []*errdetails.BadRequest_FieldViolation{
                    {Field: "email", Description: "email cannot be empty"},
                },
            })
        return nil, st.Err()
    }
    
    user, err := s.store.Create(ctx, req.Name, req.Email)
    if err != nil {
        if errors.Is(err, ErrEmailConflict) {
            return nil, status.Errorf(codes.AlreadyExists, "email %s already exists", req.Email)
        }
        return nil, status.Errorf(codes.Internal, "create user: %v", err)
    }
    return userToProto(user), nil
}

使用 gRPC 构建 Go 微服务:流式传输、拦截器与负载均衡插图

带指数退避的重试逻辑

import "google.golang.org/grpc/backoff"

func newClientWithRetry(addr string) (*grpc.ClientConn, error) {
    retryPolicy := `{
        "methodConfig": [{
            "name": [{"service": "user.v1.UserService"}],
            "retryPolicy": {
                "maxAttempts": 4,
                "initialBackoff": "0.1s",
                "maxBackoff": "1s",
                "backoffMultiplier": 2,
                "retryableStatusCodes": ["UNAVAILABLE", "RESOURCE_EXHAUSTED"]
            }
        }]
    }`
    
    return grpc.Dial(addr,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(retryPolicy),
    )
}

负载均衡

// 跨多个后端进行轮询负载均衡
func newLoadBalancedClient(endpoints []string) (*grpc.ClientConn, error) {
    // 使用基于 DNS 的服务发现
    return grpc.Dial(
        "dns:///user-service.default.svc.cluster.local:50051",
        grpc.WithTransportCredentials(insecure.NewCredentials()),
        grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"round_robin"}`),
    )
}

// 用于自定义服务发现的静态解析器
type StaticResolver struct {
    endpoints []string
}

func (r *StaticResolver) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    addrs := make([]resolver.Address, len(r.endpoints))
    for i, ep := range r.endpoints {
        addrs[i] = resolver.Address{Addr: ep}
    }
    cc.UpdateState(resolver.State{Addresses: addrs})
    return r, nil
}

健康检查

import "google.golang.org/grpc/health/grpc_health_v1"

type HealthServer struct{}

func (h *HealthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
    // 检查数据库、依赖项
    if err := checkDependencies(ctx); err != nil {
        return &grpc_health_v1.HealthCheckResponse{
            Status: grpc_health_v1.HealthCheckResponse_NOT_SERVING,
        }, nil
    }
    return &grpc_health_v1.HealthCheckResponse{
        Status: grpc_health_v1.HealthCheckResponse_SERVING,
    }, nil
}

func (h *HealthServer) Watch(req *grpc_health_v1.HealthCheckRequest, stream grpc_health_v1.Health_WatchServer) error {
    // 流式传输健康状态更新
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            stream.Send(&grpc_health_v1.HealthCheckResponse{
                Status: grpc_health_v1.HealthCheckResponse_SERVING,
            })
        case <-stream.Context().Done():
            return nil
        }
    }
}

结论

gRPC 与 Go 结合为微服务通信提供了强大的基础。Protocol Buffers 提供了类型安全和高效的序列化。拦截器处理认证和日志等横切关注点。内置的流式传输在大多数情况下消除了对 WebSocket 的需求。结合正确的错误码、重试逻辑和健康检查,您拥有构建生产级微服务基础设施所需的一切。