跳到主要内容

聚合管道

聚合管道是 MongoDB 强大的数据分析和处理框架,允许你对数据进行多阶段转换和处理。与简单的查询不同,聚合管道可以对数据进行复杂的计算、转换和分析,是实现数据分析、报表统计等功能的利器。

聚合管道概述

聚合管道由多个阶段(Stage)组成,每个阶段对输入数据进行处理后传递给下一个阶段。这种设计类似于 Unix 管道的概念,数据流经各个阶段,每个阶段完成特定的处理任务。

为什么需要聚合管道?

普通的 find 查询虽然强大,但有以下限制:

  • 无法分组统计:如计算每个类别的商品数量
  • 无法转换数据:如将日期格式化为字符串
  • 无法关联查询:如关联订单和用户信息
  • 无法复杂计算:如计算移动平均值

聚合管道解决了这些问题,让 MongoDB 具备了完整的数据处理能力。

基本语法

db.collection.aggregate([
{ stage1: { ... } },
{ stage2: { ... } },
...
])

常用聚合阶段

$match - 过滤阶段

$match 用于过滤数据,应该放在管道前面以减少后续处理的数据量:

// 按状态过滤订单
db.orders.aggregate([
{ $match: { status: "completed" } }
])

// 多个条件
db.orders.aggregate([
{ $match: {
status: "completed",
createdAt: { $gte: new Date("2024-01-01") }
}}
])

// 应该在 $match 阶段使用索引
db.orders.aggregate([
{ $match: { status: "pending" } },
{ $group: { _id: "$customerId", count: { $sum: 1 } } }
])

$project - 投影阶段

控制输出字段的格式:

// 选择字段和重命名
db.orders.aggregate([
{ $project: {
orderId: 1,
customerName: "$customer.name",
totalAmount: "$total",
_id: 0
}}
])

// 添加计算字段
db.products.aggregate([
{ $project: {
name: 1,
price: 1,
tax: { $multiply: ["$price", 0.1] },
total: { $add: ["$price", { $multiply: ["$price", 0.1] }] }
}}
])

// 条件字段
db.orders.aggregate([
{ $project: {
orderId: 1,
status: 1,
label: {
$switch: {
branches: [
{ case: { $eq: ["$status", "completed"] }, then: "已完成" },
{ case: { $eq: ["$status", "pending"] }, then: "待处理" },
{ case: { $eq: ["$status", "cancelled"] }, then: "已取消" }
],
default: "未知"
}
}
}}
])

$group - 分组阶段

对文档进行分组计算:

// 按字段分组统计
db.orders.aggregate([
{ $group: {
_id: "$customerId",
totalOrders: { $sum: 1 },
totalAmount: { $sum: "$total" },
avgAmount: { $avg: "$total" },
minAmount: { $min: "$total" },
maxAmount: { $max: "$total" }
}}
])

// 多字段分组
db.orders.aggregate([
{ $group: {
_id: { customer: "$customerId", status: "$status" },
count: { $sum: 1 }
}}
])

// 按日期分组(年-月)
db.orders.aggregate([
{ $group: {
_id: { $dateToString: { format: "%Y-%m", date: "$createdAt" } },
totalAmount: { $sum: "$total" },
count: { $sum: 1 }
}},
{ $sort: { _id: 1 } }
])

$sort - 排序阶段

// 按金额降序排序
db.orders.aggregate([
{ $sort: { total: -1 } }
])

// 多字段排序
db.orders.aggregate([
{ $sort: { status: 1, createdAt: -1 } }
])

limitlimit 和 skip - 分页阶段

// 取前 10 个文档
db.orders.aggregate([
{ $limit: 10 }
])

// 分页:跳过前 20 条,取 10 条
db.orders.aggregate([
{ $skip: 20 },
{ $limit: 10 }
])

$unwind - 展开阶段

将数组字段展开为多个文档:

// 原始文档
// { _id: 1, items: ["a", "b", "c"] }

// 使用 $unwind
db.orders.aggregate([
{ $unwind: "$items" }
])

// 结果
// { _id: 1, items: "a" }
// { _id: 1, items: "b" }
// { _id: 1, items: "c" }

// preserveNullAndEmptyArrays: 保留空数组或缺失数组的文档
db.orders.aggregate([
{ $unwind: { path: "$items", preserveNullAndEmptyArrays: true } }
])

$lookup - 连接阶段

实现类似 SQL 的 JOIN 功能:

// 基本语法
{
$lookup: {
from: "products", // 连接的集合
localField: "productId", // 本地字段
foreignField: "_id", // 外部集合字段
as: "productInfo" // 输出字段名
}
}

// 示例:连接订单和产品
db.orders.aggregate([
{ $lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "productDetails"
}}
])

$addFields - 添加字段阶段

添加新字段而不移除现有字段:

db.orders.aggregate([
{ $addFields: {
shippingFee: 10,
totalWithShipping: { $add: ["$total", 10] },
year: { $year: "$createdAt" }
}}
])

$count - 计数阶段

db.orders.aggregate([
{ $match: { status: "completed" } },
{ $count: "completedOrders" }
])
// 输出: { "completedOrders": 150 }

聚合表达式

算术表达式

// $add, $subtract, $multiply, $divide, $mod
db.products.aggregate([
{ $project: {
name: 1,
price: 1,
discountedPrice: { $multiply: ["$price", 0.9] }
}}
])

字符串表达式

// $toUpper, $toLower, $concat, $substr, $trim
db.users.aggregate([
{ $project: {
username: { $toUpper: "$username" },
fullName: { $concat: ["$firstName", " ", "$lastName"] },
domain: { $substr: ["$email", { $indexOfBytes: ["$email", "@"] }, -1] }
}}
])

日期表达式

// $year, $month, $dayOfMonth, $hour, $minute, $second
db.orders.aggregate([
{ $project: {
orderId: 1,
createdAt: 1,
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" }
}}
])

// $dateToString 格式化日期
db.orders.aggregate([
{ $project: {
dateStr: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } }
}}
])

条件表达式

// $cond (三元运算符)
db.products.aggregate([
{ $project: {
name: 1,
price: 1,
label: { $cond: { if: { $gte: ["$price", 1000] }, then: "高价", else: "普通" } }
}}
])

// $switch
db.orders.aggregate([
{ $project: {
status: 1,
statusText: {
$switch: {
branches: [
{ case: { $eq: ["$status", 1] }, then: "待支付" },
{ case: { $eq: ["$status", 2] }, then: "已支付" },
{ case: { $eq: ["$status", 3] }, then: "已完成" }
],
default: "未知"
}
}
}}
])

// $ifNull
db.users.aggregate([
{ $project: {
displayName: { $ifNull: ["$nickname", "$username", "匿名用户"] }
}}
])

数组表达式

// $arrayElemAt, $size, $slice, $concatArrays, $filter
db.products.aggregate([
{ $project: {
name: 1,
firstImage: { $arrayElemAt: ["$images", 0] },
imageCount: { $size: "$images" },
preview: { $slice: ["$images", 0, 3] }
}}
])

// $filter - 过滤数组
db.orders.aggregate([
{ $project: {
orderId: 1,
expensiveItems: {
$filter: {
input: "$items",
as: "item",
cond: { $gte: ["$$item.price", 100] }
}
}
}}
])

实战示例

1. 销售报表

db.orders.aggregate([
// 过滤已完成订单
{ $match: { status: "completed" } },

// 解构订单项
{ $unwind: "$items" },

// 按日期分组统计
{ $group: {
_id: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } },
totalRevenue: { $sum: { $multiply: ["$items.price", "$items.quantity"] } },
orderCount: { $sum: 1 }
}},

// 排序
{ $sort: { _id: 1 } }
])

2. 用户消费分析

db.orders.aggregate([
// 按用户分组
{ $group: {
_id: "$customerId",
totalSpent: { $sum: "$total" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$total" },
lastOrderDate: { $max: "$createdAt" }
}},

// 添加用户等级
{ $addFields: {
level: {
$switch: {
branches: [
{ case: { $gte: ["$totalSpent", 10000] }, then: "VIP" },
{ case: { $gte: ["$totalSpent", 5000] }, then: "高级" },
{ case: { $gte: ["$totalSpent", 1000] }, then: "普通" }
],
default: "新人"
}
}
}},

// 按消费金额排序
{ $sort: { totalSpent: -1 } },

// 限制前 10 名
{ $limit: 10 }
])

3. 产品类别分析

db.products.aggregate([
// 过滤在售产品
{ $match: { status: "active" } },

// 按类别分组
{ $group: {
_id: "$category",
productCount: { $sum: 1 },
avgPrice: { $avg: "$price" },
minPrice: { $min: "$price" },
maxPrice: { $max: "$price" },
totalStock: { $sum: "$stock" }
}},

// 格式化输出
{ $project: {
_id: 0,
category: "$_id",
productCount: 1,
avgPrice: { $round: ["$avgPrice", 2] },
minPrice: 1,
maxPrice: 1,
totalStock: 1
}},

// 排序
{ $sort: { productCount: -1 } }
])

聚合性能优化

  1. 将 $match 放在前面:减少后续阶段处理的数据量
  2. 限制字段:使用 $project 只输出需要的字段
  3. 使用索引:确保 $match 阶段可以使用索引
  4. 避免 $unwind 大数组:大量数据会导致性能问题
  5. 合理使用 $group:避免产生过大的中间结果
// 不好的写法
db.orders.aggregate([
{ $project: { all: 1 } }, // 输出所有字段
{ $match: { status: "completed" } }, // 过滤放后面
{ $unwind: "$items" }, // 展开大数组
{ $sort: { total: -1 } } // 排序放最后
])

// 好的写法
db.orders.aggregate([
{ $match: { status: "completed" } }, // 过滤放前面
{ $project: { orderId: 1, total: 1, items: 1 } }, // 只输出需要的字段
{ $unwind: "$items" },
{ $sort: { total: -1 } }
])

小结

本章我们学习了:

  1. 聚合管道基础:阶段概念和语法
  2. 常用阶段match,match, project, group,group, sort, limit,limit, unwind, $lookup
  3. 聚合表达式:算术、字符串、日期、条件、数组表达式
  4. 实战示例:销售报表、用户分析、产品分析
  5. 性能优化:管道顺序、字段限制、索引使用