Complex Event Processing (CEP) Patterns Guide
Overview
Complex Event Processing enables pattern detection in event streams using MATCH_RECOGNIZE SQL syntax and NFA (Non-deterministic Finite Automaton) matching. HeliosDB Streaming provides comprehensive CEP capabilities for detecting complex patterns across time-ordered events.
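To make the matching model concrete, here is a minimal, self-contained sketch of sequence matching over a partitioned, time-ordered stream. It is not the HeliosDB implementation: the `Event` struct and `match_sequence` function are hypothetical names invented for this illustration. The sketch also assumes relaxed contiguity (unrelated events between steps are skipped) and tracks only one partial match per user, so it behaves as a simplified deterministic state machine rather than a full NFA.

```rust
use std::collections::HashMap;

/// Hypothetical event type for this sketch (not the HeliosDB `Event`).
#[derive(Debug, Clone)]
pub struct Event {
    pub user_id: u32,
    pub event_type: &'static str,
    pub event_time: u64, // seconds since an arbitrary epoch
}

/// Matches a fixed sequence of event types per user, skipping unrelated
/// events in between. Returns (user_id, start_time, end_time) per match.
/// Assumes `events` is already ordered by `event_time`.
pub fn match_sequence(events: &[Event], steps: &[&str]) -> Vec<(u32, u64, u64)> {
    // Per-partition state: (index of the next expected step, time of step 0).
    let mut state: HashMap<u32, (usize, u64)> = HashMap::new();
    let mut matches = Vec::new();
    if steps.is_empty() {
        return matches;
    }

    for e in events {
        let (next, start) = state.get(&e.user_id).copied().unwrap_or((0, 0));
        if e.event_type != steps[next] {
            continue; // irrelevant event: stay in the current state
        }
        let start = if next == 0 { e.event_time } else { start };
        if next + 1 == steps.len() {
            // Final step reached: emit the match and reset this partition.
            matches.push((e.user_id, start, e.event_time));
            state.remove(&e.user_id);
        } else {
            state.insert(e.user_id, (next + 1, start));
        }
    }
    matches
}
```

Calling `match_sequence(&events, &["page_view", "add_to_cart", "purchase"])` mirrors the user journey use case described in the next section. A production matcher would additionally keep several concurrent partial matches per key and expire them by time.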
Pattern Types
1. Sequence Patterns
Detect ordered sequences of events.
Use Case: User journey tracking (view → add to cart → purchase)
SQL Example:
SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        FIRST(A.event_time) AS journey_start,
        LAST(C.event_time) AS journey_end,
        (C.event_time - A.event_time) AS conversion_time
    PATTERN (A B C)
    DEFINE
        A AS A.event_type = 'page_view',
        B AS B.event_type = 'add_to_cart',
        C AS C.event_type = 'purchase'
)
Rust API Example:
use heliosdb_streaming::cep::{Pattern, PatternBuilder};
use std::time::Duration;

let pattern = PatternBuilder::new()
    .name("user_journey")
    .sequence()
    .event("view", |e: &Event| e.event_type == "page_view")
    .followed_by("cart", |e: &Event| e.event_type == "add_to_cart")
    .followed_by("purchase", |e: &Event| e.event_type == "purchase")
    .within(Duration::from_secs(3600)) // 1 hour session
    .build()?;

let matches = stream.match_pattern(pattern).await?;
2. Missing Event Patterns
Detect absence of expected events within a time window.
Use Case: Abandoned cart detection
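The semantics of an absence pattern can be sketched without any CEP engine. The following standalone example is an illustration under stated assumptions, not the HeliosDB implementation: `Event`, `abandoned_carts`, and the `watermark` parameter (the time up to which the stream is known to be complete) are hypothetical. A cart counts as abandoned once more than `timeout_secs` have passed without a purchase by the same user; only `purchase` closes a cart here.

```rust
use std::collections::BTreeMap;

/// Hypothetical event type for this sketch.
pub struct Event {
    pub user_id: u32,
    pub event_type: &'static str,
    pub event_time: u64, // seconds
}

/// Moves every open cart whose deadline has passed at `now` into `out`.
fn expire(open: &mut BTreeMap<u32, u64>, out: &mut Vec<(u32, u64)>, now: u64, timeout: u64) {
    let expired: Vec<u32> = open
        .iter()
        .filter(|(_, t)| now > **t + timeout)
        .map(|(u, _)| *u)
        .collect();
    for user in expired {
        if let Some(cart_time) = open.remove(&user) {
            out.push((user, cart_time));
        }
    }
}

/// Returns (user_id, cart_time) for carts with no purchase within the timeout.
/// Assumes `events` is ordered by `event_time`.
pub fn abandoned_carts(events: &[Event], timeout_secs: u64, watermark: u64) -> Vec<(u32, u64)> {
    let mut open: BTreeMap<u32, u64> = BTreeMap::new(); // user -> first add_to_cart time
    let mut abandoned = Vec::new();

    for e in events {
        // Event time advances the clock, so expire stale carts first.
        expire(&mut open, &mut abandoned, e.event_time, timeout_secs);
        match e.event_type {
            "add_to_cart" => {
                open.entry(e.user_id).or_insert(e.event_time);
            }
            "purchase" => {
                open.remove(&e.user_id);
            }
            _ => {}
        }
    }
    // Flush carts that expired between the last event and the watermark.
    expire(&mut open, &mut abandoned, watermark, timeout_secs);
    abandoned
}
```

The key design point is that absence can only be reported once time has provably advanced past the deadline, which is why the sketch needs either a later event or an explicit watermark.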
SQL Example:
SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        A.cart_id AS abandoned_cart,
        A.event_time AS cart_time,
        CURRENT_TIMESTAMP AS detected_at
    PATTERN (A B* NOT C)
    WITHIN INTERVAL '1' HOUR
    DEFINE
        A AS A.event_type = 'add_to_cart',
        B AS B.event_type != 'purchase' AND B.event_type != 'cart_clear',
        C AS C.event_type = 'purchase'
)
Rust API Example:
let pattern = PatternBuilder::new()
    .name("abandoned_cart")
    .sequence()
    .event("add_cart", |e: &Event| e.event_type == "add_to_cart")
    .followed_by_any("other", |e: &Event| {
        e.event_type != "purchase" && e.event_type != "cart_clear"
    })
    .not_followed_by("purchase", |e: &Event| e.event_type == "purchase")
    .within(Duration::from_secs(3600)) // 1 hour timeout
    .build()?;
3. Looping Patterns
Detect repeated occurrences of events.
Use Case: Failed login detection (3+ failed attempts)
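As a rough illustration of what a looping pattern with a time bound computes, the sketch below counts failed logins per user in a sliding window. It is a standalone example, not HeliosDB code: `Event` and `detect_failed_logins` are hypothetical, and the sketch assumes that non-failure events between failures are ignored and that counting restarts after each alert.

```rust
use std::collections::{HashMap, VecDeque};

/// Hypothetical event type for this sketch.
pub struct Event {
    pub user_id: u32,
    pub event_type: &'static str,
    pub event_time: u64, // seconds
}

/// Returns (user_id, alert_time) each time a user accumulates `threshold`
/// failed logins within `window_secs`. Assumes time-ordered input.
pub fn detect_failed_logins(
    events: &[Event],
    threshold: usize,
    window_secs: u64,
) -> Vec<(u32, u64)> {
    // Per-user timestamps of recent failures, oldest at the front.
    let mut recent: HashMap<u32, VecDeque<u64>> = HashMap::new();
    let mut alerts = Vec::new();

    for e in events {
        if e.event_type != "login_failed" {
            continue;
        }
        let times = recent.entry(e.user_id).or_default();
        times.push_back(e.event_time);

        // Drop failures that fell out of the window ending at this event.
        while let Some(&oldest) = times.front() {
            if e.event_time.saturating_sub(oldest) > window_secs {
                times.pop_front();
            } else {
                break;
            }
        }

        if times.len() >= threshold {
            alerts.push((e.user_id, e.event_time));
            times.clear(); // restart counting after reporting a match
        }
    }
    alerts
}
```

With `threshold = 3` and `window_secs = 300`, this corresponds to the "3+ failed attempts in 5 minutes" use case. Because old timestamps are evicted on every event, per-user state stays bounded by the window.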
SQL Example:
SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        COUNT(A.*) AS failed_attempts,
        FIRST(A.event_time) AS first_failure,
        LAST(A.event_time) AS last_failure
    PATTERN (A{3,})
    WITHIN INTERVAL '5' MINUTE
    DEFINE
        A AS A.event_type = 'login_failed'
)
Rust API Example:
let pattern = PatternBuilder::new()
    .name("brute_force_detection")
    .times(3, Some(10)) // 3 to 10 occurrences
    .event("failed_login", |e: &Event| e.event_type == "login_failed")
    .within(Duration::from_secs(300)) // 5 minutes
    .build()?;
4. Alternation Patterns
Multiple possible event sequences.
Use Case: Payment method selection
SQL Example:
SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        CASE
            WHEN MATCH_NUMBER(B) = 1 THEN 'credit_card'
            WHEN MATCH_NUMBER(C) = 1 THEN 'paypal'
            ELSE 'unknown'
        END AS payment_method
    PATTERN ((A B) | (A C))
    DEFINE
        A AS A.event_type = 'checkout_started',
        B AS B.event_type = 'credit_card_selected',
        C AS C.event_type = 'paypal_selected'
)
Rust API Example:
let pattern = PatternBuilder::new()
    .name("payment_flow")
    .alternation()
    .branch(|b| b
        .event("checkout", |e: &Event| e.event_type == "checkout_started")
        .followed_by("credit", |e: &Event| e.event_type == "credit_card_selected")
    )
    .branch(|b| b
        .event("checkout", |e: &Event| e.event_type == "checkout_started")
        .followed_by("paypal", |e: &Event| e.event_type == "paypal_selected")
    )
    .within(Duration::from_secs(600))
    .build()?;
5. Conditional Patterns
Patterns with conditional logic.
Use Case: High-value transaction anomaly detection
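The core idea of a conditional pattern is that a later predicate depends on an aggregate over earlier matched events. The sketch below shows that dependency for a single account in plain Rust. It is illustrative only: `find_anomalies` is a hypothetical helper, the "normal" range of 10 to 100 and the factor of 5 are taken from this section's use case, and the choice to reset the run on any out-of-range amount is an assumption of the sketch.

```rust
/// Scans one account's time-ordered amounts. A run of at least `min_normal`
/// amounts in [10, 100] followed by an amount above 5x the run's average is
/// reported as (index_of_anomaly, amount / average).
pub fn find_anomalies(amounts: &[f64], min_normal: usize) -> Vec<(usize, f64)> {
    let mut sum = 0.0;
    let mut count = 0usize;
    let mut anomalies = Vec::new();

    for (i, &amount) in amounts.iter().enumerate() {
        if (10.0..=100.0).contains(&amount) {
            // Extend the run of "normal" transactions.
            sum += amount;
            count += 1;
        } else {
            if count >= min_normal {
                let avg = sum / count as f64;
                if amount > avg * 5.0 {
                    anomalies.push((i, amount / avg));
                }
            }
            // Any out-of-range amount ends the current run.
            sum = 0.0;
            count = 0;
        }
    }
    anomalies
}
```

Keeping only a running sum and count, rather than the matched events themselves, is what makes this kind of condition cheap to evaluate incrementally.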
SQL Example:
SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY account_id
    ORDER BY event_time
    MEASURES
        A.amount AS suspicious_amount,
        AVG(B.amount) AS avg_normal_amount,
        (A.amount / AVG(B.amount)) AS anomaly_ratio
    PATTERN (B{5,} A)
    WITHIN INTERVAL '1' DAY
    DEFINE
        B AS B.amount BETWEEN 10 AND 100,
        A AS A.amount > AVG(B.amount) * 5
)
Rust API Example:
let pattern = PatternBuilder::new()
    .name("anomaly_detection")
    .sequence()
    .times(5, Some(100))
    .event("normal", |e: &Transaction| {
        e.amount >= 10.0 && e.amount <= 100.0
    })
    .followed_by_with_condition("anomaly", |e: &Transaction, ctx| {
        let avg = ctx.average("normal", |t: &Transaction| t.amount);
        e.amount > avg * 5.0
    })
    .within(Duration::from_secs(86400))
    .build()?;
Performance Considerations
1. NFA State Explosion
Complex patterns can create large state machines. Mitigation:
- Limit pattern complexity (max 5-7 operators per pattern)
- Use time constraints to bound state lifetime
- Partition by key to distribute load
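The effect of time constraints on state size can be sketched as a per-partition store of partial matches that is pruned by age and capped in size. This is an illustration, not the engine's internals: `PartialMatch`, `BoundedState`, and their fields are hypothetical names, and evicting the oldest run when the cap is hit is one possible policy among several.

```rust
use std::collections::VecDeque;

/// A partial match waiting for further events (hypothetical shape).
pub struct PartialMatch {
    pub started_at: u64, // seconds
    pub next_step: usize,
}

/// Per-partition store of partial matches, bounded by age and by count.
pub struct BoundedState {
    runs: VecDeque<PartialMatch>, // oldest run at the front
    max_age_secs: u64,
    max_instances: usize,
}

impl BoundedState {
    pub fn new(max_age_secs: u64, max_instances: usize) -> Self {
        Self { runs: VecDeque::new(), max_age_secs, max_instances }
    }

    /// Drops every partial match older than `max_age_secs` at time `now`.
    pub fn prune(&mut self, now: u64) {
        while let Some(front) = self.runs.front() {
            if now.saturating_sub(front.started_at) > self.max_age_secs {
                self.runs.pop_front();
            } else {
                break;
            }
        }
    }

    /// Starts a new partial match, evicting the oldest one if the cap is hit.
    pub fn start(&mut self, now: u64) {
        self.prune(now);
        if self.runs.len() >= self.max_instances {
            self.runs.pop_front();
        }
        self.runs.push_back(PartialMatch { started_at: now, next_step: 1 });
    }

    pub fn len(&self) -> usize {
        self.runs.len()
    }
}
```

Because runs are appended in time order, pruning only ever inspects the front of the queue, so the cleanup cost is proportional to the number of expired runs rather than to the total state size.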
2. Memory Usage
Each active pattern instance maintains state:
// Set maximum pattern instances per partition
let config = CEPConfig {
    max_pattern_instances: 10_000,
    state_cleanup_interval: Duration::from_secs(60),
    ..Default::default()
};
3. Latency Optimization
- Early Termination: Use NOT patterns for fast rejection
- Pattern Ordering: Most selective conditions first
- Partition Pruning: Partition by high-cardinality keys
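To see why key cardinality matters for partitioning, consider a hash-based assignment of keys to workers. The sketch below uses only the standard library. `worker_for` and `load_per_worker` are hypothetical helpers, and hash partitioning itself is an assumption about how a partitioned matcher might distribute work, not a description of HeliosDB internals.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Maps a partition key to one of `workers` workers by hashing it.
pub fn worker_for<K: Hash>(key: &K, workers: usize) -> usize {
    assert!(workers > 0, "at least one worker is required");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % workers as u64) as usize
}

/// Counts how many events land on each worker for the given event keys.
pub fn load_per_worker(keys: &[u32], workers: usize) -> Vec<usize> {
    let mut load = vec![0usize; workers];
    for key in keys {
        load[worker_for(key, workers)] += 1;
    }
    load
}
```

If every event carries the same key, `load_per_worker` puts all of them on a single worker regardless of how many workers exist. A key with many distinct values gives the hash something to spread, which is the reason to prefer high-cardinality partition keys.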
Best Practices
1. Always Use Time Constraints
// ❌ BAD: Unbounded pattern can accumulate unbounded state
let pattern = PatternBuilder::new()
    .event("A", |e| e.event_type == "start")
    .followed_by("B", |e| e.event_type == "end")
    .build()?;

// GOOD: Bounded by time window
let pattern = PatternBuilder::new()
    .event("A", |e| e.event_type == "start")
    .followed_by("B", |e| e.event_type == "end")
    .within(Duration::from_secs(3600)) // 1 hour max
    .build()?;
2. Partition for Parallelism
let stream = source
    .partition_by(|event| event.user_id) // Parallelize by user
    .match_pattern(pattern);
3. Test with Realistic Data
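#[tokio::test]
async fn test_pattern_performance() -> Result<(), Box<dyn std::error::Error>> {
    let pattern = create_fraud_pattern();
    let test_stream = generate_realistic_events(100_000);

    // Time only the matching step, not pattern or data construction.
    let start = Instant::now();
    let matches = test_stream.match_pattern(pattern).await?;
    let duration = start.elapsed();

    assert!(duration < Duration::from_secs(10));
    assert!(matches.len() > 0);
    // Returning a Result is what allows the `?` above to compile.
    Ok(())
}
4. Monitor NFA State Size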
// Export metrics for monitoring
metrics::gauge!("cep.active_patterns", active_count as f64);
metrics::histogram!("cep.pattern_match_latency", match_duration.as_millis() as f64);
Advanced Examples
Fraud Detection Pattern
let fraud_pattern = PatternBuilder::new()
    .name("velocity_fraud")
    .sequence()
    // Detect 5+ transactions in 1 minute
    .times(5, Some(10))
    .event("transaction", |t: &Transaction| t.amount > 0.0)
    .within(Duration::from_secs(60))
    // Followed by high-value transaction
    .followed_by("large_tx", |t: &Transaction| t.amount > 1000.0)
    .build()?;

let alerts = transactions
    .partition_by(|t| t.card_id)
    .match_pattern(fraud_pattern)
    .map(|matches| FraudAlert {
        card_id: matches.card_id,
        pattern: "velocity_fraud",
        confidence: 0.85,
        timestamp: Utc::now(),
    })
    .await?;
Session Analysis
let session_pattern = PatternBuilder::new()
    .name("engaged_session")
    .sequence()
    .event("start", |e| e.event_type == "session_start")
    .followed_by_any("activity", |e| {
        e.event_type == "click" || e.event_type == "scroll"
    })
    .times(10, Some(100)) // 10-100 interactions
    .followed_by("conversion", |e| e.event_type == "purchase")
    .within(Duration::from_secs(1800)) // 30 min session
    .build()?;
Troubleshooting
High Memory Usage
Symptom: OOM errors or high GC pressure
Solution:
- Reduce max_pattern_instances
- Decrease time windows
- Add more aggressive state cleanup:
let config = CEPConfig {
    state_cleanup_interval: Duration::from_secs(30),
    max_state_age: Duration::from_secs(3600),
    ..Default::default()
};
Slow Pattern Matching
Symptom: High latency in pattern matching
Solution:
- Simplify pattern complexity
- Increase parallelism
- Optimize event predicates (avoid expensive operations)
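One way to read the advice on predicates: put cheap field comparisons first, move lookups into structures built once outside the closure, and let short-circuit evaluation keep costly checks off the hot path. The sketch below demonstrates this with a call counter. Everything in it is hypothetical and independent of the HeliosDB API: `Event`, `expensive_check` (a stand-in for costly work such as payload parsing), and `count_matches`.

```rust
use std::collections::HashSet;

/// Hypothetical event type for this sketch.
pub struct Event {
    pub event_type: &'static str,
    pub country: &'static str,
    pub payload: String,
}

/// Stand-in for a costly operation; counts how often it runs.
fn expensive_check(payload: &str, calls: &mut usize) -> bool {
    *calls += 1;
    payload.contains("\"amount\"")
}

/// Returns (number of matching events, number of expensive checks performed).
pub fn count_matches(events: &[Event], watched: &HashSet<&str>) -> (usize, usize) {
    let mut expensive_calls = 0;
    let matched = events
        .iter()
        .filter(|e| {
            e.event_type == "purchase"                 // cheap comparison first
                && watched.contains(e.country)         // O(1) lookup, set built once
                && expensive_check(&e.payload, &mut expensive_calls) // costly, last
        })
        .count();
    (matched, expensive_calls)
}
```

Because `&&` short-circuits, the costly check runs only for events that already passed the two cheap tests, which the second element of the returned tuple makes visible.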
Missing Matches
Symptom: Expected patterns not detected
Solution:
- Check event ordering (ORDER BY in SQL)
- Verify partition keys
- Increase time windows
- Enable debug logging:
env::set_var("RUST_LOG", "heliosdb_streaming::cep=debug");
References
- Flink CEP Documentation
- SQL MATCH_RECOGNIZE Syntax
- HeliosDB Streaming API Documentation: cargo doc --open
Last Updated: November 2025
Version: 1.0