Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions multinode/.mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,4 @@ packages:
nodeMetrics:
sendOnlyNodeMetrics:
transactionSenderMetrics:
FinalizedStateChecker:
82 changes: 82 additions & 0 deletions multinode/mock_finalized_state_checker_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions multinode/mock_node_metrics_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions multinode/mock_rpc_client_ext_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package multinode

import (
"context"
)

// CheckFinalizedStateAvailability extends mockRPCClient to also satisfy FinalizedStateChecker,
// allowing the type assertion any(n.rpc).(FinalizedStateChecker) to succeed in tests.
func (_m *mockRPCClient[CHAIN_ID, HEAD]) CheckFinalizedStateAvailability(ctx context.Context) error {
ret := _m.Called(ctx)

if len(ret) == 0 {
panic("no return value specified for CheckFinalizedStateAvailability")
}

var r0 error
if rf, ok := ret.Get(0).(func(context.Context) error); ok {
r0 = rf(ctx)
} else {
r0 = ret.Error(0)
}

return r0
}
14 changes: 13 additions & 1 deletion multinode/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ type ChainConfig interface {
FinalizedBlockOffset() uint32
}

// FinalizedStateCheckConfig is an optional interface for enabling finalized state availability checking.
type FinalizedStateCheckConfig interface {
FinalizedStateCheckFailureThreshold() uint32
}

// FinalizedStateChecker is an optional interface for RPCClients that support finalized state checks.
type FinalizedStateChecker interface {
CheckFinalizedStateAvailability(ctx context.Context) error
}

type nodeMetrics interface {
IncrementNodeVerifies(ctx context.Context, nodeName string)
IncrementNodeVerifiesFailed(ctx context.Context, nodeName string)
Expand All @@ -48,13 +58,15 @@ type nodeMetrics interface {
IncrementNodeTransitionsToInvalidChainID(ctx context.Context, nodeName string)
IncrementNodeTransitionsToUnusable(ctx context.Context, nodeName string)
IncrementNodeTransitionsToSyncing(ctx context.Context, nodeName string)
IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx context.Context, nodeName string)
RecordNodeClientVersion(ctx context.Context, nodeName string, version string)
SetHighestSeenBlock(ctx context.Context, nodeName string, blockNumber int64)
SetHighestFinalizedBlock(ctx context.Context, nodeName string, blockNumber int64)
IncrementSeenBlocks(ctx context.Context, nodeName string)
IncrementPolls(ctx context.Context, nodeName string)
IncrementPollsFailed(ctx context.Context, nodeName string)
IncrementPollsSuccess(ctx context.Context, nodeName string)
IncrementFinalizedStateFailed(ctx context.Context, nodeName string)
}

type Node[
Expand Down Expand Up @@ -273,7 +285,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) verifyChainID(callerCtx context.Context, lgg
// The node is already closed, and any subsequent transition is invalid.
// To make spotting such transitions a bit easier, return the invalid node state.
return nodeStateLen
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing:
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable:
default:
panic(fmt.Sprintf("cannot verify node in state %v", st))
}
Expand Down
41 changes: 37 additions & 4 deletions multinode/node_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ func (n nodeState) String() string {
return "Syncing"
case nodeStateFinalizedBlockOutOfSync:
return "FinalizedBlockOutOfSync"
case nodeStateFinalizedStateNotAvailable:
return "FinalizedStateNotAvailable"
default:
return fmt.Sprintf("nodeState(%d)", n)
}
Expand Down Expand Up @@ -72,6 +74,8 @@ const (
nodeStateSyncing
// nodeStateFinalizedBlockOutOfSync - node is lagging behind on latest finalized block
nodeStateFinalizedBlockOutOfSync
// nodeStateFinalizedStateNotAvailable - node cannot serve historical state at finalized block
nodeStateFinalizedStateNotAvailable
// nodeStateLen tracks the number of states
nodeStateLen
)
Expand Down Expand Up @@ -182,7 +186,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToAlive(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing:
case nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable:
n.state = nodeStateAlive
default:
panic(transitionFail(n.state, nodeStateAlive))
Expand Down Expand Up @@ -266,7 +270,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToUnreachable(fn func()) {
return
}
switch n.state {
case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing:
case nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable:
n.rpc.Close()
n.state = nodeStateUnreachable
default:
Expand All @@ -288,6 +292,8 @@ func (n *node[CHAIN_ID, HEAD, RPC]) declareState(state nodeState) {
n.declareSyncing()
case nodeStateAlive:
n.declareAlive()
case nodeStateFinalizedStateNotAvailable:
n.declareFinalizedStateNotAvailable()
default:
panic(fmt.Sprintf("%#v state declaration is not implemented", state))
}
Expand All @@ -311,7 +317,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToInvalidChainID(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing:
case nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing, nodeStateFinalizedStateNotAvailable:
n.rpc.Close()
n.state = nodeStateInvalidChainID
default:
Expand All @@ -338,7 +344,7 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) {
return
}
switch n.state {
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID:
case nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateFinalizedStateNotAvailable:
n.rpc.Close()
n.state = nodeStateSyncing
default:
Expand All @@ -351,6 +357,33 @@ func (n *node[CHAIN_ID, HEAD, RPC]) transitionToSyncing(fn func()) {
fn()
}

func (n *node[CHAIN_ID, HEAD, RPC]) declareFinalizedStateNotAvailable() {
n.transitionToFinalizedStateNotAvailable(func() {
n.lfcLog.Errorw("RPC Node cannot serve finalized state", "nodeState", n.state)
n.wg.Add(1)
go n.finalizedStateNotAvailableLoop()
})
}

func (n *node[CHAIN_ID, HEAD, RPC]) transitionToFinalizedStateNotAvailable(fn func()) {
ctx, cancel := n.stopCh.NewCtx()
defer cancel()
n.metrics.IncrementNodeTransitionsToFinalizedStateNotAvailable(ctx, n.name)
n.stateMu.Lock()
defer n.stateMu.Unlock()
if n.state == nodeStateClosed {
return
}
switch n.state {
case nodeStateAlive:
n.rpc.Close()
n.state = nodeStateFinalizedStateNotAvailable
default:
panic(transitionFail(n.state, nodeStateFinalizedStateNotAvailable))
}
fn()
}

func transitionFail(from nodeState, to nodeState) string {
return fmt.Sprintf("cannot transition from %#v to %#v", from, to)
}
15 changes: 11 additions & 4 deletions multinode/node_fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestUnit_Node_StateTransitions(t *testing.T) {

t.Run("transitionToAlive", func(t *testing.T) {
const destinationState = nodeStateAlive
allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing}
allowedStates := []nodeState{nodeStateDialed, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable}
rpc := newMockRPCClient[ID, Head](t)
testTransition(t, rpc, testNode.transitionToAlive, destinationState, allowedStates...)
})
Expand All @@ -56,21 +56,21 @@ func TestUnit_Node_StateTransitions(t *testing.T) {
})
t.Run("transitionToUnreachable", func(t *testing.T) {
const destinationState = nodeStateUnreachable
allowedStates := []nodeState{nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing}
allowedStates := []nodeState{nodeStateUndialed, nodeStateDialed, nodeStateAlive, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateSyncing, nodeStateFinalizedStateNotAvailable}
rpc := newMockRPCClient[ID, Head](t)
rpc.On("Close")
testTransition(t, rpc, testNode.transitionToUnreachable, destinationState, allowedStates...)
})
t.Run("transitionToInvalidChain", func(t *testing.T) {
const destinationState = nodeStateInvalidChainID
allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing}
allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateSyncing, nodeStateFinalizedStateNotAvailable}
rpc := newMockRPCClient[ID, Head](t)
rpc.On("Close")
testTransition(t, rpc, testNode.transitionToInvalidChainID, destinationState, allowedStates...)
})
t.Run("transitionToSyncing", func(t *testing.T) {
const destinationState = nodeStateSyncing
allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID}
allowedStates := []nodeState{nodeStateDialed, nodeStateOutOfSync, nodeStateInvalidChainID, nodeStateFinalizedStateNotAvailable}
rpc := newMockRPCClient[ID, Head](t)
rpc.On("Close")
testTransition(t, rpc, testNode.transitionToSyncing, destinationState, allowedStates...)
Expand All @@ -86,6 +86,13 @@ func TestUnit_Node_StateTransitions(t *testing.T) {
node.transitionToSyncing(fn.Fn)
})
})
t.Run("transitionToFinalizedStateNotAvailable", func(t *testing.T) {
const destinationState = nodeStateFinalizedStateNotAvailable
allowedStates := []nodeState{nodeStateAlive}
rpc := newMockRPCClient[ID, Head](t)
rpc.On("Close")
testTransition(t, rpc, testNode.transitionToFinalizedStateNotAvailable, destinationState, allowedStates...)
})
}

func testTransition(t *testing.T, rpc *mockRPCClient[ID, Head], transition func(node testNode, fn func()), destinationState nodeState, allowedStates ...nodeState) {
Expand Down
Loading
Loading