Skip to content

Feat (dump): /datastore/dump/<rid> with 302-or-stream-concat download#2

Merged
sagargg merged 2 commits into
mainfrom
fix/dump-url
May 26, 2026
Merged

Feat (dump): /datastore/dump/<rid> with 302-or-stream-concat download#2
sagargg merged 2 commits into
mainfrom
fix/dump-url

Conversation

@sagargg
Copy link
Copy Markdown
Member

@sagargg sagargg commented May 26, 2026

Summary

  • Adds GET /datastore/dump/{resource_id}?format=csv|ndjson|parquet — one URL → one file from the caller's POV.
  • Small exports (≤1 GB) → 302 redirect to a V4 signed GCS URL; bytes flow GCS → client, server stays out of the byte path.
  • Large CSV/NDJSON exports → BigQuery wildcard EXPORT DATA produces shards in GCS; the endpoint stream-concats them through async httpx (StreamingResponse) so peak memory ≈ one 64 KiB chunk and no worker threads are held.
  • Parquet >1 GB is refused with 413 PayloadTooLargeError (Parquet footers can't be byte-concatenated); caller switches to CSV/NDJSON.
  • Cache: object prefix is content-addressed on table.modified (dumps/<rid>/<fmt>/<rev>); a GCS list_blobs lookup short-circuits re-export. After a successful extract, stale revisions for the (rid, fmt) pair are swept so storage is bounded to one rev per format.
  • IAM follows the ro/rw split documented in CLAUDE.md §5.3 — BIGQUERY_CREDENTIALS_RO for metadata reads + cache lookup, BIGQUERY_CREDENTIALS for the export job, post-extract list, signing, and GC.

Why asyncio (not threads / sync httpx)

  • The "one URL → one file" contract forces server-side stream-concat for multi-shard exports — we cannot avoid the byte path.
  • With sync httpx + StreamingResponse, FastAPI would offload each shard read to the threadpool. A multi-GB dump would hold a worker for the entire transfer — a handful of concurrent dumps starves the pool and blocks unrelated requests.
  • Async httpx + aiter_bytes(64 KiB) keeps the whole flow on the event loop: ~64 KiB resident per active dump, zero threads, and client disconnect cancels the coroutine which propagates through httpx → GCS connection released immediately.
  • Same reasoning drives asyncio.to_thread(job.reload) + await asyncio.sleep(...) in the engine's poll loop — the worker is held only during the brief reload RPC, not during the wait.

Test plan

  • Single-shard CSV/NDJSON/Parquet → 302 redirect with correct signed URL + response-content-disposition.
  • Multi-shard CSV stream-concat strips duplicate headers off non-first shards (_skip_first_line).
  • Multi-shard NDJSON pure byte-concat.
  • Parquet >1 GB returns 413.
  • Cache hit (existing rev) skips the export job.
  • Cache miss runs EXPORT DATA and sweeps prior revs.
  • ro/rw IAM split — read paths use ro creds, write/sign paths use rw creds.
  • Manual smoke against a real BQ project before merging downstream services that depend on it.

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added /datastore/dump/{resource_id} endpoint for exporting resources in CSV, NDJSON, or Parquet formats with GCS caching and multi-shard streaming support
  • Improvements

    • Simplified authentication credential caching mechanism
    • Enhanced error handling for oversized Parquet exports (413 response)
  • Configuration

    • New environment variables: BIGQUERY_EXPORT_BUCKET, BIGQUERY_EXPORT_URL_EXPIRY_HOURS
  • Dependencies

    • Added google-cloud-storage library support

Review Change Stack

sagargg and others added 2 commits May 23, 2026 21:29
- Drop JWT `jti` cache-key derivation in default_key_id — an unverified
  `jti` could collide a forged token with a real user's cached
  decision. Always sha256 the full credential.
- CKAN provider: treat a corrupt cache entry as a miss (fall back to
  CKAN) instead of failing the request.
- CKAN provider: store the hashed key in Decision.subject instead of
  the raw credential, so the cache never carries plaintext keys.
- Postman generator: fix OUT_FILE typo (`ollection.json` →
  `postman/collection.json`), URL-encode query-string values, and fix
  the docstring's run-from-root path.
- README: document `BIGQUERY_DATASET` in the env-var table.
- Tests: the "without API key" assertions actually drop the
  Authorization header now (the conftest fixture installs one by
  default).

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Implements `GET /datastore/dump/{resource_id}?format=csv|ndjson|parquet`
as a single user-facing URL that yields one file regardless of how
BigQuery shards the underlying EXPORT DATA job.

Pipeline (engine):
  1. Read `table.modified`, derive a content-addressed prefix
     `dumps/<rid>/<fmt>/<rev>` where rev = hex(microsec epoch).
  2. List GCS at that prefix — non-empty → cache hit, skip the export.
  3. On miss, submit `EXPORT DATA` (Parquet → single URI, CSV/NDJSON →
     wildcard URI so BQ shards >1 GB exports).
  4. After a successful extract, sweep stale revisions for the (rid, fmt)
     pair — storage is bounded to one rev per format.
  5. Generate V4 signed URLs with `response-content-disposition` so the
     downloaded filename is `<rid>.<ext>` regardless of GCS object name.

Why asyncio (and not threads / sync httpx):

  - **One URL → one file** is the user-visible contract. For shards
    >1 GB BQ refuses single-file CSV/NDJSON, so we have to stream-concat
    on the server. The byte path is GCS → us → client.
  - With sync httpx + StreamingResponse FastAPI would offload each shard
    read to the threadpool. With multi-GB dumps that means a worker
    thread held for the entire transfer — a handful of concurrent dumps
    starves the pool and blocks unrelated requests.
  - Async httpx + `aiter_bytes(64 KiB)` keeps the whole flow on the
    event loop: ~64 KiB resident per active dump, zero threads, and
    client disconnect cancels the coroutine which propagates through
    httpx to release the GCS connection immediately.
  - Same reasoning drives `asyncio.to_thread(job.reload)` + `await
    asyncio.sleep(...)` in the engine's poll loop — the worker is held
    only for the brief `reload` round-trip, not during the wait.

CSV stream-concat strips the duplicate header off non-first shards via
`_skip_first_line` (memory bound: header bytes + one chunk). NDJSON is
pure byte concatenation. Parquet >1 GB is refused upstream with 413
(footers can't be concatenated); caller switches to CSV/NDJSON.

IAM follows the ro-for-reading, rw-for-writing split documented in
CLAUDE.md §5.3 — `BIGQUERY_CREDENTIALS_RO` for `get_table` +
`list_blobs` cache lookup, `BIGQUERY_CREDENTIALS` for
`EXPORT DATA` + post-extract list + signing + GC.

Includes 592-line test suite covering single-shard redirect,
multi-shard stream-concat (CSV header-dedup, NDJSON concat), cache
hit/miss, stale-rev GC, Parquet >1 GB rejection, and IAM split.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown

coderabbitai Bot commented May 26, 2026

Caution

Review failed

The pull request is closed.

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: dcd8c1be-b0dc-477a-bb6c-65e498fd1b42

📥 Commits

Reviewing files that changed from the base of the PR and between 040ee94 and a7339f8.

📒 Files selected for processing (24)
  • .env.example
  • CLAUDE.md
  • README.md
  • datastore/api/endpoints/dump.py
  • datastore/api/routes.py
  • datastore/auth/base.py
  • datastore/auth/ckan/provider.py
  • datastore/core/config.py
  • datastore/core/exceptions.py
  • datastore/infrastructure/engines/base.py
  • datastore/infrastructure/engines/bigquery/backend.py
  • datastore/infrastructure/engines/bigquery/client.py
  • datastore/services/dump.py
  • datastore/services/read.py
  • datastore/services/write.py
  • postman/generate_postman.py
  • pyproject.toml
  • tests/auth/ckan/test_provider.py
  • tests/auth/jwt/test_provider.py
  • tests/auth/test_base.py
  • tests/conftest.py
  • tests/test_datastore_create.py
  • tests/test_datastore_dump.py
  • tests/test_datastore_search.py

📝 Walkthrough

Walkthrough

This PR adds a new GET /datastore/dump/{resource_id} endpoint enabling full-table exports in CSV/NDJSON/Parquet formats via BigQuery EXPORT DATA with GCS caching. It refactors auth cache keys from JWT jti claims to SHA-256 hashing, offloads blocking engine calls to worker threads, and includes comprehensive test coverage with updated configuration and documentation.

Changes

Dump Export Feature & Foundational Changes

Layer / File(s) Summary
Configuration, Exceptions, and Backend Contract
datastore/core/config.py, datastore/core/exceptions.py, datastore/infrastructure/engines/base.py, pyproject.toml
Config adds BIGQUERY_EXPORT_BUCKET (GCS bucket) and BIGQUERY_EXPORT_URL_EXPIRY_HOURS (signed URL TTL with bounds). New PayloadTooLargeError(413) exception and HTTP mapping for oversized parquet exports. DatastoreBackend declares abstract dump(resource_id, fmt) -> list[str]. Adds google-cloud-storage>=2.14 dependency.
BigQuery Client Credential Management
datastore/infrastructure/engines/bigquery/client.py
Introduces load_credentials(config, mode) helper centralizing RO vs RW credential selection. build_client delegates to this helper and conditionally passes resolved credentials (or None for ADC fallback) to bigquery.Client.
BigQuery Dump Implementation with Caching and GC
datastore/infrastructure/engines/bigquery/backend.py
BigQueryBackend.dump(resource_id, fmt) implements EXPORT DATA with revisioned GCS caching via table.modified. Builds on-demand BigQuery/GCS clients for read-only and read-write operations. Submits export job, polls non-blockingly, detects >1GB single-file failures (raises PayloadTooLargeError), performs best-effort garbage collection of stale revisions, and generates signed URLs. Exports helpers format TIMESTAMP/DATETIME per format and classify oversized export errors.
API Endpoint and Multi-Shard Streaming Services
datastore/api/endpoints/dump.py, datastore/api/routes.py, datastore/services/dump.py
GET /datastore/dump/{resource_id} authorizes read access, fetches URLs from engine, returns 302 redirect for single shard or streams multi-shard concatenation. stream_csv_shards concatenates CSV bytes while deduplicating headers via _skip_first_line. stream_ndjson_shards concatenates NDJSON bytes directly. Both use tuned async httpx timeouts and chunked byte streaming.
Auth Cache Key Refactoring from JWT jti to SHA-256
datastore/auth/base.py, datastore/auth/ckan/provider.py
default_key_id replaces JWT jti extraction with SHA-256 hash of full credential. CKANAuthProvider.authorize wraps cache decode in try/except, treating malformed entries as cache miss. Cached Decision.subject stores derived key_id(credential) hash instead of raw credential.
Offload Blocking Engine Calls to Worker Threads
datastore/services/read.py, datastore/services/write.py
search_datastore, search_sql_datastore, info_datastore, create_datastore, upsert_datastore, and delete_datastore wrap blocking engine calls in asyncio.to_thread, preventing stalls on the event loop.
Environment Variables and Postman Collection Setup
.env.example, postman/generate_postman.py, tests/conftest.py
.env.example adds BIGQUERY_EXPORT_BUCKET and BIGQUERY_EXPORT_URL_EXPIRY_HOURS. postman/generate_postman.py fixes output path and percent-encodes query parameter keys/values. conftest.py clears BigQuery env vars before app import and sets placeholder TTL in isolation fixture.
Documentation Updates
CLAUDE.md, README.md
CLAUDE.md adds detailed /datastore/dump/{resource_id} API docs covering cache/GC flow, job polling, error handling, and IAM requirements. README.md documents new modules and extended configuration table.
Auth Tests for Cache Key Hashing and Cache Behavior
tests/auth/ckan/test_provider.py, tests/auth/jwt/test_provider.py, tests/auth/test_base.py
CKAN provider tests verify hashed Decision.subject, cache error recovery as miss, and no credential leakage. JWT provider tests verify key_id produces h: hash prefix. Base auth tests assert SHA-256 hashing for all credential shapes.
Comprehensive Dump Feature Test Suite
tests/test_datastore_dump.py
Endpoint tests cover 302 redirect, multi-shard streaming, 413 for oversized parquet, validation errors, and authentication. Unit tests verify export SQL formatting, too-large detection, and _skip_first_line across chunk boundaries. Engine tests mock GCS storage to validate cache hit/miss, revision GC, and cache invalidation on table modification. Includes async polling, placeholder mode, and missing bucket tests with httpx stream mocking.
Request Header Cleanup for Authorization Tests
tests/test_datastore_create.py, tests/test_datastore_search.py
Remove default Authorization header entirely before making unauthenticated requests, ensuring accurate request shape.

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

  • datopian/datastore#1: Establishes the foundational dump/export infrastructure; this PR extends and refines that backend contract with production-ready export logic and GCS caching.

Suggested reviewers

  • luccasmmg

Poem

🐇 Hop, skip, and export!
Dumps now dance through BigQuery's carts,
With hashed credentials and async arts—
Cache keys spin, threads don't block,
A full-table waltz 'cross the data rock! 🎪

✨ Finishing Touches
📝 Generate docstrings
  • Create stacked PR
  • Commit on current branch
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Commit unit tests in branch fix/dump-url

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

@sagargg sagargg merged commit fdd3dc3 into main May 26, 2026
0 of 2 checks passed
@sagargg sagargg deleted the fix/dump-url branch May 26, 2026 09:01
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant