Conversational BI Implementation Guide
Tactical Implementation Handbook
Document Version: 1.0
Created: November 9, 2025
Companion to: CONVERSATIONAL_BI_ARCHITECTURE.md
Table of Contents
- Development Environment Setup
- Crate Structure
- Phase-by-Phase Implementation
- Code Templates
- Testing Framework
- Integration Checklist
- Deployment Guide
1. Development Environment Setup
1.1 Prerequisites
```bash
# Rust toolchain
rustup update stable
rustup component add clippy rustfmt

# Python (for LLM integration testing)
python3 -m pip install openai anthropic cohere-python

# Ollama (for local model testing)
curl -fsSL https://ollama.com/install.sh | sh
ollama pull llama3
ollama pull codellama

# Database for testing
docker run -d --name heliosdb-test-postgres \
  -e POSTGRES_PASSWORD=test \
  -p 5432:5432 postgres:16
```

1.2 Project Setup
```bash
cd /home/claude/HeliosDB

# Create new crate
cargo new --lib heliosdb-conversational-bi
cd heliosdb-conversational-bi
```

Then add `"heliosdb-conversational-bi"` to the `[workspace]` members array in the root `Cargo.toml`. (Appending the bare crate name with `echo >> ../Cargo.toml` would not produce valid TOML.)

1.3 Dependencies
```toml
[package]
name = "heliosdb-conversational-bi"
version = "0.1.0"
edition = "2021"

[dependencies]
# HeliosDB internal crates
heliosdb-common = { path = "../heliosdb-common" }
heliosdb-compute = { path = "../heliosdb-compute" }
heliosdb-ml = { path = "../heliosdb-ml" }

# Async runtime
tokio = { version = "1.40", features = ["full"] }
futures = "0.3"

# HTTP clients for LLM APIs
reqwest = { version = "0.12", features = ["json"] }
async-openai = "0.24"

# Serialization
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"

# Collections
dashmap = "6.0"

# SQL parsing
sqlparser = "0.51"

# Vector operations
ndarray = "0.16"

# Embeddings
fastembed = "3.0"  # For local embedding models

# Caching
moka = { version = "0.12", features = ["future"] }

# Logging
tracing = "0.1"
tracing-subscriber = "0.3"

# Error handling
thiserror = "1.0"
anyhow = "1.0"

# Time
chrono = { version = "0.4", features = ["serde"] }

# UUID
uuid = { version = "1.0", features = ["v4", "serde"] }

[dev-dependencies]
# Testing
tokio-test = "0.4"
mockall = "0.13"
proptest = "1.0"

# Test data
csv = "1.3"

[features]
default = ["cloud-models", "local-models"]
cloud-models = []  # OpenAI, Anthropic, Cohere
local-models = []  # Ollama, ONNX
```

2. Crate Structure
```
heliosdb-conversational-bi/
├── Cargo.toml
├── README.md
├── benches/
│   ├── accuracy_benchmarks.rs      # BIRD, Spider, WikiSQL benchmarks
│   ├── latency_benchmarks.rs       # Latency measurements
│   └── throughput_benchmarks.rs    # QPS testing
├── examples/
│   ├── basic_conversation.rs       # Simple usage example
│   ├── multi_turn_example.rs       # Multi-turn conversation
│   ├── local_model_example.rs      # Using Ollama
│   └── cloud_model_example.rs      # Using OpenAI/Claude
├── src/
│   ├── lib.rs                      # Public API
│   ├── engine.rs                   # ConversationalBiEngine
│   ├── session/
│   │   ├── mod.rs
│   │   ├── manager.rs              # SessionManager
│   │   ├── context.rs              # ConversationContext
│   │   └── persistence.rs          # Session storage
│   ├── nl2sql/
│   │   ├── mod.rs
│   │   ├── generator.rs            # SqlGenerator
│   │   ├── validator.rs            # SqlValidator
│   │   ├── corrector.rs            # SelfCorrectionEngine
│   │   ├── understander.rs         # QueryUnderstander
│   │   └── examples.rs             # ExampleStore
│   ├── schema/
│   │   ├── mod.rs
│   │   ├── augmenter.rs            # SchemaAugmenter
│   │   ├── introspector.rs         # SchemaIntrospector
│   │   └── cache.rs                # SchemaCache
│   ├── context/
│   │   ├── mod.rs
│   │   ├── tracker.rs              # ContextTracker
│   │   ├── reference.rs            # ReferenceResolver
│   │   └── clarification.rs        # ClarificationEngine
│   ├── explanation/
│   │   ├── mod.rs
│   │   ├── explainer.rs            # ExplanationEngine
│   │   ├── plan_explainer.rs       # PlanExplainer
│   │   └── optimizer.rs            # OptimizerSuggester
│   ├── models/
│   │   ├── mod.rs
│   │   ├── router.rs               # ModelRouter
│   │   ├── openai.rs               # OpenAI integration
│   │   ├── anthropic.rs            # Anthropic integration
│   │   ├── cohere.rs               # Cohere integration
│   │   ├── ollama.rs               # Ollama integration
│   │   └── traits.rs               # LanguageModel trait
│   ├── cache/
│   │   ├── mod.rs
│   │   ├── query_cache.rs          # Query caching
│   │   └── semantic_search.rs      # Embedding-based cache
│   ├── prompts/
│   │   ├── mod.rs
│   │   ├── templates.rs            # Prompt templates
│   │   └── builder.rs              # Prompt construction
│   ├── types.rs                    # Core types
│   ├── error.rs                    # Error types
│   └── config.rs                   # Configuration
├── tests/
│   ├── integration/
│   │   ├── basic_tests.rs
│   │   ├── multi_turn_tests.rs
│   │   ├── accuracy_tests.rs
│   │   └── performance_tests.rs
│   ├── fixtures/
│   │   ├── bird_samples.csv        # Sample BIRD dataset
│   │   ├── spider_samples.csv      # Sample Spider dataset
│   │   └── test_schemas.sql        # Test database schemas
│   └── test_utils.rs               # Testing utilities
└── datasets/
    ├── download_bird.sh            # Download BIRD dataset
    ├── download_spider.sh          # Download Spider dataset
    └── examples/                   # Custom example queries
        ├── retail.json
        ├── finance.json
        └── healthcare.json
```

3. Phase-by-Phase Implementation
Phase 1: Foundation (Weeks 1-3)
Week 1: Session Management
Task 1.1: Core Types
```rust
// src/types.rs
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use std::collections::{HashMap, HashSet, VecDeque};
use uuid::Uuid;

pub type SessionId = Uuid;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConversationTurn {
    pub turn_id: u32,
    pub timestamp: DateTime<Utc>,
    pub user_query: String,
    pub sql: Option<String>,
    pub results_summary: Option<String>,
    pub entities_mentioned: Vec<String>,
    pub intent: Intent,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum Intent {
    Query,
    Explore,
    Analyze,
    Compare,
    Filter,
    Clarify,
    Export,
}

#[derive(Debug, Clone)]
pub struct Session {
    pub id: SessionId,
    pub user_id: String,
    pub database: String,
    pub created_at: DateTime<Utc>,
    pub last_accessed: DateTime<Utc>,
    pub turn_count: u32,
    pub context: ConversationContext,
}

#[derive(Debug, Clone, Default)]
pub struct ConversationContext {
    pub turns: VecDeque<ConversationTurn>,
    pub mentioned_tables: HashSet<String>,
    pub mentioned_columns: HashMap<String, String>,
    pub current_focus: Option<String>,
}
```

Task 1.2: Session Manager
```rust
// src/session/manager.rs
use anyhow::Result;
use chrono::Utc;
use dashmap::DashMap;
use std::sync::Arc;
use tokio::time::{interval, Duration};
use uuid::Uuid;

pub struct SessionManager {
    sessions: Arc<DashMap<SessionId, Session>>,
    config: SessionConfig,
}

pub struct SessionConfig {
    pub max_sessions_per_user: usize,
    pub idle_timeout_minutes: u64,
    pub max_turns: usize,
}

impl Default for SessionConfig {
    fn default() -> Self {
        Self {
            max_sessions_per_user: 10,
            idle_timeout_minutes: 30,
            max_turns: 20,
        }
    }
}

impl SessionManager {
    pub fn new(config: SessionConfig) -> Self {
        let manager = Self {
            sessions: Arc::new(DashMap::new()),
            config,
        };

        // Start background cleanup task
        manager.start_cleanup_task();

        manager
    }

    pub fn create(&self, user_id: String, database: String) -> Result<SessionId> {
        // Check the user's session limit before allocating a session
        let user_sessions = self.get_user_sessions(&user_id);
        if user_sessions.len() >= self.config.max_sessions_per_user {
            return Err(anyhow::anyhow!("User has too many active sessions"));
        }

        let session_id = Uuid::new_v4();
        let session = Session {
            id: session_id,
            user_id,
            database,
            created_at: Utc::now(),
            last_accessed: Utc::now(),
            turn_count: 0,
            context: ConversationContext::default(),
        };

        self.sessions.insert(session_id, session);

        Ok(session_id)
    }

    pub fn get(&self, session_id: SessionId) -> Result<Session> {
        self.sessions
            .get(&session_id)
            .map(|s| s.clone())
            .ok_or_else(|| anyhow::anyhow!("Session not found"))
    }

    pub fn touch(&self, session_id: SessionId) -> Result<()> {
        self.sessions
            .get_mut(&session_id)
            .map(|mut s| {
                s.last_accessed = Utc::now();
            })
            .ok_or_else(|| anyhow::anyhow!("Session not found"))
    }

    pub fn update_context(&self, session_id: SessionId, context: ConversationContext) -> Result<()> {
        self.sessions
            .get_mut(&session_id)
            .map(|mut s| {
                s.context = context;
                s.turn_count += 1;
                s.last_accessed = Utc::now();
            })
            .ok_or_else(|| anyhow::anyhow!("Session not found"))
    }

    fn get_user_sessions(&self, user_id: &str) -> Vec<SessionId> {
        self.sessions
            .iter()
            .filter(|entry| entry.value().user_id == user_id)
            .map(|entry| *entry.key())
            .collect()
    }

    fn start_cleanup_task(&self) {
        let sessions = Arc::clone(&self.sessions);
        let timeout = Duration::from_secs(self.config.idle_timeout_minutes * 60);

        tokio::spawn(async move {
            let mut ticker = interval(Duration::from_secs(300)); // 5 minutes

            loop {
                ticker.tick().await;

                let now = Utc::now();
                let expired: Vec<SessionId> = sessions
                    .iter()
                    .filter(|entry| {
                        let idle_duration = now - entry.value().last_accessed;
                        idle_duration.num_seconds() > timeout.as_secs() as i64
                    })
                    .map(|entry| *entry.key())
                    .collect();

                for session_id in expired {
                    sessions.remove(&session_id);
                    tracing::info!("Cleaned up expired session: {}", session_id);
                }
            }
        });
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_create_session() {
        let manager = SessionManager::new(SessionConfig::default());
        let session_id = manager.create("user1".to_string(), "testdb".to_string()).unwrap();

        let session = manager.get(session_id).unwrap();
        assert_eq!(session.user_id, "user1");
        assert_eq!(session.database, "testdb");
    }

    #[tokio::test]
    async fn test_session_limit() {
        let config = SessionConfig {
            max_sessions_per_user: 2,
            ..Default::default()
        };
        let manager = SessionManager::new(config);

        manager.create("user1".to_string(), "db1".to_string()).unwrap();
        manager.create("user1".to_string(), "db2".to_string()).unwrap();

        let result = manager.create("user1".to_string(), "db3".to_string());
        assert!(result.is_err());
    }
}
```

Week 2: Schema Integration
Task 2.1: Schema Introspector
```rust
// src/schema/introspector.rs
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;

pub struct SchemaIntrospector {
    connection_pool: Arc<ConnectionPool>,
}

#[derive(Debug, Clone)]
pub struct DatabaseSchema {
    pub database_name: String,
    pub tables: Vec<TableSchema>,
    pub relationships: Vec<Relationship>,
}

#[derive(Debug, Clone)]
pub struct TableSchema {
    pub name: String,
    pub columns: Vec<ColumnSchema>,
    pub primary_key: Vec<String>,
    pub indexes: Vec<IndexSchema>,
}

#[derive(Debug, Clone)]
pub struct ColumnSchema {
    pub name: String,
    pub data_type: String,
    pub nullable: bool,
    pub default_value: Option<String>,
}

#[derive(Debug, Clone)]
pub struct Relationship {
    pub from_table: String,
    pub from_column: String,
    pub to_table: String,
    pub to_column: String,
    pub relationship_type: RelationshipType,
}

#[derive(Debug, Clone)]
pub enum RelationshipType {
    OneToOne,
    OneToMany,
    ManyToMany,
}

impl SchemaIntrospector {
    pub async fn get_schema(&self, database: &str) -> Result<DatabaseSchema> {
        let tables = self.get_tables(database).await?;
        let relationships = self.infer_relationships(database).await?;

        Ok(DatabaseSchema {
            database_name: database.to_string(),
            tables,
            relationships,
        })
    }

    async fn get_tables(&self, database: &str) -> Result<Vec<TableSchema>> {
        // Query information_schema
        let query = r#"
            SELECT
                t.table_name,
                c.column_name,
                c.data_type,
                c.is_nullable,
                c.column_default
            FROM information_schema.tables t
            JOIN information_schema.columns c
                ON t.table_name = c.table_name
            WHERE t.table_schema = 'public'
            ORDER BY t.table_name, c.ordinal_position
        "#;

        let rows = self.connection_pool.query(query).await?;

        // Group columns by table
        let mut tables_map: HashMap<String, Vec<ColumnSchema>> = HashMap::new();

        for row in rows {
            let table_name: String = row.get("table_name");
            let column = ColumnSchema {
                name: row.get("column_name"),
                data_type: row.get("data_type"),
                nullable: row.get::<String, _>("is_nullable") == "YES",
                default_value: row.get("column_default"),
            };

            tables_map.entry(table_name).or_insert_with(Vec::new).push(column);
        }

        // Convert to TableSchema
        let mut tables = Vec::new();
        for (table_name, columns) in tables_map {
            let primary_key = self.get_primary_key(database, &table_name).await?;
            let indexes = self.get_indexes(database, &table_name).await?;

            tables.push(TableSchema {
                name: table_name,
                columns,
                primary_key,
                indexes,
            });
        }

        Ok(tables)
    }

    async fn infer_relationships(&self, database: &str) -> Result<Vec<Relationship>> {
        // Query foreign key constraints
        let query = r#"
            SELECT
                tc.table_name AS from_table,
                kcu.column_name AS from_column,
                ccu.table_name AS to_table,
                ccu.column_name AS to_column
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
                ON tc.constraint_name = kcu.constraint_name
            JOIN information_schema.constraint_column_usage ccu
                ON tc.constraint_name = ccu.constraint_name
            WHERE tc.constraint_type = 'FOREIGN KEY'
        "#;

        let rows = self.connection_pool.query(query).await?;

        let relationships = rows
            .into_iter()
            .map(|row| Relationship {
                from_table: row.get("from_table"),
                from_column: row.get("from_column"),
                to_table: row.get("to_table"),
                to_column: row.get("to_column"),
                relationship_type: RelationshipType::OneToMany, // Simplified
            })
            .collect();

        Ok(relationships)
    }

    // get_primary_key and get_indexes are analogous information_schema
    // queries and are omitted here.
}
```

Task 2.2: Schema Augmenter
```rust
// src/schema/augmenter.rs
use anyhow::Result;
use std::collections::HashMap;
use std::sync::Arc;

pub struct SchemaAugmenter {
    embedding_model: Arc<EmbeddingModel>,
    // Needed by sample_column_values below
    connection_pool: Arc<ConnectionPool>,
}

#[derive(Debug, Clone)]
pub struct AugmentedSchema {
    pub base_schema: DatabaseSchema,
    pub table_descriptions: HashMap<String, String>,
    pub column_descriptions: HashMap<String, String>,
    pub column_examples: HashMap<String, Vec<String>>,
    pub business_terms: HashMap<String, Vec<String>>,
}

impl SchemaAugmenter {
    pub async fn augment(&self, schema: &DatabaseSchema) -> Result<AugmentedSchema> {
        let mut augmented = AugmentedSchema {
            base_schema: schema.clone(),
            table_descriptions: HashMap::new(),
            column_descriptions: HashMap::new(),
            column_examples: HashMap::new(),
            business_terms: HashMap::new(),
        };

        // Generate descriptions from table/column names
        for table in &schema.tables {
            augmented.table_descriptions.insert(
                table.name.clone(),
                self.generate_description(&table.name),
            );

            for column in &table.columns {
                let key = format!("{}.{}", table.name, column.name);
                augmented.column_descriptions.insert(
                    key.clone(),
                    self.generate_description(&column.name),
                );

                // Sample values
                if let Ok(examples) = self.sample_column_values(&table.name, &column.name).await {
                    augmented.column_examples.insert(key, examples);
                }
            }
        }

        // Add business term mappings
        augmented.business_terms.insert(
            "revenue".to_string(),
            vec!["sales_amount".to_string(), "total_price".to_string()],
        );
        augmented.business_terms.insert(
            "customer".to_string(),
            vec!["client".to_string(), "user".to_string()],
        );

        Ok(augmented)
    }

    fn generate_description(&self, name: &str) -> String {
        // Convert snake_case to human-readable Title Case
        name.replace('_', " ")
            .split_whitespace()
            .map(|word| {
                let mut chars = word.chars();
                match chars.next() {
                    None => String::new(),
                    Some(first) => first.to_uppercase().collect::<String>() + chars.as_str(),
                }
            })
            .collect::<Vec<_>>()
            .join(" ")
    }

    async fn sample_column_values(&self, table: &str, column: &str) -> Result<Vec<String>> {
        // Interpolation is safe only because table/column names come from
        // schema introspection, never from user input.
        let query = format!(
            "SELECT DISTINCT {} FROM {} WHERE {} IS NOT NULL LIMIT 5",
            column, table, column
        );

        let rows = self.connection_pool.query(&query).await?;

        Ok(rows
            .into_iter()
            .map(|row| row.get::<String, _>(0))
            .collect())
    }
}

// Defined on AugmentedSchema (rather than SchemaAugmenter) so callers
// such as PromptBuilder can call `schema.to_prompt_string()` directly.
impl AugmentedSchema {
    pub fn to_prompt_string(&self) -> String {
        let mut prompt = String::new();

        prompt.push_str("DATABASE SCHEMA:\n\n");

        for table in &self.base_schema.tables {
            prompt.push_str(&format!("Table: {}\n", table.name));

            if let Some(desc) = self.table_descriptions.get(&table.name) {
                prompt.push_str(&format!("Description: {}\n", desc));
            }

            prompt.push_str("Columns:\n");
            for column in &table.columns {
                let key = format!("{}.{}", table.name, column.name);
                prompt.push_str(&format!("  - {} ({})", column.name, column.data_type));

                if let Some(desc) = self.column_descriptions.get(&key) {
                    prompt.push_str(&format!(" - {}", desc));
                }

                if let Some(examples) = self.column_examples.get(&key) {
                    prompt.push_str(&format!(" - Examples: {}", examples.join(", ")));
                }

                prompt.push('\n');
            }

            prompt.push('\n');
        }

        prompt
    }
}
```

4. Code Templates
4.1 NL2SQL Generator Template
```rust
// src/nl2sql/generator.rs
use anyhow::Result;
use std::sync::Arc;

pub struct SqlGenerator {
    model_router: Arc<ModelRouter>,
    example_store: Arc<ExampleStore>,
    prompt_builder: Arc<PromptBuilder>,
}

impl SqlGenerator {
    pub async fn generate(
        &self,
        query: &str,
        context: &ConversationContext,
        schema: &AugmentedSchema,
        dialect: SqlDialect,
    ) -> Result<String> {
        // 1. Find similar examples
        let examples = self.example_store.find_similar(query, 5).await?;

        // 2. Build prompt
        let prompt = self.prompt_builder.build(
            query,
            context,
            schema,
            &examples,
            dialect,
        );

        // 3. Generate SQL
        let model = self.model_router.select_model(&ModelRequirements::default()).await?;
        let sql = model.generate(&prompt, &GenerationConfig::default()).await?;

        // 4. Extract SQL from the model response
        let sql = self.extract_sql(&sql);

        Ok(sql)
    }

    fn extract_sql(&self, response: &str) -> String {
        // Extract SQL from markdown ```sql code blocks
        if let Some(start) = response.find("```sql") {
            if let Some(end) = response[start + 6..].find("```") {
                return response[start + 6..start + 6 + end].trim().to_string();
            }
        }

        // Fallback: return the entire response
        response.trim().to_string()
    }
}
```

4.2 Prompt Builder Template
```rust
// src/prompts/builder.rs
pub struct PromptBuilder;

impl PromptBuilder {
    pub fn build(
        &self,
        query: &str,
        context: &ConversationContext,
        schema: &AugmentedSchema,
        examples: &[Example],
        dialect: SqlDialect,
    ) -> String {
        let mut prompt = String::new();

        // System prompt
        prompt.push_str(SYSTEM_PROMPT);
        prompt.push_str(&format!("\nSQL DIALECT: {}\n\n", dialect));

        // Schema
        prompt.push_str(&schema.to_prompt_string());

        // Few-shot examples
        prompt.push_str("EXAMPLES:\n\n");
        for example in examples {
            prompt.push_str(&format!(
                "Question: {}\nSQL: {}\n\n",
                example.natural_language, example.sql
            ));
        }

        // Conversation context (last three turns, oldest first)
        if !context.turns.is_empty() {
            prompt.push_str("CONVERSATION HISTORY:\n");
            for turn in context.turns.iter().rev().take(3).rev() {
                prompt.push_str(&format!(
                    "User: {}\nSQL: {}\n\n",
                    turn.user_query,
                    turn.sql.as_deref().unwrap_or("(no SQL generated)")
                ));
            }
        }

        // User query
        prompt.push_str(&format!("QUESTION: {}\n\n", query));
        prompt.push_str("Generate SQL for this question:");

        prompt
    }
}

const SYSTEM_PROMPT: &str = r#"You are an expert SQL query generator.

RULES:
1. Only use tables and columns from the schema
2. Always qualify columns with table aliases
3. Use appropriate JOINs (not subqueries when possible)
4. Add LIMIT for large result sets
5. Format dates correctly for the SQL dialect

IMPORTANT: Return only the SQL query, wrapped in ```sql code blocks."#;
```

5. Testing Framework
5.1 Accuracy Test Template
```rust
// tests/integration/accuracy_tests.rs
use heliosdb_conversational_bi::*;
use serde::Deserialize;

#[tokio::test]
async fn test_bird_samples() {
    let engine = ConversationalBiEngine::new(Config::default()).await.unwrap();

    let bird_samples = load_bird_samples();

    let mut correct = 0;
    let total = bird_samples.len();

    for sample in bird_samples {
        let session_id = engine.create_session(&sample.database).await.unwrap();

        let response = engine.process_query(session_id, &sample.question).await.unwrap();

        if let Some(generated_sql) = response.sql {
            // Execute both queries and compare result sets
            let generated_results = execute_sql(&generated_sql).await.unwrap();
            let gold_results = execute_sql(&sample.gold_sql).await.unwrap();

            if generated_results == gold_results {
                correct += 1;
            }
        }
    }

    let accuracy = correct as f64 / total as f64;
    println!("BIRD Accuracy: {:.2}%", accuracy * 100.0);

    assert!(accuracy >= 0.95, "Accuracy below 95% threshold");
}

fn load_bird_samples() -> Vec<BirdSample> {
    // Load from tests/fixtures/bird_samples.csv
    let mut reader = csv::Reader::from_path("tests/fixtures/bird_samples.csv").unwrap();

    reader
        .deserialize()
        .map(|result| result.unwrap())
        .collect()
}

#[derive(Debug, Deserialize)]
struct BirdSample {
    database: String,
    question: String,
    gold_sql: String,
}
```

5.2 Multi-Turn Test Template
```rust
#[tokio::test]
async fn test_reference_resolution() {
    let engine = ConversationalBiEngine::new(Config::default()).await.unwrap();
    let session_id = engine.create_session("test_db").await.unwrap();

    // Turn 1
    let response1 = engine.process_query(session_id, "Show sales by region").await.unwrap();
    assert!(response1.sql.is_some());

    // Turn 2 (reference to the previous result set).
    // Bind the SQL once: calling `unwrap()` on the Option twice would move it.
    let response2 = engine.process_query(session_id, "Which had the highest?").await.unwrap();
    let sql2 = response2.sql.expect("turn 2 should produce SQL");
    assert!(sql2.contains("ORDER BY") && sql2.contains("DESC"));

    // Turn 3 (reference to an entity from turn 1)
    let response3 = engine.process_query(session_id, "Show that by month").await.unwrap();
    let sql3 = response3.sql.expect("turn 3 should produce SQL");
    assert!(sql3.contains("DATE_TRUNC"));
}
```

6. Integration Checklist
6.1 Pre-Implementation
- Review architecture document
- Set up development environment
- Create crate structure
- Add dependencies to Cargo.toml
- Configure LLM API keys (OpenAI, Anthropic, Cohere)
- Install Ollama and download models
- Set up test database
6.2 Phase 1 Checklist
- Implement SessionManager
- Implement ConversationContext
- Implement SchemaIntrospector
- Implement SchemaAugmenter
- Implement basic ModelRouter (OpenAI only)
- Write unit tests (>80% coverage)
- Write integration tests
- Test end-to-end single-turn queries
6.3 Phase 2 Checklist
- Implement QueryUnderstander
- Implement SqlGenerator
- Implement SqlValidator
- Implement SelfCorrectionEngine
- Implement ExampleStore
- Load BIRD/Spider datasets
- Achieve 85%+ accuracy on BIRD
- Support PostgreSQL, MySQL, Oracle dialects
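The dialect support above has to surface in the `SqlDialect` value that `PromptBuilder::build` formats into the prompt, but the guide never defines that type. A minimal sketch of one possible shape, including a hypothetical `truncate_to_month` helper to illustrate a dialect-specific rewrite (all names here are assumptions, not the final API):

```rust
use std::fmt;

/// Hypothetical SqlDialect enum; `Display` is required because
/// PromptBuilder formats the dialect into the prompt header.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SqlDialect {
    Postgres,
    MySql,
    Oracle,
}

impl fmt::Display for SqlDialect {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            SqlDialect::Postgres => write!(f, "PostgreSQL"),
            SqlDialect::MySql => write!(f, "MySQL"),
            SqlDialect::Oracle => write!(f, "Oracle"),
        }
    }
}

impl SqlDialect {
    /// Example of a dialect-specific rewrite: truncate a timestamp
    /// column to the start of its month.
    pub fn truncate_to_month(&self, column: &str) -> String {
        match self {
            SqlDialect::Postgres => format!("DATE_TRUNC('month', {column})"),
            SqlDialect::MySql => format!("DATE_FORMAT({column}, '%Y-%m-01')"),
            SqlDialect::Oracle => format!("TRUNC({column}, 'MM')"),
        }
    }
}
```

Keeping dialect differences behind an enum like this keeps the prompt templates dialect-agnostic.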
6.4 Phase 3 Checklist
- Implement ContextTracker
- Implement ReferenceResolver
- Implement ClarificationEngine
- Test 10+ turn conversations
- Optimize context window usage
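For the "optimize context window usage" item, one simple strategy is to drop the oldest turns once the serialized history exceeds a token budget. A minimal sketch, assuming a crude 4-characters-per-token estimate (a real implementation would use the model's tokenizer):

```rust
use std::collections::VecDeque;

/// Crude token estimate: roughly 4 characters per token.
fn estimate_tokens(text: &str) -> usize {
    text.len() / 4 + 1
}

/// Drop the oldest turns until the history fits `budget_tokens`.
/// `turns` holds each turn's serialized text, oldest first; the most
/// recent turn is always kept so the model retains immediate context.
fn trim_history(turns: &mut VecDeque<String>, budget_tokens: usize) {
    while turns.len() > 1
        && turns.iter().map(|t| estimate_tokens(t)).sum::<usize>() > budget_tokens
    {
        turns.pop_front();
    }
}
```

The same loop works against `ConversationContext::turns` once `ConversationTurn` gains a serialization method.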
6.5 Phase 4 Checklist
- Implement ExplanationEngine
- Implement PlanExplainer
- Implement OptimizerSuggester
- Generate visual query plans
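As a stepping stone toward the `PlanExplainer` and visual plans above, a plain-text rendering of the plan tree is often enough for early iterations. A sketch with a hypothetical `PlanNode` type (the real implementation would consume HeliosDB's actual plan representation):

```rust
/// Hypothetical plan-node shape; HeliosDB's real plan type will differ.
struct PlanNode {
    op: String,
    detail: String,
    children: Vec<PlanNode>,
}

/// Render the plan as an indented `->` tree, one node per line.
fn render_plan(node: &PlanNode, depth: usize, out: &mut String) {
    out.push_str(&"  ".repeat(depth));
    out.push_str(&format!("-> {} ({})\n", node.op, node.detail));
    for child in &node.children {
        render_plan(child, depth + 1, out);
    }
}
```

The indented text can later be swapped for Graphviz or HTML output without changing the traversal.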
6.6 Phase 5 Checklist
- Implement query caching
- Optimize latency (<2s p50)
- Run BIRD benchmark (95%+ target)
- Run performance benchmarks (100+ QPS)
- Security testing
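For the security-testing item, a cheap first guard is rejecting anything that is not a single read-only statement before it reaches the executor. This keyword sketch is deliberately conservative and is not a substitute for AST-level validation in `SqlValidator` (e.g. parsing with the `sqlparser` crate and inspecting the statement type):

```rust
/// Accept only a single read-only statement. A leading-keyword check
/// only; real validation should parse the SQL and inspect the AST.
fn is_read_only(sql: &str) -> bool {
    let trimmed = sql.trim();
    // Reject multi-statement inputs (e.g. "SELECT 1; DROP TABLE x").
    // Note this also rejects legitimate semicolons inside string
    // literals, which is the conservative failure mode.
    if trimmed.trim_end_matches(';').contains(';') {
        return false;
    }
    let first = trimmed
        .split_whitespace()
        .next()
        .unwrap_or("")
        .to_uppercase();
    matches!(first.as_str(), "SELECT" | "WITH" | "EXPLAIN")
}
```

Pairing this with a read-only database role gives defense in depth even if the LLM emits something unexpected.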
7. Deployment Guide
7.1 Configuration
```yaml
conversational_bi:
  session:
    max_sessions_per_user: 10
    idle_timeout_minutes: 30
    max_turns: 20

  models:
    primary:
      provider: openai
      model: gpt-4
      api_key: ${OPENAI_API_KEY}
      max_tokens: 4096

    fallback:
      provider: anthropic
      model: claude-3-sonnet-20240229
      api_key: ${ANTHROPIC_API_KEY}

  cache:
    enable_query_cache: true
    enable_schema_cache: true
    query_cache_size: 10000
    query_cache_ttl_hours: 1

  performance:
    max_concurrent_sessions: 1000
    latency_target_ms: 2000
```

7.2 Monitoring
```rust
// Prometheus metrics
use prometheus::{Counter, Histogram, HistogramOpts, Registry};

pub struct ConversationalMetrics {
    pub query_total: Counter,
    pub query_latency: Histogram,
    pub cache_hits: Counter,
    pub cache_misses: Counter,
    pub validation_failures: Counter,
    pub accuracy: Histogram,
}

impl ConversationalMetrics {
    pub fn new(registry: &Registry) -> Self {
        let query_total = Counter::new(
            "conversational_bi_queries_total",
            "Total queries processed",
        ).unwrap();
        // The prometheus crate constructs histograms via `with_opts`,
        // not a `Histogram::new(name, help)` shorthand.
        let query_latency = Histogram::with_opts(HistogramOpts::new(
            "conversational_bi_query_latency_seconds",
            "Query latency",
        )).unwrap();

        // Register each metric so it appears on the scrape endpoint
        registry.register(Box::new(query_total.clone())).unwrap();
        registry.register(Box::new(query_latency.clone())).unwrap();

        Self {
            query_total,
            query_latency,
            // ... remaining counters/histograms built and registered the same way
        }
    }
}
```

7.3 Production Deployment
```bash
# Build release
cargo build --release --package heliosdb-conversational-bi

# Run benchmarks before deploying
cargo bench --package heliosdb-conversational-bi

# Deploy with monitoring
./deploy.sh --enable-monitoring --config config/conversational_bi.yaml
```

Document Status: Complete - Ready for Implementation
Next Steps: Begin Phase 1 Week 1 implementation