Time-Series Examples
Time-Series Examples
Version: 6.0 | Last Updated: January 4, 2026
This document provides practical code examples for common time-series use cases in HeliosDB.
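Table of Contents
- IoT Sensor Data Ingestion
- Financial Tick Data
- Metrics and Monitoring
- Log Analytics
- SQL Examples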
IoT Sensor Data Ingestion
Scenario
Industrial IoT deployment with thousands of sensors reporting temperature, humidity, pressure, and vibration readings every second.
Rust Implementation
use heliosdb_storage::timeseries::{ TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy, IngestionPipeline, IngestionConfig, RetentionPolicy, DownsamplingConfig, DownsamplingTier, AggregationFunction,};use std::collections::HashMap;use std::sync::Arc;use std::time::{Duration, SystemTime, UNIX_EPOCH};use tokio::sync::mpsc;
/// IoT sensor reading: one sample of all four measurements from a single sensor.
#[derive(Clone, Debug)]
struct SensorReading {
    /// Identifier of the reporting sensor; written as the `sensor_id` tag.
    sensor_id: String,
    /// Facility the sensor is installed in; written as the `facility` tag.
    facility: String,
    temperature: f64,
    humidity: f64,
    pressure: f64,
    vibration: f64,
    /// Sample time. NOTE(review): presumably milliseconds since the Unix epoch,
    /// matching the millisecond ranges used by the query helpers — confirm.
    timestamp: u64,
}
/// IoT ingestion pipelinestruct IoTIngestionService { engine: Arc<TimeSeriesEngine>, pipeline: IngestionPipeline,}
impl IoTIngestionService { async fn new(data_path: &str) -> Result<Self, Box<dyn std::error::Error>> { // Create engine with hourly partitioning for high-volume IoT data let engine = Arc::new( TimeSeriesEngine::new(data_path, PartitionStrategy::Hourly).await? );
// Configure ingestion for high throughput let config = IngestionConfig { batch_size: 10000, batch_timeout: Duration::from_millis(100), write_workers: 8, buffer_capacity: 100000, handle_out_of_order: true, max_time_skew: 30000, // 30 second tolerance for network delays backfill_mode: false, };
let pipeline = IngestionPipeline::new( config, engine.storage().clone(), engine.compressor().clone(), );
Ok(Self { engine, pipeline }) }
async fn configure_policies(&mut self) -> Result<(), Box<dyn std::error::Error>> { // Raw data: 7 days retention self.engine.set_retention_policy( RetentionPolicy::new(Duration::from_secs(7 * 24 * 3600)) ).await;
// Configure multi-tier downsampling // Raw -> 1 minute -> 1 hour -> 1 day let downsampling = DownsamplingConfig::new(Duration::from_secs(60)) .with_aggregation(AggregationFunction::Average) .add_tier( DownsamplingTier::new(Duration::from_secs(3600), AggregationFunction::Average) .with_age_threshold(Duration::from_secs(3600)) .with_retention(Duration::from_secs(30 * 24 * 3600)) ) .add_tier( DownsamplingTier::new(Duration::from_secs(86400), AggregationFunction::Average) .with_age_threshold(Duration::from_secs(7 * 24 * 3600)) );
self.engine.configure_downsampling("sensors.*", downsampling).await?; Ok(()) }
async fn ingest_reading(&self, reading: SensorReading) -> Result<(), Box<dyn std::error::Error>> { let mut tags = HashMap::new(); tags.insert("sensor_id".to_string(), reading.sensor_id.clone()); tags.insert("facility".to_string(), reading.facility.clone());
let points = vec![ TimeSeriesPoint::with_tags( "sensors.temperature", reading.timestamp, reading.temperature, tags.clone(), ), TimeSeriesPoint::with_tags( "sensors.humidity", reading.timestamp, reading.humidity, tags.clone(), ), TimeSeriesPoint::with_tags( "sensors.pressure", reading.timestamp, reading.pressure, tags.clone(), ), TimeSeriesPoint::with_tags( "sensors.vibration", reading.timestamp, reading.vibration, tags, ), ];
self.pipeline.ingest_batch(&points).await?; Ok(()) }
async fn ingest_batch(&self, readings: Vec<SensorReading>) -> Result<(), Box<dyn std::error::Error>> { let mut points = Vec::with_capacity(readings.len() * 4);
for reading in readings { let mut tags = HashMap::new(); tags.insert("sensor_id".to_string(), reading.sensor_id.clone()); tags.insert("facility".to_string(), reading.facility.clone());
points.push(TimeSeriesPoint::with_tags( "sensors.temperature", reading.timestamp, reading.temperature, tags.clone(), )); points.push(TimeSeriesPoint::with_tags( "sensors.humidity", reading.timestamp, reading.humidity, tags.clone(), )); points.push(TimeSeriesPoint::with_tags( "sensors.pressure", reading.timestamp, reading.pressure, tags.clone(), )); points.push(TimeSeriesPoint::with_tags( "sensors.vibration", reading.timestamp, reading.vibration, tags, )); }
self.pipeline.ingest_batch(&points).await?; Ok(()) }}
/// Query IoT data with aggregationsasync fn query_sensor_data( engine: &TimeSeriesEngine, sensor_id: &str, hours: u64,) -> Result<(), Box<dyn std::error::Error>> { let now = SystemTime::now() .duration_since(UNIX_EPOCH)? .as_millis() as u64; let start = now - (hours * 3600 * 1000);
// Query raw temperature data let points = engine.query_range("sensors.temperature", start, now).await?;
println!("Retrieved {} temperature readings for sensor {}", points.len(), sensor_id);
// Calculate statistics if !points.is_empty() { let values: Vec<f64> = points.iter().map(|p| p.value).collect(); let avg = values.iter().sum::<f64>() / values.len() as f64; let min = values.iter().cloned().fold(f64::INFINITY, f64::min); let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
println!("Temperature: avg={:.2}C, min={:.2}C, max={:.2}C", avg, min, max); }
Ok(())}
/// Detect temperature anomaliesasync fn detect_anomalies( engine: &TimeSeriesEngine, threshold_std_dev: f64,) -> Result<Vec<(u64, f64)>, Box<dyn std::error::Error>> { use heliosdb_compute::time_series::{TimeSeries, TimeSeriesOps, DataPoint, TSValue};
let now = SystemTime::now() .duration_since(UNIX_EPOCH)? .as_millis() as u64; let start = now - 3600000; // Last hour
let points = engine.query_range("sensors.temperature", start, now).await?;
// Convert to TimeSeries format let mut ts = TimeSeries::new(); for p in &points { ts.add_point(DataPoint::new( chrono::Utc.timestamp_millis_opt(p.timestamp as i64).single().unwrap(), TSValue::Float(p.value), )); }
let anomaly_indices = TimeSeriesOps::detect_anomalies(&ts, threshold_std_dev)?;
let anomalies: Vec<(u64, f64)> = anomaly_indices .iter() .map(|&i| (points[i].timestamp, points[i].value)) .collect();
Ok(anomalies)}Financial Tick Data
Scenario
High-frequency trading system processing millions of market data ticks per second with microsecond precision.
Rust Implementation
use heliosdb_storage::timeseries::{ TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy, IngestionPipeline, IngestionConfig, DownsamplingConfig, AggregationFunction,};use std::collections::HashMap;use std::sync::Arc;use std::time::Duration;
/// Market data tick: one quote/trade update for a symbol on an exchange.
#[derive(Clone, Debug)]
struct MarketTick {
    /// Instrument symbol; written as the `symbol` tag.
    symbol: String,
    /// Originating exchange; written as the `exchange` tag.
    exchange: String,
    price: f64,
    volume: f64,
    bid: f64,
    ask: f64,
    /// Microseconds since epoch
    timestamp: u64,
}
/// Financial data ingestion servicestruct MarketDataService { engine: Arc<TimeSeriesEngine>, pipeline: IngestionPipeline,}
impl MarketDataService { async fn new(data_path: &str) -> Result<Self, Box<dyn std::error::Error>> { // Use hourly partitions for high-frequency data let engine = Arc::new( TimeSeriesEngine::new(data_path, PartitionStrategy::Hourly).await? );
// Configure for maximum throughput let config = IngestionConfig { batch_size: 50000, batch_timeout: Duration::from_millis(50), write_workers: 16, buffer_capacity: 500000, handle_out_of_order: true, max_time_skew: 5000, // 5 second tolerance backfill_mode: false, };
let pipeline = IngestionPipeline::new( config, engine.storage().clone(), engine.compressor().clone(), );
Ok(Self { engine, pipeline }) }
async fn ingest_tick(&self, tick: MarketTick) -> Result<(), Box<dyn std::error::Error>> { let mut tags = HashMap::new(); tags.insert("symbol".to_string(), tick.symbol.clone()); tags.insert("exchange".to_string(), tick.exchange.clone());
// Store multiple metrics per tick let points = vec![ TimeSeriesPoint::with_tags( "market.price", tick.timestamp, tick.price, tags.clone(), ), TimeSeriesPoint::with_tags( "market.volume", tick.timestamp, tick.volume, tags.clone(), ), TimeSeriesPoint::with_tags( "market.bid", tick.timestamp, tick.bid, tags.clone(), ), TimeSeriesPoint::with_tags( "market.ask", tick.timestamp, tick.ask, tags.clone(), ), TimeSeriesPoint::with_tags( "market.spread", tick.timestamp, tick.ask - tick.bid, tags, ), ];
self.pipeline.ingest_batch(&points).await?; Ok(()) }
/// Calculate VWAP (Volume Weighted Average Price) async fn calculate_vwap( &self, symbol: &str, start_time: u64, end_time: u64, ) -> Result<f64, Box<dyn std::error::Error>> { let prices = self.engine.query_range("market.price", start_time, end_time).await?; let volumes = self.engine.query_range("market.volume", start_time, end_time).await?;
let mut total_value = 0.0; let mut total_volume = 0.0;
for (price, volume) in prices.iter().zip(volumes.iter()) { total_value += price.value * volume.value; total_volume += volume.value; }
let vwap = if total_volume > 0.0 { total_value / total_volume } else { 0.0 };
Ok(vwap) }
/// Get OHLCV (Open, High, Low, Close, Volume) bars async fn get_ohlcv( &self, symbol: &str, bar_duration: Duration, start_time: u64, end_time: u64, ) -> Result<Vec<OHLCVBar>, Box<dyn std::error::Error>> { let prices = self.engine.query_range("market.price", start_time, end_time).await?; let volumes = self.engine.query_range("market.volume", start_time, end_time).await?;
let bar_ms = bar_duration.as_millis() as u64; let mut bars: Vec<OHLCVBar> = Vec::new();
let mut current_bar_start = (start_time / bar_ms) * bar_ms; let mut bar_prices: Vec<f64> = Vec::new(); let mut bar_volume = 0.0;
for (price, volume) in prices.iter().zip(volumes.iter()) { let bar_boundary = (price.timestamp / bar_ms) * bar_ms;
if bar_boundary != current_bar_start && !bar_prices.is_empty() { bars.push(OHLCVBar { timestamp: current_bar_start, open: bar_prices[0], high: bar_prices.iter().cloned().fold(f64::NEG_INFINITY, f64::max), low: bar_prices.iter().cloned().fold(f64::INFINITY, f64::min), close: *bar_prices.last().unwrap(), volume: bar_volume, });
bar_prices.clear(); bar_volume = 0.0; current_bar_start = bar_boundary; }
bar_prices.push(price.value); bar_volume += volume.value; }
// Close final bar if !bar_prices.is_empty() { bars.push(OHLCVBar { timestamp: current_bar_start, open: bar_prices[0], high: bar_prices.iter().cloned().fold(f64::NEG_INFINITY, f64::max), low: bar_prices.iter().cloned().fold(f64::INFINITY, f64::min), close: *bar_prices.last().unwrap(), volume: bar_volume, }); }
Ok(bars) }}
/// One aggregated price bar (open/high/low/close plus traded volume).
#[derive(Debug, Clone)]
struct OHLCVBar {
    /// Start of the bar's time bucket.
    timestamp: u64,
    /// First price seen in the bucket.
    open: f64,
    /// Highest price seen in the bucket.
    high: f64,
    /// Lowest price seen in the bucket.
    low: f64,
    /// Last price seen in the bucket.
    close: f64,
    /// Sum of the volumes in the bucket.
    volume: f64,
}
/// Calculate Bollinger Bands.
///
/// Slides a window of `period` prices over `prices` and returns, for every
/// complete window, `(middle, upper, lower)`: the window mean (SMA) and the mean
/// plus/minus `num_std` population standard deviations. Entry `k` of the result
/// describes `prices[k..k + period]`, so the last entry includes the most recent
/// price.
///
/// Returns an empty vector when `period` is 0 or when there are fewer than
/// `period` prices.
fn bollinger_bands(prices: &[f64], period: usize, num_std: f64) -> Vec<(f64, f64, f64)> {
    // A zero-length window has no mean, and `windows(0)` would panic.
    if period == 0 {
        return Vec::new();
    }

    let n = period as f64;
    prices
        .windows(period)
        .map(|window| {
            let mean = window.iter().sum::<f64>() / n;
            let variance = window.iter().map(|p| (p - mean).powi(2)).sum::<f64>() / n;
            let std_dev = variance.sqrt();
            (
                mean,                     // Middle band (SMA)
                mean + num_std * std_dev, // Upper band
                mean - num_std * std_dev, // Lower band
            )
        })
        .collect()
}
// Metrics and Monitoring
Scenario
Application performance monitoring with system metrics, custom business metrics, and distributed tracing.
Rust Implementation
use heliosdb_storage::timeseries::{ TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy, RetentionPolicy, DownsamplingConfig, DownsamplingTier, AggregationFunction,};use std::collections::HashMap;use std::sync::Arc;use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Metrics collectorstruct MetricsCollector { engine: Arc<TimeSeriesEngine>, hostname: String, service: String,}
impl MetricsCollector { async fn new( data_path: &str, hostname: String, service: String, ) -> Result<Self, Box<dyn std::error::Error>> { let engine = Arc::new( TimeSeriesEngine::new(data_path, PartitionStrategy::Daily).await? );
// Configure retention: raw 7 days, hourly 30 days, daily 365 days engine.set_retention_policy( RetentionPolicy::new(Duration::from_secs(7 * 24 * 3600)) ).await;
// Configure downsampling engine.configure_downsampling( "system.*", DownsamplingConfig::new(Duration::from_secs(60)) .with_aggregation(AggregationFunction::Average) .add_tier( DownsamplingTier::new(Duration::from_secs(3600), AggregationFunction::Average) .with_age_threshold(Duration::from_secs(7 * 24 * 3600)) ) .add_tier( DownsamplingTier::new(Duration::from_secs(86400), AggregationFunction::Average) .with_age_threshold(Duration::from_secs(30 * 24 * 3600)) ), ).await?;
Ok(Self { engine, hostname, service }) }
fn now() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_millis() as u64 }
/// Record gauge metric async fn gauge(&self, name: &str, value: f64, extra_tags: HashMap<String, String>) -> Result<(), Box<dyn std::error::Error>> { let mut tags = self.base_tags(); tags.extend(extra_tags);
let point = TimeSeriesPoint::with_tags( format!("system.{}", name), Self::now(), value, tags, );
self.engine.write_points(&[point]).await?; Ok(()) }
/// Record counter (increment) async fn counter(&self, name: &str, increment: f64, extra_tags: HashMap<String, String>) -> Result<(), Box<dyn std::error::Error>> { let mut tags = self.base_tags(); tags.extend(extra_tags);
let point = TimeSeriesPoint::with_tags( format!("system.{}.count", name), Self::now(), increment, tags, );
self.engine.write_points(&[point]).await?; Ok(()) }
/// Record histogram observation async fn histogram(&self, name: &str, value: f64, extra_tags: HashMap<String, String>) -> Result<(), Box<dyn std::error::Error>> { let mut tags = self.base_tags(); tags.extend(extra_tags);
// Store raw observation let point = TimeSeriesPoint::with_tags( format!("system.{}", name), Self::now(), value, tags, );
self.engine.write_points(&[point]).await?; Ok(()) }
fn base_tags(&self) -> HashMap<String, String> { let mut tags = HashMap::new(); tags.insert("host".to_string(), self.hostname.clone()); tags.insert("service".to_string(), self.service.clone()); tags }}
/// Collect system metricsasync fn collect_system_metrics(collector: &MetricsCollector) -> Result<(), Box<dyn std::error::Error>> { // Simulated system metrics (in production, use actual system calls) let cpu_usage = 45.5; let memory_used = 62.3; let disk_usage = 78.1; let network_rx = 1024.0 * 1024.0 * 150.0; // 150 MB/s let network_tx = 1024.0 * 1024.0 * 75.0; // 75 MB/s
collector.gauge("cpu_percent", cpu_usage, HashMap::new()).await?; collector.gauge("memory_percent", memory_used, HashMap::new()).await?; collector.gauge("disk_percent", disk_usage, HashMap::new()).await?; collector.gauge("network_rx_bytes", network_rx, HashMap::new()).await?; collector.gauge("network_tx_bytes", network_tx, HashMap::new()).await?;
Ok(())}
/// Record HTTP request metricsasync fn record_http_request( collector: &MetricsCollector, method: &str, path: &str, status_code: u16, duration_ms: f64,) -> Result<(), Box<dyn std::error::Error>> { let mut tags = HashMap::new(); tags.insert("method".to_string(), method.to_string()); tags.insert("path".to_string(), path.to_string()); tags.insert("status".to_string(), status_code.to_string());
// Record request count collector.counter("http_requests", 1.0, tags.clone()).await?;
// Record latency collector.histogram("http_request_duration_ms", duration_ms, tags).await?;
Ok(())}
/// Query percentiles for latency analysisasync fn get_latency_percentiles( engine: &TimeSeriesEngine, metric: &str, hours: u64,) -> Result<LatencyStats, Box<dyn std::error::Error>> { let now = SystemTime::now() .duration_since(UNIX_EPOCH)? .as_millis() as u64; let start = now - (hours * 3600 * 1000);
let points = engine.query_range(metric, start, now).await?; let mut values: Vec<f64> = points.iter().map(|p| p.value).collect(); values.sort_by(|a, b| a.partial_cmp(b).unwrap());
let percentile = |p: f64| -> f64 { if values.is_empty() { return 0.0; } let index = ((p / 100.0) * (values.len() - 1) as f64) as usize; values[index] };
Ok(LatencyStats { count: values.len(), avg: values.iter().sum::<f64>() / values.len().max(1) as f64, p50: percentile(50.0), p90: percentile(90.0), p95: percentile(95.0), p99: percentile(99.0), max: values.last().copied().unwrap_or(0.0), })}
/// Summary statistics over a set of latency samples.
#[derive(Debug)]
struct LatencyStats {
    /// Number of samples summarised.
    count: usize,
    /// Arithmetic mean of the samples.
    avg: f64,
    p50: f64,
    p90: f64,
    p95: f64,
    p99: f64,
    /// Largest sample.
    max: f64,
}
// Log Analytics
Scenario
Centralized log aggregation with structured logging, full-text search, and metrics extraction.
Rust Implementation
use heliosdb_storage::timeseries::{ TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy,};use std::collections::HashMap;use std::sync::Arc;use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Log entry#[derive(Clone, Debug)]struct LogEntry { timestamp: u64, level: LogLevel, service: String, host: String, message: String, fields: HashMap<String, String>,}
/// Severity of a log entry; the discriminants increase with severity.
#[derive(Clone, Copy, Debug, PartialEq)]
enum LogLevel {
    Debug = 0,
    Info = 1,
    Warn = 2,
    Error = 3,
    Fatal = 4,
}
/// Log analytics servicestruct LogAnalyticsService { engine: Arc<TimeSeriesEngine>,}
impl LogAnalyticsService { async fn new(data_path: &str) -> Result<Self, Box<dyn std::error::Error>> { let engine = Arc::new( TimeSeriesEngine::new(data_path, PartitionStrategy::Daily).await? );
Ok(Self { engine }) }
async fn ingest_log(&self, entry: LogEntry) -> Result<(), Box<dyn std::error::Error>> { let mut tags = HashMap::new(); tags.insert("level".to_string(), format!("{:?}", entry.level)); tags.insert("service".to_string(), entry.service.clone()); tags.insert("host".to_string(), entry.host.clone()); tags.extend(entry.fields.clone());
// Store log level as numeric for aggregation let point = TimeSeriesPoint::with_tags( "logs.count", entry.timestamp, 1.0, tags, );
self.engine.write_points(&[point]).await?; Ok(()) }
/// Count logs by level in time range async fn count_by_level( &self, start: u64, end: u64, ) -> Result<HashMap<String, u64>, Box<dyn std::error::Error>> { let points = self.engine.query_range("logs.count", start, end).await?;
let mut counts: HashMap<String, u64> = HashMap::new(); for point in points { if let Some(level) = point.tags.get("level") { *counts.entry(level.clone()).or_insert(0) += 1; } }
Ok(counts) }
/// Get error rate per minute async fn error_rate( &self, service: &str, hours: u64, ) -> Result<Vec<(u64, f64)>, Box<dyn std::error::Error>> { let now = SystemTime::now() .duration_since(UNIX_EPOCH)? .as_millis() as u64; let start = now - (hours * 3600 * 1000);
let points = self.engine.query_range("logs.count", start, now).await?;
// Group by minute let mut minute_errors: HashMap<u64, (u64, u64)> = HashMap::new();
for point in points { let minute = (point.timestamp / 60000) * 60000; let entry = minute_errors.entry(minute).or_insert((0, 0));
entry.1 += 1; // Total count
if let Some(level) = point.tags.get("level") { if level == "Error" || level == "Fatal" { entry.0 += 1; // Error count } } }
let mut rates: Vec<(u64, f64)> = minute_errors .iter() .map(|(&ts, &(errors, total))| { let rate = if total > 0 { errors as f64 / total as f64 * 100.0 } else { 0.0 }; (ts, rate) }) .collect();
rates.sort_by_key(|(ts, _)| *ts); Ok(rates) }}
/// Extract metrics from log entriesfn extract_metrics_from_log(entry: &LogEntry) -> Vec<(String, f64)> { let mut metrics = Vec::new();
// Extract duration from log message if entry.message.contains("request completed in") { if let Some(duration) = extract_duration(&entry.message) { metrics.push(("request_duration_ms".to_string(), duration)); } }
// Extract from structured fields if let Some(bytes) = entry.fields.get("response_bytes") { if let Ok(bytes) = bytes.parse::<f64>() { metrics.push(("response_bytes".to_string(), bytes)); } }
metrics}
/// Parses the millisecond value out of a message such as
/// "request completed in 123ms".
///
/// Takes the text between the first "completed in " and the next "ms", trims
/// it and parses it as a number. Returns `None` if either marker is missing, the
/// text is not a number, or the number is negative or not finite.
fn extract_duration(message: &str) -> Option<f64> {
    // Simple pattern matching for "completed in 123ms"
    let (_, after_marker) = message.split_once("completed in ")?;
    let (number, _) = after_marker.split_once("ms")?;

    number
        .trim()
        .parse::<f64>()
        .ok()
        // "nan" and "inf" parse successfully as f64 but are not durations, and
        // neither is a negative value.
        .filter(|ms| ms.is_finite() && *ms >= 0.0)
}
// SQL Examples
Create Time-Series Tables
-- IoT sensor readings table
CREATE TABLE sensor_readings (
    timestamp   TIMESTAMPTZ NOT NULL,
    sensor_id   VARCHAR(64) NOT NULL,
    facility    VARCHAR(32) NOT NULL,
    temperature DOUBLE PRECISION,
    humidity    DOUBLE PRECISION,
    pressure    DOUBLE PRECISION,
    vibration   DOUBLE PRECISION
) WITH (
    partition_by = 'HOUR',
    retention = '7 days',
    compression = 'gorilla'
);

-- Create continuous aggregate for hourly averages
CREATE MATERIALIZED VIEW sensor_hourly AS
SELECT
    time_bucket('1 hour', timestamp) AS bucket,
    sensor_id,
    facility,
    AVG(temperature) AS avg_temp,
    AVG(humidity) AS avg_humidity,
    AVG(pressure) AS avg_pressure,
    AVG(vibration) AS avg_vibration,
    COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id, facility
WITH (
    continuous = true,
    refresh_interval = '5 minutes'
);

-- Query Patterns
-- Last hour of raw data
SELECT timestamp, sensor_id, temperature, humidity
FROM sensor_readings
WHERE timestamp > NOW() - INTERVAL '1 hour'
  AND sensor_id = 'sensor-001'
ORDER BY timestamp;

-- Hourly averages for last 24 hours
SELECT
    time_bucket('1 hour', timestamp) AS hour,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp
FROM sensor_readings
WHERE timestamp > NOW() - INTERVAL '24 hours'
  AND sensor_id = 'sensor-001'
GROUP BY hour
ORDER BY hour;

-- Gap filling with linear interpolation
SELECT
    time_bucket_gapfill('5 minutes', timestamp) AS bucket,
    sensor_id,
    interpolate(avg(temperature)) AS temperature
FROM sensor_readings
WHERE timestamp BETWEEN '2025-01-01 00:00:00' AND '2025-01-01 12:00:00'
GROUP BY bucket, sensor_id
ORDER BY bucket, sensor_id;

-- Detect anomalies (values > 3 standard deviations)
WITH stats AS (
    SELECT
        sensor_id,
        AVG(temperature) AS mean,
        STDDEV(temperature) AS stddev
    FROM sensor_readings
    WHERE timestamp > NOW() - INTERVAL '1 day'
    GROUP BY sensor_id
)
SELECT
    r.timestamp,
    r.sensor_id,
    r.temperature,
    -- NULLIF is a defensive guard: a zero standard deviation yields NULL
    -- instead of a division-by-zero error.
    (r.temperature - s.mean) / NULLIF(s.stddev, 0) AS z_score
FROM sensor_readings r
JOIN stats s ON r.sensor_id = s.sensor_id
WHERE r.timestamp > NOW() - INTERVAL '1 hour'
  AND ABS(r.temperature - s.mean) > 3 * s.stddev
ORDER BY r.timestamp;

-- Moving average
SELECT
    timestamp,
    temperature,
    AVG(temperature) OVER (
        ORDER BY timestamp
        ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
    ) AS moving_avg_10
FROM sensor_readings
WHERE sensor_id = 'sensor-001'
  AND timestamp > NOW() - INTERVAL '1 hour'
ORDER BY timestamp;

-- Rate of change per second
SELECT
    timestamp,
    temperature,
    -- NULLIF avoids a division-by-zero error when two consecutive rows share
    -- the same timestamp; the rate is NULL for such rows.
    (temperature - LAG(temperature) OVER (ORDER BY timestamp)) /
        NULLIF(EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER (ORDER BY timestamp))), 0)
        AS rate_per_second
FROM sensor_readings
WHERE sensor_id = 'sensor-001'
  AND timestamp > NOW() - INTERVAL '1 hour'
ORDER BY timestamp;

-- OHLCV bars for financial data
SELECT
    time_bucket('5 minutes', timestamp) AS bucket,
    symbol,
    FIRST(price, timestamp) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    LAST(price, timestamp) AS close,
    SUM(volume) AS volume
FROM market_ticks
WHERE timestamp > NOW() - INTERVAL '1 day'
  AND symbol = 'AAPL'
GROUP BY bucket, symbol
ORDER BY bucket;

-- Session-based analysis
-- A new session starts whenever a user's events are more than 30 minutes apart.
WITH sessions AS (
    SELECT
        user_id,
        timestamp,
        SUM(CASE
                WHEN timestamp - LAG(timestamp) OVER (
                         PARTITION BY user_id ORDER BY timestamp
                     ) > INTERVAL '30 minutes'
                THEN 1
                ELSE 0
            END) OVER (PARTITION BY user_id ORDER BY timestamp) AS session_id
    FROM user_events
    WHERE timestamp > NOW() - INTERVAL '1 day'
)
SELECT
    user_id,
    session_id,
    MIN(timestamp) AS session_start,
    MAX(timestamp) AS session_end,
    COUNT(*) AS event_count
FROM sessions
GROUP BY user_id, session_id
ORDER BY user_id, session_start;

-- Retention and Downsampling via SQL
-- Set retention policy
ALTER TABLE sensor_readings
SET RETENTION POLICY '30 days';

-- Create downsampling policy
CREATE DOWNSAMPLING POLICY ON sensor_readings
    EVERY '1 hour'
    WITH AGGREGATION (
        temperature = AVG,
        humidity = AVG,
        pressure = AVG,
        vibration = AVG
    )
    AFTER '7 days'
    RETAIN '365 days';

-- Force immediate compaction
VACUUM (COMPACT) sensor_readings;

-- See Also: README | Quick Start | User Guide