diff --git a/hazelcast/internal/asyncio_proxy/base.py b/hazelcast/internal/asyncio_proxy/base.py index 136a6b8200..d075ee6d37 100644 --- a/hazelcast/internal/asyncio_proxy/base.py +++ b/hazelcast/internal/asyncio_proxy/base.py @@ -248,3 +248,15 @@ def get_entry_listener_flags(**kwargs): if value: flags |= getattr(EntryEventType, key) return flags + + +def task_id(): + # Builtin id function returns an integer which is guaranteed to be unique among existing objects. + # The returned id is derived from the memory location of the object in CPython (the only Python implementation we officially support). + # See: https://docs.python.org/3/library/functions.html#id + # The address space limit for 64bit systems is 52bits (AMD64) to 56bits (ARM64), so the id fits into a long comfortably: + # See: https://en.wikipedia.org/wiki/64-bit_computing#Limits_of_processors + # Since the task itself is an object, the id can be used as a pseudo-id for the task. + # When the task ends, the id can be assigned to another task, but that's not an issue. + # Since the task id is used to distinguish between running tasks. --YT + return id(asyncio.current_task()) diff --git a/hazelcast/internal/asyncio_proxy/cp.py b/hazelcast/internal/asyncio_proxy/cp.py index 0138ebcbe6..6b7493cfec 100644 --- a/hazelcast/internal/asyncio_proxy/cp.py +++ b/hazelcast/internal/asyncio_proxy/cp.py @@ -8,6 +8,7 @@ CPGroupDestroyedError, ) from hazelcast.internal.asyncio_invocation import Invocation +from hazelcast.internal.asyncio_proxy.base import task_id from hazelcast.protocol import RaftGroupId from hazelcast.protocol.codec import ( cp_group_destroy_cp_object_codec, @@ -115,8 +116,7 @@ async def get_or_create_unique_thread_id(self, group_id): if self._shutdown: raise HazelcastClientNotActiveError("Session manager is already shut down!") - # TODO: replace 0 with the lock context once implemented - key = (group_id, 0) + key = (group_id, task_id()) global_thread_id = self._thread_ids.get(key) if global_thread_id: return global_thread_id diff --git a/hazelcast/internal/asyncio_proxy/map.py b/hazelcast/internal/asyncio_proxy/map.py index 94845135c4..0f5b76c31f 100644 --- a/hazelcast/internal/asyncio_proxy/map.py +++ b/hazelcast/internal/asyncio_proxy/map.py @@ -5,6 +5,7 @@ from hazelcast.aggregator import Aggregator from hazelcast.config import IndexUtil, IndexType, IndexConfig from hazelcast.core import SimpleEntryView +from hazelcast.internal.asyncio_invocation import Invocation from hazelcast.projection import Projection from hazelcast.protocol import PagingPredicateHolder from hazelcast.protocol.codec import ( @@ -65,12 +66,19 @@ map_remove_interceptor_codec, map_remove_all_codec, map_add_near_cache_invalidation_listener_codec, + map_force_unlock_codec, + map_lock_codec, + map_try_lock_codec, + map_is_locked_codec, + map_unlock_codec, ) from hazelcast.internal.asyncio_proxy.base import ( Proxy, EntryEvent, EntryEventType, get_entry_listener_flags, + task_id, + MAX_SIZE, ) from hazelcast.predicate import Predicate, _PagingPredicate from hazelcast.serialization.data import Data @@ -78,7 +86,6 @@ from hazelcast.serialization.compact import SchemaNotReplicatedError from hazelcast.util import ( check_not_none, - thread_id, to_millis, IterationType, deserialize_entry_list_in_place, @@ -334,7 +341,7 @@ async def add_index( >>> employees = await client.get_map("employees") >>> await employees.add_index(attributes=["age"]) # Sorted index for range queries - >>> await employees.add_index(attributes=["active"], index_type=IndexType.HASH)) # Hash index for equality predicates + >>> await employees.add_index(attributes=["active"], index_type=IndexType.HASH) # Hash index for equality predicates Index attribute should either have a getter method or be public. You should also make sure to add the indexes before adding @@ -361,7 +368,7 @@ async def add_index( possible values. - **unique_key_transformation** (int|str): The transformation is applied to every value extracted from the unique key - attribue. Defaults to ``OBJECT``. See the + attribute. Defaults to ``OBJECT``. See the :class:`hazelcast.config.UniqueKeyTransformation` for possible values. """ @@ -526,7 +533,7 @@ async def entry_set( Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Args: predicate: Predicate for the map to filter entries. @@ -612,7 +619,7 @@ async def execute_on_entries( Args: entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must - have a serializable EntryProcessor counter part registered + have a serializable EntryProcessor counterpart registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. predicate: Predicate for filtering the entries. @@ -662,7 +669,7 @@ async def execute_on_key(self, key: KeyType, entry_processor: typing.Any) -> typ key: Specified key for the entry to be processed. entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must - have a serializable EntryProcessor counter part registered on + have a serializable EntryProcessor counterpart registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. @@ -689,7 +696,7 @@ async def execute_on_keys( keys: Collection of the keys for the entries to be processed. entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must - have a serializable EntryProcessor counter part registered on + have a serializable EntryProcessor counterpart registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. @@ -723,6 +730,32 @@ async def flush(self) -> None: request = map_flush_codec.encode_request(self.name) return await self._invoke(request) + async def force_unlock(self, key: KeyType) -> None: + """Releases the lock for the specified key regardless of the lock + owner. + + It always successfully unlocks the key, never blocks, and returns + immediately. + + Warning: + This method uses ``__hash__`` and ``__eq__`` methods of binary form + of the key, not the actual implementations of ``__hash__`` and + ``__eq__`` defined in key's class. + + Args: + key: The key to lock. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.force_unlock, key) + + request = map_force_unlock_codec.encode_request( + self.name, key_data, self._reference_id_generator.get_and_increment() + ) + return await self._invoke_on_key(request, key_data) + async def get(self, key: KeyType) -> typing.Optional[ValueType]: """Returns the value for the specified key, or ``None`` if this map does not contain this key. @@ -760,7 +793,7 @@ async def get_all(self, keys: typing.Sequence[KeyType]) -> typing.Dict[KeyType, Warning: The returned map is NOT backed by the original map, so changes to the original map are NOT reflected in the returned map, and - vice-versa. + vice versa. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form @@ -827,7 +860,7 @@ def handler(message): entry_view.value = self._to_object(entry_view.value) return entry_view - request = map_get_entry_view_codec.encode_request(self.name, key_data, thread_id()) + request = map_get_entry_view_codec.encode_request(self.name, key_data, task_id()) return await self._invoke_on_key(request, key_data, handler) async def is_empty(self) -> bool: @@ -840,13 +873,36 @@ async def is_empty(self) -> bool: request = map_is_empty_codec.encode_request(self.name) return await self._invoke(request, map_is_empty_codec.decode_response) + async def is_locked(self, key: KeyType) -> bool: + """Checks the lock for the specified key. + + Warning: + This method uses ``__hash__`` and ``__eq__`` methods of binary form + of the key, not the actual implementations of ``__hash__`` and + ``__eq__`` defined in key's class. + + Args: + key: The key that is checked for lock + + Returns: + ``True`` if lock is acquired, ``False`` otherwise. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.is_locked, key) + + request = map_is_locked_codec.encode_request(self.name, key_data) + return await self._invoke_on_key(request, key_data, map_is_locked_codec.decode_response) + async def key_set(self, predicate: Predicate | None = None) -> typing.List[ValueType]: """Returns a List clone of the keys contained in this map or the keys of the entries filtered with the predicate if provided. Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Args: predicate: Predicate to filter the entries. @@ -918,6 +974,53 @@ async def load_all( request = map_load_all_codec.encode_request(self.name, replace_existing_values) return await self._invoke(request) + async def lock(self, key: KeyType, lease_time: float = None) -> None: + """Acquires the lock for the specified key infinitely or for the + specified lease time if provided. + + If the lock is not available, the current task becomes disabled for + scheduling purposes and lies dormant until the lock has been acquired. + + You get a lock whether the value is present in the map or not. Other + tasks (possibly on other systems) would block on their invoke of + lock() until the non-existent key is unlocked. If the lock holder + introduces the key to the map, the put() operation is not blocked. If + a task not holding a lock on the non-existent key tries to introduce + the key while a lock exists on the non-existent key, the put() + operation blocks until it is unlocked. + + Scope of the lock is this map only. Acquired lock is only for the key + in this map. + + Locks are re-entrant; so, if the key is locked N times, it should be + unlocked N times before another thread can acquire it. + + Warning: + This method uses ``__hash__`` and ``__eq__`` methods of binary form + of the key, not the actual implementations of ``__hash__`` and + ``__eq__`` defined in key's class. + + Args: + key: The key to lock. + lease_time: Time in seconds to wait before releasing the lock. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.lock, key, lease_time) + + request = map_lock_codec.encode_request( + self.name, + key_data, + task_id(), + to_millis(lease_time), + self._reference_id_generator.get_and_increment(), + ) + partition_id = self._context.partition_service.get_partition_id(key_data) + invocation = Invocation(request, partition_id=partition_id, timeout=MAX_SIZE) + return await self._invocation_service.ainvoke(invocation) + async def project( self, projection: Projection[ProjectionType], predicate: Predicate = None ) -> ProjectionType: @@ -1374,6 +1477,54 @@ async def size(self) -> int: request = map_size_codec.encode_request(self.name) return await self._invoke(request, map_size_codec.decode_response) + async def try_lock(self, key: KeyType, lease_time: float = None, timeout: float = 0) -> bool: + """Tries to acquire the lock for the specified key. + + When the lock is not available: + + - If the timeout is not provided, the current task doesn't wait and + returns ``False`` immediately. + - If the timeout is provided, the current task becomes disabled for + thread scheduling purposes and lies dormant until one of the + followings happens: + + - The lock is acquired by the current task, or + - The specified waiting time elapses. + + If the lease time is provided, lock will be released after this time + elapses. + + Args: + key: Key to lock in this map. + lease_time: Time in seconds to wait before releasing the lock. + timeout: Maximum time in seconds to wait for the lock. + + Returns: + ``True`` if the lock was acquired, ``False`` otherwise. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.try_lock, key, lease_time, timeout) + + request = map_try_lock_codec.encode_request( + self.name, + key_data, + task_id(), + to_millis(lease_time), + to_millis(timeout), + self._reference_id_generator.get_and_increment(), + ) + partition_id = self._context.partition_service.get_partition_id(key_data) + invocation = Invocation( + request, + partition_id=partition_id, + timeout=MAX_SIZE, + response_handler=map_try_lock_codec.decode_response, + ) + return await self._invocation_service.ainvoke(invocation) + async def try_put(self, key: KeyType, value: ValueType, timeout: float = 0) -> bool: """Tries to put the given key and value into this map and returns immediately if timeout is not provided. @@ -1419,13 +1570,34 @@ async def try_remove(self, key: KeyType, timeout: float = 0) -> bool: return await self._send_schema_and_retry(e, self.try_remove, key, timeout) return await self._try_remove_internal(key_data, timeout) + async def unlock(self, key: KeyType) -> None: + """Releases the lock for the specified key. + + It never blocks and returns immediately. If the current task is the + holder of this lock, then the hold count is decremented. If the hold + count is zero, then the lock is released. + + Args: + key: The key to lock. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.unlock, key) + + request = map_unlock_codec.encode_request( + self.name, key_data, task_id(), self._reference_id_generator.get_and_increment() + ) + return await self._invoke_on_key(request, key_data) + async def values(self, predicate: Predicate = None) -> typing.List[ValueType]: """Returns a list clone of the values contained in this map or values of the entries which are filtered with the predicate if provided. Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Args: predicate: Predicate to filter the entries. @@ -1473,14 +1645,14 @@ def handler(message): return await self._invoke(request, handler) def _contains_key_internal(self, key_data): - request = map_contains_key_codec.encode_request(self.name, key_data, thread_id()) + request = map_contains_key_codec.encode_request(self.name, key_data, task_id()) return self._invoke_on_key(request, key_data, map_contains_key_codec.decode_response) def _get_internal(self, key_data): def handler(message): return self._to_object(map_get_codec.decode_response(message)) - request = map_get_codec.encode_request(self.name, key_data, thread_id()) + request = map_get_codec.encode_request(self.name, key_data, task_id()) return self._invoke_on_key(request, key_data, handler) async def _get_all_internal(self, partition_to_keys, tasks=None): @@ -1501,7 +1673,7 @@ def _remove_internal(self, key_data): def handler(message): return self._to_object(map_remove_codec.decode_response(message)) - request = map_remove_codec.encode_request(self.name, key_data, thread_id()) + request = map_remove_codec.encode_request(self.name, key_data, task_id()) return self._invoke_on_key(request, key_data, handler) def _remove_all_internal(self, predicate_data): @@ -1510,14 +1682,14 @@ def _remove_all_internal(self, predicate_data): def _remove_if_same_internal_(self, key_data, value_data): request = map_remove_if_same_codec.encode_request( - self.name, key_data, value_data, thread_id() + self.name, key_data, value_data, task_id() ) return self._invoke_on_key( request, key_data, response_handler=map_remove_if_same_codec.decode_response ) def _delete_internal(self, key_data): - request = map_delete_codec.encode_request(self.name, key_data, thread_id()) + request = map_delete_codec.encode_request(self.name, key_data, task_id()) return self._invoke_on_key(request, key_data) async def _put_internal(self, key_data, value_data, ttl, max_idle): @@ -1526,22 +1698,22 @@ def handler(message): if max_idle is not None: request = map_put_with_max_idle_codec.encode_request( - self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) + self.name, key_data, value_data, task_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_put_codec.encode_request( - self.name, key_data, value_data, thread_id(), to_millis(ttl) + self.name, key_data, value_data, task_id(), to_millis(ttl) ) return await self._invoke_on_key(request, key_data, handler) def _set_internal(self, key_data, value_data, ttl, max_idle): if max_idle is not None: request = map_set_with_max_idle_codec.encode_request( - self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) + self.name, key_data, value_data, task_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_set_codec.encode_request( - self.name, key_data, value_data, thread_id(), to_millis(ttl) + self.name, key_data, value_data, task_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data) @@ -1551,24 +1723,24 @@ def _set_ttl_internal(self, key_data, ttl): def _try_remove_internal(self, key_data, timeout): request = map_try_remove_codec.encode_request( - self.name, key_data, thread_id(), to_millis(timeout) + self.name, key_data, task_id(), to_millis(timeout) ) return self._invoke_on_key(request, key_data, map_try_remove_codec.decode_response) def _try_put_internal(self, key_data, value_data, timeout): request = map_try_put_codec.encode_request( - self.name, key_data, value_data, thread_id(), to_millis(timeout) + self.name, key_data, value_data, task_id(), to_millis(timeout) ) return self._invoke_on_key(request, key_data, map_try_put_codec.decode_response) def _put_transient_internal(self, key_data, value_data, ttl, max_idle): if max_idle is not None: request = map_put_transient_with_max_idle_codec.encode_request( - self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) + self.name, key_data, value_data, task_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_put_transient_codec.encode_request( - self.name, key_data, value_data, thread_id(), to_millis(ttl) + self.name, key_data, value_data, task_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data) @@ -1578,17 +1750,17 @@ def handler(message): if max_idle is not None: request = map_put_if_absent_with_max_idle_codec.encode_request( - self.name, key_data, value_data, thread_id(), to_millis(ttl), to_millis(max_idle) + self.name, key_data, value_data, task_id(), to_millis(ttl), to_millis(max_idle) ) else: request = map_put_if_absent_codec.encode_request( - self.name, key_data, value_data, thread_id(), to_millis(ttl) + self.name, key_data, value_data, task_id(), to_millis(ttl) ) return self._invoke_on_key(request, key_data, handler) def _replace_if_same_internal(self, key_data, old_value_data, new_value_data): request = map_replace_if_same_codec.encode_request( - self.name, key_data, old_value_data, new_value_data, thread_id() + self.name, key_data, old_value_data, new_value_data, task_id() ) return self._invoke_on_key(request, key_data, map_replace_if_same_codec.decode_response) @@ -1596,11 +1768,11 @@ def _replace_internal(self, key_data, value_data): def handler(message): return self._to_object(map_replace_codec.decode_response(message)) - request = map_replace_codec.encode_request(self.name, key_data, value_data, thread_id()) + request = map_replace_codec.encode_request(self.name, key_data, value_data, task_id()) return self._invoke_on_key(request, key_data, handler) def _evict_internal(self, key_data): - request = map_evict_codec.encode_request(self.name, key_data, thread_id()) + request = map_evict_codec.encode_request(self.name, key_data, task_id()) return self._invoke_on_key(request, key_data, map_evict_codec.decode_response) def _load_all_internal(self, key_data_list, replace_existing_values): @@ -1614,7 +1786,7 @@ def handler(message): return self._to_object(map_execute_on_key_codec.decode_response(message)) request = map_execute_on_key_codec.encode_request( - self.name, entry_processor_data, key_data, thread_id() + self.name, entry_processor_data, key_data, task_id() ) return self._invoke_on_key(request, key_data, handler) diff --git a/hazelcast/internal/asyncio_proxy/multi_map.py b/hazelcast/internal/asyncio_proxy/multi_map.py index 1895b46dcc..546e053df4 100644 --- a/hazelcast/internal/asyncio_proxy/multi_map.py +++ b/hazelcast/internal/asyncio_proxy/multi_map.py @@ -21,8 +21,13 @@ multi_map_size_codec, multi_map_value_count_codec, multi_map_values_codec, + multi_map_is_locked_codec, + multi_map_force_unlock_codec, + multi_map_lock_codec, + multi_map_try_lock_codec, + multi_map_unlock_codec, ) -from hazelcast.internal.asyncio_proxy.base import Proxy, EntryEvent, EntryEventType +from hazelcast.internal.asyncio_proxy.base import Proxy, EntryEvent, EntryEventType, task_id from hazelcast.types import ValueType, KeyType from hazelcast.serialization.data import Data from hazelcast.serialization.compact import SchemaNotReplicatedError @@ -30,12 +35,11 @@ check_not_none, deserialize_list_in_place, deserialize_entry_list_in_place, + to_millis, ) EntryEventCallable = typing.Callable[[EntryEvent[KeyType, ValueType]], None] -default_thread_id = 0 - class MultiMap(Proxy, typing.Generic[KeyType, ValueType]): """A specialized map whose keys can be associated with multiple values. @@ -46,11 +50,12 @@ class MultiMap(Proxy, typing.Generic[KeyType, ValueType]): >>> print("get", await my_map.get("key")) Warning: - Asyncio client multi map proxy is not thread-safe, do not access it from other threads. + Asyncio client multimap proxy is not thread-safe, do not access it from other threads. """ def __init__(self, service_name, name, context): super(MultiMap, self).__init__(service_name, name, context) + self._reference_id_generator = context.lock_reference_id_generator async def add_entry_listener( self, @@ -158,9 +163,7 @@ async def contains_key(self, key: KeyType) -> bool: except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.contains_key, key) - request = multi_map_contains_key_codec.encode_request( - self.name, key_data, default_thread_id - ) + request = multi_map_contains_key_codec.encode_request(self.name, key_data, task_id()) return await self._invoke_on_key( request, key_data, multi_map_contains_key_codec.decode_response ) @@ -205,7 +208,7 @@ async def contains_entry(self, key: KeyType, value: ValueType) -> bool: return await self._send_schema_and_retry(e, self.contains_entry, key, value) request = multi_map_contains_entry_codec.encode_request( - self.name, key_data, value_data, default_thread_id + self.name, key_data, value_data, task_id() ) return await self._invoke_on_key( request, key_data, multi_map_contains_entry_codec.decode_response @@ -221,7 +224,7 @@ async def entry_set(self) -> typing.List[typing.Tuple[KeyType, ValueType]]: Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Returns: The list of key-value tuples in the multimap. @@ -245,7 +248,7 @@ async def get(self, key: KeyType) -> typing.Optional[typing.List[ValueType]]: Warning: The list is NOT backed by the multimap, so changes to the map are - not reflected in the collection, and vice-versa. + not reflected in the collection, and vice versa. Args: key: The specified key. @@ -263,15 +266,66 @@ def handler(message): data_list = multi_map_get_codec.decode_response(message) return deserialize_list_in_place(data_list, self._to_object) - request = multi_map_get_codec.encode_request(self.name, key_data, default_thread_id) + request = multi_map_get_codec.encode_request(self.name, key_data, task_id()) return await self._invoke_on_key(request, key_data, handler) + async def is_locked(self, key: KeyType) -> bool: + """Checks the lock for the specified key. + + Warning: + This method uses ``__hash__`` and ``__eq__`` methods of binary form + of the key, not the actual implementations of ``__hash__`` and + ``__eq__`` defined in key's class. + + Args: + key: The key that is checked for lock. + + Returns: + ``True`` if lock is acquired, ``False`` otherwise. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.is_locked, key) + + request = multi_map_is_locked_codec.encode_request(self.name, key_data) + return await self._invoke_on_key( + request, key_data, multi_map_is_locked_codec.decode_response + ) + + async def force_unlock(self, key: KeyType) -> None: + """Releases the lock for the specified key regardless of the lock + owner. + + It always successfully unlocks the key, never blocks, and returns + immediately. + + Warning: + This method uses ``__hash__`` and ``__eq__`` methods of binary form + of the key, not the actual implementations of ``__hash__`` and + ``__eq__`` defined in key's class. + + Args: + key: The key to lock. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.force_unlock, key) + + request = multi_map_force_unlock_codec.encode_request( + self.name, key_data, self._reference_id_generator.get_and_increment() + ) + return await self._invoke_on_key(request, key_data) + async def key_set(self) -> typing.List[KeyType]: """Returns the list of keys in the multimap. Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Returns: A list of the clone of the keys. @@ -284,6 +338,44 @@ def handler(message): request = multi_map_key_set_codec.encode_request(self.name) return await self._invoke(request, handler) + async def lock(self, key: KeyType, lease_time: float = None) -> None: + """Acquires the lock for the specified key infinitely or for the + specified lease time if provided. + + If the lock is not available, the current task becomes disabled for + scheduling purposes and lies dormant until the lock has been + acquired. + + Scope of the lock is this map only. Acquired lock is only for the key + in this map. + + Locks are re-entrant; so, if the key is locked N times, it should be + unlocked N times before another thread can acquire it. + + Warning: + This method uses ``__hash__`` and ``__eq__`` methods of binary form + of the key, not the actual implementations of ``__hash__`` and + ``__eq__`` defined in key's class. + + Args: + key: The key to lock. + lease_time: Time in seconds to wait before releasing the lock. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.lock, key, lease_time) + + request = multi_map_lock_codec.encode_request( + self.name, + key_data, + task_id(), + to_millis(lease_time), + self._reference_id_generator.get_and_increment(), + ) + return await self._invoke_on_key(request, key_data) + async def remove(self, key: KeyType, value: ValueType) -> bool: """Removes the given key-value tuple from the multimap. @@ -309,7 +401,7 @@ async def remove(self, key: KeyType, value: ValueType) -> bool: return await self._send_schema_and_retry(e, self.remove, key, value) request = multi_map_remove_entry_codec.encode_request( - self.name, key_data, value_data, default_thread_id + self.name, key_data, value_data, task_id() ) return await self._invoke_on_key( request, key_data, multi_map_remove_entry_codec.decode_response @@ -326,7 +418,7 @@ async def remove_all(self, key: KeyType) -> typing.List[ValueType]: Warning: The returned list is NOT backed by the map, so changes to the map - are NOT reflected in the list, and vice-versa. + are NOT reflected in the list, and vice versa. Args: key: The key of the entries to remove. @@ -345,7 +437,7 @@ def handler(message): except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.remove_all, key) - request = multi_map_remove_codec.encode_request(self.name, key_data, default_thread_id) + request = multi_map_remove_codec.encode_request(self.name, key_data, task_id()) return await self._invoke_on_key(request, key_data, handler) async def put(self, key: KeyType, value: ValueType) -> bool: @@ -372,9 +464,7 @@ async def put(self, key: KeyType, value: ValueType) -> bool: except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.put, key, value) - request = multi_map_put_codec.encode_request( - self.name, key_data, value_data, default_thread_id - ) + request = multi_map_put_codec.encode_request(self.name, key_data, value_data, task_id()) return await self._invoke_on_key(request, key_data, multi_map_put_codec.decode_response) async def put_all(self, multimap: typing.Dict[KeyType, typing.Sequence[ValueType]]) -> None: @@ -464,7 +554,7 @@ async def value_count(self, key: KeyType) -> int: except SchemaNotReplicatedError as e: return await self._send_schema_and_retry(e, self.value_count, key) - request = multi_map_value_count_codec.encode_request(self.name, key_data, default_thread_id) + request = multi_map_value_count_codec.encode_request(self.name, key_data, task_id()) return await self._invoke_on_key( request, key_data, multi_map_value_count_codec.decode_response ) @@ -474,7 +564,7 @@ async def values(self) -> typing.List[ValueType]: Warning: The returned list is NOT backed by the map, so changes to the map - are NOT reflected in the list, and vice-versa. + are NOT reflected in the list, and vice versa. Returns: The list of values in the multimap. @@ -487,6 +577,72 @@ def handler(message): request = multi_map_values_codec.encode_request(self.name) return await self._invoke(request, handler) + async def try_lock(self, key: KeyType, lease_time: float = None, timeout: float = 0) -> bool: + """Tries to acquire the lock for the specified key. + + When the lock is not available: + + - If the timeout is not provided, the current task doesn't wait and + returns ``False`` immediately. + - If the timeout is provided, the current task becomes disabled for + scheduling purposes and lies dormant until one of the + followings happens: + + - The lock is acquired by the current task, or + - The specified waiting time elapses. + + If the lease time is provided, lock will be released after this time + elapses. + + Args: + key: Key to lock in this map. + lease_time: Time in seconds to wait before releasing the lock. + timeout: Maximum time in seconds to wait for the lock. + + Returns: + ``True`` if the lock was acquired, ``False`` otherwise. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.try_lock, key, lease_time, timeout) + + request = multi_map_try_lock_codec.encode_request( + self.name, + key_data, + task_id(), + to_millis(lease_time), + to_millis(timeout), + self._reference_id_generator.get_and_increment(), + ) + return await self._invoke_on_key( + request, key_data, multi_map_try_lock_codec.decode_response + ) + + async def unlock(self, key: KeyType) -> None: + """Releases the lock for the specified key. It never blocks and + returns immediately. + + Warning: + This method uses ``__hash__`` and ``__eq__`` methods of binary form + of the key, not the actual implementations of ``__hash__`` and + ``__eq__`` defined in key's class. + + Args: + key: The key to lock. + """ + check_not_none(key, "key can't be None") + try: + key_data = self._to_data(key) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.unlock, key) + + request = multi_map_unlock_codec.encode_request( + self.name, key_data, task_id(), self._reference_id_generator.get_and_increment() + ) + return await self._invoke_on_key(request, key_data) + async def create_multi_map_proxy(service_name, name, context): return MultiMap(service_name, name, context) diff --git a/hazelcast/internal/asyncio_proxy/semaphore.py b/hazelcast/internal/asyncio_proxy/semaphore.py index 6d46a41f41..d6dcd58ea3 100644 --- a/hazelcast/internal/asyncio_proxy/semaphore.py +++ b/hazelcast/internal/asyncio_proxy/semaphore.py @@ -2,6 +2,7 @@ import uuid from hazelcast.errors import IllegalStateError, SessionExpiredError, WaitKeyCancelledError +from hazelcast.internal.asyncio_proxy.base import task_id from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy, SessionAwareCPProxy, _NO_SESSION_ID from hazelcast.protocol.codec import ( semaphore_init_codec, @@ -273,16 +274,12 @@ async def _do_change_permits(self, permits): class SessionAwareSemaphore(Semaphore, SessionAwareCPProxy): async def acquire(self, permits=1): check_true(permits > 0, "Permits must be positive") - # TODO: replace 0 with the lock context once implemented: - current_thread_id = 0 invocation_uuid = uuid.uuid4() - await self._do_acquire(current_thread_id, invocation_uuid, permits) + await self._do_acquire(task_id(), invocation_uuid, permits) async def drain_permits(self): - # TODO: replace 0 with the lock context once implemented: - current_thread_id = 0 invocation_uuid = uuid.uuid4() - return await self._do_drain(current_thread_id, invocation_uuid) + return await self._do_drain(task_id(), invocation_uuid) async def release(self, permits=1): check_true(permits > 0, "Permits must be positive") @@ -290,12 +287,10 @@ async def release(self, permits=1): if session_id == _NO_SESSION_ID: raise self._new_illegal_state_error() - # TODO: replace 0 with the lock context once implemented: - current_thread_id = 0 invocation_uuid = uuid.uuid4() try: - await self._request_release(session_id, current_thread_id, invocation_uuid, permits) + await self._request_release(session_id, task_id(), invocation_uuid, permits) except SessionExpiredError as e: self._invalidate_session(session_id) raise self._new_illegal_state_error(e) @@ -305,10 +300,8 @@ async def release(self, permits=1): async def try_acquire(self, permits=1, timeout=0): check_true(permits > 0, "Permits must be positive") timeout = max(0.0, timeout) - # TODO: replace 0 with the lock context once implemented: - current_thread_id = 0 invocation_uuid = uuid.uuid4() - return await self._do_try_acquire(current_thread_id, invocation_uuid, permits, timeout) + return await self._do_try_acquire(task_id(), invocation_uuid, permits, timeout) async def _do_acquire(self, current_thread_id, invocation_uuid, permits): async def do_acquire_once(session_id): @@ -350,13 +343,11 @@ async def do_drain_once(session_id): return await do_drain_once(session_id) async def _do_change_permits(self, delta): - # TODO: replace 0 with the lock context once implemented: - current_thread_id = 0 invocation_uuid = uuid.uuid4() async def do_change_permits_once(session_id): try: - await self._request_change(session_id, current_thread_id, invocation_uuid, delta) + await self._request_change(session_id, task_id(), invocation_uuid, delta) except SessionExpiredError as e: self._invalidate_session(session_id) raise self._new_illegal_state_error(e) diff --git a/hazelcast/proxy/map.py b/hazelcast/proxy/map.py index 5c90fe0ffe..fa1aad8dc3 100644 --- a/hazelcast/proxy/map.py +++ b/hazelcast/proxy/map.py @@ -348,7 +348,7 @@ def add_index( >>> employees = client.get_map("employees") >>> employees.add_index(attributes=["age"]) # Sorted index for range queries - >>> employees.add_index(attributes=["active"], index_type=IndexType.HASH)) # Hash index for equality predicates + >>> employees.add_index(attributes=["active"], index_type=IndexType.HASH) # Hash index for equality predicates Index attribute should either have a getter method or be public. You should also make sure to add the indexes before adding @@ -375,7 +375,7 @@ def add_index( possible values. - **unique_key_transformation** (int|str): The transformation is applied to every value extracted from the unique key - attribue. Defaults to ``OBJECT``. See the + attribute. Defaults to ``OBJECT``. See the :class:`hazelcast.config.UniqueKeyTransformation` for possible values. """ @@ -543,7 +543,7 @@ def entry_set( Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Args: predicate: Predicate for the map to filter entries. @@ -629,7 +629,7 @@ def execute_on_entries( Args: entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must - have a serializable EntryProcessor counter part registered + have a serializable EntryProcessor counterpart registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. predicate: Predicate for filtering the entries. @@ -679,7 +679,7 @@ def execute_on_key(self, key: KeyType, entry_processor: typing.Any) -> Future[ty key: Specified key for the entry to be processed. entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must - have a serializable EntryProcessor counter part registered on + have a serializable EntryProcessor counterpart registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. @@ -706,7 +706,7 @@ def execute_on_keys( keys: Collection of the keys for the entries to be processed. entry_processor: A stateful serializable object which represents the EntryProcessor defined on server side. This object must - have a serializable EntryProcessor counter part registered on + have a serializable EntryProcessor counterpart registered on server side with the actual ``com.hazelcast.map.EntryProcessor`` implementation. @@ -805,7 +805,7 @@ def get_all(self, keys: typing.Sequence[KeyType]) -> Future[typing.Dict[KeyType, Warning: The returned map is NOT backed by the original map, so changes to the original map are NOT reflected in the returned map, and - vice-versa. + vice versa. Warning: This method uses ``__hash__`` and ``__eq__`` methods of binary form @@ -918,7 +918,7 @@ def key_set(self, predicate: Predicate = None) -> Future[typing.List[ValueType]] Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Args: predicate: Predicate to filter the entries. @@ -1624,7 +1624,7 @@ def values(self, predicate: Predicate = None) -> Future[typing.List[ValueType]]: Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Args: predicate: Predicate to filter the entries. diff --git a/hazelcast/proxy/multi_map.py b/hazelcast/proxy/multi_map.py index afb3fa4448..267e56f46f 100644 --- a/hazelcast/proxy/multi_map.py +++ b/hazelcast/proxy/multi_map.py @@ -214,7 +214,7 @@ def entry_set(self) -> Future[typing.List[typing.Tuple[KeyType, ValueType]]]: Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Returns: The list of key-value tuples in the multimap. @@ -238,7 +238,7 @@ def get(self, key: KeyType) -> Future[typing.Optional[typing.List[ValueType]]]: Warning: The list is NOT backed by the multimap, so changes to the map are - list reflected in the collection, and vice-versa. + list reflected in the collection, and vice versa. Args: key: The specified key. @@ -313,7 +313,7 @@ def key_set(self) -> Future[typing.List[KeyType]]: Warning: The list is NOT backed by the map, so changes to the map are NOT - reflected in the list, and vice-versa. + reflected in the list, and vice versa. Returns: A list of the clone of the keys. @@ -404,7 +404,7 @@ def remove_all(self, key: KeyType) -> Future[typing.List[ValueType]]: Warning: The returned list is NOT backed by the map, so changes to the map - are NOT reflected in the list, and vice-versa. + are NOT reflected in the list, and vice versa. Args: key: The key of the entries to remove. @@ -548,7 +548,7 @@ def values(self) -> Future[typing.List[ValueType]]: Warning: The returned list is NOT backed by the map, so changes to the map - are NOT reflected in the list, and vice-versa. + are NOT reflected in the list, and vice versa. Returns: The list of values in the multimap. diff --git a/tests/integration/asyncio/proxy/map_test.py b/tests/integration/asyncio/proxy/map_test.py index b84e2187f8..ba3b8ca811 100644 --- a/tests/integration/asyncio/proxy/map_test.py +++ b/tests/integration/asyncio/proxy/map_test.py @@ -3,6 +3,7 @@ import time import unittest +from hazelcast.errors import HazelcastError try: from hazelcast.aggregator import ( @@ -337,6 +338,18 @@ async def test_flush(self): await self.fill_map() await self.map.flush() + async def test_force_unlock(self): + await self.map.put("key", "value") + await self.map.lock("key") + + task = asyncio.create_task(self.map.force_unlock("key")) + + async def check_map_is_not_locked(): + self.assertFalse(await self.map.is_locked("key")) + + await self.assertTrueEventually(check_map_is_not_locked) + await task + async def test_get_all(self): expected = await self.fill_map(1000) actual = await self.map.get_all(list(expected.keys())) @@ -375,6 +388,14 @@ async def test_is_empty(self): await self.map.clear() self.assertTrue(await self.map.is_empty()) + async def test_is_locked(self): + await self.map.put("key", "value") + self.assertFalse(await self.map.is_locked("key")) + await self.map.lock("key") + self.assertTrue(await self.map.is_locked("key")) + await self.map.unlock("key") + self.assertFalse(await self.map.is_locked("key")) + async def test_key_set(self): keys = list((await self.fill_map()).keys()) self.assertCountEqual(await self.map.key_set(), keys) @@ -383,6 +404,11 @@ async def test_key_set_with_predicate(self): await self.fill_map() self.assertEqual(await self.map.key_set(sql("this == 'value-1'")), ["key-1"]) + async def test_lock(self): + await self.map.put("key", "value") + await asyncio.create_task(self.map.lock("key")) + self.assertFalse(await self.map.try_put("key", "new_value", timeout=0.01)) + async def test_put_all(self): m = {"key-%d" % x: "value-%d" % x for x in range(0, 1000)} await self.map.put_all(m) @@ -518,6 +544,46 @@ async def test_size(self): self.assertEqual(10, await self.map.size()) + async def test_try_lock_when_unlocked(self): + self.assertTrue(await self.map.try_lock("key")) + self.assertTrue(await self.map.is_locked("key")) + + async def test_try_lock_when_locked(self): + await asyncio.create_task(self.map.lock("key")) + self.assertFalse(await self.map.try_lock("key", timeout=0.1)) + + async def test_try_put_when_unlocked(self): + self.assertTrue(await self.map.try_put("key", "value")) + self.assertEqual(await self.map.get("key"), "value") + + async def test_try_put_when_locked(self): + await asyncio.create_task(self.map.lock("key")) + self.assertFalse(await self.map.try_put("key", "value", timeout=0.1)) + + async def test_try_remove_when_unlocked(self): + await self.map.put("key", "value") + self.assertTrue(await self.map.try_remove("key")) + self.assertIsNone(await self.map.get("key")) + + async def test_try_remove_when_locked(self): + await self.map.put("key", "value") + await asyncio.create_task(self.map.lock("key")) + self.assertFalse(await self.map.try_remove("key", timeout=0.1)) + + async def test_unlock(self): + await self.map.lock("key") + self.assertTrue(await self.map.is_locked("key")) + await self.map.unlock("key") + self.assertFalse(await self.map.is_locked("key")) + + async def test_unlock_when_no_lock(self): + try: + await self.map.unlock("key") + except HazelcastError: + pass + else: + self.fail("expected HazelcastError to be raised") + async def test_values(self): values = list((await self.fill_map()).values()) diff --git a/tests/integration/asyncio/proxy/multi_map_test.py b/tests/integration/asyncio/proxy/multi_map_test.py index d9350e2885..b37724cd8f 100644 --- a/tests/integration/asyncio/proxy/multi_map_test.py +++ b/tests/integration/asyncio/proxy/multi_map_test.py @@ -1,6 +1,7 @@ import asyncio import itertools +from hazelcast.errors import HazelcastError from hazelcast.internal.asyncio_proxy.base import EntryEventType from tests.integration.asyncio.base import SingleMemberTestCase from tests.util import ( @@ -114,10 +115,33 @@ async def test_entry_set(self): self.assertCountEqual(await self.multi_map.entry_set(), entry_list) + async def test_force_unlock(self): + await self.multi_map.put("key", "value") + await self.multi_map.lock("key") + + async def check_locked(): + self.assertFalse(await self.multi_map.is_locked("key")) + + asyncio.create_task(self.multi_map.force_unlock("key")) + await self.assertTrueEventually(check_locked) + + async def test_is_locked(self): + await self.multi_map.put("key", "value") + self.assertFalse(await self.multi_map.is_locked("key")) + await self.multi_map.lock("key") + self.assertTrue(await self.multi_map.is_locked("key")) + await self.multi_map.unlock("key") + self.assertFalse(await self.multi_map.is_locked("key")) + async def test_key_set(self): keys = list((await self.fill_map()).keys()) self.assertCountEqual(await self.multi_map.key_set(), keys) + async def test_lock(self): + await self.multi_map.put("key", "value") + await asyncio.create_task(self.multi_map.lock("key")) + self.assertFalse(await self.multi_map.try_lock("key", timeout=0.01)) + async def test_put_get(self): self.assertTrue(await self.multi_map.put("key", "value1")) self.assertTrue(await self.multi_map.put("key", "value2")) @@ -161,9 +185,30 @@ async def test_remove_entry_listener(self): async def test_size(self): await self.fill_map(5) - self.assertEqual(25, await self.multi_map.size()) + async def test_try_lock_when_unlocked(self): + self.assertTrue(await self.multi_map.try_lock("key")) + self.assertTrue(await self.multi_map.is_locked("key")) + + async def test_try_lock_when_locked(self): + await asyncio.create_task(self.multi_map.lock("key")) + self.assertFalse(await self.multi_map.try_lock("key", timeout=0.1)) + + async def test_unlock(self): + await self.multi_map.lock("key") + self.assertTrue(await self.multi_map.is_locked("key")) + await self.multi_map.unlock("key") + self.assertFalse(await self.multi_map.is_locked("key")) + + async def test_unlock_when_no_lock(self): + try: + await self.multi_map.unlock("key") + except HazelcastError: + pass + else: + self.fail("expected HazelcastError to be raised") + async def test_value_count(self): await self.fill_map(key_count=1, value_count=10) diff --git a/tests/integration/asyncio/proxy/semaphore_test.py b/tests/integration/asyncio/proxy/semaphore_test.py index 5d2e36c938..f5533f3751 100644 --- a/tests/integration/asyncio/proxy/semaphore_test.py +++ b/tests/integration/asyncio/proxy/semaphore_test.py @@ -21,7 +21,7 @@ async def asyncSetUp(self): async def asyncTearDown(self): if self.semaphore: - self.semaphore.destroy() + await self.semaphore.destroy() await super().asyncTearDown() async def test_semaphore_in_another_group(self): @@ -117,8 +117,58 @@ async def test_acquire_when_not_enough_permits(self): else: self.fail("expected DistributedObjectDestroyedError to be raised") - # TODO: Implement test_acquire_blocks_until_someone_releases after lock context is implemented - # TODO: test_acquire_blocks_until_semaphore_is_destroyed after lock context is implemented + async def test_acquire_blocks_until_someone_releases(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 1) + event = asyncio.Event() + event2 = asyncio.Event() + + async def run(): + await semaphore.acquire(1) + event.set() + await event2.wait() + await asyncio.sleep(1) + await semaphore.release() + + t = asyncio.create_task(run()) + await event.wait() + start = get_current_timestamp() + f = semaphore.acquire() + event2.set() + await f + self.assertGreaterEqual(get_current_timestamp() - start, 1) + await t + + async def test_acquire_blocks_until_semaphore_is_destroyed(self): + for semaphore_type in SEMAPHORE_TYPES: + with self.subTest(semaphore_type, semaphore_type=semaphore_type): + semaphore = await self.get_semaphore(semaphore_type, 1) + event = asyncio.Event() + event2 = asyncio.Event() + + async def run(): + await semaphore.acquire(1) + event.set() + await event2.wait() + await asyncio.sleep(1) + await semaphore.destroy() + + t = asyncio.create_task(run()) + await event.wait() + start = get_current_timestamp() + f = semaphore.acquire() + event2.set() + + try: + await f + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + self.assertGreaterEqual(get_current_timestamp() - start, 1) + await t async def test_available_permits(self): for semaphore_type in SEMAPHORE_TYPES: