diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..20e11f1 --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,17 @@ +name: Test +on: + pull_request: + push: + branches: + - develop + +jobs: + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + - uses: actions/setup-go@v5 + with: + go-version: '1.25' + - name: test + run: go test ./... diff --git a/forkable/forkable.go b/forkable/forkable.go index c4f7b47..0a21cf0 100644 --- a/forkable/forkable.go +++ b/forkable/forkable.go @@ -1106,6 +1106,9 @@ func (p *Forkable) triggersNewLongestChain(blk *pbbstream.Block) bool { if blk.PartialIndex == 0 { return true // we have the full version of block that was previously partial } + if p.lastBlockSent.LastPartial && !blk.LastPartial { + return false // the block is settled by its 'last partial': further non-last partial versions are noise until a reorg replaces it + } return blk.PartialIndex > p.lastBlockSent.PartialIndex } return false diff --git a/forkable/forkable_test.go b/forkable/forkable_test.go index 23737d5..6b0d936 100644 --- a/forkable/forkable_test.go +++ b/forkable/forkable_test.go @@ -2528,6 +2528,142 @@ func TestForkable_ProcessBlock_WithPartialBlocks(t *testing.T) { }, }, }, + { + name: "ignore non-last partial after lastPartial", + forkDB: fdbLinked("00000002a"), + protocolFirstBlock: 2, + processBlocks: []*pbbstream.Block{ + bTestBlock("00000003a", "00000002a"), // block{3} + partialBlock("00000004b", "00000003a", 2), // partialBlock{4, idx=2} + lastPartialBlock("00000004a", "00000003a", 3), // partialBlock{4, idx=3, LAST}: block 4 is settled + partialBlock("00000004e", "00000003a", 7), // higher index, but block 4 is settled -> ignored + bTestBlock("00000005a", "00000004a"), // block{5} + }, + expectedResult: []*ForkableObject{ + { + step: bstream.StepNew, + Obj: "00000003a", + block: tinyBlk("00000003a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000002a"), + }, + { + step: bstream.StepPartial, + Obj: "00000004b", + block: tinyBlk("00000004b"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000003a"), + }, + { + step: bstream.StepNewPartial, + Obj: "00000004a", + block: tinyBlk("00000004a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000003a"), + }, + // ignored 4e: block 4 was settled by its last partial + { + step: bstream.StepNew, + Obj: "00000005a", + block: tinyBlk("00000005a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000004a"), + }, + }, + expectResultWithoutPartialBlocks: []*ForkableObject{ + { + step: bstream.StepNew, + Obj: "00000003a", + block: tinyBlk("00000003a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000002a"), + }, + // StepPartial(4b) filtered out + { + step: bstream.StepNewPartial, + Obj: "00000004a", + block: tinyBlk("00000004a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000003a"), + }, + // ignored 4e: block 4 was settled by its last partial + { + step: bstream.StepNew, + Obj: "00000005a", + block: tinyBlk("00000005a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000004a"), + }, + }, + }, + { + name: "ignore 'duplicate' last partial with higher index after lastPartial", + forkDB: fdbLinked("00000002a"), + protocolFirstBlock: 2, + processBlocks: []*pbbstream.Block{ + bTestBlock("00000003a", "00000002a"), // block{3} + partialBlock("00000004b", "00000003a", 2), // partialBlock{4, idx=2} + lastPartialBlock("00000004a", "00000003a", 3), // partialBlock{4, idx=3, LAST}: block 4 is settled + lastPartialBlock("00000004a", "00000003a", 4), // partialBlock{4, idx=4, LAST}: block 4 is settled again, from another reader that had to bump index again + bTestBlock("00000005a", "00000004a"), // block{5} + }, + expectedResult: []*ForkableObject{ + { + step: bstream.StepNew, + Obj: "00000003a", + block: tinyBlk("00000003a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000002a"), + }, + { + step: bstream.StepPartial, + Obj: "00000004b", + block: tinyBlk("00000004b"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000003a"), + }, + { + step: bstream.StepNewPartial, + Obj: "00000004a", + block: tinyBlk("00000004a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000003a"), + }, + // ignored 4e: block 4 was settled by its last partial + { + step: bstream.StepNew, + Obj: "00000005a", + block: tinyBlk("00000005a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000004a"), + }, + }, + expectResultWithoutPartialBlocks: []*ForkableObject{ + { + step: bstream.StepNew, + Obj: "00000003a", + block: tinyBlk("00000003a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000002a"), + }, + // StepPartial(4b) filtered out + { + step: bstream.StepNewPartial, + Obj: "00000004a", + block: tinyBlk("00000004a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000003a"), + }, + // ignored 4e: block 4 was settled by its last partial + { + step: bstream.StepNew, + Obj: "00000005a", + block: tinyBlk("00000005a"), + lastLIBSent: tinyBlk("00000002a"), + parentBlock: tinyBlk("00000004a"), + }, + }, + }, { name: "no LIB change on partial blocks unless it is lastPartial", forkDB: fdbLinked("00000002a"), diff --git a/hub/subscription.go b/hub/subscription.go index 924491d..953f132 100644 --- a/hub/subscription.go +++ b/hub/subscription.go @@ -80,9 +80,13 @@ func (s *Subscription) getLatestPendingVersionOfCandidateBlock(candidate *bstrea } } - // only look for next block in a chain of "partial" blocks. + // only skip over plain "partial" blocks (intermediate versions of a block being built). + // Any other step must be delivered as-is: in particular StepNewPartial (the closing + // 'last partial' of a block) also matches StepPartial, but skipping past it would + // hide the authoritative version of the block from the subscriber. if stepable, ok := candidate.Obj.(bstream.Stepable); ok { - if !stepable.Step().Matches(bstream.StepPartial) { + step := stepable.Step() + if !step.Matches(bstream.StepPartial) || step.Matches(bstream.StepNew) { return candidate } } diff --git a/hub/subscription_repro_test.go b/hub/subscription_repro_test.go new file mode 100644 index 0000000..6e53336 --- /dev/null +++ b/hub/subscription_repro_test.go @@ -0,0 +1,80 @@ +package hub + +import ( + "fmt" + "testing" + "time" + + "github.com/streamingfast/bstream" + pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func reproBlock(id string, num uint64, idx int32, last bool) *pbbstream.Block { + return &pbbstream.Block{ + Id: id, + Number: num, + ParentId: fmt.Sprintf("%04da", num-1), + ParentNum: num - 1, + LibNum: 1, + Timestamp: timestamppb.New(time.Now()), + PartialIndex: idx, + LastPartial: last, + } +} + +type stepObj struct{ step bstream.StepType } + +func (s stepObj) Step() bstream.StepType { return s.step } +func (s stepObj) FinalBlockHeight() uint64 { return 0 } +func (s stepObj) ReorgJunctionBlock() bstream.BlockRef { return nil } + +var _ bstream.Stepable = stepObj{} + +// reproduces the live flashblocks flow under subscriber lag: the channel holds +// several blocks; the closing "last partial" (StepNewPartial) of a block is +// followed in the channel by a stale partial version of the same block number. +// The skip-to-latest-version logic must not skip past the closing block. +func TestSubscriptionEatsLastPartial(t *testing.T) { + var received []string + handler := bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { + received = append(received, fmt.Sprintf("%s %s(idx=%d,last=%v)", obj.(bstream.Stepable).Step(), blk.Id, blk.PartialIndex, blk.LastPartial)) + return nil + }) + + sub := NewSubscription(handler, 100, true) + + push := func(step bstream.StepType, blk *pbbstream.Block) { + if err := sub.push(&bstream.PreprocessedBlock{Block: blk, Obj: stepObj{step}}); err != nil { + t.Fatal(err) + } + } + + // subscriber is lagging: all of these are already buffered in the channel + // when it starts draining (flashblocks of block 4, its closing last-partial, + // one stale post-seal flashblock version, then flashblocks of block 5) + push(bstream.StepPartial, reproBlock("0004p1", 4, 1, false)) + push(bstream.StepPartial, reproBlock("0004p2", 4, 2, false)) + push(bstream.StepNewPartial, reproBlock("0004a", 4, 3, true)) // closing of block 4 + push(bstream.StepPartial, reproBlock("0004p4", 4, 4, false)) // stale flashblock version, post-seal + push(bstream.StepPartial, reproBlock("0005p1", 5, 1, false)) + push(bstream.StepNewPartial, reproBlock("0005a", 5, 2, true)) // closing of block 5 + push(bstream.StepPartial, reproBlock("0006p1", 6, 1, false)) + + go func() { + time.Sleep(500 * time.Millisecond) + sub.Shutdown(nil) + }() + sub.Run() + + for _, r := range received { + fmt.Println(" delivered:", r) + } + + for _, r := range received { + if r == "new,partial 0004a(idx=3,last=true)" { + return // closing was delivered, all good + } + } + t.Fatalf("closing last-partial block 0004a was never delivered to the subscriber") +}