Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MODULE.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use_repo(
"org_uber_go_fx",
"org_uber_go_goleak",
"org_uber_go_mock",
"org_uber_go_multierr",
"org_uber_go_yarpc",
"org_uber_go_zap",
)
84 changes: 30 additions & 54 deletions controller/getchangedtargets.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,28 +43,28 @@ type job struct {
func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, stream pb.TangoServiceGetChangedTargetsYARPCServer) (retErr error) {
scope := c.scope.SubScope("get_changed_targets")
scope.Counter("calls").Inc(1)
logger := c.logger
defer func() {
if retErr != nil {
scope.Counter("failure").Inc(1)
emitFailureMetric(scope, retErr)
logger.Error("GetChangedTargets failed", zap.Error(retErr))
} else {
scope.Counter("success").Inc(1)
}
}()
if err := validateGetChangedTargetsRequest(request); err != nil {
return common.WithReason(common.FailureReasonValidation, common.ErrorTypeUser, err)
return fmt.Errorf("validate request: %w", err)
}
scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())})
ctx, cancelLink := c.linkRequestCtx(stream.Context())
defer cancelLink()
start := time.Now()
logger := c.logger.With(
logger = c.logger.With(
zap.Any("first_revision", request.GetFirstRevision()),
zap.Any("second_revision", request.GetSecondRevision()),
)

logger.Info("GetChangedTargets: Processing request")

// Default max_distance to -1 (no filtering) when the client omits OutputConfig
// entirely. When OutputConfig is supplied, take max_distance at face value —
// see proto/tango.proto OutputConfig.max_distance for the wire-default caveat.
Expand All @@ -82,14 +82,13 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
cacheStart := time.Now()
treehash1, treehash2, err := readTreehashParallel(ctx, c.storage, request.GetFirstRevision(), request.GetSecondRevision())
if err != nil {
logger.Error("GetChangedTargets: Failed to read revision treehash", zap.Error(err))
return common.WithReason(failureReasonTreehashRead, common.ErrorTypeInfra, err)
return fmt.Errorf("read revision treehash: %w", err)
}
if treehash1 != "" && treehash2 != "" {
cacheKey := common.GetComparedTargetsCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions())
cachedReader, cacheErr := storage.NewChangedTargetsReader(ctx, c.storage, cacheKey)
if cacheErr != nil && !storage.IsNotFound(cacheErr) {
logger.Warn("GetChangedTargets: Failed to read from cache, proceeding to compute", zap.Error(cacheErr))
return fmt.Errorf("failed to read from cache: %w", cacheErr)
} else if cachedReader != nil {
// Buffer all responses before sending any. A concurrent goroutine write may have
// left a partial blob in storage; buffering lets us detect corruption and fall
Expand All @@ -98,9 +97,10 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
var readErr error
for {
if err := ctx.Err(); err != nil {
cachedReader.Close()
// Client gave up while we were draining the cache. Surface as a user-cancelled error.
return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, err)
if closeErr := cachedReader.Close(); closeErr != nil {
return fmt.Errorf("close cached reader: %w (original: %w)", closeErr, err)
}
return err
}
var resp *pb.GetChangedTargetsResponse
resp, readErr = cachedReader.Read()
Expand All @@ -113,30 +113,23 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
}
cached = append(cached, resp)
}
cachedReader.Close()
if closeErr := cachedReader.Close(); closeErr != nil {
return fmt.Errorf("close cached reader: %w", closeErr)
}

if readErr != nil {
// Blob is corrupt (likely an incomplete write). Log and fall through to recompute.
logger.Warn("GetChangedTargets: Cached result is incomplete, recomputing", zap.Error(readErr))
} else {
cacheReadDuration := time.Since(cacheStart)
logger.Info("GetChangedTargets: Cache hit, streaming from storage",
zap.Duration("cache_read_duration", cacheReadDuration),
)
scope.Counter("changed_targets_cache_hit").Inc(1)
scope.Timer("cache_read_duration").Record(cacheReadDuration)
if sendErr := sendTrimmedChangedTargets(stream, cached, maxDist, request.GetOutputConfig()); sendErr != nil {
logger.Error("GetChangedTargets: Failed to send cached response", zap.Error(sendErr))
return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("failed to send cached response: %w", sendErr))
}
totalDuration := time.Since(start)
logger.Info("GetChangedTargets: Successfully streamed from cache",
zap.Duration("total_duration", totalDuration),
)
scope.Timer("total_duration").Record(totalDuration)
scope.Histogram("total_duration.histogram", c.totalDurationBuckets).RecordDuration(totalDuration)
return nil
return fmt.Errorf("read cached response: %w", readErr)
}
cacheReadDuration := time.Since(cacheStart)
scope.Counter("changed_targets_cache_hit").Inc(1)
scope.Timer("cache_read_duration").Record(cacheReadDuration)
if sendErr := sendTrimmedChangedTargets(stream, cached, maxDist, request.GetOutputConfig()); sendErr != nil {
return fmt.Errorf("failed to send cached response: %w", sendErr)
}
totalDuration := time.Since(start)
scope.Timer("total_duration").Record(totalDuration)
scope.Histogram("total_duration.histogram", c.totalDurationBuckets).RecordDuration(totalDuration)
return nil
}
}
}
Expand Down Expand Up @@ -223,14 +216,10 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
}

graphFetchDuration := time.Since(graphFetchStart)
logger.Info("GetChangedTargets: Both graphs fetched",
zap.Duration("graph_fetch_duration", graphFetchDuration),
)
scope.Timer("graph_fetch_duration").Record(graphFetchDuration)

if ctx.Err() != nil {
// If the context was cancelled upstream, just return the original error without additional augmentation.
return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err())
return ctx.Err()
}

// Process errors, aggregating only the original failures and not those that result from the other job being cancelled.
Expand All @@ -255,21 +244,17 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
jobs[1].graphStreamChunks = nil

compareStart := time.Now()
changedTargetsResponses, err := c.compareTargetGraphs(ctx, logger, firstGraph, secondGraph, maxDist)
changedTargetsResponses, err := c.compareTargetGraphs(ctx, firstGraph, secondGraph, maxDist)
// Allow GC of raw graph data while the caching goroutine runs.
firstGraph = nil
secondGraph = nil
if err != nil {
if ctx.Err() != nil {
return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err())
return ctx.Err()
}
logger.Error("GetChangedTargets: Failed to compare target graphs", zap.Error(err))
return common.WithReason(failureReasonCompare, common.ErrorTypeInfra, fmt.Errorf("failed to compare target graphs: %w", err))
return fmt.Errorf("failed to compare target graphs: %w", err)
}
compareDuration := time.Since(compareStart)
logger.Info("GetChangedTargets: Target graphs compared",
zap.Duration("compare_duration", compareDuration),
)
scope.Timer("compare_duration").Record(compareDuration)

// Cache the computed result concurrently so it doesn't block the stream send.
Expand Down Expand Up @@ -306,17 +291,12 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str

sendStart := time.Now()
if err := sendTrimmedChangedTargets(stream, changedTargetsResponses, maxDist, request.GetOutputConfig()); err != nil {
logger.Error("GetChangedTargets: Failed to send response", zap.Error(err))
return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("failed to send response: %w", err))
return fmt.Errorf("failed to send response: %w", err)
}
sendDuration := time.Since(sendStart)
scope.Timer("send_duration").Record(sendDuration)

totalDuration := time.Since(start)
logger.Info("GetChangedTargets: Successfully processed request",
zap.Duration("send_duration", sendDuration),
zap.Duration("total_duration", totalDuration),
)
scope.Timer("total_duration").Record(totalDuration)
scope.Histogram("total_duration.histogram", c.totalDurationBuckets).RecordDuration(totalDuration)
return nil
Expand All @@ -331,10 +311,9 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
// targets get their distance from BFS over the reverse-dep graph.
// Output IDs are re-mapped into a canonical per-call namespace so the
// response metadata only carries the names actually referenced.
func (c *controller) compareTargetGraphs(ctx context.Context, logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32) ([]*pb.GetChangedTargetsResponse, error) {
func (c *controller) compareTargetGraphs(ctx context.Context, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32) ([]*pb.GetChangedTargetsResponse, error) {
start := time.Now()
scope := c.scope.SubScope("compare_target_graphs")
logger.Info("compareTargetGraphs: Computing differences between target graphs")

// 1) Extract targets and metadata; index by canonical names
indexStart := time.Now()
Expand Down Expand Up @@ -592,9 +571,6 @@ func (c *controller) compareTargetGraphs(ctx context.Context, logger *zap.Logger
})
}
totalDuration := time.Since(start)
logger.Info("compareTargetGraphs: Done",
zap.Duration("total_duration", totalDuration),
)
scope.Timer("total_duration").Record(totalDuration)
return results, nil
}
Expand Down
33 changes: 13 additions & 20 deletions controller/getchangedtargets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
gogio "github.com/gogo/protobuf/io"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/tango/core/common"
"github.com/uber/tango/core/storage"
storagemock "github.com/uber/tango/core/storage/storagemock"
orchestratormock "github.com/uber/tango/orchestrator/orchestratormock"
Expand Down Expand Up @@ -146,7 +145,7 @@ func TestCompareTargetGraphs(t *testing.T) {
},
}

response, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, -1)
response, err := c.compareTargetGraphs(context.Background(),[]*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, -1)
require.NoError(t, err)
require.NotNil(t, response)
}
Expand All @@ -158,7 +157,7 @@ func TestGetChangedTargets_ValidationError(t *testing.T) {
c := NewController(context.Background(), Params{Logger: zap.NewNop(), Orchestrator: orchestratormock.NewMockOrchestrator(ctrl)})

err := c.GetChangedTargets(nil, stream)
assert.EqualError(t, err, "request cannot be nil")
assert.EqualError(t, err, "validate request: request cannot be nil")
}

func TestGetChangedTargets_CacheHit(t *testing.T) {
Expand Down Expand Up @@ -219,10 +218,7 @@ func TestGetChangedTargets_TreehashReadError(t *testing.T) {

storagemock := storagemock.NewMockStorage(ctrl)
// A non-NotFound storage error on a treehash read must surface as a failed
// request (with failureReasonTreehashRead) rather than be silently treated
// as a cache miss. Both revision treehashes are read in parallel, so two Get
// calls happen; the handler returns the first failure (and drops the
// cancelled sibling's error) before any graph fetch happens.
// request rather than be silently treated as a cache miss.
injected := errors.New("storage exploded")
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(storage.DownloadResponse{}, injected).Times(2)
Expand All @@ -242,10 +238,7 @@ func TestGetChangedTargets_TreehashReadError(t *testing.T) {
err := c.GetChangedTargets(request, stream)
require.Error(t, err)
require.ErrorIs(t, err, injected)
var ce common.ClassifiedError
require.True(t, errors.As(err, &ce), "expected ClassifiedError, got %T", err)
assert.Equal(t, failureReasonTreehashRead, ce.Reason())
assert.Equal(t, common.ErrorTypeInfra, ce.Type())
assert.Contains(t, err.Error(), "read revision treehash")
}

func TestReadTreehash(t *testing.T) {
Expand Down Expand Up @@ -613,7 +606,7 @@ func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1)
res, err := c.compareTargetGraphs(context.Background(),first, second, -1)
require.NoError(t, err)
require.Len(t, res, 2)
cs := res[0].GetChangedTargets()
Expand Down Expand Up @@ -685,7 +678,7 @@ func TestCompareTargetGraphs_SourceFileDirectAndPropagation(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1)
res, err := c.compareTargetGraphs(context.Background(),first, second, -1)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -758,7 +751,7 @@ func TestCompareTargetGraphs_ChangedRuleUnreachableFromAnySeed(t *testing.T) {
// Hash-only change on a rule with no own-config change and no reachable
// seed: under "trust the hasher" semantics, an orphan CHANGED rule with
// no upstream explanation becomes a distance-0 seed itself.
res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1)
res, err := c.compareTargetGraphs(context.Background(),first, second, -1)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -825,7 +818,7 @@ func TestCompareTargetGraphs_ChangedWhenDependenciesChanged(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1)
res, err := c.compareTargetGraphs(context.Background(),first, second, -1)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -907,7 +900,7 @@ func TestCompareTargetGraphs_ChangedWhenAttributesChanged(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1)
res, err := c.compareTargetGraphs(context.Background(),first, second, -1)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -989,7 +982,7 @@ func TestCompareTargetGraphs_ChangedWhenNewAttributeAdded(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1)
res, err := c.compareTargetGraphs(context.Background(),first, second, -1)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -1329,7 +1322,7 @@ func TestCompareTargetGraphs_HashOnlyChangePropagatesViaBFS(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1)
res, err := c.compareTargetGraphs(context.Background(),first, second, -1)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -1411,7 +1404,7 @@ func TestCompareTargetGraphs_SiblingRuleNotPromotedToSeed(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1)
res, err := c.compareTargetGraphs(context.Background(),first, second, -1)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down Expand Up @@ -1465,7 +1458,7 @@ func TestCompareTargetGraphs_DeletedTargetEmitted(t *testing.T) {
},
},
}
res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1)
res, err := c.compareTargetGraphs(context.Background(),first, second, -1)
require.NoError(t, err)
cs := res[0].GetChangedTargets()
require.NotNil(t, cs)
Expand Down
Loading
Loading