diff --git a/hazelcast/asyncio/__init__.py b/hazelcast/asyncio/__init__.py index 05b4bd8c15..ba675e0c2e 100644 --- a/hazelcast/asyncio/__init__.py +++ b/hazelcast/asyncio/__init__.py @@ -1,4 +1,6 @@ __all__ = [ + "AtomicLong", + "CPSubsystem", "EntryEventCallable", "Executor", "HazelcastClient", @@ -27,3 +29,5 @@ from hazelcast.internal.asyncio_proxy.set import Set from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection from hazelcast.internal.asyncio_proxy.reliable_topic import ReliableTopic, ReliableMessageListener +from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem +from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong diff --git a/hazelcast/internal/asyncio_client.py b/hazelcast/internal/asyncio_client.py index 3ecc98a725..c266a42a59 100644 --- a/hazelcast/internal/asyncio_client.py +++ b/hazelcast/internal/asyncio_client.py @@ -11,6 +11,7 @@ from hazelcast.discovery import HazelcastCloudAddressProvider from hazelcast.errors import IllegalStateError, InvalidConfigurationError from hazelcast.internal.asyncio_invocation import InvocationService, Invocation +from hazelcast.internal.asyncio_proxy.cp_manager import CPSubsystem from hazelcast.internal.asyncio_proxy.pn_counter import PNCounter from hazelcast.internal.asyncio_proxy.vector_collection import VectorCollection from hazelcast.internal.asyncio_sql import _InternalSqlService, SqlService @@ -207,6 +208,7 @@ def __init__(self, config: Config | None = None, **kwargs): self._compact_schema_service, ) self._proxy_manager = ProxyManager(self._context) + self._cp_subsystem = CPSubsystem(self._context) self._lock_reference_id_generator = AtomicInteger(1) self._statistics = Statistics( self, @@ -528,6 +530,11 @@ def sql(self) -> SqlService: """Returns a service to execute distributed SQL queries.""" return self._sql_service + @property + def cp_subsystem(self) -> CPSubsystem: + """CP Subsystem offers set of in-memory linearizable data structures.""" + return self._cp_subsystem + def _create_address_provider(self): config = self._config cluster_members = config.cluster_members diff --git a/hazelcast/internal/asyncio_proxy/atomic_long.py b/hazelcast/internal/asyncio_proxy/atomic_long.py new file mode 100644 index 0000000000..7bf8190ddc --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/atomic_long.py @@ -0,0 +1,255 @@ +import typing + +from hazelcast.internal.asyncio_proxy.cp import BaseCPProxy +from hazelcast.protocol.codec import ( + atomic_long_add_and_get_codec, + atomic_long_compare_and_set_codec, + atomic_long_get_codec, + atomic_long_get_and_add_codec, + atomic_long_get_and_set_codec, + atomic_long_alter_codec, + atomic_long_apply_codec, +) +from hazelcast.serialization.compact import SchemaNotReplicatedError +from hazelcast.util import check_is_int, check_not_none + + +class AtomicLong(BaseCPProxy): + """AtomicLong is a redundant and highly available distributed counter + for 64-bit integers (``long`` type in Java). + + It works on top of the Raft consensus algorithm. It offers linearizability + during crash failures and network partitions. It is CP with respect to + the CAP principle. If a network partition occurs, it remains available + on at most one side of the partition. + + AtomicLong implementation does not offer exactly-once / effectively-once + execution semantics. It goes with at-least-once execution semantics + by default and can cause an API call to be committed multiple times + in case of CP member failures. It can be tuned to offer at-most-once + execution semantics. Please see `fail-on-indeterminate-operation-state` + server-side setting. + """ + + async def add_and_get(self, delta: int) -> int: + """Atomically adds the given value to the current value. + + Args: + delta: The value to add to the current value. + + Returns: + The updated value, the given value added to the current value. + """ + check_is_int(delta) + codec = atomic_long_add_and_get_codec + request = codec.encode_request(self._group_id, self._object_name, delta) + return await self._ainvoke(request, codec.decode_response) + + async def compare_and_set(self, expect: int, update: int) -> bool: + """Atomically sets the value to the given updated value + only if the current value equals the expected value. + + Args: + expect: The expected value. + update: The new value. + + Returns: + ``True`` if successful; or ``False`` if the actual value was not + equal to the expected value. + """ + check_is_int(expect) + check_is_int(update) + codec = atomic_long_compare_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, expect, update) + return await self._ainvoke(request, codec.decode_response) + + async def decrement_and_get(self) -> int: + """Atomically decrements the current value by one. + + Returns: + The updated value, the current value decremented by one. + """ + return await self.add_and_get(-1) + + async def get_and_decrement(self) -> int: + """Atomically decrements the current value by one. + + Returns: + The old value. + """ + return await self.get_and_add(-1) + + async def get(self) -> int: + """Gets the current value. + + Returns: + The current value. + """ + codec = atomic_long_get_codec + request = codec.encode_request(self._group_id, self._object_name) + return await self._ainvoke(request, codec.decode_response) + + async def get_and_add(self, delta: int) -> int: + """Atomically adds the given value to the current value. + + Args: + delta: The value to add to the current value. + + Returns: + The old value before the add. + """ + check_is_int(delta) + codec = atomic_long_get_and_add_codec + request = codec.encode_request(self._group_id, self._object_name, delta) + return await self._ainvoke(request, codec.decode_response) + + async def get_and_set(self, new_value: int) -> int: + """Atomically sets the given value and returns the old value. + + Args: + new_value: The new value. + + Returns: + The old value. + """ + check_is_int(new_value) + codec = atomic_long_get_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value) + return await self._ainvoke(request, codec.decode_response) + + async def increment_and_get(self) -> int: + """Atomically increments the current value by one. + + Returns: + The updated value, the current value incremented by one. + """ + return await self.add_and_get(1) + + async def get_and_increment(self) -> int: + """Atomically increments the current value by one. + + Returns: + The old value. + """ + return await self.get_and_add(1) + + async def set(self, new_value: int) -> None: + """Atomically sets the given value. + + Args: + new_value: The new value + """ + check_is_int(new_value) + codec = atomic_long_get_and_set_codec + request = codec.encode_request(self._group_id, self._object_name, new_value) + return await self._ainvoke(request) + + async def alter(self, function: typing.Any) -> None: + """Alters the currently stored value by applying a function on it. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored value. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter, function) + + codec = atomic_long_alter_codec + # 1 means return the new value. + # There is no way to tell server to return nothing as of now (30.09.2020) + # The new value is `long` (comes with the initial frame) and we + # don't try to decode it. So, this shouldn't cause any problems. + request = codec.encode_request(self._group_id, self._object_name, function_data, 1) + return await self._ainvoke(request) + + async def alter_and_get(self, function: typing.Any) -> int: + """Alters the currently stored value by applying a function on it and + gets the result. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored value. + + Returns: + The new value. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.alter_and_get, function) + + codec = atomic_long_alter_codec + # 1 means return the new value. + request = codec.encode_request(self._group_id, self._object_name, function_data, 1) + return await self._ainvoke(request, codec.decode_response) + + async def get_and_alter(self, function: typing.Any) -> int: + """Alters the currently stored value by applying a function on it and + gets the old value. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function that alters the currently stored value. + + Returns: + The old value. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.get_and_alter, function) + + codec = atomic_long_alter_codec + # 0 means return the old value. + request = codec.encode_request(self._group_id, self._object_name, function_data, 0) + return await self._ainvoke(request, codec.decode_response) + + async def apply(self, function: typing.Any) -> typing.Any: + """Applies a function on the value, the actual stored value will not + change. + + Notes: + ``function`` must be an instance of Hazelcast serializable type. + It must have a counterpart registered in the server-side that + implements the ``com.hazelcast.core.IFunction`` interface with + the actual logic of the function to be applied. + + Args: + function: The function applied to the currently stored value. + + Returns: + The result of the function application. + """ + check_not_none(function, "Function cannot be None") + try: + function_data = self._to_data(function) + except SchemaNotReplicatedError as e: + return await self._send_schema_and_retry(e, self.apply, function) + + codec = atomic_long_apply_codec + request = codec.encode_request(self._group_id, self._object_name, function_data) + + def handler(response): + return self._to_object(codec.decode_response(response)) + + return await self._ainvoke(request, handler) diff --git a/hazelcast/internal/asyncio_proxy/cp.py b/hazelcast/internal/asyncio_proxy/cp.py new file mode 100644 index 0000000000..333500a641 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/cp.py @@ -0,0 +1,34 @@ +from hazelcast.internal.asyncio_invocation import Invocation +from hazelcast.protocol.codec import cp_group_destroy_cp_object_codec + + +def _no_op_response_handler(_): + return None + + +class BaseCPProxy: + def __init__(self, context, group_id, service_name, proxy_name, object_name): + self._group_id = group_id + self._service_name = service_name + self._proxy_name = proxy_name + self._object_name = object_name + self._invocation_service = context.invocation_service + serialization_service = context.serialization_service + self._to_data = serialization_service.to_data + self._to_object = serialization_service.to_object + self._send_schema_and_retry = context.compact_schema_service.send_schema_and_retry + + async def destroy(self) -> None: + """Destroys this proxy.""" + codec = cp_group_destroy_cp_object_codec + request = codec.encode_request(self._group_id, self._service_name, self._object_name) + return await self._ainvoke(request) + + def _invoke(self, request, response_handler=_no_op_response_handler): + invocation = Invocation(request, response_handler=response_handler) + self._invocation_service.invoke(invocation) + return invocation.future + + async def _ainvoke(self, request, response_handler=_no_op_response_handler): + fut = self._invoke(request, response_handler) + return await fut diff --git a/hazelcast/internal/asyncio_proxy/cp_manager.py b/hazelcast/internal/asyncio_proxy/cp_manager.py new file mode 100644 index 0000000000..dea872be06 --- /dev/null +++ b/hazelcast/internal/asyncio_proxy/cp_manager.py @@ -0,0 +1,79 @@ +from hazelcast.cp import ( + _without_default_group_name, + _get_object_name_for_proxy, + ATOMIC_LONG_SERVICE, +) +from hazelcast.internal.asyncio_invocation import Invocation +from hazelcast.internal.asyncio_proxy.atomic_long import AtomicLong +from hazelcast.protocol.codec import cp_group_create_cp_group_codec + + +class CPSubsystem: + """CP Subsystem is a component of Hazelcast that builds a strongly + consistent layer for a set of distributed data structures. + + Its APIs can be used for implementing distributed coordination use cases, + such as leader election, distributed locking, synchronization, and metadata + management. + + Its data structures are CP with respect to the CAP principle, i.e., they + always maintain linearizability and prefer consistency to availability + during network partitions. Besides network partitions, CP Subsystem + withstands server and client failures. + + Data structures in CP Subsystem run in CP groups. Each CP group elects + its own Raft leader and runs the Raft consensus algorithm independently. + + The CP data structures differ from the other Hazelcast data structures + in two aspects. First, an internal commit is performed on the METADATA CP + group every time you fetch a proxy from this interface. Hence, callers + should cache returned proxy objects. Second, if you call ``destroy()`` + on a CP data structure proxy, that data structure is terminated on the + underlying CP group and cannot be reinitialized until the CP group is + force-destroyed. For this reason, please make sure that you are completely + done with a CP data structure before destroying its proxy. + """ + + def __init__(self, context): + self._proxy_manager = CPProxyManager(context) + + async def get_atomic_long(self, name: str) -> AtomicLong: + """Returns the distributed AtomicLong instance with given name. + + The instance is created on CP Subsystem. + + If no group name is given within the ``name`` argument, then the + AtomicLong instance will be created on the default CP group. + If a group name is given, like ``.get_atomic_long("myLong@group1")``, + the given group will be initialized first, if not initialized + already, and then the instance will be created on this group. + + Args: + name: Name of the AtomicLong. + + Returns: + The AtomicLong proxy for the given name. + """ + return await self._proxy_manager.get_or_create(ATOMIC_LONG_SERVICE, name) + + +class CPProxyManager: + def __init__(self, context): + self._context = context + + async def get_or_create(self, service_name, proxy_name): + proxy_name = _without_default_group_name(proxy_name) + object_name = _get_object_name_for_proxy(proxy_name) + + group_id = await self._get_group_id(proxy_name) + if service_name == ATOMIC_LONG_SERVICE: + return AtomicLong(self._context, group_id, service_name, proxy_name, object_name) + + raise ValueError("Unknown service name: %s" % service_name) + + async def _get_group_id(self, proxy_name): + codec = cp_group_create_cp_group_codec + request = codec.encode_request(proxy_name) + invocation = Invocation(request, response_handler=codec.decode_response) + invocation_service = self._context.invocation_service + return await invocation_service.ainvoke(invocation) diff --git a/tests/integration/asyncio/base.py b/tests/integration/asyncio/base.py index 55c92e1b50..8fbb2a87d3 100644 --- a/tests/integration/asyncio/base.py +++ b/tests/integration/asyncio/base.py @@ -1,5 +1,6 @@ import asyncio import logging +import os import unittest from typing import Awaitable @@ -121,3 +122,38 @@ async def asyncSetUp(self): async def asyncTearDown(self): await self.client.shutdown() + + +class CPTestCase(unittest.IsolatedAsyncioTestCase, HazelcastTestCase): + + rc = None + cluster = None + client = None + + @classmethod + def setUpClass(cls): + cls.rc = cls.create_rc() + cls.cluster = cls.create_cluster(cls.rc, cls.configure_cluster()) + cls.cluster.start_member() + cls.cluster.start_member() + cls.cluster.start_member() + + @classmethod + def tearDownClass(cls): + cls.rc.terminateCluster(cls.cluster.id) + cls.rc.exit() + + @classmethod + def configure_cluster(cls): + path = os.path.abspath(__file__) + dir_path = os.path.dirname(path) + with open( + os.path.join(dir_path, "../backward_compatible/proxy/cp/hazelcast_cpsubsystem.xml") + ) as f: + return f.read() + + async def asyncSetUp(self): + self.client = await HazelcastClient.create_and_start(cluster_name=self.cluster.id) + + async def asyncTearDown(self): + await self.client.shutdown() diff --git a/tests/integration/asyncio/proxy/atomic_long_test.py b/tests/integration/asyncio/proxy/atomic_long_test.py new file mode 100644 index 0000000000..5acb0df721 --- /dev/null +++ b/tests/integration/asyncio/proxy/atomic_long_test.py @@ -0,0 +1,148 @@ +import pytest + +from hazelcast.errors import DistributedObjectDestroyedError +from hazelcast.serialization.api import IdentifiedDataSerializable +from tests.integration.asyncio.base import CPTestCase +from tests.util import skip_if_server_version_older_than + + +class Multiplication(IdentifiedDataSerializable): + def __init__(self, multiplier): + self.multiplier = multiplier + + def write_data(self, object_data_output): + object_data_output.write_long(self.multiplier) + + def read_data(self, object_data_input): + pass + + def get_factory_id(self): + return 66 + + def get_class_id(self): + return 16 + + +@pytest.mark.enterprise +class AtomicLongTest(CPTestCase): + async def asyncSetUp(self): + await super().asyncSetUp() + self.atomic_long = await self.client.cp_subsystem.get_atomic_long("long") + + async def asyncTearDown(self): + await self.atomic_long.set(0) + await super().asyncTearDown() + + async def test_atomic_long_in_another_group(self): + another_long = await self.client.cp_subsystem.get_atomic_long("long@mygroup") + self.assertEqual(1, await another_long.increment_and_get()) + # the following value has to be 0, + # as `along` belongs to the default CP group + self.assertEqual(0, await self.atomic_long.get()) + + async def test_use_after_destroy(self): + another_long = await self.client.cp_subsystem.get_atomic_long("another-long") + await another_long.destroy() + # the next destroy call should be ignored + await another_long.destroy() + + try: + await another_long.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + another_long2 = await self.client.cp_subsystem.get_atomic_long("another-long") + try: + await another_long2.get() + except DistributedObjectDestroyedError: + pass + else: + self.fail("expected DistributedObjectDestroyedError to be raised") + + async def test_initial_value(self): + self.assertEqual(0, await self.atomic_long.get()) + + async def test_add_and_get(self): + self.assertEqual(33, await self.atomic_long.add_and_get(33)) + self.assertEqual(33, await self.atomic_long.get()) + + async def test_compare_and_set_when_condition_is_met(self): + self.assertTrue(await self.atomic_long.compare_and_set(0, 23)) + self.assertEqual(23, await self.atomic_long.get()) + + async def test_compare_and_set_when_condition_is_not_met(self): + self.assertFalse(await self.atomic_long.compare_and_set(1, 23)) + self.assertEqual(0, await self.atomic_long.get()) + + async def test_decrement_and_get(self): + self.assertEqual(-1, await self.atomic_long.decrement_and_get()) + self.assertEqual(-2, await self.atomic_long.decrement_and_get()) + self.assertEqual(-2, await self.atomic_long.get()) + + async def test_get_and_decrement(self): + self.assertEqual(0, await self.atomic_long.get_and_decrement()) + self.assertEqual(-1, await self.atomic_long.get_and_decrement()) + self.assertEqual(-2, await self.atomic_long.get()) + + async def test_get(self): + self.assertEqual(0, await self.atomic_long.get()) + await self.atomic_long.set(11) + self.assertEqual(11, await self.atomic_long.get()) + long_max = 2**63 - 1 + await self.atomic_long.set(long_max) + self.assertEqual(long_max, await self.atomic_long.get()) + long_min = -(2**63) + await self.atomic_long.set(long_min) + self.assertEqual(long_min, await self.atomic_long.get()) + + async def test_get_and_add(self): + self.assertEqual(0, await self.atomic_long.get_and_add(-100)) + self.assertEqual(-100, await self.atomic_long.get()) + + async def test_get_and_set(self): + self.assertEqual(0, await self.atomic_long.get_and_set(123)) + self.assertEqual(123, await self.atomic_long.get()) + + async def test_increment_and_get(self): + self.assertEqual(1, await self.atomic_long.increment_and_get()) + self.assertEqual(2, await self.atomic_long.increment_and_get()) + self.assertEqual(2, await self.atomic_long.get()) + + async def test_get_and_increment(self): + self.assertEqual(0, await self.atomic_long.get_and_increment()) + self.assertEqual(1, await self.atomic_long.get_and_increment()) + self.assertEqual(2, await self.atomic_long.get()) + + async def test_set(self): + self.assertIsNone(await self.atomic_long.set(42)) + self.assertEqual(42, await self.atomic_long.get()) + + async def test_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(2) + self.assertIsNone(await self.atomic_long.alter(Multiplication(5))) + self.assertEqual(10, await self.atomic_long.get()) + + async def test_alter_and_get(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(-3) + self.assertEqual(-9, await self.atomic_long.alter_and_get(Multiplication(3))) + self.assertEqual(-9, await self.atomic_long.get()) + + async def test_get_and_alter(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(123) + self.assertEqual(123, await self.atomic_long.get_and_alter(Multiplication(-1))) + self.assertEqual(-123, await self.atomic_long.get()) + + async def test_apply(self): + # the class is defined in the 4.1 JAR + skip_if_server_version_older_than(self, self.client, "4.1") + await self.atomic_long.set(42) + self.assertEqual(84, await self.atomic_long.apply(Multiplication(2))) + self.assertEqual(42, await self.atomic_long.get())