57 changes: 55 additions & 2 deletions README.md
@@ -35,12 +35,65 @@ broker.configure_producer(request_timeout_ms=100000)
broker.configure_consumer(group_id="the best group ever.")
```

## Multiple topics

By default `AioKafkaBroker` sends all tasks to `kafka_topic`.
You can also configure the broker to listen to multiple topics and bind
each task to its own default topic.

```python
from taskiq_aio_kafka import AioKafkaBroker
from taskiq_aio_kafka.topic import Topic

broker = AioKafkaBroker(
    bootstrap_servers="localhost",
    kafka_topic="default-topic",
    kafka_topics=[
        Topic("emails"),
        Topic("reports"),
    ],
)


@broker.task_with_topic("emails")
async def send_email(user_id: int) -> None:
    print(f"Send email to {user_id}")


@broker.task_with_topic("reports")
async def build_report(report_id: int) -> None:
    print(f"Build report {report_id}")
```

In this example the worker listens to `default-topic`, `emails`, and `reports`.
When you call `send_email.kiq(...)`, the task is sent to `emails` by default.
When you call `build_report.kiq(...)`, the task is sent to `reports` by default.
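
For example (the argument values are illustrative):

```python
await send_email.kiq(user_id=1)      # goes to the "emails" topic
await build_report.kiq(report_id=7)  # goes to the "reports" topic
```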

You can override a task's topic for a single kick with `kicker().with_topic(...)`:

```python
await send_email.kicker().with_topic("reports").kiq(user_id=1)
```
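
Judging by the broker changes in this PR, `task_with_topic` also appears to accept a
`Topic` (or `NewTopic`) object instead of a plain string, in which case the topic is
registered with the broker automatically. A minimal sketch, assuming that behavior:

```python
from taskiq_aio_kafka.topic import Topic


# Hypothetical topic name; the Topic object itself is passed to the decorator,
# so the worker should also start listening to it.
@broker.task_with_topic(Topic("notifications"))
async def notify_user(user_id: int) -> None:
    print(f"Notify user {user_id}")
```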

Tasks without a custom topic keep the previous behavior and are sent to `kafka_topic`.
The regular `@broker.task` decorator still works as in plain taskiq and keeps the standard labels behavior.

```python
@broker.task
async def regular_task() -> None:
    print("This task goes to default-topic.")


await regular_task.kiq()
```

## Configuration

AioKafkaBroker parameters:
* `bootstrap_servers` - URL to kafka nodes. Can be either a string or a list of strings.
* `kafka_topic` - custom topic in kafka.
* `kafka_topic` - default topic in kafka.
* `kafka_topics` - additional topics that worker should listen to.
* `result_backend` - custom result backend.
* `task_id_generator` - custom task_id generator.
* `kafka_admin_client` - custom `kafka` admin client.
* `delete_topic_on_shutdown` - flag to delete topic on broker shutdown.
* `delete_topic_on_shutdown` - flag to delete topics on broker shutdown.
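
A minimal construction sketch tying these parameters together (server addresses and
topic names are hypothetical):

```python
from taskiq_aio_kafka import AioKafkaBroker
from taskiq_aio_kafka.topic import Topic

# Hypothetical broker addresses and topic names; one default topic plus two
# extra topics, removed again when the broker shuts down.
broker = AioKafkaBroker(
    bootstrap_servers=["kafka-1:9092", "kafka-2:9092"],
    kafka_topic="default-topic",
    kafka_topics=[Topic("emails"), Topic("reports")],
    delete_topic_on_shutdown=True,
)
```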
4 changes: 2 additions & 2 deletions taskiq_aio_kafka/__init__.py
@@ -1,5 +1,5 @@
"""Taskiq integration with aiokafka."""

from taskiq_aio_kafka.broker import AioKafkaBroker
__all__ = ("AioKafkaBroker",)

__all__ = ["AioKafkaBroker"]
from taskiq_aio_kafka.broker import AioKafkaBroker
193 changes: 162 additions & 31 deletions taskiq_aio_kafka/broker.py
@@ -1,19 +1,29 @@
__all__ = ("AioKafkaBroker",)

import asyncio
from collections.abc import AsyncGenerator, Callable
from collections.abc import AsyncGenerator, Callable, Iterable
from logging import getLogger
from typing import Any, TypeVar
from typing import Any, TypeVar, overload

from aiokafka import AIOKafkaConsumer, AIOKafkaProducer
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.coordinator.assignors.roundrobin import RoundRobinPartitionAssignor
from kafka.partitioner.default import DefaultPartitioner
from taskiq import AsyncResultBackend, BrokerMessage
from taskiq.abc.broker import AsyncBroker
from typing_extensions import ParamSpec

from taskiq_aio_kafka.exceptions import WrongAioKafkaBrokerParametersError
from taskiq_aio_kafka.models import KafkaConsumerParameters, KafkaProducerParameters
from .constants import TASK_TOPIC_LABEL
from .decorated_task import AioKafkaDecoratedTask
from .exceptions import WrongAioKafkaBrokerParametersError
from .models import KafkaConsumerParameters, KafkaProducerParameters
from .topic import Topic
from .types import TopicType
from .utils import get_topic_name

_T = TypeVar("_T")
_FuncParams = ParamSpec("_FuncParams")
_ReturnType = TypeVar("_ReturnType")


logger = getLogger("taskiq.kafka_broker")
@@ -22,10 +32,13 @@
class AioKafkaBroker(AsyncBroker):
"""Broker that works with Kafka."""

task_topic_label = TASK_TOPIC_LABEL

def __init__( # noqa: PLR0913
self,
bootstrap_servers: str | list[str] | None,
kafka_topic: NewTopic | None = None,
kafka_topic: TopicType | None = None,
kafka_topics: Iterable[TopicType] | None = None,
result_backend: AsyncResultBackend[_T] | None = None,
task_id_generator: Callable[[], str] | None = None,
kafka_admin_client: KafkaAdminClient | None = None,
@@ -35,7 +48,8 @@ def __init__( # noqa: PLR0913
"""Construct a new broker.

:param bootstrap_servers: string with url to kafka or list with urls.
:param kafka_topic: kafka topic.
:param kafka_topic: default kafka topic.
:param kafka_topics: all kafka topics to listen to.
:param result_backend: custom result backend.
:param task_id_generator: custom task_id generator.
:param kafka_admin_client: configured KafkaAdminClient.
@@ -46,6 +60,7 @@ def __init__( # noqa: PLR0913
aiokafka_consumer were specified but bootstrap_servers wasn't specified.
"""
super().__init__(result_backend, task_id_generator)
self.decorator_class = AioKafkaDecoratedTask

if kafka_admin_client and not bootstrap_servers:
raise WrongAioKafkaBrokerParametersError
@@ -54,11 +69,16 @@ def __init__( # noqa: PLR0913

self._loop: asyncio.AbstractEventLoop | None = loop

self._kafka_topic: NewTopic = kafka_topic or NewTopic(
name="taskiq_topic",
num_partitions=1,
replication_factor=1,
)
self._kafka_topic: NewTopic = self._normalize_default_topic(kafka_topic)
self._kafka_topics: dict[str, TopicType] = {
self._kafka_topic.name: self._kafka_topic,
}
if kafka_topics is not None:
for topic in kafka_topics:
self._kafka_topics.setdefault(
get_topic_name(topic),
topic,
)

self._aiokafka_producer_params: KafkaProducerParameters = (
KafkaProducerParameters()
@@ -83,6 +103,44 @@ def __init__( # noqa: PLR0913
self._is_producer_started = False
self._is_consumer_started = False

@classmethod
def _normalize_default_topic(
cls,
kafka_topic: TopicType | None,
) -> NewTopic:
if kafka_topic is None:
return NewTopic(
name="taskiq_topic",
num_partitions=1,
replication_factor=1,
)
if isinstance(kafka_topic, str):
return NewTopic(
name=kafka_topic,
num_partitions=1,
replication_factor=1,
)
if isinstance(kafka_topic, Topic):
if kafka_topic.topic_config.declare:
return kafka_topic.new_topic()
return NewTopic(
name=kafka_topic.name,
num_partitions=1,
replication_factor=1,
)
return kafka_topic

@classmethod
def _get_declaration_topic(
cls,
topic: TopicType,
) -> NewTopic | None:
if isinstance(topic, NewTopic):
return topic
if isinstance(topic, Topic) and topic.topic_config.declare:
return topic.new_topic()
return None

def configure_producer(self, **producer_parameters: Any) -> None:
"""Configure kafka producer.

@@ -107,21 +165,93 @@ def configure_consumer(self, **consumer_parameters: Any) -> None:
**consumer_parameters,
)

@overload
def task(
self,
task_name: Callable[_FuncParams, _ReturnType],
**labels: Any,
) -> AioKafkaDecoratedTask[_FuncParams, _ReturnType]: ...

@overload
def task(
self,
task_name: str | None = None,
**labels: Any,
) -> Callable[
[Callable[_FuncParams, _ReturnType]],
AioKafkaDecoratedTask[_FuncParams, _ReturnType],
]: ...

def task(
self,
task_name: str | Callable[..., Any] | None = None,
**labels: Any,
) -> Any:
"""Decorate function."""
if callable(task_name):
return super().task(task_name, **labels)

return super().task(
task_name=task_name,
**labels,
)

@overload
def task_with_topic(
self,
topic: TopicType,
task_name: Callable[_FuncParams, _ReturnType],
**labels: Any,
) -> AioKafkaDecoratedTask[_FuncParams, _ReturnType]: ...

@overload
def task_with_topic(
self,
topic: TopicType,
task_name: str | None = None,
**labels: Any,
) -> Callable[
[Callable[_FuncParams, _ReturnType]],
AioKafkaDecoratedTask[_FuncParams, _ReturnType],
]: ...

def task_with_topic(
self,
topic: TopicType,
task_name: str | Callable[..., Any] | None = None,
**labels: Any,
) -> Any:
"""Decorate function and bind it to a kafka topic by default."""
topic_name = get_topic_name(topic)
self._kafka_topics.setdefault(topic_name, topic)
labels[self.task_topic_label] = topic_name

if callable(task_name):
return super().task(task_name, **labels)

return super().task(
task_name=task_name,
**labels,
)

async def startup(self) -> None:
"""Setup AIOKafkaProducer, AIOKafkaConsumer and kafka topics.

We will have 2 topics for default and high priority.

Also we need to create AIOKafkaProducer and AIOKafkaConsumer
if there are no producer and consumer passed.
"""
await super().startup()
available_condition: bool = (
self._kafka_topic.name not in self._kafka_admin_client.list_topics()
)
if available_condition:
existed_topic_names = set(self._kafka_admin_client.list_topics())

new_topics = []
for topic in self._kafka_topics.values():
new_topic = self._get_declaration_topic(topic)
if new_topic is not None and new_topic.name not in existed_topic_names:
new_topics.append(new_topic)

if new_topics:
self._kafka_admin_client.create_topics(
new_topics=[self._kafka_topic],
new_topics=new_topics,
validate_only=False,
)

@@ -145,7 +275,7 @@ async def startup(self) -> None:
partition_assignment_strategy
)
self._aiokafka_consumer = AIOKafkaConsumer(
self._kafka_topic.name,
*self._kafka_topics,
bootstrap_servers=self._bootstrap_servers,
loop=self._loop,
**consumer_kwargs,
@@ -166,18 +296,16 @@ async def shutdown(self) -> None:
if self._is_consumer_started:
await self._aiokafka_consumer.stop()

topic_delete_condition: bool = all(
(
self._delete_topic_on_shutdown,
self._kafka_topic.name in self._kafka_admin_client.list_topics(),
),
)

if self._kafka_admin_client:
if topic_delete_condition:
self._kafka_admin_client.delete_topics(
[self._kafka_topic.name],
)
if self._delete_topic_on_shutdown:
existed_topic_names = set(self._kafka_admin_client.list_topics())
topic_names = [
topic_name
for topic_name in self._kafka_topics
if topic_name in existed_topic_names
]
if topic_names:
self._kafka_admin_client.delete_topics(topic_names)
self._kafka_admin_client.close()

async def kick(self, message: BrokerMessage) -> None:
@@ -194,7 +322,10 @@ async def kick(self, message: BrokerMessage) -> None:
if not self._is_producer_started:
raise ValueError("Please run startup before kicking.")

topic_name: str = self._kafka_topic.name
topic_name: str = message.labels.get(
self.task_topic_label,
self._kafka_topic.name,
)

await self._aiokafka_producer.send(
topic=topic_name,
3 changes: 3 additions & 0 deletions taskiq_aio_kafka/constants.py
@@ -0,0 +1,3 @@
__all__ = ("TASK_TOPIC_LABEL",)

TASK_TOPIC_LABEL = "taskiq_aio_kafka_topic"
24 changes: 24 additions & 0 deletions taskiq_aio_kafka/decorated_task.py
@@ -0,0 +1,24 @@
__all__ = ("AioKafkaDecoratedTask",)

from typing import TypeVar

from taskiq.decor import AsyncTaskiqDecoratedTask
from typing_extensions import ParamSpec

from .kicker import AioKafkaKicker

_FuncParams = ParamSpec("_FuncParams")
_ReturnType = TypeVar("_ReturnType")


class AioKafkaDecoratedTask(AsyncTaskiqDecoratedTask[_FuncParams, _ReturnType]):
    """Taskiq decorated task with kafka-specific kicker."""

    def kicker(self) -> AioKafkaKicker[_FuncParams, _ReturnType]:
        """Return kafka-aware kicker."""
        return AioKafkaKicker(
            task_name=self.task_name,
            broker=self.broker,
            labels=self.labels,
            return_type=self.return_type,
        )