gRPC 是服务间通信的事实标准。比 REST 更快,内置流式传输,并为每种主流语言提供类型安全的客户端。
Protocol Buffers
syntax = "proto3";
package order;
option go_package = "github.com/company/pb/order";
service OrderService {
rpc CreateOrder(CreateOrderRequest) returns (Order);
rpc ListOrders(ListOrdersRequest) returns (stream Order);
}
message CreateOrderRequest {
string customer_id = 1;
repeated OrderItem items = 2;
double total = 3;
}
message Order {
string id = 1;
string customer_id = 2;
OrderStatus status = 3;
double total = 4;
}
enum OrderStatus {
ORDER_STATUS_UNSPECIFIED = 0;
ORDER_STATUS_PENDING = 1;
ORDER_STATUS_CONFIRMED = 2;
}
服务端实现
type orderServer struct {
pb.UnimplementedOrderServiceServer
db *sqlx.DB
logger *zap.Logger
}
func (s *orderServer) CreateOrder(ctx context.Context, req *pb.CreateOrderRequest) (*pb.Order, error) {
if req.CustomerId == "" {
return nil, status.Error(codes.InvalidArgument, "customer_id required")
}
order, err := s.db.CreateOrder(ctx, req)
if err != nil {
s.logger.Error("create_order_failed", zap.Error(err))
return nil, status.Errorf(codes.Internal, "failed to create order")
}
return orderToProto(order), nil
}
// 服务端流式传输:适用于大数据集的高效传输
func (s *orderServer) ListOrders(req *pb.ListOrdersRequest, stream pb.OrderService_ListOrdersServer) error {
orders, err := s.db.ListOrders(stream.Context(), req.CustomerId)
if err != nil {
return status.Errorf(codes.Internal, "db error: %v", err)
}
for _, order := range orders {
if err := stream.Send(orderToProto(order)); err != nil {
return err // 客户端断开连接
}
}
return nil
}
链式拦截器
server := grpc.NewServer(
grpc.ChainUnaryInterceptor(
RecoveryInterceptor(),
LoggingInterceptor(logger),
MetricsInterceptor(registry),
AuthUnaryInterceptor(jwtKey),
),
)
带重试策略的客户端
conn, _ := grpc.Dial("order-service:50051",
grpc.WithTransportCredentials(credentials.NewClientTLSFromCert(nil, "")),
grpc.WithDefaultServiceConfig(`{
"methodConfig": [{
"name": [{"service": "order.OrderService"}],
"retryPolicy": {
"maxAttempts": 4,
"initialBackoff": "0.5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE"]
},
"timeout": "10s"
}]
}`),
)
client := pb.NewOrderServiceClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
order, err := client.CreateOrder(ctx, req)
健康检查
import "google.golang.org/grpc/health/grpc_health_v1"
type healthServer struct{}
func (h *healthServer) Check(ctx context.Context, req *grpc_health_v1.HealthCheckRequest) (*grpc_health_v1.HealthCheckResponse, error) {
return &grpc_health_v1.HealthCheckResponse{
Status: grpc_health_v1.HealthCheckResponse_SERVING,
}, nil
}
grpc_health_v1.RegisterHealthServer(server, &healthServer{})
gRPC 代码生成消除了整类集成错误。.proto 文件就是契约。
→ 使用 Base64 转换器 工具编码 protobuf 二进制数据。