Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions packages/bigframes/bigframes/_config/compute_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ class ComputeOptions:
int | None: Number of rows, if set.
"""

enable_peek_cache: bool = False
"""
If enabled, peeking at a relation will pull a larger local sample (e.g. 10k rows)
and cache it locally. Subsequent compatible operations on the relation will run
locally on the cached sample, enabling fast interactive iteration.
"""

peek_cache_size: int = 10000
"""
The size of the local sample to pull and cache when peeking at a relation.
Defaults to 10000.
"""

semantic_ops_confirmation_threshold: Optional[int] = 0
"""
Deprecated.
Expand Down
3 changes: 3 additions & 0 deletions packages/bigframes/bigframes/session/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,9 @@ def __init__(
labels=tuple(labels.items()),
function_manager=self._function_session,
)
from bigframes.session.peek_cache_executor import PeekCacheExecutor

self._executor = PeekCacheExecutor(self._executor, publisher=self._publisher)

def __del__(self):
"""Automatic cleanup of internal resources."""
Expand Down
59 changes: 59 additions & 0 deletions packages/bigframes/bigframes/session/_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import asyncio
import threading
from typing import Optional

_bg_loop: Optional[asyncio.AbstractEventLoop] = None
_bg_thread: Optional[threading.Thread] = None
_bg_lock = threading.Lock()


def _get_bg_loop() -> asyncio.AbstractEventLoop:
global _bg_loop, _bg_thread
with _bg_lock:
if _bg_loop is None:
loop = asyncio.new_event_loop()
_bg_loop = loop

def run():
asyncio.set_event_loop(loop)
loop.run_forever()

_bg_thread = threading.Thread(
target=run, daemon=True, name="bigframes-bg-loop"
)
_bg_thread.start()
return _bg_loop


def run_sync(coro):
"""
Runs a coroutine synchronously, either in the current thread's event loop
if none is running, or by scheduling it on a background thread's event loop.
"""
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

if loop is None:
return asyncio.run(coro)
else:
bg_loop = _get_bg_loop()
future = asyncio.run_coroutine_threadsafe(coro, bg_loop)
return future.result()
44 changes: 5 additions & 39 deletions packages/bigframes/bigframes/session/bq_caching_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import concurrent.futures
import dataclasses
import math
import threading
from typing import Literal, Optional, Sequence, Tuple

import google.api_core.exceptions
Expand Down Expand Up @@ -71,41 +70,7 @@
MAX_SMALL_RESULT_BYTES = 10 * 1024 * 1024 * 1024 # 10G


_bg_loop = None
_bg_thread = None
_bg_lock = threading.Lock()


def _get_bg_loop():
global _bg_loop, _bg_thread
with _bg_lock:
if _bg_loop is None:
loop = asyncio.new_event_loop()
_bg_loop = loop

def run():
asyncio.set_event_loop(loop)
loop.run_forever()

_bg_thread = threading.Thread(
target=run, daemon=True, name="bigframes-bg-loop"
)
_bg_thread.start()
return _bg_loop


def _run_sync(coro):
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

if loop is None:
return asyncio.run(coro)
else:
bg_loop = _get_bg_loop()
future = asyncio.run_coroutine_threadsafe(coro, bg_loop)
return future.result()
from bigframes.session._async import run_sync


class BigQueryCachingExecutor(executor.Executor):
Expand Down Expand Up @@ -182,7 +147,7 @@ def to_sql(
if enable_cache
else array_value.node
)
node = _run_sync(self._substitute_large_local_sources(node))
node = run_sync(self._substitute_large_local_sources(node))
compiled = compile.compile_sql(
compile.CompileRequest(node, sort_rows=ordered),
compiler_name=self._compiler_name,
Expand All @@ -196,7 +161,7 @@ def execute(
) -> executor.ExecuteResult:
# Need to grab thread local before starting async execution.
execution_spec = execution_spec.with_compute_options(bigframes.options.compute)
return _run_sync(
return run_sync(
self._execute_async(
array_value,
execution_spec,
Expand All @@ -209,6 +174,7 @@ async def _execute_async(
execution_spec: ex_spec.ExecutionSpec,
) -> executor.ExecuteResult:
await self._publisher.publish_async(bigframes.core.events.ExecutionStarted())

maybe_result = await self._try_execute_semi_executors(
array_value, execution_spec
)
Expand Down Expand Up @@ -418,7 +384,7 @@ def cached(
bq_compute_options = ex_spec.BqComputeOptions.from_compute_options(
bigframes.options.compute
)
return _run_sync(
return run_sync(
self._cached_async(
array_value, config=config, compute_options=bq_compute_options
)
Expand Down
4 changes: 4 additions & 0 deletions packages/bigframes/bigframes/session/execution_spec.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@ class BqComputeOptions:
enable_multi_query_execution: bool = True
maximum_bytes_billed: Optional[int] = None
extra_query_labels: tuple[tuple[str, str], ...] = ()
enable_peek_cache: bool = False
peek_cache_size: int = 10000

@classmethod
def from_compute_options(cls, compute_options: ComputeOptions) -> BqComputeOptions:
return cls(
enable_multi_query_execution=compute_options.enable_multi_query_execution,
maximum_bytes_billed=compute_options.maximum_bytes_billed,
extra_query_labels=tuple(compute_options.extra_query_labels.items()),
enable_peek_cache=compute_options.enable_peek_cache,
peek_cache_size=compute_options.peek_cache_size,
)

def push_labels(self, labels: Mapping[str, str]) -> BqComputeOptions:
Expand Down
122 changes: 122 additions & 0 deletions packages/bigframes/bigframes/session/peek_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from __future__ import annotations

import dataclasses
import threading
from collections import OrderedDict
from typing import Optional

from bigframes.core import local_data, nodes


@dataclasses.dataclass(frozen=True)
class CachedRelation:
table: local_data.ManagedArrowTable
is_complete: bool = False


class PeekCache:
"""
Thread-safe LRU cache for storing local samples or complete copies of query relations.
This enables fast iteration on subsequent compatible operations.
"""

def __init__(self, capacity: int = 100):
self.capacity = capacity
self._cache: OrderedDict[nodes.BigFrameNode, CachedRelation] = OrderedDict()
self._lock = threading.Lock()

def get(self, key: nodes.BigFrameNode) -> Optional[CachedRelation]:
with self._lock:
if key not in self._cache:
return None
# Move to end (most recently used)
self._cache.move_to_end(key)
return self._cache[key]

def put(
self,
key: nodes.BigFrameNode,
table: local_data.ManagedArrowTable,
is_complete: bool = False,
) -> None:
with self._lock:
value = CachedRelation(table, is_complete)
if key in self._cache:
self._cache.move_to_end(key)
self._cache[key] = value
if len(self._cache) > self.capacity:
self._cache.popitem(last=False)

def clear(self) -> None:
with self._lock:
self._cache.clear()


def substitute_peek_cached_subplans(
root: nodes.BigFrameNode,
peek_cache: PeekCache,
min_rows_required: Optional[int],
) -> nodes.BigFrameNode:
"""
Recursively replaces subplans in the tree that have a cached local relation
in the peek cache with a ReadLocalNode, provided that:
1. The cached relation is complete (contains the entire dataset).
2. Or, all ancestors of the subplan are compatible with running on a sample,
and the cached sample contains at least the required number of rows.
"""
# Intermediate nodes that preserve the semantic validity of a sample.
# WindowOpNode, AggregateNode, OrderByNode, JoinNode, etc. are excluded
# because evaluating them on a sample breaks semantic contracts.
_COMPATIBLE_ANCESTOR_CLASSES = (
nodes.SelectionNode,
nodes.ProjectionNode,
nodes.FilterNode,
nodes.PromoteOffsetsNode,
)

def traverse(
node: nodes.BigFrameNode, ancestors_compatible: bool
) -> nodes.BigFrameNode:
cached_entry = peek_cache.get(node)
if cached_entry is not None:
if cached_entry.is_complete or (
ancestors_compatible
and min_rows_required is not None
and cached_entry.table.data.num_rows >= min_rows_required
):
# Replace the node with a ReadLocalNode containing the cached relation
scan_list = nodes.ScanList(
tuple(
nodes.ScanItem(field.id, field.id.name) for field in node.fields
)
)
session = node.session if node.session is not None else root.session
return nodes.ReadLocalNode(
local_data_source=cached_entry.table,
scan_list=scan_list,
session=session,
)

# If we didn't replace, recursively transform children
is_current_compatible = isinstance(node, _COMPATIBLE_ANCESTOR_CLASSES)
next_ancestors_compatible = ancestors_compatible and is_current_compatible

return node.transform_children(
lambda child: traverse(child, next_ancestors_compatible)
)

return traverse(root, True)
Comment on lines +69 to +122

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There are two issues in substitute_peek_cached_subplans:

  1. Correctness / Truncation Bug: If a user requests a larger peek size (e.g., df.head(20000)) than what is currently cached (e.g., 10000 rows), the function will still substitute the cached subplan and return the truncated 10000-row sample. We should only use the cached sample if it contains at least the requested number of rows (min_rows_required).
  2. SQL Escaping Bug: Using field.id.sql as the source_id for ScanItem will return the SQL-escaped representation of the column name (e.g., with backticks or double quotes). This will cause a runtime KeyError or column mismatch when reading from the local Arrow table, which contains raw, unescaped column names. Use field.id.name instead to get the raw column name.
def substitute_peek_cached_subplans(
    root: nodes.BigFrameNode,
    peek_cache: PeekCache,
    min_rows_required: int,
) -> nodes.BigFrameNode:
    """
    Recursively replaces subplans in the tree that have a cached local sample
    in the peek cache with a ReadLocalNode, provided that all ancestors
    of the subplan are compatible with running on a sample, and the cached
    sample contains at least the required number of rows.
    """
    # Intermediate nodes that preserve the semantic validity of a sample.
    # WindowOpNode, AggregateNode, OrderByNode, JoinNode, etc. are excluded
    # because evaluating them on a sample breaks semantic contracts.
    _COMPATIBLE_ANCESTOR_CLASSES = (
        nodes.SelectionNode,
        nodes.ProjectionNode,
        nodes.FilterNode,
        nodes.PromoteOffsetsNode,
    )

    def traverse(node: nodes.BigFrameNode, ancestors_compatible: bool) -> nodes.BigFrameNode:
        if ancestors_compatible:
            cached_sample = peek_cache.get(node)
            if cached_sample is not None and cached_sample.num_rows >= min_rows_required:
                # Replace the node with a ReadLocalNode containing the cached sample
                scan_list = nodes.ScanList(
                    tuple(nodes.ScanItem(field.id, field.id.name) for field in node.fields)
                )
                session = node.session if node.session is not None else root.session
                return nodes.ReadLocalNode(
                    local_data_source=cached_sample,
                    scan_list=scan_list,
                    session=session,
                )

        # If we didn't replace, recursively transform children
        is_current_compatible = isinstance(node, _COMPATIBLE_ANCESTOR_CLASSES)
        next_ancestors_compatible = ancestors_compatible and is_current_compatible

        return node.transform_children(lambda child: traverse(child, next_ancestors_compatible))

    return traverse(root, True)

Loading
Loading