
Time-Series Examples

Version: 6.0 | Last Updated: January 4, 2026

This document provides practical code examples for common time-series use cases in HeliosDB.


Table of Contents

  1. IoT Sensor Data Ingestion
  2. Financial Tick Data
  3. Metrics and Monitoring
  4. Log Analytics
  5. SQL Examples

IoT Sensor Data Ingestion

Scenario

Industrial IoT deployment with thousands of sensors reporting temperature, humidity, pressure, and vibration readings every second.

Rust Implementation

use heliosdb_storage::timeseries::{
    TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy,
    IngestionPipeline, IngestionConfig, RetentionPolicy,
    DownsamplingConfig, DownsamplingTier, AggregationFunction,
};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// IoT sensor reading
#[derive(Clone)]
struct SensorReading {
    sensor_id: String,
    facility: String,
    temperature: f64,
    humidity: f64,
    pressure: f64,
    vibration: f64,
    timestamp: u64, // Milliseconds since epoch
}

/// IoT ingestion pipeline
struct IoTIngestionService {
    engine: Arc<TimeSeriesEngine>,
    pipeline: IngestionPipeline,
}
impl IoTIngestionService {
    async fn new(data_path: &str) -> Result<Self, Box<dyn std::error::Error>> {
        // Create engine with hourly partitioning for high-volume IoT data
        let engine = Arc::new(
            TimeSeriesEngine::new(data_path, PartitionStrategy::Hourly).await?
        );
        // Configure ingestion for high throughput
        let config = IngestionConfig {
            batch_size: 10_000,
            batch_timeout: Duration::from_millis(100),
            write_workers: 8,
            buffer_capacity: 100_000,
            handle_out_of_order: true,
            max_time_skew: 30_000, // 30-second tolerance for network delays
            backfill_mode: false,
        };
        let pipeline = IngestionPipeline::new(
            config,
            engine.storage().clone(),
            engine.compressor().clone(),
        );
        Ok(Self { engine, pipeline })
    }

    async fn configure_policies(&mut self) -> Result<(), Box<dyn std::error::Error>> {
        // Raw data: 7 days retention
        self.engine.set_retention_policy(
            RetentionPolicy::new(Duration::from_secs(7 * 24 * 3600))
        ).await;
        // Configure multi-tier downsampling:
        // raw -> 1 minute -> 1 hour -> 1 day
        let downsampling = DownsamplingConfig::new(Duration::from_secs(60))
            .with_aggregation(AggregationFunction::Average)
            .add_tier(
                DownsamplingTier::new(Duration::from_secs(3600), AggregationFunction::Average)
                    .with_age_threshold(Duration::from_secs(3600))
                    .with_retention(Duration::from_secs(30 * 24 * 3600))
            )
            .add_tier(
                DownsamplingTier::new(Duration::from_secs(86400), AggregationFunction::Average)
                    .with_age_threshold(Duration::from_secs(7 * 24 * 3600))
            );
        self.engine.configure_downsampling("sensors.*", downsampling).await?;
        Ok(())
    }

    async fn ingest_reading(&self, reading: SensorReading) -> Result<(), Box<dyn std::error::Error>> {
        let mut tags = HashMap::new();
        tags.insert("sensor_id".to_string(), reading.sensor_id.clone());
        tags.insert("facility".to_string(), reading.facility.clone());
        let points = vec![
            TimeSeriesPoint::with_tags("sensors.temperature", reading.timestamp, reading.temperature, tags.clone()),
            TimeSeriesPoint::with_tags("sensors.humidity", reading.timestamp, reading.humidity, tags.clone()),
            TimeSeriesPoint::with_tags("sensors.pressure", reading.timestamp, reading.pressure, tags.clone()),
            TimeSeriesPoint::with_tags("sensors.vibration", reading.timestamp, reading.vibration, tags),
        ];
        self.pipeline.ingest_batch(&points).await?;
        Ok(())
    }

    async fn ingest_batch(&self, readings: Vec<SensorReading>) -> Result<(), Box<dyn std::error::Error>> {
        let mut points = Vec::with_capacity(readings.len() * 4);
        for reading in readings {
            let mut tags = HashMap::new();
            tags.insert("sensor_id".to_string(), reading.sensor_id.clone());
            tags.insert("facility".to_string(), reading.facility.clone());
            points.push(TimeSeriesPoint::with_tags("sensors.temperature", reading.timestamp, reading.temperature, tags.clone()));
            points.push(TimeSeriesPoint::with_tags("sensors.humidity", reading.timestamp, reading.humidity, tags.clone()));
            points.push(TimeSeriesPoint::with_tags("sensors.pressure", reading.timestamp, reading.pressure, tags.clone()));
            points.push(TimeSeriesPoint::with_tags("sensors.vibration", reading.timestamp, reading.vibration, tags));
        }
        self.pipeline.ingest_batch(&points).await?;
        Ok(())
    }
}

/// Query IoT data with aggregations
async fn query_sensor_data(
    engine: &TimeSeriesEngine,
    sensor_id: &str,
    hours: u64,
) -> Result<(), Box<dyn std::error::Error>> {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)?
        .as_millis() as u64;
    let start = now - (hours * 3600 * 1000);
    // Query raw temperature data, keeping only the requested sensor
    let points: Vec<_> = engine
        .query_range("sensors.temperature", start, now)
        .await?
        .into_iter()
        .filter(|p| p.tags.get("sensor_id").map(|s| s == sensor_id).unwrap_or(false))
        .collect();
    println!("Retrieved {} temperature readings for sensor {}", points.len(), sensor_id);
    // Calculate statistics
    if !points.is_empty() {
        let values: Vec<f64> = points.iter().map(|p| p.value).collect();
        let avg = values.iter().sum::<f64>() / values.len() as f64;
        let min = values.iter().cloned().fold(f64::INFINITY, f64::min);
        let max = values.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
        println!("Temperature: avg={:.2}C, min={:.2}C, max={:.2}C", avg, min, max);
    }
    Ok(())
}

/// Detect temperature anomalies
async fn detect_anomalies(
    engine: &TimeSeriesEngine,
    threshold_std_dev: f64,
) -> Result<Vec<(u64, f64)>, Box<dyn std::error::Error>> {
    use heliosdb_compute::time_series::{TimeSeries, TimeSeriesOps, DataPoint, TSValue};
    use chrono::TimeZone; // Brings Utc.timestamp_millis_opt() into scope
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)?
        .as_millis() as u64;
    let start = now - 3_600_000; // Last hour
    let points = engine.query_range("sensors.temperature", start, now).await?;
    // Convert to TimeSeries format
    let mut ts = TimeSeries::new();
    for p in &points {
        ts.add_point(DataPoint::new(
            chrono::Utc.timestamp_millis_opt(p.timestamp as i64).single().unwrap(),
            TSValue::Float(p.value),
        ));
    }
    let anomaly_indices = TimeSeriesOps::detect_anomalies(&ts, threshold_std_dev)?;
    let anomalies: Vec<(u64, f64)> = anomaly_indices
        .iter()
        .map(|&i| (points[i].timestamp, points[i].value))
        .collect();
    Ok(anomalies)
}
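
For context, here is a minimal wiring sketch for the service above, assuming it lives in the same file as the imports at the top of this listing and that the tokio runtime (with the macros feature) is available. The data path and sensor values are illustrative placeholders, not part of the HeliosDB API:

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Hypothetical data directory; point this at your own storage path
    let mut service = IoTIngestionService::new("/var/lib/heliosdb/iot").await?;
    service.configure_policies().await?;
    let reading = SensorReading {
        sensor_id: "sensor-001".to_string(),
        facility: "plant-a".to_string(),
        temperature: 21.7,
        humidity: 48.2,
        pressure: 101.3,
        vibration: 0.02,
        // Milliseconds since epoch, matching the queries above
        timestamp: SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64,
    };
    service.ingest_reading(reading).await?;
    Ok(())
}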

Financial Tick Data

Scenario

High-frequency trading system processing millions of market data ticks per second with microsecond precision.

Rust Implementation

use heliosdb_storage::timeseries::{
    TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy,
    IngestionPipeline, IngestionConfig,
};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

/// Market data tick
#[derive(Clone, Debug)]
struct MarketTick {
    symbol: String,
    exchange: String,
    price: f64,
    volume: f64,
    bid: f64,
    ask: f64,
    timestamp: u64, // Microseconds since epoch
}

/// Financial data ingestion service
struct MarketDataService {
    engine: Arc<TimeSeriesEngine>,
    pipeline: IngestionPipeline,
}
impl MarketDataService {
    async fn new(data_path: &str) -> Result<Self, Box<dyn std::error::Error>> {
        // Use hourly partitions for high-frequency data
        let engine = Arc::new(
            TimeSeriesEngine::new(data_path, PartitionStrategy::Hourly).await?
        );
        // Configure for maximum throughput
        let config = IngestionConfig {
            batch_size: 50_000,
            batch_timeout: Duration::from_millis(50),
            write_workers: 16,
            buffer_capacity: 500_000,
            handle_out_of_order: true,
            max_time_skew: 5000, // 5 second tolerance
            backfill_mode: false,
        };
        let pipeline = IngestionPipeline::new(
            config,
            engine.storage().clone(),
            engine.compressor().clone(),
        );
        Ok(Self { engine, pipeline })
    }

    async fn ingest_tick(&self, tick: MarketTick) -> Result<(), Box<dyn std::error::Error>> {
        let mut tags = HashMap::new();
        tags.insert("symbol".to_string(), tick.symbol.clone());
        tags.insert("exchange".to_string(), tick.exchange.clone());
        // Store multiple metrics per tick
        let points = vec![
            TimeSeriesPoint::with_tags("market.price", tick.timestamp, tick.price, tags.clone()),
            TimeSeriesPoint::with_tags("market.volume", tick.timestamp, tick.volume, tags.clone()),
            TimeSeriesPoint::with_tags("market.bid", tick.timestamp, tick.bid, tags.clone()),
            TimeSeriesPoint::with_tags("market.ask", tick.timestamp, tick.ask, tags.clone()),
            TimeSeriesPoint::with_tags("market.spread", tick.timestamp, tick.ask - tick.bid, tags),
        ];
        self.pipeline.ingest_batch(&points).await?;
        Ok(())
    }

    /// Calculate VWAP (Volume Weighted Average Price)
    async fn calculate_vwap(
        &self,
        symbol: &str,
        start_time: u64,
        end_time: u64,
    ) -> Result<f64, Box<dyn std::error::Error>> {
        // Filter both series to the requested symbol; zipping assumes the
        // price and volume points were written with identical timestamps.
        let by_symbol = |p: &TimeSeriesPoint| {
            p.tags.get("symbol").map(|s| s == symbol).unwrap_or(false)
        };
        let prices: Vec<_> = self.engine.query_range("market.price", start_time, end_time).await?
            .into_iter().filter(|p| by_symbol(p)).collect();
        let volumes: Vec<_> = self.engine.query_range("market.volume", start_time, end_time).await?
            .into_iter().filter(|p| by_symbol(p)).collect();
        let mut total_value = 0.0;
        let mut total_volume = 0.0;
        for (price, volume) in prices.iter().zip(volumes.iter()) {
            total_value += price.value * volume.value;
            total_volume += volume.value;
        }
        Ok(if total_volume > 0.0 { total_value / total_volume } else { 0.0 })
    }

    /// Get OHLCV (Open, High, Low, Close, Volume) bars
    async fn get_ohlcv(
        &self,
        symbol: &str,
        bar_duration: Duration,
        start_time: u64,
        end_time: u64,
    ) -> Result<Vec<OHLCVBar>, Box<dyn std::error::Error>> {
        let by_symbol = |p: &TimeSeriesPoint| {
            p.tags.get("symbol").map(|s| s == symbol).unwrap_or(false)
        };
        let prices: Vec<_> = self.engine.query_range("market.price", start_time, end_time).await?
            .into_iter().filter(|p| by_symbol(p)).collect();
        let volumes: Vec<_> = self.engine.query_range("market.volume", start_time, end_time).await?
            .into_iter().filter(|p| by_symbol(p)).collect();
        // Tick timestamps are microseconds, so bucket in microseconds too
        let bar_us = bar_duration.as_micros() as u64;
        let mut bars: Vec<OHLCVBar> = Vec::new();
        let mut current_bar_start = (start_time / bar_us) * bar_us;
        let mut bar_prices: Vec<f64> = Vec::new();
        let mut bar_volume = 0.0;
        for (price, volume) in prices.iter().zip(volumes.iter()) {
            let bar_boundary = (price.timestamp / bar_us) * bar_us;
            if bar_boundary != current_bar_start && !bar_prices.is_empty() {
                bars.push(OHLCVBar {
                    timestamp: current_bar_start,
                    open: bar_prices[0],
                    high: bar_prices.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
                    low: bar_prices.iter().cloned().fold(f64::INFINITY, f64::min),
                    close: *bar_prices.last().unwrap(),
                    volume: bar_volume,
                });
                bar_prices.clear();
                bar_volume = 0.0;
                current_bar_start = bar_boundary;
            }
            bar_prices.push(price.value);
            bar_volume += volume.value;
        }
        // Close final bar
        if !bar_prices.is_empty() {
            bars.push(OHLCVBar {
                timestamp: current_bar_start,
                open: bar_prices[0],
                high: bar_prices.iter().cloned().fold(f64::NEG_INFINITY, f64::max),
                low: bar_prices.iter().cloned().fold(f64::INFINITY, f64::min),
                close: *bar_prices.last().unwrap(),
                volume: bar_volume,
            });
        }
        Ok(bars)
    }
}

#[derive(Debug, Clone)]
struct OHLCVBar {
    timestamp: u64,
    open: f64,
    high: f64,
    low: f64,
    close: f64,
    volume: f64,
}

/// Calculate Bollinger Bands (middle, upper, lower) over a sliding window
fn bollinger_bands(prices: &[f64], period: usize, num_std: f64) -> Vec<(f64, f64, f64)> {
    assert!(period > 0, "period must be positive");
    let mut bands = Vec::new();
    // Each window includes the current price, so bands[k] covers prices[k..k + period]
    for i in (period - 1)..prices.len() {
        let window = &prices[i + 1 - period..=i];
        let mean = window.iter().sum::<f64>() / period as f64;
        let variance = window.iter()
            .map(|p| (p - mean).powi(2))
            .sum::<f64>() / period as f64;
        let std_dev = variance.sqrt();
        bands.push((
            mean,                     // Middle band (SMA)
            mean + num_std * std_dev, // Upper band
            mean - num_std * std_dev, // Lower band
        ));
    }
    bands
}
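
Because bollinger_bands is a pure function, it can be exercised directly on the close prices from get_ohlcv. The helper below is an illustrative sketch: with a window of period bars, bands[k] describes the close at index k + period - 1.

fn print_bands(bars: &[OHLCVBar]) {
    let closes: Vec<f64> = bars.iter().map(|b| b.close).collect();
    // A 20-bar window with 2 standard deviations is the conventional setting
    let period = 20;
    for (k, (mid, upper, lower)) in bollinger_bands(&closes, period, 2.0).iter().enumerate() {
        println!(
            "close[{}]: mid={:.2} upper={:.2} lower={:.2}",
            k + period - 1,
            mid, upper, lower
        );
    }
}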

Metrics and Monitoring

Scenario

Application performance monitoring with system metrics, custom business metrics, and distributed tracing.

Rust Implementation

use heliosdb_storage::timeseries::{
    TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy,
    RetentionPolicy, DownsamplingConfig, DownsamplingTier,
    AggregationFunction,
};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Metrics collector
struct MetricsCollector {
    engine: Arc<TimeSeriesEngine>,
    hostname: String,
    service: String,
}
impl MetricsCollector {
    async fn new(
        data_path: &str,
        hostname: String,
        service: String,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        let engine = Arc::new(
            TimeSeriesEngine::new(data_path, PartitionStrategy::Daily).await?
        );
        // Configure retention: raw 7 days, hourly 30 days, daily 365 days
        engine.set_retention_policy(
            RetentionPolicy::new(Duration::from_secs(7 * 24 * 3600))
        ).await;
        // Configure downsampling; tier retentions match the policy above
        engine.configure_downsampling(
            "system.*",
            DownsamplingConfig::new(Duration::from_secs(60))
                .with_aggregation(AggregationFunction::Average)
                .add_tier(
                    DownsamplingTier::new(Duration::from_secs(3600), AggregationFunction::Average)
                        .with_age_threshold(Duration::from_secs(7 * 24 * 3600))
                        .with_retention(Duration::from_secs(30 * 24 * 3600))
                )
                .add_tier(
                    DownsamplingTier::new(Duration::from_secs(86400), AggregationFunction::Average)
                        .with_age_threshold(Duration::from_secs(30 * 24 * 3600))
                        .with_retention(Duration::from_secs(365 * 24 * 3600))
                ),
        ).await?;
        Ok(Self { engine, hostname, service })
    }

    fn now() -> u64 {
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_millis() as u64
    }

    /// Record gauge metric
    async fn gauge(
        &self,
        name: &str,
        value: f64,
        extra_tags: HashMap<String, String>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut tags = self.base_tags();
        tags.extend(extra_tags);
        let point = TimeSeriesPoint::with_tags(
            format!("system.{}", name),
            Self::now(),
            value,
            tags,
        );
        self.engine.write_points(&[point]).await?;
        Ok(())
    }

    /// Record counter (increment)
    async fn counter(
        &self,
        name: &str,
        increment: f64,
        extra_tags: HashMap<String, String>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut tags = self.base_tags();
        tags.extend(extra_tags);
        let point = TimeSeriesPoint::with_tags(
            format!("system.{}.count", name),
            Self::now(),
            increment,
            tags,
        );
        self.engine.write_points(&[point]).await?;
        Ok(())
    }

    /// Record histogram observation
    async fn histogram(
        &self,
        name: &str,
        value: f64,
        extra_tags: HashMap<String, String>,
    ) -> Result<(), Box<dyn std::error::Error>> {
        let mut tags = self.base_tags();
        tags.extend(extra_tags);
        // Store raw observation
        let point = TimeSeriesPoint::with_tags(
            format!("system.{}", name),
            Self::now(),
            value,
            tags,
        );
        self.engine.write_points(&[point]).await?;
        Ok(())
    }

    fn base_tags(&self) -> HashMap<String, String> {
        let mut tags = HashMap::new();
        tags.insert("host".to_string(), self.hostname.clone());
        tags.insert("service".to_string(), self.service.clone());
        tags
    }
}

/// Collect system metrics
async fn collect_system_metrics(collector: &MetricsCollector) -> Result<(), Box<dyn std::error::Error>> {
    // Simulated system metrics (in production, use actual system calls)
    let cpu_usage = 45.5;
    let memory_used = 62.3;
    let disk_usage = 78.1;
    let network_rx = 1024.0 * 1024.0 * 150.0; // 150 MB/s
    let network_tx = 1024.0 * 1024.0 * 75.0;  // 75 MB/s
    collector.gauge("cpu_percent", cpu_usage, HashMap::new()).await?;
    collector.gauge("memory_percent", memory_used, HashMap::new()).await?;
    collector.gauge("disk_percent", disk_usage, HashMap::new()).await?;
    collector.gauge("network_rx_bytes", network_rx, HashMap::new()).await?;
    collector.gauge("network_tx_bytes", network_tx, HashMap::new()).await?;
    Ok(())
}

/// Record HTTP request metrics
async fn record_http_request(
    collector: &MetricsCollector,
    method: &str,
    path: &str,
    status_code: u16,
    duration_ms: f64,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut tags = HashMap::new();
    tags.insert("method".to_string(), method.to_string());
    tags.insert("path".to_string(), path.to_string());
    tags.insert("status".to_string(), status_code.to_string());
    // Record request count
    collector.counter("http_requests", 1.0, tags.clone()).await?;
    // Record latency
    collector.histogram("http_request_duration_ms", duration_ms, tags).await?;
    Ok(())
}

/// Query percentiles for latency analysis
async fn get_latency_percentiles(
    engine: &TimeSeriesEngine,
    metric: &str,
    hours: u64,
) -> Result<LatencyStats, Box<dyn std::error::Error>> {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)?
        .as_millis() as u64;
    let start = now - (hours * 3600 * 1000);
    let points = engine.query_range(metric, start, now).await?;
    let mut values: Vec<f64> = points.iter().map(|p| p.value).collect();
    // total_cmp gives a total order, so NaN values cannot panic the sort
    values.sort_by(|a, b| a.total_cmp(b));
    let percentile = |p: f64| -> f64 {
        if values.is_empty() { return 0.0; }
        // Nearest-rank index into the sorted values
        let index = ((p / 100.0) * (values.len() - 1) as f64).round() as usize;
        values[index]
    };
    Ok(LatencyStats {
        count: values.len(),
        avg: values.iter().sum::<f64>() / values.len().max(1) as f64,
        p50: percentile(50.0),
        p90: percentile(90.0),
        p95: percentile(95.0),
        p99: percentile(99.0),
        max: values.last().copied().unwrap_or(0.0),
    })
}

#[derive(Debug)]
struct LatencyStats {
    count: usize,
    avg: f64,
    p50: f64,
    p90: f64,
    p95: f64,
    p99: f64,
    max: f64,
}
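
To show how these pieces compose, here is a hedged end-to-end sketch. The storage path and tag values are placeholders, and "system.http_request_duration_ms" is simply the name the histogram() method above derives for the latency metric:

async fn metrics_example() -> Result<(), Box<dyn std::error::Error>> {
    let collector = MetricsCollector::new(
        "/var/lib/heliosdb/metrics", // hypothetical storage path
        "web-01".to_string(),
        "api".to_string(),
    ).await?;
    collect_system_metrics(&collector).await?;
    record_http_request(&collector, "GET", "/api/orders", 200, 12.7).await?;
    // histogram() prefixes names with "system.", so query the derived name
    let stats = get_latency_percentiles(
        &collector.engine,
        "system.http_request_duration_ms",
        1,
    ).await?;
    println!("{:?}", stats);
    Ok(())
}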

Log Analytics

Scenario

Centralized log aggregation with structured logging, full-text search, and metrics extraction.

Rust Implementation

use heliosdb_storage::timeseries::{
    TimeSeriesEngine, TimeSeriesPoint, PartitionStrategy,
};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};

/// Log entry
#[derive(Clone, Debug)]
struct LogEntry {
    timestamp: u64,
    level: LogLevel,
    service: String,
    host: String,
    message: String,
    fields: HashMap<String, String>,
}

#[derive(Clone, Copy, Debug, PartialEq)]
enum LogLevel {
    Debug = 0,
    Info = 1,
    Warn = 2,
    Error = 3,
    Fatal = 4,
}

/// Log analytics service
struct LogAnalyticsService {
    engine: Arc<TimeSeriesEngine>,
}

impl LogAnalyticsService {
    async fn new(data_path: &str) -> Result<Self, Box<dyn std::error::Error>> {
        let engine = Arc::new(
            TimeSeriesEngine::new(data_path, PartitionStrategy::Daily).await?
        );
        Ok(Self { engine })
    }

    async fn ingest_log(&self, entry: LogEntry) -> Result<(), Box<dyn std::error::Error>> {
        let mut tags = HashMap::new();
        tags.insert("level".to_string(), format!("{:?}", entry.level));
        tags.insert("service".to_string(), entry.service.clone());
        tags.insert("host".to_string(), entry.host.clone());
        tags.extend(entry.fields.clone());
        // Each entry is stored as a count of 1; the level travels as a tag
        let point = TimeSeriesPoint::with_tags(
            "logs.count",
            entry.timestamp,
            1.0,
            tags,
        );
        self.engine.write_points(&[point]).await?;
        Ok(())
    }

    /// Count logs by level in time range
    async fn count_by_level(
        &self,
        start: u64,
        end: u64,
    ) -> Result<HashMap<String, u64>, Box<dyn std::error::Error>> {
        let points = self.engine.query_range("logs.count", start, end).await?;
        let mut counts: HashMap<String, u64> = HashMap::new();
        for point in points {
            if let Some(level) = point.tags.get("level") {
                *counts.entry(level.clone()).or_insert(0) += 1;
            }
        }
        Ok(counts)
    }

    /// Get error rate per minute for a service
    async fn error_rate(
        &self,
        service: &str,
        hours: u64,
    ) -> Result<Vec<(u64, f64)>, Box<dyn std::error::Error>> {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)?
            .as_millis() as u64;
        let start = now - (hours * 3600 * 1000);
        let points = self.engine.query_range("logs.count", start, now).await?;
        // Group by minute, keeping only the requested service
        let mut minute_errors: HashMap<u64, (u64, u64)> = HashMap::new();
        for point in points {
            if point.tags.get("service").map(|s| s != service).unwrap_or(true) {
                continue;
            }
            let minute = (point.timestamp / 60_000) * 60_000;
            let entry = minute_errors.entry(minute).or_insert((0, 0));
            entry.1 += 1; // Total count
            if let Some(level) = point.tags.get("level") {
                if level == "Error" || level == "Fatal" {
                    entry.0 += 1; // Error count
                }
            }
        }
        let mut rates: Vec<(u64, f64)> = minute_errors
            .iter()
            .map(|(&ts, &(errors, total))| {
                let rate = if total > 0 {
                    errors as f64 / total as f64 * 100.0
                } else {
                    0.0
                };
                (ts, rate)
            })
            .collect();
        rates.sort_by_key(|(ts, _)| *ts);
        Ok(rates)
    }
}

/// Extract metrics from log entries
fn extract_metrics_from_log(entry: &LogEntry) -> Vec<(String, f64)> {
    let mut metrics = Vec::new();
    // Extract duration from log message
    if entry.message.contains("request completed in") {
        if let Some(duration) = extract_duration(&entry.message) {
            metrics.push(("request_duration_ms".to_string(), duration));
        }
    }
    // Extract from structured fields
    if let Some(bytes) = entry.fields.get("response_bytes") {
        if let Ok(bytes) = bytes.parse::<f64>() {
            metrics.push(("response_bytes".to_string(), bytes));
        }
    }
    metrics
}

fn extract_duration(message: &str) -> Option<f64> {
    // Simple pattern matching for "completed in 123ms"
    let pattern = "completed in ";
    if let Some(pos) = message.find(pattern) {
        let start = pos + pattern.len();
        let end = message[start..].find("ms").map(|p| start + p)?;
        message[start..end].trim().parse().ok()
    } else {
        None
    }
}
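
A short sketch tying the pieces together; the service name, message, and field values are illustrative assumptions:

async fn logs_example() -> Result<(), Box<dyn std::error::Error>> {
    let service = LogAnalyticsService::new("/var/lib/heliosdb/logs").await?;
    let now = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis() as u64;
    let entry = LogEntry {
        timestamp: now,
        level: LogLevel::Error,
        service: "checkout".to_string(),
        host: "web-01".to_string(),
        message: "request completed in 412ms".to_string(),
        fields: HashMap::from([("response_bytes".to_string(), "2048".to_string())]),
    };
    // Derived metrics (here request_duration_ms = 412.0 and response_bytes = 2048.0)
    // could be written back through a metrics path like the one in the previous section
    let derived = extract_metrics_from_log(&entry);
    println!("derived metrics: {:?}", derived);
    service.ingest_log(entry).await?;
    let rates = service.error_rate("checkout", 1).await?;
    println!("error-rate points: {}", rates.len());
    Ok(())
}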

SQL Examples

Create Time-Series Tables

-- IoT sensor readings table
CREATE TABLE sensor_readings (
    timestamp   TIMESTAMPTZ NOT NULL,
    sensor_id   VARCHAR(64) NOT NULL,
    facility    VARCHAR(32) NOT NULL,
    temperature DOUBLE PRECISION,
    humidity    DOUBLE PRECISION,
    pressure    DOUBLE PRECISION,
    vibration   DOUBLE PRECISION
) WITH (
    partition_by = 'HOUR',
    retention = '7 days',
    compression = 'gorilla'
);

-- Create continuous aggregate for hourly averages
CREATE MATERIALIZED VIEW sensor_hourly AS
SELECT
    time_bucket('1 hour', timestamp) AS bucket,
    sensor_id,
    facility,
    AVG(temperature) AS avg_temp,
    AVG(humidity) AS avg_humidity,
    AVG(pressure) AS avg_pressure,
    AVG(vibration) AS avg_vibration,
    COUNT(*) AS sample_count
FROM sensor_readings
GROUP BY bucket, sensor_id, facility
WITH (
    continuous = true,
    refresh_interval = '5 minutes'
);

Query Patterns

-- Last hour of raw data
SELECT timestamp, sensor_id, temperature, humidity
FROM sensor_readings
WHERE timestamp > NOW() - INTERVAL '1 hour'
  AND sensor_id = 'sensor-001'
ORDER BY timestamp;

-- Hourly averages for last 24 hours
SELECT
    time_bucket('1 hour', timestamp) AS hour,
    AVG(temperature) AS avg_temp,
    MAX(temperature) AS max_temp,
    MIN(temperature) AS min_temp
FROM sensor_readings
WHERE timestamp > NOW() - INTERVAL '24 hours'
  AND sensor_id = 'sensor-001'
GROUP BY hour
ORDER BY hour;

-- Gap filling with linear interpolation
SELECT
    time_bucket_gapfill('5 minutes', timestamp) AS bucket,
    sensor_id,
    interpolate(avg(temperature)) AS temperature
FROM sensor_readings
WHERE timestamp BETWEEN '2025-01-01 00:00:00' AND '2025-01-01 12:00:00'
GROUP BY bucket, sensor_id
ORDER BY bucket, sensor_id;

-- Detect anomalies (values more than 3 standard deviations from the mean)
WITH stats AS (
    SELECT
        sensor_id,
        AVG(temperature) AS mean,
        STDDEV(temperature) AS stddev
    FROM sensor_readings
    WHERE timestamp > NOW() - INTERVAL '1 day'
    GROUP BY sensor_id
)
SELECT r.timestamp, r.sensor_id, r.temperature,
       (r.temperature - s.mean) / s.stddev AS z_score
FROM sensor_readings r
JOIN stats s ON r.sensor_id = s.sensor_id
WHERE r.timestamp > NOW() - INTERVAL '1 hour'
  AND ABS(r.temperature - s.mean) > 3 * s.stddev
ORDER BY r.timestamp;

-- Moving average over the last 10 samples
SELECT
    timestamp,
    temperature,
    AVG(temperature) OVER (
        ORDER BY timestamp
        ROWS BETWEEN 9 PRECEDING AND CURRENT ROW
    ) AS moving_avg_10
FROM sensor_readings
WHERE sensor_id = 'sensor-001'
  AND timestamp > NOW() - INTERVAL '1 hour';

-- Rate of change per second
SELECT
    timestamp,
    temperature,
    (temperature - LAG(temperature) OVER (ORDER BY timestamp)) /
    EXTRACT(EPOCH FROM (timestamp - LAG(timestamp) OVER (ORDER BY timestamp)))
        AS rate_per_second
FROM sensor_readings
WHERE sensor_id = 'sensor-001'
  AND timestamp > NOW() - INTERVAL '1 hour';

-- OHLCV bars for financial data
SELECT
    time_bucket('5 minutes', timestamp) AS bucket,
    symbol,
    FIRST(price, timestamp) AS open,
    MAX(price) AS high,
    MIN(price) AS low,
    LAST(price, timestamp) AS close,
    SUM(volume) AS volume
FROM market_ticks
WHERE timestamp > NOW() - INTERVAL '1 day'
  AND symbol = 'AAPL'
GROUP BY bucket, symbol
ORDER BY bucket;

-- Session-based analysis: a new session starts after 30 minutes of inactivity
WITH sessions AS (
    SELECT
        user_id,
        timestamp,
        SUM(CASE
            WHEN timestamp - LAG(timestamp) OVER (
                PARTITION BY user_id ORDER BY timestamp
            ) > INTERVAL '30 minutes'
            THEN 1 ELSE 0
        END) OVER (PARTITION BY user_id ORDER BY timestamp) AS session_id
    FROM user_events
    WHERE timestamp > NOW() - INTERVAL '1 day'
)
SELECT
    user_id,
    session_id,
    MIN(timestamp) AS session_start,
    MAX(timestamp) AS session_end,
    COUNT(*) AS event_count
FROM sessions
GROUP BY user_id, session_id
ORDER BY user_id, session_start;

Retention and Downsampling via SQL

-- Set retention policy
ALTER TABLE sensor_readings
    SET RETENTION POLICY '30 days';

-- Create downsampling policy
CREATE DOWNSAMPLING POLICY ON sensor_readings
EVERY '1 hour'
WITH AGGREGATION (
    temperature = AVG,
    humidity = AVG,
    pressure = AVG,
    vibration = AVG
)
AFTER '7 days'
RETAIN '365 days';

-- Force immediate compaction
VACUUM (COMPACT) sensor_readings;

See Also: README | Quick Start | User Guide