diff --git a/graph/src/blockchain/mock.rs b/graph/src/blockchain/mock.rs index 74d23b388a9..79ca098849e 100644 --- a/graph/src/blockchain/mock.rs +++ b/graph/src/blockchain/mock.rs @@ -6,7 +6,7 @@ use crate::{ network_provider::ChainName, store::{ BlockNumber, ChainHeadStore, ChainIdStore, DeploymentCursorTracker, DeploymentLocator, - SourceableStore, + SourceableStore, StaleCallCacheResult, }, subgraph::InstanceDSTemplateInfo, }, @@ -595,8 +595,8 @@ impl ChainStore for MockChainStore { async fn clear_stale_call_cache( &self, _ttl_days: usize, - _ttl_max_contracts: Option, - ) -> Result<(), Error> { + _max_contracts: Option, + ) -> Result { unimplemented!() } async fn chain_identifier(&self) -> Result { diff --git a/graph/src/components/store/traits.rs b/graph/src/components/store/traits.rs index 4c39f182c01..bb4397432ff 100644 --- a/graph/src/components/store/traits.rs +++ b/graph/src/components/store/traits.rs @@ -530,6 +530,18 @@ pub trait ChainIdStore: Send + Sync + 'static { /// configuration can change this for individual chains pub const BLOCK_CACHE_SIZE: BlockNumber = i32::MAX; +/// Result of clearing stale call cache entries. +pub struct StaleCallCacheResult { + /// The effective TTL in days that was actually used for deletion. + /// This may be larger than the requested TTL if `max_contracts` + /// was set and caused the cutoff to be adjusted. + pub effective_ttl_days: usize, + /// Number of cache entries deleted from the call cache. + pub cache_entries_deleted: usize, + /// Number of contract entries deleted from call meta. + pub contracts_deleted: usize, +} + /// Common trait for blockchain store implementations. #[async_trait] pub trait ChainStore: ChainHeadStore { @@ -657,12 +669,14 @@ pub trait ChainStore: ChainHeadStore { /// Clears call cache of the chain for the given `from` and `to` block number. async fn clear_call_cache(&self, from: BlockNumber, to: BlockNumber) -> Result<(), Error>; - /// Clears stale call cache entries for the given TTL in days. + /// Clears stale call cache entries for the given TTL in days. If + /// `max_contracts` is set, increase the effective TTL so that at + /// most `max_contracts` contracts are evicted. async fn clear_stale_call_cache( &self, ttl_days: usize, - ttl_max_contracts: Option, - ) -> Result<(), Error>; + max_contracts: Option, + ) -> Result; /// Return the chain identifier for this store. async fn chain_identifier(&self) -> Result; diff --git a/graph/src/env/store.rs b/graph/src/env/store.rs index c38ad9aaac3..e36e60c8b84 100644 --- a/graph/src/env/store.rs +++ b/graph/src/env/store.rs @@ -171,11 +171,6 @@ pub struct EnvVarsStore { /// Disables storing or reading `eth_call` results from the store call cache. /// Set by `GRAPH_STORE_DISABLE_CALL_CACHE`. Defaults to false. pub disable_call_cache: bool, - /// The number of contracts to delete from the call cache in one batch - /// when clearing stale entries, set by - /// `GRAPH_STORE_STALE_CALL_CACHE_CONTRACTS_BATCH_SIZE`. The default - /// value is 100 contracts. - pub stale_call_cache_contracts_batch_size: usize, /// Set by `GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE`. Default is false. /// Set to true to disable chain_head_ptr caching (safety escape hatch). pub disable_chain_head_ptr_cache: bool, @@ -253,7 +248,6 @@ impl TryFrom for EnvVarsStore { account_like_min_versions_count: x.account_like_min_versions_count, account_like_max_unique_ratio: x.account_like_max_unique_ratio.map(|r| r.0), disable_call_cache: x.disable_call_cache, - stale_call_cache_contracts_batch_size: x.stale_call_cache_contracts_batch_size, disable_chain_head_ptr_cache: x.disable_chain_head_ptr_cache, connection_validation_idle_secs: Duration::from_secs(x.connection_validation_idle_secs), connection_unavailable_retry: Duration::from_secs( @@ -370,11 +364,6 @@ pub struct InnerStore { account_like_max_unique_ratio: Option, #[envconfig(from = "GRAPH_STORE_DISABLE_CALL_CACHE", default = "false")] disable_call_cache: bool, - #[envconfig( - from = "GRAPH_STORE_STALE_CALL_CACHE_CONTRACTS_BATCH_SIZE", - default = "100" - )] - stale_call_cache_contracts_batch_size: usize, #[envconfig(from = "GRAPH_STORE_DISABLE_CHAIN_HEAD_PTR_CACHE", default = "false")] disable_chain_head_ptr_cache: bool, #[envconfig(from = "GRAPH_STORE_CONNECTION_VALIDATION_IDLE_SECS", default = "30")] diff --git a/graph/src/lib.rs b/graph/src/lib.rs index bf3777ae05e..83a8a4a9112 100644 --- a/graph/src/lib.rs +++ b/graph/src/lib.rs @@ -128,8 +128,8 @@ pub mod prelude { EntityCollection, EntityFilter, EntityLink, EntityOperation, EntityOrder, EntityOrderByChild, EntityOrderByChildInfo, EntityQuery, EntityRange, EntityWindow, EthereumCallCache, ParentLink, PartialBlockPtr, PoolWaitStats, QueryStore, - QueryStoreManager, SeqGenerator, StoreError, StoreEvent, StoreEventStreamBox, - SubgraphStore, UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX, + QueryStoreManager, SeqGenerator, StaleCallCacheResult, StoreError, StoreEvent, + StoreEventStreamBox, SubgraphStore, UnfailOutcome, WindowAttribute, BLOCK_NUMBER_MAX, }; pub use crate::components::subgraph::{ BlockState, BlockStateCheckpoint, HostMetrics, InstanceDSTemplateInfo, RuntimeHost, diff --git a/node/src/bin/manager.rs b/node/src/bin/manager.rs index 684c3936070..a3e8216410c 100644 --- a/node/src/bin/manager.rs +++ b/node/src/bin/manager.rs @@ -620,10 +620,11 @@ pub enum CallCacheCommand { remove_entire_cache: bool, /// Remove the cache for contracts that have not been accessed in the last days #[clap(long, conflicts_with_all = &["from", "to", "remove-entire-cache"], value_parser = clap::value_parser!(u32).range(1..))] - ttl_days: Option, - /// Limits the number of contracts to consider for cache removal when using --ttl_days - #[clap(long, conflicts_with_all = &["remove-entire-cache", "to", "from"], requires = "ttl_days", value_parser = clap::value_parser!(u64).range(1..))] - ttl_max_contracts: Option, + ttl_days: Option, + /// Maximum number of contracts to evict. When set, the effective TTL + /// is increased so that at most this many contracts are deleted. + #[clap(long, requires = "ttl_days")] + max_contracts: Option, /// Starting block number #[clap(long, short, conflicts_with = "remove-entire-cache", requires = "to")] from: Option, @@ -1549,14 +1550,14 @@ async fn main() -> anyhow::Result<()> { to, remove_entire_cache, ttl_days, - ttl_max_contracts, + max_contracts, } => { let chain_store = ctx.chain_store(&chain_name).await?; if let Some(ttl_days) = ttl_days { return commands::chain::clear_stale_call_cache( chain_store, - ttl_days, - ttl_max_contracts, + ttl_days as usize, + max_contracts, ) .await; } diff --git a/node/src/manager/commands/chain.rs b/node/src/manager/commands/chain.rs index df50c5e4069..3943861fe56 100644 --- a/node/src/manager/commands/chain.rs +++ b/node/src/manager/commands/chain.rs @@ -84,15 +84,27 @@ pub async fn clear_call_cache( pub async fn clear_stale_call_cache( chain_store: Arc, ttl_days: usize, - ttl_max_contracts: Option, + max_contracts: Option, ) -> Result<(), Error> { println!( "Removing stale entries from the call cache for `{}`", chain_store.chain ); - chain_store - .clear_stale_call_cache(ttl_days, ttl_max_contracts) + let result = chain_store + .clear_stale_call_cache(ttl_days, max_contracts) .await?; + if result.effective_ttl_days != ttl_days { + println!( + "Effective TTL: {} days (adjusted from {} to stay within {} contracts)", + result.effective_ttl_days, + ttl_days, + max_contracts.unwrap() + ); + } + println!( + "Deleted {} cache entries for {} contracts", + result.cache_entries_deleted, result.contracts_deleted + ); Ok(()) } diff --git a/store/postgres/src/chain_store.rs b/store/postgres/src/chain_store.rs index dfd088934bc..e1d9bea2538 100644 --- a/store/postgres/src/chain_store.rs +++ b/store/postgres/src/chain_store.rs @@ -12,7 +12,7 @@ use graph::parking_lot::RwLock; use graph::prelude::alloy::primitives::B256; use graph::prelude::MetricsRegistry; use graph::prometheus::{CounterVec, GaugeVec}; -use graph::slog::{info, Logger}; +use graph::slog::{info, o, Logger}; use graph::stable_hash::crypto_stable_hash; use graph::util::herd_cache::HerdCache; @@ -32,7 +32,8 @@ use graph::cheap_clone::CheapClone; use graph::components::ethereum::CachedBlock; use graph::prelude::{ serde_json as json, transaction_receipt::LightTransactionReceipt, BlockNumber, BlockPtr, - CachedEthereumCall, ChainStore as ChainStoreTrait, Error, EthereumCallCache, StoreError, + CachedEthereumCall, ChainStore as ChainStoreTrait, Error, EthereumCallCache, + StaleCallCacheResult, StoreError, }; use graph::{ensure, internal_error}; @@ -169,6 +170,7 @@ mod data { pub(crate) const ETHEREUM_BLOCKS_TABLE_NAME: &str = "public.ethereum_blocks"; pub(crate) const ETHEREUM_CALL_CACHE_TABLE_NAME: &str = "public.eth_call_cache"; + pub(crate) const ETHEREUM_CALL_META_TABLE_NAME: &str = "public.eth_call_meta"; mod public { pub(super) use super::super::public::ethereum_networks; @@ -305,11 +307,6 @@ mod data { fn contract_address(&self) -> DynColumn { self.table.column::("contract_address") } - - fn accessed_at(&self) -> DynColumn { - self.table - .column::(Self::ACCESSED_AT) - } } #[derive(Clone, Debug)] @@ -1707,117 +1704,122 @@ mod data { Ok(()) } - /// Find up to `batch_limit` contract addresses that have not - /// been accessed in the last `ttl_days` days - pub(super) async fn stale_contracts( + /// Delete up to `batch_size` call_cache rows whose contract_address + /// appears in call_meta with accessed_at older than `ttl_days`. + /// Returns the number of deleted rows. + pub(super) async fn delete_stale_calls_batch( &self, conn: &mut AsyncPgConnection, - batch_limit: usize, - ttl_days: usize, - ) -> Result>, StoreError> { - let ttl_days = ttl_days as i64; - let batch_limit = batch_limit as i64; - match self { - Storage::Shared => { - use public::eth_call_meta as meta; - - meta::table - .select(meta::contract_address) - .filter( - meta::accessed_at - .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), - ) - .limit(batch_limit) - .get_results::>(conn) - .await - .map_err(StoreError::from) - } - Storage::Private(Schema { call_meta, .. }) => call_meta - .table() - .select(call_meta.contract_address()) - .filter( - call_meta - .accessed_at() - .lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), - ) - .limit(batch_limit) - .get_results::>(conn) - .await - .map_err(StoreError::from), - } + ttl_days: i64, + batch_size: i64, + ) -> Result { + let (cache, meta) = match self { + Storage::Shared => ( + ETHEREUM_CALL_CACHE_TABLE_NAME, + ETHEREUM_CALL_META_TABLE_NAME, + ), + Storage::Private(Schema { + call_cache, + call_meta, + .. + }) => (call_cache.qname.as_str(), call_meta.qname.as_str()), + }; + let query = format!( + "/* controller='call_cache_cleanup',days={ttl_days},batch_size={batch_size} */ \ + delete from {cache} + where ctid in ( + select cc.ctid + from {cache} cc + join {meta} cm + on cc.contract_address = cm.contract_address + where cm.accessed_at < current_date - $1 * interval '1 day' + limit $2 + )" + ); + sql_query(&query) + .bind::(ttl_days) + .bind::(batch_size) + .execute(conn) + .await + .map_err(StoreError::from) } - /// Delete up to `batch_size` calls for the given - /// `stale_contracts`. Returns the number of deleted calls. - pub(super) async fn delete_calls( + /// Given `ttl_days` and `max_contracts`, compute an effective + /// TTL in days such that at most `max_contracts` contracts would + /// be evicted. Returns a value >= `ttl_days`. + /// + /// We look at the (max_contracts+1)th oldest stale contract. If + /// it exists with `accessed_at = D`, the effective TTL is set to + /// `current_date - D` days, making the deletion cutoff exactly + /// `D`. Since the deletion query uses `accessed_at < cutoff`, + /// contracts with `accessed_at = D` are excluded, guaranteeing + /// at most `max_contracts` deletions. + /// + /// When multiple contracts share the boundary date, we may + /// delete fewer than `max_contracts` since we can only control + /// the cutoff at date granularity. + pub(super) async fn effective_ttl( &self, conn: &mut AsyncPgConnection, - stale_contracts: &[Vec], - batch_size: i64, - ) -> Result { - match self { - Storage::Shared => { - use public::eth_call_cache as cache; - - let next_batch = cache::table - .select(cache::id) - .filter(cache::contract_address.eq_any(stale_contracts)) - .limit(batch_size) - .get_results::>(conn) - .await?; + ttl_days: i64, + max_contracts: i64, + ) -> Result { + #[derive(QueryableByName)] + struct Row { + #[diesel(sql_type = diesel::sql_types::BigInt)] + effective_ttl: i64, + } - diesel::delete(cache::table.filter(cache::id.eq_any(&next_batch))) - .execute(conn) - .await - .map_err(StoreError::from) - } - Storage::Private(Schema { call_cache, .. }) => { - let delete_cache_query = format!( - "WITH targets AS ( - SELECT id - FROM {qname} - WHERE contract_address = ANY($1) - LIMIT $2 - ) - DELETE FROM {qname} USING targets - WHERE {qname}.id = targets.id", - qname = call_cache.qname - ); + let meta_table = match self { + Storage::Shared => ETHEREUM_CALL_META_TABLE_NAME, + Storage::Private(Schema { call_meta, .. }) => call_meta.qname.as_str(), + }; + let query = format!( + "select extract(day from current_date - accessed_at)::int8 + 1\ + as effective_ttl \ + from {meta_table} \ + where accessed_at < current_date - $1 * interval '1 day' \ + order by accessed_at asc \ + offset $2 \ + limit 1" + ); + let effective_ttl: Option = sql_query(&query) + .bind::(ttl_days) + .bind::(max_contracts) + .get_result::(conn) + .await + .optional()? + .map(|r| r.effective_ttl); - sql_query(&delete_cache_query) - .bind::, _>(stale_contracts) - .bind::(batch_size) - .execute(conn) - .await - .map_err(StoreError::from) - } - } + Ok(effective_ttl.unwrap_or(ttl_days)) } - pub(super) async fn delete_contracts( + /// Delete all call_meta entries with accessed_at older than + /// `ttl_days`. Should be called after all corresponding + /// call_cache rows have been deleted. + pub(super) async fn delete_stale_meta( &self, conn: &mut AsyncPgConnection, - stale_contracts: &[Vec], + ttl_days: i64, ) -> Result { match self { Storage::Shared => { use public::eth_call_meta as meta; - diesel::delete( - meta::table.filter(meta::contract_address.eq_any(stale_contracts)), - ) + diesel::delete(meta::table.filter( + meta::accessed_at.lt(diesel::dsl::date(diesel::dsl::now - ttl_days.days())), + )) .execute(conn) .await .map_err(StoreError::from) } Storage::Private(Schema { call_meta, .. }) => { - let delete_meta_query = format!( - "DELETE FROM {} WHERE contract_address = ANY($1)", + let query = format!( + "delete from {} where accessed_at < current_date - $1 * interval '1 day'", call_meta.qname ); - - sql_query(&delete_meta_query) - .bind::, _>(stale_contracts) + sql_query(&query) + .bind::(ttl_days) .execute(conn) .await .map_err(StoreError::from) @@ -3279,83 +3281,88 @@ impl ChainStoreTrait for ChainStore { async fn clear_stale_call_cache( &self, ttl_days: usize, - ttl_max_contracts: Option, - ) -> Result<(), Error> { + max_contracts: Option, + ) -> Result { + const LOG_INTERVAL: Duration = Duration::from_mins(5); + let conn = &mut self.pool.get_permitted().await?; + let logger = self.logger.new(o!("component" => "CallCacheCleanup")); + self.storage .ensure_contract_address_index(conn, &self.logger) .await?; - let mut total_calls: usize = 0; - let mut total_contracts: usize = 0; - // We process contracts in batches to avoid loading too many - // entries into memory at once. Each contract can have many - // calls, so we delete calls in adaptive batches that - // self-tune based on query duration. - let contracts_batch_size: usize = ENV_VARS.store.stale_call_cache_contracts_batch_size; - let mut batch_size = AdaptiveBatchSize::with_size(100); - - // Limits the number of contracts to process if ttl_max_contracts is set. - // Used also to adjust the final batch size, so we don't process more - // contracts than the set limit. - let remaining_contracts = |processed: usize| -> Option { - ttl_max_contracts.map(|limit| limit.saturating_sub(processed)) + let ttl_days = ttl_days as i64; + let effective_ttl = match max_contracts { + Some(max) => { + self.storage + .effective_ttl(conn, ttl_days, max as i64) + .await? + } + None => ttl_days, }; - loop { - if let Some(0) = remaining_contracts(total_contracts) { - info!(self.logger, - "Finished cleaning call cache: deleted {} entries for {} contracts (limit reached)", - total_calls, total_contracts); - break; - } + if effective_ttl != ttl_days { + info!( + logger, + "adjusted ttl from {} to {} days to stay within {} contracts", + ttl_days, + effective_ttl, + max_contracts.unwrap() + ); + } - let batch_limit = remaining_contracts(total_contracts) - .map(|left| left.min(contracts_batch_size)) - .unwrap_or(contracts_batch_size); + let mut total_deleted: usize = 0; + let mut batch_count: usize = 0; + let mut batch_size = AdaptiveBatchSize::with_size(100); + let mut last_log = Instant::now(); - let stale_contracts = self + loop { + let current_size = batch_size.size; + let start = Instant::now(); + let deleted = self .storage - .stale_contracts(conn, batch_limit, ttl_days) + .delete_stale_calls_batch(conn, effective_ttl, current_size) .await?; - if stale_contracts.is_empty() { + batch_size.adapt(start.elapsed()); + total_deleted += deleted; + batch_count += 1; + + if last_log.elapsed() >= LOG_INTERVAL { info!( - self.logger, - "Finished cleaning call cache: deleted {} entries for {} contracts", - total_calls, - total_contracts + logger, + "deleted {} entries in this batch \ + ({} total in {} batches, batch_size now {})", + deleted, + total_deleted, + batch_count, + batch_size.size ); - break; + last_log = Instant::now(); } - loop { - let current_size = batch_size.size; - let start = Instant::now(); - let deleted_count = self - .storage - .delete_calls(conn, &stale_contracts, current_size) - .await?; - - batch_size.adapt(start.elapsed()); - - total_calls += deleted_count; - - if (deleted_count as i64) < current_size { - break; - } + if (deleted as i64) < current_size { + break; } + } - let deleted_contracts = self - .storage - .delete_contracts(conn, &stale_contracts) - .await?; + let contracts_deleted = self.storage.delete_stale_meta(conn, effective_ttl).await?; - total_contracts += deleted_contracts; - } + info!( + logger, + "deleted {} cache entries and {} contract entries in {} batches", + total_deleted, + contracts_deleted, + batch_count + ); - Ok(()) + Ok(StaleCallCacheResult { + effective_ttl_days: effective_ttl as usize, + cache_entries_deleted: total_deleted, + contracts_deleted, + }) } async fn transaction_receipts_in_block(