Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: Test
on:
pull_request:
push:
branches:
- develop

jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- uses: actions/setup-go@v5
with:
go-version: '1.25'
- name: test
run: go test ./...
3 changes: 3 additions & 0 deletions forkable/forkable.go
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,9 @@ func (p *Forkable) triggersNewLongestChain(blk *pbbstream.Block) bool {
if blk.PartialIndex == 0 {
return true // we have the full version of block that was previously partial
}
if p.lastBlockSent.LastPartial && !blk.LastPartial {
return false // the block is settled by its 'last partial': further non-last partial versions are noise until a reorg replaces it
}
return blk.PartialIndex > p.lastBlockSent.PartialIndex
}
return false
Expand Down
136 changes: 136 additions & 0 deletions forkable/forkable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2528,6 +2528,142 @@ func TestForkable_ProcessBlock_WithPartialBlocks(t *testing.T) {
},
},
},
{
name: "ignore non-last partial after lastPartial",
forkDB: fdbLinked("00000002a"),
protocolFirstBlock: 2,
processBlocks: []*pbbstream.Block{
bTestBlock("00000003a", "00000002a"), // block{3}
partialBlock("00000004b", "00000003a", 2), // partialBlock{4, idx=2}
lastPartialBlock("00000004a", "00000003a", 3), // partialBlock{4, idx=3, LAST}: block 4 is settled
partialBlock("00000004e", "00000003a", 7), // higher index, but block 4 is settled -> ignored
bTestBlock("00000005a", "00000004a"), // block{5}
},
expectedResult: []*ForkableObject{
{
step: bstream.StepNew,
Obj: "00000003a",
block: tinyBlk("00000003a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000002a"),
},
{
step: bstream.StepPartial,
Obj: "00000004b",
block: tinyBlk("00000004b"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000003a"),
},
{
step: bstream.StepNewPartial,
Obj: "00000004a",
block: tinyBlk("00000004a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000003a"),
},
// ignored 4e: block 4 was settled by its last partial
{
step: bstream.StepNew,
Obj: "00000005a",
block: tinyBlk("00000005a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000004a"),
},
},
expectResultWithoutPartialBlocks: []*ForkableObject{
{
step: bstream.StepNew,
Obj: "00000003a",
block: tinyBlk("00000003a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000002a"),
},
// StepPartial(4b) filtered out
{
step: bstream.StepNewPartial,
Obj: "00000004a",
block: tinyBlk("00000004a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000003a"),
},
// ignored 4e: block 4 was settled by its last partial
{
step: bstream.StepNew,
Obj: "00000005a",
block: tinyBlk("00000005a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000004a"),
},
},
},
{
name: "ignore 'duplicate' last partial with higher index after lastPartial",
forkDB: fdbLinked("00000002a"),
protocolFirstBlock: 2,
processBlocks: []*pbbstream.Block{
bTestBlock("00000003a", "00000002a"), // block{3}
partialBlock("00000004b", "00000003a", 2), // partialBlock{4, idx=2}
lastPartialBlock("00000004a", "00000003a", 3), // partialBlock{4, idx=3, LAST}: block 4 is settled
lastPartialBlock("00000004a", "00000003a", 4), // partialBlock{4, idx=4, LAST}: block 4 is settled again, from another reader that had to bump index again

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 nit: this case passes with the fix reverted — the duplicate 00000004a is dropped by same-ID dedup in AddLink (returns exists), not by the new LastPartial && !blk.LastPartial branch (that branch needs !blk.LastPartial, but this block IS a last-partial). To actually exercise the new logic, give the second close a different ID — but note a different-ID, higher-index last-partial then hits return blk.PartialIndex > p.lastBlockSent.PartialIndex = true and triggers, so assert the behavior you actually want.

bTestBlock("00000005a", "00000004a"), // block{5}
},
expectedResult: []*ForkableObject{
{
step: bstream.StepNew,
Obj: "00000003a",
block: tinyBlk("00000003a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000002a"),
},
{
step: bstream.StepPartial,
Obj: "00000004b",
block: tinyBlk("00000004b"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000003a"),
},
{
step: bstream.StepNewPartial,
Obj: "00000004a",
block: tinyBlk("00000004a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000003a"),
},
// ignored 4e: block 4 was settled by its last partial

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 nit: copy-paste comment — there is no 4e block in this case (that's case 1). The ignored block here is the duplicate 00000004a idx=4. Same at L2657.

{
step: bstream.StepNew,
Obj: "00000005a",
block: tinyBlk("00000005a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000004a"),
},
},
expectResultWithoutPartialBlocks: []*ForkableObject{
{
step: bstream.StepNew,
Obj: "00000003a",
block: tinyBlk("00000003a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000002a"),
},
// StepPartial(4b) filtered out
{
step: bstream.StepNewPartial,
Obj: "00000004a",
block: tinyBlk("00000004a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000003a"),
},
// ignored 4e: block 4 was settled by its last partial
{
step: bstream.StepNew,
Obj: "00000005a",
block: tinyBlk("00000005a"),
lastLIBSent: tinyBlk("00000002a"),
parentBlock: tinyBlk("00000004a"),
},
},
},
{
name: "no LIB change on partial blocks unless it is lastPartial",
forkDB: fdbLinked("00000002a"),
Expand Down
8 changes: 6 additions & 2 deletions hub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,13 @@ func (s *Subscription) getLatestPendingVersionOfCandidateBlock(candidate *bstrea
}
}

// only look for next block in a chain of "partial" blocks.
// only skip over plain "partial" blocks (intermediate versions of a block being built).
// Any other step must be delivered as-is: in particular StepNewPartial (the closing
// 'last partial' of a block) also matches StepPartial, but skipping past it would
// hide the authoritative version of the block from the subscriber.
if stepable, ok := candidate.Obj.(bstream.Stepable); ok {
if !stepable.Step().Matches(bstream.StepPartial) {
step := stepable.Step()
if !step.Matches(bstream.StepPartial) || step.Matches(bstream.StepNew) {
return candidate
}
}
Expand Down
80 changes: 80 additions & 0 deletions hub/subscription_repro_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package hub

import (
"fmt"
"testing"
"time"

"github.com/streamingfast/bstream"
pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1"
"google.golang.org/protobuf/types/known/timestamppb"
)

func reproBlock(id string, num uint64, idx int32, last bool) *pbbstream.Block {
return &pbbstream.Block{
Id: id,
Number: num,
ParentId: fmt.Sprintf("%04da", num-1),
ParentNum: num - 1,
LibNum: 1,
Timestamp: timestamppb.New(time.Now()),
PartialIndex: idx,
LastPartial: last,
}
}

type stepObj struct{ step bstream.StepType }

func (s stepObj) Step() bstream.StepType { return s.step }
func (s stepObj) FinalBlockHeight() uint64 { return 0 }
func (s stepObj) ReorgJunctionBlock() bstream.BlockRef { return nil }

var _ bstream.Stepable = stepObj{}

// reproduces the live flashblocks flow under subscriber lag: the channel holds
// several blocks; the closing "last partial" (StepNewPartial) of a block is
// followed in the channel by a stale partial version of the same block number.
// The skip-to-latest-version logic must not skip past the closing block.
func TestSubscriptionEatsLastPartial(t *testing.T) {
var received []string
handler := bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error {
received = append(received, fmt.Sprintf("%s %s(idx=%d,last=%v)", obj.(bstream.Stepable).Step(), blk.Id, blk.PartialIndex, blk.LastPartial))
return nil
})

sub := NewSubscription(handler, 100, true)

push := func(step bstream.StepType, blk *pbbstream.Block) {
if err := sub.push(&bstream.PreprocessedBlock{Block: blk, Obj: stepObj{step}}); err != nil {
t.Fatal(err)
}
}

// subscriber is lagging: all of these are already buffered in the channel
// when it starts draining (flashblocks of block 4, its closing last-partial,
// one stale post-seal flashblock version, then flashblocks of block 5)
push(bstream.StepPartial, reproBlock("0004p1", 4, 1, false))
push(bstream.StepPartial, reproBlock("0004p2", 4, 2, false))
push(bstream.StepNewPartial, reproBlock("0004a", 4, 3, true)) // closing of block 4
push(bstream.StepPartial, reproBlock("0004p4", 4, 4, false)) // stale flashblock version, post-seal
push(bstream.StepPartial, reproBlock("0005p1", 5, 1, false))
push(bstream.StepNewPartial, reproBlock("0005a", 5, 2, true)) // closing of block 5
push(bstream.StepPartial, reproBlock("0006p1", 6, 1, false))

go func() {
time.Sleep(500 * time.Millisecond)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 nit: fixed time.Sleep(500ms) adds 0.5s wall-clock and couples the test to timing — it only works because every block is pre-buffered before Run(). Prefer shutting down once len(received) reaches the expected count.

sub.Shutdown(nil)
}()
sub.Run()

for _, r := range received {
fmt.Println(" delivered:", r)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔵 nit: fmt.Println spams test output. Drop it or use t.Log.

}

for _, r := range received {
if r == "new,partial 0004a(idx=3,last=true)" {
return // closing was delivered, all good
}
}
t.Fatalf("closing last-partial block 0004a was never delivered to the subscriber")
}
Loading