From 23008066e6507f664b2e403dc333379d0eac81a7 Mon Sep 17 00:00:00 2001 From: David Lutterkort Date: Thu, 3 Apr 2025 08:33:14 -0700 Subject: [PATCH] store: Revert entity versions during copy instead of after Reverting entities after copying can be very slow; the step is also unnecessary since we already know during copying which entity versions need to be unclamped and we never copy versions that would have to be deleted by the revert. We move the revert logic into CopyEntityBatchQuery so that entity versions are reverted as they are copied rather than in a separate revert_block pass after copying completes. The post-copy revert_block call in start_subgraph is kept as a no-op safety net for copies that were started with older code and resumed after upgrading. It can be removed once a release with this logic has been out for long enough. --- store/postgres/src/copy.rs | 34 +++++++++++++++++------- store/postgres/src/deployment_store.rs | 20 ++++++++++---- store/postgres/src/relational_queries.rs | 24 ++++++++++++++++- 3 files changed, 63 insertions(+), 15 deletions(-) diff --git a/store/postgres/src/copy.rs b/store/postgres/src/copy.rs index 54c1a03a896..fd6cd2c4fe0 100644 --- a/store/postgres/src/copy.rs +++ b/store/postgres/src/copy.rs @@ -178,7 +178,14 @@ impl CopyState { dst: Arc, target_block: BlockPtr, ) -> Result { - let tables = TableState::load(conn, primary, src.as_ref(), dst.as_ref()).await?; + let tables = TableState::load( + conn, + primary, + src.as_ref(), + dst.as_ref(), + target_block.number, + ) + .await?; let (finished, mut unfinished): (Vec<_>, Vec<_>) = tables.into_iter().partition(|table| table.finished()); unfinished.sort_by_key(|table| table.dst.object.to_string()); @@ -329,6 +336,7 @@ struct TableState { dst_site: Arc, batcher: VidBatcher, duration_ms: i64, + target_block: BlockNumber, } impl TableState { @@ -351,6 +359,7 @@ impl TableState { dst_site, batcher, duration_ms: 0, + target_block: target_block.number, }) } @@ -363,6 +372,7 @@ impl TableState { primary: Primary, src_layout: &Layout, dst_layout: &Layout, + target_block: BlockNumber, ) -> Result, StoreError> { use copy_table_state as cts; @@ -429,6 +439,7 @@ impl TableState { dst_site: dst_layout.site.clone(), batcher, duration_ms, + target_block, }; states.push(state); } @@ -503,15 +514,20 @@ impl TableState { } async fn copy_batch(&mut self, conn: &mut AsyncPgConnection) -> Result { - let (duration, count) = self + let (duration, count): (_, Option) = self .batcher - .step(async |start, end| { - let count = - rq::CopyEntityBatchQuery::new(self.dst.as_ref(), &self.src, start, end)? - .count_current() - .get_result::(conn) - .await - .optional()?; + .step(async |start: i64, end: i64| { + let count = rq::CopyEntityBatchQuery::new( + self.dst.as_ref(), + &self.src, + start, + end, + self.target_block, + )? + .count_current() + .get_result::(conn) + .await + .optional()?; Ok(count.unwrap_or(0) as i32) }) .await?; diff --git a/store/postgres/src/deployment_store.rs b/store/postgres/src/deployment_store.rs index 4d8df5c3a28..d4ab71c7f11 100644 --- a/store/postgres/src/deployment_store.rs +++ b/store/postgres/src/deployment_store.rs @@ -1648,11 +1648,21 @@ impl DeploymentStore { .await?; } - // Rewind the subgraph so that entity versions that are - // clamped in the future (beyond `block`) become valid for - // all blocks after `block`. `revert_block` gets rid of - // everything including the block passed to it. We want to - // preserve `block` and therefore revert `block+1` + // CopyEntityBatchQuery now reverts entity versions + // during copying, making this rewind redundant for new + // copies. We keep it for backward compatibility: a copy + // that was started before this change and is resumed + // after upgrading will have already-copied rows that + // weren't reverted during copy. For data that was + // already reverted during copy, this is a no-op. This + // code can be removed once a release with this change + // has been out for a while and we are sure that there + // are no more copies in progress that started before + // the change + // + // `revert_block` gets rid of everything including the + // block passed to it. We want to preserve `block` and + // therefore revert `block+1` let start = Instant::now(); let block_to_revert: BlockNumber = block .number diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 1c746f1338e..34d88585ab0 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -5093,6 +5093,7 @@ pub struct CopyEntityBatchQuery<'a> { columns: Vec<&'a Column>, first_vid: i64, last_vid: i64, + target_block: BlockNumber, } impl<'a> CopyEntityBatchQuery<'a> { @@ -5101,6 +5102,7 @@ impl<'a> CopyEntityBatchQuery<'a> { src: &'a Table, first_vid: i64, last_vid: i64, + target_block: BlockNumber, ) -> Result { let mut columns = Vec::new(); for dcol in &dst.columns { @@ -5127,6 +5129,7 @@ impl<'a> CopyEntityBatchQuery<'a> { columns, first_vid, last_vid, + target_block, }) } @@ -5211,7 +5214,16 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { ); out.push_sql(&checked_conversion); } - (false, false) => out.push_sql(BLOCK_RANGE_COLUMN), + (false, false) => { + let range_conv = format!( + r#" + case when upper({BLOCK_RANGE_COLUMN}) > {} + then int4range(lower({BLOCK_RANGE_COLUMN}), null) + else {BLOCK_RANGE_COLUMN} end"#, + self.target_block + ); + out.push_sql(&range_conv) + } } match (self.src.has_causality_region, self.dst.has_causality_region) { @@ -5241,6 +5253,16 @@ impl<'a> QueryFragment for CopyEntityBatchQuery<'a> { out.push_bind_param::(&self.first_vid)?; out.push_sql(" and vid <= "); out.push_bind_param::(&self.last_vid)?; + out.push_sql(" and "); + if self.src.immutable { + out.push_sql(BLOCK_COLUMN); + } else { + out.push_sql("lower("); + out.push_sql(BLOCK_RANGE_COLUMN); + out.push_sql(")"); + } + out.push_sql(" <= "); + out.push_bind_param::(&self.target_block)?; out.push_sql("\n returning "); if self.dst.immutable { out.push_sql("true");