正在加载,请稍候…

Node.js 异步模式:Promise、async/await 与并发控制

掌握 Node.js 异步编程,学习 Promise 模式、async/await 最佳实践、并发控制、错误处理以及常见陷阱的避免方法。

Node.js 异步模式:Promise、async/await 与并发控制

Node.js 异步模式:Promise、async/await 与并发控制

Promise 基础

// 创建 Promise
const delay = (ms: number): Promise<void> =>
  new Promise(resolve => setTimeout(resolve, ms));

const fetchUser = (id: string): Promise<User> =>
  new Promise((resolve, reject) => {
    db.find(id, (err, user) => {
      if (err) reject(err);
      else if (!user) reject(new Error('Not found'));
      else resolve(user);
    });
  });

Node.js 异步模式:Promise、async/await 与并发控制 插图

并行与串行执行

// 串行(慢——每个等待前一个完成)
const user1 = await fetchUser('1');
const user2 = await fetchUser('2');
const user3 = await fetchUser('3');

// 并行(快——所有同时运行)
const [user1, user2, user3] = await Promise.all([
  fetchUser('1'),
  fetchUser('2'),
  fetchUser('3'),
]);

// 第一个完成者获胜
const firstAvailable = await Promise.race([
  fetchFromPrimary(id),
  delay(500).then(() => fetchFromFallback(id)),
]);

// 全部完成(不因单个错误而失败)
const results = await Promise.allSettled([
  fetchUser('1'),
  fetchUser('invalid'),
  fetchUser('3'),
]);

results.forEach(result => {
  if (result.status === 'fulfilled') console.log(result.value);
  else console.error(result.reason);
});

并发控制

// 以有限并发处理数组(p-limit 模式)
async function mapWithConcurrency<T, R>(
  items: T[],
  fn: (item: T) => Promise<R>,
  concurrency: number
): Promise<R[]> {
  const results: R[] = [];
  const queue = [...items];
  
  const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
    while (queue.length > 0) {
      const item = queue.shift()!;
      const result = await fn(item);
      results.push(result);
    }
  });

  await Promise.all(workers);
  return results;
}

// 用法:处理 100 个用户,但每次只处理 5 个
const processed = await mapWithConcurrency(userIds, processUser, 5);

Node.js 异步模式:Promise、async/await 与并发控制 插图

错误处理模式

// 使用 Result 类型包装
async function safeAsync<T>(
  fn: () => Promise<T>
): Promise<{ data: T; error: null } | { data: null; error: Error }> {
  try {
    return { data: await fn(), error: null };
  } catch (err) {
    return { data: null, error: err as Error };
  }
}

const { data: user, error } = await safeAsync(() => fetchUser(id));
if (error) {
  console.error('获取用户失败:', error.message);
  return;
}
// TypeScript 知道此处 user 不为 null

// 重试逻辑
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
  baseDelay = 1000
): Promise<T> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt === maxRetries) throw err;
      await delay(baseDelay * Math.pow(2, attempt));
    }
  }
  throw new Error('unreachable');
}

异步迭代

// 以流的形式处理数据库结果
async function* streamUsers(filter: UserFilter) {
  let page = 0;
  const pageSize = 100;

  while (true) {
    const users = await userRepo.findAll({ ...filter, page, pageSize });
    if (users.length === 0) break;
    yield* users;
    page++;
  }
}

for await (const user of streamUsers({ role: 'admin' })) {
  await processUser(user);
}

Node.js 异步模式:Promise、async/await 与并发控制 插图

使用 AbortController 取消

async function fetchWithTimeout<T>(
  fn: (signal: AbortSignal) => Promise<T>,
  timeoutMs: number
): Promise<T> {
  const controller = new AbortController();
  const timeoutId = setTimeout(() => controller.abort(), timeoutMs);

  try {
    return await fn(controller.signal);
  } finally {
    clearTimeout(timeoutId);
  }
}

// 用法
const user = await fetchWithTimeout(
  (signal) => fetch('/api/user', { signal }).then(r => r.json()),
  5000
);

常见反模式

// 错误:在 map 中不必要的 await 导致串行执行
const users = await Promise.all(ids.map(async id => {
  return await fetchUser(id);  // async map 中的 'await' 虽然可以但多余
}));

// 错误:忘记 await
async function updateAllUsers(updates: UserUpdate[]): Promise<void> {
  updates.forEach(async update => {  // Bug!forEach 不会等待 Promise
    await processUpdate(update);
  });
}

// 正确:使用 for...of 或 Promise.all
async function updateAllUsers(updates: UserUpdate[]): Promise<void> {
  for (const update of updates) {
    await processUpdate(update);
  }
  // 或:await Promise.all(updates.map(processUpdate));
}

理解 Node.js 事件循环和异步模式是编写高性能服务端代码的基础。