Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions pyiceberg/catalog/hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,6 @@
HIVE_KERBEROS_SERVICE_NAME = "hive.kerberos-service-name"
HIVE_KERBEROS_SERVICE_NAME_DEFAULT = "hive"

LOCK_ENABLED = "lock-enabled"
DEFAULT_LOCK_ENABLED = True

LOCK_CHECK_MIN_WAIT_TIME = "lock-check-min-wait-time"
LOCK_CHECK_MAX_WAIT_TIME = "lock-check-max-wait-time"
LOCK_CHECK_RETRIES = "lock-check-retries"
Expand Down Expand Up @@ -304,7 +301,6 @@ def __init__(self, name: str, **properties: str):
super().__init__(name, **properties)
self._client = self._create_hive_client(properties)

self._lock_enabled = property_as_bool(properties, LOCK_ENABLED, DEFAULT_LOCK_ENABLED)
self._lock_check_min_wait_time = property_as_float(properties, LOCK_CHECK_MIN_WAIT_TIME, DEFAULT_LOCK_CHECK_MIN_WAIT_TIME)
self._lock_check_max_wait_time = property_as_float(properties, LOCK_CHECK_MAX_WAIT_TIME, DEFAULT_LOCK_CHECK_MAX_WAIT_TIME)
self._lock_check_retries = property_as_float(
Expand Down Expand Up @@ -593,6 +589,19 @@ def _do_commit(
metadata=updated_staged_table.metadata, metadata_location=updated_staged_table.metadata_location
)

@staticmethod
def _hive_lock_enabled(table_properties: Properties, catalog_properties: Properties) -> bool:
    """Resolve whether a commit should acquire a Hive Metastore lock.

    Precedence mirrors the order described for the Java HiveTableOperations:
    the table-level ``TableProperties.HIVE_LOCK_ENABLED`` entry wins whenever
    the key is present on the table; otherwise the same key is looked up in
    the catalog properties; if neither defines it, the result is
    ``TableProperties.HIVE_LOCK_ENABLED_DEFAULT``.

    Args:
        table_properties: Properties of the table being committed.
        catalog_properties: Properties the catalog was configured with.

    Returns:
        True when locking should be used for the commit, False otherwise.
    """
    lock_key = TableProperties.HIVE_LOCK_ENABLED
    # Presence of the key (not its truthiness) decides which mapping is consulted,
    # so a table that sets the key always shadows the catalog-level setting.
    effective_properties = table_properties if lock_key in table_properties else catalog_properties
    return property_as_bool(effective_properties, lock_key, TableProperties.HIVE_LOCK_ENABLED_DEFAULT)

def commit_table(
self, table: Table, requirements: tuple[TableRequirement, ...], updates: tuple[TableUpdate, ...]
) -> CommitTableResponse:
Expand All @@ -612,10 +621,11 @@ def commit_table(
"""
table_identifier = table.name()
database_name, table_name = self.identifier_to_database_and_table(table_identifier, NoSuchTableError)
lock_enabled = self._hive_lock_enabled(table.properties, self.properties)
# commit to hive
# https://github.com/apache/hive/blob/master/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift#L1232
with self._client as open_client:
if self._lock_enabled:
if lock_enabled:
lock: LockResponse = open_client.lock(self._create_lock_request(database_name, table_name))

try:
Expand Down
3 changes: 3 additions & 0 deletions pyiceberg/table/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,9 @@ class TableProperties:
MIN_SNAPSHOTS_TO_KEEP = "history.expire.min-snapshots-to-keep"
MIN_SNAPSHOTS_TO_KEEP_DEFAULT = 1

HIVE_LOCK_ENABLED = "engine.hive.lock-enabled"
HIVE_LOCK_ENABLED_DEFAULT = True


class Transaction:
_table: Table
Expand Down
71 changes: 52 additions & 19 deletions tests/catalog/test_hive.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@
LOCK_CHECK_MAX_WAIT_TIME,
LOCK_CHECK_MIN_WAIT_TIME,
LOCK_CHECK_RETRIES,
LOCK_ENABLED,
HiveCatalog,
_construct_hive_storage_descriptor,
_HiveClient,
Expand All @@ -66,6 +65,7 @@
)
from pyiceberg.partitioning import PartitionField, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.table import TableProperties
from pyiceberg.table.metadata import TableMetadataUtil, TableMetadataV1, TableMetadataV2
from pyiceberg.table.refs import SnapshotRef, SnapshotRefType
from pyiceberg.table.snapshots import (
Expand Down Expand Up @@ -1410,46 +1410,80 @@ def test_create_hive_client_with_kerberos_using_context_manager(
assert open_client._iprot.trans.isOpen()


def test_lock_enabled_defaults_to_true() -> None:
"""Verify that lock-enabled defaults to True for backward compatibility."""
def test_hive_lock_enabled_defaults_to_true() -> None:
    """With neither table nor catalog setting the property, locking stays enabled."""
    no_properties: dict[str, str] = {}
    lock_enabled = HiveCatalog._hive_lock_enabled(table_properties=no_properties, catalog_properties=no_properties)
    assert lock_enabled is True


def test_hive_lock_enabled_table_property_disables_lock() -> None:
    """Setting engine.hive.lock-enabled=false on the table turns locking off."""
    lock_enabled = HiveCatalog._hive_lock_enabled(
        table_properties={TableProperties.HIVE_LOCK_ENABLED: "false"},
        catalog_properties={},
    )
    assert lock_enabled is False


def test_hive_lock_enabled_catalog_property_disables_lock() -> None:
    """Setting engine.hive.lock-enabled=false on the catalog turns locking off when the table is silent."""
    lock_enabled = HiveCatalog._hive_lock_enabled(
        table_properties={},
        catalog_properties={TableProperties.HIVE_LOCK_ENABLED: "false"},
    )
    assert lock_enabled is False


def test_hive_lock_enabled_table_property_overrides_catalog() -> None:
    """The table-level value is used even when the catalog says the opposite."""
    lock_key = TableProperties.HIVE_LOCK_ENABLED
    # (table value, conflicting catalog value, expected outcome) — checked in the same order as before.
    conflicting_cases = (
        ("true", "false", True),
        ("false", "true", False),
    )
    for table_value, catalog_value, expected in conflicting_cases:
        resolved = HiveCatalog._hive_lock_enabled(
            table_properties={lock_key: table_value},
            catalog_properties={lock_key: catalog_value},
        )
        assert resolved is expected


def test_commit_table_skips_locking_when_table_property_disables_it() -> None:
"""When table property engine.hive.lock-enabled=false, commit_table must not lock/unlock."""
prop = {"uri": HIVE_METASTORE_FAKE_URL}
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
assert catalog._lock_enabled is True
catalog._client = MagicMock()

mock_table = MagicMock()
mock_table.name.return_value = ("default", "my_table")
mock_table.properties = {TableProperties.HIVE_LOCK_ENABLED: "false"}

mock_do_commit = MagicMock()
mock_do_commit.return_value = MagicMock()

def test_lock_enabled_can_be_disabled() -> None:
"""Verify that lock-enabled can be set to false."""
prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"}
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
assert catalog._lock_enabled is False
with patch.object(catalog, "_do_commit", mock_do_commit):
catalog.commit_table(mock_table, requirements=(), updates=())

mock_do_commit.assert_called_once()
catalog._client.__enter__().lock.assert_not_called()
catalog._client.__enter__().check_lock.assert_not_called()
catalog._client.__enter__().unlock.assert_not_called()


def test_commit_table_skips_locking_when_lock_disabled() -> None:
"""When lock-enabled is false, commit_table must not call lock, check_lock, or unlock."""
prop = {"uri": HIVE_METASTORE_FAKE_URL, LOCK_ENABLED: "false"}
def test_commit_table_skips_locking_when_catalog_property_disables_it() -> None:
"""When catalog property engine.hive.lock-enabled=false, commit_table must not lock/unlock."""
prop = {"uri": HIVE_METASTORE_FAKE_URL, TableProperties.HIVE_LOCK_ENABLED: "false"}
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
catalog._client = MagicMock()

mock_table = MagicMock()
mock_table.name.return_value = ("default", "my_table")
mock_table.properties = {}

mock_do_commit = MagicMock()
mock_do_commit.return_value = MagicMock()

with patch.object(catalog, "_do_commit", mock_do_commit):
catalog.commit_table(mock_table, requirements=(), updates=())

# The core commit logic should still be called
mock_do_commit.assert_called_once()

# But no locking operations should have been performed
catalog._client.__enter__().lock.assert_not_called()
catalog._client.__enter__().check_lock.assert_not_called()
catalog._client.__enter__().unlock.assert_not_called()


def test_commit_table_uses_locking_when_lock_enabled() -> None:
"""When lock-enabled is true (default), commit_table must call lock and unlock."""
def test_commit_table_uses_locking_by_default() -> None:
"""When no lock property is set, commit_table must acquire and release a lock."""
lockid = 99999
prop = {"uri": HIVE_METASTORE_FAKE_URL}
catalog = HiveCatalog(HIVE_CATALOG_NAME, **prop)
Expand All @@ -1462,15 +1496,14 @@ def test_commit_table_uses_locking_when_lock_enabled() -> None:

mock_table = MagicMock()
mock_table.name.return_value = ("default", "my_table")
mock_table.properties = {}

mock_do_commit = MagicMock()
mock_do_commit.return_value = MagicMock()

with patch.object(catalog, "_do_commit", mock_do_commit):
catalog.commit_table(mock_table, requirements=(), updates=())

# Locking operations should have been performed
mock_client.lock.assert_called_once()
mock_client.unlock.assert_called_once()
# The core commit logic should still be called
mock_do_commit.assert_called_once()