Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//config",
"//core/common",
"//core/storage",
"//core/targethasher",
"//orchestrator",
"//tangopb",
"@com_github_uber_go_tally//:tally",
Expand Down
2 changes: 1 addition & 1 deletion controller/getchangedtargetgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
// GetChangedTargetGraph is the streaming RPC that will return the subgraph
// induced by the changed targets between two revisions. It is currently a
// stub: it records success/failure metrics and returns no data.
func (c *controller) GetChangedTargetGraph(request *pb.GetChangedTargetGraphRequest, stream pb.TangoServiceGetChangedTargetGraphYARPCServer) (retErr error) {
func (c *controller) GetChangedTargetGraph(_ *pb.GetChangedTargetGraphRequest, _ pb.TangoServiceGetChangedTargetGraphYARPCServer) (retErr error) {
scope := c.scope.SubScope("get_changed_target_graph")
defer func() {
if retErr != nil {
Expand Down
19 changes: 12 additions & 7 deletions controller/getchangedtargets.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/uber/tango/core/common"
"github.com/uber/tango/core/storage"
"github.com/uber/tango/core/targethasher"
pb "github.com/uber/tango/tangopb"
"go.uber.org/zap"
)
Expand All @@ -40,6 +41,8 @@ type job struct {
// GetChangedTargets returns the changed targets between two revisions. If the
// client disconnects, the stream's context is cancelled and the function
// returns with context.Canceled.
//
//nolint:gocyclo // orchestration method with inherently high branching
func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, stream pb.TangoServiceGetChangedTargetsYARPCServer) (retErr error) {
scope := c.scope.SubScope("get_changed_targets")
scope.Counter("calls").Inc(1)
Expand Down Expand Up @@ -257,8 +260,8 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
compareStart := time.Now()
changedTargetsResponses, err := c.compareTargetGraphs(ctx, logger, firstGraph, secondGraph, maxDist)
// Allow GC of raw graph data while the caching goroutine runs.
firstGraph = nil
secondGraph = nil
firstGraph = nil //nolint:ineffassign // intentional: allow GC of large slice
secondGraph = nil //nolint:ineffassign // intentional: allow GC of large slice

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now sure why this is needed?

if err != nil {
if ctx.Err() != nil {
return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err())
Expand Down Expand Up @@ -331,6 +334,8 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
// targets get their distance from BFS over the reverse-dep graph.
// Output IDs are re-mapped into a canonical per-call namespace so the
// response metadata only carries the names actually referenced.
//
//nolint:gocyclo // comparison pipeline with multiple indexed data structures
func (c *controller) compareTargetGraphs(ctx context.Context, logger *zap.Logger, firstGraph, secondGraph []*pb.GetTargetGraphResponse, maxDist int32) ([]*pb.GetChangedTargetsResponse, error) {
start := time.Now()
scope := c.scope.SubScope("compare_target_graphs")
Expand All @@ -347,18 +352,18 @@ func (c *controller) compareTargetGraphs(ctx context.Context, logger *zap.Logger
return nil, err
}
// Release raw chunk slices — individual target protos are now held by the ID maps.
firstGraph = nil
secondGraph = nil
firstGraph = nil //nolint:ineffassign // intentional: allow GC of large slice
secondGraph = nil //nolint:ineffassign // intentional: allow GC of large slice
firstByName, err := buildNameIndex(ctx, firstTargetsByID, firstMetadata)
if err != nil {
return nil, err
}
firstTargetsByID = nil // all pointers are now in firstByName; drop the duplicate map
firstTargetsByID = nil //nolint:ineffassign // intentional: allow GC of duplicate map
secondByName, err := buildNameIndex(ctx, secondTargetsByID, secondMetadata)
if err != nil {
return nil, err
}
secondTargetsByID = nil
secondTargetsByID = nil //nolint:ineffassign // intentional: allow GC of duplicate map
indexDuration := time.Since(indexStart)
scope.Timer("index_duration").Record(indexDuration)

Expand Down Expand Up @@ -671,7 +676,7 @@ func detectSourceFileID(meta *pb.Metadata) int32 {
}
// check the id in the rule type mapping for "source file"
for id, name := range meta.GetRuleTypeMapping() {
if name == "source file" {
if name == targethasher.SourceFileType {
return id
}
}
Expand Down
2 changes: 1 addition & 1 deletion controller/getchangedtargets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,7 @@ func TestGetChangedTargets_streamChunks(t *testing.T) {
stream.EXPECT().Context().Return(context.Background())

var sentResponses []*pb.GetChangedTargetsResponse
stream.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *pb.GetChangedTargetsResponse, opts ...interface{}) error {
stream.EXPECT().Send(gomock.Any()).DoAndReturn(func(resp *pb.GetChangedTargetsResponse, _ ...interface{}) error {
sentResponses = append(sentResponses, resp)
return nil
}).Times(2)
Expand Down
2 changes: 1 addition & 1 deletion controller/gettargetgraph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -391,5 +391,5 @@ func newMockReadCloser(data []byte) io.ReadCloser {

type errReadCloser struct{ err error }

func (e *errReadCloser) Read(p []byte) (int, error) { return 0, e.err }
func (e *errReadCloser) Read(_ []byte) (int, error) { return 0, e.err }
func (e *errReadCloser) Close() error { return nil }
2 changes: 1 addition & 1 deletion core/bazel/bazel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestNewBazelClient(t *testing.T) {
WorkspacePath: "/tmp/test",
EnvVarsMap: map[string]string{"FOO": "bar"},
Logger: zap.NewNop().Sugar(),
ExecCommandContext: func(ctx context.Context, name string, arg ...string) commander {
ExecCommandContext: func(_ context.Context, _ string, _ ...string) commander {
return nil
},
},
Expand Down
14 changes: 8 additions & 6 deletions core/bazel/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,11 @@ func TestExecuteQuery_Success(t *testing.T) {
WorkspacePath: "/tmp/test",
EnvVarsMap: map[string]string{},
Logger: zap.NewNop().Sugar(),
ExecCommandContext: func(ctx context.Context, name string, arg ...string) commander {
ExecCommandContext: func(_ context.Context, _ string, _ ...string) commander {
return mockCmd
},
})
require.NoError(t, err)

resp, err := client.ExecuteQuery(context.Background(), &QueryRequest{Query: "//..."})
require.NoError(t, err)
Expand Down Expand Up @@ -105,7 +106,7 @@ func TestExecuteQuery_WithStartupOptions(t *testing.T) {
WorkspacePath: "/tmp/test",
EnvVarsMap: map[string]string{},
Logger: zap.NewNop().Sugar(),
ExecCommandContext: func(ctx context.Context, name string, arg ...string) commander {
ExecCommandContext: func(_ context.Context, _ string, arg ...string) commander {
capturedArgs = arg
return mockCmd
},
Expand Down Expand Up @@ -156,7 +157,7 @@ func TestExecuteQueryInternal_ContextTimeout(t *testing.T) {
EnvVarsMap: map[string]string{},
QueryTimeout: 10 * time.Millisecond, // Short timeout for test

ExecCommandContext: func(ctx context.Context, name string, arg ...string) commander {
ExecCommandContext: func(ctx context.Context, _ string, _ ...string) commander {
// Simulate process behavior: when context is cancelled, close pipes
go func() {
<-ctx.Done()
Expand All @@ -169,7 +170,7 @@ func TestExecuteQueryInternal_ContextTimeout(t *testing.T) {
})
require.NoError(t, err)
result, err := client.executeQueryInternal(context.Background(), "//...", nil)
require.Nil(t, result)
require.Empty(t, result.GetTarget())
require.Error(t, err)
// Should get timeout or deadline exceeded error
assert.Contains(t, err.Error(), "deadline exceeded")
Expand Down Expand Up @@ -234,7 +235,7 @@ func TestExecuteQueryInternal_Failures(t *testing.T) {
WorkspacePath: "/tmp/test",
EnvVarsMap: map[string]string{},
Logger: zap.NewNop().Sugar(),
ExecCommandContext: func(ctx context.Context, name string, arg ...string) commander {
ExecCommandContext: func(_ context.Context, _ string, _ ...string) commander {
return mockCmd
},
})
Expand Down Expand Up @@ -263,10 +264,11 @@ func TestExecuteQuery_ErrorCase(t *testing.T) {
WorkspacePath: "/tmp/test",
EnvVarsMap: map[string]string{},
Logger: zap.NewNop().Sugar(),
ExecCommandContext: func(ctx context.Context, name string, arg ...string) commander {
ExecCommandContext: func(_ context.Context, _ string, _ ...string) commander {
return mockCmd
},
})
require.NoError(t, err)

resp, err := client.ExecuteQuery(context.Background(), &QueryRequest{Query: "//..."})
require.Error(t, err)
Expand Down
46 changes: 18 additions & 28 deletions core/bazel/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,38 +24,28 @@ import (
)

func streamOutput(ctx context.Context, src io.Reader, dst io.Writer) error {
done := make(chan error, 1)
go func() {
_, err := io.Copy(dst, src)
done <- err
}()

select {
case <-ctx.Done():
return ctx.Err()
case err := <-done:
return err
buf := make([]byte, 32*1024)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should be separate change?

for {
if err := ctx.Err(); err != nil {
return err
}
n, readErr := src.Read(buf)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ctxReader works ok with io.Copy, and no need to reimplement io.Copy then

if n > 0 {
if _, writeErr := dst.Write(buf[:n]); writeErr != nil {
return writeErr
}
}
if readErr != nil {
if readErr == io.EOF {
return nil
}
return readErr
}
}
}

func streamAndParseTargets(ctx context.Context, src io.Reader, dst io.Writer) (*buildpb.QueryResult, error) {
type result struct {
queryResult *buildpb.QueryResult
err error
}
done := make(chan result, 1)

go func() {
queryResult, err := getQueryResult(ctx, src, dst)
done <- result{queryResult: queryResult, err: err}
}()

select {
case <-ctx.Done():
return nil, ctx.Err()
case res := <-done:
return res.queryResult, res.err
}
return getQueryResult(ctx, src, dst)
}

// cancelCheckInterval is how often we poll ctx.Err() inside per-target hot loops.
Expand Down
4 changes: 2 additions & 2 deletions core/git/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ func TestDefaultGit_FileHashes(t *testing.T) {
`100644 blob d236 file1
100644 blob 9bcc file2`),
wantHashes: map[string][]byte{
"file1": []byte{0xd2, 0x36},
"file2": []byte{0x9b, 0xcc},
"file1": {0xd2, 0x36},
"file2": {0x9b, 0xcc},
},
},
{
Expand Down
2 changes: 1 addition & 1 deletion core/repomanager/repo_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type repoManager struct {
// The origin directory holds the initial clone; workers are cheap local copies.
type workerPool struct {
originDir string
originMu sync.Mutex // one lock per repo for orginal clone
originMu sync.Mutex // one lock per repo for original clone
cloned bool

avail chan *workerSlot // available slots; pool capacity
Expand Down
6 changes: 3 additions & 3 deletions core/storage/memstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func NewMemoryStorage() Storage {
}

// Get downloads a blob from the storage. Return NotFoundError when the blob is not found.
func (m *memoryStorage) Get(ctx context.Context, req DownloadRequest) (DownloadResponse, error) {
func (m *memoryStorage) Get(_ context.Context, req DownloadRequest) (DownloadResponse, error) {
m.mu.RLock()
defer m.mu.RUnlock()
b, ok := m.data[req.Key]
Expand All @@ -60,14 +60,14 @@ func (m *memoryStorage) Put(ctx context.Context, req UploadRequest) error {
return nil
}

func (m *memoryStorage) Exists(ctx context.Context, key string) (bool, error) {
func (m *memoryStorage) Exists(_ context.Context, key string) (bool, error) {
m.mu.RLock()
defer m.mu.RUnlock()
_, ok := m.data[key]
return ok, nil
}

func (m *memoryStorage) List(ctx context.Context, prefix string) ([]string, error) {
func (m *memoryStorage) List(_ context.Context, prefix string) ([]string, error) {
m.mu.RLock()
defer m.mu.RUnlock()
var keys []string
Expand Down
4 changes: 1 addition & 3 deletions core/targethasher/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,9 @@ go_test(
"//core/bazel",
"@com_github_bazelbuild_buildtools//build_proto",
"@com_github_deckarep_golang_set_v2//:golang-set",
"@com_github_golang_mock//gomock",
"@com_github_google_go_cmp//cmp",
"@com_github_google_go_cmp//cmp/cmpopts",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@org_uber_go_mock//gomock",
],
)

Expand Down
13 changes: 0 additions & 13 deletions core/targethasher/graph.go
Original file line number Diff line number Diff line change
Expand Up @@ -734,19 +734,6 @@ func isExternalTarget(targetName string) bool {
return strings.HasPrefix(targetName, externalWorkspaceFilePrefix)
}

func isExternalWorkspaceType(t *buildpb.Target) bool {
switch *t.Type {
case buildpb.Target_SOURCE_FILE:
return strings.HasPrefix(t.SourceFile.GetName(), externalWorkspaceRulePrefix)
case buildpb.Target_RULE:
return strings.HasPrefix(t.Rule.GetName(), externalWorkspaceRulePrefix)
case buildpb.Target_GENERATED_FILE:
return strings.HasPrefix(t.GeneratedFile.GetName(), externalWorkspaceRulePrefix)
default:
return false
}
}

func isExcluded(targetName string, excludedRegex []*regexp.Regexp) bool {
for _, re := range excludedRegex {
if re.MatchString(targetName) {
Expand Down
37 changes: 8 additions & 29 deletions core/targethasher/graph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,9 @@ import (

buildpb "github.com/bazelbuild/buildtools/build_proto"
set "github.com/deckarep/golang-set/v2"
"github.com/golang/mock/gomock"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/mock/gomock"

"github.com/uber/tango/core/bazel"
)
Expand Down Expand Up @@ -77,7 +75,7 @@ func TestContextCancellation(t *testing.T) {

// verify fromProto honors context cancellation
qr := &buildpb.QueryResult{
Target: []*buildpb.Target{&buildpb.Target{}},
Target: []*buildpb.Target{{}},
}
result, err := fromProto(ctx, qr, nil, "", set.NewSet[string](), set.NewSet[string](), nil, false)
assert.Equal(t, EmptyResult(), result)
Expand Down Expand Up @@ -219,15 +217,15 @@ func Test_RemoveAttrs(t *testing.T) {
Rule: &buildpb.Rule{
Name: StringPtr("//pkg:go_default_library"),
Attribute: []*buildpb.Attribute{
&buildpb.Attribute{
{
Name: StringPtr("url"),
StringValue: StringPtr("some_url"),
},
&buildpb.Attribute{
{
Name: StringPtr("urls"),
StringListValue: []string{"url1", "url2"},
},
&buildpb.Attribute{
{
Name: StringPtr("to_keep"),
StringValue: StringPtr("target1"),
},
Expand All @@ -245,15 +243,15 @@ func Test_RemoveAttrs(t *testing.T) {
Rule: &buildpb.Rule{
Name: StringPtr("//external:some_rule"),
Attribute: []*buildpb.Attribute{
&buildpb.Attribute{
{
Name: StringPtr("url"),
StringValue: StringPtr("some_url"),
},
&buildpb.Attribute{
{
Name: StringPtr("urls"),
StringListValue: []string{"url1", "url2"},
},
&buildpb.Attribute{
{
Name: StringPtr("to_keep"),
StringValue: StringPtr("external"),
},
Expand All @@ -276,25 +274,6 @@ func Test_RemoveAttrs(t *testing.T) {
assert.Equal(t, []byte{0x7c, 0xde, 0x91, 0xc2, 0x94, 0x1a, 0x22, 0xf3, 0xb2, 0x18, 0x7c, 0x21, 0xbf, 0x32, 0x17, 0xc0, 0xa3, 0xf0, 0xc, 0x77}, external.HashWithoutDeps)
}

func validateResultIsStable(t *testing.T, baseResult, result Result) {
t.Helper()
require.ElementsMatch(t, baseResult.TargetNames, result.TargetNames)
for _, targetName := range baseResult.TargetNames {
base, ok := baseResult.Targets[targetName]
require.True(t, ok)
res, ok := result.Targets[targetName]
require.True(t, ok)
assert.Equal(t, base.Hash, res.Hash)
}
}

func assertEqualTargetHash(t *testing.T, expected, actual Target) {
opt := cmpopts.IgnoreUnexported(Target{})
// too many nested attributes to compare
ignore := cmpopts.IgnoreFields(Target{}, "Attributes", "SourceFile", "Rule")
assert.True(t, cmp.Equal(expected, actual, opt, ignore), cmp.Diff(expected, actual, opt))
}

func Test_fromProto(t *testing.T) {
ctrl := gomock.NewController(t)

Expand Down
Loading
Loading