feat: add HTTP subscription support via polling#32
feat: add HTTP subscription support via polling#32smartprogrammer93 wants to merge 26 commits intoOpenZeppelin:mainfrom
Conversation
Implements OpenZeppelin#23 - Support HTTP Subscription This PR adds the ability for HTTP providers to participate in block subscriptions via polling, enabling use cases where WebSocket connections are not available (e.g., behind load balancers). ## Changes ### New Feature (behind `http-subscription` feature flag) - Add `HttpPollingSubscription` that polls `eth_getBlockByNumber(latest)` at configurable intervals - Add `SubscriptionBackend` enum to handle both WebSocket and HTTP backends - Add `poll_interval()` and `allow_http_subscriptions()` builder methods - Seamless failover between mixed WS/HTTP provider chains ### Files - `src/robust_provider/http_subscription.rs` - New HTTP polling module - `src/robust_provider/subscription.rs` - Unified backend handling - `src/robust_provider/builder.rs` - New configuration options - `src/robust_provider/provider.rs` - Updated subscribe_blocks() - `Cargo.toml` - Added `http-subscription` feature flag ## Usage ```rust let robust = RobustProviderBuilder::new(http_provider) .allow_http_subscriptions(true) .poll_interval(Duration::from_secs(12)) .build() .await?; let mut sub = robust.subscribe_blocks().await?; ``` ## Trade-offs (documented) - Latency: up to `poll_interval` delay for block detection - RPC Load: one call per `poll_interval` - Feature-gated to ensure explicit opt-in Closes OpenZeppelin#23
64ba47c to
3a73de3
Compare
Add comprehensive integration tests in tests/http_subscription.rs: - test_http_subscription_basic_flow - test_http_subscription_multiple_blocks - test_http_subscription_as_stream - test_failover_from_ws_to_http - test_failover_from_http_to_ws - test_mixed_provider_chain_failover - test_http_reconnects_to_ws_primary - test_http_only_no_ws_providers - test_http_subscription_disabled_falls_back_to_ws - test_custom_poll_interval All tests gated behind #[cfg(feature = "http-subscription")]
Audit findings addressed: Unit tests (http_subscription.rs): - Improved test_http_polling_deduplication with better verification - Renamed test_http_polling_handles_drop → test_http_polling_stops_on_drop with clearer verification logic - Added test_http_subscription_error_types for all error variants - Added test_http_polling_close_method for close() functionality Integration tests (tests/http_subscription.rs) - rewritten: - Removed broken test_http_reconnects_to_ws_primary (was meaningless) - Removed flawed test_custom_poll_interval, replaced with test_poll_interval_is_respected (measures correctly) - Renamed tests for clarity on what they actually verify - Added test_http_disabled_no_ws_fails (negative test case) - Added test_all_providers_fail_returns_error (error handling) - Added test_http_subscription_survives_temporary_errors - Added test_http_polling_deduplication (integration level) - Fixed failover tests to verify behavior correctly - Removed fragile 'pre-mine to distinguish providers' hacks Test count: 73 total (19 unit + 12 http integration + 24 subscription + 18 eth)
Tests verify that RPC calls (not just subscriptions) properly: - Failover to fallback providers when primary dies - Cycle through multiple fallbacks - Return errors when all providers exhausted - Don't retry non-retryable errors (BlockNotFound) - Complete within bounded time when providers unavailable - Work correctly for various RPC methods (get_accounts, get_balance, get_block)
Fixes two bugs in HTTP subscription handling: 1. http_config now uses configured values from RobustProviderBuilder instead of defaults when a WebSocket subscription is created first. This ensures poll_interval, call_timeout, and buffer_capacity are respected when failing over to HTTP. 2. HTTP reconnection now validates the provider is reachable before claiming success. Uses a short 50ms timeout to quickly fail and not block the failover process. Also fixes test timing in test_failover_http_to_ws_on_provider_death to mine before subscription timeout instead of after. Adds two new tests: - test_poll_interval_propagated_from_builder: verifies config propagation - test_http_reconnect_validates_provider: verifies reconnect validation
f94c84d to
b6001af
Compare
Implements OpenZeppelin#23 - Support HTTP Subscription This PR adds the ability for HTTP providers to participate in block subscriptions via polling, enabling use cases where WebSocket connections are not available (e.g., behind load balancers). - Add `HttpPollingSubscription` that polls `eth_getBlockByNumber(latest)` at configurable intervals - Add `SubscriptionBackend` enum to handle both WebSocket and HTTP backends - Add `poll_interval()` and `allow_http_subscriptions()` builder methods - Seamless failover between mixed WS/HTTP provider chains - `src/robust_provider/http_subscription.rs` - New HTTP polling module - `src/robust_provider/subscription.rs` - Unified backend handling - `src/robust_provider/builder.rs` - New configuration options - `src/robust_provider/provider.rs` - Updated subscribe_blocks() - `Cargo.toml` - Added `http-subscription` feature flag ```rust let robust = RobustProviderBuilder::new(http_provider) .allow_http_subscriptions(true) .poll_interval(Duration::from_secs(12)) .build() .await?; let mut sub = robust.subscribe_blocks().await?; ``` - Latency: up to `poll_interval` delay for block detection - RPC Load: one call per `poll_interval` - Feature-gated to ensure explicit opt-in Closes OpenZeppelin#23
Add comprehensive integration tests in tests/http_subscription.rs: - test_http_subscription_basic_flow - test_http_subscription_multiple_blocks - test_http_subscription_as_stream - test_failover_from_ws_to_http - test_failover_from_http_to_ws - test_mixed_provider_chain_failover - test_http_reconnects_to_ws_primary - test_http_only_no_ws_providers - test_http_subscription_disabled_falls_back_to_ws - test_custom_poll_interval All tests gated behind #[cfg(feature = "http-subscription")]
Audit findings addressed: Unit tests (http_subscription.rs): - Improved test_http_polling_deduplication with better verification - Renamed test_http_polling_handles_drop → test_http_polling_stops_on_drop with clearer verification logic - Added test_http_subscription_error_types for all error variants - Added test_http_polling_close_method for close() functionality Integration tests (tests/http_subscription.rs) - rewritten: - Removed broken test_http_reconnects_to_ws_primary (was meaningless) - Removed flawed test_custom_poll_interval, replaced with test_poll_interval_is_respected (measures correctly) - Renamed tests for clarity on what they actually verify - Added test_http_disabled_no_ws_fails (negative test case) - Added test_all_providers_fail_returns_error (error handling) - Added test_http_subscription_survives_temporary_errors - Added test_http_polling_deduplication (integration level) - Fixed failover tests to verify behavior correctly - Removed fragile 'pre-mine to distinguish providers' hacks Test count: 73 total (19 unit + 12 http integration + 24 subscription + 18 eth)
Tests verify that RPC calls (not just subscriptions) properly: - Failover to fallback providers when primary dies - Cycle through multiple fallbacks - Return errors when all providers exhausted - Don't retry non-retryable errors (BlockNotFound) - Complete within bounded time when providers unavailable - Work correctly for various RPC methods (get_accounts, get_balance, get_block)
Fixes two bugs in HTTP subscription handling: 1. http_config now uses configured values from RobustProviderBuilder instead of defaults when a WebSocket subscription is created first. This ensures poll_interval, call_timeout, and buffer_capacity are respected when failing over to HTTP. 2. HTTP reconnection now validates the provider is reachable before claiming success. Uses a short 50ms timeout to quickly fail and not block the failover process. Also fixes test timing in test_failover_http_to_ws_on_provider_death to mine before subscription timeout instead of after. Adds two new tests: - test_poll_interval_propagated_from_builder: verifies config propagation - test_http_reconnect_validates_provider: verifies reconnect validation
e4d249d to
06a94ff
Compare
0xNeshi
left a comment
There was a problem hiding this comment.
Thanks for the contribution!
First "shallower" review iteration, will dive deeper in the following ones
|
Also, resolve conflicts please |
- Add http-subscription feature to VSCode settings for rust-analyzer - Make HTTP_RECONNECT_VALIDATION_TIMEOUT public - Fix HTTP subscription fallback: try fallback providers when primary HTTP fails - Fix buffer_capacity: use mpsc channel with configured capacity - Fix error documentation: use proper error list with stars - Remove unused imports (FutureExt, Stream)
- Add pub const DEFAULT_CALL_TIMEOUT (30 seconds) - Add pub const DEFAULT_BUFFER_CAPACITY (128) - Update rustdocs to reference the new constants - Update Default impl to use constants instead of magic numbers - Update test to use constants for consistency Addresses reviewer comment on line 105 about converting constants into actual pub const values.
LeoPatOZ
left a comment
There was a problem hiding this comment.
Hello thank you for making this PR! just leaving a couple comments here for now
| fn default() -> Self { | ||
| Self { | ||
| poll_interval: DEFAULT_POLL_INTERVAL, | ||
| call_timeout: DEFAULT_CALL_TIMEOUT, |
There was a problem hiding this comment.
We have this but I dont actually think we use it anywhere to wrap a call in a timeout?
There was a problem hiding this comment.
re: https://github.com/OpenZeppelin/Robust-Provider/pull/32/changes#r2811596761
Is this call_timeout param ever used?
|
Hey @LeoPatOZ I have addressed your comments, do take a look! |
| /// # Ok(()) | ||
| /// # } | ||
| /// ``` | ||
| pub async fn new( |
|
I think once the remaining change requests are fully addressed, we'll be ready to merge the PR |
|
And please address all failing CI jobs |
Do take a look at the changes |
0xNeshi
left a comment
There was a problem hiding this comment.
After all there are still some things to address
| fn default() -> Self { | ||
| Self { | ||
| poll_interval: DEFAULT_POLL_INTERVAL, | ||
| call_timeout: DEFAULT_CALL_TIMEOUT, |
There was a problem hiding this comment.
re: https://github.com/OpenZeppelin/Robust-Provider/pull/32/changes#r2811596761
Is this call_timeout param ever used?
| /// when used as subscription sources. Only relevant when | ||
| /// [`allow_http_subscriptions`](Self::allow_http_subscriptions) is enabled. | ||
| /// | ||
| /// Default is 12 seconds (approximate Ethereum mainnet block time). |
There was a problem hiding this comment.
| /// Default is 12 seconds (approximate Ethereum mainnet block time). | |
| /// Default is [`DEFAULT_POLL_INTERVAL`]. |
| //! This module requires the `http-subscription` feature: | ||
| //! | ||
| //! ```toml | ||
| //! robust-provider = { version = "0.2", features = ["http-subscription"] } |
There was a problem hiding this comment.
| //! robust-provider = { version = "0.2", features = ["http-subscription"] } | |
| //! robust-provider = { version = "*", features = ["http-subscription"] } |
nit: does it make sense to use * for the version here to reduce maintenance effort? Later when actual version is bumped, it's easy to forget to update it in all documentation examples
| /// Interval between polling requests. | ||
| /// | ||
| /// Default: [`DEFAULT_POLL_INTERVAL`] (12 seconds) | ||
| pub poll_interval: Duration, | ||
|
|
||
| /// Timeout for individual RPC calls. | ||
| /// | ||
| /// Default: [`DEFAULT_CALL_TIMEOUT`] (30 seconds) | ||
| pub call_timeout: Duration, | ||
|
|
||
| /// Buffer size for the internal channel. | ||
| /// | ||
| /// Default: [`DEFAULT_BUFFER_CAPACITY`] |
There was a problem hiding this comment.
| /// Interval between polling requests. | |
| /// | |
| /// Default: [`DEFAULT_POLL_INTERVAL`] (12 seconds) | |
| pub poll_interval: Duration, | |
| /// Timeout for individual RPC calls. | |
| /// | |
| /// Default: [`DEFAULT_CALL_TIMEOUT`] (30 seconds) | |
| pub call_timeout: Duration, | |
| /// Buffer size for the internal channel. | |
| /// | |
| /// Default: [`DEFAULT_BUFFER_CAPACITY`] | |
| /// Interval between polling requests. | |
| /// | |
| /// Default: [`DEFAULT_POLL_INTERVAL`]. | |
| pub poll_interval: Duration, | |
| /// Timeout for individual RPC calls. | |
| /// | |
| /// Default: [`DEFAULT_CALL_TIMEOUT`]. | |
| pub call_timeout: Duration, | |
| /// Buffer size for the internal channel. | |
| /// | |
| /// Default: [`DEFAULT_BUFFER_CAPACITY`]. |
related to #32 (comment)
| let stream = poller.into_stream().flat_map(stream::iter); | ||
| tokio::spawn(async move { | ||
| let mut stream = std::pin::pin!(stream); |
There was a problem hiding this comment.
| let stream = poller.into_stream().flat_map(stream::iter); | |
| tokio::spawn(async move { | |
| let mut stream = std::pin::pin!(stream); | |
| let mut stream = poller.into_stream().flat_map(stream::iter); | |
| tokio::spawn(async move { |
this works too, is it necessary to pin the stream?
| #[cfg(feature = "http-subscription")] | ||
| pub use http_subscription::{ | ||
| DEFAULT_BUFFER_CAPACITY, DEFAULT_CALL_TIMEOUT, DEFAULT_POLL_INTERVAL, | ||
| Error as HttpSubscriptionError, HttpPollingSubscription, HttpSubscriptionConfig, |
There was a problem hiding this comment.
| Error as HttpSubscriptionError, HttpPollingSubscription, HttpSubscriptionConfig, | |
| Error, HttpPollingSubscription, HttpSubscriptionConfig, |
we export it as Error; lib users alias it if they so desire
| /// Timeout for validating HTTP provider reachability during reconnection | ||
| pub const HTTP_RECONNECT_VALIDATION_TIMEOUT: Duration = Duration::from_millis(150); |
| #[cfg(feature = "http-subscription")] | ||
| { | ||
| let primary_supports_pubsub = self.primary().client().pubsub_frontend().is_some(); | ||
| if primary_supports_pubsub { | ||
| return self.subscribe_blocks_ws().await; | ||
| } | ||
| return self.subscribe_blocks_http().await; | ||
| } | ||
|
|
||
| #[cfg(not(feature = "http-subscription"))] | ||
| self.subscribe_blocks_ws().await | ||
| } |
There was a problem hiding this comment.
| #[cfg(feature = "http-subscription")] | |
| { | |
| let primary_supports_pubsub = self.primary().client().pubsub_frontend().is_some(); | |
| if primary_supports_pubsub { | |
| return self.subscribe_blocks_ws().await; | |
| } | |
| return self.subscribe_blocks_http().await; | |
| } | |
| #[cfg(not(feature = "http-subscription"))] | |
| self.subscribe_blocks_ws().await | |
| } | |
| #[cfg(feature = "http-subscription")] | |
| { | |
| let primary_not_pubsub = self.primary().client().pubsub_frontend().is_none(); | |
| if primary_not_pubsub { | |
| return self.subscribe_blocks_http().await; | |
| } | |
| } | |
| self.subscribe_blocks_ws().await | |
| } |
simplified
| // Primary HTTP subscription failed, try WebSocket on fallback providers | ||
| for (fallback_idx, provider) in self.fallback_providers().iter().enumerate() { | ||
| // Try WebSocket subscription if supported | ||
| if provider.client().pubsub_frontend().is_some() { |
There was a problem hiding this comment.
Does this mean that if the primary provider fails, no fallback HTTP providers are tried? If so, isn't this a mistake?
| /// | ||
| /// * **Latency**: New blocks are detected with up to `poll_interval` delay | ||
| /// * **RPC Load**: One filter poll per interval, plus one `get_block_by_hash` per new block | ||
| pub struct HttpPollingSubscription<N: Network> { |
There was a problem hiding this comment.
Managed to refactor the implementation to avoid the need for the dedicated http_subscription module and opened a PR against your branch, please review and merge
Summary
Implements HTTP subscription support via polling, allowing HTTP providers to participate in block subscriptions alongside WebSocket providers. This addresses issue #23.
Features
watch_blocks()API witheth_newBlockFilter+eth_getFilterChangesfor efficient block pollingRobustSubscriptionAPI - callers use the samesubscribe_blocks()andrecv()interfaceUsage
Enable the
http-subscriptionfeature flag:Build with HTTP subscription support:
Breaking Changes
None. Feature is behind
http-subscriptionflag and disabled by default.Thanks to @PoulavBhowmick03 for the massive improvements to this implementation.
Closes #23