Use Case Examples
Real-world implementations with HeliosDB Nano
Quick Links
| Use Case | Description | Complexity |
|---|---|---|
| RAG Backend | Retrieval-Augmented Generation | Medium |
| AI Agent Memory | Persistent conversation context | Easy |
| Multi-Tenant SaaS | Row-level security platform | Medium |
| Semantic Search | Product/document similarity | Easy |
| Event Analytics | User behavior tracking | Medium |
| Feature Flags | Dynamic configuration | Easy |
RAG Backend
Build a complete Retrieval-Augmented Generation system with HeliosDB Nano as the vector store.
Architecture
┌─────────────┐      ┌──────────────┐      ┌─────────────┐
│  Documents  │────▶│  Embeddings   │────▶│  HeliosDB   │
│  (PDF, MD)  │      │   (OpenAI)   │      │  (HNSW+PQ)  │
└─────────────┘      └──────────────┘      └─────────────┘
                                                  │
┌─────────────┐      ┌──────────────┐             │
│ User Query  │────▶│ LLM (GPT-4)   │◀───────────┘
└─────────────┘      └──────────────┘  Retrieved Context
Setup
# Create documents table with vector column
curl -X POST http://localhost:6543/api/v1/query \
  -H "Authorization: Bearer $TOKEN" \
  -d '{ "sql": "CREATE TABLE documents ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), source TEXT NOT NULL, chunk_index INTEGER NOT NULL, content TEXT NOT NULL, embedding VECTOR(1536) NOT NULL, metadata JSONB, created_at TIMESTAMP DEFAULT now() )" }'

# Create HNSW index with Product Quantization
curl -X POST http://localhost:6543/api/v1/query \
  -H "Authorization: Bearer $TOKEN" \
  -d '{ "sql": "CREATE INDEX documents_embedding_idx ON documents USING hnsw (embedding vector_cosine_ops) WITH (m = 16, ef_construction = 200)" }'
Python Implementation
import openai
import requests
from typing import Dict, List, Optional
class RAGBackend:
    """Minimal RAG pipeline backed by HeliosDB Nano.

    Chunks and embeds documents into the ``documents`` table, retrieves the
    most relevant chunks for a question, and answers with an OpenAI chat model.
    """

    def __init__(self, helios_url: str, helios_token: str, openai_key: str):
        self.helios_url = helios_url
        self.token = helios_token
        # NOTE: mutates the process-global OpenAI client configuration;
        # fine for one backend per process, not for multiple keys at once.
        openai.api_key = openai_key

    def _headers(self) -> Dict[str, str]:
        """HTTP headers for authenticated HeliosDB requests."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }

    def embed_text(self, text: str) -> List[float]:
        """Generate an embedding for *text* using OpenAI."""
        response = openai.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    def chunk_document(self, content: str, chunk_size: int = 500) -> List[str]:
        """Split *content* into chunks of at most *chunk_size* words.

        Returns an empty list for empty or whitespace-only content.
        """
        words = content.split()
        return [
            " ".join(words[i:i + chunk_size])
            for i in range(0, len(words), chunk_size)
        ]

    def ingest_document(self, source: str, content: str,
                        metadata: Optional[Dict] = None) -> None:
        """Chunk, embed, and store a document.

        Raises:
            requests.HTTPError: if any insert is rejected. (The original
            version silently ignored failed inserts.)
        """
        for idx, chunk in enumerate(self.chunk_document(content)):
            embedding = self.embed_text(chunk)
            response = requests.post(
                f"{self.helios_url}/api/v1/query",
                headers=self._headers(),
                json={
                    "sql": """
                        INSERT INTO documents (source, chunk_index, content, embedding, metadata)
                        VALUES ($1, $2, $3, $4, $5)
                    """,
                    "params": [source, idx, chunk, embedding, metadata or {}],
                },
                timeout=30,
            )
            response.raise_for_status()

    def search(self, query: str, limit: int = 5) -> List[Dict]:
        """Return the *limit* most relevant document chunks for *query*."""
        query_embedding = self.embed_text(query)
        response = requests.post(
            f"{self.helios_url}/api/v1/vectors/search",
            headers=self._headers(),
            json={
                "table": "documents",
                "column": "embedding",
                "query_vector": query_embedding,
                "limit": limit,
                "metric": "cosine",
            },
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["data"]["results"]

    def answer(self, question: str) -> str:
        """RAG pipeline: retrieve context and generate a grounded answer."""
        # 1. Retrieve relevant chunks
        context_chunks = self.search(question, limit=5)
        context = "\n\n".join(c["content"] for c in context_chunks)

        # 2. Generate answer with context
        response = openai.chat.completions.create(
            model="gpt-4",
            messages=[
                {"role": "system",
                 "content": f"Answer based on this context:\n{context}"},
                {"role": "user", "content": question},
            ],
        )
        return response.choices[0].message.content
# Usagerag = RAGBackend( helios_url="http://localhost:6543", helios_token="your-token", openai_key="your-openai-key")
# Ingest documentsrag.ingest_document("docs/readme.md", open("README.md").read())
# Queryanswer = rag.answer("How do I set up authentication?")print(answer)AI Agent Memory
Implement persistent memory for AI agents with automatic summarization.
Setup
-- Sessions table
CREATE TABLE agent_sessions (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    agent_id TEXT NOT NULL,
    user_id TEXT,
    started_at TIMESTAMP DEFAULT now(),
    last_active TIMESTAMP DEFAULT now(),
    token_count INTEGER DEFAULT 0,
    summary TEXT,
    metadata JSONB
);

-- Messages table
CREATE TABLE agent_messages (
    id SERIAL PRIMARY KEY,
    session_id UUID REFERENCES agent_sessions(id),
    role TEXT NOT NULL,
    content TEXT NOT NULL,
    token_count INTEGER,
    created_at TIMESTAMP DEFAULT now()
);

-- Create index for fast session lookup
CREATE INDEX idx_sessions_agent ON agent_sessions(agent_id, last_active DESC);
TypeScript Implementation
/**
 * Persistent conversation memory for AI agents, backed by HeliosDB's
 * agent-sessions API.
 */
class AgentMemory {
  private baseUrl: string;
  private token: string;

  constructor(baseUrl: string, token: string) {
    this.baseUrl = baseUrl;
    this.token = token;
  }

  /** Create a new conversation session and return its id. */
  async createSession(agentId: string, userId?: string): Promise<string> {
    const response = await fetch(`${this.baseUrl}/api/v1/agents/sessions`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({ agent_id: agentId, user_id: userId })
    });
    const data = await response.json();
    return data.data.session_id;
  }

  /** Append one message (role + content) to a session. */
  async addMessage(sessionId: string, role: string, content: string): Promise<void> {
    await fetch(`${this.baseUrl}/api/v1/agents/sessions/${sessionId}/messages`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({ role, content })
    });
  }

  /**
   * Fetch session metadata (token_count, summary, ...) without messages.
   * BUG FIX: summarizeIfNeeded called this.getSession(), which was never
   * defined in the original class.
   * NOTE(review): assumes the session endpoint wraps its payload in
   * { data: ... } like the other endpoints — confirm against the API.
   */
  async getSession(sessionId: string): Promise<any> {
    const response = await fetch(
      `${this.baseUrl}/api/v1/agents/sessions/${sessionId}`,
      { headers: this.headers() }
    );
    return (await response.json()).data;
  }

  /** Fetch up to `limit` messages for a session. */
  async getHistory(sessionId: string, limit: number = 50): Promise<Message[]> {
    const response = await fetch(
      `${this.baseUrl}/api/v1/agents/sessions/${sessionId}?include=messages&limit=${limit}`,
      { headers: this.headers() }
    );
    const data = await response.json();
    return data.data.messages;
  }

  /** Ask the server to summarize once the token count exceeds maxTokens. */
  async summarizeIfNeeded(sessionId: string, maxTokens: number = 4000): Promise<void> {
    const session = await this.getSession(sessionId);

    if (session.token_count > maxTokens) {
      await fetch(`${this.baseUrl}/api/v1/agents/sessions/${sessionId}/summarize`, {
        method: "POST",
        headers: this.headers(),
        // Halve the budget so the summary leaves room for new messages.
        body: JSON.stringify({ target_tokens: maxTokens / 2 })
      });
    }
  }

  private headers() {
    return {
      "Authorization": `Bearer ${this.token}`,
      "Content-Type": "application/json"
    };
  }
}
// Usageconst memory = new AgentMemory("http://localhost:6543", "your-token");
// Start conversationconst sessionId = await memory.createSession("support-bot", "user-123");
// Add messagesawait memory.addMessage(sessionId, "user", "How do I reset my password?");await memory.addMessage(sessionId, "assistant", "To reset your password...");
// Get context for next responseconst history = await memory.getHistory(sessionId);
// Summarize long conversationsawait memory.summarizeIfNeeded(sessionId);Multi-Tenant SaaS
Build a SaaS platform with automatic tenant isolation using Row-Level Security.
Setup
-- Enable multi-tenancy
CREATE EXTENSION IF NOT EXISTS "uuid-ossp";

-- Tenants table (managed by system)
CREATE TABLE tenants (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name TEXT NOT NULL,
    plan TEXT DEFAULT 'free',
    created_at TIMESTAMP DEFAULT now()
);

-- Your application tables (with tenant_id)
-- FIX: default tenant_id from the session setting so tenant-scoped inserts
-- like "INSERT INTO projects (name) VALUES (...)" satisfy the NOT NULL
-- constraint without naming tenant_id explicitly.
CREATE TABLE projects (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    tenant_id UUID NOT NULL
        DEFAULT current_setting('app.tenant_id')::uuid
        REFERENCES tenants(id),
    name TEXT NOT NULL,
    description TEXT,
    created_at TIMESTAMP DEFAULT now()
);

-- Row-Level Security policy: each tenant sees only its own rows.
-- (With no WITH CHECK clause, the USING expression also guards writes.)
CREATE POLICY projects_tenant_isolation ON projects
    USING (tenant_id = current_setting('app.tenant_id')::uuid);

ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
Implementation
import requests
class TenantDB:
    """Admin-side helper for a multi-tenant HeliosDB deployment.

    Creates tenants, mints tenant-scoped tokens, and runs queries inside a
    tenant context (Row-Level Security limits what each token can see).
    """

    def __init__(self, base_url: str, admin_token: str):
        self.base_url = base_url
        self.admin_token = admin_token

    def create_tenant(self, name: str, plan: str = "free") -> dict:
        """Create a new tenant and return its row.

        Raises requests.HTTPError on failure (the original indexed into the
        response body without checking the status, masking errors as KeyError).
        """
        response = requests.post(
            f"{self.base_url}/api/v1/query",
            headers=self._admin_headers(),
            json={
                "sql": "INSERT INTO tenants (name, plan) VALUES ($1, $2) RETURNING *",
                "params": [name, plan],
            },
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["data"]["rows"][0]

    def get_tenant_token(self, tenant_id: str) -> str:
        """Exchange the admin token for a token scoped to *tenant_id*."""
        response = requests.post(
            f"{self.base_url}/auth/v1/token/tenant",
            headers=self._admin_headers(),
            json={"tenant_id": tenant_id},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["access_token"]

    def query_as_tenant(self, tenant_token: str, sql: str, params: list = None):
        """Execute *sql* under a tenant-scoped token."""
        response = requests.post(
            f"{self.base_url}/api/v1/query",
            headers={
                "Authorization": f"Bearer {tenant_token}",
                "Content-Type": "application/json",
            },
            json={"sql": sql, "params": params or []},
            timeout=30,
        )
        response.raise_for_status()
        return response.json()

    def _admin_headers(self) -> dict:
        """HTTP headers carrying the admin bearer token."""
        return {
            "Authorization": f"Bearer {self.admin_token}",
            "Content-Type": "application/json",
        }
# Usagedb = TenantDB("http://localhost:6543", "admin-token")
# Create tenantsacme = db.create_tenant("Acme Corp", "pro")startup = db.create_tenant("Cool Startup", "free")
# Get tenant-scoped tokensacme_token = db.get_tenant_token(acme["id"])startup_token = db.get_tenant_token(startup["id"])
# Each tenant only sees their own datadb.query_as_tenant(acme_token, "INSERT INTO projects (name) VALUES ('Acme Project')")db.query_as_tenant(startup_token, "INSERT INTO projects (name) VALUES ('Startup MVP')")
# Acme sees only Acme's projectsacme_projects = db.query_as_tenant(acme_token, "SELECT * FROM projects")# Returns: [{"name": "Acme Project", ...}]
# Startup sees only Startup's projectsstartup_projects = db.query_as_tenant(startup_token, "SELECT * FROM projects")# Returns: [{"name": "Startup MVP", ...}]Semantic Search
Implement product search with natural language understanding.
Setup
CREATE TABLE products (
    id SERIAL PRIMARY KEY,
    name TEXT NOT NULL,
    description TEXT,
    category TEXT,
    price DECIMAL(10,2),
    embedding VECTOR(384), -- sentence-transformers dimension
    created_at TIMESTAMP DEFAULT now()
);

CREATE INDEX products_embedding_idx ON products
    USING hnsw (embedding vector_cosine_ops);
Implementation
from sentence_transformers import SentenceTransformerimport requests
class SemanticSearch:
    """Natural-language product search over sentence-transformers embeddings
    stored in HeliosDB."""

    def __init__(self, helios_url: str, token: str):
        self.helios_url = helios_url
        self.token = token
        self.model = SentenceTransformer('all-MiniLM-L6-v2')

    def index_product(self, name: str, description: str, category: str, price: float):
        """Embed and store one product.

        Raises requests.HTTPError if the insert is rejected.
        """
        # Combine the searchable text fields into one embedding input.
        text = f"{name}. {description}. Category: {category}"
        embedding = self.model.encode(text).tolist()

        response = requests.post(
            f"{self.helios_url}/api/v1/query",
            headers=self._headers(),
            json={
                "sql": """
                    INSERT INTO products (name, description, category, price, embedding)
                    VALUES ($1, $2, $3, $4, $5)
                """,
                "params": [name, description, category, price, embedding],
            },
            timeout=30,
        )
        response.raise_for_status()

    def search(self, query: str, category: str = None,
               max_price: float = None, limit: int = 10):
        """Search products by natural-language *query*.

        Optional filters: exact *category* match and *max_price* ceiling.
        Results are ordered by cosine distance (lower = more similar); the
        distance is returned in the ``similarity`` column.
        """
        query_embedding = self.model.encode(query).tolist()

        # Build optional filter conditions with positional placeholders.
        filters = []
        params = [query_embedding, limit]

        if category is not None:  # was truthiness: "" used to be ignored
            filters.append(f"category = ${len(params) + 1}")
            params.append(category)

        if max_price is not None:  # was truthiness: 0 used to be ignored
            filters.append(f"price <= ${len(params) + 1}")
            params.append(max_price)

        where_clause = f"WHERE {' AND '.join(filters)}" if filters else ""

        # BUG FIX: the index uses vector_cosine_ops, so order by the cosine
        # distance operator <=>; the original used <->, L2 distance, which
        # that index cannot serve.
        response = requests.post(
            f"{self.helios_url}/api/v1/query",
            headers=self._headers(),
            json={
                "sql": f"""
                    SELECT
                        id, name, description, category, price,
                        embedding <=> $1 AS similarity
                    FROM products
                    {where_clause}
                    ORDER BY embedding <=> $1
                    LIMIT $2
                """,
                "params": params,
            },
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["data"]["rows"]

    def _headers(self) -> dict:
        """HTTP headers carrying the bearer token."""
        return {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }
# Usagesearch = SemanticSearch("http://localhost:6543", "your-token")
# Index productssearch.index_product( "Wireless Noise-Canceling Headphones", "Premium over-ear headphones with ANC and 30-hour battery", "electronics", 299.99)
# Natural language searchresults = search.search( "headphones for working from home", category="electronics", max_price=350)Event Analytics
Track user behavior with time-series queries.
Setup
CREATE TABLE events (
    id BIGSERIAL PRIMARY KEY,
    user_id TEXT NOT NULL,
    event_type TEXT NOT NULL,
    properties JSONB,
    timestamp TIMESTAMP DEFAULT now()
);

-- Partition by month for performance
-- (conceptual - adjust for your needs)
CREATE INDEX events_user_time_idx ON events (user_id, timestamp DESC);
CREATE INDEX events_type_time_idx ON events (event_type, timestamp DESC);
Implementation
/** Lightweight user-behavior event tracker writing into HeliosDB. */
class EventTracker {
  constructor(private baseUrl: string, private token: string) {}

  /** Record one event for a user with arbitrary JSON properties. */
  async track(userId: string, eventType: string, properties: Record<string, any> = {}) {
    await fetch(`${this.baseUrl}/api/v1/query`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({
        sql: "INSERT INTO events (user_id, event_type, properties) VALUES ($1, $2, $3)",
        params: [userId, eventType, properties]
      })
    });
  }

  /** Up to 100 newest events for a user since a date, optionally by type. */
  async getUserEvents(userId: string, since: Date, eventTypes?: string[]): Promise<any[]> {
    let sql = `
      SELECT * FROM events
      WHERE user_id = $1 AND timestamp >= $2
    `;
    const params: any[] = [userId, since.toISOString()];

    if (eventTypes?.length) {
      sql += ` AND event_type = ANY($3)`;
      params.push(eventTypes);
    }

    sql += ` ORDER BY timestamp DESC LIMIT 100`;

    const response = await fetch(`${this.baseUrl}/api/v1/query`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({ sql, params })
    });

    return (await response.json()).data.rows;
  }

  /**
   * Per-period event counts and unique users for one event type over the
   * last 30 days.
   *
   * BUG FIX: date_trunc expects a field name such as 'day' or 'hour';
   * the old default '1 day' was an interval string and made the query fail.
   */
  async getMetrics(eventType: string, interval: string = 'day'): Promise<any> {
    const response = await fetch(`${this.baseUrl}/api/v1/query`, {
      method: "POST",
      headers: this.headers(),
      body: JSON.stringify({
        sql: `
          SELECT
            date_trunc($2, timestamp) AS period,
            COUNT(*) AS event_count,
            COUNT(DISTINCT user_id) AS unique_users
          FROM events
          WHERE event_type = $1
            AND timestamp >= now() - interval '30 days'
          GROUP BY period
          ORDER BY period DESC
        `,
        params: [eventType, interval]
      })
    });

    return (await response.json()).data.rows;
  }

  private headers() {
    return {
      "Authorization": `Bearer ${this.token}`,
      "Content-Type": "application/json"
    };
  }
}
// Usageconst tracker = new EventTracker("http://localhost:6543", "your-token");
// Track eventsawait tracker.track("user-123", "page_view", { page: "/products", referrer: "google" });await tracker.track("user-123", "add_to_cart", { product_id: "prod-456", price: 29.99 });await tracker.track("user-123", "purchase", { order_id: "order-789", total: 29.99 });
// Query metricsconst dailyPurchases = await tracker.getMetrics("purchase", "1 day");Feature Flags
Implement dynamic feature flags with database-backed configuration.
Setup
CREATE TABLE feature_flags (
    id TEXT PRIMARY KEY,
    enabled BOOLEAN DEFAULT false,
    rollout_percentage INTEGER DEFAULT 0,
    target_users TEXT[],
    target_plans TEXT[],
    metadata JSONB,
    updated_at TIMESTAMP DEFAULT now()
);

-- Insert some flags
INSERT INTO feature_flags (id, enabled, rollout_percentage, target_plans) VALUES
    ('new_checkout', true, 50, ARRAY['pro', 'enterprise']),
    ('dark_mode', true, 100, NULL),
    ('beta_features', false, 0, ARRAY['enterprise']);
Implementation
import hashlibimport requests
class FeatureFlags:
    """Database-backed feature flags with user targeting, plan targeting,
    and deterministic percentage rollout. Flag rows are cached for the
    lifetime of the process after the first lookup."""

    def __init__(self, helios_url: str, token: str):
        self.helios_url = helios_url
        self.token = token
        # Memoized flag rows keyed by flag id (misses cached as None).
        self._cache = {}

    def is_enabled(self, flag_id: str, user_id: str = None, plan: str = None) -> bool:
        """Return True when *flag_id* is on for this user/plan combination."""
        flag = self._get_flag(flag_id)

        # Unknown or globally disabled flag.
        if not (flag and flag["enabled"]):
            return False

        # Explicitly targeted users are enabled regardless of plan/rollout.
        targeted_users = flag.get("target_users")
        if targeted_users and user_id and user_id in targeted_users:
            return True

        # Plan gating: a plan outside the target list is excluded.
        targeted_plans = flag.get("target_plans")
        if targeted_plans and plan and plan not in targeted_plans:
            return False

        # Partial rollout: deterministic per-user bucketing.
        pct = flag["rollout_percentage"]
        if user_id and pct < 100:
            return self._get_bucket(flag_id, user_id) < pct

        return pct == 100

    def _get_flag(self, flag_id: str) -> dict:
        """Fetch a flag row from HeliosDB, memoizing the result."""
        if flag_id in self._cache:
            return self._cache[flag_id]

        resp = requests.post(
            f"{self.helios_url}/api/v1/query",
            headers={
                "Authorization": f"Bearer {self.token}",
                "Content-Type": "application/json",
            },
            json={
                "sql": "SELECT * FROM feature_flags WHERE id = $1",
                "params": [flag_id],
            },
        )

        rows = resp.json()["data"]["rows"]
        flag = rows[0] if rows else None
        self._cache[flag_id] = flag
        return flag

    def _get_bucket(self, flag_id: str, user_id: str) -> int:
        """Deterministic bucket in [0, 100) from md5(flag:user) — md5 is
        used for distribution only, not security."""
        digest = hashlib.md5(f"{flag_id}:{user_id}".encode()).hexdigest()
        return int(digest[:8], 16) % 100
# Usageflags = FeatureFlags("http://localhost:6543", "your-token")
user_id = "user-123"user_plan = "pro"
if flags.is_enabled("new_checkout", user_id, user_plan): # Show new checkout passelse: # Show old checkout passNext Steps
Need help? Join Discord | File Issue