From b36cdd74b9c486e359814d7fbc8200c174102976 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Wed, 13 May 2026 09:10:21 +0300 Subject: [PATCH 1/3] added cancel test --- tests/integration/asyncio/cancel_test.py | 34 ++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 tests/integration/asyncio/cancel_test.py diff --git a/tests/integration/asyncio/cancel_test.py b/tests/integration/asyncio/cancel_test.py new file mode 100644 index 0000000000..56e03f6ca6 --- /dev/null +++ b/tests/integration/asyncio/cancel_test.py @@ -0,0 +1,34 @@ +import asyncio + +from tests.integration.asyncio.base import SingleMemberTestCase +from tests.util import random_string + + +class MapTest(SingleMemberTestCase): + + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + async def asyncSetUp(self): + await super().asyncSetUp() + self.map = await self.client.get_map(random_string()) + + async def asyncTearDown(self): + await self.map.destroy() + await super().asyncTearDown() + + async def test_cancel(self): + self.skipTest("broken") + async def keep_setting(): + for i in range(1000): + print(i) + await self.map.set("foo", i) + await asyncio.sleep(0) + + task = asyncio.create_task(keep_setting()) + await asyncio.sleep(0.01) + task.cancel() + value = await self.map.get("foo") + self.assertGreater(value, 0) From 9073478b30bc9f23813472510827adf836f00e82 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Mon, 15 Jun 2026 05:24:01 +0300 Subject: [PATCH 2/3] Cancellation handling and tests --- hazelcast/internal/asyncio_invocation.py | 17 ++++++------ hazelcast/serialization/service.py | 8 +++--- tests/integration/asyncio/cancel_test.py | 34 ++++++++++++++++++++++-- 3 files changed, 45 insertions(+), 14 deletions(-) diff --git a/hazelcast/internal/asyncio_invocation.py b/hazelcast/internal/asyncio_invocation.py index f7effc6e5b..56c16c612f 100644 --- a/hazelcast/internal/asyncio_invocation.py +++ b/hazelcast/internal/asyncio_invocation.py @@ -253,14 +253,15 @@ def _send(self, invocation, connection): return True def _complete(self, invocation: Invocation, client_message: InboundMessage) -> None: - try: - result = invocation.response_handler(client_message) - invocation.future.set_result(result) - except SchemaNotFoundError as e: - self._fetch_schema_and_complete_again(e, invocation, client_message) - return - except Exception as e: - invocation.future.set_exception(e) + if not invocation.future.cancelled(): + try: + result = invocation.response_handler(client_message) + invocation.future.set_result(result) + except SchemaNotFoundError as e: + self._fetch_schema_and_complete_again(e, invocation, client_message) + return + except Exception as e: + invocation.future.set_exception(e) correlation_id = invocation.request.get_correlation_id() self._pending.pop(correlation_id, None) diff --git a/hazelcast/serialization/service.py b/hazelcast/serialization/service.py index dd6a3c7118..7913b7e2dd 100644 --- a/hazelcast/serialization/service.py +++ b/hazelcast/serialization/service.py @@ -141,7 +141,7 @@ def to_data(self, obj, partitioning_strategy=None): out.write_int_big_endian(serializer.get_type_id()) serializer.write(out, obj) return Data(out.to_byte_array()) - except: + except Exception: handle_exception(sys.exc_info()[1], sys.exc_info()[2]) def to_object(self, data): @@ -169,7 +169,7 @@ def to_object(self, data): else: raise HazelcastInstanceNotActiveError() return serializer.read(inp) - except: + except Exception: handle_exception(sys.exc_info()[1], sys.exc_info()[2]) def write_object(self, out, obj): @@ -181,7 +181,7 @@ def write_object(self, out, obj): serializer = self._registry.serializer_for(obj) out.write_int(serializer.get_type_id()) serializer.write(out, obj) - except: + except Exception: handle_exception(sys.exc_info()[1], sys.exc_info()[2]) def read_object(self, inp): @@ -196,7 +196,7 @@ def read_object(self, inp): else: raise HazelcastInstanceNotActiveError() return serializer.read(inp) - except: + except Exception: handle_exception(sys.exc_info()[1], sys.exc_info()[2]) def _calculate_partitioning_hash(self, obj, partitioning_strategy): diff --git a/tests/integration/asyncio/cancel_test.py b/tests/integration/asyncio/cancel_test.py index 56e03f6ca6..8dce67dbc2 100644 --- a/tests/integration/asyncio/cancel_test.py +++ b/tests/integration/asyncio/cancel_test.py @@ -1,4 +1,5 @@ import asyncio +from asyncio import CancelledError from tests.integration.asyncio.base import SingleMemberTestCase from tests.util import random_string @@ -19,8 +20,23 @@ async def asyncTearDown(self): await self.map.destroy() await super().asyncTearDown() + async def test_operation_after_cancel(self): + key = "k1" + await self.map.set(key, "v1") + task = asyncio.create_task(self.map.get(key)) + task.cancel() + + try: + await task + except CancelledError: + pass + else: + self.fail("expected CancelledError to be raised") + + value = await self.map.get(key) + self.assertEqual("v1", value) + async def test_cancel(self): - self.skipTest("broken") async def keep_setting(): for i in range(1000): print(i) @@ -28,7 +44,21 @@ async def keep_setting(): await asyncio.sleep(0) task = asyncio.create_task(keep_setting()) - await asyncio.sleep(0.01) + await asyncio.sleep(0.1) task.cancel() value = await self.map.get("foo") self.assertGreater(value, 0) + + async def test_timeout(self): + async def keep_setting(): + for i in range(1000): + print(i) + await self.map.set("foo", i) + await asyncio.sleep(0) + + try: + await asyncio.wait_for(keep_setting(), 0.1) + except TimeoutError: + pass + value = await self.map.get("foo") + self.assertGreater(value, 0) From b19f20a84f1c663f6d61d58be24d424499a2dcc8 Mon Sep 17 00:00:00 2001 From: Yuce Tekol Date: Mon, 15 Jun 2026 09:40:40 +0300 Subject: [PATCH 3/3] black --- tests/integration/asyncio/cancel_test.py | 1 - 1 file changed, 1 deletion(-) diff --git a/tests/integration/asyncio/cancel_test.py b/tests/integration/asyncio/cancel_test.py index 8dce67dbc2..f4be9be2cf 100644 --- a/tests/integration/asyncio/cancel_test.py +++ b/tests/integration/asyncio/cancel_test.py @@ -6,7 +6,6 @@ class MapTest(SingleMemberTestCase): - @classmethod def configure_client(cls, config): config["cluster_name"] = cls.cluster.id