importer: checkpoint SST metadata in distributed merge for retry correctness#164536
trunk-io[bot] merged 7 commits into cockroachdb:master
Conversation
😎 Merged successfully.

Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks. 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.
mw5h
left a comment
@mw5h partially reviewed 28 files and all commit messages, and made 7 comments.
Reviewable status: complete! 0 of 0 LGTMs obtained.
pkg/sql/importer/import_processor.go line 190 at r6 (raw file):
// All SST manifests are now sent via the OnFlush callback synchronized
// with flush events, eliminating the need for final manifest emission.
No need to reference what was here before. This sort of information belongs in the commit message.
pkg/sql/importer/import_processor.go line 225 at r6 (raw file):
}
// SST metadata is now emitted via the progress channel (see lines 188-199)
No need to reference what was here before. This sort of information belongs in the commit message.
pkg/sql/importer/import_processor.go line 530 at r6 (raw file):
}

type importProgressTracker struct {
There should be concise comments describing the purpose of each entry in the struct and any invariants we need to maintain.
pkg/sql/importer/import_processor.go line 543 at r6 (raw file):
	unflushedRows     []int64
	unflushedFraction []float32
}
There should probably be a unit test of the importProgressTracker.
pkg/sql/importer/import_processor_planning.go line 208 at r6 (raw file):
// manifestBuf accumulates SST file metadata as processors complete, following
// the index backfiller pattern. It maintains a complete cumulative list of all
// manifests rather than deltas, eliminating timing windows for orphaned SSTs.
We also talk about the timing window and orphaned SSTs below. Both comments are not necessary; this one seems extraneous.
pkg/sql/importer/import_processor_planning.go line 259 at r6 (raw file):
}

// Initialize manifest buffer with checkpointed manifests from prior runs,
The comment above says the same thing.
pkg/sql/importer/import_processor_test.go line 799 at r6 (raw file):
require.Equal(t, int64(csv1.numRows)-resumePos, importSummary.Rows)
// DEBUG: Show what rows are actually in the table
Remove this debugging code.
Force-pushed from a4b2dcf to bd73287.
mw5h
left a comment
@mw5h partially reviewed 28 files and all commit messages.
Reviewable status: complete! 0 of 0 LGTMs obtained.
spilchen
left a comment
nice to see code reuse between import and index backfill
@spilchen partially reviewed 28 files and all commit messages, and made 8 comments.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on andrew-r-thomas, DrewKimball, jeffswenson, msbutler, and mw5h).
pkg/sql/importer/import_processor_planning.go line 273 at r12 (raw file):
The buffer deduplicates and
I couldn't see where the buffer deduplicates manifests. It seems like it's a straight up append.
pkg/sql/bulksst/sink.go line 1 at r12 (raw file):
// Copyright 2025 The Cockroach Authors.
super nit: 2026
pkg/sql/execinfrapb/data.proto line 337 at r12 (raw file):
// during distributed merge. This metadata is accumulated and persisted in
// job progress for retry safety.
repeated jobs.jobspb.BulkSSTManifest sst_metadata = 11 [(gogoproto.customname) = "SSTMetadata", (gogoproto.nullable) = false];
I think the pattern is to use the progress_details field instead of adding this. It's an Any field, so you put whatever struct in it. That's how the index backfiller flows progress back to the coord.
pkg/sql/importer/import_processor.go line 427 at r12 (raw file):
if err := pkSink.Flush(ctx); err != nil {
	if errors.HasType(err, (*kvserverbase.DuplicateKeyError)(nil)) {
		return errors.Wrapf(err, "duplicate key in primary index")
nit: will it always be a duplicate in the primary? For the distributed merge path, there is only one sink, which I guess handles both PK and secondary indexes? If so, the error message should be a bit more generic.
pkg/sql/importer/import_processor.go line 635 at r12 (raw file):
// When the PK adder flushes, everything written has been flushed, so we set
// pkFlushedRow to writtenRow. Additionally if the indexAdder is empty then we
the old code had an optimization where the PK adder's SetOnFlush callback would also advance the index adder's flushed row position if the index adder's buffer was empty. I think this allowed the resume position to advance with PK flushes when the index adder was idle. Without this, the resume position could stay back, which could mean restarting from an early spot on retry for tables with large PKs and small secondary indexes. Is this a problem?
pkg/sql/importer/import_processor_planning.go line 447 at r12 (raw file):
var distributedMergeFilePrefix string
if details.UseDistributedMerge {
	distributedMergeFilePrefix = bulkutil.NewDistMergePaths(job.ID()).MapPath()
nit: doesn't the file prefix stay the same for each loop iteration? We could compute this once outside the loop.
pkg/sql/importer/import_processor_test.go line 1012 at r12 (raw file):
// fall back to the original server factory (if provided).
if es.Provider != cloudpb.ExternalStorageProvider_http {
	if len(originalFactory) != 1 && originalFactory[0] != nil {
was this supposed to be `len(originalFactory) == 1`? Otherwise, don't we risk an 'index out of range' error here?
// up job-scoped files on completion or cancellation.
repeated string sst_storage_prefixes = 8 [(gogoproto.customname) = "SSTStoragePrefixes"];

// CompletedMapWork stores SST file metadata from processors that have
I would strongly suggest storing this field in a transparently sharded job info key, via jobs.WriteChunkedFileToJobInfo api. It seems like this repeated proto field could get arbitrarily large. Big keys in the job system can cause all sorts of availability problems.
At some point, import should probably migrate other fields in this progress proto to info keys, but for now, this seems like a good opportunity to dip import's toes into the info key framework, as you don't need to worry about backwards compatibility here. cc @dt
Good suggestion. For this PR the manifests are stored in the existing CompletedMapWork repeated field in import progress, matching the index backfiller pattern (which also uses a repeated proto field in SchemaChangeDetails). The list is bounded by the number of SST files produced in a single import attempt.
Filed #164675 to track migrating both import and index backfill to WriteChunkedFileToJobInfo.
@msbutler let me know if this does or does not meet with your approval.
i think the issue captures the problem, thanks! i imagine that if you did this work for 26.2, distributed merge wouldn't require a migration, as customers can't run it in 26.1?
@spilchen re: the … — this is now covered. I added … The …

This is RFAL.
mw5h
left a comment
@mw5h partially reviewed 28 files and all commit messages, made 4 comments, and resolved 2 discussions.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on andrew-r-thomas, DrewKimball, dt, jeffswenson, and spilchen).
pkg/sql/execinfrapb/data.proto line 337 at r12 (raw file):
Previously, spilchen wrote…
I think the pattern is to use the `progress_details` field instead of adding this. It's an `Any` field, so you put whatever struct in it. That's how the index backfiller flows progress back to the coord.
Done.
pkg/sql/importer/import_processor.go line 635 at r12 (raw file):
Previously, spilchen wrote…
the old code had an optimization where the PK adder's `SetOnFlush` callback would also advance the index adder's flushed row position if the index adder's buffer was empty. I think this allowed the resume position to advance with PK flushes when the index adder was idle. Without this, the resume position could stay back, which could mean restarting from an early spot on retry for tables with large PKs and small secondary indexes. Is this a problem?
Done.
pkg/sql/importer/import_processor_planning.go line 273 at r12 (raw file):
Previously, spilchen wrote…
The buffer deduplicates and
I couldn't see where the buffer deduplicates manifests. It seems like it's a straight up append.
Done.
pkg/sql/importer/import_processor_test.go line 1012 at r12 (raw file):
Previously, spilchen wrote…
was this supposed to be `len(originalFactory) == 1`? Otherwise, don't we risk an 'index out of range' error here?
Done.
spilchen
left a comment
@spilchen made 2 comments.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on andrew-r-thomas, DrewKimball, dt, jeffswenson, and mw5h).
pkg/sql/importer/import_processor.go line 502 at r23 (raw file):
if err := sink.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
	if errors.HasType(err, (*kvserverbase.DuplicateKeyError)(nil)) {
		return errors.Wrapf(err, "duplicate key in %s", errTarget)
nit: will this be correct for the distributed merge path? I think it'll say "duplicate key in primary index" since there is only one sink. But could the duplicate be in the secondary?
mw5h
left a comment
@mw5h partially reviewed 28 files and all commit messages, made 1 comment, and resolved 1 discussion.
Reviewable status: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on andrew-r-thomas, DrewKimball, dt, jeffswenson, and spilchen).
pkg/sql/bulksst/sink.go line 1 at r12 (raw file):
Previously, spilchen wrote…
super nit: 2026
Done.
This reverts commit d5ca4f7.

Informs: cockroachdb#161490
IndexBackfillSSTManifest was originally designed for index backfill operations but is now also used by IMPORT's distributed merge pipeline. The name is misleading since it suggests the type is specific to index backfill when it's actually a general-purpose SST manifest for bulk operations.

This commit renames the type to BulkSSTManifest to better reflect its usage across both index backfill and import operations. The rename touches protobuf definitions and all references in backfill and import code.

Release note: none
Informs: cockroachdb#161490
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit extracts the indexBackfillSink interface from rowexec into a shared BulkSink interface in pkg/sql/bulksst. This refactoring prepares for code reuse between index backfill and IMPORT for distributed merge functionality.

Before this change, the indexBackfillSink interface and bulkAdderIndexBackfillSink implementation were local to rowexec, preventing reuse in the importer package. Since both index backfill and IMPORT use the same bulk write patterns (BulkAdder ingestion vs SST file creation with manifest tracking), extracting a common interface eliminates the need for duplicate implementations.

The new BulkSink interface in pkg/sql/bulksst defines the contract for bulk write destinations:
- Add() enqueues KV pairs
- Flush() persists buffered data
- SetOnFlush() installs progress callbacks
- ConsumeFlushManifests() retrieves SST metadata (for distributed merge)
- Close() releases resources

BulkAdderSink wraps kvserverbase.BulkAdder to provide the legacy ingestion path where SSTs are sent directly to KV via AddSSTable requests. sstIndexBackfillSink (in rowexec) provides the distributed merge path where SSTs are staged in external storage. This design allows both packages to use the same abstraction without circular dependencies, since both already import pkg/sql/bulksst.

Changes:
- Created pkg/sql/bulksst/sink.go defining BulkSink interface and BulkAdderSink implementation
- Updated pkg/sql/rowexec to use bulksst.BulkSink instead of local indexBackfillSink interface
- Removed duplicate bulkAdderIndexBackfillSink from rowexec
- Updated sstIndexBackfillSink to implement bulksst.BulkSink
- Updated tests to reference bulksst.BulkAdderSink

Informs: cockroachdb#161490
Release note: None
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
…sink

Refactor collectNewManifests to use the shared bulksst.SSTFilesToManifests conversion function instead of inline manifest construction.

Release Notes: None
Informs: cockroachdb#161490
This commit moves sstIndexBackfillSink from pkg/sql/rowexec to pkg/sql/bulksst, renaming it to SSTSink. This enables code reuse between index backfill and IMPORT for distributed merge functionality.

Before this change, sstIndexBackfillSink was implemented in rowexec and tightly coupled to execinfra.FlowCtx, preventing reuse in the importer package. Since both index backfill and IMPORT need identical SST sink functionality for distributed merge (writing SSTs to external storage, tracking manifests, and coordinating with OnFlush callbacks), moving the implementation to a shared location eliminates duplication.

The new bulksst.SSTSink provides a general-purpose SST sink that:
- Writes KV pairs to SST files in external storage
- Tracks SST file metadata via manifests
- Supports OnFlush callbacks for progress reporting
- Handles duplicate detection for unique indexes

The constructor bulksst.NewSSTSink takes dependencies directly rather than requiring execinfra.FlowCtx, making it usable from any package:
- settings: cluster settings for SST batch configuration
- externalStorageFromURI: factory for creating external storage
- clock: for timestamping SST files
- distributedMergeFilePrefix: base URI for SST file storage
- writeAsOf: timestamp to write on all KVs
- processorID: unique ID for per-processor subdirectories
- checkDuplicates: whether to detect duplicate keys

Tests for SSTSink were moved from indexbackfiller_sst_sink_test.go to bulksst/sink_test.go, and references in rowexec were updated to use the shared implementation.

Changes:
- Moved sstIndexBackfillSink to bulksst.SSTSink in pkg/sql/bulksst/sink.go
- Added NewSSTSink constructor with dependency injection
- Moved tests to pkg/sql/bulksst/sink_test.go
- Updated rowexec/indexbackfiller.go to call bulksst.NewSSTSink
- Updated rowexec/indexbackfiller_test.go to use bulksst.SSTSink type
- Removed pkg/sql/rowexec/indexbackfiller_sst_sink.go
- Removed pkg/sql/rowexec/indexbackfiller_sst_sink_test.go
- Updated BUILD.bazel files in both packages

Informs: cockroachdb#161490
Release note: None
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Rename the proto message to a more generic name so that it can be reused by import map-stage processors in addition to index backfill.

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
…ST metadata

Refactor the import processor's ingestKvs to use the BulkSink interface directly rather than the intermediate ingestHelper abstraction. This removes legacyImportBulkAdder and mergeImportBulkAdder, replacing them with the shared BulkAdderSink and SSTSink implementations from bulksst.

Introduce importProgressTracker to centralize progress tracking. Each registered BulkSink's OnFlush callback records flushed row positions and accumulates SST manifests, which are then sent via the progress channel rather than as row results. This ensures SST metadata is checkpointed incrementally in the job progress record, following the index backfiller pattern.

On the coordinator side, an SSTManifestBuffer accumulates manifests from processor progress updates and persists complete snapshots in the job progress. On retry, previously checkpointed manifests are restored so already-written SSTs are not lost.

Fixes: cockroachdb#161490
Release Notes: None
msbutler
left a comment
sorry, had not realized you were waiting on me. lgtm!
PR cockroachdb#164536 moved SST metadata from the import processor's third output column to the progress metadata channel. PR cockroachdb#164977 updated the processor's own output type declaration (importProcessorOutputTypes) from 3 to 2 columns, but missed the separate output type declaration in import_processor_planning.go used to build the physical plan.

This caused the receiver (gateway node) to set up a StreamDecoder expecting 3 columns via InputSyncSpec.ColumnTypes, while the producer sent 2-column rows. When the gateway decoded the row, it panicked with "index out of range [2] with length 2" in StreamDecoder.GetRow, crashing the node.

Fix by using importProcessorOutputTypes (the single source of truth) in the flow plan instead of a separate local variable.

Fixes: cockroachdb#165147
Fixes: cockroachdb#165149
Fixes: cockroachdb#165150
Fixes: cockroachdb#165151
Fixes: cockroachdb#165152
Fixes: cockroachdb#165155
Fixes: cockroachdb#165156
Fixes: cockroachdb#165157
Fixes: cockroachdb#165158
Epic: CRDB-48845
Release note: None
Summary

- Rename `IndexBackfillSSTManifest` to `BulkSSTManifest` to reflect shared usage across index backfill and import.
- Extract a `BulkSink` interface into `pkg/sql/bulksst` for shared bulk write abstraction between index backfill and import.
- Use the shared `SSTFilesToManifests` conversion.
- Move `sstIndexBackfillSink` from `rowexec` to `bulksst.SSTSink` for reuse by the import processor.
- Refactor the import processor to use `BulkSink` directly, replacing `ingestHelper`/`legacyImportBulkAdder`/`mergeImportBulkAdder`. Introduce `importProgressTracker` with OnFlush-based manifest accumulation and checkpoint SST metadata incrementally in job progress, fixing metadata loss on retry.

Fixes: #161490