
MongoDB 聚合管道
聚合管道通过多个阶段处理文档,在每个步骤中转换数据。
基本管道
db.orders.aggregate([
// 阶段 1:过滤文档
{ $match: { status: 'completed', createdAt: { $gte: new Date('2024-01-01') } } },
// 阶段 2:分组并计算
{
$group: {
_id: '$userId',
totalSpent: { $sum: '$total' },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: '$total' },
lastOrder: { $max: '$createdAt' },
}
},
// 阶段 3:过滤分组结果
{ $match: { totalSpent: { $gte: 100 } } },
// 阶段 4:排序
{ $sort: { totalSpent: -1 } },
// 阶段 5:限制结果
{ $limit: 10 },
// 阶段 6:重塑输出
{
$project: {
userId: '$_id',
_id: 0,
totalSpent: { $round: ['$totalSpent', 2] },
orderCount: 1,
avgOrderValue: { $round: ['$avgOrderValue', 2] },
}
}
]);

$lookup(JOIN 等效)
db.orders.aggregate([
{ $match: { status: 'pending' } },
{
$lookup: {
from: 'users',
localField: 'userId',
foreignField: '_id',
as: 'user',
}
},
{ $unwind: '$user' }, // 将 user 数组展平为对象
{
$project: {
orderId: '$_id',
userName: '$user.name',
userEmail: '$user.email',
total: 1,
status: 1,
}
}
]);
// 使用管道的进阶 lookup
db.orders.aggregate([
{
$lookup: {
from: 'orderItems',
let: { orderId: '$_id' },
pipeline: [
{ $match: { $expr: { $eq: ['$orderId', '$orderId'] } } },
{ $lookup: { from: 'products', localField: 'productId', foreignField: '_id', as: 'product' } },
{ $unwind: '$product' },
{ $project: { name: '$product.name', quantity: 1, price: 1 } },
],
as: 'items',
}
}
]);

$facet(多管道)
// 在单个查询中获取结果和计数
db.products.aggregate([
{ $match: { active: true } },
{
$facet: {
// 分支 1:分页结果
results: [
{ $sort: { price: 1 } },
{ $skip: 0 },
{ $limit: 20 },
{ $project: { name: 1, price: 1, category: 1 } },
],
// 分支 2:总计数
totalCount: [
{ $count: 'count' }
],
// 分支 3:按类别分布
byCategory: [
{ $group: { _id: '$category', count: { $sum: 1 } } },
{ $sort: { count: -1 } },
],
// 分支 4:价格统计
priceStats: [
{ $group: {
_id: null,
min: { $min: '$price' },
max: { $max: '$price' },
avg: { $avg: '$price' },
}}
],
}
}
]);

时间序列聚合
// 最近 30 天的每日收入
db.orders.aggregate([
{
$match: {
createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) },
status: 'completed',
}
},
{
$group: {
_id: {
year: { $year: '$createdAt' },
month: { $month: '$createdAt' },
day: { $dayOfMonth: '$createdAt' },
},
revenue: { $sum: '$total' },
orders: { $sum: 1 },
}
},
{ $sort: { '_id.year': 1, '_id.month': 1, '_id.day': 1 } },
{
$project: {
date: {
$dateToString: {
format: '%Y-%m-%d',
date: {
$dateFromParts: {
year: '$_id.year', month: '$_id.month', day: '$_id.day'
}
}
}
},
revenue: { $round: ['$revenue', 2] },
orders: 1,
}
}
]);
文本搜索
// 创建文本索引
db.articles.createIndex({ title: 'text', body: 'text' });
// 带评分的文本搜索
db.articles.aggregate([
{ $match: { $text: { $search: 'mongodb performance optimization' } } },
{ $addFields: { score: { $meta: 'textScore' } } },
{ $sort: { score: { $meta: 'textScore' } } },
{ $limit: 10 },
]);
在聚合中使用 explain() 来理解查询计划并确保使用了索引。