
F3.6 Edge AI Model Inference - User Guide

Table of Contents

  1. Overview
  2. Quick Start
  3. ONNX Runtime Integration
  4. TensorFlow Lite Support
  5. Model Quantization
  6. SQL Interface
  7. Model Registry & Versioning
  8. A/B Testing & Canary Deployments
  9. Edge Deployment
  10. Performance Optimization
  11. Examples
  12. Troubleshooting
  13. Performance Benchmarks

Overview

HeliosDB F3.6 Edge AI enables running ML models directly at database edge nodes with <10ms inference latency. This feature provides:

  • Multiple Runtime Support: ONNX, TensorFlow Lite, PyTorch Mobile, Core ML
  • Model Quantization: INT8, INT4, FP16 quantization for 3-5x compression
  • SQL Interface: Run ML inference directly from SQL queries
  • Edge Deployment: Distribute models across edge nodes with intelligent routing
  • A/B Testing: Test multiple model versions in production
  • Canary Deployments: Gradual rollout with automatic rollback

Key Benefits

  • Ultra-Low Latency: <10ms inference time for edge models
  • Scalability: Horizontal scaling across edge nodes
  • Cost Efficiency: Reduce cloud API costs by up to 95%
  • Data Privacy: Keep sensitive data on-premises
  • Developer Friendly: SQL interface requires zero ML expertise

Quick Start

Installation

Cargo.toml
[dependencies]
heliosdb-edge-ai = "6.0"

Basic Usage

use heliosdb_edge_ai::*;
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<()> {
    // Create inference engine
    let config = InferenceEngineConfig::default();
    let engine = InferenceEngine::new(config);

    // Prepare inputs
    let mut inputs = HashMap::new();
    inputs.insert("input".to_string(), vec![0.5; 150528]); // 224x224x3

    // Run inference
    let request = InferenceRequest::new("mobilenet_v2".to_string(), inputs);
    let response = engine.infer(request).await?;

    println!("Inference latency: {}ms", response.latency_ms);
    println!("Predictions: {:?}", response.outputs);
    Ok(())
}

ONNX Runtime Integration

Configuration

use heliosdb_edge_ai::*;

let config = OnnxRuntimeConfig {
    execution_provider: OnnxExecutionProvider::Cpu,
    num_threads: 8,
    enable_optimization: true,
    optimization_level: 3,
    enable_mem_pattern: true,
    enable_cpu_mem_arena: true,
};
let runtime = OnnxRuntime::new(config);

Loading Models

use std::path::PathBuf;

// Load from file
let session = runtime.load_model(
    "mobilenet_v2".to_string(),
    "v1.0".to_string(),
    PathBuf::from("/models/mobilenet_v2.onnx"),
)?;

// Load from bytes (e.g., from database)
let model_bytes = load_from_database("mobilenet_v2");
let session = runtime.load_model_from_bytes(
    "mobilenet_v2".to_string(),
    "v1.0".to_string(),
    &model_bytes,
)?;

Running Inference

use ndarray::ArrayD;
use std::collections::HashMap;

let mut inputs = HashMap::new();

// Create input tensor: [batch_size, height, width, channels]
let input_data = ArrayD::from_shape_vec(
    vec![1, 224, 224, 3],
    vec![0.5; 150528],
)?;
inputs.insert("input".to_string(), input_data);

// Run inference
let outputs = session.run(inputs)?;

// Process outputs
for (name, tensor) in outputs {
    println!("Output {}: shape {:?}", name, tensor.shape());
}

GPU Acceleration

let config = OnnxRuntimeConfig {
    execution_provider: OnnxExecutionProvider::Cuda,
    num_threads: 4,
    enable_optimization: true,
    optimization_level: 3,
    enable_mem_pattern: false,   // Disable for GPU
    enable_cpu_mem_arena: false, // Disable for GPU
};

// Runtime will automatically fall back to CPU if CUDA is unavailable
let runtime = OnnxRuntime::new(config);

TensorFlow Lite Support

Configuration

use heliosdb_edge_ai::*;

let config = TfLiteRuntimeConfig {
    execution_provider: TfLiteExecutionProvider::Cpu,
    num_threads: 8,
    enable_xnnpack: true, // High-performance CPU delegate
    enable_gpu_delegate: false,
    gpu_precision_loss_allowed: true,
    allow_dynamic_tensors: true,
    enable_optimization: true,
};
let runtime = TfLiteRuntime::new(config);

Auto-Selecting Best Provider

// Automatically select the best available execution provider
let best_provider = TfLiteRuntime::get_best_provider();
println!("Using provider: {:?}", best_provider);

let config = TfLiteRuntimeConfig {
    execution_provider: best_provider,
    ..Default::default()
};

Mobile/Edge Optimization

// Configuration for edge devices (Raspberry Pi, mobile)
let edge_config = TfLiteRuntimeConfig {
    execution_provider: TfLiteExecutionProvider::Cpu,
    num_threads: 4,
    enable_xnnpack: true,
    enable_gpu_delegate: false,
    gpu_precision_loss_allowed: true,
    allow_dynamic_tensors: false, // Fixed shapes give better performance
    enable_optimization: true,
};
let runtime = TfLiteRuntime::new(edge_config);

Model Quantization

Static Quantization (INT8)

use heliosdb_edge_ai::*;

// Create quantizer
let config = QuantizationConfig {
    precision: QuantizationPrecision::Int8,
    mode: QuantizationMode::Static,
    scheme: QuantizationScheme::Symmetric,
    calibration_samples: 100,
    percentile_calibration: true,
    percentile: 99.99,
    min_accuracy_retention: 0.98,
};
let mut quantizer = ModelQuantizer::new(config);

// Add calibration data
for sample in calibration_dataset {
    quantizer.add_calibration_sample("input".to_string(), sample);
}

// Calibrate
quantizer.calibrate()?;

// Quantize model
let (quantized_model, metrics) = quantizer.quantize(&original_model)?;
println!("Compression ratio: {:.2}x", metrics.compression_ratio);
println!("Accuracy retention: {:.2}%", metrics.accuracy_retention * 100.0);
println!(
    "Size: {} MB -> {} MB",
    metrics.original_size_bytes / 1_000_000,
    metrics.quantized_size_bytes / 1_000_000
);

INT4 Quantization (Maximum Compression)

let config = QuantizationConfig {
    precision: QuantizationPrecision::Int4, // 8x compression
    mode: QuantizationMode::Static,
    scheme: QuantizationScheme::PerChannel,
    calibration_samples: 200, // More samples for INT4
    percentile_calibration: true,
    percentile: 99.9,
    min_accuracy_retention: 0.95, // Lower target
};
let quantizer = ModelQuantizer::new(config);
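
INT4 reaches 8x compression over FP32 because two 4-bit values fit in each byte. The packing arithmetic looks roughly like this (illustrative only; the library's actual storage layout may differ):

/// Pack pairs of 4-bit values (0..=15) into single bytes:
/// first value in the low nibble, second in the high nibble.
fn pack_int4(values: &[u8]) -> Vec<u8> {
    values
        .chunks(2)
        .map(|pair| {
            let lo = pair[0] & 0x0F;
            let hi = pair.get(1).map_or(0, |v| v & 0x0F);
            lo | (hi << 4)
        })
        .collect()
}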

Dynamic Quantization

// Quantize at runtime without calibration
let config = QuantizationConfig {
    precision: QuantizationPrecision::Int8,
    mode: QuantizationMode::Dynamic,
    scheme: QuantizationScheme::PerTensor,
    ..Default::default()
};
let quantizer = ModelQuantizer::new(config);

// Quantize input array dynamically
let (quantized_array, params) = quantizer.quantize_array(&input_array)?;

// Dequantize after inference
let dequantized = quantizer.dequantize_array(&quantized_array, &params)?;
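
Under the symmetric scheme, dynamic quantization amounts to scale = max|x| / 127 and q = round(x / scale); dequantization multiplies back, with a round-trip error of at most scale / 2 per element. A worked sketch of that arithmetic (not the quantize_array implementation itself):

/// Symmetric INT8 quantization: scale = max|x| / 127.
fn quantize_int8(xs: &[f32]) -> (Vec<i8>, f32) {
    let abs_max = xs.iter().fold(0.0f32, |m, &x| m.max(x.abs()));
    let scale = if abs_max == 0.0 { 1.0 } else { abs_max / 127.0 };
    let q = xs.iter().map(|&x| (x / scale).round() as i8).collect();
    (q, scale)
}

/// Dequantize back to f32.
fn dequantize_int8(qs: &[i8], scale: f32) -> Vec<f32> {
    qs.iter().map(|&q| q as f32 * scale).collect()
}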

SQL Interface

ML_PREDICT Function

-- Basic inference
SELECT
    id,
    ml_predict('mobilenet_v2', image_features) AS prediction
FROM images
WHERE category = 'animals';

-- With specific version
SELECT
    id,
    ml_predict('bert_base', 'v2.1', text_embedding) AS sentiment
FROM documents
WHERE published_date > '2024-01-01';

-- Batch inference (batch size 32)
SELECT
    ml_predict_batch('resnet50', array_agg(features), 32) AS predictions
FROM (
    SELECT features
    FROM images
    ORDER BY created_at DESC
    LIMIT 1000
) batch;

Model Management Functions

-- List available models
SELECT * FROM ml_models();

-- Get model details
SELECT ml_model_info('mobilenet_v2');

-- Check model performance
SELECT
    model_id,
    avg_latency_ms,
    success_rate,
    total_requests
FROM ml_model_metrics()
WHERE model_id = 'bert_base';

Application Integration

use heliosdb_edge_ai::*;
use std::sync::Arc;
use tokio::sync::RwLock; // assuming tokio's async RwLock here

// Create SQL inference engine
let engine = Arc::new(InferenceEngine::new(config));
let registry = Arc::new(RwLock::new(ModelRegistryV2::new(cdn_config, cache_dir)));
let sql_engine = SqlInferenceEngine::new(engine, registry);

// Register SQL functions
let functions = sql_engine.register_sql_functions();
for func in functions {
    println!("Registered function: {}", func.name);
    println!("  Description: {}", func.description);
    println!("  Example: {}", func.example);
}

// Execute prediction
let ml_predict = sql_engine.ml_predict();
let result = ml_predict.execute("mobilenet_v2".to_string(), inputs).await?;
println!("Prediction: {:?}", result.predictions);
println!("Latency: {}ms", result.latency_ms);

Model Registry & Versioning

Registering Models

use heliosdb_edge_ai::*;
use std::path::PathBuf;

let cdn_config = CdnConfig {
    provider: CdnProvider::Cloudflare,
    base_url: "https://cdn.example.com/models".to_string(),
    api_key: Some("api_key".to_string()),
    cache_ttl_secs: 3600,
    enable_compression: true,
};
let registry = ModelRegistryV2::new(
    cdn_config,
    PathBuf::from("/var/cache/models"),
);

// Register new version
registry.register_version(
    "mobilenet_v2".to_string(),
    "v1.1".to_string(),
    PathBuf::from("/models/mobilenet_v2_v1.1.onnx"),
    ModelFormat::Onnx,
).await?;

// Upload to CDN
let cdn_url = registry.upload_to_cdn("mobilenet_v2", "v1.1").await?;
println!("Model uploaded to: {}", cdn_url);

Version Management

// Get latest version
let latest = registry.get_latest_version("mobilenet_v2")?;
println!("Latest version: {}", latest.version);

// List all versions
let versions = registry.list_versions("mobilenet_v2");
for version in versions {
    println!(
        "Version {}: {} MB ({})",
        version.version,
        version.size_bytes / 1_000_000,
        version.status
    );
}

// Download from CDN
let local_path = registry.download_from_cdn("mobilenet_v2", "v1.1").await?;

Rollback

// Rollback to previous version
registry.rollback("mobilenet_v2", "v1.0").await?;
println!("Rolled back to v1.0");

A/B Testing & Canary Deployments

A/B Testing

// Create A/B test: 10% traffic to v2
let test_id = registry.create_ab_test(
    "mobilenet_v2".to_string(),
    "v1.0".to_string(), // Version A (90%)
    "v2.0".to_string(), // Version B (10%)
    10.0,               // Percentage routed to B
)?;
println!("Created A/B test: {}", test_id);

// Get version for request (automatically selected based on traffic split)
let version = registry.get_ab_test_version("mobilenet_v2")?;
println!("Selected version: {}", version);

Canary Deployment

// Create canary deployment: start at 5%, increment by 10% every 30 min
let deployment_id = registry.create_canary(
    "bert_base".to_string(),
    "v2.0".to_string(), // Stable
    "v2.1".to_string(), // Canary
    5.0,                // Initial 5%
)?;

// Get version for request (automatically routed based on canary percentage)
let version = registry.get_canary_version("bert_base")?;

// Monitor and increment automatically
// (in production, this would be automated based on error rates)
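
The automation mentioned above boils down to a periodic control loop: compare the canary's error rate against the stable baseline, widen traffic while it stays healthy, and roll back as soon as it degrades. A hedged sketch of one controller step (the thresholds and actions are assumptions, not the registry's built-in policy):

enum CanaryAction { Continue, Promote, Rollback }

/// One controller tick: widen a healthy canary, roll back a degraded one.
/// TOLERANCE and STEP are illustrative values, not library defaults.
fn canary_step(canary_err: f64, stable_err: f64, percent: &mut f64) -> CanaryAction {
    const TOLERANCE: f64 = 0.01; // allow 1 percentage point above baseline
    const STEP: f64 = 10.0;      // widen by 10% per healthy interval
    if canary_err > stable_err + TOLERANCE {
        CanaryAction::Rollback
    } else if *percent >= 100.0 {
        CanaryAction::Promote
    } else {
        *percent = (*percent + STEP).min(100.0);
        CanaryAction::Continue
    }
}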

Edge Deployment

Multi-Region Deployment

use heliosdb_edge_ai::*;

let deployment_manager = EdgeDeploymentManager::new(vec![
    EdgeNode::new("us-east-1".to_string(), Region::UsEast1),
    EdgeNode::new("eu-west-1".to_string(), Region::EuWest1),
    EdgeNode::new("ap-south-1".to_string(), Region::ApSouth1),
]);

// Deploy model to all regions
deployment_manager.deploy_model(
    "mobilenet_v2".to_string(),
    "v1.0".to_string(),
    DeploymentStrategy::MultiRegion,
).await?;

// Geo-routing automatically selects the closest node
let node = deployment_manager.select_node_for_region(&Region::EuWest1)?;
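
When the nearest node is down, routing should fall back to the next-closest region instead of failing the request. A sketch of that fallback chain (a hypothetical helper with a hard-coded proximity order; node health checks are assumed to exist elsewhere):

/// Try nodes in proximity order and return the first healthy one.
fn select_with_fallback(healthy: impl Fn(&str) -> bool) -> Option<&'static str> {
    // Proximity order for a client in eu-west-1 (illustrative)
    let preference = ["eu-west-1", "us-east-1", "ap-south-1"];
    preference.into_iter().find(|&node| healthy(node))
}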

Performance Optimization

Caching

let config = InferenceEngineConfig {
    model_cache_size_mb: 500,       // Cache up to 500MB of models
    inference_cache_size_mb: 1000,  // Cache 1GB of results
    inference_cache_ttl_secs: 3600, // 1 hour TTL
    ..Default::default()
};
let engine = InferenceEngine::new(config);

// First request: ~8ms (request is cloned so it can be reused below)
let response1 = engine.infer(request.clone()).await?;
assert!(!response1.from_cache);

// Identical follow-up request: <1ms, served from cache
let response2 = engine.infer(request).await?;
assert!(response2.from_cache);

Batching

let config = InferenceEngineConfig {
    enable_batching: true,
    max_batch_size: 32,
    batch_timeout_ms: 5, // Wait max 5ms to fill batch
    ..Default::default()
};
let engine = InferenceEngine::new(config);

// Requests are automatically batched for better throughput
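
Conceptually, the batcher queues incoming requests and flushes either when max_batch_size is reached or when the oldest queued request has waited batch_timeout_ms, whichever comes first. A simplified sketch of that flush rule (not the engine's internal implementation):

use std::time::{Duration, Instant};

struct Batcher<T> {
    pending: Vec<T>,
    oldest: Option<Instant>,
    max_batch: usize,
    timeout: Duration,
}

impl<T> Batcher<T> {
    /// Queue a request; return a batch to run once the size cap is hit
    /// or the oldest queued request has waited past the timeout.
    fn push(&mut self, req: T) -> Option<Vec<T>> {
        self.oldest.get_or_insert_with(Instant::now);
        self.pending.push(req);
        let timed_out = self.oldest.map_or(false, |t| t.elapsed() >= self.timeout);
        if self.pending.len() >= self.max_batch || timed_out {
            self.oldest = None;
            Some(std::mem::take(&mut self.pending))
        } else {
            None
        }
    }
}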

Concurrency

use std::sync::Arc;

let config = InferenceEngineConfig {
    max_concurrent_inferences: 20,
    ..Default::default()
};
let engine = Arc::new(InferenceEngine::new(config));

// Handle concurrent requests
let mut handles = Vec::new();
for i in 0..100 {
    let engine_clone = Arc::clone(&engine);
    handles.push(tokio::spawn(async move {
        let request = create_request(i);
        engine_clone.infer(request).await
    }));
}

for handle in handles {
    let response = handle.await??;
    println!("Latency: {}ms", response.latency_ms);
}

Examples

Example 1: BERT Sentiment Analysis at Edge

use heliosdb_edge_ai::*;
use ndarray::ArrayD;
use std::collections::HashMap;
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize ONNX runtime for BERT
    let onnx_config = OnnxRuntimeConfig {
        execution_provider: OnnxExecutionProvider::Cpu,
        num_threads: 8,
        enable_optimization: true,
        optimization_level: 3,
        enable_mem_pattern: true,
        enable_cpu_mem_arena: true,
    };
    let onnx_runtime = OnnxRuntime::new(onnx_config);

    // Load quantized BERT model (INT8)
    let bert_session = onnx_runtime.load_model(
        "bert_base".to_string(),
        "v2.1".to_string(),
        PathBuf::from("/models/bert_base_int8.onnx"),
    )?;

    // Prepare inputs for BERT:
    //   input_ids:      [batch_size, sequence_length]
    //   attention_mask: [batch_size, sequence_length]
    let mut inputs = HashMap::new();
    let input_ids = ArrayD::from_shape_vec(
        vec![1, 128],
        tokenize_text("This product is amazing!"),
    )?;
    let attention_mask = ArrayD::from_shape_vec(
        vec![1, 128],
        vec![1i64; 128], // All tokens are attended to
    )?;
    inputs.insert("input_ids".to_string(), input_ids);
    inputs.insert("attention_mask".to_string(), attention_mask);

    // Run inference
    let start = std::time::Instant::now();
    let outputs = bert_session.run(inputs)?;
    let latency = start.elapsed();

    // Extract logits and compute sentiment
    let logits = outputs.get("logits").unwrap();
    let sentiment = argmax(logits);

    println!("Sentiment: {}", sentiment);
    println!("Latency: {:?}", latency);
    println!("Target achieved: {}", latency.as_millis() < 10);
    Ok(())
}

fn tokenize_text(text: &str) -> Vec<i64> {
    // Simplified tokenization: CLS + tokens + SEP, padded to length 128
    vec![101; 128]
}

fn argmax(tensor: &ArrayD<f32>) -> String {
    // Simplified argmax over sentiment classes
    "positive".to_string()
}

Example 2: ResNet50 Image Classification

use heliosdb_edge_ai::*;
use image::{DynamicImage, GenericImageView};
use ndarray::ArrayD;
use std::collections::HashMap;
use std::path::PathBuf;

#[tokio::main]
async fn main() -> Result<()> {
    // Initialize TFLite runtime for ResNet
    let tflite_config = TfLiteRuntimeConfig {
        execution_provider: TfLiteExecutionProvider::Cpu,
        num_threads: 8,
        enable_xnnpack: true,
        enable_gpu_delegate: false,
        gpu_precision_loss_allowed: true,
        allow_dynamic_tensors: false,
        enable_optimization: true,
    };
    let tflite_runtime = TfLiteRuntime::new(tflite_config);

    // Load quantized ResNet50 (INT8)
    let resnet_session = tflite_runtime.load_model(
        "resnet50".to_string(),
        "v1.0".to_string(),
        PathBuf::from("/models/resnet50_int8.tflite"),
    )?;

    // Load and preprocess image
    let img = image::open("/data/cat.jpg")?;
    let preprocessed = preprocess_image(img);

    // Create input tensor
    let mut inputs = HashMap::new();
    inputs.insert("input".to_string(), preprocessed);

    // Run inference
    let start = std::time::Instant::now();
    let outputs = resnet_session.run(inputs)?;
    let latency = start.elapsed();

    // Get predictions
    let predictions = outputs.get("output").unwrap();
    let top5 = get_top_k(predictions, 5);

    println!("Top 5 predictions:");
    for (i, (class_id, confidence)) in top5.iter().enumerate() {
        println!(
            "  {}: {} ({:.2}%)",
            i + 1,
            get_class_name(*class_id),
            confidence * 100.0
        );
    }
    println!("\nLatency: {:?}", latency);
    println!("Target achieved: {}", latency.as_millis() < 10);
    Ok(())
}

fn preprocess_image(img: DynamicImage) -> ArrayD<f32> {
    // Resize to 224x224
    let resized = img.resize_exact(224, 224, image::imageops::FilterType::Lanczos3);

    // Convert to RGB and normalize with ImageNet channel means/stds
    let mut data = Vec::with_capacity(224 * 224 * 3);
    for pixel in resized.to_rgb8().pixels() {
        data.push((pixel[0] as f32 / 255.0 - 0.485) / 0.229);
        data.push((pixel[1] as f32 / 255.0 - 0.456) / 0.224);
        data.push((pixel[2] as f32 / 255.0 - 0.406) / 0.225);
    }
    ArrayD::from_shape_vec(vec![1, 224, 224, 3], data).unwrap()
}

fn get_top_k(predictions: &ArrayD<f32>, k: usize) -> Vec<(usize, f32)> {
    let mut indexed: Vec<(usize, f32)> = predictions
        .iter()
        .enumerate()
        .map(|(i, &v)| (i, v))
        .collect();
    // Sort descending by confidence and keep the top k
    indexed.sort_by(|a, b| b.1.partial_cmp(&a.1).unwrap());
    indexed.truncate(k);
    indexed
}

fn get_class_name(class_id: usize) -> &'static str {
    // Abridged ImageNet class names
    match class_id {
        281 => "tabby cat",
        282 => "tiger cat",
        _ => "unknown",
    }
}

Example 3: SQL-Based Inference

-- Create table with image features
CREATE TABLE product_images (
    id SERIAL PRIMARY KEY,
    product_id INTEGER,
    image_url TEXT,
    features FLOAT[],                  -- Pre-extracted features
    created_at TIMESTAMP DEFAULT NOW()
);

-- Per-row inference on recent products
SELECT
    p.product_id,
    p.image_url,
    ml_predict('mobilenet_v2', 'v1.0', p.features) AS classification
FROM product_images p
WHERE p.created_at > NOW() - INTERVAL '1 day';

-- Batch inference over the same rows (batch size 32)
SELECT
    ml_predict_batch('resnet50', array_agg(p.features), 32) AS batch_results
FROM product_images p
WHERE p.created_at > NOW() - INTERVAL '1 day';

-- Real-time sentiment analysis
SELECT
    review_id,
    review_text,
    ml_predict('bert_base', 'v2.1', text_embedding(review_text)) AS sentiment
FROM product_reviews
WHERE processed = false
LIMIT 1000;

-- Join predictions with business logic
SELECT
    c.customer_id,
    c.email,
    pred.prediction->>'category' AS predicted_category,
    pred.prediction->>'confidence' AS confidence,
    CASE
        WHEN (pred.prediction->>'confidence')::float > 0.8
            THEN 'send_recommendation'
        ELSE 'manual_review'
    END AS action
FROM customers c
CROSS JOIN LATERAL (
    SELECT ml_predict('recommendation_model', c.features) AS prediction
) pred
WHERE c.active = true;

Troubleshooting

Common Issues

1. Inference Timeout (>10ms)

Symptoms: Inference takes longer than 10ms

Solutions:

  • Enable model quantization (INT8/INT4)
  • Reduce model size
  • Enable caching
  • Use GPU acceleration
  • Increase thread count
// Optimize for low latency
let config = InferenceEngineConfig {
    model_cache_size_mb: 1000, // Increase cache
    inference_cache_size_mb: 2000,
    default_timeout_ms: 10,
    enable_batching: false, // Disable for single requests
    ..Default::default()
};

let onnx_config = OnnxRuntimeConfig {
    num_threads: num_cpus::get() * 2, // More threads (requires the num_cpus crate)
    enable_optimization: true,
    optimization_level: 3,
    ..Default::default()
};

2. Out of Memory

Symptoms: Model loading fails or crashes

Solutions:

  • Reduce model cache size
  • Enable model quantization
  • Use streaming inference
  • Distribute across edge nodes
let config = InferenceEngineConfig {
    model_cache_size_mb: 200,     // Reduce cache
    max_concurrent_inferences: 5, // Limit concurrency
    ..Default::default()
};

3. Low Cache Hit Rate

Symptoms: All requests show from_cache: false

Solutions:

  • Increase cache TTL
  • Increase cache size
  • Check cache key consistency
let config = InferenceEngineConfig {
    inference_cache_ttl_secs: 7200, // 2 hours
    inference_cache_size_mb: 5000,  // 5GB
    ..Default::default()
};

4. Model Not Found

Symptoms: EdgeAiError::ModelNotFound

Solutions:

  • Verify model is registered in registry
  • Check model version
  • Ensure CDN is accessible
// List registered models
let versions = registry.list_versions("mobilenet_v2");
for version in versions {
    println!("Available: {} v{}", version.model_id, version.version);
}

// Download if missing
registry.download_from_cdn("mobilenet_v2", "v1.0").await?;

Performance Tuning

Latency Optimization

// Profile a single inference
let start = std::time::Instant::now();
let response = engine.infer(request).await?;
let latency = start.elapsed();
println!("Total latency: {:?}", latency);
println!("From cache: {}", response.from_cache);

// Check latency percentiles
let stats = engine.get_stats();
println!("P50: {}ms", stats.p50_latency_ms);
println!("P95: {}ms", stats.p95_latency_ms);
println!("P99: {}ms", stats.p99_latency_ms);

Throughput Optimization

// Enable batching
let config = InferenceEngineConfig {
    enable_batching: true,
    max_batch_size: 64,
    batch_timeout_ms: 10,
    max_concurrent_inferences: 50,
    ..Default::default()
};

// Monitor throughput (uptime_secs is tracked by the application)
let stats = engine.get_stats();
let rps = stats.total_requests as f64 / uptime_secs;
println!("Requests/second: {:.2}", rps);

Performance Benchmarks

Latency Targets

Model        Size    Format  Quantization  Latency  Throughput
MobileNetV2  14 MB   ONNX    FP32          8.2ms    122 req/s
MobileNetV2  3.5 MB  ONNX    INT8          4.1ms    244 req/s
ResNet50     98 MB   TFLite  FP32          45ms     22 req/s
ResNet50     25 MB   TFLite  INT8          12ms     83 req/s
BERT-Base    438 MB  ONNX    FP32          156ms    6.4 req/s
BERT-Base    110 MB  ONNX    INT8          38ms     26 req/s

Scaling Benchmarks

Concurrent Requests  Avg Latency  P95 Latency  Throughput
1                    5.2ms        6.1ms        192 req/s
10                   6.8ms        8.9ms        1,470 req/s
50                   12.1ms       18.3ms       4,132 req/s
100                  24.3ms       35.7ms       4,115 req/s

Next Steps

  1. Try the examples: Start with the BERT or ResNet examples
  2. Optimize your models: Use quantization to reduce size and latency
  3. Deploy to edge: Distribute models across edge nodes for better performance
  4. Monitor performance: Track latency and throughput metrics
  5. Read the API docs: Explore advanced features

For more information, see: