Feature 05: Rate Limiting & Query Throttling
Priority: High | Complexity: Medium | Phase: 2 (Resilience)
Overview
Problem Statement
Uncontrolled database access causes problems:
- Runaway queries exhaust connection pools
- Buggy applications create query storms
- DoS attacks (intentional or accidental) degrade service
- Heavy users starve other tenants in shared environments
Without rate limiting:
- Primary becomes overloaded during traffic spikes
- Standbys fall behind due to excessive read load
- Critical queries get blocked behind bulk operations
Solution
Implement multi-dimensional rate limiting at the proxy layer:
                  ┌──────────────────────────────────────────────┐
                  │                 RATE LIMITER                 │
                  │                                              │
Query ───────────►│  ┌────────────────────────────────────────┐  │
                  │  │ 1. Identify Limiter Keys               │  │
                  │  │    - User/Role                         │  │
                  │  │    - Client IP                         │  │
                  │  │    - Database                          │  │
                  │  │    - Query pattern                     │  │
                  │  └────────────────────────────────────────┘  │
                  │                      │                       │
                  │                      ▼                       │
                  │  ┌────────────────────────────────────────┐  │
                  │  │ 2. Check Rate Limits                   │  │
                  │  │    ┌────────────────┐                  │  │
                  │  │    │ Token Bucket   │ Queries/sec      │  │
                  │  │    ├────────────────┤                  │  │
                  │  │    │ Sliding Window │ Queries/minute   │  │
                  │  │    ├────────────────┤                  │  │
                  │  │    │ Concurrency    │ Active queries   │  │
                  │  │    └────────────────┘                  │  │
                  │  └────────────────────────────────────────┘  │
                  │                      │                       │
                  │          ┌───────────┴─────────────┐         │
                  │          ▼                         ▼         │
                  │     ┌─────────┐            ┌───────────────┐ │
                  │     │  ALLOW  │            │ THROTTLE/DENY │ │
                  │     └─────────┘            └───────────────┘ │
                  └──────────────────────────────────────────────┘
Architecture
Rate Limiter Components
use std::sync::Arc;

use dashmap::DashMap;

pub struct RateLimiter {
    /// Token bucket limiters (burst + sustained rate)
    token_buckets: DashMap<LimiterKey, TokenBucket>,

    /// Sliding window limiters (rolling counts)
    sliding_windows: DashMap<LimiterKey, SlidingWindow>,

    /// Concurrency limiters (active query count)
    concurrency: DashMap<LimiterKey, ConcurrencyLimiter>,

    /// Configuration
    config: RateLimitConfig,

    /// Metrics collector
    metrics: Arc<RateLimitMetrics>,
}
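The `SlidingWindow` type held in `sliding_windows` is not defined in this section. A minimal sketch, assuming a sliding-window log (one timestamp per admitted query) and a clock passed in by the caller so the behaviour is deterministic; `new` and `try_admit` are hypothetical names:

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

/// Sliding-window log: admits a query if fewer than `limit` queries were
/// admitted during the trailing `window` (e.g. 60 s for queries/minute).
pub struct SlidingWindow {
    window: Duration,
    limit: usize,
    admitted: VecDeque<Instant>, // timestamps of admitted queries, oldest first
}

impl SlidingWindow {
    pub fn new(window: Duration, limit: usize) -> Self {
        Self { window, limit, admitted: VecDeque::new() }
    }

    pub fn try_admit(&mut self, now: Instant) -> bool {
        // Evict timestamps that have slid out of the window
        while let Some(&oldest) = self.admitted.front() {
            if now.duration_since(oldest) >= self.window {
                self.admitted.pop_front();
            } else {
                break;
            }
        }
        if self.admitted.len() < self.limit {
            self.admitted.push_back(now);
            true
        } else {
            false
        }
    }
}
```

The log is exact but stores up to `limit` timestamps per key. A common lower-memory alternative keeps two fixed-window counters and weights the previous window's count by how much of it still overlaps the sliding window.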
use std::net::IpAddr;

#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum LimiterKey {
    /// Per-user limits
    User(String),

    /// Per-client IP limits
    ClientIp(IpAddr),

    /// Per-database limits
    Database(String),

    /// Per-tenant limits (multi-tenancy)
    Tenant(String),

    /// Per-query-pattern limits
    QueryPattern(String),

    /// Composite key
    Composite(Vec<LimiterKey>),
}
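Step 1 of the diagram ("Identify Limiter Keys") could look like the sketch below. It assumes a hypothetical `RequestContext` holding what the proxy knows about a query, and that a query must pass the limits of every key it maps to; `Tenant` and `Composite` are left out for brevity.

```rust
use std::net::IpAddr;

/// Reduced copy of `LimiterKey` so the example is self-contained
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum LimiterKey {
    User(String),
    ClientIp(IpAddr),
    Database(String),
    QueryPattern(String),
}

/// Hypothetical: what the proxy knows about a query before forwarding it
pub struct RequestContext<'a> {
    pub user: &'a str,
    pub client_ip: IpAddr,
    pub database: &'a str,
    /// Configured pattern the query matched, if any
    pub matched_pattern: Option<&'a str>,
}

/// Every key whose limits apply to this query; under the assumption above,
/// the query is allowed only if each of these keys has capacity.
pub fn limiter_keys(ctx: &RequestContext<'_>) -> Vec<LimiterKey> {
    let mut keys = vec![
        LimiterKey::User(ctx.user.to_string()),
        LimiterKey::ClientIp(ctx.client_ip),
        LimiterKey::Database(ctx.database.to_string()),
    ];
    if let Some(pattern) = ctx.matched_pattern {
        keys.push(LimiterKey::QueryPattern(pattern.to_string()));
    }
    keys
}
```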
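use std::collections::HashMap;
use std::time::Duration;

pub struct RateLimitConfig {
    /// Default limits
    pub default_qps: u32,
    pub default_burst: u32,
    pub default_concurrency: u32,

    /// Per-key overrides
    pub overrides: HashMap<LimiterKey, LimitOverride>,

    /// Action on limit exceeded
    pub exceeded_action: ExceededAction,

    /// Whether to include a Retry-After hint when a query is rejected
    pub retry_after: bool,

    /// Standby lag above which writes are throttled
    /// (read by `check_replication_pressure`)
    pub replication_throttle_threshold: Duration,
}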
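`LimitOverride` is not defined in this section. A minimal sketch, assuming every override field is optional and an unset field falls back to the default (so `readonly_user`, which sets only `qps` and `max_concurrent` in the configuration under API Specification, inherits the default burst); `EffectiveLimits` and `resolve` are hypothetical:

```rust
use std::collections::HashMap;

/// Reduced copy of `LimiterKey` so the example is self-contained
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum LimiterKey {
    User(String),
    Database(String),
}

/// Hypothetical shape: `None` means "use the default"
#[derive(Debug, Clone, Default)]
pub struct LimitOverride {
    pub qps: Option<u32>,
    pub burst: Option<u32>,
    pub max_concurrent: Option<u32>,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct EffectiveLimits {
    pub qps: u32,
    pub burst: u32,
    pub max_concurrent: u32,
}

/// Field-by-field merge of a key's override onto the defaults
pub fn resolve(
    defaults: EffectiveLimits,
    overrides: &HashMap<LimiterKey, LimitOverride>,
    key: &LimiterKey,
) -> EffectiveLimits {
    match overrides.get(key) {
        None => defaults,
        Some(o) => EffectiveLimits {
            qps: o.qps.unwrap_or(defaults.qps),
            burst: o.burst.unwrap_or(defaults.burst),
            max_concurrent: o.max_concurrent.unwrap_or(defaults.max_concurrent),
        },
    }
}
```

Whether an unset `burst` should inherit the default (2000 here, well above the overridden 100 qps) or scale with the overridden `qps` is a design choice the section leaves open.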
use std::time::Duration;

#[derive(Debug, Clone)]
pub enum ExceededAction {
    /// Return error immediately
    Reject,

    /// Queue and wait (up to timeout)
    Queue { max_wait: Duration },

    /// Throttle by delaying response
    Throttle { delay: Duration },

    /// Log warning but allow
    Warn,
}
Token Bucket Implementation
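The refill arithmetic is easier to check without atomics. Before the concurrent `TokenBucket` that follows, a single-threaded sketch (hypothetical `Bucket` type, time passed in as `f64` seconds) that keeps fractional tokens rather than truncating them to `u32`:

```rust
use std::time::Duration;

/// Single-threaded token bucket with an explicit clock (seconds as f64)
pub struct Bucket {
    capacity: f64,
    tokens: f64,
    refill_rate: f64, // tokens per second, must be > 0
    last: f64,        // time of the last refill, in seconds
}

impl Bucket {
    /// Starts full, so an idle client can burst up to `capacity` at once
    pub fn new(capacity: u32, refill_rate: f64, now: f64) -> Self {
        let capacity = capacity as f64;
        Self { capacity, tokens: capacity, refill_rate, last: now }
    }

    /// Ok(()) if `n` tokens were taken, else the wait until they would be available
    pub fn try_acquire(&mut self, n: u32, now: f64) -> Result<(), Duration> {
        // Credit elapsed time at `refill_rate`, capped at `capacity`
        let elapsed = (now - self.last).max(0.0);
        self.tokens = (self.tokens + elapsed * self.refill_rate).min(self.capacity);
        self.last = self.last.max(now);

        let n = n as f64;
        if self.tokens >= n {
            self.tokens -= n;
            Ok(())
        } else {
            Err(Duration::from_secs_f64((n - self.tokens) / self.refill_rate))
        }
    }
}
```

Assuming `burst` sets the capacity and `qps` the refill rate, the default `qps = 1000, burst = 2000` lets an idle client send 2,000 queries at once and then holds it to about 1,000 per second.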
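use std::sync::atomic::{AtomicU32, AtomicU64, Ordering};
use std::time::{Duration, Instant};

pub struct TokenBucket {
    /// Maximum tokens (burst capacity)
    capacity: u32,

    /// Current token count
    tokens: AtomicU32,

    /// Refill rate (tokens per second, must be > 0)
    refill_rate: f64,

    /// Fixed reference point for the timestamps below
    epoch: Instant,

    /// Last refill timestamp (nanoseconds since `epoch`)
    last_refill: AtomicU64,
}

impl TokenBucket {
    pub fn try_acquire(&self, tokens: u32) -> Result<(), RateLimitExceeded> {
        self.refill();

        // Check and decrement in one atomic step. A separate load + fetch_sub
        // lets two callers spend the same tokens and wrap the counter below 0.
        self.tokens
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                current.checked_sub(tokens)
            })
            .map(|_| ())
            .map_err(|_| RateLimitExceeded {
                retry_after: self.time_until_available(tokens),
            })
    }

    fn refill(&self) {
        // `Instant::now().elapsed()` is always ~0; measure from a fixed epoch
        let now = self.epoch.elapsed().as_nanos() as u64;
        let last = self.last_refill.load(Ordering::SeqCst);
        let elapsed_secs = now.saturating_sub(last) as f64 / 1_000_000_000.0;

        let new_tokens = (elapsed_secs * self.refill_rate) as u32;
        if new_tokens == 0 {
            return;
        }

        // Advance the timestamp only by the time the whole tokens represent,
        // so fractional progress is not lost. Only the caller that wins this
        // CAS credits the tokens, so an interval is never counted twice.
        let credited_nanos = (new_tokens as f64 / self.refill_rate * 1e9) as u64;
        if self
            .last_refill
            .compare_exchange(last, last + credited_nanos, Ordering::SeqCst, Ordering::SeqCst)
            .is_ok()
        {
            let _ = self.tokens.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                Some(current.saturating_add(new_tokens).min(self.capacity))
            });
        }
    }

    fn time_until_available(&self, tokens: u32) -> Duration {
        // Requests larger than `capacity` can never succeed and should be rejected up front
        let current = self.tokens.load(Ordering::SeqCst);
        let needed = tokens.saturating_sub(current);
        Duration::from_secs_f64(needed as f64 / self.refill_rate)
    }
}
Concurrency Limiter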
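In the limiter that follows, `acquire` returns a `ConcurrencyGuard`; releasing the slot in the guard's `Drop` is what keeps a query that errors out or is cancelled from leaking capacity. A std-only sketch of the non-blocking path (hypothetical `Limiter`, `Permit`, and `try_acquire` names; waiting is omitted):

```rust
use std::sync::atomic::{AtomicU32, Ordering};

pub struct Limiter {
    max_concurrent: u32,
    active: AtomicU32,
}

/// Holding a `Permit` occupies one slot; dropping it frees the slot
pub struct Permit<'a> {
    limiter: &'a Limiter,
}

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        self.limiter.active.fetch_sub(1, Ordering::SeqCst);
    }
}

impl Limiter {
    pub fn new(max_concurrent: u32) -> Self {
        Self { max_concurrent, active: AtomicU32::new(0) }
    }

    /// Non-blocking acquire: `None` when all slots are busy
    pub fn try_acquire(&self) -> Option<Permit<'_>> {
        self.active
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| {
                (n < self.max_concurrent).then(|| n + 1)
            })
            .ok()
            .map(|_| Permit { limiter: self })
    }

    pub fn active(&self) -> u32 {
        self.active.load(Ordering::SeqCst)
    }
}
```

For the waiting path in async code, `tokio::sync::Semaphore` already hands out permits in FIFO order and returns a guard that releases on drop, which may be simpler than a hand-rolled waiter queue.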
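use std::collections::VecDeque;
use std::sync::atomic::{AtomicU32, Ordering};

use parking_lot::Mutex; // lock() returns the guard directly (no poisoning)
use tokio::sync::oneshot;

pub struct ConcurrencyLimiter {
    /// Maximum concurrent queries
    max_concurrent: u32,

    /// Currently active queries
    active: AtomicU32,

    /// Waiting queue
    waiters: Mutex<VecDeque<oneshot::Sender<()>>>,
}

/// Frees the slot when dropped, including on error or cancellation
pub struct ConcurrencyGuard<'a> {
    limiter: &'a ConcurrencyLimiter,
}

impl Drop for ConcurrencyGuard<'_> {
    fn drop(&mut self) {
        self.limiter.release();
    }
}

impl ConcurrencyLimiter {
    pub async fn acquire(&self) -> ConcurrencyGuard<'_> {
        loop {
            let current = self.active.load(Ordering::SeqCst);
            if current < self.max_concurrent {
                if self
                    .active
                    .compare_exchange(current, current + 1, Ordering::SeqCst, Ordering::SeqCst)
                    .is_ok()
                {
                    return ConcurrencyGuard { limiter: self };
                }
                continue;
            }

            // Wait for slot. Register first, then re-check: a release that ran
            // between the load above and push_back would have found an empty
            // queue, and this waiter could otherwise sleep indefinitely.
            let (tx, rx) = oneshot::channel();
            self.waiters.lock().push_back(tx);
            if self.active.load(Ordering::SeqCst) < self.max_concurrent {
                continue; // drops rx; release() skips the dead sender
            }
            let _ = rx.await;
        }
    }

    fn release(&self) {
        self.active.fetch_sub(1, Ordering::SeqCst);

        // Wake up the next live waiter, skipping senders whose receiver is gone
        let mut waiters = self.waiters.lock();
        while let Some(waiter) = waiters.pop_front() {
            if waiter.send(()).is_ok() {
                break;
            }
        }
    }
}
Query Cost Estimation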
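`estimate_cost` below calls `detect_operation`, which is not shown. A minimal keyword-based sketch, written as a hypothetical free function that returns `Option` so unrecognized statements are explicit. It skips leading whitespace and `/* ... */` comments such as the `/*helios:...*/` hints. `FullTableScan` and `VectorSearch` cannot be recognized from the leading keyword (they would need parser or planner output), so they are omitted:

```rust
#[derive(Debug, Clone, Copy, Hash, Eq, PartialEq)]
pub enum OperationType {
    Select,
    Insert,
    Update,
    Delete,
    DDL,
}

/// Classify a statement by its leading keyword, skipping whitespace and
/// `/* ... */` comments. Returns None for unclassified statements (e.g. BEGIN).
pub fn detect_operation(query: &str) -> Option<OperationType> {
    let mut rest = query.trim_start();
    while let Some(after_open) = rest.strip_prefix("/*") {
        let close = after_open.find("*/")?; // unterminated comment
        rest = after_open[close + 2..].trim_start();
    }
    let keyword = rest
        .chars()
        .take_while(|c| c.is_ascii_alphabetic())
        .collect::<String>()
        .to_ascii_uppercase();
    match keyword.as_str() {
        "SELECT" => Some(OperationType::Select),
        "INSERT" => Some(OperationType::Insert),
        "UPDATE" => Some(OperationType::Update),
        "DELETE" => Some(OperationType::Delete),
        "CREATE" | "ALTER" | "DROP" => Some(OperationType::DDL),
        _ => None,
    }
}
```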
use std::collections::HashMap;

pub struct QueryCostEstimator {
    /// Base cost per query
    base_cost: u32,

    /// Cost multipliers by operation type (missing entries default to 1.0)
    operation_costs: HashMap<OperationType, f32>,
}

impl QueryCostEstimator {
    pub fn estimate_cost(&self, query: &str) -> u32 {
        let op_type = self.detect_operation(query);
        let multiplier = self.operation_costs.get(&op_type).copied().unwrap_or(1.0);

        // Relative weight per operation type, scaled by base_cost
        let weight = match op_type {
            OperationType::Select => 1,
            OperationType::Insert => 2,
            OperationType::Update => 3,
            OperationType::Delete => 3,
            OperationType::DDL => 10,
            OperationType::FullTableScan => 5,
            OperationType::VectorSearch => 3,
        };

        // Every query costs at least one token, even with a multiplier < 1
        ((self.base_cost * weight) as f32 * multiplier).max(1.0) as u32
    }
}
API Specification
Configuration (heliosproxy.toml)
[rate_limit]
enabled = true

# Default limits (apply to all)
[rate_limit.default]
qps = 1000                  # Queries per second
burst = 2000                # Burst capacity
max_concurrent = 100        # Concurrent queries
exceeded_action = "reject"  # reject, queue, throttle, warn

# Per-user limits
[rate_limit.users.admin]
qps = 10000
burst = 20000
max_concurrent = 500

[rate_limit.users.readonly_user]
qps = 100
max_concurrent = 10

# Per-database limits
[rate_limit.databases.analytics]
qps = 500
max_concurrent = 50

# Per-client-IP limits
[rate_limit.client_ips]
enabled = true
qps = 100
burst = 200

# Per-query-pattern limits
[rate_limit.patterns]
"SELECT.*FROM users.*" = { qps = 50 }
"INSERT INTO logs.*" = { qps = 1000 }

# Queue settings (when exceeded_action = "queue")
[rate_limit.queue]
max_wait = "5s"
queue_size = 1000
SQL Hints
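Hints are block comments of the form `/*helios:key=value*/` placed before the statement, as in the examples that follow. A minimal sketch of extracting them, assuming hints appear only at the start of the query and that unknown keys or malformed values are ignored (`Hints`, `Priority`, and `parse_hints` are hypothetical):

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Priority {
    Low,
    Normal,
    High,
}

#[derive(Debug, PartialEq, Eq)]
pub struct Hints {
    pub priority: Priority,
    pub cost: Option<u32>,
}

/// Read leading `/*helios:key=value*/` comments; stops at the first
/// non-hint token. Unknown keys and unparsable values are ignored.
pub fn parse_hints(query: &str) -> Hints {
    let mut hints = Hints { priority: Priority::Normal, cost: None };
    let mut rest = query.trim_start();
    while let Some(body) = rest.strip_prefix("/*helios:") {
        let end = match body.find("*/") {
            Some(end) => end,
            None => break, // unterminated comment: stop parsing hints
        };
        if let Some((key, value)) = body[..end].split_once('=') {
            match (key.trim(), value.trim()) {
                ("priority", "high") => hints.priority = Priority::High,
                ("priority", "low") => hints.priority = Priority::Low,
                ("cost", v) => {
                    if let Ok(cost) = v.parse() {
                        hints.cost = Some(cost);
                    }
                }
                _ => {}
            }
        }
        rest = body[end + 2..].trim_start();
    }
    hints
}
```

Because hints come from the client, the proxy would presumably need to restrict `priority=high` to trusted roles and keep `cost` from being set below the estimated cost; otherwise any client could opt out of its limits.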
-- Request high priority (bypass some limits)
/*helios:priority=high*/SELECT * FROM critical_alerts;

-- Request low priority (accept more throttling)
/*helios:priority=low*/SELECT COUNT(*) FROM large_table;

-- Specify custom cost for rate limiting
/*helios:cost=10*/SELECT * FROM complex_view;
Admin API
GET /rate-limit/status
{
  "global": {
    "qps_limit": 10000,
    "current_qps": 3456,
    "rejected_last_minute": 12,
    "queued_current": 5
  },
  "by_user": {
    "admin": { "qps_limit": 10000, "current": 1234, "rejected": 0 },
    "app_user": { "qps_limit": 1000, "current": 456, "rejected": 5 }
  },
  "by_database": {
    "heliosdb": { "qps_limit": 5000, "current": 2000, "rejected": 0 },
    "analytics": { "qps_limit": 500, "current": 480, "rejected": 10 }
  }
}
POST /rate-limit/override
# Temporarily adjust limits
{
  "user": "batch_job",
  "qps": 5000,
  "duration": "1h"
}
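The override endpoint implies that a temporary limit reverts on its own once `duration` has passed. A minimal in-memory sketch, assuming overrides are keyed like the reset endpoint's `user:<name>` strings and expire lazily on lookup (`TempOverrides` and its methods are hypothetical):

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Temporary QPS overrides, keyed like "user:batch_job"
#[derive(Default)]
pub struct TempOverrides {
    entries: HashMap<String, (u32, Instant)>, // key -> (qps, expires_at)
}

impl TempOverrides {
    pub fn set(&mut self, key: &str, qps: u32, duration: Duration, now: Instant) {
        self.entries.insert(key.to_string(), (qps, now + duration));
    }

    /// The override while it is live, else the configured limit.
    /// Expired entries are removed lazily on lookup.
    pub fn effective_qps(&mut self, key: &str, configured: u32, now: Instant) -> u32 {
        match self.entries.get(key).copied() {
            Some((qps, expires_at)) if now < expires_at => qps,
            Some(_) => {
                self.entries.remove(key);
                configured
            }
            None => configured,
        }
    }
}
```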
POST /rate-limit/reset
# Reset limit counters
{ "key": "user:app_user" }

GET /rate-limit/history?duration=1h
# Rate limit event history
AI/Agent Innovations
1. Agent Token Budget
AI agents get token budgets instead of simple rate limits:
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};

pub struct AgentTokenBudget {
    /// Daily token allocation
    daily_tokens: u64,

    /// Used tokens today
    used_tokens: AtomicU64,

    /// Token cost per operation
    operation_costs: HashMap<String, u64>,
}

impl AgentTokenBudget {
    pub fn consume(&self, operation: &str, estimated_tokens: u64) -> Result<(), BudgetExceeded> {
        let cost = self.operation_costs.get(operation).copied().unwrap_or(1);
        let total_cost = cost.saturating_mul(estimated_tokens);

        // Reserve only if the new total fits. Adding first and rolling back
        // briefly inflates usage (spuriously rejecting concurrent callers),
        // and `daily_tokens - used` could underflow.
        self.used_tokens
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
                used.checked_add(total_cost)
                    .filter(|&new_total| new_total <= self.daily_tokens)
            })
            .map(|_| ())
            .map_err(|used| BudgetExceeded {
                remaining: self.daily_tokens.saturating_sub(used),
            })
    }
}
2. RAG Pipeline Fair Scheduling
Ensure embedding queries don’t starve other operations:
[rate_limit.workloads]
# RAG embedding queries
rag_embedding.max_concurrent = 20
rag_embedding.qps = 500

# RAG retrieval
rag_retrieval.max_concurrent = 50
rag_retrieval.qps = 1000

# Regular queries
default.max_concurrent = 100
3. Agentic Workflow Quotas
Track and limit agent workflow executions:
use std::sync::atomic::{AtomicU32, Ordering};

pub struct WorkflowQuota {
    /// Max workflows per hour
    hourly_limit: u32,

    /// Max steps per workflow
    max_steps: u32,

    /// Current hour's workflows (must be reset every hour; reset logic not shown)
    hourly_count: AtomicU32,
}

impl WorkflowQuota {
    pub fn begin_workflow(&self) -> Result<WorkflowToken, QuotaExceeded> {
        // Increment only while under the limit, so rejected attempts never
        // inflate the count seen by concurrent callers
        self.hourly_count
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                (count < self.hourly_limit).then(|| count + 1)
            })
            .map_err(|_| QuotaExceeded::HourlyLimit)?;

        Ok(WorkflowToken {
            remaining_steps: AtomicU32::new(self.max_steps),
        })
    }
}
4. LLM-Friendly Error Messages
Return structured errors that LLMs can understand:
{
  "error": "rate_limit_exceeded",
  "message": "Query rate limit exceeded for user 'agent_1'",
  "details": {
    "limit_type": "qps",
    "current_rate": 150,
    "limit": 100,
    "retry_after_seconds": 5
  },
  "suggestion": "Reduce query frequency or request a higher rate limit",
  "documentation_url": "https://docs.heliosdb.io/rate-limits"
}
HeliosDB-Lite Integration
1. Replication-Aware Throttling
Throttle writes when standbys are lagging:
impl RateLimiter {
    pub fn check_replication_pressure(&self, lag_monitor: &LagMonitor) -> Option<Duration> {
        let max_lag = lag_monitor.get_max_standby_lag();

        if max_lag > self.config.replication_throttle_threshold {
            // Slow down writes to let standbys catch up:
            // 100 ms of delay per second of lag, capped at 500 ms (lag clamped to 5 s)
            let lag_secs = max_lag.as_secs_f64().min(5.0);
            return Some(Duration::from_secs_f64(lag_secs * 0.1));
        }

        None
    }
}
2. Sync Mode Cost Accounting
Different sync modes have different costs:
impl QueryCostEstimator {
    pub fn estimate_write_cost(&self, sync_mode: SyncMode) -> u32 {
        match sync_mode {
            SyncMode::Sync => 5,     // Waits for standby ACK
            SyncMode::SemiSync => 3, // Bounded wait
            SyncMode::Async => 1,    // Fire and forget
        }
    }
}
3. Branch-Aware Limits
Separate limits per branch:
[rate_limit.branches]
main.qps = 5000
development.qps = 10000  # Allow more in dev
analytics.qps = 500      # Limit heavy analytics
4. Vector Search Throttling
Limit expensive vector operations:
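impl RateLimiter {
    pub fn check_vector_limit(&self, query: &str) -> Result<(), RateLimitExceeded> {
        if self.is_vector_query(query) {
            let key = LimiterKey::QueryPattern("vector_search".to_string());
            // DashMap::get returns an Option, which `?` cannot convert into a
            // RateLimitExceeded; this version allows the query when no
            // vector_search bucket is configured
            if let Some(bucket) = self.token_buckets.get(&key) {
                bucket.try_acquire(1)?;
            }
        }
        Ok(())
    }
}
Implementation Notes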
File Locations
src/proxy/
├── rate_limit/
│   ├── mod.rs             # Public API
│   ├── limiter.rs         # RateLimiter implementation
│   ├── token_bucket.rs    # Token bucket algorithm
│   ├── sliding_window.rs  # Sliding window algorithm
│   ├── concurrency.rs     # Concurrency limiter
│   ├── cost_estimator.rs  # Query cost estimation
│   └── metrics.rs         # Rate limit metrics
Key Considerations
- Distributed Rate Limiting: For multi-proxy deployments, use Redis or a distributed counter so that global limits hold across all proxy instances.
- Fair Queuing: Implement weighted fair queuing when throttling to avoid starvation.
- Metrics: Expose rate limit counters, rejection rates, and queue depths.
- Graceful Degradation: Prefer throttling over hard rejection when possible.
- Hot Reload: Allow limit changes without a proxy restart.
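The fair queuing consideration does not name an algorithm. One standard choice is self-clocked fair queuing (SCFQ), a simple approximation of weighted fair queuing: each queued query gets a virtual finish tag `max(virtual_time, workload's last tag) + cost / weight`, and the smallest tag is served next. A single-threaded sketch with hypothetical integer weights per workload (the names mirror `[rate_limit.workloads]`; `FairQueue` is hypothetical):

```rust
use std::collections::{BTreeMap, HashMap};

/// Self-clocked fair queuing over named workloads
pub struct FairQueue<T> {
    weights: HashMap<String, u64>,     // workload -> weight (missing = 1)
    last_finish: HashMap<String, u64>, // workload -> finish tag of its newest item
    virtual_time: u64,                 // finish tag of the most recently dequeued item
    seq: u64,                          // tie-breaker that preserves arrival order
    queue: BTreeMap<(u64, u64), T>,    // (finish tag, seq) -> item, smallest first
}

impl<T> FairQueue<T> {
    pub fn new(weights: HashMap<String, u64>) -> Self {
        Self {
            weights,
            last_finish: HashMap::new(),
            virtual_time: 0,
            seq: 0,
            queue: BTreeMap::new(),
        }
    }

    /// Queue `item` for `workload`; `cost` could be the query's token cost
    pub fn push(&mut self, workload: &str, cost: u64, item: T) {
        let weight = self.weights.get(workload).copied().unwrap_or(1).max(1);
        let last = self.last_finish.get(workload).copied().unwrap_or(0);
        let start = self.virtual_time.max(last);
        // Scale by 1000 before dividing so small costs are not rounded to 0
        let finish = start + cost * 1000 / weight;
        self.last_finish.insert(workload.to_string(), finish);
        self.queue.insert((finish, self.seq), item);
        self.seq += 1;
    }

    /// Dequeue the item with the smallest finish tag
    pub fn pop(&mut self) -> Option<T> {
        let ((finish, _), item) = self.queue.pop_first()?;
        self.virtual_time = finish;
        Some(item)
    }
}
```

With `default` weighted 2 and `rag_embedding` weighted 1 and both backlogged, `default` is served twice for every `rag_embedding` query, yet neither is starved. Sharing the queue across tasks would require wrapping it in a `Mutex`.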
Rate Limit Response Headers
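The header values in the response that follows can be derived from the limiter's state. A minimal sketch, assuming `X-RateLimit-Reset` is the Unix time at which a retry should succeed (the section does not define it); `rate_limit_headers` is hypothetical. `Retry-After` takes whole seconds, so the wait is rounded up, since rounding down would invite a retry that is still rejected:

```rust
use std::time::Duration;

/// Header values for a 429 response (hypothetical helper)
pub fn rate_limit_headers(
    limit: u32,
    remaining: u32,
    retry_after: Duration,
    now_unix: u64,
) -> Vec<(&'static str, String)> {
    // Round up to whole seconds: rounding down would invite an early retry
    let wait_secs = retry_after.as_secs() + u64::from(retry_after.subsec_nanos() > 0);
    vec![
        ("X-RateLimit-Limit", limit.to_string()),
        ("X-RateLimit-Remaining", remaining.to_string()),
        // Assumption: Reset = Unix time at which a retry should succeed
        ("X-RateLimit-Reset", (now_unix + wait_secs).to_string()),
        ("Retry-After", wait_secs.to_string()),
    ]
}
```

For example, a 4.2 s wait computed at Unix time 1706188795 yields `Retry-After: 5` and `X-RateLimit-Reset: 1706188800`, the values in the example response.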
HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1706188800
Retry-After: 5
Content-Type: application/json

{
  "error": "rate_limit_exceeded",
  "retry_after": 5
}
Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| Limit check latency | <5μs | p99 |
| Token bucket refill | <1μs | per operation |
| Concurrency acquire | <10μs | p99 (no contention) |
| Queue wait overhead | <1ms | p99 (when queuing) |
Related Features
- Circuit Breaker - Handle backend failures
- Multi-Tenancy - Tenant-specific limits
- Query Analytics - Identify heavy queries