
importer: checkpoint SST metadata in distributed merge for retry correctness#164536

Merged
trunk-io[bot] merged 7 commits into cockroachdb:master from mw5h:import-dist-cp
Mar 4, 2026

Conversation

@mw5h
Contributor

@mw5h mw5h commented Feb 27, 2026

Summary

  • Revert entry count tracking that was superseded by the manifest-based approach.
  • Rename IndexBackfillSSTManifest to BulkSSTManifest to reflect shared usage across index backfill and import.
  • Extract BulkSink interface into pkg/sql/bulksst for shared bulk write abstraction between index backfill and import.
  • Refactor index backfill sink to use shared SSTFilesToManifests conversion.
  • Move sstIndexBackfillSink from rowexec to bulksst.SSTSink for reuse by the import processor.
  • Refactor import processor to use BulkSink directly, replacing ingestHelper/legacyImportBulkAdder/mergeImportBulkAdder. Introduce importProgressTracker with OnFlush-based manifest accumulation and checkpoint SST metadata incrementally in job progress, fixing metadata loss on retry.

Fixes: #161490

@trunk-io
Contributor

trunk-io bot commented Feb 27, 2026

😎 Merged successfully - details.

@blathers-crl

blathers-crl bot commented Feb 27, 2026

Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks.

🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf.

@cockroach-teamcity
Member

This change is Reviewable

Contributor Author

@mw5h mw5h left a comment


@mw5h partially reviewed 28 files and all commit messages, and made 7 comments.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained.


pkg/sql/importer/import_processor.go line 190 at r6 (raw file):

		// All SST manifests are now sent via the OnFlush callback synchronized
		// with flush events, eliminating the need for final manifest emission.

No need to reference what was here before. This sort of information belongs in the commit message.


pkg/sql/importer/import_processor.go line 225 at r6 (raw file):

	}

	// SST metadata is now emitted via the progress channel (see lines 188-199)

No need to reference what was here before. This sort of information belongs in the commit message.


pkg/sql/importer/import_processor.go line 530 at r6 (raw file):

}

type importProgressTracker struct {

There should be concise comments describing the purpose of each entry in the struct and any invariants we need to maintain.


pkg/sql/importer/import_processor.go line 543 at r6 (raw file):

	unflushedRows     []int64
	unflushedFraction []float32
}

There should probably be a unit test of importProgressTracker.


pkg/sql/importer/import_processor_planning.go line 208 at r6 (raw file):

	// manifestBuf accumulates SST file metadata as processors complete, following
	// the index backfiller pattern. It maintains a complete cumulative list of all
	// manifests rather than deltas, eliminating timing windows for orphaned SSTs.

We also talk about the timing window and orphaned SSTs below, so both comments are not necessary. This one seems extraneous.


pkg/sql/importer/import_processor_planning.go line 259 at r6 (raw file):

	}

	// Initialize manifest buffer with checkpointed manifests from prior runs,

The comment above says the same thing.


pkg/sql/importer/import_processor_test.go line 799 at r6 (raw file):

	require.Equal(t, int64(csv1.numRows)-resumePos, importSummary.Rows)

	// DEBUG: Show what rows are actually in the table

Remove this debugging code.

@mw5h mw5h force-pushed the import-dist-cp branch 2 times, most recently from a4b2dcf to bd73287 Compare February 27, 2026 23:35
Contributor Author

@mw5h mw5h left a comment


@mw5h partially reviewed 28 files and all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained.

@mw5h mw5h marked this pull request as ready for review February 28, 2026 00:50
@mw5h mw5h requested review from a team as code owners February 28, 2026 00:50
@mw5h mw5h requested review from DrewKimball, andrew-r-thomas, jeffswenson and spilchen and removed request for a team February 28, 2026 00:50
@msbutler msbutler self-requested a review February 28, 2026 15:18
Contributor

@spilchen spilchen left a comment


nice to see code reuse between import and index backfill

@spilchen partially reviewed 28 files and all commit messages, and made 8 comments.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on andrew-r-thomas, DrewKimball, jeffswenson, msbutler, and mw5h).


pkg/sql/importer/import_processor_planning.go line 273 at r12 (raw file):

The buffer deduplicates and

I couldn't see where the buffer deduplicates manifests. It seems like it's a straight-up append.


pkg/sql/bulksst/sink.go line 1 at r12 (raw file):

// Copyright 2025 The Cockroach Authors.

super nit: 2026


pkg/sql/execinfrapb/data.proto line 337 at r12 (raw file):

    // during distributed merge. This metadata is accumulated and persisted in
    // job progress for retry safety.
    repeated jobs.jobspb.BulkSSTManifest sst_metadata = 11 [(gogoproto.customname) = "SSTMetadata", (gogoproto.nullable) = false];

I think the pattern is to use the progress_details field instead of adding this. It's an Any field, so you put whatever struct in it. That's how the index backfiller flows progress back to the coord.


pkg/sql/importer/import_processor.go line 427 at r12 (raw file):

		if err := pkSink.Flush(ctx); err != nil {
			if errors.HasType(err, (*kvserverbase.DuplicateKeyError)(nil)) {
				return errors.Wrapf(err, "duplicate key in primary index")

nit: will it always be a duplicate in the primary? For the distributed merge path, there is only one sink, which I guess handles both PK and secondary indexes? If so, the error message should be a bit more generic.


pkg/sql/importer/import_processor.go line 635 at r12 (raw file):

		// When the PK adder flushes, everything written has been flushed, so we set
		// pkFlushedRow to writtenRow. Additionally if the indexAdder is empty then we

the old code had an optimization where the PK adder's SetOnFlush callback would also advance the index adder's flushed row position if the index adder's buffer was empty. I think this allowed the resume position to advance with PK flushes when the index adder was idle. Without this, the resume position could stay back, which could mean restarting from an early spot on retry for tables with large PKs and small secondary indexes. Is this a problem?


pkg/sql/importer/import_processor_planning.go line 447 at r12 (raw file):

			var distributedMergeFilePrefix string
			if details.UseDistributedMerge {
				distributedMergeFilePrefix = bulkutil.NewDistMergePaths(job.ID()).MapPath()

nit: doesn't the file prefix stay the same for each loop iteration? We could compute this once outside the loop.


pkg/sql/importer/import_processor_test.go line 1012 at r12 (raw file):

		// fall back to the original server factory (if provided).
		if es.Provider != cloudpb.ExternalStorageProvider_http {
			if len(originalFactory) != 1 && originalFactory[0] != nil {

was this supposed to be len(originalFactory) == 1? Otherwise, don't we risk an 'index out of range' error here?

	// up job-scoped files on completion or cancellation.
	repeated string sst_storage_prefixes = 8 [(gogoproto.customname) = "SSTStoragePrefixes"];

	// CompletedMapWork stores SST file metadata from processors that have
Collaborator

@msbutler msbutler Mar 2, 2026


I would strongly suggest storing this field in a transparently sharded job info key, via the jobs.WriteChunkedFileToJobInfo API. It seems like this repeated proto field could get arbitrarily large. Big keys in the job system can cause all sorts of availability problems.

At some point, import should probably migrate other fields in this progress proto to info keys, but for now, this seems like a good opportunity to dip import's toes into the info key framework, as you don't need to worry about backwards compatibility here. cc @dt

Contributor Author


Good suggestion. For this PR the manifests are stored in the existing CompletedMapWork repeated field in import progress, matching the index backfiller pattern (which also uses a repeated proto field in SchemaChangeDetails). The list is bounded by the number of SST files produced in a single import attempt.

Filed #164675 to track migrating both import and index backfill to WriteChunkedFileToJobInfo.

Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@msbutler let me know if this does or does not meet with your approval.

Collaborator


i think the issue captures the problem, thanks! i imagine that if you did this work for 26.2, distributed merge wouldn't require a migration, as customers can't run it in 26.1?

Contributor Author


That's correct.

Contributor Author


Sorry to be pushy @msbutler, but am I okay to push this PR with the understanding that we address #164675?

Collaborator


yeah, sgtm

@mw5h mw5h force-pushed the import-dist-cp branch from bd73287 to 20a04e1 Compare March 2, 2026 18:40
@mw5h
Contributor Author

mw5h commented Mar 2, 2026

@spilchen re: the SetOnFlush optimization where the PK flush advanced the index flushed row when the index adder was empty —

This is now covered. I added IsEmpty() to the BulkSink interface and use it at reporting time in formatProgress: when computing the min flushed position across sinks, if a sink's buffer is empty it uses the current unflushedRows position instead of its (possibly stale) flushedRows. This means an empty sink never holds back the resume position, matching the old behavior without coupling the sinks' OnFlush callbacks together.

The TestImportProgressTracker test now exercises this case explicitly.

@mw5h
Contributor Author

mw5h commented Mar 2, 2026

This is RFAL.

Contributor Author

@mw5h mw5h left a comment


@mw5h partially reviewed 28 files and all commit messages, made 4 comments, and resolved 2 discussions.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on andrew-r-thomas, DrewKimball, dt, jeffswenson, and spilchen).


pkg/sql/execinfrapb/data.proto line 337 at r12 (raw file):

Previously, spilchen wrote…

I think the pattern is to use the progress_details field instead of adding this. It's an Any field, so you put whatever struct in it. That's how the index backfiller flows progress back to the coord.

Done.


pkg/sql/importer/import_processor.go line 635 at r12 (raw file):

Previously, spilchen wrote…

the old code had an optimization where the PK adder's SetOnFlush callback would also advance the index adder's flushed row position if the index adder's buffer was empty. I think this allowed the resume position to advance with PK flushes when the index adder was idle. Without this, the resume position could stay back, which could mean restarting from an early spot on retry for tables with large PKs and small secondary indexes. Is this a problem?

Done.


pkg/sql/importer/import_processor_planning.go line 273 at r12 (raw file):

Previously, spilchen wrote…

The buffer deduplicates and

I couldn't see where the buffer deduplicates manifests. It seems like it's a straight up append.

Done.


pkg/sql/importer/import_processor_test.go line 1012 at r12 (raw file):

Previously, spilchen wrote…

was this suppose to be len(originalFactory) == 1? Otherwise, don't we risk an 'index out of range' error here?

Done.

Contributor

@spilchen spilchen left a comment


:lgtm:

@spilchen made 2 comments.
Reviewable status: :shipit: complete! 1 of 0 LGTMs obtained (waiting on andrew-r-thomas, DrewKimball, dt, jeffswenson, and mw5h).


pkg/sql/importer/import_processor.go line 502 at r23 (raw file):

				if err := sink.Add(ctx, kv.Key, kv.Value.RawBytes); err != nil {
					if errors.HasType(err, (*kvserverbase.DuplicateKeyError)(nil)) {
						return errors.Wrapf(err, "duplicate key in %s", errTarget)

nit: will this be correct for the distributed merge path? I think it'll say "duplicate key in primary index" since there is only one sink. But could the duplicate be in the secondary?

@mw5h mw5h force-pushed the import-dist-cp branch from 76cbd6a to c0485f5 Compare March 3, 2026 15:59
Contributor Author

@mw5h mw5h left a comment


@mw5h partially reviewed 28 files and all commit messages, made 1 comment, and resolved 1 discussion.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (and 1 stale) (waiting on andrew-r-thomas, DrewKimball, dt, jeffswenson, and spilchen).


pkg/sql/bulksst/sink.go line 1 at r12 (raw file):

Previously, spilchen wrote…

super nit: 2026

Done.

mw5h and others added 7 commits March 3, 2026 15:09
IndexBackfillSSTManifest was originally designed for index backfill
operations but is now also used by IMPORT's distributed merge pipeline.
The name is misleading since it suggests the type is specific to index
backfill when it's actually a general-purpose SST manifest for bulk
operations.

This commit renames the type to BulkSSTManifest to better reflect its
usage across both index backfill and import operations. The rename
touches protobuf definitions and all references in backfill and import
code.

Release note: none

Informs: cockroachdb#161490

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit extracts the indexBackfillSink interface from rowexec into
a shared BulkSink interface in pkg/sql/bulksst. This refactoring
prepares for code reuse between index backfill and IMPORT for
distributed merge functionality.

Before this change, the indexBackfillSink interface and
bulkAdderIndexBackfillSink implementation were local to rowexec,
preventing reuse in the importer package. Since both index backfill and
IMPORT use the same bulk write patterns (BulkAdder ingestion vs SST
file creation with manifest tracking), extracting a common interface
eliminates the need for duplicate implementations.

The new BulkSink interface in pkg/sql/bulksst defines the contract for
bulk write destinations:
- Add() enqueues KV pairs
- Flush() persists buffered data
- SetOnFlush() installs progress callbacks
- ConsumeFlushManifests() retrieves SST metadata (for distributed merge)
- Close() releases resources

BulkAdderSink wraps kvserverbase.BulkAdder to provide the legacy
ingestion path where SSTs are sent directly to KV via AddSSTable
requests. sstIndexBackfillSink (in rowexec) provides the distributed
merge path where SSTs are staged in external storage.

This design allows both packages to use the same abstraction without
circular dependencies, since both already import pkg/sql/bulksst.

Changes:
- Created pkg/sql/bulksst/sink.go defining BulkSink interface and
  BulkAdderSink implementation
- Updated pkg/sql/rowexec to use bulksst.BulkSink instead of local
  indexBackfillSink interface
- Removed duplicate bulkAdderIndexBackfillSink from rowexec
- Updated sstIndexBackfillSink to implement bulksst.BulkSink
- Updated tests to reference bulksst.BulkAdderSink

Informs: cockroachdb#161490

Release note: None

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
…sink

Refactor collectNewManifests to use the shared bulksst.SSTFilesToManifests
conversion function instead of inline manifest construction.

Release Notes: None
Informs: cockroachdb#161490
This commit moves sstIndexBackfillSink from pkg/sql/rowexec to
pkg/sql/bulksst, renaming it to SSTSink. This enables code reuse
between index backfill and IMPORT for distributed merge functionality.

Before this change, sstIndexBackfillSink was implemented in rowexec and
tightly coupled to execinfra.FlowCtx, preventing reuse in the importer
package. Since both index backfill and IMPORT need identical SST sink
functionality for distributed merge (writing SSTs to external storage,
tracking manifests, and coordinating with OnFlush callbacks), moving
the implementation to a shared location eliminates duplication.

The new bulksst.SSTSink provides a general-purpose SST sink that:
- Writes KV pairs to SST files in external storage
- Tracks SST file metadata via manifests
- Supports OnFlush callbacks for progress reporting
- Handles duplicate detection for unique indexes

The constructor bulksst.NewSSTSink takes dependencies directly rather
than requiring execinfra.FlowCtx, making it usable from any package:
- settings: cluster settings for SST batch configuration
- externalStorageFromURI: factory for creating external storage
- clock: for timestamping SST files
- distributedMergeFilePrefix: base URI for SST file storage
- writeAsOf: timestamp to write on all KVs
- processorID: unique ID for per-processor subdirectories
- checkDuplicates: whether to detect duplicate keys

Tests for SSTSink were moved from indexbackfiller_sst_sink_test.go to
bulksst/sink_test.go, and references in rowexec were updated to use the
shared implementation.

Changes:
- Moved sstIndexBackfillSink to bulksst.SSTSink in pkg/sql/bulksst/sink.go
- Added NewSSTSink constructor with dependency injection
- Moved tests to pkg/sql/bulksst/sink_test.go
- Updated rowexec/indexbackfiller.go to call bulksst.NewSSTSink
- Updated rowexec/indexbackfiller_test.go to use bulksst.SSTSink type
- Removed pkg/sql/rowexec/indexbackfiller_sst_sink.go
- Removed pkg/sql/rowexec/indexbackfiller_sst_sink_test.go
- Updated BUILD.bazel files in both packages

Informs: cockroachdb#161490

Release note: None

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Rename the proto message to a more generic name so that it can be
reused by import map-stage processors in addition to index backfill.

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
…ST metadata

Refactor the import processor's ingestKvs to use the BulkSink interface
directly rather than the intermediate ingestHelper abstraction. This removes
legacyImportBulkAdder and mergeImportBulkAdder, replacing them with the
shared BulkAdderSink and SSTSink implementations from bulksst.

Introduce importProgressTracker to centralize progress tracking. Each
registered BulkSink's OnFlush callback records flushed row positions and
accumulates SST manifests, which are then sent via the progress channel
rather than as row results. This ensures SST metadata is checkpointed
incrementally in the job progress record, following the index backfiller
pattern.

On the coordinator side, an SSTManifestBuffer accumulates manifests from
processor progress updates and persists complete snapshots in the job
progress. On retry, previously checkpointed manifests are restored so
already-written SSTs are not lost.

Fixes: cockroachdb#161490

Release Notes: None
@mw5h mw5h force-pushed the import-dist-cp branch from c0485f5 to 520e1cc Compare March 3, 2026 23:09
Collaborator

@msbutler msbutler left a comment


sorry, had not realized you were waiting on me. lgtm!

@trunk-io trunk-io bot merged commit fc52cb9 into cockroachdb:master Mar 4, 2026
42 of 43 checks passed
mw5h added a commit to mw5h/cockroach that referenced this pull request Mar 9, 2026
PR cockroachdb#164536 moved SST metadata from the import processor's third output
column to the progress metadata channel. PR cockroachdb#164977 updated the
processor's own output type declaration (importProcessorOutputTypes)
from 3 to 2 columns, but missed the separate output type declaration
in import_processor_planning.go used to build the physical plan.

This caused the receiver (gateway node) to set up a StreamDecoder
expecting 3 columns via InputSyncSpec.ColumnTypes, while the producer
sent 2-column rows. When the gateway decoded the row, it panicked with
"index out of range [2] with length 2" in StreamDecoder.GetRow,
crashing the node.

Fix by using importProcessorOutputTypes (the single source of truth)
in the flow plan instead of a separate local variable.

Fixes: cockroachdb#165147
Fixes: cockroachdb#165149
Fixes: cockroachdb#165150
Fixes: cockroachdb#165151
Fixes: cockroachdb#165152
Fixes: cockroachdb#165155
Fixes: cockroachdb#165156
Fixes: cockroachdb#165157
Fixes: cockroachdb#165158
Epic: CRDB-48845

Release note: None
mw5h added a commit to mw5h/cockroach that referenced this pull request Mar 9, 2026
PR cockroachdb#164536 moved SST metadata from the import processor's third output
column to the progress metadata channel. PR cockroachdb#164977 updated the
processor's own output type declaration (importProcessorOutputTypes)
from 3 to 2 columns, but missed the separate output type declaration
in import_processor_planning.go used to build the physical plan.

This caused the receiver (gateway node) to set up a StreamDecoder
expecting 3 columns via InputSyncSpec.ColumnTypes, while the producer
sent 2-column rows. When the gateway decoded the row, it panicked with
"index out of range [2] with length 2" in StreamDecoder.GetRow,
crashing the node.

Fix by using importProcessorOutputTypes (the single source of truth)
in the flow plan instead of a separate local variable.

Fixes: cockroachdb#165147
Epic: CRDB-48845

Release note: None

Successfully merging this pull request may close these issues.

importer: distributed import not properly checkpointing state

4 participants