From 881703369397de103436c1ee429708fa8b426f24 Mon Sep 17 00:00:00 2001 From: Vasco Figueira Date: Mon, 13 Apr 2026 15:09:06 +0100 Subject: [PATCH 1/2] DF-23489 multinode: make PollFailure hysteretic Successful polls now decrement the failure count, don't fully reset it. So that nodes (RPCs) are eventually declared unreachable if they sustain poll error rates above 1:1. --- multinode/node_lifecycle.go | 13 +++++++++-- multinode/node_lifecycle_test.go | 37 +++++++++++++++++++++++++++++--- 2 files changed, 45 insertions(+), 5 deletions(-) diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index 8fd4e9d..ab0f478 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -123,7 +123,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { lggr.Debugw("Ping successful", "nodeState", n.State()) n.metrics.RecordNodeClientVersion(ctx, n.name, version) n.metrics.IncrementPollsSuccess(ctx, n.name) - pollFailures = 0 + // Decay rather than reset; detects sustained failure rates above 1:1 + if pollFailures > 0 { + pollFailures-- + } } if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold { lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState()) @@ -356,7 +359,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveN } if outOfSync && n.getCachedState() == nodeStateAlive { - n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty) + n.lfcLog.Errorw( + "RPC endpoint has fallen behind", + "blockNumber", localChainInfo.BlockNumber, + "bestLatestBlockNumber", ci.BlockNumber, + "totalDifficulty", localChainInfo.TotalDifficulty, + "blockDifference", localChainInfo.BlockNumber-ci.BlockNumber, + ) } return outOfSync, ln } diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index f1eb250..1b26968 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -110,7 +110,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { tests.AssertLogEventually(t, observedLogs, "Polling disabled") assert.Equal(t, nodeStateAlive, node.State()) }) - t.Run("stays alive while below pollFailureThreshold and resets counter on success", func(t *testing.T) { + t.Run("stays alive while below pollFailureThreshold, success decrements failure count", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) @@ -132,9 +132,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { // stays healthy while below threshold assert.Equal(t, nodeStateAlive, node.State()) }).Times(pollFailureThreshold - 1) - // 2. Successful call that is expected to reset counter + // 2. Successful call that is expected to decrement the counter (counter: 2 → 1) rpc.On("ClientVersion", mock.Anything).Return("", nil).Once() - // 3. Return error. If we have not reset the timer, we'll transition to nonAliveState + // 3. Return error. Counter was decremented (not reset), so it reaches 2 — still below threshold. rpc.On("ClientVersion", mock.Anything).Return("", pollError).Once() // 4. Once during the call, check if node is alive var ensuredAlive atomic.Bool @@ -176,6 +176,37 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { return nodeStateUnreachable == node.State() }) }) + t.Run("transitions to unreachable when net poll failures accumulate despite intermittent successes", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}) + const pollFailureThreshold = 3 + node := newSubscribedNode(t, testNodeOpts{ + config: testNodeConfig{ + pollFailureThreshold: pollFailureThreshold, + pollInterval: tests.TestInterval, + }, + rpc: rpc, + }) + defer func() { assert.NoError(t, node.close()) }() + + pollError := errors.New("failed to get ClientVersion") + // Pattern F·F·S·F·F: with the decay counter the net failure debt reaches + // threshold=3 at the 5th poll (counter: 1→2→1→2→3). With the old + // reset-on-success behaviour the counter resets to 0 at S and peaks at only + // 2 before the next success, never tripping. + rpc.On("ClientVersion", mock.Anything).Return("", pollError).Times(2) + rpc.On("ClientVersion", mock.Anything).Return("", nil).Once() + rpc.On("ClientVersion", mock.Anything).Return("", pollError).Times(2) + // Unlimited successes after: ensures old code stays alive indefinitely so + // the test correctly fails (times out) when run against the old behaviour. + rpc.On("ClientVersion", mock.Anything).Return("", nil) + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() + node.declareAlive() + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateUnreachable + }) + }) t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) { t.Parallel() rpc := newMockRPCClient[ID, Head](t) From e2429621852ccfad12c0b3fdfcdccdb8ae377b4a Mon Sep 17 00:00:00 2001 From: Vasco Figueira Date: Mon, 13 Apr 2026 16:59:40 +0100 Subject: [PATCH 2/2] DF-23489 unreachable node recovery depends on successful polls --- multinode/README.md | 30 +++++++++++- multinode/config/config.go | 8 ++++ multinode/node.go | 1 + multinode/node_lifecycle.go | 38 +++++++++++++++ multinode/node_lifecycle_test.go | 82 ++++++++++++++++++++++++++++++++ multinode/node_test.go | 5 ++ 6 files changed, 163 insertions(+), 1 deletion(-) diff --git a/multinode/README.md b/multinode/README.md index 12aef2d..306b499 100644 --- a/multinode/README.md +++ b/multinode/README.md @@ -19,4 +19,32 @@ Manages all nodes performing node selection and load balancing, health checks an Used to poll for new heads and finalized heads within subscriptions. ### Transaction Sender -Used to send transactions to all healthy RPCs and aggregate the results. \ No newline at end of file +Used to send transactions to all healthy RPCs and aggregate the results. + +## States diagram + +```mermaid +graph TD + Undialed --> Dialed + Undialed --> Unreachable + Dialed --> Alive + Dialed --> InvalidChainID + Dialed --> Syncing + Dialed --> Unreachable + Alive --> OutOfSync + Alive --> Unreachable + OutOfSync --> Alive + OutOfSync --> InvalidChainID + OutOfSync --> Syncing + OutOfSync --> Unreachable + InvalidChainID --> Alive + InvalidChainID --> Syncing + InvalidChainID --> Unreachable + Syncing --> Alive + Syncing --> OutOfSync + Syncing --> InvalidChainID + Syncing --> Unreachable + Unreachable --> Dialed + Unusable:::terminal + Closed:::terminal +``` diff --git a/multinode/config/config.go b/multinode/config/config.go index 4d7d8ca..b9c2049 100644 --- a/multinode/config/config.go +++ b/multinode/config/config.go @@ -17,6 +17,7 @@ type MultiNode struct { // Node Configs PollFailureThreshold *uint32 + PollSuccessThreshold *uint32 PollInterval *config.Duration SelectionMode *string SyncThreshold *uint32 @@ -44,6 +45,10 @@ func (c *MultiNodeConfig) PollFailureThreshold() uint32 { return *c.MultiNode.PollFailureThreshold } +func (c *MultiNodeConfig) PollSuccessThreshold() uint32 { + return *c.MultiNode.PollSuccessThreshold +} + func (c *MultiNodeConfig) PollInterval() time.Duration { return c.MultiNode.PollInterval.Duration() } @@ -103,6 +108,9 @@ func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.PollFailureThreshold != nil { c.MultiNode.PollFailureThreshold = f.MultiNode.PollFailureThreshold } + if f.MultiNode.PollSuccessThreshold != nil { + c.MultiNode.PollSuccessThreshold = f.MultiNode.PollSuccessThreshold + } if f.MultiNode.PollInterval != nil { c.MultiNode.PollInterval = f.MultiNode.PollInterval } diff --git a/multinode/node.go b/multinode/node.go index 6729459..5c83545 100644 --- a/multinode/node.go +++ b/multinode/node.go @@ -18,6 +18,7 @@ var errInvalidChainID = errors.New("invalid chain id") type NodeConfig interface { PollFailureThreshold() uint32 + PollSuccessThreshold() uint32 PollInterval() time.Duration SelectionMode() string SyncThreshold() uint32 diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index ab0f478..f554697 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -527,6 +527,39 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) { } } +// probeUntilStable polls the node PollSuccessThreshold consecutive times before allowing it back into +// the alive pool. Returns true if all probes pass, false if any probe fails or ctx is cancelled. +// When threshold is 0 the probe is disabled and the function returns true immediately. +func (n *node[CHAIN_ID, HEAD, RPC]) probeUntilStable(ctx context.Context, lggr logger.Logger) bool { + threshold := n.nodePoolCfg.PollSuccessThreshold() + if threshold == 0 { + return true + } + pollInterval := n.nodePoolCfg.PollInterval() + var successes uint32 + for successes < threshold { + select { + case <-ctx.Done(): + return false + case <-time.After(pollInterval): + } + n.metrics.IncrementPolls(ctx, n.name) + pollCtx, cancel := context.WithTimeout(ctx, pollInterval) + version, err := n.RPC().ClientVersion(pollCtx) + cancel() + if err != nil { + n.metrics.IncrementPollsFailed(ctx, n.name) + lggr.Warnw("Recovery probe poll failed; restarting redial", "err", err, "successesSoFar", successes, "threshold", threshold) + return false + } + n.metrics.IncrementPollsSuccess(ctx, n.name) + n.metrics.RecordNodeClientVersion(ctx, n.name, version) + successes++ + lggr.Debugw("Recovery probe poll succeeded", "successes", successes, "threshold", threshold) + } + return true +} + func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() { defer n.wg.Done() ctx, cancel := n.newCtx() @@ -572,6 +605,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() { n.setState(nodeStateUnreachable) continue case nodeStateAlive: + if !n.probeUntilStable(ctx, lggr) { + n.rpc.Close() + n.setState(nodeStateUnreachable) + continue + } lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Node was offline for %s", n.String(), time.Since(unreachableAt)), "nodeState", n.getCachedState()) fallthrough default: diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 1b26968..ff8c0d0 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -1495,6 +1495,88 @@ func TestUnit_NodeLifecycle_unreachableLoop(t *testing.T) { return node.State() == nodeStateAlive }) }) + t.Run("with PollSuccessThreshold set, without isSyncing, node becomes alive once all probe polls succeed", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + const pollSuccessThreshold = 2 + node := newAliveNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + config: testNodeConfig{ + pollSuccessThreshold: pollSuccessThreshold, + pollInterval: tests.TestInterval, + }, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(nil).Once() + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once() + rpc.On("ClientVersion", mock.Anything).Return("", nil).Twice() + setupRPCForAliveLoop(t, rpc) + + node.declareUnreachable() + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateAlive + }) + }) + t.Run("with PollSuccessThreshold set, node becomes alive once all probe polls succeed", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + const pollSuccessThreshold = 2 + node := newAliveNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + config: testNodeConfig{ + nodeIsSyncingEnabled: true, + pollSuccessThreshold: pollSuccessThreshold, + pollInterval: tests.TestInterval, + }, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(nil).Once() + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once() + rpc.On("IsSyncing", mock.Anything).Return(false, nil) + rpc.On("ClientVersion", mock.Anything).Return("", nil).Twice() + setupRPCForAliveLoop(t, rpc) + + node.declareUnreachable() + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateAlive + }) + }) + t.Run("with PollSuccessThreshold set, probe poll failure keeps node unreachable and restarts redial", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel) + const pollSuccessThreshold = 2 + node := newAliveNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + config: testNodeConfig{ + pollSuccessThreshold: pollSuccessThreshold, + pollInterval: tests.TestInterval, + }, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(nil).Once() + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once() + rpc.On("ClientVersion", mock.Anything).Return("", nil).Once() + rpc.On("ClientVersion", mock.Anything).Return("", errors.New("probe poll failed")).Once() + // after the probe aborts, rpc.Close() is called and the redial backoff fires again; keep failing + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")) + // guard: if current code (no probe) enters aliveLoop, fail the subscribe so the node returns to unreachable + rpc.On("SubscribeToHeads", mock.Anything).Return(nil, nil, errors.New("unexpected")).Maybe() + + node.declareUnreachable() + tests.AssertLogEventually(t, observedLogs, "Recovery probe poll failed; restarting redial") + assert.Equal(t, nodeStateUnreachable, node.State()) + }) } func TestUnit_NodeLifecycle_invalidChainIDLoop(t *testing.T) { diff --git a/multinode/node_test.go b/multinode/node_test.go index e3c8d71..ce97b4c 100644 --- a/multinode/node_test.go +++ b/multinode/node_test.go @@ -16,6 +16,7 @@ import ( type testNodeConfig struct { pollFailureThreshold uint32 + pollSuccessThreshold uint32 pollInterval time.Duration selectionMode string syncThreshold uint32 @@ -34,6 +35,10 @@ func (n testNodeConfig) PollFailureThreshold() uint32 { return n.pollFailureThreshold } +func (n testNodeConfig) PollSuccessThreshold() uint32 { + return n.pollSuccessThreshold +} + func (n testNodeConfig) PollInterval() time.Duration { return n.pollInterval }