diff --git a/controller/BUILD.bazel b/controller/BUILD.bazel index 581ba3b..521d3f3 100644 --- a/controller/BUILD.bazel +++ b/controller/BUILD.bazel @@ -3,6 +3,7 @@ load("@rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "controller", srcs = [ + "comparer.go", "controller.go", "distance_filter.go", "errors.go", diff --git a/controller/comparer.go b/controller/comparer.go new file mode 100644 index 0000000..0a136ca --- /dev/null +++ b/controller/comparer.go @@ -0,0 +1,659 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "fmt" + "time" + + "github.com/uber-go/tally" + "github.com/uber/tango/core/common" + pb "github.com/uber/tango/tangopb" + "go.uber.org/zap" +) + +// targetGraphComparer diffs two target graph streams and produces a chunked +// GetChangedTargetsResponse stream. Created fresh per comparison call because +// the mappers accumulate per-call state. 
+type targetGraphComparer struct {
+	// scope is the metrics scope Compare records its phase timers on; the
+	// constructor sub-scopes it as "compare_target_graphs".
+	scope tally.Scope
+	// The mappers translate names into the canonical per-call ID space:
+	// Compare calls ID(name) while transposing targets and Invert() when
+	// building the response metadata. One mapper per namespace.
+	targetMapper   *common.NameIDMapper
+	ruleTypeMapper *common.NameIDMapper
+	tagMapper      *common.NameIDMapper
+	attrNameMapper *common.NameIDMapper
+	attrValMapper  *common.NameIDMapper
+
+	// changedTargetChunkSize is the number of changed targets Compare places
+	// in each ChangedTargets response message.
+	changedTargetChunkSize int
+	// metadataMapChunkSize is forwarded to common.ChunkMetadata when the
+	// response metadata is split into messages.
+	metadataMapChunkSize int
+}
+
+// newTargetGraphComparer returns a comparer with five fresh name->ID mappers,
+// a "compare_target_graphs" sub-scope of scope, and the given chunk sizes.
+func newTargetGraphComparer(scope tally.Scope, changedTargetChunkSize, metadataMapChunkSize int) *targetGraphComparer {
+	return &targetGraphComparer{
+		scope:                  scope.SubScope("compare_target_graphs"),
+		targetMapper:           common.NewNameIDMapper(),
+		ruleTypeMapper:         common.NewNameIDMapper(),
+		tagMapper:              common.NewNameIDMapper(),
+		attrNameMapper:         common.NewNameIDMapper(),
+		attrValMapper:          common.NewNameIDMapper(),
+		changedTargetChunkSize: changedTargetChunkSize,
+		metadataMapChunkSize:   metadataMapChunkSize,
+	}
+}
+
+// Compare diffs two target graph streams. Targets are classified as NEW (only
+// in second), DELETED (only in first), or CHANGED (present in both, differs).
+// Distances are always computed: a target is a distance-0 seed when it is
+// NEW, DELETED, a source file with a changed hash, or a rule whose own
+// configuration (attributes or direct deps) changed. All other CHANGED
+// targets get their distance from BFS over the reverse-dep graph.
+// Output IDs are re-mapped into a canonical per-call namespace so the
+// response metadata only carries the names actually referenced.
+func (c *targetGraphComparer) Compare(ctx context.Context, logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32) ([]*pb.GetChangedTargetsResponse, error) { + start := time.Now() + logger.Info("compareTargetGraphs: Computing differences between target graphs") + + // 1) Extract targets and metadata; index by canonical names + indexStart := time.Now() + firstTargetsByID, firstMetadata, err := getTargetsAndMetadata(ctx, firstGraph) + if err != nil { + return nil, err + } + secondTargetsByID, secondMetadata, err := getTargetsAndMetadata(ctx, secondGraph) + if err != nil { + return nil, err + } + // Release raw chunk slices — individual target protos are now held by the ID maps. + firstGraph = nil + secondGraph = nil + firstByName, err := buildNameIndex(ctx, firstTargetsByID, firstMetadata) + if err != nil { + return nil, err + } + firstTargetsByID = nil // all pointers are now in firstByName; drop the duplicate map + secondByName, err := buildNameIndex(ctx, secondTargetsByID, secondMetadata) + if err != nil { + return nil, err + } + secondTargetsByID = nil + indexDuration := time.Since(indexStart) + c.scope.Timer("index_duration").Record(indexDuration) + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + sourceFileRuleTypeID := detectSourceFileID(secondMetadata) + + changedByName := make(map[string]*pb.ChangedTarget) + seeds := make(map[string]struct{}) + + getTargetId := func(name string) int32 { return c.targetMapper.ID(name) } + getRuleTypeId := func(name string) int32 { return c.ruleTypeMapper.ID(name) } + getTagId := func(name string) int32 { return c.tagMapper.ID(name) } + getAttrNameId := func(name string) int32 { return c.attrNameMapper.ID(name) } + getAttrValId := func(name string) int32 { return c.attrValMapper.ID(name) } + + // Pass 1: walk second revision. Targets not in first revision are NEW (seeds). + // Targets in both with differing hashes are CHANGED; source-file CHANGED + // targets are also seeds. 
Rules that own a changed source file are promoted + // to seeds in pass 2 via hasChangedSourceFileDep. + diffScanStart := time.Now() + for name, newT := range secondByName { + oldT, exists := firstByName[name] + if !exists { + changedByName[name] = &pb.ChangedTarget{ + ChangeType: pb.CHANGE_TYPE_NEW, + NewTarget: transposeOptimizedTarget( + newT, + secondMetadata.GetTargetIdMapping(), + secondMetadata.GetRuleTypeMapping(), + secondMetadata.GetTagMapping(), + secondMetadata.GetAttributeNameMapping(), + secondMetadata.GetAttributeStringValueMapping(), + getTargetId, getRuleTypeId, getTagId, getAttrNameId, getAttrValId, + ), + } + seeds[name] = struct{}{} + continue + } + if oldT.GetHash() == newT.GetHash() { + continue + } + if sourceFileRuleTypeID != -1 && newT.GetRuleType() == sourceFileRuleTypeID { + seeds[name] = struct{}{} + } + newTarget := transposeOptimizedTarget( + newT, + secondMetadata.GetTargetIdMapping(), + secondMetadata.GetRuleTypeMapping(), + secondMetadata.GetTagMapping(), + secondMetadata.GetAttributeNameMapping(), + secondMetadata.GetAttributeStringValueMapping(), + getTargetId, getRuleTypeId, getTagId, getAttrNameId, getAttrValId, + ) + oldTarget := transposeOptimizedTarget( + oldT, + firstMetadata.GetTargetIdMapping(), + firstMetadata.GetRuleTypeMapping(), + firstMetadata.GetTagMapping(), + firstMetadata.GetAttributeNameMapping(), + firstMetadata.GetAttributeStringValueMapping(), + getTargetId, getRuleTypeId, getTagId, getAttrNameId, getAttrValId, + ) + changedByName[name] = &pb.ChangedTarget{ + ChangeType: pb.CHANGE_TYPE_CHANGED, + OldTarget: oldTarget, + NewTarget: newTarget, + } + } + diffScanDuration := time.Since(diffScanStart) + c.scope.Timer("diff_scan_duration").Record(diffScanDuration) + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + // Pass 2: decide which CHANGED rule targets are seeds (distance 0). 
+ classifyStart := time.Now() + for name, ct := range changedByName { + if _, isSeed := seeds[name]; isSeed { + continue + } + if ct.GetChangeType() != pb.CHANGE_TYPE_CHANGED { + continue + } + newT := secondByName[name] + oldT := firstByName[name] + + anyChanged, depsChanged := changedDepStatus(oldT, firstMetadata, newT, secondMetadata, changedByName) + if !anyChanged { + seeds[name] = struct{}{} + continue + } + if depsChanged { + seeds[name] = struct{}{} + continue + } + if hasChangedSourceFileDep(newT, secondMetadata, changedByName, secondByName, sourceFileRuleTypeID) { + seeds[name] = struct{}{} + continue + } + attrsChanged, err := attributesChanged(oldT, firstMetadata, newT, secondMetadata) + if err != nil { + return nil, fmt.Errorf("failed to check attributes changed: %w", err) + } + if attrsChanged { + seeds[name] = struct{}{} + } + } + classifyDuration := time.Since(classifyStart) + c.scope.Timer("classify_duration").Record(classifyDuration) + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + // Pass 3: emit DELETED entries for targets present only in the first revision. + for name, oldT := range firstByName { + if _, exists := secondByName[name]; exists { + continue + } + changedByName[name] = &pb.ChangedTarget{ + ChangeType: pb.CHANGE_TYPE_DELETED, + OldTarget: transposeOptimizedTarget( + oldT, + firstMetadata.GetTargetIdMapping(), + firstMetadata.GetRuleTypeMapping(), + firstMetadata.GetTagMapping(), + firstMetadata.GetAttributeNameMapping(), + firstMetadata.GetAttributeStringValueMapping(), + getTargetId, getRuleTypeId, getTagId, getAttrNameId, getAttrValId, + ), + } + seeds[name] = struct{}{} + } + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + // Distances are always computed; seeds get 0, BFS assigns 1+ to consumers. 
+ distancesStart := time.Now() + if err := computeDistances(ctx, changedByName, secondByName, secondMetadata, seeds, maxDist); err != nil { + return nil, err + } + distancesDuration := time.Since(distancesStart) + c.scope.Timer("distances_duration").Record(distancesDuration) + + if ctx.Err() != nil { + return nil, ctx.Err() + } + + // Collect changed targets. + changed := make([]*pb.ChangedTarget, 0, len(changedByName)) + for _, ct := range changedByName { + changed = append(changed, ct) + } + + // Emit changes in chunks to stay within gRPC per-message size limits, followed by chunked metadata. + var results []*pb.GetChangedTargetsResponse + for i := 0; i < len(changed); i += c.changedTargetChunkSize { + end := i + c.changedTargetChunkSize + if end > len(changed) { + end = len(changed) + } + results = append(results, &pb.GetChangedTargetsResponse{ + Item: &pb.GetChangedTargetsResponse_ChangedTargets{ + ChangedTargets: &pb.ChangedTargets{ + ChangedTargets: changed[i:end], + }, + }, + }) + } + if len(results) == 0 { + results = append(results, &pb.GetChangedTargetsResponse{ + Item: &pb.GetChangedTargetsResponse_ChangedTargets{ + ChangedTargets: &pb.ChangedTargets{}, + }, + }) + } + for _, meta := range common.ChunkMetadata( + c.targetMapper.Invert(), + c.ruleTypeMapper.Invert(), + c.tagMapper.Invert(), + c.attrNameMapper.Invert(), + c.attrValMapper.Invert(), + c.metadataMapChunkSize, + ) { + results = append(results, &pb.GetChangedTargetsResponse{ + Item: &pb.GetChangedTargetsResponse_Metadata{ + Metadata: meta, + }, + }) + } + totalDuration := time.Since(start) + logger.Info("compareTargetGraphs: Done", + zap.Duration("total_duration", totalDuration), + ) + c.scope.Timer("total_duration").Record(totalDuration) + return results, nil +} + +// cancelCheckInterval is how often long-running loops check ctx.Err(). +const cancelCheckInterval = 4096 + +// getTargetsAndMetadata builds ID->target maps and merges metadata from a target graph stream. 
+// Metadata may arrive in multiple chunks (e.g. when target_id_mapping exceeds the gRPC message +// size limit); all chunks are merged into a single Metadata so callers can use it uniformly. +func getTargetsAndMetadata(ctx context.Context, graph []*pb.GetTargetGraphResponse) (map[int32]*pb.OptimizedTarget, *pb.Metadata, error) { + targets := make(map[int32]*pb.OptimizedTarget) + merged := &pb.Metadata{ + TargetIdMapping: make(map[int32]string), + RuleTypeMapping: make(map[int32]string), + TagMapping: make(map[int32]string), + AttributeNameMapping: make(map[int32]string), + AttributeStringValueMapping: make(map[int32]string), + } + for _, chunk := range graph { + if ctx.Err() != nil { + return nil, nil, ctx.Err() + } + switch item := chunk.GetItem().(type) { + case *pb.GetTargetGraphResponse_Targets: + for _, t := range item.Targets.GetTargets() { + targets[t.GetId()] = t + } + case *pb.GetTargetGraphResponse_Metadata: + m := item.Metadata + for k, v := range m.GetTargetIdMapping() { + merged.TargetIdMapping[k] = v + } + for k, v := range m.GetRuleTypeMapping() { + merged.RuleTypeMapping[k] = v + } + for k, v := range m.GetTagMapping() { + merged.TagMapping[k] = v + } + for k, v := range m.GetAttributeNameMapping() { + merged.AttributeNameMapping[k] = v + } + for k, v := range m.GetAttributeStringValueMapping() { + merged.AttributeStringValueMapping[k] = v + } + } + } + return targets, merged, nil +} + +// buildNameIndex creates name->target maps using the provided metadata information. +func buildNameIndex(ctx context.Context, targetsByID map[int32]*pb.OptimizedTarget, meta *pb.Metadata) (map[string]*pb.OptimizedTarget, error) { + byName := make(map[string]*pb.OptimizedTarget, len(targetsByID)) + i := 0 + for id, t := range targetsByID { + if i%cancelCheckInterval == 0 && ctx.Err() != nil { + return nil, ctx.Err() + } + i++ + name, err := canonicalTargetName(id, meta) + if err != nil { + // If a target ID is missing in metadata, skip it. 
+ continue + } + byName[name] = t + } + return byName, nil +} + +// detectSourceFileID returns the literal rule type name for source file if present. +func detectSourceFileID(meta *pb.Metadata) int32 { + if meta == nil || len(meta.GetRuleTypeMapping()) == 0 { + return -1 + } + for id, name := range meta.GetRuleTypeMapping() { + if name == "source file" { + return id + } + } + return -1 +} + +// canonicalTargetName returns a stable identifier for a target using metadata mapping when available. +func canonicalTargetName(id int32, meta *pb.Metadata) (string, error) { + if meta != nil { + if name, ok := meta.GetTargetIdMapping()[id]; ok && name != "" { + return name, nil + } + } + return "", fmt.Errorf("target id %d not found in metadata", id) +} + +// changedDepStatus reports two facts about a CHANGED rule's direct deps in a +// single pass over newTarget.GetDirectDependencies(): +// - anyChanged: at least one current direct dep is itself CHANGED between +// the two revisions (i.e. appears as CHANGE_TYPE_CHANGED in changedByName). +// - setDiffered: the *set of dep names* — not their hashes — differs between +// old and new. A dep changing its hash while keeping the same name leaves +// setDiffered false; that case is handled by BFS reaching the consumer at +// distance >= 1. +// +// The name-set walk over oldTarget is skipped entirely when lengths already +// disagree (setDiffered is trivially true) or when anyChanged is false and +// the caller will seed the rule regardless of setDiffered. 
+func changedDepStatus( + oldTarget *pb.OptimizedTarget, + oldMeta *pb.Metadata, + newTarget *pb.OptimizedTarget, + newMeta *pb.Metadata, + changedByName map[string]*pb.ChangedTarget, +) (anyChanged, setDiffered bool) { + if newTarget == nil || newMeta == nil { + return false, false + } + + newDepIDs := newTarget.GetDirectDependencies() + newIDMap := newMeta.GetTargetIdMapping() + + var oldDepIDs []int32 + var oldIDMap map[int32]string + if oldTarget != nil && oldMeta != nil { + oldDepIDs = oldTarget.GetDirectDependencies() + oldIDMap = oldMeta.GetTargetIdMapping() + } + + lengthsMatch := len(oldDepIDs) == len(newDepIDs) + var newDepSet map[string]struct{} + if lengthsMatch && len(newDepIDs) > 0 { + newDepSet = make(map[string]struct{}, len(newDepIDs)) + } + + for _, depID := range newDepIDs { + name := newIDMap[depID] + if name == "" { + continue + } + if !anyChanged { + if ct, ok := changedByName[name]; ok && ct.GetChangeType() == pb.CHANGE_TYPE_CHANGED { + anyChanged = true + } + } + if newDepSet != nil { + newDepSet[name] = struct{}{} + } + } + + if !lengthsMatch { + return anyChanged, true + } + for _, depID := range oldDepIDs { + name := oldIDMap[depID] + if name == "" { + continue + } + if _, exists := newDepSet[name]; !exists { + return anyChanged, true + } + } + return anyChanged, false +} + +// hasChangedSourceFileDep reports whether any direct dependency of the given +// target is a changed source file. 
+func hasChangedSourceFileDep(
+	target *pb.OptimizedTarget,
+	meta *pb.Metadata,
+	changedByName map[string]*pb.ChangedTarget,
+	targetsByName map[string]*pb.OptimizedTarget,
+	sourceFileRuleTypeID int32,
+) bool {
+	if target == nil || meta == nil || sourceFileRuleTypeID == -1 {
+		return false
+	}
+	idMapping := meta.GetTargetIdMapping()
+	for _, depID := range target.GetDirectDependencies() {
+		depName := idMapping[depID]
+		if depName == "" {
+			continue
+		}
+		// Any entry in changedByName counts, whatever its change type.
+		if _, changed := changedByName[depName]; !changed {
+			continue
+		}
+		if depTarget, ok := targetsByName[depName]; ok && depTarget.GetRuleType() == sourceFileRuleTypeID {
+			return true
+		}
+	}
+	return false
+}
+
+// attributesChanged reports whether the attribute name->value pairs differ
+// between oldTarget and newTarget, comparing by resolved names and string
+// values rather than by revision-local IDs. It returns (false, nil) when
+// either metadata is nil, and an error when validateTargetNames rejects the
+// pair. Attributes whose name ID has no entry in the metadata are ignored.
+func attributesChanged(oldTarget *pb.OptimizedTarget, oldMeta *pb.Metadata, newTarget *pb.OptimizedTarget, newMeta *pb.Metadata) (bool, error) {
+	if oldMeta == nil || newMeta == nil {
+		return false, nil
+	}
+	if err := validateTargetNames(oldTarget, newTarget, oldMeta, newMeta); err != nil {
+		return false, err
+	}
+
+	oldAttrIDs := oldTarget.GetAttributes()
+	newAttrIDs := newTarget.GetAttributes()
+
+	if len(oldAttrIDs) != len(newAttrIDs) {
+		return true, nil
+	}
+	if len(oldAttrIDs) == 0 {
+		return false, nil
+	}
+
+	oldAttrNameMapping := oldMeta.GetAttributeNameMapping()
+	oldAttrValMapping := oldMeta.GetAttributeStringValueMapping()
+	newAttrNameMapping := newMeta.GetAttributeNameMapping()
+	newAttrValMapping := newMeta.GetAttributeStringValueMapping()
+
+	// Resolve the new side to name -> value once, then probe it with the old side.
+	newAttrMap := make(map[string]string, len(newAttrIDs))
+	for attrNameID, attrValID := range newAttrIDs {
+		if attrName := newAttrNameMapping[attrNameID]; attrName != "" {
+			newAttrMap[attrName] = newAttrValMapping[attrValID]
+		}
+	}
+
+	for attrNameID, attrValID := range oldAttrIDs {
+		if attrName := oldAttrNameMapping[attrNameID]; attrName != "" {
+			oldVal := oldAttrValMapping[attrValID]
+			newVal, exists := newAttrMap[attrName]
+			if !exists || newVal != oldVal {
+				return true, nil
+			}
+		}
+	}
+	return false, nil
+}
+
+// validateTargetNames returns an error unless both target IDs resolve in their
+// respective metadata and resolve to the same name.
+func validateTargetNames(oldTarget, newTarget *pb.OptimizedTarget, oldMeta, newMeta *pb.Metadata) error {
+	oldTargetName, ok := oldMeta.GetTargetIdMapping()[oldTarget.GetId()]
+	if !ok {
+		return fmt.Errorf("old target id %d not found in metadata", oldTarget.GetId())
+	}
+	newTargetName, ok := newMeta.GetTargetIdMapping()[newTarget.GetId()]
+	if !ok {
+		return fmt.Errorf("new target id %d not found in metadata", newTarget.GetId())
+	}
+	if oldTargetName != newTargetName {
+		return fmt.Errorf("target names are different %s != %s", oldTargetName, newTargetName)
+	}
+	return nil
+}
+
+// transposeOptimizedTarget remaps a target into the canonical ID space using name-based mappers.
+// Each ID in src is resolved to a name through the supplied ID->name map and
+// then turned into a canonical ID by the matching get*Id callback. A nil src
+// yields nil. The rule type is only set when its name resolves to a non-empty
+// string; target, dep, tag and attribute IDs are passed through the callbacks
+// even when the name lookup yields "".
+func transposeOptimizedTarget(
+	src *pb.OptimizedTarget,
+	oldTargetIdMap map[int32]string,
+	oldRuleTypeIdMap map[int32]string,
+	oldTagIdMap map[int32]string,
+	attrNameIdMap map[int32]string,
+	attrValIdMap map[int32]string,
+	getTargetId func(string) int32,
+	getRuleTypeId func(string) int32,
+	getTagId func(string) int32,
+	getAttrNameId func(string) int32,
+	getAttrValId func(string) int32,
+) *pb.OptimizedTarget {
+	if src == nil {
+		return nil
+	}
+	dst := &pb.OptimizedTarget{
+		Id:       getTargetId(oldTargetIdMap[src.GetId()]),
+		Hash:     src.GetHash(),
+		Root:     src.GetRoot(),
+		External: src.GetExternal(),
+	}
+	// Direct deps
+	deps := src.GetDirectDependencies()
+	if len(deps) > 0 {
+		out := make([]int32, 0, len(deps))
+		for _, d := range deps {
+			out = append(out, getTargetId(oldTargetIdMap[d]))
+		}
+		dst.DirectDependencies = out
+	}
+	// Rule type
+	if rtName := oldRuleTypeIdMap[src.GetRuleType()]; rtName != "" {
+		dst.RuleType = getRuleTypeId(rtName)
+	}
+	// Tags
+	if tags := src.GetTags(); len(tags) > 0 {
+		out := make([]int32, 0, len(tags))
+		for _, tg := range tags {
+			out = append(out, getTagId(oldTagIdMap[tg]))
+		}
+		dst.Tags = out
+	}
+	// Attributes
+	if attrs := src.GetAttributes(); len(attrs) > 0 {
+		out := make(map[int32]int32, len(attrs))
+		for k, v := range attrs {
+			name := attrNameIdMap[k]
+			val := attrValIdMap[v]
+			out[getAttrNameId(name)] = getAttrValId(val)
+		}
+		dst.Attributes = out
+	}
+	return dst
+}
+
+// computeDistances assigns each CHANGED target its BFS distance from the
+// nearest distance-0 seed in the reverse-dependency graph.
+//
+// Seeds present in changedByName get Distance 0; every other entry starts at
+// -1 and keeps it unless the BFS reaches it. A non-negative maxDistance stops
+// expansion beyond that depth. A nil meta leaves all distances untouched.
+// Returns ctx.Err() if the context is cancelled while building the graph or
+// traversing it.
+func computeDistances(ctx context.Context, changedByName map[string]*pb.ChangedTarget, targetsByName map[string]*pb.OptimizedTarget, meta *pb.Metadata, seeds map[string]struct{}, maxDistance int32) error {
+	if meta == nil {
+		return nil
+	}
+
+	targetIDMapping := meta.GetTargetIdMapping()
+
+	// reverseDeps maps a dep name to the names of the targets that list it as
+	// a direct dependency.
+	reverseDeps := make(map[string][]string, len(targetsByName))
+	revDepIter := 0
+	for name, t := range targetsByName {
+		if revDepIter%cancelCheckInterval == 0 && ctx.Err() != nil {
+			return ctx.Err()
+		}
+		revDepIter++
+		for _, depID := range t.GetDirectDependencies() {
+			depName := targetIDMapping[depID]
+			if depName != "" {
+				reverseDeps[depName] = append(reverseDeps[depName], name)
+			}
+		}
+	}
+
+	// dist is both the visited set and the BFS depth of every reached target.
+	// The traversal also walks through targets that are absent from
+	// changedByName, so depth must not be read back from changedByName: a
+	// missing entry would report 0 and restart the count (and the maxDistance
+	// cut-off) at that node.
+	dist := make(map[string]int32, len(changedByName))
+	queue := make([]string, 0, len(seeds))
+	for name, ct := range changedByName {
+		if _, isSeed := seeds[name]; isSeed {
+			ct.Distance = 0
+			dist[name] = 0
+			queue = append(queue, name)
+		} else {
+			ct.Distance = -1
+		}
+	}
+
+	for head := 0; head < len(queue); head++ {
+		if head%cancelCheckInterval == 0 && ctx.Err() != nil {
+			return ctx.Err()
+		}
+		current := queue[head]
+		nextDist := dist[current] + 1
+		if maxDistance >= 0 && nextDist > maxDistance {
+			// Every consumer of current would exceed the limit.
+			continue
+		}
+
+		for _, revDep := range reverseDeps[current] {
+			if _, seen := dist[revDep]; seen {
+				continue
+			}
+			dist[revDep] = nextDist
+			queue = append(queue, revDep)
+
+			if ct, ok := changedByName[revDep]; ok {
+				ct.Distance = nextDist
+			}
+		}
+	}
+	return nil
+}
diff --git a/controller/getchangedtargets.go b/controller/getchangedtargets.go
index 1014984..a70c95d 100644
--- a/controller/getchangedtargets.go
+++ b/controller/getchangedtargets.go
@@ -255,7 +255,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
 	jobs[1].graphStreamChunks = nil
 
 	compareStart := time.Now()
-	changedTargetsResponses, err := c.compareTargetGraphs(ctx, logger, firstGraph, secondGraph, maxDist)
+	changedTargetsResponses, err := newTargetGraphComparer(c.scope, c.changedTargetChunkSize, c.metadataMapChunkSize).Compare(ctx, logger, firstGraph, secondGraph, maxDist)
 	// Allow GC of raw graph data while the caching goroutine runs.
 	firstGraph = nil
 	secondGraph = nil
@@ -322,595 +322,6 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
 	return nil
 }
 
-// compareTargetGraphs diffs two target graph streams and produces a chunked
-// GetChangedTargetsResponse stream. Targets are classified as NEW (only in
-// second), DELETED (only in first), or CHANGED (present in both, differs).
-// Distances are always computed: a target is a distance-0 seed when it is
-// NEW, DELETED, a source file with a changed hash, or a rule whose own
-// configuration (attributes or direct deps) changed. All other CHANGED
-// targets get their distance from BFS over the reverse-dep graph.
-// Output IDs are re-mapped into a canonical per-call namespace so the
-// response metadata only carries the names actually referenced.
-func (c *controller) compareTargetGraphs(ctx context.Context, logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32) ([]*pb.GetChangedTargetsResponse, error) { - start := time.Now() - scope := c.scope.SubScope("compare_target_graphs") - logger.Info("compareTargetGraphs: Computing differences between target graphs") - - // 1) Extract targets and metadata; index by canonical names - indexStart := time.Now() - firstTargetsByID, firstMetadata, err := getTargetsAndMetadata(ctx, firstGraph) - if err != nil { - return nil, err - } - secondTargetsByID, secondMetadata, err := getTargetsAndMetadata(ctx, secondGraph) - if err != nil { - return nil, err - } - // Release raw chunk slices — individual target protos are now held by the ID maps. - firstGraph = nil - secondGraph = nil - firstByName, err := buildNameIndex(ctx, firstTargetsByID, firstMetadata) - if err != nil { - return nil, err - } - firstTargetsByID = nil // all pointers are now in firstByName; drop the duplicate map - secondByName, err := buildNameIndex(ctx, secondTargetsByID, secondMetadata) - if err != nil { - return nil, err - } - secondTargetsByID = nil - indexDuration := time.Since(indexStart) - scope.Timer("index_duration").Record(indexDuration) - - if ctx.Err() != nil { - return nil, ctx.Err() - } - - sourceFileRuleTypeID := detectSourceFileID(secondMetadata) - - changedByName := make(map[string]*pb.ChangedTarget) - // seeds are targets whose own state changed: NEW, DELETED, a source file - // whose hash changed, or a rule whose attributes or direct-deps changed. - // BFS over reverse-deps assigns distance 0 to seeds and distance >=1 to - // downstream consumers. 
- seeds := make(map[string]struct{}) - - // 3) Create canonical mappers for IDs (targets, rule types, tags, attributes) - targetMapper := common.NewNameIDMapper() - ruleTypeMapper := common.NewNameIDMapper() - tagMapper := common.NewNameIDMapper() - attrNameMapper := common.NewNameIDMapper() - attrValMapper := common.NewNameIDMapper() - // These functions are used to transpose the target into the canonical ID space. - // When called, we attempt to find the ID for the name in the metadata and return the ID. - getTargetId := func(name string) int32 { return targetMapper.ID(name) } - getRuleTypeId := func(name string) int32 { return ruleTypeMapper.ID(name) } - getTagId := func(name string) int32 { return tagMapper.ID(name) } - getAttrNameId := func(name string) int32 { return attrNameMapper.ID(name) } - getAttrValId := func(name string) int32 { return attrValMapper.ID(name) } - - // Pass 1: walk second revision. Targets not in first revision are NEW (seeds). - // Targets in both with differing hashes are CHANGED; source-file CHANGED - // targets are also seeds. Rules that own a changed source file are promoted - // to seeds in pass 2 via hasChangedSourceFileDep. - diffScanStart := time.Now() - for name, newT := range secondByName { - oldT, exists := firstByName[name] - if !exists { - changedByName[name] = &pb.ChangedTarget{ - ChangeType: pb.CHANGE_TYPE_NEW, - NewTarget: transposeOptimizedTarget( - newT, - secondMetadata.GetTargetIdMapping(), - secondMetadata.GetRuleTypeMapping(), - secondMetadata.GetTagMapping(), - secondMetadata.GetAttributeNameMapping(), - secondMetadata.GetAttributeStringValueMapping(), - getTargetId, getRuleTypeId, getTagId, getAttrNameId, getAttrValId, - ), - } - seeds[name] = struct{}{} - continue - } - if oldT.GetHash() == newT.GetHash() { - // same hash -> unchanged - continue - } - // Source files with a hash change are seeds; rules will be evaluated - // for own-config changes in pass 2 below. 
- if sourceFileRuleTypeID != -1 && newT.GetRuleType() == sourceFileRuleTypeID { - seeds[name] = struct{}{} - } - newTarget := transposeOptimizedTarget( - newT, - secondMetadata.GetTargetIdMapping(), - secondMetadata.GetRuleTypeMapping(), - secondMetadata.GetTagMapping(), - secondMetadata.GetAttributeNameMapping(), - secondMetadata.GetAttributeStringValueMapping(), - getTargetId, getRuleTypeId, getTagId, getAttrNameId, getAttrValId, - ) - oldTarget := transposeOptimizedTarget( - oldT, - firstMetadata.GetTargetIdMapping(), - firstMetadata.GetRuleTypeMapping(), - firstMetadata.GetTagMapping(), - firstMetadata.GetAttributeNameMapping(), - firstMetadata.GetAttributeStringValueMapping(), - getTargetId, getRuleTypeId, getTagId, getAttrNameId, getAttrValId, - ) - changedByName[name] = &pb.ChangedTarget{ - ChangeType: pb.CHANGE_TYPE_CHANGED, - OldTarget: oldTarget, - NewTarget: newTarget, - } - } - diffScanDuration := time.Since(diffScanStart) - scope.Timer("diff_scan_duration").Record(diffScanDuration) - - if ctx.Err() != nil { - return nil, ctx.Err() - } - - // Pass 2: decide which CHANGED rule targets are seeds (distance 0). - // - // We trust the hasher: a CHANGED entry means the rule's hash differs. - // The only reason a hash change should land at distance >= 1 (rather than - // 0) is when it is fully explained by a *direct dep* having changed — - // i.e. the change is purely transitive. In every other case the rule is - // a seed. - // - // Concretely, attribute / dep-list inspection is only needed to promote - // a rule that BFS would otherwise put at distance 1 (because a dep - // changed) down to distance 0 (because the rule's own configuration - // changed too). If no dep changed, the hash change has no upstream - // explanation and the rule is a seed regardless of what inspection says. - classifyStart := time.Now() - for name, ct := range changedByName { - if _, isSeed := seeds[name]; isSeed { - // Already a seed (NEW or changed source file). 
- continue - } - if ct.GetChangeType() != pb.CHANGE_TYPE_CHANGED { - continue - } - newT := secondByName[name] - oldT := firstByName[name] - - // Single pass over newT.deps decides two things at once: whether any - // direct dep itself changed (would otherwise transitively explain the - // hash diff at distance >= 1), and whether the rule's own dep-name set - // changed (its own configuration changed → seed). - anyChanged, depsChanged := changedDepStatus(oldT, firstMetadata, newT, secondMetadata, changedByName) - if !anyChanged { - // No direct dep changed: the hash diff has no upstream explanation, - // trust the hasher and seed. - seeds[name] = struct{}{} - continue - } - if depsChanged { - seeds[name] = struct{}{} - continue - } - if hasChangedSourceFileDep(newT, secondMetadata, changedByName, secondByName, sourceFileRuleTypeID) { - seeds[name] = struct{}{} - continue - } - attrsChanged, err := attributesChanged(oldT, firstMetadata, newT, secondMetadata) - if err != nil { - return nil, fmt.Errorf("failed to check attributes changed: %w", err) - } - if attrsChanged { - seeds[name] = struct{}{} - } - } - classifyDuration := time.Since(classifyStart) - scope.Timer("classify_duration").Record(classifyDuration) - - if ctx.Err() != nil { - return nil, ctx.Err() - } - - // Pass 3: emit DELETED entries for targets present only in the first revision. - // Deletions are seeds (distance 0) but have no entries in secondByName / - // reverseDeps, so BFS naturally propagates nothing from them. 
- for name, oldT := range firstByName { - if _, exists := secondByName[name]; exists { - continue - } - changedByName[name] = &pb.ChangedTarget{ - ChangeType: pb.CHANGE_TYPE_DELETED, - OldTarget: transposeOptimizedTarget( - oldT, - firstMetadata.GetTargetIdMapping(), - firstMetadata.GetRuleTypeMapping(), - firstMetadata.GetTagMapping(), - firstMetadata.GetAttributeNameMapping(), - firstMetadata.GetAttributeStringValueMapping(), - getTargetId, getRuleTypeId, getTagId, getAttrNameId, getAttrValId, - ), - } - seeds[name] = struct{}{} - } - - if ctx.Err() != nil { - return nil, ctx.Err() - } - - // Distances are always computed; seeds get 0, BFS assigns 1+ to consumers. - distancesStart := time.Now() - if err := computeDistances(ctx, changedByName, secondByName, secondMetadata, seeds, maxDist); err != nil { - return nil, err - } - distancesDuration := time.Since(distancesStart) - scope.Timer("distances_duration").Record(distancesDuration) - - if ctx.Err() != nil { - return nil, ctx.Err() - } - - // Collect changed targets. - changed := make([]*pb.ChangedTarget, 0, len(changedByName)) - for _, ct := range changedByName { - changed = append(changed, ct) - } - - // Emit changes in chunks to stay within gRPC per-message size limits, followed by chunked metadata. 
- var results []*pb.GetChangedTargetsResponse - for i := 0; i < len(changed); i += c.changedTargetChunkSize { - end := i + c.changedTargetChunkSize - if end > len(changed) { - end = len(changed) - } - results = append(results, &pb.GetChangedTargetsResponse{ - Item: &pb.GetChangedTargetsResponse_ChangedTargets{ - ChangedTargets: &pb.ChangedTargets{ - ChangedTargets: changed[i:end], - }, - }, - }) - } - if len(results) == 0 { - results = append(results, &pb.GetChangedTargetsResponse{ - Item: &pb.GetChangedTargetsResponse_ChangedTargets{ - ChangedTargets: &pb.ChangedTargets{}, - }, - }) - } - for _, meta := range common.ChunkMetadata( - targetMapper.Invert(), - ruleTypeMapper.Invert(), - tagMapper.Invert(), - attrNameMapper.Invert(), - attrValMapper.Invert(), - c.metadataMapChunkSize, - ) { - results = append(results, &pb.GetChangedTargetsResponse{ - Item: &pb.GetChangedTargetsResponse_Metadata{ - Metadata: meta, - }, - }) - } - totalDuration := time.Since(start) - logger.Info("compareTargetGraphs: Done", - zap.Duration("total_duration", totalDuration), - ) - scope.Timer("total_duration").Record(totalDuration) - return results, nil -} - -// cancelCheckInterval is how often long-running loops check ctx.Err(). -const cancelCheckInterval = 4096 - -// getTargetsAndMetadata builds ID->target maps and merges metadata from a target graph stream. -// Metadata may arrive in multiple chunks (e.g. when target_id_mapping exceeds the gRPC message -// size limit); all chunks are merged into a single Metadata so callers can use it uniformly. 
-func getTargetsAndMetadata(ctx context.Context, graph []*pb.GetTargetGraphResponse) (map[int32]*pb.OptimizedTarget, *pb.Metadata, error) { - targets := make(map[int32]*pb.OptimizedTarget) - merged := &pb.Metadata{ - TargetIdMapping: make(map[int32]string), - RuleTypeMapping: make(map[int32]string), - TagMapping: make(map[int32]string), - AttributeNameMapping: make(map[int32]string), - AttributeStringValueMapping: make(map[int32]string), - } - for _, chunk := range graph { - if ctx.Err() != nil { - return nil, nil, ctx.Err() - } - switch item := chunk.GetItem().(type) { - case *pb.GetTargetGraphResponse_Targets: - for _, t := range item.Targets.GetTargets() { - targets[t.GetId()] = t - } - case *pb.GetTargetGraphResponse_Metadata: - m := item.Metadata - for k, v := range m.GetTargetIdMapping() { - merged.TargetIdMapping[k] = v - } - for k, v := range m.GetRuleTypeMapping() { - merged.RuleTypeMapping[k] = v - } - for k, v := range m.GetTagMapping() { - merged.TagMapping[k] = v - } - for k, v := range m.GetAttributeNameMapping() { - merged.AttributeNameMapping[k] = v - } - for k, v := range m.GetAttributeStringValueMapping() { - merged.AttributeStringValueMapping[k] = v - } - } - } - return targets, merged, nil -} - -// buildNameIndex creates name->target maps using the provided metadata information. -func buildNameIndex(ctx context.Context, targetsByID map[int32]*pb.OptimizedTarget, meta *pb.Metadata) (map[string]*pb.OptimizedTarget, error) { - byName := make(map[string]*pb.OptimizedTarget, len(targetsByID)) - i := 0 - for id, t := range targetsByID { - if i%cancelCheckInterval == 0 && ctx.Err() != nil { - return nil, ctx.Err() - } - i++ - name, err := canonicalTargetName(id, meta) - if err != nil { - // If a target ID is missing in metadata, skip it. - continue - } - byName[name] = t - } - return byName, nil -} - -// detectSourceFileID returns the literal rule type name for source file if present. 
-func detectSourceFileID(meta *pb.Metadata) int32 { - if meta == nil || len(meta.GetRuleTypeMapping()) == 0 { - return -1 - } - // check the id in the rule type mapping for "source file" - for id, name := range meta.GetRuleTypeMapping() { - if name == "source file" { - return id - } - } - return -1 -} - -// canonicalTargetName returns a stable identifier for a target using metadata mapping when available. -func canonicalTargetName(id int32, meta *pb.Metadata) (string, error) { - if meta != nil { - if name, ok := meta.GetTargetIdMapping()[id]; ok && name != "" { - return name, nil - } - } - return "", fmt.Errorf("target id %d not found in metadata", id) -} - -// changedDepStatus reports two facts about a CHANGED rule's direct deps in a -// single pass over newTarget.GetDirectDependencies(): -// - anyChanged: at least one current direct dep is itself CHANGED between -// the two revisions (i.e. appears as CHANGE_TYPE_CHANGED in changedByName). -// - setDiffered: the *set of dep names* — not their hashes — differs between -// old and new. A dep changing its hash while keeping the same name leaves -// setDiffered false; that case is handled by BFS reaching the consumer at -// distance >= 1. -// -// The name-set walk over oldTarget is skipped entirely when lengths already -// disagree (setDiffered is trivially true) or when anyChanged is false and -// the caller will seed the rule regardless of setDiffered. 
-func changedDepStatus( - oldTarget *pb.OptimizedTarget, - oldMeta *pb.Metadata, - newTarget *pb.OptimizedTarget, - newMeta *pb.Metadata, - changedByName map[string]*pb.ChangedTarget, -) (anyChanged, setDiffered bool) { - if newTarget == nil || newMeta == nil { - return false, false - } - - newDepIDs := newTarget.GetDirectDependencies() - newIDMap := newMeta.GetTargetIdMapping() - - var oldDepIDs []int32 - var oldIDMap map[int32]string - if oldTarget != nil && oldMeta != nil { - oldDepIDs = oldTarget.GetDirectDependencies() - oldIDMap = oldMeta.GetTargetIdMapping() - } - - // If lengths differ, setDiffered is trivially true — no need to allocate - // a name set for membership checks. - lengthsMatch := len(oldDepIDs) == len(newDepIDs) - var newDepSet map[string]struct{} - if lengthsMatch && len(newDepIDs) > 0 { - newDepSet = make(map[string]struct{}, len(newDepIDs)) - } - - for _, depID := range newDepIDs { - name := newIDMap[depID] - if name == "" { - continue - } - if !anyChanged { - if ct, ok := changedByName[name]; ok && ct.GetChangeType() == pb.CHANGE_TYPE_CHANGED { - anyChanged = true - } - } - if newDepSet != nil { - newDepSet[name] = struct{}{} - } - } - - if !lengthsMatch { - return anyChanged, true - } - for _, depID := range oldDepIDs { - name := oldIDMap[depID] - if name == "" { - continue - } - if _, exists := newDepSet[name]; !exists { - return anyChanged, true - } - } - return anyChanged, false -} - -// hasChangedSourceFileDep reports whether any direct dependency of the given -// target is a changed source file. When true the rule's own inputs changed and -// it should be treated as a seed (distance 0) rather than a transitive consumer. 
-func hasChangedSourceFileDep( - target *pb.OptimizedTarget, - meta *pb.Metadata, - changedByName map[string]*pb.ChangedTarget, - targetsByName map[string]*pb.OptimizedTarget, - sourceFileRuleTypeID int32, -) bool { - if target == nil || meta == nil || sourceFileRuleTypeID == -1 { - return false - } - idMapping := meta.GetTargetIdMapping() - for _, depID := range target.GetDirectDependencies() { - depName := idMapping[depID] - if depName == "" { - continue - } - if _, changed := changedByName[depName]; !changed { - continue - } - if depTarget, ok := targetsByName[depName]; ok && depTarget.GetRuleType() == sourceFileRuleTypeID { - return true - } - } - return false -} - -// attributesChanged checks if the attributes changed between old and new targets. -func attributesChanged(oldTarget *pb.OptimizedTarget, oldMeta *pb.Metadata, newTarget *pb.OptimizedTarget, newMeta *pb.Metadata) (bool, error) { - if oldMeta == nil || newMeta == nil { - return false, nil - } - // validate target names are equivalent. 
- if err := validateTargetNames(oldTarget, newTarget, oldMeta, newMeta); err != nil { - return false, err - } - - oldAttrIDs := oldTarget.GetAttributes() - newAttrIDs := newTarget.GetAttributes() - - // Early exit: if lengths differ, attributes changed - if len(oldAttrIDs) != len(newAttrIDs) { - return true, nil - } - - // Early exit: if both are empty, no change - if len(oldAttrIDs) == 0 { - return false, nil - } - - // Cache metadata mappings to avoid repeated map lookups - oldAttrNameMapping := oldMeta.GetAttributeNameMapping() - oldAttrValMapping := oldMeta.GetAttributeStringValueMapping() - newAttrNameMapping := newMeta.GetAttributeNameMapping() - newAttrValMapping := newMeta.GetAttributeStringValueMapping() - - // Build map of new attributes (only one map needed) - newAttrMap := make(map[string]string, len(newAttrIDs)) - for attrNameID, attrValID := range newAttrIDs { - if attrName := newAttrNameMapping[attrNameID]; attrName != "" { - newAttrMap[attrName] = newAttrValMapping[attrValID] - } - } - - // Check if all old attributes match - for attrNameID, attrValID := range oldAttrIDs { - if attrName := oldAttrNameMapping[attrNameID]; attrName != "" { - oldVal := oldAttrValMapping[attrValID] - newVal, exists := newAttrMap[attrName] - if !exists || newVal != oldVal { - return true, nil - } - } - } - return false, nil -} - -// validateTargetNames checks if the target names are the same between old and new targets, and exists in both metadata maps. 
-func validateTargetNames(oldTarget, newTarget *pb.OptimizedTarget, oldMeta, newMeta *pb.Metadata) error { - oldTargetName, ok := oldMeta.GetTargetIdMapping()[oldTarget.GetId()] - if !ok { - return fmt.Errorf("old target id %d not found in metadata", oldTarget.GetId()) - } - newTargetName, ok := newMeta.GetTargetIdMapping()[newTarget.GetId()] - if !ok { - return fmt.Errorf("new target id %d not found in metadata", newTarget.GetId()) - } - if oldTargetName != newTargetName { - return fmt.Errorf("target names are different %s != %s", oldTargetName, newTargetName) - } - return nil -} - -// transposeOptimizedTarget remaps a target into the canonical ID space using name-based mappers. -func transposeOptimizedTarget( - src *pb.OptimizedTarget, - oldTargetIdMap map[int32]string, - oldRuleTypeIdMap map[int32]string, - oldTagIdMap map[int32]string, - attrNameIdMap map[int32]string, - attrValIdMap map[int32]string, - getTargetId func(string) int32, - getRuleTypeId func(string) int32, - getTagId func(string) int32, - getAttrNameId func(string) int32, - getAttrValId func(string) int32, -) *pb.OptimizedTarget { - if src == nil { - return nil - } - dst := &pb.OptimizedTarget{ - Id: getTargetId(oldTargetIdMap[src.GetId()]), - Hash: src.GetHash(), - Root: src.GetRoot(), - External: src.GetExternal(), - } - // Direct deps - deps := src.GetDirectDependencies() - if len(deps) > 0 { - out := make([]int32, 0, len(deps)) - for _, d := range deps { - out = append(out, getTargetId(oldTargetIdMap[d])) - } - dst.DirectDependencies = out - } - // Rule type - if rtName := oldRuleTypeIdMap[src.GetRuleType()]; rtName != "" { - dst.RuleType = getRuleTypeId(rtName) - } - // Tags - if tags := src.GetTags(); len(tags) > 0 { - out := make([]int32, 0, len(tags)) - for _, tg := range tags { - out = append(out, getTagId(oldTagIdMap[tg])) - } - dst.Tags = out - } - // Attributes - if attrs := src.GetAttributes(); len(attrs) > 0 { - out := make(map[int32]int32, len(attrs)) - for k, v := range attrs { - 
name := attrNameIdMap[k] - val := attrValIdMap[v] - out[getAttrNameId(name)] = getAttrValId(val) - } - dst.Attributes = out - } - return dst -} - // sendTrimmedChangedTargets streams responses to the client, filtering changed targets to those // within maxDist from any distance-0 seed when maxDist >= 0, stripping per-target // hash/tags/attributes per outputConfig's include_* flags, and pruning metadata mappings @@ -951,79 +362,6 @@ func sendTrimmedChangedTargets(stream pb.TangoServiceGetChangedTargetsYARPCServe return nil } -// computeDistances assigns each CHANGED target its BFS distance from the -// nearest distance-0 seed in the reverse-dependency graph. Seeds are passed in -// pre-classified and start at distance 0; everything else starts at -1 and -// gets overwritten if reachable. Targets beyond `maxDistance` (when >= 0) are -// never enqueued, so they keep their initial distance of -1 (out-of-range). -func computeDistances(ctx context.Context, changedByName map[string]*pb.ChangedTarget, targetsByName map[string]*pb.OptimizedTarget, meta *pb.Metadata, seeds map[string]struct{}, maxDistance int32) error { - if meta == nil { - return nil - } - - targetIDMapping := meta.GetTargetIdMapping() - - // Build reverse dependency graph: if B depends on A, then A -> B. - reverseDeps := make(map[string][]string, len(targetsByName)) - revDepIter := 0 - for name, t := range targetsByName { - if revDepIter%cancelCheckInterval == 0 && ctx.Err() != nil { - return ctx.Err() - } - revDepIter++ - for _, depID := range t.GetDirectDependencies() { - depName := targetIDMapping[depID] - if depName != "" { - reverseDeps[depName] = append(reverseDeps[depName], name) - } - } - } - - // Initialize all distances. Seeds at 0 and enqueued; everything else at -1. 
- var queue []string - visited := make(map[string]struct{}, len(changedByName)) - for name, ct := range changedByName { - if _, isSeed := seeds[name]; isSeed { - ct.Distance = 0 - queue = append(queue, name) - visited[name] = struct{}{} - } else { - ct.Distance = -1 - } - } - - // BFS from seeds through reverseDeps. Shortest distance wins. - bfsIter := 0 - for len(queue) > 0 { - if bfsIter%cancelCheckInterval == 0 && ctx.Err() != nil { - return ctx.Err() - } - bfsIter++ - current := queue[0] - queue = queue[1:] - currentDist := changedByName[current].GetDistance() - - for _, revDep := range reverseDeps[current] { - // BFS guarantees shortest distance, so skip if already visited. - if _, seen := visited[revDep]; seen { - continue - } - nextDist := currentDist + 1 - // Prune: if a maxDistance is set and the next distance exceeds it, skip. - if maxDistance >= 0 && nextDist > maxDistance { - continue - } - visited[revDep] = struct{}{} - queue = append(queue, revDep) - - if ct, ok := changedByName[revDep]; ok { - ct.Distance = nextDist - } - } - } - return nil -} - // validateGetChangedTargetsRequest enforces the minimal invariants the // comparison pipeline relies on: both revisions present, both populated // with a remote and base SHA, and both pointing at the same remote. 
diff --git a/controller/getchangedtargets_test.go b/controller/getchangedtargets_test.go index 1e854fc..7596e8c 100644 --- a/controller/getchangedtargets_test.go +++ b/controller/getchangedtargets_test.go @@ -133,7 +133,6 @@ func TestValidateGetChangedTargetsRequest(t *testing.T) { } func TestCompareTargetGraphs(t *testing.T) { - c := newTestController(zap.NewNop()) firstGraph := &pb.GetTargetGraphResponse{ Item: &pb.GetTargetGraphResponse_Metadata{ @@ -146,7 +145,7 @@ func TestCompareTargetGraphs(t *testing.T) { }, } - response, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, -1) + response, err := newTestComparer().Compare(context.Background(), zap.NewNop(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, -1) require.NoError(t, err) require.NotNil(t, response) } @@ -576,7 +575,6 @@ func TestGetChangedTargets_CacheWriteUsesAppCtx(t *testing.T) { } func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) { - c := newTestController(zaptest.NewLogger(t)) first := []*pb.GetTargetGraphResponse{ { @@ -613,7 +611,7 @@ func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := newTestComparer().Compare(context.Background(), zap.NewNop(), first, second, -1) require.NoError(t, err) require.Len(t, res, 2) cs := res[0].GetChangedTargets() @@ -629,7 +627,6 @@ func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) { } func TestCompareTargetGraphs_SourceFileDirectAndPropagation(t *testing.T) { - c := newTestController(zaptest.NewLogger(t)) // Old: source file A (id 1, hash h1), lib L (id 2, hash h1, dep -> A) first := []*pb.GetTargetGraphResponse{ @@ -685,7 +682,7 @@ func TestCompareTargetGraphs_SourceFileDirectAndPropagation(t *testing.T) { }, }, } - res, err := 
c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := newTestComparer().Compare(context.Background(), zap.NewNop(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -713,7 +710,6 @@ func TestCompareTargetGraphs_SourceFileDirectAndPropagation(t *testing.T) { } func TestCompareTargetGraphs_ChangedRuleUnreachableFromAnySeed(t *testing.T) { - c := newTestController(zaptest.NewLogger(t)) // Old: T (id 1, rule), no deps first := []*pb.GetTargetGraphResponse{ @@ -758,7 +754,7 @@ func TestCompareTargetGraphs_ChangedRuleUnreachableFromAnySeed(t *testing.T) { // Hash-only change on a rule with no own-config change and no reachable // seed: under "trust the hasher" semantics, an orphan CHANGED rule with // no upstream explanation becomes a distance-0 seed itself. - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := newTestComparer().Compare(context.Background(), zap.NewNop(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -769,7 +765,6 @@ func TestCompareTargetGraphs_ChangedRuleUnreachableFromAnySeed(t *testing.T) { } func TestCompareTargetGraphs_ChangedWhenDependenciesChanged(t *testing.T) { - c := newTestController(zaptest.NewLogger(t)) // Old: T (id 1, rule) with deps on A first := []*pb.GetTargetGraphResponse{ @@ -825,7 +820,7 @@ func TestCompareTargetGraphs_ChangedWhenDependenciesChanged(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := newTestComparer().Compare(context.Background(), zap.NewNop(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -845,7 +840,6 @@ func TestCompareTargetGraphs_ChangedWhenDependenciesChanged(t *testing.T) { } func TestCompareTargetGraphs_ChangedWhenAttributesChanged(t *testing.T) { - c := 
newTestController(zaptest.NewLogger(t)) // Old: T with attribute "key1" -> "value1" first := []*pb.GetTargetGraphResponse{ @@ -907,7 +901,7 @@ func TestCompareTargetGraphs_ChangedWhenAttributesChanged(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := newTestComparer().Compare(context.Background(), zap.NewNop(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -918,7 +912,6 @@ func TestCompareTargetGraphs_ChangedWhenAttributesChanged(t *testing.T) { } func TestCompareTargetGraphs_ChangedWhenNewAttributeAdded(t *testing.T) { - c := newTestController(zaptest.NewLogger(t)) // Old: T with one attribute first := []*pb.GetTargetGraphResponse{ @@ -989,7 +982,7 @@ func TestCompareTargetGraphs_ChangedWhenNewAttributeAdded(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := newTestComparer().Compare(context.Background(), zap.NewNop(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -1256,7 +1249,6 @@ func TestComputeDistances_NilMetadata(t *testing.T) { } func TestCompareTargetGraphs_HashOnlyChangePropagatesViaBFS(t *testing.T) { - c := newTestController(zaptest.NewLogger(t)) // Old: T (rule) with deps on source file A (id 10) and attributes first := []*pb.GetTargetGraphResponse{ @@ -1329,7 +1321,7 @@ func TestCompareTargetGraphs_HashOnlyChangePropagatesViaBFS(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := newTestComparer().Compare(context.Background(), zap.NewNop(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -1349,7 +1341,6 @@ func TestCompareTargetGraphs_HashOnlyChangePropagatesViaBFS(t *testing.T) { } func TestCompareTargetGraphs_SiblingRuleNotPromotedToSeed(t *testing.T) { - c 
:= newTestController(zaptest.NewLogger(t)) // Source file A (id 1) owned by rule L (id 2). // Rule T (id 3) depends on L (sibling rule), NOT directly on A. @@ -1411,7 +1402,7 @@ func TestCompareTargetGraphs_SiblingRuleNotPromotedToSeed(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := newTestComparer().Compare(context.Background(), zap.NewNop(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -1428,7 +1419,6 @@ func TestCompareTargetGraphs_SiblingRuleNotPromotedToSeed(t *testing.T) { } func TestCompareTargetGraphs_DeletedTargetEmitted(t *testing.T) { - c := newTestController(zaptest.NewLogger(t)) // Old: T (rule) exists; New: T is gone. first := []*pb.GetTargetGraphResponse{ @@ -1465,7 +1455,7 @@ func TestCompareTargetGraphs_DeletedTargetEmitted(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := newTestComparer().Compare(context.Background(), zap.NewNop(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) diff --git a/controller/testhelper_test.go b/controller/testhelper_test.go index 56ff439..dd6dad7 100644 --- a/controller/testhelper_test.go +++ b/controller/testhelper_test.go @@ -15,21 +15,10 @@ package controller import ( - "context" - "github.com/uber-go/tally" "github.com/uber/tango/core/common" - "go.uber.org/zap" ) -func newTestController(logger *zap.Logger) *controller { - return &controller{ - logger: logger, - scope: tally.NoopScope, - targetChunkSize: common.DefaultTargetChunkSize, - changedTargetChunkSize: common.DefaultChangedTargetChunkSize, - metadataMapChunkSize: common.DefaultMetadataMapChunkSize, - totalDurationBuckets: _totalDurationBuckets, - appCtx: context.Background(), - } +func newTestComparer() *targetGraphComparer { + return newTargetGraphComparer(tally.NoopScope, 
common.DefaultChangedTargetChunkSize, common.DefaultMetadataMapChunkSize) } diff --git a/core/repomanager/BUILD.bazel b/core/repomanager/BUILD.bazel index eaf89e9..b076d2c 100644 --- a/core/repomanager/BUILD.bazel +++ b/core/repomanager/BUILD.bazel @@ -9,6 +9,7 @@ go_library( "//core/git", "//core/workspace", "//tangopb", + "@org_uber_go_fx//:fx", "@org_uber_go_zap//:zap", ], ) diff --git a/core/repomanager/repo_manager.go b/core/repomanager/repo_manager.go index 2bd5f60..a602a73 100644 --- a/core/repomanager/repo_manager.go +++ b/core/repomanager/repo_manager.go @@ -25,6 +25,7 @@ import ( "github.com/uber/tango/core/git" "github.com/uber/tango/core/workspace" "github.com/uber/tango/tangopb" + "go.uber.org/fx" "go.uber.org/zap" ) @@ -42,17 +43,6 @@ type repoManager struct { mu sync.Mutex pools map[string]*workerPool - - // appCtx represents the app's overall lifetime. It is passed in by the - // caller at construction and is expected to be cancelled when the whole - // application is shutting down (e.g. on SIGTERM/SIGINT). Any future - // fire-and-forget goroutines this manager starts should use this context - // instead of context.Background() so they abort promptly on shutdown - // rather than running unbounded past server teardown. - // - // Per-request cancellation should still use the request's own context; - // appCtx is only for work that intentionally outlives the request. - appCtx context.Context } // workerPool manages a fixed set of worker slots for a single repo. @@ -74,19 +64,17 @@ type workerSlot struct { // Params for creating a RepoManager. type Params struct { + fx.In + Git git.Interface Logger *zap.SugaredLogger - RepoManagerClonePath string - WorkerRootPath string - PoolSize int + RepoManagerClonePath string `name:"repoManagerClonePath"` + WorkerRootPath string `name:"workerRootPath"` + PoolSize int `name:"workerPoolSize"` } // NewRepoManager creates a new repo manager with pooled worker workspaces. 
-// -// appCtx is the application-lifetime context. Cancel it when the process is -// shutting down (e.g. wire it to SIGTERM/SIGINT in main) to abort any -// background goroutines the manager spawns. -func NewRepoManager(appCtx context.Context, p Params) RepoManager { +func NewRepoManager(p Params) RepoManager { return &repoManager{ git: p.Git, repoManagerClonePath: p.RepoManagerClonePath, @@ -94,7 +82,6 @@ func NewRepoManager(appCtx context.Context, p Params) RepoManager { logger: p.Logger, poolSize: p.PoolSize, pools: make(map[string]*workerPool), - appCtx: appCtx, } } diff --git a/core/repomanager/repo_manager_test.go b/core/repomanager/repo_manager_test.go index 1d0b951..1da9cca 100644 --- a/core/repomanager/repo_manager_test.go +++ b/core/repomanager/repo_manager_test.go @@ -43,7 +43,7 @@ func TestLease_ClonesOriginAndCreatesWorker(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ws, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.NoError(t, err) assert.Equal(t, workerDir, ws.Path()) @@ -65,7 +65,7 @@ func TestLease_SkipsOriginClone_WhenExists(t *testing.T) { // Only worker clone expected g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: 
root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ws, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.NoError(t, err) assert.Equal(t, workerDir, ws.Path()) @@ -86,7 +86,7 @@ func TestLease_ReusesWorker_AfterRelease(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ctx := context.Background() ws1, err := rm.Lease(ctx, tangopb.BuildDescription{Remote: remote}) @@ -115,7 +115,7 @@ func TestLease_CreatesMultipleWorkers(t *testing.T) { g.EXPECT().Clone(gomock.Any(), originDir, dir, "--local", "-c", "gc.auto=0").Return(nil) } - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 2}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 2}) ctx := context.Background() ws1, err := rm.Lease(ctx, tangopb.BuildDescription{Remote: remote}) @@ -141,7 +141,7 @@ func TestLease_BlocksUntilReturn(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), 
RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ctx := context.Background() ws1, err := rm.Lease(ctx, tangopb.BuildDescription{Remote: remote}) @@ -187,7 +187,7 @@ func TestLease_CtxCanceled(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ws1, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.NoError(t, err) @@ -209,7 +209,7 @@ func TestLease_OriginCloneFails(t *testing.T) { remote := "git@github.com:org/repo" g.EXPECT().Clone(gomock.Any(), remote, filepath.Join(root, "org/repo"), "-c", "gc.auto=0").Return(assert.AnError) - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) _, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.Error(t, err) assert.Contains(t, err.Error(), "clone origin") @@ -228,7 +228,7 @@ func TestLease_WorkerCloneFails(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(assert.AnError) - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, 
".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) _, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.Error(t, err) assert.Contains(t, err.Error(), "create worker") @@ -247,7 +247,7 @@ func TestLease_DiscoversExistingWorker(t *testing.T) { require.NoError(t, os.MkdirAll(filepath.Join(root, ".workers", "org/repo", "worker-1", ".git"), 0o755)) // No Clone calls — everything already exists - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ws, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.NoError(t, err) assert.Contains(t, ws.Path(), "worker-1") @@ -273,7 +273,7 @@ func TestLease_DifferentRepos_IndependentPools(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote2, origin2, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), origin2, worker2, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ctx := context.Background() // Both repos can be leased concurrently even with pool size 1 @@ -306,7 +306,7 @@ func TestLease_WorkerCloneFails_SlotReturnedToPool(t *testing.T) { g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil), ) - rm := NewRepoManager(context.Background(), Params{Git: g, Logger: 
zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ctx := context.Background() // First attempt fails diff --git a/example/BUILD.bazel b/example/BUILD.bazel index 0350ddd..3ed0b18 100644 --- a/example/BUILD.bazel +++ b/example/BUILD.bazel @@ -7,13 +7,12 @@ go_library( visibility = ["//visibility:private"], deps = [ "//config", - "//controller", "//core/git", - "//core/repomanager", "//core/storage", "//core/storage/disk", - "//orchestrator", + "//module", "//tangopb", + "@org_uber_go_fx//:fx", "@org_uber_go_yarpc//:yarpc", "@org_uber_go_yarpc//api/transport", "@org_uber_go_yarpc//transport/grpc", diff --git a/example/main.go b/example/main.go index 2f4077a..300c9bd 100644 --- a/example/main.go +++ b/example/main.go @@ -19,18 +19,15 @@ import ( "fmt" "net" "os" - "os/signal" "path/filepath" - "syscall" "github.com/uber/tango/config" - "github.com/uber/tango/controller" "github.com/uber/tango/core/git" - "github.com/uber/tango/core/repomanager" "github.com/uber/tango/core/storage" "github.com/uber/tango/core/storage/disk" - "github.com/uber/tango/orchestrator" + tangomodule "github.com/uber/tango/module" pb "github.com/uber/tango/tangopb" + "go.uber.org/fx" "go.uber.org/yarpc" "go.uber.org/yarpc/api/transport" yarpcgrpc "go.uber.org/yarpc/transport/grpc" @@ -38,101 +35,118 @@ import ( ) func main() { - if err := run(); err != nil { - fmt.Fprintf(os.Stderr, "Failed to start: %v\n", err) - os.Exit(1) - } + fx.New( + tangomodule.Module, + fx.Provide( + provideConfig, + provideStorage, + provideLoggers, + provideGit, + provideAppCtx, + ), + fx.Invoke(startServer), + ).Run() } -func run() error { - zl, _ := zap.NewDevelopment() - defer zl.Sync() - logger := zl.Sugar() - - // appCtx is the application-lifetime context. 
It is cancelled on - // SIGINT/SIGTERM so background work (e.g. the controller's async - // cache write) is aborted instead of leaking past process exit. - appCtx, stopSignals := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) - defer stopSignals() - +func provideConfig() (*config.Config, config.RepositoryConfigProvider, error) { configFilePath := filepath.Join("example", "tango-config.yaml") cfg, err := config.Parse(configFilePath) if err != nil { - return fmt.Errorf("failed to parse config: %w", err) + return nil, nil, fmt.Errorf("failed to parse config: %w", err) } + return cfg, cfg, nil +} - store, err := newStorage(cfg.Storage) - if err != nil { - return fmt.Errorf("failed to create storage: %w", err) +func provideStorage(cfg *config.Config) (storage.Storage, error) { + return newStorage(cfg.Storage) +} + +type loggerResult struct { + fx.Out + Logger *zap.Logger + SugaredLogger *zap.SugaredLogger +} + +func provideLoggers() loggerResult { + zl, _ := zap.NewDevelopment() + return loggerResult{ + Logger: zl, + SugaredLogger: zl.Sugar(), } - logger.Infof("Using storage type: %s", cfg.Storage.Type) +} - // Repo manager and orchestrator +type serviceParams struct { + fx.Out + RepoManagerClonePath string `name:"repoManagerClonePath"` + WorkerRootPath string `name:"workerRootPath"` + WorkerPoolSize int `name:"workerPoolSize"` +} + +func provideGit(lc fx.Lifecycle, cfg *config.Config) (git.Interface, serviceParams, error) { repoManagerClonePath := cfg.Service.RepoManagerClonePath workerRootPath := cfg.Service.WorkerRootPath if err := os.MkdirAll(repoManagerClonePath, 0o755); err != nil { - return fmt.Errorf("failed to create repo manager clone path: %w", err) + return nil, serviceParams{}, fmt.Errorf("failed to create repo manager clone path: %w", err) } - defer os.RemoveAll(repoManagerClonePath) if err := os.MkdirAll(workerRootPath, 0o755); err != nil { - return fmt.Errorf("failed to create worker root path: %w", err) + return nil, 
serviceParams{}, fmt.Errorf("failed to create worker root path: %w", err) } - defer os.RemoveAll(workerRootPath) - - rm := repomanager.NewRepoManager(appCtx, repomanager.Params{ - Git: git.New(repoManagerClonePath), - Logger: logger, + lc.Append(fx.StopHook(func() { + os.RemoveAll(repoManagerClonePath) + os.RemoveAll(workerRootPath) + })) + return git.New(repoManagerClonePath), serviceParams{ RepoManagerClonePath: repoManagerClonePath, WorkerRootPath: workerRootPath, - PoolSize: cfg.Service.WorkerPoolSize, - }) - orch := orchestrator.NewNativeOrchestrator(appCtx, orchestrator.Params{ - Storage: store, - RepoManager: rm, - Logger: logger, - GitFactory: git.New, - ConfigFilePath: configFilePath, - }) + WorkerPoolSize: cfg.Service.WorkerPoolSize, + }, nil +} - // Controller (YARPC server implementation). appCtx is forwarded so the - // controller's background goroutines are tied to process lifetime. - ctrl := controller.NewController(appCtx, controller.Params{ - Logger: zl, - Storage: store, - Orchestrator: orch, - }) +type appCtxResult struct { + fx.Out + AppCtx context.Context `name:"appCtx"` +} + +func provideAppCtx(lc fx.Lifecycle) appCtxResult { + ctx, cancel := context.WithCancel(context.Background()) + lc.Append(fx.StopHook(cancel)) + return appCtxResult{AppCtx: ctx} +} - // YARPC transports and dispatcher +type serverParams struct { + fx.In + Server pb.TangoYARPCServer + Logger *zap.SugaredLogger +} + +func startServer(lc fx.Lifecycle, p serverParams) error { grpcTransport := yarpcgrpc.NewTransport() port := "127.0.0.1:8081" grpcListener, err := net.Listen("tcp", port) if err != nil { return fmt.Errorf("failed to listen on gRPC port: %w", err) } - - inbounds := []transport.Inbound{ - grpcTransport.NewInbound(grpcListener), - } dispatcher := yarpc.NewDispatcher(yarpc.Config{ - Name: "tango", - Inbounds: inbounds, + Name: "tango", + Inbounds: []transport.Inbound{ + grpcTransport.NewInbound(grpcListener), + }, + }) + 
dispatcher.Register(pb.BuildTangoYARPCProcedures(p.Server)) + + lc.Append(fx.Hook{ + OnStart: func(context.Context) error { + p.Logger.Infof("Tango server starting:") + p.Logger.Infof("- gRPC inbound: %s", port) + return dispatcher.Start() + }, + OnStop: func(context.Context) error { + return dispatcher.Stop() + }, }) - dispatcher.Register(pb.BuildTangoYARPCProcedures(ctrl)) - - if err := dispatcher.Start(); err != nil { - return fmt.Errorf("failed to start dispatcher: %w", err) - } - defer dispatcher.Stop() - - logger.Infof("Tango server is running:") - logger.Infof("- gRPC inbound: %s", port) - logger.Infof("Press Ctrl+C to stop.") - // Block until SIGINT/SIGTERM cancels appCtx. - <-appCtx.Done() return nil } -// newStorage creates a Storage implementation based on the provided configuration. func newStorage(cfg config.StorageConfig) (storage.Storage, error) { switch cfg.Type { case config.StorageTypeMemory, "": diff --git a/module/BUILD.bazel b/module/BUILD.bazel new file mode 100644 index 0000000..c0acaf4 --- /dev/null +++ b/module/BUILD.bazel @@ -0,0 +1,15 @@ +load("@rules_go//go:def.bzl", "go_library") + +go_library( + name = "module", + srcs = ["module.go"], + importpath = "github.com/uber/tango/module", + visibility = ["//visibility:public"], + deps = [ + "//controller", + "//core/repomanager", + "//orchestrator", + "//tangopb", + "@org_uber_go_fx//:fx", + ], +) diff --git a/module/module.go b/module/module.go new file mode 100644 index 0000000..bf2766b --- /dev/null +++ b/module/module.go @@ -0,0 +1,50 @@ +// Copyright (c) 2025 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package module provides the Fx dependency injection wiring for all Tango layers. +package module + +import ( + "github.com/uber/tango/controller" + "github.com/uber/tango/core/repomanager" + "github.com/uber/tango/orchestrator" + pb "github.com/uber/tango/tangopb" + "go.uber.org/fx" +) + +// Module wires all Tango service layers: controller, orchestrator, and repo manager. +// +// Callers must supply the following values before including this module: +// - context.Context named "appCtx" — the application-lifetime context (used by the controller for background work) +// - *zap.Logger and *zap.SugaredLogger — structured loggers +// - storage.Storage — blob storage backend +// - config.RepositoryConfigProvider — repository configuration +// - git.Interface — git CLI wrapper (for repo manager origin operations) +// - string named "repoManagerClonePath" — directory for origin clones +// - string named "workerRootPath" — directory for worker clones +// - int named "workerPoolSize" — max workers per repo +var Module = fx.Module("tango", + fx.Provide( + repomanager.NewRepoManager, + fx.Annotate( + orchestrator.NewNativeOrchestrator, + fx.As(new(orchestrator.Orchestrator)), + ), + fx.Annotate( + controller.NewController, + fx.ParamTags(`name:"appCtx"`, ``), + fx.As(new(pb.TangoYARPCServer)), + ), + ), +) diff --git a/orchestrator/BUILD.bazel b/orchestrator/BUILD.bazel index 64ceb5c..fd55806 100644 --- a/orchestrator/BUILD.bazel +++ b/orchestrator/BUILD.bazel @@ -20,6 +20,7 @@ go_library( "//graphrunner", "//tangopb", 
"@com_github_uber_go_tally//:tally", + "@org_uber_go_fx//:fx", "@org_uber_go_zap//:zap", ], ) @@ -30,13 +31,16 @@ go_test( data = glob(["testdata/**"]), embed = [":orchestrator"], deps = [ + "//config", "//core/git", "//core/git/gitmock", "//core/repomanager/mock", "//core/storage", "//core/storage/storagemock", "//core/targethasher", + "//core/workspace", "//core/workspace/workspacemock", + "//graphrunner", "//graphrunner/mock", "//tangopb", "@com_github_gogo_protobuf//io", diff --git a/orchestrator/native_orchestrator.go b/orchestrator/native_orchestrator.go index 58d7ac5..0fba797 100644 --- a/orchestrator/native_orchestrator.go +++ b/orchestrator/native_orchestrator.go @@ -20,7 +20,6 @@ import ( "errors" "fmt" "os" - "time" "github.com/uber-go/tally" @@ -32,142 +31,160 @@ import ( "github.com/uber/tango/core/storage" "github.com/uber/tango/core/workspace" "github.com/uber/tango/graphrunner" + pb "github.com/uber/tango/tangopb" + "go.uber.org/fx" "go.uber.org/zap" ) +// GraphRunnerFactory creates a GraphRunner for a materialized workspace. +// Set at construction time so the request path has no nil-check branching. +type GraphRunnerFactory func(ctx context.Context, ws workspace.Workspace, gitClient git.Interface, repoCfg config.RepositoryConfig, extraExcludedFiles []string) (graphrunner.GraphRunner, error) + // nativeOrchestrator implements native version of Orchestrator type nativeOrchestrator struct { - storage storage.Storage - repoManager repomanager.RepoManager - logger *zap.SugaredLogger - scope tally.Scope - // gitFactory allows injecting a git.Interface constructor for testing - gitFactory func(directory string) git.Interface - graphRunner graphrunner.GraphRunner - configFilePath string - - // appCtx represents the app's overall lifetime. It is passed in by the - // caller at construction and is expected to be cancelled when the whole - // application is shutting down (e.g. on SIGTERM/SIGINT). 
Any future - // fire-and-forget goroutines this orchestrator starts should use this - // context instead of context.Background() so they abort promptly on - // shutdown rather than running unbounded past server teardown. - // - // Per-request cancellation should still use the request's own context; - // appCtx is only for work that intentionally outlives the request. - appCtx context.Context + storage storage.Storage + repoManager repomanager.RepoManager + logger *zap.SugaredLogger + gitFactory func(directory string) git.Interface + graphRunnerFactory GraphRunnerFactory + config config.RepositoryConfigProvider + + // metrics are pre-created at construction time. + getTargetGraphScope tally.Scope + getTargetGraphCalls tally.Counter + getTargetGraphSuccess tally.Counter + getTargetGraphFailure tally.Counter } type Params struct { - Storage storage.Storage - RepoManager repomanager.RepoManager - Logger *zap.SugaredLogger - Scope tally.Scope - GitFactory func(directory string) git.Interface - GraphRunner graphrunner.GraphRunner - ConfigFilePath string + fx.In + + Storage storage.Storage + RepoManager repomanager.RepoManager + Logger *zap.SugaredLogger + Scope tally.Scope `optional:"true"` + GitFactory func(directory string) git.Interface `optional:"true"` + GraphRunnerFactory GraphRunnerFactory `optional:"true"` + Config config.RepositoryConfigProvider } // NewNativeOrchestrator creates a new native orchestrator with the given parameters. -// -// appCtx is the application-lifetime context. Cancel it when the process is -// shutting down (e.g. wire it to SIGTERM/SIGINT in main) to abort any -// background goroutines the orchestrator spawns. 
-func NewNativeOrchestrator(appCtx context.Context, p Params) Orchestrator { +func NewNativeOrchestrator(p Params) Orchestrator { scope := p.Scope if scope == nil { scope = tally.NoopScope } + gitFactory := p.GitFactory + if gitFactory == nil { + gitFactory = git.New + } + orchScope := scope.SubScope("orchestrator") + grFactory := p.GraphRunnerFactory + if grFactory == nil { + grFactory = defaultGraphRunnerFactory(p.Logger, orchScope) + } + gtgScope := orchScope.SubScope("get_target_graph") return &nativeOrchestrator{ - storage: p.Storage, - repoManager: p.RepoManager, - logger: p.Logger, - scope: scope.SubScope("orchestrator"), - gitFactory: p.GitFactory, - graphRunner: p.GraphRunner, - configFilePath: p.ConfigFilePath, - appCtx: appCtx, + storage: p.Storage, + repoManager: p.RepoManager, + logger: p.Logger, + gitFactory: gitFactory, + graphRunnerFactory: grFactory, + config: p.Config, + getTargetGraphScope: gtgScope, + getTargetGraphCalls: gtgScope.Counter("calls"), + getTargetGraphSuccess: gtgScope.Counter("success"), + getTargetGraphFailure: gtgScope.Counter("failure"), } } -// GetTargetGraph is used to compute the target graph locally. -// It leases a workspace, checks out the base revision, applies the change requests, and computes the target graph. 
-func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTargetGraphParam) (_ storage.GraphReader, retErr error) { - scope := b.scope.SubScope("get_target_graph") - scope.Counter("calls").Inc(1) - defer func() { - if retErr != nil { - scope.Counter("failure").Inc(1) - var ce common.ClassifiedError - if !errors.As(retErr, &ce) { - ce = common.WithReason(common.FailureReasonUnknown, common.ErrorTypeInfra, retErr) - } - scope.Tagged(map[string]string{ - "failure_type": ce.Type(), - "failure_reason": ce.Reason(), - }).Counter("failure_type").Inc(1) - } else { - scope.Counter("success").Inc(1) +// defaultGraphRunnerFactory returns a factory that creates a NativeGraphRunner +// backed by a bazel client for the given workspace. +func defaultGraphRunnerFactory(logger *zap.SugaredLogger, scope tally.Scope) GraphRunnerFactory { + return func(ctx context.Context, ws workspace.Workspace, gitClient git.Interface, repoCfg config.RepositoryConfig, extraExcludedFiles []string) (graphrunner.GraphRunner, error) { + client, err := bazel.NewBazelClient(ctx, bazel.Params{ + WorkspacePath: ws.Path(), + Logger: logger, + BazelCommand: repoCfg.BazelCommand, + QueryTimeout: time.Duration(repoCfg.QueryTimeout) * time.Second, + StreamLogs: repoCfg.StreamBazelLogs, + }) + if err != nil { + return nil, fmt.Errorf("create bazel client: %w", err) } - }() - logger := b.logger.With(zap.Any("build_description", param.Req.BuildDescription)) - logger.Infow("GetTargetGraph: Processing request") - - // parse the config file - cfg, err := config.Parse(b.configFilePath) - if err != nil { - return nil, fmt.Errorf("parse config %q: %w", b.configFilePath, err) + return graphrunner.NewNativeGraphRunner(graphrunner.NativeGraphRunnerParams{ + BazelClient: client, + GitClient: gitClient, + Config: repoCfg, + ExtraExcludedFiles: extraExcludedFiles, + Scope: scope, + }), nil } - remote := param.Req.BuildDescription.Remote - repoCfg, ok := cfg.GetRepositoryConfig(remote) +} + +// 
preparedWorkspace holds the result of leasing and preparing a workspace. +type preparedWorkspace struct { + ws workspace.Workspace + git git.Interface + repoCfg config.RepositoryConfig +} + +// prepareWorkspace leases a workspace, checks out the base revision, and applies +// change requests. The returned cleanup function releases the workspace and must +// be deferred by the caller. +func (b *nativeOrchestrator) prepareWorkspace(ctx context.Context, desc *pb.BuildDescription, logger *zap.SugaredLogger) (_ *preparedWorkspace, cleanup func(), retErr error) { + repoCfg, ok := b.config.GetRepositoryConfig(desc.Remote) if !ok { - return nil, fmt.Errorf("no repository configuration found for remote %q", remote) + return nil, nil, fmt.Errorf("no repository configuration found for remote %q", desc.Remote) } - ws, err := b.repoManager.Lease(ctx, *param.Req.BuildDescription) + ws, err := b.repoManager.Lease(ctx, *desc) if err != nil { - return nil, fmt.Errorf("lease workspace: %w", err) + return nil, nil, fmt.Errorf("lease workspace: %w", err) } - defer func() { - err := ws.Release() - if err != nil { - // clean up the workspace if release fails. 
+ cleanup = func() { + if err := ws.Release(); err != nil { if removeErr := os.RemoveAll(ws.Path()); removeErr != nil { logger.Errorw("GetTargetGraph: Failed to remove workspace", zap.Error(removeErr)) } } + } + defer func(release func()) { // release is bound when defer executes: an error return sets the named result cleanup to nil before deferred calls run + if retErr != nil { + release() + } - }() + }(cleanup) - err = ws.Checkout(ctx, param.Req.BuildDescription.Remote, param.Req.BuildDescription.BaseSha) - if err != nil { - return nil, fmt.Errorf("checkout %s@%s: %w", param.Req.BuildDescription.Remote, param.Req.BuildDescription.BaseSha, err) + if err := ws.Checkout(ctx, desc.Remote, desc.BaseSha); err != nil { + return nil, nil, fmt.Errorf("checkout %s@%s: %w", desc.Remote, desc.BaseSha, err) } logger.Infow("GetTargetGraph: Checked out base revision") - requests := make([]workspace.Request, 0, len(param.Req.BuildDescription.Requests)) - factory := b.gitFactory - if factory == nil { - factory = git.New - } - - gitModule := factory(ws.Path()) - for _, req := range param.Req.BuildDescription.Requests { - request, err := workspace.NewRequest(req.GetUrl(), gitModule, param.Req.BuildDescription.BaseSha, req.GetCommit(), logger) + gitModule := b.gitFactory(ws.Path()) + requests := make([]workspace.Request, 0, len(desc.Requests)) + for _, req := range desc.Requests { + r, err := workspace.NewRequest(req.GetUrl(), gitModule, desc.BaseSha, req.GetCommit(), logger) if err != nil { - return nil, fmt.Errorf("create request for %q: %w", req.GetUrl(), err) + return nil, nil, fmt.Errorf("create request for %q: %w", req.GetUrl(), err) } - requests = append(requests, request) + requests = append(requests, r) } - err = ws.ApplyRequests(ctx, requests) - if err != nil { - return nil, fmt.Errorf("apply requests: %w", err) + if err := ws.ApplyRequests(ctx, requests); err != nil { + return nil, nil, fmt.Errorf("apply requests: %w", err) } logger.Infow("GetTargetGraph: Applied requests", zap.Int("request_count", len(requests))) - // Compute the treehash and download the target graph from storage if exists.
- treehash, err := gitModule.RevParse(ctx, "HEAD^{tree}") + return &preparedWorkspace{ws: ws, git: gitModule, repoCfg: repoCfg}, cleanup, nil +} + +// computeGraph computes (or fetches from cache) the target graph for a prepared +// workspace and stores the result. +func (b *nativeOrchestrator) computeGraph(ctx context.Context, pw *preparedWorkspace, param GetTargetGraphParam, logger *zap.SugaredLogger) (storage.GraphReader, error) { + desc := param.Req.BuildDescription + treehash, err := pw.git.RevParse(ctx, "HEAD^{tree}") if err != nil { return nil, fmt.Errorf("compute treehash: %w", err) } - treehashPath := common.GetGraphByTreeHash(param.Req.BuildDescription.Remote, treehash, param.Req.BuildDescription.GetStrategy(), param.Req.GetRequestOptions()) + treehashPath := common.GetGraphByTreeHash(desc.Remote, treehash, desc.GetStrategy(), param.Req.GetRequestOptions()) + if !param.BypassCache { graphReader, err := storage.NewGraphReader(ctx, b.storage, treehashPath) if err == nil { @@ -181,29 +198,12 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget } else { logger.Infow("GetTargetGraph: bypass_cache=true, computing target graph") } - // Compute the target graph and store it in storage. 
- runner := b.graphRunner - if runner == nil { - client, err := bazel.NewBazelClient(ctx, bazel.Params{ - WorkspacePath: ws.Path(), - Logger: b.logger, - BazelCommand: repoCfg.BazelCommand, - QueryTimeout: time.Duration(repoCfg.QueryTimeout) * time.Second, - StreamLogs: repoCfg.StreamBazelLogs, - }) - if err != nil { - return nil, fmt.Errorf("create bazel client: %w", err) - } - // Use default native graph runner - runner = graphrunner.NewNativeGraphRunner(graphrunner.NativeGraphRunnerParams{ - BazelClient: client, - GitClient: gitModule, - Config: repoCfg, - ExtraExcludedFiles: param.Req.GetRequestOptions().GetExtraExcludeFilesRegex(), - Scope: b.scope, - }) + + runner, err := b.graphRunnerFactory(ctx, pw.ws, pw.git, pw.repoCfg, param.Req.GetRequestOptions().GetExtraExcludeFilesRegex()) + if err != nil { + return nil, err } - result, err := runner.Compute(ctx, ws) + result, err := runner.Compute(ctx, pw.ws) if err != nil { return nil, fmt.Errorf("compute target graph: %w", err) } @@ -211,14 +211,11 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget if err != nil { return nil, fmt.Errorf("convert target graph to response: %w", err) } - err = storage.WriteGraphStream(ctx, b.storage, treehashPath, responses) - if err != nil { + if err := storage.WriteGraphStream(ctx, b.storage, treehashPath, responses); err != nil { return nil, fmt.Errorf("write graph to storage at %s: %w", treehashPath, err) } - treehashCachePath := common.GetTreehashCachePath(param.Req.BuildDescription) - treehashReader := bytes.NewReader([]byte(treehash)) - err = b.storage.Put(ctx, storage.UploadRequest{Key: treehashCachePath, Reader: treehashReader}) - if err != nil { + treehashCachePath := common.GetTreehashCachePath(desc) + if err := b.storage.Put(ctx, storage.UploadRequest{Key: treehashCachePath, Reader: bytes.NewReader([]byte(treehash))}); err != nil { return nil, fmt.Errorf("store treehash mapping at %s: %w", treehashCachePath, err) } graphReader, err := 
storage.NewGraphReader(ctx, b.storage, treehashPath) @@ -228,3 +225,34 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget logger.Infow("GetTargetGraph: Done computing and storing target graph", zap.String("treehash", treehash)) return graphReader, nil } + +// GetTargetGraph computes the target graph locally. +func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTargetGraphParam) (_ storage.GraphReader, retErr error) { + b.getTargetGraphCalls.Inc(1) + defer func() { + if retErr != nil { + b.getTargetGraphFailure.Inc(1) + var ce common.ClassifiedError + if !errors.As(retErr, &ce) { + ce = common.WithReason(common.FailureReasonUnknown, common.ErrorTypeInfra, retErr) + } + b.getTargetGraphScope.Tagged(map[string]string{ + "failure_type": ce.Type(), + "failure_reason": ce.Reason(), + }).Counter("failure_type").Inc(1) + } else { + b.getTargetGraphSuccess.Inc(1) + } + }() + + logger := b.logger.With(zap.Any("build_description", param.Req.BuildDescription)) + logger.Infow("GetTargetGraph: Processing request") + + pw, cleanup, err := b.prepareWorkspace(ctx, param.Req.BuildDescription, logger) + if err != nil { + return nil, err + } + defer cleanup() + + return b.computeGraph(ctx, pw, param, logger) +} diff --git a/orchestrator/native_orchestrator_test.go b/orchestrator/native_orchestrator_test.go index 9739ad4..75abac8 100644 --- a/orchestrator/native_orchestrator_test.go +++ b/orchestrator/native_orchestrator_test.go @@ -24,19 +24,29 @@ import ( gogio "github.com/gogo/protobuf/io" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber/tango/config" "github.com/uber/tango/core/git" gitmock "github.com/uber/tango/core/git/gitmock" repomanagermock "github.com/uber/tango/core/repomanager/mock" "github.com/uber/tango/core/storage" storagemock "github.com/uber/tango/core/storage/storagemock" targethasher "github.com/uber/tango/core/targethasher" + "github.com/uber/tango/core/workspace" 
workspacemock "github.com/uber/tango/core/workspace/workspacemock" + "github.com/uber/tango/graphrunner" graphmock "github.com/uber/tango/graphrunner/mock" pb "github.com/uber/tango/tangopb" "go.uber.org/mock/gomock" "go.uber.org/zap/zaptest" ) +func testConfig(t *testing.T) *config.Config { + t.Helper() + cfg, err := config.Parse("testdata/config.yaml") + require.NoError(t, err) + return cfg +} + func TestNative_GetTargetGraph_Success(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() @@ -63,12 +73,12 @@ func TestNative_GetTargetGraph_Success(t *testing.T) { rm := repomanagermock.NewMockRepoManager(ctrl) rm.EXPECT().Lease(gomock.Any(), gomock.Any()).Return(ws, nil) - o := NewNativeOrchestrator(context.Background(), Params{ - Storage: st, - RepoManager: rm, - Logger: zaptest.NewLogger(t).Sugar(), - GitFactory: func(dir string) git.Interface { return g }, - ConfigFilePath: "testdata/config.yaml", + o := NewNativeOrchestrator(Params{ + Storage: st, + RepoManager: rm, + Logger: zaptest.NewLogger(t).Sugar(), + GitFactory: func(dir string) git.Interface { return g }, + Config: testConfig(t), }) reader, err := o.GetTargetGraph(context.Background(), GetTargetGraphParam{ Req: &pb.GetTargetGraphRequest{ @@ -119,13 +129,15 @@ func TestNative_GetTargetGraph_TreehashNotFound_NoError(t *testing.T) { RuleType: "go_library", }, }}, nil) - o := NewNativeOrchestrator(context.Background(), Params{ - Storage: st, - RepoManager: rm, - Logger: zaptest.NewLogger(t).Sugar(), - GitFactory: func(dir string) git.Interface { return g }, - GraphRunner: graphRunner, - ConfigFilePath: "testdata/config.yaml", + o := NewNativeOrchestrator(Params{ + Storage: st, + RepoManager: rm, + Logger: zaptest.NewLogger(t).Sugar(), + GitFactory: func(dir string) git.Interface { return g }, + GraphRunnerFactory: func(context.Context, workspace.Workspace, git.Interface, config.RepositoryConfig, []string) (graphrunner.GraphRunner, error) { + return graphRunner, nil + }, + Config: testConfig(t), 
}) reader, err := o.GetTargetGraph(context.Background(), GetTargetGraphParam{ Req: &pb.GetTargetGraphRequest{BuildDescription: &pb.BuildDescription{Remote: "git@github:uber/tango", BaseSha: "1234567890"}}, @@ -151,12 +163,12 @@ func TestNative_GetTargetGraph_RevParseError_Propagates(t *testing.T) { ws.EXPECT().Release().Return(nil) rm := repomanagermock.NewMockRepoManager(ctrl) rm.EXPECT().Lease(gomock.Any(), gomock.Any()).Return(ws, nil) - o := NewNativeOrchestrator(context.Background(), Params{ - Storage: st, - RepoManager: rm, - Logger: zaptest.NewLogger(t).Sugar(), - GitFactory: func(dir string) git.Interface { return g }, - ConfigFilePath: "testdata/config.yaml", + o := NewNativeOrchestrator(Params{ + Storage: st, + RepoManager: rm, + Logger: zaptest.NewLogger(t).Sugar(), + GitFactory: func(dir string) git.Interface { return g }, + Config: testConfig(t), }) resp, err := o.GetTargetGraph(context.Background(), GetTargetGraphParam{ Req: &pb.GetTargetGraphRequest{BuildDescription: &pb.BuildDescription{Remote: "git@github:uber/tango", BaseSha: "1234567890"}}, @@ -190,12 +202,12 @@ func TestNative_GetTargetGraph_AppliesGitHubPR(t *testing.T) { ws.EXPECT().Release().Return(nil) rm := repomanagermock.NewMockRepoManager(ctrl) rm.EXPECT().Lease(gomock.Any(), gomock.Any()).Return(ws, nil) - o := NewNativeOrchestrator(context.Background(), Params{ - Storage: st, - RepoManager: rm, - Logger: zaptest.NewLogger(t).Sugar(), - GitFactory: func(dir string) git.Interface { return g }, - ConfigFilePath: "testdata/config.yaml", + o := NewNativeOrchestrator(Params{ + Storage: st, + RepoManager: rm, + Logger: zaptest.NewLogger(t).Sugar(), + GitFactory: func(dir string) git.Interface { return g }, + Config: testConfig(t), }) reader, err := o.GetTargetGraph(context.Background(), GetTargetGraphParam{ Req: &pb.GetTargetGraphRequest{