Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .claude/sweep-performance-state.csv
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ convolution,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,
corridor,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,
cost_distance,2026-06-15,RISKY,memory-bound,1,3342,"Perf sweep 2026-06-15. HIGH: bounded map_overlap branch in _cost_distance_dask gated on full dims (pad>=height/width) not chunk size; pad>chunk collapses to single chunk (#880-class OOM, verified npartitions=1 at chunks=10/pad=96). Fixed: compare pad vs max chunk dim, route to iterative when pad>=chunk (matches GPU path L484). dask+cupy path already correct. Register count 37 (no pressure). nanmin().compute() L478/L1149 intentional scalar. iterative tile_cache full-dataset materialization is documented MemoryError-guarded design (#1118). All 56 tests pass incl GPU."
curvature,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,
dasymetric,2026-03-31T18:00:00Z,SAFE,memory-bound,0,1126,Memory guard added to validate_disaggregation. Core disaggregate uses map_blocks.
dasymetric,2026-06-20,SAFE,compute-bound,0,3408,"MEDIUM: disaggregate() looped per-zone building full-shape boolean masks (O(n_zones x n_pixels)); vectorized via searchsorted + np.add.at to O(n_pixels + n_zones) in PR for #3408 (numpy core, dask per-chunk sums, distribute step). cupy is documented CPU fallback (not flagged); eager zone-sum dask.compute() is required global reduction, bounded per chunk (not flagged). CUDA+cupy validated end-to-end on host."
diffusion,2026-03-31T18:00:00Z,WILL OOM,memory-bound,2,1116,Scalar diffusivity now passed as float to chunks. DataArray diffusivity passed as dask array via map_overlap.
edge_detection,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,
emerging_hotspots,2026-03-31T18:00:00Z,SAFE,compute-bound,0,,
Expand Down
129 changes: 92 additions & 37 deletions xrspatial/dasymetric.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,8 +158,48 @@ def _normalize_values(values):
# numpy backend
# ---------------------------------------------------------------------------

def _disaggregate_numpy(zones, weight, values_dict, method, nodata_zone):
"""Core numpy implementation. Returns a float64 ndarray."""
def _pixel_zone_index(zones_arr, invalid, sorted_zone_ids):
"""Map each valid pixel to the index of its zone in *sorted_zone_ids*.

Returns an int64 array the same shape as *zones_arr*. Pixels that are
invalid, or whose zone id is not present in *sorted_zone_ids*, get -1.

``sorted_zone_ids`` must be a sorted float64 ndarray of the zone ids in
``values_dict``; ``np.searchsorted`` then turns the per-zone membership
loop into a single vectorized lookup.
"""
idx = np.full(zones_arr.shape, -1, dtype=np.int64)
if sorted_zone_ids.size == 0:
return idx
valid = ~invalid
zf = zones_arr[valid]
pos = np.searchsorted(sorted_zone_ids, zf)
# clip so out-of-range positions index a real slot; the equality check
# below rejects them.
pos_clipped = np.clip(pos, 0, sorted_zone_ids.size - 1)
matches = sorted_zone_ids[pos_clipped] == zf
pix_idx = np.where(matches, pos_clipped, -1)
idx[valid] = pix_idx
return idx


def _zone_weight_sums(weight_arr, zone_index, n_zones):
"""Accumulate per-zone weight sums in a single pass.

*zone_index* is the output of :func:`_pixel_zone_index` (-1 for pixels
that belong to no tracked zone). Returns a float64 array of length
*n_zones*.
"""
sums = np.zeros(n_zones, dtype=np.float64)
if n_zones == 0:
return sums
sel = zone_index >= 0
np.add.at(sums, zone_index[sel], weight_arr[sel])
return sums


def _preprocess(zones, weight, method, nodata_zone):
"""Coerce to float64, clamp/binarize weights, and build the invalid mask."""
zones_arr = np.asarray(zones, dtype=np.float64)
weight_arr = np.asarray(weight, dtype=np.float64)

Expand All @@ -175,35 +215,65 @@ def _disaggregate_numpy(zones, weight, values_dict, method, nodata_zone):
if nodata_zone is not None:
invalid |= (zones_arr == nodata_zone)

# compute zone weight sums
zone_ids = np.array(sorted(values_dict.keys()), dtype=np.float64)
zone_sums = {}
for zid in zone_ids:
mask = (~invalid) & (zones_arr == zid)
zone_sums[int(zid)] = float(np.nansum(weight_arr[mask]))
return zones_arr, weight_arr, invalid


def _disaggregate_numpy(zones, weight, values_dict, method, nodata_zone):
"""Core numpy implementation. Returns a float64 ndarray."""
zones_arr, weight_arr, invalid = _preprocess(
zones, weight, method, nodata_zone,
)

sorted_ids = np.array(sorted(values_dict.keys()), dtype=np.float64)
zone_index = _pixel_zone_index(zones_arr, invalid, sorted_ids)
sums_arr = _zone_weight_sums(weight_arr, zone_index, sorted_ids.size)
zone_sums = {int(zid): float(s) for zid, s in zip(sorted_ids, sums_arr)}

# reuse the already-computed zone_index for the distribute step
return _disaggregate_numpy_with_sums(
zones_arr, weight_arr, values_dict, zone_sums, invalid,
zone_index=zone_index,
)


def _disaggregate_numpy_with_sums(
zones_arr, weight_arr, values_dict, zone_sums, invalid,
zone_index=None,
):
"""Distribute using precomputed zone weight sums.

Used directly by numpy and also called per-chunk by the dask backend.
Vectorized: one ``searchsorted`` lookup plus a gathered division, instead
of one full-array boolean mask per zone. *zone_index* (from
:func:`_pixel_zone_index`) may be supplied to skip a redundant lookup
when the caller has already computed it.
"""
result = np.full(zones_arr.shape, np.nan, dtype=np.float64)

for zid, zval in values_dict.items():
mask = (~invalid) & (zones_arr == zid)
wsum = zone_sums.get(int(zid), 0.0)
if wsum == 0.0:
# zero total weight -> NaN
result[mask] = np.nan
else:
result[mask] = zval * weight_arr[mask] / wsum
sorted_ids = np.array(sorted(values_dict.keys()), dtype=np.float64)
if sorted_ids.size == 0:
return result

if zone_index is None:
zone_index = _pixel_zone_index(zones_arr, invalid, sorted_ids)

# per-zone-slot lookup tables aligned with sorted_ids
zvals = np.array([values_dict[int(zid)] for zid in sorted_ids],
dtype=np.float64)
wsums = np.array([zone_sums.get(int(zid), 0.0) for zid in sorted_ids],
dtype=np.float64)

sel = zone_index >= 0
slot = zone_index[sel]
slot_wsum = wsums[slot]
# zero-weight-sum zones stay NaN; everyone else gets the proportional share
with np.errstate(invalid='ignore', divide='ignore'):
share = np.where(
slot_wsum == 0.0,
np.nan,
zvals[slot] * weight_arr[sel] / slot_wsum,
)
result[sel] = share

return result

Expand Down Expand Up @@ -232,21 +302,13 @@ def _compute_zone_sums_dask(zones_da, weight_da, zone_ids, method,
Returns a plain ``dict[int, float]``.
"""
zone_ids_set = set(int(z) for z in zone_ids)
sorted_ids = np.array(sorted(zone_ids_set), dtype=np.float64)

def _chunk_sums(z_block, w_block):
z = np.asarray(z_block, dtype=np.float64)
w = np.asarray(w_block, dtype=np.float64)
w = np.where(w < 0, 0.0, w)
if method == 'binary':
w = np.where(w != 0, 1.0, 0.0)
invalid = np.isnan(z) | np.isnan(w)
if nodata_zone is not None:
invalid |= (z == nodata_zone)
sums = {}
for zid in zone_ids_set:
mask = (~invalid) & (z == zid)
sums[zid] = float(np.nansum(w[mask]))
return sums
z, w, invalid = _preprocess(z_block, w_block, method, nodata_zone)
zone_index = _pixel_zone_index(z, invalid, sorted_ids)
sums_arr = _zone_weight_sums(w, zone_index, sorted_ids.size)
return {int(zid): float(s) for zid, s in zip(sorted_ids, sums_arr)}

# collect delayed chunk sums
z_blocks = zones_da.to_delayed().ravel()
Expand All @@ -266,14 +328,7 @@ def _chunk_sums(z_block, w_block):
def _distribute_chunk(z_block, w_block, values_dict, zone_sums, method,
nodata_zone):
"""Map-blocks worker: distribute within a single chunk."""
z = np.asarray(z_block, dtype=np.float64)
w = np.asarray(w_block, dtype=np.float64)
w = np.where(w < 0, 0.0, w)
if method == 'binary':
w = np.where(w != 0, 1.0, 0.0)
invalid = np.isnan(z) | np.isnan(w)
if nodata_zone is not None:
invalid |= (z == nodata_zone)
z, w, invalid = _preprocess(z_block, w_block, method, nodata_zone)
return _disaggregate_numpy_with_sums(z, w, values_dict, zone_sums,
invalid)

Expand Down
56 changes: 56 additions & 0 deletions xrspatial/tests/test_dasymetric.py
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,62 @@ def test_extra_key_silently_ignored(self, simple_zones, simple_weight):
# should not raise; zone 999 doesn't exist, just ignore
assert result.shape == simple_zones.shape

def test_many_zones_conservation(self):
"""Vectorized path: many zones still conserve totals and leave
missing/zero-weight zones as NaN (issue #3408)."""
rng = np.random.default_rng(0)
n_zones = 200
side = 240 # wide enough that every zone id appears in the raster
zones_data = np.tile(
np.arange(1, n_zones + 1, dtype=np.float64),
(side, (side + n_zones - 1) // n_zones),
)[:side, :side]
weight_data = rng.random((side, side)) + 0.1
# zone 23 all-zero weight -> NaN
weight_data[zones_data == 23] = 0.0
# zone 17 present in raster but absent from values -> NaN
values = {i: 10.0 * i for i in range(1, n_zones + 1) if i != 17}

present = set(np.unique(zones_data).astype(int))
assert {17, 23}.issubset(present)

zones = create_test_raster(zones_data, backend='numpy')
weight = create_test_raster(weight_data, backend='numpy')
result = disaggregate(zones, values, weight).values

for zid, zval in values.items():
if zid == 23:
assert np.all(np.isnan(result[zones_data == 23]))
continue
total = np.nansum(result[zones_data == zid])
assert total == pytest.approx(zval, rel=1e-10)
# zone 17 was never in values -> NaN
assert np.all(np.isnan(result[zones_data == 17]))

@pytest.mark.skipif(not has_dask_array(), reason="dask not available")
def test_many_zones_dask_matches_numpy(self):
"""Vectorized per-chunk path matches numpy with many zones (#3408)."""
rng = np.random.default_rng(1)
n_zones = 150
side = 64
zones_data = np.tile(
np.arange(1, n_zones + 1, dtype=np.float64),
(side, (side + n_zones - 1) // n_zones),
)[:side, :side]
weight_data = rng.random((side, side)) - 0.2 # exercise clamping
values = {i: 5.0 * i for i in range(1, n_zones + 1)}

zones_np = create_test_raster(zones_data, backend='numpy')
weight_np = create_test_raster(weight_data, backend='numpy')
zones_dk = create_test_raster(zones_data, backend='dask+numpy',
chunks=(16, 16))
weight_dk = create_test_raster(weight_data, backend='dask+numpy',
chunks=(16, 16))

result_np = disaggregate(zones_np, values, weight_np).values
result_dk = disaggregate(zones_dk, values, weight_dk).values
np.testing.assert_allclose(result_np, result_dk, equal_nan=True)


# ---------------------------------------------------------------------------
# TestValidation
Expand Down
Loading