Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion multinode/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,32 @@ Manages all nodes performing node selection and load balancing, health checks an
Used to poll for new heads and finalized heads within subscriptions.

### Transaction Sender
Used to send transactions to all healthy RPCs and aggregate the results.
Used to send transactions to all healthy RPCs and aggregate the results.

## State diagram

```mermaid
graph TD
Undialed --> Dialed
Undialed --> Unreachable
Dialed --> Alive
Dialed --> InvalidChainID
Dialed --> Syncing
Dialed --> Unreachable
Alive --> OutOfSync
Alive --> Unreachable
OutOfSync --> Alive
OutOfSync --> InvalidChainID
OutOfSync --> Syncing
OutOfSync --> Unreachable
InvalidChainID --> Alive
InvalidChainID --> Syncing
InvalidChainID --> Unreachable
Syncing --> Alive
Syncing --> OutOfSync
Syncing --> InvalidChainID
Syncing --> Unreachable
Unreachable --> Dialed
Unusable:::terminal
Closed:::terminal
```
8 changes: 8 additions & 0 deletions multinode/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type MultiNode struct {

// Node Configs
PollFailureThreshold *uint32
PollSuccessThreshold *uint32
PollInterval *config.Duration
SelectionMode *string
SyncThreshold *uint32
Expand Down Expand Up @@ -44,6 +45,10 @@ func (c *MultiNodeConfig) PollFailureThreshold() uint32 {
return *c.MultiNode.PollFailureThreshold
}

// PollSuccessThreshold returns the number of consecutive successful recovery
// probes required before a redialled node is admitted back as alive.
// A nil (unset) field is treated as 0, which disables recovery probing —
// the same convention probeUntilStable uses — so configs written before this
// field existed do not trigger a nil-pointer dereference here.
func (c *MultiNodeConfig) PollSuccessThreshold() uint32 {
	if c.MultiNode.PollSuccessThreshold == nil {
		return 0
	}
	return *c.MultiNode.PollSuccessThreshold
}

// PollInterval returns the configured delay between node liveness polls.
func (c *MultiNodeConfig) PollInterval() time.Duration {
	interval := c.MultiNode.PollInterval
	return interval.Duration()
}
Expand Down Expand Up @@ -103,6 +108,9 @@ func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) {
if f.MultiNode.PollFailureThreshold != nil {
c.MultiNode.PollFailureThreshold = f.MultiNode.PollFailureThreshold
}
if f.MultiNode.PollSuccessThreshold != nil {
c.MultiNode.PollSuccessThreshold = f.MultiNode.PollSuccessThreshold
}
if f.MultiNode.PollInterval != nil {
c.MultiNode.PollInterval = f.MultiNode.PollInterval
}
Expand Down
1 change: 1 addition & 0 deletions multinode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ var errInvalidChainID = errors.New("invalid chain id")

type NodeConfig interface {
PollFailureThreshold() uint32
PollSuccessThreshold() uint32
PollInterval() time.Duration
SelectionMode() string
SyncThreshold() uint32
Expand Down
51 changes: 49 additions & 2 deletions multinode/node_lifecycle.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Debugw("Ping successful", "nodeState", n.State())
n.metrics.RecordNodeClientVersion(ctx, n.name, version)
n.metrics.IncrementPollsSuccess(ctx, n.name)
pollFailures = 0
// Decay rather than reset; detects sustained failure rates above 1:1
if pollFailures > 0 {
pollFailures--
}
}
if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold {
lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState())
Expand Down Expand Up @@ -356,7 +359,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveN
}

if outOfSync && n.getCachedState() == nodeStateAlive {
n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty)
n.lfcLog.Errorw(
"RPC endpoint has fallen behind",
"blockNumber", localChainInfo.BlockNumber,
"bestLatestBlockNumber", ci.BlockNumber,
"totalDifficulty", localChainInfo.TotalDifficulty,
"blockDifference", localChainInfo.BlockNumber-ci.BlockNumber,
)
}
return outOfSync, ln
}
Expand Down Expand Up @@ -518,6 +527,39 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
}
}

// probeUntilStable requires PollSuccessThreshold consecutive successful polls
// before the node may rejoin the alive pool. It returns true once every probe
// has passed, and false as soon as a single probe fails or ctx is cancelled.
// A threshold of 0 disables probing entirely and returns true right away.
func (n *node[CHAIN_ID, HEAD, RPC]) probeUntilStable(ctx context.Context, lggr logger.Logger) bool {
	threshold := n.nodePoolCfg.PollSuccessThreshold()
	if threshold == 0 {
		// Probing disabled: admit the node immediately.
		return true
	}
	interval := n.nodePoolCfg.PollInterval()
	for passed := uint32(0); passed < threshold; {
		// Wait one poll interval between probes; bail out on cancellation.
		select {
		case <-ctx.Done():
			return false
		case <-time.After(interval):
		}
		n.metrics.IncrementPolls(ctx, n.name)
		// Each probe gets at most one poll interval to respond.
		probeCtx, cancel := context.WithTimeout(ctx, interval)
		version, err := n.RPC().ClientVersion(probeCtx)
		cancel()
		if err != nil {
			n.metrics.IncrementPollsFailed(ctx, n.name)
			lggr.Warnw("Recovery probe poll failed; restarting redial", "err", err, "successesSoFar", passed, "threshold", threshold)
			return false
		}
		n.metrics.IncrementPollsSuccess(ctx, n.name)
		n.metrics.RecordNodeClientVersion(ctx, n.name, version)
		passed++
		lggr.Debugw("Recovery probe poll succeeded", "successes", passed, "threshold", threshold)
	}
	return true
}

func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
defer n.wg.Done()
ctx, cancel := n.newCtx()
Expand Down Expand Up @@ -563,6 +605,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
n.setState(nodeStateUnreachable)
continue
case nodeStateAlive:
if !n.probeUntilStable(ctx, lggr) {
n.rpc.Close()
n.setState(nodeStateUnreachable)
continue
}
lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Node was offline for %s", n.String(), time.Since(unreachableAt)), "nodeState", n.getCachedState())
fallthrough
default:
Expand Down
119 changes: 116 additions & 3 deletions multinode/node_lifecycle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
tests.AssertLogEventually(t, observedLogs, "Polling disabled")
assert.Equal(t, nodeStateAlive, node.State())
})
t.Run("stays alive while below pollFailureThreshold and resets counter on success", func(t *testing.T) {
t.Run("stays alive while below pollFailureThreshold, success decrements failure count", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
Expand All @@ -132,9 +132,9 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
// stays healthy while below threshold
assert.Equal(t, nodeStateAlive, node.State())
}).Times(pollFailureThreshold - 1)
// 2. Successful call that is expected to reset counter
// 2. Successful call that is expected to decrement the counter (counter: 2 → 1)
rpc.On("ClientVersion", mock.Anything).Return("", nil).Once()
// 3. Return error. If we have not reset the timer, we'll transition to nonAliveState
// 3. Return error. Counter was decremented (not reset), so it reaches 2 — still below threshold.
rpc.On("ClientVersion", mock.Anything).Return("", pollError).Once()
// 4. Once during the call, check if node is alive
var ensuredAlive atomic.Bool
Expand Down Expand Up @@ -176,6 +176,37 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) {
return nodeStateUnreachable == node.State()
})
})
t.Run("transitions to unreachable when net poll failures accumulate despite intermittent successes", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{})
const pollFailureThreshold = 3
node := newSubscribedNode(t, testNodeOpts{
config: testNodeConfig{
pollFailureThreshold: pollFailureThreshold,
pollInterval: tests.TestInterval,
},
rpc: rpc,
})
defer func() { assert.NoError(t, node.close()) }()

pollError := errors.New("failed to get ClientVersion")
// Pattern F·F·S·F·F: with the decay counter the net failure debt reaches
// threshold=3 at the 5th poll (counter: 1→2→1→2→3). With the old
// reset-on-success behaviour the counter resets to 0 at S and peaks at only
// 2 before the next success, never tripping.
rpc.On("ClientVersion", mock.Anything).Return("", pollError).Times(2)
rpc.On("ClientVersion", mock.Anything).Return("", nil).Once()
rpc.On("ClientVersion", mock.Anything).Return("", pollError).Times(2)
// Unlimited successes after: ensures old code stays alive indefinitely so
// the test correctly fails (times out) when run against the old behaviour.
rpc.On("ClientVersion", mock.Anything).Return("", nil)
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe()
node.declareAlive()
tests.AssertEventually(t, func() bool {
return node.State() == nodeStateUnreachable
})
})
t.Run("with threshold poll failures, but we are the last node alive, forcibly keeps it alive", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
Expand Down Expand Up @@ -1464,6 +1495,88 @@ func TestUnit_NodeLifecycle_unreachableLoop(t *testing.T) {
return node.State() == nodeStateAlive
})
})
t.Run("with PollSuccessThreshold set, without isSyncing, node becomes alive once all probe polls succeed", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
nodeChainID := RandomID()
const pollSuccessThreshold = 2
node := newAliveNode(t, testNodeOpts{
rpc: rpc,
chainID: nodeChainID,
config: testNodeConfig{
pollSuccessThreshold: pollSuccessThreshold,
pollInterval: tests.TestInterval,
},
})
defer func() { assert.NoError(t, node.close()) }()

rpc.On("Dial", mock.Anything).Return(nil).Once()
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
rpc.On("ClientVersion", mock.Anything).Return("", nil).Twice()
setupRPCForAliveLoop(t, rpc)

node.declareUnreachable()
tests.AssertEventually(t, func() bool {
return node.State() == nodeStateAlive
})
})
t.Run("with PollSuccessThreshold set, node becomes alive once all probe polls succeed", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
nodeChainID := RandomID()
const pollSuccessThreshold = 2
node := newAliveNode(t, testNodeOpts{
rpc: rpc,
chainID: nodeChainID,
config: testNodeConfig{
nodeIsSyncingEnabled: true,
pollSuccessThreshold: pollSuccessThreshold,
pollInterval: tests.TestInterval,
},
})
defer func() { assert.NoError(t, node.close()) }()

rpc.On("Dial", mock.Anything).Return(nil).Once()
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
rpc.On("IsSyncing", mock.Anything).Return(false, nil)
rpc.On("ClientVersion", mock.Anything).Return("", nil).Twice()
setupRPCForAliveLoop(t, rpc)

node.declareUnreachable()
tests.AssertEventually(t, func() bool {
return node.State() == nodeStateAlive
})
})
t.Run("with PollSuccessThreshold set, probe poll failure keeps node unreachable and restarts redial", func(t *testing.T) {
t.Parallel()
rpc := newMockRPCClient[ID, Head](t)
nodeChainID := RandomID()
lggr, observedLogs := logger.TestObserved(t, zap.WarnLevel)
const pollSuccessThreshold = 2
node := newAliveNode(t, testNodeOpts{
rpc: rpc,
chainID: nodeChainID,
lggr: lggr,
config: testNodeConfig{
pollSuccessThreshold: pollSuccessThreshold,
pollInterval: tests.TestInterval,
},
})
defer func() { assert.NoError(t, node.close()) }()

rpc.On("Dial", mock.Anything).Return(nil).Once()
rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Once()
rpc.On("ClientVersion", mock.Anything).Return("", nil).Once()
rpc.On("ClientVersion", mock.Anything).Return("", errors.New("probe poll failed")).Once()
// after the probe aborts, rpc.Close() is called and the redial backoff fires again; keep failing
rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial"))
// guard: if current code (no probe) enters aliveLoop, fail the subscribe so the node returns to unreachable
rpc.On("SubscribeToHeads", mock.Anything).Return(nil, nil, errors.New("unexpected")).Maybe()

node.declareUnreachable()
tests.AssertLogEventually(t, observedLogs, "Recovery probe poll failed; restarting redial")
assert.Equal(t, nodeStateUnreachable, node.State())
})
}

func TestUnit_NodeLifecycle_invalidChainIDLoop(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions multinode/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

type testNodeConfig struct {
pollFailureThreshold uint32
pollSuccessThreshold uint32
pollInterval time.Duration
selectionMode string
syncThreshold uint32
Expand All @@ -34,6 +35,10 @@ func (n testNodeConfig) PollFailureThreshold() uint32 {
return n.pollFailureThreshold
}

// PollSuccessThreshold returns the test-configured number of consecutive
// successful recovery probes required before a node is marked alive again.
func (n testNodeConfig) PollSuccessThreshold() uint32 {
	threshold := n.pollSuccessThreshold
	return threshold
}

// PollInterval returns the test-configured delay between liveness polls.
func (n testNodeConfig) PollInterval() time.Duration {
	interval := n.pollInterval
	return interval
}
Expand Down
Loading