From 47ab9c5fd691a5345238673ecb62be8e3e7fb366 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Thu, 26 Feb 2026 15:56:32 +0100 Subject: [PATCH 1/4] refactor --- src/lib.rs | 7 +- src/macros/rpc.rs | 2 - src/robust_provider/builder.rs | 9 +- src/robust_provider/errors.rs | 43 +-- src/robust_provider/http_subscription.rs | 351 ----------------------- src/robust_provider/mod.rs | 16 +- src/robust_provider/provider.rs | 193 ++++--------- src/robust_provider/subscription.rs | 283 ++++++++---------- tests/http_subscription.rs | 8 +- tests/rpc_failover.rs | 24 +- tests/subscription.rs | 23 +- 11 files changed, 239 insertions(+), 720 deletions(-) delete mode 100644 src/robust_provider/http_subscription.rs diff --git a/src/lib.rs b/src/lib.rs index 8dd5611..e1c7349 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -68,11 +68,8 @@ pub use robust_provider::{ CoreError, DEFAULT_CALL_TIMEOUT, DEFAULT_MAX_RETRIES, DEFAULT_MIN_DELAY, DEFAULT_RECONNECT_INTERVAL, DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, DEFAULT_SUBSCRIPTION_TIMEOUT, Error, IntoRobustProvider, IntoRootProvider, RobustProvider, RobustProviderBuilder, - RobustSubscription, RobustSubscriptionStream, SubscriptionError, + RobustSubscription, RobustSubscriptionStream, }; #[cfg(feature = "http-subscription")] -pub use robust_provider::{ - DEFAULT_BUFFER_CAPACITY, DEFAULT_POLL_INTERVAL, HttpPollingSubscription, - HttpSubscriptionConfig, HttpSubscriptionError, -}; +pub use robust_provider::DEFAULT_POLL_INTERVAL; diff --git a/src/macros/rpc.rs b/src/macros/rpc.rs index bf9448c..9123ccd 100644 --- a/src/macros/rpc.rs +++ b/src/macros/rpc.rs @@ -129,7 +129,6 @@ macro_rules! robust_rpc { // Call the provider method with turbofish syntax if generics are present provider.$method $(::<$($generic),+>)? ($($($arg),+)?).await }, - false, // is_subscription = false ) .await; @@ -203,7 +202,6 @@ macro_rules! robust_rpc { provider.$method $(::<$($generic),+>)? ($($arg),+).await } }, - false, // is_subscription = false ) .await; diff --git a/src/robust_provider/builder.rs b/src/robust_provider/builder.rs index 4944ece..1958562 100644 --- a/src/robust_provider/builder.rs +++ b/src/robust_provider/builder.rs @@ -6,9 +6,6 @@ use crate::robust_provider::{ Error, IntoRootProvider, RobustProvider, subscription::DEFAULT_RECONNECT_INTERVAL, }; -#[cfg(feature = "http-subscription")] -use crate::robust_provider::http_subscription::DEFAULT_POLL_INTERVAL; - type BoxedProviderFuture = Pin, Error>> + Send>>; // RPC retry and timeout settings @@ -22,6 +19,12 @@ pub const DEFAULT_MAX_RETRIES: usize = 3; pub const DEFAULT_MIN_DELAY: Duration = Duration::from_secs(1); /// Default subscription channel size. pub const DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY: usize = 128; +/// Default polling interval for HTTP subscriptions. +/// +/// Set to 12 seconds to match approximate Ethereum mainnet block time. +/// Adjust based on the target chain's block time. +#[cfg(feature = "http-subscription")] +pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(12); /// Builder for constructing a [`RobustProvider`]. /// diff --git a/src/robust_provider/errors.rs b/src/robust_provider/errors.rs index 57dff92..7af857b 100644 --- a/src/robust_provider/errors.rs +++ b/src/robust_provider/errors.rs @@ -17,9 +17,7 @@ use std::sync::Arc; use alloy::transports::{RpcError, TransportErrorKind}; use thiserror::Error; -use tokio::time::error as TokioError; - -use super::subscription; +use tokio::{sync::broadcast::error::RecvError, time::error as TokioError}; /// Errors that can occur when using [`super::RobustProvider`]. #[derive(Error, Debug, Clone)] @@ -43,6 +41,13 @@ pub enum Error { /// [`Error::RpcError`]. #[error("Block not found")] BlockNotFound, + + /// The subscription channel was closed. + #[error("Subscription channel closed")] + Closed, + + #[error("Subscription lagged behind by: {0}")] + Lagged(u64), } /// Low-level error related to RPC calls and failover logic. @@ -98,13 +103,11 @@ impl From for Error { } } -impl From for Error { - fn from(err: subscription::Error) -> Self { +impl From for Error { + fn from(err: RecvError) -> Self { match err { - subscription::Error::RpcError(e) => Error::RpcError(e), - subscription::Error::Timeout - | subscription::Error::Closed - | subscription::Error::Lagged(_) => Error::Timeout, + RecvError::Closed => Error::Closed, + RecvError::Lagged(count) => Error::Lagged(count), } } } @@ -120,9 +123,9 @@ pub(crate) fn is_retryable_error(code: i64, message: &str) -> bool { } pub(crate) fn is_block_not_found(code: i64, message: &str) -> bool { - geth::is_block_not_found(code, message) - || besu::is_block_not_found(code, message) - || anvil::is_block_not_found(code, message) + geth::is_block_not_found(code, message) || + besu::is_block_not_found(code, message) || + anvil::is_block_not_found(code, message) } pub(crate) fn is_invalid_log_filter(code: i64, message: &str) -> bool { @@ -173,14 +176,14 @@ mod geth { ( DEFAULT_ERROR_CODE, // https://github.com/ethereum/go-ethereum/blob/ef815c59a207d50668afb343811ed7ff02cc640b/eth/filters/api.go#L39-L46 - "invalid block range params" - | "block range extends beyond current head block" - | "can't specify fromBlock/toBlock with blockHash" - | "pending logs are not supported" - | "unknown block" - | "exceed max topics" - | "exceed max addresses or topics per search position" - | "filter not found" + "invalid block range params" | + "block range extends beyond current head block" | + "can't specify fromBlock/toBlock with blockHash" | + "pending logs are not supported" | + "unknown block" | + "exceed max topics" | + "exceed max addresses or topics per search position" | + "filter not found" ) ) } diff --git a/src/robust_provider/http_subscription.rs b/src/robust_provider/http_subscription.rs deleted file mode 100644 index b6553a8..0000000 --- a/src/robust_provider/http_subscription.rs +++ /dev/null @@ -1,351 +0,0 @@ -//! HTTP-based polling subscription for providers without pubsub support. -//! -//! This module provides a polling-based alternative to WebSocket subscriptions, -//! allowing HTTP providers to participate in block subscriptions by periodically -//! polling for new blocks. -//! -//! # Feature Flag -//! -//! This module requires the `http-subscription` feature: -//! -//! ```toml -//! robust-provider = { version = "0.2", features = ["http-subscription"] } -//! ``` -//! -//! # Example -//! -//! ```rust,no_run -//! use alloy::providers::ProviderBuilder; -//! use robust_provider::RobustProviderBuilder; -//! use std::time::Duration; -//! -//! # async fn example() -> anyhow::Result<()> { -//! let http = ProviderBuilder::new().connect_http("http://localhost:8545")?; -//! -//! let robust = RobustProviderBuilder::new(http) -//! .allow_http_subscriptions(true) -//! .poll_interval(Duration::from_secs(12)) -//! .build() -//! .await?; -//! -//! let mut subscription = robust.subscribe_blocks().await?; -//! while let Ok(block) = subscription.recv().await { -//! println!("New block: {}", block.number); -//! } -//! # Ok(()) } -//! ``` - -use std::{sync::Arc, time::Duration}; - -use crate::RobustProvider; -use alloy::{ - network::{BlockResponse, Network}, - primitives::BlockHash, - transports::{RpcError, TransportErrorKind}, -}; -use futures_util::{StreamExt, stream}; -use tokio::sync::mpsc; - -/// Default polling interval for HTTP subscriptions. -/// -/// Set to 12 seconds to match approximate Ethereum mainnet block time. -/// Adjust based on the target chain's block time. -pub const DEFAULT_POLL_INTERVAL: Duration = Duration::from_secs(12); - -/// Default timeout for individual RPC calls during HTTP polling. -pub const DEFAULT_CALL_TIMEOUT: Duration = Duration::from_secs(30); - -/// Default buffer capacity for the internal subscription channel. -pub const DEFAULT_BUFFER_CAPACITY: usize = 128; - -/// Errors specific to HTTP polling subscriptions. -#[derive(Debug, Clone, thiserror::Error)] -pub enum Error { - /// Polling operation exceeded the configured timeout. - #[error("Polling operation timed out")] - Timeout, - - /// An RPC error occurred during polling. - #[error("RPC error during polling: {0}")] - RpcError(Arc>), - - /// The subscription channel was closed. - #[error("Subscription channel closed")] - Closed, - - /// Failed to fetch block from the provider. - #[error("Block fetch failed")] - BlockNotFound, -} - -impl From> for Error { - fn from(err: RpcError) -> Self { - Error::RpcError(Arc::new(err)) - } -} - -impl From for Error { - fn from(err: crate::Error) -> Self { - match err { - crate::Error::Timeout => Error::Timeout, - crate::Error::BlockNotFound => Error::BlockNotFound, - crate::Error::RpcError(rpc_err) => Error::RpcError(rpc_err), - } - } -} - -/// Configuration for HTTP polling subscriptions. -#[derive(Debug, Clone)] -pub struct HttpSubscriptionConfig { - /// Interval between polling requests. - /// - /// Default: [`DEFAULT_POLL_INTERVAL`] (12 seconds) - pub poll_interval: Duration, - - /// Timeout for individual RPC calls. - /// - /// Default: [`DEFAULT_CALL_TIMEOUT`] (30 seconds) - pub call_timeout: Duration, - - /// Buffer size for the internal channel. - /// - /// Default: [`DEFAULT_BUFFER_CAPACITY`] - pub buffer_capacity: usize, -} - -impl Default for HttpSubscriptionConfig { - fn default() -> Self { - Self { - poll_interval: DEFAULT_POLL_INTERVAL, - call_timeout: DEFAULT_CALL_TIMEOUT, - buffer_capacity: DEFAULT_BUFFER_CAPACITY, - } - } -} - -/// HTTP-based polling subscription that emulates WebSocket subscriptions -/// by polling for new blocks at regular intervals. -/// -/// This struct provides a similar interface to native WebSocket subscriptions, -/// allowing HTTP providers to participate in the subscription system. -/// -/// # How It Works -/// -/// Uses alloy's [`watch_blocks()`](alloy::providers::Provider::watch_blocks), which: -/// 1. Creates a block filter via `eth_newBlockFilter` -/// 2. Polls `eth_getFilterChanges` at `poll_interval` to get new block hashes -/// 3. Fetches full block headers for each hash -/// -/// # Trade-offs -/// -/// * **Latency**: New blocks are detected with up to `poll_interval` delay -/// * **RPC Load**: One filter poll per interval, plus one `get_block_by_hash` per new block -pub struct HttpPollingSubscription { - /// Receiver for block hashes from the poller - receiver: mpsc::Receiver, - /// Provider used to fetch block headers from hashes - provider: RobustProvider, -} - -impl HttpPollingSubscription -where - N::HeaderResponse: Clone + Send, -{ - /// Create a new HTTP polling subscription. - /// - /// Sets up a block filter and returns a subscription that polls for new blocks. - /// - /// # Arguments - /// - /// * `provider` - The HTTP provider to poll - /// * `config` - Configuration for polling behavior - /// - /// # Errors - /// - /// Returns an error if the block filter cannot be created. - /// - /// # Example - /// - /// ```rust,no_run - /// use alloy::providers::ProviderBuilder; - /// use robust_provider::{ - /// RobustProvider, RobustProviderBuilder, - /// robust_provider::http_subscription::{HttpPollingSubscription, HttpSubscriptionConfig}, - /// }; - /// use std::time::Duration; - /// - /// # async fn example() -> anyhow::Result<()> { - /// let http = ProviderBuilder::new().connect_http("http://localhost:8545".parse()?)?; - /// let provider = RobustProviderBuilder::new(http).build().await?; - /// let config = - /// HttpSubscriptionConfig { poll_interval: Duration::from_secs(6), ..Default::default() }; - /// let mut sub = HttpPollingSubscription::new(provider, config).await?; - /// # Ok(()) - /// # } - /// ``` - pub async fn new( - provider: RobustProvider, - config: HttpSubscriptionConfig, - ) -> Result { - let poller = provider - .watch_blocks() - .await? - .with_poll_interval(config.poll_interval) - .with_channel_size(config.buffer_capacity); - - let (sender, receiver) = mpsc::channel(config.buffer_capacity); - - // Spawn a task to forward block hashes to the channel - let stream = poller.into_stream().flat_map(stream::iter); - tokio::spawn(async move { - let mut stream = std::pin::pin!(stream); - while let Some(hash) = stream.next().await { - if sender.send(hash).await.is_err() { - // Receiver dropped, stop polling - break; - } - } - }); - - Ok(Self { receiver, provider }) - } - - /// Receive the next block header. - /// - /// This will block until a new block is available or an error occurs. - /// - /// # Errors - /// - /// * [`Error::Closed`] - if the subscription channel is closed. - /// * [`Error::Timeout`] - if the polling operation times out. - /// * [`Error::RpcError`] - if an RPC error occurs during polling. - /// * [`Error::BlockNotFound`] - if the block fetch fails. - pub async fn recv(&mut self) -> Result { - let block_hash = self.receiver.recv().await.ok_or(Error::Closed)?; - - let block = self.provider.get_block_by_hash(block_hash).await.map_err(|e| match e { - crate::Error::Timeout => Error::Timeout, - crate::Error::BlockNotFound => Error::BlockNotFound, - crate::Error::RpcError(rpc_err) => Error::RpcError(rpc_err), - })?; - Ok(block.header().clone()) - } - - /// Check if the subscription channel is empty (no pending messages). - #[must_use] - pub fn is_empty(&self) -> bool { - self.receiver.is_empty() - } -} - -impl std::fmt::Debug for HttpPollingSubscription { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("HttpPollingSubscription") - .field("stream", &"") - .field("provider", &"") - .finish() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::RobustProviderBuilder; - use alloy::{ - consensus::BlockHeader, - node_bindings::Anvil, - providers::{ProviderBuilder, ext::AnvilApi}, - }; - use std::time::Duration; - - #[tokio::test] - async fn test_http_polling_config_defaults() { - let config = HttpSubscriptionConfig::default(); - assert_eq!(config.poll_interval, DEFAULT_POLL_INTERVAL); - assert_eq!(config.call_timeout, DEFAULT_CALL_TIMEOUT); - assert_eq!(config.buffer_capacity, DEFAULT_BUFFER_CAPACITY); - } - - #[tokio::test] - async fn test_http_polling_receives_new_block() -> anyhow::Result<()> { - let anvil = Anvil::new().try_spawn()?; - let root_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); - let provider = RobustProviderBuilder::new(root_provider.clone()).build().await?; - - let config = HttpSubscriptionConfig { - poll_interval: Duration::from_millis(50), - call_timeout: Duration::from_secs(5), - buffer_capacity: 16, - }; - - let mut sub = HttpPollingSubscription::new(provider, config).await?; - - // Mine a block - root_provider.anvil_mine(Some(1), None).await?; - - // Should receive the newly mined block - let result = tokio::time::timeout(Duration::from_secs(2), sub.recv()).await; - assert!(result.is_ok(), "Should receive new block within timeout"); - let block = result.unwrap()?; - assert_eq!(block.number(), 1, "Should receive block 1"); - - Ok(()) - } - - #[tokio::test] - async fn test_http_polling_receives_new_blocks() -> anyhow::Result<()> { - let anvil = Anvil::new().try_spawn()?; - let root_provider = ProviderBuilder::new().connect_http(anvil.endpoint_url()); - let provider = RobustProviderBuilder::new(root_provider.clone()).build().await?; - - let config = HttpSubscriptionConfig { - poll_interval: Duration::from_millis(50), - call_timeout: Duration::from_secs(5), - buffer_capacity: 16, - }; - - let mut sub = HttpPollingSubscription::new(provider, config).await?; - - // Mine a new block - root_provider.anvil_mine(Some(1), None).await?; - - // Should receive block 1 - let block = tokio::time::timeout(Duration::from_secs(2), sub.recv()) - .await - .expect("timeout waiting for block 1") - .expect("recv error on block 1"); - assert_eq!(block.number(), 1); - - // Mine another block - root_provider.anvil_mine(Some(1), None).await?; - - // Should receive block 2 - let block = tokio::time::timeout(Duration::from_secs(2), sub.recv()) - .await - .expect("timeout waiting for block 2") - .expect("recv error on block 2"); - assert_eq!(block.number(), 2); - - Ok(()) - } - - #[tokio::test] - async fn test_http_subscription_error_types() { - // Test Timeout error - let timeout_err = Error::Timeout; - assert!(matches!(timeout_err, Error::Timeout)); - - // Test RpcError conversion - let rpc_err: RpcError = TransportErrorKind::custom_str("test error"); - let sub_err: Error = rpc_err.into(); - assert!(matches!(sub_err, Error::RpcError(_))); - - // Test Closed error - let closed_err = Error::Closed; - assert!(matches!(closed_err, Error::Closed)); - - // Test BlockFetchFailed error - let fetch_err = Error::BlockNotFound; - assert!(matches!(fetch_err, Error::BlockNotFound)); - } -} diff --git a/src/robust_provider/mod.rs b/src/robust_provider/mod.rs index 6205223..2558f1b 100644 --- a/src/robust_provider/mod.rs +++ b/src/robust_provider/mod.rs @@ -16,27 +16,17 @@ //! //! # Feature Flags //! -//! * `http-subscription` - Enable HTTP-based polling subscriptions for providers without -//! native pubsub support +//! * `http-subscription` - Enable HTTP-based polling subscriptions for providers without native +//! pubsub support mod builder; mod errors; -#[cfg(feature = "http-subscription")] -mod http_subscription; mod provider; mod provider_conversion; mod subscription; pub use builder::*; pub use errors::{CoreError, Error}; -#[cfg(feature = "http-subscription")] -pub use http_subscription::{ - DEFAULT_BUFFER_CAPACITY, DEFAULT_CALL_TIMEOUT, DEFAULT_POLL_INTERVAL, - Error as HttpSubscriptionError, HttpPollingSubscription, HttpSubscriptionConfig, -}; pub use provider::RobustProvider; pub use provider_conversion::{IntoRobustProvider, IntoRootProvider}; -pub use subscription::{ - DEFAULT_RECONNECT_INTERVAL, Error as SubscriptionError, RobustSubscription, - RobustSubscriptionStream, -}; +pub use subscription::{DEFAULT_RECONNECT_INTERVAL, RobustSubscription, RobustSubscriptionStream}; diff --git a/src/robust_provider/provider.rs b/src/robust_provider/provider.rs index 44ea5e2..50e78b2 100644 --- a/src/robust_provider/provider.rs +++ b/src/robust_provider/provider.rs @@ -34,10 +34,10 @@ use alloy::{ transports::{RpcError, TransportErrorKind}, }; -use crate::{Error, block_not_found_doc, robust_provider::RobustSubscription}; - -#[cfg(feature = "http-subscription")] -use crate::robust_provider::http_subscription::{HttpPollingSubscription, HttpSubscriptionConfig}; +use crate::{ + Error, block_not_found_doc, + robust_provider::{RobustSubscription, subscription::SubscriptionBackend}, +}; /// Provider wrapper with built-in retry and timeout mechanisms. /// @@ -499,97 +499,37 @@ impl RobustProvider { /// * [`Error::Timeout`] - if the overall operation timeout elapses (i.e. exceeds /// `call_timeout`). pub async fn subscribe_blocks(&self) -> Result, Error> { - #[cfg(feature = "http-subscription")] - { - let primary_supports_pubsub = self.primary().client().pubsub_frontend().is_some(); - if primary_supports_pubsub { - return self.subscribe_blocks_ws().await; - } - return self.subscribe_blocks_http().await; - } - - #[cfg(not(feature = "http-subscription"))] - self.subscribe_blocks_ws().await - } - - /// Subscribe to new block headers using WebSocket with failover. - async fn subscribe_blocks_ws(&self) -> Result, Error> { - let subscription = self - .try_operation_with_failover( - move |provider| async move { - provider - .subscribe_blocks() - .channel_size(self.subscription_buffer_capacity) - .await - }, - true, // require_pubsub - ) + let subscription: SubscriptionBackend = self + .try_operation_with_failover(move |provider| async move { + #[cfg(feature = "http-subscription")] + { + let not_pubsub = provider.client().pubsub_frontend().is_none(); + if not_pubsub && self.allow_http_subscriptions { + return provider.watch_blocks().await.map(|builder| { + builder + .with_poll_interval(self.poll_interval) + .with_channel_size(self.subscription_buffer_capacity) + .into() + }); + } + } + // Non-pubsub providers will properly trigger fallback logic without retries because + // they return an appropriate RPC error, see the match logic in + // `try_provider_with_timeout`. + provider + .subscribe_blocks() + .channel_size(self.subscription_buffer_capacity) + .await + .map(Into::>::into) + }) .await?; Ok(RobustSubscription::new(subscription, self.clone())) } + // TODO: set watch blocks params from the RP itself robust_rpc!(fn watch_blocks() -> PollerBuilder<(U256,), Vec>); - /// Subscribe to new block headers using HTTP polling. - /// Falls back to WebSocket if HTTP polling fails. - #[cfg(feature = "http-subscription")] - async fn subscribe_blocks_http(&self) -> Result, Error> { - use crate::robust_provider::http_subscription::Error as HttpSubscriptionError; - - if !self.allow_http_subscriptions { - return self.subscribe_blocks_ws().await; - } - - let config = HttpSubscriptionConfig { - poll_interval: self.poll_interval, - call_timeout: self.call_timeout, - buffer_capacity: self.subscription_buffer_capacity, - }; - - info!( - poll_interval_ms = self.poll_interval.as_millis(), - "Starting HTTP polling subscription on primary provider" - ); - - // Try HTTP polling on primary first - let http_sub_result = HttpPollingSubscription::new(self.clone(), config.clone()).await; - - if let Ok(http_sub) = http_sub_result { - return Ok(RobustSubscription::new_http(http_sub, self.clone(), config)); - } - - // Track the last error for proper error reporting - let last_error: Option = http_sub_result.err(); - - warn!("HTTP subscription on primary failed, trying fallback providers"); - - // Primary HTTP subscription failed, try WebSocket on fallback providers - for (fallback_idx, provider) in self.fallback_providers().iter().enumerate() { - // Try WebSocket subscription if supported - if provider.client().pubsub_frontend().is_some() { - let operation = move |p: RootProvider| async move { - p.subscribe_blocks().channel_size(self.subscription_buffer_capacity).await - }; - - if let Ok(sub) = self.try_provider_with_timeout(provider, &operation).await { - info!( - fallback_index = fallback_idx, - "Subscription switched to fallback provider (WebSocket)" - ); - return Ok(RobustSubscription::new(sub, self.clone())); - } - } - } - - // All providers exhausted - return the actual error instead of generic Timeout - Err(match last_error { - Some(HttpSubscriptionError::RpcError(e)) => Error::RpcError(e), - Some(HttpSubscriptionError::Timeout) | None => Error::Timeout, - Some(e) => Error::RpcError(std::sync::Arc::new(RpcError::LocalUsageError(Box::new(e)))), - }) - } - /// Execute `operation` with exponential backoff and a total timeout. /// /// Wraps the retry logic with [`tokio::time::timeout`] so @@ -610,7 +550,6 @@ impl RobustProvider { pub async fn try_operation_with_failover( &self, operation: F, - require_pubsub: bool, ) -> Result where F: Fn(RootProvider) -> Fut, @@ -621,7 +560,7 @@ impl RobustProvider { match self.try_provider_with_timeout(primary, &operation).await { Ok(value) => Ok(value), Err(last_error) => self - .try_fallback_providers_from(&operation, require_pubsub, last_error, 0) + .try_fallback_providers_from(&operation, last_error, 0) .await .map(|(value, _)| value), } @@ -631,7 +570,6 @@ impl RobustProvider { pub(crate) async fn try_fallback_providers_from( &self, operation: F, - require_pubsub: bool, mut last_error: CoreError, start_index: usize, ) -> Result<(T, usize), CoreError> @@ -644,20 +582,11 @@ impl RobustProvider { debug!( start_index = start_index, total_fallbacks = fallback_providers.len(), - require_pubsub = require_pubsub, "Primary provider failed, attempting fallback providers" ); let fallback_iter = fallback_providers.iter().enumerate().skip(start_index); for (fallback_idx, provider) in fallback_iter { - if require_pubsub && !Self::supports_pubsub(provider) { - debug!( - provider_index = fallback_idx, - "Skipping fallback provider: pubsub not supported" - ); - continue; - } - trace!( fallback_index = fallback_idx, total_fallbacks = fallback_providers.len(), @@ -731,12 +660,6 @@ impl RobustProvider { .map_err(CoreError::from) } } - - /// Check if a provider supports pubsub - #[must_use] - fn supports_pubsub(provider: &RootProvider) -> bool { - provider.client().pubsub_frontend().is_some() - } } #[cfg(test)] @@ -780,14 +703,11 @@ mod tests { let call_count = AtomicUsize::new(0); let result = provider - .try_operation_with_failover( - |_| async { - call_count.fetch_add(1, Ordering::SeqCst); - let count = call_count.load(Ordering::SeqCst); - Ok(count) - }, - false, - ) + .try_operation_with_failover(|_| async { + call_count.fetch_add(1, Ordering::SeqCst); + let count = call_count.load(Ordering::SeqCst); + Ok(count) + }) .await; assert!(matches!(result, Ok(1))); @@ -800,18 +720,15 @@ mod tests { let call_count = AtomicUsize::new(0); let result = provider - .try_operation_with_failover( - |_| async { - call_count.fetch_add(1, Ordering::SeqCst); - let count = call_count.load(Ordering::SeqCst); - match count { - 3 => Ok(count), - // retriable error - _ => Err(TransportErrorKind::custom_str("429 Too Many Requests")), - } - }, - false, - ) + .try_operation_with_failover(|_| async { + call_count.fetch_add(1, Ordering::SeqCst); + let count = call_count.load(Ordering::SeqCst); + match count { + 3 => Ok(count), + // retriable error + _ => Err(TransportErrorKind::custom_str("429 Too Many Requests")), + } + }) .await; assert!(matches!(result, Ok(3))); @@ -824,14 +741,11 @@ mod tests { let call_count = AtomicUsize::new(0); let result: Result<(), CoreError> = provider - .try_operation_with_failover( - |_| async { - call_count.fetch_add(1, Ordering::SeqCst); - // retriable error - Err(TransportErrorKind::custom_str("429 Too Many Requests")) - }, - false, - ) + .try_operation_with_failover(|_| async { + call_count.fetch_add(1, Ordering::SeqCst); + // retriable error + Err(TransportErrorKind::custom_str("429 Too Many Requests")) + }) .await; assert!(matches!(result, Err(CoreError::RpcError(_)))); @@ -844,13 +758,10 @@ mod tests { let provider = test_provider(call_timeout, 10, 1); let result = provider - .try_operation_with_failover( - move |_provider| async move { - sleep(Duration::from_millis(call_timeout + 10)).await; - Ok(42) - }, - false, - ) + .try_operation_with_failover(move |_provider| async move { + sleep(Duration::from_millis(call_timeout + 10)).await; + Ok(42) + }) .await; assert!(matches!(result, Err(CoreError::Timeout))); diff --git a/src/robust_provider/subscription.rs b/src/robust_provider/subscription.rs index 1c73f38..4a0e7c4 100644 --- a/src/robust_provider/subscription.rs +++ b/src/robust_provider/subscription.rs @@ -1,6 +1,5 @@ use std::{ pin::Pin, - sync::Arc, task::{Context, Poll, ready}, time::{Duration, Instant}, }; @@ -9,74 +8,26 @@ use alloy::{ network::Network, providers::{Provider, RootProvider}, pubsub::Subscription, - transports::{RpcError, TransportErrorKind}, }; -use thiserror::Error; -use tokio::{sync::broadcast::error::RecvError, time::timeout}; +#[cfg(feature = "http-subscription")] +use alloy::{ + primitives::{BlockHash, U256}, + rpc::client::PollerBuilder, +}; +#[cfg(feature = "http-subscription")] +use tokio::sync::mpsc; +use tokio::time::timeout; use tokio_stream::Stream; use tokio_util::sync::ReusableBoxFuture; -use crate::robust_provider::{CoreError, RobustProvider}; - -#[cfg(feature = "http-subscription")] -use crate::robust_provider::http_subscription::{ - Error as HttpSubscriptionError, HttpPollingSubscription, HttpSubscriptionConfig, +use crate::{ + Error, + robust_provider::{CoreError, RobustProvider}, }; -/// Errors that can occur when using [`RobustSubscription`]. -#[derive(Error, Debug, Clone)] -pub enum Error { - #[error("Operation timed out")] - Timeout, - #[error("RPC call failed after exhausting all retry attempts: {0}")] - RpcError(Arc>), - #[error("Subscription closed")] - Closed, - #[error("Subscription lagged behind by: {0}")] - Lagged(u64), -} - -impl From for Error { - fn from(err: CoreError) -> Self { - match err { - CoreError::Timeout => Error::Timeout, - CoreError::RpcError(e) => Error::RpcError(Arc::new(e)), - } - } -} - -impl From for Error { - fn from(err: RecvError) -> Self { - match err { - RecvError::Closed => Error::Closed, - RecvError::Lagged(count) => Error::Lagged(count), - } - } -} - -impl From for Error { - fn from(_: tokio::time::error::Elapsed) -> Self { - Error::Timeout - } -} - -#[cfg(feature = "http-subscription")] -impl From for Error { - fn from(err: HttpSubscriptionError) -> Self { - match err { - HttpSubscriptionError::Timeout => Error::Timeout, - HttpSubscriptionError::RpcError(e) => Error::RpcError(e), - HttpSubscriptionError::Closed | HttpSubscriptionError::BlockNotFound => Error::Closed, - } - } -} - /// Default time interval between primary provider reconnection attempts pub const DEFAULT_RECONNECT_INTERVAL: Duration = Duration::from_secs(30); -/// Timeout for validating HTTP provider reachability during reconnection -pub const HTTP_RECONNECT_VALIDATION_TIMEOUT: Duration = Duration::from_millis(150); - /// Backend for subscriptions - either native WebSocket or HTTP polling. /// /// This enum allows `RobustSubscription` to transparently handle both @@ -87,7 +38,36 @@ pub(crate) enum SubscriptionBackend { WebSocket(Subscription), /// HTTP polling-based subscription (requires `http-subscription` feature) #[cfg(feature = "http-subscription")] - HttpPolling(HttpPollingSubscription), + HttpPolling(mpsc::Receiver), +} + +impl From> for SubscriptionBackend { + fn from(value: Subscription) -> Self { + SubscriptionBackend::WebSocket(value) + } +} + +#[cfg(feature = "http-subscription")] +impl From>> for SubscriptionBackend { + fn from(value: PollerBuilder<(U256,), Vec>) -> Self { + use futures_util::{StreamExt, stream}; + + let (sender, receiver) = mpsc::channel(value.channel_size()); + + // Spawn a task to forward block hashes to the channel + let stream = value.into_stream().flat_map(stream::iter); + tokio::spawn(async move { + let mut stream = std::pin::pin!(stream); + while let Some(hash) = stream.next().await { + if sender.send(hash).await.is_err() { + // Receiver dropped, stop polling + break; + } + } + }); + + SubscriptionBackend::HttpPolling(receiver) + } } /// A robust subscription wrapper that automatically handles provider failover @@ -98,47 +78,19 @@ pub struct RobustSubscription { robust_provider: RobustProvider, last_reconnect_attempt: Option, current_fallback_index: Option, - /// Configuration for HTTP polling (stored for failover to HTTP providers) - #[cfg(feature = "http-subscription")] - http_config: HttpSubscriptionConfig, } impl RobustSubscription { /// Create a new [`RobustSubscription`] with a WebSocket backend. pub(crate) fn new( - subscription: Subscription, + subscription: impl Into>, robust_provider: RobustProvider, ) -> Self { - #[cfg(feature = "http-subscription")] - let http_config = HttpSubscriptionConfig { - poll_interval: robust_provider.poll_interval, - call_timeout: robust_provider.call_timeout, - buffer_capacity: robust_provider.subscription_buffer_capacity, - }; - Self { - backend: SubscriptionBackend::WebSocket(subscription), + backend: subscription.into(), robust_provider, last_reconnect_attempt: None, current_fallback_index: None, - #[cfg(feature = "http-subscription")] - http_config, - } - } - - /// Create a new [`RobustSubscription`] with an HTTP polling backend. - #[cfg(feature = "http-subscription")] - pub(crate) fn new_http( - subscription: HttpPollingSubscription, - robust_provider: RobustProvider, - config: HttpSubscriptionConfig, - ) -> Self { - Self { - backend: SubscriptionBackend::HttpPolling(subscription), - robust_provider, - last_reconnect_attempt: None, - current_fallback_index: None, - http_config: config, } } @@ -179,9 +131,23 @@ impl RobustSubscription { } #[cfg(feature = "http-subscription")] SubscriptionBackend::HttpPolling(sub) => { - match timeout(subscription_timeout, sub.recv()).await { - Ok(Ok(header)) => Ok(header), - Ok(Err(e)) => Err(Error::from(e)), + let result = timeout(subscription_timeout, sub.recv()).await; + match result { + Ok(Some(hash)) => { + use alloy::network::BlockResponse; + + match timeout( + subscription_timeout, + self.robust_provider.get_block_by_hash(hash), + ) + .await + { + Ok(Ok(block)) => Ok(block.header().clone()), + Ok(Err(e)) => Err(e), + Err(_elapsed) => Err(Error::Timeout), + } + } + Ok(None) => Err(Error::Closed), Err(_elapsed) => Err(Error::Timeout), } } @@ -204,6 +170,7 @@ impl RobustSubscription { // Propagate these errors directly without failover Err(Error::Closed) => return Err(Error::Closed), Err(Error::Lagged(count)) => return Err(Error::Lagged(count)), + Err(Error::BlockNotFound) => return Err(Error::BlockNotFound), // RPC errors trigger failover Err(Error::RpcError(_e)) => { warn!("Subscription RPC error, switching provider"); @@ -217,8 +184,8 @@ impl RobustSubscription { /// Returns true if reconnection was successful, false if it's not time yet or if it failed. async fn try_reconnect_to_primary(&mut self, force: bool) -> bool { // Check if we should attempt reconnection - let should_reconnect = force - || match self.last_reconnect_attempt { + let should_reconnect = force || + match self.last_reconnect_attempt { None => false, Some(last_attempt) => { last_attempt.elapsed() >= self.robust_provider.reconnect_interval @@ -230,44 +197,41 @@ impl RobustSubscription { } let primary = self.robust_provider.primary(); - - // Try WebSocket subscription first if supported - if Self::supports_pubsub(primary) { - let operation = - move |provider: RootProvider| async move { provider.subscribe_blocks().await }; - - let subscription = - self.robust_provider.try_provider_with_timeout(primary, &operation).await; - - if let Ok(sub) = subscription { - info!("Reconnected to primary provider (WebSocket)"); - self.backend = SubscriptionBackend::WebSocket(sub); - self.current_fallback_index = None; - self.last_reconnect_attempt = None; - return true; - } - } - - // Try HTTP polling if enabled and WebSocket not available/failed + let subscription_buffer_capacity = self.robust_provider.subscription_buffer_capacity; #[cfg(feature = "http-subscription")] - if self.robust_provider.allow_http_subscriptions { - let validation = - tokio::time::timeout(HTTP_RECONNECT_VALIDATION_TIMEOUT, primary.get_block_number()) - .await; - - if matches!(validation, Ok(Ok(_))) - && let Ok(http_sub) = HttpPollingSubscription::new( - self.robust_provider.clone(), - self.http_config.clone(), - ) - .await + let poll_interval = self.robust_provider.poll_interval; + #[cfg(feature = "http-subscription")] + let allow_http_subscriptions = self.robust_provider.allow_http_subscriptions; + + let operation = move |provider: RootProvider| async move { + #[cfg(feature = "http-subscription")] { - info!("Reconnected to primary provider (HTTP polling)"); - self.backend = SubscriptionBackend::HttpPolling(http_sub); - self.current_fallback_index = None; - self.last_reconnect_attempt = None; - return true; + let not_pubsub = provider.client().pubsub_frontend().is_none(); + if not_pubsub && allow_http_subscriptions { + return provider.watch_blocks().await.map(|builder| { + builder + .with_poll_interval(poll_interval) + .with_channel_size(subscription_buffer_capacity) + .into() + }); + } } + provider + .subscribe_blocks() + .channel_size(subscription_buffer_capacity) + .await + .map(Into::>::into) + }; + + let subscription = + self.robust_provider.try_provider_with_timeout(primary, &operation).await; + + if let Ok(backend) = subscription { + info!("Reconnected to primary provider"); + self.backend = backend; + self.current_fallback_index = None; + self.last_reconnect_attempt = None; + return true; } self.last_reconnect_attempt = Some(Instant::now()); @@ -287,40 +251,38 @@ impl RobustSubscription { // Start searching from the next provider after the current one let start_index = self.current_fallback_index.map_or(0, |idx| idx + 1); let fallback_providers = self.robust_provider.fallback_providers(); + let subscription_buffer_capacity = self.robust_provider.subscription_buffer_capacity; + #[cfg(feature = "http-subscription")] + let poll_interval = self.robust_provider.poll_interval; + #[cfg(feature = "http-subscription")] + let allow_http_subscriptions = self.robust_provider.allow_http_subscriptions; // Try each fallback provider for (idx, provider) in fallback_providers.iter().enumerate().skip(start_index) { - // Try WebSocket subscription first if provider supports pubsub - if Self::supports_pubsub(provider) { - let operation = move |p: RootProvider| async move { p.subscribe_blocks().await }; - - if let Ok(sub) = - self.robust_provider.try_provider_with_timeout(provider, &operation).await + let operation = move |p: RootProvider| async move { + #[cfg(feature = "http-subscription")] { - info!( - fallback_index = idx, - "Subscription switched to fallback provider (WebSocket)" - ); - self.backend = SubscriptionBackend::WebSocket(sub); - self.current_fallback_index = Some(idx); - return Ok(()); + let not_pubsub = p.client().pubsub_frontend().is_none(); + if not_pubsub && allow_http_subscriptions { + return p.watch_blocks().await.map(|builder| { + builder + .with_poll_interval(poll_interval) + .with_channel_size(subscription_buffer_capacity) + .into() + }); + } } - } + p.subscribe_blocks() + .channel_size(subscription_buffer_capacity) + .await + .map(Into::>::into) + }; - // Try HTTP polling if enabled - #[cfg(feature = "http-subscription")] - if self.robust_provider.allow_http_subscriptions - && let Ok(http_sub) = HttpPollingSubscription::new( - self.robust_provider.clone(), - self.http_config.clone(), - ) - .await + if let Ok(backend) = + self.robust_provider.try_provider_with_timeout(provider, &operation).await { - info!( - fallback_index = idx, - "Subscription switched to fallback provider (HTTP polling)" - ); - self.backend = SubscriptionBackend::HttpPolling(http_sub); + info!(fallback_index = idx, "Subscription switched to fallback provider"); + self.backend = backend; self.current_fallback_index = Some(idx); return Ok(()); } @@ -339,11 +301,6 @@ impl RobustSubscription { self.current_fallback_index.is_some() } - /// Check if a provider supports native pubsub (WebSocket) - fn supports_pubsub(provider: &RootProvider) -> bool { - provider.client().pubsub_frontend().is_some() - } - /// Check if the subscription channel is empty (no pending messages) #[must_use] pub fn is_empty(&self) -> bool { diff --git a/tests/http_subscription.rs b/tests/http_subscription.rs index bfa279f..88b5927 100644 --- a/tests/http_subscription.rs +++ b/tests/http_subscription.rs @@ -15,7 +15,7 @@ use alloy::{ providers::{Provider, ProviderBuilder, RootProvider, ext::AnvilApi}, }; use common::{BUFFER_TIME, SHORT_TIMEOUT}; -use robust_provider::{RobustProviderBuilder, SubscriptionError}; +use robust_provider::{Error, RobustProviderBuilder}; use tokio_stream::StreamExt; // ============================================================================ @@ -434,7 +434,7 @@ async fn test_all_providers_fail_returns_error() -> anyhow::Result<()> { Ok(Err(e)) => { // Expected - got an error assert!( - matches!(e, SubscriptionError::Timeout | SubscriptionError::RpcError(_)), + matches!(e, Error::Timeout | Error::RpcError(_)), "Expected Timeout or RpcError, got {e:?}", ); } @@ -721,10 +721,10 @@ async fn test_single_fallback_timeout_exhausts_providers() -> anyhow::Result<()> #[allow(clippy::match_same_arms)] match result { - Ok(Err(SubscriptionError::Timeout)) => { + Ok(Err(Error::Timeout)) => { // Expected: all providers exhausted, returns timeout error } - Ok(Err(SubscriptionError::RpcError(_))) => { + Ok(Err(Error::RpcError(_))) => { // Also acceptable: RPC error from dead providers } Ok(Ok(block)) => { diff --git a/tests/rpc_failover.rs b/tests/rpc_failover.rs index dfceb28..48bb988 100644 --- a/tests/rpc_failover.rs +++ b/tests/rpc_failover.rs @@ -8,6 +8,7 @@ use alloy::{ eips::BlockNumberOrTag, node_bindings::Anvil, providers::{Provider, ProviderBuilder, ext::AnvilApi}, + transports::{RpcError, TransportErrorKind}, }; use robust_provider::{Error, RobustProviderBuilder}; @@ -148,18 +149,29 @@ async fn test_operation_completes_when_provider_unavailable() -> anyhow::Result< let provider = ProviderBuilder::new().connect_http(endpoint); - let robust = RobustProviderBuilder::fragile(provider) - .call_timeout(Duration::from_secs(2)) - .build() - .await?; + let timeout = Duration::from_secs(5); + + let robust = RobustProviderBuilder::fragile(provider).call_timeout(timeout).build().await?; let start = Instant::now(); let result = robust.get_block_number().await; let elapsed = start.elapsed(); // Should fail (connection refused) and not hang - assert!(result.is_err()); - assert!(elapsed < Duration::from_secs(5)); + let err = result.expect_err("expected RPC error due to unavailable provider"); + match err { + Error::RpcError(e) => { + let e = e.as_ref(); + match e { + RpcError::Transport(TransportErrorKind::Custom(_)) => {} + other => panic!( + "expected RpcError::Transport(TransportErrorKind::Custom), got {other:?}" + ), + } + } + other => panic!("expected Error::RpcError, got {other:?}"), + } + assert!(elapsed < timeout); Ok(()) } diff --git a/tests/subscription.rs b/tests/subscription.rs index 64969e7..4d72b8b 100644 --- a/tests/subscription.rs +++ b/tests/subscription.rs @@ -14,8 +14,7 @@ use alloy::{ }; use common::{BUFFER_TIME, RECONNECT_INTERVAL, SHORT_TIMEOUT, spawn_ws_anvil}; use robust_provider::{ - DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, RobustProviderBuilder, RobustSubscriptionStream, - SubscriptionError, + DEFAULT_SUBSCRIPTION_BUFFER_CAPACITY, Error, RobustProviderBuilder, RobustSubscriptionStream, }; use tokio::time::sleep; use tokio_stream::StreamExt; @@ -209,11 +208,11 @@ async fn test_stream_continues_streaming_errors() -> anyhow::Result<()> { assert_next_block!(stream, 1); // Trigger timeout error - the stream will continue to stream errors - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); // Without fallbacks, subsequent calls will continue to return errors // (not None, since only Error::Closed terminates the stream) - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -276,7 +275,7 @@ async fn subscription_fails_with_no_fallbacks() -> anyhow::Result<()> { assert_next_block!(stream, 1); // No fallback available - should error after timeout - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -307,7 +306,7 @@ async fn ws_fails_http_fallback_returns_primary_error() -> anyhow::Result<()> { assert_next_block!(stream, 2); // Verify: HTTP fallback can't provide subscription, so we get an error - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -342,7 +341,7 @@ async fn test_single_fallback_provider() -> anyhow::Result<()> { trigger_failover(&mut stream, fallback.clone(), 1).await?; // FB -> try PP (fails) -> no more fallbacks -> error - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -381,7 +380,7 @@ async fn subscription_cycles_through_multiple_fallbacks() -> anyhow::Result<()> assert_next_block!(stream, 2); // FP2 times out -> tries PP (fails) -> no more fallbacks -> error - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -423,7 +422,7 @@ async fn test_many_fallback_providers() -> anyhow::Result<()> { trigger_failover_with_delay(&mut stream, fb_4.clone(), 1, SHORT_TIMEOUT).await?; trigger_failover_with_delay(&mut stream, fb_5.clone(), 1, SHORT_TIMEOUT).await?; - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -688,7 +687,7 @@ async fn test_backend_gone_error_propagation() -> anyhow::Result<()> { drop(anvil); // Should get BackendGone or Timeout error - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -713,7 +712,7 @@ async fn test_immediate_consecutive_failures() -> anyhow::Result<()> { drop(anvil); // First failure - assert!(matches!(stream.next().await.unwrap(), Err(SubscriptionError::Timeout))); + assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } @@ -738,7 +737,7 @@ async fn test_subscription_lagged_error() -> anyhow::Result<()> { // First recv should return Lagged error (skipped some blocks) let result = subscription.recv().await; - assert!(matches!(result, Err(SubscriptionError::Lagged(_)))); + assert!(matches!(result, Err(Error::Lagged(_)))); Ok(()) } From 5eab3ee73297e6f31de1c5b7cf9606685e2d03b9 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Fri, 27 Feb 2026 09:42:57 +0100 Subject: [PATCH 2/4] add test_mixed_chain_skips_http_until_ws_is_found --- tests/subscription.rs | 55 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/tests/subscription.rs b/tests/subscription.rs index 4d72b8b..1f7edf2 100644 --- a/tests/subscription.rs +++ b/tests/subscription.rs @@ -305,12 +305,67 @@ async fn ws_fails_http_fallback_returns_primary_error() -> anyhow::Result<()> { ws_provider.anvil_mine(Some(1), None).await?; assert_next_block!(stream, 2); + // Mine on HTTP fallback to ensure that even if it has blocks, it still cannot serve + // a subscription when the `http-subscription` feature is disabled. + let http_clone = http_provider.clone(); + tokio::spawn(async move { + sleep(Duration::from_secs(1) + BUFFER_TIME).await; + http_clone.anvil_mine(Some(5), None).await.unwrap(); + }); + // Verify: HTTP fallback can't provide subscription, so we get an error assert!(matches!(stream.next().await.unwrap(), Err(Error::Timeout))); Ok(()) } +#[tokio::test] +async fn test_mixed_chain_skips_http_until_ws_is_found() -> anyhow::Result<()> { + // Chain: + // - Primary: HTTP (no pubsub) + // - Fallback #1: HTTP (no pubsub) + // - Fallback #2: WS (pubsub) + // With `http-subscription` feature disabled, subscribe_blocks should eventually succeed + // by selecting the WS fallback. + + let anvil_http_primary = Anvil::new().try_spawn()?; + let http_primary = ProviderBuilder::new().connect_http(anvil_http_primary.endpoint_url()); + + let anvil_http_fb = Anvil::new().try_spawn()?; + let http_fallback = ProviderBuilder::new().connect_http(anvil_http_fb.endpoint_url()); + + let (_anvil_ws, ws_fallback) = spawn_ws_anvil().await?; + + let robust = RobustProviderBuilder::fragile(http_primary) + .fallback(http_fallback.clone()) + .fallback(ws_fallback.clone()) + .call_timeout(Duration::from_secs(2)) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + // This should succeed by skipping non-pubsub HTTP providers and using the WS fallback. + let mut subscription = robust.subscribe_blocks().await?; + + // Mine blocks on both HTTP providers; if an HTTP provider were incorrectly used for + // subscription, we'd observe those blocks. + http_fallback.anvil_mine(Some(10), None).await?; + let http_primary_for_mining = + ProviderBuilder::new().connect_http(anvil_http_primary.endpoint_url()); + http_primary_for_mining.anvil_mine(Some(20), None).await?; + + // Mine exactly one block on WS fallback and ensure we receive WS block #1. + ws_fallback.anvil_mine(Some(1), None).await?; + let header = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timed out") + .expect("recv error"); + assert_eq!(header.number, 1); + assert!(subscription.is_empty()); + + Ok(()) +} + // ============================================================================ // Fallback Cycling Tests // ============================================================================ From c89615cfbacc7411a222422dbf1e9bf889983e60 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Fri, 27 Feb 2026 10:13:09 +0100 Subject: [PATCH 3/4] add test-log crate --- Cargo.lock | 209 +++++++++++++++++++++++++++++++++++++++++++++++++++++ Cargo.toml | 1 + 2 files changed, 210 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 2b18ee6..ac3bbe7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,15 @@ # It is not intended for manual editing. version = 4 +[[package]] +name = "aho-corasick" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" +dependencies = [ + "memchr", +] + [[package]] name = "allocator-api2" version = "0.2.21" @@ -718,6 +727,56 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43d5b281e737544384e969a5ccad3f1cdd24b48086a0fc1b2a5262a26b8f4f4a" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" + +[[package]] +name = "anstyle-parse" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4e7644824f0aa2c7b9384579234ef10eb7efb6a0deb83f9630a49594dd9c15c2" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] + [[package]] name = "anyhow" version = "1.0.100" @@ -1193,6 +1252,12 @@ dependencies = [ "windows-link", ] +[[package]] +name = "colorchoice" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" + [[package]] name = "const-hex" version = "1.17.0" @@ -1534,6 +1599,27 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "env_filter" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a1c3cc8e57274ec99de65301228b537f1e4eedc1b8e0f9411c6caac8ae7308f" +dependencies = [ + "log", +] + +[[package]] +name = "env_logger" +version = "0.11.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2daee4ea451f429a58296525ddf28b45a3b64f1acf6587e2067437bb11e218d" +dependencies = [ + "anstream", + "anstyle", + "env_filter", + "log", +] + [[package]] name = "equivalent" version = "1.0.2" @@ -2163,6 +2249,12 @@ dependencies = [ "serde", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.10.5" @@ -2239,6 +2331,12 @@ dependencies = [ "sha3-asm", ] +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + [[package]] name = "libc" version = "0.2.179" @@ -2304,6 +2402,15 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "matchers" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" +dependencies = [ + "regex-automata", +] + [[package]] name = "memchr" version = "2.7.6" @@ -2321,6 +2428,15 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "nu-ansi-term" +version = "0.50.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" +dependencies = [ + "windows-sys 0.61.2", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ -2407,6 +2523,12 @@ version = "1.21.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "42f5e15c9953c5e4ccceeb2e7382a716482c34515315f7b03532b8b4e8393d2d" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "parity-scale-codec" version = "3.7.5" @@ -2817,6 +2939,17 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "regex-automata" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + [[package]] name = "regex-syntax" version = "0.8.8" @@ -2904,6 +3037,7 @@ dependencies = [ "backon", "futures-util", "serde_json", + "test-log", "thiserror", "tokio", "tokio-stream", @@ -3281,6 +3415,15 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "sharded-slab" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6" +dependencies = [ + "lazy_static", +] + [[package]] name = "shlex" version = "1.3.0" @@ -3450,6 +3593,28 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "test-log" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37d53ac171c92a39e4769491c4b4dde7022c60042254b5fc044ae409d34a24d4" +dependencies = [ + "env_logger", + "test-log-macros", + "tracing-subscriber", +] + +[[package]] +name = "test-log-macros" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be35209fd0781c5401458ab66e4f98accf63553e8fae7425503e92fdd319783b" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.114", +] + [[package]] name = "thiserror" version = "2.0.17" @@ -3470,6 +3635,15 @@ dependencies = [ "syn 2.0.114", ] +[[package]] +name = "thread_local" +version = "1.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f60246a4944f24f6e018aa17cdeffb7818b76356965d03b07d6a9886e8962185" +dependencies = [ + "cfg-if", +] + [[package]] name = "threadpool" version = "1.8.1" @@ -3725,6 +3899,35 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", + "valuable", +] + +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + +[[package]] +name = "tracing-subscriber" +version = "0.3.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" +dependencies = [ + "matchers", + "nu-ansi-term", + "once_cell", + "regex-automata", + "sharded-slab", + "thread_local", + "tracing", + "tracing-core", + "tracing-log", ] [[package]] @@ -3830,6 +4033,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "valuable" version = "0.1.1" diff --git a/Cargo.toml b/Cargo.toml index e046fe2..8cea655 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,6 +35,7 @@ serde_json = "1.0.149" [dev-dependencies] anyhow = "1.0" alloy = { version = "1.1.2", features = ["node-bindings", "provider-ws"] } +test-log = { version = "0.2.18", features = ["trace"] } [package.metadata.docs.rs] all-features = true From 4beb33d04a7dfb46be69f37375abaf2721ad7aaf Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Fri, 27 Feb 2026 10:13:23 +0100 Subject: [PATCH 4/4] add additional tests --- tests/http_subscription.rs | 210 ++++++++++++++++++++++++++++++++++--- 1 file changed, 196 insertions(+), 14 deletions(-) diff --git a/tests/http_subscription.rs b/tests/http_subscription.rs index 88b5927..4cbdbf7 100644 --- a/tests/http_subscription.rs +++ b/tests/http_subscription.rs @@ -81,6 +81,153 @@ async fn test_http_subscription_basic_flow() -> anyhow::Result<()> { Ok(()) } +// ============================================================================ +// Regression Tests +// ============================================================================ + +/// Test: Enabling `allow_http_subscriptions(true)` does not break WS-only chains. +/// +/// This is a regression guard ensuring pubsub-capable providers still use WS subscriptions +/// even when HTTP subscriptions are enabled. +#[tokio::test] +async fn test_ws_only_chain_works_with_http_subscriptions_enabled() -> anyhow::Result<()> { + let (anvil_primary, primary) = spawn_ws_anvil().await?; + let (_anvil_fallback, fallback) = spawn_ws_anvil().await?; + + let robust = RobustProviderBuilder::fragile(primary.clone()) + .fallback(fallback.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .subscription_timeout(SHORT_TIMEOUT) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Receive initial block from WS primary. + primary.anvil_mine(Some(1), None).await?; + // mine different number of blocks on fallback node + fallback.anvil_mine(Some(5), None).await?; + + // should get block from primary + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + assert!(subscription.is_empty()); + + // Kill WS primary and ensure we can still fail over to WS fallback. + drop(anvil_primary); + + tokio::spawn(async move { + // sleep just enough before mining to ensure subscription switches to this fallback provider + tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; + fallback.anvil_mine(Some(1), None).await.unwrap(); + }); + + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert_eq!(block.number, 6); + assert!(subscription.is_empty()); + + Ok(()) +} + +/// Test: With mixed fallbacks, HTTP is used when allowed, and WS is used if HTTP dies. +/// +/// Chain: +/// - Primary: WS (pubsub) +/// - Fallback #1: HTTP (polling) +/// - Fallback #2: WS (pubsub) +#[tokio::test] +async fn test_mixed_fallback_ordering_ws_to_http_to_ws() -> anyhow::Result<()> { + let (anvil_ws_primary, ws_primary) = spawn_ws_anvil().await?; + let (anvil_http, http_fallback) = spawn_http_anvil().await?; + let (_anvil_ws2, ws_fallback) = spawn_ws_anvil().await?; + + let robust = RobustProviderBuilder::fragile(ws_primary.clone()) + .fallback(http_fallback.clone()) + .fallback(ws_fallback.clone()) + .allow_http_subscriptions(true) + .poll_interval(TEST_POLL_INTERVAL) + .max_retries(0) + .min_delay(Duration::from_millis(0)) + // Same reasoning as `test_failover_ws_to_http_on_provider_death`. + .call_timeout(Duration::from_millis(200)) + .subscription_timeout(Duration::from_secs(2)) + .build() + .await?; + + let mut subscription = robust.subscribe_blocks().await?; + + // Confirm we start on WS primary. + ws_primary.anvil_mine(Some(1), None).await?; + let block = subscription.recv().await?; + assert_eq!(block.number, 1); + + // Kill WS primary to force failover to HTTP fallback. + drop(anvil_ws_primary); + let http_clone = http_fallback.clone(); + let http_mining_task = tokio::spawn(async move { + tokio::time::sleep(BUFFER_TIME).await; + + // Mine long enough to cover the failover window. + for _ in 0..120 { + if http_clone.anvil_mine(Some(1), None).await.is_err() { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }); + + // Must receive a block after WS primary died; this should come from HTTP fallback. + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert!(block.number >= 1); + + // Stop mining on HTTP so we don't enqueue extra hashes while switching away. + http_mining_task.abort(); + + // Drain any already-enqueued HTTP hashes to avoid `BlockNotFound` after the HTTP provider is + // dropped and robust-provider routes `get_block_by_hash` to a different backend. + for _ in 0..50 { + if subscription.is_empty() { + break; + } + + // Use an outer timeout so we don't block here if `is_empty()` is stale. + let _ = tokio::time::timeout(Duration::from_millis(200), subscription.recv()).await; + } + + // Now kill HTTP fallback too, and ensure we can fail over to WS fallback. + drop(anvil_http); + let ws2_clone = ws_fallback.clone(); + tokio::spawn(async move { + // Wait long enough for: + // - the HTTP polling recv() to time out + // - fallback switching logic to establish a WS subscription + tokio::time::sleep(Duration::from_millis(2500)).await; + + // Mine repeatedly to avoid racing with WS subscription establishment. + for _ in 0..20 { + if ws2_clone.anvil_mine(Some(1), None).await.is_err() { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } + }); + + let block = tokio::time::timeout(Duration::from_secs(5), subscription.recv()) + .await + .expect("timeout") + .expect("recv error"); + assert!(block.number >= 1); + + Ok(()) +} + /// Test: HTTP subscription correctly receives multiple consecutive blocks #[tokio::test] async fn test_http_subscription_multiple_blocks() -> anyhow::Result<()> { @@ -152,7 +299,7 @@ async fn test_http_subscription_as_stream() -> anyhow::Result<()> { /// /// Verification: We confirm failover by checking that after WS death, /// we still receive blocks (which must come from HTTP since WS is dead) -#[tokio::test] +#[test_log::test(tokio::test)] async fn test_failover_ws_to_http_on_provider_death() -> anyhow::Result<()> { let (anvil_ws, ws_provider) = spawn_ws_anvil().await?; let (_anvil_http, http_provider) = spawn_http_anvil().await?; @@ -161,6 +308,8 @@ async fn test_failover_ws_to_http_on_provider_death() -> anyhow::Result<()> { .fallback(http_provider.clone()) .allow_http_subscriptions(true) .poll_interval(TEST_POLL_INTERVAL) + // Ensure robust-provider block fetching can fail over within the recv timeout. + .call_timeout(SHORT_TIMEOUT / 2) .subscription_timeout(SHORT_TIMEOUT) .build() .await?; @@ -169,17 +318,32 @@ async fn test_failover_ws_to_http_on_provider_death() -> anyhow::Result<()> { // Receive initial block from WS ws_provider.anvil_mine(Some(1), None).await?; + + // mine different number of blocks on fallback + http_provider.anvil_mine(Some(5), None).await?; + + // only primary blocks are received let block = subscription.recv().await?; assert_eq!(block.number, 1, "Should receive from WS primary"); + assert!(subscription.is_empty()); // Kill WS provider - this will cause subscription to fail drop(anvil_ws); - // Spawn task to mine on HTTP after timeout triggers failover + // Spawn task to mine repeatedly on HTTP after timeout triggers failover. + // Mining just once can be flaky if it happens before the HTTP poller is fully established. let http_clone = http_provider.clone(); - tokio::spawn(async move { - tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; - http_clone.anvil_mine(Some(1), None).await.unwrap(); + let http_mining_task = tokio::spawn(async move { + // Start mining soon and keep mining long enough to cover the failover window. + // Failover only happens after `subscription_timeout` elapses on the WS backend. + tokio::time::sleep(BUFFER_TIME).await; + + for _ in 0..120 { + if http_clone.anvil_mine(Some(1), None).await.is_err() { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } }); // Should eventually receive a block - since WS is dead, this MUST be from HTTP @@ -189,8 +353,10 @@ async fn test_failover_ws_to_http_on_provider_death() -> anyhow::Result<()> { .expect("recv error"); // We received a block after WS died, proving failover worked - // (HTTP starts at genesis, so we get block 0 or 1 depending on timing) - assert!(block.number <= 1, "Should receive low block number from HTTP fallback"); + // The block number may be > 5 because the test mines multiple blocks to avoid races. + assert!(block.number >= 5, "Should receive a block from HTTP fallback"); + + http_mining_task.abort(); Ok(()) } @@ -502,13 +668,16 @@ async fn test_poll_interval_propagated_from_builder() -> anyhow::Result<()> { let (_anvil_http, http_provider) = spawn_http_anvil().await?; // Use a distinctive poll interval that's different from the default (12s) - let custom_poll_interval = Duration::from_millis(30); + let custom_poll_interval = Duration::from_millis(500); let robust = RobustProviderBuilder::fragile(ws_provider.clone()) .fallback(http_provider.clone()) .allow_http_subscriptions(true) .poll_interval(custom_poll_interval) - .subscription_timeout(SHORT_TIMEOUT) + // Ensure robust-provider block fetching can fail over within the recv timeout. + // Keep this very small so per-block fetching doesn't dominate the poll-interval timing. + .call_timeout(Duration::from_millis(50)) + .subscription_timeout(Duration::from_secs(2)) .build() .await?; @@ -516,17 +685,28 @@ async fn test_poll_interval_propagated_from_builder() -> anyhow::Result<()> { let mut subscription = robust.subscribe_blocks().await?; ws_provider.anvil_mine(Some(1), None).await?; + + http_provider.anvil_mine(Some(5), None).await?; + let block = subscription.recv().await?; assert_eq!(block.number, 1); + assert!(subscription.is_empty()); // Kill WS to force failover to HTTP drop(anvil_ws); // Mine on HTTP and wait for failover let http_clone = http_provider.clone(); - tokio::spawn(async move { - tokio::time::sleep(SHORT_TIMEOUT + BUFFER_TIME).await; - http_clone.anvil_mine(Some(1), None).await.unwrap(); + let http_mining_task = tokio::spawn(async move { + tokio::time::sleep(BUFFER_TIME).await; + + // Mine long enough to cover the failover window. + for _ in 0..120 { + if http_clone.anvil_mine(Some(1), None).await.is_err() { + break; + } + tokio::time::sleep(Duration::from_millis(50)).await; + } }); // Should receive block from HTTP fallback @@ -536,7 +716,9 @@ async fn test_poll_interval_propagated_from_builder() -> anyhow::Result<()> { .expect("recv error"); // Verify we got a block (proving failover worked with correct config) - assert!(block.number <= 1); + assert!(block.number >= 5); + + http_mining_task.abort(); // Now verify the poll interval is being used by timing block reception // Mine another block and measure how long until we receive it @@ -552,7 +734,7 @@ async fn test_poll_interval_propagated_from_builder() -> anyhow::Result<()> { // Should take roughly poll_interval to detect the new block // Allow some margin but it should be much less than the default 12s assert!( - elapsed < Duration::from_millis(500), + elapsed < custom_poll_interval + BUFFER_TIME, // multiply to add margin "Poll interval not respected. Elapsed {elapsed:?}, expected ~{custom_poll_interval:?}", );