正在加载,请稍候…

gRPC 与 Node.js 和 TypeScript:流式传输与 Protocol Buffers

使用 Node.js 和 TypeScript 构建高性能 gRPC 服务。学习 Protocol Buffers、服务定义、一元和流式 RPC、拦截器以及 g

gRPC 与 Node.js 和 TypeScript:流式传输与 Protocol Buffers

gRPC 与 Node.js 和 TypeScript

Protocol Buffer 定义

// orders.proto
syntax = "proto3";
package orders.v1;

import "google/protobuf/timestamp.proto";

service OrderService {
  // 一元 RPC
  rpc GetOrder(GetOrderRequest) returns (Order);
  rpc CreateOrder(CreateOrderRequest) returns (Order);

  // 服务端流式:实时订单更新
  rpc WatchOrder(WatchOrderRequest) returns (stream OrderEvent);

  // 客户端流式:批量导入
  rpc ImportOrders(stream CreateOrderRequest) returns (ImportResult);

  // 双向流式:实时聊天
  rpc LiveSupport(stream SupportMessage) returns (stream SupportMessage);
}

message Order {
  string id = 1;
  string customer_id = 2;
  OrderStatus status = 3;
  double total = 4;
  google.protobuf.Timestamp created_at = 5;
  repeated OrderItem items = 6;
}

enum OrderStatus {
  ORDER_STATUS_UNSPECIFIED = 0;
  ORDER_STATUS_PENDING = 1;
  ORDER_STATUS_CONFIRMED = 2;
  ORDER_STATUS_SHIPPED = 3;
  ORDER_STATUS_DELIVERED = 4;
}

gRPC 与 Node.js 和 TypeScript:流式传输与 Protocol Buffers 插图

TypeScript 服务端实现

import * as grpc from '@grpc/grpc-js';
import * as protoLoader from '@grpc/proto-loader';

const packageDef = protoLoader.loadSync('./protos/orders.proto', {
  keepCase: true,
  longs: String,
  enums: String,
  defaults: true,
});
const proto = grpc.loadPackageDefinition(packageDef) as any;

const orderServiceImpl = {
  // 一元
  async getOrder(call: grpc.ServerUnaryCall<any, any>, callback: grpc.sendUnaryData<any>) {
    try {
      const order = await orderRepo.findById(call.request.id);
      if (!order) {
        return callback({ code: grpc.status.NOT_FOUND, message: `Order ${call.request.id} not found` });
      }
      callback(null, orderToProto(order));
    } catch (err) {
      callback({ code: grpc.status.INTERNAL, message: 'Internal error' });
    }
  },

  // 服务端流式
  watchOrder(call: grpc.ServerWritableStream<any, any>) {
    const { order_id } = call.request;

    const unsubscribe = orderEvents.subscribe(order_id, (event) => {
      call.write({ event_type: event.type, order: orderToProto(event.order) });

      if (event.type === 'DELIVERED') {
        call.end();
        unsubscribe();
      }
    });

    call.on('cancelled', () => unsubscribe());
  },

  // 客户端流式
  async importOrders(call: grpc.ServerReadableStream<any, any>, callback: grpc.sendUnaryData<any>) {
    const orders: Order[] = [];

    call.on('data', async (req) => {
      orders.push(createOrderFromProto(req));
    });

    call.on('end', async () => {
      try {
        const imported = await orderRepo.bulkInsert(orders);
        callback(null, { imported_count: imported.length, failed_count: 0 });
      } catch (err) {
        callback({ code: grpc.status.INTERNAL });
      }
    });
  },
};

const server = new grpc.Server();
server.addService(proto.orders.v1.OrderService.service, orderServiceImpl);
server.bindAsync('0.0.0.0:50051', grpc.ServerCredentials.createInsecure(), () => {
  console.log('gRPC server running on port 50051');
  server.start();
});

gRPC 与 Node.js 和 TypeScript:流式传输与 Protocol Buffers 插图

TypeScript 客户端

const client = new proto.orders.v1.OrderService(
  'localhost:50051',
  grpc.credentials.createInsecure()
);

// 一元调用
const order = await new Promise<Order>((resolve, reject) => {
  client.getOrder({ id: 'order_123' }, (err: any, response: any) => {
    if (err) reject(err);
    else resolve(response);
  });
});

// 服务端流式
const stream = client.watchOrder({ order_id: 'order_123' });
stream.on('data', (event: any) => {
  console.log('Order event:', event.event_type, event.order.status);
});
stream.on('end', () => console.log('Stream ended'));
stream.on('error', (err: Error) => console.error('Stream error:', err));

gRPC 与 Node.js 和 TypeScript:流式传输与 Protocol Buffers 插图

拦截器(中间件)

// 日志拦截器
function loggingInterceptor(
  options: grpc.InterceptorOptions,
  nextCall: (options: grpc.InterceptorOptions) => grpc.InterceptingCall
): grpc.InterceptingCall {
  const start = Date.now();
  return new grpc.InterceptingCall(nextCall(options), {
    start(metadata, listener, next) {
      next(metadata, {
        onReceiveStatus(status, next) {
          console.log(`gRPC ${options.method_definition.path}: ${status.code} in ${Date.now()-start}ms`);
          next(status);
        },
      });
    },
  });
}

const client = new proto.orders.v1.OrderService(
  'localhost:50051',
  grpc.credentials.createInsecure(),
  { interceptors: [loggingInterceptor] }
);

与 REST 对比

gRPC:
  + 比 REST+JSON 快 2-7 倍(Protocol Buffers 二进制编码)
  + 强类型契约(proto 文件)
  + 内置流式支持
  + 所有语言代码生成
  - 浏览器支持需要 gRPC-Web 代理
  - 调试性较差(二进制格式)
  - 学习曲线较陡

REST:
  + 通用浏览器支持
  + 人类可读(JSON)
  + 更易调试
  - 无流式
  - 无原生类型安全

gRPC 非常适合内部微服务;在需要 gRPC 性能之前,公共 API 可使用 REST。