
Node.js 异步模式:Promise、async/await 与并发控制
Promise 基础
// 创建 Promise
const delay = (ms: number): Promise<void> =>
new Promise(resolve => setTimeout(resolve, ms));
const fetchUser = (id: string): Promise<User> =>
new Promise((resolve, reject) => {
db.find(id, (err, user) => {
if (err) reject(err);
else if (!user) reject(new Error('Not found'));
else resolve(user);
});
});
并行与串行执行
// 串行(慢——每个等待前一个完成)
const user1 = await fetchUser('1');
const user2 = await fetchUser('2');
const user3 = await fetchUser('3');
// 并行(快——所有同时运行)
const [user1, user2, user3] = await Promise.all([
fetchUser('1'),
fetchUser('2'),
fetchUser('3'),
]);
// 第一个完成者获胜
const firstAvailable = await Promise.race([
fetchFromPrimary(id),
delay(500).then(() => fetchFromFallback(id)),
]);
// 全部完成(不因单个错误而失败)
const results = await Promise.allSettled([
fetchUser('1'),
fetchUser('invalid'),
fetchUser('3'),
]);
results.forEach(result => {
if (result.status === 'fulfilled') console.log(result.value);
else console.error(result.reason);
});
并发控制
// 以有限并发处理数组(p-limit 模式)
async function mapWithConcurrency<T, R>(
items: T[],
fn: (item: T) => Promise<R>,
concurrency: number
): Promise<R[]> {
const results: R[] = [];
const queue = [...items];
const workers = Array.from({ length: Math.min(concurrency, items.length) }, async () => {
while (queue.length > 0) {
const item = queue.shift()!;
const result = await fn(item);
results.push(result);
}
});
await Promise.all(workers);
return results;
}
// 用法:处理 100 个用户,但每次只处理 5 个
const processed = await mapWithConcurrency(userIds, processUser, 5);
错误处理模式
// 使用 Result 类型包装
async function safeAsync<T>(
fn: () => Promise<T>
): Promise<{ data: T; error: null } | { data: null; error: Error }> {
try {
return { data: await fn(), error: null };
} catch (err) {
return { data: null, error: err as Error };
}
}
const { data: user, error } = await safeAsync(() => fetchUser(id));
if (error) {
console.error('获取用户失败:', error.message);
return;
}
// TypeScript 知道此处 user 不为 null
// 重试逻辑
async function withRetry<T>(
fn: () => Promise<T>,
maxRetries = 3,
baseDelay = 1000
): Promise<T> {
for (let attempt = 0; attempt <= maxRetries; attempt++) {
try {
return await fn();
} catch (err) {
if (attempt === maxRetries) throw err;
await delay(baseDelay * Math.pow(2, attempt));
}
}
throw new Error('unreachable');
}
异步迭代
// 以流的形式处理数据库结果
async function* streamUsers(filter: UserFilter) {
let page = 0;
const pageSize = 100;
while (true) {
const users = await userRepo.findAll({ ...filter, page, pageSize });
if (users.length === 0) break;
yield* users;
page++;
}
}
for await (const user of streamUsers({ role: 'admin' })) {
await processUser(user);
}
使用 AbortController 取消
async function fetchWithTimeout<T>(
fn: (signal: AbortSignal) => Promise<T>,
timeoutMs: number
): Promise<T> {
const controller = new AbortController();
const timeoutId = setTimeout(() => controller.abort(), timeoutMs);
try {
return await fn(controller.signal);
} finally {
clearTimeout(timeoutId);
}
}
// 用法
const user = await fetchWithTimeout(
(signal) => fetch('/api/user', { signal }).then(r => r.json()),
5000
);
常见反模式
// 错误:在 map 中不必要的 await 导致串行执行
const users = await Promise.all(ids.map(async id => {
return await fetchUser(id); // async map 中的 'await' 虽然可以但多余
}));
// 错误:忘记 await
async function updateAllUsers(updates: UserUpdate[]): Promise<void> {
updates.forEach(async update => { // Bug!forEach 不会等待 Promise
await processUpdate(update);
});
}
// 正确:使用 for...of 或 Promise.all
async function updateAllUsers(updates: UserUpdate[]): Promise<void> {
for (const update of updates) {
await processUpdate(update);
}
// 或:await Promise.all(updates.map(processUpdate));
}
理解 Node.js 事件循环和异步模式是编写高性能服务端代码的基础。