Skip to content

Commit 6a21a91

Browse files
fix(kernel): cursor-state tracking + metadata parity for use_kernel
Brings the use_kernel=True path closer to Thrift parity on the cursor-state-tracking and metadata clusters from the API+CUJ gap audit, and surfaces DML rowcount. Cursor-state tracking (T7): - Set cursor.active_command_id in the _make_result_set chokepoint so every result-producing path (sync execute, async fetch, AND metadata) leaves the cursor pointing at the command that produced the current result set, matching Thrift's unconditional set in _handle_execute_response. Previously metadata calls left it stale. - On a failed sync execute, publish the server-issued statement id (read from the canceller's inflight slot) onto active_command_id before re-raising, so the cursor can correlate the FAILED query. Best-effort; never masks the original failure; left untouched on a pre-id transport failure. - get_execution_result no longer eagerly closes/drops the async handle after the first await_result. The kernel's await_result is idempotent and re-callable, so the handle stays tracked until close_command / close_session (Thrift parity: re-fetchable until explicit close). The prior eager close made a second call raise. - _closed_commands is now a bounded FIFO (OrderedDict capped at 10_000 via _record_closed) so it can't grow unbounded on a long-lived session. Metadata: - Normalize a wildcard/blank catalog ('%' / '*' / '' / None) to None (all-catalogs) across get_schemas/get_tables/get_columns via _catalog_or_none, making the three symmetric (fixes columns, which previously treated catalog as an exact identifier). - Normalize empty/whitespace-only pattern args to None (match-all) via _none_if_blank, mapping the kernel's InvalidArgument-on-"" to Thrift's effective match-all. % / * stay as real LIKE wildcards on patterns. - Drop the connector-side table_types drain + client-side refilter in get_tables; the kernel now filters table_types itself (case-insensitively as of the batch-3 kernel change), so we forward it and let the kernel do the work — restores streaming, removes the duplicated case-sensitive filter. Removes the now-dead _drain_kernel_handle / _StaticArrowHandle helpers and unused imports. - Fix the Cursor.columns() docstring: catalog_name=None is accepted on all backends (kernel issues SHOW COLUMNS across all catalogs), not rejected with ProgrammingError. DML rowcount: - Surface the kernel's num_modified_rows as cursor.rowcount for DML (INSERT/UPDATE/DELETE/MERGE) instead of the hardcoded -1. None (SELECT, or warehouses that don't report it) leaves rowcount at -1, matching the Thrift backend; getattr-guarded against an older kernel wheel. Bumps KERNEL_REV to the batch-3 kernel commit (case-insensitive table_types, HTTP error-envelope parsing, num_modified_rows pyo3 getter). Co-authored-by: Isaac Signed-off-by: Vikrant Puppala <vikrant.puppala@databricks.com>
1 parent 85f8ba3 commit 6a21a91

5 files changed

Lines changed: 626 additions & 186 deletions

File tree

KERNEL_REV

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
f4ee6fec78aabce8c0ea9c1ff47fc11b8191d013
1+
3991d8b4677f9fa8d3bdf607f3db875cd21d3304

src/databricks/sql/backend/kernel/client.py

Lines changed: 137 additions & 140 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,6 @@
1616
1717
- ``query_tags`` on execute is not supported (kernel exposes
1818
``statement_conf`` but PyO3 doesn't surface it).
19-
- ``get_tables`` with a non-empty ``table_types`` filter applies
20-
the filter client-side; today the kernel returns the full
21-
``SHOW TABLES`` shape unchanged. The connector's existing
22-
``ResultSetFilter.filter_tables_by_type`` is keyed on
23-
``SeaResultSet`` not ``KernelResultSet``, so we punt and let
24-
the caller see all rows — documented as a known gap in the
25-
design doc.
2619
- Volume PUT/GET (staging operations): kernel has no Volume API
2720
yet. Users on Thrift-only paths.
2821
"""
@@ -32,7 +25,8 @@
3225
import logging
3326
import threading
3427
import uuid
35-
from typing import Any, Dict, List, Optional, Set, TYPE_CHECKING, Union
28+
from collections import OrderedDict
29+
from typing import Any, Dict, List, Optional, TYPE_CHECKING, Union
3630

3731
from databricks.sql.backend.databricks_client import DatabricksClient
3832
from databricks.sql.backend.kernel._errors import (
@@ -52,7 +46,6 @@
5246
from databricks.sql.exc import (
5347
InterfaceError,
5448
NotSupportedError,
55-
OperationalError,
5649
ProgrammingError,
5750
)
5851
from databricks.sql.thrift_api.TCLIService import ttypes
@@ -76,6 +69,16 @@
7669
# on staging ops it can't service — see ``execute_command``.
7770
_STAGING_VERBS = ("PUT", "GET", "REMOVE")
7871

72+
# Upper bound on the per-session ``_closed_commands`` registry. The set
73+
# only needs to remember *recently* closed async command ids long enough
74+
# for a client still holding the id to poll ``get_query_state`` and see
75+
# ``CLOSED`` (rather than the SUCCEEDED fall-through). Bounding it (FIFO
76+
# eviction) prevents unbounded growth on a long-lived session that opens
77+
# and closes many async commands. An evicted (very old) id degrades from
78+
# CLOSED -> SUCCEEDED in ``get_query_state`` — consistent with the
79+
# never-tracked path, not a correctness break.
80+
_CLOSED_COMMANDS_MAX = 10_000
81+
7982

8083
def _strip_leading_sql_comments(sql: str) -> str:
8184
"""Strip leading whitespace and SQL comments (``-- …`` line and
@@ -107,6 +110,39 @@ def _strip_leading_sql_comments(sql: str) -> str:
107110
return sql[i:]
108111

109112

113+
def _none_if_blank(value: Optional[str]) -> Optional[str]:
114+
"""Map an empty/whitespace-only metadata filter to ``None``
115+
("match all"), matching the Thrift backend's effective behaviour.
116+
117+
The kernel's ``Identifier`` / ``LikePattern`` reject ``""`` with
118+
``InvalidArgument`` (-> ``ProgrammingError``); ``None`` is the
119+
kernel's canonical "match all". Applied to schema / table / column
120+
*pattern* args (which otherwise keep ``%`` / ``_`` as real LIKE
121+
wildcards)."""
122+
if value is None:
123+
return None
124+
return value if value.strip() else None
125+
126+
127+
def _catalog_or_none(value: Optional[str]) -> Optional[str]:
128+
"""Normalise a catalog filter: ``None`` / blank / ``'%'`` / ``'*'``
129+
all mean "all catalogs" -> ``None``.
130+
131+
This makes ``columns(catalog='%')`` behave like
132+
``tables(catalog='%')`` / ``schemas(catalog='%')`` — the kernel
133+
already treats blank/``%``/``*`` as "all catalogs" for SHOW SCHEMAS
134+
/ SHOW TABLES (``is_null_or_wildcard``) but treats the catalog as an
135+
exact identifier for SHOW COLUMNS, so the three diverged. Normalising
136+
connector-side makes them symmetric. This intentionally diverges from
137+
raw-Thrift literalness (Thrift treats ``%`` as a literal catalog
138+
name) in favour of JDBC "catalog is exact-or-all, not a pattern" +
139+
internal consistency. Catalog is the only arg normalised this way;
140+
schema/table/column patterns keep ``%`` / ``*`` as LIKE wildcards."""
141+
if value is None or not value.strip() or value in ("%", "*"):
142+
return None
143+
return value
144+
145+
110146
def _is_staging_statement(operation: str) -> bool:
111147
"""True iff ``operation`` is a volume/staging statement (PUT / GET /
112148
REMOVE).
@@ -219,8 +255,11 @@ def __init__(
219255
# closed (via ``close_command`` or ``close_session``). Lets
220256
# ``get_query_state`` report ``CLOSED`` for them rather than
221257
# the SUCCEEDED fall-through used for the never-tracked sync
222-
# path. Same lock as ``_async_handles``.
223-
self._closed_commands: Set[str] = set()
258+
# path. Same lock as ``_async_handles``. Bounded FIFO (see
259+
# ``_record_closed`` / ``_CLOSED_COMMANDS_MAX``) so it can't grow
260+
# without limit on a long-lived session. Used as an ordered set
261+
# (values are ignored).
262+
self._closed_commands: "OrderedDict[str, None]" = OrderedDict()
224263
self._async_handles_lock = threading.RLock()
225264
# Sync-execute cancellers keyed by ``id(cursor)``. A blocking
226265
# ``execute()`` sets ``cursor.active_command_id`` only AFTER it
@@ -355,7 +394,7 @@ def close_session(self, session_id: SessionId) -> None:
355394
self._async_handles.clear()
356395
self._async_statements.clear()
357396
for guid, _ in tracked:
358-
self._closed_commands.add(guid)
397+
self._record_closed(guid)
359398
for _, handle in tracked:
360399
# Per-handle close errors are non-fatal — PEP 249
361400
# discourages raising from session close — so log and
@@ -487,6 +526,27 @@ def execute_command(
487526
# produced to reap it.
488527
close_stmt = False
489528
except Exception as exc:
529+
# Failed sync execute: publish the server-issued
530+
# statement id (observed mid-execute via the canceller's
531+
# inflight slot, still registered here — the finally pops
532+
# it) so the cursor's query_id reflects the FAILED query,
533+
# matching the Thrift backend which sets active_command_id
534+
# on every execute regardless of outcome. statement_id()
535+
# is None for a pre-id failure (transport error on the
536+
# initial POST) — then leave active_command_id untouched.
537+
# Best-effort; never mask the original failure.
538+
try:
539+
with self._sync_cancellers_lock:
540+
canceller = self._sync_cancellers.get(id(cursor))
541+
stmt_id = (
542+
canceller.statement_id() if canceller is not None else None
543+
)
544+
if stmt_id:
545+
cursor.active_command_id = CommandId.from_sea_statement_id(
546+
stmt_id
547+
)
548+
except Exception:
549+
pass
490550
raise _wrap_kernel_exception("execute_command", exc) from exc
491551
finally:
492552
with self._sync_cancellers_lock:
@@ -502,7 +562,21 @@ def execute_command(
502562
pass
503563

504564
command_id = CommandId.from_sea_statement_id(executed.statement_id)
505-
cursor.active_command_id = command_id
565+
# Surface the affected-row count for DML (INSERT/UPDATE/DELETE/
566+
# MERGE) as ``cursor.rowcount`` instead of the hardcoded ``-1``.
567+
# ``num_modified_rows`` is ``None`` for SELECT (and warehouses
568+
# that don't report it) → leave ``rowcount`` at its ``-1``
569+
# default. ``getattr`` guards against an older kernel wheel that
570+
# predates the pyo3 getter. NB the Thrift backend also hardcodes
571+
# ``-1`` here, so this makes the kernel path *exceed* Thrift.
572+
try:
573+
modified = getattr(executed, "num_modified_rows", None)
574+
if callable(modified):
575+
modified = modified()
576+
except Exception:
577+
modified = None
578+
if modified is not None:
579+
cursor.rowcount = modified
506580
# ``KernelResultSet.__init__`` calls ``arrow_schema()`` which
507581
# can itself raise ``KernelError`` (or, in principle, a PyO3
508582
# native exception) — wrap the construction so callers see a
@@ -574,7 +648,7 @@ def close_command(self, command_id: CommandId) -> None:
574648
if handle is not None:
575649
# Record the close so ``get_query_state`` can report
576650
# ``CLOSED`` (not ``SUCCEEDED``) for this command.
577-
self._closed_commands.add(command_id.guid)
651+
self._record_closed(command_id.guid)
578652
if handle is None:
579653
logger.debug("close_command: no tracked handle for %s", command_id)
580654
# Still drop the parent Statement if somehow tracked without
@@ -650,36 +724,17 @@ def get_execution_result(
650724
stream = async_exec.await_result()
651725
except Exception as exc:
652726
raise _wrap_kernel_exception("get_execution_result", exc) from exc
653-
# The async-exec handle's role ends once it has produced the
654-
# ``ResultStream`` — keeping it around (and tracked in
655-
# ``_async_handles``) would leak the server-side
656-
# ``ExecutedAsyncStatement`` until ``close_session`` swept it
657-
# up, since ``KernelResultSet.close`` only closes the stream
658-
# it wraps. Drop tracking and fire-and-forget the close.
659-
with self._async_handles_lock:
660-
self._async_handles.pop(command_id.guid, None)
661-
stmt = self._async_statements.pop(command_id.guid, None)
662-
self._closed_commands.add(command_id.guid)
663-
try:
664-
async_exec.close()
665-
except Exception as exc:
666-
logger.warning(
667-
"Error closing async_exec after await_result for %s: %s",
668-
command_id,
669-
exc,
670-
)
671-
# The parent Statement is no longer needed once the async handle
672-
# has produced its ResultStream. Close to release server-side
673-
# tracking; matches the sync path's eager Statement close.
674-
if stmt is not None:
675-
try:
676-
stmt.close()
677-
except Exception as exc:
678-
logger.warning(
679-
"Error closing async statement after await_result for %s: %s",
680-
command_id,
681-
exc,
682-
)
727+
# Do NOT close/drop the async handle here. The kernel's
728+
# ``await_result()`` is idempotent and re-callable (it re-polls +
729+
# re-materialises a fresh ``ResultStream`` each time), so keeping
730+
# the handle tracked lets ``get_async_execution_result()`` be
731+
# called more than once — matching the Thrift backend, where the
732+
# operation handle stays valid (re-fetchable) until an explicit
733+
# ``close_command`` / ``close_session``. The prior eager close
734+
# made a second call raise ``ProgrammingError(unknown
735+
# command_id)``. The handle + parent Statement are still reaped
736+
# by ``close_command`` / ``close_session``, so this does not leak.
737+
#
683738
# ``KernelResultSet.__init__`` calls ``arrow_schema()`` which
684739
# can raise — map that to PEP 249 too.
685740
try:
@@ -697,7 +752,17 @@ def _make_result_set(
697752
) -> "ResultSet":
698753
"""Build a ``KernelResultSet`` from any kernel handle. Used
699754
by sync execute, ``get_execution_result``, and all metadata
700-
paths to keep construction in one place."""
755+
paths to keep construction in one place.
756+
757+
Sets ``cursor.active_command_id`` here so every result-producing
758+
path — sync execute, async fetch, AND metadata — leaves the
759+
cursor pointing at the command that produced the current result
760+
set. This matches the Thrift backend, which sets it
761+
unconditionally in ``_handle_execute_response``. Without it,
762+
``cursor.query_id`` / ``get_query_state`` would stay pinned to a
763+
prior query after a metadata call (the metadata methods mint a
764+
synthetic command id but previously never published it)."""
765+
cursor.active_command_id = command_id
701766
return KernelResultSet(
702767
connection=cursor.connection,
703768
backend=self,
@@ -707,6 +772,17 @@ def _make_result_set(
707772
buffer_size_bytes=cursor.buffer_size_bytes,
708773
)
709774

775+
def _record_closed(self, guid: str) -> None:
776+
"""Record an async command guid as closed, bounded FIFO.
777+
778+
Caller must hold ``_async_handles_lock``. Evicts the oldest
779+
entries past ``_CLOSED_COMMANDS_MAX`` so the registry can't grow
780+
unbounded on a long-lived session."""
781+
self._closed_commands[guid] = None
782+
self._closed_commands.move_to_end(guid)
783+
while len(self._closed_commands) > _CLOSED_COMMANDS_MAX:
784+
self._closed_commands.popitem(last=False)
785+
710786
def _synthetic_command_id(self) -> CommandId:
711787
"""Metadata calls don't produce a server statement id; mint
712788
a synthetic UUID so the ``ResultSet`` still has a stable
@@ -746,8 +822,8 @@ def get_schemas(
746822
raise InterfaceError("get_schemas requires an open session.")
747823
try:
748824
stream = self._kernel_session.metadata().list_schemas(
749-
catalog=catalog_name,
750-
schema_pattern=schema_name,
825+
catalog=_catalog_or_none(catalog_name),
826+
schema_pattern=_none_if_blank(schema_name),
751827
)
752828
return self._make_result_set(stream, cursor, self._synthetic_command_id())
753829
except Exception as exc:
@@ -767,45 +843,18 @@ def get_tables(
767843
if self._kernel_session is None:
768844
raise InterfaceError("get_tables requires an open session.")
769845
try:
846+
# ``table_types`` is filtered kernel-side (the kernel applies
847+
# it to the reshaped result, case-insensitively as of the
848+
# batch-3 kernel change), so we forward it and let the kernel
849+
# do the work — no connector-side drain + refilter. Passing it
850+
# through preserves streaming for large schemas.
770851
stream = self._kernel_session.metadata().list_tables(
771-
catalog=catalog_name,
772-
schema_pattern=schema_name,
773-
table_pattern=table_name,
774-
table_types=table_types,
775-
)
776-
if not table_types:
777-
return self._make_result_set(
778-
stream, cursor, self._synthetic_command_id()
779-
)
780-
# The kernel today returns the unfiltered ``SHOW TABLES``
781-
# shape regardless of ``table_types``. Drain to a single
782-
# Arrow table and apply the same client-side filter the
783-
# native SEA backend uses. The filter is **case-sensitive**
784-
# — matches the SEA backend's documented behaviour, and
785-
# mirrors how the warehouse reports the values
786-
# (``TABLE`` / ``VIEW`` / ``SYSTEM_TABLE`` — uppercase).
787-
# Look the column up by name rather than positional index
788-
# so a future kernel reshape of ``SHOW TABLES`` doesn't
789-
# silently filter the wrong column.
790-
from databricks.sql.backend.sea.utils.filters import ResultSetFilter
791-
792-
full_table = _drain_kernel_handle(stream)
793-
if "TABLE_TYPE" not in full_table.schema.names:
794-
raise OperationalError(
795-
"kernel get_tables result is missing a TABLE_TYPE "
796-
f"column; got {full_table.schema.names!r}"
797-
)
798-
filtered_table = ResultSetFilter._filter_arrow_table(
799-
full_table,
800-
column_name="TABLE_TYPE",
801-
allowed_values=table_types,
802-
case_sensitive=True,
803-
)
804-
return self._make_result_set(
805-
_StaticArrowHandle(filtered_table),
806-
cursor,
807-
self._synthetic_command_id(),
852+
catalog=_catalog_or_none(catalog_name),
853+
schema_pattern=_none_if_blank(schema_name),
854+
table_pattern=_none_if_blank(table_name),
855+
table_types=table_types if table_types else None,
808856
)
857+
return self._make_result_set(stream, cursor, self._synthetic_command_id())
809858
except Exception as exc:
810859
raise _wrap_kernel_exception("get_tables", exc) from exc
811860

@@ -830,10 +879,10 @@ def get_columns(
830879
# Thrift backend's `getColumns(null, …)` behaviour from
831880
# the user's perspective.
832881
stream = self._kernel_session.metadata().list_columns(
833-
catalog=catalog_name,
834-
schema_pattern=schema_name,
835-
table_pattern=table_name,
836-
column_pattern=column_name,
882+
catalog=_catalog_or_none(catalog_name),
883+
schema_pattern=_none_if_blank(schema_name),
884+
table_pattern=_none_if_blank(table_name),
885+
column_pattern=_none_if_blank(column_name),
837886
)
838887
return self._make_result_set(stream, cursor, self._synthetic_command_id())
839888
except Exception as exc:
@@ -1006,55 +1055,3 @@ def _read_pem_bytes(path: str, label: str) -> bytes:
10061055
"kernel TLS config."
10071056
)
10081057
return data
1009-
1010-
1011-
def _drain_kernel_handle(handle: Any) -> Any:
1012-
"""Drain a kernel ResultStream / ExecutedStatement into a single
1013-
``pyarrow.Table``. Used by ``get_tables`` to apply a client-side
1014-
``table_types`` filter on a metadata result; cheap because
1015-
metadata streams are small."""
1016-
import pyarrow
1017-
1018-
schema = handle.arrow_schema()
1019-
batches = []
1020-
while True:
1021-
batch = handle.fetch_next_batch()
1022-
if batch is None:
1023-
break
1024-
if batch.num_rows > 0:
1025-
batches.append(batch)
1026-
try:
1027-
handle.close()
1028-
except Exception:
1029-
# Non-fatal — the surrounding ``get_tables`` call has already
1030-
# captured the result data, and the handle's server-side
1031-
# state will be reaped by the kernel's Drop impl.
1032-
pass
1033-
return pyarrow.Table.from_batches(batches, schema=schema)
1034-
1035-
1036-
class _StaticArrowHandle:
1037-
"""Duck-typed kernel handle that replays a pre-built
1038-
``pyarrow.Table`` through ``arrow_schema()`` /
1039-
``fetch_next_batch()`` / ``close()``. Used to wrap a
1040-
post-processed table (e.g., the ``table_types``-filtered output
1041-
of ``get_tables``) so it flows back through the normal
1042-
``KernelResultSet`` path."""
1043-
1044-
def __init__(self, table: Any) -> None:
1045-
self._schema = table.schema
1046-
self._batches = list(table.to_batches())
1047-
self._idx = 0
1048-
1049-
def arrow_schema(self) -> Any:
1050-
return self._schema
1051-
1052-
def fetch_next_batch(self) -> Optional[Any]:
1053-
if self._idx >= len(self._batches):
1054-
return None
1055-
batch = self._batches[self._idx]
1056-
self._idx += 1
1057-
return batch
1058-
1059-
def close(self) -> None:
1060-
self._batches = []

0 commit comments

Comments
 (0)