diff --git a/doc/conf.py b/doc/conf.py index c3042c774..21b76e071 100644 --- a/doc/conf.py +++ b/doc/conf.py @@ -6,6 +6,7 @@ import numpy as np import blosc2 +from blosc2.utils import constructors, elementwise_funcs, reducers def genbody(f, func_list, lib="blosc2"): @@ -99,7 +100,7 @@ def genbody(f, func_list, lib="blosc2"): Their result is always a :ref:`LazyExpr` instance, which can be evaluated (with ``compute`` or ``__getitem__``) to get the actual values of the computation. -Note: The functions ``conj``, ``real``, ``imag``, ``contains``, ``where`` are not technically ufuncs. +Note: The functions ``real``, ``imag``, ``contains``, ``where`` are not technically ufuncs. .. currentmodule:: blosc2 @@ -110,7 +111,7 @@ def genbody(f, func_list, lib="blosc2"): genbody(f, blosc2_ufuncs) # GENERATE additional_funcs.rst -blosc2_addfuncs = sorted(["conj", "real", "imag", "contains", "where", "clip", "round"]) +blosc2_addfuncs = sorted(set(elementwise_funcs) - set(blosc2_ufuncs)) blosc2_dtypefuncs = sorted(["astype", "can_cast", "result_type", "isdtype"]) with open("reference/additional_funcs.rst", "w") as f: @@ -133,7 +134,9 @@ def genbody(f, func_list, lib="blosc2"): ) genbody(f, blosc2_addfuncs) f.write( - """Type Utilities + """ + +Type Utilities -------------- The following functions are useful for working with datatypes. @@ -158,12 +161,14 @@ def genbody(f, func_list, lib="blosc2"): "broadcast_to", "meshgrid", "indices", + "concat", + "stack", ] ) with open("reference/index_funcs.rst", "w") as f: f.write( - """Indexing Functions and Utilities + """Indexing and Manipulation Functions and Utilities ======================================= The following functions are useful for performing indexing and other associated operations. 
@@ -195,7 +200,68 @@ def genbody(f, func_list, lib="blosc2"): """ ) - genbody(f, linalg_funcs, "blosc2.linalg") + genbody(f, sorted(linalg_funcs), "blosc2.linalg") + +with open("reference/reduction_functions.rst", "w") as f: + f.write( + """Reduction Functions +------------------- + +Contrarily to lazy functions, reduction functions are evaluated eagerly, and the result is always a NumPy array (although this can be converted internally into an :ref:`NDArray ` if you pass any :func:`blosc2.empty` arguments in ``kwargs``). + +Reduction operations can be used with any of :ref:`NDArray `, :ref:`C2Array `, :ref:`NDField ` and :ref:`LazyExpr `. Again, although these can be part of a :ref:`LazyExpr `, you must be aware that they are not lazy, but will be evaluated eagerly during the construction of a LazyExpr instance (this might change in the future). When the input is a :ref:`LazyExpr`, reductions accept ``fp_accuracy`` to control floating-point accuracy, and it is forwarded to :func:`LazyExpr.compute`. + +.. currentmodule:: blosc2 + +.. autosummary:: + +""" + ) + genbody(f, sorted(reducers)) + +with open("reference/ndarray.rst", "w") as f: + f.write( + """.. _NDArray: + +NDArray +======= + +The multidimensional data array class. Instances may be constructed using the constructor functions in the list below `NDArrayConstructors`_. +In addition, all the functions from the :ref:`Lazy Functions ` section can be used with NDArray instances. + +.. currentmodule:: blosc2 + +.. autoclass:: NDArray + :members: + :inherited-members: + :exclude-members: get_slice, set_slice, get_slice_numpy, get_oindex_numpy, set_oindex_numpy + :member-order: groupwise + + :Special Methods: + + .. autosummary:: + + __iter__ + __len__ + __getitem__ + __setitem__ + + Utility Methods + --------------- + + .. automethod:: __iter__ + .. automethod:: __len__ + .. automethod:: __getitem__ + .. automethod:: __setitem__ + +Constructors +------------ +.. _NDArrayConstructors: +.. 
autosummary:: + +""" + ) + genbody(f, sorted(constructors)) hidden = "_ignore_multiple_size" diff --git a/doc/reference/additional_funcs.rst b/doc/reference/additional_funcs.rst index 238af622c..0453d9731 100644 --- a/doc/reference/additional_funcs.rst +++ b/doc/reference/additional_funcs.rst @@ -12,8 +12,8 @@ Their result is typically a :ref:`LazyExpr` instance, which can be evaluated (wi .. autosummary:: + broadcast_to clip - conj contains imag real @@ -22,13 +22,15 @@ Their result is typically a :ref:`LazyExpr` instance, which can be evaluated (wi +.. autofunction:: blosc2.broadcast_to .. autofunction:: blosc2.clip -.. autofunction:: blosc2.conj .. autofunction:: blosc2.contains .. autofunction:: blosc2.imag .. autofunction:: blosc2.real .. autofunction:: blosc2.round .. autofunction:: blosc2.where + + Type Utilities -------------- diff --git a/doc/reference/index_funcs.rst b/doc/reference/index_funcs.rst index 9e9d2bb1a..78a8165c4 100644 --- a/doc/reference/index_funcs.rst +++ b/doc/reference/index_funcs.rst @@ -1,4 +1,4 @@ -Indexing Functions and Utilities +Indexing and Manipulation Functions and Utilities ======================================= The following functions are useful for performing indexing and other associated operations. diff --git a/doc/reference/linalg.rst b/doc/reference/linalg.rst index 9f06b308c..ea84542d9 100644 --- a/doc/reference/linalg.rst +++ b/doc/reference/linalg.rst @@ -6,22 +6,22 @@ The following functions can be used for computing linear algebra operations with .. autosummary:: + diagonal matmul - tensordot - vecdot - permute_dims - transpose matrix_transpose - diagonal outer + permute_dims + tensordot + transpose + vecdot +.. autofunction:: blosc2.linalg.diagonal .. autofunction:: blosc2.linalg.matmul -.. autofunction:: blosc2.linalg.tensordot -.. autofunction:: blosc2.linalg.vecdot -.. autofunction:: blosc2.linalg.permute_dims -.. autofunction:: blosc2.linalg.transpose .. autofunction:: blosc2.linalg.matrix_transpose -.. 
autofunction:: blosc2.linalg.diagonal .. autofunction:: blosc2.linalg.outer +.. autofunction:: blosc2.linalg.permute_dims +.. autofunction:: blosc2.linalg.tensordot +.. autofunction:: blosc2.linalg.transpose +.. autofunction:: blosc2.linalg.vecdot diff --git a/doc/reference/misc.rst b/doc/reference/misc.rst index 279ed79a5..50cb0c1b1 100644 --- a/doc/reference/misc.rst +++ b/doc/reference/misc.rst @@ -7,6 +7,7 @@ This page documents the miscellaneous members of the ``blosc2`` module that do n :members: :exclude-members: LazyArray, LazyExpr, + LazyUDF, lazyexpr, lazyudf, evaluate, @@ -227,4 +228,6 @@ This page documents the miscellaneous members of the ``blosc2`` module that do n round, are_partitions_aligned, are_partitions_behaved, - indices + indices, + cumulative_sum, + cumulative_prod, diff --git a/doc/reference/ndarray.rst b/doc/reference/ndarray.rst index 5c50aeec3..5fe00c412 100644 --- a/doc/reference/ndarray.rst +++ b/doc/reference/ndarray.rst @@ -38,14 +38,12 @@ Constructors arange asarray - concat copy empty empty_like - expand_dims eye frombuffer fromiter full full_like linspace @@ -55,31 +54,29 @@ Constructors nans ndarray_from_cframe ones ones_like reshape - stack - squeeze uninit zeros zeros_like -.. autofunction:: arange -.. autofunction:: asarray -.. autofunction:: concat -.. autofunction:: copy -.. autofunction:: empty -.. autofunction:: empty_like -.. autofunction:: expand_dims -.. autofunction:: eye -.. autofunction:: frombuffer -.. autofunction:: fromiter -.. autofunction:: full -.. autofunction:: full_like -.. autofunction:: linspace -.. autofunction:: nans -.. autofunction:: ndarray_from_cframe -.. autofunction:: ones -.. autofunction:: ones_like -.. autofunction:: reshape -.. autofunction:: stack -.. autofunction:: uninit -.. autofunction:: zeros -.. autofunction:: zeros_like + + +.. autofunction:: blosc2.arange +.. autofunction:: blosc2.asarray +.. autofunction:: blosc2.copy +.. autofunction:: blosc2.empty +.. autofunction:: blosc2.empty_like +.. 
autofunction:: blosc2.eye +.. autofunction:: blosc2.frombuffer +.. autofunction:: blosc2.fromiter +.. autofunction:: blosc2.full +.. autofunction:: blosc2.full_like +.. autofunction:: blosc2.linspace +.. autofunction:: blosc2.meshgrid +.. autofunction:: blosc2.nans +.. autofunction:: blosc2.ndarray_from_cframe +.. autofunction:: blosc2.ones +.. autofunction:: blosc2.ones_like +.. autofunction:: blosc2.reshape +.. autofunction:: blosc2.uninit +.. autofunction:: blosc2.zeros +.. autofunction:: blosc2.zeros_like diff --git a/doc/reference/reduction_functions.rst b/doc/reference/reduction_functions.rst index 45f27abe1..4c21c150e 100644 --- a/doc/reference/reduction_functions.rst +++ b/doc/reference/reduction_functions.rst @@ -11,24 +11,32 @@ Reduction operations can be used with any of :ref:`NDArray `, :ref:`C2A all any - sum - prod + argmax + argmin + count_nonzero + cumulative_prod + cumulative_sum + max mean + min + prod std + sum var - min - max - argmin - argmax -.. autofunction:: all -.. autofunction:: any -.. autofunction:: sum -.. autofunction:: prod -.. autofunction:: mean -.. autofunction:: std -.. autofunction:: var -.. autofunction:: min -.. autofunction:: max -.. autofunction:: argmin -.. autofunction:: argmax + + +.. autofunction:: blosc2.all +.. autofunction:: blosc2.any +.. autofunction:: blosc2.argmax +.. autofunction:: blosc2.argmin +.. autofunction:: blosc2.count_nonzero +.. autofunction:: blosc2.cumulative_prod +.. autofunction:: blosc2.cumulative_sum +.. autofunction:: blosc2.max +.. autofunction:: blosc2.mean +.. autofunction:: blosc2.min +.. autofunction:: blosc2.prod +.. autofunction:: blosc2.std +.. autofunction:: blosc2.sum +.. 
autofunction:: blosc2.var diff --git a/doc/reference/ufuncs.rst b/doc/reference/ufuncs.rst index 09ffb7908..3ae5397a7 100644 --- a/doc/reference/ufuncs.rst +++ b/doc/reference/ufuncs.rst @@ -5,7 +5,7 @@ The following elementwise functions can be used for computing with any of :ref:` Their result is always a :ref:`LazyExpr` instance, which can be evaluated (with ``compute`` or ``__getitem__``) to get the actual values of the computation. -Note: The functions ``conj``, ``real``, ``imag``, ``contains``, ``where`` are not technically ufuncs. +Note: The functions ``real``, ``imag``, ``contains``, ``where`` are not technically ufuncs. .. currentmodule:: blosc2 diff --git a/src/blosc2/__init__.py b/src/blosc2/__init__.py index c9c3ab36b..c877c1ce5 100644 --- a/src/blosc2/__init__.py +++ b/src/blosc2/__init__.py @@ -556,6 +556,8 @@ def _raise(exc): cos, cosh, count_nonzero, + cumulative_prod, + cumulative_sum, divide, equal, exp, @@ -719,6 +721,8 @@ def _raise(exc): "count_nonzero", "cparams_dflts", "cpu_info", + "cumulative_prod", + "cumulative_sum", "decompress", "decompress2", "detect_number_of_cores", diff --git a/src/blosc2/lazyexpr.py b/src/blosc2/lazyexpr.py index 565d97a6c..ea72da284 100644 --- a/src/blosc2/lazyexpr.py +++ b/src/blosc2/lazyexpr.py @@ -13,6 +13,7 @@ import builtins import concurrent.futures import copy +import enum import inspect import linecache import math @@ -49,6 +50,10 @@ from .proxy import _convert_dtype from .utils import ( NUMPY_GE_2_0, + _get_chunk_operands, + _sliced_chunk_iter, + check_smaller_shape, + compute_smaller_slice, constructors, elementwise_funcs, get_chunks_idx, @@ -56,6 +61,8 @@ infer_shape, linalg_attrs, linalg_funcs, + npcumprod, + npcumsum, npvecdot, process_key, reducers, @@ -90,6 +97,8 @@ safe_numpy_globals["concat"] = np.concatenate safe_numpy_globals["matrix_transpose"] = np.transpose safe_numpy_globals["vecdot"] = npvecdot + safe_numpy_globals["cumulative_sum"] = npcumsum + safe_numpy_globals["cumulative_prod"] = 
npcumprod # Set this to False if miniexpr should not be tried out try_miniexpr = True @@ -129,6 +138,19 @@ def ne_evaluate(expression, local_dict=None, **kwargs): raise e # unsafe expression except Exception: # non_numexpr functions present global safe_blosc2_globals + # ne_evaluate will need safe_blosc2_globals for some functions (e.g. clip, logaddexp) + # that are implemented in python-blosc2 not in numexpr + if len(safe_blosc2_globals) == 0: + # First eval call, fill blosc2_safe_globals for ne_evaluate + safe_blosc2_globals = {"blosc2": blosc2} + # Add all first-level blosc2 functions + safe_blosc2_globals.update( + { + name: getattr(blosc2, name) + for name in dir(blosc2) + if callable(getattr(blosc2, name)) and not name.startswith("_") + } + ) res = eval(expression, safe_blosc2_globals, local_dict) if "out" in kwargs: out = kwargs.pop("out") @@ -137,6 +159,45 @@ def ne_evaluate(expression, local_dict=None, **kwargs): return res[()] if isinstance(res, blosc2.Operand) else res +def _get_result(expression, chunk_operands, ne_args, where=None, indices=None, _order=None): + chunk_indices = None + if (expression == "o0" or expression == "(o0)") and where is None: + # We don't have an actual expression, so avoid a copy except to make contiguous (later) + return chunk_operands["o0"], None + # Apply the where condition (in result) + if where is not None and len(where) == 2: + # x = chunk_operands["_where_x"] + # y = chunk_operands["_where_y"] + # result = np.where(result, x, y) + # numexpr is a bit faster than np.where, and we can fuse operations in this case + new_expr = f"where({expression}, _where_x, _where_y)" + return ne_evaluate(new_expr, chunk_operands, **ne_args), None + + result = ne_evaluate(expression, chunk_operands, **ne_args) + if where is None: + return result, None + elif len(where) == 1: + x = chunk_operands["_where_x"] + if (indices is not None) or (_order is not None): + # Return indices only makes sense when the where condition is a tuple with one 
element + # and result is a boolean array + if len(x.shape) > 1: + raise ValueError("indices() and sort() only support 1D arrays") + if result.dtype != np.bool_: + raise ValueError("indices() and sort() only support bool conditions") + if _order: + # We need to cumulate all the fields in _order, as well as indices + chunk_indices = indices[result] + result = x[_order][result] + else: + chunk_indices = None + result = indices[result] + return result, chunk_indices + else: + return x[result], None + raise ValueError("The where condition must be a tuple with one or two elements") + + # Define empty ndindex tuple for function defaults NDINDEX_EMPTY_TUPLE = ndindex.Tuple() @@ -215,37 +276,55 @@ def get_expr_globals(expression): if hasattr(blosc2, func): _globals[func] = getattr(blosc2, func) # Fall back to numpy - elif hasattr(np, func): - _globals[func] = getattr(np, func) - # Function not found in either module else: - raise AttributeError(f"Function {func} not found in blosc2 or numpy") + try: + _globals[func] = safe_numpy_globals[func] + # Function not found in either module + except KeyError as e: + raise AttributeError(f"Function {func} not found in blosc2 or numpy") from e return _globals +if not hasattr(enum, "member"): + # copy-pasted from Lib/enum.py + class _mymember: + """ + Forces item to become an Enum member during class creation. + """ + + def __init__(self, value): + self.value = value +else: + _mymember = enum.member # only available after python 3.11 + + class ReduceOp(Enum): """ Available reduce operations. 
""" - SUM = np.add - PROD = np.multiply - MEAN = np.mean - STD = np.std - VAR = np.var + # wrap as enum.member so that Python doesn't treat some funcs + # as class methods (rather than Enum members) + SUM = _mymember(np.add) + PROD = _mymember(np.multiply) + MEAN = _mymember(np.mean) + STD = _mymember(np.std) + VAR = _mymember(np.var) # Computing a median from partial results is not straightforward because the median # is a positional statistic, which means it depends on the relative ordering of all # the data points. Unlike statistics such as the sum or mean, you can't compute a median # from partial results without knowing the entire dataset, and this is way too expensive # for arrays that cannot typically fit in-memory (e.g. disk-based NDArray). # MEDIAN = np.median - MAX = np.maximum - MIN = np.minimum - ANY = np.any - ALL = np.all - ARGMAX = np.argmax - ARGMIN = np.argmin + MAX = _mymember(np.maximum) + MIN = _mymember(np.minimum) + ANY = _mymember(np.any) + ALL = _mymember(np.all) + ARGMAX = _mymember(np.argmax) + ARGMIN = _mymember(np.argmin) + CUMULATIVE_SUM = _mymember(npcumsum) + CUMULATIVE_PROD = _mymember(npcumprod) class LazyArrayEnum(Enum): @@ -522,79 +601,6 @@ def compute_broadcast_shape(arrays): return np.broadcast_shapes(*shapes) if shapes else None -def check_smaller_shape(value_shape, shape, slice_shape, slice_): - """Check whether the shape of the value is smaller than the shape of the array. - - This follows the NumPy broadcasting rules. 
- """ - # slice_shape must be as long as shape - if len(slice_shape) != len(slice_): - raise ValueError("slice_shape must be as long as slice_") - no_nones_shape = tuple(sh for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) - no_nones_slice = tuple(s for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) - is_smaller_shape = any( - s > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_shape) - ) - slice_past_bounds = any( - s.stop > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_slice) - ) - return len(value_shape) < len(shape) or is_smaller_shape or slice_past_bounds - - -def _compute_smaller_slice(larger_shape, smaller_shape, larger_slice): - smaller_slice = [] - diff_dims = len(larger_shape) - len(smaller_shape) - - for i in range(len(larger_shape)): - if i < diff_dims: - # For leading dimensions of the larger array that the smaller array doesn't have, - # we don't add anything to the smaller slice - pass - else: - # For dimensions that both arrays have, the slice for the smaller array should be - # the same as the larger array unless the smaller array's size along that dimension - # is 1, in which case we use None to indicate the full slice - if smaller_shape[i - diff_dims] != 1: - smaller_slice.append(larger_slice[i]) - else: - smaller_slice.append(slice(0, larger_shape[i])) - - return tuple(smaller_slice) - - -# A more compact version of the function above, albeit less readable -def compute_smaller_slice(larger_shape, smaller_shape, larger_slice): - """ - Returns the slice of the smaller array that corresponds to the slice of the larger array. 
- """ - j_small = len(smaller_shape) - 1 - j_large = len(larger_shape) - 1 - smaller_shape_nones = [] - larger_shape_nones = [] - for s in reversed(larger_slice): - if s is None: - smaller_shape_nones.append(1) - larger_shape_nones.append(1) - else: - if j_small >= 0: - smaller_shape_nones.append(smaller_shape[j_small]) - j_small -= 1 - if j_large >= 0: - larger_shape_nones.append(larger_shape[j_large]) - j_large -= 1 - smaller_shape_nones.reverse() - larger_shape_nones.reverse() - diff_dims = len(larger_shape_nones) - len(smaller_shape_nones) - return tuple( - None - if larger_slice[i] is None - else ( - larger_slice[i] if smaller_shape_nones[i - diff_dims] != 1 else slice(0, larger_shape_nones[i]) - ) - for i in range(diff_dims, len(larger_shape_nones)) - ) - - # Define the patterns for validation validation_patterns = [ r"[\;]", # Flow control characters @@ -1106,10 +1112,14 @@ def get_chunk(arr, info, nchunk): async def async_read_chunks(arrs, info, queue): loop = asyncio.get_event_loop() - nchunks = arrs[0].schunk.nchunks - + shape, chunks_ = arrs[0].shape, arrs[0].chunks with concurrent.futures.ThreadPoolExecutor() as executor: - for nchunk in range(nchunks): + my_chunk_iter = range(arrs[0].schunk.nchunks) + if len(info) == 5: + if info[-1] is not None: + my_chunk_iter = _sliced_chunk_iter(chunks_, (), shape, axis=info[-1], nchunk=True) + info = info[:4] + for i, nchunk in enumerate(my_chunk_iter): futures = [ (index, loop.run_in_executor(executor, get_chunk, arr, info, nchunk)) for index, arr in enumerate(arrs) @@ -1122,7 +1132,7 @@ async def async_read_chunks(arrs, info, queue): print(f"Exception occurred: {chunk}") raise chunk chunks_sorted.append(chunk) - queue.put((nchunk, chunks_sorted)) # use non-async queue.put() + queue.put((i, chunks_sorted)) # use non-async queue.put() queue.put(None) # signal the end of the chunks @@ -1159,7 +1169,7 @@ def read_nchunk(arrs, info): def fill_chunk_operands( - operands, slice_, chunks_, full_chunk, aligned, nchunk, 
iter_disk, chunk_operands, reduc=False + operands, slice_, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=False, axis=None ): """Retrieve the chunk operands for evaluating an expression. @@ -1172,12 +1182,12 @@ def fill_chunk_operands( low_mem = os.environ.get("BLOSC_LOW_MEM", False) # This method is only useful when all operands are NDArray and shows better # performance only when at least one of them is persisted on disk - if nchunk == 0: + if iter_chunks is None: # Initialize the iterator for reading the chunks # Take any operand (all should have the same shape and chunks) key, arr = next(iter(operands.items())) chunks_idx, _ = get_chunks_idx(arr.shape, arr.chunks) - info = (reduc, aligned[key], low_mem, chunks_idx) + info = (reduc, aligned[key], low_mem, chunks_idx, axis) iter_chunks = read_nchunk(list(operands.values()), info) # Run the asynchronous file reading function from a synchronous context chunks = next(iter_chunks) @@ -1185,7 +1195,10 @@ def fill_chunk_operands( for i, (key, value) in enumerate(operands.items()): # Chunks are already decompressed, so we can use them directly if not low_mem: - chunk_operands[key] = chunks[i] + if full_chunk: + chunk_operands[key] = chunks[i] + else: + chunk_operands[key] = value[slice_] continue # Otherwise, we need to decompress them if aligned[key]: @@ -1590,14 +1603,16 @@ def slices_eval( # noqa: C901 intersecting_chunks = get_intersecting_chunks( _slice, shape, chunks ) # if _slice is (), returns all chunks + ratio = np.ceil(np.asarray(shape) / np.asarray(chunks)).astype(np.int64) - for nchunk, chunk_slice in enumerate(intersecting_chunks): - # get intersection of chunk and target - if _slice != (): - cslice = step_handler(chunk_slice.raw, _slice) - else: - cslice = chunk_slice.raw - + for chunk_slice in intersecting_chunks: + # Check whether current cslice intersects with _slice + cslice = chunk_slice.raw + nchunk = builtins.sum([c.start // chunks[i] * np.prod(ratio[i + 1 :]) for i, c in 
enumerate(cslice)]) + if cslice != () and _slice != (): + # get intersection of chunk and target + cslice = step_handler(cslice, _slice) + offset = tuple(s.start for s in cslice) # offset for the udf cslice_shape = tuple(s.stop - s.start for s in cslice) len_chunk = math.prod(cslice_shape) # get local index of part of out that is to be updated @@ -1605,35 +1620,7 @@ def slices_eval( # noqa: C901 ndindex.ndindex(cslice).as_subindex(_slice).raw ) # in the case _slice=(), just gives cslice - # Get the starts and stops for the slice - starts = [s.start if s.start is not None else 0 for s in cslice] - stops = [s.stop if s.stop is not None else sh for s, sh in zip(cslice, cslice_shape, strict=True)] - unit_steps = np.all([s.step == 1 for s in cslice]) - - # Get the slice of each operand - for key, value in operands.items(): - if np.isscalar(value): - chunk_operands[key] = value - continue - if value.shape == (): - chunk_operands[key] = value[()] - continue - if check_smaller_shape(value.shape, shape, cslice_shape, cslice): - # We need to fetch the part of the value that broadcasts with the operand - smaller_slice = compute_smaller_slice(shape, value.shape, cslice) - chunk_operands[key] = value[smaller_slice] - continue - # If key is in operands, we can reuse the buffer - if ( - key in chunk_operands - and cslice_shape == chunk_operands[key].shape - and isinstance(value, blosc2.NDArray) - and unit_steps - ): - value.get_slice_numpy(chunk_operands[key], (starts, stops)) - continue - - chunk_operands[key] = value[cslice] + _get_chunk_operands(operands, cslice, chunk_operands, shape) if out is None: shape_ = shape_slice if shape_slice is not None else shape @@ -1664,47 +1651,16 @@ def slices_eval( # noqa: C901 result = np.empty(cslice_shape, dtype=out.dtype) # raises error if out is None # cslice should be equal to cslice_subidx # Call the udf directly and use result as the output array - offset = tuple(s.start for s in cslice) expression(tuple(chunk_operands.values()), 
result, offset=offset) out[cslice_subidx] = result continue - if where is None: - result = ne_evaluate(expression, chunk_operands, **ne_args) + if _indices or _order: + indices = np.arange(leninputs, leninputs + len_chunk, dtype=np.int64).reshape(cslice_shape) + leninputs += len_chunk + result, chunk_indices = _get_result(expression, chunk_operands, ne_args, where, indices, _order) else: - # Apply the where condition (in result) - if len(where) == 2: - # x = chunk_operands["_where_x"] - # y = chunk_operands["_where_y"] - # result = np.where(result, x, y) - # numexpr is a bit faster than np.where, and we can fuse operations in this case - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, chunk_operands, **ne_args) - elif len(where) == 1: - result = ne_evaluate(expression, chunk_operands, **ne_args) - if _indices or _order: - # Return indices only makes sense when the where condition is a tuple with one element - # and result is a boolean array - x = chunk_operands["_where_x"] - if len(x.shape) > 1: - raise ValueError("indices() and sort() only support 1D arrays") - if result.dtype != np.bool_: - raise ValueError("indices() and sort() only support bool conditions") - indices = np.arange(leninputs, leninputs + len_chunk, dtype=np.int64).reshape( - cslice_shape - ) - if _order: - # We need to cumulate all the fields in _order, as well as indices - chunk_indices = indices[result] - result = x[_order][result] - else: - result = indices[result] - leninputs += len_chunk - else: - x = chunk_operands["_where_x"] - result = x[result] - else: - raise ValueError("The where condition must be a tuple with one or two elements") + result, _ = _get_result(expression, chunk_operands, ne_args, where) # Enforce contiguity of result (necessary to fill the out array) # but avoid copy if already contiguous result = np.require(result, requirements="C") @@ -1811,9 +1767,9 @@ def slices_eval_getitem( shape = out.shape # compute the shape of the output 
array - _slice_bcast = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in _slice.raw) - slice_shape = ndindex.ndindex(_slice_bcast).newshape(shape) # includes dummy dimensions _slice = _slice.raw + _slice_bcast = tuple(slice(i, i + 1) if isinstance(i, int) else i for i in _slice) + slice_shape = ndindex.ndindex(_slice_bcast).newshape(shape) # includes dummy dimensions # Get the slice of each operand slice_operands = {} @@ -1838,12 +1794,7 @@ def slices_eval_getitem( result = np.empty(slice_shape, dtype=dtype) expression(tuple(slice_operands.values()), result, offset=offset) else: - if where is None: - result = ne_evaluate(expression, slice_operands, **ne_args) - else: - # Apply the where condition (in result) - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, slice_operands, **ne_args) + result, _ = _get_result(expression, slice_operands, ne_args, where) if out is None: # avoid copying unnecessarily try: @@ -1863,7 +1814,7 @@ def infer_reduction_dtype(dtype, operation): my_float = np.result_type( dtype, np.float32 if dtype in (np.float32, np.complex64) else blosc2.DEFAULT_FLOAT ) - if operation in {ReduceOp.SUM, ReduceOp.PROD}: + if operation in {ReduceOp.SUM, ReduceOp.PROD, ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: if np.issubdtype(dtype, np.bool_): return np.int64 if np.issubdtype(dtype, np.unsignedinteger): @@ -1945,7 +1896,8 @@ def reduce_slices( # noqa: C901 reduce_op = reduce_args.pop("op") reduce_op_str = reduce_args.pop("op_str", None) axis = reduce_args["axis"] - keepdims = reduce_args["keepdims"] + keepdims = reduce_args.get("keepdims", False) + include_initial = reduce_args.pop("include_initial", False) dtype = reduce_args.get("dtype", None) if dtype is None: dtype = kwargs.pop("dtype", None) @@ -1974,14 +1926,23 @@ def reduce_slices( # noqa: C901 if np.any(mask_slice): add_idx = np.cumsum(mask_slice) axis = tuple(a + add_idx[a] for a in axis) # axis now refers to new shape with dummy dims - if 
reduce_args["axis"] is not None: - # conserve as integer if was not tuple originally - reduce_args["axis"] = axis[0] if np.isscalar(reduce_args["axis"]) else axis - if keepdims: - reduced_shape = tuple(1 if i in axis else s for i, s in enumerate(shape_slice)) + if reduce_args["axis"] is not None: + # conserve as integer if was not tuple originally + reduce_args["axis"] = axis[0] if np.isscalar(reduce_args["axis"]) else axis + if reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: + reduced_shape = (np.prod(shape_slice),) if reduce_args["axis"] is None else shape_slice + # if reduce_args["axis"] is None, have to have 1D input array; otherwise, ensure positive scalar + reduce_args["axis"] = 0 if reduce_args["axis"] is None else axis[0] + if include_initial: + reduced_shape = tuple( + s + 1 if i == reduce_args["axis"] else s for i, s in enumerate(shape_slice) + ) else: - reduced_shape = tuple(s for i, s in enumerate(shape_slice) if i not in axis) - mask_slice = mask_slice[[i for i in range(len(mask_slice)) if i not in axis]] + if keepdims: + reduced_shape = tuple(1 if i in axis else s for i, s in enumerate(shape_slice)) + else: + reduced_shape = tuple(s for i, s in enumerate(shape_slice) if i not in axis) + mask_slice = mask_slice[[i for i in range(len(mask_slice)) if i not in axis]] if out is not None and reduced_shape != out.shape: raise ValueError("Provided output shape does not match the reduced shape.") @@ -2046,7 +2007,7 @@ def reduce_slices( # noqa: C901 use_miniexpr = False # Some reductions are not supported yet in miniexpr - if reduce_op in (ReduceOp.ARGMAX, ReduceOp.ARGMIN): + if reduce_op in (ReduceOp.ARGMAX, ReduceOp.ARGMIN, ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM): use_miniexpr = False # Check whether we can use miniexpr @@ -2081,9 +2042,9 @@ def reduce_slices( # noqa: C901 nblocks = res_eval.nbytes // res_eval.blocksize # Initialize aux_reduc based on the reduction operation # Padding blocks won't be written, so initial values 
matter for the final reduction - if reduce_op in {ReduceOp.SUM, ReduceOp.ANY}: + if reduce_op in {ReduceOp.SUM, ReduceOp.ANY, ReduceOp.CUMULATIVE_SUM}: aux_reduc = np.zeros(nblocks, dtype=dtype) - elif reduce_op in {ReduceOp.PROD, ReduceOp.ALL}: + elif reduce_op in {ReduceOp.PROD, ReduceOp.ALL, ReduceOp.CUMULATIVE_PROD}: aux_reduc = np.ones(nblocks, dtype=dtype) elif reduce_op == ReduceOp.MIN: if np.issubdtype(dtype, np.integer): @@ -2124,10 +2085,8 @@ def reduce_slices( # noqa: C901 # (continue to the manual chunked evaluation below) pass else: - if reduce_op == ReduceOp.ANY: - result = np.any(aux_reduc, **reduce_args) - elif reduce_op == ReduceOp.ALL: - result = np.all(aux_reduc, **reduce_args) + if reduce_op in {ReduceOp.ANY, ReduceOp.ALL}: + result = reduce_op.value(aux_reduc, **reduce_args) else: result = reduce_op.value.reduce(aux_reduc, **reduce_args) return result @@ -2135,66 +2094,66 @@ def reduce_slices( # noqa: C901 # Iterate over the operands and get the chunks chunk_operands = {} # Check which chunks intersect with _slice - # if chunks has 0 we loop once but fast path is false as gives error (schunk has no chunks) - intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks) + if np.isscalar(reduce_args["axis"]): # iterate over chunks incrementing along reduction axis + intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks, axis=reduce_args["axis"]) + else: # iterate over chunks incrementing along last axis + intersecting_chunks = get_intersecting_chunks(_slice, shape, chunks) out_init = False + res_out_init = False + ratio = np.ceil(np.asarray(shape) / np.asarray(chunks)).astype(np.int64) - for nchunk, chunk_slice in enumerate(intersecting_chunks): + for chunk_slice in intersecting_chunks: cslice = chunk_slice.raw - offset = tuple(s.start for s in cslice) # offset for the udf + nchunk = builtins.sum([c.start // chunks[i] * np.prod(ratio[i + 1 :]) for i, c in enumerate(cslice)]) # Check whether current cslice intersects with 
_slice if cslice != () and _slice != (): # get intersection of chunk and target cslice = step_handler(cslice, _slice) - chunks_ = tuple(s.stop - s.start for s in cslice) - unit_steps = np.all([s.step == 1 for s in cslice]) - # Starts for slice + offset = tuple(s.start for s in cslice) # offset for the udf starts = [s.start if s.start is not None else 0 for s in cslice] + unit_steps = np.all([s.step == 1 for s in cslice]) + cslice_shape = tuple(s.stop - s.start for s in cslice) + # get local index of part of out that is to be updated + cslice_subidx = ndindex.ndindex(cslice).as_subindex(_slice).raw # if _slice is (), just gives cslice if _slice == () and fast_path and unit_steps: # Fast path - full_chunk = chunks_ == chunks + full_chunk = cslice_shape == chunks fill_chunk_operands( - operands, cslice, chunks_, full_chunk, aligned, nchunk, iter_disk, chunk_operands, reduc=True + operands, + cslice, + cslice_shape, + full_chunk, + aligned, + nchunk, + iter_disk, + chunk_operands, + reduc=True, + axis=reduce_args["axis"] if np.isscalar(reduce_args["axis"]) else None, ) else: - # Get the stops for the slice - stops = [s.stop if s.stop is not None else sh for s, sh in zip(cslice, chunks_, strict=True)] - # Get the slice of each operand - for key, value in operands.items(): - if np.isscalar(value): - chunk_operands[key] = value - continue - if value.shape == (): - chunk_operands[key] = value[()] - continue - if check_smaller_shape(value.shape, shape, chunks_, cslice): - # We need to fetch the part of the value that broadcasts with the operand - smaller_slice = compute_smaller_slice(operand.shape, value.shape, cslice) - chunk_operands[key] = value[smaller_slice] - continue - # If key is in operands, we can reuse the buffer - if ( - key in chunk_operands - and chunks_ == chunk_operands[key].shape - and isinstance(value, blosc2.NDArray) - and unit_steps - ): - value.get_slice_numpy(chunk_operands[key], (starts, stops)) - continue - chunk_operands[key] = value[cslice] + 
_get_chunk_operands(operands, cslice, chunk_operands, shape) - # get local index of part of out that is to be updated - cslice_subidx = ndindex.ndindex(cslice).as_subindex(_slice).raw # if _slice is (), just gives cslice - if keepdims: - reduced_slice = tuple(slice(None) if i in axis else sl for i, sl in enumerate(cslice_subidx)) + if reduce_op in {ReduceOp.CUMULATIVE_PROD, ReduceOp.CUMULATIVE_SUM}: + reduced_slice = ( + tuple( + slice(sl.start + 1, sl.stop + 1, sl.step) if i == reduce_args["axis"] else sl + for i, sl in enumerate(cslice_subidx) + ) + if include_initial + else cslice_subidx + ) else: - reduced_slice = tuple(sl for i, sl in enumerate(cslice_subidx) if i not in axis) + reduced_slice = ( + tuple(slice(None) if i in axis else sl for i, sl in enumerate(cslice_subidx)) + if keepdims + else tuple(sl for i, sl in enumerate(cslice_subidx) if i not in axis) + ) # Evaluate and reduce the expression using chunks of operands if callable(expression): # TODO: Implement the reductions for UDFs (and test them) - result = np.empty(chunks_, dtype=out.dtype) + result = np.empty(cslice_shape, dtype=out.dtype) expression(tuple(chunk_operands.values()), result, offset=offset) # Reduce the result result = reduce_op.value.reduce(result, **reduce_args) @@ -2202,37 +2161,20 @@ def reduce_slices( # noqa: C901 out[reduced_slice] = reduce_op.value(out[reduced_slice], result) continue - if where is None: - if expression in {"o0", "(o0)"}: - # We don't have an actual expression, so avoid a copy except to make contiguous - result = np.require(chunk_operands["o0"], requirements="C") - else: - result = ne_evaluate(expression, chunk_operands, **ne_args) - else: - # Apply the where condition (in result) - if len(where) == 2: - new_expr = f"where({expression}, _where_x, _where_y)" - result = ne_evaluate(new_expr, chunk_operands, **ne_args) - elif len(where) == 1: - result = ne_evaluate(expression, chunk_operands, **ne_args) - x = chunk_operands["_where_x"] - result = x[result] - else: 
- raise NotImplementedError( - "A where condition with no params in combination with reductions is not supported yet" - ) + result, _ = _get_result(expression, chunk_operands, ne_args, where) + # Enforce contiguity of result (necessary to fill the out array) + # but avoid copy if already contiguous + result = np.require(result, requirements="C") # Reduce the result if result.shape == (): if reduce_op == ReduceOp.SUM and result[()] == 0: # Avoid a reduction when result is a zero scalar. Faster for sparse data. continue - # Note that chunks_ refers to slice of operand chunks, not reduced_slice - result = np.full(chunks_, result[()]) - if reduce_op == ReduceOp.ANY: - result = np.any(result, **reduce_args) - elif reduce_op == ReduceOp.ALL: - result = np.all(result, **reduce_args) + # Note that cslice_shape refers to slice of operand chunks, not reduced_slice + result = np.full(cslice_shape, result[()]) + if reduce_op in {ReduceOp.ANY, ReduceOp.ALL, ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: + result = reduce_op.value(result, **reduce_args) elif reduce_op in {ReduceOp.ARGMAX, ReduceOp.ARGMIN}: # offset for start of slice slice_ref = ( @@ -2243,14 +2185,10 @@ def reduce_slices( # noqa: C901 for s, sl in zip(starts, _slice, strict=True) ] ) - result_idx = ( - np.argmin(result, **reduce_args) - if reduce_op == ReduceOp.ARGMIN - else np.argmax(result, **reduce_args) - ) + result_idx = reduce_op.value(result, **reduce_args) if reduce_args["axis"] is None: # indexing into flattened array result = result[np.unravel_index(result_idx, shape=result.shape)] - idx_within_cslice = np.unravel_index(result_idx, shape=chunks_) + idx_within_cslice = np.unravel_index(result_idx, shape=cslice_shape) result_idx = np.ravel_multi_index( tuple(o + i for o, i in zip(slice_ref, idx_within_cslice, strict=True)), shape_slice ) @@ -2266,7 +2204,7 @@ def reduce_slices( # noqa: C901 result = reduce_op.value.reduce(result, **reduce_args) if not out_init: - out_, res_out_ = 
convert_none_out(result.dtype, reduce_op, reduced_shape) + out_ = convert_none_out(result.dtype, reduce_op, reduced_shape) if out is not None: out[:] = out_ del out_ @@ -2274,29 +2212,38 @@ def reduce_slices( # noqa: C901 out = out_ out_init = True + # res_out only used be argmin/max and cumulative_sum/prod which only accept axis=int argument + if (not res_out_init) or ( + np.isscalar(reduce_args["axis"]) and cslice_subidx[reduce_args["axis"]].start == 0 + ): # starting reduction again along axis + res_out_ = _get_res_out(result.shape, reduce_args["axis"], dtype, reduce_op) + res_out_init = True + # Update the output array with the result if reduce_op == ReduceOp.ANY: out[reduced_slice] += result elif reduce_op == ReduceOp.ALL: out[reduced_slice] *= result - elif res_out_ is not None: # i.e. ReduceOp.ARGMAX or ReduceOp.ARGMIN + elif res_out_ is not None: # need lowest index for which optimum attained - cond = (res_out_[reduced_slice] == result) & (result_idx < out[reduced_slice]) - if reduce_op == ReduceOp.ARGMAX: - cond |= res_out_[reduced_slice] < result - else: # ARGMIN - cond |= res_out_[reduced_slice] > result - if reduced_slice == (): - out = np.where(cond, result_idx, out[reduced_slice]) - res_out_ = np.where(cond, result, res_out_[reduced_slice]) - else: + if reduce_op in {ReduceOp.ARGMAX, ReduceOp.ARGMIN}: + cond = (res_out_ == result) & (result_idx < out[reduced_slice]) + cond |= res_out_ < result if reduce_op == ReduceOp.ARGMAX else res_out_ > result out[reduced_slice] = np.where(cond, result_idx, out[reduced_slice]) - res_out_[reduced_slice] = np.where(cond, result, res_out_[reduced_slice]) + res_out_ = np.where(cond, result, res_out_) + else: # CUMULATIVE_SUM or CUMULATIVE_PROD + idx_lastval = tuple( + slice(-1, None) if i == reduce_args["axis"] else slice(None, None) + for i, c in enumerate(reduced_slice) + ) + if reduce_op == ReduceOp.CUMULATIVE_SUM: + result += res_out_ + else: # CUMULATIVE_PROD + result *= res_out_ + res_out_ = result[idx_lastval] 
+ out[reduced_slice] = result else: - if reduced_slice == (): - out = reduce_op.value(out, result) - else: - out[reduced_slice] = reduce_op.value(out[reduced_slice], result) + out[reduced_slice] = reduce_op.value(out[reduced_slice], result) # No longer need res_out_ del res_out_ @@ -2307,8 +2254,9 @@ def reduce_slices( # noqa: C901 if dtype is None: # We have no hint here, so choose a default dtype dtype = np.float64 - out, _ = convert_none_out(dtype, reduce_op, reduced_shape) + out = convert_none_out(dtype, reduce_op, reduced_shape) + out = out[()] if reduced_shape == () else out # undo dummy dim from inside convert_none_out final_mask = tuple(np.where(mask_slice)[0]) if np.any(mask_slice): # remove dummy dims out = np.squeeze(out, axis=final_mask) @@ -2318,13 +2266,39 @@ def reduce_slices( # noqa: C901 return out +def _get_res_out(reduced_shape, axis, dtype, reduce_op): + reduced_shape = (1,) if reduced_shape == () else reduced_shape + # Get res_out to hold running sums along axes for chunks when doing cumulative sums/prods with axis not None + if reduce_op in {ReduceOp.CUMULATIVE_SUM, ReduceOp.CUMULATIVE_PROD}: + temp_shape = tuple(1 if i == axis else s for i, s in enumerate(reduced_shape)) + res_out_ = ( + np.zeros(temp_shape, dtype=dtype) + if reduce_op == ReduceOp.CUMULATIVE_SUM + else np.ones(temp_shape, dtype=dtype) + ) + elif reduce_op in {ReduceOp.ARGMIN, ReduceOp.ARGMAX}: + temp_shape = reduced_shape + res_out_ = np.ones(temp_shape, dtype=dtype) + if np.issubdtype(dtype, np.integer): + res_out_ *= np.iinfo(dtype).max if reduce_op == ReduceOp.ARGMIN else np.iinfo(dtype).min + elif np.issubdtype(dtype, np.bool): + res_out_ = res_out_ if reduce_op == ReduceOp.ARGMIN else np.zeros(temp_shape, dtype=dtype) + else: + res_out_ *= np.inf if reduce_op == ReduceOp.ARGMIN else -np.inf + else: + res_out_ = None + return res_out_ + + def convert_none_out(dtype, reduce_op, reduced_shape): - out = None + reduced_shape = (1,) if reduced_shape == () else reduced_shape # 
out will be a proper numpy.ndarray - if reduce_op == ReduceOp.SUM: - out = np.zeros(reduced_shape, dtype=dtype) - elif reduce_op == ReduceOp.PROD: - out = np.ones(reduced_shape, dtype=dtype) + if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM, ReduceOp.PROD, ReduceOp.CUMULATIVE_PROD}: + out = ( + np.zeros(reduced_shape, dtype=dtype) + if reduce_op in {ReduceOp.SUM, ReduceOp.CUMULATIVE_SUM} + else np.ones(reduced_shape, dtype=dtype) + ) elif reduce_op == ReduceOp.MIN: if np.issubdtype(dtype, np.integer): out = np.iinfo(dtype).max * np.ones(reduced_shape, dtype=dtype) @@ -2339,19 +2313,9 @@ def convert_none_out(dtype, reduce_op, reduced_shape): out = np.zeros(reduced_shape, dtype=np.bool_) elif reduce_op == ReduceOp.ALL: out = np.ones(reduced_shape, dtype=np.bool_) - elif reduce_op == ReduceOp.ARGMIN: - if np.issubdtype(dtype, np.integer): - res_out_ = np.iinfo(dtype).max * np.ones(reduced_shape, dtype=dtype) - else: - res_out_ = np.inf * np.ones(reduced_shape, dtype=dtype) - out = (np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX), res_out_) - elif reduce_op == ReduceOp.ARGMAX: - if np.issubdtype(dtype, np.integer): - res_out_ = np.iinfo(dtype).min * np.ones(reduced_shape, dtype=dtype) - else: - res_out_ = -np.inf * np.ones(reduced_shape, dtype=dtype) - out = (np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX), res_out_) - return out if isinstance(out, tuple) else (out, None) + elif reduce_op in {ReduceOp.ARGMIN, ReduceOp.ARGMAX}: + out = np.zeros(reduced_shape, dtype=blosc2.DEFAULT_INDEX) + return out def chunked_eval( @@ -3141,6 +3105,38 @@ def argmin( } return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + def cumulative_sum( + self, + axis=None, + include_initial: bool = False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): + reduce_args = { + "op": ReduceOp.CUMULATIVE_SUM, + "axis": axis, + "include_initial": include_initial, + } + if self.ndim != 1 and axis is None: + raise ValueError("axis 
must be specified for cumulative_sum of non-1D array.") + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + + def cumulative_prod( + self, + axis=None, + include_initial: bool = False, + fp_accuracy: blosc2.FPAccuracy = blosc2.FPAccuracy.DEFAULT, + **kwargs, + ): + reduce_args = { + "op": ReduceOp.CUMULATIVE_PROD, + "axis": axis, + "include_initial": include_initial, + } + if self.ndim != 1 and axis is None: + raise ValueError("axis must be specified for cumulative_prod of non-1D array.") + return self.compute(_reduce_args=reduce_args, fp_accuracy=fp_accuracy, **kwargs) + def _eval_constructor(self, expression, constructor, operands): """Evaluate a constructor function inside a string expression.""" @@ -3192,22 +3188,7 @@ def find_args(expr): return value, expression[idx:idx2] - def _compute_expr(self, item, kwargs): # noqa : C901 - # ne_evaluate will need safe_blosc2_globals for some functions (e.g. clip, logaddexp) - # that are implemented in python-blosc2 not in numexpr - global safe_blosc2_globals - if len(safe_blosc2_globals) == 0: - # First eval call, fill blosc2_safe_globals for ne_evaluate - safe_blosc2_globals = {"blosc2": blosc2} - # Add all first-level blosc2 functions - safe_blosc2_globals.update( - { - name: getattr(blosc2, name) - for name in dir(blosc2) - if callable(getattr(blosc2, name)) and not name.startswith("_") - } - ) - + def _compute_expr(self, item, kwargs): if any(method in self.expression for method in eager_funcs): # We have reductions in the expression (probably coming from a string lazyexpr) # Also includes slice diff --git a/src/blosc2/ndarray.py b/src/blosc2/ndarray.py index 13144f4cf..121d6a737 100644 --- a/src/blosc2/ndarray.py +++ b/src/blosc2/ndarray.py @@ -538,6 +538,78 @@ def sum( return ndarr.sum(axis=axis, dtype=dtype, keepdims=keepdims, **kwargs) +def cumulative_sum( + ndarr: blosc2.Array, + axis: int | tuple[int] | None = None, + dtype: np.dtype | str = None, + include_initial: bool = False, 
+ **kwargs: Any, +) -> blosc2.Array: + """ + Calculates the cumulative sum of elements in the input array ndarr. + + Parameters + ----------- + ndarr: :ref:`NDArray` or :ref:`NDField` or :ref:`C2Array` or :ref:`LazyExpr` + The input array or expression. + axis: int + Axis along which a cumulative sum must be computed. If array is 1D, axis may be None; otherwise the axis must be specified. + dtype: dtype + Data type of the returned array. + include_initial : bool + Boolean indicating whether to include the initial value as the first value in the output. Initial value will be zero. Default: False. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. + kwargs: dict, optional + Additional keyword arguments supported by the :func:`empty` constructor. + + Returns + ------- + out: blosc2.Array + An array containing the cumulative sums. Let N be the size of the axis along which to compute the cumulative sum. + If include_initial is True, the returned array has the same shape as ndarr, except the size of the axis along which to compute the cumulative sum is N+1. + If include_initial is False, the returned array has the same shape as ndarr. + """ + return ndarr.cumulative_sum(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + + +def cumulative_prod( + ndarr: blosc2.Array, + axis: int | tuple[int] | None = None, + dtype: np.dtype | str = None, + include_initial: bool = False, + **kwargs: Any, +) -> blosc2.Array: + """ + Calculates the cumulative product of elements in the input array ndarr. + + Parameters + ----------- + ndarr: :ref:`NDArray` or :ref:`NDField` or :ref:`C2Array` or :ref:`LazyExpr` + The input array or expression. + axis: int + Axis along which a cumulative product must be computed. If array is 1D, axis may be None; otherwise the axis must be specified. 
+ dtype: dtype + Data type of the returned array. + include_initial : bool + Boolean indicating whether to include the initial value as the first value in the output. Initial value will be one. Default: False. + fp_accuracy: :ref:`blosc2.FPAccuracy`, optional + Specifies the floating-point accuracy for reductions on :ref:`LazyExpr`. + Passed to :func:`LazyExpr.compute` when :paramref:`ndarr` is a LazyExpr. + kwargs: dict, optional + Additional keyword arguments supported by the :func:`empty` constructor. + + Returns + ------- + out: blosc2.Array + An array containing the cumulative products. Let N be the size of the axis along which to compute the cumulative product. + If include_initial is True, the returned array has the same shape as ndarr, except the size of the axis along which to compute the cumulative product is N+1. + If include_initial is False, the returned array has the same shape as ndarr. + """ + return ndarr.cumulative_prod(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + + def mean( ndarr: blosc2.Array, axis: int | tuple[int] | None = None, @@ -3371,6 +3443,16 @@ def sum(self, axis=None, dtype=None, keepdims=False, **kwargs): expr = blosc2.LazyExpr(new_op=(self, None, None)) return expr.sum(axis=axis, dtype=dtype, keepdims=keepdims, **kwargs) + @is_documented_by(cumulative_sum) + def cumulative_sum(self, axis=None, dtype=None, include_initial=False, **kwargs): + expr = blosc2.LazyExpr(new_op=(self, None, None)) + return expr.cumulative_sum(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + + @is_documented_by(cumulative_prod) + def cumulative_prod(self, axis=None, dtype=None, include_initial=False, **kwargs): + expr = blosc2.LazyExpr(new_op=(self, None, None)) + return expr.cumulative_prod(axis=axis, dtype=dtype, include_initial=include_initial, **kwargs) + @is_documented_by(mean) def mean(self, axis=None, dtype=None, keepdims=False, **kwargs): expr = blosc2.LazyExpr(new_op=(self, None, None)) diff --git 
a/src/blosc2/utils.py b/src/blosc2/utils.py index 2903c8c3d..5ce7b1122 100644 --- a/src/blosc2/utils.py +++ b/src/blosc2/utils.py @@ -9,12 +9,15 @@ import builtins import math import warnings +from itertools import product import ndindex import numpy as np from ndindex.subindex_helpers import ceiling from numpy import broadcast_shapes +import blosc2 + # NumPy version and a convenient boolean flag NUMPY_GE_2_0 = np.__version__ >= "2.0" # handle different numpy versions @@ -24,11 +27,19 @@ npbinvert = np.bitwise_invert npvecdot = np.vecdot nptranspose = np.permute_dims + if hasattr(np, "cumulative_sum"): + npcumsum = np.cumulative_sum + npcumprod = np.cumulative_prod + else: + npcumsum = np.cumsum + npcumprod = np.cumprod else: # not array-api compliant nplshift = np.left_shift nprshift = np.right_shift npbinvert = np.bitwise_not nptranspose = np.transpose + npcumsum = np.cumsum + npcumprod = np.cumprod def npvecdot(a, b, axis=-1): return np.einsum("...i,...i->...", np.moveaxis(np.conj(a), axis, -1), np.moveaxis(b, axis, -1)) @@ -107,16 +118,11 @@ def npvecdot(a, b, axis=-1): "sinh", "sqrt", "square", - "std", "subtract", - "sum", "tan", "tanh", "trunc", - "var", "where", - "zeros", - "zeros_like", ] linalg_funcs = [ @@ -148,11 +154,15 @@ def npvecdot(a, b, axis=-1): "count_nonzero", "argmax", "argmin", + "cumulative_sum", + "cumulative_prod", ] # All the available constructors and reducers necessary for the (string) expression evaluator constructors = [ + "asarray", "arange", + "copy", "linspace", "fromiter", "zeros", @@ -166,7 +176,11 @@ def npvecdot(a, b, axis=-1): "empty_like", "eye", "nans", + "ndarray_from_cframe", + "uninit", + "meshgrid", ] + # Note that, as reshape is accepted as a method too, it should always come last in the list constructors += ["reshape"] @@ -369,9 +383,20 @@ def elementwise(*args): return broadcast_shapes(*args) +def cumulative_shape(x, axis=None, include_initial=False, out=None): + if axis is None: + if len(x) == 1: + axis = 0 + else: 
+ raise ValueError("axis can only be None for 1D arrays") + return tuple(d + 1 if (i == axis and include_initial) else d for i, d in enumerate(x)) + + # --- Function registry --- REDUCTIONS = { # ignore out arg - func: lambda x, axis=None, keepdims=False, out=None: reduce_shape(x, axis, keepdims) + func: cumulative_shape + if func in {"cumulative_sum", "cumulative_prod"} + else lambda x, axis=None, keepdims=False, out=None: reduce_shape(x, axis, keepdims) for func in reducers # any unknown function will default to elementwise } @@ -758,14 +783,81 @@ def _get_local_slice(prior_selection, post_selection, chunk_bounds): return locbegin, locend -def get_intersecting_chunks(_slice, shape, chunks): - if 0 not in chunks: - chunk_size = ndindex.ChunkSize(chunks) - return chunk_size.as_subchunks(_slice, shape) # if _slice is (), returns all chunks +def _sliced_chunk_iter(chunks, idx, shape, axis=None, nchunk=False): + """ + If nchunk is True, retrun at iterator over the number of the chunk. + """ + ratio = np.ceil(np.asarray(shape) / np.asarray(chunks)).astype(np.int64) + idx = ndindex.ndindex(idx).expand(shape) + if axis is not None: + idx = tuple(a for i, a in enumerate(idx.args) if i != axis) + (idx.args[axis],) + chunks_ = tuple(a for i, a in enumerate(chunks) if i != axis) + (chunks[axis],) else: - return ( - ndindex.ndindex(...).expand(shape), - ) # chunk is whole array so just return full tuple to do loop once + chunks_ = chunks + idx_iter = iter(idx) # iterate over tuple of slices in order + chunk_iter = iter(chunks_) # iterate over chunk_shape in order + + iters = [] + while True: + try: + i = next(idx_iter) # slice along axis + n = next(chunk_iter) # chunklen along dimension + except StopIteration: + break + if not isinstance(i, ndindex.Slice): + raise ValueError("Only slices may be used with axis arg") + + def _slice_iter(s, n): + a, N, m = s.args + if m > n: + yield from ((a + k * m) // n for k in range(ceiling(N - a, m))) + else: + yield from range(a // n, 
ceiling(N, n)) + + iters.append(_slice_iter(i, n)) + + def _indices(iters): + my_list = [ndindex.Slice(None, None)] * len(chunks) + for p in product(*iters): + # p increments over arg axis first before other axes + # p = (...., -1, axis) + if axis is None: + my_list = [ + ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + for n, cs, ci in zip(shape, chunks, p, strict=True) + ] + else: + my_list[:axis] = [ + ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + for n, cs, ci in zip(shape[:axis], chunks[:axis], p[:axis], strict=True) + ] + n, cs, ci = shape[axis], chunks[axis], p[-1] + my_list[axis] = ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + my_list[axis + 1 :] = [ + ndindex.Slice(cs * ci, min(cs * (ci + 1), n), 1) + for n, cs, ci in zip(shape[axis + 1 :], chunks[axis + 1 :], p[axis:-1], strict=True) + ] + if nchunk: + yield builtins.sum( + [c.start // chunks[i] * np.prod(ratio[i + 1 :]) for i, c in enumerate(my_list)] + ) + else: + yield ndindex.Tuple(*my_list) + + yield from _indices(iters) + + +def get_intersecting_chunks(idx, shape, chunks, axis=None): + if len(chunks) != len(shape): + raise ValueError("chunks must be same length as shape!") + if 0 in chunks: # chunk is whole array so just return full tuple to do loop once + return (ndindex.ndindex(...).expand(shape),), range(0) + chunk_size = ndindex.ChunkSize(chunks) + if axis is None: + return chunk_size.as_subchunks(idx, shape) # if _slice is (), returns all chunks + + # special algorithm to iterate over axis first (adapted from ndindex source) + return _sliced_chunk_iter(chunks, idx, shape, axis) def get_chunks_idx(shape, chunks): @@ -781,3 +873,107 @@ def process_key(key, shape): ) # mask to track dummy dims introduced by int -> slice(k, k+1) key = tuple(slice(k, k + 1, None) if isinstance(k, int) else k for k in key) # key is slice, None, int return key, mask + + +def check_smaller_shape(value_shape, shape, slice_shape, slice_): + """Check whether the shape of the value is smaller than the shape of 
the array. + + This follows the NumPy broadcasting rules. + """ + # slice_shape must be as long as shape + if len(slice_shape) != len(slice_): + raise ValueError("slice_shape must be as long as slice_") + no_nones_shape = tuple(sh for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) + no_nones_slice = tuple(s for sh, s in zip(slice_shape, slice_, strict=True) if s is not None) + is_smaller_shape = any( + s > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_shape) + ) + slice_past_bounds = any( + s.stop > (1 if i >= len(value_shape) else value_shape[i]) for i, s in enumerate(no_nones_slice) + ) + return len(value_shape) < len(shape) or is_smaller_shape or slice_past_bounds + + +def _compute_smaller_slice(larger_shape, smaller_shape, larger_slice): + smaller_slice = [] + diff_dims = len(larger_shape) - len(smaller_shape) + + for i in range(len(larger_shape)): + if i < diff_dims: + # For leading dimensions of the larger array that the smaller array doesn't have, + # we don't add anything to the smaller slice + pass + else: + # For dimensions that both arrays have, the slice for the smaller array should be + # the same as the larger array unless the smaller array's size along that dimension + # is 1, in which case we use None to indicate the full slice + if smaller_shape[i - diff_dims] != 1: + smaller_slice.append(larger_slice[i]) + else: + smaller_slice.append(slice(0, larger_shape[i])) + + return tuple(smaller_slice) + + +# A more compact version of the function above, albeit less readable +def compute_smaller_slice(larger_shape, smaller_shape, larger_slice): + """ + Returns the slice of the smaller array that corresponds to the slice of the larger array. 
+ """ + j_small = len(smaller_shape) - 1 + j_large = len(larger_shape) - 1 + smaller_shape_nones = [] + larger_shape_nones = [] + for s in reversed(larger_slice): + if s is None: + smaller_shape_nones.append(1) + larger_shape_nones.append(1) + else: + if j_small >= 0: + smaller_shape_nones.append(smaller_shape[j_small]) + j_small -= 1 + if j_large >= 0: + larger_shape_nones.append(larger_shape[j_large]) + j_large -= 1 + smaller_shape_nones.reverse() + larger_shape_nones.reverse() + diff_dims = len(larger_shape_nones) - len(smaller_shape_nones) + return tuple( + None + if larger_slice[i] is None + else ( + larger_slice[i] if smaller_shape_nones[i - diff_dims] != 1 else slice(0, larger_shape_nones[i]) + ) + for i in range(diff_dims, len(larger_shape_nones)) + ) + + +def _get_chunk_operands(operands, cslice, chunk_operands, shape): + # Get the starts and stops for the slice + cslice_shape = tuple(s.stop - s.start for s in cslice) + starts = [s.start if s.start is not None else 0 for s in cslice] + stops = [s.stop if s.stop is not None else sh for s, sh in zip(cslice, cslice_shape, strict=True)] + unit_steps = np.all([s.step == 1 for s in cslice]) + # Get the slice of each operand + for key, value in operands.items(): + if np.isscalar(value): + chunk_operands[key] = value + continue + if value.shape == (): + chunk_operands[key] = value[()] + continue + if check_smaller_shape(value.shape, shape, cslice_shape, cslice): + # We need to fetch the part of the value that broadcasts with the operand + smaller_slice = compute_smaller_slice(shape, value.shape, cslice) + chunk_operands[key] = value[smaller_slice] + continue + # If key is in operands, we can reuse the buffer + if ( + key in chunk_operands + and cslice_shape == chunk_operands[key].shape + and isinstance(value, blosc2.NDArray) + and unit_steps + ): + value.get_slice_numpy(chunk_operands[key], (starts, stops)) + continue + chunk_operands[key] = value[cslice] diff --git a/tests/ndarray/test_reductions.py 
b/tests/ndarray/test_reductions.py index ca14f7905..a3d445d86 100644 --- a/tests/ndarray/test_reductions.py +++ b/tests/ndarray/test_reductions.py @@ -11,7 +11,7 @@ import pytest import blosc2 -from blosc2.lazyexpr import ne_evaluate +from blosc2.lazyexpr import ne_evaluate, npcumprod, npcumsum NITEMS_SMALL = 1000 NITEMS = 10_000 @@ -52,24 +52,34 @@ def array_fixture(dtype_fixture, shape_fixture): # @pytest.mark.parametrize("reduce_op", ["sum"]) -@pytest.mark.parametrize("reduce_op", ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin"]) +@pytest.mark.parametrize( + "reduce_op", + ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin", "cumulative_sum", "cumulative_prod"], +) def test_reduce_bool(array_fixture, reduce_op): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture expr = (a1 + a2) > (a3 * a4) nres = ne_evaluate("(na1 + na2) > (na3 * na4)") - res = getattr(expr, reduce_op)() - # res = expr.sum() - # print("res:", res) - nres = getattr(nres, reduce_op)() + axis = None + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(nres, axis={axis})") + else: + nres = getattr(nres, reduce_op)(axis=axis) + res = getattr(expr, reduce_op)(axis=axis) tol = 1e-15 if a1.dtype == "float64" else 1e-6 np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # @pytest.mark.parametrize("reduce_op", ["sum"]) -@pytest.mark.parametrize("reduce_op", ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin"]) +@pytest.mark.parametrize( + "reduce_op", + ["sum", "prod", "min", "max", "any", "all", "argmax", "argmin", "cumulative_sum", "cumulative_prod"], +) def test_reduce_where(array_fixture, reduce_op): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture - if reduce_op == "prod": + if reduce_op in {"prod", "cumulative_prod"}: # To avoid overflow, create a1 and a2 with small values na1 = np.linspace(0, 0.1, np.prod(a1.shape), 
dtype=np.float32).reshape(a1.shape) a1 = blosc2.asarray(na1) @@ -80,10 +90,16 @@ def test_reduce_where(array_fixture, reduce_op): else: expr = blosc2.where(a1 < a2, a2, a1) nres = eval("np.where(na1 < na2, na2, na1)") - res = getattr(expr, reduce_op)() - nres = getattr(nres, reduce_op)() + axis = None + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(nres, axis={axis})") + else: + nres = getattr(nres, reduce_op)(axis=axis) + res = getattr(expr, reduce_op)(axis=axis) # print("res:", res, nres, type(res), type(nres)) - tol = 1e-15 if a1.dtype == "float64" else 1e-6 + tol = 1e-12 if a1.dtype == "float64" else 1e-5 np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) @@ -105,7 +121,22 @@ def test_fp_accuracy(accuracy, dtype): @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "mean", "std", "var", "min", "max", "any", "all", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "mean", + "std", + "var", + "min", + "max", + "any", + "all", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [1, (0, 1), None]) @pytest.mark.parametrize("keepdims", [True, False]) @@ -117,11 +148,22 @@ def test_fp_accuracy(accuracy, dtype): @pytest.mark.heavy def test_reduce_params(array_fixture, axis, keepdims, dtype_out, reduce_op, kwargs): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture + reduce_args = {"axis": axis} + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + if npcumprod.__name__ == "cumulative_prod": + reduce_args["include_initial"] = keepdims # include_initial only available in cumulative_ + else: + reduce_args["keepdims"] = keepdims + if reduce_op in ("mean", "std") and dtype_out == np.int16: + # mean and std need float dtype as output + dtype_out = np.float64 + if reduce_op in ("sum", "prod", "mean", "std"): + reduce_args["dtype"] = dtype_out if axis is not None and np.isscalar(axis) 
and len(a1.shape) >= axis: return if isinstance(axis, tuple) and (len(a1.shape) < len(axis) or reduce_op in ("argmax", "argmin")): return - if reduce_op == "prod": + if reduce_op in {"prod", "cumulative_prod"}: # To avoid overflow, create a1 and a2 with small values na1 = np.linspace(0, 0.1, np.prod(a1.shape), dtype=np.float32).reshape(a1.shape) a1 = blosc2.asarray(na1) @@ -132,15 +174,9 @@ def test_reduce_params(array_fixture, axis, keepdims, dtype_out, reduce_op, kwar else: expr = a1 + a2 - a3 * a4 nres = eval("na1 + na2 - na3 * na4") - if reduce_op in ("sum", "prod", "mean", "std"): - if reduce_op in ("mean", "std") and dtype_out == np.int16: - # mean and std need float dtype as output - dtype_out = np.float64 - res = getattr(expr, reduce_op)(axis=axis, keepdims=keepdims, dtype=dtype_out, **kwargs) - nres = getattr(nres, reduce_op)(axis=axis, keepdims=keepdims, dtype=dtype_out) - else: - res = getattr(expr, reduce_op)(axis=axis, keepdims=keepdims, **kwargs) - nres = getattr(nres, reduce_op)(axis=axis, keepdims=keepdims) + + res = getattr(expr, reduce_op)(**reduce_args, **kwargs) + nres = getattr(nres, reduce_op)(**reduce_args) tol = 1e-15 if a1.dtype == "float64" else 1e-6 if kwargs != {}: if not np.isscalar(res): @@ -152,24 +188,57 @@ def test_reduce_params(array_fixture, axis, keepdims, dtype_out, reduce_op, kwar # TODO: "prod" is not supported here because it overflows with current values @pytest.mark.parametrize( - "reduce_op", ["sum", "min", "max", "mean", "std", "var", "any", "all", "argmax", "argmin"] + "reduce_op", + ["cumulative_sum", "sum", "min", "max", "mean", "std", "var", "any", "all", "argmax", "argmin"], ) -@pytest.mark.parametrize("axis", [0, 1, None]) +@pytest.mark.parametrize("axis", [None, 0, 1]) def test_reduce_expr_arr(array_fixture, axis, reduce_op): a1, a2, a3, a4, na1, na2, na3, na4 = array_fixture - if axis is not None and len(a1.shape) >= axis: - return + if axis is not None: + if len(a1.shape) <= axis: + return + else: + if reduce_op 
== "cumulative_sum": + return expr = a1 + a2 - a3 * a4 nres = eval("na1 + na2 - na3 * na4") + tol = 1e-12 if a1.dtype == "float64" else 5e-5 res = getattr(expr, reduce_op)(axis=axis) + getattr(a1, reduce_op)(axis=axis) - nres = getattr(nres, reduce_op)(axis=axis) + getattr(na1, reduce_op)(axis=axis) - tol = 1e-15 if a1.dtype == "float64" else 1e-6 - np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) + if reduce_op == "cumulative_sum": + nres_ = npcumsum(nres, axis=axis) + npcumsum(na1, axis=axis) + else: + nres_ = getattr(nres, reduce_op)(axis=axis) + getattr(na1, reduce_op)(axis=axis) + try: + np.testing.assert_allclose(res, nres_, atol=tol, rtol=tol) + except AssertionError as e: + if reduce_op == "cumulative_sum": + sl = tuple(slice(None, None) if i != axis else -1 for i in range(a1.ndim)) + _nres_ = np.sum(nres, axis=axis) + np.sum(na1, axis=axis) + npcumsumVsnpsum = np.max(np.abs(nres_[sl] - _nres_)) + blosccumsumVsnpsum = np.max(np.abs(res[sl] - _nres_)) + print(blosccumsumVsnpsum, npcumsumVsnpsum) + if blosccumsumVsnpsum < npcumsumVsnpsum: + return + raise e # Test broadcasting @pytest.mark.parametrize( - "reduce_op", ["sum", "mean", "std", "var", "min", "max", "any", "all", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "mean", + "std", + "var", + "min", + "max", + "any", + "all", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) @pytest.mark.parametrize("keepdims", [True, False]) @@ -182,8 +251,15 @@ def test_reduce_expr_arr(array_fixture, axis, reduce_op): ], ) def test_broadcast_params(axis, keepdims, reduce_op, shapes): - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if reduce_op[:3] == "cum" else axis + reduce_args = {"axis": axis} + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + if 
npcumprod.__name__ == "cumulative_prod": + reduce_args["include_initial"] = keepdims # include_initial only available in cumulative_ + else: + reduce_args["keepdims"] = keepdims na1 = np.linspace(0, 1, np.prod(shapes[0])).reshape(shapes[0]) na2 = np.linspace(1, 2, np.prod(shapes[1])).reshape(shapes[1]) na3 = np.linspace(2, 3, np.prod(shapes[2])).reshape(shapes[2]) @@ -192,12 +268,19 @@ def test_broadcast_params(axis, keepdims, reduce_op, shapes): a3 = blosc2.asarray(na3) expr1 = a1 + a2 - a3 assert expr1.shape == shapes[0] - expr2 = a1 * a2 + 1 - assert expr2.shape == shapes[0] + expr2 = a2 * a3 + 1 + assert expr2.shape == shapes[1] # print(f"res: {res.shape} expr1: {expr1.shape} expr2: {expr2.shape}") - nres = eval(f"na1 + na2 - na3 - (na1 * na2 + 1).{reduce_op}(axis={axis}, keepdims={keepdims})") + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + res = expr2 - getattr(expr1, reduce_op)(**reduce_args) + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + expr = f"na2 * na3 + 1 - {oploc}(na1 + na2 - na3, axis={axis}" + include_initial = reduce_args.get("include_initial", False) + expr += f", include_initial={keepdims})" if include_initial else ")" + else: + res = expr1 - getattr(expr2, reduce_op)(**reduce_args) + expr = f"na1 + na2 - na3 - (na2 * na3 + 1).{reduce_op}(axis={axis}, keepdims={keepdims})" + nres = eval(expr) tol = 1e-14 if a1.dtype == "float64" else 1e-5 np.testing.assert_allclose(res[:], nres, atol=tol, rtol=tol) @@ -205,7 +288,22 @@ # Test reductions with item parameter @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ],
) @pytest.mark.parametrize("dtype", [np.float32, np.float64]) @pytest.mark.parametrize("stripes", ["rows", "columns"]) @@ -223,7 +321,7 @@ def test_reduce_item(reduce_op, dtype, stripes, stripe_len, shape, chunks): else: _slice = (slice(None), slice(i, i + stripe_len)) slice_ = na[_slice] - if slice_.size == 0 and reduce_op not in ("sum", "prod"): + if slice_.size == 0 and reduce_op not in ("sum", "prod", "cumulative_sum", "cumulative_prod"): # For mean, std, and var, numpy just raises a warning, so don't check if reduce_op in ("min", "max", "argmin", "argmax"): # Check that a ValueError is raised when the slice is empty @@ -238,7 +336,22 @@ def test_reduce_item(reduce_op, dtype, stripes, stripe_len, shape, chunks): @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) def test_reduce_slice(reduce_op): shape = (8, 12, 5) @@ -247,24 +360,30 @@ def test_reduce_slice(reduce_op): tol = 1e-6 if na.dtype == np.float32 else 1e-15 _slice = (slice(1, 2, 1), slice(3, 7, 1)) res = getattr(a, reduce_op)(item=_slice, axis=-1) - nres = getattr(na[_slice], reduce_op)(axis=-1) + if reduce_op == "cumulative_sum": + oploc = "npcumsum" + elif reduce_op == "cumulative_prod": + oploc = "npcumprod" + else: + oploc = f"np.{reduce_op}" + nres = eval(f"{oploc}(na[_slice], axis=-1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # Test reductions with slices and strides _slice = (slice(1, 2, 1), slice(1, 9, 2)) res = getattr(a, reduce_op)(item=_slice, axis=1) - nres = getattr(na[_slice], reduce_op)(axis=1) + nres = eval(f"{oploc}(na[_slice], axis=1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) # Test reductions with ints _slice = (0, slice(1, 9, 1)) res = getattr(a, reduce_op)(item=_slice, axis=1) - nres = 
getattr(na[_slice], reduce_op)(axis=1) + nres = eval(f"{oploc}(na[_slice], axis=1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) _slice = (0, slice(1, 9, 2)) res = getattr(a, reduce_op)(item=_slice, axis=1) - nres = getattr(na[_slice], reduce_op)(axis=1) + nres = eval(f"{oploc}(na[_slice], axis=1)") np.testing.assert_allclose(res, nres, atol=tol, rtol=tol) @@ -272,19 +391,34 @@ def test_reduce_slice(reduce_op): @pytest.mark.parametrize( ("chunks", "blocks"), [ + ((10, 50, 70), (10, 25, 50)), ((20, 50, 100), (10, 50, 100)), - ((10, 25, 70), (10, 25, 50)), ((10, 50, 100), (6, 25, 75)), ((15, 30, 75), (7, 20, 50)), - ((20, 50, 100), (10, 50, 60)), + ((1, 50, 100), (1, 50, 60)), ], ) @pytest.mark.parametrize("disk", [True, False]) -@pytest.mark.parametrize("fill_value", [0, 1, 0.32]) +@pytest.mark.parametrize("fill_value", [1, 0, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) -@pytest.mark.parametrize("axis", [0, 1, None]) +@pytest.mark.parametrize("axis", [None, 0, 1]) def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis): shape = (20, 50, 100) urlpath = "a1.b2nd" if disk else None @@ -295,10 +429,26 @@ def test_fast_path(chunks, blocks, disk, fill_value, reduce_op, axis): if disk: a = blosc2.open(urlpath) na = a[:] - + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 if axis is None else axis + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(na, axis={axis})") + else: + nres = getattr(na, reduce_op)(axis=axis) res = getattr(a, reduce_op)(axis=axis) - nres = getattr(na, reduce_op)(axis=axis) + assert np.allclose(res, nres) + # Try with a slice + b = blosc2.linspace(0, 1, blocks=blocks, chunks=chunks, shape=shape, 
dtype=a.dtype) + nb = b[:] + slice_ = (slice(5, 7),) + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + axis = 0 if axis is None else axis + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}((na + nb)[{slice_}], axis={axis})") + else: + nres = getattr((na + nb)[slice_], reduce_op)(axis=axis) + res = getattr(a + b, reduce_op)(axis=axis, item=slice_) assert np.allclose(res, nres) @@ -340,13 +490,29 @@ def test_miniexpr_slice(chunks, blocks, disk, fill_value, reduce_op): @pytest.mark.parametrize("disk", [True, False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) def test_save_version1(disk, fill_value, reduce_op, axis): shape = (20, 50, 100) - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis shape = (20, 20, 100) urlpath = "a1.b2nd" if disk else None if fill_value != 0: @@ -369,7 +535,11 @@ def test_save_version1(disk, fill_value, reduce_op, axis): lexpr.save("out.b2nd") lexpr = blosc2.open("out.b2nd") res = lexpr.compute() - nres = na + getattr(nb[()], reduce_op)(axis=axis) + 1 + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = na + eval(f"{oploc}(nb, axis={axis})") + 1 + else: + nres = na + getattr(nb, reduce_op)(axis=axis) + 1 assert np.allclose(res[()], nres) if disk: @@ -381,13 +551,29 @@ def test_save_version1(disk, fill_value, reduce_op, axis): 
@pytest.mark.parametrize("disk", [True, False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) def test_save_version2(disk, fill_value, reduce_op, axis): shape = (20, 50, 100) - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis shape = (20, 20, 100) urlpath = "a1.b2nd" if disk else None if fill_value != 0: @@ -409,7 +595,11 @@ def test_save_version2(disk, fill_value, reduce_op, axis): lexpr.save("out.b2nd") lexpr = blosc2.open("out.b2nd") res = lexpr.compute() - nres = getattr(na[()], reduce_op)(axis=axis) + nb + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(na, axis={axis})") + nb + else: + nres = getattr(na, reduce_op)(axis=axis) + nb assert np.allclose(res[()], nres) if disk: @@ -421,13 +611,29 @@ def test_save_version2(disk, fill_value, reduce_op, axis): @pytest.mark.parametrize("disk", [True, False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) def test_save_version3(disk, fill_value, reduce_op, axis): shape = (20, 50, 100) - if 
isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis shape = (20, 20, 100) urlpath = "a1.b2nd" if disk else None if fill_value != 0: @@ -449,7 +655,11 @@ def test_save_version3(disk, fill_value, reduce_op, axis): lexpr.save("out.b2nd") lexpr = blosc2.open("out.b2nd") res = lexpr.compute() - nres = getattr(na[()], reduce_op)(axis=axis) + nb + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(na, axis={axis})") + nb + else: + nres = getattr(na, reduce_op)(axis=axis) + nb assert np.allclose(res[()], nres) if disk: @@ -461,12 +671,29 @@ @pytest.mark.parametrize("disk", [True, False]) @pytest.mark.parametrize("fill_value", [0, 1, 0.32]) @pytest.mark.parametrize( - "reduce_op", ["sum", "prod", "min", "max", "any", "all", "mean", "std", "var", "argmax", "argmin"] + "reduce_op", + [ + "sum", + "prod", + "min", + "max", + "any", + "all", + "mean", + "std", + "var", + "argmax", + "argmin", + "cumulative_sum", + "cumulative_prod", + ], ) @pytest.mark.parametrize("axis", [0, (0, 1), None]) def test_save_version4(disk, fill_value, reduce_op, axis): - if isinstance(axis, tuple) and (reduce_op in ("argmax", "argmin")): - axis = 1 shape = (20, 50, 100) + if reduce_op in ("argmax", "argmin", "cumulative_sum", "cumulative_prod"): + axis = 1 if isinstance(axis, tuple) else axis + axis = 0 if (reduce_op[:3] == "cum" and axis is None) else axis + shape = (20, 20, 100) urlpath = "a1.b2nd" if disk else None if fill_value != 0: @@ -487,7 +714,11 @@ def test_save_version4(disk, fill_value, reduce_op, axis): lexpr.save("out.b2nd") lexpr = blosc2.open("out.b2nd") res = lexpr.compute() - nres = getattr(na[()],
reduce_op)(axis=axis) + if reduce_op in {"cumulative_sum", "cumulative_prod"}: + oploc = "npcumsum" if reduce_op == "cumulative_sum" else "npcumprod" + nres = eval(f"{oploc}(na, axis={axis})") + else: + nres = getattr(na, reduce_op)(axis=axis) assert np.allclose(res[()], nres) if disk: @@ -643,6 +874,6 @@ def test_reduce_string(): c = a**2 + b**2 + 2 * a * b + 1 # Evaluate: output is a NDArray d = blosc2.lazyexpr("sl + c.sum() + a.std()", operands={"a": a, "c": c, "sl": a.slice((1, 1))}) - sum = d.compute()[()] + sum = d[()] npsum = npa[1, 1] + np.sum(npc) + np.std(npa) - np.testing.assert_allclose(sum, npsum) + np.testing.assert_allclose(sum, npsum, rtol=1e-6, atol=1e-6)