
F6.23: Advanced Event-Driven Webhooks - Architecture Proposal


Feature ID: F6.23
Feature Name: Advanced Event-Driven Webhooks for HeliosDB v7.0
Document Type: Technical Architecture
Date: November 2, 2025
Author: Research Agent (HeliosDB Hive Mind)
Status: Architecture Proposal
Implementation Priority: P2 (v7.0 Innovation Feature)


Executive Summary

This document proposes the technical architecture for F6.23: Advanced Event-Driven Webhooks, a next-generation webhook system that integrates SQL-based filtering, exactly-once delivery, and template transformations into HeliosDB v7.0.

Key Innovations

  1. SQL-Based Event Filtering - Filter webhook events using SQL WHERE predicates
  2. Exactly-Once Delivery - Guarantee each webhook is delivered exactly once
  3. Template Transformations - Transform payloads using Jinja2 templates
  4. Unified CDC+Webhook System - Seamless integration with existing CDC infrastructure

Performance Targets

| Metric | Target | Justification |
| --- | --- | --- |
| End-to-End Latency | <100ms (p99) | CDC capture + filtering + template + delivery |
| Throughput | 10K webhooks/sec | Cluster-wide parallelized delivery |
| Delivery Success | 99.99% | With retries and circuit breakers |
| Exactly-Once | 100% guarantee | When enabled via checkpointing |

Integration with Existing Systems

  • heliosdb-cdc (Phase 1 complete, Phase 2 for pipeline integration)
  • heliosdb-webhooks (production-ready HTTP delivery)
  • heliosdb-streaming (exactly-once semantics available)

Table of Contents

  1. System Architecture
  2. Component Design
  3. Data Flow
  4. API Design
  5. Performance Optimization
  6. Deployment Architecture
  7. Monitoring and Observability
  8. Implementation Roadmap

1. System Architecture

1.1 High-Level Architecture

┌─────────────────────────────────────────────────────────────────────┐
│ F6.23 Advanced Webhook System │
│ (HeliosDB v7.0) │
└─────────────────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────────────────┐
│ Webhook Coordinator │
│ - Manages webhook registrations │
│ - Coordinates CDC → Filtering → Templates → Delivery │
│ - Handles checkpointing and recovery │
└───────────────────────┬─────────────────────────────────────────────┘
┌───────────────┼────────────────┬──────────────┐
│ │ │ │
▼ ▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ CDC Event │ │ SQL Filter │ │ Template │ │ Exactly-Once │
│ Capture │─▶│ Engine │─▶│ Engine │─▶│ Coordinator │
│ │ │ │ │ │ │ │
│ (heliosdb- │ │ - Parse SQL │ │ - Jinja2 │ │ - Checkpoint │
│ cdc) │ │ - Evaluate │ │ - Render │ │ - 2PC │
│ │ │ - Cache │ │ - Cache │ │ - Idempotent │
└──────────────┘ └──────────────┘ └──────────────┘ └──────┬───────┘
┌──────────────────────────────────────────────────┐
│ HTTP Webhook Delivery │
│ - Retry logic (exponential backoff) │
│ - Circuit breaker (failures → open) │
│ - Rate limiting (token bucket) │
│ - HMAC signatures (security) │
│ (heliosdb-webhooks) │
└──────────────────────────────────────────────────┘

1.2 Component Integration

Existing Components (Reuse):

  1. heliosdb-cdc (TenantCdcProcessor, TenantEventStream)
  2. heliosdb-webhooks (HTTP delivery, security, retry logic)
  3. heliosdb-streaming (checkpointing, exactly-once semantics)

New Components (Build):

  1. SqlFilterEngine - Evaluate SQL WHERE predicates on events
  2. TemplateEngine - Render Jinja2 templates for payloads
  3. WebhookCoordinator - Orchestrate end-to-end flow
  4. ExactlyOnceCoordinator - Manage checkpoints and 2PC

1.3 System Boundaries

Input Boundary: Database WAL (Write-Ahead Log)
Output Boundary: HTTP webhooks to external services
Internal State: Checkpoint store, delivery log, compiled filters, compiled templates


2. Component Design

2.1 Webhook Registration

Data Model:

pub struct WebhookRegistration {
    pub id: Uuid,
    pub name: String,
    pub table: String,                    // Target table
    pub sql_filter: Option<String>,       // SQL WHERE clause
    pub template: Option<String>,         // Jinja2 template
    pub endpoint: String,                 // HTTP URL
    pub method: HttpMethod,               // POST/PUT/PATCH
    pub headers: HashMap<String, String>, // Custom headers
    pub signature_config: Option<SignatureConfig>,
    pub retry_policy: RetryPolicy,
    pub rate_limit: Option<RateLimit>,
    pub circuit_breaker: Option<CircuitBreakerConfig>,
    pub exactly_once: bool,               // Enable exactly-once delivery
    pub enabled: bool,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}

pub struct SignatureConfig {
    pub algorithm: SignatureAlgorithm, // HmacSha256
    pub secret: String,                // Shared secret
    pub header: String,                // X-Signature
}

pub struct RetryPolicy {
    pub max_attempts: u32,         // Default: 5
    pub initial_delay_ms: u64,     // Default: 1000
    pub max_delay_ms: u64,         // Default: 300000
    pub backoff_multiplier: f64,   // Default: 2.0
    pub retry_on_status: Vec<u16>, // [408, 429, 500, 502, 503, 504]
}
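With these defaults, retry delays follow capped exponential backoff (1s, 2s, 4s, ... up to 300s). A minimal sketch of the delay computation (the function name is illustrative, not part of the proposed API):

```rust
/// Delay before retry `attempt` (1-based): capped exponential backoff.
fn backoff_delay_ms(initial_ms: u64, max_ms: u64, multiplier: f64, attempt: u32) -> u64 {
    let raw = initial_ms as f64 * multiplier.powi(attempt.saturating_sub(1) as i32);
    (raw as u64).min(max_ms)
}

fn main() {
    // With the defaults above: 1s, 2s, 4s, 8s, 16s, ... capped at 300s.
    for attempt in 1..=10 {
        println!("attempt {attempt}: {} ms", backoff_delay_ms(1000, 300_000, 2.0, attempt));
    }
}
```

The cap matters: without `max_delay_ms`, attempt 10 would wait over 8 minutes.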

SQL DDL:

CREATE WEBHOOK high_value_orders
ON TABLE orders
FILTER WHERE amount > 1000 AND status = 'pending'
TEMPLATE '{"order_id": {{ event.new_data.id }}, "amount": {{ event.new_data.amount }}}'
TO ENDPOINT 'https://api.example.com/webhooks/high-value-orders'
WITH (
    method = 'POST',
    headers = '{"Authorization": "Bearer secret_token"}',
    retry_max_attempts = 5,
    exactly_once = true,
    rate_limit = 100 per minute,
    circuit_breaker_enabled = true,
    enabled = true
);

2.2 SQL Filter Engine

Architecture:

pub struct SqlFilterEngine {
    parser: SqlParser,            // Parse SQL WHERE clauses
    executor: Arc<FilterExecutor>, // Evaluate predicates
    compiled_cache: Arc<RwLock<HashMap<String, CompiledFilter>>>,
}

pub struct CompiledFilter {
    predicate_ast: Expr,             // Parsed AST
    referenced_columns: Vec<String>, // Columns used in predicate
    estimated_cost: u64,             // Evaluation cost estimate
}

impl SqlFilterEngine {
    pub async fn evaluate(&self, predicate: &str, event: &ChangeEvent) -> Result<bool> {
        // 1. Check compiled cache
        let compiled = self.get_or_compile(predicate).await?;
        // 2. Extract row data (only referenced columns)
        let row = self.extract_row(event, &compiled.referenced_columns)?;
        // 3. Evaluate predicate against row
        self.executor.evaluate(&compiled.predicate_ast, &row)
    }

    async fn get_or_compile(&self, predicate: &str) -> Result<CompiledFilter> {
        // Cache hit
        if let Some(cached) = self.compiled_cache.read().await.get(predicate) {
            return Ok(cached.clone());
        }
        // Cache miss - compile and cache
        let compiled = self.compile(predicate)?;
        self.compiled_cache.write().await.insert(predicate.to_string(), compiled.clone());
        Ok(compiled)
    }

    fn compile(&self, predicate: &str) -> Result<CompiledFilter> {
        // Parse SQL WHERE clause
        let ast = self.parser.parse_where_clause(predicate)?;
        // Extract referenced columns
        let columns = self.extract_columns(&ast)?;
        // Estimate evaluation cost
        let cost = self.estimate_cost(&ast)?;
        Ok(CompiledFilter {
            predicate_ast: ast,
            referenced_columns: columns,
            estimated_cost: cost,
        })
    }
}

Example Evaluation:

let filter = SqlFilterEngine::new();
let predicate = "amount > 1000 AND status = 'pending'";
let event = ChangeEvent {
    table: "orders",
    operation: ChangeType::Insert,
    new_data: json!({
        "id": 12345,
        "amount": 1500.00,
        "status": "pending",
        "user_id": 67890
    }),
    ..Default::default()
};
// Evaluates to true
assert!(filter.evaluate(predicate, &event).await?);

Optimization Strategies:

  1. Compiled Predicate Caching - Cache parsed AST (99%+ cache hit rate)
  2. Column Pruning - Extract only referenced columns from event (10x faster)
  3. Early Termination - Short-circuit AND/OR evaluation
  4. Index Acceleration - Pre-build indexes on commonly filtered columns (future)
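Early termination (strategy 3) means a cheap failing conjunct prevents evaluation of the rest of the predicate. A toy sketch with an evaluation counter (the `Pred` type here is a stand-in for the engine's richer `Expr`):

```rust
/// Minimal predicate AST; the real engine's `Expr` is far richer.
enum Pred {
    Const(bool),
    And(Box<Pred>, Box<Pred>),
    Or(Box<Pred>, Box<Pred>),
}

/// Evaluate with short-circuiting; `evals` counts leaf evaluations performed.
fn eval(p: &Pred, evals: &mut u32) -> bool {
    match p {
        Pred::Const(b) => { *evals += 1; *b }
        Pred::And(l, r) => eval(l, evals) && eval(r, evals), // `&&` short-circuits
        Pred::Or(l, r) => eval(l, evals) || eval(r, evals),  // `||` short-circuits
    }
}

fn main() {
    // `false AND <anything>` never evaluates the right-hand side.
    let p = Pred::And(Box::new(Pred::Const(false)), Box::new(Pred::Const(true)));
    let mut evals = 0;
    assert_eq!(eval(&p, &mut evals), false);
    assert_eq!(evals, 1); // the right-hand leaf was skipped
}
```

In the real engine the same effect comes from ordering conjuncts by estimated cost, so cheap comparisons run before expensive ones.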

2.3 Template Engine

Architecture:

pub struct TemplateEngine {
    tera: Tera, // Jinja2-style engine (tera crate)
    compiled_cache: Arc<RwLock<HashMap<String, CompiledTemplate>>>,
}

impl TemplateEngine {
    pub fn new() -> Self {
        let mut tera = Tera::default();
        // Register custom filters
        tera.register_filter("mask_email", mask_email);
        tera.register_filter("mask_phone", mask_phone);
        tera.register_filter("unix_timestamp", unix_timestamp);
        tera.register_filter("iso8601", iso8601);
        Self {
            tera,
            compiled_cache: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    pub async fn render(&self, template: &str, context: &WebhookContext) -> Result<String> {
        // 1. Create Tera context
        let mut tera_context = tera::Context::new();
        tera_context.insert("event", &context.event);
        tera_context.insert("webhook", &context.webhook);
        tera_context.insert("timestamp", &Utc::now().to_rfc3339());
        // 2. Render template
        self.tera.render_str(template, &tera_context)
            .map_err(|e| WebhookError::TemplateRenderError(e.to_string()))
    }
}

Custom Filters:

fn mask_email(value: &str) -> Result<String, tera::Error> {
    let parts: Vec<&str> = value.split('@').collect();
    if parts.len() == 2 {
        let local = parts[0];
        let domain = parts[1];
        let masked = if local.len() > 2 {
            format!("{}***{}", &local[..1], &local[local.len() - 1..])
        } else {
            "***".to_string()
        };
        Ok(format!("{}@{}", masked, domain))
    } else {
        Ok("***@***.com".to_string())
    }
}
// john.doe@example.com → j***e@example.com
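`mask_phone` is registered in `TemplateEngine::new` but not shown. One plausible sketch, in the same simplified string-in, string-out form (the exact masking policy is an assumption, not specified by this proposal), keeps only the last four digits:

```rust
/// Hypothetical mask_phone policy: keep the last four digits, mask the rest.
fn mask_phone(value: &str) -> String {
    // Strip formatting so "+1-555-123-4567" and "5551234567" mask the same way.
    let digits: String = value.chars().filter(|c| c.is_ascii_digit()).collect();
    if digits.len() > 4 {
        format!("***{}", &digits[digits.len() - 4..])
    } else {
        "***".to_string() // Too short to reveal anything
    }
}

fn main() {
    // "+1-555-123-4567" → "***4567"
    assert_eq!(mask_phone("+1-555-123-4567"), "***4567");
}
```

In the actual engine this would be wrapped to match tera's filter signature, as with `mask_email` above.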

Template Examples:

{# Example 1: Slack webhook #}
{
"text": "New order #{{ event.new_data.id }}",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Order #{{ event.new_data.id }}*\nAmount: ${{ event.new_data.amount }}"
}
}
]
}
{# Example 2: PII masking #}
{
"user_id": {{ event.new_data.id }},
"email": "{{ event.new_data.email | mask_email }}",
"phone": "{{ event.new_data.phone | mask_phone }}",
"action": "{{ event.operation }}"
}
{# Example 3: Multi-tenant customization #}
{% if tenant.format == 'slack' %}
{"text": "{{ event.new_data.message }}", "channel": "{{ tenant.channel }}"}
{% elif tenant.format == 'discord' %}
{"content": "{{ event.new_data.message }}", "username": "HeliosDB"}
{% else %}
{"event": "{{ event.operation }}", "data": {{ event.new_data | tojson }}}
{% endif %}

2.4 Exactly-Once Coordinator

Architecture:

pub struct ExactlyOnceCoordinator {
    checkpoint_store: Arc<CheckpointStore>, // S3, distributed FS
    delivery_log: Arc<DeliveryLog>,         // PostgreSQL, Redis
    webhook_sink: Arc<ExactlyOnceWebhookSink>,
    checkpoint_interval: Duration,          // 10 seconds default
}

impl ExactlyOnceCoordinator {
    pub async fn process_event_stream(&mut self, mut stream: TenantEventStream) -> Result<()> {
        let mut checkpoint_id = self.load_last_checkpoint().await?;
        let mut batch = Vec::new();
        let mut last_checkpoint = Instant::now();
        while let Some(event) = stream.recv().await {
            batch.push(event);
            // Checkpoint on interval or batch size
            if last_checkpoint.elapsed() >= self.checkpoint_interval || batch.len() >= 1000 {
                checkpoint_id += 1;
                self.process_batch(checkpoint_id, &batch).await?;
                batch.clear();
                last_checkpoint = Instant::now();
            }
        }
        // Process remaining events
        if !batch.is_empty() {
            checkpoint_id += 1;
            self.process_batch(checkpoint_id, &batch).await?;
        }
        Ok(())
    }

    async fn process_batch(&self, checkpoint_id: u64, batch: &[ChangeEvent]) -> Result<()> {
        // Phase 1: Pre-commit (prepare webhooks)
        self.webhook_sink.pre_commit(checkpoint_id, batch.to_vec()).await?;
        // Phase 2: Commit (deliver webhooks)
        match self.webhook_sink.commit(checkpoint_id).await {
            Ok(_) => {
                // Save checkpoint
                self.checkpoint_store.save(checkpoint_id).await?;
                Ok(())
            }
            Err(e) => {
                // Abort (rollback)
                self.webhook_sink.abort(checkpoint_id).await?;
                Err(e)
            }
        }
    }
}

Two-Phase Commit Webhook Sink:

pub struct ExactlyOnceWebhookSink {
    delivery_log: Arc<DeliveryLog>,
    http_client: reqwest::Client,
    retry_policy: RetryPolicy,
    pending_deliveries: Arc<Mutex<HashMap<u64, Vec<PendingDelivery>>>>,
}

#[async_trait]
impl TwoPhaseCommitSink<WebhookEvent> for ExactlyOnceWebhookSink {
    async fn pre_commit(&mut self, checkpoint_id: u64, events: Vec<WebhookEvent>) -> Result<()> {
        let mut pending = Vec::new();
        for event in events {
            // Check idempotency log
            if self.delivery_log.is_delivered(&event.id).await? {
                continue; // Skip duplicate
            }
            // Log delivery intent
            self.delivery_log.log_intent(checkpoint_id, &event.id, &event.url).await?;
            // Prepare HTTP request (but don't send yet)
            let request = self.build_webhook_request(&event)?;
            pending.push(PendingDelivery { event, request });
        }
        // Store pending deliveries
        self.pending_deliveries.lock().await.insert(checkpoint_id, pending);
        Ok(())
    }

    async fn commit(&mut self, checkpoint_id: u64) -> Result<()> {
        // Retrieve pending deliveries
        let pending = self.pending_deliveries.lock().await.remove(&checkpoint_id)
            .ok_or(WebhookError::CheckpointNotFound(checkpoint_id))?;
        // Deliver all webhooks
        for delivery in pending {
            // Send HTTP request
            let response = self.http_client.execute(delivery.request).await?;
            if response.status().is_success() {
                // Mark as delivered in log
                self.delivery_log.mark_delivered(&delivery.event.id).await?;
            } else {
                return Err(WebhookError::DeliveryFailed(response.status()));
            }
        }
        Ok(())
    }

    async fn abort(&mut self, checkpoint_id: u64) -> Result<()> {
        // Discard pending deliveries
        self.pending_deliveries.lock().await.remove(&checkpoint_id);
        // Rollback delivery intents in log
        self.delivery_log.rollback_intent(checkpoint_id).await?;
        Ok(())
    }
}

Idempotency Log (PostgreSQL):

CREATE TABLE webhook_delivery_log (
    event_id TEXT PRIMARY KEY,
    checkpoint_id BIGINT NOT NULL,
    webhook_url TEXT NOT NULL,
    status TEXT NOT NULL CHECK (status IN ('intent', 'delivered', 'failed')),
    delivered_at TIMESTAMPTZ,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX idx_webhook_delivery_checkpoint ON webhook_delivery_log(checkpoint_id);
CREATE INDEX idx_webhook_delivery_status ON webhook_delivery_log(status);
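The lifecycle encoded by the `status` column (intent → delivered, or rolled back on abort) can be sketched with an in-memory stand-in for the log. This is illustrative only; the real store is PostgreSQL or Redis as noted above:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Debug)]
enum Status { Intent, Delivered }

/// In-memory stand-in for the PostgreSQL delivery log above.
#[derive(Default)]
struct DeliveryLog {
    entries: HashMap<String, (u64, Status)>, // event_id → (checkpoint_id, status)
}

impl DeliveryLog {
    fn is_delivered(&self, event_id: &str) -> bool {
        matches!(self.entries.get(event_id), Some((_, Status::Delivered)))
    }
    fn log_intent(&mut self, checkpoint_id: u64, event_id: &str) {
        self.entries.insert(event_id.to_string(), (checkpoint_id, Status::Intent));
    }
    fn mark_delivered(&mut self, event_id: &str) {
        if let Some(e) = self.entries.get_mut(event_id) { e.1 = Status::Delivered; }
    }
    /// Abort path: drop all intents belonging to a failed checkpoint,
    /// leaving already-delivered entries untouched.
    fn rollback_intent(&mut self, checkpoint_id: u64) {
        self.entries.retain(|_, (cp, st)| !(*cp == checkpoint_id && *st == Status::Intent));
    }
}

fn main() {
    let mut log = DeliveryLog::default();
    log.log_intent(1, "evt-1");
    log.mark_delivered("evt-1");
    log.log_intent(2, "evt-2");
    log.rollback_intent(2); // checkpoint 2 aborted
    assert!(log.is_delivered("evt-1"));
    assert!(!log.is_delivered("evt-2")); // intent was rolled back, will retry
}
```

On recovery, `is_delivered` is exactly the check that lets the sink skip duplicates in `pre_commit`.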

2.5 Webhook Coordinator

Architecture:

pub struct WebhookCoordinator {
    cdc_processor: TenantCdcProcessor,
    sql_filter: Arc<SqlFilterEngine>,
    template_engine: Arc<TemplateEngine>,
    exactly_once_coordinator: Option<ExactlyOnceCoordinator>,
    webhooks: Arc<RwLock<HashMap<String, WebhookRegistration>>>,
}

impl WebhookCoordinator {
    pub async fn start(&mut self) -> Result<()> {
        // Start CDC processor
        self.cdc_processor.start().await?;
        // Get event stream
        let mut event_stream = self.cdc_processor.event_stream();
        // Process events
        while let Some(event) = event_stream.recv().await {
            self.process_event(event).await?;
        }
        Ok(())
    }

    async fn process_event(&self, event: ChangeEvent) -> Result<()> {
        // Find matching webhooks
        let webhooks = self.find_matching_webhooks(&event).await?;
        for webhook in webhooks {
            // 1. SQL filtering
            if let Some(filter) = &webhook.sql_filter {
                if !self.sql_filter.evaluate(filter, &event).await? {
                    continue; // Event doesn't match filter
                }
            }
            // 2. Template transformation
            let payload = if let Some(template) = &webhook.template {
                let context = WebhookContext { event: &event, webhook: &webhook };
                self.template_engine.render(template, &context).await?
            } else {
                serde_json::to_string(&event)?
            };
            // 3. Deliver webhook
            if webhook.exactly_once {
                // Use exactly-once coordinator
                self.exactly_once_coordinator.as_ref()
                    .ok_or(WebhookError::ExactlyOnceNotConfigured)?
                    .deliver_webhook(&webhook, payload).await?;
            } else {
                // Direct delivery (at-least-once)
                self.deliver_webhook_best_effort(&webhook, payload).await?;
            }
        }
        Ok(())
    }
}

3. Data Flow

3.1 End-to-End Flow

1. Application writes to database
2. WAL entry created
3. CDC EventProcessor reads WAL (heliosdb-cdc)
4. TenantEventFilter filters by tenant_id and table
5. TenantEventSink buffers events in ring buffer
6. TenantEventStream yields events to WebhookCoordinator
7. WebhookCoordinator finds matching webhooks (table name)
8. SQL Filter Engine evaluates WHERE predicate
↓ (if passes filter)
9. Template Engine renders payload (if template configured)
10. ExactlyOnceCoordinator batches events for checkpoint
11. ExactlyOnceWebhookSink pre-commits (logs intent)
12. HTTP client delivers webhooks
13. Idempotency log marks as delivered
14. Checkpoint saved
15. On failure: Recover from checkpoint, check idempotency log, skip delivered

3.2 Latency Breakdown

| Stage | Latency (p50) | Latency (p99) | Notes |
| --- | --- | --- | --- |
| CDC Capture | 2ms | 5ms | WAL read + parse |
| Event Stream | 0.5ms | 1ms | Ring buffer dequeue |
| Table Matching | 0.1ms | 0.2ms | Hash map lookup |
| SQL Filtering | 0.5ms | 2ms | Compiled predicate eval |
| Template Rendering | 0.3ms | 1ms | Pre-compiled template |
| Checkpoint Barrier | 0.01ms | 0.05ms | Amortized (every 10s) |
| HTTP Delivery | 30ms | 80ms | Network latency (depends on target) |
| Delivery Log | 2ms | 5ms | PostgreSQL/Redis write |
| **Total** | ~36ms | ~95ms | Meets <100ms target |

4. API Design

4.1 SQL DDL

-- Create webhook
CREATE WEBHOOK <name>
ON TABLE <table>
[FILTER WHERE <predicate>]
[TEMPLATE '<jinja2_template>']
TO ENDPOINT '<url>'
[WITH (
    method = 'POST' | 'PUT' | 'PATCH',
    headers = '<json>',
    retry_max_attempts = <int>,
    retry_initial_delay_ms = <int>,
    retry_backoff_multiplier = <float>,
    exactly_once = true | false,
    rate_limit = <int> per minute | hour | day,
    circuit_breaker_enabled = true | false,
    circuit_breaker_threshold = <int>,
    enabled = true | false
)];
-- Update webhook
ALTER WEBHOOK <name> SET enabled = true | false;
ALTER WEBHOOK <name> SET filter WHERE <new_predicate>;
ALTER WEBHOOK <name> SET template '<new_template>';
ALTER WEBHOOK <name> SET rate_limit <int> per minute;
-- Drop webhook
DROP WEBHOOK <name>;
-- List webhooks
SELECT * FROM helios_webhooks;
-- Metrics
SELECT * FROM helios_webhook_metrics WHERE webhook_name = '<name>';
-- Delivery log
SELECT * FROM helios_webhook_delivery_log
WHERE webhook_name = '<name>'
ORDER BY delivered_at DESC
LIMIT 100;
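The `rate_limit = <int> per minute` option maps onto the token-bucket limiting used on the delivery path. A minimal std-only sketch (type and method names are illustrative, not the proposed API):

```rust
use std::time::{Duration, Instant};

/// Token bucket: holds up to `capacity` tokens, refilled continuously
/// at `capacity` tokens per `period`.
struct TokenBucket {
    capacity: f64,
    tokens: f64,
    refill_per_sec: f64,
    last: Instant,
}

impl TokenBucket {
    fn new(rate: u32, period: Duration) -> Self {
        let capacity = rate as f64;
        Self {
            capacity,
            tokens: capacity, // start full: allows an initial burst
            refill_per_sec: capacity / period.as_secs_f64(),
            last: Instant::now(),
        }
    }

    /// Try to consume one token; returns false when rate-limited.
    fn try_acquire(&mut self) -> bool {
        let now = Instant::now();
        // Refill proportionally to elapsed time, capped at capacity.
        self.tokens = (self.tokens
            + now.duration_since(self.last).as_secs_f64() * self.refill_per_sec)
            .min(self.capacity);
        self.last = now;
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

fn main() {
    // rate_limit = 100 per minute: a burst of 150 sends is capped near 100.
    let mut bucket = TokenBucket::new(100, Duration::from_secs(60));
    let allowed = (0..150).filter(|_| bucket.try_acquire()).count();
    assert!(allowed >= 100 && allowed <= 105);
}
```

Rejected webhooks would be queued and retried rather than dropped, per the retry policy.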

4.2 REST API

POST /api/webhooks - Create webhook
GET /api/webhooks - List webhooks
GET /api/webhooks/:id - Get webhook details
PUT /api/webhooks/:id - Update webhook
DELETE /api/webhooks/:id - Delete webhook
POST /api/webhooks/:id/enable - Enable webhook
POST /api/webhooks/:id/disable - Disable webhook
GET /api/webhooks/:id/metrics - Get metrics
GET /api/webhooks/:id/logs - Get delivery logs (last 100)
POST /api/webhooks/:id/test - Test webhook (send sample event)

Example Request (Create Webhook):

curl -X POST http://localhost:8080/api/webhooks \
  -H "Content-Type: application/json" \
  -d '{
    "name": "high_value_orders",
    "table": "orders",
    "sql_filter": "amount > 1000 AND status = '\''pending'\''",
    "template": "{\"order_id\": {{ event.new_data.id }}, \"amount\": {{ event.new_data.amount }}}",
    "endpoint": "https://api.example.com/webhooks/orders",
    "method": "POST",
    "headers": {"Authorization": "Bearer secret_token"},
    "retry_policy": {
      "max_attempts": 5,
      "initial_delay_ms": 1000,
      "backoff_multiplier": 2.0
    },
    "exactly_once": true,
    "rate_limit": 100,
    "enabled": true
  }'

5. Performance Optimization

5.1 Compiled Predicate Caching

Strategy: Pre-compile SQL predicates to AST, cache in memory

Implementation:

let compiled_cache: Arc<RwLock<HashMap<String, CompiledFilter>>> = ...;

// Cache hit (99%+ of requests)
if let Some(cached) = compiled_cache.read().await.get(predicate) {
    return cached.clone(); // <1μs
}

// Cache miss (1% of requests)
let compiled = compile_predicate(predicate)?; // ~10ms
compiled_cache.write().await.insert(predicate.to_string(), compiled);

Expected Performance:

  • Cache hit: <1μs (memory lookup)
  • Cache miss: ~10ms (parsing + optimization)
  • Cache eviction: LRU, max 10,000 entries (~10MB RAM)

5.2 Column Pruning

Strategy: Extract only columns referenced in SQL predicate

Example:

// Predicate: "amount > 1000 AND status = 'pending'"
// Referenced columns: ["amount", "status"]

// Full row extraction (slow)
let row = event.new_data.clone(); // Copies all columns

// Pruned extraction (fast)
let row = HashMap::from([
    ("amount", event.new_data["amount"].clone()),
    ("status", event.new_data["status"].clone()),
]);

Expected Performance: 10x faster for events with many columns (e.g., 50+ columns)

5.3 Template Caching

Strategy: Pre-compile Jinja2 templates, cache in memory

Implementation:

let template_cache: Arc<RwLock<HashMap<String, CompiledTemplate>>> = ...;

// Warm path (compiled template cached)
if let Some(compiled) = template_cache.read().await.get(template) {
    return self.tera.render_compiled(compiled, context); // <0.5ms
}

// Cold path (first render): compile, cache, then render
let compiled = self.tera.compile_str(template)?; // ~10ms
template_cache.write().await.insert(template.to_string(), compiled.clone());
self.tera.render_compiled(&compiled, context)

Expected Performance:

  • Warm path: <0.5ms (cached template)
  • Cold path: ~10ms (parsing + optimization)

6. Deployment Architecture

6.1 Single-Node Deployment

┌────────────────────────────────────────────┐
│ HeliosDB Single Node │
│ │
│ ┌──────────────┐ ┌──────────────┐ │
│ │ PostgreSQL │──────│ heliosdb- │ │
│ │ (WAL) │ │ cdc │ │
│ └──────────────┘ └───────┬──────┘ │
│ │ │
│ ┌─────────▼───────┐ │
│ │ F6.23 Webhook │ │
│ │ Coordinator │ │
│ └─────────┬───────┘ │
│ │ │
│ ▼ │
│ External Webhooks │
└────────────────────────────────────────────┘

6.2 Multi-Node Deployment (Production)

┌────────────────────────────────────────────────────────────────┐
│ Load Balancer (HAProxy) │
└───────────────────────┬────────────────────────────────────────┘
┌───────────────┼───────────────┐
│ │ │
▼ ▼ ▼
┌──────────────┐ ┌──────────────┐ ┌──────────────┐
│ HeliosDB │ │ HeliosDB │ │ HeliosDB │
│ Node 1 │ │ Node 2 │ │ Node 3 │
│ │ │ │ │ │
│ (Shard: │ │ (Shard: │ │ (Shard: │
│ tenants │ │ tenants │ │ tenants │
│ 0-999) │ │ 1000-1999) │ │ 2000-2999) │
└──────┬───────┘ └──────┬───────┘ └──────┬───────┘
│ │ │
└─────────────────┼─────────────────┘
┌─────────────────────┐
│ Shared Services │
│ - Checkpoint Store │
│ (S3) │
│ - Delivery Log │
│ (PostgreSQL) │
│ - Metrics DB │
│ (Prometheus) │
└─────────────────────┘

Sharding Strategy: Shard by tenant_id or table name

Scaling Targets:

  • Nodes: 3-10 (horizontal scaling)
  • Webhooks/sec: 10K+ (cluster-wide)
  • Tenants: 1000+ (sharded across nodes)
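The range-based tenant sharding shown in the diagram above reduces to simple arithmetic; a sketch (function name and parameters are illustrative):

```rust
/// Route a tenant to a node using the contiguous ranges in the diagram
/// (Node 1: tenants 0-999, Node 2: 1000-1999, Node 3: 2000-2999, ...).
fn shard_for(tenant_id: u64, tenants_per_shard: u64, num_nodes: u64) -> u64 {
    (tenant_id / tenants_per_shard) % num_nodes
}

fn main() {
    assert_eq!(shard_for(42, 1000, 3), 0);   // Node 1 in the diagram
    assert_eq!(shard_for(1500, 1000, 3), 1); // Node 2
    assert_eq!(shard_for(2999, 1000, 3), 2); // Node 3
}
```

Hash-based sharding (e.g. on table name) would replace the division with a hash, trading range locality for more even load.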

7. Monitoring and Observability

7.1 Metrics

Key Metrics (Prometheus format):

# Webhook delivery metrics
heliosdb_webhook_deliveries_total{webhook_name, status} - Counter
heliosdb_webhook_delivery_latency_seconds{webhook_name, quantile} - Histogram
heliosdb_webhook_delivery_errors_total{webhook_name, error_type} - Counter
# SQL filtering metrics
heliosdb_webhook_filter_evaluations_total{webhook_name} - Counter
heliosdb_webhook_filter_latency_seconds{webhook_name, quantile} - Histogram
heliosdb_webhook_filter_matches_total{webhook_name} - Counter
# Template rendering metrics
heliosdb_webhook_template_renders_total{webhook_name} - Counter
heliosdb_webhook_template_latency_seconds{webhook_name, quantile} - Histogram
heliosdb_webhook_template_errors_total{webhook_name, error_type} - Counter
# Exactly-once metrics
heliosdb_webhook_checkpoint_saves_total - Counter
heliosdb_webhook_checkpoint_save_latency_seconds{quantile} - Histogram
heliosdb_webhook_idempotency_checks_total - Counter
heliosdb_webhook_duplicate_deliveries_prevented_total - Counter
# Circuit breaker metrics
heliosdb_webhook_circuit_breaker_state{webhook_name, state} - Gauge
heliosdb_webhook_circuit_breaker_transitions_total{webhook_name, from, to} - Counter
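The `circuit_breaker_state` gauge above tracks a standard closed → open → half-open state machine. A minimal count-based sketch (the real breaker would add a cooldown timer before allowing the half-open probe):

```rust
#[derive(Clone, Copy, PartialEq, Debug)]
enum State { Closed, Open, HalfOpen }

/// Count-based circuit breaker: opens after `threshold` consecutive
/// failures; a probe in HalfOpen either closes it again or re-opens it.
struct CircuitBreaker {
    state: State,
    failures: u32,
    threshold: u32,
}

impl CircuitBreaker {
    fn new(threshold: u32) -> Self {
        Self { state: State::Closed, failures: 0, threshold }
    }
    /// In a real breaker, Open → HalfOpen happens after a cooldown elapses.
    fn allow_probe(&mut self) {
        if self.state == State::Open { self.state = State::HalfOpen; }
    }
    fn record_success(&mut self) {
        self.failures = 0;
        self.state = State::Closed;
    }
    fn record_failure(&mut self) {
        self.failures += 1;
        if self.state == State::HalfOpen || self.failures >= self.threshold {
            self.state = State::Open;
        }
    }
}

fn main() {
    let mut cb = CircuitBreaker::new(3);
    for _ in 0..3 { cb.record_failure(); }
    assert_eq!(cb.state, State::Open); // failures → open, as in the diagram
    cb.allow_probe();
    cb.record_success();
    assert_eq!(cb.state, State::Closed); // probe succeeded, traffic resumes
}
```

Each transition here corresponds to an increment of the `circuit_breaker_transitions_total{from, to}` counter.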

7.2 Dashboards

Grafana Dashboard: F6.23 Webhooks Overview

Panels:

  1. Delivery Success Rate (99.99% target)
  2. End-to-End Latency (p50, p99, p999)
  3. Throughput (webhooks/sec)
  4. Error Rate by Type
  5. Circuit Breaker Status
  6. SQL Filter Performance
  7. Template Render Performance
  8. Exactly-Once Checkpoint Lag

8. Implementation Roadmap

8.1 Phase 1: Foundation (Weeks 1-3)

Deliverables:

  • SqlFilterEngine (500-700 LOC)
  • TemplateEngine integration (300-400 LOC)
  • WebhookRegistration API (400-500 LOC)
  • Basic integration tests (200-300 LOC)

Milestones:

  • Week 1: SqlFilterEngine + unit tests
  • Week 2: TemplateEngine + unit tests
  • Week 3: WebhookRegistration API + integration

Estimated LOC: 1,600-2,200 LOC

8.2 Phase 2: Exactly-Once (Weeks 4-7)

Deliverables:

  • ExactlyOnceCoordinator (600-800 LOC)
  • ExactlyOnceWebhookSink (500-700 LOC)
  • DeliveryLog (300-400 LOC)
  • Recovery logic (400-500 LOC)
  • Integration tests (400-500 LOC)

Milestones:

  • Weeks 4-5: Coordinator + 2PC sink
  • Week 6: DeliveryLog + recovery
  • Week 7: Testing + performance tuning

Estimated LOC: 2,200-2,900 LOC

8.3 Phase 3: Production Hardening (Weeks 8-10)

Deliverables:

  • Circuit breaker (200-300 LOC)
  • Advanced rate limiting (300-400 LOC)
  • Monitoring and metrics (400-500 LOC)
  • Documentation (1,000+ lines)
  • Load testing (300-400 LOC)

Milestones:

  • Week 8: Circuit breaker + rate limiting
  • Week 9: Monitoring + documentation
  • Week 10: Load testing + optimization

Estimated LOC: 1,200-1,600 LOC

8.4 Total Estimates

Total Timeline: 10 weeks (2.5 months)
Total LOC: 5,000-7,000 (production code)
Total Tests: 60-80 tests
Resource Requirements: 2-3 engineers full-time


Conclusion

F6.23: Advanced Event-Driven Webhooks provides a world-class webhook system with SQL filtering, exactly-once delivery, and template transformations. The architecture leverages existing HeliosDB infrastructure (CDC, streaming, webhooks) while adding novel capabilities that differentiate HeliosDB from all competitors.

Key Differentiators:

  • SQL-based filtering (no competitor offers SQL predicates on webhook events)
  • Exactly-once delivery (Debezium offers exactly-once semantics, but not for webhook delivery)
  • Template transformations (common on their own, but unique when combined with CDC and exactly-once delivery)
  • Unified system (no competitor integrates all three)

Business Impact:

  • Market Differentiation: First database with SQL-filtered, exactly-once webhooks
  • ARR Potential: $15-25M (based on event-driven architecture adoption)
  • Competitive Moat: 18-24 month technical lead
  • Target Customers: E-commerce, fintech, real-time analytics, IoT

Next Steps:

  1. Executive approval for v7.0 roadmap
  2. Budget allocation ($400K-500K for 2-3 engineers × 2.5 months)
  3. Defensive publication (establish prior art)
  4. Phase 1 kickoff (SqlFilterEngine + TemplateEngine)

Document Version: 1.0
Status: Architecture Proposal
Author: Research Agent (HeliosDB Hive Mind)
Date: November 2, 2025


END OF ARCHITECTURE DOCUMENT