From 58791d9a7586ebf558ea4211ad811f674e9310d7 Mon Sep 17 00:00:00 2001 From: nachatz Date: Wed, 28 Jan 2026 16:22:08 -0800 Subject: [PATCH 1/3] feat: add shutdown support to async client --- pulsar/asyncio.py | 10 ++++++++++ tests/asyncio_test.py | 12 ++++++++++++ 2 files changed, 22 insertions(+) diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index a1ca0c00..7f01bcb7 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -775,6 +775,16 @@ async def subscribe(self, topic: Union[str, List[str]], schema.attach_client(self._client) return Consumer(await future, schema) + + def shutdown(self) -> None: + """ + Shutdown the client and all the associated producers and consumers + + Raises + ------ + PulsarException + """ + self._client.shutdown() async def close(self) -> None: """ diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 44408091..54d57dbe 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -164,6 +164,18 @@ async def test_producer_is_connected(self): await producer.close() self.assertFalse(producer.is_connected()) + async def test_shutdown_client(self): + producer = await self._client.create_producer("persistent://public/default/partitioned_topic_name_test") + await producer.send(b"hello") + self._client.shutdown() + + try: + await producer.send(b"hello") + self.assertTrue(False) + except PulsarException: + # Expected + pass + async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]: msg_ids = [] for i in range(5): From 7a6b34bf5d1f5f778d7cc24c5f466206fa02a3d9 Mon Sep 17 00:00:00 2001 From: nachatz Date: Sun, 1 Feb 2026 19:30:26 -0800 Subject: [PATCH 2/3] refactor: utilize fail for better semantics --- tests/asyncio_test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 54d57dbe..1d12a0f9 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -171,7 +171,7 @@ async def test_shutdown_client(self): try: await producer.send(b"hello") - self.assertTrue(False) + self.fail("Expected PulsarException after client shutdown") except PulsarException: # Expected pass From 79f5416b48aeb973bc235403e7202ad315e5e3a7 Mon Sep 17 00:00:00 2001 From: nachatz Date: Sun, 1 Feb 2026 19:45:09 -0800 Subject: [PATCH 3/3] test: update exception check --- tests/asyncio_test.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 1d12a0f9..363c77ce 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -171,10 +171,9 @@ async def test_shutdown_client(self): try: await producer.send(b"hello") - self.fail("Expected PulsarException after client shutdown") - except PulsarException: - # Expected - pass + self.fail("Expected AlreadyClosed exception after client shutdown") + except PulsarException as e: + self.assertEqual(e.error(), pulsar.Result.AlreadyClosed) async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]: msg_ids = []