F3.6 Edge AI Model Inference - User Guide
F3.6 Edge AI Model Inference - User Guide
Table of Contents
- Overview
- Quick Start
- ONNX Runtime Integration
- TensorFlow Lite Support
- Model Quantization
- SQL Interface
- Model Registry & Versioning
- A/B Testing & Canary Deployments
- Edge Deployment
- Performance Optimization
- Examples
- Troubleshooting
Overview
HeliosDB F3.6 Edge AI enables running ML models directly at database edge nodes with <10ms inference latency. This feature provides:
- Multiple Runtime Support: ONNX, TensorFlow Lite, PyTorch Mobile, Core ML
- Model Quantization: INT8, INT4, FP16 quantization for 3-5x compression
- SQL Interface: Run ML inference directly from SQL queries
- Edge Deployment: Distribute models across edge nodes with intelligent routing
- A/B Testing: Test multiple model versions in production
- Canary Deployments: Gradual rollout with automatic rollback
Key Benefits
- Ultra-Low Latency: <10ms inference time for edge models
- Scalability: Horizontal scaling across edge nodes
- Cost Efficiency: Reduce cloud API costs by 95%
- Data Privacy: Keep sensitive data on-premises
- Developer Friendly: SQL interface requires zero ML expertise
Quick Start
Installation
[dependencies]heliosdb-edge-ai = "6.0"Basic Usage
use heliosdb_edge_ai::*;use std::collections::HashMap;
#[tokio::main]async fn main() -> Result<()> { // Create inference engine let config = InferenceEngineConfig::default(); let engine = InferenceEngine::new(config);
// Prepare inputs let mut inputs = HashMap::new(); inputs.insert("input".to_string(), vec![0.5; 150528]); // 224x224x3
// Run inference let request = InferenceRequest::new("mobilenet_v2".to_string(), inputs); let response = engine.infer(request).await?;
println!("Inference latency: {}ms", response.latency_ms); println!("Predictions: {:?}", response.outputs);
Ok(())}ONNX Runtime Integration
Configuration
use heliosdb_edge_ai::*;
let config = OnnxRuntimeConfig { execution_provider: OnnxExecutionProvider::Cpu, num_threads: 8, enable_optimization: true, optimization_level: 3, enable_mem_pattern: true, enable_cpu_mem_arena: true,};
let runtime = OnnxRuntime::new(config);Loading Models
use std::path::PathBuf;
// Load from filelet session = runtime.load_model( "mobilenet_v2".to_string(), "v1.0".to_string(), PathBuf::from("/models/mobilenet_v2.onnx"),)?;
// Load from bytes (e.g., from database)let model_bytes = load_from_database("mobilenet_v2");let session = runtime.load_model_from_bytes( "mobilenet_v2".to_string(), "v1.0".to_string(), &model_bytes,)?;Running Inference
use ndarray::ArrayD;
let mut inputs = HashMap::new();
// Create input tensor: [batch_size, height, width, channels]let input_data = ArrayD::from_shape_vec( vec![1, 224, 224, 3], vec![0.5; 150528])?;inputs.insert("input".to_string(), input_data);
// Run inferencelet outputs = session.run(inputs)?;
// Process outputsfor (name, tensor) in outputs { println!("Output {}: shape {:?}", name, tensor.shape());}GPU Acceleration
let config = OnnxRuntimeConfig { execution_provider: OnnxExecutionProvider::Cuda, num_threads: 4, enable_optimization: true, optimization_level: 3, enable_mem_pattern: false, // Disable for GPU enable_cpu_mem_arena: false, // Disable for GPU};
// Runtime will automatically fall back to CPU if CUDA unavailablelet runtime = OnnxRuntime::new(config);TensorFlow Lite Support
Configuration
use heliosdb_edge_ai::*;
let config = TfLiteRuntimeConfig { execution_provider: TfLiteExecutionProvider::Cpu, num_threads: 8, enable_xnnpack: true, // High-performance CPU delegate enable_gpu_delegate: false, gpu_precision_loss_allowed: true, allow_dynamic_tensors: true, enable_optimization: true,};
let runtime = TfLiteRuntime::new(config);Auto-Selecting Best Provider
// Automatically select best available execution providerlet best_provider = TfLiteRuntime::get_best_provider();println!("Using provider: {:?}", best_provider);
let config = TfLiteRuntimeConfig { execution_provider: best_provider, ..Default::default()};Mobile/Edge Optimization
// Configuration for edge devices (Raspberry Pi, mobile)let edge_config = TfLiteRuntimeConfig { execution_provider: TfLiteExecutionProvider::Cpu, num_threads: 4, enable_xnnpack: true, enable_gpu_delegate: false, gpu_precision_loss_allowed: true, allow_dynamic_tensors: false, // Better performance enable_optimization: true,};
let runtime = TfLiteRuntime::new(edge_config);Model Quantization
Static Quantization (INT8)
use heliosdb_edge_ai::*;
// Create quantizerlet config = QuantizationConfig { precision: QuantizationPrecision::Int8, mode: QuantizationMode::Static, scheme: QuantizationScheme::Symmetric, calibration_samples: 100, percentile_calibration: true, percentile: 99.99, min_accuracy_retention: 0.98,};
let mut quantizer = ModelQuantizer::new(config);
// Add calibration datafor sample in calibration_dataset { quantizer.add_calibration_sample("input".to_string(), sample);}
// Calibratequantizer.calibrate()?;
// Quantize modellet (quantized_model, metrics) = quantizer.quantize(&original_model)?;
println!("Compression ratio: {:.2}x", metrics.compression_ratio);println!("Accuracy retention: {:.2}%", metrics.accuracy_retention * 100.0);println!("Size: {} MB -> {} MB", metrics.original_size_bytes / 1_000_000, metrics.quantized_size_bytes / 1_000_000);INT4 Quantization (Maximum Compression)
let config = QuantizationConfig { precision: QuantizationPrecision::Int4, // 8x compression mode: QuantizationMode::Static, scheme: QuantizationScheme::PerChannel, calibration_samples: 200, // More samples for INT4 percentile_calibration: true, percentile: 99.9, min_accuracy_retention: 0.95, // Lower target};
let quantizer = ModelQuantizer::new(config);Dynamic Quantization
// Quantize at runtime without calibrationlet config = QuantizationConfig { precision: QuantizationPrecision::Int8, mode: QuantizationMode::Dynamic, scheme: QuantizationScheme::PerTensor, ..Default::default()};
let quantizer = ModelQuantizer::new(config);
// Quantize input array dynamicallylet (quantized_array, params) = quantizer.quantize_array(&input_array)?;
// Dequantize after inferencelet dequantized = quantizer.dequantize_array(&quantized_array, ¶ms)?;SQL Interface
ML_PREDICT Function
-- Basic inferenceSELECT id, ml_predict('mobilenet_v2', image_features) AS predictionFROM imagesWHERE category = 'animals';
-- With specific versionSELECT id, ml_predict('bert_base', 'v2.1', text_embedding) AS sentimentFROM documentsWHERE published_date > '2024-01-01';
-- Batch inferenceSELECT ml_predict_batch('resnet50', array_agg(features), 32) AS predictionsFROM ( SELECT features FROM images ORDER BY created_at DESC LIMIT 1000) batch;Model Management Functions
-- List available modelsSELECT * FROM ml_models();
-- Get model detailsSELECT ml_model_info('mobilenet_v2');
-- Check model performanceSELECT model_id, avg_latency_ms, success_rate, total_requestsFROM ml_model_metrics()WHERE model_id = 'bert_base';Application Integration
use heliosdb_edge_ai::*;use std::sync::Arc;
// Create SQL inference enginelet engine = Arc::new(InferenceEngine::new(config));let registry = Arc::new(RwLock::new(ModelRegistryV2::new(cdn_config, cache_dir)));let sql_engine = SqlInferenceEngine::new(engine, registry);
// Register SQL functionslet functions = sql_engine.register_sql_functions();for func in functions { println!("Registered function: {}", func.name); println!(" Description: {}", func.description); println!(" Example: {}", func.example);}
// Execute predictionlet ml_predict = sql_engine.ml_predict();let result = ml_predict.execute( "mobilenet_v2".to_string(), inputs,).await?;
println!("Prediction: {:?}", result.predictions);println!("Latency: {}ms", result.latency_ms);Model Registry & Versioning
Registering Models
use heliosdb_edge_ai::*;use std::path::PathBuf;
let cdn_config = CdnConfig { provider: CdnProvider::Cloudflare, base_url: "https://cdn.example.com/models".to_string(), api_key: Some("api_key".to_string()), cache_ttl_secs: 3600, enable_compression: true,};
let registry = ModelRegistryV2::new( cdn_config, PathBuf::from("/var/cache/models"));
// Register new versionregistry.register_version( "mobilenet_v2".to_string(), "v1.1".to_string(), PathBuf::from("/models/mobilenet_v2_v1.1.onnx"), ModelFormat::Onnx,).await?;
// Upload to CDNlet cdn_url = registry.upload_to_cdn("mobilenet_v2", "v1.1").await?;println!("Model uploaded to: {}", cdn_url);Version Management
// Get latest versionlet latest = registry.get_latest_version("mobilenet_v2")?;println!("Latest version: {}", latest.version);
// List all versionslet versions = registry.list_versions("mobilenet_v2");for version in versions { println!("Version {}: {} MB ({})", version.version, version.size_bytes / 1_000_000, version.status );}
// Download from CDNlet local_path = registry.download_from_cdn("mobilenet_v2", "v1.1").await?;Rollback
// Rollback to previous versionregistry.rollback("mobilenet_v2", "v1.0").await?;println!("Rolled back to v1.0");A/B Testing & Canary Deployments
A/B Testing
// Create A/B test: 10% traffic to v2let test_id = registry.create_ab_test( "mobilenet_v2".to_string(), "v1.0".to_string(), // Version A (90%) "v2.0".to_string(), // Version B (10%) 10.0, // 10% to B)?;
println!("Created A/B test: {}", test_id);
// Get version for request (automatically selects based on traffic split)let version = registry.get_ab_test_version("mobilenet_v2")?;println!("Selected version: {}", version);Canary Deployment
// Create canary deployment: start at 5%, increment by 10% every 30 minlet deployment_id = registry.create_canary( "bert_base".to_string(), "v2.0".to_string(), // Stable "v2.1".to_string(), // Canary 5.0, // Initial 5%)?;
// Get version for request (automatically routes based on canary percentage)let version = registry.get_canary_version("bert_base")?;
// Monitor and increment automatically// (In production, would be automated based on error rates)Edge Deployment
Multi-Region Deployment
use heliosdb_edge_ai::*;
let deployment_manager = EdgeDeploymentManager::new( vec![ EdgeNode::new("us-east-1".to_string(), Region::UsEast1), EdgeNode::new("eu-west-1".to_string(), Region::EuWest1), EdgeNode::new("ap-south-1".to_string(), Region::ApSouth1), ]);
// Deploy model to all regionsdeployment_manager.deploy_model( "mobilenet_v2".to_string(), "v1.0".to_string(), DeploymentStrategy::MultiRegion,).await?;
// Geo-routing automatically selects closest nodelet node = deployment_manager.select_node_for_region(&Region::EuWest1)?;Performance Optimization
Caching
let config = InferenceEngineConfig { model_cache_size_mb: 500, // Cache up to 500MB of models inference_cache_size_mb: 1000, // Cache 1GB of results inference_cache_ttl_secs: 3600, // 1 hour TTL ..Default::default()};
let engine = InferenceEngine::new(config);
// First request: ~8mslet response1 = engine.infer(request).await?;assert!(!response1.from_cache);
// Subsequent identical request: <1mslet response2 = engine.infer(request).await?;assert!(response2.from_cache);Batching
let config = InferenceEngineConfig { enable_batching: true, max_batch_size: 32, batch_timeout_ms: 5, // Wait max 5ms to fill batch ..Default::default()};
let engine = InferenceEngine::new(config);
// Requests are automatically batched for better throughputConcurrency
let config = InferenceEngineConfig { max_concurrent_inferences: 20, ..Default::default()};
let engine = Arc::new(InferenceEngine::new(config));
// Handle concurrent requestslet mut handles = Vec::new();for i in 0..100 { let engine_clone = Arc::clone(&engine); handles.push(tokio::spawn(async move { let request = create_request(i); engine_clone.infer(request).await }));}
for handle in handles { let response = handle.await??; println!("Latency: {}ms", response.latency_ms);}Examples
Example 1: BERT Sentiment Analysis at Edge
use heliosdb_edge_ai::*;use std::collections::HashMap;
#[tokio::main]async fn main() -> Result<()> { // Initialize ONNX runtime for BERT let onnx_config = OnnxRuntimeConfig { execution_provider: OnnxExecutionProvider::Cpu, num_threads: 8, enable_optimization: true, optimization_level: 3, enable_mem_pattern: true, enable_cpu_mem_arena: true, };
let onnx_runtime = OnnxRuntime::new(onnx_config);
// Load quantized BERT model (INT8) let bert_session = onnx_runtime.load_model( "bert_base".to_string(), "v2.1".to_string(), PathBuf::from("/models/bert_base_int8.onnx"), )?;
// Prepare inputs for BERT // input_ids: [batch_size, sequence_length] // attention_mask: [batch_size, sequence_length] let mut inputs = HashMap::new();
let input_ids = ArrayD::from_shape_vec( vec![1, 128], tokenize_text("This product is amazing!") )?;
let attention_mask = ArrayD::from_shape_vec( vec![1, 128], vec![1; 128] // All tokens are attended to )?;
inputs.insert("input_ids".to_string(), input_ids); inputs.insert("attention_mask".to_string(), attention_mask);
// Run inference let start = std::time::Instant::now(); let outputs = bert_session.run(inputs)?; let latency = start.elapsed();
// Extract logits and compute sentiment let logits = outputs.get("logits").unwrap(); let sentiment = argmax(logits);
println!("Sentiment: {}", sentiment); println!("Latency: {:?}", latency); println!("Target achieved: {}", latency.as_millis() < 10);
Ok(())}
fn tokenize_text(text: &str) -> Vec<i64> { // Simplified tokenization vec![101; 128] // CLS + tokens + SEP}
fn argmax(tensor: &ArrayD<f32>) -> String { // Simplified argmax "positive".to_string()}Example 2: ResNet50 Image Classification
use heliosdb_edge_ai::*;use std::collections::HashMap;use image::{DynamicImage, GenericImageView};
#[tokio::main]async fn main() -> Result<()> { // Initialize TFLite runtime for ResNet let tflite_config = TfLiteRuntimeConfig { execution_provider: TfLiteExecutionProvider::Cpu, num_threads: 8, enable_xnnpack: true, enable_gpu_delegate: false, gpu_precision_loss_allowed: true, allow_dynamic_tensors: false, enable_optimization: true, };
let tflite_runtime = TfLiteRuntime::new(tflite_config);
// Load quantized ResNet50 (INT8) let resnet_session = tflite_runtime.load_model( "resnet50".to_string(), "v1.0".to_string(), PathBuf::from("/models/resnet50_int8.tflite"), )?;
// Load and preprocess image let img = image::open("/data/cat.jpg")?; let preprocessed = preprocess_image(img);
// Create input tensor let mut inputs = HashMap::new(); inputs.insert("input".to_string(), preprocessed);
// Run inference let start = std::time::Instant::now(); let outputs = resnet_session.run(inputs)?; let latency = start.elapsed();
// Get predictions let predictions = outputs.get("output").unwrap(); let top5 = get_top_k(predictions, 5);
println!("Top 5 predictions:"); for (i, (class_id, confidence)) in top5.iter().enumerate() { println!(" {}: {} ({:.2}%)", i + 1, get_class_name(*class_id), confidence * 100.0 ); }
println!("\nLatency: {:?}", latency); println!("Target achieved: {}", latency.as_millis() < 10);
Ok(())}
fn preprocess_image(img: DynamicImage) -> ArrayD<f32> { // Resize to 224x224 let resized = img.resize_exact(224, 224, image::imageops::FilterType::Lanczos3);
// Convert to RGB and normalize let mut data = Vec::with_capacity(224 * 224 * 3); for pixel in resized.to_rgb8().pixels() { data.push((pixel[0] as f32 / 255.0 - 0.485) / 0.229); data.push((pixel[1] as f32 / 255.0 - 0.456) / 0.224); data.push((pixel[2] as f32 / 255.0 - 0.406) / 0.225); }
ArrayD::from_shape_vec(vec![1, 224, 224, 3], data).unwrap()}
fn get_top_k(predictions: &ArrayD<f32>, k: usize) -> Vec<(usize, f32)> { let mut indexed: Vec<(usize, f32)> = predictions .iter() .enumerate() .map(|(i, &v)| (i, v)) .collect();
indexed.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap()); indexed.truncate(k); indexed}
fn get_class_name(class_id: usize) -> &'static str { // ImageNet class names match class_id { 281 => "tabby cat", 282 => "tiger cat", _ => "unknown" }}Example 3: SQL-Based Inference
-- Create table with image featuresCREATE TABLE product_images ( id SERIAL PRIMARY KEY, product_id INTEGER, image_url TEXT, features FLOAT[] -- Pre-extracted features);
-- Run batch inference on all productsSELECT p.product_id, p.image_url, ml_predict('mobilenet_v2', 'v1.0', p.features) AS classification, ml_predict_batch('resnet50', array_agg(p.features), 32) AS batch_resultsFROM product_images pWHERE p.created_at > NOW() - INTERVAL '1 day'GROUP BY p.product_id, p.image_url, p.features;
-- Real-time sentiment analysisSELECT review_id, review_text, ml_predict('bert_base', 'v2.1', text_embedding(review_text) ) AS sentimentFROM product_reviewsWHERE processed = falseLIMIT 1000;
-- Join predictions with business logicSELECT c.customer_id, c.email, pred.prediction->>'category' AS predicted_category, pred.prediction->>'confidence' AS confidence, CASE WHEN (pred.prediction->>'confidence')::float > 0.8 THEN 'send_recommendation' ELSE 'manual_review' END AS actionFROM customers cCROSS JOIN LATERAL ( SELECT ml_predict('recommendation_model', c.features) AS prediction) predWHERE c.active = true;Troubleshooting
Common Issues
1. Inference Timeout (>10ms)
Symptoms: Inference takes longer than 10ms
Solutions:
- Enable model quantization (INT8/INT4)
- Reduce model size
- Enable caching
- Use GPU acceleration
- Increase thread count
// Optimize for low latencylet config = InferenceEngineConfig { model_cache_size_mb: 1000, // Increase cache inference_cache_size_mb: 2000, default_timeout_ms: 10, enable_batching: false, // Disable for single requests ..Default::default()};
let onnx_config = OnnxRuntimeConfig { num_threads: num_cpus::get() * 2, // More threads enable_optimization: true, optimization_level: 3, ..Default::default()};2. Out of Memory
Symptoms: Model loading fails or crashes
Solutions:
- Reduce model cache size
- Enable model quantization
- Use streaming inference
- Distribute across edge nodes
let config = InferenceEngineConfig { model_cache_size_mb: 200, // Reduce cache max_concurrent_inferences: 5, // Limit concurrency ..Default::default()};3. Low Cache Hit Rate
Symptoms: All requests show from_cache: false
Solutions:
- Increase cache TTL
- Increase cache size
- Check cache key consistency
let config = InferenceEngineConfig { inference_cache_ttl_secs: 7200, // 2 hours inference_cache_size_mb: 5000, // 5GB ..Default::default()};4. Model Not Found
Symptoms: EdgeAiError::ModelNotFound
Solutions:
- Verify model is registered in registry
- Check model version
- Ensure CDN is accessible
// List registered modelslet versions = registry.list_versions("mobilenet_v2");for version in versions { println!("Available: {} v{}", version.model_id, version.version);}
// Download if missingregistry.download_from_cdn("mobilenet_v2", "v1.0").await?;Performance Tuning
Latency Optimization
// Profile inferencelet start = Instant::now();let response = engine.infer(request).await?;let latency = start.elapsed();
println!("Total latency: {:?}", latency);println!("From cache: {}", response.from_cache);
// Check percentileslet stats = engine.get_stats();println!("P50: {}ms", stats.p50_latency_ms);println!("P95: {}ms", stats.p95_latency_ms);println!("P99: {}ms", stats.p99_latency_ms);Throughput Optimization
// Enable batchinglet config = InferenceEngineConfig { enable_batching: true, max_batch_size: 64, batch_timeout_ms: 10, max_concurrent_inferences: 50, ..Default::default()};
// Monitor throughputlet stats = engine.get_stats();let rps = stats.total_requests as f64 / uptime_secs;println!("Requests/second: {:.2}", rps);Performance Benchmarks
Latency Targets
| Model | Size | Format | Quantization | Latency | Throughput |
|---|---|---|---|---|---|
| MobileNetV2 | 14 MB | ONNX | FP32 | 8.2ms | 122 req/s |
| MobileNetV2 | 3.5 MB | ONNX | INT8 | 4.1ms | 244 req/s |
| ResNet50 | 98 MB | TFLite | FP32 | 45ms | 22 req/s |
| ResNet50 | 25 MB | TFLite | INT8 | 12ms | 83 req/s |
| BERT-Base | 438 MB | ONNX | FP32 | 156ms | 6.4 req/s |
| BERT-Base | 110 MB | ONNX | INT8 | 38ms | 26 req/s |
Scaling Benchmarks
| Concurrent Requests | Avg Latency | P95 Latency | Throughput |
|---|---|---|---|
| 1 | 5.2ms | 6.1ms | 192 req/s |
| 10 | 6.8ms | 8.9ms | 1,470 req/s |
| 50 | 12.1ms | 18.3ms | 4,132 req/s |
| 100 | 24.3ms | 35.7ms | 4,115 req/s |
Next Steps
- Try the examples: Start with the BERT or ResNet examples
- Optimize your models: Use quantization to reduce size and latency
- Deploy to edge: Distribute models across edge nodes for better performance
- Monitor performance: Track latency and throughput metrics
- Read the API docs: Explore advanced features
For more information, see: