diff --git a/.vscode/settings.json b/.vscode/settings.json index a47cdf9..afa3b89 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -1,3 +1,4 @@ { - "rust-analyzer.rustfmt.extraArgs": ["+nightly"] + "rust-analyzer.rustfmt.extraArgs": ["+nightly"], + "rust-analyzer.cargo.features": ["http-subscription"] } diff --git a/Cargo.lock b/Cargo.lock index 312b95a..bad7198 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -718,6 +727,56 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "anyhow" version = "1.0.100" @@ -1193,6 +1252,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + [[package]] name = "const-hex" version = "1.17.0" @@ -1534,6 +1599,27 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "env_filter" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" +dependencies = [ + "log", +] + +[[package]] +name = "env_logger" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "log", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -2163,6 +2249,12 @@ dependencies = [ "serde", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.10.5" @@ -2239,6 +2331,12 @@ dependencies = [ "sha3-asm", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.179" @@ -2304,6 +2402,15 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.7.6" @@ -2321,6 +2428,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -2407,6 +2523,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "parity-scale-codec" version = "3.7.5" @@ -2817,6 +2939,17 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.8.8" @@ -2903,6 +3036,7 @@ dependencies = [ "anyhow", "backon", "serde_json", + "test-log", "thiserror", "tokio", "tokio-stream", @@ -3280,6 +3414,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3449,6 +3592,28 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "test-log" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37d53ac171c92a39e4769491c4b4dde7022c60042254b5fc044ae409d34a24d4" +dependencies = [ + "env_logger", + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be35209fd0781c5401458ab66e4f98accf63553e8fae7425503e92fdd319783b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "thiserror" version = "2.0.17" @@ -3469,6 +3634,15 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "threadpool" version = "1.8.1" @@ -3724,6 +3898,35 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -3829,6 +4032,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "valuable" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index 73177be..7249f03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -34,12 +34,14 @@ serde_json = "1.0.149" [dev-dependencies] anyhow = "1.0" alloy = { version = "1.1.2", features = ["node-bindings", "provider-ws"] } +test-log = { version = "0.2.18", features = ["trace"] } [package.metadata.docs.rs] all-features = true [features] tracing = ["dep:tracing"] +http-subscription = [] [profile.release] lto = "thin" diff --git a/README.md b/README.md index 7dc0712..5ed28ff 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,26 @@ while let Some(result) = stream.next().await { - When a fallback fails, the primary is tried first before moving to the next fallback. - The `Lagged` error indicates the consumer is not keeping pace with incoming blocks. +#### HTTP-based subscriptions (feature flag) + +By default, subscriptions use WebSocket/pubsub-capable providers. Normally, HTTP-only providers are skipped during subscription retries. If your environment only exposes HTTP endpoints, you can enable HTTP-based block subscriptions via polling using the `http-subscription` Cargo feature: + +```toml +[dependencies] +robust-provider = { version = "1.0.0", features = ["http-subscription"] } +``` + +With this feature enabled and `allow_http_subscriptions(true)` is set, those HTTP providers can also act as subscription sources via polling, and are treated like regular pubsub-capable providers in the retry/failover logic: + +```rust +let robust = RobustProviderBuilder::new(http_provider) + .allow_http_subscriptions(true) + // Optional: tune how often to poll for new blocks (defaults to ~12s) + .poll_interval(Duration::from_secs(12)) + .build() + .await?; +``` + --- ## Provider Conversion diff --git a/src/lib.rs b/src/lib.rs index 0daa9ca..e1c7349 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,5 +68,8 @@ pub use robust_provider::{ CoreError, DEFAULT_CALL_TIMEOUT, DEFAULT_MAX_RETRIES, DEFAULT_MIN_DELAY, DEFAULT_RECONNECT_INTERVAL, DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, DEFAULT_SUBSCRIPTION_TIMEOUT, Error, IntoRobustProvider, IntoRootProvider, RobustProvider, RobustProviderBuilder, - RobustSubscription, RobustSubscriptionStream, SubscriptionError, + RobustSubscription, RobustSubscriptionStream, }; + +#[cfg(feature = "http-subscription")] +pub use robust_provider::DEFAULT_POLL_INTERVAL; diff --git a/src/macros/rpc.rs b/src/macros/rpc.rs index bf9448c..9123ccd 100644 --- a/src/macros/rpc.rs +++ b/src/macros/rpc.rs @@ -129,7 +129,6 @@ macro_rules! robust_rpc { // Call the provider method with turbofish syntax if generics are present provider.$method $(::<$($generic),+>)? ($($($arg),+)?).await }, - false, // is_subscription = false ) .await; @@ -203,7 +202,6 @@ macro_rules! robust_rpc { provider.$method $(::<$($generic),+>)? ($($arg),+).await } }, - false, // is_subscription = false ) .await; diff --git a/src/robust_provider/builder.rs b/src/robust_provider/builder.rs index 4fe4db5..a419f73 100644 --- a/src/robust_provider/builder.rs +++ b/src/robust_provider/builder.rs @@ -19,6 +19,12 @@ pub const DEFAULT_MAX_RETRIES: usize = 3; pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1); /// Default subscription channel size. pub const DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY: usize = 128; +/// Default polling interval for HTTP subscriptions. +/// +/// Set to 12 seconds to match approximate Ethereum mainnet block time. +/// Adjust based on the target chain's block time. +#[cfg(feature = "http-subscription")] +pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(12); /// Builder for constructing a [`RobustProvider`]. /// @@ -32,6 +38,10 @@ pub struct RobustProviderBuilder> { min_delay: Duration, reconnect_interval: Duration, subscription_buffer_capacity: usize, + #[cfg(feature = "http-subscription")] + poll_interval: Duration, + #[cfg(feature = "http-subscription")] + allow_http_subscriptions: bool, } impl> RobustProviderBuilder { @@ -50,6 +60,10 @@ impl> RobustProviderBuilder { min_delay: DEFAULT_MIN_DELAY, reconnect_interval: DEFAULT_RECONNECT_INTERVAL, subscription_buffer_capacity: DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, + #[cfg(feature = "http-subscription")] + poll_interval: DEFAULT_POLL_INTERVAL, + #[cfg(feature = "http-subscription")] + allow_http_subscriptions: false, } } @@ -127,6 +141,67 @@ impl> RobustProviderBuilder { self } + /// Set the polling interval for HTTP-based subscriptions. + /// + /// This controls how frequently HTTP providers poll for new blocks + /// when used as subscription sources. Only relevant when + /// [`allow_http_subscriptions`](Self::allow_http_subscriptions) is enabled. + /// + /// Default is [`DEFAULT_POLL_INTERVAL`]. + /// Adjust based on your target chain's block time. + /// + /// # Feature Flag + /// + /// This method requires the `http-subscription` feature. + /// + /// # Example + /// + /// ```rust,ignore + /// let robust = RobustProviderBuilder::new(http_provider) + /// .allow_http_subscriptions(true) + /// .poll_interval(Duration::from_secs(6)) // For faster chains + /// .build() + /// .await?; + /// ``` + #[cfg(feature = "http-subscription")] + #[must_use] + pub fn poll_interval(mut self, interval: Duration) -> Self { + self.poll_interval = interval; + self + } + + /// Enable HTTP providers for subscriptions via polling. + /// + /// When enabled, HTTP providers can participate in subscriptions + /// by polling for new blocks at the configured [`poll_interval`](Self::poll_interval). + /// + /// # Trade-offs + /// + /// * **Latency**: New blocks detected with up to `poll_interval` delay + /// * **RPC Load**: Generates one RPC call per `poll_interval` + /// * **Intermediate Blocks**: Depending on the node/provider semantics, you may not observe + /// every intermediate block when `poll_interval` is larger than the chain's block time (e.g. + /// if only the latest head is exposed). + /// + /// # Feature Flag + /// + /// This method requires the `http-subscription` feature. + /// + /// # Example + /// + /// ```rust,ignore + /// let robust = RobustProviderBuilder::new(http_provider) + /// .allow_http_subscriptions(true) + /// .build() + /// .await?; + /// ``` + #[cfg(feature = "http-subscription")] + #[must_use] + pub fn allow_http_subscriptions(mut self, allow: bool) -> Self { + self.allow_http_subscriptions = allow; + self + } + /// Build the `RobustProvider`. /// /// Final builder method: consumes the builder and returns the built [`RobustProvider`]. @@ -165,6 +240,10 @@ impl> RobustProviderBuilder { min_delay: self.min_delay, reconnect_interval: self.reconnect_interval, subscription_buffer_capacity: self.subscription_buffer_capacity, + #[cfg(feature = "http-subscription")] + poll_interval: self.poll_interval, + #[cfg(feature = "http-subscription")] + allow_http_subscriptions: self.allow_http_subscriptions, }) } } diff --git a/src/robust_provider/errors.rs b/src/robust_provider/errors.rs index e864e94..7af857b 100644 --- a/src/robust_provider/errors.rs +++ b/src/robust_provider/errors.rs @@ -17,9 +17,7 @@ use std::sync::Arc; use alloy::transports::{RpcError, TransportErrorKind}; use thiserror::Error; -use tokio::time::error as TokioError; - -use super::subscription; +use tokio::{sync::broadcast::error::RecvError, time::error as TokioError}; /// Errors that can occur when using [`super::RobustProvider`]. #[derive(Error, Debug, Clone)] @@ -43,6 +41,13 @@ pub enum Error { /// [`Error::RpcError`]. #[error("Block not found")] BlockNotFound, + + /// The subscription channel was closed. + #[error("Subscription channel closed")] + Closed, + + #[error("Subscription lagged behind by: {0}")] + Lagged(u64), } /// Low-level error related to RPC calls and failover logic. @@ -98,13 +103,11 @@ impl From for Error { } } -impl From for Error { - fn from(err: subscription::Error) -> Self { +impl From for Error { + fn from(err: RecvError) -> Self { match err { - subscription::Error::RpcError(e) => Error::RpcError(e), - subscription::Error::Timeout | - subscription::Error::Closed | - subscription::Error::Lagged(_) => Error::Timeout, + RecvError::Closed => Error::Closed, + RecvError::Lagged(count) => Error::Lagged(count), } } } diff --git a/src/robust_provider/mod.rs b/src/robust_provider/mod.rs index 47dcbf2..2558f1b 100644 --- a/src/robust_provider/mod.rs +++ b/src/robust_provider/mod.rs @@ -13,6 +13,11 @@ //! //! * [`IntoRobustProvider`] - Convert types into a `RobustProvider` //! * [`IntoRootProvider`] - Convert types into an underlying root provider +//! +//! # Feature Flags +//! +//! * `http-subscription` - Enable HTTP-based polling subscriptions for providers without native +//! pubsub support mod builder; mod errors; @@ -24,7 +29,4 @@ pub use builder::*; pub use errors::{CoreError, Error}; pub use provider::RobustProvider; pub use provider_conversion::{IntoRobustProvider, IntoRootProvider}; -pub use subscription::{ - DEFAULT_RECONNECT_INTERVAL, Error as SubscriptionError, RobustSubscription, - RobustSubscriptionStream, -}; +pub use subscription::{DEFAULT_RECONNECT_INTERVAL, RobustSubscription, RobustSubscriptionStream}; diff --git a/src/robust_provider/provider.rs b/src/robust_provider/provider.rs index 1f581e9..f02813d 100644 --- a/src/robust_provider/provider.rs +++ b/src/robust_provider/provider.rs @@ -34,7 +34,10 @@ use alloy::{ transports::{RpcError, TransportErrorKind}, }; -use crate::{Error, block_not_found_doc, robust_provider::RobustSubscription}; +use crate::{ + Error, block_not_found_doc, + robust_provider::{RobustSubscription, subscription::SubscriptionBackend}, +}; /// Provider wrapper with built-in retry and timeout mechanisms. /// @@ -50,6 +53,12 @@ pub struct RobustProvider { pub(crate) min_delay: Duration, pub(crate) reconnect_interval: Duration, pub(crate) subscription_buffer_capacity: usize, + /// Polling interval for HTTP-based subscriptions. + #[cfg(feature = "http-subscription")] + pub(crate) poll_interval: Duration, + /// Whether HTTP providers can participate in subscriptions via polling. + #[cfg(feature = "http-subscription")] + pub(crate) allow_http_subscriptions: bool, } impl RobustProvider { @@ -477,6 +486,10 @@ impl RobustProvider { /// * Detects and recovers from lagged subscriptions /// * Periodically attempts to reconnect to the primary provider /// + /// When the `http-subscription` feature is enabled and + /// [`allow_http_subscriptions`](crate::RobustProviderBuilder::allow_http_subscriptions) + /// is set to `true`, HTTP providers can participate in subscriptions via polling. + /// /// This is a wrapper function for [`Provider::subscribe_blocks`]. /// /// # Errors @@ -486,21 +499,38 @@ impl RobustProvider { /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds /// `call_timeout`). pub async fn subscribe_blocks(&self) -> Result, Error> { - let subscription = self - .try_operation_with_failover( - move |provider| async move { - provider - .subscribe_blocks() - .channel_size(self.subscription_buffer_capacity) - .await - }, - true, - ) + let subscription: SubscriptionBackend = self + .try_operation_with_failover(move |provider| async move { + // if HTTP subscriptions are enabled and the provider currently being tried is HTTP, + // we will attempt to connect using it. + // Otherwise try subscribing through a PubSub operation, and if the provider is HTTP + // just let it fail; the error will be non-retriable, so the algorithm will + // automatically switch to the next fallback provider (see + // `try_provider_with_timeout`). + #[cfg(feature = "http-subscription")] + { + let not_pubsub = provider.client().pubsub_frontend().is_none(); + if not_pubsub && self.allow_http_subscriptions { + return provider.watch_blocks().await.map(|builder| { + builder + .with_poll_interval(self.poll_interval) + .with_channel_size(self.subscription_buffer_capacity) + .into() + }); + } + } + provider + .subscribe_blocks() + .channel_size(self.subscription_buffer_capacity) + .await + .map(Into::>::into) + }) .await?; Ok(RobustSubscription::new(subscription, self.clone())) } + // TODO: set watch blocks params from the RP itself robust_rpc!(fn watch_blocks() -> PollerBuilder<(U256,), Vec>); /// Execute `operation` with exponential backoff and a total timeout. @@ -512,8 +542,6 @@ impl RobustProvider { /// If the timeout is exceeded and fallback providers are available, it will /// attempt to use each fallback provider in sequence. /// - /// If `require_pubsub` is true, providers that don't support pubsub will be skipped. - /// /// # Errors /// /// * [`CoreError::RpcError`] - if no fallback providers succeeded; contains the last error @@ -523,7 +551,6 @@ impl RobustProvider { pub async fn try_operation_with_failover( &self, operation: F, - require_pubsub: bool, ) -> Result where F: Fn(RootProvider) -> Fut, @@ -534,7 +561,7 @@ impl RobustProvider { match self.try_provider_with_timeout(primary, &operation).await { Ok(value) => Ok(value), Err(last_error) => self - .try_fallback_providers_from(&operation, require_pubsub, last_error, 0) + .try_fallback_providers_from(&operation, last_error, 0) .await .map(|(value, _)| value), } @@ -544,7 +571,6 @@ impl RobustProvider { pub(crate) async fn try_fallback_providers_from( &self, operation: F, - require_pubsub: bool, mut last_error: CoreError, start_index: usize, ) -> Result<(T, usize), CoreError> @@ -557,20 +583,11 @@ impl RobustProvider { debug!( start_index = start_index, total_fallbacks = fallback_providers.len(), - require_pubsub = require_pubsub, "Primary provider failed, attempting fallback providers" ); let fallback_iter = fallback_providers.iter().enumerate().skip(start_index); for (fallback_idx, provider) in fallback_iter { - if require_pubsub && !Self::supports_pubsub(provider) { - debug!( - provider_index = fallback_idx, - "Skipping fallback provider: pubsub not supported" - ); - continue; - } - trace!( fallback_index = fallback_idx, total_fallbacks = fallback_providers.len(), @@ -644,12 +661,6 @@ impl RobustProvider { .map_err(CoreError::from) } } - - /// Check if a provider supports pubsub - #[must_use] - fn supports_pubsub(provider: &RootProvider) -> bool { - provider.client().pubsub_frontend().is_some() - } } #[cfg(test)] @@ -679,6 +690,10 @@ mod tests { min_delay: Duration::from_millis(min_delay), reconnect_interval: DEFAULT_RECONNECT_INTERVAL, subscription_buffer_capacity: DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, + #[cfg(feature = "http-subscription")] + poll_interval: crate::DEFAULT_POLL_INTERVAL, + #[cfg(feature = "http-subscription")] + allow_http_subscriptions: false, } } @@ -689,14 +704,11 @@ mod tests { let call_count = AtomicUsize::new(0); let result = provider - .try_operation_with_failover( - |_| async { - call_count.fetch_add(1, Ordering::SeqCst); - let count = call_count.load(Ordering::SeqCst); - Ok(count) - }, - false, - ) + .try_operation_with_failover(|_| async { + call_count.fetch_add(1, Ordering::SeqCst); + let count = call_count.load(Ordering::SeqCst); + Ok(count) + }) .await; assert!(matches!(result, Ok(1))); @@ -709,18 +721,15 @@ mod tests { let call_count = AtomicUsize::new(0); let result = provider - .try_operation_with_failover( - |_| async { - call_count.fetch_add(1, Ordering::SeqCst); - let count = call_count.load(Ordering::SeqCst); - match count { - 3 => Ok(count), - // retriable error - _ => Err(TransportErrorKind::custom_str("429 Too Many Requests")), - } - }, - false, - ) + .try_operation_with_failover(|_| async { + call_count.fetch_add(1, Ordering::SeqCst); + let count = call_count.load(Ordering::SeqCst); + match count { + 3 => Ok(count), + // retriable error + _ => Err(TransportErrorKind::custom_str("429 Too Many Requests")), + } + }) .await; assert!(matches!(result, Ok(3))); @@ -733,14 +742,11 @@ mod tests { let call_count = AtomicUsize::new(0); let result: Result<(), CoreError> = provider - .try_operation_with_failover( - |_| async { - call_count.fetch_add(1, Ordering::SeqCst); - // retriable error - Err(TransportErrorKind::custom_str("429 Too Many Requests")) - }, - false, - ) + .try_operation_with_failover(|_| async { + call_count.fetch_add(1, Ordering::SeqCst); + // retriable error + Err(TransportErrorKind::custom_str("429 Too Many Requests")) + }) .await; assert!(matches!(result, Err(CoreError::RpcError(_)))); @@ -753,13 +759,10 @@ mod tests { let provider = test_provider(call_timeout, 10, 1); let result = provider - .try_operation_with_failover( - move |_provider| async move { - sleep(Duration::from_millis(call_timeout + 10)).await; - Ok(42) - }, - false, - ) + .try_operation_with_failover(move |_provider| async move { + sleep(Duration::from_millis(call_timeout + 10)).await; + Ok(42) + }) .await; assert!(matches!(result, Err(CoreError::Timeout))); diff --git a/src/robust_provider/subscription.rs b/src/robust_provider/subscription.rs index cb5ae5b..05c6de3 100644 --- a/src/robust_provider/subscription.rs +++ b/src/robust_provider/subscription.rs @@ -1,6 +1,5 @@ use std::{ pin::Pin, - sync::Arc, task::{Context, Poll, ready}, time::{Duration, Instant}, }; @@ -9,73 +8,86 @@ use alloy::{ network::Network, providers::{Provider, RootProvider}, pubsub::Subscription, - transports::{RpcError, TransportErrorKind}, }; -use thiserror::Error; -use tokio::{sync::broadcast::error::RecvError, time::timeout}; +#[cfg(feature = "http-subscription")] +use alloy::{ + primitives::{BlockHash, U256}, + rpc::client::PollerBuilder, +}; +#[cfg(feature = "http-subscription")] +use tokio::sync::mpsc; +use tokio::time::timeout; use tokio_stream::Stream; use tokio_util::sync::ReusableBoxFuture; -use crate::robust_provider::{CoreError, RobustProvider}; - -/// Errors that can occur when using [`RobustSubscription`]. -#[derive(Error, Debug, Clone)] -pub enum Error { - #[error("Operation timed out")] - Timeout, - #[error("RPC call failed after exhausting all retry attempts: {0}")] - RpcError(Arc>), - #[error("Subscription closed")] - Closed, - #[error("Subscription lagged behind by: {0}")] - Lagged(u64), -} +use crate::{ + Error, + robust_provider::{CoreError, RobustProvider}, +}; -impl From for Error { - fn from(err: CoreError) -> Self { - match err { - CoreError::Timeout => Error::Timeout, - CoreError::RpcError(e) => Error::RpcError(Arc::new(e)), - } - } +/// Default time interval between primary provider reconnection attempts +pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::from_secs(30); + +/// Backend for subscriptions - either native WebSocket or HTTP polling. +/// +/// This enum allows `RobustSubscription` to transparently handle both +/// WebSocket-based and HTTP polling-based subscriptions. +#[derive(Debug)] +pub(crate) enum SubscriptionBackend { + /// Native WebSocket subscription using pubsub + WebSocket(Subscription), + /// HTTP polling-based subscription (requires `http-subscription` feature) + #[cfg(feature = "http-subscription")] + HttpPolling(mpsc::Receiver), } -impl From for Error { - fn from(err: RecvError) -> Self { - match err { - RecvError::Closed => Error::Closed, - RecvError::Lagged(count) => Error::Lagged(count), - } +impl From> for SubscriptionBackend { + fn from(value: Subscription) -> Self { + SubscriptionBackend::WebSocket(value) } } -impl From for Error { - fn from(_: tokio::time::error::Elapsed) -> Self { - Error::Timeout +#[cfg(feature = "http-subscription")] +impl From>> for SubscriptionBackend { + fn from(value: PollerBuilder<(U256,), Vec>) -> Self { + use tokio_stream::StreamExt; + + let (sender, receiver) = mpsc::channel(value.channel_size()); + + let mut stream = value.into_stream(); + tokio::spawn(async move { + while let Some(hashes) = stream.next().await { + for hash in hashes { + if sender.send(hash).await.is_err() { + // Receiver dropped, stop polling + break; + } + } + } + }); + + SubscriptionBackend::HttpPolling(receiver) } } -/// Default time interval between primary provider reconnection attempts -pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::from_secs(30); - /// A robust subscription wrapper that automatically handles provider failover /// and periodic reconnection attempts to the primary provider. #[derive(Debug)] pub struct RobustSubscription { - subscription: Subscription, + backend: SubscriptionBackend, robust_provider: RobustProvider, last_reconnect_attempt: Option, current_fallback_index: Option, } impl RobustSubscription { - /// Create a new [`RobustSubscription`] + /// Create a new [`RobustSubscription`] with a WebSocket backend. pub(crate) fn new( - subscription: Subscription, + subscription: impl Into>, robust_provider: RobustProvider, ) -> Self { Self { - subscription, + backend: subscription.into(), robust_provider, last_reconnect_attempt: None, current_fallback_index: None, @@ -85,7 +97,7 @@ impl RobustSubscription { /// Receive the next item from the subscription with automatic failover. /// /// This method will: - /// * Attempt to receive from the current subscription + /// * Attempt to receive from the current subscription (WebSocket or HTTP polling) /// * Handle errors by switching to fallback providers /// * Periodically attempt to reconnect to the primary provider /// * Will switch to fallback providers if subscription timeout is exhausted @@ -108,21 +120,62 @@ impl RobustSubscription { let subscription_timeout = self.robust_provider.subscription_timeout; loop { - match timeout(subscription_timeout, self.subscription.recv()).await { - Ok(Ok(header)) => { + // Receive from the appropriate backend + let result = match &mut self.backend { + SubscriptionBackend::WebSocket(sub) => { + match timeout(subscription_timeout, sub.recv()).await { + Ok(Ok(header)) => Ok(header), + Ok(Err(recv_error)) => Err(Error::from(recv_error)), + Err(_elapsed) => Err(Error::Timeout), + } + } + #[cfg(feature = "http-subscription")] + SubscriptionBackend::HttpPolling(sub) => { + let result = timeout(subscription_timeout, sub.recv()).await; + match result { + Ok(Some(hash)) => { + use alloy::network::BlockResponse; + + match timeout( + subscription_timeout, + self.robust_provider.get_block_by_hash(hash), + ) + .await + { + Ok(Ok(block)) => Ok(block.header().clone()), + Ok(Err(e)) => Err(e), + Err(_elapsed) => Err(Error::Timeout), + } + } + Ok(None) => Err(Error::Closed), + Err(_elapsed) => Err(Error::Timeout), + } + } + }; + + match result { + Ok(header) => { if self.is_on_fallback() { self.try_reconnect_to_primary(false).await; } return Ok(header); } - Ok(Err(recv_error)) => return Err(recv_error.into()), - Err(_elapsed) => { + Err(Error::Timeout) => { warn!( timeout_secs = subscription_timeout.as_secs(), "Subscription timeout - no block received, switching provider" ); self.switch_to_fallback(CoreError::Timeout).await?; } + // Propagate these errors directly without failover + Err(Error::Closed) => return Err(Error::Closed), + Err(Error::Lagged(count)) => return Err(Error::Lagged(count)), + Err(Error::BlockNotFound) => return Err(Error::BlockNotFound), + // RPC errors trigger failover + Err(Error::RpcError(_e)) => { + warn!("Subscription RPC error, switching provider"); + self.switch_to_fallback(CoreError::Timeout).await?; + } } } } @@ -143,23 +196,52 @@ impl RobustSubscription { return false; } - let operation = - move |provider: RootProvider| async move { provider.subscribe_blocks().await }; - let primary = self.robust_provider.primary(); + let subscription_buffer_capacity = self.robust_provider.subscription_buffer_capacity; + #[cfg(feature = "http-subscription")] + let poll_interval = self.robust_provider.poll_interval; + #[cfg(feature = "http-subscription")] + let allow_http_subscriptions = self.robust_provider.allow_http_subscriptions; + + let operation = move |provider: RootProvider| async move { + // if HTTP subscriptions are enabled and the provider currently being tried is HTTP, + // we will attempt to connect using it. + // Otherwise try subscribing through a PubSub operation, and if the provider is HTTP + // just let it fail; the error will be non-retriable, so the algorithm will + // automatically switch to the next fallback provider (see + // `try_provider_with_timeout`). + #[cfg(feature = "http-subscription")] + { + let not_pubsub = provider.client().pubsub_frontend().is_none(); + if not_pubsub && allow_http_subscriptions { + return provider.watch_blocks().await.map(|builder| { + builder + .with_poll_interval(poll_interval) + .with_channel_size(subscription_buffer_capacity) + .into() + }); + } + } + provider + .subscribe_blocks() + .channel_size(subscription_buffer_capacity) + .await + .map(Into::>::into) + }; + let subscription = self.robust_provider.try_provider_with_timeout(primary, &operation).await; - if let Ok(sub) = subscription { + if let Ok(backend) = subscription { info!("Reconnected to primary provider"); - self.subscription = sub; + self.backend = backend; self.current_fallback_index = None; self.last_reconnect_attempt = None; - true - } else { - self.last_reconnect_attempt = Some(Instant::now()); - false + return true; } + + self.last_reconnect_attempt = Some(Instant::now()); + false } async fn switch_to_fallback(&mut self, last_error: CoreError) -> Result<(), Error> { @@ -172,21 +254,52 @@ impl RobustSubscription { self.last_reconnect_attempt = Some(Instant::now()); } - let operation = - move |provider: RootProvider| async move { provider.subscribe_blocks().await }; - // Start searching from the next provider after the current one let start_index = self.current_fallback_index.map_or(0, |idx| idx + 1); + let fallback_providers = self.robust_provider.fallback_providers(); + let subscription_buffer_capacity = self.robust_provider.subscription_buffer_capacity; + #[cfg(feature = "http-subscription")] + let poll_interval = self.robust_provider.poll_interval; + #[cfg(feature = "http-subscription")] + let allow_http_subscriptions = self.robust_provider.allow_http_subscriptions; + + // Try each fallback provider + for (idx, provider) in fallback_providers.iter().enumerate().skip(start_index) { + let operation = move |p: RootProvider| async move { + #[cfg(feature = "http-subscription")] + { + let not_pubsub = p.client().pubsub_frontend().is_none(); + if not_pubsub && allow_http_subscriptions { + return p.watch_blocks().await.map(|builder| { + builder + .with_poll_interval(poll_interval) + .with_channel_size(subscription_buffer_capacity) + .into() + }); + } + } + p.subscribe_blocks() + .channel_size(subscription_buffer_capacity) + .await + .map(Into::>::into) + }; - let (sub, fallback_idx) = self - .robust_provider - .try_fallback_providers_from(&operation, true, last_error, start_index) - .await?; + if let Ok(backend) = + self.robust_provider.try_provider_with_timeout(provider, &operation).await + { + info!(fallback_index = idx, "Subscription switched to fallback provider"); + self.backend = backend; + self.current_fallback_index = Some(idx); + return Ok(()); + } + } - info!(fallback_index = fallback_idx, "Subscription switched to fallback provider"); - self.subscription = sub; - self.current_fallback_index = Some(fallback_idx); - Ok(()) + // All fallbacks exhausted + error!( + attempted_providers = fallback_providers.len() + 1, + "All providers exhausted for subscription" + ); + Err(last_error.into()) } /// Returns true if currently using a fallback provider @@ -197,7 +310,11 @@ impl RobustSubscription { /// Check if the subscription channel is empty (no pending messages) #[must_use] pub fn is_empty(&self) -> bool { - self.subscription.is_empty() + match &self.backend { + SubscriptionBackend::WebSocket(sub) => sub.is_empty(), + #[cfg(feature = "http-subscription")] + SubscriptionBackend::HttpPolling(sub) => sub.is_empty(), + } } /// Convert the subscription into a stream. diff --git a/tests/http_subscription.rs b/tests/http_subscription.rs new file mode 100644 index 0000000..4cbdbf7 --- /dev/null +++ b/tests/http_subscription.rs @@ -0,0 +1,924 @@ +//! Integration tests for HTTP subscription functionality. +//! +//! These tests verify that HTTP providers can participate in subscriptions +//! via polling when the `http-subscription` feature is enabled. + +#![cfg(feature = "http-subscription")] + +mod common; + +use std::time::Duration; + +use alloy::{ + network::Ethereum, + node_bindings::Anvil, + providers::{Provider, ProviderBuilder, RootProvider, ext::AnvilApi}, +}; +use common::{BUFFER_TIME, SHORT_TIMEOUT}; +use robust_provider::{Error, RobustProviderBuilder}; +use tokio_stream::StreamExt; + +// ============================================================================ +// Test Helpers +// ============================================================================ + +/// Short poll interval for tests +const TEST_POLL_INTERVAL: Duration = Duration::from_millis(50); + +#[allow(clippy::unused_async)] +async fn spawn_http_anvil() +-> anyhow::Result<(alloy::node_bindings::AnvilInstance, RootProvider)> { + let anvil = Anvil::new().try_spawn()?; + let provider = RootProvider::new_http(anvil.endpoint_url()); + Ok((anvil, provider)) +} + +async fn spawn_ws_anvil() +-> anyhow::Result<(alloy::node_bindings::AnvilInstance, RootProvider)> { + let anvil = Anvil::new().try_spawn()?; + let provider = ProviderBuilder::new().connect(anvil.ws_endpoint_url().as_str()).await?; + Ok((anvil, provider.root().clone())) +} + +// ============================================================================ +// Basic HTTP Subscription Tests +// ============================================================================ + +/// Test: HTTP polling subscription receives blocks correctly +#[tokio::test] +async fn test_http_subscription_basic_flow() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine a block + provider.anvil_mine(Some(1), None).await?; + + // Should receive block 1 + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout waiting for block 1") + .expect("recv error"); + assert_eq!(block.number, 1, "Should receive block 1"); + + // Mine another block + provider.anvil_mine(Some(1), None).await?; + + // Should receive block 2 + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout waiting for block 2") + .expect("recv error"); + assert_eq!(block.number, 2, "Should receive block 2"); + + Ok(()) +} + +// ============================================================================ +// Regression Tests +// ============================================================================ + +/// Test: Enabling `allow_http_subscriptions(true)` does not break WS-only chains. +/// +/// This is a regression guard ensuring pubsub-capable providers still use WS subscriptions +/// even when HTTP subscriptions are enabled. +#[tokio::test] +async fn test_ws_only_chain_works_with_http_subscriptions_enabled() -> anyhow::Result<()> { + let (anvil_primary, primary) = spawn_ws_anvil().await?; + let (_anvil_fallback, fallback) = spawn_ws_anvil().await?; + + let robust = RobustProviderBuilder::fragile(primary.clone()) + .fallback(fallback.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Receive initial block from WS primary. + primary.anvil_mine(Some(1), None).await?; + // mine different number of blocks on fallback node + fallback.anvil_mine(Some(5), None).await?; + + // should get block from primary + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + assert!(subscription.is_empty()); + + // Kill WS primary and ensure we can still fail over to WS fallback. + drop(anvil_primary); + + tokio::spawn(async move { + // sleep just enough before mining to ensure subscription switches to this fallback provider + tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; + fallback.anvil_mine(Some(1), None).await.unwrap(); + }); + + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 6); + assert!(subscription.is_empty()); + + Ok(()) +} + +/// Test: With mixed fallbacks, HTTP is used when allowed, and WS is used if HTTP dies. +/// +/// Chain: +/// - Primary: WS (pubsub) +/// - Fallback #1: HTTP (polling) +/// - Fallback #2: WS (pubsub) +#[tokio::test] +async fn test_mixed_fallback_ordering_ws_to_http_to_ws() -> anyhow::Result<()> { + let (anvil_ws_primary, ws_primary) = spawn_ws_anvil().await?; + let (anvil_http, http_fallback) = spawn_http_anvil().await?; + let (_anvil_ws2, ws_fallback) = spawn_ws_anvil().await?; + + let robust = RobustProviderBuilder::fragile(ws_primary.clone()) + .fallback(http_fallback.clone()) + .fallback(ws_fallback.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .max_retries(0) + .min_delay(Duration::from_millis(0)) + // Same reasoning as `test_failover_ws_to_http_on_provider_death`. + .call_timeout(Duration::from_millis(200)) + .subscription_timeout(Duration::from_secs(2)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Confirm we start on WS primary. + ws_primary.anvil_mine(Some(1), None).await?; + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + // Kill WS primary to force failover to HTTP fallback. + drop(anvil_ws_primary); + let http_clone = http_fallback.clone(); + let http_mining_task = tokio::spawn(async move { + tokio::time::sleep(BUFFER_TIME).await; + + // Mine long enough to cover the failover window. + for _ in 0..120 { + if http_clone.anvil_mine(Some(1), None).await.is_err() { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }); + + // Must receive a block after WS primary died; this should come from HTTP fallback. + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert!(block.number >= 1); + + // Stop mining on HTTP so we don't enqueue extra hashes while switching away. + http_mining_task.abort(); + + // Drain any already-enqueued HTTP hashes to avoid `BlockNotFound` after the HTTP provider is + // dropped and robust-provider routes `get_block_by_hash` to a different backend. + for _ in 0..50 { + if subscription.is_empty() { + break; + } + + // Use an outer timeout so we don't block here if `is_empty()` is stale. + let _ = tokio::time::timeout(Duration::from_millis(200), subscription.recv()).await; + } + + // Now kill HTTP fallback too, and ensure we can fail over to WS fallback. + drop(anvil_http); + let ws2_clone = ws_fallback.clone(); + tokio::spawn(async move { + // Wait long enough for: + // - the HTTP polling recv() to time out + // - fallback switching logic to establish a WS subscription + tokio::time::sleep(Duration::from_millis(2500)).await; + + // Mine repeatedly to avoid racing with WS subscription establishment. + for _ in 0..20 { + if ws2_clone.anvil_mine(Some(1), None).await.is_err() { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }); + + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert!(block.number >= 1); + + Ok(()) +} + +/// Test: HTTP subscription correctly receives multiple consecutive blocks +#[tokio::test] +async fn test_http_subscription_multiple_blocks() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine and receive 5 blocks sequentially + for expected_block in 1..=5 { + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, expected_block, "Block number mismatch"); + } + + Ok(()) +} + +/// Test: HTTP subscription works correctly when converted to a Stream +#[tokio::test] +async fn test_http_subscription_as_stream() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let subscription = robust.subscribe_blocks().await?; + let mut stream = subscription.into_stream(); + + // Mine and receive via stream + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), stream.next()) + .await + .expect("timeout") + .expect("stream ended unexpectedly") + .expect("recv error"); + assert_eq!(block.number, 1); + + // Mine another and receive via stream + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), stream.next()) + .await + .expect("timeout") + .expect("stream ended unexpectedly") + .expect("recv error"); + assert_eq!(block.number, 2); + + Ok(()) +} + +// ============================================================================ +// Failover Tests +// ============================================================================ + +/// Test: When WS primary dies, subscription fails over to HTTP fallback +/// +/// Verification: We confirm failover by checking that after WS death, +/// we still receive blocks (which must come from HTTP since WS is dead) +#[test_log::test(tokio::test)] +async fn test_failover_ws_to_http_on_provider_death() -> anyhow::Result<()> { + let (anvil_ws, ws_provider) = spawn_ws_anvil().await?; + let (_anvil_http, http_provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(ws_provider.clone()) + .fallback(http_provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + // Ensure robust-provider block fetching can fail over within the recv timeout. + .call_timeout(SHORT_TIMEOUT / 2) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Receive initial block from WS + ws_provider.anvil_mine(Some(1), None).await?; + + // mine different number of blocks on fallback + http_provider.anvil_mine(Some(5), None).await?; + + // only primary blocks are received + let block = subscription.recv().await?; + assert_eq!(block.number, 1, "Should receive from WS primary"); + assert!(subscription.is_empty()); + + // Kill WS provider - this will cause subscription to fail + drop(anvil_ws); + + // Spawn task to mine repeatedly on HTTP after timeout triggers failover. + // Mining just once can be flaky if it happens before the HTTP poller is fully established. + let http_clone = http_provider.clone(); + let http_mining_task = tokio::spawn(async move { + // Start mining soon and keep mining long enough to cover the failover window. + // Failover only happens after `subscription_timeout` elapses on the WS backend. + tokio::time::sleep(BUFFER_TIME).await; + + for _ in 0..120 { + if http_clone.anvil_mine(Some(1), None).await.is_err() { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }); + + // Should eventually receive a block - since WS is dead, this MUST be from HTTP + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout - failover may have failed") + .expect("recv error"); + + // We received a block after WS died, proving failover worked + // The block number may be > 5 because the test mines multiple blocks to avoid races. + assert!(block.number >= 5, "Should receive a block from HTTP fallback"); + + http_mining_task.abort(); + + Ok(()) +} + +/// Test: When HTTP primary becomes unavailable, subscription fails over to WS fallback +#[tokio::test] +async fn test_failover_http_to_ws_on_provider_death() -> anyhow::Result<()> { + let (anvil_http, http_provider) = spawn_http_anvil().await?; + let (_anvil_ws, ws_provider) = spawn_ws_anvil().await?; + + let robust = RobustProviderBuilder::fragile(http_provider.clone()) + .fallback(ws_provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine and receive from HTTP + http_provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 1, "Should start on HTTP primary"); + + // Kill HTTP provider + drop(anvil_http); + + // Mine on WS shortly after HTTP error is detected. + // The HTTP poll will fail quickly (connection refused), triggering immediate failover to WS. + // We mine after a small delay to ensure WS subscription is established. + let ws_clone = ws_provider.clone(); + tokio::spawn(async move { + tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; + ws_clone.anvil_mine(Some(1), None).await.unwrap(); + }); + + // Should receive from WS fallback (WS also starts at genesis, so block 1 after mining) + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout - failover may have failed") + .expect("recv error"); + + assert_eq!(block.number, 1, "Should receive block from WS fallback"); + + Ok(()) +} + +// ============================================================================ +// Configuration Tests +// ============================================================================ + +/// Test: All-HTTP provider chain works (no WS providers at all) +#[tokio::test] +async fn test_http_only_provider_chain() -> anyhow::Result<()> { + let (_anvil1, http1) = spawn_http_anvil().await?; + let (_anvil2, http2) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(http1.clone()) + .fallback(http2.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine and receive + http1.anvil_mine(Some(1), None).await?; + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + http1.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 2); + + Ok(()) +} + +/// Test: When `allow_http_subscriptions` is false (default), HTTP providers are skipped +/// and subscription uses WS fallback +#[tokio::test] +async fn test_http_subscriptions_disabled_skips_http() -> anyhow::Result<()> { + let (_anvil_http, http_provider) = spawn_http_anvil().await?; + let (_anvil_ws, ws_provider) = spawn_ws_anvil().await?; + + // HTTP primary but http subscriptions NOT enabled (default) + let robust = RobustProviderBuilder::new(http_provider.clone()) + .fallback(ws_provider.clone()) + // allow_http_subscriptions defaults to false + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + // subscribe_blocks should skip HTTP and use WS + let mut subscription = robust.subscribe_blocks().await?; + + // Mine on both - if HTTP was used, we'd get block 0 first + // Since HTTP is skipped, we should only see WS blocks + ws_provider.anvil_mine(Some(1), None).await?; + http_provider.anvil_mine(Some(5), None).await?; // Mine more on HTTP + + let block = subscription.recv().await?; + // WS block 1, not HTTP block 0 or 5 + assert_eq!(block.number, 1, "Should use WS fallback, not HTTP primary"); + + Ok(()) +} + +/// Test: When `allow_http_subscriptions` is false and no WS providers exist, +/// `subscribe_blocks` should fail +#[tokio::test] +async fn test_http_disabled_no_ws_fails() -> anyhow::Result<()> { + let (_anvil, http_provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(http_provider.clone()) + // No fallbacks, HTTP subscriptions disabled + .subscription_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Should fail because no pubsub-capable provider exists + let result = robust.subscribe_blocks().await; + assert!(result.is_err(), "Should fail when no WS providers and HTTP disabled"); + + Ok(()) +} + +/// Test: `poll_interval` configuration is respected +#[tokio::test] +async fn test_poll_interval_is_respected() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let poll_interval = Duration::from_millis(200); + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(poll_interval) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine first block and receive it + provider.anvil_mine(Some(1), None).await?; + let _ = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + + // Mine another block + provider.anvil_mine(Some(1), None).await?; + + // Measure how long it takes to receive the next block + let start = std::time::Instant::now(); + let _ = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + let elapsed = start.elapsed(); + + // Should take at least half the poll interval + // (being lenient because block might arrive mid-interval) + let min_expected = poll_interval / 2; + assert!( + elapsed >= min_expected, + "Poll interval not respected. Expected >= {min_expected:?}, got {elapsed:?}", + ); + + Ok(()) +} + +// ============================================================================ +// Error Handling Tests +// ============================================================================ + +/// Test: HTTP subscription handles provider errors gracefully +#[tokio::test] +async fn test_http_subscription_survives_temporary_errors() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine blocks - subscription should work + for i in 1..=3 { + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, i); + } + + Ok(()) +} + +/// Test: When all providers fail, subscription returns an error +#[tokio::test] +async fn test_all_providers_fail_returns_error() -> anyhow::Result<()> { + let (anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine and receive a block first + provider.anvil_mine(Some(1), None).await?; + let _ = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + + // Kill the only provider + drop(anvil); + + // Next recv should eventually error (after timeout) + let result = tokio::time::timeout(Duration::from_secs(5), subscription.recv()).await; + + match result { + Ok(Ok(_)) => panic!("Should not receive block from dead provider"), + Ok(Err(e)) => { + // Expected - got an error + assert!( + matches!(e, Error::Timeout | Error::RpcError(_)), + "Expected Timeout or RpcError, got {e:?}", + ); + } + Err(_) => { + // Timeout is also acceptable + } + } + + Ok(()) +} + +// ============================================================================ +// Deduplication Tests +// ============================================================================ + +/// Test: HTTP polling correctly deduplicates blocks (same block not emitted twice) +#[tokio::test] +async fn test_http_polling_deduplication() -> anyhow::Result<()> { + let (_anvil, provider) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::new(provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(Duration::from_millis(20)) // Very fast polling + .subscription_timeout(Duration::from_secs(5)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine first block + provider.anvil_mine(Some(1), None).await?; + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 1); + + // Wait for multiple poll cycles without mining + tokio::time::sleep(Duration::from_millis(100)).await; + + // Now mine ONE more block + provider.anvil_mine(Some(1), None).await?; + + // Should receive exactly block 2 (not duplicate of block 1) + let block = tokio::time::timeout(Duration::from_secs(1), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 2, "Should receive block 2, not duplicate of 1"); + + Ok(()) +} + +// ============================================================================ +// Configuration Propagation Tests +// ============================================================================ + +/// Test: `poll_interval` from builder is used when subscription fails over to HTTP +/// +/// This verifies fix for bug where `http_config` used defaults instead of +/// user-configured values when a WebSocket subscription was created first. +#[tokio::test] +async fn test_poll_interval_propagated_from_builder() -> anyhow::Result<()> { + let (anvil_ws, ws_provider) = spawn_ws_anvil().await?; + let (_anvil_http, http_provider) = spawn_http_anvil().await?; + + // Use a distinctive poll interval that's different from the default (12s) + let custom_poll_interval = Duration::from_millis(500); + + let robust = RobustProviderBuilder::fragile(ws_provider.clone()) + .fallback(http_provider.clone()) + .allow_http_subscriptions(true) + .poll_interval(custom_poll_interval) + // Ensure robust-provider block fetching can fail over within the recv timeout. + // Keep this very small so per-block fetching doesn't dominate the poll-interval timing. + .call_timeout(Duration::from_millis(50)) + .subscription_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Start subscription on WebSocket + let mut subscription = robust.subscribe_blocks().await?; + + ws_provider.anvil_mine(Some(1), None).await?; + + http_provider.anvil_mine(Some(5), None).await?; + + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + assert!(subscription.is_empty()); + + // Kill WS to force failover to HTTP + drop(anvil_ws); + + // Mine on HTTP and wait for failover + let http_clone = http_provider.clone(); + let http_mining_task = tokio::spawn(async move { + tokio::time::sleep(BUFFER_TIME).await; + + // Mine long enough to cover the failover window. + for _ in 0..120 { + if http_clone.anvil_mine(Some(1), None).await.is_err() { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }); + + // Should receive block from HTTP fallback + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout waiting for HTTP fallback block") + .expect("recv error"); + + // Verify we got a block (proving failover worked with correct config) + assert!(block.number >= 5); + + http_mining_task.abort(); + + // Now verify the poll interval is being used by timing block reception + // Mine another block and measure how long until we receive it + http_provider.anvil_mine(Some(1), None).await?; + + let start = std::time::Instant::now(); + let _ = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + let elapsed = start.elapsed(); + + // Should take roughly poll_interval to detect the new block + // Allow some margin but it should be much less than the default 12s + assert!( + elapsed < custom_poll_interval + BUFFER_TIME, // multiply to add margin + "Poll interval not respected. Elapsed {elapsed:?}, expected ~{custom_poll_interval:?}", + ); + + Ok(()) +} + +// ============================================================================ +// HTTP Reconnection Validation Tests +// ============================================================================ + +/// Test: HTTP reconnection validates provider is reachable before claiming success +/// +/// This verifies fix for bug where HTTP reconnection didn't validate the provider, +/// potentially "reconnecting" to a dead provider. +#[tokio::test] +async fn test_http_reconnect_validates_provider() -> anyhow::Result<()> { + // Start with HTTP primary (will be killed) and HTTP fallback + let (anvil_primary, primary) = spawn_http_anvil().await?; + let (_anvil_fallback, fallback) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(primary.clone()) + .fallback(fallback.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .reconnect_interval(Duration::from_millis(100)) // Fast reconnect for test + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine a block on primary after subscription + primary.anvil_mine(Some(1), None).await?; + + // Get initial block from primary + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + // Kill primary - subscription should failover to fallback + drop(anvil_primary); + + // Trigger failover by waiting for timeout, then mine on fallback + let fb_clone = fallback.clone(); + tokio::spawn(async move { + tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; + fb_clone.anvil_mine(Some(1), None).await.unwrap(); + }); + + // Should receive from fallback (block 1 on fallback) + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + let fallback_block = block.number; + assert_eq!(fallback_block, 1, "Should receive block 1 from fallback"); + + // Wait for reconnect interval to elapse + tokio::time::sleep(Duration::from_millis(150)).await; + + // Mine another block on fallback - this triggers reconnect attempt + // Since primary is dead, reconnect should FAIL validation and stay on fallback + fallback.anvil_mine(Some(1), None).await?; + + let block = tokio::time::timeout(Duration::from_secs(2), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + + // Should still be on fallback (next block), NOT have "reconnected" to dead primary + assert!( + block.number > fallback_block, + "Should still be on fallback after failed reconnect, got block {}", + block.number + ); + + Ok(()) +} + +/// Test: Timeout-triggered failover cycles through multiple fallbacks correctly +/// +/// When a fallback times out (no blocks received), the subscription should: +/// 1. Try to reconnect to primary (fails if dead) +/// 2. Move to the next fallback +/// 3. Eventually receive blocks from a working fallback +#[tokio::test] +async fn test_timeout_triggered_failover_with_multiple_fallbacks() -> anyhow::Result<()> { + let (anvil_primary, primary) = spawn_http_anvil().await?; + let (anvil_fb1, fallback1) = spawn_http_anvil().await?; + let (_anvil_fb2, fallback2) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(primary.clone()) + .fallback(fallback1.clone()) + .fallback(fallback2.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Mine a block on primary after subscription + primary.anvil_mine(Some(1), None).await?; + + // Get initial block from primary + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + // Kill primary AND fallback1 - only fallback2 will work + drop(anvil_primary); + drop(anvil_fb1); + + // Don't mine on fallback2 immediately - let timeouts trigger failover + // After SHORT_TIMEOUT, primary poll fails -> try fallback1 + // After SHORT_TIMEOUT, fallback1 poll fails -> try fallback2 + // Then mine on fallback2 + let fb2_clone = fallback2.clone(); + tokio::spawn(async move { + // Wait for a timeout cycle plus buffer + tokio::time::sleep(SHORT_TIMEOUT + Duration::from_millis(50)).await; + fb2_clone.anvil_mine(Some(1), None).await.unwrap(); + }); + + // Should eventually receive from fallback2 + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout - failover chain may have failed") + .expect("recv error"); + + // Block should be from fallback2 (block number >= 1) + assert!(block.number >= 1, "Should receive block from fallback2, got {}", block.number); + + Ok(()) +} + +/// Test: Single fallback timeout behavior +/// +/// When there's only one fallback and it times out, after exhausting reconnect +/// attempts, the subscription should return an error (no more providers to try). +#[tokio::test] +async fn test_single_fallback_timeout_exhausts_providers() -> anyhow::Result<()> { + let (anvil_primary, primary) = spawn_http_anvil().await?; + let (anvil_fb, fallback) = spawn_http_anvil().await?; + + let robust = RobustProviderBuilder::fragile(primary.clone()) + .fallback(fallback.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + primary.anvil_mine(Some(1), None).await?; + + // Get initial block from primary + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + // Kill both providers + drop(anvil_primary); + drop(anvil_fb); + + // Don't mine anything - let it timeout and exhaust providers + let result = tokio::time::timeout(Duration::from_secs(3), subscription.recv()).await; + + #[allow(clippy::match_same_arms)] + match result { + Ok(Err(Error::Timeout)) => { + // Expected: all providers exhausted, returns timeout error + } + Ok(Err(Error::RpcError(_))) => { + // Also acceptable: RPC error from dead providers + } + Ok(Ok(block)) => { + panic!("Should not receive block, got block {}", block.number); + } + Err(_) => { + // Outer timeout - also acceptable, means it's still trying + } + Ok(Err(e)) => { + panic!("Unexpected error type: {e:?}"); + } + } + + Ok(()) +} diff --git a/tests/rpc_failover.rs b/tests/rpc_failover.rs new file mode 100644 index 0000000..48bb988 --- /dev/null +++ b/tests/rpc_failover.rs @@ -0,0 +1,285 @@ +//! Tests for RPC call retry and failover functionality. + +mod common; + +use std::time::{Duration, Instant}; + +use alloy::{ + eips::BlockNumberOrTag, + node_bindings::Anvil, + providers::{Provider, ProviderBuilder, ext::AnvilApi}, + transports::{RpcError, TransportErrorKind}, +}; +use robust_provider::{Error, RobustProviderBuilder}; + +// ============================================================================ +// RPC Failover Tests +// ============================================================================ + +#[tokio::test] +async fn test_rpc_failover_when_primary_dead() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + // Mine different number of blocks on each to distinguish them + primary.anvil_mine(Some(10), None).await?; + fallback.anvil_mine(Some(20), None).await?; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Verify primary is used initially + let block_num = robust.get_block_number().await?; + assert_eq!(block_num, 10); + + // Kill primary + drop(anvil_primary); + + // Should failover to fallback + let block_num = robust.get_block_number().await?; + assert_eq!(block_num, 20); + + Ok(()) +} + +#[tokio::test] +async fn test_rpc_cycles_through_multiple_fallbacks() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fb1 = Anvil::new().try_spawn()?; + let anvil_fb2 = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fb1 = ProviderBuilder::new().connect_http(anvil_fb1.endpoint_url()); + let fb2 = ProviderBuilder::new().connect_http(anvil_fb2.endpoint_url()); + + // Mine different blocks to identify each provider + primary.anvil_mine(Some(10), None).await?; + fb1.anvil_mine(Some(20), None).await?; + fb2.anvil_mine(Some(30), None).await?; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fb1) + .fallback(fb2) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Kill primary and first fallback + drop(anvil_primary); + drop(anvil_fb1); + + // Should cycle through to fb2 + let block_num = robust.get_block_number().await?; + assert_eq!(block_num, 30); + + Ok(()) +} + +#[tokio::test] +async fn test_rpc_all_providers_fail() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(1)) + .build() + .await?; + + // Kill all providers + drop(anvil_primary); + drop(anvil_fallback); + + // Should fail after trying all providers + let result = robust.get_block_number().await; + assert!(result.is_err()); + + Ok(()) +} + +// ============================================================================ +// Non-Retryable Error Tests +// ============================================================================ + +#[tokio::test] +async fn test_block_not_found_does_not_retry() -> anyhow::Result<()> { + let anvil = Anvil::new().try_spawn()?; + let provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); + + let robust = RobustProviderBuilder::new(provider) + .call_timeout(Duration::from_secs(5)) + .max_retries(3) + .min_delay(Duration::from_millis(100)) + .build() + .await?; + + let start = Instant::now(); + + // Request future block - should be BlockNotFound, not retried + let result = robust.get_block(alloy::eips::BlockId::number(999_999)).await; + + let elapsed = start.elapsed(); + + assert!(matches!(result, Err(Error::BlockNotFound))); + // With retries, this would take 300ms+ due to backoff + assert!(elapsed < Duration::from_millis(200)); + + Ok(()) +} + +// ============================================================================ +// Timeout Tests +// ============================================================================ + +#[tokio::test] +async fn test_operation_completes_when_provider_unavailable() -> anyhow::Result<()> { + // Create and immediately kill provider so endpoint doesn't exist + let anvil = Anvil::new().try_spawn()?; + let endpoint = anvil.endpoint_url(); + drop(anvil); + + let provider = ProviderBuilder::new().connect_http(endpoint); + + let timeout = Duration::from_secs(5); + + let robust = RobustProviderBuilder::fragile(provider).call_timeout(timeout).build().await?; + + let start = Instant::now(); + let result = robust.get_block_number().await; + let elapsed = start.elapsed(); + + // Should fail (connection refused) and not hang + let err = result.expect_err("expected RPC error due to unavailable provider"); + match err { + Error::RpcError(e) => { + let e = e.as_ref(); + match e { + RpcError::Transport(TransportErrorKind::Custom(_)) => {} + other => panic!( + "expected RpcError::Transport(TransportErrorKind::Custom), got {other:?}" + ), + } + } + other => panic!("expected Error::RpcError, got {other:?}"), + } + assert!(elapsed < timeout); + + Ok(()) +} + +// ============================================================================ +// Failover with Different Operations +// ============================================================================ + +#[tokio::test] +async fn test_get_accounts_failover() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Kill primary + drop(anvil_primary); + + let accounts = robust.get_accounts().await?; + assert!(!accounts.is_empty()); + + Ok(()) +} + +#[tokio::test] +async fn test_get_balance_failover() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + let accounts = fallback.get_accounts().await?; + let address = accounts[0]; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Kill primary + drop(anvil_primary); + + let balance = robust.get_balance(address).await?; + assert!(balance > alloy::primitives::U256::ZERO); + + Ok(()) +} + +#[tokio::test] +async fn test_get_block_failover() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + fallback.anvil_mine(Some(5), None).await?; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Kill primary + drop(anvil_primary); + + let block = robust.get_block_by_number(BlockNumberOrTag::Number(3)).await?; + assert_eq!(block.header.number, 3); + + Ok(()) +} + +// ============================================================================ +// Primary Provider Preference +// ============================================================================ + +#[tokio::test] +async fn test_primary_provider_tried_first() -> anyhow::Result<()> { + let anvil_primary = Anvil::new().try_spawn()?; + let anvil_fallback = Anvil::new().try_spawn()?; + + let primary = ProviderBuilder::new().connect_http(anvil_primary.endpoint_url()); + let fallback = ProviderBuilder::new().connect_http(anvil_fallback.endpoint_url()); + + primary.anvil_mine(Some(100), None).await?; + fallback.anvil_mine(Some(200), None).await?; + + let robust = RobustProviderBuilder::fragile(primary) + .fallback(fallback) + .call_timeout(Duration::from_secs(2)) + .build() + .await?; + + // Multiple calls should all use primary (it's healthy) + for _ in 0..5 { + let block_num = robust.get_block_number().await?; + assert_eq!(block_num, 100); + } + + Ok(()) +} diff --git a/tests/subscription.rs b/tests/subscription.rs index 64969e7..1f7edf2 100644 --- a/tests/subscription.rs +++ b/tests/subscription.rs @@ -14,8 +14,7 @@ use alloy::{ }; use common::{BUFFER_TIME, RECONNECT_INTERVAL, SHORT_TIMEOUT, spawn_ws_anvil}; use robust_provider::{ - DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, RobustProviderBuilder, RobustSubscriptionStream, - SubscriptionError, + DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, Error, RobustProviderBuilder, RobustSubscriptionStream, }; use tokio::time::sleep; use tokio_stream::StreamExt; @@ -209,11 +208,11 @@ async fn test_stream_continues_streaming_errors() -> anyhow::Result<()> { assert_next_block!(stream, 1); // Trigger timeout error - the stream will continue to stream errors - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); // Without fallbacks, subsequent calls will continue to return errors // (not None, since only Error::Closed terminates the stream) - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -276,7 +275,7 @@ async fn subscription_fails_with_no_fallbacks() -> anyhow::Result<()> { assert_next_block!(stream, 1); // No fallback available - should error after timeout - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -306,8 +305,63 @@ async fn ws_fails_http_fallback_returns_primary_error() -> anyhow::Result<()> { ws_provider.anvil_mine(Some(1), None).await?; assert_next_block!(stream, 2); + // Mine on HTTP fallback to ensure that even if it has blocks, it still cannot serve + // a subscription when the `http-subscription` feature is disabled. + let http_clone = http_provider.clone(); + tokio::spawn(async move { + sleep(Duration::from_secs(1) + BUFFER_TIME).await; + http_clone.anvil_mine(Some(5), None).await.unwrap(); + }); + // Verify: HTTP fallback can't provide subscription, so we get an error - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); + + Ok(()) +} + +#[tokio::test] +async fn test_mixed_chain_skips_http_until_ws_is_found() -> anyhow::Result<()> { + // Chain: + // - Primary: HTTP (no pubsub) + // - Fallback #1: HTTP (no pubsub) + // - Fallback #2: WS (pubsub) + // With `http-subscription` feature disabled, subscribe_blocks should eventually succeed + // by selecting the WS fallback. + + let anvil_http_primary = Anvil::new().try_spawn()?; + let http_primary = ProviderBuilder::new().connect_http(anvil_http_primary.endpoint_url()); + + let anvil_http_fb = Anvil::new().try_spawn()?; + let http_fallback = ProviderBuilder::new().connect_http(anvil_http_fb.endpoint_url()); + + let (_anvil_ws, ws_fallback) = spawn_ws_anvil().await?; + + let robust = RobustProviderBuilder::fragile(http_primary) + .fallback(http_fallback.clone()) + .fallback(ws_fallback.clone()) + .call_timeout(Duration::from_secs(2)) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + // This should succeed by skipping non-pubsub HTTP providers and using the WS fallback. + let mut subscription = robust.subscribe_blocks().await?; + + // Mine blocks on both HTTP providers; if an HTTP provider were incorrectly used for + // subscription, we'd observe those blocks. + http_fallback.anvil_mine(Some(10), None).await?; + let http_primary_for_mining = + ProviderBuilder::new().connect_http(anvil_http_primary.endpoint_url()); + http_primary_for_mining.anvil_mine(Some(20), None).await?; + + // Mine exactly one block on WS fallback and ensure we receive WS block #1. + ws_fallback.anvil_mine(Some(1), None).await?; + let header = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timed out") + .expect("recv error"); + assert_eq!(header.number, 1); + assert!(subscription.is_empty()); Ok(()) } @@ -342,7 +396,7 @@ async fn test_single_fallback_provider() -> anyhow::Result<()> { trigger_failover(&mut stream, fallback.clone(), 1).await?; // FB -> try PP (fails) -> no more fallbacks -> error - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -381,7 +435,7 @@ async fn subscription_cycles_through_multiple_fallbacks() -> anyhow::Result<()> assert_next_block!(stream, 2); // FP2 times out -> tries PP (fails) -> no more fallbacks -> error - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -423,7 +477,7 @@ async fn test_many_fallback_providers() -> anyhow::Result<()> { trigger_failover_with_delay(&mut stream, fb_4.clone(), 1, SHORT_TIMEOUT).await?; trigger_failover_with_delay(&mut stream, fb_5.clone(), 1, SHORT_TIMEOUT).await?; - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -688,7 +742,7 @@ async fn test_backend_gone_error_propagation() -> anyhow::Result<()> { drop(anvil); // Should get BackendGone or Timeout error - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -713,7 +767,7 @@ async fn test_immediate_consecutive_failures() -> anyhow::Result<()> { drop(anvil); // First failure - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -738,7 +792,7 @@ async fn test_subscription_lagged_error() -> anyhow::Result<()> { // First recv should return Lagged error (skipped some blocks) let result = subscription.recv().await; - assert!(matches!(result, Err(SubscriptionError::Lagged(_)))); + assert!(matches!(result, Err(Error::Lagged(_)))); Ok(()) }