-
Notifications
You must be signed in to change notification settings - Fork 13
fix/hub subscription skips lastpartial #49
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d0b3d0f
03699a2
7f1fa4a
06f33f4
3f1949c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,17 @@ | ||
| name: Test | ||
| on: | ||
| pull_request: | ||
| push: | ||
| branches: | ||
| - develop | ||
|
|
||
| jobs: | ||
| test: | ||
| runs-on: ubuntu-latest | ||
| steps: | ||
| - uses: actions/checkout@v4 | ||
| - uses: actions/setup-go@v5 | ||
| with: | ||
| go-version: '1.25' | ||
| - name: test | ||
| run: go test ./... |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2528,6 +2528,142 @@ func TestForkable_ProcessBlock_WithPartialBlocks(t *testing.T) { | |
| }, | ||
| }, | ||
| }, | ||
| { | ||
| name: "ignore non-last partial after lastPartial", | ||
| forkDB: fdbLinked("00000002a"), | ||
| protocolFirstBlock: 2, | ||
| processBlocks: []*pbbstream.Block{ | ||
| bTestBlock("00000003a", "00000002a"), // block{3} | ||
| partialBlock("00000004b", "00000003a", 2), // partialBlock{4, idx=2} | ||
| lastPartialBlock("00000004a", "00000003a", 3), // partialBlock{4, idx=3, LAST}: block 4 is settled | ||
| partialBlock("00000004e", "00000003a", 7), // higher index, but block 4 is settled -> ignored | ||
| bTestBlock("00000005a", "00000004a"), // block{5} | ||
| }, | ||
| expectedResult: []*ForkableObject{ | ||
| { | ||
| step: bstream.StepNew, | ||
| Obj: "00000003a", | ||
| block: tinyBlk("00000003a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000002a"), | ||
| }, | ||
| { | ||
| step: bstream.StepPartial, | ||
| Obj: "00000004b", | ||
| block: tinyBlk("00000004b"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000003a"), | ||
| }, | ||
| { | ||
| step: bstream.StepNewPartial, | ||
| Obj: "00000004a", | ||
| block: tinyBlk("00000004a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000003a"), | ||
| }, | ||
| // ignored 4e: block 4 was settled by its last partial | ||
| { | ||
| step: bstream.StepNew, | ||
| Obj: "00000005a", | ||
| block: tinyBlk("00000005a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000004a"), | ||
| }, | ||
| }, | ||
| expectResultWithoutPartialBlocks: []*ForkableObject{ | ||
| { | ||
| step: bstream.StepNew, | ||
| Obj: "00000003a", | ||
| block: tinyBlk("00000003a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000002a"), | ||
| }, | ||
| // StepPartial(4b) filtered out | ||
| { | ||
| step: bstream.StepNewPartial, | ||
| Obj: "00000004a", | ||
| block: tinyBlk("00000004a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000003a"), | ||
| }, | ||
| // ignored 4e: block 4 was settled by its last partial | ||
| { | ||
| step: bstream.StepNew, | ||
| Obj: "00000005a", | ||
| block: tinyBlk("00000005a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000004a"), | ||
| }, | ||
| }, | ||
| }, | ||
| { | ||
| name: "ignore 'duplicate' last partial with higher index after lastPartial", | ||
| forkDB: fdbLinked("00000002a"), | ||
| protocolFirstBlock: 2, | ||
| processBlocks: []*pbbstream.Block{ | ||
| bTestBlock("00000003a", "00000002a"), // block{3} | ||
| partialBlock("00000004b", "00000003a", 2), // partialBlock{4, idx=2} | ||
| lastPartialBlock("00000004a", "00000003a", 3), // partialBlock{4, idx=3, LAST}: block 4 is settled | ||
| lastPartialBlock("00000004a", "00000003a", 4), // partialBlock{4, idx=4, LAST}: block 4 is settled again, from another reader that had to bump index again | ||
| bTestBlock("00000005a", "00000004a"), // block{5} | ||
| }, | ||
| expectedResult: []*ForkableObject{ | ||
| { | ||
| step: bstream.StepNew, | ||
| Obj: "00000003a", | ||
| block: tinyBlk("00000003a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000002a"), | ||
| }, | ||
| { | ||
| step: bstream.StepPartial, | ||
| Obj: "00000004b", | ||
| block: tinyBlk("00000004b"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000003a"), | ||
| }, | ||
| { | ||
| step: bstream.StepNewPartial, | ||
| Obj: "00000004a", | ||
| block: tinyBlk("00000004a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000003a"), | ||
| }, | ||
| // ignored 4e: block 4 was settled by its last partial | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔵 nit: copy-paste comment — there is no |
||
| { | ||
| step: bstream.StepNew, | ||
| Obj: "00000005a", | ||
| block: tinyBlk("00000005a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000004a"), | ||
| }, | ||
| }, | ||
| expectResultWithoutPartialBlocks: []*ForkableObject{ | ||
| { | ||
| step: bstream.StepNew, | ||
| Obj: "00000003a", | ||
| block: tinyBlk("00000003a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000002a"), | ||
| }, | ||
| // StepPartial(4b) filtered out | ||
| { | ||
| step: bstream.StepNewPartial, | ||
| Obj: "00000004a", | ||
| block: tinyBlk("00000004a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000003a"), | ||
| }, | ||
| // ignored 4e: block 4 was settled by its last partial | ||
| { | ||
| step: bstream.StepNew, | ||
| Obj: "00000005a", | ||
| block: tinyBlk("00000005a"), | ||
| lastLIBSent: tinyBlk("00000002a"), | ||
| parentBlock: tinyBlk("00000004a"), | ||
| }, | ||
| }, | ||
| }, | ||
| { | ||
| name: "no LIB change on partial blocks unless it is lastPartial", | ||
| forkDB: fdbLinked("00000002a"), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,80 @@ | ||
| package hub | ||
|
|
||
| import ( | ||
| "fmt" | ||
| "testing" | ||
| "time" | ||
|
|
||
| "github.com/streamingfast/bstream" | ||
| pbbstream "github.com/streamingfast/bstream/pb/sf/bstream/v1" | ||
| "google.golang.org/protobuf/types/known/timestamppb" | ||
| ) | ||
|
|
||
| func reproBlock(id string, num uint64, idx int32, last bool) *pbbstream.Block { | ||
| return &pbbstream.Block{ | ||
| Id: id, | ||
| Number: num, | ||
| ParentId: fmt.Sprintf("%04da", num-1), | ||
| ParentNum: num - 1, | ||
| LibNum: 1, | ||
| Timestamp: timestamppb.New(time.Now()), | ||
| PartialIndex: idx, | ||
| LastPartial: last, | ||
| } | ||
| } | ||
|
|
||
| type stepObj struct{ step bstream.StepType } | ||
|
|
||
| func (s stepObj) Step() bstream.StepType { return s.step } | ||
| func (s stepObj) FinalBlockHeight() uint64 { return 0 } | ||
| func (s stepObj) ReorgJunctionBlock() bstream.BlockRef { return nil } | ||
|
|
||
| var _ bstream.Stepable = stepObj{} | ||
|
|
||
| // reproduces the live flashblocks flow under subscriber lag: the channel holds | ||
| // several blocks; the closing "last partial" (StepNewPartial) of a block is | ||
| // followed in the channel by a stale partial version of the same block number. | ||
| // The skip-to-latest-version logic must not skip past the closing block. | ||
| func TestSubscriptionEatsLastPartial(t *testing.T) { | ||
| var received []string | ||
| handler := bstream.HandlerFunc(func(blk *pbbstream.Block, obj interface{}) error { | ||
| received = append(received, fmt.Sprintf("%s %s(idx=%d,last=%v)", obj.(bstream.Stepable).Step(), blk.Id, blk.PartialIndex, blk.LastPartial)) | ||
| return nil | ||
| }) | ||
|
|
||
| sub := NewSubscription(handler, 100, true) | ||
|
|
||
| push := func(step bstream.StepType, blk *pbbstream.Block) { | ||
| if err := sub.push(&bstream.PreprocessedBlock{Block: blk, Obj: stepObj{step}}); err != nil { | ||
| t.Fatal(err) | ||
| } | ||
| } | ||
|
|
||
| // subscriber is lagging: all of these are already buffered in the channel | ||
| // when it starts draining (flashblocks of block 4, its closing last-partial, | ||
| // one stale post-seal flashblock version, then flashblocks of block 5) | ||
| push(bstream.StepPartial, reproBlock("0004p1", 4, 1, false)) | ||
| push(bstream.StepPartial, reproBlock("0004p2", 4, 2, false)) | ||
| push(bstream.StepNewPartial, reproBlock("0004a", 4, 3, true)) // closing of block 4 | ||
| push(bstream.StepPartial, reproBlock("0004p4", 4, 4, false)) // stale flashblock version, post-seal | ||
| push(bstream.StepPartial, reproBlock("0005p1", 5, 1, false)) | ||
| push(bstream.StepNewPartial, reproBlock("0005a", 5, 2, true)) // closing of block 5 | ||
| push(bstream.StepPartial, reproBlock("0006p1", 6, 1, false)) | ||
|
|
||
| go func() { | ||
| time.Sleep(500 * time.Millisecond) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔵 nit: fixed |
||
| sub.Shutdown(nil) | ||
| }() | ||
| sub.Run() | ||
|
|
||
| for _, r := range received { | ||
| fmt.Println(" delivered:", r) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🔵 nit: |
||
| } | ||
|
|
||
| for _, r := range received { | ||
| if r == "new,partial 0004a(idx=3,last=true)" { | ||
| return // closing was delivered, all good | ||
| } | ||
| } | ||
| t.Fatalf("closing last-partial block 0004a was never delivered to the subscriber") | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🔵 nit: this case passes with the fix reverted — the duplicate
00000004ais dropped by same-ID dedup inAddLink(returnsexists), not by the newLastPartial && !blk.LastPartialbranch (that branch needs!blk.LastPartial, but this block IS a last-partial). To actually exercise the new logic, give the second close a different ID — but note a different-ID, higher-index last-partial then hitsreturn blk.PartialIndex > p.lastBlockSent.PartialIndex= true and triggers, so assert the behavior you actually want.