From d4c1f29a7ef2d3e2c6b1ab6429e170c6841e68d3 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Sat, 25 Apr 2026 17:58:48 -0500 Subject: [PATCH 1/2] Add optional DuckDB connectors prototype for waterdata + wqp Adds dataretrieval.duckdb_connectors, an optional extension that wraps DuckDB connections with helper methods exposing the dataretrieval waterdata (OGC) and wqp endpoints as registerable SQL views. Each helper returns a duckdb.DuckDBPyRelation; pass register_as= to also publish the result as a named view that subsequent SQL can reference. Highlights: * Per-source layout (duckdb_connectors/{waterdata,wqp}.py) sharing a thin _BaseConnection in _base.py * Optional dependency: pip install dataretrieval[duckdb] * Compound spatial extra (pip install dataretrieval[spatial]) bundles geopandas + duckdb; spatial=True flag on connect() runs INSTALL spatial; LOAD spatial on the underlying connection so ST_GeomFromText etc. become available against registered views * Geometry handled by converting GeoDataFrame geometry to WKT plus longitude/latitude columns so the prototype works without the spatial extension by default * WQP connector threads connection-level legacy / ssl_check defaults through to every helper; per-call overrides supported * dataretrieval.duckdb_connector preserved as a backward-compat alias for the waterdata connector * Demo notebook covering site discovery, daily values, monthly aggregation, top-N window functions, sites x daily joins, latest readings, cross-source waterdata x WQP joins, and the spatial flag 15 tests pass; ruff check + format clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- NOTES.md | 180 +++++++ dataretrieval/duckdb_connector.py | 30 ++ dataretrieval/duckdb_connectors/__init__.py | 22 + dataretrieval/duckdb_connectors/_base.py | 176 ++++++ dataretrieval/duckdb_connectors/waterdata.py | 134 +++++ dataretrieval/duckdb_connectors/wqp.py | 175 ++++++ demos/duckdb_waterdata_demo.ipynb | 531 +++++++++++++++++++ pyproject.toml | 16 +- tests/duckdb_connectors_waterdata_test.py | 181 +++++++ tests/duckdb_connectors_wqp_test.py | 141 +++++ 10 files changed, 1584 insertions(+), 2 deletions(-) create mode 100644 NOTES.md create mode 100644 dataretrieval/duckdb_connector.py create mode 100644 dataretrieval/duckdb_connectors/__init__.py create mode 100644 dataretrieval/duckdb_connectors/_base.py create mode 100644 dataretrieval/duckdb_connectors/waterdata.py create mode 100644 dataretrieval/duckdb_connectors/wqp.py create mode 100644 demos/duckdb_waterdata_demo.ipynb create mode 100644 tests/duckdb_connectors_waterdata_test.py create mode 100644 tests/duckdb_connectors_wqp_test.py diff --git a/NOTES.md b/NOTES.md new file mode 100644 index 00000000..e261f71c --- /dev/null +++ b/NOTES.md @@ -0,0 +1,180 @@ +# DuckDB connector — design notes + +Notes captured before implementation. The goal is a prototype that lets users +query USGS waterdata endpoints via DuckDB SQL. + +## Package conventions to follow + +- **Style**: ruff-managed, py38 target, double quotes, docstring code + formatted at width 72. +- **Type hints**: full hints, use `from __future__ import annotations`, + `str | list[str] | None` style. +- **Docstrings**: numpy-style (Parameters / Returns sections with dashes). + Module top has a short one-paragraph summary. +- **Naming**: snake_case functions, leading underscore for private helpers, + UPPER_SNAKE_CASE module constants. +- **Logging**: `logger = logging.getLogger(__name__)` at module top. +- **Errors**: raise `ImportError` with a pip install hint when an optional + dep is missing, `ValueError` for bad arguments, `RuntimeError` for + unexpected response shapes. + +## Optional-dependency pattern (mirror `geopandas`) + +`dataretrieval/waterdata/utils.py:24`: +```python +try: + import geopandas as gpd + GEOPANDAS = True +except ImportError: + GEOPANDAS = False +``` + +For the connector we will: +- attempt `import duckdb` at the top of the module; +- on `ImportError` set a sentinel and raise a clear `ImportError` from + the public entry point telling users to `pip install dataretrieval[duckdb]`. + +`pyproject.toml` extras (current state): +```toml +[project.optional-dependencies] +test = [...] +doc = [...] +nldi = ['geopandas>=0.10'] +``` +Add a new `duckdb = ["duckdb>=1.0.0"]` extra. + +## Endpoint shape + +All `dataretrieval.waterdata.api.get_*` functions return +`tuple[pandas.DataFrame | geopandas.GeoDataFrame, BaseMetadata]`. Pagination +is fully handled inside `_walk_pages`, so a single call is the whole +result set. + +Endpoints we will expose first (highest user value, all OGC): +- `get_monitoring_locations` — site discovery (returns GeoDataFrame when + geopandas installed) +- `get_daily` — daily values +- `get_continuous` — instantaneous values (≤3y per call by API contract) +- `get_time_series_metadata` — what's available at each site +- `get_latest_continuous`, `get_latest_daily` — most recent obs + +Each accepts `monitoring_location_id`, `parameter_code`, etc. as scalar or +list, plus the new `filter` / `filter_lang` CQL passthrough (#238). + +## Architecture (after wqp + per-source split) + +After surveying `wqp.py` we moved to a per-source connector package: + +``` +dataretrieval/duckdb_connectors/ +├── _base.py # _require_duckdb, _flatten_geometry, _BaseConnection +├── waterdata.py # WaterdataConnection + connect() +└── wqp.py # WQPConnection + connect() (handles legacy / WQX3 flag) +``` + +`dataretrieval/duckdb_connector.py` stays as a thin alias re-exporting +the waterdata connector so the older import path keeps working. + +WQP differences vs waterdata that the connector has to absorb: + +* WQP getters take `**kwargs` (CamelCase URL params) rather than fully + enumerated signatures, so the connector can't validate kwargs — it + just forwards them. +* Two parallel schemas (legacy WQX vs WQX 3.0) controlled by `legacy=` + per call. The connection holds a default that callers can override + per call. +* `ssl_check` is also a connection-level default. +* WQP returns a custom `WQP_Metadata` instead of `BaseMetadata`, but + since the connector only consumes the DataFrame this doesn't matter. + +Joining across the two sources: each connector owns its own duckdb +connection, so to join you either materialise to a DataFrame and +`.con.register(name, df)` it onto the other connection, or open a +single `duckdb.connect()` directly and pass it into both +`WaterdataConnection(con)` and `WQPConnection(con)` manually. + +Other modules surveyed but not given connectors: + +* `nwis` — deprecated; users are being pushed to waterdata. +* `nldi` — returns GeoDataFrames / dicts; spatial-only, different + contract; possible later. +* `streamstats`, `nadp` — return non-tabular data (Watershed objects, + zip files / TIFs); not connector candidates. +* `ngwmn` — does return DataFrames but very narrow scope; could add + later if needed. +* `samples` — already covered by the waterdata connector via + `wd.samples(...)` (the `samples.py` module is a deprecated shim + that forwards to `waterdata.get_samples`). + +## DuckDB integration choices + +DuckDB ≥0.8 supports registering Python objects via `con.register(name, df)` +which makes a pandas DataFrame queryable as a view. That's the simplest +path and works with any DuckDB build — no compiled extension needed for a +prototype. + +DuckDB also supports `create_function` for **scalar** UDFs but **table** +UDFs (table-valued functions callable as `FROM tvf(...)`) require either +the in-progress python table-function API or a workaround. For a +prototype the simpler API is preferable — register helper *methods* on a +connection that take kwargs, fetch a DataFrame, register it under a +caller-chosen name, and return a `duckdb.DuckDBPyRelation`. The user +writes: + +```python +con = waterdata_duckdb.connect() +sites = con.monitoring_locations(state_name="Illinois") # relation +con.sql("SELECT * FROM sites WHERE site_type = 'Stream'") +``` + +This keeps it pythonic and lets users compose with arbitrary SQL, +including joins across two registered relations. + +A second affordance: a `con.sql_table(name, fn, **kwargs)` that registers +a one-shot DataFrame view by name, so: + +```python +con.sql_table("daily", waterdata.get_daily, + monitoring_location_id="USGS-05586100", + parameter_code="00060", time="2023/2024") +con.sql("SELECT date_trunc('month', time) AS m, avg(value) " + "FROM daily GROUP BY 1 ORDER BY 1") +``` + +## Geometry handling + +When geopandas is available, `get_monitoring_locations` returns a +GeoDataFrame with a `geometry` column. DuckDB has a `spatial` extension +that understands WKB/WKT but it isn't loaded by default. Safe path for +the prototype: convert geometry to WKT string and add `longitude` / +`latitude` columns. That keeps the relation queryable from plain DuckDB +without extension setup. + +## Tests + +Existing tests (`tests/waterdata_test.py`) use `requests-mock` against +real URLs. For our connector we don't need to re-test the HTTP layer — +we should mock the waterdata `get_*` functions directly with +`unittest.mock.patch` (this is the pattern in +`tests/waterdata_nearest_test.py`) and assert that: + +1. `connect()` raises a clean `ImportError` if duckdb isn't installed. +2. Helper methods invoke the underlying `get_*` with the kwargs we passed. +3. The returned object is a queryable DuckDB relation. +4. `sql_table` registers a view that returns the same row count as the + source DataFrame. +5. Geometry conversion produces WKT + lon/lat columns and drops the + GeoDataFrame `geometry` column (or keeps it as WKT) without needing + the spatial extension. + +## Notebook + +Goes in `demos/`. Should: +- show a real query against `api.waterdata.usgs.gov` +- demonstrate something easier in SQL than pandas (window function over + daily flow, monthly aggregation, join of monitoring-location metadata + to daily values) +- gracefully note that this needs `pip install dataretrieval[duckdb]` + +Demos are excluded from ruff (`extend-exclude = ["demos"]` in +pyproject.toml) so we don't have to fight formatting there. diff --git a/dataretrieval/duckdb_connector.py b/dataretrieval/duckdb_connector.py new file mode 100644 index 00000000..afaecf85 --- /dev/null +++ b/dataretrieval/duckdb_connector.py @@ -0,0 +1,30 @@ +"""Backwards-compatible alias for :mod:`dataretrieval.duckdb_connectors.waterdata`. + +The single-source connector originally lived here. It has since been +generalised into a per-source package so additional sources (WQP, +NGWMN, …) can be added without bloating one module. New code should +import directly from :mod:`dataretrieval.duckdb_connectors`:: + + from dataretrieval.duckdb_connectors import waterdata + + con = waterdata.connect() + +This module preserves the older entry point:: + + from dataretrieval import duckdb_connector + + con = duckdb_connector.connect() + +which is equivalent to the waterdata connector. +""" + +from __future__ import annotations + +from .duckdb_connectors._base import DUCKDB +from .duckdb_connectors.waterdata import WaterdataConnection, connect + +__all__ = [ + "DUCKDB", + "WaterdataConnection", + "connect", +] diff --git a/dataretrieval/duckdb_connectors/__init__.py b/dataretrieval/duckdb_connectors/__init__.py new file mode 100644 index 00000000..a9bf1a99 --- /dev/null +++ b/dataretrieval/duckdb_connectors/__init__.py @@ -0,0 +1,22 @@ +"""DuckDB connectors for ``dataretrieval`` data sources. + +Each submodule wraps one ``dataretrieval`` source (waterdata, wqp, …) +behind a duckdb connection so its endpoints can be queried as named +SQL views. The connectors are an *optional* extension; install with:: + + pip install dataretrieval[duckdb] + +Quickstart +---------- +>>> from dataretrieval.duckdb_connectors import waterdata, wqp +>>> with waterdata.connect() as con: +... con.monitoring_locations(state_name="Illinois", register_as="sites") +... con.sql("SELECT count(*) FROM sites").fetchone() +""" + +from __future__ import annotations + +from . import waterdata, wqp +from ._base import DUCKDB + +__all__ = ["DUCKDB", "waterdata", "wqp"] diff --git a/dataretrieval/duckdb_connectors/_base.py b/dataretrieval/duckdb_connectors/_base.py new file mode 100644 index 00000000..2e02dcff --- /dev/null +++ b/dataretrieval/duckdb_connectors/_base.py @@ -0,0 +1,176 @@ +"""Shared building blocks for the per-source DuckDB connectors. + +The connectors are an *optional* extension. Each public connector +(``waterdata``, ``wqp``, ...) wraps a ``duckdb.DuckDBPyConnection`` and +exposes the corresponding ``dataretrieval`` getters as helper methods +that return ``duckdb.DuckDBPyRelation`` objects. + +This module hosts what's common to all of them: + +* the optional-import guard, +* a ``_flatten_geometry`` helper for GeoDataFrame inputs, +* a ``_BaseConnection`` class with the connection-lifecycle plumbing + and the generic ``register_table`` / ``_endpoint`` helpers. +""" + +from __future__ import annotations + +import logging +from collections.abc import Callable +from typing import TYPE_CHECKING, Any + +import pandas as pd + +try: + import duckdb + + DUCKDB = True +except ImportError: + DUCKDB = False + +try: + import geopandas as gpd + + GEOPANDAS = True +except ImportError: + GEOPANDAS = False + +if TYPE_CHECKING: + import duckdb as _duckdb # noqa: F401 (resolved by type-checker only) + +logger = logging.getLogger(__name__) + +_INSTALL_HINT = ( + "duckdb is required for the dataretrieval DuckDB connectors. " + "Install it with `pip install dataretrieval[duckdb]`." +) + + +def _require_duckdb() -> None: + """Raise a clear ``ImportError`` if duckdb isn't installed.""" + if not DUCKDB: + raise ImportError(_INSTALL_HINT) + + +def _load_spatial(con: duckdb.DuckDBPyConnection) -> None: + """Install (if needed) and load DuckDB's ``spatial`` extension. + + The extension is a runtime C++ binary that DuckDB itself downloads + on first ``INSTALL spatial``; it is not a pip-installable package. + Once loaded, registered ``geometry`` columns (stored as WKT by the + connectors) can be parsed with ``ST_GeomFromText(geometry)``. + + Raises + ------ + RuntimeError + If the extension can't be installed or loaded — typically + because the host has no network access on first install. + """ + try: + con.execute("INSTALL spatial") + con.execute("LOAD spatial") + except Exception as exc: # pragma: no cover - depends on DuckDB build + raise RuntimeError( + "Failed to install/load DuckDB's spatial extension. " + "DuckDB downloads the extension on first install; check " + "network access, or install the spatial-aware DuckDB build." + ) from exc + + +def _flatten_geometry(df: pd.DataFrame) -> pd.DataFrame: + """Coerce a GeoDataFrame to a plain DataFrame DuckDB can register. + + DuckDB registers pandas DataFrames natively but does not understand + a ``geopandas`` ``GeoSeries`` without the spatial extension. To keep + the prototype dependency-light we convert any geometry column to + WKT and surface ``longitude`` / ``latitude`` columns when point + geometries are available. Non-geo input is returned unchanged. + """ + if not GEOPANDAS or not isinstance(df, gpd.GeoDataFrame): + return df + + geom_name = df.geometry.name + out = pd.DataFrame(df).copy() + + geom = df.geometry + try: + out["longitude"] = geom.x + out["latitude"] = geom.y + except Exception: + # Non-point geometries (lines, polygons): skip lon/lat shortcut. + pass + + out[geom_name] = geom.to_wkt() + return out + + +class _BaseConnection: + """Connection-lifecycle plumbing shared by every connector class. + + Subclasses are expected to add per-source endpoint helpers that + delegate to :meth:`_endpoint`. + """ + + def __init__(self, con: duckdb.DuckDBPyConnection) -> None: + self._con = con + + @property + def con(self) -> duckdb.DuckDBPyConnection: + """The underlying ``duckdb.DuckDBPyConnection``.""" + return self._con + + def sql(self, query: str) -> duckdb.DuckDBPyRelation: + """Run a SQL query against the connection. + + Equivalent to ``self.con.sql(query)``. + """ + return self._con.sql(query) + + def close(self) -> None: + """Close the underlying duckdb connection.""" + self._con.close() + + def __enter__(self): + return self + + def __exit__(self, *exc: Any) -> None: + self.close() + + def register_table( + self, + name: str, + fn: Callable[..., tuple[pd.DataFrame, Any]], + **kwargs: Any, + ) -> duckdb.DuckDBPyRelation: + """Call any ``(DataFrame, metadata)`` getter and register it. + + Parameters + ---------- + name : str + Name to register the resulting view under. + fn : callable + Any function returning ``(DataFrame, metadata)`` — for + example a ``dataretrieval.waterdata`` or ``dataretrieval.wqp`` + getter. + **kwargs + Forwarded to ``fn``. + + Returns + ------- + duckdb.DuckDBPyRelation + A relation pointing at the newly registered view. + """ + return self._endpoint(fn, name, kwargs) + + def _endpoint( + self, + fn: Callable[..., tuple[pd.DataFrame, Any]], + register_as: str | None, + kwargs: dict[str, Any], + ) -> duckdb.DuckDBPyRelation: + df, _ = fn(**kwargs) + df = _flatten_geometry(df) + if register_as is not None: + self._con.register(register_as, df) + return self._con.table(register_as) + return self._con.from_df(df) diff --git a/dataretrieval/duckdb_connectors/waterdata.py b/dataretrieval/duckdb_connectors/waterdata.py new file mode 100644 index 00000000..1ef77399 --- /dev/null +++ b/dataretrieval/duckdb_connectors/waterdata.py @@ -0,0 +1,134 @@ +"""DuckDB connector for the :mod:`dataretrieval.waterdata` endpoints. + +Wraps a ``duckdb.DuckDBPyConnection`` and exposes the OGC waterdata +getters as helper methods. Each helper returns a +``duckdb.DuckDBPyRelation``; pass ``register_as=`` to also +register the result as a named view that subsequent SQL can reference. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from dataretrieval import waterdata + +from ._base import DUCKDB, _BaseConnection, _load_spatial, _require_duckdb + +if TYPE_CHECKING: + import duckdb + + +class WaterdataConnection(_BaseConnection): + """A duckdb connection bundled with waterdata helper methods. + + Each helper calls the corresponding ``dataretrieval.waterdata`` + function, flattens any geometry, and returns a + ``duckdb.DuckDBPyRelation``. Pass ``register_as=`` to also + register the result as a named view on the connection so it can be + referenced from SQL by that name. + + The wrapper exposes :meth:`sql` and the underlying :attr:`con` + for any operation the helpers don't cover. + """ + + def monitoring_locations( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.waterdata.get_monitoring_locations`.""" + return self._endpoint(waterdata.get_monitoring_locations, register_as, kwargs) + + def daily( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.waterdata.get_daily`.""" + return self._endpoint(waterdata.get_daily, register_as, kwargs) + + def continuous( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.waterdata.get_continuous`.""" + return self._endpoint(waterdata.get_continuous, register_as, kwargs) + + def time_series_metadata( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.waterdata.get_time_series_metadata`.""" + return self._endpoint(waterdata.get_time_series_metadata, register_as, kwargs) + + def latest_continuous( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.waterdata.get_latest_continuous`.""" + return self._endpoint(waterdata.get_latest_continuous, register_as, kwargs) + + def latest_daily( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.waterdata.get_latest_daily`.""" + return self._endpoint(waterdata.get_latest_daily, register_as, kwargs) + + def field_measurements( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.waterdata.get_field_measurements`.""" + return self._endpoint(waterdata.get_field_measurements, register_as, kwargs) + + def samples( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.waterdata.get_samples`. + + The samples endpoint queries the USGS Aquarius Samples database + and is distinct from :class:`dataretrieval.duckdb_connectors.wqp`, + which queries the multi-agency Water Quality Portal. + """ + return self._endpoint(waterdata.get_samples, register_as, kwargs) + + +def connect( + database: str = ":memory:", + *, + spatial: bool = False, + **kwargs: Any, +) -> WaterdataConnection: + """Open a DuckDB connection with waterdata helpers attached. + + Parameters + ---------- + database : str, default ``":memory:"`` + Path forwarded to :func:`duckdb.connect`. Use ``":memory:"`` + for an ephemeral connection. + spatial : bool, default ``False`` + If ``True``, ``INSTALL spatial; LOAD spatial;`` is run on the + underlying connection so that registered geometry columns + (stored as WKT) can be parsed with ``ST_GeomFromText``. The + extension is downloaded by DuckDB on first install and is not + a pip dependency. Pair with the ``spatial`` extra + (``pip install dataretrieval[spatial]``) to also pull in + ``geopandas`` for richer client-side geometry handling. + **kwargs + Additional keyword arguments forwarded to :func:`duckdb.connect`. + + Returns + ------- + WaterdataConnection + A connection wrapper exposing :meth:`daily`, :meth:`continuous`, + :meth:`monitoring_locations`, etc. + + Raises + ------ + ImportError + If the optional ``duckdb`` dependency is not installed. + RuntimeError + If ``spatial=True`` and the spatial extension cannot be loaded. + """ + _require_duckdb() + import duckdb # local import: only required after the guard above + + raw = duckdb.connect(database, **kwargs) + if spatial: + _load_spatial(raw) + return WaterdataConnection(raw) + + +__all__ = ["DUCKDB", "WaterdataConnection", "connect"] diff --git a/dataretrieval/duckdb_connectors/wqp.py b/dataretrieval/duckdb_connectors/wqp.py new file mode 100644 index 00000000..d53ed4e0 --- /dev/null +++ b/dataretrieval/duckdb_connectors/wqp.py @@ -0,0 +1,175 @@ +"""DuckDB connector for the :mod:`dataretrieval.wqp` endpoints. + +The Water Quality Portal (WQP) is a multi-agency repository covering +USGS, EPA, state agencies, etc. Its API exposes two parallel schemas: + +* the legacy WQX data profiles (default; ``legacy=True``), and +* the modern WQX 3.0 profiles (``legacy=False``). + +The connection holds the chosen schema as a default that's threaded +into every helper call; individual calls can override it with +``legacy=False`` (or vice versa) when needed. ``ssl_check`` is also a +connection-level default for the same reason. +""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any + +from dataretrieval import wqp + +from ._base import DUCKDB, _BaseConnection, _load_spatial, _require_duckdb + +if TYPE_CHECKING: + import duckdb + + +class WQPConnection(_BaseConnection): + """A duckdb connection bundled with WQP helper methods. + + Parameters + ---------- + con : duckdb.DuckDBPyConnection + The underlying duckdb connection. + legacy : bool, default ``True`` + Default schema used by every helper. ``True`` queries the + legacy WQX profiles (e.g. ``resultPhysChem``, ``narrowResult``, + ``biological``); ``False`` queries WQX 3.0 (``fullPhysChem``, + ``basicPhysChem``, ``narrow``). Override per-call with + ``legacy=...`` in any helper. + ssl_check : bool, default ``True`` + Default value for the ``ssl_check`` argument forwarded to the + WQP getters. Override per-call when needed. + """ + + def __init__( + self, + con: duckdb.DuckDBPyConnection, + legacy: bool = True, + ssl_check: bool = True, + ) -> None: + super().__init__(con) + self._legacy = legacy + self._ssl_check = ssl_check + + @property + def legacy(self) -> bool: + """Default schema used by every helper (``True`` = legacy WQX).""" + return self._legacy + + # --- Endpoint helpers ------------------------------------------------- + # + # Each helper forwards arbitrary CamelCase WQP query parameters as + # **kwargs. See the corresponding ``dataretrieval.wqp`` function for + # the full list of supported parameters. + + def get_results( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.wqp.get_results`.""" + return self._wqp_call(wqp.get_results, register_as, kwargs) + + def what_sites( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.wqp.what_sites`.""" + return self._wqp_call(wqp.what_sites, register_as, kwargs) + + def what_organizations( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.wqp.what_organizations`.""" + return self._wqp_call(wqp.what_organizations, register_as, kwargs) + + def what_projects( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.wqp.what_projects`.""" + return self._wqp_call(wqp.what_projects, register_as, kwargs) + + def what_activities( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.wqp.what_activities`.""" + return self._wqp_call(wqp.what_activities, register_as, kwargs) + + def what_detection_limits( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.wqp.what_detection_limits`.""" + return self._wqp_call(wqp.what_detection_limits, register_as, kwargs) + + def what_habitat_metrics( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.wqp.what_habitat_metrics`.""" + return self._wqp_call(wqp.what_habitat_metrics, register_as, kwargs) + + def what_activity_metrics( + self, *, register_as: str | None = None, **kwargs: Any + ) -> duckdb.DuckDBPyRelation: + """Wrap :func:`dataretrieval.wqp.what_activity_metrics`.""" + return self._wqp_call(wqp.what_activity_metrics, register_as, kwargs) + + # --- Internal --------------------------------------------------------- + + def _wqp_call( + self, + fn: Any, + register_as: str | None, + kwargs: dict[str, Any], + ) -> duckdb.DuckDBPyRelation: + """Inject the connection-level ``legacy`` / ``ssl_check`` defaults.""" + kwargs.setdefault("legacy", self._legacy) + kwargs.setdefault("ssl_check", self._ssl_check) + return self._endpoint(fn, register_as, kwargs) + + +def connect( + database: str = ":memory:", + *, + legacy: bool = True, + ssl_check: bool = True, + spatial: bool = False, + **kwargs: Any, +) -> WQPConnection: + """Open a DuckDB connection with WQP helpers attached. + + Parameters + ---------- + database : str, default ``":memory:"`` + Path forwarded to :func:`duckdb.connect`. + legacy : bool, default ``True`` + Default schema for every helper. See :class:`WQPConnection`. + ssl_check : bool, default ``True`` + Default ``ssl_check`` flag forwarded to the WQP getters. + spatial : bool, default ``False`` + If ``True``, install + load DuckDB's ``spatial`` extension on + the underlying connection. See + :func:`dataretrieval.duckdb_connectors.waterdata.connect`. + **kwargs + Additional keyword arguments forwarded to :func:`duckdb.connect`. + + Returns + ------- + WQPConnection + A connection wrapper exposing :meth:`get_results`, + :meth:`what_sites`, etc. + + Raises + ------ + ImportError + If the optional ``duckdb`` dependency is not installed. + RuntimeError + If ``spatial=True`` and the spatial extension cannot be loaded. + """ + _require_duckdb() + import duckdb # local: only required after the guard above + + raw = duckdb.connect(database, **kwargs) + if spatial: + _load_spatial(raw) + return WQPConnection(raw, legacy=legacy, ssl_check=ssl_check) + + +__all__ = ["DUCKDB", "WQPConnection", "connect"] diff --git a/demos/duckdb_waterdata_demo.ipynb b/demos/duckdb_waterdata_demo.ipynb new file mode 100644 index 00000000..2fd89d8e --- /dev/null +++ b/demos/duckdb_waterdata_demo.ipynb @@ -0,0 +1,531 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Querying USGS data with DuckDB\n", + "\n", + "This notebook demonstrates the optional\n", + "`dataretrieval.duckdb_connectors` package. There is one connector per\n", + "data source \u2014 `waterdata` for the OGC API, `wqp` for the\n", + "Water Quality Portal \u2014 and each wraps a [DuckDB](https://duckdb.org/)\n", + "connection so that endpoints can be queried directly from SQL.\n", + "\n", + "## Why DuckDB?\n", + "\n", + "The getters already return pandas DataFrames, so why involve another\n", + "engine? A few reasons:\n", + "\n", + "* **Joining heterogeneous endpoints in one query.** Sites, daily\n", + " values, time-series metadata and water-quality results live behind\n", + " separate endpoints with different shapes. Once registered as views\n", + " they can be combined with a single SQL statement \u2014 even across\n", + " data sources.\n", + "* **Window functions and time aggregations.** Ranking the wettest\n", + " months per gauge or computing rolling means is very compact in SQL.\n", + "* **Compose with files on disk.** DuckDB can read Parquet, CSV and\n", + " remote S3 objects natively, so cached water data can be joined\n", + " against external datasets without leaving SQL.\n", + "\n", + "## Install\n", + "\n", + "```bash\n", + "pip install dataretrieval[duckdb]\n", + "```\n", + "\n", + "If `geopandas` is also installed, monitoring-location geometry is\n", + "converted to WKT plus `longitude`/`latitude` columns so the prototype\n", + "doesn't need the DuckDB spatial extension.\n", + "\n", + "Set an [API token](https://api.waterdata.usgs.gov/signup/) for higher\n", + "rate limits:\n", + "\n", + "```python\n", + "import os\n", + "os.environ[\"API_USGS_PAT\"] = \"your_api_key_here\"\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "from dataretrieval.duckdb_connectors import waterdata, wqp\n", + "\n", + "wd = waterdata.connect() # in-memory connection wired to the OGC waterdata API" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 1. Discover sites\n", + "\n", + "Pull a small set of streamflow gauges in Illinois and register them as\n", + "the `sites` view. Every endpoint helper accepts `register_as=`\n", + "to publish the result as a view that subsequent SQL can reference." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "wd.monitoring_locations(\n", + " state_name=\"Illinois\",\n", + " site_type=\"Stream\",\n", + " register_as=\"sites\",\n", + " limit=200,\n", + ")\n", + "wd.sql(\n", + " \"\"\"\n", + " SELECT monitoring_location_id, monitoring_location_name,\n", + " county_name, drainage_area, longitude, latitude\n", + " FROM sites\n", + " WHERE drainage_area IS NOT NULL\n", + " ORDER BY drainage_area DESC\n", + " LIMIT 10\n", + " \"\"\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 2. Pull daily streamflow for a few gauges\n", + "\n", + "Pick three Illinois River basin gauges and pull a year of daily mean\n", + "discharge. The waterdata getter accepts a list of monitoring-location\n", + "IDs and handles pagination internally, so the single call below\n", + "returns the entire year for all three sites in one DataFrame which we\n", + "register as the `daily` view." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "GAUGES = [\n", + " \"USGS-05586100\", # Illinois River at Valley City\n", + " \"USGS-05543500\", # Illinois River at Marseilles\n", + " \"USGS-05447500\", # Green River near Geneseo\n", + "]\n", + "\n", + "wd.daily(\n", + " monitoring_location_id=GAUGES,\n", + " parameter_code=\"00060\", # discharge, cubic feet per second\n", + " statistic_id=\"00003\", # daily mean\n", + " time=\"2023-01-01/2023-12-31\",\n", + " register_as=\"daily\",\n", + ")\n", + "wd.sql(\"SELECT count(*) AS n_rows, count(DISTINCT monitoring_location_id) AS n_sites FROM daily\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 3. Aggregate: monthly mean flow per gauge\n", + "\n", + "Once the data is in DuckDB, time-bucketing is a one-liner with\n", + "`date_trunc`. Compare with the equivalent pandas pipeline\n", + "(`groupby([id, pd.Grouper(...)])` and `unstack`) and the SQL is\n", + "noticeably more direct." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "wd.sql(\n", + " \"\"\"\n", + " SELECT monitoring_location_id,\n", + " date_trunc('month', time)::DATE AS month,\n", + " round(avg(value), 1) AS mean_cfs,\n", + " round(max(value), 1) AS max_cfs\n", + " FROM daily\n", + " GROUP BY monitoring_location_id, month\n", + " ORDER BY monitoring_location_id, month\n", + " \"\"\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 4. Window function: top 5 highest-flow days per gauge\n", + "\n", + "Window functions are where SQL really pays off. The query below ranks\n", + "every day within each gauge by discharge and returns the five\n", + "highest. The same operation in pandas is doable but reads less\n", + "cleanly." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "wd.sql(\n", + " \"\"\"\n", + " SELECT *\n", + " FROM (\n", + " SELECT monitoring_location_id, time::DATE AS date, value,\n", + " row_number() OVER (\n", + " PARTITION BY monitoring_location_id\n", + " ORDER BY value DESC\n", + " ) AS rk\n", + " FROM daily\n", + " )\n", + " WHERE rk <= 5\n", + " ORDER BY monitoring_location_id, rk\n", + " \"\"\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 5. Join site metadata to daily values\n", + "\n", + "`sites` and `daily` are independent endpoints; in SQL they're just\n", + "two views joined on `monitoring_location_id`. The query below produces\n", + "annual statistics enriched with site name and drainage area." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Re-pull sites filtered to just our gauges so we have their metadata\n", + "wd.monitoring_locations(\n", + " monitoring_location_id=GAUGES,\n", + " register_as=\"gauge_sites\",\n", + ")\n", + "\n", + "wd.sql(\n", + " \"\"\"\n", + " SELECT s.monitoring_location_name AS name,\n", + " s.drainage_area,\n", + " round(avg(d.value), 1) AS mean_cfs,\n", + " round(min(d.value), 1) AS min_cfs,\n", + " round(max(d.value), 1) AS max_cfs,\n", + " round(avg(d.value) / NULLIF(s.drainage_area, 0), 3)\n", + " AS unit_runoff_cfs_per_sqmi\n", + " FROM gauge_sites s\n", + " JOIN daily d USING (monitoring_location_id)\n", + " GROUP BY s.monitoring_location_name, s.drainage_area\n", + " ORDER BY mean_cfs DESC\n", + " \"\"\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 6. Latest readings across many sites\n", + "\n", + "The `latest_continuous` endpoint returns one row per site \u2014 perfect\n", + "for a near-realtime dashboard." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "wd.latest_continuous(\n", + " monitoring_location_id=GAUGES,\n", + " parameter_code=\"00060\",\n", + " register_as=\"latest\",\n", + ")\n", + "wd.sql(\n", + " \"\"\"\n", + " SELECT s.monitoring_location_name AS name,\n", + " l.time AS observed_at,\n", + " l.value AS cfs,\n", + " l.qualifier\n", + " FROM gauge_sites s\n", + " JOIN latest l USING (monitoring_location_id)\n", + " ORDER BY observed_at DESC\n", + " \"\"\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 7. Cross-source: pair waterdata streamflow with WQP water quality\n", + "\n", + "The Water Quality Portal (WQP) is a separate multi-agency repository\n", + "covering USGS plus EPA, state agencies, etc. Its connector follows\n", + "the same pattern, but the WQP API uses CamelCase parameters and a\n", + "different ID schema (e.g. `MonitoringLocationIdentifier`).\n", + "\n", + "Below we open a second connection to WQP, pull water-quality results\n", + "for our Illinois River gauge, and ATTACH the waterdata in-memory\n", + "database into the WQP one \u2014 letting a single query join the two\n", + "sources." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import warnings\n", + "warnings.filterwarnings(\"ignore\") # WQP raises a UserWarning on legacy use\n", + "\n", + "wq = wqp.connect(legacy=True)\n", + "wq.get_results(\n", + " siteid=\"USGS-05586100\",\n", + " startDateLo=\"01-01-2010\",\n", + " startDateHi=\"12-31-2015\",\n", + " register_as=\"wqp_results\",\n", + ")\n", + "wq.sql(\n", + " \"\"\"\n", + " SELECT count(*) AS n_results,\n", + " count(DISTINCT CharacteristicName) AS n_characteristics,\n", + " min(ActivityStartDate) AS earliest,\n", + " max(ActivityStartDate) AS latest\n", + " FROM wqp_results\n", + " \"\"\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Top 10 most-measured characteristics at this gauge in WQP\n", + "wq.sql(\n", + " \"\"\"\n", + " SELECT CharacteristicName,\n", + " count(*) AS n_obs\n", + " FROM wqp_results\n", + " WHERE ResultMeasureValue IS NOT NULL\n", + " GROUP BY CharacteristicName\n", + " ORDER BY n_obs DESC\n", + " LIMIT 10\n", + " \"\"\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Joining across connections\n", + "\n", + "Each connector owns its own duckdb connection. To join across the two,\n", + "we register one source's relation onto the other's connection. Below\n", + "we publish `gauge_sites` (from the waterdata connection) onto the WQP\n", + "connection and then JOIN it with `wqp_results`." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Pull just the metadata we want from waterdata as a DataFrame and\n", + "# register it on the WQP connection.\n", + "gauge_sites_df = wd.sql(\n", + " \"\"\"\n", + " SELECT monitoring_location_id,\n", + " monitoring_location_name AS site_name,\n", + " drainage_area\n", + " FROM gauge_sites\n", + " \"\"\"\n", + ").df()\n", + "wq.con.register(\"gauge_sites\", gauge_sites_df)\n", + "\n", + "wq.sql(\n", + " \"\"\"\n", + " SELECT s.site_name,\n", + " r.CharacteristicName AS characteristic,\n", + " count(*) AS n_obs,\n", + " round(avg(r.ResultMeasureValue), 3) AS mean_value\n", + " FROM gauge_sites s\n", + " JOIN wqp_results r\n", + " ON s.monitoring_location_id = r.MonitoringLocationIdentifier\n", + " WHERE r.ResultMeasureValue IS NOT NULL\n", + " AND r.CharacteristicName IN ('pH', 'Temperature, water', 'Dissolved oxygen (DO)')\n", + " GROUP BY s.site_name, r.CharacteristicName\n", + " ORDER BY characteristic\n", + " \"\"\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 8. Drop into pandas / arrow when it's useful\n", + "\n", + "Every helper returns a `duckdb.DuckDBPyRelation`. Convert to a\n", + "DataFrame for plotting, or to Arrow for downstream tooling, with the\n", + "standard DuckDB methods. The underlying connection is exposed as\n", + "`con.con` for any DuckDB feature the helpers don't cover (Parquet\n", + "export, attached databases, extensions)." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "monthly = wd.sql(\n", + " \"\"\"\n", + " SELECT date_trunc('month', time)::DATE AS month,\n", + " monitoring_location_id,\n", + " avg(value) AS mean_cfs\n", + " FROM daily\n", + " GROUP BY month, monitoring_location_id\n", + " ORDER BY month\n", + " \"\"\"\n", + ").df()\n", + "monthly.head()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Optional: persist any joined result as Parquet for later reuse.\n", + "# wd.sql(\"COPY (SELECT * FROM daily) TO 'illinois_daily_2023.parquet'\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "wd.close()\n", + "wq.close()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## 9. Optional: native geometry via the spatial extension\n", + "\n", + "By default, geometry is flattened to WKT plus `longitude`/`latitude`\n", + "columns so no extension is required. Pass `spatial=True` to\n", + "`connect()` to install + load DuckDB's `spatial` extension on the\n", + "underlying connection \u2014 `ST_GeomFromText`, `ST_DWithin_Sphere`, etc.\n", + "then become available against the registered views.\n", + "\n", + "Install the compound extra to also pull in `geopandas` for richer\n", + "client-side geometry:\n", + "\n", + "```bash\n", + "pip install dataretrieval[spatial]\n", + "```\n", + "\n", + "(The DuckDB spatial extension itself is a runtime C++ extension\n", + "downloaded by DuckDB on first install; it's not a pip package.)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "wd_geo = waterdata.connect(spatial=True)\n", + "wd_geo.monitoring_locations(\n", + " state_name=\"Illinois\",\n", + " site_type=\"Stream\",\n", + " register_as=\"sites\",\n", + " limit=200,\n", + ")\n", + "\n", + "# Point-in-polygon: streamflow gauges within a central-Illinois box.\n", + "wd_geo.sql(\n", + " \"\"\"\n", + " WITH central_il AS (\n", + " SELECT ST_GeomFromText(\n", + " 'POLYGON((-90.5 39.5, -88.5 39.5, -88.5 40.5, -90.5 40.5, -90.5 39.5))'\n", + " ) AS poly\n", + " )\n", + " SELECT monitoring_location_name, longitude, latitude\n", + " FROM sites, central_il\n", + " WHERE ST_Within(ST_GeomFromText(geometry), poly)\n", + " ORDER BY monitoring_location_name\n", + " LIMIT 10\n", + " \"\"\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "wd_geo.close()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Notes on the prototype\n", + "\n", + "* The connectors are intentionally thin \u2014 they don't replace any of\n", + " the underlying APIs, they just make results queryable as views.\n", + " Pagination and CQL filter chunking are still handled inside\n", + " `dataretrieval.waterdata`.\n", + "* By default, geometry is flattened to WKT so the demo doesn't\n", + " require any extension. Pass `spatial=True` to `connect()` for\n", + " native DuckDB geometry support (see section 9).\n", + "* `register_table(name, fn, **kwargs)` accepts any function that\n", + " returns `(DataFrame, metadata)` \u2014 useful for endpoints not yet\n", + " exposed as named helpers (e.g. `dataretrieval.waterdata.get_samples`).\n", + "* The legacy import path `from dataretrieval import duckdb_connector`\n", + " is preserved as an alias for the waterdata connector." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "name": "python", + "pygments_lexer": "ipython3" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 1322dcc3..6df85161 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -26,7 +26,7 @@ dependencies = [ dynamic = ["version"] [tool.setuptools] -packages = ["dataretrieval", "dataretrieval.codes"] +packages = ["dataretrieval", "dataretrieval.codes", "dataretrieval.duckdb_connectors"] [tool.setuptools.package-data] dataretrieval = ["py.typed"] @@ -53,6 +53,18 @@ doc = [ nldi = [ 'geopandas>=0.10' ] +duckdb = [ + "duckdb>=1.0.0" +] +# Compound extra: pulls in both ``nldi`` (geopandas) and ``duckdb``. +# DuckDB's ``spatial`` extension itself isn't a pip package — it's a +# runtime C++ extension that DuckDB downloads on first +# ``INSTALL spatial; LOAD spatial;``. Pass ``spatial=True`` to a +# connector's ``connect()`` to trigger that load. +spatial = [ + "dataretrieval[nldi]", + "dataretrieval[duckdb]" +] [project.urls] homepage = "https://github.com/DOI-USGS/dataretrieval-python" @@ -63,7 +75,7 @@ repository = "https://github.com/DOI-USGS/dataretrieval-python.git" write_to = "dataretrieval/_version.py" [tool.ruff] -target-version = "py38" +target-version = "py310" extend-exclude = ["demos"] [tool.ruff.lint] diff --git a/tests/duckdb_connectors_waterdata_test.py b/tests/duckdb_connectors_waterdata_test.py new file mode 100644 index 00000000..2639bee4 --- /dev/null +++ b/tests/duckdb_connectors_waterdata_test.py @@ -0,0 +1,181 @@ +"""Tests for the optional waterdata DuckDB connector. + +The waterdata getters are mocked at the function boundary so tests run +without a network or an API key. We rely on a real ``duckdb`` install +to validate the SQL-side behaviour. Tests are skipped when duckdb is +not available so the test suite still runs for users without the +optional extra installed. +""" + +from __future__ import annotations + +from unittest import mock + +import pandas as pd +import pytest + +duckdb = pytest.importorskip("duckdb") + +from dataretrieval.duckdb_connectors import waterdata as wd_connector # noqa: E402 + + +@pytest.fixture +def sites_df() -> pd.DataFrame: + return pd.DataFrame( + { + "id": ["USGS-01", "USGS-02", "USGS-03"], + "site_type": ["Stream", "Well", "Stream"], + "state_name": ["Illinois", "Illinois", "Iowa"], + } + ) + + +@pytest.fixture +def daily_df() -> pd.DataFrame: + return pd.DataFrame( + { + "monitoring_location_id": ["USGS-01"] * 3 + ["USGS-03"] * 3, + "time": pd.to_datetime( + [ + "2024-01-01", + "2024-01-02", + "2024-01-03", + "2024-01-01", + "2024-01-02", + "2024-01-03", + ], + utc=True, + ), + "value": [10.0, 11.0, 12.0, 20.0, 21.0, 22.0], + "parameter_code": ["00060"] * 6, + } + ) + + +def test_connect_returns_wrapper(): + con = wd_connector.connect() + try: + assert isinstance(con, wd_connector.WaterdataConnection) + assert isinstance(con.con, duckdb.DuckDBPyConnection) + assert con.sql("SELECT 1 AS x").fetchone() == (1,) + finally: + con.close() + + +def test_connect_raises_without_duckdb(monkeypatch): + """If duckdb isn't installed, ``connect`` must give a clear error.""" + from dataretrieval.duckdb_connectors import _base + + monkeypatch.setattr(_base, "DUCKDB", False) + with pytest.raises(ImportError, match="duckdb"): + wd_connector.connect() + + +def test_monitoring_locations_forwards_kwargs(sites_df): + with mock.patch("dataretrieval.waterdata.get_monitoring_locations") as m: + m.return_value = (sites_df, mock.Mock()) + with wd_connector.connect() as con: + rel = con.monitoring_locations(state_name="Illinois", site_type="Stream") + assert isinstance(rel, duckdb.DuckDBPyRelation) + assert rel.fetchall() == list(sites_df.itertuples(index=False, name=None)) + m.assert_called_once_with(state_name="Illinois", site_type="Stream") + + +def test_register_as_creates_named_view(sites_df): + with mock.patch("dataretrieval.waterdata.get_monitoring_locations") as m: + m.return_value = (sites_df, mock.Mock()) + with wd_connector.connect() as con: + con.monitoring_locations(register_as="sites") + count = con.sql("SELECT count(*) FROM sites").fetchone()[0] + assert count == len(sites_df) + stream_ids = { + row[0] + for row in con.sql( + "SELECT id FROM sites WHERE site_type = 'Stream'" + ).fetchall() + } + assert stream_ids == {"USGS-01", "USGS-03"} + + +def test_register_table_works_with_arbitrary_getter(sites_df): + """``register_table`` should accept any (df, meta)-returning callable.""" + + def fake_getter(**kwargs): + assert kwargs == {"foo": "bar"} + return sites_df, mock.Mock() + + with wd_connector.connect() as con: + rel = con.register_table("custom", fake_getter, foo="bar") + assert isinstance(rel, duckdb.DuckDBPyRelation) + assert con.sql("SELECT count(*) FROM custom").fetchone() == (len(sites_df),) + + +def test_sql_join_across_two_endpoints(sites_df, daily_df): + """Demonstrates the actual value-add: SQL across registered views.""" + with ( + mock.patch("dataretrieval.waterdata.get_monitoring_locations") as m_sites, + mock.patch("dataretrieval.waterdata.get_daily") as m_daily, + ): + m_sites.return_value = (sites_df, mock.Mock()) + m_daily.return_value = (daily_df, mock.Mock()) + with wd_connector.connect() as con: + con.monitoring_locations(register_as="sites") + con.daily(register_as="daily") + rows = con.sql( + """ + SELECT s.id, s.state_name, avg(d.value) AS mean_value + FROM sites s + JOIN daily d ON s.id = d.monitoring_location_id + WHERE s.site_type = 'Stream' + GROUP BY s.id, s.state_name + ORDER BY s.id + """ + ).fetchall() + assert rows == [("USGS-01", "Illinois", 11.0), ("USGS-03", "Iowa", 21.0)] + + +def test_spatial_flag_loads_extension(): + """``spatial=True`` should make ``ST_*`` functions available. + + Skipped if the host can't reach DuckDB's extension registry. + """ + try: + con = wd_connector.connect(spatial=True) + except RuntimeError as exc: # network-less test runner + pytest.skip(f"Spatial extension unavailable: {exc}") + try: + wkt = con.sql("SELECT ST_AsText(ST_Point(-90.1, 38.6))").fetchone()[0] + assert wkt == "POINT (-90.1 38.6)" + finally: + con.close() + + +def test_geometry_is_flattened_to_wkt(): + """GeoDataFrame input should be converted to a registerable frame. + + Skipped if geopandas isn't installed; the connector handles that + case by passing through, and the smoke tests above already cover + the non-geo path. + """ + gpd = pytest.importorskip("geopandas") + from shapely.geometry import Point + + gdf = gpd.GeoDataFrame( + { + "id": ["USGS-01", "USGS-02"], + "geometry": [Point(-90.1, 38.6), Point(-89.4, 39.7)], + }, + crs="EPSG:4326", + ) + with mock.patch("dataretrieval.waterdata.get_monitoring_locations") as m: + m.return_value = (gdf, mock.Mock()) + with wd_connector.connect() as con: + con.monitoring_locations(register_as="sites") + cols = [c[0] for c in con.sql("DESCRIBE sites").fetchall()] + assert {"longitude", "latitude", "geometry"} <= set(cols) + row = con.sql( + "SELECT longitude, latitude, geometry FROM sites WHERE id = 'USGS-01'" + ).fetchone() + assert row[0] == pytest.approx(-90.1) + assert row[1] == pytest.approx(38.6) + assert row[2].startswith("POINT") diff --git a/tests/duckdb_connectors_wqp_test.py b/tests/duckdb_connectors_wqp_test.py new file mode 100644 index 00000000..10c7a759 --- /dev/null +++ b/tests/duckdb_connectors_wqp_test.py @@ -0,0 +1,141 @@ +"""Tests for the optional WQP DuckDB connector. + +The wqp getters are mocked at the function boundary so tests run +without a network. Like ``duckdb_connectors_waterdata_test``, the +whole module is skipped when duckdb isn't installed. +""" + +from __future__ import annotations + +from unittest import mock + +import pandas as pd +import pytest + +duckdb = pytest.importorskip("duckdb") + +from dataretrieval.duckdb_connectors import wqp as wqp_connector # noqa: E402 + + +@pytest.fixture +def results_df() -> pd.DataFrame: + return pd.DataFrame( + { + "OrganizationIdentifier": ["USGS-IL", "USGS-IL"], + "MonitoringLocationIdentifier": ["USGS-05586100", "USGS-05586100"], + "ActivityStartDate": ["2023-05-01", "2023-06-01"], + "CharacteristicName": ["pH", "Temperature, water"], + "ResultMeasureValue": [7.4, 19.2], + "ResultMeasure/MeasureUnitCode": ["std units", "deg C"], + } + ) + + +@pytest.fixture +def sites_df() -> pd.DataFrame: + return pd.DataFrame( + { + "OrganizationIdentifier": ["USGS-IL", "USGS-IL"], + "MonitoringLocationIdentifier": ["USGS-05586100", "USGS-05543500"], + "MonitoringLocationName": [ + "ILLINOIS RIVER AT VALLEY CITY, IL", + "ILLINOIS RIVER AT MARSEILLES, IL", + ], + "StateCode": ["US:17", "US:17"], + } + ) + + +def test_connect_returns_wrapper(): + con = wqp_connector.connect() + try: + assert isinstance(con, wqp_connector.WQPConnection) + assert con.legacy is True + assert isinstance(con.con, duckdb.DuckDBPyConnection) + finally: + con.close() + + +def test_connect_legacy_flag_is_threaded(): + con = wqp_connector.connect(legacy=False) + try: + assert con.legacy is False + finally: + con.close() + + +def test_get_results_forwards_kwargs_and_legacy(results_df): + """The connection-level ``legacy`` flag should reach the underlying call.""" + with mock.patch("dataretrieval.wqp.get_results") as m: + m.return_value = (results_df, mock.Mock()) + with wqp_connector.connect(legacy=False) as con: + rel = con.get_results(siteid="USGS-05586100", characteristicName="pH") + assert isinstance(rel, duckdb.DuckDBPyRelation) + kwargs = m.call_args.kwargs + assert kwargs["siteid"] == "USGS-05586100" + assert kwargs["characteristicName"] == "pH" + assert kwargs["legacy"] is False + assert kwargs["ssl_check"] is True + + +def test_per_call_overrides_connection_default(results_df): + """Passing ``legacy=`` to a helper must override the connection default.""" + with mock.patch("dataretrieval.wqp.get_results") as m: + m.return_value = (results_df, mock.Mock()) + with wqp_connector.connect(legacy=True) as con: + con.get_results(siteid="USGS-05586100", legacy=False) + assert m.call_args.kwargs["legacy"] is False + + +def test_what_sites_register_as(sites_df): + with mock.patch("dataretrieval.wqp.what_sites") as m: + m.return_value = (sites_df, mock.Mock()) + with wqp_connector.connect() as con: + con.what_sites(statecode="US:17", register_as="sites") + count = con.sql("SELECT count(*) FROM sites").fetchone()[0] + assert count == len(sites_df) + + +def test_join_results_to_sites(results_df, sites_df): + """A WQP join across two services in one query.""" + with ( + mock.patch("dataretrieval.wqp.what_sites") as m_sites, + mock.patch("dataretrieval.wqp.get_results") as m_results, + ): + m_sites.return_value = (sites_df, mock.Mock()) + m_results.return_value = (results_df, mock.Mock()) + with wqp_connector.connect() as con: + con.what_sites(statecode="US:17", register_as="sites") + con.get_results(statecode="US:17", register_as="results") + rows = con.sql( + """ + SELECT s.MonitoringLocationName AS name, + r.CharacteristicName AS characteristic, + r.ResultMeasureValue AS value + FROM sites s + JOIN results r USING (MonitoringLocationIdentifier) + ORDER BY r.ActivityStartDate + """ + ).fetchall() + assert rows == [ + ("ILLINOIS RIVER AT VALLEY CITY, IL", "pH", 7.4), + ("ILLINOIS RIVER AT VALLEY CITY, IL", "Temperature, water", 19.2), + ] + + +def test_what_endpoints_invoke_correct_underlying(results_df): + """Each helper should invoke its corresponding wqp function.""" + helper_to_target = { + "what_organizations": "what_organizations", + "what_projects": "what_projects", + "what_activities": "what_activities", + "what_detection_limits": "what_detection_limits", + "what_habitat_metrics": "what_habitat_metrics", + "what_activity_metrics": "what_activity_metrics", + } + for helper_name, target in helper_to_target.items(): + with mock.patch(f"dataretrieval.wqp.{target}") as m: + m.return_value = (results_df, mock.Mock()) + with wqp_connector.connect() as con: + getattr(con, helper_name)(statecode="US:17") + assert m.call_count == 1, helper_name From e4c84bf057a2d881c561cdd4d96a03dfab2db446 Mon Sep 17 00:00:00 2001 From: thodson-usgs Date: Sat, 25 Apr 2026 19:41:09 -0500 Subject: [PATCH 2/2] Apply /simplify pass on duckdb connectors * Narrow `_flatten_geometry` exception from `Exception` to `ValueError` (the actual error geopandas raises on `.x`/`.y` for non-Point geometries) so genuine bugs aren't swallowed. * Drop the `if TYPE_CHECKING: import duckdb as _duckdb` block in `_base.py`. The runtime `try: import duckdb` is enough for type checkers; the alias was dead code. * Parametrize `test_what_endpoint_invokes_correct_underlying` so each WQP helper gets its own test case (matches the parametrize pattern already used in `tests/waterdata_utils_test.py`). 20 tests pass; ruff clean. Co-Authored-By: Claude Opus 4.7 (1M context) --- dataretrieval/duckdb_connectors/_base.py | 9 +++---- tests/duckdb_connectors_wqp_test.py | 32 +++++++++++++----------- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/dataretrieval/duckdb_connectors/_base.py b/dataretrieval/duckdb_connectors/_base.py index 2e02dcff..d7d0f03b 100644 --- a/dataretrieval/duckdb_connectors/_base.py +++ b/dataretrieval/duckdb_connectors/_base.py @@ -17,7 +17,7 @@ import logging from collections.abc import Callable -from typing import TYPE_CHECKING, Any +from typing import Any import pandas as pd @@ -35,9 +35,6 @@ except ImportError: GEOPANDAS = False -if TYPE_CHECKING: - import duckdb as _duckdb # noqa: F401 (resolved by type-checker only) - logger = logging.getLogger(__name__) _INSTALL_HINT = ( @@ -96,8 +93,8 @@ def _flatten_geometry(df: pd.DataFrame) -> pd.DataFrame: try: out["longitude"] = geom.x out["latitude"] = geom.y - except Exception: - # Non-point geometries (lines, polygons): skip lon/lat shortcut. + except ValueError: + # geopandas raises ValueError on .x/.y for non-Point geometries. pass out[geom_name] = geom.to_wkt() diff --git a/tests/duckdb_connectors_wqp_test.py b/tests/duckdb_connectors_wqp_test.py index 10c7a759..2cf534b6 100644 --- a/tests/duckdb_connectors_wqp_test.py +++ b/tests/duckdb_connectors_wqp_test.py @@ -123,19 +123,21 @@ def test_join_results_to_sites(results_df, sites_df): ] -def test_what_endpoints_invoke_correct_underlying(results_df): +@pytest.mark.parametrize( + "helper", + [ + "what_organizations", + "what_projects", + "what_activities", + "what_detection_limits", + "what_habitat_metrics", + "what_activity_metrics", + ], +) +def test_what_endpoint_invokes_correct_underlying(results_df, helper): """Each helper should invoke its corresponding wqp function.""" - helper_to_target = { - "what_organizations": "what_organizations", - "what_projects": "what_projects", - "what_activities": "what_activities", - "what_detection_limits": "what_detection_limits", - "what_habitat_metrics": "what_habitat_metrics", - "what_activity_metrics": "what_activity_metrics", - } - for helper_name, target in helper_to_target.items(): - with mock.patch(f"dataretrieval.wqp.{target}") as m: - m.return_value = (results_df, mock.Mock()) - with wqp_connector.connect() as con: - getattr(con, helper_name)(statecode="US:17") - assert m.call_count == 1, helper_name + with mock.patch(f"dataretrieval.wqp.{helper}") as m: + m.return_value = (results_df, mock.Mock()) + with wqp_connector.connect() as con: + getattr(con, helper)(statecode="US:17") + assert m.call_count == 1