diff --git a/cmd/substreams-sink-sql/common_flags.go b/cmd/substreams-sink-sql/common_flags.go index 47594981..5172d3c9 100644 --- a/cmd/substreams-sink-sql/common_flags.go +++ b/cmd/substreams-sink-sql/common_flags.go @@ -10,8 +10,6 @@ import ( "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" - pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "go.uber.org/zap" ) @@ -46,10 +44,9 @@ func AddCommonDatabaseChangesFlags(flags *pflag.FlagSet) { } func readBlockRangeArgument(in string) (blockRange *bstream.Range, err error) { - return sink.ReadBlockRange(&pbsubstreams.Module{ - Name: "dummy", - InitialBlock: 0, - }, in) + // This replaces the old sink.ReadBlockRange which was removed in the new sink API. + // bstream.ParseRange handles the same block range format (e.g., "100:200", "100:", ":200") + return bstream.ParseRange(in) } type cliApplication struct { diff --git a/cmd/substreams-sink-sql/from_proto.go b/cmd/substreams-sink-sql/from_proto.go index 78159330..dc3b6a79 100644 --- a/cmd/substreams-sink-sql/from_proto.go +++ b/cmd/substreams-sink-sql/from_proto.go @@ -11,7 +11,7 @@ import ( "github.com/streamingfast/cli" . "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" sinksql "github.com/streamingfast/substreams-sink-sql" "github.com/streamingfast/substreams-sink-sql/bytes" "github.com/streamingfast/substreams-sink-sql/db_changes/db" @@ -28,8 +28,8 @@ var fromProtoCmd = Command(fromProtoE, "", RangeArgs(2, 3), Flags(func(flags *pflag.FlagSet) { - sink.AddFlagsToSet(flags, ignoreUndoBufferSize{}) - flags.StringP("substreams-endpoint", "e", "", "Substreams gRPC endpoint. If empty, will be replaced by the SUBSTREAMS_ENDPOINT_{network_name} environment variable, where `network_name` is determined from the substreams manifest. Some network names have default endpoints.") + sink.AddFlagsToSet(flags, sink.FlagIgnore("undo-buffer-size", "endpoint", "start-block", "stop-block")) + flags.String("substreams-endpoint", "", "Substreams gRPC endpoint. If empty, will be replaced by the SUBSTREAMS_ENDPOINT_{network_name} environment variable, where `network_name` is determined from the substreams manifest. Some network names have default endpoints.") flags.StringP("start-block", "s", "", "Start block to stream from. If empty, will be replaced by initialBlock of the first module you are streaming. If negative, will be resolved by the server relative to the chain head") flags.StringP("stop-block", "t", "0", "Stop block to end stream at, exclusively. If the start-block is positive, a '+' prefix can indicate 'relative to start-block'") @@ -103,16 +103,27 @@ func fromProtoE(cmd *cobra.Command, args []string) error { return err } } + + // Bridge endpoint with substreams/sink library by setting the endpoint flag + // The sink library expects endpoint via --endpoint flag, but this command uses --substreams-endpoint + if err := cmd.Flags().Set("endpoint", endpoint); err != nil { + return fmt.Errorf("setting endpoint flag: %w", err) + } startBlock := sflags.MustGetString(cmd, "start-block") - endBlock := sflags.MustGetString(cmd, "stop-block") - blockRange := "" if startBlock != "" { - blockRange = startBlock + // Bridge start-block with substreams/sink library + if err := cmd.Flags().Set("start-block", startBlock); err != nil { + return fmt.Errorf("setting start-block flag: %w", err) + } } - blockRange += ":" + + endBlock := sflags.MustGetString(cmd, "stop-block") if endBlock != "0" { - blockRange += endBlock + // Bridge stop-block with substreams/sink library + if err := cmd.Flags().Set("stop-block", endBlock); err != nil { + return fmt.Errorf("setting stop-block flag: %w", err) + } } dsn, err := db.ParseDSN(dsnString) @@ -121,7 +132,7 @@ func fromProtoE(cmd *cobra.Command, args []string) error { } //todo: handle params - spkg, module, _, _, err := sink.ReadManifestAndModuleAndBlockRange(manifestPath, "", nil, outputModuleName, "", false, "", zlog) + spkg, module, _, err := sink.ReadManifestAndModule(manifestPath, "", nil, outputModuleName, "", false, nil, zlog) if err != nil { return fmt.Errorf("reading manifest: %w", err) } @@ -212,10 +223,9 @@ func fromProtoE(cmd *cobra.Command, args []string) error { baseSink, err := sink.NewFromViper( cmd, outputType, - endpoint, manifestPath, outputModuleName, - blockRange, + fmt.Sprintf("substreams-sink-sql/%s", version), zlog, tracer, ) diff --git a/cmd/substreams-sink-sql/generate_csv.go b/cmd/substreams-sink-sql/generate_csv.go index 195c34ca..7c94138e 100644 --- a/cmd/substreams-sink-sql/generate_csv.go +++ b/cmd/substreams-sink-sql/generate_csv.go @@ -9,10 +9,9 @@ import ( "github.com/spf13/pflag" . "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" sinker2 "github.com/streamingfast/substreams-sink-sql/db_changes/sinker" - "github.com/streamingfast/substreams/manifest" ) // lastCursorFilename is the name of the file where the last cursor is stored, no extension as it's added by the store @@ -36,14 +35,13 @@ var generateCsvCmd = Command(generateCsvE, `), ExactArgs(3), Flags(func(flags *pflag.FlagSet) { - sink.AddFlagsToSet(flags, sink.FlagIgnore("final-blocks-only")) + sink.AddFlagsToSet(flags) AddCommonSinkerFlags(flags) AddCommonDatabaseChangesFlags(flags) flags.Uint64("bundle-size", 10000, "Size of output bundle, in blocks") flags.String("working-dir", "./workdir", "Path to local folder used as working directory") flags.String("output-dir", "./csv-output", "Path to local folder used as destination for CSV") - flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") flags.Uint64("buffer-max-size", 4*1024*1024, FlagDescription(` Amount of memory bytes to allocate to the buffered writer. If your data set is small enough that every is hold in memory, we are going to avoid the local I/O operation(s) and upload accumulated content in memory directly to final storage location. @@ -62,13 +60,31 @@ var generateCsvCmd = Command(generateCsvE, func generateCsvE(cmd *cobra.Command, args []string) error { app := NewApplication(cmd.Context()) - sink.RegisterMetrics() sinker2.RegisterMetrics() dsnString := args[0] manifestPath := args[1] blockRange := args[2] + // Parse block range and set flags to bridge with substreams/sink library + br, err := readBlockRangeArgument(blockRange) + if err != nil { + return fmt.Errorf("invalid block range %q: %w", blockRange, err) + } + + // Bridge start-block flag with substreams/sink library + if br.StartBlock() > 0 { + if err := cmd.Flags().Set("start-block", fmt.Sprintf("%d", br.StartBlock())); err != nil { + return fmt.Errorf("setting start-block flag: %w", err) + } + } + // Bridge stop-block flag with substreams/sink library + if br.EndBlock() != nil { + if err := cmd.Flags().Set("stop-block", fmt.Sprintf("%d", *br.EndBlock())); err != nil { + return fmt.Errorf("setting stop-block flag: %w", err) + } + } + outputDir := sflags.MustGetString(cmd, "output-dir") bundleSize := sflags.MustGetUint64(cmd, "bundle-size") bufferMaxSize := sflags.MustGetUint64(cmd, "buffer-max-size") @@ -76,37 +92,19 @@ func generateCsvE(cmd *cobra.Command, args []string) error { cursorTableName := sflags.MustGetString(cmd, "cursors-table") historyTableName := sflags.MustGetString(cmd, "history-table") - endpoint := sflags.MustGetString(cmd, "endpoint") - if endpoint == "" { - network := sflags.MustGetString(cmd, "network") - if network == "" { - reader, err := manifest.NewReader(manifestPath) - if err != nil { - return fmt.Errorf("setup manifest reader: %w", err) - } - pkgBundle, err := reader.Read() - if err != nil { - return fmt.Errorf("read manifest: %w", err) - } - network = pkgBundle.Package.Network - } - var err error - endpoint, err = manifest.ExtractNetworkEndpoint(network, sflags.MustGetString(cmd, "endpoint"), zlog) - if err != nil { - return err - } + // Bridge final-blocks-only flag with substreams/sink library (required for CSV generation) + if err := cmd.Flags().Set("final-blocks-only", "true"); err != nil { + return fmt.Errorf("setting final-blocks-only flag: %w", err) } sink, err := sink.NewFromViper( cmd, supportedOutputTypes, - endpoint, manifestPath, sink.InferOutputModuleFromPackage, - blockRange, + fmt.Sprintf("substreams-sink-sql/%s", version), zlog, tracer, - sink.WithFinalBlocksOnly(), ) if err != nil { return fmt.Errorf("new base sinker: %w", err) diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index db73c616..5562fc82 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -2,29 +2,23 @@ package main import ( "fmt" + "strings" "time" "github.com/spf13/cobra" "github.com/spf13/pflag" . "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" sinker2 "github.com/streamingfast/substreams-sink-sql/db_changes/sinker" - "github.com/streamingfast/substreams/manifest" ) -type ignoreUndoBufferSize struct{} - -func (i ignoreUndoBufferSize) IsIgnored(in string) bool { - return in == "undo-buffer-size" -} - var sinkRunCmd = Command(sinkRunE, "run [:]", "Runs SQL sink process", RangeArgs(2, 3), Flags(func(flags *pflag.FlagSet) { - sink.AddFlagsToSet(flags, ignoreUndoBufferSize{}) + sink.AddFlagsToSet(flags, sink.FlagIgnore("undo-buffer-size")) AddCommonSinkerFlags(flags) AddCommonDatabaseChangesFlags(flags) @@ -35,7 +29,6 @@ var sinkRunCmd = Command(sinkRunE, flags.Int("flush-interval", 0, "(deprecated) please use --batch-block-flush-interval instead") flags.Int("flush-retry-count", 3, "Number of retry attempts for flush operations") flags.Duration("flush-retry-delay", 1*time.Second, "Base delay for incremental retry backoff on flush failures") - flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") }), Example("substreams-sink-sql run 'postgres://localhost:5432/posgres?sslmode=disable' uniswap-v3@v0.2.10"), OnCommandErrorLogAndExit(zlog), @@ -44,44 +37,46 @@ var sinkRunCmd = Command(sinkRunE, func sinkRunE(cmd *cobra.Command, args []string) error { app := NewApplication(cmd.Context()) - sink.RegisterMetrics() sinker2.RegisterMetrics() dsnString := args[0] manifestPath := args[1] - blockRange := "" + + // Handle third argument - can be either block range or module name + // For backward compatibility, if it contains ':', treat it as block range if len(args) > 2 { - blockRange = args[2] - } - - endpoint := sflags.MustGetString(cmd, "endpoint") - if endpoint == "" { - network := sflags.MustGetString(cmd, "network") - if network == "" { - reader, err := manifest.NewReader(manifestPath) + thirdArg := args[2] + // Check if it looks like a block range (contains ':') + if strings.Contains(thirdArg, ":") { + // Parse and set block range flags to bridge with substreams/sink library + br, err := readBlockRangeArgument(thirdArg) if err != nil { - return fmt.Errorf("setup manifest reader: %w", err) + return fmt.Errorf("invalid block range %q: %w", thirdArg, err) } - pkgBundle, err := reader.Read() - if err != nil { - return fmt.Errorf("read manifest: %w", err) + + if br.StartBlock() > 0 { + if err := cmd.Flags().Set("start-block", fmt.Sprintf("%d", br.StartBlock())); err != nil { + return fmt.Errorf("setting start-block flag: %w", err) + } } - network = pkgBundle.Package.Network - } - var err error - endpoint, err = manifest.ExtractNetworkEndpoint(network, sflags.MustGetString(cmd, "endpoint"), zlog) - if err != nil { - return err + if br.EndBlock() != nil { + if err := cmd.Flags().Set("stop-block", fmt.Sprintf("%d", *br.EndBlock())); err != nil { + return fmt.Errorf("setting stop-block flag: %w", err) + } + } + } else { + // Treat as module name (new behavior, for forward compatibility) + // Module name is handled via sink.InferOutputModuleFromPackage by default + // or can be overridden, but we'll just pass empty string to let it infer } } sink, err := sink.NewFromViper( cmd, supportedOutputTypes, - endpoint, manifestPath, sink.InferOutputModuleFromPackage, - blockRange, + fmt.Sprintf("substreams-sink-sql/%s", version), zlog, tracer, ) diff --git a/cmd/substreams-sink-sql/tools.go b/cmd/substreams-sink-sql/tools.go index ef0dca1c..7631ea4c 100644 --- a/cmd/substreams-sink-sql/tools.go +++ b/cmd/substreams-sink-sql/tools.go @@ -11,7 +11,7 @@ import ( "github.com/streamingfast/cli" . "github.com/streamingfast/cli" "github.com/streamingfast/cli/sflags" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" ) diff --git a/db_changes/db/cursor.go b/db_changes/db/cursor.go index 94ac097b..2ef924e6 100644 --- a/db_changes/db/cursor.go +++ b/db_changes/db/cursor.go @@ -8,7 +8,7 @@ import ( "strings" "github.com/lithammer/dedent" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" ) diff --git a/db_changes/db/dialect.go b/db_changes/db/dialect.go index 5f495e69..8ea60cb8 100644 --- a/db_changes/db/dialect.go +++ b/db_changes/db/dialect.go @@ -5,7 +5,7 @@ import ( "database/sql" "fmt" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" ) type UnknownDriverError struct { diff --git a/db_changes/db/dialect_clickhouse.go b/db_changes/db/dialect_clickhouse.go index 0d182ef6..42701531 100644 --- a/db_changes/db/dialect_clickhouse.go +++ b/db_changes/db/dialect_clickhouse.go @@ -15,7 +15,7 @@ import ( clickhouse "github.com/AfterShip/clickhouse-sql-parser/parser" _ "github.com/ClickHouse/clickhouse-go/v2" "github.com/streamingfast/cli" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" "golang.org/x/exp/maps" ) diff --git a/db_changes/db/dialect_postgres.go b/db_changes/db/dialect_postgres.go index 6dd3c7cd..0131a6c4 100644 --- a/db_changes/db/dialect_postgres.go +++ b/db_changes/db/dialect_postgres.go @@ -14,7 +14,7 @@ import ( "time" "github.com/streamingfast/cli" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" "golang.org/x/exp/maps" ) diff --git a/db_changes/db/flush.go b/db_changes/db/flush.go index 87d77831..36e71484 100644 --- a/db_changes/db/flush.go +++ b/db_changes/db/flush.go @@ -7,7 +7,7 @@ import ( "github.com/ClickHouse/clickhouse-go/v2" "github.com/streamingfast/logging/zapx" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" ) diff --git a/db_changes/sinker/factory.go b/db_changes/sinker/factory.go index 12be8882..e8fd6b5c 100644 --- a/db_changes/sinker/factory.go +++ b/db_changes/sinker/factory.go @@ -7,7 +7,7 @@ import ( "time" "github.com/streamingfast/logging" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/db_changes/db" "go.uber.org/zap" ) diff --git a/db_changes/sinker/generate_csv_sinker.go b/db_changes/sinker/generate_csv_sinker.go index 84b130c1..3ef3f440 100644 --- a/db_changes/sinker/generate_csv_sinker.go +++ b/db_changes/sinker/generate_csv_sinker.go @@ -17,7 +17,7 @@ import ( "github.com/streamingfast/logging" "github.com/streamingfast/logging/zapx" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" bundler2 "github.com/streamingfast/substreams-sink-sql/db_changes/bundler" writer2 "github.com/streamingfast/substreams-sink-sql/db_changes/bundler/writer" @@ -61,8 +61,8 @@ func NewGenerateCSVSinker( logger *zap.Logger, tracer logging.Tracer, ) (*GenerateCSVSinker, error) { - blockRange := sink.BlockRange() - if blockRange == nil || blockRange.EndBlock() == nil { + stopBlock := sink.StopBlock() + if stopBlock == 0 { return nil, fmt.Errorf("sink must have a stop block defined") } @@ -99,7 +99,7 @@ func NewGenerateCSVSinker( bundlersByTable: make(map[string]*bundler2.Bundler), cursorsTableStore: cursorsStore, lastCursorFilename: lastCursorFilename, - stopBlock: *blockRange.EndBlock(), + stopBlock: stopBlock, loader: loader, logger: logger, @@ -116,7 +116,7 @@ func NewGenerateCSVSinker( tables := s.loader.GetAvailableTablesInSchema() for _, table := range tables { columns := s.loader.GetColumnsForTable(table) - fb, err := getBundler(table, s.Sinker.BlockRange().StartBlock(), s.stopBlock, bundleSize, bufferSize, csvOutputStore, workingDir, logger, columns) + fb, err := getBundler(table, uint64(s.Sinker.StartBlock()), s.stopBlock, bundleSize, bufferSize, csvOutputStore, workingDir, logger, columns) if err != nil { return nil, err } diff --git a/db_changes/sinker/sinker.go b/db_changes/sinker/sinker.go index d7a1b7fa..e2a6b8eb 100644 --- a/db_changes/sinker/sinker.go +++ b/db_changes/sinker/sinker.go @@ -10,7 +10,7 @@ import ( "github.com/streamingfast/logging" "github.com/streamingfast/logging/zapx" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" @@ -334,8 +334,9 @@ func protoUpdateOpToDbUpdateOp(op pbdatabase.Field_UpdateOp) db2.UpdateOp { func (s *SQLSinker) HandleBlockRangeCompletion(ctx context.Context, cursor *sink.Cursor) error { // To be moved in the base sinker library, happens usually only on integration tests where the connection // can close with "nil" error but we haven't completed the range for real yet. - if !s.Sinker.BlockRange().ReachedEndBlock(cursor.Block().Num()) { - s.logger.Debug("range not completed yet, skipping", zap.Stringer("block", cursor.Block()), zap.Stringer("range", s.Sinker.BlockRange())) + stopBlock := s.Sinker.StopBlock() + if stopBlock > 0 && cursor.Block().Num() < stopBlock { + s.logger.Debug("range not completed yet, skipping", zap.Stringer("block", cursor.Block()), zap.Uint64("stop_block", stopBlock)) return nil } diff --git a/db_changes/sinker/sinker_test.go b/db_changes/sinker/sinker_test.go index 949c497c..c2ec62b6 100644 --- a/db_changes/sinker/sinker_test.go +++ b/db_changes/sinker/sinker_test.go @@ -9,10 +9,11 @@ import ( _ "github.com/lib/pq" "github.com/streamingfast/bstream" "github.com/streamingfast/logging" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams/client" + "github.com/streamingfast/substreams/manifest" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1" "github.com/stretchr/testify/assert" @@ -207,7 +208,16 @@ func TestSinker_SQLStatements(t *testing.T) { logger, tracer, ) - s, err := sink.New(sink.SubstreamsModeDevelopment, false, testPackage, testPackage.Modules.Modules[0], []byte("unused"), testClientConfig, logger, nil) + s, err := sink.New(&sink.SinkerConfig{ + Pkg: testPackage, + OutputModule: testPackage.Modules.Modules[0], + OutputModuleHash: manifest.ModuleHash([]byte("unused")), + ClientConfig: testClientConfig, + Mode: sink.SubstreamsModeDevelopment, + NoopMode: false, + Logger: logger, + Tracer: nil, + }) require.NoError(t, err) sinker, _ := New(s, l, logger, nil, 3, 1*time.Second) diff --git a/db_changes/sinker/stats.go b/db_changes/sinker/stats.go index a82a166c..efc39ca7 100644 --- a/db_changes/sinker/stats.go +++ b/db_changes/sinker/stats.go @@ -6,7 +6,7 @@ import ( "github.com/streamingfast/bstream" "github.com/streamingfast/dmetrics" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "go.uber.org/zap" ) diff --git a/db_changes/state/file.go b/db_changes/state/file.go index d3b81b18..9c0dc538 100644 --- a/db_changes/state/file.go +++ b/db_changes/state/file.go @@ -12,7 +12,7 @@ import ( "github.com/streamingfast/dhammer" "github.com/streamingfast/dstore" "github.com/streamingfast/shutter" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/db_changes/bundler/writer" "go.uber.org/zap" "gopkg.in/yaml.v3" diff --git a/db_changes/state/interface.go b/db_changes/state/interface.go index 4f6192b9..00ed6018 100644 --- a/db_changes/state/interface.go +++ b/db_changes/state/interface.go @@ -4,7 +4,7 @@ import ( "context" "github.com/streamingfast/bstream" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/db_changes/bundler/writer" ) diff --git a/db_proto/sinker.go b/db_proto/sinker.go index a8b00542..195adc78 100644 --- a/db_proto/sinker.go +++ b/db_proto/sinker.go @@ -7,7 +7,7 @@ import ( "time" "github.com/streamingfast/logging/zapx" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" sql "github.com/streamingfast/substreams-sink-sql/db_proto/sql" "github.com/streamingfast/substreams-sink-sql/db_proto/stats" pbsubstreamsrpc "github.com/streamingfast/substreams/pb/sf/substreams/rpc/v2" diff --git a/db_proto/sinker_factory.go b/db_proto/sinker_factory.go index ba210ae9..25f6f284 100644 --- a/db_proto/sinker_factory.go +++ b/db_proto/sinker_factory.go @@ -6,7 +6,7 @@ import ( "time" "github.com/streamingfast/logging" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/bytes" "github.com/streamingfast/substreams-sink-sql/db_changes/db" protosql "github.com/streamingfast/substreams-sink-sql/db_proto/sql" diff --git a/db_proto/sql/click_house/database.go b/db_proto/sql/click_house/database.go index 9c4ef025..fb7f56ad 100644 --- a/db_proto/sql/click_house/database.go +++ b/db_proto/sql/click_house/database.go @@ -13,7 +13,7 @@ import ( "github.com/ClickHouse/ch-go" "github.com/streamingfast/logging" "github.com/streamingfast/logging/zapx" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/bytes" "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams-sink-sql/db_proto/sql" diff --git a/db_proto/sql/database.go b/db_proto/sql/database.go index cb0eb523..29b14c45 100644 --- a/db_proto/sql/database.go +++ b/db_proto/sql/database.go @@ -6,7 +6,7 @@ import ( "strings" "time" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbSchema "github.com/streamingfast/substreams-sink-sql/pb/sf/substreams/sink/sql/schema/v1" "github.com/streamingfast/substreams-sink-sql/proto" "go.uber.org/zap" diff --git a/db_proto/sql/postgres/database.go b/db_proto/sql/postgres/database.go index 2ccc96bf..087a93d0 100644 --- a/db_proto/sql/postgres/database.go +++ b/db_proto/sql/postgres/database.go @@ -8,7 +8,7 @@ import ( "time" "github.com/streamingfast/logging/zapx" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/bytes" "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams-sink-sql/db_proto/sql" diff --git a/go.mod b/go.mod index f61a5df8..b0e66760 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,6 @@ require ( github.com/spf13/viper v1.15.0 github.com/streamingfast/logging v0.0.0-20251127143054-23a35e5bd633 github.com/streamingfast/substreams v1.16.7-0.20250925152521-9d7a8ef0f261 - github.com/streamingfast/substreams-sink v0.5.3-0.20250818134825-6b25ffb8232c github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20260110015235-04b544bbecb9 github.com/stretchr/testify v1.11.1 github.com/testcontainers/testcontainers-go v0.38.0 @@ -51,7 +50,6 @@ require ( github.com/bahlo/generic-list-go v0.2.0 // indirect github.com/bits-and-blooms/bitset v1.12.0 // indirect github.com/bobg/go-generics/v3 v3.5.0 // indirect - github.com/bobg/go-generics/v4 v4.1.2 // indirect github.com/bufbuild/protocompile v0.14.1 // indirect github.com/buger/jsonparser v1.1.1 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect @@ -70,6 +68,7 @@ require ( github.com/docker/docker v28.4.0+incompatible // indirect github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect + github.com/dustin/go-humanize v1.0.1 // indirect github.com/ebitengine/purego v0.8.4 // indirect github.com/envoyproxy/go-control-plane/envoy v1.32.4 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect diff --git a/go.sum b/go.sum index 72058970..b93b80bb 100644 --- a/go.sum +++ b/go.sum @@ -136,8 +136,6 @@ github.com/bobg/go-generics/v2 v2.2.2 h1:cHTV51Vr/wSlwiNWvncz66E4QtoRw9qXZeEiLAm github.com/bobg/go-generics/v2 v2.2.2/go.mod h1:ieOJ1ARFvk+HfMKbW1DT5UzJ/CJPKoiRm17QKK82bRE= github.com/bobg/go-generics/v3 v3.5.0 h1:OdBXzCRCO4e3Z7FQz1maEN2Q5LFYHc7vIK8EXcS4xQQ= github.com/bobg/go-generics/v3 v3.5.0/go.mod h1:wGlMLQER92clsh3cJoQjbUtUEJ03FoxnGhZjaWhf4fM= -github.com/bobg/go-generics/v4 v4.1.2 h1:iF62T5EypncG3kTYFTWzBgfZlwCgm84tUboY9TKZT+4= -github.com/bobg/go-generics/v4 v4.1.2/go.mod h1:KVwpxEYErjvcqjJSJqVNZd/JEq3SsQzb9t01+82pZGw= github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= @@ -212,6 +210,8 @@ github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4 github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/drone/envsubst v1.0.3 h1:PCIBwNDYjs50AsLZPYdfhSATKaRg/FJmDc2D6+C2x8g= github.com/drone/envsubst v1.0.3/go.mod h1:N2jZmlMufstn1KEqvbHjw40h1KyTmnVzHcSc9bFiJ2g= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= github.com/ebitengine/purego v0.8.4 h1:CF7LEKg5FFOsASUj0+QwaXf8Ht6TlFxg09+S9wz0omw= github.com/ebitengine/purego v0.8.4/go.mod h1:iIjxzd6CiRiOG0UyXP+V1+jWqUXVjPKLAI0mRfJZTmQ= github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= @@ -624,8 +624,6 @@ github.com/streamingfast/shutter v1.5.0 h1:NpzDYzj0HVpSiDJVO/FFSL6QIK/YKOxY0gJAt github.com/streamingfast/shutter v1.5.0/go.mod h1:B/T6efqdeMGbGwjzPS1ToXzYZI4kDzI5/u4I+7qbjY8= github.com/streamingfast/substreams v1.16.7-0.20250925152521-9d7a8ef0f261 h1:5vkJECHOocdRe88kDjBnSDFPj58ieC25EZzAmcEOy2Q= github.com/streamingfast/substreams v1.16.7-0.20250925152521-9d7a8ef0f261/go.mod h1:3s/qTXV85jh40Li1gbYTRtbAOyFzpaHN0Bi1OtuXIDA= -github.com/streamingfast/substreams-sink v0.5.3-0.20250818134825-6b25ffb8232c h1:LCJhSgR7XTiKxCtIBPyoFLQdYRCbFaW8EljIL8IvaWs= -github.com/streamingfast/substreams-sink v0.5.3-0.20250818134825-6b25ffb8232c/go.mod h1:EbwZgN7FRZY6oNBmA7ufcaHZ215nDo3ejyyasuS3xj4= github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20260110015235-04b544bbecb9 h1:B+YC18umjoESDnLKZScl0GZtPQoE0/U2Mz8shnfTISc= github.com/streamingfast/substreams-sink-database-changes v1.3.2-0.20260110015235-04b544bbecb9/go.mod h1:f51ljuUsQEYuyuDdo5BB/4AfB87QDAegy5e8p5qBxis= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/tests/integration/db_changes_clickhouse_test.go b/tests/integration/db_changes_clickhouse_test.go index 71c41a1d..090d6be5 100644 --- a/tests/integration/db_changes_clickhouse_test.go +++ b/tests/integration/db_changes_clickhouse_test.go @@ -9,7 +9,7 @@ import ( _ "github.com/ClickHouse/clickhouse-go/v2" "github.com/jmoiron/sqlx" "github.com/streamingfast/bstream" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams-sink-sql/db_changes/sinker" diff --git a/tests/integration/db_changes_postgres_test.go b/tests/integration/db_changes_postgres_test.go index 6274c7ae..aa648e67 100644 --- a/tests/integration/db_changes_postgres_test.go +++ b/tests/integration/db_changes_postgres_test.go @@ -13,7 +13,7 @@ import ( _ "github.com/lib/pq" "github.com/streamingfast/bstream" "github.com/streamingfast/cli" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" pbdatabase "github.com/streamingfast/substreams-sink-database-changes/pb/sf/substreams/sink/database/v1" db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db" "github.com/streamingfast/substreams-sink-sql/db_changes/sinker" diff --git a/tests/integration/db_proto_clickhouse_test.go b/tests/integration/db_proto_clickhouse_test.go index f2fb1e4c..1d14315c 100644 --- a/tests/integration/db_proto_clickhouse_test.go +++ b/tests/integration/db_proto_clickhouse_test.go @@ -12,7 +12,7 @@ import ( _ "github.com/ClickHouse/clickhouse-go/v2" "github.com/jmoiron/sqlx" "github.com/streamingfast/bstream" - sink "github.com/streamingfast/substreams-sink" + sink "github.com/streamingfast/substreams/sink" "github.com/streamingfast/substreams-sink-sql/db_proto" pbrelations "github.com/streamingfast/substreams-sink-sql/pb/test/relations" "github.com/streamingfast/substreams/manifest"