diff --git a/example/runway/server/main.go b/example/runway/server/main.go index 85fef699..d694f705 100644 --- a/example/runway/server/main.go +++ b/example/runway/server/main.go @@ -171,6 +171,8 @@ func run() error { mergeController := merge.NewController(merge.Params{ Logger: logger.Sugar(), Scope: scope, + MergerFactory: mergerFactory, + Registry: registry, TopicKey: runwaymq.TopicKeyMerge, ConsumerGroup: "runway-merge", }) @@ -279,5 +281,10 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe subscriberName, "runway-merge", ), }, + { + Key: runwaymq.TopicKeyMergeSignal, + Name: "merge-signal", + Queue: q, + }, }) } diff --git a/runway/controller/merge/BUILD.bazel b/runway/controller/merge/BUILD.bazel index 3053e8f7..83e0229c 100644 --- a/runway/controller/merge/BUILD.bazel +++ b/runway/controller/merge/BUILD.bazel @@ -7,8 +7,11 @@ go_library( visibility = ["//visibility:public"], deps = [ "//api/runway/messagequeue", + "//api/runway/messagequeue/protopb", + "//platform/base/messagequeue", "//platform/consumer", "//platform/metrics", + "//runway/extension/merger", "@com_github_uber_go_tally//:tally", "@org_uber_go_zap//:zap", ], @@ -20,8 +23,12 @@ go_test( embed = [":merge"], deps = [ "//api/runway/messagequeue", + "//api/runway/messagequeue/protopb", "//platform/base/messagequeue", + "//platform/consumer", "//platform/extension/messagequeue/mock", + "//runway/extension/merger", + "//runway/extension/merger/mock", "@com_github_stretchr_testify//assert", "@com_github_stretchr_testify//require", "@com_github_uber_go_tally//:tally", diff --git a/runway/controller/merge/merge.go b/runway/controller/merge/merge.go index d3f889ae..dfba9801 100644 --- a/runway/controller/merge/merge.go +++ b/runway/controller/merge/merge.go @@ -13,23 +13,27 @@ // limitations under the License. // Package merge consumes committing merge requests from Runway's merge queue. A -// request asks Runway to apply an ordered sequence of merge steps onto the target -// branch and commit the result. +// request asks Runway to apply an ordered sequence of merge steps onto the merge +// target and commit the result. // -// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue -// and logs it. The real merge (apply and commit the steps, then publish a -// MergeResult with the produced revisions to the merge-signal queue) is not wired -// yet. +// The controller obtains a Merger for the request's merge target, calls Merge, +// and publishes the MergeResult (with the produced revisions) to the merge-signal +// queue. A merge conflict is an expected outcome (ack + publish FAILED result), +// not an infrastructure error. package merge import ( "context" + "errors" "fmt" "github.com/uber-go/tally" runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb" + entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" "github.com/uber/submitqueue/platform/consumer" "github.com/uber/submitqueue/platform/metrics" + "github.com/uber/submitqueue/runway/extension/merger" "go.uber.org/zap" ) @@ -40,6 +44,8 @@ var _ consumer.Controller = (*Controller)(nil) type Controller struct { logger *zap.SugaredLogger metricsScope tally.Scope + mergerFactory merger.Factory + registry consumer.TopicRegistry topicKey consumer.TopicKey consumerGroup string } @@ -49,6 +55,9 @@ type Params struct { TopicKey consumer.TopicKey ConsumerGroup string + MergerFactory merger.Factory + Registry consumer.TopicRegistry + Scope tally.Scope Logger *zap.SugaredLogger } @@ -58,13 +67,15 @@ func NewController(p Params) *Controller { return &Controller{ logger: p.Logger.Named("merge_controller"), metricsScope: p.Scope.SubScope("merge_controller"), + mergerFactory: p.MergerFactory, + registry: p.Registry, topicKey: p.TopicKey, consumerGroup: p.ConsumerGroup, } } -// Process deserializes the merge request and logs it. Returns nil to ack, or an -// error to nack. +// Process deserializes the merge request, performs the committing merge, and +// publishes the result. Returns nil to ack, or an error to nack. func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) { const opName = "process" @@ -76,13 +87,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r request := &runwaymq.MergeRequest{} if err := runwaymq.Unmarshal(msg.Payload, request); err != nil { metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1) - // Non-retryable: a malformed payload will never deserialize on retry. return fmt.Errorf("failed to deserialize merge request: %w", err) } - // TODO: apply and commit the ordered merge steps and publish a MergeResult - // with the produced revisions to the merge-signal queue. For now the request - // is only logged after parsing. c.logger.Infow("received merge request", "id", request.Id, "queue_name", request.QueueName, @@ -91,6 +98,61 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r "partition_key", msg.PartitionKey, ) + m, err := c.mergerFactory.For(merger.Config{QueueName: request.GetQueueName()}) + if err != nil { + metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1) + return fmt.Errorf("failed to create merger for queue %s: %w", request.GetQueueName(), err) + } + + result, err := m.Merge(ctx, request) + if err != nil { + if !errors.Is(err, merger.ErrConflict) { + metrics.NamedCounter(c.metricsScope, opName, "merge_errors", 1) + return fmt.Errorf("failed to merge for %s: %w", request.GetId(), err) + } + metrics.NamedCounter(c.metricsScope, opName, "merge_conflicts", 1) + c.logger.Infow("merge conflict detected", + "id", request.GetId(), + "queue_name", request.GetQueueName(), + ) + result = &runwaymq.MergeResult{ + Id: request.GetId(), + Outcome: runwaypb.Outcome_FAILED, + Reason: err.Error(), + } + } + + if err := c.publish(ctx, runwaymq.TopicKeyMergeSignal, result, msg.PartitionKey); err != nil { + metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1) + return fmt.Errorf("failed to publish merge result for %s: %w", request.GetId(), err) + } + + return nil +} + +// publish serializes a MergeResult and publishes it to the given signal topic. +func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, result *runwaymq.MergeResult, partitionKey string) error { + payload, err := runwaymq.Marshal(result) + if err != nil { + return fmt.Errorf("failed to serialize merge result: %w", err) + } + + msg := entityqueue.NewMessage(result.GetId(), payload, partitionKey, nil) + + q, ok := c.registry.Queue(key) + if !ok { + return fmt.Errorf("no queue registered for topic key %s", key) + } + + topicName, ok := c.registry.TopicName(key) + if !ok { + return fmt.Errorf("no topic name registered for topic key %s", key) + } + + if err := q.Publisher().Publish(ctx, topicName, msg); err != nil { + return fmt.Errorf("failed to publish message: %w", err) + } + return nil } diff --git a/runway/controller/merge/merge_test.go b/runway/controller/merge/merge_test.go index c74135eb..0b6ce6e6 100644 --- a/runway/controller/merge/merge_test.go +++ b/runway/controller/merge/merge_test.go @@ -16,14 +16,19 @@ package merge import ( "context" + "fmt" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/uber-go/tally" runwaymq "github.com/uber/submitqueue/api/runway/messagequeue" + runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb" entityqueue "github.com/uber/submitqueue/platform/base/messagequeue" + "github.com/uber/submitqueue/platform/consumer" queuemock "github.com/uber/submitqueue/platform/extension/messagequeue/mock" + "github.com/uber/submitqueue/runway/extension/merger" + mergermock "github.com/uber/submitqueue/runway/extension/merger/mock" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) @@ -34,16 +39,6 @@ const ( testPartitionKey = "test-queue" ) -func newController(t *testing.T) *Controller { - t.Helper() - return NewController(Params{ - Logger: zaptest.NewLogger(t).Sugar(), - Scope: tally.NoopScope, - TopicKey: runwaymq.TopicKeyMerge, - ConsumerGroup: "runway-merge", - }) -} - func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemock.MockDelivery { t.Helper() msg := entityqueue.NewMessage(testID, payload, testPartitionKey, nil) @@ -53,26 +48,140 @@ func newDelivery(t *testing.T, ctrl *gomock.Controller, payload []byte) *queuemo return d } -func requestPayload(t *testing.T, req runwaymq.MergeRequest) []byte { +func requestPayload(t *testing.T, req *runwaymq.MergeRequest) []byte { t.Helper() - payload, err := runwaymq.Marshal(&req) + payload, err := runwaymq.Marshal(req) require.NoError(t, err) return payload } +func newRegistry(t *testing.T, ctrl *gomock.Controller, publishErr error) (consumer.TopicRegistry, *queuemock.MockPublisher) { + t.Helper() + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, _ string, _ entityqueue.Message) error { + return publishErr + }, + ).AnyTimes() + + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: runwaymq.TopicKeyMergeSignal, Name: "merge-signal", Queue: q}, + }) + require.NoError(t, err) + return registry, pub +} + +func newController(t *testing.T, factory merger.Factory, registry consumer.TopicRegistry) *Controller { + t.Helper() + return NewController(Params{ + Logger: zaptest.NewLogger(t).Sugar(), + Scope: tally.NoopScope, + MergerFactory: factory, + Registry: registry, + TopicKey: runwaymq.TopicKeyMerge, + ConsumerGroup: "runway-merge", + }) +} + func TestNewController(t *testing.T) { - controller := newController(t) + ctrl := gomock.NewController(t) + factory := mergermock.NewMockFactory(ctrl) + registry, _ := newRegistry(t, ctrl, nil) + + controller := newController(t, factory, registry) require.NotNil(t, controller) assert.Equal(t, runwaymq.TopicKeyMerge, controller.TopicKey()) assert.Equal(t, "runway-merge", controller.ConsumerGroup()) assert.Equal(t, "merge", controller.Name()) } -func TestProcess_LogsParsedRequest(t *testing.T) { +func TestProcess_Success(t *testing.T) { + ctrl := gomock.NewController(t) + + expectedResult := &runwaymq.MergeResult{ + Id: testID, + Outcome: runwaypb.Outcome_SUCCEEDED, + Steps: []*runwaymq.StepResult{ + {StepId: "step-1", Outputs: []*runwaymq.StepOutput{{Id: "abc123"}}}, + }, + } + + m := mergermock.NewMockMerger(ctrl) + m.EXPECT().Merge(gomock.Any(), gomock.Any()).Return(expectedResult, nil) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(m, nil) + + var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: runwaymq.TopicKeyMergeSignal, Name: "merge-signal", Queue: q}, + }) + require.NoError(t, err) + + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + require.NoError(t, controller.Process(context.Background(), delivery)) + + assert.Equal(t, "merge-signal", gotTopic) + result := &runwaymq.MergeResult{} + require.NoError(t, runwaymq.Unmarshal(gotPayload, result)) + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_SUCCEEDED, result.Outcome) + require.Len(t, result.Steps, 1) + require.Len(t, result.Steps[0].Outputs, 1) + assert.Equal(t, "abc123", result.Steps[0].Outputs[0].Id) +} + +func TestProcess_MergeConflict(t *testing.T) { ctrl := gomock.NewController(t) - controller := newController(t) - req := runwaymq.MergeRequest{ + m := mergermock.NewMockMerger(ctrl) + m.EXPECT().Merge(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("conflict in foo.go: %w", merger.ErrConflict)) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(m, nil) + + var gotTopic string + var gotPayload []byte + pub := queuemock.NewMockPublisher(ctrl) + pub.EXPECT().Publish(gomock.Any(), gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, topic string, msg entityqueue.Message) error { + gotTopic = topic + gotPayload = msg.Payload + return nil + }, + ) + q := queuemock.NewMockQueue(ctrl) + q.EXPECT().Publisher().Return(pub).AnyTimes() + registry, err := consumer.NewTopicRegistry([]consumer.TopicConfig{ + {Key: runwaymq.TopicKeyMergeSignal, Name: "merge-signal", Queue: q}, + }) + require.NoError(t, err) + + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ Id: testID, QueueName: testQueue, Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, @@ -80,11 +189,91 @@ func TestProcess_LogsParsedRequest(t *testing.T) { delivery := newDelivery(t, ctrl, requestPayload(t, req)) require.NoError(t, controller.Process(context.Background(), delivery)) + + assert.Equal(t, "merge-signal", gotTopic) + result := &runwaymq.MergeResult{} + require.NoError(t, runwaymq.Unmarshal(gotPayload, result)) + assert.Equal(t, testID, result.Id) + assert.Equal(t, runwaypb.Outcome_FAILED, result.Outcome) + assert.NotEmpty(t, result.Reason) +} + +func TestProcess_MergerInfraError(t *testing.T) { + ctrl := gomock.NewController(t) + + m := mergermock.NewMockMerger(ctrl) + m.EXPECT().Merge(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("git timeout")) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(m, nil) + + registry, _ := newRegistry(t, ctrl, nil) + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + err := controller.Process(context.Background(), delivery) + require.Error(t, err) +} + +func TestProcess_FactoryError(t *testing.T) { + ctrl := gomock.NewController(t) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(nil, fmt.Errorf("unknown queue")) + + registry, _ := newRegistry(t, ctrl, nil) + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + err := controller.Process(context.Background(), delivery) + require.Error(t, err) +} + +func TestProcess_PublishError(t *testing.T) { + ctrl := gomock.NewController(t) + + expectedResult := &runwaymq.MergeResult{ + Id: testID, + Outcome: runwaypb.Outcome_SUCCEEDED, + } + + m := mergermock.NewMockMerger(ctrl) + m.EXPECT().Merge(gomock.Any(), gomock.Any()).Return(expectedResult, nil) + + factory := mergermock.NewMockFactory(ctrl) + factory.EXPECT().For(merger.Config{QueueName: testQueue}).Return(m, nil) + + registry, _ := newRegistry(t, ctrl, fmt.Errorf("publish failed")) + controller := newController(t, factory, registry) + + req := &runwaymq.MergeRequest{ + Id: testID, + QueueName: testQueue, + Steps: []*runwaymq.MergeStep{{StepId: "step-1"}}, + } + delivery := newDelivery(t, ctrl, requestPayload(t, req)) + + err := controller.Process(context.Background(), delivery) + require.Error(t, err) } func TestProcess_DeserializeError(t *testing.T) { ctrl := gomock.NewController(t) - controller := newController(t) + factory := mergermock.NewMockFactory(ctrl) + registry, _ := newRegistry(t, ctrl, nil) + controller := newController(t, factory, registry) delivery := newDelivery(t, ctrl, []byte(`{"id": not json}`))