-
Notifications
You must be signed in to change notification settings - Fork 4.1k
importer: checkpoint SST metadata in distributed merge for retry correctness #164536
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
7 commits
Select commit
Hold shift + click to select a range
0fb03ec
Revert "importer: track entry counts in distributed merge import path"
mw5h 7e68d89
sql: rename IndexBackfillSSTManifest to BulkSSTManifest
mw5h 0a4f9f6
bulksst: introduce BulkSink interface for shared bulk write abstraction
mw5h a81f846
rowexec: use shared SSTFilesToManifests conversion in index backfill …
mw5h 4efc126
bulksst: move SST sink implementation to shared package
mw5h aee4c63
execinfrapb: rename IndexBackfillMapProgress to BulkMapProgress
mw5h 520e1cc
importer: refactor distributed merge to use BulkSink and checkpoint S…
mw5h File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,84 @@ | ||
| // Copyright 2026 The Cockroach Authors. | ||
| // | ||
| // Use of this software is governed by the CockroachDB Software License | ||
| // included in the /LICENSE file. | ||
|
|
||
| package bulksst | ||
|
|
||
| import ( | ||
| "github.com/cockroachdb/cockroach/pkg/jobs/jobspb" | ||
| "github.com/cockroachdb/cockroach/pkg/roachpb" | ||
| "github.com/cockroachdb/cockroach/pkg/util/hlc" | ||
| ) | ||
|
|
||
| // SSTFileToManifest converts a single bulksst.SSTFileInfo to a | ||
| // jobspb.BulkSSTManifest for persistence in job progress. | ||
| // | ||
| // The writeTS parameter is optional (can be nil). When non-nil, it's stored in | ||
| // the manifest's WriteTimestamp field. This is used by index backfill but not | ||
| // by import operations. | ||
| func SSTFileToManifest(f *SSTFileInfo, writeTS *hlc.Timestamp) jobspb.BulkSSTManifest { | ||
| span := &roachpb.Span{ | ||
| Key: append(roachpb.Key(nil), f.StartKey...), | ||
| EndKey: append(roachpb.Key(nil), f.EndKey...), | ||
| } | ||
| manifest := jobspb.BulkSSTManifest{ | ||
| URI: f.URI, | ||
| Span: span, | ||
| FileSize: f.FileSize, | ||
| RowSample: append(roachpb.Key(nil), f.RowSample...), | ||
| KeyCount: f.KeyCount, | ||
| } | ||
| if writeTS != nil { | ||
| manifest.WriteTimestamp = writeTS | ||
| } | ||
| return manifest | ||
| } | ||
|
|
||
| // SSTFilesToManifests converts a slice of bulksst.SSTFileInfo to | ||
| // jobspb.BulkSSTManifest entries for persistence. | ||
| // | ||
| // This is used by both import and index backfill operations to convert their | ||
| // in-memory SST file lists to the format stored in job progress. The writeTS | ||
| // parameter is optional and only used by index backfill. | ||
| func SSTFilesToManifests(files *SSTFiles, writeTS *hlc.Timestamp) []jobspb.BulkSSTManifest { | ||
| if files == nil || len(files.SST) == 0 { | ||
| return nil | ||
| } | ||
|
|
||
| manifests := make([]jobspb.BulkSSTManifest, 0, len(files.SST)) | ||
| for _, f := range files.SST { | ||
| manifests = append(manifests, SSTFileToManifest(f, writeTS)) | ||
| } | ||
| return manifests | ||
| } | ||
|
|
||
| // ManifestsToSSTFiles converts jobspb.BulkSSTManifest entries back to | ||
| // bulksst.SSTFiles. This is the inverse of SSTFilesToManifests. | ||
| // | ||
| // This is used when restoring SST metadata from job progress during retry or | ||
| // resume operations. | ||
| func ManifestsToSSTFiles(manifests []jobspb.BulkSSTManifest) SSTFiles { | ||
| if len(manifests) == 0 { | ||
| return SSTFiles{} | ||
| } | ||
|
|
||
| result := SSTFiles{ | ||
| SST: make([]*SSTFileInfo, 0, len(manifests)), | ||
| } | ||
|
|
||
| for _, manifest := range manifests { | ||
| fileInfo := &SSTFileInfo{ | ||
| URI: manifest.URI, | ||
| StartKey: append(roachpb.Key(nil), manifest.Span.Key...), | ||
| EndKey: append(roachpb.Key(nil), manifest.Span.EndKey...), | ||
| FileSize: manifest.FileSize, | ||
| RowSample: append(roachpb.Key(nil), manifest.RowSample...), | ||
| KeyCount: manifest.KeyCount, | ||
| } | ||
| result.SST = append(result.SST, fileInfo) | ||
| result.TotalSize += manifest.FileSize | ||
| } | ||
|
|
||
| return result | ||
| } |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would strongly suggest storing this field in a transparently sharded job info key, via
jobs.WriteChunkedFileToJobInfoapi. It seems like this repeated proto field could get arbitrarily large. Big keys in the job system can cause all sorts of availability problems.At some point, import should probably migrate other fields in this progress proto to info keys, but for now, this seems like a good opportunity to dip import's toes into the info key framework, as you don't need to worry about backwards compatibility here. cc @dt
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion. For this PR the manifests are stored in the existing
CompletedMapWorkrepeated field in import progress, matching the index backfiller pattern (which also uses a repeated proto field inSchemaChangeDetails). The list is bounded by the number of SST files produced in a single import attempt.Filed #164675 to track migrating both import and index backfill to
WriteChunkedFileToJobInfo.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@msbutler let me know if this does or does not meet with your approval.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think the issue captures the problem, thanks! i imagine that if you did this work for 26.2, distributed merge wouldn't require a migration, as customers can't run it in 26.1?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's correct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry to be pushy @msbutler, but am I okay to push this pr with the understanding that we address #164675?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah, sgtm