2 changes: 2 additions & 0 deletions changes/3547.misc.md
@@ -0,0 +1,2 @@
Moved concurrency-limiting functionality to store classes. The global configuration object no longer
controls concurrency limits. Concurrency limits, if applicable, must now be specified when constructing a store.
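
For background, one plausible way a store can enforce such a limit is with an internal semaphore around each I/O call. The sketch below is illustrative only (the class and attribute names are hypothetical), and the actual store internals may differ:

```python
import asyncio
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")


class ThrottledStore:
    """Hypothetical sketch of a store that caps concurrent I/O.

    ``concurrency_limit=None`` disables the limit. This is not the real
    zarr Store class; it only illustrates the general pattern.
    """

    def __init__(self, concurrency_limit: int | None = None) -> None:
        self._semaphore = (
            None if concurrency_limit is None else asyncio.Semaphore(concurrency_limit)
        )

    async def _throttled(self, operation: Awaitable[T]) -> T:
        # Run one I/O operation, holding a semaphore slot only if a limit is set.
        if self._semaphore is None:
            return await operation
        async with self._semaphore:
            return await operation
```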
2 changes: 1 addition & 1 deletion docs/user-guide/config.md
@@ -30,7 +30,7 @@ Configuration options include the following:
- Default Zarr format `default_zarr_version`
- Default array order in memory `array.order`
- Whether empty chunks are written to storage `array.write_empty_chunks`
- Async and threading options, e.g. `async.concurrency` and `threading.max_workers`
- Threading options, e.g. `threading.max_workers`
- Selections of implementations of codecs, codec pipelines and buffers
- Enabling GPU support with `zarr.config.enable_gpu()`. See GPU support for more.

40 changes: 21 additions & 19 deletions docs/user-guide/performance.md
@@ -191,20 +191,18 @@ scenarios.
### Concurrent I/O operations

Zarr uses asynchronous I/O internally to enable concurrent reads and writes across multiple chunks.
The level of concurrency is controlled by the `async.concurrency` configuration setting, which
determines the maximum number of concurrent I/O operations.

The default value is 10, which is a conservative value. You may get improved performance by tuning
the concurrency limit. You can adjust this value based on your specific needs:
Concurrency is controlled at the **store level** — each store instance can have its own concurrency
limit, set via the `concurrency_limit` parameter when creating the store.

```python
import zarr

# Set concurrency for the current session
zarr.config.set({'async.concurrency': 128})
# Local filesystem store with custom concurrency limit
store = zarr.storage.LocalStore("data/my_array.zarr", concurrency_limit=64)

# Or use environment variable
# export ZARR_ASYNC_CONCURRENCY=128
# Remote store with higher concurrency for network I/O
from obstore.store import S3Store
store = zarr.storage.ObjectStore(S3Store.from_url("s3://bucket/path"), concurrency_limit=128)
```

Higher concurrency values can improve throughput when:
@@ -217,32 +217,36 @@ Lower concurrency values may be beneficial when:
- Memory is constrained (each concurrent operation requires buffer space)
- Using Zarr within a parallel computing framework (see below)

Set `concurrency_limit=None` to disable the concurrency limit entirely.
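
For instance, a minimal sketch showing that the limit is set per store instance (paths are illustrative):

```python
import zarr

# No limit: this store issues as many concurrent requests as callers make
unlimited = zarr.storage.LocalStore("data/my_array.zarr", concurrency_limit=None)

# Limits are per store instance, so a second store can use its own cap
limited = zarr.storage.LocalStore("data/other_array.zarr", concurrency_limit=16)
```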

### Using Zarr with Dask

[Dask](https://www.dask.org/) is a popular parallel computing library that works well with Zarr for processing large arrays. When using Zarr with Dask, it's important to consider the interaction between Dask's thread pool and Zarr's concurrency settings.
[Dask](https://www.dask.org/) is a popular parallel computing library that works well with Zarr for processing large arrays. When using Zarr with Dask, it's important to consider the interaction between Dask's thread pool and the store's concurrency limit.

**Important**: When using many Dask threads, you may need to reduce both Zarr's `async.concurrency` and `threading.max_workers` settings to avoid creating too many concurrent operations. The total number of concurrent I/O operations can be roughly estimated as:
**Important**: When using many Dask threads, you may need to reduce the store's `concurrency_limit` and Zarr's `threading.max_workers` setting to avoid creating too many concurrent operations. The total number of concurrent I/O operations can be roughly estimated as:

```
total_concurrency ≈ dask_threads × zarr_async_concurrency
total_concurrency ≈ dask_threads × store_concurrency_limit
```

For example, if you're running Dask with 10 threads and Zarr's default concurrency of 64, you could potentially have up to 640 concurrent operations, which may overwhelm your storage system or cause memory issues.
For example, if you're running Dask with 10 threads and a store concurrency limit of 64, you could potentially have up to 640 concurrent operations, which may overwhelm your storage system or cause memory issues.

**Recommendation**: When using Dask with many threads, configure Zarr's concurrency settings:
**Recommendation**: When using Dask with many threads, configure concurrency settings:

```python
import zarr
import dask.array as da

# If using Dask with many threads (e.g., 8-16), reduce Zarr's concurrency settings
# Create store with reduced concurrency limit for Dask workloads
store = zarr.storage.LocalStore("data/large_array.zarr", concurrency_limit=4)

# Also limit Zarr's internal thread pool
zarr.config.set({
'async.concurrency': 4, # Limit concurrent async operations
'threading.max_workers': 4, # Limit Zarr's internal thread pool
})

# Open Zarr array
z = zarr.open_array('data/large_array.zarr', mode='r')
z = zarr.open_array(store=store, mode='r')

# Create Dask array from Zarr array
arr = da.from_array(z, chunks=z.chunks)
@@ -253,8 +255,8 @@ result = arr.mean(axis=0).compute()

**Configuration guidelines for Dask workloads**:

- `async.concurrency`: Controls the maximum number of concurrent async I/O operations. Start with a lower value (e.g., 4-8) when using many Dask threads.
- `threading.max_workers`: Controls Zarr's internal thread pool size for blocking operations (defaults to CPU count). Reduce this to avoid thread contention with Dask's scheduler.
- `concurrency_limit` (per-store): Controls the maximum number of concurrent async I/O operations for a given store. Start with a lower value (e.g., 4-8) when using many Dask threads.
- `threading.max_workers` (global config): Controls Zarr's internal thread pool size for blocking operations (defaults to CPU count). Reduce this to avoid thread contention with Dask's scheduler.

You may need to experiment with different values to find the optimal balance for your workload. Monitor your system's resource usage and adjust these settings based on whether your storage system or CPU is the bottleneck.
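
As one way to experiment, you can time a representative read under a few candidate limits. This is a rough sketch that assumes a local array at `data/large_array.zarr`; OS caching may skew repeated reads, so treat the numbers as indicative only:

```python
import time

import zarr

# Time a full read of the array under a few candidate concurrency limits
for limit in (4, 8, 16, 32):
    store = zarr.storage.LocalStore("data/large_array.zarr", concurrency_limit=limit)
    z = zarr.open_array(store=store, mode="r")
    start = time.perf_counter()
    _ = z[:]  # representative workload; substitute your real access pattern
    print(f"concurrency_limit={limit}: {time.perf_counter() - start:.2f}s")
```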

25 changes: 8 additions & 17 deletions src/zarr/abc/codec.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from abc import abstractmethod
from collections.abc import Mapping
from typing import TYPE_CHECKING, Generic, TypeGuard, TypeVar
@@ -8,8 +9,7 @@

from zarr.abc.metadata import Metadata
from zarr.core.buffer import Buffer, NDBuffer
from zarr.core.common import NamedConfig, concurrent_map
from zarr.core.config import config
from zarr.core.common import NamedConfig

if TYPE_CHECKING:
from collections.abc import Awaitable, Callable, Iterable
@@ -225,11 +225,8 @@ async def decode_partial(
-------
Iterable[NDBuffer | None]
"""
return await concurrent_map(
list(batch_info),
self._decode_partial_single,
config.get("async.concurrency"),
)
# Store handles concurrency limiting internally
return await asyncio.gather(*[self._decode_partial_single(*info) for info in batch_info])


class ArrayBytesCodecPartialEncodeMixin:
@@ -262,11 +259,8 @@ async def encode_partial(
The ByteSetter is used to write the necessary bytes and fetch bytes for existing chunk data.
The chunk spec contains information about the chunk.
"""
await concurrent_map(
list(batch_info),
self._encode_partial_single,
config.get("async.concurrency"),
)
# Store handles concurrency limiting internally
await asyncio.gather(*[self._encode_partial_single(*info) for info in batch_info])


class CodecPipeline:
@@ -464,11 +458,8 @@ async def _batching_helper(
func: Callable[[CodecInput, ArraySpec], Awaitable[CodecOutput | None]],
batch_info: Iterable[tuple[CodecInput | None, ArraySpec]],
) -> list[CodecOutput | None]:
return await concurrent_map(
list(batch_info),
_noop_for_none(func),
config.get("async.concurrency"),
)
# Store handles concurrency limiting internally
return await asyncio.gather(*[_noop_for_none(func)(chunk, spec) for chunk, spec in batch_info])


def _noop_for_none(
9 changes: 2 additions & 7 deletions src/zarr/abc/store.py
@@ -670,13 +670,8 @@ async def getsize_prefix(self, prefix: str) -> int:
# improve tail latency and might reduce memory pressure (since not all keys
# would be in memory at once).

# avoid circular import
from zarr.core.common import concurrent_map
from zarr.core.config import config

keys = [(x,) async for x in self.list_prefix(prefix)]
limit = config.get("async.concurrency")
sizes = await concurrent_map(keys, self.getsize, limit=limit)
keys = [x async for x in self.list_prefix(prefix)]
sizes = await asyncio.gather(*[self.getsize(key) for key in keys])
return sum(sizes)


38 changes: 17 additions & 21 deletions src/zarr/core/array.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
import json
import warnings
from asyncio import gather
@@ -22,7 +23,6 @@
import numpy as np
from typing_extensions import deprecated

import zarr
from zarr.abc.codec import ArrayArrayCodec, ArrayBytesCodec, BytesBytesCodec, Codec
from zarr.abc.numcodec import Numcodec, _is_numcodec
from zarr.codecs._v2 import V2Codec
@@ -60,7 +60,6 @@
_default_zarr_format,
_warn_order_kwarg,
ceildiv,
concurrent_map,
parse_shapelike,
product,
)
@@ -4481,28 +4480,26 @@ async def from_array(
if write_data:
if isinstance(data, Array):

async def _copy_array_region(
chunk_coords: tuple[int, ...] | slice, _data: AnyArray
) -> None:
async def _copy_array_region(chunk_coords: tuple[slice, ...], _data: AnyArray) -> None:
arr = await _data.async_array.getitem(chunk_coords)
await result.setitem(chunk_coords, arr)

# Stream data from the source array to the new array
await concurrent_map(
[(region, data) for region in result._iter_shard_regions()],
_copy_array_region,
zarr.core.config.config.get("async.concurrency"),
# Store handles concurrency limiting internally
await asyncio.gather(
*[_copy_array_region(region, data) for region in result._iter_shard_regions()]
)
else:

async def _copy_arraylike_region(chunk_coords: slice, _data: NDArrayLike) -> None:
await result.setitem(chunk_coords, _data[chunk_coords])
async def _copy_arraylike_region(
chunk_coords: tuple[slice, ...], _data: npt.ArrayLike
) -> None:
await result.setitem(chunk_coords, _data[chunk_coords]) # type: ignore[call-overload, index]

# Stream data from the source array to the new array
await concurrent_map(
[(region, data) for region in result._iter_shard_regions()],
_copy_arraylike_region,
zarr.core.config.config.get("async.concurrency"),
# Store handles concurrency limiting internally
await asyncio.gather(
*[_copy_arraylike_region(region, data) for region in result._iter_shard_regions()]
)
return result

@@ -6001,13 +5998,12 @@ async def _resize(
async def _delete_key(key: str) -> None:
await (array.store_path / key).delete()

await concurrent_map(
[
(array.metadata.encode_chunk_key(chunk_coords),)
# Store handles concurrency limiting internally
await asyncio.gather(
*[
_delete_key(array.metadata.encode_chunk_key(chunk_coords))
for chunk_coords in old_chunk_coords.difference(new_chunk_coords)
],
_delete_key,
zarr_config.get("async.concurrency"),
]
)

# Write new metadata
56 changes: 27 additions & 29 deletions src/zarr/core/codec_pipeline.py
@@ -1,5 +1,6 @@
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from itertools import islice, pairwise
from typing import TYPE_CHECKING, Any, TypeVar
@@ -14,7 +15,6 @@
Codec,
CodecPipeline,
)
from zarr.core.common import concurrent_map
from zarr.core.config import config
from zarr.core.indexing import SelectorTuple, is_scalar
from zarr.errors import ZarrUserWarning
@@ -267,10 +267,12 @@ async def read_batch(
else:
out[out_selection] = fill_value_or_default(chunk_spec)
else:
chunk_bytes_batch = await concurrent_map(
[(byte_getter, array_spec.prototype) for byte_getter, array_spec, *_ in batch_info],
lambda byte_getter, prototype: byte_getter.get(prototype),
config.get("async.concurrency"),
# Store handles concurrency limiting internally
chunk_bytes_batch = await asyncio.gather(
*[
byte_getter.get(array_spec.prototype)
for byte_getter, array_spec, *_ in batch_info
]
)
chunk_array_batch = await self.decode_batch(
[
@@ -368,16 +370,15 @@ async def _read_key(
return await byte_setter.get(prototype=prototype)

chunk_bytes_batch: Iterable[Buffer | None]
chunk_bytes_batch = await concurrent_map(
[
(
# Store handles concurrency limiting internally
chunk_bytes_batch = await asyncio.gather(
*[
_read_key(
None if is_complete_chunk else byte_setter,
chunk_spec.prototype,
)
for byte_setter, chunk_spec, chunk_selection, _, is_complete_chunk in batch_info
],
_read_key,
config.get("async.concurrency"),
]
)
chunk_array_decoded = await self.decode_batch(
[
@@ -435,15 +436,14 @@ async def _write_key(byte_setter: ByteSetter, chunk_bytes: Buffer | None) -> Non
else:
await byte_setter.set(chunk_bytes)

await concurrent_map(
[
(byte_setter, chunk_bytes)
# Store handles concurrency limiting internally
await asyncio.gather(
*[
_write_key(byte_setter, chunk_bytes)
for chunk_bytes, (byte_setter, *_) in zip(
chunk_bytes_batch, batch_info, strict=False
)
],
_write_key,
config.get("async.concurrency"),
]
)

async def decode(
@@ -470,13 +470,12 @@ async def read(
out: NDBuffer,
drop_axes: tuple[int, ...] = (),
) -> None:
await concurrent_map(
[
(single_batch_info, out, drop_axes)
# Process mini-batches concurrently - stores handle I/O concurrency internally
await asyncio.gather(
*[
self.read_batch(single_batch_info, out, drop_axes)
for single_batch_info in batched(batch_info, self.batch_size)
],
self.read_batch,
config.get("async.concurrency"),
]
)

async def write(
@@ -485,13 +484,12 @@ async def write(
value: NDBuffer,
drop_axes: tuple[int, ...] = (),
) -> None:
await concurrent_map(
[
(single_batch_info, value, drop_axes)
# Process mini-batches concurrently - stores handle I/O concurrency internally
await asyncio.gather(
*[
self.write_batch(single_batch_info, value, drop_axes)
for single_batch_info in batched(batch_info, self.batch_size)
],
self.write_batch,
config.get("async.concurrency"),
]
)

