diff --git a/TODO.md b/TODO.md index 1d1ab8d2..96493a30 100644 --- a/TODO.md +++ b/TODO.md @@ -92,7 +92,6 @@ Deferred items from PR reviews that were not addressed before merge. | Survey-weighted Silverman bandwidth in EfficientDiD conditional Omega* — `_silverman_bandwidth()` uses unweighted mean/std for bandwidth selection; survey-weighted statistics would better reflect the population distribution but is a second-order refinement | `efficient_did_covariates.py` | — | Low | | Survey sandwich SE is not exactly invariant to zero-weight (subpopulation / padded) rows: the shared `_compute_stratified_psu_meat` finite-sample correction counts zero-weight units as PSUs (an `n_psu/(n_psu-1)`-style factor), so adding zero-weight rows shifts the SE by a second-order amount (~2e-4 relative in the EfficientDiD e2e). The point estimate is exactly invariant and the weighted scores of zero-weight rows are already zero — only the DOF correction's PSU count includes them. Cross-cutting across all survey-enabled estimators; fix by counting only positive-weight PSUs in the correction. | `survey.py` (`_compute_stratified_psu_meat`) | PR-B follow-up | Low | | ImputationDiD: leave-one-out (LOO) conservative-variance refinement (BJS 2024 Supplementary Appendix A.9) not implemented — a finite-sample improvement to the auxiliary-model residuals that reduces overfitting of `tau_tilde_g` to `epsilon`. The asymptotic Theorem-3 variance is implemented and matches R `didimputation` (which also omits LOO by default). | `imputation.py` | imputation-validation follow-up | Low | -| TROP: extend Wave 4's `_setup_trop_data` helper to also cover the duplicated bootstrap resampling loop in `_bootstrap_variance` / `_bootstrap_variance_global` (~40 LoC dedup; mirrors the data-setup helper pattern with a `fit_callable` parameter for the per-draw refit step). | `trop_local.py`, `trop_global.py` | follow-up | Low | | StaggeredTripleDifference R cross-validation: CSV fixtures not committed (gitignored); tests skip without local R + triplediff. Commit fixtures or generate deterministically. | `tests/test_methodology_staggered_triple_diff.py` | #245 | Medium | | StaggeredTripleDifference R parity: benchmark only tests no-covariate path (xformla=~1). Add covariate-adjusted scenarios and aggregation SE parity assertions. | `benchmarks/R/benchmark_staggered_triplediff.R` | #245 | Medium | | StaggeredTripleDifference: per-cohort group-effect SEs include WIF (conservative vs R's wif=NULL). Documented in REGISTRY. Could override mixin for exact R match. | `staggered_triple_diff.py` | #245 | Low | diff --git a/diff_diff/trop_global.py b/diff_diff/trop_global.py index ec7402e7..4b8b4182 100644 --- a/diff_diff/trop_global.py +++ b/diff_diff/trop_global.py @@ -28,7 +28,11 @@ stratified_bootstrap_indices, warn_bootstrap_failure_rate, ) -from diff_diff.trop_local import _setup_trop_data, _soft_threshold_svd +from diff_diff.trop_local import ( + _run_trop_bootstrap_loop, + _setup_trop_data, + _soft_threshold_svd, +) from diff_diff.trop_results import TROPResults from diff_diff.utils import safe_inference, warn_if_not_converged @@ -978,43 +982,28 @@ def _bootstrap_variance_global( ) # Python fallback: consume the same indices the Rust branch would have used. - bootstrap_estimates_list: List[float] = [] - nonconverg_tracker: List[int] = [] - - for b in range(self.n_bootstrap): - sampled_control = ( - control_units[control_idx[b]] if n_control_units > 0 else np.array([], dtype=object) - ) - sampled_treated = ( - treated_units[treated_idx[b]] if n_treated_units > 0 else np.array([], dtype=object) - ) - sampled_units = np.concatenate([sampled_control, sampled_treated]) - - # Create bootstrap sample - boot_data = pd.concat( - [ - data[data[unit] == u].assign(**{unit: f"{u}_{idx}"}) - for idx, u in enumerate(sampled_units) - ], - ignore_index=True, - ) - - try: - tau = self._fit_global_with_fixed_lambda( - boot_data, - outcome, - treatment, - unit, - time, - optimal_lambda, - treated_periods, - survey_design=survey_design, - _nonconvergence_tracker=nonconverg_tracker, - ) - if np.isfinite(tau): - bootstrap_estimates_list.append(tau) - except (ValueError, np.linalg.LinAlgError, KeyError): - continue + bootstrap_estimates_list, nonconverg_tracker = _run_trop_bootstrap_loop( + data, + unit, + control_units, + treated_units, + control_idx, + treated_idx, + n_control_units, + n_treated_units, + self.n_bootstrap, + lambda boot_data, tracker: self._fit_global_with_fixed_lambda( + boot_data, + outcome, + treatment, + unit, + time, + optimal_lambda, + treated_periods, + survey_design=survey_design, + _nonconvergence_tracker=tracker, + ), + ) bootstrap_estimates = np.array(bootstrap_estimates_list) diff --git a/diff_diff/trop_local.py b/diff_diff/trop_local.py index 64b633d1..04c13f8e 100644 --- a/diff_diff/trop_local.py +++ b/diff_diff/trop_local.py @@ -12,7 +12,7 @@ import logging import warnings -from typing import List, Optional, Tuple +from typing import Callable, List, Optional, Tuple import numpy as np import pandas as pd @@ -194,6 +194,66 @@ def _setup_trop_data(data, outcome, treatment, unit, time, resolved_survey, surv } +def _run_trop_bootstrap_loop( + data: pd.DataFrame, + unit: str, + control_units: np.ndarray, + treated_units: np.ndarray, + control_idx: np.ndarray, + treated_idx: np.ndarray, + n_control_units: int, + n_treated_units: int, + n_bootstrap: int, + fit_callable: Callable[[pd.DataFrame, List[int]], float], +) -> Tuple[List[float], List[int]]: + """Shared per-draw resample-and-refit loop for the TROP pairs bootstrap. + + Used by both ``TROP._bootstrap_variance`` (local) and + ``TROP._bootstrap_variance_global``, whose Python-fallback loops were + byte-identical apart from the refit call. RNG-free: ``control_idx`` / + ``treated_idx`` are pre-generated by the caller via + :func:`stratified_bootstrap_indices`, so the Rust and Python paths consume the + identical draw sequence and this loop is deterministic. ``fit_callable(boot_data, + nonconverg_tracker) -> float`` performs the fixed-lambda refit -- the only thing + that differs between the local and global methods. Returns the list of finite + per-draw estimates and the shared non-convergence tracker; the caller keeps its + own (method-specific) warnings and ``np.std(ddof=1)`` SE computation. + """ + bootstrap_estimates_list: List[float] = [] + nonconverg_tracker: List[int] = [] + + for b in range(n_bootstrap): + sampled_control = ( + control_units[control_idx[b]] + if n_control_units > 0 + else np.array([], dtype=control_units.dtype) + ) + sampled_treated = ( + treated_units[treated_idx[b]] + if n_treated_units > 0 + else np.array([], dtype=treated_units.dtype) + ) + sampled_units = np.concatenate([sampled_control, sampled_treated]) + + # Create bootstrap sample with unique unit IDs + boot_data = pd.concat( + [ + data[data[unit] == u].assign(**{unit: f"{u}_{idx}"}) + for idx, u in enumerate(sampled_units) + ], + ignore_index=True, + ) + + try: + est = fit_callable(boot_data, nonconverg_tracker) + if np.isfinite(est): + bootstrap_estimates_list.append(est) + except (ValueError, np.linalg.LinAlgError, KeyError): + continue + + return bootstrap_estimates_list, nonconverg_tracker + + # Module-level convergence tolerance for SVD singular value truncation. # Singular values below this threshold after soft-thresholding are treated # as zero to improve numerical stability. @@ -1119,47 +1179,28 @@ def _bootstrap_variance( ) # Python fallback: consume the same indices the Rust branch would have used. - bootstrap_estimates_list = [] - nonconverg_tracker: List[int] = [] - - for b in range(self.n_bootstrap): - sampled_control = ( - control_units[control_idx[b]] - if n_control_units > 0 - else np.array([], dtype=control_units.dtype) - ) - sampled_treated = ( - treated_units[treated_idx[b]] - if n_treated_units > 0 - else np.array([], dtype=treated_units.dtype) - ) - sampled_units = np.concatenate([sampled_control, sampled_treated]) - - # Create bootstrap sample with unique unit IDs - boot_data = pd.concat( - [ - data[data[unit] == u].assign(**{unit: f"{u}_{idx}"}) - for idx, u in enumerate(sampled_units) - ], - ignore_index=True, - ) - - try: - # Fit with fixed lambda (skip LOOCV for speed) - att = self._fit_with_fixed_lambda( - boot_data, - outcome, - treatment, - unit, - time, - optimal_lambda, - survey_design=survey_design, - _nonconvergence_tracker=nonconverg_tracker, - ) - if np.isfinite(att): - bootstrap_estimates_list.append(att) - except (ValueError, np.linalg.LinAlgError, KeyError): - continue + bootstrap_estimates_list, nonconverg_tracker = _run_trop_bootstrap_loop( + data, + unit, + control_units, + treated_units, + control_idx, + treated_idx, + n_control_units, + n_treated_units, + self.n_bootstrap, + # Fit with fixed lambda (skip LOOCV for speed) + lambda boot_data, tracker: self._fit_with_fixed_lambda( + boot_data, + outcome, + treatment, + unit, + time, + optimal_lambda, + survey_design=survey_design, + _nonconvergence_tracker=tracker, + ), + ) bootstrap_estimates = np.array(bootstrap_estimates_list) diff --git a/tests/test_trop.py b/tests/test_trop.py index 1ecf2870..88642481 100644 --- a/tests/test_trop.py +++ b/tests/test_trop.py @@ -10,6 +10,7 @@ from diff_diff.prep import generate_factor_data from diff_diff.trop import TROP, TROPResults, trop +from diff_diff.trop_local import _run_trop_bootstrap_loop def generate_factor_dgp( @@ -2864,6 +2865,168 @@ def test_local_all_treated_nan_warns(self): assert np.isnan(results.att) +class TestRunTropBootstrapLoop: + """Direct unit tests for the shared ``_run_trop_bootstrap_loop`` helper - the + deduplicated per-draw resample-and-refit loop used by both + ``TROP._bootstrap_variance`` (local) and ``TROP._bootstrap_variance_global``. + + A stub ``fit_callable`` exercises the loop logic with no linalg (platform-stable, + no BLAS flakiness): the resampling + ``f"{u}_{idx}"`` rename, the ``np.isfinite`` + filter, the exception-skip guard, the non-convergence tracker passthrough, and the + degenerate empty-pool branches. + """ + + @staticmethod + def _panel(): + # 4 units x 2 periods; string ids so the rename device is visible. + rows = [] + for u in ["a", "b", "c", "d"]: + for t in [0, 1]: + rows.append({"unit": u, "period": t, "outcome": float(ord(u) + t)}) + return pd.DataFrame(rows) + + def test_resamples_with_renamed_ids_and_calls_fit(self): + data = self._panel() + control_units = np.array(["a", "b"]) + treated_units = np.array(["c", "d"]) + # 2 draws, deterministic index arrays (the helper is RNG-free). + control_idx = np.array([[0, 1], [1, 1]]) + treated_idx = np.array([[0, 1], [0, 0]]) + seen = [] + estimates, tracker = _run_trop_bootstrap_loop( + data, + "unit", + control_units, + treated_units, + control_idx, + treated_idx, + 2, + 2, + 2, + lambda boot_data, _tr: (seen.append(boot_data), 1.5)[1], + ) + assert estimates == [1.5, 1.5] + assert tracker == [] + # Draw 0: control a,b + treated c,d -> renamed a_0,b_1,c_2,d_3; 2 rows each. + assert sorted(seen[0]["unit"].unique()) == ["a_0", "b_1", "c_2", "d_3"] + assert len(seen[0]) == 8 + # Draw 1: control b,b + treated c,c -> duplicated units stay distinct via idx. + assert sorted(seen[1]["unit"].unique()) == ["b_0", "b_1", "c_2", "c_3"] + assert len(seen[1]) == 8 + + def test_finite_filter_drops_nan_estimates(self): + data = self._panel() + cu, tu = np.array(["a", "b"]), np.array(["c", "d"]) + cidx = tidx = np.array([[0, 1], [0, 1], [0, 1]]) + vals = iter([2.0, float("nan"), 3.0]) + estimates, _ = _run_trop_bootstrap_loop( + data, + "unit", + cu, + tu, + cidx, + tidx, + 2, + 2, + 3, + lambda _bd, _tr: next(vals), + ) + assert estimates == [2.0, 3.0] # the NaN draw is dropped + + def test_exception_draw_is_skipped(self): + data = self._panel() + cu, tu = np.array(["a", "b"]), np.array(["c", "d"]) + cidx = tidx = np.array([[0, 1], [0, 1], [0, 1]]) + calls = {"n": 0} + + def stub(_bd, _tr): + calls["n"] += 1 + if calls["n"] == 2: + raise ValueError("forced failure") + return 4.0 + + estimates, _ = _run_trop_bootstrap_loop( + data, + "unit", + cu, + tu, + cidx, + tidx, + 2, + 2, + 3, + stub, + ) + assert estimates == [4.0, 4.0] # draw 2 skipped; draws 1 + 3 collected + assert calls["n"] == 3 + + def test_nonconverg_tracker_passthrough(self): + data = self._panel() + cu, tu = np.array(["a", "b"]), np.array(["c", "d"]) + cidx = tidx = np.array([[0, 1], [0, 1]]) + estimates, tracker = _run_trop_bootstrap_loop( + data, + "unit", + cu, + tu, + cidx, + tidx, + 2, + 2, + 2, + lambda _bd, tr: (tr.append(1), 5.0)[1], + ) + assert estimates == [5.0, 5.0] + assert tracker == [1, 1] # the helper's tracker is threaded into fit_callable + + def test_empty_control_pool_branch(self): + data = self._panel() + cu = np.array([], dtype=object) + tu = np.array(["c", "d"]) + cidx = np.zeros((1, 0), dtype=int) # unused when n_control_units == 0 + tidx = np.array([[0, 1]]) + seen = [] + estimates, _ = _run_trop_bootstrap_loop( + data, + "unit", + cu, + tu, + cidx, + tidx, + 0, + 2, + 1, + lambda bd, _tr: (seen.append(bd), 1.0)[1], + ) + # Only treated units sampled -> renamed c_0, d_1 (assert ids/rows, not dtype). + assert sorted(seen[0]["unit"].unique()) == ["c_0", "d_1"] + assert len(seen[0]) == 4 + assert estimates == [1.0] + + def test_empty_treated_pool_branch(self): + data = self._panel() + cu = np.array(["a", "b"]) + tu = np.array([], dtype=object) + cidx = np.array([[0, 1]]) + tidx = np.zeros((1, 0), dtype=int) # unused when n_treated_units == 0 + seen = [] + estimates, _ = _run_trop_bootstrap_loop( + data, + "unit", + cu, + tu, + cidx, + tidx, + 2, + 0, + 1, + lambda bd, _tr: (seen.append(bd), 2.0)[1], + ) + assert sorted(seen[0]["unit"].unique()) == ["a_0", "b_1"] + assert len(seen[0]) == 4 + assert estimates == [2.0] + + class TestTROPBootstrapNaNSE: """Tests for NaN SE when bootstrap has <2 successful draws."""