diff --git a/pulsar/asyncio.py b/pulsar/asyncio.py index a429cf8..7fa7c3d 100644 --- a/pulsar/asyncio.py +++ b/pulsar/asyncio.py @@ -777,6 +777,16 @@ async def subscribe(self, topic: Union[str, List[str]], schema.attach_client(self._client) return Consumer(await future, schema) + def shutdown(self) -> None: + """ + Shutdown the client and all the associated producers and consumers + + Raises + ------ + PulsarException + """ + self._client.shutdown() + async def get_topic_partitions(self, topic: str) -> List[str]: """ Get the list of partitions for a given topic in asynchronous mode. diff --git a/tests/asyncio_test.py b/tests/asyncio_test.py index 32371cb..b6df63a 100644 --- a/tests/asyncio_test.py +++ b/tests/asyncio_test.py @@ -227,6 +227,17 @@ async def test_producer_is_connected(self): await producer.close() self.assertFalse(producer.is_connected()) + async def test_shutdown_client(self): + producer = await self._client.create_producer("persistent://public/default/partitioned_topic_name_test") + await producer.send(b"hello") + self._client.shutdown() + + try: + await producer.send(b"hello") + self.fail("Expected AlreadyClosed exception after client shutdown") + except PulsarException as e: + self.assertEqual(e.error(), pulsar.Result.AlreadyClosed) + async def _prepare_messages(self, producer: Producer) -> List[pulsar.MessageId]: msg_ids = [] for i in range(5):