聚合
聚合操作处理多个文档并返回计算结果。您可以使用聚合操作来:
-
将多个文档中的值分组在一起。
-
对分组数据执行操作以返回单个结果。
-
分析数据随时间的变化。
要执行聚合操作,您可以使用:
-
聚合管道
-
单一目的聚合方法
-
Map-reduce 函数
聚合管道
聚合管道由一个或多个处理文档的阶段组成:
-
除$out、$merge、$geoNear和$changeStream阶段之外的所有阶段都可以在管道中出现多次。
-
每个阶段都对输入文档执行操作。例如,阶段可以过滤文档、对文档进行分组以及计算值。
-
从一个阶段输出的文档将输入到下一阶段。
-
聚合管道可以返回文档组的结果。例如,返回总计、平均值、最大值和最小值。
聚合阶段
在db.collection.aggregate()方法和db.aggregate()方法中,管道阶段出现在数组中。文档按顺序经过这些阶段。
db.collection.aggregate( [ { <stage> }, ... ] )
常见几种阶段:
阶段 | 描述 |
---|---|
$group | 按指定的标识符表达式对输入文档进行分组,并在指定的情况下对每个组应用累加器表达式。消耗所有输入文档,并为每个不同的组输出一个文档。输出文档只包含标识符字段,如果指定的话,还包含累积字段。 |
$limit | 将未修改的前n 个文档传递到管道,其中n是指定的限制。对于每个输入文档,输出一个文档(前n 个文档)或零个文档(前n 个文档之后)。 |
$match | 过滤文档流以仅允许匹配的文档未经修改地传递到下一个管道阶段。 $match使用标准 MongoDB 查询。对于每个输入文档,输出一个文档(匹配)或零个文档(不匹配)。 |
$merge | 将聚合管道的结果文档写入集合。该阶段可以将结果合并(插入新文档、合并文档、替换文档、保留现有文档、操作失败、使用自定义更新管道处理文档)结果到输出集合中。要使用该$merge阶段,它必须是管道中的最后一个阶段。 |
$out | 将聚合管道的结果文档写入集合。要使用该$out阶段,它必须是管道中的最后一个阶段。 |
$project | 重塑流中的每个文档,例如通过添加新字段或删除现有字段。对于每个输入文档,输出一个文档。 |
$sort | 按指定的排序键对文档流重新排序。仅顺序发生变化;文件保持不变。对于每个输入文档,输出一个文档。 |
$unwind | 从输入文档解构数组字段以输出每个元素的文档。每个输出文档都用一个元素值替换数组。对于每个输入文档,输出n个文档,其中n是数组元素的数量,对于空数组可以为零。 |
测试数据如下:
sit_rs1:PRIMARY> db.orders.find()
{ "_id" : 4, "cust_id" : "B", "ord_date" : ISODate("2023-06-18T00:00:00Z"), "price" : 26, "items" : [ { "sku" : "apple", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 6, "cust_id" : "C", "ord_date" : ISODate("2023-06-19T00:00:00Z"), "price" : 38, "items" : [ { "sku" : "carrots", "qty" : 10, "price" : 1 }, { "sku" : "apples", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 1, "cust_id" : "A", "ord_date" : ISODate("2023-06-01T00:00:00Z"), "price" : 15, "items" : [ { "sku" : "apple", "qty" : 5, "price" : 2.5 }, { "sku" : "apples", "qty" : 5, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 2, "cust_id" : "A", "ord_date" : ISODate("2023-06-08T00:00:00Z"), "price" : 60, "items" : [ { "sku" : "apple", "qty" : 8, "price" : 2.5 }, { "sku" : "banana", "qty" : 5, "price" : 10 } ], "status" : "1" }
{ "_id" : 9, "cust_id" : "D", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 51, "items" : [ { "sku" : "carrots", "qty" : 5, "price" : 1 }, { "sku" : "apples", "qty" : 10, "price" : 2.5 }, { "sku" : "apple", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 3, "cust_id" : "B", "ord_date" : ISODate("2023-06-08T00:00:00Z"), "price" : 55, "items" : [ { "sku" : "apple", "qty" : 10, "price" : 2.5 }, { "sku" : "pears", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 5, "cust_id" : "B", "ord_date" : ISODate("2023-06-19T00:00:00Z"), "price" : 40, "items" : [ { "sku" : "banana", "qty" : 5, "price" : 10 } ], "status" : "1" }
{ "_id" : 7, "cust_id" : "C", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 21, "items" : [ { "sku" : "apple", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 8, "cust_id" : "D", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 76, "items" : [ { "sku" : "banana", "qty" : 5, "price" : 10 }, { "sku" : "apples", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 10, "cust_id" : "D", "ord_date" : ISODate("2023-06-23T00:00:00Z"), "price" : 23, "items" : [ { "sku" : "apple", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
示例一:
查询客户: “D” 的订单,按日期分组,返回每天的订单总价
$match 阶段:
- 匹配 “cust_id” : “D” 的记录。
- 将过滤后的文档输出到 group 阶段。
$group 阶段:
- 输入的文档按 ord_date 进行分组。
- 使用 $sum 运算符计算价格总和,并将结果存放聚合管道 sumPrice 字段中。
sit_rs1:PRIMARY> db.orders.aggregate( [
... { $match: { "cust_id" : "D" } },
... { $group: { _id: "$ord_date", sumPrice: { $sum: "$price" } } }
... ] )
{ "_id" : ISODate("2023-06-20T00:00:00Z"), "sumPrice" : 127 }
{ "_id" : ISODate("2023-06-23T00:00:00Z"), "sumPrice" : 23 }
示例二
按照客户进行分组,统计每个客户的订单总价,并总价降序排序。取前三名的客户,如下
$group 阶段:
- 输入的文档按 cust_id进行分组。
- 使用 $sum 运算符计算价格总和,并将结果存放聚合管道 sumPrice 字段中。
$sort 阶段:
- 按 sumPrice 字段以相反的顺序对这些文档进行排序。
$limit 阶段:
- 运算 $limit 仅包含前 3 个结果文档。
sit_rs1:PRIMARY> db.orders.aggregate(
... [
... { $group : { _id : "$cust_id" , sumPrice : { $sum : "$price" } } },
... { $sort : { sumPrice : -1 } },
... { $limit : 3 }
... ]
... )
{ "_id" : "D", "sumPrice" : 150 }
{ "_id" : "B", "sumPrice" : 121 }
{ "_id" : "A", "sumPrice" : 75 }
示例三
按照客户进行分组,统计每个客户的订单总价,并输出到结果集合:agg_cust_id_1 ,如下
$group 阶段:
- 输入的文档按 cust_id进行分组。
- 使用 $sum 运算符计算价格总和,并将结果存放聚合管道 value字段中。
$out 阶段
- 将输出写入集合 agg_cust_id_1
sit_rs1:PRIMARY> db.orders.aggregate([
... { $group: { _id: "$cust_id", value: { $sum: "$price" } } },
... { $out: "agg_cust_id_1" }
... ])
# 查询 agg_cust_id_1 集合以验证结果:
sit_rs1:PRIMARY> db.agg_cust_id_1.find()
{ "_id" : "A", "value" : 75 }
{ "_id" : "D", "value" : 150 }
{ "_id" : "B", "value" : 121 }
{ "_id" : "C", "value" : 59 }
sit_rs1:PRIMARY> db.orders.find({}, { "cust_id": 1, "price": 1})
{ "_id" : 4, "cust_id" : "B", "price" : 26 }
{ "_id" : 6, "cust_id" : "C", "price" : 38 }
{ "_id" : 1, "cust_id" : "A", "price" : 15 }
{ "_id" : 2, "cust_id" : "A", "price" : 60 }
{ "_id" : 9, "cust_id" : "D", "price" : 51 }
{ "_id" : 3, "cust_id" : "B", "price" : 55 }
{ "_id" : 5, "cust_id" : "B", "price" : 40 }
{ "_id" : 7, "cust_id" : "C", "price" : 21 }
{ "_id" : 8, "cust_id" : "D", "price" : 76 }
{ "_id" : 10, "cust_id" : "D", "price" : 23 }
示例四
计算某个SKU单品总共下了多少单,销售总数量,及平均每单数量是多少,如下:
首先查询日期大于等于 2023-03-01 的订单, 按数组 items 字段 分解文档(即如果数组包含N个元素,将分解为N个文档)。 再对 items.sku进行分组统计,并计算 qty 的数量总和。 orders_ids 把相同组的订单ID号添加到数组。
sit_rs1:PRIMARY> db.orders.aggregate( [
... { $match: { ord_date: { $gte: new Date("2023-03-01") } } },
... { $unwind: "$items" },
... { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
... { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
... { $merge: { into: "agg_sku", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
... ] )
# 查询 agg_sku 集合以验证结果:
sit_rs1:PRIMARY> db.agg_sku.find()
{ "_id" : "apple", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "banana", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
接下来,各阶段执行过程详细如下:
$match阶段
- 该阶段仅选择那些大于或等于2023-03-01 的文档,输出如下:
sit_rs1:PRIMARY> db.orders.aggregate( [
... { $match: { ord_date: { $gte: new Date("2023-03-01") } } }
... ] )
{ "_id" : 2, "cust_id" : "A", "ord_date" : ISODate("2023-06-08T00:00:00Z"), "price" : 60, "items" : [ { "sku" : "apple", "qty" : 8, "price" : 2.5 }, { "sku" : "banana", "qty" : 5, "price" : 10 } ], "status" : "1" }
{ "_id" : 3, "cust_id" : "B", "ord_date" : ISODate("2023-06-08T00:00:00Z"), "price" : 55, "items" : [ { "sku" : "apple", "qty" : 10, "price" : 2.5 }, { "sku" : "pears", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 7, "cust_id" : "C", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 21, "items" : [ { "sku" : "apple", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 9, "cust_id" : "D", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 51, "items" : [ { "sku" : "carrots", "qty" : 5, "price" : 1 }, { "sku" : "apples", "qty" : 10, "price" : 2.5 }, { "sku" : "apple", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 1, "cust_id" : "A", "ord_date" : ISODate("2023-06-01T00:00:00Z"), "price" : 15, "items" : [ { "sku" : "apple", "qty" : 5, "price" : 2.5 }, { "sku" : "apples", "qty" : 5, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 4, "cust_id" : "B", "ord_date" : ISODate("2023-06-18T00:00:00Z"), "price" : 26, "items" : [ { "sku" : "apple", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 10, "cust_id" : "D", "ord_date" : ISODate("2023-06-23T00:00:00Z"), "price" : 23, "items" : [ { "sku" : "apple", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 6, "cust_id" : "C", "ord_date" : ISODate("2023-06-19T00:00:00Z"), "price" : 38, "items" : [ { "sku" : "carrots", "qty" : 10, "price" : 1 }, { "sku" : "apples", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
{ "_id" : 5, "cust_id" : "B", "ord_date" : ISODate("2023-06-19T00:00:00Z"), "price" : 40, "items" : [ { "sku" : "banana", "qty" : 5, "price" : 10 } ], "status" : "1" }
{ "_id" : 8, "cust_id" : "D", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 76, "items" : [ { "sku" : "banana", "qty" : 5, "price" : 10 }, { "sku" : "apples", "qty" : 10, "price" : 2.5 } ], "status" : "1" }
$unwind阶段:
- 该阶段按 items 数组字段分解文档,items为每个数组元素输出一个文档。例如:
sit_rs1:PRIMARY> db.orders.aggregate( [
... { $match: { ord_date: { $gte: new Date("2023-03-01") } } },
... { $unwind: "$items" }
... ] )
{ "_id" : 2, "cust_id" : "A", "ord_date" : ISODate("2023-06-08T00:00:00Z"), "price" : 60, "items" : { "sku" : "apple", "qty" : 8, "price" : 2.5 }, "status" : "1" }
{ "_id" : 2, "cust_id" : "A", "ord_date" : ISODate("2023-06-08T00:00:00Z"), "price" : 60, "items" : { "sku" : "banana", "qty" : 5, "price" : 10 }, "status" : "1" }
{ "_id" : 3, "cust_id" : "B", "ord_date" : ISODate("2023-06-08T00:00:00Z"), "price" : 55, "items" : { "sku" : "apple", "qty" : 10, "price" : 2.5 }, "status" : "1" }
{ "_id" : 3, "cust_id" : "B", "ord_date" : ISODate("2023-06-08T00:00:00Z"), "price" : 55, "items" : { "sku" : "pears", "qty" : 10, "price" : 2.5 }, "status" : "1" }
{ "_id" : 7, "cust_id" : "C", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 21, "items" : { "sku" : "apple", "qty" : 10, "price" : 2.5 }, "status" : "1" }
{ "_id" : 9, "cust_id" : "D", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 51, "items" : { "sku" : "carrots", "qty" : 5, "price" : 1 }, "status" : "1" }
{ "_id" : 9, "cust_id" : "D", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 51, "items" : { "sku" : "apples", "qty" : 10, "price" : 2.5 }, "status" : "1" }
{ "_id" : 9, "cust_id" : "D", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 51, "items" : { "sku" : "apple", "qty" : 10, "price" : 2.5 }, "status" : "1" }
{ "_id" : 1, "cust_id" : "A", "ord_date" : ISODate("2023-06-01T00:00:00Z"), "price" : 15, "items" : { "sku" : "apple", "qty" : 5, "price" : 2.5 }, "status" : "1" }
{ "_id" : 1, "cust_id" : "A", "ord_date" : ISODate("2023-06-01T00:00:00Z"), "price" : 15, "items" : { "sku" : "apples", "qty" : 5, "price" : 2.5 }, "status" : "1" }
{ "_id" : 4, "cust_id" : "B", "ord_date" : ISODate("2023-06-18T00:00:00Z"), "price" : 26, "items" : { "sku" : "apple", "qty" : 10, "price" : 2.5 }, "status" : "1" }
{ "_id" : 10, "cust_id" : "D", "ord_date" : ISODate("2023-06-23T00:00:00Z"), "price" : 23, "items" : { "sku" : "apple", "qty" : 10, "price" : 2.5 }, "status" : "1" }
{ "_id" : 6, "cust_id" : "C", "ord_date" : ISODate("2023-06-19T00:00:00Z"), "price" : 38, "items" : { "sku" : "carrots", "qty" : 10, "price" : 1 }, "status" : "1" }
{ "_id" : 6, "cust_id" : "C", "ord_date" : ISODate("2023-06-19T00:00:00Z"), "price" : 38, "items" : { "sku" : "apples", "qty" : 10, "price" : 2.5 }, "status" : "1" }
{ "_id" : 5, "cust_id" : "B", "ord_date" : ISODate("2023-06-19T00:00:00Z"), "price" : 40, "items" : { "sku" : "banana", "qty" : 5, "price" : 10 }, "status" : "1" }
{ "_id" : 8, "cust_id" : "D", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 76, "items" : { "sku" : "banana", "qty" : 5, "price" : 10 }, "status" : "1" }
{ "_id" : 8, "cust_id" : "D", "ord_date" : ISODate("2023-06-20T00:00:00Z"), "price" : 76, "items" : { "sku" : "apples", "qty" : 10, "price" : 2.5 }, "status" : "1" }
$group阶段
- 该阶段按 items.sku 进行分组,计算每个 sku 的总数量。
- order _ids 数组包括不相同的 items.sku 元素。
- $addToSet : 返回所有唯一值的数组,这些值来自每个文档(使用相同字段group by 的一组文档中)。输出数组中元素的顺序未指定。
sit_rs1:PRIMARY> db.orders.aggregate( [
... { $match: { ord_date: { $gte: new Date("2023-03-01") } } },
... { $unwind: "$items" },
... { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } }
... ] )
{ "_id" : "carrots", "qty" : 15, "orders_ids" : [ 6, 9 ] }
{ "_id" : "apples", "qty" : 35, "orders_ids" : [ 1, 6, 9, 8 ] }
{ "_id" : "banana", "qty" : 15, "orders_ids" : [ 8, 5, 2 ] }
{ "_id" : "apple", "qty" : 63, "orders_ids" : [ 10, 1, 7, 4, 9, 2, 3 ] }
{ "_id" : "pears", "qty" : 10, "orders_ids" : [ 3 ] }
$project阶段
- 该阶段重塑输出文档以镜像映射缩减的输出,使其具有两个字段_id和 value。
- 运算符 $divide 计算 订单平均数量。 即将一个数字除以另一个数字并返回结果。将参数传递给在一个数组中。
- 使用 $size 来确定数组orders_ids的大小。
sit_rs1:PRIMARY> db.orders.aggregate( [
... { $match: { ord_date: { $gte: new Date("2023-03-01") } } },
... { $unwind: "$items" },
... { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
... { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } }
... ] )
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
{ "_id" : "banana", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "apple", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
$merge阶段
最后,$merge将输出写入集合 agg_sku 。如果现有文档_id与新结果具有相同的键,则该操作将覆盖现有文档。如果不存在具有相同键的现有文档,则该操作将插入该文档。
sit_rs1:PRIMARY> db.orders.aggregate( [
... { $match: { ord_date: { $gte: new Date("2023-03-01") } } },
... { $unwind: "$items" },
... { $group: { _id: "$items.sku", qty: { $sum: "$items.qty" }, orders_ids: { $addToSet: "$_id" } } },
... { $project: { value: { count: { $size: "$orders_ids" }, qty: "$qty", avg: { $divide: [ "$qty", { $size: "$orders_ids" } ] } } } },
... { $merge: { into: "agg_sku", on: "_id", whenMatched: "replace", whenNotMatched: "insert" } }
... ] )
# 该操作返回以下文档:
sit_rs1:PRIMARY> db.agg_sku.find().sort( { _id: 1 } )
{ "_id" : "apple", "value" : { "count" : 7, "qty" : 63, "avg" : 9 } }
{ "_id" : "apples", "value" : { "count" : 4, "qty" : 35, "avg" : 8.75 } }
{ "_id" : "banana", "value" : { "count" : 3, "qty" : 15, "avg" : 5 } }
{ "_id" : "carrots", "value" : { "count" : 2, "qty" : 15, "avg" : 7.5 } }
{ "_id" : "pears", "value" : { "count" : 1, "qty" : 10, "avg" : 10 } }
聚合管道的限制
聚合管道对值类型和结果大小有一些限制:
-
该aggregate命令可以返回游标或将结果存储在集合中。结果集中的每个文档均受 16 MB BSON 文档大小限制。如果任何单个文档超过BSON 文档大小限制,聚合就会产生错误。该限制仅适用于返回的文件。在管道处理过程中,文档可能会超过此大小。该 db.collection.aggregate()方法默认返回一个游标。
-
每个单独的管道阶段的 RAM 限制为 100 MB。默认情况下,如果某个阶段超过此限制,MongoDB 会生成错误。$search
聚合阶段不限于 100 MB RAM,因为它在单独的进程中运行文章来源:https://www.toymoban.com/news/detail-614696.html -
如果管道阶段之一的结果$sort超出限制,请考虑添加 $limit stage 。文章来源地址https://www.toymoban.com/news/detail-614696.html
到了这里,关于Mongodb 多文档聚合操作处理方法三(聚合管道)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!