聚合管道
聚合管道是 MongoDB 强大的数据分析和处理框架,允许你对数据进行多阶段转换和处理。与简单的查询不同,聚合管道可以对数据进行复杂的计算、转换和分析,是实现数据分析、报表统计等功能的利器。
聚合管道概述
聚合管道由多个阶段(Stage)组成,每个阶段对输入数据进行处理后传递给下一个阶段。这种设计类似于 Unix 管道的概念,数据流经各个阶段,每个阶段完成特定的处理任务。
为什么需要聚合管道?
普通的 find 查询虽然强大,但有以下限制:
- 无法分组统计:如计算每个类别的商品数量
- 无法转换数据:如将日期格式化为字符串
- 无法关联查询:如关联订单和用户信息
- 无法复杂计算:如计算移动平均值
聚合管道解决了这些问题,让 MongoDB 具备了完整的数据处理能力。
基本语法
db.collection.aggregate([
{ stage1: { ... } },
{ stage2: { ... } },
...
])
常用聚合阶段
$match - 过滤阶段
$match 用于过滤数据,应该放在管道前面以减少后续处理的数据量:
// 按状态过滤订单
db.orders.aggregate([
{ $match: { status: "completed" } }
])
// 多个条件
db.orders.aggregate([
{ $match: {
status: "completed",
createdAt: { $gte: new Date("2024-01-01") }
}}
])
// 应该在 $match 阶段使用索引
db.orders.aggregate([
{ $match: { status: "pending" } },
{ $group: { _id: "$customerId", count: { $sum: 1 } } }
])
$project - 投影阶段
控制输出字段的格式:
// 选择字段和重命名
db.orders.aggregate([
{ $project: {
orderId: 1,
customerName: "$customer.name",
totalAmount: "$total",
_id: 0
}}
])
// 添加计算字段
db.products.aggregate([
{ $project: {
name: 1,
price: 1,
tax: { $multiply: ["$price", 0.1] },
total: { $add: ["$price", { $multiply: ["$price", 0.1] }] }
}}
])
// 条件字段
db.orders.aggregate([
{ $project: {
orderId: 1,
status: 1,
label: {
$switch: {
branches: [
{ case: { $eq: ["$status", "completed"] }, then: "已完成" },
{ case: { $eq: ["$status", "pending"] }, then: "待处理" },
{ case: { $eq: ["$status", "cancelled"] }, then: "已取消" }
],
default: "未知"
}
}
}}
])
$group - 分组阶段
对文档进行分组计算:
// 按字段分组统计
db.orders.aggregate([
{ $group: {
_id: "$customerId",
totalOrders: { $sum: 1 },
totalAmount: { $sum: "$total" },
avgAmount: { $avg: "$total" },
minAmount: { $min: "$total" },
maxAmount: { $max: "$total" }
}}
])
// 多字段分组
db.orders.aggregate([
{ $group: {
_id: { customer: "$customerId", status: "$status" },
count: { $sum: 1 }
}}
])
// 按日期分组(年-月)
db.orders.aggregate([
{ $group: {
_id: { $dateToString: { format: "%Y-%m", date: "$createdAt" } },
totalAmount: { $sum: "$total" },
count: { $sum: 1 }
}},
{ $sort: { _id: 1 } }
])
$sort - 排序阶段
// 按金额降序排序
db.orders.aggregate([
{ $sort: { total: -1 } }
])
// 多字段排序
db.orders.aggregate([
{ $sort: { status: 1, createdAt: -1 } }
])
skip - 分页阶段
// 取前 10 个文档
db.orders.aggregate([
{ $limit: 10 }
])
// 分页:跳过前 20 条,取 10 条
db.orders.aggregate([
{ $skip: 20 },
{ $limit: 10 }
])
$unwind - 展开阶段
将数组字段展开为多个文档:
// 原始文档
// { _id: 1, items: ["a", "b", "c"] }
// 使用 $unwind
db.orders.aggregate([
{ $unwind: "$items" }
])
// 结果
// { _id: 1, items: "a" }
// { _id: 1, items: "b" }
// { _id: 1, items: "c" }
// preserveNullAndEmptyArrays: 保留空数组或缺失数组的文档
db.orders.aggregate([
{ $unwind: { path: "$items", preserveNullAndEmptyArrays: true } }
])
$lookup - 连接阶段
实现类似 SQL 的 JOIN 功能:
// 基本语法
{
$lookup: {
from: "products", // 连接的集合
localField: "productId", // 本地字段
foreignField: "_id", // 外部集合字段
as: "productInfo" // 输出字段名
}
}
// 示例:连接订单和产品
db.orders.aggregate([
{ $lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "productDetails"
}}
])
$addFields - 添加字段阶段
添加新字段而不移除现有字段:
db.orders.aggregate([
{ $addFields: {
shippingFee: 10,
totalWithShipping: { $add: ["$total", 10] },
year: { $year: "$createdAt" }
}}
])
$count - 计数阶段
db.orders.aggregate([
{ $match: { status: "completed" } },
{ $count: "completedOrders" }
])
// 输出: { "completedOrders": 150 }
聚合表达式
算术表达式
// $add, $subtract, $multiply, $divide, $mod
db.products.aggregate([
{ $project: {
name: 1,
price: 1,
discountedPrice: { $multiply: ["$price", 0.9] }
}}
])
字符串表达式
// $toUpper, $toLower, $concat, $substr, $trim
db.users.aggregate([
{ $project: {
username: { $toUpper: "$username" },
fullName: { $concat: ["$firstName", " ", "$lastName"] },
domain: { $substr: ["$email", { $indexOfBytes: ["$email", "@"] }, -1] }
}}
])
日期表达式
// $year, $month, $dayOfMonth, $hour, $minute, $second
db.orders.aggregate([
{ $project: {
orderId: 1,
createdAt: 1,
year: { $year: "$createdAt" },
month: { $month: "$createdAt" },
day: { $dayOfMonth: "$createdAt" }
}}
])
// $dateToString 格式化日期
db.orders.aggregate([
{ $project: {
dateStr: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } }
}}
])
条件表达式
// $cond (三元运算符)
db.products.aggregate([
{ $project: {
name: 1,
price: 1,
label: { $cond: { if: { $gte: ["$price", 1000] }, then: "高价", else: "普通" } }
}}
])
// $switch
db.orders.aggregate([
{ $project: {
status: 1,
statusText: {
$switch: {
branches: [
{ case: { $eq: ["$status", 1] }, then: "待支付" },
{ case: { $eq: ["$status", 2] }, then: "已支付" },
{ case: { $eq: ["$status", 3] }, then: "已完成" }
],
default: "未知"
}
}
}}
])
// $ifNull
db.users.aggregate([
{ $project: {
displayName: { $ifNull: ["$nickname", "$username", "匿名用户"] }
}}
])
数组表达式
// $arrayElemAt, $size, $slice, $concatArrays, $filter
db.products.aggregate([
{ $project: {
name: 1,
firstImage: { $arrayElemAt: ["$images", 0] },
imageCount: { $size: "$images" },
preview: { $slice: ["$images", 0, 3] }
}}
])
// $filter - 过滤数组
db.orders.aggregate([
{ $project: {
orderId: 1,
expensiveItems: {
$filter: {
input: "$items",
as: "item",
cond: { $gte: ["$$item.price", 100] }
}
}
}}
])
实战示例
1. 销售报表
db.orders.aggregate([
// 过滤已完成订单
{ $match: { status: "completed" } },
// 解构订单项
{ $unwind: "$items" },
// 按日期分组统计
{ $group: {
_id: { $dateToString: { format: "%Y-%m-%d", date: "$createdAt" } },
totalRevenue: { $sum: { $multiply: ["$items.price", "$items.quantity"] } },
orderCount: { $sum: 1 }
}},
// 排序
{ $sort: { _id: 1 } }
])
2. 用户消费分析
db.orders.aggregate([
// 按用户分组
{ $group: {
_id: "$customerId",
totalSpent: { $sum: "$total" },
orderCount: { $sum: 1 },
avgOrderValue: { $avg: "$total" },
lastOrderDate: { $max: "$createdAt" }
}},
// 添加用户等级
{ $addFields: {
level: {
$switch: {
branches: [
{ case: { $gte: ["$totalSpent", 10000] }, then: "VIP" },
{ case: { $gte: ["$totalSpent", 5000] }, then: "高级" },
{ case: { $gte: ["$totalSpent", 1000] }, then: "普通" }
],
default: "新人"
}
}
}},
// 按消费金额排序
{ $sort: { totalSpent: -1 } },
// 限制前 10 名
{ $limit: 10 }
])
3. 产品类别分析
db.products.aggregate([
// 过滤在售产品
{ $match: { status: "active" } },
// 按类别分组
{ $group: {
_id: "$category",
productCount: { $sum: 1 },
avgPrice: { $avg: "$price" },
minPrice: { $min: "$price" },
maxPrice: { $max: "$price" },
totalStock: { $sum: "$stock" }
}},
// 格式化输出
{ $project: {
_id: 0,
category: "$_id",
productCount: 1,
avgPrice: { $round: ["$avgPrice", 2] },
minPrice: 1,
maxPrice: 1,
totalStock: 1
}},
// 排序
{ $sort: { productCount: -1 } }
])
聚合性能优化
- 将 $match 放在前面:减少后续阶段处理的数据量
- 限制字段:使用 $project 只输出需要的字段
- 使用索引:确保 $match 阶段可以使用索引
- 避免 $unwind 大数组:大量数据会导致性能问题
- 合理使用 $group:避免产生过大的中间结果
// 不好的写法
db.orders.aggregate([
{ $project: { all: 1 } }, // 输出所有字段
{ $match: { status: "completed" } }, // 过滤放后面
{ $unwind: "$items" }, // 展开大数组
{ $sort: { total: -1 } } // 排序放最后
])
// 好的写法
db.orders.aggregate([
{ $match: { status: "completed" } }, // 过滤放前面
{ $project: { orderId: 1, total: 1, items: 1 } }, // 只输出需要的字段
{ $unwind: "$items" },
{ $sort: { total: -1 } }
])
小结
本章我们学习了:
- 聚合管道基础:阶段概念和语法
- 常用阶段:project, sort, unwind, $lookup
- 聚合表达式:算术、字符串、日期、条件、数组表达式
- 实战示例:销售报表、用户分析、产品分析
- 性能优化:管道顺序、字段限制、索引使用