Commit 12106db

craig[bot] and mw5h committed

Merge #158655
158655: bulkmerge: implement task distribution and loopback coordination r=mw5h a=mw5h

Previously, the bulkmerge processors had only a skeleton implementation. This change implements the core coordination logic:

1. Task Distribution: The merge coordinator now maintains a task set and distributes tasks across worker SQL instances using a loopback channel. As workers complete tasks, they are dynamically assigned new tasks until all tasks are processed.

2. Loopback Communication: Implements a loopback processor that receives task assignments from the coordinator and routes them back through the merge processors. Uses a flow-scoped channel map to enable communication between the coordinator and loopback processors.

3. Result Aggregation: The coordinator collects SST outputs from all processors and aggregates them into a final result proto.

4. Testing: Adds comprehensive tests for both single-node and multi-node scenarios, verifying that all tasks are processed and distributed across available instances.

The implementation uses protobuf marshaling for SST output and includes proper error handling throughout the pipeline.

Epic: None

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-authored-by: Matt White <matt.white@cockroachlabs.com>
2 parents 9913b68 + 49f6580 commit 12106db
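
The dynamic assignment scheme described in the commit message (seed each worker with one task, then hand out another task whenever a completion arrives over the loopback path) can be sketched in plain Go. This is an illustration only, not the processor code in this commit; the assignment and completion structs, the string worker names, and the integer task IDs are hypothetical simplifications.

package main

import "fmt"

// assignment pairs a worker with its next task, mirroring the row the
// coordinator pushes onto the loopback channel.
type assignment struct {
    worker string
    task   int
}

// completion reports which worker finished which task, mirroring the
// (sqlInstanceID, taskID) pair the coordinator parses from each input row.
type completion struct {
    worker string
    task   int
}

func main() {
    workers := []string{"n1", "n2", "n3"}
    const taskCount = 7

    assignments := make(chan assignment, taskCount)
    completions := make(chan completion, taskCount)

    // Seed each worker with one task (cf. publishInitialTasks).
    next, outstanding := 0, 0
    for _, w := range workers {
        if next >= taskCount {
            break
        }
        assignments <- assignment{worker: w, task: next}
        next++
        outstanding++
    }

    // Stand-in for the merge processors: acknowledge every assignment.
    go func() {
        for a := range assignments {
            completions <- completion{worker: a.worker, task: a.task}
        }
    }()

    // Coordinator loop: each completion frees a worker, which immediately
    // receives the next unclaimed task until the set is exhausted
    // (cf. handleRow).
    for outstanding > 0 {
        c := <-completions
        fmt.Printf("task %d finished on %s\n", c.task, c.worker)
        outstanding--
        if next < taskCount {
            assignments <- assignment{worker: c.worker, task: next}
            next++
            outstanding++
        }
    }
    close(assignments) // no more tasks to hand out (cf. closeLoopback)
}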

File tree

6 files changed: +384 -53


pkg/sql/bulkmerge/BUILD.bazel

Lines changed: 4 additions & 0 deletions
@@ -25,6 +25,9 @@ go_library(
         "//pkg/sql/rowexec",
         "//pkg/sql/sem/tree",
         "//pkg/sql/types",
+        "//pkg/util/protoutil",
+        "//pkg/util/syncutil",
+        "//pkg/util/taskset",
         "@com_github_cockroachdb_errors//:errors",
     ],
 )
@@ -55,6 +58,7 @@ go_test(
         "//pkg/testutils/testcluster",
         "//pkg/util/leaktest",
         "//pkg/util/log",
+        "//pkg/util/protoutil",
         "//pkg/util/randutil",
         "@com_github_stretchr_testify//require",
     ],

pkg/sql/bulkmerge/merge_coordinator.go

Lines changed: 135 additions & 10 deletions
@@ -14,6 +14,8 @@ import (
     "github.com/cockroachdb/cockroach/pkg/sql/rowexec"
     "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
     "github.com/cockroachdb/cockroach/pkg/sql/types"
+    "github.com/cockroachdb/cockroach/pkg/util/protoutil"
+    "github.com/cockroachdb/cockroach/pkg/util/taskset"
     "github.com/cockroachdb/errors"
 )

@@ -31,34 +33,152 @@ var mergeCoordinatorOutputTypes = []*types.T{

 type mergeCoordinator struct {
     execinfra.ProcessorBase
+
     input execinfra.RowSource
+    spec  execinfrapb.MergeCoordinatorSpec
+    tasks taskset.TaskSet
+
+    loopback chan rowenc.EncDatumRow
+    cleanup  func()
+
+    done    bool
+    results execinfrapb.BulkMergeSpec_Output
+}
+
+type mergeCoordinatorInput struct {
+    sqlInstanceID string
+    taskID        taskset.TaskID
+    outputSSTs    []execinfrapb.BulkMergeSpec_SST
+}
+
+// parseCoordinatorInput ensures each column has the correct type and unmarshals
+// the output SSTs.
+func parseCoordinatorInput(row rowenc.EncDatumRow) (mergeCoordinatorInput, error) {
+    if len(row) != 3 {
+        return mergeCoordinatorInput{}, errors.Newf("expected 3 columns, got %d", len(row))
+    }
+    if err := row[0].EnsureDecoded(types.Bytes, nil); err != nil {
+        return mergeCoordinatorInput{}, err
+    }
+    sqlInstanceID, ok := row[0].Datum.(*tree.DBytes)
+    if !ok {
+        return mergeCoordinatorInput{},
+            errors.Newf("expected bytes column for sqlInstanceID, got %s", row[0].Datum.String())
+    }
+    if err := row[1].EnsureDecoded(types.Int4, nil); err != nil {
+        return mergeCoordinatorInput{}, err
+    }
+    taskID, ok := row[1].Datum.(*tree.DInt)
+    if !ok {
+        return mergeCoordinatorInput{},
+            errors.Newf("expected int4 column for taskID, got %s", row[1].Datum.String())
+    }
+    if err := row[2].EnsureDecoded(types.Bytes, nil); err != nil {
+        return mergeCoordinatorInput{}, err
+    }
+    outputBytes, ok := row[2].Datum.(*tree.DBytes)
+    if !ok {
+        return mergeCoordinatorInput{},
+            errors.Newf("expected bytes column for outputSSTs, got %s", row[2].Datum.String())
+    }
+    results := execinfrapb.BulkMergeSpec_Output{}
+    if err := protoutil.Unmarshal([]byte(*outputBytes), &results); err != nil {
+        return mergeCoordinatorInput{}, err
+    }
+    return mergeCoordinatorInput{
+        sqlInstanceID: string(*sqlInstanceID),
+        taskID:        taskset.TaskID(*taskID),
+        outputSSTs:    results.SSTs,
+    }, nil
 }

 // Next implements execinfra.RowSource.
 func (m *mergeCoordinator) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
     for m.State == execinfra.StateRunning {
         row, meta := m.input.Next()
         switch {
-        case row == nil && meta == nil:
-            m.MoveToDraining(nil /* err */)
-        case meta != nil && meta.Err != nil:
+        case row != nil:
+            err := m.handleRow(row)
+            if err != nil {
+                m.MoveToDraining(err)
+            }
+        case meta == nil:
+            if m.done {
+                m.MoveToDraining(nil /* err */)
+                break
+            }
+            m.done = true
+            return m.emitResults()
+        case meta.Err != nil:
             m.MoveToDraining(meta.Err)
-        case meta != nil:
+        default:
             m.MoveToDraining(errors.Newf("unexpected meta: %v", meta))
-        case row != nil:
-            base := *row[2].Datum.(*tree.DBytes)
-            return rowenc.EncDatumRow{
-                rowenc.EncDatum{Datum: tree.NewDBytes(base + "->coordinator")},
-            }, nil
         }
     }
     return nil, m.DrainHelper()
 }

+func (m *mergeCoordinator) emitResults() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
+    marshaled, err := protoutil.Marshal(&m.results)
+    if err != nil {
+        m.MoveToDraining(errors.Wrap(err, "failed to marshal results"))
+        return nil, m.DrainHelper()
+    }
+    return rowenc.EncDatumRow{
+        rowenc.EncDatum{Datum: tree.NewDBytes(tree.DBytes(marshaled))},
+    }, nil
+}
+
+func (m *mergeCoordinator) publishInitialTasks() {
+    for _, sqlInstanceID := range m.spec.WorkerSqlInstanceIds {
+        taskID := m.tasks.ClaimFirst()
+        if taskID.IsDone() {
+            m.closeLoopback()
+            return
+        }
+        m.loopback <- rowenc.EncDatumRow{
+            rowenc.EncDatum{Datum: tree.NewDBytes(tree.DBytes(sqlInstanceID))},
+            rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(taskID))},
+        }
+    }
+}
+
+func (m *mergeCoordinator) closeLoopback() {
+    if m.cleanup != nil {
+        m.cleanup()
+        m.cleanup = nil
+    }
+}
+
+// handleRow accepts a row output by the merge processor, marks its task as
+// complete, and assigns the producing instance its next task.
+func (m *mergeCoordinator) handleRow(row rowenc.EncDatumRow) error {
+    input, err := parseCoordinatorInput(row)
+    if err != nil {
+        return err
+    }
+
+    m.results.SSTs = append(m.results.SSTs, input.outputSSTs...)
+
+    next := m.tasks.ClaimNext(input.taskID)
+    if next.IsDone() {
+        m.closeLoopback()
+        return nil
+    }
+
+    m.loopback <- rowenc.EncDatumRow{
+        rowenc.EncDatum{Datum: tree.NewDBytes(tree.DBytes(input.sqlInstanceID))},
+        rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(next))},
+    }
+
+    return nil
+}
+
 // Start implements execinfra.RowSource.
 func (m *mergeCoordinator) Start(ctx context.Context) {
     m.StartInternal(ctx, "mergeCoordinator")
     m.input.Start(ctx)
+    m.publishInitialTasks()
 }

 func init() {
@@ -70,8 +190,13 @@ func init() {
         postSpec *execinfrapb.PostProcessSpec,
         input execinfra.RowSource,
     ) (execinfra.Processor, error) {
+        channel, cleanup := loopback.create(flow)
         mc := &mergeCoordinator{
-            input: input,
+            input:    input,
+            tasks:    taskset.MakeTaskSet(spec.TaskCount, int64(len(spec.WorkerSqlInstanceIds))),
+            loopback: channel,
+            cleanup:  cleanup,
+            spec:     spec,
         }
         err := mc.Init(
             ctx, mc, postSpec, mergeCoordinatorOutputTypes, flow, flowID, nil,
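
Only four taskset operations appear in the coordinator above: MakeTaskSet, ClaimFirst, ClaimNext, and TaskID.IsDone. The snippet below is a hypothetical stand-in with just enough behavior to make that call pattern concrete (hand out each task once, return a done sentinel when the set is exhausted); it is not the actual pkg/util/taskset implementation, whose internals are not shown in this diff.

package main

import "fmt"

// TaskID identifies a task; a negative value serves as the "done" sentinel in
// this sketch. The real pkg/util/taskset encoding may differ.
type TaskID int64

// IsDone reports whether the ID is the sentinel meaning "no tasks remain".
func (t TaskID) IsDone() bool { return t < 0 }

// TaskSet hands out the IDs [0, count) exactly once. The worker count is
// accepted only to mirror the MakeTaskSet call site in the coordinator.
type TaskSet struct {
    count int64
    next  int64
}

func MakeTaskSet(count, workers int64) TaskSet {
    return TaskSet{count: count}
}

// ClaimFirst claims the next unclaimed task, or returns the done sentinel.
func (s *TaskSet) ClaimFirst() TaskID {
    if s.next >= s.count {
        return TaskID(-1)
    }
    id := TaskID(s.next)
    s.next++
    return id
}

// ClaimNext records that prev is finished and claims another task, if any.
// This sketch does not track per-task completion state; the real type
// presumably does.
func (s *TaskSet) ClaimNext(prev TaskID) TaskID {
    return s.ClaimFirst()
}

func main() {
    tasks := MakeTaskSet(3, 2)
    a := tasks.ClaimFirst() // 0: initial task for worker 1
    b := tasks.ClaimFirst() // 1: initial task for worker 2
    c := tasks.ClaimNext(a) // 2: worker 1 finished, gets the last task
    d := tasks.ClaimNext(b) // sentinel: nothing left
    fmt.Println(a, b, c, d.IsDone())
}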

pkg/sql/bulkmerge/merge_loopback.go

Lines changed: 68 additions & 15 deletions
@@ -10,13 +10,48 @@ import (

     "github.com/cockroachdb/cockroach/pkg/sql/execinfra"
     "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
-    "github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
     "github.com/cockroachdb/cockroach/pkg/sql/rowenc"
     "github.com/cockroachdb/cockroach/pkg/sql/rowexec"
-    "github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
     "github.com/cockroachdb/cockroach/pkg/sql/types"
+    "github.com/cockroachdb/cockroach/pkg/util/syncutil"
+    "github.com/cockroachdb/errors"
 )

+// loopbackMap allows the mergeLoopback processor to communicate with the merge
+// coordinator by mapping flow IDs to channels.
+type loopbackMap struct {
+    syncutil.Mutex
+    loopback map[execinfrapb.FlowID]chan rowenc.EncDatumRow
+}
+
+var loopback = &loopbackMap{
+    loopback: make(map[execinfrapb.FlowID]chan rowenc.EncDatumRow),
+}
+
+// get returns the channel for the given id if it exists.
+func (l *loopbackMap) get(flowCtx *execinfra.FlowCtx) (chan rowenc.EncDatumRow, bool) {
+    l.Lock()
+    defer l.Unlock()
+    id := flowCtx.ID
+    channel, ok := l.loopback[id]
+    return channel, ok
+}
+
+// create returns a channel for the given id and a function to close it.
+func (l *loopbackMap) create(flowCtx *execinfra.FlowCtx) (chan rowenc.EncDatumRow, func()) {
+    l.Lock()
+    defer l.Unlock()
+    id := flowCtx.ID
+    ch := make(chan rowenc.EncDatumRow)
+    l.loopback[id] = ch
+    return ch, func() {
+        l.Lock()
+        defer l.Unlock()
+        delete(l.loopback, id)
+        close(ch)
+    }
+}
+
 var (
     _ execinfra.Processor = &mergeLoopback{}
     _ execinfra.RowSource = &mergeLoopback{}
@@ -32,29 +67,47 @@ var mergeLoopbackOutputTypes = []*types.T{

 type mergeLoopback struct {
     execinfra.ProcessorBase
-    done bool
+    loopback chan rowenc.EncDatumRow
 }

 // Next implements execinfra.RowSource.
 func (m *mergeLoopback) Next() (rowenc.EncDatumRow, *execinfrapb.ProducerMetadata) {
-    if m.done {
-        m.MoveToDraining(nil)
-        return nil, m.DrainHelper()
+    // Read from the loopback channel until it's closed.
+    if m.State == execinfra.StateRunning {
+        row, ok := <-m.loopback
+        if !ok {
+            m.MoveToDraining(nil)
+            return nil, m.DrainHelper()
+        }
+        return row, nil
     }
-    m.done = true
-    // Generate a routing key for the current SQL instance (where this processor is running).
-    // This ensures the routing key matches one of the spans in the range router.
-    routingDatum, _ := physicalplan.RoutingDatumsForSQLInstance(m.FlowCtx.NodeID.SQLInstanceID())
-    return rowenc.EncDatumRow{
-        routingDatum,
-        rowenc.EncDatum{Datum: tree.NewDInt(1)},
-    }, nil
+    return nil, m.DrainHelper()
 }

 // Start implements execinfra.RowSource.
 func (m *mergeLoopback) Start(ctx context.Context) {
     m.StartInternal(ctx, "mergeLoopback")
-    // TODO(jeffswenson): create the initial set of tasks
+    var ok bool
+    m.loopback, ok = loopback.get(m.FlowCtx)
+    if !ok {
+        m.MoveToDraining(errors.New("loopback channel not found"))
+        return
+    }
+}
+
+func (m *mergeLoopback) DrainHelper() *execinfrapb.ProducerMetadata {
+    // First drain any inputs coming back from the coordinator.
+    for again := true; again; {
+        select {
+        case _, ok := <-m.loopback:
+            if ok {
+                continue
+            }
+        default:
+        }
+        again = false
+    }
+    return m.ProcessorBase.DrainHelper()
 }

 func init() {
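
The flow-scoped channel map added above is a small rendezvous pattern in its own right: a process-wide registry keyed by flow ID lets two processors that are constructed independently within the same flow share a channel. A minimal, generic sketch of that pattern follows, using string keys and string payloads instead of execinfrapb.FlowID and rowenc.EncDatumRow; the names here are illustrative, not CockroachDB APIs.

package main

import (
    "fmt"
    "sync"
)

// registry maps an ID to a channel so that a sender and a receiver that are
// constructed independently can rendezvous on the same channel.
type registry struct {
    mu sync.Mutex
    m  map[string]chan string
}

func newRegistry() *registry {
    return &registry{m: make(map[string]chan string)}
}

// create registers a channel under id and returns it together with a cleanup
// function that unregisters and closes it (cf. loopbackMap.create).
func (r *registry) create(id string) (chan string, func()) {
    r.mu.Lock()
    defer r.mu.Unlock()
    ch := make(chan string)
    r.m[id] = ch
    return ch, func() {
        r.mu.Lock()
        defer r.mu.Unlock()
        delete(r.m, id)
        close(ch)
    }
}

// get looks up the channel registered under id, if any (cf. loopbackMap.get).
func (r *registry) get(id string) (chan string, bool) {
    r.mu.Lock()
    defer r.mu.Unlock()
    ch, ok := r.m[id]
    return ch, ok
}

func main() {
    reg := newRegistry()

    // "Coordinator" side: register the shared channel for this flow.
    ch, cleanup := reg.create("flow-1")

    done := make(chan struct{})
    go func() {
        defer close(done)
        // "Loopback" side: look up the same channel and read until closed.
        in, ok := reg.get("flow-1")
        if !ok {
            return
        }
        for msg := range in {
            fmt.Println("received:", msg)
        }
    }()

    ch <- "task 0"
    ch <- "task 1"
    cleanup() // unregister and close; the reader's range loop ends
    <-done
}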

pkg/sql/bulkmerge/merge_planning.go

Lines changed: 8 additions & 2 deletions
@@ -16,7 +16,7 @@ import (
 )

 func newBulkMergePlan(
-    ctx context.Context, execCtx sql.JobExecContext,
+    ctx context.Context, execCtx sql.JobExecContext, taskCount int,
 ) (*sql.PhysicalPlan, *sql.PlanningCtx, error) {
     // NOTE: This implementation is inspired by the physical plan created by
     // restore in `pkg/backup/restore_processor_planning.go`
@@ -32,6 +32,11 @@ func newBulkMergePlan(
     // Use the gateway node as the coordinator, which is where the job was initiated.
     coordinatorID := plan.GatewaySQLInstanceID

+    keys := make([][]byte, 0, len(sqlInstanceIDs))
+    for _, id := range sqlInstanceIDs {
+        keys = append(keys, physicalplan.RoutingKeyForSQLInstance(id))
+    }
+
     router, err := physicalplan.MakeInstanceRouter(sqlInstanceIDs)
     if err != nil {
         return nil, nil, errors.Wrap(err, "unable to make instance router")
@@ -85,7 +90,8 @@ func newBulkMergePlan(

     plan.AddSingleGroupStage(ctx, coordinatorID, execinfrapb.ProcessorCoreUnion{
         MergeCoordinator: &execinfrapb.MergeCoordinatorSpec{
-            // TODO fill in the rest of the spec
+            TaskCount:            int64(taskCount),
+            WorkerSqlInstanceIds: keys,
         },
     }, execinfrapb.PostProcessSpec{}, mergeCoordinatorOutputTypes, nil /* finalizeLastStageCb */)