Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions hazelcast/internal/asyncio_proxy/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,3 +248,15 @@ def get_entry_listener_flags(**kwargs):
if value:
flags |= getattr(EntryEventType, key)
return flags


def task_id():
# Builtin id function returns an integer which is guaranteed to be unique among existing objects.
# The returned id is derived from the memory location of the object in CPython (the only Python implementation we officially support).
# See: https://docs.python.org/3/library/functions.html#id
# The address space limit for 64bit systems is 52bits (AMD64) to 56bits (ARM64), so the id fits into a long comfortably:
# See: https://en.wikipedia.org/wiki/64-bit_computing#Limits_of_processors
# Since the task itself is an object, the id can be used as a pseudo-id for the task.
# When the task ends, the id can be assigned to another task, but that's not an issue.
# Since the task id is used to distinguish between running tasks. --YT
return id(asyncio.current_task())
4 changes: 2 additions & 2 deletions hazelcast/internal/asyncio_proxy/cp.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
CPGroupDestroyedError,
)
from hazelcast.internal.asyncio_invocation import Invocation
from hazelcast.internal.asyncio_proxy.base import task_id
from hazelcast.protocol import RaftGroupId
from hazelcast.protocol.codec import (
cp_group_destroy_cp_object_codec,
Expand Down Expand Up @@ -115,8 +116,7 @@ async def get_or_create_unique_thread_id(self, group_id):
if self._shutdown:
raise HazelcastClientNotActiveError("Session manager is already shut down!")

# TODO: replace 0 with the lock context once implemented
key = (group_id, 0)
key = (group_id, task_id())
global_thread_id = self._thread_ids.get(key)
if global_thread_id:
return global_thread_id
Expand Down
232 changes: 202 additions & 30 deletions hazelcast/internal/asyncio_proxy/map.py

Large diffs are not rendered by default.

196 changes: 176 additions & 20 deletions hazelcast/internal/asyncio_proxy/multi_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,25 @@
multi_map_size_codec,
multi_map_value_count_codec,
multi_map_values_codec,
multi_map_is_locked_codec,
multi_map_force_unlock_codec,
multi_map_lock_codec,
multi_map_try_lock_codec,
multi_map_unlock_codec,
)
from hazelcast.internal.asyncio_proxy.base import Proxy, EntryEvent, EntryEventType
from hazelcast.internal.asyncio_proxy.base import Proxy, EntryEvent, EntryEventType, task_id
from hazelcast.types import ValueType, KeyType
from hazelcast.serialization.data import Data
from hazelcast.serialization.compact import SchemaNotReplicatedError
from hazelcast.util import (
check_not_none,
deserialize_list_in_place,
deserialize_entry_list_in_place,
to_millis,
)

EntryEventCallable = typing.Callable[[EntryEvent[KeyType, ValueType]], None]

default_thread_id = 0


class MultiMap(Proxy, typing.Generic[KeyType, ValueType]):
"""A specialized map whose keys can be associated with multiple values.
Expand All @@ -46,11 +50,12 @@ class MultiMap(Proxy, typing.Generic[KeyType, ValueType]):
>>> print("get", await my_map.get("key"))

Warning:
Asyncio client multi map proxy is not thread-safe, do not access it from other threads.
Asyncio client multimap proxy is not thread-safe, do not access it from other threads.
"""

def __init__(self, service_name, name, context):
super(MultiMap, self).__init__(service_name, name, context)
self._reference_id_generator = context.lock_reference_id_generator

async def add_entry_listener(
self,
Expand Down Expand Up @@ -158,9 +163,7 @@ async def contains_key(self, key: KeyType) -> bool:
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.contains_key, key)

request = multi_map_contains_key_codec.encode_request(
self.name, key_data, default_thread_id
)
request = multi_map_contains_key_codec.encode_request(self.name, key_data, task_id())
return await self._invoke_on_key(
request, key_data, multi_map_contains_key_codec.decode_response
)
Expand Down Expand Up @@ -205,7 +208,7 @@ async def contains_entry(self, key: KeyType, value: ValueType) -> bool:
return await self._send_schema_and_retry(e, self.contains_entry, key, value)

request = multi_map_contains_entry_codec.encode_request(
self.name, key_data, value_data, default_thread_id
self.name, key_data, value_data, task_id()
)
return await self._invoke_on_key(
request, key_data, multi_map_contains_entry_codec.decode_response
Expand All @@ -221,7 +224,7 @@ async def entry_set(self) -> typing.List[typing.Tuple[KeyType, ValueType]]:

Warning:
The list is NOT backed by the map, so changes to the map are NOT
reflected in the list, and vice-versa.
reflected in the list, and vice versa.

Returns:
The list of key-value tuples in the multimap.
Expand All @@ -245,7 +248,7 @@ async def get(self, key: KeyType) -> typing.Optional[typing.List[ValueType]]:

Warning:
The list is NOT backed by the multimap, so changes to the map are
not reflected in the collection, and vice-versa.
not reflected in the collection, and vice versa.

Args:
key: The specified key.
Expand All @@ -263,15 +266,66 @@ def handler(message):
data_list = multi_map_get_codec.decode_response(message)
return deserialize_list_in_place(data_list, self._to_object)

request = multi_map_get_codec.encode_request(self.name, key_data, default_thread_id)
request = multi_map_get_codec.encode_request(self.name, key_data, task_id())
return await self._invoke_on_key(request, key_data, handler)

async def is_locked(self, key: KeyType) -> bool:
"""Checks the lock for the specified key.

Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.

Args:
key: The key that is checked for lock.

Returns:
``True`` if lock is acquired, ``False`` otherwise.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.is_locked, key)

request = multi_map_is_locked_codec.encode_request(self.name, key_data)
return await self._invoke_on_key(
request, key_data, multi_map_is_locked_codec.decode_response
)

async def force_unlock(self, key: KeyType) -> None:
"""Releases the lock for the specified key regardless of the lock
owner.

It always successfully unlocks the key, never blocks, and returns
immediately.

Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.

Args:
key: The key to lock.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.force_unlock, key)

request = multi_map_force_unlock_codec.encode_request(
self.name, key_data, self._reference_id_generator.get_and_increment()
)
return await self._invoke_on_key(request, key_data)

async def key_set(self) -> typing.List[KeyType]:
"""Returns the list of keys in the multimap.

Warning:
The list is NOT backed by the map, so changes to the map are NOT
reflected in the list, and vice-versa.
reflected in the list, and vice versa.

Returns:
A list of the clone of the keys.
Expand All @@ -284,6 +338,44 @@ def handler(message):
request = multi_map_key_set_codec.encode_request(self.name)
return await self._invoke(request, handler)

async def lock(self, key: KeyType, lease_time: float = None) -> None:
"""Acquires the lock for the specified key infinitely or for the
specified lease time if provided.

If the lock is not available, the current task becomes disabled for
scheduling purposes and lies dormant until the lock has been
acquired.

Scope of the lock is this map only. Acquired lock is only for the key
in this map.

Locks are re-entrant; so, if the key is locked N times, it should be
unlocked N times before another thread can acquire it.

Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.

Args:
key: The key to lock.
lease_time: Time in seconds to wait before releasing the lock.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.lock, key, lease_time)

request = multi_map_lock_codec.encode_request(
self.name,
key_data,
task_id(),
to_millis(lease_time),
self._reference_id_generator.get_and_increment(),
)
return await self._invoke_on_key(request, key_data)

async def remove(self, key: KeyType, value: ValueType) -> bool:
"""Removes the given key-value tuple from the multimap.

Expand All @@ -309,7 +401,7 @@ async def remove(self, key: KeyType, value: ValueType) -> bool:
return await self._send_schema_and_retry(e, self.remove, key, value)

request = multi_map_remove_entry_codec.encode_request(
self.name, key_data, value_data, default_thread_id
self.name, key_data, value_data, task_id()
)
return await self._invoke_on_key(
request, key_data, multi_map_remove_entry_codec.decode_response
Expand All @@ -326,7 +418,7 @@ async def remove_all(self, key: KeyType) -> typing.List[ValueType]:

Warning:
The returned list is NOT backed by the map, so changes to the map
are NOT reflected in the list, and vice-versa.
are NOT reflected in the list, and vice versa.

Args:
key: The key of the entries to remove.
Expand All @@ -345,7 +437,7 @@ def handler(message):
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.remove_all, key)

request = multi_map_remove_codec.encode_request(self.name, key_data, default_thread_id)
request = multi_map_remove_codec.encode_request(self.name, key_data, task_id())
return await self._invoke_on_key(request, key_data, handler)

async def put(self, key: KeyType, value: ValueType) -> bool:
Expand All @@ -372,9 +464,7 @@ async def put(self, key: KeyType, value: ValueType) -> bool:
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.put, key, value)

request = multi_map_put_codec.encode_request(
self.name, key_data, value_data, default_thread_id
)
request = multi_map_put_codec.encode_request(self.name, key_data, value_data, task_id())
return await self._invoke_on_key(request, key_data, multi_map_put_codec.decode_response)

async def put_all(self, multimap: typing.Dict[KeyType, typing.Sequence[ValueType]]) -> None:
Expand Down Expand Up @@ -464,7 +554,7 @@ async def value_count(self, key: KeyType) -> int:
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.value_count, key)

request = multi_map_value_count_codec.encode_request(self.name, key_data, default_thread_id)
request = multi_map_value_count_codec.encode_request(self.name, key_data, task_id())
return await self._invoke_on_key(
request, key_data, multi_map_value_count_codec.decode_response
)
Expand All @@ -474,7 +564,7 @@ async def values(self) -> typing.List[ValueType]:

Warning:
The returned list is NOT backed by the map, so changes to the map
are NOT reflected in the list, and vice-versa.
are NOT reflected in the list, and vice versa.

Returns:
The list of values in the multimap.
Expand All @@ -487,6 +577,72 @@ def handler(message):
request = multi_map_values_codec.encode_request(self.name)
return await self._invoke(request, handler)

async def try_lock(self, key: KeyType, lease_time: float = None, timeout: float = 0) -> bool:
"""Tries to acquire the lock for the specified key.

When the lock is not available:

- If the timeout is not provided, the current task doesn't wait and
returns ``False`` immediately.
- If the timeout is provided, the current task becomes disabled for
scheduling purposes and lies dormant until one of the
followings happens:

- The lock is acquired by the current task, or
- The specified waiting time elapses.

If the lease time is provided, lock will be released after this time
elapses.

Args:
key: Key to lock in this map.
lease_time: Time in seconds to wait before releasing the lock.
timeout: Maximum time in seconds to wait for the lock.

Returns:
``True`` if the lock was acquired, ``False`` otherwise.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.try_lock, key, lease_time, timeout)

request = multi_map_try_lock_codec.encode_request(
self.name,
key_data,
task_id(),
to_millis(lease_time),
to_millis(timeout),
self._reference_id_generator.get_and_increment(),
)
return await self._invoke_on_key(
request, key_data, multi_map_try_lock_codec.decode_response
)

async def unlock(self, key: KeyType) -> None:
"""Releases the lock for the specified key. It never blocks and
returns immediately.

Warning:
This method uses ``__hash__`` and ``__eq__`` methods of binary form
of the key, not the actual implementations of ``__hash__`` and
``__eq__`` defined in key's class.

Args:
key: The key to lock.
"""
check_not_none(key, "key can't be None")
try:
key_data = self._to_data(key)
except SchemaNotReplicatedError as e:
return await self._send_schema_and_retry(e, self.unlock, key)

request = multi_map_unlock_codec.encode_request(
self.name, key_data, task_id(), self._reference_id_generator.get_and_increment()
)
return await self._invoke_on_key(request, key_data)


async def create_multi_map_proxy(service_name, name, context):
return MultiMap(service_name, name, context)
Loading
Loading