Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 5 additions & 6 deletions docs/user-guide/gpu.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
Zarr can use GPUs to accelerate your workload by calling `zarr.config.enable_gpu`.

!!! note
`zarr-python` currently supports reading the ndarray data into device (GPU)
memory as the final stage of the codec pipeline. Data will still be read into
or copied to host (CPU) memory for encoding and decoding.
GPU-accelerated decoding is currently available for the Blosc codec (encoding
still runs on the CPU) when all of the following hold:

In the future, codecs will be available compressing and decompressing data on
the GPU, avoiding the need to move data between the host and device for
compression and decompression.
- `cname="zstd"`
- `shuffle` is `bitshuffle` or `noshuffle`
- array dtype is `float32`

## Reading data into device memory

Expand Down
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ remote = [
]
gpu = [
"cupy-cuda12x",
"nvidia-nvcomp-cu12",
]
cli = ["typer"]
# Testing extras
Expand Down Expand Up @@ -154,6 +155,9 @@ omit = [
[tool.hatch]
version.source = "vcs"

[tool.hatch.version.raw-options]
fallback_version = "3.1.0"

[tool.hatch.build]
hooks.vcs.version-file = "src/zarr/_version.py"

Expand Down
2 changes: 2 additions & 0 deletions src/zarr/codecs/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from zarr.codecs.blosc import BloscCname, BloscCodec, BloscShuffle
from zarr.codecs.bytes import BytesCodec, Endian
from zarr.codecs.crc32c_ import Crc32cCodec
from zarr.codecs.gpu import NvcompBloscCodec
from zarr.codecs.gzip import GzipCodec
from zarr.codecs.numcodecs import (
BZ2,
Expand Down Expand Up @@ -41,6 +42,7 @@
"Crc32cCodec",
"Endian",
"GzipCodec",
"NvcompBloscCodec",
"ShardingCodec",
"ShardingCodecIndexLocation",
"TransposeCodec",
Expand Down
220 changes: 220 additions & 0 deletions src/zarr/codecs/gpu.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
from __future__ import annotations

import struct
from functools import cached_property
from typing import TYPE_CHECKING

import numpy as np

from zarr.codecs.blosc import BloscCodec
from zarr.registry import register_codec

if TYPE_CHECKING:
from zarr.core.array_spec import ArraySpec
from zarr.core.buffer import Buffer

try:
import cupy as cp
except ImportError: # pragma: no cover
cp = None

try:
from nvidia import nvcomp
except ImportError: # pragma: no cover
nvcomp = None

_BLOSC_MAX_OVERHEAD = 16
_BLOSC_FLAG_DOSHUFFLE = 0x01
_BLOSC_FLAG_MEMCPYED = 0x02
_BLOSC_FLAG_DOBITSHUFFLE = 0x04
_BLOSC_FLAG_DONT_SPLIT = 0x10
_BLOSC_COMPFORMAT_ZSTD = 4
_BLOSC_MIN_BUFFERSIZE = 128
_BLOSC_MAX_SPLITS = 16


def _read_i32(source: "cp.ndarray", offset: int) -> int:
raw = cp.asnumpy(source[offset : offset + 4]).tobytes()
return int(struct.unpack("<i", raw)[0])


class NvcompBloscCodec(BloscCodec):
"""GPU Blosc decoder (zstd + {bitshuffle, noshuffle}) using nvCOMP."""

@cached_property
def _zstd_codec(self) -> nvcomp.Codec:
assert cp is not None
assert nvcomp is not None
device = cp.cuda.Device()
stream = cp.cuda.get_current_stream()
return nvcomp.Codec(
algorithm="Zstd",
bitstream_kind=nvcomp.BitstreamKind.RAW,
device_id=device.id,
cuda_stream=stream.ptr,
)

@staticmethod
def _ensure_supported_dtype(chunk_spec: ArraySpec) -> None:
dtype = np.dtype(chunk_spec.dtype.to_native_dtype())
if dtype != np.dtype("float32"):
raise ValueError(
"NvcompBloscCodec only supports float32 for GPU decode. "
f"Got dtype={dtype!s}."
)

@staticmethod
def _bitunshuffle(src: "cp.ndarray", *, typesize: int, n_elements: int) -> "cp.ndarray":
bits = cp.unpackbits(src, bitorder="little")
matrix = bits.reshape((typesize * 8, n_elements))
out_bits = matrix.T.reshape((-1,))
return cp.packbits(out_bits, bitorder="little")

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
source = chunk_bytes.as_array_like()
if cp is None or not isinstance(source, cp.ndarray):
return await super()._decode_single(chunk_bytes, chunk_spec)
if nvcomp is None:
raise RuntimeError(
"NvcompBloscCodec requires `nvidia-nvcomp-cu12` to decode Blosc zstd chunks on GPU."
)

if source.size < _BLOSC_MAX_OVERHEAD:
raise ValueError(
f"Invalid Blosc payload: expected at least {_BLOSC_MAX_OVERHEAD} bytes, got {source.size}."
)

header = cp.asnumpy(source[:_BLOSC_MAX_OVERHEAD]).tobytes()
_, _, flags, typesize, nbytes, blocksize, cbytes = struct.unpack("<BBBBIII", header)
if source.size < cbytes:
raise ValueError(
f"Invalid Blosc payload: cbytes={cbytes} larger than available bytes={source.size}."
)
source = source[:cbytes]

is_memcpyed = (flags & _BLOSC_FLAG_MEMCPYED) != 0
do_shuffle = (flags & _BLOSC_FLAG_DOSHUFFLE) != 0
do_bitshuffle = (flags & _BLOSC_FLAG_DOBITSHUFFLE) != 0
dont_split = (flags & _BLOSC_FLAG_DONT_SPLIT) != 0
compformat = (flags & 0xE0) >> 5

self._ensure_supported_dtype(chunk_spec)

if do_shuffle:
raise ValueError("NvcompBloscCodec does not support byte-shuffle Blosc chunks.")
if compformat != _BLOSC_COMPFORMAT_ZSTD:
raise ValueError(
"NvcompBloscCodec only supports Blosc chunks with cname='zstd'. "
f"Got compformat={compformat}."
)

if is_memcpyed:
if cbytes < _BLOSC_MAX_OVERHEAD + nbytes:
raise ValueError(
"Invalid memcpyed Blosc payload: missing raw bytes after header."
)
raw = source[_BLOSC_MAX_OVERHEAD : _BLOSC_MAX_OVERHEAD + nbytes]
return chunk_spec.prototype.buffer.from_array_like(raw.copy())

if blocksize == 0:
raise ValueError("Invalid Blosc payload: blocksize must be > 0.")
if nbytes % typesize != 0:
raise ValueError(
f"Invalid Blosc payload: nbytes={nbytes} is not divisible by typesize={typesize}."
)

leftover = nbytes % blocksize
nblocks = nbytes // blocksize + (1 if leftover else 0)

bstarts_offset = _BLOSC_MAX_OVERHEAD
bstarts_size = nblocks * 4
bstarts_end = bstarts_offset + bstarts_size
if cbytes < bstarts_end:
raise ValueError("Invalid Blosc payload: missing block-start table.")

bstarts = np.frombuffer(
cp.asnumpy(source[bstarts_offset:bstarts_end]).tobytes(),
dtype="<i4",
count=nblocks,
)

output = cp.empty((nbytes,), dtype=cp.uint8)

for bi in range(nblocks):
bsize = leftover if (bi == nblocks - 1 and leftover > 0) else blocksize
leftover_block = bi == nblocks - 1 and leftover > 0

if (
(not dont_split)
and (typesize <= _BLOSC_MAX_SPLITS)
and ((blocksize // typesize) >= _BLOSC_MIN_BUFFERSIZE)
and (not leftover_block)
):
nsplits = typesize
else:
nsplits = 1

if bsize % nsplits != 0:
raise ValueError(
f"Invalid Blosc payload: blocksize {bsize} not divisible by nsplits {nsplits}."
)
neblock = bsize // nsplits

block_tmp = cp.empty((bsize,), dtype=cp.uint8)
decode_inputs: list[cp.ndarray] = []
decode_targets: list[tuple[int, int]] = []

p = int(bstarts[bi])
for si in range(nsplits):
if p < 0 or (p + 4) > cbytes:
raise ValueError("Invalid Blosc payload: split header outside payload.")
csize = _read_i32(source, p)
p += 4
if csize < 0 or (p + csize) > cbytes:
raise ValueError("Invalid Blosc payload: split data outside payload.")
split = source[p : p + csize]
p += csize

start = si * neblock
end = start + neblock
if csize == neblock:
block_tmp[start:end] = split
else:
decode_inputs.append(split)
decode_targets.append((start, end))

if decode_inputs:
decoded = self._zstd_codec.decode(nvcomp.as_arrays(decode_inputs))
cp.cuda.get_current_stream().synchronize()
for dec, (start, end) in zip(decoded, decode_targets, strict=True):
block_tmp[start:end] = cp.asarray(dec, dtype=cp.uint8)

if do_bitshuffle:
if bsize % typesize != 0:
raise ValueError(
f"Invalid bitshuffle payload: block bytes {bsize} not divisible by typesize {typesize}."
)
n_elements = bsize // typesize
if n_elements % 8 != 0:
raise ValueError(
"NvcompBloscCodec only supports bitshuffle blocks with element count divisible by 8."
)
block_out = self._bitunshuffle(
block_tmp,
typesize=typesize,
n_elements=n_elements,
)
else:
block_out = block_tmp

block_start = bi * blocksize
output[block_start : block_start + bsize] = block_out

return chunk_spec.prototype.buffer.from_array_like(output)


register_codec("blosc", NvcompBloscCodec, qualname="zarr.codecs.gpu.NvcompBloscCodec")
6 changes: 5 additions & 1 deletion src/zarr/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ def enable_gpu(self) -> ConfigSet:
Configure Zarr to use GPUs where possible.
"""
return self.set(
{"buffer": "zarr.buffer.gpu.Buffer", "ndbuffer": "zarr.buffer.gpu.NDBuffer"}
{
"buffer": "zarr.buffer.gpu.Buffer",
"ndbuffer": "zarr.buffer.gpu.NDBuffer",
"codecs": {"blosc": "zarr.codecs.gpu.NvcompBloscCodec"},
}
)


Expand Down
96 changes: 96 additions & 0 deletions tests/test_codecs/test_nvcomp_blosc.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
from __future__ import annotations

import warnings

import numpy as np
import pytest

import zarr
from zarr.codecs.blosc import BloscCodec
from zarr.errors import ZarrUserWarning
from zarr.testing.utils import gpu_test


@gpu_test
@pytest.mark.parametrize("shuffle", ["bitshuffle", "noshuffle"])
def test_nvcomp_blosc_decode_supported(shuffle: str) -> None:
import cupy as cp

src = np.arange(256, dtype=np.float32).reshape(16, 16)
store = zarr.storage.MemoryStore()
z = zarr.create_array(
store=store,
shape=src.shape,
chunks=(8, 8),
dtype=src.dtype,
compressors=BloscCodec(cname="zstd", shuffle=shuffle),
)
z[:, :] = src

with zarr.config.enable_gpu(), warnings.catch_warnings():
warnings.filterwarnings("ignore", category=ZarrUserWarning)
zr = zarr.open_array(store=store, mode="r")
out = zr[:, :]

assert isinstance(out, cp.ndarray)
cp.testing.assert_array_equal(out, cp.asarray(src))


@gpu_test
def test_nvcomp_blosc_decode_raises_on_byte_shuffle() -> None:
src = np.arange(64, dtype=np.float32).reshape(8, 8)
store = zarr.storage.MemoryStore()
z = zarr.create_array(
store=store,
shape=src.shape,
chunks=(8, 8),
dtype=src.dtype,
compressors=BloscCodec(cname="zstd", shuffle="shuffle"),
)
z[:, :] = src

with zarr.config.enable_gpu(), warnings.catch_warnings():
warnings.filterwarnings("ignore", category=ZarrUserWarning)
zr = zarr.open_array(store=store, mode="r")
with pytest.raises(ValueError, match="byte-shuffle"):
_ = zr[:, :]


@gpu_test
def test_nvcomp_blosc_decode_raises_on_non_float32() -> None:
src = np.arange(64, dtype=np.float64).reshape(8, 8)
store = zarr.storage.MemoryStore()
z = zarr.create_array(
store=store,
shape=src.shape,
chunks=(8, 8),
dtype=src.dtype,
compressors=BloscCodec(cname="zstd", shuffle="bitshuffle"),
)
z[:, :] = src

with zarr.config.enable_gpu(), warnings.catch_warnings():
warnings.filterwarnings("ignore", category=ZarrUserWarning)
zr = zarr.open_array(store=store, mode="r")
with pytest.raises(ValueError, match="float32"):
_ = zr[:, :]


@gpu_test
def test_nvcomp_blosc_decode_raises_on_non_zstd() -> None:
src = np.arange(64, dtype=np.float32).reshape(8, 8)
store = zarr.storage.MemoryStore()
z = zarr.create_array(
store=store,
shape=src.shape,
chunks=(8, 8),
dtype=src.dtype,
compressors=BloscCodec(cname="lz4", shuffle="noshuffle"),
)
z[:, :] = src

with zarr.config.enable_gpu(), warnings.catch_warnings():
warnings.filterwarnings("ignore", category=ZarrUserWarning)
zr = zarr.open_array(store=store, mode="r")
with pytest.raises(ValueError, match="cname='zstd'"):
_ = zr[:, :]
8 changes: 8 additions & 0 deletions tests/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,14 @@ def test_config_buffer_backwards_compatibility_gpu() -> None:
get_ndbuffer_class()


@pytest.mark.gpu
def test_enable_gpu_sets_gpu_blosc_codec() -> None:
with zarr.config.enable_gpu():
assert config.get("buffer") == "zarr.buffer.gpu.Buffer"
assert config.get("ndbuffer") == "zarr.buffer.gpu.NDBuffer"
assert config.get("codecs.blosc") == "zarr.codecs.gpu.NvcompBloscCodec"


@pytest.mark.filterwarnings("error")
def test_warning_on_missing_codec_config() -> None:
class NewCodec(BytesCodec):
Expand Down