Feature 09: Authentication Proxy
Feature 09: Authentication Proxy
Priority: Medium | Complexity: High | Phase: 3 (Enterprise)
Overview
Problem Statement
Database authentication has limitations:
- PostgreSQL auth is database-centric, not application-centric
- No native support for modern auth (OAuth, OIDC, SAML)
- Connection-string credentials are hard to rotate
- No fine-grained access control (tables, rows, columns)
Applications need:
- Centralized identity management
- Token-based authentication
- Dynamic credential rotation
- Fine-grained authorization
Solution
Implement an authentication proxy layer that bridges modern identity systems with database access:
┌─────────────────────────────────────────────────┐ │ AUTHENTICATION PROXY │ │ │ Client ─────────►│ ┌──────────────────────────────────────────┐ │ (JWT/OAuth) │ │ 1. Extract & Validate Token │ │ │ │ - JWT validation │ │ │ │ - OAuth token introspection │ │ │ │ - OIDC userinfo │ │ │ └──────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────────────────────┐ │ │ │ 2. Map Identity to DB Credentials │ │ │ │ - Role lookup │ │ │ │ - Permission resolution │ │ │ │ - Credential selection │ │ │ └──────────────────────────────────────────┘ │ │ │ │ │ ▼ │ │ ┌──────────────────────────────────────────┐ │ │ │ 3. Connect to Backend │ │ │ │ - Use service account │ │ │ │ - Set session variables │ │ │ │ - Apply RLS context │ │ │ └──────────────────────────────────────────┘ │ └─────────────────────────────────────────────────┘ │ ▼ ┌──────────────┐ │ Backend │ │ (single SA) │ └──────────────┘Architecture
Authentication Handler
pub struct AuthenticationHandler { /// JWT validator jwt_validator: Option<JwtValidator>,
/// OAuth introspection client oauth_client: Option<OAuthClient>,
/// OIDC provider oidc_provider: Option<OidcProvider>,
/// LDAP connector ldap_connector: Option<LdapConnector>,
/// Identity to role mapper role_mapper: Arc<RoleMapper>,
/// Backend credential provider credential_provider: Arc<CredentialProvider>,}
impl AuthenticationHandler { pub async fn authenticate(&self, request: &AuthRequest) -> Result<AuthResult, AuthError> { // 1. Extract credentials from request let identity = self.extract_identity(request).await?;
// 2. Validate identity let validated = self.validate_identity(&identity).await?;
// 3. Map to database role let db_role = self.role_mapper.map(&validated)?;
// 4. Get backend credentials let credentials = self.credential_provider.get_for_role(&db_role)?;
Ok(AuthResult { identity: validated, db_role, credentials, session_vars: self.build_session_vars(&validated), }) }
async fn extract_identity(&self, request: &AuthRequest) -> Result<Identity, AuthError> { match &request.auth_type { AuthType::Jwt(token) => { let claims = self.jwt_validator.as_ref() .ok_or(AuthError::UnsupportedAuthType)? .validate(token)?;
Ok(Identity::from_jwt_claims(claims)) } AuthType::OAuth(access_token) => { let userinfo = self.oauth_client.as_ref() .ok_or(AuthError::UnsupportedAuthType)? .introspect(access_token).await?;
Ok(Identity::from_oauth(userinfo)) } AuthType::Basic(username, password) => { // Pass through to backend or LDAP if let Some(ldap) = &self.ldap_connector { ldap.authenticate(username, password).await?; } Ok(Identity::basic(username.clone())) } AuthType::ApiKey(key) => { let identity = self.api_key_store.lookup(key)?; Ok(identity) } } }}JWT Validator
pub struct JwtValidator { /// JWKS (JSON Web Key Set) for signature verification jwks: Arc<RwLock<Jwks>>,
/// Issuer whitelist allowed_issuers: HashSet<String>,
/// Audience requirement required_audience: Option<String>,
/// Clock skew tolerance clock_skew: Duration,}
impl JwtValidator { pub fn validate(&self, token: &str) -> Result<JwtClaims, JwtError> { let header = self.decode_header(token)?; let key = self.jwks.read().get_key(&header.kid)?;
let claims = self.verify_signature(token, &key)?;
// Validate standard claims self.validate_exp(&claims)?; self.validate_iat(&claims)?; self.validate_iss(&claims)?; self.validate_aud(&claims)?;
Ok(claims) }
/// Background task to refresh JWKS pub async fn start_jwks_refresh(&self, interval: Duration) { loop { if let Err(e) = self.refresh_jwks().await { warn!("Failed to refresh JWKS: {}", e); } tokio::time::sleep(interval).await; } }}Role Mapper
pub struct RoleMapper { /// Rules for mapping identity to database role rules: Vec<RoleMappingRule>,
/// Default role if no rules match default_role: Option<String>,}
#[derive(Debug)]pub struct RoleMappingRule { /// Condition to match pub condition: RoleCondition,
/// Database role to assign pub db_role: String,
/// Priority (higher = evaluated first) pub priority: i32,}
#[derive(Debug)]pub enum RoleCondition { /// Match JWT claim JwtClaim { name: String, value: String },
/// Match OAuth scope OAuthScope(String),
/// Match group membership Group(String),
/// Match email domain EmailDomain(String),
/// Always match Always,}
impl RoleMapper { pub fn map(&self, identity: &Identity) -> Result<String, RoleMappingError> { for rule in self.rules.iter().sorted_by_key(|r| -r.priority) { if self.matches(&rule.condition, identity) { return Ok(rule.db_role.clone()); } }
self.default_role.clone() .ok_or(RoleMappingError::NoMatchingRole) }}Credential Provider
pub struct CredentialProvider { /// Static credentials static_creds: HashMap<String, Credentials>,
/// Vault integration vault_client: Option<VaultClient>,
/// AWS Secrets Manager aws_secrets: Option<AwsSecretsClient>,
/// Credential cache cache: Cache<String, Credentials>,}
impl CredentialProvider { pub async fn get_for_role(&self, role: &str) -> Result<Credentials, CredentialError> { // Check cache first if let Some(creds) = self.cache.get(role) { return Ok(creds); }
// Try Vault if let Some(vault) = &self.vault_client { if let Ok(creds) = vault.get_database_credentials(role).await { self.cache.insert(role.to_string(), creds.clone(), creds.ttl); return Ok(creds); } }
// Try AWS Secrets Manager if let Some(aws) = &self.aws_secrets { if let Ok(creds) = aws.get_secret(&format!("db/{}", role)).await { self.cache.insert(role.to_string(), creds.clone(), Duration::from_secs(300)); return Ok(creds); } }
// Fall back to static self.static_creds.get(role) .cloned() .ok_or(CredentialError::NotFound(role.to_string())) }}API Specification
Configuration (heliosproxy.toml)
[auth]enabled = true
# JWT authentication[auth.jwt]enabled = truejwks_url = "https://auth.example.com/.well-known/jwks.json"jwks_refresh_interval = "1h"issuer = "https://auth.example.com"audience = "heliosdb-api"clock_skew = "60s"
# OAuth introspection[auth.oauth]enabled = trueintrospection_url = "https://auth.example.com/oauth/introspect"client_id = "heliosproxy"client_secret = "${OAUTH_CLIENT_SECRET}"
# LDAP authentication[auth.ldap]enabled = falseserver = "ldaps://ldap.example.com:636"bind_dn = "cn=proxy,dc=example,dc=com"bind_password = "${LDAP_BIND_PASSWORD}"search_base = "ou=users,dc=example,dc=com"user_filter = "(uid={0})"group_attribute = "memberOf"
# API Key authentication[auth.api_keys]enabled = trueheader_name = "X-API-Key"
# Role mapping rules[[auth.role_mapping]]condition = { jwt_claim = { name = "role", value = "admin" } }db_role = "db_admin"priority = 100
[[auth.role_mapping]]condition = { group = "developers" }db_role = "db_readwrite"priority = 50
[[auth.role_mapping]]condition = { email_domain = "example.com" }db_role = "db_readonly"priority = 10
[[auth.role_mapping]]condition = "always"db_role = "db_minimal"priority = 0
# Credential providers[auth.credentials]default_provider = "vault"
[auth.credentials.vault]address = "https://vault.example.com"auth_method = "kubernetes"role = "heliosproxy"secret_path = "database/creds"
[auth.credentials.static]db_readonly = { username = "readonly", password = "${DB_READONLY_PASSWORD}" }db_minimal = { username = "minimal", password = "${DB_MINIMAL_PASSWORD}" }
# Session variables to set after auth[auth.session_vars]"app.current_user" = "{identity.user_id}""app.tenant_id" = "{identity.tenant_id}""app.roles" = "{identity.roles}"Admin API
GET /auth/status{ "jwt": { "enabled": true, "jwks_last_refresh": "2026-01-25T10:00:00Z" }, "oauth": { "enabled": true, "introspection_available": true }, "ldap": { "enabled": false }}
GET /auth/sessions{ "active_sessions": 45, "sessions": [ { "session_id": "sess_123", "identity": "user@example.com", "db_role": "db_readwrite", "connected_at": "2026-01-25T10:30:00Z", "last_query": "2026-01-25T10:35:00Z" } ]}
POST /auth/api-keys# Create API key{ "name": "CI Pipeline", "db_role": "db_readwrite", "expires_at": "2027-01-25T00:00:00Z"}
Response:{ "key_id": "key_abc123", "api_key": "hpk_xxxxxxxxxxxx", // Only shown once "name": "CI Pipeline"}
DELETE /auth/api-keys/{key_id}# Revoke API key
POST /auth/invalidate# Invalidate cached credentials{ "role": "db_readwrite" }AI/Agent Innovations
1. Agent Identity Tokens
Issue short-lived tokens for AI agents:
pub struct AgentTokenIssuer { signer: Signer, config: AgentTokenConfig,}
impl AgentTokenIssuer { pub fn issue_agent_token(&self, agent: &AgentIdentity) -> String { let claims = JwtClaims { sub: agent.id.clone(), iss: "heliosproxy".to_string(), exp: now() + self.config.token_ttl, agent_type: Some(agent.agent_type.clone()), tool_permissions: Some(agent.allowed_tools.clone()), resource_quota: Some(agent.quota.clone()), };
self.signer.sign(claims) }}2. Tool-Based Authorization
Authorize based on AI tool usage:
[auth.agent_tools]# Tool-specific permissionsknowledge_search = { db_role = "db_readonly", tables = ["documents", "embeddings"] }user_lookup = { db_role = "db_readonly", tables = ["users"] }create_order = { db_role = "db_readwrite", tables = ["orders", "order_items"] }pub struct ToolAuthorizer { tool_permissions: HashMap<String, ToolPermission>,}
impl ToolAuthorizer { pub fn authorize_tool(&self, identity: &Identity, tool: &str) -> Result<DbRole, AuthError> { let permission = self.tool_permissions.get(tool) .ok_or(AuthError::UnknownTool(tool.to_string()))?;
// Check if identity has permission for this tool if !identity.can_use_tool(tool) { return Err(AuthError::ToolNotAllowed(tool.to_string())); }
Ok(permission.db_role.clone()) }}3. Conversation-Scoped Tokens
Tokens valid only within a conversation:
pub struct ConversationScopedToken { pub token: String, pub conversation_id: String, pub expires_at: DateTime<Utc>,}
impl AuthenticationHandler { pub fn validate_conversation_token( &self, token: &str, conversation_id: &str, ) -> Result<Identity, AuthError> { let claims = self.jwt_validator.validate(token)?;
// Verify conversation scope if claims.conversation_id != Some(conversation_id.to_string()) { return Err(AuthError::InvalidConversationScope); }
Ok(Identity::from_jwt_claims(claims)) }}4. LLM-Safe Credential Handling
Never expose credentials to LLM context:
pub struct SecureCredentialInjector { /// Credential store (never logged or returned) credentials: SecureStore,}
impl SecureCredentialInjector { /// Inject credentials without exposing them pub fn connect_securely(&self, role: &str) -> Result<Connection, Error> { // Credentials fetched internally, never returned let creds = self.credentials.get(role)?;
// Connect with credentials (not logged) let conn = Connection::new_with_credentials(&creds);
// Clear credentials from memory creds.zeroize();
Ok(conn) }}HeliosDB-Lite Integration
1. Branch-Based Authorization
Control branch access per identity:
[auth.branches]# Who can access which branchesmain = { roles = ["db_admin", "db_readwrite"] }development = { roles = ["db_admin", "db_readwrite", "db_developer"] }analytics = { roles = ["db_admin", "db_analyst"] }impl BranchAuthorizer { pub fn can_access_branch(&self, identity: &Identity, branch: &str) -> bool { if let Some(allowed_roles) = self.branch_permissions.get(branch) { return identity.roles.iter().any(|r| allowed_roles.contains(r)); } false }}2. Sync Mode Authorization
Control who can use sync writes:
[auth.sync_modes]# Sync mode permissionssync = { roles = ["db_admin", "db_critical"] }semisync = { roles = ["db_admin", "db_critical", "db_readwrite"] }async = { roles = ["*"] } # Everyone3. Time-Travel Authorization
Control historical data access:
impl TimeTravelAuthorizer { pub fn can_time_travel(&self, identity: &Identity, as_of: DateTime<Utc>) -> bool { // Some identities may only see recent history if let Some(max_lookback) = identity.max_time_travel_lookback { let min_allowed = Utc::now() - max_lookback; return as_of >= min_allowed; }
// Admins can access all history identity.is_admin() }}4. Replication Auth Context
Pass identity through replication:
pub struct ReplicationAuthContext { pub original_identity: String, pub auth_method: String, pub auth_timestamp: DateTime<Utc>,}
impl WalReplicator { pub fn replicate_with_context(&self, entry: WalEntry, auth: &ReplicationAuthContext) { let enriched = entry.with_auth_context(auth); self.send_to_standbys(enriched); }}Implementation Notes
File Locations
src/proxy/├── auth/│ ├── mod.rs # Public API│ ├── handler.rs # AuthenticationHandler│ ├── jwt.rs # JwtValidator│ ├── oauth.rs # OAuthClient│ ├── ldap.rs # LdapConnector│ ├── api_keys.rs # API key management│ ├── role_mapper.rs # RoleMapper│ ├── credentials.rs # CredentialProvider│ └── session.rs # Session managementKey Considerations
-
Token Caching: Cache validated tokens to avoid repeated validation.
-
Credential Rotation: Support dynamic credential rotation from Vault/secrets managers.
-
Audit Logging: Log all authentication events for compliance.
-
Rate Limiting Auth: Prevent brute-force attacks on auth endpoints.
-
TLS Required: Enforce TLS for all auth-related connections.
Security Checklist
- JWT signatures verified with JWKS
- Token expiration enforced
- Credentials never logged or exposed
- API keys hashed in storage
- Session timeout enforced
- Auth failures rate-limited
Performance Targets
| Metric | Target | Measurement |
|---|---|---|
| JWT validation | <1ms | p99 (cached JWKS) |
| OAuth introspection | <100ms | p99 (network) |
| Role mapping | <100μs | p99 |
| Credential lookup | <10ms | p99 (with cache) |
Related Features
- Multi-Tenancy - Tenant identification via auth
- Rate Limiting - Per-identity limits
- Query Analytics - Per-identity analytics