diff --git a/mettle/worker/BUILD b/mettle/worker/BUILD index e538b31a..0c32265c 100644 --- a/mettle/worker/BUILD +++ b/mettle/worker/BUILD @@ -1,6 +1,9 @@ go_library( name = "worker", - srcs = glob(["*.go"]), + srcs = glob( + ["*.go"], + exclude = ["*_test.go"], + ), visibility = ["//mettle/..."], deps = [ "///third_party/go/cloud.google.com_go_pubsub//apiv1", @@ -43,3 +46,12 @@ go_library( "//third_party/proto/resourceusage", ], ) + +go_test( + name = "worker_test", + srcs = glob(["*_test.go"]), + deps = [ + ":worker", + "///third_party/go/github.com_stretchr_testify//assert", + ], +) diff --git a/mettle/worker/worker.go b/mettle/worker/worker.go index 1150b8cd..d314975b 100644 --- a/mettle/worker/worker.go +++ b/mettle/worker/worker.go @@ -14,6 +14,7 @@ import ( "path" "path/filepath" "runtime" + "sort" "strconv" "strings" "syscall" @@ -438,6 +439,7 @@ type worker struct { fileDownload time.Duration lastURL string // This is reset somewhat lazily. stdout, stderr bytes.Buffer + platformProps map[string]string // For limiting parallelism during download / write actions limiter, iolimiter chan struct{} @@ -477,6 +479,7 @@ func (w *worker) RunTask(ctx context.Context) (*pb.ExecuteResponse, error) { } w.currentMsg = nil w.actionDigest = nil + w.platformProps = nil return response, err } @@ -517,6 +520,7 @@ func (w *worker) runTask(msg *pubsub.Message) *pb.ExecuteResponse { Status: status(err, codes.FailedPrecondition, "Badly serialised request: %s", err), } } + w.platformProps = copyPlatformProperties(msg.Metadata) return w.runTaskRequest(req) } @@ -534,6 +538,7 @@ func (w *worker) runTaskRequest(req *pb.ExecuteRequest) *pb.ExecuteResponse { logr.WithFields(logrus.Fields{ "hash": w.actionDigest.Hash, }).Debug("Received task for action digest") + w.logPlatformProperties() w.lastURL = w.actionURL() action, command, status := w.fetchRequestBlobs(req) @@ -562,6 +567,43 @@ func (w *worker) runTaskRequest(req *pb.ExecuteRequest) *pb.ExecuteResponse { return w.execute(req, action, command) } +func copyPlatformProperties(metadata map[string]string) map[string]string { + if len(metadata) == 0 { + return nil + } + ret := make(map[string]string, len(metadata)) + for k, v := range metadata { + ret[k] = v + } + return ret +} + +func formatPlatformProperties(props map[string]string) string { + if len(props) == 0 { + return "" + } + keys := make([]string, 0, len(props)) + for key := range props { + keys = append(keys, key) + } + sort.Strings(keys) + formatted := make([]string, 0, len(keys)) + for _, key := range keys { + formatted = append(formatted, key+"="+props[key]) + } + return strings.Join(formatted, ", ") +} + +func (w *worker) logPlatformProperties() { + if len(w.platformProps) == 0 { + return + } + logr.WithFields(logrus.Fields{ + "hash": w.actionDigest.Hash, + "platformProperties": formatPlatformProperties(w.platformProps), + }).Info("Action specifies platform properties; would use them to select sandbox filesystem") +} + // forceShutdown sends any shutdown reports and calls log.Fatal() to shut down the worker func (w *worker) forceShutdown(shutdownMsg string) { w.Report(false, false, false, "%s", shutdownMsg) diff --git a/mettle/worker/worker_platform_test.go b/mettle/worker/worker_platform_test.go new file mode 100644 index 00000000..eb145bac --- /dev/null +++ b/mettle/worker/worker_platform_test.go @@ -0,0 +1,39 @@ +package worker + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestCopyPlatformProperties(t *testing.T) { + original := map[string]string{ + "filesystem": "disk", + "worker_pool": "large", + } + copied := copyPlatformProperties(original) + + assert.Equal(t, original, copied) + + original["filesystem"] = "tmpfs" + assert.Equal(t, "disk", copied["filesystem"]) +} + +func TestCopyPlatformPropertiesNilWhenEmpty(t *testing.T) { + assert.Nil(t, copyPlatformProperties(nil)) + assert.Nil(t, copyPlatformProperties(map[string]string{})) +} + +func TestFormatPlatformProperties(t *testing.T) { + props := map[string]string{ + "worker_pool": "large", + "filesystem": "disk", + } + + assert.Equal(t, "filesystem=disk, worker_pool=large", formatPlatformProperties(props)) +} + +func TestFormatPlatformPropertiesEmpty(t *testing.T) { + assert.Equal(t, "", formatPlatformProperties(nil)) + assert.Equal(t, "", formatPlatformProperties(map[string]string{})) +}