Compression Integration Architecture - Part 1 of 4
Architecture Overview & API Design
HeliosDB Nano v2.1 - Compression Integration Architecture
Version: 2.1.0 | Created: November 18, 2025 | Status: Architecture Design | Target: HeliosDB Nano Phase 3 - Week 2
Executive Summary
This document defines the architecture for integrating advanced compression codecs (FSST and ALP) into HeliosDB Nano v2.1, providing automatic, transparent compression that targets a 5-15x storage reduction with minimal performance overhead.
Key Design Principles
- Zero Configuration - Automatic codec selection based on data patterns (see the configuration sketch after this list)
- Non-Breaking - Fully backward compatible with existing storage
- Modular - Clean separation between codecs, storage, and Arrow integration
- Observable - Comprehensive metrics and performance monitoring
- Zero IP - Only open-source algorithms (FSST, ALP, standard codecs)
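To make the Zero Configuration principle concrete, here is a minimal sketch of the two ways a caller would configure the layer, using the `CompressionConfig` type defined in the API section below (the `heliosdb::compression` module path is an assumption for illustration):

```rust
// Sketch of the Zero Configuration principle (module path is illustrative).
// `CompressionConfig` and `CompressionAlgorithm` are defined later in this document.
use heliosdb::compression::{CompressionAlgorithm, CompressionConfig};

fn configuration_examples() {
    // Default: compression enabled, pattern-based automatic codec selection.
    let auto = CompressionConfig::default();
    assert_eq!(auto.default_algorithm, CompressionAlgorithm::Auto);

    // Pinning a codec (or disabling compression) remains possible, e.g. for
    // benchmarking or debugging, without any change to the storage format.
    let pinned = CompressionConfig {
        default_algorithm: CompressionAlgorithm::Fsst,
        ..CompressionConfig::default()
    };
    assert!(pinned.enabled);
}
```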
Success Metrics
| Metric | Target | Measurement |
|---|---|---|
| Storage Reduction | 5-15x | Compression ratio on real workloads |
| Read Overhead | <3% | Query latency with compression vs. without |
| Write Overhead | <5% | Insert throughput with compression vs. without |
| CPU Overhead | <15% | Background compression CPU usage |
| Migration Time | 0 (no offline step) | Lazy migration on read/write |
Architecture Overview
System Context Diagram
```text
HeliosDB Nano v2.1
├── SQL Query Interface (PostgreSQL Wire Protocol + REPL)
├── Query Executor (Volcano Model)
│     • Scan Operator
│     • Filter/Project
│     • Join/Aggregate
├── Storage Engine (RocksDB-based)
│     ├── Compression Layer (NEW)
│     │     ├── Codecs: FSST, ALP, Dict, RLE
│     │     └── Codec Selector (Pattern-based)
│     ├── Existing LZ4/Zstd (RocksDB native)
│     ├── MVCC Layer (Versioned Values)
│     └── RocksDB (Persistent Storage)
└── Apache Arrow Integration (Columnar)
      • Batch Processing
      • Column-specific Compression
      • Parquet Export (with compression metadata)
```

Queries flow top-down from the SQL interface through the executor into the storage engine; the new compression layer sits between the executor and RocksDB, alongside the existing LZ4/Zstd block compression and the MVCC layer.

Compression Layer Architecture
```text
CompressionManager (Main API)
  • compress(data) -> CompressedBlock
  • decompress(block) -> Vec<u8>
  • compress_column(column) -> CompressedColumn
      │
      ▼
Pattern Analyzer
  • SIMD-accelerated
  • Data type detection
  • Entropy calculation
  • Cardinality check
      │
      ▼
Codec Selector   ◄── feedback from Stats Collector
  • Rule-based
  • Cost model
  • Adaptive learning
      │
      ▼
Compression Pipeline
  1. Analyze pattern
  2. Select codec
  3. Compress
  4. Validate checksum
  5. Record stats ──▶ Stats Collector
                       • Ratio history
                       • Performance
                       • Pattern freq

Codec Registry (codecs available to the selector and pipeline)
  • FSST  • ALP  • Dictionary  • RLE  • Delta  • LZ4/Zstd
```

Integration Points
```text
1. RocksDB Storage Layer Integration

   StorageEngine::put(key, value)
     ├──▶ serialize(value)
     ├──▶ compress(serialized)          ◄── NEW
     ├──▶ encrypt(compressed)           [if enabled]
     └──▶ db.put(key, encrypted)

   StorageEngine::get(key)
     ├──▶ db.get(key)
     ├──▶ decrypt(data)                 [if enabled]
     ├──▶ decompress(decrypted)         ◄── NEW
     └──▶ deserialize(decompressed)

2. Apache Arrow Integration

   scan_table_columnar(table) -> RecordBatch
     ├──▶ collect tuples
     ├──▶ convert to Arrow columns
     ├──▶ compress_column(col)          ◄── NEW
     │      • FSST for strings
     │      • ALP for floats
     │      • Dictionary for low cardinality
     └──▶ return RecordBatch

3. Tuple Serialization Integration

   insert_tuple(table, tuple) -> row_id
     ├──▶ bincode::serialize(tuple)
     ├──▶ compression.compress(bytes)   ◄── NEW
     │      • Per-column analysis
     │      • Metadata stored in block
     └──▶ storage.put(key, compressed)

4. Configuration Integration

   Config::storage::compression         ◄── ENHANCED
     • None / Lz4 / Zstd (existing)
     • Auto (new - uses pattern-based selection)
     • Custom { codecs: [...], strategy: Auto }
```
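As a sketch of integration point 1, the snippet below shows where compression sits on the key/value path. The function names, the direct `rocksdb`/`bincode` calls, and the error conversions are illustrative assumptions, not the actual HeliosDB `StorageEngine` internals; the optional encrypt/decrypt hook from the diagram would wrap the serialized block bytes just before/after the RocksDB call.

```rust
// Hypothetical wiring for integration point 1 (names and error conversions are
// assumptions). The value bytes are wrapped in a CompressedBlock, so RocksDB
// only ever stores the block envelope with its codec id and checksum.
use heliosdb::compression::{CompressedBlock, CompressionManager};
use heliosdb::error::Result;

fn put_value(db: &rocksdb::DB, compression: &CompressionManager, key: &[u8], value: &[u8]) -> Result<()> {
    let block = compression.compress(value)?;        // NEW step in the write path
    let bytes = bincode::serialize(&block)?;         // envelope: codec id, checksum, payload
    db.put(key, bytes)?;
    Ok(())
}

fn get_value(db: &rocksdb::DB, compression: &CompressionManager, key: &[u8]) -> Result<Option<Vec<u8>>> {
    match db.get(key)? {
        Some(bytes) => {
            let block: CompressedBlock = bincode::deserialize(&bytes)?;
            Ok(Some(compression.decompress(&block)?)) // NEW step in the read path
        }
        None => Ok(None),
    }
}
```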
API Design

Compression Traits
```rust
//! Compression codec trait definitions

use serde::{Deserialize, Serialize};
use crate::error::Result;

/// Compression algorithm identifier
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum CompressionAlgorithm {
    /// No compression
    None,
    /// LZ4 - fast general purpose (existing, via RocksDB)
    Lz4,
    /// Zstd - high ratio general purpose (existing, via RocksDB)
    Zstd,
    /// FSST - Fast Static Symbol Table for strings (NEW)
    Fsst,
    /// ALP - Adaptive Lossless floating-Point (NEW)
    Alp,
    /// Dictionary encoding - low cardinality data
    Dictionary,
    /// RLE - Run Length Encoding
    Rle,
    /// Delta encoding - sequential numeric data
    Delta,
    /// Automatic selection based on data pattern
    Auto,
}

/// Data pattern detected by analyzer
// Hash + serde derives: DataPattern is stored in CompressedBlock and used as a HashMap key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum DataPattern {
    /// Random/incompressible data
    Random,
    /// String data with repeated patterns
    StringData,
    /// Numeric integer data
    IntegerData,
    /// Floating point data
    FloatingPointData,
    /// Low cardinality (few unique values)
    LowCardinality,
    /// Sequential/sorted data
    Sequential,
    /// Time-series data
    TimeSeries,
    /// JSON/structured text
    StructuredText,
}

/// Codec performance characteristics
#[derive(Debug, Clone)]
pub struct CodecCharacteristics {
    /// Compression speed (MB/s)
    pub compress_speed_mbps: f64,
    /// Decompression speed (MB/s)
    pub decompress_speed_mbps: f64,
    /// Typical compression ratio
    pub typical_ratio: f64,
    /// CPU overhead percentage (0.0-1.0)
    pub cpu_overhead: f64,
    /// Best for pattern
    pub best_for: Vec<DataPattern>,
}

/// Main compression codec trait
pub trait CompressionCodec: Send + Sync {
    /// Get codec algorithm identifier
    fn algorithm(&self) -> CompressionAlgorithm;

    /// Compress data
    fn compress(&self, data: &[u8]) -> Result<Vec<u8>>;

    /// Decompress data
    fn decompress(&self, compressed: &[u8], original_size: Option<usize>) -> Result<Vec<u8>>;

    /// Get codec characteristics
    fn characteristics(&self) -> CodecCharacteristics;

    /// Estimate compressed size without actually compressing
    fn estimate_compressed_size(&self, data: &[u8]) -> usize {
        let ratio = self.characteristics().typical_ratio;
        ((data.len() as f64 / ratio).ceil() as usize).max(128)
    }

    /// Check if codec can handle this data pattern efficiently
    fn supports_pattern(&self, pattern: DataPattern) -> bool {
        self.characteristics().best_for.contains(&pattern)
    }
}

/// Compressed data block with metadata
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompressedBlock {
    /// Algorithm used for compression
    pub algorithm: CompressionAlgorithm,
    /// Version of the algorithm
    pub version: u16,
    /// Original uncompressed size
    pub original_size: usize,
    /// Compressed data
    pub data: Vec<u8>,
    /// CRC32 checksum of original data
    pub checksum: u32,
    /// Compression timestamp
    pub compressed_at: u64,
    /// Data pattern detected
    pub pattern: DataPattern,
}

impl CompressedBlock {
    /// Get compression ratio
    pub fn compression_ratio(&self) -> f64 {
        self.original_size as f64 / self.data.len() as f64
    }

    /// Get space savings percentage
    pub fn space_savings_pct(&self) -> f64 {
        (1.0 - (self.data.len() as f64 / self.original_size as f64)) * 100.0
    }
}
```
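A quick worked example of the block metadata helpers (values are illustrative; a 10 KiB payload compressed to 2 KiB):

```rust
#[test]
fn compressed_block_metadata_example() {
    // Illustrative numbers only: 10 KiB of log lines reduced to 2 KiB by FSST.
    let block = CompressedBlock {
        algorithm: CompressionAlgorithm::Fsst,
        version: 1,
        original_size: 10_240,
        data: vec![0u8; 2_048],
        checksum: 0,       // checksum and timestamp omitted for the example
        compressed_at: 0,
        pattern: DataPattern::StringData,
    };
    assert_eq!(block.compression_ratio(), 5.0);                 // 10_240 / 2_048
    assert!((block.space_savings_pct() - 80.0).abs() < 1e-9);   // (1 - 2_048/10_240) * 100
}
```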
Codec Implementations

```rust
//! FSST Codec Implementation

use super::{CompressionAlgorithm, CompressionCodec, CodecCharacteristics, DataPattern};
use crate::error::Result;

/// FSST (Fast Static Symbol Table) compression codec
///
/// FSST is a string compression algorithm that builds a symbol table
/// of frequent byte sequences and replaces them with single-byte codes.
/// Excellent for string data with repeated patterns (logs, URLs, JSON).
///
/// Reference: "FSST: Fast Random Access String Compression" (Boncz et al.; used in DuckDB)
/// Patent Status: Open source, MIT licensed
pub struct FsstCodec {
    // Configuration
    symbol_table_size: usize,
    min_pattern_length: usize,
}

impl FsstCodec {
    pub fn new() -> Self {
        Self {
            symbol_table_size: 256, // Single-byte codes
            min_pattern_length: 3,  // Minimum pattern to compress
        }
    }

    /// Build symbol table from sample data
    fn build_symbol_table(&self, data: &[u8]) -> SymbolTable {
        // Implementation:
        // 1. Count n-gram frequencies (n=3..8)
        // 2. Select top 256 most frequent patterns
        // 3. Build encoding/decoding tables
        todo!("Build FSST symbol table")
    }

    /// Compress data using symbol table
    fn compress_with_table(&self, data: &[u8], table: &SymbolTable) -> Vec<u8> {
        // Implementation:
        // 1. Serialize symbol table (header)
        // 2. Replace patterns with single-byte codes
        // 3. Emit unmatched bytes literally
        todo!("FSST compression")
    }
}

impl CompressionCodec for FsstCodec {
    fn algorithm(&self) -> CompressionAlgorithm {
        CompressionAlgorithm::Fsst
    }

    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        // Build symbol table from data
        let table = self.build_symbol_table(data);
        // Compress using table
        Ok(self.compress_with_table(data, &table))
    }

    fn decompress(&self, compressed: &[u8], _original_size: Option<usize>) -> Result<Vec<u8>> {
        // Implementation:
        // 1. Deserialize symbol table from header
        // 2. Decode byte-by-byte, expanding symbols
        todo!("FSST decompression")
    }

    fn characteristics(&self) -> CodecCharacteristics {
        CodecCharacteristics {
            compress_speed_mbps: 500.0,
            decompress_speed_mbps: 2000.0,
            typical_ratio: 5.0, // ~5x typical on repetitive string data
            cpu_overhead: 0.04,
            best_for: vec![
                DataPattern::StringData,
                DataPattern::StructuredText,
            ],
        }
    }
}

struct SymbolTable {
    // Symbol encoding table: pattern -> code
    encoding: Vec<(Vec<u8>, u8)>,
    // Symbol decoding table: code -> pattern
    decoding: Vec<Vec<u8>>,
}
```
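The `build_symbol_table` placeholder can be sketched naively as an n-gram frequency count, shown below. Real FSST constructs its table iteratively and caps it at 255 symbols plus an escape code, so this is only an illustration of the idea, not the production algorithm (`build_symbol_table_naive` is a hypothetical helper):

```rust
use std::collections::HashMap;

// Naive sketch: count 3..=8 byte n-grams and keep the candidates that save the
// most bytes when replaced by a one-byte code. Illustration only; real FSST
// builds the table iteratively and reserves one code for escaped literals.
fn build_symbol_table_naive(data: &[u8], max_symbols: usize) -> Vec<Vec<u8>> {
    let mut freq: HashMap<&[u8], usize> = HashMap::new();
    for n in 3..=8 {
        for window in data.windows(n) {
            *freq.entry(window).or_insert(0) += 1;
        }
    }
    // Rank by frequency * length: longer, more frequent patterns save more bytes.
    let mut candidates: Vec<(&[u8], usize)> = freq.into_iter().collect();
    candidates.sort_by_key(|&(pat, count)| std::cmp::Reverse(count * pat.len()));
    candidates.into_iter()
        .take(max_symbols)          // e.g. 255, leaving one code as an escape
        .map(|(pat, _)| pat.to_vec())
        .collect()
}
```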
```rust
//! ALP Codec Implementation

use super::{CompressionAlgorithm, CompressionCodec, CodecCharacteristics, DataPattern};
use crate::error::Result;

/// ALP (Adaptive Lossless floating-Point) compression codec
///
/// ALP is a floating-point compression algorithm that exploits the structure
/// of floating-point data to achieve better compression than general-purpose
/// algorithms.
///
/// Reference: "ALP: Adaptive Lossless floating-Point Compression" (used in DuckDB)
/// Patent Status: Open source
pub struct AlpCodec {
    // Configuration
    use_exceptions: bool,
}

impl AlpCodec {
    pub fn new() -> Self {
        Self {
            use_exceptions: true,
        }
    }

    /// Analyze floating point data and determine encoding parameters
    fn analyze_floats(&self, data: &[u8]) -> AlpParameters {
        // Implementation:
        // 1. Parse as f32/f64 array
        // 2. Detect common exponent
        // 3. Determine precision requirements
        // 4. Calculate optimal encoding
        todo!("ALP analysis")
    }
}

impl CompressionCodec for AlpCodec {
    fn algorithm(&self) -> CompressionAlgorithm {
        CompressionAlgorithm::Alp
    }

    fn compress(&self, data: &[u8]) -> Result<Vec<u8>> {
        // Implementation:
        // 1. Analyze data to get parameters
        // 2. Encode parameters in header
        // 3. Encode mantissas with common exponent
        // 4. Handle exceptions (if enabled)
        todo!("ALP compression")
    }

    fn decompress(&self, compressed: &[u8], original_size: Option<usize>) -> Result<Vec<u8>> {
        // Implementation:
        // 1. Read parameters from header
        // 2. Decode mantissas
        // 3. Apply exceptions
        // 4. Reconstruct floats
        todo!("ALP decompression")
    }

    fn characteristics(&self) -> CodecCharacteristics {
        CodecCharacteristics {
            compress_speed_mbps: 600.0,
            decompress_speed_mbps: 1800.0,
            typical_ratio: 3.0, // ~3x typical on float columns
            cpu_overhead: 0.03,
            best_for: vec![
                DataPattern::FloatingPointData,
                DataPattern::TimeSeries,
            ],
        }
    }
}

struct AlpParameters {
    common_exponent: i32,
    precision_bits: u8,
    exception_count: usize,
}
```
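The core ALP idea can be shown with a tiny worked example: when a vector of doubles shares a decimal scale (prices with two decimal places, say), each value becomes a small integer plus one shared exponent, and any value that does not round-trip exactly is kept verbatim as an exception. The sketch below illustrates only that round-trip test; real ALP additionally searches (exponent, factor) pairs per vector and bit-packs the integers.

```rust
// Simplified illustration of ALP's encode-as-scaled-integer idea (not the real encoder).
fn encode_scaled(values: &[f64], exponent: i32) -> (Vec<i64>, Vec<(usize, f64)>) {
    let scale = 10f64.powi(exponent);
    let mut encoded = Vec::with_capacity(values.len());
    let mut exceptions = Vec::new();
    for (i, &v) in values.iter().enumerate() {
        let candidate = (v * scale).round();
        // Keep the integer only if it reproduces the original double exactly.
        if candidate / scale == v && candidate.abs() < i64::MAX as f64 {
            encoded.push(candidate as i64);
        } else {
            encoded.push(0);             // placeholder; patched from exceptions on decode
            exceptions.push((i, v));
        }
    }
    (encoded, exceptions)
}

fn main() {
    // Prices with two decimal places share exponent 2 and become small integers;
    // PI has no exact representation at that scale and lands in the exception list.
    let values = [19.99, 4.50, 120.00, 0.07, std::f64::consts::PI];
    let (encoded, exceptions) = encode_scaled(&values, 2);
    assert_eq!(encoded, vec![1999, 450, 12000, 7, 0]);
    assert_eq!(exceptions, vec![(4, std::f64::consts::PI)]);
}
```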
Compression Manager API

```rust
//! Compression Manager - Main API

use super::{CompressionAlgorithm, CompressionCodec, CompressedBlock, DataPattern};
use crate::error::{Error, Result};
use std::collections::HashMap;
use std::sync::Arc;

/// Compression manager configuration
#[derive(Debug, Clone)]
pub struct CompressionConfig {
    /// Enable compression
    pub enabled: bool,
    /// Default algorithm (Auto for automatic selection)
    pub default_algorithm: CompressionAlgorithm,
    /// Enable automatic codec switching
    pub auto_switch: bool,
    /// Enable statistics collection
    pub collect_stats: bool,
    /// Minimum data size to compress (bytes)
    pub min_compress_size: usize,
    /// Target compression ratio (for auto mode)
    pub target_ratio: f64,
    /// Maximum compression time (microseconds)
    pub max_compress_time_us: u64,
}

impl Default for CompressionConfig {
    fn default() -> Self {
        Self {
            enabled: true,
            default_algorithm: CompressionAlgorithm::Auto,
            auto_switch: true,
            collect_stats: true,
            min_compress_size: 256,     // Don't compress tiny blocks
            target_ratio: 2.0,          // Aim for 2x minimum
            max_compress_time_us: 1000, // 1ms max
        }
    }
}

/// Main compression manager
pub struct CompressionManager {
    config: CompressionConfig,
    codecs: Arc<CodecRegistry>,
    analyzer: Arc<PatternAnalyzer>,
    selector: Arc<CodecSelector>,
    stats: Arc<StatsCollector>,
}

impl CompressionManager {
    pub fn new(config: CompressionConfig) -> Result<Self> {
        Ok(Self {
            config,
            codecs: Arc::new(CodecRegistry::new()),
            analyzer: Arc::new(PatternAnalyzer::new()),
            selector: Arc::new(CodecSelector::new()),
            stats: Arc::new(StatsCollector::new()),
        })
    }

    /// Compress data with automatic codec selection
    pub fn compress(&self, data: &[u8]) -> Result<CompressedBlock> {
        // Skip compression for small data
        if data.len() < self.config.min_compress_size {
            return self.create_uncompressed_block(data);
        }

        // Analyze data pattern
        let pattern = self.analyzer.analyze(data)?;

        // Select best codec
        let algorithm = if self.config.default_algorithm == CompressionAlgorithm::Auto {
            self.selector.select_for_pattern(pattern, data)?
        } else {
            self.config.default_algorithm
        };

        // LZ4/Zstd are applied by the RocksDB block layer, not here; store the
        // value uncompressed at this layer and let RocksDB handle it.
        if matches!(algorithm, CompressionAlgorithm::Lz4 | CompressionAlgorithm::Zstd) {
            return self.create_uncompressed_block(data);
        }

        // Get codec and compress
        let codec = self.codecs.get(algorithm)?;
        let start = std::time::Instant::now();
        let compressed = codec.compress(data)?;
        let duration = start.elapsed();

        // Check if compression was beneficial
        if compressed.len() >= data.len() {
            return self.create_uncompressed_block(data);
        }

        // Create block with metadata
        let block = CompressedBlock {
            algorithm,
            version: 1,
            original_size: data.len(),
            data: compressed,
            checksum: crc32fast::hash(data),
            compressed_at: current_timestamp(),
            pattern,
        };

        // Record statistics
        if self.config.collect_stats {
            self.stats.record(algorithm, pattern, &block, duration);
        }

        Ok(block)
    }

    /// Compress with specific algorithm
    pub fn compress_with(
        &self,
        data: &[u8],
        algorithm: CompressionAlgorithm,
    ) -> Result<CompressedBlock> {
        let codec = self.codecs.get(algorithm)?;
        let compressed = codec.compress(data)?;

        Ok(CompressedBlock {
            algorithm,
            version: 1,
            original_size: data.len(),
            data: compressed,
            checksum: crc32fast::hash(data),
            compressed_at: current_timestamp(),
            pattern: self.analyzer.analyze(data)?,
        })
    }

    /// Decompress data
    pub fn decompress(&self, block: &CompressedBlock) -> Result<Vec<u8>> {
        // Handle uncompressed blocks
        if block.algorithm == CompressionAlgorithm::None {
            return Ok(block.data.clone());
        }

        // Get codec and decompress
        let codec = self.codecs.get(block.algorithm)?;
        let decompressed = codec.decompress(&block.data, Some(block.original_size))?;

        // Verify checksum
        let checksum = crc32fast::hash(&decompressed);
        if checksum != block.checksum {
            return Err(Error::ChecksumMismatch {
                expected: block.checksum,
                actual: checksum,
            });
        }

        Ok(decompressed)
    }

    /// Get compression statistics
    pub fn stats(&self) -> CompressionStats {
        self.stats.snapshot()
    }

    fn create_uncompressed_block(&self, data: &[u8]) -> Result<CompressedBlock> {
        Ok(CompressedBlock {
            algorithm: CompressionAlgorithm::None,
            version: 1,
            original_size: data.len(),
            data: data.to_vec(),
            checksum: crc32fast::hash(data),
            compressed_at: current_timestamp(),
            pattern: DataPattern::Random,
        })
    }
}

/// Codec registry
struct CodecRegistry {
    codecs: HashMap<CompressionAlgorithm, Arc<dyn CompressionCodec>>,
}

impl CodecRegistry {
    fn new() -> Self {
        let mut codecs: HashMap<CompressionAlgorithm, Arc<dyn CompressionCodec>> = HashMap::new();

        // Register all codecs
        codecs.insert(CompressionAlgorithm::Fsst, Arc::new(FsstCodec::new()));
        codecs.insert(CompressionAlgorithm::Alp, Arc::new(AlpCodec::new()));
        codecs.insert(CompressionAlgorithm::Dictionary, Arc::new(DictionaryCodec::new()));
        codecs.insert(CompressionAlgorithm::Rle, Arc::new(RleCodec::new()));
        codecs.insert(CompressionAlgorithm::Delta, Arc::new(DeltaCodec::new()));
        // Note: LZ4/Zstd handled by RocksDB layer

        Self { codecs }
    }

    fn get(&self, algorithm: CompressionAlgorithm) -> Result<Arc<dyn CompressionCodec>> {
        self.codecs
            .get(&algorithm)
            .cloned()
            .ok_or(Error::UnsupportedAlgorithm(algorithm))
    }
}
```
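A usage sketch of the manager as specified above. The module paths are illustrative, and a real round trip of course requires the registered codecs to be implemented rather than left as `todo!()`:

```rust
// Usage sketch of the CompressionManager API (paths and error plumbing assumed).
use heliosdb::compression::{CompressionConfig, CompressionManager};
use heliosdb::error::Result;

fn roundtrip_example() -> Result<()> {
    let manager = CompressionManager::new(CompressionConfig::default())?;

    // Repetitive, log-like payload: the analyzer classifies the bytes, the
    // selector picks a codec from the registry, and the choice plus the CRC32
    // of the original data are recorded in the returned block.
    let payload = "level=INFO path=/api/v1/users status=200 ".repeat(100);
    let block = manager.compress(payload.as_bytes())?;
    println!("codec={:?} ratio={:.1}x", block.algorithm, block.compression_ratio());

    // decompress() re-hashes the output and compares against the stored CRC32,
    // so a successful round trip also validates block integrity.
    let restored = manager.decompress(&block)?;
    assert_eq!(restored, payload.as_bytes());
    Ok(())
}
```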
Pattern Analyzer

```rust
//! Pattern Analyzer - Data pattern detection

use super::DataPattern;
use crate::error::Result;

/// Pattern analyzer using SIMD acceleration
pub struct PatternAnalyzer {
    sample_size: usize,
}

impl PatternAnalyzer {
    pub fn new() -> Self {
        Self {
            sample_size: 4096, // Sample first 4KB
        }
    }

    /// Analyze data and detect pattern
    pub fn analyze(&self, data: &[u8]) -> Result<DataPattern> {
        let sample = self.get_sample(data);

        // Run multiple detection heuristics
        let is_string = self.detect_string_data(sample);
        let is_float = self.detect_float_data(sample);
        let is_integer = self.detect_integer_data(sample);
        let cardinality = self.calculate_cardinality(sample);
        let entropy = self.calculate_entropy(sample);
        let is_sequential = self.detect_sequential(sample);

        // Pattern selection logic.
        // High normalized entropy (close to 1.0) means the byte histogram looks
        // uniform, i.e. the data is effectively random/incompressible.
        if entropy > 0.95 {
            return Ok(DataPattern::Random);
        }

        if cardinality < 256 && (cardinality as f64 / sample.len() as f64) < 0.1 {
            return Ok(DataPattern::LowCardinality);
        }

        if is_sequential {
            return Ok(DataPattern::Sequential);
        }

        if is_float {
            return Ok(DataPattern::FloatingPointData);
        }

        if is_integer {
            return Ok(DataPattern::IntegerData);
        }

        if is_string {
            // Check if it's JSON-like
            if self.looks_like_json(sample) {
                return Ok(DataPattern::StructuredText);
            }
            return Ok(DataPattern::StringData);
        }

        Ok(DataPattern::Random)
    }

    fn get_sample<'a>(&self, data: &'a [u8]) -> &'a [u8] {
        if data.len() <= self.sample_size {
            data
        } else {
            &data[..self.sample_size]
        }
    }

    fn detect_string_data(&self, data: &[u8]) -> bool {
        // Check for printable ASCII/UTF-8
        let printable_count = data.iter()
            .filter(|&&b| (b >= 32 && b <= 126) || b == b'\n' || b == b'\t')
            .count();
        printable_count as f64 / data.len() as f64 > 0.8
    }

    fn detect_float_data(&self, data: &[u8]) -> bool {
        // Heuristic: interpret 8-byte chunks as f64 and check that most values
        // are finite and in a plausible range. This is not perfect, but it
        // avoids unaligned raw-pointer reads and empty-sample division.
        if data.len() < 8 || data.len() % 8 != 0 {
            return false;
        }
        let samples: Vec<f64> = data.chunks_exact(8)
            .take(100)
            .map(|c| f64::from_le_bytes(c.try_into().unwrap()))
            .collect();
        let valid = samples.iter()
            .filter(|f| f.is_finite() && f.abs() < 1e10)
            .count();
        valid as f64 / samples.len() as f64 > 0.7
    }

    fn detect_integer_data(&self, data: &[u8]) -> bool {
        // Similar heuristic for integers
        if data.len() % 8 == 0 {
            // Check for reasonable integer ranges
            true // Simplified
        } else {
            false
        }
    }

    fn calculate_cardinality(&self, data: &[u8]) -> usize {
        let mut seen = std::collections::HashSet::new();
        for &byte in data.iter().take(1024) {
            seen.insert(byte);
        }
        seen.len()
    }

    fn calculate_entropy(&self, data: &[u8]) -> f64 {
        let mut counts = [0u32; 256];
        for &byte in data {
            counts[byte as usize] += 1;
        }

        let len = data.len() as f64;
        let mut entropy = 0.0;
        for &count in &counts {
            if count > 0 {
                let p = count as f64 / len;
                entropy -= p * p.log2();
            }
        }
        entropy / 8.0 // Normalize to 0-1 (maximum byte entropy is 8 bits)
    }

    fn detect_sequential(&self, data: &[u8]) -> bool {
        // Check for monotonic sequences
        if data.len() < 16 {
            return false;
        }

        let diffs: Vec<i16> = data.windows(2)
            .map(|w| w[1] as i16 - w[0] as i16)
            .collect();

        // Check if most differences are the same
        let mut diff_counts = std::collections::HashMap::new();
        for &diff in &diffs {
            *diff_counts.entry(diff).or_insert(0) += 1;
        }

        let max_count = diff_counts.values().max().unwrap_or(&0);
        *max_count as f64 / diffs.len() as f64 > 0.7
    }

    fn looks_like_json(&self, data: &[u8]) -> bool {
        // Simple heuristic: check for { } [ ] : , "
        let json_chars = data.iter()
            .filter(|&&b| matches!(b, b'{' | b'}' | b'[' | b']' | b':' | b',' | b'"'))
            .count();
        json_chars as f64 / data.len() as f64 > 0.1
    }
}
```
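A small test sketch that exercises the private heuristics directly (the test-module placement is an assumption):

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn entropy_and_string_heuristics() {
        let analyzer = PatternAnalyzer::new();

        // Every byte value appears equally often, so normalized byte entropy is
        // ~1.0 -- the signal the analyzer's Random check keys on.
        let uniform: Vec<u8> = (0..4096).map(|i| (i % 256) as u8).collect();
        assert!(analyzer.calculate_entropy(&uniform) > 0.99);

        // ASCII log text: overwhelmingly printable bytes and far lower entropy.
        let text = b"GET /api/v1/users 200 12ms\n".repeat(64);
        assert!(analyzer.detect_string_data(&text));
        assert!(analyzer.calculate_entropy(&text) < 0.95);
    }
}
```

One consequence of the rule order worth noting: because the byte-level cardinality check runs before the string checks, large ASCII payloads (which rarely use more than a few dozen distinct byte values) will frequently classify as LowCardinality and route to Dictionary rather than FSST. Whether that routing is desirable is a tuning question for the codec selector.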
Codec Selector

```rust
//! Codec Selector - Automatic codec selection

use super::{CompressionAlgorithm, DataPattern};
use crate::error::Result;
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Codec selector with rule-based and adaptive logic
pub struct CodecSelector {
    // Historical performance data
    history: Arc<Mutex<SelectionHistory>>,
}

impl CodecSelector {
    pub fn new() -> Self {
        Self {
            history: Arc::new(Mutex::new(SelectionHistory::new())),
        }
    }

    /// Select best codec for data pattern
    pub fn select_for_pattern(&self, pattern: DataPattern, data: &[u8]) -> Result<CompressionAlgorithm> {
        // Rule-based selection
        let algorithm = match pattern {
            DataPattern::StringData => CompressionAlgorithm::Fsst,
            DataPattern::StructuredText => CompressionAlgorithm::Fsst,
            DataPattern::FloatingPointData => CompressionAlgorithm::Alp,
            DataPattern::TimeSeries => CompressionAlgorithm::Alp,
            DataPattern::LowCardinality => CompressionAlgorithm::Dictionary,
            DataPattern::Sequential => CompressionAlgorithm::Delta,
            DataPattern::IntegerData => {
                // Check if sequential
                if self.is_sequential_integers(data) {
                    CompressionAlgorithm::Delta
                } else {
                    CompressionAlgorithm::Dictionary
                }
            }
            DataPattern::Random => CompressionAlgorithm::Lz4,
        };

        // Check historical performance and potentially override
        if let Some(better) = self.check_history(pattern, algorithm) {
            return Ok(better);
        }

        Ok(algorithm)
    }

    fn is_sequential_integers(&self, data: &[u8]) -> bool {
        // Heuristic for detecting sequential integer patterns
        if data.len() < 32 {
            return false;
        }

        // Sample a few integers and check differences
        let samples = data.len() / 8;
        if samples < 4 {
            return false;
        }

        let integers: Vec<i64> = (0..samples.min(100))
            .map(|i| {
                let offset = i * 8;
                i64::from_le_bytes([
                    data[offset], data[offset + 1], data[offset + 2], data[offset + 3],
                    data[offset + 4], data[offset + 5], data[offset + 6], data[offset + 7],
                ])
            })
            .collect();

        // Check if differences are consistent
        let diffs: Vec<i64> = integers.windows(2)
            .map(|w| w[1] - w[0])
            .collect();

        let avg_diff = diffs.iter().sum::<i64>() as f64 / diffs.len() as f64;
        let variance = diffs.iter()
            .map(|&d| (d as f64 - avg_diff).powi(2))
            .sum::<f64>() / diffs.len() as f64;

        // Low variance indicates sequential
        variance < 100.0
    }

    fn check_history(&self, pattern: DataPattern, current: CompressionAlgorithm) -> Option<CompressionAlgorithm> {
        let history = self.history.lock().unwrap();
        history.get_better_algorithm(pattern, current)
    }
}

struct SelectionHistory {
    // Pattern -> Algorithm -> Performance metrics
    performance: HashMap<DataPattern, HashMap<CompressionAlgorithm, PerformanceMetrics>>,
}

impl SelectionHistory {
    fn new() -> Self {
        Self {
            performance: HashMap::new(),
        }
    }

    fn get_better_algorithm(&self, pattern: DataPattern, current: CompressionAlgorithm) -> Option<CompressionAlgorithm> {
        if let Some(algorithms) = self.performance.get(&pattern) {
            // Find algorithm with best ratio and acceptable speed
            let current_perf = algorithms.get(&current)?;

            for (algo, perf) in algorithms {
                if perf.avg_ratio > current_perf.avg_ratio * 1.2
                    && perf.avg_compress_us < current_perf.avg_compress_us * 2.0
                {
                    return Some(*algo);
                }
            }
        }
        None
    }
}

#[derive(Debug, Clone)]
struct PerformanceMetrics {
    avg_ratio: f64,
    avg_compress_us: f64,
    avg_decompress_us: f64,
    sample_count: usize,
}
```
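Finally, a sketch of how the selector's rule table behaves for two representative inputs; the expectations simply restate the match arms and the sequential-integer heuristic above (the snippet assumes it lives inside the crate so `crate::error::Result` resolves):

```rust
// Selection sketch: expectations restate the rule table and heuristics above.
fn selection_examples() -> crate::error::Result<()> {
    let selector = CodecSelector::new();

    // String data routes to FSST regardless of content.
    let algo = selector.select_for_pattern(DataPattern::StringData, b"any bytes")?;
    assert_eq!(algo, CompressionAlgorithm::Fsst);

    // Monotonically increasing 64-bit row ids: IntegerData plus the low-variance
    // difference check routes to Delta encoding.
    let row_ids: Vec<u8> = (0u64..512).flat_map(|i| i.to_le_bytes()).collect();
    let algo = selector.select_for_pattern(DataPattern::IntegerData, &row_ids)?;
    assert_eq!(algo, CompressionAlgorithm::Delta);
    Ok(())
}
```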