Document Store: Change Streams
Document Store: Change Streams
Part of: HeliosDB Document Store User Guide
Change Streams
Change streams provide real-time notifications when documents are inserted, updated, or deleted.
Basic Change Stream
Watch Collection:
```rust
use heliosdb_document::{ChangeStream, ChangeEventType};

// Watch for all changes
let mut stream = store.watch(Collection::new("orders"))?;

// Process events
while let Some(event) = stream.next().await? {
    match event.event_type {
        ChangeEventType::Insert => {
            println!("New order: {:?}", event.full_document);
        }
        ChangeEventType::Update => {
            println!("Order updated: {}", event.document_id.as_str());
            if let Some(desc) = event.update_description {
                println!(" Updated fields: {:?}", desc.updated_fields);
                println!(" Removed fields: {:?}", desc.removed_fields);
            }
        }
        ChangeEventType::Delete => {
            println!("Order deleted: {}", event.document_id.as_str());
        }
        ChangeEventType::Replace => {
            println!("Order replaced: {:?}", event.full_document);
        }
    }
}
```

Filter Change Streams
Watch Specific Events:
```rust
// Only watch for inserts of high-value orders
let mut stream = store.watch(Collection::new("orders"))?;

while let Some(event) = stream.next().await? {
    if event.event_type == ChangeEventType::Insert {
        if let Some(doc) = event.full_document {
            if doc.data["total"].as_f64().unwrap_or(0.0) > 1000.0 {
                println!("High-value order: {:?}", doc);
                send_notification(&doc).await?;
            }
        }
    }
}
```

Resume Tokens
Resume from Last Position:
```rust
// Save resume token
let mut stream = store.watch(Collection::new("orders"))?;
let mut last_token = None;

loop {
    match stream.next().await {
        Ok(Some(event)) => {
            last_token = Some(event.resume_token.clone());
            process_event(event)?;
        }
        Err(e) => {
            println!("Stream error: {}, resuming from token", e);
            // Resume from last known position
            if let Some(token) = last_token {
                stream = store.watch_from_token(
                    Collection::new("orders"),
                    token
                )?;
            }
        }
        _ => break,
    }
}
```

Use Cases
1. Cache Invalidation
```rust
// Invalidate cache when documents change
let mut stream = store.watch(Collection::new("products"))?;

tokio::spawn(async move {
    while let Some(event) = stream.next().await.unwrap() {
        match event.event_type {
            ChangeEventType::Update | ChangeEventType::Delete => {
                // Invalidate cache entry
                cache.invalidate(&format!("product:{}", event.document_id.as_str())).await;
            }
            _ => {}
        }
    }
});
```

2. Audit Logging
```rust
// Log all changes to audit collection
let mut stream = store.watch(Collection::new("users"))?;
let audit_collection = Collection::new("audit_log");

tokio::spawn(async move {
    while let Some(event) = stream.next().await.unwrap() {
        let audit_entry = json!({
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "collection": "users",
            "document_id": event.document_id.as_str(),
            "event_type": format!("{:?}", event.event_type),
            "user": get_current_user().await,
            "changes": event.update_description
        });

        store.insert(
            &audit_collection,
            &DocumentId::new(&format!("audit_{}", uuid::Uuid::new_v4())),
            audit_entry
        ).unwrap();
    }
});
```

3. Real-time Dashboard
```rust
// Update dashboard on order changes
let mut stream = store.watch(Collection::new("orders"))?;

tokio::spawn(async move {
    while let Some(event) = stream.next().await.unwrap() {
        match event.event_type {
            ChangeEventType::Insert => {
                if let Some(doc) = event.full_document {
                    // Update real-time metrics
                    metrics.increment_total_orders();
                    metrics.add_revenue(doc.data["total"].as_f64().unwrap_or(0.0));

                    // Notify connected clients via WebSocket
                    websocket_broadcast(json!({
                        "type": "new_order",
                        "order": doc.data
                    })).await;
                }
            }
            ChangeEventType::Update => {
                // Update order status in dashboard
                if let Some(desc) = event.update_description {
                    if desc.updated_fields.contains_key("status") {
                        websocket_broadcast(json!({
                            "type": "order_status_update",
                            "order_id": event.document_id.as_str(),
                            "status": desc.updated_fields["status"]
                        })).await;
                    }
                }
            }
            _ => {}
        }
    }
});
```

4. Event Sourcing
```rust
// Capture all events for replay
let mut stream = store.watch(Collection::new("accounts"))?;
let events_collection = Collection::new("account_events");

tokio::spawn(async move {
    while let Some(event) = stream.next().await.unwrap() {
        let event_doc = json!({
            "event_id": uuid::Uuid::new_v4().to_string(),
            "timestamp": event.timestamp,
            "aggregate_id": event.document_id.as_str(),
            "event_type": format!("{:?}", event.event_type),
            "data": event.full_document,
            "changes": event.update_description
        });

        store.insert(
            &events_collection,
            &DocumentId::new(&format!("evt_{}", uuid::Uuid::new_v4())),
            event_doc
        ).unwrap();
    }
});
```

Change Stream Features
| Feature | Description | Performance |
|---|---|---|
| Multiple Subscribers | Multiple streams on same collection | <1ms notification |
| Buffering | 10,000 event buffer per stream | Zero loss |
| Resume Tokens | Resume from any point | Instant |
| Filtering | Filter events in application | N/A |
| Low Latency | Sub-millisecond notification | <1ms |
Navigation: ← Previous: Aggregation Framework | Back to Index | Next: Use Cases →