diff --git a/changes/3715.misc.md b/changes/3715.misc.md
new file mode 100644
index 0000000000..caf06d1c54
--- /dev/null
+++ b/changes/3715.misc.md
@@ -0,0 +1,11 @@
+Added several performance optimizations to chunk encoding and decoding. Low-latency stores that do not benefit from
+`async` operations can now implement synchronous IO methods, which are used during chunk processing when available.
+Similarly, codecs can implement a synchronous API, which is likewise used when available.
+These changes remove unnecessary interactions with the event loop.
+
+The synchronous chunk processing path optionally uses a thread pool to parallelize codec work across chunks.
+The pool is skipped for single-chunk operations and for pipelines that contain only cheap codecs (e.g. endian
+swap, transpose, checksum).
+
+Use of the thread pool can be disabled in the global configuration. The minimum and maximum number of
+threads can be configured as well.
diff --git a/src/zarr/abc/codec.py b/src/zarr/abc/codec.py
index d41c457b4e..b7271a13ef 100644
--- a/src/zarr/abc/codec.py
+++ b/src/zarr/abc/codec.py
@@ -2,7 +2,7 @@
 
 from abc import abstractmethod
 from collections.abc import Mapping
-from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar
+from typing import TYPE_CHECKING, Generic, Protocol, TypeGuard, TypeVar, runtime_checkable
 
 from typing_extensions import ReadOnly, TypedDict
 
@@ -32,6 +32,7 @@
     "CodecInput",
     "CodecOutput",
     "CodecPipeline",
+    "SupportsSyncCodec",
 ]
 
 CodecInput = TypeVar("CodecInput", bound=NDBuffer | Buffer)
@@ -59,6 +60,19 @@ def _check_codecjson_v2(data: object) -> TypeGuard[CodecJSON_V2[str]]:
     """The widest type of JSON-like input that could specify a codec."""
 
 
+@runtime_checkable
+class SupportsSyncCodec(Protocol):
+    """Protocol for codecs that support synchronous encode/decode."""
+
+    def _decode_sync(
+        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
+    ) -> NDBuffer | Buffer: ...
+
+    def _encode_sync(
+        self, chunk_data: NDBuffer | Buffer, chunk_spec: ArraySpec
+    ) -> NDBuffer | Buffer | None: ...
+
+
 class BaseCodec(Metadata, Generic[CodecInput, CodecOutput]):
     """Generic base class for codecs.
 
@@ -459,6 +473,59 @@ async def write(
         """
         ...
 
+    # -------------------------------------------------------------------
+    # Fully synchronous read/write (opt-in)
+    #
+    # When a CodecPipeline subclass can run the entire read/write path
+    # (store IO + codec compute + buffer scatter) without touching the
+    # event loop, it overrides these methods and sets supports_sync_io
+    # to True. This lets Array selection methods bypass sync() entirely.
+    #
+    # The default implementations raise NotImplementedError.
+    # BatchedCodecPipeline overrides these when all codecs support sync.
+    # -------------------------------------------------------------------
+
+    @property
+    def supports_sync_io(self) -> bool:
+        """Whether this pipeline can run read/write entirely on the calling thread.
+
+        True when:
+        - All codecs implement ``SupportsSyncCodec``
+        - The pipeline's read_sync/write_sync methods are implemented
+
+        Checked by ``Array._can_use_sync_path()`` to decide whether to bypass
+        the ``sync()`` event-loop bridge.
+        """
+        return False
+
+    def read_sync(
+        self,
+        batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]],
+        out: NDBuffer,
+        drop_axes: tuple[int, ...] = (),
+    ) -> None:
+        """Synchronous read: fetch bytes from store, decode, scatter into out.
+
+        Runs entirely on the calling thread.
Only available when + ``supports_sync_io`` is True. Called by ``_get_selection_sync`` in + ``array.py`` when the sync bypass is active. + """ + raise NotImplementedError + + def write_sync( + self, + batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], + value: NDBuffer, + drop_axes: tuple[int, ...] = (), + ) -> None: + """Synchronous write: gather from value, encode, persist to store. + + Runs entirely on the calling thread. Only available when + ``supports_sync_io`` is True. Called by ``_set_selection_sync`` in + ``array.py`` when the sync bypass is active. + """ + raise NotImplementedError + async def _batching_helper( func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]], diff --git a/src/zarr/abc/store.py b/src/zarr/abc/store.py index 87df89a683..104c63a6be 100644 --- a/src/zarr/abc/store.py +++ b/src/zarr/abc/store.py @@ -16,7 +16,14 @@ from zarr.core.buffer import Buffer, BufferPrototype -__all__ = ["ByteGetter", "ByteSetter", "Store", "set_or_delete"] +__all__ = [ + "ByteGetter", + "ByteSetter", + "Store", + "SyncByteGetter", + "SyncByteSetter", + "set_or_delete", +] @dataclass @@ -700,6 +707,24 @@ async def delete(self) -> None: ... async def set_if_not_exists(self, default: Buffer) -> None: ... +@runtime_checkable +class SyncByteGetter(Protocol): + """Protocol for stores that support synchronous byte reads.""" + + def get_sync( + self, prototype: BufferPrototype, byte_range: ByteRequest | None = None + ) -> Buffer | None: ... + + +@runtime_checkable +class SyncByteSetter(SyncByteGetter, Protocol): + """Protocol for stores that support synchronous byte reads, writes, and deletes.""" + + def set_sync(self, value: Buffer) -> None: ... + + def delete_sync(self) -> None: ... + + async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None: """Set or delete a value in a byte setter diff --git a/src/zarr/api/asynchronous.py b/src/zarr/api/asynchronous.py index 6164cda957..6ad92025ac 100644 --- a/src/zarr/api/asynchronous.py +++ b/src/zarr/api/asynchronous.py @@ -386,7 +386,9 @@ async def open( is_v3_array = zarr_format == 3 and _metadata_dict.get("node_type") == "array" if is_v3_array or zarr_format == 2: return AsyncArray( - store_path=store_path, metadata=_metadata_dict, config=kwargs.get("config") + store_path=store_path, + metadata=_metadata_dict, + config=kwargs.get("config"), ) except (AssertionError, FileNotFoundError, NodeTypeValidationError): pass @@ -1279,7 +1281,10 @@ async def open_array( _warn_write_empty_chunks_kwarg() try: - return await AsyncArray.open(store_path, zarr_format=zarr_format) + return await AsyncArray.open( + store_path, + zarr_format=zarr_format, + ) except FileNotFoundError as err: if not store_path.read_only and mode in _CREATE_MODES: overwrite = _infer_overwrite(mode) diff --git a/src/zarr/codecs/blosc.py b/src/zarr/codecs/blosc.py index 5b91cfa005..fd1e3d449b 100644 --- a/src/zarr/codecs/blosc.py +++ b/src/zarr/codecs/blosc.py @@ -299,28 +299,29 @@ def _blosc_codec(self) -> Blosc: config_dict["typesize"] = self.typesize return Blosc.from_config(config_dict) + def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer: + return as_numpy_array_wrapper(self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype) + + def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None: + # Since blosc only support host memory, we convert the input and output of the encoding + # between numpy array and buffer + return chunk_spec.prototype.buffer.from_bytes( 
+ self._blosc_codec.encode(chunk_bytes.as_numpy_array()) + ) + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer: - return await asyncio.to_thread( - as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) async def _encode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer | None: - # Since blosc only support host memory, we convert the input and output of the encoding - # between numpy array and buffer - return await asyncio.to_thread( - lambda chunk: chunk_spec.prototype.buffer.from_bytes( - self._blosc_codec.encode(chunk.as_numpy_array()) - ), - chunk_bytes, - ) + return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError diff --git a/src/zarr/codecs/bytes.py b/src/zarr/codecs/bytes.py index 1fbdeef497..3d62eac2bb 100644 --- a/src/zarr/codecs/bytes.py +++ b/src/zarr/codecs/bytes.py @@ -65,7 +65,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: ) return self - async def _decode_single( + def _decode_sync( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, @@ -88,7 +88,7 @@ async def _decode_single( ) return chunk_array - async def _encode_single( + def _encode_sync( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, @@ -109,5 +109,19 @@ async def _encode_single( nd_array = nd_array.ravel().view(dtype="B") return chunk_spec.prototype.buffer.from_array_like(nd_array) + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + async def _encode_single( + self, + chunk_array: NDBuffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_array, chunk_spec) + def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length diff --git a/src/zarr/codecs/crc32c_.py b/src/zarr/codecs/crc32c_.py index 9536d0d558..3cd3aef873 100644 --- a/src/zarr/codecs/crc32c_.py +++ b/src/zarr/codecs/crc32c_.py @@ -31,11 +31,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: def to_dict(self) -> dict[str, JSON]: return {"name": "crc32c"} - async def _decode_single( - self, - chunk_bytes: Buffer, - chunk_spec: ArraySpec, - ) -> Buffer: + def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer: data = chunk_bytes.as_numpy_array() crc32_bytes = data[-4:] inner_bytes = data[:-4] @@ -51,11 +47,7 @@ async def _decode_single( ) return chunk_spec.prototype.buffer.from_array_like(inner_bytes) - async def _encode_single( - self, - chunk_bytes: Buffer, - chunk_spec: ArraySpec, - ) -> Buffer | None: + def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None: data = chunk_bytes.as_numpy_array() # Calculate the checksum and "cast" it to a numpy array checksum = np.array( @@ -64,5 +56,19 @@ async def _encode_single( # Append the checksum (as bytes) to the data return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("B"))) + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + return self._decode_sync(chunk_bytes, chunk_spec) + + async def _encode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer | None: + return self._encode_sync(chunk_bytes, chunk_spec) + def compute_encoded_size(self, 
input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length + 4 diff --git a/src/zarr/codecs/gzip.py b/src/zarr/codecs/gzip.py index 610ca9dadd..a883b0d640 100644 --- a/src/zarr/codecs/gzip.py +++ b/src/zarr/codecs/gzip.py @@ -2,6 +2,7 @@ import asyncio from dataclasses import dataclass +from functools import cached_property from typing import TYPE_CHECKING from numcodecs.gzip import GZip @@ -48,23 +49,37 @@ def from_dict(cls, data: dict[str, JSON]) -> Self: def to_dict(self) -> dict[str, JSON]: return {"name": "gzip", "configuration": {"level": self.level}} + # Cache the numcodecs GZip instance. GzipCodec is a frozen dataclass, + # so `level` never changes after construction, making this safe. + # This matches the pattern used by ZstdCodec._zstd_codec and + # BloscCodec._blosc_codec. Without caching, a new GZip(level) was + # created on every encode/decode call. + @cached_property + def _gzip_codec(self) -> GZip: + return GZip(self.level) + + def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer: + # Use the cached codec instance instead of creating GZip(self.level) + # each time. The async _decode_single delegates to this method via + # asyncio.to_thread, so both paths benefit from the cache. + return as_numpy_array_wrapper(self._gzip_codec.decode, chunk_bytes, chunk_spec.prototype) + + def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None: + return as_numpy_array_wrapper(self._gzip_codec.encode, chunk_bytes, chunk_spec.prototype) + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer: - return await asyncio.to_thread( - as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) async def _encode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer | None: - return await asyncio.to_thread( - as_numpy_array_wrapper, GZip(self.level).encode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec) def compute_encoded_size( self, diff --git a/src/zarr/codecs/sharding.py b/src/zarr/codecs/sharding.py index 85162c2f74..74422309de 100644 --- a/src/zarr/codecs/sharding.py +++ b/src/zarr/codecs/sharding.py @@ -16,6 +16,7 @@ ArrayBytesCodecPartialEncodeMixin, Codec, CodecPipeline, + SupportsSyncCodec, ) from zarr.abc.store import ( ByteGetter, @@ -100,6 +101,21 @@ async def get( start, stop = _normalize_byte_range_index(value, byte_range) return value[start:stop] + def get_sync( + self, prototype: BufferPrototype, byte_range: ByteRequest | None = None + ) -> Buffer | None: + # Sync equivalent of get() — just a dict lookup, no IO. + assert prototype == default_buffer_prototype(), ( + f"prototype is not supported within shards currently. 
diff: {prototype} != {default_buffer_prototype()}" + ) + value = self.shard_dict.get(self.chunk_coords) + if value is None: + return None + if byte_range is None: + return value + start, stop = _normalize_byte_range_index(value, byte_range) + return value[start:stop] + @dataclass(frozen=True) class _ShardingByteSetter(_ShardingByteGetter, ByteSetter): @@ -115,6 +131,12 @@ async def delete(self) -> None: async def set_if_not_exists(self, default: Buffer) -> None: self.shard_dict.setdefault(self.chunk_coords, default) + def set_sync(self, value: Buffer) -> None: + self.shard_dict[self.chunk_coords] = value + + def delete_sync(self) -> None: + del self.shard_dict[self.chunk_coords] + class _ShardIndex(NamedTuple): # dtype uint64, shape (chunks_per_shard_0, chunks_per_shard_1, ..., 2) @@ -242,6 +264,22 @@ async def from_bytes( obj.index = await codec._decode_shard_index(shard_index_bytes, chunks_per_shard) return obj + @classmethod + def from_bytes_sync( + cls, buf: Buffer, codec: ShardingCodec, chunks_per_shard: tuple[int, ...] + ) -> _ShardReader: + """Synchronous version of from_bytes — decodes the shard index inline.""" + shard_index_size = codec._shard_index_size(chunks_per_shard) + obj = cls() + obj.buf = buf + if codec.index_location == ShardingCodecIndexLocation.start: + shard_index_bytes = obj.buf[:shard_index_size] + else: + shard_index_bytes = obj.buf[-shard_index_size:] + + obj.index = codec._decode_shard_index_sync(shard_index_bytes, chunks_per_shard) + return obj + @classmethod def create_empty( cls, chunks_per_shard: tuple[int, ...], buffer_prototype: BufferPrototype | None = None @@ -447,6 +485,319 @@ async def _decode_single( return out + def _decode_sync( + self, + shard_bytes: Buffer, + shard_spec: ArraySpec, + ) -> NDBuffer: + """Synchronous full-shard decode. + + Receives the complete shard bytes, decodes the index inline, then + decodes each inner chunk through the inner codec pipeline's sync path. + The inner codec pipeline's read_sync uses _ShardingByteGetter.get_sync + (a dict lookup) for IO, so the entire operation is synchronous. + """ + shard_shape = shard_spec.shape + chunk_shape = self.chunk_shape + chunks_per_shard = self._get_chunks_per_shard(shard_spec) + chunk_spec = self._get_chunk_spec(shard_spec) + + indexer = BasicIndexer( + tuple(slice(0, s) for s in shard_shape), + shape=shard_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), + ) + + # setup output array + out = chunk_spec.prototype.nd_buffer.empty( + shape=shard_shape, + dtype=shard_spec.dtype.to_native_dtype(), + order=shard_spec.order, + ) + shard_dict = _ShardReader.from_bytes_sync(shard_bytes, self, chunks_per_shard) + + if shard_dict.index.is_all_empty(): + out.fill(shard_spec.fill_value) + return out + + # Decode each inner chunk synchronously through the inner pipeline. + # _ShardingByteGetter.get_sync is a dict lookup, so IO is trivial. + self.codec_pipeline.read_sync( + [ + ( + _ShardingByteGetter(shard_dict, chunk_coords), + chunk_spec, + chunk_selection, + out_selection, + is_complete_shard, + ) + for chunk_coords, chunk_selection, out_selection, is_complete_shard in indexer + ], + out, + ) + + return out + + def _encode_sync( + self, + shard_array: NDBuffer, + shard_spec: ArraySpec, + ) -> Buffer | None: + """Synchronous full-shard encode. + + Encodes each inner chunk through the inner codec pipeline's sync path, + then assembles the shard with index bytes. 
+ """ + shard_shape = shard_spec.shape + chunk_shape = self.chunk_shape + chunks_per_shard = self._get_chunks_per_shard(shard_spec) + chunk_spec = self._get_chunk_spec(shard_spec) + + indexer = list( + BasicIndexer( + tuple(slice(0, s) for s in shard_shape), + shape=shard_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), + ) + ) + + shard_builder = dict.fromkeys(morton_order_iter(chunks_per_shard)) + + # Encode each inner chunk synchronously + self.codec_pipeline.write_sync( + [ + ( + _ShardingByteSetter(shard_builder, chunk_coords), + chunk_spec, + chunk_selection, + out_selection, + is_complete_shard, + ) + for chunk_coords, chunk_selection, out_selection, is_complete_shard in indexer + ], + shard_array, + ) + + return self._encode_shard_dict_sync( + shard_builder, + chunks_per_shard=chunks_per_shard, + buffer_prototype=default_buffer_prototype(), + ) + + def _encode_shard_dict_sync( + self, + map: ShardMapping, + chunks_per_shard: tuple[int, ...], + buffer_prototype: BufferPrototype, + ) -> Buffer | None: + """Synchronous version of _encode_shard_dict.""" + index = _ShardIndex.create_empty(chunks_per_shard) + + buffers = [] + + template = buffer_prototype.buffer.create_zero_length() + chunk_start = 0 + for chunk_coords in morton_order_iter(chunks_per_shard): + value = map.get(chunk_coords) + if value is None: + continue + + if len(value) == 0: + continue + + chunk_length = len(value) + buffers.append(value) + index.set_chunk_slice(chunk_coords, slice(chunk_start, chunk_start + chunk_length)) + chunk_start += chunk_length + + if len(buffers) == 0: + return None + + index_bytes = self._encode_shard_index_sync(index) + if self.index_location == ShardingCodecIndexLocation.start: + empty_chunks_mask = index.offsets_and_lengths[..., 0] == MAX_UINT_64 + index.offsets_and_lengths[~empty_chunks_mask, 0] += len(index_bytes) + index_bytes = self._encode_shard_index_sync( + index + ) # encode again with corrected offsets + buffers.insert(0, index_bytes) + else: + buffers.append(index_bytes) + + return template.combine(buffers) + + def _load_shard_index_maybe_sync( + self, byte_getter: Any, chunks_per_shard: tuple[int, ...] + ) -> _ShardIndex | None: + """Synchronous version of _load_shard_index_maybe. + + Reads the shard index bytes via byte_getter.get_sync (a sync byte-range + read from the store), then decodes the index inline. + """ + shard_index_size = self._shard_index_size(chunks_per_shard) + if self.index_location == ShardingCodecIndexLocation.start: + index_bytes = byte_getter.get_sync( + prototype=numpy_buffer_prototype(), + byte_range=RangeByteRequest(0, shard_index_size), + ) + else: + index_bytes = byte_getter.get_sync( + prototype=numpy_buffer_prototype(), + byte_range=SuffixByteRequest(shard_index_size), + ) + if index_bytes is not None: + return self._decode_shard_index_sync(index_bytes, chunks_per_shard) + return None + + def _load_full_shard_maybe_sync( + self, + byte_getter: Any, + prototype: BufferPrototype, + chunks_per_shard: tuple[int, ...], + ) -> _ShardReader | None: + """Synchronous version of _load_full_shard_maybe.""" + shard_bytes = byte_getter.get_sync(prototype=prototype) + return ( + _ShardReader.from_bytes_sync(shard_bytes, self, chunks_per_shard) + if shard_bytes + else None + ) + + def _decode_partial_sync( + self, + byte_getter: Any, + selection: SelectorTuple, + shard_spec: ArraySpec, + ) -> NDBuffer | None: + """Synchronous partial decode: fetch shard index + requested chunks + via sync byte-range reads, then decode through the inner pipeline. 
+ + The byte_getter is a StorePath with get_sync(). After fetching the + index (one byte-range read), each requested chunk is another byte-range + read. Once all bytes are in memory, the inner pipeline decodes them + synchronously via read_sync with _ShardingByteGetter (dict lookups). + """ + shard_shape = shard_spec.shape + chunk_shape = self.chunk_shape + chunks_per_shard = self._get_chunks_per_shard(shard_spec) + chunk_spec = self._get_chunk_spec(shard_spec) + + indexer = get_indexer( + selection, + shape=shard_shape, + chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape), + ) + + # setup output array + out = shard_spec.prototype.nd_buffer.empty( + shape=indexer.shape, + dtype=shard_spec.dtype.to_native_dtype(), + order=shard_spec.order, + ) + + indexed_chunks = list(indexer) + all_chunk_coords = {chunk_coords for chunk_coords, *_ in indexed_chunks} + + # reading bytes of all requested chunks + shard_dict: ShardMapping = {} + if self._is_total_shard(all_chunk_coords, chunks_per_shard): + # read entire shard + shard_dict_maybe = self._load_full_shard_maybe_sync( + byte_getter=byte_getter, + prototype=chunk_spec.prototype, + chunks_per_shard=chunks_per_shard, + ) + if shard_dict_maybe is None: + return None + shard_dict = shard_dict_maybe + else: + # read some chunks within the shard + shard_index = self._load_shard_index_maybe_sync(byte_getter, chunks_per_shard) + if shard_index is None: + return None + shard_dict = {} + for chunk_coords in all_chunk_coords: + chunk_byte_slice = shard_index.get_chunk_slice(chunk_coords) + if chunk_byte_slice: + chunk_bytes = byte_getter.get_sync( + prototype=chunk_spec.prototype, + byte_range=RangeByteRequest(chunk_byte_slice[0], chunk_byte_slice[1]), + ) + if chunk_bytes: + shard_dict[chunk_coords] = chunk_bytes + + # decoding chunks and writing them into the output buffer + self.codec_pipeline.read_sync( + [ + ( + _ShardingByteGetter(shard_dict, chunk_coords), + chunk_spec, + chunk_selection, + out_selection, + is_complete_shard, + ) + for chunk_coords, chunk_selection, out_selection, is_complete_shard in indexer + ], + out, + ) + + if hasattr(indexer, "sel_shape"): + return out.reshape(indexer.sel_shape) + else: + return out + + def _encode_partial_sync( + self, + byte_setter: Any, + shard_array: NDBuffer, + selection: SelectorTuple, + shard_spec: ArraySpec, + ) -> None: + """Synchronous partial encode: read existing shard, merge new data, + encode and write back via sync store IO.""" + shard_shape = shard_spec.shape + chunk_shape = self.chunk_shape + chunks_per_shard = self._get_chunks_per_shard(shard_spec) + chunk_spec = self._get_chunk_spec(shard_spec) + + shard_reader = self._load_full_shard_maybe_sync( + byte_getter=byte_setter, + prototype=chunk_spec.prototype, + chunks_per_shard=chunks_per_shard, + ) + shard_reader = shard_reader or _ShardReader.create_empty(chunks_per_shard) + shard_dict = {k: shard_reader.get(k) for k in morton_order_iter(chunks_per_shard)} + + indexer = list( + get_indexer( + selection, shape=shard_shape, chunk_grid=RegularChunkGrid(chunk_shape=chunk_shape) + ) + ) + + self.codec_pipeline.write_sync( + [ + ( + _ShardingByteSetter(shard_dict, chunk_coords), + chunk_spec, + chunk_selection, + out_selection, + is_complete_shard, + ) + for chunk_coords, chunk_selection, out_selection, is_complete_shard in indexer + ], + shard_array, + ) + buf = self._encode_shard_dict_sync( + shard_dict, + chunks_per_shard=chunks_per_shard, + buffer_prototype=default_buffer_prototype(), + ) + + if buf is None: + byte_setter.delete_sync() + else: 
+ byte_setter.set_sync(buf) + async def _decode_partial_single( self, byte_getter: ByteGetter, @@ -661,6 +1012,74 @@ def _is_total_shard( chunk_coords in all_chunk_coords for chunk_coords in c_order_iter(chunks_per_shard) ) + def _decode_shard_index_sync( + self, index_bytes: Buffer, chunks_per_shard: tuple[int, ...] + ) -> _ShardIndex: + """Decode shard index synchronously by running index codecs inline. + + The index codecs are always simple codecs (BytesCodec + Crc32cCodec) + that support _decode_sync. We run them directly without going through + a pipeline: bytes-bytes codecs in reverse, then the array-bytes codec. + """ + index_chunk_spec = self._get_index_chunk_spec(chunks_per_shard) + + # Classify index codecs the same way a pipeline would + from zarr.core.codec_pipeline import codecs_from_list + + aa_codecs, ab_codec, bb_codecs = codecs_from_list(list(self.index_codecs)) + + # Resolve metadata through the chain + spec = index_chunk_spec + aa_with_spec = [] + for aa in aa_codecs: + aa_with_spec.append((aa, spec)) + spec = aa.resolve_metadata(spec) + ab_spec = spec + spec = ab_codec.resolve_metadata(spec) + bb_with_spec = [] + for bb in bb_codecs: + bb_with_spec.append((bb, spec)) + spec = bb.resolve_metadata(spec) + + # Decode: reverse bb, then ab, then reverse aa + bb_out: Any = index_bytes + for bb_codec, s in reversed(bb_with_spec): + bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, s) + ab_out: Any = cast("SupportsSyncCodec", ab_codec)._decode_sync(bb_out, ab_spec) + for aa_codec, s in reversed(aa_with_spec): + ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, s) + + assert ab_out is not None + return _ShardIndex(ab_out.as_numpy_array()) + + def _encode_shard_index_sync(self, index: _ShardIndex) -> Buffer: + """Encode shard index synchronously by running index codecs inline.""" + index_chunk_spec = self._get_index_chunk_spec(index.chunks_per_shard) + + from zarr.core.codec_pipeline import codecs_from_list + + aa_codecs, ab_codec, bb_codecs = codecs_from_list(list(self.index_codecs)) + + aa_out: Any = get_ndbuffer_class().from_numpy_array(index.offsets_and_lengths) + + # Encode: aa forward, then ab, then bb forward + spec = index_chunk_spec + for aa_codec in aa_codecs: + assert aa_out is not None + aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) + spec = aa_codec.resolve_metadata(spec) + assert aa_out is not None + bb_out: Any = cast("SupportsSyncCodec", ab_codec)._encode_sync(aa_out, spec) + spec = ab_codec.resolve_metadata(spec) + for bb_codec in bb_codecs: + assert bb_out is not None + bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, spec) + spec = bb_codec.resolve_metadata(spec) + + assert bb_out is not None + assert isinstance(bb_out, Buffer) + return bb_out + async def _decode_shard_index( self, index_bytes: Buffer, chunks_per_shard: tuple[int, ...] 
) -> _ShardIndex: diff --git a/src/zarr/codecs/transpose.py b/src/zarr/codecs/transpose.py index a8570b6e8f..abbebfd090 100644 --- a/src/zarr/codecs/transpose.py +++ b/src/zarr/codecs/transpose.py @@ -95,20 +95,26 @@ def resolve_metadata(self, chunk_spec: ArraySpec) -> ArraySpec: prototype=chunk_spec.prototype, ) + def _decode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer: + inverse_order = np.argsort(self.order) + return chunk_array.transpose(inverse_order) + + def _encode_sync(self, chunk_array: NDBuffer, _chunk_spec: ArraySpec) -> NDBuffer | None: + return chunk_array.transpose(self.order) + async def _decode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, ) -> NDBuffer: - inverse_order = np.argsort(self.order) - return chunk_array.transpose(inverse_order) + return self._decode_sync(chunk_array, chunk_spec) async def _encode_single( self, chunk_array: NDBuffer, _chunk_spec: ArraySpec, ) -> NDBuffer | None: - return chunk_array.transpose(self.order) + return self._encode_sync(chunk_array, _chunk_spec) def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: return input_byte_length diff --git a/src/zarr/codecs/vlen_utf8.py b/src/zarr/codecs/vlen_utf8.py index fb1fb76126..16de25001c 100644 --- a/src/zarr/codecs/vlen_utf8.py +++ b/src/zarr/codecs/vlen_utf8.py @@ -40,12 +40,7 @@ def to_dict(self) -> dict[str, JSON]: def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: return self - # TODO: expand the tests for this function - async def _decode_single( - self, - chunk_bytes: Buffer, - chunk_spec: ArraySpec, - ) -> NDBuffer: + def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> NDBuffer: assert isinstance(chunk_bytes, Buffer) raw_bytes = chunk_bytes.as_array_like() @@ -55,15 +50,25 @@ async def _decode_single( as_string_dtype = decoded.astype(chunk_spec.dtype.to_native_dtype(), copy=False) return chunk_spec.prototype.nd_buffer.from_numpy_array(as_string_dtype) + def _encode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> Buffer | None: + assert isinstance(chunk_array, NDBuffer) + return chunk_spec.prototype.buffer.from_bytes( + _vlen_utf8_codec.encode(chunk_array.as_numpy_array()) + ) + + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_bytes, chunk_spec) + async def _encode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, ) -> Buffer | None: - assert isinstance(chunk_array, NDBuffer) - return chunk_spec.prototype.buffer.from_bytes( - _vlen_utf8_codec.encode(chunk_array.as_numpy_array()) - ) + return self._encode_sync(chunk_array, chunk_spec) def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: # what is input_byte_length for an object dtype? 
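The codec conversions above (bytes, crc32c, gzip, transpose, vlen — and blosc/zstd below) all follow one mechanical pattern: implement `_decode_sync`/`_encode_sync`, then have the async methods delegate to them — inline for cheap codecs, via `asyncio.to_thread` for compression codecs. A minimal sketch of what a third-party codec would add to opt in; `DeltaCodec` is hypothetical and not part of this diff, and `from_dict` registration and dataclass plumbing are omitted for brevity:

```python
from __future__ import annotations

import numpy as np

from zarr.abc.codec import ArrayArrayCodec
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import NDBuffer
from zarr.core.common import JSON


class DeltaCodec(ArrayArrayCodec):
    """Hypothetical codec storing consecutive differences (illustrative only)."""

    is_fixed_size = True

    def _encode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer | None:
        flat = chunk_array.as_numpy_array().ravel()
        # Ignores dtype-promotion edge cases; fine for a sketch.
        deltas = np.diff(flat, prepend=flat.dtype.type(0)).reshape(chunk_array.shape)
        return chunk_spec.prototype.nd_buffer.from_numpy_array(deltas)

    def _decode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        flat = chunk_array.as_numpy_array().ravel()
        restored = np.cumsum(flat).reshape(chunk_array.shape)
        return chunk_spec.prototype.nd_buffer.from_numpy_array(restored)

    # The async API delegates to the sync methods. This codec is cheap, so
    # it runs inline; a compression codec would wrap the call in
    # asyncio.to_thread instead (see GzipCodec/ZstdCodec in this diff).
    async def _decode_single(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer:
        return self._decode_sync(chunk_array, chunk_spec)

    async def _encode_single(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> NDBuffer | None:
        return self._encode_sync(chunk_array, chunk_spec)

    def to_dict(self) -> dict[str, JSON]:
        return {"name": "delta"}

    def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int:
        return input_byte_length
```

Because `SupportsSyncCodec` is `runtime_checkable`, the pipeline's `_all_sync` check (`isinstance(c, SupportsSyncCodec)`) picks such a codec up structurally, with no registration step.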
@@ -86,11 +91,7 @@ def to_dict(self) -> dict[str, JSON]: def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: return self - async def _decode_single( - self, - chunk_bytes: Buffer, - chunk_spec: ArraySpec, - ) -> NDBuffer: + def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> NDBuffer: assert isinstance(chunk_bytes, Buffer) raw_bytes = chunk_bytes.as_array_like() @@ -99,15 +100,25 @@ async def _decode_single( decoded = _reshape_view(decoded, chunk_spec.shape) return chunk_spec.prototype.nd_buffer.from_numpy_array(decoded) + def _encode_sync(self, chunk_array: NDBuffer, chunk_spec: ArraySpec) -> Buffer | None: + assert isinstance(chunk_array, NDBuffer) + return chunk_spec.prototype.buffer.from_bytes( + _vlen_bytes_codec.encode(chunk_array.as_numpy_array()) + ) + + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> NDBuffer: + return self._decode_sync(chunk_bytes, chunk_spec) + async def _encode_single( self, chunk_array: NDBuffer, chunk_spec: ArraySpec, ) -> Buffer | None: - assert isinstance(chunk_array, NDBuffer) - return chunk_spec.prototype.buffer.from_bytes( - _vlen_bytes_codec.encode(chunk_array.as_numpy_array()) - ) + return self._encode_sync(chunk_array, chunk_spec) def compute_encoded_size(self, input_byte_length: int, _chunk_spec: ArraySpec) -> int: # what is input_byte_length for an object dtype? diff --git a/src/zarr/codecs/zstd.py b/src/zarr/codecs/zstd.py index 27cc9a7777..fab4fd573e 100644 --- a/src/zarr/codecs/zstd.py +++ b/src/zarr/codecs/zstd.py @@ -71,23 +71,25 @@ def _zstd_codec(self) -> Zstd: config_dict = {"level": self.level, "checksum": self.checksum} return Zstd.from_config(config_dict) + def _decode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer: + return as_numpy_array_wrapper(self._zstd_codec.decode, chunk_bytes, chunk_spec.prototype) + + def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None: + return as_numpy_array_wrapper(self._zstd_codec.encode, chunk_bytes, chunk_spec.prototype) + async def _decode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer: - return await asyncio.to_thread( - as_numpy_array_wrapper, self._zstd_codec.decode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec) async def _encode_single( self, chunk_bytes: Buffer, chunk_spec: ArraySpec, ) -> Buffer | None: - return await asyncio.to_thread( - as_numpy_array_wrapper, self._zstd_codec.encode, chunk_bytes, chunk_spec.prototype - ) + return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec) def compute_encoded_size(self, _input_byte_length: int, _chunk_spec: ArraySpec) -> int: raise NotImplementedError diff --git a/src/zarr/core/array.py b/src/zarr/core/array.py index 564d0e915a..8c8b645ece 100644 --- a/src/zarr/core/array.py +++ b/src/zarr/core/array.py @@ -25,6 +25,7 @@ import zarr from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec, Codec from zarr.abc.numcodec import Numcodec, _is_numcodec +from zarr.abc.store import SyncByteGetter from zarr.codecs._v2 import V2Codec from zarr.codecs.bytes import BytesCodec from zarr.codecs.vlen_utf8 import VLenBytesCodec, VLenUTF8Codec @@ -1973,6 +1974,29 @@ def config(self) -> ArrayConfig: """ return self.async_array.config + def _can_use_sync_path(self) -> bool: + """Check if we can bypass the event loop entirely for read/write. + + Two conditions must hold: + + 1. 
The codec pipeline supports fully synchronous IO (all codecs + implement ``SupportsSyncCodec``). This is True for + BatchedCodecPipeline when all codecs support sync. + + 2. The store supports synchronous operations (implements + ``SyncByteGetter``). MemoryStore and LocalStore provide this; + remote stores do not. + + When both hold, the selection methods below call + _get_selection_sync / _set_selection_sync directly, running the + entire read/write path on the calling thread with zero async + overhead. Otherwise, the async path with concurrent IO overlap + is used automatically. + """ + pipeline = self.async_array.codec_pipeline + store = self.async_array.store_path.store + return getattr(pipeline, "supports_sync_io", False) and isinstance(store, SyncByteGetter) + @classmethod @deprecated("Use zarr.create_array instead.", category=ZarrDeprecationWarning) def create( @@ -3049,9 +3073,28 @@ def get_basic_selection( if prototype is None: prototype = default_buffer_prototype() + indexer = BasicIndexer(selection, self.shape, self.metadata.chunk_grid) + # Sync bypass: when the codec pipeline and store both support + # synchronous operation, skip the sync() → event loop bridge and + # run the entire read path on the calling thread. This pattern is + # repeated in all 10 get_*/set_* methods below. + if self._can_use_sync_path(): + return _get_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + out=out, + fields=fields, + prototype=prototype, + ) + # Fallback: submit the async coroutine to the background event loop + # thread via sync(). Used for remote stores or when the sync bypass + # is not active. return sync( self.async_array._get_selection( - BasicIndexer(selection, self.shape, self.metadata.chunk_grid), + indexer, out=out, fields=fields, prototype=prototype, @@ -3159,6 +3202,18 @@ def set_basic_selection( if prototype is None: prototype = default_buffer_prototype() indexer = BasicIndexer(selection, self.shape, self.metadata.chunk_grid) + if self._can_use_sync_path(): + _set_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + value, + fields=fields, + prototype=prototype, + ) + return sync(self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype)) def get_orthogonal_selection( @@ -3287,6 +3342,17 @@ def get_orthogonal_selection( if prototype is None: prototype = default_buffer_prototype() indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid) + if self._can_use_sync_path(): + return _get_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + out=out, + fields=fields, + prototype=prototype, + ) return sync( self.async_array._get_selection( indexer=indexer, out=out, fields=fields, prototype=prototype @@ -3406,9 +3472,19 @@ def set_orthogonal_selection( if prototype is None: prototype = default_buffer_prototype() indexer = OrthogonalIndexer(selection, self.shape, self.metadata.chunk_grid) - return sync( - self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype) - ) + if self._can_use_sync_path(): + _set_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + value, + fields=fields, + prototype=prototype, + ) + return + 
sync(self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype)) def get_mask_selection( self, @@ -3494,6 +3570,17 @@ def get_mask_selection( if prototype is None: prototype = default_buffer_prototype() indexer = MaskIndexer(mask, self.shape, self.metadata.chunk_grid) + if self._can_use_sync_path(): + return _get_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + out=out, + fields=fields, + prototype=prototype, + ) return sync( self.async_array._get_selection( indexer=indexer, out=out, fields=fields, prototype=prototype @@ -3584,6 +3671,18 @@ def set_mask_selection( if prototype is None: prototype = default_buffer_prototype() indexer = MaskIndexer(mask, self.shape, self.metadata.chunk_grid) + if self._can_use_sync_path(): + _set_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + value, + fields=fields, + prototype=prototype, + ) + return sync(self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype)) def get_coordinate_selection( @@ -3672,11 +3771,23 @@ def get_coordinate_selection( if prototype is None: prototype = default_buffer_prototype() indexer = CoordinateIndexer(selection, self.shape, self.metadata.chunk_grid) - out_array = sync( - self.async_array._get_selection( - indexer=indexer, out=out, fields=fields, prototype=prototype + if self._can_use_sync_path(): + out_array = _get_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + out=out, + fields=fields, + prototype=prototype, + ) + else: + out_array = sync( + self.async_array._get_selection( + indexer=indexer, out=out, fields=fields, prototype=prototype + ) ) - ) if hasattr(out_array, "shape"): # restore shape @@ -3786,6 +3897,18 @@ def set_coordinate_selection( f"elements with an array of {value.shape[0]} elements." 
) + if self._can_use_sync_path(): + _set_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + value, + fields=fields, + prototype=prototype, + ) + return sync(self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype)) def get_block_selection( @@ -3887,6 +4010,17 @@ def get_block_selection( if prototype is None: prototype = default_buffer_prototype() indexer = BlockIndexer(selection, self.shape, self.metadata.chunk_grid) + if self._can_use_sync_path(): + return _get_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + out=out, + fields=fields, + prototype=prototype, + ) return sync( self.async_array._get_selection( indexer=indexer, out=out, fields=fields, prototype=prototype @@ -3988,6 +4122,18 @@ def set_block_selection( if prototype is None: prototype = default_buffer_prototype() indexer = BlockIndexer(selection, self.shape, self.metadata.chunk_grid) + if self._can_use_sync_path(): + _set_selection_sync( + self.async_array.store_path, + self.async_array.metadata, + self.async_array.codec_pipeline, + self.async_array.config, + indexer, + value, + fields=fields, + prototype=prototype, + ) + return sync(self.async_array._set_selection(indexer, value, fields=fields, prototype=prototype)) @property @@ -5619,6 +5765,174 @@ async def _get_selection( return out_buffer.as_ndarray_like() +def _get_selection_sync( + store_path: StorePath, + metadata: ArrayMetadata, + codec_pipeline: CodecPipeline, + config: ArrayConfig, + indexer: Indexer, + *, + prototype: BufferPrototype, + out: NDBuffer | None = None, + fields: Fields | None = None, +) -> NDArrayLikeOrScalar: + """Synchronous version of _get_selection — bypasses the event loop entirely. + + This function mirrors ``_get_selection`` (the async version defined above) + exactly, with one critical difference: it calls ``codec_pipeline.read_sync()`` + instead of ``await codec_pipeline.read()``. This means the entire operation + — store IO, codec decode, buffer scatter — runs on the calling thread with + no event loop involvement. + + Called by ``Array.get_basic_selection``, ``get_orthogonal_selection``, etc. + when ``Array._can_use_sync_path()`` returns True. + + The setup logic (dtype resolution, output buffer creation, field checks) is + duplicated from the async version rather than extracted into a shared helper. + This keeps the hot path simple and avoids adding indirection. The two + versions should be kept in sync manually. + """ + # Get dtype from metadata — same logic as async _get_selection + if metadata.zarr_format == 2: + zdtype = metadata.dtype + else: + zdtype = metadata.data_type + dtype = zdtype.to_native_dtype() + + # Determine memory order + if metadata.zarr_format == 2: + order = metadata.order + else: + order = config.order + + # check fields are sensible + out_dtype = check_fields(fields, dtype) + + # setup output buffer + if out is not None: + if isinstance(out, NDBuffer): + out_buffer = out + else: + raise TypeError(f"out argument needs to be an NDBuffer. Got {type(out)!r}") + if out_buffer.shape != indexer.shape: + raise ValueError( + f"shape of out argument doesn't match. 
Expected {indexer.shape}, got {out.shape}" + ) + else: + out_buffer = prototype.nd_buffer.empty( + shape=indexer.shape, + dtype=out_dtype, + order=order, + ) + if product(indexer.shape) > 0: + _config = config + if metadata.zarr_format == 2: + _config = replace(_config, order=order) + + # This is the key difference from the async version: read_sync() + # runs the entire pipeline (store fetch → codec decode → scatter) + # on this thread. Each entry in the list is a (StorePath, ArraySpec, + # chunk_selection, out_selection, is_complete_chunk) tuple. + # StorePath acts as the ByteGetter — its get_sync() method is called + # by the pipeline to fetch raw chunk bytes from the store. + codec_pipeline.read_sync( + [ + ( + store_path / metadata.encode_chunk_key(chunk_coords), + metadata.get_chunk_spec(chunk_coords, _config, prototype=prototype), + chunk_selection, + out_selection, + is_complete_chunk, + ) + for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer + ], + out_buffer, + drop_axes=indexer.drop_axes, + ) + if isinstance(indexer, BasicIndexer) and indexer.shape == (): + return out_buffer.as_scalar() + return out_buffer.as_ndarray_like() + + +def _set_selection_sync( + store_path: StorePath, + metadata: ArrayMetadata, + codec_pipeline: CodecPipeline, + config: ArrayConfig, + indexer: Indexer, + value: npt.ArrayLike, + *, + prototype: BufferPrototype, + fields: Fields | None = None, +) -> None: + """Synchronous version of _set_selection — bypasses the event loop entirely. + + Mirrors ``_set_selection`` (the async version) with the same setup logic + (dtype coercion, value shape validation, buffer wrapping) but calls + ``codec_pipeline.write_sync()`` instead of ``await codec_pipeline.write()``. + + Called by ``Array.set_basic_selection``, ``set_orthogonal_selection``, etc. + when ``Array._can_use_sync_path()`` returns True. + """ + # Get dtype from metadata + if metadata.zarr_format == 2: + zdtype = metadata.dtype + else: + zdtype = metadata.data_type + dtype = zdtype.to_native_dtype() + + # check fields are sensible + check_fields(fields, dtype) + fields = check_no_multi_fields(fields) + + # check value shape + if np.isscalar(value): + array_like = prototype.buffer.create_zero_length().as_array_like() + if isinstance(array_like, np._typing._SupportsArrayFunc): + array_like_ = cast("np._typing._SupportsArrayFunc", array_like) + value = np.asanyarray(value, dtype=dtype, like=array_like_) + else: + if not hasattr(value, "shape"): + value = np.asarray(value, dtype) + if not hasattr(value, "dtype") or value.dtype.name != dtype.name: + if hasattr(value, "astype"): + value = value.astype(dtype=dtype, order="A") + else: + value = np.array(value, dtype=dtype, order="A") + value = cast("NDArrayLike", value) + + value_buffer = prototype.nd_buffer.from_ndarray_like(value) + + # Determine memory order + if metadata.zarr_format == 2: + order = metadata.order + else: + order = config.order + + _config = config + if metadata.zarr_format == 2: + _config = replace(_config, order=order) + + # Key difference from async version: write_sync() runs the entire + # pipeline (read existing → decode → merge → encode → store write) + # on this thread. StorePath acts as ByteSetter — its set_sync() and + # delete_sync() methods persist/remove chunk bytes directly. 
+ codec_pipeline.write_sync( + [ + ( + store_path / metadata.encode_chunk_key(chunk_coords), + metadata.get_chunk_spec(chunk_coords, _config, prototype), + chunk_selection, + out_selection, + is_complete_chunk, + ) + for chunk_coords, chunk_selection, out_selection, is_complete_chunk in indexer + ], + value_buffer, + drop_axes=indexer.drop_axes, + ) + + async def _getitem( store_path: StorePath, metadata: ArrayMetadata, diff --git a/src/zarr/core/codec_pipeline.py b/src/zarr/core/codec_pipeline.py index fd557ac43e..26d56fa97a 100644 --- a/src/zarr/core/codec_pipeline.py +++ b/src/zarr/core/codec_pipeline.py @@ -1,8 +1,10 @@ from __future__ import annotations -from dataclasses import dataclass -from itertools import islice, pairwise -from typing import TYPE_CHECKING, Any, TypeVar +import os +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass, field +from itertools import pairwise +from typing import TYPE_CHECKING, Any, TypeVar, cast from warnings import warn from zarr.abc.codec import ( @@ -13,8 +15,9 @@ BytesBytesCodec, Codec, CodecPipeline, + SupportsSyncCodec, ) -from zarr.core.common import concurrent_map +from zarr.core.common import concurrent_map, product from zarr.core.config import config from zarr.core.indexing import SelectorTuple, is_scalar from zarr.errors import ZarrUserWarning @@ -43,14 +46,6 @@ def _unzip2(iterable: Iterable[tuple[T, U]]) -> tuple[list[T], list[U]]: return (out0, out1) -def batched(iterable: Iterable[T], n: int) -> Iterable[tuple[T, ...]]: - if n < 1: - raise ValueError("n must be at least one") - it = iter(iterable) - while batch := tuple(islice(it, n)): - yield batch - - def resolve_batched(codec: Codec, chunk_specs: Iterable[ArraySpec]) -> Iterable[ArraySpec]: return [codec.resolve_metadata(chunk_spec) for chunk_spec in chunk_specs] @@ -68,32 +63,118 @@ def fill_value_or_default(chunk_spec: ArraySpec) -> Any: return fill_value +# --------------------------------------------------------------------------- +# Thread pool for parallel codec compute +# --------------------------------------------------------------------------- + +# Minimum chunk size (in bytes) to consider using the thread pool. +# Below this, per-chunk codec work is too small to offset dispatch overhead. +_MIN_CHUNK_NBYTES_FOR_POOL = 100_000 # 100 KB + + +def _get_codec_worker_config() -> tuple[bool, int, int]: + """Read the ``threading.codec_workers`` config. + + Returns (enabled, min_workers, max_workers). + """ + codec_workers = config.get("threading.codec_workers") + enabled: bool = codec_workers.get("enabled", True) + min_workers: int = codec_workers.get("min", 0) + max_workers: int = max(codec_workers.get("max") or os.cpu_count() or 4, min_workers) + return enabled, min_workers, max_workers + + +def _choose_workers(n_chunks: int, chunk_nbytes: int, codecs: Iterable[Codec]) -> int: + """Decide how many thread pool workers to use (0 = don't use pool). + + Respects ``threading.codec_workers`` config: + - ``enabled``: if False, always returns 0. + - ``min``: floor for the number of workers. + - ``max``: ceiling for the number of workers (default: ``os.cpu_count()``). + """ + enabled, min_workers, max_workers = _get_codec_worker_config() + if not enabled: + return 0 + + if n_chunks < 2: + return min_workers + + # Only use the pool when at least one codec does real work + # (BytesBytesCodec = compression/checksum, which releases the GIL in C) + # and the chunks are large enough to offset dispatch overhead. 
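+    # Worked example (illustrative, default config, 8 CPUs): 16 chunks of
+    # ~1 MB zstd-compressed data -> max(0, min(16, 8)) == 8 workers. A
+    # single chunk, 4 KB chunks, or a transpose-only pipeline all return 0
+    # and run inline on the calling thread.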
+ if not any(isinstance(c, BytesBytesCodec) for c in codecs) and min_workers == 0: + return 0 + if chunk_nbytes < _MIN_CHUNK_NBYTES_FOR_POOL and min_workers == 0: + return 0 + + return max(min_workers, min(n_chunks, max_workers)) + + +def _get_pool() -> ThreadPoolExecutor: + """Get the module-level thread pool, creating it lazily.""" + global _pool + if _pool is None: + _, _, max_workers = _get_codec_worker_config() + _pool = ThreadPoolExecutor(max_workers=max_workers) + return _pool + + +_pool: ThreadPoolExecutor | None = None + +# Sentinel to distinguish "delete this key" from None. +_DELETED = object() + + @dataclass(frozen=True) class BatchedCodecPipeline(CodecPipeline): - """Default codec pipeline. + """Codec pipeline that automatically selects the optimal execution strategy. + + When all codecs support synchronous operations and the store supports + sync IO, this pipeline runs the entire read/write path on the calling + thread with zero async overhead, using a thread pool for parallel codec + compute on multi-chunk operations. - This batched codec pipeline divides the chunk batches into batches of a configurable - batch size ("mini-batch"). Fetching, decoding, encoding and storing are performed in - lock step for each mini-batch. Multiple mini-batches are processing concurrently. + When the store requires async IO (e.g. cloud stores), this pipeline uses + the async path with concurrent IO overlap via ``concurrent_map``. + + This automatic dispatch eliminates the need for users to choose between + pipeline implementations — the right strategy is selected based on codec + and store capabilities. """ array_array_codecs: tuple[ArrayArrayCodec, ...] array_bytes_codec: ArrayBytesCodec bytes_bytes_codecs: tuple[BytesBytesCodec, ...] - batch_size: int + batch_size: int | None = None + + _all_sync: bool = field(default=False, init=False, repr=False, compare=False) + + def __post_init__(self) -> None: + if self.batch_size is not None: + warn( + "The 'batch_size' parameter is deprecated and has no effect. " + "Batch size is now determined automatically.", + FutureWarning, + stacklevel=2, + ) + # Compute once; frozen dataclass requires object.__setattr__. 
+ object.__setattr__( + self, + "_all_sync", + all(isinstance(c, SupportsSyncCodec) for c in self), + ) def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self: return type(self).from_codecs(c.evolve_from_array_spec(array_spec=array_spec) for c in self) @classmethod - def from_codecs(cls, codecs: Iterable[Codec], *, batch_size: int | None = None) -> Self: - array_array_codecs, array_bytes_codec, bytes_bytes_codecs = codecs_from_list(codecs) + def from_codecs(cls, codecs: Iterable[Codec]) -> Self: + array_array_codecs, array_bytes_codec, bytes_bytes_codecs = codecs_from_list(list(codecs)) return cls( array_array_codecs=array_array_codecs, array_bytes_codec=array_bytes_codec, bytes_bytes_codecs=bytes_bytes_codecs, - batch_size=batch_size or config.get("codec_pipeline.batch_size"), ) @property @@ -149,6 +230,103 @@ def compute_encoded_size(self, byte_length: int, array_spec: ArraySpec) -> int: array_spec = codec.resolve_metadata(array_spec) return byte_length + # ------------------------------------------------------------------- + # Per-chunk sync codec chain + # ------------------------------------------------------------------- + + def _resolve_metadata_chain( + self, chunk_spec: ArraySpec + ) -> tuple[ + list[tuple[ArrayArrayCodec, ArraySpec]], + tuple[ArrayBytesCodec, ArraySpec], + list[tuple[BytesBytesCodec, ArraySpec]], + ]: + """Resolve metadata through the codec chain for a single chunk_spec.""" + aa_codecs_with_spec: list[tuple[ArrayArrayCodec, ArraySpec]] = [] + spec = chunk_spec + for aa_codec in self.array_array_codecs: + aa_codecs_with_spec.append((aa_codec, spec)) + spec = aa_codec.resolve_metadata(spec) + + ab_codec_with_spec = (self.array_bytes_codec, spec) + spec = self.array_bytes_codec.resolve_metadata(spec) + + bb_codecs_with_spec: list[tuple[BytesBytesCodec, ArraySpec]] = [] + for bb_codec in self.bytes_bytes_codecs: + bb_codecs_with_spec.append((bb_codec, spec)) + spec = bb_codec.resolve_metadata(spec) + + return (aa_codecs_with_spec, ab_codec_with_spec, bb_codecs_with_spec) + + def _decode_one( + self, + chunk_bytes: Buffer | None, + chunk_spec: ArraySpec, + aa_chain: list[tuple[ArrayArrayCodec, ArraySpec]], + ab_pair: tuple[ArrayBytesCodec, ArraySpec], + bb_chain: list[tuple[BytesBytesCodec, ArraySpec]], + ) -> NDBuffer | None: + """Decode a single chunk through the full codec chain, synchronously. + + Only called when ``_all_sync`` is True, so every codec implements + ``SupportsSyncCodec``. + """ + if chunk_bytes is None: + return None + + # Use Any to avoid verbose casts on every codec call — we know + # all codecs satisfy SupportsSyncCodec because _all_sync is True. + bb_out: Any = chunk_bytes + for bb_codec, spec in reversed(bb_chain): + bb_out = cast("SupportsSyncCodec", bb_codec)._decode_sync(bb_out, spec) + + ab_codec, ab_spec = ab_pair + ab_out: Any = cast("SupportsSyncCodec", ab_codec)._decode_sync(bb_out, ab_spec) + + for aa_codec, spec in reversed(aa_chain): + ab_out = cast("SupportsSyncCodec", aa_codec)._decode_sync(ab_out, spec) + + return ab_out # type: ignore[no-any-return] + + def _encode_one( + self, + chunk_array: NDBuffer | None, + chunk_spec: ArraySpec, + ) -> Buffer | None: + """Encode a single chunk through the full codec chain, synchronously. + + Only called when ``_all_sync`` is True, so every codec implements + ``SupportsSyncCodec``. 
+ """ + if chunk_array is None: + return None + + spec = chunk_spec + aa_out: Any = chunk_array + + for aa_codec in self.array_array_codecs: + if aa_out is None: + return None + aa_out = cast("SupportsSyncCodec", aa_codec)._encode_sync(aa_out, spec) + spec = aa_codec.resolve_metadata(spec) + + if aa_out is None: + return None + bb_out: Any = cast("SupportsSyncCodec", self.array_bytes_codec)._encode_sync(aa_out, spec) + spec = self.array_bytes_codec.resolve_metadata(spec) + + for bb_codec in self.bytes_bytes_codecs: + if bb_out is None: + return None + bb_out = cast("SupportsSyncCodec", bb_codec)._encode_sync(bb_out, spec) + spec = bb_codec.resolve_metadata(spec) + + return bb_out # type: ignore[no-any-return] + + # ------------------------------------------------------------------- + # Batched async decode/encode (layer-by-layer across all chunks) + # ------------------------------------------------------------------- + def _codecs_with_resolved_metadata_batched( self, chunk_specs: Iterable[ArraySpec] ) -> tuple[ @@ -246,12 +424,57 @@ async def encode_partial_batch( assert isinstance(self.array_bytes_codec, ArrayBytesCodecPartialEncodeMixin) await self.array_bytes_codec.encode_partial(batch_info) + # ------------------------------------------------------------------- + # Top-level decode / encode + # ------------------------------------------------------------------- + + async def decode( + self, + chunk_bytes_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], + ) -> Iterable[NDBuffer | None]: + items = list(chunk_bytes_and_specs) + if not items: + return [] + + if self._all_sync: + # All codecs support sync -- run the full chain inline (no threading). + _, first_spec = items[0] + aa_chain, ab_pair, bb_chain = self._resolve_metadata_chain(first_spec) + return [ + self._decode_one(chunk_bytes, chunk_spec, aa_chain, ab_pair, bb_chain) + for chunk_bytes, chunk_spec in items + ] + + # Async fallback: layer-by-layer across all chunks. + return list(await self.decode_batch(items)) + + async def encode( + self, + chunk_arrays_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]], + ) -> Iterable[Buffer | None]: + items = list(chunk_arrays_and_specs) + if not items: + return [] + + if self._all_sync: + # All codecs support sync -- run the full chain inline (no threading). + return [self._encode_one(chunk_array, chunk_spec) for chunk_array, chunk_spec in items] + + # Async fallback: layer-by-layer across all chunks. + return list(await self.encode_batch(items)) + + # ------------------------------------------------------------------- + # Async read / write (IO overlap via concurrent_map) + # ------------------------------------------------------------------- + async def read_batch( self, batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], out: NDBuffer, drop_axes: tuple[int, ...] 
= (), ) -> None: + batch_info = list(batch_info) + if self.supports_partial_decode: chunk_array_batch = await self.decode_partial_batch( [ @@ -266,23 +489,28 @@ async def read_batch( out[out_selection] = chunk_array else: out[out_selection] = fill_value_or_default(chunk_spec) - else: - chunk_bytes_batch = await concurrent_map( - [(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info], - lambda byte_getter, prototype: byte_getter.get(prototype), - config.get("async.concurrency"), - ) - chunk_array_batch = await self.decode_batch( - [ - (chunk_bytes, chunk_spec) - for chunk_bytes, (_, chunk_spec, *_) in zip( - chunk_bytes_batch, batch_info, strict=False - ) - ], - ) - for chunk_array, (_, chunk_spec, chunk_selection, out_selection, _) in zip( - chunk_array_batch, batch_info, strict=False - ): + return + + if self._all_sync: + # Streaming per-chunk pipeline: each chunk flows through + # fetch → decode → scatter as a single task. Running N tasks + # concurrently overlaps IO with codec compute. + _, first_spec, *_ = batch_info[0] + aa_chain, ab_pair, bb_chain = self._resolve_metadata_chain(first_spec) + + async def _read_chunk( + byte_getter: ByteGetter, + chunk_spec: ArraySpec, + chunk_selection: SelectorTuple, + out_selection: SelectorTuple, + ) -> None: + # 1) Fetch + chunk_bytes = await byte_getter.get(prototype=chunk_spec.prototype) + + # 2) Decode (full chain, sync) + chunk_array = self._decode_one(chunk_bytes, chunk_spec, aa_chain, ab_pair, bb_chain) + + # 3) Scatter if chunk_array is not None: tmp = chunk_array[chunk_selection] if drop_axes != (): @@ -291,6 +519,61 @@ async def read_batch( else: out[out_selection] = fill_value_or_default(chunk_spec) + await concurrent_map( + [ + (byte_getter, chunk_spec, chunk_selection, out_selection) + for byte_getter, chunk_spec, chunk_selection, out_selection, _ in batch_info + ], + _read_chunk, + config.get("async.concurrency"), + ) + else: + # Async fallback: fetch all → decode all (async codec API) → scatter. + # Used for codecs that don't implement _decode_sync (e.g. numcodecs). + + async def _fetch(byte_getter: ByteGetter, prototype: BufferPrototype) -> Buffer | None: + return await byte_getter.get(prototype=prototype) + + chunk_bytes_batch = await concurrent_map( + [(byte_getter, chunk_spec.prototype) for byte_getter, chunk_spec, *_ in batch_info], + _fetch, + config.get("async.concurrency"), + ) + chunk_array_batch = await self.decode_batch( + zip( + chunk_bytes_batch, + [chunk_spec for _, chunk_spec, *_ in batch_info], + strict=False, + ) + ) + self._scatter(chunk_array_batch, batch_info, out, drop_axes) + + @staticmethod + def _scatter( + chunk_array_batch: Iterable[NDBuffer | None], + batch_info: list[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], + out: NDBuffer, + drop_axes: tuple[int, ...], + ) -> None: + for chunk_array, (_, chunk_spec, chunk_selection, out_selection, _) in zip( + chunk_array_batch, batch_info, strict=False + ): + if chunk_array is not None: + tmp = chunk_array[chunk_selection] + if drop_axes != (): + tmp = tmp.squeeze(axis=drop_axes) + out[out_selection] = tmp + else: + out[out_selection] = fill_value_or_default(chunk_spec) + + async def read( + self, + batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], + out: NDBuffer, + drop_axes: tuple[int, ...] 
= (), + ) -> None: + await self.read_batch(batch_info, out, drop_axes) + def _merge_chunk_array( self, existing_chunk_array: NDBuffer | None, @@ -341,6 +624,8 @@ async def write_batch( value: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: + batch_info = list(batch_info) + if self.supports_partial_encode: # Pass scalar values as is if len(value.shape) == 0: @@ -357,79 +642,82 @@ async def write_batch( for byte_setter, chunk_spec, chunk_selection, out_selection, _ in batch_info ], ) + return + + if self._all_sync: + # Streaming per-chunk pipeline: each chunk flows through + # read_existing → decode → merge → encode → write as a single + # task. Running N tasks concurrently overlaps IO with compute. + async def _write_chunk( + byte_setter: ByteSetter, + chunk_spec: ArraySpec, + chunk_selection: SelectorTuple, + out_selection: SelectorTuple, + is_complete_chunk: bool, + ) -> None: + # 1) Read existing chunk (for partial writes) + existing_bytes: Buffer | None = None + if not is_complete_chunk: + existing_bytes = await byte_setter.get(prototype=chunk_spec.prototype) + + # 2) Compute: decode existing, merge, encode + chunk_bytes = self._write_chunk_compute( + existing_bytes, + chunk_spec, + chunk_selection, + out_selection, + is_complete_chunk, + value, + drop_axes, + ) - else: - # Read existing bytes if not total slice - async def _read_key( - byte_setter: ByteSetter | None, prototype: BufferPrototype - ) -> Buffer | None: - if byte_setter is None: - return None - return await byte_setter.get(prototype=prototype) + # 3) Write result + if chunk_bytes is _DELETED: + await byte_setter.delete() + else: + await byte_setter.set(chunk_bytes) # type: ignore[arg-type] - chunk_bytes_batch: Iterable[Buffer | None] - chunk_bytes_batch = await concurrent_map( + await concurrent_map( [ - ( - None if is_complete_chunk else byte_setter, - chunk_spec.prototype, - ) - for byte_setter, chunk_spec, chunk_selection, _, is_complete_chunk in batch_info + (byte_setter, chunk_spec, chunk_selection, out_selection, is_complete_chunk) + for byte_setter, chunk_spec, chunk_selection, out_selection, is_complete_chunk in batch_info ], - _read_key, + _write_chunk, config.get("async.concurrency"), ) - chunk_array_decoded = await self.decode_batch( + else: + # Async fallback: phased approach for codecs without sync support. + # Phase 1: Fetch existing chunks for partial writes. + + async def _fetch_existing( + byte_setter: ByteSetter, chunk_spec: ArraySpec, is_complete_chunk: bool + ) -> Buffer | None: + if is_complete_chunk: + return None + return await byte_setter.get(prototype=chunk_spec.prototype) + + existing_bytes_list: list[Buffer | None] = await concurrent_map( [ - (chunk_bytes, chunk_spec) - for chunk_bytes, (_, chunk_spec, *_) in zip( - chunk_bytes_batch, batch_info, strict=False - ) + (byte_setter, chunk_spec, is_complete_chunk) + for byte_setter, chunk_spec, _, _, is_complete_chunk in batch_info ], + _fetch_existing, + config.get("async.concurrency"), ) - chunk_array_merged = [ - self._merge_chunk_array( - chunk_array, - value, - out_selection, - chunk_spec, - chunk_selection, - is_complete_chunk, - drop_axes, + # Phase 2: Decode → merge → encode (async codec API). 
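+            # The sync streaming path above does this same work per chunk in
+            # _write_chunk_compute; here the whole batch funnels through
+            # _write_batch_compute, which reuses the async decode()/encode()
+            # entry points and the shared _merge_and_filter helper.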
+ decode_items: list[tuple[Buffer | None, ArraySpec]] = [ + (existing_bytes if not is_complete_chunk else None, chunk_spec) + for existing_bytes, (_, chunk_spec, _, _, is_complete_chunk) in zip( + existing_bytes_list, batch_info, strict=False ) - for chunk_array, ( - _, - chunk_spec, - chunk_selection, - out_selection, - is_complete_chunk, - ) in zip(chunk_array_decoded, batch_info, strict=False) ] - chunk_array_batch: list[NDBuffer | None] = [] - for chunk_array, (_, chunk_spec, *_) in zip( - chunk_array_merged, batch_info, strict=False - ): - if chunk_array is None: - chunk_array_batch.append(None) # type: ignore[unreachable] - else: - if not chunk_spec.config.write_empty_chunks and chunk_array.all_equal( - fill_value_or_default(chunk_spec) - ): - chunk_array_batch.append(None) - else: - chunk_array_batch.append(chunk_array) - - chunk_bytes_batch = await self.encode_batch( - [ - (chunk_array, chunk_spec) - for chunk_array, (_, chunk_spec, *_) in zip( - chunk_array_batch, batch_info, strict=False - ) - ], + encoded_list = await self._write_batch_compute( + decode_items, batch_info, value, drop_axes ) - async def _write_key(byte_setter: ByteSetter, chunk_bytes: Buffer | None) -> None: + # Phase 3: Write encoded chunks to store. + async def _write_out(byte_setter: ByteSetter, chunk_bytes: Buffer | None) -> None: if chunk_bytes is None: await byte_setter.delete() else: @@ -438,61 +726,262 @@ async def _write_key(byte_setter: ByteSetter, chunk_bytes: Buffer | None) -> Non await concurrent_map( [ (byte_setter, chunk_bytes) - for chunk_bytes, (byte_setter, *_) in zip( - chunk_bytes_batch, batch_info, strict=False + for (byte_setter, *_), chunk_bytes in zip( + batch_info, encoded_list, strict=False ) ], - _write_key, + _write_out, config.get("async.concurrency"), ) - async def decode( + async def _write_batch_compute( self, - chunk_bytes_and_specs: Iterable[tuple[Buffer | None, ArraySpec]], - ) -> Iterable[NDBuffer | None]: - output: list[NDBuffer | None] = [] - for batch_info in batched(chunk_bytes_and_specs, self.batch_size): - output.extend(await self.decode_batch(batch_info)) - return output + decode_items: list[tuple[Buffer | None, ArraySpec]], + batch_info: list[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], + value: NDBuffer, + drop_axes: tuple[int, ...], + ) -> list[Buffer | None]: + chunk_array_decoded: Iterable[NDBuffer | None] = await self.decode(decode_items) - async def encode( + chunk_array_batch = self._merge_and_filter( + chunk_array_decoded, batch_info, value, drop_axes + ) + + encoded_batch: Iterable[Buffer | None] = await self.encode( + [ + (chunk_array, chunk_spec) + for chunk_array, (_, chunk_spec, *_) in zip( + chunk_array_batch, batch_info, strict=False + ) + ] + ) + return list(encoded_batch) + + def _merge_and_filter( self, - chunk_arrays_and_specs: Iterable[tuple[NDBuffer | None, ArraySpec]], - ) -> Iterable[Buffer | None]: - output: list[Buffer | None] = [] - for single_batch_info in batched(chunk_arrays_and_specs, self.batch_size): - output.extend(await self.encode_batch(single_batch_info)) - return output + chunk_array_decoded: Iterable[NDBuffer | None], + batch_info: list[tuple[Any, ArraySpec, SelectorTuple, SelectorTuple, bool]], + value: NDBuffer, + drop_axes: tuple[int, ...], + ) -> list[NDBuffer | None]: + chunk_array_merged = [ + self._merge_chunk_array( + chunk_array, + value, + out_selection, + chunk_spec, + chunk_selection, + is_complete_chunk, + drop_axes, + ) + for chunk_array, ( + _, + chunk_spec, + chunk_selection, + out_selection, + 
is_complete_chunk, + ) in zip(chunk_array_decoded, batch_info, strict=False) + ] + chunk_array_batch: list[NDBuffer | None] = [] + for chunk_array, (_, chunk_spec, *_) in zip(chunk_array_merged, batch_info, strict=False): + if chunk_array is None: + chunk_array_batch.append(None) # type: ignore[unreachable] + else: + if not chunk_spec.config.write_empty_chunks and chunk_array.all_equal( + fill_value_or_default(chunk_spec) + ): + chunk_array_batch.append(None) + else: + chunk_array_batch.append(chunk_array) + return chunk_array_batch - async def read( + async def write( self, - batch_info: Iterable[tuple[ByteGetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], + batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], + value: NDBuffer, + drop_axes: tuple[int, ...] = (), + ) -> None: + await self.write_batch(batch_info, value, drop_axes) + + # ------------------------------------------------------------------- + # Fully synchronous read / write (no event loop) + # ------------------------------------------------------------------- + + @property + def supports_sync_io(self) -> bool: + return self._all_sync + + def read_sync( + self, + batch_info: Iterable[tuple[Any, ArraySpec, SelectorTuple, SelectorTuple, bool]], out: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: - await concurrent_map( - [ - (single_batch_info, out, drop_axes) - for single_batch_info in batched(batch_info, self.batch_size) - ], - self.read_batch, - config.get("async.concurrency"), + batch_info_list = list(batch_info) + if not batch_info_list: + return + + if self.supports_partial_decode: + ab_codec: Any = self.array_bytes_codec + for byte_getter, chunk_spec, chunk_selection, out_selection, _ in batch_info_list: + chunk_array: NDBuffer | None = ab_codec._decode_partial_sync( + byte_getter, chunk_selection, chunk_spec + ) + if chunk_array is not None: + out[out_selection] = chunk_array + else: + out[out_selection] = fill_value_or_default(chunk_spec) + return + + _, first_spec, *_ = batch_info_list[0] + aa_chain, ab_pair, bb_chain = self._resolve_metadata_chain(first_spec) + + # Phase 1: IO — fetch all chunk bytes sequentially. + chunk_bytes_list: list[Buffer | None] = [ + byte_getter.get_sync(prototype=chunk_spec.prototype) + for byte_getter, chunk_spec, *_ in batch_info_list + ] + + # Phase 2: Decode — run the codec chain for each chunk. + chunk_nbytes = product(first_spec.shape) * getattr(first_spec.dtype, "item_size", 1) + n_workers = _choose_workers(len(batch_info_list), chunk_nbytes, self) + if n_workers > 0: + pool = _get_pool() + chunk_arrays: list[NDBuffer | None] = list( + pool.map( + self._decode_one, + chunk_bytes_list, + [chunk_spec for _, chunk_spec, *_ in batch_info_list], + [aa_chain] * len(batch_info_list), + [ab_pair] * len(batch_info_list), + [bb_chain] * len(batch_info_list), + ) + ) + else: + chunk_arrays = [ + self._decode_one(chunk_bytes, chunk_spec, aa_chain, ab_pair, bb_chain) + for chunk_bytes, (_, chunk_spec, *_) in zip( + chunk_bytes_list, batch_info_list, strict=False + ) + ] + + # Phase 3: Scatter decoded chunk data into the output buffer. 
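+        # (The scatter step is shared with the async read_batch fallback via
+        # the _scatter helper.)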
+ self._scatter(chunk_arrays, batch_info_list, out, drop_axes) + + def _write_chunk_compute( + self, + existing_bytes: Buffer | None, + chunk_spec: ArraySpec, + chunk_selection: SelectorTuple, + out_selection: SelectorTuple, + is_complete_chunk: bool, + value: NDBuffer, + drop_axes: tuple[int, ...], + ) -> Buffer | None | object: + """Per-chunk compute for write: decode existing -> merge -> encode.""" + existing_array: NDBuffer | None = None + if existing_bytes is not None: + aa_chain, ab_pair, bb_chain = self._resolve_metadata_chain(chunk_spec) + existing_array = self._decode_one( + existing_bytes, chunk_spec, aa_chain, ab_pair, bb_chain + ) + + chunk_array: NDBuffer | None = self._merge_chunk_array( + existing_array, + value, + out_selection, + chunk_spec, + chunk_selection, + is_complete_chunk, + drop_axes, ) - async def write( + if ( + chunk_array is not None + and not chunk_spec.config.write_empty_chunks + and chunk_array.all_equal(fill_value_or_default(chunk_spec)) + ): + chunk_array = None + + if chunk_array is None: + return _DELETED + chunk_bytes = self._encode_one(chunk_array, chunk_spec) + if chunk_bytes is None: + return _DELETED + return chunk_bytes + + def write_sync( self, - batch_info: Iterable[tuple[ByteSetter, ArraySpec, SelectorTuple, SelectorTuple, bool]], + batch_info: Iterable[tuple[Any, ArraySpec, SelectorTuple, SelectorTuple, bool]], value: NDBuffer, drop_axes: tuple[int, ...] = (), ) -> None: - await concurrent_map( - [ - (single_batch_info, value, drop_axes) - for single_batch_info in batched(batch_info, self.batch_size) - ], - self.write_batch, - config.get("async.concurrency"), - ) + batch_info_list = list(batch_info) + if not batch_info_list: + return + + if self.supports_partial_encode: + ab_codec: Any = self.array_bytes_codec + if len(value.shape) == 0: + for byte_setter, chunk_spec, chunk_selection, _, _ in batch_info_list: + ab_codec._encode_partial_sync(byte_setter, value, chunk_selection, chunk_spec) + else: + for byte_setter, chunk_spec, chunk_selection, out_selection, _ in batch_info_list: + ab_codec._encode_partial_sync( + byte_setter, value[out_selection], chunk_selection, chunk_spec + ) + return + + # Phase 1: IO — read existing chunk bytes for partial writes. + existing_bytes_list: list[Buffer | None] = [ + byte_setter.get_sync(prototype=chunk_spec.prototype) if not is_complete_chunk else None + for byte_setter, chunk_spec, _, _, is_complete_chunk in batch_info_list + ] + + # Phase 2: Compute — decode existing, merge new data, encode. 
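+        # Worker selection mirrors read_sync: _choose_workers() / _get_pool()
+        # (not shown in this hunk) pick between a thread pool and inline
+        # execution; n_workers == 0 means encode on the calling thread.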
+ _, first_spec, *_ = batch_info_list[0] + chunk_nbytes = product(first_spec.shape) * getattr(first_spec.dtype, "item_size", 1) + n_workers = _choose_workers(len(batch_info_list), chunk_nbytes, self) + if n_workers > 0: + pool = _get_pool() + encoded_list: list[Buffer | None | object] = list( + pool.map( + self._write_chunk_compute, + existing_bytes_list, + [chunk_spec for _, chunk_spec, *_ in batch_info_list], + [chunk_selection for _, _, chunk_selection, _, _ in batch_info_list], + [out_selection for _, _, _, out_selection, _ in batch_info_list], + [is_complete for _, _, _, _, is_complete in batch_info_list], + [value] * len(batch_info_list), + [drop_axes] * len(batch_info_list), + ) + ) + else: + encoded_list = [ + self._write_chunk_compute( + existing_bytes, + chunk_spec, + chunk_selection, + out_selection, + is_complete_chunk, + value, + drop_axes, + ) + for existing_bytes, ( + _, + chunk_spec, + chunk_selection, + out_selection, + is_complete_chunk, + ) in zip(existing_bytes_list, batch_info_list, strict=False) + ] + + # Phase 3: IO — write encoded chunks to store. + for encoded, (byte_setter, *_) in zip(encoded_list, batch_info_list, strict=False): + if encoded is _DELETED: + byte_setter.delete_sync() + else: + byte_setter.set_sync(encoded) def codecs_from_list( @@ -500,11 +989,12 @@ def codecs_from_list( ) -> tuple[tuple[ArrayArrayCodec, ...], ArrayBytesCodec, tuple[BytesBytesCodec, ...]]: from zarr.codecs.sharding import ShardingCodec + codecs = list(codecs) array_array: tuple[ArrayArrayCodec, ...] = () array_bytes_maybe: ArrayBytesCodec | None = None bytes_bytes: tuple[BytesBytesCodec, ...] = () - if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(tuple(codecs)) > 1: + if any(isinstance(codec, ShardingCodec) for codec in codecs) and len(codecs) > 1: warn( "Combining a `sharding_indexed` codec disables partial reads and " "writes, which may lead to inefficient performance.", diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index f8f8ea4f5f..f21637c495 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -99,11 +99,13 @@ def enable_gpu(self) -> ConfigSet: "target_shard_size_bytes": None, }, "async": {"concurrency": 10, "timeout": None}, - "threading": {"max_workers": None}, + "threading": { + "max_workers": None, + "codec_workers": {"enabled": True, "min": 0, "max": None}, + }, "json_indent": 2, "codec_pipeline": { "path": "zarr.core.codec_pipeline.BatchedCodecPipeline", - "batch_size": 1, }, "codecs": { "blosc": "zarr.codecs.blosc.BloscCodec", diff --git a/src/zarr/storage/_common.py b/src/zarr/storage/_common.py index 4bea04f024..15a9b7846d 100644 --- a/src/zarr/storage/_common.py +++ b/src/zarr/storage/_common.py @@ -228,6 +228,39 @@ async def is_empty(self) -> bool: """ return await self.store.is_empty(self.path) + # ------------------------------------------------------------------- + # Synchronous IO delegation + # + # StorePath is what gets passed to the codec pipeline as a ByteGetter / + # ByteSetter. The async path uses get() / set() / delete(); the sync + # bypass uses these sync variants instead. They simply prepend + # self.path to the key and delegate to the underlying Store's sync + # methods. + # + # Note: These methods satisfy the SyncByteGetter / SyncByteSetter + # protocols (from zarr.abc.store) only when the underlying Store + # also has get_sync / set_sync / delete_sync. Callers check the + # store before invoking these. 
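+    #
+    # Since StorePath defines these methods unconditionally, an isinstance()
+    # check against SyncByteGetter / SyncByteSetter succeeds even when the
+    # store lacks sync support (runtime-checkable protocols only verify that
+    # the methods exist). That is why callers probe the store itself, e.g.
+    # ``isinstance(store_path.store, SyncByteSetter)``, rather than the
+    # StorePath.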
+ # ------------------------------------------------------------------- + + def get_sync( + self, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + """Synchronous read — delegates to ``self.store.get_sync(self.path, ...)``.""" + if prototype is None: + prototype = default_buffer_prototype() + return self.store.get_sync(self.path, prototype=prototype, byte_range=byte_range) # type: ignore[attr-defined, no-any-return] + + def set_sync(self, value: Buffer) -> None: + """Synchronous write — delegates to ``self.store.set_sync(self.path, value)``.""" + self.store.set_sync(self.path, value) # type: ignore[attr-defined] + + def delete_sync(self) -> None: + """Synchronous delete — delegates to ``self.store.delete_sync(self.path)``.""" + self.store.delete_sync(self.path) # type: ignore[attr-defined] + def __truediv__(self, other: str) -> StorePath: """Combine this store path with another path""" return self.__class__(self.store, _dereference_path(self.path, other)) diff --git a/src/zarr/storage/_local.py b/src/zarr/storage/_local.py index 80233a112d..28fea7ca0e 100644 --- a/src/zarr/storage/_local.py +++ b/src/zarr/storage/_local.py @@ -187,6 +187,69 @@ def __repr__(self) -> str: def __eq__(self, other: object) -> bool: return isinstance(other, type(self)) and self.root == other.root + # ------------------------------------------------------------------- + # Synchronous store methods + # + # LocalStore's async get/set wrap the synchronous helpers _get() and + # _put() (defined at module level) in asyncio.to_thread(). These sync + # methods call _get/_put directly, removing the thread-hop overhead. + # + # The open-guard logic is inlined from _open(): create root dir if + # writable, check existence, set _is_open. We can't call the async + # _open() from a sync context, so we replicate its logic here. + # ------------------------------------------------------------------- + + def get_sync( + self, + key: str, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + if prototype is None: + prototype = default_buffer_prototype() + # Inline open guard: mirrors async _open() but without await. + if not self._is_open: + if not self.read_only: + self.root.mkdir(parents=True, exist_ok=True) + if not self.root.exists(): + raise FileNotFoundError(f"{self.root} does not exist") + self._is_open = True + assert isinstance(key, str) + path = self.root / key + try: + # Call _get() directly — the async version wraps this same + # function in asyncio.to_thread(). + return _get(path, prototype, byte_range) + except (FileNotFoundError, IsADirectoryError, NotADirectoryError): + return None + + def set_sync(self, key: str, value: Buffer) -> None: + if not self._is_open: + if not self.read_only: + self.root.mkdir(parents=True, exist_ok=True) + if not self.root.exists(): + raise FileNotFoundError(f"{self.root} does not exist") + self._is_open = True + self._check_writable() + assert isinstance(key, str) + if not isinstance(value, Buffer): + raise TypeError( + f"LocalStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead." + ) + path = self.root / key + # Call _put() directly — the async version wraps this in + # asyncio.to_thread(). + _put(path, value) + + def delete_sync(self, key: str) -> None: + self._check_writable() + path = self.root / key + # Same logic as async delete(), but without await. 
+ if path.is_dir(): + shutil.rmtree(path) + else: + path.unlink(missing_ok=True) + async def get( self, key: str, diff --git a/src/zarr/storage/_memory.py b/src/zarr/storage/_memory.py index e6f9b7a512..1cb1da41f1 100644 --- a/src/zarr/storage/_memory.py +++ b/src/zarr/storage/_memory.py @@ -77,6 +77,60 @@ def __eq__(self, other: object) -> bool: and self.read_only == other.read_only ) + # ------------------------------------------------------------------- + # Synchronous store methods + # + # MemoryStore is a thin wrapper around a Python dict. The async get/set + # methods are already synchronous in substance — they just happen to be + # ``async def``. These sync variants let the codec pipeline's read_sync / + # write_sync access the dict directly without going through the event + # loop, eliminating the dominant source of overhead for in-memory arrays. + # + # The logic mirrors the async counterparts exactly, except: + # - We set _is_open = True inline instead of ``await self._open()``, + # since MemoryStore._open() is a no-op beyond setting the flag. + # ------------------------------------------------------------------- + + def get_sync( + self, + key: str, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + if prototype is None: + prototype = default_buffer_prototype() + # Inline open: MemoryStore._open() just sets _is_open = True. + if not self._is_open: + self._is_open = True + assert isinstance(key, str) + try: + # Direct dict lookup — this is what async get() does too, + # but without the event loop round-trip. + value = self._store_dict[key] + start, stop = _normalize_byte_range_index(value, byte_range) + return prototype.buffer.from_buffer(value[start:stop]) + except KeyError: + return None + + def set_sync(self, key: str, value: Buffer) -> None: + self._check_writable() + if not self._is_open: + self._is_open = True + assert isinstance(key, str) + if not isinstance(value, Buffer): + raise TypeError( + f"MemoryStore.set(): `value` must be a Buffer instance. Got an instance of {type(value)} instead." + ) + # Direct dict assignment — no event loop overhead. 
+ self._store_dict[key] = value + + def delete_sync(self, key: str) -> None: + self._check_writable() + try: + del self._store_dict[key] + except KeyError: + logger.debug("Key %s does not exist.", key) + async def get( self, key: str, diff --git a/src/zarr/testing/buffer.py b/src/zarr/testing/buffer.py index 6096ece2f8..abedb07306 100644 --- a/src/zarr/testing/buffer.py +++ b/src/zarr/testing/buffer.py @@ -72,6 +72,11 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None assert isinstance(value, TestBuffer) await super().set(key, value, byte_range) + def set_sync(self, key: str, value: Buffer) -> None: + if "json" not in key: + assert isinstance(value, TestBuffer) + super().set_sync(key, value) + async def get( self, key: str, @@ -84,3 +89,16 @@ async def get( if ret is not None: assert isinstance(ret, prototype.buffer) return ret + + def get_sync( + self, + key: str, + prototype: BufferPrototype | None = None, + byte_range: Any = None, + ) -> Buffer | None: + if "json" not in key and prototype is not None: + assert prototype.buffer is TestBuffer + ret = super().get_sync(key=key, prototype=prototype, byte_range=byte_range) + if ret is not None and prototype is not None: + assert isinstance(ret, prototype.buffer) + return ret diff --git a/tests/package_with_entrypoint/__init__.py b/tests/package_with_entrypoint/__init__.py index 7b5dfb5a1e..7394b2e5c8 100644 --- a/tests/package_with_entrypoint/__init__.py +++ b/tests/package_with_entrypoint/__init__.py @@ -40,7 +40,7 @@ def compute_encoded_size(self, input_byte_length: int, chunk_spec: ArraySpec) -> class TestEntrypointCodecPipeline(CodecPipeline): - def __init__(self, batch_size: int = 1) -> None: + def __init__(self) -> None: pass async def encode( diff --git a/tests/test_config.py b/tests/test_config.py index c3102e8efe..ba74140b75 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -56,11 +56,13 @@ def test_config_defaults_set() -> None: "target_shard_size_bytes": None, }, "async": {"concurrency": 10, "timeout": None}, - "threading": {"max_workers": None}, + "threading": { + "max_workers": None, + "codec_workers": {"enabled": True, "min": 0, "max": None}, + }, "json_indent": 2, "codec_pipeline": { "path": "zarr.core.codec_pipeline.BatchedCodecPipeline", - "batch_size": 1, }, "codecs": { "blosc": "zarr.codecs.blosc.BloscCodec", @@ -103,7 +105,6 @@ def test_config_defaults_set() -> None: assert config.get("array.order") == "C" assert config.get("async.concurrency") == 10 assert config.get("async.timeout") is None - assert config.get("codec_pipeline.batch_size") == 1 assert config.get("json_indent") == 2 @@ -132,7 +133,7 @@ def test_config_codec_pipeline_class(store: Store) -> None: # has default value assert get_pipeline_class().__name__ != "" - config.set({"codec_pipeline.name": "zarr.core.codec_pipeline.BatchedCodecPipeline"}) + config.set({"codec_pipeline.path": "zarr.core.codec_pipeline.BatchedCodecPipeline"}) assert get_pipeline_class() == zarr.core.codec_pipeline.BatchedCodecPipeline _mock = Mock() @@ -146,6 +147,14 @@ async def write( ) -> None: _mock.call() + def write_sync( + self, + batch_info: Any, + value: NDBuffer, + drop_axes: tuple[int, ...] 
= (), + ) -> None: + _mock.call() + register_pipeline(MockCodecPipeline) config.set({"codec_pipeline.path": fully_qualified_name(MockCodecPipeline)}) @@ -191,6 +200,10 @@ async def _encode_single(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Bu _mock.call() return None + def _encode_sync(self, chunk_bytes: Buffer, chunk_spec: ArraySpec) -> Buffer | None: + _mock.call() + return None + register_codec("blosc", MockBloscCodec) with config.set({"codecs.blosc": fully_qualified_name(MockBloscCodec)}): assert get_codec_class("blosc") == MockBloscCodec diff --git a/tests/test_indexing.py b/tests/test_indexing.py index c0bf7dd270..ac54cd0cb4 100644 --- a/tests/test_indexing.py +++ b/tests/test_indexing.py @@ -34,6 +34,7 @@ if TYPE_CHECKING: from collections.abc import AsyncGenerator + from zarr.abc.store import ByteRequest from zarr.core.buffer import BufferPrototype from zarr.core.buffer.core import Buffer @@ -83,6 +84,21 @@ async def set(self, key: str, value: Buffer, byte_range: tuple[int, int] | None self.counter["__setitem__", key_suffix] += 1 return await super().set(key, value, byte_range) + def get_sync( + self, + key: str, + prototype: BufferPrototype | None = None, + byte_range: ByteRequest | None = None, + ) -> Buffer | None: + key_suffix = "/".join(key.split("/")[1:]) + self.counter["__getitem__", key_suffix] += 1 + return super().get_sync(key, prototype, byte_range) + + def set_sync(self, key: str, value: Buffer) -> None: + key_suffix = "/".join(key.split("/")[1:]) + self.counter["__setitem__", key_suffix] += 1 + return super().set_sync(key, value) + def test_normalize_integer_selection() -> None: assert 1 == normalize_integer_selection(1, 100) diff --git a/tests/test_sync_codec_pipeline.py b/tests/test_sync_codec_pipeline.py new file mode 100644 index 0000000000..8fac3d54f6 --- /dev/null +++ b/tests/test_sync_codec_pipeline.py @@ -0,0 +1,305 @@ +"""Tests for sync codec capabilities in BatchedCodecPipeline.""" + +from __future__ import annotations + +from typing import Any + +import numpy as np +import pytest + +import zarr +from zarr.abc.codec import SupportsSyncCodec +from zarr.codecs.bytes import BytesCodec +from zarr.codecs.gzip import GzipCodec +from zarr.codecs.transpose import TransposeCodec +from zarr.codecs.zstd import ZstdCodec +from zarr.core.array_spec import ArrayConfig, ArraySpec +from zarr.core.buffer import default_buffer_prototype +from zarr.core.codec_pipeline import BatchedCodecPipeline +from zarr.core.dtype import get_data_type_from_native_dtype +from zarr.storage import MemoryStore + + +def _make_array_spec(shape: tuple[int, ...], dtype: np.dtype[Any]) -> ArraySpec: + zdtype = get_data_type_from_native_dtype(dtype) + return ArraySpec( + shape=shape, + dtype=zdtype, + fill_value=zdtype.default_scalar(), + config=ArrayConfig(order="C", write_empty_chunks=True), + prototype=default_buffer_prototype(), + ) + + +def _make_nd_buffer(arr: np.ndarray[Any, Any]) -> zarr.core.buffer.NDBuffer: + return default_buffer_prototype().nd_buffer.from_numpy_array(arr) + + +# --------------------------------------------------------------------------- +# Unit tests: SupportsSyncCodec protocol +# --------------------------------------------------------------------------- + + +class TestSupportsSync: + def test_gzip_supports_sync(self) -> None: + assert isinstance(GzipCodec(), SupportsSyncCodec) + + def test_zstd_supports_sync(self) -> None: + assert isinstance(ZstdCodec(), SupportsSyncCodec) + + def test_bytes_supports_sync(self) -> None: + assert isinstance(BytesCodec(), 
SupportsSyncCodec) + + def test_transpose_supports_sync(self) -> None: + assert isinstance(TransposeCodec(order=(0, 1)), SupportsSyncCodec) + + def test_sharding_supports_sync(self) -> None: + from zarr.codecs.sharding import ShardingCodec + + assert isinstance(ShardingCodec(chunk_shape=(8,)), SupportsSyncCodec) + + +# --------------------------------------------------------------------------- +# Unit tests: individual codec sync roundtrips +# --------------------------------------------------------------------------- + + +class TestGzipCodecSync: + def test_roundtrip(self) -> None: + codec = GzipCodec(level=1) + arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) + + +class TestZstdCodecSync: + def test_roundtrip(self) -> None: + codec = ZstdCodec(level=1) + arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + buf = default_buffer_prototype().buffer.from_array_like(arr.view("B")) + + encoded = codec._encode_sync(buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + result = np.frombuffer(decoded.as_numpy_array(), dtype="float64") + np.testing.assert_array_equal(arr, result) + + +class TestBytesCodecSync: + def test_roundtrip(self) -> None: + codec = BytesCodec() + arr = np.arange(100, dtype="float64") + spec = _make_array_spec(arr.shape, arr.dtype) + nd_buf = _make_nd_buffer(arr) + + # Evolve from array spec (handles endianness) + codec = codec.evolve_from_array_spec(spec) + + encoded = codec._encode_sync(nd_buf, spec) + assert encoded is not None + decoded = codec._decode_sync(encoded, spec) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + +class TestTransposeCodecSync: + def test_roundtrip(self) -> None: + codec = TransposeCodec(order=(1, 0)) + arr = np.arange(12, dtype="float64").reshape(3, 4) + spec = _make_array_spec(arr.shape, arr.dtype) + nd_buf = _make_nd_buffer(arr) + + encoded = codec._encode_sync(nd_buf, spec) + assert encoded is not None + resolved_spec = codec.resolve_metadata(spec) + decoded = codec._decode_sync(encoded, resolved_spec) + np.testing.assert_array_equal(arr, decoded.as_numpy_array()) + + +# --------------------------------------------------------------------------- +# Unit tests: pipeline construction +# --------------------------------------------------------------------------- + + +class TestPipelineConstruction: + def test_from_codecs_valid(self) -> None: + pipeline = BatchedCodecPipeline.from_codecs([BytesCodec(), GzipCodec(level=1)]) + assert isinstance(pipeline, BatchedCodecPipeline) + assert len(pipeline.bytes_bytes_codecs) == 1 + assert isinstance(pipeline.array_bytes_codec, BytesCodec) + + def test_from_codecs_accepts_sharding(self) -> None: + from zarr.codecs.sharding import ShardingCodec + + pipeline = BatchedCodecPipeline.from_codecs([ShardingCodec(chunk_shape=(8,))]) + assert isinstance(pipeline, BatchedCodecPipeline) + assert pipeline._all_sync + + def test_from_codecs_rejects_missing_array_bytes(self) -> None: + with pytest.raises(ValueError, match="Required ArrayBytesCodec"): + BatchedCodecPipeline.from_codecs([GzipCodec()]) + + def test_from_codecs_with_transpose(self) -> None: + pipeline = BatchedCodecPipeline.from_codecs( + 
[ + TransposeCodec(order=(1, 0)), + BytesCodec(), + GzipCodec(level=1), + ] + ) + assert len(pipeline.array_array_codecs) == 1 + assert isinstance(pipeline.array_array_codecs[0], TransposeCodec) + + +# --------------------------------------------------------------------------- +# Unit tests: pipeline encode/decode roundtrip +# --------------------------------------------------------------------------- + + +class TestPipelineRoundtrip: + @pytest.mark.asyncio + async def test_encode_decode_single_chunk(self) -> None: + pipeline = BatchedCodecPipeline.from_codecs([BytesCodec(), GzipCodec(level=1)]) + arr = np.random.default_rng(42).standard_normal((32, 32)).astype("float64") + spec = _make_array_spec(arr.shape, arr.dtype) + pipeline = pipeline.evolve_from_array_spec(spec) + nd_buf = _make_nd_buffer(arr) + + encoded = await pipeline.encode([(nd_buf, spec)]) + decoded = await pipeline.decode([(next(iter(encoded)), spec)]) + result = next(iter(decoded)) + assert result is not None + np.testing.assert_array_equal(arr, result.as_numpy_array()) + + @pytest.mark.asyncio + async def test_encode_decode_multiple_chunks(self) -> None: + pipeline = BatchedCodecPipeline.from_codecs([BytesCodec(), GzipCodec(level=1)]) + rng = np.random.default_rng(42) + spec = _make_array_spec((16, 16), np.dtype("float64")) + pipeline = pipeline.evolve_from_array_spec(spec) + chunks = [rng.standard_normal((16, 16)).astype("float64") for _ in range(10)] + nd_bufs = [_make_nd_buffer(c) for c in chunks] + + encoded = list(await pipeline.encode([(buf, spec) for buf in nd_bufs])) + decoded = list(await pipeline.decode([(enc, spec) for enc in encoded])) + for original, dec in zip(chunks, decoded, strict=False): + assert dec is not None + np.testing.assert_array_equal(original, dec.as_numpy_array()) + + @pytest.mark.asyncio + async def test_encode_decode_empty_batch(self) -> None: + pipeline = BatchedCodecPipeline.from_codecs([BytesCodec(), GzipCodec(level=1)]) + encoded = await pipeline.encode([]) + assert list(encoded) == [] + decoded = await pipeline.decode([]) + assert list(decoded) == [] + + @pytest.mark.asyncio + async def test_encode_decode_none_chunk(self) -> None: + pipeline = BatchedCodecPipeline.from_codecs([BytesCodec(), GzipCodec(level=1)]) + spec = _make_array_spec((8,), np.dtype("float64")) + pipeline = pipeline.evolve_from_array_spec(spec) + + encoded = list(await pipeline.encode([(None, spec)])) + assert encoded[0] is None + + decoded = list(await pipeline.decode([(None, spec)])) + assert decoded[0] is None + + +# --------------------------------------------------------------------------- +# Integration tests: default pipeline has sync capabilities +# --------------------------------------------------------------------------- + + +class TestDefaultPipelineSync: + def test_create_array_uses_batched_pipeline(self) -> None: + store = MemoryStore() + arr = zarr.create_array( + store, + shape=(100, 100), + chunks=(32, 32), + dtype="float64", + ) + assert isinstance(arr.async_array.codec_pipeline, BatchedCodecPipeline) + + data = np.random.default_rng(42).standard_normal((100, 100)) + arr[:] = data + np.testing.assert_array_equal(arr[:], data) + + def test_open_uses_batched_pipeline(self) -> None: + store = MemoryStore() + arr = zarr.create_array( + store, + shape=(50, 50), + chunks=(25, 25), + dtype="float64", + ) + data = np.random.default_rng(42).standard_normal((50, 50)) + arr[:] = data + + arr2 = zarr.open_array(store=store) + assert isinstance(arr2.async_array.codec_pipeline, BatchedCodecPipeline) + 
np.testing.assert_array_equal(arr2[:], data) + + def test_from_array_uses_batched_pipeline(self) -> None: + store1 = MemoryStore() + arr1 = zarr.create_array( + store1, + shape=(20, 20), + chunks=(10, 10), + dtype="float64", + ) + data = np.random.default_rng(42).standard_normal((20, 20)) + arr1[:] = data + + store2 = MemoryStore() + arr2 = zarr.from_array(store2, data=arr1) + assert isinstance(arr2.async_array.codec_pipeline, BatchedCodecPipeline) + np.testing.assert_array_equal(arr2[:], data) + + def test_partial_write(self) -> None: + store = MemoryStore() + arr = zarr.create_array( + store, + shape=(100,), + chunks=(10,), + dtype="int32", + fill_value=0, + ) + arr[5:15] = np.arange(10, dtype="int32") + 1 + result = arr[:] + expected = np.zeros(100, dtype="int32") + expected[5:15] = np.arange(10, dtype="int32") + 1 + np.testing.assert_array_equal(result, expected) + + def test_zstd_codec(self) -> None: + store = MemoryStore() + arr = zarr.create_array( + store, + shape=(50,), + chunks=(10,), + dtype="float32", + compressors=ZstdCodec(level=3), + ) + data = np.random.default_rng(42).standard_normal(50).astype("float32") + arr[:] = data + np.testing.assert_array_equal(arr[:], data) + + def test_supports_sync_io(self) -> None: + """Default pipeline supports sync IO when all codecs are sync.""" + pipeline = BatchedCodecPipeline.from_codecs([BytesCodec(), GzipCodec(level=1)]) + assert pipeline.supports_sync_io + + def test_supports_sync_io_default(self) -> None: + """Default BatchedCodecPipeline is the sync pipeline — no config switch needed.""" + store = MemoryStore() + arr = zarr.create_array(store, shape=(10,), dtype="float64") + assert isinstance(arr.async_array.codec_pipeline, BatchedCodecPipeline) + assert arr.async_array.codec_pipeline.supports_sync_io
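+
+
+# ---------------------------------------------------------------------------
+# Illustrative sketches (added for exposition, not exercised by the pipeline):
+# the stub class below is hypothetical, not a real codec.
+# ---------------------------------------------------------------------------
+
+
+class _StubSyncCodec:
+    """Hypothetical stand-in carrying only the two protocol methods."""
+
+    def _decode_sync(self, chunk_data: Any, chunk_spec: Any) -> Any:
+        return chunk_data
+
+    def _encode_sync(self, chunk_data: Any, chunk_spec: Any) -> Any:
+        return chunk_data
+
+
+def test_supports_sync_codec_is_structural() -> None:
+    """``SupportsSyncCodec`` is a runtime-checkable Protocol, so the
+    isinstance() check behind ``_all_sync`` passes for any object exposing
+    ``_decode_sync`` / ``_encode_sync``, no codec base class required."""
+    assert isinstance(_StubSyncCodec(), SupportsSyncCodec)
+
+
+def test_codec_workers_config_toggle() -> None:
+    """Sketch of disabling the sync-path thread pool via the global config
+    key added in this change (assumes donfig-style context-manager set())."""
+    with zarr.config.set({"threading.codec_workers.enabled": False}):
+        assert zarr.config.get("threading.codec_workers.enabled") is False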