
MongoDB 聚合管道
聚合管道通过多个阶段处理文档,在每个步骤中对文档进行转换。

基础管道阶段
// db.orders.aggregate([...])
// $match:过滤文档(使用索引!)
{ $match: { status: 'completed', createdAt: { $gte: new Date('2026-01-01') } } }
// $group:按字段聚合
{ $group: {
_id: '$userId',
orderCount: { $sum: 1 },
totalRevenue: { $sum: '$total' },
avgOrder: { $avg: '$total' },
firstOrder: { $min: '$createdAt' },
lastOrder: { $max: '$createdAt' },
} }
// $sort:排序结果
{ $sort: { totalRevenue: -1 } }
// $limit 和 $skip:分页
{ $limit: 20 }
// $project:重塑文档
{ $project: {
userId: '$_id',
_id: 0,
orderCount: 1,
totalRevenue: { $round: ['$totalRevenue', 2] },
avgOrder: { $round: ['$avgOrder', 2] },
} }
$lookup(连接)
// 左连接 orders 与 users
db.orders.aggregate([
{ $match: { status: 'completed' } },
// 简单连接
{ $lookup: {
from: 'users',
localField: 'userId',
foreignField: '_id',
as: 'user',
} },
// 展平数组(左连接返回数组)
{ $unwind: { path: '$user', preserveNullAndEmptyArrays: true } },
// 管道连接(更强大)
{ $lookup: {
from: 'products',
let: { itemIds: '$items.productId' },
pipeline: [
{ $match: { $expr: { $in: ['$_id', '$itemIds'] } } },
{ $project: { name: 1, price: 1, category: 1 } },
],
as: 'products',
} },
])

$facet(单次查询中的多重聚合)
// 对同一输入运行多个子管道
db.products.aggregate([
{ $match: { active: true } },
{ $facet: {
// 总数和统计
metadata: [
{ $count: 'total' },
],
// 价格分布
priceRanges: [
{ $bucket: {
groupBy: '$price',
boundaries: [0, 25, 50, 100, 200, 500],
default: '500+',
output: { count: { $sum: 1 }, products: { $push: '$name' } }
} },
],
// 热门类别
categories: [
{ $group: { _id: '$category', count: { $sum: 1 } } },
{ $sort: { count: -1 } },
{ $limit: 5 },
],
} },
])
时间序列聚合
// 最近30天的每日收入
db.orders.aggregate([
{ $match: {
createdAt: { $gte: new Date(Date.now() - 30 * 24 * 3600 * 1000) },
status: 'completed',
} },
{ $group: {
_id: {
year: { $year: '$createdAt' },
month: { $month: '$createdAt' },
day: { $dayOfMonth: '$createdAt' },
},
revenue: { $sum: '$total' },
count: { $sum: 1 },
} },
{ $sort: { '_id.year': 1, '_id.month': 1, '_id.day': 1 } },
{ $project: {
date: {
$dateFromParts: {
year: '$_id.year',
month: '$_id.month',
day: '$_id.day',
}
},
revenue: { $round: ['$revenue', 2] },
count: 1,
_id: 0,
} },
])

窗口函数($setWindowFields)
// 累计收入(运行总计)
db.orders.aggregate([
{ $match: { status: 'completed' } },
{ $setWindowFields: {
partitionBy: '$userId',
sortBy: { createdAt: 1 },
output: {
runningTotal: {
$sum: '$total',
window: { documents: ['unbounded', 'current'] },
},
previousOrderTotal: {
$shift: { output: '$total', by: -1, default: 0 },
},
rankInUser: { $rank: {} },
},
} },
])
性能:在管道中使用索引
// $match 阶段必须放在首位才能使用索引
// 错误:
db.orders.aggregate([
{ $group: { _id: '$userId', total: { $sum: '$amount' } } },
{ $match: { total: { $gt: 1000 } } }, // 太晚,无法使用索引!
])
// 正确:
db.orders.aggregate([
{ $match: { createdAt: { $gte: new Date('2026-01-01') } } }, // 使用索引
{ $group: { _id: '$userId', total: { $sum: '$amount' } } },
])
// 使用 explain() 验证
db.orders.explain('executionStats').aggregate([...])
Atlas Search(全文搜索)
// 需要 Atlas Search 索引
db.products.aggregate([
{ $search: {
index: 'products_search',
compound: {
must: [
{ text: { query: 'wireless headphones', path: ['name', 'description'] } },
],
should: [
{ range: { path: 'rating', gte: 4.0, boost: { value: 2 } } },
],
filter: [
{ equals: { path: 'inStock', value: true } },
],
},
} },
{ $project: {
name: 1,
price: 1,
score: { $meta: 'searchScore' },
} },
{ $sort: { score: -1 } },
])