Compression Integration Architecture - Part 3 of 4
Migration & Testing Strategies
Navigation: Index | ← Part 2 | Part 3 | Part 4 →
Migration Strategy
Backward Compatibility
The compression layer is designed for zero-downtime migration:
- Lazy Migration: Existing data remains uncompressed until first write
- Transparent Reads: Both compressed and uncompressed blocks are supported
- Gradual Rollout: New writes use compression; old data migrated on UPDATE
- Rollback Safe: Can disable compression without data loss
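As a minimal sketch of the transparent read path (the `transparent_read` helper is hypothetical; `BlockFormat` and `CompressionManager` are defined later in this part):

```rust
/// Hypothetical routing for reads: values that parse as a block (HCMP or
/// HUNM magic) go through the compression layer; anything else is a legacy,
/// pre-migration value and is returned as-is.
fn transparent_read(compression: &CompressionManager, raw: Vec<u8>) -> Result<Vec<u8>> {
    match BlockFormat::deserialize(&raw) {
        Ok(block) => compression.decompress(&block), // No-op for HUNM blocks
        Err(_) => Ok(raw),                           // Legacy value: pass through
    }
}
```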
Migration Phases
```
┌──────────────────────────────────────────────────────────────┐
│                      Migration Timeline                      │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  Phase 1: Enable Compression (Day 0)                         │
│  ┌───────────────────────────────────────────────┐           │
│  │ • Deploy new code with compression enabled    │           │
│  │ • All NEW writes compressed automatically     │           │
│  │ • All reads work transparently                │           │
│  │ • Zero downtime                               │           │
│  └───────────────────────────────────────────────┘           │
│                                                              │
│  Phase 2: Background Migration (Day 1-7)                     │
│  ┌───────────────────────────────────────────────┐           │
│  │ • Optional background job compresses old data │           │
│  │ • Low priority, runs during idle periods      │           │
│  │ • Progress: 0% → 100% over 7 days             │           │
│  │ • Can pause/resume anytime                    │           │
│  └───────────────────────────────────────────────┘           │
│                                                              │
│  Phase 3: Optimization (Day 7+)                              │
│  ┌───────────────────────────────────────────────┐           │
│  │ • Analyze compression statistics              │           │
│  │ • Tune codec selection rules                  │           │
│  │ • Adjust configuration based on workload      │           │
│  │ • Monitor storage savings                     │           │
│  └───────────────────────────────────────────────┘           │
│                                                              │
└──────────────────────────────────────────────────────────────┘
```
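Phase 2 is driven by the `MigrationConfig` defined under Background Migration Job below. As a sketch, enabling it for an overnight window might look like this (the field values are illustrative, not recommendations):

```rust
// Illustrative Phase 2 settings; see MigrationConfig below for field docs.
let migration = MigrationConfig {
    enabled: true,            // Opt in to background migration
    batch_size: 100,          // Keys per batch
    batch_delay_ms: 100,      // Throttle between batches
    max_cpu_usage: 0.15,      // Back off above 15% CPU
    run_hours: Some((22, 6)), // Only run 10pm-6am
};
```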
Block Format Versioning

````rust
//! Block format with version support

/// Compressed block format (on-disk)
///
/// Layout:
/// ```text
/// +------------------+
/// | Magic (4 bytes)  |  "HCMP" = compressed, "HUNM" = uncompressed
/// +------------------+
/// | Version (2 bytes)|  Format version (currently 1)
/// +------------------+
/// | Algorithm (1)    |  Compression algorithm enum
/// +------------------+
/// | Flags (1)        |  Reserved flags
/// +------------------+
/// | Original Size (8)|  Uncompressed size in bytes
/// +------------------+
/// | Compressed (8)   |  Compressed size in bytes
/// +------------------+
/// | Checksum (4)     |  CRC32 of original data
/// +------------------+
/// | Pattern (1)      |  Detected data pattern
/// +------------------+
/// | Timestamp (8)    |  Compression timestamp
/// +------------------+
/// | Data (variable)  |  Compressed data
/// +------------------+
/// ```
const MAGIC_COMPRESSED: &[u8; 4] = b"HCMP";
const MAGIC_UNCOMPRESSED: &[u8; 4] = b"HUNM";
const CURRENT_VERSION: u16 = 1;

pub struct BlockFormat;

impl BlockFormat {
    /// Serialize compressed block to bytes
    pub fn serialize(block: &CompressedBlock) -> Vec<u8> {
        // Header is 37 bytes: 4 + 2 + 1 + 1 + 8 + 8 + 4 + 1 + 8
        let mut buf = Vec::with_capacity(37 + block.data.len());

        // Magic
        if block.algorithm == CompressionAlgorithm::None {
            buf.extend_from_slice(MAGIC_UNCOMPRESSED);
        } else {
            buf.extend_from_slice(MAGIC_COMPRESSED);
        }

        // Version
        buf.extend_from_slice(&CURRENT_VERSION.to_le_bytes());

        // Algorithm
        buf.push(Self::algorithm_to_byte(block.algorithm));

        // Flags (reserved)
        buf.push(0);

        // Sizes
        buf.extend_from_slice(&(block.original_size as u64).to_le_bytes());
        buf.extend_from_slice(&(block.data.len() as u64).to_le_bytes());

        // Checksum
        buf.extend_from_slice(&block.checksum.to_le_bytes());

        // Pattern
        buf.push(Self::pattern_to_byte(block.pattern));

        // Timestamp
        buf.extend_from_slice(&block.compressed_at.to_le_bytes());

        // Data
        buf.extend_from_slice(&block.data);

        buf
    }

    /// Deserialize block from bytes
    pub fn deserialize(data: &[u8]) -> Result<CompressedBlock> {
        if data.len() < 37 {
            return Err(Error::InvalidBlockFormat("Too short".to_string()));
        }

        // Check magic
        let magic = &data[0..4];
        let _is_compressed = if magic == MAGIC_COMPRESSED {
            true
        } else if magic == MAGIC_UNCOMPRESSED {
            false
        } else {
            return Err(Error::InvalidBlockFormat("Invalid magic".to_string()));
        };

        // Version
        let version = u16::from_le_bytes([data[4], data[5]]);
        if version != CURRENT_VERSION {
            return Err(Error::UnsupportedVersion(version));
        }

        // Algorithm
        let algorithm = Self::byte_to_algorithm(data[6])?;

        // Flags (unused)
        let _flags = data[7];

        // Sizes
        let original_size = u64::from_le_bytes([
            data[8], data[9], data[10], data[11],
            data[12], data[13], data[14], data[15],
        ]) as usize;

        let compressed_size = u64::from_le_bytes([
            data[16], data[17], data[18], data[19],
            data[20], data[21], data[22], data[23],
        ]) as usize;

        // Checksum
        let checksum = u32::from_le_bytes([data[24], data[25], data[26], data[27]]);

        // Pattern
        let pattern = Self::byte_to_pattern(data[28])?;

        // Timestamp
        let compressed_at = u64::from_le_bytes([
            data[29], data[30], data[31], data[32],
            data[33], data[34], data[35], data[36],
        ]);

        // Data
        if data.len() < 37 + compressed_size {
            return Err(Error::InvalidBlockFormat("Truncated data".to_string()));
        }
        let block_data = data[37..37 + compressed_size].to_vec();

        Ok(CompressedBlock {
            algorithm,
            version: CURRENT_VERSION,
            original_size,
            data: block_data,
            checksum,
            compressed_at,
            pattern,
        })
    }

    fn algorithm_to_byte(algo: CompressionAlgorithm) -> u8 {
        match algo {
            CompressionAlgorithm::None => 0,
            CompressionAlgorithm::Lz4 => 1,
            CompressionAlgorithm::Zstd => 2,
            CompressionAlgorithm::Fsst => 3,
            CompressionAlgorithm::Alp => 4,
            CompressionAlgorithm::Dictionary => 5,
            CompressionAlgorithm::Rle => 6,
            CompressionAlgorithm::Delta => 7,
            CompressionAlgorithm::Auto => 255, // Should not be serialized
        }
    }

    fn byte_to_algorithm(byte: u8) -> Result<CompressionAlgorithm> {
        Ok(match byte {
            0 => CompressionAlgorithm::None,
            1 => CompressionAlgorithm::Lz4,
            2 => CompressionAlgorithm::Zstd,
            3 => CompressionAlgorithm::Fsst,
            4 => CompressionAlgorithm::Alp,
            5 => CompressionAlgorithm::Dictionary,
            6 => CompressionAlgorithm::Rle,
            7 => CompressionAlgorithm::Delta,
            _ => return Err(Error::UnknownAlgorithm(byte)),
        })
    }

    fn pattern_to_byte(pattern: DataPattern) -> u8 {
        match pattern {
            DataPattern::Random => 0,
            DataPattern::StringData => 1,
            DataPattern::IntegerData => 2,
            DataPattern::FloatingPointData => 3,
            DataPattern::LowCardinality => 4,
            DataPattern::Sequential => 5,
            DataPattern::TimeSeries => 6,
            DataPattern::StructuredText => 7,
        }
    }

    fn byte_to_pattern(byte: u8) -> Result<DataPattern> {
        Ok(match byte {
            0 => DataPattern::Random,
            1 => DataPattern::StringData,
            2 => DataPattern::IntegerData,
            3 => DataPattern::FloatingPointData,
            4 => DataPattern::LowCardinality,
            5 => DataPattern::Sequential,
            6 => DataPattern::TimeSeries,
            7 => DataPattern::StructuredText,
            _ => return Err(Error::UnknownPattern(byte)),
        })
    }
}
````
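Because the header lives at a fixed offset, callers such as the migration job can classify a stored value without deserializing the whole block. A minimal sketch; `is_block_compressed` is a hypothetical helper, not part of the `BlockFormat` API above:

```rust
/// Hypothetical helper: peek at the 4-byte magic to classify a stored value.
/// Returns Some(true) for HCMP, Some(false) for HUNM, and None for legacy
/// values written before the block format was introduced.
fn is_block_compressed(raw: &[u8]) -> Option<bool> {
    if raw.len() < 4 {
        return None; // Too short to carry a header: legacy data
    }
    match &raw[0..4] {
        m if m == MAGIC_COMPRESSED => Some(true),
        m if m == MAGIC_UNCOMPRESSED => Some(false),
        _ => None, // Unrecognized magic: legacy, pre-migration value
    }
}
```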
Background Migration Job

```rust
//! Background compression migration job

use super::{CompressedBlock, CompressionManager};
use crate::error::{Error, Result};
use crate::storage::StorageEngine;
use chrono::Timelike;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::Arc;
use std::time::Duration;

/// Background migration configuration
#[derive(Debug, Clone)]
pub struct MigrationConfig {
    /// Enable background migration
    pub enabled: bool,
    /// Batch size (number of keys per batch)
    pub batch_size: usize,
    /// Delay between batches (milliseconds)
    pub batch_delay_ms: u64,
    /// Maximum CPU usage (0.0-1.0)
    pub max_cpu_usage: f64,
    /// Run during specific hours only (24-hour format)
    pub run_hours: Option<(u8, u8)>, // e.g., Some((22, 6)) = 10pm-6am only
}

impl Default for MigrationConfig {
    fn default() -> Self {
        Self {
            enabled: false, // Opt-in
            batch_size: 100,
            batch_delay_ms: 100,
            max_cpu_usage: 0.15, // 15% max
            run_hours: None,     // Run anytime
        }
    }
}

/// Background migration job
pub struct MigrationJob {
    config: MigrationConfig,
    storage: Arc<StorageEngine>,
    compression: Arc<CompressionManager>,
    running: Arc<AtomicBool>,
    progress: Arc<AtomicU64>,
    total_keys: Arc<AtomicU64>,
}

impl MigrationJob {
    pub fn new(
        config: MigrationConfig,
        storage: Arc<StorageEngine>,
        compression: Arc<CompressionManager>,
    ) -> Self {
        Self {
            config,
            storage,
            compression,
            running: Arc::new(AtomicBool::new(false)),
            progress: Arc::new(AtomicU64::new(0)),
            total_keys: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Start migration job in background
    pub fn start(&self) -> Result<()> {
        if !self.config.enabled {
            return Ok(());
        }

        if self.running.swap(true, Ordering::SeqCst) {
            return Err(Error::AlreadyRunning);
        }

        // Clone Arcs for thread
        let running = Arc::clone(&self.running);
        let progress = Arc::clone(&self.progress);
        let total_keys = Arc::clone(&self.total_keys);
        let storage = Arc::clone(&self.storage);
        let compression = Arc::clone(&self.compression);
        let config = self.config.clone();

        // Spawn background thread
        std::thread::spawn(move || {
            Self::run_migration(running, progress, total_keys, storage, compression, config);
        });

        Ok(())
    }

    /// Stop migration job
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    /// Get migration progress (0.0-1.0)
    pub fn progress(&self) -> f64 {
        let processed = self.progress.load(Ordering::Relaxed);
        let total = self.total_keys.load(Ordering::Relaxed);
        if total == 0 {
            0.0
        } else {
            processed as f64 / total as f64
        }
    }

    fn run_migration(
        running: Arc<AtomicBool>,
        progress: Arc<AtomicU64>,
        total_keys: Arc<AtomicU64>,
        storage: Arc<StorageEngine>,
        compression: Arc<CompressionManager>,
        config: MigrationConfig,
    ) {
        tracing::info!("Starting background compression migration");

        // Count total keys (approximate)
        // This is a simplified version - the actual implementation would use a
        // RocksDB iterator
        let total = 1_000_000; // Placeholder
        total_keys.store(total, Ordering::Relaxed);

        let mut processed = 0u64;

        while running.load(Ordering::Relaxed) {
            // Check if we should run now
            if !Self::should_run_now(&config) {
                std::thread::sleep(Duration::from_secs(60));
                continue;
            }

            // Check CPU usage
            if Self::get_cpu_usage() > config.max_cpu_usage {
                std::thread::sleep(Duration::from_millis(config.batch_delay_ms * 10));
                continue;
            }

            // Process batch
            match Self::process_batch(&storage, &compression, config.batch_size) {
                Ok(count) => {
                    processed += count as u64;
                    progress.store(processed, Ordering::Relaxed);

                    if count == 0 {
                        tracing::info!("Background compression migration complete");
                        running.store(false, Ordering::Relaxed);
                        break;
                    }
                }
                Err(e) => {
                    tracing::error!("Migration batch failed: {}", e);
                    std::thread::sleep(Duration::from_secs(10));
                }
            }

            // Delay between batches
            std::thread::sleep(Duration::from_millis(config.batch_delay_ms));
        }

        tracing::info!("Background compression migration stopped");
    }

    fn should_run_now(config: &MigrationConfig) -> bool {
        if let Some((start, end)) = config.run_hours {
            let now = chrono::Local::now().hour() as u8;
            if start < end {
                now >= start && now < end
            } else {
                // Wrap around midnight
                now >= start || now < end
            }
        } else {
            true
        }
    }

    fn get_cpu_usage() -> f64 {
        // Placeholder - the actual implementation would use the sysinfo crate
        0.05
    }

    fn process_batch(
        storage: &Arc<StorageEngine>,
        compression: &Arc<CompressionManager>,
        batch_size: usize,
    ) -> Result<usize> {
        // Simplified - the actual implementation would:
        // 1. Iterate over RocksDB keys
        // 2. Check if already compressed (parse block header)
        // 3. If not compressed, read value, compress, write back
        // 4. Use write batch for atomicity

        // Placeholder
        Ok(batch_size)
    }
}
```
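The four steps in the placeholder can be fleshed out against the underlying store. A rough sketch using the rocksdb crate's iterator and `WriteBatch`, plus the hypothetical `is_block_compressed` peek from earlier; `Error::Migration` and direct `DB` access are assumptions about the surrounding crate, and a production version would persist a cursor key instead of rescanning from the start each batch:

```rust
use rocksdb::{IteratorMode, WriteBatch, DB};

// Sketch only: migrate up to `batch_size` legacy values into the block format.
fn process_batch_sketch(
    db: &DB,
    compression: &CompressionManager,
    batch_size: usize,
) -> Result<usize> {
    let mut batch = WriteBatch::default();
    let mut count = 0;

    for item in db.iterator(IteratorMode::Start) {
        let (key, value) = item.map_err(|e| Error::Migration(e.to_string()))?;

        // Step 2: skip values that already carry an HCMP/HUNM header
        if is_block_compressed(&value).is_some() {
            continue;
        }

        // Step 3: legacy value - compress and re-encode in the block format
        let block = compression.compress(&value)?;
        batch.put(&key, BlockFormat::serialize(&block));

        count += 1;
        if count == batch_size {
            break;
        }
    }

    // Step 4: apply the whole batch atomically; count == 0 signals completion
    db.write(batch).map_err(|e| Error::Migration(e.to_string()))?;
    Ok(count)
}
```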
Testing Strategy
Unit Tests
```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_fsst_codec_string_data() {
        let codec = FsstCodec::new();
        let data = b"Hello world! Hello world! Hello world!";

        let compressed = codec.compress(data).unwrap();
        assert!(compressed.len() < data.len());

        let decompressed = codec.decompress(&compressed, Some(data.len())).unwrap();
        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_alp_codec_float_data() {
        let codec = AlpCodec::new();
        let floats: Vec<f64> = vec![1.23, 1.24, 1.25, 1.26, 1.27];
        let data = unsafe {
            std::slice::from_raw_parts(floats.as_ptr() as *const u8, floats.len() * 8)
        };

        let compressed = codec.compress(data).unwrap();
        assert!(compressed.len() < data.len());

        let decompressed = codec.decompress(&compressed, Some(data.len())).unwrap();
        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_pattern_analyzer() {
        let analyzer = PatternAnalyzer::new();

        // String data
        let string_data = b"Hello world! This is a test string.";
        assert_eq!(analyzer.analyze(string_data).unwrap(), DataPattern::StringData);

        // Float data
        let floats: Vec<f64> = vec![1.1, 2.2, 3.3, 4.4, 5.5];
        let float_data = unsafe {
            std::slice::from_raw_parts(floats.as_ptr() as *const u8, floats.len() * 8)
        };
        assert_eq!(
            analyzer.analyze(float_data).unwrap(),
            DataPattern::FloatingPointData
        );

        // Low cardinality
        let low_card = vec![1u8, 2, 3, 1, 2, 3, 1, 2, 3]; // Only 3 unique values
        assert_eq!(analyzer.analyze(&low_card).unwrap(), DataPattern::LowCardinality);
    }

    #[test]
    fn test_codec_selector() {
        let selector = CodecSelector::new();

        // String data should use FSST
        let string_data = b"Hello world!";
        let algo = selector
            .select_for_pattern(DataPattern::StringData, string_data)
            .unwrap();
        assert_eq!(algo, CompressionAlgorithm::Fsst);

        // Float data should use ALP
        let algo = selector
            .select_for_pattern(DataPattern::FloatingPointData, &[])
            .unwrap();
        assert_eq!(algo, CompressionAlgorithm::Alp);

        // Low cardinality should use Dictionary
        let algo = selector
            .select_for_pattern(DataPattern::LowCardinality, &[])
            .unwrap();
        assert_eq!(algo, CompressionAlgorithm::Dictionary);
    }

    #[test]
    fn test_compression_manager() {
        let config = CompressionConfig::default();
        let manager = CompressionManager::new(config).unwrap();

        let data = b"Test data for compression";
        let block = manager.compress(data).unwrap();

        assert_eq!(block.original_size, data.len());
        assert!(block.algorithm != CompressionAlgorithm::None);

        let decompressed = manager.decompress(&block).unwrap();
        assert_eq!(decompressed, data);
    }

    #[test]
    fn test_block_format_serialization() {
        let block = CompressedBlock {
            algorithm: CompressionAlgorithm::Fsst,
            version: 1,
            original_size: 1000,
            data: vec![1, 2, 3, 4, 5],
            checksum: 0x12345678,
            compressed_at: 1700000000,
            pattern: DataPattern::StringData,
        };

        let serialized = BlockFormat::serialize(&block);
        let deserialized = BlockFormat::deserialize(&serialized).unwrap();

        assert_eq!(deserialized.algorithm, block.algorithm);
        assert_eq!(deserialized.original_size, block.original_size);
        assert_eq!(deserialized.data, block.data);
        assert_eq!(deserialized.checksum, block.checksum);
    }
}
```
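Beyond fixed cases, the round-trip guarantee is a natural property test. A sketch using the proptest crate (assumed here as a dev-dependency):

```rust
use proptest::prelude::*;

proptest! {
    // Property: decompress(compress(x)) == x for arbitrary byte strings,
    // whichever codec the manager ends up selecting.
    #[test]
    fn compress_roundtrip(data in prop::collection::vec(any::<u8>(), 0..4096)) {
        let manager = CompressionManager::new(CompressionConfig::default()).unwrap();
        let block = manager.compress(&data).unwrap();
        let decompressed = manager.decompress(&block).unwrap();
        prop_assert_eq!(decompressed, data);
    }
}
```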
Integration Tests

```rust
#[cfg(test)]
mod integration_tests {
    use super::*;
    use crate::config::Config;
    use crate::storage::StorageEngine;

    #[test]
    fn test_storage_engine_with_compression() {
        let mut config = Config::in_memory();
        config.storage.compression.enabled = true;
        config.storage.compression.algorithm = CompressionAlgorithmConfig::Auto;

        let engine = StorageEngine::open_in_memory(&config).unwrap();

        // Insert data
        let key = b"test_key".to_vec();
        let value = b"test_value_with_some_repeated_data_repeated_data".to_vec();
        engine.put(&key, &value).unwrap();

        // Retrieve data
        let retrieved = engine.get(&key).unwrap();
        assert_eq!(retrieved, Some(value));
    }

    #[test]
    fn test_tuple_compression() {
        let mut config = Config::in_memory();
        config.storage.compression.enabled = true;

        let engine = StorageEngine::open_in_memory(&config).unwrap();

        // Create tuple
        let tuple = Tuple::new(vec![
            Value::String("Hello".to_string()),
            Value::String("World".to_string()),
            Value::Int4(42),
        ]);

        // Insert tuple
        let _row_id = engine.insert_tuple("test_table", tuple.clone()).unwrap();

        // Scan table
        let tuples = engine.scan_table("test_table").unwrap();
        assert_eq!(tuples.len(), 1);
        assert_eq!(tuples[0], tuple);
    }

    #[test]
    fn test_compression_stats() {
        let mut config = Config::in_memory();
        config.storage.compression.enabled = true;
        config.storage.compression.auto.collect_stats = true;

        let engine = StorageEngine::open_in_memory(&config).unwrap();

        // Insert multiple keys
        for i in 0..100 {
            let key = format!("key_{}", i).into_bytes();
            let value = format!("value_{}_repeated_repeated", i).into_bytes();
            engine.put(&key, &value).unwrap();
        }

        // Get stats
        let stats = engine.compression_stats().unwrap();
        assert!(stats.total_compressions > 0);
        assert!(stats.avg_compression_ratio > 1.0);
    }
}
```
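One more scenario worth covering is the "rollback safe" claim from the migration strategy: data written with compression enabled must stay readable after compression is turned off. A sketch, assuming a file-backed open path (`Config::with_path` and `StorageEngine::open` are illustrative names, not confirmed API) and the tempfile crate:

```rust
#[test]
fn test_rollback_disable_compression() {
    let dir = tempfile::tempdir().unwrap();
    let mut config = Config::with_path(dir.path()); // Hypothetical constructor
    config.storage.compression.enabled = true;

    // Write while compression is enabled
    {
        let engine = StorageEngine::open(&config).unwrap();
        engine.put(b"key", b"value_written_while_compressed").unwrap();
    }

    // Reopen with compression disabled: the magic bytes still route reads
    // through the decompression path, so old blocks remain readable
    config.storage.compression.enabled = false;
    let engine = StorageEngine::open(&config).unwrap();
    assert_eq!(
        engine.get(b"key").unwrap(),
        Some(b"value_written_while_compressed".to_vec())
    );
}
```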
Benchmark Tests

```rust
//! Compression benchmarks.
//!
//! Note: `criterion_main!` generates its own `main`, so these benches belong in
//! a dedicated bench target (e.g. benches/compression.rs with `harness = false`
//! in Cargo.toml), not in a `#[cfg(test)]` module inside the crate.

use criterion::{black_box, criterion_group, criterion_main, BenchmarkId, Criterion};
// Codec types (FsstCodec, AlpCodec, DictionaryCodec, PatternAnalyzer) are
// imported from the crate's compression module.

fn bench_compression_algorithms(c: &mut Criterion) {
    let data_sizes = vec![1024, 10_240, 102_400, 1_024_000]; // 1KB to 1MB

    for size in data_sizes {
        let data = vec![b'x'; size];

        let mut group = c.benchmark_group(format!("compress_{}_bytes", size));

        // FSST
        group.bench_function(BenchmarkId::new("fsst", size), |b| {
            let codec = FsstCodec::new();
            b.iter(|| codec.compress(black_box(&data)))
        });

        // ALP (with float data)
        let float_data: Vec<f64> = (0..size / 8).map(|i| i as f64 * 1.1).collect();
        let float_bytes = unsafe {
            std::slice::from_raw_parts(float_data.as_ptr() as *const u8, float_data.len() * 8)
        };
        group.bench_function(BenchmarkId::new("alp", size), |b| {
            let codec = AlpCodec::new();
            b.iter(|| codec.compress(black_box(float_bytes)))
        });

        // Dictionary
        group.bench_function(BenchmarkId::new("dictionary", size), |b| {
            let codec = DictionaryCodec::new();
            b.iter(|| codec.compress(black_box(&data)))
        });

        group.finish();
    }
}

fn bench_pattern_analysis(c: &mut Criterion) {
    let analyzer = PatternAnalyzer::new();

    let mut group = c.benchmark_group("pattern_analysis");

    // String data
    let string_data = b"Hello world! ".repeat(100);
    group.bench_function("string_data", |b| {
        b.iter(|| analyzer.analyze(black_box(&string_data)))
    });

    // Float data
    let floats: Vec<f64> = (0..1000).map(|i| i as f64 * 1.1).collect();
    let float_bytes = unsafe {
        std::slice::from_raw_parts(floats.as_ptr() as *const u8, floats.len() * 8)
    };
    group.bench_function("float_data", |b| {
        b.iter(|| analyzer.analyze(black_box(float_bytes)))
    });

    group.finish();
}

criterion_group!(benches, bench_compression_algorithms, bench_pattern_analysis);
criterion_main!(benches);
```