diff --git a/docs/en/engines/database-engines/datalake.md b/docs/en/engines/database-engines/datalake.md index 9c20511172b3..f90294b87bd4 100644 --- a/docs/en/engines/database-engines/datalake.md +++ b/docs/en/engines/database-engines/datalake.md @@ -56,6 +56,7 @@ The following settings are supported: | `storage_endpoint` | Endpoint URL for the underlying storage | | `oauth_server_uri` | URI of the OAuth2 authorization server for authentication | | `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) | +| `vended_credentials_cache_ttl` | Maximum cache entry lifetime (in seconds) for vended credentials (REST catalogs only). Default `300`; `0` disables caching. | | `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) | | `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) | | `region` | AWS region for the service (e.g., `us-east-1`) | diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp index 993e9d6a85cd..0828ee9dc92f 100644 --- a/src/Common/ProfileEvents.cpp +++ b/src/Common/ProfileEvents.cpp @@ -1429,6 +1429,8 @@ The server successfully detected this situation and will download merged part fr M(DataLakeRestCatalogGetTableMetadataMicroseconds, "Total time of 'get table metadata' requests to Iceberg REST catalog.", ValueType::Microseconds) \ M(DataLakeRestCatalogGetCredentials, "Number of 'get credentials' requests to Iceberg REST catalog.", ValueType::Number) \ M(DataLakeRestCatalogGetCredentialsMicroseconds, "Total time of 'get credentials' requests to Iceberg REST catalog.", ValueType::Microseconds) \ + M(DataLakeRestCatalogCredentialsVended, "Number of table metadata requests to Iceberg REST catalog that asked the catalog to vend storage credentials (i.e. cache miss).", ValueType::Number) \ + M(DataLakeRestCatalogCredentialsCacheHits, "Number of table metadata requests to Iceberg REST catalog that reused cached storage credentials and did not ask the catalog to vend new ones.", ValueType::Number) \ M(DataLakeRestCatalogCreateNamespace, "Number of 'create namespace' requests to Iceberg REST catalog.", ValueType::Number) \ M(DataLakeRestCatalogCreateNamespaceMicroseconds, "Total time of 'create namespace' requests to Iceberg REST catalog.", ValueType::Microseconds) \ M(DataLakeRestCatalogCreateTable, "Number of 'create table' requests to Iceberg REST catalog.", ValueType::Number) \ diff --git a/src/Databases/DataLake/DatabaseDataLake.cpp b/src/Databases/DataLake/DatabaseDataLake.cpp index 112c07809a03..975806c6c3ca 100644 --- a/src/Databases/DataLake/DatabaseDataLake.cpp +++ b/src/Databases/DataLake/DatabaseDataLake.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include #include @@ -65,6 +66,7 @@ namespace DatabaseDataLakeSetting extern const DatabaseDataLakeSettingsString oauth_server_uri; extern const DatabaseDataLakeSettingsBool oauth_server_use_request_body; extern const DatabaseDataLakeSettingsBool vended_credentials; + extern const DatabaseDataLakeSettingsUInt64 vended_credentials_cache_ttl; extern const DatabaseDataLakeSettingsString object_storage_cluster; extern const DatabaseDataLakeSettingsString aws_access_key_id; extern const DatabaseDataLakeSettingsString aws_secret_access_key; @@ -335,6 +337,10 @@ std::shared_ptr DatabaseDataLake::getCatalog() const } } + if (catalog_impl) + catalog_impl->setVendedCredentialsCacheTTL( + std::chrono::seconds(settings[DatabaseDataLakeSetting::vended_credentials_cache_ttl].value)); + return catalog_impl; } diff --git a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp index c65e0f18cea7..f12268490840 100644 --- a/src/Databases/DataLake/DatabaseDataLakeSettings.cpp +++ b/src/Databases/DataLake/DatabaseDataLakeSettings.cpp @@ -20,6 +20,7 @@ namespace ErrorCodes DECLARE(DatabaseDataLakeCatalogType, catalog_type, DatabaseDataLakeCatalogType::NONE, "Catalog type", 0) \ DECLARE(String, catalog_credential, "", "", 0) \ DECLARE(Bool, vended_credentials, true, "Use vended credentials (storage credentials) from catalog", 0) \ + DECLARE(UInt64, vended_credentials_cache_ttl, 300, "Maximum cache entry lifetime (in seconds) for vended credentials. '0' disables caching.", 0) \ DECLARE(String, auth_scope, "PRINCIPAL_ROLE:ALL", "Authorization scope for client credentials or token exchange", 0) \ DECLARE(String, oauth_server_uri, "", "OAuth server uri", 0) \ DECLARE(Bool, oauth_server_use_request_body, true, "Put parameters into request body or query params", 0) \ diff --git a/src/Databases/DataLake/ICatalog.h b/src/Databases/DataLake/ICatalog.h index 9ccb420ac759..cbd700fbabcc 100644 --- a/src/Databases/DataLake/ICatalog.h +++ b/src/Databases/DataLake/ICatalog.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -219,6 +220,8 @@ class ICatalog return std::nullopt; } + virtual void setVendedCredentialsCacheTTL(std::chrono::seconds /*ttl*/) {} + protected: /// Name of the warehouse, /// which is sometimes also called "catalog name". diff --git a/src/Databases/DataLake/RestCatalog.cpp b/src/Databases/DataLake/RestCatalog.cpp index 60e6dbc59c12..e030c873a152 100644 --- a/src/Databases/DataLake/RestCatalog.cpp +++ b/src/Databases/DataLake/RestCatalog.cpp @@ -44,6 +44,11 @@ #include #include #include +#include +#include +#include +#include +#include namespace DB::ErrorCodes @@ -66,6 +71,8 @@ namespace ProfileEvents extern const Event DataLakeRestCatalogGetTableMetadataMicroseconds; extern const Event DataLakeRestCatalogGetCredentials; extern const Event DataLakeRestCatalogGetCredentialsMicroseconds; + extern const Event DataLakeRestCatalogCredentialsVended; + extern const Event DataLakeRestCatalogCredentialsCacheHits; extern const Event DataLakeRestCatalogCreateNamespace; extern const Event DataLakeRestCatalogCreateNamespaceMicroseconds; extern const Event DataLakeRestCatalogCreateTable; @@ -928,15 +935,28 @@ bool RestCatalog::getTableMetadataImpl( "Namespace {} is filtered by `namespaces` database parameter", namespace_name); DB::HTTPHeaderEntries headers; - if (result.requiresCredentials()) + + const bool want_credentials = result.requiresCredentials(); + + /// Reuse previously vended credentials is possible + std::optional cached_credentials; + if (want_credentials) { + cached_credentials = tryGetCachedCredentials(namespace_name, table_name); + /// Header `X-Iceberg-Access-Delegation` tells catalog to include storage credentials in LoadTableResponse. /// Value can be one of the two: /// 1. `vended-credentials` /// 2. `remote-signing` /// Currently we support only the first. /// https://github.com/apache/iceberg/blob/3badfe0c1fcf0c0adfc7aa4a10f0b50365c48cf9/open-api/rest-catalog-open-api.yaml#L1832 - headers.emplace_back("X-Iceberg-Access-Delegation", "vended-credentials"); + if (cached_credentials) + ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogCredentialsCacheHits); + else + { + ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogCredentialsVended); + headers.emplace_back("X-Iceberg-Access-Delegation", "vended-credentials"); + } } const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / encodeNamespaceForURI(namespace_name) / "tables" / table_name; @@ -994,16 +1014,28 @@ bool RestCatalog::getTableMetadataImpl( result.setSchema(*schema); } - if (result.isDefaultReadableTable() && result.requiresCredentials() && object->has("config")) + if (want_credentials && result.isDefaultReadableTable()) { - auto config_object = object->get("config").extract(); - if (!config_object) - throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse config result"); - auto [parsed_credentials, parsed_endpoint] = getCredentialsAndEndpoint(config_object, location); - if (parsed_credentials) - result.setStorageCredentials(parsed_credentials); - if (!parsed_endpoint.empty()) - result.setEndpoint(parsed_endpoint); + if (cached_credentials) + { + result.setStorageCredentials(cached_credentials->credentials); + if (!cached_credentials->endpoint.empty()) + result.setEndpoint(cached_credentials->endpoint); + } + else if (object->has("config")) + { + auto config_object = object->get("config").extract(); + if (!config_object) + throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse config result"); + auto parsed = getCredentialsAndEndpoint(config_object, location); + if (parsed.credentials) + { + result.setStorageCredentials(parsed.credentials); + cacheCredentials(namespace_name, table_name, parsed); + } + if (!parsed.endpoint.empty()) + result.setEndpoint(parsed.endpoint); + } } if (result.requiresDataLakeSpecificProperties()) @@ -1229,7 +1261,44 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_ } } -std::pair, String> RestCatalog::getCredentialsAndEndpoint(Poco::JSON::Object::Ptr object, const String & location) const +namespace +{ +std::optional parseSasTokenExpiry(const std::string & sas_token) +{ + std::string token = sas_token; + if (!token.empty() && token.front() == '?') + token.erase(0, 1); + + Poco::StringTokenizer params(token, "&", Poco::StringTokenizer::TOK_IGNORE_EMPTY | Poco::StringTokenizer::TOK_TRIM); + for (const auto & param : params) + { + if (!param.starts_with("se=")) + continue; + + try + { + std::string decoded; + Poco::URI::decode(param.substr(3), decoded); + + int time_zone_differential = 0; + Poco::DateTime date_time; + if (Poco::DateTimeParser::tryParse(Poco::DateTimeFormat::ISO8601_FORMAT, decoded, date_time, time_zone_differential)) + { + date_time.makeUTC(time_zone_differential); + return std::chrono::system_clock::from_time_t(date_time.timestamp().epochTime()); + } + } + catch (...) + { + return std::nullopt; + } + return std::nullopt; + } + return std::nullopt; +} +} + +VendedStorageCredentials RestCatalog::getCredentialsAndEndpoint(Poco::JSON::Object::Ptr object, const String & location) const { auto storage_type = parseStorageTypeFromLocation(location); switch (storage_type) @@ -1240,11 +1309,13 @@ std::pair, String> RestCatalog::getCredenti static constexpr auto secret_access_key_str = "s3.secret-access-key"; static constexpr auto session_token_str = "s3.session-token"; static constexpr auto storage_endpoint_str = "s3.endpoint"; + static constexpr auto session_token_expires_at_ms_str = "s3.session-token-expires-at-ms"; std::string access_key_id; std::string secret_access_key; std::string session_token; std::string storage_endpoint; + std::optional expires_at; if (object->has(access_key_id_str)) access_key_id = object->get(access_key_id_str).extract(); if (object->has(secret_access_key_str)) @@ -1253,9 +1324,26 @@ std::pair, String> RestCatalog::getCredenti session_token = object->get(session_token_str).extract(); if (object->has(storage_endpoint_str)) storage_endpoint = object->get(storage_endpoint_str).extract(); + if (object->has(session_token_expires_at_ms_str)) + { + try + { + static constexpr Int64 max_representable_sec + = std::chrono::duration_cast(std::chrono::system_clock::duration::max()).count(); + const Int64 expires_at_ms = object->get(session_token_expires_at_ms_str).convert(); + if (expires_at_ms <= 0) + expires_at = std::chrono::system_clock::time_point{}; /// Already invalid: do not cache. + else if (expires_at_ms / 1000 < max_representable_sec) + expires_at = std::chrono::system_clock::from_time_t(static_cast(expires_at_ms / 1000)); + } + catch (...) + { + LOG_DEBUG(log, "Failed to parse '{}' from vended credentials config", session_token_expires_at_ms_str); + } + } LOG_DEBUG(log, "get tokens for location {}", location); - return {std::make_shared(access_key_id, secret_access_key, session_token), storage_endpoint}; + return {std::make_shared(access_key_id, secret_access_key, session_token), storage_endpoint, expires_at}; } case StorageType::Azure: { @@ -1277,15 +1365,59 @@ std::pair, String> RestCatalog::getCredenti } if (!sas_token.empty()) - { - return {std::make_shared(sas_token), ""}; - } + return {std::make_shared(sas_token), "", parseSasTokenExpiry(sas_token)}; break; } default: break; } - return {nullptr, ""}; + return {nullptr, "", std::nullopt}; +} + +std::optional RestCatalog::tryGetCachedCredentials( + const std::string & namespace_name, const std::string & table_name) const +{ + if (vended_credentials_cache_ttl.load(std::memory_order_relaxed) <= std::chrono::seconds::zero()) + return std::nullopt; + + std::lock_guard lock(credentials_cache_mutex); + auto it = credentials_cache.find({namespace_name, table_name}); + if (it == credentials_cache.end()) + return std::nullopt; + if (std::chrono::system_clock::now() >= it->second.expires_at.value()) + { + credentials_cache.erase(it); /// Drop the stale entry. + return std::nullopt; + } + + return it->second; +} + +void RestCatalog::cacheCredentials( + const std::string & namespace_name, const std::string & table_name, const VendedStorageCredentials & parsed) const +{ + const auto ttl = vended_credentials_cache_ttl.load(std::memory_order_relaxed); + if (ttl <= std::chrono::seconds::zero()) + return; + + if (!parsed.credentials || parsed.credentials->isEmpty()) + return; + + const auto now = std::chrono::system_clock::now(); + + /// Cap at the configured TTL so an entry never outlives the documented maximum lifetime. + auto refresh_after = now + ttl; + if (parsed.expires_at && parsed.expires_at.value() < refresh_after) + refresh_after = parsed.expires_at.value(); + if (refresh_after <= now) + return; + + std::lock_guard lock(credentials_cache_mutex); + + if (credentials_cache.size() >= credentials_cache_cleanup_threshold) + std::erase_if(credentials_cache, [&now](const auto & entry) { return now >= entry.second.expires_at.value(); }); + credentials_cache[{namespace_name, table_name}] + = VendedStorageCredentials{parsed.credentials, parsed.endpoint, refresh_after}; } ICatalog::CredentialsRefreshCallback RestCatalog::getCredentialsConfigurationCallback(const DB::StorageID & storage_id) @@ -1344,8 +1476,10 @@ ICatalog::CredentialsRefreshCallback RestCatalog::getCredentialsConfigurationCal throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot read table {}, because no 'metadata-location' in response", table_name); } - auto [new_credentials, _] = getCredentialsAndEndpoint(config_object, location); - return new_credentials; + auto parsed = getCredentialsAndEndpoint(config_object, location); + /// Refresh the per-table cache so subsequent queries reuse these freshly vended credentials. + cacheCredentials(namespace_name, table_name, parsed); + return parsed.credentials; }; } diff --git a/src/Databases/DataLake/RestCatalog.h b/src/Databases/DataLake/RestCatalog.h index 49fe684e0eaa..9184e93e4530 100644 --- a/src/Databases/DataLake/RestCatalog.h +++ b/src/Databases/DataLake/RestCatalog.h @@ -7,7 +7,13 @@ #include #include #include +#include +#include +#include #include +#include +#include +#include #include namespace DB @@ -31,6 +37,13 @@ struct AccessToken } }; +struct VendedStorageCredentials +{ + std::shared_ptr credentials; + std::string endpoint; + std::optional expires_at; +}; + class RestCatalog : public ICatalog, public DB::WithContext { public: @@ -82,6 +95,8 @@ class RestCatalog : public ICatalog, public DB::WithContext ICatalog::CredentialsRefreshCallback getCredentialsConfigurationCallback(const DB::StorageID & storage_id) override; + void setVendedCredentialsCacheTTL(std::chrono::seconds ttl) override { vended_credentials_cache_ttl.store(ttl, std::memory_order_relaxed); } + String getClientId() const { return client_id; } String getClientSecret() const { return client_secret; } @@ -127,6 +142,16 @@ class RestCatalog : public ICatalog, public DB::WithContext bool oauth_server_use_request_body; mutable std::optional access_token; + /// TTL for caching vended credentials per table (0 means no caching). + std::atomic vended_credentials_cache_ttl{std::chrono::seconds::zero()}; + + /// Sweep trigger threshold, not capacity! + static constexpr size_t credentials_cache_cleanup_threshold = 1000; + mutable std::mutex credentials_cache_mutex; + + mutable std::map, VendedStorageCredentials> credentials_cache + TSA_GUARDED_BY(credentials_cache_mutex); + public: class AllowedNamespaces { @@ -194,7 +219,13 @@ class RestCatalog : public ICatalog, public DB::WithContext const String & method = Poco::Net::HTTPRequest::HTTP_POST, bool ignore_result = false) const; - std::pair, String> getCredentialsAndEndpoint(Poco::JSON::Object::Ptr object, const String & location) const; + VendedStorageCredentials getCredentialsAndEndpoint(Poco::JSON::Object::Ptr object, const String & location) const; + + std::optional tryGetCachedCredentials( + const std::string & namespace_name, const std::string & table_name) const; + + void cacheCredentials( + const std::string & namespace_name, const std::string & table_name, const VendedStorageCredentials & parsed) const; AccessToken retrieveAccessToken() const; }; diff --git a/src/Databases/DataLake/StorageCredentials.h b/src/Databases/DataLake/StorageCredentials.h index 8d3d75d029e2..2e500e16885a 100644 --- a/src/Databases/DataLake/StorageCredentials.h +++ b/src/Databases/DataLake/StorageCredentials.h @@ -17,6 +17,9 @@ class IStorageCredentials virtual ~IStorageCredentials() = default; virtual void addCredentialsToEngineArgs(DB::ASTs & engine_args) const = 0; + + /// True when the credentials are unusable (mandatory fields empty); such credentials are not cached. + virtual bool isEmpty() const = 0; }; class S3Credentials final : public IStorageCredentials @@ -31,7 +34,7 @@ class S3Credentials final : public IStorageCredentials , session_token(session_token_) {} - bool isEmpty() const { return access_key_id.empty() || secret_access_key.empty(); } + bool isEmpty() const override { return access_key_id.empty() || secret_access_key.empty(); } void addCredentialsToEngineArgs(DB::ASTs & engine_args) const override { @@ -81,6 +84,8 @@ class AzureCredentials final : public IStorageCredentials engine_args.push_back(DB::make_intrusive(sas_token)); } + bool isEmpty() const override { return sas_token.empty(); } + private: std::string sas_token; }; diff --git a/tests/integration/test_database_iceberg_lakekeeper_catalog/test.py b/tests/integration/test_database_iceberg_lakekeeper_catalog/test.py index 9a9cea39fd09..90a71b7ced26 100644 --- a/tests/integration/test_database_iceberg_lakekeeper_catalog/test.py +++ b/tests/integration/test_database_iceberg_lakekeeper_catalog/test.py @@ -12,6 +12,7 @@ from pyiceberg.schema import Schema from pyiceberg.types import ( DoubleType, + IntegerType, NestedField, StringType, ) @@ -384,3 +385,76 @@ def test_invalid_auth_header_format(started_cluster): ) assert "Invalid auth header format" in str(err.value) + +def get_credentials_profile_events(node, query_id): + node.query("SYSTEM FLUSH LOGS") + vended = int(node.query( + f"SELECT ProfileEvents['DataLakeRestCatalogCredentialsVended'] " + f"FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'" + )) + hits = int(node.query( + f"SELECT ProfileEvents['DataLakeRestCatalogCredentialsCacheHits'] " + f"FROM system.query_log WHERE query_id = '{query_id}' AND type = 'QueryFinish'" + )) + return vended, hits + + +def test_vended_credentials_cache(started_cluster): + node = started_cluster.instances["node1"] + catalog = load_catalog_impl(started_cluster) + + test_ref = f"test_vended_credentials_cache_{uuid.uuid4().hex[:8]}" + namespace = (f"{test_ref}_namespace",) + table_name = f"{test_ref}_table" + db_name = f"{test_ref}_database" + + if namespace not in catalog.list_namespaces(): + catalog.create_namespace(namespace) + + schema = Schema( + NestedField(field_id=1, name="id", field_type=IntegerType(), required=False), + NestedField(field_id=2, name="data", field_type=StringType(), required=False), + ) + table = catalog.create_table( + namespace + (table_name,), + schema=schema, + properties={"write.metadata.compression-codec": "none"}, + ) + table.append( + pa.Table.from_pandas( + pd.DataFrame({"id": [1], "data": ["x"]}).astype({"id": "int32"}) + ) + ) + + query = f"SELECT count() FROM {db_name}.`{namespace[0]}.{table_name}`" + + # Caching enabled (default TTL): the second query reuses cached credentials + # and does not ask the catalog to vend them again. + create_clickhouse_iceberg_database(started_cluster, node, db_name) + + qid = f"{test_ref}-cache-1-{uuid.uuid4()}" + node.query(query, query_id=qid) + vended, _ = get_credentials_profile_events(node, qid) + assert vended >= 1 + + qid = f"{test_ref}-cache-2-{uuid.uuid4()}" + node.query(query, query_id=qid) + vended, hits = get_credentials_profile_events(node, qid) + assert vended == 0 and hits >= 1 + + # Caching disabled (TTL = 0): every query asks the catalog to vend credentials. + create_clickhouse_iceberg_database( + started_cluster, node, db_name, + additional_settings={"vended_credentials_cache_ttl": 0}, + ) + + qid = f"{test_ref}-nocache-1-{uuid.uuid4()}" + node.query(query, query_id=qid) + vended, hits = get_credentials_profile_events(node, qid) + assert vended >= 1 and hits == 0 + + qid = f"{test_ref}-nocache-2-{uuid.uuid4()}" + node.query(query, query_id=qid) + vended, hits = get_credentials_profile_events(node, qid) + assert vended >= 1 and hits == 0 +