
Commit a95d1dd

craig[bot] and spilchen committed
Merge #158449
158449: sql: add abstraction to index backfill for distributed merge r=spilchen a=spilchen

This change introduces an abstraction layer in the index backfill infrastructure, enabling selection between the legacy bulk adder and a planned distributed merge mechanism. Currently, choosing the distributed merge mode results in an unimplemented error. However, supporting plumbing has been added, including:

- A cluster setting override
- Updated schema changer and new planner logic to persist the selected mode in BackfillerSpec
- A new UseDistributedMergeSink flag in the processor, which inspects this mode at runtime

Informs: #158378

Epic: CRDB-48845

Release note: None

Co-authored-by: Matt Spilchen <matt.spilchen@cockroachlabs.com>
2 parents bf87c95 + 9ac9728 commit a95d1dd

File tree

8 files changed: 231 additions & 16 deletions

pkg/sql/backfill.go

Lines changed: 3 additions & 0 deletions
@@ -1044,6 +1044,9 @@ func (sc *SchemaChanger) distIndexBackfill(
     indexBatchSize := indexBackfillBatchSize.Get(&sc.execCfg.Settings.SV)
     chunkSize := sc.getChunkSize(indexBatchSize)
     spec := initIndexBackfillerSpec(*tableDesc.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize, addedIndexes, 0)
+    if err := maybeEnableDistributedMergeIndexBackfill(ctx, sc.execCfg.Settings, &spec); err != nil {
+        return err
+    }
     p, err = sc.distSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, todoSpans)
     return err
 }); err != nil {

pkg/sql/distsql_plan_backfill.go

Lines changed: 24 additions & 0 deletions
@@ -10,11 +10,15 @@ import (
     "time"
     "unsafe"

+    "github.com/cockroachdb/cockroach/pkg/clusterversion"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/settings"
+    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog"
     "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
+    "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
+    "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
     "github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
     "github.com/cockroachdb/cockroach/pkg/sql/types"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
@@ -80,6 +84,26 @@ var initialSplitsPerProcessor = settings.RegisterIntSetting(
     settings.NonNegativeInt,
 )

+var distributedMergeIndexBackfillEnabled = settings.RegisterBoolSetting(
+    settings.ApplicationLevel,
+    "bulkio.index_backfill.distributed_merge.enabled",
+    "enable the distributed merge pipeline for index backfills",
+    false,
+)
+
+func maybeEnableDistributedMergeIndexBackfill(
+    ctx context.Context, st *cluster.Settings, spec *execinfrapb.BackfillerSpec,
+) error {
+    if !distributedMergeIndexBackfillEnabled.Get(&st.SV) {
+        return nil
+    }
+    if !st.Version.IsActive(ctx, clusterversion.V26_1) {
+        return pgerror.New(pgcode.FeatureNotSupported, "distributed merge requires cluster version 26.1")
+    }
+    spec.UseDistributedMergeSink = true
+    return nil
+}
+
 // createBackfiller generates a plan consisting of index/column backfiller
 // processors, one for each node that has spans that we are reading. The plan is
 // finalized.
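The gating helper above is small enough to unit-test directly. Below is a minimal sketch of such a test, assuming cluster.MakeTestingClusterSettings (which pins the active version to latest, so the 26.1 gate passes) and testify's require; the commit itself does not add this test.

package sql

import (
    "context"
    "testing"

    "github.com/cockroachdb/cockroach/pkg/settings/cluster"
    "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
    "github.com/stretchr/testify/require"
)

// TestMaybeEnableDistributedMergeIndexBackfill is a hypothetical sketch, not
// part of this commit.
func TestMaybeEnableDistributedMergeIndexBackfill(t *testing.T) {
    ctx := context.Background()
    st := cluster.MakeTestingClusterSettings()
    var spec execinfrapb.BackfillerSpec

    // Default: the setting is off, so the spec must be left untouched.
    require.NoError(t, maybeEnableDistributedMergeIndexBackfill(ctx, st, &spec))
    require.False(t, spec.UseDistributedMergeSink)

    // Opt in: the mode is persisted on the spec for processors to inspect.
    distributedMergeIndexBackfillEnabled.Override(ctx, &st.SV, true)
    require.NoError(t, maybeEnableDistributedMergeIndexBackfill(ctx, st, &spec))
    require.True(t, spec.UseDistributedMergeSink)
}

The remaining branch, the pgerror returned when the cluster is still below version 26.1, is exercised end-to-end by the mixed-version logic test configs further down.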

pkg/sql/execinfrapb/processors_bulk_io.proto

Lines changed: 3 additions & 1 deletion
@@ -85,7 +85,9 @@ message BackfillerSpec {
   // the value of 0 is specified then the primary index will be used implicitly.
   optional uint32 source_index_id = 16 [(gogoproto.nullable) = false, (gogoproto.customname) = "SourceIndexID", (gogoproto.casttype) = "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb.IndexID"];

-  // NEXTID: 17.
+  optional bool use_distributed_merge_sink = 17 [(gogoproto.nullable) = false];
+
+  // NEXTID: 18.
 }

 // JobProgress identifies the job to report progress on. This reporting
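Since the field is declared (gogoproto.nullable) = false, the regenerated Go code exposes it as a plain bool on BackfillerSpec, roughly as below. This is an abbreviated sketch for orientation; the exact generated struct tags are elided.

type BackfillerSpec struct {
    // ... existing fields ...

    // UseDistributedMergeSink routes index backfill output to the
    // distributed-merge sink instead of the legacy BulkAdder.
    UseDistributedMergeSink bool `protobuf:"varint,17,opt,name=use_distributed_merge_sink"`
}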

pkg/sql/index_backfiller.go

Lines changed: 3 additions & 0 deletions
@@ -211,6 +211,9 @@ func (ib *IndexBackfillPlanner) plan(
         *td.TableDesc(), writeAsOf, writeAtRequestTimestamp, chunkSize,
         indexesToBackfill, sourceIndexID,
     )
+    if err := maybeEnableDistributedMergeIndexBackfill(ctx, ib.execCfg.Settings, &spec); err != nil {
+        return err
+    }
     var err error
     p, err = ib.execCfg.DistSQLPlanner.createBackfillerPhysicalPlan(ctx, planCtx, spec, sourceSpans)
     return err

pkg/sql/logictest/testdata/logic_test/create_index

Lines changed: 27 additions & 0 deletions
@@ -705,3 +705,30 @@ statement error pgcode 42P07 index with name \"t_index_name_conflicts_with_prima
 CREATE INDEX t_index_name_conflicts_with_primary_key_pkey ON t_index_name_conflicts_with_primary_key (a);

 subtest end
+
+# Temporary check to ensure the distributed merge setting is wired up. When we
+# implement distributed merge, we probably want to test this by reusing as many
+# existing tests as possible rather than strictly writing new ones.
+subtest distributed_merge_index_backfill_placeholder
+
+statement ok
+CREATE TABLE dist_merge_idx (a INT PRIMARY KEY, b INT)
+
+statement ok
+SET CLUSTER SETTING bulkio.index_backfill.distributed_merge.enabled = true
+
+skipif config local-mixed-25.3 local-mixed-25.4
+statement error pq: .*index backfill distributed merge sink is not implemented yet.*
+CREATE INDEX dist_merge_idx_idx ON dist_merge_idx (b)
+
+onlyif config local-mixed-25.3 local-mixed-25.4
+statement error pq: .*distributed merge requires cluster version 26.*
+CREATE INDEX dist_merge_idx_idx ON dist_merge_idx (b)
+
+statement ok
+SET CLUSTER SETTING bulkio.index_backfill.distributed_merge.enabled = false
+
+statement ok
+DROP TABLE dist_merge_idx
+
+subtest end

pkg/sql/rowexec/BUILD.bazel

Lines changed: 3 additions & 0 deletions
@@ -102,6 +102,7 @@ go_library(
         "//pkg/util/collatedstring",
         "//pkg/util/ctxgroup",
         "//pkg/util/encoding",
+        "//pkg/util/errorutil/unimplemented",
         "//pkg/util/hlc",
         "//pkg/util/intsets",
         "//pkg/util/log",
@@ -173,6 +174,7 @@ go_test(
         "//pkg/kv/kvclient/rangecache",
         "//pkg/kv/kvpb",
        "//pkg/kv/kvserver",
+        "//pkg/kv/kvserver/kvserverbase",
         "//pkg/multitenant/tenantcapabilitiespb",
         "//pkg/roachpb",
         "//pkg/security/securityassets",
@@ -190,6 +192,7 @@ go_test(
         "//pkg/sql/catalog/descs",
         "//pkg/sql/catalog/desctestutils",
         "//pkg/sql/catalog/fetchpb",
+        "//pkg/sql/catalog/tabledesc",
         "//pkg/sql/execinfra",
         "//pkg/sql/execinfrapb",
         "//pkg/sql/execversion",

pkg/sql/rowexec/indexbackfiller.go

Lines changed: 80 additions & 15 deletions
@@ -25,6 +25,7 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/types"
     "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
     "github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
+    "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented"
     "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/mon"
@@ -48,6 +49,8 @@ type indexBackfiller struct {
     processorID int32

     filter backfill.MutationFilter
+
+    bulkAdderFactory indexBackfillBulkAdderFactory
 }

 var _ execinfra.Processor = &indexBackfiller{}
@@ -86,6 +89,40 @@ var indexBackfillElasticCPUControlEnabled = settings.RegisterBoolSetting(
     false, // TODO(dt): enable this by default after more benchmarking.
 )

+// indexBackfillSink abstracts the destination for index backfill output so the
+// ingestion pipeline can route built KVs either to the legacy BulkAdder path or
+// to future sinks (e.g. distributed-merge SST writers) without rewriting the
+// DistSQL processor. All sinks share the same Add/Flush/progress contract.
+type indexBackfillSink interface {
+    // Add enqueues a single KV pair for eventual persistence in the sink-specific
+    // backing store.
+    Add(ctx context.Context, key roachpb.Key, value []byte) error
+    // Flush forces any buffered state to be persisted.
+    Flush(ctx context.Context) error
+    // Close releases resources owned by the sink. Implementations should be
+    // idempotent and safe to call even if Flush returns an error.
+    Close(ctx context.Context)
+    // SetOnFlush installs a callback that is invoked after the sink writes a
+    // batch (mirrors kvserverbase.BulkAdder semantics so existing progress
+    // plumbing can be reused).
+    SetOnFlush(func(summary kvpb.BulkOpSummary))
+}
+
+// indexBackfillBulkAdderFactory mirrors kvserverbase.BulkAdderFactory but is
+// injected so tests can swap in fakes and future sinks can reuse the backfiller
+// without referencing execinfra.ServerConfig directly.
+type indexBackfillBulkAdderFactory func(
+    ctx context.Context, writeAsOf hlc.Timestamp, opts kvserverbase.BulkAdderOptions,
+) (kvserverbase.BulkAdder, error)
+
+// bulkAdderIndexBackfillSink is the default sink implementation backed by
+// kvserverbase.BulkAdder.
+type bulkAdderIndexBackfillSink struct {
+    kvserverbase.BulkAdder
+}
+
+var _ indexBackfillSink = (*bulkAdderIndexBackfillSink)(nil)
+
 func newIndexBackfiller(
     ctx context.Context,
     flowCtx *execinfra.FlowCtx,
@@ -101,6 +138,11 @@ func newIndexBackfiller(
         flowCtx:     flowCtx,
         processorID: processorID,
         filter:      backfill.IndexMutationFilter,
+        bulkAdderFactory: func(
+            ctx context.Context, writeAsOf hlc.Timestamp, opts kvserverbase.BulkAdderOptions,
+        ) (kvserverbase.BulkAdder, error) {
+            return flowCtx.Cfg.BulkAdder(ctx, flowCtx.Cfg.DB.KV(), writeAsOf, opts)
+        },
     }

     if err := ib.IndexBackfiller.InitForDistributedUse(
@@ -257,15 +299,16 @@ func (ib *indexBackfiller) maybeReencodeAndWriteVectorIndexEntry(
     return true, nil
 }

-// ingestIndexEntries adds the batches of built index entries to the buffering
-// adder and reports progress back to the coordinator node.
-func (ib *indexBackfiller) ingestIndexEntries(
-    ctx context.Context,
-    indexEntryCh <-chan indexEntryBatch,
-    progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
-) error {
-    ctx, span := tracing.ChildSpan(ctx, "ingestIndexEntries")
-    defer span.Finish()
+// makeIndexBackfillSink materializes whatever sink the current backfill should
+// use (legacy BulkAdder or distributed-merge sink). The choice is driven by
+// execinfrapb.BackfillerSpec.
+func (ib *indexBackfiller) makeIndexBackfillSink(ctx context.Context) (indexBackfillSink, error) {
+    if ib.spec.UseDistributedMergeSink {
+        return nil, unimplemented.New(
+            "distributed merge index backfill sink",
+            "index backfill distributed merge sink is not implemented yet",
+        )
+    }

     minBufferSize := backfillerBufferSize.Get(&ib.flowCtx.Cfg.Settings.SV)
     maxBufferSize := func() int64 { return backfillerMaxBufferSize.Get(&ib.flowCtx.Cfg.Settings.SV) }
@@ -278,11 +321,33 @@ func (ib *indexBackfiller) ingestIndexEntries(
         InitialSplitsIfUnordered: int(ib.spec.InitialSplits),
         WriteAtBatchTimestamp:    ib.spec.WriteAtBatchTimestamp,
     }
-    adder, err := ib.flowCtx.Cfg.BulkAdder(ctx, ib.flowCtx.Cfg.DB.KV(), ib.spec.WriteAsOf, opts)
+
+    adderFactory := ib.bulkAdderFactory
+    if adderFactory == nil {
+        return nil, errors.AssertionFailedf("index backfiller bulk adder factory must be configured")
+    }
+    adder, err := adderFactory(ctx, ib.spec.WriteAsOf, opts)
+    if err != nil {
+        return nil, err
+    }
+    return &bulkAdderIndexBackfillSink{BulkAdder: adder}, nil
+}
+
+// ingestIndexEntries adds the batches of built index entries to the buffering
+// adder and reports progress back to the coordinator node.
+func (ib *indexBackfiller) ingestIndexEntries(
+    ctx context.Context,
+    indexEntryCh <-chan indexEntryBatch,
+    progCh chan execinfrapb.RemoteProducerMetadata_BulkProcessorProgress,
+) error {
+    ctx, span := tracing.ChildSpan(ctx, "ingestIndexEntries")
+    defer span.Finish()
+
+    sink, err := ib.makeIndexBackfillSink(ctx)
     if err != nil {
         return err
     }
-    defer adder.Close(ctx)
+    defer sink.Close(ctx)

     // Synchronizes read and write access on completedSpans which is updated on a
     // BulkAdder flush, but is read when progress is being sent back to the
@@ -302,7 +367,7 @@ func (ib *indexBackfiller) ingestIndexEntries(
         mu.completedSpans = append(mu.completedSpans, mu.addedSpans...)
         mu.addedSpans = nil
     }
-    adder.SetOnFlush(flushAddedSpans)
+    sink.SetOnFlush(flushAddedSpans)

     pushProgress := func() {
         mu.Lock()
@@ -365,7 +430,7 @@ func (ib *indexBackfiller) ingestIndexEntries(
             }
         }

-        if err := adder.Add(ctx, indexEntry.Key, indexEntry.Value.RawBytes); err != nil {
+        if err := sink.Add(ctx, indexEntry.Key, indexEntry.Value.RawBytes); err != nil {
             return ib.wrapDupError(ctx, err)
         }
     }
@@ -391,7 +456,7 @@ func (ib *indexBackfiller) ingestIndexEntries(

     knobs := &ib.flowCtx.Cfg.TestingKnobs
     if knobs.BulkAdderFlushesEveryBatch {
-        if err := adder.Flush(ctx); err != nil {
+        if err := sink.Flush(ctx); err != nil {
             return ib.wrapDupError(ctx, err)
         }
         pushProgress()
@@ -412,7 +477,7 @@ func (ib *indexBackfiller) ingestIndexEntries(
     if err := g.Wait(); err != nil {
         return err
     }
-    if err := adder.Flush(ctx); err != nil {
+    if err := sink.Flush(ctx); err != nil {
         return ib.wrapDupError(ctx, err)
     }

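Because ingestIndexEntries now talks only to indexBackfillSink, a test double or the eventual distributed-merge writer can be slotted in behind makeIndexBackfillSink without further changes to the processor. Below is a minimal sketch of an in-memory sink, assuming only the interface added above; memSink is hypothetical and not part of this commit.

package rowexec

import (
    "context"

    "github.com/cockroachdb/cockroach/pkg/kv/kvpb"
    "github.com/cockroachdb/cockroach/pkg/roachpb"
)

// memSink buffers added KVs in memory; hypothetical test double for
// indexBackfillSink.
type memSink struct {
    kvs     []roachpb.KeyValue
    onFlush func(kvpb.BulkOpSummary)
}

var _ indexBackfillSink = (*memSink)(nil)

// Add copies the KV pair; callers may reuse their buffers between calls.
func (s *memSink) Add(_ context.Context, key roachpb.Key, value []byte) error {
    kv := roachpb.KeyValue{Key: append(roachpb.Key(nil), key...)}
    kv.Value.RawBytes = append([]byte(nil), value...)
    s.kvs = append(s.kvs, kv)
    return nil
}

// Flush invokes the registered callback with an empty summary; a real sink
// would aggregate and report genuine ingestion stats here.
func (s *memSink) Flush(context.Context) error {
    if s.onFlush != nil {
        s.onFlush(kvpb.BulkOpSummary{})
    }
    return nil
}

// Close is a no-op; the buffered KVs are simply garbage collected.
func (s *memSink) Close(context.Context) {}

// SetOnFlush mirrors kvserverbase.BulkAdder's flush-callback semantics.
func (s *memSink) SetOnFlush(f func(kvpb.BulkOpSummary)) { s.onFlush = f }

This mirrors the design of the commit's own default sink: bulkAdderIndexBackfillSink simply embeds kvserverbase.BulkAdder, whose method set already satisfies the interface.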
