Skip to content

Use Case Examples

Real-world implementations with HeliosDB Nano


| Use Case | Description | Complexity |
| --- | --- | --- |
| RAG Backend | Retrieval-Augmented Generation | Medium |
| AI Agent Memory | Persistent conversation context | Easy |
| Multi-Tenant SaaS | Row-level security platform | Medium |
| Semantic Search | Product/document similarity | Easy |
| Event Analytics | User behavior tracking | Medium |
| Feature Flags | Dynamic configuration | Easy |

RAG Backend

Build a complete Retrieval-Augmented Generation system with HeliosDB Nano as the vector store.

Architecture

┌─────────────┐ ┌──────────────┐ ┌─────────────┐
│ Documents │────▶│ Embeddings │────▶│ HeliosDB │
│ (PDF, MD) │ │ (OpenAI) │ │ (HNSW+PQ) │
└─────────────┘ └──────────────┘ └─────────────┘
┌─────────────┐ ┌──────────────┐ │
│ User Query │────▶│ LLM (GPT-4) │◀──────────┘
└─────────────┘ └──────────────┘ Retrieved Context

Setup

Terminal window
# Create documents table with vector column.
# VECTOR(1536) matches the output dimension of OpenAI's
# text-embedding-3-small model used by the Python client below.
curl -X POST http://localhost:6543/api/v1/query \
-H "Authorization: Bearer $TOKEN" \
-d '{
"sql": "CREATE TABLE documents (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source TEXT NOT NULL,
chunk_index INTEGER NOT NULL,
content TEXT NOT NULL,
embedding VECTOR(1536) NOT NULL,
metadata JSONB,
created_at TIMESTAMP DEFAULT now()
)"
}'
# Create HNSW index with Product Quantization.
# Higher m / ef_construction generally trade slower index builds for
# better recall -- confirm tuning guidance against the HeliosDB docs.
curl -X POST http://localhost:6543/api/v1/query \
-H "Authorization: Bearer $TOKEN" \
-d '{
"sql": "CREATE INDEX documents_embedding_idx ON documents
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 200)"
}'

Python Implementation

import openai
import requests
from typing import List, Dict
class RAGBackend:
    """Retrieval-Augmented Generation pipeline backed by HeliosDB Nano.

    Documents are split into word chunks, embedded with OpenAI, and
    stored in a ``documents`` table with a VECTOR column; queries
    retrieve the nearest chunks and feed them to GPT-4 as context.
    """

    def __init__(self, helios_url: str, helios_token: str, openai_key: str):
        self.helios_url = helios_url
        self.token = helios_token
        openai.api_key = openai_key

    def _headers(self) -> Dict[str, str]:
        """Common auth/content-type headers for HeliosDB requests."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }

    def embed_text(self, text: str) -> List[float]:
        """Generate an embedding for *text* using OpenAI (1536 dims)."""
        response = openai.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding

    def chunk_document(self, content: str, chunk_size: int = 500) -> List[str]:
        """Split *content* into chunks of at most *chunk_size* words.

        Returns an empty list for empty/whitespace-only content.
        """
        words = content.split()
        return [
            " ".join(words[i:i + chunk_size])
            for i in range(0, len(words), chunk_size)
        ]

    def ingest_document(self, source: str, content: str, metadata: Dict = None):
        """Chunk, embed, and store a document.

        Raises requests.HTTPError if any insert fails. (The original
        ignored the response, so failed inserts were dropped silently.)
        """
        for idx, chunk in enumerate(self.chunk_document(content)):
            embedding = self.embed_text(chunk)
            resp = requests.post(
                f"{self.helios_url}/api/v1/query",
                headers=self._headers(),
                json={
                    "sql": """
                    INSERT INTO documents (source, chunk_index, content, embedding, metadata)
                    VALUES ($1, $2, $3, $4, $5)
                    """,
                    "params": [source, idx, chunk, embedding, metadata or {}]
                }
            )
            resp.raise_for_status()  # fail loudly instead of losing chunks

    def search(self, query: str, limit: int = 5) -> List[Dict]:
        """Return the *limit* most similar document chunks for *query*."""
        query_embedding = self.embed_text(query)
        response = requests.post(
            f"{self.helios_url}/api/v1/vectors/search",
            headers=self._headers(),
            json={
                "table": "documents",
                "column": "embedding",
                "query_vector": query_embedding,
                "limit": limit,
                "metric": "cosine"
            }
        )
        response.raise_for_status()
        return response.json()["data"]["results"]

    def answer(self, question: str) -> str:
        """RAG pipeline: retrieve context, then generate an answer."""
        # 1. Retrieve relevant chunks
        context_chunks = self.search(question, limit=5)
        context = "\n\n".join(c["content"] for c in context_chunks)
        # 2. Generate answer with the retrieved context as system prompt
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system", "content": f"Answer based on this context:\n{context}"},
                {"role": "user", "content": question}
            ]
        )
        return response.choices[0].message.content
# Usage
rag = RAGBackend(
    helios_url="http://localhost:6543",
    helios_token="your-token",
    openai_key="your-openai-key"
)

# Ingest documents. The context manager closes the file handle;
# the original bare open(...).read() leaked it.
with open("README.md", encoding="utf-8") as f:
    rag.ingest_document("docs/readme.md", f.read())

# Query
answer = rag.answer("How do I set up authentication?")
print(answer)

AI Agent Memory

Implement persistent memory for AI agents with automatic summarization.

Setup

-- Sessions table: one row per agent conversation.
-- token_count accumulates transcript size so clients can decide when to
-- summarize; summary stores the compacted transcript once generated.
CREATE TABLE agent_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
agent_id TEXT NOT NULL,
-- nullable: sessions may be anonymous
user_id TEXT,
started_at TIMESTAMP DEFAULT now(),
last_active TIMESTAMP DEFAULT now(),
token_count INTEGER DEFAULT 0,
summary TEXT,
metadata JSONB
);
-- Messages table: append-only transcript, one row per turn.
CREATE TABLE agent_messages (
id SERIAL PRIMARY KEY,
session_id UUID REFERENCES agent_sessions(id),
-- 'user' / 'assistant' in the examples; confirm other roles server-side
role TEXT NOT NULL,
content TEXT NOT NULL,
token_count INTEGER,
created_at TIMESTAMP DEFAULT now()
);
-- Create index for fast session lookup
-- (serves "most recently active sessions for an agent" newest-first)
CREATE INDEX idx_sessions_agent ON agent_sessions(agent_id, last_active DESC);

JavaScript Implementation

/**
 * Persistent conversation memory for AI agents, backed by the
 * HeliosDB agents API. Sessions hold metadata (token_count, summary);
 * messages are the transcript.
 */
class AgentMemory {
  private baseUrl: string;
  private token: string;

  constructor(baseUrl: string, token: string) {
    this.baseUrl = baseUrl;
    this.token = token;
  }

  /** Create a session for an agent (optionally tied to a user); returns the session id. */
  async createSession(agentId: string, userId?: string): Promise<string> {
    const response = await fetch(`${this.baseUrl}/api/v1/agents/sessions`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({ agent_id: agentId, user_id: userId })
    });
    const data = await response.json();
    return data.data.session_id;
  }

  /** Append one message (role + content) to a session's transcript. */
  async addMessage(sessionId: string, role: string, content: string): Promise<void> {
    await fetch(`${this.baseUrl}/api/v1/agents/sessions/${sessionId}/messages`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({ role, content })
    });
  }

  /**
   * Fetch session metadata (token_count, summary, ...).
   * Fix: summarizeIfNeeded called this.getSession(), which was never
   * defined on the class — calling it threw a TypeError.
   * NOTE(review): assumes GET on the session URL without
   * ?include=messages returns the bare session row — confirm.
   */
  async getSession(sessionId: string): Promise<any> {
    const response = await fetch(
      `${this.baseUrl}/api/v1/agents/sessions/${sessionId}`,
      { headers: this.headers() }
    );
    const data = await response.json();
    return data.data;
  }

  /** Fetch the most recent messages of a session (newest window of `limit`). */
  async getHistory(sessionId: string, limit: number = 50): Promise<Message[]> {
    const response = await fetch(
      `${this.baseUrl}/api/v1/agents/sessions/${sessionId}?include=messages&limit=${limit}`,
      { headers: this.headers() }
    );
    const data = await response.json();
    return data.data.messages;
  }

  /** Ask the server to compact the transcript once it exceeds maxTokens. */
  async summarizeIfNeeded(sessionId: string, maxTokens: number = 4000): Promise<void> {
    const session = await this.getSession(sessionId);
    if (session.token_count > maxTokens) {
      // Target roughly half the budget so summarization isn't re-triggered
      // immediately by the next few messages.
      await fetch(`${this.baseUrl}/api/v1/agents/sessions/${sessionId}/summarize`, {
        method: "POST",
        headers: this.headers(),
        body: JSON.stringify({ target_tokens: maxTokens / 2 })
      });
    }
  }

  private headers() {
    return {
      "Authorization": `Bearer ${this.token}`,
      "Content-Type": "application/json"
    };
  }
}
// Usage
const memory = new AgentMemory("http://localhost:6543", "your-token");
// Start conversation
const sessionId = await memory.createSession("support-bot", "user-123");
// Add messages
await memory.addMessage(sessionId, "user", "How do I reset my password?");
await memory.addMessage(sessionId, "assistant", "To reset your password...");
// Get context for next response
const history = await memory.getHistory(sessionId);
// Summarize long conversations
await memory.summarizeIfNeeded(sessionId);

Multi-Tenant SaaS

Build a SaaS platform with automatic tenant isolation using Row-Level Security.

Setup

-- Enable multi-tenancy
-- uuid-ossp provides UUID helpers; note gen_random_uuid() below comes
-- from pgcrypto / core in newer Postgres -- confirm which is required.
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";
-- Tenants table (managed by system)
CREATE TABLE tenants (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name TEXT NOT NULL,
plan TEXT DEFAULT 'free',
created_at TIMESTAMP DEFAULT now()
);
-- Your application tables (with tenant_id)
CREATE TABLE projects (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
tenant_id UUID NOT NULL REFERENCES tenants(id),
name TEXT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT now()
);
-- Row-Level Security Policy: a row is visible only when tenant_id
-- matches the per-connection setting app.tenant_id (established by the
-- tenant-scoped token).
-- NOTE(review): table owners/superusers bypass RLS unless FORCE ROW
-- LEVEL SECURITY is applied -- confirm the API connects as a non-owner
-- role. Inserts that omit tenant_id (as the usage example does) rely on
-- the server injecting it -- confirm.
CREATE POLICY projects_tenant_isolation ON projects
USING (tenant_id = current_setting('app.tenant_id')::uuid);
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;

Implementation

import requests
class TenantDB:
    """Multi-tenant helper: admin provisioning plus tenant-scoped queries.

    Row-Level Security on the server restricts each tenant-scoped
    token to rows whose tenant_id matches that token's tenant.
    """

    def __init__(self, base_url: str, admin_token: str):
        self.base_url = base_url
        self.admin_token = admin_token

    def create_tenant(self, name: str, plan: str = "free") -> dict:
        """Create a new tenant row and return it.

        Raises requests.HTTPError on failure. (Previously an auth or
        SQL error surfaced as an opaque KeyError on the JSON body.)
        """
        response = requests.post(
            f"{self.base_url}/api/v1/query",
            headers=self._admin_headers(),
            json={
                "sql": "INSERT INTO tenants (name, plan) VALUES ($1, $2) RETURNING *",
                "params": [name, plan]
            }
        )
        response.raise_for_status()
        return response.json()["data"]["rows"][0]

    def get_tenant_token(self, tenant_id: str) -> str:
        """Exchange the admin token for a token scoped to *tenant_id*."""
        response = requests.post(
            f"{self.base_url}/auth/v1/token/tenant",
            headers=self._admin_headers(),
            json={"tenant_id": tenant_id}
        )
        response.raise_for_status()
        return response.json()["access_token"]

    def query_as_tenant(self, tenant_token: str, sql: str, params: list = None):
        """Execute *sql* under the tenant's RLS context and return the JSON body."""
        response = requests.post(
            f"{self.base_url}/api/v1/query",
            headers={
                "Authorization": f"Bearer {tenant_token}",
                "Content-Type": "application/json"
            },
            json={"sql": sql, "params": params or []}
        )
        response.raise_for_status()
        return response.json()

    def _admin_headers(self) -> dict:
        """Headers carrying the admin bearer token."""
        return {
            "Authorization": f"Bearer {self.admin_token}",
            "Content-Type": "application/json"
        }
# Usage: provision two tenants and demonstrate RLS isolation.
tenant_db = TenantDB("http://localhost:6543", "admin-token")

# Create tenants
acme_tenant = tenant_db.create_tenant("Acme Corp", "pro")
startup_tenant = tenant_db.create_tenant("Cool Startup", "free")

# Mint a token scoped to each tenant
acme_token = tenant_db.get_tenant_token(acme_tenant["id"])
startup_token = tenant_db.get_tenant_token(startup_tenant["id"])

# Writes land in the caller's own tenant
tenant_db.query_as_tenant(acme_token, "INSERT INTO projects (name) VALUES ('Acme Project')")
tenant_db.query_as_tenant(startup_token, "INSERT INTO projects (name) VALUES ('Startup MVP')")

# Reads are filtered to the caller's tenant by RLS
acme_projects = tenant_db.query_as_tenant(acme_token, "SELECT * FROM projects")
# Returns: [{"name": "Acme Project", ...}]
startup_projects = tenant_db.query_as_tenant(startup_token, "SELECT * FROM projects")
# Returns: [{"name": "Startup MVP", ...}]

Semantic Search

Implement product search with natural language understanding.

Setup

-- Product catalog with a 384-dim embedding column
-- (384 matches the all-MiniLM-L6-v2 sentence-transformers model below).
CREATE TABLE products (
id SERIAL PRIMARY KEY,
name TEXT NOT NULL,
description TEXT,
category TEXT,
price DECIMAL(10,2),
embedding VECTOR(384), -- sentence-transformers dimension
created_at TIMESTAMP DEFAULT now()
);
-- HNSW index using cosine distance. Queries must use the cosine
-- operator (<=>) for this index to be eligible; the L2 operator (<->)
-- does not match a vector_cosine_ops index.
CREATE INDEX products_embedding_idx ON products
USING hnsw (embedding vector_cosine_ops);

Implementation

from sentence_transformers import SentenceTransformer
import requests
class SemanticSearch:
    """Natural-language product search using sentence-transformers embeddings."""

    def __init__(self, helios_url: str, token: str):
        self.helios_url = helios_url
        self.token = token
        # 384-dim model; must match VECTOR(384) in the products schema
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

    def index_product(self, name: str, description: str, category: str, price: float):
        """Embed name/description/category together and store one product row."""
        text = f"{name}. {description}. Category: {category}"
        embedding = self.model.encode(text).tolist()
        resp = requests.post(
            f"{self.helios_url}/api/v1/query",
            headers=self._headers(),
            json={
                "sql": """
                INSERT INTO products (name, description, category, price, embedding)
                VALUES ($1, $2, $3, $4, $5)
                """,
                "params": [name, description, category, price, embedding]
            }
        )
        resp.raise_for_status()  # don't silently drop failed inserts

    def search(self, query: str, category: str = None, max_price: float = None, limit: int = 10):
        """Search products by natural-language *query*, optionally filtered.

        Fixes vs. original:
        - uses the cosine-distance operator ``<=>`` so the
          vector_cosine_ops HNSW index is usable (``<->`` is L2 distance
          and would bypass a cosine index);
        - filter checks use ``is not None`` so ``max_price=0`` or
          ``category=""`` are not silently dropped.

        The returned ``similarity`` column is a cosine distance:
        lower means more similar.
        """
        query_embedding = self.model.encode(query).tolist()
        # Build filter conditions with sequentially-numbered placeholders
        # ($1 = embedding, $2 = limit are already taken).
        filters = []
        params = [query_embedding, limit]
        if category is not None:
            filters.append(f"category = ${len(params) + 1}")
            params.append(category)
        if max_price is not None:
            filters.append(f"price <= ${len(params) + 1}")
            params.append(max_price)
        where_clause = f"WHERE {' AND '.join(filters)}" if filters else ""
        response = requests.post(
            f"{self.helios_url}/api/v1/query",
            headers=self._headers(),
            json={
                "sql": f"""
                SELECT id, name, description, category, price,
                embedding <=> $1 AS similarity
                FROM products
                {where_clause}
                ORDER BY embedding <=> $1
                LIMIT $2
                """,
                "params": params
            }
        )
        response.raise_for_status()
        return response.json()["data"]["rows"]

    def _headers(self) -> dict:
        """Auth/content-type headers for HeliosDB requests."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }
# Usage: index one product, then query it in plain English.
searcher = SemanticSearch("http://localhost:6543", "your-token")

# Index products (name + description + category are embedded together)
searcher.index_product(
    "Wireless Noise-Canceling Headphones",
    "Premium over-ear headphones with ANC and 30-hour battery",
    "electronics",
    299.99
)

# Natural language search constrained by structured filters
results = searcher.search(
    "headphones for working from home",
    category="electronics",
    max_price=350
)

Event Analytics

Track user behavior with time-series queries.

Setup

-- Append-only event log; BIGSERIAL since event tables grow quickly.
CREATE TABLE events (
id BIGSERIAL PRIMARY KEY,
user_id TEXT NOT NULL,
event_type TEXT NOT NULL,
-- free-form event payload (page, product_id, order totals, ...)
properties JSONB,
timestamp TIMESTAMP DEFAULT now()
);
-- Partition by month for performance
-- (conceptual - adjust for your needs)
-- Composite indexes match the two query shapes used by the client:
-- per-user history and per-event-type metrics, both newest-first.
CREATE INDEX events_user_time_idx ON events (user_id, timestamp DESC);
CREATE INDEX events_type_time_idx ON events (event_type, timestamp DESC);

Implementation

/** Lightweight event-analytics client over the HeliosDB query API. */
class EventTracker {
  constructor(private baseUrl: string, private token: string) {}

  /** Record a single event for a user. */
  async track(userId: string, eventType: string, properties: Record<string, any> = {}) {
    await fetch(`${this.baseUrl}/api/v1/query`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({
        sql: "INSERT INTO events (user_id, event_type, properties) VALUES ($1, $2, $3)",
        params: [userId, eventType, properties]
      })
    });
  }

  /** Fetch a user's most recent events (max 100) since a date, optionally filtered by type. */
  async getUserEvents(userId: string, since: Date, eventTypes?: string[]): Promise<any[]> {
    let sql = `
      SELECT * FROM events
      WHERE user_id = $1 AND timestamp >= $2
    `;
    const params: any[] = [userId, since.toISOString()];
    if (eventTypes?.length) {
      sql += ` AND event_type = ANY($3)`;
      params.push(eventTypes);
    }
    sql += ` ORDER BY timestamp DESC LIMIT 100`;
    const response = await fetch(`${this.baseUrl}/api/v1/query`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({ sql, params })
    });
    return (await response.json()).data.rows;
  }

  /**
   * Aggregate event counts and unique users per time bucket over a fixed
   * 30-day window.
   *
   * Fix: Postgres date_trunc() accepts only a field name ('day', 'hour'),
   * not an interval string — passing '1 day' straight through, as the
   * original did, is a SQL error. We strip a leading count so both 'day'
   * and the legacy '1 day' form keep working.
   */
  async getMetrics(eventType: string, interval: string = '1 day'): Promise<any> {
    const truncField = interval.trim().replace(/^\d+\s+/, "");
    const response = await fetch(`${this.baseUrl}/api/v1/query`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({
        sql: `
          SELECT
            date_trunc($2, timestamp) AS period,
            COUNT(*) AS event_count,
            COUNT(DISTINCT user_id) AS unique_users
          FROM events
          WHERE event_type = $1
            AND timestamp >= now() - interval '30 days'
          GROUP BY period
          ORDER BY period DESC
        `,
        params: [eventType, truncField]
      })
    });
    return (await response.json()).data.rows;
  }

  private headers() {
    return {
      "Authorization": `Bearer ${this.token}`,
      "Content-Type": "application/json"
    };
  }
}
// Usage: record a small purchase funnel, then pull daily metrics.
const events = new EventTracker("http://localhost:6543", "your-token");

// One user's journey: view -> cart -> purchase
await events.track("user-123", "page_view", { page: "/products", referrer: "google" });
await events.track("user-123", "add_to_cart", { product_id: "prod-456", price: 29.99 });
await events.track("user-123", "purchase", { order_id: "order-789", total: 29.99 });

// Daily purchase counts over the trailing window
const dailyPurchases = await events.getMetrics("purchase", "1 day");

Feature Flags

Implement dynamic feature flags with database-backed configuration.

Setup

-- Feature flag definitions, keyed by a human-readable flag id.
-- enabled is the master switch; rollout_percentage (0-100) gates by a
-- deterministic per-user bucket; target_users / target_plans allow
-- explicit targeting. NULL targets mean "no restriction".
CREATE TABLE feature_flags (
id TEXT PRIMARY KEY,
enabled BOOLEAN DEFAULT false,
rollout_percentage INTEGER DEFAULT 0,
target_users TEXT[],
target_plans TEXT[],
metadata JSONB,
updated_at TIMESTAMP DEFAULT now()
);
-- Insert some flags
-- new_checkout: on for 50% of pro/enterprise users;
-- dark_mode: fully rolled out; beta_features: defined but disabled.
INSERT INTO feature_flags (id, enabled, rollout_percentage, target_plans) VALUES
('new_checkout', true, 50, ARRAY['pro', 'enterprise']),
('dark_mode', true, 100, NULL),
('beta_features', false, 0, ARRAY['enterprise']);

Implementation

import hashlib
import requests
class FeatureFlags:
    """Database-backed feature flags with per-user percentage rollout.

    Flag rows are cached in-process after first fetch. The cache has
    no TTL, so a long-lived process will not observe flag updates
    without clearing ``_cache`` or restarting.
    """

    def __init__(self, helios_url: str, token: str):
        self.helios_url = helios_url
        self.token = token
        self._cache = {}  # flag_id -> row dict, or None for missing flags

    def is_enabled(self, flag_id: str, user_id: str = None, plan: str = None) -> bool:
        """Check whether *flag_id* is enabled for this user/plan.

        Evaluation order: master switch, explicit user targeting,
        plan targeting, then deterministic percentage rollout.
        """
        flag = self._get_flag(flag_id)
        if not flag or not flag["enabled"]:
            return False
        # Explicitly targeted users bypass the plan and rollout checks.
        if flag.get("target_users") and user_id:
            if user_id in flag["target_users"]:
                return True
        # Plan targeting is exclusive: a non-matching plan is always off.
        if flag.get("target_plans") and plan:
            if plan not in flag["target_plans"]:
                return False
        # Percentage rollout: stable per (flag, user) bucket.
        if flag["rollout_percentage"] < 100 and user_id:
            bucket = self._get_bucket(flag_id, user_id)
            return bucket < flag["rollout_percentage"]
        # No user id to bucket on: only fully-rolled-out flags are on.
        return flag["rollout_percentage"] == 100

    def _get_flag(self, flag_id: str) -> dict:
        """Fetch a flag row from the database, caching the result.

        Raises requests.HTTPError on a failed request. (Previously a
        failure surfaced as an opaque KeyError on the JSON body.)
        """
        if flag_id in self._cache:
            return self._cache[flag_id]
        response = requests.post(
            f"{self.helios_url}/api/v1/query",
            headers={
                "Authorization": f"Bearer {self.token}",
                "Content-Type": "application/json"
            },
            json={
                "sql": "SELECT * FROM feature_flags WHERE id = $1",
                "params": [flag_id]
            }
        )
        response.raise_for_status()
        rows = response.json()["data"]["rows"]
        flag = rows[0] if rows else None
        # NOTE: a missing flag is cached as None, i.e. permanently "off"
        # for this process even if the row is created later.
        self._cache[flag_id] = flag
        return flag

    def _get_bucket(self, flag_id: str, user_id: str) -> int:
        """Deterministic bucket in [0, 100) for percentage rollout.

        MD5 is used for a stable, evenly-spread hash, not for security.
        """
        key = f"{flag_id}:{user_id}"
        hash_val = hashlib.md5(key.encode()).hexdigest()
        return int(hash_val[:8], 16) % 100
# Usage: branch the checkout flow on a flag evaluation.
flag_client = FeatureFlags("http://localhost:6543", "your-token")

current_user = "user-123"
current_plan = "pro"

if flag_client.is_enabled("new_checkout", current_user, current_plan):
    pass  # show the new checkout
else:
    pass  # fall back to the old checkout

Next Steps


Need help? Join Discord | File Issue