Compression Integration Architecture - Part 2 of 4
Configuration & Performance Monitoring
Navigation: Index | ← Part 1 | Part 2 | Part 3 →
TOML Configuration
[storage]
path = "./data"
memory_only = false
# Write-ahead log toggle — required by StorageConfig (the struct has no
# serde default for it), but it was missing from the original sample.
wal_enabled = true
cache_size = 536870912 # 512 MB

# Compression configuration (ENHANCED)
[storage.compression]
enabled = true

# Algorithm options:
# - "none" - no compression
# - "lz4"  - fast general purpose (via RocksDB)
# - "zstd" - high ratio general purpose (via RocksDB)
# - "auto" - automatic selection based on data pattern (RECOMMENDED)
algorithm = "auto"

# Auto-selection configuration (only used when algorithm = "auto")
[storage.compression.auto]
# Enable automatic codec switching based on data patterns
auto_switch = true

# Minimum data size to compress (bytes)
# Data smaller than this will not be compressed
min_compress_size = 256

# Target compression ratio
# Codec selector aims for this minimum ratio
target_ratio = 2.0

# Maximum compression time (microseconds)
# Fall back to faster codec if exceeded
max_compress_time_us = 1000

# Enable statistics collection
collect_stats = true

# Enabled codecs for auto mode
# Codecs will be selected based on data pattern
enabled_codecs = ["fsst", "alp", "dictionary", "rle", "delta", "lz4", "zstd"]

# Per-codec configuration
[storage.compression.codecs.fsst]
enabled = true
symbol_table_size = 256
min_pattern_length = 3

[storage.compression.codecs.alp]
enabled = true
use_exceptions = true

[storage.compression.codecs.dictionary]
enabled = true
max_dictionary_size = 65536 # 64 KB

[storage.compression.codecs.rle]
enabled = true
min_run_length = 3

[storage.compression.codecs.delta]
enabled = true
encoding = "varint" # or "fixed"

# Performance tuning
[performance]
worker_threads = 8
query_timeout_secs = 300
simd_enabled = true
parallel_query = true

# Compression performance tuning
[performance.compression]
# Enable parallel compression for large data
parallel_threshold_bytes = 1048576 # 1 MB
# Number of threads for parallel compression
compression_threads = 4

Rust Configuration Structures
//! Configuration structures//! Location: src/config.rs (ENHANCED)
use serde::{Deserialize, Serialize};
/// Storage configuration (ENHANCED)#[derive(Debug, Clone, Serialize, Deserialize)]pub struct StorageConfig { pub path: Option<PathBuf>, pub memory_only: bool, pub wal_enabled: bool, pub cache_size: usize, /// Compression configuration (ENHANCED) pub compression: CompressionConfig,}
/// Compression configuration#[derive(Debug, Clone, Serialize, Deserialize)]pub struct CompressionConfig { /// Enable compression pub enabled: bool, /// Compression algorithm pub algorithm: CompressionAlgorithmConfig, /// Auto-selection configuration #[serde(default)] pub auto: AutoCompressionConfig, /// Per-codec configuration #[serde(default)] pub codecs: CodecConfigs,}
impl Default for CompressionConfig { fn default() -> Self { Self { enabled: true, algorithm: CompressionAlgorithmConfig::Auto, auto: AutoCompressionConfig::default(), codecs: CodecConfigs::default(), } }}
/// Compression algorithm configuration#[derive(Debug, Clone, Serialize, Deserialize)]#[serde(rename_all = "lowercase")]pub enum CompressionAlgorithmConfig { None, Lz4, Zstd, Fsst, Alp, Dictionary, Rle, Delta, Auto,}
/// Auto-selection configuration#[derive(Debug, Clone, Serialize, Deserialize)]pub struct AutoCompressionConfig { pub auto_switch: bool, pub min_compress_size: usize, pub target_ratio: f64, pub max_compress_time_us: u64, pub collect_stats: bool, pub enabled_codecs: Vec<String>,}
impl Default for AutoCompressionConfig { fn default() -> Self { Self { auto_switch: true, min_compress_size: 256, target_ratio: 2.0, max_compress_time_us: 1000, collect_stats: true, enabled_codecs: vec![ "fsst".to_string(), "alp".to_string(), "dictionary".to_string(), "rle".to_string(), "delta".to_string(), "lz4".to_string(), "zstd".to_string(), ], } }}
/// Per-codec configuration#[derive(Debug, Clone, Default, Serialize, Deserialize)]pub struct CodecConfigs { #[serde(default)] pub fsst: FsstConfig, #[serde(default)] pub alp: AlpConfig, #[serde(default)] pub dictionary: DictionaryConfig, #[serde(default)] pub rle: RleConfig, #[serde(default)] pub delta: DeltaConfig,}
/// FSST codec configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FsstConfig {
    pub enabled: bool,
    /// Symbol table size (sample config: 256 entries).
    pub symbol_table_size: usize,
    /// Minimum pattern length (sample config: 3).
    pub min_pattern_length: usize,
}

impl Default for FsstConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            symbol_table_size: 256,
            min_pattern_length: 3,
        }
    }
}
/// ALP codec configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlpConfig {
    pub enabled: bool,
    /// Whether ALP encodes outliers as exceptions.
    pub use_exceptions: bool,
}

impl Default for AlpConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            use_exceptions: true,
        }
    }
}
/// Dictionary codec configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DictionaryConfig {
    pub enabled: bool,
    /// Maximum dictionary size in bytes (default 64 KB).
    pub max_dictionary_size: usize,
}

impl Default for DictionaryConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            max_dictionary_size: 65536,
        }
    }
}
/// Run-length-encoding codec configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RleConfig {
    pub enabled: bool,
    /// Minimum run length worth encoding (default 3).
    pub min_run_length: usize,
}

impl Default for RleConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            min_run_length: 3,
        }
    }
}
/// Delta codec configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeltaConfig {
    pub enabled: bool,
    /// Encoding of the deltas (`varint` by default, or `fixed`).
    pub encoding: DeltaEncoding,
}

impl Default for DeltaConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            encoding: DeltaEncoding::Varint,
        }
    }
}
/// Delta encoding variants; serialize as `"varint"` / `"fixed"` in TOML.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum DeltaEncoding {
    Varint,
    Fixed,
}

Performance Monitoring Design
Metrics Collection
//! Statistics collector for compression operationsuse super::{CompressionAlgorithm, DataPattern};use std::sync::Arc;use std::sync::Mutex;use std::time::Duration;
/// Compression statistics#[derive(Debug, Clone)]pub struct CompressionStats { /// Total bytes compressed pub total_bytes_compressed: u64, /// Total bytes decompressed pub total_bytes_decompressed: u64, /// Total compression operations pub total_compressions: u64, /// Total decompression operations pub total_decompressions: u64, /// Total time spent compressing pub total_compress_time: Duration, /// Total time spent decompressing pub total_decompress_time: Duration, /// Average compression ratio pub avg_compression_ratio: f64, /// Per-algorithm statistics pub by_algorithm: HashMap<CompressionAlgorithm, AlgorithmStats>, /// Per-pattern statistics pub by_pattern: HashMap<DataPattern, PatternStats>,}
/// Running aggregates for a single compression algorithm.
#[derive(Debug, Clone)]
pub struct AlgorithmStats {
    pub algorithm: CompressionAlgorithm,
    pub use_count: u64,
    pub total_input_bytes: u64,
    pub total_output_bytes: u64,
    pub avg_ratio: f64,
    pub avg_compress_time_us: f64,
    pub avg_decompress_time_us: f64,
    pub min_ratio: f64,
    pub max_ratio: f64,
}
/// Running aggregates for a single detected data pattern.
#[derive(Debug, Clone)]
pub struct PatternStats {
    pub pattern: DataPattern,
    pub detection_count: u64,
    pub best_algorithm: CompressionAlgorithm,
    pub avg_ratio: f64,
}
/// Statistics collectorpub struct StatsCollector { stats: Arc<Mutex<CompressionStats>>, history_size: usize, history: Arc<Mutex<VecDeque<CompressionEvent>>>,}
impl StatsCollector { pub fn new(history_size: usize) -> Self { Self { stats: Arc::new(Mutex::new(CompressionStats::default())), history_size, history: Arc::new(Mutex::new(VecDeque::new())), } }
/// Record compression event pub fn record( &self, algorithm: CompressionAlgorithm, pattern: DataPattern, block: &CompressedBlock, duration: Duration, ) { let mut stats = self.stats.lock().unwrap();
// Update totals stats.total_bytes_compressed += block.original_size as u64; stats.total_compressions += 1; stats.total_compress_time += duration;
// Update per-algorithm stats let algo_stats = stats.by_algorithm .entry(algorithm) .or_insert_with(|| AlgorithmStats::new(algorithm)); algo_stats.record_compression(block, duration);
// Update per-pattern stats let pattern_stats = stats.by_pattern .entry(pattern) .or_insert_with(|| PatternStats::new(pattern)); pattern_stats.record(algorithm, block.compression_ratio());
// Update average ratio let total_compressions = stats.total_compressions as f64; let prev_avg = stats.avg_compression_ratio; stats.avg_compression_ratio = (prev_avg * (total_compressions - 1.0) + block.compression_ratio()) / total_compressions;
// Add to history let mut history = self.history.lock().unwrap(); history.push_back(CompressionEvent { timestamp: std::time::SystemTime::now(), algorithm, pattern, ratio: block.compression_ratio(), input_size: block.original_size, output_size: block.data.len(), duration, });
// Trim history while history.len() > self.history_size { history.pop_front(); } }
/// Get current statistics snapshot pub fn snapshot(&self) -> CompressionStats { self.stats.lock().unwrap().clone() }
/// Get recent compression events pub fn recent_events(&self, count: usize) -> Vec<CompressionEvent> { let history = self.history.lock().unwrap(); history.iter() .rev() .take(count) .cloned() .collect() }
/// Generate statistics report pub fn report(&self) -> String { let stats = self.snapshot(); format!( "Compression Statistics:\n\ Total Compressions: {}\n\ Total Bytes Compressed: {:.2} MB\n\ Average Compression Ratio: {:.2}x\n\ Average Compress Time: {:.2} µs\n\ \n\ By Algorithm:\n{}\n\ \n\ By Pattern:\n{}", stats.total_compressions, stats.total_bytes_compressed as f64 / 1_000_000.0, stats.avg_compression_ratio, stats.total_compress_time.as_micros() as f64 / stats.total_compressions as f64, Self::format_algorithm_stats(&stats.by_algorithm), Self::format_pattern_stats(&stats.by_pattern), ) }
fn format_algorithm_stats(stats: &HashMap<CompressionAlgorithm, AlgorithmStats>) -> String { let mut lines = Vec::new(); for (algo, stat) in stats { lines.push(format!( " {:?}: {} uses, {:.2}x ratio, {:.0} µs", algo, stat.use_count, stat.avg_ratio, stat.avg_compress_time_us, )); } lines.join("\n") }
fn format_pattern_stats(stats: &HashMap<DataPattern, PatternStats>) -> String { let mut lines = Vec::new(); for (pattern, stat) in stats { lines.push(format!( " {:?}: {} detections, best: {:?}, {:.2}x ratio", pattern, stat.detection_count, stat.best_algorithm, stat.avg_ratio, )); } lines.join("\n") }}
/// One recorded compression operation, kept in the bounded history buffer.
#[derive(Debug, Clone)]
struct CompressionEvent {
    timestamp: std::time::SystemTime,
    algorithm: CompressionAlgorithm,
    pattern: DataPattern,
    ratio: f64,
    input_size: usize,
    output_size: usize,
    duration: Duration,
}
impl AlgorithmStats { fn new(algorithm: CompressionAlgorithm) -> Self { Self { algorithm, use_count: 0, total_input_bytes: 0, total_output_bytes: 0, avg_ratio: 0.0, avg_compress_time_us: 0.0, avg_decompress_time_us: 0.0, min_ratio: f64::MAX, max_ratio: 0.0, } }
fn record_compression(&mut self, block: &CompressedBlock, duration: Duration) { self.use_count += 1; self.total_input_bytes += block.original_size as u64; self.total_output_bytes += block.data.len() as u64;
let ratio = block.compression_ratio(); self.min_ratio = self.min_ratio.min(ratio); self.max_ratio = self.max_ratio.max(ratio);
// Update average ratio let n = self.use_count as f64; self.avg_ratio = (self.avg_ratio * (n - 1.0) + ratio) / n;
// Update average time let time_us = duration.as_micros() as f64; self.avg_compress_time_us = (self.avg_compress_time_us * (n - 1.0) + time_us) / n; }}
impl PatternStats { fn new(pattern: DataPattern) -> Self { Self { pattern, detection_count: 0, best_algorithm: CompressionAlgorithm::None, avg_ratio: 0.0, } }
fn record(&mut self, algorithm: CompressionAlgorithm, ratio: f64) { self.detection_count += 1;
// Update average ratio let n = self.detection_count as f64; self.avg_ratio = (self.avg_ratio * (n - 1.0) + ratio) / n;
// Update best algorithm if this performed better if ratio > self.avg_ratio * 1.1 { self.best_algorithm = algorithm; } }}Monitoring Endpoints
//! Monitoring and metrics endpointsuse super::stats::{CompressionStats, StatsCollector};use std::sync::Arc;
/// Compression monitoring interfacepub struct CompressionMonitor { stats: Arc<StatsCollector>,}
impl CompressionMonitor { pub fn new(stats: Arc<StatsCollector>) -> Self { Self { stats } }
/// Get current metrics snapshot pub fn metrics(&self) -> CompressionMetrics { let stats = self.stats.snapshot();
CompressionMetrics { total_compressions: stats.total_compressions, total_decompressions: stats.total_decompressions, total_bytes_in: stats.total_bytes_compressed, total_bytes_out: stats.total_bytes_compressed as f64 / stats.avg_compression_ratio, avg_ratio: stats.avg_compression_ratio, avg_compress_time_us: stats.total_compress_time.as_micros() as f64 / stats.total_compressions.max(1) as f64, avg_decompress_time_us: stats.total_decompress_time.as_micros() as f64 / stats.total_decompressions.max(1) as f64, algorithms: stats.by_algorithm.iter() .map(|(algo, stat)| AlgorithmMetric { algorithm: format!("{:?}", algo), use_count: stat.use_count, avg_ratio: stat.avg_ratio, avg_time_us: stat.avg_compress_time_us, }) .collect(), patterns: stats.by_pattern.iter() .map(|(pattern, stat)| PatternMetric { pattern: format!("{:?}", pattern), detection_count: stat.detection_count, best_algorithm: format!("{:?}", stat.best_algorithm), avg_ratio: stat.avg_ratio, }) .collect(), } }
/// Get formatted text report pub fn report(&self) -> String { self.stats.report() }
/// Get JSON metrics pub fn json_metrics(&self) -> serde_json::Value { let metrics = self.metrics(); serde_json::to_value(&metrics).unwrap() }}
/// Snapshot of compression metrics, serializable for monitoring endpoints.
#[derive(Debug, Clone, serde::Serialize)]
pub struct CompressionMetrics {
    pub total_compressions: u64,
    pub total_decompressions: u64,
    pub total_bytes_in: u64,
    /// Estimated from `total_bytes_in / avg_ratio`, hence a float.
    pub total_bytes_out: f64,
    pub avg_ratio: f64,
    pub avg_compress_time_us: f64,
    pub avg_decompress_time_us: f64,
    pub algorithms: Vec<AlgorithmMetric>,
    pub patterns: Vec<PatternMetric>,
}
/// Per-algorithm metric row (algorithm rendered via `Debug`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct AlgorithmMetric {
    pub algorithm: String,
    pub use_count: u64,
    pub avg_ratio: f64,
    pub avg_time_us: f64,
}
/// Per-pattern metric row (pattern and algorithm rendered via `Debug`).
#[derive(Debug, Clone, serde::Serialize)]
pub struct PatternMetric {
    pub pattern: String,
    pub detection_count: u64,
    pub best_algorithm: String,
    pub avg_ratio: f64,
}

SQL Interface for Monitoring
-- View compression statistics
SELECT * FROM heliosdb_compression_stats;

-- Output:
-- algorithm  | uses  | avg_ratio | avg_compress_us | avg_decompress_us | total_bytes_in | total_bytes_out
-- -----------|-------|-----------|-----------------|-------------------|----------------|----------------
-- FSST       | 15023 | 6.2       | 45.3            | 12.1              | 1.2 GB         | 195 MB
-- ALP        | 8891  | 3.8       | 52.1            | 18.7              | 800 MB         | 211 MB
-- Dictionary | 4532  | 12.3      | 28.4            | 8.9               | 450 MB         | 37 MB
-- RLE        | 1203  | 25.1      | 15.2            | 5.3               | 120 MB         | 4.8 MB
-- Delta      | 9874  | 8.9       | 22.7            | 9.1               | 950 MB         | 107 MB
-- LZ4        | 3421  | 2.1       | 18.3            | 6.2               | 340 MB         | 162 MB
-- View pattern detection statistics
SELECT * FROM heliosdb_pattern_stats;

-- Output:
-- pattern           | detections | best_algorithm | avg_ratio
-- ------------------|------------|----------------|----------
-- StringData        | 15023      | FSST           | 6.2
-- FloatingPointData | 8891       | ALP            | 3.8
-- LowCardinality    | 4532       | Dictionary     | 12.3
-- Sequential        | 9874       | Delta          | 8.9
-- Random            | 3421      | LZ4            | 2.1
-- View recent compression events
SELECT * FROM heliosdb_compression_events LIMIT 10;

-- Output:
-- timestamp           | algorithm  | pattern           | ratio | input_kb | output_kb | duration_us
-- --------------------|------------|-------------------|-------|----------|-----------|------------
-- 2025-11-18 10:23:45 | FSST       | StringData        | 7.2   | 124      | 17        | 52
-- 2025-11-18 10:23:44 | ALP        | FloatingPointData | 4.1   | 256      | 62        | 68
-- 2025-11-18 10:23:43 | Dictionary | LowCardinality    | 15.3  | 64       | 4         | 31
-- ...
-- Get current compression configuration
SELECT * FROM heliosdb_config WHERE key LIKE 'compression.%';

-- Output:
-- key                                   | value
-- --------------------------------------|------
-- compression.enabled                   | true
-- compression.algorithm                 | auto
-- compression.auto.auto_switch          | true
-- compression.auto.min_compress_size    | 256
-- compression.auto.target_ratio         | 2.0
-- compression.auto.max_compress_time_us | 1000