正在加载,请稍候…

MongoDB 聚合管道:从基础到进阶

掌握 MongoDB 聚合管道,从基础阶段到高级功能如 $lookup、$facet、时间序列聚合和窗口函数,并优化性能。

MongoDB 聚合管道:从基础到进阶

MongoDB 聚合管道

聚合管道通过多个阶段处理文档,在每个步骤中对文档进行转换。

MongoDB 聚合管道:从基础到进阶 示意图

基础管道阶段

// db.orders.aggregate([...])

// $match:过滤文档(使用索引!)
{ $match: { status: 'completed', createdAt: { $gte: new Date('2026-01-01') } } }

// $group:按字段聚合
{ $group: {
  _id: '$userId',
  orderCount: { $sum: 1 },
  totalRevenue: { $sum: '$total' },
  avgOrder: { $avg: '$total' },
  firstOrder: { $min: '$createdAt' },
  lastOrder: { $max: '$createdAt' },
} }

// $sort:排序结果
{ $sort: { totalRevenue: -1 } }

// $limit 和 $skip:分页
{ $limit: 20 }

// $project:重塑文档
{ $project: {
  userId: '$_id',
  _id: 0,
  orderCount: 1,
  totalRevenue: { $round: ['$totalRevenue', 2] },
  avgOrder: { $round: ['$avgOrder', 2] },
} }

$lookup(连接)

// 左连接 orders 与 users
 db.orders.aggregate([
  { $match: { status: 'completed' } },
  
  // 简单连接
  { $lookup: {
    from: 'users',
    localField: 'userId',
    foreignField: '_id',
    as: 'user',
  } },
  
  // 展平数组(左连接返回数组)
  { $unwind: { path: '$user', preserveNullAndEmptyArrays: true } },
  
  // 管道连接(更强大)
  { $lookup: {
    from: 'products',
    let: { itemIds: '$items.productId' },
    pipeline: [
      { $match: { $expr: { $in: ['$_id', '$itemIds'] } } },
      { $project: { name: 1, price: 1, category: 1 } },
    ],
    as: 'products',
  } },
])

MongoDB 聚合管道:从基础到进阶 示意图

$facet(单次查询中的多重聚合)

// 对同一输入运行多个子管道
 db.products.aggregate([
  { $match: { active: true } },
  
  { $facet: {
    // 总数和统计
    metadata: [
      { $count: 'total' },
    ],
    
    // 价格分布
    priceRanges: [
      { $bucket: {
        groupBy: '$price',
        boundaries: [0, 25, 50, 100, 200, 500],
        default: '500+',
        output: { count: { $sum: 1 }, products: { $push: '$name' } }
      } },
    ],
    
    // 热门类别
    categories: [
      { $group: { _id: '$category', count: { $sum: 1 } } },
      { $sort: { count: -1 } },
      { $limit: 5 },
    ],
  } },
])

时间序列聚合

// 最近30天的每日收入
 db.orders.aggregate([
  { $match: {
    createdAt: { $gte: new Date(Date.now() - 30 * 24 * 3600 * 1000) },
    status: 'completed',
  } },
  
  { $group: {
    _id: {
      year: { $year: '$createdAt' },
      month: { $month: '$createdAt' },
      day: { $dayOfMonth: '$createdAt' },
    },
    revenue: { $sum: '$total' },
    count: { $sum: 1 },
  } },
  
  { $sort: { '_id.year': 1, '_id.month': 1, '_id.day': 1 } },
  
  { $project: {
    date: {
      $dateFromParts: {
        year: '$_id.year',
        month: '$_id.month',
        day: '$_id.day',
      }
    },
    revenue: { $round: ['$revenue', 2] },
    count: 1,
    _id: 0,
  } },
])

MongoDB 聚合管道:从基础到进阶 示意图

窗口函数($setWindowFields)

// 累计收入(运行总计)
 db.orders.aggregate([
  { $match: { status: 'completed' } },
  
  { $setWindowFields: {
    partitionBy: '$userId',
    sortBy: { createdAt: 1 },
    output: {
      runningTotal: {
        $sum: '$total',
        window: { documents: ['unbounded', 'current'] },
      },
      previousOrderTotal: {
        $shift: { output: '$total', by: -1, default: 0 },
      },
      rankInUser: { $rank: {} },
    },
  } },
])

性能:在管道中使用索引

// $match 阶段必须放在首位才能使用索引
// 错误:
 db.orders.aggregate([
  { $group: { _id: '$userId', total: { $sum: '$amount' } } },
  { $match: { total: { $gt: 1000 } } },  // 太晚,无法使用索引!
])

// 正确:
 db.orders.aggregate([
  { $match: { createdAt: { $gte: new Date('2026-01-01') } } },  // 使用索引
  { $group: { _id: '$userId', total: { $sum: '$amount' } } },
])

// 使用 explain() 验证
 db.orders.explain('executionStats').aggregate([...])

Atlas Search(全文搜索)

// 需要 Atlas Search 索引
 db.products.aggregate([
  { $search: {
    index: 'products_search',
    compound: {
      must: [
        { text: { query: 'wireless headphones', path: ['name', 'description'] } },
      ],
      should: [
        { range: { path: 'rating', gte: 4.0, boost: { value: 2 } } },
      ],
      filter: [
        { equals: { path: 'inStock', value: true } },
      ],
    },
  } },
  
  { $project: {
    name: 1,
    price: 1,
    score: { $meta: 'searchScore' },
  } },
  
  { $sort: { score: -1 } },
])