From b9bbc558b97e5f21e6e42634f094103e8764afbb Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Wed, 4 Mar 2026 12:20:28 +0100 Subject: [PATCH 1/5] refactor stream creation when building SubscriptionBackend::HttpPolling --- Cargo.lock | 1 - Cargo.toml | 1 - src/robust_provider/subscription.rs | 16 ++++++++-------- 3 files changed, 8 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ac3bbe7..bad7198 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3035,7 +3035,6 @@ dependencies = [ "alloy", "anyhow", "backon", - "futures-util", "serde_json", "test-log", "thiserror", diff --git a/Cargo.toml b/Cargo.toml index 8cea655..7249f03 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,7 +27,6 @@ backon = "1.6.0" tokio-stream = "0.1.17" thiserror = "2.0.17" tokio-util = "0.7.17" -futures-util = "0.3" tracing = { version = "0.1", optional = true } serde_json = "1.0.149" diff --git a/src/robust_provider/subscription.rs b/src/robust_provider/subscription.rs index 4a0e7c4..3f8ecc1 100644 --- a/src/robust_provider/subscription.rs +++ b/src/robust_provider/subscription.rs @@ -50,18 +50,18 @@ impl From> for SubscriptionBackend From>> for SubscriptionBackend { fn from(value: PollerBuilder<(U256,), Vec>) -> Self { - use futures_util::{StreamExt, stream}; + use tokio_stream::StreamExt; let (sender, receiver) = mpsc::channel(value.channel_size()); - // Spawn a task to forward block hashes to the channel - let stream = value.into_stream().flat_map(stream::iter); + let mut stream = value.into_stream(); tokio::spawn(async move { - let mut stream = std::pin::pin!(stream); - while let Some(hash) = stream.next().await { - if sender.send(hash).await.is_err() { - // Receiver dropped, stop polling - break; + while let Some(hashes) = stream.next().await { + for hash in hashes { + if sender.send(hash).await.is_err() { + // Receiver dropped, stop polling + break; + } } } }); From 669022952a8cd86d3191e4ae8e3867571ffe6603 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Wed, 4 Mar 2026 12:28:25 +0100 Subject: [PATCH 2/5] explain the subscribe_blocks logic in a comment --- src/robust_provider/provider.rs | 9 ++++++--- src/robust_provider/subscription.rs | 6 ++++++ 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/src/robust_provider/provider.rs b/src/robust_provider/provider.rs index 50e78b2..f74a577 100644 --- a/src/robust_provider/provider.rs +++ b/src/robust_provider/provider.rs @@ -501,6 +501,12 @@ impl RobustProvider { pub async fn subscribe_blocks(&self) -> Result, Error> { let subscription: SubscriptionBackend = self .try_operation_with_failover(move |provider| async move { + // if HTTP subscriptions are enabled and the provider currently being tried is HTTP, + // we will attempt to connect using it. + // Otherwise try subscribing through a PubSub operation, and if the provider is HTTP + // just let it fail; the error will be non-retriable, so the algorithm will + // automatically switch to the next fallback provider (see + // `try_provider_with_timeout`). #[cfg(feature = "http-subscription")] { let not_pubsub = provider.client().pubsub_frontend().is_none(); @@ -513,9 +519,6 @@ impl RobustProvider { }); } } - // Non-pubsub providers will properly trigger fallback logic without retries because - // they return an appropriate RPC error, see the match logic in - // `try_provider_with_timeout`. provider .subscribe_blocks() .channel_size(self.subscription_buffer_capacity) diff --git a/src/robust_provider/subscription.rs b/src/robust_provider/subscription.rs index 3f8ecc1..05c6de3 100644 --- a/src/robust_provider/subscription.rs +++ b/src/robust_provider/subscription.rs @@ -204,6 +204,12 @@ impl RobustSubscription { let allow_http_subscriptions = self.robust_provider.allow_http_subscriptions; let operation = move |provider: RootProvider| async move { + // if HTTP subscriptions are enabled and the provider currently being tried is HTTP, + // we will attempt to connect using it. + // Otherwise try subscribing through a PubSub operation, and if the provider is HTTP + // just let it fail; the error will be non-retriable, so the algorithm will + // automatically switch to the next fallback provider (see + // `try_provider_with_timeout`). #[cfg(feature = "http-subscription")] { let not_pubsub = provider.client().pubsub_frontend().is_none(); From 14d3d2a073111bc69d6b716f8d1acbae3717dc6c Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Wed, 4 Mar 2026 12:29:06 +0100 Subject: [PATCH 3/5] remove mentions of require_pubsub --- src/robust_provider/provider.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/robust_provider/provider.rs b/src/robust_provider/provider.rs index f74a577..f02813d 100644 --- a/src/robust_provider/provider.rs +++ b/src/robust_provider/provider.rs @@ -542,8 +542,6 @@ impl RobustProvider { /// If the timeout is exceeded and fallback providers are available, it will /// attempt to use each fallback provider in sequence. /// - /// If `require_pubsub` is true, providers that don't support pubsub will be skipped. - /// /// # Errors /// /// * [`CoreError::RpcError`] - if no fallback providers succeeded; contains the last error From 6ce15abba162b519604a8e21483ed8201a53e1e8 Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Wed, 4 Mar 2026 12:44:19 +0100 Subject: [PATCH 4/5] update doc wording in builder.rs --- src/robust_provider/builder.rs | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/src/robust_provider/builder.rs b/src/robust_provider/builder.rs index 65de7af..a419f73 100644 --- a/src/robust_provider/builder.rs +++ b/src/robust_provider/builder.rs @@ -153,6 +153,16 @@ impl> RobustProviderBuilder { /// # Feature Flag /// /// This method requires the `http-subscription` feature. + /// + /// # Example + /// + /// ```rust,ignore + /// let robust = RobustProviderBuilder::new(http_provider) + /// .allow_http_subscriptions(true) + /// .poll_interval(Duration::from_secs(6)) // For faster chains + /// .build() + /// .await?; + /// ``` #[cfg(feature = "http-subscription")] #[must_use] pub fn poll_interval(mut self, interval: Duration) -> Self { @@ -169,7 +179,9 @@ impl> RobustProviderBuilder { /// /// * **Latency**: New blocks detected with up to `poll_interval` delay /// * **RPC Load**: Generates one RPC call per `poll_interval` - /// * **Missed Blocks**: If `poll_interval` > block time, intermediate blocks may be missed + /// * **Intermediate Blocks**: Depending on the node/provider semantics, you may not observe + /// every intermediate block when `poll_interval` is larger than the chain's block time (e.g. + /// if only the latest head is exposed). /// /// # Feature Flag /// @@ -180,7 +192,6 @@ impl> RobustProviderBuilder { /// ```rust,ignore /// let robust = RobustProviderBuilder::new(http_provider) /// .allow_http_subscriptions(true) - /// .poll_interval(Duration::from_secs(6)) // For faster chains /// .build() /// .await?; /// ``` From eeef439b0f1dcd13f870fdc69f3abc50e2b5b38a Mon Sep 17 00:00:00 2001 From: 0xNeshi Date: Wed, 4 Mar 2026 12:46:33 +0100 Subject: [PATCH 5/5] update readme to mention http subscriptions --- README.md | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/README.md b/README.md index 7dc0712..5ed28ff 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,26 @@ while let Some(result) = stream.next().await { - When a fallback fails, the primary is tried first before moving to the next fallback. - The `Lagged` error indicates the consumer is not keeping pace with incoming blocks. +#### HTTP-based subscriptions (feature flag) + +By default, subscriptions use WebSocket/pubsub-capable providers. Normally, HTTP-only providers are skipped during subscription retries. If your environment only exposes HTTP endpoints, you can enable HTTP-based block subscriptions via polling using the `http-subscription` Cargo feature: + +```toml +[dependencies] +robust-provider = { version = "1.0.0", features = ["http-subscription"] } +``` + +With this feature enabled and `allow_http_subscriptions(true)` is set, those HTTP providers can also act as subscription sources via polling, and are treated like regular pubsub-capable providers in the retry/failover logic: + +```rust +let robust = RobustProviderBuilder::new(http_provider) + .allow_http_subscriptions(true) + // Optional: tune how often to poll for new blocks (defaults to ~12s) + .poll_interval(Duration::from_secs(12)) + .build() + .await?; +``` + --- ## Provider Conversion