Celery integration for HawkAPI. Async tasks, beat scheduler, request-context propagation, broker/worker healthchecks, and eager-mode fixtures for tests.
pip install hawkapi-celery
pip install 'hawkapi-celery[redis]' # adds redis clientfrom hawkapi import Depends, HawkAPI
from celery import Celery
from hawkapi_celery import (
CeleryConfig, bind_context, get_celery, init_celery, task,
)
celery_app: Celery # populated below
def make_app() -> HawkAPI:
app = HawkAPI()
global celery_app
celery_app = init_celery(
app,
config=CeleryConfig(
broker_url="redis://localhost:6379/0",
result_backend="redis://localhost:6379/0",
),
)
@task(celery_app, name="emails.send")
async def send_email(to: str, subject: str, body: str) -> None:
... # any await-able send logic
@app.post("/notify")
async def notify(email: str, c: Celery = Depends(get_celery)):
with bind_context(request_id="…"):
send_email.delay(email, "Welcome", "Hello!")
return {"ok": True}
return appfrom hawkapi_celery import task
@task(celery_app, name="myapp.work", queue="default",
autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=5)
async def work(x: int) -> int: # async def — runs on a private event loop
...
return x * 2
@task(celery_app, bind=True)
def slow_work(self, payload): # sync — bound `self` for retry handling
try:
do_thing(payload)
except TransientError as exc:
raise self.retry(exc=exc, countdown=compute_backoff(self.request.retries))from datetime import timedelta
from hawkapi_celery import Periodic, add_periodic, crontab, every
add_periodic(celery_app, "cleanup",
Periodic(task="myapp.cleanup", schedule=every(timedelta(hours=1))))
add_periodic(celery_app, "nightly_report",
Periodic(task="myapp.report", schedule=crontab(hour=2, minute=0),
kwargs={"date": "yesterday"}))bind_context() carries a dict from the HTTP handler to the worker process via the task headers. Inside the task call current_context() to read it back.
from hawkapi_celery import bind_context, current_context
@task(celery_app, name="log.event")
def log_event(payload: dict) -> None:
ctx = current_context() # {"request_id": "…", "user_id": "…"}
log.info("event", **ctx, **payload)
@app.post("/event")
async def post_event(p: Payload):
with bind_context(request_id=p.request_id, user_id=p.user_id):
log_event.delay(p.model_dump())Wired automatically by init_celery(..., propagate_context=True) (default).
from hawkapi_celery import healthcheck
@app.get("/healthz")
async def healthz():
report = healthcheck(celery_app, timeout=2.0)
return {
"broker": report.broker_ok,
"workers_alive": report.workers_alive,
"workers": list(report.workers),
}from hawkapi_celery import eager_mode, record_tasks
def test_signup_enqueues_welcome_email(client, celery_app):
with record_tasks(celery_app) as recorder:
client.post("/signup", json={"email": "x@y.z"})
assert any(t.name == "emails.send" for t in recorder.captured)
def test_signup_runs_welcome_email_inline(client, celery_app):
with eager_mode(celery_app):
client.post("/signup", json={"email": "x@y.z"})
# All tasks executed synchronously in-process — assert their side-effects directly.CeleryConfig(
broker_url="redis://localhost:6379/0",
result_backend="redis://localhost:6379/0",
task_serializer="json",
timezone="UTC",
task_time_limit=600,
task_soft_time_limit=540,
worker_prefetch_multiplier=1,
worker_max_tasks_per_child=1000,
task_default_queue="default",
extra_kwargs={...}, # forwarded to celery.conf.update
)git clone https://github.com/Hawk-API/hawkapi-celery.git
cd hawkapi-celery
uv sync --extra dev
uv run pytest -q
uv run ruff check . && uv run ruff format --check .
uv run pyright src/MIT.