Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/jhump/protoreflect v1.14.0
github.com/spf13/cobra v1.10.2
github.com/spf13/pflag v1.0.10
github.com/streamingfast/bstream v0.0.2-0.20260402095814-607e840ece3d
github.com/streamingfast/bstream v0.0.2-0.20260610182139-c1572d2f51c6
github.com/streamingfast/cli v0.0.4-0.20250815192146-d8a233ec3d0b
github.com/streamingfast/dauth v0.0.0-20260304175046-02898e30442d
github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -482,8 +482,8 @@ github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU=
github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY=
github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo=
github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs=
github.com/streamingfast/bstream v0.0.2-0.20260402095814-607e840ece3d h1:Qw06Pcp/XoPy5/AbHGPMH4AkFt0VYjTvYuRzkWyE0dI=
github.com/streamingfast/bstream v0.0.2-0.20260402095814-607e840ece3d/go.mod h1:6IkpMw6g2z+E4gH2DJkm6vsQfZ8Y/WdYdYWjsLJIYr4=
github.com/streamingfast/bstream v0.0.2-0.20260610182139-c1572d2f51c6 h1:8LMg/E/Rw2GljnTgfiU0DA/nD9aD7xYCMnJD0/FHCys=
github.com/streamingfast/bstream v0.0.2-0.20260610182139-c1572d2f51c6/go.mod h1:6IkpMw6g2z+E4gH2DJkm6vsQfZ8Y/WdYdYWjsLJIYr4=
github.com/streamingfast/cli v0.0.4-0.20250815192146-d8a233ec3d0b h1:ztYeX3/5rg2tV2EU7edcrcHzMz6wUbdJB+LqCrP5W8s=
github.com/streamingfast/cli v0.0.4-0.20250815192146-d8a233ec3d0b/go.mod h1:o9R/tjNON01X2mgWL5qirl2MV6xQ4EZI5D504ST3K/M=
github.com/streamingfast/dauth v0.0.0-20260304175046-02898e30442d h1:NYrpGcWWLNpIYbHwvNo/W8E+oXhsrn/0kUwun0Rwx8w=
Expand Down
22 changes: 22 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -602,16 +602,34 @@ github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zK
github.com/aws/aws-sdk-go v1.44.233/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI=
github.com/aws/aws-sdk-go v1.49.6 h1:yNldzF5kzLBRvKlKz1S0bkvc2+04R1kt13KfBWQBfFA=
github.com/aws/aws-sdk-go v1.49.6/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk=
github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U=
github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM=
github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg=
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc=
github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko=
github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25/go.mod h1:IgPfDv5jqFIzQSNbUEMoitNooSMXjRSDkhXv8jiROvU=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25/go.mod h1:DBdPrgeocww+CSl1C8cEV8PN1mHMBhuCDLpXezyvWkE=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0=
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q=
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc=
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25/go.mod h1:GrGY+Q4fIokYLtjCVB/aFfCVL6hhGUFl8inD18fDalE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE=
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA=
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6/go.mod h1:ngUiVRCco++u+soRRVBIvBZxSMMvOVMXA4PJ36JLfSw=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6/go.mod h1:WqgLmwY7so32kG01zD8CPTJWVWM+TzJoOVHwTg4aPug=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY=
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.6/go.mod h1:hLMJt7Q8ePgViKupeymbqI0la+t9/iYFBjxQCFwuAwI=
github.com/aws/aws-sdk-go-v2/service/s3 v1.71.0/go.mod h1:sT/iQz8JK3u/5gZkT+Hmr7GzVZehUMkRZpOaAwYXeGY=
github.com/aws/aws-sdk-go-v2/service/sso v1.24.7/go.mod h1:ZHtuQJ6t9A/+YDuxOLnbryAmITtr8UysSny3qcyvJTc=
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6/go.mod h1:URronUEGfXZN1VpdktPSD1EkAL9mfrV+2F4sjH38qOY=
github.com/aws/aws-sdk-go-v2/service/sts v1.33.2/go.mod h1:mVggCnIWoM09jP71Wh+ea7+5gAp53q+49wDFs1SW5z8=
github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg=
github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k=
github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI=
Expand Down Expand Up @@ -1026,6 +1044,7 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk=
github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc=
github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
Expand All @@ -1038,13 +1057,15 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc=
github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY=
github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY=
github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM=
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc=
github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo=
Expand Down Expand Up @@ -1281,6 +1302,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.opentelemetry.io/proto/otlp v1.7.0/go.mod h1:fSKjH6YJ7HDlwzltzyMj036AJ3ejJLCgCSHGj4efDDo=
go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE=
go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4=
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU=
Expand Down
1 change: 1 addition & 0 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ type Pipeline struct {

partialProcessingState *partialProcessingState
previousLastPartialBlock bstream.BlockRef // when we get a partial with 'isLast', we nil out partialProcessingState and write the block ref here.
undonePartialsBlockNum uint64 // when partials for a block number were undone after a transactions mismatch, further non-last partials for that number are dropped until its last partial or full block arrives

respFunc substreams.ResponseFunc
lastProgressSent time.Time
Expand Down
33 changes: 29 additions & 4 deletions pipeline/process_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,37 @@ func (p *Pipeline) handleStepPartial(ctx context.Context, clock *pbsubstreams.Cl
if err := p.handleStepUndoPartial(cursor); err != nil {
return err
}
// now that we have undone previous partials, we start a new one
p.partialProcessingState = newPartialProcessingState(clock.Number, clock.Id, idx, cursor.HeadBlock) //on StepPartial, the cursor.headBlock is the parent of the partial
// partialProcessingState is now nil, a new sequence starts below
}
} else {
p.partialProcessingState = newPartialProcessingState(clock.Number, clock.Id, idx, cursor.HeadBlock)
}

if p.partialProcessingState == nil {
if clock.Number == p.undonePartialsBlockNum && !isLast {
// partials for this block number were already undone after a transactions mismatch:
// stay silent until its last partial or full block arrives instead of streaming
// (and undoing) every churning version of the block
return nil
}
p.partialProcessingState = newPartialProcessingState(clock.Number, clock.Id, idx, cursor.HeadBlock) // on StepPartial, the cursor.headBlock is the parent of the partial
}

txCount, txsHash, err := splitPartialblock(execOutput, p.blockType, p.partialProcessingState.processedTransactionsCount, p.partialProcessingState.processedTransactionsHash)
if errors.Is(err, errHashMismatch) {
// this partial block does not extend the transactions already sent for that block number:
// it is a different version of the block (a reorg happening inside partial blocks).
// Undo the partials sent so far.
parentRef := p.partialProcessingState.previousBlockRef // same block number: same parent
if err := p.handleStepUndoPartial(cursor); err != nil {
return err
}
if !isLast {
p.undonePartialsBlockNum = clock.Number
return nil
}
// the last partial (or full block) settles the block: process it from its first transaction
p.partialProcessingState = newPartialProcessingState(clock.Number, clock.Id, idx, parentRef)
txCount, txsHash, err = splitPartialblock(execOutput, p.blockType, 0, nil)
}
if err != nil {
// an error occurred, partial blocks do not contain what they should...
return err
Expand All @@ -374,6 +397,8 @@ func (p *Pipeline) handleStepPartial(ctx context.Context, clock *pbsubstreams.Cl
panic("tier2 requests should never receive partial block")
}

p.insideReorgUpTo = nil // new data will flow: a future undo back to the same junction must be sent again

if err := p.executeModules(ctx, execOutput, false, false); err != nil {
return fmt.Errorf("execute modules: %w", err)
}
Expand Down
121 changes: 118 additions & 3 deletions pipeline/process_block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,110 @@ func TestHandleStepPartial(t *testing.T) {
dataResp(11, "11a"),
},
},
{
name: "partial with mismatching transactions triggers undo then waits for the last partial, not an error",
blocks: []blockWithStep{
withStepNew(testBlock(3, "3a")),
withTxs(withStepNew(testPartialBlock(4, "4x", 1, false)), "t1", "t2"),
withTxs(withStepNew(testPartialBlock(4, "4y", 2, false)), "t1", "t2", "t3"), // extends 4x: no undo
withTxs(withStepNew(testPartialBlock(4, "4z", 3, false)), "o1", "o2"), // different transactions: new version of block 4
withTxs(withStepNew(testPartialBlock(4, "4a", 4, true)), "o1", "o2", "o3"), // last partial settles the block
withStepNew(testBlock(5, "5a")),
},
expectedResponses: []substreamsResp{
dataResp(3, "3a"),
partialDataResp(4, "4x", 1, false),
partialDataResp(4, "4y", 2, false),
undoResp(3, "3a"),
// 4z is not sent: after the undo, we wait for the last partial of block 4
partialDataResp(4, "4a", 4, true),
dataResp(5, "5a"),
},
},
{
name: "partial with fewer transactions than already sent triggers undo",
blocks: []blockWithStep{
withStepNew(testBlock(3, "3a")),
withTxs(withStepNew(testPartialBlock(4, "4x", 1, false)), "t1", "t2", "t3"),
withTxs(withStepNew(testPartialBlock(4, "4y", 2, false)), "t1"), // shrunk: new version of block 4
withTxs(withStepNew(testPartialBlock(4, "4a", 3, true)), "t1", "t2"),
},
expectedResponses: []substreamsResp{
dataResp(3, "3a"),
partialDataResp(4, "4x", 1, false),
undoResp(3, "3a"),
partialDataResp(4, "4a", 3, true),
},
},
{
name: "churning partial versions trigger a single undo then silence until the last partial",
blocks: []blockWithStep{
withStepNew(testBlock(3, "3a")),
withTxs(withStepNew(testPartialBlock(4, "4w", 1, false)), "t1"),
withTxs(withStepNew(testPartialBlock(4, "4x", 2, false)), "t1", "t2"),
withTxs(withStepNew(testPartialBlock(4, "4y", 3, false)), "u1", "u2"), // mismatch: undo, then mute
withTxs(withStepNew(testPartialBlock(4, "4z", 4, false)), "u1", "u2", "u3"), // muted, even though it extends 4y
withTxs(withStepNew(testPartialBlock(4, "4q", 5, false)), "v1"), // muted
withTxs(withStepNew(testPartialBlock(4, "4a", 6, true)), "v1", "v2"), // last partial settles the block
withStepNew(testBlock(5, "5a")),
},
expectedResponses: []substreamsResp{
dataResp(3, "3a"),
partialDataResp(4, "4w", 1, false),
partialDataResp(4, "4x", 2, false),
undoResp(3, "3a"),
partialDataResp(4, "4a", 6, true),
dataResp(5, "5a"),
},
},
{
name: "undo to the same junction after a mismatch-undo is not suppressed",
blocks: []blockWithStep{
withStepNew(testBlock(3, "3a")),
withTxs(withStepNew(testPartialBlock(4, "4x", 1, false)), "t1", "t2"),
withTxs(withStepNew(testPartialBlock(4, "4y", 2, false)), "u1", "u2"), // mismatch: undo to 3a, then mute
withTxs(withStepNew(testPartialBlock(4, "4b", 3, true)), "u1", "u2", "u3"),
withStepUndo(testBlock(4, "4b"), 3, "3a"), // chain reorgs block 4 away: undo to 3a again
withStepNew(testBlock(4, "4a")),
},
expectedResponses: []substreamsResp{
dataResp(3, "3a"),
partialDataResp(4, "4x", 1, false),
undoResp(3, "3a"),
partialDataResp(4, "4b", 3, true),
undoResp(3, "3a"), // must not be suppressed by the undo dedup
dataResp(4, "4a"),
},
},
{
name: "full block with mismatching transactions triggers undo and is sent as last partial",
blocks: []blockWithStep{
withStepNew(testBlock(3, "3a")),
withTxs(withStepNew(testPartialBlock(4, "4y", 1, false)), "t1", "t2"),
withTxs(withStepNew(testBlock(4, "4y")), "o1", "o2"), // same ID as the partial but different content
withStepNew(testBlock(5, "5a")),
},
expectedResponses: []substreamsResp{
dataResp(3, "3a"),
partialDataResp(4, "4y", 1, false),
undoResp(3, "3a"),
partialDataResp(4, "4y", 2, true),
dataResp(5, "5a"),
},
},
{
name: "partials on a chain with skipped block numbers are accepted",
blocks: []blockWithStep{
withStepNew(testBlock(3, "3a")),
withTxs(withStepNew(testPartialBlock(5, "5x", 1, false)), "t1"), // block 4 skipped
withTxs(withStepNew(testPartialBlock(5, "5a", 2, true)), "t1", "t2"),
},
expectedResponses: []substreamsResp{
dataResp(3, "3a"),
partialDataResp(5, "5x", 1, false),
partialDataResp(5, "5a", 2, true),
},
},
{
name: "partial undo does not reset 'lastPartial'",
blocks: []blockWithStep{
Expand Down Expand Up @@ -253,7 +357,7 @@ func TestHandleStepPartial(t *testing.T) {

for _, blk := range tt.blocks {

err := fillTestAcmeBlock(blk.blk)
err := fillTestAcmeBlock(blk.blk, blk.txs)
require.NoError(t, err)

clock := BlockToClock(blk.blk)
Expand Down Expand Up @@ -367,6 +471,13 @@ type blockWithStep struct {
blk *pbbstream.Block
step bstream.StepType
junction bstream.BlockRef
txs []string
}

// withTxs sets the transaction hashes carried by the underlying acme block
func withTxs(b blockWithStep, txs ...string) blockWithStep {
b.txs = txs
return b
}

func withStepUndo(blk *pbbstream.Block, junctionNum uint64, junctionID string) blockWithStep {
Expand Down Expand Up @@ -397,13 +508,17 @@ func withStepNew(blk *pbbstream.Block) blockWithStep {
}
}

func fillTestAcmeBlock(blk *pbbstream.Block) error {
func fillTestAcmeBlock(blk *pbbstream.Block, txs []string) error {
var transactions []*pbacme.Transaction
for _, hash := range txs {
transactions = append(transactions, &pbacme.Transaction{Hash: hash})
}
acmeBlock := &pbacme.Block{
Header: &pbacme.BlockHeader{
Height: blk.Number,
Hash: blk.Id,
},
Transactions: nil,
Transactions: transactions,
}

payload, err := anypb.New(acmeBlock)
Expand Down
Loading