diff --git a/BLACKSMITH.md b/BLACKSMITH.md index f96f94c..05db9d7 100644 --- a/BLACKSMITH.md +++ b/BLACKSMITH.md @@ -23,6 +23,40 @@ from `github.com/useblacksmith/bazel-remote/v2`. Existing FA imports intentionally keep the upstream import path so this fork remains behavior-preserving until Blacksmith-specific changes are needed. +## Build cache storage prefixing + +BLA-4006 keeps the default upstream behavior unless FA attaches an explicit +request-scoped storage prefix to the cache operation context. + +The existing configured S3 prefix remains the default path for Buck2 and any +other callers that do not opt in to request-scoped routing. For Bazel, FA should +resolve the authorized VM/job namespace to the full physical prefix: + +```text +<minio-prefix>/bazel/<environment>/<region>/<org-id>/<repo-id>/<generation> +``` + +and attach it with `cache.WithStoragePrefix`. The S3 proxy then uses that +request-scoped prefix when constructing Action Cache and CAS object keys. Action +Cache also remains isolated by bazel-remote's existing instance-name key +remapping, so the physical prefix is additive and gives cache-clear/delete +operations a visible repo/generation boundary. The local disk cache AC/CAS keys +also include the request-scoped prefix, so a new repo/generation namespace does +not hit stale local entries before reaching the S3 backend. This lets a single +shared bazel-remote process route AC/CAS puts/gets to the correct +repo/generation namespace while preserving existing Buck2 behavior. + +Local disk cache entries store the full request prefix as a stable hash so the +LRU can distinguish identical AC/CAS digests from different repo/generation +namespaces without using S3-style slash-heavy prefixes in local paths. MinIO/S3 +object keys use the real request-scoped prefix directly, so broad remote +deletion still targets `<minio-prefix>/bazel/.../<repo-id>/<generation>`. + +For Bazel requests, FA should also mark the request with +`cache.WithRequiredStoragePrefix`. 
If a request reaches the S3 proxy with that +marker but without a request-scoped prefix, bazel-remote logs that it is falling +back to the configured process-wide prefix. Buck2 should not set this marker. + ## Security and upstream patch tracking Track upstream security fixes by monitoring the upstream repository's releases, diff --git a/cache/cache.go b/cache/cache.go index ce57604..7a8a007 100644 --- a/cache/cache.go +++ b/cache/cache.go @@ -107,3 +107,33 @@ func TransformActionCacheKey(key, instance string, logger Logger) string { func LookupKey(kind EntryKind, hash string) string { return kind.String() + "/" + hash } + +func StoragePrefixID(prefix string) string { + sum := sha256.Sum256([]byte(prefix)) + return hex.EncodeToString(sum[:]) +} + +func StoragePrefixIDFromContext(ctx context.Context) (string, bool) { + prefix, ok := StoragePrefixFromContext(ctx) + if !ok { + return "", false + } + return StoragePrefixID(prefix), true +} + +func LookupKeyForContext(ctx context.Context, kind EntryKind, hash string) string { + if kind == AC || kind == CAS { + if prefixID, ok := StoragePrefixIDFromContext(ctx); ok { + return LookupKeyForStoragePrefixID(prefixID, kind, hash) + } + } + return LookupKey(kind, hash) +} + +func LookupKeyForStoragePrefixID(prefixID string, kind EntryKind, hash string) string { + key := LookupKey(kind, hash) + if (kind == AC || kind == CAS) && prefixID != "" { + return key + "/" + prefixID + } + return key +} diff --git a/cache/cache_test.go b/cache/cache_test.go new file mode 100644 index 0000000..f9bb5d7 --- /dev/null +++ b/cache/cache_test.go @@ -0,0 +1,57 @@ +package cache + +import ( + "context" + "testing" +) + +func TestLookupKeyForContextDefaultsToOriginalKey(t *testing.T) { + result := LookupKeyForContext(context.Background(), CAS, "hash") + expected := "cas/hash" + if result != expected { + t.Fatalf("LookupKeyForContext() = %q, want %q", result, expected) + } +} + +func TestLookupKeyForContextIncludesStoragePrefix(t *testing.T) { + 
prefix := "bazel/production/us-east-1/42/987654/v0" + ctx := WithStoragePrefix(context.Background(), prefix) + + result := LookupKeyForContext(ctx, CAS, "hash") + expected := "cas/hash/" + StoragePrefixID(prefix) + if result != expected { + t.Fatalf("LookupKeyForContext() = %q, want %q", result, expected) + } +} + +func TestLookupKeyForContextIncludesStoragePrefixForActionCache(t *testing.T) { + prefix := "bazel/production/us-east-1/42/987654/v0" + ctx := WithStoragePrefix(context.Background(), prefix) + + result := LookupKeyForContext(ctx, AC, "hash") + expected := "ac/hash/" + StoragePrefixID(prefix) + if result != expected { + t.Fatalf("LookupKeyForContext() = %q, want %q", result, expected) + } +} + +func TestLookupKeyForContextIgnoresStoragePrefixForRaw(t *testing.T) { + ctx := WithStoragePrefix(context.Background(), "bazel/production/us-east-1/42/987654/v0") + + result := LookupKeyForContext(ctx, RAW, "hash") + expected := "raw/hash" + if result != expected { + t.Fatalf("LookupKeyForContext() = %q, want %q", result, expected) + } +} + +func TestStoragePrefixRequiredFromContext(t *testing.T) { + if StoragePrefixRequiredFromContext(context.Background()) { + t.Fatal("StoragePrefixRequiredFromContext() = true, want false") + } + + ctx := WithRequiredStoragePrefix(context.Background()) + if !StoragePrefixRequiredFromContext(ctx) { + t.Fatal("StoragePrefixRequiredFromContext() = false, want true") + } +} diff --git a/cache/disk/disk.go b/cache/disk/disk.go index d250565..9f4ecd4 100644 --- a/cache/disk/disk.go +++ b/cache/disk/disk.go @@ -154,17 +154,34 @@ func (c *diskCache) updateCacheAgeMetric() { func (c *diskCache) getElementPath(key Key, value lruItem) string { ks := key.(string) - hash := ks[len(ks)-sha256.Size*2:] + kind, hash, storagePrefixID := lookupKeyParts(ks) + return filepath.Join(c.dir, c.FileLocationForStoragePrefixID(storagePrefixID, kind, value.legacy, hash, value.size, value.random)) +} + +func lookupKeyParts(key string) (cache.EntryKind, 
string, string) { var kind cache.EntryKind = cache.AC - if strings.HasPrefix(ks, "cas") { + var hash string + var storagePrefixID string + parts := strings.Split(key, "/") + if len(parts) >= 2 { + hash = parts[1] + if len(parts) >= 3 { + storagePrefixID = parts[2] + } + } + if hash == "" && len(key) >= sha256.Size*2 { + hash = key[len(key)-sha256.Size*2:] + } + + if strings.HasPrefix(key, "cas") { kind = cache.CAS - } else if strings.HasPrefix(ks, "ac") { + } else if strings.HasPrefix(key, "ac") { kind = cache.AC - } else if strings.HasPrefix(ks, "raw") { + } else if strings.HasPrefix(key, "raw") { kind = cache.RAW } - return filepath.Join(c.dir, c.FileLocation(kind, value.legacy, hash, value.size, value.random)) + return kind, hash, storagePrefixID } func (c *diskCache) removeFile(f string) { @@ -196,6 +213,22 @@ func (c *diskCache) FileLocationBase(kind cache.EntryKind, legacy bool, hash str return fmt.Sprintf("cas.v2/%s/%s-%d", hash[:2], hash, size) } +func (c *diskCache) FileLocationBaseForStoragePrefixID(storagePrefixID string, kind cache.EntryKind, legacy bool, hash string, size int64) string { + location := c.FileLocationBase(kind, legacy, hash, size) + if kind == cache.RAW || storagePrefixID == "" { + return location + } + return path.Join(storagePrefixID, location) +} + +func (c *diskCache) FileLocationBaseForContext(ctx context.Context, kind cache.EntryKind, legacy bool, hash string, size int64) string { + if kind == cache.RAW { + return c.FileLocationBase(kind, legacy, hash, size) + } + storagePrefixID, _ := cache.StoragePrefixIDFromContext(ctx) + return c.FileLocationBaseForStoragePrefixID(storagePrefixID, kind, legacy, hash, size) +} + func (c *diskCache) FileLocation(kind cache.EntryKind, legacy bool, hash string, size int64, random string) string { if kind == cache.RAW { return path.Join("raw.v2", hash[:2], hash+"-"+random) @@ -212,6 +245,22 @@ func (c *diskCache) FileLocation(kind cache.EntryKind, legacy bool, hash string, return 
fmt.Sprintf("cas.v2/%s/%s-%d-%s", hash[:2], hash, size, random) } +func (c *diskCache) FileLocationForStoragePrefixID(storagePrefixID string, kind cache.EntryKind, legacy bool, hash string, size int64, random string) string { + location := c.FileLocation(kind, legacy, hash, size, random) + if kind == cache.RAW || storagePrefixID == "" { + return location + } + return path.Join(storagePrefixID, location) +} + +func (c *diskCache) FileLocationForContext(ctx context.Context, kind cache.EntryKind, legacy bool, hash string, size int64, random string) string { + if kind == cache.RAW { + return c.FileLocation(kind, legacy, hash, size, random) + } + storagePrefixID, _ := cache.StoragePrefixIDFromContext(ctx) + return c.FileLocationForStoragePrefixID(storagePrefixID, kind, legacy, hash, size, random) +} + // Put stores a stream of `size` bytes from `r` into the cache. // If `hash` is not the empty string, and the contents don't match it, // a non-nil error is returned. All data will be read from `r` before @@ -241,7 +290,7 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string, return nil } - key := cache.LookupKey(kind, hash) + key := cache.LookupKeyForContext(ctx, kind, hash) var tf *os.File // Tempfile. var blobFile string @@ -299,7 +348,10 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string, legacy := kind == cache.CAS && c.storageMode == casblob.Identity // Final destination, if all goes well. - filePath := path.Join(c.dir, c.FileLocationBase(kind, legacy, hash, size)) + filePath := path.Join(c.dir, c.FileLocationBaseForContext(ctx, kind, legacy, hash, size)) + if err := os.MkdirAll(path.Dir(filePath), os.ModePerm); err != nil { + return internalErr(err) + } // We will download to this temporary file. tf, random, err := tfc.Create(filePath, legacy) @@ -424,18 +476,18 @@ func (c *diskCache) commit(key string, legacy bool, tempfile string, reservedSiz // but that we can try the proxy backend. 
// // This function assumes that only CAS blobs are requested in zstd form. -func (c *diskCache) availableOrTryProxy(kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (io.ReadCloser, int64, bool, error) { +func (c *diskCache) availableOrTryProxy(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64, zstd bool) (io.ReadCloser, int64, bool, error) { locked := true var err error c.mu.Lock() - key := cache.LookupKey(kind, hash) + key := cache.LookupKeyForContext(ctx, kind, hash) item, available := c.lru.Get(key) if available { c.mu.Unlock() // We expect a cache hit below. locked = false - blobPath := path.Join(c.dir, c.FileLocation(kind, item.legacy, hash, item.size, item.random)) + blobPath := path.Join(c.dir, c.FileLocationForContext(ctx, kind, item.legacy, hash, item.size, item.random)) if !isSizeMismatch(size, item.size) { var f *os.File @@ -447,7 +499,7 @@ func (c *diskCache) availableOrTryProxy(kind cache.EntryKind, hash string, size c.mu.Lock() item, available = c.lru.Get(key) if available { - blobPath = path.Join(c.dir, c.FileLocation(kind, item.legacy, hash, item.size, item.random)) + blobPath = path.Join(c.dir, c.FileLocationForContext(ctx, kind, item.legacy, hash, item.size, item.random)) f, err = os.Open(blobPath) } c.mu.Unlock() @@ -575,7 +627,7 @@ func (c *diskCache) get(ctx context.Context, kind cache.EntryKind, hash string, } var err error - key := cache.LookupKey(kind, hash) + key := cache.LookupKeyForContext(ctx, kind, hash) var tf *os.File // Tempfile we will write to. 
var blobFile string @@ -608,7 +660,7 @@ func (c *diskCache) get(ctx context.Context, kind cache.EntryKind, hash string, } }() - f, foundSize, tryProxy, err := c.availableOrTryProxy(kind, hash, size, offset, zstd) + f, foundSize, tryProxy, err := c.availableOrTryProxy(ctx, kind, hash, size, offset, zstd) if err != nil { return nil, -1, internalErr(err) } @@ -644,7 +696,10 @@ func (c *diskCache) get(ctx context.Context, kind cache.EntryKind, hash string, legacy := kind == cache.CAS && c.storageMode == casblob.Identity - blobPathBase := path.Join(c.dir, c.FileLocationBase(kind, legacy, hash, foundSize)) + blobPathBase := path.Join(c.dir, c.FileLocationBaseForContext(ctx, kind, legacy, hash, foundSize)) + if err := os.MkdirAll(path.Dir(blobPathBase), os.ModePerm); err != nil { + return nil, -1, internalErr(err) + } tf, random, err := tfc.Create(blobPathBase, legacy) if err != nil { return nil, -1, internalErr(err) @@ -718,7 +773,7 @@ func (c *diskCache) Contains(ctx context.Context, kind cache.EntryKind, hash str } foundSize := int64(-1) - key := cache.LookupKey(kind, hash) + key := cache.LookupKeyForContext(ctx, kind, hash) c.mu.Lock() item, exists := c.lru.Get(key) diff --git a/cache/disk/disk_test.go b/cache/disk/disk_test.go index 6fc3230..7dc3e5d 100644 --- a/cache/disk/disk_test.go +++ b/cache/disk/disk_test.go @@ -13,6 +13,7 @@ import ( "net/url" "os" "path" + "path/filepath" "strings" "sync" "testing" @@ -97,6 +98,242 @@ func TestCacheBasics(t *testing.T) { } } +func TestStoragePrefixScopesLocalDiskCache(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cacheDir := tempDir(t) + defer os.RemoveAll(cacheDir) + + testCacheI, err := New(cacheDir, 1024*1024, WithAccessLogger(testutils.NewSilentLogger())) + if err != nil { + t.Fatal(err) + } + testCache := testCacheI.(*diskCache) + + repoAContext := cache.WithStoragePrefix(ctx, "bazel/production/us-east-1/42/987654/v0") + repoBContext := cache.WithStoragePrefix(ctx, 
"bazel/production/us-east-1/42/111111/v0") + + err = testCache.Put(repoAContext, cache.CAS, contentsHash, contentsLength, strings.NewReader(contents)) + if err != nil { + t.Fatal(err) + } + + rdr, sizeBytes, err := testCache.Get(repoAContext, cache.CAS, contentsHash, contentsLength, 0) + if err != nil { + t.Fatal(err) + } + if rdr == nil { + t.Fatal("expected repo A to hit its own local CAS entry") + } + rdr.Close() + if sizeBytes != contentsLength { + t.Fatalf("repo A local CAS size = %d, want %d", sizeBytes, contentsLength) + } + + rdr, _, err = testCache.Get(repoBContext, cache.CAS, contentsHash, contentsLength, 0) + if err != nil { + t.Fatal(err) + } + if rdr != nil { + rdr.Close() + t.Fatal("expected repo B to miss repo A's local CAS entry") + } + + found, _ := testCache.Contains(repoBContext, cache.CAS, contentsHash, contentsLength) + if found { + t.Fatal("expected repo B Contains to miss repo A's local CAS entry") + } + + missing, err := testCache.FindMissingCasBlobs(repoAContext, []*pb.Digest{{Hash: contentsHash, SizeBytes: contentsLength}}) + if err != nil { + t.Fatal(err) + } + if len(missing) != 0 { + t.Fatalf("expected repo A FindMissingCasBlobs to find scoped CAS entry, got %d missing", len(missing)) + } + + missing, err = testCache.FindMissingCasBlobs(repoBContext, []*pb.Digest{{Hash: contentsHash, SizeBytes: contentsLength}}) + if err != nil { + t.Fatal(err) + } + if len(missing) != 1 { + t.Fatalf("expected repo B FindMissingCasBlobs to miss repo A scoped CAS entry, got %d missing", len(missing)) + } +} + +func TestStoragePrefixScopesActionCacheLocalDiskCache(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cacheDir := tempDir(t) + defer os.RemoveAll(cacheDir) + + testCacheI, err := New(cacheDir, 1024*1024, WithAccessLogger(testutils.NewSilentLogger())) + if err != nil { + t.Fatal(err) + } + testCache := testCacheI.(*diskCache) + + repoAContext := cache.WithStoragePrefix(ctx, 
"bazel/production/us-east-1/42/987654/v0") + repoBContext := cache.WithStoragePrefix(ctx, "bazel/production/us-east-1/42/111111/v0") + + err = testCache.Put(repoAContext, cache.AC, contentsHash, contentsLength, strings.NewReader(contents)) + if err != nil { + t.Fatal(err) + } + + rdr, _, err := testCache.Get(repoAContext, cache.AC, contentsHash, contentsLength, 0) + if err != nil { + t.Fatal(err) + } + if rdr == nil { + t.Fatal("expected repo A to hit its own scoped local AC entry") + } + rdr.Close() + + rdr, _, err = testCache.Get(repoBContext, cache.AC, contentsHash, contentsLength, 0) + if err != nil { + t.Fatal(err) + } + if rdr != nil { + rdr.Close() + t.Fatal("expected repo B to miss repo A's scoped local AC entry") + } + + restartedCacheI, err := New(cacheDir, 1024*1024, WithAccessLogger(testutils.NewSilentLogger())) + if err != nil { + t.Fatal(err) + } + restartedCache := restartedCacheI.(*diskCache) + + rdr, _, err = restartedCache.Get(repoAContext, cache.AC, contentsHash, contentsLength, 0) + if err != nil { + t.Fatal(err) + } + if rdr == nil { + t.Fatal("expected repo A to reload its scoped local AC entry") + } + rdr.Close() + + rdr, _, err = restartedCache.Get(repoBContext, cache.AC, contentsHash, contentsLength, 0) + if err != nil { + t.Fatal(err) + } + if rdr != nil { + rdr.Close() + t.Fatal("expected repo B to miss repo A's scoped local AC entry after restart") + } +} + +func TestStoragePrefixScopesLocalDiskCacheAcrossRestart(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cacheDir := tempDir(t) + defer os.RemoveAll(cacheDir) + + repoAContext := cache.WithStoragePrefix(ctx, "bazel/production/us-east-1/42/987654/v0") + repoBContext := cache.WithStoragePrefix(ctx, "bazel/production/us-east-1/42/111111/v0") + + testCacheI, err := New(cacheDir, 1024*1024, WithAccessLogger(testutils.NewSilentLogger())) + if err != nil { + t.Fatal(err) + } + testCache := testCacheI.(*diskCache) + + err = 
testCache.Put(repoAContext, cache.CAS, contentsHash, contentsLength, strings.NewReader(contents)) + if err != nil { + t.Fatal(err) + } + + restartedCacheI, err := New(cacheDir, 1024*1024, WithAccessLogger(testutils.NewSilentLogger())) + if err != nil { + t.Fatal(err) + } + restartedCache := restartedCacheI.(*diskCache) + + rdr, _, err := restartedCache.Get(repoAContext, cache.CAS, contentsHash, contentsLength, 0) + if err != nil { + t.Fatal(err) + } + if rdr == nil { + t.Fatal("expected repo A to reload its scoped local CAS entry") + } + rdr.Close() + + rdr, _, err = restartedCache.Get(repoBContext, cache.CAS, contentsHash, contentsLength, 0) + if err != nil { + t.Fatal(err) + } + if rdr != nil { + rdr.Close() + t.Fatal("expected repo B to miss repo A's scoped local CAS entry after restart") + } + + rdr, _, err = restartedCache.Get(ctx, cache.CAS, contentsHash, contentsLength, 0) + if err != nil { + t.Fatal(err) + } + if rdr != nil { + rdr.Close() + t.Fatal("expected unscoped context to miss repo A's scoped local CAS entry after restart") + } +} + +func TestStoragePrefixEvictsScopedCASFile(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + cacheDir := tempDir(t) + defer os.RemoveAll(cacheDir) + + prefix := "bazel/production/us-east-1/42/987654/v0" + repoContext := cache.WithStoragePrefix(ctx, prefix) + data1, hash1 := testutils.RandomDataAndHash(128) + data2, hash2 := testutils.RandomDataAndHash(128) + + testCacheI, err := New(cacheDir, BlockSize, WithAccessLogger(testutils.NewSilentLogger())) + if err != nil { + t.Fatal(err) + } + testCache := testCacheI.(*diskCache) + + err = testCache.Put(repoContext, cache.CAS, hash1, int64(len(data1)), bytes.NewReader(data1)) + if err != nil { + t.Fatal(err) + } + + pattern := filepath.Join( + cacheDir, + cache.StoragePrefixID(prefix), + "cas.v2", + hash1[:2], + hash1+"-128-*", + ) + matches, err := filepath.Glob(pattern) + if err != nil { + t.Fatal(err) + } + if len(matches) != 1 { 
+ t.Fatalf("expected one scoped CAS file matching %s, found %d", pattern, len(matches)) + } + scopedCASFile := matches[0] + + err = testCache.Put(repoContext, cache.CAS, hash2, int64(len(data2)), bytes.NewReader(data2)) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 100; i++ { + if _, err := os.Stat(scopedCASFile); os.IsNotExist(err) { + return + } + time.Sleep(10 * time.Millisecond) + } + t.Fatalf("expected scoped CAS file to be removed after eviction: %s", scopedCASFile) +} + func TestCachePutWrongSize(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() diff --git a/cache/disk/findmissing.go b/cache/disk/findmissing.go index d1db459..552660a 100644 --- a/cache/disk/findmissing.go +++ b/cache/disk/findmissing.go @@ -85,7 +85,7 @@ func (c *diskCache) findMissingCasBlobsInternal(ctx context.Context, blobs []*pb remaining = remaining[batchSize:] } - numMissing := c.findMissingLocalCAS(chunk) + numMissing := c.findMissingLocalCAS(ctx, chunk) if numMissing == 0 { continue } @@ -171,7 +171,7 @@ func filterNonNil(blobs []*pb.Digest) []*pb.Digest { // Set blobs that exist in the disk cache to nil, and return the number // of missing blobs. -func (c *diskCache) findMissingLocalCAS(blobs []*pb.Digest) int { +func (c *diskCache) findMissingLocalCAS(ctx context.Context, blobs []*pb.Digest) int { var exists bool var item lruItem var key string @@ -187,7 +187,7 @@ func (c *diskCache) findMissingLocalCAS(blobs []*pb.Digest) int { } foundSize := int64(-1) - key = cache.LookupKey(cache.CAS, blobs[i].Hash) + key = cache.LookupKeyForContext(ctx, cache.CAS, blobs[i].Hash) item, exists = c.lru.Get(key) if exists { foundSize = item.size diff --git a/cache/disk/load.go b/cache/disk/load.go index 04d876d..fb3d6dd 100644 --- a/cache/disk/load.go +++ b/cache/disk/load.go @@ -396,18 +396,31 @@ func (c *diskCache) scanDir() (scanResult, error) { // root dir of some unix style filesystems. 
const lostAndFound = "lost+found" + dre := regexp.MustCompile(`^[a-f0-9]{2}$`) + // Request-scoped AC/CAS files live under /{ac.v2,cas.v2}/... + // so local disk paths stay compact while the S3 backend uses the full prefix. + storagePrefixDRE := regexp.MustCompile(`^[a-f0-9]{64}$`) + for i := 0; i < numWorkers; i++ { dirListers.Go(func() error { for d := range dc { dirName := path.Join(c.dir, d) - var lookupKeyPrefix string - if strings.HasPrefix(d, "cas.v2/") { - lookupKeyPrefix = "cas/" - } else if strings.HasPrefix(d, "ac.v2/") { - lookupKeyPrefix = "ac/" - } else if strings.HasPrefix(d, "raw.v2/") { - lookupKeyPrefix = "raw/" + storagePrefixID := "" + cacheDir := d + parts := strings.SplitN(d, "/", 3) + if len(parts) == 3 && storagePrefixDRE.MatchString(parts[0]) { + storagePrefixID = parts[0] + cacheDir = path.Join(parts[1], parts[2]) + } + + var lookupKeyKind cache.EntryKind + if strings.HasPrefix(cacheDir, "cas.v2/") { + lookupKeyKind = cache.CAS + } else if strings.HasPrefix(cacheDir, "ac.v2/") { + lookupKeyKind = cache.AC + } else if strings.HasPrefix(cacheDir, "raw.v2/") { + lookupKeyKind = cache.RAW } else { return fmt.Errorf("Unrecognised directory in cache dir: %q", dirName) } @@ -452,7 +465,7 @@ func (c *diskCache) scanDir() (scanResult, error) { item[n] = &item_values[n] metadata[n] = &metadata_values[n] - metadata[n].lookupKey = lookupKeyPrefix + hash + metadata[n].lookupKey = cache.LookupKeyForStoragePrefixID(storagePrefixID, lookupKeyKind, hash) item[n].sizeOnDisk = info.Size() item[n].size = item[n].sizeOnDisk @@ -486,16 +499,91 @@ func (c *diskCache) scanDir() (scanResult, error) { }) } + queueCacheDirs := func(rootRel string) error { + root := c.dir + if rootRel != "" { + root = path.Join(c.dir, rootRel) + } + scopedRoot := storagePrefixDRE.MatchString(rootRel) + des, err := os.ReadDir(root) + if err != nil { + return err + } + + for _, de := range des { + name := de.Name() + + if !de.IsDir() { + if strings.ToLower(name) == lowercaseDSStoreFile 
{ + continue + } + + return fmt.Errorf("Unexpected file: %s", path.Join(rootRel, name)) + } + + if name == lostAndFound { + continue + } + if rootRel == "" && storagePrefixDRE.MatchString(name) { + continue + } + + if scopedRoot && name != "ac.v2" && name != "cas.v2" { + return fmt.Errorf("Unexpected dir: %s", path.Join(rootRel, name)) + } + if name != "ac.v2" && name != "cas.v2" && name != "raw.v2" { + return fmt.Errorf("Unexpected dir: %s", path.Join(rootRel, name)) + } + + dir := path.Join(root, name) + des2, err := os.ReadDir(dir) + if err != nil { + return err + } + + for _, de2 := range des2 { + name2 := de2.Name() + + dirPath := path.Join(rootRel, name, name2) + + if !de2.IsDir() { + if strings.ToLower(name2) == lowercaseDSStoreFile { + continue + } + + return fmt.Errorf("Unexpected file: %s", dirPath) + } + + if name2 == lostAndFound { + continue + } + + if !dre.MatchString(name2) { + return fmt.Errorf("Unexpected dir: %s", dirPath) + } + + dc <- dirPath + } + } + return nil + } + des, err := os.ReadDir(c.dir) if err != nil { return scanResult{}, fmt.Errorf("Failed to read cache dir %q: %w", c.dir, err) } - dre := regexp.MustCompile(`^[a-f0-9]{2}$`) - + hasUnscopedDirs := false for _, de := range des { name := de.Name() + if de.IsDir() && storagePrefixDRE.MatchString(name) { + if err := queueCacheDirs(name); err != nil { + return scanResult{}, err + } + continue + } + if !de.IsDir() { if strings.ToLower(name) == lowercaseDSStoreFile { continue @@ -512,34 +600,12 @@ func (c *diskCache) scanDir() (scanResult, error) { return scanResult{}, fmt.Errorf("Unexpected dir: %s", name) } - dir := path.Join(c.dir, name) - des2, err := os.ReadDir(dir) - if err != nil { - return scanResult{}, err - } - - for _, de2 := range des2 { - name2 := de2.Name() - - dirPath := path.Join(name, name2) - - if !de2.IsDir() { - if strings.ToLower(name) == lowercaseDSStoreFile { - continue - } - - return scanResult{}, fmt.Errorf("Unexpected file: %s", dirPath) - } - - if name2 == 
lostAndFound { - continue - } - - if !dre.MatchString(name2) { - return scanResult{}, fmt.Errorf("Unexpected dir: %s", dirPath) - } + hasUnscopedDirs = true + } - dc <- dirPath + if hasUnscopedDirs { + if err := queueCacheDirs(""); err != nil { + return scanResult{}, err } } @@ -589,7 +655,7 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error { for i := 0; i < len(result.item); i++ { ok := c.lru.Add(result.metadata[i].lookupKey, *result.item[i]) if !ok { - err = os.Remove(filepath.Join(c.dir, result.metadata[i].lookupKey)) + err = os.Remove(c.getElementPath(result.metadata[i].lookupKey, *result.item[i])) if err != nil { return err } diff --git a/cache/grpcproxy/grpcproxy_test.go b/cache/grpcproxy/grpcproxy_test.go index 7a1abc8..e2635eb 100644 --- a/cache/grpcproxy/grpcproxy_test.go +++ b/cache/grpcproxy/grpcproxy_test.go @@ -244,7 +244,7 @@ func newFixture(t *testing.T, proxy cache.Proxy, storageMode string) *fixture { go func() { err := server.ServeGRPC(listener, grpcServer, false, false, true, diskCache, logger, logger) if err != nil { - logger.Printf(err.Error()) + logger.Printf("%s", err.Error()) } }() diff --git a/cache/s3proxy/s3proxy.go b/cache/s3proxy/s3proxy.go index 9c4acd7..bd665ad 100644 --- a/cache/s3proxy/s3proxy.go +++ b/cache/s3proxy/s3proxy.go @@ -27,7 +27,7 @@ type s3Cache struct { errorLogger cache.Logger v2mode bool updateTimestamps bool - objectKey func(hash string, kind cache.EntryKind) string + objectKey func(prefix string, hash string, kind cache.EntryKind) string } var ( @@ -97,13 +97,9 @@ func New( } if c.v2mode { - c.objectKey = func(hash string, kind cache.EntryKind) string { - return objectKeyV2(c.prefix, hash, kind) - } + c.objectKey = objectKeyV2 } else { - c.objectKey = func(hash string, kind cache.EntryKind) string { - return objectKeyV1(c.prefix, hash, kind) - } + c.objectKey = objectKeyV1 } c.uploadQueue = backendproxy.StartUploaders(c, numUploaders, maxQueuedUploads) @@ -135,6 +131,38 @@ func objectKeyV1(prefix 
string, hash string, kind cache.EntryKind) string { return path.Join(prefix, kind.String(), hash[:2], hash) } +func (c *s3Cache) prefixForContext(ctx context.Context, kind cache.EntryKind) (string, bool, bool) { + if kind != cache.RAW { + if prefix, ok := cache.StoragePrefixFromContext(ctx); ok { + return prefix, true, cache.StoragePrefixRequiredFromContext(ctx) + } + return c.prefix, false, cache.StoragePrefixRequiredFromContext(ctx) + } + return c.prefix, false, false +} + +func (c *s3Cache) objectKeyForPrefix(prefix string, hash string, kind cache.EntryKind) string { + return c.objectKey(prefix, hash, kind) +} + +func (c *s3Cache) objectKeyForContext(ctx context.Context, hash string, kind cache.EntryKind) string { + prefix, _, _ := c.prefixForContext(ctx, kind) + return c.objectKeyForPrefix(prefix, hash, kind) +} + +func (c *s3Cache) logMissingRequiredStoragePrefix(operation string, kind cache.EntryKind, hash string) { + if c.errorLogger == nil { + return + } + c.errorLogger.Printf( + "S3 %s missing request-scoped storage prefix for %s %s; using configured prefix %q", + operation, + kind.String(), + hash, + c.prefix, + ) +} + // Helper function for logging responses func logResponse(log cache.Logger, method, bucket, key string, err error) { status := "OK" @@ -146,14 +174,29 @@ func logResponse(log cache.Logger, method, bucket, key string, err error) { } func (c *s3Cache) UploadFile(item backendproxy.UploadReq) { + prefix := item.StoragePrefix + requestScopedPrefix := item.RequestScopedStoragePrefix + requirePrefix := item.RequireStoragePrefix + if item.Kind == cache.RAW { + prefix = c.prefix + requestScopedPrefix = false + requirePrefix = false + } + if prefix == "" { + prefix = c.prefix + } + if requirePrefix && !requestScopedPrefix { + c.logMissingRequiredStoragePrefix("UPLOAD", item.Kind, item.Hash) + } + objectKey := c.objectKeyForPrefix(prefix, item.Hash, item.Kind) _, err := c.mcore.PutObject( context.Background(), - c.bucket, // bucketName - 
c.objectKey(item.Hash, item.Kind), // objectName - item.Rc, // reader - item.SizeOnDisk, // objectSize - "", // md5base64 - "", // sha256 + c.bucket, // bucketName + objectKey, // objectName + item.Rc, // reader + item.SizeOnDisk, // objectSize + "", // md5base64 + "", // sha256 minio.PutObjectOptions{ UserMetadata: map[string]string{ "Content-Type": "application/octet-stream", @@ -161,7 +204,7 @@ func (c *s3Cache) UploadFile(item backendproxy.UploadReq) { }, // metadata ) - logResponse(c.accessLogger, "UPLOAD", c.bucket, c.objectKey(item.Hash, item.Kind), err) + logResponse(c.accessLogger, "UPLOAD", c.bucket, objectKey, err) item.Rc.Close() } @@ -171,14 +214,18 @@ func (c *s3Cache) Put(ctx context.Context, kind cache.EntryKind, hash string, lo rc.Close() return } + prefix, requestScopedPrefix, requirePrefix := c.prefixForContext(ctx, kind) select { case c.uploadQueue <- backendproxy.UploadReq{ - Hash: hash, - LogicalSize: logicalSize, - SizeOnDisk: sizeOnDisk, - Kind: kind, - Rc: rc, + Hash: hash, + LogicalSize: logicalSize, + SizeOnDisk: sizeOnDisk, + Kind: kind, + Rc: rc, + StoragePrefix: prefix, + RequestScopedStoragePrefix: requestScopedPrefix, + RequireStoragePrefix: requirePrefix, }: default: c.errorLogger.Printf("too many uploads queued\n") @@ -204,30 +251,35 @@ func (c *s3Cache) UpdateModificationTimestamp(ctx context.Context, bucket string } func (c *s3Cache) Get(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (io.ReadCloser, int64, error) { + prefix, requestScopedPrefix, requirePrefix := c.prefixForContext(ctx, kind) + if requirePrefix && !requestScopedPrefix { + c.logMissingRequiredStoragePrefix("DOWNLOAD", kind, hash) + } + objectKey := c.objectKeyForPrefix(prefix, hash, kind) rc, info, _, err := c.mcore.GetObject( ctx, c.bucket, // bucketName - c.objectKey(hash, kind), // objectName + objectKey, // objectName minio.GetObjectOptions{}, // opts ) if err != nil { if minio.ToErrorResponse(err).Code == "NoSuchKey" { cacheMisses.Inc() - 
logResponse(c.accessLogger, "DOWNLOAD", c.bucket, c.objectKey(hash, kind), errNotFound) + logResponse(c.accessLogger, "DOWNLOAD", c.bucket, objectKey, errNotFound) return nil, -1, nil } cacheMisses.Inc() - logResponse(c.accessLogger, "DOWNLOAD", c.bucket, c.objectKey(hash, kind), err) + logResponse(c.accessLogger, "DOWNLOAD", c.bucket, objectKey, err) return nil, -1, err } cacheHits.Inc() if c.updateTimestamps { - c.UpdateModificationTimestamp(ctx, c.bucket, c.objectKey(hash, kind)) + c.UpdateModificationTimestamp(ctx, c.bucket, objectKey) } - logResponse(c.accessLogger, "DOWNLOAD", c.bucket, c.objectKey(hash, kind), nil) + logResponse(c.accessLogger, "DOWNLOAD", c.bucket, objectKey, nil) if kind == cache.CAS && c.v2mode { return casblob.ExtractLogicalSize(rc) @@ -239,11 +291,16 @@ func (c *s3Cache) Get(ctx context.Context, kind cache.EntryKind, hash string, _ func (c *s3Cache) Contains(ctx context.Context, kind cache.EntryKind, hash string, _ int64) (bool, int64) { size := int64(-1) exists := false + prefix, requestScopedPrefix, requirePrefix := c.prefixForContext(ctx, kind) + if requirePrefix && !requestScopedPrefix { + c.logMissingRequiredStoragePrefix("CONTAINS", kind, hash) + } + objectKey := c.objectKeyForPrefix(prefix, hash, kind) s, err := c.mcore.StatObject( ctx, c.bucket, // bucketName - c.objectKey(hash, kind), // objectName + objectKey, // objectName minio.StatObjectOptions{}, // opts ) @@ -254,7 +311,7 @@ func (c *s3Cache) Contains(ctx context.Context, kind cache.EntryKind, hash strin size = s.Size } - logResponse(c.accessLogger, "CONTAINS", c.bucket, c.objectKey(hash, kind), err) + logResponse(c.accessLogger, "CONTAINS", c.bucket, objectKey, err) return exists, size } diff --git a/cache/s3proxy/s3proxy_test.go b/cache/s3proxy/s3proxy_test.go index 06ef70a..94587bc 100644 --- a/cache/s3proxy/s3proxy_test.go +++ b/cache/s3proxy/s3proxy_test.go @@ -1,9 +1,15 @@ package s3proxy import ( + "bytes" + "context" + "io" + stdlog "log" + "strings" "testing" 
"github.com/buchgr/bazel-remote/v2/cache" + "github.com/buchgr/bazel-remote/v2/utils/backendproxy" ) func TestObjectKey(t *testing.T) { @@ -36,3 +42,184 @@ func TestObjectKey(t *testing.T) { } } } + +func TestObjectKeyForContextDefaultsToConfiguredPrefix(t *testing.T) { + hash := "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" + configuredPrefix := "minio-prefix/buck2/production/us-east-1" + c := &s3Cache{ + prefix: configuredPrefix, + objectKey: objectKeyV2, + } + + result := c.objectKeyForContext(context.Background(), hash, cache.CAS) + expected := configuredPrefix + "/cas.v2/ab/" + hash + if result != expected { + t.Errorf("objectKeyForContext did not use configured prefix. (result: '%s' expected: '%s')", + result, expected) + } +} + +func TestObjectKeyForContextUsesRequestScopedPrefixForACAndCAS(t *testing.T) { + hash := "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" + repoAPrefix := "minio-prefix/bazel/production/us-east-1/42/987654/v0" + repoBPrefix := "minio-prefix/bazel/production/us-east-1/42/111111/v0" + configuredPrefix := "minio-prefix/buck2/production/us-east-1" + c := &s3Cache{ + prefix: configuredPrefix, + objectKey: objectKeyV2, + } + + repoAContext := cache.WithStoragePrefix(context.Background(), repoAPrefix) + repoBContext := cache.WithStoragePrefix(context.Background(), repoBPrefix) + + testCases := []struct { + name string + ctx context.Context + kind cache.EntryKind + expected string + }{ + { + name: "repo a cas", + ctx: repoAContext, + kind: cache.CAS, + expected: repoAPrefix + "/cas.v2/ab/" + hash, + }, + { + name: "repo b cas", + ctx: repoBContext, + kind: cache.CAS, + expected: repoBPrefix + "/cas.v2/ab/" + hash, + }, + { + name: "repo a action cache", + ctx: repoAContext, + kind: cache.AC, + expected: repoAPrefix + "/ac/ab/" + hash, + }, + { + name: "repo b action cache", + ctx: repoBContext, + kind: cache.AC, + expected: repoBPrefix + "/ac/ab/" + hash, + }, + } + + for _, tc := range testCases { + 
t.Run(tc.name, func(t *testing.T) { + result := c.objectKeyForContext(tc.ctx, hash, tc.kind) + if result != tc.expected { + t.Errorf("objectKeyForContext did not use request-scoped prefix. (result: '%s' expected: '%s')", + result, tc.expected) + } + }) + } + + repoACASKey := c.objectKeyForContext(repoAContext, hash, cache.CAS) + repoBCASKey := c.objectKeyForContext(repoBContext, hash, cache.CAS) + if repoACASKey == repoBCASKey { + t.Fatalf("same CAS digest produced identical object keys for different request-scoped prefixes: %s", repoACASKey) + } + + repoAACKey := c.objectKeyForContext(repoAContext, hash, cache.AC) + repoBACKey := c.objectKeyForContext(repoBContext, hash, cache.AC) + if repoAACKey == repoBACKey { + t.Fatalf("same AC digest produced identical object keys for different request-scoped prefixes: %s", repoAACKey) + } +} + +func TestPutCapturesRequestScopedPrefixForAsyncUpload(t *testing.T) { + hash := "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" + requestPrefix := "minio-prefix/bazel/production/us-east-1/42/987654/v0" + uploadQueue := make(chan backendproxy.UploadReq, 1) + c := &s3Cache{ + prefix: "minio-prefix/buck2/production/us-east-1", + uploadQueue: uploadQueue, + } + + rc := io.NopCloser(strings.NewReader("blob")) + c.Put(cache.WithStoragePrefix(context.Background(), requestPrefix), cache.CAS, hash, 4, 4, rc) + + item := <-uploadQueue + defer item.Rc.Close() + if item.StoragePrefix != requestPrefix { + t.Fatalf("queued upload StoragePrefix = %q, want %q", item.StoragePrefix, requestPrefix) + } + if !item.RequestScopedStoragePrefix { + t.Fatal("queued upload RequestScopedStoragePrefix = false, want true") + } + if item.RequireStoragePrefix { + t.Fatal("queued upload RequireStoragePrefix = true, want false") + } +} + +func TestPutCapturesRequestScopedPrefixForActionCacheAsyncUpload(t *testing.T) { + hash := "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" + requestPrefix := 
"minio-prefix/bazel/production/us-east-1/42/987654/v0" + uploadQueue := make(chan backendproxy.UploadReq, 1) + c := &s3Cache{ + prefix: "minio-prefix/buck2/production/us-east-1", + uploadQueue: uploadQueue, + } + + ctx := cache.WithRequiredStoragePrefix(cache.WithStoragePrefix(context.Background(), requestPrefix)) + rc := io.NopCloser(strings.NewReader("blob")) + c.Put(ctx, cache.AC, hash, 4, 4, rc) + + item := <-uploadQueue + defer item.Rc.Close() + if item.StoragePrefix != requestPrefix { + t.Fatalf("queued upload StoragePrefix = %q, want %q", item.StoragePrefix, requestPrefix) + } + if !item.RequestScopedStoragePrefix { + t.Fatal("queued upload RequestScopedStoragePrefix = false, want true") + } + if !item.RequireStoragePrefix { + t.Fatal("queued upload RequireStoragePrefix = false, want true") + } +} + +func TestPutCapturesMissingRequiredRequestScopedPrefixForAsyncUpload(t *testing.T) { + hash := "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789" + configuredPrefix := "minio-prefix/buck2/production/us-east-1" + uploadQueue := make(chan backendproxy.UploadReq, 1) + c := &s3Cache{ + prefix: configuredPrefix, + uploadQueue: uploadQueue, + } + + rc := io.NopCloser(strings.NewReader("blob")) + c.Put(cache.WithRequiredStoragePrefix(context.Background()), cache.CAS, hash, 4, 4, rc) + + item := <-uploadQueue + defer item.Rc.Close() + if item.StoragePrefix != configuredPrefix { + t.Fatalf("queued upload StoragePrefix = %q, want %q", item.StoragePrefix, configuredPrefix) + } + if item.RequestScopedStoragePrefix { + t.Fatal("queued upload RequestScopedStoragePrefix = true, want false") + } + if !item.RequireStoragePrefix { + t.Fatal("queued upload RequireStoragePrefix = false, want true") + } +} + +func TestLogMissingRequiredStoragePrefix(t *testing.T) { + var buf bytes.Buffer + c := &s3Cache{ + prefix: "minio-prefix/buck2/production/us-east-1", + errorLogger: stdlog.New(&buf, "", 0), + } + + c.logMissingRequiredStoragePrefix("UPLOAD", cache.CAS, "hash") + 
+ result := buf.String() + for _, expected := range []string{ + "S3 UPLOAD missing request-scoped storage prefix", + "cas hash", + `using configured prefix "minio-prefix/buck2/production/us-east-1"`, + } { + if !strings.Contains(result, expected) { + t.Fatalf("log line %q does not contain %q", result, expected) + } + } +} diff --git a/cache/storage_prefix.go b/cache/storage_prefix.go new file mode 100644 index 0000000..c7bcd6c --- /dev/null +++ b/cache/storage_prefix.go @@ -0,0 +1,40 @@ +package cache + +import "context" + +type storagePrefixContextKey struct{} +type requireStoragePrefixContextKey struct{} + +// WithStoragePrefix returns a context whose cache backend operations should use +// prefix as the physical object-key prefix for this request. +func WithStoragePrefix(ctx context.Context, prefix string) context.Context { + return context.WithValue(ctx, storagePrefixContextKey{}, prefix) +} + +// StoragePrefixFromContext returns a request-scoped physical object-key prefix +// when one was attached to ctx. +func StoragePrefixFromContext(ctx context.Context) (string, bool) { + if ctx == nil { + return "", false + } + prefix, ok := ctx.Value(storagePrefixContextKey{}).(string) + if !ok || prefix == "" { + return "", false + } + return prefix, true +} + +// WithRequiredStoragePrefix marks a request as expecting a request-scoped +// storage prefix. Backends can use this to log when they must fall back to the +// configured process-wide prefix. 
+func WithRequiredStoragePrefix(ctx context.Context) context.Context { + return context.WithValue(ctx, requireStoragePrefixContextKey{}, true) +} + +func StoragePrefixRequiredFromContext(ctx context.Context) bool { + if ctx == nil { + return false + } + required, ok := ctx.Value(requireStoragePrefixContextKey{}).(bool) + return ok && required +} diff --git a/utils/backendproxy/backendproxy.go b/utils/backendproxy/backendproxy.go index 51ea373..f323a5d 100644 --- a/utils/backendproxy/backendproxy.go +++ b/utils/backendproxy/backendproxy.go @@ -12,6 +12,12 @@ type UploadReq struct { SizeOnDisk int64 Kind cache.EntryKind Rc io.ReadCloser + // StoragePrefix captures the request-scoped physical object-key prefix at + // enqueue time. Uploads are asynchronous, so backends cannot rely on the + // original request context still being available when workers process this. + StoragePrefix string + RequestScopedStoragePrefix bool + RequireStoragePrefix bool } type Uploader interface {