diff --git a/controller/getchangedtargets.go b/controller/getchangedtargets.go index 1014984..fbb30e9 100644 --- a/controller/getchangedtargets.go +++ b/controller/getchangedtargets.go @@ -43,6 +43,7 @@ type job struct { func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, stream pb.TangoServiceGetChangedTargetsYARPCServer) (retErr error) { scope := c.scope.SubScope("get_changed_targets") scope.Counter("calls").Inc(1) + logger := c.logger defer func() { if retErr != nil { scope.Counter("failure").Inc(1) @@ -52,19 +53,18 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str } }() if err := validateGetChangedTargetsRequest(request); err != nil { - return common.WithReason(common.FailureReasonValidation, common.ErrorTypeUser, err) + logger.Error("GetChangedTargets: validation failed", zap.Error(err)) + return fmt.Errorf("validate request: %w", err) } scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())}) ctx, cancelLink := c.linkRequestCtx(stream.Context()) defer cancelLink() start := time.Now() - logger := c.logger.With( + logger = c.logger.With( zap.Any("first_revision", request.GetFirstRevision()), zap.Any("second_revision", request.GetSecondRevision()), ) - logger.Info("GetChangedTargets: Processing request") - // Default max_distance to -1 (no filtering) when the client omits OutputConfig // entirely. When OutputConfig is supplied, take max_distance at face value — // see proto/tango.proto OutputConfig.max_distance for the wire-default caveat. @@ -82,14 +82,15 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str cacheStart := time.Now() treehash1, treehash2, err := readTreehashParallel(ctx, c.storage, request.GetFirstRevision(), request.GetSecondRevision()) if err != nil { - logger.Error("GetChangedTargets: Failed to read revision treehash", zap.Error(err)) - return common.WithReason(failureReasonTreehashRead, common.ErrorTypeInfra, err) + logger.Error("GetChangedTargets: failed to read revision treehash", zap.Error(err)) + return fmt.Errorf("read revision treehash: %w", err) } if treehash1 != "" && treehash2 != "" { cacheKey := common.GetComparedTargetsCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions()) cachedReader, cacheErr := storage.NewChangedTargetsReader(ctx, c.storage, cacheKey) if cacheErr != nil && !storage.IsNotFound(cacheErr) { - logger.Warn("GetChangedTargets: Failed to read from cache, proceeding to compute", zap.Error(cacheErr)) + logger.Error("GetChangedTargets: failed to read from cache", zap.Error(cacheErr)) + return fmt.Errorf("failed to read from cache: %w", cacheErr) } else if cachedReader != nil { // Buffer all responses before sending any. A concurrent goroutine write may have // left a partial blob in storage; buffering lets us detect corruption and fall @@ -98,9 +99,8 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str var readErr error for { if err := ctx.Err(); err != nil { - cachedReader.Close() - // Client gave up while we were draining the cache. Surface as a user-cancelled error. - return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, err) + closeErr := cachedReader.Close() + return errors.Join(err, closeErr) } var resp *pb.GetChangedTargetsResponse resp, readErr = cachedReader.Read() @@ -113,30 +113,26 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str } cached = append(cached, resp) } - cachedReader.Close() + if closeErr := cachedReader.Close(); closeErr != nil { + logger.Error("GetChangedTargets: failed to close cached reader", zap.Error(closeErr)) + return fmt.Errorf("close cached reader: %w", closeErr) + } if readErr != nil { - // Blob is corrupt (likely an incomplete write). Log and fall through to recompute. - logger.Warn("GetChangedTargets: Cached result is incomplete, recomputing", zap.Error(readErr)) - } else { - cacheReadDuration := time.Since(cacheStart) - logger.Info("GetChangedTargets: Cache hit, streaming from storage", - zap.Duration("cache_read_duration", cacheReadDuration), - ) - scope.Counter("changed_targets_cache_hit").Inc(1) - scope.Timer("cache_read_duration").Record(cacheReadDuration) - if sendErr := sendTrimmedChangedTargets(stream, cached, maxDist, request.GetOutputConfig()); sendErr != nil { - logger.Error("GetChangedTargets: Failed to send cached response", zap.Error(sendErr)) - return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("failed to send cached response: %w", sendErr)) - } - totalDuration := time.Since(start) - logger.Info("GetChangedTargets: Successfully streamed from cache", - zap.Duration("total_duration", totalDuration), - ) - scope.Timer("total_duration").Record(totalDuration) - scope.Histogram("total_duration.histogram", c.totalDurationBuckets).RecordDuration(totalDuration) - return nil + logger.Error("GetChangedTargets: failed to read cached response", zap.Error(readErr)) + return fmt.Errorf("read cached response: %w", readErr) } + cacheReadDuration := time.Since(cacheStart) + scope.Counter("changed_targets_cache_hit").Inc(1) + scope.Timer("cache_read_duration").Record(cacheReadDuration) + if sendErr := sendTrimmedChangedTargets(stream, cached, maxDist, request.GetOutputConfig()); sendErr != nil { + logger.Error("GetChangedTargets: failed to send cached response", zap.Error(sendErr)) + return fmt.Errorf("failed to send cached response: %w", sendErr) + } + totalDuration := time.Since(start) + scope.Timer("total_duration").Record(totalDuration) + scope.Histogram("total_duration.histogram", c.totalDurationBuckets).RecordDuration(totalDuration) + return nil } } } @@ -223,14 +219,10 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str } graphFetchDuration := time.Since(graphFetchStart) - logger.Info("GetChangedTargets: Both graphs fetched", - zap.Duration("graph_fetch_duration", graphFetchDuration), - ) scope.Timer("graph_fetch_duration").Record(graphFetchDuration) if ctx.Err() != nil { - // If the context was cancelled by the upstream, just return the original error without additional augmentation - return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err()) + return ctx.Err() } // Process errors, only aggregating the ones that are original ones and not a result of the other job being cancelled @@ -246,6 +238,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str } if err != nil { + logger.Error("GetChangedTargets: failed to get target graphs", zap.Error(err)) return err } firstGraph := jobs[0].graphStreamChunks @@ -255,21 +248,18 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str jobs[1].graphStreamChunks = nil compareStart := time.Now() - changedTargetsResponses, err := c.compareTargetGraphs(ctx, logger, firstGraph, secondGraph, maxDist) + changedTargetsResponses, err := c.compareTargetGraphs(ctx, firstGraph, secondGraph, maxDist) // Allow GC of raw graph data while the caching goroutine runs. firstGraph = nil secondGraph = nil if err != nil { if ctx.Err() != nil { - return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err()) + return ctx.Err() } - logger.Error("GetChangedTargets: Failed to compare target graphs", zap.Error(err)) - return common.WithReason(failureReasonCompare, common.ErrorTypeInfra, fmt.Errorf("failed to compare target graphs: %w", err)) + logger.Error("GetChangedTargets: failed to compare target graphs", zap.Error(err)) + return fmt.Errorf("failed to compare target graphs: %w", err) } compareDuration := time.Since(compareStart) - logger.Info("GetChangedTargets: Target graphs compared", - zap.Duration("compare_duration", compareDuration), - ) scope.Timer("compare_duration").Record(compareDuration) // Cache the computed result concurrently so it doesn't block the stream send. @@ -306,17 +296,13 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str sendStart := time.Now() if err := sendTrimmedChangedTargets(stream, changedTargetsResponses, maxDist, request.GetOutputConfig()); err != nil { - logger.Error("GetChangedTargets: Failed to send response", zap.Error(err)) - return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("failed to send response: %w", err)) + logger.Error("GetChangedTargets: failed to send response", zap.Error(err)) + return fmt.Errorf("failed to send response: %w", err) } sendDuration := time.Since(sendStart) scope.Timer("send_duration").Record(sendDuration) totalDuration := time.Since(start) - logger.Info("GetChangedTargets: Successfully processed request", - zap.Duration("send_duration", sendDuration), - zap.Duration("total_duration", totalDuration), - ) scope.Timer("total_duration").Record(totalDuration) scope.Histogram("total_duration.histogram", c.totalDurationBuckets).RecordDuration(totalDuration) return nil @@ -331,10 +317,9 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str // targets get their distance from BFS over the reverse-dep graph. // Output IDs are re-mapped into a canonical per-call namespace so the // response metadata only carries the names actually referenced. -func (c *controller) compareTargetGraphs(ctx context.Context, logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32) ([]*pb.GetChangedTargetsResponse, error) { +func (c *controller) compareTargetGraphs(ctx context.Context, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32) ([]*pb.GetChangedTargetsResponse, error) { start := time.Now() scope := c.scope.SubScope("compare_target_graphs") - logger.Info("compareTargetGraphs: Computing differences between target graphs") // 1) Extract targets and metadata; index by canonical names indexStart := time.Now() @@ -592,9 +577,6 @@ func (c *controller) compareTargetGraphs(ctx context.Context, logger *zap.Logger }) } totalDuration := time.Since(start) - logger.Info("compareTargetGraphs: Done", - zap.Duration("total_duration", totalDuration), - ) scope.Timer("total_duration").Record(totalDuration) return results, nil } diff --git a/controller/getchangedtargets_test.go b/controller/getchangedtargets_test.go index 1e854fc..1f1c18a 100644 --- a/controller/getchangedtargets_test.go +++ b/controller/getchangedtargets_test.go @@ -27,7 +27,6 @@ import ( gogio "github.com/gogo/protobuf/io" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uber/tango/core/common" "github.com/uber/tango/core/storage" storagemock "github.com/uber/tango/core/storage/storagemock" orchestratormock "github.com/uber/tango/orchestrator/orchestratormock" @@ -146,7 +145,7 @@ func TestCompareTargetGraphs(t *testing.T) { }, } - response, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, -1) + response, err := c.compareTargetGraphs(context.Background(), []*pb.GetTargetGraphResponse{firstGraph}, []*pb.GetTargetGraphResponse{secondGraph}, -1) require.NoError(t, err) require.NotNil(t, response) } @@ -158,7 +157,7 @@ func TestGetChangedTargets_ValidationError(t *testing.T) { c := NewController(context.Background(), Params{Logger: zap.NewNop(), Orchestrator: orchestratormock.NewMockOrchestrator(ctrl)}) err := c.GetChangedTargets(nil, stream) - assert.EqualError(t, err, "request cannot be nil") + assert.EqualError(t, err, "validate request: request cannot be nil") } func TestGetChangedTargets_CacheHit(t *testing.T) { @@ -219,10 +218,7 @@ func TestGetChangedTargets_TreehashReadError(t *testing.T) { storagemock := storagemock.NewMockStorage(ctrl) // A non-NotFound storage error on a treehash read must surface as a failed - // request (with failureReasonTreehashRead) rather than be silently treated - // as a cache miss. Both revision treehashes are read in parallel, so two Get - // calls happen; the handler returns the first failure (and drops the - // cancelled sibling's error) before any graph fetch happens. + // request rather than be silently treated as a cache miss. injected := errors.New("storage exploded") storagemock.EXPECT().Get(gomock.Any(), gomock.Any()). Return(storage.DownloadResponse{}, injected).Times(2) @@ -242,10 +238,7 @@ func TestGetChangedTargets_TreehashReadError(t *testing.T) { err := c.GetChangedTargets(request, stream) require.Error(t, err) require.ErrorIs(t, err, injected) - var ce common.ClassifiedError - require.True(t, errors.As(err, &ce), "expected ClassifiedError, got %T", err) - assert.Equal(t, failureReasonTreehashRead, ce.Reason()) - assert.Equal(t, common.ErrorTypeInfra, ce.Type()) + assert.Contains(t, err.Error(), "read revision treehash") } func TestReadTreehash(t *testing.T) { @@ -613,7 +606,7 @@ func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := c.compareTargetGraphs(context.Background(), first, second, -1) require.NoError(t, err) require.Len(t, res, 2) cs := res[0].GetChangedTargets() @@ -685,7 +678,7 @@ func TestCompareTargetGraphs_SourceFileDirectAndPropagation(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := c.compareTargetGraphs(context.Background(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -758,7 +751,7 @@ func TestCompareTargetGraphs_ChangedRuleUnreachableFromAnySeed(t *testing.T) { // Hash-only change on a rule with no own-config change and no reachable // seed: under "trust the hasher" semantics, an orphan CHANGED rule with // no upstream explanation becomes a distance-0 seed itself. - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := c.compareTargetGraphs(context.Background(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -825,7 +818,7 @@ func TestCompareTargetGraphs_ChangedWhenDependenciesChanged(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := c.compareTargetGraphs(context.Background(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -907,7 +900,7 @@ func TestCompareTargetGraphs_ChangedWhenAttributesChanged(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := c.compareTargetGraphs(context.Background(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -989,7 +982,7 @@ func TestCompareTargetGraphs_ChangedWhenNewAttributeAdded(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := c.compareTargetGraphs(context.Background(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -1329,7 +1322,7 @@ func TestCompareTargetGraphs_HashOnlyChangePropagatesViaBFS(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := c.compareTargetGraphs(context.Background(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -1411,7 +1404,7 @@ func TestCompareTargetGraphs_SiblingRuleNotPromotedToSeed(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := c.compareTargetGraphs(context.Background(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) @@ -1465,7 +1458,7 @@ func TestCompareTargetGraphs_DeletedTargetEmitted(t *testing.T) { }, }, } - res, err := c.compareTargetGraphs(context.Background(), zap.NewNop(), first, second, -1) + res, err := c.compareTargetGraphs(context.Background(), first, second, -1) require.NoError(t, err) cs := res[0].GetChangedTargets() require.NotNil(t, cs) diff --git a/controller/gettargetgraph.go b/controller/gettargetgraph.go index fa0450e..b947062 100644 --- a/controller/gettargetgraph.go +++ b/controller/gettargetgraph.go @@ -33,6 +33,12 @@ import ( func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb.TangoServiceGetTargetGraphYARPCServer) (retErr error) { scope := c.scope.SubScope("get_target_graph") scope.Counter("calls").Inc(1) + start := time.Now() + ctx, cancelLink := c.linkRequestCtx(stream.Context()) + defer cancelLink() + logger := c.logger.With( + zap.Any("build_description", request.GetBuildDescription()), + ) defer func() { if retErr != nil { scope.Counter("failure").Inc(1) @@ -41,15 +47,10 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb scope.Counter("success").Inc(1) } }() - start := time.Now() - ctx, cancelLink := c.linkRequestCtx(stream.Context()) - defer cancelLink() - logger := c.logger.With( - zap.Any("build_description", request.GetBuildDescription()), - ) scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetBuildDescription().GetRemote())}) graphReader, err := c.getGraph(ctx, request.GetBuildDescription(), request.GetOutputConfig(), request.GetRequestOptions(), request.GetBypassCache()) if err != nil { + logger.Error("GetTargetGraph: failed to get graph", zap.Error(err)) return err } if graphReader == nil { @@ -64,20 +65,18 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb if err == io.EOF { sendDuration := time.Since(sendStart) totalDuration := time.Since(start) - logger.Info("GetTargetGraph: Done streaming", - zap.Duration("send_duration", sendDuration), - zap.Duration("total_duration", totalDuration), - ) scope.Timer("send_duration").Record(sendDuration) scope.Timer("total_duration").Record(totalDuration) return nil } if err != nil { + logger.Error("GetTargetGraph: failed to read graph stream", zap.Error(err)) return common.WithReason(failureReasonGraphFetch, common.ErrorTypeInfra, err) } toSend := applyOptimizedTargetsOutputConfigToChunk(graphStreamChunk, outputConfig) err = stream.Send(toSend) if err != nil { + logger.Error("GetTargetGraph: failed to send graph", zap.Error(err)) return common.WithReason(failureReasonSend, common.ErrorTypeInfra, fmt.Errorf("send graph: %w", err)) } } @@ -94,48 +93,34 @@ func (c *controller) getGraph(ctx context.Context, buildDescription *pb.BuildDes if buildDescription.GetBaseSha() == "" || buildDescription.GetRemote() == "" { return nil, fmt.Errorf("build description is missing required fields: base_sha: %s, remote: %s", buildDescription.GetBaseSha(), buildDescription.GetRemote()) } - logger := c.logger.With( - zap.Any("build_description", buildDescription), - ) if !bypassCache { // Look up the the git treehash based on cache path treehashCachePath := common.GetTreehashCachePath(buildDescription) treehashResponse, err := c.storage.Get(ctx, storage.DownloadRequest{Key: treehashCachePath}) if err != nil { if storage.IsNotFound(err) { - // Cache miss - blob doesn't exist, need to compute and store target graph - logger.Debug("getGraph: treehash not found", zap.Error(err)) + // Cache miss - fall through to compute } else { - // Other errors (network, infra issues) should be retried - logger.Error("getGraph: Storage error", zap.Error(err)) - return nil, err + return nil, fmt.Errorf("storage error reading treehash: %w", err) } } else { defer treehashResponse.ReadCloser.Close() treehashBytes, err := io.ReadAll(treehashResponse.ReadCloser) if err != nil { - logger.Error("getGraph: Error reading treehash", zap.Error(err)) - return nil, err + return nil, fmt.Errorf("read treehash: %w", err) } - logger.Info("getGraph: treehash found") treehashPath := common.GetGraphByTreeHash(buildDescription.GetRemote(), string(treehashBytes), buildDescription.GetStrategy(), requestOptions) // Download the target graph based on treehash. storageStart := time.Now() graphReader, err := storage.NewGraphReader(ctx, c.storage, treehashPath) if err != nil { if ctx.Err() != nil { - return nil, common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err()) + return nil, fmt.Errorf("context cancelled: %w", ctx.Err()) } if !storage.IsNotFound(err) { - logger.Error("getGraph: Error reading graph from Storage", zap.Error(err)) - return nil, common.WithReason(failureReasonGraphFetch, common.ErrorTypeInfra, err) + return nil, fmt.Errorf("read graph from storage: %w", err) } - logger.Warn("getGraph: graph not found at treehash path", zap.Error(err)) } else { - logger.Info("getGraph: loaded graph from storage", - zap.Duration("storage_duration", time.Since(storageStart)), - zap.Duration("total_duration", time.Since(start)), - ) scope := c.scope.SubScope("get_graph") scope.Counter("graph_cache_hit").Inc(1) scope.Timer("storage_duration").Record(time.Since(storageStart)) @@ -143,25 +128,15 @@ func (c *controller) getGraph(ctx context.Context, buildDescription *pb.BuildDes return graphReader, nil } } - } else { - logger.Info("getGraph: bypass_cache=true, skipping cache lookup") } computeStart := time.Now() graphReader, err := c.orchestrator.GetTargetGraph(ctx, orchestrator.GetTargetGraphParam{Req: &pb.GetTargetGraphRequest{BuildDescription: buildDescription, OutputConfig: outputConfig, RequestOptions: requestOptions}, BypassCache: bypassCache}) if err != nil { if ctx.Err() != nil { - return nil, common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err()) + return nil, fmt.Errorf("context cancelled: %w", ctx.Err()) } - var ce common.ClassifiedError - if errors.As(err, &ce) { - return nil, err - } - return nil, common.WithReason(failureReasonGraphFetch, common.ErrorTypeInfra, err) + return nil, fmt.Errorf("compute target graph: %w", err) } - logger.Info("getGraph: computed target graph", - zap.Duration("compute_duration", time.Since(computeStart)), - zap.Duration("total_duration", time.Since(start)), - ) scope := c.scope.SubScope("get_graph") scope.Timer("compute_duration").Record(time.Since(computeStart)) scope.Timer("total_duration").Record(time.Since(start)) diff --git a/controller/gettargetgraph_test.go b/controller/gettargetgraph_test.go index e62856f..ce626e8 100644 --- a/controller/gettargetgraph_test.go +++ b/controller/gettargetgraph_test.go @@ -24,7 +24,6 @@ import ( gogio "github.com/gogo/protobuf/io" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "github.com/uber/tango/core/common" "github.com/uber/tango/core/storage" storagemock "github.com/uber/tango/core/storage/storagemock" orchestratormock "github.com/uber/tango/orchestrator/orchestratormock" @@ -248,10 +247,8 @@ func TestGetTargetGraph_GraphFetchError(t *testing.T) { BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"}, }, stream) require.Error(t, err) - var ce common.ClassifiedError - require.True(t, errors.As(err, &ce)) - assert.Equal(t, failureReasonGraphFetch, ce.Reason()) - assert.Equal(t, common.ErrorTypeInfra, ce.Type()) + assert.Contains(t, err.Error(), "read graph from storage") + assert.Contains(t, err.Error(), "graph error") } // New coverage: io.ReadFrom fails on graph read -> error returned. @@ -351,10 +348,7 @@ func TestGetTargetGraph_GraphReadCancelled(t *testing.T) { BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"}, }, stream) require.Error(t, err) - var ce common.ClassifiedError - require.True(t, errors.As(err, &ce)) - assert.Equal(t, common.FailureReasonCancelled, ce.Reason()) - assert.Equal(t, common.ErrorTypeUser, ce.Type()) + assert.Contains(t, err.Error(), "context cancelled") } func TestGetTargetGraph_OrchestratorCancelled(t *testing.T) { @@ -376,10 +370,7 @@ func TestGetTargetGraph_OrchestratorCancelled(t *testing.T) { BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"}, }, stream) require.Error(t, err) - var ce common.ClassifiedError - require.True(t, errors.As(err, &ce)) - assert.Equal(t, common.FailureReasonCancelled, ce.Reason()) - assert.Equal(t, common.ErrorTypeUser, ce.Type()) + assert.Contains(t, err.Error(), "context cancelled") } func newMockReadCloser(data []byte) io.ReadCloser { diff --git a/orchestrator/native_orchestrator.go b/orchestrator/native_orchestrator.go index 58d7ac5..24e6a5d 100644 --- a/orchestrator/native_orchestrator.go +++ b/orchestrator/native_orchestrator.go @@ -111,7 +111,6 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget } }() logger := b.logger.With(zap.Any("build_description", param.Req.BuildDescription)) - logger.Infow("GetTargetGraph: Processing request") // parse the config file cfg, err := config.Parse(b.configFilePath) @@ -128,11 +127,10 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget return nil, fmt.Errorf("lease workspace: %w", err) } defer func() { - err := ws.Release() - if err != nil { - // clean up the workspace if release fails. + if releaseErr := ws.Release(); releaseErr != nil { + retErr = errors.Join(retErr, fmt.Errorf("release workspace: %w", releaseErr)) if removeErr := os.RemoveAll(ws.Path()); removeErr != nil { - logger.Errorw("GetTargetGraph: Failed to remove workspace", zap.Error(removeErr)) + retErr = errors.Join(retErr, fmt.Errorf("remove workspace: %w", removeErr)) } } }() @@ -140,7 +138,6 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget if err != nil { return nil, fmt.Errorf("checkout %s@%s: %w", param.Req.BuildDescription.Remote, param.Req.BuildDescription.BaseSha, err) } - logger.Infow("GetTargetGraph: Checked out base revision") requests := make([]workspace.Request, 0, len(param.Req.BuildDescription.Requests)) factory := b.gitFactory @@ -160,7 +157,6 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget if err != nil { return nil, fmt.Errorf("apply requests: %w", err) } - logger.Infow("GetTargetGraph: Applied requests", zap.Int("request_count", len(requests))) // Compute the treehash and download the target graph from storage if exists. treehash, err := gitModule.RevParse(ctx, "HEAD^{tree}") @@ -171,15 +167,11 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget if !param.BypassCache { graphReader, err := storage.NewGraphReader(ctx, b.storage, treehashPath) if err == nil { - logger.Infow("GetTargetGraph: Cache hit on treehash", zap.String("treehash", treehash)) return graphReader, nil } if !storage.IsNotFound(err) { return nil, fmt.Errorf("read graph at treehash %s: %w", treehash, err) } - logger.Infow("GetTargetGraph: Treehash not found, computing target graph", zap.String("treehash", treehash)) - } else { - logger.Infow("GetTargetGraph: bypass_cache=true, computing target graph") } // Compute the target graph and store it in storage. runner := b.graphRunner @@ -225,6 +217,5 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget if err != nil { return nil, fmt.Errorf("create graph reader at %s: %w", treehashPath, err) } - logger.Infow("GetTargetGraph: Done computing and storing target graph", zap.String("treehash", treehash)) return graphReader, nil }