正在加载,请稍候…

NestJS 微服务:构建可扩展的分布式系统

使用 NestJS 构建生产级微服务——消息代理、gRPC、事件溯源、服务发现及服务间通信模式。

NestJS 微服务:构建可扩展的分布式系统

NestJS 微服务架构

NestJS 通过传输层(从 TCP 到消息队列)为微服务提供了一流支持。

NestJS 微服务:构建可扩展的分布式系统 插图

设置微服务

npm install @nestjs/microservices
// main.ts - 微服务引导
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.TCP,
      options: {
        host: '0.0.0.0',
        port: 3001,
      },
    },
  );
  await app.listen();
  console.log('Microservice is listening on port 3001');
}
bootstrap();

消息模式

// users.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { UsersService } from './users.service';

@Controller()
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  @MessagePattern({ cmd: 'get_user' })
  async getUser(@Payload() data: { id: string }) {
    return this.usersService.findById(data.id);
  }

  @MessagePattern({ cmd: 'create_user' })
  async createUser(@Payload() createUserDto: CreateUserDto) {
    return this.usersService.create(createUserDto);
  }
}

RabbitMQ 传输

// app.module.ts - RabbitMQ 设置
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'ORDERS_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'orders_queue',
          queueOptions: { durable: false },
        },
      },
    ]),
  ],
})
export class AppModule {}
// 从客户端发送消息
import { Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';

export class OrdersService {
  constructor(
    @Inject('ORDERS_SERVICE') private ordersClient: ClientProxy,
  ) {}

  async createOrder(dto: CreateOrderDto) {
    return this.ordersClient.send({ cmd: 'create_order' }, dto).toPromise();
  }

  async emitOrderCreated(order: Order) {
    this.ordersClient.emit('order_created', order);
  }
}

gRPC 传输

// main.ts - gRPC 微服务
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.GRPC,
    options: {
      package: 'users',
      protoPath: join(__dirname, './users.proto'),
      url: 'localhost:5000',
    },
  },
);
// users.proto
syntax = "proto3";

package users;

service UsersService {
  rpc FindOne (UserById) returns (User);
  rpc FindAll (Empty) returns (UsersResponse);
}

message UserById {
  int32 id = 1;
}

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

NestJS 微服务:构建可扩展的分布式系统 插图

基于 Redis Pub/Sub 的事件驱动

// 使用 Redis 实现事件驱动架构
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.REDIS,
    options: {
      host: 'localhost',
      port: 6379,
    },
  },
);

微服务中的异常过滤器

import { Catch, RpcExceptionFilter } from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { RpcException } from '@nestjs/microservices';

@Catch(RpcException)
export class ExceptionFilter implements RpcExceptionFilter<RpcException> {
  catch(exception: RpcException): Observable<any> {
    return throwError(() => exception.getError());
  }
}

健康检查

import { HealthCheckService, MicroserviceHealthIndicator } from '@nestjs/terminus';
import { Transport } from '@nestjs/microservices';

@Controller('health')
export class HealthController {
  constructor(
    private health: HealthCheckService,
    private microservice: MicroserviceHealthIndicator,
  ) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([
      () =>
        this.microservice.pingCheck('orders-service', {
          transport: Transport.TCP,
          options: { host: 'orders-service', port: 3001 },
        }),
    ]);
  }
}

服务间通信模式

NestJS 微服务:构建可扩展的分布式系统 插图

请求-响应

使用 send() 进行需要回复的同步操作。

即发即忘

使用 emit() 进行无需响应的事件。

Saga(分布式事务)

实现 Saga 模式处理多服务事务:

// 订单 saga:扣款 -> 预留库存 -> 确认订单
async createOrderSaga(orderId: string) {
  try {
    await this.paymentService.debit(orderId);
    await this.inventoryService.reserve(orderId);
    await this.ordersService.confirm(orderId);
  } catch (err) {
    // 补偿事务
    await this.inventoryService.release(orderId);
    await this.paymentService.refund(orderId);
    throw err;
  }
}

最佳实践

  • 使用 DTO + class-validator 处理所有服务间消息
  • 添加关联 ID 以跨服务追踪请求
  • 实现断路器,使用如 opossum 等库
  • 对 API 进行版本控制,支持滚动更新
  • 使用共享库 管理公共 DTO 和事件类型