Skip to content

Feature 09: Authentication Proxy

Feature 09: Authentication Proxy

Priority: Medium | Complexity: High | Phase: 3 (Enterprise)


Overview

Problem Statement

Database authentication has limitations:

  • PostgreSQL auth is database-centric, not application-centric
  • No native support for modern auth (OAuth, OIDC, SAML)
  • Connection-string credentials are hard to rotate
  • No fine-grained access control (tables, rows, columns)

Applications need:

  • Centralized identity management
  • Token-based authentication
  • Dynamic credential rotation
  • Fine-grained authorization

Solution

Implement an authentication proxy layer that bridges modern identity systems with database access:

┌─────────────────────────────────────────────────┐
│ AUTHENTICATION PROXY │
│ │
Client ─────────►│ ┌──────────────────────────────────────────┐ │
(JWT/OAuth) │ │ 1. Extract & Validate Token │ │
│ │ - JWT validation │ │
│ │ - OAuth token introspection │ │
│ │ - OIDC userinfo │ │
│ └──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ 2. Map Identity to DB Credentials │ │
│ │ - Role lookup │ │
│ │ - Permission resolution │ │
│ │ - Credential selection │ │
│ └──────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────────────────────────────┐ │
│ │ 3. Connect to Backend │ │
│ │ - Use service account │ │
│ │ - Set session variables │ │
│ │ - Apply RLS context │ │
│ └──────────────────────────────────────────┘ │
└─────────────────────────────────────────────────┘
┌──────────────┐
│ Backend │
│ (single SA) │
└──────────────┘

Architecture

Authentication Handler

pub struct AuthenticationHandler {
/// JWT validator
jwt_validator: Option<JwtValidator>,
/// OAuth introspection client
oauth_client: Option<OAuthClient>,
/// OIDC provider
oidc_provider: Option<OidcProvider>,
/// LDAP connector
ldap_connector: Option<LdapConnector>,
/// Identity to role mapper
role_mapper: Arc<RoleMapper>,
/// Backend credential provider
credential_provider: Arc<CredentialProvider>,
}
impl AuthenticationHandler {
pub async fn authenticate(&self, request: &AuthRequest) -> Result<AuthResult, AuthError> {
// 1. Extract credentials from request
let identity = self.extract_identity(request).await?;
// 2. Validate identity
let validated = self.validate_identity(&identity).await?;
// 3. Map to database role
let db_role = self.role_mapper.map(&validated)?;
// 4. Get backend credentials
let credentials = self.credential_provider.get_for_role(&db_role)?;
Ok(AuthResult {
identity: validated,
db_role,
credentials,
session_vars: self.build_session_vars(&validated),
})
}
async fn extract_identity(&self, request: &AuthRequest) -> Result<Identity, AuthError> {
match &request.auth_type {
AuthType::Jwt(token) => {
let claims = self.jwt_validator.as_ref()
.ok_or(AuthError::UnsupportedAuthType)?
.validate(token)?;
Ok(Identity::from_jwt_claims(claims))
}
AuthType::OAuth(access_token) => {
let userinfo = self.oauth_client.as_ref()
.ok_or(AuthError::UnsupportedAuthType)?
.introspect(access_token).await?;
Ok(Identity::from_oauth(userinfo))
}
AuthType::Basic(username, password) => {
// Pass through to backend or LDAP
if let Some(ldap) = &self.ldap_connector {
ldap.authenticate(username, password).await?;
}
Ok(Identity::basic(username.clone()))
}
AuthType::ApiKey(key) => {
let identity = self.api_key_store.lookup(key)?;
Ok(identity)
}
}
}
}

JWT Validator

pub struct JwtValidator {
/// JWKS (JSON Web Key Set) for signature verification
jwks: Arc<RwLock<Jwks>>,
/// Issuer whitelist
allowed_issuers: HashSet<String>,
/// Audience requirement
required_audience: Option<String>,
/// Clock skew tolerance
clock_skew: Duration,
}
impl JwtValidator {
pub fn validate(&self, token: &str) -> Result<JwtClaims, JwtError> {
let header = self.decode_header(token)?;
let key = self.jwks.read().get_key(&header.kid)?;
let claims = self.verify_signature(token, &key)?;
// Validate standard claims
self.validate_exp(&claims)?;
self.validate_iat(&claims)?;
self.validate_iss(&claims)?;
self.validate_aud(&claims)?;
Ok(claims)
}
/// Background task to refresh JWKS
pub async fn start_jwks_refresh(&self, interval: Duration) {
loop {
if let Err(e) = self.refresh_jwks().await {
warn!("Failed to refresh JWKS: {}", e);
}
tokio::time::sleep(interval).await;
}
}
}

Role Mapper

pub struct RoleMapper {
/// Rules for mapping identity to database role
rules: Vec<RoleMappingRule>,
/// Default role if no rules match
default_role: Option<String>,
}
#[derive(Debug)]
pub struct RoleMappingRule {
/// Condition to match
pub condition: RoleCondition,
/// Database role to assign
pub db_role: String,
/// Priority (higher = evaluated first)
pub priority: i32,
}
#[derive(Debug)]
pub enum RoleCondition {
/// Match JWT claim
JwtClaim { name: String, value: String },
/// Match OAuth scope
OAuthScope(String),
/// Match group membership
Group(String),
/// Match email domain
EmailDomain(String),
/// Always match
Always,
}
impl RoleMapper {
pub fn map(&self, identity: &Identity) -> Result<String, RoleMappingError> {
for rule in self.rules.iter().sorted_by_key(|r| -r.priority) {
if self.matches(&rule.condition, identity) {
return Ok(rule.db_role.clone());
}
}
self.default_role.clone()
.ok_or(RoleMappingError::NoMatchingRole)
}
}

Credential Provider

pub struct CredentialProvider {
/// Static credentials
static_creds: HashMap<String, Credentials>,
/// Vault integration
vault_client: Option<VaultClient>,
/// AWS Secrets Manager
aws_secrets: Option<AwsSecretsClient>,
/// Credential cache
cache: Cache<String, Credentials>,
}
impl CredentialProvider {
pub async fn get_for_role(&self, role: &str) -> Result<Credentials, CredentialError> {
// Check cache first
if let Some(creds) = self.cache.get(role) {
return Ok(creds);
}
// Try Vault
if let Some(vault) = &self.vault_client {
if let Ok(creds) = vault.get_database_credentials(role).await {
self.cache.insert(role.to_string(), creds.clone(), creds.ttl);
return Ok(creds);
}
}
// Try AWS Secrets Manager
if let Some(aws) = &self.aws_secrets {
if let Ok(creds) = aws.get_secret(&format!("db/{}", role)).await {
self.cache.insert(role.to_string(), creds.clone(), Duration::from_secs(300));
return Ok(creds);
}
}
// Fall back to static
self.static_creds.get(role)
.cloned()
.ok_or(CredentialError::NotFound(role.to_string()))
}
}

API Specification

Configuration (heliosproxy.toml)

[auth]
enabled = true
# JWT authentication
[auth.jwt]
enabled = true
jwks_url = "https://auth.example.com/.well-known/jwks.json"
jwks_refresh_interval = "1h"
issuer = "https://auth.example.com"
audience = "heliosdb-api"
clock_skew = "60s"
# OAuth introspection
[auth.oauth]
enabled = true
introspection_url = "https://auth.example.com/oauth/introspect"
client_id = "heliosproxy"
client_secret = "${OAUTH_CLIENT_SECRET}"
# LDAP authentication
[auth.ldap]
enabled = false
server = "ldaps://ldap.example.com:636"
bind_dn = "cn=proxy,dc=example,dc=com"
bind_password = "${LDAP_BIND_PASSWORD}"
search_base = "ou=users,dc=example,dc=com"
user_filter = "(uid={0})"
group_attribute = "memberOf"
# API Key authentication
[auth.api_keys]
enabled = true
header_name = "X-API-Key"
# Role mapping rules
[[auth.role_mapping]]
condition = { jwt_claim = { name = "role", value = "admin" } }
db_role = "db_admin"
priority = 100
[[auth.role_mapping]]
condition = { group = "developers" }
db_role = "db_readwrite"
priority = 50
[[auth.role_mapping]]
condition = { email_domain = "example.com" }
db_role = "db_readonly"
priority = 10
[[auth.role_mapping]]
condition = "always"
db_role = "db_minimal"
priority = 0
# Credential providers
[auth.credentials]
default_provider = "vault"
[auth.credentials.vault]
address = "https://vault.example.com"
auth_method = "kubernetes"
role = "heliosproxy"
secret_path = "database/creds"
[auth.credentials.static]
db_readonly = { username = "readonly", password = "${DB_READONLY_PASSWORD}" }
db_minimal = { username = "minimal", password = "${DB_MINIMAL_PASSWORD}" }
# Session variables to set after auth
[auth.session_vars]
"app.current_user" = "{identity.user_id}"
"app.tenant_id" = "{identity.tenant_id}"
"app.roles" = "{identity.roles}"

Admin API

GET /auth/status
{
"jwt": { "enabled": true, "jwks_last_refresh": "2026-01-25T10:00:00Z" },
"oauth": { "enabled": true, "introspection_available": true },
"ldap": { "enabled": false }
}
GET /auth/sessions
{
"active_sessions": 45,
"sessions": [
{
"session_id": "sess_123",
"identity": "user@example.com",
"db_role": "db_readwrite",
"connected_at": "2026-01-25T10:30:00Z",
"last_query": "2026-01-25T10:35:00Z"
}
]
}
POST /auth/api-keys
# Create API key
{
"name": "CI Pipeline",
"db_role": "db_readwrite",
"expires_at": "2027-01-25T00:00:00Z"
}
Response:
{
"key_id": "key_abc123",
"api_key": "hpk_xxxxxxxxxxxx", // Only shown once
"name": "CI Pipeline"
}
DELETE /auth/api-keys/{key_id}
# Revoke API key
POST /auth/invalidate
# Invalidate cached credentials
{ "role": "db_readwrite" }

AI/Agent Innovations

1. Agent Identity Tokens

Issue short-lived tokens for AI agents:

pub struct AgentTokenIssuer {
signer: Signer,
config: AgentTokenConfig,
}
impl AgentTokenIssuer {
pub fn issue_agent_token(&self, agent: &AgentIdentity) -> String {
let claims = JwtClaims {
sub: agent.id.clone(),
iss: "heliosproxy".to_string(),
exp: now() + self.config.token_ttl,
agent_type: Some(agent.agent_type.clone()),
tool_permissions: Some(agent.allowed_tools.clone()),
resource_quota: Some(agent.quota.clone()),
};
self.signer.sign(claims)
}
}

2. Tool-Based Authorization

Authorize based on AI tool usage:

[auth.agent_tools]
# Tool-specific permissions
knowledge_search = { db_role = "db_readonly", tables = ["documents", "embeddings"] }
user_lookup = { db_role = "db_readonly", tables = ["users"] }
create_order = { db_role = "db_readwrite", tables = ["orders", "order_items"] }
pub struct ToolAuthorizer {
tool_permissions: HashMap<String, ToolPermission>,
}
impl ToolAuthorizer {
pub fn authorize_tool(&self, identity: &Identity, tool: &str) -> Result<DbRole, AuthError> {
let permission = self.tool_permissions.get(tool)
.ok_or(AuthError::UnknownTool(tool.to_string()))?;
// Check if identity has permission for this tool
if !identity.can_use_tool(tool) {
return Err(AuthError::ToolNotAllowed(tool.to_string()));
}
Ok(permission.db_role.clone())
}
}

3. Conversation-Scoped Tokens

Tokens valid only within a conversation:

pub struct ConversationScopedToken {
pub token: String,
pub conversation_id: String,
pub expires_at: DateTime<Utc>,
}
impl AuthenticationHandler {
pub fn validate_conversation_token(
&self,
token: &str,
conversation_id: &str,
) -> Result<Identity, AuthError> {
let claims = self.jwt_validator.validate(token)?;
// Verify conversation scope
if claims.conversation_id != Some(conversation_id.to_string()) {
return Err(AuthError::InvalidConversationScope);
}
Ok(Identity::from_jwt_claims(claims))
}
}

4. LLM-Safe Credential Handling

Never expose credentials to LLM context:

pub struct SecureCredentialInjector {
/// Credential store (never logged or returned)
credentials: SecureStore,
}
impl SecureCredentialInjector {
/// Inject credentials without exposing them
pub fn connect_securely(&self, role: &str) -> Result<Connection, Error> {
// Credentials fetched internally, never returned
let creds = self.credentials.get(role)?;
// Connect with credentials (not logged)
let conn = Connection::new_with_credentials(&creds);
// Clear credentials from memory
creds.zeroize();
Ok(conn)
}
}

HeliosDB-Lite Integration

1. Branch-Based Authorization

Control branch access per identity:

[auth.branches]
# Who can access which branches
main = { roles = ["db_admin", "db_readwrite"] }
development = { roles = ["db_admin", "db_readwrite", "db_developer"] }
analytics = { roles = ["db_admin", "db_analyst"] }
impl BranchAuthorizer {
pub fn can_access_branch(&self, identity: &Identity, branch: &str) -> bool {
if let Some(allowed_roles) = self.branch_permissions.get(branch) {
return identity.roles.iter().any(|r| allowed_roles.contains(r));
}
false
}
}

2. Sync Mode Authorization

Control who can use sync writes:

[auth.sync_modes]
# Sync mode permissions
sync = { roles = ["db_admin", "db_critical"] }
semisync = { roles = ["db_admin", "db_critical", "db_readwrite"] }
async = { roles = ["*"] } # Everyone

3. Time-Travel Authorization

Control historical data access:

impl TimeTravelAuthorizer {
pub fn can_time_travel(&self, identity: &Identity, as_of: DateTime<Utc>) -> bool {
// Some identities may only see recent history
if let Some(max_lookback) = identity.max_time_travel_lookback {
let min_allowed = Utc::now() - max_lookback;
return as_of >= min_allowed;
}
// Admins can access all history
identity.is_admin()
}
}

4. Replication Auth Context

Pass identity through replication:

pub struct ReplicationAuthContext {
pub original_identity: String,
pub auth_method: String,
pub auth_timestamp: DateTime<Utc>,
}
impl WalReplicator {
pub fn replicate_with_context(&self, entry: WalEntry, auth: &ReplicationAuthContext) {
let enriched = entry.with_auth_context(auth);
self.send_to_standbys(enriched);
}
}

Implementation Notes

File Locations

src/proxy/
├── auth/
│ ├── mod.rs # Public API
│ ├── handler.rs # AuthenticationHandler
│ ├── jwt.rs # JwtValidator
│ ├── oauth.rs # OAuthClient
│ ├── ldap.rs # LdapConnector
│ ├── api_keys.rs # API key management
│ ├── role_mapper.rs # RoleMapper
│ ├── credentials.rs # CredentialProvider
│ └── session.rs # Session management

Key Considerations

  1. Token Caching: Cache validated tokens to avoid repeated validation.

  2. Credential Rotation: Support dynamic credential rotation from Vault/secrets managers.

  3. Audit Logging: Log all authentication events for compliance.

  4. Rate Limiting Auth: Prevent brute-force attacks on auth endpoints.

  5. TLS Required: Enforce TLS for all auth-related connections.

Security Checklist

  • JWT signatures verified with JWKS
  • Token expiration enforced
  • Credentials never logged or exposed
  • API keys hashed in storage
  • Session timeout enforced
  • Auth failures rate-limited

Performance Targets

MetricTargetMeasurement
JWT validation<1msp99 (cached JWKS)
OAuth introspection<100msp99 (network)
Role mapping<100μsp99
Credential lookup<10msp99 (with cache)