
NestJS 微服务架构
NestJS 通过传输层(从 TCP 到消息队列)为微服务提供了一流支持。
设置微服务
npm install @nestjs/microservices
// main.ts - 微服务引导
import { NestFactory } from '@nestjs/core';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { AppModule } from './app.module';
/**
 * Creates the Nest application as a TCP microservice on 0.0.0.0:3001,
 * starts listening, and logs a message after `listen()` resolves.
 */
async function bootstrap() {
  // Transport configuration handed to the microservice factory.
  const tcpOptions: MicroserviceOptions = {
    transport: Transport.TCP,
    options: { host: '0.0.0.0', port: 3001 },
  };

  const microservice = await NestFactory.createMicroservice<MicroserviceOptions>(
    AppModule,
    tcpOptions,
  );
  await microservice.listen();
  console.log('Microservice is listening on port 3001');
}
bootstrap();
消息模式
// users.controller.ts
import { Controller } from '@nestjs/common';
import { MessagePattern, Payload } from '@nestjs/microservices';
import { UsersService } from './users.service';
/**
 * Message-pattern controller that forwards user commands to UsersService.
 */
@Controller()
export class UsersController {
  constructor(private readonly usersService: UsersService) {}

  /** Handles `{ cmd: 'get_user' }`: looks a user up by the `id` carried in the payload. */
  @MessagePattern({ cmd: 'get_user' })
  async getUser(@Payload() data: { id: string }) {
    const { id } = data;
    return this.usersService.findById(id);
  }

  /** Handles `{ cmd: 'create_user' }`: passes the payload to `UsersService.create`. */
  @MessagePattern({ cmd: 'create_user' })
  async createUser(@Payload() createUserDto: CreateUserDto) {
    return this.usersService.create(createUserDto);
  }
}
RabbitMQ 传输
// app.module.ts - RabbitMQ 设置
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
// Root module: registers a RabbitMQ client proxy via ClientsModule.
@Module({
imports: [
ClientsModule.register([
{
// Injection token; consumers obtain this client with @Inject('ORDERS_SERVICE').
name: 'ORDERS_SERVICE',
transport: Transport.RMQ,
options: {
// Broker URL(s) and the queue this client publishes to.
urls: ['amqp://localhost:5672'],
queue: 'orders_queue',
// Queue is declared non-durable.
// NOTE(review): presumably it will not survive a broker restart — confirm this is intended outside local development.
queueOptions: { durable: false },
},
},
]),
],
})
export class AppModule {}
// 从客户端发送消息
import { Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { lastValueFrom } from 'rxjs';
/**
 * Client-side service that talks to the orders microservice through the
 * ClientProxy registered under the 'ORDERS_SERVICE' token.
 */
export class OrdersService {
  constructor(
    @Inject('ORDERS_SERVICE') private readonly ordersClient: ClientProxy,
  ) {}

  /**
   * Sends a `{ cmd: 'create_order' }` request and resolves with the reply.
   *
   * @param dto Payload forwarded unchanged to the orders microservice.
   * @returns A promise for the last value emitted by the reply stream, or
   *   `undefined` if the stream completes without emitting.
   */
  async createOrder(dto: CreateOrderDto) {
    const reply$ = this.ordersClient.send({ cmd: 'create_order' }, dto);
    // lastValueFrom replaces the deprecated Observable#toPromise(); the
    // explicit default keeps toPromise()'s "resolve undefined on empty stream"
    // behaviour instead of rejecting with EmptyError.
    return lastValueFrom(reply$, { defaultValue: undefined });
  }

  /**
   * Publishes an 'order_created' event carrying the given order.
   * The Observable returned by `emit()` is not awaited or returned, so the
   * promise resolves without waiting on it.
   *
   * @param order Event payload.
   */
  async emitOrderCreated(order: Order) {
    this.ordersClient.emit('order_created', order);
  }
}
gRPC 传输
// main.ts - gRPC 微服务
// gRPC transport settings: `users` package loaded from users.proto, bound to localhost:5000.
const grpcOptions: MicroserviceOptions = {
  transport: Transport.GRPC,
  options: {
    package: 'users',
    protoPath: join(__dirname, './users.proto'),
    url: 'localhost:5000',
  },
};
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  grpcOptions,
);
// users.proto
syntax = "proto3";

package users;

// User lookup service.
service UsersService {
  // Returns the user identified by UserById.id.
  rpc FindOne (UserById) returns (User);
  // Returns all users; takes no arguments.
  rpc FindAll (Empty) returns (UsersResponse);
}

message UserById {
  int32 id = 1;
}

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

// Request type for FindAll. It was referenced by the service but never
// defined, which makes the file fail to compile.
message Empty {}

// Response type for FindAll. It was referenced by the service but never
// defined. NOTE(review): field layout is an assumption — adjust to the real schema.
message UsersResponse {
  repeated User users = 1;
}
基于 Redis Pub/Sub 的事件驱动
// 使用 Redis 实现事件驱动架构
// Redis transport settings: connects to the Redis server at localhost:6379.
const redisOptions: MicroserviceOptions = {
  transport: Transport.REDIS,
  options: { host: 'localhost', port: 6379 },
};
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
  AppModule,
  redisOptions,
);
微服务中的异常过滤器
import { Catch, RpcExceptionFilter } from '@nestjs/common';
import { Observable, throwError } from 'rxjs';
import { RpcException } from '@nestjs/microservices';
/**
 * RPC exception filter: converts a caught RpcException into an error
 * Observable that carries the exception's own error payload.
 */
@Catch(RpcException)
export class ExceptionFilter implements RpcExceptionFilter<RpcException> {
  /**
   * @param exception The RpcException being handled.
   * @returns An Observable that errors with `exception.getError()`.
   */
  catch(exception: RpcException): Observable<any> {
    // throwError calls this factory to obtain the error it emits.
    const errorFactory = () => exception.getError();
    return throwError(errorFactory);
  }
}
健康检查
import { HealthCheckService, MicroserviceHealthIndicator } from '@nestjs/terminus';
import { Transport } from '@nestjs/microservices';
/**
 * Health endpoint under the `health` route; reports whether the orders
 * microservice answers a TCP ping.
 */
@Controller('health')
export class HealthController {
  constructor(
    private health: HealthCheckService,
    private microservice: MicroserviceHealthIndicator,
  ) {}

  /** Runs the registered health indicators and returns the combined result. */
  @Get()
  @HealthCheck()
  check() {
    // Indicator: ping the orders service over TCP at orders-service:3001.
    const pingOrdersService = () =>
      this.microservice.pingCheck('orders-service', {
        transport: Transport.TCP,
        options: { host: 'orders-service', port: 3001 },
      });

    return this.health.check([pingOrdersService]);
  }
}
服务间通信模式
请求-响应
使用 send() 处理需要回复的请求-响应式调用;它返回一个 Observable,需要订阅(或转换为 Promise)才能拿到响应。
即发即忘
使用 emit() 发布无需响应的事件。
Saga(分布式事务)
实现 Saga 模式处理多服务事务:
// 订单 saga:扣款 -> 预留库存 -> 确认订单
/**
 * Runs the order saga: debit payment, reserve inventory, confirm order.
 *
 * If a step fails, only the steps that already completed are compensated,
 * in reverse order, and the original error is rethrown. Compensating
 * unconditionally would refund a payment that was never debited or release
 * inventory that was never reserved.
 *
 * @param orderId Identifier passed to every step and compensation.
 * @throws The error raised by the failing step. If a compensation itself
 *   throws, that error propagates instead and later compensations are skipped.
 */
async createOrderSaga(orderId: string): Promise<void> {
  // Undo actions for the steps that have succeeded so far, in execution order.
  const compensations: Array<() => Promise<unknown>> = [];
  try {
    await this.paymentService.debit(orderId);
    compensations.push(() => this.paymentService.refund(orderId));

    await this.inventoryService.reserve(orderId);
    compensations.push(() => this.inventoryService.release(orderId));

    await this.ordersService.confirm(orderId);
  } catch (err) {
    // Compensating transactions: undo completed steps, most recent first.
    for (const compensate of compensations.reverse()) {
      await compensate();
    }
    throw err;
  }
}
最佳实践
- 使用 DTO + class-validator 处理所有服务间消息
- 添加关联 ID 以跨服务追踪请求
- 实现断路器,使用如 opossum 等库
- 对 API 进行版本控制,支持滚动更新
- 使用共享库管理公共 DTO 和事件类型