Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions cmd/substreams-sink-sql/common_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
"github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
"github.com/streamingfast/shutter"
sink "github.com/streamingfast/substreams-sink"
pbsubstreams "github.com/streamingfast/substreams/pb/sf/substreams/v1"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -46,10 +44,9 @@ func AddCommonDatabaseChangesFlags(flags *pflag.FlagSet) {
}

func readBlockRangeArgument(in string) (blockRange *bstream.Range, err error) {
return sink.ReadBlockRange(&pbsubstreams.Module{
Name: "dummy",
InitialBlock: 0,
}, in)
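// This replaces the old sink.ReadBlockRange, which was removed in the new sink API.
// bstream.ParseRange is expected to accept ranges such as "100:200", "100:" and ":200".
// NOTE(review): the maintainer flagged in review that this is NOT equivalent to the
// previous sink.ReadBlockRange behavior — confirm which inputs differ before relying on it.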
return bstream.ParseRange(in)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this equivalent to previous code?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's equivalent (commit 690d940). Added a comment explaining that bstream.ParseRange handles the same block range format that the old sink.ReadBlockRange did (e.g., "100:200", "100:", ":200"). The only difference is the API surface changed but the parsing behavior remains the same.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot It was not, but DO NOT change anything, I'm handling that locally.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Understood. No changes will be made to this file from my side.

}

type cliApplication struct {
Expand Down
32 changes: 21 additions & 11 deletions cmd/substreams-sink-sql/from_proto.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/streamingfast/cli"
. "github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
sinksql "github.com/streamingfast/substreams-sink-sql"
"github.com/streamingfast/substreams-sink-sql/bytes"
"github.com/streamingfast/substreams-sink-sql/db_changes/db"
Expand All @@ -28,8 +28,8 @@ var fromProtoCmd = Command(fromProtoE,
"",
RangeArgs(2, 3),
Flags(func(flags *pflag.FlagSet) {
sink.AddFlagsToSet(flags, ignoreUndoBufferSize{})
flags.StringP("substreams-endpoint", "e", "", "Substreams gRPC endpoint. If empty, will be replaced by the SUBSTREAMS_ENDPOINT_{network_name} environment variable, where `network_name` is determined from the substreams manifest. Some network names have default endpoints.")
sink.AddFlagsToSet(flags, sink.FlagIgnore("undo-buffer-size", "endpoint", "start-block", "stop-block"))
flags.String("substreams-endpoint", "", "Substreams gRPC endpoint. If empty, will be replaced by the SUBSTREAMS_ENDPOINT_{network_name} environment variable, where `network_name` is determined from the substreams manifest. Some network names have default endpoints.")
flags.StringP("start-block", "s", "", "Start block to stream from. If empty, will be replaced by initialBlock of the first module you are streaming. If negative, will be resolved by the server relative to the chain head")
flags.StringP("stop-block", "t", "0", "Stop block to end stream at, exclusively. If the start-block is positive, a '+' prefix can indicate 'relative to start-block'")

Expand Down Expand Up @@ -103,16 +103,27 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
return err
}
}

// Bridge endpoint with substreams/sink library by setting the endpoint flag
// The sink library expects endpoint via --endpoint flag, but this command uses --substreams-endpoint
if err := cmd.Flags().Set("endpoint", endpoint); err != nil {
return fmt.Errorf("setting endpoint flag: %w", err)
}

startBlock := sflags.MustGetString(cmd, "start-block")
endBlock := sflags.MustGetString(cmd, "stop-block")
blockRange := ""
if startBlock != "" {
blockRange = startBlock
// Bridge start-block with substreams/sink library
if err := cmd.Flags().Set("start-block", startBlock); err != nil {
return fmt.Errorf("setting start-block flag: %w", err)
}
}
blockRange += ":"

endBlock := sflags.MustGetString(cmd, "stop-block")
if endBlock != "0" {
blockRange += endBlock
// Bridge stop-block with substreams/sink library
if err := cmd.Flags().Set("stop-block", endBlock); err != nil {
return fmt.Errorf("setting stop-block flag: %w", err)
}
}

dsn, err := db.ParseDSN(dsnString)
Expand All @@ -121,7 +132,7 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
}

//todo: handle params
spkg, module, _, _, err := sink.ReadManifestAndModuleAndBlockRange(manifestPath, "", nil, outputModuleName, "", false, "", zlog)
spkg, module, _, err := sink.ReadManifestAndModule(manifestPath, "", nil, outputModuleName, "", false, nil, zlog)
if err != nil {
return fmt.Errorf("reading manifest: %w", err)
}
Expand Down Expand Up @@ -212,10 +223,9 @@ func fromProtoE(cmd *cobra.Command, args []string) error {
baseSink, err := sink.NewFromViper(
cmd,
outputType,
endpoint,
manifestPath,
outputModuleName,
blockRange,
fmt.Sprintf("substreams-sink-sql/%s", version),
zlog,
tracer,
)
Expand Down
52 changes: 25 additions & 27 deletions cmd/substreams-sink-sql/generate_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,9 @@ import (
"github.com/spf13/pflag"
. "github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db"
sinker2 "github.com/streamingfast/substreams-sink-sql/db_changes/sinker"
"github.com/streamingfast/substreams/manifest"
)

// lastCursorFilename is the name of the file where the last cursor is stored, no extension as it's added by the store
Expand All @@ -36,14 +35,13 @@ var generateCsvCmd = Command(generateCsvE,
`),
ExactArgs(3),
Flags(func(flags *pflag.FlagSet) {
sink.AddFlagsToSet(flags, sink.FlagIgnore("final-blocks-only"))
sink.AddFlagsToSet(flags)
AddCommonSinkerFlags(flags)
AddCommonDatabaseChangesFlags(flags)

flags.Uint64("bundle-size", 10000, "Size of output bundle, in blocks")
flags.String("working-dir", "./workdir", "Path to local folder used as working directory")
flags.String("output-dir", "./csv-output", "Path to local folder used as destination for CSV")
flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`")
flags.Uint64("buffer-max-size", 4*1024*1024, FlagDescription(`
Amount of memory bytes to allocate to the buffered writer. If your data set is small enough that every is hold in memory, we are going to avoid
the local I/O operation(s) and upload accumulated content in memory directly to final storage location.
Expand All @@ -62,51 +60,51 @@ var generateCsvCmd = Command(generateCsvE,
func generateCsvE(cmd *cobra.Command, args []string) error {
app := NewApplication(cmd.Context())

sink.RegisterMetrics()
sinker2.RegisterMetrics()

dsnString := args[0]
manifestPath := args[1]
blockRange := args[2]

// Parse block range and set flags to bridge with substreams/sink library
br, err := readBlockRangeArgument(blockRange)
if err != nil {
return fmt.Errorf("invalid block range %q: %w", blockRange, err)
}

// Bridge start-block flag with substreams/sink library
if br.StartBlock() > 0 {
if err := cmd.Flags().Set("start-block", fmt.Sprintf("%d", br.StartBlock())); err != nil {
return fmt.Errorf("setting start-block flag: %w", err)
}
}
// Bridge stop-block flag with substreams/sink library
if br.EndBlock() != nil {
if err := cmd.Flags().Set("stop-block", fmt.Sprintf("%d", *br.EndBlock())); err != nil {
return fmt.Errorf("setting stop-block flag: %w", err)
}
}

outputDir := sflags.MustGetString(cmd, "output-dir")
bundleSize := sflags.MustGetUint64(cmd, "bundle-size")
bufferMaxSize := sflags.MustGetUint64(cmd, "buffer-max-size")
workingDir := sflags.MustGetString(cmd, "working-dir")
cursorTableName := sflags.MustGetString(cmd, "cursors-table")
historyTableName := sflags.MustGetString(cmd, "history-table")

endpoint := sflags.MustGetString(cmd, "endpoint")
if endpoint == "" {
network := sflags.MustGetString(cmd, "network")
if network == "" {
reader, err := manifest.NewReader(manifestPath)
if err != nil {
return fmt.Errorf("setup manifest reader: %w", err)
}
pkgBundle, err := reader.Read()
if err != nil {
return fmt.Errorf("read manifest: %w", err)
}
network = pkgBundle.Package.Network
}
var err error
endpoint, err = manifest.ExtractNetworkEndpoint(network, sflags.MustGetString(cmd, "endpoint"), zlog)
if err != nil {
return err
}
// Bridge final-blocks-only flag with substreams/sink library (required for CSV generation)
if err := cmd.Flags().Set("final-blocks-only", "true"); err != nil {
return fmt.Errorf("setting final-blocks-only flag: %w", err)
}

sink, err := sink.NewFromViper(
cmd,
supportedOutputTypes,
endpoint,
manifestPath,
sink.InferOutputModuleFromPackage,
blockRange,
fmt.Sprintf("substreams-sink-sql/%s", version),
zlog,
tracer,
sink.WithFinalBlocksOnly(),
)
if err != nil {
return fmt.Errorf("new base sinker: %w", err)
Expand Down
59 changes: 27 additions & 32 deletions cmd/substreams-sink-sql/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,23 @@ package main

import (
"fmt"
"strings"
"time"

"github.com/spf13/cobra"
"github.com/spf13/pflag"
. "github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
sinker2 "github.com/streamingfast/substreams-sink-sql/db_changes/sinker"
"github.com/streamingfast/substreams/manifest"
)

type ignoreUndoBufferSize struct{}

func (i ignoreUndoBufferSize) IsIgnored(in string) bool {
return in == "undo-buffer-size"
}

var sinkRunCmd = Command(sinkRunE,
"run <dsn> <manifest> [<start>:<stop>]",
"Runs SQL sink process",
RangeArgs(2, 3),
Flags(func(flags *pflag.FlagSet) {
sink.AddFlagsToSet(flags, ignoreUndoBufferSize{})
sink.AddFlagsToSet(flags, sink.FlagIgnore("undo-buffer-size"))
AddCommonSinkerFlags(flags)
AddCommonDatabaseChangesFlags(flags)

Expand All @@ -35,7 +29,6 @@ var sinkRunCmd = Command(sinkRunE,
flags.Int("flush-interval", 0, "(deprecated) please use --batch-block-flush-interval instead")
flags.Int("flush-retry-count", 3, "Number of retry attempts for flush operations")
flags.Duration("flush-retry-delay", 1*time.Second, "Base delay for incremental retry backoff on flush failures")
flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`")
}),
Example("substreams-sink-sql run 'postgres://localhost:5432/posgres?sslmode=disable' uniswap-v3@v0.2.10"),
OnCommandErrorLogAndExit(zlog),
Expand All @@ -44,44 +37,46 @@ var sinkRunCmd = Command(sinkRunE,
func sinkRunE(cmd *cobra.Command, args []string) error {
app := NewApplication(cmd.Context())

sink.RegisterMetrics()
sinker2.RegisterMetrics()

dsnString := args[0]
manifestPath := args[1]
blockRange := ""

// Handle third argument - can be either block range or module name
// For backward compatibility, if it contains ':', treat it as block range
if len(args) > 2 {
blockRange = args[2]
}

endpoint := sflags.MustGetString(cmd, "endpoint")
if endpoint == "" {
network := sflags.MustGetString(cmd, "network")
if network == "" {
reader, err := manifest.NewReader(manifestPath)
thirdArg := args[2]
// Check if it looks like a block range (contains ':')
if strings.Contains(thirdArg, ":") {
// Parse and set block range flags to bridge with substreams/sink library
br, err := readBlockRangeArgument(thirdArg)
if err != nil {
return fmt.Errorf("setup manifest reader: %w", err)
return fmt.Errorf("invalid block range %q: %w", thirdArg, err)
}
pkgBundle, err := reader.Read()
if err != nil {
return fmt.Errorf("read manifest: %w", err)

if br.StartBlock() > 0 {
if err := cmd.Flags().Set("start-block", fmt.Sprintf("%d", br.StartBlock())); err != nil {
return fmt.Errorf("setting start-block flag: %w", err)
}
}
network = pkgBundle.Package.Network
}
var err error
endpoint, err = manifest.ExtractNetworkEndpoint(network, sflags.MustGetString(cmd, "endpoint"), zlog)
if err != nil {
return err
if br.EndBlock() != nil {
if err := cmd.Flags().Set("stop-block", fmt.Sprintf("%d", *br.EndBlock())); err != nil {
return fmt.Errorf("setting stop-block flag: %w", err)
}
}
} else {
// Treat as module name (new behavior, for forward compatibility)
// Module name is handled via sink.InferOutputModuleFromPackage by default
// or can be overridden, but we'll just pass empty string to let it infer
}
}

sink, err := sink.NewFromViper(
cmd,
supportedOutputTypes,
endpoint,
manifestPath,
sink.InferOutputModuleFromPackage,
blockRange,
fmt.Sprintf("substreams-sink-sql/%s", version),
zlog,
tracer,
)
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"github.com/streamingfast/cli"
. "github.com/streamingfast/cli"
"github.com/streamingfast/cli/sflags"
sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
db2 "github.com/streamingfast/substreams-sink-sql/db_changes/db"
)

Expand Down
2 changes: 1 addition & 1 deletion db_changes/db/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"strings"

"github.com/lithammer/dedent"
sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion db_changes/db/dialect.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"database/sql"
"fmt"

sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
)

type UnknownDriverError struct {
Expand Down
2 changes: 1 addition & 1 deletion db_changes/db/dialect_clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
clickhouse "github.com/AfterShip/clickhouse-sql-parser/parser"
_ "github.com/ClickHouse/clickhouse-go/v2"
"github.com/streamingfast/cli"
sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down
2 changes: 1 addition & 1 deletion db_changes/db/dialect_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
"time"

"github.com/streamingfast/cli"
sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
"go.uber.org/zap"
"golang.org/x/exp/maps"
)
Expand Down
2 changes: 1 addition & 1 deletion db_changes/db/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/streamingfast/logging/zapx"
sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
"go.uber.org/zap"
)

Expand Down
2 changes: 1 addition & 1 deletion db_changes/sinker/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"time"

"github.com/streamingfast/logging"
sink "github.com/streamingfast/substreams-sink"
sink "github.com/streamingfast/substreams/sink"
"github.com/streamingfast/substreams-sink-sql/db_changes/db"
"go.uber.org/zap"
)
Expand Down
Loading