From 7ec36a61361540cc14c9305110d5eb620260773a Mon Sep 17 00:00:00 2001
From: Ryan McKenna
Date: Fri, 12 Jun 2026 12:59:10 -0700
Subject: [PATCH] Add OpenSetCategoricalInitializer to local_mode initialization

Introduces an initializer for open-set categorical attributes that uses
Gaussian Thresholding (Algorithm 2 from the DP-SIPS paper) to privately
discover significant partitions from data. The discovered partitions plus a
default catch-all value form a CategoricalAttribute, and the noisy counts from
thresholding are reused as the histogram measurement (no separate budget
needed for the histogram).

Follows the same conventions as NumericalInitializer and
CategoricalInitializer: dataclass with name/attribute/rng fields, dp_event()
method returning a DpEvent, and __call__() returning a ColumnMeasurement.

PiperOrigin-RevId: 931290389
---
 dpsynth/local_mode/initialization.py    | 77 +++++++++++++++++++++++++
 dpsynth/local_mode/primitives.py        | 56 ++++++++++++++++++
 tests/local_mode/initialization_test.py | 63 ++++++++++++++++++++
 tests/local_mode/primitives_test.py     | 64 ++++++++++++++++++++
 4 files changed, 260 insertions(+)

diff --git a/dpsynth/local_mode/initialization.py b/dpsynth/local_mode/initialization.py
index e99db09..e8eae5d 100644
--- a/dpsynth/local_mode/initialization.py
+++ b/dpsynth/local_mode/initialization.py
@@ -120,3 +120,80 @@ def __call__(self, zcdp_rho: float, data: np.ndarray) -> ColumnMeasurement:
         noisy_counts, (self.name,), stddev=sigma
     )
     return ColumnMeasurement(self.attribute, transform_fn, measurement)
+
+
+@dataclasses.dataclass
+class OpenSetCategoricalInitializer:
+  """Mechanism that discovers and measures an open-set categorical domain.
+
+  Uses Gaussian Thresholding (Algorithm 2 from the DP-SIPS paper) to privately
+  select significant partitions from the data and simultaneously obtain noisy
+  counts for each discovered partition. The discovered partitions, together
+  with the attribute's default_value (used as a catch-all for undiscovered
+  values), form a CategoricalAttribute used for downstream synthesis.
+
+  Attributes:
+    name: Attribute name used as the clique key in the measurement.
+    attribute: The OpenSetCategoricalAttribute specifying the default value.
+    delta: Failure probability for the partition selection threshold.
+    rng: A numpy random number generator.
+  """
+
+  name: str
+  attribute: domain.OpenSetCategoricalAttribute
+  delta: float
+  rng: np.random.Generator
+
+  def dp_event(self, zcdp_rho: float) -> dp_accounting.DpEvent:
+    """Returns the DpEvent for the Gaussian Thresholding mechanism.
+
+    Args:
+      zcdp_rho: Total zCDP privacy budget.
+
+    Returns:
+      A GaussianDpEvent; the thresholding delta is not part of this event.
+    """
+    gdp_budget = accounting.zcdp_to_gdp(zcdp_rho)
+    sigma = 1.0 / np.sqrt(gdp_budget)
+    return dp_accounting.GaussianDpEvent(noise_multiplier=sigma)
+
+  def __call__(self, zcdp_rho: float, data: np.ndarray) -> ColumnMeasurement:
+    """Returns a differentially private measurement of the given data.
+
+    Args:
+      zcdp_rho: Total zCDP privacy budget for partition selection.
+      data: 1D array of raw categorical values.
+
+    Returns:
+      A ColumnMeasurement containing the discovered CategoricalAttribute, the
+      encoding transform, and a LinearMeasurement with the noisy counts from
+      Gaussian Thresholding. The first entry in the domain is the default_value
+      catch-all; it is not measured (the query selects only indices 1:).
+    """
+    # Map raw values to integer partition IDs for thresholding.
+    unique_values, inverse = np.unique(data, return_inverse=True)
+    gdp_budget = accounting.zcdp_to_gdp(zcdp_rho)
+    selected_ids, counts, stddev = (
+        primitives.select_partitions_gaussian_thresholding(
+            self.rng, inverse, gdp_budget, self.delta
+        )
+    )
+    selected_values = list(unique_values[selected_ids])
+
+    # Build the discovered domain: default first, then selected values.
+    possible_values = [self.attribute.default_value] + selected_values
+    cat_attr = domain.CategoricalAttribute(
+        possible_values=possible_values,
+        out_of_domain_index=0,
+    )
+    transform_fn = transformations.discrete_encoder(cat_attr)
+
+    # The measurement covers only the discovered partitions (indices 1:),
+    # not the unmeasured default at index 0.
+    measurement = mbi.LinearMeasurement(
+        counts,
+        (self.name,),
+        stddev=stddev,
+        query=lambda x: x[1:],
+    )
+    return ColumnMeasurement(cat_attr, transform_fn, measurement)
diff --git a/dpsynth/local_mode/primitives.py b/dpsynth/local_mode/primitives.py
index 644181f..c25b8bb 100644
--- a/dpsynth/local_mode/primitives.py
+++ b/dpsynth/local_mode/primitives.py
@@ -182,6 +182,62 @@ def _get_threshold(delta, sigma, max_part):
   return thresholds.max()
 
 
+def select_partitions_gaussian_thresholding(
+    rng: np.random.Generator,
+    data: np.ndarray,
+    gdp_budget: float,
+    delta: float,
+) -> tuple[np.ndarray, np.ndarray, float]:
+  """Selects partitions using Gaussian Thresholding (Weighted Gaussian).
+
+  This implements Algorithm 2 from the DP-SIPS paper (Swanberg et al., 2023)
+  under item-level DP. It is the simplest partition selection mechanism:
+
+  1. Compute the histogram of partition counts.
+  2. Add Gaussian noise calibrated to the privacy budget.
+  3. Return partitions whose noisy count reaches a threshold chosen so that a
+     partition backed by a single record is released with probability delta.
+
+  Under item-level DP each record is treated as a distinct user contributing
+  to exactly one partition, so the histogram has L2 sensitivity 1. The
+  threshold is T = 1 + sigma * Phi^{-1}(1 - delta), following the paper's
+  formula with max_part = 1.
+
+  Args:
+    rng: A numpy random number generator.
+    data: 1D array where each element is a partition ID (e.g. an integer).
+    gdp_budget: Privacy budget in terms of squared Gaussian DP mu parameter
+      (gdp_budget = mu^2 = 1 / sigma^2).
+    delta: Release probability of a single-record partition; must be in (0, 1).
+
+  Returns:
+    A tuple (selected_partitions, estimated_counts, sigma) holding the
+    partition IDs that met the threshold, the noisy count of each selected
+    partition, and the standard deviation of the Gaussian noise added.
+
+  Raises:
+    ValueError: If gdp_budget is not positive or delta is not in (0, 1).
+  """
+  if not (gdp_budget > 0 and 0 < delta < 1):
+    raise ValueError(f"{gdp_budget=} must be positive and {delta=} in (0, 1).")
+
+  sigma = 1.0 / np.sqrt(gdp_budget)
+
+  if data.size == 0:
+    return np.empty(0, dtype=data.dtype), np.empty(0, dtype=float), sigma
+
+  unique_parts, counts = np.unique(data, return_counts=True)
+  noisy_counts = counts + rng.normal(scale=sigma, size=counts.size)
+
+  # Threshold: a partition backed by a single record (true count 1) passes
+  # with probability delta when sigma > 0. For max_part=1 this simplifies to:
+  #   T = 1/sqrt(1) + sigma * Phi^{-1}(1 - delta) = 1 + sigma * ppf(1-delta)
+  threshold = 1.0 + sigma * scipy.stats.norm.ppf(1.0 - delta)
+  passed = noisy_counts >= threshold
+
+  return unique_parts[passed], noisy_counts[passed], sigma
+
+
 def select_partitions_sips(
     rng: np.random.Generator,
     data: np.ndarray,
diff --git a/tests/local_mode/initialization_test.py b/tests/local_mode/initialization_test.py
index 0807c43..490e6e3 100644
--- a/tests/local_mode/initialization_test.py
+++ b/tests/local_mode/initialization_test.py
@@ -104,5 +104,68 @@ def test_out_of_domain_values(self):
     )
 
 
+class OpenSetCategoricalInitializerTest(absltest.TestCase):
+
+  def test_dp_event(self):
+    attr = domain.OpenSetCategoricalAttribute(default_value=None)
+    rng = np.random.default_rng(0)
+    initializer = initialization.OpenSetCategoricalInitializer(
+        name='test', attribute=attr, delta=1e-5, rng=rng
+    )
+    event = initializer.dp_event(zcdp_rho=0.5)
+    self.assertIsInstance(event, dp_accounting.GaussianDpEvent)
+
+  def test_call_noiseless(self):
+    attr = domain.OpenSetCategoricalAttribute(default_value=None)
+    rng = np.random.default_rng(42)
+    initializer = initialization.OpenSetCategoricalInitializer(
+        name='col', attribute=attr, delta=1e-5, rng=rng
+    )
+    # 'A' appears 100 times, 'B' 50, 'C' 1 (rare).
+    data = np.array(['A'] * 100 + ['B'] * 50 + ['C'] * 1)
+    result = initializer(zcdp_rho=np.inf, data=data)
+
+    self.assertIsInstance(result, initialization.ColumnMeasurement)
+    self.assertIsNotNone(result.measurement)
+    # With infinite budget, all values with count > 0 should be selected.
+    discovered = set(result.categorical_attribute.possible_values)
+    self.assertIn('A', discovered)
+    self.assertIn('B', discovered)
+    self.assertIn(None, discovered)  # default value always present
+    # Default value is always first.
+    self.assertIsNone(result.categorical_attribute.possible_values[0])
+    self.assertEqual(result.categorical_attribute.out_of_domain_index, 0)
+
+  def test_undiscovered_values_map_to_default(self):
+    attr = domain.OpenSetCategoricalAttribute(default_value='OTHER')
+    rng = np.random.default_rng(0)
+    initializer = initialization.OpenSetCategoricalInitializer(
+        name='col', attribute=attr, delta=1e-5, rng=rng
+    )
+    data = np.array(['A'] * 100 + ['B'] * 50)
+    result = initializer(zcdp_rho=np.inf, data=data)
+
+    transform_fn = result.transform_fn
+    # Discovered values map to valid indices.
+    idx_a = transform_fn('A')
+    self.assertIsInstance(idx_a, int)
+    # Unknown value maps to the out-of-domain (default) index at 0.
+    self.assertEqual(result.categorical_attribute.out_of_domain_index, 0)
+    self.assertEqual(transform_fn('Z'), 0)
+
+  def test_empty_data(self):
+    attr = domain.OpenSetCategoricalAttribute(default_value=None)
+    rng = np.random.default_rng(0)
+    initializer = initialization.OpenSetCategoricalInitializer(
+        name='col', attribute=attr, delta=1e-5, rng=rng
+    )
+    data = np.array([], dtype=str)
+    result = initializer(zcdp_rho=np.inf, data=data)
+
+    # Only the default value should be in the domain.
+    self.assertEqual(result.categorical_attribute.possible_values, [None])
+    self.assertEqual(result.categorical_attribute.size, 1)
+
+
 if __name__ == '__main__':
   absltest.main()
diff --git a/tests/local_mode/primitives_test.py b/tests/local_mode/primitives_test.py
index 5f68750..2db8974 100644
--- a/tests/local_mode/primitives_test.py
+++ b/tests/local_mode/primitives_test.py
@@ -190,6 +190,70 @@ def test_mismatched_user_ids_raises(self):
     )
 
 
+class SelectPartitionsGaussianThresholdingTest(absltest.TestCase):
+
+  def setUp(self):
+    super().setUp()
+    self.rng = np.random.default_rng(42)
+
+  def test_basic_operation(self):
+    data = np.array([1] * 50 + [2] * 5)
+    selected, counts, sigma = (
+        primitives.select_partitions_gaussian_thresholding(
+            self.rng, data, gdp_budget=10.0, delta=1e-5
+        )
+    )
+    self.assertIn(1, selected)
+    self.assertEqual(sigma, 1.0 / np.sqrt(10.0))
+    self.assertEqual(selected.size, counts.size)
+
+  def test_empty_data(self):
+    data = np.array([], dtype=int)
+    selected, counts, sigma = (
+        primitives.select_partitions_gaussian_thresholding(
+            self.rng, data, gdp_budget=1.0, delta=1e-5
+        )
+    )
+    self.assertEmpty(selected)
+    self.assertEmpty(counts)
+    self.assertEqual(sigma, 1.0)
+
+  def test_high_budget_selects_all(self):
+    data = np.repeat([1, 2, 3, 4, 5], 10)  # Counts well above the threshold.
+    selected, _, _ = primitives.select_partitions_gaussian_thresholding(
+        self.rng, data, gdp_budget=1e6, delta=0.1
+    )
+    self.assertCountEqual(selected, [1, 2, 3, 4, 5])
+
+  def test_zero_budget_raises(self):
+    data = np.array([1, 2, 3])
+    with self.assertRaises(ValueError):
+      primitives.select_partitions_gaussian_thresholding(
+          self.rng, data, gdp_budget=0.0, delta=1e-5
+      )
+    with self.assertRaises(ValueError):
+      primitives.select_partitions_gaussian_thresholding(
+          self.rng, data, gdp_budget=1.0, delta=0.0
+      )
+
+  def test_rare_items_not_selected(self):
+    # One item with many occurrences, another with just 1.
+    # With moderate budget and tight delta, the rare item should be dropped.
+    data = np.array([1] * 100 + [2])
+    selected, _, _ = primitives.select_partitions_gaussian_thresholding(
+        self.rng, data, gdp_budget=0.5, delta=1e-6
+    )
+    self.assertIn(1, selected)
+    self.assertNotIn(2, selected)
+
+  def test_string_data_type(self):
+    data = np.array(["a", "b", "a", "a", "c", "a", "c"])
+    selected, _, _ = primitives.select_partitions_gaussian_thresholding(
+        self.rng, data, gdp_budget=10.0, delta=1e-5
+    )
+    self.assertTrue(all(isinstance(p, str) for p in selected))
+
+
 class GaussianHistogramTest(absltest.TestCase):
 
   def setUp(self):