From fd47c158357e13d962c7540c0a871165b116d082 Mon Sep 17 00:00:00 2001 From: Giuseppe D'Anna Date: Wed, 29 Apr 2026 16:20:57 +0200 Subject: [PATCH 1/4] wip --- Cargo.lock | 23 +- integration/vault/docker-compose.yml | 32 ++ integration/vault/pgdog.toml | 32 ++ integration/vault/setup-vault.sh | 94 ++++ integration/vault/setup.sql | 28 ++ integration/vault/users.toml | 17 + pgdog-config/src/core.rs | 8 + pgdog-config/src/lib.rs | 2 + pgdog-config/src/users.rs | 11 + pgdog-config/src/vault.rs | 284 ++++++++++++ pgdog/Cargo.toml | 2 +- pgdog/src/auth/mod.rs | 1 - pgdog/src/backend/auth/mod.rs | 1 + pgdog/src/backend/auth/vault/api.rs | 312 +++++++++++++ pgdog/src/backend/auth/vault/error.rs | 22 + pgdog/src/backend/auth/vault/mod.rs | 645 ++++++++++++++++++++++++++ pgdog/src/backend/pool/address.rs | 51 +- pgdog/src/main.rs | 15 + 18 files changed, 1565 insertions(+), 15 deletions(-) create mode 100644 integration/vault/docker-compose.yml create mode 100644 integration/vault/pgdog.toml create mode 100755 integration/vault/setup-vault.sh create mode 100644 integration/vault/setup.sql create mode 100644 integration/vault/users.toml create mode 100644 pgdog-config/src/vault.rs create mode 100644 pgdog/src/backend/auth/vault/api.rs create mode 100644 pgdog/src/backend/auth/vault/error.rs create mode 100644 pgdog/src/backend/auth/vault/mod.rs diff --git a/Cargo.lock b/Cargo.lock index 61004877a..c78599b54 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1481,9 +1481,9 @@ dependencies = [ [[package]] name = "digest" -version = "0.11.2" +version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4850db49bf08e663084f7fb5c87d202ef91a3907271aff24a94eb97ff039153c" +checksum = "f1dd6dbb5841937940781866fa1281a1ff7bd3bf827091440879f9994983d5c2" dependencies = [ "block-buffer 0.12.0", "const-oid 0.10.2", @@ -2069,7 +2069,7 @@ version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6303bc9732ae41b04cb554b844a762b4115a61bfaa81e3e83050991eeb56863f" dependencies = [ - "digest 0.11.2", + "digest 0.11.3", ] [[package]] @@ -2719,7 +2719,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69b6441f590336821bb897fb28fc622898ccceb1d6cea3fde5ea86b090c4de98" dependencies = [ "cfg-if", - "digest 0.11.2", + "digest 0.11.3", ] [[package]] @@ -2973,15 +2973,14 @@ checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" [[package]] name = "openssl" -version = "0.10.78" +version = "0.10.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f38c4372413cdaaf3cc79dd92d29d7d9f5ab09b51b10dded508fb90bb70b9222" +checksum = "bf0b434746ee2832f4f0baf10137e1cabb18cbe6912c69e2e33263c45250f542" dependencies = [ "bitflags 2.11.1", "cfg-if", "foreign-types", "libc", - "once_cell", "openssl-macros", "openssl-sys", ] @@ -3005,9 +3004,9 @@ checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" [[package]] name = "openssl-sys" -version = "0.9.114" +version = "0.9.115" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13ce1245cd07fcc4cfdb438f7507b0c7e4f3849a69fd84d52374c66d83741bb6" +checksum = "158fe5b292746440aa6e7a7e690e55aeb72d41505e2804c23c6973ad0e9c9781" dependencies = [ "cc", "libc", @@ -4510,7 +4509,7 @@ checksum = "446ba717509524cb3f22f17ecc096f10f4822d76ab5c0b9822c5f9c284e825f4" dependencies = [ "cfg-if", "cpufeatures 0.3.0", - "digest 0.11.2", + "digest 0.11.3", ] [[package]] @@ -4593,9 +4592,9 @@ checksum = "e3a9fe34e3e7a50316060351f37187a3f546bce95496156754b601a5fa71b76e" [[package]] name = "siphasher" -version = "1.0.2" +version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2aa850e253778c88a04c3d7323b043aeda9d3e30d5971937c1855769763678e" +checksum = "8ee5873ec9cce0195efcb7a4e9507a04cd49aec9c83d0389df45b1ef7ba2e649" [[package]] name = "slab" diff --git a/integration/vault/docker-compose.yml b/integration/vault/docker-compose.yml new file mode 100644 index 000000000..5668cd1b1 --- /dev/null +++ b/integration/vault/docker-compose.yml @@ -0,0 +1,32 @@ +services: + postgres: + image: postgres:18 + environment: + POSTGRES_DB: pgdog + POSTGRES_USER: postgres + POSTGRES_PASSWORD: postgres + ports: + - "15432:5432" + volumes: + - ./setup.sql:/docker-entrypoint-initdb.d/setup.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U postgres"] + interval: 2s + timeout: 2s + retries: 10 + + vault: + image: hashicorp/vault:1.17 + environment: + VAULT_DEV_ROOT_TOKEN_ID: "root-token" + VAULT_DEV_LISTEN_ADDRESS: "0.0.0.0:8200" + VAULT_ADDR: "http://0.0.0.0:8200" + ports: + - "18200:8200" + cap_add: + - IPC_LOCK + healthcheck: + test: ["CMD", "vault", "status"] + interval: 2s + timeout: 2s + retries: 10 diff --git a/integration/vault/pgdog.toml b/integration/vault/pgdog.toml new file mode 100644 index 000000000..ceb220405 --- /dev/null +++ b/integration/vault/pgdog.toml @@ -0,0 +1,32 @@ +#:schema ./.schema/pgdog.schema.json +# +# PgDog configuration. +[general] +host = "0.0.0.0" +port = 6432 +workers = 2 +default_pool_size = 10 +min_pool_size = 1 +pooler_mode = "transaction" +tls_verify = "disabled" +shutdown_timeout = 60_000 +shutdown_termination_timeout = 60_000 +openmetrics_port = 9090 +openmetrics_namespace = "pgdog_" +log_format = "json" +log_level = "info" +passthrough_auth = "disabled" +connect_attempt_delay = 1_000 +auth_type = "scram" + +[[databases]] +name = "pgdog" +host = "127.0.0.1" +port = 15432 +role = "primary" + +[vault] +address = "http://127.0.0.1:18200" +auth_method = "app_role" +role_id = "9d9a096a-35ba-6e01-980e-582fb6fa9ae5" +secret_id = "35cb60a5-78b5-a6b5-e88d-c8051882b341" diff --git a/integration/vault/setup-vault.sh b/integration/vault/setup-vault.sh new file mode 100755 index 000000000..6a9fa9e18 --- /dev/null +++ b/integration/vault/setup-vault.sh @@ -0,0 +1,94 @@ +#!/usr/bin/env bash +set -euo pipefail + +## Configure Vault's database secrets engine for the pgdog vault integration. +## Run after docker compose up. + +export VAULT_ADDR="http://127.0.0.1:18200" +export VAULT_TOKEN="root-token" + +echo "==> Enabling database secrets engine..." +vault secrets enable database 2>/dev/null || echo " (already enabled)" + +echo "==> Configuring PostgreSQL connection..." +vault write database/config/pgdog \ + plugin_name=postgresql-database-plugin \ + allowed_roles="pgdog-admin,dml-role,ddl-role,readonly-role" \ + connection_url="postgresql://{{username}}:{{password}}@host.docker.internal:15432/pgdog?sslmode=disable" \ + username="postgres" \ + password="postgres" + +echo "==> Creating DML role (30m TTL)..." +vault write database/roles/dml-role \ + db_name=pgdog \ + creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}' IN ROLE \"dml_role\";" \ + revocation_statements="DROP ROLE IF EXISTS \"{{name}}\";" \ + default_ttl="30m" \ + max_ttl="1h" + +echo "==> Creating DDL role (30m TTL)..." +vault write database/roles/ddl-role \ + db_name=pgdog \ + creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}' IN ROLE \"ddl_role\";" \ + revocation_statements="DROP ROLE IF EXISTS \"{{name}}\";" \ + default_ttl="30m" \ + max_ttl="1h" + +echo "==> Creating readonly role (30m TTL)..." +vault write database/roles/readonly-role \ + db_name=pgdog \ + creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}' IN ROLE \"readonly_role\";" \ + revocation_statements="DROP ROLE IF EXISTS \"{{name}}\";" \ + default_ttl="30m" \ + max_ttl="1h" + +echo "==> Creating pgdog-admin role (for pg_authid access, 30m TTL)..." +vault write database/roles/pgdog-admin \ + db_name=pgdog \ + creation_statements="CREATE ROLE \"{{name}}\" WITH LOGIN PASSWORD '{{password}}' VALID UNTIL '{{expiration}}' IN ROLE \"pgdog_admin_role\"; GRANT pg_read_all_data TO \"{{name}}\";" \ + revocation_statements="DROP ROLE IF EXISTS \"{{name}}\";" \ + default_ttl="30m" \ + max_ttl="1h" + +echo "==> Enabling AppRole auth..." +vault auth enable approle 2>/dev/null || echo " (already enabled)" + +echo "==> Creating pgdog AppRole..." +vault write auth/approle/role/pgdog \ + token_ttl=1h \ + token_max_ttl=4h \ + token_policies="pgdog-policy" + +echo "==> Creating pgdog policy..." +vault policy write pgdog-policy - <<'POLICY' +path "database/creds/*" { + capabilities = ["read"] +} + +# Renew leases +path "sys/leases/renew" { + capabilities = ["update"] +} +POLICY + +echo "==> Fetching AppRole credentials..." +ROLE_ID=$(vault read -field=role_id auth/approle/role/pgdog/role-id) +SECRET_ID=$(vault write -field=secret_id -f auth/approle/role/pgdog/secret-id) + +echo "" +echo "============================================" +echo " Vault setup complete!" +echo "============================================" +echo "" +echo " AppRole role_id: $ROLE_ID" +echo " AppRole secret_id: $SECRET_ID" +echo "" +echo " Test credential generation:" +echo " vault read database/creds/dml-role" +echo " vault read database/creds/ddl-role" +echo " vault read database/creds/readonly-role" +echo "" +echo " To use with pgdog, update integration/vault/pgdog.toml with:" +echo " role_id = \"$ROLE_ID\"" +echo " secret_id = \"$SECRET_ID\"" +echo "" diff --git a/integration/vault/setup.sql b/integration/vault/setup.sql new file mode 100644 index 000000000..4867018ea --- /dev/null +++ b/integration/vault/setup.sql @@ -0,0 +1,28 @@ +-- Creates the parent roles that Vault's IN ROLE clause references. + +-- DML role: read/write on application tables +CREATE ROLE dml_role NOLOGIN; +GRANT SELECT,INSERT,UPDATE,DELETE ON ALL TABLES IN SCHEMA public TO dml_role; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT,INSERT,UPDATE,DELETE ON TABLES TO dml_role; +GRANT USAGE,SELECT,UPDATE ON ALL SEQUENCES IN SCHEMA public TO dml_role; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT USAGE,SELECT,UPDATE ON SEQUENCES TO dml_role; + +-- DDL role: schema migrations +CREATE ROLE ddl_role NOLOGIN; +GRANT ALL ON SCHEMA public TO ddl_role; +GRANT ALL ON ALL TABLES IN SCHEMA public TO ddl_role; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT ALL ON TABLES TO ddl_role; + +-- Readonly role: analytics/reporting +CREATE ROLE readonly_role NOLOGIN; +GRANT SELECT ON ALL TABLES IN SCHEMA public TO readonly_role; +ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO readonly_role; + +-- Create a sample table for testing +CREATE TABLE IF NOT EXISTS demo_items ( + id SERIAL PRIMARY KEY, + name TEXT NOT NULL, + created_at TIMESTAMPTZ DEFAULT NOW() +); + +INSERT INTO demo_items (name) VALUES ('item-1'), ('item-2'), ('item-3'); diff --git a/integration/vault/users.toml b/integration/vault/users.toml new file mode 100644 index 000000000..c0709e99e --- /dev/null +++ b/integration/vault/users.toml @@ -0,0 +1,17 @@ +#:schema ./.schema/users.schema.json +# +# Basic users configuration. +# +[[users]] +name = "dml_role" +password = "pgdog" +database = "pgdog" +server_auth = "vault" +vault_path = "database/creds/dml-role" + +[[users]] +name = "ddl_role" +password = "pgdog" +database = "pgdog" +server_auth = "vault" +vault_path = "database/creds/ddl-role" diff --git a/pgdog-config/src/core.rs b/pgdog-config/src/core.rs index ecc1bb4ec..d49075b10 100644 --- a/pgdog-config/src/core.rs +++ b/pgdog-config/src/core.rs @@ -23,6 +23,7 @@ use super::replication::{MirrorConfig, Mirroring, MirroringLevel, ReplicaLag, Re use super::rewrite::Rewrite; use super::sharding::{ManualQuery, OmnishardedTables, ShardedMapping, ShardedTable}; use super::users::{Admin, Plugin, Users}; +use super::vault::VaultConfig; #[derive(Debug, Clone, PartialEq)] pub struct ConfigAndUsers { @@ -222,6 +223,13 @@ pub struct Config { #[schemars(default = "crate::users::Admin::schemars_default_stub")] pub admin: Admin, + /// Vault integration for dynamic credential rotation. + /// + /// When set, pgdog fetches PostgreSQL credentials from Vault for any pool + /// that has `vault_path` configured in `users.toml`. + #[serde(default)] + pub vault: Option, + /// To detect and route queries with sharding keys, PgDog expects the sharded column to be specified in the configuration. /// /// https://docs.pgdog.dev/configuration/pgdog.toml/sharded_tables/ diff --git a/pgdog-config/src/lib.rs b/pgdog-config/src/lib.rs index 1f5af5964..9165749a8 100644 --- a/pgdog-config/src/lib.rs +++ b/pgdog-config/src/lib.rs @@ -17,6 +17,7 @@ pub mod system_catalogs; pub mod url; pub mod users; pub mod util; +pub mod vault; pub use auth::{AuthType, PassthroughAuth}; pub use core::{Config, ConfigAndUsers}; @@ -36,6 +37,7 @@ pub use rewrite::{Rewrite, RewriteMode}; pub use sharding::*; pub use system_catalogs::system_catalogs; pub use users::{Admin, Plugin, ServerAuth, User, Users}; +pub use vault::{VaultAuthMethod, VaultConfig}; use std::time::Duration; diff --git a/pgdog-config/src/users.rs b/pgdog-config/src/users.rs index f855d5535..bd4850644 100644 --- a/pgdog-config/src/users.rs +++ b/pgdog-config/src/users.rs @@ -150,6 +150,10 @@ pub enum ServerAuth { RdsIam, /// Generate an Azure Workload Identity auth token per connection attempt. AzureWorkloadIdentity, + /// Credentials are managed by the Vault integration. pgdog fetches + /// `server_user` and `server_password` from Vault at runtime; never + /// falls back to static client passwords for backend connections. + Vault, } impl ServerAuth { @@ -284,6 +288,13 @@ pub struct User { pub two_phase_commit_auto: Option, /// Server connections older than this (in milliseconds) will be closed when returned to the pool. pub server_lifetime: Option, + /// Vault database credential path for this pool, e.g. `database/creds/dml-role`. + /// + /// When set, pgdog manages backend credentials via Vault: it fetches a fresh + /// username/password from this path and rotates them before the lease expires. + /// The `[vault]` block in `pgdog.toml` must also be configured. + #[serde(default)] + pub vault_path: Option, } impl User { diff --git a/pgdog-config/src/vault.rs b/pgdog-config/src/vault.rs new file mode 100644 index 000000000..95c687eb0 --- /dev/null +++ b/pgdog-config/src/vault.rs @@ -0,0 +1,284 @@ +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +/// Vault authentication method used by pgdog to obtain a Vault token. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)] +#[serde(rename_all = "snake_case")] +pub enum VaultAuthMethod { + /// AppRole authentication — provide `role_id` and `secret_id` / `secret_id_file`. + #[default] + AppRole, + /// Kubernetes authentication — pgdog presents the pod's service account JWT + /// to Vault, which validates it against the Kubernetes API server. + Kubernetes, +} + +/// Settings for Vault integration. +/// +/// When configured, pgdog fetches dynamic PostgreSQL credentials from Vault +/// for pools that have `vault_path` set in `users.toml`, rotating them +/// automatically before expiry. +/// +/// # AppRole example +/// ```toml +/// [vault] +/// address = "https://vault.example.com:8200" +/// auth_method = "approle" +/// role_id = "pgdog-role-id" +/// secret_id = "..." +/// ``` +/// +/// # Kubernetes example +/// ```toml +/// [vault] +/// address = "https://vault.example.com:8200" +/// auth_method = "kubernetes" +/// kubernetes_role = "pgdog" +/// ``` +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, JsonSchema)] +#[serde(deny_unknown_fields)] +pub struct VaultConfig { + /// Vault server address, e.g. `https://vault.example.com:8200`. + pub address: String, + + /// Authentication method used to obtain a Vault token (default: `approle`). + #[serde(default)] + pub auth_method: VaultAuthMethod, + + /// Fetch fresh credentials when this percentage of the lease TTL has elapsed. + /// Must be between 1 and 99. Default: 75. + #[serde(default = "VaultConfig::default_pre_rotation_pct")] + pub pre_rotation_pct: u8, + + // ── AppRole fields ──────────────────────────────────────────────────────── + + /// AppRole `role_id`. Required when `auth_method = "approle"`. + pub role_id: Option, + + /// AppRole `secret_id` provided inline. Mutually exclusive with `secret_id_file`. + pub secret_id: Option, + + /// Path to a file containing the AppRole `secret_id` (e.g. a Kubernetes secret + /// mounted at `/etc/pgdog/vault-secret-id`). Mutually exclusive with `secret_id`. + pub secret_id_file: Option, + + // ── Kubernetes fields ───────────────────────────────────────────────────── + + /// Vault role name to authenticate as. Required when `auth_method = "kubernetes"`. + pub kubernetes_role: Option, + + /// Path to the Kubernetes service account JWT token file. + /// Default: `/var/run/secrets/kubernetes.io/serviceaccount/token`. + #[serde(default = "VaultConfig::default_kubernetes_jwt_path")] + pub kubernetes_jwt_path: String, + + /// Vault mount path for the Kubernetes auth method. Default: `kubernetes`. + #[serde(default = "VaultConfig::default_kubernetes_mount_path")] + pub kubernetes_mount_path: String, +} + +impl VaultConfig { + fn default_pre_rotation_pct() -> u8 { + 75 + } + + pub fn default_kubernetes_jwt_path() -> String { + "/var/run/secrets/kubernetes.io/serviceaccount/token".into() + } + + pub fn default_kubernetes_mount_path() -> String { + "kubernetes".into() + } + + /// Returns the AppRole `secret_id`, reading from `secret_id_file` if necessary. + /// Trims whitespace so files written with a trailing newline work out of the box. + pub fn secret_id(&self) -> std::io::Result { + if let Some(ref s) = self.secret_id { + return Ok(s.clone()); + } + if let Some(ref path) = self.secret_id_file { + return std::fs::read_to_string(path).map(|s| s.trim().to_string()); + } + Err(std::io::Error::new( + std::io::ErrorKind::NotFound, + "vault: neither secret_id nor secret_id_file is configured", + )) + } + + /// Returns the path to the Kubernetes service account JWT. + /// Callers should read the file asynchronously via `tokio::fs::read_to_string`. + pub fn jwt_path(&self) -> &str { + &self.kubernetes_jwt_path + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn approle_base() -> VaultConfig { + VaultConfig { + address: "http://127.0.0.1:8200".into(), + auth_method: VaultAuthMethod::AppRole, + pre_rotation_pct: 75, + role_id: Some("test-role".into()), + secret_id: Some("inline-secret".into()), + secret_id_file: None, + kubernetes_role: None, + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + } + } + + fn kubernetes_base() -> VaultConfig { + VaultConfig { + address: "http://127.0.0.1:8200".into(), + auth_method: VaultAuthMethod::Kubernetes, + pre_rotation_pct: 75, + role_id: None, + secret_id: None, + secret_id_file: None, + kubernetes_role: Some("pgdog".into()), + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + } + } + + // ── AppRole ─────────────────────────────────────────────────────────────── + + #[test] + fn test_secret_id_inline() { + assert_eq!(approle_base().secret_id().unwrap(), "inline-secret"); + } + + #[test] + fn test_secret_id_missing_returns_error() { + let cfg = VaultConfig { + secret_id: None, + secret_id_file: None, + ..approle_base() + }; + assert!(cfg.secret_id().is_err()); + } + + #[test] + fn test_secret_id_file() { + use std::io::Write; + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, " file-secret ").unwrap(); + let cfg = VaultConfig { + secret_id: None, + secret_id_file: Some(f.path().to_str().unwrap().into()), + ..approle_base() + }; + assert_eq!(cfg.secret_id().unwrap(), "file-secret"); + } + + #[test] + fn test_secret_id_inline_wins_over_file() { + use std::io::Write; + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, "file-secret").unwrap(); + let cfg = VaultConfig { + secret_id: Some("inline".into()), + secret_id_file: Some(f.path().to_str().unwrap().into()), + ..approle_base() + }; + assert_eq!(cfg.secret_id().unwrap(), "inline"); + } + + // ── Kubernetes ──────────────────────────────────────────────────────────── + + #[test] + fn test_kubernetes_mount_path_default() { + assert_eq!(kubernetes_base().kubernetes_mount_path, "kubernetes"); + } + + #[test] + fn test_kubernetes_jwt_path_default() { + assert_eq!( + kubernetes_base().kubernetes_jwt_path, + "/var/run/secrets/kubernetes.io/serviceaccount/token" + ); + } + + #[test] + fn test_jwt_path_returns_configured_path() { + use std::io::Write; + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, " eyJhbGciOiJSUzI1NiJ9.stub ").unwrap(); + let cfg = VaultConfig { + kubernetes_jwt_path: f.path().to_str().unwrap().into(), + ..kubernetes_base() + }; + // Callers read the file with tokio::fs::read_to_string; test the path is returned. + assert_eq!(cfg.jwt_path(), f.path().to_str().unwrap()); + // Verify the file itself is readable synchronously (tokio::fs would be the same content). + let content = std::fs::read_to_string(cfg.jwt_path()).unwrap(); + assert_eq!(content.trim(), "eyJhbGciOiJSUzI1NiJ9.stub"); + } + + #[test] + fn test_jwt_path_default() { + let cfg = kubernetes_base(); + assert_eq!( + cfg.jwt_path(), + "/var/run/secrets/kubernetes.io/serviceaccount/token" + ); + } + + // ── serde ───────────────────────────────────────────────────────────────── + + #[test] + fn test_default_pre_rotation_pct() { + let toml = r#" + address = "https://vault.example.com:8200" + role_id = "r" + secret_id = "s" + "#; + let cfg: VaultConfig = toml::from_str(toml).unwrap(); + assert_eq!(cfg.pre_rotation_pct, 75); + } + + #[test] + fn test_kubernetes_defaults_applied_when_not_set() { + let toml = r#" + address = "https://vault.example.com:8200" + auth_method = "kubernetes" + kubernetes_role = "pgdog" + "#; + let cfg: VaultConfig = toml::from_str(toml).unwrap(); + assert_eq!(cfg.auth_method, VaultAuthMethod::Kubernetes); + assert_eq!(cfg.kubernetes_role.as_deref(), Some("pgdog")); + assert_eq!( + cfg.kubernetes_jwt_path, + "/var/run/secrets/kubernetes.io/serviceaccount/token" + ); + assert_eq!(cfg.kubernetes_mount_path, "kubernetes"); + } + + #[test] + fn test_kubernetes_custom_mount_and_jwt_path() { + let toml = r#" + address = "https://vault.example.com:8200" + auth_method = "kubernetes" + kubernetes_role = "pgdog" + kubernetes_mount_path = "k8s-cluster-a" + kubernetes_jwt_path = "/custom/token" + "#; + let cfg: VaultConfig = toml::from_str(toml).unwrap(); + assert_eq!(cfg.kubernetes_mount_path, "k8s-cluster-a"); + assert_eq!(cfg.kubernetes_jwt_path, "/custom/token"); + } + + #[test] + fn test_unknown_field_rejected() { + let toml = r#" + address = "https://vault.example.com:8200" + role_id = "r" + secret_id = "s" + typo_field = true + "#; + assert!(toml::from_str::(toml).is_err()); + } +} diff --git a/pgdog/Cargo.toml b/pgdog/Cargo.toml index 497249088..97c06ff65 100644 --- a/pgdog/Cargo.toml +++ b/pgdog/Cargo.toml @@ -76,7 +76,7 @@ azure_identity = "0.34.0" azure_core = "0.34.0" crc32c = "0.6.8" bit-vec = "0.8" -reqwest = { version = "0.12", default-features = false, features = ["rustls-tls-webpki-roots-no-provider"] } +reqwest = { version = "0.12", default-features = false, features = ["json", "rustls-tls-webpki-roots-no-provider"] } [target.'cfg(unix)'.dependencies] libc = "0.2" diff --git a/pgdog/src/auth/mod.rs b/pgdog/src/auth/mod.rs index 98d6160a8..b270c6a2e 100644 --- a/pgdog/src/auth/mod.rs +++ b/pgdog/src/auth/mod.rs @@ -3,6 +3,5 @@ pub mod error; pub mod md5; pub mod scram; - pub use error::Error; pub use md5::Client; diff --git a/pgdog/src/backend/auth/mod.rs b/pgdog/src/backend/auth/mod.rs index cda4083ee..c89d0c9c9 100644 --- a/pgdog/src/backend/auth/mod.rs +++ b/pgdog/src/backend/auth/mod.rs @@ -1,2 +1,3 @@ pub mod azure_workload_identity; pub mod rds_iam; +pub mod vault; diff --git a/pgdog/src/backend/auth/vault/api.rs b/pgdog/src/backend/auth/vault/api.rs new file mode 100644 index 000000000..905527a7e --- /dev/null +++ b/pgdog/src/backend/auth/vault/api.rs @@ -0,0 +1,312 @@ +//! Vault HTTP API client built on `reqwest`. + +use serde::Deserialize; + +use super::Error; + +// ── public types ────────────────────────────────────────────────────────────── + +#[derive(Debug, Clone)] +pub struct VaultToken { + pub client_token: String, + pub lease_duration: u64, + pub renewable: bool, +} + +#[derive(Debug, Clone)] +pub struct VaultCredential { + pub username: String, + pub password: String, + pub lease_duration: u64, +} + +// ── Vault JSON shapes ───────────────────────────────────────────────────────── + +/// Both AppRole and Kubernetes login responses share the same `auth` wrapper. +#[derive(Deserialize)] +struct LoginResponse { + auth: AuthData, +} + +#[derive(Deserialize)] +struct AuthData { + client_token: String, + lease_duration: u64, + renewable: bool, +} + +#[derive(Deserialize)] +struct CredentialResponse { + data: CredentialData, + lease_duration: u64, +} + +#[derive(Deserialize)] +struct CredentialData { + username: String, + password: String, +} + +// ── public API ──────────────────────────────────────────────────────────────── + +fn client() -> Result { + reqwest::Client::builder() + .build() + .map_err(|e| Error::Http(e.to_string())) +} + +/// Authenticate to Vault via AppRole and return a client token. +pub async fn approle_login( + addr: &str, + role_id: &str, + secret_id: &str, +) -> Result { + #[cfg(test)] + if let Some(result) = test_support::login_override() { + return result; + } + + let url = format!("{}/v1/auth/approle/login", addr.trim_end_matches('/')); + let body = serde_json::json!({ "role_id": role_id, "secret_id": secret_id }); + + post_login(&url, &body).await +} + +/// Authenticate to Vault via Kubernetes service account JWT and return a client token. +/// +/// `mount_path` is the Vault auth mount (default: `"kubernetes"`). +/// `role` is the Vault role name configured for this cluster. +/// `jwt` is the contents of the pod's service account token file. +pub async fn kubernetes_login( + addr: &str, + mount_path: &str, + role: &str, + jwt: &str, +) -> Result { + #[cfg(test)] + if let Some(result) = test_support::login_override() { + return result; + } + + let url = format!( + "{}/v1/auth/{}/login", + addr.trim_end_matches('/'), + mount_path.trim_matches('/') + ); + let body = serde_json::json!({ "role": role, "jwt": jwt }); + + post_login(&url, &body).await +} + +async fn post_login(url: &str, body: &serde_json::Value) -> Result { + let response = client()? + .post(url) + .json(body) + .send() + .await + .map_err(|e| Error::Http(e.to_string()))?; + + let response = check_status(response).await?; + + let parsed: LoginResponse = response + .json() + .await + .map_err(|e| Error::Parse(e.to_string()))?; + + Ok(VaultToken { + client_token: parsed.auth.client_token, + lease_duration: parsed.auth.lease_duration, + renewable: parsed.auth.renewable, + }) +} + +/// Fetch dynamic PostgreSQL credentials from `path` (e.g. `database/creds/dml-role`). +pub async fn fetch_credential( + addr: &str, + token: &str, + path: &str, +) -> Result { + #[cfg(test)] + if let Some(result) = test_support::credential_override() { + return result; + } + + let url = format!( + "{}/v1/{}", + addr.trim_end_matches('/'), + path.trim_start_matches('/') + ); + + let response = client()? + .get(&url) + .header("X-Vault-Token", token) + .send() + .await + .map_err(|e| Error::Http(e.to_string()))?; + + let response = check_status(response).await?; + + let parsed: CredentialResponse = response + .json() + .await + .map_err(|e| Error::Parse(e.to_string()))?; + + Ok(VaultCredential { + username: parsed.data.username, + password: parsed.data.password, + lease_duration: parsed.lease_duration, + }) +} + +async fn check_status(response: reqwest::Response) -> Result { + let status = response.status().as_u16(); + if !(200..300).contains(&(status as usize)) { + let body = response + .text() + .await + .unwrap_or_else(|_| "".into()); + return Err(Error::VaultStatus { status, body }); + } + Ok(response) +} + +// ── test support (override hooks) ───────────────────────────────────────────── + +#[cfg(test)] +pub mod test_support { + use super::{Error, VaultCredential, VaultToken}; + use std::cell::RefCell; + + thread_local! { + static LOGIN: RefCell>> = const { RefCell::new(None) }; + static CREDENTIAL: RefCell>> = const { RefCell::new(None) }; + } + + pub fn set_login(result: Option>) { + LOGIN.with(|c| *c.borrow_mut() = result); + } + + pub fn set_credential(result: Option>) { + CREDENTIAL.with(|c| *c.borrow_mut() = result); + } + + pub(super) fn login_override() -> Option> { + LOGIN.with(|c| c.borrow_mut().take()) + } + + pub(super) fn credential_override() -> Option> { + CREDENTIAL.with(|c| c.borrow_mut().take()) + } +} + +// ── tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_login_response() { + let json = r#"{ + "auth": { + "client_token": "s.abc123", + "lease_duration": 86400, + "renewable": true, + "accessor": "ignored", + "policies": [] + } + }"#; + let parsed: LoginResponse = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.auth.client_token, "s.abc123"); + assert_eq!(parsed.auth.lease_duration, 86400); + assert!(parsed.auth.renewable); + } + + #[tokio::test] + async fn test_kubernetes_login_uses_override() { + test_support::set_login(Some(Ok(VaultToken { + client_token: "k8s-token".into(), + lease_duration: 7200, + renewable: true, + }))); + let token = kubernetes_login( + "http://irrelevant", + "kubernetes", + "pgdog", + "eyJhbGciOiJSUzI1NiJ9.stub", + ) + .await + .unwrap(); + assert_eq!(token.client_token, "k8s-token"); + assert_eq!(token.lease_duration, 7200); + } + + #[tokio::test] + async fn test_kubernetes_login_propagates_error() { + test_support::set_login(Some(Err(Error::VaultStatus { + status: 403, + body: "".into(), + }))); + let err = kubernetes_login("http://irrelevant", "kubernetes", "pgdog", "jwt") + .await + .unwrap_err(); + assert!(matches!(err, Error::VaultStatus { status: 403, .. })); + } + + #[test] + fn test_parse_credential_response() { + let json = r#"{ + "data": { + "username": "v-approle-dml-AbCdEf", + "password": "s3cr3t-pass" + }, + "lease_duration": 3600, + "lease_id": "database/creds/dml-role/xyz", + "renewable": true + }"#; + let parsed: CredentialResponse = serde_json::from_str(json).unwrap(); + assert_eq!(parsed.data.username, "v-approle-dml-AbCdEf"); + assert_eq!(parsed.data.password, "s3cr3t-pass"); + assert_eq!(parsed.lease_duration, 3600); + } + + #[tokio::test] + async fn test_approle_login_uses_override() { + test_support::set_login(Some(Ok(VaultToken { + client_token: "test-token".into(), + lease_duration: 3600, + renewable: true, + }))); + let token = approle_login("http://irrelevant", "role", "secret") + .await + .unwrap(); + assert_eq!(token.client_token, "test-token"); + assert_eq!(token.lease_duration, 3600); + } + + #[tokio::test] + async fn test_fetch_credential_uses_override() { + test_support::set_credential(Some(Ok(VaultCredential { + username: "v-approle-dml-XyZ".into(), + password: "pw".into(), + lease_duration: 86400, + }))); + let cred = fetch_credential("http://irrelevant", "tok", "database/creds/dml-role") + .await + .unwrap(); + assert_eq!(cred.username, "v-approle-dml-XyZ"); + assert_eq!(cred.lease_duration, 86400); + } + + #[tokio::test] + async fn test_approle_login_propagates_error_override() { + test_support::set_login(Some(Err(Error::VaultStatus { + status: 403, + body: "".into(), + }))); + let err = approle_login("http://irrelevant", "role", "bad") + .await + .unwrap_err(); + assert!(matches!(err, Error::VaultStatus { status: 403, .. })); + } +} diff --git a/pgdog/src/backend/auth/vault/error.rs b/pgdog/src/backend/auth/vault/error.rs new file mode 100644 index 000000000..b47a6e11d --- /dev/null +++ b/pgdog/src/backend/auth/vault/error.rs @@ -0,0 +1,22 @@ +use thiserror::Error; + +#[derive(Debug, Error)] +pub enum Error { + #[error("vault: HTTP error: {0}")] + Http(String), + + #[error("vault: unexpected status {status}: {body}")] + VaultStatus { status: u16, body: String }, + + #[error("vault: response parse error: {0}")] + Parse(String), + + #[error("vault: secret_id not available: {0}")] + SecretId(String), + + #[error("vault: config update failed: {0}")] + ConfigUpdate(String), + + #[error("vault: no pool named \"{0}\" found in config — check that vault_path is set on the correct [[users]] entry")] + PoolNotFound(String), +} diff --git a/pgdog/src/backend/auth/vault/mod.rs b/pgdog/src/backend/auth/vault/mod.rs new file mode 100644 index 000000000..c24d6630f --- /dev/null +++ b/pgdog/src/backend/auth/vault/mod.rs @@ -0,0 +1,645 @@ +//! Vault dynamic credential lifecycle management for pgdog backend pools. +//! +//! ## Startup +//! +//! Call [`init`] **before** `databases::init()`. It authenticates to Vault, fetches +//! credentials for every user with `vault_path`, and writes them into the live +//! config via [`update_config`] (no pool reload). This ensures pools are created +//! with the correct credentials on the very first connection. +//! +//! ## Background renewal +//! +//! After `databases::init()`, call [`VaultManager::start`] with the `initial_delay` +//! returned by [`init`]. The single background task: +//! +//! 1. Sleeps for `initial_delay` (skips redundant re-fetch of credentials just obtained at startup). +//! 2. Authenticates to Vault, fetches fresh credentials for all pools, and calls +//! [`apply_credential`] which writes the config and triggers `reload_from_existing()`. +//! 3. Sleeps until `pre_rotation_pct`% of the shortest lease TTL has elapsed, then repeats. +//! +//! If any step fails the task retries with exponential backoff (capped at 60 s). + +pub mod api; +pub mod error; + +use std::time::Duration; + +use tokio::task::JoinHandle; +use tracing::{error, info}; + +use crate::backend::databases::{lock, reload_from_existing}; +use crate::config::{config, set}; + +pub use api::{VaultCredential, VaultToken}; +pub use error::Error; + +use pgdog_config::{User, VaultAuthMethod, VaultConfig}; + +// ── startup init ────────────────────────────────────────────────────────────── + +/// Fetch Vault credentials for all users with `vault_path` and write them into +/// the live config without triggering pool reload. Call this **before** +/// `databases::init()` so pools start with the correct credentials. +/// +/// Returns the rotation interval to pass to [`VaultManager::start`] as +/// `initial_delay`. Returns `Duration::ZERO` on error so the background task +/// retries immediately. +pub async fn init(vault_config: &VaultConfig, users: &[User]) -> Duration { + let pools = vault_pools(users); + if pools.is_empty() { + return Duration::ZERO; + } + + match fetch_and_update_config(vault_config, &pools).await { + Ok(min_lease) => { + info!(pools = pools.len(), "vault: initial credentials fetched"); + rotation_interval(min_lease, vault_config.pre_rotation_pct) + } + Err(err) => { + error!("vault: initial credential fetch failed: {err}"); + Duration::ZERO + } + } +} + +// ── public interface ────────────────────────────────────────────────────────── + +/// Owns the single background renewal task. +pub struct VaultManager { + handle: JoinHandle<()>, +} + +impl VaultManager { + /// Spawn a single renewal task covering all pools with `vault_path` configured. + /// + /// `initial_delay` is the value returned by [`init`]. The task sleeps for that + /// duration before its first renewal so it does not redundantly re-fetch + /// credentials that were just obtained at startup. Pass `Duration::ZERO` to + /// start immediately (e.g. when [`init`] failed). + /// + /// Returns `None` when no users have `vault_path` set. + pub fn start( + vault_config: &VaultConfig, + users: &[User], + initial_delay: Duration, + ) -> Option { + let pools = vault_pools(users); + if pools.is_empty() { + return None; + } + + let cfg = vault_config.clone(); + let handle = tokio::spawn(async move { + renewal_task(cfg, pools, initial_delay).await; + }); + + Some(Self { handle }) + } +} + +impl Drop for VaultManager { + fn drop(&mut self) { + self.handle.abort(); + } +} + +// ── background task ─────────────────────────────────────────────────────────── + +/// A single task that sleeps for `initial_delay`, then repeatedly authenticates +/// to Vault, rotates credentials for all configured pools, and sleeps until the +/// next rotation is due. +async fn renewal_task(cfg: VaultConfig, pools: Vec<(String, String)>, initial_delay: Duration) { + // Skip initial sleep when init() failed (initial_delay == ZERO) to retry promptly. + if !initial_delay.is_zero() { + tokio::time::sleep(initial_delay).await; + } + + let mut backoff = Duration::from_secs(1); + const MAX_BACKOFF: Duration = Duration::from_secs(60); + + loop { + match rotation_cycle(&cfg, &pools).await { + Ok(min_lease) => { + backoff = Duration::from_secs(1); + let next = rotation_interval(min_lease, cfg.pre_rotation_pct); + info!( + pools = pools.len(), + min_lease_secs = min_lease, + next_refresh_secs = next.as_secs(), + "vault: credentials rotated" + ); + tokio::time::sleep(next).await; + } + Err(err) => { + error!( + backoff_secs = backoff.as_secs(), + "vault: rotation failed: {err}" + ); + tokio::time::sleep(backoff).await; + backoff = (backoff * 2).min(MAX_BACKOFF); + } + } + } +} + +/// Authenticate once, fetch credentials for every pool, call `on_credential` for +/// each. Returns minimum lease duration across all pools. +async fn fetch_credentials( + cfg: &VaultConfig, + pools: &[(String, String)], + mut on_credential: impl FnMut(&str, &VaultCredential) -> Result<(), Error>, +) -> Result { + let token = vault_login(cfg).await?; + let mut min_lease = u64::MAX; + + for (pool_name, vault_path) in pools { + let cred = api::fetch_credential(&cfg.address, &token.client_token, vault_path).await?; + if cred.lease_duration == 0 { + tracing::warn!( + pool = %pool_name, + "vault: lease_duration is 0 — credentials may not be renewable; check Vault backend config" + ); + } + on_credential(pool_name, &cred)?; + min_lease = min_lease.min(cred.lease_duration); + } + + Ok(if min_lease == u64::MAX { 0 } else { min_lease }) +} + +/// Fetch + apply credentials (with pool reload). Used by renewal task. +async fn rotation_cycle(cfg: &VaultConfig, pools: &[(String, String)]) -> Result { + fetch_credentials(cfg, pools, apply_credential).await +} + +/// Fetch + write credentials to config without pool reload. Used by [`init`]. +async fn fetch_and_update_config( + cfg: &VaultConfig, + pools: &[(String, String)], +) -> Result { + fetch_credentials(cfg, pools, update_config).await +} + +/// Authenticate to Vault using whichever method is configured. +async fn vault_login(cfg: &VaultConfig) -> Result { + match cfg.auth_method { + VaultAuthMethod::AppRole => { + let role_id = cfg.role_id.as_deref().ok_or_else(|| { + Error::SecretId("vault: role_id is required for AppRole auth".into()) + })?; + let secret_id = cfg + .secret_id() + .map_err(|e| Error::SecretId(e.to_string()))?; + api::approle_login(&cfg.address, role_id, &secret_id).await + } + VaultAuthMethod::Kubernetes => { + let role = cfg.kubernetes_role.as_deref().ok_or_else(|| { + Error::SecretId("vault: kubernetes_role is required for Kubernetes auth".into()) + })?; + let jwt = tokio::fs::read_to_string(cfg.jwt_path()) + .await + .map(|s| s.trim().to_string()) + .map_err(|e| { + Error::SecretId(format!( + "vault: failed to read JWT from {}: {e}", + cfg.jwt_path() + )) + })?; + api::kubernetes_login(&cfg.address, &cfg.kubernetes_mount_path, role, &jwt).await + } + } +} + +// ── config update ───────────────────────────────────────────────────────────── + +/// Update `server_user` / `server_password` for `pool_name` in the live config, +/// then trigger `reload_from_existing()` so pgdog reconnects with the new credentials. +fn apply_credential(pool_name: &str, cred: &VaultCredential) -> Result<(), Error> { + // update_config acquires and releases databases::lock() internally. + // reload_from_existing() must be called AFTER the lock is released — it + // re-acquires the same lock, so holding it across the call would deadlock. + update_config(pool_name, cred)?; + reload_from_existing().map_err(|e| Error::ConfigUpdate(e.to_string())) +} + +/// Write `server_user` / `server_password` for `pool_name` into the live config. +/// Does **not** trigger pool reload; used by [`init`] before pools exist. +fn update_config(pool_name: &str, cred: &VaultCredential) -> Result<(), Error> { + let _lock = lock(); + let mut cfg = (*config()).clone(); + + let found = cfg.users.users.iter_mut().any(|u| { + if u.name == pool_name { + u.server_user = Some(cred.username.clone()); + u.server_password = Some(cred.password.clone()); + true + } else { + false + } + }); + + if !found { + return Err(Error::PoolNotFound(pool_name.to_string())); + } + + set(cfg) + .map(|_| ()) + .map_err(|e| Error::ConfigUpdate(e.to_string())) +} + +// ── helpers ─────────────────────────────────────────────────────────────────── + +fn vault_pools(users: &[User]) -> Vec<(String, String)> { + users + .iter() + .filter_map(|u| { + u.vault_path + .as_ref() + .map(|path| (u.name.clone(), path.clone())) + }) + .collect() +} + +/// Minimum rotation interval regardless of TTL, to prevent a busy-loop when +/// Vault returns `lease_duration = 0` (non-renewable credentials, misconfigured backend). +const MIN_ROTATION_INTERVAL: Duration = Duration::from_secs(10); + +/// How long to wait before fetching the next credential generation. +/// Clamps `pre_rotation_pct` to [1, 99] and enforces a 10-second minimum +/// so a zero-TTL response never causes a tight loop hammering Vault. +pub fn rotation_interval(lease_duration_secs: u64, pre_rotation_pct: u8) -> Duration { + let pct = pre_rotation_pct.clamp(1, 99) as f64 / 100.0; + let computed = Duration::from_secs_f64(lease_duration_secs as f64 * pct); + computed.max(MIN_ROTATION_INTERVAL) +} + +// ── tests ───────────────────────────────────────────────────────────────────── + +#[cfg(test)] +mod tests { + use super::api::test_support; + use super::*; + use crate::config::{load_test, set}; + use pgdog_config::VaultConfig; + + /// Set up a minimal config with a "pgdog" user but do NOT call databases::init(). + /// Use this in tests that exercise the config-update path but never call + /// reload_from_existing() (e.g. error paths, init()-only paths). + fn set_config_only() { + let mut cfg = pgdog_config::ConfigAndUsers::default(); + cfg.config.databases = vec![pgdog_config::Database { + name: "pgdog".into(), + host: "127.0.0.1".into(), + port: 5432, + ..Default::default() + }]; + cfg.users.users = vec![pgdog_config::User { + name: "pgdog".into(), + database: "pgdog".into(), + password: Some("pgdog".into()), + ..Default::default() + }]; + set(cfg).unwrap(); + } + + fn vault_cfg() -> VaultConfig { + VaultConfig { + address: "http://127.0.0.1:8200".into(), + auth_method: pgdog_config::VaultAuthMethod::AppRole, + pre_rotation_pct: 75, + role_id: Some("test-role-id".into()), + secret_id: Some("test-secret-id".into()), + secret_id_file: None, + kubernetes_role: None, + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + } + } + + fn k8s_cfg() -> VaultConfig { + VaultConfig { + address: "http://127.0.0.1:8200".into(), + auth_method: pgdog_config::VaultAuthMethod::Kubernetes, + pre_rotation_pct: 75, + role_id: None, + secret_id: None, + secret_id_file: None, + kubernetes_role: Some("pgdog".into()), + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + } + } + + fn test_cred(username: &str) -> VaultCredential { + test_cred_with_lease(username, 3600) + } + + fn test_cred_with_lease(username: &str, lease_duration: u64) -> VaultCredential { + VaultCredential { + username: username.into(), + password: "s3cr3t".into(), + lease_duration, + } + } + + // ── rotation_interval ──────────────────────────────────────────────────── + + #[test] + fn test_rotation_interval_75pct_of_one_hour() { + assert_eq!(rotation_interval(3600, 75), Duration::from_secs(2700)); + } + + #[test] + fn test_rotation_interval_50pct_of_one_day() { + assert_eq!(rotation_interval(86400, 50), Duration::from_secs(43200)); + } + + #[test] + fn test_rotation_interval_clamps_100_to_99() { + // 100 clamped → 99%; should equal explicit 99% + assert_eq!(rotation_interval(100, 100), rotation_interval(100, 99)); + assert_eq!(rotation_interval(100, 99), Duration::from_secs(99)); + } + + #[test] + fn test_rotation_interval_clamps_0_to_1() { + // 0 clamped → 1%; 1% of 1000s = 10s, which equals MIN_ROTATION_INTERVAL + assert_eq!(rotation_interval(1000, 0), Duration::from_secs(10)); + } + + #[test] + fn test_rotation_interval_zero_lease_uses_minimum() { + // Vault returning lease_duration=0 must not cause a busy-loop. + assert_eq!(rotation_interval(0, 75), MIN_ROTATION_INTERVAL); + assert_eq!(rotation_interval(0, 0), MIN_ROTATION_INTERVAL); + } + + #[test] + fn test_rotation_interval_short_lease_uses_minimum() { + // 1s lease at 75% = 0.75s, below minimum — should clamp to 10s. + assert_eq!(rotation_interval(1, 75), MIN_ROTATION_INTERVAL); + } + + // ── apply_credential ───────────────────────────────────────────────────── + + #[tokio::test] + async fn test_apply_credential_errors_on_unknown_pool() { + // update_config returns PoolNotFound before reload_from_existing is called, + // so no pool init needed. + set_config_only(); + let cred = test_cred("v-approle-dml-AbCdEf"); + let err = apply_credential("nonexistent_vault_pool", &cred).unwrap_err(); + assert!(matches!(err, Error::PoolNotFound(_))); + } + + #[tokio::test] + async fn test_apply_credential_updates_known_pool() { + load_test(); + let cred = test_cred("v-approle-pgdog-AbCdEf"); + // "pgdog" is the user set up by load_test() + let result = apply_credential("pgdog", &cred); + assert!( + result.is_ok(), + "apply_credential should succeed for a known pool" + ); + } + + // ── rotation_cycle (mocked) ─────────────────────────────────────────────── + + #[tokio::test] + async fn test_rotation_cycle_uses_api_overrides() { + load_test(); + + test_support::set_login(Some(Ok(VaultToken { + client_token: "tok".into(), + lease_duration: 3600, + renewable: true, + }))); + test_support::set_credential(Some(Ok(test_cred("v-approle-dml-mock")))); + + let cfg = vault_cfg(); + let pools = vec![("pgdog".into(), "database/creds/dml-role".into())]; + let lease = rotation_cycle(&cfg, &pools) + .await + .expect("rotation_cycle should succeed with mocked API"); + + assert_eq!(lease, 3600); + } + + #[tokio::test] + async fn test_rotation_cycle_propagates_login_error() { + // Login fails before apply_credential is called, so no pool init needed. + test_support::set_login(Some(Err(Error::VaultStatus { + status: 403, + body: "".into(), + }))); + + let pools = vec![("pool".into(), "database/creds/role".into())]; + let err = rotation_cycle(&vault_cfg(), &pools).await.unwrap_err(); + assert!(matches!(err, Error::VaultStatus { status: 403, .. })); + } + + #[tokio::test] + async fn test_rotation_cycle_propagates_credential_error() { + // Credential fetch fails before apply_credential is called, so no pool init needed. + test_support::set_login(Some(Ok(VaultToken { + client_token: "tok".into(), + lease_duration: 3600, + renewable: true, + }))); + test_support::set_credential(Some(Err(Error::VaultStatus { + status: 500, + body: "".into(), + }))); + + let pools = vec![("pool".into(), "database/creds/role".into())]; + let err = rotation_cycle(&vault_cfg(), &pools).await.unwrap_err(); + assert!(matches!(err, Error::VaultStatus { status: 500, .. })); + } + + // ── VaultManager ───────────────────────────────────────────────────────── + + #[test] + fn test_manager_returns_none_when_no_vault_users() { + let users: Vec = vec![pgdog_config::User { + name: "plain_user".into(), + vault_path: None, + ..Default::default() + }]; + let result = VaultManager::start(&vault_cfg(), &users, Duration::ZERO); + assert!(result.is_none()); + } + + #[tokio::test] + async fn test_manager_spawns_tasks_for_vault_users() { + let users = vec![ + pgdog_config::User { + name: "dml_role".into(), + vault_path: Some("database/creds/dml-role".into()), + ..Default::default() + }, + pgdog_config::User { + name: "ro_role".into(), + vault_path: Some("database/creds/ro-role".into()), + ..Default::default() + }, + pgdog_config::User { + name: "plain".into(), + vault_path: None, + ..Default::default() + }, + ]; + // Large initial_delay so the spawned renewal_task never actually fires during + // the test — we're only checking that a manager is created, not that it runs. + let manager = VaultManager::start(&vault_cfg(), &users, Duration::from_secs(3600)); + // Two vault_path users → one task managing both pools + assert!(manager.is_some()); + } + + // ── init ───────────────────────────────────────────────────────────────── + + #[tokio::test] + async fn test_init_populates_config_and_returns_rotation_interval() { + // init() calls update_config() only — no reload_from_existing, no pool init needed. + set_config_only(); + + test_support::set_login(Some(Ok(VaultToken { + client_token: "tok".into(), + lease_duration: 3600, + renewable: true, + }))); + test_support::set_credential(Some(Ok(test_cred("v-approle-init-AbCdEf")))); + + let users = vec![pgdog_config::User { + name: "pgdog".into(), + vault_path: Some("database/creds/dml-role".into()), + ..Default::default() + }]; + + let delay = init(&vault_cfg(), &users).await; + // 75% of 3600s = 2700s + assert_eq!(delay, Duration::from_secs(2700)); + + let cfg = config(); + let user = cfg.users.users.iter().find(|u| u.name == "pgdog").unwrap(); + assert_eq!(user.server_user.as_deref(), Some("v-approle-init-AbCdEf")); + assert_eq!(user.server_password.as_deref(), Some("s3cr3t")); + } + + #[tokio::test] + async fn test_init_returns_zero_on_login_failure() { + test_support::set_login(Some(Err(Error::VaultStatus { + status: 403, + body: "permission denied".into(), + }))); + + let users = vec![pgdog_config::User { + name: "dml_role".into(), + vault_path: Some("database/creds/dml-role".into()), + ..Default::default() + }]; + + let delay = init(&vault_cfg(), &users).await; + assert_eq!( + delay, + Duration::ZERO, + "failed init must return ZERO so background task retries immediately" + ); + } + + #[tokio::test] + async fn test_init_returns_zero_when_no_vault_users() { + let users: Vec = vec![]; + let delay = init(&vault_cfg(), &users).await; + assert_eq!(delay, Duration::ZERO); + } + + // ── AppRole vault_login ─────────────────────────────────────────────────── + + #[tokio::test] + async fn test_vault_login_approle() { + test_support::set_login(Some(Ok(VaultToken { + client_token: "approle-tok".into(), + lease_duration: 3600, + renewable: true, + }))); + let tok = vault_login(&vault_cfg()).await.unwrap(); + assert_eq!(tok.client_token, "approle-tok"); + } + + #[tokio::test] + async fn test_vault_login_approle_missing_role_id_errors() { + let cfg = VaultConfig { + role_id: None, + ..vault_cfg() + }; + let err = vault_login(&cfg).await.unwrap_err(); + assert!(matches!(err, Error::SecretId(_))); + } + + // ── Kubernetes vault_login ──────────────────────────────────────────────── + + #[tokio::test] + async fn test_vault_login_kubernetes() { + use std::io::Write; + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, "eyJhbGciOiJSUzI1NiJ9.stub").unwrap(); + + test_support::set_login(Some(Ok(VaultToken { + client_token: "k8s-tok".into(), + lease_duration: 7200, + renewable: true, + }))); + + let cfg = VaultConfig { + kubernetes_jwt_path: f.path().to_str().unwrap().into(), + ..k8s_cfg() + }; + let tok = vault_login(&cfg).await.unwrap(); + assert_eq!(tok.client_token, "k8s-tok"); + } + + #[tokio::test] + async fn test_vault_login_kubernetes_missing_role_errors() { + let cfg = VaultConfig { + kubernetes_role: None, + ..k8s_cfg() + }; + let err = vault_login(&cfg).await.unwrap_err(); + assert!(matches!(err, Error::SecretId(_))); + } + + #[tokio::test] + async fn test_vault_login_kubernetes_missing_jwt_file_errors() { + let cfg = VaultConfig { + kubernetes_jwt_path: "/nonexistent/token".into(), + ..k8s_cfg() + }; + let err = vault_login(&cfg).await.unwrap_err(); + assert!(matches!(err, Error::SecretId(_))); + } + + #[tokio::test] + async fn test_rotation_cycle_kubernetes() { + use std::io::Write; + load_test(); + + let mut f = tempfile::NamedTempFile::new().unwrap(); + writeln!(f, "eyJhbGciOiJSUzI1NiJ9.stub").unwrap(); + + test_support::set_login(Some(Ok(VaultToken { + client_token: "k8s-tok".into(), + lease_duration: 7200, + renewable: true, + }))); + test_support::set_credential(Some(Ok(test_cred_with_lease("v-k8s-dml-AbCdEf", 7200)))); + + let cfg = VaultConfig { + kubernetes_jwt_path: f.path().to_str().unwrap().into(), + ..k8s_cfg() + }; + let pools = vec![("pgdog".into(), "database/creds/dml-role".into())]; + let lease = rotation_cycle(&cfg, &pools).await.unwrap(); + assert_eq!(lease, 7200); + } +} diff --git a/pgdog/src/backend/pool/address.rs b/pgdog/src/backend/pool/address.rs index 46c3e834e..23837ca41 100644 --- a/pgdog/src/backend/pool/address.rs +++ b/pgdog/src/backend/pool/address.rs @@ -91,7 +91,8 @@ impl Address { pub async fn auth_secrets(&self) -> Result, Error> { match self.server_auth { - ServerAuth::Password => Ok(self.passwords.clone()), + // Vault: vault::init() already wrote server_password into Address.passwords at startup + ServerAuth::Password | ServerAuth::Vault => Ok(self.passwords.clone()), ServerAuth::RdsIam => Ok(vec![crate::backend::auth::rds_iam::token(self).await?]), ServerAuth::AzureWorkloadIdentity => Ok(vec![ crate::backend::auth::azure_workload_identity::token(self).await?, @@ -207,6 +208,54 @@ mod test { assert_eq!(address.passwords.first().unwrap(), "hunter3"); } + #[test] + fn test_vault_uses_server_password() { + // vault::init() writes server_user + server_password; no database-level password set. + let database = Database { + name: "myapp".into(), + host: "127.0.0.1".into(), + port: 5432, + ..Default::default() + }; + let user = User { + name: "dml_role".into(), + password: Some("client-pass".into()), + server_user: Some("v-approle-dml-XyZ".into()), + server_password: Some("vault-pass".into()), + server_auth: ServerAuth::Vault, + vault_path: Some("database/creds/dml-role".into()), + database: "myapp".into(), + ..Default::default() + }; + + let address = Address::new(&database, &user, 0); + assert_eq!(address.user, "v-approle-dml-XyZ", "must use vault-generated username"); + assert_eq!(address.passwords, vec!["vault-pass"], "must use vault-generated password"); + assert_eq!(address.server_auth, ServerAuth::Vault); + } + + #[test] + fn test_vault_empty_before_init() { + // vault::init() not yet run: server_user/server_password None, no client password set. + let database = Database { + name: "myapp".into(), + host: "127.0.0.1".into(), + port: 5432, + ..Default::default() + }; + let user = User { + name: "dml_role".into(), + server_auth: ServerAuth::Vault, + vault_path: Some("database/creds/dml-role".into()), + database: "myapp".into(), + ..Default::default() + }; + + let address = Address::new(&database, &user, 0); + assert_eq!(address.user, "dml_role"); + assert!(address.passwords.is_empty()); + } + #[test] fn test_rds_iam_does_not_use_static_password() { let database = Database { diff --git a/pgdog/src/main.rs b/pgdog/src/main.rs index fd1179fd0..5017609c0 100644 --- a/pgdog/src/main.rs +++ b/pgdog/src/main.rs @@ -115,9 +115,24 @@ async fn pgdog(command: Option) -> Result<(), Box Date: Mon, 4 May 2026 17:26:21 +0200 Subject: [PATCH 2/4] cleanup --- integration/vault/pgdog.toml | 5 +++-- pgdog/src/backend/auth/vault/api.rs | 13 +++++++++---- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/integration/vault/pgdog.toml b/integration/vault/pgdog.toml index ceb220405..d02cb2d70 100644 --- a/integration/vault/pgdog.toml +++ b/integration/vault/pgdog.toml @@ -28,5 +28,6 @@ role = "primary" [vault] address = "http://127.0.0.1:18200" auth_method = "app_role" -role_id = "9d9a096a-35ba-6e01-980e-582fb6fa9ae5" -secret_id = "35cb60a5-78b5-a6b5-e88d-c8051882b341" +# Run integration/vault/setup-vault.sh and paste the output here. +role_id = "" +secret_id = "" diff --git a/pgdog/src/backend/auth/vault/api.rs b/pgdog/src/backend/auth/vault/api.rs index 905527a7e..4a37756ba 100644 --- a/pgdog/src/backend/auth/vault/api.rs +++ b/pgdog/src/backend/auth/vault/api.rs @@ -1,5 +1,6 @@ //! Vault HTTP API client built on `reqwest`. +use once_cell::sync::Lazy; use serde::Deserialize; use super::Error; @@ -49,10 +50,14 @@ struct CredentialData { // ── public API ──────────────────────────────────────────────────────────────── -fn client() -> Result { +static CLIENT: Lazy = Lazy::new(|| { reqwest::Client::builder() .build() - .map_err(|e| Error::Http(e.to_string())) + .expect("failed to build Vault HTTP client") +}); + +fn client() -> &'static reqwest::Client { + &CLIENT } /// Authenticate to Vault via AppRole and return a client token. @@ -99,7 +104,7 @@ pub async fn kubernetes_login( } async fn post_login(url: &str, body: &serde_json::Value) -> Result { - let response = client()? + let response = client() .post(url) .json(body) .send() @@ -137,7 +142,7 @@ pub async fn fetch_credential( path.trim_start_matches('/') ); - let response = client()? + let response = client() .get(&url) .header("X-Vault-Token", token) .send() From 3cd2ec4c46e88a5a116404d7d44823bb04cb2435 Mon Sep 17 00:00:00 2001 From: Giuseppe D'Anna Date: Mon, 4 May 2026 18:14:09 +0200 Subject: [PATCH 3/4] vault tls --- integration/vault/pgdog.toml | 1 + pgdog-config/src/lib.rs | 2 +- pgdog-config/src/vault.rs | 31 +++ pgdog/src/backend/auth/vault/api.rs | 138 +++++++++--- pgdog/src/backend/auth/vault/mod.rs | 334 ++++++++++++++++------------ 5 files changed, 329 insertions(+), 177 deletions(-) diff --git a/integration/vault/pgdog.toml b/integration/vault/pgdog.toml index d02cb2d70..82c09a947 100644 --- a/integration/vault/pgdog.toml +++ b/integration/vault/pgdog.toml @@ -28,6 +28,7 @@ role = "primary" [vault] address = "http://127.0.0.1:18200" auth_method = "app_role" +tls_verify = "disable" # Run integration/vault/setup-vault.sh and paste the output here. role_id = "" secret_id = "" diff --git a/pgdog-config/src/lib.rs b/pgdog-config/src/lib.rs index 9165749a8..ee7c5e27b 100644 --- a/pgdog-config/src/lib.rs +++ b/pgdog-config/src/lib.rs @@ -37,7 +37,7 @@ pub use rewrite::{Rewrite, RewriteMode}; pub use sharding::*; pub use system_catalogs::system_catalogs; pub use users::{Admin, Plugin, ServerAuth, User, Users}; -pub use vault::{VaultAuthMethod, VaultConfig}; +pub use vault::{VaultAuthMethod, VaultConfig, VaultTlsVerify}; use std::time::Duration; diff --git a/pgdog-config/src/vault.rs b/pgdog-config/src/vault.rs index 95c687eb0..e042556d6 100644 --- a/pgdog-config/src/vault.rs +++ b/pgdog-config/src/vault.rs @@ -1,6 +1,21 @@ use schemars::JsonSchema; use serde::{Deserialize, Serialize}; +/// How pgdog verifies the Vault server's TLS certificate. +#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)] +#[serde(rename_all = "kebab-case")] +pub enum VaultTlsVerify { + /// Skip all TLS verification. Accepts any certificate or hostname. + /// Use only in development; never in production. + Disable, + /// Verify the server certificate is signed by a trusted CA, but do not + /// check that the hostname matches the certificate's CN / SANs. + VerifyCa, + /// Full TLS verification: trusted CA **and** hostname match (default). + #[default] + VerifyFull, +} + /// Vault authentication method used by pgdog to obtain a Vault token. #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Default, JsonSchema)] #[serde(rename_all = "snake_case")] @@ -75,6 +90,18 @@ pub struct VaultConfig { /// Vault mount path for the Kubernetes auth method. Default: `kubernetes`. #[serde(default = "VaultConfig::default_kubernetes_mount_path")] pub kubernetes_mount_path: String, + + // ── TLS fields ──────────────────────────────────────────────────────────── + + /// TLS verification mode for connections to the Vault server. + /// Default: `verify-full`. + #[serde(default)] + pub tls_verify: VaultTlsVerify, + + /// Path to a PEM-encoded CA certificate bundle used to verify the Vault + /// server's TLS certificate. When provided, the bundle is added on top of + /// the system-wide certificate store. When omitted, only system certs are used. + pub tls_server_ca_certificate: Option, } impl VaultConfig { @@ -127,6 +154,8 @@ mod tests { kubernetes_role: None, kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify: VaultTlsVerify::VerifyFull, + tls_server_ca_certificate: None, } } @@ -141,6 +170,8 @@ mod tests { kubernetes_role: Some("pgdog".into()), kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify: VaultTlsVerify::VerifyFull, + tls_server_ca_certificate: None, } } diff --git a/pgdog/src/backend/auth/vault/api.rs b/pgdog/src/backend/auth/vault/api.rs index 4a37756ba..de8305068 100644 --- a/pgdog/src/backend/auth/vault/api.rs +++ b/pgdog/src/backend/auth/vault/api.rs @@ -1,8 +1,9 @@ //! Vault HTTP API client built on `reqwest`. -use once_cell::sync::Lazy; use serde::Deserialize; +use pgdog_config::{VaultConfig, VaultTlsVerify}; + use super::Error; // ── public types ────────────────────────────────────────────────────────────── @@ -48,20 +49,42 @@ struct CredentialData { password: String, } -// ── public API ──────────────────────────────────────────────────────────────── +// ── client construction ─────────────────────────────────────────────────────── + +/// Build a `reqwest::Client` configured with the TLS settings from `cfg`. +/// Call once per vault config (at startup and in the renewal task) and reuse +/// the resulting client for all API calls within that lifecycle. +pub fn build_client(cfg: &VaultConfig) -> Result { + let mut builder = reqwest::Client::builder(); + + builder = match cfg.tls_verify { + VaultTlsVerify::Disable => builder.danger_accept_invalid_certs(true), + VaultTlsVerify::VerifyCa => builder.danger_accept_invalid_hostnames(true), + VaultTlsVerify::VerifyFull => builder, + }; + + if let Some(ref ca_path) = cfg.tls_server_ca_certificate { + let pem = std::fs::read(ca_path).map_err(|e| { + Error::Http(format!( + "vault: failed to read tls_server_ca_certificate {ca_path}: {e}" + )) + })?; + let cert = reqwest::Certificate::from_pem(&pem).map_err(|e| { + Error::Http(format!("vault: invalid tls_server_ca_certificate: {e}")) + })?; + builder = builder.add_root_certificate(cert); + } -static CLIENT: Lazy = Lazy::new(|| { - reqwest::Client::builder() + builder .build() - .expect("failed to build Vault HTTP client") -}); - -fn client() -> &'static reqwest::Client { - &CLIENT + .map_err(|e| Error::Http(format!("vault: failed to build HTTP client: {e}"))) } +// ── public API ──────────────────────────────────────────────────────────────── + /// Authenticate to Vault via AppRole and return a client token. pub async fn approle_login( + client: &reqwest::Client, addr: &str, role_id: &str, secret_id: &str, @@ -74,7 +97,7 @@ pub async fn approle_login( let url = format!("{}/v1/auth/approle/login", addr.trim_end_matches('/')); let body = serde_json::json!({ "role_id": role_id, "secret_id": secret_id }); - post_login(&url, &body).await + post_login(client, &url, &body).await } /// Authenticate to Vault via Kubernetes service account JWT and return a client token. @@ -83,6 +106,7 @@ pub async fn approle_login( /// `role` is the Vault role name configured for this cluster. /// `jwt` is the contents of the pod's service account token file. pub async fn kubernetes_login( + client: &reqwest::Client, addr: &str, mount_path: &str, role: &str, @@ -100,11 +124,15 @@ pub async fn kubernetes_login( ); let body = serde_json::json!({ "role": role, "jwt": jwt }); - post_login(&url, &body).await + post_login(client, &url, &body).await } -async fn post_login(url: &str, body: &serde_json::Value) -> Result { - let response = client() +async fn post_login( + client: &reqwest::Client, + url: &str, + body: &serde_json::Value, +) -> Result { + let response = client .post(url) .json(body) .send() @@ -127,6 +155,7 @@ async fn post_login(url: &str, body: &serde_json::Value) -> Result Result = Lazy::new(|| { + let _ = tokio_rustls::rustls::crypto::aws_lc_rs::default_provider().install_default(); + }); + + fn test_client() -> reqwest::Client { + let _ = *RING; + reqwest::Client::new() + } #[test] fn test_parse_login_response() { @@ -235,6 +279,7 @@ mod tests { renewable: true, }))); let token = kubernetes_login( + &test_client(), "http://irrelevant", "kubernetes", "pgdog", @@ -248,11 +293,8 @@ mod tests { #[tokio::test] async fn test_kubernetes_login_propagates_error() { - test_support::set_login(Some(Err(Error::VaultStatus { - status: 403, - body: "".into(), - }))); - let err = kubernetes_login("http://irrelevant", "kubernetes", "pgdog", "jwt") + test_support::set_login(Some(Err(Error::VaultStatus { status: 403, body: "".into() }))); + let err = kubernetes_login(&test_client(), "http://irrelevant", "kubernetes", "pgdog", "jwt") .await .unwrap_err(); assert!(matches!(err, Error::VaultStatus { status: 403, .. })); @@ -282,7 +324,7 @@ mod tests { lease_duration: 3600, renewable: true, }))); - let token = approle_login("http://irrelevant", "role", "secret") + let token = approle_login(&test_client(), "http://irrelevant", "role", "secret") .await .unwrap(); assert_eq!(token.client_token, "test-token"); @@ -296,22 +338,62 @@ mod tests { password: "pw".into(), lease_duration: 86400, }))); - let cred = fetch_credential("http://irrelevant", "tok", "database/creds/dml-role") - .await - .unwrap(); + let cred = fetch_credential( + &test_client(), + "http://irrelevant", + "tok", + "database/creds/dml-role", + ) + .await + .unwrap(); assert_eq!(cred.username, "v-approle-dml-XyZ"); assert_eq!(cred.lease_duration, 86400); } #[tokio::test] async fn test_approle_login_propagates_error_override() { - test_support::set_login(Some(Err(Error::VaultStatus { - status: 403, - body: "".into(), - }))); - let err = approle_login("http://irrelevant", "role", "bad") + test_support::set_login(Some(Err(Error::VaultStatus { status: 403, body: "".into() }))); + let err = approle_login(&test_client(), "http://irrelevant", "role", "bad") .await .unwrap_err(); assert!(matches!(err, Error::VaultStatus { status: 403, .. })); } + + // ── build_client TLS ───────────────────────────────────────────────────── + + fn tls_cfg(tls_verify: VaultTlsVerify, ca: Option<&str>) -> VaultConfig { + VaultConfig { + address: "https://vault.example.com".into(), + auth_method: pgdog_config::VaultAuthMethod::AppRole, + pre_rotation_pct: 75, + role_id: None, + secret_id: None, + secret_id_file: None, + kubernetes_role: None, + kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), + kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify, + tls_server_ca_certificate: ca.map(String::from), + } + } + + #[test] + fn test_build_client_verify_full_default() { + assert!(build_client(&tls_cfg(VaultTlsVerify::VerifyFull, None)).is_ok()); + } + + #[test] + fn test_build_client_disable() { + assert!(build_client(&tls_cfg(VaultTlsVerify::Disable, None)).is_ok()); + } + + #[test] + fn test_build_client_verify_ca() { + assert!(build_client(&tls_cfg(VaultTlsVerify::VerifyCa, None)).is_ok()); + } + + #[test] + fn test_build_client_invalid_ca_path_errors() { + assert!(build_client(&tls_cfg(VaultTlsVerify::VerifyFull, Some("/nonexistent/ca.pem"))).is_err()); + } } diff --git a/pgdog/src/backend/auth/vault/mod.rs b/pgdog/src/backend/auth/vault/mod.rs index c24d6630f..d84cc4980 100644 --- a/pgdog/src/backend/auth/vault/mod.rs +++ b/pgdog/src/backend/auth/vault/mod.rs @@ -35,6 +35,94 @@ pub use error::Error; use pgdog_config::{User, VaultAuthMethod, VaultConfig}; +// ── Vault HTTP client ───────────────────────────────────────────────────────── + +/// Holds the `reqwest` client and config for a single Vault connection. +/// Built once — at startup via [`init`] and once inside the renewal task — +/// so TLS handshakes and connection pool state are reused across all API calls. +struct VaultClient { + client: reqwest::Client, + cfg: VaultConfig, +} + +impl VaultClient { + fn new(cfg: VaultConfig) -> Result { + let client = api::build_client(&cfg)?; + Ok(Self { client, cfg }) + } + + /// Authenticate to Vault and return a short-lived token. + async fn login(&self) -> Result { + match self.cfg.auth_method { + VaultAuthMethod::AppRole => { + let role_id = self.cfg.role_id.as_deref().ok_or_else(|| { + Error::SecretId("vault: role_id is required for AppRole auth".into()) + })?; + let secret_id = self + .cfg + .secret_id() + .map_err(|e| Error::SecretId(e.to_string()))?; + api::approle_login(&self.client, &self.cfg.address, role_id, &secret_id).await + } + VaultAuthMethod::Kubernetes => { + let role = self.cfg.kubernetes_role.as_deref().ok_or_else(|| { + Error::SecretId( + "vault: kubernetes_role is required for Kubernetes auth".into(), + ) + })?; + let jwt = tokio::fs::read_to_string(self.cfg.jwt_path()) + .await + .map(|s| s.trim().to_string()) + .map_err(|e| { + Error::SecretId(format!( + "vault: failed to read JWT from {}: {e}", + self.cfg.jwt_path() + )) + })?; + api::kubernetes_login( + &self.client, + &self.cfg.address, + &self.cfg.kubernetes_mount_path, + role, + &jwt, + ) + .await + } + } + } + + /// Authenticate once, fetch credentials for every pool, call `on_credential` for + /// each. Returns the minimum lease duration seen across all pools. + async fn fetch_credentials( + &self, + pools: &[(String, String)], + mut on_credential: impl FnMut(&str, &VaultCredential) -> Result<(), Error>, + ) -> Result { + let token = self.login().await?; + let mut min_lease = u64::MAX; + + for (pool_name, vault_path) in pools { + let cred = api::fetch_credential( + &self.client, + &self.cfg.address, + &token.client_token, + vault_path, + ) + .await?; + if cred.lease_duration == 0 { + tracing::warn!( + pool = %pool_name, + "vault: lease_duration is 0 — credentials may not be renewable; check Vault backend config" + ); + } + on_credential(pool_name, &cred)?; + min_lease = min_lease.min(cred.lease_duration); + } + + Ok(if min_lease == u64::MAX { 0 } else { min_lease }) + } +} + // ── startup init ────────────────────────────────────────────────────────────── /// Fetch Vault credentials for all users with `vault_path` and write them into @@ -50,7 +138,15 @@ pub async fn init(vault_config: &VaultConfig, users: &[User]) -> Duration { return Duration::ZERO; } - match fetch_and_update_config(vault_config, &pools).await { + let client = match VaultClient::new(vault_config.clone()) { + Ok(c) => c, + Err(err) => { + error!("vault: failed to build HTTP client: {err}"); + return Duration::ZERO; + } + }; + + match client.fetch_credentials(&pools, update_config).await { Ok(min_lease) => { info!(pools = pools.len(), "vault: initial credentials fetched"); rotation_interval(min_lease, vault_config.pre_rotation_pct) @@ -90,7 +186,14 @@ impl VaultManager { let cfg = vault_config.clone(); let handle = tokio::spawn(async move { - renewal_task(cfg, pools, initial_delay).await; + let client = match VaultClient::new(cfg) { + Ok(c) => c, + Err(err) => { + error!("vault: failed to build HTTP client: {err}"); + return; + } + }; + renewal_task(client, pools, initial_delay).await; }); Some(Self { handle }) @@ -108,7 +211,11 @@ impl Drop for VaultManager { /// A single task that sleeps for `initial_delay`, then repeatedly authenticates /// to Vault, rotates credentials for all configured pools, and sleeps until the /// next rotation is due. -async fn renewal_task(cfg: VaultConfig, pools: Vec<(String, String)>, initial_delay: Duration) { +async fn renewal_task( + client: VaultClient, + pools: Vec<(String, String)>, + initial_delay: Duration, +) { // Skip initial sleep when init() failed (initial_delay == ZERO) to retry promptly. if !initial_delay.is_zero() { tokio::time::sleep(initial_delay).await; @@ -118,10 +225,10 @@ async fn renewal_task(cfg: VaultConfig, pools: Vec<(String, String)>, initial_de const MAX_BACKOFF: Duration = Duration::from_secs(60); loop { - match rotation_cycle(&cfg, &pools).await { + match client.fetch_credentials(&pools, apply_credential).await { Ok(min_lease) => { backoff = Duration::from_secs(1); - let next = rotation_interval(min_lease, cfg.pre_rotation_pct); + let next = rotation_interval(min_lease, client.cfg.pre_rotation_pct); info!( pools = pools.len(), min_lease_secs = min_lease, @@ -142,74 +249,6 @@ async fn renewal_task(cfg: VaultConfig, pools: Vec<(String, String)>, initial_de } } -/// Authenticate once, fetch credentials for every pool, call `on_credential` for -/// each. Returns minimum lease duration across all pools. -async fn fetch_credentials( - cfg: &VaultConfig, - pools: &[(String, String)], - mut on_credential: impl FnMut(&str, &VaultCredential) -> Result<(), Error>, -) -> Result { - let token = vault_login(cfg).await?; - let mut min_lease = u64::MAX; - - for (pool_name, vault_path) in pools { - let cred = api::fetch_credential(&cfg.address, &token.client_token, vault_path).await?; - if cred.lease_duration == 0 { - tracing::warn!( - pool = %pool_name, - "vault: lease_duration is 0 — credentials may not be renewable; check Vault backend config" - ); - } - on_credential(pool_name, &cred)?; - min_lease = min_lease.min(cred.lease_duration); - } - - Ok(if min_lease == u64::MAX { 0 } else { min_lease }) -} - -/// Fetch + apply credentials (with pool reload). Used by renewal task. -async fn rotation_cycle(cfg: &VaultConfig, pools: &[(String, String)]) -> Result { - fetch_credentials(cfg, pools, apply_credential).await -} - -/// Fetch + write credentials to config without pool reload. Used by [`init`]. -async fn fetch_and_update_config( - cfg: &VaultConfig, - pools: &[(String, String)], -) -> Result { - fetch_credentials(cfg, pools, update_config).await -} - -/// Authenticate to Vault using whichever method is configured. -async fn vault_login(cfg: &VaultConfig) -> Result { - match cfg.auth_method { - VaultAuthMethod::AppRole => { - let role_id = cfg.role_id.as_deref().ok_or_else(|| { - Error::SecretId("vault: role_id is required for AppRole auth".into()) - })?; - let secret_id = cfg - .secret_id() - .map_err(|e| Error::SecretId(e.to_string()))?; - api::approle_login(&cfg.address, role_id, &secret_id).await - } - VaultAuthMethod::Kubernetes => { - let role = cfg.kubernetes_role.as_deref().ok_or_else(|| { - Error::SecretId("vault: kubernetes_role is required for Kubernetes auth".into()) - })?; - let jwt = tokio::fs::read_to_string(cfg.jwt_path()) - .await - .map(|s| s.trim().to_string()) - .map_err(|e| { - Error::SecretId(format!( - "vault: failed to read JWT from {}: {e}", - cfg.jwt_path() - )) - })?; - api::kubernetes_login(&cfg.address, &cfg.kubernetes_mount_path, role, &jwt).await - } - } -} - // ── config update ───────────────────────────────────────────────────────────── /// Update `server_user` / `server_password` for `pool_name` in the live config, @@ -280,11 +319,14 @@ mod tests { use super::api::test_support; use super::*; use crate::config::{load_test, set}; + use once_cell::sync::Lazy; use pgdog_config::VaultConfig; - /// Set up a minimal config with a "pgdog" user but do NOT call databases::init(). - /// Use this in tests that exercise the config-update path but never call - /// reload_from_existing() (e.g. error paths, init()-only paths). + // reqwest with rustls-tls needs a process-level CryptoProvider. + static RING: Lazy<()> = Lazy::new(|| { + let _ = tokio_rustls::rustls::crypto::aws_lc_rs::default_provider().install_default(); + }); + fn set_config_only() { let mut cfg = pgdog_config::ConfigAndUsers::default(); cfg.config.databases = vec![pgdog_config::Database { @@ -303,6 +345,7 @@ mod tests { } fn vault_cfg() -> VaultConfig { + let _ = *RING; VaultConfig { address: "http://127.0.0.1:8200".into(), auth_method: pgdog_config::VaultAuthMethod::AppRole, @@ -313,6 +356,8 @@ mod tests { kubernetes_role: None, kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify: pgdog_config::VaultTlsVerify::VerifyFull, + tls_server_ca_certificate: None, } } @@ -327,9 +372,19 @@ mod tests { kubernetes_role: Some("pgdog".into()), kubernetes_jwt_path: VaultConfig::default_kubernetes_jwt_path(), kubernetes_mount_path: VaultConfig::default_kubernetes_mount_path(), + tls_verify: pgdog_config::VaultTlsVerify::VerifyFull, + tls_server_ca_certificate: None, } } + fn vault_client() -> VaultClient { + VaultClient::new(vault_cfg()).expect("failed to build VaultClient in test") + } + + fn k8s_client() -> VaultClient { + VaultClient::new(k8s_cfg()).expect("failed to build VaultClient in test") + } + fn test_cred(username: &str) -> VaultCredential { test_cred_with_lease(username, 3600) } @@ -356,27 +411,23 @@ mod tests { #[test] fn test_rotation_interval_clamps_100_to_99() { - // 100 clamped → 99%; should equal explicit 99% assert_eq!(rotation_interval(100, 100), rotation_interval(100, 99)); assert_eq!(rotation_interval(100, 99), Duration::from_secs(99)); } #[test] fn test_rotation_interval_clamps_0_to_1() { - // 0 clamped → 1%; 1% of 1000s = 10s, which equals MIN_ROTATION_INTERVAL assert_eq!(rotation_interval(1000, 0), Duration::from_secs(10)); } #[test] fn test_rotation_interval_zero_lease_uses_minimum() { - // Vault returning lease_duration=0 must not cause a busy-loop. assert_eq!(rotation_interval(0, 75), MIN_ROTATION_INTERVAL); assert_eq!(rotation_interval(0, 0), MIN_ROTATION_INTERVAL); } #[test] fn test_rotation_interval_short_lease_uses_minimum() { - // 1s lease at 75% = 0.75s, below minimum — should clamp to 10s. assert_eq!(rotation_interval(1, 75), MIN_ROTATION_INTERVAL); } @@ -384,8 +435,6 @@ mod tests { #[tokio::test] async fn test_apply_credential_errors_on_unknown_pool() { - // update_config returns PoolNotFound before reload_from_existing is called, - // so no pool init needed. set_config_only(); let cred = test_cred("v-approle-dml-AbCdEf"); let err = apply_credential("nonexistent_vault_pool", &cred).unwrap_err(); @@ -396,20 +445,14 @@ mod tests { async fn test_apply_credential_updates_known_pool() { load_test(); let cred = test_cred("v-approle-pgdog-AbCdEf"); - // "pgdog" is the user set up by load_test() - let result = apply_credential("pgdog", &cred); - assert!( - result.is_ok(), - "apply_credential should succeed for a known pool" - ); + assert!(apply_credential("pgdog", &cred).is_ok()); } - // ── rotation_cycle (mocked) ─────────────────────────────────────────────── + // ── fetch_credentials (mocked) ─────────────────────────────────────────── #[tokio::test] - async fn test_rotation_cycle_uses_api_overrides() { + async fn test_fetch_credentials_apply_rotation() { load_test(); - test_support::set_login(Some(Ok(VaultToken { client_token: "tok".into(), lease_duration: 3600, @@ -417,31 +460,30 @@ mod tests { }))); test_support::set_credential(Some(Ok(test_cred("v-approle-dml-mock")))); - let cfg = vault_cfg(); let pools = vec![("pgdog".into(), "database/creds/dml-role".into())]; - let lease = rotation_cycle(&cfg, &pools) + let lease = vault_client() + .fetch_credentials(&pools, apply_credential) .await - .expect("rotation_cycle should succeed with mocked API"); - + .expect("fetch_credentials should succeed with mocked API"); assert_eq!(lease, 3600); } #[tokio::test] - async fn test_rotation_cycle_propagates_login_error() { - // Login fails before apply_credential is called, so no pool init needed. + async fn test_fetch_credentials_propagates_login_error() { test_support::set_login(Some(Err(Error::VaultStatus { status: 403, body: "".into(), }))); - let pools = vec![("pool".into(), "database/creds/role".into())]; - let err = rotation_cycle(&vault_cfg(), &pools).await.unwrap_err(); + let err = vault_client() + .fetch_credentials(&pools, apply_credential) + .await + .unwrap_err(); assert!(matches!(err, Error::VaultStatus { status: 403, .. })); } #[tokio::test] - async fn test_rotation_cycle_propagates_credential_error() { - // Credential fetch fails before apply_credential is called, so no pool init needed. + async fn test_fetch_credentials_propagates_credential_error() { test_support::set_login(Some(Ok(VaultToken { client_token: "tok".into(), lease_duration: 3600, @@ -451,9 +493,11 @@ mod tests { status: 500, body: "".into(), }))); - let pools = vec![("pool".into(), "database/creds/role".into())]; - let err = rotation_cycle(&vault_cfg(), &pools).await.unwrap_err(); + let err = vault_client() + .fetch_credentials(&pools, apply_credential) + .await + .unwrap_err(); assert!(matches!(err, Error::VaultStatus { status: 500, .. })); } @@ -466,8 +510,7 @@ mod tests { vault_path: None, ..Default::default() }]; - let result = VaultManager::start(&vault_cfg(), &users, Duration::ZERO); - assert!(result.is_none()); + assert!(VaultManager::start(&vault_cfg(), &users, Duration::ZERO).is_none()); } #[tokio::test] @@ -489,20 +532,17 @@ mod tests { ..Default::default() }, ]; - // Large initial_delay so the spawned renewal_task never actually fires during - // the test — we're only checking that a manager is created, not that it runs. - let manager = VaultManager::start(&vault_cfg(), &users, Duration::from_secs(3600)); - // Two vault_path users → one task managing both pools - assert!(manager.is_some()); + // Large initial_delay so the renewal_task never fires during the test. + assert!( + VaultManager::start(&vault_cfg(), &users, Duration::from_secs(3600)).is_some() + ); } // ── init ───────────────────────────────────────────────────────────────── #[tokio::test] async fn test_init_populates_config_and_returns_rotation_interval() { - // init() calls update_config() only — no reload_from_existing, no pool init needed. set_config_only(); - test_support::set_login(Some(Ok(VaultToken { client_token: "tok".into(), lease_duration: 3600, @@ -517,8 +557,7 @@ mod tests { }]; let delay = init(&vault_cfg(), &users).await; - // 75% of 3600s = 2700s - assert_eq!(delay, Duration::from_secs(2700)); + assert_eq!(delay, Duration::from_secs(2700)); // 75% of 3600s let cfg = config(); let user = cfg.users.users.iter().find(|u| u.name == "pgdog").unwrap(); @@ -532,55 +571,47 @@ mod tests { status: 403, body: "permission denied".into(), }))); - let users = vec![pgdog_config::User { name: "dml_role".into(), vault_path: Some("database/creds/dml-role".into()), ..Default::default() }]; - let delay = init(&vault_cfg(), &users).await; - assert_eq!( - delay, - Duration::ZERO, - "failed init must return ZERO so background task retries immediately" - ); + assert_eq!(delay, Duration::ZERO); } #[tokio::test] async fn test_init_returns_zero_when_no_vault_users() { - let users: Vec = vec![]; - let delay = init(&vault_cfg(), &users).await; + let delay = init(&vault_cfg(), &[]).await; assert_eq!(delay, Duration::ZERO); } - // ── AppRole vault_login ─────────────────────────────────────────────────── + // ── VaultClient::login ──────────────────────────────────────────────────── #[tokio::test] - async fn test_vault_login_approle() { + async fn test_login_approle() { test_support::set_login(Some(Ok(VaultToken { client_token: "approle-tok".into(), lease_duration: 3600, renewable: true, }))); - let tok = vault_login(&vault_cfg()).await.unwrap(); + let tok = vault_client().login().await.unwrap(); assert_eq!(tok.client_token, "approle-tok"); } #[tokio::test] - async fn test_vault_login_approle_missing_role_id_errors() { - let cfg = VaultConfig { + async fn test_login_approle_missing_role_id_errors() { + let client = VaultClient::new(VaultConfig { role_id: None, ..vault_cfg() - }; - let err = vault_login(&cfg).await.unwrap_err(); + }) + .unwrap(); + let err = client.login().await.unwrap_err(); assert!(matches!(err, Error::SecretId(_))); } - // ── Kubernetes vault_login ──────────────────────────────────────────────── - #[tokio::test] - async fn test_vault_login_kubernetes() { + async fn test_login_kubernetes() { use std::io::Write; let mut f = tempfile::NamedTempFile::new().unwrap(); writeln!(f, "eyJhbGciOiJSUzI1NiJ9.stub").unwrap(); @@ -591,36 +622,39 @@ mod tests { renewable: true, }))); - let cfg = VaultConfig { + let client = VaultClient::new(VaultConfig { kubernetes_jwt_path: f.path().to_str().unwrap().into(), ..k8s_cfg() - }; - let tok = vault_login(&cfg).await.unwrap(); + }) + .unwrap(); + let tok = client.login().await.unwrap(); assert_eq!(tok.client_token, "k8s-tok"); } #[tokio::test] - async fn test_vault_login_kubernetes_missing_role_errors() { - let cfg = VaultConfig { + async fn test_login_kubernetes_missing_role_errors() { + let client = VaultClient::new(VaultConfig { kubernetes_role: None, ..k8s_cfg() - }; - let err = vault_login(&cfg).await.unwrap_err(); + }) + .unwrap(); + let err = client.login().await.unwrap_err(); assert!(matches!(err, Error::SecretId(_))); } #[tokio::test] - async fn test_vault_login_kubernetes_missing_jwt_file_errors() { - let cfg = VaultConfig { + async fn test_login_kubernetes_missing_jwt_file_errors() { + let client = VaultClient::new(VaultConfig { kubernetes_jwt_path: "/nonexistent/token".into(), ..k8s_cfg() - }; - let err = vault_login(&cfg).await.unwrap_err(); + }) + .unwrap(); + let err = client.login().await.unwrap_err(); assert!(matches!(err, Error::SecretId(_))); } #[tokio::test] - async fn test_rotation_cycle_kubernetes() { + async fn test_fetch_credentials_kubernetes() { use std::io::Write; load_test(); @@ -634,12 +668,16 @@ mod tests { }))); test_support::set_credential(Some(Ok(test_cred_with_lease("v-k8s-dml-AbCdEf", 7200)))); - let cfg = VaultConfig { + let client = VaultClient::new(VaultConfig { kubernetes_jwt_path: f.path().to_str().unwrap().into(), ..k8s_cfg() - }; + }) + .unwrap(); let pools = vec![("pgdog".into(), "database/creds/dml-role".into())]; - let lease = rotation_cycle(&cfg, &pools).await.unwrap(); + let lease = client + .fetch_credentials(&pools, apply_credential) + .await + .unwrap(); assert_eq!(lease, 7200); } } From 884f14e6391a69b11b8da22ca074ca62e5e36190 Mon Sep 17 00:00:00 2001 From: Lev Kokotov Date: Tue, 5 May 2026 11:56:05 -0700 Subject: [PATCH 4/4] fix rebase --- pgdog/src/backend/pool/address.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pgdog/src/backend/pool/address.rs b/pgdog/src/backend/pool/address.rs index 89e6de13f..7066001cd 100644 --- a/pgdog/src/backend/pool/address.rs +++ b/pgdog/src/backend/pool/address.rs @@ -94,6 +94,7 @@ impl Address { /// Get address passwords, in valid order. pub async fn auth_secrets(&self) -> Result, Error> { let mut secrets = match self.server_auth { + ServerAuth::Vault => self.passwords.clone(), // Vault passwords are set on boot. TODO: refresh them on `RELOAD`/`RECONNECT`. ServerAuth::Password => self.passwords.clone(), ServerAuth::RdsIam => vec![crate::backend::auth::rds_iam::token(self).await?.into()], ServerAuth::AzureWorkloadIdentity => { @@ -238,8 +239,15 @@ mod test { }; let address = Address::new(&database, &user, 0); - assert_eq!(address.user, "v-approle-dml-XyZ", "must use vault-generated username"); - assert_eq!(address.passwords, vec!["vault-pass"], "must use vault-generated password"); + assert_eq!( + address.user, "v-approle-dml-XyZ", + "must use vault-generated username" + ); + assert_eq!( + address.passwords, + vec!["vault-pass"], + "must use vault-generated password" + ); assert_eq!(address.server_auth, ServerAuth::Vault); }