Skip to content

Document Store: Aggregation Framework

Document Store: Aggregation Framework

Part of: HeliosDB Document Store User Guide

Aggregation Framework

The aggregation framework processes documents through a pipeline of stages, transforming data step by step.

Pipeline Concept

// Input documents → Stage 1 → Stage 2 → Stage 3 → Output results
//
// Example: Top 5 customers by total spend
// Documents → $match (filter) → $group (aggregate) → $sort → $limit → Results

$match (Filtering)

Filter Documents Early:

let pipeline = vec![
doc! {
"$match": {
"status": "completed",
"orderDate": { "$gte": "2025-01-01" }
}
}
];
let results = collection.aggregate(pipeline, None).await?;

HeliosDB Native:

use heliosdb_document::{AggregationStage, Filter};
let pipeline = vec![
AggregationStage::Match {
filter: Filter::eq("status", json!("completed")),
},
];
let results = store.aggregate(&Collection::new("orders"), pipeline)?;

Best Practice: Place $match as early in the pipeline as possible so that later stages process fewer documents.

$group (Grouping and Aggregation)

Group by Single Field:

let pipeline = vec![
doc! {
"$group": {
"_id": "$category", // Group by category
"count": { "$sum": 1 },
"totalSales": { "$sum": "$amount" },
"avgSales": { "$avg": "$amount" }
}
}
];

Group by Multiple Fields:

let pipeline = vec![
doc! {
"$group": {
"_id": {
"category": "$category",
"region": "$region"
},
"total": { "$sum": "$amount" }
}
}
];

HeliosDB Native:

use std::collections::HashMap;
use heliosdb_document::AggregationOp;
let mut fields = HashMap::new();
fields.insert("count".to_string(), AggregationOp::Count);
fields.insert("total".to_string(), AggregationOp::Sum {
field: "amount".to_string(),
});
fields.insert("avg".to_string(), AggregationOp::Avg {
field: "amount".to_string(),
});
let pipeline = vec![
AggregationStage::Group {
id: Some(json!("$category")),
fields,
},
];

Aggregation Operators:

| Operator | Description | Example |
|----------|-------------|---------|
| $sum | Sum values | { "$sum": "$amount" } |
| $avg | Average | { "$avg": "$age" } |
| $min | Minimum | { "$min": "$price" } |
| $max | Maximum | { "$max": "$price" } |
| $first | First value | { "$first": "$name" } |
| $last | Last value | { "$last": "$name" } |
| $push | Array of values | { "$push": "$item" } |
| $addToSet | Unique array | { "$addToSet": "$tag" } |

$project (Field Selection and Transformation)

Include/Exclude Fields:

let pipeline = vec![
doc! {
"$project": {
"name": 1, // Include
"email": 1, // Include
"age": 1, // Include
"_id": 0 // Exclude
}
}
];

Computed Fields:

let pipeline = vec![
doc! {
"$project": {
"name": 1,
"fullName": {
"$concat": ["$firstName", " ", "$lastName"]
},
"ageGroup": {
"$cond": {
"if": { "$gte": ["$age", 65] },
"then": "senior",
"else": {
"$cond": {
"if": { "$gte": ["$age", 18] },
"then": "adult",
"else": "minor"
}
}
}
}
}
}
];

HeliosDB Native:

use heliosdb_document::ProjectionValue;
let mut fields = HashMap::new();
fields.insert("name".to_string(), ProjectionValue::Include(true));
fields.insert("email".to_string(), ProjectionValue::Include(true));
fields.insert("_id".to_string(), ProjectionValue::Include(false));
let pipeline = vec![
AggregationStage::Project { fields },
];

$sort (Ordering)

let pipeline = vec![
doc! {
"$sort": {
"totalSales": -1, // Descending (highest first)
"customerName": 1 // Then ascending by name
}
}
];

HeliosDB Native:

use heliosdb_document::SortOrder;
let pipeline = vec![
AggregationStage::Sort {
fields: vec![
("totalSales".to_string(), SortOrder::Desc),
("customerName".to_string(), SortOrder::Asc),
],
},
];

$limit and $skip (Pagination)

// Get second page (items 11-20)
let pipeline = vec![
doc! { "$skip": 10 },
doc! { "$limit": 10 },
];

HeliosDB Native:

let pipeline = vec![
AggregationStage::Skip { count: 10 },
AggregationStage::Limit { count: 10 },
];

$unwind (Array Flattening)

Explode Array into Multiple Documents:

// Input: { "name": "Alice", "skills": ["Rust", "MongoDB", "Docker"] }
// Output (3 docs):
// { "name": "Alice", "skills": "Rust" }
// { "name": "Alice", "skills": "MongoDB" }
// { "name": "Alice", "skills": "Docker" }
let pipeline = vec![
doc! {
"$unwind": "$skills"
}
];

Preserving Null, Missing, and Empty Arrays:

let pipeline = vec![
doc! {
"$unwind": {
"path": "$skills",
"preserveNullAndEmptyArrays": true // Keep docs with no skills
}
}
];

HeliosDB Native:

let pipeline = vec![
AggregationStage::Unwind {
field: "skills".to_string(),
preserve_null: true,
},
];

$lookup (Joins)

Simple Join:

// Join orders with products
let pipeline = vec![
doc! {
"$lookup": {
"from": "products", // Collection to join
"localField": "productId", // Field in orders
"foreignField": "_id", // Field in products
"as": "product" // Output array field
}
}
];

Complex Join with Pipeline:

let pipeline = vec![
doc! {
"$lookup": {
"from": "products",
"let": { "prodId": "$productId", "qty": "$quantity" },
"pipeline": [
{
"$match": {
"$expr": {
"$and": [
{ "$eq": ["$_id", "$$prodId"] },
{ "$gte": ["$stock", "$$qty"] }
]
}
}
}
],
"as": "product"
}
}
];

HeliosDB Native:

let pipeline = vec![
AggregationStage::Lookup {
from: "products".to_string(),
local_field: "productId".to_string(),
foreign_field: "_id".to_string(),
as_field: "product".to_string(),
},
];

$facet (Multi-Faceted Aggregation)

Multiple Independent Pipelines:

let pipeline = vec![
doc! {
"$facet": {
"categoryCounts": [
{ "$group": { "_id": "$category", "count": { "$sum": 1 } } },
{ "$sort": { "count": -1 } }
],
"priceRanges": [
{
"$bucket": {
"groupBy": "$price",
"boundaries": [0, 50, 100, 200, 500],
"default": "Other",
"output": { "count": { "$sum": 1 } }
}
}
],
"topProducts": [
{ "$sort": { "sales": -1 } },
{ "$limit": 10 }
]
}
}
];

$bucket (Bucketing)

Group by Ranges:

let pipeline = vec![
doc! {
"$bucket": {
"groupBy": "$age",
"boundaries": [0, 18, 30, 50, 65, 120],
"default": "Other",
"output": {
"count": { "$sum": 1 },
"avgIncome": { "$avg": "$income" }
}
}
}
];
// Results:
// { "_id": 0, "count": 5, "avgIncome": 0 } // 0-17 years
// { "_id": 18, "count": 150, "avgIncome": 45000 } // 18-29 years
// { "_id": 30, "count": 200, "avgIncome": 75000 } // 30-49 years
// { "_id": 50, "count": 100, "avgIncome": 90000 } // 50-64 years
// { "_id": 65, "count": 45, "avgIncome": 65000 } // 65-119 years

$count

let pipeline = vec![
doc! { "$match": { "status": "active" } },
doc! { "$count": "totalActive" }
];
// Result: { "totalActive": 1234 }

HeliosDB Native:

let pipeline = vec![
AggregationStage::Count {
field: "total".to_string(),
},
];

Complete Example: Sales Analytics

// Calculate monthly sales by region with top products
let pipeline = vec![
// Filter to current year
doc! {
"$match": {
"orderDate": { "$gte": "2025-01-01" },
"status": "completed"
}
},
// Add month field
doc! {
"$project": {
"region": 1,
"amount": 1,
"product": 1,
"month": { "$month": "$orderDate" }
}
},
// Group by region and month
doc! {
"$group": {
"_id": {
"region": "$region",
"month": "$month"
},
"totalSales": { "$sum": "$amount" },
"avgOrderValue": { "$avg": "$amount" },
"orderCount": { "$sum": 1 },
"topProducts": { "$push": "$product" }
}
},
// Sort by sales
doc! {
"$sort": { "totalSales": -1 }
},
// Top 20 results
doc! {
"$limit": 20
}
];
let mut cursor = collection.aggregate(pipeline, None).await?;
while cursor.advance().await? {
let result = cursor.deserialize_current()?;
println!("{:?}", result);
}

Navigation: ← Previous: Indexing | Back to Index | Next: Change Streams →