diff --git a/go.mod b/go.mod index 64c782a84..85cfce0cc 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/jhump/protoreflect v1.14.0 github.com/spf13/cobra v1.10.2 github.com/spf13/pflag v1.0.10 - github.com/streamingfast/bstream v0.0.2-0.20260402095814-607e840ece3d + github.com/streamingfast/bstream v0.0.2-0.20260610182139-c1572d2f51c6 github.com/streamingfast/cli v0.0.4-0.20250815192146-d8a233ec3d0b github.com/streamingfast/dauth v0.0.0-20260304175046-02898e30442d github.com/streamingfast/dbin v0.9.1-0.20231117225723-59790c798e2c diff --git a/go.sum b/go.sum index 68fcb8a5b..5dc9c8377 100644 --- a/go.sum +++ b/go.sum @@ -482,8 +482,8 @@ github.com/spf13/viper v1.21.0 h1:x5S+0EU27Lbphp4UKm1C+1oQO+rKx36vfCoaVebLFSU= github.com/spf13/viper v1.21.0/go.mod h1:P0lhsswPGWD/1lZJ9ny3fYnVqxiegrlNrEmgLjbTCAY= github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo= github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs= -github.com/streamingfast/bstream v0.0.2-0.20260402095814-607e840ece3d h1:Qw06Pcp/XoPy5/AbHGPMH4AkFt0VYjTvYuRzkWyE0dI= -github.com/streamingfast/bstream v0.0.2-0.20260402095814-607e840ece3d/go.mod h1:6IkpMw6g2z+E4gH2DJkm6vsQfZ8Y/WdYdYWjsLJIYr4= +github.com/streamingfast/bstream v0.0.2-0.20260610182139-c1572d2f51c6 h1:8LMg/E/Rw2GljnTgfiU0DA/nD9aD7xYCMnJD0/FHCys= +github.com/streamingfast/bstream v0.0.2-0.20260610182139-c1572d2f51c6/go.mod h1:6IkpMw6g2z+E4gH2DJkm6vsQfZ8Y/WdYdYWjsLJIYr4= github.com/streamingfast/cli v0.0.4-0.20250815192146-d8a233ec3d0b h1:ztYeX3/5rg2tV2EU7edcrcHzMz6wUbdJB+LqCrP5W8s= github.com/streamingfast/cli v0.0.4-0.20250815192146-d8a233ec3d0b/go.mod h1:o9R/tjNON01X2mgWL5qirl2MV6xQ4EZI5D504ST3K/M= github.com/streamingfast/dauth v0.0.0-20260304175046-02898e30442d h1:NYrpGcWWLNpIYbHwvNo/W8E+oXhsrn/0kUwun0Rwx8w= diff --git a/go.work.sum b/go.work.sum index b8318f2c2..2f6619259 100644 --- a/go.work.sum +++ b/go.work.sum @@ -602,16 +602,34 @@ github.com/aws/aws-sdk-go v1.37.0/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zK github.com/aws/aws-sdk-go v1.44.233/go.mod h1:aVsgQcEevwlmQ7qHE9I3h+dtQgpqhFB+i8Phjh7fkwI= github.com/aws/aws-sdk-go v1.49.6 h1:yNldzF5kzLBRvKlKz1S0bkvc2+04R1kt13KfBWQBfFA= github.com/aws/aws-sdk-go v1.49.6/go.mod h1:LF8svs817+Nz+DmiMQKTO3ubZ/6IaTpq3TjupRn3Eqk= +github.com/aws/aws-sdk-go-v2 v1.32.6/go.mod h1:P5WJBrYqqbWVaOxgH0X/FYYD47/nooaPOZPlQdmiN2U= github.com/aws/aws-sdk-go-v2 v1.36.3 h1:mJoei2CxPutQVxaATCzDUjcZEjVRdpsiiXi2o38yqWM= github.com/aws/aws-sdk-go-v2 v1.36.3/go.mod h1:LLXuLpgzEbD766Z5ECcRmi8AzSwfZItDtmABVkRLGzg= +github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.7/go.mod h1:QraP0UcVlQJsmHfioCrveWOC1nbiWUl3ej08h4mXWoc= +github.com/aws/aws-sdk-go-v2/config v1.28.6/go.mod h1:GDzxJ5wyyFSCoLkS+UhGB0dArhb9mI+Co4dHtoTxbko= +github.com/aws/aws-sdk-go-v2/credentials v1.17.47/go.mod h1:+KdckOejLW3Ks3b0E3b5rHsr2f9yuORBum0WPnE5o5w= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.21/go.mod h1:AjUdLYe4Tgs6kpH4Bv7uMZo7pottoyHMn4eTcIcneaY= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.25/go.mod h1:IgPfDv5jqFIzQSNbUEMoitNooSMXjRSDkhXv8jiROvU= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34 h1:ZK5jHhnrioRkUNOc+hOgQKlUL5JeC3S6JgLxtQ+Rm0Q= github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.34/go.mod h1:p4VfIceZokChbA9FzMbRGz5OV+lekcVtHlPKEO0gSZY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.25/go.mod h1:DBdPrgeocww+CSl1C8cEV8PN1mHMBhuCDLpXezyvWkE= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34 h1:SZwFm17ZUNNg5Np0ioo/gq8Mn6u9w19Mri8DnJ15Jf0= github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.34/go.mod h1:dFZsC0BLo346mvKQLWmoJxT+Sjp+qcVR1tRVHQGOH9Q= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.25/go.mod h1:GrGY+Q4fIokYLtjCVB/aFfCVL6hhGUFl8inD18fDalE= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.1/go.mod h1:9nu0fVANtYiAePIBh2/pFUSwtJ402hLnp854CNoDOeE= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3 h1:eAh2A4b5IzM/lum78bZ590jy36+d/aFLgKF/4Vd1xPE= github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.12.3/go.mod h1:0yKJC/kb8sAnmlYa6Zs3QVYqaC8ug2AbnNChv5Ox3uA= +github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.4.6/go.mod h1:ngUiVRCco++u+soRRVBIvBZxSMMvOVMXA4PJ36JLfSw= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.6/go.mod h1:WqgLmwY7so32kG01zD8CPTJWVWM+TzJoOVHwTg4aPug= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15 h1:dM9/92u2F1JbDaGooxTq18wmmFzbJRfXfVfy96/1CXM= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.12.15/go.mod h1:SwFBy2vjtA0vZbjjaFtfN045boopadnoVPhu4Fv66vY= +github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.18.6/go.mod h1:hLMJt7Q8ePgViKupeymbqI0la+t9/iYFBjxQCFwuAwI= +github.com/aws/aws-sdk-go-v2/service/s3 v1.71.0/go.mod h1:sT/iQz8JK3u/5gZkT+Hmr7GzVZehUMkRZpOaAwYXeGY= +github.com/aws/aws-sdk-go-v2/service/sso v1.24.7/go.mod h1:ZHtuQJ6t9A/+YDuxOLnbryAmITtr8UysSny3qcyvJTc= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.28.6/go.mod h1:URronUEGfXZN1VpdktPSD1EkAL9mfrV+2F4sjH38qOY= +github.com/aws/aws-sdk-go-v2/service/sts v1.33.2/go.mod h1:mVggCnIWoM09jP71Wh+ea7+5gAp53q+49wDFs1SW5z8= +github.com/aws/smithy-go v1.22.1/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/aws/smithy-go v1.22.2/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/aws/smithy-go v1.22.3 h1:Z//5NuZCSW6R4PhQ93hShNbyBbn8BWCmCVCt+Q8Io5k= github.com/aws/smithy-go v1.22.3/go.mod h1:t1ufH5HMublsJYulve2RKmHDC15xu1f26kHCp/HgceI= @@ -1026,6 +1044,7 @@ github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5Fsn github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M= github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0= github.com/prometheus/client_golang v1.15.1/go.mod h1:e9yaBhRPU2pPNsZwE+JdQl0KEt1N9XgF6zxWmaC0xOk= +github.com/prometheus/client_golang v1.16.0/go.mod h1:Zsulrv/L9oM40tJ7T815tM89lFEugiJ9HzIqaAx4LKc= github.com/prometheus/client_golang v1.20.4/go.mod h1:PIEt8X02hGcP8JWbeHyeZ53Y/jReSnHgO035n//V5WE= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= @@ -1038,6 +1057,7 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8 github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= github.com/prometheus/common v0.42.0/go.mod h1:xBwqVerjNdUDjgODMpudtOMwlOwf2SaTr1yjz4b7Zbc= +github.com/prometheus/common v0.44.0/go.mod h1:ofAIvZbQ1e/nugmZGz4/qCb9Ap1VoSTIO7x0VV9VvuY= github.com/prometheus/common v0.62.0/go.mod h1:vyBcEuLSvWos9B1+CyL7JZ2up+uFzXhkqml0W5zIY1I= github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA= @@ -1045,6 +1065,7 @@ github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4O github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA= github.com/prometheus/procfs v0.9.0/go.mod h1:+pB4zwohETzFnmlpe6yd2lSc+0/46IYZRB/chUwxUZY= github.com/prometheus/procfs v0.10.1/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= +github.com/prometheus/procfs v0.11.0/go.mod h1:nwNm2aOCAYw8uTR/9bWRREkZFxAUcWzPHWJq+XBB/FM= github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rabbitmq/amqp091-go v1.8.1/go.mod h1:+jPrT9iY2eLjRaMSRHUhc3z14E/l85kv/f+6luSD3pc= github.com/rabbitmq/amqp091-go v1.9.0 h1:qrQtyzB4H8BQgEuJwhmVQqVHB9O4+MNDJCCAcpc3Aoo= @@ -1281,6 +1302,7 @@ go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqe go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.opentelemetry.io/proto/otlp v1.7.0/go.mod h1:fSKjH6YJ7HDlwzltzyMj036AJ3ejJLCgCSHGj4efDDo= go.opentelemetry.io/proto/otlp v1.7.1/go.mod h1:b2rVh6rfI/s2pHWNlB7ILJcRALpcNDzKhACevjI+ZnE= +go.opentelemetry.io/proto/otlp v1.9.0/go.mod h1:xE+Cx5E/eEHw+ISFkwPLwCZefwVjY+pqKg1qcK03+/4= go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= go.uber.org/automaxprocs v1.5.1/go.mod h1:BF4eumQw0P9GtnuxxovUd06vwm1o18oMzFtK66vU6XU= diff --git a/pipeline/pipeline.go b/pipeline/pipeline.go index dc11a0615..afadd150c 100644 --- a/pipeline/pipeline.go +++ b/pipeline/pipeline.go @@ -101,6 +101,7 @@ type Pipeline struct { partialProcessingState *partialProcessingState previousLastPartialBlock bstream.BlockRef // when we get a partial with 'isLast', we nil out partialProcessingState and write the block ref here. + undonePartialsBlockNum uint64 // when partials for a block number were undone after a transactions mismatch, further non-last partials for that number are dropped until its last partial or full block arrives respFunc substreams.ResponseFunc lastProgressSent time.Time diff --git a/pipeline/process_block.go b/pipeline/process_block.go index 1f458910d..d6e1b6a87 100644 --- a/pipeline/process_block.go +++ b/pipeline/process_block.go @@ -347,14 +347,37 @@ func (p *Pipeline) handleStepPartial(ctx context.Context, clock *pbsubstreams.Cl if err := p.handleStepUndoPartial(cursor); err != nil { return err } - // now that we have undone previous partials, we start a new one - p.partialProcessingState = newPartialProcessingState(clock.Number, clock.Id, idx, cursor.HeadBlock) //on StepPartial, the cursor.headBlock is the parent of the partial + // partialProcessingState is now nil, a new sequence starts below } - } else { - p.partialProcessingState = newPartialProcessingState(clock.Number, clock.Id, idx, cursor.HeadBlock) + } + + if p.partialProcessingState == nil { + if clock.Number == p.undonePartialsBlockNum && !isLast { + // partials for this block number were already undone after a transactions mismatch: + // stay silent until its last partial or full block arrives instead of streaming + // (and undoing) every churning version of the block + return nil + } + p.partialProcessingState = newPartialProcessingState(clock.Number, clock.Id, idx, cursor.HeadBlock) // on StepPartial, the cursor.headBlock is the parent of the partial } txCount, txsHash, err := splitPartialblock(execOutput, p.blockType, p.partialProcessingState.processedTransactionsCount, p.partialProcessingState.processedTransactionsHash) + if errors.Is(err, errHashMismatch) { + // this partial block does not extend the transactions already sent for that block number: + // it is a different version of the block (a reorg happening inside partial blocks). + // Undo the partials sent so far. + parentRef := p.partialProcessingState.previousBlockRef // same block number: same parent + if err := p.handleStepUndoPartial(cursor); err != nil { + return err + } + if !isLast { + p.undonePartialsBlockNum = clock.Number + return nil + } + // the last partial (or full block) settles the block: process it from its first transaction + p.partialProcessingState = newPartialProcessingState(clock.Number, clock.Id, idx, parentRef) + txCount, txsHash, err = splitPartialblock(execOutput, p.blockType, 0, nil) + } if err != nil { // an error occurred, partial blocks do not contain what they should... return err @@ -374,6 +397,8 @@ func (p *Pipeline) handleStepPartial(ctx context.Context, clock *pbsubstreams.Cl panic("tier2 requests should never receive partial block") } + p.insideReorgUpTo = nil // new data will flow: a future undo back to the same junction must be sent again + if err := p.executeModules(ctx, execOutput, false, false); err != nil { return fmt.Errorf("execute modules: %w", err) } diff --git a/pipeline/process_block_test.go b/pipeline/process_block_test.go index 5f5c76bc9..e9ab58091 100644 --- a/pipeline/process_block_test.go +++ b/pipeline/process_block_test.go @@ -205,6 +205,110 @@ func TestHandleStepPartial(t *testing.T) { dataResp(11, "11a"), }, }, + { + name: "partial with mismatching transactions triggers undo then waits for the last partial, not an error", + blocks: []blockWithStep{ + withStepNew(testBlock(3, "3a")), + withTxs(withStepNew(testPartialBlock(4, "4x", 1, false)), "t1", "t2"), + withTxs(withStepNew(testPartialBlock(4, "4y", 2, false)), "t1", "t2", "t3"), // extends 4x: no undo + withTxs(withStepNew(testPartialBlock(4, "4z", 3, false)), "o1", "o2"), // different transactions: new version of block 4 + withTxs(withStepNew(testPartialBlock(4, "4a", 4, true)), "o1", "o2", "o3"), // last partial settles the block + withStepNew(testBlock(5, "5a")), + }, + expectedResponses: []substreamsResp{ + dataResp(3, "3a"), + partialDataResp(4, "4x", 1, false), + partialDataResp(4, "4y", 2, false), + undoResp(3, "3a"), + // 4z is not sent: after the undo, we wait for the last partial of block 4 + partialDataResp(4, "4a", 4, true), + dataResp(5, "5a"), + }, + }, + { + name: "partial with fewer transactions than already sent triggers undo", + blocks: []blockWithStep{ + withStepNew(testBlock(3, "3a")), + withTxs(withStepNew(testPartialBlock(4, "4x", 1, false)), "t1", "t2", "t3"), + withTxs(withStepNew(testPartialBlock(4, "4y", 2, false)), "t1"), // shrunk: new version of block 4 + withTxs(withStepNew(testPartialBlock(4, "4a", 3, true)), "t1", "t2"), + }, + expectedResponses: []substreamsResp{ + dataResp(3, "3a"), + partialDataResp(4, "4x", 1, false), + undoResp(3, "3a"), + partialDataResp(4, "4a", 3, true), + }, + }, + { + name: "churning partial versions trigger a single undo then silence until the last partial", + blocks: []blockWithStep{ + withStepNew(testBlock(3, "3a")), + withTxs(withStepNew(testPartialBlock(4, "4w", 1, false)), "t1"), + withTxs(withStepNew(testPartialBlock(4, "4x", 2, false)), "t1", "t2"), + withTxs(withStepNew(testPartialBlock(4, "4y", 3, false)), "u1", "u2"), // mismatch: undo, then mute + withTxs(withStepNew(testPartialBlock(4, "4z", 4, false)), "u1", "u2", "u3"), // muted, even though it extends 4y + withTxs(withStepNew(testPartialBlock(4, "4q", 5, false)), "v1"), // muted + withTxs(withStepNew(testPartialBlock(4, "4a", 6, true)), "v1", "v2"), // last partial settles the block + withStepNew(testBlock(5, "5a")), + }, + expectedResponses: []substreamsResp{ + dataResp(3, "3a"), + partialDataResp(4, "4w", 1, false), + partialDataResp(4, "4x", 2, false), + undoResp(3, "3a"), + partialDataResp(4, "4a", 6, true), + dataResp(5, "5a"), + }, + }, + { + name: "undo to the same junction after a mismatch-undo is not suppressed", + blocks: []blockWithStep{ + withStepNew(testBlock(3, "3a")), + withTxs(withStepNew(testPartialBlock(4, "4x", 1, false)), "t1", "t2"), + withTxs(withStepNew(testPartialBlock(4, "4y", 2, false)), "u1", "u2"), // mismatch: undo to 3a, then mute + withTxs(withStepNew(testPartialBlock(4, "4b", 3, true)), "u1", "u2", "u3"), + withStepUndo(testBlock(4, "4b"), 3, "3a"), // chain reorgs block 4 away: undo to 3a again + withStepNew(testBlock(4, "4a")), + }, + expectedResponses: []substreamsResp{ + dataResp(3, "3a"), + partialDataResp(4, "4x", 1, false), + undoResp(3, "3a"), + partialDataResp(4, "4b", 3, true), + undoResp(3, "3a"), // must not be suppressed by the undo dedup + dataResp(4, "4a"), + }, + }, + { + name: "full block with mismatching transactions triggers undo and is sent as last partial", + blocks: []blockWithStep{ + withStepNew(testBlock(3, "3a")), + withTxs(withStepNew(testPartialBlock(4, "4y", 1, false)), "t1", "t2"), + withTxs(withStepNew(testBlock(4, "4y")), "o1", "o2"), // same ID as the partial but different content + withStepNew(testBlock(5, "5a")), + }, + expectedResponses: []substreamsResp{ + dataResp(3, "3a"), + partialDataResp(4, "4y", 1, false), + undoResp(3, "3a"), + partialDataResp(4, "4y", 2, true), + dataResp(5, "5a"), + }, + }, + { + name: "partials on a chain with skipped block numbers are accepted", + blocks: []blockWithStep{ + withStepNew(testBlock(3, "3a")), + withTxs(withStepNew(testPartialBlock(5, "5x", 1, false)), "t1"), // block 4 skipped + withTxs(withStepNew(testPartialBlock(5, "5a", 2, true)), "t1", "t2"), + }, + expectedResponses: []substreamsResp{ + dataResp(3, "3a"), + partialDataResp(5, "5x", 1, false), + partialDataResp(5, "5a", 2, true), + }, + }, { name: "partial undo does not reset 'lastPartial'", blocks: []blockWithStep{ @@ -253,7 +357,7 @@ func TestHandleStepPartial(t *testing.T) { for _, blk := range tt.blocks { - err := fillTestAcmeBlock(blk.blk) + err := fillTestAcmeBlock(blk.blk, blk.txs) require.NoError(t, err) clock := BlockToClock(blk.blk) @@ -367,6 +471,13 @@ type blockWithStep struct { blk *pbbstream.Block step bstream.StepType junction bstream.BlockRef + txs []string +} + +// withTxs sets the transaction hashes carried by the underlying acme block +func withTxs(b blockWithStep, txs ...string) blockWithStep { + b.txs = txs + return b } func withStepUndo(blk *pbbstream.Block, junctionNum uint64, junctionID string) blockWithStep { @@ -397,13 +508,17 @@ func withStepNew(blk *pbbstream.Block) blockWithStep { } } -func fillTestAcmeBlock(blk *pbbstream.Block) error { +func fillTestAcmeBlock(blk *pbbstream.Block, txs []string) error { + var transactions []*pbacme.Transaction + for _, hash := range txs { + transactions = append(transactions, &pbacme.Transaction{Hash: hash}) + } acmeBlock := &pbacme.Block{ Header: &pbacme.BlockHeader{ Height: blk.Number, Hash: blk.Id, }, - Transactions: nil, + Transactions: transactions, } payload, err := anypb.New(acmeBlock)