正在加载,请稍候…

gRPC 与 Go:Protocol Buffers、流式传输、拦截器及生产模式

使用 gRPC 和 Go 构建高性能微服务,涵盖 Protocol Buffers、流式 RPC、认证拦截器、重试策略、负载均衡和健康检查。

gRPC with Go: Protocol Buffers, Streaming, Interceptors, and Production Patterns

gRPC 是服务间通信的事实标准。比 REST 更快,内置流式传输,并为每种主流语言提供类型安全的客户端。

Protocol Buffers

syntax = "proto3";
package order;
option go_package = "github.com/company/pb/order";

service OrderService {
  rpc CreateOrder(CreateOrderRequest) returns (Order);
  rpc ListOrders(ListOrdersRequest) returns (stream Order);
}

message CreateOrderRequest {
  string customer_id = 1;
  repeated OrderItem items = 2;
  double total = 3;
}

message Order {
  string id = 1;
  string customer_id = 2;
  OrderStatus status = 3;
  double total = 4;
}

enum OrderStatus {
  ORDER_STATUS_UNSPECIFIED = 0;
  ORDER_STATUS_PENDING = 1;
  ORDER_STATUS_CONFIRMED = 2;
}

gRPC with Go: Protocol Buffers, Streaming, Interceptors, and Production Patterns illustration

服务端实现

type orderServer struct {
    pb.UnimplementedOrderServiceServer
    db     *sqlx.DB
    logger *zap.Logger
}

func (s *orderServer) CreateOrder(ctx context.Context, req *pb.CreateOrderRequest) (*pb.Order, error) {
    if req.CustomerId == "" {
        return nil, status.Error(codes.InvalidArgument, "customer_id required")
    }
    order, err := s.db.CreateOrder(ctx, req)
    if err != nil {
        s.logger.Error("create_order_failed", zap.Error(err))
        return nil, status.Errorf(codes.Internal, "failed to create order")
    }
    return orderToProto(order), nil
}

// 服务端流式传输:适用于大数据集的高效传输
func (s *orderServer) ListOrders(req *pb.ListOrdersRequest, stream pb.OrderService_ListOrdersServer) error {
    orders, err := s.db.ListOrders(stream.Context(), req.CustomerId)
    if err != nil {
        return status.Errorf(codes.Internal, "db error: %v", err)
    }
    for _, order := range orders {
        if err := stream.Send(orderToProto(order)); err != nil {
            return err  // 客户端断开连接
        }
    }
    return nil
}

gRPC with Go: Protocol Buffers, Streaming, Interceptors, and Production Patterns illustration

链式拦截器

server := grpc.NewServer(
    grpc.ChainUnaryInterceptor(
        RecoveryInterceptor(),
        LoggingInterceptor(logger),
        MetricsInterceptor(registry),
        AuthUnaryInterceptor(jwtKey),
    ),
)

gRPC with Go: Protocol Buffers, Streaming, Interceptors, and Production Patterns illustration

带重试策略的客户端

conn, _ := grpc.Dial("order-service:50051",
    grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
    grpc.WithDefaultServiceConfig(`{
        "methodConfig": [{
            "name": [{"service": "order.OrderService"}],
            "retryPolicy": {
                "maxAttempts": 4,
                "initialBackoff": "0.5s",
                "maxBackoff": "30s",
                "backoffMultiplier": 2,
                "retryableStatusCodes": ["UNAVAILABLE"]
            },
            "timeout": "10s"
        }]
    }`),
)
client := pb.NewOrderServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
order, err := client.CreateOrder(ctx, req)

健康检查

import "google.golang.org/grpc/health/grpc_health_v1"

type healthServer struct{}

func (h *healthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
    return &grpc_health_v1.HealthCheckResponse{
        Status: grpc_health_v1.HealthCheckResponse_SERVING,
    }, nil
}

grpc_health_v1.RegisterHealthServer(server, &healthServer{})

gRPC 代码生成消除了整类集成错误。.proto 文件就是契约。

→ 使用 Base64 转换器 工具编码 protobuf 二进制数据。