Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 9 additions & 8 deletions hazelcast/internal/asyncio_invocation.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,14 +253,15 @@ def _send(self, invocation, connection):
return True

def _complete(self, invocation: Invocation, client_message: InboundMessage) -> None:
try:
result = invocation.response_handler(client_message)
invocation.future.set_result(result)
except SchemaNotFoundError as e:
self._fetch_schema_and_complete_again(e, invocation, client_message)
return
except Exception as e:
invocation.future.set_exception(e)
if not invocation.future.cancelled():
try:
result = invocation.response_handler(client_message)
invocation.future.set_result(result)
except SchemaNotFoundError as e:
self._fetch_schema_and_complete_again(e, invocation, client_message)
return
except Exception as e:
invocation.future.set_exception(e)

correlation_id = invocation.request.get_correlation_id()
self._pending.pop(correlation_id, None)
Expand Down
8 changes: 4 additions & 4 deletions hazelcast/serialization/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ def to_data(self, obj, partitioning_strategy=None):
out.write_int_big_endian(serializer.get_type_id())
serializer.write(out, obj)
return Data(out.to_byte_array())
except:
except Exception:
handle_exception(sys.exc_info()[1], sys.exc_info()[2])

def to_object(self, data):
Expand Down Expand Up @@ -169,7 +169,7 @@ def to_object(self, data):
else:
raise HazelcastInstanceNotActiveError()
return serializer.read(inp)
except:
except Exception:
handle_exception(sys.exc_info()[1], sys.exc_info()[2])

def write_object(self, out, obj):
Expand All @@ -181,7 +181,7 @@ def write_object(self, out, obj):
serializer = self._registry.serializer_for(obj)
out.write_int(serializer.get_type_id())
serializer.write(out, obj)
except:
except Exception:
handle_exception(sys.exc_info()[1], sys.exc_info()[2])

def read_object(self, inp):
Expand All @@ -196,7 +196,7 @@ def read_object(self, inp):
else:
raise HazelcastInstanceNotActiveError()
return serializer.read(inp)
except:
except Exception:
handle_exception(sys.exc_info()[1], sys.exc_info()[2])

def _calculate_partitioning_hash(self, obj, partitioning_strategy):
Expand Down
63 changes: 63 additions & 0 deletions tests/integration/asyncio/cancel_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
import asyncio
from asyncio import CancelledError

from tests.integration.asyncio.base import SingleMemberTestCase
from tests.util import random_string


class MapTest(SingleMemberTestCase):
@classmethod
def configure_client(cls, config):
config["cluster_name"] = cls.cluster.id
return config

async def asyncSetUp(self):
await super().asyncSetUp()
self.map = await self.client.get_map(random_string())

async def asyncTearDown(self):
await self.map.destroy()
await super().asyncTearDown()

async def test_operation_after_cancel(self):
key = "k1"
await self.map.set(key, "v1")
task = asyncio.create_task(self.map.get(key))
task.cancel()

try:
await task
except CancelledError:
pass
else:
self.fail("expected CancelledError to be raised")

value = await self.map.get(key)
self.assertEqual("v1", value)

async def test_cancel(self):
async def keep_setting():
for i in range(1000):
print(i)
await self.map.set("foo", i)
await asyncio.sleep(0)

task = asyncio.create_task(keep_setting())
await asyncio.sleep(0.1)
task.cancel()
value = await self.map.get("foo")
self.assertGreater(value, 0)

async def test_timeout(self):
async def keep_setting():
for i in range(1000):
print(i)
await self.map.set("foo", i)
await asyncio.sleep(0)

try:
await asyncio.wait_for(keep_setting(), 0.1)
except TimeoutError:
pass
value = await self.map.get("foo")
self.assertGreater(value, 0)
Loading