30 changes: 29 additions & 1 deletion multinode/README.md
@@ -19,4 +19,32 @@ Manages all nodes performing node selection and load balancing, health checks an
Used to poll for new heads and finalized heads within subscriptions.

### Transaction Sender
Used to send transactions to all healthy RPCs and aggregate the results.
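
For orientation, a minimal sketch of that fan-out/aggregate behaviour, using hypothetical types; the package's actual `TransactionSender` has a richer API and classifies results rather than just collecting errors:

```go
// Sketch only: broadcast a transaction to every healthy RPC and treat the
// send as successful if at least one of them accepts it. Hypothetical types;
// not the multinode package's TransactionSender.
package sketch

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

type rpcSender interface {
	Name() string
	SendTransaction(ctx context.Context, tx []byte) error
}

func broadcast(ctx context.Context, tx []byte, rpcs []rpcSender) error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
		ok   bool
	)
	for _, rpc := range rpcs {
		wg.Add(1)
		go func(r rpcSender) {
			defer wg.Done()
			if err := r.SendTransaction(ctx, tx); err != nil {
				mu.Lock()
				errs = append(errs, fmt.Errorf("%s: %w", r.Name(), err))
				mu.Unlock()
				return
			}
			mu.Lock()
			ok = true
			mu.Unlock()
		}(rpc)
	}
	wg.Wait()
	if ok {
		return nil // at least one RPC accepted the transaction
	}
	return errors.Join(errs...)
}
```

The property mirrored here is that a single accepting RPC is enough for the send to count as successful, while the per-node errors are still surfaced when every RPC fails.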

## State diagram

```mermaid
graph TD
Undialed --> Dialed
Undialed --> Unreachable
Dialed --> Alive
Dialed --> InvalidChainID
Dialed --> Syncing
Dialed --> Unreachable
Alive --> OutOfSync
Alive --> Unreachable
OutOfSync --> Alive
OutOfSync --> InvalidChainID
OutOfSync --> Syncing
OutOfSync --> Unreachable
InvalidChainID --> Alive
InvalidChainID --> Syncing
InvalidChainID --> Unreachable
Syncing --> Alive
Syncing --> OutOfSync
Syncing --> InvalidChainID
Syncing --> Unreachable
Unreachable --> Dialed
Unusable:::terminal
Closed:::terminal
```
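
As a cross-check, the same graph expressed as a Go transition table (a sketch only; the package's actual `nodeState` type and transition handling are internal and differ):

```go
// Illustrative only: the diagram above rendered as a transition table.
// Unusable and Closed are terminal states with no outgoing transitions.
package sketch

type nodeState string

var validTransitions = map[nodeState][]nodeState{
	"Undialed":       {"Dialed", "Unreachable"},
	"Dialed":         {"Alive", "InvalidChainID", "Syncing", "Unreachable"},
	"Alive":          {"OutOfSync", "Unreachable"},
	"OutOfSync":      {"Alive", "InvalidChainID", "Syncing", "Unreachable"},
	"InvalidChainID": {"Alive", "Syncing", "Unreachable"},
	"Syncing":        {"Alive", "OutOfSync", "InvalidChainID", "Unreachable"},
	"Unreachable":    {"Dialed"},
}

func canTransition(from, to nodeState) bool {
	for _, next := range validTransitions[from] {
		if next == to {
			return true
		}
	}
	return false
}
```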
4 changes: 4 additions & 0 deletions multinode/config/config.go
@@ -44,6 +44,10 @@ func (c *MultiNodeConfig) PollFailureThreshold() uint32 {
return *c.MultiNode.PollFailureThreshold
}

func (c *MultiNodeConfig) PollSuccessThreshold() uint32 {
return 0 // retaining source compat for -solana; -evm sets via NodePoolConfig
}

func (c *MultiNodeConfig) PollInterval() time.Duration {
return c.MultiNode.PollInterval.Duration()
}
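
The new `PollSuccessThreshold` accessor above hard-codes zero, which keeps recovery probing disabled for existing -solana configs. An integration that wants probing would surface the value from its own configuration; a hypothetical sketch (names invented for illustration, not the actual -evm NodePoolConfig):

```go
// Hypothetical sketch of an integrator-side node pool config that exposes a
// tunable recovery-probe threshold instead of the compatibility default of 0.
// Field and type names are invented for illustration.
package sketch

type pollSettings struct {
	FailureThreshold uint32
	SuccessThreshold uint32
}

type nodePoolConfig struct {
	poll pollSettings
}

func (c *nodePoolConfig) PollFailureThreshold() uint32 { return c.poll.FailureThreshold }

// A non-zero value makes a recovered node pass probeUntilStable before it is
// marked Alive again; zero keeps the previous redial-and-go behaviour.
func (c *nodePoolConfig) PollSuccessThreshold() uint32 { return c.poll.SuccessThreshold }
```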
4 changes: 1 addition & 3 deletions multinode/ctx_test.go
@@ -4,12 +4,10 @@ import (
"testing"

"github.com/stretchr/testify/assert"

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
)

func TestContext(t *testing.T) {
ctx := tests.Context(t)
ctx := t.Context()
assert.False(t, CtxIsHealthCheckRequest(ctx), "expected false for test context")
ctx = CtxAddHealthCheckFlag(ctx)
assert.True(t, CtxIsHealthCheckRequest(ctx), "expected context to contain the healthcheck flag")
20 changes: 10 additions & 10 deletions multinode/multi_node_test.go
@@ -81,7 +81,7 @@ func TestMultiNode_Dial(t *testing.T) {
selectionMode: NodeSelectionModeRoundRobin,
chainID: RandomID(),
})
err := mn.Start(tests.Context(t))
err := mn.Start(t.Context())
assert.ErrorContains(t, err, fmt.Sprintf("no available nodes for chain %s", mn.chainID))
})
t.Run("Fails with wrong node's chainID", func(t *testing.T) {
@@ -97,7 +97,7 @@ func TestMultiNode_Dial(t *testing.T) {
chainID: multiNodeChainID,
nodes: []Node[ID, multiNodeRPCClient]{node},
})
err := mn.Start(tests.Context(t))
err := mn.Start(t.Context())
assert.ErrorContains(t, err, fmt.Sprintf("node %s has configured chain ID %s which does not match multinode configured chain ID of %s", nodeName, nodeChainID, mn.chainID))
})
t.Run("Fails if node fails", func(t *testing.T) {
@@ -113,7 +113,7 @@ func TestMultiNode_Dial(t *testing.T) {
chainID: chainID,
nodes: []Node[ID, multiNodeRPCClient]{node},
})
err := mn.Start(tests.Context(t))
err := mn.Start(t.Context())
assert.ErrorIs(t, err, expectedError)
})

@@ -132,7 +132,7 @@ func TestMultiNode_Dial(t *testing.T) {
chainID: chainID,
nodes: []Node[ID, multiNodeRPCClient]{node1, node2},
})
err := mn.Start(tests.Context(t))
err := mn.Start(t.Context())
assert.ErrorIs(t, err, expectedError)
})
t.Run("Fails with wrong send only node's chainID", func(t *testing.T) {
@@ -151,7 +151,7 @@ func TestMultiNode_Dial(t *testing.T) {
nodes: []Node[ID, multiNodeRPCClient]{node},
sendonlys: []SendOnlyNode[ID, multiNodeRPCClient]{sendOnly},
})
err := mn.Start(tests.Context(t))
err := mn.Start(t.Context())
assert.ErrorContains(t, err, fmt.Sprintf("sendonly node %s has configured chain ID %s which does not match multinode configured chain ID of %s", sendOnlyName, sendOnlyChainID, mn.chainID))
})

@@ -178,7 +178,7 @@ func TestMultiNode_Dial(t *testing.T) {
nodes: []Node[ID, multiNodeRPCClient]{node},
sendonlys: []SendOnlyNode[ID, multiNodeRPCClient]{sendOnly1, sendOnly2},
})
err := mn.Start(tests.Context(t))
err := mn.Start(t.Context())
assert.ErrorIs(t, err, expectedError)
})
t.Run("Starts successfully with healthy nodes", func(t *testing.T) {
@@ -192,7 +192,7 @@ func TestMultiNode_Dial(t *testing.T) {
sendonlys: []SendOnlyNode[ID, multiNodeRPCClient]{newHealthySendOnly(t, chainID)},
})
servicetest.Run(t, mn)
selectedNode, err := mn.selectNode(tests.Context(t))
selectedNode, err := mn.selectNode(t.Context())
require.NoError(t, err)
assert.Equal(t, node, selectedNode)
})
@@ -336,7 +336,7 @@ func TestMultiNode_selectNode(t *testing.T) {
t.Parallel()
t.Run("Returns same node, if it's still healthy", func(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)
ctx := t.Context()
chainID := RandomID()
node1 := newMockNode[ID, multiNodeRPCClient](t)
node1.On("State").Return(nodeStateAlive).Once()
@@ -360,7 +360,7 @@ func TestMultiNode_selectNode(t *testing.T) {
})
t.Run("Updates node if active is not healthy", func(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)
ctx := t.Context()
chainID := RandomID()
oldBest := newMockNode[ID, multiNodeRPCClient](t)
oldBest.On("String").Return("oldBest").Maybe()
@@ -387,7 +387,7 @@ func TestMultiNode_selectNode(t *testing.T) {
})
t.Run("No active nodes - reports critical error", func(t *testing.T) {
t.Parallel()
ctx := tests.Context(t)
ctx := t.Context()
chainID := RandomID()
lggr, observedLogs := logger.TestObserved(t, zap.InfoLevel)
mn := newTestMultiNode(t, multiNodeOpts{
1 change: 1 addition & 0 deletions multinode/node.go
@@ -18,6 +18,7 @@ var errInvalidChainID = errors.New("invalid chain id")

type NodeConfig interface {
PollFailureThreshold() uint32
PollSuccessThreshold() uint32
PollInterval() time.Duration
SelectionMode() string
SyncThreshold() uint32
51 changes: 49 additions & 2 deletions multinode/node_lifecycle.go
@@ -123,7 +123,10 @@ func (n *node[CHAIN_ID, HEAD, RPC]) aliveLoop() {
lggr.Debugw("Ping successful", "nodeState", n.State())
n.metrics.RecordNodeClientVersion(ctx, n.name, version)
n.metrics.IncrementPollsSuccess(ctx, n.name)
pollFailures = 0
// Decay rather than reset so that sustained failure rates above 1:1 (more failed than successful polls) still reach the threshold
if pollFailures > 0 {
pollFailures--
}
}
if pollFailureThreshold > 0 && pollFailures >= pollFailureThreshold {
lggr.Errorw(fmt.Sprintf("RPC endpoint failed to respond to %d consecutive polls", pollFailures), "pollFailures", pollFailures, "nodeState", n.getCachedState())
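
The decay matters when failures and successes interleave: resetting on every success means the threshold is never reached as long as at least one poll in each stretch succeeds, whereas decrementing lets the counter climb whenever failures outnumber successes. A toy comparison of the two policies (the threshold of 5 and the poll pattern are assumptions for illustration, not code from the package):

```go
// Toy comparison of "reset on success" vs "decay on success" for the poll
// failure counter, against a sustained 2:1 failure pattern.
package main

import "fmt"

func main() {
	const threshold = 5
	// true = failed poll, false = successful poll.
	pattern := []bool{true, true, false, true, true, false, true, true, false, true, true, false}

	trip := func(decayOnSuccess bool) int {
		var failures uint32
		for i, failed := range pattern {
			switch {
			case failed:
				failures++
			case decayOnSuccess && failures > 0:
				failures-- // new behaviour: decay instead of reset
			default:
				failures = 0 // old behaviour: reset on any success
			}
			if failures >= threshold {
				return i + 1 // poll at which the failure threshold is crossed
			}
		}
		return -1
	}

	fmt.Println(trip(false)) // -1: the reset policy never trips
	fmt.Println(trip(true))  // 11: the decay policy trips on the 11th poll
}
```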
@@ -356,7 +359,13 @@ func (n *node[CHAIN_ID, HEAD, RPC]) isOutOfSyncWithPool() (outOfSync bool, liveN
}

if outOfSync && n.getCachedState() == nodeStateAlive {
n.lfcLog.Errorw("RPC endpoint has fallen behind", "blockNumber", localChainInfo.BlockNumber, "bestLatestBlockNumber", ci.BlockNumber, "totalDifficulty", localChainInfo.TotalDifficulty)
n.lfcLog.Errorw(
"RPC endpoint has fallen behind",
"blockNumber", localChainInfo.BlockNumber,
"bestLatestBlockNumber", ci.BlockNumber,
"totalDifficulty", localChainInfo.TotalDifficulty,
"blockDifference", localChainInfo.BlockNumber-ci.BlockNumber,
)
}
return outOfSync, ln
}
@@ -518,6 +527,39 @@ func (n *node[CHAIN_ID, HEAD, RPC]) outOfSyncLoop(syncIssues syncStatus) {
}
}

// probeUntilStable polls the node PollSuccessThreshold consecutive times before allowing it back into
// the alive pool. It returns true if all probes pass, and false if any probe fails or ctx is cancelled.
// When the threshold is 0 or the poll interval is not positive, probing is disabled and it returns true immediately.
func (n *node[CHAIN_ID, HEAD, RPC]) probeUntilStable(ctx context.Context, lggr logger.Logger) bool {
threshold := n.nodePoolCfg.PollSuccessThreshold()
pollInterval := n.nodePoolCfg.PollInterval()
if threshold == 0 || pollInterval <= 0 {
return true
}
var successes uint32
for successes < threshold {
select {
case <-ctx.Done():
return false
case <-time.After(pollInterval):
}
n.metrics.IncrementPolls(ctx, n.name)
pollCtx, cancel := context.WithTimeout(ctx, pollInterval)

Contributor: Is there a config check that prevents the pollInterval from being 0? If not, in the case where PollSuccessThreshold != 0 && PollInterval == 0 this will enter an infinite failure loop.

Contributor (Author): There isn't indeed. Added it to the guard condition.

version, err := n.RPC().ClientVersion(pollCtx)
cancel()
if err != nil {
n.metrics.IncrementPollsFailed(ctx, n.name)
lggr.Warnw("Recovery probe poll failed; restarting redial", "err", err, "successesSoFar", successes, "threshold", threshold)
return false
}
n.metrics.IncrementPollsSuccess(ctx, n.name)
n.metrics.RecordNodeClientVersion(ctx, n.name, version)
successes++
lggr.Debugw("Recovery probe poll succeeded", "successes", successes, "threshold", threshold)
}
return true
}
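
One operational consequence of the probe: after a successful redial, a node now stays out of rotation for at least PollSuccessThreshold × PollInterval even when every probe passes, so the two settings trade recovery latency against confidence. A back-of-the-envelope illustration with assumed values:

```go
// Minimum extra time a recovered node spends out of rotation when every
// recovery probe succeeds. The config values are assumptions for illustration.
package main

import (
	"fmt"
	"time"
)

func main() {
	pollInterval := 10 * time.Second // assumed PollInterval
	successThreshold := 3            // assumed PollSuccessThreshold

	minHoldOut := time.Duration(successThreshold) * pollInterval
	fmt.Println(minHoldOut) // 30s before the node can be marked Alive again
}
```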

func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
defer n.wg.Done()
ctx, cancel := n.newCtx()
@@ -563,6 +605,11 @@ func (n *node[CHAIN_ID, HEAD, RPC]) unreachableLoop() {
n.setState(nodeStateUnreachable)
continue
case nodeStateAlive:
if !n.probeUntilStable(ctx, lggr) {
n.rpc.Close()
n.setState(nodeStateUnreachable)
continue
}
lggr.Infow(fmt.Sprintf("Successfully redialled and verified RPC node %s. Node was offline for %s", n.String(), time.Since(unreachableAt)), "nodeState", n.getCachedState())
fallthrough
default: