diff --git a/multinode/.mockery.yaml b/multinode/.mockery.yaml index 91aef66..390128f 100644 --- a/multinode/.mockery.yaml +++ b/multinode/.mockery.yaml @@ -22,3 +22,4 @@ packages: nodeMetrics: sendOnlyNodeMetrics: transactionSenderMetrics: + FinalizedStateChecker: diff --git a/multinode/mock_finalized_state_checker_test.go b/multinode/mock_finalized_state_checker_test.go new file mode 100644 index 0000000..3dad194 --- /dev/null +++ b/multinode/mock_finalized_state_checker_test.go @@ -0,0 +1,82 @@ +// Code generated by mockery v2.53.0. DO NOT EDIT. + +package multinode + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// mockFinalizedStateChecker is an autogenerated mock type for the FinalizedStateChecker type +type mockFinalizedStateChecker struct { + mock.Mock +} + +type mockFinalizedStateChecker_Expecter struct { + mock *mock.Mock +} + +func (_m *mockFinalizedStateChecker) EXPECT() *mockFinalizedStateChecker_Expecter { + return &mockFinalizedStateChecker_Expecter{mock: &_m.Mock} +} + +// CheckFinalizedStateAvailability provides a mock function with given fields: ctx +func (_m *mockFinalizedStateChecker) CheckFinalizedStateAvailability(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for CheckFinalizedStateAvailability") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CheckFinalizedStateAvailability' +type mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call struct { + *mock.Call +} + +// CheckFinalizedStateAvailability is a helper method to define mock.On call +// - ctx context.Context +func (_e *mockFinalizedStateChecker_Expecter) CheckFinalizedStateAvailability(ctx interface{}) *mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call { + return &mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call{Call: _e.mock.On("CheckFinalizedStateAvailability", ctx)} +} + +func (_c *mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call) Run(run func(ctx context.Context)) *mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call) Return(_a0 error) *mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call) RunAndReturn(run func(context.Context) error) *mockFinalizedStateChecker_CheckFinalizedStateAvailability_Call { + _c.Call.Return(run) + return _c +} + +// newMockFinalizedStateChecker creates a new instance of mockFinalizedStateChecker. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func newMockFinalizedStateChecker(t interface { + mock.TestingT + Cleanup(func()) +}) *mockFinalizedStateChecker { + mock := &mockFinalizedStateChecker{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/multinode/mock_node_metrics_test.go b/multinode/mock_node_metrics_test.go index 261d7cf..8f5829a 100644 --- a/multinode/mock_node_metrics_test.go +++ b/multinode/mock_node_metrics_test.go @@ -21,6 +21,40 @@ func (_m *mockNodeMetrics) EXPECT() *mockNodeMetrics_Expecter { return &mockNodeMetrics_Expecter{mock: &_m.Mock} } +// IncrementFinalizedStateFailed provides a mock function with given fields: ctx, nodeName +func (_m *mockNodeMetrics) IncrementFinalizedStateFailed(ctx context.Context, nodeName string) { + _m.Called(ctx, nodeName) +} + +// mockNodeMetrics_IncrementFinalizedStateFailed_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementFinalizedStateFailed' +type mockNodeMetrics_IncrementFinalizedStateFailed_Call struct { + *mock.Call +} + +// IncrementFinalizedStateFailed is a helper method to define mock.On call +// - ctx context.Context +// - nodeName string +func (_e *mockNodeMetrics_Expecter) IncrementFinalizedStateFailed(ctx interface{}, nodeName interface{}) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + return &mockNodeMetrics_IncrementFinalizedStateFailed_Call{Call: _e.mock.On("IncrementFinalizedStateFailed", ctx, nodeName)} +} + +func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) Run(run func(ctx context.Context, nodeName string)) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) Return() *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + _c.Call.Return() + return _c +} + +func (_c *mockNodeMetrics_IncrementFinalizedStateFailed_Call) RunAndReturn(run func(context.Context, string)) *mockNodeMetrics_IncrementFinalizedStateFailed_Call { + _c.Run(run) + return _c +} + // IncrementNodeTransitionsToAlive provides a mock function with given fields: ctx, nodeName func (_m *mockNodeMetrics) IncrementNodeTransitionsToAlive(ctx context.Context, nodeName string) { _m.Called(ctx, nodeName) @@ -55,6 +89,40 @@ func (_c *mockNodeMetrics_IncrementNodeTransitionsToAlive_Call) RunAndReturn(run return _c } +// IncrementNodeTransitionsToFinalizedStateNotAvailable provides a mock function with given fields: ctx, nodeName +func (_m *mockNodeMetrics) IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string) { + _m.Called(ctx, nodeName) +} + +// mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'IncrementNodeTransitionsToFinalizedStateNotAvailable' +type mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call struct { + *mock.Call +} + +// IncrementNodeTransitionsToFinalizedStateNotAvailable is a helper method to define mock.On call +// - ctx context.Context +// - nodeName string +func (_e *mockNodeMetrics_Expecter) IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx interface{}, nodeName interface{}) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + return &mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call{Call: _e.mock.On("IncrementNodeTransitionsToFinalizedStateNotAvailable", ctx, nodeName)} +} + +func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) Run(run func(ctx context.Context, nodeName string)) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) Return() *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + _c.Call.Return() + return _c +} + +func (_c *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call) RunAndReturn(run func(context.Context, string)) *mockNodeMetrics_IncrementNodeTransitionsToFinalizedStateNotAvailable_Call { + _c.Run(run) + return _c +} + // IncrementNodeTransitionsToInSync provides a mock function with given fields: ctx, nodeName func (_m *mockNodeMetrics) IncrementNodeTransitionsToInSync(ctx context.Context, nodeName string) { _m.Called(ctx, nodeName) diff --git a/multinode/mock_rpc_client_ext_test.go b/multinode/mock_rpc_client_ext_test.go new file mode 100644 index 0000000..f8a3397 --- /dev/null +++ b/multinode/mock_rpc_client_ext_test.go @@ -0,0 +1,24 @@ +package multinode + +import ( + "context" +) + +// CheckFinalizedStateAvailability extends mockRPCClient to also satisfy FinalizedStateChecker, +// allowing the type assertion any(n.rpc).(FinalizedStateChecker) to succeed in tests. +func (_m *mockRPCClient[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for CheckFinalizedStateAvailability") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} diff --git a/multinode/node.go b/multinode/node.go index 6729459..db45aae 100644 --- a/multinode/node.go +++ b/multinode/node.go @@ -37,6 +37,16 @@ type ChainConfig interface { FinalizedBlockOffset() uint32 } +// FinalizedStateCheckConfig is an optional interface for enabling finalized state availability checking. +type FinalizedStateCheckConfig interface { + FinalizedStateCheckFailureThreshold() uint32 +} + +// FinalizedStateChecker is an optional interface for RPCClients that support finalized state checks. +type FinalizedStateChecker interface { + CheckFinalizedStateAvailability(ctx context.Context) error +} + type nodeMetrics interface { IncrementNodeVerifies(ctx context.Context, nodeName string) IncrementNodeVerifiesFailed(ctx context.Context, nodeName string) @@ -48,6 +58,7 @@ type nodeMetrics interface { IncrementNodeTransitionsToInvalidChainID(ctx context.Context, nodeName string) IncrementNodeTransitionsToUnusable(ctx context.Context, nodeName string) IncrementNodeTransitionsToSyncing(ctx context.Context, nodeName string) + IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string) RecordNodeClientVersion(ctx context.Context, nodeName string, version string) SetHighestSeenBlock(ctx context.Context, nodeName string, blockNumber int64) SetHighestFinalizedBlock(ctx context.Context, nodeName string, blockNumber int64) @@ -55,6 +66,7 @@ type nodeMetrics interface { IncrementPolls(ctx context.Context, nodeName string) IncrementPollsFailed(ctx context.Context, nodeName string) IncrementPollsSuccess(ctx context.Context, nodeName string) + IncrementFinalizedStateFailed(ctx context.Context, nodeName string) } type Node[ @@ -273,7 +285,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lgg // The node is already closed, and any subsequent transition is invalid. // To make spotting such transitions a bit easier, return the invalid node state. return nodeStateLen - case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing: + case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: default: panic(fmt.Sprintf("cannot verify node in state %v", st)) } diff --git a/multinode/node_fsm.go b/multinode/node_fsm.go index 818363e..dcb5c61 100644 --- a/multinode/node_fsm.go +++ b/multinode/node_fsm.go @@ -35,6 +35,8 @@ func (n nodeState) String() string { return "Syncing" case nodeStateFinalizedBlockOutOfSync: return "FinalizedBlockOutOfSync" + case nodeStateFinalizedStateNotAvailable: + return "FinalizedStateNotAvailable" default: return fmt.Sprintf("nodeState(%d)", n) } @@ -72,6 +74,8 @@ const ( nodeStateSyncing // nodeStateFinalizedBlockOutOfSync - node is lagging behind on latest finalized block nodeStateFinalizedBlockOutOfSync + // nodeStateFinalizedStateNotAvailable - node cannot serve historical state at finalized block + nodeStateFinalizedStateNotAvailable // nodeStateLen tracks the number of states nodeStateLen ) @@ -182,7 +186,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToAlive(fn func()) { return } switch n.state { - case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing: + case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: n.state = nodeStateAlive default: panic(transitionFail(n.state, nodeStateAlive)) @@ -266,7 +270,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) { return } switch n.state { - case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing: + case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: n.rpc.Close() n.state = nodeStateUnreachable default: @@ -288,6 +292,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) declareState(state nodeState) { n.declareSyncing() case nodeStateAlive: n.declareAlive() + case nodeStateFinalizedStateNotAvailable: + n.declareFinalizedStateNotAvailable() default: panic(fmt.Sprintf("%#v state declaration is not implemented", state)) } @@ -311,7 +317,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) { return } switch n.state { - case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing: + case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing, nodeStateFinalizedStateNotAvailable: n.rpc.Close() n.state = nodeStateInvalidChainID default: @@ -338,7 +344,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) { return } switch n.state { - case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID: + case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateFinalizedStateNotAvailable: n.rpc.Close() n.state = nodeStateSyncing default: @@ -351,6 +357,33 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) { fn() } +func (n *node[CHAIN_ID, HEAD, RPC]) declareFinalizedStateNotAvailable() { + n.transitionToFinalizedStateNotAvailable(func() { + n.lfcLog.Errorw("RPC Node cannot serve finalized state", "nodeState", n.state) + n.wg.Add(1) + go n.finalizedStateNotAvailableLoop() + }) +} + +func (n *node[CHAIN_ID, HEAD, RPC]) transitionToFinalizedStateNotAvailable(fn func()) { + ctx, cancel := n.stopCh.NewCtx() + defer cancel() + n.metrics.IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx, n.name) + n.stateMu.Lock() + defer n.stateMu.Unlock() + if n.state == nodeStateClosed { + return + } + switch n.state { + case nodeStateAlive: + n.rpc.Close() + n.state = nodeStateFinalizedStateNotAvailable + default: + panic(transitionFail(n.state, nodeStateFinalizedStateNotAvailable)) + } + fn() +} + func transitionFail(from nodeState, to nodeState) string { return fmt.Sprintf("cannot transition from %#v to %#v", from, to) } diff --git a/multinode/node_fsm_test.go b/multinode/node_fsm_test.go index 17d312c..d513f30 100644 --- a/multinode/node_fsm_test.go +++ b/multinode/node_fsm_test.go @@ -36,7 +36,7 @@ func TestUnit_Node_StateTransitions(t *testing.T) { t.Run("transitionToAlive", func(t *testing.T) { const destinationState = nodeStateAlive - allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing} + allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable} rpc := newMockRPCClient[ID, Head](t) testTransition(t, rpc, testNode.transitionToAlive, destinationState, allowedStates...) }) @@ -56,21 +56,21 @@ func TestUnit_Node_StateTransitions(t *testing.T) { }) t.Run("transitionToUnreachable", func(t *testing.T) { const destinationState = nodeStateUnreachable - allowedStates := []nodeState{nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing} + allowedStates := []nodeState{nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable} rpc := newMockRPCClient[ID, Head](t) rpc.On("Close") testTransition(t, rpc, testNode.transitionToUnreachable, destinationState, allowedStates...) }) t.Run("transitionToInvalidChain", func(t *testing.T) { const destinationState = nodeStateInvalidChainID - allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing} + allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing, nodeStateFinalizedStateNotAvailable} rpc := newMockRPCClient[ID, Head](t) rpc.On("Close") testTransition(t, rpc, testNode.transitionToInvalidChainID, destinationState, allowedStates...) }) t.Run("transitionToSyncing", func(t *testing.T) { const destinationState = nodeStateSyncing - allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID} + allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateFinalizedStateNotAvailable} rpc := newMockRPCClient[ID, Head](t) rpc.On("Close") testTransition(t, rpc, testNode.transitionToSyncing, destinationState, allowedStates...) @@ -86,6 +86,13 @@ func TestUnit_Node_StateTransitions(t *testing.T) { node.transitionToSyncing(fn.Fn) }) }) + t.Run("transitionToFinalizedStateNotAvailable", func(t *testing.T) { + const destinationState = nodeStateFinalizedStateNotAvailable + allowedStates := []nodeState{nodeStateAlive} + rpc := newMockRPCClient[ID, Head](t) + rpc.On("Close") + testTransition(t, rpc, testNode.transitionToFinalizedStateNotAvailable, destinationState, allowedStates...) + }) } func testTransition(t *testing.T, rpc *mockRPCClient[ID, Head], transition func(node testNode, fn func()), destinationState nodeState, allowedStates ...nodeState) { diff --git a/multinode/node_lifecycle.go b/multinode/node_lifecycle.go index 8fd4e9d..65e999a 100644 --- a/multinode/node_lifecycle.go +++ b/multinode/node_lifecycle.go @@ -2,6 +2,7 @@ package multinode import ( "context" + "errors" "fmt" "math" "math/big" @@ -102,6 +103,20 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { localHighestChainInfo, _ := n.rpc.GetInterceptedChainInfo() var pollFailures uint32 + // Finalized state availability check via optional interfaces + var finalizedStateCheckFailureThreshold uint32 + finalizedStateCfg, hasFinalizedStateCfg := n.nodePoolCfg.(FinalizedStateCheckConfig) + _, hasFinalizedStateChecker := any(n.rpc).(FinalizedStateChecker) + if hasFinalizedStateCfg && hasFinalizedStateChecker { + finalizedStateCheckFailureThreshold = finalizedStateCfg.FinalizedStateCheckFailureThreshold() + } + var finalizedStateFailures uint32 + if finalizedStateCheckFailureThreshold > 0 { + lggr.Debugw("Finalized state availability check enabled", "finalizedStateCheckFailureThreshold", finalizedStateCheckFailureThreshold) + } else { + lggr.Debug("Finalized state availability check disabled") + } + for { select { case <-ctx.Done(): @@ -145,6 +160,39 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() { n.declareOutOfSync(syncStatusNotInSyncWithPool) return } + if finalizedStateCheckFailureThreshold > 0 { + stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, pollInterval) + stateErr := any(n.rpc).(FinalizedStateChecker).CheckFinalizedStateAvailability(stateCheckCtx) + stateCheckCancel() + if stateErr != nil { + if errors.Is(stateErr, ErrFinalizedStateUnavailable) { + if finalizedStateFailures < math.MaxUint32 { + n.metrics.IncrementFinalizedStateFailed(ctx, n.name) + finalizedStateFailures++ + } + lggr.Warnw("Finalized state not available", "err", stateErr, "failures", finalizedStateFailures, "threshold", finalizedStateCheckFailureThreshold) + if finalizedStateFailures >= finalizedStateCheckFailureThreshold { + lggr.Errorw("RPC node cannot serve finalized state after consecutive failures", "failures", finalizedStateFailures) + if n.poolInfoProvider != nil { + if l, _ := n.poolInfoProvider.LatestChainInfo(n.name); l < 2 && !n.isLoadBalancedRPC { + lggr.Criticalf("RPC endpoint cannot serve finalized state; %s %s", msgCannotDisable, msgDegradedState) + continue + } + } + n.declareFinalizedStateNotAvailable() + return + } + } else { + if pollFailures < math.MaxUint32 { + n.metrics.IncrementPollsFailed(ctx, n.name) + pollFailures++ + } + lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr, "pollFailures", pollFailures) + } + } else { + finalizedStateFailures = 0 + } + } case bh, open := <-headsSub.Heads: if !open { lggr.Errorw("Subscription channel unexpectedly closed", "nodeState", n.getCachedState()) @@ -679,3 +727,64 @@ func (n *node[CHAIN_ID, HEAD, RPC]) syncingLoop() { } } } + +func (n *node[CHAIN_ID, HEAD, RPC]) finalizedStateNotAvailableLoop() { + defer n.wg.Done() + ctx, cancel := n.newCtx() + defer cancel() + + { + state := n.getCachedState() + switch state { + case nodeStateFinalizedStateNotAvailable: + case nodeStateClosed: + return + default: + panic(fmt.Sprintf("finalizedStateNotAvailableLoop can only run for node in FinalizedStateNotAvailable state, got: %s", state)) + } + } + + unavailableAt := time.Now() + + lggr := logger.Sugared(logger.Named(n.lfcLog, "FinalizedStateNotAvailable")) + lggr.Debugw("Trying to revive RPC node with unavailable finalized state", "nodeState", n.getCachedState()) + + state := n.createVerifiedConn(ctx, lggr) + if state != nodeStateAlive { + n.declareState(state) + return + } + + checker, ok := any(n.rpc).(FinalizedStateChecker) + if !ok { + lggr.Infow("RPC does not implement FinalizedStateChecker, transitioning to alive") + n.declareAlive() + return + } + + recheckBackoff := NewRedialBackoff() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(recheckBackoff.Duration()): + stateCheckCtx, stateCheckCancel := context.WithTimeout(ctx, n.nodePoolCfg.PollInterval()) + stateErr := checker.CheckFinalizedStateAvailability(stateCheckCtx) + stateCheckCancel() + if stateErr != nil { + if errors.Is(stateErr, ErrFinalizedStateUnavailable) { + lggr.Warnw("Finalized state still not available", "err", stateErr) + continue + } + lggr.Warnw("Finalized state check failed with RPC error", "err", stateErr) + n.declareUnreachable() + return + } + + lggr.Infow(fmt.Sprintf("Successfully verified RPC node %s. Finalized state was unavailable for %s", n.String(), time.Since(unavailableAt)), "nodeState", n.getCachedState()) + n.declareAlive() + return + } + } +} diff --git a/multinode/node_lifecycle_test.go b/multinode/node_lifecycle_test.go index f1eb250..53735b1 100644 --- a/multinode/node_lifecycle_test.go +++ b/multinode/node_lifecycle_test.go @@ -9,6 +9,7 @@ import ( "sync" "sync/atomic" "testing" + "time" prom "github.com/prometheus/client_model/go" "github.com/stretchr/testify/assert" @@ -2145,3 +2146,136 @@ func TestNode_State(t *testing.T) { }) } } + +func TestUnit_NodeLifecycle_finalizedStateNotAvailableLoop(t *testing.T) { + t.Parallel() + + newFinalizedStateNotAvailableNode := func(t *testing.T, opts testNodeOpts) testNode { + node := newTestNode(t, opts) + opts.rpc.On("Close").Return(nil) + node.setState(nodeStateFinalizedStateNotAvailable) + return node + } + + t.Run("returns on closed", func(t *testing.T) { + t.Parallel() + node := newTestNode(t, testNodeOpts{}) + node.setState(nodeStateClosed) + node.wg.Add(1) + node.finalizedStateNotAvailableLoop() + }) + + t.Run("on failed dial keeps trying", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(errors.New("failed to dial")) + node.wg.Add(1) + go node.finalizedStateNotAvailableLoop() + tests.AssertLogCountEventually(t, observedLogs, "Node is unreachable", 2) + }) + + t.Run("on finalized state still unavailable keeps trying", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.DebugLevel) + node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(nil) + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(fmt.Errorf("%w: missing trie node", ErrFinalizedStateUnavailable)) + + node.wg.Add(1) + go node.finalizedStateNotAvailableLoop() + tests.AssertLogCountEventually(t, observedLogs, "Finalized state still not available", 2) + }) + + t.Run("on successful verification and state check becomes alive", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + node := newFinalizedStateNotAvailableNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Dial", mock.Anything).Return(nil) + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil) + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil) + + setupRPCForAliveLoop(t, rpc) + + node.wg.Add(1) + go node.finalizedStateNotAvailableLoop() + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateAlive + }) + }) + + t.Run("transitions from alive to finalizedStateNotAvailable and back", func(t *testing.T) { + t.Parallel() + rpc := newMockRPCClient[ID, Head](t) + nodeChainID := RandomID() + lggr, observedLogs := logger.TestObserved(t, zap.ErrorLevel) + node := newTestNode(t, testNodeOpts{ + rpc: rpc, + chainID: nodeChainID, + lggr: lggr, + config: testNodeConfig{ + pollInterval: 10 * time.Millisecond, + finalizedStateCheckFailureThreshold: 2, + }, + }) + defer func() { assert.NoError(t, node.close()) }() + + rpc.On("Close").Return(nil) + rpc.On("Dial", mock.Anything).Return(nil).Maybe() + rpc.On("ChainID", mock.Anything).Return(nodeChainID, nil).Maybe() + + sub := newMockSubscription(t) + sub.On("Err").Return(nil).Maybe() + sub.On("Unsubscribe").Maybe() + headsCh := make(chan Head) + rpc.On("SubscribeToHeads", mock.Anything).Return((<-chan Head)(headsCh), sub, nil).Maybe() + rpc.On("SubscribeToFinalizedHeads", mock.Anything).Return((<-chan Head)(headsCh), sub, nil).Maybe() + rpc.On("GetInterceptedChainInfo").Return(ChainInfo{}, ChainInfo{}).Maybe() + rpc.On("ClientVersion", mock.Anything).Return("", nil).Maybe() + + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return( + fmt.Errorf("%w: missing trie node", ErrFinalizedStateUnavailable), + ).Times(3) + + poolInfo := newMockPoolChainInfoProvider(t) + poolInfo.On("LatestChainInfo", mock.Anything).Return(5, ChainInfo{}).Maybe() + poolInfo.On("HighestUserObservations").Return(ChainInfo{}).Maybe() + node.SetPoolChainInfoProvider(poolInfo) + + node.setState(nodeStateDialed) + node.declareAlive() + tests.AssertLogEventually(t, observedLogs, "RPC node cannot serve finalized state after consecutive failures") + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateFinalizedStateNotAvailable + }) + + rpc.On("CheckFinalizedStateAvailability", mock.Anything).Return(nil).Maybe() + + tests.AssertEventually(t, func() bool { + return node.State() == nodeStateAlive + }) + }) +} diff --git a/multinode/node_test.go b/multinode/node_test.go index e3c8d71..440209c 100644 --- a/multinode/node_test.go +++ b/multinode/node_test.go @@ -15,15 +15,16 @@ import ( ) type testNodeConfig struct { - pollFailureThreshold uint32 - pollInterval time.Duration - selectionMode string - syncThreshold uint32 - nodeIsSyncingEnabled bool - enforceRepeatableRead bool - finalizedBlockPollInterval time.Duration - deathDeclarationDelay time.Duration - newHeadsPollInterval time.Duration + pollFailureThreshold uint32 + pollInterval time.Duration + selectionMode string + syncThreshold uint32 + nodeIsSyncingEnabled bool + enforceRepeatableRead bool + finalizedBlockPollInterval time.Duration + deathDeclarationDelay time.Duration + newHeadsPollInterval time.Duration + finalizedStateCheckFailureThreshold uint32 } func (n testNodeConfig) NewHeadsPollInterval() time.Duration { @@ -66,6 +67,10 @@ func (n testNodeConfig) VerifyChainID() bool { return true } +func (n testNodeConfig) FinalizedStateCheckFailureThreshold() uint32 { + return n.finalizedStateCheckFailureThreshold +} + type testNode struct { *node[ID, Head, RPCClient[ID, Head]] } @@ -129,12 +134,14 @@ func makeMockNodeMetrics(t *testing.T) *mockNodeMetrics { mockMetrics.On("IncrementNodeTransitionsToInvalidChainID", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementNodeTransitionsToUnusable", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementNodeTransitionsToSyncing", mock.Anything, mock.Anything).Maybe() + mockMetrics.On("IncrementNodeTransitionsToFinalizedStateNotAvailable", mock.Anything, mock.Anything).Maybe() mockMetrics.On("SetHighestSeenBlock", mock.Anything, mock.Anything, mock.Anything).Maybe() mockMetrics.On("SetHighestFinalizedBlock", mock.Anything, mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementSeenBlocks", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPolls", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPollsFailed", mock.Anything, mock.Anything).Maybe() mockMetrics.On("IncrementPollsSuccess", mock.Anything, mock.Anything).Maybe() + mockMetrics.On("IncrementFinalizedStateFailed", mock.Anything, mock.Anything).Maybe() mockMetrics.On("RecordNodeClientVersion", mock.Anything, mock.Anything, mock.Anything).Maybe() return mockMetrics } diff --git a/multinode/rpc_client_base.go b/multinode/rpc_client_base.go index 91362ad..5d118c8 100644 --- a/multinode/rpc_client_base.go +++ b/multinode/rpc_client_base.go @@ -331,3 +331,10 @@ func (m *RPCClientBase[HEAD]) GetInterceptedChainInfo() (latest, highestUserObse defer m.chainInfoLock.RUnlock() return m.latestChainInfo, m.highestUserObservations } + +// CheckFinalizedStateAvailability provides a default no-op implementation for the FinalizedStateChecker interface. +// Chain-specific RPC clients can override this method to verify that the RPC can serve +// historical state at the finalized block (e.g., by calling eth_getBalance at the finalized block). +func (m *RPCClientBase[HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error { + return nil +} diff --git a/multinode/types.go b/multinode/types.go index 6400b2d..8435c86 100644 --- a/multinode/types.go +++ b/multinode/types.go @@ -2,10 +2,15 @@ package multinode import ( "context" + "errors" "fmt" "math/big" ) +// ErrFinalizedStateUnavailable is returned by CheckFinalizedStateAvailability when the RPC +// cannot serve historical state at the finalized block (e.g., pruned/non-archive node). +var ErrFinalizedStateUnavailable = errors.New("finalized state unavailable") + // ID represents the base type, for any chain's ID. // It should be convertible to a string, that can uniquely identify this chain type ID fmt.Stringer