Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ BIGQUERY_PROJECT=
BIGQUERY_DATASET=
BIGQUERY_CREDENTIALS=
BIGQUERY_CREDENTIALS_RO=
# Use BigQuery's built-in 24h query-results cache on read paths
# (datastore_search / datastore_search_sql / datastore_info). Identical
# SELECTs return free + fast on cache hits. False = force fresh scan.
BIGQUERY_USE_QUERY_CACHE=true
BIGQUERY_EXPORT_BUCKET=
BIGQUERY_EXPORT_URL_EXPIRY_HOURS=1
SQL_FUNCTIONS_ALLOW_FILE=

# --- Auth ---
Expand Down
48 changes: 47 additions & 1 deletion CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,14 +424,60 @@ Each endpoint takes a single `ContextDep`. The handler calls `context.authorize(
| GET | `/api/3/action/datastore_search` | **implemented** (streaming) | `DatastoreSearchRequest` | `DatastoreSearchResponse` |
| GET | `/api/3/action/datastore_search_sql` | **implemented** (streaming) | `DatastoreSearchSQLRequest` | `DatastoreSearchResponse` |
| GET | `/api/3/action/datastore_info` | **implemented** | `DatastoreInfoRequest` | `DatastoreInfoResponse` |
| GET | `/datastore/dump/{resource_id}` | **implemented** | `format=csv\|ndjson\|parquet` | 302 → GCS *or* streaming body (see §5.3) |

The BigQuery engine is wired end-to-end: DDL, MERGE-based upsert, DML delete, parameterised search, `_table_metadata` for Frictionless schema + unique_key round-trip, and a row-count fast path via `INFORMATION_SCHEMA.TABLE_STORAGE`. The DuckLake engine is the next concrete adapter — see §7.
The BigQuery engine is wired end-to-end: DDL, MERGE-based upsert, DML delete, parameterised search, `_table_metadata` for Frictionless schema + unique_key round-trip, a row-count fast path via `INFORMATION_SCHEMA.TABLE_STORAGE`, and `EXPORT DATA`-backed dump with `table.modified`-keyed GCS caching. The DuckLake engine is the next concrete adapter — see §7.

`datastore_create` accepts two shapes:

- `resource_id` — table name only. Works under any `AUTH_TYPE`.
- `resource` (dict) — calls `ckan.resource_create(...)` first to materialise a CKAN resource, then writes the datastore table. **Only valid under `AUTH_TYPE=ckan`**; the endpoint rejects this shape with a `Validation Error` under JWT / anonymous since there's no CKAN to land it.

### 5.3 `GET /datastore/dump/{resource_id}`

Full-table download, **one URL → one file** from the caller's point of view. Bytes never sit in API memory — small dumps redirect to GCS, large dumps stream-concat through async httpx.

Pipeline:

1. **Resolve cache key** — read `table.modified` from BigQuery, compute `rev = hex(microsec_epoch(modified))`, prefix becomes `dumps/<rid>/<fmt>/<rev>`.
2. **GCS cache lookup** — `list_blobs(prefix=…)`. Non-empty → skip steps 3-5; log `cache HIT`.
3. **Submit `EXPORT DATA`** — Parquet → single-file URI `gs://<bucket>/<prefix>.parquet`; CSV/NDJSON → wildcard URI `gs://<bucket>/<prefix>_*.<ext>` so BigQuery shards >1 GB exports. The SELECT casts `TIMESTAMP` + `DATETIME` columns to ISO 8601 for CSV/NDJSON; Parquet keeps native types.
4. **Poll non-blockingly** — `await asyncio.to_thread(job.reload)` + `await asyncio.sleep(_DUMP_POLL_INTERVAL_SECONDS)` between iterations. Worker thread is held only during the brief `reload` call, not the wait.
5. **GC stale revisions** — after a successful extract, sweep `dumps/<rid>/<fmt>/` and delete any blob that doesn't start with the current `prefix`. Best-effort, failures logged. Storage stays bounded to one rev per `(rid, fmt)`.
6. **Sign URLs** — V4 signed URLs with `response-content-disposition: attachment; filename="<rid>.<ext>"` (single shard) or `<rid>_NN.<ext>` (multi-shard, 1-indexed, zero-padded). Signing offloaded to a thread (IAM round-trip under workload identity).
7. **Return**:
- 1 URL → `RedirectResponse(302)`. Bytes flow GCS → client; server is **out of the byte path**.
- N URLs → `StreamingResponse` over `services.dump.stream_*_shards` (async httpx, 64 KiB chunks, serial shard walk, CSV header-dedup via `_skip_first_line`, NDJSON pure byte-concat).

Per-stream resource profile (multi-shard branch): ~64 KiB resident memory, **0** worker threads, byte-copy CPU only, async cancellation propagates from client disconnect → httpx → GCS connection released.

Errors:
- Parquet >1 GB → `EXPORT DATA` job fails with a "single URI / wildcard" message; classifier in `_is_export_too_large` flips it to `PayloadTooLargeError` (413). Caller switches to `format=csv` or `format=ndjson`.
- Any other BigQuery / GCS failure → `ServerError` (500) with the upstream message.
- `BIGQUERY_EXPORT_BUCKET` unset → `ServerError` at request time (the lifespan doesn't fail-fast because dump is an optional capability).

Required IAM. Dump follows a strict **ro for reading, rw for writing/updating** model — see [bigquery/client.py](datastore/infrastructure/engines/bigquery/client.py) `load_credentials` + `_build_bq_client` / `_build_storage_client` on the backend:

| Step | Identity | Why |
|---|---|---|
| `get_table` | RO BQ (`self.client`) | Reading BigQuery metadata. |
| `list_blobs` cache lookup | RO GCS | Reading GCS objects. |
| `client.query("EXPORT DATA …")` | RW BQ (built on demand) | BigQuery writes shards to GCS under this SA's identity — it's a write op even though the SQL surface is `SELECT`. |
| Post-extract `list_blobs` refresh | RW GCS | Blobs are passed straight to `generate_signed_url` next; we want them bound to the rw client. |
| `delete` (GC) | RW GCS | Writing/deleting objects. |
| `generate_signed_url` | RW GCS | Under workload identity this calls IAM `signBlob`, which typically only the rw SA holds via `iam.serviceAccountTokenCreator`. |

Concrete perm sets:

- **RO SA** (`BIGQUERY_CREDENTIALS_RO`) — `bigquery.tables.get` + `storage.objects.list`.
- **RW SA** (`BIGQUERY_CREDENTIALS`) — `bigquery.jobs.create` + `bigquery.tables.export` + `bigquery.tables.getData` + `storage.objects.{create,list,delete}` + `iam.serviceAccountTokenCreator`.

A single SA works if both perm sets land on the same identity — `BIGQUERY_CREDENTIALS_RO` empty falls through to ADC; same env var can drive both. `_build_bq_client` and `_build_storage_client` on the backend are deliberately small + stub-friendly so tests inject mocks without monkey-patching `google.cloud.*` globally.

A 24h object-lifecycle rule on the bucket is a useful belt-and-braces: the engine GCs older revs already, but lifecycle catches anything stranded by a crashed dump.

The GCS client is built with the same credentials as the BigQuery client for the active engine mode (`load_credentials(config, mode)` in [bigquery/client.py](datastore/infrastructure/engines/bigquery/client.py)). Without this shim, a service-account JSON loaded via `BIGQUERY_CREDENTIALS_RO` would drive BigQuery but `storage.Client(...)` would silently fall back to ADC — a near-invisible identity split. Workload identity / `GOOGLE_APPLICATION_CREDENTIALS`-style setups still work because `load_credentials` returns `None` for ADC and the storage client follows the same default-credentials path.

---

## 6. Request / Response Contracts
Expand Down
19 changes: 16 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ datastore/
│ ├── error_handlers.py # Exception handlers (APIError → CKAN error envelope)
│ └── endpoints/ # Route handlers, one file per resource group
│ ├── health.py # /, /health, /ready
│ └── datastore.py # /api/3/action/datastore_*
│ ├── datastore.py # /api/3/action/datastore_*
│ └── dump.py # /datastore/dump/<resource_id> (302 single / stream multi)
├── auth/ # Pluggable auth providers — one subpackage per type
│ ├── base.py # AuthProvider Protocol + Decision dataclass +
Expand All @@ -60,7 +61,9 @@ datastore/
│ ├── write.py # create / upsert / delete orchestration
│ ├── read.py # search / search_sql orchestration (engine call,
│ │ # format dispatch, pagination links)
│ └── streaming.py # per-format byte-yielding writers used by read.py
│ ├── streaming.py # per-format byte-yielding writers used by read.py
│ └── dump.py # multi-shard stream-concat over async httpx
│ # (drives /datastore/dump for >1 GB CSV/NDJSON)
└── infrastructure/ # Adapters to outside systems
├── cache.py # InMemoryCache + RedisCache (CachePort protocol)
Expand Down Expand Up @@ -124,7 +127,14 @@ What's shipped and what's next. Tick each box as the change set lands.
- `datastore_search_sql` (sqlglot parses tables + functions; per-table
CKAN authorize; per-engine function allow-list)
- `datastore_info` (column schema + free-form `meta` dict)
- [x] Health endpoints `/`, `/health`, `/ready` returning the CKAN envelope shape.
- [x] `GET /datastore/dump/<resource_id>?format=csv|ndjson|parquet` — full-table download
via BigQuery `EXPORT DATA`. **1 shard** (≤1 GB CSV/NDJSON, or any Parquet ≤1 GB):
302 to a GCS signed URL (server out of the byte path). **N shards** (>1 GB CSV/NDJSON):
server stream-concats shards via async httpx (~64 KiB peak memory, no threadpool).
Parquet >1 GB returns 413 (parquet shards can't be byte-concatenated). Results are
cached in GCS keyed by `table.modified`; unchanged tables skip the extract entirely,
and stale revisions are GC'd on the next cache miss so storage stays bounded to one
rev per `(resource_id, format)`.
`/ready` builds the rw + ro engine instances during lifespan and probes
`engine.healthcheck()` on each — 503 with a `Service Unavailable` envelope
if either fails (so k8s pulls the pod from the Service).
Expand Down Expand Up @@ -221,9 +231,12 @@ Every entry below maps 1:1 to a field on `datastore.core.config.Config`. See [.e
| `DATASTORE_ENGINE` | `bigquery` | Storage backend — must match a folder under `infrastructure/engines/`; validated at startup |
| `SQL_FUNCTIONS_ALLOW_FILE` | _(empty)_ | Override path to the `datastore_search_sql` function allow-list; defaults to `<engine>/allowed_functions.txt` |
| `BIGQUERY_PROJECT` | _(empty)_ | Google Cloud project ID. Required when `DATASTORE_ENGINE=bigquery`; unset → `/ready` returns 503 with a clear warning. |
| `BIGQUERY_DATASET` | _(empty)_ | BigQuery dataset that holds per-resource tables + the engine-managed `_table_metadata`. Required when `DATASTORE_ENGINE=bigquery`; unset → metadata store is disabled and writes fall through to placeholder mode. |
| `BIGQUERY_CREDENTIALS` | _(empty)_ | Read-write service-account creds. Accepts a JSON blob (leading `{`), a path to a service-account JSON file, or empty (→ Application Default Credentials). |
| `BIGQUERY_CREDENTIALS_RO` | _(empty)_ | Read-only service-account creds (same format). Empty → falls back to `BIGQUERY_CREDENTIALS` so single-credential deployments work. |
| `BIGQUERY_USE_QUERY_CACHE` | `true` | Use BigQuery's 24h query-results cache on `datastore_search` / `datastore_search_sql` / `datastore_info`. Identical SELECTs return free + fast on cache hits. Set `false` to force a fresh scan. |
| `BIGQUERY_EXPORT_BUCKET` | _(empty)_ | GCS bucket name (no `gs://` prefix) that `/datastore/dump/<rid>` writes `EXPORT DATA` shards into. Required when the dump endpoint is in use. **Credential model: ro reads, rw writes.** RO SA (`BIGQUERY_CREDENTIALS_RO`) does the BigQuery `get_table` and the initial GCS `list_blobs` cache lookup. RW SA (`BIGQUERY_CREDENTIALS`) runs `EXPORT DATA` (it writes shards under its own identity), does GC `delete`, and signs URLs. **RO SA perms:** `bigquery.tables.get` + `storage.objects.list`. **RW SA perms:** `bigquery.jobs.create` + `bigquery.tables.export` + `bigquery.tables.getData` + `storage.objects.{create,list,delete}` + `iam.serviceAccountTokenCreator` (for V4 signing under workload identity). A 24h object-lifecycle rule on the bucket is recommended as a safety net. |
| `BIGQUERY_EXPORT_URL_EXPIRY_HOURS` | `1` | Signed-URL TTL for dump manifest entries (hours). |
| `REDIS_URL` | _(empty)_ | Redis URL for cache; empty → in-process `InMemoryCache` |
| `CKAN_URL` | _(empty)_ | Base URL of the CKAN instance (required when `AUTH_TYPE=ckan`) |
| `HTTP_TIMEOUT_SECONDS` | `10` | Timeout for outbound CKAN calls (seconds) |
Expand Down
67 changes: 67 additions & 0 deletions datastore/api/endpoints/dump.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
"""`GET /datastore/dump/{resource_id}` — single download for a table.

Behaviour by shard count (decided by BigQuery from the export size):

- **1 shard** (≤ 1 GB, or any-size Parquet): 302 redirect to the
GCS signed URL. Zero server bandwidth — bytes go GCS → client.
- **N shards** (>1 GB CSV/NDJSON): `StreamingResponse` over
`services.dump.stream_*_shards`, which pulls each shard from GCS
via async httpx and byte-forwards (CSV header-dedup; NDJSON pure
concat). Memory ≈ one chunk in flight; no threadpool consumption.

Parquet >1 GB is refused upstream with 413 (parquet shards can't be
byte-concatenated). Caller picks CSV/NDJSON.
"""

from __future__ import annotations

from typing import Annotated, Literal

from fastapi import APIRouter, Query
from starlette.responses import RedirectResponse, StreamingResponse

from datastore.api.context import Context
from datastore.infrastructure.engines import get_datastore_engine
from datastore.services.dump import stream_csv_shards, stream_ndjson_shards

DumpFormat = Literal["csv", "ndjson", "parquet"]

_MEDIA_TYPE: dict[str, str] = {
"csv": "text/csv",
"ndjson": "application/x-ndjson",
"parquet": "application/vnd.apache.parquet",
}

router = APIRouter(tags=["dump"])


@router.get("/datastore/dump/{resource_id}")
async def dump(
context: Context,
resource_id: str,
fmt: Annotated[DumpFormat, Query(alias="format")] = "csv",
):
await context.authorize(resource_id=resource_id, permission="read")
engine = get_datastore_engine(context, mode="ro")

urls = await engine.dump(resource_id, fmt)

if len(urls) == 1:
return RedirectResponse(url=urls[0], status_code=302)

if fmt == "csv":
body = stream_csv_shards(urls)
elif fmt == "ndjson":
body = stream_ndjson_shards(urls)
else: # pragma: no cover — Parquet never returns >1 shard
raise RuntimeError(f"unexpected multi-shard format: {fmt}")

return StreamingResponse(
body,
media_type=_MEDIA_TYPE[fmt],
headers={
"Content-Disposition": (
f'attachment; filename="{resource_id}.{fmt}"'
),
},
)
5 changes: 3 additions & 2 deletions datastore/api/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

from fastapi import APIRouter

from datastore.api.endpoints import datastore, health
from datastore.api.endpoints import datastore, dump, health

api_router = APIRouter()
api_router.include_router(health.welcome_router)
api_router.include_router(health.probe_router)
api_router.include_router(health.probe_router, prefix="/api/3/action")
api_router.include_router(datastore.router, prefix="/api/3/action")
api_router.include_router(datastore.router, prefix="/api/3/action")
api_router.include_router(dump.router)
28 changes: 10 additions & 18 deletions datastore/auth/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,10 @@

from __future__ import annotations

import base64
import hashlib
from dataclasses import dataclass
from typing import Any, Protocol

import orjson


@dataclass(slots=True, frozen=True)
class Decision:
Expand Down Expand Up @@ -47,20 +44,15 @@ def key_id(self, credential: str) -> str:


def default_key_id(credential: str) -> str:
"""JWT `jti` if the credential is a JWT; sha256 prefix otherwise.

Shared by providers that accept either opaque or JWT tokens.
"""sha256 prefix of the full credential string.

Security note: deliberately ignores any embedded JWT `jti` claim. An
unverified `jti` from the token's payload can be forged to collide
with a cached authorization decision for a different (verified)
token — the cache lookup is keyed before signature verification, so
a forged `jti:<value>` lookup would return the cached decision for
the legitimate user with the same `jti`. Hashing the whole
credential keeps the cache identity tied to bytes-on-the-wire and
makes any collision strictly equivalent to a sha256 collision.
"""
parts = credential.split(".")
if len(parts) == 3:
try:
segment = parts[1]
padded = segment + "=" * (-len(segment) % 4)
payload = orjson.loads(base64.urlsafe_b64decode(padded))
if isinstance(payload, dict):
jti = payload.get("jti")
if isinstance(jti, str) and jti:
return f"jti:{jti}"
except (ValueError, TypeError, orjson.JSONDecodeError):
pass
return "h:" + hashlib.sha256(credential.encode()).hexdigest()[:16]
26 changes: 20 additions & 6 deletions datastore/auth/ckan/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,22 @@ async def authorize(

cached = await _safe_get(self._cache, cache_key)
if cached is not None:
log.debug(
"ckan auth cache HIT scope=%s target=%s perm=%s",
scope, target, permission,
)
return _decision_from_bytes(cached)
try:
decision = _decision_from_bytes(cached)
log.debug(
"ckan auth cache HIT scope=%s target=%s perm=%s",
scope, target, permission,
)
return decision
except (AuthorizationError, ValueError, TypeError) as e:
# Treat a corrupt cache entry as a miss — fall through
# to CKAN. Blocking auth on a poisoned cache would be a
# self-inflicted outage.
log.warning(
"ckan auth cache entry malformed for scope=%s target=%s: "
"%s — falling back to CKAN",
scope, target, e,
)

log.debug(
"ckan auth cache MISS scope=%s target=%s perm=%s -> CKAN",
Expand All @@ -67,8 +78,11 @@ async def authorize(
package_id=package_id,
permission=permission,
)
# `subject` rides through the cache (orjson-serialised). Never
# store the raw credential there — use the same hash we already
# derive for the cache key.
decision = Decision(
subject=credential,
subject=self.key_id(credential) if credential else None,
resource=result.get("resource"),
package=result.get("package"),
)
Expand Down
14 changes: 14 additions & 0 deletions datastore/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,20 @@ def _check_engine_available(cls, v: str) -> str:
"force a fresh scan in tests."
),
)
BIGQUERY_EXPORT_BUCKET: str = Field(
default="",
description=(
"GCS bucket name (no `gs://` prefix) that `/datastore/dump/<rid>` "
),
)
BIGQUERY_EXPORT_URL_EXPIRY_HOURS: int = Field(
default=1,
ge=1,
le=168,
description=(
"Signed-URL TTL for dump manifest entries (hours). Defaults to 1h."
),
)

# Per-row system columns
INCLUDE_UPDATED_AT: bool = Field(
Expand Down
13 changes: 12 additions & 1 deletion datastore/core/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,17 @@ class ConflictError(APIError):
type_label = "Conflict Error"


class PayloadTooLargeError(APIError):
"""Raised when `/datastore/dump/<rid>?format=parquet` exceeds
BigQuery's 1 GB single-file limit. CSV / NDJSON dumps stitch
multiple shards into one download, but Parquet shards can't be
byte-concatenated (each shard has its own footer), so big tables
have to use CSV / NDJSON instead."""

status_code = 413
type_label = "Payload Too Large"


class ServerError(APIError):
status_code = 500
type_label = "Internal Error"
Expand All @@ -50,7 +61,7 @@ class ServerError(APIError):
404: "Not Found Error",
405: "Not Found Error",
409: "Conflict Error",
413: "Validation Error",
413: "Payload Too Large",
422: "Validation Error",
501: "Not Implemented",
}
Loading
Loading