diff --git a/hazelcast/internal/asyncio_invocation.py b/hazelcast/internal/asyncio_invocation.py index f7effc6e5b..56c16c612f 100644 --- a/hazelcast/internal/asyncio_invocation.py +++ b/hazelcast/internal/asyncio_invocation.py @@ -253,14 +253,15 @@ def _send(self, invocation, connection): return True def _complete(self, invocation: Invocation, client_message: InboundMessage) -> None: - try: - result = invocation.response_handler(client_message) - invocation.future.set_result(result) - except SchemaNotFoundError as e: - self._fetch_schema_and_complete_again(e, invocation, client_message) - return - except Exception as e: - invocation.future.set_exception(e) + if not invocation.future.cancelled(): + try: + result = invocation.response_handler(client_message) + invocation.future.set_result(result) + except SchemaNotFoundError as e: + self._fetch_schema_and_complete_again(e, invocation, client_message) + return + except Exception as e: + invocation.future.set_exception(e) correlation_id = invocation.request.get_correlation_id() self._pending.pop(correlation_id, None) diff --git a/hazelcast/serialization/service.py b/hazelcast/serialization/service.py index dd6a3c7118..7913b7e2dd 100644 --- a/hazelcast/serialization/service.py +++ b/hazelcast/serialization/service.py @@ -141,7 +141,7 @@ def to_data(self, obj, partitioning_strategy=None): out.write_int_big_endian(serializer.get_type_id()) serializer.write(out, obj) return Data(out.to_byte_array()) - except: + except Exception: handle_exception(sys.exc_info()[1], sys.exc_info()[2]) def to_object(self, data): @@ -169,7 +169,7 @@ def to_object(self, data): else: raise HazelcastInstanceNotActiveError() return serializer.read(inp) - except: + except Exception: handle_exception(sys.exc_info()[1], sys.exc_info()[2]) def write_object(self, out, obj): @@ -181,7 +181,7 @@ def write_object(self, out, obj): serializer = self._registry.serializer_for(obj) out.write_int(serializer.get_type_id()) serializer.write(out, obj) - except: + except Exception: handle_exception(sys.exc_info()[1], sys.exc_info()[2]) def read_object(self, inp): @@ -196,7 +196,7 @@ def read_object(self, inp): else: raise HazelcastInstanceNotActiveError() return serializer.read(inp) - except: + except Exception: handle_exception(sys.exc_info()[1], sys.exc_info()[2]) def _calculate_partitioning_hash(self, obj, partitioning_strategy): diff --git a/tests/integration/asyncio/cancel_test.py b/tests/integration/asyncio/cancel_test.py new file mode 100644 index 0000000000..f4be9be2cf --- /dev/null +++ b/tests/integration/asyncio/cancel_test.py @@ -0,0 +1,63 @@ +import asyncio +from asyncio import CancelledError + +from tests.integration.asyncio.base import SingleMemberTestCase +from tests.util import random_string + + +class MapTest(SingleMemberTestCase): + @classmethod + def configure_client(cls, config): + config["cluster_name"] = cls.cluster.id + return config + + async def asyncSetUp(self): + await super().asyncSetUp() + self.map = await self.client.get_map(random_string()) + + async def asyncTearDown(self): + await self.map.destroy() + await super().asyncTearDown() + + async def test_operation_after_cancel(self): + key = "k1" + await self.map.set(key, "v1") + task = asyncio.create_task(self.map.get(key)) + task.cancel() + + try: + await task + except CancelledError: + pass + else: + self.fail("expected CancelledError to be raised") + + value = await self.map.get(key) + self.assertEqual("v1", value) + + async def test_cancel(self): + async def keep_setting(): + for i in range(1000): + print(i) + await self.map.set("foo", i) + await asyncio.sleep(0) + + task = asyncio.create_task(keep_setting()) + await asyncio.sleep(0.1) + task.cancel() + value = await self.map.get("foo") + self.assertGreater(value, 0) + + async def test_timeout(self): + async def keep_setting(): + for i in range(1000): + print(i) + await self.map.set("foo", i) + await asyncio.sleep(0) + + try: + await asyncio.wait_for(keep_setting(), 0.1) + except TimeoutError: + pass + value = await self.map.get("foo") + self.assertGreater(value, 0)