微服务模式:断路器、重试与舱壁隔离
弹性模式可防止分布式系统中的级联故障。
断路器模式
enum CircuitState { CLOSED, OPEN, HALF_OPEN }
class CircuitBreaker {
private state = CircuitState.CLOSED;
private failureCount = 0;
private successCount = 0;
private lastFailureTime = 0;
constructor(
private readonly failureThreshold = 5,
private readonly recoveryTimeout = 30000,
private readonly successThreshold = 2
) {}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.state === CircuitState.OPEN) {
if (Date.now() - this.lastFailureTime > this.recoveryTimeout) {
this.state = CircuitState.HALF_OPEN;
} else {
throw new Error('Circuit is OPEN - service unavailable');
}
}
try {
const result = await fn();
this.onSuccess();
return result;
} catch (error) {
this.onFailure();
throw error;
}
}
private onSuccess() {
this.failureCount = 0;
if (this.state === CircuitState.HALF_OPEN) {
this.successCount++;
if (this.successCount >= this.successThreshold) {
this.state = CircuitState.CLOSED;
this.successCount = 0;
}
}
}
private onFailure() {
this.failureCount++;
this.lastFailureTime = Date.now();
if (this.failureCount >= this.failureThreshold) {
this.state = CircuitState.OPEN;
}
}
}
指数退避重试
interface RetryOptions {
maxAttempts: number;
initialDelay: number;
maxDelay: number;
backoffFactor: number;
retryableErrors?: string[];
}
async function withRetry<T>(
fn: () => Promise<T>,
options: RetryOptions
): Promise<T> {
let lastError: Error;
let delay = options.initialDelay;
for (let attempt = 1; attempt <= options.maxAttempts; attempt++) {
try {
return await fn();
} catch (error) {
lastError = error as Error;
const isRetryable = !options.retryableErrors ||
options.retryableErrors.some(e => lastError.message.includes(e));
if (!isRetryable || attempt === options.maxAttempts) throw lastError;
console.log(`Attempt ${attempt} failed, retrying in ${delay}ms...`);
await sleep(delay + Math.random() * 100); // jitter
delay = Math.min(delay * options.backoffFactor, options.maxDelay);
}
}
throw lastError!;
}
// 使用示例
const result = await withRetry(
() => paymentService.charge(order),
{ maxAttempts: 3, initialDelay: 500, maxDelay: 5000, backoffFactor: 2 }
);
舱壁隔离模式
隔离资源,防止一个故障服务耗尽所有资源。
class BulkheadExecutor {
private running = 0;
private queue: Array<() => void> = [];
constructor(private readonly maxConcurrent: number) {}
async execute<T>(fn: () => Promise<T>): Promise<T> {
if (this.running >= this.maxConcurrent) {
await new Promise<void>((resolve, reject) => {
this.queue.push(resolve);
setTimeout(() => reject(new Error('Bulkhead full')), 5000);
});
}
this.running++;
try {
return await fn();
} finally {
this.running--;
const next = this.queue.shift();
if (next) next();
}
}
}
// 每个服务独立的舱壁
const paymentBulkhead = new BulkheadExecutor(10);
const inventoryBulkhead = new BulkheadExecutor(20);
超时模式
function withTimeout<T>(fn: () => Promise<T>, ms: number): Promise<T> {
const timeout = new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error(`Operation timed out after ${ms}ms`)), ms)
);
return Promise.race([fn(), timeout]);
}
// 与断路器结合使用
const breaker = new CircuitBreaker();
const result = await breaker.execute(
() => withTimeout(() => externalService.call(), 3000)
);
降级模式
class ProductService {
async getProduct(id: string): Promise<Product> {
try {
return await this.remoteService.getProduct(id);
} catch (error) {
// 降级到缓存
const cached = await this.cache.get(`product:${id}`);
if (cached) return cached;
// 降级到默认值
return this.getDefaultProduct(id);
}
}
}
这些弹性模式对于生产环境中的微服务至关重要。