Feature 05: Rate Limiting & Query Throttling

Priority: High | Complexity: Medium | Phase: 2 (Resilience)


Overview

Problem Statement

Uncontrolled database access causes problems:

  • Runaway queries exhaust connection pools
  • Buggy applications create query storms
  • Denial-of-service conditions, whether from deliberate attacks or accidental overload, degrade service
  • Heavy users starve other tenants in shared environments

Without rate limiting:

  • Primary becomes overloaded during traffic spikes
  • Standbys fall behind due to excessive read load
  • Critical queries blocked by bulk operations

Solution

Implement multi-dimensional rate limiting at the proxy layer:

                   ┌────────────────────────────────────────────────┐
                   │                  RATE LIMITER                  │
                   │                                                │
Query ────────────►│  ┌──────────────────────────────────────────┐  │
                   │  │ 1. Identify Limiter Keys                 │  │
                   │  │    - User/Role                           │  │
                   │  │    - Client IP                           │  │
                   │  │    - Database                            │  │
                   │  │    - Query pattern                       │  │
                   │  └──────────────────────────────────────────┘  │
                   │                        │                       │
                   │                        ▼                       │
                   │  ┌──────────────────────────────────────────┐  │
                   │  │ 2. Check Rate Limits                     │  │
                   │  │   ┌────────────────┐                     │  │
                   │  │   │ Token Bucket   │ Queries/sec         │  │
                   │  │   ├────────────────┤                     │  │
                   │  │   │ Sliding Window │ Queries/minute      │  │
                   │  │   ├────────────────┤                     │  │
                   │  │   │ Concurrency    │ Active queries      │  │
                   │  │   └────────────────┘                     │  │
                   │  └──────────────────────────────────────────┘  │
                   │                        │                       │
                   │           ┌────────────┴────────────┐          │
                   │           ▼                         ▼          │
                   │       ┌───────┐             ┌───────────────┐  │
                   │       │ ALLOW │             │ THROTTLE/DENY │  │
                   │       └───────┘             └───────────────┘  │
                   └────────────────────────────────────────────────┘

Architecture

Rate Limiter Components

/// Top-level rate limiter of the proxy.
///
/// Keeps one piece of limiter state per [`LimiterKey`] for each limiting
/// strategy (token bucket, sliding window, concurrency), together with the
/// configuration and the metrics sink they share.
pub struct RateLimiter {
    /// Per-key token buckets: a sustained rate that still admits bursts.
    token_buckets: DashMap<LimiterKey, TokenBucket>,
    /// Per-key sliding windows: rolling request counts.
    sliding_windows: DashMap<LimiterKey, SlidingWindow>,
    /// Per-key concurrency caps: number of in-flight queries.
    concurrency: DashMap<LimiterKey, ConcurrencyLimiter>,
    /// Limits and policies enforced by this limiter.
    config: RateLimitConfig,
    /// Shared handle used to record rate-limit metrics.
    metrics: Arc<RateLimitMetrics>,
}
/// Identifies which bucket of traffic a limit applies to.
///
/// Keys are hashable so they can index the per-key limiter maps and the
/// per-key override table.
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum LimiterKey {
    /// Traffic issued by one user/role.
    User(String),
    /// Traffic arriving from one client address.
    ClientIp(IpAddr),
    /// Traffic targeting one database.
    Database(String),
    /// Traffic belonging to one tenant in a multi-tenant deployment.
    Tenant(String),
    /// Traffic matching one query pattern.
    QueryPattern(String),
    /// Several keys combined into a single, more specific key.
    Composite(Vec<LimiterKey>),
}
pub struct RateLimitConfig {
/// Default limits
pub default_qps: u32,
pub default_burst: u32,
pub default_concurrency: u32,
/// Per-key overrides
pub overrides: HashMap<LimiterKey, LimitOverride>,
/// Action on limit exceeded
pub exceeded_action: ExceededAction,
/// Retry-After header behavior
pub retry_after: bool,
}
/// Policy applied to a query that has exceeded one of its limits.
#[derive(Debug, Clone)]
pub enum ExceededAction {
    /// Fail the query right away with a rate-limit error.
    Reject,
    /// Hold the query until capacity frees up, giving up after `max_wait`.
    Queue { max_wait: Duration },
    /// Let the query through but hold back its response for `delay`.
    Throttle { delay: Duration },
    /// Let the query through and only emit a warning.
    Warn,
}

Token Bucket Implementation

/// Atomic token bucket that admits bursts of up to `capacity` tokens and
/// refills at `refill_rate` tokens per second.
pub struct TokenBucket {
    /// Most tokens the bucket can hold, i.e. the largest burst it admits.
    capacity: u32,
    /// Tokens available right now.
    tokens: AtomicU32,
    /// Tokens added per elapsed second.
    refill_rate: f64,
    /// Timestamp of the most recent refill, in nanoseconds.
    last_refill: AtomicU64,
}
impl TokenBucket {
    /// Tries to take `tokens` tokens from the bucket.
    ///
    /// The bucket is first refilled for the time elapsed since the last
    /// refill. Then `tokens` are subtracted in one atomic step if enough are
    /// available. A request for more than `capacity` tokens can never succeed.
    ///
    /// # Errors
    /// Returns [`RateLimitExceeded`] carrying a `retry_after` hint when fewer
    /// than `tokens` tokens are available. In that case the bucket is left
    /// unchanged.
    pub fn try_acquire(&self, tokens: u32) -> Result<(), RateLimitExceeded> {
        self.refill();
        // Check-and-decrement must be a single atomic operation. A separate
        // load followed by `fetch_sub` lets two callers both pass the check,
        // which oversubscribes the bucket and can wrap the counter.
        self.tokens
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |available| {
                available.checked_sub(tokens)
            })
            .map(|_| ())
            .map_err(|_| RateLimitExceeded {
                retry_after: self.time_until_available(tokens),
            })
    }

    /// Nanoseconds elapsed on a monotonic clock since a process-wide epoch.
    ///
    /// `Instant::now().elapsed()` measures from "now" to "now" and is
    /// therefore always ~0. Timestamps must be taken relative to one fixed
    /// reference instant instead.
    fn now_nanos() -> u64 {
        static EPOCH: std::sync::OnceLock<Instant> = std::sync::OnceLock::new();
        let nanos = EPOCH.get_or_init(Instant::now).elapsed().as_nanos();
        u64::try_from(nanos).unwrap_or(u64::MAX)
    }

    /// Credits the whole tokens earned since `last_refill`, capped at `capacity`.
    fn refill(&self) {
        let now = Self::now_nanos();
        let last = self.last_refill.load(Ordering::SeqCst);
        let elapsed_secs = now.saturating_sub(last) as f64 / 1_000_000_000.0;
        // Float-to-int `as` saturates: NaN/negative -> 0, too large -> u32::MAX.
        let new_tokens = (elapsed_secs * self.refill_rate) as u32;
        if new_tokens == 0 {
            return;
        }
        // Advance the timestamp only by the time that paid for the credited
        // whole tokens. The fractional remainder then carries over to the next
        // refill instead of being lost. Never move the timestamp past `now`.
        let credited_nanos =
            (f64::from(new_tokens) / self.refill_rate * 1_000_000_000.0) as u64;
        let next = last.saturating_add(credited_nanos).min(now);
        // Only the caller that moves the timestamp forward credits the tokens.
        // This stops concurrent refills from adding the same interval twice.
        if self
            .last_refill
            .compare_exchange(last, next, Ordering::SeqCst, Ordering::SeqCst)
            .is_err()
        {
            return;
        }
        let _ = self
            .tokens
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |current| {
                Some(current.saturating_add(new_tokens).min(self.capacity))
            });
    }

    /// Estimated wait until `tokens` tokens could be available.
    ///
    /// Returns `Duration::ZERO` if enough tokens are already present. Returns
    /// `Duration::MAX` when the bucket never refills (a zero, negative or NaN
    /// `refill_rate`), because `Duration::from_secs_f64` would panic on the
    /// resulting infinite/invalid value.
    fn time_until_available(&self, tokens: u32) -> Duration {
        let current = self.tokens.load(Ordering::SeqCst);
        let needed = tokens.saturating_sub(current);
        if needed == 0 {
            return Duration::ZERO;
        }
        Duration::try_from_secs_f64(f64::from(needed) / self.refill_rate)
            .unwrap_or(Duration::MAX)
    }
}

Concurrency Limiter

/// Caps how many queries may run at once.
///
/// Callers beyond the cap park in a queue and are woken one at a time
/// through one-shot channels.
pub struct ConcurrencyLimiter {
    /// Upper bound on simultaneously running queries.
    max_concurrent: u32,
    /// Number of queries currently holding a slot.
    active: AtomicU32,
    /// Parked callers, each woken by sending `()` on its channel.
    waiters: Mutex<VecDeque<oneshot::Sender<()>>>,
}
impl ConcurrencyLimiter {
    /// Waits for a free concurrency slot and claims it.
    ///
    /// Returns a [`ConcurrencyGuard`] borrowing this limiter. The slot goes
    /// back through `release` (presumably from the guard's `Drop`, which is
    /// not shown here; confirm).
    ///
    /// A woken waiter is not guaranteed the freed slot. A newly arriving
    /// caller may claim it first, in which case the waiter queues again.
    ///
    /// NOTE(review): a waiter that is woken but whose future is dropped before
    /// it re-claims leaves the slot free without waking anyone else.
    pub async fn acquire(&self) -> ConcurrencyGuard {
        loop {
            if self.try_claim_slot() {
                return ConcurrencyGuard { limiter: self };
            }
            // Register as a waiter and re-check the counter while holding the
            // queue lock. `release` decrements `active` before it takes this
            // lock, so one of two things happens: we observe the freed slot
            // here, or our sender is already queued when `release` pops.
            // Checking outside the lock could miss a release that happened in
            // between and then sleep forever.
            let wakeup = {
                let mut waiters = self.waiters.lock();
                if self.active.load(Ordering::SeqCst) < self.max_concurrent {
                    None
                } else {
                    let (tx, rx) = oneshot::channel();
                    waiters.push_back(tx);
                    Some(rx)
                }
            }; // The lock guard is dropped here, before awaiting.
            if let Some(rx) = wakeup {
                let _ = rx.await;
            }
        }
    }

    /// Atomically increments `active` if it is still below `max_concurrent`.
    fn try_claim_slot(&self) -> bool {
        self.active
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |active| {
                (active < self.max_concurrent).then(|| active + 1)
            })
            .is_ok()
    }

    /// Gives back a slot and wakes the oldest waiter that is still listening.
    fn release(&self) {
        self.active.fetch_sub(1, Ordering::SeqCst);
        let mut waiters = self.waiters.lock();
        // Skip senders whose receiver is gone (the waiting `acquire` future was
        // dropped). Otherwise the wake-up is spent on nobody, and a live waiter
        // keeps sleeping even though a slot is free.
        while let Some(waiter) = waiters.pop_front() {
            if waiter.send(()).is_ok() {
                break;
            }
        }
    }
}

Query Cost Estimation

/// Assigns a rate-limit cost (in tokens) to a query based on its operation type.
pub struct QueryCostEstimator {
    /// Base cost charged per query.
    ///
    /// NOTE(review): `estimate_cost` uses a hard-coded base per operation
    /// type and does not read this field; confirm its intended use.
    base_cost: u32,
    /// Multiplier applied to the base cost, keyed by operation type.
    operation_costs: HashMap<OperationType, f32>,
}
impl QueryCostEstimator {
    /// Estimates how many rate-limit tokens `query` should consume.
    ///
    /// `detect_operation` classifies the query. That classification selects a
    /// fixed base cost, which is scaled by the multiplier configured for the
    /// operation type (1.0 when none is configured). The result is always at
    /// least 1.
    ///
    /// NOTE(review): the `base_cost` field is not consulted here; confirm
    /// whether it should replace or scale the per-operation base.
    pub fn estimate_cost(&self, query: &str) -> u32 {
        let op_type = self.detect_operation(query);
        let multiplier = self
            .operation_costs
            .get(&op_type)
            .copied()
            .unwrap_or(1.0);
        let base: u32 = match op_type {
            OperationType::Select => 1,
            OperationType::Insert => 2,
            OperationType::Update => 3,
            OperationType::Delete => 3,
            OperationType::DDL => 10,
            OperationType::FullTableScan => 5,
            OperationType::VectorSearch => 3,
        };
        // `f32 as u32` truncates toward zero and maps NaN/negative values to 0.
        // A cost of 0 always passes a token-bucket check, so a multiplier
        // below 1.0 (or a bad config value) would let such queries bypass
        // rate limiting entirely. Charge at least one token.
        ((base as f32 * multiplier) as u32).max(1)
    }
}

API Specification

Configuration (heliosproxy.toml)

[rate_limit]
enabled = true
# Default limits (apply to all)
[rate_limit.default]
qps = 1000 # Queries per second
burst = 2000 # Burst capacity
max_concurrent = 100 # Concurrent queries
exceeded_action = "reject" # reject, queue, throttle, warn
# Per-user limits
[rate_limit.users.admin]
qps = 10000
burst = 20000
max_concurrent = 500
[rate_limit.users.readonly_user]
qps = 100
max_concurrent = 10
# Per-database limits
[rate_limit.databases.analytics]
qps = 500
max_concurrent = 50
# Per-client-IP limits
[rate_limit.client_ips]
enabled = true
qps = 100
burst = 200
# Per-query-pattern limits
[rate_limit.patterns]
"SELECT.*FROM users.*" = { qps = 50 }
"INSERT INTO logs.*" = { qps = 1000 }
# Queue settings (when exceeded_action = "queue")
[rate_limit.queue]
max_wait = "5s"
queue_size = 1000

SQL Hints

-- Request high priority (bypass some limits)
/*helios:priority=high*/
SELECT * FROM critical_alerts;
-- Request low priority (accept more throttling)
/*helios:priority=low*/
SELECT COUNT(*) FROM large_table;
-- Specify custom cost for rate limiting
/*helios:cost=10*/
SELECT * FROM complex_view;

Admin API

GET /rate-limit/status
{
"global": {
"qps_limit": 10000,
"current_qps": 3456,
"rejected_last_minute": 12,
"queued_current": 5
},
"by_user": {
"admin": { "qps_limit": 10000, "current": 1234, "rejected": 0 },
"app_user": { "qps_limit": 1000, "current": 456, "rejected": 5 }
},
"by_database": {
"heliosdb": { "qps_limit": 5000, "current": 2000, "rejected": 0 },
"analytics": { "qps_limit": 500, "current": 480, "rejected": 10 }
}
}
POST /rate-limit/override
# Temporarily adjust limits
{
"user": "batch_job",
"qps": 5000,
"duration": "1h"
}
POST /rate-limit/reset
# Reset limit counters
{ "key": "user:app_user" }
GET /rate-limit/history?duration=1h
# Rate limit event history

AI/Agent Innovations

1. Agent Token Budget

AI agents get token budgets instead of simple rate limits:

/// Daily token allowance for an AI agent.
///
/// Each operation is charged its estimated token count times a
/// per-operation cost.
pub struct AgentTokenBudget {
    /// Tokens the agent may spend per day.
    daily_tokens: u64,
    /// Tokens spent so far today.
    used_tokens: AtomicU64,
    /// Cost multiplier per operation name (operations not listed cost 1).
    operation_costs: HashMap<String, u64>,
}
impl AgentTokenBudget {
    /// Charges `estimated_tokens` times the operation's cost against today's budget.
    ///
    /// Operations without an entry in `operation_costs` cost 1 per token. The
    /// charge is all-or-nothing and is applied with a single atomic update.
    /// Concurrent callers therefore never see a transiently over-committed
    /// total.
    ///
    /// # Errors
    /// Returns [`BudgetExceeded`] with the number of tokens still remaining
    /// when the charge would push usage past `daily_tokens`. Usage is left
    /// unchanged in that case.
    pub fn consume(&self, operation: &str, estimated_tokens: u64) -> Result<(), BudgetExceeded> {
        let cost = self.operation_costs.get(operation).copied().unwrap_or(1);
        // Saturate instead of overflowing. An absurd estimate must be rejected,
        // not wrap around to a small charge (or panic in debug builds).
        let total_cost = cost.saturating_mul(estimated_tokens);
        self.used_tokens
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |used| {
                used.checked_add(total_cost)
                    .filter(|&new_used| new_used <= self.daily_tokens)
            })
            .map(|_| ())
            // On failure `fetch_update` yields the current usage. Use
            // `saturating_sub` so the remaining count never underflows.
            .map_err(|used| BudgetExceeded {
                remaining: self.daily_tokens.saturating_sub(used),
            })
    }
}

2. RAG Pipeline Fair Scheduling

Ensure embedding queries don’t starve other operations:

[rate_limit.workloads]
# RAG embedding queries
rag_embedding.max_concurrent = 20
rag_embedding.qps = 500
# RAG retrieval
rag_retrieval.max_concurrent = 50
rag_retrieval.qps = 1000
# Regular queries
default.max_concurrent = 100

3. Agentic Workflow Quotas

Track and limit agent workflow executions:

/// Limits how many agent workflows may start per hour and how many steps
/// each workflow is given.
pub struct WorkflowQuota {
    /// Workflows allowed to start within one hour.
    hourly_limit: u32,
    /// Step allowance handed to each newly started workflow.
    max_steps: u32,
    /// Workflows started in the current hour.
    hourly_count: AtomicU32,
}
impl WorkflowQuota {
    /// Starts a new workflow if this hour's quota allows it.
    ///
    /// On success, the returned [`WorkflowToken`] starts with `max_steps`
    /// remaining steps.
    ///
    /// # Errors
    /// Returns `QuotaExceeded::HourlyLimit` when `hourly_limit` workflows have
    /// already started. The counter is left unchanged in that case.
    pub fn begin_workflow(&self) -> Result<WorkflowToken, QuotaExceeded> {
        // Atomic check-and-increment. The previous add-then-roll-back approach
        // briefly published a count above the limit. That count could wrap if
        // anything reset the counter between the add and the subtract.
        self.hourly_count
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |count| {
                (count < self.hourly_limit).then(|| count + 1)
            })
            .map_err(|_| QuotaExceeded::HourlyLimit)?;
        Ok(WorkflowToken {
            remaining_steps: AtomicU32::new(self.max_steps),
        })
    }
}

4. LLM-Friendly Error Messages

Return structured errors that LLMs can understand:

{
"error": "rate_limit_exceeded",
"message": "Query rate limit exceeded for user 'agent_1'",
"details": {
"limit_type": "qps",
"current_rate": 150,
"limit": 100,
"retry_after_seconds": 5
},
"suggestion": "Reduce query frequency or request a higher rate limit",
"documentation_url": "https://docs.heliosdb.io/rate-limits"
}

HeliosDB-Lite Integration

1. Replication-Aware Throttling

Throttle writes when standbys are lagging:

impl RateLimiter {
    /// Returns an extra delay to impose while standbys lag, or `None` when
    /// the worst standby lag does not exceed `replication_throttle_threshold`.
    ///
    /// The delay is one tenth of the lag in seconds, with the lag capped at
    /// 5 s, so it never exceeds 500 ms.
    ///
    /// NOTE(review): reads `self.config.replication_throttle_threshold`;
    /// confirm that `RateLimitConfig` declares this field.
    pub fn check_replication_pressure(&self, lag_monitor: &LagMonitor) -> Option<Duration> {
        let worst_lag = lag_monitor.get_max_standby_lag();
        let over_threshold = worst_lag > self.config.replication_throttle_threshold;
        over_threshold.then(|| {
            // Slow writes down so lagging standbys can catch up.
            let lag_secs = worst_lag.as_millis() as f64 / 1000.0;
            let capped_secs = lag_secs.min(5.0);
            Duration::from_secs_f64(capped_secs * 0.1)
        })
    }
}

2. Sync Mode Cost Accounting

Different sync modes have different costs:

impl QueryCostEstimator {
    /// Returns the cost weight of a write under the given replication sync mode.
    ///
    /// The longer a mode waits for standbys, the more it costs.
    pub fn estimate_write_cost(&self, sync_mode: SyncMode) -> u32 {
        // Fire and forget: no standby wait.
        const ASYNC_COST: u32 = 1;
        // Bounded wait for a standby acknowledgement.
        const SEMI_SYNC_COST: u32 = 3;
        // Waits for the standby ACK.
        const SYNC_COST: u32 = 5;

        match sync_mode {
            SyncMode::Async => ASYNC_COST,
            SyncMode::SemiSync => SEMI_SYNC_COST,
            SyncMode::Sync => SYNC_COST,
        }
    }
}

3. Branch-Aware Limits

Separate limits per branch:

[rate_limit.branches]
main.qps = 5000
development.qps = 10000 # Allow more in dev
analytics.qps = 500 # Limit heavy analytics

4. Vector Search Throttling

Limit expensive vector operations:

impl RateLimiter {
    /// Charges one token against the shared vector-search bucket when
    /// `query` is a vector query.
    ///
    /// The bucket is looked up under `LimiterKey::QueryPattern("vector_search")`.
    /// If no such bucket is registered, vector queries are not limited
    /// separately and the check passes.
    ///
    /// # Errors
    /// Propagates [`RateLimitExceeded`] from the bucket when it has no token left.
    pub fn check_vector_limit(&self, query: &str) -> Result<(), RateLimitExceeded> {
        if !self.is_vector_query(query) {
            return Ok(());
        }
        let key = LimiterKey::QueryPattern("vector_search".to_string());
        // `DashMap::get` returns an `Option`. Applying `?` to it does not
        // compile in a function returning `Result<_, RateLimitExceeded>`, so
        // handle the "no bucket configured" case explicitly.
        match self.token_buckets.get(&key) {
            Some(bucket) => bucket.try_acquire(1),
            None => Ok(()),
        }
    }
}

Implementation Notes

File Locations

src/proxy/
├── rate_limit/
│ ├── mod.rs # Public API
│ ├── limiter.rs # RateLimiter implementation
│ ├── token_bucket.rs # Token bucket algorithm
│ ├── sliding_window.rs # Sliding window algorithm
│ ├── concurrency.rs # Concurrency limiter
│ ├── cost_estimator.rs # Query cost estimation
│ └── metrics.rs # Rate limit metrics

Key Considerations

  1. Distributed Rate Limiting: In-memory counters only see the traffic passing through a single proxy, so multi-proxy deployments need Redis or another distributed counter to enforce global limits.

  2. Fair Queuing: Implement weighted fair queuing when throttling to avoid starvation.

  3. Metrics: Expose rate limit counters, rejection rates, queue depths.

  4. Graceful Degradation: Prefer throttling over hard rejection when possible.

  5. Hot Reload: Allow limit changes without proxy restart.

Rate Limit Response Headers

HTTP/1.1 429 Too Many Requests
X-RateLimit-Limit: 1000
X-RateLimit-Remaining: 0
X-RateLimit-Reset: 1706188800
Retry-After: 5
Content-Type: application/json
{
"error": "rate_limit_exceeded",
"retry_after": 5
}

Performance Targets

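| Metric                  | Target  | Measurement          |
|-------------------------|---------|----------------------|
| Limit check latency     | < 5 μs  | p99                  |
| Token bucket refill     | < 1 μs  | per operation        |
| Concurrency acquire     | < 10 μs | p99 (no contention)  |
| Queue wait overhead     | < 1 ms  | p99 (when queuing)   |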