Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion xrspatial/accessor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,7 +1027,7 @@ def open_geotiff(self, source, **kwargs):
y_min, y_max = float(y.min()), float(y.max())
x_min, x_max = float(x.min()), float(x.max())

geo_info, file_h, file_w = _read_geo_info(source)
geo_info, file_h, file_w, _dtype, _nbands = _read_geo_info(source)
t = geo_info.transform

# Expand extent by half a pixel so we capture edge pixels
Expand Down
2 changes: 1 addition & 1 deletion xrspatial/cost_distance.py
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,7 @@ def _cost_distance_cupy(source_data, friction_data, cellsize_x, cellsize_y,
changed = cp.zeros(1, dtype=cp.int32)
griddim, blockdim = cuda_args((height, width))

max_iterations = height + width
max_iterations = height * width
for _ in range(max_iterations):
changed[0] = 0
_cost_distance_relax_kernel[griddim, blockdim](
Expand Down
32 changes: 23 additions & 9 deletions xrspatial/geotiff/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,19 @@ def _coords_to_transform(da: xr.DataArray) -> GeoTransform | None:
)


def _read_geo_info(source: str):
def _read_geo_info(source: str, *, overview_level: int | None = None):
"""Read only the geographic metadata and image dimensions from a GeoTIFF.

Returns (geo_info, height, width) without reading pixel data.
Returns (geo_info, height, width, dtype, n_bands) without reading pixel
data. Uses mmap for header-only access -- O(1) memory regardless of file
size.

Parameters
----------
overview_level : int or None
Overview IFD index (0 = full resolution).
"""
from ._dtypes import tiff_dtype_to_numpy
from ._geotags import extract_geo_info
from ._header import parse_all_ifds, parse_header

Expand All @@ -128,9 +136,17 @@ def _read_geo_info(source: str):
try:
header = parse_header(data)
ifds = parse_all_ifds(data, header)
ifd = ifds[0]
ifd_idx = 0
if overview_level is not None:
ifd_idx = min(overview_level, len(ifds) - 1)
ifd = ifds[ifd_idx]
geo_info = extract_geo_info(ifd, data, header.byte_order)
return geo_info, ifd.height, ifd.width
bps = ifd.bits_per_sample
if isinstance(bps, tuple):
bps = bps[0]
file_dtype = tiff_dtype_to_numpy(bps, ifd.sample_format)
n_bands = ifd.samples_per_pixel if ifd.samples_per_pixel > 1 else 0
return geo_info, ifd.height, ifd.width, file_dtype, n_bands
finally:
data.close()

Expand Down Expand Up @@ -873,11 +889,9 @@ def read_geotiff_dask(source: str, *, dtype=None, chunks: int | tuple = 512,
if source.lower().endswith('.vrt'):
return read_vrt(source, dtype=dtype, name=name, chunks=chunks)

# First, do a metadata-only read to get shape, dtype, coords, attrs
arr, geo_info = read_to_array(source, overview_level=overview_level)
full_h, full_w = arr.shape[:2]
n_bands = arr.shape[2] if arr.ndim == 3 else 0
file_dtype = arr.dtype
# Metadata-only read: O(1) memory via mmap, no pixel decompression
geo_info, full_h, full_w, file_dtype, n_bands = _read_geo_info(
source, overview_level=overview_level)
nodata = geo_info.nodata

# Nodata masking promotes integer arrays to float64 (for NaN).
Expand Down
43 changes: 39 additions & 4 deletions xrspatial/kde.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,15 +324,39 @@ def _run_kde_cupy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id):
return out


def _filter_points_to_tile(xs, ys, ws, tile_x0, tile_y0, dx, dy,
tile_rows, tile_cols, cutoff):
"""Return (xs, ys, ws) subset that could affect this tile.

Points whose cutoff circle doesn't overlap the tile extent are
excluded, reducing serialization and speeding up the kernel.
"""
tile_x1 = tile_x0 + tile_cols * dx
tile_y1 = tile_y0 + tile_rows * dy
mask = ((xs >= tile_x0 - cutoff) & (xs <= tile_x1 + cutoff) &
(ys >= tile_y0 - cutoff) & (ys <= tile_y1 + cutoff))
if mask.all():
return xs, ys, ws
return xs[mask], ys[mask], ws[mask]


def _run_kde_dask_numpy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id,
chunks):
"""Dask-backed KDE: each chunk computes its own tile independently."""
"""Dask-backed KDE: each chunk computes its own tile independently.

Points are pre-filtered per tile so each delayed task receives only
the relevant subset, reducing serialization from O(n_tiles * n_points)
to O(n_tiles * points_per_tile).
"""
# Determine chunk layout
if chunks is None:
chunks = (min(256, shape[0]), min(256, shape[1]))
row_splits = _split_sizes(shape[0], chunks[0])
col_splits = _split_sizes(shape[1], chunks[1])

# Cutoff radius matching the kernel implementation
cutoff = 4.0 * bw if kernel_id == 0 else bw

blocks = []
row_off = 0
for rs in row_splits:
Expand All @@ -342,8 +366,11 @@ def _run_kde_dask_numpy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id,
tile_y0 = y0 + row_off * dy
tile_x0 = x0 + col_off * dx
tile_shape = (rs, cs)
# Pre-filter points to this tile's extent + cutoff
txs, tys, tws = _filter_points_to_tile(
xs, ys, ws, tile_x0, tile_y0, dx, dy, rs, cs, cutoff)
block = dask.delayed(_run_kde_numpy)(
xs, ys, ws, tile_shape,
txs, tys, tws, tile_shape,
tile_x0, tile_y0, dx, dy, bw, kernel_id,
)
row_blocks.append(
Expand All @@ -358,12 +385,18 @@ def _run_kde_dask_numpy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id,

def _run_kde_dask_cupy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id,
chunks):
"""Dask+CuPy KDE: each chunk uses the GPU kernel."""
"""Dask+CuPy KDE: each chunk uses the GPU kernel.

Points are pre-filtered per tile (same as the numpy dask path)
so each delayed task serializes only the relevant subset.
"""
if chunks is None:
chunks = (min(256, shape[0]), min(256, shape[1]))
row_splits = _split_sizes(shape[0], chunks[0])
col_splits = _split_sizes(shape[1], chunks[1])

cutoff = 4.0 * bw if kernel_id == 0 else bw

blocks = []
row_off = 0
for rs in row_splits:
Expand All @@ -373,8 +406,10 @@ def _run_kde_dask_cupy(xs, ys, ws, shape, x0, y0, dx, dy, bw, kernel_id,
tile_y0 = y0 + row_off * dy
tile_x0 = x0 + col_off * dx
tile_shape = (rs, cs)
txs, tys, tws = _filter_points_to_tile(
xs, ys, ws, tile_x0, tile_y0, dx, dy, rs, cs, cutoff)
block = dask.delayed(_run_kde_cupy)(
xs, ys, ws, tile_shape,
txs, tys, tws, tile_shape,
tile_x0, tile_y0, dx, dy, bw, kernel_id,
)
row_blocks.append(
Expand Down
8 changes: 6 additions & 2 deletions xrspatial/pathfinding.py
Original file line number Diff line number Diff line change
Expand Up @@ -1075,7 +1075,9 @@ def a_star_search(surface: xr.DataArray,
else:
_check_memory(h, w)
if friction_np is None:
friction_np = np.ones((h, w), dtype=np.float64)
# Dummy 1x1 array: kernel never reads friction when
# use_friction is False, so avoid allocating h*w float64.
friction_np = np.ones((1, 1), dtype=np.float64)
path_img = np.full(surface.shape, np.nan, dtype=np.float64)
if start_py != NONE:
_a_star_search(surface_np, path_img, start_py, start_px,
Expand Down Expand Up @@ -1132,7 +1134,9 @@ def a_star_search(surface: xr.DataArray,
# Full-grid A*
_check_memory(h, w)
if friction_data is None:
friction_data = np.ones((h, w), dtype=np.float64)
# Dummy 1x1 array: kernel never reads friction when
# use_friction is False, so avoid allocating h*w float64.
friction_data = np.ones((1, 1), dtype=np.float64)
path_img = np.full(surface.shape, np.nan, dtype=np.float64)
if start_py != NONE:
_a_star_search(surface_data, path_img, start_py, start_px,
Expand Down
79 changes: 44 additions & 35 deletions xrspatial/polygon_clip.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,46 +201,55 @@ def clip_polygon(

mask = rasterize(geom_pairs, **kw)

# Get mask as a plain numpy array regardless of what rasterize returned
mask_np = mask.data
if has_dask_array() and isinstance(mask_np, da.Array):
mask_np = mask_np.compute()
if has_cuda_and_cupy() and is_cupy_array(mask_np):
mask_np = mask_np.get()

# Apply the mask. For CuPy backends we operate on the raw array
# to avoid an xarray/cupy incompatibility in ``DataArray.where()``.
cond_np = mask_np == 1

if has_cuda_and_cupy() and is_cupy_array(raster.data):
# Apply the mask. Keep it lazy for dask backends to avoid
# materializing the full mask into RAM (which would OOM for 30TB
# inputs). For non-dask backends, compute the mask eagerly.
mask_data = mask.data

if has_dask_array() and isinstance(raster.data, da.Array):
# Dask path: keep mask lazy -- no .compute()
if isinstance(mask_data, da.Array):
cond = mask_data == 1
else:
# Mask came back non-dask despite dask input (shouldn't happen,
# but handle gracefully)
cond = da.from_array(
np.asarray(mask_data == 1) if not is_cupy_array(mask_data)
else mask_data.get() == 1,
chunks=raster.data.chunks[-2:],
)

if has_cuda_and_cupy() and is_dask_cupy(raster):
# dask+cupy: use map_blocks with both raster and condition
def _apply_mask(raster_block, cond_block):
import cupy
out = raster_block.copy()
out[~cond_block.astype(bool)] = nodata
return out

out = da.map_blocks(
_apply_mask, raster.data, cond,
dtype=raster.dtype,
)
result = xr.DataArray(out, dims=raster.dims, coords=raster.coords)
else:
# dask+numpy: xarray.where handles lazy condition natively
result = raster.where(cond, other=nodata)
elif has_cuda_and_cupy() and is_cupy_array(raster.data):
# Pure CuPy: operate on raw arrays to avoid xarray/cupy
# incompatibility in DataArray.where().
import cupy
cond_cp = cupy.asarray(cond_np)
if is_cupy_array(mask_data):
cond_cp = mask_data == 1
else:
cond_cp = cupy.asarray(np.asarray(mask_data) == 1)
out = raster.data.copy()
out[~cond_cp] = nodata
result = xr.DataArray(out, dims=raster.dims, coords=raster.coords)
elif has_cuda_and_cupy() and is_dask_cupy(raster):
import cupy
cond_cp = cupy.asarray(cond_np)

def _apply_mask(block, block_info=None):
if block_info is not None:
loc = block_info[0]['array-location']
y_sl = slice(loc[-2][0], loc[-2][1])
x_sl = slice(loc[-1][0], loc[-1][1])
c = cond_cp[y_sl, x_sl]
else:
c = cond_cp
out = block.copy()
out[~c] = nodata
return out

out = raster.data.map_blocks(_apply_mask, dtype=raster.dtype)
result = xr.DataArray(out, dims=raster.dims, coords=raster.coords)
elif has_dask_array() and isinstance(raster.data, da.Array):
cond_da = da.from_array(cond_np, chunks=raster.data.chunks[-2:])
result = raster.where(cond_da, other=nodata)
else:
result = raster.where(cond_np, other=nodata)
# Pure numpy
cond = np.asarray(mask_data) == 1
result = raster.where(cond, other=nodata)

result.attrs = raster.attrs
result.name = name if name is not None else raster.name
Expand Down
58 changes: 53 additions & 5 deletions xrspatial/polygonize.py
Original file line number Diff line number Diff line change
Expand Up @@ -1449,6 +1449,45 @@ def sort_key(item):
return column, polygon_points


def _merge_from_separated(all_interior, boundary_by_value, transform):
"""Merge pre-separated interior/boundary polygons into final output.

Like _merge_chunk_polygons but takes already-separated data so the
caller can accumulate incrementally (one chunk at a time) instead of
holding all chunk_results in memory simultaneously.
"""
# Merge boundary polygons per value using edge cancellation.
merged = []
for val, polys_list in boundary_by_value.items():
if len(polys_list) == 1:
merged.append((val, polys_list[0]))
else:
merged_polys = _merge_polygon_rings(polys_list)
for rings in merged_polys:
merged.append((val, rings))

all_polys = all_interior + merged

def sort_key(item):
ext = item[1][0]
min_y = np.min(ext[:, 1])
min_x = np.min(ext[ext[:, 1] == min_y, 0])
return (min_y, min_x, item[0])

all_polys.sort(key=sort_key)

column = []
polygon_points = []
for val, rings in all_polys:
if transform is not None:
for ring in rings:
_transform_points(ring, transform)
column.append(val)
polygon_points.append(rings)

return column, polygon_points


def _polygonize_dask(dask_data, mask_data, connectivity_8, transform):
"""Dask backend for polygonize: per-chunk polygonize + edge merge."""
# Ensure mask chunks match raster chunks.
Expand All @@ -1464,21 +1503,30 @@ def _polygonize_dask(dask_data, mask_data, connectivity_8, transform):
ny_total = int(sum(row_chunks))
nx_total = int(sum(col_chunks))

delayed_results = []
# Process chunks incrementally: compute one at a time so only boundary
# polygons accumulate in memory. Interior polygons (fully inside a
# chunk, no merging needed) go straight to the output list. This keeps
# peak memory proportional to boundary_polygon_count rather than
# total_polygon_count * n_chunks.
all_interior = []
boundary_by_value = {}

for iy in range(len(row_chunks)):
for ix in range(len(col_chunks)):
block = dask_data.blocks[iy, ix]
mask_block = (mask_data.blocks[iy, ix]
if mask_data is not None else None)
delayed_results.append(
interior, boundary = dask.compute(
dask.delayed(_polygonize_chunk)(
block, mask_block, connectivity_8,
int(row_offsets[iy]), int(col_offsets[ix]),
ny_total, nx_total,
))
))[0]
all_interior.extend(interior)
for val, rings in boundary:
boundary_by_value.setdefault(val, []).append(rings)

chunk_results = dask.compute(*delayed_results)
return _merge_chunk_polygons(chunk_results, transform)
return _merge_from_separated(all_interior, boundary_by_value, transform)


def polygonize(
Expand Down
Loading
Loading