Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ jobs:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.25'
go-version-file: 'go.mod'
- name: test
run: go test ./...
30 changes: 30 additions & 0 deletions filesource.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,16 @@ type FileSource struct {
blockIndexProvider BlockIndexProvider
errorOutOnMissingFile bool

// liveBlockFloorGetter, when set, returns the lowest block number currently held
// in the live (reversible) buffer. Block-index skipping is only ever correct for
// strictly-historical blocks (the index is produced after merged-blocks), so once
// the reader reaches the overlap with the live buffer it stops consulting the
// index and emits every block. This lets the joining source re-join the live
// source instead of waiting for a not-yet-merged file (firehose-core issue #109).
// The per-block filtering seen by the client is unaffected: that is done by the
// preprocessor, not the index.
liveBlockFloorGetter func() uint64

// these blocks will be included even if the filter does not want them.
// If we are on a chain that skips block numbers, the NEXT block will be sent.
whitelistedBlocks map[uint64]bool
Expand Down Expand Up @@ -115,6 +125,17 @@ func FileSourceWithBlockIndexProvider(prov BlockIndexProvider) FileSourceOption
}
}

// FileSourceWithLiveBlockFloorGetter installs getter as the file source's
// liveBlockFloorGetter. The block-index lookup calls it to learn the lowest
// block number held by the live (reversible) buffer; as soon as a bundle could
// reach past that floor the lookup stops consulting the blockIndexProvider, so
// no block is skipped from there on and the joining source can re-join the
// live source near HEAD.
//
// A getter that returns 0 (e.g. the live buffer is not ready yet) leaves the
// index-based skipping fully enabled.
func FileSourceWithLiveBlockFloorGetter(getter func() uint64) FileSourceOption {
	return func(fs *FileSource) {
		fs.liveBlockFloorGetter = getter
	}
}

// FileSourceErrorOnMissingMergedBlocksFile will make the file source fail if the merged blocks file is missing
// instead of waiting for it forever.
//
Expand Down Expand Up @@ -388,6 +409,15 @@ func (s *FileSource) lookupBlockIndex(in uint64) (baseBlock uint64, outBlocks []
begin := time.Now()
baseBlock = in
for {
// Once this bundle may overlap the live buffer, stop using the index: the
// index only ever covers strictly-historical blocks, and we must emit every
// block from here so the joining source can re-join the live source.
if s.liveBlockFloorGetter != nil {
if floor := s.liveBlockFloorGetter(); floor != 0 && baseBlock+s.bundleSize > floor {
return baseBlock, nil, true
}
}

filteredBlocks, err := s.blockIndexProvider.BlocksInRange(baseBlock, s.bundleSize)
if err != nil {
s.logger.Debug("blocks_in_range returns error, deactivating",
Expand Down
29 changes: 29 additions & 0 deletions filesource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,3 +309,32 @@ func TestFileSource_lookupBlockIndex(t *testing.T) {
}

}

// TestFileSource_lookupBlockIndex_LiveFloor verifies that lookupBlockIndex
// stops index-based skipping at the live buffer floor. Rather than skipping
// non-matching bundles until the index runs out (which would keep the joining
// source from re-joining the live source), it must return the first bundle
// that may overlap the live buffer with noMoreIndex set, so that bundle is
// read in full. (firehose-core #109)
func TestFileSource_lookupBlockIndex_LiveFloor(t *testing.T) {
	// An index with no matching block, reporting LastIndexedBlock 399.
	emptyIndex := func() *TestBlockIndexProvider {
		return &TestBlockIndexProvider{Blocks: nil, LastIndexedBlock: 399}
	}
	floorAt := func(num uint64) func() uint64 {
		return func() uint64 { return num }
	}

	src := &FileSource{
		logger:                    zlog,
		bundleSize:                100,
		timeBetweenProgressBlocks: 10 * time.Second,
		blockIndexProvider:        emptyIndex(),
		liveBlockFloorGetter:      floorAt(250),
	}

	// Floor 250 sits inside bundle [200,300): bundle [100,200) is still skipped
	// through the index, then the lookup has to stop at base 200.
	base, indexed, done := src.lookupBlockIndex(100)
	assert.True(t, done, "must stop using the index at the live overlap")
	assert.Equal(t, uint64(200), base, "stops at the first bundle that can contain the floor block")
	assert.Nil(t, indexed, "nil => bundle is read entirely, so overlap blocks are emitted for the join")

	// With a floor of 0 (live buffer not ready) the early stop is off, and the
	// lookup skips all the way to base 400 as it did before the floor existed.
	src.blockIndexProvider = emptyIndex()
	src.liveBlockFloorGetter = floorAt(0)
	base, _, done = src.lookupBlockIndex(100)
	assert.True(t, done)
	assert.Equal(t, uint64(400), base, "floor 0 disables the early stop")
}
22 changes: 21 additions & 1 deletion forkable/forkable.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,27 @@ func (p *Forkable) blocksFromCursor(cursor *bstream.Cursor) ([]*bstream.Preproce
}

if cursor.LIB.Num() < seg[0].BlockNum {
return nil, fmt.Errorf("complete segment does not include cursor LIB (lowest block: %d, cursor lib: %d)", seg[0].BlockNum, cursor.LIB.Num())
// The cursor's LIB is older than the oldest block still held in the live
// buffer. A cursor block that is itself below the buffer is purely historical
// and can only be served from the merged-blocks (archive).
if cursor.Block.Num() < seg[0].BlockNum {
return nil, fmt.Errorf("complete segment does not include cursor block (lowest block: %d, cursor block: %d, cursor lib: %d)", seg[0].BlockNum, cursor.Block.Num(), cursor.LIB.Num())
}

// The cursor block is in the live range. Only clamp the stale LIB up to the
// buffer floor when the block is on the canonical chain: seg[0] is then one of
// its (irreversible) ancestors, so the client loses nothing and we avoid the
// archive fallback that would hang while the merger is behind
// (https://github.com/streamingfast/firehose-core/issues/109). A forked cursor
// block does NOT descend from seg[0], so we must
// not move its LIB there: leave it untouched and let the forked-cursor path
// below resolve it (it re-joins the canonical chain, or errors if that would
// require blocks already purged below the buffer floor).
if blockIn(cursor.Block.ID(), seg) {
clamped := *cursor
clamped.LIB = seg[0].AsRef()
cursor = &clamped
}
}

// cursor is not forked, we can bring it quickly to forkDB HEAD
Expand Down
102 changes: 102 additions & 0 deletions forkable/forkable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1606,6 +1606,108 @@ type expectedBlock struct {
cursorLibNum uint64
}

// TestForkable_BlocksFromCursor_StaleLIB_LiveBlock guards against the
// regression described in firehose-core issue #109. A stream may resume from a
// cursor whose LIB was already purged from the live buffer while its Block is
// still a live block (above LIB, inside the buffer). The forkable has to serve
// such a cursor from the live buffer; rejecting it pushed the caller into
// archive streaming, which hung while the merger was behind.
func TestForkable_BlocksFromCursor_StaleLIB_LiveBlock(t *testing.T) {
	const window = 5 // head-LIB distance

	frkb := New(nullHandler, WithKeptFinalBlocks(2))
	for num := uint64(1); num <= 12; num++ {
		libNum := uint64(1)
		if num > window {
			libNum = num - window
		}
		id, parentID := fmt.Sprintf("%08x", num), fmt.Sprintf("%08x", num-1)
		require.NoError(t, frkb.ProcessBlock(bstream.TestBlockWithLIBNum(id, parentID, libNum), nil))
	}
	// State after the 12 blocks: head=12, forkdb LIB=7, buffer floor (seg[0])=5.

	testBlock := bstream.TestBlockWithLIBNum

	// cursorAt builds a StepNew cursor sitting on the given block (also used as
	// its head block) and carrying the given LIB.
	cursorAt := func(blockID string, blockNum uint64, libID string, libNum uint64) *bstream.Cursor {
		return &bstream.Cursor{
			Step:      bstream.StepNew,
			Block:     bstream.NewBlockRef(blockID, blockNum),
			HeadBlock: bstream.NewBlockRef(blockID, blockNum),
			LIB:       bstream.NewBlockRef(libID, libNum),
		}
	}

	// resume runs blocksFromCursor, requires it to succeed and flattens what it
	// returned into (block, step, cursor LIB number) triples.
	resume := func(t *testing.T, cursor *bstream.Cursor) []expectedBlock {
		out, err := frkb.blocksFromCursor(cursor)
		require.NoError(t, err)

		var seen []expectedBlock
		for _, blk := range out {
			obj := blk.Obj.(*ForkableObject)
			seen = append(seen, expectedBlock{blk.Block, obj.Step(), obj.Cursor().LIB.Num()})
		}
		return seen
	}

	t.Run("live block resumes from buffer despite stale cursor LIB", func(t *testing.T) {
		// Block 9 is live (above LIB); the cursor LIB 3 is below the floor (5)
		// and was purged long ago.
		seen := resume(t, cursorAt("00000009", 9, "00000003", 3))

		assertExpectedBlocks(t, []expectedBlock{
			{testBlock("00000006", "00000005", 1), bstream.StepIrreversible, 6},
			{testBlock("00000007", "00000006", 2), bstream.StepIrreversible, 7},
			{testBlock("0000000a", "00000009", 5), bstream.StepNew, 7},
			{testBlock("0000000b", "0000000a", 6), bstream.StepNew, 7},
			{testBlock("0000000c", "0000000b", 7), bstream.StepNew, 7},
		}, seen)
	})

	t.Run("historical block below buffer still errors (archive territory)", func(t *testing.T) {
		// Block 3 is below the buffer floor (5), hence genuinely historical: an
		// error is required so the caller serves it from merged-blocks.
		_, err := frkb.blocksFromCursor(cursorAt("00000003", 3, "00000001", 1))
		require.Error(t, err)
	})

	t.Run("reorged cursor block that re-joins above the floor is served, cursor LIB not important", func(t *testing.T) {
		// "00000009b" forks off canonical block 9 (both have parent 8). Its number
		// (9) is within the live range but it is not on the canonical chain, so
		// its LIB is NOT clamped. The forked-cursor path re-joins at block 8 (in
		// seg): UNDO 00000009b first, then the canonical chain going forward.
		require.NoError(t, frkb.ProcessBlock(testBlock("00000009b", "00000008", 4), nil))

		seen := resume(t, cursorAt("00000009b", 9, "00000003", 3))

		assertExpectedBlocks(t, []expectedBlock{
			{testBlock("00000009b", "00000008", 4), bstream.StepUndo, 3}, // LIB stays stale (3), not clamped
			{testBlock("00000006", "00000005", 1), bstream.StepIrreversible, 6},
			{testBlock("00000007", "00000006", 2), bstream.StepIrreversible, 7},
			{testBlock("00000009", "00000008", 4), bstream.StepNew, 7},
			{testBlock("0000000a", "00000009", 5), bstream.StepNew, 7},
			{testBlock("0000000b", "0000000a", 6), bstream.StepNew, 7},
			{testBlock("0000000c", "0000000b", 7), bstream.StepNew, 7},
		}, seen)
	})

	t.Run("cursor block that cannot re-join the live segment errors (archive territory)", func(t *testing.T) {
		// The buffer never saw this cursor block (the same applies to a fork that
		// only re-joins below the purged floor): it is not clamped, the
		// forked-cursor path finds no junction, and the resulting error sends the
		// caller back to the archive.
		_, err := frkb.blocksFromCursor(cursorAt("000000ff", 9, "00000003", 3))
		require.Error(t, err)
	})
}

func TestForkable_BlocksFromIrreversibleNum(t *testing.T) {

tests := []struct {
Expand Down
Loading
Loading