From 426e7fe6c43195cc13964c1a8db580c396939a33 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 22 Jun 2026 01:08:25 -0700 Subject: [PATCH 1/8] [python] Add failing repro: upsert by key leaves stale duplicate rows When the table already holds multiple rows sharing an upsert key (append-only tables allow duplicate keys), TableUpsertByKey updates only the last-scanned matching row and leaves the others stale. Add a test asserting the intended behavior (all matching rows updated); it currently fails. Fix to follow. --- .../tests/table_upsert_by_key_test.py | 22 +++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py index f276e44ab8ea..162d0f3e6147 100644 --- a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py +++ b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py @@ -139,6 +139,28 @@ def test_mixed_update_and_append(self): ) self.assertEqual([(1, 'Alice'), (2, 'Bob_new'), (3, 'Carol')], rows) + def test_existing_table_duplicate_keys_all_updated(self): + """If the table already holds multiple rows with the same key, upsert + must update ALL of them, leaving no stale rows.""" + table = self._create_table() + # Two physical rows sharing key id=1 (plain appends; append-only + # tables allow duplicate keys). 
+ self._write_arrow(table, pa.Table.from_pydict({ + 'id': [1], 'name': ['old_A'], 'age': [10], 'city': ['X'], + }, schema=self.pa_schema)) + self._write_arrow(table, pa.Table.from_pydict({ + 'id': [1], 'name': ['old_B'], 'age': [20], 'city': ['Y'], + }, schema=self.pa_schema)) + + self._upsert(table, pa.Table.from_pydict({ + 'id': [1], 'name': ['UPDATED'], 'age': [99], 'city': ['Z'], + }, schema=self.pa_schema), upsert_keys=['id']) + + result = self._read_all(table) + names = sorted(n for i, n in zip(result['id'].to_pylist(), + result['name'].to_pylist()) if i == 1) + self.assertEqual(['UPDATED', 'UPDATED'], names) + def test_composite_key_upsert(self): """Upsert with a multi-column composite key.""" table = self._create_table() From d5fcb130465f5e4f1b375769a3e37c50eeee1b3e Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 22 Jun 2026 16:26:02 +0800 Subject: [PATCH 2/8] [python] Rename duplicate-key upsert test and drop its docstring and comments --- paimon-python/pypaimon/tests/table_upsert_by_key_test.py | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py index 162d0f3e6147..5793481100bc 100644 --- a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py +++ b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py @@ -139,12 +139,8 @@ def test_mixed_update_and_append(self): ) self.assertEqual([(1, 'Alice'), (2, 'Bob_new'), (3, 'Carol')], rows) - def test_existing_table_duplicate_keys_all_updated(self): - """If the table already holds multiple rows with the same key, upsert - must update ALL of them, leaving no stale rows.""" + def test_upsert_for_existing_table_duplicate_keys(self): table = self._create_table() - # Two physical rows sharing key id=1 (plain appends; append-only - # tables allow duplicate keys). 
self._write_arrow(table, pa.Table.from_pydict({ 'id': [1], 'name': ['old_A'], 'age': [10], 'city': ['X'], }, schema=self.pa_schema)) From ca35cf25b115f99f4d2372dd3a1b7308195d7ee7 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 22 Jun 2026 01:32:41 -0700 Subject: [PATCH 3/8] [python] Upsert by key: update all rows matching an existing key When the table already holds multiple rows sharing an upsert key, TableUpsertByKey updated only the last-scanned row and left the others stale. Collect all row ids per key (key -> [row_id, ...]) and expand each matched input row to every matching row id, so all of them are updated. Turns the previously failing test_upsert_for_existing_table_duplicate_keys green. --- .../pypaimon/write/table_upsert_by_key.py | 45 ++++++++++--------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/paimon-python/pypaimon/write/table_upsert_by_key.py b/paimon-python/pypaimon/write/table_upsert_by_key.py index 42d3ced13ec4..6f69221e5376 100644 --- a/paimon-python/pypaimon/write/table_upsert_by_key.py +++ b/paimon-python/pypaimon/write/table_upsert_by_key.py @@ -168,9 +168,9 @@ def _upsert_partition( partition_data, input_key_tuples, partition_spec, ) - # 3. Scan partition once, keeping only key → _ROW_ID pairs that + # 3. Scan partition once, keeping key → [_ROW_ID, ...] for keys that # appear in the input (memory ∝ |input|, not |partition|). 
- key_to_row_id = self._build_key_to_row_id_map( + key_to_row_ids = self._build_key_to_row_ids_map( match_keys, partition_spec, set(input_key_tuples), ) @@ -178,7 +178,7 @@ def _upsert_partition( matched_indices: List[int] = [] new_indices: List[int] = [] for i, key_tuple in enumerate(input_key_tuples): - (matched_indices if key_tuple in key_to_row_id else new_indices).append(i) + (matched_indices if key_tuple in key_to_row_ids else new_indices).append(i) logger.info( "Upserting partition %s: %d matched, %d new", @@ -189,7 +189,7 @@ def _upsert_partition( if matched_indices: commit_messages.extend(self._do_updates( partition_data, matched_indices, - input_key_tuples, key_to_row_id, update_cols, + input_key_tuples, key_to_row_ids, update_cols, )) if new_indices: commit_messages.extend(self._do_appends(partition_data, new_indices)) @@ -274,15 +274,16 @@ def _validate_inputs(self, data: pa.Table, upsert_keys: List[str], # that partition columns can be stripped first. The same non-partition # key may legally appear in different partitions. - def _build_key_to_row_id_map( + def _build_key_to_row_ids_map( self, match_keys: List[str], partition_spec: Optional[Dict[str, Any]], input_key_set: set, - ) -> Dict[_KeyTuple, int]: + ) -> Dict[_KeyTuple, List[int]]: """ - Scan the partition in batches and collect key → _ROW_ID only for - rows whose composite key is in *input_key_set*. + Scan the partition in batches and collect key → [_ROW_ID, ...] for + rows whose composite key is in *input_key_set*. All row ids sharing a + key are kept so every matching row can be updated. 
The partition spec (if any) is pushed down as an ``and`` of per-key equality predicates so non-matching partitions are pruned by the @@ -322,7 +323,7 @@ def _build_key_to_row_id_map( ) # Stream batches and filter against input_key_set on-the-fly - key_to_row_id: Dict[_KeyTuple, int] = {} + key_to_row_ids: Dict[_KeyTuple, List[int]] = {} row_id_col = SpecialFields.ROW_ID.name for batch in table_read.to_arrow_batch_reader(splits): batch_key_cols = [batch.column(k).to_pylist() for k in match_keys] @@ -330,27 +331,31 @@ def _build_key_to_row_id_map( for j, row_id in enumerate(batch_row_ids): key_tuple = tuple(col[j] for col in batch_key_cols) if key_tuple in input_key_set: - key_to_row_id[key_tuple] = row_id + key_to_row_ids.setdefault(key_tuple, []).append(row_id) - return key_to_row_id + return key_to_row_ids def _do_updates( self, data: pa.Table, matched_indices: List[int], input_key_tuples: List[_KeyTuple], - key_to_row_id: Dict[_KeyTuple, int], + key_to_row_ids: Dict[_KeyTuple, List[int]], update_cols: Optional[List[str]] ) -> List[CommitMessage]: """Update matched rows by rewriting them in-place via - :class:`TableUpdateByRowId`.""" - matched_data = data.take(matched_indices) - row_id_array = pa.array( - [key_to_row_id[input_key_tuples[i]] for i in matched_indices], - type=pa.int64(), - ) - update_data = matched_data.append_column( - SpecialFields.ROW_ID.name, row_id_array, + :class:`TableUpdateByRowId`. A key matching several existing rows + updates all of them to the same input row.""" + # Expand each matched input row once per existing row id sharing its key. 
+ expanded_input_indices: List[int] = [] + row_ids: List[int] = [] + for i in matched_indices: + for row_id in key_to_row_ids[input_key_tuples[i]]: + expanded_input_indices.append(i) + row_ids.append(row_id) + + update_data = data.take(expanded_input_indices).append_column( + SpecialFields.ROW_ID.name, pa.array(row_ids, type=pa.int64()), ) cols_to_update = list(update_cols) if update_cols else list(self.table.field_names) From e5c6f2b6d2e143dbdd287125c5f4e0e1591df5f9 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 22 Jun 2026 01:42:47 -0700 Subject: [PATCH 4/8] [python] Tighten upsert by key comments --- paimon-python/pypaimon/write/table_upsert_by_key.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/paimon-python/pypaimon/write/table_upsert_by_key.py b/paimon-python/pypaimon/write/table_upsert_by_key.py index 6f69221e5376..c98a5fb1e1eb 100644 --- a/paimon-python/pypaimon/write/table_upsert_by_key.py +++ b/paimon-python/pypaimon/write/table_upsert_by_key.py @@ -343,10 +343,9 @@ def _do_updates( key_to_row_ids: Dict[_KeyTuple, List[int]], update_cols: Optional[List[str]] ) -> List[CommitMessage]: - """Update matched rows by rewriting them in-place via - :class:`TableUpdateByRowId`. A key matching several existing rows - updates all of them to the same input row.""" - # Expand each matched input row once per existing row id sharing its key. + """Update matched rows in-place via :class:`TableUpdateByRowId`; + a key matching several existing rows updates all of them.""" + # Fan out each matched input row to all its existing row ids. 
expanded_input_indices: List[int] = [] row_ids: List[int] = [] for i in matched_indices: From f6979a987a4f40cf4801bbbac216653b307c49c3 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 22 Jun 2026 01:46:44 -0700 Subject: [PATCH 5/8] [python] Document update-all in upsert docstring and assert it in composite-key test --- .../pypaimon/tests/table_upsert_by_key_test.py | 10 +++++++--- paimon-python/pypaimon/write/table_upsert_by_key.py | 4 ++-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py index 5793481100bc..4c0a07f5238e 100644 --- a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py +++ b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py @@ -167,8 +167,7 @@ def test_composite_key_upsert(self): 'city': ['NYC', 'LA', 'Chicago'], }, schema=self.pa_schema)) - # (id, name) = (1, Alice) appears twice in the table → matches the - # first occurrence; (2, Carol) is new. + # (id, name) = (1, Alice) appears twice → both are updated; (2, Carol) is new. self._upsert(table, pa.Table.from_pydict({ 'id': [1, 2], 'name': ['Alice', 'Carol'], @@ -183,7 +182,12 @@ def test_composite_key_upsert(self): result['name'].to_pylist(), result['city'].to_pylist(), )) - self.assertIn((2, 'Carol', 'Dallas'), rows) + self.assertEqual([ + (1, 'Alice', 'Updated'), + (1, 'Alice', 'Updated'), + (2, 'Bob', 'Chicago'), + (2, 'Carol', 'Dallas'), + ], rows) def test_sequential_upserts(self): """A second upsert sees the rows inserted by the first.""" diff --git a/paimon-python/pypaimon/write/table_upsert_by_key.py b/paimon-python/pypaimon/write/table_upsert_by_key.py index c98a5fb1e1eb..d2b588ffb3a5 100644 --- a/paimon-python/pypaimon/write/table_upsert_by_key.py +++ b/paimon-python/pypaimon/write/table_upsert_by_key.py @@ -37,8 +37,8 @@ class TableUpsertByKey: Table upsert by one or more user-specified key columns for append-only tables. 
For each row in the input Arrow table: - - If a row with the same upsert_keys composite value already exists → update that row - (in-place rewrite). + - If one or more rows with the same upsert_keys composite value already exist → + update all of them (in-place rewrite). - If no matching row exists → append as a new row. All upsert_keys must be columns present in both the input data and the table schema. From d6b36d7140943950bfd522c3eb5bf8cfb05dd254 Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 22 Jun 2026 01:50:56 -0700 Subject: [PATCH 6/8] [python] Add upsert tests for update_cols, partitioned, and multi-key duplicates --- .../tests/table_upsert_by_key_test.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py index 4c0a07f5238e..cf612a968cb0 100644 --- a/paimon-python/pypaimon/tests/table_upsert_by_key_test.py +++ b/paimon-python/pypaimon/tests/table_upsert_by_key_test.py @@ -157,6 +157,64 @@ def test_upsert_for_existing_table_duplicate_keys(self): result['name'].to_pylist()) if i == 1) self.assertEqual(['UPDATED', 'UPDATED'], names) + def test_existing_duplicate_keys_partial_update_cols(self): + """update_cols restricts which columns are rewritten; every matching + row is still updated, other columns keep each row's own value.""" + table = self._create_table() + self._write_arrow(table, pa.Table.from_pydict({ + 'id': [1], 'name': ['old_A'], 'age': [10], 'city': ['X'], + }, schema=self.pa_schema)) + self._write_arrow(table, pa.Table.from_pydict({ + 'id': [1], 'name': ['old_B'], 'age': [20], 'city': ['Y'], + }, schema=self.pa_schema)) + + self._upsert(table, pa.Table.from_pydict({ + 'id': [1], 'name': ['UPDATED'], 'age': [99], 'city': ['Z'], + }, schema=self.pa_schema), upsert_keys=['id'], update_cols=['name']) + + result = self._read_all(table) + rows = sorted(zip(result['id'].to_pylist(), result['name'].to_pylist(), + 
result['age'].to_pylist(), result['city'].to_pylist())) + self.assertEqual([(1, 'UPDATED', 10, 'X'), (1, 'UPDATED', 20, 'Y')], rows) + + def test_existing_duplicate_keys_partitioned(self): + """Duplicate keys within a partition are all updated; rows in other + partitions are untouched.""" + table = self._create_table( + pa_schema=self.partitioned_pa_schema, partition_keys=['region']) + self._write_arrow(table, pa.Table.from_pydict({ + 'id': [1, 1], 'name': ['a1', 'a2'], 'age': [10, 20], 'region': ['A', 'A'], + }, schema=self.partitioned_pa_schema)) + self._write_arrow(table, pa.Table.from_pydict({ + 'id': [1], 'name': ['b1'], 'age': [30], 'region': ['B'], + }, schema=self.partitioned_pa_schema)) + + self._upsert(table, pa.Table.from_pydict({ + 'id': [1], 'name': ['UPDATED'], 'age': [99], 'region': ['A'], + }, schema=self.partitioned_pa_schema), upsert_keys=['id']) + + result = self._read_all(table) + rows = sorted(zip(result['id'].to_pylist(), result['name'].to_pylist(), + result['region'].to_pylist())) + self.assertEqual( + [(1, 'UPDATED', 'A'), (1, 'UPDATED', 'A'), (1, 'b1', 'B')], rows) + + def test_multiple_keys_each_with_duplicates(self): + """One upsert updates every matching row across several keys.""" + table = self._create_table() + self._write_arrow(table, pa.Table.from_pydict({ + 'id': [1, 1, 2, 2], 'name': ['a', 'b', 'c', 'd'], + 'age': [1, 2, 3, 4], 'city': ['p', 'q', 'r', 's'], + }, schema=self.pa_schema)) + + self._upsert(table, pa.Table.from_pydict({ + 'id': [1, 2], 'name': ['U1', 'U2'], 'age': [10, 20], 'city': ['X', 'Y'], + }, schema=self.pa_schema), upsert_keys=['id']) + + result = self._read_all(table) + names = sorted(zip(result['id'].to_pylist(), result['name'].to_pylist())) + self.assertEqual([(1, 'U1'), (1, 'U1'), (2, 'U2'), (2, 'U2')], names) + def test_composite_key_upsert(self): """Upsert with a multi-column composite key.""" table = self._create_table() From 0e04fa155854a02ad504250bd6f872d84a093918 Mon Sep 17 00:00:00 2001 From: 
xiaohongbo Date: Mon, 22 Jun 2026 06:58:04 -0700 Subject: [PATCH 7/8] [python] Trim redundant upsert docstrings and fix stale memory comment --- paimon-python/pypaimon/write/table_upsert_by_key.py | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/paimon-python/pypaimon/write/table_upsert_by_key.py b/paimon-python/pypaimon/write/table_upsert_by_key.py index d2b588ffb3a5..c7c6360881bc 100644 --- a/paimon-python/pypaimon/write/table_upsert_by_key.py +++ b/paimon-python/pypaimon/write/table_upsert_by_key.py @@ -169,7 +169,8 @@ def _upsert_partition( ) # 3. Scan partition once, keeping key → [_ROW_ID, ...] for keys that - # appear in the input (memory ∝ |input|, not |partition|). + # appear in the input (memory ∝ matched existing rows, not the + # whole partition). key_to_row_ids = self._build_key_to_row_ids_map( match_keys, partition_spec, set(input_key_tuples), ) @@ -282,8 +283,7 @@ def _build_key_to_row_ids_map( ) -> Dict[_KeyTuple, List[int]]: """ Scan the partition in batches and collect key → [_ROW_ID, ...] for - rows whose composite key is in *input_key_set*. All row ids sharing a - key are kept so every matching row can be updated. + rows whose composite key is in *input_key_set*. The partition spec (if any) is pushed down as an ``and`` of per-key equality predicates so non-matching partitions are pruned by the @@ -343,9 +343,7 @@ def _do_updates( key_to_row_ids: Dict[_KeyTuple, List[int]], update_cols: Optional[List[str]] ) -> List[CommitMessage]: - """Update matched rows in-place via :class:`TableUpdateByRowId`; - a key matching several existing rows updates all of them.""" - # Fan out each matched input row to all its existing row ids. 
+ """Update matched rows in-place via :class:`TableUpdateByRowId`.""" expanded_input_indices: List[int] = [] row_ids: List[int] = [] for i in matched_indices: From d3c79328ade38e096e3917c8fc811089d70de9da Mon Sep 17 00:00:00 2001 From: xiaohongbo Date: Mon, 22 Jun 2026 07:34:15 -0700 Subject: [PATCH 8/8] [python] Log upsert fan-out and fix update-all docstring in table_update --- paimon-python/pypaimon/write/table_update.py | 4 ++-- paimon-python/pypaimon/write/table_upsert_by_key.py | 8 ++++++++ 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/paimon-python/pypaimon/write/table_update.py b/paimon-python/pypaimon/write/table_update.py index 4b063dfa7bf5..8271b19f3789 100644 --- a/paimon-python/pypaimon/write/table_update.py +++ b/paimon-python/pypaimon/write/table_update.py @@ -154,8 +154,8 @@ def _upsert_by_arrow_with_key( For each row in the input Arrow table: - * If a row with the same composite ``upsert_keys`` value already - exists → update that row in-place. + * If one or more rows with the same composite ``upsert_keys`` value + already exist → update all of them in-place. * Otherwise → append as a new row. The public method lives on the concrete subclasses so each can diff --git a/paimon-python/pypaimon/write/table_upsert_by_key.py b/paimon-python/pypaimon/write/table_upsert_by_key.py index c7c6360881bc..4faf02fea0bd 100644 --- a/paimon-python/pypaimon/write/table_upsert_by_key.py +++ b/paimon-python/pypaimon/write/table_upsert_by_key.py @@ -185,6 +185,14 @@ def _upsert_partition( "Upserting partition %s: %d matched, %d new", partition_spec, len(matched_indices), len(new_indices), ) + total_updates = sum( + len(key_to_row_ids[input_key_tuples[i]]) for i in matched_indices) + if total_updates > len(matched_indices): + logger.info( + "Upsert fan-out in partition %s: %d input rows expand to " + "%d row updates", partition_spec, + len(matched_indices), total_updates, + ) commit_messages: List[CommitMessage] = [] if matched_indices: