hawkapi-celery

Celery integration for HawkAPI: async tasks, a beat scheduler, request-context propagation, broker/worker healthchecks, and eager-mode fixtures for tests.

Install

pip install hawkapi-celery
pip install 'hawkapi-celery[redis]'  # also installs the Redis client

Quickstart

from hawkapi import Depends, HawkAPI
from celery import Celery
from hawkapi_celery import (
    CeleryConfig, bind_context, get_celery, init_celery, task,
)


celery_app: Celery  # populated below


def make_app() -> HawkAPI:
    app = HawkAPI()
    global celery_app
    celery_app = init_celery(
        app,
        config=CeleryConfig(
            broker_url="redis://localhost:6379/0",
            result_backend="redis://localhost:6379/0",
        ),
    )


    @task(celery_app, name="emails.send")
    async def send_email(to: str, subject: str, body: str) -> None:
        ...  # any await-able send logic


    @app.post("/notify")
    async def notify(email: str, c: Celery = Depends(get_celery)):
        with bind_context(request_id="…"):
            send_email.delay(email, "Welcome", "Hello!")
        return {"ok": True}

    return app

Tasks

from hawkapi_celery import task

@task(celery_app, name="myapp.work", queue="default",
      autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
async def work(x: int) -> int:        # async def — runs on a private event loop
    ...
    return x * 2


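# do_thing, TransientError, and compute_backoff are application-defined placeholders.
@task(celery_app, bind=True)
def slow_work(self, payload):          # sync; bound `self` gives access to retry handling
    try:
        do_thing(payload)
    except TransientError as exc:
        # self.retry() raises a Retry exception; re-raising it re-queues the task.
        raise self.retry(exc=exc, countdown=compute_backoff(self.request.retries))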
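
The comment on `work` above says an async task runs on a private event loop. The following is a minimal standard-library sketch of that idea, not the library's actual implementation: the helper name `run_on_private_loop` is hypothetical, and it assumes the caller is synchronous code with no event loop already running.

```python
import asyncio
import functools
from typing import Any, Awaitable, Callable


def run_on_private_loop(fn: Callable[..., Awaitable[Any]]) -> Callable[..., Any]:
    """Wrap a coroutine function so synchronous code can call it.

    Hypothetical helper (not part of hawkapi-celery): every call gets a
    fresh event loop that is closed again once the coroutine finishes.
    """

    @functools.wraps(fn)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        # asyncio.run() creates a new loop, runs the coroutine to
        # completion, and closes the loop before returning the result.
        return asyncio.run(fn(*args, **kwargs))

    return wrapper


@run_on_private_loop
async def work(x: int) -> int:
    await asyncio.sleep(0)  # stand-in for real awaitable I/O
    return x * 2


result = work(21)
print(result)
```

Because each call owns its loop, objects tied to a loop (connections, locks) cannot be shared between calls in this sketch; they would need to be created inside the coroutine.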

Beat (periodic tasks)

from datetime import timedelta
from hawkapi_celery import Periodic, add_periodic, crontab, every

add_periodic(celery_app, "cleanup",
             Periodic(task="myapp.cleanup", schedule=every(timedelta(hours=1))))

add_periodic(celery_app, "nightly_report",
             Periodic(task="myapp.report", schedule=crontab(hour=2, minute=0),
                      kwargs={"date": "yesterday"}))
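
For orientation, plain Celery stores periodic tasks in its `beat_schedule` setting as a dict of named entries with `task`, `schedule`, `args`, and `kwargs` keys. The sketch below shows how a `Periodic`-like record could map onto that shape. `PeriodicSketch` and `to_beat_entry` are hypothetical names for illustration; how `add_periodic` actually registers entries is not stated here.

```python
from dataclasses import dataclass, field
from datetime import timedelta
from typing import Any


@dataclass(frozen=True)
class PeriodicSketch:
    """Hypothetical stand-in for hawkapi_celery.Periodic."""

    task: str
    schedule: Any  # a timedelta, a number of seconds, or a crontab object
    args: tuple = ()
    kwargs: dict = field(default_factory=dict)


def to_beat_entry(p: PeriodicSketch) -> dict:
    """Build one entry in the shape Celery's beat_schedule setting uses."""
    return {
        "task": p.task,
        "schedule": p.schedule,
        "args": p.args,
        "kwargs": dict(p.kwargs),  # copy so later edits do not leak back
    }


beat_schedule = {
    "cleanup": to_beat_entry(
        PeriodicSketch(task="myapp.cleanup", schedule=timedelta(hours=1))
    ),
}
print(beat_schedule["cleanup"]["schedule"].total_seconds())
```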

Context propagation

bind_context() carries a dict of keyword values from the HTTP handler to the worker process via the task headers. Inside the task, call current_context() to read it back.

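from hawkapi_celery import bind_context, current_context, task

@task(celery_app, name="log.event")
def log_event(payload: dict) -> None:
    ctx = current_context()                  # {"request_id": "…", "user_id": "…"}
    # `log` is assumed to be an application logger that accepts keyword fields
    # (structlog-style); ctx and payload must not share keys.
    log.info("event", **ctx, **payload)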


@app.post("/event")
async def post_event(p: Payload):
    with bind_context(request_id=p.request_id, user_id=p.user_id):
        log_event.delay(p.model_dump())

Context propagation is wired automatically by init_celery(..., propagate_context=True), which is the default.

Healthchecks

from hawkapi_celery import healthcheck


@app.get("/healthz")
async def healthz():
    report = healthcheck(celery_app, timeout=2.0)
    return {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }
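
The handler above is `async def` but calls `healthcheck` directly. If `healthcheck` blocks while waiting up to `timeout` seconds for replies (an assumption; the README does not say), running it in a worker thread keeps the event loop free. The sketch below uses a stand-in `blocking_healthcheck` and a hypothetical `ReportSketch` rather than the real API; the type of `workers_alive` is also assumed.

```python
import asyncio
import time
from dataclasses import dataclass


@dataclass(frozen=True)
class ReportSketch:
    """Hypothetical stand-in for the report returned by healthcheck()."""

    broker_ok: bool
    workers_alive: int  # assumed to be a count; the real type is not documented here
    workers: tuple


def blocking_healthcheck(timeout: float) -> ReportSketch:
    """Stand-in for a synchronous check that waits on the broker."""
    time.sleep(min(timeout, 0.05))  # simulate network wait
    return ReportSketch(broker_ok=True, workers_alive=1, workers=("worker@host",))


async def healthz() -> dict:
    # asyncio.to_thread runs the blocking call in a thread and awaits its result,
    # so other requests keep being served in the meantime.
    report = await asyncio.to_thread(blocking_healthcheck, 2.0)
    return {
        "broker": report.broker_ok,
        "workers_alive": report.workers_alive,
        "workers": list(report.workers),
    }


body = asyncio.run(healthz())
print(body)
```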

Testing

from hawkapi_celery import eager_mode, record_tasks


def test_signup_enqueues_welcome_email(client, celery_app):
    with record_tasks(celery_app) as recorder:
        client.post("/signup", json={"email": "x@y.z"})
    assert any(t.name == "emails.send" for t in recorder.captured)


def test_signup_runs_welcome_email_inline(client, celery_app):
    with eager_mode(celery_app):
        client.post("/signup", json={"email": "x@y.z"})
    # All tasks executed synchronously in-process — assert their side-effects directly.

CeleryConfig

from hawkapi_celery import CeleryConfig

config = CeleryConfig(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    task_serializer="json",
    timezone="UTC",
    task_time_limit=600,
    task_soft_time_limit=540,
    worker_prefetch_multiplier=1,
    worker_max_tasks_per_child=1000,
    task_default_queue="default",
    extra_kwargs={...},          # forwarded to celery.conf.update
)
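
The `extra_kwargs` comment above says the extras are forwarded to Celery's configuration. A minimal sketch of how such a config object could be flattened into one dict for `conf.update(...)` follows. `ConfigSketch` and `to_celery_conf` are hypothetical, and the precedence rule (extras override named fields) is an assumption, not documented behavior. The check on the two time limits reflects how Celery uses them: the soft limit raises an exception inside the task so it can clean up before the hard limit terminates it.

```python
from dataclasses import asdict, dataclass, field


@dataclass
class ConfigSketch:
    """Hypothetical, trimmed-down model of CeleryConfig."""

    broker_url: str
    result_backend: str
    task_time_limit: int = 600
    task_soft_time_limit: int = 540
    extra_kwargs: dict = field(default_factory=dict)

    def to_celery_conf(self) -> dict:
        """Flatten named fields and extras into one settings dict."""
        if self.task_soft_time_limit >= self.task_time_limit:
            raise ValueError("task_soft_time_limit should be below task_time_limit")
        conf = asdict(self)
        extra = conf.pop("extra_kwargs")
        conf.update(extra)  # assumption: extras win over named fields
        return conf


conf = ConfigSketch(
    broker_url="redis://localhost:6379/0",
    result_backend="redis://localhost:6379/0",
    extra_kwargs={"task_acks_late": True},
).to_celery_conf()
print(sorted(conf))
```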

Development

git clone https://github.com/Hawk-API/hawkapi-celery.git
cd hawkapi-celery
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/

License

MIT.
