Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 40 additions & 2 deletions pyiceberg/catalog/rest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
PlanSubmitted,
PlanTableScanRequest,
ScanTasks,
StorageCredential,
)
from pyiceberg.exceptions import (
AuthorizationExpiredError,
Expand Down Expand Up @@ -256,6 +257,7 @@ class TableResponse(IcebergBaseModel):
metadata_location: str | None = Field(alias="metadata-location", default=None)
metadata: TableMetadata
config: Properties = Field(default_factory=dict)
storage_credentials: list[StorageCredential] | None = Field(alias="storage-credentials", default=None)


class CreateTableRequest(IcebergBaseModel):
Expand Down Expand Up @@ -728,13 +730,40 @@ def add_headers(self, request: PreparedRequest, **kwargs: Any) -> None: # pylin

session.mount(self.uri, SigV4Adapter(**self.properties))

@staticmethod
def _get_credentials(
storage_credentials: list[StorageCredential] | None,
config: Properties,
metadata_location: str | None,
table_location: str | None,
) -> Properties:
if not storage_credentials:
return config
target = metadata_location or table_location
if not target:
return config
matching = [sc for sc in storage_credentials if target.startswith(sc.prefix)]
if not matching:
return config
selected = max(matching, key=lambda sc: len(sc.prefix))
return selected.config

def _response_to_table(self, identifier_tuple: tuple[str, ...], table_response: TableResponse) -> Table:
return Table(
identifier=identifier_tuple,
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
{**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
{
**table_response.metadata.properties,
**self._get_credentials(
table_response.storage_credentials,
table_response.config,
table_response.metadata_location,
getattr(table_response.metadata, "location", None),
),
},
table_response.metadata_location,
),
catalog=self,
config=table_response.config,
Expand All @@ -746,7 +775,16 @@ def _response_to_staged_table(self, identifier_tuple: tuple[str, ...], table_res
metadata_location=table_response.metadata_location, # type: ignore
metadata=table_response.metadata,
io=self._load_file_io(
{**table_response.metadata.properties, **table_response.config}, table_response.metadata_location
{
**table_response.metadata.properties,
**self._get_credentials(
table_response.storage_credentials,
table_response.config,
table_response.metadata_location,
getattr(table_response.metadata, "location", None),
),
},
table_response.metadata_location,
),
catalog=self,
)
Expand Down
71 changes: 71 additions & 0 deletions tests/catalog/test_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2351,3 +2351,74 @@ def test_table_uuid_check_on_refresh(rest_mock: Mocker, example_table_metadata_v
assert "Table UUID does not match" in str(exc_info.value)
assert f"current={original_uuid}" in str(exc_info.value)
assert f"refreshed={different_uuid}" in str(exc_info.value)


def test_storage_credentials_over_config(
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
) -> None:
response_with_storage_creds = {
**example_table_metadata_with_snapshot_v1_rest_json,
"storage-credentials": [
{
"prefix": "s3://warehouse/",
"config": {
"s3.access-key-id": "storage-cred-key",
"s3.secret-access-key": "storage-cred-secret",
},
}
],
}
rest_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/table",
json=response_with_storage_creds,
status_code=200,
request_headers=TEST_HEADERS,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
table = catalog.load_table(("fokko", "table"))
assert table.io.properties["s3.access-key-id"] == "storage-cred-key"
assert table.io.properties["s3.secret-access-key"] == "storage-cred-secret"


def test_config_when_no_storage_credentials(
rest_mock: Mocker, example_table_metadata_with_snapshot_v1_rest_json: dict[str, Any]
) -> None:
rest_mock.get(
f"{TEST_URI}v1/namespaces/fokko/tables/table",
json=example_table_metadata_with_snapshot_v1_rest_json,
status_code=200,
request_headers=TEST_HEADERS,
)
catalog = RestCatalog("rest", uri=TEST_URI, token=TEST_TOKEN)
table = catalog.load_table(("fokko", "table"))
# config from the fixture should be used since there are no storage-credentials
assert table.io.properties["region"] == "us-west-2"


def test_storage_credentials_no_prefix_match() -> None:
from pyiceberg.catalog.rest.scan_planning import StorageCredential

creds = [StorageCredential(prefix="s3://other-bucket/", config={"key": "val"})]
result = RestCatalog._get_credentials(
storage_credentials=creds,
config={"fallback-key": "fallback-val"},
metadata_location="s3://warehouse/database/table/metadata/file.json",
table_location="s3://warehouse/database/table",
)
assert result == {"fallback-key": "fallback-val"}


def test_storage_credentials_longest_prefix_wins() -> None:
from pyiceberg.catalog.rest.scan_planning import StorageCredential

creds = [
StorageCredential(prefix="s3://warehouse/", config={"key": "short-prefix"}),
StorageCredential(prefix="s3://warehouse/database/table/", config={"key": "long-prefix"}),
]
result = RestCatalog._get_credentials(
storage_credentials=creds,
config={"key": "fallback"},
metadata_location="s3://warehouse/database/table/metadata/file.json",
table_location="s3://warehouse/database/table",
)
assert result == {"key": "long-prefix"}