正在加载,请稍候…

MongoDB 聚合管道:复杂数据处理

掌握 MongoDB 聚合管道,通过多个阶段处理文档,实现数据过滤、分组、连接、分面和时间序列分析等复杂操作。

MongoDB 聚合管道:复杂数据处理

MongoDB 聚合管道

聚合管道通过多个阶段处理文档,在每个步骤中转换数据。

基本管道

db.orders.aggregate([
  // 阶段 1:过滤文档
  { $match: { status: 'completed', createdAt: { $gte: new Date('2024-01-01') } } },

  // 阶段 2:分组并计算
  {
    $group: {
      _id: '$userId',
      totalSpent: { $sum: '$total' },
      orderCount: { $sum: 1 },
      avgOrderValue: { $avg: '$total' },
      lastOrder: { $max: '$createdAt' },
    }
  },

  // 阶段 3:过滤分组结果
  { $match: { totalSpent: { $gte: 100 } } },

  // 阶段 4:排序
  { $sort: { totalSpent: -1 } },

  // 阶段 5:限制结果
  { $limit: 10 },

  // 阶段 6:重塑输出
  {
    $project: {
      userId: '$_id',
      _id: 0,
      totalSpent: { $round: ['$totalSpent', 2] },
      orderCount: 1,
      avgOrderValue: { $round: ['$avgOrderValue', 2] },
    }
  }
]);

MongoDB 聚合管道:复杂数据处理 插图

$lookup(JOIN 等效)

db.orders.aggregate([
  { $match: { status: 'pending' } },
  {
    $lookup: {
      from: 'users',
      localField: 'userId',
      foreignField: '_id',
      as: 'user',
    }
  },
  { $unwind: '$user' },  // 将 user 数组展平为对象
  {
    $project: {
      orderId: '$_id',
      userName: '$user.name',
      userEmail: '$user.email',
      total: 1,
      status: 1,
    }
  }
]);

// 使用管道的进阶 lookup
db.orders.aggregate([
  {
    $lookup: {
      from: 'orderItems',
      let: { orderId: '$_id' },
      pipeline: [
        { $match: { $expr: { $eq: ['$orderId', '$orderId'] } } },
        { $lookup: { from: 'products', localField: 'productId', foreignField: '_id', as: 'product' } },
        { $unwind: '$product' },
        { $project: { name: '$product.name', quantity: 1, price: 1 } },
      ],
      as: 'items',
    }
  }
]);

MongoDB 聚合管道:复杂数据处理 插图

$facet(多管道)

// 在单个查询中获取结果和计数
db.products.aggregate([
  { $match: { active: true } },
  {
    $facet: {
      // 分支 1:分页结果
      results: [
        { $sort: { price: 1 } },
        { $skip: 0 },
        { $limit: 20 },
        { $project: { name: 1, price: 1, category: 1 } },
      ],
      // 分支 2:总计数
      totalCount: [
        { $count: 'count' }
      ],
      // 分支 3:按类别分布
      byCategory: [
        { $group: { _id: '$category', count: { $sum: 1 } } },
        { $sort: { count: -1 } },
      ],
      // 分支 4:价格统计
      priceStats: [
        { $group: {
          _id: null,
          min: { $min: '$price' },
          max: { $max: '$price' },
          avg: { $avg: '$price' },
        }}
      ],
    }
  }
]);

MongoDB 聚合管道:复杂数据处理 插图

时间序列聚合

// 最近 30 天的每日收入
db.orders.aggregate([
  {
    $match: {
      createdAt: { $gte: new Date(Date.now() - 30 * 24 * 60 * 60 * 1000) },
      status: 'completed',
    }
  },
  {
    $group: {
      _id: {
        year: { $year: '$createdAt' },
        month: { $month: '$createdAt' },
        day: { $dayOfMonth: '$createdAt' },
      },
      revenue: { $sum: '$total' },
      orders: { $sum: 1 },
    }
  },
  { $sort: { '_id.year': 1, '_id.month': 1, '_id.day': 1 } },
  {
    $project: {
      date: {
        $dateToString: {
          format: '%Y-%m-%d',
          date: {
            $dateFromParts: {
              year: '$_id.year', month: '$_id.month', day: '$_id.day'
            }
          }
        }
      },
      revenue: { $round: ['$revenue', 2] },
      orders: 1,
    }
  }
]);

文本搜索

// 创建文本索引
db.articles.createIndex({ title: 'text', body: 'text' });

// 带评分的文本搜索
db.articles.aggregate([
  { $match: { $text: { $search: 'mongodb performance optimization' } } },
  { $addFields: { score: { $meta: 'textScore' } } },
  { $sort: { score: { $meta: 'textScore' } } },
  { $limit: 10 },
]);

在聚合中使用 explain() 来理解查询计划并确保使用了索引。