From fa750607ed07e56dcc09ca3cfc312a14cdfbb6f7 Mon Sep 17 00:00:00 2001 From: OA jder bot Date: Fri, 27 Feb 2026 10:37:03 -0500 Subject: [PATCH 1/2] Add nvCOMP-backed GPU decode for Blosc zstd --- docs/user-guide/gpu.md | 11 +- pyproject.toml | 1 + src/zarr/codecs/__init__.py | 2 + src/zarr/codecs/gpu.py | 220 +++++++++++++++++++++++++ src/zarr/core/config.py | 6 +- tests/test_codecs/test_nvcomp_blosc.py | 96 +++++++++++ tests/test_config.py | 8 + 7 files changed, 337 insertions(+), 7 deletions(-) create mode 100644 src/zarr/codecs/gpu.py create mode 100644 tests/test_codecs/test_nvcomp_blosc.py diff --git a/docs/user-guide/gpu.md b/docs/user-guide/gpu.md index 3317bdf065..791c2094a7 100644 --- a/docs/user-guide/gpu.md +++ b/docs/user-guide/gpu.md @@ -3,13 +3,12 @@ Zarr can use GPUs to accelerate your workload by running `zarr.Config.enable_gpu`. !!! note - `zarr-python` currently supports reading the ndarray data into device (GPU) - memory as the final stage of the codec pipeline. Data will still be read into - or copied to host (CPU) memory for encoding and decoding. + GPU codec acceleration is currently available for the Blosc codec when all of + the following hold: - In the future, codecs will be available compressing and decompressing data on - the GPU, avoiding the need to move data between the host and device for - compression and decompression. + - `cname="zstd"` + - `shuffle` is `bitshuffle` or `noshuffle` + - array dtype is `float32` ## Reading data into device memory diff --git a/pyproject.toml b/pyproject.toml index 068caa1f0d..70812cb5ed 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -68,6 +68,7 @@ remote = [ ] gpu = [ "cupy-cuda12x", + "nvidia-nvcomp-cu12", ] cli = ["typer"] # Testing extras diff --git a/src/zarr/codecs/__init__.py b/src/zarr/codecs/__init__.py index 4c621290e7..192ddd7efe 100644 --- a/src/zarr/codecs/__init__.py +++ b/src/zarr/codecs/__init__.py @@ -3,6 +3,7 @@ from zarr.codecs.blosc import BloscCname, BloscCodec, BloscShuffle from zarr.codecs.bytes import BytesCodec, Endian from zarr.codecs.crc32c_ import Crc32cCodec +from zarr.codecs.gpu import NvcompBloscCodec from zarr.codecs.gzip import GzipCodec from zarr.codecs.numcodecs import ( BZ2, @@ -41,6 +42,7 @@ "Crc32cCodec", "Endian", "GzipCodec", + "NvcompBloscCodec", "ShardingCodec", "ShardingCodecIndexLocation", "TransposeCodec", diff --git a/src/zarr/codecs/gpu.py b/src/zarr/codecs/gpu.py new file mode 100644 index 0000000000..b0fa95209a --- /dev/null +++ b/src/zarr/codecs/gpu.py @@ -0,0 +1,220 @@ +from __future__ import annotations + +import struct +from functools import cached_property +from typing import TYPE_CHECKING + +import numpy as np + +from zarr.codecs.blosc import BloscCodec +from zarr.registry import register_codec + +if TYPE_CHECKING: + from zarr.core.array_spec import ArraySpec + from zarr.core.buffer import Buffer + +try: + import cupy as cp +except ImportError: # pragma: no cover + cp = None + +try: + from nvidia import nvcomp +except ImportError: # pragma: no cover + nvcomp = None + +_BLOSC_MAX_OVERHEAD = 16 +_BLOSC_FLAG_DOSHUFFLE = 0x01 +_BLOSC_FLAG_MEMCPYED = 0x02 +_BLOSC_FLAG_DOBITSHUFFLE = 0x04 +_BLOSC_FLAG_DONT_SPLIT = 0x10 +_BLOSC_COMPFORMAT_ZSTD = 4 +_BLOSC_MIN_BUFFERSIZE = 128 +_BLOSC_MAX_SPLITS = 16 + + +def _read_i32(source: "cp.ndarray", offset: int) -> int: + raw = cp.asnumpy(source[offset : offset + 4]).tobytes() + return int(struct.unpack(" nvcomp.Codec: + assert cp is not None + assert nvcomp is not None + device = cp.cuda.Device() + stream = cp.cuda.get_current_stream() + return nvcomp.Codec( + algorithm="Zstd", + bitstream_kind=nvcomp.BitstreamKind.RAW, + device_id=device.id, + cuda_stream=stream.ptr, + ) + + @staticmethod + def _ensure_supported_dtype(chunk_spec: ArraySpec) -> None: + dtype = np.dtype(chunk_spec.dtype.to_native_dtype()) + if dtype != np.dtype("float32"): + raise ValueError( + "NvcompBloscCodec only supports float32 for GPU decode. " + f"Got dtype={dtype!s}." + ) + + @staticmethod + def _bitunshuffle(src: "cp.ndarray", *, typesize: int, n_elements: int) -> "cp.ndarray": + bits = cp.unpackbits(src, bitorder="little") + matrix = bits.reshape((typesize * 8, n_elements)) + out_bits = matrix.T.reshape((-1,)) + return cp.packbits(out_bits, bitorder="little") + + async def _decode_single( + self, + chunk_bytes: Buffer, + chunk_spec: ArraySpec, + ) -> Buffer: + source = chunk_bytes.as_array_like() + if cp is None or not isinstance(source, cp.ndarray): + return await super()._decode_single(chunk_bytes, chunk_spec) + if nvcomp is None: + raise RuntimeError( + "NvcompBloscCodec requires `nvidia-nvcomp-cu12` to decode Blosc zstd chunks on GPU." + ) + + if source.size < _BLOSC_MAX_OVERHEAD: + raise ValueError( + f"Invalid Blosc payload: expected at least {_BLOSC_MAX_OVERHEAD} bytes, got {source.size}." + ) + + header = cp.asnumpy(source[:_BLOSC_MAX_OVERHEAD]).tobytes() + _, _, flags, typesize, nbytes, blocksize, cbytes = struct.unpack("> 5 + + self._ensure_supported_dtype(chunk_spec) + + if do_shuffle: + raise ValueError("NvcompBloscCodec does not support byte-shuffle Blosc chunks.") + if compformat != _BLOSC_COMPFORMAT_ZSTD: + raise ValueError( + "NvcompBloscCodec only supports Blosc chunks with cname='zstd'. " + f"Got compformat={compformat}." + ) + + if is_memcpyed: + if cbytes < _BLOSC_MAX_OVERHEAD + nbytes: + raise ValueError( + "Invalid memcpyed Blosc payload: missing raw bytes after header." + ) + raw = source[_BLOSC_MAX_OVERHEAD : _BLOSC_MAX_OVERHEAD + nbytes] + return chunk_spec.prototype.buffer.from_array_like(raw.copy()) + + if blocksize == 0: + raise ValueError("Invalid Blosc payload: blocksize must be > 0.") + if nbytes % typesize != 0: + raise ValueError( + f"Invalid Blosc payload: nbytes={nbytes} is not divisible by typesize={typesize}." + ) + + leftover = nbytes % blocksize + nblocks = nbytes // blocksize + (1 if leftover else 0) + + bstarts_offset = _BLOSC_MAX_OVERHEAD + bstarts_size = nblocks * 4 + bstarts_end = bstarts_offset + bstarts_size + if cbytes < bstarts_end: + raise ValueError("Invalid Blosc payload: missing block-start table.") + + bstarts = np.frombuffer( + cp.asnumpy(source[bstarts_offset:bstarts_end]).tobytes(), + dtype=" 0) else blocksize + leftover_block = bi == nblocks - 1 and leftover > 0 + + if ( + (not dont_split) + and (typesize <= _BLOSC_MAX_SPLITS) + and ((blocksize // typesize) >= _BLOSC_MIN_BUFFERSIZE) + and (not leftover_block) + ): + nsplits = typesize + else: + nsplits = 1 + + if bsize % nsplits != 0: + raise ValueError( + f"Invalid Blosc payload: blocksize {bsize} not divisible by nsplits {nsplits}." + ) + neblock = bsize // nsplits + + block_tmp = cp.empty((bsize,), dtype=cp.uint8) + decode_inputs: list[cp.ndarray] = [] + decode_targets: list[tuple[int, int]] = [] + + p = int(bstarts[bi]) + for si in range(nsplits): + if p < 0 or (p + 4) > cbytes: + raise ValueError("Invalid Blosc payload: split header outside payload.") + csize = _read_i32(source, p) + p += 4 + if csize < 0 or (p + csize) > cbytes: + raise ValueError("Invalid Blosc payload: split data outside payload.") + split = source[p : p + csize] + p += csize + + start = si * neblock + end = start + neblock + if csize == neblock: + block_tmp[start:end] = split + else: + decode_inputs.append(split) + decode_targets.append((start, end)) + + if decode_inputs: + decoded = self._zstd_codec.decode(nvcomp.as_arrays(decode_inputs)) + cp.cuda.get_current_stream().synchronize() + for dec, (start, end) in zip(decoded, decode_targets, strict=True): + block_tmp[start:end] = cp.asarray(dec, dtype=cp.uint8) + + if do_bitshuffle: + if bsize % typesize != 0: + raise ValueError( + f"Invalid bitshuffle payload: block bytes {bsize} not divisible by typesize {typesize}." + ) + n_elements = bsize // typesize + if n_elements % 8 != 0: + raise ValueError( + "NvcompBloscCodec only supports bitshuffle blocks with element count divisible by 8." + ) + block_out = self._bitunshuffle( + block_tmp, + typesize=typesize, + n_elements=n_elements, + ) + else: + block_out = block_tmp + + block_start = bi * blocksize + output[block_start : block_start + bsize] = block_out + + return chunk_spec.prototype.buffer.from_array_like(output) + + +register_codec("blosc", NvcompBloscCodec, qualname="zarr.codecs.gpu.NvcompBloscCodec") diff --git a/src/zarr/core/config.py b/src/zarr/core/config.py index f8f8ea4f5f..ac89bdbef1 100644 --- a/src/zarr/core/config.py +++ b/src/zarr/core/config.py @@ -64,7 +64,11 @@ def enable_gpu(self) -> ConfigSet: Configure Zarr to use GPUs where possible. """ return self.set( - {"buffer": "zarr.buffer.gpu.Buffer", "ndbuffer": "zarr.buffer.gpu.NDBuffer"} + { + "buffer": "zarr.buffer.gpu.Buffer", + "ndbuffer": "zarr.buffer.gpu.NDBuffer", + "codecs": {"blosc": "zarr.codecs.gpu.NvcompBloscCodec"}, + } ) diff --git a/tests/test_codecs/test_nvcomp_blosc.py b/tests/test_codecs/test_nvcomp_blosc.py new file mode 100644 index 0000000000..12eb47125c --- /dev/null +++ b/tests/test_codecs/test_nvcomp_blosc.py @@ -0,0 +1,96 @@ +from __future__ import annotations + +import warnings + +import numpy as np +import pytest + +import zarr +from zarr.codecs.blosc import BloscCodec +from zarr.errors import ZarrUserWarning +from zarr.testing.utils import gpu_test + + +@gpu_test +@pytest.mark.parametrize("shuffle", ["bitshuffle", "noshuffle"]) +def test_nvcomp_blosc_decode_supported(shuffle: str) -> None: + import cupy as cp + + src = np.arange(256, dtype=np.float32).reshape(16, 16) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8, 8), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle=shuffle), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + out = zr[:, :] + + assert isinstance(out, cp.ndarray) + cp.testing.assert_array_equal(out, cp.asarray(src)) + + +@gpu_test +def test_nvcomp_blosc_decode_raises_on_byte_shuffle() -> None: + src = np.arange(64, dtype=np.float32).reshape(8, 8) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8, 8), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle="shuffle"), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + with pytest.raises(ValueError, match="byte-shuffle"): + _ = zr[:, :] + + +@gpu_test +def test_nvcomp_blosc_decode_raises_on_non_float32() -> None: + src = np.arange(64, dtype=np.float64).reshape(8, 8) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8, 8), + dtype=src.dtype, + compressors=BloscCodec(cname="zstd", shuffle="bitshuffle"), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + with pytest.raises(ValueError, match="float32"): + _ = zr[:, :] + + +@gpu_test +def test_nvcomp_blosc_decode_raises_on_non_zstd() -> None: + src = np.arange(64, dtype=np.float32).reshape(8, 8) + store = zarr.storage.MemoryStore() + z = zarr.create_array( + store=store, + shape=src.shape, + chunks=(8, 8), + dtype=src.dtype, + compressors=BloscCodec(cname="lz4", shuffle="noshuffle"), + ) + z[:, :] = src + + with zarr.config.enable_gpu(), warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=ZarrUserWarning) + zr = zarr.open_array(store=store, mode="r") + with pytest.raises(ValueError, match="cname='zstd'"): + _ = zr[:, :] diff --git a/tests/test_config.py b/tests/test_config.py index c3102e8efe..6fc993f901 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -291,6 +291,14 @@ def test_config_buffer_backwards_compatibility_gpu() -> None: get_ndbuffer_class() +@pytest.mark.gpu +def test_enable_gpu_sets_gpu_blosc_codec() -> None: + with zarr.config.enable_gpu(): + assert config.get("buffer") == "zarr.buffer.gpu.Buffer" + assert config.get("ndbuffer") == "zarr.buffer.gpu.NDBuffer" + assert config.get("codecs.blosc") == "zarr.codecs.gpu.NvcompBloscCodec" + + @pytest.mark.filterwarnings("error") def test_warning_on_missing_codec_config() -> None: class NewCodec(BytesCodec): From 96e2f752b9483ac36cc891eb7390f8af9556c0d9 Mon Sep 17 00:00:00 2001 From: OA jder bot Date: Fri, 27 Feb 2026 10:45:01 -0500 Subject: [PATCH 2/2] Set hatch-vcs fallback version for downstream xarray compatibility --- pyproject.toml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 70812cb5ed..53f056856d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -155,6 +155,9 @@ omit = [ [tool.hatch] version.source = "vcs" +[tool.hatch.version.raw-options] +fallback_version = "3.1.0" + [tool.hatch.build] hooks.vcs.version-file = "src/zarr/_version.py"