Document Store: Aggregation Framework
Part of: HeliosDB Document Store User Guide
Aggregation Framework
The aggregation framework processes documents through a pipeline of stages. Each stage transforms the documents it receives and passes its output to the next stage.
Pipeline Concept
```text
// Input documents → Stage 1 → Stage 2 → Stage 3 → Output results
//
// Example: Top 5 customers by total spend
// Documents → $match (filter) → $group (aggregate) → $sort → $limit → Results
```

$match (Filtering)
Filter Documents Early:
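```rust
use mongodb::bson::doc;

let pipeline = vec![
    doc! {
        "$match": {
            "status": "completed",
            // Assumes orderDate is stored as an ISO-8601 string; for a BSON date
            // field, compare against a bson::DateTime value instead.
            "orderDate": { "$gte": "2025-01-01" }
        }
    }
];
let results = collection.aggregate(pipeline, None).await?;
```

HeliosDB Native: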
```rust
use heliosdb_document::{AggregationStage, Filter};
use serde_json::json;

let pipeline = vec![
    AggregationStage::Match {
        filter: Filter::eq("status", json!("completed")),
    },
];
let results = store.aggregate(&Collection::new("orders"), pipeline)?;
```

Best Practice: Place $match as early in the pipeline as possible so that later stages process fewer documents.
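To make the "filter early" advice concrete, here is a minimal std-only sketch that models a pipeline as an ordered list of stage closures over hypothetical in-memory `Order` records. These are plain structs rather than BSON documents, and the sketch says nothing about how HeliosDB actually executes pipelines. Because the second stage does not touch `status`, running the two stages in either order gives the same result, but the costlier stage sees fewer documents when the filter runs first.

```rust
/// Hypothetical order record standing in for a document.
#[derive(Clone, Debug, PartialEq)]
pub struct Order {
    pub status: &'static str,
    pub amount: f64,
}

/// A stage consumes the documents emitted by the previous stage.
pub type Stage = Box<dyn Fn(Vec<Order>) -> Vec<Order>>;

/// Runs the stages in order and records how many documents each stage received.
pub fn run_pipeline(input: Vec<Order>, stages: &[Stage]) -> (Vec<Order>, Vec<usize>) {
    let mut docs = input;
    let mut seen = Vec::with_capacity(stages.len());
    for stage in stages {
        seen.push(docs.len());
        docs = stage(docs);
    }
    (docs, seen)
}

/// Equivalent of `{ "$match": { "status": "completed" } }`.
pub fn match_completed() -> Stage {
    Box::new(|docs: Vec<Order>| -> Vec<Order> {
        docs.into_iter().filter(|o| o.status == "completed").collect()
    })
}

/// Stand-in for a costlier per-document stage: rounds each amount.
pub fn round_amounts() -> Stage {
    Box::new(|docs: Vec<Order>| -> Vec<Order> {
        docs.into_iter()
            .map(|o| Order { amount: o.amount.round(), ..o })
            .collect()
    })
}
```

With four input orders of which two are completed, the filter-first pipeline hands only two documents to `round_amounts`, while the filter-last pipeline hands it all four.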
$group (Grouping and Aggregation)
Group by Single Field:
```rust
let pipeline = vec![
    doc! {
        "$group": {
            "_id": "$category", // Group by category
            "count": { "$sum": 1 },
            "totalSales": { "$sum": "$amount" },
            "avgSales": { "$avg": "$amount" }
        }
    }
];
```

Group by Multiple Fields:
```rust
let pipeline = vec![
    doc! {
        "$group": {
            "_id": { "category": "$category", "region": "$region" },
            "total": { "$sum": "$amount" }
        }
    }
];
```

HeliosDB Native:
```rust
use std::collections::HashMap;
use heliosdb_document::{AggregationOp, AggregationStage};
use serde_json::json;

let mut fields = HashMap::new();
fields.insert("count".to_string(), AggregationOp::Count);
fields.insert("total".to_string(), AggregationOp::Sum { field: "amount".to_string() });
fields.insert("avg".to_string(), AggregationOp::Avg { field: "amount".to_string() });

let pipeline = vec![
    AggregationStage::Group {
        id: Some(json!("$category")),
        fields,
    },
];
```

Aggregation Operators:
| Operator | Description | Example |
|---|---|---|
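| `$sum` | Sum of values; `{ "$sum": 1 }` counts documents | `{ "$sum": "$amount" }` |
| `$avg` | Average of numeric values | `{ "$avg": "$age" }` |
| `$min` | Minimum value | `{ "$min": "$price" }` |
| `$max` | Maximum value | `{ "$max": "$price" }` |
| `$first` | Value from the first document in each group (depends on input order) | `{ "$first": "$name" }` |
| `$last` | Value from the last document in each group (depends on input order) | `{ "$last": "$name" }` |
| `$push` | Array of all values, duplicates included | `{ "$push": "$item" }` |
| `$addToSet` | Array of unique values | `{ "$addToSet": "$tag" }` |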
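The accumulators in the table can be sketched in plain Rust. This std-only example groups hypothetical `Sale` records by category and keeps the running state each operator needs; it illustrates the semantics only and is not HeliosDB's implementation. In this sketch, `$first` and `$last` would simply read the ends of the `$push` vector, which is why they depend on the order in which documents reach the group.

```rust
use std::collections::{BTreeMap, BTreeSet};

/// Hypothetical sale record standing in for a document.
pub struct Sale {
    pub category: &'static str,
    pub amount: f64,
    pub tag: &'static str,
}

/// Per-group accumulator state for `$sum: 1`, `$sum`, `$min`, `$max`,
/// `$push`, and `$addToSet`; `$avg` is derived from count and total.
#[derive(Debug, Default)]
pub struct GroupStats {
    pub count: u64,
    pub total: f64,
    pub min: Option<f64>,
    pub max: Option<f64>,
    pub amounts: Vec<f64>,            // `$push`: keeps duplicates, in input order
    pub tags: BTreeSet<&'static str>, // `$addToSet`: unique values only
}

impl GroupStats {
    /// `$avg`: mean of the accumulated amounts (None for an empty group).
    pub fn avg(&self) -> Option<f64> {
        if self.count == 0 {
            None
        } else {
            Some(self.total / self.count as f64)
        }
    }
}

/// Groups sales by category, which plays the role of `_id` in `$group`.
pub fn group_by_category(sales: &[Sale]) -> BTreeMap<&'static str, GroupStats> {
    let mut groups: BTreeMap<&'static str, GroupStats> = BTreeMap::new();
    for s in sales {
        let g = groups.entry(s.category).or_default();
        g.count += 1;
        g.total += s.amount;
        g.min = Some(g.min.map_or(s.amount, |m| m.min(s.amount)));
        g.max = Some(g.max.map_or(s.amount, |m| m.max(s.amount)));
        g.amounts.push(s.amount);
        g.tags.insert(s.tag);
    }
    groups
}
```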
$project (Field Selection and Transformation)
Include/Exclude Fields:
```rust
let pipeline = vec![
    doc! {
        "$project": {
            "name": 1,  // Include
            "email": 1, // Include
            "age": 1,   // Include
            "_id": 0    // Exclude
        }
    }
];
```

Computed Fields:
```rust
let pipeline = vec![
    doc! {
        "$project": {
            "name": 1,
            "fullName": { "$concat": ["$firstName", " ", "$lastName"] },
            "ageGroup": {
                "$cond": {
                    "if": { "$gte": ["$age", 65] },
                    "then": "senior",
                    "else": {
                        "$cond": {
                            "if": { "$gte": ["$age", 18] },
                            "then": "adult",
                            "else": "minor"
                        }
                    }
                }
            }
        }
    }
];
```

HeliosDB Native:
```rust
use std::collections::HashMap;
use heliosdb_document::{AggregationStage, ProjectionValue};

let mut fields = HashMap::new();
fields.insert("name".to_string(), ProjectionValue::Include(true));
fields.insert("email".to_string(), ProjectionValue::Include(true));
fields.insert("_id".to_string(), ProjectionValue::Include(false));

let pipeline = vec![
    AggregationStage::Project { fields },
];
```

$sort (Ordering)
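`$sort` compares documents on its first key and consults the next key only to break ties. A minimal std-only sketch of that rule, assuming hypothetical `CustomerTotal` rows that carry the two fields used in the examples in this section (this models the ordering, not HeliosDB's sort implementation):

```rust
/// Hypothetical row produced by an earlier `$group` stage.
#[derive(Debug, Clone, PartialEq)]
pub struct CustomerTotal {
    pub customer_name: String,
    pub total_sales: f64,
}

/// Models `{ "$sort": { "totalSales": -1, "customerName": 1 } }`:
/// compare on the first key, and fall back to the second only on a tie.
pub fn sort_customers(rows: &mut [CustomerTotal]) {
    rows.sort_by(|a, b| {
        b.total_sales
            .total_cmp(&a.total_sales) // descending: compare b against a
            .then_with(|| a.customer_name.cmp(&b.customer_name)) // ascending tie-break
    });
}
```

Two customers with equal totals come out in alphabetical order, after every customer with a higher total.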
```rust
let pipeline = vec![
    doc! {
        // Key order matters: documents are sorted by totalSales first.
        "$sort": {
            "totalSales": -1,  // Descending (highest first)
            "customerName": 1  // Then ascending by name
        }
    }
];
```

HeliosDB Native:
```rust
use heliosdb_document::{AggregationStage, SortOrder};

let pipeline = vec![
    AggregationStage::Sort {
        fields: vec![
            ("totalSales".to_string(), SortOrder::Desc),
            ("customerName".to_string(), SortOrder::Asc),
        ],
    },
];
```

$limit and $skip (Pagination)
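Page numbers map onto the two stages with simple arithmetic: page `n` with page size `k` skips `(n - 1) * k` documents and then limits to `k`. The helper names below (`page_bounds`, `paginate`) are hypothetical, and the sketch works on an in-memory slice that is assumed to be sorted already:

```rust
/// Converts a 1-based page number and page size into `($skip, $limit)` values.
/// Returns None for page 0, page size 0, or an overflowing skip count.
pub fn page_bounds(page: u64, page_size: u64) -> Option<(u64, u64)> {
    if page == 0 || page_size == 0 {
        return None;
    }
    let skip = (page - 1).checked_mul(page_size)?;
    Some((skip, page_size))
}

/// Applies skip-then-limit to an already sorted slice, as the two stages do.
pub fn paginate<T: Clone>(sorted: &[T], skip: u64, limit: u64) -> Vec<T> {
    sorted
        .iter()
        .skip(skip as usize)
        .take(limit as usize)
        .cloned()
        .collect()
}
```

Page 2 with a size of 10 yields `(10, 10)`, which matches the `$skip: 10` / `$limit: 10` pair shown here; a final page may return fewer than `k` items.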
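```rust
// Get the second page (items 11-20) with a page size of 10.
// $skip must come before $limit. Without a preceding $sort, the order of
// documents (and therefore the contents of each page) is not guaranteed.
let pipeline = vec![
    doc! { "$skip": 10 },
    doc! { "$limit": 10 },
];
```

HeliosDB Native: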
```rust
let pipeline = vec![
    AggregationStage::Skip { count: 10 },
    AggregationStage::Limit { count: 10 },
];
```

$unwind (Array Flattening)
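`$unwind` emits one output document per element of the target array. The std-only sketch below models that behavior, including the `preserveNullAndEmptyArrays` switch, with hypothetical `Person` records. As a simplification, it represents a missing or null `skills` field as `None` and does not distinguish a preserved null from a preserved empty array in the output.

```rust
/// Hypothetical input document; `skills` is None when missing or null.
#[derive(Debug, Clone, PartialEq)]
pub struct Person {
    pub name: String,
    pub skills: Option<Vec<String>>,
}

/// One output document per array element; `skill` is None only for
/// documents kept because `preserve_null` is set.
#[derive(Debug, Clone, PartialEq)]
pub struct UnwoundPerson {
    pub name: String,
    pub skill: Option<String>,
}

/// Models `$unwind` on `skills`; `preserve_null` mirrors `preserveNullAndEmptyArrays`.
pub fn unwind_skills(people: &[Person], preserve_null: bool) -> Vec<UnwoundPerson> {
    let mut out = Vec::new();
    for p in people {
        match &p.skills {
            Some(skills) if !skills.is_empty() => {
                for s in skills {
                    out.push(UnwoundPerson { name: p.name.clone(), skill: Some(s.clone()) });
                }
            }
            // Missing, null, or empty array: dropped unless preserve_null is set.
            _ => {
                if preserve_null {
                    out.push(UnwoundPerson { name: p.name.clone(), skill: None });
                }
            }
        }
    }
    out
}
```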
Explode Array into Multiple Documents:
```rust
// Input:  { "name": "Alice", "skills": ["Rust", "MongoDB", "Docker"] }
// Output (3 docs):
//   { "name": "Alice", "skills": "Rust" }
//   { "name": "Alice", "skills": "MongoDB" }
//   { "name": "Alice", "skills": "Docker" }

let pipeline = vec![
    doc! { "$unwind": "$skills" }
];
```

With Preserve Null:
```rust
let pipeline = vec![
    doc! {
        "$unwind": {
            "path": "$skills",
            // Keep documents whose skills field is missing, null, or an empty array
            "preserveNullAndEmptyArrays": true
        }
    }
];
```

HeliosDB Native:
```rust
let pipeline = vec![
    AggregationStage::Unwind {
        field: "skills".to_string(),
        preserve_null: true,
    },
];
```

$lookup (Joins)
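`$lookup` behaves like a left outer equality join: every input document is kept, and the `as` field holds an array of matching foreign documents, which is empty when nothing matches. The std-only sketch below uses hypothetical `Order` and `Product` structs. Indexing the foreign side in a `HashMap` first is a choice made for the sketch, not a description of how HeliosDB executes joins.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq)]
pub struct Product {
    pub id: u32,
    pub name: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct Order {
    pub order_id: u32,
    pub product_id: u32,
}

/// Output shape: each order plus the `as` array of matching products.
#[derive(Debug, PartialEq)]
pub struct OrderWithProduct {
    pub order: Order,
    pub product: Vec<Product>,
}

/// Models `$lookup` with localField = productId and foreignField = _id.
/// Orders without a matching product keep an empty array.
pub fn lookup_products(orders: &[Order], products: &[Product]) -> Vec<OrderWithProduct> {
    // Index the foreign collection by its join key once.
    let mut by_id: HashMap<u32, Vec<Product>> = HashMap::new();
    for p in products {
        by_id.entry(p.id).or_default().push(p.clone());
    }
    orders
        .iter()
        .map(|o| OrderWithProduct {
            order: o.clone(),
            product: by_id.get(&o.product_id).cloned().unwrap_or_default(),
        })
        .collect()
}
```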
Simple Join:
```rust
// Join orders with products
let pipeline = vec![
    doc! {
        "$lookup": {
            "from": "products",        // Collection to join
            "localField": "productId", // Field in orders
            "foreignField": "_id",     // Field in products
            "as": "product"            // Output array field
        }
    }
];
```

Complex Join with Pipeline:
```rust
// Join only products that have enough stock to fill the order
let pipeline = vec![
    doc! {
        "$lookup": {
            "from": "products",
            "let": { "prodId": "$productId", "qty": "$quantity" },
            "pipeline": [
                {
                    "$match": {
                        "$expr": {
                            "$and": [
                                { "$eq": ["$_id", "$$prodId"] },
                                { "$gte": ["$stock", "$$qty"] }
                            ]
                        }
                    }
                }
            ],
            "as": "product"
        }
    }
];
```

HeliosDB Native:
```rust
let pipeline = vec![
    AggregationStage::Lookup {
        from: "products".to_string(),
        local_field: "productId".to_string(),
        foreign_field: "_id".to_string(),
        as_field: "product".to_string(),
    },
];
```

$facet (Multi-Faceted Aggregation)
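Each facet is an independent sub-pipeline that receives the same input documents, and `$facet` returns a single document with one array per facet name. A std-only sketch over a plain slice, where `SubPipeline` and `facet` are hypothetical names used only for illustration:

```rust
use std::collections::BTreeMap;

/// A sub-pipeline: receives the full input and returns that facet's output.
pub type SubPipeline<T> = Box<dyn Fn(&[T]) -> Vec<T>>;

/// Models `$facet`: every sub-pipeline runs over the same input slice,
/// and the outputs are gathered under their facet names in one result.
pub fn facet<T>(input: &[T], facets: Vec<(&str, SubPipeline<T>)>) -> BTreeMap<String, Vec<T>> {
    facets
        .into_iter()
        .map(|(name, pipeline)| (name.to_string(), pipeline(input)))
        .collect()
}
```

Because each sub-pipeline only borrows the input, one facet's filtering or truncation never affects what another facet sees.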
Multiple Independent Pipelines:
```rust
let pipeline = vec![
    doc! {
        "$facet": {
            "categoryCounts": [
                { "$group": { "_id": "$category", "count": { "$sum": 1 } } },
                { "$sort": { "count": -1 } }
            ],
            "priceRanges": [
                {
                    "$bucket": {
                        "groupBy": "$price",
                        "boundaries": [0, 50, 100, 200, 500],
                        "default": "Other",
                        "output": { "count": { "$sum": 1 } }
                    }
                }
            ],
            "topProducts": [
                { "$sort": { "sales": -1 } },
                { "$limit": 10 }
            ]
        }
    }
];
```

$bucket (Bucketing)
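`$bucket` places each value in the range `[boundaries[i], boundaries[i+1])`, labels the bucket with its inclusive lower boundary, and routes values outside every range to `default`. The following std-only sketch reproduces that assignment rule for integer values; `Bucket`, `assign_bucket`, and `bucket_counts` are hypothetical names, and the boundaries are assumed to be sorted ascending.

```rust
use std::collections::BTreeMap;

/// Bucket label: the inclusive lower boundary, or the `default` bucket.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Bucket {
    Lower(i64),
    Default,
}

/// Assigns a value to [b[i], b[i+1]); values below the first boundary or
/// at/above the last one go to `Default`.
pub fn assign_bucket(value: i64, boundaries: &[i64]) -> Bucket {
    boundaries
        .windows(2)
        .find(|w| w[0] <= value && value < w[1])
        .map(|w| Bucket::Lower(w[0]))
        .unwrap_or(Bucket::Default)
}

/// Counts documents per bucket, like `output: { count: { $sum: 1 } }`.
pub fn bucket_counts(values: &[i64], boundaries: &[i64]) -> BTreeMap<Bucket, u64> {
    let mut counts = BTreeMap::new();
    for &v in values {
        *counts.entry(assign_bucket(v, boundaries)).or_insert(0) += 1;
    }
    counts
}
```

With the age boundaries used in this section, 17 lands in the bucket labeled 0, 119 in the bucket labeled 65, and 120 in the default bucket.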
Group by Ranges:
```rust
let pipeline = vec![
    doc! {
        "$bucket": {
            "groupBy": "$age",
            "boundaries": [0, 18, 30, 50, 65, 120],
            "default": "Other", // Ages outside [0, 120)
            "output": {
                "count": { "$sum": 1 },
                "avgIncome": { "$avg": "$income" }
            }
        }
    }
];

// Results (each bucket's _id is its inclusive lower boundary):
// { "_id": 0,  "count": 5,   "avgIncome": 0 }     // 0-17 years
// { "_id": 18, "count": 150, "avgIncome": 45000 } // 18-29 years
// { "_id": 30, "count": 200, "avgIncome": 75000 } // 30-49 years
// { "_id": 50, "count": 100, "avgIncome": 90000 } // 50-64 years
// { "_id": 65, "count": 45,  "avgIncome": 65000 } // 65-119 years
```

$count
```rust
let pipeline = vec![
    doc! { "$match": { "status": "active" } },
    doc! { "$count": "totalActive" }
];

// Result: { "totalActive": 1234 }
```

HeliosDB Native:
```rust
let pipeline = vec![
    AggregationStage::Count {
        field: "total".to_string(),
    },
];
```

Complete Example: Sales Analytics
```rust
use mongodb::bson::{doc, DateTime};

// Calculate monthly sales by region, listing the products sold in each group.
// $month requires orderDate to be a BSON date, so the filter compares dates.
let start = DateTime::from_millis(1_735_689_600_000); // 2025-01-01T00:00:00Z
let pipeline = vec![
    // Filter to completed orders placed on or after 2025-01-01
    doc! {
        "$match": {
            "orderDate": { "$gte": start },
            "status": "completed"
        }
    },
    // Keep the fields needed later and add a month field
    doc! {
        "$project": {
            "region": 1,
            "amount": 1,
            "product": 1,
            "month": { "$month": "$orderDate" }
        }
    },
    // Group by region and month
    doc! {
        "$group": {
            "_id": { "region": "$region", "month": "$month" },
            "totalSales": { "$sum": "$amount" },
            "avgOrderValue": { "$avg": "$amount" },
            "orderCount": { "$sum": 1 },
            "products": { "$push": "$product" } // Every product sold, not ranked
        }
    },
    // Sort by sales, highest first
    doc! { "$sort": { "totalSales": -1 } },
    // Keep the top 20 region/month groups
    doc! { "$limit": 20 }
];

let mut cursor = collection.aggregate(pipeline, None).await?;
while cursor.advance().await? {
    let result = cursor.deserialize_current()?;
    println!("{:?}", result);
}
```