Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions example/runway/server/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ go_library(
"//runway/controller",
"//runway/controller/merge",
"//runway/controller/mergeconflictcheck",
"//runway/extension/merger",
"//runway/extension/merger/noop",
"@com_github_go_sql_driver_mysql//:mysql",
"@com_github_uber_go_tally//:tally",
"@org_golang_google_grpc//:grpc",
Expand Down
30 changes: 26 additions & 4 deletions example/runway/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ import (
"github.com/uber/submitqueue/runway/controller"
"github.com/uber/submitqueue/runway/controller/merge"
"github.com/uber/submitqueue/runway/controller/mergeconflictcheck"
"github.com/uber/submitqueue/runway/extension/merger"
"github.com/uber/submitqueue/runway/extension/merger/noop"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
Expand Down Expand Up @@ -152,9 +154,13 @@ func run() error {
),
)

mergerFactory := newMergerFactory()

mergeConflictCheckController := mergeconflictcheck.NewController(mergeconflictcheck.Params{
Logger: logger.Sugar(),
Scope: scope,
MergerFactory: mergerFactory,
Registry: registry,
TopicKey: runwaymq.TopicKeyMergeConflictCheck,
ConsumerGroup: "runway-mergeconflictcheck",
})
Expand Down Expand Up @@ -235,10 +241,21 @@ func run() error {
return err
}

// newTopicRegistry builds the TopicRegistry for Runway's consumed merge queues.
// Runway is the consumer of the merge-conflict-check and merge queues; each is
// registered with a consuming subscription. The corresponding signal queues
// (where results are published) are not wired yet.
// newMergerFactory returns a merger.Factory for the example server. The noop
// implementation always succeeds; a real deployment wires a VCS-backed factory.
func newMergerFactory() merger.Factory {
return &noopMergerFactory{}
}

type noopMergerFactory struct{}

func (f *noopMergerFactory) For(_ merger.Config) (merger.Merger, error) {
return noop.New(), nil
}

// newTopicRegistry builds the TopicRegistry for Runway's merge queues. Inbound
// topics (merge-conflict-check, merge) have subscriptions; outbound signal topics
// are publish-only.
func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRegistry, error) {
return consumer.NewTopicRegistry([]consumer.TopicConfig{
{
Expand All @@ -249,6 +266,11 @@ func newTopicRegistry(q extqueue.Queue, subscriberName string) (consumer.TopicRe
subscriberName, "runway-mergeconflictcheck",
),
},
{
Key: runwaymq.TopicKeyMergeConflictCheckSignal,
Name: "merge-conflict-check-signal",
Queue: q,
},
{
Key: runwaymq.TopicKeyMerge,
Name: "merge",
Expand Down
7 changes: 7 additions & 0 deletions runway/controller/mergeconflictcheck/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//api/runway/messagequeue",
"//api/runway/messagequeue/protopb",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/metrics",
"//runway/extension/merger",
"@com_github_uber_go_tally//:tally",
"@org_uber_go_zap//:zap",
],
Expand All @@ -20,8 +23,12 @@ go_test(
embed = [":mergeconflictcheck"],
deps = [
"//api/runway/messagequeue",
"//api/runway/messagequeue/protopb",
"//platform/base/messagequeue",
"//platform/consumer",
"//platform/extension/messagequeue/mock",
"//runway/extension/merger",
"//runway/extension/merger/mock",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_github_uber_go_tally//:tally",
Expand Down
83 changes: 73 additions & 10 deletions runway/controller/mergeconflictcheck/mergeconflictcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,26 @@

// Package mergeconflictcheck consumes dry-run merge-conflict check requests from
// Runway's merge-conflict-check queue. A request asks whether an ordered sequence
// of merge steps applies cleanly onto the target branch without committing.
// of merge steps can be applied cleanly onto the merge target.
//
// Currently a parse-and-log stub: it deserializes the MergeRequest off the queue
// and logs it. The real check (attempt the merge without committing and publish a
// MergeResult to the merge-conflict-check-signal queue) is not wired yet.
// The controller obtains a Merger for the request's merge target, calls
// CheckMergeability, and publishes the MergeResult to the
// merge-conflict-check-signal queue. A merge conflict is an expected outcome
// (ack + publish FAILED result), not an infrastructure error.
package mergeconflictcheck

import (
"context"
"errors"
"fmt"

"github.com/uber-go/tally"
runwaymq "github.com/uber/submitqueue/api/runway/messagequeue"
runwaypb "github.com/uber/submitqueue/api/runway/messagequeue/protopb"
entityqueue "github.com/uber/submitqueue/platform/base/messagequeue"
"github.com/uber/submitqueue/platform/consumer"
"github.com/uber/submitqueue/platform/metrics"
"github.com/uber/submitqueue/runway/extension/merger"
"go.uber.org/zap"
)

Expand All @@ -39,6 +44,8 @@ var _ consumer.Controller = (*Controller)(nil)
type Controller struct {
logger *zap.SugaredLogger
metricsScope tally.Scope
mergerFactory merger.Factory
registry consumer.TopicRegistry
topicKey consumer.TopicKey
consumerGroup string
}
Expand All @@ -48,6 +55,9 @@ type Params struct {
TopicKey consumer.TopicKey
ConsumerGroup string

MergerFactory merger.Factory
Registry consumer.TopicRegistry

Scope tally.Scope
Logger *zap.SugaredLogger
}
Expand All @@ -57,13 +67,15 @@ func NewController(p Params) *Controller {
return &Controller{
logger: p.Logger.Named("mergeconflictcheck_controller"),
metricsScope: p.Scope.SubScope("mergeconflictcheck_controller"),
mergerFactory: p.MergerFactory,
registry: p.Registry,
topicKey: p.TopicKey,
consumerGroup: p.ConsumerGroup,
}
}

// Process deserializes the merge request and logs it. Returns nil to ack, or an
// error to nack.
// Process deserializes the merge request, performs a dry-run merge check, and
// publishes the result. Returns nil to ack, or an error to nack.
func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (retErr error) {
const opName = "process"

Expand All @@ -75,13 +87,9 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
request := &runwaymq.MergeRequest{}
if err := runwaymq.Unmarshal(msg.Payload, request); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "deserialize_errors", 1)
// Non-retryable: a malformed payload will never deserialize on retry.
return fmt.Errorf("failed to deserialize merge request: %w", err)
}

// TODO: attempt the ordered merge steps without committing and publish a
// MergeResult to the merge-conflict-check-signal queue. For now the request
// is only logged after parsing.
c.logger.Infow("received merge-conflict-check request",
"id", request.Id,
"queue_name", request.QueueName,
Expand All @@ -90,6 +98,61 @@ func (c *Controller) Process(ctx context.Context, delivery consumer.Delivery) (r
"partition_key", msg.PartitionKey,
)

m, err := c.mergerFactory.For(merger.Config{QueueName: request.GetQueueName()})
if err != nil {
metrics.NamedCounter(c.metricsScope, opName, "factory_errors", 1)
return fmt.Errorf("failed to create merger for queue %s: %w", request.GetQueueName(), err)
}

result, err := m.CheckMergeability(ctx, request)
if err != nil {
if !errors.Is(err, merger.ErrConflict) {
metrics.NamedCounter(c.metricsScope, opName, "check_errors", 1)
return fmt.Errorf("failed to check mergeability for %s: %w", request.GetId(), err)
}
metrics.NamedCounter(c.metricsScope, opName, "merge_conflicts", 1)
c.logger.Infow("merge conflict detected",
"id", request.GetId(),
"queue_name", request.GetQueueName(),
)
result = &runwaymq.MergeResult{
Id: request.GetId(),
Outcome: runwaypb.Outcome_FAILED,
Reason: err.Error(),
}
}

if err := c.publish(ctx, runwaymq.TopicKeyMergeConflictCheckSignal, result, msg.PartitionKey); err != nil {
metrics.NamedCounter(c.metricsScope, opName, "publish_errors", 1)
return fmt.Errorf("failed to publish merge-conflict-check result for %s: %w", request.GetId(), err)
}

return nil
}

// publish serializes a MergeResult and publishes it to the given signal topic.
func (c *Controller) publish(ctx context.Context, key consumer.TopicKey, result *runwaymq.MergeResult, partitionKey string) error {
payload, err := runwaymq.Marshal(result)
if err != nil {
return fmt.Errorf("failed to serialize merge result: %w", err)
}

msg := entityqueue.NewMessage(result.GetId(), payload, partitionKey, nil)

q, ok := c.registry.Queue(key)
if !ok {
return fmt.Errorf("no queue registered for topic key %s", key)
}

topicName, ok := c.registry.TopicName(key)
if !ok {
return fmt.Errorf("no topic name registered for topic key %s", key)
}

if err := q.Publisher().Publish(ctx, topicName, msg); err != nil {
return fmt.Errorf("failed to publish message: %w", err)
}

return nil
}

Expand Down
Loading
Loading