Complex Event Processing (CEP) Patterns Guide

Overview

Complex Event Processing (CEP) detects patterns across time-ordered events in a stream. HeliosDB Streaming lets you express patterns either with the SQL MATCH_RECOGNIZE clause or with the Rust PatternBuilder API, and matches them using an NFA (non-deterministic finite automaton).

Pattern Types

1. Sequence Patterns

Detect ordered sequences of events.

Use Case: User journey tracking (view → add to cart → purchase)

SQL Example:

SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        FIRST(A.event_time) AS journey_start,
        LAST(C.event_time) AS journey_end,
        (C.event_time - A.event_time) AS conversion_time
    PATTERN (A B C)
    DEFINE
        A AS A.event_type = 'page_view',
        B AS B.event_type = 'add_to_cart',
        C AS C.event_type = 'purchase'
)

Rust API Example:

use heliosdb_streaming::cep::{Pattern, PatternBuilder};
use std::time::Duration;

let pattern = PatternBuilder::new()
    .name("user_journey")
    .sequence()
    .event("view", |e: &Event| e.event_type == "page_view")
    .followed_by("cart", |e: &Event| e.event_type == "add_to_cart")
    .followed_by("purchase", |e: &Event| e.event_type == "purchase")
    .within(Duration::from_secs(3600)) // 1-hour session
    .build()?;

let matches = stream.match_pattern(pattern).await?;
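To make the sequence semantics concrete, here is a minimal plain-Rust sketch that does not use the heliosdb_streaming API. It assumes one partition's events arrive in timestamp order, skips unrelated events between steps, measures the time bound from the first step, and tracks only one partial match at a time (a full NFA can track several). `SketchEvent` and `match_sequence` are hypothetical names.

```rust
/// Hypothetical event type for this sketch (not HeliosDB's `Event`).
#[derive(Debug, Clone, Copy)]
pub struct SketchEvent {
    pub kind: &'static str,
    pub ts: u64, // seconds
}

/// Matches `steps[0]`, then `steps[1]`, ... within `window` seconds of the first step.
/// Returns `(start_ts, end_ts)` for every completed match in one partition.
pub fn match_sequence(events: &[SketchEvent], steps: &[&str], window: u64) -> Vec<(u64, u64)> {
    let mut matches = Vec::new();
    if steps.is_empty() {
        return matches;
    }
    // The single in-progress run: (steps matched so far, start timestamp).
    let mut run: Option<(usize, u64)> = None;
    for e in events {
        // Discard the run once it exceeds the time bound.
        if let Some((_, start)) = run {
            if e.ts.saturating_sub(start) > window {
                run = None;
            }
        }
        let step = run.map_or(0, |(matched, _)| matched);
        if e.kind == steps[step] {
            let start = run.map_or(e.ts, |(_, s)| s);
            if step + 1 == steps.len() {
                matches.push((start, e.ts)); // pattern complete
                run = None;
            } else {
                run = Some((step + 1, start)); // advance to the next step
            }
        }
        // Any other event is skipped without affecting the run.
    }
    matches
}
```

With page_view at 0 s, add_to_cart at 20 s and purchase at 100 s, a 3600-second window yields one match `(0, 100)`, while a 50-second window yields none because the run expires before the purchase.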

2. Missing Event Patterns

Detect the absence of an expected event within a time window.

Use Case: Abandoned cart detection

SQL Example:

SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        A.cart_id AS abandoned_cart,
        A.event_time AS cart_time,
        CURRENT_TIMESTAMP AS detected_at
    PATTERN (A B* NOT C)
    WITHIN INTERVAL '1' HOUR
    DEFINE
        A AS A.event_type = 'add_to_cart',
        B AS B.event_type != 'purchase' AND B.event_type != 'cart_clear',
        C AS C.event_type = 'purchase'
)

Rust API Example:

let pattern = PatternBuilder::new()
    .name("abandoned_cart")
    .sequence()
    .event("add_cart", |e: &Event| e.event_type == "add_to_cart")
    .followed_by_any("other", |e: &Event| {
        e.event_type != "purchase" && e.event_type != "cart_clear"
    })
    .not_followed_by("purchase", |e: &Event| e.event_type == "purchase")
    .within(Duration::from_secs(3600)) // 1-hour timeout
    .build()?;
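A missing-event pattern cannot fire when an event arrives; it fires when a deadline passes without one. The plain-Rust sketch below, independent of the HeliosDB API, models this with a per-user timer that event time (acting as a watermark) advances. `CartEvent`, `abandoned_carts`, and the `now` watermark parameter are hypothetical, and only the first add_to_cart per user is tracked.

```rust
use std::collections::HashMap;

/// Hypothetical event type for this sketch.
pub struct CartEvent {
    pub user_id: u32,
    pub kind: &'static str,
    pub ts: u64, // seconds
}

/// Moves carts whose deadline (`cart_ts + timeout`) is before `watermark` into `out`.
fn expire(pending: &mut HashMap<u32, u64>, watermark: u64, timeout: u64, out: &mut Vec<(u32, u64)>) {
    pending.retain(|&user, &mut cart_ts| {
        let expired = watermark > cart_ts + timeout;
        if expired {
            out.push((user, cart_ts));
        }
        !expired
    });
}

/// Returns `(user_id, cart_ts)` for carts not followed by `purchase` or `cart_clear`
/// within `timeout` seconds. `now` is the final watermark used to fire remaining timers.
pub fn abandoned_carts(events: &[CartEvent], timeout: u64, now: u64) -> Vec<(u32, u64)> {
    let mut pending: HashMap<u32, u64> = HashMap::new(); // user -> add_to_cart time
    let mut out = Vec::new();
    for e in events {
        // Event time advances the watermark, firing any overdue timers first.
        expire(&mut pending, e.ts, timeout, &mut out);
        match e.kind {
            "add_to_cart" => {
                pending.entry(e.user_id).or_insert(e.ts);
            }
            "purchase" | "cart_clear" => {
                pending.remove(&e.user_id);
            }
            _ => {} // other events neither start nor cancel the timer
        }
    }
    expire(&mut pending, now, timeout, &mut out);
    out.sort(); // HashMap iteration order is unspecified
    out
}
```

The key design point is that the "NOT C" part is resolved by time, not by an event: a cart is only reported once the watermark has moved past its deadline.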

3. Looping Patterns

Detect repeated occurrences of events.

Use Case: Failed login detection (3+ failed attempts)

SQL Example:

SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        COUNT(A.*) AS failed_attempts,
        FIRST(A.event_time) AS first_failure,
        LAST(A.event_time) AS last_failure
    PATTERN (A{3,})
    WITHIN INTERVAL '5' MINUTE
    DEFINE
        A AS A.event_type = 'login_failed'
)

Rust API Example:

let pattern = PatternBuilder::new()
    .name("brute_force_detection")
    .times(3, Some(10)) // 3 to 10 occurrences (the SQL form A{3,} has no upper bound)
    .event("failed_login", |e: &Event| e.event_type == "login_failed")
    .within(Duration::from_secs(300)) // 5 minutes
    .build()?;
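The looping pattern can be approximated with a sliding window over failure timestamps. This plain-Rust sketch (not HeliosDB's implementation) emits once when the count inside the window first reaches the threshold and then clears the window, roughly like AFTER MATCH SKIP PAST LAST ROW; a greedy {3,} match would instead keep extending. `failure_bursts` is a hypothetical name and assumes one user's failures in ascending order.

```rust
use std::collections::VecDeque;

/// Returns `(first_failure, last_failure)` each time `threshold` failures fall within
/// `window` seconds. `failure_ts` holds one user's login_failed timestamps, ascending.
pub fn failure_bursts(failure_ts: &[u64], threshold: usize, window: u64) -> Vec<(u64, u64)> {
    let mut recent: VecDeque<u64> = VecDeque::new();
    let mut bursts = Vec::new();
    for &ts in failure_ts {
        recent.push_back(ts);
        // Evict failures that are too old relative to the newest one.
        while let Some(&oldest) = recent.front() {
            if ts.saturating_sub(oldest) > window {
                recent.pop_front();
            } else {
                break;
            }
        }
        if recent.len() >= threshold {
            bursts.push((recent[0], ts));
            recent.clear(); // start looking for the next burst after this match
        }
    }
    bursts
}
```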

4. Alternation Patterns

Match any one of several alternative event sequences.

Use Case: Payment method selection

SQL Example:

SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY user_id
    ORDER BY event_time
    MEASURES
        CASE
            WHEN COUNT(B.*) > 0 THEN 'credit_card'
            WHEN COUNT(C.*) > 0 THEN 'paypal'
            ELSE 'unknown'
        END AS payment_method
    PATTERN ((A B) | (A C))
    DEFINE
        A AS A.event_type = 'checkout_started',
        B AS B.event_type = 'credit_card_selected',
        C AS C.event_type = 'paypal_selected'
)

Rust API Example:

let pattern = PatternBuilder::new()
    .name("payment_flow")
    .alternation()
    .branch(|b| b
        .event("checkout", |e: &Event| e.event_type == "checkout_started")
        .followed_by("credit", |e: &Event| e.event_type == "credit_card_selected")
    )
    .branch(|b| b
        .event("checkout", |e: &Event| e.event_type == "checkout_started")
        .followed_by("paypal", |e: &Event| e.event_type == "paypal_selected")
    )
    .within(Duration::from_secs(600)) // 10 minutes
    .build()?;
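Because both branches begin with the same checkout_started step, the alternation can be evaluated as one shared prefix followed by a choice. A minimal plain-Rust sketch under these assumptions: one user's events in timestamp order, unrelated events between steps are skipped, and a later checkout_started restarts the flow. `PaymentMethod` and `payment_choices` are hypothetical names.

```rust
/// Which branch of the alternation completed.
#[derive(Debug, PartialEq)]
pub enum PaymentMethod {
    CreditCard,
    PayPal,
}

/// Returns `(method, seconds_since_checkout)` for each checkout_started followed by
/// credit_card_selected or paypal_selected within `window` seconds.
pub fn payment_choices(events: &[(&str, u64)], window: u64) -> Vec<(PaymentMethod, u64)> {
    let mut checkout: Option<u64> = None; // timestamp of the shared first step
    let mut out = Vec::new();
    for &(kind, ts) in events {
        if kind == "checkout_started" {
            checkout = Some(ts); // (re)start the flow
            continue;
        }
        let Some(start) = checkout else { continue };
        if ts.saturating_sub(start) > window {
            checkout = None; // timed out before either branch completed
            continue;
        }
        let branch = match kind {
            "credit_card_selected" => Some(PaymentMethod::CreditCard),
            "paypal_selected" => Some(PaymentMethod::PayPal),
            _ => None,
        };
        if let Some(method) = branch {
            out.push((method, ts.saturating_sub(start)));
            checkout = None; // one branch completes the match
        }
    }
    out
}
```

Sharing the prefix means the checkout event is examined once rather than once per branch, which is also why the SQL form could be written as `A (B | C)`.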

5. Conditional Patterns

Match events against conditions that depend on earlier events in the same match.

Use Case: High-value transaction anomaly detection

SQL Example:

SELECT *
FROM events
MATCH_RECOGNIZE (
    PARTITION BY account_id
    ORDER BY event_time
    MEASURES
        A.amount AS suspicious_amount,
        AVG(B.amount) AS avg_normal_amount,
        (A.amount / AVG(B.amount)) AS anomaly_ratio
    PATTERN (B{5,} A)
    WITHIN INTERVAL '1' DAY
    DEFINE
        B AS B.amount BETWEEN 10 AND 100,
        A AS A.amount > AVG(B.amount) * 5
)

Rust API Example:

let pattern = PatternBuilder::new()
    .name("anomaly_detection")
    .sequence()
    .times(5, Some(100))
    .event("normal", |e: &Transaction| {
        e.amount >= 10.0 && e.amount <= 100.0
    })
    .followed_by_with_condition("anomaly", |e: &Transaction, ctx| {
        // Compare against the average of the events matched as "normal"
        let avg = ctx.average("normal", |t: &Transaction| t.amount);
        e.amount > avg * 5.0
    })
    .within(Duration::from_secs(86400)) // 1 day
    .build()?;
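The condition for A depends on an aggregate over the rows already matched as B. In a streaming matcher, that typically means keeping a running sum and count for the current B run instead of re-scanning it. A plain-Rust sketch under stated assumptions: amounts arrive in order for one account, the one-day window is ignored, the anomaly test is checked before the normal test, and any other amount breaks the run. `amount_anomalies` is a hypothetical name.

```rust
/// Returns `(index, amount, anomaly_ratio)` for each amount that follows at least
/// `min_normal` consecutive normal amounts (10..=100) and exceeds `factor` times their average.
pub fn amount_anomalies(amounts: &[f64], min_normal: usize, factor: f64) -> Vec<(usize, f64, f64)> {
    let mut run_sum = 0.0; // running aggregate over the current B run
    let mut run_len = 0usize;
    let mut out = Vec::new();
    for (i, &amount) in amounts.iter().enumerate() {
        if run_len >= min_normal && run_len > 0 {
            let avg = run_sum / run_len as f64;
            if amount > avg * factor {
                out.push((i, amount, amount / avg)); // A matched: emit and end this match
                run_sum = 0.0;
                run_len = 0;
                continue;
            }
        }
        if (10.0..=100.0).contains(&amount) {
            run_sum += amount; // another B row
            run_len += 1;
        } else {
            run_sum = 0.0; // neither B nor A: the run is broken
            run_len = 0;
        }
    }
    out
}
```

For amounts 20, 30, 25, 15, 10 followed by 600, the average of the normal run is 20, so 600 is flagged with a ratio of 30.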

Performance Considerations

1. NFA State Explosion

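Complex patterns, especially those with quantifiers and alternations, can keep many partial matches (NFA states) alive at once. Mitigations:

  • Keep patterns simple; as a rule of thumb, use at most 5-7 operators per pattern
  • Use time constraints to bound how long state lives
  • Partition by key to distribute load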

2. Memory Usage

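Each active pattern instance (partial match) keeps its own state in memory, so cap the number of instances and clean up expired state periodically: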

// Set the maximum number of pattern instances per partition
let config = CEPConfig {
    max_pattern_instances: 10_000,
    state_cleanup_interval: Duration::from_secs(60),
    ..Default::default()
};
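To illustrate what a cap like max_pattern_instances and a periodic cleanup bound, here is a hypothetical per-partition store of partial matches in plain Rust. It is not HeliosDB's implementation; in particular, the evict-oldest policy when the cap is hit is an assumption (an engine could instead reject the new run).

```rust
use std::collections::VecDeque;

/// Hypothetical bounded store of partial matches for one partition.
pub struct PartialMatches<T> {
    max_instances: usize,
    runs: VecDeque<(u64, T)>, // (creation time, run state), oldest first
}

impl<T> PartialMatches<T> {
    pub fn new(max_instances: usize) -> Self {
        Self { max_instances, runs: VecDeque::new() }
    }

    /// Adds a run. When the cap is reached, the oldest run is evicted and returned.
    pub fn push(&mut self, created: u64, state: T) -> Option<(u64, T)> {
        if self.max_instances == 0 {
            return Some((created, state)); // nothing can be stored
        }
        let evicted = if self.runs.len() >= self.max_instances {
            self.runs.pop_front()
        } else {
            None
        };
        self.runs.push_back((created, state));
        evicted
    }

    /// Periodic cleanup: drops runs older than `max_age` at time `now`, returning the count removed.
    pub fn cleanup(&mut self, now: u64, max_age: u64) -> usize {
        let before = self.runs.len();
        self.runs.retain(|(created, _)| now.saturating_sub(*created) <= max_age);
        before - self.runs.len()
    }

    pub fn len(&self) -> usize {
        self.runs.len()
    }

    pub fn is_empty(&self) -> bool {
        self.runs.is_empty()
    }
}
```

Memory per partition is then bounded by the cap times the size of one run's state, regardless of how many events arrive.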

3. Latency Optimization

  • Early Termination: Use NOT patterns for fast rejection
  • Pattern Ordering: Put the most selective conditions first so non-matching events are rejected early
  • Partitioning: Partition by high-cardinality keys to spread state and work evenly

Best Practices

1. Always Use Time Constraints

// ❌ BAD: unbounded pattern; partial matches can accumulate indefinitely
let pattern = PatternBuilder::new()
    .event("A", |e: &Event| e.event_type == "start")
    .followed_by("B", |e: &Event| e.event_type == "end")
    .build()?;

// ✅ GOOD: bounded by a time window
let pattern = PatternBuilder::new()
    .event("A", |e: &Event| e.event_type == "start")
    .followed_by("B", |e: &Event| e.event_type == "end")
    .within(Duration::from_secs(3600)) // 1 hour max
    .build()?;
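The difference is easy to quantify. This plain-Rust sketch (the hypothetical `live_partial_matches`, not HeliosDB code) counts partial matches still open after a stream in which `end` never arrives: without a window every `start` stays alive, while a window prunes runs older than the bound.

```rust
/// Counts partial matches still alive after processing `(kind, ts)` events.
/// Each "start" opens a run and an "end" completes all open runs.
/// With `window = Some(w)`, runs older than `w` seconds are pruned; with `None` they never are.
pub fn live_partial_matches(events: &[(&str, u64)], window: Option<u64>) -> usize {
    let mut open: Vec<u64> = Vec::new(); // start timestamps of open runs
    for &(kind, ts) in events {
        if let Some(w) = window {
            // Time bound: drop runs that can no longer complete.
            open.retain(|&start| ts.saturating_sub(start) <= w);
        }
        match kind {
            "start" => open.push(ts),
            "end" => open.clear(),
            _ => {}
        }
    }
    open.len()
}
```

With 1,000 start events spaced 10 seconds apart, the unbounded version ends with all 1,000 runs open, while a 3,600-second window ends with 361.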

2. Partition for Parallelism

let stream = source
    .partition_by(|event| event.user_id) // parallelize by user
    .match_pattern(pattern);
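Partitioning parallelizes well because each key's pattern state is independent, so events can be routed to workers by a hash of the key. A minimal sketch using std's `DefaultHasher`; `shard_for` and `route` are hypothetical helpers, not the `partition_by` implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Picks a worker for a partition key. Equal keys always map to the same worker,
/// so per-key pattern state never has to be shared across workers.
pub fn shard_for<K: Hash>(key: &K, workers: usize) -> usize {
    assert!(workers > 0, "need at least one worker");
    let mut hasher = DefaultHasher::new();
    key.hash(&mut hasher);
    (hasher.finish() % workers as u64) as usize
}

/// Routes `(key, payload)` events to per-worker queues, preserving the order of each key's events.
pub fn route<T>(events: Vec<(u64, T)>, workers: usize) -> Vec<Vec<(u64, T)>> {
    let mut queues: Vec<Vec<(u64, T)>> = (0..workers).map(|_| Vec::new()).collect();
    for (key, payload) in events {
        queues[shard_for(&key, workers)].push((key, payload));
    }
    queues
}
```

Per-key ordering is preserved because every event for a key goes to the same queue in arrival order, which is what ORDER BY within a partition relies on.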

3. Test with Realistic Data

use std::time::{Duration, Instant};

#[tokio::test]
async fn test_pattern_performance() -> Result<(), Box<dyn std::error::Error>> {
    let pattern = create_fraud_pattern();
    let test_stream = generate_realistic_events(100_000);

    let start = Instant::now();
    let matches = test_stream.match_pattern(pattern).await?;
    let duration = start.elapsed();

    assert!(duration < Duration::from_secs(10));
    assert!(!matches.is_empty());
    Ok(())
}

4. Monitor NFA State Size

// Export metrics for monitoring
metrics::gauge!("cep.active_patterns", active_count as f64);
metrics::histogram!("cep.pattern_match_latency", match_duration.as_millis() as f64);

Advanced Examples

Fraud Detection Pattern

use chrono::Utc;

let fraud_pattern = PatternBuilder::new()
    .name("velocity_fraud")
    .sequence()
    // Detect 5 to 10 transactions within 1 minute
    .times(5, Some(10))
    .event("transaction", |t: &Transaction| t.amount > 0.0)
    .within(Duration::from_secs(60))
    // Followed by a high-value transaction
    .followed_by("large_tx", |t: &Transaction| t.amount > 1000.0)
    .build()?;

let alerts = transactions
    .partition_by(|t| t.card_id)
    .match_pattern(fraud_pattern)
    .map(|matched| FraudAlert {
        card_id: matched.card_id,
        pattern: "velocity_fraud",
        confidence: 0.85,
        timestamp: Utc::now(),
    })
    .await?;
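One way to read the velocity pattern is as a sliding window of recent transaction timestamps per card, checked whenever a large transaction arrives. This plain-Rust sketch assumes one card's transactions in ascending time order and interprets the 60-second bound as covering the burst up to the large transaction, which may differ from the engine's exact semantics; `velocity_alerts` is a hypothetical name.

```rust
use std::collections::VecDeque;

/// Returns the timestamps of large transactions (`amount > large`) preceded by at least
/// `burst` transactions within the last `window` seconds. `txs` holds `(ts, amount)` pairs.
pub fn velocity_alerts(txs: &[(u64, f64)], burst: usize, window: u64, large: f64) -> Vec<u64> {
    let mut recent: VecDeque<u64> = VecDeque::new();
    let mut alerts = Vec::new();
    for &(ts, amount) in txs {
        // Keep only transactions inside the velocity window ending at this one.
        while let Some(&oldest) = recent.front() {
            if ts.saturating_sub(oldest) > window {
                recent.pop_front();
            } else {
                break;
            }
        }
        if amount > large && recent.len() >= burst {
            alerts.push(ts);
            recent.clear(); // one alert per burst
        } else if amount > 0.0 {
            recent.push_back(ts); // counts toward the next burst
        }
    }
    alerts
}
```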

Session Analysis

let session_pattern = PatternBuilder::new()
    .name("engaged_session")
    .sequence()
    .event("start", |e: &Event| e.event_type == "session_start")
    .times(10, Some(100)) // 10-100 interactions
    .followed_by_any("activity", |e: &Event| {
        e.event_type == "click" || e.event_type == "scroll"
    })
    .followed_by("conversion", |e: &Event| e.event_type == "purchase")
    .within(Duration::from_secs(1800)) // 30-minute session
    .build()?;

Troubleshooting

High Memory Usage

Symptom: OOM errors or steadily growing process memory

Solution:

  1. Reduce max_pattern_instances
  2. Decrease time windows
  3. Add more aggressive state cleanup:
let config = CEPConfig {
    state_cleanup_interval: Duration::from_secs(30),
    max_state_age: Duration::from_secs(3600),
    ..Default::default()
};

Slow Pattern Matching

Symptom: High latency in pattern matching

Solution:

  1. Simplify pattern complexity
  2. Increase parallelism
  3. Optimize event predicates (avoid expensive operations)

Missing Matches

Symptom: Expected patterns not detected

Solution:

  1. Check event ordering (ORDER BY in SQL)
  2. Verify partition keys
  3. Increase time windows
  4. Enable debug logging. RUST_LOG is read when the logger is initialized, so set it before the process starts:
export RUST_LOG=heliosdb_streaming::cep=debug

Last Updated: November 2025
Version: 1.0