ML Feedback Loop System for Intelligent Filter Advisor
Overview
HeliosDB-Lite implements an automatic, self-optimizing filter management system that uses machine learning principles to continuously improve filter recommendations. The system operates without manual model training - it learns from actual query execution patterns.
Architecture
    ┌────────────────────────────────────────────────────────────────────────────┐
    │ QUERY EXECUTION LAYER                                                      │
    │  ┌──────────────────────────────────────────────────────────────────────┐  │
    │  │ Query Optimizer                                                      │  │
    │  │   FilterCostModel.estimate_filter_cost()                             │  │
    │  │   → Decides whether to use filter based on cost/benefit              │  │
    │  └──────────────────────────────────────────────────────────────────────┘  │
    └─────────────────────────────────────┬──────────────────────────────────────┘
                                          │
                                          ▼
    ┌────────────────────────────────────────────────────────────────────────────┐
    │ STORAGE EXECUTION LAYER                                                    │
    │  ┌──────────────────────────────────────────────────────────────────────┐  │
    │  │ PredicateEvaluator                                                   │  │
    │  │   can_prune_segment() → Uses bloom/zone filters                      │  │
    │  │   → Calls record_hit() / record_miss() / record_false_positive()     │  │
    │  └──────────────────────────────────────────────────────────────────────┘  │
    └─────────────────────────────────────┬──────────────────────────────────────┘
                                          │
                                          ▼
    ┌────────────────────────────────────────────────────────────────────────────┐
    │ ML FEEDBACK LOOP LAYER                                                     │
    │                                                                            │
    │  ┌──────────────────────────┐    ┌──────────────────────────┐              │
    │  │ FilterPerformanceTracker │───▶│ FilterAdvisor            │              │
    │  │                          │    │                          │              │
    │  │ • Collects metrics       │    │ • Receives feedback      │              │
    │  │ • Calculates scores      │    │ • Updates weights        │              │
    │  │ • Batches feedback       │    │ • Makes recommendations  │              │
    │  └──────────────────────────┘    └────────────┬─────────────┘              │
    │                                               │                            │
    │  ┌──────────────────────────┐    ┌────────────▼─────────────┐              │
    │  │ WorkloadAnalyzer         │───▶│ FilterBuildCoordinator   │              │
    │  │                          │    │                          │              │
    │  │ • Query patterns         │    │ • Builds new filters     │              │
    │  │ • Access frequency       │    │ • Evicts poor filters    │              │
    │  │ • Feature vectors        │    │ • Online rebuild         │              │
    │  └──────────────────────────┘    └──────────────────────────┘              │
    │                                                                            │
    └────────────────────────────────────────────────────────────────────────────┘

Components
1. FilterPerformanceTracker
File: src/storage/filter_performance_tracker.rs
Tracks actual filter performance during query execution.
Configuration Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| min_samples | u64 | 100 | Minimum lookups before calculating metrics |
| rolling_window_size | usize | 1000 | Window size for rolling metrics |
| feedback_interval | u64 | 500 | Events between feedback batches |
| auto_feedback_enabled | bool | true | Enable automatic feedback to advisor |
| effectiveness_threshold | f64 | 0.8 | Threshold for “effective” filter (0.0-1.0) |
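
To make the interaction between `feedback_interval` and `min_samples` concrete, the sketch below counts lookup events, signals when a feedback batch is due, and selects the filters that have accumulated enough lookups to be included. `FeedbackGate` and its methods are hypothetical names chosen for illustration; they are not part of the tracker's API, and the real tracker may batch differently.

```rust
/// Hypothetical helper, not part of HeliosDB-Lite: decides when a feedback
/// batch is due and which filters have enough samples to be included.
pub struct FeedbackGate {
    min_samples: u64,
    feedback_interval: u64,
    events_since_batch: u64,
}

impl FeedbackGate {
    pub fn new(min_samples: u64, feedback_interval: u64) -> Self {
        Self { min_samples, feedback_interval, events_since_batch: 0 }
    }

    /// Count one lookup event; returns true once every `feedback_interval` events.
    pub fn on_event(&mut self) -> bool {
        self.events_since_batch += 1;
        if self.events_since_batch >= self.feedback_interval {
            self.events_since_batch = 0;
            true
        } else {
            false
        }
    }

    /// Given (filter id, total lookups) pairs, keep the ids with at least
    /// `min_samples` lookups; the rest wait until they accumulate more.
    pub fn eligible(&self, lookups: &[(u32, u64)]) -> Vec<u32> {
        lookups
            .iter()
            .filter(|entry| entry.1 >= self.min_samples)
            .map(|entry| entry.0)
            .collect()
    }
}
```

With the defaults (100 and 500), a batch would fire on the 500th and 1000th events, and a filter with 99 lookups would be left out of both.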
Metrics Tracked
| Metric | Description | Formula |
|---|---|---|
| total_lookups | Total filter queries | Count |
| positive_results | Filter said “might contain” | Count |
| negative_results | Filter said “definitely not” | Count |
| true_positives | Positive + key actually found | Count |
| false_positives | Positive + key not found | Count |
| total_lookup_time_ns | Cumulative lookup latency | Nanoseconds |
| rows_pruned | Rows skipped due to filter | Count |
| bytes_saved | Estimated bytes not read | rows_pruned * 100 |
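
As a rough illustration of how these counters relate to each other, the sketch below keeps them in a plain struct, updates them for the three lookup outcomes, and derives the false positive rate and effectiveness score using the formulas given in the Derived Metrics section. The `Counters` type is a hypothetical stand-in, not the actual `FilterMetrics` implementation, and returning `None` when a denominator is zero is an assumption the source does not spell out.

```rust
/// Hypothetical counter set mirroring the metrics table (not the real FilterMetrics).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct Counters {
    pub total_lookups: u64,
    pub positive_results: u64,
    pub negative_results: u64,
    pub true_positives: u64,
    pub false_positives: u64,
    pub total_lookup_time_ns: u64,
    pub rows_pruned: u64,
}

impl Counters {
    /// Filter said "might contain" and the key was found.
    pub fn record_hit(&mut self, duration_ns: u64) {
        self.total_lookups += 1;
        self.positive_results += 1;
        self.true_positives += 1;
        self.total_lookup_time_ns += duration_ns;
    }

    /// Filter said "definitely not", so the segment was pruned.
    pub fn record_miss(&mut self, duration_ns: u64, rows_pruned: u64) {
        self.total_lookups += 1;
        self.negative_results += 1;
        self.rows_pruned += rows_pruned;
        self.total_lookup_time_ns += duration_ns;
    }

    /// Filter said "might contain" but the key was absent.
    pub fn record_false_positive(&mut self, duration_ns: u64) {
        self.total_lookups += 1;
        self.positive_results += 1;
        self.false_positives += 1;
        self.total_lookup_time_ns += duration_ns;
    }

    /// bytes_saved = rows_pruned * 100, as in the table.
    pub fn bytes_saved(&self) -> u64 {
        self.rows_pruned * 100
    }

    /// false_positives / positive_results; None if there were no positives (assumption).
    pub fn false_positive_rate(&self) -> Option<f64> {
        if self.positive_results == 0 {
            return None;
        }
        Some(self.false_positives as f64 / self.positive_results as f64)
    }

    /// prune_rate * 0.6 + accuracy * 0.4; None if either ratio is undefined (assumption).
    pub fn effectiveness_score(&self) -> Option<f64> {
        if self.total_lookups == 0 || self.positive_results == 0 {
            return None;
        }
        let prune_rate = self.negative_results as f64 / self.total_lookups as f64;
        let accuracy = self.true_positives as f64 / self.positive_results as f64;
        Some(prune_rate * 0.6 + accuracy * 0.4)
    }
}
```

For example, six misses, three hits, and one false positive give a prune rate of 0.6 and an accuracy of 0.75, so the effectiveness score is 0.6 × 0.6 + 0.75 × 0.4 = 0.66, which is below the default `effectiveness_threshold` of 0.8.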
Derived Metrics
    /// Hit rate: How often filter returns positive
    hit_rate = positive_results / total_lookups

    /// False positive rate: How often positive is wrong
    false_positive_rate = false_positives / positive_results

    /// Average lookup latency
    avg_lookup_latency_ns = total_lookup_time_ns / total_lookups

    /// Effectiveness score (0.0-1.0)
    /// Combines prune rate (60%) and accuracy (40%)
    effectiveness_score = (prune_rate * 0.6) + (accuracy * 0.4)
    where:
        prune_rate = negative_results / total_lookups
        accuracy   = true_positives / positive_results

    /// Benefit score: Net value of using the filter
    benefit_score = (latency_saved - lookup_overhead) / total_lookups
    where:
        latency_saved   = rows_pruned * 1.0  // 1ms per row estimate
        lookup_overhead = total_lookups * avg_lookup_latency_ns / 1_000_000

API Methods
    impl FilterPerformanceTracker {
        /// Create tracker linked to registry
        pub fn new(registry: Arc<FilterRegistry>) -> Self;

        /// Configure advisor for feedback
        pub fn with_advisor(self, advisor: Arc<FilterAdvisor>) -> Self;

        /// Configure workload analyzer for features
        pub fn with_workload_analyzer(self, analyzer: Arc<WorkloadAnalyzer>) -> Self;

        /// Set custom configuration
        pub fn with_config(self, config: TrackerConfig) -> Self;

        /// Record filter lookup event (full details)
        pub fn record_lookup(&self, event: LookupEvent);

        /// Record a hit (filter positive, key found)
        pub fn record_hit(&self, filter_id: FilterId, duration_ns: u64);

        /// Record a miss (filter negative, segment pruned)
        pub fn record_miss(&self, filter_id: FilterId, duration_ns: u64, rows_pruned: u64);

        /// Record false positive (filter positive, key not found)
        pub fn record_false_positive(&self, filter_id: FilterId, duration_ns: u64);

        /// Get metrics for specific filter
        pub fn get_metrics(&self, filter_id: FilterId) -> Option<FilterMetrics>;

        /// Get all metrics
        pub fn get_all_metrics(&self) -> HashMap<FilterId, FilterMetrics>;

        /// Check if filter is effective
        pub fn is_filter_effective(&self, filter_id: FilterId) -> Option<bool>;

        /// Get underperforming filters for eviction
        pub fn get_underperforming_filters(&self) -> Vec<(FilterId, FilterMetrics)>;

        /// Manually trigger feedback batch
        pub fn send_feedback_batch(&self);

        /// Reset metrics (after filter rebuild)
        pub fn reset_metrics(&self, filter_id: FilterId);
    }

2. FilterAdvisor
File: src/storage/filter_advisor.rs
ML-based decision engine for filter recommendations.
Configuration Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| learning_rate | f64 | 0.01 | How fast weights adapt to feedback |
| min_feedback_samples | u64 | 50 | Samples before adjusting weights |
| recommendation_threshold | f64 | 0.5 | Min score to recommend filter |
| auto_create_enabled | bool | true | Auto-create recommended filters |
| auto_evict_enabled | bool | true | Auto-evict poor filters |
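
The document does not specify the rule the advisor uses to adjust its weights, so the following is only a sketch of how `learning_rate` and `min_feedback_samples` could interact. It assumes a linear predictor over normalized feature values and a least-mean-squares style step; `Example`, `predict`, and `update_weights` here are hypothetical free functions, not the advisor's actual methods.

```rust
/// One feedback observation: normalized feature values and the measured benefit.
/// Hypothetical type for illustration only.
pub struct Example {
    pub features: Vec<f64>,
    pub actual_benefit: f64,
}

/// Linear prediction: the dot product of weights and features (an assumed model).
pub fn predict(weights: &[f64], features: &[f64]) -> f64 {
    weights.iter().zip(features).map(|(w, x)| w * x).sum()
}

/// Apply one pass of error-proportional updates over the examples.
/// Returns false, leaving the weights untouched, when fewer than
/// `min_feedback_samples` examples have been collected.
pub fn update_weights(
    weights: &mut [f64],
    examples: &[Example],
    learning_rate: f64,
    min_feedback_samples: u64,
) -> bool {
    if (examples.len() as u64) < min_feedback_samples {
        return false;
    }
    for ex in examples {
        // Positive error means the filter did better than predicted.
        let error = ex.actual_benefit - predict(weights, &ex.features);
        for (w, x) in weights.iter_mut().zip(&ex.features) {
            *w += learning_rate * error * x;
        }
    }
    true
}
```

Under these assumptions, a small `learning_rate` such as the default 0.01 moves each weight only a fraction of the prediction error per example, so a single noisy measurement has limited effect, while `min_feedback_samples` delays any adjustment until enough feedback exists.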
Feature Vector
The advisor uses a feature vector to characterize columns/tables:
    pub struct FilterFeatureVector {
        // Column characteristics
        pub cardinality: u64,                 // Distinct values
        pub null_ratio: f64,                  // % null values
        pub avg_value_size: u64,              // Average bytes per value
        pub data_type: DataTypeCategory,      // Int, String, Float, etc.

        // Access patterns
        pub read_frequency: f64,              // Reads per minute
        pub write_frequency: f64,             // Writes per minute
        pub delete_frequency: f64,            // Deletes per minute
        pub scan_vs_point_ratio: f64,         // Scan queries / Point queries

        // Query patterns
        pub equality_predicate_count: u64,    // WHERE col = ?
        pub range_predicate_count: u64,       // WHERE col > ?
        pub in_list_predicate_count: u64,     // WHERE col IN (...)
        pub like_predicate_count: u64,        // WHERE col LIKE ?

        // Data temperature
        pub data_temperature: DataTemperature, // Hot/Warm/Cold

        // Current filter (if exists)
        pub current_filter_type: Option<FilterType>,
        pub current_hit_rate: Option<f64>,
        pub current_fpr: Option<f64>,
    }

    pub enum DataTemperature {
        Hot,   // >100 accesses/min
        Warm,  // 1-100 accesses/min
        Cold,  // <1 access/min
    }

ML Model Weights
The advisor uses adaptive weights that learn from feedback:
    pub struct ModelWeights {
        // Feature importance weights (nominally 0.0-1.0; write_weight is negative by default)
        pub read_weight: f64,         // Default: 0.3
        pub write_weight: f64,        // Default: -0.1 (writes hurt filter value)
        pub equality_weight: f64,     // Default: 0.4
        pub range_weight: f64,        // Default: 0.2
        pub cardinality_weight: f64,  // Default: 0.1

        // Filter type multipliers
        pub bloom_multiplier: f64,    // Default: 1.0
        pub cuckoo_multiplier: f64,   // Default: 1.1
        pub xor_multiplier: f64,      // Default: 1.2
        pub ribbon_multiplier: f64,   // Default: 1.15
        pub mphf_multiplier: f64,     // Default: 1.3
    }

Decision Algorithm
    impl FilterAdvisor {
        /// Main recommendation entry point
        pub fn recommend_filter(&self, features: &FilterFeatureVector) -> FilterRecommendation {
            // 1. Check if filter is beneficial
            if !self.is_filter_beneficial(features) {
                return FilterRecommendation::none();
            }

            // 2. Select filter type using decision tree
            let filter_type = self.select_filter_type(features);

            // 3. Apply ML scoring
            let ml_score = self.model.predict_benefit(features, &filter_type);

            // 4. Apply budget constraints
            let constrained = self.apply_budget_constraints(filter_type, features);

            FilterRecommendation { filter_type, score: ml_score, ... }
        }

        /// Filter type selection decision tree
        fn select_filter_type(&self, f: &FilterFeatureVector) -> FilterType {
            // DELETE-heavy? → Cuckoo (supports deletion)
            if f.delete_frequency > 0.1 {
                return FilterType::Cuckoo { fpr: 0.01 };
            }

            // Cold data? → XOR (smallest footprint)
            if f.data_temperature == DataTemperature::Cold {
                return FilterType::Xor { fpr: 0.01 };
            }

            // String with low cardinality? → MPHF (dictionary)
            if f.data_type == DataTypeCategory::String && f.cardinality < f.table_row_count / 10 {
                return FilterType::Mphf;
            }

            // High write frequency? → Bloom (fastest insert)
            if f.write_frequency > f.read_frequency * 2.0 {
                return FilterType::Bloom { fpr: 0.01 };
            }

            // Default: Cuckoo (good all-around)
            FilterType::Cuckoo { fpr: 0.01, supports_delete: true }
        }

        /// Receive feedback from performance tracker
        pub fn record_feedback(
            &self,
            filter_id: FilterId,
            features: &FilterFeatureVector,
            filter_type: &FilterType,
            actual_benefit: f64,
        ) {
            // Store training example
            self.training_data.push(TrainingExample {
                features: features.clone(),
                filter_type: filter_type.clone(),
                actual_benefit,
                timestamp: Instant::now(),
            });

            // Optionally trigger weight update
            if self.should_retrain() {
                self.update_weights();
            }
        }
    }

API Methods
    impl FilterAdvisor {
        /// Create new advisor
        pub fn new() -> Self;

        /// Configure settings
        pub fn with_config(self, config: AdvisorConfig) -> Self;

        /// Get recommendation for a column
        pub fn recommend_filter(&self, features: &FilterFeatureVector) -> FilterRecommendation;

        /// Record actual performance feedback
        pub fn record_feedback(
            &self,
            filter_id: FilterId,
            features: &FilterFeatureVector,
            filter_type: &FilterType,
            actual_benefit: f64
        );

        /// Get all pending recommendations
        pub fn get_recommendations(&self) -> Vec<FilterRecommendation>;

        /// Manually trigger weight update
        pub fn update_weights(&self);

        /// Get current model weights (for debugging)
        pub fn get_weights(&self) -> ModelWeights;
    }

3. WorkloadAnalyzer
File: src/storage/workload_analyzer.rs
Tracks query patterns to generate feature vectors.
Configuration Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| window_duration | Duration | 1 hour | Time window for pattern analysis |
| max_patterns | usize | 10000 | Max patterns to store |
| hot_threshold | f64 | 100.0 | Accesses/min for “hot” data |
| warm_threshold | f64 | 1.0 | Accesses/min for “warm” data |
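
The two thresholds partition access rates into the three `DataTemperature` classes. The sketch below shows one way to apply them, following the boundaries documented on the enum (hot is above 100 accesses/min, warm is 1 to 100, cold is below 1). The function `classify_temperature` is a hypothetical name, and the enum is redeclared here only so the example is self-contained.

```rust
/// Mirrors the DataTemperature enum from the feature vector section.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DataTemperature {
    Hot,
    Warm,
    Cold,
}

/// Hypothetical helper: classify an access rate (accesses per minute).
/// Rates strictly above `hot_threshold` are hot; rates from `warm_threshold`
/// up to and including `hot_threshold` are warm; anything lower is cold.
pub fn classify_temperature(
    accesses_per_min: f64,
    hot_threshold: f64,
    warm_threshold: f64,
) -> DataTemperature {
    if accesses_per_min > hot_threshold {
        DataTemperature::Hot
    } else if accesses_per_min >= warm_threshold {
        DataTemperature::Warm
    } else {
        DataTemperature::Cold
    }
}
```

With the default thresholds, a table read 250 times per minute is hot, one read exactly 100 times per minute is still warm, and one read every two minutes is cold.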
Tracked Patterns
    pub struct QueryPattern {
        pub tables_accessed: Vec<TableId>,
        pub columns_accessed: Vec<(TableId, ColumnId)>,
        pub predicates: Vec<PredicateInfo>,
        pub join_columns: Vec<(TableId, ColumnId)>,
        pub execution_time_ms: u64,
        pub rows_scanned: u64,
        pub rows_returned: u64,
        pub timestamp: Instant,
    }

    pub struct PredicateInfo {
        pub column: (TableId, ColumnId),
        pub predicate_type: PredicateType,  // Equality, Range, In, Like
        pub selectivity: f64,               // Estimated % rows matching
    }

API Methods
    impl WorkloadAnalyzer {
        /// Create analyzer with default config
        pub fn new() -> Self;

        /// Create with custom config
        pub fn with_config(config: WorkloadAnalyzerConfig) -> Self;

        /// Record a query execution
        pub fn record_query(&self, pattern: QueryPattern);

        /// Generate feature vector for a column
        pub fn generate_features(&self, table: &str, column: &str) -> FilterFeatureVector;

        /// Get columns with high filter potential
        pub fn get_high_value_columns(&self) -> Vec<(TableId, ColumnId, f64)>;

        /// Get data temperature for a table
        pub fn get_data_temperature(&self, table: &str) -> DataTemperature;
    }

4. FilterCostModel (Query Optimizer Integration)
File: src/optimizer/cost.rs
Integrates filter costs into query optimization.
Cost Estimation
    pub struct FilterCostEstimate {
        pub lookup_cost_ns: u64,       // CPU cost of filter lookup
        pub io_saved_bytes: u64,       // Estimated I/O avoided
        pub false_positive_cost: f64,  // Cost of FP verification
        pub net_benefit: f64,          // Overall benefit score
    }

    impl FilterCostModel {
        /// Estimate cost/benefit of using a filter
        pub fn estimate_filter_cost(
            &self,
            filter: &FilterInfo,
            predicate: &Predicate,
            row_count: u64,
        ) -> FilterCostEstimate {
            let lookup_cost = filter.lookup_cost_ns * row_count;
            let expected_hits = row_count as f64 * predicate.selectivity();
            let false_positives = expected_hits * filter.fpr;

            FilterCostEstimate {
                lookup_cost_ns: lookup_cost,
                io_saved_bytes: self.estimate_io_saved(filter, expected_hits, row_count),
                false_positive_cost: self.estimate_fp_cost(false_positives),
                net_benefit: self.compute_net_benefit(..),
            }
        }

        /// Select best filter for predicate
        pub fn select_best_filter(
            &self,
            table: TableId,
            column: ColumnId,
            predicate: &Predicate,
        ) -> Option<FilterId>;
    }

Data Flow
1. Query Execution → Metric Collection
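The branch logic of this step, traced in the diagram that follows, can be sketched as a small pure function. `LookupOutcome` and `classify_lookup` are hypothetical names used only for illustration; in the real evaluator each outcome corresponds to a `record_miss()`, `record_hit()`, or `record_false_positive()` call on the tracker.

```rust
/// Hypothetical outcome of one filter-guarded segment lookup.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LookupOutcome {
    Miss,          // filter said "definitely not": prune the segment
    Hit,           // filter said "might contain" and the key was there
    FalsePositive, // filter said "might contain" but the key was absent
}

/// Classify a lookup. `key_in_segment` stands in for reading the actual data
/// and is only invoked when the filter cannot rule the segment out.
pub fn classify_lookup<F>(might_contain: bool, key_in_segment: F) -> LookupOutcome
where
    F: FnOnce() -> bool,
{
    if !might_contain {
        // The segment is skipped entirely, so no data read happens here.
        return LookupOutcome::Miss;
    }
    if key_in_segment() {
        LookupOutcome::Hit
    } else {
        LookupOutcome::FalsePositive
    }
}
```

Passing the data check as a closure keeps the point of the filter visible: on a miss the closure is never called, which is where the I/O saving comes from.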
    User Query: SELECT * FROM users WHERE id = 42
                          │
                          ▼
    ┌───────────────────────────────────────────┐
    │ Query Planner                             │
    │                                           │
    │ Checks FilterCostModel:                   │
    │   "Is there a filter for users.id?"       │
    │   "Is it cost-effective?"                 │
    └─────────────────────┬─────────────────────┘
                          │ Yes, use filter
                          ▼
    ┌───────────────────────────────────────────────────────┐
    │ PredicateEvaluator                                    │
    │                                                       │
    │ bloom_filter.might_contain(42)                        │
    │   │                                                   │
    │   ├─► Returns FALSE → record_miss()                   │
    │   │                 → Prune entire segment!           │
    │   │                                                   │
    │   └─► Returns TRUE → Check actual data                │
    │         │                                             │
    │         ├─► Key found     → record_hit()              │
    │         └─► Key not found → record_false_positive()   │
    └───────────────────────────────────────────────────────┘

2. Metric Collection → Feedback
    Every 500 events (configurable):
                            │
                            ▼
    ┌────────────────────────────────────────────────┐
    │ FilterPerformanceTracker                       │
    │                                                │
    │ For each filter with ≥100 samples:             │
    │   1. Calculate effectiveness_score             │
    │   2. Calculate benefit_score                   │
    │   3. Get feature vector from WorkloadAnalyzer  │
    │   4. Send to FilterAdvisor.record_feedback()   │
    └───────────────────────┬────────────────────────┘
                            │
                            ▼
    ┌────────────────────────────────────────────────┐
    │ FilterAdvisor                                  │
    │                                                │
    │   1. Store training example                    │
    │   2. Compare predicted vs actual               │
    │   3. Update model weights                      │
    │   4. Adjust future predictions                 │
    └────────────────────────────────────────────────┘

3. Feedback → New Recommendations
    Periodically or on-demand:
                            │
                            ▼
    ┌────────────────────────────────────────────────┐
    │ FilterAdvisor.recommend_filter()               │
    │                                                │
    │ For columns without filters:                   │
    │   1. Get features from WorkloadAnalyzer        │
    │   2. Apply decision tree                       │
    │   3. Apply ML scoring                          │
    │   4. If score > threshold:                     │
    │      → Create recommendation                   │
    └───────────────────────┬────────────────────────┘
                            │
                            ▼
    ┌────────────────────────────────────────────────┐
    │ FilterBuildCoordinator                         │
    │                                                │
    │   1. Queue filter build                        │
    │   2. Build in background                       │
    │   3. Apply deltas                              │
    │   4. Atomic swap                               │
    │   5. Register in FilterRegistry                │
    └────────────────────────────────────────────────┘

SQL Views (Planned)
Filter Metrics View
    -- View all filter performance metrics
    CREATE VIEW helios_filter_metrics AS
    SELECT
        f.filter_id,
        f.table_name,
        f.column_name,
        f.filter_type,
        m.total_lookups,
        m.hit_rate,
        m.false_positive_rate,
        m.effectiveness_score,
        m.benefit_score,
        m.avg_lookup_latency_ns,
        m.rows_pruned,
        m.bytes_saved,
        m.last_update
    FROM filter_registry f
    JOIN filter_performance_metrics m ON f.filter_id = m.filter_id;

Filter Recommendations View

    -- View pending filter recommendations
    CREATE VIEW helios_filter_recommendations AS
    SELECT
        r.target_table,
        r.target_column,
        r.recommended_type,
        r.priority_score,
        r.estimated_benefit_ms,
        r.estimated_storage_bytes,
        r.confidence,
        r.reason
    FROM filter_advisor_recommendations r
    WHERE r.status = 'pending'
    ORDER BY r.priority_score DESC;

Workload Patterns View

    -- View column access patterns
    CREATE VIEW helios_workload_patterns AS
    SELECT
        table_name,
        column_name,
        read_frequency,
        write_frequency,
        delete_frequency,
        equality_predicates,
        range_predicates,
        data_temperature,
        filter_recommendation
    FROM workload_analyzer_patterns;

Configuration
Runtime Configuration (SQL)
    -- Enable/disable automatic filter creation
    SET helios.filter_auto_create = true;

    -- Enable/disable automatic filter eviction
    SET helios.filter_auto_evict = true;

    -- Set feedback interval (events)
    SET helios.filter_feedback_interval = 500;

    -- Set minimum samples for metrics
    SET helios.filter_min_samples = 100;

    -- Set effectiveness threshold
    SET helios.filter_effectiveness_threshold = 0.8;

    -- Set learning rate
    SET helios.filter_learning_rate = 0.01;

    -- View current settings
    SHOW helios.filter_%;

Programmatic Configuration (Rust)
    use std::sync::Arc;

    // Configure the advisor first: the tracker takes an Arc<FilterAdvisor>
    let advisor_config = AdvisorConfig {
        learning_rate: 0.01,
        min_feedback_samples: 50,
        recommendation_threshold: 0.5,
        auto_create_enabled: true,
        auto_evict_enabled: true,
    };

    let advisor = Arc::new(FilterAdvisor::new().with_config(advisor_config));

    // Configure performance tracker
    let tracker_config = TrackerConfig {
        min_samples: 100,
        rolling_window_size: 1000,
        feedback_interval: 500,
        auto_feedback_enabled: true,
        effectiveness_threshold: 0.8,
    };

    // `registry` and `analyzer` are assumed to be an existing
    // Arc<FilterRegistry> and Arc<WorkloadAnalyzer>
    let tracker = FilterPerformanceTracker::new(registry)
        .with_config(tracker_config)
        .with_advisor(advisor)
        .with_workload_analyzer(analyzer);

Tuning Guidelines
For OLTP Workloads (High Write/Delete)
    -- Prefer Cuckoo filters (support deletion)
    SET helios.filter_default_type = 'cuckoo';

    -- Lower feedback interval (faster adaptation)
    SET helios.filter_feedback_interval = 200;

    -- Higher effectiveness threshold
    SET helios.filter_effectiveness_threshold = 0.85;

For OLAP Workloads (Read-Heavy)

    -- Prefer XOR filters (smallest footprint)
    SET helios.filter_default_type = 'xor';

    -- Higher feedback interval (less overhead)
    SET helios.filter_feedback_interval = 1000;

    -- Lower threshold (more filters)
    SET helios.filter_effectiveness_threshold = 0.7;

For Mixed Workloads

    -- Let advisor choose per-column
    SET helios.filter_default_type = 'auto';

    -- Balanced settings
    SET helios.filter_feedback_interval = 500;
    SET helios.filter_effectiveness_threshold = 0.8;

Monitoring & Debugging
Check Filter Effectiveness
    -- Find underperforming filters
    SELECT * FROM helios_filter_metrics
    WHERE effectiveness_score < 0.5
      AND total_lookups > 1000;

    -- Find high-value filters
    SELECT * FROM helios_filter_metrics
    WHERE benefit_score > 0.5
    ORDER BY benefit_score DESC
    LIMIT 10;

Check Workload Patterns

    -- Find columns that should have filters
    SELECT * FROM helios_workload_patterns
    WHERE read_frequency > 100
      AND equality_predicates > 0
      AND filter_recommendation IS NOT NULL;

Debug ML Weights

    // Get current model weights
    let weights = advisor.get_weights();
    println!("Read weight: {}", weights.read_weight);
    println!("Equality weight: {}", weights.equality_weight);

    // Get training examples
    let examples = advisor.get_training_data();
    for ex in examples.iter().take(10) {
        println!("Predicted: {}, Actual: {}",
            advisor.predict(&ex.features, &ex.filter_type),
            ex.actual_benefit);
    }

Implementation Status
| Component | Status | File |
|---|---|---|
| FilterPerformanceTracker | ✅ Complete | src/storage/filter_performance_tracker.rs |
| FilterAdvisor | ✅ Complete | src/storage/filter_advisor.rs |
| WorkloadAnalyzer | ✅ Complete | src/storage/workload_analyzer.rs |
| FilterCostModel | ✅ Complete | src/optimizer/cost.rs |
| PredicateEvaluator integration | ✅ Complete | src/storage/predicate/evaluator.rs |
| StorageEngine integration | ✅ Complete | src/storage/engine.rs |
| SQL Views | 🔲 Planned | - |
| SQL Configuration | 🔲 Planned | - |