1313// limitations under the License.
1414
1515// Package merge consumes committing merge requests from Runway's merge queue. A
16- // request asks Runway to apply an ordered sequence of merge steps onto the target
17- // branch and commit the result.
16+ // request asks Runway to apply an ordered sequence of merge steps onto the merge
17+ // target and commit the result.
1818//
19- // Currently a parse-and-log stub: it deserializes the MergeRequest off the queue
20- // and logs it. The real merge (apply and commit the steps, then publish a
21- // MergeResult with the produced revisions to the merge-signal queue) is not wired
22- // yet .
19+ // The controller obtains a Merger for the request's merge target, calls Merge,
20+ // and publishes the MergeResult (with the produced revisions) to the merge-signal
21+ // queue. A merge conflict is an expected outcome (ack + publish FAILED result),
22+ // not an infrastructure error .
2323package merge
2424
2525import (
2626 "context"
27+ "errors"
2728 "fmt"
2829
2930 "github.com/uber-go/tally"
3031 runwaymq "github.com/uber/submitqueue/api/runway/messagequeue"
32+ runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb"
33+ entityqueue "github.com/uber/submitqueue/platform/base/messagequeue"
3134 "github.com/uber/submitqueue/platform/consumer"
3235 "github.com/uber/submitqueue/platform/metrics"
36+ "github.com/uber/submitqueue/runway/extension/merger"
3337 "go.uber.org/zap"
3438)
3539
@@ -40,6 +44,8 @@ var _ consumer.Controller = (*Controller)(nil)
4044type Controller struct {
4145 logger * zap.SugaredLogger
4246 metricsScope tally.Scope
47+ mergerFactory merger.Factory
48+ registry consumer.TopicRegistry
4349 topicKey consumer.TopicKey
4450 consumerGroup string
4551}
@@ -49,6 +55,9 @@ type Params struct {
4955 TopicKey consumer.TopicKey
5056 ConsumerGroup string
5157
58+ MergerFactory merger.Factory
59+ Registry consumer.TopicRegistry
60+
5261 Scope tally.Scope
5362 Logger * zap.SugaredLogger
5463}
@@ -58,13 +67,15 @@ func NewController(p Params) *Controller {
5867 return & Controller {
5968 logger : p .Logger .Named ("merge_controller" ),
6069 metricsScope : p .Scope .SubScope ("merge_controller" ),
70+ mergerFactory : p .MergerFactory ,
71+ registry : p .Registry ,
6172 topicKey : p .TopicKey ,
6273 consumerGroup : p .ConsumerGroup ,
6374 }
6475}
6576
66- // Process deserializes the merge request and logs it. Returns nil to ack, or an
67- // error to nack.
77+ // Process deserializes the merge request, performs the committing merge, and
78+ // publishes the result. Returns nil to ack, or an error to nack.
6879func (c * Controller ) Process (ctx context.Context , delivery consumer.Delivery ) (retErr error ) {
6980 const opName = "process"
7081
@@ -76,13 +87,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
7687 request := & runwaymq.MergeRequest {}
7788 if err := runwaymq .Unmarshal (msg .Payload , request ); err != nil {
7889 metrics .NamedCounter (c .metricsScope , opName , "deserialize_errors" , 1 )
79- // Non-retryable: a malformed payload will never deserialize on retry.
8090 return fmt .Errorf ("failed to deserialize merge request: %w" , err )
8191 }
8292
83- // TODO: apply and commit the ordered merge steps and publish a MergeResult
84- // with the produced revisions to the merge-signal queue. For now the request
85- // is only logged after parsing.
8693 c .logger .Infow ("received merge request" ,
8794 "id" , request .Id ,
8895 "queue_name" , request .QueueName ,
@@ -91,6 +98,60 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
9198 "partition_key" , msg .PartitionKey ,
9299 )
93100
101+ m , err := c .mergerFactory .For (merger.Config {QueueName : request .GetQueueName ()})
102+ if err != nil {
103+ metrics .NamedCounter (c .metricsScope , opName , "factory_errors" , 1 )
104+ return fmt .Errorf ("failed to create merger for queue %s: %w" , request .GetQueueName (), err )
105+ }
106+
107+ result , err := m .Merge (ctx , request )
108+ if err != nil && ! errors .Is (err , merger .ErrConflict ) {
109+ metrics .NamedCounter (c .metricsScope , opName , "merge_errors" , 1 )
110+ return fmt .Errorf ("failed to merge for %s: %w" , request .GetId (), err )
111+ } else if err != nil {
112+ metrics .NamedCounter (c .metricsScope , opName , "merge_conflicts" , 1 )
113+ c .logger .Infow ("merge conflict detected" ,
114+ "id" , request .GetId (),
115+ "queue_name" , request .GetQueueName (),
116+ )
117+ result = & runwaymq.MergeResult {
118+ Id : request .GetId (),
119+ Outcome : runwaypb .Outcome_FAILED ,
120+ Reason : err .Error (),
121+ }
122+ }
123+
124+ if err := c .publish (ctx , runwaymq .TopicKeyMergeSignal , result , msg .PartitionKey ); err != nil {
125+ metrics .NamedCounter (c .metricsScope , opName , "publish_errors" , 1 )
126+ return fmt .Errorf ("failed to publish merge result for %s: %w" , request .GetId (), err )
127+ }
128+
129+ return nil
130+ }
131+
132+ // publish serializes a MergeResult and publishes it to the given signal topic.
133+ func (c * Controller ) publish (ctx context.Context , key consumer.TopicKey , result * runwaymq.MergeResult , partitionKey string ) error {
134+ payload , err := runwaymq .Marshal (result )
135+ if err != nil {
136+ return fmt .Errorf ("failed to serialize merge result: %w" , err )
137+ }
138+
139+ msg := entityqueue .NewMessage (result .GetId (), payload , partitionKey , nil )
140+
141+ q , ok := c .registry .Queue (key )
142+ if ! ok {
143+ return fmt .Errorf ("no queue registered for topic key %s" , key )
144+ }
145+
146+ topicName , ok := c .registry .TopicName (key )
147+ if ! ok {
148+ return fmt .Errorf ("no topic name registered for topic key %s" , key )
149+ }
150+
151+ if err := q .Publisher ().Publish (ctx , topicName , msg ); err != nil {
152+ return fmt .Errorf ("failed to publish message: %w" , err )
153+ }
154+
94155 return nil
95156}
96157
0 commit comments