Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 29 additions & 2 deletions BLACKSMITH.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ other callers that do not opt in to request-scoped routing. For Bazel, FA should
resolve the authorized VM/job namespace to the full physical prefix:

```text
<MINIO_PREFIX>/bazel/<environment>/<region>/<model_installation_id>/<repository_id>/<generation>
<MINIO_PREFIX>/<environment>/<model_installation_id>/<repository_id>/<generation>/<tool>
```

and attach it with `cache.WithStoragePrefix`. The S3 proxy then uses that
Expand All @@ -50,7 +50,7 @@ Local disk cache entries store the full request prefix as a stable hash so the
LRU can distinguish identical AC/CAS digests from different repo/generation
namespaces without using S3-style slash-heavy prefixes in local paths. MinIO/S3
object keys use the real request-scoped prefix directly, so broad remote
deletion still targets `<MINIO_PREFIX>/bazel/.../<generation>/`.
deletion still targets `<MINIO_PREFIX>/<environment>/<model_installation_id>/<repository_id>/<generation>/`.

For Bazel requests, FA should also mark the request with
`cache.WithRequiredStoragePrefix`. If a request reaches the S3 proxy with that
Expand All @@ -70,3 +70,30 @@ tags, and security advisories for `bazel-remote`. To apply an upstream patch:
5. Run the FA agent build and Buck2 cache tests before merging.

BLA-4006 should make CAS namespacing changes in this repository.

## Build cache operation observation

BLA-4010 adds optional cache operation observation for FA-owned customer
metrics. Callers may attach opaque identity labels with
`cache.WithMetricsLabels`. bazel-remote stores and forwards those labels but
does not interpret tenant, repository, VM, or job identity.

The disk cache accepts an optional `cache.OperationObserver` and invokes it next
to the existing endpoint metrics decorator for semantic cache outcomes:

- `action_cache_get`: `hit`, `miss`, or `error`
- `cas_lookup`: `hit`, `miss`, or `error`

The S3 proxy accepts the same observer and records backend async upload health
only:

- `backend_upload`: `error` or `dropped`

Client transfer bytes are intentionally not inferred inside bazel-remote; FA
observes gRPC request/response payloads and emits `client_upload` and
`client_download` rows with byte counts.

Nil observers preserve existing behavior. Observer panics are swallowed through
the cache package helper so metrics collection cannot change cache request
outcomes. The fork still has no Laravel/Web dependency; FA owns aggregation and
ClickHouse delivery.
110 changes: 70 additions & 40 deletions cache/disk/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,20 @@ import (
)

type metricsDecorator struct {
counter *prometheus.CounterVec
counter *prometheus.CounterVec
observer cache.OperationObserver
*diskCache
}

const (
hitStatus = "hit"
missStatus = "miss"
hitStatus = "hit"
missStatus = "miss"
errorStatus = "error"

containsMethod = "contains"
getMethod = "get"
actionCacheGet = "action_cache_get"
casLookup = "cas_lookup"
//putMethod = "put"

acKind = "ac" // This must be lowercase to match cache.EntryKind.String()
Expand All @@ -30,74 +34,72 @@ const (
)

func (m *metricsDecorator) RegisterMetrics() {
prometheus.MustRegister(m.counter)
if m.counter != nil {
prometheus.MustRegister(m.counter)
}
m.diskCache.RegisterMetrics()
}

func (m *metricsDecorator) Get(ctx context.Context, kind cache.EntryKind, hash string, size int64, offset int64) (io.ReadCloser, int64, error) {
rc, size, err := m.diskCache.Get(ctx, kind, hash, size, offset)
if err != nil {
m.recordLookup(ctx, kind, getMethod, errorStatus, "get_failed", 1, 0)
return rc, size, err
}

lbls := prometheus.Labels{"method": getMethod, "kind": kind.String()}
status := missStatus
if rc != nil {
lbls["status"] = hitStatus
} else {
lbls["status"] = missStatus
status = hitStatus
}
m.counter.With(lbls).Inc()
m.incCounter(getMethod, kind.String(), status, 1)
m.recordLookup(ctx, kind, getMethod, status, "", 1, nonNegativeUint64(size))

return rc, size, nil
}

func (m *metricsDecorator) GetValidatedActionResult(ctx context.Context, hash string) (*pb.ActionResult, []byte, error) {
ar, data, err := m.diskCache.GetValidatedActionResult(ctx, hash)
if err != nil {
m.record(ctx, actionCacheGet, errorStatus, "get_action_result_failed", 1, 0)
return ar, data, err
}

lbls := prometheus.Labels{"method": getMethod, "kind": acKind}
status := missStatus
if ar != nil {
lbls["status"] = hitStatus
} else {
lbls["status"] = missStatus
status = hitStatus
}
m.counter.With(lbls).Inc()
m.incCounter(getMethod, acKind, status, 1)
m.record(ctx, actionCacheGet, status, "", 1, uint64(len(data)))

return ar, data, err
}

func (m *metricsDecorator) GetZstd(ctx context.Context, hash string, size int64, offset int64) (io.ReadCloser, int64, error) {
rc, size, err := m.diskCache.GetZstd(ctx, hash, size, offset)
if err != nil {
m.record(ctx, casLookup, errorStatus, "get_zstd_failed", 1, 0)
return rc, size, err
}

lbls := prometheus.Labels{
"method": getMethod,
"kind": "cas",
}
status := missStatus
if rc != nil {
lbls["status"] = hitStatus
} else {
lbls["status"] = missStatus
status = hitStatus
}
m.counter.With(lbls).Inc()
m.incCounter(getMethod, casKind, status, 1)
m.record(ctx, casLookup, status, "", 1, nonNegativeUint64(size))

return rc, size, nil
}

func (m *metricsDecorator) Contains(ctx context.Context, kind cache.EntryKind, hash string, size int64) (bool, int64) {
ok, size := m.diskCache.Contains(ctx, kind, hash, size)

lbls := prometheus.Labels{"method": containsMethod, "kind": kind.String()}
status := missStatus
if ok {
lbls["status"] = hitStatus
} else {
lbls["status"] = missStatus
status = hitStatus
}
m.counter.With(lbls).Inc()
m.incCounter(containsMethod, kind.String(), status, 1)
m.recordLookup(ctx, kind, containsMethod, status, "", 1, nonNegativeUint64(size))

return ok, size
}
Expand All @@ -106,29 +108,57 @@ func (m *metricsDecorator) FindMissingCasBlobs(ctx context.Context, blobs []*pb.
numLooking := len(blobs)
digests, err := m.diskCache.FindMissingCasBlobs(ctx, blobs)
if err != nil {
m.record(ctx, casLookup, errorStatus, "find_missing_cas_blobs_failed", uint64(numLooking), 0)
return digests, err
}

numMissing := len(digests)

numFound := numLooking - numMissing

hitLabels := prometheus.Labels{
"method": containsMethod,
"kind": "cas",
"status": hitStatus,
m.incCounter(containsMethod, casKind, hitStatus, float64(numFound))
m.incCounter(containsMethod, casKind, missStatus, float64(numMissing))
if numFound > 0 {
m.record(ctx, casLookup, hitStatus, "", uint64(numFound), 0)
}
if numMissing > 0 {
m.record(ctx, casLookup, missStatus, "", uint64(numMissing), 0)
}
hits := m.counter.With(hitLabels)

missLabels := prometheus.Labels{
"method": containsMethod,
"kind": "cas",
"status": missStatus,
return digests, nil
}

func (m *metricsDecorator) incCounter(method, kind, status string, value float64) {
if m.counter == nil || value == 0 {
return
}
misses := m.counter.With(missLabels)
m.counter.With(prometheus.Labels{"method": method, "kind": kind, "status": status}).Add(value)
}

hits.Add(float64(numFound))
misses.Add(float64(numMissing))
func (m *metricsDecorator) recordLookup(ctx context.Context, kind cache.EntryKind, method, status, reason string, ops uint64, bytes uint64) {
switch kind {
case cache.AC:
m.record(ctx, actionCacheGet, status, reason, ops, bytes)
case cache.CAS:
m.record(ctx, casLookup, status, reason, ops, bytes)
default:
m.record(ctx, method, status, reason, ops, bytes)
}
}

return digests, nil
func (m *metricsDecorator) record(ctx context.Context, operation, status, reason string, ops uint64, bytes uint64) {
cache.ObserveOperation(ctx, m.observer, cache.OperationOutcome{
Method: operation,
Status: status,
Reason: reason,
Ops: ops,
Bytes: bytes,
})
}

func nonNegativeUint64(value int64) uint64 {
if value < 0 {
return 0
}
return uint64(value)
}
86 changes: 86 additions & 0 deletions cache/disk/operation_observer_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package disk

import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"io"
stdlog "log"
"testing"

"github.com/buchgr/bazel-remote/v2/cache"
pb "github.com/buchgr/bazel-remote/v2/genproto/build/bazel/remote/execution/v2"
"google.golang.org/protobuf/proto"
)

type recordingObserver struct {
outcomes []cache.OperationOutcome
}

func (r *recordingObserver) RecordOutcome(_ context.Context, outcome cache.OperationOutcome) {
r.outcomes = append(r.outcomes, outcome)
}

func TestOperationObserverReceivesActionCacheHitAndMiss(t *testing.T) {
observer := &recordingObserver{}
diskCache, err := New(t.TempDir(), 1024*1024, WithOperationObserver(observer), WithAccessLogger(stdlog.New(io.Discard, "", 0)))
if err != nil {
t.Fatalf("New() error = %v", err)
}

result := &pb.ActionResult{StdoutRaw: []byte("ok")}
data, err := proto.Marshal(result)
if err != nil {
t.Fatalf("Marshal() error = %v", err)
}
sum := sha256.Sum256(data)
hash := hex.EncodeToString(sum[:])
if err := diskCache.Put(context.Background(), cache.AC, hash, int64(len(data)), bytes.NewReader(data)); err != nil {
t.Fatalf("Put() error = %v", err)
}
if _, _, err := diskCache.GetValidatedActionResult(context.Background(), hash); err != nil {
t.Fatalf("GetValidatedActionResult(hit) error = %v", err)
}
if _, _, err := diskCache.GetValidatedActionResult(context.Background(), "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"); err != nil {
t.Fatalf("GetValidatedActionResult(miss) error = %v", err)
}

requireOutcome(t, observer.outcomes, actionCacheGet, hitStatus)
requireOutcome(t, observer.outcomes, actionCacheGet, missStatus)
}

func TestOperationObserverReceivesFindMissingCasBlobsOutcomes(t *testing.T) {
observer := &recordingObserver{}
diskCache, err := New(t.TempDir(), 1024*1024, WithOperationObserver(observer), WithAccessLogger(stdlog.New(io.Discard, "", 0)))
if err != nil {
t.Fatalf("New() error = %v", err)
}

data := []byte("blob")
sum := sha256.Sum256(data)
hash := hex.EncodeToString(sum[:])
if err := diskCache.Put(context.Background(), cache.CAS, hash, int64(len(data)), bytes.NewReader(data)); err != nil {
t.Fatalf("Put() error = %v", err)
}
_, err = diskCache.FindMissingCasBlobs(context.Background(), []*pb.Digest{
{Hash: hash, SizeBytes: int64(len(data))},
{Hash: "abcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789", SizeBytes: 12},
})
if err != nil {
t.Fatalf("FindMissingCasBlobs() error = %v", err)
}

requireOutcome(t, observer.outcomes, casLookup, hitStatus)
requireOutcome(t, observer.outcomes, casLookup, missStatus)
}

func requireOutcome(t *testing.T, outcomes []cache.OperationOutcome, method string, status string) {
t.Helper()
for _, outcome := range outcomes {
if outcome.Method == method && outcome.Status == status {
return
}
}
t.Fatalf("missing outcome method=%s status=%s in %+v", method, status, outcomes)
}
28 changes: 21 additions & 7 deletions cache/disk/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,17 +86,18 @@ func WithAccessLogger(logger *log.Logger) Option {

func WithEndpointMetrics() Option {
return func(c *CacheConfig) error {
if c.metrics != nil {
if c.metrics != nil && c.metrics.counter != nil {
return fmt.Errorf("WithEndpointMetrics specified multiple times")
}

c.metrics = &metricsDecorator{
counter: prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "bazel_remote_incoming_requests_total",
Help: "The number of incoming cache requests",
},
[]string{"method", "kind", "status"}),
if c.metrics == nil {
c.metrics = &metricsDecorator{}
}
c.metrics.counter = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "bazel_remote_incoming_requests_total",
Help: "The number of incoming cache requests",
},
[]string{"method", "kind", "status"})

c.metrics.counter.WithLabelValues("get", "cas", "hit").Add(0)
c.metrics.counter.WithLabelValues("get", "cas", "miss").Add(0)
Expand All @@ -108,3 +109,16 @@ func WithEndpointMetrics() Option {
return nil
}
}

func WithOperationObserver(observer cache.OperationObserver) Option {
return func(c *CacheConfig) error {
if observer == nil {
return nil
}
if c.metrics == nil {
c.metrics = &metricsDecorator{}
}
c.metrics.observer = observer
return nil
}
}
Loading
Loading