Checkpoint Encryption Security Specification

Feature: F1.3 Flink Streaming - Checkpoint Encryption
Version: 1.0
Date: October 29, 2025
Status: APPROVED
Priority: P0 CRITICAL SECURITY
Classification: CONFIDENTIAL
Implementation Timeline: Week 2 (Nov 11-17, 2025)


⚠ CRITICAL SECURITY GAP

Current Status: F1.3 checkpoints are stored unencrypted, exposing:

  • Application state data
  • User data in state backends
  • Metadata (job IDs, timestamps, configuration)

Risk Level: HIGH
Impact: Data breach, compliance violations (GDPR, HIPAA, PCI-DSS)
Likelihood: MEDIUM (depends on deployment environment)

This specification addresses the gap with production-grade encryption.


Executive Summary

This specification defines the security architecture for encrypting F1.3 Flink Streaming checkpoints. Checkpoint data is protected with AES-256-GCM authenticated encryption, data keys are wrapped by a cloud KMS (AWS KMS and Azure Key Vault, with GCP KMS planned per FR-4), and master keys rotate automatically on a 30-day policy.

Security Objectives

| Objective | Description | Mechanism |
| --- | --- | --- |
| Confidentiality | Protect checkpoint data from unauthorized access | AES-256-GCM |
| Integrity | Detect tampering with checkpoint data | GMAC authentication |
| Availability | Ensure checkpoints remain accessible | Key versioning |
| Compliance | Meet FIPS 140-2, GDPR, HIPAA requirements | KMS integration |
| Auditability | Log all encryption/decryption operations | Audit logging |

Requirements

Functional Requirements

| ID | Requirement | Priority |
| --- | --- | --- |
| FR-1 | Encrypt checkpoint data with AES-256-GCM | P0 |
| FR-2 | Support AWS KMS integration | P0 |
| FR-3 | Support Azure Key Vault integration | P0 |
| FR-4 | Support GCP KMS integration (coming soon) | P1 |
| FR-5 | Automatic key rotation every 30 days | P0 |
| FR-6 | Key versioning for backward compatibility | P0 |
| FR-7 | Encrypt checkpoint metadata | P0 |
| FR-8 | Audit logging for all crypto operations | P1 |
| FR-9 | Multi-tenant key isolation | P1 |
| FR-10 | FIPS 140-2 compliance (optional) | P2 |

Non-Functional Requirements

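| ID | Requirement | Target |
| --- | --- | --- |
| NFR-1 | Encryption overhead | <5% |
| NFR-2 | Decryption latency | <10ms |
| NFR-3 | Key rotation time | <1 second |
| NFR-4 | KMS call latency | <100ms |
| NFR-5 | Test coverage | >95% |
| NFR-6 | Security audit | Pass with all findings resolved |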
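
To make NFR-1 checkable in a benchmark, the overhead can be expressed as the relative slowdown of an encrypted checkpoint write against an unencrypted baseline. The following is a minimal sketch under that assumption; `overhead_percent` and `meets_nfr1` are hypothetical helper names and are not part of this specification.

```rust
use std::time::Duration;

/// Relative overhead of the encrypted path over the baseline, in percent.
/// Returns `None` when the baseline is zero, because the ratio is undefined.
pub fn overhead_percent(baseline: Duration, encrypted: Duration) -> Option<f64> {
    let base = baseline.as_secs_f64();
    if base == 0.0 {
        return None;
    }
    Some((encrypted.as_secs_f64() - base) / base * 100.0)
}

/// NFR-1: encryption overhead must stay below 5%.
pub fn meets_nfr1(baseline: Duration, encrypted: Duration) -> bool {
    matches!(overhead_percent(baseline, encrypted), Some(p) if p < 5.0)
}
```

An undefined ratio is treated as a failed check, so a broken baseline measurement cannot pass silently.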

🏗 Architecture

High-Level Design

┌──────────────────────────────────────────────────────────────┐
│                   EncryptedCheckpointStore                   │
├──────────────────────────────────────────────────────────────┤
│                                                              │
│  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐  │
│  │ Encryption     │  │ KMS Client     │  │ Key Rotation   │  │
│  │ Engine         │  │ (AWS/Azure)    │  │ Manager        │  │
│  │ (AES-256-GCM)  │  │                │  │ (30-day)       │  │
│  └────────────────┘  └────────────────┘  └────────────────┘  │
│                                                              │
│  ┌──────────────────────────────────────────────────────────┐│
│  │                        Data Flow                         ││
│  ├──────────────────────────────────────────────────────────┤│
│  │                                                          ││
│  │ Plaintext ──▶ Encrypt ──▶ Store ──▶ Retrieve ──▶ Decrypt ││
│  │ Checkpoint    (AES-GCM)   (S3/FS)   (Encrypted)  (Plain) ││
│  │                                                          ││
│  │  ┌──────────┐                            ┌──────────┐    ││
│  │  │   KMS    │◀─── Get DEK (encrypted) ───│  Stored  │    ││
│  │  │  Master  │                            │ Metadata │    ││
│  │  │   Key    │                            └──────────┘    ││
│  │  └──────────┘                                            ││
│  │                                                          ││
│  └──────────────────────────────────────────────────────────┘│
│                                                              │
│  ┌────────────────┐  ┌────────────────┐  ┌────────────────┐  │
│  │ Audit Logger   │  │ Metrics        │  │ Key Cache      │  │
│  │ (Compliance)   │  │ (Performance)  │  │ (Performance)  │  │
│  └────────────────┘  └────────────────┘  └────────────────┘  │
│                                                              │
└──────────────────────────────────────────────────────────────┘

Encryption Architecture

We use envelope encryption for performance and security:

  1. Master Encryption Key (MEK): Stored in KMS (AWS/Azure/GCP)
  2. Data Encryption Key (DEK): Generated per checkpoint, encrypted with MEK
  3. Checkpoint Data: Encrypted with DEK using AES-256-GCM

Benefits:

  • Fast encryption (no KMS call per operation)
  • Secure key management (MEK never leaves KMS)
  • Efficient key rotation (re-encrypt DEKs, not data)

Process:

1. Generate DEK (256-bit random key)
2. Encrypt checkpoint data with DEK (AES-256-GCM)
3. Encrypt DEK with MEK (KMS call)
4. Store: encrypted_data || encrypted_DEK || metadata
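
Step 4 leaves the stored layout open. One possible framing is sketched below, assuming each field carries an 8-byte big-endian length prefix so a reader can split the record without knowing the wrapped-DEK size in advance. The `Envelope` struct, its field order, and the `encode`/`decode` names are illustrative only; the store code later in this document serializes with bincode instead.

```rust
/// Illustrative envelope record: key version, wrapped DEK, and ciphertext.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Envelope {
    pub key_version: String,
    pub encrypted_dek: Vec<u8>,
    pub encrypted_data: Vec<u8>,
}

/// Appends `field` preceded by its length as a big-endian u64.
fn put_field(out: &mut Vec<u8>, field: &[u8]) {
    out.extend_from_slice(&(field.len() as u64).to_be_bytes());
    out.extend_from_slice(field);
}

/// Reads one length-prefixed field and advances `input` past it.
fn take_field<'a>(input: &mut &'a [u8]) -> Option<&'a [u8]> {
    let data: &'a [u8] = *input;
    if data.len() < 8 {
        return None;
    }
    let (len_bytes, rest) = data.split_at(8);
    let mut raw = [0u8; 8];
    raw.copy_from_slice(len_bytes);
    let len = u64::from_be_bytes(raw);
    if (rest.len() as u64) < len {
        return None; // declared length exceeds the remaining bytes
    }
    let (field, rest) = rest.split_at(len as usize);
    *input = rest;
    Some(field)
}

impl Envelope {
    pub fn encode(&self) -> Vec<u8> {
        let mut out = Vec::new();
        put_field(&mut out, self.key_version.as_bytes());
        put_field(&mut out, &self.encrypted_dek);
        put_field(&mut out, &self.encrypted_data);
        out
    }

    pub fn decode(mut input: &[u8]) -> Option<Self> {
        let key_version = String::from_utf8(take_field(&mut input)?.to_vec()).ok()?;
        let encrypted_dek = take_field(&mut input)?.to_vec();
        let encrypted_data = take_field(&mut input)?.to_vec();
        if !input.is_empty() {
            return None; // reject trailing bytes
        }
        Some(Self { key_version, encrypted_dek, encrypted_data })
    }
}
```

Keeping the key version next to the wrapped DEK is what lets a reader pick the right master key version after a rotation.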

API Design

Configuration

use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EncryptionConfig {
    /// Encryption enabled
    pub enabled: bool,
    /// Encryption algorithm (AES-256-GCM or AES-256-CBC with HMAC-SHA256)
    pub algorithm: EncryptionAlgorithm,
    /// KMS provider
    pub kms_provider: KmsProvider,
    /// KMS configuration
    pub kms_config: KmsConfig,
    /// Key rotation policy
    pub key_rotation: KeyRotationPolicy,
    /// Audit logging enabled
    pub audit_logging: bool,
    /// FIPS mode (use FIPS 140-2 validated crypto)
    pub fips_mode: bool,
}

/// `Copy` so the store can read `config.algorithm` without moving out of the config.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum EncryptionAlgorithm {
    /// AES-256-GCM (recommended)
    Aes256Gcm,
    /// AES-256-CBC with HMAC-SHA256
    Aes256CbcHmac,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum KmsProvider {
    AwsKms(AwsKmsConfig),
    AzureKeyVault(AzureKeyVaultConfig),
    GcpKms(GcpKmsConfig),
    LocalHsm(LocalHsmConfig), // For testing
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsKmsConfig {
    /// AWS region
    pub region: String,
    /// KMS key ID or ARN
    pub key_id: String,
    /// AWS credentials (optional, uses IAM role by default)
    pub credentials: Option<AwsCredentials>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AzureKeyVaultConfig {
    /// Key Vault URL
    pub vault_url: String,
    /// Key name
    pub key_name: String,
    /// Key version (optional, uses latest)
    pub key_version: Option<String>,
    /// Azure credentials
    pub credentials: AzureCredentials,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KeyRotationPolicy {
    /// Rotation enabled
    pub enabled: bool,
    /// Rotation interval (days)
    pub interval_days: u32,
    /// Max key age (days) before forced rotation
    pub max_age_days: u32,
    /// Auto-rotate on schedule
    pub auto_rotate: bool,
}

impl Default for KeyRotationPolicy {
    fn default() -> Self {
        Self {
            enabled: true,
            interval_days: 30,
            max_age_days: 90,
            auto_rotate: true,
        }
    }
}
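
The two day counts in `KeyRotationPolicy` constrain each other: a rotation interval longer than the maximum key age could never be honored. The sketch below shows a validation step under that assumption; the rules are not stated in this specification, `validate` is a hypothetical helper, and the struct is repeated without the serde derives so the block compiles on its own.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct KeyRotationPolicy {
    pub enabled: bool,
    pub interval_days: u32,
    pub max_age_days: u32,
    pub auto_rotate: bool,
}

impl Default for KeyRotationPolicy {
    fn default() -> Self {
        Self { enabled: true, interval_days: 30, max_age_days: 90, auto_rotate: true }
    }
}

/// Checks the policy for internal consistency (assumed rules, see lead-in).
pub fn validate(policy: &KeyRotationPolicy) -> Result<(), String> {
    if !policy.enabled {
        return Ok(()); // nothing to check when rotation is switched off
    }
    if policy.interval_days == 0 {
        return Err("interval_days must be at least 1".to_string());
    }
    if policy.interval_days > policy.max_age_days {
        return Err(format!(
            "interval_days ({}) exceeds max_age_days ({})",
            policy.interval_days, policy.max_age_days
        ));
    }
    Ok(())
}
```

Running a check like this when the configuration is loaded would surface a contradictory policy before any checkpoint is written.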

Encrypted Checkpoint Store

use std::sync::Arc;
use std::time::{Instant, SystemTime};

use async_trait::async_trait;

pub struct EncryptedCheckpointStore {
    /// Inner store (S3, filesystem, etc.)
    inner_store: Arc<dyn CheckpointStore>,
    /// Encryption engine
    encryption_engine: Arc<EncryptionEngine>,
    /// KMS client
    kms_client: Arc<dyn KmsClient>,
    /// Configuration
    config: EncryptionConfig,
    /// Key cache (for performance)
    key_cache: Arc<KeyCache>,
    /// Audit logger
    audit_logger: Arc<AuditLogger>,
    /// Metrics
    metrics: Arc<EncryptionMetrics>,
}

impl EncryptedCheckpointStore {
    pub async fn new(
        inner_store: Arc<dyn CheckpointStore>,
        config: EncryptionConfig,
    ) -> Result<Self> {
        // 1. Create KMS client
        let kms_client = Self::create_kms_client(&config.kms_provider).await?;
        // 2. Verify KMS connectivity
        kms_client.test_connection().await?;
        // 3. Create encryption engine (wrapped in Arc to match the field type)
        let encryption_engine = Arc::new(EncryptionEngine::new(config.algorithm)?);
        // 4. Initialize key cache
        let key_cache = Arc::new(KeyCache::new(100)); // Cache up to 100 keys
        // 5. Initialize audit logger
        let audit_logger = Arc::new(if config.audit_logging {
            AuditLogger::new("checkpoint_encryption").await?
        } else {
            AuditLogger::noop()
        });
        // 6. Initialize metrics
        let metrics = Arc::new(EncryptionMetrics::new());
        Ok(Self {
            inner_store,
            encryption_engine,
            kms_client,
            config,
            key_cache,
            audit_logger,
            metrics,
        })
    }

    async fn create_kms_client(provider: &KmsProvider) -> Result<Arc<dyn KmsClient>> {
        match provider {
            KmsProvider::AwsKms(config) => {
                Ok(Arc::new(AwsKmsClient::new(config.clone()).await?))
            }
            KmsProvider::AzureKeyVault(config) => {
                Ok(Arc::new(AzureKmsClient::new(config.clone()).await?))
            }
            KmsProvider::GcpKms(config) => {
                Ok(Arc::new(GcpKmsClient::new(config.clone()).await?))
            }
            KmsProvider::LocalHsm(config) => {
                Ok(Arc::new(LocalHsmClient::new(config.clone())?))
            }
        }
    }
}
#[async_trait]
impl CheckpointStore for EncryptedCheckpointStore {
    async fn save(&self, checkpoint_id: CheckpointId, data: &[u8]) -> Result<()> {
        let start = Instant::now();
        // 1. Generate DEK (Data Encryption Key)
        let dek = self.encryption_engine.generate_key()?;
        // 2. Encrypt checkpoint data with DEK
        let encrypted_data = self.encryption_engine.encrypt(&dek, data)?;
        // 3. Encrypt DEK with MEK (Master Encryption Key from KMS)
        let encrypted_dek = self.kms_client.encrypt(&dek.bytes).await?;
        // 4. Create encrypted checkpoint
        let encrypted_checkpoint = EncryptedCheckpoint {
            checkpoint_id,
            encrypted_data,
            encrypted_dek,
            algorithm: self.config.algorithm,
            key_version: self.kms_client.current_key_version().await?,
            created_at: SystemTime::now(),
        };
        // 5. Serialize and store
        let serialized = bincode::serialize(&encrypted_checkpoint)?;
        self.inner_store.save(checkpoint_id, &serialized).await?;
        // 6. Audit log
        self.audit_logger.log_encryption(checkpoint_id, data.len()).await;
        // 7. Metrics
        self.metrics.encryption_count.inc();
        self.metrics.encryption_duration.observe(start.elapsed().as_secs_f64());
        Ok(())
    }

    async fn load(&self, checkpoint_id: CheckpointId) -> Result<Vec<u8>> {
        let start = Instant::now();
        // 1. Load encrypted checkpoint
        let serialized = self.inner_store.load(checkpoint_id).await?;
        let encrypted_checkpoint: EncryptedCheckpoint = bincode::deserialize(&serialized)?;
        // 2. Check key cache
        let dek = if let Some(cached_dek) = self.key_cache.get(&encrypted_checkpoint.encrypted_dek) {
            cached_dek
        } else {
            // 3. Decrypt DEK with KMS
            let dek_bytes = self.kms_client.decrypt(&encrypted_checkpoint.encrypted_dek).await?;
            let dek = DataEncryptionKey::from_bytes(dek_bytes)?;
            // 4. Cache DEK
            self.key_cache.insert(encrypted_checkpoint.encrypted_dek.clone(), dek.clone());
            dek
        };
        // 5. Decrypt checkpoint data with DEK
        let plaintext = self.encryption_engine.decrypt(&dek, &encrypted_checkpoint.encrypted_data)?;
        // 6. Audit log
        self.audit_logger.log_decryption(checkpoint_id, plaintext.len()).await;
        // 7. Metrics
        self.metrics.decryption_count.inc();
        self.metrics.decryption_duration.observe(start.elapsed().as_secs_f64());
        Ok(plaintext)
    }

    async fn delete(&self, checkpoint_id: CheckpointId) -> Result<()> {
        // Delegate to inner store
        self.inner_store.delete(checkpoint_id).await?;
        // Audit log
        self.audit_logger.log_deletion(checkpoint_id).await;
        Ok(())
    }

    async fn list(&self) -> Result<Vec<CheckpointId>> {
        self.inner_store.list().await
    }
}
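
`KeyCache` is used above but is not defined in this specification. The following is a minimal sketch of a bounded cache that matches the `new`, `get`, and `insert` calls made by the store, assuming FIFO eviction and a `Mutex` for interior mutability. The eviction policy, the extra `len` accessor, and the `Vec<u8>` stand-in for `DataEncryptionKey` are assumptions.

```rust
use std::collections::{HashMap, VecDeque};
use std::sync::Mutex;

/// Stand-in for `DataEncryptionKey` so the sketch is self-contained.
type Dek = Vec<u8>;

struct Inner {
    map: HashMap<Vec<u8>, Dek>,
    order: VecDeque<Vec<u8>>, // insertion order, oldest first
}

pub struct KeyCache {
    capacity: usize,
    inner: Mutex<Inner>,
}

impl KeyCache {
    pub fn new(capacity: usize) -> Self {
        Self {
            capacity,
            inner: Mutex::new(Inner { map: HashMap::new(), order: VecDeque::new() }),
        }
    }

    /// Returns a clone of the cached DEK for this wrapped-DEK blob, if present.
    pub fn get(&self, encrypted_dek: &[u8]) -> Option<Dek> {
        let inner = self.inner.lock().unwrap();
        inner.map.get(encrypted_dek).cloned()
    }

    /// Inserts a DEK, evicting the oldest entry once the cache is full.
    pub fn insert(&self, encrypted_dek: Vec<u8>, dek: Dek) {
        if self.capacity == 0 {
            return;
        }
        let mut inner = self.inner.lock().unwrap();
        if inner.map.contains_key(&encrypted_dek) {
            inner.map.insert(encrypted_dek, dek); // refresh value, keep position
            return;
        }
        if inner.map.len() >= self.capacity {
            if let Some(oldest) = inner.order.pop_front() {
                inner.map.remove(&oldest);
            }
        }
        inner.order.push_back(encrypted_dek.clone());
        inner.map.insert(encrypted_dek, dek);
    }

    pub fn len(&self) -> usize {
        self.inner.lock().unwrap().map.len()
    }
}
```

Because `save` generates a fresh DEK for every checkpoint, the wrapped-DEK key is unique per checkpoint, so a cache hit only occurs when the same checkpoint is loaded again. The cache also holds plaintext DEKs in memory, which is the trade-off for skipping the KMS round trip.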

Encryption Engine

// Assumes rand 0.8 for `OsRng` and `RngCore`.
use rand::rngs::OsRng;
use rand::RngCore;

/// GCM nonce length in bytes (96 bits)
const NONCE_LEN: usize = 12;
/// GCM authentication tag length in bytes (128 bits)
const TAG_LEN: usize = 16;

pub struct EncryptionEngine {
    algorithm: EncryptionAlgorithm,
}

impl EncryptionEngine {
    pub fn new(algorithm: EncryptionAlgorithm) -> Result<Self> {
        Ok(Self { algorithm })
    }

    pub fn generate_key(&self) -> Result<DataEncryptionKey> {
        let mut key_bytes = vec![0u8; 32]; // 256 bits
        OsRng.fill_bytes(&mut key_bytes);
        DataEncryptionKey::from_bytes(key_bytes)
    }

    pub fn encrypt(&self, key: &DataEncryptionKey, plaintext: &[u8]) -> Result<Vec<u8>> {
        match self.algorithm {
            EncryptionAlgorithm::Aes256Gcm => self.encrypt_aes_gcm(key, plaintext),
            EncryptionAlgorithm::Aes256CbcHmac => self.encrypt_aes_cbc_hmac(key, plaintext),
        }
    }

    pub fn decrypt(&self, key: &DataEncryptionKey, ciphertext: &[u8]) -> Result<Vec<u8>> {
        match self.algorithm {
            EncryptionAlgorithm::Aes256Gcm => self.decrypt_aes_gcm(key, ciphertext),
            EncryptionAlgorithm::Aes256CbcHmac => self.decrypt_aes_cbc_hmac(key, ciphertext),
        }
    }

    fn encrypt_aes_gcm(&self, key: &DataEncryptionKey, plaintext: &[u8]) -> Result<Vec<u8>> {
        // `NewAead` is the aes-gcm 0.9 trait name; aes-gcm 0.10 renamed it to `KeyInit`.
        use aes_gcm::aead::{Aead, NewAead};
        use aes_gcm::{Aes256Gcm, Key, Nonce};
        let cipher = Aes256Gcm::new(Key::from_slice(&key.bytes));
        // Generate random nonce (96 bits). Each DEK encrypts a single checkpoint,
        // so a nonce is never reused under the same key.
        let mut nonce_bytes = [0u8; NONCE_LEN];
        OsRng.fill_bytes(&mut nonce_bytes);
        let nonce = Nonce::from_slice(&nonce_bytes);
        // Encrypt; the returned buffer is ciphertext with the tag appended
        let ciphertext = cipher.encrypt(nonce, plaintext)
            .map_err(|e| Error::EncryptionFailed(e.to_string()))?;
        // Format: nonce (12 bytes) || ciphertext || tag (16 bytes)
        let mut result = Vec::with_capacity(NONCE_LEN + ciphertext.len());
        result.extend_from_slice(&nonce_bytes);
        result.extend_from_slice(&ciphertext);
        Ok(result)
    }

    fn decrypt_aes_gcm(&self, key: &DataEncryptionKey, ciphertext: &[u8]) -> Result<Vec<u8>> {
        use aes_gcm::aead::{Aead, NewAead};
        use aes_gcm::{Aes256Gcm, Key, Nonce};
        // A valid frame holds at least the nonce and the authentication tag
        if ciphertext.len() < NONCE_LEN + TAG_LEN {
            return Err(Error::DecryptionFailed("Invalid ciphertext length".to_string()));
        }
        let cipher = Aes256Gcm::new(Key::from_slice(&key.bytes));
        // Extract nonce and ciphertext
        let nonce = Nonce::from_slice(&ciphertext[..NONCE_LEN]);
        let ct = &ciphertext[NONCE_LEN..];
        // Decrypt (includes authentication tag verification)
        let plaintext = cipher.decrypt(nonce, ct)
            .map_err(|e| Error::DecryptionFailed(e.to_string()))?;
        Ok(plaintext)
    }

    // AES-256-CBC with HMAC-SHA256 (alternative, not yet implemented).
    // Returns an error instead of panicking so a misconfigured job fails cleanly.
    fn encrypt_aes_cbc_hmac(&self, _key: &DataEncryptionKey, _plaintext: &[u8]) -> Result<Vec<u8>> {
        Err(Error::EncryptionFailed("AES-256-CBC-HMAC is not implemented".to_string()))
    }

    fn decrypt_aes_cbc_hmac(&self, _key: &DataEncryptionKey, _ciphertext: &[u8]) -> Result<Vec<u8>> {
        Err(Error::DecryptionFailed("AES-256-CBC-HMAC is not implemented".to_string()))
    }
}
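
`DataEncryptionKey` is referenced by the engine but not defined in this document. Below is a minimal sketch, assuming `from_bytes` rejects anything other than 32 bytes and that the type hides key material from `Debug` output so it cannot leak into logs. The `KeyError` type and the `DEK_LEN` constant are hypothetical.

```rust
use std::fmt;

/// AES-256 key length in bytes.
pub const DEK_LEN: usize = 32;

#[derive(Debug, PartialEq, Eq)]
pub enum KeyError {
    InvalidLength { expected: usize, actual: usize },
}

#[derive(Clone)]
pub struct DataEncryptionKey {
    pub bytes: Vec<u8>,
}

impl DataEncryptionKey {
    /// Accepts exactly 32 bytes; any other length is rejected.
    pub fn from_bytes(bytes: Vec<u8>) -> Result<Self, KeyError> {
        if bytes.len() != DEK_LEN {
            return Err(KeyError::InvalidLength { expected: DEK_LEN, actual: bytes.len() });
        }
        Ok(Self { bytes })
    }
}

// Manual `Debug` so that `{:?}` never prints the key bytes.
impl fmt::Debug for DataEncryptionKey {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("DataEncryptionKey([REDACTED])")
    }
}
```

A production type would likely also clear the bytes on drop, for example with the `zeroize` crate; that is left out here to keep the sketch dependency-free.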

KMS Client Trait

// Import paths as in aws-sdk-kms 1.x.
use aws_sdk_kms::config::Region;
use aws_sdk_kms::primitives::Blob;

#[async_trait]
pub trait KmsClient: Send + Sync {
    /// Encrypt data with master key
    async fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>>;
    /// Decrypt data with master key
    async fn decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>>;
    /// Get current key version
    async fn current_key_version(&self) -> Result<String>;
    /// Test KMS connectivity
    async fn test_connection(&self) -> Result<()>;
    /// Generate data key (envelope encryption); returns (plaintext key, encrypted key)
    async fn generate_data_key(&self, key_spec: KeySpec) -> Result<(Vec<u8>, Vec<u8>)>;
}

pub struct AwsKmsClient {
    kms: aws_sdk_kms::Client,
    key_id: String,
}

impl AwsKmsClient {
    pub async fn new(config: AwsKmsConfig) -> Result<Self> {
        let sdk_config = aws_config::from_env()
            .region(Region::new(config.region))
            .load()
            .await;
        let kms = aws_sdk_kms::Client::new(&sdk_config);
        Ok(Self {
            kms,
            key_id: config.key_id,
        })
    }
}

#[async_trait]
impl KmsClient for AwsKmsClient {
    async fn encrypt(&self, plaintext: &[u8]) -> Result<Vec<u8>> {
        let output = self.kms
            .encrypt()
            .key_id(&self.key_id)
            .plaintext(Blob::new(plaintext))
            .send()
            .await
            .map_err(|e| Error::KmsFailed(e.to_string()))?;
        // A missing field is reported as an error instead of panicking
        let blob = output.ciphertext_blob
            .ok_or_else(|| Error::KmsFailed("Encrypt response has no ciphertext".to_string()))?;
        Ok(blob.into_inner())
    }

    async fn decrypt(&self, ciphertext: &[u8]) -> Result<Vec<u8>> {
        let output = self.kms
            .decrypt()
            .key_id(&self.key_id)
            .ciphertext_blob(Blob::new(ciphertext))
            .send()
            .await
            .map_err(|e| Error::KmsFailed(e.to_string()))?;
        let blob = output.plaintext
            .ok_or_else(|| Error::KmsFailed("Decrypt response has no plaintext".to_string()))?;
        Ok(blob.into_inner())
    }

    async fn current_key_version(&self) -> Result<String> {
        let output = self.kms
            .describe_key()
            .key_id(&self.key_id)
            .send()
            .await
            .map_err(|e| Error::KmsFailed(e.to_string()))?;
        // The key ID is used as the version string here; this call does not
        // return a separate identifier for rotated key material.
        let metadata = output.key_metadata
            .ok_or_else(|| Error::KmsFailed("DescribeKey response has no metadata".to_string()))?;
        Ok(metadata.key_id)
    }

    async fn test_connection(&self) -> Result<()> {
        self.kms
            .describe_key()
            .key_id(&self.key_id)
            .send()
            .await
            .map_err(|e| Error::KmsFailed(e.to_string()))?;
        Ok(())
    }

    async fn generate_data_key(&self, key_spec: KeySpec) -> Result<(Vec<u8>, Vec<u8>)> {
        let output = self.kms
            .generate_data_key()
            .key_id(&self.key_id)
            .key_spec(key_spec.into())
            .send()
            .await
            .map_err(|e| Error::KmsFailed(e.to_string()))?;
        let plaintext_key = output.plaintext
            .ok_or_else(|| Error::KmsFailed("GenerateDataKey response has no plaintext".to_string()))?
            .into_inner();
        let encrypted_key = output.ciphertext_blob
            .ok_or_else(|| Error::KmsFailed("GenerateDataKey response has no ciphertext".to_string()))?
            .into_inner();
        Ok((plaintext_key, encrypted_key))
    }
}
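
For unit tests that should not reach a cloud KMS, a stand-in can mimic wrapping and unwrapping while keeping old key versions usable after rotation (FR-6). The sketch below is synchronous and uses XOR with a per-version byte purely as a placeholder, which is NOT encryption. `MockKms`, its methods, and the version-prefixed blob layout are all hypothetical and do not mirror any provider API or the `LocalHsmClient` named above.

```rust
use std::collections::HashMap;

/// Placeholder "master key" byte for a version. Not cryptography.
fn pad_for(version: u32) -> u8 {
    (version as u8).wrapping_mul(31).wrapping_add(7)
}

pub struct MockKms {
    keys: HashMap<u32, u8>, // version -> placeholder key byte
    current: u32,
}

impl MockKms {
    pub fn new() -> Self {
        let mut keys = HashMap::new();
        keys.insert(1, pad_for(1));
        Self { keys, current: 1 }
    }

    /// Adds a new version; older versions stay available for unwrapping.
    pub fn rotate(&mut self) -> u32 {
        self.current += 1;
        self.keys.insert(self.current, pad_for(self.current));
        self.current
    }

    pub fn current_key_version(&self) -> u32 {
        self.current
    }

    /// Blob layout: 4-byte big-endian version || masked DEK.
    pub fn wrap(&self, dek: &[u8]) -> Vec<u8> {
        let pad = self.keys[&self.current];
        let mut blob = self.current.to_be_bytes().to_vec();
        blob.extend(dek.iter().map(|b| b ^ pad));
        blob
    }

    /// Unwraps with whichever version the blob names; `None` if unknown.
    pub fn unwrap_dek(&self, blob: &[u8]) -> Option<Vec<u8>> {
        if blob.len() < 4 {
            return None;
        }
        let version = u32::from_be_bytes([blob[0], blob[1], blob[2], blob[3]]);
        let pad = *self.keys.get(&version)?;
        Some(blob[4..].iter().map(|b| b ^ pad).collect())
    }
}
```

The point of the sketch is the version tag: a DEK wrapped before a rotation still unwraps afterwards because the blob records which key version produced it.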

Key Rotation Manager

use std::sync::Arc;
use std::time::{Duration, SystemTime};

use tokio::sync::RwLock;

pub struct KeyRotationManager {
    kms_client: Arc<dyn KmsClient>,
    policy: KeyRotationPolicy,
    current_key_version: RwLock<String>,
    last_rotation: RwLock<SystemTime>,
}

impl KeyRotationManager {
    pub async fn new(
        kms_client: Arc<dyn KmsClient>,
        policy: KeyRotationPolicy,
    ) -> Result<Self> {
        let current_key_version = kms_client.current_key_version().await?;
        Ok(Self {
            kms_client,
            policy,
            current_key_version: RwLock::new(current_key_version),
            last_rotation: RwLock::new(SystemTime::now()),
        })
    }

    pub async fn should_rotate(&self) -> Result<bool> {
        if !self.policy.enabled {
            return Ok(false);
        }
        let last_rotation = *self.last_rotation.read().await;
        let elapsed = SystemTime::now().duration_since(last_rotation)?;
        Ok(elapsed.as_secs() > (self.policy.interval_days as u64 * 86400))
    }

    pub async fn rotate_key(&self) -> Result<String> {
        // `KmsClient` has no call that creates a key version, so this re-reads
        // the version the KMS currently reports. The rotation itself must be
        // triggered in the KMS.
        let new_version = self.kms_client.current_key_version().await?;
        // Update current version
        *self.current_key_version.write().await = new_version.clone();
        *self.last_rotation.write().await = SystemTime::now();
        Ok(new_version)
    }

    /// Takes `Arc<Self>` so the background task shares state with the caller;
    /// the manager holds `RwLock` fields and cannot be cloned.
    pub async fn start_auto_rotation(self: Arc<Self>) {
        if !self.policy.auto_rotate {
            return;
        }
        tokio::spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_secs(3600)).await; // Check every hour
                if let Ok(should_rotate) = self.should_rotate().await {
                    if should_rotate {
                        if let Err(e) = self.rotate_key().await {
                            eprintln!("Key rotation failed: {}", e);
                        }
                    }
                }
            }
        });
    }
}
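
`should_rotate` above consults only `interval_days` and reads the system clock directly, which makes the decision awkward to unit test. The sketch below expresses the same decision as a pure function over explicit timestamps and also takes `max_age_days` into account as the forced-rotation limit described in the policy comments. The `RotationDecision` enum, the function name, and the choice to treat a backwards clock as "not due" are assumptions.

```rust
use std::time::{Duration, SystemTime};

const SECS_PER_DAY: u64 = 86_400;

#[derive(Debug, PartialEq, Eq)]
pub enum RotationDecision {
    /// Key is younger than the rotation interval.
    NotDue,
    /// Rotation interval has elapsed.
    Due,
    /// Maximum key age exceeded; rotation should be forced.
    Forced,
}

pub fn rotation_decision(
    last_rotation: SystemTime,
    now: SystemTime,
    interval_days: u32,
    max_age_days: u32,
) -> RotationDecision {
    // `duration_since` fails if `now` is earlier than `last_rotation`
    // (clock moved backwards); the key is treated as fresh in that case.
    let age_secs = now
        .duration_since(last_rotation)
        .unwrap_or(Duration::from_secs(0))
        .as_secs();
    if age_secs > u64::from(max_age_days) * SECS_PER_DAY {
        RotationDecision::Forced
    } else if age_secs > u64::from(interval_days) * SECS_PER_DAY {
        RotationDecision::Due
    } else {
        RotationDecision::NotDue
    }
}
```

Passing `now` in as a parameter lets the "Automatic rotation (simulated)" test case advance time without sleeping.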

🧪 Testing Strategy

Unit Tests (8 tests)

  1. Encryption/Decryption correctness

    • Encrypt → Decrypt = original data
    • Multiple encryptions produce different ciphertexts (nonce randomness)
  2. Key generation

    • 256-bit keys
    • Randomness validation
  3. Algorithm selection

    • AES-256-GCM
    • AES-256-CBC-HMAC
  4. Error handling

    • Invalid ciphertext
    • Corrupted data
    • Authentication failure

Integration Tests (12 tests)

  1. AWS KMS integration

    • Connect to AWS KMS
    • Encrypt/decrypt with AWS KMS
    • Generate data key
  2. Azure Key Vault integration

    • Connect to Azure Key Vault
    • Encrypt/decrypt with Azure
    • Key versioning
  3. End-to-end checkpoint encryption

    • Save encrypted checkpoint
    • Load and decrypt checkpoint
    • Verify data integrity
  4. Key rotation

    • Manual rotation
    • Automatic rotation (simulated)
    • Backward compatibility (old key version)
  5. Performance

    • Encryption overhead <5%
    • Decryption latency <10ms
    • KMS call latency <100ms
  6. Multi-tenant isolation

    • Different keys for different tenants
    • No cross-tenant key access
  7. Audit logging

    • Encrypt operations logged
    • Decrypt operations logged
    • Key rotation logged
  8. Error recovery

    • KMS unavailable
    • Network timeout
    • Invalid key ID
  9. FIPS compliance (if enabled)

    • FIPS-validated crypto modules
    • Compliance validation
  10. Key cache

    • Cache hit rate
    • Cache eviction
    • Cache coherency
  11. Concurrent access

    • Multiple threads encrypting
    • Multiple threads decrypting
    • Thread safety
  12. Security audit prep

    • Penetration testing scenarios
    • Vulnerability scanning
    • Compliance checks

Security Tests (5 tests)

  1. Authentication failure detection

    • Tampered ciphertext
    • Modified tag
    • Incorrect key
  2. Key separation across rotations

    • Old checkpoints remain encrypted with old keys
    • New checkpoints use new keys
  3. Key compromise recovery

    • Rotate compromised key
    • Re-encrypt affected checkpoints
  4. Side-channel protection

    • Constant-time operations
    • No timing leaks
  5. Compliance validation

    • FIPS 140-2 (if applicable)
    • GDPR compliance
    • HIPAA compliance

Security Audit Preparation

External Audit Scope

| Area | Description | Status |
| --- | --- | --- |
| Cryptographic Design | Envelope encryption, key management | To be audited |
| Implementation | AES-GCM, KMS integration | To be audited |
| Key Rotation | Automated rotation, versioning | To be audited |
| Access Controls | Multi-tenant isolation | To be audited |
| Audit Logging | Tamper-evident logs | To be audited |
| Compliance | FIPS, GDPR, HIPAA | To be audited |

Audit Deliverables

  1. Security Audit Report (from external auditor)
  2. Penetration Test Results
  3. Compliance Certification (FIPS, SOC 2)
  4. Vulnerability Scan Results
  5. Remediation Plan (if findings exist)

Schedule

  • Week 2 (Nov 11-17): Implementation complete
  • Week 3 (Nov 18-24): External security audit
  • Week 4 (Nov 25-Dec 1): Remediation (if needed)
  • Week 5 (Dec 2-8): Re-audit and certification

Metrics

Security Metrics

pub struct EncryptionMetrics {
    pub encryption_count: Counter,
    pub decryption_count: Counter,
    pub encryption_duration: Histogram,
    pub decryption_duration: Histogram,
    pub kms_call_duration: Histogram,
    pub key_rotation_count: Counter,
    pub encryption_errors: Counter,
    pub decryption_errors: Counter,
    pub auth_failures: Counter,
}

Prometheus Metrics

  • heliosdb_checkpoint_encryption_total
  • heliosdb_checkpoint_decryption_total
  • heliosdb_checkpoint_encryption_duration_seconds
  • heliosdb_checkpoint_decryption_duration_seconds
  • heliosdb_kms_call_duration_seconds
  • heliosdb_key_rotation_total
  • heliosdb_encryption_errors_total
  • heliosdb_decryption_errors_total
  • heliosdb_auth_failures_total
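
The counter names above map directly onto the Prometheus text exposition format. Here is a minimal sketch using only `std` atomics, assuming a hand-rolled `Counter` rather than a metrics crate (the `Counter` and `Histogram` types in `EncryptionMetrics` are not specified in this document). `CounterSet` is a hypothetical type, and only two of the listed metrics are shown.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Monotonic counter that can be incremented through a shared reference.
#[derive(Default)]
pub struct Counter(AtomicU64);

impl Counter {
    pub fn inc(&self) {
        self.0.fetch_add(1, Ordering::Relaxed);
    }

    pub fn get(&self) -> u64 {
        self.0.load(Ordering::Relaxed)
    }
}

#[derive(Default)]
pub struct CounterSet {
    pub encryption_total: Counter,
    pub auth_failures_total: Counter,
}

impl CounterSet {
    /// Renders the counters as Prometheus text: a `# TYPE` line, then the sample.
    pub fn render(&self) -> String {
        let mut out = String::new();
        for (name, counter) in [
            ("heliosdb_checkpoint_encryption_total", &self.encryption_total),
            ("heliosdb_auth_failures_total", &self.auth_failures_total),
        ] {
            out.push_str(&format!("# TYPE {0} counter\n{0} {1}\n", name, counter.get()));
        }
        out
    }
}
```

Atomics let `save` and `load` bump counters through `&self`, which matches how the store calls `inc()` on metrics held behind an `Arc`.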

🚨 Security Considerations

Threat Model

| Threat | Mitigation |
| --- | --- |
| Unauthorized access to checkpoint files | AES-256-GCM encryption |
| Tampering with checkpoints | GMAC authentication |
| Key compromise | Key rotation, KMS isolation |
| KMS unavailable | Key caching, fallback mechanisms |
| Side-channel attacks | Constant-time operations |
| Insider threats | Audit logging, multi-tenant isolation |
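
The "constant-time operations" mitigation matters wherever secret-dependent bytes are compared, such as a MAC tag in the AES-256-CBC-HMAC path. The sketch below shows a comparison that inspects every byte regardless of where the first mismatch occurs. The function name is hypothetical, and because an optimizing compiler gives no timing guarantee for plain Rust, a vetted implementation such as the `subtle` crate is the usual choice in practice.

```rust
/// Compares two byte strings without returning early on the first mismatch.
pub fn ct_eq(a: &[u8], b: &[u8]) -> bool {
    // Length is not treated as secret: tags have a fixed, public size.
    if a.len() != b.len() {
        return false;
    }
    // Accumulate the XOR of every byte pair; any difference sets a bit.
    let mut diff = 0u8;
    for (x, y) in a.iter().zip(b.iter()) {
        diff |= x ^ y;
    }
    diff == 0
}
```

A plain `a == b` on slices may stop at the first differing byte, which is the timing signal this pattern is meant to avoid.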

Compliance

  • GDPR: Encryption at rest, key management
  • HIPAA: PHI protection, audit logging
  • PCI-DSS: Encryption requirements (3.4, 3.5)
  • SOC 2: Security controls, audit trails
  • FIPS 140-2: Validated crypto modules (optional)

Acceptance Criteria

  • All 25 tests passing (unit + integration + security)
  • Code coverage >95%
  • Encryption overhead <5% (benchmark)
  • Decryption latency <10ms (benchmark)
  • KMS integration working (AWS + Azure)
  • Key rotation functional (manual + automatic)
  • Audit logging complete
  • Security audit scheduled (Week 3)
  • Documentation complete
  • Code reviewed and approved

Document Version: 1.0
Last Updated: October 29, 2025
Status: APPROVED
Classification: CONFIDENTIAL
Implementation: Week 2 (Nov 11-17, 2025)
Security Audit: Week 3 (Nov 18-24, 2025)