diff --git a/multinode/config/config.go b/multinode/config/config.go index b218b732..4d7d8cae 100644 --- a/multinode/config/config.go +++ b/multinode/config/config.go @@ -34,9 +34,6 @@ type MultiNode struct { FinalityDepth *uint32 FinalityTagEnabled *bool FinalizedBlockOffset *uint32 - - // Finalized State Availability Check - FinalizedStateCheckFailureThreshold *uint32 } func (c *MultiNodeConfig) Enabled() bool { @@ -97,10 +94,6 @@ func (c *MultiNodeConfig) FinalityTagEnabled() bool { return *c.MultiNode.Finali func (c *MultiNodeConfig) FinalizedBlockOffset() uint32 { return *c.MultiNode.FinalizedBlockOffset } -func (c *MultiNodeConfig) FinalizedStateCheckFailureThreshold() uint32 { - return *c.MultiNode.FinalizedStateCheckFailureThreshold -} - func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.Enabled != nil { c.MultiNode.Enabled = f.MultiNode.Enabled @@ -157,9 +150,4 @@ func (c *MultiNodeConfig) SetFrom(f *MultiNodeConfig) { if f.MultiNode.FinalizedBlockOffset != nil { c.MultiNode.FinalizedBlockOffset = f.MultiNode.FinalizedBlockOffset } - - // Finalized State Availability Check - if f.MultiNode.FinalizedStateCheckFailureThreshold != nil { - c.MultiNode.FinalizedStateCheckFailureThreshold = f.MultiNode.FinalizedStateCheckFailureThreshold - } } diff --git a/multinode/mock_node_metrics_test.go b/multinode/mock_node_metrics_test.go index 8f5829ad..261d7cf9 100644 --- a/multinode/mock_node_metrics_test.go +++ b/multinode/mock_node_metrics_test.go @@ -21,40 +21,6 @@ func (_m *mockNodeMetrics) EXPECT() *mockNodeMetrics_Expecter { return &mockNodeMetrics_Expecter{mock: &_m.Mock} } -// IncrementFinalizedStateFailed provides a mock function with given fields: ctx, nodeName -func (_m *mockNodeMetrics) IncrementFinalizedStateFailed(ctx context.Context, nodeName string) { - _m.Called(ctx, nodeName) -} - -// mockNodeMetrics_IncrementFinalizedStateFailed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementFinalizedStateFailed' -type mockNodeMetrics_IncrementFinalizedStateFailed_Call struct { - *mock.Call -} - -// IncrementFinalizedStateFailed is a helper method to define mock.On call -// - ctx context.Context -// - nodeName string -func (_e *mockNodeMetrics_Expecter) IncrementFinalizedStateFailed(ctx interface{}, nodeName interface{}) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { - return &mockNodeMetrics_IncrementFinalizedStateFailed_Call{Call: _e.mock.On("IncrementFinalizedStateFailed", ctx, nodeName)} -} - -func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) Run(run func(ctx context.Context, nodeName string)) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) Return() *mockNodeMetrics_IncrementFinalizedStateFailed_Call { - _c.Call.Return() - return _c -} - -func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) RunAndReturn(run func(context.Context, string)) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { - _c.Run(run) - return _c -} - // IncrementNodeTransitionsToAlive provides a mock function with given fields: ctx, nodeName func (_m *mockNodeMetrics) IncrementNodeTransitionsToAlive(ctx context.Context, nodeName string) { _m.Called(ctx, nodeName) @@ -89,40 +55,6 @@ func (_c *mockNodeMetrics_IncrementNodeTransitionsToAlive_Call) RunAndReturn(run return _c } -// IncrementNodeTransitionsToFinalizedStateNotAvailable provides a mock function with given fields: ctx, nodeName -func (_m *mockNodeMetrics) IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string) { - _m.Called(ctx, nodeName) -} - -// mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementNodeTransitionsToFinalizedStateNotAvailable' -type mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call struct { - *mock.Call -} - -// IncrementNodeTransitionsToFinalizedStateNotAvailable is a helper method to define mock.On call -// - ctx context.Context -// - nodeName string -func (_e *mockNodeMetrics_Expecter) IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx interface{}, nodeName interface{}) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { - return &mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call{Call: _e.mock.On("IncrementNodeTransitionsToFinalizedStateNotAvailable", ctx, nodeName)} -} - -func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) Run(run func(ctx context.Context, nodeName string)) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context), args[1].(string)) - }) - return _c -} - -func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) Return() *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { - _c.Call.Return() - return _c -} - -func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) RunAndReturn(run func(context.Context, string)) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { - _c.Run(run) - return _c -} - // IncrementNodeTransitionsToInSync provides a mock function with given fields: ctx, nodeName func (_m *mockNodeMetrics) IncrementNodeTransitionsToInSync(ctx context.Context, nodeName string) { _m.Called(ctx, nodeName) diff --git a/multinode/mock_rpc_client_test.go b/multinode/mock_rpc_client_test.go index d0b02eec..a90063e0 100644 --- a/multinode/mock_rpc_client_test.go +++ b/multinode/mock_rpc_client_test.go @@ -79,52 +79,6 @@ func (_c *mockRPCClient_ChainID_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(cont return _c } -// CheckFinalizedStateAvailability provides a mock function with given fields: ctx -func (_m *mockRPCClient[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error { - ret := _m.Called(ctx) - - if len(ret) == 0 { - panic("no return value specified for CheckFinalizedStateAvailability") - } - - var r0 error - if rf, ok := ret.Get(0).(func(context.Context) error); ok { - r0 = rf(ctx) - } else { - r0 = ret.Error(0) - } - - return r0 -} - -// mockRPCClient_CheckFinalizedStateAvailability_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckFinalizedStateAvailability' -type mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID ID, HEAD Head] struct { - *mock.Call -} - -// CheckFinalizedStateAvailability is a helper method to define mock.On call -// - ctx context.Context -func (_e *mockRPCClient_Expecter[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx interface{}) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { - return &mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]{Call: _e.mock.On("CheckFinalizedStateAvailability", ctx)} -} - -func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) Run(run func(ctx context.Context)) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(context.Context)) - }) - return _c -} - -func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) Return(_a0 error) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { - _c.Call.Return(_a0) - return _c -} - -func (_c *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD]) RunAndReturn(run func(context.Context) error) *mockRPCClient_CheckFinalizedStateAvailability_Call[CHAIN_ID, HEAD] { - _c.Call.Return(run) - return _c -} - // ClientVersion provides a mock function with given fields: _a0 func (_m *mockRPCClient[CHAIN_ID, HEAD]) ClientVersion(_a0 context.Context) (string, error) { ret := _m.Called(_a0) diff --git a/multinode/node.go b/multinode/node.go index 9611829d..67294595 100644 --- a/multinode/node.go +++ b/multinode/node.go @@ -27,7 +27,6 @@ type NodeConfig interface { DeathDeclarationDelay() time.Duration NewHeadsPollInterval() time.Duration VerifyChainID() bool - FinalizedStateCheckFailureThreshold() uint32 } type ChainConfig interface { @@ -49,7 +48,6 @@ type nodeMetrics interface { IncrementNodeTransitionsToInvalidChainID(ctx context.Context, nodeName string) IncrementNodeTransitionsToUnusable(ctx context.Context, nodeName string) IncrementNodeTransitionsToSyncing(ctx context.Context, nodeName string) - IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string) RecordNodeClientVersion(ctx context.Context, nodeName string, version string) SetHighestSeenBlock(ctx context.Context, nodeName string, blockNumber int64) SetHighestFinalizedBlock(ctx context.Context, nodeName string, blockNumber int64) @@ -57,7 +55,6 @@ type nodeMetrics interface { IncrementPolls(ctx context.Context, nodeName string) IncrementPollsFailed(ctx context.Context, nodeName string) IncrementPollsSuccess(ctx context.Context, nodeName string) - IncrementFinalizedStateFailed(ctx context.Context, nodeName string) } type Node[ @@ -276,7 +273,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lgg // The node is already closed, and any subsequent transition is invalid. // To make spotting such transitions a bit easier, return the invalid node state. return nodeStateLen - case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: + case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing: default: panic(fmt.Sprintf("cannot verify node in state %v", st)) } diff --git a/multinode/node_fsm.go b/multinode/node_fsm.go index dcb5c619..818363e7 100644 --- a/multinode/node_fsm.go +++ b/multinode/node_fsm.go @@ -35,8 +35,6 @@ func (n nodeState) String() string { return "Syncing" case nodeStateFinalizedBlockOutOfSync: return "FinalizedBlockOutOfSync" - case nodeStateFinalizedStateNotAvailable: - return "FinalizedStateNotAvailable" default: return fmt.Sprintf("nodeState(%d)", n) } @@ -74,8 +72,6 @@ const ( nodeStateSyncing // nodeStateFinalizedBlockOutOfSync - node is lagging behind on latest finalized block nodeStateFinalizedBlockOutOfSync - // nodeStateFinalizedStateNotAvailable - node cannot serve historical state at finalized block - nodeStateFinalizedStateNotAvailable // nodeStateLen tracks the number of states nodeStateLen ) @@ -186,7 +182,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToAlive(fn func()) { return } switch n.state { - case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: + case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing: n.state = nodeStateAlive default: panic(transitionFail(n.state, nodeStateAlive)) @@ -270,7 +266,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) { return } switch n.state { - case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: + case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing: n.rpc.Close() n.state = nodeStateUnreachable default: @@ -292,8 +288,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) declareState(state nodeState) { n.declareSyncing() case nodeStateAlive: n.declareAlive() - case nodeStateFinalizedStateNotAvailable: - n.declareFinalizedStateNotAvailable() default: panic(fmt.Sprintf("%#v state declaration is not implemented", state)) } @@ -317,7 +311,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) { return } switch n.state { - case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: + case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing: n.rpc.Close() n.state = nodeStateInvalidChainID default: @@ -344,7 +338,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) { return } switch n.state { - case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateFinalizedStateNotAvailable: + case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID: n.rpc.Close() n.state = nodeStateSyncing default: @@ -357,33 +351,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) { fn() } -func (n *node[CHAIN_ID, HEAD, RPC]) declareFinalizedStateNotAvailable() { - n.transitionToFinalizedStateNotAvailable(func() { - n.lfcLog.Errorw("RPC Node cannot serve finalized state", "nodeState", n.state) - n.wg.Add(1) - go n.finalizedStateNotAvailableLoop() - }) -} - -func (n *node[CHAIN_ID, HEAD, RPC]) transitionToFinalizedStateNotAvailable(fn func()) { - ctx, cancel := n.stopCh.NewCtx() - defer cancel() - n.metrics.IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx, n.name) - n.stateMu.Lock() - defer n.stateMu.Unlock() - if n.state == nodeStateClosed { - return - } - switch n.state { - case nodeStateAlive: - n.rpc.Close() - n.state = nodeStateFinalizedStateNotAvailable - default: - panic(transitionFail(n.state, nodeStateFinalizedStateNotAvailable)) - } - fn() -} - func transitionFail(from nodeState, to nodeState) string { return fmt.Sprintf("cannot transition from %#v to %#v", from, to) } diff --git a/multinode/node_fsm_test.go b/multinode/node_fsm_test.go index d513f306..17d312c3 100644 --- a/multinode/node_fsm_test.go +++ b/multinode/node_fsm_test.go @@ -36,7 +36,7 @@ func TestUnit_Node_StateTransitions(t *testing.T) { t.Run("transitionToAlive", func(t *testing.T) { const destinationState = nodeStateAlive - allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable} + allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing} rpc := newMockRPCClient[ID, Head](t) testTransition(t, rpc, testNode.transitionToAlive, destinationState, allowedStates...) }) @@ -56,21 +56,21 @@ func TestUnit_Node_StateTransitions(t *testing.T) { }) t.Run("transitionToUnreachable", func(t *testing.T) { const destinationState = nodeStateUnreachable - allowedStates := []nodeState{nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable} + allowedStates := []nodeState{nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing} rpc := newMockRPCClient[ID, Head](t) rpc.On("Close") testTransition(t, rpc, testNode.transitionToUnreachable, destinationState, allowedStates...) }) t.Run("transitionToInvalidChain", func(t *testing.T) { const destinationState = nodeStateInvalidChainID - allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing, nodeStateFinalizedStateNotAvailable} + allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing} rpc := newMockRPCClient[ID, Head](t) rpc.On("Close") testTransition(t, rpc, testNode.transitionToInvalidChainID, destinationState, allowedStates...) }) t.Run("transitionToSyncing", func(t *testing.T) { const destinationState = nodeStateSyncing - allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateFinalizedStateNotAvailable} + allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID} rpc := newMockRPCClient[ID, Head](t) rpc.On("Close") testTransition(t, rpc, testNode.transitionToSyncing, destinationState, allowedStates...) @@ -86,13 +86,6 @@ func TestUnit_Node_StateTransitions(t *testing.T) { node.transitionToSyncing(fn.Fn) }) }) - t.Run("transitionToFinalizedStateNotAvailable", func(t *testing.T) { - const destinationState = nodeStateFinalizedStateNotAvailable - allowedStates := []nodeState{nodeStateAlive} - rpc := newMockRPCClient[ID, Head](t) - rpc.On("Close") - testTransition(t, rpc, testNode.transitionToFinalizedStateNotAvailable, destinationState, allowedStates...) - }) } func testTransition(t *testing.T, rpc *mockRPCClient[ID, Head], transition func(node testNode, fn func()), destinationState nodeState, allowedStates ...nodeState) { diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index b97124d2..8fd4e9d9 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -2,7 +2,6 @@ package multinode import ( "context" - "errors" "fmt" "math" "math/big" @@ -103,15 +102,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo() var pollFailures uint32 - // Finalized state availability check config - finalizedStateCheckFailureThreshold := n.nodePoolCfg.FinalizedStateCheckFailureThreshold() - var finalizedStateFailures uint32 - if finalizedStateCheckFailureThreshold > 0 { - lggr.Debugw("Finalized state availability check enabled", "finalizedStateCheckFailureThreshold", finalizedStateCheckFailureThreshold) - } else { - lggr.Debug("Finalized state availability check disabled") - } - for { select { case <-ctx.Done(): @@ -155,41 +145,6 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.declareOutOfSync(syncStatusNotInSyncWithPool) return } - // Separate finalized state availability check - if finalizedStateCheckFailureThreshold > 0 { - stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, pollInterval) - stateErr := n.RPC().CheckFinalizedStateAvailability(stateCheckCtx) - stateCheckCancel() - if stateErr != nil { - if errors.Is(stateErr, ErrFinalizedStateUnavailable) { - if finalizedStateFailures < math.MaxUint32 { - n.metrics.IncrementFinalizedStateFailed(ctx, n.name) - finalizedStateFailures++ - } - lggr.Warnw("Finalized state not available", "err", stateErr, "failures", finalizedStateFailures, "threshold", finalizedStateCheckFailureThreshold) - if finalizedStateFailures >= finalizedStateCheckFailureThreshold { - lggr.Errorw("RPC node cannot serve finalized state after consecutive failures", "failures", finalizedStateFailures) - if n.poolInfoProvider != nil { - if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 2 && !n.isLoadBalancedRPC { - lggr.Criticalf("RPC endpoint cannot serve finalized state; %s %s", msgCannotDisable, msgDegradedState) - continue - } - } - n.declareFinalizedStateNotAvailable() - return - } - } else { - // Treat as RPC reachability error - if pollFailures < math.MaxUint32 { - n.metrics.IncrementPollsFailed(ctx, n.name) - pollFailures++ - } - lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr, "pollFailures", pollFailures) - } - } else { - finalizedStateFailures = 0 - } - } case bh, open := <-headsSub.Heads: if !open { lggr.Errorw("Subscription channel unexpectedly closed", "nodeState", n.getCachedState()) @@ -724,58 +679,3 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() { } } } - -func (n *node[CHAIN_ID, HEAD, RPC]) finalizedStateNotAvailableLoop() { - defer n.wg.Done() - ctx, cancel := n.newCtx() - defer cancel() - - { - state := n.getCachedState() - switch state { - case nodeStateFinalizedStateNotAvailable: - case nodeStateClosed: - return - default: - panic(fmt.Sprintf("finalizedStateNotAvailableLoop can only run for node in FinalizedStateNotAvailable state, got: %s", state)) - } - } - - unavailableAt := time.Now() - - lggr := logger.Sugared(logger.Named(n.lfcLog, "FinalizedStateNotAvailable")) - lggr.Debugw("Trying to revive RPC node with unavailable finalized state", "nodeState", n.getCachedState()) - - // Need to redial since finalized-state-not-available nodes are automatically disconnected - state := n.createVerifiedConn(ctx, lggr) - if state != nodeStateAlive { - n.declareState(state) - return - } - - recheckBackoff := NewRedialBackoff() - - for { - select { - case <-ctx.Done(): - return - case <-time.After(recheckBackoff.Duration()): - stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, n.nodePoolCfg.PollInterval()) - stateErr := n.RPC().CheckFinalizedStateAvailability(stateCheckCtx) - stateCheckCancel() - if stateErr != nil { - if errors.Is(stateErr, ErrFinalizedStateUnavailable) { - lggr.Warnw("Finalized state still not available", "err", stateErr) - continue - } - lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr) - n.declareUnreachable() - return - } - - lggr.Infow(fmt.Sprintf("Successfully verified RPC node %s. Finalized state was unavailable for %s", n.String(), time.Since(unavailableAt)), "nodeState", n.getCachedState()) - n.declareAlive() - return - } - } -} diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index 568a35a3..f1eb250f 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -1,7 +1,6 @@ package multinode import ( - "context" "errors" "fmt" "math/big" @@ -10,7 +9,6 @@ import ( "sync" "sync/atomic" "testing" - "time" prom "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" @@ -149,8 +147,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }).Once() // redundant call to stay in alive state rpc.On("ClientVersion", mock.Anything).Return("", nil) - // CheckFinalizedStateAvailability is called after successful polling - rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold) tests.AssertLogCountEventually(t, observedLogs, "Ping successful", 2) @@ -174,8 +170,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")).Maybe() - // CheckFinalizedStateAvailability may be called - rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogCountEventually(t, observedLogs, fmt.Sprintf("Poll failure, RPC endpoint %s failed to respond properly", node.String()), pollFailureThreshold) tests.AssertEventually(t, func() bool { @@ -204,8 +198,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) - // CheckFinalizedStateAvailability may be called - rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) assert.Equal(t, nodeStateAlive, node.State()) @@ -233,8 +225,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: 20}, ChainInfo{BlockNumber: 20}) pollError := errors.New("failed to get ClientVersion") rpc.On("ClientVersion", mock.Anything).Return("", pollError) - // CheckFinalizedStateAvailability may be called - rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() node.declareAlive() tests.AssertLogEventually(t, observedLogs, fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailureThreshold)) tests.AssertEventually(t, func() bool { @@ -257,7 +247,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) - const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) @@ -293,7 +282,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) - const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) poolInfo := newMockPoolChainInfoProvider(t) @@ -322,7 +310,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) - const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}).Twice() poolInfo := newMockPoolChainInfoProvider(t) @@ -357,9 +344,6 @@ func TestUnit_NodeLifecycle_aliveLoop(t *testing.T) { }) defer func() { assert.NoError(t, node.close()) }() rpc.On("ClientVersion", mock.Anything).Return("", nil) - - // CheckFinalizedStateAvailability is called after successful polling - rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() const mostRecentBlock = 20 rpc.On("GetInterceptedChainInfo").Return(ChainInfo{BlockNumber: mostRecentBlock}, ChainInfo{BlockNumber: 30}) node.declareAlive() @@ -2161,134 +2145,3 @@ func TestNode_State(t *testing.T) { }) } } - -func TestUnit_NodeLifecycle_finalizedStateNotAvailableLoop(t *testing.T) { - t.Parallel() - - newFinalizedStateNotAvailableNode := func(t *testing.T, opts testNodeOpts) testNode { - node := newTestNode(t, opts) - opts.rpc.On("Close").Return(nil) - node.setState(nodeStateFinalizedStateNotAvailable) - return node - } - - t.Run("returns on closed", func(t *testing.T) { - t.Parallel() - node := newTestNode(t, testNodeOpts{}) - node.setState(nodeStateClosed) - node.wg.Add(1) - node.finalizedStateNotAvailableLoop() - }) - - t.Run("on failed dial keeps trying", func(t *testing.T) { - t.Parallel() - rpc := newMockRPCClient[ID, Head](t) - nodeChainID := RandomID() - lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) - node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ - rpc: rpc, - chainID: nodeChainID, - lggr: lggr, - }) - defer func() { assert.NoError(t, node.close()) }() - - rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")) - node.wg.Add(1) - go node.finalizedStateNotAvailableLoop() - tests.AssertLogCountEventually(t, observedLogs, "Node is unreachable", 2) - }) - - t.Run("on finalized state still unavailable keeps trying", func(t *testing.T) { - t.Parallel() - rpc := newMockRPCClient[ID, Head](t) - nodeChainID := RandomID() - lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) - node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ - rpc: rpc, - chainID: nodeChainID, - lggr: lggr, - }) - defer func() { assert.NoError(t, node.close()) }() - - rpc.On("Dial", mock.Anything).Return(nil) - rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) - rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(fmt.Errorf("%w: missing trie node", ErrFinalizedStateUnavailable)) - - node.wg.Add(1) - go node.finalizedStateNotAvailableLoop() - tests.AssertLogCountEventually(t, observedLogs, "Finalized state still not available", 2) - }) - - t.Run("on successful verification and state check becomes alive", func(t *testing.T) { - t.Parallel() - rpc := newMockRPCClient[ID, Head](t) - nodeChainID := RandomID() - node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ - rpc: rpc, - chainID: nodeChainID, - }) - defer func() { assert.NoError(t, node.close()) }() - - rpc.On("Dial", mock.Anything).Return(nil) - rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) - rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil) - - setupRPCForAliveLoop(t, rpc) - - node.wg.Add(1) - go node.finalizedStateNotAvailableLoop() - tests.AssertEventually(t, func() bool { - return node.State() == nodeStateAlive - }) - }) - - t.Run("transitions from alive to finalizedStateNotAvailable and back", func(t *testing.T) { - t.Parallel() - rpc := newMockRPCClient[ID, Head](t) - nodeChainID := RandomID() - lggr, observedLogs := logger.TestObserved(t, zap.ErrorLevel) - node := newTestNode(t, testNodeOpts{ - rpc: rpc, - chainID: nodeChainID, - lggr: lggr, - config: testNodeConfig{ - pollInterval: 10 * time.Millisecond, - finalizedStateCheckFailureThreshold: 2, - }, - }) - defer func() { assert.NoError(t, node.close()) }() - - rpc.On("Close").Return(nil) - rpc.On("Dial", mock.Anything).Return(nil) - rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) - - sub := newMockSubscription(t) - sub.On("Err").Return(nil).Maybe() - sub.On("Unsubscribe").Maybe() - headsCh := make(chan Head) - rpc.On("SubscribeToHeads", mock.Anything).Return((<-chan Head)(headsCh), sub, nil).Maybe() - rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return(make(<-chan Head), sub, nil).Maybe() - rpc.On("SetAliveLoopSub", mock.Anything).Maybe() - rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Maybe() - rpc.On("ClientVersion", mock.Anything).Return("test-version", nil).Maybe() - - var stateCheckCallCount int32 - rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(func(ctx context.Context) error { - count := atomic.AddInt32(&stateCheckCallCount, 1) - if count <= 2 { - return fmt.Errorf("%w: missing trie node", ErrFinalizedStateUnavailable) - } - return nil - }).Maybe() - - node.setState(nodeStateAlive) - node.wg.Add(1) - go node.aliveLoop() - - tests.AssertLogEventually(t, observedLogs, "RPC Node cannot serve finalized state") - - tests.AssertEventually(t, func() bool { - return node.State() == nodeStateAlive - }) - }) -} diff --git a/multinode/node_test.go b/multinode/node_test.go index 440209c3..e3c8d712 100644 --- a/multinode/node_test.go +++ b/multinode/node_test.go @@ -15,16 +15,15 @@ import ( ) type testNodeConfig struct { - pollFailureThreshold uint32 - pollInterval time.Duration - selectionMode string - syncThreshold uint32 - nodeIsSyncingEnabled bool - enforceRepeatableRead bool - finalizedBlockPollInterval time.Duration - deathDeclarationDelay time.Duration - newHeadsPollInterval time.Duration - finalizedStateCheckFailureThreshold uint32 + pollFailureThreshold uint32 + pollInterval time.Duration + selectionMode string + syncThreshold uint32 + nodeIsSyncingEnabled bool + enforceRepeatableRead bool + finalizedBlockPollInterval time.Duration + deathDeclarationDelay time.Duration + newHeadsPollInterval time.Duration } func (n testNodeConfig) NewHeadsPollInterval() time.Duration { @@ -67,10 +66,6 @@ func (n testNodeConfig) VerifyChainID() bool { return true } -func (n testNodeConfig) FinalizedStateCheckFailureThreshold() uint32 { - return n.finalizedStateCheckFailureThreshold -} - type testNode struct { *node[ID, Head, RPCClient[ID, Head]] } @@ -134,14 +129,12 @@ func makeMockNodeMetrics(t *testing.T) *mockNodeMetrics { mockMetrics.On("IncrementNodeTransitionsToInvalidChainID", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementNodeTransitionsToUnusable", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementNodeTransitionsToSyncing", mock.Anything, mock.Anything).Maybe() - mockMetrics.On("IncrementNodeTransitionsToFinalizedStateNotAvailable", mock.Anything, mock.Anything).Maybe() mockMetrics.On("SetHighestSeenBlock", mock.Anything, mock.Anything, mock.Anything).Maybe() mockMetrics.On("SetHighestFinalizedBlock", mock.Anything, mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementSeenBlocks", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPolls", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPollsFailed", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPollsSuccess", mock.Anything, mock.Anything).Maybe() - mockMetrics.On("IncrementFinalizedStateFailed", mock.Anything, mock.Anything).Maybe() mockMetrics.On("RecordNodeClientVersion", mock.Anything, mock.Anything, mock.Anything).Maybe() return mockMetrics } diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index 39dde68a..91362add 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -331,10 +331,3 @@ func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObse defer m.chainInfoLock.RUnlock() return m.latestChainInfo, m.highestUserObservations } - -// CheckFinalizedStateAvailability provides a default no-op implementation for the RPCClient interface. -// Chain-specific RPC clients can override this method to verify that the RPC can serve -// historical state at the finalized block (e.g., by calling eth_getBalance at the finalized block). -func (m *RPCClientBase[HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error { - return nil -} diff --git a/multinode/types.go b/multinode/types.go index 19055d57..6400b2de 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -2,15 +2,10 @@ package multinode import ( "context" - "errors" "fmt" "math/big" ) -// ErrFinalizedStateUnavailable is returned by CheckFinalizedStateAvailability when the RPC -// cannot serve historical state at the finalized block (e.g., pruned/non-archive node). -var ErrFinalizedStateUnavailable = errors.New("finalized state unavailable") - // ID represents the base type, for any chain's ID. // It should be convertible to a string, that can uniquely identify this chain type ID fmt.Stringer @@ -82,11 +77,6 @@ type RPCClient[ // Ensure implementation does not have a race condition when values are reset before request completion and as // a result latest ChainInfo contains information from the previous cycle. GetInterceptedChainInfo() (latest, highestUserObservations ChainInfo) - // CheckFinalizedStateAvailability - verifies if the RPC can serve historical state at the finalized block. - // This is used to detect non-archive nodes that cannot serve state queries for older blocks. - // Returns ErrFinalizedStateUnavailable if the RPC cannot serve historical state. - // Returns nil if the check passes or is not applicable, or another error for RPC issues. - CheckFinalizedStateAvailability(ctx context.Context) error } // Head is the interface required by the NodeClient