Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions example/runway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ func run() error {
mergeController := merge.NewController(merge.Params{
Logger: logger.Sugar(),
Scope: scope,
MergerFactory: mergerFactory,
Registry: registry,
TopicKey: runwaymq.TopicKeyMerge,
ConsumerGroup: "runway-merge",
})
Expand Down Expand Up @@ -279,5 +281,10 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
subscriberName, "runway-merge",
),
},
{
Key: runwaymq.TopicKeyMergeSignal,
Name: "merge-signal",
Queue: q,
},
})
}
7 changes: 7 additions & 0 deletions runway/controller/merge/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api/runway/messagequeue",
"//api/runway/messagequeue/protopb",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/metrics",
"//runway/extension/merger",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
Expand All @@ -20,8 +23,12 @@ go_test(
embed = [":merge"],
deps = [
"//api/runway/messagequeue",
"//api/runway/messagequeue/protopb",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/extension/messagequeue/mock",
"//runway/extension/merger",
"//runway/extension/merger/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
Expand Down
86 changes: 74 additions & 12 deletions runway/controller/merge/merge.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,23 +13,27 @@
// limitations under the License.

// Package merge consumes committing merge requests from Runway's merge queue. A
// request asks Runway to apply an ordered sequence of merge steps onto the target
// branch and commit the result.
// request asks Runway to apply an ordered sequence of merge steps onto the merge
// target and commit the result.
//
// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue
// and logs it. The real merge (apply and commit the steps, then publish a
// MergeResult with the produced revisions to the merge-signal queue) is not wired
// yet.
// The controller obtains a Merger for the request's merge target, calls Merge,
// and publishes the MergeResult (with the produced revisions) to the merge-signal
// queue. A merge conflict is an expected outcome (ack + publish FAILED result),
// not an infrastructure error.
package merge

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally"
runwaymq "github.com/uber/submitqueue/api/runway/messagequeue"
runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb"
entityqueue "github.com/uber/submitqueue/platform/base/messagequeue"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/metrics"
"github.com/uber/submitqueue/runway/extension/merger"
"go.uber.org/zap"
)

Expand All @@ -40,6 +44,8 @@ var _ consumer.Controller = (*Controller)(nil)
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
mergerFactory merger.Factory
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
}
Expand All @@ -49,6 +55,9 @@ type Params struct {
TopicKey consumer.TopicKey
ConsumerGroup string

MergerFactory merger.Factory
Registry consumer.TopicRegistry

Scope tally.Scope
Logger *zap.SugaredLogger
}
Expand All @@ -58,13 +67,15 @@ func NewController(p Params) *Controller {
return &Controller{
logger: p.Logger.Named("merge_controller"),
metricsScope: p.Scope.SubScope("merge_controller"),
mergerFactory: p.MergerFactory,
registry: p.Registry,
topicKey: p.TopicKey,
consumerGroup: p.ConsumerGroup,
}
}

// Process deserializes the merge request and logs it. Returns nil to ack, or an
// error to nack.
// Process deserializes the merge request, performs the committing merge, and
// publishes the result. Returns nil to ack, or an error to nack.
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
const opName = "process"

Expand All @@ -76,13 +87,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
request := &runwaymq.MergeRequest{}
if err := runwaymq.Unmarshal(msg.Payload, request); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
// Non-retryable: a malformed payload will never deserialize on retry.
return fmt.Errorf("failed to deserialize merge request: %w", err)
}

// TODO: apply and commit the ordered merge steps and publish a MergeResult
// with the produced revisions to the merge-signal queue. For now the request
// is only logged after parsing.
c.logger.Infow("received merge request",
"id", request.Id,
"queue_name", request.QueueName,
Expand All @@ -91,6 +98,61 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
"partition_key", msg.PartitionKey,
)

m, err := c.mergerFactory.For(merger.Config{QueueName: request.GetQueueName()})
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1)
return fmt.Errorf("failed to create merger for queue %s: %w", request.GetQueueName(), err)
}

result, err := m.Merge(ctx, request)
if err != nil {
if !errors.Is(err, merger.ErrConflict) {
metrics.NamedCounter(c.metricsScope, opName, "merge_errors", 1)
return fmt.Errorf("failed to merge for %s: %w", request.GetId(), err)
}
metrics.NamedCounter(c.metricsScope, opName, "merge_conflicts", 1)
c.logger.Infow("merge conflict detected",
"id", request.GetId(),
"queue_name", request.GetQueueName(),
)
result = &runwaymq.MergeResult{
Id: request.GetId(),
Outcome: runwaypb.Outcome_FAILED,
Reason: err.Error(),
}
}

if err := c.publish(ctx, runwaymq.TopicKeyMergeSignal, result, msg.PartitionKey); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
return fmt.Errorf("failed to publish merge result for %s: %w", request.GetId(), err)
}

return nil
}

// publish serializes a MergeResult and publishes it to the given signal topic.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, result *runwaymq.MergeResult, partitionKey string) error {
payload, err := runwaymq.Marshal(result)
if err != nil {
return fmt.Errorf("failed to serialize merge result: %w", err)
}

msg := entityqueue.NewMessage(result.GetId(), payload, partitionKey, nil)

q, ok := c.registry.Queue(key)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", key)
}

topicName, ok := c.registry.TopicName(key)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", key)
}

if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}

Expand Down
Loading
Loading