Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d007c64
feat(core): add _coalesce module skeleton with CoalesceOptions and stub
d-v-b Apr 24, 2026
abac6d3
test(core): add failing tests for coalesced_get basic cases
d-v-b Apr 24, 2026
f65018a
feat(core): implement coalesced_get for basic sequential cases
d-v-b Apr 24, 2026
9e1f1d2
test(core): cover Offset/Suffix/None and mixed-cluster cases in coale…
d-v-b Apr 24, 2026
cd5097b
feat(core): run coalesced fetches concurrently under max_concurrency
d-v-b Apr 24, 2026
3a85488
test(core): cover key-missing (start/mid) and fetch-raises in coalesc…
d-v-b Apr 24, 2026
4553523
test(core): cover max_coalesced_bytes cap in coalesced_get
d-v-b Apr 24, 2026
162dd6d
test(core): add coverage-invariant property test for coalesced_get
d-v-b Apr 24, 2026
401e28b
test(core): drop unused HEAVY_MERGE/NO_MERGE constants
d-v-b Apr 24, 2026
913928c
docs(core): shorten coalesced_get docstring summary line
d-v-b Apr 24, 2026
850b9cd
refactor(core): drop dead invariant checks in merged-group path
d-v-b Apr 24, 2026
b2ec638
test(core): cover key-missing on uncoalescable input
d-v-b Apr 24, 2026
a4c3330
fix(core): cancel pending fetches on early exit and stop-after-miss
d-v-b Apr 24, 2026
5b1d8cd
test(core): cover mid-stream miss with concurrency > 1
d-v-b Apr 24, 2026
6aa6f4b
test(core): verify consumer-break cancels pending fetches
d-v-b Apr 24, 2026
dded848
fix(core): type coalesced_get as AsyncGenerator
d-v-b Apr 24, 2026
865baf0
refactor(test): drop redundant pytestmark asyncio
d-v-b Apr 24, 2026
17d9f75
feat(storage): add private SupportsGetRanges protocol
d-v-b Apr 24, 2026
3ab711d
test(storage): add failing tests for FsspecStore.get_ranges
d-v-b Apr 24, 2026
913be10
feat(storage): add FsspecStore.get_ranges and coalesce_options kwarg
d-v-b Apr 24, 2026
0328e01
test(storage): add SupportsGetRanges conformance tests
d-v-b Apr 24, 2026
e7432c5
chore: add towncrier fragment for get_ranges
d-v-b Apr 24, 2026
349bd9c
docs: drop private-symbol mention from get_ranges changelog
d-v-b Apr 24, 2026
79e9927
test: refactor tests
d-v-b Apr 24, 2026
8754f85
test: skip fsspec-backed get_ranges tests on old fsspec
d-v-b Apr 24, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changes/0000.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add `zarr.storage.FsspecStore.get_ranges` for concurrent, coalesced multi-range reads from a single key. A new keyword-only constructor argument `coalesce_options` on `FsspecStore` controls the max gap, max coalesced size, and max concurrency of the underlying requests.
195 changes: 195 additions & 0 deletions src/zarr/core/_coalesce.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
# src/zarr/core/_coalesce.py
from __future__ import annotations

import asyncio
from typing import TYPE_CHECKING, TypedDict

if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable, Sequence

from zarr.abc.store import ByteRequest
from zarr.core.buffer import Buffer


class CoalesceOptions(TypedDict):
    """Configuration for merging nearby byte ranges into fewer I/O requests.

    Every key must be present; ``DEFAULT_COALESCE_OPTIONS`` provides a
    sensible starting point.
    """

    max_gap_bytes: int
    """Adjacent RangeByteRequests whose gap is no larger than this may share one fetch."""
    max_coalesced_bytes: int
    """Ceiling on a single merged fetch's size (a lone oversized request is still issued as-is)."""
    max_concurrency: int
    """Cap on the number of merged fetches running simultaneously."""


# Defaults chosen to batch aggressively while keeping each merged request modest.
DEFAULT_COALESCE_OPTIONS: CoalesceOptions = {
    "max_gap_bytes": 2**20,  # merge across gaps up to 1 MiB
    "max_coalesced_bytes": 2**24,  # never build a merged read larger than 16 MiB
    "max_concurrency": 10,  # at most ten merged fetches in flight
}


async def coalesced_get(
    fetch: Callable[[ByteRequest | None], Awaitable[Buffer | None]],
    byte_ranges: Iterable[ByteRequest | None],
    *,
    options: CoalesceOptions,
) -> AsyncGenerator[Sequence[tuple[int, Buffer | None]], None]:
    """Read many byte ranges through ``fetch`` with coalescing and concurrency.

    Nearby ranges are merged into a single underlying I/O (subject to
    ``options``), and merged fetches are run concurrently. Each yield
    corresponds to exactly one underlying I/O operation: a sequence of
    ``(input_index, result)`` tuples for all input ranges served by that I/O.
    Tuples within a yielded sequence are ordered by start offset. Yields across
    groups are in completion order, not input order.

    Parameters
    ----------
    fetch
        Callable that reads one byte range and returns a ``Buffer`` (or ``None``
        if the underlying key does not exist). Typically constructed via
        ``functools.partial(store.get, key, prototype)``.
    byte_ranges
        Input ranges. ``None`` means "the whole value".
    options
        Coalescing knobs.

    Yields
    ------
    Sequence[tuple[int, Buffer | None]]
        Per-I/O batch of ``(input_index, result)`` tuples.

    Notes
    -----
    - Only ``RangeByteRequest`` inputs are coalesced. ``OffsetByteRequest``,
      ``SuffixByteRequest``, and ``None`` are each treated as uncoalescable
      (one fetch, one single-tuple yield per input).
    - If any fetch returns ``None`` the iterator stops scheduling further fetches
      and completes without yielding the missing group. Groups completed before
      the miss remain observable.
    - If a fetch raises, the exception propagates on the yield that produced the
      failing group; earlier-completed groups remain observable.
    """
    # Local import to avoid cycles at module import time.
    from zarr.abc.store import RangeByteRequest

    # Pair every input range with its position so results can be attributed
    # back to the caller's ordering after sorting/merging.
    indexed: list[tuple[int, ByteRequest | None]] = list(enumerate(byte_ranges))
    if not indexed:
        return

    # Split inputs into coalescable (RangeByteRequest only) and uncoalescable (the rest).
    mergeable: list[tuple[int, RangeByteRequest]] = [
        (i, r) for i, r in indexed if isinstance(r, RangeByteRequest)
    ]
    uncoalescable: list[tuple[int, ByteRequest | None]] = [
        (i, r) for i, r in indexed if not isinstance(r, RangeByteRequest)
    ]

    # Sort mergeables by start offset, then merge. Sorting guarantees each
    # candidate only needs to be compared against the group built most recently.
    mergeable.sort(key=lambda pair: pair[1].start)
    groups: list[list[tuple[int, RangeByteRequest]]] = []
    for pair in mergeable:
        _i, r = pair
        if groups:
            last = groups[-1]
            # gap may be negative for overlapping ranges; a negative gap always
            # satisfies the max_gap_bytes test, so overlaps merge naturally.
            last_end = max(x[1].end for x in last)
            gap = r.start - last_end
            merged_start = min(x[1].start for x in last)
            prospective_end = max(last_end, r.end)
            prospective_size = prospective_end - merged_start
            if (
                gap <= options["max_gap_bytes"]
                and prospective_size <= options["max_coalesced_bytes"]
            ):
                last.append(pair)
                continue
        # Either no group exists yet, or this range could not join the last one.
        groups.append([pair])

    # Build a uniform list of work items. Each work item is a list of
    # (input_index, ByteRequest | None) pairs. Merged groups have multiple
    # members (all RangeByteRequest); uncoalescable items have a single member.
    work_items: list[list[tuple[int, ByteRequest | None]]] = [
        [(idx, r) for idx, r in g] for g in groups
    ]
    work_items.extend([(idx, single)] for idx, single in uncoalescable)

    # Completion queue entries are ("ok", batch), ("missing", None), or
    # ("error", exception). The queue is unbounded, so producers never block
    # on put() while holding the semaphore.
    completion_queue: asyncio.Queue[
        tuple[str, Sequence[tuple[int, Buffer | None]] | BaseException | None]
    ] = asyncio.Queue()
    semaphore = asyncio.Semaphore(options["max_concurrency"])

    async def run_one(members: list[tuple[int, ByteRequest | None]]) -> None:
        # Execute one work item end-to-end and report its outcome on the queue.
        try:
            async with semaphore:
                if len(members) == 1 and not isinstance(members[0][1], RangeByteRequest):
                    # Uncoalescable single fetch.
                    idx, single = members[0]
                    buf = await fetch(single)
                    if buf is None:
                        await completion_queue.put(("missing", None))
                        return
                    await completion_queue.put(("ok", ((idx, buf),)))
                    return
                # Merged group path: all members are RangeByteRequest.
                assert all(isinstance(r, RangeByteRequest) for _, r in members)
                starts = [r.start for _, r in members]  # type: ignore[union-attr]
                ends = [r.end for _, r in members]  # type: ignore[union-attr]
                group_start = min(starts)
                group_end = max(ends)
                # One I/O covers the whole group; members are sliced out below.
                big = await fetch(RangeByteRequest(group_start, group_end))
                if big is None:
                    await completion_queue.put(("missing", None))
                    return
                # Re-sort by start so tuples within the yielded batch are in
                # offset order, as promised by the docstring.
                ordered = sorted(members, key=lambda pair: pair[1].start)  # type: ignore[union-attr]
                sliced: list[tuple[int, Buffer | None]] = []
                for idx, r in ordered:
                    sliced.append((idx, big[r.start - group_start : r.end - group_start]))  # type: ignore[union-attr]
                await completion_queue.put(("ok", tuple(sliced)))
        except asyncio.CancelledError:
            # Cancellation is expected when we stop scheduling on a missing key.
            raise
        except BaseException as exc:
            # Surface failures through the queue so the consumer loop can
            # re-raise them on the corresponding yield.
            await completion_queue.put(("error", exc))

    # Launch all work items as tasks. The semaphore bounds actual concurrency.
    tasks: set[asyncio.Task[None]] = set()
    for item in work_items:
        tasks.add(asyncio.create_task(run_one(item)))

    try:
        pending_error: BaseException | None = None
        # Expect exactly one completion per work item unless we bail early.
        for _ in range(len(work_items)):
            kind, payload = await completion_queue.get()
            if kind == "ok":
                assert payload is not None
                assert not isinstance(payload, BaseException)
                yield payload
                continue
            # "missing" or "error": stop scheduling and cancel pending work.
            # Late arrivals that raced to enqueue before cancellation took
            # effect sit in the completion queue and are discarded by the
            # finally block (the queue is local and will be garbage-collected).
            for t in tasks:
                if not t.done():
                    t.cancel()
            if kind == "error":
                assert isinstance(payload, BaseException)
                pending_error = payload
            break
        if pending_error is not None:
            raise pending_error
    finally:
        # Best-effort cancellation for in-flight tasks (covers the consumer
        # break / early-exit case where we did not proactively cancel).
        for t in tasks:
            if not t.done():
                t.cancel()
        if tasks:
            await asyncio.gather(*tasks, return_exceptions=True)
33 changes: 32 additions & 1 deletion src/zarr/storage/_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import json
import warnings
from contextlib import suppress
from functools import partial
from typing import TYPE_CHECKING, Any

from packaging.version import parse as parse_version
Expand All @@ -14,18 +15,24 @@
Store,
SuffixByteRequest,
)
from zarr.core._coalesce import (
DEFAULT_COALESCE_OPTIONS,
CoalesceOptions,
coalesced_get,
)
from zarr.core.buffer import Buffer
from zarr.errors import ZarrUserWarning
from zarr.storage._utils import _join_paths, normalize_path

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable
from collections.abc import AsyncIterator, Iterable, Sequence

from fsspec import AbstractFileSystem
from fsspec.asyn import AsyncFileSystem
from fsspec.mapping import FSMap

from zarr.core.buffer import BufferPrototype
from zarr.storage._protocols import SupportsGetRanges


ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = (
Expand Down Expand Up @@ -124,11 +131,14 @@ def __init__(
read_only: bool = False,
path: str = "/",
allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
*,
coalesce_options: CoalesceOptions = DEFAULT_COALESCE_OPTIONS,
) -> None:
super().__init__(read_only=read_only)
self.fs = fs
self.path = normalize_path(path)
self.allowed_exceptions = allowed_exceptions
self.coalesce_options = coalesce_options

if not self.fs.async_impl:
raise TypeError("Filesystem needs to support async operations.")
Expand Down Expand Up @@ -315,6 +325,22 @@ async def get(
else:
return value

async def get_ranges(
    self,
    key: str,
    byte_ranges: Iterable[ByteRequest | None],
    *,
    prototype: BufferPrototype,
) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
    """Read many byte ranges from ``key``, coalescing nearby ranges and fetching concurrently.

    See :class:`zarr.storage._protocols.SupportsGetRanges` for the contract and
    :func:`zarr.core._coalesce.coalesced_get` for the full semantics.
    """
    # Bind key and prototype once so coalesced_get only has to supply the range.
    bound_fetch = partial(self.get, key, prototype)
    batches = coalesced_get(bound_fetch, byte_ranges, options=self.coalesce_options)
    async for batch in batches:
        yield batch

async def set(
self,
key: str,
Expand Down Expand Up @@ -440,3 +466,8 @@ async def getsize(self, key: str) -> int:
else:
# fsspec doesn't have typing. We'll need to assume or verify this is true
return int(size)


# Module-level type assertion: FsspecStore structurally satisfies SupportsGetRanges.
# This line is a no-op at runtime but causes mypy/pyright to complain if the shape drifts.
# NOTE: SupportsGetRanges is imported under TYPE_CHECKING only; the annotation is
# lazy (PEP 563 in this module), so no runtime NameError occurs.
_: type[SupportsGetRanges] = FsspecStore
34 changes: 34 additions & 0 deletions src/zarr/storage/_protocols.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
# src/zarr/storage/_protocols.py
from __future__ import annotations

from typing import TYPE_CHECKING, Protocol, runtime_checkable

if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable, Sequence

from zarr.abc.store import ByteRequest
from zarr.core.buffer import Buffer, BufferPrototype


@runtime_checkable
class SupportsGetRanges(Protocol):
    """Structural interface for stores offering batched multi-range reads.

    A conforming store can serve many byte ranges of one key in a single call,
    typically by coalescing nearby ranges and issuing fetches concurrently.

    Private / unstable. Shape may change before being made public.
    """

    def get_ranges(
        self,
        key: str,
        byte_ranges: Iterable[ByteRequest | None],
        *,
        prototype: BufferPrototype,
    ) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]:
        """Read many byte ranges from ``key``.

        Each yielded batch corresponds to one underlying I/O operation.

        See :func:`zarr.core._coalesce.coalesced_get` for full semantics.
        """
        ...
Loading
Loading