Skip to content

Document Store: Change Streams

Document Store: Change Streams

Part of: HeliosDB Document Store User Guide

Change Streams

Change streams provide real-time notifications when documents are inserted, updated, or deleted.

Basic Change Stream

Watch Collection:

use heliosdb_document::{ChangeStream, ChangeEventType};
// Subscribe to every change on the "orders" collection.
let mut stream = store.watch(Collection::new("orders"))?;
// Handle changes until `next()` yields `None`; an `Err` is propagated by `?`.
while let Some(change) = stream.next().await? {
    match change.event_type {
        // Insert and Replace both print the event's `full_document` field.
        ChangeEventType::Insert => println!("New order: {:?}", change.full_document),
        ChangeEventType::Replace => println!("Order replaced: {:?}", change.full_document),
        // Delete prints only the document id.
        ChangeEventType::Delete => println!("Order deleted: {}", change.document_id.as_str()),
        ChangeEventType::Update => {
            println!("Order updated: {}", change.document_id.as_str());
            // The update description is optional; print its details only when present.
            if let Some(diff) = &change.update_description {
                println!(" Updated fields: {:?}", diff.updated_fields);
                println!(" Removed fields: {:?}", diff.removed_fields);
            }
        }
    }
}

Filter Change Streams

Watch Specific Events:

// React only to inserts of high-value orders; the filtering happens in
// application code, on top of an unfiltered stream.
let mut stream = store.watch(Collection::new("orders"))?;
while let Some(change) = stream.next().await? {
    // Guard 1: ignore everything that is not an insert.
    if change.event_type != ChangeEventType::Insert {
        continue;
    }
    // Guard 2: ignore inserts that carry no full document.
    let doc = match change.full_document {
        Some(doc) => doc,
        None => continue,
    };
    // A "total" that is not readable as f64 is treated as 0.0, so it never qualifies.
    let total = doc.data["total"].as_f64().unwrap_or(0.0);
    if total > 1000.0 {
        println!("High-value order: {:?}", doc);
        send_notification(&doc).await?;
    }
}

Resume Tokens

Resume from Last Position:

// Remember the resume token of the most recent event so the stream can be
// re-opened from that position after an error.
let mut stream = store.watch(Collection::new("orders"))?;
let mut last_token = None;
loop {
    match stream.next().await {
        Ok(Some(event)) => {
            // Save the token before `event` is moved into `process_event`.
            last_token = Some(event.resume_token.clone());
            process_event(event)?;
        }
        // No more events: leave the loop.
        Ok(None) => break,
        // Clone the saved token instead of moving it out of `last_token`:
        // the variable must stay usable if another error occurs on a later
        // iteration before a new event arrives.
        Err(e) => match last_token.clone() {
            Some(token) => {
                println!("Stream error: {}, resuming from token", e);
                // Resume from last known position
                stream = store.watch_from_token(Collection::new("orders"), token)?;
            }
            // Nothing was received yet, so there is no position to resume
            // from. Propagate the error rather than polling the failed
            // stream again in an endless loop.
            None => return Err(e.into()),
        },
    }
}

Use Cases

1. Cache Invalidation

// Invalidate cached products whenever the stored document changes.
let mut stream = store.watch(Collection::new("products"))?;
// The task is detached (its JoinHandle is dropped), so nothing observes a
// panic inside it; stream errors are therefore reported explicitly.
tokio::spawn(async move {
    loop {
        let event = match stream.next().await {
            Ok(Some(event)) => event,
            // Stream finished: stop the task.
            Ok(None) => break,
            Err(e) => {
                eprintln!("Product change stream error: {}", e);
                break;
            }
        };
        match event.event_type {
            // Replace rewrites the stored document just like Update does, so
            // the cached copy is stale in all three cases.
            ChangeEventType::Update | ChangeEventType::Replace | ChangeEventType::Delete => {
                // Invalidate cache entry
                cache.invalidate(&format!("product:{}", event.document_id.as_str())).await;
            }
            // Remaining events (e.g. Insert) leave the cache untouched.
            _ => {}
        }
    }
});

2. Audit Logging

// Record every change to the "users" collection as a document in "audit_log".
let mut stream = store.watch(Collection::new("users"))?;
let audit_collection = Collection::new("audit_log");
// The task is detached (its JoinHandle is dropped), so a panic inside it
// would end auditing without anyone noticing; failures are logged instead.
tokio::spawn(async move {
    loop {
        let event = match stream.next().await {
            Ok(Some(event)) => event,
            // Stream finished: stop the task.
            Ok(None) => break,
            Err(e) => {
                eprintln!("User change stream error: {}", e);
                break;
            }
        };
        // NOTE(review): `get_current_user()` runs inside this background task
        // when the event is handled, not when the change was made — confirm
        // it identifies the user who actually performed the change.
        let audit_entry = json!({
            "timestamp": chrono::Utc::now().to_rfc3339(),
            "collection": "users",
            "document_id": event.document_id.as_str(),
            "event_type": format!("{:?}", event.event_type),
            "user": get_current_user().await,
            "changes": event.update_description
        });
        // Each entry gets a fresh random id. A failed write is logged and the
        // loop continues, so one bad insert does not stop auditing of later
        // changes.
        if let Err(e) = store.insert(
            &audit_collection,
            &DocumentId::new(&format!("audit_{}", uuid::Uuid::new_v4())),
            audit_entry,
        ) {
            eprintln!("Failed to write audit entry: {:?}", e);
        }
    }
});

3. Real-time Dashboard

// Keep the dashboard metrics and connected clients in sync with order changes.
let mut stream = store.watch(Collection::new("orders"))?;
// The task is detached (its JoinHandle is dropped), so stream errors are
// reported explicitly instead of panicking unobserved.
tokio::spawn(async move {
    loop {
        let event = match stream.next().await {
            Ok(Some(event)) => event,
            // Stream finished: stop the task.
            Ok(None) => break,
            Err(e) => {
                eprintln!("Order change stream error: {}", e);
                break;
            }
        };
        match event.event_type {
            ChangeEventType::Insert => {
                if let Some(doc) = event.full_document {
                    // Update real-time metrics; a "total" that is not
                    // readable as f64 contributes 0.0 revenue.
                    metrics.increment_total_orders();
                    metrics.add_revenue(doc.data["total"].as_f64().unwrap_or(0.0));
                    // Notify connected clients via WebSocket
                    websocket_broadcast(json!({
                        "type": "new_order",
                        "order": doc.data
                    })).await;
                }
            }
            ChangeEventType::Update => {
                // Update order status in dashboard, but only when the update
                // actually touched the "status" field.
                if let Some(desc) = event.update_description {
                    // Single lookup: `get` both tests for the key and yields
                    // its value.
                    if let Some(status) = desc.updated_fields.get("status") {
                        websocket_broadcast(json!({
                            "type": "order_status_update",
                            "order_id": event.document_id.as_str(),
                            "status": status
                        })).await;
                    }
                }
            }
            // Other event types do not affect the dashboard.
            _ => {}
        }
    }
});

4. Event Sourcing

// Capture every change to "accounts" as a document in "account_events" so the
// history can be replayed later.
let mut stream = store.watch(Collection::new("accounts"))?;
let events_collection = Collection::new("account_events");
// The task is detached (its JoinHandle is dropped), so failures are reported
// explicitly instead of panicking unobserved.
tokio::spawn(async move {
    loop {
        let event = match stream.next().await {
            Ok(Some(event)) => event,
            // Stream finished: stop the task.
            Ok(None) => break,
            Err(e) => {
                eprintln!("Account change stream error: {}", e);
                break;
            }
        };
        // Generate the id once so the "event_id" field inside the document
        // matches the `evt_…` id the document is stored under.
        let event_id = uuid::Uuid::new_v4();
        let event_doc = json!({
            "event_id": event_id.to_string(),
            "timestamp": event.timestamp,
            "aggregate_id": event.document_id.as_str(),
            "event_type": format!("{:?}", event.event_type),
            "data": event.full_document,
            "changes": event.update_description
        });
        if let Err(e) = store.insert(
            &events_collection,
            &DocumentId::new(&format!("evt_{}", event_id)),
            event_doc,
        ) {
            // Stop capturing: carrying on past a failed write would leave a
            // gap in the recorded history.
            eprintln!("Failed to store account event: {:?}", e);
            break;
        }
    }
});

Change Stream Features

Feature | Description | Performance
Multiple Subscribers | Multiple streams on same collection | <1ms notification
Buffering | 10,000 event buffer per stream | Zero loss
Resume Tokens | Resume from any point | Instant
Filtering | Filter events in application | N/A
Low Latency | Sub-millisecond notification | <1ms

Navigation: ← Previous: Aggregation Framework | Back to Index | Next: Use Cases →