diff --git a/CLAUDE.md b/CLAUDE.md index 56ad551..0b3ba88 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -426,12 +426,14 @@ Each endpoint takes a single `ContextDep`. The handler calls `context.authorize( | GET | `/api/3/action/datastore_info` | **implemented** | `DatastoreInfoRequest` | `DatastoreInfoResponse` | | GET | `/datastore/dump/{resource_id}` | **implemented** | `format=csv\|ndjson\|parquet` | 302 → GCS *or* streaming body (see §5.3) | -The BigQuery engine is wired end-to-end: DDL, MERGE-based upsert, DML delete, parameterised search, `_table_metadata` for Frictionless schema + unique_key round-trip, a row-count fast path via `INFORMATION_SCHEMA.TABLE_STORAGE`, and `EXPORT DATA`-backed dump with `table.modified`-keyed GCS caching. The DuckLake engine is the next concrete adapter — see §7. +The BigQuery engine is wired end-to-end: DDL, MERGE-based upsert, DML delete, parameterised search, native table-level metadata (the Frictionless schema + unique_key are JSON-encoded into the table's own `description` OPTION) for the schema round-trip, a row-count fast path via `INFORMATION_SCHEMA.TABLE_STORAGE`, and `EXPORT DATA`-backed dump with `table.modified`-keyed GCS caching. The DuckLake engine is the next concrete adapter — see §7. `datastore_create` accepts two shapes: - `resource_id` — table name only. Works under any `AUTH_TYPE`. -- `resource` (dict) — calls `ckan.resource_create(...)` first to materialise a CKAN resource, then writes the datastore table. **Only valid under `AUTH_TYPE=ckan`**; the endpoint rejects this shape with a `Validation Error` under JWT / anonymous since there's no CKAN to land it. +- `resource` (dict) — calls `ckan.resource_create(...)` first to materialise a CKAN resource, then writes the datastore table. The resource is created with `url_type="datastore"` so CKAN (and the read-only guard below) knows the datastore owns its data. **Only valid under `AUTH_TYPE=ckan`**; the endpoint rejects this shape with a `Validation Error` under JWT / anonymous since there's no CKAN to land it. + +**Read-only guard (`AUTH_TYPE=ckan` only).** `datastore_create`, `datastore_upsert`, and `datastore_delete` refuse to write a resource whose CKAN record carries `url_type="datastore"` unless the request sets `force: true` — a `Validation Error` ("Cannot update a read-only resource. Use \"force\" to force update.") otherwise. This mirrors CKAN's protection against clobbering datastore-managed data by accident. The guard is gated on `AUTH_TYPE=ckan` and skipped entirely under any other provider (only the CKAN provider attaches a resource record). ### 5.3 `GET /datastore/dump/{resource_id}` @@ -813,7 +815,8 @@ GET /api/3/datastore_search_sql?sql= "force": false } ``` -Empty `filters` (or omitted) → the entire table is dropped. +Empty `filters` (or omitted) → the entire table is dropped. Passing `fields` +(mutually exclusive with `filters`) drops those columns instead of rows. **Response** ```json @@ -824,6 +827,22 @@ Empty `filters` (or omitted) → the entire table is dropped. } ``` +When `fields` is supplied (column drop), `result` also carries `schema` — the +Frictionless Table Schema after the listed columns were removed — so the caller +can confirm the table's new shape without a follow-up `datastore_info`: + +```json +{ + "help": "...", + "success": true, + "result": { + "resource_id": "balancing_auction_results_2025", + "fields": ["bidder_metadata"], + "schema": {"fields": [{"id": "auction_id", "type": "integer"}, "..."], "primaryKey": ["auction_id", "product_code"]} + } +} +``` + ### 6.6 `GET /api/3/datastore_info` Returns the same field shape that was supplied to `datastore_create`, including @@ -906,7 +925,7 @@ The original phase plan that used to live here has mostly shipped. This section - [x] **Foundation** — `pyproject.toml`, `Dockerfile`, `Makefile`, `.env.example`, `docker-compose.yml`. App factory + lifespan in [datastore/main.py](datastore/main.py); body-size middleware in [datastore/api/middleware.py](datastore/api/middleware.py); startup log line via `uvicorn.error` showing the active engine + auth provider + cache backend. - [x] **All six `datastore_*` actions wired** — `create`, `upsert`, `delete`, `search`, `search_sql`, `info` mounted via [datastore/api/routes.py](datastore/api/routes.py). Every endpoint authorizes via `context.authorize(...)` and delegates to a service. -- [x] **Real BigQuery backend** — [datastore/infrastructure/engines/bigquery/](datastore/infrastructure/engines/bigquery/) implements DDL, parameterised `search`, MERGE-based `upsert` (`method=upsert` / `insert` / `update`), DML `delete` (whole-table drop, row delete, column drop), parameterised `search_sql`, and `info`. Frictionless schema + `unique_key` round-trip via the `_table_metadata` table. Row counts use the cheap `INFORMATION_SCHEMA.TABLE_STORAGE` fast path when filters don't apply. +- [x] **Real BigQuery backend** — [datastore/infrastructure/engines/bigquery/](datastore/infrastructure/engines/bigquery/) implements DDL, parameterised `search`, MERGE-based `upsert` (`method=upsert` / `insert` / `update`), DML `delete` (whole-table drop, row delete, column drop), parameterised `search_sql`, and `info`. Frictionless schema + `unique_key` round-trip via native table-level metadata — JSON-encoded into the table's own `description` OPTION (no separate metadata table). Row counts use the cheap `INFORMATION_SCHEMA.TABLE_STORAGE` fast path when filters don't apply. - [x] **Streaming search** — [datastore/services/streaming.py](datastore/services/streaming.py) yields the CKAN envelope chunk-by-chunk for all four `records_format` values (`objects`, `lists`, `csv`, `tsv`); CSV/TSV ride the same JSON envelope (records is a multi-line string). Peak memory ≈ 1 row regardless of N. `_links.start` / `_links.next` carry full scheme + host with all non-`offset` params preserved. - [x] **`datastore_search_sql` SQL safety** — schema rejects non-SELECT / multi-statement / unparseable SQL (sqlglot). [datastore/schemas/validators.py](datastore/schemas/validators.py)'s `parse_sql_references` pulls table + function names; endpoint authorizes each table as a `resource_id`; service rejects functions outside the engine's allow-list at `engines//allowed_functions.txt` (overridable via `SQL_FUNCTIONS_ALLOW_FILE`). - [x] **Request validation** — Pydantic models in [datastore/schemas/request.py](datastore/schemas/request.py) with `extra="forbid"`. `datastore_info` / `datastore_delete` accept `resource_id` or `id` (normalised). Pydantic errors → CKAN error envelope with a `fields` map. diff --git a/datastore/api/auth.py b/datastore/api/auth.py index 059bad8..acd4fc2 100644 --- a/datastore/api/auth.py +++ b/datastore/api/auth.py @@ -61,3 +61,25 @@ async def authorize( permission=permission, ) return {"resource": decision.resource or {}, "package": decision.package or {}} + + +def ensure_resource_writable( + resource: dict[str, Any], *, force: bool, auth_type: str +) -> None: + """Block writes to a CKAN datastore-managed resource unless `force`. + + CKAN tags resources whose data the datastore owns with + `url_type="datastore"`; editing one through datastore_create / + _upsert / _delete requires an explicit `force` so managed data isn't + clobbered by accident. + + Applies under `AUTH_TYPE="ckan"` only — that's the sole provider that + attaches a CKAN resource record. Other providers carry no such record, + so the guard is skipped entirely. + """ + if auth_type != "ckan": + return + if not force and resource.get("url_type") == "datastore": + raise ValidationError( + 'Cannot update a read-only resource. Use "force" to force update.' + ) diff --git a/datastore/api/endpoints/datastore.py b/datastore/api/endpoints/datastore.py index eca9dc0..15ef941 100644 --- a/datastore/api/endpoints/datastore.py +++ b/datastore/api/endpoints/datastore.py @@ -6,6 +6,7 @@ from starlette.requests import Request from starlette.responses import StreamingResponse +from datastore.api.auth import ensure_resource_writable from datastore.api.context import Context from datastore.api.responses import _deprecation_warnings, _success_response from datastore.core.exceptions import ValidationError @@ -63,6 +64,14 @@ async def datastore_create( permission="create", ) + # Refuse to re-declare a datastore-managed resource unless forced + # (CKAN auth only; no-op on the new-resource dict path). + ensure_resource_writable( + data_dict["resource"], + force=bool(payload.force), + auth_type=context.config.AUTH_TYPE, + ) + data_dict.update( { "resource": payload.resource_id or payload.resource, @@ -90,6 +99,11 @@ async def datastore_upsert( resource_id=payload.resource_id, permission="update", ) + ensure_resource_writable( + data_dict["resource"], + force=payload.force, + auth_type=context.config.AUTH_TYPE, + ) data_dict.update(payload.model_dump()) result = await upsert_datastore(context, data_dict) return _success_response(request, result) @@ -181,6 +195,13 @@ async def datastore_delete( Returns the original `filters` echoed back (CKAN convention) so the caller can confirm what the server actually applied. """ - await context.authorize(resource_id=payload.resource_id, permission="delete") + data_dict = await context.authorize( + resource_id=payload.resource_id, permission="delete" + ) + ensure_resource_writable( + data_dict["resource"], + force=payload.force, + auth_type=context.config.AUTH_TYPE, + ) result = await delete_datastore(context, payload.model_dump()) return _success_response(request, result) diff --git a/datastore/core/config.py b/datastore/core/config.py index 1854f9e..727d819 100644 --- a/datastore/core/config.py +++ b/datastore/core/config.py @@ -100,9 +100,8 @@ def _check_engine_available(cls, v: str) -> str: BIGQUERY_DATASET: str = Field( default="", description=( - "BigQuery dataset that holds the datastore tables. Both the " - "per-resource data tables and the internal `_table_metadata` " - "table live here. Required when DATASTORE_ENGINE=bigquery." + "BigQuery dataset that holds the per-resource datastore " + "tables. Required when DATASTORE_ENGINE=bigquery." ), ) BIGQUERY_CREDENTIALS: str = Field( diff --git a/datastore/infrastructure/ckan_client.py b/datastore/infrastructure/ckan_client.py index 2ebf3f4..5465581 100644 --- a/datastore/infrastructure/ckan_client.py +++ b/datastore/infrastructure/ckan_client.py @@ -54,7 +54,7 @@ async def datastore_authorize( permission: str | None = None, ) -> dict[str, Any]: """`/api/3/action/datastore_authorize`. - Authorize resource and package. + Authorize resource and package. """ if (resource_id is None) == (package_id is None): raise ValidationError( diff --git a/datastore/infrastructure/engines/base.py b/datastore/infrastructure/engines/base.py index 7e1fc61..e638cef 100644 --- a/datastore/infrastructure/engines/base.py +++ b/datastore/infrastructure/engines/base.py @@ -3,7 +3,7 @@ from abc import ABC, abstractmethod from collections.abc import Iterator from dataclasses import dataclass -from typing import Any, Protocol, runtime_checkable +from typing import Any @dataclass @@ -31,6 +31,10 @@ def columns(self) -> list[str]: class WriteResult: rows_written: int = 0 total: int | None = None + # Resulting Frictionless schema after the write — populated by + # `delete()`'s column-drop path so the response can echo the table's + # shape minus the dropped columns. `None` for other write paths. + schema: dict[str, Any] | None = None @dataclass @@ -44,52 +48,6 @@ class InfoResult: meta: dict[str, Any] -@runtime_checkable -class MetadataStore(Protocol): - """Per-engine storage for table-level metadata. - - Holds one row per `resource_id`, keyed by the resource_id itself. The - canonical column shape is `(resource_id, schema, created_at, - updated_at)` where `schema` is a Frictionless Table Schema dict. - - Each engine subpackage provides a concrete implementation - (e.g. `bigquery/metadata.py: BigQueryMetadataStore`) so the SQL - dialect, connection management, and column types stay engine-private. - The backend constructs its store in `__init__`, calls `initialize()` - once at startup to create the underlying table, and calls `upsert` - from `create()` whenever a caller declares a new resource. - - Adding a new engine = drop a sibling `metadata.py` implementing this - Protocol; the backend wires it in by holding `self.metadata`. - """ - - def initialize(self) -> None: - """Create the metadata table if it doesn't exist. Idempotent.""" - - def insert(self, resource_id: str, schema: dict[str, Any]) -> None: - """Insert a new metadata row for `resource_id`. - - Sets `created_at` and `updated_at` to now. Fails if a row with - the same `resource_id` already exists — that's a real conflict - that callers should surface (a second `datastore_create` for an - already-declared resource). - """ - - def update(self, resource_id: str, schema: dict[str, Any]) -> None: - """Update the metadata row for `resource_id`. - - Replaces `schema` and bumps `updated_at`; `created_at` is - preserved. Keyed on `resource_id`; no-op when the row is absent. - """ - - def get(self, resource_id: str) -> dict[str, Any] | None: - """Return the stored Frictionless schema for `resource_id`, - or `None` when no row exists.""" - - def delete(self, resource_id: str) -> None: - """Remove the metadata row for `resource_id`. No-op when absent.""" - - class DatastoreBackend(ABC): @abstractmethod def initialize(self) -> None: diff --git a/datastore/infrastructure/engines/bigquery/backend.py b/datastore/infrastructure/engines/bigquery/backend.py index 1b7f931..8f31d84 100644 --- a/datastore/infrastructure/engines/bigquery/backend.py +++ b/datastore/infrastructure/engines/bigquery/backend.py @@ -8,9 +8,12 @@ every BigQuery call is routed through `_run_query` so transport / SQL errors surface as `ServerError` with `resource_id` + operation name baked in, never as raw `google.api_core` exceptions. - 3. Create helpers (`_create_data_table`, `_alter_data_table`, - `_insert_records`, and the branch helpers `_apply_new_resource` / - `_apply_existing_resource`). + 3. Write helpers — `_build_dml` (builder SQL; ValueError → 400) and + `_write_rows` (run a `@rows` write; BQ errors → 400). The DDL / + DML helpers (`_create_table_sql`, `_alter_data_table`, + `_insert_records` / `_merge_records` / `_update_records`) and the + create branches (`_apply_new_resource` / `_apply_existing_resource`) + build on those two. 4. CKAN action methods (`create`, `upsert`, `search`, `search_sql`, `delete`, `info`, `get_columns`, `healthcheck`). """ @@ -30,7 +33,6 @@ from datastore.infrastructure.engines.base import ( DatastoreBackend, InfoResult, - MetadataStore, SearchResult, WriteResult, ) @@ -42,10 +44,14 @@ drop_columns_sql, insert_sql, merge_sql, + normalize_pk, qualify_table_refs, reject_unsupported_type_changes, schema_diff, + set_table_options_sql, strip_limit_offset, + table_options_clause, + table_to_schema, unfiltered_table_name, update_sql, ) @@ -67,26 +73,12 @@ def __init__( self.context = context self.config = config self.client: Any = None - # `metadata` is set in `initialize()` once the client is built. - # Stays `None` in placeholder mode (no BIGQUERY_PROJECT / - # BIGQUERY_DATASET) so the rest of the app can boot — `create()` - # skips the data + metadata writes in that mode rather than crash. - self.metadata: MetadataStore | None = None def initialize(self) -> None: - """Build the BigQuery client when configured; no-op otherwise. - - Lenient on missing config: if `BIGQUERY_PROJECT` is unset, log a - warning and leave `client=None`. Lets the rest of the app boot - without real GCP creds — `/ready` will return 503 (healthcheck - returns False with no client) so the misconfiguration is loud - enough in production without being fatal at import time. - - When the client is built, also constructs the `MetadataStore` - and runs its `initialize()` so the `_table_metadata` table - exists. Only the read-write engine creates DDL — the read-only - engine constructs the store for `get()` but skips `initialize()` - so it doesn't need CREATE privileges. + """Build the BigQuery client when project + dataset are configured. + + Missing config logs a warning and leaves `client=None` so the + app still boots — `/ready` returns 503 until configured. """ if self.config is None or not self.config.BIGQUERY_PROJECT.strip(): log.warning( @@ -95,10 +87,14 @@ def initialize(self) -> None: self.mode, ) return + if not self.config.BIGQUERY_DATASET.strip(): + log.warning( + "BigQueryBackend: BIGQUERY_DATASET unset (mode=%s); client " + "not built — /ready will return 503 until configured.", + self.mode, + ) + return from datastore.infrastructure.engines.bigquery.client import build_client - from datastore.infrastructure.engines.bigquery.metadata import ( - BigQueryMetadataStore, - ) self.client = build_client(self.config, self.mode) log.info( @@ -106,23 +102,28 @@ def initialize(self) -> None: self.config.BIGQUERY_PROJECT, self.mode, ) - dataset = self.config.BIGQUERY_DATASET.strip() - if not dataset: - log.warning( - "BigQueryBackend: BIGQUERY_DATASET unset (mode=%s); " - "metadata store disabled — `datastore_create` will not " - "record per-resource schemas until configured.", - self.mode, - ) - return + def _read_schema(self, resource_id: str) -> dict | None: + """Return Frictionless schema for `resource_id`, or `None` when absent. + + Uses `tables.get` (REST, no query job, ~200ms). Wraps + non-`NotFound` errors as `ServerError`. Caller must ensure + `self.client` is set. + """ + from google.api_core.exceptions import NotFound - self.metadata = BigQueryMetadataStore( - client=self.client, - project=self.config.BIGQUERY_PROJECT, - dataset=dataset, + ref = ( + f"{self.config.BIGQUERY_PROJECT}" + f".{self.config.BIGQUERY_DATASET}.{resource_id}" ) - if self.mode == "rw": - self.metadata.initialize() + try: + table = self.client.get_table(ref) + except NotFound: + return None + except Exception as e: + raise ServerError( + f"BigQuery tables.get failed for resource {resource_id!r}: {e}" + ) from e + return table_to_schema(table) # ----- table refs + low-level client wrappers ------------------------ @@ -147,20 +148,11 @@ def _data_table_ref(self, resource_id: str) -> str: ) def _read_job_config(self, params: list | None = None) -> Any: - """QueryJobConfig for read paths — enables BigQuery's query cache. - - BigQuery caches the result of every deterministic SELECT for - ~24h; an identical query hits the cache and returns free + fast - (no bytes scanned, sub-100ms typically). The flag is on by - default in BigQuery, but every read site builds its config - through this helper so: - - the read-side contract is explicit in the code, - - the `BIGQUERY_USE_QUERY_CACHE` opt-out actually flows - through to the wire (e.g. integration tests that need - a fresh scan can set it to False). - - Write paths (DDL / DML) don't go through this — BigQuery's - cache only applies to SELECT anyway. + """QueryJobConfig for SELECT paths — honours `BIGQUERY_USE_QUERY_CACHE`. + + BQ's 24h results cache makes identical SELECTs free + fast on + hit. Writes don't go through this; BQ's cache only applies to + SELECT anyway. """ from google.cloud import bigquery return bigquery.QueryJobConfig( @@ -178,20 +170,11 @@ def _run_query( resource_id: str, job_config: Any = None, ) -> Any: - """Submit `sql`, wait for completion, and return the QueryJob. - - Wraps every `client.query` call so any - `google.api_core` / transport error becomes a CKAN-shaped - `ServerError` carrying the action name (`op`) and target - `resource_id`. Callers never have to know about Google's - exception hierarchy. - - Returning the `QueryJob` (rather than its `.result()` value) - lets callers grab whichever output they need without a second - helper: rows from `job.result()`, DML row counts from - `job.num_dml_affected_rows`. DDL / MERGE callers simply ignore - the return value — the `.result()` call inside has already - waited for completion. + """Submit `sql`, wait for completion, return the QueryJob. + + Wraps any `client.query` exception as `ServerError` carrying + `op` + `resource_id`. Returning the job lets callers grab + rows (`job.result()`) or DML counts (`num_dml_affected_rows`). """ try: job = self.client.query(sql, job_config=job_config) @@ -203,10 +186,9 @@ def _run_query( ) from e # ----- create helpers (DDL + records + branch orchestration) -------- - def _create_data_table(self, resource_id: str, schema: dict) -> None: - """`CREATE TABLE IF NOT EXISTS` with columns derived from the - Frictionless schema. Idempotent — a second call on the same - resource is a no-op DDL on the BigQuery side.""" + def _create_table_sql(self, resource_id: str, schema: dict) -> str | None: + """Render the `CREATE TABLE … OPTIONS(...)` DDL. + """ cols = column_defs(schema, include_updated_at=self._include_updated_at) if not cols: log.warning( @@ -214,30 +196,35 @@ def _create_data_table(self, resource_id: str, schema: dict) -> None: "skipping CREATE TABLE.", resource_id, ) - return - sql = ( - f"CREATE TABLE IF NOT EXISTS {self._data_table_ref(resource_id)} " - f"({', '.join(cols)})" + return None + return ( + f"CREATE TABLE {self._data_table_ref(resource_id)} " + f"({', '.join(cols)}){table_options_clause(schema)}" + ) + + def _refresh_table_options(self, resource_id: str, schema: dict) -> None: + """Issue `ALTER TABLE … SET OPTIONS(...)` to rewrite the + table-level metadata block. + """ + sql = set_table_options_sql( + self._data_table_ref(resource_id), schema, + ) + self._run_query( + sql, op="ALTER TABLE SET OPTIONS", resource_id=resource_id, ) - self._run_query(sql, op="CREATE TABLE", resource_id=resource_id) - log.info("BigQuery table created: %s", resource_id) def _alter_data_table( self, resource_id: str, old_schema: dict, new_schema: dict ) -> None: """Apply the schema diff as DDL. - Three diff classes: - - **Added columns** → `ALTER TABLE ADD COLUMN IF NOT EXISTS`. - - **Type changes** → `ALTER TABLE ALTER COLUMN SET DATA TYPE` - when BigQuery accepts the transition (`types.can_widen`). - Unsupported transitions raise `ConflictError` BEFORE any - DDL runs so a single bad column can't half-apply the others. - - **Removed columns** → logged and skipped; dropping a column - would lose user data on a metadata edit. - - All ADD / ALTER clauses go in a single `ALTER TABLE` statement - so BigQuery applies them atomically. + Added columns → `ADD COLUMN IF NOT EXISTS`. Widened types → + `ALTER COLUMN SET DATA TYPE` (unsupported widenings raise + `ConflictError` before any DDL runs). Removed columns are + logged and kept (dropping would lose user data). + + Always follows up with `SET OPTIONS` to refresh the + table-level metadata, even when no column actions ran. """ added, type_changes, removed = schema_diff(old_schema, new_schema) reject_unsupported_type_changes(type_changes) @@ -250,100 +237,40 @@ def _alter_data_table( ) clauses = alter_clauses(added, type_changes, new_schema) - if not clauses: - return - sql = ( - f"ALTER TABLE {self._data_table_ref(resource_id)} " - f"{', '.join(clauses)}" - ) - self._run_query(sql, op="ALTER TABLE", resource_id=resource_id) - log.info( - "BigQuery table altered: %s (added=%s, type_changes=%s)", - resource_id, added, type_changes, - ) - - def _insert_records( - self, resource_id: str, schema: dict, records: list - ) -> None: - """Insert rows via DML `INSERT INTO ... SELECT FROM UNNEST(@rows)`. - - Why DML rather than `Client.insert_rows_json`: the streaming - insert API parks rows in a streaming buffer for 30–90 minutes, - and DML statements (UPDATE / DELETE / MERGE) cannot touch rows - still in that buffer. That makes `datastore_create` + immediate - `datastore_upsert` impossible. DML INSERT writes straight to - table storage, so any follow-up upsert/update on the same - primaryKey works without delay. - - Rows ride as a single JSON-array string parameter `@rows`; - BigQuery unpacks it inside the SQL — one statement regardless - of batch size, no Python-side serialisation pass needed (JSON - columns are handled by `PARSE_JSON(JSON_QUERY(...))` inside - the SELECT). - - Empty `records` is a no-op. SQL/transport errors propagate as - `ServerError` via `_run_query`. - """ - import orjson - - if not records: - return - try: - sql = insert_sql( - self._data_table_ref(resource_id), - schema, - include_updated_at=self._include_updated_at, + if clauses: + sql = ( + f"ALTER TABLE {self._data_table_ref(resource_id)} " + f"{', '.join(clauses)}" ) - except ValueError as e: - raise ValidationError(str(e)) from e + self._run_query(sql, op="ALTER TABLE", resource_id=resource_id) + log.info( + "BigQuery table altered: %s (added=%s, type_changes=%s)", + resource_id, added, type_changes, + ) + + # Refresh the table-level metadata even when no column actions + # ran — `primaryKey` or the per-column type hints may have + # changed without a column add/alter (e.g. user re-declares + # `primaryKey` on the same column set). + self._refresh_table_options(resource_id, new_schema) + def _rows_job_config(self, records: list) -> Any: + """`QueryJobConfig` carrying `@rows` as a JSON-array string param.""" + import orjson from google.cloud import bigquery - # `MAX(_id)` is computed inline in the INSERT SQL — saves a - # separate round-trip per call (the older two-statement form - # cost ~1s of BigQuery job overhead for nothing). - job_config = bigquery.QueryJobConfig( + return bigquery.QueryJobConfig( query_parameters=[ bigquery.ScalarQueryParameter( - "rows", "STRING", orjson.dumps(records).decode("utf-8") + "rows", "STRING", orjson.dumps(records).decode("utf-8"), ), ] ) - try: - self._run_query( - sql, op="INSERT", resource_id=resource_id, - job_config=job_config, - ) - except ServerError as e: - raise _translate_bigquery_error( - e, resource_id, "insert" - ) from e - log.info( - "BigQuery rows inserted: %s (%d row(s))", - resource_id, len(records), - ) - - def _merge_records( - self, resource_id: str, schema: dict, records: list - ) -> None: - """Upsert rows via `MERGE` keyed on `schema.primaryKey`. - - Rows whose primary-key columns match an existing row are - UPDATEd; others are INSERTed. The full payload travels as a - single JSON-array string parameter so we issue one statement - regardless of batch size. - Empty `records` is a no-op. Missing primary key on the stored - schema raises `ValidationError` — upsert can't dedup without - one; the caller can fall back to `method="insert"` or declare - a primaryKey on the resource. - """ - import orjson - - if not records: - return + def _build_dml(self, builder: Any, resource_id: str, schema: dict) -> str: + """Render a DML builder's SQL, surfacing its `ValueError` as 400.""" try: - sql = merge_sql( + return builder( self._data_table_ref(resource_id), schema, include_updated_at=self._include_updated_at, @@ -351,75 +278,61 @@ def _merge_records( except ValueError as e: raise ValidationError(str(e)) from e - from google.cloud import bigquery + def _write_rows( + self, resource_id: str, sql: str, *, op: str, action: str, records: list + ) -> Any: + """Run a `@rows`-parameterised write; map BQ write errors to 400s. - # `MAX(_id)` is inlined in the MERGE's WHEN NOT MATCHED clause - # so the upsert is a single round-trip. - job_config = bigquery.QueryJobConfig( - query_parameters=[ - bigquery.ScalarQueryParameter( - "rows", "STRING", orjson.dumps(records).decode("utf-8") - ), - ] - ) + `MAX(_id)` and `CURRENT_TIMESTAMP()` are inlined in the SQL, so + each write is a single round-trip. Returns the job for callers + that need `num_dml_affected_rows`. + """ try: - self._run_query( - sql, op="MERGE", resource_id=resource_id, - job_config=job_config, + job = self._run_query( + sql, op=op, resource_id=resource_id, + job_config=self._rows_job_config(records), ) except ServerError as e: - raise _translate_bigquery_error(e, resource_id, "upsert") from e - log.info( - "BigQuery rows upserted: %s (%d row(s))", - resource_id, len(records), - ) + raise _translate_bigquery_error(e, resource_id, action) from e + log.info("BigQuery %s: %s (%d row(s))", op, resource_id, len(records)) + return job - def _update_records( + def _insert_records( self, resource_id: str, schema: dict, records: list ) -> None: - """Update existing rows via DML `UPDATE`, keyed on - `schema.primaryKey`. - - Update-only semantics: every row in `records` must match an - existing row by primary key. After the statement runs we - compare `num_dml_affected_rows` against the row count and - raise `NotFoundError` if any row had no matching key — DML - UPDATE itself treats misses as a silent no-op, so the count - check is what gives the caller a real signal. - - Empty `records` is a no-op. Missing primary key or all-PK - schema raises `ValidationError` (via `update_sql`'s - `ValueError` re-raise). - """ - import orjson + """DML `INSERT` for `records` (empty → no-op).""" + if not records: + return + sql = self._build_dml(insert_sql, resource_id, schema) + self._write_rows( + resource_id, sql, op="INSERT", action="insert", records=records, + ) + def _merge_records( + self, resource_id: str, schema: dict, records: list + ) -> None: + """`MERGE` keyed on `schema.primaryKey` (empty → no-op).""" if not records: return - try: - sql = update_sql( - self._data_table_ref(resource_id), - schema, - include_updated_at=self._include_updated_at, - ) - except ValueError as e: - raise ValidationError(str(e)) from e + sql = self._build_dml(merge_sql, resource_id, schema) + self._write_rows( + resource_id, sql, op="MERGE", action="upsert", records=records, + ) - from google.cloud import bigquery + def _update_records( + self, resource_id: str, schema: dict, records: list + ) -> None: + """DML `UPDATE` keyed on `schema.primaryKey` (empty → no-op). - job_config = bigquery.QueryJobConfig( - query_parameters=[ - bigquery.ScalarQueryParameter( - "rows", "STRING", orjson.dumps(records).decode("utf-8") - ), - ] + DML UPDATE silently no-ops on PK misses, so any unmatched row + (affected < input count) is reported as `NotFoundError`. + """ + if not records: + return + sql = self._build_dml(update_sql, resource_id, schema) + job = self._write_rows( + resource_id, sql, op="UPDATE", action="update", records=records, ) - try: - job = self._run_query( - sql, op="UPDATE", resource_id=resource_id, - job_config=job_config, - ) - except ServerError as e: - raise _translate_bigquery_error(e, resource_id, "update") from e affected = job.num_dml_affected_rows or 0 if affected < len(records): missing = len(records) - affected @@ -428,23 +341,36 @@ def _update_records( f"had no matching primary key in resource {resource_id!r}; " "use method='upsert' to insert missing rows" ) - log.info( - "BigQuery rows updated: %s (%d row(s))", resource_id, affected, - ) def _apply_new_resource( self, resource_id: str, schema: dict, records: list ) -> None: - """First-time declaration: create the table, seed it, record it. + """First-time create: CREATE TABLE (+ INSERT) as one BQ script. - `metadata.insert` is the final step so any failure earlier - leaves the metadata store untouched and the resource appears - un-declared on retry. + Empty `records` collapses to a standalone CREATE. """ - assert self.metadata is not None - self._create_data_table(resource_id, schema) - self._insert_records(resource_id, schema, records) - self.metadata.insert(resource_id, schema) + create_sql = self._create_table_sql(resource_id, schema) + if create_sql is None: + return + + if not records: + self._run_query( + create_sql, op="CREATE TABLE", resource_id=resource_id, + ) + log.info("BigQuery table created: %s", resource_id) + return + + # `;` joins the DDL + DML into one BigQuery script — one job + # submission, shared `@rows`. The INSERT's `MAX(_id)` subquery + # sees the just-created empty table, so `_id` starts at 1. + script = ( + f"{create_sql};\n" + f"{self._build_dml(insert_sql, resource_id, schema)}" + ) + self._write_rows( + resource_id, script, + op="CREATE TABLE + INSERT", action="insert", records=records, + ) def _apply_existing_resource( self, @@ -453,16 +379,13 @@ def _apply_existing_resource( new_schema: dict, records: list, ) -> None: - """Re-declaration on an existing resource: migrate the table, - append rows, then update the metadata row. + """Re-declare: ALTER (diff + refresh OPTIONS) then INSERT. - If alter OR the record insert raises, `metadata.update` is - skipped and the metadata stays at the old schema version. + ALTER first so the reader sees the new schema consistently + even if INSERT fails. """ - assert self.metadata is not None self._alter_data_table(resource_id, old_schema, new_schema) self._insert_records(resource_id, new_schema, records) - self.metadata.update(resource_id, new_schema) # ----- CKAN action methods ------------------------------------------- def create( @@ -472,19 +395,16 @@ def create( records: list | None, include_total: bool, ) -> WriteResult: - """Declare a resource: DDL → records insert → metadata write. + """Declare a resource, optionally seeding it with rows. - The order is load-bearing — see `_apply_new_resource` / - `_apply_existing_resource` for the per-branch sequence. Any - failure short-circuits before the metadata write so the - metadata row never describes a state the actual table doesn't - match. - - Placeholder mode (no project/dataset) is a no-op echo so the - unit suite can exercise the call path without GCP creds. + Always reads existing schema first so column adds / type + widens apply on re-declares — even when `records` is empty. + Dispatches to `_apply_new_resource` (table absent) or + `_apply_existing_resource` (table present). Placeholder mode + (no client) is an echo. """ - if self.metadata is not None: - existing = self.metadata.get(resource_id) + if self.client is not None: + existing = self._read_schema(resource_id) rows = records or [] if existing is None: self._apply_new_resource(resource_id, schema, rows) @@ -509,27 +429,13 @@ def upsert( ) -> WriteResult: """Insert / update / upsert records into an existing resource. - Method dispatch: - - **"upsert"** (default): `MERGE` keyed on `schema.primaryKey`. - Rows that match an existing key are UPDATEd; the rest are - INSERTed. Requires a `primaryKey` on the stored schema. - - **"insert"**: plain streaming insert (no PK check). Faster - than upsert; raises if any row collides with an existing - primary key (BigQuery row-level errors). - - **"update"**: DML `UPDATE` keyed on `schema.primaryKey`. - Every row must match an existing row — otherwise - `NotFoundError` is raised after the statement runs. Requires - a `primaryKey`. - - The resource must have been declared by `datastore_create` - first; the schema (column types + primaryKey) is read from the - metadata store and used to build the SQL. Calling `upsert` on - an undeclared resource raises `NotFoundError`. - - Placeholder mode (no project/dataset) is a no-op echo so the - unit suite can exercise the call path without GCP creds. + `method="upsert"` → `MERGE` keyed on `schema.primaryKey`; + `method="insert"` → plain DML INSERT (no PK check); + `method="update"` → DML UPDATE (missing PK raises + `NotFoundError`). Resource must already exist + (`datastore_create` first). Placeholder mode is an echo. """ - if self.metadata is None: + if self.client is None: # Placeholder mode — echo (matches the create() pattern). return { "resource_id": resource_id, @@ -539,7 +445,7 @@ def upsert( "total": len(records or []), } - schema = self.metadata.get(resource_id) + schema = self._read_schema(resource_id) if schema is None: raise NotFoundError( f"resource {resource_id!r} is not declared; call " @@ -581,28 +487,14 @@ def search( sort: str | None, include_total: bool, ) -> SearchResult: - """Run a parameterised SELECT against the data table. - - Pipeline: - 1. Resolve schema from `_table_metadata` (404 if undeclared). - 2. Build search + (optional) count SQL via `search.py`. - Validation of `fields` / `sort` / `filters` / `q` columns - happens inside the builders so a bad request becomes a - clean 400, never reaches BigQuery. - 3. Submit both queries. When only an unfiltered total is - needed, fall back to `__TABLES__.row_count` — free vs the - COUNT(*) billing. - 4. Return a row iterator that yields tuples in projection - order; memory stays bounded by the RowIterator's page - size, not the result set size. - - `plain` and `language` are accepted for CKAN compatibility but - currently have no effect on the BigQuery side — `SEARCH()` - tokenises uniformly regardless of `plain`, and we don't expose - the analyzer arg. - - Placeholder mode (no metadata store) returns an empty result so - the unit suite can exercise the call path without GCP creds. + """Parameterised SELECT against the data table, returning a tuple iterator. + + Fires the data query and the (optional) count query in + parallel — wall time ≈ max(both). Unfiltered totals come from + free `INFORMATION_SCHEMA` metadata instead of COUNT(*). + `plain` / `language` are accepted for CKAN compatibility but + ignored (BQ's `SEARCH()` tokenises uniformly). Placeholder + mode (no client) returns an empty result. """ from datastore.infrastructure.engines.bigquery.search import ( build_count, @@ -610,7 +502,7 @@ def search( needs_count_query, ) - if self.metadata is None: + if self.client is None: # Placeholder mode (no GCP creds) — echo the requested # field shape so the unit suite can exercise the streaming # writer + envelope plumbing without a real backend. @@ -626,7 +518,7 @@ def search( records_truncated=False, ) - schema = self.metadata.get(resource_id) + schema = self._read_schema(resource_id) if schema is None: raise NotFoundError( f"resource {resource_id!r} is not declared; call " @@ -657,7 +549,8 @@ def search( # Fire both jobs before waiting on either: BigQuery's # `client.query()` is non-blocking, so the count and the page # query run in parallel — wall time ≈ max(both). - count_job = None + count_cfg = None + count_sql = "" if include_total and needs_count_query( filters=filters, q=q, distinct=distinct, ): @@ -671,11 +564,16 @@ def search( distinct=distinct, ) count_cfg = self._read_job_config(params=count_params) - count_job = self.client.query(count_sql, job_config=count_cfg) - - search_job = self.client.query(sql, job_config=job_config) + # Submit both jobs before waiting on either, so the COUNT and the + # page query run in parallel. The submits sit inside the try too — + # submit-time failures (auth, quota, bad config) map to ServerError + # just like a result() failure, never a raw google exception. + count_job = None try: + if count_cfg is not None: + count_job = self.client.query(count_sql, job_config=count_cfg) + search_job = self.client.query(sql, job_config=job_config) row_iter = search_job.result() except Exception as e: raise ServerError( @@ -705,27 +603,14 @@ def search( ) def search_sql(self, sql: str, limit: int) -> SearchResult: - """Execute a vetted SELECT/WITH statement and stream tuples. - - Safety relies on three layers, none of which this method itself - re-checks (validation already happened upstream): - 1. The request schema rejects non-SELECT / multi-statement - / unparseable SQL (`schemas/request.py:DatastoreSearchSQLRequest`). - 2. The endpoint authorises every referenced table against - CKAN as a resource_id, and the service rejects function - calls outside the engine's allow-list. - 3. **The load-bearing guard:** this engine is built with the - read-only credential (`mode="ro"` selects `BIGQUERY_CREDENTIALS_RO`), - so BigQuery IAM physically refuses any DML / DDL even if - upstream checks were bypassed. The assertion below catches - the dev mistake of dispatching `search_sql` through the - rw engine. - - Result schema is read from BigQuery's job schema (column types - come back as BQ types and are mapped to Frictionless via - `frictionless_type_from_bigquery`). Row output is bounded by - `limit` via `itertools.islice` so a runaway SELECT without an - embedded LIMIT can't pin the streaming response open forever. + """Execute a vetted SELECT/WITH, stream tuples, bounded by `limit`. + + Safety relies on upstream layers (schema rejects non-SELECT, + endpoint authorises tables, service checks function allow-list). + The load-bearing guard is `mode="ro"` — read-only IAM + physically refuses any DML/DDL. Total comes from free + `INFORMATION_SCHEMA.TABLE_STORAGE` for plain SELECTs, else + `COUNT(*) FROM (...)`; COUNT failures are non-fatal. """ from itertools import islice @@ -762,49 +647,7 @@ def search_sql(self, sql: str, limit: int) -> SearchResult: f"failed to qualify table references in SQL: {e}" ) from e - # Pick the cheapest viable path for `total`: - # - # 1. Plain `SELECT cols FROM table [LIMIT/OFFSET]` (no - # WHERE/GROUP/JOIN/aggregate) → read `total_rows` from - # `INFORMATION_SCHEMA.TABLE_STORAGE`. Free metadata query, - # no bytes scanned. - # - # 2. Anything that filters, joins, aggregates, or otherwise - # changes row count → wrap the user's SQL (LIMIT/OFFSET - # stripped) in `SELECT COUNT(*) FROM (...)`. Same pattern - # datastore_search uses for filtered/distinct queries. - # - # `RowIterator.total_rows` alone won't do — it's the row count - # of the destination temp table (post-LIMIT page size), so - # building pagination from it would always say "last page". - count_sql: str | None - count_params: list = [] - try: - table = unfiltered_table_name(qualified_sql) - if table is not None: - count_sql = ( - "SELECT total_rows AS n FROM " - f"`{self.config.BIGQUERY_PROJECT}." - f"{self.config.BIGQUERY_DATASET}." - "INFORMATION_SCHEMA.TABLE_STORAGE` " - "WHERE table_name = @table_name" - ) - from google.cloud import bigquery - count_params = [ - bigquery.ScalarQueryParameter( - "table_name", "STRING", table, - ), - ] - else: - inner = strip_limit_offset(qualified_sql) - count_sql = f"SELECT COUNT(*) AS n FROM ({inner})" - except Exception as e: - log.warning( - "search_sql: could not build COUNT query (%s); " - "total will be omitted", - e, - ) - count_sql = None + count_sql, count_params = self._search_sql_count_query(qualified_sql) # Submit COUNT first (non-blocking) so it runs in parallel with # the data query. A COUNT failure is non-fatal — log and degrade @@ -850,6 +693,43 @@ def search_sql(self, sql: str, limit: int) -> SearchResult: records_truncated=False, ) + def _search_sql_count_query( + self, qualified_sql: str + ) -> tuple[str | None, list]: + """Pick the cheapest `total` query for a vetted SELECT. + + Plain `SELECT cols FROM t [LIMIT/OFFSET]` reads the free + `total_rows` from `INFORMATION_SCHEMA.TABLE_STORAGE`; anything + that filters/joins/aggregates wraps the LIMIT-stripped query in + `COUNT(*)`. `RowIterator.total_rows` can't be used — it counts + the post-LIMIT page, so pagination would always read "last + page". Returns `(None, [])` if no COUNT can be built (non-fatal). + """ + try: + table = unfiltered_table_name(qualified_sql) + if table is not None: + from google.cloud import bigquery + sql = ( + "SELECT total_rows AS n FROM " + f"`{self.config.BIGQUERY_PROJECT}." + f"{self.config.BIGQUERY_DATASET}." + "INFORMATION_SCHEMA.TABLE_STORAGE` " + "WHERE table_name = @table_name" + ) + params = [ + bigquery.ScalarQueryParameter("table_name", "STRING", table), + ] + return sql, params + inner = strip_limit_offset(qualified_sql) + return f"SELECT COUNT(*) AS n FROM ({inner})", [] + except Exception as e: + log.warning( + "search_sql: could not build COUNT query (%s); " + "total will be omitted", + e, + ) + return None, [] + def delete( self, resource_id: str, @@ -859,22 +739,22 @@ def delete( """Drop the table (both None), delete rows by `filters`, or drop columns by `fields`. Schema layer enforces mutual exclusivity.""" - if self.metadata is None: + if self.client is None: return WriteResult() - schema = self.metadata.get(resource_id) + schema = self._read_schema(resource_id) if schema is None: raise NotFoundError( f"resource {resource_id!r} is not declared; nothing to delete" ) if fields is not None: - self._drop_columns(resource_id, schema, fields) - return WriteResult() + new_schema = self._drop_columns(resource_id, schema, fields) + return WriteResult(schema=new_schema) if filters is None: + # Metadata lives on the table itself, so DROP removes both. self._drop_data_table(resource_id) - self.metadata.delete(resource_id) return WriteResult() self._delete_rows(resource_id, schema, filters) @@ -919,21 +799,17 @@ def _drop_columns( resource_id: str, schema: dict[str, Any], fields: list[str], - ) -> None: - """``ALTER TABLE DROP COLUMN …`` + rewrite the stored schema. - Rejects system columns, unknown columns, and PK columns.""" - assert self.metadata is not None + ) -> dict[str, Any]: + """`ALTER TABLE DROP COLUMN …` + refresh table OPTIONS. + Rejects system columns, unknown columns, and PK columns up front. + Returns the resulting Frictionless schema (minus the dropped columns). + """ existing = { f["name"] for f in schema.get("fields", []) if f.get("name") } - pk_raw = schema.get("primaryKey") - pk: set[str] = ( - {pk_raw} if isinstance(pk_raw, str) - else set(pk_raw or []) - ) # System-column check first: `_id` / `_updated_at` aren't in # the stored schema, so the unknown-column check would shadow @@ -949,6 +825,7 @@ def _drop_columns( raise ValidationError( f"cannot drop unknown column(s): {sorted(unknown)}" ) + pk = set(normalize_pk(schema)) pk_violations = [c for c in fields if c in pk] if pk_violations: raise ValidationError( @@ -968,30 +845,26 @@ def _drop_columns( if f.get("name") not in drop_set ], } - self.metadata.update(resource_id, new_schema) + self._refresh_table_options(resource_id, new_schema) log.info( "BigQuery columns dropped: %s (%s)", resource_id, sorted(fields), ) + return new_schema def info(self, resource_id: str) -> InfoResult: """Return the table schema + row stats for a resource. - Reads `schema` from the engine-managed `_table_metadata` (not - BigQuery's `INFORMATION_SCHEMA`) so the `primaryKey` and per- - field `info` data dictionary round-trip exactly as declared at - `datastore_create`. Row count comes from a `COUNT(*)` on the - data table. - - Placeholder mode (no metadata store) returns a stub so the unit - suite can exercise the call path without GCP creds. + Schema via `_read_schema` (single `tables.get` call — no SQL + job); total via `COUNT(*)`. Placeholder mode (no client) + returns a stub. """ - if self.metadata is None: + if self.client is None: return InfoResult( schema={"fields": []}, meta={"resource_id": resource_id, "total": 0}, ) - schema = self.metadata.get(resource_id) + schema = self._read_schema(resource_id) if schema is None: raise NotFoundError( f"resource {resource_id!r} is not declared; call " @@ -1000,27 +873,21 @@ def info(self, resource_id: str) -> InfoResult: total = self._count_rows(resource_id) - pk_raw = schema.get("primaryKey") - pk: list[str] = ( - [pk_raw] if isinstance(pk_raw, str) else list(pk_raw or []) - ) - return InfoResult( schema=schema, meta={ "resource_id": resource_id, "total": total, - "primary_key": pk, + "primary_key": normalize_pk(schema), }, ) def _count_rows(self, resource_id: str) -> int: - """`COUNT(*)` against the data table; returns 0 on missing table. + """`COUNT(*)` on the data table; logs + returns 0 on failure. - A missing data table while metadata exists is an inconsistent - state (manual cleanup, partial drop). Logging it as a warning - and returning 0 keeps `datastore_info` informative rather than - 500-ing the whole call. + A missing data table while metadata exists is inconsistent + state — returning 0 keeps `datastore_info` informative + instead of 500-ing the whole call. """ sql = ( f"SELECT COUNT(*) AS n FROM " @@ -1068,11 +935,6 @@ async def dump(self, resource_id: str, fmt: str) -> list[str]: "/datastore/dump cannot run without an export bucket." ) - if self.metadata is not None and self.metadata.get(resource_id) is None: - raise NotFoundError( - f"resource {resource_id!r} is not declared; nothing to dump" - ) - from google.cloud import bigquery # Clients: ro for reads (BQ get_table, GCS list); rw for the @@ -1086,8 +948,14 @@ async def dump(self, resource_id: str, fmt: str) -> list[str]: f"{self.config.BIGQUERY_PROJECT}" f".{self.config.BIGQUERY_DATASET}.{resource_id}" ) + from google.api_core.exceptions import NotFound + try: table = await asyncio.to_thread(self.client.get_table, table_ref) + except NotFound as e: + raise NotFoundError( + f"resource {resource_id!r} is not declared; nothing to dump" + ) from e except Exception as e: raise ServerError( f"BigQuery get_table failed for resource {resource_id!r}: {e}" @@ -1266,11 +1134,7 @@ def _build_storage_client(self, mode: str) -> Any: return storage.Client(**kwargs) def get_columns(self, resource_id: str) -> list[str]: - """Return column names for a table. - - Placeholder — replaced when real `search` lands. Empty list - keeps callers from crashing on the dead code path. - """ + """Return column names. Placeholder — returns `[]`.""" return [] def healthcheck(self) -> bool: @@ -1279,17 +1143,6 @@ def healthcheck(self) -> bool: """ if self.client is None: return False - if ( - self.config is not None - and self.config.BIGQUERY_PROJECT.strip() - and self.metadata is None - ): - log.warning( - "BigQuery healthcheck failed (mode=%s): metadata store " - "unavailable — set BIGQUERY_DATASET.", - self.mode, - ) - return False try: self.client.query("SELECT 1").result() return True @@ -1303,25 +1156,14 @@ def healthcheck(self) -> bool: def _translate_bigquery_error( exc: ServerError, resource_id: str, action: str ) -> Exception: - """Map known BigQuery error signatures (raised on INSERT / MERGE / - UPDATE against the JSON-array source) to clear `ValidationError`s. - - BigQuery's raw messages are technically accurate but unhelpful — - e.g. *"Scalar subquery produced more than one element"* really - means "your records have duplicate primary keys" and *"Bad double - value: jk"* means "you sent the string 'jk' for a `number` - column". Both surface as 400 ValidationError with a message that - names the actual problem. - - Patterns handled: - - duplicate primaryKey rows in the batch; - - per-column type mismatches (`Bad value: …`, - `Could not cast …`, `Could not parse …`); - - out-of-range numeric values (`Value out of range …`); - - bad date / time / timestamp literals (`Invalid : …`). - - Other errors pass through unchanged so the caller can re-raise as - a generic `ServerError`. + """Translate raw BQ write errors into actionable `ValidationError`s. + + Rewrites BQ messages whose literal text is unhelpful — e.g. + *"Scalar subquery produced more than one element"* really means + duplicate primary keys; *"Bad double value: jk"* means a non-numeric + value for a `number` column. Handles: duplicate PKs, per-column + type mismatches, out-of-range numerics, bad date/time literals. + Other errors pass through unchanged. """ import re diff --git a/datastore/infrastructure/engines/bigquery/lib.py b/datastore/infrastructure/engines/bigquery/lib.py index 216478f..395fa2e 100644 --- a/datastore/infrastructure/engines/bigquery/lib.py +++ b/datastore/infrastructure/engines/bigquery/lib.py @@ -1,14 +1,24 @@ -"""Side-effect-free helpers for the BigQuery backend: schema diffs, -DDL clause rendering, DML statement builders, JSON extractors.""" +"""Side-effect-free helpers for the BigQuery backend. +""" from __future__ import annotations +import json +import logging +from typing import TYPE_CHECKING, Any + from datastore.core.exceptions import ConflictError from datastore.infrastructure.engines.bigquery.types import ( bigquery_type, can_widen, + frictionless_type_from_bigquery, ) +if TYPE_CHECKING: + from google.cloud import bigquery + +log = logging.getLogger(__name__) + # Frictionless types that map to BigQuery `JSON`. JSON_FRICTIONLESS_TYPES = frozenset({"object", "array", "geojson"}) @@ -16,6 +26,33 @@ # via `Config.INCLUDE_UPDATED_AT`. Same-named user fields are dropped. SYSTEM_COLUMN_NAMES: frozenset[str] = frozenset({"_id", "_updated_at"}) +# Frictionless type → BigQuery scalar parameter type for filter values. +# JSON / array / geojson absent — equality on those is rejected. +_FILTER_PARAM_TYPE: dict[str, str] = { + "integer": "INT64", + "number": "FLOAT64", + "boolean": "BOOL", + "string": "STRING", + "date": "DATE", + "datetime": "TIMESTAMP", + "time": "TIME", + "any": "STRING", +} + +# Native-metadata: sentinel under which the engine namespaces its own +# metadata in the table-level `description`. NOTE: the description is +# engine-owned — each write (`set_table_options_sql`) rewrites it wholesale +# from the schema, so any human-authored description text or non-datastore +# labels set in the BQ console are NOT preserved across a refresh. +DATASTORE_KEY = "datastore" + +# Bumped when the JSON shape under `DATASTORE_KEY` changes in a way +# that needs explicit translation on read. +SCHEMA_VERSION = 1 + + +# ── 3. system columns ─────────────────────────────────────────────────────── + def _system_col_defs(include_updated_at: bool) -> tuple[str, ...]: return ( @@ -29,9 +66,33 @@ def _system_col_insert_list(include_updated_at: bool) -> str: return "`_id`, `_updated_at`" if include_updated_at else "`_id`" +def normalize_pk(schema: dict) -> list[str]: + """`schema.primaryKey` as a list (str → 1-elem, missing → empty).""" + pk = schema.get("primaryKey") + if isinstance(pk, str): + return [pk] + return list(pk or []) + + +def _user_fields(schema: dict) -> list[dict]: + """Declared fields minus engine-managed system columns.""" + return [ + f for f in schema.get("fields", []) + if f.get("name") and f["name"] not in SYSTEM_COLUMN_NAMES + ] + + +# ── 4. DDL builders (CREATE / ALTER / DROP) ──────────────────────────────── + + def column_defs(schema: dict, *, include_updated_at: bool = True) -> list[str]: - """Render `schema.fields` as ``\\`name\\` TYPE`` for `CREATE TABLE`, - prepending system columns and skipping any field that collides.""" + """Render `schema.fields` as ``\\`name\\` TYPE`` column DDL. + + Per-field metadata (`info`, `title`, …) is NOT emitted per column + — the full schema lives in the table-level OPTIONS instead so + reads need one slot, not N. System columns are prepended; + user fields that collide with them are dropped. + """ cols: list[str] = list(_system_col_defs(include_updated_at)) for f in schema.get("fields", []): name = f.get("name") @@ -91,15 +152,16 @@ def alter_clauses( type_changes: list[tuple[str, str | None, str | None]], new_schema: dict, ) -> list[str]: - """Per-column clauses for a single `ALTER TABLE`.""" + """Per-column clauses for a single `ALTER TABLE` statement.""" new_by_name = { f["name"]: f for f in new_schema.get("fields", []) if f.get("name") } clauses: list[str] = [] for name in added: + field = new_by_name[name] clauses.append( f"ADD COLUMN IF NOT EXISTS `{name}` " - f"{bigquery_type(new_by_name[name].get('type'))}" + f"{bigquery_type(field.get('type'))}" ) for name, _, new_t in type_changes: clauses.append( @@ -108,19 +170,28 @@ def alter_clauses( return clauses +def drop_columns_sql(table_ref: str, columns: list[str]) -> str: + """Render ``ALTER TABLE DROP COLUMN …``. + Caller validates column names — identifiers can't be parameterised.""" + if not columns: + raise ValueError("drop_columns_sql requires at least one column") + clauses = ", ".join(f"DROP COLUMN `{c}`" for c in columns) + return f"ALTER TABLE {table_ref} {clauses}" + + +# ── 5. DML builders (INSERT / MERGE / UPDATE / DELETE) ───────────────────── + + def insert_sql( table_ref: str, schema: dict, *, include_updated_at: bool = True ) -> str: - """Render `INSERT INTO ... SELECT FROM UNNEST(JSON_QUERY_ARRAY(@rows))`. + """Render `INSERT INTO … SELECT FROM UNNEST(JSON_QUERY_ARRAY(@rows))`. - DML (not streaming) so follow-up MERGE/UPDATE stays consistent. - `_id` = `(SELECT IFNULL(MAX(_id), 0) FROM tbl) + ROW_NUMBER() OVER ()` - — one MAX baseline per batch, ROW_NUMBER per row. + `_id` is inlined as `(SELECT IFNULL(MAX(_id), 0) FROM tbl) + + ROW_NUMBER() OVER ()` — one MAX baseline per batch, no separate + probe. """ - fields = [ - f for f in schema.get("fields", []) - if f.get("name") and f["name"] not in SYSTEM_COLUMN_NAMES - ] + fields = _user_fields(schema) if not fields: raise ValueError("schema has no user fields; cannot INSERT") @@ -148,18 +219,12 @@ def merge_sql( ) -> str: """Render `MERGE` keyed by `schema.primaryKey`. - Matched rows update only if a non-PK column differs (so + Matched rows update only when a non-PK column differs (so `_updated_at` advances only on real changes). Unmatched rows - insert with `_id` = `(SELECT MAX(_id) FROM tbl) + _rn`. + insert with `_id` continuing from `MAX(_id) + _rn`. """ - fields = [ - f for f in schema.get("fields", []) - if f.get("name") and f["name"] not in SYSTEM_COLUMN_NAMES - ] - pk_raw = schema.get("primaryKey") - pk: list[str] = ( - [pk_raw] if isinstance(pk_raw, str) else list(pk_raw or []) - ) + fields = _user_fields(schema) + pk = normalize_pk(schema) if not pk: raise ValueError( "schema has no 'primaryKey'; upsert requires one to " @@ -214,17 +279,14 @@ def merge_sql( def update_sql( table_ref: str, schema: dict, *, include_updated_at: bool = True ) -> str: - """Render `UPDATE T SET ... FROM (SELECT ... FROM UNNEST(@rows)) S - WHERE `. Caller must compare affected rows to input size - and raise `NotFoundError` for unmatched keys.""" - fields = [ - f for f in schema.get("fields", []) - if f.get("name") and f["name"] not in SYSTEM_COLUMN_NAMES - ] - pk_raw = schema.get("primaryKey") - pk: list[str] = ( - [pk_raw] if isinstance(pk_raw, str) else list(pk_raw or []) - ) + """Render `UPDATE T SET … FROM (UNNEST(@rows)) S WHERE `. + + Caller must compare affected rows to input size and raise + `NotFoundError` for unmatched keys — DML UPDATE silently no-ops + on misses. + """ + fields = _user_fields(schema) + pk = normalize_pk(schema) if not pk: raise ValueError( "schema has no 'primaryKey'; update requires one to " @@ -259,63 +321,17 @@ def update_sql( ) -def _diff_expr(field: dict) -> str: - """NULL-safe inequality between `T.` and `S.`. JSON - columns are canonicalised via `TO_JSON_STRING` first.""" - name = field["name"] - if field.get("type") in JSON_FRICTIONLESS_TYPES: - return ( - f"TO_JSON_STRING(T.`{name}`) IS DISTINCT FROM " - f"TO_JSON_STRING(S.`{name}`)" - ) - return f"T.`{name}` IS DISTINCT FROM S.`{name}`" - - -def _json_extract(field: dict) -> str: - """Typed extraction of a field from JSON row variable `r`.""" - name = field["name"] - fr_type = field.get("type") - bq_type = bigquery_type(fr_type) - path = f"'$.{name}'" - if fr_type in JSON_FRICTIONLESS_TYPES: - return f"PARSE_JSON(JSON_QUERY(r, {path}))" - if bq_type == "STRING": - return f"JSON_VALUE(r, {path})" - return f"CAST(JSON_VALUE(r, {path}) AS {bq_type})" - - -# Frictionless type → BigQuery scalar parameter type for filter values. -# JSON / array / geojson absent — equality on those is rejected. -_FILTER_PARAM_TYPE: dict[str, str] = { - "integer": "INT64", - "number": "FLOAT64", - "boolean": "BOOL", - "string": "STRING", - "date": "DATE", - "datetime": "TIMESTAMP", - "time": "TIME", - "any": "STRING", -} - - -def drop_columns_sql(table_ref: str, columns: list[str]) -> str: - """Render ``ALTER TABLE DROP COLUMN …``. Caller must - validate column names against the schema first (identifiers can't - be parameterised).""" - if not columns: - raise ValueError("drop_columns_sql requires at least one column") - clauses = ", ".join(f"DROP COLUMN `{c}`" for c in columns) - return f"ALTER TABLE {table_ref} {clauses}" - - def delete_sql( table_ref: str, schema: dict, filters: dict, ) -> tuple[str, list]: - """Render parameterised ``DELETE FROM WHERE …``. Empty - ``filters`` yields ``WHERE TRUE`` (BigQuery requires a WHERE on - every DELETE). Returns ``(sql, query_parameters)``.""" + """Render parameterised ``DELETE FROM WHERE …``. + + Empty `filters` yields `WHERE TRUE` (BQ requires a WHERE on every + DELETE). Returns `(sql, query_parameters)`. + """ + # Lazy import keeps `google-cloud-bigquery` engine-private. from google.cloud import bigquery if filters is None or not isinstance(filters, dict): @@ -365,23 +381,46 @@ def delete_sql( return f"DELETE FROM {table_ref} WHERE {where}", params +def _diff_expr(field: dict) -> str: + """NULL-safe inequality between `T.` and `S.`. + JSON columns are canonicalised via `TO_JSON_STRING` first.""" + name = field["name"] + if field.get("type") in JSON_FRICTIONLESS_TYPES: + return ( + f"TO_JSON_STRING(T.`{name}`) IS DISTINCT FROM " + f"TO_JSON_STRING(S.`{name}`)" + ) + return f"T.`{name}` IS DISTINCT FROM S.`{name}`" + + +def _json_extract(field: dict) -> str: + """Typed extraction of a field from JSON row variable `r`.""" + name = field["name"] + fr_type = field.get("type") + bq_type = bigquery_type(fr_type) + path = f"'$.{name}'" + if fr_type in JSON_FRICTIONLESS_TYPES: + return f"PARSE_JSON(JSON_QUERY(r, {path}))" + if bq_type == "STRING": + return f"JSON_VALUE(r, {path})" + return f"CAST(JSON_VALUE(r, {path}) AS {bq_type})" + + +# ── 6. SQL parsing utilities (sqlglot) ────────────────────────────────────── + + def unfiltered_table_name( sql: str, *, dialect: str = "bigquery", ) -> str | None: - """Return the single source table name when `sql` is a plain - `SELECT cols FROM [LIMIT/OFFSET]` — i.e. the result row - count equals the source table's row count. - - Returns None whenever any clause could change the row count: - WHERE, GROUP BY, HAVING, JOIN, DISTINCT, QUALIFY, aggregate - functions, set ops (UNION/EXCEPT/INTERSECT), subqueries, or more - than one source table. The caller falls back to a real - `COUNT(*) FROM ()` in those cases. - - Used by `datastore_search_sql` to route the unfiltered total - through `INFORMATION_SCHEMA.TABLE_STORAGE` — free metadata read, - no bytes scanned — instead of a full table scan via COUNT(*). + """Return the source table name when `sql` is a plain + `SELECT cols FROM
[LIMIT/OFFSET]` — i.e. result row count + = source row count. `None` if any clause could change row count + (WHERE, GROUP BY, JOIN, DISTINCT, aggregates, set ops, subqueries). + + Lets `datastore_search_sql` route the unfiltered total through + free `INFORMATION_SCHEMA.TABLE_STORAGE` instead of a full COUNT(*). """ + # Lazy import — sqlglot is heavy. import sqlglot from sqlglot import expressions as exp @@ -420,13 +459,8 @@ def unfiltered_table_name( def strip_limit_offset(sql: str, *, dialect: str = "bigquery") -> str: - """Return `sql` with its LIMIT and OFFSET clauses removed. - - Used by `datastore_search_sql` to wrap the user's filtered query in - a `SELECT COUNT(*) FROM (...)` for the total — the count has to - ignore the page size or it would just report the current page's - row count, breaking `total_pages` / `next` links. - """ + """Return `sql` with LIMIT/OFFSET removed — used when wrapping a + paginated user query in `SELECT COUNT(*) FROM (...)` for the total.""" import sqlglot tree = sqlglot.parse_one(sql, dialect=dialect) @@ -436,20 +470,12 @@ def strip_limit_offset(sql: str, *, dialect: str = "bigquery") -> str: def qualify_table_refs(sql: str, project: str, dataset: str) -> str: - """Rewrite every non-CTE table reference to its fully-qualified - BigQuery form and re-serialise the SQL in BigQuery dialect. - - Users pass `datastore_search_sql` SQL with table refs that look - like CKAN resource_ids (`FROM "uuid"` or `FROM uuid`). BigQuery - needs `project.dataset.uuid` with backticked identifiers — so the - backend parses the user's SQL (postgres dialect, which accepts - double-quoted identifiers), tags each unqualified table with the - configured project + dataset, and serialises out as BigQuery SQL. - - Tables that already carry a `catalog` (project) are left alone — - callers who fully-qualify their refs win against the auto-prefix. - CTE aliases are also skipped (they're defined inline, not external - tables). + """Rewrite unqualified table refs to `project.dataset.` for BigQuery. + + User SQL refs look like CKAN resource_ids (`FROM "uuid"` / + `FROM uuid`); we parse as postgres (accepts double-quoted idents) + and re-emit as BigQuery. Already-qualified refs and CTE aliases + are left alone. """ import sqlglot from sqlglot import expressions as exp @@ -468,3 +494,107 @@ def qualify_table_refs(sql: str, project: str, dataset: str) -> str: table.set("catalog", exp.to_identifier(project, quoted=True)) table.set("db", exp.to_identifier(dataset, quoted=True)) return tree.sql(dialect="bigquery") + + +# ── 7. native metadata (encoders + parsers) ──────────────────────────────── +# +# Writes go through `table_options_clause` (on CREATE) or +# `set_table_options_sql` (on ALTER). Reads happen in `backend.py` +# via a single `client.get_table` call piped through `table_to_schema`. + + +def table_to_schema(table: "bigquery.Table") -> dict[str, Any]: + """Reconstruct the Frictionless schema stored on `table`. + + Returns the user-supplied schema dict verbatim when the table + carries our `datastore.schema` block; falls back to BQ-column + inference for unmanaged tables. + """ + table_meta = _parse_description(table.description).get(DATASTORE_KEY, {}) + stored = table_meta.get("schema") + if isinstance(stored, dict): + return stored + return _infer_schema_from_bq(table) + + +def _infer_schema_from_bq(table: "bigquery.Table") -> dict[str, Any]: + """Minimal Frictionless schema from BQ column metadata. + Used only for tables created outside the engine.""" + fields: list[dict[str, Any]] = [ + { + "name": col.name, + # Map to canonical Frictionless names — downstream helpers + # (filter type maps in delete_sql / search) only understand + # those, not raw BigQuery type names. + "type": frictionless_type_from_bigquery(col.field_type), + } + for col in table.schema + if col.name not in SYSTEM_COLUMN_NAMES + ] + return {"fields": fields} + + +def table_options_clause(schema: dict[str, Any]) -> str: + """Return ` OPTIONS(description = '…', labels = […])` for a table DDL.""" + desc = _encode_table_description(schema) + return ( + f" OPTIONS(description = {_sql_literal(desc)}, " + f"labels = [(\"datastore_managed\", \"true\")])" + ) + + +def set_table_options_sql(table_ref: str, schema: dict[str, Any]) -> str: + """Stand-alone `ALTER TABLE … SET OPTIONS(...)` statement. + + Separate from column ALTERs because BQ refuses to mix column + actions and SET OPTIONS in one statement. + """ + desc = _encode_table_description(schema) + return ( + f"ALTER TABLE {table_ref} " + f"SET OPTIONS(description = {_sql_literal(desc)}, " + f"labels = [(\"datastore_managed\", \"true\")])" + ) + + +def _parse_description(s: str | None) -> dict[str, Any]: + """Parse a description blob as JSON, or `{}` on empty/non-object/malformed.""" + if not s: + return {} + try: + result = json.loads(s) + except (json.JSONDecodeError, ValueError): + return {} + return result if isinstance(result, dict) else {} + + +def _encode_table_description(schema: dict[str, Any]) -> str: + """JSON-encode the user schema verbatim under `datastore.schema`. + Strips engine-managed system columns from `fields` first.""" + payload: dict[str, Any] = { + DATASTORE_KEY: { + "schema_version": SCHEMA_VERSION, + "schema": _strip_system_fields(schema), + } + } + return json.dumps(payload, ensure_ascii=False, separators=(",", ":")) + + +def _strip_system_fields(schema: dict[str, Any]) -> dict[str, Any]: + """Shallow copy of `schema` with `_id`/`_updated_at` filtered out of `fields`.""" + return { + **schema, + "fields": [ + f for f in schema.get("fields", []) + if f.get("name") not in SYSTEM_COLUMN_NAMES + ], + } + + +def _sql_literal(s: str) -> str: + """Single-quote a string for inline use in BigQuery SQL. + + Order matters: backslashes first, then single quotes. Only + descriptive payloads go through here — never identifiers. + """ + return "'" + s.replace("\\", "\\\\").replace("'", "\\'") + "'" diff --git a/datastore/infrastructure/engines/bigquery/metadata.py b/datastore/infrastructure/engines/bigquery/metadata.py deleted file mode 100644 index 8c47069..0000000 --- a/datastore/infrastructure/engines/bigquery/metadata.py +++ /dev/null @@ -1,220 +0,0 @@ -"""BigQuery implementation of the `MetadataStore` Protocol. - -Stores one row per `resource_id` in a hidden `_table_metadata` table -that lives alongside the user data tables in `BIGQUERY_DATASET`. The -row carries the Frictionless schema declared at `datastore_create` -time plus `created_at` / `updated_at` timestamps so callers can -reconstruct the column declaration without re-parsing user tables. - -The table is created on engine startup (`initialize()`) and updated via -parameterised `MERGE` from `create()`. Other engines (DuckLake, -Postgres, …) provide their own implementation of the same Protocol — -the backend layer only depends on the methods declared in -`engines/base.py:MetadataStore`. -""" - -from __future__ import annotations - -import logging -from typing import TYPE_CHECKING, Any - -import orjson - -from datastore.core.exceptions import ServerError - -if TYPE_CHECKING: - from google.cloud import bigquery - -log = logging.getLogger(__name__) - -# Hidden by convention: BigQuery treats leading-underscore tables as -# internal, hiding them from default list / autocomplete in most UIs. -METADATA_TABLE_NAME = "_table_metadata" - - -class BigQueryMetadataStore: - """`MetadataStore` backed by a BigQuery table. - - Schema (DDL applied by `initialize`): - - resource_id STRING NOT NULL - schema JSON NOT NULL - created_at TIMESTAMP NOT NULL - updated_at TIMESTAMP NOT NULL - - The table is keyed on `resource_id` at the application layer - (BigQuery has no enforced PK / unique constraints); the `MERGE` in - `upsert()` provides single-row semantics. - """ - - def __init__( - self, - *, - client: bigquery.Client, - project: str, - dataset: str, - table_name: str = METADATA_TABLE_NAME, - ) -> None: - self.client = client - self.project = project - self.dataset = dataset - self.table_name = table_name - - @property - def table_ref(self) -> str: - """Fully-qualified `project.dataset.table` reference for SQL.""" - return f"`{self.project}.{self.dataset}.{self.table_name}`" - - def initialize(self) -> None: - """Create the metadata table if it doesn't exist. Idempotent. - - Uses `CREATE TABLE IF NOT EXISTS` so concurrent pods racing to - start up don't trip over each other. The dataset itself is - assumed to exist — creating datasets is an out-of-band ops task, - not something the application does at request time. - """ - ddl = f""" - CREATE TABLE IF NOT EXISTS {self.table_ref} ( - resource_id STRING NOT NULL, - schema JSON NOT NULL, - created_at TIMESTAMP NOT NULL, - updated_at TIMESTAMP NOT NULL - ) - """ - self._run(ddl, op="metadata CREATE TABLE", resource_id=None) - log.info( - "BigQuery metadata table ready: %s.%s.%s", - self.project, self.dataset, self.table_name, - ) - - def insert(self, resource_id: str, schema: dict) -> None: - """Insert a new metadata row for `resource_id`. - - Sets `created_at` / `updated_at` to now. Fails if a row already - exists for this `resource_id` — that's a genuine conflict - (duplicate `datastore_create`) that callers should surface. - """ - sql = f""" - INSERT INTO {self.table_ref} - (resource_id, schema, created_at, updated_at) - VALUES ( - @resource_id, - PARSE_JSON(@schema), - CURRENT_TIMESTAMP(), - CURRENT_TIMESTAMP() - ) - """ - self._run( - sql, - op="metadata INSERT", - resource_id=resource_id, - job_config=self._schema_params(resource_id, schema), - ) - - def update(self, resource_id: str, schema: dict) -> None: - """Update the metadata row keyed by `resource_id`. - - Replaces `schema` and bumps `updated_at`; `created_at` is - preserved. Plain `UPDATE` — no MERGE, no insert fallback. When - no row matches the predicate the statement is a no-op. - """ - sql = f""" - UPDATE {self.table_ref} - SET schema = PARSE_JSON(@schema), - updated_at = CURRENT_TIMESTAMP() - WHERE resource_id = @resource_id - """ - self._run( - sql, - op="metadata UPDATE", - resource_id=resource_id, - job_config=self._schema_params(resource_id, schema), - ) - - def _schema_params( - self, resource_id: str, schema: dict - ) -> "bigquery.QueryJobConfig": - """Build the `(resource_id, schema)` parameter set shared by - `insert` and `update`. Keeps the SQL strings free of inline - values and the marshalling rule in one place.""" - from google.cloud import bigquery - - return bigquery.QueryJobConfig( - query_parameters=[ - bigquery.ScalarQueryParameter("resource_id", "STRING", resource_id), - bigquery.ScalarQueryParameter( - "schema", "STRING", orjson.dumps(schema).decode("utf-8") - ), - ] - ) - - def get(self, resource_id: str) -> dict | None: - """Return the stored Frictionless schema for `resource_id`, - or `None` when no row exists.""" - sql = f""" - SELECT TO_JSON_STRING(schema) AS schema_json - FROM {self.table_ref} - WHERE resource_id = @resource_id - LIMIT 1 - """ - rows = list( - self._run( - sql, - op="metadata SELECT", - resource_id=resource_id, - job_config=self._resource_id_params(resource_id), - ) - ) - if not rows: - return None - raw = rows[0]["schema_json"] - parsed: Any = orjson.loads(raw) - return parsed if isinstance(parsed, dict) else None - - def delete(self, resource_id: str) -> None: - """Remove the metadata row for `resource_id`. No-op when absent.""" - sql = f"DELETE FROM {self.table_ref} WHERE resource_id = @resource_id" - self._run( - sql, - op="metadata DELETE", - resource_id=resource_id, - job_config=self._resource_id_params(resource_id), - ) - - def _run( - self, - sql: str, - *, - op: str, - resource_id: str | None, - job_config: "bigquery.QueryJobConfig | None" = None, - ) -> Any: - """Run a metadata SQL statement and wait for completion. - - Wraps every `client.query` so transport / SQL failures arrive at - callers as `ServerError` with the operation name + the - `resource_id` being touched (or `` for `initialize`), - rather than raw `google.api_core` exceptions. - """ - try: - return self.client.query(sql, job_config=job_config).result() - except Exception as e: - target = resource_id if resource_id is not None else "" - raise ServerError( - f"BigQuery {op} failed for resource {target!r}: {e}" - ) from e - - def _resource_id_params( - self, resource_id: str - ) -> "bigquery.QueryJobConfig": - """Job config carrying just the `resource_id` parameter (for - `get` and `delete` which don't bind a schema).""" - from google.cloud import bigquery - - return bigquery.QueryJobConfig( - query_parameters=[ - bigquery.ScalarQueryParameter( - "resource_id", "STRING", resource_id - ), - ] - ) diff --git a/datastore/schemas/responses.py b/datastore/schemas/responses.py index a7f5108..35b4fc1 100644 --- a/datastore/schemas/responses.py +++ b/datastore/schemas/responses.py @@ -101,6 +101,9 @@ class Result(BaseModel): resource_id: str filters: dict[str, Any] | None = None fields: list[str] | None = None + # Set only on the column-drop path: the resulting Frictionless + # Table Schema after the listed columns were removed. + schema: dict[str, Any] | None = None result: Result diff --git a/datastore/services/write.py b/datastore/services/write.py index 6203ef1..eaf6e48 100644 --- a/datastore/services/write.py +++ b/datastore/services/write.py @@ -33,7 +33,12 @@ async def create_datastore( assert context.ckan is not None, ( "datastore_create `resource` dict path requires AUTH_TYPE=ckan" ) - resource = await context.ckan.resource_create(resource=resource) + # Tag the new resource as datastore-managed so CKAN (and our own + # read-only guard on subsequent writes) knows the datastore owns + # its data. Caller-supplied url_type is overridden on purpose. + resource = await context.ckan.resource_create( + resource={**resource, "url_type": "datastore"} + ) resource_id = resource["id"] else: resource_id = resource @@ -98,7 +103,7 @@ async def delete_datastore( fields = data_dict.get("fields") engine = get_datastore_engine(context, mode="rw") - await asyncio.to_thread( + result = await asyncio.to_thread( engine.delete, resource_id=resource_id, filters=filters, fields=fields, ) @@ -106,4 +111,7 @@ async def delete_datastore( resource_id=resource_id, filters=filters, fields=fields, + # Populated only on the column-drop path: the table's schema + # after the listed columns were removed. + schema=result.schema, ) diff --git a/tests/auth/test_orchestration.py b/tests/auth/test_orchestration.py index 03676d9..f54cda8 100644 --- a/tests/auth/test_orchestration.py +++ b/tests/auth/test_orchestration.py @@ -16,7 +16,7 @@ from typing import Any import pytest -from datastore.api.auth import authorize +from datastore.api.auth import authorize, ensure_resource_writable from datastore.auth.base import Decision from datastore.core.exceptions import AuthorizationError, ValidationError @@ -145,3 +145,33 @@ def test_provider_authorization_error_propagates() -> None: api_key="tok", provider=provider, resource_id="res-1", package_id=None, permission="read", )) + + +# --- ensure_resource_writable (read-only force guard) ----------------------- + + +def test_readonly_guard_blocks_datastore_resource_under_ckan() -> None: + with pytest.raises(ValidationError, match="read-only"): + ensure_resource_writable( + {"url_type": "datastore"}, force=False, auth_type="ckan", + ) + + +def test_readonly_guard_allows_with_force() -> None: + ensure_resource_writable( + {"url_type": "datastore"}, force=True, auth_type="ckan", + ) + + +def test_readonly_guard_is_ckan_only() -> None: + """Non-CKAN auth never trips the guard, even if a resource somehow + carries `url_type="datastore"`.""" + for auth_type in ("anonymous", "jwt"): + ensure_resource_writable( + {"url_type": "datastore"}, force=False, auth_type=auth_type, + ) + + +def test_readonly_guard_ignores_non_datastore_resources() -> None: + ensure_resource_writable({"url_type": "upload"}, force=False, auth_type="ckan") + ensure_resource_writable({}, force=False, auth_type="ckan") diff --git a/tests/conftest.py b/tests/conftest.py index bcf0889..6c85e6d 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -13,6 +13,11 @@ ): os.environ[_name] = "" os.environ["BIGQUERY_EXPORT_URL_EXPIRY_HOURS"] = "1" +# `AUTH_TYPE` defaults to `ckan`, whose validator requires a non-empty +# `CKAN_URL`. `create_app()` runs at import (module-level `app`), so give +# it a dummy when the env doesn't carry one — tests override the CKAN +# client via DI, so this URL is never contacted. +os.environ.setdefault("CKAN_URL", "http://test-ckan.local") from collections.abc import Iterator # noqa: E402 from typing import Any # noqa: E402 diff --git a/tests/engines/bigquery/test_metadata.py b/tests/engines/bigquery/test_metadata.py index b93bc66..ff9d536 100644 --- a/tests/engines/bigquery/test_metadata.py +++ b/tests/engines/bigquery/test_metadata.py @@ -1,235 +1,102 @@ -"""Unit tests for `BigQueryMetadataStore`. +"""Unit tests for the BigQuery native-metadata helpers in `lib.py`. -The store talks to BigQuery via `client.query(sql, job_config=...)`. We -mock the client so tests can pin: - - what SQL the store issues (DDL on `initialize`, INSERT on `insert`, - UPDATE on `update`, SELECT on `get`, DELETE on `delete`); - - what query parameters travel alongside the SQL. - -No real BigQuery is contacted — these are pure unit tests over the -SQL the store generates. +The engine stores the Frictionless schema in the table-level +`description` (under a `datastore` key) rather than a side table, so +these tests pin the encode → decode round-trip, the DDL/ALTER OPTIONS +shape, and the fall-back to BQ-column inference for unmanaged tables. +No real BigQuery is contacted — table objects are lightweight stubs. """ from __future__ import annotations import json +from types import SimpleNamespace from typing import Any -from unittest.mock import MagicMock -import pytest -from datastore.infrastructure.engines.bigquery.metadata import ( - METADATA_TABLE_NAME, - BigQueryMetadataStore, +from datastore.infrastructure.engines.bigquery.lib import ( + DATASTORE_KEY, + normalize_pk, + set_table_options_sql, + table_options_clause, + table_to_schema, ) -@pytest.fixture -def mock_client() -> MagicMock: - """A `bigquery.Client` stand-in that records `.query(...)` calls. - - `client.query(sql, job_config=...)` returns a job whose `.result()` - yields whatever the test arranges via `mock_client.set_rows([...])`. - """ - client = MagicMock() - job = MagicMock() - job.result.return_value = [] - client.query.return_value = job - - def _set_rows(rows: list[dict[str, Any]]) -> None: - job.result.return_value = rows - - client.set_rows = _set_rows # type: ignore[attr-defined] - return client - - -@pytest.fixture -def store(mock_client: MagicMock) -> BigQueryMetadataStore: - return BigQueryMetadataStore( - client=mock_client, - project="proj-1", - dataset="ds-1", - ) - - -# --- initialize ------------------------------------------------------------ - - -def test_initialize_issues_create_table_if_not_exists( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: - store.initialize() - - assert mock_client.query.call_count == 1 - sql = mock_client.query.call_args[0][0] - assert "CREATE TABLE IF NOT EXISTS" in sql - assert "`proj-1.ds-1._table_metadata`" in sql - # Schema columns are declared. - for col in ( - "resource_id STRING", - "schema JSON", - "created_at TIMESTAMP", - "updated_at TIMESTAMP", - ): - assert col in sql +def _table(description: str | None = None, columns: list[tuple[str, str]] | None = None) -> Any: + """A `bigquery.Table` stand-in carrying `description` + `schema`.""" + schema = [ + SimpleNamespace(name=name, field_type=ftype) + for name, ftype in (columns or []) + ] + return SimpleNamespace(description=description, schema=schema) -def test_initialize_is_idempotent( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: - """Two `initialize()` calls — both safe because the DDL uses - `IF NOT EXISTS`.""" - store.initialize() - store.initialize() +# --- OPTIONS round-trip ---------------------------------------------------- - assert mock_client.query.call_count == 2 - -# --- insert ---------------------------------------------------------------- - - -def test_insert_issues_parameterised_insert( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: +def test_table_options_clause_round_trips_schema_without_system_fields() -> None: schema = { - "fields": [{"name": "a", "type": "integer"}], - "primaryKey": ["a"], + "fields": [ + {"name": "_id", "type": "integer"}, + {"name": "auction_id", "type": "integer"}, + {"name": "product_code", "type": "string"}, + ], + "primaryKey": ["auction_id"], } - store.insert("res-1", schema) - - assert mock_client.query.call_count == 1 - sql, kwargs = mock_client.query.call_args - sql_text = sql[0] - assert "INSERT INTO" in sql_text - assert "MERGE" not in sql_text # no upsert semantics - assert "PARSE_JSON(@schema)" in sql_text - assert "CURRENT_TIMESTAMP()" in sql_text - - params = {p.name: p.value for p in kwargs["job_config"].query_parameters} - assert params["resource_id"] == "res-1" - assert json.loads(params["schema"]) == schema - - -# --- update ---------------------------------------------------------------- - - -def test_update_issues_parameterised_update( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: - schema = {"fields": [{"name": "b", "type": "string"}]} - store.update("res-1", schema) - - assert mock_client.query.call_count == 1 - sql, kwargs = mock_client.query.call_args - sql_text = sql[0] - assert "UPDATE" in sql_text - assert "SET schema = PARSE_JSON(@schema)" in sql_text - assert "WHERE resource_id = @resource_id" in sql_text - # `created_at` must NOT be reassigned by update. - assert "created_at" not in sql_text + clause = table_options_clause(schema) - params = {p.name: p.value for p in kwargs["job_config"].query_parameters} - assert params["resource_id"] == "res-1" - assert json.loads(params["schema"]) == schema + assert clause.startswith(" OPTIONS(description = '") + assert 'labels = [("datastore_managed", "true")]' in clause + # Recover the JSON description and confirm the schema round-trips + # with system columns stripped. + desc = clause.split("description = '", 1)[1].rsplit("', labels", 1)[0] + payload = json.loads(desc.replace("\\'", "'").replace("\\\\", "\\")) + stored = payload[DATASTORE_KEY]["schema"] + assert stored["primaryKey"] == ["auction_id"] + assert [f["name"] for f in stored["fields"]] == ["auction_id", "product_code"] + assert table_to_schema(_table(description=desc)) == stored -# --- get ------------------------------------------------------------------- +def test_set_table_options_sql_emits_alter_with_managed_label() -> None: + sql = set_table_options_sql("`p.d.r`", {"fields": [{"name": "a", "type": "string"}]}) + assert sql.startswith("ALTER TABLE `p.d.r` SET OPTIONS(description = '") + assert 'labels = [("datastore_managed", "true")]' in sql -def test_get_returns_parsed_schema( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: - schema = {"fields": [{"name": "a", "type": "integer"}]} - mock_client.set_rows([{"schema_json": json.dumps(schema)}]) - out = store.get("res-1") +# --- table_to_schema ------------------------------------------------------- - assert out == schema +def test_table_to_schema_returns_stored_block_verbatim() -> None: + stored = {"fields": [{"name": "x", "type": "number", "info": {"unit": "MWh"}}]} + desc = json.dumps({DATASTORE_KEY: {"schema_version": 1, "schema": stored}}) + assert table_to_schema(_table(description=desc)) == stored -def test_get_returns_none_when_no_row( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: - mock_client.set_rows([]) - assert store.get("does-not-exist") is None - - -# --- delete ---------------------------------------------------------------- - - -def test_delete_issues_parameterised_delete( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: - store.delete("res-1") - - assert mock_client.query.call_count == 1 - sql, kwargs = mock_client.query.call_args - assert "DELETE FROM" in sql[0] - assert "WHERE resource_id = @resource_id" in sql[0] - params = {p.name: p.value for p in kwargs["job_config"].query_parameters} - assert params["resource_id"] == "res-1" - - -# --- table reference ------------------------------------------------------- - -def test_table_ref_format(store: BigQueryMetadataStore) -> None: - assert store.table_ref == "`proj-1.ds-1._table_metadata`" - assert store.table_name == METADATA_TABLE_NAME - - -# --- error wrapping -------------------------------------------------------- - - -def test_insert_wraps_bigquery_errors_as_server_error( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: - """Raw BigQuery exceptions are wrapped as `ServerError` carrying - the operation name + resource_id.""" - from datastore.core.exceptions import ServerError - - mock_client.query.return_value.result.side_effect = RuntimeError( - "quota exceeded" - ) - - with pytest.raises(ServerError) as exc: - store.insert("res-1", {"fields": [{"name": "a", "type": "integer"}]}) - - msg = str(exc.value) - assert "metadata INSERT" in msg - assert "'res-1'" in msg - assert "quota exceeded" in msg - - -def test_update_wraps_bigquery_errors_as_server_error( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: - from datastore.core.exceptions import ServerError - - mock_client.query.return_value.result.side_effect = RuntimeError( - "bigquery is sad" - ) - - with pytest.raises(ServerError) as exc: - store.update("res-1", {"fields": [{"name": "a", "type": "integer"}]}) +def test_table_to_schema_infers_from_columns_for_unmanaged_table() -> None: + table = _table(columns=[("_id", "INT64"), ("name", "STRING"), ("age", "INT64")]) + schema = table_to_schema(table) + # System columns dropped; BQ field types mapped to canonical + # Frictionless names so downstream filter type maps understand them. + assert schema == { + "fields": [ + {"name": "name", "type": "string"}, + {"name": "age", "type": "integer"}, + ] + } - assert "metadata UPDATE" in str(exc.value) - assert "'res-1'" in str(exc.value) +def test_table_to_schema_ignores_malformed_description() -> None: + assert table_to_schema(_table(description="not json", columns=[("a", "STRING")])) == { + "fields": [{"name": "a", "type": "string"}] + } -def test_initialize_wraps_bigquery_errors_as_server_error( - store: BigQueryMetadataStore, mock_client: MagicMock -) -> None: - """Init has no resource_id; the error message uses `` so the - target is still labelled.""" - from datastore.core.exceptions import ServerError - mock_client.query.return_value.result.side_effect = RuntimeError( - "permission denied" - ) +# --- normalize_pk ---------------------------------------------------------- - with pytest.raises(ServerError) as exc: - store.initialize() - msg = str(exc.value) - assert "metadata CREATE TABLE" in msg - assert "" in msg +def test_normalize_pk_handles_str_list_and_missing() -> None: + assert normalize_pk({"primaryKey": "id"}) == ["id"] + assert normalize_pk({"primaryKey": ["a", "b"]}) == ["a", "b"] + assert normalize_pk({}) == [] + assert normalize_pk({"primaryKey": None}) == [] diff --git a/tests/engines/bigquery/test_tables.py b/tests/engines/bigquery/test_tables.py index 06db34e..f9b06a0 100644 --- a/tests/engines/bigquery/test_tables.py +++ b/tests/engines/bigquery/test_tables.py @@ -83,30 +83,49 @@ def test_data_table_ref_uses_backticks(mock_client: MagicMock) -> None: ) -def test_create_data_table_emits_create_table_if_not_exists( +def test_create_table_sql_emits_ddl_with_options( mock_client: MagicMock, ) -> None: + """`_create_table_sql` renders `CREATE TABLE` DDL carrying the + table-level OPTIONS block with the user's full schema verbatim — + `fields`, `primaryKey`, and any per-field keys (`title`, + `description`, `unit`, custom extensions) all stored on the table + itself rather than scattered across column descriptions.""" backend = _backend(mock_client) - backend._create_data_table( + sql = backend._create_table_sql( "res-1", - {"fields": [ - {"name": "id", "type": "integer"}, - {"name": "label", "type": "string"}, - ]}, + { + "fields": [ + {"name": "id", "type": "integer", "title": "ID"}, + {"name": "label", "type": "string"}, + ], + "primaryKey": ["id"], + }, ) - sql = mock_client.query.call_args[0][0] - assert "CREATE TABLE IF NOT EXISTS `proj-1.ds-1.res-1`" in sql - # User columns. - assert "`id` INT64" in sql + assert sql is not None + assert "CREATE TABLE `proj-1.ds-1.res-1`" in sql + # User columns (plain DDL, no per-column OPTIONS). + assert "`id` INT64," in sql assert "`label` STRING" in sql + assert " OPTIONS(" not in sql.split(") OPTIONS")[0] # nothing pre-table-OPTIONS # System columns auto-prepended. assert "`_id` INT64" in sql assert "`_updated_at` TIMESTAMP" in sql + # Table-level OPTIONS contains the full user schema, verbatim. + assert '"title":"ID"' in sql + assert '"primaryKey":["id"]' in sql + assert sql.endswith( + ', labels = [("datastore_managed", "true")])' + ) -def test_alter_adds_new_columns_and_widens_supported_types( +def test_alter_adds_new_columns_widens_types_then_refreshes_options( mock_client: MagicMock, ) -> None: + """`_alter_data_table` issues two statements: the column ADD / ALTER + actions first, then `ALTER TABLE … SET OPTIONS(...)` to refresh the + table-level metadata block (BQ refuses to mix the two in one + statement).""" backend = _backend(mock_client) old = {"fields": [{"name": "a", "type": "integer"}]} new = {"fields": [ @@ -116,10 +135,37 @@ def test_alter_adds_new_columns_and_widens_supported_types( backend._alter_data_table("res-1", old, new) + assert mock_client.query.call_count == 2 + col_sql = mock_client.query.call_args_list[0].args[0] + opts_sql = mock_client.query.call_args_list[1].args[0] + assert "ALTER TABLE `proj-1.ds-1.res-1`" in col_sql + assert "ADD COLUMN IF NOT EXISTS `b` STRING" in col_sql + assert "ALTER COLUMN `a` SET DATA TYPE FLOAT64" in col_sql + assert opts_sql.startswith( + "ALTER TABLE `proj-1.ds-1.res-1` SET OPTIONS(" + ) + # Refreshed table OPTIONS carries the new schema verbatim. + assert '"name":"a","type":"number"' in opts_sql + assert '"name":"b","type":"string"' in opts_sql + + +def test_alter_refreshes_options_even_when_no_column_changes( + mock_client: MagicMock, +) -> None: + """Re-declaring the same schema (e.g. to change `primaryKey` on + the same column set) still refreshes the table-level OPTIONS so + the reader sees the new primaryKey on the next `tables.get`.""" + backend = _backend(mock_client) + schema = { + "fields": [{"name": "a", "type": "integer"}], + "primaryKey": ["a"], + } + backend._alter_data_table("res-1", schema, schema) + + assert mock_client.query.call_count == 1 sql = mock_client.query.call_args[0][0] - assert "ALTER TABLE `proj-1.ds-1.res-1`" in sql - assert "ADD COLUMN IF NOT EXISTS `b` STRING" in sql - assert "ALTER COLUMN `a` SET DATA TYPE FLOAT64" in sql + assert sql.startswith("ALTER TABLE `proj-1.ds-1.res-1` SET OPTIONS(") + assert '"primaryKey":["a"]' in sql def test_alter_rejects_unsupported_type_change_before_any_ddl( @@ -197,8 +243,9 @@ def test_client_query_errors_surface_as_server_error_with_context( ) backend = _backend(mock_client) with pytest.raises(ServerError) as exc: - backend._create_data_table( - "res-1", {"fields": [{"name": "a", "type": "integer"}]} + backend._run_query( + "CREATE TABLE foo (x INT64)", + op="CREATE TABLE", resource_id="res-1", ) assert "CREATE TABLE" in str(exc.value) assert "'res-1'" in str(exc.value) @@ -207,20 +254,15 @@ def test_client_query_errors_surface_as_server_error_with_context( # --- create() orchestration ----------------------------------------------- -def test_create_new_resource_runs_ddl_records_then_metadata_in_order( +def test_create_new_resource_runs_combined_create_and_insert_script( mock_client: MagicMock, ) -> None: - """New resource path: CREATE TABLE → INSERT INTO → metadata.insert. - Two BigQuery round-trips (`MAX(_id)` is inlined into the INSERT - statement). Metadata is the last write so any failure earlier - leaves it untouched.""" + """New resource path: CREATE TABLE + INSERT INTO travel as a single + multi-statement BigQuery script (one `client.query` submission, + one polling cycle). Saves ~1s of job overhead vs running them + separately.""" backend = _backend(mock_client) - backend.metadata = MagicMock() - backend.metadata.get.return_value = None - - parent = MagicMock() - parent.attach_mock(mock_client.query, "query") - parent.attach_mock(backend.metadata.insert, "metadata_insert") + backend._read_schema = MagicMock(return_value=None) backend.create( "res-1", @@ -229,33 +271,88 @@ def test_create_new_resource_runs_ddl_records_then_metadata_in_order( include_total=False, ) - sql_calls = [c for c in parent.mock_calls if c[0] == "query"] - assert len(sql_calls) == 2 # no separate MAX(_id) probe - assert sql_calls[0].args[0].startswith("CREATE TABLE IF NOT EXISTS") - assert sql_calls[1].args[0].startswith("INSERT INTO ") - # The inlined MAX(_id) subquery rides inside the INSERT itself. - assert "SELECT IFNULL(MAX(`_id`), 0)" in sql_calls[1].args[0] - # metadata.insert came last. - assert parent.mock_calls[-1][0] == "metadata_insert" + assert mock_client.query.call_count == 1 + script = mock_client.query.call_args[0][0] + assert script.startswith("CREATE TABLE ") + # Table-level OPTIONS lives on the CREATE — no follow-up metadata write. + assert ") OPTIONS(description = '" in script + # The INSERT statement comes after the CREATE, separated by `;`. + assert ";\nINSERT INTO " in script + # `MAX(_id)` is inlined into the INSERT — no separate probe. + assert "SELECT IFNULL(MAX(`_id`), 0)" in script + # @rows parameter rides on the same job_config. + params = mock_client.query.call_args.kwargs["job_config"].query_parameters + assert [p.name for p in params] == ["rows"] + + +def test_create_with_no_records_on_new_resource_runs_create_only( + mock_client: MagicMock, +) -> None: + """When `records` is empty and the resource is new, the engine + issues just `CREATE TABLE` (no INSERT) — but it still reads the + existing schema first so a re-declare with new columns on an + existing table can ALTER them in (see the alter-path tests).""" + backend = _backend(mock_client) + backend._read_schema = MagicMock(return_value=None) + backend.create( + "res-1", + schema={"fields": [{"name": "a", "type": "integer"}]}, + records=None, + include_total=False, + ) -def test_create_skips_metadata_when_records_insert_fails( + backend._read_schema.assert_called_once_with("res-1") + assert mock_client.query.call_count == 1 + sql = mock_client.query.call_args[0][0] + assert sql.startswith("CREATE TABLE ") + assert "INSERT" not in sql + + +def test_create_with_no_records_on_existing_resource_alters_columns( mock_client: MagicMock, ) -> None: - """Atomicity: a DML INSERT failure leaves metadata untouched. + """Re-declaring an existing resource with NEW columns and no + records still adds the columns — `_read_schema` runs unconditionally + so the diff fires whether or not the caller had rows to insert.""" + backend = _backend(mock_client) + backend._read_schema = MagicMock(return_value={ + "fields": [{"name": "a", "type": "integer"}], + }) - Create flow: CREATE TABLE → INSERT. Fail the second query (the - INSERT); the first (CREATE TABLE) succeeds. - """ + backend.create( + "res-1", + schema={"fields": [ + {"name": "a", "type": "integer"}, + {"name": "b", "type": "string"}, + ]}, + records=None, + include_total=False, + ) + + # Column ALTER + table-level SET OPTIONS, no INSERT. + assert mock_client.query.call_count == 2 + col_sql = mock_client.query.call_args_list[0].args[0] + opts_sql = mock_client.query.call_args_list[1].args[0] + assert "ADD COLUMN IF NOT EXISTS `b` STRING" in col_sql + assert opts_sql.startswith( + "ALTER TABLE `proj-1.ds-1.res-1` SET OPTIONS(" + ) + + +def test_create_propagates_insert_failure_as_server_error( + mock_client: MagicMock, +) -> None: + """If the combined CREATE+INSERT script fails (e.g. type coercion + error on a row), the error surfaces as `ServerError` carrying the + resource_id. The CREATE half is idempotent, so a retry with fixed + records re-runs the whole script and lands the table.""" backend = _backend(mock_client) - backend.metadata = MagicMock() - backend.metadata.get.return_value = None + backend._read_schema = MagicMock(return_value=None) - success_job = MagicMock() - success_job.result.return_value = [] fail_job = MagicMock() fail_job.result.side_effect = RuntimeError("insert failed") - mock_client.query.side_effect = [success_job, fail_job] + mock_client.query.return_value = fail_job with pytest.raises(ServerError): backend.create( @@ -264,8 +361,6 @@ def test_create_skips_metadata_when_records_insert_fails( records=[{"a": 1}], include_total=False, ) - backend.metadata.insert.assert_not_called() - backend.metadata.update.assert_not_called() def test_create_placeholder_mode_skips_everything( @@ -274,7 +369,7 @@ def test_create_placeholder_mode_skips_everything( """No metadata store → no DDL, no metadata calls. Lets the unit suite run without GCP creds.""" backend = _backend(mock_client) - backend.metadata = None + backend.client = None backend.create( "res-1", @@ -365,8 +460,7 @@ def _backend_with_schema( mock_client: MagicMock, schema: dict[str, Any] ) -> BigQueryBackend: backend = _backend(mock_client) - backend.metadata = MagicMock() - backend.metadata.get.return_value = schema + backend._read_schema = MagicMock(return_value=schema) return backend @@ -475,8 +569,7 @@ def test_upsert_undeclared_resource_raises_not_found( """`upsert` before `create` → NotFoundError. Metadata store is the source of truth for whether a resource exists.""" backend = _backend(mock_client) - backend.metadata = MagicMock() - backend.metadata.get.return_value = None + backend._read_schema = MagicMock(return_value=None) with pytest.raises(NotFoundError, match="not declared"): backend.upsert( @@ -582,19 +675,18 @@ def test_insert_translates_bigquery_bad_double_value_to_type_mismatch( a non-numeric string for a `number` column) is translated to a clear ValidationError naming the bad value and the expected type. - The create-flow runs CREATE TABLE → INSERT INTO — only the INSERT - (2nd call) should fail. The first (CREATE TABLE) succeeds. + The create-flow runs CREATE TABLE + INSERT INTO as a single + multi-statement script; BigQuery aborts the script on the INSERT + statement and surfaces the type error to the single + `client.query` caller. """ - success_job = MagicMock() - success_job.result.return_value = [] fail_job = MagicMock() fail_job.result.side_effect = RuntimeError( "400 Bad double value: jk; reason: invalidQuery, location: query" ) - mock_client.query.side_effect = [success_job, fail_job] + mock_client.query.return_value = fail_job backend = _backend(mock_client) - backend.metadata = MagicMock() - backend.metadata.get.return_value = None + backend._read_schema = MagicMock(return_value=None) with pytest.raises(ValidationError) as exc: backend.create( @@ -814,16 +906,16 @@ def test_backend_propagates_config_flag_into_ddl( mock_client: MagicMock, ) -> None: """`BigQueryBackend._include_updated_at` reads `INCLUDE_UPDATED_AT` - off the attached config; `_create_data_table` honours it.""" + off the attached config; `_create_table_sql` honours it.""" backend = _backend(mock_client) backend.config.INCLUDE_UPDATED_AT = False - backend._create_data_table( + sql = backend._create_table_sql( "res-1", {"fields": [{"name": "id", "type": "integer"}]}, ) - sql = mock_client.query.call_args[0][0] + assert sql is not None assert "`_id` INT64" in sql assert "_updated_at" not in sql @@ -869,8 +961,7 @@ def test_info_raises_not_found_for_undeclared_resource( out-of-band but the engine treats `_table_metadata` as the declaration source of truth.""" backend = _backend(mock_client) - backend.metadata = MagicMock() - backend.metadata.get.return_value = None + backend._read_schema = MagicMock(return_value=None) with pytest.raises(NotFoundError, match="not declared"): backend.info("ghost") @@ -903,7 +994,7 @@ def test_info_placeholder_mode_returns_stub(mock_client: MagicMock) -> None: """No metadata store → return an empty stub so the unit suite can exercise the call path without GCP creds.""" backend = _backend(mock_client) - backend.metadata = None + backend.client = None result = backend.info("res-1") @@ -1181,8 +1272,7 @@ def test_search_raises_not_found_for_undeclared_resource( mock_client: MagicMock, ) -> None: backend = _backend(mock_client) - backend.metadata = MagicMock() - backend.metadata.get.return_value = None + backend._read_schema = MagicMock(return_value=None) with pytest.raises(NotFoundError, match="not declared"): backend.search( @@ -1215,26 +1305,27 @@ def test_search_translates_builder_error_to_validation_error( # --- delete() ------------------------------------------------------------- -def test_delete_with_no_filters_or_fields_drops_table_and_metadata( +def test_delete_with_no_filters_or_fields_drops_table( mock_client: MagicMock, ) -> None: - """Both `filters` and `fields` omitted → `DROP TABLE` + the - metadata row is removed. Resource disappears entirely.""" + """Both `filters` and `fields` omitted → `DROP TABLE`. Metadata + lives on the table itself, so the drop removes both at once — + no follow-up metadata cleanup needed.""" schema = {"fields": [{"name": "id", "type": "integer"}]} backend = _backend_with_schema(mock_client, schema) backend.delete("res-1", filters=None, fields=None) + assert mock_client.query.call_count == 1 sql = mock_client.query.call_args[0][0] assert sql == "DROP TABLE IF EXISTS `proj-1.ds-1.res-1`" - backend.metadata.delete.assert_called_once_with("res-1") def test_delete_with_empty_filters_deletes_all_rows( mock_client: MagicMock, ) -> None: - """`filters={}` → `DELETE FROM … WHERE TRUE`. Table + metadata - survive; only rows go.""" + """`filters={}` → `DELETE FROM … WHERE TRUE`. Table (and its + metadata) survive; only rows go.""" schema = {"fields": [{"name": "id", "type": "integer"}]} backend = _backend_with_schema(mock_client, schema) @@ -1242,7 +1333,6 @@ def test_delete_with_empty_filters_deletes_all_rows( sql = mock_client.query.call_args[0][0] assert sql == "DELETE FROM `proj-1.ds-1.res-1` WHERE TRUE" - backend.metadata.delete.assert_not_called() def test_delete_with_filters_binds_typed_parameters( @@ -1273,11 +1363,13 @@ def test_delete_with_filters_binds_typed_parameters( assert params == {"f0": (144, "INT64"), "f1": (False, "BOOL")} -def test_delete_with_fields_drops_columns_and_updates_metadata( +def test_delete_with_fields_drops_columns_and_refreshes_options( mock_client: MagicMock, ) -> None: """`fields=[…]` → `ALTER TABLE DROP COLUMN …` (one ALTER, multiple - clauses) and the stored schema is rewritten without those fields.""" + clauses) followed by a `SET OPTIONS` that rewrites the table-level + metadata without the dropped column names. Two SQL calls total — + BigQuery refuses to mix column actions and SET OPTIONS.""" schema = { "fields": [ {"name": "id", "type": "integer"}, @@ -1288,17 +1380,30 @@ def test_delete_with_fields_drops_columns_and_updates_metadata( } backend = _backend_with_schema(mock_client, schema) - backend.delete("res-1", filters=None, fields=["extra", "obsolete"]) + result = backend.delete("res-1", filters=None, fields=["extra", "obsolete"]) - sql = mock_client.query.call_args[0][0] - assert sql == ( + # The resulting schema (minus the dropped columns) is returned so the + # response can echo the table's shape after the drop. + assert result.schema == { + "fields": [{"name": "id", "type": "integer"}], + "primaryKey": ["id"], + } + + assert mock_client.query.call_count == 2 + drop_sql = mock_client.query.call_args_list[0].args[0] + opts_sql = mock_client.query.call_args_list[1].args[0] + assert drop_sql == ( "ALTER TABLE `proj-1.ds-1.res-1` " "DROP COLUMN `extra`, DROP COLUMN `obsolete`" ) - # Stored schema shrinks to just the surviving fields. - new_schema = backend.metadata.update.call_args[0][1] - assert [f["name"] for f in new_schema["fields"]] == ["id"] - assert new_schema["primaryKey"] == ["id"] + assert opts_sql.startswith( + "ALTER TABLE `proj-1.ds-1.res-1` SET OPTIONS(" + ) + # Refreshed table metadata only references surviving fields + PK. + assert '"primaryKey":["id"]' in opts_sql + assert '"name":"id"' in opts_sql + assert '"extra"' not in opts_sql + assert '"obsolete"' not in opts_sql def test_delete_rejects_dropping_primary_key_columns( @@ -1366,8 +1471,7 @@ def test_delete_raises_not_found_for_undeclared_resource( mock_client: MagicMock, ) -> None: backend = _backend(mock_client) - backend.metadata = MagicMock() - backend.metadata.get.return_value = None + backend._read_schema = MagicMock(return_value=None) with pytest.raises(NotFoundError, match="not declared"): backend.delete("ghost", filters=None, fields=None) diff --git a/tests/test_datastore_create.py b/tests/test_datastore_create.py index 91c6f8d..8eb48dd 100644 --- a/tests/test_datastore_create.py +++ b/tests/test_datastore_create.py @@ -359,3 +359,56 @@ def test_create_with_invalid_schema_returns_validation_error( error = response.json()["error"] assert error["__type"] == "Validation Error" assert "schema" in error["fields"] + + +# Read-only resource guard (url_type="datastore") --------------------------- + + +def test_create_with_resource_dict_tags_url_type_datastore( + client: TestClient, fake_ckan: FakeCKAN +) -> None: + """The resource-dict path marks the new CKAN resource datastore-managed.""" + response = client.post(CREATE_URL, json=_valid_payload_with_resource()) + + assert response.status_code == 200 + created = fake_ckan.resources["balancing_auction_results_2025"] + assert created["url_type"] == "datastore" + + +def test_create_on_readonly_resource_requires_force( + client: TestClient, fake_ckan: FakeCKAN +) -> None: + fake_ckan.add_resource( + "ro-res", package_id="pkg-balancing-2025", url_type="datastore" + ) + payload = { + "resource_id": "ro-res", + "fields": [{"id": "auction_id", "type": "int4"}], + "records": [], + } + + response = client.post(CREATE_URL, json=payload) + + assert response.status_code == 400 + error = response.json()["error"] + assert error["__type"] == "Validation Error" + assert "read-only" in error["message"] + + +def test_create_on_readonly_resource_with_force_succeeds( + client: TestClient, fake_ckan: FakeCKAN +) -> None: + fake_ckan.add_resource( + "ro-res", package_id="pkg-balancing-2025", url_type="datastore" + ) + payload = { + "resource_id": "ro-res", + "force": True, + "fields": [{"id": "auction_id", "type": "int4"}], + "records": [], + } + + response = client.post(CREATE_URL, json=payload) + + assert response.status_code == 200 + assert response.json()["success"] is True diff --git a/tests/test_datastore_delete.py b/tests/test_datastore_delete.py index 22e3597..4780fef 100644 --- a/tests/test_datastore_delete.py +++ b/tests/test_datastore_delete.py @@ -166,3 +166,36 @@ def test_denied_key_returns_403( assert response.status_code == 403 assert response.json()["error"]["__type"] == "Authorization Error" + + +# 5. Read-only resource guard (url_type="datastore") ------------------------ + + +def test_delete_on_readonly_resource_requires_force( + client: TestClient, fake_ckan: FakeCKAN +) -> None: + fake_ckan.add_resource( + "ro-res", package_id="pkg-balancing-2025", url_type="datastore" + ) + + response = client.post(DELETE_URL, json={"resource_id": "ro-res"}) + + assert response.status_code == 400 + error = response.json()["error"] + assert error["__type"] == "Validation Error" + assert "read-only" in error["message"] + + +def test_delete_on_readonly_resource_with_force_succeeds( + client: TestClient, fake_ckan: FakeCKAN +) -> None: + fake_ckan.add_resource( + "ro-res", package_id="pkg-balancing-2025", url_type="datastore" + ) + + response = client.post( + DELETE_URL, json={"resource_id": "ro-res", "force": True} + ) + + assert response.status_code == 200 + assert response.json()["success"] is True diff --git a/tests/test_datastore_dump.py b/tests/test_datastore_dump.py index ee856fb..19085c9 100644 --- a/tests/test_datastore_dump.py +++ b/tests/test_datastore_dump.py @@ -349,7 +349,6 @@ def _engine_with_storage(storage_blobs: list[Any]) -> tuple[BigQueryBackend, Any # ADC (which we've stubbed via sys.modules below). backend.config.BIGQUERY_CREDENTIALS = "" backend.config.BIGQUERY_CREDENTIALS_RO = "" - backend.metadata = None table = MagicMock() table.schema = [] diff --git a/tests/test_datastore_upsert.py b/tests/test_datastore_upsert.py index 41ec2ad..afa2981 100644 --- a/tests/test_datastore_upsert.py +++ b/tests/test_datastore_upsert.py @@ -171,3 +171,36 @@ def test_denied_key_returns_403( assert response.status_code == 403 body = response.json() assert body["error"]["__type"] == "Authorization Error" + + +# 6. Read-only resource guard (url_type="datastore") ------------------------ + + +def test_upsert_on_readonly_resource_requires_force( + client: TestClient, fake_ckan: FakeCKAN +) -> None: + fake_ckan.add_resource( + "ro-res", package_id="pkg-balancing-2025", url_type="datastore" + ) + + response = client.post(UPSERT_URL, json=_payload(resource_id="ro-res")) + + assert response.status_code == 400 + error = response.json()["error"] + assert error["__type"] == "Validation Error" + assert "read-only" in error["message"] + + +def test_upsert_on_readonly_resource_with_force_succeeds( + client: TestClient, fake_ckan: FakeCKAN +) -> None: + fake_ckan.add_resource( + "ro-res", package_id="pkg-balancing-2025", url_type="datastore" + ) + + response = client.post( + UPSERT_URL, json=_payload(resource_id="ro-res", force=True) + ) + + assert response.status_code == 200 + assert response.json()["success"] is True