Skip to content

Commit 1937aef

Browse files
committed
refactor: split platformvm/state.go (3617 lines) into 9 focused modules
Zero behavior change. Same package, same receiver, same struct. Methods moved to logical files by domain: - state_blocks.go: block storage, reindexing - state_chains.go: chain/net CRUD, chain names - state_commit.go: Commit, sync, init, genesis - state_diffs.go: validator diffs, weight/pubkey application - state_expiry.go: expiry iterator, load, write - state_metadata.go: timestamps, supply, fees, heights - state_txs.go: transaction and reward UTXO storage - state_utxos.go: UTXO get/add/delete/write - state_validators.go: all validator CRUD, load, write
1 parent 8f876cb commit 1937aef

10 files changed

Lines changed: 2891 additions & 2749 deletions

File tree

vms/platformvm/state/state.go

Lines changed: 0 additions & 2749 deletions
Large diffs are not rendered by default.
Lines changed: 268 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,268 @@
1+
// Copyright (C) 2019-2025, Lux Industries Inc. All rights reserved.
2+
// See the file LICENSE for licensing terms.
3+
4+
package state
5+
6+
import (
7+
"errors"
8+
"fmt"
9+
"math"
10+
"sync"
11+
"time"
12+
13+
"github.com/luxfi/database"
14+
"github.com/luxfi/ids"
15+
"github.com/luxfi/log"
16+
"github.com/luxfi/node/vms/platformvm/block"
17+
"github.com/luxfi/timer"
18+
)
19+
20+
func (s *state) AddStatelessBlock(block block.Block) {
21+
blkID := block.ID()
22+
s.addedBlockIDs[block.Height()] = blkID
23+
s.addedBlocks[blkID] = block
24+
}
25+
26+
func (s *state) GetStatelessBlock(blockID ids.ID) (block.Block, error) {
27+
if blk, exists := s.addedBlocks[blockID]; exists {
28+
return blk, nil
29+
}
30+
if blk, cached := s.blockCache.Get(blockID); cached {
31+
if blk == nil {
32+
return nil, database.ErrNotFound
33+
}
34+
return blk, nil
35+
}
36+
37+
blkBytes, err := s.blockDB.Get(blockID[:])
38+
if err == database.ErrNotFound {
39+
s.blockCache.Put(blockID, nil)
40+
return nil, database.ErrNotFound
41+
}
42+
if err != nil {
43+
return nil, err
44+
}
45+
46+
blk, _, err := parseStoredBlock(blkBytes)
47+
if err != nil {
48+
return nil, err
49+
}
50+
51+
s.blockCache.Put(blockID, blk)
52+
return blk, nil
53+
}
54+
55+
func (s *state) GetBlockIDAtHeight(height uint64) (ids.ID, error) {
56+
if blkID, exists := s.addedBlockIDs[height]; exists {
57+
return blkID, nil
58+
}
59+
if blkID, cached := s.blockIDCache.Get(height); cached {
60+
if blkID == ids.Empty {
61+
return ids.Empty, database.ErrNotFound
62+
}
63+
return blkID, nil
64+
}
65+
66+
blkID, err := database.GetID(s.blockIDDB, database.PackUInt64(height))
67+
if err == database.ErrNotFound {
68+
s.blockIDCache.Put(height, ids.Empty)
69+
return ids.Empty, database.ErrNotFound
70+
}
71+
if err != nil {
72+
return ids.Empty, err
73+
}
74+
75+
s.blockIDCache.Put(height, blkID)
76+
return blkID, nil
77+
}
78+
79+
func (s *state) writeBlocks() error {
80+
for blkID, blk := range s.addedBlocks {
81+
blkBytes := blk.Bytes()
82+
blkHeight := blk.Height()
83+
heightKey := database.PackUInt64(blkHeight)
84+
85+
delete(s.addedBlockIDs, blkHeight)
86+
s.blockIDCache.Put(blkHeight, blkID)
87+
if err := database.PutID(s.blockIDDB, heightKey, blkID); err != nil {
88+
return fmt.Errorf("failed to add blockID: %w", err)
89+
}
90+
91+
delete(s.addedBlocks, blkID)
92+
// Note: Evict is used rather than Put here because blk may end up
93+
// referencing additional data (because of shared byte slices) that
94+
// would not be properly accounted for in the cache sizing.
95+
s.blockCache.Evict(blkID)
96+
if err := s.blockDB.Put(blkID[:], blkBytes); err != nil {
97+
return fmt.Errorf("failed to write block %s: %w", blkID, err)
98+
}
99+
}
100+
return nil
101+
}
102+
103+
// parseStoredBlock returns the block and whether it is a legacy [stateBlk].
104+
// Invariant: blkBytes is safe to parse with blocks.GenesisCodec.
105+
// Retained for backward compatibility with pre-v1.14.x databases.
106+
func parseStoredBlock(blkBytes []byte) (block.Block, bool, error) {
107+
// Attempt to parse as blocks.Block
108+
blk, err := block.Parse(block.GenesisCodec, blkBytes)
109+
if err == nil {
110+
return blk, false, nil
111+
}
112+
113+
// Fallback to [stateBlk] using our legacy codec
114+
blkState := stateBlk{}
115+
if _, err := block.GenesisCodec.Unmarshal(blkBytes, &blkState); err != nil {
116+
// If we can't unmarshal as stateBlk, this might not be a block at all
117+
// (could be an index entry or other data in the blockDB)
118+
// Return the original parse error
119+
return nil, false, err
120+
}
121+
122+
blk, err = block.Parse(block.GenesisCodec, blkState.Bytes)
123+
return blk, true, err
124+
}
125+
126+
func (s *state) ReindexBlocks(lock sync.Locker, log log.Logger) error {
127+
has, err := s.singletonDB.Has(BlocksReindexedKey)
128+
if err != nil {
129+
return err
130+
}
131+
if has {
132+
log.Info("blocks already reindexed")
133+
return nil
134+
}
135+
136+
// It is possible that new blocks are added after grabbing this iterator.
137+
// New blocks are guaranteed to be persisted in the new format, so we don't
138+
// need to check them.
139+
blockIterator := s.blockDB.NewIterator()
140+
// Releasing is done using a closure to ensure that updating blockIterator
141+
// will result in having the most recent iterator released when executing
142+
// the deferred function.
143+
defer func() {
144+
blockIterator.Release()
145+
}()
146+
147+
log.Info("starting block reindexing")
148+
149+
var (
150+
startTime = time.Now()
151+
lastCommit = startTime
152+
nextUpdate = startTime.Add(indexLogFrequency)
153+
numIndicesChecked = 0
154+
numIndicesUpdated = 0
155+
)
156+
157+
for blockIterator.Next() {
158+
keyBytes := blockIterator.Key()
159+
valueBytes := blockIterator.Value()
160+
161+
// Skip entries that are not 32 bytes (not block IDs)
162+
if len(keyBytes) != ids.IDLen {
163+
continue
164+
}
165+
166+
blk, isStateBlk, err := parseStoredBlock(valueBytes)
167+
if err != nil {
168+
// Skip entries that can't be parsed as blocks
169+
// This could be metadata or other non-block data
170+
continue
171+
}
172+
173+
blkID := blk.ID()
174+
175+
// This block was previously stored using the legacy format, update the
176+
// index to remove the usage of stateBlk.
177+
if isStateBlk {
178+
blkBytes := blk.Bytes()
179+
if err := s.blockDB.Put(blkID[:], blkBytes); err != nil {
180+
return fmt.Errorf("failed to write block: %w", err)
181+
}
182+
183+
numIndicesUpdated++
184+
}
185+
186+
numIndicesChecked++
187+
188+
now := time.Now()
189+
if now.After(nextUpdate) {
190+
nextUpdate = now.Add(indexLogFrequency)
191+
192+
progress := timer.ProgressFromHash(blkID[:])
193+
eta := timer.EstimateETA(
194+
startTime,
195+
progress,
196+
math.MaxUint64,
197+
)
198+
199+
log.Info("reindexing blocks",
200+
"numIndicesUpdated", numIndicesUpdated,
201+
"numIndicesChecked", numIndicesChecked,
202+
"eta", eta,
203+
)
204+
}
205+
206+
if numIndicesChecked%indexIterationLimit == 0 {
207+
// We must hold the lock during committing to make sure we don't
208+
// attempt to commit to disk while a block is concurrently being
209+
// accepted.
210+
lock.Lock()
211+
err := errors.Join(
212+
s.Commit(),
213+
blockIterator.Error(),
214+
)
215+
lock.Unlock()
216+
if err != nil {
217+
return err
218+
}
219+
220+
// We release the iterator here to allow the underlying database to
221+
// clean up deleted state.
222+
blockIterator.Release()
223+
224+
// We take the minimum here because it's possible that the node is
225+
// currently bootstrapping. This would mean that grabbing the lock
226+
// could take an extremely long period of time; which we should not
227+
// delay processing for.
228+
indexDuration := now.Sub(lastCommit)
229+
sleepDuration := min(
230+
indexIterationSleepMultiplier*indexDuration,
231+
indexIterationSleepCap,
232+
)
233+
time.Sleep(sleepDuration)
234+
235+
// Make sure not to include the sleep duration into the next index
236+
// duration.
237+
lastCommit = time.Now()
238+
239+
blockIterator = s.blockDB.NewIteratorWithStart(blkID[:])
240+
}
241+
}
242+
243+
// Ensure we fully iterated over all blocks before writing that indexing has
244+
// finished.
245+
//
246+
// Note: This is needed because a transient read error could cause the
247+
// iterator to stop early.
248+
if err := blockIterator.Error(); err != nil {
249+
return fmt.Errorf("failed to iterate over historical blocks: %w", err)
250+
}
251+
252+
if err := s.singletonDB.Put(BlocksReindexedKey, nil); err != nil {
253+
return fmt.Errorf("failed to put marked blocks as reindexed: %w", err)
254+
}
255+
256+
// We must hold the lock during committing to make sure we don't attempt to
257+
// commit to disk while a block is concurrently being accepted.
258+
lock.Lock()
259+
defer lock.Unlock()
260+
261+
log.Info("finished block reindexing",
262+
"numIndicesUpdated", numIndicesUpdated,
263+
"numIndicesChecked", numIndicesChecked,
264+
"duration", time.Since(startTime),
265+
)
266+
267+
return s.Commit()
268+
}

0 commit comments

Comments
 (0)