diff --git a/dataretrieval/waterdata/__init__.py b/dataretrieval/waterdata/__init__.py index cacf8f1..f0df1f1 100644 --- a/dataretrieval/waterdata/__init__.py +++ b/dataretrieval/waterdata/__init__.py @@ -19,16 +19,16 @@ get_latest_continuous, get_latest_daily, get_monitoring_locations, - get_nearest_continuous, get_reference_table, get_samples, get_stats_date_range, get_stats_por, get_time_series_metadata, ) +from .filters import FILTER_LANG +from .nearest import get_nearest_continuous from .types import ( CODE_SERVICES, - FILTER_LANG, PROFILE_LOOKUP, PROFILES, SERVICES, diff --git a/dataretrieval/waterdata/api.py b/dataretrieval/waterdata/api.py index 370d610..a3c0a94 100644 --- a/dataretrieval/waterdata/api.py +++ b/dataretrieval/waterdata/api.py @@ -9,16 +9,16 @@ import json import logging from io import StringIO -from typing import Literal, get_args +from typing import get_args import pandas as pd import requests from requests.models import PreparedRequest from dataretrieval.utils import BaseMetadata, to_str +from dataretrieval.waterdata.filters import FILTER_LANG from dataretrieval.waterdata.types import ( CODE_SERVICES, - FILTER_LANG, METADATA_COLLECTIONS, PROFILES, SERVICES, @@ -180,18 +180,11 @@ def get_daily( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (NA) will set the limit to the maximum allowable limit for the service. - filter : string, optional - A CQL text or JSON expression passed through to the OGC API - ``filter`` query parameter. Commonly used to OR several time - ranges into a single request. At the time of writing the server - accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. A long expression made up - of a top-level ``OR`` chain is automatically split into - multiple requests that each fit under the server's URI length - limit; the results are concatenated. 
- filter_lang : string, optional - Language of the ``filter`` expression, for example ``cql-text`` - (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -365,18 +358,11 @@ def get_continuous( allowable limit is 10000. It may be beneficial to set this number lower if your internet connection is spotty. The default (NA) will set the limit to the maximum allowable limit for the service. - filter : string, optional - A CQL text or JSON expression passed through to the OGC API - ``filter`` query parameter. Commonly used to OR several time - ranges into a single request. At the time of writing the server - accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. A long expression made up - of a top-level ``OR`` chain is automatically split into - multiple requests that each fit under the server's URI length - limit; the results are concatenated. - filter_lang : string, optional - Language of the ``filter`` expression, for example ``cql-text`` - (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector @@ -400,9 +386,9 @@ def get_continuous( ... time="2021-01-01T00:00:00Z/2022-01-01T00:00:00Z", ... ) - >>> # The ``time`` parameter accepts a single instant or a single - >>> # interval. 
To pull several disjoint windows in one call, pass a - >>> # CQL-text ``filter`` expression instead: + >>> # Pull several disjoint time windows in one call via a CQL + >>> # ``filter``. See ``dataretrieval.waterdata.filters`` for the + >>> # full grammar, auto-chunking, and pitfalls. >>> df, md = dataretrieval.waterdata.get_continuous( ... monitoring_location_id="USGS-02238500", ... parameter_code="00060", @@ -414,22 +400,6 @@ def get_continuous( ... ), ... filter_lang="cql-text", ... ) - - >>> # Long top-level ``OR`` chains (e.g. one window per discrete - >>> # measurement timestamp) are built up the same way. If the - >>> # resulting URL would exceed the server's length limit, the - >>> # client transparently splits it into multiple sub-requests and - >>> # returns the concatenated, deduplicated result. - >>> windows = [ - ... f"(time >= '2023-{m:02d}-15T00:00:00Z' " - ... f"AND time <= '2023-{m:02d}-15T00:30:00Z')" - ... for m in range(1, 13) - ... ] - >>> df, md = dataretrieval.waterdata.get_continuous( - ... monitoring_location_id="USGS-02238500", - ... parameter_code="00060", - ... filter=" OR ".join(windows), - ... ) """ service = "continuous" output_id = "continuous_id" @@ -440,239 +410,6 @@ def get_continuous( return get_ogc_data(args, output_id, service) -def get_nearest_continuous( - targets, - monitoring_location_id: str | list[str] | None = None, - parameter_code: str | list[str] | None = None, - *, - window: str | pd.Timedelta = "PT7M30S", - on_tie: Literal["first", "last", "mean"] = "first", - **kwargs, -) -> tuple[pd.DataFrame, BaseMetadata]: - """For each target timestamp, return the nearest continuous observation. - - Builds one bracketed ``(time >= t-window AND time <= t+window)`` clause - per target, joins them as a top-level CQL ``OR`` filter, and lets - ``get_continuous`` (with its auto-chunking) fetch every observation - that falls in any window. 
Then, per ``(monitoring_location_id, target)`` - pair, picks the single observation with the smallest ``|time - target|``. - - The USGS continuous endpoint matches ``time`` parameters exactly rather - than fuzzily, and it does not implement ``sortby`` for arbitrary fields; - this function is the single-round-trip way to ask "what reading is - nearest this timestamp?" for many timestamps at once. - - Parameters - ---------- - targets : list-like of datetime-convertible - Target timestamps. Naive datetimes are treated as UTC. Accepts a - list, ``pandas.Series``, ``pandas.DatetimeIndex``, ``numpy.ndarray``, - or anything ``pandas.to_datetime`` consumes. - monitoring_location_id : string or list of strings, optional - Forwarded to ``get_continuous``. - parameter_code : string or list of strings, optional - Forwarded to ``get_continuous``. - window : string or ``pandas.Timedelta``, default ``"PT7M30S"`` - Half-window around each target, as an ISO 8601 duration - (``"PT7M30S"``, ``"PT15M"``, ``"PT1H"``, etc.). Also accepts - any other form ``pandas.Timedelta`` parses — ``HH:MM:SS`` - (``"00:07:30"``), pandas shorthand (``"7min30s"``, - ``"450s"``), or a ``pd.Timedelta`` directly. See the - `pandas.Timedelta docs - `_ - for the full grammar. - - Must be small enough that every target's window captures - roughly one observation at the service cadence. The default - matches a 15-minute continuous gauge; widen (e.g. - ``"PT15M"``) for irregular cadences or resilience to data - gaps. - on_tie : {"first", "last", "mean"}, default ``"first"`` - How to resolve ties when two observations are exactly equidistant - from a target (which happens when the target falls at the midpoint - between grid points — e.g. target ``10:22:30`` for a 15-minute - gauge). - - - ``"first"``: keep the earlier observation. - - ``"last"``: keep the later observation. - - ``"mean"``: average numeric columns; set the ``time`` column to - the target, since no real observation exists at the midpoint. 
- - **kwargs - Additional keyword arguments forwarded to ``get_continuous`` - (e.g. ``statistic_id``, ``approval_status``, ``properties``). - Passing ``time``, ``filter``, or ``filter_lang`` raises - ``TypeError`` — this function builds those itself. - - Returns - ------- - df : ``pandas.DataFrame`` - One row per ``(target, monitoring_location_id)`` combination that - had at least one observation in its window. Rows are augmented - with a ``target_time`` column indicating which target they - correspond to. Targets with no observations in their window are - silently dropped. - md : :class:`~dataretrieval.utils.BaseMetadata` - Metadata from the underlying ``get_continuous`` call. - - Notes - ----- - *Window sizing and ties.* When ``window`` is exactly half the service - cadence, most targets' windows contain a single observation and - ``on_tie`` is moot. Ties arise only when a target sits exactly at the - window edge — rare in practice but possible. Setting ``window`` to a - full cadence (or larger) guarantees at least one observation per - target in steady state at the cost of more bytes per response. - - *Why windowed CQL rather than sort+limit.* The API's advertised - ``sortby`` parameter would make this a one-liner per target (``filter`` - by ``time <= t`` and ``limit 1``), but it is per-query — you would need - one HTTP round-trip per target. The CQL ``OR``-chain approach folds - all N targets into one request (auto-chunked when the URL is long). - - Examples - -------- - .. code:: - - >>> import pandas as pd - >>> from dataretrieval import waterdata - - >>> # Pair three off-grid timestamps with nearby observations - >>> targets = pd.to_datetime( - ... [ - ... "2023-06-15T10:30:31Z", - ... "2023-06-15T14:07:12Z", - ... "2023-06-16T03:45:19Z", - ... ] - ... ) - >>> df, md = waterdata.get_nearest_continuous( - ... targets, - ... monitoring_location_id="USGS-02238500", - ... parameter_code="00060", - ... 
) - - >>> # Widen the window for an irregular-cadence gauge - >>> df, md = waterdata.get_nearest_continuous( - ... targets, - ... monitoring_location_id="USGS-02238500", - ... parameter_code="00060", - ... window="PT30M", - ... on_tie="mean", - ... ) - """ - _check_nearest_kwargs(kwargs, on_tie) - targets = pd.DatetimeIndex(pd.to_datetime(targets, utc=True)) - window_td = pd.Timedelta(window) - - if len(targets) == 0: - raise ValueError("targets must contain at least one timestamp") - - filter_expr = _build_window_or_filter(targets, window_td) - df, md = get_continuous( - monitoring_location_id=monitoring_location_id, - parameter_code=parameter_code, - filter=filter_expr, - filter_lang="cql-text", - **kwargs, - ) - if df.empty: - return _empty_nearest_result(df), md - - df = df.assign(time=pd.to_datetime(df["time"], utc=True)) - site_groups = ( - df.groupby("monitoring_location_id", sort=False) - if "monitoring_location_id" in df.columns - else [(None, df)] - ) - - selected = [ - row - for _, site_df in site_groups - for target in targets - if (row := _pick_nearest_row(site_df, target, window_td, on_tie)) is not None - ] - if not selected: - return _empty_nearest_result(df), md - return pd.DataFrame(selected).reset_index(drop=True), md - - -_VALID_ON_TIE = ("first", "last", "mean") - - -def _check_nearest_kwargs(kwargs: dict, on_tie: str) -> None: - """Reject kwargs the helper owns; validate ``on_tie``.""" - for forbidden in ("time", "filter", "filter_lang"): - if forbidden in kwargs: - raise TypeError( - f"get_nearest_continuous constructs its own {forbidden!r}; " - "do not pass it directly" - ) - if on_tie not in _VALID_ON_TIE: - raise ValueError(f"on_tie must be one of {_VALID_ON_TIE}; got {on_tie!r}") - - -def _build_window_or_filter(targets: pd.DatetimeIndex, window_td: pd.Timedelta) -> str: - """Build the CQL OR-chain of ``time >= ... AND time <= ...`` windows. 
- - ``get_continuous`` auto-chunks the result if the full URL would - exceed the server's length limit, so this is always safe to build - as one string even for many targets. - """ - return " OR ".join( - f"(time >= '{(t - window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}' " - f"AND time <= '{(t + window_td).strftime('%Y-%m-%dT%H:%M:%SZ')}')" - for t in targets - ) - - -def _pick_nearest_row( - site_df: pd.DataFrame, - target: pd.Timestamp, - window_td: pd.Timedelta, - on_tie: str, -) -> pd.Series | None: - """Return the single row within ``window_td`` of ``target``, or ``None``. - - Resolves ties (two rows equidistant from ``target``) per ``on_tie``. - The returned row carries a ``target_time`` column identifying which - target it was selected for. - """ - in_window = site_df[ - (site_df["time"] >= target - window_td) - & (site_df["time"] <= target + window_td) - ] - if in_window.empty: - return None - deltas = (in_window["time"] - target).abs() - candidates = in_window[deltas == deltas.min()].sort_values("time") - - if len(candidates) == 1 or on_tie == "first": - row = candidates.iloc[0].copy() - elif on_tie == "last": - row = candidates.iloc[-1].copy() - else: # "mean" — synthesize a row whose numeric cols are averaged and - # whose ``time`` is the target (no real observation sits at the midpoint). - row = candidates.iloc[0].copy() - for col in candidates.select_dtypes("number").columns: - row[col] = candidates[col].mean() - row["time"] = target - - row["target_time"] = target - return row - - -def _empty_nearest_result(template: pd.DataFrame | None = None) -> pd.DataFrame: - """Empty frame with a ``target_time`` column, for no-match cases. - - When ``template`` is provided, preserve its columns/dtypes so the - returned frame matches the shape of a real ``get_continuous`` - response. 
- """ - base = pd.DataFrame() if template is None else template.iloc[0:0].copy() - base["target_time"] = pd.Series(dtype="datetime64[ns, UTC]") - return base - - def get_monitoring_locations( monitoring_location_id: list[str] | None = None, agency_code: list[str] | None = None, @@ -930,18 +667,11 @@ def get_monitoring_locations( The returning object will be a data frame with no spatial information. Note that the USGS Water Data APIs use camelCase "skipGeometry" in CQL2 queries. - filter : string, optional - A CQL text or JSON expression passed through to the OGC API - ``filter`` query parameter. Commonly used to OR several time - ranges into a single request. At the time of writing the server - accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. A long expression made up - of a top-level ``OR`` chain is automatically split into - multiple requests that each fit under the server's URI length - limit; the results are concatenated. - filter_lang : string, optional - Language of the ``filter`` expression, for example ``cql-text`` - (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1160,18 +890,11 @@ def get_time_series_metadata( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. - filter : string, optional - A CQL text or JSON expression passed through to the OGC API - ``filter`` query parameter. Commonly used to OR several time - ranges into a single request. 
At the time of writing the server - accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. A long expression made up - of a top-level ``OR`` chain is automatically split into - multiple requests that each fit under the server's URI length - limit; the results are concatenated. - filter_lang : string, optional - Language of the ``filter`` expression, for example ``cql-text`` - (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1349,18 +1072,11 @@ def get_latest_continuous( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. - filter : string, optional - A CQL text or JSON expression passed through to the OGC API - ``filter`` query parameter. Commonly used to OR several time - ranges into a single request. At the time of writing the server - accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. A long expression made up - of a top-level ``OR`` chain is automatically split into - multiple requests that each fit under the server's URI length - limit; the results are concatenated. - filter_lang : string, optional - Language of the ``filter`` expression, for example ``cql-text`` - (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. 
convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1537,18 +1253,11 @@ def get_latest_daily( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. - filter : string, optional - A CQL text or JSON expression passed through to the OGC API - ``filter`` query parameter. Commonly used to OR several time - ranges into a single request. At the time of writing the server - accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. A long expression made up - of a top-level ``OR`` chain is automatically split into - multiple requests that each fit under the server's URI length - limit; the results are concatenated. - filter_lang : string, optional - Language of the ``filter`` expression, for example ``cql-text`` - (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -1717,18 +1426,11 @@ def get_field_measurements( allowable limit is 50000. It may be beneficial to set this number lower if your internet connection is spotty. The default (None) will set the limit to the maximum allowable limit for the service. - filter : string, optional - A CQL text or JSON expression passed through to the OGC API - ``filter`` query parameter. Commonly used to OR several time - ranges into a single request. At the time of writing the server - accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. 
A long expression made up - of a top-level ``OR`` chain is automatically split into - multiple requests that each fit under the server's URI length - limit; the results are concatenated. - filter_lang : string, optional - Language of the ``filter`` expression, for example ``cql-text`` - (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. convert_type : boolean, optional If True, converts columns to appropriate types. @@ -2488,18 +2190,11 @@ def get_channel( vertical_velocity_description, longitudinal_velocity_description, measurement_type, last_modified, channel_measurement_type. The default (NA) will return all columns of the data. - filter : string, optional - A CQL text or JSON expression passed through to the OGC API - ``filter`` query parameter. Commonly used to OR several time - ranges into a single request. At the time of writing the server - accepts ``cql-text`` (default) and ``cql-json``; ``cql2-text`` / - ``cql2-json`` are not yet supported. A long expression made up - of a top-level ``OR`` chain is automatically split into - multiple requests that each fit under the server's URI length - limit; the results are concatenated. - filter_lang : string, optional - Language of the ``filter`` expression, for example ``cql-text`` - (default) or ``cql-json``. Sent as ``filter-lang`` in the URL. + filter, filter_lang : optional + Server-side CQL filter passed through as the OGC ``filter`` / + ``filter-lang`` query parameters. See + :mod:`dataretrieval.waterdata.filters` for syntax, auto-chunking, + and the lexicographic-comparison pitfall. 
convert_type : boolean, optional If True, the function will convert the data to dates and qualifier to string vector diff --git a/dataretrieval/waterdata/filters.py b/dataretrieval/waterdata/filters.py new file mode 100644 index 0000000..4c136b8 --- /dev/null +++ b/dataretrieval/waterdata/filters.py @@ -0,0 +1,334 @@ +"""CQL ``filter`` support for the Water Data OGC getters. + +Two names are public to the rest of the package: + +- ``FILTER_LANG``: the type alias used for the ``filter_lang`` kwarg. +- ``chunked``: the decorator ``utils.py`` applies to its single-request + fetch function. It runs the lexicographic-comparison pitfall guard, + splits long cql-text filters at top-level ``OR`` so each sub-request + fits under the server's URL byte limit, and concatenates the results. + +Other CQL shapes (``AND``, ``NOT``, ``LIKE``, spatial/temporal predicates, +function calls) are forwarded verbatim — only top-level ``OR`` chunks +losslessly into independent sub-queries whose result sets can be union'd. +""" + +from __future__ import annotations + +import functools +import re +from collections.abc import Callable +from typing import Any, Literal, TypeVar +from urllib.parse import quote_plus + +import pandas as pd +import requests + +FILTER_LANG = Literal["cql-text", "cql-json"] + +# Conservative fallback budget when ``_chunk_cql_or`` is called without +# an explicit ``max_len``. The ``chunked`` decorator computes a tighter +# per-request budget from ``_WATERDATA_URL_BYTE_LIMIT``. +_CQL_FILTER_CHUNK_LEN = 5000 + +# Empirically the API replies HTTP 414 above ~8200 bytes of full URL — +# matches nginx's default ``large_client_header_buffers`` of 8 KB. 8000 +# leaves ~200 bytes for request-line framing and proxy variance. +_WATERDATA_URL_BYTE_LIMIT = 8000 + +# Conservative over-estimate of URL bytes used by everything *except* +# the filter value. Used only by the fast path in +# ``_effective_filter_budget`` to skip the probe when the encoded filter +# clearly already fits. 
+_NON_FILTER_URL_HEADROOM = 1000
+
+
+_NUM = r"-?(?:\d+(?:\.\d+)?|\.\d+)(?:[eE][+-]?\d+)?"
+_IDENT = r"[A-Za-z_]\w*"
+_OP = r">=|<=|<>|!=|==|=|>|<"
+_FIELD_NEGATED = rf"\b(?!NOT\b)(?P<field>{_IDENT})\s+(?P<negated>NOT\s+)?"
+
+_NUMERIC_COMPARE_RE = re.compile(
+    rf"""
+    (?:
+        \b(?P<field1>{_IDENT})\s*(?P<op1>{_OP})\s*(?P<num1>{_NUM})\b
+    |
+        \b(?P<num2>{_NUM})\s*(?P<op2>{_OP})\s*(?P<field2>{_IDENT})\b
+    )
+    """,
+    re.VERBOSE,
+)
+_IN_NUMERIC_RE = re.compile(
+    rf"{_FIELD_NEGATED}IN\s*\([^)]*\b{_NUM}\b[^)]*\)",
+    re.IGNORECASE,
+)
+_BETWEEN_NUMERIC_RE = re.compile(
+    rf"{_FIELD_NEGATED}BETWEEN\s+(?:{_NUM}\b[^)]*?\bAND\b|[^)]*?\bAND\s+{_NUM}\b)",
+    re.IGNORECASE,
+)
+_QUOTED_STR_RE = re.compile(r"'[^']*'")
+
+
+def _split_top_level_or(expr: str) -> list[str]:
+    """Split ``expr`` at each top-level ``OR``, respecting quotes and parens.
+
+    ``OR`` tokens inside ``(A OR B)`` or ``'word OR word'`` are left alone.
+    Matching is case-insensitive; whitespace around each part is stripped;
+    empty parts are dropped.
+    """
+    parts: list[str] = []
+    last = 0
+    depth = 0
+    in_quote: str | None = None
+    i = 0
+    n = len(expr)
+    while i < n:
+        ch = expr[i]
+        if in_quote is not None:
+            if ch == in_quote:
+                in_quote = None
+            i += 1
+            continue
+        if ch in ("'", '"'):
+            in_quote = ch
+            i += 1
+            continue
+        if ch == "(":
+            depth += 1
+            i += 1
+            continue
+        if ch == ")":
+            depth -= 1
+            i += 1
+            continue
+        if depth == 0 and ch.isspace():
+            j = i + 1
+            while j < n and expr[j].isspace():
+                j += 1
+            if j + 2 <= n and expr[j : j + 2].lower() == "or":
+                k = j + 2
+                if k < n and expr[k].isspace():
+                    m = k + 1
+                    while m < n and expr[m].isspace():
+                        m += 1
+                    parts.append(expr[last:i].strip())
+                    last = m
+                    i = m
+                    continue
+        i += 1
+    parts.append(expr[last:].strip())
+    return [p for p in parts if p]
+
+
+def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]:
+    """Split ``expr`` into OR-chunks each under ``max_len`` characters.
+ + Only top-level ``OR`` chains can be recombined losslessly as a disjunction + of independent sub-queries. Returns ``[expr]`` unchanged when the whole + expression already fits, when there is no top-level ``OR``, or when any + single clause exceeds ``max_len`` (sending it as-is and surfacing the + server's 414 is clearer than silently dropping data). + """ + if len(expr) <= max_len: + return [expr] + parts = _split_top_level_or(expr) + if len(parts) < 2 or any(len(p) > max_len for p in parts): + return [expr] + + chunks = [] + current: list[str] = [] + current_len = 0 + for part in parts: + join_cost = len(" OR ") if current else 0 + if current and current_len + join_cost + len(part) > max_len: + chunks.append(" OR ".join(current)) + current = [part] + current_len = len(part) + else: + current.append(part) + current_len += join_cost + len(part) + if current: + chunks.append(" OR ".join(current)) + return chunks + + +def _effective_filter_budget( + args: dict[str, Any], + filter_expr: str, + build_request: Callable[..., Any], +) -> int: + """Raw-CQL byte budget that, after URL-encoding, fits the URL byte limit. + + The server caps total URL length, not raw CQL length. We probe the + non-filter URL bytes by building the request with a 1-byte placeholder + filter, subtract from the URL limit to get the bytes available for the + encoded filter, then convert back to raw CQL bytes via the *maximum* + per-clause encoding ratio (a chunk could contain only the heavier-encoding + clauses, so budgeting by the average ratio could overflow). + """ + # Fast path: encoded filter clearly fits with room for any plausible + # non-filter URL. Skips the PreparedRequest build and splitter scan. 
+ encoded_len = len(quote_plus(filter_expr)) + if encoded_len + _NON_FILTER_URL_HEADROOM <= _WATERDATA_URL_BYTE_LIMIT: + return len(filter_expr) + 1 + + probe = build_request(**{**args, "filter": "x"}) + available_url_bytes = _WATERDATA_URL_BYTE_LIMIT - (len(probe.url) - 1) + if available_url_bytes <= 0: + # Non-filter URL already over the limit. Pass through unchanged so + # the caller sees one 414 instead of N parallel sub-request failures. + return len(filter_expr) + 1 + parts = _split_top_level_or(filter_expr) or [filter_expr] + encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts) + return max(100, int(available_url_bytes / encoding_ratio)) + + +def _check_numeric_filter_pitfall(filter_expr: str) -> None: + """Raise if the filter pairs a field with an unquoted numeric literal. + + Every queryable on the Water Data OGC API is typed as a string, including + fields whose *values* look numeric (``value``, ``parameter_code`` like + ``'00060'``, ``statistic_id`` like ``'00011'``, ``district_code``, + ``hydrologic_unit_code``, ``channel_flow``). Any unquoted numeric + comparison — ``value >= 1000``, ``parameter_code = 60``, + ``parameter_code IN (60, 61)``, ``value BETWEEN 5 AND 10`` — either gets + rejected with HTTP 500 or silently produces lexicographic results; + zero-padded codes are the worst case (``parameter_code = '60'`` matches + nothing because the real codes are ``'00060'``-shaped). + + Quoted literals (``value >= '1000'``) are not flagged — the caller has + signalled they know the column is textual. + """ + # Mask quoted strings so ``name = 'value > 5'`` doesn't false-positive. + masked = ( + _QUOTED_STR_RE.sub("''", filter_expr) if "'" in filter_expr else filter_expr + ) + + def fail(field: str, offense: str) -> None: + raise ValueError( + f"Filter uses an unquoted numeric comparison against {field!r} " + f"(``{offense}``). 
Every queryable on the Water Data API is " + f"typed as a string, so the server rejects unquoted numeric " + f"literals with HTTP 500; even quoting the literal gives a " + f"lexicographic comparison (``value > '10'`` matches " + f"``value='34.52'``, ``parameter_code = '60'`` matches nothing " + f"because the real codes are ``'00060'``-shaped). For a true " + f"numeric filter, fetch a wider result and reduce in pandas." + ) + + compare = _NUMERIC_COMPARE_RE.search(masked) + if compare: + field = compare.group("field1") or compare.group("field2") + op = compare.group("op1") or compare.group("op2") + num = compare.group("num1") or compare.group("num2") + fail(field, f"{field} {op} {num}") + + membership = _IN_NUMERIC_RE.search(masked) + if membership: + field = membership.group("field") + op = "NOT IN" if membership.group("negated") else "IN" + fail(field, f"{field} {op} (…)") + + between = _BETWEEN_NUMERIC_RE.search(masked) + if between: + field = between.group("field") + op = "NOT BETWEEN" if between.group("negated") else "BETWEEN" + fail(field, f"{field} {op} …") + + +def _is_chunkable(filter_expr: Any, filter_lang: Any) -> bool: + """Only non-empty cql-text filters can be safely split at top-level OR.""" + return ( + isinstance(filter_expr, str) + and bool(filter_expr) + and filter_lang in {None, "cql-text"} + ) + + +def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: + """Concatenate per-chunk frames, dropping empties and deduping by ``id``. + + ``_get_resp_data`` returns a plain ``pd.DataFrame()`` on empty responses; + concat'ing it with real GeoDataFrames downgrades the result to plain + DataFrame and strips geometry/CRS, so empties are dropped first. Dedup + on the pre-rename feature ``id`` keeps overlapping user OR-clauses from + producing duplicate rows across chunks. 
+ """ + non_empty = [f for f in frames if not f.empty] + if not non_empty: + return pd.DataFrame() + if len(non_empty) == 1: + return non_empty[0] + combined = pd.concat(non_empty, ignore_index=True) + if "id" in combined.columns: + combined = combined.drop_duplicates(subset="id", ignore_index=True) + return combined + + +def _combine_chunk_responses( + responses: list[requests.Response], +) -> requests.Response: + """Return one response: first chunk's URL/headers + summed ``elapsed``. + + Mutates the first response in place (only ``elapsed``); downstream only + reads ``elapsed`` (in ``BaseMetadata.query_time``), URL, and headers. + """ + head = responses[0] + if len(responses) > 1: + head.elapsed = sum((r.elapsed for r in responses[1:]), start=head.elapsed) + return head + + +_FetchOnce = TypeVar( + "_FetchOnce", + bound=Callable[[dict[str, Any]], tuple[pd.DataFrame, requests.Response]], +) + + +def chunked(*, build_request: Callable[..., Any]) -> Callable[[_FetchOnce], _FetchOnce]: + """Decorator that adds CQL-filter chunking to a single-request fetch. + + The wrapped function has signature ``(args: dict) -> (frame, response)`` + and represents one HTTP round-trip. The decorator inspects ``args``: + + - No chunkable filter: pass through unchanged. + - Chunkable cql-text filter: run the lexicographic-pitfall guard, split + into URL-length-safe sub-expressions, call the wrapped function once + per chunk, concatenate frames (drop empties, dedup by feature ``id``), + and return an aggregated response (first chunk's URL/headers, summed + ``elapsed``). + + Either way the return shape matches the undecorated function's, so the + caller wraps the response in ``BaseMetadata`` the same way in both paths. + + ``build_request`` is injected so the decorator can probe URL length + without importing any specific HTTP builder; it receives the same kwargs + the wrapped function's ``args`` would and returns a prepared-request-like + object with a ``.url`` attribute. 
+ """ + + def decorator(fetch_once: _FetchOnce) -> _FetchOnce: + @functools.wraps(fetch_once) + def wrapper( + args: dict[str, Any], + ) -> tuple[pd.DataFrame, requests.Response]: + filter_expr = args.get("filter") + if not _is_chunkable(filter_expr, args.get("filter_lang")): + return fetch_once(args) + + _check_numeric_filter_pitfall(filter_expr) + budget = _effective_filter_budget(args, filter_expr, build_request) + chunks = _chunk_cql_or(filter_expr, max_len=budget) + + frames: list[pd.DataFrame] = [] + responses: list[requests.Response] = [] + for chunk in chunks: + frame, response = fetch_once({**args, "filter": chunk}) + frames.append(frame) + responses.append(response) + + return _combine_chunk_frames(frames), _combine_chunk_responses(responses) + + return wrapper # type: ignore[return-value] + + return decorator diff --git a/dataretrieval/waterdata/nearest.py b/dataretrieval/waterdata/nearest.py new file mode 100644 index 0000000..2948487 --- /dev/null +++ b/dataretrieval/waterdata/nearest.py @@ -0,0 +1,241 @@ +"""``get_nearest_continuous``: nearest-timestamp convenience on top of +``get_continuous``. Built on the CQL ``filter`` passthrough; only +``get_nearest_continuous`` is public — everything else is package-private. +""" + +from __future__ import annotations + +from typing import Literal, get_args + +import pandas as pd + +from dataretrieval.utils import BaseMetadata +from dataretrieval.waterdata.api import get_continuous + +OnTie = Literal["first", "last", "mean"] +_VALID_ON_TIE: tuple[OnTie, ...] = get_args(OnTie) + + +def get_nearest_continuous( + targets, + monitoring_location_id: str | list[str] | None = None, + parameter_code: str | list[str] | None = None, + *, + window: str | pd.Timedelta = "PT7M30S", + on_tie: OnTie = "first", + **kwargs, +) -> tuple[pd.DataFrame, BaseMetadata]: + """For each target timestamp, return the nearest continuous observation. 
+ + Builds one bracketed ``(time >= t-window AND time <= t+window)`` clause + per target, joins them as a top-level CQL ``OR`` filter, and lets + ``get_continuous`` (with its auto-chunking) fetch every observation + that falls in any window. Then, per ``(monitoring_location_id, target)`` + pair, picks the single observation with the smallest ``|time - target|``. + + The USGS continuous endpoint matches ``time`` parameters exactly rather + than fuzzily, and it does not implement ``sortby`` for arbitrary fields; + this function is the single-round-trip way to ask "what reading is + nearest this timestamp?" for many timestamps at once. + + Parameters + ---------- + targets : list-like of datetime-convertible + Target timestamps. Naive datetimes are treated as UTC. Accepts a + list, ``pandas.Series``, ``pandas.DatetimeIndex``, ``numpy.ndarray``, + or anything ``pandas.to_datetime`` consumes. + monitoring_location_id : string or list of strings, optional + Forwarded to ``get_continuous``. + parameter_code : string or list of strings, optional + Forwarded to ``get_continuous``. + window : string or ``pandas.Timedelta``, default ``"PT7M30S"`` + Half-window around each target, as an ISO 8601 duration + (``"PT7M30S"``, ``"PT15M"``, ``"PT1H"``, etc.). Also accepts + any other form ``pandas.Timedelta`` parses — ``HH:MM:SS`` + (``"00:07:30"``), pandas shorthand (``"7min30s"``, + ``"450s"``), or a ``pd.Timedelta`` directly. See the + `pandas.Timedelta docs + `_ + for the full grammar. + + Must be small enough that every target's window captures + roughly one observation at the service cadence. The default + matches a 15-minute continuous gauge; widen (e.g. + ``"PT15M"``) for irregular cadences or resilience to data + gaps. + on_tie : {"first", "last", "mean"}, default ``"first"`` + How to resolve ties when two observations are exactly equidistant + from a target (which happens when the target falls at the midpoint + between grid points — e.g. 
target ``10:22:30`` for a 15-minute + gauge). + + - ``"first"``: keep the earlier observation. + - ``"last"``: keep the later observation. + - ``"mean"``: average numeric columns; set the ``time`` column to + the target, since no real observation exists at the midpoint. + + **kwargs + Additional keyword arguments forwarded to ``get_continuous`` + (e.g. ``statistic_id``, ``approval_status``, ``properties``). + Passing ``time``, ``filter``, or ``filter_lang`` raises + ``TypeError`` — this function builds those itself. + + Returns + ------- + df : ``pandas.DataFrame`` + One row per ``(target, monitoring_location_id)`` combination that + had at least one observation in its window. Rows are augmented + with a ``target_time`` column indicating which target they + correspond to. Targets with no observations in their window are + silently dropped. + md : :class:`~dataretrieval.utils.BaseMetadata` + Metadata from the underlying ``get_continuous`` call. + + Notes + ----- + *Window sizing and ties.* When ``window`` is exactly half the service + cadence, most targets' windows contain a single observation and + ``on_tie`` is moot. Ties arise only when a target sits exactly at the + window edge — rare in practice but possible. Setting ``window`` to a + full cadence (or larger) guarantees at least one observation per + target in steady state at the cost of more bytes per response. + + *Why windowed CQL rather than sort+limit.* The API's advertised + ``sortby`` parameter would make this a one-liner per target (``filter`` + by ``time <= t`` and ``limit 1``), but it is per-query — you would need + one HTTP round-trip per target. The CQL ``OR``-chain approach folds + all N targets into one request (auto-chunked when the URL is long). + + Examples + -------- + .. code:: + + >>> import pandas as pd + >>> from dataretrieval import waterdata + + >>> # Pair three off-grid timestamps with nearby observations + >>> targets = pd.to_datetime( + ... [ + ... "2023-06-15T10:30:31Z", + ... 
"2023-06-15T14:07:12Z", + ... "2023-06-16T03:45:19Z", + ... ] + ... ) + >>> df, md = waterdata.get_nearest_continuous( + ... targets, + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... ) + + >>> # Widen the window for an irregular-cadence gauge + >>> df, md = waterdata.get_nearest_continuous( + ... targets, + ... monitoring_location_id="USGS-02238500", + ... parameter_code="00060", + ... window="PT30M", + ... on_tie="mean", + ... ) + """ + _check_nearest_kwargs(kwargs, on_tie) + targets = pd.DatetimeIndex(pd.to_datetime(targets, utc=True)) + window_td = pd.Timedelta(window) + + if len(targets) == 0: + raise ValueError("targets must contain at least one timestamp") + + filter_expr = _build_window_or_filter(targets, window_td) + df, md = get_continuous( + monitoring_location_id=monitoring_location_id, + parameter_code=parameter_code, + filter=filter_expr, + filter_lang="cql-text", + **kwargs, + ) + if df.empty: + return _empty_nearest_result(df), md + + df = df.assign(time=pd.to_datetime(df["time"], utc=True)) + site_groups = ( + df.groupby("monitoring_location_id", sort=False) + if "monitoring_location_id" in df.columns + else [(None, df)] + ) + + selected = [ + row + for _, site_df in site_groups + for target in targets + if (row := _pick_nearest_row(site_df, target, window_td, on_tie)) is not None + ] + if not selected: + return _empty_nearest_result(df), md + return pd.DataFrame(selected).reset_index(drop=True), md + + +def _check_nearest_kwargs(kwargs: dict, on_tie: OnTie) -> None: + """Reject kwargs the helper owns; validate ``on_tie``.""" + for forbidden in ("time", "filter", "filter_lang"): + if forbidden in kwargs: + raise TypeError( + f"get_nearest_continuous constructs its own {forbidden!r}; " + "do not pass it directly" + ) + if on_tie not in _VALID_ON_TIE: + raise ValueError(f"on_tie must be one of {_VALID_ON_TIE}; got {on_tie!r}") + + +def _build_window_or_filter(targets: pd.DatetimeIndex, window_td: pd.Timedelta) -> str: 
+ """Build the CQL OR-chain of ``time >= ... AND time <= ...`` windows. + + ``get_continuous`` auto-chunks the result if the full URL would + exceed the server's length limit, so this is always safe to build + as one string even for many targets. + """ + fmt = "%Y-%m-%dT%H:%M:%SZ" + lowers = (targets - window_td).strftime(fmt) + uppers = (targets + window_td).strftime(fmt) + return " OR ".join( + f"(time >= '{lo}' AND time <= '{up}')" for lo, up in zip(lowers, uppers) + ) + + +def _pick_nearest_row( + site_df: pd.DataFrame, + target: pd.Timestamp, + window_td: pd.Timedelta, + on_tie: OnTie, +) -> pd.Series | None: + """Return the single row within ``window_td`` of ``target``, or ``None``. + + Resolves ties (two rows equidistant from ``target``) per ``on_tie``. + The returned row carries a ``target_time`` column identifying which + target it was selected for. + """ + in_window = site_df[ + (site_df["time"] >= target - window_td) + & (site_df["time"] <= target + window_td) + ] + if in_window.empty: + return None + deltas = (in_window["time"] - target).abs() + candidates = in_window[deltas == deltas.min()].sort_values("time") + + if len(candidates) == 1 or on_tie == "first": + row = candidates.iloc[0].copy() + elif on_tie == "last": + row = candidates.iloc[-1].copy() + else: # "mean" — average numeric cols, set time to the target. 
+ row = candidates.iloc[0].copy() + for col in candidates.select_dtypes("number").columns: + row[col] = candidates[col].mean() + row["time"] = target + + row["target_time"] = target + return row + + +def _empty_nearest_result(template: pd.DataFrame) -> pd.DataFrame: + """Empty frame matching ``template``'s columns plus a ``target_time``.""" + base = template.iloc[0:0].copy() + base["target_time"] = pd.Series(dtype="datetime64[ns, UTC]") + return base diff --git a/dataretrieval/waterdata/types.py b/dataretrieval/waterdata/types.py index c965571..f5e1496 100644 --- a/dataretrieval/waterdata/types.py +++ b/dataretrieval/waterdata/types.py @@ -40,8 +40,6 @@ "results", ] -FILTER_LANG = Literal["cql-text", "cql-json"] - PROFILES = Literal[ "actgroup", "actmetric", diff --git a/dataretrieval/waterdata/utils.py b/dataretrieval/waterdata/utils.py index ec83774..7070da5 100644 --- a/dataretrieval/waterdata/utils.py +++ b/dataretrieval/waterdata/utils.py @@ -4,10 +4,8 @@ import logging import os import re -from collections.abc import Iterator from datetime import datetime from typing import Any, get_args -from urllib.parse import quote_plus import pandas as pd import requests @@ -15,6 +13,7 @@ from dataretrieval import __version__ from dataretrieval.utils import BaseMetadata +from dataretrieval.waterdata import filters from dataretrieval.waterdata.types import ( PROFILE_LOOKUP, PROFILES, @@ -226,165 +225,6 @@ def _format_api_dates( raise ValueError("datetime_input should only include 1-2 values") -# Conservative fallback budget (characters) for a single CQL ``filter`` -# query parameter, used when the caller invokes ``_chunk_cql_or`` without -# a ``max_len``. ``get_ogc_data`` computes a tighter per-request budget -# from ``_WATERDATA_URL_BYTE_LIMIT`` below. -_CQL_FILTER_CHUNK_LEN = 5000 - -# Total URL byte limit the Water Data API will accept before replying -# HTTP 414 (Request-URI Too Large). 
Empirically the cliff sits at -# ~8,200 bytes of full URL, which lines up with nginx's default -# ``large_client_header_buffers`` of 8 KB (8192). 8000 leaves ~200 bytes -# of headroom for request-line framing ("GET ... HTTP/1.1\r\n") and any -# intermediate proxy variance. -_WATERDATA_URL_BYTE_LIMIT = 8000 - -# Conservative over-estimate of the URL bytes consumed by everything -# *except* the filter value — the base URL, other query params, and the -# ``&filter=`` / ``&filter-lang=...`` keys. Used only to decide whether a -# filter is small enough that the expensive budget probe can be skipped. -_NON_FILTER_URL_HEADROOM = 1000 - - -def _iter_or_boundaries(expr: str) -> Iterator[tuple[int, int]]: - """Yield ``(start, end)`` spans of each top-level ``OR`` separator. - - Tracks single/double-quoted string literals and parenthesized - sub-expressions so that ``OR`` tokens inside them are skipped. - Matching is case-insensitive and the yielded span covers the - surrounding whitespace on both sides. - """ - depth = 0 - in_quote = None - i = 0 - n = len(expr) - while i < n: - ch = expr[i] - if in_quote is not None: - if ch == in_quote: - in_quote = None - i += 1 - continue - if ch in ("'", '"'): - in_quote = ch - i += 1 - continue - if ch == "(": - depth += 1 - i += 1 - continue - if ch == ")": - depth -= 1 - i += 1 - continue - if depth == 0 and ch.isspace(): - j = i + 1 - while j < n and expr[j].isspace(): - j += 1 - if j + 2 <= n and expr[j : j + 2].lower() == "or": - k = j + 2 - if k < n and expr[k].isspace(): - m = k + 1 - while m < n and expr[m].isspace(): - m += 1 - yield i, m - i = m - continue - i += 1 - - -def _split_top_level_or(expr: str) -> list[str]: - """Split a CQL expression at each top-level ``OR`` separator. - - Respects parentheses and single/double-quoted string literals so that - ``OR`` tokens inside ``(A OR B)`` or ``'word OR word'`` are left alone. - Matching is case-insensitive. 
Whitespace around each emitted part is - stripped; empty parts are dropped. - """ - parts = [] - last = 0 - for start, end in _iter_or_boundaries(expr): - parts.append(expr[last:start].strip()) - last = end - parts.append(expr[last:].strip()) - return [p for p in parts if p] - - -def _chunk_cql_or(expr: str, max_len: int = _CQL_FILTER_CHUNK_LEN) -> list[str]: - """Split a CQL expression into OR-chunks that each fit under ``max_len``. - - The splitter only understands top-level ``OR`` chains, since that is - the only shape that can be recombined losslessly as a disjunction of - independent sub-queries. Returns ``[expr]`` unchanged when the whole - expression already fits, when it contains no top-level ``OR``, or when - any single clause is larger than ``max_len`` on its own (we would - rather send a too-long request and surface the server's 414 than - silently drop data). - """ - if len(expr) <= max_len: - return [expr] - parts = _split_top_level_or(expr) - if len(parts) < 2 or any(len(p) > max_len for p in parts): - return [expr] - - chunks = [] - current = [] - current_len = 0 - for part in parts: - join_cost = len(" OR ") if current else 0 - if current and current_len + join_cost + len(part) > max_len: - chunks.append(" OR ".join(current)) - current = [part] - current_len = len(part) - else: - current.append(part) - current_len += join_cost + len(part) - if current: - chunks.append(" OR ".join(current)) - return chunks - - -def _effective_filter_budget(args: dict[str, Any], filter_expr: str) -> int: - """Compute the raw CQL byte budget for ``filter_expr`` in this request. - - The server limits total URL length (see ``_WATERDATA_URL_BYTE_LIMIT``), - not raw CQL length. To derive a raw-byte budget we can hand to - ``_chunk_cql_or``: - - 1. Probe the URL space consumed by the other query params by building - the request with a 1-byte placeholder filter. - 2. Subtract from the URL limit to get the bytes available for the - encoded filter value. - 3. 
Convert back to raw CQL bytes using the *maximum* per-clause - encoding ratio, not the whole-filter average. A chunk can end up - containing only the heavier-encoding clauses (e.g. heavy ones - clustered at one end of the filter), so budgeting against the - average lets such a chunk overflow the URL limit by a few bytes. - """ - # Fast path: if the whole encoded filter already fits with room for - # any plausible non-filter URL overhead, skip the probe and the - # splitter entirely. Signals pass-through via a budget larger than - # the filter. Saves a PreparedRequest build + a full splitter scan - # on every short-filter call. - encoded_len = len(quote_plus(filter_expr)) - if encoded_len + _NON_FILTER_URL_HEADROOM <= _WATERDATA_URL_BYTE_LIMIT: - return len(filter_expr) + 1 - - probe = _construct_api_requests(**{**args, "filter": "x"}) - non_filter_url_bytes = len(probe.url) - 1 - available_url_bytes = _WATERDATA_URL_BYTE_LIMIT - non_filter_url_bytes - if available_url_bytes <= 0: - # The non-filter URL already exceeds the byte limit, so no chunk - # we could produce would fit. Return a budget larger than the - # filter so _chunk_cql_or passes it through unchanged — one 414 - # from the server is clearer than a burst of N failing sub-requests. - return len(filter_expr) + 1 - parts = _split_top_level_or(filter_expr) or [filter_expr] - encoding_ratio = max(len(quote_plus(p)) / len(p) for p in parts if p) - return max(100, int(available_url_bytes / encoding_ratio)) - - def _cql2_param(args: dict[str, Any]) -> str: """ Convert query parameters to CQL2 JSON format for POST requests. 
@@ -995,96 +835,30 @@ def get_ogc_data( convert_type = args.pop("convert_type", False) args = {k: v for k, v in args.items() if v is not None} - chunks = _plan_filter_chunks(args) - frames, responses = _fetch_chunks(args, chunks) - - return_list = _combine_chunk_frames(frames) + return_list, response = _fetch_once(args) return_list = _deal_with_empty(return_list, properties, service) if convert_type: return_list = _type_cols(return_list) return_list = _arrange_cols(return_list, properties, output_id) return_list = _sort_rows(return_list) - return return_list, _aggregate_response_metadata(responses) + return return_list, BaseMetadata(response) -def _plan_filter_chunks(args: dict[str, Any]) -> list[str | None]: - """Decide how to fan ``args["filter"]`` out across HTTP calls. +@filters.chunked(build_request=_construct_api_requests) +def _fetch_once( + args: dict[str, Any], +) -> tuple[pd.DataFrame, requests.Response]: + """Send one prepared-args OGC request; return the frame + response. - Returns one entry per request to send. A ``None`` entry means "send - ``args`` as-is" — either there is no filter, or the filter language - is not one we can safely split (only cql-text top-level ``OR`` - chains are chunkable). Otherwise each string entry is a chunked - cql-text expression that replaces ``args["filter"]`` for its - sub-request. Overlapping user OR-clauses are deduplicated by feature - id later in ``_combine_chunk_frames``. + Filter chunking is added orthogonally by the ``@filters.chunked`` + decorator: with no filter (or an un-chunkable one) the decorator + passes ``args`` through to this body; with a chunkable filter it + fans out and calls this body once per sub-filter, then combines. + Either way the return shape is ``(frame, response)``. 
""" - filter_expr = args.get("filter") - filter_lang = args.get("filter_lang") - chunkable = ( - isinstance(filter_expr, str) - and filter_expr - and filter_lang in {None, "cql-text"} - ) - if not chunkable: - return [None] - raw_budget = _effective_filter_budget(args, filter_expr) - return _chunk_cql_or(filter_expr, max_len=raw_budget) - - -def _fetch_chunks( - args: dict[str, Any], chunks: list[str | None] -) -> tuple[list[pd.DataFrame], list[requests.Response]]: - """Send one request per chunk; return the per-chunk frames and responses.""" - frames: list[pd.DataFrame] = [] - responses: list[requests.Response] = [] - for chunk in chunks: - chunk_args = args if chunk is None else {**args, "filter": chunk} - req = _construct_api_requests(**chunk_args) - frame, response = _walk_pages(geopd=GEOPANDAS, req=req) - frames.append(frame) - responses.append(response) - return frames, responses - - -def _combine_chunk_frames(frames: list[pd.DataFrame]) -> pd.DataFrame: - """Concatenate per-chunk frames, handling the edge cases. - - Drops empty frames before concat — ``_get_resp_data`` returns a - plain ``pd.DataFrame()`` on empty responses, which would downgrade - a concat of real GeoDataFrames back to a plain DataFrame and strip - geometry/CRS. Also dedups on the pre-rename feature ``id`` so - overlapping user-supplied OR-clauses don't produce duplicate rows - across chunks. - """ - non_empty = [f for f in frames if not f.empty] - if not non_empty: - return pd.DataFrame() - if len(non_empty) == 1: - return non_empty[0] - combined = pd.concat(non_empty, ignore_index=True) - if "id" in combined.columns: - combined = combined.drop_duplicates(subset="id", ignore_index=True) - return combined - - -def _aggregate_response_metadata( - responses: list[requests.Response], -) -> BaseMetadata: - """Build metadata from the first response, summing elapsed across chunks. - - The first response's URL and headers are the representative ones to - return. 
When the filter was fanned across multiple chunks, replace - its elapsed with the sum so ``query_time`` reflects the full - operation rather than just the first sub-request. - """ - metadata_response = responses[0] - if len(responses) > 1: - metadata_response.elapsed = sum( - (r.elapsed for r in responses[1:]), - start=metadata_response.elapsed, - ) - return BaseMetadata(metadata_response) + req = _construct_api_requests(**args) + return _walk_pages(geopd=GEOPANDAS, req=req) def _handle_stats_nesting( diff --git a/tests/waterdata_filters_test.py b/tests/waterdata_filters_test.py new file mode 100644 index 0000000..21eb6c1 --- /dev/null +++ b/tests/waterdata_filters_test.py @@ -0,0 +1,589 @@ +from datetime import timedelta +from types import SimpleNamespace +from unittest import mock +from urllib.parse import parse_qs, urlsplit + +import pandas as pd +import pytest + +from dataretrieval.waterdata.filters import ( + _CQL_FILTER_CHUNK_LEN, + _WATERDATA_URL_BYTE_LIMIT, + _check_numeric_filter_pitfall, + _chunk_cql_or, + _effective_filter_budget, + _split_top_level_or, +) +from dataretrieval.waterdata.utils import _construct_api_requests + + +def _query_params(prepared_request): + return parse_qs(urlsplit(prepared_request.url).query) + + +def _fake_prepared_request(url="https://example.test"): + """Stand-in for the object ``_construct_api_requests`` returns.""" + return SimpleNamespace(url=url, method="GET", headers={}) + + +def _fake_response(url="https://example.test", elapsed_ms=1): + """Stand-in for the response object ``_walk_pages`` returns.""" + return SimpleNamespace( + url=url, + elapsed=timedelta(milliseconds=elapsed_ms), + headers={}, + ) + + +def _build_request(**kwargs): + """Wrapper that matches the ``build_request`` callable shape.""" + return _construct_api_requests(**kwargs) + + +def test_construct_filter_passthrough(): + """`filter` is forwarded verbatim as a query parameter.""" + expr = ( + "(time >= '2023-01-06T16:00:00Z' AND time <= 
'2023-01-06T18:00:00Z') " + "OR (time >= '2023-01-10T18:00:00Z' AND time <= '2023-01-10T20:00:00Z')" + ) + req = _construct_api_requests( + service="continuous", + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + ) + qs = _query_params(req) + assert qs["filter"] == [expr] + + +def test_construct_filter_lang_hyphenated(): + """The Python kwarg `filter_lang` is sent as URL key `filter-lang`.""" + req = _construct_api_requests( + service="continuous", + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter="time >= '2023-01-01T00:00:00Z'", + filter_lang="cql-text", + ) + qs = _query_params(req) + assert qs["filter-lang"] == ["cql-text"] + assert "filter_lang" not in qs + + +def test_split_top_level_or_simple(): + parts = _split_top_level_or("A OR B OR C") + assert parts == ["A", "B", "C"] + + +def test_split_top_level_or_case_insensitive(): + assert _split_top_level_or("A or B Or C") == ["A", "B", "C"] + + +def test_split_top_level_or_respects_parens(): + assert _split_top_level_or("(A OR B) OR (C OR D)") == ["(A OR B)", "(C OR D)"] + + +def test_split_top_level_or_respects_quotes(): + expr = "name = 'foo OR bar' OR id = 1" + assert _split_top_level_or(expr) == ["name = 'foo OR bar'", "id = 1"] + + +def test_split_top_level_or_handles_doubled_quote_escape(): + """CQL text escapes a single quote inside a literal as ``''``. The + two quotes are adjacent, so the scanner's naive toggle-on-quote logic + happens to land back in the correct state with nothing between the + toggles to misclassify. 
Lock that behavior in so a future refactor + can't regress it.""" + cases = [ + ("name = 'O''Reilly OR Co' OR id = 1", ["name = 'O''Reilly OR Co'", "id = 1"]), + ("name = 'It''s' OR id = 1", ["name = 'It''s'", "id = 1"]), + ( + "name = 'alpha ''or'' beta' OR id = 1", + ["name = 'alpha ''or'' beta'", "id = 1"], + ), + ("'x'' OR ''y' OR id = 1", ["'x'' OR ''y'", "id = 1"]), + ] + for expr, expected in cases: + assert _split_top_level_or(expr) == expected, expr + + +def test_split_top_level_or_single_clause(): + assert _split_top_level_or("time >= '2023-01-01T00:00:00Z'") == [ + "time >= '2023-01-01T00:00:00Z'" + ] + + +def test_chunk_cql_or_short_passthrough(): + expr = "time >= '2023-01-01T00:00:00Z'" + assert _chunk_cql_or(expr, max_len=1000) == [expr] + + +def test_chunk_cql_or_splits_into_multiple(): + clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" + expr = " OR ".join([clause] * 200) + chunks = _chunk_cql_or(expr, max_len=1000) + # each chunk must be under the budget + assert all(len(c) <= 1000 for c in chunks) + # rejoined chunks must cover every clause + rejoined_clauses = sum(len(c.split(" OR ")) for c in chunks) + assert rejoined_clauses == 200 + # and must be a valid OR chain (each chunk is itself a top-level OR of clauses) + assert len(chunks) > 1 + + +def test_chunk_cql_or_unsplittable_returns_input(): + big = "value > 0 AND " + ("A " * 4000) + assert _chunk_cql_or(big, max_len=1000) == [big] + + +def test_chunk_cql_or_single_clause_over_budget_returns_input(): + huge_clause = "(value > " + "9" * 6000 + ")" + expr = f"{huge_clause} OR (value > 0)" + assert _chunk_cql_or(expr, max_len=1000) == [expr] + + +@pytest.mark.parametrize( + "service", + [ + "daily", + "continuous", + "monitoring-locations", + "time-series-metadata", + "latest-continuous", + "latest-daily", + "field-measurements", + "channel-measurements", + ], +) +def test_construct_filter_on_all_ogc_services(service): + """Filter passthrough works uniformly for 
every OGC collection endpoint.""" + req = _construct_api_requests( + service=service, + filter="value > 0", + filter_lang="cql-text", + ) + qs = _query_params(req) + assert qs["filter"] == ["value > 0"] + assert qs["filter-lang"] == ["cql-text"] + + +def test_long_filter_fans_out_into_multiple_requests(): + """An oversized top-level OR filter triggers multiple HTTP requests + whose results are concatenated.""" + from dataretrieval.waterdata import get_continuous + + clause = ( + "(time >= '2023-01-{day:02d}T00:00:00Z' " + "AND time <= '2023-01-{day:02d}T00:30:00Z')" + ) + expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) + assert len(expr) > _CQL_FILTER_CHUNK_LEN + + sent_filters = [] + + def fake_construct_api_requests(**kwargs): + sent_filters.append(kwargs.get("filter")) + return _fake_prepared_request() + + def fake_walk_pages(*_args, **_kwargs): + idx = len(sent_filters) + frame = pd.DataFrame({"id": [f"chunk-{idx}"], "value": [idx]}) + return frame, _fake_response() + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + side_effect=fake_construct_api_requests, + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ), mock.patch( + "dataretrieval.waterdata.filters._effective_filter_budget", + return_value=_CQL_FILTER_CHUNK_LEN, + ): + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) + + # Mocking _effective_filter_budget bypasses the URL-length probe, so + # sent_filters contains only real chunk requests. Assert invariants: + # chunking happened, every original clause is preserved exactly once + # in order, each chunk stays under the budget, and the mock's + # one-row-per-chunk responses concatenate to a row per chunk. 
+ expected_parts = _split_top_level_or(expr) + assert len(sent_filters) > 1 + rejoined_parts = [] + for chunk in sent_filters: + rejoined_parts.extend(_split_top_level_or(chunk)) + assert rejoined_parts == expected_parts + assert len(df) == len(sent_filters) + assert all(len(chunk) <= _CQL_FILTER_CHUNK_LEN for chunk in sent_filters) + + +def test_long_filter_deduplicates_cross_chunk_overlap(): + """Features returned by multiple chunks (same feature `id`) are + deduplicated in the concatenated result.""" + from dataretrieval.waterdata import get_continuous + + clause = ( + "(time >= '2023-01-{day:02d}T00:00:00Z' " + "AND time <= '2023-01-{day:02d}T00:30:00Z')" + ) + expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) + + call_count = {"n": 0} + + def fake_walk_pages(*_args, **_kwargs): + call_count["n"] += 1 + frame = pd.DataFrame({"id": ["shared-feature"], "value": [1]}) + return frame, _fake_response() + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + return_value=_fake_prepared_request(), + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ), mock.patch( + "dataretrieval.waterdata.filters._effective_filter_budget", + return_value=_CQL_FILTER_CHUNK_LEN, + ): + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) + + # Chunking must have happened (otherwise dedup wouldn't be exercised). + assert call_count["n"] > 1 + # Even though each chunk returned a feature, dedup by id collapses them. + assert len(df) == 1 + + +def test_empty_chunks_do_not_downgrade_geodataframe(): + """A mix of empty and non-empty chunk responses must not downgrade a + GeoDataFrame-typed result to a plain DataFrame. 
``_get_resp_data`` + returns ``pd.DataFrame()`` on empty responses, which would otherwise + strip geometry/CRS from the concatenated output.""" + pytest.importorskip("geopandas") + import geopandas as gpd + from shapely.geometry import Point + + from dataretrieval.waterdata import get_continuous + + clause = ( + "(time >= '2023-01-{day:02d}T00:00:00Z' " + "AND time <= '2023-01-{day:02d}T00:30:00Z')" + ) + expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) + + call_count = {"n": 0} + + def fake_walk_pages(*_args, **_kwargs): + call_count["n"] += 1 + # Chunk 2 returns empty; chunks 1 and 3 return GeoDataFrames. + if call_count["n"] == 2: + frame = pd.DataFrame() + else: + frame = gpd.GeoDataFrame( + {"id": [f"feat-{call_count['n']}"], "value": [call_count["n"]]}, + geometry=[Point(call_count["n"], call_count["n"])], + crs="EPSG:4326", + ) + return frame, _fake_response() + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + return_value=_fake_prepared_request(), + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages + ), mock.patch( + "dataretrieval.waterdata.filters._effective_filter_budget", + return_value=_CQL_FILTER_CHUNK_LEN, + ): + df, _ = get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-text", + ) + + # The empty chunk must not have stripped the GeoDataFrame type. 
+ assert isinstance(df, gpd.GeoDataFrame) + assert "geometry" in df.columns + assert df.crs is not None + + +def test_effective_filter_budget_respects_url_limit(): + """The computed budget, once encoded, fits within the URL byte limit + alongside the other query params.""" + from urllib.parse import quote_plus + + filter_expr = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + args = { + "service": "continuous", + "monitoring_location_id": "USGS-02238500", + "parameter_code": "00060", + "filter": filter_expr, + "filter_lang": "cql-text", + } + raw_budget = _effective_filter_budget(args, filter_expr, _build_request) + + # Build a chunk exactly at the raw budget (padded with the clause repeated) + # and confirm the full URL it produces stays under the URL byte limit. + padded = (" OR ".join([filter_expr] * 200))[:raw_budget] + req = _construct_api_requests(**{**args, "filter": padded}) + assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT + # And the budget scales inversely with encoding ratio (sanity). + assert raw_budget < _WATERDATA_URL_BYTE_LIMIT + # Quick sanity on the encoding math itself. + assert len(quote_plus(padded)) <= _WATERDATA_URL_BYTE_LIMIT + + +def test_effective_filter_budget_uses_max_clause_ratio(): + """Heavy clauses clustered in one part of the filter must not be able + to push any chunk over the URL limit. The budget is computed against + the max per-clause encoding ratio, not the whole-filter average, so + a chunk of only-heaviest-clauses still fits.""" + from urllib.parse import quote_plus + + heavy = ( + "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z' " + "AND approval_status IN ('Approved','Provisional','Revised'))" + ) + light = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + # Heavy ratio < light ratio for these shapes; cluster them at opposite + # ends so the chunker must produce at least one light-only chunk. 
+ clauses = [heavy] * 100 + [light] * 400 + expr = " OR ".join(clauses) + args = { + "service": "continuous", + "monitoring_location_id": "USGS-02238500", + "filter": expr, + "filter_lang": "cql-text", + } + budget = _effective_filter_budget(args, expr, _build_request) + chunks = _chunk_cql_or(expr, max_len=budget) + assert len(chunks) > 1 + + # Every chunk, once built into a full request, fits under the URL byte + # limit — even the all-light chunks that have a higher-than-average ratio. + for chunk in chunks: + req = _construct_api_requests(**{**args, "filter": chunk}) + assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT, ( + f"chunk url {len(req.url)} exceeds {_WATERDATA_URL_BYTE_LIMIT}" + ) + + # Budget should be tight enough that a chunk of only-light clauses + # (the heavier-encoding shape here) still fits. + assert len(quote_plus(light)) * (budget // len(light)) < _WATERDATA_URL_BYTE_LIMIT + + +def test_effective_filter_budget_passes_through_when_no_url_space(): + """If the non-filter URL already exceeds the byte limit, chunking + cannot make the request succeed. The budget helper should signal + pass-through (return a budget larger than the filter) so + ``_chunk_cql_or`` emits one chunk — one 414 from the server is + clearer than a burst of N guaranteed-414 sub-requests.""" + expr = " OR ".join( + ["(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')"] * 50 + ) + fake_build = mock.Mock( + return_value=_fake_prepared_request(url="https://example.test/" + "A" * 9000) + ) + budget = _effective_filter_budget({"filter": expr}, expr, fake_build) + # Budget is large enough that _chunk_cql_or returns the expression + # unchanged (passthrough) rather than producing many small chunks. + assert budget > len(expr) + assert _chunk_cql_or(expr, max_len=budget) == [expr] + + +def test_effective_filter_budget_shrinks_with_more_url_params(): + """Adding more scalar query params consumes URL bytes and should + shrink the raw filter budget accordingly. 
Use a filter large enough + to skip the short-circuit fast path so the probe actually runs.""" + clause = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" + expr = " OR ".join([clause] * 100) + sparse_args = { + "service": "continuous", + "monitoring_location_id": "USGS-02238500", + "filter": expr, + "filter_lang": "cql-text", + } + dense_args = { + **sparse_args, + "parameter_code": "00060", + "statistic_id": "00003", + "last_modified": "2023-01-01T00:00:00Z/2023-12-31T23:59:59Z", + } + sparse_budget = _effective_filter_budget(sparse_args, expr, _build_request) + dense_budget = _effective_filter_budget(dense_args, expr, _build_request) + assert dense_budget < sparse_budget + + +def test_cql_json_filter_is_not_chunked(): + """Chunking applies only to cql-text; cql-json is passed through unchanged.""" + from dataretrieval.waterdata import get_continuous + + clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" + expr = " OR ".join([clause] * 300) + sent_filters = [] + + def fake_construct_api_requests(**kwargs): + sent_filters.append(kwargs.get("filter")) + return _fake_prepared_request() + + with mock.patch( + "dataretrieval.waterdata.utils._construct_api_requests", + side_effect=fake_construct_api_requests, + ), mock.patch( + "dataretrieval.waterdata.utils._walk_pages", + return_value=( + pd.DataFrame({"id": ["row-1"], "value": [1]}), + _fake_response(), + ), + ): + get_continuous( + monitoring_location_id="USGS-07374525", + parameter_code="72255", + filter=expr, + filter_lang="cql-json", + ) + + assert sent_filters == [expr] + + +@pytest.mark.parametrize( + "expr", + [ + # The motivating case — numeric-valued string field + "value >= 1000", + "value > 1000", + "value <= 1000", + "value < 1000", + "value = 1000", + "value != 1000", + "value >= 1000.5", + "value >= -50", + # Zero-padded codes: `parameter_code = 60` matches nothing + # because the real values are all `'00060'`-shaped + "parameter_code = 60", + 
"statistic_id = 11", + "district_code = 1", + "county_code != 0", + "hydrologic_unit_code = 20301030401", + # Channel-measurements numeric-looking string fields + "channel_flow > 500", + "channel_velocity >= 1.5", + # Scientific notation — floats expressed as 1e5, 1.5e-3 + "value > 1e5", + "value >= 2.5E+3", + "value < 1.5e-3", + # Leading-dot decimals (``.5`` is a fraction, not a typo) + "value > .5", + "value >= -.5", + "value < .5e-3", + # ``IN`` list form — same footgun, common pattern for codes + "parameter_code IN (60, 61)", + "value IN (10, 20, 30)", + "statistic_id in (11)", # case-insensitive, single-element + # ``NOT IN`` with numbers — same footgun via negation + "value NOT IN (1, 2, 3)", + "parameter_code not in (60, 61)", + # ``BETWEEN`` range form — same footgun + "value BETWEEN 5 AND 10", + "channel_flow between 100 and 500", + # ``NOT BETWEEN`` with numbers + "value NOT BETWEEN 0 AND 100", + "channel_flow not between 50 and 150", + # Composite expressions + "time >= '2023-01-01T00:00:00Z' AND value >= 1000", + "value > 1000 OR value < 0", + "parameter_code = 60 AND statistic_id = 11", + # Reverse (literal on the left) + "1000 <= value", + "60 = parameter_code", + ], +) +def test_check_numeric_filter_pitfall_raises(expr): + """Unquoted numeric comparisons against any field resolve + lexicographically on this API — every queryable is string-typed — + so reject them with a clear message before the request is sent.""" + with pytest.raises(ValueError, match="lexicographic"): + _check_numeric_filter_pitfall(expr) + + +@pytest.mark.parametrize( + "expr", + [ + # Quoted literals — caller has opted into string comparison + "value >= '1000'", + "value = '42.5'", + "parameter_code = '00060'", + "district_code = '01'", + "hydrologic_unit_code = '020301030401'", + # Pure string comparisons + "time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-02T00:00:00Z'", + "monitoring_location_id = 'USGS-02238500'", + "approval_status = 'Approved'", + "qualifier IN ('A', 
'P')", + "parameter_code IN ('00060', '00065')", + "value BETWEEN '1' AND '9'", + # Footgun identifiers appearing only inside string literals + "monitoring_location_id = 'USGS-value >= 1000'", + "name = 'why I care about parameter_code = 60'", + "note = 'see district_code = 1 in docs'", + "note = 'quoted: value IN (10, 20) within literal'", + # Multi-clause where every comparison is quoted + "parameter_code = '00060' AND statistic_id = '00011'", + # CQL escape-quote (``O''Reilly``) within a quoted literal + "name = 'O''Reilly 1000'", + # Identifiers that start with "NOT" (e.g. ``NOTES``) must not be + # mistakenly treated as the CQL negation keyword + "NOTES = 'hello'", + "NOTE_VAL LIKE 'anything%'", + ], +) +def test_check_numeric_filter_pitfall_allows(expr): + """Quoted literals and comparisons that don't pair a field with an + unquoted numeric literal must not trigger the check.""" + _check_numeric_filter_pitfall(expr) # must not raise + + +@pytest.mark.parametrize( + "expr,field,op", + [ + ("value NOT IN (1, 2)", "value", "NOT IN"), + ("parameter_code NOT IN (60, 61)", "parameter_code", "NOT IN"), + ("value IN (1, 2)", "value", "IN"), + ("value NOT BETWEEN 0 AND 10", "value", "NOT BETWEEN"), + ("channel_flow between 100 and 500", "channel_flow", "BETWEEN"), + ], +) +def test_pitfall_error_names_real_field_not_NOT_keyword(expr, field, op): + """The CQL keyword ``NOT`` must not be reported as the offending field + — the error should identify the actual column and include ``NOT`` as + part of the operator form so the caller knows what to quote.""" + with pytest.raises(ValueError) as exc: + _check_numeric_filter_pitfall(expr) + msg = str(exc.value) + assert f"against {field!r}" in msg, msg + assert op.upper() in msg.upper(), msg + + +def test_get_continuous_surfaces_pitfall_to_caller(): + """End-to-end: the check runs at the ``get_continuous`` boundary, + not as a deep internal-only protection, so callers see the error + before any HTTP traffic.""" + from 
dataretrieval.waterdata import get_continuous + + with mock.patch("dataretrieval.waterdata.utils._construct_api_requests") as build: + with pytest.raises(ValueError, match="lexicographic"): + get_continuous( + monitoring_location_id="USGS-02238500", + parameter_code="00060", + filter="value >= 1000", + filter_lang="cql-text", + ) + build.assert_not_called() diff --git a/tests/waterdata_nearest_test.py b/tests/waterdata_nearest_test.py index 9873d82..4dc0ab9 100644 --- a/tests/waterdata_nearest_test.py +++ b/tests/waterdata_nearest_test.py @@ -9,7 +9,7 @@ import pandas as pd import pytest -from dataretrieval.waterdata.api import get_nearest_continuous +from dataretrieval.waterdata.nearest import get_nearest_continuous def _fake_df(rows): @@ -26,7 +26,7 @@ def _fake_df(rows): @pytest.fixture def patch_get_continuous(): """Replace ``waterdata.api.get_continuous`` with a controllable stub.""" - with mock.patch("dataretrieval.waterdata.api.get_continuous") as m: + with mock.patch("dataretrieval.waterdata.nearest.get_continuous") as m: yield m diff --git a/tests/waterdata_utils_test.py b/tests/waterdata_utils_test.py index ebfc685..36150be 100644 --- a/tests/waterdata_utils_test.py +++ b/tests/waterdata_utils_test.py @@ -1,42 +1,13 @@ -from datetime import timedelta -from types import SimpleNamespace from unittest import mock -from urllib.parse import parse_qs, urlsplit -import pandas as pd -import pytest import requests from dataretrieval.waterdata.utils import ( - _CQL_FILTER_CHUNK_LEN, - _WATERDATA_URL_BYTE_LIMIT, - _chunk_cql_or, - _construct_api_requests, - _effective_filter_budget, _get_args, - _split_top_level_or, _walk_pages, ) -def _query_params(prepared_request): - return parse_qs(urlsplit(prepared_request.url).query) - - -def _fake_prepared_request(url="https://example.test"): - """Stand-in for the object ``_construct_api_requests`` returns.""" - return SimpleNamespace(url=url, method="GET", headers={}) - - -def _fake_response(url="https://example.test", 
elapsed_ms=1): - """Stand-in for the response object ``_walk_pages`` returns.""" - return SimpleNamespace( - url=url, - elapsed=timedelta(milliseconds=elapsed_ms), - headers={}, - ) - - def test_get_args_basic(): local_vars = { "monitoring_location_id": "123", @@ -109,418 +80,3 @@ def test_walk_pages_multiple_mocked(): assert mock_client.send.called assert mock_client.request.called assert mock_client.request.call_args[0][1] == "https://example.com/page2" - - -def test_construct_filter_passthrough(): - """`filter` is forwarded verbatim as a query parameter.""" - expr = ( - "(time >= '2023-01-06T16:00:00Z' AND time <= '2023-01-06T18:00:00Z') " - "OR (time >= '2023-01-10T18:00:00Z' AND time <= '2023-01-10T20:00:00Z')" - ) - req = _construct_api_requests( - service="continuous", - monitoring_location_id="USGS-07374525", - parameter_code="72255", - filter=expr, - ) - qs = _query_params(req) - assert qs["filter"] == [expr] - - -def test_construct_filter_lang_hyphenated(): - """The Python kwarg `filter_lang` is sent as URL key `filter-lang`.""" - req = _construct_api_requests( - service="continuous", - monitoring_location_id="USGS-07374525", - parameter_code="72255", - filter="time >= '2023-01-01T00:00:00Z'", - filter_lang="cql-text", - ) - qs = _query_params(req) - assert qs["filter-lang"] == ["cql-text"] - assert "filter_lang" not in qs - - -def test_split_top_level_or_simple(): - parts = _split_top_level_or("A OR B OR C") - assert parts == ["A", "B", "C"] - - -def test_split_top_level_or_case_insensitive(): - assert _split_top_level_or("A or B Or C") == ["A", "B", "C"] - - -def test_split_top_level_or_respects_parens(): - assert _split_top_level_or("(A OR B) OR (C OR D)") == ["(A OR B)", "(C OR D)"] - - -def test_split_top_level_or_respects_quotes(): - expr = "name = 'foo OR bar' OR id = 1" - assert _split_top_level_or(expr) == ["name = 'foo OR bar'", "id = 1"] - - -def test_split_top_level_or_handles_doubled_quote_escape(): - """CQL text escapes a single quote inside 
a literal as ``''``. The - two quotes are adjacent, so the scanner's naive toggle-on-quote logic - happens to land back in the correct state with nothing between the - toggles to misclassify. Lock that behavior in so a future refactor - can't regress it.""" - cases = [ - ("name = 'O''Reilly OR Co' OR id = 1", ["name = 'O''Reilly OR Co'", "id = 1"]), - ("name = 'It''s' OR id = 1", ["name = 'It''s'", "id = 1"]), - ( - "name = 'alpha ''or'' beta' OR id = 1", - ["name = 'alpha ''or'' beta'", "id = 1"], - ), - ("'x'' OR ''y' OR id = 1", ["'x'' OR ''y'", "id = 1"]), - ] - for expr, expected in cases: - assert _split_top_level_or(expr) == expected, expr - - -def test_split_top_level_or_single_clause(): - assert _split_top_level_or("time >= '2023-01-01T00:00:00Z'") == [ - "time >= '2023-01-01T00:00:00Z'" - ] - - -def test_chunk_cql_or_short_passthrough(): - expr = "time >= '2023-01-01T00:00:00Z'" - assert _chunk_cql_or(expr, max_len=1000) == [expr] - - -def test_chunk_cql_or_splits_into_multiple(): - clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" - expr = " OR ".join([clause] * 200) - chunks = _chunk_cql_or(expr, max_len=1000) - # each chunk must be under the budget - assert all(len(c) <= 1000 for c in chunks) - # rejoined chunks must cover every clause - rejoined_clauses = sum(len(c.split(" OR ")) for c in chunks) - assert rejoined_clauses == 200 - # and must be a valid OR chain (each chunk is itself a top-level OR of clauses) - assert len(chunks) > 1 - - -def test_chunk_cql_or_unsplittable_returns_input(): - big = "value > 0 AND " + ("A " * 4000) - assert _chunk_cql_or(big, max_len=1000) == [big] - - -def test_chunk_cql_or_single_clause_over_budget_returns_input(): - huge_clause = "(value > " + "9" * 6000 + ")" - expr = f"{huge_clause} OR (value > 0)" - assert _chunk_cql_or(expr, max_len=1000) == [expr] - - -@pytest.mark.parametrize( - "service", - [ - "daily", - "continuous", - "monitoring-locations", - "time-series-metadata", - 
"latest-continuous", - "latest-daily", - "field-measurements", - "channel-measurements", - ], -) -def test_construct_filter_on_all_ogc_services(service): - """Filter passthrough works uniformly for every OGC collection endpoint.""" - req = _construct_api_requests( - service=service, - filter="value > 0", - filter_lang="cql-text", - ) - qs = _query_params(req) - assert qs["filter"] == ["value > 0"] - assert qs["filter-lang"] == ["cql-text"] - - -def test_long_filter_fans_out_into_multiple_requests(): - """An oversized top-level OR filter triggers multiple HTTP requests - whose results are concatenated.""" - from dataretrieval.waterdata import get_continuous - - clause = ( - "(time >= '2023-01-{day:02d}T00:00:00Z' " - "AND time <= '2023-01-{day:02d}T00:30:00Z')" - ) - expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) - assert len(expr) > _CQL_FILTER_CHUNK_LEN - - sent_filters = [] - - def fake_construct_api_requests(**kwargs): - sent_filters.append(kwargs.get("filter")) - return _fake_prepared_request() - - def fake_walk_pages(*_args, **_kwargs): - idx = len(sent_filters) - frame = pd.DataFrame({"id": [f"chunk-{idx}"], "value": [idx]}) - return frame, _fake_response() - - with mock.patch( - "dataretrieval.waterdata.utils._construct_api_requests", - side_effect=fake_construct_api_requests, - ), mock.patch( - "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages - ), mock.patch( - "dataretrieval.waterdata.utils._effective_filter_budget", - return_value=_CQL_FILTER_CHUNK_LEN, - ): - df, _ = get_continuous( - monitoring_location_id="USGS-07374525", - parameter_code="72255", - filter=expr, - filter_lang="cql-text", - ) - - # Mocking _effective_filter_budget bypasses the URL-length probe, so - # sent_filters contains only real chunk requests. 
Assert invariants: - # chunking happened, every original clause is preserved exactly once - # in order, each chunk stays under the budget, and the mock's - # one-row-per-chunk responses concatenate to a row per chunk. - expected_parts = _split_top_level_or(expr) - assert len(sent_filters) > 1 - rejoined_parts = [] - for chunk in sent_filters: - rejoined_parts.extend(_split_top_level_or(chunk)) - assert rejoined_parts == expected_parts - assert len(df) == len(sent_filters) - assert all(len(chunk) <= _CQL_FILTER_CHUNK_LEN for chunk in sent_filters) - - -def test_long_filter_deduplicates_cross_chunk_overlap(): - """Features returned by multiple chunks (same feature `id`) are - deduplicated in the concatenated result.""" - from dataretrieval.waterdata import get_continuous - - clause = ( - "(time >= '2023-01-{day:02d}T00:00:00Z' " - "AND time <= '2023-01-{day:02d}T00:30:00Z')" - ) - expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) - - call_count = {"n": 0} - - def fake_walk_pages(*_args, **_kwargs): - call_count["n"] += 1 - frame = pd.DataFrame({"id": ["shared-feature"], "value": [1]}) - return frame, _fake_response() - - with mock.patch( - "dataretrieval.waterdata.utils._construct_api_requests", - return_value=_fake_prepared_request(), - ), mock.patch( - "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages - ), mock.patch( - "dataretrieval.waterdata.utils._effective_filter_budget", - return_value=_CQL_FILTER_CHUNK_LEN, - ): - df, _ = get_continuous( - monitoring_location_id="USGS-07374525", - parameter_code="72255", - filter=expr, - filter_lang="cql-text", - ) - - # Chunking must have happened (otherwise dedup wouldn't be exercised). - assert call_count["n"] > 1 - # Even though each chunk returned a feature, dedup by id collapses them. 
- assert len(df) == 1 - - -def test_empty_chunks_do_not_downgrade_geodataframe(): - """A mix of empty and non-empty chunk responses must not downgrade a - GeoDataFrame-typed result to a plain DataFrame. ``_get_resp_data`` - returns ``pd.DataFrame()`` on empty responses, which would otherwise - strip geometry/CRS from the concatenated output.""" - pytest.importorskip("geopandas") - import geopandas as gpd - from shapely.geometry import Point - - from dataretrieval.waterdata import get_continuous - - clause = ( - "(time >= '2023-01-{day:02d}T00:00:00Z' " - "AND time <= '2023-01-{day:02d}T00:30:00Z')" - ) - expr = " OR ".join(clause.format(day=(i % 28) + 1) for i in range(300)) - - call_count = {"n": 0} - - def fake_walk_pages(*_args, **_kwargs): - call_count["n"] += 1 - # Chunk 2 returns empty; chunks 1 and 3 return GeoDataFrames. - if call_count["n"] == 2: - frame = pd.DataFrame() - else: - frame = gpd.GeoDataFrame( - {"id": [f"feat-{call_count['n']}"], "value": [call_count["n"]]}, - geometry=[Point(call_count["n"], call_count["n"])], - crs="EPSG:4326", - ) - return frame, _fake_response() - - with mock.patch( - "dataretrieval.waterdata.utils._construct_api_requests", - return_value=_fake_prepared_request(), - ), mock.patch( - "dataretrieval.waterdata.utils._walk_pages", side_effect=fake_walk_pages - ), mock.patch( - "dataretrieval.waterdata.utils._effective_filter_budget", - return_value=_CQL_FILTER_CHUNK_LEN, - ): - df, _ = get_continuous( - monitoring_location_id="USGS-07374525", - parameter_code="72255", - filter=expr, - filter_lang="cql-text", - ) - - # The empty chunk must not have stripped the GeoDataFrame type. 
- assert isinstance(df, gpd.GeoDataFrame) - assert "geometry" in df.columns - assert df.crs is not None - - -def test_effective_filter_budget_respects_url_limit(): - """The computed budget, once encoded, fits within the URL byte limit - alongside the other query params.""" - from urllib.parse import quote_plus - - filter_expr = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" - args = { - "service": "continuous", - "monitoring_location_id": "USGS-02238500", - "parameter_code": "00060", - "filter": filter_expr, - "filter_lang": "cql-text", - } - raw_budget = _effective_filter_budget(args, filter_expr) - - # Build a chunk exactly at the raw budget (padded with the clause repeated) - # and confirm the full URL it produces stays under the URL byte limit. - padded = (" OR ".join([filter_expr] * 200))[:raw_budget] - req = _construct_api_requests(**{**args, "filter": padded}) - assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT - # And the budget scales inversely with encoding ratio (sanity). - assert raw_budget < _WATERDATA_URL_BYTE_LIMIT - # Quick sanity on the encoding math itself. - assert len(quote_plus(padded)) <= _WATERDATA_URL_BYTE_LIMIT - - -def test_effective_filter_budget_uses_max_clause_ratio(): - """Heavy clauses clustered in one part of the filter must not be able - to push any chunk over the URL limit. The budget is computed against - the max per-clause encoding ratio, not the whole-filter average, so - a chunk of only-heaviest-clauses still fits.""" - from urllib.parse import quote_plus - - heavy = ( - "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z' " - "AND approval_status IN ('Approved','Provisional','Revised'))" - ) - light = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" - # Heavy ratio < light ratio for these shapes; cluster them at opposite - # ends so the chunker must produce at least one light-only chunk. 
- clauses = [heavy] * 100 + [light] * 400 - expr = " OR ".join(clauses) - args = { - "service": "continuous", - "monitoring_location_id": "USGS-02238500", - "filter": expr, - "filter_lang": "cql-text", - } - budget = _effective_filter_budget(args, expr) - chunks = _chunk_cql_or(expr, max_len=budget) - assert len(chunks) > 1 - - # Every chunk, once built into a full request, fits under the URL byte - # limit — even the all-light chunks that have a higher-than-average ratio. - for chunk in chunks: - req = _construct_api_requests(**{**args, "filter": chunk}) - assert len(req.url) <= _WATERDATA_URL_BYTE_LIMIT, ( - f"chunk url {len(req.url)} exceeds {_WATERDATA_URL_BYTE_LIMIT}" - ) - - # Budget should be tight enough that a chunk of only-light clauses - # (the heavier-encoding shape here) still fits. - assert len(quote_plus(light)) * (budget // len(light)) < _WATERDATA_URL_BYTE_LIMIT - - -def test_effective_filter_budget_passes_through_when_no_url_space(): - """If the non-filter URL already exceeds the byte limit, chunking - cannot make the request succeed. The budget helper should signal - pass-through (return a budget larger than the filter) so - ``_chunk_cql_or`` emits one chunk — one 414 from the server is - clearer than a burst of N guaranteed-414 sub-requests.""" - expr = " OR ".join( - ["(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')"] * 50 - ) - with mock.patch( - "dataretrieval.waterdata.utils._construct_api_requests", - return_value=_fake_prepared_request(url="https://example.test/" + "A" * 9000), - ): - budget = _effective_filter_budget({"filter": expr}, expr) - # Budget is large enough that _chunk_cql_or returns the expression - # unchanged (passthrough) rather than producing many small chunks. 
- assert budget > len(expr) - assert _chunk_cql_or(expr, max_len=budget) == [expr] - - -def test_effective_filter_budget_shrinks_with_more_url_params(): - """Adding more scalar query params consumes URL bytes and should - shrink the raw filter budget accordingly. Use a filter large enough - to skip the short-circuit fast path so the probe actually runs.""" - clause = "(time >= '2023-01-15T00:00:00Z' AND time <= '2023-01-15T00:30:00Z')" - expr = " OR ".join([clause] * 100) - sparse_args = { - "service": "continuous", - "monitoring_location_id": "USGS-02238500", - "filter": expr, - "filter_lang": "cql-text", - } - dense_args = { - **sparse_args, - "parameter_code": "00060", - "statistic_id": "00003", - "last_modified": "2023-01-01T00:00:00Z/2023-12-31T23:59:59Z", - } - sparse_budget = _effective_filter_budget(sparse_args, expr) - dense_budget = _effective_filter_budget(dense_args, expr) - assert dense_budget < sparse_budget - - -def test_cql_json_filter_is_not_chunked(): - """Chunking applies only to cql-text; cql-json is passed through unchanged.""" - from dataretrieval.waterdata import get_continuous - - clause = "(time >= '2023-01-01T00:00:00Z' AND time <= '2023-01-01T00:30:00Z')" - expr = " OR ".join([clause] * 300) - sent_filters = [] - - def fake_construct_api_requests(**kwargs): - sent_filters.append(kwargs.get("filter")) - return _fake_prepared_request() - - with mock.patch( - "dataretrieval.waterdata.utils._construct_api_requests", - side_effect=fake_construct_api_requests, - ), mock.patch( - "dataretrieval.waterdata.utils._walk_pages", - return_value=( - pd.DataFrame({"id": ["row-1"], "value": [1]}), - _fake_response(), - ), - ): - get_continuous( - monitoring_location_id="USGS-07374525", - parameter_code="72255", - filter=expr, - filter_lang="cql-json", - ) - - assert sent_filters == [expr]