Skip to content

feat: add HTTP subscription support via polling#32

Open
smartprogrammer93 wants to merge 26 commits intoOpenZeppelin:mainfrom
smartprogrammer93:feat/http-subscription
Open

feat: add HTTP subscription support via polling#32
smartprogrammer93 wants to merge 26 commits intoOpenZeppelin:mainfrom
smartprogrammer93:feat/http-subscription

Conversation

@smartprogrammer93
Copy link

@smartprogrammer93 smartprogrammer93 commented Jan 29, 2026

Summary

Implements HTTP subscription support via polling, allowing HTTP providers to participate in block subscriptions alongside WebSocket providers. This addresses issue #23.

Features

  • HTTP polling subscriptions: Uses Alloy's watch_blocks() API with eth_newBlockFilter + eth_getFilterChanges for efficient block polling
  • Seamless integration: Works transparently with existing RobustSubscription API - callers use the same subscribe_blocks() and recv() interface
  • Automatic deduplication: Alloy's filter-based approach prevents duplicate block emissions
  • Failover support: HTTP providers can participate in subscription failover chains (WS → HTTP and HTTP → WS)
  • Configuration propagation: Poll interval, timeouts, and buffer capacity settings carry over during failover

Usage

Enable the http-subscription feature flag:

[dependencies]
robust-provider = { version = "...", features = ["http-subscription"] }

Build with HTTP subscription support:

let provider = RobustProviderBuilder::new(ws_provider)
    .fallback(http_provider)
    .allow_http_subscriptions(true)
    .poll_interval(Duration::from_secs(12))
    .subscription_timeout(Duration::from_secs(60))
    .build()
    .await?;

// Use exactly like before - HTTP polling is transparent
let mut subscription = provider.subscribe_blocks().await?;
while let Ok(block) = subscription.recv().await {
    println!("Block {}", block.number);
}

Breaking Changes

None. Feature is behind http-subscription flag and disabled by default.


Thanks to @PoulavBhowmick03 for the massive improvements to this implementation.

Closes #23

Implements OpenZeppelin#23 - Support HTTP Subscription

This PR adds the ability for HTTP providers to participate in block
subscriptions via polling, enabling use cases where WebSocket connections
are not available (e.g., behind load balancers).

## Changes

### New Feature (behind `http-subscription` feature flag)
- Add `HttpPollingSubscription` that polls `eth_getBlockByNumber(latest)`
  at configurable intervals
- Add `SubscriptionBackend` enum to handle both WebSocket and HTTP backends
- Add `poll_interval()` and `allow_http_subscriptions()` builder methods
- Seamless failover between mixed WS/HTTP provider chains

### Files
- `src/robust_provider/http_subscription.rs` - New HTTP polling module
- `src/robust_provider/subscription.rs` - Unified backend handling
- `src/robust_provider/builder.rs` - New configuration options
- `src/robust_provider/provider.rs` - Updated subscribe_blocks()
- `Cargo.toml` - Added `http-subscription` feature flag

## Usage

```rust
let robust = RobustProviderBuilder::new(http_provider)
    .allow_http_subscriptions(true)
    .poll_interval(Duration::from_secs(12))
    .build()
    .await?;

let mut sub = robust.subscribe_blocks().await?;
```

## Trade-offs (documented)
- Latency: up to `poll_interval` delay for block detection
- RPC Load: one call per `poll_interval`
- Feature-gated to ensure explicit opt-in

Closes OpenZeppelin#23
@smartprogrammer93 smartprogrammer93 marked this pull request as draft January 29, 2026 14:14
Add comprehensive integration tests in tests/http_subscription.rs:

- test_http_subscription_basic_flow
- test_http_subscription_multiple_blocks
- test_http_subscription_as_stream
- test_failover_from_ws_to_http
- test_failover_from_http_to_ws
- test_mixed_provider_chain_failover
- test_http_reconnects_to_ws_primary
- test_http_only_no_ws_providers
- test_http_subscription_disabled_falls_back_to_ws
- test_custom_poll_interval

All tests gated behind #[cfg(feature = "http-subscription")]
Audit findings addressed:

Unit tests (http_subscription.rs):
- Improved test_http_polling_deduplication with better verification
- Renamed test_http_polling_handles_drop → test_http_polling_stops_on_drop
  with clearer verification logic
- Added test_http_subscription_error_types for all error variants
- Added test_http_polling_close_method for close() functionality

Integration tests (tests/http_subscription.rs) - rewritten:
- Removed broken test_http_reconnects_to_ws_primary (was meaningless)
- Removed flawed test_custom_poll_interval, replaced with
  test_poll_interval_is_respected (measures correctly)
- Renamed tests for clarity on what they actually verify
- Added test_http_disabled_no_ws_fails (negative test case)
- Added test_all_providers_fail_returns_error (error handling)
- Added test_http_subscription_survives_temporary_errors
- Added test_http_polling_deduplication (integration level)
- Fixed failover tests to verify behavior correctly
- Removed fragile 'pre-mine to distinguish providers' hacks

Test count: 73 total (19 unit + 12 http integration + 24 subscription + 18 eth)
Tests verify that RPC calls (not just subscriptions) properly:
- Failover to fallback providers when primary dies
- Cycle through multiple fallbacks
- Return errors when all providers exhausted
- Don't retry non-retryable errors (BlockNotFound)
- Complete within bounded time when providers unavailable
- Work correctly for various RPC methods (get_accounts, get_balance, get_block)
Fixes two bugs in HTTP subscription handling:

1. http_config now uses configured values from RobustProviderBuilder
   instead of defaults when a WebSocket subscription is created first.
   This ensures poll_interval, call_timeout, and buffer_capacity are
   respected when failing over to HTTP.

2. HTTP reconnection now validates the provider is reachable before
   claiming success. Uses a short 50ms timeout to quickly fail and
   not block the failover process.

Also fixes test timing in test_failover_http_to_ws_on_provider_death
to mine before subscription timeout instead of after.

Adds two new tests:
- test_poll_interval_propagated_from_builder: verifies config propagation
- test_http_reconnect_validates_provider: verifies reconnect validation
@smartprogrammer93 smartprogrammer93 force-pushed the feat/http-subscription branch 2 times, most recently from f94c84d to b6001af Compare January 29, 2026 15:55
PoulavBhowmick03 and others added 10 commits February 3, 2026 00:10
Implements OpenZeppelin#23 - Support HTTP Subscription

This PR adds the ability for HTTP providers to participate in block
subscriptions via polling, enabling use cases where WebSocket connections
are not available (e.g., behind load balancers).

- Add `HttpPollingSubscription` that polls `eth_getBlockByNumber(latest)`
  at configurable intervals
- Add `SubscriptionBackend` enum to handle both WebSocket and HTTP backends
- Add `poll_interval()` and `allow_http_subscriptions()` builder methods
- Seamless failover between mixed WS/HTTP provider chains

- `src/robust_provider/http_subscription.rs` - New HTTP polling module
- `src/robust_provider/subscription.rs` - Unified backend handling
- `src/robust_provider/builder.rs` - New configuration options
- `src/robust_provider/provider.rs` - Updated subscribe_blocks()
- `Cargo.toml` - Added `http-subscription` feature flag

```rust
let robust = RobustProviderBuilder::new(http_provider)
    .allow_http_subscriptions(true)
    .poll_interval(Duration::from_secs(12))
    .build()
    .await?;

let mut sub = robust.subscribe_blocks().await?;
```

- Latency: up to `poll_interval` delay for block detection
- RPC Load: one call per `poll_interval`
- Feature-gated to ensure explicit opt-in

Closes OpenZeppelin#23
Add comprehensive integration tests in tests/http_subscription.rs:

- test_http_subscription_basic_flow
- test_http_subscription_multiple_blocks
- test_http_subscription_as_stream
- test_failover_from_ws_to_http
- test_failover_from_http_to_ws
- test_mixed_provider_chain_failover
- test_http_reconnects_to_ws_primary
- test_http_only_no_ws_providers
- test_http_subscription_disabled_falls_back_to_ws
- test_custom_poll_interval

All tests gated behind #[cfg(feature = "http-subscription")]
Audit findings addressed:

Unit tests (http_subscription.rs):
- Improved test_http_polling_deduplication with better verification
- Renamed test_http_polling_handles_drop → test_http_polling_stops_on_drop
  with clearer verification logic
- Added test_http_subscription_error_types for all error variants
- Added test_http_polling_close_method for close() functionality

Integration tests (tests/http_subscription.rs) - rewritten:
- Removed broken test_http_reconnects_to_ws_primary (was meaningless)
- Removed flawed test_custom_poll_interval, replaced with
  test_poll_interval_is_respected (measures correctly)
- Renamed tests for clarity on what they actually verify
- Added test_http_disabled_no_ws_fails (negative test case)
- Added test_all_providers_fail_returns_error (error handling)
- Added test_http_subscription_survives_temporary_errors
- Added test_http_polling_deduplication (integration level)
- Fixed failover tests to verify behavior correctly
- Removed fragile 'pre-mine to distinguish providers' hacks

Test count: 73 total (19 unit + 12 http integration + 24 subscription + 18 eth)
Tests verify that RPC calls (not just subscriptions) properly:
- Failover to fallback providers when primary dies
- Cycle through multiple fallbacks
- Return errors when all providers exhausted
- Don't retry non-retryable errors (BlockNotFound)
- Complete within bounded time when providers unavailable
- Work correctly for various RPC methods (get_accounts, get_balance, get_block)
Fixes two bugs in HTTP subscription handling:

1. http_config now uses configured values from RobustProviderBuilder
   instead of defaults when a WebSocket subscription is created first.
   This ensures poll_interval, call_timeout, and buffer_capacity are
   respected when failing over to HTTP.

2. HTTP reconnection now validates the provider is reachable before
   claiming success. Uses a short 50ms timeout to quickly fail and
   not block the failover process.

Also fixes test timing in test_failover_http_to_ws_on_provider_death
to mine before subscription timeout instead of after.

Adds two new tests:
- test_poll_interval_propagated_from_builder: verifies config propagation
- test_http_reconnect_validates_provider: verifies reconnect validation
@smartprogrammer93 smartprogrammer93 marked this pull request as ready for review February 5, 2026 17:17
Copy link
Collaborator

@0xNeshi 0xNeshi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the contribution!

First "shallower" review iteration, will dive deeper in the following ones

@0xNeshi
Copy link
Collaborator

0xNeshi commented Feb 11, 2026

Also, resolve conflicts please

- Add http-subscription feature to VSCode settings for rust-analyzer
- Make HTTP_RECONNECT_VALIDATION_TIMEOUT public
- Fix HTTP subscription fallback: try fallback providers when primary HTTP fails
- Fix buffer_capacity: use mpsc channel with configured capacity
- Fix error documentation: use proper error list with stars
- Remove unused imports (FutureExt, Stream)
- Add pub const DEFAULT_CALL_TIMEOUT (30 seconds)
- Add pub const DEFAULT_BUFFER_CAPACITY (128)
- Update rustdocs to reference the new constants
- Update Default impl to use constants instead of magic numbers
- Update test to use constants for consistency

Addresses reviewer comment on line 105 about converting constants
into actual pub const values.
Copy link
Collaborator

@LeoPatOZ LeoPatOZ left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hello thank you for making this PR! just leaving a couple comments here for now

fn default() -> Self {
Self {
poll_interval: DEFAULT_POLL_INTERVAL,
call_timeout: DEFAULT_CALL_TIMEOUT,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have this but I dont actually think we use it anywhere to wrap a call in a timeout?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@PoulavBhowmick03
Copy link

Hey @LeoPatOZ I have addressed your comments, do take a look!

/// # Ok(())
/// # }
/// ```
pub async fn new(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Address warnings

@0xNeshi
Copy link
Collaborator

0xNeshi commented Feb 24, 2026

I think once the remaining change requests are fully addressed, we'll be ready to merge the PR

@0xNeshi
Copy link
Collaborator

0xNeshi commented Feb 24, 2026

And please address all failing CI jobs

@PoulavBhowmick03
Copy link

And please address all failing CI jobs

Do take a look at the changes

Copy link
Collaborator

@0xNeshi 0xNeshi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After all there are still some things to address

fn default() -> Self {
Self {
poll_interval: DEFAULT_POLL_INTERVAL,
call_timeout: DEFAULT_CALL_TIMEOUT,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

/// when used as subscription sources. Only relevant when
/// [`allow_http_subscriptions`](Self::allow_http_subscriptions) is enabled.
///
/// Default is 12 seconds (approximate Ethereum mainnet block time).
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Default is 12 seconds (approximate Ethereum mainnet block time).
/// Default is [`DEFAULT_POLL_INTERVAL`].

//! This module requires the `http-subscription` feature:
//!
//! ```toml
//! robust-provider = { version = "0.2", features = ["http-subscription"] }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
//! robust-provider = { version = "0.2", features = ["http-subscription"] }
//! robust-provider = { version = "*", features = ["http-subscription"] }

nit: does it make sense to use * for the version here to reduce maintenance effort? Later when actual version is bumped, it's easy to forget to update it in all documentation examples

Comment on lines +100 to +112
/// Interval between polling requests.
///
/// Default: [`DEFAULT_POLL_INTERVAL`] (12 seconds)
pub poll_interval: Duration,

/// Timeout for individual RPC calls.
///
/// Default: [`DEFAULT_CALL_TIMEOUT`] (30 seconds)
pub call_timeout: Duration,

/// Buffer size for the internal channel.
///
/// Default: [`DEFAULT_BUFFER_CAPACITY`]
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
/// Interval between polling requests.
///
/// Default: [`DEFAULT_POLL_INTERVAL`] (12 seconds)
pub poll_interval: Duration,
/// Timeout for individual RPC calls.
///
/// Default: [`DEFAULT_CALL_TIMEOUT`] (30 seconds)
pub call_timeout: Duration,
/// Buffer size for the internal channel.
///
/// Default: [`DEFAULT_BUFFER_CAPACITY`]
/// Interval between polling requests.
///
/// Default: [`DEFAULT_POLL_INTERVAL`].
pub poll_interval: Duration,
/// Timeout for individual RPC calls.
///
/// Default: [`DEFAULT_CALL_TIMEOUT`].
pub call_timeout: Duration,
/// Buffer size for the internal channel.
///
/// Default: [`DEFAULT_BUFFER_CAPACITY`].

related to #32 (comment)

Comment on lines +199 to +201
let stream = poller.into_stream().flat_map(stream::iter);
tokio::spawn(async move {
let mut stream = std::pin::pin!(stream);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let stream = poller.into_stream().flat_map(stream::iter);
tokio::spawn(async move {
let mut stream = std::pin::pin!(stream);
let mut stream = poller.into_stream().flat_map(stream::iter);
tokio::spawn(async move {

this works too, is it necessary to pin the stream?

#[cfg(feature = "http-subscription")]
pub use http_subscription::{
DEFAULT_BUFFER_CAPACITY, DEFAULT_CALL_TIMEOUT, DEFAULT_POLL_INTERVAL,
Error as HttpSubscriptionError, HttpPollingSubscription, HttpSubscriptionConfig,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
Error as HttpSubscriptionError, HttpPollingSubscription, HttpSubscriptionConfig,
Error, HttpPollingSubscription, HttpSubscriptionConfig,

we export it as Error; lib users alias it if they so desire

Comment on lines +77 to +78
/// Timeout for validating HTTP provider reachability during reconnection
pub const HTTP_RECONNECT_VALIDATION_TIMEOUT: Duration = Duration::from_millis(150);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is never used

Comment on lines +502 to +513
#[cfg(feature = "http-subscription")]
{
let primary_supports_pubsub = self.primary().client().pubsub_frontend().is_some();
if primary_supports_pubsub {
return self.subscribe_blocks_ws().await;
}
return self.subscribe_blocks_http().await;
}

#[cfg(not(feature = "http-subscription"))]
self.subscribe_blocks_ws().await
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
#[cfg(feature = "http-subscription")]
{
let primary_supports_pubsub = self.primary().client().pubsub_frontend().is_some();
if primary_supports_pubsub {
return self.subscribe_blocks_ws().await;
}
return self.subscribe_blocks_http().await;
}
#[cfg(not(feature = "http-subscription"))]
self.subscribe_blocks_ws().await
}
#[cfg(feature = "http-subscription")]
{
let primary_not_pubsub = self.primary().client().pubsub_frontend().is_none();
if primary_not_pubsub {
return self.subscribe_blocks_http().await;
}
}
self.subscribe_blocks_ws().await
}

simplified

Comment on lines +567 to +570
// Primary HTTP subscription failed, try WebSocket on fallback providers
for (fallback_idx, provider) in self.fallback_providers().iter().enumerate() {
// Try WebSocket subscription if supported
if provider.client().pubsub_frontend().is_some() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this mean that if the primary provider fails, no fallback HTTP providers are tried? If so, isn't this a mistake?

///
/// * **Latency**: New blocks are detected with up to `poll_interval` delay
/// * **RPC Load**: One filter poll per interval, plus one `get_block_by_hash` per new block
pub struct HttpPollingSubscription<N: Network> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Managed to refactor the implementation to avoid the need for the dedicated http_subscription module and opened a PR against your branch, please review and merge

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Support HTTP Subscription

4 participants