From 216486e0506e394a09ec14be18e1a85ee61ccc6a Mon Sep 17 00:00:00 2001 From: Sagar Ghimire Date: Wed, 27 May 2026 15:02:40 +0545 Subject: [PATCH 1/4] =?UTF-8?q?Docs=20(openapi):=20enrich=20/docs=20?= =?UTF-8?q?=E2=80=94=20metadata,=20Authorize=20button,=20param=20+=20error?= =?UTF-8?q?=20docs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The interactive docs were running on FastAPI defaults. This fills them in: App metadata: title, version (from package metadata), summary, a markdown description (envelope shape, auth model, streaming/pagination), tag descriptions, and contact — so the /docs landing page is no longer bare. Security: declare the `Authorization` header as an APIKeyHeader security scheme instead of a raw Header param. Swagger now shows an 'Authorize' button (set the token once) and the header stops leaking as a parameter on every operation. Params + examples: every request field gets a description; query params on datastore_search (filters/q/sort/fields/…) are documented; create/upsert/delete bodies carry worked examples so 'Try it out' is pre-filled. Errors: add an ErrorEnvelope model and attach the real CKAN 4xx/5xx envelope (400/403/404/409/500) to every action via a shared ERROR_RESPONSES; document /ready's 503 and dump's 302; strip FastAPI's misleading auto-422 (validation errors are remapped to a 400 envelope). Human operation summaries replace the machine-derived ones (e.g. 'Datastore Search Sql'). Also: set a dummy CKAN_URL in conftest so `create_app()` builds under the default ckan auth in CI (same import-time fix as the schema-in-OPTIONS PR; harmless when both land). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- datastore/api/context.py | 20 ++- datastore/api/endpoints/datastore.py | 44 ++++- datastore/api/endpoints/dump.py | 17 +- datastore/api/endpoints/health.py | 15 +- datastore/api/responses.py | 13 ++ datastore/main.py | 92 +++++++++- datastore/schemas/request.py | 252 ++++++++++++++++++++++----- datastore/schemas/responses.py | 33 ++++ tests/conftest.py | 5 + 9 files changed, 434 insertions(+), 57 deletions(-) diff --git a/datastore/api/context.py b/datastore/api/context.py index 0882806..60e55b3 100644 --- a/datastore/api/context.py +++ b/datastore/api/context.py @@ -3,7 +3,8 @@ from dataclasses import dataclass, field from typing import Annotated, Any -from fastapi import Depends, Header +from fastapi import Depends +from fastapi.security import APIKeyHeader from starlette.requests import Request from datastore.api import auth as auth_fns @@ -15,6 +16,21 @@ ConfigDep = Annotated[Config, Depends(get_config)] +# Declared as a security scheme (not a plain `Header`) so OpenAPI renders +# an "Authorize" button and the token is set once for every operation +# instead of leaking as a parameter on each. `auto_error=False` keeps the +# header optional — `anonymous` auth needs no token. +_auth_header = APIKeyHeader( + name="Authorization", + scheme_name="Authorization", + auto_error=False, + description=( + "API token, per the active `AUTH_TYPE`: a CKAN API key (`ckan`) or a " + "signed JWT (`jwt`). Accepts a raw token or `Bearer <token>`. Not " + "required under `anonymous`." + ), +) + def get_ckan_client(request: Request) -> CKANClient | None: """CKAN client installed by the app lifespan in `request.app.state.ckan`. 
@@ -74,7 +90,7 @@ def get_context( config: ConfigDep, ckan: Annotated[CKANClient | None, Depends(get_ckan_client)], provider: Annotated[AuthProvider, Depends(get_auth_provider)], - authorization: Annotated[str | None, Header(alias="Authorization")] = None, + authorization: Annotated[str | None, Depends(_auth_header)] = None, ) -> RequestContext: api_key = parse_authorization_header(authorization) return RequestContext( diff --git a/datastore/api/endpoints/datastore.py b/datastore/api/endpoints/datastore.py index eca9dc0..57449f4 100644 --- a/datastore/api/endpoints/datastore.py +++ b/datastore/api/endpoints/datastore.py @@ -7,7 +7,11 @@ from starlette.responses import StreamingResponse from datastore.api.context import Context -from datastore.api.responses import _deprecation_warnings, _success_response +from datastore.api.responses import ( + ERROR_RESPONSES, + _deprecation_warnings, + _success_response, +) from datastore.core.exceptions import ValidationError from datastore.schemas.request import ( DatastoreCreateRequest, @@ -35,10 +39,14 @@ upsert_datastore, ) -router = APIRouter(tags=["datastore"]) +router = APIRouter(tags=["datastore"], responses=ERROR_RESPONSES) -@router.post("/datastore_create", response_model=DatastoreCreateResponse) +@router.post( + "/datastore_create", + response_model=DatastoreCreateResponse, + summary="Declare a resource (and optionally seed rows)", +) async def datastore_create( request: Request, payload: DatastoreCreateRequest, @@ -79,7 +87,11 @@ async def datastore_create( return _success_response(request, result, warnings=warnings or None) -@router.post("/datastore_upsert", response_model=DatastoreUpsertResponse) +@router.post( + "/datastore_upsert", + response_model=DatastoreUpsertResponse, + summary="Insert / update / upsert rows", +) async def datastore_upsert( request: Request, payload: DatastoreUpsertRequest, @@ -95,7 +107,11 @@ async def datastore_upsert( return _success_response(request, result) 
-@router.get("/datastore_search", response_model=DatastoreSearchResponse) +@router.get( + "/datastore_search", + response_model=DatastoreSearchResponse, + summary="Search a resource (streaming)", +) async def datastore_search( request: Request, context: Context, @@ -119,7 +135,11 @@ async def datastore_search( return StreamingResponse(body_iter, media_type="application/json") -@router.get("/datastore_search_sql", response_model=DatastoreSearchResponse) +@router.get( + "/datastore_search_sql", + response_model=DatastoreSearchResponse, + summary="Run a read-only SQL SELECT (streaming)", +) async def datastore_search_sql( request: Request, context: Context, @@ -141,7 +161,11 @@ async def datastore_search_sql( return StreamingResponse(body_iter, media_type="application/json") -@router.get("/datastore_info", response_model=DatastoreInfoResponse) +@router.get( + "/datastore_info", + response_model=DatastoreInfoResponse, + summary="Get a resource's schema + row stats", +) async def datastore_info( request: Request, context: Context, @@ -163,7 +187,11 @@ async def datastore_info( return _success_response(request, result) -@router.post("/datastore_delete", response_model=DatastoreDeleteResponse) +@router.post( + "/datastore_delete", + response_model=DatastoreDeleteResponse, + summary="Delete rows, drop columns, or drop the table", +) async def datastore_delete( request: Request, payload: DatastoreDeleteRequest, diff --git a/datastore/api/endpoints/dump.py b/datastore/api/endpoints/dump.py index 1da8eec..df4c6b8 100644 --- a/datastore/api/endpoints/dump.py +++ b/datastore/api/endpoints/dump.py @@ -21,6 +21,7 @@ from starlette.responses import RedirectResponse, StreamingResponse from datastore.api.context import Context +from datastore.api.responses import ERROR_RESPONSES from datastore.infrastructure.engines import get_datastore_engine from datastore.services.dump import stream_csv_shards, stream_ndjson_shards @@ -32,15 +33,27 @@ "parquet": "application/vnd.apache.parquet", 
} -router = APIRouter(tags=["dump"]) +router = APIRouter(tags=["dump"], responses=ERROR_RESPONSES) -@router.get("/datastore/dump/{resource_id}") +@router.get( + "/datastore/dump/{resource_id}", + summary="Download a whole resource", + responses={ + 302: {"description": "Single-shard export — redirect to a signed GCS URL."}, + 200: {"description": "Multi-shard export — streamed CSV / NDJSON body."}, + }, +) async def dump( context: Context, resource_id: str, fmt: Annotated[DumpFormat, Query(alias="format")] = "csv", ): + """Download an entire resource as `csv` (default), `ndjson`, or `parquet`. + + Small exports redirect (302) straight to a signed GCS URL; large ones + stream a concatenated body. Select the format with `?format=`. + """ await context.authorize(resource_id=resource_id, permission="read") engine = get_datastore_engine(context, mode="ro") diff --git a/datastore/api/endpoints/health.py b/datastore/api/endpoints/health.py index 6142fae..f5fe277 100644 --- a/datastore/api/endpoints/health.py +++ b/datastore/api/endpoints/health.py @@ -17,7 +17,9 @@ probe_router = APIRouter(tags=["health"]) -@welcome_router.get("/", response_model=WelcomeResponse) +@welcome_router.get( + "/", response_model=WelcomeResponse, summary="Service welcome message" +) def welcome(request: Request): return _success_response( request, @@ -25,13 +27,20 @@ def welcome(request: Request): ) -@probe_router.get("/health", response_model=StatusResponse) +@probe_router.get( + "/health", response_model=StatusResponse, summary="Liveness probe" +) def health(request: Request): """Liveness — always 200 while the process is up.""" return _success_response(request, StatusResponse.Result(status="ok")) -@probe_router.get("/ready", response_model=StatusResponse) +@probe_router.get( + "/ready", + response_model=StatusResponse, + summary="Readiness probe", + responses={503: {"model": StatusResponse, "description": "One or more engines unavailable"}}, +) def ready(request: Request): """Readiness — 200 
when both rw and ro engines pass `healthcheck()`, 503 otherwise. Probes both modes because the credential split means diff --git a/datastore/api/responses.py b/datastore/api/responses.py index 0269e43..6f9b748 100644 --- a/datastore/api/responses.py +++ b/datastore/api/responses.py @@ -7,6 +7,19 @@ from starlette.requests import Request from starlette.responses import JSONResponse +from datastore.schemas.responses import ErrorEnvelope + +# Shared OpenAPI doc for the CKAN error envelope, attached at router level +# so every action documents the real 4xx / 5xx shape instead of FastAPI's +# default 422. The taxonomy matches `core.exceptions.APIError`. +ERROR_RESPONSES: dict[int, dict[str, Any]] = { + 400: {"model": ErrorEnvelope, "description": "Validation Error"}, + 403: {"model": ErrorEnvelope, "description": "Authorization Error"}, + 404: {"model": ErrorEnvelope, "description": "Not Found Error"}, + 409: {"model": ErrorEnvelope, "description": "Conflict Error"}, + 500: {"model": ErrorEnvelope, "description": "Internal Error"}, +} + def _orjson_default(obj: Any) -> Any: if hasattr(obj, "model_dump"): diff --git a/datastore/main.py b/datastore/main.py index 1bbb6f0..2dff2bb 100644 --- a/datastore/main.py +++ b/datastore/main.py @@ -1,8 +1,10 @@ from __future__ import annotations +import importlib.metadata import logging from collections.abc import AsyncIterator from contextlib import AsyncExitStack, asynccontextmanager +from typing import Any import httpx from fastapi import FastAPI @@ -23,6 +25,85 @@ log = logging.getLogger("uvicorn.error") +API_DESCRIPTION = """\ +A **CKAN-compatible datastore API** — tabular CRUD + search over a pluggable +storage backend (BigQuery today; DuckLake planned). + +### Response envelope +Every `/api/3/action/*` response uses the CKAN envelope: + +```json +{ "help": "", "success": true, "result": { ... 
} } +``` + +On failure `success` is `false` and `error` carries a `__type` label +(`Validation Error` · `Authorization Error` · `Not Found Error` · +`Conflict Error` · `Internal Error`) plus a human `message`. + +### Authentication +Send your token in the **`Authorization`** header — click **Authorize** above +to set it once for every call. The active provider is chosen by `AUTH_TYPE` +(`ckan` / `jwt` / `anonymous`); under `anonymous` no header is required. + +### Search & streaming +`datastore_search` and `datastore_search_sql` **stream** their response +(peak memory ≈ one row, regardless of result size) and support +`records_format` = `objects` · `lists` · `csv` · `tsv`. Page through results +with the `_links.next` URL returned in `result`. +""" + +OPENAPI_TAGS = [ + { + "name": "datastore", + "description": ( + "CKAN `datastore_*` actions — create, upsert, delete, search, " + "search_sql, and info." + ), + }, + { + "name": "health", + "description": ( + "Liveness (`/health`) and readiness (`/ready`) probes for " + "orchestration." + ), + }, + { + "name": "dump", + "description": "Bulk download of an entire resource (CSV / JSON / Parquet).", + }, +] + + +def _api_version() -> str: + """Installed package version, so `/docs` tracks releases automatically.""" + try: + return importlib.metadata.version("datastore") + except importlib.metadata.PackageNotFoundError: + return "0.0.0" + + +def _strip_default_422(app: FastAPI) -> None: + """Drop FastAPI's auto-generated 422 from the schema. + + `RequestValidationError` is remapped to a 400 CKAN error envelope (see + `error_handlers`), so a documented 422 never actually occurs — the real + 4xx shapes are declared via `ERROR_RESPONSES`. 
+ """ + default_openapi = app.openapi + + def openapi() -> dict[str, Any]: + schema = default_openapi() + for path_item in schema.get("paths", {}).values(): + for operation in path_item.values(): + if isinstance(operation, dict): + operation.get("responses", {}).pop("422", None) + components = schema.get("components", {}).get("schemas", {}) + components.pop("HTTPValidationError", None) + components.pop("ValidationError", None) + return schema + + app.openapi = openapi # type: ignore[method-assign] + @asynccontextmanager async def lifespan(app: FastAPI) -> AsyncIterator[None]: @@ -69,7 +150,15 @@ async def lifespan(app: FastAPI) -> AsyncIterator[None]: def create_app() -> FastAPI: config = get_config() app = FastAPI( - title=config.APP_MESSAGE, + title="Datastore API", + version=_api_version(), + summary=( + "CKAN-compatible tabular CRUD + search over a pluggable storage " + "backend." + ), + description=API_DESCRIPTION, + openapi_tags=OPENAPI_TAGS, + contact={"name": "Datopian", "url": "https://github.com/datopian/datastore"}, lifespan=lifespan, default_response_class=ORJSONResponse, ) @@ -82,6 +171,7 @@ def create_app() -> FastAPI: register_exception_handlers(app) app.include_router(api_router) + _strip_default_422(app) return app diff --git a/datastore/schemas/request.py b/datastore/schemas/request.py index b14bc24..030d48b 100644 --- a/datastore/schemas/request.py +++ b/datastore/schemas/request.py @@ -39,26 +39,81 @@ class DatastoreCreateRequest(BaseModel): `primaryKey` is the single source of truth for the unique key. 
""" - model_config = ConfigDict(extra="forbid") - - resource_id: str | None = None - resource: dict[str, Any] | None = None + model_config = ConfigDict( + extra="forbid", + json_schema_extra={ + "examples": [ + { + "resource_id": "balancing_auction_results_2025", + "schema": { + "fields": [ + {"name": "auction_id", "type": "integer"}, + {"name": "product_code", "type": "string"}, + {"name": "clearing_price_gbp_per_mwh", "type": "number"}, + ], + "primaryKey": ["auction_id", "product_code"], + }, + "records": [ + { + "auction_id": 144, + "product_code": "DCL", + "clearing_price_gbp_per_mwh": 47.82, + } + ], + } + ] + }, + ) + + resource_id: str | None = Field( + default=None, + description="Target table name. Provide this **or** `resource`, not both.", + ) + resource: dict[str, Any] | None = Field( + default=None, + description=( + "CKAN resource dict — materialises a CKAN resource first, then the " + "table. Only valid under `AUTH_TYPE=ckan`." + ), + ) # `deprecated=` must ride on `Annotated` metadata, not as a `Field()` # default — Pydantic silently drops it on union- / Annotated-aliased # fields when supplied via `Field(default=..., deprecated=...)`. fields: Annotated[ list[FieldSpec] | None, - Field(deprecated="use 'schema' (Frictionless Table Schema) instead"), + Field( + deprecated="use 'schema' (Frictionless Table Schema) instead", + description="Legacy column list `[{id, type, info}]`. Prefer `schema`.", + ), ] = None - schema: dict[str, Any] | None = None + schema: dict[str, Any] | None = Field( + default=None, + description=( + "Frictionless Table Schema: " + "`{fields: [{name, type, ...}], primaryKey: [...]}`. The native " + "shape; `primaryKey` is the source of truth for the unique key." + ), + ) primary_key: Annotated[ StringOrList, - Field(deprecated="use 'schema.primaryKey' instead"), + Field( + deprecated="use 'schema.primaryKey' instead", + description="Legacy unique key. 
Prefer `schema.primaryKey`.", + ), ] = None - records: list[dict[str, Any]] | None = None - include_records: bool = False - include_total: bool = False - force: bool | None = None + records: list[dict[str, Any]] | None = Field( + default=None, description="Optional rows to seed the new table with." + ) + include_records: bool = Field( + default=False, description="Echo the written rows back in `result.records`." + ) + include_total: bool = Field( + default=False, + description="Run `COUNT(*)` after the write and return `result.total`.", + ) + force: bool | None = Field( + default=None, description="Bypass optional client-side guards (reserved)." + ) _check_schema = field_validator("schema")(validate_frictionless_schema) @@ -113,14 +168,49 @@ def _build_canonical_schema(self) -> DatastoreCreateRequest: class DatastoreUpsertRequest(BaseModel): """Request body for `POST /api/3/datastore_upsert`.""" - model_config = ConfigDict(extra="forbid") - - resource_id: str - records: list[dict[str, Any]] | None = None - method: UpsertMethod = "upsert" - include_records: bool = False - include_total: bool = False - force: bool = False + model_config = ConfigDict( + extra="forbid", + json_schema_extra={ + "examples": [ + { + "resource_id": "balancing_auction_results_2025", + "method": "upsert", + "records": [ + { + "auction_id": 144, + "product_code": "DCL", + "clearing_price_gbp_per_mwh": 48.05, + } + ], + } + ] + }, + ) + + resource_id: str = Field( + description="Target table — must already exist (call `datastore_create` first)." + ) + records: list[dict[str, Any]] | None = Field( + default=None, description="Rows to write." + ) + method: UpsertMethod = Field( + default="upsert", + description=( + "`upsert` — MERGE on the table's `primaryKey` (match→update, " + "miss→insert); `insert` — append, no key check; `update` — every " + "row must match an existing key (else 404)." 
+ ), + ) + include_records: bool = Field( + default=False, description="Echo the written rows back in `result.records`." + ) + include_total: bool = Field( + default=False, + description="Run `COUNT(*)` after the write and return `result.total`.", + ) + force: bool = Field( + default=False, description="Bypass optional client-side guards (reserved)." + ) class DatastoreSearchRequest(BaseModel): @@ -144,20 +234,59 @@ class DatastoreSearchRequest(BaseModel): model_config = ConfigDict(extra="forbid") - resource_id: str - filters: str | None = None - q: str | None = None - distinct: bool = False - plain: bool = True - language: str = "english" + resource_id: str = Field(description="Resource (table) to search.") + filters: str | None = Field( + default=None, + description=( + "JSON object of `column → value` (or `column → [values]` for IN). " + "Matches rows where every pair holds." + ), + examples=['{"product_code": "DCL", "accepted": true}'], + ) + q: str | None = Field( + default=None, + description=( + "Full-text query. A plain string scans every column; a JSON object " + "`{column: term}` searches per column." + ), + examples=["DCL"], + ) + distinct: bool = Field(default=False, description="Return only distinct rows.") + plain: bool = Field( + default=True, description="Treat `q` as plain text (reserved; CKAN-compat)." + ) + language: str = Field( + default="english", + description="Full-text language (reserved; CKAN-compat).", + ) # Engine enforces `Config.SEARCH_RESULT_ROWS_MAX` (default 32000). # No `le` here so ops can lift the cap via env without a schema change. 
- limit: int = Field(default=100, ge=0) - offset: int = Field(default=0, ge=0) - fields: str | None = None - sort: str | None = None - include_total: bool = True - records_format: RecordsFormat = "objects" + limit: int = Field( + default=100, + ge=0, + description="Max rows to return (capped by `SEARCH_RESULT_ROWS_MAX`).", + ) + offset: int = Field( + default=0, ge=0, description="Rows to skip — pagination offset." + ) + fields: str | None = Field( + default=None, + description="Comma-separated columns to project. Default: all columns.", + examples=["auction_id,product_code,clearing_price_gbp_per_mwh"], + ) + sort: str | None = Field( + default=None, + description='Sort spec: `"col [asc|desc], col2 [asc|desc]"`.', + examples=["delivery_start desc, auction_id asc"], + ) + include_total: bool = Field( + default=True, + description="Include `result.total` (row count). Set `false` to skip the COUNT.", + ) + records_format: RecordsFormat = Field( + default="objects", + description="Shape of `result.records`: `objects` · `lists` · `csv` · `tsv`.", + ) @field_validator("filters") @classmethod @@ -198,7 +327,17 @@ class DatastoreSearchSQLRequest(BaseModel): model_config = ConfigDict(extra="forbid") - sql: str + sql: str = Field( + description=( + "A single read-only `SELECT` / `WITH` statement. Reference " + "resources by their id and include a `LIMIT` (required — used for " + "pagination + the streaming cap)." + ), + examples=[ + 'SELECT * FROM "balancing_auction_results_2025" ' + "WHERE accepted = true LIMIT 100" + ], + ) # Set by `_extract_sql_references` after sql validates. Private so # they're not user-settable from the URL and don't show in OpenAPI; @@ -294,8 +433,12 @@ class DatastoreInfoRequest(BaseModel): model_config = ConfigDict(extra="forbid") - resource_id: str | None = None - id: str | None = None + resource_id: str | None = Field( + default=None, description="Resource (table) to describe." 
+ ) + id: str | None = Field( + default=None, description="CKAN alias for `resource_id`. Send exactly one." + ) @model_validator(mode="after") def _require_resource_id_or_id(self) -> DatastoreInfoRequest: @@ -321,13 +464,40 @@ class DatastoreDeleteRequest(BaseModel): delete when `filters` is set; column drop when `fields` is set. `filters` and `fields` are mutually exclusive.""" - model_config = ConfigDict(extra="forbid") - - resource_id: str | None = None - id: str | None = None - filters: dict[str, Any] | None = None - fields: list[str] | None = None - force: bool = False + model_config = ConfigDict( + extra="forbid", + json_schema_extra={ + "examples": [ + { + "resource_id": "balancing_auction_results_2025", + "filters": {"auction_id": 144, "accepted": False}, + } + ] + }, + ) + + resource_id: str | None = Field( + default=None, description="Resource (table) to delete from / drop." + ) + id: str | None = Field( + default=None, description="CKAN alias for `resource_id`. Send exactly one." + ) + filters: dict[str, Any] | None = Field( + default=None, + description=( + "Delete only rows matching every `column: value` pair. Omit both " + "`filters` and `fields` to drop the whole table." + ), + ) + fields: list[str] | None = Field( + default=None, + description=( + "Drop these columns instead of rows. Mutually exclusive with `filters`." + ), + ) + force: bool = Field( + default=False, description="Required to delete from a CKAN read-only resource." 
+ ) @model_validator(mode="after") def _require_resource_id_or_id(self) -> DatastoreDeleteRequest: diff --git a/datastore/schemas/responses.py b/datastore/schemas/responses.py index a7f5108..04e4bdb 100644 --- a/datastore/schemas/responses.py +++ b/datastore/schemas/responses.py @@ -25,6 +25,39 @@ class ResponseModel(BaseModel): success: bool = True +class ErrorEnvelope(BaseModel): + """CKAN-shaped error body returned for every 4xx / 5xx response.""" + + model_config = ConfigDict( + populate_by_name=True, + json_schema_extra={ + "example": { + "help": "https://example.com/api/3/action/datastore_search", + "success": False, + "error": { + "__type": "Validation Error", + "message": "resource 'foo' is not declared", + }, + } + }, + ) + + class Error(BaseModel): + type_: str = Field( + alias="__type", + description="Error class: `Validation Error` · `Authorization Error` " + "· `Not Found Error` · `Conflict Error` · `Internal Error`.", + ) + message: str = Field(description="Human-readable explanation.") + fields: dict[str, Any] | None = Field( + default=None, description="Per-field detail, present on validation errors." + ) + + help: str + success: bool = False + error: Error + + # --- health ----------------------------------------------------------------- diff --git a/tests/conftest.py b/tests/conftest.py index bcf0889..6c85e6d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,11 @@ ): os.environ[_name] = "" os.environ["BIGQUERY_EXPORT_URL_EXPIRY_HOURS"] = "1" +# `AUTH_TYPE` defaults to `ckan`, whose validator requires a non-empty +# `CKAN_URL`. `create_app()` runs at import (module-level `app`), so give +# it a dummy when the env doesn't carry one — tests override the CKAN +# client via DI, so this URL is never contacted. 
+os.environ.setdefault("CKAN_URL", "http://test-ckan.local") from collections.abc import Iterator # noqa: E402 from typing import Any # noqa: E402 From 89fa3a3a24f23f4a24475857633d1262609b566b Mon Sep 17 00:00:00 2001 From: Sagar Ghimire Date: Wed, 27 May 2026 15:20:07 +0545 Subject: [PATCH 2/4] Docs (openapi): rename tag groups, group delete with the other writes Recapitalise the Swagger tag groups (Health / Datastore / Datastore Download, Health first), move datastore_delete up next to create/upsert, trim the app description, and point contact at datopian.com. --- datastore/api/endpoints/datastore.py | 55 ++++++++++++++-------------- datastore/api/endpoints/dump.py | 2 +- datastore/api/endpoints/health.py | 4 +- datastore/main.py | 50 ++++++------------------- 4 files changed, 43 insertions(+), 68 deletions(-) diff --git a/datastore/api/endpoints/datastore.py b/datastore/api/endpoints/datastore.py index 57449f4..b3567ac 100644 --- a/datastore/api/endpoints/datastore.py +++ b/datastore/api/endpoints/datastore.py @@ -39,7 +39,7 @@ upsert_datastore, ) -router = APIRouter(tags=["datastore"], responses=ERROR_RESPONSES) +router = APIRouter(tags=["Datastore"], responses=ERROR_RESPONSES) @router.post( @@ -107,6 +107,33 @@ async def datastore_upsert( return _success_response(request, result) +@router.post( + "/datastore_delete", + response_model=DatastoreDeleteResponse, + summary="Delete rows, drop columns, or drop the table", +) +async def datastore_delete( + request: Request, + payload: DatastoreDeleteRequest, + context: Context, +): + """`POST /api/3/datastore_delete` — delete rows or drop the table. + + Body: + `resource_id` / `id` (one required) — table to delete from. + `filters` (optional dict) — only rows matching every key/value + pair are deleted. Omit → whole table is dropped. + `force` (optional bool) — required to delete from a CKAN + read-only resource. 
+ + Returns the original `filters` echoed back (CKAN convention) so the + caller can confirm what the server actually applied. + """ + await context.authorize(resource_id=payload.resource_id, permission="delete") + result = await delete_datastore(context, payload.model_dump()) + return _success_response(request, result) + + @router.get( "/datastore_search", response_model=DatastoreSearchResponse, @@ -186,29 +213,3 @@ async def datastore_info( result = await info_datastore(context, params.model_dump()) return _success_response(request, result) - -@router.post( - "/datastore_delete", - response_model=DatastoreDeleteResponse, - summary="Delete rows, drop columns, or drop the table", -) -async def datastore_delete( - request: Request, - payload: DatastoreDeleteRequest, - context: Context, -): - """`POST /api/3/datastore_delete` — delete rows or drop the table. - - Body: - `resource_id` / `id` (one required) — table to delete from. - `filters` (optional dict) — only rows matching every key/value - pair are deleted. Omit → whole table is dropped. - `force` (optional bool) — required to delete from a CKAN - read-only resource. - - Returns the original `filters` echoed back (CKAN convention) so the - caller can confirm what the server actually applied. 
- """ - await context.authorize(resource_id=payload.resource_id, permission="delete") - result = await delete_datastore(context, payload.model_dump()) - return _success_response(request, result) diff --git a/datastore/api/endpoints/dump.py b/datastore/api/endpoints/dump.py index df4c6b8..d326948 100644 --- a/datastore/api/endpoints/dump.py +++ b/datastore/api/endpoints/dump.py @@ -33,7 +33,7 @@ "parquet": "application/vnd.apache.parquet", } -router = APIRouter(tags=["dump"], responses=ERROR_RESPONSES) +router = APIRouter(tags=["Datastore Download"], responses=ERROR_RESPONSES) @router.get( diff --git a/datastore/api/endpoints/health.py b/datastore/api/endpoints/health.py index f5fe277..7b8c888 100644 --- a/datastore/api/endpoints/health.py +++ b/datastore/api/endpoints/health.py @@ -11,10 +11,10 @@ from datastore.infrastructure.engines.registry import get_datastore_engine from datastore.schemas.responses import StatusResponse, WelcomeResponse -welcome_router = APIRouter(tags=["health"]) +welcome_router = APIRouter(tags=["Health"]) -probe_router = APIRouter(tags=["health"]) +probe_router = APIRouter(tags=["Health"]) @welcome_router.get( diff --git a/datastore/main.py b/datastore/main.py index 2dff2bb..7225d36 100644 --- a/datastore/main.py +++ b/datastore/main.py @@ -25,50 +25,25 @@ log = logging.getLogger("uvicorn.error") -API_DESCRIPTION = """\ -A **CKAN-compatible datastore API** — tabular CRUD + search over a pluggable -storage backend (BigQuery today; DuckLake planned). - -### Response envelope -Every `/api/3/action/*` response uses the CKAN envelope: - -```json -{ "help": "", "success": true, "result": { ... } } -``` - -On failure `success` is `false` and `error` carries a `__type` label -(`Validation Error` · `Authorization Error` · `Not Found Error` · -`Conflict Error` · `Internal Error`) plus a human `message`. - -### Authentication -Send your token in the **`Authorization`** header — click **Authorize** above -to set it once for every call. 
The active provider is chosen by `AUTH_TYPE` -(`ckan` / `jwt` / `anonymous`); under `anonymous` no header is required. - -### Search & streaming -`datastore_search` and `datastore_search_sql` **stream** their response -(peak memory ≈ one row, regardless of result size) and support -`records_format` = `objects` · `lists` · `csv` · `tsv`. Page through results -with the `_links.next` URL returned in `result`. -""" OPENAPI_TAGS = [ - { - "name": "datastore", + { + "name": "Health", "description": ( - "CKAN `datastore_*` actions — create, upsert, delete, search, " - "search_sql, and info." + "Liveness (`/health`) and readiness (`/ready`) probes for " + "orchestration." ), }, { - "name": "health", + "name": "Datastore", "description": ( - "Liveness (`/health`) and readiness (`/ready`) probes for " - "orchestration." + "A `datastore` API endpoint - create, upsert, delete, search, " + "search_sql, and info." ), }, + { - "name": "dump", + "name": "Datastore Download", "description": "Bulk download of an entire resource (CSV / JSON / Parquet).", }, ] @@ -153,12 +128,11 @@ def create_app() -> FastAPI: title="Datastore API", version=_api_version(), summary=( - "CKAN-compatible tabular CRUD + search over a pluggable storage " - "backend." + "A Datastore API endpoint for managing tabular data resources. " ), - description=API_DESCRIPTION, + description="", openapi_tags=OPENAPI_TAGS, - contact={"name": "Datopian", "url": "https://github.com/datopian/datastore"}, + contact={"name": "Datopian", "url": "https://www.datopian.com/"}, lifespan=lifespan, default_response_class=ORJSONResponse, ) From 6434a7886af6f9c91157161c38e2f1d58aafaf87 Mon Sep 17 00:00:00 2001 From: Sagar Ghimire Date: Wed, 27 May 2026 16:41:58 +0545 Subject: [PATCH 3/4] Docs: minimal README, add API.md reference, slim CLAUDE.md MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rewrite README to a minimal overview (quick start, config essentials, doc links). 
- Add API.md — full per-endpoint HTTP reference (envelope, auth, errors, examples, Postman link). - Replace CLAUDE.md's request/response contracts section with a pointer to API.md (~951 -> ~540 lines). - Improve the Swagger operation summaries on datastore_search / datastore_search_sql / dump. --- API.md | 421 +++++++++++++++++++++++++++ CLAUDE.md | 415 +------------------------- README.md | 407 ++------------------------ datastore/api/endpoints/datastore.py | 12 +- datastore/api/endpoints/dump.py | 2 +- 5 files changed, 456 insertions(+), 801 deletions(-) create mode 100644 API.md diff --git a/API.md b/API.md new file mode 100644 index 0000000..872224d --- /dev/null +++ b/API.md @@ -0,0 +1,421 @@ +# Datastore API Reference + +A standalone, CKAN-compatible datastore service: tabular CRUD + search over a +pluggable storage backend. Every action lives under `/api/3/action/` and returns +the CKAN envelope, so existing CKAN datastore clients work unchanged — whether +this runs alongside CKAN or independently. + +- **Interactive docs:** `GET /docs` (Swagger UI) · `GET /redoc` · `GET /openapi.json` +- **Postman:** import [postman/collection.json](postman/collection.json) — one worked request per endpoint. + +--- + +## Conventions + +### Response envelope + +Every response is a CKAN envelope. On success: + +```json +{ "help": "", "success": true, "result": { ... } } +``` + +On failure: + +```json +{ + "help": "", + "success": false, + "error": { + "__type": "Validation Error", + "message": "human-readable explanation", + "fields": { "field": ["..."] } + } +} +``` + +`error.fields` is present only on validation errors. `null` values are never +serialised — absent fields are simply omitted. 
+ +### Error types + +| `__type` | HTTP | When | +|---|---|---| +| `Validation Error` | 400 | Bad input — shape, types, unknown column, read-only resource | +| `Authorization Error` | 403 | Caller may not perform the action | +| `Not Found Error` | 404 | Resource not declared | +| `Conflict Error` | 409 | Unsupported in-place change (e.g. narrowing a column type) | +| `Internal Error` | 500 | Backend/transport failure | + +### Authentication + +Send the token in the **`Authorization`** header. The active provider is set by +`AUTH_TYPE`: + +| `AUTH_TYPE` | Behaviour | Token | +|---|---|---| +| `ckan` | Delegates to CKAN `datastore_authorize` (TTL-cached) | CKAN API key | +| `jwt` | Verifies signature + `aud`/`iss`/`exp` locally | signed JWT | +| `anonymous` | Allows everything; no identity | none | + +Read actions (`datastore_search`, `datastore_search_sql`, `datastore_info`) may +be attempted without a token; the provider decides. All write actions require a +token (except under `anonymous`). + +### Endpoints at a glance + +| Method | Path | Summary | +|---|---|---| +| POST | `/api/3/action/datastore_create` | Declare a resource (and optionally seed rows) | +| POST | `/api/3/action/datastore_upsert` | Insert / update / upsert rows | +| POST | `/api/3/action/datastore_delete` | Delete rows, drop columns, or drop the table | +| GET | `/api/3/action/datastore_search` | Search a resource (streaming) | +| GET | `/api/3/action/datastore_search_sql` | Run a read-only SQL `SELECT` (streaming) | +| GET | `/api/3/action/datastore_info` | Schema + row stats for a resource | +| GET | `/datastore/dump/{resource_id}` | Download a whole resource (CSV/NDJSON/Parquet) | +| GET | `/` · `/health` · `/ready` | Welcome / liveness / readiness | + +--- + +## `POST /api/3/action/datastore_create` + +Declare a resource (table) and optionally seed it with rows. Re-declaring an +existing resource adds columns and widens types (see below). 
+ +**Two input shapes:** + +- `resource_id` — table name only. Works under any `AUTH_TYPE`. +- `resource` (dict) — creates a CKAN resource first (with `url_type="datastore"`), + then writes the table. **`AUTH_TYPE=ckan` only**; rejected otherwise. + +### Body + +| Field | Type | Notes | +|---|---|---| +| `resource_id` | string | Target table. Provide this **or** `resource`. | +| `resource` | object | CKAN resource dict (ckan auth only). | +| `schema` | object | Frictionless Table Schema — the native column shape. | +| `fields` | array | *Deprecated* legacy `[{id, type, info}]`. Use `schema`. | +| `primary_key` | string \| array | *Deprecated*. Use `schema.primaryKey`. | +| `records` | array | Optional rows to seed. | +| `include_records` | bool | Echo written rows back in `result.records`. | +| `include_total` | bool | Run `COUNT(*)` and return `result.total`. | +| `force` | bool | Required to write a datastore-managed resource (see [read-only guard](#read-only-resource-guard)). | + +**Field types** accept Frictionless canonical names (`integer`, `number`, +`string`, `boolean`, `date`, `datetime`, `time`, `object`, `array`, `geopoint`, +`geojson`, `any`) or SQL aliases (`int4`, `bigint`, `varchar`, `text`, `float`, +`numeric`, `bool`, `timestamp`, `json`, …), normalised to canonical on storage. +Each field may carry an `info` data dictionary (`title`, `description`, +`comment`, `example`, `unit`, plus custom keys), stored verbatim and +round-tripped by `datastore_info`. 
+ +### Request + +```json +{ + "resource_id": "balancing_auction_results_2025", + "schema": { + "fields": [ + {"name": "auction_id", "type": "integer", "info": {"title": "Auction ID"}}, + {"name": "product_code", "type": "string"}, + {"name": "delivery_start", "type": "datetime"}, + {"name": "clearing_price_gbp_per_mwh", "type": "number", "info": {"unit": "GBP/MWh"}}, + {"name": "accepted", "type": "boolean"}, + {"name": "bidder_metadata", "type": "object"} + ], + "primaryKey": ["auction_id", "product_code"] + }, + "records": [ + {"auction_id": 144, "product_code": "DCL", "delivery_start": "2025-11-04T16:00:00Z", + "clearing_price_gbp_per_mwh": 47.82, "accepted": true, + "bidder_metadata": {"unit_id": "DRAX-1"}} + ] +} +``` + +### Response — 200 + +```json +{ + "help": "...", + "success": true, + "result": { + "resource_id": "balancing_auction_results_2025", + "fields": [{"id": "auction_id", "type": "integer", "info": {"...": "..."}}, "..."], + "schema": {"fields": ["..."], "primaryKey": ["auction_id", "product_code"]}, + "primary_key": ["auction_id", "product_code"] + } +} +``` + +`records` and `total` appear only when `include_records` / `include_total` are set. + +--- + +## `POST /api/3/action/datastore_upsert` + +Write rows into an existing resource (declare it with `datastore_create` first). + +### Body + +| Field | Type | Notes | +|---|---|---| +| `resource_id` | string | Target table (required). | +| `records` | array | Rows to write. | +| `method` | string | `upsert` (default) · `insert` · `update`. | +| `include_records` | bool | Echo written rows in `result.records`. | +| `include_total` | bool | Return `result.total`. | +| `force` | bool | Required for a datastore-managed resource (see [read-only guard](#read-only-resource-guard)). | + +- **`upsert`** — `MERGE` on the table's stored `primaryKey`: match → update, miss → insert. +- **`insert`** — append rows; no key check. +- **`update`** — every row must match an existing key, else `Not Found Error`. 
+ +The table's stored `primaryKey` (set at create) decides matching — the request body +never carries it. + +### Request + +```json +{ + "resource_id": "balancing_auction_results_2025", + "method": "upsert", + "records": [ + {"auction_id": 144, "product_code": "DCL", "clearing_price_gbp_per_mwh": 48.05, "accepted": true} + ] +} +``` + +### Response — 200 + +```json +{ "help": "...", "success": true, + "result": {"resource_id": "balancing_auction_results_2025", "method": "upsert"} } +``` + +--- + +## `POST /api/3/action/datastore_delete` + +Three modes (`filters` and `fields` are mutually exclusive): + +- **Drop table** — omit both `filters` and `fields`. +- **Delete rows** — `filters` (only rows matching every `column: value`). +- **Drop columns** — `fields` (list of column names). + +### Body + +| Field | Type | Notes | +|---|---|---| +| `resource_id` / `id` | string | Target table (one required; `id` is a CKAN alias). | +| `filters` | object | Row filter. Omit (with no `fields`) → drop the table. | +| `fields` | array | Columns to drop. Mutually exclusive with `filters`. | +| `force` | bool | Required for a datastore-managed resource (see [read-only guard](#read-only-resource-guard)). 
| + +### Request + +```json +{ "resource_id": "balancing_auction_results_2025", + "filters": {"auction_id": 144, "accepted": false} } +``` + +### Response — 200 + +```json +{ "help": "...", "success": true, + "result": {"resource_id": "balancing_auction_results_2025"} } +``` + +On a **column drop**, `result` also carries `schema` — the Frictionless schema +after the columns were removed — so you can confirm the new shape without a +follow-up `datastore_info`: + +```json +{ "help": "...", "success": true, + "result": { + "resource_id": "balancing_auction_results_2025", + "fields": ["bidder_metadata"], + "schema": {"fields": [{"name": "auction_id", "type": "integer"}, "..."], + "primaryKey": ["auction_id", "product_code"]} + } } +``` + +--- + +## `GET /api/3/action/datastore_search` + +Parameterised search; the response is **streamed** (peak memory ≈ one row). + +### Query parameters + +| Name | Type | Default | Notes | +|---|---|---|---| +| `resource_id` | string | — | required | +| `filters` | JSON object | — | `{"col": value}` or `{"col": [v1, v2]}` (IN) | +| `q` | string \| JSON | — | full-text (string = all columns; object = per column) | +| `distinct` | bool | `false` | | +| `plain` | bool | `true` | reserved (CKAN-compat) | +| `language` | string | `"english"` | reserved (CKAN-compat) | +| `limit` | int | `100` | capped by `SEARCH_RESULT_ROWS_MAX` | +| `offset` | int | `0` | | +| `fields` | CSV | all | comma-separated columns to project | +| `sort` | string | — | `"col asc, col2 desc"` | +| `include_total` | bool | `true` | runs `COUNT(*)` when needed | +| `records_format` | string | `"objects"` | `objects` · `lists` · `csv` · `tsv` | + +### Example + +``` +GET /api/3/action/datastore_search + ?resource_id=balancing_auction_results_2025 + &filters={"product_code":"DCL","accepted":true} + &sort=delivery_start desc + &limit=100 +``` + +### Response (records_format=objects) + +```json +{ + "help": "...", + "success": true, + "result": { + "fields": [{"id": 
"auction_id", "type": "integer"}, "..."], + "records": [ + {"auction_id": 144, "product_code": "DCL", "clearing_price_gbp_per_mwh": 47.82} + ], + "total": 2, + "_links": { + "start": ".../datastore_search?resource_id=...&limit=100", + "next": ".../datastore_search?resource_id=...&limit=100&offset=100" + } + } +} +``` + +- `records_format=lists` → each record is a positional array (column order = `fields`). +- `records_format=csv` / `tsv` → `records` is a single text body (header row first), + still inside the JSON envelope. +- Paginate by following `_links.next`; end-of-data is an empty `records` array. + +--- + +## `GET /api/3/action/datastore_search_sql` + +Run a single read-only `SELECT` / `WITH` statement and stream the result. Tables +are referenced by `resource_id`; each is authorized individually, and functions +are checked against the engine's allow-list. Include a `LIMIT` (required). + +### Query parameters + +| Name | Type | Notes | +|---|---|---| +| `sql` | string | A single `SELECT`/`WITH`; no multi-statement, no DML/DDL. | + +### Example + +``` +GET /api/3/action/datastore_search_sql?sql= + SELECT product_code, AVG(clearing_price_gbp_per_mwh) AS avg_price + FROM "balancing_auction_results_2025" + WHERE accepted = true + GROUP BY product_code + LIMIT 1000 +``` + +### Response + +```json +{ + "help": "...", + "success": true, + "result": { + "fields": [{"id": "product_code", "type": "string"}, {"id": "avg_price", "type": "number"}], + "records": [{"product_code": "DCL", "avg_price": 47.82}] + } +} +``` + +Safety: the schema rejects non-`SELECT` / multi-statement / unparseable SQL; +the load-bearing guard is a **read-only** backend credential that physically +refuses DML/DDL. + +--- + +## `GET /api/3/action/datastore_info` + +Returns the column schema (including the `info` data dictionary, verbatim) plus +row stats — a column-level metadata catalog without a side store. 
+ +### Query parameters + +| Name | Type | Notes | +|---|---|---| +| `resource_id` / `id` | string | One required (`id` is a CKAN alias). | + +### Response + +```json +{ + "help": "...", + "success": true, + "result": { + "fields": [ + {"id": "auction_id", "type": "integer", + "info": {"title": "Auction ID", "comment": "MANDATORY"}} + ], + "schema": {"fields": ["..."], "primaryKey": ["auction_id", "product_code"]}, + "meta": { + "resource_id": "balancing_auction_results_2025", + "primary_key": ["auction_id", "product_code"], + "total": 18420 + } + } +} +``` + +--- + +## `GET /datastore/dump/{resource_id}` + +Download an entire resource. Pick the format with `?format=csv` (default), +`ndjson`, or `parquet`. + +- **Small export (single shard):** `302` redirect to a signed GCS URL (bytes go + straight from storage to the client). +- **Large export (multiple shards, CSV/NDJSON):** a streamed body + (`200`, ~constant memory). +- Parquet over the single-shard limit returns `413` — switch to CSV/NDJSON. + +Requires `read` permission on the resource and a configured export bucket +(`BIGQUERY_EXPORT_BUCKET`). + +--- + +## Health + +All return the CKAN envelope. + +| Method | Path | Result | +|---|---|---| +| GET | `/` | `{"message": ""}` | +| GET | `/health` | `{"status": "ok"}` — liveness; always 200 while the process runs | +| GET | `/ready` | `{"status": "ready"}` — 200 when both engines pass `healthcheck()`; `503` (`{"status": "not_ready"}`) otherwise | + +--- + +## Read-only resource guard + +Under **`AUTH_TYPE=ckan`**, `datastore_create`, `datastore_upsert`, and +`datastore_delete` refuse to write a resource whose CKAN record carries +`url_type="datastore"` unless the request sets `force: true`: + +```json +{ "help": "...", "success": false, + "error": {"__type": "Validation Error", + "message": "Cannot update a read-only resource. Use \"force\" to force update."} } +``` + +This mirrors CKAN's protection against clobbering datastore-managed data. 
It is +gated on `AUTH_TYPE=ckan` and skipped entirely under other providers. diff --git a/CLAUDE.md b/CLAUDE.md index 56ad551..3ab755f 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -482,420 +482,9 @@ The GCS client is built with the same credentials as the BigQuery client for the ## 6. Request / Response Contracts -CKAN-style envelope: every response has `help`, `success`, and either `result` or `error`. - -### 6.1 `POST /api/3/datastore_create` - -Running example: an electricity balancing-market auction-results table. Used -consistently across the rest of §6 so the request → search → info round-trip -is easy to follow. - -**Request** -```json -{ - "resource_id": "balancing_auction_results_2025", - "fields": [ - { - "id": "auction_id", - "type": "integer", - "info": { - "title": "Auction ID", - "description": "Unique auction identifier. Stable across all products auctioned in the same market window.", - "comment": "MANDATORY", - "example": "144", - "unit": "N/A" - } - }, - { - "id": "product_code", - "type": "string", - "info": { - "title": "Product Code", - "description": "Product mnemonic for the balancing service (e.g. DCL, DCH, FFR).", - "example": "DCL" - } - }, - { - "id": "delivery_start", - "type": "datetime", - "info": { - "title": "Delivery Start (UTC)", - "description": "First instant of the delivery window. Stored as UTC; clients render local time.", - "example": "2025-11-04T16:00:00Z" - } - }, - { - "id": "duration_minutes", - "type": "integer", - "info": { - "title": "Delivery Duration", - "description": "Length of the delivery window.", - "unit": "minutes", - "example": "30" - } - }, - { - "id": "clearing_price_gbp_per_mwh", - "type": "number", - "info": { - "title": "Clearing Price", - "description": "Pay-as-cleared price for the auction. 
Negative values are possible during oversupply.", - "unit": "GBP/MWh", - "example": "47.82" - } - }, - { - "id": "volume_mwh", - "type": "number", - "info": { - "title": "Cleared Volume", - "description": "Total volume cleared in this auction.", - "unit": "MWh", - "example": "120.0" - } - }, - { - "id": "accepted", - "type": "boolean", - "info": { - "title": "Accepted", - "description": "Whether the bid cleared (true) or was rejected (false)." - } - }, - { - "id": "bidder_metadata", - "type": "object", - "info": { - "title": "Bidder Metadata", - "description": "Free-form provider-specific metadata captured at submission time.", - "comment": "Schema not enforced; kept opaque for downstream analytics." - } - } - ], - "unique_key": ["auction_id", "product_code"], - "records": [ - { - "auction_id": 144, - "product_code": "DCL", - "delivery_start": "2025-11-04T16:00:00Z", - "duration_minutes": 30, - "clearing_price_gbp_per_mwh": 47.82, - "volume_mwh": 120.0, - "accepted": true, - "bidder_metadata": {"unit_id": "DRAX-1", "submission_lag_ms": 412} - }, - { - "auction_id": 144, - "product_code": "DCH", - "delivery_start": "2025-11-04T16:00:00Z", - "duration_minutes": 30, - "clearing_price_gbp_per_mwh": 51.10, - "volume_mwh": 75.5, - "accepted": true, - "bidder_metadata": {"unit_id": "EDF-COTT-2", "submission_lag_ms": 280} - } - ] -} -``` - -- `resource_id` — SQL identifier, required. -- `fields` — non-empty; each entry contains: - - `id` (or alias `name`) — column identifier; SQL-safe. - - `type` — column type. Accepts Frictionless canonical (`integer`, `number`, `string`, `boolean`, `date`, `datetime`, `time`, `object`, `array`, `geopoint`, `geojson`, `any`) or SQL aliases (`int4`, `int8`, `bigint`, `varchar`, `text`, `float`, `double`, `numeric`, `bool`, `timestamp`, `json`, …) which are normalised to canonical on storage. - - `info` — optional **data dictionary** for documentation. 
Free-form object; recognised keys: `title`, `description`, `comment`, `example`, `unit`, plus any custom metadata. Stored verbatim and round-tripped on `datastore_info`. The outer `type` is canonical; any `info.type` is treated as a hint and ignored. Whitespace in string values is trimmed. -- `unique_key` — string or list of strings; all entries must reference declared field ids. The example uses a composite key (`auction_id` + `product_code`) since one auction clears multiple products. -- `records` — optional; each record's keys must be a subset of declared field ids. -- `primary_key` — accepted for back-compat; emits deprecation warning. - -**Response — 200** -```json -{ - "help": "", - "success": true, - "result": { - "resource_id": "balancing_auction_results_2025", - "fields": [ - {"id": "auction_id", "type": "integer", "info": {"title": "Auction ID", "...": "..."}}, - {"id": "product_code", "type": "string", "info": {"...": "..."}}, - {"id": "delivery_start", "type": "datetime", "info": {"...": "..."}}, - {"id": "duration_minutes", "type": "integer", "info": {"...": "..."}}, - {"id": "clearing_price_gbp_per_mwh", "type": "number", "info": {"...": "..."}}, - {"id": "volume_mwh", "type": "number", "info": {"...": "..."}}, - {"id": "accepted", "type": "boolean", "info": {"...": "..."}}, - {"id": "bidder_metadata", "type": "object", "info": {"...": "..."}} - ], - "primary_key": ["auction_id", "product_code"], - "unique_key": ["auction_id", "product_code"] - } -} -``` - -Optional response fields (omitted from the body when not requested): -- `records` — echoes the input rows back when the request sets `include_records: true`. -- `total` — total row count after the write, populated when `include_total: true`. 
- -### 6.2 `GET /api/3/datastore_search` - -**Query params** -| Name | Type | Default | Notes | -|---|---|---|---| -| `resource_id` | str | — | required unless `q` supplied | -| `filters` | JSON-encoded object | `null` | `{"col": value}` or `{"col": [v1, v2]}` | -| `q` | str / JSON | `null` | full-text or per-column | -| `distinct` | bool | `false` | | -| `plain` | bool | `true` | | -| `language` | str | `"english"` | reserved | -| `limit` | int | `1000` | clamped to `[0, 10000]` | -| `offset` | int | `0` | | -| `fields` | comma-separated list | all | | -| `sort` | str | `null` | `"col asc, col2 desc"` | -| `include_total` | bool | `true` | runs `COUNT(*)` if true | -| `records_format` | str | `"objects"` | `objects` / `lists` / `csv` / `tsv` | - -**Example request** - -``` -GET /api/3/datastore_search - ?resource_id=balancing_auction_results_2025 - &filters={"product_code": "DCL", "accepted": true} - &sort=delivery_start desc, clearing_price_gbp_per_mwh asc - &fields=auction_id,product_code,delivery_start,clearing_price_gbp_per_mwh,volume_mwh - &limit=100 - &offset=0 -``` - -**Response (records_format=objects) — streamed** -```json -{ - "help": "...", - "success": true, - "result": { - "fields": [ - {"id": "auction_id", "type": "integer"}, - {"id": "product_code", "type": "string"}, - {"id": "delivery_start", "type": "datetime"}, - {"id": "clearing_price_gbp_per_mwh", "type": "number"}, - {"id": "volume_mwh", "type": "number"} - ], - "records": [ - {"auction_id": 152, "product_code": "DCL", "delivery_start": "2025-11-05T18:30:00Z", "clearing_price_gbp_per_mwh": 39.40, "volume_mwh": 95.0}, - {"auction_id": 144, "product_code": "DCL", "delivery_start": "2025-11-04T16:00:00Z", "clearing_price_gbp_per_mwh": 47.82, "volume_mwh": 120.0} - ], - "total": 2, - "_links": { - "start": "https://example.com/api/3/action/datastore_search?resource_id=balancing_auction_results_2025&limit=100", - "next": 
"https://example.com/api/3/action/datastore_search?resource_id=balancing_auction_results_2025&limit=100&offset=100" - } - } -} -``` - -`_links` carries the same scheme + host as the request URL, with all -non-`offset` params preserved. `start` omits `offset` (it defaults to 0); -`next` advances `offset` by `limit`. Clients detect end-of-data by an -empty `records` array on the next page — there's no `prev` field today. - -`records_format=lists` returns each record as a positional array (column order matches `fields`). -`records_format=csv` / `tsv` return a streaming text body with the header row first. - -### 6.3 `POST /api/3/datastore_upsert` - -**Request — late-arriving correction to an auction result** -```json -{ - "resource_id": "balancing_auction_results_2025", - "method": "upsert", - "unique_key": ["auction_id", "product_code"], - "records": [ - { - "auction_id": 144, - "product_code": "DCL", - "delivery_start": "2025-11-04T16:00:00Z", - "duration_minutes": 30, - "clearing_price_gbp_per_mwh": 48.05, - "volume_mwh": 120.0, - "accepted": true, - "bidder_metadata": {"unit_id": "DRAX-1", "submission_lag_ms": 412, "revision": 2} - }, - { - "auction_id": 153, - "product_code": "FFR", - "delivery_start": "2025-11-05T19:00:00Z", - "duration_minutes": 60, - "clearing_price_gbp_per_mwh": 32.40, - "volume_mwh": 200.0, - "accepted": false, - "bidder_metadata": {"unit_id": "SSE-PEH-3", "rejection_reason": "above_cap"} - } - ], - "include_records": false, - "include_total": false, - "force": false -} -``` - -- `method`: `upsert` | `insert` | `update`. The table's stored `unique_key` (set at `datastore_create`) decides which rows match — the request body itself never carries it. -- `include_records`: if `true`, echoes the written rows back in the response. -- `include_total`: if `true`, the engine runs a `COUNT(*)` after the write and populates `result.total`. Off by default. -- `force`: bypasses optional client-side guards (reserved; backend-specific). 
- -**Response** -```json -{ - "help": "...", - "success": true, - "result": { - "resource_id": "balancing_auction_results_2025", - "method": "upsert" - } -} -``` - -Optional fields appear in `result` only when requested: - -- `records` — echoes input rows when `include_records: true`. -- `total` — total row count after the write when `include_total: true`. - -`null` is never serialised — fields that aren't populated are simply omitted (see `_orjson_default` in `api/responses.py`). - -### 6.4 `GET /api/3/datastore_search_sql` - -**Query params**: `sql` (required), `limit` (default 32000). - -**Example request — daily clearing-price summary** -``` -GET /api/3/datastore_search_sql?sql= - SELECT - DATE(delivery_start) AS delivery_date, - product_code, - AVG(clearing_price_gbp_per_mwh) AS avg_price, - SUM(volume_mwh) AS total_volume - FROM balancing_auction_results_2025 - WHERE accepted = true - AND delivery_start >= '2025-11-01' - GROUP BY delivery_date, product_code - ORDER BY delivery_date DESC, product_code -&limit=10000 -``` - -**Response — streamed** -```json -{ - "help": "...", - "success": true, - "result": { - "fields": [ - {"id": "delivery_date", "type": "date"}, - {"id": "product_code", "type": "string"}, - {"id": "avg_price", "type": "number"}, - {"id": "total_volume", "type": "number"} - ], - "records": [ - {"delivery_date": "2025-11-05", "product_code": "DCL", "avg_price": 41.20, "total_volume": 1840.0}, - {"delivery_date": "2025-11-05", "product_code": "DCH", "avg_price": 49.75, "total_volume": 720.5}, - {"delivery_date": "2025-11-04", "product_code": "DCL", "avg_price": 47.82, "total_volume": 1200.0} - ], - "records_truncated": false - } -} -``` - -### 6.5 `POST /api/3/datastore_delete` - -**Request — purge rejected bids for a single auction window** -```json -{ - "resource_id": "balancing_auction_results_2025", - "filters": { - "auction_id": 144, - "accepted": false - }, - "force": false -} -``` -Empty `filters` (or omitted) → the entire table is 
dropped. - -**Response** -```json -{ - "help": "...", - "success": true, - "result": {"resource_id": "balancing_auction_results_2025"} -} -``` - -### 6.6 `GET /api/3/datastore_info` - -Returns the same field shape that was supplied to `datastore_create`, including -the `info` data dictionary verbatim — clients can use this as a column-level -metadata catalog (titles, descriptions, units, examples) without a side store. - -**Response** -```json -{ - "help": "...", - "success": true, - "result": { - "resource_id": "balancing_auction_results_2025", - "fields": [ - { - "id": "auction_id", - "type": "integer", - "info": { - "title": "Auction ID", - "description": "Unique auction identifier. Stable across all products auctioned in the same market window.", - "comment": "MANDATORY", - "example": "144", - "unit": "N/A" - } - }, - { - "id": "product_code", - "type": "string", - "info": { - "title": "Product Code", - "description": "Product mnemonic for the balancing service (e.g. DCL, DCH, FFR).", - "example": "DCL" - } - }, - { - "id": "delivery_start", - "type": "datetime", - "info": { - "title": "Delivery Start (UTC)", - "description": "First instant of the delivery window. 
Stored as UTC; clients render local time.", - "example": "2025-11-04T16:00:00Z" - } - }, - {"id": "duration_minutes", "type": "integer", "info": {"title": "Delivery Duration", "unit": "minutes"}}, - {"id": "clearing_price_gbp_per_mwh", "type": "number", "info": {"title": "Clearing Price", "unit": "GBP/MWh"}}, - {"id": "volume_mwh", "type": "number", "info": {"title": "Cleared Volume", "unit": "MWh"}}, - {"id": "accepted", "type": "boolean", "info": {"title": "Accepted"}}, - {"id": "bidder_metadata", "type": "object", "info": {"title": "Bidder Metadata"}} - ], - "unique_key": ["auction_id", "product_code"], - "primary_key": ["auction_id", "product_code"], - "total": 18420 - } -} -``` - -### 6.7 Error envelope (all 4xx / 5xx) - -```json -{ - "help": "", - "success": false, - "error": { - "__type": "Validation Error", - "message": "fields[0].id is not a valid identifier: '1bad'", - "fields": {"fields": ["..."]} // optional, present on validation errors - } -} -``` +Every response is the CKAN envelope — `help`, `success`, and either `result` or `error`. The full per-endpoint reference (request bodies, query params, worked examples, and error shapes) lives in **[API.md](API.md)**. -`__type` taxonomy: `Validation Error` (400), `Authorization Error` (403), -`Not Found Error` (404), `Conflict Error` (409), `Internal Error` (500). +`__type` taxonomy: `Validation Error` (400), `Authorization Error` (403), `Not Found Error` (404), `Conflict Error` (409), `Internal Error` (500). ## 7. Roadmap diff --git a/README.md b/README.md index 2bc68d6..9992ab9 100644 --- a/README.md +++ b/README.md @@ -1,399 +1,44 @@ # Datastore API -A CKAN-shaped action API for tabular data storage and querying, built -on FastAPI with **two pluggable axes**: +A **standalone datastore service** for tabular data. It provides a simple API +for creating tables, inserting / updating / deleting rows, and searching them +with filters or SQL. It can serve as a CKAN datastore or run independently. 
-- **Storage engine** — `DATASTORE_ENGINE` selects a folder under - `datastore/infrastructure/engines/` (BigQuery today; DuckLake planned). -- **Auth provider** — `AUTH_TYPE` selects a folder under `datastore/auth/`. - Built-in: `ckan` (delegates to an upstream CKAN, TTL-cached), - `jwt` (verifies signature + claims locally), `anonymous` (allow-all, - for local dev / CI). +Storage backends and auth providers are pluggable and easy to extend: +- **Pluggable storage** — `DATASTORE_ENGINE` selects a backend (BigQuery today; DuckLake planned). +- **Pluggable auth** — `AUTH_TYPE` selects a provider: `ckan` / `jwt` / `anonymous`. -Exposes `/api/3/action/datastore_*` endpoints. Runs **standalone** -under `AUTH_TYPE=anonymous` or `AUTH_TYPE=jwt` — no CKAN required — -or as a satellite to CKAN under `AUTH_TYPE=ckan`, in which case CKAN -remains the single source of truth for users, packages, resources, -and permissions, and the heavy datastore work lives here. - -## Project structure - -``` -datastore/ -├── main.py # FastAPI app factory + lifespan -│ -├── api/ # HTTP layer — only layer that imports fastapi / starlette -│ ├── routes.py # Top-level APIRouter; aggregates endpoints/ -│ ├── context.py # RequestContext (per-request DI bundle: config, -│ │ # api_key, auth_provider, ckan); .authorize() method -│ ├── auth.py # Boundary policy (permission whitelist + anonymous-read -│ │ # rule); delegates to the active AuthProvider -│ ├── middleware.py # ASGI middleware (e.g. 
BodySizeLimitMiddleware) -│ ├── responses.py # Envelope response helpers (_success_response / _error_response) -│ ├── error_handlers.py # Exception handlers (APIError → CKAN error envelope) -│ └── endpoints/ # Route handlers, one file per resource group -│ ├── health.py # /, /health, /ready -│ ├── datastore.py # /api/3/action/datastore_* -│ └── dump.py # /datastore/dump/ (302 single / stream multi) -│ -├── auth/ # Pluggable auth providers — one subpackage per type -│ ├── base.py # AuthProvider Protocol + Decision dataclass + -│ │ # default_key_id (JWT jti / sha256 helper) -│ ├── registry.py # get_auth_provider(config, **extras) — importlib dispatch -│ ├── ckan/ # AUTH_TYPE=ckan: calls /api/3/action/datastore_authorize -│ │ # via CKANClient; holds its own TTL cache (the only -│ │ # network-bound provider) so we don't hit CKAN per request -│ ├── jwt/ # AUTH_TYPE=jwt: verifies HS*/RS*/ES* signature + aud/iss -│ └── anonymous/ # AUTH_TYPE=anonymous: always allows; no identity -│ -├── core/ # Cross-cutting helpers — no I/O, no fastapi -│ ├── config.py # Pydantic-Settings `Config` (env-driven) + get_config() -│ ├── constants.py # Shared constants (type maps, defaults, …) -│ ├── exceptions.py # APIError taxonomy + HTTP status → label map -│ └── helper.py # Pure helpers (e.g. 
parse_authorization_header) -│ -├── schemas/ # Pydantic request/response shapes (boundary validation only) -│ ├── request.py # Inbound request models (DatastoreCreateRequest, …) -│ ├── responses.py # Outbound CKAN envelopes (ResponseModel + per-endpoint) -│ └── validators.py # Reusable Annotated types + field validators -│ -├── services/ # Business logic -│ ├── write.py # create / upsert / delete orchestration -│ ├── read.py # search / search_sql orchestration (engine call, -│ │ # format dispatch, pagination links) -│ ├── streaming.py # per-format byte-yielding writers used by read.py -│ └── dump.py # multi-shard stream-concat over async httpx -│ # (drives /datastore/dump for >1 GB CSV/NDJSON) -│ -└── infrastructure/ # Adapters to outside systems - ├── cache.py # InMemoryCache + RedisCache (CachePort protocol) - ├── ckan_client.py # CKAN action API client (httpx-backed). Built in - │ # lifespan only when AUTH_TYPE=ckan; otherwise None. - └── engines/ # Storage backends — one subpackage per engine - ├── base.py # DatastoreBackend ABC + result dataclasses - ├── registry.py # get_datastore_engine + get_allowed_sql_functions; - │ # dynamic importlib dispatch keyed on - │ # context.config.DATASTORE_ENGINE - ├── bigquery/ # Engine package (one folder per backend). - | ├── __init__.py # Exports `Backend = BigQueryBackend` — - | | # the registry imports `Backend`, so the - | | # concrete class name is engine-private. - | ├── backend.py # DatastoreBackend subclass - | ├── client.py # google-cloud-bigquery `Client` construction - | ├── lib.py # Backend-specific helpers - | ├── metadata.py # _table_metadata table — Frictionless schema + unique_key - | ├── search.py # SQL builder for datastore_search - | ├── types.py # Frictionless → BigQuery type map - | └── allowed_functions.txt # Per-engine datastore_search_sql - | # function allow-list — one name per - | # line, `#` comments allowed. 
- └── ducklake/ # Future planned engine - -postman/ # Importable Postman collection -├── collection.json # Auto-generated from example_payload/ -└── generate_postman.py # Generator script (regenerate after edits) -``` - -To add a new engine (e.g. `ducklake`), drop a sibling folder following -the same layout (`__init__.py` exports `Backend = <BackendClass>`, -`backend.py` subclasses `DatastoreBackend`, plus an `allowed_functions.txt`). -`DATASTORE_ENGINE` is validated against the set of engine subdirectories -that exist at process start, and the factory imports each engine's -`Backend` via `importlib` — no `registry.py` / `config.py` edits. - -## Column definitions - -**Goal:** make Frictionless schema the native column shape while staying -drop-in compatible with existing CKAN clients during migration. - -`datastore_create` accepts one of two input shapes: - -| Shape | Keys | Status | -|---|---|---| -| Frictionless `schema` | `schema` — [Frictionless Table Schema](https://specs.frictionlessdata.io/table-schema/) | Recommended | -| Legacy CKAN `fields` | `fields`, `primary_key` | Deprecated; emits a `warnings` entry | - - -## Roadmap - -What's shipped and what's next. Tick each box as the change set lands. - -### Done - -- [x] Foundation (app factory, lifespan, middleware, Dockerfile, Makefile, env config) -- [x] All six `datastore_*` actions wired end-to-end: - - `datastore_create`, `datastore_upsert`, `datastore_delete` - - `datastore_search` (streaming JSON / CSV / TSV; CKAN `_links` pagination) - - `datastore_search_sql` (sqlglot parses tables + functions; per-table - CKAN authorize; per-engine function allow-list) - - `datastore_info` (column schema + free-form `meta` dict) -- [x] `GET /datastore/dump/{resource_id}?format=csv|ndjson|parquet` — full-table download - via BigQuery `EXPORT DATA`. **1 shard** (≤1 GB CSV/NDJSON, or any Parquet ≤1 GB): - 302 to a GCS signed URL (server out of the byte path). 
**N shards** (>1 GB CSV/NDJSON): - server stream-concats shards via async httpx (~64 KiB peak memory, no threadpool). - Parquet >1 GB returns 413 (parquet shards can't be byte-concatenated). Results are - cached in GCS keyed by `table.modified`; unchanged tables skip the extract entirely, - and stale revisions are GC'd on the next cache miss so storage stays bounded to one - rev per `(resource_id, format)`. - `/ready` builds the rw + ro engine instances during lifespan and probes - `engine.healthcheck()` on each — 503 with a `Service Unavailable` envelope - if either fails (so k8s pulls the pod from the Service). -- [x] Strict request validation (Pydantic) + structured error envelopes -- [x] CKAN auth gate with TTL cache (InMemory by default; Redis when `REDIS_URL` is set) -- [x] Request context bundle (`RequestContext` / `ContextDep` / bound `CKANClient`) -- [x] Service / engine / streaming layer separation -- [x] Engine-agnostic registry — drop a folder under `infrastructure/engines/<name>/` - exporting `Backend`; `DATASTORE_ENGINE` is validated against engine directories - on disk, no registry / config edit required. -- [x] Real BigQuery backend (replace the placeholder in `infrastructure/engines/bigquery/backend.py`) - -### Next -- [ ] Observability — JSON structured logs + request-id middleware -- [ ] Opt-in query-result cache (deferred until BigQuery lands) -- [ ] DuckLake backend (future planned engine) - - - -## Auth - -`AUTH_TYPE` selects the provider; each lives at `datastore/auth/<type>/`. - -| AUTH_TYPE | What it does | Required env | -|---|---|---| -| `ckan` (default) | Calls CKAN `/api/3/action/datastore_authorize` per request. TTL-cached inside the provider so we don't hit CKAN repeatedly. | `CKAN_URL` | -| `jwt` | Verifies the bearer JWT signature + optional `aud` / `iss`. No external service. | `JWT_SECRET` (HS*) or `JWT_PUBLIC_KEY` (RS*/ES*) | -| `anonymous` | Allows every call; no identity. Local dev / CI without auth. 
| _(none)_ | - -The orchestration in `datastore/api/auth.py` is provider-agnostic — it -owns only the boundary policy (permission whitelist, `resource_id` XOR -`package_id` rule, and the anonymous-read rule: `permission=read` calls -forward to the provider without a credential; everything else -hard-fails when the `Authorization` header is missing). - -**CKAN provider.** Uses the `datastore_authorize` action, which is **not -part of stock CKAN** — it ships in the -[`ckanext-datastore-authz`](https://github.com/datopian/ckanext-datastore-authz) -extension. Before pointing this service at a CKAN instance, install -the extension and confirm the action is reachable: - -```sh -curl -s "$CKAN_URL/api/3/action/datastore_authorize" \ - -H "Authorization: $CKAN_API_KEY" \ - -H 'Content-Type: application/json' \ - -d '{"resource_id": "<resource-id>"}' | jq -``` - -A CKAN envelope with `success: true` and a `result.{package, resource}` -body means you're set. 404 means the extension isn't enabled in -`ckan.plugins`. - -**Adding a new provider.** Drop `datastore/auth/<type>/` with an -`__init__.py` exporting `Provider = <ProviderClass>` and a `provider.py` -implementing the `AuthProvider` Protocol (`base.py`). No registry edit -required — `AUTH_TYPE` is validated against the directories on disk at -startup, same auto-discovery as `DATASTORE_ENGINE`. - -**Standalone caveat.** `datastore_create` accepts two shapes: -`resource_id` (table name only) and `resource` (a CKAN resource dict — -the service calls `ckan.resource_create(...)` first, then writes the -datastore table). The dict form is only valid under `AUTH_TYPE=ckan`; -under JWT / anonymous it's rejected with a clear validation error. - - - -## Development setup +## Quick start Requires Python 3.12+. 
```sh -# Install dependencies (editable, with dev tools) -pip install -e ".[dev]" - -# Run dev server -uvicorn datastore.main:app --reload - - - -# Run tests -pytest +pip install -e ".[dev]" # install (editable, with dev tools) +uvicorn datastore.main:app --reload # run dev server +pytest # run tests ``` -Dependencies live in `pyproject.toml` (`[project].dependencies` and `[project.optional-dependencies].dev`). +Open `http://localhost:8000/docs` for interactive API docs. -## Env vars +## Configuration -Every entry below maps 1:1 to a field on `datastore.core.config.Config`. See [.env.example](.env.example) for a copy-and-fill template. +All settings are environment variables mapping 1:1 to `datastore.core.config.Config`. +Copy [.env.example](.env.example) and fill it in. The essentials: -| Name | Default | Purpose | +| Var | Default | Purpose | |---|---|---| -| `APP_MESSAGE` | `"Datastore API"` | Banner returned by `GET /` | -| `MAX_REQUEST_BODY_MB` | `50` | Reject request bodies larger than this (MB) | -| `DATASTORE_ENGINE` | `bigquery` | Storage backend — must match a folder under `infrastructure/engines/`; validated at startup | -| `SQL_FUNCTIONS_ALLOW_FILE` | _(empty)_ | Override path to the `datastore_search_sql` function allow-list; defaults to `/allowed_functions.txt` | -| `BIGQUERY_PROJECT` | _(empty)_ | Google Cloud project ID. Required when `DATASTORE_ENGINE=bigquery`; unset → `/ready` returns 503 with a clear warning. | -| `BIGQUERY_DATASET` | _(empty)_ | BigQuery dataset that holds per-resource tables + the engine-managed `_table_metadata`. Required when `DATASTORE_ENGINE=bigquery`; unset → metadata store is disabled and writes fall through to placeholder mode. | -| `BIGQUERY_CREDENTIALS` | _(empty)_ | Read-write service-account creds. Accepts a JSON blob (leading `{`), a path to a service-account JSON file, or empty (→ Application Default Credentials). | -| `BIGQUERY_CREDENTIALS_RO` | _(empty)_ | Read-only service-account creds (same format). 
Empty → falls back to `BIGQUERY_CREDENTIALS` so single-credential deployments work. | -| `BIGQUERY_USE_QUERY_CACHE` | `true` | Use BigQuery's 24h query-results cache on `datastore_search` / `datastore_search_sql` / `datastore_info`. Identical SELECTs return free + fast on cache hits. Set `false` to force a fresh scan. | -| `BIGQUERY_EXPORT_BUCKET` | _(empty)_ | GCS bucket name (no `gs://` prefix) that `/datastore/dump/` writes `EXPORT DATA` shards into. Required when the dump endpoint is in use. **Credential model: ro reads, rw writes.** RO SA (`BIGQUERY_CREDENTIALS_RO`) does the BigQuery `get_table` and the initial GCS `list_blobs` cache lookup. RW SA (`BIGQUERY_CREDENTIALS`) runs `EXPORT DATA` (it writes shards under its own identity), does GC `delete`, and signs URLs. **RO SA perms:** `bigquery.tables.get` + `storage.objects.list`. **RW SA perms:** `bigquery.jobs.create` + `bigquery.tables.export` + `bigquery.tables.getData` + `storage.objects.{create,list,delete}` + `iam.serviceAccountTokenCreator` (for V4 signing under workload identity). A 24h object-lifecycle rule on the bucket is recommended as a safety net. | -| `BIGQUERY_EXPORT_URL_EXPIRY_HOURS` | `1` | Signed-URL TTL for dump manifest entries (hours). | -| `REDIS_URL` | _(empty)_ | Redis URL for cache; empty → in-process `InMemoryCache` | -| `CKAN_URL` | _(empty)_ | Base URL of the CKAN instance (required when `AUTH_TYPE=ckan`) | -| `HTTP_TIMEOUT_SECONDS` | `10` | Timeout for outbound CKAN calls (seconds) | -| `AUTH_TYPE` | `ckan` | Auth provider — must match a folder under `datastore/auth/`. Built-in: `ckan`, `jwt`, `anonymous` | -| `AUTH_CACHE_TTL` | `10` | TTL for cached auth decisions (seconds) | -| `JWT_ALGORITHM` | `HS256` | JWT signing algorithm. HS* uses `JWT_SECRET`; RS*/ES* uses `JWT_PUBLIC_KEY` | -| `JWT_SECRET` | _(empty)_ | HS* shared secret. Required when `AUTH_TYPE=jwt` and `JWT_ALGORITHM=HS*` | -| `JWT_PUBLIC_KEY` | _(empty)_ | RS*/ES* PEM-encoded public key. 
Required for RS*/ES* | -| `JWT_AUDIENCE` | _(empty)_ | Expected `aud` claim. Empty = skip audience check | -| `JWT_ISSUER` | _(empty)_ | Expected `iss` claim. Empty = skip issuer check | -| `LOG_LEVEL` | `INFO` | Stdlib logging level (`DEBUG` / `INFO` / `WARNING` / `ERROR` / `CRITICAL`) | +| `DATASTORE_ENGINE` | `bigquery` | Storage backend (folder under `datastore/infrastructure/engines/`) | +| `AUTH_TYPE` | `ckan` | Auth provider: `ckan` · `jwt` · `anonymous` | +| `CKAN_URL` | — | CKAN base URL (required when `AUTH_TYPE=ckan`) | +| `BIGQUERY_PROJECT` / `BIGQUERY_DATASET` | — | Required when `DATASTORE_ENGINE=bigquery` | +| `REDIS_URL` | — | Cache backend; empty → in-process cache | -## API Documentation +## Documentation - http://localhost:8000/docs - -## Development notes - - -### Adding a new endpoint - -Handler in `datastore/api/endpoints/.py` (parse → call service → return CKAN envelope), request shape in `datastore/schemas/`, business logic in `datastore/services/`. Wire a new file into `datastore/api/routes.py`. - - -### Request context - -Each endpoint takes a single `Context` that bundles the per-request -handles. The bundle wires them together so handlers stay one-liner. - -```python -from datastore.api.context import Context - -@router.post("/datastore_create", response_model=DatastoreCreateResponse) -async def datastore_create( - request: Request, - payload: DatastoreCreateRequest, - context: Context, -): - # Run policy + delegate to the active AuthProvider (CKAN / JWT / - # anonymous). Pass `resource_id` (existing) or `package_id` (new) — - # exactly one. - data_dict = await context.authorize( - resource_id=payload.resource_id, - permission="create", # read | create | update | delete | patch - ) - - # The service does the actual work (engine.create; CKAN resource_create - # when AUTH_TYPE=ckan and the request supplies a `resource` dict). 
- result = await create_datastore(context, data_dict) - return _success_response(request, result) -``` - -- `context.authorize(...)` — runs the boundary policy and delegates to - the active `AuthProvider`. Returns the `data_dict` shape - `{"resource": , "package": }` ready to merge - with the request payload. -- `context.ckan` — `CKANClient | None`, already bound to the caller's - `api_key`. `None` under non-CKAN auth (standalone). Code paths that - need CKAN must guard for `None`. -- `context.api_key` — the raw bearer string (parsed from the - `Authorization` header). Provider-internal use; endpoints rarely - touch it. -- `context.auth_provider` — the active provider instance (built once - in the lifespan, stored on `app.state.auth_provider`). -- `context.config` — the loaded `Config`. - - - -### Response envelopes - -Every successful response follows the CKAN shape `{help, success, result}`. The base `ResponseModel` in [datastore/schemas/responses.py](datastore/schemas/responses.py) carries `help` + `success`; each endpoint subclasses it and declares an inner `Result`: - -```python -class DatastoreCreateResponse(ResponseModel): - class Result(BaseModel): - resource_id: str - package_id: str | None = None - # Canonical Frictionless Table Schema (carries `primaryKey` inside). - schema: dict[str, Any] - # Legacy mirror — marked deprecated in OpenAPI / IDE tooltips. - fields: Annotated[ - list[FieldSpec], - Field(deprecated="use 'schema' (Frictionless Table Schema) instead"), - ] - primary_key: Annotated[ - list[str], - Field(deprecated="use 'schema.primaryKey' instead"), - ] - records: list[dict[str, Any]] | None = None # when include_records=True - total: int | None = None # when include_total=True - - result: Result -``` - -Wire-up has three matching pieces — service return type, route `response_model`, and the runtime envelope: - -```python -# service -async def create_datastore(...) -> DatastoreCreateResponse.Result: ... 
- -# route -@router.post("/datastore_create", response_model=DatastoreCreateResponse) -async def datastore_create(...): - return _success_response(request, await create_datastore(...)) -``` - -`_success_response` wraps the `Result` into the full `{help, success, result}` envelope. `response_model=...` makes `/docs` document the contract; the service return type lets mypy catch drift. - -Endpoints that aren't implemented yet `raise HTTPException(status_code=501, …)` — the error handler converts that to a CKAN error envelope with `__type: "Not Implemented"`. - -### Adding a new env var - -1. Add a `Field(default=..., description=...)` to `Config` in [datastore/core/config.py](datastore/core/config.py) (with bounds where appropriate: `ge=`, `le=`, `Literal[...]`). -2. Mirror the var in `.env.example` with a safe default and a one-line comment. -3. Document it in the "Env vars" table above. - -### Raising errors - -Endpoints (and services they call) should raise from `datastore/core/exceptions.py` — never return error envelopes by hand: - -```python -from datastore.core.exceptions import NotFoundError, AuthorizationError, ValidationError - -raise NotFoundError(f"resource '{rid}' not found") -``` - -`datastore/api/error_handlers.py` converts each `APIError` subclass to the matching CKAN envelope + status code. 
- -### Testing - -Tests live in [tests/](tests/), organised by what they exercise: - -``` -tests/ -├── conftest.py # FakeCKAN + InMemoryCache + TestClient fixture -├── test_health.py # /, /health, /ready -├── test_datastore_*.py # End-to-end per endpoint (TestClient) -├── test_read_service.py # Direct service calls — no HTTP -├── test_write_service.py -│ -├── auth/ # Auth layer — one folder per provider -│ ├── test_base.py # Decision + default_key_id -│ ├── test_registry.py # AUTH_TYPE dispatch -│ ├── test_orchestration.py # api/auth.py boundary policy -│ ├── ckan/test_provider.py # CKAN provider + TTL cache -│ ├── jwt/test_provider.py # JWT signature / aud / iss / exp -│ └── anonymous/test_provider.py -│ -└── engines/ - ├── bigquery/test_*.py # Real BigQuery backend, fully mocked - └── ducklake/ # (placeholder for future engine) -``` +- **[API.md](API.md)** — full API reference (endpoints, request/response, examples). +- **`GET /docs`** — interactive Swagger UI (and `/redoc`, `/openapi.json`). +- **[CLAUDE.md](CLAUDE.md)** — architecture, design decisions, and layout. -The `client` fixture in `conftest.py` wires up `FakeCKAN` (in-memory -CKAN stand-in) and an `InMemoryCache` via `app.dependency_overrides`, -and installs a `CKANAuthProvider` backed by the fake. No real network -calls. `FakeCKAN` exposes `add_resource(...)`, `add_package(...)`, -`deny(api_key)` and an `authorize_calls` counter to assert cache -behaviour. +## License +See repository. 
diff --git a/datastore/api/endpoints/datastore.py b/datastore/api/endpoints/datastore.py index b3567ac..f8617b1 100644 --- a/datastore/api/endpoints/datastore.py +++ b/datastore/api/endpoints/datastore.py @@ -45,7 +45,7 @@ @router.post( "/datastore_create", response_model=DatastoreCreateResponse, - summary="Declare a resource (and optionally seed rows)", + summary="Create a datastore table and optionally insert rows", ) async def datastore_create( request: Request, @@ -90,7 +90,7 @@ async def datastore_create( @router.post( "/datastore_upsert", response_model=DatastoreUpsertResponse, - summary="Insert / update / upsert rows", + summary="Insert / update / upsert records in a datastore table", ) async def datastore_upsert( request: Request, @@ -110,7 +110,7 @@ async def datastore_upsert( @router.post( "/datastore_delete", response_model=DatastoreDeleteResponse, - summary="Delete rows, drop columns, or drop the table", + summary="Delete rows, drop columns, or drop the datastore table", ) async def datastore_delete( request: Request, @@ -137,7 +137,7 @@ async def datastore_delete( @router.get( "/datastore_search", response_model=DatastoreSearchResponse, - summary="Search a resource (streaming)", + summary="Search a datastore table (filters, full-text, sort, paging)", ) async def datastore_search( request: Request, @@ -165,7 +165,7 @@ async def datastore_search( @router.get( "/datastore_search_sql", response_model=DatastoreSearchResponse, - summary="Run a read-only SQL SELECT (streaming)", + summary="Query datastore tables with a read-only SQL SELECT", ) async def datastore_search_sql( request: Request, @@ -191,7 +191,7 @@ async def datastore_search_sql( @router.get( "/datastore_info", response_model=DatastoreInfoResponse, - summary="Get a resource's schema + row stats", + summary="Get a resource's schema and metadata", ) async def datastore_info( request: Request, diff --git a/datastore/api/endpoints/dump.py b/datastore/api/endpoints/dump.py index d326948..b319ede 
100644 --- a/datastore/api/endpoints/dump.py +++ b/datastore/api/endpoints/dump.py @@ -38,7 +38,7 @@ @router.get( "/datastore/dump/{resource_id}", - summary="Download a whole resource", + summary="Download an entire table (CSV / NDJSON / Parquet)", responses={ 302: {"description": "Single-shard export — redirect to a signed GCS URL."}, 200: {"description": "Multi-shard export — streamed CSV / NDJSON body."}, From 374170f089caf9d60b3e23b647300e480691fa67 Mon Sep 17 00:00:00 2001 From: Sagar Ghimire Date: Wed, 27 May 2026 16:48:47 +0545 Subject: [PATCH 4/4] fix (doc): update resource id as ckan resource id --- API.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/API.md b/API.md index 872224d..d1b04d6 100644 --- a/API.md +++ b/API.md @@ -114,7 +114,7 @@ round-tripped by `datastore_info`. ```json { - "resource_id": "balancing_auction_results_2025", + "resource_id": "c6153a74-43cb-4edf-8bdf-bb664feca937", "schema": { "fields": [ {"name": "auction_id", "type": "integer", "info": {"title": "Auction ID"}}, @@ -141,7 +141,7 @@ round-tripped by `datastore_info`. "help": "...", "success": true, "result": { - "resource_id": "balancing_auction_results_2025", + "resource_id": "c6153a74-43cb-4edf-8bdf-bb664feca937", "fields": [{"id": "auction_id", "type": "integer", "info": {"...": "..."}}, "..."], "schema": {"fields": ["..."], "primaryKey": ["auction_id", "product_code"]}, "primary_key": ["auction_id", "product_code"] @@ -179,7 +179,7 @@ never carries it. ```json { - "resource_id": "balancing_auction_results_2025", + "resource_id": "c6153a74-43cb-4edf-8bdf-bb664feca937", "method": "upsert", "records": [ {"auction_id": 144, "product_code": "DCL", "clearing_price_gbp_per_mwh": 48.05, "accepted": true} @@ -191,7 +191,7 @@ never carries it. 
```json { "help": "...", "success": true, - "result": {"resource_id": "balancing_auction_results_2025", "method": "upsert"} } + "result": {"resource_id": "c6153a74-43cb-4edf-8bdf-bb664feca937", "method": "upsert"} } ``` --- @@ -216,7 +216,7 @@ Three modes (`filters` and `fields` are mutually exclusive): ### Request ```json -{ "resource_id": "balancing_auction_results_2025", +{ "resource_id": "c6153a74-43cb-4edf-8bdf-bb664feca937", "filters": {"auction_id": 144, "accepted": false} } ``` @@ -224,7 +224,7 @@ Three modes (`filters` and `fields` are mutually exclusive): ```json { "help": "...", "success": true, - "result": {"resource_id": "balancing_auction_results_2025"} } + "result": {"resource_id": "c6153a74-43cb-4edf-8bdf-bb664feca937"} } ``` On a **column drop**, `result` also carries `schema` — the Frictionless schema @@ -234,7 +234,7 @@ follow-up `datastore_info`: ```json { "help": "...", "success": true, "result": { - "resource_id": "balancing_auction_results_2025", + "resource_id": "c6153a74-43cb-4edf-8bdf-bb664feca937", "fields": ["bidder_metadata"], "schema": {"fields": [{"name": "auction_id", "type": "integer"}, "..."], "primaryKey": ["auction_id", "product_code"]} @@ -268,7 +268,7 @@ Parameterised search; the response is **streamed** (peak memory ≈ one row). ``` GET /api/3/action/datastore_search - ?resource_id=balancing_auction_results_2025 + ?resource_id=c6153a74-43cb-4edf-8bdf-bb664feca937 &filters={"product_code":"DCL","accepted":true} &sort=delivery_start desc &limit=100 @@ -318,7 +318,7 @@ are checked against the engine's allow-list. Include a `LIMIT` (required). ``` GET /api/3/action/datastore_search_sql?sql= SELECT product_code, AVG(clearing_price_gbp_per_mwh) AS avg_price - FROM "balancing_auction_results_2025" + FROM "c6153a74-43cb-4edf-8bdf-bb664feca937" WHERE accepted = true GROUP BY product_code LIMIT 1000 @@ -367,7 +367,7 @@ row stats — a column-level metadata catalog without a side store. 
], "schema": {"fields": ["..."], "primaryKey": ["auction_id", "product_code"]}, "meta": { - "resource_id": "balancing_auction_results_2025", + "resource_id": "c6153a74-43cb-4edf-8bdf-bb664feca937", "primary_key": ["auction_id", "product_code"], "total": 18420 }