ML Feedback Loop System for Intelligent Filter Advisor

Overview

HeliosDB-Lite implements an automatic, self-optimizing filter management system that applies machine learning principles to continuously improve its filter recommendations. It requires no manual model training: the system learns online from metrics collected during actual query execution.

Architecture

┌───────────────────────────────────────────────────────────────────────┐
│                         QUERY EXECUTION LAYER                         │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                         Query Optimizer                         │  │
│  │  FilterCostModel.estimate_filter_cost()                         │  │
│  │  → Decides whether to use filter based on cost/benefit          │  │
│  └─────────────────────────────────────────────────────────────────┘  │
└───────────────────────────────────┬───────────────────────────────────┘
                                    ▼
┌───────────────────────────────────────────────────────────────────────┐
│                        STORAGE EXECUTION LAYER                        │
│  ┌─────────────────────────────────────────────────────────────────┐  │
│  │                       PredicateEvaluator                        │  │
│  │  can_prune_segment() → Uses bloom/zone filters                  │  │
│  │  → Calls record_hit() / record_miss() / record_false_positive() │  │
│  └─────────────────────────────────────────────────────────────────┘  │
└───────────────────────────────────┬───────────────────────────────────┘
                                    ▼
┌───────────────────────────────────────────────────────────────────────┐
│                        ML FEEDBACK LOOP LAYER                         │
│                                                                       │
│    ┌───────────────────────────┐    ┌───────────────────────────┐     │
│    │ FilterPerformanceTracker  │───▶│       FilterAdvisor       │     │
│    │                           │    │                           │     │
│    │ • Collects metrics        │    │ • Receives feedback       │     │
│    │ • Calculates scores       │    │ • Updates weights         │     │
│    │ • Batches feedback        │    │ • Makes recommendations   │     │
│    └───────────────────────────┘    └─────────────┬─────────────┘     │
│                                                   │                   │
│    ┌───────────────────────────┐    ┌─────────────▼─────────────┐     │
│    │     WorkloadAnalyzer      │───▶│  FilterBuildCoordinator   │     │
│    │                           │    │                           │     │
│    │ • Query patterns          │    │ • Builds new filters      │     │
│    │ • Access frequency        │    │ • Evicts poor filters     │     │
│    │ • Feature vectors         │    │ • Online rebuild          │     │
│    └───────────────────────────┘    └───────────────────────────┘     │
│                                                                       │
└───────────────────────────────────────────────────────────────────────┘

Components

1. FilterPerformanceTracker

File: src/storage/filter_performance_tracker.rs

Tracks actual filter performance during query execution.

Configuration Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| min_samples | u64 | 100 | Minimum lookups before calculating metrics |
| rolling_window_size | usize | 1000 | Window size for rolling metrics |
| feedback_interval | u64 | 500 | Events between feedback batches |
| auto_feedback_enabled | bool | true | Enable automatic feedback to advisor |
| effectiveness_threshold | f64 | 0.8 | Threshold for “effective” filter (0.0-1.0) |

Metrics Tracked

| Metric | Description | Formula |
| --- | --- | --- |
| total_lookups | Total filter queries | Count |
| positive_results | Filter said “might contain” | Count |
| negative_results | Filter said “definitely not” | Count |
| true_positives | Positive + key actually found | Count |
| false_positives | Positive + key not found | Count |
| total_lookup_time_ns | Cumulative lookup latency | Nanoseconds |
| rows_pruned | Rows skipped due to filter | Count |
| bytes_saved | Estimated bytes not read | rows_pruned * 100 |

Derived Metrics

/// Hit rate: how often the filter returns positive
hit_rate = positive_results / total_lookups

/// False positive rate: how often a positive result is wrong
false_positive_rate = false_positives / positive_results

/// Average lookup latency
avg_lookup_latency_ns = total_lookup_time_ns / total_lookups

/// Effectiveness score (0.0-1.0)
/// Combines prune rate (60%) and accuracy (40%)
effectiveness_score = (prune_rate * 0.6) + (accuracy * 0.4)
  where:
    prune_rate = negative_results / total_lookups
    accuracy = true_positives / positive_results

/// Benefit score: net value of using the filter (milliseconds per lookup)
benefit_score = (latency_saved - lookup_overhead) / total_lookups
  where:
    latency_saved = rows_pruned * 1.0 // estimate: 1 ms saved per pruned row
    lookup_overhead = total_lookups * avg_lookup_latency_ns / 1_000_000 // ns to ms
API Methods

impl FilterPerformanceTracker {
    /// Create tracker linked to registry
    pub fn new(registry: Arc<FilterRegistry>) -> Self;
    /// Configure advisor for feedback
    pub fn with_advisor(self, advisor: Arc<FilterAdvisor>) -> Self;
    /// Configure workload analyzer for features
    pub fn with_workload_analyzer(self, analyzer: Arc<WorkloadAnalyzer>) -> Self;
    /// Set custom configuration
    pub fn with_config(self, config: TrackerConfig) -> Self;
    /// Record filter lookup event (full details)
    pub fn record_lookup(&self, event: LookupEvent);
    /// Record a hit (filter positive, key found)
    pub fn record_hit(&self, filter_id: FilterId, duration_ns: u64);
    /// Record a miss (filter negative, segment pruned)
    pub fn record_miss(&self, filter_id: FilterId, duration_ns: u64, rows_pruned: u64);
    /// Record false positive (filter positive, key not found)
    pub fn record_false_positive(&self, filter_id: FilterId, duration_ns: u64);
    /// Get metrics for specific filter
    pub fn get_metrics(&self, filter_id: FilterId) -> Option<FilterMetrics>;
    /// Get all metrics
    pub fn get_all_metrics(&self) -> HashMap<FilterId, FilterMetrics>;
    /// Check if filter is effective
    pub fn is_filter_effective(&self, filter_id: FilterId) -> Option<bool>;
    /// Get underperforming filters for eviction
    pub fn get_underperforming_filters(&self) -> Vec<(FilterId, FilterMetrics)>;
    /// Manually trigger feedback batch
    pub fn send_feedback_batch(&self);
    /// Reset metrics (after filter rebuild)
    pub fn reset_metrics(&self, filter_id: FilterId);
}

2. FilterAdvisor

File: src/storage/filter_advisor.rs

ML-based decision engine for filter recommendations.

Configuration Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| learning_rate | f64 | 0.01 | How fast weights adapt to feedback |
| min_feedback_samples | u64 | 50 | Samples before adjusting weights |
| recommendation_threshold | f64 | 0.5 | Min score to recommend filter |
| auto_create_enabled | bool | true | Auto-create recommended filters |
| auto_evict_enabled | bool | true | Auto-evict poor filters |

Feature Vector

The advisor uses a feature vector to characterize columns/tables:

pub struct FilterFeatureVector {
    // Column characteristics
    pub cardinality: u64,                  // Distinct values
    pub null_ratio: f64,                   // % null values
    pub avg_value_size: u64,               // Average bytes per value
    pub data_type: DataTypeCategory,       // Int, String, Float, etc.
    // Access patterns
    pub read_frequency: f64,               // Reads per minute
    pub write_frequency: f64,              // Writes per minute
    pub delete_frequency: f64,             // Deletes per minute
    pub scan_vs_point_ratio: f64,          // Scan queries / Point queries
    // Query patterns
    pub equality_predicate_count: u64,     // WHERE col = ?
    pub range_predicate_count: u64,        // WHERE col > ?
    pub in_list_predicate_count: u64,      // WHERE col IN (...)
    pub like_predicate_count: u64,         // WHERE col LIKE ?
    // Data temperature
    pub data_temperature: DataTemperature, // Hot/Warm/Cold
    // Current filter (if exists)
    pub current_filter_type: Option<FilterType>,
    pub current_hit_rate: Option<f64>,
    pub current_fpr: Option<f64>,
}

pub enum DataTemperature {
    Hot,  // >100 accesses/min
    Warm, // 1-100 accesses/min
    Cold, // <1 access/min
}
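The temperature bands in the comments above can be sketched as a small classifier. `classify_temperature` is a hypothetical helper, not part of the documented API; its two threshold arguments are assumed to correspond to the WorkloadAnalyzer settings hot_threshold (100.0) and warm_threshold (1.0), and the handling of the exact boundary values follows the `>100` / `1-100` / `<1` comments.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataTemperature {
    Hot,
    Warm,
    Cold,
}

/// Hypothetical helper: map an access rate (accesses per minute) to a band.
/// Strictly above `hot_threshold` is Hot, below `warm_threshold` is Cold,
/// and everything in between (both bounds included) is Warm.
pub fn classify_temperature(
    accesses_per_min: f64,
    hot_threshold: f64,
    warm_threshold: f64,
) -> DataTemperature {
    if accesses_per_min > hot_threshold {
        DataTemperature::Hot
    } else if accesses_per_min >= warm_threshold {
        DataTemperature::Warm
    } else {
        DataTemperature::Cold
    }
}
```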

ML Model Weights

The advisor uses adaptive weights that learn from feedback:

pub struct ModelWeights {
    // Feature importance weights (a negative weight penalizes the feature)
    pub read_weight: f64,        // Default: 0.3
    pub write_weight: f64,       // Default: -0.1 (writes hurt filter value)
    pub equality_weight: f64,    // Default: 0.4
    pub range_weight: f64,       // Default: 0.2
    pub cardinality_weight: f64, // Default: 0.1
    // Filter type multipliers
    pub bloom_multiplier: f64,   // Default: 1.0
    pub cuckoo_multiplier: f64,  // Default: 1.1
    pub xor_multiplier: f64,     // Default: 1.2
    pub ribbon_multiplier: f64,  // Default: 1.15
    pub mphf_multiplier: f64,    // Default: 1.3
}
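This document does not spell out how the weights are combined or updated. One plausible reading, sketched below purely as an assumption, is a linear score scaled by the filter-type multiplier and corrected with a delta-rule (least-mean-squares) step after each feedback sample. The array layout, the function signatures, and the requirement that features are pre-normalized to 0.0-1.0 are all hypothetical.

```rust
/// Default feature weights in the order: read, write, equality, range, cardinality.
pub const DEFAULT_WEIGHTS: [f64; 5] = [0.3, -0.1, 0.4, 0.2, 0.1];

/// Assumed scoring rule: weighted sum of normalized features,
/// scaled by the multiplier of the candidate filter type.
pub fn predict_benefit(weights: &[f64; 5], features: &[f64; 5], type_multiplier: f64) -> f64 {
    let linear: f64 = weights.iter().zip(features).map(|(w, x)| w * x).sum();
    linear * type_multiplier
}

/// Assumed update rule: one delta-rule step toward the observed benefit.
/// Each weight moves in proportion to the prediction error and its feature.
pub fn update_weights(
    weights: &mut [f64; 5],
    features: &[f64; 5],
    type_multiplier: f64,
    actual_benefit: f64,
    learning_rate: f64,
) {
    let error = actual_benefit - predict_benefit(weights, features, type_multiplier);
    for (w, x) in weights.iter_mut().zip(features) {
        *w += learning_rate * error * x * type_multiplier;
    }
}
```

With the default weights, a read-heavy column queried by equality (features `[1.0, 0.0, 1.0, 0.0, 0.0]`, multiplier 1.0) scores 0.7. If the observed benefit is 1.0, one step at learning_rate 0.01 raises the read and equality weights by 0.003 each and leaves the others unchanged.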

Decision Algorithm

impl FilterAdvisor {
    /// Main recommendation entry point
    pub fn recommend_filter(&self, features: &FilterFeatureVector) -> FilterRecommendation {
        // 1. Check if filter is beneficial
        if !self.is_filter_beneficial(features) {
            return FilterRecommendation::none();
        }
        // 2. Select filter type using decision tree
        let filter_type = self.select_filter_type(features);
        // 3. Apply ML scoring
        let ml_score = self.model.predict_benefit(features, &filter_type);
        // 4. Apply budget constraints
        let constrained = self.apply_budget_constraints(filter_type, features);
        FilterRecommendation { filter_type, score: ml_score, ... }
    }

    /// Filter type selection decision tree
    fn select_filter_type(&self, f: &FilterFeatureVector) -> FilterType {
        // DELETE-heavy? → Cuckoo (supports deletion)
        if f.delete_frequency > 0.1 {
            return FilterType::Cuckoo { fpr: 0.01 };
        }
        // Cold data? → XOR (smallest footprint)
        if f.data_temperature == DataTemperature::Cold {
            return FilterType::Xor { fpr: 0.01 };
        }
        // String with low cardinality? → MPHF (dictionary)
        if f.data_type == DataTypeCategory::String
            && f.cardinality < f.table_row_count / 10 {
            return FilterType::Mphf;
        }
        // High write frequency? → Bloom (fastest insert)
        if f.write_frequency > f.read_frequency * 2.0 {
            return FilterType::Bloom { fpr: 0.01 };
        }
        // Default: Cuckoo (good all-around, supports deletion)
        FilterType::Cuckoo { fpr: 0.01 }
    }

    /// Receive feedback from performance tracker
    pub fn record_feedback(
        &self,
        filter_id: FilterId,
        features: &FilterFeatureVector,
        filter_type: &FilterType,
        actual_benefit: f64,
    ) {
        // Store training example
        self.training_data.push(TrainingExample {
            features: features.clone(),
            filter_type: filter_type.clone(),
            actual_benefit,
            timestamp: Instant::now(),
        });
        // Optionally trigger weight update
        if self.should_retrain() {
            self.update_weights();
        }
    }
}
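The decision tree above can be exercised in isolation with a trimmed-down, self-contained version. The `Features` struct below is a hypothetical subset of FilterFeatureVector; it includes `table_row_count` as an explicit field because the tree reads it, although the feature vector listed earlier does not show it. Rules are checked in order, so a delete-heavy column gets a Cuckoo filter even when its data is cold.

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum FilterType {
    Bloom { fpr: f64 },
    Cuckoo { fpr: f64 },
    Xor { fpr: f64 },
    Mphf,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataTemperature { Hot, Warm, Cold }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataTypeCategory { Int, String, Float }

/// Hypothetical subset of FilterFeatureVector used by the tree.
#[derive(Debug, Clone, Copy)]
pub struct Features {
    pub cardinality: u64,
    pub table_row_count: u64, // assumed field, see the note above
    pub data_type: DataTypeCategory,
    pub read_frequency: f64,
    pub write_frequency: f64,
    pub delete_frequency: f64,
    pub data_temperature: DataTemperature,
}

/// Same rule order as the documented select_filter_type.
pub fn select_filter_type(f: &Features) -> FilterType {
    if f.delete_frequency > 0.1 {
        return FilterType::Cuckoo { fpr: 0.01 }; // supports deletion
    }
    if f.data_temperature == DataTemperature::Cold {
        return FilterType::Xor { fpr: 0.01 }; // smallest footprint
    }
    if f.data_type == DataTypeCategory::String && f.cardinality < f.table_row_count / 10 {
        return FilterType::Mphf; // low-cardinality strings
    }
    if f.write_frequency > f.read_frequency * 2.0 {
        return FilterType::Bloom { fpr: 0.01 }; // fastest insert
    }
    FilterType::Cuckoo { fpr: 0.01 } // default
}
```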

API Methods

impl FilterAdvisor {
    /// Create new advisor
    pub fn new() -> Self;
    /// Configure settings
    pub fn with_config(self, config: AdvisorConfig) -> Self;
    /// Get recommendation for a column
    pub fn recommend_filter(&self, features: &FilterFeatureVector) -> FilterRecommendation;
    /// Record actual performance feedback
    pub fn record_feedback(
        &self,
        filter_id: FilterId,
        features: &FilterFeatureVector,
        filter_type: &FilterType,
        actual_benefit: f64
    );
    /// Get all pending recommendations
    pub fn get_recommendations(&self) -> Vec<FilterRecommendation>;
    /// Manually trigger weight update
    pub fn update_weights(&self);
    /// Get current model weights (for debugging)
    pub fn get_weights(&self) -> ModelWeights;
}

3. WorkloadAnalyzer

File: src/storage/workload_analyzer.rs

Tracks query patterns to generate feature vectors.

Configuration Parameters

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| window_duration | Duration | 1 hour | Time window for pattern analysis |
| max_patterns | usize | 10000 | Max patterns to store |
| hot_threshold | f64 | 100.0 | Accesses/min for “hot” data |
| warm_threshold | f64 | 1.0 | Accesses/min for “warm” data |

Tracked Patterns

pub struct QueryPattern {
    pub tables_accessed: Vec<TableId>,
    pub columns_accessed: Vec<(TableId, ColumnId)>,
    pub predicates: Vec<PredicateInfo>,
    pub join_columns: Vec<(TableId, ColumnId)>,
    pub execution_time_ms: u64,
    pub rows_scanned: u64,
    pub rows_returned: u64,
    pub timestamp: Instant,
}

pub struct PredicateInfo {
    pub column: (TableId, ColumnId),
    pub predicate_type: PredicateType, // Equality, Range, In, Like
    pub selectivity: f64,              // Estimated % rows matching
}
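As an illustration of how recorded predicates could feed the per-column counters of the feature vector (equality_predicate_count and its siblings), here is a minimal aggregation sketch. `count_predicates`, `PredicateCounts` and `ColumnRef` are hypothetical names, and plain integers stand in for TableId and ColumnId; the real analyzer may aggregate differently.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PredicateType { Equality, Range, In, Like }

/// (table id, column id); integers stand in for TableId / ColumnId.
pub type ColumnRef = (u32, u32);

/// Hypothetical per-column tally matching the four *_predicate_count features.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct PredicateCounts {
    pub equality: u64,
    pub range: u64,
    pub in_list: u64,
    pub like: u64,
}

/// Count how often each predicate type was seen on each column.
pub fn count_predicates(
    predicates: &[(ColumnRef, PredicateType)],
) -> HashMap<ColumnRef, PredicateCounts> {
    let mut counts: HashMap<ColumnRef, PredicateCounts> = HashMap::new();
    for &(column, kind) in predicates {
        let entry = counts.entry(column).or_default();
        match kind {
            PredicateType::Equality => entry.equality += 1,
            PredicateType::Range => entry.range += 1,
            PredicateType::In => entry.in_list += 1,
            PredicateType::Like => entry.like += 1,
        }
    }
    counts
}
```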

API Methods

impl WorkloadAnalyzer {
    /// Create analyzer with default config
    pub fn new() -> Self;
    /// Create with custom config
    pub fn with_config(config: WorkloadAnalyzerConfig) -> Self;
    /// Record a query execution
    pub fn record_query(&self, pattern: QueryPattern);
    /// Generate feature vector for a column
    pub fn generate_features(&self, table: &str, column: &str) -> FilterFeatureVector;
    /// Get columns with high filter potential
    pub fn get_high_value_columns(&self) -> Vec<(TableId, ColumnId, f64)>;
    /// Get data temperature for a table
    pub fn get_data_temperature(&self, table: &str) -> DataTemperature;
}

4. FilterCostModel (Query Optimizer Integration)

File: src/optimizer/cost.rs

Integrates filter costs into query optimization.

Cost Estimation

pub struct FilterCostEstimate {
    pub lookup_cost_ns: u64,      // CPU cost of filter lookup
    pub io_saved_bytes: u64,      // Estimated I/O avoided
    pub false_positive_cost: f64, // Cost of FP verification
    pub net_benefit: f64,         // Overall benefit score
}

impl FilterCostModel {
    /// Estimate cost/benefit of using a filter
    pub fn estimate_filter_cost(
        &self,
        filter: &FilterInfo,
        predicate: &Predicate,
        row_count: u64,
    ) -> FilterCostEstimate {
        let lookup_cost = filter.lookup_cost_ns * row_count;
        let expected_hits = row_count as f64 * predicate.selectivity();
        let false_positives = expected_hits * filter.fpr;
        FilterCostEstimate {
            lookup_cost_ns: lookup_cost,
            io_saved_bytes: self.estimate_io_saved(filter, expected_hits, row_count),
            false_positive_cost: self.estimate_fp_cost(false_positives),
            net_benefit: self.compute_net_benefit(..),
        }
    }

    /// Select best filter for predicate
    pub fn select_best_filter(
        &self,
        table: TableId,
        column: ColumnId,
        predicate: &Predicate,
    ) -> Option<FilterId>;
}
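The arguments of compute_net_benefit are elided above, so the following is only one possible shape for it, stated as an assumption: convert the avoided I/O into time and subtract the lookup and false-positive costs, all in nanoseconds. The `io_ns_per_byte` parameter and the `should_use_filter` helper are hypothetical and would need calibration against the actual storage layer.

```rust
/// Assumed net-benefit formula, in nanoseconds:
/// time saved by skipped I/O, minus filter lookups, minus false-positive checks.
pub fn compute_net_benefit(
    lookup_cost_ns: u64,
    io_saved_bytes: u64,
    false_positive_cost_ns: f64,
    io_ns_per_byte: f64, // hypothetical calibration constant
) -> f64 {
    io_saved_bytes as f64 * io_ns_per_byte - lookup_cost_ns as f64 - false_positive_cost_ns
}

/// Hypothetical planner rule: use the filter only when it is a net win.
pub fn should_use_filter(net_benefit_ns: f64) -> bool {
    net_benefit_ns > 0.0
}
```

Under these assumptions, avoiding 1 MB of reads at 0.5 ns per byte saves 500,000 ns; with 50,000 ns of lookups and 10,000 ns of false-positive checks the net benefit is 440,000 ns, so the filter is used. Avoiding only 1,000 bytes yields a negative result and the filter is skipped.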

Data Flow

1. Query Execution → Metric Collection

User Query: SELECT * FROM users WHERE id = 42
┌─────────────────────────────────────┐
│ Query Planner                       │
│                                     │
│ Checks FilterCostModel:             │
│   "Is there a filter for users.id?" │
│   "Is it cost-effective?"           │
└──────────────────┬──────────────────┘
                   │ Yes, use filter
                   ▼
┌──────────────────────────────────────────────────────┐
│ PredicateEvaluator                                   │
│                                                      │
│ bloom_filter.might_contain(42)                       │
│   │                                                  │
│   ├─► Returns FALSE → record_miss()                  │
│   │                 → Prune entire segment!          │
│   │                                                  │
│   └─► Returns TRUE → Check actual data               │
│         │                                            │
│         ├─► Key found     → record_hit()             │
│         └─► Key not found → record_false_positive()  │
└──────────────────────────────────────────────────────┘
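The branch structure above can be sketched with a toy Bloom filter built on the standard library hasher. Everything here is illustrative: `Bloom`, `Outcome` and `lookup` are hypothetical names, the segment is a plain slice, and the real PredicateEvaluator would call record_miss(), record_hit() or record_false_positive() on the tracker where this sketch returns an `Outcome`.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Toy Bloom filter: one bool per bit, k hash functions derived from (key, i).
pub struct Bloom {
    bits: Vec<bool>,
    num_hashes: u64,
}

impl Bloom {
    pub fn new(num_bits: usize, num_hashes: u64) -> Self {
        Bloom { bits: vec![false; num_bits.max(1)], num_hashes }
    }

    fn index(&self, key: u64, i: u64) -> usize {
        let mut hasher = DefaultHasher::new();
        (key, i).hash(&mut hasher);
        (hasher.finish() % self.bits.len() as u64) as usize
    }

    pub fn insert(&mut self, key: u64) {
        for i in 0..self.num_hashes {
            let idx = self.index(key, i);
            self.bits[idx] = true;
        }
    }

    /// False means "definitely not present"; true means "might be present".
    pub fn might_contain(&self, key: u64) -> bool {
        (0..self.num_hashes).all(|i| self.bits[self.index(key, i)])
    }
}

/// The three outcomes the tracker distinguishes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Outcome {
    Miss,          // filter negative: segment pruned without reading it
    Hit,           // filter positive and key found
    FalsePositive, // filter positive but key absent
}

/// Consult the filter first and read the segment only on a positive answer.
pub fn lookup(filter: &Bloom, segment: &[u64], key: u64) -> Outcome {
    if !filter.might_contain(key) {
        return Outcome::Miss;
    }
    if segment.contains(&key) {
        Outcome::Hit
    } else {
        Outcome::FalsePositive
    }
}
```

A key that was inserted always yields `Hit`, because a Bloom filter has no false negatives. A key that was never inserted yields either `Miss` or, occasionally, `FalsePositive`.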

2. Metric Collection → Feedback

Every 500 events (configurable):
┌──────────────────────────────────────────────────┐
│ FilterPerformanceTracker                         │
│                                                  │
│ For each filter with ≥100 samples:               │
│   1. Calculate effectiveness_score               │
│   2. Calculate benefit_score                     │
│   3. Get feature vector from WorkloadAnalyzer    │
│   4. Send to FilterAdvisor.record_feedback()     │
└────────────────────────┬─────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────┐
│ FilterAdvisor                                    │
│                                                  │
│   1. Store training example                      │
│   2. Compare predicted vs actual                 │
│   3. Update model weights                        │
│   4. Adjust future predictions                   │
└──────────────────────────────────────────────────┘
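The batching trigger described above (a batch every feedback_interval events, restricted to filters with at least min_samples lookups) can be sketched as follows. `BatchingTracker` is a hypothetical, single-threaded simplification that uses `u32` filter ids; the real tracker is shared behind `&self` and sends scores and feature vectors rather than bare ids.

```rust
use std::collections::HashMap;

/// Hypothetical simplification of the tracker's batching logic.
pub struct BatchingTracker {
    feedback_interval: u64,
    min_samples: u64,
    events_since_feedback: u64,
    lookups: HashMap<u32, u64>, // filter id -> lookups recorded so far
}

impl BatchingTracker {
    pub fn new(feedback_interval: u64, min_samples: u64) -> Self {
        BatchingTracker {
            feedback_interval,
            min_samples,
            events_since_feedback: 0,
            lookups: HashMap::new(),
        }
    }

    /// Record one lookup. Returns Some(ids) when a feedback batch is due,
    /// listing (in ascending order) the filters with enough samples.
    pub fn record(&mut self, filter_id: u32) -> Option<Vec<u32>> {
        *self.lookups.entry(filter_id).or_insert(0) += 1;
        self.events_since_feedback += 1;
        if self.events_since_feedback < self.feedback_interval {
            return None;
        }
        self.events_since_feedback = 0;
        let min_samples = self.min_samples;
        let mut ready: Vec<u32> = self
            .lookups
            .iter()
            .filter(|(_, count)| **count >= min_samples)
            .map(|(id, _)| *id)
            .collect();
        ready.sort_unstable();
        Some(ready)
    }
}
```

With the defaults from the configuration table this would be `BatchingTracker::new(500, 100)`: every 500th event produces a batch, and a filter appears in it only once it has accumulated 100 lookups.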

3. Feedback → New Recommendations

Periodically or on-demand:
┌──────────────────────────────────────────────────┐
│ FilterAdvisor.recommend_filter()                 │
│                                                  │
│ For columns without filters:                     │
│   1. Get features from WorkloadAnalyzer          │
│   2. Apply decision tree                         │
│   3. Apply ML scoring                            │
│   4. If score > threshold:                       │
│      → Create recommendation                     │
└────────────────────────┬─────────────────────────┘
                         ▼
┌──────────────────────────────────────────────────┐
│ FilterBuildCoordinator                           │
│                                                  │
│   1. Queue filter build                          │
│   2. Build in background                         │
│   3. Apply deltas                                │
│   4. Atomic swap                                 │
│   5. Register in FilterRegistry                  │
└──────────────────────────────────────────────────┘
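Steps 2 to 4 of the build coordinator (build in background, apply deltas, atomic swap) follow a common pattern that can be sketched with the standard library alone. This is an assumption about the general technique, not the coordinator's actual code: `FilterSlot` and `rebuild` are hypothetical, an exact `HashSet` stands in for a real filter, and the build thread is joined immediately to keep the example short.

```rust
use std::collections::HashSet;
use std::sync::{Arc, RwLock};
use std::thread;

/// Stand-in for a real filter: an exact set of keys.
pub type Filter = HashSet<u64>;

/// Holds the currently active filter; readers take a cheap Arc clone.
pub struct FilterSlot {
    active: RwLock<Arc<Filter>>,
}

impl FilterSlot {
    pub fn new(initial: Filter) -> Self {
        FilterSlot { active: RwLock::new(Arc::new(initial)) }
    }

    /// Snapshot of the active filter; stays valid even after a later swap.
    pub fn current(&self) -> Arc<Filter> {
        let guard = self.active.read().unwrap();
        Arc::clone(&*guard)
    }

    /// Replace the active filter; the write lock is held only for the pointer swap.
    pub fn swap(&self, next: Filter) {
        *self.active.write().unwrap() = Arc::new(next);
    }
}

/// Build a new filter from a data snapshot on another thread, apply the keys
/// written while the build was running (the deltas), then swap it in.
pub fn rebuild(slot: &FilterSlot, snapshot: Vec<u64>, deltas: &[u64]) {
    let handle = thread::spawn(move || snapshot.into_iter().collect::<Filter>());
    let mut next = handle.join().expect("filter build thread panicked");
    next.extend(deltas.iter().copied());
    slot.swap(next);
}
```

Queries that obtained the old filter through `current()` before the swap keep using it until they drop their `Arc`, so a rebuild never invalidates a filter that is in use.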

SQL Views (Planned)

Filter Metrics View

-- View all filter performance metrics
CREATE VIEW helios_filter_metrics AS
SELECT
f.filter_id,
f.table_name,
f.column_name,
f.filter_type,
m.total_lookups,
m.hit_rate,
m.false_positive_rate,
m.effectiveness_score,
m.benefit_score,
m.avg_lookup_latency_ns,
m.rows_pruned,
m.bytes_saved,
m.last_update
FROM filter_registry f
JOIN filter_performance_metrics m ON f.filter_id = m.filter_id;

Filter Recommendations View

-- View pending filter recommendations
CREATE VIEW helios_filter_recommendations AS
SELECT
r.target_table,
r.target_column,
r.recommended_type,
r.priority_score,
r.estimated_benefit_ms,
r.estimated_storage_bytes,
r.confidence,
r.reason
FROM filter_advisor_recommendations r
WHERE r.status = 'pending'
ORDER BY r.priority_score DESC;

Workload Patterns View

-- View column access patterns
CREATE VIEW helios_workload_patterns AS
SELECT
table_name,
column_name,
read_frequency,
write_frequency,
delete_frequency,
equality_predicates,
range_predicates,
data_temperature,
filter_recommendation
FROM workload_analyzer_patterns;

Configuration

Runtime Configuration (SQL, Planned)

-- Enable/disable automatic filter creation
SET helios.filter_auto_create = true;
-- Enable/disable automatic filter eviction
SET helios.filter_auto_evict = true;
-- Set feedback interval (events)
SET helios.filter_feedback_interval = 500;
-- Set minimum samples for metrics
SET helios.filter_min_samples = 100;
-- Set effectiveness threshold
SET helios.filter_effectiveness_threshold = 0.8;
-- Set learning rate
SET helios.filter_learning_rate = 0.01;
-- View current settings
SHOW helios.filter_%;

Programmatic Configuration (Rust)

// Configure advisor first: the tracker needs a handle to it
let advisor_config = AdvisorConfig {
    learning_rate: 0.01,
    min_feedback_samples: 50,
    recommendation_threshold: 0.5,
    auto_create_enabled: true,
    auto_evict_enabled: true,
};
let advisor = Arc::new(FilterAdvisor::new().with_config(advisor_config));

// Workload analyzer with default configuration
let analyzer = Arc::new(WorkloadAnalyzer::new());

// Configure performance tracker (`registry` is an existing Arc<FilterRegistry>)
let tracker_config = TrackerConfig {
    min_samples: 100,
    rolling_window_size: 1000,
    feedback_interval: 500,
    auto_feedback_enabled: true,
    effectiveness_threshold: 0.8,
};
let tracker = FilterPerformanceTracker::new(registry)
    .with_config(tracker_config)
    .with_advisor(advisor)
    .with_workload_analyzer(analyzer);

Tuning Guidelines

For OLTP Workloads (High Write/Delete)

-- Prefer Cuckoo filters (support deletion)
SET helios.filter_default_type = 'cuckoo';
-- Lower feedback interval (faster adaptation)
SET helios.filter_feedback_interval = 200;
-- Higher effectiveness threshold
SET helios.filter_effectiveness_threshold = 0.85;

For OLAP Workloads (Read-Heavy)

-- Prefer XOR filters (smallest footprint)
SET helios.filter_default_type = 'xor';
-- Higher feedback interval (less overhead)
SET helios.filter_feedback_interval = 1000;
-- Lower threshold (more filters)
SET helios.filter_effectiveness_threshold = 0.7;

For Mixed Workloads

-- Let advisor choose per-column
SET helios.filter_default_type = 'auto';
-- Balanced settings
SET helios.filter_feedback_interval = 500;
SET helios.filter_effectiveness_threshold = 0.8;

Monitoring & Debugging

Check Filter Effectiveness

-- Find underperforming filters
SELECT * FROM helios_filter_metrics
WHERE effectiveness_score < 0.5
AND total_lookups > 1000;
-- Find high-value filters
SELECT * FROM helios_filter_metrics
WHERE benefit_score > 0.5
ORDER BY benefit_score DESC
LIMIT 10;

Check Workload Patterns

-- Find columns that should have filters
SELECT * FROM helios_workload_patterns
WHERE read_frequency > 100
AND equality_predicates > 0
AND filter_recommendation IS NOT NULL;

Debug ML Weights

// Get current model weights
let weights = advisor.get_weights();
println!("Read weight: {}", weights.read_weight);
println!("Equality weight: {}", weights.equality_weight);

// Get training examples
let examples = advisor.get_training_data();
for ex in examples.iter().take(10) {
    println!("Predicted: {}, Actual: {}",
        advisor.predict(&ex.features, &ex.filter_type),
        ex.actual_benefit);
}

Implementation Status

| Component | Status | File |
| --- | --- | --- |
| FilterPerformanceTracker | ✅ Complete | src/storage/filter_performance_tracker.rs |
| FilterAdvisor | ✅ Complete | src/storage/filter_advisor.rs |
| WorkloadAnalyzer | ✅ Complete | src/storage/workload_analyzer.rs |
| FilterCostModel | ✅ Complete | src/optimizer/cost.rs |
| PredicateEvaluator integration | ✅ Complete | src/storage/predicate/evaluator.rs |
| StorageEngine integration | ✅ Complete | src/storage/engine.rs |
| SQL Views | 🔲 Planned | - |
| SQL Configuration | 🔲 Planned | - |

References