IndustryManufacturing / Industrial IoT
SolutionHeliosDB-Lite (embedded, 60MB ARM64)
Cost Reduction85% ($12.4K to $1.9K)
Query Latency (P95)4.7ms (was 340ms)

The Challenge

A precision manufacturing company operates three facilities producing aerospace-grade components. Each facility runs 150-200 IoT sensors monitoring CNC machines, environmental controls, vibration analyzers, and quality inspection stations. The sensors collectively generate approximately 2.3 million data points per hour -- temperature readings, vibration frequencies, tool wear measurements, dimensional tolerances, and machine state transitions.

Their original architecture streamed all sensor data to a cloud-hosted InfluxDB cluster for time-series storage, with a separate PostgreSQL instance on RDS for relational data (work orders, machine configurations, quality specs) and a custom Node.js analytics service that joined data across both databases to detect anomalies and predict tool failures. The total monthly cloud bill for this infrastructure was $12,400, of which $8,200 was data transfer and ingestion costs alone -- the price of pushing 2.3 million data points per hour from three facilities to a centralized cloud.

The architecture had a critical operational flaw: it depended on continuous internet connectivity. When the network went down -- which happened 47 times over the previous six months, averaging 23 minutes per outage -- the facilities lost all analytics capability. During one 4-hour outage caused by a regional ISP failure, operators had no visibility into machine health. A CNC spindle bearing that was showing progressive degradation went undetected, resulting in a catastrophic failure that destroyed a $180,000 titanium billet mid-cut. The insurance claim covered the material, but the 3-day production delay cost the company an estimated $420,000 in late delivery penalties.

Why InfluxDB + PostgreSQL + Cloud Analytics Wasn't Enough

Problem Impact
$8.2K/month data transfer costs65% of total infrastructure spend was just moving data to the cloud
Network dependency47 outages in 6 months; each one blinded operators to machine health
Query latency (P95)340ms round-trip to cloud -- too slow for real-time machine control
Two databases to queryTime-series in InfluxDB, relational in PostgreSQL -- no native joins
InfluxDB query limitationsFlux/InfluxQL cannot express complex joins with relational data
No local analyticsAll computation in the cloud; edge gateways were just data forwarders
Data sovereignty concernsSome defense contracts required sensor data to remain on-premises
Schema rigidityAdding a new sensor type required coordinated InfluxDB + PostgreSQL schema changes

The HeliosDB Solution

The team deployed HeliosDB-Lite directly on industrial edge gateways (Advantech UNO-2484G, ARM64, 8GB RAM) at each facility. The 60MB single binary runs as an embedded database within their analytics application, handling both time-series ingestion and relational queries locally. A lightweight sync service replicates aggregated data to a central dashboard when connectivity is available, but each facility operates fully independently.

Schema Design

-- Machine registry and configuration (relational data)
CREATE TABLE machines (
    id SERIAL PRIMARY KEY,
    facility_id INT NOT NULL,
    machine_type VARCHAR(50) NOT NULL,     -- 'cnc_5axis', 'cmm', 'furnace'
    model VARCHAR(100) NOT NULL,
    location VARCHAR(50),                   -- 'bay_3_north'
    commissioned_date DATE,
    maintenance_interval_hours INT DEFAULT 500,
    metadata JSONB
);

-- Sensor definitions
CREATE TABLE sensors (
    id SERIAL PRIMARY KEY,
    machine_id INT NOT NULL REFERENCES machines(id),
    sensor_type VARCHAR(50) NOT NULL,       -- 'temperature', 'vibration', 'pressure'
    unit VARCHAR(20) NOT NULL,              -- 'celsius', 'mm_s', 'bar'
    min_threshold DECIMAL(10, 4),
    max_threshold DECIMAL(10, 4),
    sample_rate_hz INT DEFAULT 1
);

-- Time-series sensor readings
-- Partitioned by time range for efficient queries and automatic data lifecycle
CREATE TABLE sensor_readings (
    sensor_id INT NOT NULL REFERENCES sensors(id),
    recorded_at TIMESTAMP NOT NULL,
    value DECIMAL(14, 6) NOT NULL,
    quality_flag SMALLINT DEFAULT 0,        -- 0=good, 1=suspect, 2=bad
    PRIMARY KEY (sensor_id, recorded_at)
) PARTITION BY RANGE (recorded_at);

-- Create monthly partitions
CREATE TABLE sensor_readings_2025_01 PARTITION OF sensor_readings
    FOR VALUES FROM ('2025-01-01') TO ('2025-02-01');
CREATE TABLE sensor_readings_2025_02 PARTITION OF sensor_readings
    FOR VALUES FROM ('2025-02-01') TO ('2025-03-01');
-- ... additional partitions created by maintenance scheduler

-- Anomaly events detected by local analytics
CREATE TABLE anomalies (
    id BIGSERIAL PRIMARY KEY,
    sensor_id INT NOT NULL REFERENCES sensors(id),
    detected_at TIMESTAMP NOT NULL DEFAULT NOW(),
    anomaly_type VARCHAR(50) NOT NULL,      -- 'threshold_breach', 'trend_deviation', 'spike'
    severity VARCHAR(10) NOT NULL,          -- 'info', 'warning', 'critical'
    value DECIMAL(14, 6),
    threshold DECIMAL(14, 6),
    context JSONB,                          -- additional diagnostic data
    acknowledged BOOLEAN DEFAULT FALSE,
    acknowledged_by VARCHAR(100)
);

-- Indexes optimized for edge query patterns
CREATE INDEX idx_readings_time ON sensor_readings (recorded_at DESC);
CREATE INDEX idx_readings_sensor_time ON sensor_readings (sensor_id, recorded_at DESC);
CREATE INDEX idx_anomalies_severity ON anomalies (severity, detected_at DESC)
    WHERE acknowledged = FALSE;

High-Throughput Sensor Ingestion (Go)

package main

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
)

// SensorReading represents a single sensor data point.
type SensorReading struct {
	SensorID   int
	RecordedAt time.Time
	Value      float64
	Quality    int
}

// EdgeIngester handles high-throughput sensor data ingestion into HeliosDB-Lite.
// Batches readings for efficient bulk inserts while maintaining low latency.
type EdgeIngester struct {
	pool      *pgxpool.Pool
	buffer    []SensorReading
	mu        sync.Mutex
	batchSize int
	flushInterval time.Duration
}

func NewEdgeIngester(dsn string) (*EdgeIngester, error) {
	config, err := pgxpool.ParseConfig(dsn)
	if err != nil {
		return nil, fmt.Errorf("parse config: %w", err)
	}

	// Tuned for edge gateway (8GB RAM, ARM64)
	config.MaxConns = 4
	config.MinConns = 2

	pool, err := pgxpool.NewWithConfig(context.Background(), config)
	if err != nil {
		return nil, fmt.Errorf("create pool: %w", err)
	}

	ei := &EdgeIngester{
		pool:          pool,
		buffer:        make([]SensorReading, 0, 1000),
		batchSize:     500,
		flushInterval: 100 * time.Millisecond,
	}

	go ei.periodicFlush()
	return ei, nil
}

// Ingest adds a reading to the buffer. Non-blocking.
func (ei *EdgeIngester) Ingest(reading SensorReading) {
	ei.mu.Lock()
	ei.buffer = append(ei.buffer, reading)
	shouldFlush := len(ei.buffer) >= ei.batchSize
	ei.mu.Unlock()

	if shouldFlush {
		ei.flush()
	}
}

func (ei *EdgeIngester) flush() {
	ei.mu.Lock()
	if len(ei.buffer) == 0 {
		ei.mu.Unlock()
		return
	}
	batch := ei.buffer
	ei.buffer = make([]SensorReading, 0, ei.batchSize)
	ei.mu.Unlock()

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// Use COPY protocol for maximum throughput
	// HeliosDB-Lite supports this via the PostgreSQL wire protocol
	_, err := ei.pool.CopyFrom(
		ctx,
		pgx.Identifier{"sensor_readings"},
		[]string{"sensor_id", "recorded_at", "value", "quality_flag"},
		pgx.CopyFromSlice(len(batch), func(i int) ([]any, error) {
			r := batch[i]
			return []any{r.SensorID, r.RecordedAt, r.Value, r.Quality}, nil
		}),
	)
	if err != nil {
		log.Printf("batch insert failed (%d readings): %v", len(batch), err)
		// On failure, readings are logged to a local WAL recovery file
		// and retried on next flush cycle
	}
}

func (ei *EdgeIngester) periodicFlush() {
	ticker := time.NewTicker(ei.flushInterval)
	defer ticker.Stop()
	for range ticker.C {
		ei.flush()
	}
}

func main() {
	// Connect to HeliosDB-Lite running locally on the edge gateway
	ingester, err := NewEdgeIngester(
		"postgresql://edge:password@localhost:5433/factory_floor",
	)
	if err != nil {
		log.Fatalf("failed to create ingester: %v", err)
	}

	// Simulate sensor readings (in production, these come from MQTT/OPC-UA)
	for i := 0; i < 500; i++ {
		ingester.Ingest(SensorReading{
			SensorID:   (i % 200) + 1,
			RecordedAt: time.Now(),
			Value:      22.5 + float64(i%100)*0.01,
			Quality:    0,
		})
	}

	// Wait for flush
	time.Sleep(200 * time.Millisecond)
	log.Println("batch ingested successfully")
}

Real-Time Anomaly Detection (SQL)

Queries that run locally on the edge gateway with 5ms P95 latency:

-- Rolling average with anomaly detection using window functions
-- This query runs every 5 seconds on the edge gateway
-- Previously required a cloud round-trip (340ms) -- now completes in <5ms

WITH recent_readings AS (
    SELECT
        s.id AS sensor_id,
        s.sensor_type,
        m.machine_type,
        m.location,
        sr.recorded_at,
        sr.value,
        s.min_threshold,
        s.max_threshold,
        AVG(sr.value) OVER (
            PARTITION BY sr.sensor_id
            ORDER BY sr.recorded_at
            ROWS BETWEEN 60 PRECEDING AND CURRENT ROW
        ) AS rolling_avg_60s,
        STDDEV(sr.value) OVER (
            PARTITION BY sr.sensor_id
            ORDER BY sr.recorded_at
            ROWS BETWEEN 300 PRECEDING AND CURRENT ROW
        ) AS stddev_5min
    FROM sensor_readings sr
    JOIN sensors s ON sr.sensor_id = s.id
    JOIN machines m ON s.machine_id = m.id
    WHERE sr.recorded_at >= NOW() - INTERVAL '10 minutes'
)
SELECT
    sensor_id,
    sensor_type,
    machine_type,
    location,
    recorded_at,
    value,
    rolling_avg_60s,
    stddev_5min,
    CASE
        WHEN value > max_threshold THEN 'THRESHOLD_HIGH'
        WHEN value < min_threshold THEN 'THRESHOLD_LOW'
        WHEN stddev_5min > 0
             AND ABS(value - rolling_avg_60s) > 3 * stddev_5min THEN 'STATISTICAL_OUTLIER'
        WHEN rolling_avg_60s > max_threshold * 0.85 THEN 'TRENDING_HIGH'
        ELSE 'NORMAL'
    END AS status
FROM recent_readings
WHERE value > max_threshold
   OR value < min_threshold
   OR (stddev_5min > 0 AND ABS(value - rolling_avg_60s) > 3 * stddev_5min)
   OR rolling_avg_60s > max_threshold * 0.85
ORDER BY recorded_at DESC;
-- Tool wear prediction: correlate vibration trends with operational hours
-- Joins time-series data with relational machine configuration in a single query
-- This was impossible in InfluxDB alone and required a separate PostgreSQL lookup

WITH machine_hours AS (
    SELECT
        m.id AS machine_id,
        m.model,
        m.location,
        m.maintenance_interval_hours,
        COUNT(DISTINCT DATE_TRUNC('hour', sr.recorded_at)) AS operational_hours
    FROM machines m
    JOIN sensors s ON s.machine_id = m.id
    JOIN sensor_readings sr ON sr.sensor_id = s.id
    WHERE s.sensor_type = 'spindle_load'
      AND sr.value > 5.0                    -- machine is actively cutting
      AND sr.recorded_at >= m.commissioned_date
    GROUP BY m.id, m.model, m.location, m.maintenance_interval_hours
),
vibration_trend AS (
    SELECT
        s.machine_id,
        AVG(sr.value) AS avg_vibration_7d,
        MAX(sr.value) AS peak_vibration_7d,
        STDDEV(sr.value) AS vibration_variance_7d
    FROM sensors s
    JOIN sensor_readings sr ON sr.sensor_id = s.id
    WHERE s.sensor_type = 'vibration'
      AND sr.recorded_at >= NOW() - INTERVAL '7 days'
    GROUP BY s.machine_id
)
SELECT
    mh.machine_id,
    mh.model,
    mh.location,
    mh.operational_hours,
    mh.maintenance_interval_hours,
    mh.maintenance_interval_hours - mh.operational_hours AS hours_until_maintenance,
    vt.avg_vibration_7d,
    vt.peak_vibration_7d,
    vt.vibration_variance_7d,
    CASE
        WHEN vt.vibration_variance_7d > 2.0
             AND mh.operational_hours > mh.maintenance_interval_hours * 0.7
            THEN 'SCHEDULE_IMMEDIATELY'
        WHEN vt.avg_vibration_7d > 8.0
            THEN 'INSPECT_WITHIN_24H'
        WHEN mh.operational_hours > mh.maintenance_interval_hours * 0.9
            THEN 'APPROACHING_INTERVAL'
        ELSE 'NORMAL'
    END AS recommendation
FROM machine_hours mh
JOIN vibration_trend vt ON mh.machine_id = vt.machine_id
ORDER BY
    CASE
        WHEN vt.vibration_variance_7d > 2.0 THEN 0
        WHEN vt.avg_vibration_7d > 8.0 THEN 1
        ELSE 2
    END,
    hours_until_maintenance ASC;

Edge-to-Cloud Sync (Python)

Lightweight aggregation sync that runs when connectivity is available:

import psycopg2
import requests
import json
from datetime import datetime, timedelta

class EdgeCloudSync:
    """
    Synchronizes aggregated analytics from edge HeliosDB-Lite instances
    to the central dashboard. Only aggregates are sent -- raw sensor data
    stays on-premises (data sovereignty requirement).

    Sync is opportunistic: runs when network is available, queues when not.
    Each facility operates fully independently during outages.
    """

    def __init__(self, local_dsn: str, central_api: str, facility_id: int):
        self.local = psycopg2.connect(local_dsn)
        self.central_api = central_api
        self.facility_id = facility_id

    def sync_hourly_aggregates(self) -> dict:
        """
        Compute hourly aggregates locally and send to central dashboard.
        Raw data never leaves the facility.
        """
        with self.local.cursor() as cur:
            cur.execute("""
                SELECT
                    s.machine_id,
                    s.sensor_type,
                    DATE_TRUNC('hour', sr.recorded_at) AS hour,
                    COUNT(*) AS sample_count,
                    AVG(sr.value) AS avg_value,
                    MIN(sr.value) AS min_value,
                    MAX(sr.value) AS max_value,
                    STDDEV(sr.value) AS stddev_value,
                    SUM(CASE WHEN sr.quality_flag > 0 THEN 1 ELSE 0 END)
                        AS bad_quality_count
                FROM sensor_readings sr
                JOIN sensors s ON sr.sensor_id = s.id
                WHERE sr.recorded_at >= NOW() - INTERVAL '2 hours'
                  AND sr.recorded_at < DATE_TRUNC('hour', NOW())
                GROUP BY s.machine_id, s.sensor_type,
                         DATE_TRUNC('hour', sr.recorded_at)
                ORDER BY hour DESC
            """)

            aggregates = [
                {
                    "facility_id": self.facility_id,
                    "machine_id": row[0],
                    "sensor_type": row[1],
                    "hour": row[2].isoformat(),
                    "sample_count": row[3],
                    "avg_value": float(row[4]),
                    "min_value": float(row[5]),
                    "max_value": float(row[6]),
                    "stddev_value": float(row[7]) if row[7] else 0.0,
                    "bad_quality_count": row[8],
                }
                for row in cur.fetchall()
            ]

        # Send aggregates to central dashboard (best-effort)
        try:
            resp = requests.post(
                f"{self.central_api}/api/v1/aggregates",
                json={"facility_id": self.facility_id, "data": aggregates},
                timeout=10,
            )
            return {
                "status": "synced",
                "records": len(aggregates),
                "http_status": resp.status_code,
            }
        except requests.ConnectionError:
            # Network unavailable -- will retry next cycle.
            # Local analytics continue uninterrupted.
            return {"status": "queued", "records": len(aggregates)}

    def sync_anomalies(self) -> dict:
        """Sync unacknowledged anomalies to central alerting system."""
        with self.local.cursor() as cur:
            cur.execute("""
                SELECT
                    a.id,
                    a.sensor_id,
                    s.sensor_type,
                    m.machine_type,
                    m.location,
                    a.detected_at,
                    a.anomaly_type,
                    a.severity,
                    a.value,
                    a.threshold,
                    a.context
                FROM anomalies a
                JOIN sensors s ON a.sensor_id = s.id
                JOIN machines m ON s.machine_id = m.id
                WHERE a.acknowledged = FALSE
                  AND a.severity IN ('warning', 'critical')
                ORDER BY a.detected_at DESC
                LIMIT 100
            """)

            anomalies = [
                {
                    "edge_anomaly_id": row[0],
                    "facility_id": self.facility_id,
                    "sensor_id": row[1],
                    "sensor_type": row[2],
                    "machine_type": row[3],
                    "location": row[4],
                    "detected_at": row[5].isoformat(),
                    "anomaly_type": row[6],
                    "severity": row[7],
                    "value": float(row[8]),
                    "threshold": float(row[9]),
                    "context": row[10],
                }
                for row in cur.fetchall()
            ]

        try:
            resp = requests.post(
                f"{self.central_api}/api/v1/anomalies",
                json={"facility_id": self.facility_id, "data": anomalies},
                timeout=10,
            )
            return {
                "status": "synced",
                "anomalies": len(anomalies),
                "http_status": resp.status_code,
            }
        except requests.ConnectionError:
            return {"status": "queued", "anomalies": len(anomalies)}


# Runs on cron every 5 minutes on the edge gateway
syncer = EdgeCloudSync(
    local_dsn="postgresql://edge:password@localhost:5433/factory_floor",
    central_api="https://analytics.internal.example.com",
    facility_id=3,
)

agg_result = syncer.sync_hourly_aggregates()
print(f"Aggregates: {agg_result}")

anomaly_result = syncer.sync_anomalies()
print(f"Anomalies: {anomaly_result}")

Results

Metric Before (InfluxDB + PG + Cloud) After (HeliosDB-Lite on Edge) Improvement
Monthly cloud data transfer$8,200$0Eliminated
Total monthly infrastructure$12,400$1,800 (edge hardware amortized)85% reduction
Query latency (P50)120ms (cloud round-trip)1.8ms (local)98.5% faster
Query latency (P95)340ms4.7ms98.6% faster
Network outage impactTotal analytics blackoutZero (fully autonomous)Eliminated
Anomaly detection delay2-5s (cloud processing)<50ms (local)97-99% faster
Binary sizeN/A (cloud-hosted)60MB (ARM64)Runs on edge hardware
Memory usageN/A (cloud)1.2GB average on 8GB gatewayFits edge constraints
Databases to manage2 (InfluxDB + PostgreSQL)1 (HeliosDB-Lite)50% fewer
Data sovereigntyData leaves facilityData stays on-premisesCompliant
Unplanned downtime incidents47 outages in 6 months0 (locally autonomous)Eliminated

Cost Breakdown

Cost Category Before (Monthly) After (Monthly) Notes
Cloud data transfer / ingestion$8,200$0All processing local
InfluxDB Cloud (time-series)$2,400$0Replaced by HeliosDB-Lite
RDS PostgreSQL (relational)$1,200$0Replaced by HeliosDB-Lite
Custom analytics compute (EC2)$600$0Runs on edge gateways
Edge gateway hardware (amortized)$0$1,8003 gateways at $600/mo amortized over 3 years
Central dashboard (minimal)$0$120Small VM for aggregated views
Total$12,400$1,92084.5% reduction

Key Takeaways

  • A 60MB binary replaced $8K/month in cloud data transfer. The fundamental insight was that most analytics queries only need recent local data. By running HeliosDB-Lite directly on ARM64 edge gateways, sensor data never needs to leave the facility for real-time analytics. Only pre-computed aggregates are synced to the central dashboard.
  • Offline operation prevented a $420K+ incident from recurring. The previous architecture's total dependency on cloud connectivity meant any network outage blinded operators. HeliosDB-Lite runs fully autonomously on each edge gateway. During the 47 network outages in the evaluation period, all three facilities maintained complete analytics capability with zero interruption.
  • Unifying time-series and relational data eliminated query complexity. InfluxDB could store sensor readings efficiently but could not join with machine configuration, maintenance schedules, or quality specifications. HeliosDB-Lite handles both workloads, enabling queries like "show vibration trends for all machines approaching their maintenance interval" that previously required custom application-level joins.
  • Partition pruning made time-range queries predictably fast. Monthly partitions on sensor_readings combined with HeliosDB-Lite's partition pruning means a query for "last 10 minutes of data" only touches the current partition, keeping P95 latency under 5ms even as data accumulates.
  • Data sovereignty compliance came for free. Defense contracts required that raw sensor data from certain production lines not leave the facility. With edge deployment, this requirement was satisfied architecturally rather than through access controls or encryption -- the data simply never traverses a network.

Technical Stack

Component Technology
DatabaseHeliosDB-Lite (embedded, 60MB ARM64 binary)
Edge HardwareAdvantech UNO-2484G (ARM64, 8GB RAM, 256GB NVMe)
Ingestion ProtocolPostgreSQL wire protocol (COPY for bulk inserts)
Application LanguageGo 1.22 (ingestion + analytics), Python 3.12 (sync service)
Driverpgx v5 (Go), psycopg2 (Python)
Sensor ProtocolOPC-UA via open62541, MQTT via Eclipse Paho
Edge OSUbuntu 22.04 LTS (ARM64)
Central DashboardGrafana (reads aggregated data from central PostgreSQL)
PartitioningPARTITION BY RANGE on timestamp (monthly partitions)
Previous Stack (replaced)InfluxDB Cloud, PostgreSQL 15 (RDS), Node.js analytics on EC2

Ready to see similar results?

Schedule a demo to see how HeliosDB can transform your data infrastructure.

Schedule Demo More Case Studies