正在加载,请稍候…

微服务架构模式:何时使用及如何实现

微服务实用指南:服务拆分、API网关、服务发现、断路器、Saga模式与事件驱动架构,附真实实现示例

微服务架构模式:何时使用及如何实现

微服务不是目标,而是权衡

微服务的热潮几年前就已达到顶峰。现实是:微服务在解决特定组织和扩展问题的同时,也引入了巨大的复杂性。运行50个服务而不是一个单体,意味着50个部署管道、50个监控仪表盘,以及每个请求上的分布式系统问题。

在采用微服务之前,请问:你的团队是否大到在单体上协作变得痛苦?系统的某些部分是否以截然不同的速率扩展?如果答案是否定的,那么一个结构良好的单体将更适合你。

本指南假设你已经决定微服务是合适的,并解释如何良好地实现它们。

微服务架构模式:何时使用及如何实现 插图

服务拆分:在哪里划定边界

微服务中最困难的问题。如果搞错了,你会得到一个“分布式单体”——拥有微服务的所有复杂性却没有好处。

领域驱动设计(DDD)的有界上下文提供了最可靠的拆分策略:

电商系统有界上下文:

订单上下文          用户上下文          库存上下文
├── 订单              ├── 用户            ├── 产品
├── 订单行            ├── 地址            ├── 库存
├── 订单状态          ├── 支付方式        ├── 仓库
└── 配送              └── 个人资料        └── 预留

支付上下文          通知上下文
├── 交易              ├── 邮件
├── 退款              ├── 短信
└── 发票              └── 推送通知

每个有界上下文成为一个服务。关键点:服务应该能够独立变更。如果更改订单服务总是需要更改用户服务,那么边界就是错误的。

API网关模式

外部客户端不应直接调用各个服务:

// 使用Express的简单API网关
import express from 'express';
import { createProxyMiddleware } from 'http-proxy-middleware';

const app = express();

// 认证中间件(在所有代理之前运行)
app.use(async (req, res, next) => {
  const token = req.headers.authorization?.split(' ')[1];
  if (!token) return res.status(401).json({ error: 'No token' });
  
  try {
    req.user = await verifyJWT(token);
    next();
  } catch {
    res.status(401).json({ error: 'Invalid token' });
  }
});

// 路由到相应的微服务
app.use('/api/users', createProxyMiddleware({
  target: process.env.USER_SERVICE_URL,
  changeOrigin: true,
  pathRewrite: { '^/api/users': '' },
}));

app.use('/api/orders', createProxyMiddleware({
  target: process.env.ORDER_SERVICE_URL,
  changeOrigin: true,
  pathRewrite: { '^/api/orders': '' },
}));

// 聚合:组合多个服务
app.get('/api/dashboard', async (req, res) => {
  const [user, orders, notifications] = await Promise.all([
    fetch(`${USER_SERVICE}/users/${req.user.id}`).then(r => r.json()),
    fetch(`${ORDER_SERVICE}/users/${req.user.id}/recent`).then(r => r.json()),
    fetch(`${NOTIF_SERVICE}/users/${req.user.id}/unread`).then(r => r.json()),
  ]);
  
  res.json({ user, orders, notifications });
});

服务间通信

微服务架构模式:何时使用及如何实现 插图

同步:REST / gRPC

// 类型化的服务客户端(gRPC风格)
import { createChannel, createClient } from 'nice-grpc';
import { OrderServiceDefinition } from './proto/order';

const channel = createChannel('order-service:50051');
const orderClient = createClient(OrderServiceDefinition, channel);

// 类型安全、高效的二进制协议
const order = await orderClient.getOrder({ orderId: '123' });
const { stream } = orderClient.watchOrderStatus({ orderId: '123' });

for await (const status of stream) {
  console.log('Order status:', status);
}

异步:消息队列

// 使用amqplib的RabbitMQ
import amqp from 'amqplib';

const connection = await amqp.connect(process.env.RABBITMQ_URL);
const channel = await connection.createChannel();

// 发布者:订单服务
async function publishOrderCreated(order) {
  await channel.assertExchange('orders', 'topic', { durable: true });
  
  channel.publish(
    'orders',
    'order.created',
    Buffer.from(JSON.stringify({
      orderId: order.id,
      userId: order.userId,
      total: order.total,
      timestamp: new Date().toISOString(),
    })),
    { persistent: true }
  );
}

// 订阅者:库存服务
await channel.assertExchange('orders', 'topic', { durable: true });
const { queue } = await channel.assertQueue('inventory.order-created');
await channel.bindQueue(queue, 'orders', 'order.created');

channel.consume(queue, async (msg) => {
  const event = JSON.parse(msg.content.toString());
  
  try {
    await reserveInventory(event.orderId);
    channel.ack(msg); // 确认成功
  } catch (error) {
    // 否定确认——重新入队重试
    channel.nack(msg, false, true);
  }
});

断路器模式

防止下游服务出现问题时发生级联故障:

import CircuitBreaker from 'opossum';

// 将任何异步函数包装在断路器中
const paymentCircuitBreaker = new CircuitBreaker(
  async (paymentData) => {
    return fetch(`${PAYMENT_SERVICE}/charge`, {
      method: 'POST',
      body: JSON.stringify(paymentData),
      signal: AbortSignal.timeout(3000), // 3秒超时
    }).then(r => r.json());
  },
  {
    timeout: 3000,          // 多久后认为请求失败
    errorThresholdPercentage: 50,  // 如果50%+失败则打开断路器
    resetTimeout: 30000,    // 30秒后重试
    volumeThreshold: 5,     // 打开前的最小请求数
  }
);

paymentCircuitBreaker.on('open', () => {
  console.error('Payment service circuit OPEN — using fallback');
});
paymentCircuitBreaker.on('halfOpen', () => {
  console.log('Payment service circuit testing...');
});
paymentCircuitBreaker.on('close', () => {
  console.log('Payment service circuit CLOSED — normal operation');
});

// 使用
async function processPayment(orderId, amount) {
  try {
    return await paymentCircuitBreaker.fire({ orderId, amount });
  } catch (error) {
    if (paymentCircuitBreaker.opened) {
      // 降级:排队稍后处理
      await queuePaymentForRetry({ orderId, amount });
      return { status: 'queued', message: 'Payment queued for processing' };
    }
    throw error;
  }
}

微服务架构模式:何时使用及如何实现 插图

Saga模式:分布式事务

当一个操作跨越多个服务时,你需要Saga而不是ACID事务:

// 基于编排的Saga:服务对事件做出反应
// (无中央协调器——服务发出并响应事件)

// 订单服务创建订单 → 发布OrderCreated
async function createOrder(orderData) {
  const order = await db.orders.create({ ...orderData, status: 'pending' });
  await eventBus.publish('OrderCreated', { orderId: order.id, ...orderData });
  return order;
}

// 支付服务收到OrderCreated → 扣款
 eventBus.subscribe('OrderCreated', async (event) => {
  try {
    await chargePayment(event.userId, event.total);
    await eventBus.publish('PaymentProcessed', { orderId: event.orderId });
  } catch (error) {
    // 补偿事务
    await eventBus.publish('PaymentFailed', { orderId: event.orderId });
  }
});

// 订单服务收到PaymentFailed → 取消订单
eventBus.subscribe('PaymentFailed', async (event) => {
  await db.orders.update(event.orderId, { status: 'cancelled' });
  await eventBus.publish('OrderCancelled', { orderId: event.orderId });
});

// 库存服务收到PaymentProcessed → 预留商品
// 库存服务收到OrderCancelled → 释放商品
// 基于编排的Saga:中央协调器
class CreateOrderSaga {
  async execute(orderData) {
    const steps = [];
    
    try {
      // 步骤1:创建订单
      const order = await this.orderService.create(orderData);
      steps.push(() => this.orderService.cancel(order.id));
      
      // 步骤2:预留库存
      await this.inventoryService.reserve(order.items);
      steps.push(() => this.inventoryService.release(order.items));
      
      // 步骤3:处理支付
      await this.paymentService.charge(order.userId, order.total);
      steps.push(() => this.paymentService.refund(order.userId, order.total));
      
      // 步骤4:确认
      await this.orderService.confirm(order.id);
      
      return order;
    } catch (error) {
      // 回滚:以相反顺序执行补偿事务
      for (const compensate of steps.reverse()) {
        await compensate().catch(console.error);
      }
      throw error;
    }
  }
}

服务发现

// 使用Consul进行服务发现
import Consul from 'consul';

const consul = new Consul({ host: 'consul', port: 8500 });

// 启动时注册
await consul.agent.service.register({
  id: `order-service-${process.env.POD_IP}`,
  name: 'order-service',
  address: process.env.POD_IP,
  port: 3000,
  check: {
    http: `http://${process.env.POD_IP}:3000/health`,
    interval: '10s',
    deregisterCriticalServiceAfter: '30s',
  },
  tags: ['api', 'v2'],
});

// 发现并调用另一个服务
async function callUserService(userId) {
  // 获取user-service的健康实例
  const services = await consul.health.service({
    service: 'user-service',
    passing: true, // 仅健康实例
  });
  
  if (services.length === 0) throw new Error('No user-service instances available');
  
  // 简单轮询(实践中使用负载均衡器或服务网格)
  const instance = services[Math.floor(Math.random() * services.length)];
  const url = `http://${instance.Service.Address}:${instance.Service.Port}`;
  
  return fetch(`${url}/users/${userId}`).then(r => r.json());
}

关键权衡总结

关注点 单体 微服务
复杂性 高(分布式系统)
部署 简单 复杂(多管道)
扩展 整个应用 按服务
团队规模 1-15 50+(康威定律)
延迟 进程内 网络跳转
事务 ACID 需要Saga模式
调试 简单 需要分布式追踪
数据隔离 共享数据库 每个服务独立数据库

→ 使用 JSON to YAML Converter 在YAML和JSON之间转换服务配置。