From 7371be73019d35c8bdcc84d8a615d04c3906a28f Mon Sep 17 00:00:00 2001 From: Cody Fincher Date: Sat, 13 Jun 2026 17:45:43 +0000 Subject: [PATCH 1/5] feat: add native metadata statistics --- docs/reference/adapters/adbc.rst | 46 +++ docs/reference/adapters/arrow_odbc.rst | 12 + sqlspec/adapters/adbc/data_dictionary.py | 290 ++++++++++++++- .../adapters/arrow_odbc/data_dictionary.py | 73 +++- sqlspec/data_dictionary/__init__.py | 2 + sqlspec/data_dictionary/_types.py | 14 + .../adapters/adbc/test_data_dictionary.py | 51 +++ .../integration/adapters/contracts/README.md | 3 +- .../integration/adapters/contracts/_cases.py | 6 + .../adapters/contracts/behaviors.py | 55 ++- .../contracts/test_metadata_contract.py | 23 ++ .../test_adbc/test_data_dictionary.py | 335 ++++++++++++++++++ tests/unit/adapters/test_arrow_odbc.py | 83 ++++- .../test_contract_capability_flags.py | 4 + tests/unit/data_dictionary/test_types.py | 27 ++ 15 files changed, 1012 insertions(+), 12 deletions(-) create mode 100644 tests/integration/adapters/contracts/test_metadata_contract.py create mode 100644 tests/unit/adapters/test_adbc/test_data_dictionary.py create mode 100644 tests/unit/data_dictionary/test_types.py diff --git a/docs/reference/adapters/adbc.rst b/docs/reference/adapters/adbc.rst index 41743bad8..bdee8d3d8 100644 --- a/docs/reference/adapters/adbc.rst +++ b/docs/reference/adapters/adbc.rst @@ -231,3 +231,49 @@ Data Dictionary .. autoclass:: sqlspec.adapters.adbc.data_dictionary.AdbcDataDictionary :members: :show-inheritance: + +Native Metadata And Statistics +============================== + +``AdbcDataDictionary`` prefers the standardized ADBC metadata APIs +(``adbc_get_objects``, ``adbc_get_table_schema``) and falls back to +dialect-specific SQL introspection when the underlying driver raises +``NotSupportedError`` or ``OperationalError``. ``get_statistics`` wraps +``adbc_get_statistics`` and has no SQL fallback: unsupported drivers raise +:exc:`sqlspec.exceptions.OperationalError`. + +.. list-table:: ADBC native metadata support (driver manager 1.11.0) + :header-rows: 1 + + * - Backend + - GetObjects (tables/columns/foreign keys) + - GetStatistics + * - PostgreSQL + - Native + - Native (approximate; run ``ANALYZE`` for fresh estimates) + * - SQLite + - Native (type names populated; nullability unreliable) + - Unsupported (raises ``OperationalError``) + * - DuckDB + - Native for single tables (types filled from the Arrow table schema); + schema-wide column listings fall back to SQL + - Unsupported (raises ``OperationalError``) + * - Flight SQL / GizmoSQL + - Native (server dependent) + - Server dependent + * - BigQuery + - SQL fallback + - Unverified + +Precision limits: + +- ADBC name filters are SQL ``LIKE`` patterns; SQLSpec post-filters results by + exact table name, but schema filters containing ``_`` or ``%`` may match + more broadly on the server side. +- The SQLite driver reports ``xdbc_is_nullable`` as ``YES`` even for + ``NOT NULL`` columns. +- Index metadata always uses SQL introspection; ADBC GetObjects has no + portable index representation. +- ``get_statistics`` maps the standard ADBC statistic keys 0-6 to their + canonical names (``adbc.statistic.row_count`` and friends); driver-specific + keys are reported numerically. diff --git a/docs/reference/adapters/arrow_odbc.rst b/docs/reference/adapters/arrow_odbc.rst index 22c57ffc9..8df0b0324 100644 --- a/docs/reference/adapters/arrow_odbc.rst +++ b/docs/reference/adapters/arrow_odbc.rst @@ -35,3 +35,15 @@ Data Dictionary .. autoclass:: sqlspec.adapters.arrow_odbc.data_dictionary.ArrowOdbcDataDictionary :members: :show-inheritance: + +Schema Discovery +================ + +``ArrowOdbcDataDictionary.get_columns`` first uses bundled dialect catalog +queries. When no query exists for the detected dialect (or it returns no +rows) and a table name is given, the driver issues a zero-row probe +(``SELECT * FROM "schema"."table" WHERE 1=0``) and derives column names, +ordering, nullability, and SQL type names from the Arrow reader schema. +Arrow-derived type names are approximations (for example ``VARCHAR`` for any +string column); ``mssql_python`` and other ODBC adapters without native +metadata APIs remain SQL-only. diff --git a/sqlspec/adapters/adbc/data_dictionary.py b/sqlspec/adapters/adbc/data_dictionary.py index 5cf05158c..0c8f1a3f5 100644 --- a/sqlspec/adapters/adbc/data_dictionary.py +++ b/sqlspec/adapters/adbc/data_dictionary.py @@ -1,7 +1,9 @@ """ADBC multi-dialect data dictionary for metadata queries.""" -from typing import TYPE_CHECKING, ClassVar +from typing import TYPE_CHECKING, Any, ClassVar, Final +from adbc_driver_manager import NotSupportedError as AdbcNotSupportedError +from adbc_driver_manager import OperationalError as AdbcOperationalError from mypy_extensions import mypyc_attr from sqlspec.adapters.sqlite.core import format_identifier @@ -10,6 +12,7 @@ ForeignKeyMetadata, IndexMetadata, TableMetadata, + TableStatisticsMetadata, VersionInfo, get_data_dictionary_loader, get_dialect_config, @@ -25,7 +28,8 @@ from sqlspec.data_dictionary.dialects.postgres import resolve_postgres_json_type from sqlspec.data_dictionary.dialects.sqlite import resolve_sqlite_json_type from sqlspec.driver import SyncDataDictionaryBase -from sqlspec.exceptions import SQLFileNotFoundError +from sqlspec.exceptions import OperationalError, SQLFileNotFoundError +from sqlspec.utils.logging import get_logger from sqlspec.utils.text import normalize_identifier if TYPE_CHECKING: @@ -34,6 +38,192 @@ __all__ = ("AdbcDataDictionary",) +logger = get_logger("sqlspec.adapters.adbc") + +_NATIVE_TABLE_TYPES: Final = frozenset({"table", "base table"}) +_NATIVE_FALLBACK_ERRORS: Final = (AdbcNotSupportedError, AdbcOperationalError) + + +class _NativeMetadataIncompleteError(Exception): + pass + + +def _iter_object_tables(rows: "list[dict[str, Any]]") -> "list[tuple[str | None, str | None, dict[str, Any]]]": + entries: list[tuple[str | None, str | None, dict[str, Any]]] = [] + for catalog in rows: + catalog_name = catalog.get("catalog_name") + for db_schema in catalog.get("catalog_db_schemas") or []: + schema_name = db_schema.get("db_schema_name") + entries.extend((catalog_name, schema_name, table) for table in db_schema.get("db_schema_tables") or []) + return entries + + +def _primary_key_columns(table: "dict[str, Any]") -> "set[str]": + names: set[str] = set() + for constraint in table.get("table_constraints") or []: + if str(constraint.get("constraint_type") or "").upper() == "PRIMARY KEY": + names.update(str(column) for column in constraint.get("constraint_column_names") or []) + return names + + +def _normalize_native_tables(rows: "list[dict[str, Any]]") -> "list[TableMetadata]": + tables: list[TableMetadata] = [] + for catalog_name, schema_name, table in _iter_object_tables(rows): + table_type = str(table.get("table_type") or "") + if table_type.lower() not in _NATIVE_TABLE_TYPES: + continue + metadata: TableMetadata = {"table_name": str(table["table_name"]), "table_type": table_type} + resolved_schema = schema_name or catalog_name + if resolved_schema: + metadata["schema_name"] = str(resolved_schema) + if catalog_name: + metadata["table_catalog"] = str(catalog_name) + if schema_name: + metadata["table_schema"] = str(schema_name) + tables.append(metadata) + return tables + + +def _normalize_native_columns( + rows: "list[dict[str, Any]]", table_name_exact: "str | None" = None +) -> "list[ColumnMetadata]": + columns: list[ColumnMetadata] = [] + for catalog_name, schema_name, table in _iter_object_tables(rows): + if str(table.get("table_type") or "").lower() not in _NATIVE_TABLE_TYPES: + continue + table_name = str(table["table_name"]) + if table_name_exact is not None and table_name != table_name_exact: + continue + primary_columns = _primary_key_columns(table) + resolved_schema = schema_name or catalog_name + for column in table.get("table_columns") or []: + entry: ColumnMetadata = {"table_name": table_name, "column_name": str(column["column_name"])} + if resolved_schema: + entry["schema_name"] = str(resolved_schema) + type_name = column.get("xdbc_type_name") + if type_name: + entry["data_type"] = str(type_name) + ordinal = column.get("ordinal_position") + if ordinal is not None: + entry["ordinal_position"] = int(ordinal) + nullable = column.get("xdbc_is_nullable") + if nullable is not None: + entry["is_nullable"] = str(nullable) + default = column.get("xdbc_column_def") + if default is not None: + entry["column_default"] = str(default) + size = column.get("xdbc_column_size") + if size is not None: + entry["max_length"] = int(size) + digits = column.get("xdbc_decimal_digits") + if digits is not None: + entry["numeric_scale"] = int(digits) + if entry["column_name"] in primary_columns: + entry["is_primary"] = True + columns.append(entry) + return columns + + +def _normalize_native_foreign_keys( + rows: "list[dict[str, Any]]", table_name_exact: "str | None" = None +) -> "list[ForeignKeyMetadata]": + keys: list[ForeignKeyMetadata] = [] + for catalog_name, schema_name, table in _iter_object_tables(rows): + table_name = str(table["table_name"]) + if table_name_exact is not None and table_name != table_name_exact: + continue + resolved_schema = schema_name or catalog_name + for constraint in table.get("table_constraints") or []: + if str(constraint.get("constraint_type") or "").upper() != "FOREIGN KEY": + continue + column_names = [str(column) for column in constraint.get("constraint_column_names") or []] + usage_entries = constraint.get("constraint_column_usage") or [] + constraint_name = constraint.get("constraint_name") + for column_name, usage in zip(column_names, usage_entries, strict=False): + referenced_schema = usage.get("fk_db_schema") or usage.get("fk_catalog") + keys.append( + ForeignKeyMetadata( + table_name=table_name, + column_name=column_name, + referenced_table=str(usage["fk_table"]), + referenced_column=str(usage["fk_column_name"]), + constraint_name=str(constraint_name) if constraint_name else None, + schema=str(resolved_schema) if resolved_schema else None, + referenced_schema=str(referenced_schema) if referenced_schema else None, + ) + ) + return keys + + +_ARROW_DECIMAL_FORMAT: Final = "DECIMAL({precision},{scale})" + + +def _arrow_type_to_sql(data_type: Any) -> str: + import pyarrow as pa + + types = pa.types + if types.is_boolean(data_type): + return "BOOLEAN" + if types.is_int8(data_type) or types.is_int16(data_type) or types.is_uint8(data_type) or types.is_uint16(data_type): + return "SMALLINT" + if types.is_int32(data_type) or types.is_uint32(data_type): + return "INTEGER" + if types.is_int64(data_type) or types.is_uint64(data_type): + return "BIGINT" + if types.is_float16(data_type) or types.is_float32(data_type): + return "REAL" + if types.is_float64(data_type): + return "DOUBLE" + if types.is_decimal(data_type): + return _ARROW_DECIMAL_FORMAT.format(precision=data_type.precision, scale=data_type.scale) + if types.is_string(data_type) or types.is_large_string(data_type): + return "VARCHAR" + if types.is_binary(data_type) or types.is_large_binary(data_type) or types.is_fixed_size_binary(data_type): + return "VARBINARY" + if types.is_date(data_type): + return "DATE" + if types.is_time(data_type): + return "TIME" + if types.is_timestamp(data_type): + return "TIMESTAMP" + return str(data_type).upper() + + +_ADBC_STATISTIC_NAMES: dict[int, str] = { + 0: "adbc.statistic.byte_width", + 1: "adbc.statistic.distinct_count", + 2: "adbc.statistic.max_byte_width", + 3: "adbc.statistic.max_value", + 4: "adbc.statistic.min_value", + 5: "adbc.statistic.null_count", + 6: "adbc.statistic.row_count", +} + + +def _normalize_native_statistics(rows: "list[dict[str, Any]]") -> "list[TableStatisticsMetadata]": + statistics: list[TableStatisticsMetadata] = [] + for catalog in rows: + catalog_name = catalog.get("catalog_name") + for db_schema in catalog.get("catalog_db_schemas") or []: + schema_name = db_schema.get("db_schema_name") + for entry in db_schema.get("db_schema_statistics") or []: + key = int(entry["statistic_key"]) + column_name = entry.get("column_name") + record: TableStatisticsMetadata = { + "table_name": str(entry["table_name"]), + "column_name": str(column_name) if column_name is not None else None, + "statistic_key": key, + "statistic_name": _ADBC_STATISTIC_NAMES.get(key, str(key)), + "statistic_value": entry.get("statistic_value"), + "is_approximate": bool(entry.get("statistic_is_approximate")), + } + if catalog_name: + record["catalog_name"] = str(catalog_name) + if schema_name: + record["schema_name"] = str(schema_name) + statistics.append(record) + return statistics + @mypyc_attr(allow_interpreted_subclasses=True, native_class=False) class AdbcDataDictionary(SyncDataDictionaryBase): @@ -182,6 +372,11 @@ def get_tables(self, driver: "AdbcDriver", schema: "str | None" = None) -> "list schema_name: str | None = self._resolve_schema(dialect, schema) self._log_schema_introspect(driver, schema_name=schema_name, table_name=None, operation="tables") + try: + return self._native_get_tables(driver, dialect, schema_name) + except _NATIVE_FALLBACK_ERRORS as exc: + logger.debug("ADBC native get_objects unavailable for tables: %s", exc) + if dialect == "bigquery": tables_table, kcu_table, rc_table = format_bigquery_information_schema_tables(schema_name) query_text = self._get_query_text(dialect, "tables_by_schema").format( @@ -209,6 +404,12 @@ def get_columns( else: self._log_table_describe(driver, schema_name=schema_name, table_name=table, operation="columns") + resolved_table = self._resolve_identifier(dialect, table) if table is not None else None + try: + return self._native_get_columns(driver, dialect, resolved_table, schema_name) + except (*_NATIVE_FALLBACK_ERRORS, _NativeMetadataIncompleteError) as exc: + logger.debug("ADBC native get_objects unavailable for columns: %s", exc) + if dialect == "bigquery": schema_prefix = format_bigquery_schema_prefix(schema_name) if table is None: @@ -241,6 +442,57 @@ def get_columns( schema_type=ColumnMetadata, ) + def _native_object_filters( + self, dialect: str, schema_name: "str | None", table_name: "str | None" + ) -> "dict[str, str | None]": + catalog_filter: str | None = None + db_schema_filter: str | None = None + if schema_name: + if dialect == "sqlite": + catalog_filter = schema_name + else: + db_schema_filter = schema_name + return {"catalog_filter": catalog_filter, "db_schema_filter": db_schema_filter, "table_name_filter": table_name} + + def _native_get_objects( + self, driver: "AdbcDriver", dialect: str, depth: str, schema_name: "str | None", table_name: "str | None" + ) -> "list[dict[str, Any]]": + filters = self._native_object_filters(dialect, schema_name, table_name) + reader = driver.connection.adbc_get_objects( + depth=depth, + catalog_filter=filters["catalog_filter"], + db_schema_filter=filters["db_schema_filter"], + table_name_filter=filters["table_name_filter"], + ) + return reader.read_all().to_pylist() # type: ignore[no-any-return] + + def _native_get_tables( + self, driver: "AdbcDriver", dialect: str, schema_name: "str | None" + ) -> "list[TableMetadata]": + rows = self._native_get_objects(driver, dialect, "tables", schema_name, None) + return _normalize_native_tables(rows) + + def _native_get_columns( + self, driver: "AdbcDriver", dialect: str, table_name: "str | None", schema_name: "str | None" + ) -> "list[ColumnMetadata]": + rows = self._native_get_objects(driver, dialect, "all", schema_name, table_name) + columns = _normalize_native_columns(rows, table_name_exact=table_name) + missing_types = [entry for entry in columns if "data_type" not in entry] + if not missing_types: + return columns + if table_name is None: + raise _NativeMetadataIncompleteError + filters = self._native_object_filters(dialect, schema_name, None) + arrow_schema = driver.connection.adbc_get_table_schema( + table_name, catalog_filter=filters["catalog_filter"], db_schema_filter=filters["db_schema_filter"] + ) + type_by_name = {field.name: _arrow_type_to_sql(field.type) for field in arrow_schema} + for entry in missing_types: + resolved = type_by_name.get(entry["column_name"]) + if resolved is not None: + entry["data_type"] = resolved + return columns + def get_indexes( self, driver: "AdbcDriver", table: "str | None" = None, schema: "str | None" = None ) -> "list[IndexMetadata]": @@ -322,6 +574,12 @@ def get_foreign_keys( else: self._log_table_describe(driver, schema_name=schema_name, table_name=table, operation="foreign_keys") + resolved_table = self._resolve_identifier(dialect, table) if table is not None else None + try: + return self._native_get_foreign_keys(driver, dialect, resolved_table, schema_name) + except _NATIVE_FALLBACK_ERRORS as exc: + logger.debug("ADBC native get_objects unavailable for foreign keys: %s", exc) + if dialect == "bigquery": _, kcu_table, rc_table = format_bigquery_information_schema_tables(schema_name) if table is None: @@ -361,3 +619,31 @@ def get_foreign_keys( table_name=resolved_table_name, schema_type=ForeignKeyMetadata, ) + + def _native_get_foreign_keys( + self, driver: "AdbcDriver", dialect: str, table_name: "str | None", schema_name: "str | None" + ) -> "list[ForeignKeyMetadata]": + rows = self._native_get_objects(driver, dialect, "all", schema_name, table_name) + return _normalize_native_foreign_keys(rows, table_name_exact=table_name) + + def get_statistics( + self, driver: "AdbcDriver", table: str, schema: "str | None" = None, *, approximate: bool = True + ) -> "list[TableStatisticsMetadata]": + """Get native driver statistics for a table via ADBC GetStatistics.""" + dialect = self._normalize_dialect(driver) + schema_name = self._resolve_schema(dialect, schema) + table_name = self._resolve_identifier(dialect, table) + self._log_table_describe(driver, schema_name=schema_name, table_name=table_name, operation="statistics") + filters = self._native_object_filters(dialect, schema_name, table_name) + try: + reader = driver.connection.adbc_get_statistics( + catalog_filter=filters["catalog_filter"], + db_schema_filter=filters["db_schema_filter"], + table_name_filter=filters["table_name_filter"], + approximate=approximate, + ) + except _NATIVE_FALLBACK_ERRORS as exc: + msg = f"ADBC driver for dialect {dialect!r} does not support native table statistics: {exc}" + raise OperationalError(msg) from exc + rows = reader.read_all().to_pylist() + return [entry for entry in _normalize_native_statistics(rows) if entry["table_name"] == table_name] diff --git a/sqlspec/adapters/arrow_odbc/data_dictionary.py b/sqlspec/adapters/arrow_odbc/data_dictionary.py index 2a7f02555..3123f5254 100644 --- a/sqlspec/adapters/arrow_odbc/data_dictionary.py +++ b/sqlspec/adapters/arrow_odbc/data_dictionary.py @@ -1,6 +1,6 @@ """Generic data dictionary for arrow-odbc connections.""" -from typing import TYPE_CHECKING, Any, ClassVar +from typing import TYPE_CHECKING, Any, ClassVar, Final from mypy_extensions import mypyc_attr @@ -15,7 +15,7 @@ ) from sqlspec.driver import SyncDataDictionaryBase from sqlspec.exceptions import SQLFileNotFoundError -from sqlspec.utils.text import normalize_identifier +from sqlspec.utils.text import normalize_identifier, quote_identifier if TYPE_CHECKING: from sqlspec.adapters.arrow_odbc.driver import ArrowOdbcDriver @@ -24,6 +24,39 @@ __all__ = ("ArrowOdbcDataDictionary",) +_ARROW_DECIMAL_FORMAT: Final = "DECIMAL({precision},{scale})" + + +def _arrow_type_to_sql(data_type: Any) -> str: + import pyarrow as pa + + types = pa.types + if types.is_boolean(data_type): + return "BOOLEAN" + if types.is_int8(data_type) or types.is_int16(data_type) or types.is_uint8(data_type) or types.is_uint16(data_type): + return "SMALLINT" + if types.is_int32(data_type) or types.is_uint32(data_type): + return "INTEGER" + if types.is_int64(data_type) or types.is_uint64(data_type): + return "BIGINT" + if types.is_float16(data_type) or types.is_float32(data_type): + return "REAL" + if types.is_float64(data_type): + return "DOUBLE" + if types.is_decimal(data_type): + return _ARROW_DECIMAL_FORMAT.format(precision=data_type.precision, scale=data_type.scale) + if types.is_string(data_type) or types.is_large_string(data_type): + return "VARCHAR" + if types.is_binary(data_type) or types.is_large_binary(data_type) or types.is_fixed_size_binary(data_type): + return "VARBINARY" + if types.is_date(data_type): + return "DATE" + if types.is_time(data_type): + return "TIME" + if types.is_timestamp(data_type): + return "TIMESTAMP" + return str(data_type).upper() + @mypyc_attr(allow_interpreted_subclasses=True, native_class=False) class ArrowOdbcDataDictionary(SyncDataDictionaryBase): @@ -62,6 +95,29 @@ def resolve_identifier(self, identifier: str) -> str: """Return a runtime-dialect-normalized identifier.""" return normalize_identifier(identifier, self.get_dialect_config().name) + def _probe_columns(self, driver: "ArrowOdbcDriver", table: str, schema: "str | None") -> "list[ColumnMetadata]": + qualified = ( + quote_identifier(table) if schema is None else f"{quote_identifier(schema)}.{quote_identifier(table)}" + ) + probe_sql = f"SELECT * FROM {qualified} WHERE 1=0" + try: + reader = driver._read_arrow_batches(probe_sql, None, 1) # pyright: ignore[reportPrivateUsage] + except Exception: + return [] + columns: list[ColumnMetadata] = [] + for position, field in enumerate(reader.schema, start=1): + entry: ColumnMetadata = { + "table_name": table, + "column_name": field.name, + "data_type": _arrow_type_to_sql(field.type), + "is_nullable": bool(field.nullable), + "ordinal_position": position, + } + if schema is not None: + entry["schema_name"] = schema + columns.append(entry) + return columns + def get_version(self, driver: "ArrowOdbcDriver") -> VersionInfo | None: """Get database version information when the runtime dialect provides a query.""" driver_id = id(driver) @@ -116,13 +172,18 @@ def get_columns( ) -> list[ColumnMetadata]: """Get column metadata for dialects with bundled catalog queries.""" query_name = "columns_by_table" if table is not None else "columns_by_schema" - parameters: dict[str, Any] = {"schema_name": self.resolve_schema(schema)} + resolved_schema = self.resolve_schema(schema) + resolved_table = self.resolve_identifier(table) if table is not None else None + parameters: dict[str, Any] = {"schema_name": resolved_schema} if table is not None: - parameters["table_name"] = self.resolve_identifier(table) + parameters["table_name"] = resolved_table try: - return driver.select(self.get_query(query_name), schema_type=ColumnMetadata, **parameters) + rows = driver.select(self.get_query(query_name), schema_type=ColumnMetadata, **parameters) except SQLFileNotFoundError: - return [] + rows = [] + if rows or resolved_table is None: + return rows + return self._probe_columns(driver, resolved_table, resolved_schema) def get_indexes( self, driver: "ArrowOdbcDriver", table: str | None = None, schema: str | None = None diff --git a/sqlspec/data_dictionary/__init__.py b/sqlspec/data_dictionary/__init__.py index c139b0929..df6eda46d 100644 --- a/sqlspec/data_dictionary/__init__.py +++ b/sqlspec/data_dictionary/__init__.py @@ -16,6 +16,7 @@ ForeignKeyMetadata, IndexMetadata, TableMetadata, + TableStatisticsMetadata, VersionCacheResult, VersionInfo, ) @@ -32,6 +33,7 @@ "ForeignKeyMetadata", "IndexMetadata", "TableMetadata", + "TableStatisticsMetadata", "VersionCacheResult", "VersionInfo", "get_data_dictionary_loader", diff --git a/sqlspec/data_dictionary/_types.py b/sqlspec/data_dictionary/_types.py index 55b2eb683..90c543973 100644 --- a/sqlspec/data_dictionary/_types.py +++ b/sqlspec/data_dictionary/_types.py @@ -13,6 +13,7 @@ "ForeignKeyMetadata", "IndexMetadata", "TableMetadata", + "TableStatisticsMetadata", "VersionCacheResult", "VersionInfo", ) @@ -241,6 +242,19 @@ class FeatureVersions(TypedDict, total=False): supports_window_functions: "VersionInfo" +class TableStatisticsMetadata(TypedDict, total=False): + """Native driver statistics for a table or column.""" + + catalog_name: str + schema_name: str + table_name: str + column_name: str | None + statistic_key: int + statistic_name: str + statistic_value: int | float | str | bytes | None + is_approximate: bool + + @mypyc_attr(allow_interpreted_subclasses=False) class DialectConfig: """Static configuration for a database dialect.""" diff --git a/tests/integration/adapters/adbc/test_data_dictionary.py b/tests/integration/adapters/adbc/test_data_dictionary.py index d7a745c8d..2fd588d80 100644 --- a/tests/integration/adapters/adbc/test_data_dictionary.py +++ b/tests/integration/adapters/adbc/test_data_dictionary.py @@ -140,3 +140,54 @@ def test_adbc_data_dictionary_consistency(adbc_sync_driver: "AdbcDriver") -> Non assert version1.major == version2.major assert version1.minor == version2.minor assert version1.patch == version2.patch + + +@pytest.mark.adbc +def test_adbc_native_metadata_postgres(adbc_sync_driver: "AdbcDriver") -> None: + """Native GetObjects lists tables, typed columns, and foreign keys on PostgreSQL.""" + adbc_sync_driver.execute_script("DROP TABLE IF EXISTS dd_native_child") + adbc_sync_driver.execute_script("DROP TABLE IF EXISTS dd_native_parent") + adbc_sync_driver.execute_script("CREATE TABLE dd_native_parent (id INTEGER PRIMARY KEY, label TEXT NOT NULL)") + adbc_sync_driver.execute_script( + "CREATE TABLE dd_native_child (id INTEGER PRIMARY KEY, parent_id INTEGER REFERENCES dd_native_parent(id))" + ) + adbc_sync_driver.commit() + data_dict = adbc_sync_driver.data_dictionary + try: + tables = data_dict.get_tables(adbc_sync_driver) + table_names = {entry.get("table_name") for entry in tables} + assert {"dd_native_parent", "dd_native_child"} <= table_names + + columns = data_dict.get_columns(adbc_sync_driver, table="dd_native_parent") + by_name = {entry["column_name"]: entry for entry in columns} + assert set(by_name) >= {"id", "label"} + assert all(entry.get("data_type") for entry in by_name.values()) + + foreign_keys = data_dict.get_foreign_keys(adbc_sync_driver, table="dd_native_child") + assert any( + key.column_name == "parent_id" and key.referenced_table == "dd_native_parent" for key in foreign_keys + ) + finally: + adbc_sync_driver.execute_script("DROP TABLE IF EXISTS dd_native_child") + adbc_sync_driver.execute_script("DROP TABLE IF EXISTS dd_native_parent") + adbc_sync_driver.commit() + + +@pytest.mark.adbc +def test_adbc_native_statistics_postgres(adbc_sync_driver: "AdbcDriver") -> None: + """Native GetStatistics returns normalized entries on PostgreSQL.""" + adbc_sync_driver.execute_script("DROP TABLE IF EXISTS dd_native_stats") + adbc_sync_driver.execute_script("CREATE TABLE dd_native_stats (id INTEGER PRIMARY KEY, payload TEXT)") + adbc_sync_driver.execute_script("INSERT INTO dd_native_stats (id, payload) VALUES (1, 'a'), (2, 'b'), (3, 'c')") + adbc_sync_driver.execute_script("ANALYZE dd_native_stats") + adbc_sync_driver.commit() + try: + statistics = adbc_sync_driver.data_dictionary.get_statistics(adbc_sync_driver, "dd_native_stats") + assert isinstance(statistics, list) + for entry in statistics: + assert entry["table_name"] == "dd_native_stats" + assert isinstance(entry["statistic_key"], int) + assert isinstance(entry["is_approximate"], bool) + finally: + adbc_sync_driver.execute_script("DROP TABLE IF EXISTS dd_native_stats") + adbc_sync_driver.commit() diff --git a/tests/integration/adapters/contracts/README.md b/tests/integration/adapters/contracts/README.md index 60c386b23..19f1c99f5 100644 --- a/tests/integration/adapters/contracts/README.md +++ b/tests/integration/adapters/contracts/README.md @@ -90,7 +90,8 @@ Complete flag set: - **Types / codecs**: `supports_json`, `supports_json_native`, `supports_arrays`, `supports_native_array_codec`, `supports_vector`, `supports_lob` - **Schema / migrations**: `supports_migrations`, `supports_schema_qualified_ddl`, - `supports_multi_schema_migrations`, `supports_data_dictionary` + `supports_multi_schema_migrations`, `supports_data_dictionary`, `supports_native_metadata`, + `supports_native_statistics` - **Connectivity / transactions**: `supports_transactions`, `supports_exception_translation` - **Lifecycle (config-factory)**: `supports_pooling`, `supports_connection_hook`, `supports_connection_instance`, `supports_lowercase_columns`, `supports_uuid_feature`, diff --git a/tests/integration/adapters/contracts/_cases.py b/tests/integration/adapters/contracts/_cases.py index 1ffca6198..028e18544 100644 --- a/tests/integration/adapters/contracts/_cases.py +++ b/tests/integration/adapters/contracts/_cases.py @@ -67,6 +67,8 @@ class DriverCase: supports_custom_type_adapters: bool = False supports_multi_schema_migrations: bool = False supports_data_dictionary: bool = False + supports_native_metadata: bool = False + supports_native_statistics: bool = False config_factory_fixture: str | None = None deviations: tuple[str, ...] = () extra_assertions: tuple[str, ...] = () @@ -244,6 +246,7 @@ class DriverCaseContext: id="adbc-sqlite-sync", supports_arrow_streaming=True, supports_native_arrow=True, + supports_native_metadata=True, fixture_name="contract_adbc_sqlite_driver", adapter="adbc", dialect="sqlite", @@ -260,6 +263,7 @@ class DriverCaseContext: id="adbc-duckdb-sync", supports_arrow_streaming=True, supports_native_arrow=True, + supports_native_metadata=True, fixture_name="contract_adbc_duckdb_driver", adapter="adbc", dialect="duckdb", @@ -276,6 +280,8 @@ class DriverCaseContext: id="adbc-postgres-sync", supports_arrow_streaming=True, supports_native_arrow=True, + supports_native_metadata=True, + supports_native_statistics=True, fixture_name="contract_adbc_postgres_driver", adapter="adbc", dialect="postgres", diff --git a/tests/integration/adapters/contracts/behaviors.py b/tests/integration/adapters/contracts/behaviors.py index 06dfe01a6..e3f17629a 100644 --- a/tests/integration/adapters/contracts/behaviors.py +++ b/tests/integration/adapters/contracts/behaviors.py @@ -12,7 +12,7 @@ from sqlspec import SQL, SQLResult, StatementStack, sql from sqlspec.builder import Explain from sqlspec.core.filters import InCollectionFilter, LimitOffsetFilter, OrderByFilter, SearchFilter -from sqlspec.exceptions import ImproperConfigurationError, SQLParsingError, SQLSpecError +from sqlspec.exceptions import ImproperConfigurationError, OperationalError, SQLParsingError, SQLSpecError from sqlspec.utils.serializers import from_json, to_json from tests.integration.adapters.contracts._assertions import assert_result_data, assert_sql_result from tests.integration.adapters.contracts._cases import DriverCase @@ -23,7 +23,12 @@ ParameterStyleCase, StatementInputCase, ) -from tests.integration.adapters.contracts._schema import DEFAULT_CONTRACT_TABLE, ContractRow, ContractTable +from tests.integration.adapters.contracts._schema import ( + DEFAULT_CONTRACT_TABLE, + DUCKDB_CONTRACT_TABLE, + ContractRow, + ContractTable, +) if TYPE_CHECKING: from sqlspec.typing import ArrowRecordBatch @@ -4092,3 +4097,49 @@ async def assert_async_exception_contract(driver: object, violation: ExceptionVi with contextlib.suppress(Exception): await async_driver.execute_script(violation.teardown_script) await async_driver.commit() + + +def assert_sync_native_metadata_contract(driver: object, case: DriverCase) -> None: + """Assert native metadata discovery returns the contract table and its columns.""" + if not case.supports_native_metadata: + pytest.skip(f"{case.adapter} has no native metadata support") + sync_driver = cast("SyncContractDriver", driver) + data_dictionary = cast("Any", sync_driver).data_dictionary + tables = data_dictionary.get_tables(sync_driver) + table_names = {entry.get("table_name") for entry in tables} + assert case.table.name in table_names + columns = data_dictionary.get_columns(sync_driver, table=case.table.name) + column_names = {entry["column_name"] for entry in columns} + expected_columns = ( + {"name", "value", "note"} if case.table is DUCKDB_CONTRACT_TABLE else {"id", "name", "value", "note"} + ) + assert expected_columns <= column_names + typed = [entry for entry in columns if entry.get("data_type")] + assert typed + + +async def assert_async_native_metadata_contract(driver: object, case: DriverCase) -> None: + """Assert async native metadata discovery (no async adapter currently opts in).""" + if not case.supports_native_metadata: + pytest.skip(f"{case.adapter} has no native metadata support") + pytest.fail("async native metadata behavior must be implemented when an async adapter opts in") + + +def assert_sync_native_statistics_contract(driver: object, case: DriverCase) -> None: + """Assert native statistics succeed where supported and fail clearly elsewhere.""" + if not case.supports_native_metadata: + pytest.skip(f"{case.adapter} has no native metadata support") + sync_driver = cast("SyncContractDriver", driver) + data_dictionary = cast("Any", sync_driver).data_dictionary + if not hasattr(data_dictionary, "get_statistics"): + pytest.skip(f"{case.adapter} data dictionary exposes no get_statistics") + if not case.supports_native_statistics: + with pytest.raises(OperationalError): + data_dictionary.get_statistics(sync_driver, case.table.name) + return + statistics = data_dictionary.get_statistics(sync_driver, case.table.name) + assert isinstance(statistics, list) + for entry in statistics: + assert entry["table_name"] == case.table.name + assert isinstance(entry["statistic_name"], str) + assert isinstance(entry["is_approximate"], bool) diff --git a/tests/integration/adapters/contracts/test_metadata_contract.py b/tests/integration/adapters/contracts/test_metadata_contract.py new file mode 100644 index 000000000..3fcdebf1f --- /dev/null +++ b/tests/integration/adapters/contracts/test_metadata_contract.py @@ -0,0 +1,23 @@ +"""Shared adapter native metadata and statistics contracts.""" + +from tests.integration.adapters.contracts._cases import DriverCaseContext +from tests.integration.adapters.contracts.behaviors import ( + assert_async_native_metadata_contract, + assert_sync_native_metadata_contract, + assert_sync_native_statistics_contract, +) + + +def test_sync_native_metadata_contract(sync_driver_case: DriverCaseContext) -> None: + """Sync drivers with native metadata list contract tables and columns.""" + assert_sync_native_metadata_contract(sync_driver_case.driver, sync_driver_case.case) + + +async def test_async_native_metadata_contract(async_driver_case: DriverCaseContext) -> None: + """Async drivers with native metadata list contract tables and columns.""" + await assert_async_native_metadata_contract(async_driver_case.driver, async_driver_case.case) + + +def test_sync_native_statistics_contract(sync_driver_case: DriverCaseContext) -> None: + """Sync drivers surface native statistics or fail clearly.""" + assert_sync_native_statistics_contract(sync_driver_case.driver, sync_driver_case.case) diff --git a/tests/unit/adapters/test_adbc/test_data_dictionary.py b/tests/unit/adapters/test_adbc/test_data_dictionary.py new file mode 100644 index 000000000..6c313cfe1 --- /dev/null +++ b/tests/unit/adapters/test_adbc/test_data_dictionary.py @@ -0,0 +1,335 @@ +"""Unit tests for ADBC native metadata normalization and fallback.""" + +from typing import Any +from unittest.mock import Mock + +import pyarrow as pa +import pytest + +pytest.importorskip("adbc_driver_manager") + +from adbc_driver_manager import NotSupportedError as AdbcNotSupportedError + +from sqlspec.adapters.adbc.data_dictionary import ( + AdbcDataDictionary, + _arrow_type_to_sql, + _normalize_native_columns, + _normalize_native_foreign_keys, + _normalize_native_statistics, + _normalize_native_tables, +) +from sqlspec.exceptions import OperationalError + +SQLITE_OBJECTS_PAYLOAD: list[dict[str, Any]] = [ + { + "catalog_name": "main", + "catalog_db_schemas": [ + { + "db_schema_name": "", + "db_schema_tables": [ + { + "table_name": "t1", + "table_type": "table", + "table_columns": [ + { + "column_name": "id", + "ordinal_position": 1, + "xdbc_type_name": "INTEGER", + "xdbc_is_nullable": "YES", + }, + { + "column_name": "name", + "ordinal_position": 2, + "xdbc_type_name": "TEXT", + "xdbc_is_nullable": "YES", + }, + { + "column_name": "ref_id", + "ordinal_position": 3, + "xdbc_type_name": "INTEGER", + "xdbc_is_nullable": "YES", + }, + ], + "table_constraints": [ + { + "constraint_name": None, + "constraint_type": "PRIMARY KEY", + "constraint_column_names": ["id"], + "constraint_column_usage": None, + }, + { + "constraint_name": None, + "constraint_type": "FOREIGN KEY", + "constraint_column_names": ["ref_id"], + "constraint_column_usage": [ + {"fk_catalog": "main", "fk_db_schema": "", "fk_table": "t1", "fk_column_name": "id"} + ], + }, + ], + }, + {"table_name": "idx_name", "table_type": "index", "table_columns": [], "table_constraints": []}, + ], + } + ], + } +] + +DUCKDB_OBJECTS_PAYLOAD: list[dict[str, Any]] = [ + { + "catalog_name": "memory", + "catalog_db_schemas": [ + { + "db_schema_name": "main", + "db_schema_tables": [ + { + "table_name": "t2", + "table_type": "BASE TABLE", + "table_columns": [ + { + "column_name": "id", + "ordinal_position": 1, + "xdbc_is_nullable": "YES", + "xdbc_type_name": None, + }, + { + "column_name": "val", + "ordinal_position": 2, + "xdbc_is_nullable": "YES", + "xdbc_type_name": None, + }, + ], + "table_constraints": [], + } + ], + } + ], + } +] + + +def _make_reader(payload: list[dict[str, Any]]) -> Mock: + reader = Mock() + reader.read_all.return_value.to_pylist.return_value = payload + return reader + + +def test_normalize_native_tables_filters_index_rows() -> None: + """Native table normalization should ignore index leak rows and keep schema fallback.""" + tables = _normalize_native_tables(SQLITE_OBJECTS_PAYLOAD) + + assert len(tables) == 1 + assert tables[0]["table_name"] == "t1" + assert tables[0]["schema_name"] == "main" + assert all(entry["table_name"] != "idx_name" for entry in tables) + + +def test_normalize_native_columns_marks_primary_key() -> None: + """Native column normalization should mark primary-key columns and preserve types.""" + columns = _normalize_native_columns(SQLITE_OBJECTS_PAYLOAD, table_name_exact="t1") + by_name = {entry["column_name"]: entry for entry in columns} + + assert by_name["id"]["is_primary"] is True + assert by_name["id"]["data_type"] == "INTEGER" + assert by_name["id"]["ordinal_position"] == 1 + assert "is_primary" not in by_name["name"] + + +def test_normalize_native_columns_exact_table_filter() -> None: + """Native column normalization should respect exact table-name filtering.""" + assert _normalize_native_columns(SQLITE_OBJECTS_PAYLOAD, table_name_exact="other") == [] + + +def test_normalize_native_foreign_keys() -> None: + """Native foreign-key normalization should preserve table and schema linkage.""" + keys = _normalize_native_foreign_keys(SQLITE_OBJECTS_PAYLOAD, table_name_exact="t1") + + assert len(keys) == 1 + key = keys[0] + assert key.table_name == "t1" + assert key.column_name == "ref_id" + assert key.referenced_table == "t1" + assert key.referenced_column == "id" + assert key.constraint_name is None + assert key.schema == "main" + assert key.referenced_schema == "main" + + +@pytest.mark.parametrize( + ("data_type", "expected"), + [ + (pa.bool_(), "BOOLEAN"), + (pa.int16(), "SMALLINT"), + (pa.int32(), "INTEGER"), + (pa.int64(), "BIGINT"), + (pa.float32(), "REAL"), + (pa.float64(), "DOUBLE"), + (pa.decimal128(10, 2), "DECIMAL(10,2)"), + (pa.string(), "VARCHAR"), + (pa.binary(), "VARBINARY"), + (pa.date32(), "DATE"), + (pa.time64("us"), "TIME"), + (pa.timestamp("us"), "TIMESTAMP"), + ], +) +def test_arrow_type_to_sql_mapping(data_type: pa.DataType, expected: str) -> None: + """Arrow field types should map to SQL type strings for schema probing.""" + assert _arrow_type_to_sql(data_type) == expected + + +def test_get_tables_falls_back_to_sql_on_not_supported() -> None: + """Native GetObjects failures should fall back to the SQL table query path.""" + driver = Mock() + driver.dialect = "sqlite" + driver.connection.adbc_get_objects.side_effect = AdbcNotSupportedError("NOT_IMPLEMENTED") + driver.select.return_value = [{"table_name": "fallback"}] + + result = AdbcDataDictionary().get_tables(driver) + + assert result == [{"table_name": "fallback"}] + driver.select.assert_called_once() + + +def test_get_columns_schema_wide_incomplete_falls_back() -> None: + """Schema-wide native column discovery should fall back when type names are incomplete.""" + driver = Mock() + driver.dialect = "duckdb" + driver.connection.adbc_get_objects.return_value = _make_reader(DUCKDB_OBJECTS_PAYLOAD) + driver.select.return_value = [{"column_name": "fallback"}] + + result = AdbcDataDictionary().get_columns(driver) + + assert result == [{"column_name": "fallback"}] + driver.connection.adbc_get_table_schema.assert_not_called() + driver.select.assert_called_once() + + +def test_get_columns_single_table_enriched_from_table_schema() -> None: + """Single-table native column discovery should enrich missing types from the Arrow schema.""" + driver = Mock() + driver.dialect = "duckdb" + driver.connection.adbc_get_objects.return_value = _make_reader(DUCKDB_OBJECTS_PAYLOAD) + driver.connection.adbc_get_table_schema.return_value = pa.schema([("id", pa.int32()), ("val", pa.float64())]) + + result = AdbcDataDictionary().get_columns(driver, table="t2") + by_name = {entry["column_name"]: entry for entry in result} + + assert by_name["id"]["data_type"] == "INTEGER" + assert by_name["val"]["data_type"] == "DOUBLE" + driver.select.assert_not_called() + + +def test_normalize_native_statistics() -> None: + """Native statistics normalization should keep catalog and schema context.""" + payload = [ + { + "catalog_name": "db", + "catalog_db_schemas": [ + { + "db_schema_name": "public", + "db_schema_statistics": [ + { + "table_name": "items", + "column_name": None, + "statistic_key": 6, + "statistic_value": 42, + "statistic_is_approximate": True, + }, + { + "table_name": "items", + "column_name": "name", + "statistic_key": 5, + "statistic_value": 0, + "statistic_is_approximate": False, + }, + ], + } + ], + } + ] + + stats = _normalize_native_statistics(payload) + + assert len(stats) == 2 + assert stats[0]["catalog_name"] == "db" + assert stats[0]["schema_name"] == "public" + assert stats[0]["statistic_name"] == "adbc.statistic.row_count" + assert stats[0]["column_name"] is None + assert stats[0]["is_approximate"] is True + assert stats[1]["statistic_name"] == "adbc.statistic.null_count" + assert stats[1]["column_name"] == "name" + + +def test_normalize_native_statistics_unknown_key() -> None: + """Unknown statistics keys should preserve the numeric key as a string name.""" + payload = [ + { + "catalog_name": "db", + "catalog_db_schemas": [ + { + "db_schema_name": "public", + "db_schema_statistics": [ + { + "table_name": "items", + "column_name": None, + "statistic_key": 1100, + "statistic_value": 1, + "statistic_is_approximate": False, + } + ], + } + ], + } + ] + + stats = _normalize_native_statistics(payload) + + assert stats[0]["statistic_name"] == "1100" + + +def test_get_statistics_raises_operational_error_when_unsupported() -> None: + """Unsupported native statistics should raise sqlspec OperationalError.""" + driver = Mock() + driver.dialect = "sqlite" + driver.connection.adbc_get_statistics.side_effect = AdbcNotSupportedError("NOT_IMPLEMENTED") + + with pytest.raises(OperationalError, match="does not support native table statistics"): + AdbcDataDictionary().get_statistics(driver, "items") + + +def test_get_statistics_filters_exact_table() -> None: + """Native statistics should be filtered back to the exact requested table name.""" + driver = Mock() + driver.dialect = "duckdb" + driver.connection.adbc_get_statistics.return_value = _make_reader( + [ + { + "catalog_name": "memory", + "catalog_db_schemas": [ + { + "db_schema_name": "main", + "db_schema_statistics": [ + { + "table_name": "items", + "column_name": None, + "statistic_key": 6, + "statistic_value": 3, + "statistic_is_approximate": True, + }, + { + "table_name": "items_archive", + "column_name": None, + "statistic_key": 6, + "statistic_value": 7, + "statistic_is_approximate": True, + }, + ], + } + ], + } + ] + ) + + statistics = AdbcDataDictionary().get_statistics(driver, "items") + + assert len(statistics) == 1 + assert statistics[0]["table_name"] == "items" diff --git a/tests/unit/adapters/test_arrow_odbc.py b/tests/unit/adapters/test_arrow_odbc.py index ad0a2b3c2..0c08285a7 100644 --- a/tests/unit/adapters/test_arrow_odbc.py +++ b/tests/unit/adapters/test_arrow_odbc.py @@ -17,7 +17,8 @@ odbc_type_to_arrow, resolve_dialect_from_dbms_name, ) -from sqlspec.exceptions import SQLSpecError +from sqlspec.adapters.arrow_odbc.data_dictionary import ArrowOdbcDataDictionary +from sqlspec.exceptions import SQLFileNotFoundError, SQLSpecError if TYPE_CHECKING: from sqlspec.adapters.arrow_odbc._typing import ArrowOdbcConnection @@ -80,12 +81,92 @@ class ErrorConnection(FakeConnection): """Connection stub that raises an ODBC driver error.""" def read_arrow_batches(self, **kwargs: Any) -> FakeReader: + self.read_calls.append(kwargs) raise FakeOdbcError("read failed") def from_table_to_db(self, source: pa.Table, target: str, chunk_size: int = 1000) -> None: raise FakeOdbcError("insert failed") +def _empty_table_reader() -> FakeReader: + return FakeReader(pa.table({"id": pa.array([], type=pa.int64()), "name": pa.array([], type=pa.string())})) + + +def test_get_columns_probes_arrow_schema_when_query_missing(monkeypatch: pytest.MonkeyPatch) -> None: + """Missing bundled column SQL should fall back to a zero-row Arrow schema probe.""" + connection = FakeConnection() + driver = ArrowOdbcDriver(cast("ArrowOdbcConnection", connection), driver_features={"chunk_size": 2}) + + def read_arrow_batches(**kwargs: Any) -> FakeReader: + connection.read_calls.append(kwargs) + return _empty_table_reader() + + connection.read_arrow_batches = read_arrow_batches # type: ignore[assignment] + + def fail_get_query(self: ArrowOdbcDataDictionary, name: str) -> Any: + raise SQLFileNotFoundError(name) + + monkeypatch.setattr(ArrowOdbcDataDictionary, "get_query", fail_get_query) + + result = driver.data_dictionary.get_columns(driver, table="items") + + assert [entry["data_type"] for entry in result] == ["BIGINT", "VARCHAR"] + assert [entry["ordinal_position"] for entry in result] == [1, 2] + assert connection.read_calls[-1]["query"] == 'SELECT * FROM "items" WHERE 1=0' + + +def test_get_columns_probe_quotes_schema(monkeypatch: pytest.MonkeyPatch) -> None: + """Schema-qualified probes should quote both schema and table identifiers.""" + connection = FakeConnection() + driver = ArrowOdbcDriver(cast("ArrowOdbcConnection", connection), driver_features={"chunk_size": 2}) + + def read_arrow_batches(**kwargs: Any) -> FakeReader: + connection.read_calls.append(kwargs) + return _empty_table_reader() + + connection.read_arrow_batches = read_arrow_batches # type: ignore[assignment] + + def fail_get_query(self: ArrowOdbcDataDictionary, name: str) -> Any: + raise SQLFileNotFoundError(name) + + monkeypatch.setattr(ArrowOdbcDataDictionary, "get_query", fail_get_query) + + result = driver.data_dictionary.get_columns(driver, table="items", schema="dbo") + + assert [entry["schema_name"] for entry in result] == ["dbo", "dbo"] + assert connection.read_calls[-1]["query"] == 'SELECT * FROM "dbo"."items" WHERE 1=0' + + +def test_get_columns_schema_wide_does_not_probe(monkeypatch: pytest.MonkeyPatch) -> None: + """Schema-wide missing SQL should stay on the empty SQL fallback and skip probing.""" + connection = FakeConnection() + driver = ArrowOdbcDriver(cast("ArrowOdbcConnection", connection), driver_features={"chunk_size": 2}) + + def fail_get_query(self: ArrowOdbcDataDictionary, name: str) -> Any: + raise SQLFileNotFoundError(name) + + monkeypatch.setattr(ArrowOdbcDataDictionary, "get_query", fail_get_query) + + result = driver.data_dictionary.get_columns(driver) + + assert result == [] + assert connection.read_calls == [] + + +def test_get_columns_probe_failure_returns_empty(monkeypatch: pytest.MonkeyPatch) -> None: + """Probe failures should preserve the existing empty-result contract.""" + connection = ErrorConnection() + driver = ArrowOdbcDriver(cast("ArrowOdbcConnection", connection), driver_features={"chunk_size": 2}) + + def fail_get_query(self: ArrowOdbcDataDictionary, name: str) -> Any: + raise SQLFileNotFoundError(name) + + monkeypatch.setattr(ArrowOdbcDataDictionary, "get_query", fail_get_query) + + assert driver.data_dictionary.get_columns(driver, table="items") == [] + assert connection.read_calls[-1]["query"] == 'SELECT * FROM "items" WHERE 1=0' + + def test_resolve_dialect_from_dbms_name() -> None: """ODBC DBMS names and driver strings should map to SQLSpec dialects.""" assert resolve_dialect_from_dbms_name("Microsoft SQL Server") == "mssql" diff --git a/tests/unit/adapters/test_contract_capability_flags.py b/tests/unit/adapters/test_contract_capability_flags.py index ff152841a..5b638f29f 100644 --- a/tests/unit/adapters/test_contract_capability_flags.py +++ b/tests/unit/adapters/test_contract_capability_flags.py @@ -13,6 +13,8 @@ "supports_pooling", "supports_multi_schema_migrations", "supports_data_dictionary", + "supports_native_metadata", + "supports_native_statistics", "supports_arrow_streaming", "supports_native_arrow", "arrow_reader_honors_batch_size", @@ -51,6 +53,8 @@ def test_new_capability_flags_opt_in_independently() -> None: supports_pooling=True, supports_multi_schema_migrations=True, supports_data_dictionary=True, + supports_native_metadata=True, + supports_native_statistics=True, supports_arrow_streaming=True, supports_native_arrow=True, arrow_reader_honors_batch_size=True, diff --git a/tests/unit/data_dictionary/test_types.py b/tests/unit/data_dictionary/test_types.py new file mode 100644 index 000000000..5f45072fc --- /dev/null +++ b/tests/unit/data_dictionary/test_types.py @@ -0,0 +1,27 @@ +"""Unit tests for data dictionary metadata types.""" + +from sqlspec.data_dictionary import TableStatisticsMetadata + + +def test_table_statistics_metadata_constructible() -> None: + """TableStatisticsMetadata should accept the full native statistics shape.""" + entry: TableStatisticsMetadata = { + "catalog_name": "main", + "schema_name": "public", + "table_name": "items", + "column_name": None, + "statistic_key": 6, + "statistic_name": "adbc.statistic.row_count", + "statistic_value": 42, + "is_approximate": True, + } + + assert entry["statistic_name"] == "adbc.statistic.row_count" + assert entry["column_name"] is None + + +def test_table_statistics_metadata_partial() -> None: + """TableStatisticsMetadata should remain optional for incremental construction.""" + entry: TableStatisticsMetadata = {"table_name": "items", "statistic_key": 1} + + assert entry["table_name"] == "items" From b6ef8fed54b02569138651b04386714114f1eea5 Mon Sep 17 00:00:00 2001 From: Cody Fincher Date: Sat, 13 Jun 2026 19:05:14 +0000 Subject: [PATCH 2/5] fix: satisfy metadata statistics CI --- .../test_adbc/test_data_dictionary.py | 57 +++++++++---------- 1 file changed, 28 insertions(+), 29 deletions(-) diff --git a/tests/unit/adapters/test_adbc/test_data_dictionary.py b/tests/unit/adapters/test_adbc/test_data_dictionary.py index 6c313cfe1..eed391a86 100644 --- a/tests/unit/adapters/test_adbc/test_data_dictionary.py +++ b/tests/unit/adapters/test_adbc/test_data_dictionary.py @@ -208,7 +208,8 @@ def test_get_columns_single_table_enriched_from_table_schema() -> None: driver = Mock() driver.dialect = "duckdb" driver.connection.adbc_get_objects.return_value = _make_reader(DUCKDB_OBJECTS_PAYLOAD) - driver.connection.adbc_get_table_schema.return_value = pa.schema([("id", pa.int32()), ("val", pa.float64())]) + schema_fields: list[pa.Field[Any]] = [pa.field("id", pa.int32()), pa.field("val", pa.float64())] + driver.connection.adbc_get_table_schema.return_value = pa.schema(schema_fields) result = AdbcDataDictionary().get_columns(driver, table="t2") by_name = {entry["column_name"]: entry for entry in result} @@ -300,34 +301,32 @@ def test_get_statistics_filters_exact_table() -> None: """Native statistics should be filtered back to the exact requested table name.""" driver = Mock() driver.dialect = "duckdb" - driver.connection.adbc_get_statistics.return_value = _make_reader( - [ - { - "catalog_name": "memory", - "catalog_db_schemas": [ - { - "db_schema_name": "main", - "db_schema_statistics": [ - { - "table_name": "items", - "column_name": None, - "statistic_key": 6, - "statistic_value": 3, - "statistic_is_approximate": True, - }, - { - "table_name": "items_archive", - "column_name": None, - "statistic_key": 6, - "statistic_value": 7, - "statistic_is_approximate": True, - }, - ], - } - ], - } - ] - ) + driver.connection.adbc_get_statistics.return_value = _make_reader([ + { + "catalog_name": "memory", + "catalog_db_schemas": [ + { + "db_schema_name": "main", + "db_schema_statistics": [ + { + "table_name": "items", + "column_name": None, + "statistic_key": 6, + "statistic_value": 3, + "statistic_is_approximate": True, + }, + { + "table_name": "items_archive", + "column_name": None, + "statistic_key": 6, + "statistic_value": 7, + "statistic_is_approximate": True, + }, + ], + } + ], + } + ]) statistics = AdbcDataDictionary().get_statistics(driver, "items") From e87f4474a92bcdf6e2c9dad2aee40ac7f9db65dd Mon Sep 17 00:00:00 2001 From: Cody Fincher Date: Sat, 13 Jun 2026 19:13:11 +0000 Subject: [PATCH 3/5] fix: harden metadata statistics CI --- sqlspec/adapters/adbc/data_dictionary.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/sqlspec/adapters/adbc/data_dictionary.py b/sqlspec/adapters/adbc/data_dictionary.py index 0c8f1a3f5..26876b7cc 100644 --- a/sqlspec/adapters/adbc/data_dictionary.py +++ b/sqlspec/adapters/adbc/data_dictionary.py @@ -1,6 +1,6 @@ """ADBC multi-dialect data dictionary for metadata queries.""" -from typing import TYPE_CHECKING, Any, ClassVar, Final +from typing import TYPE_CHECKING, Any, ClassVar, Final, cast from adbc_driver_manager import NotSupportedError as AdbcNotSupportedError from adbc_driver_manager import OperationalError as AdbcOperationalError @@ -374,7 +374,7 @@ def get_tables(self, driver: "AdbcDriver", schema: "str | None" = None) -> "list try: return self._native_get_tables(driver, dialect, schema_name) - except _NATIVE_FALLBACK_ERRORS as exc: + except (*_NATIVE_FALLBACK_ERRORS, _NativeMetadataIncompleteError) as exc: logger.debug("ADBC native get_objects unavailable for tables: %s", exc) if dialect == "bigquery": @@ -464,7 +464,10 @@ def _native_get_objects( db_schema_filter=filters["db_schema_filter"], table_name_filter=filters["table_name_filter"], ) - return reader.read_all().to_pylist() # type: ignore[no-any-return] + rows = reader.read_all().to_pylist() + if not isinstance(rows, list) or not all(isinstance(row, dict) for row in rows): + raise _NativeMetadataIncompleteError + return cast("list[dict[str, Any]]", rows) def _native_get_tables( self, driver: "AdbcDriver", dialect: str, schema_name: "str | None" @@ -577,7 +580,7 @@ def get_foreign_keys( resolved_table = self._resolve_identifier(dialect, table) if table is not None else None try: return self._native_get_foreign_keys(driver, dialect, resolved_table, schema_name) - except _NATIVE_FALLBACK_ERRORS as exc: + except (*_NATIVE_FALLBACK_ERRORS, _NativeMetadataIncompleteError) as exc: logger.debug("ADBC native get_objects unavailable for foreign keys: %s", exc) if dialect == "bigquery": From a9d6e2720ba383bba46676c9868547942d520901 Mon Sep 17 00:00:00 2001 From: Cody Fincher Date: Sat, 13 Jun 2026 19:31:14 +0000 Subject: [PATCH 4/5] fix: complete native metadata fallbacks --- sqlspec/adapters/adbc/data_dictionary.py | 12 +++- .../test_adbc/test_data_dictionary.py | 57 +++++++++++++++++++ 2 files changed, 66 insertions(+), 3 deletions(-) diff --git a/sqlspec/adapters/adbc/data_dictionary.py b/sqlspec/adapters/adbc/data_dictionary.py index 26876b7cc..f3cc23334 100644 --- a/sqlspec/adapters/adbc/data_dictionary.py +++ b/sqlspec/adapters/adbc/data_dictionary.py @@ -481,9 +481,10 @@ def _native_get_columns( rows = self._native_get_objects(driver, dialect, "all", schema_name, table_name) columns = _normalize_native_columns(rows, table_name_exact=table_name) missing_types = [entry for entry in columns if "data_type" not in entry] - if not missing_types: + missing_nullability = any("is_nullable" not in entry for entry in columns) + if not missing_types and not missing_nullability: return columns - if table_name is None: + if table_name is None or missing_nullability: raise _NativeMetadataIncompleteError filters = self._native_object_filters(dialect, schema_name, None) arrow_schema = driver.connection.adbc_get_table_schema( @@ -494,6 +495,8 @@ def _native_get_columns( resolved = type_by_name.get(entry["column_name"]) if resolved is not None: entry["data_type"] = resolved + if any("data_type" not in entry or "is_nullable" not in entry for entry in columns): + raise _NativeMetadataIncompleteError return columns def get_indexes( @@ -627,7 +630,10 @@ def _native_get_foreign_keys( self, driver: "AdbcDriver", dialect: str, table_name: "str | None", schema_name: "str | None" ) -> "list[ForeignKeyMetadata]": rows = self._native_get_objects(driver, dialect, "all", schema_name, table_name) - return _normalize_native_foreign_keys(rows, table_name_exact=table_name) + foreign_keys = _normalize_native_foreign_keys(rows, table_name_exact=table_name) + if not foreign_keys: + raise _NativeMetadataIncompleteError + return foreign_keys def get_statistics( self, driver: "AdbcDriver", table: str, schema: "str | None" = None, *, approximate: bool = True diff --git a/tests/unit/adapters/test_adbc/test_data_dictionary.py b/tests/unit/adapters/test_adbc/test_data_dictionary.py index eed391a86..67822775c 100644 --- a/tests/unit/adapters/test_adbc/test_data_dictionary.py +++ b/tests/unit/adapters/test_adbc/test_data_dictionary.py @@ -189,6 +189,32 @@ def test_get_tables_falls_back_to_sql_on_not_supported() -> None: driver.select.assert_called_once() +def test_get_foreign_keys_empty_native_result_falls_back() -> None: + """Native foreign-key discovery should fall back when drivers omit constraint metadata.""" + driver = Mock() + driver.dialect = "duckdb" + driver.connection.adbc_get_objects.return_value = _make_reader([ + { + "catalog_name": "memory", + "catalog_db_schemas": [ + { + "db_schema_name": "main", + "db_schema_tables": [ + {"table_name": "t2", "table_type": "table", "table_columns": [], "table_constraints": []} + ], + } + ], + } + ]) + fallback: list[Any] = [Mock()] + driver.select.return_value = fallback + + result = AdbcDataDictionary().get_foreign_keys(driver, table="t2") + + assert result == fallback + driver.select.assert_called_once() + + def test_get_columns_schema_wide_incomplete_falls_back() -> None: """Schema-wide native column discovery should fall back when type names are incomplete.""" driver = Mock() @@ -203,6 +229,37 @@ def test_get_columns_schema_wide_incomplete_falls_back() -> None: driver.select.assert_called_once() +def test_get_columns_missing_native_nullability_falls_back() -> None: + """Native column discovery should fall back when nullability metadata is incomplete.""" + driver = Mock() + driver.dialect = "duckdb" + driver.connection.adbc_get_objects.return_value = _make_reader([ + { + "catalog_name": "memory", + "catalog_db_schemas": [ + { + "db_schema_name": "main", + "db_schema_tables": [ + { + "table_name": "t2", + "table_type": "table", + "table_columns": [{"column_name": "id", "xdbc_type_name": "INTEGER"}], + "table_constraints": [], + } + ], + } + ], + } + ]) + driver.select.return_value = [{"column_name": "fallback"}] + + result = AdbcDataDictionary().get_columns(driver, table="t2") + + assert result == [{"column_name": "fallback"}] + driver.connection.adbc_get_table_schema.assert_not_called() + driver.select.assert_called_once() + + def test_get_columns_single_table_enriched_from_table_schema() -> None: """Single-table native column discovery should enrich missing types from the Arrow schema.""" driver = Mock() From 088f05a82efa7771bd1f03f62b06d09356a1fa68 Mon Sep 17 00:00:00 2001 From: Cody Fincher Date: Sat, 13 Jun 2026 20:03:54 +0000 Subject: [PATCH 5/5] fix: clarify adbc metadata fallback --- docs/reference/adapters/adbc.rst | 40 ++-- sqlspec/adapters/adbc/data_dictionary.py | 252 +++++++++++------------ 2 files changed, 150 insertions(+), 142 deletions(-) diff --git a/docs/reference/adapters/adbc.rst b/docs/reference/adapters/adbc.rst index bdee8d3d8..8e510b5ce 100644 --- a/docs/reference/adapters/adbc.rst +++ b/docs/reference/adapters/adbc.rst @@ -235,41 +235,49 @@ Data Dictionary Native Metadata And Statistics ============================== -``AdbcDataDictionary`` prefers the standardized ADBC metadata APIs -(``adbc_get_objects``, ``adbc_get_table_schema``) and falls back to -dialect-specific SQL introspection when the underlying driver raises -``NotSupportedError`` or ``OperationalError``. ``get_statistics`` wraps -``adbc_get_statistics`` and has no SQL fallback: unsupported drivers raise +``AdbcDataDictionary`` keeps SQLSpec's central dialect data-dictionary queries +as the canonical fallback. It detects the database behind the ADBC connection +and uses the same dialect query registry as the native adapter when ADBC +metadata is unsupported, incomplete, or too broad. The standardized ADBC +metadata APIs (``adbc_get_objects``, ``adbc_get_table_schema``) are an optional +overlay when the driver returns complete table, column, and foreign-key +payloads that can be normalized to SQLSpec's public metadata types. + +``get_statistics`` is separate from the shared data dictionary surface because +SQLSpec does not define a portable SQL statistics contract. It wraps +``adbc_get_statistics`` directly; unsupported drivers raise :exc:`sqlspec.exceptions.OperationalError`. .. list-table:: ADBC native metadata support (driver manager 1.11.0) :header-rows: 1 * - Backend - - GetObjects (tables/columns/foreign keys) - - GetStatistics + - Metadata behavior + - Statistics behavior * - PostgreSQL - - Native + - Native overlay when available; central PostgreSQL SQL fallback - Native (approximate; run ``ANALYZE`` for fresh estimates) * - SQLite - - Native (type names populated; nullability unreliable) + - Native overlay when available; central SQLite SQL fallback + (type names populated; nullability unreliable) - Unsupported (raises ``OperationalError``) * - DuckDB - - Native for single tables (types filled from the Arrow table schema); - schema-wide column listings fall back to SQL + - Native overlay for single tables when schema enrichment succeeds; + schema-wide column listings use central DuckDB SQL - Unsupported (raises ``OperationalError``) * - Flight SQL / GizmoSQL - - Native (server dependent) + - Native overlay is server dependent; central dialect fallback applies + when the backend dialect is mapped - Server dependent * - BigQuery - - SQL fallback + - Central BigQuery SQL fallback - Unverified Precision limits: -- ADBC name filters are SQL ``LIKE`` patterns; SQLSpec post-filters results by - exact table name, but schema filters containing ``_`` or ``%`` may match - more broadly on the server side. +- ADBC native name filters are SQL ``LIKE`` patterns; SQLSpec post-filters + native results by exact table name, but schema filters containing ``_`` or + ``%`` may match more broadly on the server side before fallback. - The SQLite driver reports ``xdbc_is_nullable`` as ``YES`` even for ``NOT NULL`` columns. - Index metadata always uses SQL introspection; ADBC GetObjects has no diff --git a/sqlspec/adapters/adbc/data_dictionary.py b/sqlspec/adapters/adbc/data_dictionary.py index f3cc23334..5a1c0b5c2 100644 --- a/sqlspec/adapters/adbc/data_dictionary.py +++ b/sqlspec/adapters/adbc/data_dictionary.py @@ -234,66 +234,6 @@ class AdbcDataDictionary(SyncDataDictionaryBase): def __init__(self) -> None: super().__init__() - def _normalize_dialect(self, driver: "AdbcDriver") -> str: - dialect_value = str(driver.dialect) - return normalize_dialect_name(dialect_value) - - def _get_query(self, dialect: str, name: str) -> "SQL": - loader = get_data_dictionary_loader() - return loader.get_query(dialect, name) - - def _get_query_text(self, dialect: str, name: str) -> str: - loader = get_data_dictionary_loader() - return loader.get_query_text(dialect, name) - - def _get_query_text_or_none(self, dialect: str, name: str) -> "str | None": - try: - return self._get_query_text(dialect, name) - except SQLFileNotFoundError: - return None - - def _resolve_schema(self, dialect: str, schema: "str | None") -> "str | None": - try: - config = get_dialect_config(dialect) - except ValueError: - return schema - if schema is not None: - return normalize_identifier(schema, config.name) - if config.default_schema is None: - return None - return normalize_identifier(config.default_schema, config.name) - - def _resolve_identifier(self, dialect: str, identifier: str) -> str: - try: - config = get_dialect_config(dialect) - except ValueError: - return identifier - return normalize_identifier(identifier, config.name) - - def _resolve_feature_flag(self, dialect: str, feature: str, version_info: "VersionInfo | None") -> bool: - try: - config = get_dialect_config(dialect) - except ValueError: - return False - flag = config.get_feature_flag(feature) - if flag is not None: - return flag - required_version = config.get_feature_version(feature) - if required_version is None or version_info is None: - return False - return bool(version_info >= required_version) - - def list_available_features(self) -> "list[str]": - features = set(self.get_default_features()) - for dialect in list_registered_dialects(): - try: - config = get_dialect_config(dialect) - except ValueError: - continue - features.update(config.feature_flags.keys()) - features.update(config.feature_versions.keys()) - return sorted(features) - def get_version(self, driver: "AdbcDriver") -> "VersionInfo | None": """Get database version information based on detected dialect.""" dialect = self._normalize_dialect(driver) @@ -366,6 +306,17 @@ def get_optimal_type(self, driver: "AdbcDriver", type_category: str) -> str: return config.get_optimal_type(type_category) + def list_available_features(self) -> "list[str]": + features = set(self.get_default_features()) + for dialect in list_registered_dialects(): + try: + config = get_dialect_config(dialect) + except ValueError: + continue + features.update(config.feature_flags.keys()) + features.update(config.feature_versions.keys()) + return sorted(features) + def get_tables(self, driver: "AdbcDriver", schema: "str | None" = None) -> "list[TableMetadata]": """Get tables for the current dialect.""" dialect = self._normalize_dialect(driver) @@ -442,63 +393,6 @@ def get_columns( schema_type=ColumnMetadata, ) - def _native_object_filters( - self, dialect: str, schema_name: "str | None", table_name: "str | None" - ) -> "dict[str, str | None]": - catalog_filter: str | None = None - db_schema_filter: str | None = None - if schema_name: - if dialect == "sqlite": - catalog_filter = schema_name - else: - db_schema_filter = schema_name - return {"catalog_filter": catalog_filter, "db_schema_filter": db_schema_filter, "table_name_filter": table_name} - - def _native_get_objects( - self, driver: "AdbcDriver", dialect: str, depth: str, schema_name: "str | None", table_name: "str | None" - ) -> "list[dict[str, Any]]": - filters = self._native_object_filters(dialect, schema_name, table_name) - reader = driver.connection.adbc_get_objects( - depth=depth, - catalog_filter=filters["catalog_filter"], - db_schema_filter=filters["db_schema_filter"], - table_name_filter=filters["table_name_filter"], - ) - rows = reader.read_all().to_pylist() - if not isinstance(rows, list) or not all(isinstance(row, dict) for row in rows): - raise _NativeMetadataIncompleteError - return cast("list[dict[str, Any]]", rows) - - def _native_get_tables( - self, driver: "AdbcDriver", dialect: str, schema_name: "str | None" - ) -> "list[TableMetadata]": - rows = self._native_get_objects(driver, dialect, "tables", schema_name, None) - return _normalize_native_tables(rows) - - def _native_get_columns( - self, driver: "AdbcDriver", dialect: str, table_name: "str | None", schema_name: "str | None" - ) -> "list[ColumnMetadata]": - rows = self._native_get_objects(driver, dialect, "all", schema_name, table_name) - columns = _normalize_native_columns(rows, table_name_exact=table_name) - missing_types = [entry for entry in columns if "data_type" not in entry] - missing_nullability = any("is_nullable" not in entry for entry in columns) - if not missing_types and not missing_nullability: - return columns - if table_name is None or missing_nullability: - raise _NativeMetadataIncompleteError - filters = self._native_object_filters(dialect, schema_name, None) - arrow_schema = driver.connection.adbc_get_table_schema( - table_name, catalog_filter=filters["catalog_filter"], db_schema_filter=filters["db_schema_filter"] - ) - type_by_name = {field.name: _arrow_type_to_sql(field.type) for field in arrow_schema} - for entry in missing_types: - resolved = type_by_name.get(entry["column_name"]) - if resolved is not None: - entry["data_type"] = resolved - if any("data_type" not in entry or "is_nullable" not in entry for entry in columns): - raise _NativeMetadataIncompleteError - return columns - def get_indexes( self, driver: "AdbcDriver", table: "str | None" = None, schema: "str | None" = None ) -> "list[IndexMetadata]": @@ -626,15 +520,6 @@ def get_foreign_keys( schema_type=ForeignKeyMetadata, ) - def _native_get_foreign_keys( - self, driver: "AdbcDriver", dialect: str, table_name: "str | None", schema_name: "str | None" - ) -> "list[ForeignKeyMetadata]": - rows = self._native_get_objects(driver, dialect, "all", schema_name, table_name) - foreign_keys = _normalize_native_foreign_keys(rows, table_name_exact=table_name) - if not foreign_keys: - raise _NativeMetadataIncompleteError - return foreign_keys - def get_statistics( self, driver: "AdbcDriver", table: str, schema: "str | None" = None, *, approximate: bool = True ) -> "list[TableStatisticsMetadata]": @@ -656,3 +541,118 @@ def get_statistics( raise OperationalError(msg) from exc rows = reader.read_all().to_pylist() return [entry for entry in _normalize_native_statistics(rows) if entry["table_name"] == table_name] + + def _normalize_dialect(self, driver: "AdbcDriver") -> str: + dialect_value = str(driver.dialect) + return normalize_dialect_name(dialect_value) + + def _get_query(self, dialect: str, name: str) -> "SQL": + loader = get_data_dictionary_loader() + return loader.get_query(dialect, name) + + def _get_query_text(self, dialect: str, name: str) -> str: + loader = get_data_dictionary_loader() + return loader.get_query_text(dialect, name) + + def _get_query_text_or_none(self, dialect: str, name: str) -> "str | None": + try: + return self._get_query_text(dialect, name) + except SQLFileNotFoundError: + return None + + def _resolve_schema(self, dialect: str, schema: "str | None") -> "str | None": + try: + config = get_dialect_config(dialect) + except ValueError: + return schema + if schema is not None: + return normalize_identifier(schema, config.name) + if config.default_schema is None: + return None + return normalize_identifier(config.default_schema, config.name) + + def _resolve_identifier(self, dialect: str, identifier: str) -> str: + try: + config = get_dialect_config(dialect) + except ValueError: + return identifier + return normalize_identifier(identifier, config.name) + + def _resolve_feature_flag(self, dialect: str, feature: str, version_info: "VersionInfo | None") -> bool: + try: + config = get_dialect_config(dialect) + except ValueError: + return False + flag = config.get_feature_flag(feature) + if flag is not None: + return flag + required_version = config.get_feature_version(feature) + if required_version is None or version_info is None: + return False + return bool(version_info >= required_version) + + def _native_object_filters( + self, dialect: str, schema_name: "str | None", table_name: "str | None" + ) -> "dict[str, str | None]": + catalog_filter: str | None = None + db_schema_filter: str | None = None + if schema_name: + if dialect == "sqlite": + catalog_filter = schema_name + else: + db_schema_filter = schema_name + return {"catalog_filter": catalog_filter, "db_schema_filter": db_schema_filter, "table_name_filter": table_name} + + def _native_get_objects( + self, driver: "AdbcDriver", dialect: str, depth: str, schema_name: "str | None", table_name: "str | None" + ) -> "list[dict[str, Any]]": + filters = self._native_object_filters(dialect, schema_name, table_name) + reader = driver.connection.adbc_get_objects( + depth=depth, + catalog_filter=filters["catalog_filter"], + db_schema_filter=filters["db_schema_filter"], + table_name_filter=filters["table_name_filter"], + ) + rows = reader.read_all().to_pylist() + if not isinstance(rows, list) or not all(isinstance(row, dict) for row in rows): + raise _NativeMetadataIncompleteError + return cast("list[dict[str, Any]]", rows) + + def _native_get_tables( + self, driver: "AdbcDriver", dialect: str, schema_name: "str | None" + ) -> "list[TableMetadata]": + rows = self._native_get_objects(driver, dialect, "tables", schema_name, None) + return _normalize_native_tables(rows) + + def _native_get_columns( + self, driver: "AdbcDriver", dialect: str, table_name: "str | None", schema_name: "str | None" + ) -> "list[ColumnMetadata]": + rows = self._native_get_objects(driver, dialect, "all", schema_name, table_name) + columns = _normalize_native_columns(rows, table_name_exact=table_name) + missing_types = [entry for entry in columns if "data_type" not in entry] + missing_nullability = any("is_nullable" not in entry for entry in columns) + if not missing_types and not missing_nullability: + return columns + if table_name is None or missing_nullability: + raise _NativeMetadataIncompleteError + filters = self._native_object_filters(dialect, schema_name, None) + arrow_schema = driver.connection.adbc_get_table_schema( + table_name, catalog_filter=filters["catalog_filter"], db_schema_filter=filters["db_schema_filter"] + ) + type_by_name = {field.name: _arrow_type_to_sql(field.type) for field in arrow_schema} + for entry in missing_types: + resolved = type_by_name.get(entry["column_name"]) + if resolved is not None: + entry["data_type"] = resolved + if any("data_type" not in entry or "is_nullable" not in entry for entry in columns): + raise _NativeMetadataIncompleteError + return columns + + def _native_get_foreign_keys( + self, driver: "AdbcDriver", dialect: str, table_name: "str | None", schema_name: "str | None" + ) -> "list[ForeignKeyMetadata]": + rows = self._native_get_objects(driver, dialect, "all", schema_name, table_name) + foreign_keys = _normalize_native_foreign_keys(rows, table_name_exact=table_name) + if not foreign_keys: + raise _NativeMetadataIncompleteError + return foreign_keys