From d007c64ad9200463c97e1dffafd01a4dd66059c4 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:09:40 +0200 Subject: [PATCH 01/25] feat(core): add _coalesce module skeleton with CoalesceOptions and stub --- src/zarr/core/_coalesce.py | 76 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 76 insertions(+) create mode 100644 src/zarr/core/_coalesce.py diff --git a/src/zarr/core/_coalesce.py b/src/zarr/core/_coalesce.py new file mode 100644 index 0000000000..d155c143f9 --- /dev/null +++ b/src/zarr/core/_coalesce.py @@ -0,0 +1,76 @@ +# src/zarr/core/_coalesce.py +from __future__ import annotations + +from typing import TYPE_CHECKING, TypedDict + +if TYPE_CHECKING: + from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence + + from zarr.abc.store import ByteRequest + from zarr.core.buffer import Buffer + + +class CoalesceOptions(TypedDict): + """Knobs for coalescing contiguous byte ranges into fewer I/O requests. + + All fields required. See DEFAULT_COALESCE_OPTIONS for a sensible default. + """ + + max_gap_bytes: int + """Two RangeByteRequests separated by at most this many bytes may be merged into one fetch.""" + max_coalesced_bytes: int + """Upper bound on the size of a single merged fetch (ignored for an already-oversized single request).""" + max_concurrency: int + """Maximum number of merged fetches in flight at once.""" + + +DEFAULT_COALESCE_OPTIONS: CoalesceOptions = { + "max_gap_bytes": 1 << 20, # 1 MiB + "max_coalesced_bytes": 16 << 20, # 16 MiB + "max_concurrency": 10, +} + + +async def coalesced_get( + fetch: Callable[[ByteRequest | None], Awaitable[Buffer | None]], + byte_ranges: Iterable[ByteRequest | None], + *, + options: CoalesceOptions, +) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]: + """Read many byte ranges through ``fetch``, coalescing nearby ranges and firing merged requests concurrently. 
+ + Each yield corresponds to exactly one underlying I/O operation: a sequence of + ``(input_index, result)`` tuples for all input ranges served by that I/O. + Tuples within a yielded sequence are ordered by start offset. Yields across + groups are in completion order, not input order. + + Parameters + ---------- + fetch + Callable that reads one byte range and returns a ``Buffer`` (or ``None`` + if the underlying key does not exist). Typically constructed via + ``functools.partial(store.get, key, prototype)``. + byte_ranges + Input ranges. ``None`` means "the whole value". + options + Coalescing knobs. + + Yields + ------ + Sequence[tuple[int, Buffer | None]] + Per-I/O batch of ``(input_index, result)`` tuples. + + Notes + ----- + - Only ``RangeByteRequest`` inputs are coalesced. ``OffsetByteRequest``, + ``SuffixByteRequest``, and ``None`` are each treated as uncoalescable + (one fetch, one single-tuple yield per input). + - If any fetch returns ``None`` the iterator stops scheduling further fetches + and completes without yielding the missing group. Groups completed before + the miss remain observable. + - If a fetch raises, the exception propagates on the yield that produced the + failing group; earlier-completed groups remain observable. + """ + # Stub body; real implementation filled in by later tasks. 
+ raise NotImplementedError + yield () # type: ignore[unreachable] # pragma: no cover -- keeps this function an async generator From abac6d37a6003ed7bc51bc0bf94c06fd879f0319 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:10:42 +0200 Subject: [PATCH 02/25] test(core): add failing tests for coalesced_get basic cases --- tests/test_coalesce.py | 160 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 tests/test_coalesce.py diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py new file mode 100644 index 0000000000..8642803b11 --- /dev/null +++ b/tests/test_coalesce.py @@ -0,0 +1,160 @@ +# tests/test_coalesce.py +from __future__ import annotations + +from dataclasses import dataclass, field +from typing import TYPE_CHECKING + +import pytest + +from zarr.abc.store import ( + ByteRequest, + OffsetByteRequest, + RangeByteRequest, + SuffixByteRequest, +) +from zarr.core._coalesce import ( + DEFAULT_COALESCE_OPTIONS, + CoalesceOptions, + coalesced_get, +) +from zarr.core.buffer import Buffer, default_buffer_prototype + +if TYPE_CHECKING: + from collections.abc import AsyncIterator, Callable, Sequence + +pytestmark = pytest.mark.asyncio + + +def _buf(data: bytes) -> Buffer: + return default_buffer_prototype().buffer.from_bytes(data) + + +@dataclass +class FakeFetch: + """Records every call and serves canned bytes from an in-memory blob.""" + + blob: bytes + key_exists: bool = True + raise_on: Callable[[ByteRequest | None], bool] | None = None + calls: list[ByteRequest | None] = field(default_factory=list) + + async def __call__(self, byte_range: ByteRequest | None) -> Buffer | None: + self.calls.append(byte_range) + if not self.key_exists: + return None + if self.raise_on is not None and self.raise_on(byte_range): + raise OSError("injected") + if byte_range is None: + return _buf(self.blob) + if isinstance(byte_range, RangeByteRequest): + return _buf(self.blob[byte_range.start : byte_range.end]) + 
if isinstance(byte_range, OffsetByteRequest): + return _buf(self.blob[byte_range.offset :]) + if isinstance(byte_range, SuffixByteRequest): + return _buf(self.blob[-byte_range.suffix :]) + raise AssertionError(f"unknown byte_range {byte_range!r}") + + +# A permissive options value used by most tests (heavy merging allowed). +HEAVY_MERGE: CoalesceOptions = { + "max_gap_bytes": 1 << 20, + "max_coalesced_bytes": 1 << 30, + "max_concurrency": 10, +} + +# An options value that forbids all merging (gap threshold 0 and any adjacent +# merges would still be allowed at gap==0, so we also cap size). +NO_MERGE: CoalesceOptions = { + "max_gap_bytes": -1, # strictly less-than semantics -- any positive gap breaks + "max_coalesced_bytes": 1 << 30, + "max_concurrency": 10, +} + + +async def _collect( + agen: AsyncIterator[Sequence[tuple[int, Buffer | None]]], +) -> list[list[tuple[int, Buffer | None]]]: + """Drain an async generator of groups into a list of lists of tuples.""" + return [list(group) async for group in agen] + + +def _contents(groups: list[list[tuple[int, Buffer | None]]]) -> dict[int, bytes]: + """Flatten to {index: bytes}.""" + result: dict[int, bytes] = {} + for group in groups: + for idx, buf in group: + assert buf is not None + result[idx] = buf.to_bytes() + return result + + +async def test_empty_input() -> None: + fetch = FakeFetch(b"abc" * 1000) + groups = await _collect(coalesced_get(fetch, [], options=DEFAULT_COALESCE_OPTIONS)) + assert groups == [] + assert fetch.calls == [] + + +async def test_single_range() -> None: + fetch = FakeFetch(b"0123456789") + groups = await _collect( + coalesced_get( + fetch, + [RangeByteRequest(2, 5)], + options=DEFAULT_COALESCE_OPTIONS, + ) + ) + assert len(groups) == 1 + assert len(groups[0]) == 1 + assert _contents(groups) == {0: b"234"} + assert len(fetch.calls) == 1 + + +async def test_fully_disjoint_ranges_each_get_own_group() -> None: + # Ranges 100 bytes apart with max_gap_bytes < 100 will not merge. 
+ fetch = FakeFetch(b"x" * 10_000) + opts: CoalesceOptions = { + "max_gap_bytes": 50, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 10, + } + ranges: list[ByteRequest | None] = [ + RangeByteRequest(0, 10), + RangeByteRequest(200, 210), + RangeByteRequest(500, 510), + ] + groups = await _collect(coalesced_get(fetch, ranges, options=opts)) + assert len(groups) == 3 + assert all(len(g) == 1 for g in groups) + # 3 fetches, one per input. + assert len(fetch.calls) == 3 + + +async def test_adjacent_ranges_merge_into_one_group() -> None: + # Three ranges within 10 bytes of each other; max_gap_bytes=50 -> one merged fetch. + fetch = FakeFetch(b"".join(bytes([i % 256]) for i in range(1000))) + opts: CoalesceOptions = { + "max_gap_bytes": 50, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 10, + } + ranges: list[ByteRequest | None] = [ + RangeByteRequest(0, 5), + RangeByteRequest(10, 15), + RangeByteRequest(20, 25), + ] + groups = await _collect(coalesced_get(fetch, ranges, options=opts)) + assert len(groups) == 1 + assert len(groups[0]) == 3 + # The single fetch should span the full merged region. + assert len(fetch.calls) == 1 + call = fetch.calls[0] + assert isinstance(call, RangeByteRequest) + assert call.start == 0 + assert call.end == 25 + # Contents correct. 
+ assert _contents(groups) == { + 0: bytes(range(5)), + 1: bytes(range(10, 15)), + 2: bytes(range(20, 25)), + } From f65018a1d8a31a948a6ed8062b2e040ba0fae0b0 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:11:26 +0200 Subject: [PATCH 03/25] feat(core): implement coalesced_get for basic sequential cases --- src/zarr/core/_coalesce.py | 60 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 57 insertions(+), 3 deletions(-) diff --git a/src/zarr/core/_coalesce.py b/src/zarr/core/_coalesce.py index d155c143f9..3d0d01185f 100644 --- a/src/zarr/core/_coalesce.py +++ b/src/zarr/core/_coalesce.py @@ -71,6 +71,60 @@ async def coalesced_get( - If a fetch raises, the exception propagates on the yield that produced the failing group; earlier-completed groups remain observable. """ - # Stub body; real implementation filled in by later tasks. - raise NotImplementedError - yield () # type: ignore[unreachable] # pragma: no cover -- keeps this function an async generator + # Local import to avoid cycles at module import time. + from zarr.abc.store import RangeByteRequest + + indexed: list[tuple[int, ByteRequest | None]] = list(enumerate(byte_ranges)) + if not indexed: + return + + # Split inputs into coalescable (RangeByteRequest only) and uncoalescable (the rest). + mergeable: list[tuple[int, RangeByteRequest]] = [ + (i, r) for i, r in indexed if isinstance(r, RangeByteRequest) + ] + uncoalescable: list[tuple[int, ByteRequest | None]] = [ + (i, r) for i, r in indexed if not isinstance(r, RangeByteRequest) + ] + + # Sort mergeables by start offset, then merge. 
+ mergeable.sort(key=lambda pair: pair[1].start) + groups: list[list[tuple[int, RangeByteRequest]]] = [] + for pair in mergeable: + _i, r = pair + if groups: + last = groups[-1] + last_end = max(x[1].end for x in last) + gap = r.start - last_end + merged_start = min(x[1].start for x in last) + prospective_end = max(last_end, r.end) + prospective_size = prospective_end - merged_start + if ( + gap <= options["max_gap_bytes"] + and prospective_size <= options["max_coalesced_bytes"] + ): + last.append(pair) + continue + groups.append([pair]) + + # For now, serve groups sequentially (concurrency added in Task 5). + for group in groups: + group_start = min(x[1].start for x in group) + group_end = max(x[1].end for x in group) + big = await fetch(RangeByteRequest(group_start, group_end)) + if big is None: + return # key missing, stop yielding + # Slice back into per-input buffers, ordered by start offset. + group.sort(key=lambda pair: pair[1].start) + yielded: list[tuple[int, Buffer | None]] = [] + for i, r in group: + local_start = r.start - group_start + local_end = r.end - group_start + yielded.append((i, big[local_start:local_end])) + yield tuple(yielded) + + # Uncoalescable inputs are fetched one at a time, each as its own one-tuple group. 
+ for idx, single in uncoalescable: + buf = await fetch(single) + if buf is None: + return # key missing, stop yielding + yield ((idx, buf),) From 9e1f1d2e1119fff1f2638b8c6f4dd7267e3f6241 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:12:03 +0200 Subject: [PATCH 04/25] test(core): cover Offset/Suffix/None and mixed-cluster cases in coalesced_get --- tests/test_coalesce.py | 81 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index 8642803b11..6d3466d300 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -158,3 +158,84 @@ async def test_adjacent_ranges_merge_into_one_group() -> None: 1: bytes(range(10, 15)), 2: bytes(range(20, 25)), } + + +async def test_offset_and_suffix_and_none_each_get_own_group() -> None: + fetch = FakeFetch(b"abcdefghij") + ranges: list[ByteRequest | None] = [ + RangeByteRequest(0, 3), + OffsetByteRequest(5), + SuffixByteRequest(2), + None, + ] + groups = await _collect(coalesced_get(fetch, ranges, options=DEFAULT_COALESCE_OPTIONS)) + # 1 group from the RangeByteRequest + 3 one-tuple groups from the rest. + assert len(groups) == 4 + # Contents. + flat = _contents(groups) + assert flat[0] == b"abc" + assert flat[1] == b"fghij" + assert flat[2] == b"ij" + assert flat[3] == b"abcdefghij" + + +async def test_indices_preserved_under_shuffled_input() -> None: + fetch = FakeFetch(b"".join(bytes([i % 256]) for i in range(1000))) + # Construct ranges in a deliberately non-sorted order. + ranges: list[ByteRequest | None] = [ + RangeByteRequest(500, 510), + RangeByteRequest(0, 10), + RangeByteRequest(200, 210), + RangeByteRequest(300, 310), + ] + opts: CoalesceOptions = { + "max_gap_bytes": 50, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 10, + } + groups = await _collect(coalesced_get(fetch, ranges, options=opts)) + flat = _contents(groups) + # Indices match original positions, not sorted order. 
+ assert flat[0] == bytes(b % 256 for b in range(500, 510)) + assert flat[1] == bytes(b % 256 for b in range(10)) + assert flat[2] == bytes(b % 256 for b in range(200, 210)) + assert flat[3] == bytes(b % 256 for b in range(300, 310)) + + +async def test_within_group_ordering_is_start_offset() -> None: + fetch = FakeFetch(b"".join(bytes([i % 256]) for i in range(100))) + # Two ranges will merge; one has a later start but is listed first in input. + ranges: list[ByteRequest | None] = [ + RangeByteRequest(20, 25), + RangeByteRequest(0, 5), + ] + opts: CoalesceOptions = { + "max_gap_bytes": 50, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 10, + } + groups = await _collect(coalesced_get(fetch, ranges, options=opts)) + assert len(groups) == 1 + # Within the group, tuples are ordered by start offset. + # Input index 1 (start=0) comes first, then 0 (start=20). + assert [idx for idx, _ in groups[0]] == [1, 0] + + +async def test_mixed_mergeable_and_non_mergeable_counts_correct() -> None: + fetch = FakeFetch(b"x" * 10_000) + opts: CoalesceOptions = { + "max_gap_bytes": 50, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 10, + } + # Two clusters + one far-away singleton. + ranges: list[ByteRequest | None] = [ + RangeByteRequest(0, 10), + RangeByteRequest(20, 30), + RangeByteRequest(500, 510), + ] + groups = await _collect(coalesced_get(fetch, ranges, options=opts)) + assert len(groups) == 2 + # First group has 2, second has 1. 
+ sizes = sorted(len(g) for g in groups) + assert sizes == [1, 2] From cd5097bc9805b5530317c737c93afcf4f50c6236 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:14:19 +0200 Subject: [PATCH 05/25] feat(core): run coalesced fetches concurrently under max_concurrency --- src/zarr/core/_coalesce.py | 116 ++++++++++++++++++++++++++++++------- tests/test_coalesce.py | 30 ++++++++++ 2 files changed, 124 insertions(+), 22 deletions(-) diff --git a/src/zarr/core/_coalesce.py b/src/zarr/core/_coalesce.py index 3d0d01185f..ae1937c873 100644 --- a/src/zarr/core/_coalesce.py +++ b/src/zarr/core/_coalesce.py @@ -1,6 +1,7 @@ # src/zarr/core/_coalesce.py from __future__ import annotations +import asyncio from typing import TYPE_CHECKING, TypedDict if TYPE_CHECKING: @@ -106,25 +107,96 @@ async def coalesced_get( continue groups.append([pair]) - # For now, serve groups sequentially (concurrency added in Task 5). - for group in groups: - group_start = min(x[1].start for x in group) - group_end = max(x[1].end for x in group) - big = await fetch(RangeByteRequest(group_start, group_end)) - if big is None: - return # key missing, stop yielding - # Slice back into per-input buffers, ordered by start offset. - group.sort(key=lambda pair: pair[1].start) - yielded: list[tuple[int, Buffer | None]] = [] - for i, r in group: - local_start = r.start - group_start - local_end = r.end - group_start - yielded.append((i, big[local_start:local_end])) - yield tuple(yielded) - - # Uncoalescable inputs are fetched one at a time, each as its own one-tuple group. - for idx, single in uncoalescable: - buf = await fetch(single) - if buf is None: - return # key missing, stop yielding - yield ((idx, buf),) + # Build a uniform list of work items. Each work item is a list of + # (input_index, ByteRequest | None) pairs. Merged groups have multiple + # members (all RangeByteRequest); uncoalescable items have a single member. 
+ work_items: list[list[tuple[int, ByteRequest | None]]] = [ + [(idx, r) for idx, r in g] for g in groups + ] + work_items.extend([(idx, single)] for idx, single in uncoalescable) + + total = len(work_items) + if total == 0: + return + + # Completion queue entries are either ("ok", payload), ("missing", None), + # or ("error", exception). Kept as Any internally to avoid dragging + # Sequence out of TYPE_CHECKING. + completion_queue: asyncio.Queue[ + tuple[str, Sequence[tuple[int, Buffer | None]] | BaseException | None] + ] = asyncio.Queue() + semaphore = asyncio.Semaphore(options["max_concurrency"]) + + async def run_one(members: list[tuple[int, ByteRequest | None]]) -> None: + try: + async with semaphore: + if len(members) == 1 and not isinstance(members[0][1], RangeByteRequest): + # Uncoalescable single fetch. + idx, single = members[0] + buf = await fetch(single) + if buf is None: + await completion_queue.put(("missing", None)) + return + await completion_queue.put(("ok", ((idx, buf),))) + return + # Merged group path: all members are RangeByteRequest. + starts = [r.start for _, r in members if isinstance(r, RangeByteRequest)] + ends = [r.end for _, r in members if isinstance(r, RangeByteRequest)] + group_start = min(starts) + group_end = max(ends) + big = await fetch(RangeByteRequest(group_start, group_end)) + if big is None: + await completion_queue.put(("missing", None)) + return + ordered = sorted( + members, + key=lambda pair: pair[1].start if isinstance(pair[1], RangeByteRequest) else 0, + ) + sliced: list[tuple[int, Buffer | None]] = [] + for idx, r in ordered: + assert isinstance(r, RangeByteRequest) + sliced.append((idx, big[r.start - group_start : r.end - group_start])) + await completion_queue.put(("ok", tuple(sliced))) + except asyncio.CancelledError: + # Cancellation is expected when we stop scheduling on a missing key. + raise + except BaseException as exc: + await completion_queue.put(("error", exc)) + + # Launch all work items as tasks. 
The semaphore bounds actual concurrency. + tasks: set[asyncio.Task[None]] = set() + for item in work_items: + tasks.add(asyncio.create_task(run_one(item))) + + try: + drained = 0 + stopped = False + pending_error: BaseException | None = None + while drained < total: + kind, payload = await completion_queue.get() + drained += 1 + if stopped: + continue # Discard remaining results after a miss or error. + if kind == "ok": + assert payload is not None + assert not isinstance(payload, BaseException) + yield payload + elif kind == "missing": + stopped = True + # Cancel any still-pending tasks to avoid unnecessary I/O. + for t in tasks: + if not t.done(): + t.cancel() + else: # "error" + assert isinstance(payload, BaseException) + stopped = True + pending_error = payload + for t in tasks: + if not t.done(): + t.cancel() + if pending_error is not None: + raise pending_error + finally: + # Ensure we wait for any cancelled tasks to finish so no task escapes. + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index 6d3466d300..69d3ee3a70 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -1,6 +1,7 @@ # tests/test_coalesce.py from __future__ import annotations +import asyncio from dataclasses import dataclass, field from typing import TYPE_CHECKING @@ -239,3 +240,32 @@ async def test_mixed_mergeable_and_non_mergeable_counts_correct() -> None: # First group has 2, second has 1. sizes = sorted(len(g) for g in groups) assert sizes == [1, 2] + + +async def test_max_concurrency_is_honored() -> None: + # Build 10 non-mergeable ranges, have the fetch hold a counter of in-flight calls. 
+ in_flight = 0 + peak = 0 + lock = asyncio.Lock() + + async def fetch(byte_range: ByteRequest | None) -> Buffer | None: + nonlocal in_flight, peak + async with lock: + in_flight += 1 + peak = max(peak, in_flight) + # give the scheduler a chance to run other tasks + await asyncio.sleep(0.01) + async with lock: + in_flight -= 1 + return _buf(b"x") + + ranges: list[ByteRequest | None] = [RangeByteRequest(i * 1000, i * 1000 + 1) for i in range(10)] + opts: CoalesceOptions = { + "max_gap_bytes": 0, # force no merging + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 3, + } + async for _group in coalesced_get(fetch, ranges, options=opts): + pass + assert peak <= 3 + assert peak >= 2 # must have been some real concurrency From 3a85488becdc7a23362c6df3467c544e3a50612e Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:14:49 +0200 Subject: [PATCH 06/25] test(core): cover key-missing (start/mid) and fetch-raises in coalesced_get --- tests/test_coalesce.py | 47 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index 69d3ee3a70..6927c07482 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -269,3 +269,50 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None: pass assert peak <= 3 assert peak >= 2 # must have been some real concurrency + + +async def test_key_missing_from_first_call_yields_nothing() -> None: + fetch = FakeFetch(b"x" * 100, key_exists=False) + ranges: list[ByteRequest | None] = [RangeByteRequest(0, 10), RangeByteRequest(20, 30)] + groups = await _collect(coalesced_get(fetch, ranges, options=DEFAULT_COALESCE_OPTIONS)) + assert groups == [] + + +async def test_key_missing_mid_stream_yields_earlier_groups_only() -> None: + # Two non-mergeable ranges; the second fetch returns None. 
+ call_count = 0 + + async def fetch(byte_range: ByteRequest | None) -> Buffer | None: + nonlocal call_count + call_count += 1 + # Ensure deterministic ordering: first call serves, second returns None. + await asyncio.sleep(0.01 if call_count == 1 else 0.02) + if call_count >= 2: + return None + return _buf(b"ok") + + opts: CoalesceOptions = { + "max_gap_bytes": -1, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 1, # serialize for determinism + } + ranges: list[ByteRequest | None] = [RangeByteRequest(0, 2), RangeByteRequest(100, 102)] + groups = await _collect(coalesced_get(fetch, ranges, options=opts)) + # Exactly one group (the first) -- the second went missing. + assert len(groups) == 1 + assert len(groups[0]) == 1 + + +async def test_fetch_raises_propagates() -> None: + fetch = FakeFetch( + b"x" * 100, + raise_on=lambda r: isinstance(r, RangeByteRequest) and r.start >= 100, + ) + opts: CoalesceOptions = { + "max_gap_bytes": -1, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 1, + } + ranges: list[ByteRequest | None] = [RangeByteRequest(0, 10), RangeByteRequest(200, 210)] + with pytest.raises(OSError, match="injected"): + await _collect(coalesced_get(fetch, ranges, options=opts)) From 4553523aa7179823ed830bd71d55b91ba2061370 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:15:16 +0200 Subject: [PATCH 07/25] test(core): cover max_coalesced_bytes cap in coalesced_get --- tests/test_coalesce.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index 6927c07482..053c64d78f 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -316,3 +316,35 @@ async def test_fetch_raises_propagates() -> None: ranges: list[ByteRequest | None] = [RangeByteRequest(0, 10), RangeByteRequest(200, 210)] with pytest.raises(OSError, match="injected"): await _collect(coalesced_get(fetch, ranges, options=opts)) + + +async def 
test_max_coalesced_bytes_prevents_over_cap_merge() -> None: + fetch = FakeFetch(b"x" * 10_000) + opts: CoalesceOptions = { + "max_gap_bytes": 1000, # allow gap + "max_coalesced_bytes": 50, # but cap the merged size + "max_concurrency": 10, + } + # Two ranges 20 bytes apart, each 20 bytes -- merged span would be 20+20+20=60 > 50. + ranges: list[ByteRequest | None] = [RangeByteRequest(0, 20), RangeByteRequest(40, 60)] + groups = await _collect(coalesced_get(fetch, ranges, options=opts)) + assert len(groups) == 2 + assert all(len(g) == 1 for g in groups) + + +async def test_single_range_larger_than_cap_is_passed_through() -> None: + fetch = FakeFetch(b"x" * 10_000) + opts: CoalesceOptions = { + "max_gap_bytes": 1000, + "max_coalesced_bytes": 50, + "max_concurrency": 10, + } + # A single 200-byte range, larger than the cap. Should still be fetched. + ranges: list[ByteRequest | None] = [RangeByteRequest(0, 200)] + groups = await _collect(coalesced_get(fetch, ranges, options=opts)) + assert len(groups) == 1 + assert len(groups[0]) == 1 + idx, buf = groups[0][0] + assert idx == 0 + assert buf is not None + assert buf.to_bytes() == b"x" * 200 From 162dd6de636a16cb6f0f43ad93c27c38e665a70c Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:15:44 +0200 Subject: [PATCH 08/25] test(core): add coverage-invariant property test for coalesced_get --- tests/test_coalesce.py | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index 053c64d78f..b34aa32a9d 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -348,3 +348,30 @@ async def test_single_range_larger_than_cap_is_passed_through() -> None: assert idx == 0 assert buf is not None assert buf.to_bytes() == b"x" * 200 + + +async def test_coverage_invariant_random_inputs() -> None: + import random + + rng = random.Random(42) + blob = bytes(i % 256 for i in range(10_000)) + fetch = FakeFetch(blob) + + # Generate 50 random 
RangeByteRequests within the blob. + ranges: list[ByteRequest | None] = [] + for _ in range(50): + start = rng.randint(0, 9000) + length = rng.randint(1, 500) + ranges.append(RangeByteRequest(start, start + length)) + + groups = await _collect(coalesced_get(fetch, ranges, options=DEFAULT_COALESCE_OPTIONS)) + seen: list[int] = [] + for group in groups: + for idx, _buf in group: + seen.append(idx) + assert sorted(seen) == list(range(len(ranges))) + # And the bytes are correct. + flat = _contents(groups) + for i, r in enumerate(ranges): + assert isinstance(r, RangeByteRequest) + assert flat[i] == blob[r.start : r.end] From 401e28bf681cba272bbb5ed31ac1547924d71583 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:26:11 +0200 Subject: [PATCH 09/25] test(core): drop unused HEAVY_MERGE/NO_MERGE constants Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_coalesce.py | 16 ---------------- 1 file changed, 16 deletions(-) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index b34aa32a9d..25962be6e4 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -56,22 +56,6 @@ async def __call__(self, byte_range: ByteRequest | None) -> Buffer | None: raise AssertionError(f"unknown byte_range {byte_range!r}") -# A permissive options value used by most tests (heavy merging allowed). -HEAVY_MERGE: CoalesceOptions = { - "max_gap_bytes": 1 << 20, - "max_coalesced_bytes": 1 << 30, - "max_concurrency": 10, -} - -# An options value that forbids all merging (gap threshold 0 and any adjacent -# merges would still be allowed at gap==0, so we also cap size). 
-NO_MERGE: CoalesceOptions = { - "max_gap_bytes": -1, # strictly less-than semantics -- any positive gap breaks - "max_coalesced_bytes": 1 << 30, - "max_concurrency": 10, -} - - async def _collect( agen: AsyncIterator[Sequence[tuple[int, Buffer | None]]], ) -> list[list[tuple[int, Buffer | None]]]: From 913928ce1513452275d1d3333a61f95506652633 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:26:41 +0200 Subject: [PATCH 10/25] docs(core): shorten coalesced_get docstring summary line Split the overlong first line into a short numpydoc summary plus an extended description. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/zarr/core/_coalesce.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/zarr/core/_coalesce.py b/src/zarr/core/_coalesce.py index ae1937c873..216b8fd9a8 100644 --- a/src/zarr/core/_coalesce.py +++ b/src/zarr/core/_coalesce.py @@ -38,9 +38,11 @@ async def coalesced_get( *, options: CoalesceOptions, ) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]: - """Read many byte ranges through ``fetch``, coalescing nearby ranges and firing merged requests concurrently. + """Read many byte ranges through ``fetch`` with coalescing and concurrency. - Each yield corresponds to exactly one underlying I/O operation: a sequence of + Nearby ranges are merged into a single underlying I/O (subject to + ``options``), and merged fetches are run concurrently. Each yield + corresponds to exactly one underlying I/O operation: a sequence of ``(input_index, result)`` tuples for all input ranges served by that I/O. Tuples within a yielded sequence are ordered by start offset. Yields across groups are in completion order, not input order. 
From 850b9cd92d5ec9be769baa5646371d2ce67fde17 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:27:05 +0200 Subject: [PATCH 11/25] refactor(core): drop dead invariant checks in merged-group path After the input split at the top of coalesced_get, merged groups only ever contain RangeByteRequest members. Replace the per-element isinstance filters (and the defensive ``else 0`` sort-key branch) with a single assertion at the top of the merged-group block and direct attribute access. Also remove the unreachable ``if total == 0: return`` guard (``indexed`` is non-empty by construction once we pass the earlier guard). Co-Authored-By: Claude Opus 4.7 (1M context) --- src/zarr/core/_coalesce.py | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/src/zarr/core/_coalesce.py b/src/zarr/core/_coalesce.py index 216b8fd9a8..62d98334ce 100644 --- a/src/zarr/core/_coalesce.py +++ b/src/zarr/core/_coalesce.py @@ -118,8 +118,6 @@ async def coalesced_get( work_items.extend([(idx, single)] for idx, single in uncoalescable) total = len(work_items) - if total == 0: - return # Completion queue entries are either ("ok", payload), ("missing", None), # or ("error", exception). Kept as Any internally to avoid dragging @@ -142,22 +140,19 @@ async def run_one(members: list[tuple[int, ByteRequest | None]]) -> None: await completion_queue.put(("ok", ((idx, buf),))) return # Merged group path: all members are RangeByteRequest. 
- starts = [r.start for _, r in members if isinstance(r, RangeByteRequest)] - ends = [r.end for _, r in members if isinstance(r, RangeByteRequest)] + assert all(isinstance(r, RangeByteRequest) for _, r in members) + starts = [r.start for _, r in members] # type: ignore[union-attr] + ends = [r.end for _, r in members] # type: ignore[union-attr] group_start = min(starts) group_end = max(ends) big = await fetch(RangeByteRequest(group_start, group_end)) if big is None: await completion_queue.put(("missing", None)) return - ordered = sorted( - members, - key=lambda pair: pair[1].start if isinstance(pair[1], RangeByteRequest) else 0, - ) + ordered = sorted(members, key=lambda pair: pair[1].start) # type: ignore[union-attr] sliced: list[tuple[int, Buffer | None]] = [] for idx, r in ordered: - assert isinstance(r, RangeByteRequest) - sliced.append((idx, big[r.start - group_start : r.end - group_start])) + sliced.append((idx, big[r.start - group_start : r.end - group_start])) # type: ignore[union-attr] await completion_queue.put(("ok", tuple(sliced))) except asyncio.CancelledError: # Cancellation is expected when we stop scheduling on a missing key. From b2ec638f76847dd52512a88ec3e5f6e1db797a3f Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:27:30 +0200 Subject: [PATCH 12/25] test(core): cover key-missing on uncoalescable input Exercise the ``kind == "missing"`` branch in the uncoalescable single-fetch arm for Offset/Suffix/None inputs, which was not hit by existing tests. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_coalesce.py | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index 25962be6e4..1bd020a886 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -287,6 +287,25 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None: assert len(groups[0]) == 1 +@pytest.mark.parametrize( + "byte_range", + [ + OffsetByteRequest(5), + SuffixByteRequest(5), + None, + ], + ids=["offset", "suffix", "none"], +) +async def test_key_missing_on_uncoalescable_input_yields_nothing( + byte_range: ByteRequest | None, +) -> None: + # Uncoalescable inputs take a distinct code path from the merged-group + # path; a missing key on that path must still short-circuit cleanly. + fetch = FakeFetch(b"x" * 100, key_exists=False) + groups = await _collect(coalesced_get(fetch, [byte_range], options=DEFAULT_COALESCE_OPTIONS)) + assert groups == [] + + async def test_fetch_raises_propagates() -> None: fetch = FakeFetch( b"x" * 100, From a4c333072298d630819e92d372a1b62868825a13 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:49:52 +0200 Subject: [PATCH 13/25] fix(core): cancel pending fetches on early exit and stop-after-miss Two related correctness issues in coalesced_get's drain loop: 1. When the consumer breaks out of the async-for (early exit), the generator's finally block only awaited in-flight tasks rather than cancelling them. That wasted I/O. Cancel first, then gather. 2. The drain loop waited on completion_queue for ``total`` entries, but after a "missing" or "error" we cancel pending tasks -- and cancelled tasks never enqueue a completion. With max_concurrency > 1 this could hang. Rework the drain loop to break out immediately on the first miss/error; the finally block handles cleanup. 
The new structure also collapses the redundant miss/error branches and removes the now-unused ``total``/``drained``/``stopped`` bookkeeping. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/zarr/core/_coalesce.py | 36 ++++++++++++++++-------------------- 1 file changed, 16 insertions(+), 20 deletions(-) diff --git a/src/zarr/core/_coalesce.py b/src/zarr/core/_coalesce.py index 62d98334ce..83f8e8b9f9 100644 --- a/src/zarr/core/_coalesce.py +++ b/src/zarr/core/_coalesce.py @@ -117,8 +117,6 @@ async def coalesced_get( ] work_items.extend([(idx, single)] for idx, single in uncoalescable) - total = len(work_items) - # Completion queue entries are either ("ok", payload), ("missing", None), # or ("error", exception). Kept as Any internally to avoid dragging # Sequence out of TYPE_CHECKING. @@ -166,34 +164,32 @@ async def run_one(members: list[tuple[int, ByteRequest | None]]) -> None: tasks.add(asyncio.create_task(run_one(item))) try: - drained = 0 - stopped = False pending_error: BaseException | None = None - while drained < total: + for _ in range(len(work_items)): kind, payload = await completion_queue.get() - drained += 1 - if stopped: - continue # Discard remaining results after a miss or error. if kind == "ok": assert payload is not None assert not isinstance(payload, BaseException) yield payload - elif kind == "missing": - stopped = True - # Cancel any still-pending tasks to avoid unnecessary I/O. - for t in tasks: - if not t.done(): - t.cancel() - else: # "error" + continue + # "missing" or "error": stop scheduling and cancel pending work. + # Late arrivals that raced to enqueue before cancellation took + # effect sit in the completion queue and are discarded by the + # finally block (the queue is local and will be garbage-collected). 
+ for t in tasks: + if not t.done(): + t.cancel() + if kind == "error": assert isinstance(payload, BaseException) - stopped = True pending_error = payload - for t in tasks: - if not t.done(): - t.cancel() + break if pending_error is not None: raise pending_error finally: - # Ensure we wait for any cancelled tasks to finish so no task escapes. + # Best-effort cancellation for in-flight tasks (covers the consumer + # break / early-exit case where we did not proactively cancel). + for t in tasks: + if not t.done(): + t.cancel() if tasks: await asyncio.gather(*tasks, return_exceptions=True) From 5b1d8cd96955497d571582d7199b09da578fd61a Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:50:24 +0200 Subject: [PATCH 14/25] test(core): cover mid-stream miss with concurrency > 1 Exercises the concurrent path where a missing key is observed while other fetches are still in flight. Uses an asyncio.Event to gate late arrivals until after the miss has been processed, giving the drain loop an opportunity to observe and discard post-stop completions, and verifies the iterator terminates cleanly without hanging or raising. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_coalesce.py | 59 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 59 insertions(+) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index 1bd020a886..bfc53610c9 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -287,6 +287,65 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None: assert len(groups[0]) == 1 +async def test_key_missing_mid_stream_with_concurrency_drains_late_arrivals() -> None: + # Schedule multiple non-mergeable fetches under max_concurrency=3. + # - Fetch #0 completes FIRST (short sleep) -> at least one yield observed. + # - Fetch #1 returns None shortly after -> triggers the miss path. 
+    # - Fetches #2..#N are gated on an asyncio.Event so they only unblock
+    #   AFTER the miss has been observed, producing late arrivals that sit
+    #   in the completion queue until the generator's finally block cleans up.
+    late_gate = asyncio.Event()
+    miss_fired = asyncio.Event()
+
+    async def fetch(byte_range: ByteRequest | None) -> Buffer | None:
+        assert isinstance(byte_range, RangeByteRequest)
+        start = byte_range.start
+        if start == 0:
+            # First to complete: small sleep so it arrives before the miss.
+            await asyncio.sleep(0.01)
+            return _buf(b"ok")
+        if start == 1000:
+            # Miss: a little later than #0 so #0 yields first.
+            await asyncio.sleep(0.03)
+            miss_fired.set()
+            return None
+        # Late arrivals: wait until the miss has been processed, then return
+        # a buffer so the drain loop sees them post-stop.
+        await asyncio.wait_for(miss_fired.wait(), timeout=5.0)
+        await asyncio.wait_for(late_gate.wait(), timeout=5.0)
+        return _buf(b"ok")
+
+    opts: CoalesceOptions = {
+        "max_gap_bytes": -1,  # force no merging (each range its own fetch)
+        "max_coalesced_bytes": 1 << 20,
+        "max_concurrency": 3,
+    }
+    # Stride by 1000 to avoid merging. 7 items exceeds max_concurrency=3,
+    # so work is still pending when the miss lands.
+    ranges: list[ByteRequest | None] = [RangeByteRequest(i * 1000, i * 1000 + 1) for i in range(7)]
+
+    groups: list[list[tuple[int, Buffer | None]]] = []
+    agen = coalesced_get(fetch, ranges, options=opts)
+    try:
+        async for group in agen:
+            groups.append(list(group))
+            # After the first yield, release the late gate so remaining
+            # in-flight tasks can complete and arrive post-stop.
+            late_gate.set()
+    finally:
+        # Guard against a bug preventing any yield: unblock waiters anyway.
+        late_gate.set()
+
+    # We observed exactly the one pre-miss yield.
+    assert len(groups) == 1
+    assert len(groups[0]) == 1
+    idx, buf = groups[0][0]
+    assert idx == 0
+    assert buf is not None
+    # The iterator completed cleanly without raising. 
+ assert miss_fired.is_set() + + @pytest.mark.parametrize( "byte_range", [ From 6aa6f4b4cc0c1e3628be868b5ba47117aa8cecbe Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:50:45 +0200 Subject: [PATCH 15/25] test(core): verify consumer-break cancels pending fetches Drives many slow ranges with a small max_concurrency, breaks out of the async-for after the first yield, and verifies that at least one still-running fetch was cancelled rather than being left to run to completion. Cancellation is observed via a counter in the fetch's CancelledError branch. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_coalesce.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index bfc53610c9..4df3c7ce44 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -365,6 +365,48 @@ async def test_key_missing_on_uncoalescable_input_yields_nothing( assert groups == [] +async def test_consumer_break_cancels_pending_fetches() -> None: + # Kick off many slow ranges with small max_concurrency, break after the + # first yielded group, and verify the remaining tasks are cancelled rather + # than allowed to run to completion. + completed_calls = 0 + cancelled_calls = 0 + + async def fetch(byte_range: ByteRequest | None) -> Buffer | None: + nonlocal completed_calls, cancelled_calls + assert isinstance(byte_range, RangeByteRequest) + start = byte_range.start + try: + # First fetch returns fast so the async for body runs and can break. + # Later fetches sleep long enough that cancellation has room to land. 
+ await asyncio.sleep(0.001 if start == 0 else 2.0) + except asyncio.CancelledError: + cancelled_calls += 1 + raise + completed_calls += 1 + return _buf(b"x") + + opts: CoalesceOptions = { + "max_gap_bytes": -1, # no merging + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 3, + } + ranges: list[ByteRequest | None] = [RangeByteRequest(i * 1000, i * 1000 + 1) for i in range(6)] + + agen = coalesced_get(fetch, ranges, options=opts) + # Break after receiving the first yield. + async for _group in agen: + break + # Make sure the async generator is fully closed so its finally runs. + await agen.aclose() + + # At least one slow fetch was actually running under the semaphore and got + # cancelled (rather than running to completion). + assert cancelled_calls >= 1 + # And the first range's fetch completed normally (no spurious cancels there). + assert completed_calls >= 1 + + async def test_fetch_raises_propagates() -> None: fetch = FakeFetch( b"x" * 100, From dded848a44305532e4690e397b67b8b983c60d92 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 15:54:05 +0200 Subject: [PATCH 16/25] fix(core): type coalesced_get as AsyncGenerator coalesced_get is implemented as an async generator (uses yield) and callers need access to aclose() to drive its finally block deterministically. Declaring the return type as AsyncGenerator instead of AsyncIterator exposes aclose()/asend()/athrow() through the type system, matches the runtime object, and lets consumers (e.g. the consumer-break test) avoid type-ignore escape hatches. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- src/zarr/core/_coalesce.py | 4 ++-- tests/test_coalesce.py | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/src/zarr/core/_coalesce.py b/src/zarr/core/_coalesce.py index 83f8e8b9f9..40fe14bf04 100644 --- a/src/zarr/core/_coalesce.py +++ b/src/zarr/core/_coalesce.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, TypedDict if TYPE_CHECKING: - from collections.abc import AsyncIterator, Awaitable, Callable, Iterable, Sequence + from collections.abc import AsyncGenerator, Awaitable, Callable, Iterable, Sequence from zarr.abc.store import ByteRequest from zarr.core.buffer import Buffer @@ -37,7 +37,7 @@ async def coalesced_get( byte_ranges: Iterable[ByteRequest | None], *, options: CoalesceOptions, -) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]: +) -> AsyncGenerator[Sequence[tuple[int, Buffer | None]], None]: """Read many byte ranges through ``fetch`` with coalescing and concurrency. Nearby ranges are merged into a single underlying I/O (subject to diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index 4df3c7ce44..c95570f323 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -397,7 +397,8 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None: # Break after receiving the first yield. async for _group in agen: break - # Make sure the async generator is fully closed so its finally runs. + # Explicitly close the generator so its finally block runs (cancelling + # in-flight tasks) before we make assertions. await agen.aclose() # At least one slow fetch was actually running under the semaphore and got From 865baf0828874b185b46b7c6fd64e94ff0722f0a Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 16:05:08 +0200 Subject: [PATCH 17/25] refactor(test): drop redundant pytestmark asyncio pyproject asyncio_mode=auto already covers async test dispatch; the explicit pytestmark was a vestige. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_coalesce.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index c95570f323..41ccc71652 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -23,8 +23,6 @@ if TYPE_CHECKING: from collections.abc import AsyncIterator, Callable, Sequence -pytestmark = pytest.mark.asyncio - def _buf(data: bytes) -> Buffer: return default_buffer_prototype().buffer.from_bytes(data) From 17d9f759b0c4ecbea4be575bfd70f5e8850c13a9 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 16:07:17 +0200 Subject: [PATCH 18/25] feat(storage): add private SupportsGetRanges protocol --- src/zarr/storage/_protocols.py | 34 ++++++++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 src/zarr/storage/_protocols.py diff --git a/src/zarr/storage/_protocols.py b/src/zarr/storage/_protocols.py new file mode 100644 index 0000000000..086a15dd90 --- /dev/null +++ b/src/zarr/storage/_protocols.py @@ -0,0 +1,34 @@ +# src/zarr/storage/_protocols.py +from __future__ import annotations + +from typing import TYPE_CHECKING, Protocol, runtime_checkable + +if TYPE_CHECKING: + from collections.abc import AsyncIterator, Iterable, Sequence + + from zarr.abc.store import ByteRequest + from zarr.core.buffer import Buffer, BufferPrototype + + +@runtime_checkable +class SupportsGetRanges(Protocol): + """Stores that satisfy this protocol can efficiently read many byte ranges + from a single key in a single call, typically via coalescing and concurrent fetch. + + Private / unstable. Shape may change before being made public. + """ + + def get_ranges( + self, + key: str, + byte_ranges: Iterable[ByteRequest | None], + *, + prototype: BufferPrototype, + ) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]: + """Read many byte ranges from ``key``. + + Each yield corresponds to one underlying I/O operation. 
+ + See :func:`zarr.core._coalesce.coalesced_get` for full semantics. + """ + ... From 3ab711d64906e01d0fc21f5dce8e08b4c4c13f81 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 16:09:02 +0200 Subject: [PATCH 19/25] test(storage): add failing tests for FsspecStore.get_ranges --- tests/test_store/test_fsspec_get_ranges.py | 122 +++++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 tests/test_store/test_fsspec_get_ranges.py diff --git a/tests/test_store/test_fsspec_get_ranges.py b/tests/test_store/test_fsspec_get_ranges.py new file mode 100644 index 0000000000..9eb517095a --- /dev/null +++ b/tests/test_store/test_fsspec_get_ranges.py @@ -0,0 +1,122 @@ +# tests/test_store/test_fsspec_get_ranges.py +"""Lightweight integration tests for FsspecStore.get_ranges using MemoryFileSystem. + +These don't need moto/s3 — they exercise the new method against an in-process +fsspec MemoryFileSystem wrapped in the async wrapper. +""" + +from __future__ import annotations + +import pytest + +from zarr.abc.store import RangeByteRequest +from zarr.core._coalesce import DEFAULT_COALESCE_OPTIONS, CoalesceOptions +from zarr.core.buffer import Buffer, default_buffer_prototype +from zarr.storage import FsspecStore +from zarr.storage._fsspec import _make_async + +fsspec = pytest.importorskip("fsspec") + + +@pytest.fixture +def memory_store() -> FsspecStore: + """An FsspecStore backed by fsspec MemoryFileSystem (wrapped async).""" + from fsspec.implementations.memory import MemoryFileSystem + + # Each test gets a clean filesystem; MemoryFileSystem is a singleton per target_options, + # so clear state explicitly. 
+ fs: MemoryFileSystem = MemoryFileSystem() + fs.store.clear() + fs.pseudo_dirs.clear() + async_fs = _make_async(fs) + return FsspecStore(fs=async_fs, path="/root") + + +async def _write(store: FsspecStore, key: str, data: bytes) -> None: + buf = default_buffer_prototype().buffer.from_bytes(data) + await store.set(key, buf) + + +async def test_get_ranges_happy_path(memory_store: FsspecStore) -> None: + blob = bytes(i % 256 for i in range(1024)) + await _write(memory_store, "blob", blob) + proto = default_buffer_prototype() + + ranges = [ + RangeByteRequest(0, 10), + RangeByteRequest(100, 110), + RangeByteRequest(500, 520), + ] + groups: list[list[tuple[int, Buffer | None]]] = [ + list(group) async for group in memory_store.get_ranges("blob", ranges, prototype=proto) + ] + + flat: dict[int, bytes] = {} + for group in groups: + for idx, buf in group: + assert buf is not None + flat[idx] = buf.to_bytes() + + assert flat[0] == blob[0:10] + assert flat[1] == blob[100:110] + assert flat[2] == blob[500:520] + + +async def test_get_ranges_missing_key_yields_nothing(memory_store: FsspecStore) -> None: + proto = default_buffer_prototype() + groups: list[list[tuple[int, Buffer | None]]] = [ + list(group) + async for group in memory_store.get_ranges( + "does-not-exist", [RangeByteRequest(0, 10)], prototype=proto + ) + ] + assert groups == [] + + +async def test_default_coalesce_options_on_store_without_arg() -> None: + from fsspec.implementations.memory import MemoryFileSystem + + fs = MemoryFileSystem() + fs.store.clear() + store = FsspecStore(fs=_make_async(fs), path="/x") + assert store.coalesce_options == DEFAULT_COALESCE_OPTIONS + + +async def test_coalesce_options_wired_through() -> None: + from fsspec.implementations.memory import MemoryFileSystem + + fs = MemoryFileSystem() + fs.store.clear() + custom: CoalesceOptions = { + "max_gap_bytes": 0, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 2, + } + store = FsspecStore(fs=_make_async(fs), path="/x", 
coalesce_options=custom) + assert store.coalesce_options == custom + + +async def test_get_ranges_mixed_range_types(memory_store: FsspecStore) -> None: + """Covers RangeByteRequest, OffsetByteRequest, SuffixByteRequest, and None in one call.""" + from zarr.abc.store import OffsetByteRequest, SuffixByteRequest + + blob = bytes(i % 256 for i in range(512)) + await _write(memory_store, "mixed", blob) + proto = default_buffer_prototype() + + ranges = [ + RangeByteRequest(0, 10), + OffsetByteRequest(500), + SuffixByteRequest(12), + None, + ] + flat: dict[int, bytes] = {} + async for group in memory_store.get_ranges("mixed", ranges, prototype=proto): + for idx, buf in group: + assert buf is not None + flat[idx] = buf.to_bytes() + + assert flat[0] == blob[0:10] + assert flat[1] == blob[500:] + assert flat[2] == blob[-12:] + assert flat[3] == blob From 913be1067c203080484ef18ccf5f1e4e5a7a703c Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 16:13:15 +0200 Subject: [PATCH 20/25] feat(storage): add FsspecStore.get_ranges and coalesce_options kwarg --- src/zarr/storage/_fsspec.py | 33 +++++++++++++++++++++- tests/test_store/test_fsspec_get_ranges.py | 4 +-- 2 files changed, 34 insertions(+), 3 deletions(-) diff --git a/src/zarr/storage/_fsspec.py b/src/zarr/storage/_fsspec.py index 74e5869a66..d967e64ca2 100644 --- a/src/zarr/storage/_fsspec.py +++ b/src/zarr/storage/_fsspec.py @@ -3,6 +3,7 @@ import json import warnings from contextlib import suppress +from functools import partial from typing import TYPE_CHECKING, Any from packaging.version import parse as parse_version @@ -14,18 +15,24 @@ Store, SuffixByteRequest, ) +from zarr.core._coalesce import ( + DEFAULT_COALESCE_OPTIONS, + CoalesceOptions, + coalesced_get, +) from zarr.core.buffer import Buffer from zarr.errors import ZarrUserWarning from zarr.storage._utils import _join_paths, normalize_path if TYPE_CHECKING: - from collections.abc import AsyncIterator, Iterable + from collections.abc import 
AsyncIterator, Iterable, Sequence from fsspec import AbstractFileSystem from fsspec.asyn import AsyncFileSystem from fsspec.mapping import FSMap from zarr.core.buffer import BufferPrototype + from zarr.storage._protocols import SupportsGetRanges ALLOWED_EXCEPTIONS: tuple[type[Exception], ...] = ( @@ -124,11 +131,14 @@ def __init__( read_only: bool = False, path: str = "/", allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS, + *, + coalesce_options: CoalesceOptions = DEFAULT_COALESCE_OPTIONS, ) -> None: super().__init__(read_only=read_only) self.fs = fs self.path = normalize_path(path) self.allowed_exceptions = allowed_exceptions + self.coalesce_options = coalesce_options if not self.fs.async_impl: raise TypeError("Filesystem needs to support async operations.") @@ -315,6 +325,22 @@ async def get( else: return value + async def get_ranges( + self, + key: str, + byte_ranges: Iterable[ByteRequest | None], + *, + prototype: BufferPrototype, + ) -> AsyncIterator[Sequence[tuple[int, Buffer | None]]]: + """Read many byte ranges from ``key``, coalescing nearby ranges and fetching concurrently. + + See :class:`zarr.storage._protocols.SupportsGetRanges` for the contract and + :func:`zarr.core._coalesce.coalesced_get` for the full semantics. + """ + fetch = partial(self.get, key, prototype) + async for group in coalesced_get(fetch, byte_ranges, options=self.coalesce_options): + yield group + async def set( self, key: str, @@ -440,3 +466,8 @@ async def getsize(self, key: str) -> int: else: # fsspec doesn't have typing. We'll need to assume or verify this is true return int(size) + + +# Module-level type assertion: FsspecStore structurally satisfies SupportsGetRanges. +# This line is a no-op at runtime but causes mypy/pyright to complain if the shape drifts. 
+_: type[SupportsGetRanges] = FsspecStore diff --git a/tests/test_store/test_fsspec_get_ranges.py b/tests/test_store/test_fsspec_get_ranges.py index 9eb517095a..b30868a78b 100644 --- a/tests/test_store/test_fsspec_get_ranges.py +++ b/tests/test_store/test_fsspec_get_ranges.py @@ -98,13 +98,13 @@ async def test_coalesce_options_wired_through() -> None: async def test_get_ranges_mixed_range_types(memory_store: FsspecStore) -> None: """Covers RangeByteRequest, OffsetByteRequest, SuffixByteRequest, and None in one call.""" - from zarr.abc.store import OffsetByteRequest, SuffixByteRequest + from zarr.abc.store import ByteRequest, OffsetByteRequest, SuffixByteRequest blob = bytes(i % 256 for i in range(512)) await _write(memory_store, "mixed", blob) proto = default_buffer_prototype() - ranges = [ + ranges: list[ByteRequest | None] = [ RangeByteRequest(0, 10), OffsetByteRequest(500), SuffixByteRequest(12), From 0328e01e2b81550c6b0f54e06da542dcff93bbaa Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 16:13:34 +0200 Subject: [PATCH 21/25] test(storage): add SupportsGetRanges conformance tests --- tests/test_store/test_protocols.py | 38 ++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 tests/test_store/test_protocols.py diff --git a/tests/test_store/test_protocols.py b/tests/test_store/test_protocols.py new file mode 100644 index 0000000000..c47cf928c3 --- /dev/null +++ b/tests/test_store/test_protocols.py @@ -0,0 +1,38 @@ +# tests/test_store/test_protocols.py +"""Runtime and static conformance tests for zarr.storage._protocols.SupportsGetRanges.""" + +from __future__ import annotations + +import pytest + +from zarr.storage._protocols import SupportsGetRanges + + +def test_fsspec_store_satisfies_supports_get_ranges() -> None: + pytest.importorskip("fsspec") + from fsspec.implementations.memory import MemoryFileSystem + + from zarr.storage import FsspecStore + from zarr.storage._fsspec import _make_async + + fs = 
MemoryFileSystem() + fs.store.clear() + store = FsspecStore(fs=_make_async(fs), path="/x") + assert isinstance(store, SupportsGetRanges) + + +def test_memory_store_does_not_satisfy_supports_get_ranges() -> None: + """Sanity check: stores that don't implement get_ranges shouldn't satisfy the protocol.""" + from zarr.storage import MemoryStore + + store = MemoryStore() + assert not isinstance(store, SupportsGetRanges) + + +def test_type_assignment_at_module_level() -> None: + """Smoke-test the module-level `_: type[SupportsGetRanges] = FsspecStore`. + + If this runs without error the module imported cleanly; the static check is in mypy. + """ + pytest.importorskip("fsspec") + from zarr.storage import _fsspec # noqa: F401 From e7432c5c67209b7a15465c582d1827f63936457f Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 16:16:46 +0200 Subject: [PATCH 22/25] chore: add towncrier fragment for get_ranges Used 0000.feature.md as a placeholder; rename to {pr-number}.feature.md once the PR is opened. Co-Authored-By: Claude Opus 4.7 (1M context) --- changes/0000.feature.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 changes/0000.feature.md diff --git a/changes/0000.feature.md b/changes/0000.feature.md new file mode 100644 index 0000000000..288c8af0b4 --- /dev/null +++ b/changes/0000.feature.md @@ -0,0 +1 @@ +Add `zarr.storage.FsspecStore.get_ranges` for concurrent, coalesced multi-range reads from a single key. The method satisfies the private `zarr.storage._protocols.SupportsGetRanges` protocol. A new keyword-only constructor argument `coalesce_options` on `FsspecStore` controls the max gap, max coalesced size, and max concurrency of the underlying requests. 
From 349bd9c6bc99e30349c817a3fd3d76e37cd05a64 Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 16:58:44 +0200 Subject: [PATCH 23/25] docs: drop private-symbol mention from get_ranges changelog The SupportsGetRanges protocol is private; a user-facing release note shouldn't advertise it. Co-Authored-By: Claude Opus 4.7 (1M context) --- changes/0000.feature.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/changes/0000.feature.md b/changes/0000.feature.md index 288c8af0b4..ce26a8c209 100644 --- a/changes/0000.feature.md +++ b/changes/0000.feature.md @@ -1 +1 @@ -Add `zarr.storage.FsspecStore.get_ranges` for concurrent, coalesced multi-range reads from a single key. The method satisfies the private `zarr.storage._protocols.SupportsGetRanges` protocol. A new keyword-only constructor argument `coalesce_options` on `FsspecStore` controls the max gap, max coalesced size, and max concurrency of the underlying requests. +Add `zarr.storage.FsspecStore.get_ranges` for concurrent, coalesced multi-range reads from a single key. A new keyword-only constructor argument `coalesce_options` on `FsspecStore` controls the max gap, max coalesced size, and max concurrency of the underlying requests. 
From 79e9927e6ecc05e77ecb08c0ab8c364959d7352f Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 20:43:56 +0200 Subject: [PATCH 24/25] test: refactor tests --- tests/test_coalesce.py | 542 ++++++++++++++++++++++------------------- 1 file changed, 289 insertions(+), 253 deletions(-) diff --git a/tests/test_coalesce.py b/tests/test_coalesce.py index 41ccc71652..68a620ea5d 100644 --- a/tests/test_coalesce.py +++ b/tests/test_coalesce.py @@ -71,161 +71,236 @@ def _contents(groups: list[list[tuple[int, Buffer | None]]]) -> dict[int, bytes] return result -async def test_empty_input() -> None: - fetch = FakeFetch(b"abc" * 1000) - groups = await _collect(coalesced_get(fetch, [], options=DEFAULT_COALESCE_OPTIONS)) - assert groups == [] - assert fetch.calls == [] +# --------------------------------------------------------------------------- +# Shared option values for parametrized structural tests. +# --------------------------------------------------------------------------- + +DEFAULT: CoalesceOptions = DEFAULT_COALESCE_OPTIONS +"""The library default; permissive merging.""" + +MERGE_GAP_50: CoalesceOptions = { + "max_gap_bytes": 50, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 10, +} +"""Merge ranges within 50 bytes of each other.""" + +NO_MERGE: CoalesceOptions = { + "max_gap_bytes": -1, + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 10, +} +"""No merging: any positive gap is > -1, so no pair ever coalesces.""" + +CAP_50: CoalesceOptions = { + "max_gap_bytes": 1000, + "max_coalesced_bytes": 50, + "max_concurrency": 10, +} +"""Gap permissive but merged size capped at 50 bytes.""" + + +# A deterministic blob used for content-sensitive cases: byte i == (i % 256). +_INDEXED_BLOB = bytes(i % 256 for i in range(10_000)) + + +# --------------------------------------------------------------------------- +# Parametrized structural/content tests (cases without async timing or errors). 
+# --------------------------------------------------------------------------- + + +@dataclass(frozen=True) +class StructuralCase: + """One row of the parametrized structure-and-contents table.""" + + id: str + """pytest id for the case.""" + ranges: list[ByteRequest | None] + """Input to coalesced_get.""" + options: CoalesceOptions + """Coalescing knobs.""" + expected_group_sizes: list[int] + """Sorted list of group tuple-counts (order-independent).""" + expected_contents: dict[int, bytes] | None = None + """{input_index: bytes} to verify bytes, or None to skip the content check.""" + expected_n_fetches: int | None = None + """Exact number of calls to the fetch callable, or None to skip the check.""" + + +_STRUCTURAL_CASES: list[StructuralCase] = [ + StructuralCase( + id="empty-input", + ranges=[], + options=DEFAULT, + expected_group_sizes=[], + expected_n_fetches=0, + ), + StructuralCase( + id="single-range", + ranges=[RangeByteRequest(2, 5)], + options=DEFAULT, + expected_group_sizes=[1], + expected_contents={0: _INDEXED_BLOB[2:5]}, + expected_n_fetches=1, + ), + StructuralCase( + id="disjoint-3-no-merge", + ranges=[ + RangeByteRequest(0, 10), + RangeByteRequest(200, 210), + RangeByteRequest(500, 510), + ], + options=MERGE_GAP_50, + expected_group_sizes=[1, 1, 1], + expected_contents={ + 0: _INDEXED_BLOB[0:10], + 1: _INDEXED_BLOB[200:210], + 2: _INDEXED_BLOB[500:510], + }, + expected_n_fetches=3, + ), + StructuralCase( + id="adjacent-3-one-merged-group", + ranges=[ + RangeByteRequest(0, 5), + RangeByteRequest(10, 15), + RangeByteRequest(20, 25), + ], + options=MERGE_GAP_50, + expected_group_sizes=[3], + expected_contents={ + 0: _INDEXED_BLOB[0:5], + 1: _INDEXED_BLOB[10:15], + 2: _INDEXED_BLOB[20:25], + }, + expected_n_fetches=1, + ), + StructuralCase( + id="two-clusters-one-singleton", + ranges=[ + RangeByteRequest(0, 10), + RangeByteRequest(20, 30), + RangeByteRequest(500, 510), + ], + options=MERGE_GAP_50, + expected_group_sizes=[1, 2], + expected_contents={ + 
0: _INDEXED_BLOB[0:10], + 1: _INDEXED_BLOB[20:30], + 2: _INDEXED_BLOB[500:510], + }, + expected_n_fetches=2, + ), + StructuralCase( + id="uncoalescable-mixed-with-range", + ranges=[ + RangeByteRequest(0, 3), + OffsetByteRequest(5), + SuffixByteRequest(2), + None, + ], + options=DEFAULT, + expected_group_sizes=[1, 1, 1, 1], + expected_contents={ + 0: _INDEXED_BLOB[0:3], + 1: _INDEXED_BLOB[5:], + 2: _INDEXED_BLOB[-2:], + 3: _INDEXED_BLOB, + }, + expected_n_fetches=4, + ), + StructuralCase( + id="shuffled-input-indices-preserved", + ranges=[ + RangeByteRequest(500, 510), + RangeByteRequest(0, 10), + RangeByteRequest(200, 210), + RangeByteRequest(300, 310), + ], + options=MERGE_GAP_50, + expected_group_sizes=[1, 1, 1, 1], + expected_contents={ + 0: _INDEXED_BLOB[500:510], + 1: _INDEXED_BLOB[0:10], + 2: _INDEXED_BLOB[200:210], + 3: _INDEXED_BLOB[300:310], + }, + expected_n_fetches=4, + ), + StructuralCase( + id="cap-prevents-merge-of-close-ranges", + # 20 + 20 gap + 20 = 60-byte merged span > cap of 50. + ranges=[RangeByteRequest(0, 20), RangeByteRequest(40, 60)], + options=CAP_50, + expected_group_sizes=[1, 1], + expected_n_fetches=2, + ), + StructuralCase( + id="single-range-larger-than-cap-passes-through", + # Cap only applies to MERGE decisions; a lone oversized range still fetches. 
+ ranges=[RangeByteRequest(0, 200)], + options=CAP_50, + expected_group_sizes=[1], + expected_contents={0: _INDEXED_BLOB[0:200]}, + expected_n_fetches=1, + ), +] + + +@pytest.mark.parametrize("case", _STRUCTURAL_CASES, ids=lambda c: c.id) +async def test_coalescing_structure_and_contents(case: StructuralCase) -> None: + """Group structure, byte contents, and fetch-call count for the deterministic cases.""" + fetch = FakeFetch(_INDEXED_BLOB) + groups = await _collect(coalesced_get(fetch, case.ranges, options=case.options)) + + assert sorted(len(g) for g in groups) == sorted(case.expected_group_sizes) + + if case.expected_contents is not None: + assert _contents(groups) == case.expected_contents + + if case.expected_n_fetches is not None: + assert len(fetch.calls) == case.expected_n_fetches + + +# --------------------------------------------------------------------------- +# Focused non-parametrized tests for cases with distinctive assertion shapes. +# --------------------------------------------------------------------------- -async def test_single_range() -> None: - fetch = FakeFetch(b"0123456789") - groups = await _collect( - coalesced_get( - fetch, - [RangeByteRequest(2, 5)], - options=DEFAULT_COALESCE_OPTIONS, - ) - ) +async def test_within_group_ordering_is_start_offset() -> None: + """Within a merged group, tuples are ordered by start offset, not input order.""" + fetch = FakeFetch(_INDEXED_BLOB) + # Two ranges that merge; one has a later start but is listed first in input. + ranges: list[ByteRequest | None] = [RangeByteRequest(20, 25), RangeByteRequest(0, 5)] + groups = await _collect(coalesced_get(fetch, ranges, options=MERGE_GAP_50)) assert len(groups) == 1 - assert len(groups[0]) == 1 - assert _contents(groups) == {0: b"234"} - assert len(fetch.calls) == 1 - - -async def test_fully_disjoint_ranges_each_get_own_group() -> None: - # Ranges 100 bytes apart with max_gap_bytes < 100 will not merge. 
- fetch = FakeFetch(b"x" * 10_000) - opts: CoalesceOptions = { - "max_gap_bytes": 50, - "max_coalesced_bytes": 1 << 20, - "max_concurrency": 10, - } - ranges: list[ByteRequest | None] = [ - RangeByteRequest(0, 10), - RangeByteRequest(200, 210), - RangeByteRequest(500, 510), - ] - groups = await _collect(coalesced_get(fetch, ranges, options=opts)) - assert len(groups) == 3 - assert all(len(g) == 1 for g in groups) - # 3 fetches, one per input. - assert len(fetch.calls) == 3 + # Input index 1 (start=0) comes first, then 0 (start=20). + assert [idx for idx, _ in groups[0]] == [1, 0] -async def test_adjacent_ranges_merge_into_one_group() -> None: - # Three ranges within 10 bytes of each other; max_gap_bytes=50 -> one merged fetch. - fetch = FakeFetch(b"".join(bytes([i % 256]) for i in range(1000))) - opts: CoalesceOptions = { - "max_gap_bytes": 50, - "max_coalesced_bytes": 1 << 20, - "max_concurrency": 10, - } +async def test_adjacent_ranges_fire_single_fetch_spanning_merged_region() -> None: + """Verify the merged fetch covers exactly the span from min-start to max-end.""" + fetch = FakeFetch(_INDEXED_BLOB) ranges: list[ByteRequest | None] = [ RangeByteRequest(0, 5), RangeByteRequest(10, 15), RangeByteRequest(20, 25), ] - groups = await _collect(coalesced_get(fetch, ranges, options=opts)) - assert len(groups) == 1 - assert len(groups[0]) == 3 - # The single fetch should span the full merged region. + await _collect(coalesced_get(fetch, ranges, options=MERGE_GAP_50)) assert len(fetch.calls) == 1 call = fetch.calls[0] assert isinstance(call, RangeByteRequest) assert call.start == 0 assert call.end == 25 - # Contents correct. 
- assert _contents(groups) == { - 0: bytes(range(5)), - 1: bytes(range(10, 15)), - 2: bytes(range(20, 25)), - } -async def test_offset_and_suffix_and_none_each_get_own_group() -> None: - fetch = FakeFetch(b"abcdefghij") - ranges: list[ByteRequest | None] = [ - RangeByteRequest(0, 3), - OffsetByteRequest(5), - SuffixByteRequest(2), - None, - ] - groups = await _collect(coalesced_get(fetch, ranges, options=DEFAULT_COALESCE_OPTIONS)) - # 1 group from the RangeByteRequest + 3 one-tuple groups from the rest. - assert len(groups) == 4 - # Contents. - flat = _contents(groups) - assert flat[0] == b"abc" - assert flat[1] == b"fghij" - assert flat[2] == b"ij" - assert flat[3] == b"abcdefghij" - - -async def test_indices_preserved_under_shuffled_input() -> None: - fetch = FakeFetch(b"".join(bytes([i % 256]) for i in range(1000))) - # Construct ranges in a deliberately non-sorted order. - ranges: list[ByteRequest | None] = [ - RangeByteRequest(500, 510), - RangeByteRequest(0, 10), - RangeByteRequest(200, 210), - RangeByteRequest(300, 310), - ] - opts: CoalesceOptions = { - "max_gap_bytes": 50, - "max_coalesced_bytes": 1 << 20, - "max_concurrency": 10, - } - groups = await _collect(coalesced_get(fetch, ranges, options=opts)) - flat = _contents(groups) - # Indices match original positions, not sorted order. - assert flat[0] == bytes(b % 256 for b in range(500, 510)) - assert flat[1] == bytes(b % 256 for b in range(10)) - assert flat[2] == bytes(b % 256 for b in range(200, 210)) - assert flat[3] == bytes(b % 256 for b in range(300, 310)) - - -async def test_within_group_ordering_is_start_offset() -> None: - fetch = FakeFetch(b"".join(bytes([i % 256]) for i in range(100))) - # Two ranges will merge; one has a later start but is listed first in input. 
- ranges: list[ByteRequest | None] = [ - RangeByteRequest(20, 25), - RangeByteRequest(0, 5), - ] - opts: CoalesceOptions = { - "max_gap_bytes": 50, - "max_coalesced_bytes": 1 << 20, - "max_concurrency": 10, - } - groups = await _collect(coalesced_get(fetch, ranges, options=opts)) - assert len(groups) == 1 - # Within the group, tuples are ordered by start offset. - # Input index 1 (start=0) comes first, then 0 (start=20). - assert [idx for idx, _ in groups[0]] == [1, 0] - - -async def test_mixed_mergeable_and_non_mergeable_counts_correct() -> None: - fetch = FakeFetch(b"x" * 10_000) - opts: CoalesceOptions = { - "max_gap_bytes": 50, - "max_coalesced_bytes": 1 << 20, - "max_concurrency": 10, - } - # Two clusters + one far-away singleton. - ranges: list[ByteRequest | None] = [ - RangeByteRequest(0, 10), - RangeByteRequest(20, 30), - RangeByteRequest(500, 510), - ] - groups = await _collect(coalesced_get(fetch, ranges, options=opts)) - assert len(groups) == 2 - # First group has 2, second has 1. - sizes = sorted(len(g) for g in groups) - assert sizes == [1, 2] +# --------------------------------------------------------------------------- +# Concurrency and cancellation. +# --------------------------------------------------------------------------- async def test_max_concurrency_is_honored() -> None: - # Build 10 non-mergeable ranges, have the fetch hold a counter of in-flight calls. 
+ """With 10 non-mergeable ranges and max_concurrency=3, peak in-flight must not exceed 3.""" in_flight = 0 peak = 0 lock = asyncio.Lock() @@ -253,21 +328,78 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None: assert peak >= 2 # must have been some real concurrency +async def test_consumer_break_cancels_pending_fetches() -> None: + """Breaking out of the async for should cancel pending fetches rather than let them complete.""" + completed_calls = 0 + cancelled_calls = 0 + + async def fetch(byte_range: ByteRequest | None) -> Buffer | None: + nonlocal completed_calls, cancelled_calls + assert isinstance(byte_range, RangeByteRequest) + start = byte_range.start + try: + # First fetch returns fast so the async-for body runs and can break. + # Later fetches sleep long enough that cancellation has room to land. + await asyncio.sleep(0.001 if start == 0 else 2.0) + except asyncio.CancelledError: + cancelled_calls += 1 + raise + completed_calls += 1 + return _buf(b"x") + + opts: CoalesceOptions = { + "max_gap_bytes": -1, # no merging + "max_coalesced_bytes": 1 << 20, + "max_concurrency": 3, + } + ranges: list[ByteRequest | None] = [RangeByteRequest(i * 1000, i * 1000 + 1) for i in range(6)] + + agen = coalesced_get(fetch, ranges, options=opts) + async for _group in agen: + break + # Explicitly close the generator so its finally block runs (cancelling + # in-flight tasks) before we make assertions. + await agen.aclose() + + assert cancelled_calls >= 1 + assert completed_calls >= 1 + + +# --------------------------------------------------------------------------- +# Key-missing semantics. 
+# --------------------------------------------------------------------------- + + async def test_key_missing_from_first_call_yields_nothing() -> None: + """If the very first fetch returns None, the iterator yields no groups.""" fetch = FakeFetch(b"x" * 100, key_exists=False) ranges: list[ByteRequest | None] = [RangeByteRequest(0, 10), RangeByteRequest(20, 30)] groups = await _collect(coalesced_get(fetch, ranges, options=DEFAULT_COALESCE_OPTIONS)) assert groups == [] +@pytest.mark.parametrize( + "byte_range", + [OffsetByteRequest(5), SuffixByteRequest(5), None], + ids=["offset", "suffix", "none"], +) +async def test_key_missing_on_uncoalescable_input_yields_nothing( + byte_range: ByteRequest | None, +) -> None: + """Uncoalescable inputs take a distinct path; key-missing must still short-circuit.""" + fetch = FakeFetch(b"x" * 100, key_exists=False) + groups = await _collect(coalesced_get(fetch, [byte_range], options=DEFAULT_COALESCE_OPTIONS)) + assert groups == [] + + async def test_key_missing_mid_stream_yields_earlier_groups_only() -> None: - # Two non-mergeable ranges; the second fetch returns None. + """If a later fetch returns None, earlier-completed groups remain observable.""" call_count = 0 async def fetch(byte_range: ByteRequest | None) -> Buffer | None: nonlocal call_count call_count += 1 - # Ensure deterministic ordering: first call serves, second returns None. + # Deterministic: first call serves, second returns None. await asyncio.sleep(0.01 if call_count == 1 else 0.02) if call_count >= 2: return None @@ -280,18 +412,16 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None: } ranges: list[ByteRequest | None] = [RangeByteRequest(0, 2), RangeByteRequest(100, 102)] groups = await _collect(coalesced_get(fetch, ranges, options=opts)) - # Exactly one group (the first) -- the second went missing. 
assert len(groups) == 1 assert len(groups[0]) == 1 async def test_key_missing_mid_stream_with_concurrency_drains_late_arrivals() -> None: - # Schedule multiple non-mergeable fetches under max_concurrency=3. - # - Fetch #0 completes FIRST (short sleep) -> at least one yield observed. - # - Fetch #1 returns None shortly after -> triggers the miss path. - # - Fetches #2..#N are gated on an asyncio.Event so they only unblock - # AFTER the miss has been observed, producing late arrivals that - # exercise the `if stopped: continue` discard branch in the drain loop. + """ + Under max_concurrency > 1, a mid-stream miss should still cause the iterator + to complete cleanly even when unrelated tasks are still in flight and arrive + after the miss has been observed. + """ late_gate = asyncio.Event() miss_fired = asyncio.Event() @@ -299,7 +429,7 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None: assert isinstance(byte_range, RangeByteRequest) start = byte_range.start if start == 0: - # First to complete: small sleep so it arrives before the miss. + # First to complete: arrives before the miss. await asyncio.sleep(0.01) return _buf(b"ok") if start == 1000: @@ -314,12 +444,10 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None: return _buf(b"ok") opts: CoalesceOptions = { - "max_gap_bytes": -1, # force no merging (each range its own fetch) + "max_gap_bytes": -1, "max_coalesced_bytes": 1 << 20, "max_concurrency": 3, } - # Stride by 1000 to avoid merging. 7 items fits within max_concurrency=3 - # while producing pending work after the miss. ranges: list[ByteRequest | None] = [RangeByteRequest(i * 1000, i * 1000 + 1) for i in range(7)] groups: list[list[tuple[int, Buffer | None]]] = [] @@ -327,88 +455,27 @@ async def fetch(byte_range: ByteRequest | None) -> Buffer | None: try: async for group in agen: groups.append(list(group)) - # After the first yield, release the late gate so remaining - # in-flight tasks can complete and arrive post-stop. 
late_gate.set() finally: - # Guard against a bug preventing any yield: unblock waiters anyway. late_gate.set() - # We observed exactly the one pre-miss yield. assert len(groups) == 1 assert len(groups[0]) == 1 idx, buf = groups[0][0] assert idx == 0 assert buf is not None - # The iterator completed cleanly without raising. assert miss_fired.is_set() -@pytest.mark.parametrize( - "byte_range", - [ - OffsetByteRequest(5), - SuffixByteRequest(5), - None, - ], - ids=["offset", "suffix", "none"], -) -async def test_key_missing_on_uncoalescable_input_yields_nothing( - byte_range: ByteRequest | None, -) -> None: - # Uncoalescable inputs take a distinct code path from the merged-group - # path; a missing key on that path must still short-circuit cleanly. - fetch = FakeFetch(b"x" * 100, key_exists=False) - groups = await _collect(coalesced_get(fetch, [byte_range], options=DEFAULT_COALESCE_OPTIONS)) - assert groups == [] - - -async def test_consumer_break_cancels_pending_fetches() -> None: - # Kick off many slow ranges with small max_concurrency, break after the - # first yielded group, and verify the remaining tasks are cancelled rather - # than allowed to run to completion. - completed_calls = 0 - cancelled_calls = 0 - - async def fetch(byte_range: ByteRequest | None) -> Buffer | None: - nonlocal completed_calls, cancelled_calls - assert isinstance(byte_range, RangeByteRequest) - start = byte_range.start - try: - # First fetch returns fast so the async for body runs and can break. - # Later fetches sleep long enough that cancellation has room to land. 
- await asyncio.sleep(0.001 if start == 0 else 2.0) - except asyncio.CancelledError: - cancelled_calls += 1 - raise - completed_calls += 1 - return _buf(b"x") - - opts: CoalesceOptions = { - "max_gap_bytes": -1, # no merging - "max_coalesced_bytes": 1 << 20, - "max_concurrency": 3, - } - ranges: list[ByteRequest | None] = [RangeByteRequest(i * 1000, i * 1000 + 1) for i in range(6)] - - agen = coalesced_get(fetch, ranges, options=opts) - # Break after receiving the first yield. - async for _group in agen: - break - # Explicitly close the generator so its finally block runs (cancelling - # in-flight tasks) before we make assertions. - await agen.aclose() - - # At least one slow fetch was actually running under the semaphore and got - # cancelled (rather than running to completion). - assert cancelled_calls >= 1 - # And the first range's fetch completed normally (no spurious cancels there). - assert completed_calls >= 1 +# --------------------------------------------------------------------------- +# Error propagation. +# --------------------------------------------------------------------------- async def test_fetch_raises_propagates() -> None: + """An exception raised by fetch propagates on the yield that produced the failing group.""" fetch = FakeFetch( - b"x" * 100, + _INDEXED_BLOB, raise_on=lambda r: isinstance(r, RangeByteRequest) and r.start >= 100, ) opts: CoalesceOptions = { @@ -421,46 +488,18 @@ async def test_fetch_raises_propagates() -> None: await _collect(coalesced_get(fetch, ranges, options=opts)) -async def test_max_coalesced_bytes_prevents_over_cap_merge() -> None: - fetch = FakeFetch(b"x" * 10_000) - opts: CoalesceOptions = { - "max_gap_bytes": 1000, # allow gap - "max_coalesced_bytes": 50, # but cap the merged size - "max_concurrency": 10, - } - # Two ranges 20 bytes apart, each 20 bytes -- merged span would be 20+20+20=60 > 50. 
- ranges: list[ByteRequest | None] = [RangeByteRequest(0, 20), RangeByteRequest(40, 60)] - groups = await _collect(coalesced_get(fetch, ranges, options=opts)) - assert len(groups) == 2 - assert all(len(g) == 1 for g in groups) - - -async def test_single_range_larger_than_cap_is_passed_through() -> None: - fetch = FakeFetch(b"x" * 10_000) - opts: CoalesceOptions = { - "max_gap_bytes": 1000, - "max_coalesced_bytes": 50, - "max_concurrency": 10, - } - # A single 200-byte range, larger than the cap. Should still be fetched. - ranges: list[ByteRequest | None] = [RangeByteRequest(0, 200)] - groups = await _collect(coalesced_get(fetch, ranges, options=opts)) - assert len(groups) == 1 - assert len(groups[0]) == 1 - idx, buf = groups[0][0] - assert idx == 0 - assert buf is not None - assert buf.to_bytes() == b"x" * 200 +# --------------------------------------------------------------------------- +# Property-style coverage invariant. +# --------------------------------------------------------------------------- async def test_coverage_invariant_random_inputs() -> None: + """For any random RangeByteRequest input, every input index appears exactly once.""" import random rng = random.Random(42) - blob = bytes(i % 256 for i in range(10_000)) - fetch = FakeFetch(blob) + fetch = FakeFetch(_INDEXED_BLOB) - # Generate 50 random RangeByteRequests within the blob. ranges: list[ByteRequest | None] = [] for _ in range(50): start = rng.randint(0, 9000) @@ -468,13 +507,10 @@ async def test_coverage_invariant_random_inputs() -> None: ranges.append(RangeByteRequest(start, start + length)) groups = await _collect(coalesced_get(fetch, ranges, options=DEFAULT_COALESCE_OPTIONS)) - seen: list[int] = [] - for group in groups: - for idx, _buf in group: - seen.append(idx) + seen: list[int] = [idx for group in groups for idx, _buf in group] assert sorted(seen) == list(range(len(ranges))) - # And the bytes are correct. 
+ flat = _contents(groups) for i, r in enumerate(ranges): assert isinstance(r, RangeByteRequest) - assert flat[i] == blob[r.start : r.end] + assert flat[i] == _INDEXED_BLOB[r.start : r.end] From 8754f854976a2fcfa9034bcc511bfc96cde5735f Mon Sep 17 00:00:00 2001 From: Davis Vann Bennett Date: Fri, 24 Apr 2026 21:20:29 +0200 Subject: [PATCH 25/25] test: skip fsspec-backed get_ranges tests on old fsspec The min_deps CI job pins fsspec to 2023.10.0, which predates AsyncFileSystemWrapper. Wrapping a sync MemoryFileSystem fails there at fixture setup. Guard the affected tests with the same skipif pattern already used in test_fsspec.py. Co-Authored-By: Claude Opus 4.7 (1M context) --- tests/test_store/test_fsspec_get_ranges.py | 8 ++++++++ tests/test_store/test_protocols.py | 14 ++++++++++++-- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/tests/test_store/test_fsspec_get_ranges.py b/tests/test_store/test_fsspec_get_ranges.py index b30868a78b..a659540182 100644 --- a/tests/test_store/test_fsspec_get_ranges.py +++ b/tests/test_store/test_fsspec_get_ranges.py @@ -8,6 +8,7 @@ from __future__ import annotations import pytest +from packaging.version import parse as parse_version from zarr.abc.store import RangeByteRequest from zarr.core._coalesce import DEFAULT_COALESCE_OPTIONS, CoalesceOptions @@ -17,6 +18,13 @@ fsspec = pytest.importorskip("fsspec") +# AsyncFileSystemWrapper (needed to wrap a sync MemoryFileSystem) landed in fsspec 2024.12.0. +# Older versions are pinned by the min-deps CI job, so skip the whole file there. 
+pytestmark = pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) + @pytest.fixture def memory_store() -> FsspecStore: diff --git a/tests/test_store/test_protocols.py b/tests/test_store/test_protocols.py index c47cf928c3..9dae19ffba 100644 --- a/tests/test_store/test_protocols.py +++ b/tests/test_store/test_protocols.py @@ -7,9 +7,20 @@ from zarr.storage._protocols import SupportsGetRanges +fsspec = pytest.importorskip("fsspec") +from packaging.version import parse as parse_version # noqa: E402 + +# AsyncFileSystemWrapper (needed to wrap a sync MemoryFileSystem) landed in fsspec 2024.12.0. +# Older versions are pinned by the min-deps CI job. +_needs_async_wrapper = pytest.mark.skipif( + parse_version(fsspec.__version__) < parse_version("2024.12.0"), + reason="No AsyncFileSystemWrapper", +) + + +@_needs_async_wrapper def test_fsspec_store_satisfies_supports_get_ranges() -> None: - pytest.importorskip("fsspec") from fsspec.implementations.memory import MemoryFileSystem from zarr.storage import FsspecStore @@ -34,5 +45,4 @@ def test_type_assignment_at_module_level() -> None: If this runs without error the module imported cleanly; the static check is in mypy. """ - pytest.importorskip("fsspec") from zarr.storage import _fsspec # noqa: F401