From b87e1d2e101ae16de554814336d1400f752ac28b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Feb 2026 22:02:39 +0000 Subject: [PATCH 1/6] Initial plan From aa83749311055c00c471fa1415e59a6d7a1e792d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Feb 2026 22:12:02 +0000 Subject: [PATCH 2/6] Migrate to substreams/sink from substreams-sink library - Update all imports from github.com/streamingfast/substreams-sink to github.com/streamingfast/substreams/sink - Update NewFromViper calls to use new signature (removed endpoint and blockRange params, added userAgent) - Replace BlockRange() method calls with StartBlock() and StopBlock() - Update flag management to use FlagIgnore (deprecated but compatible API) - Remove sink.RegisterMetrics() calls as they're no longer in the API - Update readBlockRangeArgument to use bstream.ParseRange - Update ReadManifestAndModuleAndBlockRange to ReadManifestAndModule - Remove unused imports Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com> --- cmd/substreams-sink-sql/common_flags.go | 7 +-- cmd/substreams-sink-sql/from_proto.go | 23 +++++---- cmd/substreams-sink-sql/generate_csv.go | 47 ++++++++----------- cmd/substreams-sink-sql/run.go | 45 +++--------------- cmd/substreams-sink-sql/tools.go | 2 +- db_changes/db/cursor.go | 2 +- db_changes/db/dialect.go | 2 +- db_changes/db/dialect_clickhouse.go | 2 +- db_changes/db/dialect_postgres.go | 2 +- db_changes/db/flush.go | 2 +- db_changes/sinker/factory.go | 2 +- db_changes/sinker/generate_csv_sinker.go | 10 ++-- db_changes/sinker/sinker.go | 7 +-- db_changes/sinker/sinker_test.go | 2 +- db_changes/sinker/stats.go | 2 +- db_changes/state/file.go | 2 +- db_changes/state/interface.go | 2 +- db_proto/sinker.go | 2 +- db_proto/sinker_factory.go | 2 +- db_proto/sql/click_house/database.go | 2 +- db_proto/sql/database.go | 2 +- db_proto/sql/postgres/database.go | 2 +- go.mod | 3 +- go.sum | 6 +-- .../integration/db_changes_clickhouse_test.go | 2 +- tests/integration/db_changes_postgres_test.go | 2 +- tests/integration/db_proto_clickhouse_test.go | 2 +- 27 files changed, 71 insertions(+), 115 deletions(-) diff --git a/cmd/substreams-sink-sql/common_flags.go b/cmd/substreams-sink-sql/common_flags.go index 47594981..810231cd 100644 --- a/cmd/substreams-sink-sql/common_flags.go +++ b/cmd/substreams-sink-sql/common_flags.go @@ -10,8 +10,6 @@ import ( "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" - pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "go.uber.org/zap" ) @@ -46,10 +44,7 @@ func AddCommonDatabaseChangesFlags(flags *pflag.FlagSet) { } func readBlockRangeArgument(in string) (blockRange *bstream.Range, err error) { - return sink.ReadBlockRange(&pbsubstreams.Module{ - Name: "dummy", - InitialBlock: 0, - }, in) + return bstream.ParseRange(in) } type cliApplication struct { diff --git a/cmd/substreams-sink-sql/from_proto.go b/cmd/substreams-sink-sql/from_proto.go index 78159330..cfff77de 100644 --- a/cmd/substreams-sink-sql/from_proto.go +++ b/cmd/substreams-sink-sql/from_proto.go @@ -11,7 +11,7 @@ import ( "github.com/streamingfast/cli" . "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" sinksql "github.com/streamingfast/substreams-sink-sql" "github.com/streamingfast/substreams-sink-sql/bytes" "github.com/streamingfast/substreams-sink-sql/db_changes/db" @@ -28,7 +28,7 @@ var fromProtoCmd = Command(fromProtoE, "", RangeArgs(2, 3), Flags(func(flags *pflag.FlagSet) { - sink.AddFlagsToSet(flags, ignoreUndoBufferSize{}) + sink.AddFlagsToSet(flags, sink.FlagIgnore("undo-buffer-size")) flags.StringP("substreams-endpoint", "e", "", "Substreams gRPC endpoint. If empty, will be replaced by the SUBSTREAMS_ENDPOINT_{network_name} environment variable, where `network_name` is determined from the substreams manifest. Some network names have default endpoints.") flags.StringP("start-block", "s", "", "Start block to stream from. If empty, will be replaced by initialBlock of the first module you are streaming. If negative, will be resolved by the server relative to the chain head") flags.StringP("stop-block", "t", "0", "Stop block to end stream at, exclusively. If the start-block is positive, a '+' prefix can indicate 'relative to start-block'") @@ -103,16 +103,20 @@ func fromProtoE(cmd *cobra.Command, args []string) error { return err } } + + // Set the endpoint flag for the sink to use + cmd.Flags().Set("endpoint", endpoint) startBlock := sflags.MustGetString(cmd, "start-block") - endBlock := sflags.MustGetString(cmd, "stop-block") - blockRange := "" if startBlock != "" { - blockRange = startBlock + // Set the start-block flag for the sink to use + cmd.Flags().Set("start-block", startBlock) } - blockRange += ":" + + endBlock := sflags.MustGetString(cmd, "stop-block") if endBlock != "0" { - blockRange += endBlock + // Set the stop-block flag for the sink to use + cmd.Flags().Set("stop-block", endBlock) } dsn, err := db.ParseDSN(dsnString) @@ -121,7 +125,7 @@ func fromProtoE(cmd *cobra.Command, args []string) error { } //todo: handle params - spkg, module, _, _, err := sink.ReadManifestAndModuleAndBlockRange(manifestPath, "", nil, outputModuleName, "", false, "", zlog) + spkg, module, _, err := sink.ReadManifestAndModule(manifestPath, "", nil, outputModuleName, "", false, nil, zlog) if err != nil { return fmt.Errorf("reading manifest: %w", err) } @@ -212,10 +216,9 @@ func fromProtoE(cmd *cobra.Command, args []string) error { baseSink, err := sink.NewFromViper( cmd, outputType, - endpoint, manifestPath, outputModuleName, - blockRange, + "substreams-sink-sql/1.0.0", zlog, tracer, ) diff --git a/cmd/substreams-sink-sql/generate_csv.go b/cmd/substreams-sink-sql/generate_csv.go index 195c34ca..e9df5aeb 100644 --- a/cmd/substreams-sink-sql/generate_csv.go +++ b/cmd/substreams-sink-sql/generate_csv.go @@ -9,10 +9,9 @@ import ( "github.com/spf13/pflag" . "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" sinker2 "github.com/streamingfast/substreams-sink-sql/db_changes/sinker" - "github.com/streamingfast/substreams/manifest" ) // lastCursorFilename is the name of the file where the last cursor is stored, no extension as it's added by the store @@ -36,14 +35,13 @@ var generateCsvCmd = Command(generateCsvE, `), ExactArgs(3), Flags(func(flags *pflag.FlagSet) { - sink.AddFlagsToSet(flags, sink.FlagIgnore("final-blocks-only")) + sink.AddFlagsToSet(flags) AddCommonSinkerFlags(flags) AddCommonDatabaseChangesFlags(flags) flags.Uint64("bundle-size", 10000, "Size of output bundle, in blocks") flags.String("working-dir", "./workdir", "Path to local folder used as working directory") flags.String("output-dir", "./csv-output", "Path to local folder used as destination for CSV") - flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") flags.Uint64("buffer-max-size", 4*1024*1024, FlagDescription(` Amount of memory bytes to allocate to the buffered writer. If your data set is small enough that every is hold in memory, we are going to avoid the local I/O operation(s) and upload accumulated content in memory directly to final storage location. @@ -62,13 +60,26 @@ var generateCsvCmd = Command(generateCsvE, func generateCsvE(cmd *cobra.Command, args []string) error { app := NewApplication(cmd.Context()) - sink.RegisterMetrics() sinker2.RegisterMetrics() dsnString := args[0] manifestPath := args[1] blockRange := args[2] + // Parse block range and set flags + br, err := readBlockRangeArgument(blockRange) + if err != nil { + return fmt.Errorf("invalid block range %q: %w", blockRange, err) + } + + // Set the start and stop block flags from the parsed block range + if br.StartBlock() > 0 { + cmd.Flags().Set("start-block", fmt.Sprintf("%d", br.StartBlock())) + } + if br.EndBlock() != nil { + cmd.Flags().Set("stop-block", fmt.Sprintf("%d", *br.EndBlock())) + } + outputDir := sflags.MustGetString(cmd, "output-dir") bundleSize := sflags.MustGetUint64(cmd, "bundle-size") bufferMaxSize := sflags.MustGetUint64(cmd, "buffer-max-size") @@ -76,37 +87,17 @@ func generateCsvE(cmd *cobra.Command, args []string) error { cursorTableName := sflags.MustGetString(cmd, "cursors-table") historyTableName := sflags.MustGetString(cmd, "history-table") - endpoint := sflags.MustGetString(cmd, "endpoint") - if endpoint == "" { - network := sflags.MustGetString(cmd, "network") - if network == "" { - reader, err := manifest.NewReader(manifestPath) - if err != nil { - return fmt.Errorf("setup manifest reader: %w", err) - } - pkgBundle, err := reader.Read() - if err != nil { - return fmt.Errorf("read manifest: %w", err) - } - network = pkgBundle.Package.Network - } - var err error - endpoint, err = manifest.ExtractNetworkEndpoint(network, sflags.MustGetString(cmd, "endpoint"), zlog) - if err != nil { - return err - } - } + // Set final-blocks-only flag to true for CSV generation + cmd.Flags().Set("final-blocks-only", "true") sink, err := sink.NewFromViper( cmd, supportedOutputTypes, - endpoint, manifestPath, sink.InferOutputModuleFromPackage, - blockRange, + "substreams-sink-sql/1.0.0", zlog, tracer, - sink.WithFinalBlocksOnly(), ) if err != nil { return fmt.Errorf("new base sinker: %w", err) diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index db73c616..a12147cf 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -8,23 +8,16 @@ import ( "github.com/spf13/pflag" . "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" sinker2 "github.com/streamingfast/substreams-sink-sql/db_changes/sinker" - "github.com/streamingfast/substreams/manifest" ) -type ignoreUndoBufferSize struct{} - -func (i ignoreUndoBufferSize) IsIgnored(in string) bool { - return in == "undo-buffer-size" -} - var sinkRunCmd = Command(sinkRunE, - "run [:]", + "run []", "Runs SQL sink process", RangeArgs(2, 3), Flags(func(flags *pflag.FlagSet) { - sink.AddFlagsToSet(flags, ignoreUndoBufferSize{}) + sink.AddFlagsToSet(flags, sink.FlagIgnore("undo-buffer-size")) AddCommonSinkerFlags(flags) AddCommonDatabaseChangesFlags(flags) @@ -35,7 +28,6 @@ var sinkRunCmd = Command(sinkRunE, flags.Int("flush-interval", 0, "(deprecated) please use --batch-block-flush-interval instead") flags.Int("flush-retry-count", 3, "Number of retry attempts for flush operations") flags.Duration("flush-retry-delay", 1*time.Second, "Base delay for incremental retry backoff on flush failures") - flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") }), Example("substreams-sink-sql run 'postgres://localhost:5432/posgres?sslmode=disable' uniswap-v3@v0.2.10"), OnCommandErrorLogAndExit(zlog), @@ -44,44 +36,21 @@ var sinkRunCmd = Command(sinkRunE, func sinkRunE(cmd *cobra.Command, args []string) error { app := NewApplication(cmd.Context()) - sink.RegisterMetrics() sinker2.RegisterMetrics() dsnString := args[0] manifestPath := args[1] - blockRange := "" + moduleName := "" if len(args) > 2 { - blockRange = args[2] - } - - endpoint := sflags.MustGetString(cmd, "endpoint") - if endpoint == "" { - network := sflags.MustGetString(cmd, "network") - if network == "" { - reader, err := manifest.NewReader(manifestPath) - if err != nil { - return fmt.Errorf("setup manifest reader: %w", err) - } - pkgBundle, err := reader.Read() - if err != nil { - return fmt.Errorf("read manifest: %w", err) - } - network = pkgBundle.Package.Network - } - var err error - endpoint, err = manifest.ExtractNetworkEndpoint(network, sflags.MustGetString(cmd, "endpoint"), zlog) - if err != nil { - return err - } + moduleName = args[2] } sink, err := sink.NewFromViper( cmd, supportedOutputTypes, - endpoint, manifestPath, - sink.InferOutputModuleFromPackage, - blockRange, + moduleName, + "substreams-sink-sql/1.0.0", zlog, tracer, ) diff --git a/cmd/substreams-sink-sql/tools.go b/cmd/substreams-sink-sql/tools.go index ef0dca1c..7631ea4c 100644 --- a/cmd/substreams-sink-sql/tools.go +++ b/cmd/substreams-sink-sql/tools.go @@ -11,7 +11,7 @@ import ( "github.com/streamingfast/cli" . "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" ) diff --git a/db_changes/db/cursor.go b/db_changes/db/cursor.go index 94ac097b..2ef924e6 100644 --- a/db_changes/db/cursor.go +++ b/db_changes/db/cursor.go @@ -8,7 +8,7 @@ import ( "strings" "github.com/lithammer/dedent" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" ) diff --git a/db_changes/db/dialect.go b/db_changes/db/dialect.go index 5f495e69..8ea60cb8 100644 --- a/db_changes/db/dialect.go +++ b/db_changes/db/dialect.go @@ -5,7 +5,7 @@ import ( "database/sql" "fmt" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" ) type UnknownDriverError struct { diff --git a/db_changes/db/dialect_clickhouse.go b/db_changes/db/dialect_clickhouse.go index 0d182ef6..42701531 100644 --- a/db_changes/db/dialect_clickhouse.go +++ b/db_changes/db/dialect_clickhouse.go @@ -15,7 +15,7 @@ import ( clickhouse "github.com/AfterShip/clickhouse-sql-parser/parser" _ "github.com/ClickHouse/clickhouse-go/v2" "github.com/streamingfast/cli" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" "golang.org/x/exp/maps" ) diff --git a/db_changes/db/dialect_postgres.go b/db_changes/db/dialect_postgres.go index 6dd3c7cd..0131a6c4 100644 --- a/db_changes/db/dialect_postgres.go +++ b/db_changes/db/dialect_postgres.go @@ -14,7 +14,7 @@ import ( "time" "github.com/streamingfast/cli" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" "golang.org/x/exp/maps" ) diff --git a/db_changes/db/flush.go b/db_changes/db/flush.go index 87d77831..36e71484 100644 --- a/db_changes/db/flush.go +++ b/db_changes/db/flush.go @@ -7,7 +7,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/streamingfast/logging/zapx" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" ) diff --git a/db_changes/sinker/factory.go b/db_changes/sinker/factory.go index 12be8882..e8fd6b5c 100644 --- a/db_changes/sinker/factory.go +++ b/db_changes/sinker/factory.go @@ -7,7 +7,7 @@ import ( "time" "github.com/streamingfast/logging" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/db_changes/db" "go.uber.org/zap" ) diff --git a/db_changes/sinker/generate_csv_sinker.go b/db_changes/sinker/generate_csv_sinker.go index 84b130c1..3ef3f440 100644 --- a/db_changes/sinker/generate_csv_sinker.go +++ b/db_changes/sinker/generate_csv_sinker.go @@ -17,7 +17,7 @@ import ( "github.com/streamingfast/logging" "github.com/streamingfast/logging/zapx" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" bundler2 "github.com/streamingfast/substreams-sink-sql/db_changes/bundler" writer2 "github.com/streamingfast/substreams-sink-sql/db_changes/bundler/writer" @@ -61,8 +61,8 @@ func NewGenerateCSVSinker( logger *zap.Logger, tracer logging.Tracer, ) (*GenerateCSVSinker, error) { - blockRange := sink.BlockRange() - if blockRange == nil || blockRange.EndBlock() == nil { + stopBlock := sink.StopBlock() + if stopBlock == 0 { return nil, fmt.Errorf("sink must have a stop block defined") } @@ -99,7 +99,7 @@ func NewGenerateCSVSinker( bundlersByTable: make(map[string]*bundler2.Bundler), cursorsTableStore: cursorsStore, lastCursorFilename: lastCursorFilename, - stopBlock: *blockRange.EndBlock(), + stopBlock: stopBlock, loader: loader, logger: logger, @@ -116,7 +116,7 @@ func NewGenerateCSVSinker( tables := s.loader.GetAvailableTablesInSchema() for _, table := range tables { columns := s.loader.GetColumnsForTable(table) - fb, err := getBundler(table, s.Sinker.BlockRange().StartBlock(), s.stopBlock, bundleSize, bufferSize, csvOutputStore, workingDir, logger, columns) + fb, err := getBundler(table, uint64(s.Sinker.StartBlock()), s.stopBlock, bundleSize, bufferSize, csvOutputStore, workingDir, logger, columns) if err != nil { return nil, err } diff --git a/db_changes/sinker/sinker.go b/db_changes/sinker/sinker.go index d7a1b7fa..e2a6b8eb 100644 --- a/db_changes/sinker/sinker.go +++ b/db_changes/sinker/sinker.go @@ -10,7 +10,7 @@ import ( "github.com/streamingfast/logging" "github.com/streamingfast/logging/zapx" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" @@ -334,8 +334,9 @@ func protoUpdateOpToDbUpdateOp(op pbdatabase.Field_UpdateOp) db2.UpdateOp { func (s *SQLSinker) HandleBlockRangeCompletion(ctx context.Context, cursor *sink.Cursor) error { // To be moved in the base sinker library, happens usually only on integration tests where the connection // can close with "nil" error but we haven't completed the range for real yet. - if !s.Sinker.BlockRange().ReachedEndBlock(cursor.Block().Num()) { - s.logger.Debug("range not completed yet, skipping", zap.Stringer("block", cursor.Block()), zap.Stringer("range", s.Sinker.BlockRange())) + stopBlock := s.Sinker.StopBlock() + if stopBlock > 0 && cursor.Block().Num() < stopBlock { + s.logger.Debug("range not completed yet, skipping", zap.Stringer("block", cursor.Block()), zap.Uint64("stop_block", stopBlock)) return nil } diff --git a/db_changes/sinker/sinker_test.go b/db_changes/sinker/sinker_test.go index 949c497c..600a8bda 100644 --- a/db_changes/sinker/sinker_test.go +++ b/db_changes/sinker/sinker_test.go @@ -9,7 +9,7 @@ import ( _ "github.com/lib/pq" "github.com/streamingfast/bstream" "github.com/streamingfast/logging" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams/client" diff --git a/db_changes/sinker/stats.go b/db_changes/sinker/stats.go index a82a166c..efc39ca7 100644 --- a/db_changes/sinker/stats.go +++ b/db_changes/sinker/stats.go @@ -6,7 +6,7 @@ import ( "github.com/streamingfast/bstream" "github.com/streamingfast/dmetrics" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" ) diff --git a/db_changes/state/file.go b/db_changes/state/file.go index d3b81b18..9c0dc538 100644 --- a/db_changes/state/file.go +++ b/db_changes/state/file.go @@ -12,7 +12,7 @@ import ( "github.com/streamingfast/dhammer" "github.com/streamingfast/dstore" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/db_changes/bundler/writer" "go.uber.org/zap" "gopkg.in/yaml.v3" diff --git a/db_changes/state/interface.go b/db_changes/state/interface.go index 4f6192b9..00ed6018 100644 --- a/db_changes/state/interface.go +++ b/db_changes/state/interface.go @@ -4,7 +4,7 @@ import ( "context" "github.com/streamingfast/bstream" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/db_changes/bundler/writer" ) diff --git a/db_proto/sinker.go b/db_proto/sinker.go index a8b00542..195adc78 100644 --- a/db_proto/sinker.go +++ b/db_proto/sinker.go @@ -7,7 +7,7 @@ import ( "time" "github.com/streamingfast/logging/zapx" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" sql "github.com/streamingfast/substreams-sink-sql/db_proto/sql" "github.com/streamingfast/substreams-sink-sql/db_proto/stats" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" diff --git a/db_proto/sinker_factory.go b/db_proto/sinker_factory.go index ba210ae9..25f6f284 100644 --- a/db_proto/sinker_factory.go +++ b/db_proto/sinker_factory.go @@ -6,7 +6,7 @@ import ( "time" "github.com/streamingfast/logging" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/bytes" "github.com/streamingfast/substreams-sink-sql/db_changes/db" protosql "github.com/streamingfast/substreams-sink-sql/db_proto/sql" diff --git a/db_proto/sql/click_house/database.go b/db_proto/sql/click_house/database.go index 9c4ef025..fb7f56ad 100644 --- a/db_proto/sql/click_house/database.go +++ b/db_proto/sql/click_house/database.go @@ -13,7 +13,7 @@ import ( "github.com/ClickHouse/ch-go" "github.com/streamingfast/logging" "github.com/streamingfast/logging/zapx" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/bytes" "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams-sink-sql/db_proto/sql" diff --git a/db_proto/sql/database.go b/db_proto/sql/database.go index cb0eb523..29b14c45 100644 --- a/db_proto/sql/database.go +++ b/db_proto/sql/database.go @@ -6,7 +6,7 @@ import ( "strings" "time" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbSchema "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/schema/v1" "github.com/streamingfast/substreams-sink-sql/proto" "go.uber.org/zap" diff --git a/db_proto/sql/postgres/database.go b/db_proto/sql/postgres/database.go index 2ccc96bf..087a93d0 100644 --- a/db_proto/sql/postgres/database.go +++ b/db_proto/sql/postgres/database.go @@ -8,7 +8,7 @@ import ( "time" "github.com/streamingfast/logging/zapx" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/bytes" "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams-sink-sql/db_proto/sql" diff --git a/go.mod b/go.mod index f61a5df8..b0e66760 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/spf13/viper v1.15.0 github.com/streamingfast/logging v0.0.0-20251127143054-23a35e5bd633 github.com/streamingfast/substreams v1.16.7-0.20250925152521-9d7a8ef0f261 - github.com/streamingfast/substreams-sink v0.5.3-0.20250818134825-6b25ffb8232c github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20260110015235-04b544bbecb9 github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.38.0 @@ -51,7 +50,6 @@ require ( github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/bits-and-blooms/bitset v1.12.0 // indirect github.com/bobg/go-generics/v3 v3.5.0 // indirect - github.com/bobg/go-generics/v4 v4.1.2 // indirect github.com/bufbuild/protocompile v0.14.1 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -70,6 +68,7 @@ require ( github.com/docker/docker v28.4.0+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/ebitengine/purego v0.8.4 // indirect github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect diff --git a/go.sum b/go.sum index 72058970..b93b80bb 100644 --- a/go.sum +++ b/go.sum @@ -136,8 +136,6 @@ github.com/bobg/go-generics/v2 v2.2.2 h1:cHTV51Vr/wSlwiNWvncz66E4QtoRw9qXZeEiLAm github.com/bobg/go-generics/v2 v2.2.2/go.mod h1:ieOJ1ARFvk+HfMKbW1DT5UzJ/CJPKoiRm17QKK82bRE= github.com/bobg/go-generics/v3 v3.5.0 h1:OdBXzCRCO4e3Z7FQz1maEN2Q5LFYHc7vIK8EXcS4xQQ= github.com/bobg/go-generics/v3 v3.5.0/go.mod h1:wGlMLQER92clsh3cJoQjbUtUEJ03FoxnGhZjaWhf4fM= -github.com/bobg/go-generics/v4 v4.1.2 h1:iF62T5EypncG3kTYFTWzBgfZlwCgm84tUboY9TKZT+4= -github.com/bobg/go-generics/v4 v4.1.2/go.mod h1:KVwpxEYErjvcqjJSJqVNZd/JEq3SsQzb9t01+82pZGw= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= @@ -212,6 +210,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/drone/envsubst v1.0.3 h1:PCIBwNDYjs50AsLZPYdfhSATKaRg/FJmDc2D6+C2x8g= github.com/drone/envsubst v1.0.3/go.mod h1:N2jZmlMufstn1KEqvbHjw40h1KyTmnVzHcSc9bFiJ2g= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -624,8 +624,6 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/substreams v1.16.7-0.20250925152521-9d7a8ef0f261 h1:5vkJECHOocdRe88kDjBnSDFPj58ieC25EZzAmcEOy2Q= github.com/streamingfast/substreams v1.16.7-0.20250925152521-9d7a8ef0f261/go.mod h1:3s/qTXV85jh40Li1gbYTRtbAOyFzpaHN0Bi1OtuXIDA= -github.com/streamingfast/substreams-sink v0.5.3-0.20250818134825-6b25ffb8232c h1:LCJhSgR7XTiKxCtIBPyoFLQdYRCbFaW8EljIL8IvaWs= -github.com/streamingfast/substreams-sink v0.5.3-0.20250818134825-6b25ffb8232c/go.mod h1:EbwZgN7FRZY6oNBmA7ufcaHZ215nDo3ejyyasuS3xj4= github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20260110015235-04b544bbecb9 h1:B+YC18umjoESDnLKZScl0GZtPQoE0/U2Mz8shnfTISc= github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20260110015235-04b544bbecb9/go.mod h1:f51ljuUsQEYuyuDdo5BB/4AfB87QDAegy5e8p5qBxis= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/tests/integration/db_changes_clickhouse_test.go b/tests/integration/db_changes_clickhouse_test.go index 71c41a1d..090d6be5 100644 --- a/tests/integration/db_changes_clickhouse_test.go +++ b/tests/integration/db_changes_clickhouse_test.go @@ -9,7 +9,7 @@ import ( _ "github.com/ClickHouse/clickhouse-go/v2" "github.com/jmoiron/sqlx" "github.com/streamingfast/bstream" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams-sink-sql/db_changes/sinker" diff --git a/tests/integration/db_changes_postgres_test.go b/tests/integration/db_changes_postgres_test.go index 6274c7ae..aa648e67 100644 --- a/tests/integration/db_changes_postgres_test.go +++ b/tests/integration/db_changes_postgres_test.go @@ -13,7 +13,7 @@ import ( _ "github.com/lib/pq" "github.com/streamingfast/bstream" "github.com/streamingfast/cli" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams-sink-sql/db_changes/sinker" diff --git a/tests/integration/db_proto_clickhouse_test.go b/tests/integration/db_proto_clickhouse_test.go index f2fb1e4c..1d14315c 100644 --- a/tests/integration/db_proto_clickhouse_test.go +++ b/tests/integration/db_proto_clickhouse_test.go @@ -12,7 +12,7 @@ import ( _ "github.com/ClickHouse/clickhouse-go/v2" "github.com/jmoiron/sqlx" "github.com/streamingfast/bstream" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/db_proto" pbrelations "github.com/streamingfast/substreams-sink-sql/pb/test/relations" "github.com/streamingfast/substreams/manifest" From c6672b06c695daa6f79d0e6ac846e9dbae0e05c0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Feb 2026 22:13:31 +0000 Subject: [PATCH 3/6] Fix flag conflicts in from_proto command - Exclude endpoint, start-block, and stop-block flags from sink library - Remove shorthand from substreams-endpoint to avoid conflict with sink's endpoint flag - This allows from_proto to manage its own endpoint and block range flags Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com> --- cmd/substreams-sink-sql/from_proto.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/substreams-sink-sql/from_proto.go b/cmd/substreams-sink-sql/from_proto.go index cfff77de..51575b60 100644 --- a/cmd/substreams-sink-sql/from_proto.go +++ b/cmd/substreams-sink-sql/from_proto.go @@ -28,8 +28,8 @@ var fromProtoCmd = Command(fromProtoE, "", RangeArgs(2, 3), Flags(func(flags *pflag.FlagSet) { - sink.AddFlagsToSet(flags, sink.FlagIgnore("undo-buffer-size")) - flags.StringP("substreams-endpoint", "e", "", "Substreams gRPC endpoint. If empty, will be replaced by the SUBSTREAMS_ENDPOINT_{network_name} environment variable, where `network_name` is determined from the substreams manifest. Some network names have default endpoints.") + sink.AddFlagsToSet(flags, sink.FlagIgnore("undo-buffer-size", "endpoint", "start-block", "stop-block")) + flags.String("substreams-endpoint", "", "Substreams gRPC endpoint. If empty, will be replaced by the SUBSTREAMS_ENDPOINT_{network_name} environment variable, where `network_name` is determined from the substreams manifest. Some network names have default endpoints.") flags.StringP("start-block", "s", "", "Start block to stream from. If empty, will be replaced by initialBlock of the first module you are streaming. If negative, will be resolved by the server relative to the chain head") flags.StringP("stop-block", "t", "0", "Stop block to end stream at, exclusively. If the start-block is positive, a '+' prefix can indicate 'relative to start-block'") From 35a8e05f3af61cca0f14c2671ed9406ece9e154a Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 17 Feb 2026 22:14:48 +0000 Subject: [PATCH 4/6] Add error checking for flag.Set() calls Address code review feedback by adding proper error checking when setting flags programmatically Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com> --- cmd/substreams-sink-sql/from_proto.go | 12 +++++++++--- cmd/substreams-sink-sql/generate_csv.go | 12 +++++++++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/cmd/substreams-sink-sql/from_proto.go b/cmd/substreams-sink-sql/from_proto.go index 51575b60..3a2c0f75 100644 --- a/cmd/substreams-sink-sql/from_proto.go +++ b/cmd/substreams-sink-sql/from_proto.go @@ -105,18 +105,24 @@ func fromProtoE(cmd *cobra.Command, args []string) error { } // Set the endpoint flag for the sink to use - cmd.Flags().Set("endpoint", endpoint) + if err := cmd.Flags().Set("endpoint", endpoint); err != nil { + return fmt.Errorf("setting endpoint flag: %w", err) + } startBlock := sflags.MustGetString(cmd, "start-block") if startBlock != "" { // Set the start-block flag for the sink to use - cmd.Flags().Set("start-block", startBlock) + if err := cmd.Flags().Set("start-block", startBlock); err != nil { + return fmt.Errorf("setting start-block flag: %w", err) + } } endBlock := sflags.MustGetString(cmd, "stop-block") if endBlock != "0" { // Set the stop-block flag for the sink to use - cmd.Flags().Set("stop-block", endBlock) + if err := cmd.Flags().Set("stop-block", endBlock); err != nil { + return fmt.Errorf("setting stop-block flag: %w", err) + } } dsn, err := db.ParseDSN(dsnString) diff --git a/cmd/substreams-sink-sql/generate_csv.go b/cmd/substreams-sink-sql/generate_csv.go index e9df5aeb..b001eaa5 100644 --- a/cmd/substreams-sink-sql/generate_csv.go +++ b/cmd/substreams-sink-sql/generate_csv.go @@ -74,10 +74,14 @@ func generateCsvE(cmd *cobra.Command, args []string) error { // Set the start and stop block flags from the parsed block range if br.StartBlock() > 0 { - cmd.Flags().Set("start-block", fmt.Sprintf("%d", br.StartBlock())) + if err := cmd.Flags().Set("start-block", fmt.Sprintf("%d", br.StartBlock())); err != nil { + return fmt.Errorf("setting start-block flag: %w", err) + } } if br.EndBlock() != nil { - cmd.Flags().Set("stop-block", fmt.Sprintf("%d", *br.EndBlock())) + if err := cmd.Flags().Set("stop-block", fmt.Sprintf("%d", *br.EndBlock())); err != nil { + return fmt.Errorf("setting stop-block flag: %w", err) + } } outputDir := sflags.MustGetString(cmd, "output-dir") @@ -88,7 +92,9 @@ func generateCsvE(cmd *cobra.Command, args []string) error { historyTableName := sflags.MustGetString(cmd, "history-table") // Set final-blocks-only flag to true for CSV generation - cmd.Flags().Set("final-blocks-only", "true") + if err := cmd.Flags().Set("final-blocks-only", "true"); err != nil { + return fmt.Errorf("setting final-blocks-only flag: %w", err) + } sink, err := sink.NewFromViper( cmd, From 690d940155013a100c19e81c2679d869e0b85467 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 18 Feb 2026 00:09:47 +0000 Subject: [PATCH 5/6] Address PR review feedback - Use version variable from main CLI instead of hardcoded string (addresses comment 2819648900) - Restore backward compatibility for run command - accept [:] block range argument (addresses comment 2819653763) - Add comments explaining flag bridging with substreams/sink library (addresses comment 2819662071) - Add comment explaining readBlockRangeArgument equivalence (addresses comment 2819655754) Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com> --- cmd/substreams-sink-sql/common_flags.go | 2 ++ cmd/substreams-sink-sql/from_proto.go | 9 ++++--- cmd/substreams-sink-sql/generate_csv.go | 9 ++++--- cmd/substreams-sink-sql/run.go | 36 +++++++++++++++++++++---- 4 files changed, 43 insertions(+), 13 deletions(-) diff --git a/cmd/substreams-sink-sql/common_flags.go b/cmd/substreams-sink-sql/common_flags.go index 810231cd..5172d3c9 100644 --- a/cmd/substreams-sink-sql/common_flags.go +++ b/cmd/substreams-sink-sql/common_flags.go @@ -44,6 +44,8 @@ func AddCommonDatabaseChangesFlags(flags *pflag.FlagSet) { } func readBlockRangeArgument(in string) (blockRange *bstream.Range, err error) { + // This replaces the old sink.ReadBlockRange which was removed in the new sink API. + // bstream.ParseRange handles the same block range format (e.g., "100:200", "100:", ":200") return bstream.ParseRange(in) } diff --git a/cmd/substreams-sink-sql/from_proto.go b/cmd/substreams-sink-sql/from_proto.go index 3a2c0f75..dc3b6a79 100644 --- a/cmd/substreams-sink-sql/from_proto.go +++ b/cmd/substreams-sink-sql/from_proto.go @@ -104,14 +104,15 @@ func fromProtoE(cmd *cobra.Command, args []string) error { } } - // Set the endpoint flag for the sink to use + // Bridge endpoint with substreams/sink library by setting the endpoint flag + // The sink library expects endpoint via --endpoint flag, but this command uses --substreams-endpoint if err := cmd.Flags().Set("endpoint", endpoint); err != nil { return fmt.Errorf("setting endpoint flag: %w", err) } startBlock := sflags.MustGetString(cmd, "start-block") if startBlock != "" { - // Set the start-block flag for the sink to use + // Bridge start-block with substreams/sink library if err := cmd.Flags().Set("start-block", startBlock); err != nil { return fmt.Errorf("setting start-block flag: %w", err) } @@ -119,7 +120,7 @@ func fromProtoE(cmd *cobra.Command, args []string) error { endBlock := sflags.MustGetString(cmd, "stop-block") if endBlock != "0" { - // Set the stop-block flag for the sink to use + // Bridge stop-block with substreams/sink library if err := cmd.Flags().Set("stop-block", endBlock); err != nil { return fmt.Errorf("setting stop-block flag: %w", err) } @@ -224,7 +225,7 @@ func fromProtoE(cmd *cobra.Command, args []string) error { outputType, manifestPath, outputModuleName, - "substreams-sink-sql/1.0.0", + fmt.Sprintf("substreams-sink-sql/%s", version), zlog, tracer, ) diff --git a/cmd/substreams-sink-sql/generate_csv.go b/cmd/substreams-sink-sql/generate_csv.go index b001eaa5..7c94138e 100644 --- a/cmd/substreams-sink-sql/generate_csv.go +++ b/cmd/substreams-sink-sql/generate_csv.go @@ -66,18 +66,19 @@ func generateCsvE(cmd *cobra.Command, args []string) error { manifestPath := args[1] blockRange := args[2] - // Parse block range and set flags + // Parse block range and set flags to bridge with substreams/sink library br, err := readBlockRangeArgument(blockRange) if err != nil { return fmt.Errorf("invalid block range %q: %w", blockRange, err) } - // Set the start and stop block flags from the parsed block range + // Bridge start-block flag with substreams/sink library if br.StartBlock() > 0 { if err := cmd.Flags().Set("start-block", fmt.Sprintf("%d", br.StartBlock())); err != nil { return fmt.Errorf("setting start-block flag: %w", err) } } + // Bridge stop-block flag with substreams/sink library if br.EndBlock() != nil { if err := cmd.Flags().Set("stop-block", fmt.Sprintf("%d", *br.EndBlock())); err != nil { return fmt.Errorf("setting stop-block flag: %w", err) @@ -91,7 +92,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error { cursorTableName := sflags.MustGetString(cmd, "cursors-table") historyTableName := sflags.MustGetString(cmd, "history-table") - // Set final-blocks-only flag to true for CSV generation + // Bridge final-blocks-only flag with substreams/sink library (required for CSV generation) if err := cmd.Flags().Set("final-blocks-only", "true"); err != nil { return fmt.Errorf("setting final-blocks-only flag: %w", err) } @@ -101,7 +102,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error { supportedOutputTypes, manifestPath, sink.InferOutputModuleFromPackage, - "substreams-sink-sql/1.0.0", + fmt.Sprintf("substreams-sink-sql/%s", version), zlog, tracer, ) diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index a12147cf..5562fc82 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -2,6 +2,7 @@ package main import ( "fmt" + "strings" "time" "github.com/spf13/cobra" @@ -13,7 +14,7 @@ import ( ) var sinkRunCmd = Command(sinkRunE, - "run []", + "run [:]", "Runs SQL sink process", RangeArgs(2, 3), Flags(func(flags *pflag.FlagSet) { @@ -40,17 +41,42 @@ func sinkRunE(cmd *cobra.Command, args []string) error { dsnString := args[0] manifestPath := args[1] - moduleName := "" + + // Handle third argument - can be either block range or module name + // For backward compatibility, if it contains ':', treat it as block range if len(args) > 2 { - moduleName = args[2] + thirdArg := args[2] + // Check if it looks like a block range (contains ':') + if strings.Contains(thirdArg, ":") { + // Parse and set block range flags to bridge with substreams/sink library + br, err := readBlockRangeArgument(thirdArg) + if err != nil { + return fmt.Errorf("invalid block range %q: %w", thirdArg, err) + } + + if br.StartBlock() > 0 { + if err := cmd.Flags().Set("start-block", fmt.Sprintf("%d", br.StartBlock())); err != nil { + return fmt.Errorf("setting start-block flag: %w", err) + } + } + if br.EndBlock() != nil { + if err := cmd.Flags().Set("stop-block", fmt.Sprintf("%d", *br.EndBlock())); err != nil { + return fmt.Errorf("setting stop-block flag: %w", err) + } + } + } else { + // Treat as module name (new behavior, for forward compatibility) + // Module name is handled via sink.InferOutputModuleFromPackage by default + // or can be overridden, but we'll just pass empty string to let it infer + } } sink, err := sink.NewFromViper( cmd, supportedOutputTypes, manifestPath, - moduleName, - "substreams-sink-sql/1.0.0", + sink.InferOutputModuleFromPackage, + fmt.Sprintf("substreams-sink-sql/%s", version), zlog, tracer, ) From f06ed163c0801f15f14d1ee36596ca1101ff53a3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 18 Feb 2026 17:50:16 +0000 Subject: [PATCH 6/6] Fix CI test failure in sinker_test.go Update sink.New() call to use new SinkerConfig structure instead of individual parameters. The sink API changed from accepting multiple parameters to a single SinkerConfig struct. Fixes test failure: db_changes/sinker/sinker_test.go:210:55: too many arguments in call to sink.New Co-authored-by: maoueh <123014+maoueh@users.noreply.github.com> --- db_changes/sinker/sinker_test.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/db_changes/sinker/sinker_test.go b/db_changes/sinker/sinker_test.go index 600a8bda..c2ec62b6 100644 --- a/db_changes/sinker/sinker_test.go +++ b/db_changes/sinker/sinker_test.go @@ -13,6 +13,7 @@ import ( pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams/client" + "github.com/streamingfast/substreams/manifest" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "github.com/stretchr/testify/assert" @@ -207,7 +208,16 @@ func TestSinker_SQLStatements(t *testing.T) { logger, tracer, ) - s, err := sink.New(sink.SubstreamsModeDevelopment, false, testPackage, testPackage.Modules.Modules[0], []byte("unused"), testClientConfig, logger, nil) + s, err := sink.New(&sink.SinkerConfig{ + Pkg: testPackage, + OutputModule: testPackage.Modules.Modules[0], + OutputModuleHash: manifest.ModuleHash([]byte("unused")), + ClientConfig: testClientConfig, + Mode: sink.SubstreamsModeDevelopment, + NoopMode: false, + Logger: logger, + Tracer: nil, + }) require.NoError(t, err) sinker, _ := New(s, l, logger, nil, 3, 1*time.Second)