diff --git a/cmd/mithril/node/node.go b/cmd/mithril/node/node.go index 89a713bd..5fb1cc9f 100644 --- a/cmd/mithril/node/node.go +++ b/cmd/mithril/node/node.go @@ -1235,7 +1235,40 @@ postBootstrap: NearTipPollMs: blockNearTipPollMs, NearTipLookahead: blockNearTipLookahead, } - result := runReplayWithRecovery(ctx, accountsDb, accountsPath, manifest, resumeState, uint64(startSlot), liveEndSlot, rpcEndpoints, blockstorePath, int(txParallelism), true, useLightbringer, dbgOpts, metricsWriter, rpcServer, mithrilState, blockFetchOpts, replayStartTime) + // Build consensus options from config + consensusMaxDepth := config.GetInt("consensus.skip_path_max_depth") + if consensusMaxDepth <= 0 { + consensusMaxDepth = 64 + } + consensusPolicy := config.GetString("consensus.unresolved_policy") + if consensusPolicy == "" { + consensusPolicy = "halt" + } + switch consensusPolicy { + case "halt", "warn": + // valid + default: + mlog.Log.Errorf("invalid consensus.unresolved_policy %q (must be \"halt\" or \"warn\"), defaulting to \"halt\"", consensusPolicy) + consensusPolicy = "halt" + } + consensusEnforceSource := config.GetString("consensus.enforce_on_source") + if consensusEnforceSource == "" { + consensusEnforceSource = "lightbringer" + } + switch consensusEnforceSource { + case "lightbringer", "all": + // valid + default: + mlog.Log.Errorf("invalid consensus.enforce_on_source %q (must be \"lightbringer\" or \"all\"), defaulting to \"lightbringer\"", consensusEnforceSource) + consensusEnforceSource = "lightbringer" + } + consensusOpts := &replay.ConsensusOpts{ + SkipPathMaxDepth: consensusMaxDepth, + UnresolvedPolicy: consensusPolicy, + EnforceOnSource: consensusEnforceSource, + } + + result := runReplayWithRecovery(ctx, accountsDb, accountsPath, manifest, resumeState, uint64(startSlot), liveEndSlot, rpcEndpoints, blockstorePath, int(txParallelism), true, useLightbringer, dbgOpts, metricsWriter, rpcServer, mithrilState, blockFetchOpts, consensusOpts, replayStartTime) // Update 
state file with last persisted slot and shutdown context // Skip if already written during cancellation (eliminates timing window) @@ -2099,6 +2132,7 @@ func runReplayWithRecovery( rpcServer *rpcserver.RpcServer, mithrilState *state.MithrilState, blockFetchOpts *replay.BlockFetchOpts, + consensusOpts *replay.ConsensusOpts, replayStartTime time.Time, // Start time for resume context ) *replay.ReplayResult { var result *replay.ReplayResult @@ -2210,6 +2244,6 @@ func runReplayWithRecovery( } }() - result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, onCancelWriteState) + result = replay.ReplayBlocks(ctx, accountsDb, accountsDbPath, mithrilState, resumeState, startSlot, endSlot, rpcEndpoints, blockDir, txParallelism, isLive, useLightbringer, dbgOpts, metricsWriter, rpcServer, blockFetchOpts, consensusOpts, onCancelWriteState) return result } diff --git a/config.example.toml b/config.example.toml index cb6841a2..6ac3b669 100644 --- a/config.example.toml +++ b/config.example.toml @@ -208,6 +208,35 @@ name = "mithril" # num_slots = 0 # end_slot = -1 +# ============================================================================ +# [consensus] - Vote-Anchored Consensus +# ============================================================================ +# +# Controls how Mithril uses on-chain vote data to verify block correctness. +# The fork choice service accumulates vote stake per slot and determines +# which bank hash has reached 2/3 supermajority. +# +# When using Lightbringer as the block source, the consensus coordinator +# resolves ambiguous slot ranges by finding a valid skip path that chains +# to the vote-confirmed hash. If no valid path exists, the configured +# policy determines behavior. + +[consensus] + # Maximum depth (number of slots) the skip-path solver will explore. 
+ # Longer ranges take more memory. 64 covers ~26 seconds of slots. + skip_path_max_depth = 64 + + # What to do when a Lightbringer slot range cannot be resolved: + # "halt" - Graceful shutdown, write diagnostic artifact (recommended) + # "warn" - Log warning and continue (use only for debugging) + unresolved_policy = "halt" + + # Which block source to enforce consensus on: + # "lightbringer" - Only enforce on Lightbringer blocks (RPC blocks are trusted) + # "all" - Enforce on all block sources (not yet implemented) + enforce_on_source = "lightbringer" + + # ============================================================================ # [rpc] - Mithril RPC Server # ============================================================================ diff --git a/pkg/config/config.go b/pkg/config/config.go index b4a28682..fb09701f 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -141,6 +141,13 @@ type LogConfig struct { MaxBackups int `toml:"max_backups" mapstructure:"max_backups"` // Keep up to N old log files } +// ConsensusConfig holds vote-anchored consensus configuration +type ConsensusConfig struct { + SkipPathMaxDepth int `toml:"skip_path_max_depth" mapstructure:"skip_path_max_depth"` // Max slots the skip-path solver explores (default: 64) + UnresolvedPolicy string `toml:"unresolved_policy" mapstructure:"unresolved_policy"` // "halt" or "warn" (default: "halt") + EnforceOnSource string `toml:"enforce_on_source" mapstructure:"enforce_on_source"` // "lightbringer" or "all" (default: "lightbringer") +} + // Config holds all configuration options for Mithril (Firedancer-style hierarchy) type Config struct { // Top-level (matches Firedancer style) @@ -152,6 +159,7 @@ type Config struct { Rpc RpcConfig `toml:"rpc" mapstructure:"rpc"` Replay ReplayConfig `toml:"replay" mapstructure:"replay"` Block BlockConfig `toml:"block" mapstructure:"block"` + Consensus ConsensusConfig `toml:"consensus" mapstructure:"consensus"` Snapshot SnapshotConfig `toml:"snapshot" 
mapstructure:"snapshot"` Development DevelopmentConfig `toml:"development" mapstructure:"development"` Reporting ReportingConfig `toml:"reporting" mapstructure:"reporting"` diff --git a/pkg/epochstakes/epoch_authorized_voters.go b/pkg/epochstakes/epoch_authorized_voters.go index dbfe061d..106c9e57 100644 --- a/pkg/epochstakes/epoch_authorized_voters.go +++ b/pkg/epochstakes/epoch_authorized_voters.go @@ -24,3 +24,13 @@ func (cache *EpochAuthorizedVotersCache) IsAuthorizedVoter(voteAcct solana.Publi } return false } + +// Entries returns the underlying map for serialization/persistence. +func (cache *EpochAuthorizedVotersCache) Entries() map[solana.PublicKey][]solana.PublicKey { + return cache.authorizedVoters +} + +// Len returns the number of vote accounts in the cache. +func (cache *EpochAuthorizedVotersCache) Len() int { + return len(cache.authorizedVoters) +} diff --git a/pkg/forkchoice/consensus_coordinator.go b/pkg/forkchoice/consensus_coordinator.go new file mode 100644 index 00000000..37e6e2c4 --- /dev/null +++ b/pkg/forkchoice/consensus_coordinator.go @@ -0,0 +1,92 @@ +package forkchoice + +import ( + "errors" + + "github.com/gagliardetto/solana-go" +) + +var ( + ErrNeedWait = errors.New("consensus: vote landing window not reached, need to wait") + ErrNoSupermajority = errors.New("consensus: no hash reached supermajority for target slot") +) + +// SlotDecision represents the resolved action for a single slot. +type SlotDecision struct { + Slot uint64 + UseBlock bool // true = use the block, false = slot is empty/skipped +} + +// ConsensusCoordinator bridges the ForkChoiceService (vote accumulation) and +// the SkipPath solver to verify bankhash-chain consistency for a slot range. +// +// For a given range [startSlot, endSlot], it: +// 1. Queries forkchoice for the vote-confirmed bankhash at endSlot +// 2. Runs the skipPath solver to find a valid chain from prevBankhash to that hash +// 3. 
Returns per-slot decisions (use block or skip) +type ConsensusCoordinator struct { + forkChoice *ForkChoiceService + maxDepth int + policy string // "halt" = return error on unresolved, "warn" = log and continue +} + +// NewConsensusCoordinator creates a coordinator with the given forkchoice service, +// maximum skipPath solver depth, and unresolved policy. +func NewConsensusCoordinator(fc *ForkChoiceService, maxDepth int, policy string) *ConsensusCoordinator { + return &ConsensusCoordinator{ + forkChoice: fc, + maxDepth: maxDepth, + policy: policy, + } +} + +// ResolveRange attempts to determine which slots in [startSlot, endSlot] should +// use blocks vs be treated as skipped. +// +// prevBankhash is the parent bankhash at startSlot-1 (the last executed block's bankhash). +// candidates are executed slot candidates carrying computed bankhash + parent bankhash. +// +// Returns slot decisions or an error: +// - ErrNeedWait: votes haven't landed yet, caller should retry later +// - ErrNoSupermajority: no hash reached 2/3 threshold for endSlot +// - ErrNoPath: solver couldn't find a valid chain to the target hash +// - ErrDepthExceeded: range exceeds maxDepth +func (cc *ConsensusCoordinator) ResolveRange( + startSlot, endSlot uint64, + prevBankhash solana.Hash, + candidates map[uint64]*SlotCandidate, +) ([]SlotDecision, error) { + // Query forkchoice for the vote-confirmed hash at the end slot. + targetHash, status := cc.forkChoice.GetSupermajorityHash(endSlot) + + switch status { + case BankhashNeedWait: + return nil, ErrNeedWait + case BankhashNoSupermajority: + return nil, ErrNoSupermajority + case BankhashHasSupermajority: + // Continue to solve + } + + // Run the skipPath solver to find a valid chain. + result, err := SkipPath(startSlot, endSlot, prevBankhash, candidates, targetHash, cc.maxDepth) + if err != nil { + return nil, err + } + + // Convert []bool path to []SlotDecision. 
+ decisions := make([]SlotDecision, len(result.Path)) + for i, useBlock := range result.Path { + decisions[i] = SlotDecision{ + Slot: startSlot + uint64(i), + UseBlock: useBlock, + } + } + + return decisions, nil +} + +// Policy returns the coordinator's unresolved policy ("halt" or "warn"). +func (cc *ConsensusCoordinator) Policy() string { + return cc.policy +} diff --git a/pkg/forkchoice/consensus_coordinator_test.go b/pkg/forkchoice/consensus_coordinator_test.go new file mode 100644 index 00000000..cf77f356 --- /dev/null +++ b/pkg/forkchoice/consensus_coordinator_test.go @@ -0,0 +1,163 @@ +package forkchoice + +import ( + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// newTestForkChoiceState creates a minimal forkChoiceState for testing. +// Does NOT start the service or worker pool — only the state is used. +func newTestForkChoiceState(totalStake uint64) *ForkChoiceService { + state := &forkChoiceState{ + voteStakeTotals: make(map[uint64]*slotVoteAccumulator), + totalEpochStake: totalStake, + } + return &ForkChoiceService{state: state} +} + +// injectSupermajority simulates a slot reaching supermajority by directly +// manipulating the accumulator state. This isolates coordinator tests from +// vote parsing and block submission internals. +func injectSupermajority(fc *ForkChoiceService, slot uint64, winningHash solana.Hash, stake uint64) { + acc := newSlotVoteAccumulator(fc.state.totalEpochStake, slot) + tracker := &voteStakeTracker{ + voted: make(map[solana.PublicKey]struct{}), + stake: stake, + } + acc.trackers[winningHash] = tracker + acc.confirmed = true + acc.confirmedHash = winningHash + fc.state.voteStakeTotals[slot] = acc +} + +func TestResolveRangeSuccess(t *testing.T) { + fc := newTestForkChoiceState(100) + + prevBankhash := hash(0x01) + blockHash := hash(0x02) + + // Simulate: slot 10 has a block, supermajority confirms blockHash. 
+ injectSupermajority(fc, 10, blockHash, 70) + fc.state.latestSlotIngested = 10 + VoteConfirmationTimeoutSlots + + candidates := map[uint64]*SlotCandidate{ + 10: {Slot: 10, HasBlock: true, Blockhash: blockHash, LastBlockhash: prevBankhash}, + } + + cc := NewConsensusCoordinator(fc, 64, "halt") + decisions, err := cc.ResolveRange(10, 10, prevBankhash, candidates) + require.NoError(t, err) + require.Len(t, decisions, 1) + assert.Equal(t, uint64(10), decisions[0].Slot) + assert.True(t, decisions[0].UseBlock) +} + +func TestResolveRangeNeedWait(t *testing.T) { + fc := newTestForkChoiceState(100) + + // latestSlotIngested is 0 — way before landing window for slot 10. + fc.state.latestSlotIngested = 0 + + cc := NewConsensusCoordinator(fc, 64, "halt") + _, err := cc.ResolveRange(10, 10, hash(0x01), nil) + assert.ErrorIs(t, err, ErrNeedWait) +} + +func TestResolveRangeNoSupermajority(t *testing.T) { + fc := newTestForkChoiceState(100) + + // Landing window passed but no votes at all for slot 10. + fc.state.latestSlotIngested = 10 + VoteConfirmationTimeoutSlots + + cc := NewConsensusCoordinator(fc, 64, "halt") + _, err := cc.ResolveRange(10, 10, hash(0x01), nil) + assert.ErrorIs(t, err, ErrNoSupermajority) +} + +func TestResolveRangeNoPath(t *testing.T) { + fc := newTestForkChoiceState(100) + + prevBankhash := hash(0x01) + targetHash := hash(0xFF) + + // Supermajority says targetHash won, but our candidates can't chain to it. 
+ injectSupermajority(fc, 10, targetHash, 70) + fc.state.latestSlotIngested = 10 + VoteConfirmationTimeoutSlots + + candidates := map[uint64]*SlotCandidate{ + 10: {Slot: 10, HasBlock: true, Blockhash: hash(0x02), LastBlockhash: prevBankhash}, + } + + cc := NewConsensusCoordinator(fc, 64, "halt") + _, err := cc.ResolveRange(10, 10, prevBankhash, candidates) + assert.ErrorIs(t, err, ErrNoPath) +} + +func TestResolveRangeMultiSlotMixed(t *testing.T) { + fc := newTestForkChoiceState(100) + + // Slots 10-13: 10(empty), 11(block), 12(empty), 13(block) + prevBankhash := hash(0x01) + block11Hash := hash(0x02) + block13Hash := hash(0x03) + + injectSupermajority(fc, 13, block13Hash, 70) + fc.state.latestSlotIngested = 13 + VoteConfirmationTimeoutSlots + + candidates := map[uint64]*SlotCandidate{ + 11: {Slot: 11, HasBlock: true, Blockhash: block11Hash, LastBlockhash: prevBankhash}, + 13: {Slot: 13, HasBlock: true, Blockhash: block13Hash, LastBlockhash: block11Hash}, + } + + cc := NewConsensusCoordinator(fc, 64, "halt") + decisions, err := cc.ResolveRange(10, 13, prevBankhash, candidates) + require.NoError(t, err) + require.Len(t, decisions, 4) + + assert.Equal(t, SlotDecision{Slot: 10, UseBlock: false}, decisions[0]) + assert.Equal(t, SlotDecision{Slot: 11, UseBlock: true}, decisions[1]) + assert.Equal(t, SlotDecision{Slot: 12, UseBlock: false}, decisions[2]) + assert.Equal(t, SlotDecision{Slot: 13, UseBlock: true}, decisions[3]) +} + +func TestResolveRangeDepthExceeded(t *testing.T) { + fc := newTestForkChoiceState(100) + + injectSupermajority(fc, 100, hash(0xAA), 70) + fc.state.latestSlotIngested = 100 + VoteConfirmationTimeoutSlots + + // Range is 0..100 = 101 slots, but maxDepth is 64. 
+ cc := NewConsensusCoordinator(fc, 64, "halt") + _, err := cc.ResolveRange(0, 100, hash(0x01), nil) + assert.ErrorIs(t, err, ErrDepthExceeded) +} + +func TestResolveRangeAllEmpty(t *testing.T) { + fc := newTestForkChoiceState(100) + + prevBankhash := hash(0x01) + + // Target hash equals prevBankhash — all slots skipped. + injectSupermajority(fc, 12, prevBankhash, 70) + fc.state.latestSlotIngested = 12 + VoteConfirmationTimeoutSlots + + cc := NewConsensusCoordinator(fc, 64, "halt") + decisions, err := cc.ResolveRange(10, 12, prevBankhash, nil) + require.NoError(t, err) + require.Len(t, decisions, 3) + for _, d := range decisions { + assert.False(t, d.UseBlock) + } +} + +func TestCoordinatorPolicy(t *testing.T) { + fc := newTestForkChoiceState(100) + cc := NewConsensusCoordinator(fc, 64, "halt") + assert.Equal(t, "halt", cc.Policy()) + + cc2 := NewConsensusCoordinator(fc, 64, "warn") + assert.Equal(t, "warn", cc2.Policy()) +} diff --git a/pkg/forkchoice/forkchoice.go b/pkg/forkchoice/forkchoice.go index 0a89fc3e..c16a2910 100644 --- a/pkg/forkchoice/forkchoice.go +++ b/pkg/forkchoice/forkchoice.go @@ -4,14 +4,55 @@ import ( "fmt" "sync" + "github.com/Overclock-Validator/mithril/pkg/base58" "github.com/Overclock-Validator/mithril/pkg/epochstakes" + "github.com/Overclock-Validator/mithril/pkg/mlog" "github.com/gagliardetto/solana-go" - "github.com/panjf2000/ants/v2" ) +// BankhashStatus represents the confirmation status of a slot's bankhash. +type BankhashStatus int + +const ( + BankhashHasSupermajority BankhashStatus = iota + BankhashNoSupermajority + BankhashNeedWait +) + +func (s BankhashStatus) String() string { + switch s { + case BankhashHasSupermajority: + return "has_supermajority" + case BankhashNoSupermajority: + return "no_supermajority" + case BankhashNeedWait: + return "need_wait" + default: + return "unknown" + } +} + +// BankhashResult provides detailed fork choice query results. 
+type BankhashResult struct { + Status BankhashStatus + WinningHash solana.Hash + StakeForHash uint64 // stake accumulated for the queried hash + WinningStake uint64 // stake accumulated for the winning hash (may differ from StakeForHash) + TotalEpochStake uint64 + ThresholdStake uint64 +} + type blockJob struct { slot uint64 txs []*solana.Transaction + + // Epoch data captured at submission time so that each block is processed + // with the epoch view that was current when it was submitted, not when + // it happens to be dequeued. This prevents post-boundary epoch data from + // being applied to pre-boundary blocks sitting in the queue. + epochStakes map[solana.PublicKey]uint64 + epochAuthorizedVoters *epochstakes.EpochAuthorizedVotersCache + totalEpochStake uint64 } type voteUpdate struct { @@ -20,19 +61,17 @@ type voteUpdate struct { } type forkChoiceState struct { - voteStakeTotals map[uint64]*voteStakeAccumulator + voteStakeTotals map[uint64]*slotVoteAccumulator epoch uint64 epochStakes map[solana.PublicKey]uint64 epochAuthorizedVoters *epochstakes.EpochAuthorizedVotersCache totalEpochStake uint64 - slotAlreadyConfirmed map[uint64]struct{} latestSlotIngested uint64 mu sync.Mutex } type ForkChoiceService struct { state *forkChoiceState - pool *ants.PoolWithFunc jobChan chan *blockJob wg sync.WaitGroup shutdown chan struct{} @@ -43,35 +82,21 @@ func NewForkChoiceService( epochStakes map[solana.PublicKey]uint64, totalEpochStake uint64, epochAuthorizedVoters *epochstakes.EpochAuthorizedVotersCache, - poolSize int, -) (*ForkChoiceService, error) { +) *ForkChoiceService { state := &forkChoiceState{ - voteStakeTotals: make(map[uint64]*voteStakeAccumulator), - slotAlreadyConfirmed: make(map[uint64]struct{}), + voteStakeTotals: make(map[uint64]*slotVoteAccumulator), epoch: epoch, epochStakes: epochStakes, epochAuthorizedVoters: epochAuthorizedVoters, totalEpochStake: totalEpochStake, } - service := &ForkChoiceService{ + return &ForkChoiceService{ state: state, - 
jobChan: make(chan *blockJob, poolSize), + jobChan: make(chan *blockJob, 32), shutdown: make(chan struct{}), } - - poolSubmitFunc := func(job interface{}) { - service.processBlock(job.(*blockJob)) - } - - pool, err := ants.NewPoolWithFunc(poolSize, poolSubmitFunc) - if err != nil { - return nil, fmt.Errorf("failed to create ants pool: %w", err) - } - service.pool = pool - - return service, nil } func (s *ForkChoiceService) Start() { @@ -82,7 +107,6 @@ func (s *ForkChoiceService) Start() { func (s *ForkChoiceService) Stop() { close(s.shutdown) s.wg.Wait() - s.pool.Release() close(s.jobChan) } @@ -94,20 +118,37 @@ func (s *ForkChoiceService) run() { if !ok { return } - if err := s.pool.Invoke(job); err != nil { - fmt.Printf("error submitting job to ants pool for slot %d: %v\n", job.slot, err) - } + s.processBlock(job) case <-s.shutdown: - return + // Drain remaining jobs before exiting. + for { + select { + case job, ok := <-s.jobChan: + if !ok { + return + } + s.processBlock(job) + default: + return + } + } } } } func (s *ForkChoiceService) SubmitBlock(slot uint64, txs []*solana.Transaction) { + // Snapshot epoch data at submission time so the job carries a consistent + // epoch view regardless of when it is actually processed. This prevents + // UpdateEpoch from changing the epoch mid-queue. + s.state.mu.Lock() job := &blockJob{ - slot: slot, - txs: txs, + slot: slot, + txs: txs, + epochStakes: s.state.epochStakes, + epochAuthorizedVoters: s.state.epochAuthorizedVoters, + totalEpochStake: s.state.totalEpochStake, } + s.state.mu.Unlock() select { case s.jobChan <- job: @@ -117,6 +158,13 @@ func (s *ForkChoiceService) SubmitBlock(slot uint64, txs []*solana.Transaction) } func (s *ForkChoiceService) processBlock(job *blockJob) { + // Use epoch data captured at SubmitBlock time. This ensures vote parsing, + // stake lookups, and threshold computation all use a consistent epoch view + // — even if UpdateEpoch fires between submission and processing. 
+ epochStakes := job.epochStakes + epochAuthorizedVoters := job.epochAuthorizedVoters + totalEpochStake := job.totalEpochStake + var updatesToApply []voteUpdate for _, tx := range job.txs { @@ -124,12 +172,12 @@ func (s *ForkChoiceService) processBlock(job *blockJob) { continue } - voteInfo, ok := s.state.parseAndValidateVoteTxForBankhashAndSlot(tx) + voteInfo, ok := parseAndValidateVoteTx(tx, epochAuthorizedVoters) if !ok { continue } - stakeForVoteAcct, ok := s.state.epochStakes[voteInfo.votePubkey] + stakeForVoteAcct, ok := epochStakes[voteInfo.votePubkey] if !ok { continue } @@ -144,18 +192,27 @@ func (s *ForkChoiceService) processBlock(job *blockJob) { defer s.state.mu.Unlock() for _, update := range updatesToApply { - _, alreadyConfirmed := s.state.slotAlreadyConfirmed[update.voteInfo.slot] - if alreadyConfirmed { - continue - } - accumulator, exists := s.state.voteStakeTotals[update.voteInfo.slot] if !exists { - accumulator = newVoteStakeAccumulator(s.state.totalEpochStake, update.voteInfo.slot) + accumulator = newSlotVoteAccumulator(totalEpochStake, update.voteInfo.slot) s.state.voteStakeTotals[update.voteInfo.slot] = accumulator } - accumulator.add(update.voteInfo.bankHash, update.stake) + thresholdCrossed, _ := accumulator.addVote( + update.voteInfo.bankHash, + update.voteInfo.votePubkey, + update.stake, + ) + + if thresholdCrossed { + mlog.Log.Infof("forkchoice: slot %d hash %s crossed supermajority (stake=%d/%d threshold=%d)", + update.voteInfo.slot, + base58.Encode(update.voteInfo.bankHash[:]), + accumulator.stakeForHash(update.voteInfo.bankHash), + totalEpochStake, + accumulator.thresholdStake, + ) + } } if s.state.latestSlotIngested < job.slot { @@ -163,31 +220,132 @@ func (s *ForkChoiceService) processBlock(job *blockJob) { } } -const ( - BankhashHasSupermajority = iota - BankhashNoSupermajority - BankhashNeedWait -) +// UpdateEpoch swaps in new epoch stake data. 
Called at epoch boundaries +// so that vote stake weights and authorized voter lookups use current data. +func (s *ForkChoiceService) UpdateEpoch( + epoch uint64, + epochStakes map[solana.PublicKey]uint64, + totalEpochStake uint64, + epochAuthorizedVoters *epochstakes.EpochAuthorizedVotersCache, +) { + s.state.mu.Lock() + defer s.state.mu.Unlock() + + s.state.epoch = epoch + s.state.epochStakes = epochStakes + s.state.totalEpochStake = totalEpochStake + s.state.epochAuthorizedVoters = epochAuthorizedVoters -const voteLandingPeriodInSlots = 32 + mlog.Log.Infof("forkchoice: updated epoch stakes for epoch %d (total_stake=%d, validators=%d)", + epoch, totalEpochStake, len(epochStakes)) +} -func (s *ForkChoiceService) IsBankhashCorrect(slot uint64, bankHash solana.Hash) int { +// VoteConfirmationTimeoutSlots is the grace window (in slots) before an +// unresolved slot (no supermajority winner) transitions from NeedWait to +// NoSupermajority. This is NOT a mandatory delay before confirmation — a slot +// with observed supermajority is confirmed immediately regardless of this window. +const VoteConfirmationTimeoutSlots = 32 + +// IsBankhashCorrect queries the confirmation status of a slot's bankhash. +// Returns a BankhashResult with status, winning hash, stake details, and threshold. +// +// A slot is confirmed immediately when a winner is observed — no mandatory delay. +// The timeout window (VoteConfirmationTimeoutSlots) only governs how long an +// unresolved slot (no winner) stays in NeedWait before becoming NoSupermajority. +func (s *ForkChoiceService) IsBankhashCorrect(slot uint64, bankHash solana.Hash) BankhashResult { s.state.mu.Lock() defer s.state.mu.Unlock() - if s.state.latestSlotIngested < (slot + voteLandingPeriodInSlots) { - return BankhashNeedWait + accumulator, exists := s.state.voteStakeTotals[slot] + + // Fast path: if a winner exists, return immediately regardless of timeout. 
+ if exists { + winningHash, hasWinner := accumulator.winningHash() + if hasWinner { + stakeForHash := accumulator.stakeForHash(bankHash) + winningStake := accumulator.stakeForHash(winningHash) + + if winningHash == bankHash { + return BankhashResult{ + Status: BankhashHasSupermajority, + WinningHash: winningHash, + StakeForHash: stakeForHash, + WinningStake: winningStake, + TotalEpochStake: accumulator.totalEpochStake, + ThresholdStake: accumulator.thresholdStake, + } + } + + // Mismatch: our hash lost to a different supermajority hash. + mlog.Log.Warnf("forkchoice: slot %d bankhash mismatch! our=%s winning=%s (our_stake=%d winning_stake=%d/%d)", + slot, + base58.Encode(bankHash[:]), + base58.Encode(winningHash[:]), + stakeForHash, + winningStake, + accumulator.totalEpochStake, + ) + return BankhashResult{ + Status: BankhashNoSupermajority, + WinningHash: winningHash, + StakeForHash: stakeForHash, + WinningStake: winningStake, + TotalEpochStake: accumulator.totalEpochStake, + ThresholdStake: accumulator.thresholdStake, + } + } } - accumulator, exists := s.state.voteStakeTotals[slot] - if !exists { - return BankhashNeedWait + // No winner yet — use timeout to decide NeedWait vs NoSupermajority. + if s.state.latestSlotIngested < (slot + VoteConfirmationTimeoutSlots) { + totalStake := s.state.totalEpochStake + if exists { + totalStake = accumulator.totalEpochStake + } + return BankhashResult{ + Status: BankhashNeedWait, + TotalEpochStake: totalStake, + } + } + + // Timeout expired with no winner. 
+ if exists { + stakeForHash := accumulator.stakeForHash(bankHash) + return BankhashResult{ + Status: BankhashNoSupermajority, + StakeForHash: stakeForHash, + TotalEpochStake: accumulator.totalEpochStake, + ThresholdStake: accumulator.thresholdStake, + } } - if accumulator.hashHasSupermajority(bankHash) { - s.state.slotAlreadyConfirmed[slot] = struct{}{} - return BankhashHasSupermajority - } else { - return BankhashNoSupermajority + return BankhashResult{ + Status: BankhashNoSupermajority, + TotalEpochStake: s.state.totalEpochStake, } } + +// GetSupermajorityHash returns the vote-confirmed hash for a slot, if any hash +// has crossed the 2/3 supermajority threshold. Used by the consensus coordinator +// to obtain the target hash for skipPath solving. +// +// Returns immediately when a winner is observed. The timeout window only governs +// when an unresolved slot transitions from NeedWait to NoSupermajority. +func (s *ForkChoiceService) GetSupermajorityHash(slot uint64) (solana.Hash, BankhashStatus) { + s.state.mu.Lock() + defer s.state.mu.Unlock() + + // Fast path: winner exists → return immediately. + if accumulator, exists := s.state.voteStakeTotals[slot]; exists { + if winningHash, ok := accumulator.winningHash(); ok { + return winningHash, BankhashHasSupermajority + } + } + + // No winner — use timeout to decide NeedWait vs NoSupermajority. 
+ if s.state.latestSlotIngested < (slot + VoteConfirmationTimeoutSlots) { + return solana.Hash{}, BankhashNeedWait + } + + return solana.Hash{}, BankhashNoSupermajority +} diff --git a/pkg/forkchoice/forkchoice_test.go b/pkg/forkchoice/forkchoice_test.go new file mode 100644 index 00000000..3a24c776 --- /dev/null +++ b/pkg/forkchoice/forkchoice_test.go @@ -0,0 +1,461 @@ +package forkchoice + +import ( + "encoding/binary" + "testing" + + "github.com/Overclock-Validator/mithril/pkg/epochstakes" + "github.com/gagliardetto/solana-go" + "github.com/stretchr/testify/assert" +) + +func TestNeedWaitWhenNoWinnerBeforeTimeout(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + service := NewForkChoiceService(0, epochStakes, 100, epochAuth) + + + // No blocks ingested yet, no winner, within timeout → NeedWait. + result := service.IsBankhashCorrect(10, solana.Hash{1}) + assert.Equal(t, BankhashNeedWait, result.Status) + assert.Equal(t, uint64(100), result.TotalEpochStake) +} + +func TestHasSupermajorityAfterEnoughVotes(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + + slot := uint64(10) + hash := solana.Hash{0xAA} + + // Manually populate state to test query path + service.state.mu.Lock() + acc := newSlotVoteAccumulator(totalStake, slot) + for i := 0; i < 67; i++ { + var pk [32]byte + pk[0] = byte(i + 1) + pk[1] = byte((i + 1) >> 8) + acc.addVote(hash, solana.PublicKeyFromBytes(pk[:]), 1) + } + service.state.voteStakeTotals[slot] = acc + service.state.latestSlotIngested = slot + VoteConfirmationTimeoutSlots + service.state.mu.Unlock() + + result := service.IsBankhashCorrect(slot, hash) + assert.Equal(t, BankhashHasSupermajority, result.Status) + assert.Equal(t, hash, result.WinningHash) + assert.Equal(t, uint64(67), 
result.StakeForHash) + assert.Equal(t, totalStake, result.TotalEpochStake) + assert.Equal(t, computeThresholdStake(totalStake), result.ThresholdStake) +} + +func TestNoSupermajorityAfterLandingWindow(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + + slot := uint64(10) + hash := solana.Hash{0xAA} + + service.state.mu.Lock() + acc := newSlotVoteAccumulator(totalStake, slot) + // Only 30 stake — not enough for threshold of 66 + for i := 0; i < 30; i++ { + var pk [32]byte + pk[0] = byte(i + 1) + acc.addVote(hash, solana.PublicKeyFromBytes(pk[:]), 1) + } + service.state.voteStakeTotals[slot] = acc + service.state.latestSlotIngested = slot + VoteConfirmationTimeoutSlots + service.state.mu.Unlock() + + result := service.IsBankhashCorrect(slot, hash) + assert.Equal(t, BankhashNoSupermajority, result.Status) + assert.Equal(t, uint64(30), result.StakeForHash) + assert.Equal(t, totalStake, result.TotalEpochStake) + assert.Equal(t, computeThresholdStake(totalStake), result.ThresholdStake) +} + +func TestNoVotesSeenAfterLandingWindow(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + + // Landing window passed but no accumulator for this slot + service.state.mu.Lock() + service.state.latestSlotIngested = 50 + VoteConfirmationTimeoutSlots + service.state.mu.Unlock() + + result := service.IsBankhashCorrect(50, solana.Hash{0xAA}) + assert.Equal(t, BankhashNoSupermajority, result.Status) +} + +// TestEarlyConfirmationBeforeTimeout verifies that a slot with observed +// supermajority is confirmed immediately, even before the timeout window. 
+func TestEarlyConfirmationBeforeTimeout(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + + slot := uint64(10) + hash := solana.Hash{0xAA} + + // Inject supermajority but keep latestSlotIngested BELOW timeout. + service.state.mu.Lock() + acc := newSlotVoteAccumulator(totalStake, slot) + for i := 0; i < 67; i++ { + var pk [32]byte + pk[0] = byte(i + 1) + pk[1] = byte((i + 1) >> 8) + acc.addVote(hash, solana.PublicKeyFromBytes(pk[:]), 1) + } + service.state.voteStakeTotals[slot] = acc + service.state.latestSlotIngested = slot + 5 // Well below slot + 32 + service.state.mu.Unlock() + + result := service.IsBankhashCorrect(slot, hash) + assert.Equal(t, BankhashHasSupermajority, result.Status, "should confirm immediately when winner exists") + assert.Equal(t, hash, result.WinningHash) + assert.Equal(t, uint64(67), result.StakeForHash) + assert.Equal(t, totalStake, result.TotalEpochStake) + assert.Equal(t, computeThresholdStake(totalStake), result.ThresholdStake) +} + +// TestNeedWaitPartialVotesBeforeTimeout verifies NeedWait when there are +// partial votes (below threshold) and the timeout hasn't expired. 
+func TestNeedWaitPartialVotesBeforeTimeout(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + + slot := uint64(10) + hash := solana.Hash{0xAA} + + service.state.mu.Lock() + acc := newSlotVoteAccumulator(totalStake, slot) + // Only 30 stake — not enough for threshold of 66 + for i := 0; i < 30; i++ { + var pk [32]byte + pk[0] = byte(i + 1) + acc.addVote(hash, solana.PublicKeyFromBytes(pk[:]), 1) + } + service.state.voteStakeTotals[slot] = acc + service.state.latestSlotIngested = slot + 10 // Below timeout + service.state.mu.Unlock() + + result := service.IsBankhashCorrect(slot, hash) + assert.Equal(t, BankhashNeedWait, result.Status, "no winner + before timeout = NeedWait") +} + +// TestNoSupermajorityPartialVotesAfterTimeout verifies NoSupermajority when +// partial votes exist but the timeout has expired without a winner. 
+func TestNoSupermajorityPartialVotesAfterTimeout(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + + slot := uint64(10) + hash := solana.Hash{0xAA} + + service.state.mu.Lock() + acc := newSlotVoteAccumulator(totalStake, slot) + for i := 0; i < 30; i++ { + var pk [32]byte + pk[0] = byte(i + 1) + acc.addVote(hash, solana.PublicKeyFromBytes(pk[:]), 1) + } + service.state.voteStakeTotals[slot] = acc + service.state.latestSlotIngested = slot + VoteConfirmationTimeoutSlots + service.state.mu.Unlock() + + result := service.IsBankhashCorrect(slot, hash) + assert.Equal(t, BankhashNoSupermajority, result.Status, "no winner + after timeout = NoSupermajority") + assert.Equal(t, uint64(30), result.StakeForHash) + assert.Equal(t, computeThresholdStake(totalStake), result.ThresholdStake) +} + +// TestEarlyMismatchBeforeTimeout verifies that a bankhash mismatch is surfaced +// immediately when a different hash wins supermajority, even before timeout. +func TestEarlyMismatchBeforeTimeout(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + + slot := uint64(10) + winnerHash := solana.Hash{0xBB} + ourHash := solana.Hash{0xAA} + + // Inject supermajority for winnerHash, also add some stake for ourHash. + service.state.mu.Lock() + acc := newSlotVoteAccumulator(totalStake, slot) + for i := 0; i < 67; i++ { + var pk [32]byte + pk[0] = byte(i + 1) + pk[1] = byte((i + 1) >> 8) + acc.addVote(winnerHash, solana.PublicKeyFromBytes(pk[:]), 1) + } + // Add 10 stake for our hash (below threshold). 
+ for i := 0; i < 10; i++ { + var pk [32]byte + pk[0] = byte(i + 100) + acc.addVote(ourHash, solana.PublicKeyFromBytes(pk[:]), 1) + } + service.state.voteStakeTotals[slot] = acc + service.state.latestSlotIngested = slot + 5 // Well below timeout + service.state.mu.Unlock() + + result := service.IsBankhashCorrect(slot, ourHash) + assert.Equal(t, BankhashNoSupermajority, result.Status, "mismatch surfaced immediately") + assert.Equal(t, winnerHash, result.WinningHash, "winning hash should be the other hash") + assert.Equal(t, uint64(10), result.StakeForHash, "our hash stake") + assert.Equal(t, uint64(67), result.WinningStake, "winner stake") +} + +// TestGetSupermajorityHashEarlyConfirmation verifies that GetSupermajorityHash +// returns the winner immediately, before the timeout window. +func TestGetSupermajorityHashEarlyConfirmation(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + + slot := uint64(10) + winnerHash := solana.Hash{0xAA} + + service.state.mu.Lock() + acc := newSlotVoteAccumulator(totalStake, slot) + for i := 0; i < 67; i++ { + var pk [32]byte + pk[0] = byte(i + 1) + pk[1] = byte((i + 1) >> 8) + acc.addVote(winnerHash, solana.PublicKeyFromBytes(pk[:]), 1) + } + service.state.voteStakeTotals[slot] = acc + service.state.latestSlotIngested = slot + 5 // Below timeout + service.state.mu.Unlock() + + hash, status := service.GetSupermajorityHash(slot) + assert.Equal(t, BankhashHasSupermajority, status, "should return winner immediately") + assert.Equal(t, winnerHash, hash) +} + +// TestGetSupermajorityHashNeedWaitBeforeTimeout verifies that GetSupermajorityHash +// returns NeedWait when no winner exists and the timeout hasn't expired. 
+func TestGetSupermajorityHashNeedWaitBeforeTimeout(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + + slot := uint64(10) + + // Partial votes, no winner. + service.state.mu.Lock() + acc := newSlotVoteAccumulator(totalStake, slot) + var pk [32]byte + pk[0] = 1 + acc.addVote(solana.Hash{0xAA}, solana.PublicKeyFromBytes(pk[:]), 30) + service.state.voteStakeTotals[slot] = acc + service.state.latestSlotIngested = slot + 10 // Below timeout + service.state.mu.Unlock() + + hash, status := service.GetSupermajorityHash(slot) + assert.Equal(t, BankhashNeedWait, status, "no winner + before timeout = NeedWait") + assert.Equal(t, solana.Hash{}, hash) +} + +// TestGetSupermajorityHashNoSupermajorityAfterTimeout verifies that +// GetSupermajorityHash returns NoSupermajority when partial votes exist +// but the timeout has expired without a winner. +func TestGetSupermajorityHashNoSupermajorityAfterTimeout(t *testing.T) { + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochStakes := map[solana.PublicKey]uint64{} + totalStake := uint64(100) + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + + slot := uint64(10) + + // Partial votes, no winner, timeout expired. 
+ service.state.mu.Lock() + acc := newSlotVoteAccumulator(totalStake, slot) + var pk [32]byte + pk[0] = 1 + acc.addVote(solana.Hash{0xAA}, solana.PublicKeyFromBytes(pk[:]), 30) + service.state.voteStakeTotals[slot] = acc + service.state.latestSlotIngested = slot + VoteConfirmationTimeoutSlots + service.state.mu.Unlock() + + hash, status := service.GetSupermajorityHash(slot) + assert.Equal(t, BankhashNoSupermajority, status, "no winner + after timeout = NoSupermajority") + assert.Equal(t, solana.Hash{}, hash) +} + +// TestSubmitBlockCapturesEpochDataBeforeUpdateEpoch verifies that a block +// submitted before UpdateEpoch is processed with the epoch data that was +// current at submission time, not the post-update data. This prevents +// pre-boundary blocks queued before an epoch transition from being parsed +// and weighted with post-boundary stakes/voters. +func TestSubmitBlockCapturesEpochDataBeforeUpdateEpoch(t *testing.T) { + voterKey := solana.PublicKey{1} + voteAcct := solana.PublicKey{2} + + // Epoch 0: voterKey authorized for voteAcct, stake=50, total=100. + epoch0Auth := epochstakes.NewEpochAuthorizedVotersCache() + epoch0Auth.PutEntry(voteAcct, voterKey) + epoch0Stakes := map[solana.PublicKey]uint64{voteAcct: 50} + epoch0Total := uint64(100) + + // Epoch 1: voterKey NOT authorized, different total. + epoch1Auth := epochstakes.NewEpochAuthorizedVotersCache() + epoch1Stakes := map[solana.PublicKey]uint64{voteAcct: 75} + epoch1Total := uint64(200) + + service := NewForkChoiceService(0, epoch0Stakes, epoch0Total, epoch0Auth) + + + // Build a vote tx: voterKey votes for slot 50 with hash {0xBB}. + votedSlot := uint64(50) + votedHash := solana.Hash{0xBB} + voteTx := buildTestVoteTx(voteAcct, voterKey, votedSlot, votedHash) + + // Submit the block — captures epoch 0 data in the job. + // Service is NOT started, so the job sits in the channel. + service.SubmitBlock(100, []*solana.Transaction{voteTx}) + + // Update to epoch 1 BEFORE the job is processed. 
+ service.UpdateEpoch(1, epoch1Stakes, epoch1Total, epoch1Auth) + + // Drain the job and verify it carries epoch 0 data. + job := <-service.jobChan + assert.Equal(t, epoch0Total, job.totalEpochStake, "job should carry epoch 0 total stake") + + // Process the job — should use epoch 0 data from the job. + service.processBlock(job) + + // The vote should have been accepted (voterKey authorized in epoch 0), + // and the accumulator should use epoch 0's total stake for threshold. + service.state.mu.Lock() + acc, exists := service.state.voteStakeTotals[votedSlot] + service.state.mu.Unlock() + + assert.True(t, exists, "accumulator should exist — vote authorized in epoch 0") + assert.Equal(t, epoch0Total, acc.totalEpochStake, "accumulator threshold should use epoch 0 total") + assert.Equal(t, computeThresholdStake(epoch0Total), acc.thresholdStake) + assert.Equal(t, uint64(50), acc.stakeForHash(votedHash), "vote weighted with epoch 0 stake") + + // Verify the converse: if the job had used epoch 1 data, the vote would + // have been rejected (voterKey not in epoch1Auth) and no accumulator created. + // The existence of the accumulator with epoch 0 weights proves the snapshot. +} + +// TestSequentialProcessingDeterminesWinner verifies that block processing order +// determines winner selection when two competing hashes can both independently +// cross the 2/3 threshold. This is a regression test for the ordering fix that +// replaced the concurrent ants pool with sequential inline processing. +// +// The test exercises the full service path: Start() → SubmitBlock() → run() → +// processBlock() → Stop(). Channel FIFO ordering guarantees block 100 is +// processed before block 101, so hashX always crosses the threshold first. +// +// With the old ants pool, thread scheduling determined which block's votes +// applied first, making the winner non-deterministic. 
+func TestSequentialProcessingDeterminesWinner(t *testing.T) { + // Two voters, each with enough stake to independently cross the threshold. + voterA := solana.PublicKey{1} + voteAcctA := solana.PublicKey{2} + voterB := solana.PublicKey{3} + voteAcctB := solana.PublicKey{4} + + totalStake := uint64(100) // threshold = uint64(100 * 2/3) = 66 + + epochAuth := epochstakes.NewEpochAuthorizedVotersCache() + epochAuth.PutEntry(voteAcctA, voterA) + epochAuth.PutEntry(voteAcctB, voterB) + + epochStakes := map[solana.PublicKey]uint64{ + voteAcctA: 67, // > threshold of 66 + voteAcctB: 67, + } + + service := NewForkChoiceService(0, epochStakes, totalStake, epochAuth) + service.Start() + + // Both voters vote for the same slot but different hashes. + votedSlot := uint64(50) + hashX := solana.Hash{0xAA} + hashY := solana.Hash{0xBB} + + // Block at slot 100: voterA votes for (slot 50, hashX) — submitted first. + txA := buildTestVoteTx(voteAcctA, voterA, votedSlot, hashX) + service.SubmitBlock(100, []*solana.Transaction{txA}) + + // Block at slot 101: voterB votes for (slot 50, hashY) — submitted second. + txB := buildTestVoteTx(voteAcctB, voterB, votedSlot, hashY) + service.SubmitBlock(101, []*solana.Transaction{txB}) + + // Stop drains all queued jobs before returning. + service.Stop() + + // latestSlotIngested should have been advanced by processBlock (not manual). + service.state.mu.Lock() + latestIngested := service.state.latestSlotIngested + service.state.mu.Unlock() + assert.Equal(t, uint64(101), latestIngested, "watermark should advance to 101") + + // hashX must win because block 100 was processed first (channel FIFO). + resultX := service.IsBankhashCorrect(votedSlot, hashX) + assert.Equal(t, BankhashHasSupermajority, resultX.Status, "hashX should have supermajority") + assert.Equal(t, hashX, resultX.WinningHash) + + // hashY loses — a different hash already won supermajority. 
+ resultY := service.IsBankhashCorrect(votedSlot, hashY) + assert.Equal(t, BankhashNoSupermajority, resultY.Status, "hashY should be NoSupermajority (mismatch)") + assert.Equal(t, hashX, resultY.WinningHash, "winning hash should still be hashX") +} + +// buildTestVoteTx constructs a minimal valid vote transaction for testing. +// The tx passes IsVote(), IsSigner(authority), and parseAndValidateVoteTx(). +func buildTestVoteTx(voteAcct, voteAuthority solana.PublicKey, slot uint64, hash solana.Hash) *solana.Transaction { + // Encode VoteProgramInstrTypeVote (type=2): + // [type:4][num_slots:8][slot:8][hash:32][timestamp_opt:1] = 53 bytes + data := make([]byte, 53) + binary.LittleEndian.PutUint32(data[0:4], 2) // VoteProgramInstrTypeVote + binary.LittleEndian.PutUint64(data[4:12], 1) // 1 slot (Rust Vec len is u64) + binary.LittleEndian.PutUint64(data[12:20], slot) + copy(data[20:52], hash[:]) + data[52] = 0 // No timestamp + + return &solana.Transaction{ + Message: solana.Message{ + Header: solana.MessageHeader{ + NumRequiredSignatures: 2, + }, + AccountKeys: []solana.PublicKey{voteAcct, voteAuthority, solana.VoteProgramID}, + Instructions: []solana.CompiledInstruction{ + { + ProgramIDIndex: 2, + Accounts: []uint16{0, 1}, + Data: solana.Base58(data), + }, + }, + }, + Signatures: []solana.Signature{{}, {}}, + } +} diff --git a/pkg/forkchoice/skip_path.go b/pkg/forkchoice/skip_path.go new file mode 100644 index 00000000..b63cb06e --- /dev/null +++ b/pkg/forkchoice/skip_path.go @@ -0,0 +1,119 @@ +package forkchoice + +import ( + "errors" + "fmt" + + "github.com/gagliardetto/solana-go" +) + +var ( + ErrDepthExceeded = errors.New("skip path: depth exceeded max allowed") + ErrNoPath = errors.New("skip path: no valid path found to target hash") +) + +// SlotCandidate represents a candidate block at a specific slot. 
+type SlotCandidate struct { + Slot uint64 + HasBlock bool + Blockhash solana.Hash // computed bankhash after executing this slot's block + LastBlockhash solana.Hash // expected parent bankhash (block.LastBlockhash) +} + +// SolveResult contains the resolved skip path. +// Path[i] corresponds to slot (startSlot + i): +// +// true → use the block at that slot +// false → slot is empty/skipped +type SolveResult struct { + Path []bool + MatchedHash solana.Hash +} + +// candidate represents one possible bankhash state and the decisions that led there. +type candidate struct { + hash solana.Hash + path []bool +} + +// SkipPath finds a sequence of empty/block decisions from startSlot to endSlot +// such that, starting from prevBankhash (the parent bankhash at startSlot-1), +// the bankhash state at endSlot equals targetHash. +// +// This solver operates over executed candidate results, not raw shreds. +// A skipped slot leaves the bankhash state unchanged in the current verifier +// model. A block candidate can only advance the state if its LastBlockhash +// matches the current bankhash. +// +// The solver uses BFS with hash-dedup at each depth to prevent state explosion. +// Since empty slots are identity on bankhash state, consecutive skips collapse +// to a single state. +func SkipPath( + startSlot uint64, + endSlot uint64, + prevBankhash solana.Hash, + candidates map[uint64]*SlotCandidate, + targetHash solana.Hash, + maxDepth int, +) (*SolveResult, error) { + if endSlot < startSlot { + return nil, fmt.Errorf("skip path: endSlot %d < startSlot %d", endSlot, startSlot) + } + + depth := endSlot - startSlot + 1 + if int(depth) > maxDepth { + return nil, ErrDepthExceeded + } + + // Start with a single candidate at prevBankhash with no decisions. + current := []candidate{ + {hash: prevBankhash, path: nil}, + } + + for slot := startSlot; slot <= endSlot; slot++ { + cand := candidates[slot] + + // Deduplicate next states by bankhash. 
Two paths reaching the same + // bankhash state at the same slot produce identical results going forward. + seen := make(map[solana.Hash]struct{}) + var next []candidate + + for _, c := range current { + // Branch 1: slot is empty (skipped). Bankhash state unchanged. + emptyHash := c.hash + if _, dup := seen[emptyHash]; !dup { + seen[emptyHash] = struct{}{} + newPath := make([]bool, len(c.path)+1) + copy(newPath, c.path) + newPath[len(c.path)] = false + next = append(next, candidate{hash: emptyHash, path: newPath}) + } + + // Branch 2: slot has a block AND block's parent bankhash matches our state. + if cand != nil && cand.HasBlock && cand.LastBlockhash == c.hash { + blockHash := cand.Blockhash + if _, dup := seen[blockHash]; !dup { + seen[blockHash] = struct{}{} + newPath := make([]bool, len(c.path)+1) + copy(newPath, c.path) + newPath[len(c.path)] = true + next = append(next, candidate{hash: blockHash, path: newPath}) + } + } + } + + current = next + } + + // Look for a candidate whose final bankhash equals targetHash. 
+ for _, c := range current { + if c.hash == targetHash { + return &SolveResult{ + Path: c.path, + MatchedHash: targetHash, + }, nil + } + } + + return nil, ErrNoPath +} diff --git a/pkg/forkchoice/skip_path_test.go b/pkg/forkchoice/skip_path_test.go new file mode 100644 index 00000000..bfbc62a4 --- /dev/null +++ b/pkg/forkchoice/skip_path_test.go @@ -0,0 +1,141 @@ +package forkchoice + +import ( + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func hash(b byte) solana.Hash { + return solana.Hash{b} +} + +func TestSkipPathSingleBlock(t *testing.T) { + // prevBankhash → executed slot 10 candidate → targetHash + prevBankhash := hash(0x01) + blockHash := hash(0x02) + + candidates := map[uint64]*SlotCandidate{ + 10: {Slot: 10, HasBlock: true, Blockhash: blockHash, LastBlockhash: prevBankhash}, + } + + result, err := SkipPath(10, 10, prevBankhash, candidates, blockHash, 64) + require.NoError(t, err) + assert.Equal(t, []bool{true}, result.Path) + assert.Equal(t, blockHash, result.MatchedHash) +} + +func TestSkipPathMultipleBlocks(t *testing.T) { + // prevBankhash → executed slot 10 → executed slot 12 → targetHash + // Slots 11 is empty (skipped) + prevBankhash := hash(0x01) + block10Hash := hash(0x02) + block12Hash := hash(0x03) + + candidates := map[uint64]*SlotCandidate{ + 10: {Slot: 10, HasBlock: true, Blockhash: block10Hash, LastBlockhash: prevBankhash}, + 12: {Slot: 12, HasBlock: true, Blockhash: block12Hash, LastBlockhash: block10Hash}, + } + + result, err := SkipPath(10, 12, prevBankhash, candidates, block12Hash, 64) + require.NoError(t, err) + assert.Equal(t, []bool{true, false, true}, result.Path) + assert.Equal(t, block12Hash, result.MatchedHash) +} + +func TestSkipPathNoPathExists(t *testing.T) { + prevBankhash := hash(0x01) + wrongTarget := hash(0xFF) + + candidates := map[uint64]*SlotCandidate{ + 10: {Slot: 10, HasBlock: true, Blockhash: hash(0x02), LastBlockhash: 
prevBankhash}, + } + + _, err := SkipPath(10, 10, prevBankhash, candidates, wrongTarget, 64) + assert.ErrorIs(t, err, ErrNoPath) +} + +func TestSkipPathDepthExceeded(t *testing.T) { + prevBankhash := hash(0x01) + target := hash(0x02) + + _, err := SkipPath(0, 100, prevBankhash, nil, target, 64) + assert.ErrorIs(t, err, ErrDepthExceeded) +} + +func TestSkipPathEmptyOnlyPath(t *testing.T) { + // All slots skipped: target == prevBankhash + prevBankhash := hash(0x01) + + result, err := SkipPath(10, 13, prevBankhash, nil, prevBankhash, 64) + require.NoError(t, err) + assert.Equal(t, []bool{false, false, false, false}, result.Path) +} + +func TestSkipPathMixedEmptyAndBlock(t *testing.T) { + // slots: 10(empty), 11(block), 12(empty), 13(block) + prevBankhash := hash(0x01) + block11Hash := hash(0x02) + block13Hash := hash(0x03) + + candidates := map[uint64]*SlotCandidate{ + 11: {Slot: 11, HasBlock: true, Blockhash: block11Hash, LastBlockhash: prevBankhash}, + 13: {Slot: 13, HasBlock: true, Blockhash: block13Hash, LastBlockhash: block11Hash}, + } + + result, err := SkipPath(10, 13, prevBankhash, candidates, block13Hash, 64) + require.NoError(t, err) + assert.Equal(t, []bool{false, true, false, true}, result.Path) +} + +func TestSkipPathBlockLastBlockhashMismatch(t *testing.T) { + // Block exists but its LastBlockhash doesn't match prevBankhash — can't use it + prevBankhash := hash(0x01) + wrongParent := hash(0xFF) + + candidates := map[uint64]*SlotCandidate{ + 10: {Slot: 10, HasBlock: true, Blockhash: hash(0x02), LastBlockhash: wrongParent}, + } + + // Target is prevBankhash (all empty), which should work + result, err := SkipPath(10, 10, prevBankhash, candidates, prevBankhash, 64) + require.NoError(t, err) + assert.Equal(t, []bool{false}, result.Path, "should skip the block with mismatched parent") +} + +func TestSkipPathDedupCollapsesStates(t *testing.T) { + // Multiple consecutive empty slots should not cause state explosion. 
+ // 60 slots, all empty, target = prevBankhash + prevBankhash := hash(0x01) + + result, err := SkipPath(0, 59, prevBankhash, nil, prevBankhash, 64) + require.NoError(t, err) + assert.Len(t, result.Path, 60) + for _, decision := range result.Path { + assert.False(t, decision) + } +} + +func TestSkipPathEndSlotBeforeStartSlot(t *testing.T) { + _, err := SkipPath(10, 5, hash(0x01), nil, hash(0x01), 64) + assert.Error(t, err) + assert.Contains(t, err.Error(), "endSlot 5 < startSlot 10") +} + +func TestSkipPathChooseBlockOverEmpty(t *testing.T) { + // When both paths work (block.Blockhash == prevBankhash would mean empty + // and block both reach same state), the solver should still find a valid path. + // This tests deterministic behavior. + prevBankhash := hash(0x01) + target := hash(0x02) + + candidates := map[uint64]*SlotCandidate{ + 10: {Slot: 10, HasBlock: true, Blockhash: target, LastBlockhash: prevBankhash}, + } + + result, err := SkipPath(10, 10, prevBankhash, candidates, target, 64) + require.NoError(t, err) + assert.Equal(t, []bool{true}, result.Path) +} diff --git a/pkg/forkchoice/vote_parser.go b/pkg/forkchoice/vote_parser.go index cd839006..01e1271b 100644 --- a/pkg/forkchoice/vote_parser.go +++ b/pkg/forkchoice/vote_parser.go @@ -1,6 +1,7 @@ package forkchoice import ( + "github.com/Overclock-Validator/mithril/pkg/epochstakes" "github.com/Overclock-Validator/mithril/pkg/sealevel" bin "github.com/gagliardetto/binary" "github.com/gagliardetto/solana-go" @@ -13,7 +14,9 @@ type voteInfo struct { votePubkey solana.PublicKey } -func (f *forkChoiceState) parseAndValidateVoteTxForBankhashAndSlot(tx *solana.Transaction) (*voteInfo, bool) { +// parseAndValidateVoteTx validates a vote transaction against the given authorized +// voters cache. Accepts the cache as a parameter to avoid racing with epoch updates. 
+func parseAndValidateVoteTx(tx *solana.Transaction, authorizedVoters *epochstakes.EpochAuthorizedVotersCache) (*voteInfo, bool) { if len(tx.Message.Instructions) < 1 { return nil, false } @@ -26,7 +29,10 @@ func (f *forkChoiceState) parseAndValidateVoteTxForBankhashAndSlot(tx *solana.Tr votePubkey := tx.Message.AccountKeys[instr.Accounts[0]] voteAuthority := tx.Message.AccountKeys[instr.Accounts[1]] - if !(tx.IsSigner(voteAuthority) && f.epochAuthorizedVoters.IsAuthorizedVoter(votePubkey, voteAuthority)) { + if authorizedVoters == nil { + return nil, false + } + if !(tx.IsSigner(voteAuthority) && authorizedVoters.IsAuthorizedVoter(votePubkey, voteAuthority)) { return nil, false } diff --git a/pkg/forkchoice/vote_stake_accumulator.go b/pkg/forkchoice/vote_stake_accumulator.go index eab77a4e..ba1a67d3 100644 --- a/pkg/forkchoice/vote_stake_accumulator.go +++ b/pkg/forkchoice/vote_stake_accumulator.go @@ -4,26 +4,115 @@ import ( "github.com/gagliardetto/solana-go" ) -type voteStakeAccumulator struct { - stakePerHash map[solana.Hash]uint64 - supermajorityStake uint64 - slot uint64 -} - -func newVoteStakeAccumulator(totalStake uint64, slot uint64) *voteStakeAccumulator { - return &voteStakeAccumulator{ - stakePerHash: make(map[solana.Hash]uint64), - // calculate supermajority stake weight threshold (2/3) - supermajorityStake: (totalStake*2)/3 + 1, - slot: slot, +// VoteThresholdSize matches Agave's VOTE_THRESHOLD_SIZE = 2f64 / 3f64. +// See: agave/runtime/src/commitment.rs:9 +const VoteThresholdSize = 2.0 / 3.0 + +// computeThresholdStake computes the threshold using Agave's exact formula: +// +// threshold_stake = (total_stake as f64 * threshold) as u64 +// +// Supermajority is reached when accumulated stake exceeds this value. 
+// See: agave/core/src/consensus/vote_stake_tracker.rs:30 +func computeThresholdStake(totalStake uint64) uint64 { + return uint64(float64(totalStake) * VoteThresholdSize) +} + +// voteStakeTracker tracks per-pubkey vote stake for a single (slot, hash) pair. +// Equivalent to Agave's VoteStakeTracker. +// See: agave/core/src/consensus/vote_stake_tracker.rs +type voteStakeTracker struct { + voted map[solana.PublicKey]struct{} + stake uint64 +} + +// slotVoteAccumulator tracks all hash trackers for a single slot. +// Equivalent to Agave's SlotVoteTracker.optimistic_votes_tracker. +// See: agave/core/src/cluster_info_vote_listener.rs:75 +type slotVoteAccumulator struct { + trackers map[solana.Hash]*voteStakeTracker + totalEpochStake uint64 + thresholdStake uint64 + slot uint64 + confirmed bool + confirmedHash solana.Hash +} + +func newSlotVoteAccumulator(totalEpochStake uint64, slot uint64) *slotVoteAccumulator { + return &slotVoteAccumulator{ + trackers: make(map[solana.Hash]*voteStakeTracker), + totalEpochStake: totalEpochStake, + thresholdStake: computeThresholdStake(totalEpochStake), + slot: slot, } } -func (accumulator *voteStakeAccumulator) add(hash solana.Hash, stake uint64) { - accumulator.stakePerHash[hash] += stake +// addVote records a vote for the given hash by votePubkey with the given stake. +// Returns (thresholdCrossed, isNew). +// +// thresholdCrossed is true only on the exact vote that causes the hash to cross +// the 2/3 threshold. Uses Agave crossing semantics: +// +// old_stake <= threshold_stake && threshold_stake < new_stake +// +// isNew is true if this pubkey had not previously voted for this (slot, hash). +// Dedup is per (slot, hash, pubkey) — same pubkey can vote for different hashes. 
+// +// See: agave/core/src/consensus/vote_stake_tracker.rs:14-37 +func (acc *slotVoteAccumulator) addVote(hash solana.Hash, votePubkey solana.PublicKey, stake uint64) (thresholdCrossed bool, isNew bool) { + tracker, exists := acc.trackers[hash] + if !exists { + tracker = &voteStakeTracker{ + voted: make(map[solana.PublicKey]struct{}), + } + acc.trackers[hash] = tracker + } + + if _, alreadyVoted := tracker.voted[votePubkey]; alreadyVoted { + return false, false + } + + tracker.voted[votePubkey] = struct{}{} + oldStake := tracker.stake + newStake := oldStake + stake + tracker.stake = newStake + + crossed := oldStake <= acc.thresholdStake && acc.thresholdStake < newStake + if crossed && !acc.confirmed { + acc.confirmed = true + acc.confirmedHash = hash + } + + return crossed, true +} + +// hasSupermajority returns true if any hash for this slot has reached 2/3 supermajority. +func (acc *slotVoteAccumulator) hasSupermajority() bool { + return acc.confirmed } -func (accumulator *voteStakeAccumulator) hashHasSupermajority(hash solana.Hash) bool { - stake := accumulator.stakePerHash[hash] - return stake >= accumulator.supermajorityStake +// hashHasSupermajority returns true if the given specific hash has crossed the 2/3 threshold. +func (acc *slotVoteAccumulator) hashHasSupermajority(hash solana.Hash) bool { + tracker, exists := acc.trackers[hash] + if !exists { + return false + } + return tracker.stake > acc.thresholdStake +} + +// winningHash returns the hash that crossed the threshold, if any. +func (acc *slotVoteAccumulator) winningHash() (solana.Hash, bool) { + if !acc.confirmed { + return solana.Hash{}, false + } + return acc.confirmedHash, true +} + +// stakeForHash returns the accumulated stake for a specific hash. 
+func (acc *slotVoteAccumulator) stakeForHash(hash solana.Hash) uint64 { + tracker, exists := acc.trackers[hash] + if !exists { + return 0 + } + return tracker.stake } diff --git a/pkg/forkchoice/vote_stake_accumulator_test.go b/pkg/forkchoice/vote_stake_accumulator_test.go new file mode 100644 index 00000000..13744dba --- /dev/null +++ b/pkg/forkchoice/vote_stake_accumulator_test.go @@ -0,0 +1,128 @@ +package forkchoice + +import ( + "testing" + + "github.com/gagliardetto/solana-go" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func makePubkey(id byte) solana.PublicKey { + var pk [32]byte + pk[0] = id + return solana.PublicKeyFromBytes(pk[:]) +} + +func TestComputeThresholdStake(t *testing.T) { + // Match Agave's formula: uint64(float64(total) * 2.0/3.0) + assert.Equal(t, uint64(6), computeThresholdStake(10)) + assert.Equal(t, uint64(66), computeThresholdStake(100)) + assert.Equal(t, uint64(0), computeThresholdStake(0)) + assert.Equal(t, uint64(0), computeThresholdStake(1)) + // total=3: int(3 * 0.666...) = int(2.0) = 2 + assert.Equal(t, uint64(2), computeThresholdStake(3)) +} + +// TestThresholdCrossingSmallTotal mirrors Agave's test_add_vote_pubkey: +// total=10, 10 voters each with stake=1, threshold crosses at i=6 (stake=7 > threshold=6). 
+func TestThresholdCrossingSmallTotal(t *testing.T) { + acc := newSlotVoteAccumulator(10, 42) + hash := solana.Hash{1} + + for i := 0; i < 10; i++ { + pubkey := makePubkey(byte(i + 1)) + crossed, isNew := acc.addVote(hash, pubkey, 1) + assert.True(t, isNew, "vote %d should be new", i) + + // Agave: at i=6, stake goes from 6 to 7, crossing threshold of 6 + if i == 6 { + assert.True(t, crossed, "threshold should cross at i=6 (stake=7)") + } else { + assert.False(t, crossed, "threshold should NOT cross at i=%d", i) + } + } + + assert.True(t, acc.hasSupermajority()) + assert.True(t, acc.hashHasSupermajority(hash)) + assert.Equal(t, uint64(10), acc.stakeForHash(hash)) +} + +func TestDuplicateVotePubkeyDoesNotIncreaseStake(t *testing.T) { + acc := newSlotVoteAccumulator(10, 42) + hash := solana.Hash{1} + pubkey := makePubkey(1) + + _, isNew1 := acc.addVote(hash, pubkey, 5) + assert.True(t, isNew1) + assert.Equal(t, uint64(5), acc.stakeForHash(hash)) + + crossed2, isNew2 := acc.addVote(hash, pubkey, 5) + assert.False(t, isNew2, "duplicate vote should not be new") + assert.False(t, crossed2, "duplicate vote should not cross threshold") + assert.Equal(t, uint64(5), acc.stakeForHash(hash), "stake should not increase on duplicate") +} + +func TestTwoHashesSameSlot(t *testing.T) { + acc := newSlotVoteAccumulator(10, 42) + hashA := solana.Hash{0xAA} + hashB := solana.Hash{0xBB} + + // 7 voters for hashA + for i := 0; i < 7; i++ { + acc.addVote(hashA, makePubkey(byte(i+1)), 1) + } + // 3 voters for hashB + for i := 0; i < 3; i++ { + acc.addVote(hashB, makePubkey(byte(i+100)), 1) + } + + assert.True(t, acc.hashHasSupermajority(hashA), "hashA should have supermajority") + assert.False(t, acc.hashHasSupermajority(hashB), "hashB should NOT have supermajority") + + winning, ok := acc.winningHash() + require.True(t, ok) + assert.Equal(t, hashA, winning) +} + +func TestNoThresholdReached(t *testing.T) { + acc := newSlotVoteAccumulator(10, 42) + hash := solana.Hash{1} + + // Only 3 
stake — not enough for threshold of 6 + for i := 0; i < 3; i++ { + acc.addVote(hash, makePubkey(byte(i+1)), 1) + } + + assert.False(t, acc.hasSupermajority()) + assert.False(t, acc.hashHasSupermajority(hash)) + _, ok := acc.winningHash() + assert.False(t, ok) +} + +// TestDedupAcrossDifferentHashes verifies that dedup is per (slot, hash, pubkey), +// matching Agave behavior. Same pubkey voting for different hashes should count in both. +func TestDedupAcrossDifferentHashes(t *testing.T) { + acc := newSlotVoteAccumulator(10, 42) + hashA := solana.Hash{0xAA} + hashB := solana.Hash{0xBB} + pubkey := makePubkey(1) + + _, isNew1 := acc.addVote(hashA, pubkey, 5) + _, isNew2 := acc.addVote(hashB, pubkey, 5) + + assert.True(t, isNew1) + assert.True(t, isNew2, "same pubkey voting for different hash should be new") + assert.Equal(t, uint64(5), acc.stakeForHash(hashA)) + assert.Equal(t, uint64(5), acc.stakeForHash(hashB)) +} + +func TestStakeForNonexistentHash(t *testing.T) { + acc := newSlotVoteAccumulator(10, 42) + assert.Equal(t, uint64(0), acc.stakeForHash(solana.Hash{0xFF})) +} + +func TestHashHasSupermajorityNonexistentHash(t *testing.T) { + acc := newSlotVoteAccumulator(10, 42) + assert.False(t, acc.hashHasSupermajority(solana.Hash{0xFF})) +} diff --git a/pkg/global/global_ctx.go b/pkg/global/global_ctx.go index a2fcc376..e93631f6 100644 --- a/pkg/global/global_ctx.go +++ b/pkg/global/global_ctx.go @@ -71,11 +71,15 @@ func SetForkChoice(forkChoice *forkchoice.ForkChoiceService) { instance.forkChoice = forkChoice } +func HasForkChoice() bool { + return instance.forkChoice != nil +} + func SubmitBlockToForkChoiceService(slot uint64, txs []*solana.Transaction) { instance.forkChoice.SubmitBlock(slot, txs) } -func BankhashConfirmedForSlot(slot uint64, bankHash solana.Hash) int { +func BankhashConfirmedForSlot(slot uint64, bankHash solana.Hash) forkchoice.BankhashResult { return instance.forkChoice.IsBankhashCorrect(slot, bankHash) } @@ -103,6 +107,12 @@ func 
EpochAuthorizedVoters() *epochstakes.EpochAuthorizedVotersCache { return instance.epochAuthorizedVoters } +// SetEpochAuthorizedVoters replaces the entire authorized voters cache. +// Called at epoch boundaries after rebuilding from vote accounts. +func SetEpochAuthorizedVoters(cache *epochstakes.EpochAuthorizedVotersCache) { + instance.epochAuthorizedVoters = cache +} + func PutSlotConfirmed(slot uint64) { if instance.slotsConfirmed == nil { instance.slotsConfirmed = make(map[uint64]struct{}) diff --git a/pkg/replay/block.go b/pkg/replay/block.go index 8f2e4433..02853f11 100644 --- a/pkg/replay/block.go +++ b/pkg/replay/block.go @@ -4,6 +4,7 @@ import ( "context" "crypto/rand" "encoding/base64" "encoding/hex" "encoding/json" + "errors" "fmt" @@ -29,6 +30,7 @@ import ( "github.com/Overclock-Validator/mithril/pkg/blockstream" "github.com/Overclock-Validator/mithril/pkg/features" "github.com/Overclock-Validator/mithril/pkg/fees" + "github.com/Overclock-Validator/mithril/pkg/forkchoice" "github.com/Overclock-Validator/mithril/pkg/global" "github.com/Overclock-Validator/mithril/pkg/lthash" "github.com/Overclock-Validator/mithril/pkg/metrics" @@ -63,6 +65,14 @@ type BlockFetchOpts struct { NearTipLookahead int // Slots ahead to schedule in near-tip, 0 = use default } +// ConsensusOpts contains vote-anchored consensus configuration. +// Nil means use defaults (max_depth=64, policy="halt"). 
+type ConsensusOpts struct { + SkipPathMaxDepth int // Max slots for skip-path solver (default: 64) + UnresolvedPolicy string // "halt" or "warn" (default: "halt") + EnforceOnSource string // "lightbringer" or "all" (default: "lightbringer") +} + var SerializedParameterArena *arena.Arena[byte] // Commit state tracking for panic recovery @@ -1200,6 +1210,7 @@ func ReplayBlocks( metricsWriter io.Writer, rpcServer *rpcserver.RpcServer, blockFetchOpts *BlockFetchOpts, + consensusOpts *ConsensusOpts, // nil = use defaults (max_depth=64, policy="halt") onCancelWriteState OnCancelWriteState, // callback to write state immediately on cancellation (can be nil) ) *ReplayResult { result := &ReplayResult{} @@ -1291,6 +1302,25 @@ func ReplayBlocks( result.Error = fmt.Errorf("missing required epoch stakes for current epoch %d - cannot resume (need fresh snapshot)", currentEpoch) return result } + // Load EpochAuthorizedVoters from state file (required for forkchoice vote parsing). + // buildInitialEpochStakesCache loads these, but this path skips that function. 
+ if len(mithrilState.ManifestEpochAuthorizedVoters) > 0 { + for voteAcctStr, authorizedVoterStrs := range mithrilState.ManifestEpochAuthorizedVoters { + voteAcct, vErr := base58.DecodeFromString(voteAcctStr) + if vErr != nil { + result.Error = fmt.Errorf("corrupted state file: failed to decode epoch_authorized_voters key %s: %w", voteAcctStr, vErr) + return result + } + for _, authorizedVoterStr := range authorizedVoterStrs { + authorizedVoter, vErr := base58.DecodeFromString(authorizedVoterStr) + if vErr != nil { + result.Error = fmt.Errorf("corrupted state file: failed to decode epoch_authorized_voters value %s: %w", authorizedVoterStr, vErr) + return result + } + global.PutEpochAuthorizedVoter(voteAcct, authorizedVoter) + } + } + } } else { // Resume in same epoch as snapshot, no boundaries crossed - state file epoch stakes still valid if err := buildInitialEpochStakesCache(mithrilState); err != nil { @@ -1305,9 +1335,46 @@ func ReplayBlocks( return result } } - //forkChoice, err := forkchoice.NewForkChoiceService(currentEpoch, global.EpochStakes(currentEpoch), global.EpochTotalStake(currentEpoch), global.EpochAuthorizedVoters(), 4) - //forkChoice.Start() - //global.SetForkChoice(forkChoice) + // Resolve consensus config defaults before forkchoice init so we can + // check whether enforcement requires authorized voters. + consensusMaxDepth := 64 + consensusPolicy := "halt" + consensusEnforceSource := "lightbringer" + if consensusOpts != nil { + if consensusOpts.SkipPathMaxDepth > 0 { + consensusMaxDepth = consensusOpts.SkipPathMaxDepth + } + if consensusOpts.UnresolvedPolicy != "" { + consensusPolicy = consensusOpts.UnresolvedPolicy + } + if consensusOpts.EnforceOnSource != "" { + consensusEnforceSource = consensusOpts.EnforceOnSource + } + } + + // Determine whether consensus enforcement is active for this block source. + // "lightbringer" = only enforce on Lightbringer blocks; "all" = enforce on all sources. 
+ consensusEnforceActive := (consensusEnforceSource == "all") || (consensusEnforceSource == "lightbringer" && useLightbringer) + + epochAuthVoters := global.EpochAuthorizedVoters() + if epochAuthVoters == nil { + // Without authorized voters, forkchoice can't parse votes → no supermajority → enforcement is blind. + // If consensus enforcement is active, this is a fatal misconfiguration. + if consensusEnforceActive && consensusPolicy == "halt" { + result.Error = fmt.Errorf("forkchoice: EpochAuthorizedVoters is nil — cannot enforce consensus without vote parsing (check snapshot/state file)") + return result + } + mlog.Log.Warnf("forkchoice: EpochAuthorizedVoters is nil — vote parsing will be skipped until populated") + } + forkChoice := forkchoice.NewForkChoiceService(currentEpoch, global.EpochStakes(currentEpoch), global.EpochTotalStake(currentEpoch), epochAuthVoters) + forkChoice.Start() + global.SetForkChoice(forkChoice) + defer forkChoice.Stop() + + // Instantiate the consensus coordinator for skip-path resolution and policy. + // The coordinator drives halt/warn policy and will be used for Lightbringer batch + // range resolution when that mode is fully activated. + consensusCoordinator := forkchoice.NewConsensusCoordinator(forkChoice, consensusMaxDepth, consensusPolicy) var statsCounter int var execTimes []float64 // seconds per block @@ -1325,6 +1392,18 @@ func ReplayBlocks( voteTxCounts = make([]uint64, 0, summaryInterval) nonVoteTxCounts = make([]uint64, 0, summaryInterval) + // Skip-path chain verification state: tracks processed blocks as candidates so that + // ResolveRange can retroactively verify the bankhash chain when supermajority confirms a slot. + // The "Blockhash" in candidates is the computed bankhash (not PoH blockhash), and + // "LastBlockhash" is the parent bankhash — bankhashes chain correctly through blocks. 
+ var candidateBlocks map[uint64]*forkchoice.SlotCandidate + var chainAnchorSlot uint64 + var chainAnchorHash solana.Hash + var chainAnchorSet bool + if consensusEnforceActive { + candidateBlocks = make(map[uint64]*forkchoice.SlotCandidate) + } + var opts *blockstream.BlockSourceOpts if useLightbringer { opts = &blockstream.BlockSourceOpts{ @@ -1371,6 +1450,40 @@ func ReplayBlocks( var skippedSlotsCount int // Track skipped slots for 100-slot summary replayStartLogged := false + // writeConsensusArtifact writes a best-effort JSON diagnostic artifact to the + // per-run consensus subdirectory. If the log dir is empty or any step fails, + // it logs a warning and continues — artifact failure must not crash replay. + writeConsensusArtifact := func(filename string, data map[string]interface{}) { + logDir := mlog.GetLogDir() + if logDir == "" { + return + } + dir := filepath.Join(logDir, "consensus") + if err := os.MkdirAll(dir, 0755); err != nil { + mlog.Log.Warnf("consensus artifact: failed to create directory %s: %v", dir, err) + return + } + artifactPath := filepath.Join(dir, filename) + artifactJSON, jsonErr := json.MarshalIndent(data, "", " ") + if jsonErr != nil { + mlog.Log.Warnf("consensus artifact: failed to marshal JSON for %s: %v", filename, jsonErr) + return + } + if writeErr := os.WriteFile(artifactPath, artifactJSON, 0644); writeErr != nil { + mlog.Log.Warnf("consensus artifact: failed to write %s: %v", artifactPath, writeErr) + return + } + mlog.Log.FileOnlyf("consensus artifact written: %s", artifactPath) + } + + // Forkchoice Lightbringer enforcement: track slots whose bankhash verification + // is deferred (votes haven't landed yet). Swept each iteration. 
+ type pendingBankhashCheck struct { + slot uint64 + bankhash solana.Hash + } + var pendingBankhashChecks []pendingBankhashCheck + for { // Start stall monitor goroutine (only after first block to avoid startup false positives) // Logs to file every second while waiting for a block @@ -1483,6 +1596,30 @@ func ReplayBlocks( partitionedRewardsInfo = handleEpochTransition(acctsDb, partitionedEpochRewardsEnabled, lastSlotCtx, replayCtx, epochSchedule, replayCtx.CurrentFeatures, block, currentEpoch) currentEpoch = block.Epoch justCrossedEpochBoundary = true + + // Refresh forkchoice with new epoch's stake weights and authorized voters + if global.HasForkChoice() { + forkChoice.UpdateEpoch( + currentEpoch, + global.EpochStakes(currentEpoch), + global.EpochTotalStake(currentEpoch), + global.EpochAuthorizedVoters(), + ) + } + + // Persist rebuilt authorized voters to state file so resume loads fresh data + if cache := global.EpochAuthorizedVoters(); cache != nil && mithrilState != nil { + updatedVoters := make(map[string][]string, cache.Len()) + for voteAcct, voters := range cache.Entries() { + voterStrs := make([]string, len(voters)) + for i, v := range voters { + voterStrs[i] = base58.Encode(v[:]) + } + updatedVoters[base58.Encode(voteAcct[:])] = voterStrs + } + mithrilState.ManifestEpochAuthorizedVoters = updatedVoters + } + if len(newlyActivatedFeatures) != 0 { block.EpochUpdatedAccts = append(block.EpochUpdatedAccts, newlyActivatedFeatures...) block.ParentEpochUpdatedAccts = append(block.ParentEpochUpdatedAccts, parentNewlyActivatedFeatures...) @@ -1560,6 +1697,180 @@ func ReplayBlocks( break } + // Submit block to fork choice service for vote accumulation + if global.HasForkChoice() { + global.SubmitBlockToForkChoiceService(block.Slot, block.Transactions) + } + + // Track processed blocks as candidates for skip-path chain verification. + // Uses bankhashes (not PoH hashes): Blockhash = our computed bankhash, + // LastBlockhash = parent bankhash. 
Skipped slots are omitted (solver handles gaps). + if candidateBlocks != nil { + candidateBlocks[block.Slot] = &forkchoice.SlotCandidate{ + Slot: block.Slot, + HasBlock: true, + Blockhash: solana.HashFromBytes(lastSlotCtx.FinalBankhash), + LastBlockhash: solana.Hash(block.ParentBankhash), + } + if !chainAnchorSet { + chainAnchorSlot = block.ParentSlot + chainAnchorHash = solana.Hash(block.ParentBankhash) + chainAnchorSet = true + } + } + + // Consensus enforcement: verify bankhashes against vote-confirmed hashes. + // Slots with observed supermajority are confirmed immediately; the 32-slot + // timeout only applies to unresolved slots (no winner yet). + // Enforcement scope is controlled by consensus.enforce_on_source config. + if consensusEnforceActive && global.HasForkChoice() { + pendingBankhashChecks = append(pendingBankhashChecks, pendingBankhashCheck{ + slot: block.Slot, + bankhash: solana.HashFromBytes(lastSlotCtx.FinalBankhash), + }) + + // TTL: slots pending longer than 2x the confirmation timeout are definitively unresolvable. + const pendingTTLSlots = 2 * forkchoice.VoteConfirmationTimeoutSlots + + // Track latest confirmed slot in this sweep for chain verification. + var latestConfirmedSlot uint64 + var latestConfirmedHash solana.Hash + + var remaining []pendingBankhashCheck + for _, pc := range pendingBankhashChecks { + confirmed := global.BankhashConfirmedForSlot(pc.slot, pc.bankhash) + switch confirmed.Status { + case forkchoice.BankhashHasSupermajority: + mlog.Log.Debugf("forkchoice: slot %d bankhash confirmed by supermajority", pc.slot) + if pc.slot > latestConfirmedSlot { + latestConfirmedSlot = pc.slot + latestConfirmedHash = confirmed.WinningHash + } + + case forkchoice.BankhashNeedWait: + remaining = append(remaining, pc) + + case forkchoice.BankhashNoSupermajority: + if confirmed.WinningHash != (solana.Hash{}) && confirmed.WinningHash != pc.bankhash { + // Our bankhash lost to a different supermajority hash — fork mismatch. 
+ mlog.Log.Errorf("CONSENSUS MISMATCH: checked_slot=%d replay_slot=%d our=%s winning=%s (our_stake=%d winning_stake=%d/%d threshold=%d)", + pc.slot, + block.Slot, + base58.Encode(pc.bankhash[:]), + base58.Encode(confirmed.WinningHash[:]), + confirmed.StakeForHash, + confirmed.WinningStake, + confirmed.TotalEpochStake, + confirmed.ThresholdStake, + ) + + writeConsensusArtifact( + fmt.Sprintf("bankhash_mismatch_slot_%d.json", pc.slot), + map[string]interface{}{ + "type": "bankhash_mismatch", + "checked_slot": pc.slot, + "observed_while_replaying_slot": block.Slot, + "our_bankhash": base58.Encode(pc.bankhash[:]), + "winning_bankhash": base58.Encode(confirmed.WinningHash[:]), + "our_stake": confirmed.StakeForHash, + "winning_stake": confirmed.WinningStake, + "total_epoch_stake": confirmed.TotalEpochStake, + "threshold_stake": confirmed.ThresholdStake, + "confirmation_status": confirmed.Status.String(), + "policy": consensusCoordinator.Policy(), + "pending_age_slots": block.Slot - pc.slot, + "confirmation_timeout_slots": forkchoice.VoteConfirmationTimeoutSlots, + "chain_anchor_slot": chainAnchorSlot, + "chain_anchor_hash": base58.Encode(chainAnchorHash[:]), + "run_id": CurrentRunID, + }, + ) + + if consensusCoordinator.Policy() == "halt" { + result.Error = fmt.Errorf("consensus halt: slot %d bankhash mismatch (our=%s winning=%s)", + pc.slot, base58.Encode(pc.bankhash[:]), base58.Encode(confirmed.WinningHash[:])) + break + } + // policy == "warn": log already emitted, continue + } else if block.Slot > pc.slot+pendingTTLSlots { + // TTL expired — no supermajority emerged. Not a fork issue, just low participation. + mlog.Log.Debugf("forkchoice: slot %d no supermajority after TTL (our_stake=%d/%d)", pc.slot, confirmed.StakeForHash, confirmed.TotalEpochStake) + } else { + // No definitive result yet — keep checking. 
+ remaining = append(remaining, pc) + } + } + if result.Error != nil { + break + } + } + pendingBankhashChecks = remaining + + if result.Error != nil { + mlog.Log.Errorf("Triggering graceful shutdown due to consensus mismatch") + break + } + + // Chain verification: use the skip-path solver to verify the bankhash chain + // from the last verified anchor to the latest confirmed slot. This validates + // that processed blocks chain correctly through parent bankhashes. + if candidateBlocks != nil && chainAnchorSet && latestConfirmedSlot > chainAnchorSlot { + _, resolveErr := consensusCoordinator.ResolveRange( + chainAnchorSlot+1, latestConfirmedSlot, chainAnchorHash, candidateBlocks) + if resolveErr != nil { + if errors.Is(resolveErr, forkchoice.ErrNoPath) { + mlog.Log.Errorf("CHAIN VERIFICATION FAILED: no valid bankhash chain from slot %d to %d (while replaying slot %d)", + chainAnchorSlot+1, latestConfirmedSlot, block.Slot) + writeConsensusArtifact( + fmt.Sprintf("chain_verification_failure_%d_%d.json", chainAnchorSlot+1, latestConfirmedSlot), + map[string]interface{}{ + "type": "chain_verification_failure", + "start_slot": chainAnchorSlot + 1, + "end_slot": latestConfirmedSlot, + "anchor_slot": chainAnchorSlot, + "anchor_hash": base58.Encode(chainAnchorHash[:]), + "target_slot": latestConfirmedSlot, + "target_hash": base58.Encode(latestConfirmedHash[:]), + "observed_while_replaying_slot": block.Slot, + "reason": "no_path", + "policy": consensusCoordinator.Policy(), + "candidate_count": len(candidateBlocks), + "run_id": CurrentRunID, + }, + ) + if consensusCoordinator.Policy() == "halt" { + result.Error = fmt.Errorf("consensus halt: chain verification failed (slots %d-%d)", + chainAnchorSlot+1, latestConfirmedSlot) + } + } else if errors.Is(resolveErr, forkchoice.ErrDepthExceeded) { + // Range too large for solver — not a chain error, just advance the anchor. 
+ mlog.Log.Debugf("forkchoice: chain verification skipped (depth exceeded for slots %d-%d), advancing anchor", + chainAnchorSlot+1, latestConfirmedSlot) + } else { + mlog.Log.Debugf("forkchoice: chain verification deferred for slots %d-%d: %v", + chainAnchorSlot+1, latestConfirmedSlot, resolveErr) + } + } else { + mlog.Log.Debugf("forkchoice: chain verified slots %d-%d via skip-path solver", + chainAnchorSlot+1, latestConfirmedSlot) + } + // Advance anchor regardless — verified or not, we won't re-check this range. + chainAnchorSlot = latestConfirmedSlot + chainAnchorHash = latestConfirmedHash + // Clean up old candidates to bound memory usage. + for slot := range candidateBlocks { + if slot <= latestConfirmedSlot { + delete(candidateBlocks, slot) + } + } + + if result.Error != nil { + mlog.Log.Errorf("Triggering graceful shutdown due to chain verification failure") + break + } + } + } + replayCtx.Capitalization -= lastSlotCtx.LamportsBurnt // Clear ManifestEpochStakes after first replayed slot past snapshot @@ -2216,15 +2527,9 @@ func ProcessBlock( slotCtx.FinalBankhash = bankhash.CalculateBankHash(slotCtx, writableAccts, modifiedAccts, block.ParentBankhash, block.NumSignatures, block.Blockhash) metrics.GlobalBlockReplay.BankHash.AddTimingSince(start) - /*confirmed := global.BankhashConfirmedForSlot(slotCtx.Slot, solana.HashFromBytes(slotCtx.FinalBankhash)) - for confirmed == forkchoice.BankhashNeedWait { - confirmed = global.BankhashConfirmedForSlot(slotCtx.Slot, solana.HashFromBytes(slotCtx.FinalBankhash)) - }*/ - - // this slot should be skipped. - /*if confirmed == forkchoice.BankhashNoSupermajority { - // TODO: return signal that slot should be skipped - }*/ + // Bankhash consensus enforcement is handled in the replay loop (not here) + // because forkchoice is fed after ProcessBlock returns — checking here would + // never see votes from recently submitted blocks and could deadlock. 
// Enter critical commit window - panics here may leave AccountsDB inconsistent commitSlot.Store(slotCtx.Slot) diff --git a/pkg/replay/epoch.go b/pkg/replay/epoch.go index 95858ab6..4ae0f82a 100644 --- a/pkg/replay/epoch.go +++ b/pkg/replay/epoch.go @@ -272,6 +272,11 @@ func updateEpochStakesAndRefreshVoteCache(leaderScheduleEpoch uint64, b *block.B mlog.Log.Errorf("failed to rebuild vote cache at epoch boundary: %v", err) } + // Rebuild authorized voters cache from vote accounts for the new epoch. + // This ensures forkchoice vote parsing uses current authorities, not stale manifest data. + newEpoch := b.Epoch + rebuildAuthorizedVotersFromVoteCache(newEpoch) + // Skip epoch stakes storage if already cached (resume) if hasEpochStakes { mlog.Log.Infof("already had EpochStakes for epoch %d", leaderScheduleEpoch) @@ -291,3 +296,35 @@ func updateEpochStakesAndRefreshVoteCache(leaderScheduleEpoch uint64, b *block.B maps.Copy(b.EpochStakesPerVoteAcct, global.EpochStakes(leaderScheduleEpoch)) b.TotalEpochStake = scanResult.TotalEffectiveStake } + +// rebuildAuthorizedVotersFromVoteCache rebuilds the epoch authorized voters cache +// using vote states already loaded in the global VoteCache. This avoids re-reading +// AccountsDB since RebuildVoteCacheFromAccountsDB already populated the cache. 
+func rebuildAuthorizedVotersFromVoteCache(epoch uint64) { + voteCache := global.VoteCache() + newCache := epochstakes.NewEpochAuthorizedVotersCache() + + for voteAcct, voteState := range voteCache { + if voteState == nil { + continue + } + switch voteState.Type { + case sealevel.VoteStateVersionV0_23_5: + // V0_23_5 has a single authorized voter + newCache.PutEntry(voteAcct, voteState.V0_23_5.AuthorizedVoter) + case sealevel.VoteStateVersionV1_14_11: + voter, _, err := voteState.V1_14_11.AuthorizedVoters.GetOrCalculateAuthorizedVoterForEpoch(epoch) + if err == nil { + newCache.PutEntry(voteAcct, voter) + } + case sealevel.VoteStateVersionCurrent: + voter, _, err := voteState.Current.AuthorizedVoters.GetOrCalculateAuthorizedVoterForEpoch(epoch) + if err == nil { + newCache.PutEntry(voteAcct, voter) + } + } + } + + global.SetEpochAuthorizedVoters(newCache) + mlog.Log.Infof("forkchoice: rebuilt authorized voters cache for epoch %d (%d entries)", epoch, newCache.Len()) +}