Microservice Patterns: Circuit Breaker, Retry, and Bulkhead

Build resilient microservices with circuit breakers, retries, timeouts, and bulkheads, implemented in Node.js and TypeScript.

Resilience patterns keep a failure in one service from cascading through the rest of a distributed system.

Circuit Breaker Pattern

enum CircuitState { CLOSED, OPEN, HALF_OPEN }

class CircuitBreaker {
  private state = CircuitState.CLOSED;
  private failureCount = 0;
  private successCount = 0;
  private lastFailureTime = 0;

  constructor(
    private readonly failureThreshold = 5,
    private readonly recoveryTimeout = 30000,
    private readonly successThreshold = 2
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.state === CircuitState.OPEN) {
      if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
        this.state = CircuitState.HALF_OPEN;
      } else {
        throw new Error('Circuit is OPEN - service unavailable');
      }
    }

    try {
      const result = await fn();
      this.onSuccess();
      return result;
    } catch (error) {
      this.onFailure();
      throw error;
    }
  }

  private onSuccess() {
    this.failureCount = 0;
    if (this.state === CircuitState.HALF_OPEN) {
      this.successCount++;
      if (this.successCount >= this.successThreshold) {
        this.state = CircuitState.CLOSED;
        this.successCount = 0;
      }
    }
  }

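  private onFailure() {
    this.failureCount++;
    this.lastFailureTime = Date.now();
    // A failed probe in HALF_OPEN reopens the circuit immediately and
    // discards any successes counted so far.
    if (
      this.state === CircuitState.HALF_OPEN ||
      this.failureCount >= this.failureThreshold
    ) {
      this.state = CircuitState.OPEN;
      this.successCount = 0;
    }
  }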
}
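
The transitions a circuit breaker makes are easier to check when written as a pure function. The sketch below is an illustration of the conventional three-state machine, not part of the class above; the names `BreakerState`, `BreakerEvent`, and `nextState` are hypothetical.

```typescript
// Hypothetical names, for illustration only.
type BreakerState = 'CLOSED' | 'OPEN' | 'HALF_OPEN';
type BreakerEvent =
  | 'failureThresholdReached'
  | 'recoveryTimeoutElapsed'
  | 'successThresholdReached'
  | 'probeFailed';

// Returns the state that follows `state` when `event` occurs.
// Events that are not meaningful in a given state leave it unchanged.
function nextState(state: BreakerState, event: BreakerEvent): BreakerState {
  switch (state) {
    case 'CLOSED':
      // Enough failures trip the breaker.
      return event === 'failureThresholdReached' ? 'OPEN' : 'CLOSED';
    case 'OPEN':
      // After the recovery timeout, trial requests are allowed through.
      return event === 'recoveryTimeoutElapsed' ? 'HALF_OPEN' : 'OPEN';
    case 'HALF_OPEN':
      if (event === 'successThresholdReached') return 'CLOSED';
      if (event === 'probeFailed') return 'OPEN';
      return 'HALF_OPEN';
  }
}
```

Read this way, the only path back to CLOSED goes through HALF_OPEN, which is what stops a recovering service from being hit with full traffic at once.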

Retry with Exponential Backoff

interface RetryOptions {
  maxAttempts: number;
  initialDelay: number;
  maxDelay: number;
  backoffFactor: number;
  retryableErrors?: string[];
}

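// Resolves after the given number of milliseconds.
const sleep = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

async function withRetry<T>(
  fn: () => Promise<T>,
  options: RetryOptions
): Promise<T> {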
  let lastError: Error;
  let delay = options.initialDelay;

  for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
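      // Normalize non-Error throwables so that .message is always defined.
      lastError = error instanceof Error ? error : new Error(String(error));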
      const isRetryable = !options.retryableErrors ||
        options.retryableErrors.some(e => lastError.message.includes(e));

      if (!isRetryable || attempt === options.maxAttempts) throw lastError;

      console.log(`Attempt ${attempt} failed, retrying in ${delay}ms...`);
      await sleep(delay + Math.random() * 100); // jitter
      delay = Math.min(delay * options.backoffFactor, options.maxDelay);
    }
  }

  throw lastError!;
}

// Usage example
const result = await withRetry(
  () => paymentService.charge(order),
  { maxAttempts: 3, initialDelay: 500, maxDelay: 5000, backoffFactor: 2 }
);
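
To see what a given set of options means in practice, it helps to list the waits it produces. The helper below is a hypothetical sketch (`backoffSchedule` is not part of the code above) that repeats the same delay arithmetic as `withRetry`, leaving out the random jitter.

```typescript
// Hypothetical helper: base delays (without jitter) waited between attempts.
function backoffSchedule(
  maxAttempts: number,
  initialDelay: number,
  maxDelay: number,
  backoffFactor: number
): number[] {
  const delays: number[] = [];
  let delay = initialDelay;
  // The final attempt is not followed by a wait, so there are
  // maxAttempts - 1 delays in total.
  for (let attempt = 1; attempt < maxAttempts; attempt++) {
    delays.push(delay);
    delay = Math.min(delay * backoffFactor, maxDelay);
  }
  return delays;
}

console.log(backoffSchedule(3, 500, 5000, 2)); // [ 500, 1000 ]
console.log(backoffSchedule(6, 500, 5000, 2)); // [ 500, 1000, 2000, 4000, 5000 ]
```

With the options from the usage example, the caller waits at most about 1.5 seconds plus jitter before the third and final attempt; `maxDelay` only takes effect from the fifth wait onward.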

Bulkhead Pattern

Give each dependency its own bounded pool of resources so that one failing service cannot exhaust them all.

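class BulkheadExecutor {
  private running = 0;
  private queue: Array<() => void> = [];

  constructor(
    private readonly maxConcurrent: number,
    private readonly maxWaitMs = 5000
  ) {}

  async execute<T>(fn: () => Promise<T>): Promise<T> {
    if (this.running >= this.maxConcurrent) {
      // Wait for a slot; a finishing task hands its slot over directly.
      await new Promise<void>((resolve, reject) => {
        const waiter = () => {
          clearTimeout(timer);
          resolve();
        };
        const timer = setTimeout(() => {
          // Drop the stale waiter so a freed slot is never handed to it.
          this.queue = this.queue.filter(w => w !== waiter);
          reject(new Error('Bulkhead full'));
        }, this.maxWaitMs);
        this.queue.push(waiter);
      });
    } else {
      this.running++;
    }

    try {
      return await fn();
    } finally {
      const next = this.queue.shift();
      if (next) next(); // the slot passes to the waiter; running is unchanged
      else this.running--;
    }
  }
}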

// A separate bulkhead for each service
const paymentBulkhead = new BulkheadExecutor(10);
const inventoryBulkhead = new BulkheadExecutor(20);

Timeout Pattern

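function withTimeout<T>(fn: () => Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(
      () => reject(new Error(`Operation timed out after ${ms}ms`)),
      ms
    );
  });
  // Clear the timer once the race settles so it does not linger.
  return Promise.race([fn(), timeout]).finally(() => clearTimeout(timer));
}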

// Combined with the circuit breaker
const breaker = new CircuitBreaker();
const result = await breaker.execute(
  () => withTimeout(() => externalService.call(), 3000)
);
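
Racing against a timer, as `withTimeout` does, stops the caller from waiting but does not stop the underlying operation, which keeps running and holding its connection. One possible refinement, sketched here under the assumption that the wrapped operation accepts an `AbortSignal`, is to abort it when the timer fires. The name `withAbortableTimeout` is hypothetical.

```typescript
// Hypothetical helper: like withTimeout, but also signals the operation to stop.
async function withAbortableTimeout<T>(
  fn: (signal: AbortSignal) => Promise<T>,
  ms: number
): Promise<T> {
  const controller = new AbortController();
  // Rejects as soon as the signal is aborted, even if fn ignores the signal.
  const timedOut = new Promise<never>((_, reject) => {
    controller.signal.addEventListener(
      'abort',
      () => reject(controller.signal.reason),
      { once: true }
    );
  });
  const timer = setTimeout(
    () => controller.abort(new Error(`Operation timed out after ${ms}ms`)),
    ms
  );
  try {
    return await Promise.race([fn(controller.signal), timedOut]);
  } finally {
    // Always clear the timer so it cannot fire after the call has settled.
    clearTimeout(timer);
  }
}
```

An operation that supports cancellation, such as `fetch` called with a `signal` option, can then be passed as `signal => fetch(url, { signal })`, where `url` is a placeholder. Operations that ignore the signal still time out from the caller's point of view, exactly as with `withTimeout`.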

Fallback Pattern

class ProductService {
  async getProduct(id: string): Promise<Product> {
    try {
      return await this.remoteService.getProduct(id);
    } catch (error) {
      // Fall back to the cache
      const cached = await this.cache.get(`product:${id}`);
      if (cached) return cached;
      // Fall back to a default value
      return this.getDefaultProduct(id);
    }
  }
}
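
The same idea can be written as a reusable chain. In the class above, an exception thrown by the cache lookup would escape the `catch` block; the sketch below treats every source the same way, so a failing cache simply falls through to the default. The helper name `firstAvailable` is hypothetical.

```typescript
// Hypothetical helper: tries each source in order and returns the first
// value that is found, or defaultValue if every source fails or is empty.
async function firstAvailable<T>(
  sources: Array<() => Promise<T | undefined>>,
  defaultValue: T
): Promise<T> {
  for (const source of sources) {
    try {
      const value = await source();
      if (value !== undefined) return value;
    } catch {
      // A failing source is skipped; the next one in the chain is tried.
    }
  }
  return defaultValue;
}
```

A call might list the remote service first and the cache second. Whether a silent default is acceptable depends on the data: it may be fine for a product description and wrong for a price.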

These resilience patterns are essential for microservices running in production.