diff --git a/pyiceberg/catalog/hive.py b/pyiceberg/catalog/hive.py index aee1a0d01e..aaaec2e7f3 100644 --- a/pyiceberg/catalog/hive.py +++ b/pyiceberg/catalog/hive.py @@ -127,9 +127,6 @@ HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name" HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive" -LOCK_ENABLED = "lock-enabled" -DEFAULT_LOCK_ENABLED = True - LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time" LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time" LOCK_CHECK_RETRIES = "lock-check-retries" @@ -304,7 +301,6 @@ def __init__(self, name: str, **properties: str): super().__init__(name, **properties) self._client = self._create_hive_client(properties) - self._lock_enabled = property_as_bool(properties, LOCK_ENABLED, DEFAULT_LOCK_ENABLED) self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME) self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME) self._lock_check_retries = property_as_float( @@ -593,6 +589,19 @@ def _do_commit( metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location ) + @staticmethod + def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool: + """Determine whether HMS locking is enabled for a commit. + + Matches the Java implementation in HiveTableOperations: checks the table property first, + then falls back to catalog properties, then defaults to True. + """ + if TableProperties.HIVE_LOCK_ENABLED in table_properties: + return property_as_bool( + table_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT + ) + return property_as_bool(catalog_properties, TableProperties.HIVE_LOCK_ENABLED, TableProperties.HIVE_LOCK_ENABLED_DEFAULT) + def commit_table( self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...] ) -> CommitTableResponse: @@ -612,10 +621,11 @@ def commit_table( """ table_identifier = table.name() database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError) + lock_enabled = self._hive_lock_enabled(table.properties, self.properties) # commit to hive # https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232 with self._client as open_client: - if self._lock_enabled: + if lock_enabled: lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name)) try: diff --git a/pyiceberg/table/__init__.py b/pyiceberg/table/__init__.py index cc0d9ff341..eb044ed434 100644 --- a/pyiceberg/table/__init__.py +++ b/pyiceberg/table/__init__.py @@ -247,6 +247,9 @@ class TableProperties: MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep" MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1 + HIVE_LOCK_ENABLED = "engine.hive.lock-enabled" + HIVE_LOCK_ENABLED_DEFAULT = True + class Transaction: _table: Table diff --git a/tests/catalog/test_hive.py b/tests/catalog/test_hive.py index 29c1ccf943..83e25aea8e 100644 --- a/tests/catalog/test_hive.py +++ b/tests/catalog/test_hive.py @@ -51,7 +51,6 @@ LOCK_CHECK_MAX_WAIT_TIME, LOCK_CHECK_MIN_WAIT_TIME, LOCK_CHECK_RETRIES, - LOCK_ENABLED, HiveCatalog, _construct_hive_storage_descriptor, _HiveClient, @@ -66,6 +65,7 @@ ) from pyiceberg.partitioning import PartitionField, PartitionSpec from pyiceberg.schema import Schema +from pyiceberg.table import TableProperties from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV1, TableMetadataV2 from pyiceberg.table.refs import SnapshotRef, SnapshotRefType from pyiceberg.table.snapshots import ( @@ -1410,28 +1410,65 @@ def test_create_hive_client_with_kerberos_using_context_manager( assert open_client._iprot.trans.isOpen() -def test_lock_enabled_defaults_to_true() -> None: - """Verify that lock-enabled defaults to True for backward compatibility.""" +def test_hive_lock_enabled_defaults_to_true() -> None: + """Without any lock property set, locking should be enabled (backward compatible).""" + assert HiveCatalog._hive_lock_enabled(table_properties={}, catalog_properties={}) is True + + +def test_hive_lock_enabled_table_property_disables_lock() -> None: + """Table property engine.hive.lock-enabled=false disables locking.""" + table_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties={}) is False + + +def test_hive_lock_enabled_catalog_property_disables_lock() -> None: + """Catalog property engine.hive.lock-enabled=false disables locking when table doesn't set it.""" + catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + assert HiveCatalog._hive_lock_enabled(table_properties={}, catalog_properties=catalog_props) is False + + +def test_hive_lock_enabled_table_property_overrides_catalog() -> None: + """Table property takes precedence over catalog property.""" + table_props = {TableProperties.HIVE_LOCK_ENABLED: "true"} + catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties=catalog_props) is True + + table_props = {TableProperties.HIVE_LOCK_ENABLED: "false"} + catalog_props = {TableProperties.HIVE_LOCK_ENABLED: "true"} + assert HiveCatalog._hive_lock_enabled(table_properties=table_props, catalog_properties=catalog_props) is False + + +def test_commit_table_skips_locking_when_table_property_disables_it() -> None: + """When table property engine.hive.lock-enabled=false, commit_table must not lock/unlock.""" prop = {"uri": HIVE_METASTORE_FAKE_URL} catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) - assert catalog._lock_enabled is True + catalog._client = MagicMock() + + mock_table = MagicMock() + mock_table.name.return_value = ("default", "my_table") + mock_table.properties = {TableProperties.HIVE_LOCK_ENABLED: "false"} + mock_do_commit = MagicMock() + mock_do_commit.return_value = MagicMock() -def test_lock_enabled_can_be_disabled() -> None: - """Verify that lock-enabled can be set to false.""" - prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"} - catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) - assert catalog._lock_enabled is False + with patch.object(catalog, "_do_commit", mock_do_commit): + catalog.commit_table(mock_table, requirements=(), updates=()) + + mock_do_commit.assert_called_once() + catalog._client.__enter__().lock.assert_not_called() + catalog._client.__enter__().check_lock.assert_not_called() + catalog._client.__enter__().unlock.assert_not_called() -def test_commit_table_skips_locking_when_lock_disabled() -> None: - """When lock-enabled is false, commit_table must not call lock, check_lock, or unlock.""" - prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"} +def test_commit_table_skips_locking_when_catalog_property_disables_it() -> None: + """When catalog property engine.hive.lock-enabled=false, commit_table must not lock/unlock.""" + prop = {"uri": HIVE_METASTORE_FAKE_URL, TableProperties.HIVE_LOCK_ENABLED: "false"} catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) catalog._client = MagicMock() mock_table = MagicMock() mock_table.name.return_value = ("default", "my_table") + mock_table.properties = {} mock_do_commit = MagicMock() mock_do_commit.return_value = MagicMock() @@ -1439,17 +1476,14 @@ def test_commit_table_skips_locking_when_lock_disabled() -> None: with patch.object(catalog, "_do_commit", mock_do_commit): catalog.commit_table(mock_table, requirements=(), updates=()) - # The core commit logic should still be called mock_do_commit.assert_called_once() - - # But no locking operations should have been performed catalog._client.__enter__().lock.assert_not_called() catalog._client.__enter__().check_lock.assert_not_called() catalog._client.__enter__().unlock.assert_not_called() -def test_commit_table_uses_locking_when_lock_enabled() -> None: - """When lock-enabled is true (default), commit_table must call lock and unlock.""" +def test_commit_table_uses_locking_by_default() -> None: + """When no lock property is set, commit_table must acquire and release a lock.""" lockid = 99999 prop = {"uri": HIVE_METASTORE_FAKE_URL} catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop) @@ -1462,6 +1496,7 @@ def test_commit_table_uses_locking_when_lock_enabled() -> None: mock_table = MagicMock() mock_table.name.return_value = ("default", "my_table") + mock_table.properties = {} mock_do_commit = MagicMock() mock_do_commit.return_value = MagicMock() @@ -1469,8 +1504,6 @@ def test_commit_table_uses_locking_when_lock_enabled() -> None: with patch.object(catalog, "_do_commit", mock_do_commit): catalog.commit_table(mock_table, requirements=(), updates=()) - # Locking operations should have been performed mock_client.lock.assert_called_once() mock_client.unlock.assert_called_once() - # The core commit logic should still be called mock_do_commit.assert_called_once()