diff --git a/xrspatial/accessor.py b/xrspatial/accessor.py index bd3bb35d..c5b80670 100644 --- a/xrspatial/accessor.py +++ b/xrspatial/accessor.py @@ -1027,7 +1027,7 @@ def open_geotiff(self, source, **kwargs): y_min, y_max = float(y.min()), float(y.max()) x_min, x_max = float(x.min()), float(x.max()) - geo_info, file_h, file_w = _read_geo_info(source) + geo_info, file_h, file_w, _dtype, _nbands = _read_geo_info(source) t = geo_info.transform # Expand extent by half a pixel so we capture edge pixels diff --git a/xrspatial/cost_distance.py b/xrspatial/cost_distance.py index dc250bc8..29a088d7 100644 --- a/xrspatial/cost_distance.py +++ b/xrspatial/cost_distance.py @@ -340,7 +340,7 @@ def _cost_distance_cupy(source_data, friction_data, cellsize_x, cellsize_y, changed = cp.zeros(1, dtype=cp.int32) griddim, blockdim = cuda_args((height, width)) - max_iterations = height + width + max_iterations = height * width for _ in range(max_iterations): changed[0] = 0 _cost_distance_relax_kernel[griddim, blockdim]( diff --git a/xrspatial/geotiff/__init__.py b/xrspatial/geotiff/__init__.py index 1a300afc..e93f525c 100644 --- a/xrspatial/geotiff/__init__.py +++ b/xrspatial/geotiff/__init__.py @@ -114,11 +114,19 @@ def _coords_to_transform(da: xr.DataArray) -> GeoTransform | None: ) -def _read_geo_info(source: str): +def _read_geo_info(source: str, *, overview_level: int | None = None): """Read only the geographic metadata and image dimensions from a GeoTIFF. - Returns (geo_info, height, width) without reading pixel data. + Returns (geo_info, height, width, dtype, n_bands) without reading pixel + data. Uses mmap for header-only access -- O(1) memory regardless of file + size. + + Parameters + ---------- + overview_level : int or None + Overview IFD index (0 = full resolution). """ + from ._dtypes import tiff_dtype_to_numpy from ._geotags import extract_geo_info from ._header import parse_all_ifds, parse_header @@ -128,9 +136,17 @@ def _read_geo_info(source: str): try: header = parse_header(data) ifds = parse_all_ifds(data, header) - ifd = ifds[0] + ifd_idx = 0 + if overview_level is not None: + ifd_idx = min(overview_level, len(ifds) - 1) + ifd = ifds[ifd_idx] geo_info = extract_geo_info(ifd, data, header.byte_order) - return geo_info, ifd.height, ifd.width + bps = ifd.bits_per_sample + if isinstance(bps, tuple): + bps = bps[0] + file_dtype = tiff_dtype_to_numpy(bps, ifd.sample_format) + n_bands = ifd.samples_per_pixel if ifd.samples_per_pixel > 1 else 0 + return geo_info, ifd.height, ifd.width, file_dtype, n_bands finally: data.close() @@ -873,11 +889,9 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512, if source.lower().endswith('.vrt'): return read_vrt(source, dtype=dtype, name=name, chunks=chunks) - # First, do a metadata-only read to get shape, dtype, coords, attrs - arr, geo_info = read_to_array(source, overview_level=overview_level) - full_h, full_w = arr.shape[:2] - n_bands = arr.shape[2] if arr.ndim == 3 else 0 - file_dtype = arr.dtype + # Metadata-only read: O(1) memory via mmap, no pixel decompression + geo_info, full_h, full_w, file_dtype, n_bands = _read_geo_info( + source, overview_level=overview_level) nodata = geo_info.nodata # Nodata masking promotes integer arrays to float64 (for NaN). diff --git a/xrspatial/kde.py b/xrspatial/kde.py index 1f8a032b..33e09029 100644 --- a/xrspatial/kde.py +++ b/xrspatial/kde.py @@ -324,15 +324,39 @@ def _run_kde_cupy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id): return out +def _filter_points_to_tile(xs, ys, ws, tile_x0, tile_y0, dx, dy, + tile_rows, tile_cols, cutoff): + """Return (xs, ys, ws) subset that could affect this tile. + + Points whose cutoff circle doesn't overlap the tile extent are + excluded, reducing serialization and speeding up the kernel. + """ + tile_x1 = tile_x0 + tile_cols * dx + tile_y1 = tile_y0 + tile_rows * dy + mask = ((xs >= tile_x0 - cutoff) & (xs <= tile_x1 + cutoff) & + (ys >= tile_y0 - cutoff) & (ys <= tile_y1 + cutoff)) + if mask.all(): + return xs, ys, ws + return xs[mask], ys[mask], ws[mask] + + def _run_kde_dask_numpy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id, chunks): - """Dask-backed KDE: each chunk computes its own tile independently.""" + """Dask-backed KDE: each chunk computes its own tile independently. + + Points are pre-filtered per tile so each delayed task receives only + the relevant subset, reducing serialization from O(n_tiles * n_points) + to O(n_tiles * points_per_tile). + """ # Determine chunk layout if chunks is None: chunks = (min(256, shape[0]), min(256, shape[1])) row_splits = _split_sizes(shape[0], chunks[0]) col_splits = _split_sizes(shape[1], chunks[1]) + # Cutoff radius matching the kernel implementation + cutoff = 4.0 * bw if kernel_id == 0 else bw + blocks = [] row_off = 0 for rs in row_splits: @@ -342,8 +366,11 @@ def _run_kde_dask_numpy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id, tile_y0 = y0 + row_off * dy tile_x0 = x0 + col_off * dx tile_shape = (rs, cs) + # Pre-filter points to this tile's extent + cutoff + txs, tys, tws = _filter_points_to_tile( + xs, ys, ws, tile_x0, tile_y0, dx, dy, rs, cs, cutoff) block = dask.delayed(_run_kde_numpy)( - xs, ys, ws, tile_shape, + txs, tys, tws, tile_shape, tile_x0, tile_y0, dx, dy, bw, kernel_id, ) row_blocks.append( @@ -358,12 +385,18 @@ def _run_kde_dask_numpy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id, def _run_kde_dask_cupy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id, chunks): - """Dask+CuPy KDE: each chunk uses the GPU kernel.""" + """Dask+CuPy KDE: each chunk uses the GPU kernel. + + Points are pre-filtered per tile (same as the numpy dask path) + so each delayed task serializes only the relevant subset. + """ if chunks is None: chunks = (min(256, shape[0]), min(256, shape[1])) row_splits = _split_sizes(shape[0], chunks[0]) col_splits = _split_sizes(shape[1], chunks[1]) + cutoff = 4.0 * bw if kernel_id == 0 else bw + blocks = [] row_off = 0 for rs in row_splits: @@ -373,8 +406,10 @@ def _run_kde_dask_cupy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id, tile_y0 = y0 + row_off * dy tile_x0 = x0 + col_off * dx tile_shape = (rs, cs) + txs, tys, tws = _filter_points_to_tile( + xs, ys, ws, tile_x0, tile_y0, dx, dy, rs, cs, cutoff) block = dask.delayed(_run_kde_cupy)( - xs, ys, ws, tile_shape, + txs, tys, tws, tile_shape, tile_x0, tile_y0, dx, dy, bw, kernel_id, ) row_blocks.append( diff --git a/xrspatial/pathfinding.py b/xrspatial/pathfinding.py index 3f274a56..76beb12e 100644 --- a/xrspatial/pathfinding.py +++ b/xrspatial/pathfinding.py @@ -1075,7 +1075,9 @@ def a_star_search(surface: xr.DataArray, else: _check_memory(h, w) if friction_np is None: - friction_np = np.ones((h, w), dtype=np.float64) + # Dummy 1x1 array: kernel never reads friction when + # use_friction is False, so avoid allocating h*w float64. + friction_np = np.ones((1, 1), dtype=np.float64) path_img = np.full(surface.shape, np.nan, dtype=np.float64) if start_py != NONE: _a_star_search(surface_np, path_img, start_py, start_px, @@ -1132,7 +1134,9 @@ def a_star_search(surface: xr.DataArray, # Full-grid A* _check_memory(h, w) if friction_data is None: - friction_data = np.ones((h, w), dtype=np.float64) + # Dummy 1x1 array: kernel never reads friction when + # use_friction is False, so avoid allocating h*w float64. + friction_data = np.ones((1, 1), dtype=np.float64) path_img = np.full(surface.shape, np.nan, dtype=np.float64) if start_py != NONE: _a_star_search(surface_data, path_img, start_py, start_px, diff --git a/xrspatial/polygon_clip.py b/xrspatial/polygon_clip.py index f8e2fe65..59fe028b 100644 --- a/xrspatial/polygon_clip.py +++ b/xrspatial/polygon_clip.py @@ -201,46 +201,55 @@ def clip_polygon( mask = rasterize(geom_pairs, **kw) - # Get mask as a plain numpy array regardless of what rasterize returned - mask_np = mask.data - if has_dask_array() and isinstance(mask_np, da.Array): - mask_np = mask_np.compute() - if has_cuda_and_cupy() and is_cupy_array(mask_np): - mask_np = mask_np.get() - - # Apply the mask. For CuPy backends we operate on the raw array - # to avoid an xarray/cupy incompatibility in ``DataArray.where()``. - cond_np = mask_np == 1 - - if has_cuda_and_cupy() and is_cupy_array(raster.data): + # Apply the mask. Keep it lazy for dask backends to avoid + # materializing the full mask into RAM (which would OOM for 30TB + # inputs). For non-dask backends, compute the mask eagerly. + mask_data = mask.data + + if has_dask_array() and isinstance(raster.data, da.Array): + # Dask path: keep mask lazy -- no .compute() + if isinstance(mask_data, da.Array): + cond = mask_data == 1 + else: + # Mask came back non-dask despite dask input (shouldn't happen, + # but handle gracefully) + cond = da.from_array( + np.asarray(mask_data == 1) if not is_cupy_array(mask_data) + else mask_data.get() == 1, + chunks=raster.data.chunks[-2:], + ) + + if has_cuda_and_cupy() and is_dask_cupy(raster): + # dask+cupy: use map_blocks with both raster and condition + def _apply_mask(raster_block, cond_block): + import cupy + out = raster_block.copy() + out[~cond_block.astype(bool)] = nodata + return out + + out = da.map_blocks( + _apply_mask, raster.data, cond, + dtype=raster.dtype, + ) + result = xr.DataArray(out, dims=raster.dims, coords=raster.coords) + else: + # dask+numpy: xarray.where handles lazy condition natively + result = raster.where(cond, other=nodata) + elif has_cuda_and_cupy() and is_cupy_array(raster.data): + # Pure CuPy: operate on raw arrays to avoid xarray/cupy + # incompatibility in DataArray.where(). import cupy - cond_cp = cupy.asarray(cond_np) + if is_cupy_array(mask_data): + cond_cp = mask_data == 1 + else: + cond_cp = cupy.asarray(np.asarray(mask_data) == 1) out = raster.data.copy() out[~cond_cp] = nodata result = xr.DataArray(out, dims=raster.dims, coords=raster.coords) - elif has_cuda_and_cupy() and is_dask_cupy(raster): - import cupy - cond_cp = cupy.asarray(cond_np) - - def _apply_mask(block, block_info=None): - if block_info is not None: - loc = block_info[0]['array-location'] - y_sl = slice(loc[-2][0], loc[-2][1]) - x_sl = slice(loc[-1][0], loc[-1][1]) - c = cond_cp[y_sl, x_sl] - else: - c = cond_cp - out = block.copy() - out[~c] = nodata - return out - - out = raster.data.map_blocks(_apply_mask, dtype=raster.dtype) - result = xr.DataArray(out, dims=raster.dims, coords=raster.coords) - elif has_dask_array() and isinstance(raster.data, da.Array): - cond_da = da.from_array(cond_np, chunks=raster.data.chunks[-2:]) - result = raster.where(cond_da, other=nodata) else: - result = raster.where(cond_np, other=nodata) + # Pure numpy + cond = np.asarray(mask_data) == 1 + result = raster.where(cond, other=nodata) result.attrs = raster.attrs result.name = name if name is not None else raster.name diff --git a/xrspatial/polygonize.py b/xrspatial/polygonize.py index 35d6ac0d..38030e12 100644 --- a/xrspatial/polygonize.py +++ b/xrspatial/polygonize.py @@ -1449,6 +1449,45 @@ def sort_key(item): return column, polygon_points +def _merge_from_separated(all_interior, boundary_by_value, transform): + """Merge pre-separated interior/boundary polygons into final output. + + Like _merge_chunk_polygons but takes already-separated data so the + caller can accumulate incrementally (one chunk at a time) instead of + holding all chunk_results in memory simultaneously. + """ + # Merge boundary polygons per value using edge cancellation. + merged = [] + for val, polys_list in boundary_by_value.items(): + if len(polys_list) == 1: + merged.append((val, polys_list[0])) + else: + merged_polys = _merge_polygon_rings(polys_list) + for rings in merged_polys: + merged.append((val, rings)) + + all_polys = all_interior + merged + + def sort_key(item): + ext = item[1][0] + min_y = np.min(ext[:, 1]) + min_x = np.min(ext[ext[:, 1] == min_y, 0]) + return (min_y, min_x, item[0]) + + all_polys.sort(key=sort_key) + + column = [] + polygon_points = [] + for val, rings in all_polys: + if transform is not None: + for ring in rings: + _transform_points(ring, transform) + column.append(val) + polygon_points.append(rings) + + return column, polygon_points + + def _polygonize_dask(dask_data, mask_data, connectivity_8, transform): """Dask backend for polygonize: per-chunk polygonize + edge merge.""" # Ensure mask chunks match raster chunks. @@ -1464,21 +1503,30 @@ def _polygonize_dask(dask_data, mask_data, connectivity_8, transform): ny_total = int(sum(row_chunks)) nx_total = int(sum(col_chunks)) - delayed_results = [] + # Process chunks incrementally: compute one at a time so only boundary + # polygons accumulate in memory. Interior polygons (fully inside a + # chunk, no merging needed) go straight to the output list. This keeps + # peak memory proportional to boundary_polygon_count rather than + # total_polygon_count * n_chunks. + all_interior = [] + boundary_by_value = {} + for iy in range(len(row_chunks)): for ix in range(len(col_chunks)): block = dask_data.blocks[iy, ix] mask_block = (mask_data.blocks[iy, ix] if mask_data is not None else None) - delayed_results.append( + interior, boundary = dask.compute( dask.delayed(_polygonize_chunk)( block, mask_block, connectivity_8, int(row_offsets[iy]), int(col_offsets[ix]), ny_total, nx_total, - )) + ))[0] + all_interior.extend(interior) + for val, rings in boundary: + boundary_by_value.setdefault(val, []).append(rings) - chunk_results = dask.compute(*delayed_results) - return _merge_chunk_polygons(chunk_results, transform) + return _merge_from_separated(all_interior, boundary_by_value, transform) def polygonize( diff --git a/xrspatial/reproject/__init__.py b/xrspatial/reproject/__init__.py index a454a349..f87f6c60 100644 --- a/xrspatial/reproject/__init__.py +++ b/xrspatial/reproject/__init__.py @@ -965,20 +965,18 @@ def _reproject_dask_cupy( resampling, nodata, precision, chunk_size, ): - """Dask+CuPy backend: process output chunks on GPU sequentially. - - Instead of dask.delayed per chunk (which has ~15ms overhead each from - pyproj init + small CUDA launches), we: - 1. Create CRS/transformer objects once - 2. Use GPU-sized output chunks (2048x2048 by default) - 3. For each output chunk, compute CUDA coordinates and fetch only - the source window needed from the dask array - 4. Assemble the result as a CuPy array - - For sources that fit in GPU memory, this is ~22x faster than the - dask.delayed path. For sources that don't fit, each chunk fetches - only its required window, so GPU memory usage scales with chunk size, - not source size. + """Dask+CuPy backend: process output chunks on GPU. + + Two modes based on available GPU memory: + + **Fast path** (output fits in GPU VRAM): pre-allocates the full output + on GPU and fills it chunk-by-chunk. ~22x faster than the map_blocks + path because CRS/transformer objects are created once and CUDA kernels + run with minimal launch overhead. + + **Chunked fallback** (output exceeds GPU VRAM): delegates to + ``_reproject_dask(is_cupy=True)`` which uses ``map_blocks`` and + processes one chunk at a time with O(chunk_size) GPU memory. """ import cupy as cp @@ -999,18 +997,29 @@ def _reproject_dask_cupy( src_res_x = (src_right - src_left) / src_w src_res_y = (src_top - src_bottom) / src_h - # Memory guard: the full output is allocated on GPU. + # Memory check: if the full output doesn't fit in GPU memory, + # fall back to the map_blocks path which is O(chunk_size) memory. estimated = out_shape[0] * out_shape[1] * 8 # float64 try: free_gpu, _total = cp.cuda.Device().mem_info - if estimated > 0.8 * free_gpu: - raise MemoryError( - f"_reproject_dask_cupy needs ~{estimated / 1e9:.1f} GB on GPU " - f"for the full output but only ~{free_gpu / 1e9:.1f} GB free. " - f"Reduce output resolution or use the dask+numpy path." - ) + fits_in_gpu = estimated < 0.5 * free_gpu except (AttributeError, RuntimeError): - pass # no device info available + fits_in_gpu = False + + if not fits_in_gpu: + import warnings + warnings.warn( + f"Output ({estimated / 1e9:.1f} GB) exceeds GPU memory; " + f"falling back to chunked map_blocks path.", + stacklevel=3, + ) + return _reproject_dask( + raster, src_bounds, src_shape, y_desc, + src_wkt, tgt_wkt, + out_bounds, out_shape, + resampling, nodata, precision, + chunk_size or 2048, True, # is_cupy=True + ) result = cp.full(out_shape, nodata, dtype=cp.float64) diff --git a/xrspatial/sieve.py b/xrspatial/sieve.py index 1ed66d91..ff35a226 100644 --- a/xrspatial/sieve.py +++ b/xrspatial/sieve.py @@ -131,10 +131,32 @@ def _label_connected(data, valid, neighborhood): ): _uf_union(parent, rank, idx, (r - 1) * cols + (c + 1)) + # --- Count unique regions first so region_val_buf is right-sized --- + # Reuse rank array (no longer needed after union-find) as root_to_id. + # This eliminates a separate n-element int32 allocation. + root_to_id = rank # alias; rank is dead after union-find + for i in range(n): + root_to_id[i] = 0 # clear + + n_regions = 0 + for i in range(n): + r = i // cols + c = i % cols + if not valid[r, c]: + continue + root = _uf_find(parent, i) + if root_to_id[root] == 0: + root_to_id[root] = 1 # mark as seen + n_regions += 1 + + # Allocate region_val_buf at actual region count, not pixel count. + # For a 46K x 46K raster with 100K regions this saves ~16 GB. + region_val_buf = np.full(n_regions + 1, np.nan, dtype=np.float64) + # Assign contiguous region IDs region_map_flat = np.zeros(n, dtype=np.int32) - root_to_id = np.zeros(n, dtype=np.int32) - region_val_buf = np.full(n + 1, np.nan, dtype=np.float64) + for i in range(n): + root_to_id[i] = 0 # clear for ID assignment next_id = 1 for i in range(n): @@ -319,14 +341,17 @@ def _available_memory_bytes(): def _sieve_dask(data, threshold, neighborhood, skip_values): """Dask+numpy backend: compute to numpy, sieve, wrap back.""" avail = _available_memory_bytes() - estimated_bytes = np.prod(data.shape) * data.dtype.itemsize - if estimated_bytes * 5 > 0.5 * avail: + n_pixels = np.prod(data.shape) + # Peak memory: input + result (float64 each) + parent + rank + + # region_map_flat (int32 each) = 2*8 + 3*4 = 28 bytes/pixel. + estimated_bytes = n_pixels * 28 + if estimated_bytes > 0.5 * avail: raise MemoryError( - f"sieve() needs the full array in memory " - f"(~{estimated_bytes * 5 / 1e9:.1f} GB) but only " - f"~{avail / 1e9:.1f} GB is available. Connected-component " - f"labeling is a global operation that cannot be chunked. " - f"Consider downsampling or tiling the input manually." + f"sieve() needs ~{estimated_bytes / 1e9:.1f} GB for the full " + f"array plus CCL bookkeeping, but only ~{avail / 1e9:.1f} GB " + f"is available. Connected-component labeling is a global " + f"operation that cannot be chunked. Consider downsampling " + f"or tiling the input manually." ) np_data = data.compute() @@ -338,18 +363,19 @@ def _sieve_dask(data, threshold, neighborhood, skip_values): def _sieve_dask_cupy(data, threshold, neighborhood, skip_values): """Dask+CuPy backend: compute to cupy, sieve via CPU fallback, wrap back.""" - estimated_bytes = np.prod(data.shape) * data.dtype.itemsize + n_pixels = np.prod(data.shape) + estimated_bytes = n_pixels * 28 try: import cupy as cp free_gpu, _total = cp.cuda.Device().mem_info - if estimated_bytes * 5 > 0.5 * free_gpu: + if estimated_bytes > 0.5 * free_gpu: raise MemoryError( - f"sieve() needs the full array on GPU " - f"(~{estimated_bytes * 5 / 1e9:.1f} GB) but only " - f"~{free_gpu / 1e9:.1f} GB free. Connected-component " - f"labeling is a global operation that cannot be chunked. " - f"Consider downsampling or tiling the input manually." + f"sieve() needs ~{estimated_bytes / 1e9:.1f} GB for the " + f"full array plus CCL bookkeeping, but only " + f"~{free_gpu / 1e9:.1f} GB free GPU memory. Connected-" + f"component labeling is a global operation that cannot be " + f"chunked. Consider downsampling or tiling the input." ) except (ImportError, AttributeError): pass diff --git a/xrspatial/tests/test_cost_distance.py b/xrspatial/tests/test_cost_distance.py index 2599fc35..6c4c1e37 100644 --- a/xrspatial/tests/test_cost_distance.py +++ b/xrspatial/tests/test_cost_distance.py @@ -415,6 +415,47 @@ def test_source_on_impassable_cell(backend): assert np.all(np.isnan(out)) +# ----------------------------------------------------------------------- +# Snake-maze: long shortest paths that need many Bellman-Ford iterations +# ----------------------------------------------------------------------- + +@pytest.mark.parametrize("backend", ['numpy', 'cupy', 'dask+numpy', 'dask+cupy']) +def test_snake_maze_long_path(backend): + """Maze where shortest path has more edges than height + width. + + Regression test for #1191: the CuPy Bellman-Ford loop used + max_iterations = height + width, which is too few for snaking paths. + """ + h, w = 5, 5 + source = np.zeros((h, w)) + source[0, 0] = 1.0 + + # Snake: row 0 right, down at col 4, row 2 left, down at col 0, row 4 right + friction = np.full((h, w), np.nan) + friction[0, :] = 1.0 + friction[1, 4] = 1.0 + friction[2, :] = 1.0 + friction[3, 0] = 1.0 + friction[4, :] = 1.0 + + raster = _make_raster(source, backend=backend, chunks=(5, 5)) + friction_r = _make_raster(friction, backend=backend, chunks=(5, 5)) + + result = cost_distance(raster, friction_r, connectivity=4) + out = _compute(result) + + # Path to (4,4) has 16 edges, each costing 1.0 + expected = np.array([ + [ 0., 1., 2., 3., 4.], + [np.nan, np.nan, np.nan, np.nan, 5.], + [10., 9., 8., 7., 6.], + [11., np.nan, np.nan, np.nan, np.nan], + [12., 13., 14., 15., 16.], + ], dtype=np.float32) + + np.testing.assert_allclose(out, expected, equal_nan=True, atol=1e-5) + + # ----------------------------------------------------------------------- # CuPy GPU tests # -----------------------------------------------------------------------