正在加载,请稍候…

NestJS 微服务:构建可扩展的服务架构

学习如何使用 NestJS 通过 TCP、Redis、Kafka 和 gRPC 传输层构建微服务,涵盖服务通信、消息模式和部署策略。

NestJS 微服务:构建可扩展的服务架构

NestJS 微服务:构建可扩展的服务

NestJS 为构建微服务提供了对多种传输层的一流支持。本指南涵盖了生产架构中的实用模式。

设置微服务

// main.ts - 微服务入口点
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';

async function bootstrap() {
  const app = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    {
      transport: Transport.TCP,
      options: {
        host: '0.0.0.0',
        port: 3001,
      },
    },
  );
  await app.listen();
  console.log('微服务正在监听');
}
bootstrap();

NestJS 微服务:构建可扩展的服务架构 插图

消息模式

// user.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, EventPattern, Payload } from '@nestjs/microservices';

@Controller()
export class UserController {
  constructor(private readonly userService: UserService) {}

  // 请求-响应模式
  @MessagePattern({ cmd: 'get_user' })
  async getUser(@Payload() data: { id: string }) {
    return this.userService.findById(data.id);
  }

  @MessagePattern({ cmd: 'create_user' })
  async createUser(@Payload() createUserDto: CreateUserDto) {
    return this.userService.create(createUserDto);
  }

  // 即发即弃事件模式
  @EventPattern('user.registered')
  async handleUserRegistered(@Payload() event: UserRegisteredEvent) {
    await this.userService.sendWelcomeEmail(event.userId);
  }
}

客户端代理

// api-gateway/app.module.ts
@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'USER_SERVICE',
        transport: Transport.TCP,
        options: { host: 'user-service', port: 3001 },
      },
      {
        name: 'ORDER_SERVICE',
        transport: Transport.TCP,
        options: { host: 'order-service', port: 3002 },
      },
    ]),
  ],
})
export class AppModule {}

// api-gateway/user.controller.ts
@Controller('users')
export class UserGatewayController {
  constructor(
    @Inject('USER_SERVICE') private readonly userService: ClientProxy,
  ) {}

  @Get(':id')
  getUser(@Param('id') id: string) {
    return this.userService.send({ cmd: 'get_user' }, { id });
  }

  @Post()
  createUser(@Body() dto: CreateUserDto) {
    return this.userService.send({ cmd: 'create_user' }, dto);
  }

  @Post(':id/deactivate')
  deactivateUser(@Param('id') id: string) {
    // 即发即弃
    this.userService.emit('user.deactivated', { userId: id });
    return { status: 'queued' };
  }
}

NestJS 微服务:构建可扩展的服务架构 插图

Kafka 传输

// 生产者微服务
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.KAFKA,
    options: {
      client: {
        brokers: ['kafka:9092'],
      },
      consumer: {
        groupId: 'order-consumer',
      },
    },
  },
);

// 消费者
@Controller()
export class OrderController {
  @MessagePattern('order.created')
  async handleOrderCreated(
    @Payload() message: any,
    @Ctx() context: KafkaContext,
  ) {
    const originalMessage = context.getMessage();
    const partition = context.getPartition();
    const topic = context.getTopic();

    console.log(`从 ${topic} [${partition}] 收到`);
    return this.orderService.process(message.value);
  }
}

Redis 传输用于发布/订阅

// 使用 Redis 作为消息代理
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  {
    transport: Transport.REDIS,
    options: {
      host: 'redis',
      port: 6379,
    },
  },
);

// 客户端模块设置
ClientsModule.register([
  {
    name: 'NOTIFICATION_SERVICE',
    transport: Transport.REDIS,
    options: { host: 'redis', port: 6379 },
  },
]),

NestJS 微服务:构建可扩展的服务架构 插图

微服务的异常过滤器

@Catch()
export class AllExceptionsFilter implements RpcExceptionFilter {
  catch(exception: any, host: ArgumentsHost): Observable<any> {
    const rpcException = new RpcException({
      statusCode: exception.status || 500,
      message: exception.message || '内部服务器错误',
    });
    return throwError(() => rpcException);
  }
}

// 全局应用
app.useGlobalFilters(new AllExceptionsFilter());

// 在客户端处理
this.userService.send({ cmd: 'get_user' }, { id }).pipe(
  catchError(err => {
    throw new HttpException(err.message, err.statusCode || 500);
  }),
);

健康检查

@Module({
  imports: [
    TerminusModule,
    HttpModule,
  ],
})
export class HealthModule {}

@Controller('health')
export class HealthController {
  constructor(
    private health: HealthCheckService,
    private http: HttpHealthIndicator,
    private readonly redis: MicroserviceHealthIndicator,
  ) {}

  @Get()
  @HealthCheck()
  check() {
    return this.health.check([
      () => this.http.pingCheck('user-service', 'http://user-service:3001/health'),
      () => this.redis.pingCheck('redis', {
        transport: Transport.REDIS,
        options: { host: 'redis', port: 6379 },
      }),
    ]);
  }
}

总结

NestJS 微服务提供:

  • 多种传输层:TCP、Redis、Kafka、gRPC、NATS
  • 开箱即用的请求-响应和事件模式
  • 类型安全的客户端代理 ClientProxy
  • 针对分布式场景的内置异常处理
  • 通过 Terminus 轻松集成健康检查