Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/en/engines/database-engines/datalake.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ The following settings are supported:
| `storage_endpoint` | Endpoint URL for the underlying storage |
| `oauth_server_uri` | URI of the OAuth2 authorization server for authentication |
| `vended_credentials` | Boolean indicating whether to use vended credentials from the catalog (supports AWS S3 and Azure ADLS Gen2) |
| `vended_credentials_cache_ttl` | Maximum cache entry lifetime (in seconds) for vended credentials (REST catalogs only). Default `300`; `0` disables caching. |
| `aws_access_key_id` | AWS access key ID for S3/Glue access (if not using vended credentials) |
| `aws_secret_access_key` | AWS secret access key for S3/Glue access (if not using vended credentials) |
| `region` | AWS region for the service (e.g., `us-east-1`) |
Expand Down
2 changes: 2 additions & 0 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1429,6 +1429,8 @@ The server successfully detected this situation and will download merged part fr
M(DataLakeRestCatalogGetTableMetadataMicroseconds, "Total time of 'get table metadata' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogGetCredentials, "Number of 'get credentials' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogGetCredentialsMicroseconds, "Total time of 'get credentials' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogCredentialsVended, "Number of table metadata requests to Iceberg REST catalog that asked the catalog to vend storage credentials (i.e. cache miss).", ValueType::Number) \
M(DataLakeRestCatalogCredentialsCacheHits, "Number of table metadata requests to Iceberg REST catalog that reused cached storage credentials and did not ask the catalog to vend new ones.", ValueType::Number) \
M(DataLakeRestCatalogCreateNamespace, "Number of 'create namespace' requests to Iceberg REST catalog.", ValueType::Number) \
M(DataLakeRestCatalogCreateNamespaceMicroseconds, "Total time of 'create namespace' requests to Iceberg REST catalog.", ValueType::Microseconds) \
M(DataLakeRestCatalogCreateTable, "Number of 'create table' requests to Iceberg REST catalog.", ValueType::Number) \
Expand Down
6 changes: 6 additions & 0 deletions src/Databases/DataLake/DatabaseDataLake.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <algorithm>
#include <array>
#include <chrono>
#include <memory>
#include <Databases/DataLake/DatabaseDataLake.h>
#include <Core/SettingsEnums.h>
Expand Down Expand Up @@ -65,6 +66,7 @@ namespace DatabaseDataLakeSetting
extern const DatabaseDataLakeSettingsString oauth_server_uri;
extern const DatabaseDataLakeSettingsBool oauth_server_use_request_body;
extern const DatabaseDataLakeSettingsBool vended_credentials;
extern const DatabaseDataLakeSettingsUInt64 vended_credentials_cache_ttl;
extern const DatabaseDataLakeSettingsString object_storage_cluster;
extern const DatabaseDataLakeSettingsString aws_access_key_id;
extern const DatabaseDataLakeSettingsString aws_secret_access_key;
Expand Down Expand Up @@ -335,6 +337,10 @@ std::shared_ptr<DataLake::ICatalog> DatabaseDataLake::getCatalog() const
}
}

if (catalog_impl)
catalog_impl->setVendedCredentialsCacheTTL(
std::chrono::seconds(settings[DatabaseDataLakeSetting::vended_credentials_cache_ttl].value));

return catalog_impl;
}

Expand Down
1 change: 1 addition & 0 deletions src/Databases/DataLake/DatabaseDataLakeSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ namespace ErrorCodes
DECLARE(DatabaseDataLakeCatalogType, catalog_type, DatabaseDataLakeCatalogType::NONE, "Catalog type", 0) \
DECLARE(String, catalog_credential, "", "", 0) \
DECLARE(Bool, vended_credentials, true, "Use vended credentials (storage credentials) from catalog", 0) \
DECLARE(UInt64, vended_credentials_cache_ttl, 300, "Maximum cache entry lifetime (in seconds) for vended credentials. '0' disables caching.", 0) \
DECLARE(String, auth_scope, "PRINCIPAL_ROLE:ALL", "Authorization scope for client credentials or token exchange", 0) \
DECLARE(String, oauth_server_uri, "", "OAuth server uri", 0) \
DECLARE(Bool, oauth_server_use_request_body, true, "Put parameters into request body or query params", 0) \
Expand Down
3 changes: 3 additions & 0 deletions src/Databases/DataLake/ICatalog.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#pragma once
#include <chrono>
#include <optional>
#include <Core/Types.h>
#include <Core/NamesAndTypes.h>
Expand Down Expand Up @@ -219,6 +220,8 @@ class ICatalog
return std::nullopt;
}

virtual void setVendedCredentialsCacheTTL(std::chrono::seconds /*ttl*/) {}

protected:
/// Name of the warehouse,
/// which is sometimes also called "catalog name".
Expand Down
172 changes: 153 additions & 19 deletions src/Databases/DataLake/RestCatalog.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,11 @@
#include <Poco/Net/HTTPSClientSession.h>
#include <Poco/Net/SSLManager.h>
#include <Poco/StreamCopier.h>
#include <Poco/DateTime.h>
#include <Poco/DateTimeFormat.h>
#include <Poco/DateTimeParser.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Timestamp.h>


namespace DB::ErrorCodes
Expand All @@ -66,6 +71,8 @@ namespace ProfileEvents
extern const Event DataLakeRestCatalogGetTableMetadataMicroseconds;
extern const Event DataLakeRestCatalogGetCredentials;
extern const Event DataLakeRestCatalogGetCredentialsMicroseconds;
extern const Event DataLakeRestCatalogCredentialsVended;
extern const Event DataLakeRestCatalogCredentialsCacheHits;
extern const Event DataLakeRestCatalogCreateNamespace;
extern const Event DataLakeRestCatalogCreateNamespaceMicroseconds;
extern const Event DataLakeRestCatalogCreateTable;
Expand Down Expand Up @@ -928,15 +935,28 @@ bool RestCatalog::getTableMetadataImpl(
"Namespace {} is filtered by `namespaces` database parameter", namespace_name);

DB::HTTPHeaderEntries headers;
if (result.requiresCredentials())

const bool want_credentials = result.requiresCredentials();

/// Reuse previously vended credentials if possible
std::optional<VendedStorageCredentials> cached_credentials;
if (want_credentials)
{
cached_credentials = tryGetCachedCredentials(namespace_name, table_name);

/// Header `X-Iceberg-Access-Delegation` tells catalog to include storage credentials in LoadTableResponse.
/// Value can be one of the two:
/// 1. `vended-credentials`
/// 2. `remote-signing`
/// Currently we support only the first.
/// https://github.com/apache/iceberg/blob/3badfe0c1fcf0c0adfc7aa4a10f0b50365c48cf9/open-api/rest-catalog-open-api.yaml#L1832
headers.emplace_back("X-Iceberg-Access-Delegation", "vended-credentials");
if (cached_credentials)
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogCredentialsCacheHits);
else
{
ProfileEvents::increment(ProfileEvents::DataLakeRestCatalogCredentialsVended);
headers.emplace_back("X-Iceberg-Access-Delegation", "vended-credentials");
}
}

const std::string endpoint = std::filesystem::path(NAMESPACES_ENDPOINT) / encodeNamespaceForURI(namespace_name) / "tables" / table_name;
Expand Down Expand Up @@ -994,16 +1014,28 @@ bool RestCatalog::getTableMetadataImpl(
result.setSchema(*schema);
}

if (result.isDefaultReadableTable() && result.requiresCredentials() && object->has("config"))
if (want_credentials && result.isDefaultReadableTable())
{
auto config_object = object->get("config").extract<Poco::JSON::Object::Ptr>();
if (!config_object)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse config result");
auto [parsed_credentials, parsed_endpoint] = getCredentialsAndEndpoint(config_object, location);
if (parsed_credentials)
result.setStorageCredentials(parsed_credentials);
if (!parsed_endpoint.empty())
result.setEndpoint(parsed_endpoint);
if (cached_credentials)
{
result.setStorageCredentials(cached_credentials->credentials);
if (!cached_credentials->endpoint.empty())
result.setEndpoint(cached_credentials->endpoint);
}
else if (object->has("config"))
{
auto config_object = object->get("config").extract<Poco::JSON::Object::Ptr>();
if (!config_object)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Cannot parse config result");
auto parsed = getCredentialsAndEndpoint(config_object, location);
if (parsed.credentials)
{
result.setStorageCredentials(parsed.credentials);
cacheCredentials(namespace_name, table_name, parsed);
}
if (!parsed.endpoint.empty())
result.setEndpoint(parsed.endpoint);
}
}

if (result.requiresDataLakeSpecificProperties())
Expand Down Expand Up @@ -1229,7 +1261,44 @@ void RestCatalog::dropTable(const String & namespace_name, const String & table_
}
}

std::pair<std::shared_ptr<IStorageCredentials>, String> RestCatalog::getCredentialsAndEndpoint(Poco::JSON::Object::Ptr object, const String & location) const
namespace
{
std::optional<std::chrono::system_clock::time_point> parseSasTokenExpiry(const std::string & sas_token)
{
std::string token = sas_token;
if (!token.empty() && token.front() == '?')
token.erase(0, 1);

Poco::StringTokenizer params(token, "&", Poco::StringTokenizer::TOK_IGNORE_EMPTY | Poco::StringTokenizer::TOK_TRIM);
for (const auto & param : params)
{
if (!param.starts_with("se="))
continue;

try
{
std::string decoded;
Poco::URI::decode(param.substr(3), decoded);

int time_zone_differential = 0;
Poco::DateTime date_time;
if (Poco::DateTimeParser::tryParse(Poco::DateTimeFormat::ISO8601_FORMAT, decoded, date_time, time_zone_differential))
{
date_time.makeUTC(time_zone_differential);
return std::chrono::system_clock::from_time_t(date_time.timestamp().epochTime());
}
}
catch (...)
{
return std::nullopt;
}
return std::nullopt;
}
return std::nullopt;
}
}

VendedStorageCredentials RestCatalog::getCredentialsAndEndpoint(Poco::JSON::Object::Ptr object, const String & location) const
{
auto storage_type = parseStorageTypeFromLocation(location);
switch (storage_type)
Expand All @@ -1240,11 +1309,13 @@ std::pair<std::shared_ptr<IStorageCredentials>, String> RestCatalog::getCredenti
static constexpr auto secret_access_key_str = "s3.secret-access-key";
static constexpr auto session_token_str = "s3.session-token";
static constexpr auto storage_endpoint_str = "s3.endpoint";
static constexpr auto session_token_expires_at_ms_str = "s3.session-token-expires-at-ms";

std::string access_key_id;
std::string secret_access_key;
std::string session_token;
std::string storage_endpoint;
std::optional<std::chrono::system_clock::time_point> expires_at;
if (object->has(access_key_id_str))
access_key_id = object->get(access_key_id_str).extract<String>();
if (object->has(secret_access_key_str))
Expand All @@ -1253,9 +1324,26 @@ std::pair<std::shared_ptr<IStorageCredentials>, String> RestCatalog::getCredenti
session_token = object->get(session_token_str).extract<String>();
if (object->has(storage_endpoint_str))
storage_endpoint = object->get(storage_endpoint_str).extract<String>();
if (object->has(session_token_expires_at_ms_str))
{
try
{
static constexpr Int64 max_representable_sec
= std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::duration::max()).count();
const Int64 expires_at_ms = object->get(session_token_expires_at_ms_str).convert<Int64>();
if (expires_at_ms <= 0)
expires_at = std::chrono::system_clock::time_point{}; /// Already invalid: do not cache.
else if (expires_at_ms / 1000 < max_representable_sec)
expires_at = std::chrono::system_clock::from_time_t(static_cast<std::time_t>(expires_at_ms / 1000));
}
catch (...)
{
LOG_DEBUG(log, "Failed to parse '{}' from vended credentials config", session_token_expires_at_ms_str);
}
}

LOG_DEBUG(log, "get tokens for location {}", location);
return {std::make_shared<S3Credentials>(access_key_id, secret_access_key, session_token), storage_endpoint};
return {std::make_shared<S3Credentials>(access_key_id, secret_access_key, session_token), storage_endpoint, expires_at};
}
case StorageType::Azure:
{
Expand All @@ -1277,15 +1365,59 @@ std::pair<std::shared_ptr<IStorageCredentials>, String> RestCatalog::getCredenti
}

if (!sas_token.empty())
{
return {std::make_shared<AzureCredentials>(sas_token), ""};
}
return {std::make_shared<AzureCredentials>(sas_token), "", parseSasTokenExpiry(sas_token)};
break;
}
default:
break;
}
return {nullptr, ""};
return {nullptr, "", std::nullopt};
}

/// Looks up cached vended credentials for the given table.
/// Returns std::nullopt when caching is disabled (TTL is not positive), when there is no entry,
/// or when the entry has reached its `expires_at` (in which case the entry is also removed).
std::optional<VendedStorageCredentials> RestCatalog::tryGetCachedCredentials(
    const std::string & namespace_name, const std::string & table_name) const
{
    const auto configured_ttl = vended_credentials_cache_ttl.load(std::memory_order_relaxed);
    if (configured_ttl <= std::chrono::seconds::zero())
        return std::nullopt;

    std::lock_guard lock(credentials_cache_mutex);

    const auto entry = credentials_cache.find({namespace_name, table_name});
    if (entry == credentials_cache.end())
        return std::nullopt;

    const bool still_fresh = std::chrono::system_clock::now() < entry->second.expires_at.value();
    if (still_fresh)
        return entry->second;

    /// Stale: evict so the map does not keep dead entries around.
    credentials_cache.erase(entry);
    return std::nullopt;
}

/// Stores vended credentials for a table in the per-catalog cache.
///
/// Nothing is stored when:
///  - caching is disabled (the configured TTL is not positive);
///  - `parsed.credentials` is null or reports `isEmpty()`;
///  - the entry would already be stale at insertion time.
///
/// The stored `expires_at` is the earlier of `now + TTL` and `parsed.expires_at` (if present),
/// so an entry never outlives either the configured maximum lifetime or the credentials themselves.
void RestCatalog::cacheCredentials(
    const std::string & namespace_name, const std::string & table_name, const VendedStorageCredentials & parsed) const
{
    using Clock = std::chrono::system_clock;

    const auto ttl = vended_credentials_cache_ttl.load(std::memory_order_relaxed);
    if (ttl <= std::chrono::seconds::zero())
        return;

    if (!parsed.credentials || parsed.credentials->isEmpty())
        return;

    const auto now = Clock::now();

    /// `now + ttl` is evaluated in the clock's native sub-second resolution, so a very large TTL
    /// would overflow the signed tick count. Saturate at the largest representable time point instead.
    const auto max_ttl = std::chrono::duration_cast<std::chrono::seconds>(Clock::time_point::max() - now);
    Clock::time_point refresh_after = ttl < max_ttl ? now + ttl : Clock::time_point::max();

    /// Cap at the credentials' own expiry so an entry never outlives the credentials it holds.
    if (parsed.expires_at && parsed.expires_at.value() < refresh_after)
        refresh_after = parsed.expires_at.value();
    if (refresh_after <= now)
        return;

    std::lock_guard lock(credentials_cache_mutex);

    /// The threshold only triggers a sweep of expired entries; it is not a hard capacity limit.
    if (credentials_cache.size() >= credentials_cache_cleanup_threshold)
        std::erase_if(credentials_cache, [&now](const auto & entry) { return now >= entry.second.expires_at.value(); });

    credentials_cache[{namespace_name, table_name}]
        = VendedStorageCredentials{parsed.credentials, parsed.endpoint, refresh_after};
}

ICatalog::CredentialsRefreshCallback RestCatalog::getCredentialsConfigurationCallback(const DB::StorageID & storage_id)
Expand Down Expand Up @@ -1344,8 +1476,10 @@ ICatalog::CredentialsRefreshCallback RestCatalog::getCredentialsConfigurationCal
throw DB::Exception(DB::ErrorCodes::BAD_ARGUMENTS, "Cannot read table {}, because no 'metadata-location' in response", table_name);
}

auto [new_credentials, _] = getCredentialsAndEndpoint(config_object, location);
return new_credentials;
auto parsed = getCredentialsAndEndpoint(config_object, location);
/// Refresh the per-table cache so subsequent queries reuse these freshly vended credentials.
cacheCredentials(namespace_name, table_name, parsed);
return parsed.credentials;
};
}

Expand Down
33 changes: 32 additions & 1 deletion src/Databases/DataLake/RestCatalog.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,13 @@
#include <IO/ReadWriteBufferFromHTTP.h>
#include <IO/HTTPHeaderEntries.h>
#include <Interpreters/Context_fwd.h>
#include <base/defines.h>
#include <atomic>
#include <chrono>
#include <filesystem>
#include <map>
#include <mutex>
#include <optional>
#include <Poco/JSON/Object.h>

namespace DB
Expand All @@ -31,6 +37,13 @@ struct AccessToken
}
};

/// Result of parsing vended storage credentials from a catalog response;
/// the same structure is used as the value type of the per-table credentials cache.
/// NOTE: instances are brace-initialized positionally, so the field order matters.
struct VendedStorageCredentials
{
/// Parsed credentials; null when nothing could be extracted for the table's storage type.
std::shared_ptr<IStorageCredentials> credentials;
/// Storage endpoint taken from the catalog config (`s3.endpoint` for S3); empty when not provided.
std::string endpoint;
/// Expiry of the credentials if the catalog response made it known, otherwise std::nullopt.
/// For cache entries this holds the moment after which the entry is considered stale.
std::optional<std::chrono::system_clock::time_point> expires_at;
};

class RestCatalog : public ICatalog, public DB::WithContext
{
public:
Expand Down Expand Up @@ -82,6 +95,8 @@ class RestCatalog : public ICatalog, public DB::WithContext

ICatalog::CredentialsRefreshCallback getCredentialsConfigurationCallback(const DB::StorageID & storage_id) override;

void setVendedCredentialsCacheTTL(std::chrono::seconds ttl) override { vended_credentials_cache_ttl.store(ttl, std::memory_order_relaxed); }

String getClientId() const { return client_id; }
String getClientSecret() const { return client_secret; }

Expand Down Expand Up @@ -127,6 +142,16 @@ class RestCatalog : public ICatalog, public DB::WithContext
bool oauth_server_use_request_body;
mutable std::optional<AccessToken> access_token;

/// TTL for caching vended credentials per table (0 means no caching).
std::atomic<std::chrono::seconds> vended_credentials_cache_ttl{std::chrono::seconds::zero()};

/// Sweep trigger threshold, not capacity!
static constexpr size_t credentials_cache_cleanup_threshold = 1000;
mutable std::mutex credentials_cache_mutex;

mutable std::map<std::pair<std::string, std::string>, VendedStorageCredentials> credentials_cache
TSA_GUARDED_BY(credentials_cache_mutex);

public:
class AllowedNamespaces
{
Expand Down Expand Up @@ -194,7 +219,13 @@ class RestCatalog : public ICatalog, public DB::WithContext
const String & method = Poco::Net::HTTPRequest::HTTP_POST,
bool ignore_result = false) const;

std::pair<std::shared_ptr<IStorageCredentials>, String> getCredentialsAndEndpoint(Poco::JSON::Object::Ptr object, const String & location) const;
VendedStorageCredentials getCredentialsAndEndpoint(Poco::JSON::Object::Ptr object, const String & location) const;

std::optional<VendedStorageCredentials> tryGetCachedCredentials(
const std::string & namespace_name, const std::string & table_name) const;

void cacheCredentials(
const std::string & namespace_name, const std::string & table_name, const VendedStorageCredentials & parsed) const;

AccessToken retrieveAccessToken() const;
};
Expand Down
Loading
Loading