From 2a0b2a6f61a5b6a81f41ca49cf67c7ff95949a28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Cuevas?= Date: Thu, 19 Mar 2026 17:46:57 +0100 Subject: [PATCH 1/4] feat(admin): add metrics collection, computation, and REST API Parse Prometheus text format from proxy /metrics endpoints using expfmt, store snapshots in per-instance ring buffers (~1h retention at 10s intervals), compute per-second rates from counter deltas (handling resets), compute p50/p95/p99 from histogram buckets via linear interpolation, and expose results via REST API (GET /api/metrics/{id}, GET /api/metrics/fleet). New admin/metrics package: parser, ring buffer, rate/percentile math, and Collector orchestrator. Poller extended to scrape /metrics on successful health probes. Fleet endpoint aggregates KPIs across instances with trend indicators vs 1h ago. --- admin/api/metrics.go | 61 ++++ admin/api/metrics_test.go | 177 +++++++++++ admin/cmd/chaperone-admin/main.go | 15 +- admin/go.mod | 8 +- admin/go.sum | 34 +- admin/metrics/collector.go | 508 ++++++++++++++++++++++++++++++ admin/metrics/collector_test.go | 419 ++++++++++++++++++++++++ admin/metrics/compute.go | 95 ++++++ admin/metrics/compute_test.go | 176 +++++++++++ admin/metrics/metrics.go | 154 +++++++++ admin/metrics/parse.go | 117 +++++++ admin/metrics/parse_test.go | 174 ++++++++++ admin/metrics/ring.go | 48 +++ admin/metrics/ring_test.go | 95 ++++++ admin/poller/poller.go | 81 ++++- admin/poller/poller_test.go | 123 +++++++- admin/server.go | 13 +- 17 files changed, 2267 insertions(+), 31 deletions(-) create mode 100644 admin/api/metrics.go create mode 100644 admin/api/metrics_test.go create mode 100644 admin/metrics/collector.go create mode 100644 admin/metrics/collector_test.go create mode 100644 admin/metrics/compute.go create mode 100644 admin/metrics/compute_test.go create mode 100644 admin/metrics/metrics.go create mode 100644 admin/metrics/parse.go create mode 100644 admin/metrics/parse_test.go create mode 100644 admin/metrics/ring.go create mode 100644 admin/metrics/ring_test.go diff --git a/admin/api/metrics.go b/admin/api/metrics.go new file mode 100644 index 0000000..61e8cb1 --- /dev/null +++ b/admin/api/metrics.go @@ -0,0 +1,61 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "log/slog" + "net/http" + + "github.com/cloudblue/chaperone/admin/metrics" + "github.com/cloudblue/chaperone/admin/store" +) + +// MetricsHandler serves computed metrics via the REST API. +type MetricsHandler struct { + store *store.Store + collector *metrics.Collector +} + +// NewMetricsHandler creates a handler backed by the given store and collector. +func NewMetricsHandler(st *store.Store, c *metrics.Collector) *MetricsHandler { + return &MetricsHandler{store: st, collector: c} +} + +// Register mounts metrics routes on the given mux. +func (h *MetricsHandler) Register(mux *http.ServeMux) { + mux.HandleFunc("GET /api/metrics/fleet", h.fleet) + mux.HandleFunc("GET /api/metrics/{id}", h.instance) +} + +func (h *MetricsHandler) fleet(w http.ResponseWriter, r *http.Request) { + instances, err := h.store.ListInstances(r.Context()) + if err != nil { + slog.Error("listing instances for fleet metrics", "error", err) + respondError(w, http.StatusInternalServerError, "INTERNAL_ERROR", "Failed to list instances") + return + } + + ids := make([]int64, len(instances)) + for i := range instances { + ids[i] = instances[i].ID + } + + fm := h.collector.GetFleetMetrics(ids) + respondJSON(w, http.StatusOK, fm) +} + +func (h *MetricsHandler) instance(w http.ResponseWriter, r *http.Request) { + id, ok := parseID(w, r) + if !ok { + return + } + + im := h.collector.GetInstanceMetrics(id) + if im == nil { + respondError(w, http.StatusNotFound, "NO_METRICS", "No metric data available for this instance") + return + } + + respondJSON(w, http.StatusOK, im) +} diff --git a/admin/api/metrics_test.go b/admin/api/metrics_test.go new file mode 100644 index 0000000..43f598d --- /dev/null +++ b/admin/api/metrics_test.go @@ -0,0 +1,177 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package api + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/cloudblue/chaperone/admin/metrics" +) + +func makeSnapshot(t time.Time, totalReq, errReq, active, panics float64) metrics.Snapshot { + return metrics.Snapshot{ + Time: t, + Vendors: map[string]*metrics.VendorSnapshot{ + "acme": { + RequestsTotal: totalReq, + RequestsErrors: errReq, + Duration: metrics.Histogram{ + Count: totalReq, + Buckets: []metrics.Bucket{ + {UpperBound: 0.1, Count: totalReq * 0.5}, + {UpperBound: 0.5, Count: totalReq * 0.9}, + {UpperBound: 1.0, Count: totalReq}, + }, + }, + }, + }, + ActiveConnections: active, + PanicsTotal: panics, + } +} + +func TestMetricsHandler_Fleet_ReturnsAggregated(t *testing.T) { + t.Parallel() + st := openTestStore(t) + c := metrics.NewCollector(10) + + ctx := context.Background() + inst, err := st.CreateInstance(ctx, "proxy-1", "10.0.0.1:9090") + if err != nil { + t.Fatalf("CreateInstance() error = %v", err) + } + + t0 := time.Date(2026, 3, 7, 12, 0, 0, 0, time.UTC) + c.Record(inst.ID, makeSnapshot(t0, 1000, 50, 10, 1)) + c.Record(inst.ID, makeSnapshot(t0.Add(10*time.Second), 1100, 55, 12, 2)) + + h := NewMetricsHandler(st, c) + mux := http.NewServeMux() + h.Register(mux) + + req := httptest.NewRequest(http.MethodGet, "/api/metrics/fleet", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusOK) + } + + var fm metrics.FleetMetrics + if err := json.NewDecoder(rec.Body).Decode(&fm); err != nil { + t.Fatalf("decode error: %v", err) + } + if fm.TotalRPS <= 0 { + t.Errorf("TotalRPS = %v, want > 0", fm.TotalRPS) + } + if len(fm.Instances) != 1 { + t.Errorf("Instances = %d, want 1", len(fm.Instances)) + } +} + +func TestMetricsHandler_Instance_ReturnsMetrics(t *testing.T) { + t.Parallel() + st := openTestStore(t) + c := metrics.NewCollector(10) + + ctx := context.Background() + inst, err := st.CreateInstance(ctx, "proxy-1", "10.0.0.1:9090") + if err != nil { + t.Fatalf("CreateInstance() error = %v", err) + } + + t0 := time.Date(2026, 3, 7, 12, 0, 0, 0, time.UTC) + c.Record(inst.ID, makeSnapshot(t0, 1000, 50, 10, 1)) + c.Record(inst.ID, makeSnapshot(t0.Add(10*time.Second), 1100, 55, 12, 2)) + + h := NewMetricsHandler(st, c) + mux := http.NewServeMux() + h.Register(mux) + + req := httptest.NewRequest(http.MethodGet, "/api/metrics/1", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusOK) + } + + var im metrics.InstanceMetrics + if err := json.NewDecoder(rec.Body).Decode(&im); err != nil { + t.Fatalf("decode error: %v", err) + } + if im.RPS <= 0 { + t.Errorf("RPS = %v, want > 0", im.RPS) + } + if im.DataPoints != 2 { + t.Errorf("DataPoints = %d, want 2", im.DataPoints) + } +} + +func TestMetricsHandler_Instance_NoData_Returns404(t *testing.T) { + t.Parallel() + st := openTestStore(t) + c := metrics.NewCollector(10) + + h := NewMetricsHandler(st, c) + mux := http.NewServeMux() + h.Register(mux) + + req := httptest.NewRequest(http.MethodGet, "/api/metrics/99", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusNotFound { + t.Errorf("status = %d, want %d", rec.Code, http.StatusNotFound) + } +} + +func TestMetricsHandler_Instance_InvalidID_Returns400(t *testing.T) { + t.Parallel() + st := openTestStore(t) + c := metrics.NewCollector(10) + + h := NewMetricsHandler(st, c) + mux := http.NewServeMux() + h.Register(mux) + + req := httptest.NewRequest(http.MethodGet, "/api/metrics/abc", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusBadRequest { + t.Errorf("status = %d, want %d", rec.Code, http.StatusBadRequest) + } +} + +func TestMetricsHandler_Fleet_EmptyFleet_ReturnsEmptyInstances(t *testing.T) { + t.Parallel() + st := openTestStore(t) + c := metrics.NewCollector(10) + + h := NewMetricsHandler(st, c) + mux := http.NewServeMux() + h.Register(mux) + + req := httptest.NewRequest(http.MethodGet, "/api/metrics/fleet", nil) + rec := httptest.NewRecorder() + mux.ServeHTTP(rec, req) + + if rec.Code != http.StatusOK { + t.Fatalf("status = %d, want %d", rec.Code, http.StatusOK) + } + + var fm metrics.FleetMetrics + if err := json.NewDecoder(rec.Body).Decode(&fm); err != nil { + t.Fatalf("decode error: %v", err) + } + if len(fm.Instances) != 0 { + t.Errorf("Instances = %d, want 0", len(fm.Instances)) + } +} diff --git a/admin/cmd/chaperone-admin/main.go b/admin/cmd/chaperone-admin/main.go index 55d3997..6f34f7d 100644 --- a/admin/cmd/chaperone-admin/main.go +++ b/admin/cmd/chaperone-admin/main.go @@ -17,6 +17,7 @@ import ( "github.com/cloudblue/chaperone/admin" "github.com/cloudblue/chaperone/admin/config" + "github.com/cloudblue/chaperone/admin/metrics" "github.com/cloudblue/chaperone/admin/poller" "github.com/cloudblue/chaperone/admin/store" ) @@ -59,24 +60,30 @@ func run() error { } defer st.Close() - srv, err := admin.NewServer(cfg, st) + collector := metrics.NewCollector(metrics.DefaultCapacity) + + srv, err := admin.NewServer(cfg, st, collector) if err != nil { return fmt.Errorf("creating server: %w", err) } - // Start the background health poller. + // Start the background health + metrics poller. pollerCtx, pollerCancel := context.WithCancel(context.Background()) defer pollerCancel() - p := poller.New(st, cfg.Scraper.Interval.Unwrap(), cfg.Scraper.Timeout.Unwrap()) + p := poller.New(st, collector, cfg.Scraper.Interval.Unwrap(), cfg.Scraper.Timeout.Unwrap()) go p.Run(pollerCtx) + return serve(cfg.Server.Addr, srv) +} + +func serve(addr string, srv *admin.Server) error { ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) defer stop() errCh := make(chan error, 1) go func() { - slog.Info("listening", "addr", cfg.Server.Addr) + slog.Info("listening", "addr", addr) errCh <- srv.ListenAndServe() }() diff --git a/admin/go.mod b/admin/go.mod index 4d4b61c..f4f777b 100644 --- a/admin/go.mod +++ b/admin/go.mod @@ -3,6 +3,8 @@ module github.com/cloudblue/chaperone/admin go 1.26.1 require ( + github.com/prometheus/client_model v0.6.2 + github.com/prometheus/common v0.67.5 gopkg.in/yaml.v3 v3.0.1 modernc.org/sqlite v1.46.1 ) @@ -10,11 +12,15 @@ require ( require ( github.com/dustin/go-humanize v1.0.1 // indirect github.com/google/uuid v1.6.0 // indirect + github.com/kr/pretty v0.3.1 // indirect github.com/mattn/go-isatty v0.0.20 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/ncruces/go-strftime v1.0.0 // indirect github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + go.yaml.in/yaml/v2 v2.4.3 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect - golang.org/x/sys v0.37.0 // indirect + golang.org/x/sys v0.39.0 // indirect + google.golang.org/protobuf v1.36.11 // indirect modernc.org/libc v1.67.6 // indirect modernc.org/mathutil v1.7.1 // indirect modernc.org/memory v1.11.0 // indirect diff --git a/admin/go.sum b/admin/go.sum index b2791d1..a22dc4b 100644 --- a/admin/go.sum +++ b/admin/go.sum @@ -1,17 +1,42 @@ +github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= +github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/ncruces/go-strftime v1.0.0 h1:HMFp8mLCTPp341M/ZnA4qaf7ZlsbTc+miZjCLOFAw7w= github.com/ncruces/go-strftime v1.0.0/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.6.2 h1:oBsgwpGs7iVziMvrGhE53c/GrLUsZdHnqNwqPLxwZyk= +github.com/prometheus/client_model v0.6.2/go.mod h1:y3m2F6Gdpfy6Ut/GBsUqTWZqCUvMVzSfMLjcu6wAwpE= +github.com/prometheus/common v0.67.5 h1:pIgK94WWlQt1WLwAC5j2ynLaBRDiinoAb86HZHTUGI4= +github.com/prometheus/common v0.67.5/go.mod h1:SjE/0MzDEEAyrdr5Gqc6G+sXI67maCxzaT3A2+HqjUw= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= +github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ= +github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= +go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 h1:mgKeJMpvi0yx/sU5GsxQ7p6s2wtOnGAHZWCHUM4KGzY= golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546/go.mod h1:j/pmGrbnkbPtQfxEe5D0VQhZC6qKbfKifgD0oM7sR70= golang.org/x/mod v0.29.0 h1:HV8lRxZC4l2cr3Zq1LvtOsi/ThTgWnUk/y64QSs8GwA= @@ -19,12 +44,15 @@ golang.org/x/mod v0.29.0/go.mod h1:NyhrlYXJ2H4eJiRy/WDBO6HMqZQ6q9nk4JzS3NuCK+w= golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= -golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.39.0 h1:CvCKL8MeisomCi6qNZ+wbb0DN9E5AATixKsvNtMoMFk= +golang.org/x/sys v0.39.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ= golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE= +google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= modernc.org/cc/v4 v4.27.1 h1:9W30zRlYrefrDV2JE2O8VDtJ1yPGownxciz5rrbQZis= diff --git a/admin/metrics/collector.go b/admin/metrics/collector.go new file mode 100644 index 0000000..8be1198 --- /dev/null +++ b/admin/metrics/collector.go @@ -0,0 +1,508 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "sort" + "sync" + "time" +) + +// Collector manages per-instance metric ring buffers and computes derived +// metrics (rates, percentiles) on demand. +type Collector struct { + mu sync.RWMutex + buffers map[int64]*Ring + capacity int +} + +// NewCollector creates a Collector with the given ring buffer capacity. +func NewCollector(capacity int) *Collector { + return &Collector{ + buffers: make(map[int64]*Ring), + capacity: capacity, + } +} + +// RecordScrape parses raw Prometheus text data and stores the resulting +// snapshot for the given instance. +func (c *Collector) RecordScrape(instanceID int64, data []byte, t time.Time) error { + snap, err := Parse(data, t) + if err != nil { + return err + } + c.Record(instanceID, *snap) + return nil +} + +// Record stores a pre-parsed snapshot for the given instance. +func (c *Collector) Record(instanceID int64, snap Snapshot) { + c.mu.Lock() + defer c.mu.Unlock() + + buf, ok := c.buffers[instanceID] + if !ok { + buf = NewRing(c.capacity) + c.buffers[instanceID] = buf + } + buf.Push(snap) +} + +// Remove deletes all stored data for the given instance. +func (c *Collector) Remove(instanceID int64) { + c.mu.Lock() + defer c.mu.Unlock() + delete(c.buffers, instanceID) +} + +// Prune removes ring buffers for instances not in the activeIDs set. +func (c *Collector) Prune(activeIDs map[int64]bool) { + c.mu.Lock() + defer c.mu.Unlock() + for id := range c.buffers { + if !activeIDs[id] { + delete(c.buffers, id) + } + } +} + +// GetInstanceMetrics computes full metrics for a single instance. +// Returns nil if no data exists for the given instance. +func (c *Collector) GetInstanceMetrics(instanceID int64) *InstanceMetrics { + c.mu.RLock() + defer c.mu.RUnlock() + + buf, ok := c.buffers[instanceID] + if !ok || buf.Len() == 0 { + return nil + } + + last, _ := buf.Last() + im := &InstanceMetrics{ + InstanceID: instanceID, + CollectedAt: last.Time, + DataPoints: buf.Len(), + ActiveConnections: last.ActiveConnections, + PanicsTotal: last.PanicsTotal, + } + + if buf.Len() >= 2 { + prev := buf.At(buf.Len() - 2) + c.fillCurrentKPIs(im, prev, last) + c.fillVendorMetrics(im, prev, last) + } + + c.fillTrends(im, buf) + im.Series = c.buildSeries(buf) + im.VendorSeries = c.buildVendorSeries(buf) + + return im +} + +// GetInstanceSummary returns a compact summary for the fleet view. +// Returns nil if no data exists. +func (c *Collector) GetInstanceSummary(instanceID int64) *InstanceSummary { + c.mu.RLock() + defer c.mu.RUnlock() + + buf, ok := c.buffers[instanceID] + if !ok || buf.Len() == 0 { + return nil + } + + last, _ := buf.Last() + s := &InstanceSummary{ + InstanceID: instanceID, + ActiveConnections: last.ActiveConnections, + PanicsTotal: last.PanicsTotal, + } + + if buf.Len() >= 2 { + prev := buf.At(buf.Len() - 2) + dt := last.Time.Sub(prev.Time) + s.RPS = counterRate(prev.totalRequests(), last.totalRequests(), dt) + s.ErrorRate = errorRate(prev.totalRequests(), last.totalRequests(), prev.totalErrors(), last.totalErrors()) + dh := mergeHistogramDelta(prev, last) + s.P99 = secondsToMs(histogramQuantile(0.99, dh)) + } + + return s +} + +// fleetAccumulator collects per-instance data for fleet-wide aggregation. +type fleetAccumulator struct { + totalReqDelta float64 + totalErrDelta float64 + trendOldRPS float64 + trendOldReqDelta float64 + trendOldErrDelta float64 + hasTrend bool +} + +// GetFleetMetrics aggregates metrics across the given instance IDs. +func (c *Collector) GetFleetMetrics(instanceIDs []int64) *FleetMetrics { + c.mu.RLock() + defer c.mu.RUnlock() + + fm := &FleetMetrics{ + CollectedAt: time.Now(), + Instances: make([]InstanceSummary, 0, len(instanceIDs)), + } + + var acc fleetAccumulator + for _, id := range instanceIDs { + buf, ok := c.buffers[id] + if !ok || buf.Len() == 0 { + continue + } + s := c.summarizeForFleet(buf, id, &acc) + fm.TotalRPS += s.RPS + fm.TotalActiveConnections += s.ActiveConnections + fm.TotalPanics += s.PanicsTotal + fm.Instances = append(fm.Instances, s) + } + + if acc.totalReqDelta > 0 { + fm.FleetErrorRate = acc.totalErrDelta / acc.totalReqDelta + } + acc.applyTrends(fm) + return fm +} + +// summarizeForFleet computes an InstanceSummary and accumulates fleet-wide deltas. +func (c *Collector) summarizeForFleet(buf *Ring, id int64, acc *fleetAccumulator) InstanceSummary { + last, _ := buf.Last() + s := InstanceSummary{ + InstanceID: id, + ActiveConnections: last.ActiveConnections, + PanicsTotal: last.PanicsTotal, + } + + if buf.Len() >= 2 { + prev := buf.At(buf.Len() - 2) + dt := last.Time.Sub(prev.Time) + s.RPS = counterRate(prev.totalRequests(), last.totalRequests(), dt) + s.ErrorRate = errorRate(prev.totalRequests(), last.totalRequests(), prev.totalErrors(), last.totalErrors()) + s.P99 = secondsToMs(histogramQuantile(0.99, mergeHistogramDelta(prev, last))) + + reqD := last.totalRequests() - prev.totalRequests() + errD := last.totalErrors() - prev.totalErrors() + if reqD >= 0 && errD >= 0 { + acc.totalReqDelta += reqD + acc.totalErrDelta += errD + } + } + + if td, ok := c.trendSnapshot(buf); ok { + acc.trendOldRPS += td.rps + acc.trendOldReqDelta += td.reqDelta + acc.trendOldErrDelta += td.errDelta + acc.hasTrend = true + } + return s +} + +func (acc *fleetAccumulator) applyTrends(fm *FleetMetrics) { + if !acc.hasTrend { + return + } + rpsTrend := fm.TotalRPS - acc.trendOldRPS + fm.RPSTrend = &rpsTrend + + var oldErrRate float64 + if acc.trendOldReqDelta > 0 { + oldErrRate = acc.trendOldErrDelta / acc.trendOldReqDelta + } + errTrend := fm.FleetErrorRate - oldErrRate + fm.ErrorRateTrend = &errTrend +} + +// fillCurrentKPIs populates global RPS, error rate, and latency percentiles +// from the two most recent snapshots. +func (*Collector) fillCurrentKPIs(im *InstanceMetrics, prev, curr Snapshot) { + dt := curr.Time.Sub(prev.Time) + im.RPS = counterRate(prev.totalRequests(), curr.totalRequests(), dt) + im.ErrorRate = errorRate(prev.totalRequests(), curr.totalRequests(), prev.totalErrors(), curr.totalErrors()) + + dh := mergeHistogramDelta(prev, curr) + im.P50 = secondsToMs(histogramQuantile(0.50, dh)) + im.P95 = secondsToMs(histogramQuantile(0.95, dh)) + im.P99 = secondsToMs(histogramQuantile(0.99, dh)) +} + +// fillVendorMetrics populates per-vendor KPIs from the two most recent snapshots. +func (*Collector) fillVendorMetrics(im *InstanceMetrics, prev, curr Snapshot) { + dt := curr.Time.Sub(prev.Time) + vendorIDs := collectVendorIDs(prev, curr) + + for _, vid := range vendorIDs { + pv := vendorOr(prev, vid) + cv := vendorOr(curr, vid) + + vm := VendorMetrics{VendorID: vid} + vm.RPS = counterRate(pv.RequestsTotal, cv.RequestsTotal, dt) + vm.ErrorRate = errorRate(pv.RequestsTotal, cv.RequestsTotal, pv.RequestsErrors, cv.RequestsErrors) + + dh := histogramDelta(pv.Duration, cv.Duration) + vm.P50 = secondsToMs(histogramQuantile(0.50, dh)) + vm.P95 = secondsToMs(histogramQuantile(0.95, dh)) + vm.P99 = secondsToMs(histogramQuantile(0.99, dh)) + + im.Vendors = append(im.Vendors, vm) + } + sort.Slice(im.Vendors, func(i, j int) bool { + return im.Vendors[i].VendorID < im.Vendors[j].VendorID + }) +} + +// fillTrends computes trend values by comparing the current rate to the rate +// from approximately 1 hour ago. +func (*Collector) fillTrends(im *InstanceMetrics, buf *Ring) { + if buf.Len() < 4 { + return + } + newest := buf.At(buf.Len() - 1) + oldest := buf.At(0) + if newest.Time.Sub(oldest.Time) < 50*time.Minute { + return + } + + // Find the snapshot closest to 1h ago and form a rate pair. + target := newest.Time.Add(-1 * time.Hour) + idx := findNearest(buf, target) + start := idx + if start > 0 { + start = idx - 1 + } + if start+1 >= buf.Len() { + return + } + prev := buf.At(start) + curr := buf.At(start + 1) + dt := curr.Time.Sub(prev.Time) + + oldRPS := counterRate(prev.totalRequests(), curr.totalRequests(), dt) + rpsTrend := im.RPS - oldRPS + im.RPSTrend = &rpsTrend + + oldErr := errorRate(prev.totalRequests(), curr.totalRequests(), prev.totalErrors(), curr.totalErrors()) + errTrend := im.ErrorRate - oldErr + im.ErrorRateTrend = &errTrend +} + +type historicalTrend struct { + rps float64 + reqDelta float64 + errDelta float64 +} + +// trendSnapshot returns historical RPS and request/error deltas from ~1h ago. +func (*Collector) trendSnapshot(buf *Ring) (historicalTrend, bool) { + if buf.Len() < 4 { + return historicalTrend{}, false + } + newest := buf.At(buf.Len() - 1) + oldest := buf.At(0) + if newest.Time.Sub(oldest.Time) < 50*time.Minute { + return historicalTrend{}, false + } + + target := newest.Time.Add(-1 * time.Hour) + idx := findNearest(buf, target) + start := idx + if start > 0 { + start = idx - 1 + } + if start+1 >= buf.Len() { + return historicalTrend{}, false + } + prev := buf.At(start) + curr := buf.At(start + 1) + dt := curr.Time.Sub(prev.Time) + + reqD := curr.totalRequests() - prev.totalRequests() + errD := curr.totalErrors() - prev.totalErrors() + if reqD < 0 || errD < 0 { + return historicalTrend{}, false + } + + return historicalTrend{ + rps: counterRate(prev.totalRequests(), curr.totalRequests(), dt), + reqDelta: reqD, + errDelta: errD, + }, true +} + +// buildSeries creates the global time series from the ring buffer. +func (*Collector) buildSeries(buf *Ring) []SeriesPoint { + if buf.Len() < 2 { + return nil + } + + points := make([]SeriesPoint, 0, buf.Len()-1) + for i := 1; i < buf.Len(); i++ { + prev := buf.At(i - 1) + curr := buf.At(i) + dt := curr.Time.Sub(prev.Time) + + dh := mergeHistogramDelta(prev, curr) + points = append(points, SeriesPoint{ + Time: curr.Time.Unix(), + RPS: counterRate(prev.totalRequests(), curr.totalRequests(), dt), + ErrorRate: errorRate(prev.totalRequests(), curr.totalRequests(), prev.totalErrors(), curr.totalErrors()), + P99: secondsToMs(histogramQuantile(0.99, dh)), + ActiveConnections: curr.ActiveConnections, + }) + } + return points +} + +// buildVendorSeries creates per-vendor time series from the ring buffer. +func (*Collector) buildVendorSeries(buf *Ring) map[string][]VendorSeriesPoint { + if buf.Len() < 2 { + return nil + } + + allVendors := make(map[string]bool) + for i := 0; i < buf.Len(); i++ { + for vid := range buf.At(i).Vendors { + allVendors[vid] = true + } + } + if len(allVendors) == 0 { + return nil + } + + result := make(map[string][]VendorSeriesPoint, len(allVendors)) + for vid := range allVendors { + points := make([]VendorSeriesPoint, 0, buf.Len()-1) + for i := 1; i < buf.Len(); i++ { + prev := buf.At(i - 1) + curr := buf.At(i) + dt := curr.Time.Sub(prev.Time) + pv := vendorOr(prev, vid) + cv := vendorOr(curr, vid) + + dh := histogramDelta(pv.Duration, cv.Duration) + points = append(points, VendorSeriesPoint{ + Time: curr.Time.Unix(), + RPS: counterRate(pv.RequestsTotal, cv.RequestsTotal, dt), + ErrorRate: errorRate(pv.RequestsTotal, cv.RequestsTotal, pv.RequestsErrors, cv.RequestsErrors), + P50: secondsToMs(histogramQuantile(0.50, dh)), + P95: secondsToMs(histogramQuantile(0.95, dh)), + P99: secondsToMs(histogramQuantile(0.99, dh)), + }) + } + result[vid] = points + } + return result +} + +// --- helpers --- + +// findNearest returns the index of the snapshot closest to target time. +func findNearest(buf *Ring, target time.Time) int { + best := 0 + bestDelta := absDuration(buf.At(0).Time.Sub(target)) + for i := 1; i < buf.Len(); i++ { + d := absDuration(buf.At(i).Time.Sub(target)) + if d < bestDelta { + best = i + bestDelta = d + } + } + return best +} + +func absDuration(d time.Duration) time.Duration { + if d < 0 { + return -d + } + return d +} + +// mergeHistogramDelta merges all vendor histograms and computes their delta. +func mergeHistogramDelta(prev, curr Snapshot) Histogram { + pm := mergeVendorHistograms(prev) + cm := mergeVendorHistograms(curr) + return histogramDelta(pm, cm) +} + +// mergeVendorHistograms combines histograms across all vendors in a snapshot. +func mergeVendorHistograms(s Snapshot) Histogram { + var merged Histogram + for _, vs := range s.Vendors { + merged = addHistograms(merged, vs.Duration) + } + return merged +} + +// addHistograms sums two histograms that share the same bucket boundaries. +// All vendors on a single proxy share the same HistogramVec bucket layout, +// so boundaries always match in practice. If they ever diverge (proxy version +// drift), we fall back to the histogram with more observations rather than +// silently producing a corrupt merge. +func addHistograms(a, b Histogram) Histogram { + if len(a.Buckets) == 0 { + return b + } + if len(b.Buckets) == 0 { + return a + } + if !sameBucketBoundaries(a, b) { + if a.Count >= b.Count { + return a + } + return b + } + result := Histogram{ + Count: a.Count + b.Count, + Sum: a.Sum + b.Sum, + Buckets: make([]Bucket, len(a.Buckets)), + } + for i := range result.Buckets { + result.Buckets[i] = Bucket{ + UpperBound: a.Buckets[i].UpperBound, + Count: a.Buckets[i].Count + b.Buckets[i].Count, + } + } + return result +} + +func sameBucketBoundaries(a, b Histogram) bool { + if len(a.Buckets) != len(b.Buckets) { + return false + } + for i := range a.Buckets { + if a.Buckets[i].UpperBound != b.Buckets[i].UpperBound { + return false + } + } + return true +} + +// vendorOr returns the VendorSnapshot for a vendor or a zero value. +func vendorOr(s Snapshot, id string) VendorSnapshot { + if vs, ok := s.Vendors[id]; ok { + return *vs + } + return VendorSnapshot{} +} + +// collectVendorIDs returns the sorted union of vendor IDs from two snapshots. +func collectVendorIDs(a, b Snapshot) []string { + seen := make(map[string]bool) + for k := range a.Vendors { + seen[k] = true + } + for k := range b.Vendors { + seen[k] = true + } + ids := make([]string, 0, len(seen)) + for k := range seen { + ids = append(ids, k) + } + sort.Strings(ids) + return ids +} diff --git a/admin/metrics/collector_test.go b/admin/metrics/collector_test.go new file mode 100644 index 0000000..8110d44 --- /dev/null +++ b/admin/metrics/collector_test.go @@ -0,0 +1,419 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "math" + "testing" + "time" +) + +func makeSnapshot(t time.Time, totalReq, errReq, active, panics float64) Snapshot { + return Snapshot{ + Time: t, + Vendors: map[string]*VendorSnapshot{ + "acme": { + RequestsTotal: totalReq * 0.6, + RequestsErrors: errReq * 0.6, + Duration: Histogram{ + Count: totalReq * 0.6, + Sum: totalReq * 0.6 * 0.15, + Buckets: []Bucket{ + {UpperBound: 0.05, Count: totalReq * 0.6 * 0.2}, + {UpperBound: 0.1, Count: totalReq * 0.6 * 0.5}, + {UpperBound: 0.25, Count: totalReq * 0.6 * 0.8}, + {UpperBound: 0.5, Count: totalReq * 0.6 * 0.95}, + {UpperBound: 1.0, Count: totalReq * 0.6}, + }, + }, + }, + "beta": { + RequestsTotal: totalReq * 0.4, + RequestsErrors: errReq * 0.4, + Duration: Histogram{ + Count: totalReq * 0.4, + Sum: totalReq * 0.4 * 0.1, + Buckets: []Bucket{ + {UpperBound: 0.05, Count: totalReq * 0.4 * 0.3}, + {UpperBound: 0.1, Count: totalReq * 0.4 * 0.6}, + {UpperBound: 0.25, Count: totalReq * 0.4 * 0.9}, + {UpperBound: 0.5, Count: totalReq * 0.4 * 0.98}, + {UpperBound: 1.0, Count: totalReq * 0.4}, + }, + }, + }, + }, + ActiveConnections: active, + PanicsTotal: panics, + } +} + +func TestCollector_RecordScrape_ParsesAndStores(t *testing.T) { + t.Parallel() + c := NewCollector(10) + + err := c.RecordScrape(1, []byte(sampleMetrics), time.Now()) + if err != nil { + t.Fatalf("RecordScrape() error = %v", err) + } + + c.mu.RLock() + buf, ok := c.buffers[1] + c.mu.RUnlock() + + if !ok { + t.Fatal("expected buffer for instance 1") + } + if buf.Len() != 1 { + t.Errorf("buffer Len() = %d, want 1", buf.Len()) + } +} + +func TestCollector_RecordScrape_MalformedData_ReturnsError(t *testing.T) { + t.Parallel() + c := NewCollector(10) + + err := c.RecordScrape(1, []byte("# TYPE foo gauge\n# TYPE foo counter\nfoo 1\n"), time.Now()) + if err == nil { + t.Error("expected error for malformed data") + } +} + +func TestCollector_Remove(t *testing.T) { + t.Parallel() + c := NewCollector(10) + c.Record(1, Snapshot{Time: time.Now()}) + c.Remove(1) + + c.mu.RLock() + _, ok := c.buffers[1] + c.mu.RUnlock() + + if ok { + t.Error("expected buffer to be removed") + } +} + +func TestCollector_Prune(t *testing.T) { + t.Parallel() + c := NewCollector(10) + c.Record(1, Snapshot{Time: time.Now()}) + c.Record(2, Snapshot{Time: time.Now()}) + c.Record(3, Snapshot{Time: time.Now()}) + + c.Prune(map[int64]bool{1: true, 3: true}) + + c.mu.RLock() + _, has2 := c.buffers[2] + c.mu.RUnlock() + + if has2 { + t.Error("expected instance 2 to be pruned") + } +} + +func TestCollector_GetInstanceMetrics_NoData_ReturnsNil(t *testing.T) { + t.Parallel() + c := NewCollector(10) + if got := c.GetInstanceMetrics(99); got != nil { + t.Error("expected nil for unknown instance") + } +} + +func TestCollector_GetInstanceMetrics_SingleSnapshot_NoRates(t *testing.T) { + t.Parallel() + c := NewCollector(10) + c.Record(1, makeSnapshot(time.Now(), 1000, 50, 10, 2)) + + im := c.GetInstanceMetrics(1) + if im == nil { + t.Fatal("expected non-nil InstanceMetrics") + } + if im.DataPoints != 1 { + t.Errorf("DataPoints = %d, want 1", im.DataPoints) + } + // With only 1 snapshot, rates should be zero. + if im.RPS != 0 { + t.Errorf("RPS = %v, want 0 (single snapshot)", im.RPS) + } + if im.ActiveConnections != 10 { + t.Errorf("ActiveConnections = %v, want 10", im.ActiveConnections) + } +} + +func TestCollector_GetInstanceMetrics_TwoSnapshots_ComputesRates(t *testing.T) { + t.Parallel() + c := NewCollector(10) + t0 := time.Date(2026, 3, 7, 12, 0, 0, 0, time.UTC) + t1 := t0.Add(10 * time.Second) + + c.Record(1, makeSnapshot(t0, 1000, 50, 10, 1)) + c.Record(1, makeSnapshot(t1, 1100, 55, 12, 2)) + + im := c.GetInstanceMetrics(1) + if im == nil { + t.Fatal("expected non-nil InstanceMetrics") + } + + // RPS: (1100 - 1000) / 10 = 10 + if math.Abs(im.RPS-10.0) > 0.01 { + t.Errorf("RPS = %v, want ~10", im.RPS) + } + + // Error rate: (55-50) / (1100-1000) = 5/100 = 0.05 + if math.Abs(im.ErrorRate-0.05) > 0.001 { + t.Errorf("ErrorRate = %v, want ~0.05", im.ErrorRate) + } + + if im.ActiveConnections != 12 { + t.Errorf("ActiveConnections = %v, want 12", im.ActiveConnections) + } + if im.PanicsTotal != 2 { + t.Errorf("PanicsTotal = %v, want 2", im.PanicsTotal) + } + + // Should have latency percentiles > 0 + if im.P50 <= 0 { + t.Errorf("P50 = %v, want > 0", im.P50) + } + if im.P99 <= 0 { + t.Errorf("P99 = %v, want > 0", im.P99) + } + + // Should have vendor metrics + if len(im.Vendors) != 2 { + t.Errorf("Vendors = %d, want 2", len(im.Vendors)) + } +} + +func TestCollector_GetInstanceMetrics_SeriesGenerated(t *testing.T) { + t.Parallel() + c := NewCollector(100) + t0 := time.Date(2026, 3, 7, 12, 0, 0, 0, time.UTC) + + for i := 0; i < 5; i++ { + c.Record(1, makeSnapshot( + t0.Add(time.Duration(i)*10*time.Second), + float64(1000+i*100), + float64(50+i*5), + 10, + float64(i), + )) + } + + im := c.GetInstanceMetrics(1) + if im == nil { + t.Fatal("expected non-nil InstanceMetrics") + } + + // 5 snapshots → 4 series points + if len(im.Series) != 4 { + t.Errorf("Series length = %d, want 4", len(im.Series)) + } + + // Should have vendor series for both vendors + if len(im.VendorSeries) != 2 { + t.Errorf("VendorSeries count = %d, want 2", len(im.VendorSeries)) + } + for vid, series := range im.VendorSeries { + if len(series) != 4 { + t.Errorf("VendorSeries[%s] length = %d, want 4", vid, len(series)) + } + } +} + +func TestCollector_GetInstanceMetrics_TrendWithEnoughData(t *testing.T) { + t.Parallel() + c := NewCollector(DefaultCapacity) + t0 := time.Date(2026, 3, 7, 11, 0, 0, 0, time.UTC) + + // Fill 1h of data at 10s intervals + for i := 0; i <= 360; i++ { + c.Record(1, makeSnapshot( + t0.Add(time.Duration(i)*10*time.Second), + float64(i*100), + float64(i*5), + 10, + 0, + )) + } + + im := c.GetInstanceMetrics(1) + if im == nil { + t.Fatal("expected non-nil InstanceMetrics") + } + if im.RPSTrend == nil { + t.Error("expected RPSTrend to be set with 1h of data") + } + if im.ErrorRateTrend == nil { + t.Error("expected ErrorRateTrend to be set with 1h of data") + } +} + +func TestCollector_GetInstanceMetrics_NoTrendWithInsufficientData(t *testing.T) { + t.Parallel() + c := NewCollector(100) + t0 := time.Date(2026, 3, 7, 12, 0, 0, 0, time.UTC) + + for i := 0; i < 5; i++ { + c.Record(1, makeSnapshot( + t0.Add(time.Duration(i)*10*time.Second), + float64(1000+i*100), + float64(50+i*5), + 10, + 0, + )) + } + + im := c.GetInstanceMetrics(1) + if im == nil { + t.Fatal("expected non-nil InstanceMetrics") + } + if im.RPSTrend != nil { + t.Error("expected nil RPSTrend with < 50min of data") + } +} + +func TestCollector_GetFleetMetrics_AggregatesAcrossInstances(t *testing.T) { + t.Parallel() + c := NewCollector(10) + t0 := time.Date(2026, 3, 7, 12, 0, 0, 0, time.UTC) + t1 := t0.Add(10 * time.Second) + + c.Record(1, makeSnapshot(t0, 1000, 50, 10, 1)) + c.Record(1, makeSnapshot(t1, 1100, 55, 12, 2)) + + c.Record(2, makeSnapshot(t0, 2000, 100, 20, 0)) + c.Record(2, makeSnapshot(t1, 2200, 110, 22, 1)) + + fm := c.GetFleetMetrics([]int64{1, 2}) + + // RPS: instance1=10, instance2=20 → 30 + if math.Abs(fm.TotalRPS-30.0) > 0.01 { + t.Errorf("TotalRPS = %v, want ~30", fm.TotalRPS) + } + if fm.TotalActiveConnections != 34 { + t.Errorf("TotalActiveConnections = %v, want 34", fm.TotalActiveConnections) + } + if fm.TotalPanics != 3 { + t.Errorf("TotalPanics = %v, want 3", fm.TotalPanics) + } + if len(fm.Instances) != 2 { + t.Errorf("Instances = %d, want 2", len(fm.Instances)) + } +} + +func TestCollector_GetFleetMetrics_SkipsMissingInstances(t *testing.T) { + t.Parallel() + c := NewCollector(10) + t0 := time.Date(2026, 3, 7, 12, 0, 0, 0, time.UTC) + t1 := t0.Add(10 * time.Second) + + c.Record(1, makeSnapshot(t0, 1000, 50, 10, 0)) + c.Record(1, makeSnapshot(t1, 1100, 55, 12, 0)) + + // Instance 99 has no data + fm := c.GetFleetMetrics([]int64{1, 99}) + + if len(fm.Instances) != 1 { + t.Errorf("Instances = %d, want 1 (skip missing)", len(fm.Instances)) + } +} + +func TestAddHistograms_MismatchedBoundaries_FallsBack(t *testing.T) { + t.Parallel() + a := Histogram{ + Count: 100, + Buckets: []Bucket{{UpperBound: 0.05, Count: 50}, {UpperBound: 0.1, Count: 100}}, + } + b := Histogram{ + Count: 200, + Buckets: []Bucket{{UpperBound: 0.01, Count: 100}, {UpperBound: 0.5, Count: 200}}, + } + result := addHistograms(a, b) + + // Should fall back to b (higher count) instead of merging mismatched buckets. + if result.Count != 200 { + t.Errorf("Count = %v, want 200 (fallback to b)", result.Count) + } + if result.Buckets[0].UpperBound != 0.01 { + t.Errorf("Bucket[0].UpperBound = %v, want 0.01 (b's boundaries)", result.Buckets[0].UpperBound) + } +} + +func TestGetFleetMetrics_CounterReset_DoesNotCorruptErrorRate(t *testing.T) { + t.Parallel() + c := NewCollector(10) + t0 := time.Date(2026, 3, 7, 12, 0, 0, 0, time.UTC) + t1 := t0.Add(10 * time.Second) + + // Instance 1: normal operation + c.Record(1, makeSnapshot(t0, 1000, 50, 10, 0)) + c.Record(1, makeSnapshot(t1, 1100, 55, 12, 0)) + + // Instance 2: counter reset (counters dropped from 2000 to 100) + c.Record(2, makeSnapshot(t0, 2000, 100, 20, 0)) + c.Record(2, makeSnapshot(t1, 100, 5, 22, 0)) + + fm := c.GetFleetMetrics([]int64{1, 2}) + + // Fleet error rate should only reflect instance 1 (instance 2 skipped due to reset). + // Instance 1: 5 errors / 100 requests = 0.05 + if fm.FleetErrorRate < 0 { + t.Errorf("FleetErrorRate = %v, want >= 0 (counter reset should not corrupt)", fm.FleetErrorRate) + } + if math.Abs(fm.FleetErrorRate-0.05) > 0.01 { + t.Errorf("FleetErrorRate = %v, want ~0.05 (only instance 1)", fm.FleetErrorRate) + } +} + +func TestGetFleetMetrics_ErrorRateTrend_Populated(t *testing.T) { + t.Parallel() + c := NewCollector(DefaultCapacity) + t0 := time.Date(2026, 3, 7, 11, 0, 0, 0, time.UTC) + + for i := 0; i <= 360; i++ { + c.Record(1, makeSnapshot( + t0.Add(time.Duration(i)*10*time.Second), + float64(i*100), + float64(i*5), + 10, + 0, + )) + } + + fm := c.GetFleetMetrics([]int64{1}) + if fm.ErrorRateTrend == nil { + t.Error("expected ErrorRateTrend to be set with 1h of data") + } +} + +func TestCollector_GetInstanceSummary_NoData_ReturnsNil(t *testing.T) { + t.Parallel() + c := NewCollector(10) + if got := c.GetInstanceSummary(1); got != nil { + t.Error("expected nil for unknown instance") + } +} + +func TestCollector_GetInstanceSummary_ComputesKPIs(t *testing.T) { + t.Parallel() + c := NewCollector(10) + t0 := time.Date(2026, 3, 7, 12, 0, 0, 0, time.UTC) + t1 := t0.Add(10 * time.Second) + + c.Record(1, makeSnapshot(t0, 1000, 50, 10, 1)) + c.Record(1, makeSnapshot(t1, 1100, 55, 12, 2)) + + s := c.GetInstanceSummary(1) + if s == nil { + t.Fatal("expected non-nil InstanceSummary") + } + if math.Abs(s.RPS-10.0) > 0.01 { + t.Errorf("RPS = %v, want ~10", s.RPS) + } + if s.ActiveConnections != 12 { + t.Errorf("ActiveConnections = %v, want 12", s.ActiveConnections) + } +} diff --git a/admin/metrics/compute.go b/admin/metrics/compute.go new file mode 100644 index 0000000..5f54164 --- /dev/null +++ b/admin/metrics/compute.go @@ -0,0 +1,95 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "math" + "time" +) + +// counterRate computes the per-second rate between two counter values. +// Returns 0 if the counter was reset (curr < prev) or the time delta is zero. +func counterRate(prev, curr float64, dt time.Duration) float64 { + if dt <= 0 || curr < prev { + return 0 + } + return (curr - prev) / dt.Seconds() +} + +// errorRate computes the fraction of errors in the interval. +// Returns 0 if the total delta is zero or a counter reset occurred. +func errorRate(prevTotal, currTotal, prevErrors, currErrors float64) float64 { + totalDelta := currTotal - prevTotal + if totalDelta <= 0 { + return 0 + } + errDelta := currErrors - prevErrors + if errDelta < 0 { + return 0 + } + rate := errDelta / totalDelta + return math.Min(rate, 1) +} + +// histogramQuantile computes the q-th quantile (0 ≤ q ≤ 1) from cumulative +// histogram buckets using linear interpolation — the standard Prometheus method. +// +// Buckets must be sorted by UpperBound with the +Inf bucket excluded. +func histogramQuantile(q float64, h Histogram) float64 { + if h.Count == 0 || len(h.Buckets) == 0 { + return 0 + } + + rank := q * h.Count + var prevCount, prevBound float64 + + for _, b := range h.Buckets { + if b.Count >= rank { + if b.Count == prevCount { + return prevBound + } + fraction := (rank - prevCount) / (b.Count - prevCount) + return prevBound + fraction*(b.UpperBound-prevBound) + } + prevCount = b.Count + prevBound = b.UpperBound + } + + // All observations above the highest finite bucket. + return h.Buckets[len(h.Buckets)-1].UpperBound +} + +// histogramDelta computes the per-interval histogram by subtracting prev from +// curr. On counter reset (curr.Count < prev.Count) it returns curr as-is. +func histogramDelta(prev, curr Histogram) Histogram { + if curr.Count < prev.Count { + return curr + } + + delta := Histogram{ + Count: curr.Count - prev.Count, + Sum: curr.Sum - prev.Sum, + } + + for i, b := range curr.Buckets { + bc := b.Count + if i < len(prev.Buckets) { + bc -= prev.Buckets[i].Count + if bc < 0 { + bc = 0 + } + } + delta.Buckets = append(delta.Buckets, Bucket{ + UpperBound: b.UpperBound, + Count: bc, + }) + } + + return delta +} + +// secondsToMs converts seconds to milliseconds, rounding to 2 decimal places. +func secondsToMs(s float64) float64 { + return math.Round(s*100000) / 100 +} diff --git a/admin/metrics/compute_test.go b/admin/metrics/compute_test.go new file mode 100644 index 0000000..f35b97e --- /dev/null +++ b/admin/metrics/compute_test.go @@ -0,0 +1,176 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "math" + "testing" + "time" +) + +func TestCounterRate_Normal(t *testing.T) { + t.Parallel() + got := counterRate(100, 200, 10*time.Second) + if got != 10.0 { + t.Errorf("counterRate(100, 200, 10s) = %v, want 10", got) + } +} + +func TestCounterRate_Reset_ReturnsZero(t *testing.T) { + t.Parallel() + got := counterRate(200, 50, 10*time.Second) + if got != 0 { + t.Errorf("counterRate(200, 50, 10s) = %v, want 0 (reset)", got) + } +} + +func TestCounterRate_ZeroDuration_ReturnsZero(t *testing.T) { + t.Parallel() + got := counterRate(100, 200, 0) + if got != 0 { + t.Errorf("counterRate(100, 200, 0) = %v, want 0", got) + } +} + +func TestErrorRate_Normal(t *testing.T) { + t.Parallel() + // 100 total requests, 10 errors → 10% + got := errorRate(900, 1000, 40, 50) + if math.Abs(got-0.1) > 0.001 { + t.Errorf("errorRate(900,1000,40,50) = %v, want ~0.1", got) + } +} + +func TestErrorRate_ZeroTotal_ReturnsZero(t *testing.T) { + t.Parallel() + got := errorRate(100, 100, 5, 5) + if got != 0 { + t.Errorf("errorRate with zero total delta = %v, want 0", got) + } +} + +func TestErrorRate_Reset_ReturnsZero(t *testing.T) { + t.Parallel() + got := errorRate(100, 50, 10, 5) + if got != 0 { + t.Errorf("errorRate with counter reset = %v, want 0", got) + } +} + +func TestErrorRate_CappedAtOne(t *testing.T) { + t.Parallel() + // Pathological case: error delta > total delta + got := errorRate(0, 10, 0, 20) + if got != 1 { + t.Errorf("errorRate capped = %v, want 1", got) + } +} + +func TestHistogramQuantile_P50(t *testing.T) { + t.Parallel() + h := Histogram{ + Count: 1000, + Buckets: []Bucket{ + {UpperBound: 0.05, Count: 200}, + {UpperBound: 0.1, Count: 500}, + {UpperBound: 0.25, Count: 800}, + {UpperBound: 0.5, Count: 950}, + {UpperBound: 1.0, Count: 1000}, + }, + } + // rank = 0.5 * 1000 = 500 + // Falls in bucket [0.05, 0.1] at count 500 exactly → boundary + got := histogramQuantile(0.50, h) + if got != 0.1 { + t.Errorf("p50 = %v, want 0.1", got) + } +} + +func TestHistogramQuantile_P99(t *testing.T) { + t.Parallel() + h := Histogram{ + Count: 1000, + Buckets: []Bucket{ + {UpperBound: 0.05, Count: 200}, + {UpperBound: 0.1, Count: 500}, + {UpperBound: 0.25, Count: 800}, + {UpperBound: 0.5, Count: 950}, + {UpperBound: 1.0, Count: 1000}, + }, + } + // rank = 0.99 * 1000 = 990 + // Falls in bucket [0.5, 1.0]: prevCount=950, count=1000 + // fraction = (990 - 950) / (1000 - 950) = 40/50 = 0.8 + // result = 0.5 + 0.8 * (1.0 - 0.5) = 0.5 + 0.4 = 0.9 + got := histogramQuantile(0.99, h) + if math.Abs(got-0.9) > 0.001 { + t.Errorf("p99 = %v, want ~0.9", got) + } +} + +func TestHistogramQuantile_Empty_ReturnsZero(t *testing.T) { + t.Parallel() + got := histogramQuantile(0.5, Histogram{}) + if got != 0 { + t.Errorf("quantile of empty histogram = %v, want 0", got) + } +} + +func TestHistogramQuantile_AllAboveHighestBucket(t *testing.T) { + t.Parallel() + h := Histogram{ + Count: 100, + Buckets: []Bucket{{UpperBound: 0.1, Count: 0}}, + } + got := histogramQuantile(0.5, h) + // All 100 observations are above the only bucket → return bucket upper bound + if got != 0.1 { + t.Errorf("quantile above all buckets = %v, want 0.1", got) + } +} + +func TestHistogramDelta_Normal(t *testing.T) { + t.Parallel() + prev := Histogram{ + Count: 100, Sum: 10, + Buckets: []Bucket{{UpperBound: 0.1, Count: 50}, {UpperBound: 0.5, Count: 100}}, + } + curr := Histogram{ + Count: 200, Sum: 25, + Buckets: []Bucket{{UpperBound: 0.1, Count: 80}, {UpperBound: 0.5, Count: 200}}, + } + + d := histogramDelta(prev, curr) + if d.Count != 100 { + t.Errorf("delta Count = %v, want 100", d.Count) + } + if d.Sum != 15 { + t.Errorf("delta Sum = %v, want 15", d.Sum) + } + if d.Buckets[0].Count != 30 { + t.Errorf("delta bucket[0] Count = %v, want 30", d.Buckets[0].Count) + } + if d.Buckets[1].Count != 100 { + t.Errorf("delta bucket[1] Count = %v, want 100", d.Buckets[1].Count) + } +} + +func TestHistogramDelta_Reset_ReturnsCurr(t *testing.T) { + t.Parallel() + prev := Histogram{Count: 200, Buckets: []Bucket{{UpperBound: 0.1, Count: 200}}} + curr := Histogram{Count: 50, Buckets: []Bucket{{UpperBound: 0.1, Count: 50}}} + + d := histogramDelta(prev, curr) + if d.Count != 50 { + t.Errorf("delta after reset Count = %v, want 50 (curr)", d.Count) + } +} + +func TestSecondsToMs(t *testing.T) { + t.Parallel() + got := secondsToMs(0.123) + if math.Abs(got-123.0) > 0.01 { + t.Errorf("secondsToMs(0.123) = %v, want ~123", got) + } +} diff --git a/admin/metrics/metrics.go b/admin/metrics/metrics.go new file mode 100644 index 0000000..5c0e5e2 --- /dev/null +++ b/admin/metrics/metrics.go @@ -0,0 +1,154 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +// Package metrics parses Prometheus metrics from Chaperone proxy instances, +// stores them in ring buffers, and computes rates and percentiles. +package metrics + +import "time" + +// DefaultCapacity is the number of scrape snapshots retained per instance. +// At 10s intervals this gives ~1 hour of history. +const DefaultCapacity = 360 + +// Prometheus metric names emitted by the Chaperone proxy. +const ( + metricRequestsTotal = "chaperone_requests_total" + metricDurationSeconds = "chaperone_request_duration_seconds" + metricActiveConns = "chaperone_active_connections" + metricPanicsTotal = "chaperone_panics_total" + + labelVendorID = "vendor_id" + labelStatusClass = "status_class" +) + +// Snapshot holds parsed metrics from a single scrape of one proxy instance. +type Snapshot struct { + Time time.Time + Vendors map[string]*VendorSnapshot + ActiveConnections float64 + PanicsTotal float64 +} + +// VendorSnapshot holds per-vendor counters and histogram for a single scrape. +type VendorSnapshot struct { + RequestsTotal float64 + RequestsErrors float64 // 4xx + 5xx + Duration Histogram +} + +// Histogram holds cumulative histogram bucket data. +type Histogram struct { + Buckets []Bucket + Count float64 + Sum float64 +} + +// Bucket is a single cumulative histogram bucket. +type Bucket struct { + UpperBound float64 + Count float64 // cumulative count of observations <= UpperBound +} + +// vendorOrCreate returns the VendorSnapshot for the given vendor, creating it if needed. +func (s *Snapshot) vendorOrCreate(id string) *VendorSnapshot { + if id == "" { + id = "unknown" + } + vs, ok := s.Vendors[id] + if !ok { + vs = &VendorSnapshot{} + s.Vendors[id] = vs + } + return vs +} + +// totalRequests returns the sum of RequestsTotal across all vendors. +func (s *Snapshot) totalRequests() float64 { + var total float64 + for _, vs := range s.Vendors { + total += vs.RequestsTotal + } + return total +} + +// totalErrors returns the sum of RequestsErrors across all vendors. +func (s *Snapshot) totalErrors() float64 { + var total float64 + for _, vs := range s.Vendors { + total += vs.RequestsErrors + } + return total +} + +// --- API response types --- + +// InstanceMetrics is returned by GET /api/metrics/{id}. +type InstanceMetrics struct { + InstanceID int64 `json:"instance_id"` + CollectedAt time.Time `json:"collected_at"` + DataPoints int `json:"data_points"` + RPS float64 `json:"rps"` + ErrorRate float64 `json:"error_rate"` + ActiveConnections float64 `json:"active_connections"` + PanicsTotal float64 `json:"panics_total"` + P50 float64 `json:"p50_ms"` + P95 float64 `json:"p95_ms"` + P99 float64 `json:"p99_ms"` + RPSTrend *float64 `json:"rps_trend"` + ErrorRateTrend *float64 `json:"error_rate_trend"` + Vendors []VendorMetrics `json:"vendors"` + Series []SeriesPoint `json:"series"` + VendorSeries map[string][]VendorSeriesPoint `json:"vendor_series"` +} + +// FleetMetrics is returned by GET /api/metrics/fleet. +type FleetMetrics struct { + CollectedAt time.Time `json:"collected_at"` + TotalRPS float64 `json:"total_rps"` + FleetErrorRate float64 `json:"fleet_error_rate"` + TotalActiveConnections float64 `json:"total_active_connections"` + TotalPanics float64 `json:"total_panics"` + RPSTrend *float64 `json:"rps_trend"` + ErrorRateTrend *float64 `json:"error_rate_trend"` + Instances []InstanceSummary `json:"instances"` +} + +// InstanceSummary is a compact per-instance overview for the fleet endpoint. +type InstanceSummary struct { + InstanceID int64 `json:"instance_id"` + RPS float64 `json:"rps"` + ErrorRate float64 `json:"error_rate"` + ActiveConnections float64 `json:"active_connections"` + PanicsTotal float64 `json:"panics_total"` + P99 float64 `json:"p99_ms"` +} + +// VendorMetrics holds current per-vendor KPIs. +type VendorMetrics struct { + VendorID string `json:"vendor_id"` + RPS float64 `json:"rps"` + ErrorRate float64 `json:"error_rate"` + P50 float64 `json:"p50_ms"` + P95 float64 `json:"p95_ms"` + P99 float64 `json:"p99_ms"` +} + +// SeriesPoint is one data point in a global time series. +type SeriesPoint struct { + Time int64 `json:"t"` + RPS float64 `json:"rps"` + ErrorRate float64 `json:"err"` + P99 float64 `json:"p99"` + ActiveConnections float64 `json:"conn"` +} + +// VendorSeriesPoint is one data point in a per-vendor time series. +type VendorSeriesPoint struct { + Time int64 `json:"t"` + RPS float64 `json:"rps"` + ErrorRate float64 `json:"err"` + P50 float64 `json:"p50"` + P95 float64 `json:"p95"` + P99 float64 `json:"p99"` +} diff --git a/admin/metrics/parse.go b/admin/metrics/parse.go new file mode 100644 index 0000000..7ed4160 --- /dev/null +++ b/admin/metrics/parse.go @@ -0,0 +1,117 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "bytes" + "fmt" + "math" + "sort" + "time" + + dto "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" + "github.com/prometheus/common/model" +) + +// Parse parses Prometheus text exposition format data into a Snapshot. +// Only Chaperone-specific metrics are extracted; unknown metrics are ignored. +func Parse(data []byte, t time.Time) (*Snapshot, error) { + parser := expfmt.NewTextParser(model.LegacyValidation) + families, err := parser.TextToMetricFamilies(bytes.NewReader(data)) + if err != nil { + return nil, fmt.Errorf("parsing prometheus text: %w", err) + } + + snap := &Snapshot{ + Time: t, + Vendors: make(map[string]*VendorSnapshot), + } + + for name, family := range families { + switch name { + case metricRequestsTotal: + parseRequests(family, snap) + case metricDurationSeconds: + parseDuration(family, snap) + case metricActiveConns: + parseGauge(family, &snap.ActiveConnections) + case metricPanicsTotal: + parseCounter(family, &snap.PanicsTotal) + } + } + + return snap, nil +} + +// parseRequests extracts chaperone_requests_total counters, summing across +// methods but preserving vendor_id and status_class dimensions. +func parseRequests(family *dto.MetricFamily, snap *Snapshot) { + for _, m := range family.GetMetric() { + vendorID := labelValue(m.GetLabel(), labelVendorID) + statusClass := labelValue(m.GetLabel(), labelStatusClass) + value := m.GetCounter().GetValue() + + vs := snap.vendorOrCreate(vendorID) + vs.RequestsTotal += value + if statusClass == "4xx" || statusClass == "5xx" { + vs.RequestsErrors += value + } + } +} + +// parseDuration extracts chaperone_request_duration_seconds histograms per vendor. +func parseDuration(family *dto.MetricFamily, snap *Snapshot) { + for _, m := range family.GetMetric() { + vendorID := labelValue(m.GetLabel(), labelVendorID) + h := m.GetHistogram() + if h == nil { + continue + } + + hist := Histogram{ + Count: float64(h.GetSampleCount()), + Sum: h.GetSampleSum(), + } + for _, b := range h.GetBucket() { + hist.Buckets = append(hist.Buckets, Bucket{ + UpperBound: b.GetUpperBound(), + Count: float64(b.GetCumulativeCount()), + }) + } + sort.Slice(hist.Buckets, func(i, j int) bool { + return hist.Buckets[i].UpperBound < hist.Buckets[j].UpperBound + }) + // Strip the +Inf bucket — we use Count for that. + if n := len(hist.Buckets); n > 0 && math.IsInf(hist.Buckets[n-1].UpperBound, 1) { + hist.Buckets = hist.Buckets[:n-1] + } + + snap.vendorOrCreate(vendorID).Duration = hist + } +} + +// parseGauge extracts a single gauge value (first metric in the family). +func parseGauge(family *dto.MetricFamily, dst *float64) { + if ms := family.GetMetric(); len(ms) > 0 { + *dst = ms[0].GetGauge().GetValue() + } +} + +// parseCounter extracts a single counter value (first metric in the family). +func parseCounter(family *dto.MetricFamily, dst *float64) { + if ms := family.GetMetric(); len(ms) > 0 { + *dst = ms[0].GetCounter().GetValue() + } +} + +// labelValue returns the value for the named label, or "" if not found. +func labelValue(labels []*dto.LabelPair, name string) string { + for _, l := range labels { + if l.GetName() == name { + return l.GetValue() + } + } + return "" +} diff --git a/admin/metrics/parse_test.go b/admin/metrics/parse_test.go new file mode 100644 index 0000000..8487d52 --- /dev/null +++ b/admin/metrics/parse_test.go @@ -0,0 +1,174 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "testing" + "time" +) + +const sampleMetrics = `# HELP chaperone_requests_total Total number of requests processed +# TYPE chaperone_requests_total counter +chaperone_requests_total{vendor_id="acme",status_class="2xx",method="GET"} 1500 +chaperone_requests_total{vendor_id="acme",status_class="2xx",method="POST"} 300 +chaperone_requests_total{vendor_id="acme",status_class="4xx",method="GET"} 50 +chaperone_requests_total{vendor_id="acme",status_class="5xx",method="POST"} 5 +chaperone_requests_total{vendor_id="beta",status_class="2xx",method="GET"} 800 +chaperone_requests_total{vendor_id="beta",status_class="4xx",method="GET"} 20 +# HELP chaperone_panics_total Total number of recovered panics +# TYPE chaperone_panics_total counter +chaperone_panics_total 2 +# HELP chaperone_request_duration_seconds Total request duration +# TYPE chaperone_request_duration_seconds histogram +chaperone_request_duration_seconds_bucket{vendor_id="acme",le="0.05"} 600 +chaperone_request_duration_seconds_bucket{vendor_id="acme",le="0.1"} 1000 +chaperone_request_duration_seconds_bucket{vendor_id="acme",le="0.25"} 1500 +chaperone_request_duration_seconds_bucket{vendor_id="acme",le="0.5"} 1700 +chaperone_request_duration_seconds_bucket{vendor_id="acme",le="1"} 1800 +chaperone_request_duration_seconds_bucket{vendor_id="acme",le="+Inf"} 1855 +chaperone_request_duration_seconds_sum{vendor_id="acme"} 250.5 +chaperone_request_duration_seconds_count{vendor_id="acme"} 1855 +chaperone_request_duration_seconds_bucket{vendor_id="beta",le="0.05"} 400 +chaperone_request_duration_seconds_bucket{vendor_id="beta",le="0.1"} 700 +chaperone_request_duration_seconds_bucket{vendor_id="beta",le="0.25"} 780 +chaperone_request_duration_seconds_bucket{vendor_id="beta",le="0.5"} 800 +chaperone_request_duration_seconds_bucket{vendor_id="beta",le="1"} 810 +chaperone_request_duration_seconds_bucket{vendor_id="beta",le="+Inf"} 820 +chaperone_request_duration_seconds_sum{vendor_id="beta"} 80.0 +chaperone_request_duration_seconds_count{vendor_id="beta"} 820 +# HELP chaperone_active_connections Number of active connections +# TYPE chaperone_active_connections gauge +chaperone_active_connections 15 +` + +func TestParse_FullSample_ExtractsAllMetrics(t *testing.T) { + t.Parallel() + now := time.Now() + + snap, err := Parse([]byte(sampleMetrics), now) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + if snap.Time != now { + t.Errorf("Time = %v, want %v", snap.Time, now) + } + if snap.ActiveConnections != 15 { + t.Errorf("ActiveConnections = %v, want 15", snap.ActiveConnections) + } + if snap.PanicsTotal != 2 { + t.Errorf("PanicsTotal = %v, want 2", snap.PanicsTotal) + } + if len(snap.Vendors) != 2 { + t.Fatalf("Vendors count = %d, want 2", len(snap.Vendors)) + } +} + +func TestParse_Requests_SumsAcrossMethods(t *testing.T) { + t.Parallel() + + snap, err := Parse([]byte(sampleMetrics), time.Now()) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + acme := snap.Vendors["acme"] + if acme == nil { + t.Fatal("missing vendor 'acme'") + } + // GET 1500 + POST 300 (2xx) + GET 50 (4xx) + POST 5 (5xx) = 1855 + if acme.RequestsTotal != 1855 { + t.Errorf("acme.RequestsTotal = %v, want 1855", acme.RequestsTotal) + } + // 4xx: 50 + 5xx: 5 = 55 + if acme.RequestsErrors != 55 { + t.Errorf("acme.RequestsErrors = %v, want 55", acme.RequestsErrors) + } + + beta := snap.Vendors["beta"] + if beta == nil { + t.Fatal("missing vendor 'beta'") + } + // GET 800 (2xx) + GET 20 (4xx) = 820 + if beta.RequestsTotal != 820 { + t.Errorf("beta.RequestsTotal = %v, want 820", beta.RequestsTotal) + } + if beta.RequestsErrors != 20 { + t.Errorf("beta.RequestsErrors = %v, want 20", beta.RequestsErrors) + } +} + +func TestParse_Histogram_ParsesBucketsCorrectly(t *testing.T) { + t.Parallel() + + snap, err := Parse([]byte(sampleMetrics), time.Now()) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + + acme := snap.Vendors["acme"] + h := acme.Duration + if h.Count != 1855 { + t.Errorf("acme histogram Count = %v, want 1855", h.Count) + } + if h.Sum != 250.5 { + t.Errorf("acme histogram Sum = %v, want 250.5", h.Sum) + } + // +Inf bucket should be stripped + if len(h.Buckets) != 5 { + t.Fatalf("acme histogram Buckets = %d, want 5", len(h.Buckets)) + } + if h.Buckets[0].UpperBound != 0.05 { + t.Errorf("first bucket UpperBound = %v, want 0.05", h.Buckets[0].UpperBound) + } + if h.Buckets[0].Count != 600 { + t.Errorf("first bucket Count = %v, want 600", h.Buckets[0].Count) + } +} + +func TestParse_EmptyInput_ReturnsEmptySnapshot(t *testing.T) { + t.Parallel() + + snap, err := Parse([]byte(""), time.Now()) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if len(snap.Vendors) != 0 { + t.Errorf("Vendors count = %d, want 0", len(snap.Vendors)) + } + if snap.ActiveConnections != 0 { + t.Errorf("ActiveConnections = %v, want 0", snap.ActiveConnections) + } +} + +func TestParse_UnknownMetrics_Ignored(t *testing.T) { + t.Parallel() + input := `# HELP custom_metric A custom metric +# TYPE custom_metric gauge +custom_metric 42 +# HELP chaperone_active_connections Number of active connections +# TYPE chaperone_active_connections gauge +chaperone_active_connections 7 +` + snap, err := Parse([]byte(input), time.Now()) + if err != nil { + t.Fatalf("Parse() error = %v", err) + } + if snap.ActiveConnections != 7 { + t.Errorf("ActiveConnections = %v, want 7", snap.ActiveConnections) + } +} + +func TestParse_MalformedInput_ReturnsError(t *testing.T) { + t.Parallel() + // Invalid Prometheus format: duplicate TYPE declaration + input := `# TYPE chaperone_active_connections gauge +# TYPE chaperone_active_connections counter +chaperone_active_connections 7 +` + _, err := Parse([]byte(input), time.Now()) + if err == nil { + t.Error("expected error for malformed input, got nil") + } +} diff --git a/admin/metrics/ring.go b/admin/metrics/ring.go new file mode 100644 index 0000000..7e1e468 --- /dev/null +++ b/admin/metrics/ring.go @@ -0,0 +1,48 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +// Ring is a fixed-capacity circular buffer of Snapshots. +// It is not safe for concurrent use; the Collector handles synchronization. +type Ring struct { + data []Snapshot + write int // next write position + count int // number of stored snapshots +} + +// NewRing creates a Ring with the given capacity. +func NewRing(capacity int) *Ring { + return &Ring{data: make([]Snapshot, capacity)} +} + +// Push adds a snapshot, overwriting the oldest if at capacity. +func (r *Ring) Push(s Snapshot) { + r.data[r.write] = s + r.write = (r.write + 1) % len(r.data) + if r.count < len(r.data) { + r.count++ + } +} + +// Len returns the number of stored snapshots. +func (r *Ring) Len() int { return r.count } + +// At returns the i-th snapshot where 0 is the oldest. +// Panics if i is out of range. +func (r *Ring) At(i int) Snapshot { + if i < 0 || i >= r.count { + panic("ring: index out of range") + } + start := (r.write - r.count + len(r.data)) % len(r.data) + return r.data[(start+i)%len(r.data)] +} + +// Last returns the most recent snapshot and true, or a zero Snapshot and false +// if the ring is empty. +func (r *Ring) Last() (Snapshot, bool) { + if r.count == 0 { + return Snapshot{}, false + } + return r.At(r.count - 1), true +} diff --git a/admin/metrics/ring_test.go b/admin/metrics/ring_test.go new file mode 100644 index 0000000..a24848f --- /dev/null +++ b/admin/metrics/ring_test.go @@ -0,0 +1,95 @@ +// Copyright 2026 CloudBlue LLC +// SPDX-License-Identifier: Apache-2.0 + +package metrics + +import ( + "testing" + "time" +) + +func TestRing_PushAndLen(t *testing.T) { + t.Parallel() + r := NewRing(5) + + if r.Len() != 0 { + t.Fatalf("Len() = %d, want 0", r.Len()) + } + + for i := 0; i < 3; i++ { + r.Push(Snapshot{Time: time.Unix(int64(i), 0)}) + } + if r.Len() != 3 { + t.Fatalf("Len() = %d, want 3", r.Len()) + } +} + +func TestRing_At_ReturnsOldestFirst(t *testing.T) { + t.Parallel() + r := NewRing(5) + + for i := 0; i < 3; i++ { + r.Push(Snapshot{Time: time.Unix(int64(i+10), 0)}) + } + + if got := r.At(0).Time.Unix(); got != 10 { + t.Errorf("At(0).Time = %d, want 10", got) + } + if got := r.At(2).Time.Unix(); got != 12 { + t.Errorf("At(2).Time = %d, want 12", got) + } +} + +func TestRing_Wraparound_OverwritesOldest(t *testing.T) { + t.Parallel() + r := NewRing(3) + + for i := 0; i < 5; i++ { + r.Push(Snapshot{Time: time.Unix(int64(i), 0)}) + } + + if r.Len() != 3 { + t.Fatalf("Len() = %d, want 3", r.Len()) + } + // After pushing 0,1,2,3,4 into capacity 3, oldest should be 2. + if got := r.At(0).Time.Unix(); got != 2 { + t.Errorf("At(0).Time = %d, want 2 (oldest after wrap)", got) + } + if got := r.At(2).Time.Unix(); got != 4 { + t.Errorf("At(2).Time = %d, want 4 (newest)", got) + } +} + +func TestRing_Last_ReturnsNewest(t *testing.T) { + t.Parallel() + r := NewRing(5) + + _, ok := r.Last() + if ok { + t.Error("Last() on empty ring should return false") + } + + r.Push(Snapshot{Time: time.Unix(100, 0)}) + r.Push(Snapshot{Time: time.Unix(200, 0)}) + + last, ok := r.Last() + if !ok { + t.Fatal("Last() returned false on non-empty ring") + } + if last.Time.Unix() != 200 { + t.Errorf("Last().Time = %d, want 200", last.Time.Unix()) + } +} + +func TestRing_At_PanicsOnOutOfRange(t *testing.T) { + t.Parallel() + r := NewRing(3) + r.Push(Snapshot{}) + + defer func() { + if recover() == nil { + t.Error("expected panic for out of range index") + } + }() + r.At(1) // only 1 element, index 1 is out of range +} diff --git a/admin/poller/poller.go b/admin/poller/poller.go index efcbbd9..68d6e64 100644 --- a/admin/poller/poller.go +++ b/admin/poller/poller.go @@ -8,6 +8,7 @@ import ( "encoding/json" "errors" "fmt" + "io" "log/slog" "math/rand/v2" "net" @@ -15,6 +16,7 @@ import ( "sync" "time" + "github.com/cloudblue/chaperone/admin/metrics" "github.com/cloudblue/chaperone/admin/store" ) @@ -48,23 +50,26 @@ func Probe(ctx context.Context, client *http.Client, address string) ProbeResult // Poller periodically polls registered proxy instances for health and version. type Poller struct { - store *store.Store - client *http.Client - interval time.Duration - timeout time.Duration + store *store.Store + collector *metrics.Collector + client *http.Client + interval time.Duration + timeout time.Duration mu sync.Mutex failures map[int64]int // instance ID → consecutive failure count } // New creates a Poller with the given configuration. -func New(st *store.Store, interval, timeout time.Duration) *Poller { +// If collector is non-nil, each successful poll also scrapes /metrics. +func New(st *store.Store, collector *metrics.Collector, interval, timeout time.Duration) *Poller { return &Poller{ - store: st, - client: &http.Client{Timeout: timeout}, - interval: interval, - timeout: timeout, - failures: make(map[int64]int), + store: st, + collector: collector, + client: &http.Client{Timeout: timeout}, + interval: interval, + timeout: timeout, + failures: make(map[int64]int), } } @@ -95,16 +100,18 @@ func (p *Poller) pollAll(ctx context.Context) { slog.Error("poller: listing instances", "error", err) return } - // Prune failure counts for instances no longer in the registry. + // Prune failure counts and stale metric buffers. p.pruneFailures(instances) + p.pruneCollector(instances) if len(instances) == 0 { return } type result struct { - id int64 - probe ProbeResult + id int64 + probe ProbeResult + metrics []byte // raw /metrics text, nil if unavailable } results := make(chan result, len(instances)) @@ -120,7 +127,11 @@ func (p *Poller) pollAll(ctx context.Context) { sleep(ctx, jitter) pr := Probe(ctx, p.client, inst.Address) - results <- result{id: inst.ID, probe: pr} + var raw []byte + if pr.OK && p.collector != nil { + raw = fetchMetrics(ctx, p.client, inst.Address) + } + results <- result{id: inst.ID, probe: pr, metrics: raw} }() } @@ -129,8 +140,14 @@ func (p *Poller) pollAll(ctx context.Context) { close(results) }() + now := time.Now() for r := range results { p.applyResult(ctx, r.id, r.probe) + if r.metrics != nil { + if err := p.collector.RecordScrape(r.id, r.metrics, now); err != nil { + slog.Warn("poller: parsing metrics", "id", r.id, "error", err) + } + } } } @@ -229,6 +246,42 @@ func fetchVersion(ctx context.Context, client *http.Client, address string) (str return body.Version, nil } +func (p *Poller) pruneCollector(active []store.Instance) { + if p.collector == nil { + return + } + ids := make(map[int64]bool, len(active)) + for i := range active { + ids[active[i].ID] = true + } + p.collector.Prune(ids) +} + +// fetchMetrics calls GET /metrics on a proxy admin port and returns the raw body. +func fetchMetrics(ctx context.Context, client *http.Client, address string) []byte { + url := fmt.Sprintf("http://%s/metrics", address) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, http.NoBody) + if err != nil { + return nil + } + + resp, err := client.Do(req) // #nosec G704 -- address comes from admin-managed instance registry + if err != nil { + return nil + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil + } + + data, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // 1 MB limit + if err != nil { + return nil + } + return data +} + // friendlyError converts network errors into user-facing messages. func friendlyError(err error) string { var netErr net.Error diff --git a/admin/poller/poller_test.go b/admin/poller/poller_test.go index e81947a..84ba627 100644 --- a/admin/poller/poller_test.go +++ b/admin/poller/poller_test.go @@ -12,6 +12,7 @@ import ( "testing" "time" + "github.com/cloudblue/chaperone/admin/metrics" "github.com/cloudblue/chaperone/admin/store" ) @@ -43,6 +44,35 @@ func fakeProxy(t *testing.T) *httptest.Server { return srv } +const sampleMetrics = `# HELP chaperone_requests_total Total number of requests processed +# TYPE chaperone_requests_total counter +chaperone_requests_total{vendor_id="acme",status_class="2xx",method="GET"} 1000 +# HELP chaperone_active_connections Number of active connections +# TYPE chaperone_active_connections gauge +chaperone_active_connections 5 +` + +func fakeProxyWithMetrics(t *testing.T) *httptest.Server { + t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/_ops/health": + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"status":"alive"}`)) + case "/_ops/version": + w.Header().Set("Content-Type", "application/json") + w.Write([]byte(`{"version":"1.0.0"}`)) + case "/metrics": + w.Header().Set("Content-Type", "text/plain") + w.Write([]byte(sampleMetrics)) + default: + http.NotFound(w, r) + } + })) + t.Cleanup(srv.Close) + return srv +} + func TestProbe_HealthyProxy_ReturnsOK(t *testing.T) { t.Parallel() proxy := fakeProxy(t) @@ -102,7 +132,7 @@ func TestPoller_SinglePoll_SetsHealthy(t *testing.T) { t.Fatalf("CreateInstance() error = %v", err) } - p := New(st, 1*time.Hour, 2*time.Second) // long interval; we call pollAll manually. + p := New(st, nil, 1*time.Hour, 2*time.Second) // long interval; we call pollAll manually. p.pollAll(ctx) got, err := st.GetInstance(ctx, inst.ID) @@ -127,7 +157,7 @@ func TestPoller_ThreeFailures_SetsUnreachable(t *testing.T) { t.Fatalf("CreateInstance() error = %v", err) } - p := New(st, 1*time.Hour, 500*time.Millisecond) + p := New(st, nil, 1*time.Hour, 500*time.Millisecond) // Poll 3 times to reach unreachable threshold. for i := 0; i < failuresUntilUnreachable; i++ { @@ -153,7 +183,7 @@ func TestPoller_TwoFailures_StaysUnknown(t *testing.T) { t.Fatalf("CreateInstance() error = %v", err) } - p := New(st, 1*time.Hour, 500*time.Millisecond) + p := New(st, nil, 1*time.Hour, 500*time.Millisecond) // Poll only twice — should not yet transition to unreachable. for i := 0; i < failuresUntilUnreachable-1; i++ { @@ -181,7 +211,7 @@ func TestPoller_RecoveryAfterUnreachable_SetsHealthy(t *testing.T) { t.Fatalf("CreateInstance() error = %v", err) } - p := New(st, 1*time.Hour, 500*time.Millisecond) + p := New(st, nil, 1*time.Hour, 500*time.Millisecond) // Drive to unreachable. for i := 0; i < failuresUntilUnreachable; i++ { @@ -216,7 +246,7 @@ func TestPoller_DeletedInstance_PrunesFailures(t *testing.T) { t.Fatalf("CreateInstance() error = %v", err) } - p := New(st, 1*time.Hour, 500*time.Millisecond) + p := New(st, nil, 1*time.Hour, 500*time.Millisecond) // Accumulate failures. p.pollAll(ctx) @@ -246,7 +276,7 @@ func TestPoller_RunStopsOnContextCancel(t *testing.T) { t.Parallel() st := openTestStore(t) - p := New(st, 50*time.Millisecond, 500*time.Millisecond) + p := New(st, nil, 50*time.Millisecond, 500*time.Millisecond) ctx, cancel := context.WithCancel(context.Background()) done := make(chan struct{}) @@ -264,3 +294,84 @@ func TestPoller_RunStopsOnContextCancel(t *testing.T) { t.Fatal("Run did not stop after context cancellation") } } + +func TestPoller_MetricsScraping_RecordsToCollector(t *testing.T) { + t.Parallel() + st := openTestStore(t) + c := metrics.NewCollector(10) + proxy := fakeProxyWithMetrics(t) + addr := strings.TrimPrefix(proxy.URL, "http://") + + ctx := context.Background() + inst, err := st.CreateInstance(ctx, "test-proxy", addr) + if err != nil { + t.Fatalf("CreateInstance() error = %v", err) + } + + p := New(st, c, 1*time.Hour, 2*time.Second) + p.pollAll(ctx) + + // Verify the collector received a snapshot. + im := c.GetInstanceMetrics(inst.ID) + if im == nil { + t.Fatal("expected metrics to be recorded after poll") + } + if im.DataPoints != 1 { + t.Errorf("DataPoints = %d, want 1", im.DataPoints) + } + if im.ActiveConnections != 5 { + t.Errorf("ActiveConnections = %v, want 5", im.ActiveConnections) + } +} + +func TestPoller_MetricsScraping_SkippedOnHealthFailure(t *testing.T) { + t.Parallel() + st := openTestStore(t) + c := metrics.NewCollector(10) + + ctx := context.Background() + inst, err := st.CreateInstance(ctx, "test-proxy", "127.0.0.1:1") + if err != nil { + t.Fatalf("CreateInstance() error = %v", err) + } + + p := New(st, c, 1*time.Hour, 500*time.Millisecond) + p.pollAll(ctx) + + // Collector should have no data — probe failed, so /metrics was not fetched. + if im := c.GetInstanceMetrics(inst.ID); im != nil { + t.Error("expected no metrics for unreachable instance") + } +} + +func TestPoller_DeletedInstance_PrunesCollector(t *testing.T) { + t.Parallel() + st := openTestStore(t) + c := metrics.NewCollector(10) + proxy := fakeProxyWithMetrics(t) + addr := strings.TrimPrefix(proxy.URL, "http://") + + ctx := context.Background() + inst, err := st.CreateInstance(ctx, "test-proxy", addr) + if err != nil { + t.Fatalf("CreateInstance() error = %v", err) + } + + p := New(st, c, 1*time.Hour, 2*time.Second) + p.pollAll(ctx) + + // Verify data exists. + if im := c.GetInstanceMetrics(inst.ID); im == nil { + t.Fatal("expected metrics after poll") + } + + // Delete instance and poll again — collector should be pruned. + if err := st.DeleteInstance(ctx, inst.ID); err != nil { + t.Fatalf("DeleteInstance() error = %v", err) + } + p.pollAll(ctx) + + if im := c.GetInstanceMetrics(inst.ID); im != nil { + t.Error("expected metrics to be pruned after instance deletion") + } +} diff --git a/admin/server.go b/admin/server.go index f2cdbeb..e730e7c 100644 --- a/admin/server.go +++ b/admin/server.go @@ -15,6 +15,7 @@ import ( "github.com/cloudblue/chaperone/admin/api" "github.com/cloudblue/chaperone/admin/config" + "github.com/cloudblue/chaperone/admin/metrics" "github.com/cloudblue/chaperone/admin/store" ) @@ -23,10 +24,11 @@ type Server struct { httpServer *http.Server config *config.Config store *store.Store + collector *metrics.Collector } // NewServer creates a new admin portal server. -func NewServer(cfg *config.Config, st *store.Store) (*Server, error) { +func NewServer(cfg *config.Config, st *store.Store, collector *metrics.Collector) (*Server, error) { mux := http.NewServeMux() s := &Server{ @@ -38,8 +40,9 @@ func NewServer(cfg *config.Config, st *store.Store) (*Server, error) { WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, }, - config: cfg, - store: st, + config: cfg, + store: st, + collector: collector, } if err := s.routes(mux); err != nil { @@ -66,6 +69,10 @@ func (s *Server) routes(mux *http.ServeMux) error { instances := api.NewInstanceHandler(s.store, s.config.Scraper.Timeout.Unwrap()) instances.Register(mux) + // Metrics API. + metricsAPI := api.NewMetricsHandler(s.store, s.collector) + metricsAPI.Register(mux) + // SPA serving — all non-API routes serve the Vue app. assets, err := loadUIAssets() if err != nil { From 9534d9e7eaa77ea307230e6a720e7dee3eaf2b2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?V=C3=ADctor=20Cuevas?= Date: Mon, 23 Mar 2026 18:17:56 +0100 Subject: [PATCH 2/4] feat(admin): add metrics dashboard UI with fleet KPIs, instance detail, and vendor charts - Add ECharts + vue-echarts with tree-shaken imports (line chart, grid, tooltip, legend, data zoom) - Add metrics formatting utilities (formatRps, formatLatency, formatErrorRate, formatCount, trendDirection) with 25 tests - Add metrics Pinia store for fleet and instance metrics with 6 tests - Add FleetKpiPanel with 4 KPI cards (RPS, error rate, connections, panics) and trend indicators - Add instance detail view at /instances/:id with breadcrumb, status header, and Overview/Traffic tabs - Overview tab: 5 KPI cards + P50/P95/P99 latency time-series chart - Traffic tab: interactive vendor table with checkboxes, color dots, totals row; 3 stacked charts (RPS, Latency, Error Rate) per selected vendor - Latency chart shows P50/P95/P99 for single vendor, P99-only for multiple (per design spec) - Auto-select top 3 vendors by RPS, prune stale selections when vendor list changes - Make instance cards and table rows clickable to navigate to detail view - Add P50/P95 fields to SeriesPoint in Go backend for overview latency chart - Escape HTML in ECharts tooltip formatters to prevent XSS via vendor IDs - Add tabpanel ARIA roles for accessibility --- admin/metrics/collector.go | 2 + admin/metrics/metrics.go | 2 + admin/ui/package.json | 12 +- admin/ui/pnpm-lock.yaml | 37 +++ admin/ui/src/components/FleetKpiPanel.vue | 71 +++++ admin/ui/src/components/InstanceCard.vue | 19 +- admin/ui/src/components/InstanceTable.vue | 17 +- admin/ui/src/components/KpiCard.vue | 120 +++++++++ admin/ui/src/components/OverviewTab.vue | 189 +++++++++++++ admin/ui/src/components/TrafficTab.vue | 306 ++++++++++++++++++++++ admin/ui/src/components/VendorTable.vue | 218 +++++++++++++++ admin/ui/src/layouts/AppLayout.vue | 6 +- admin/ui/src/router/index.js | 6 + admin/ui/src/stores/metrics.js | 46 ++++ admin/ui/src/stores/metrics.test.js | 94 +++++++ admin/ui/src/utils/chart-setup.js | 20 ++ admin/ui/src/utils/html.js | 7 + admin/ui/src/utils/metrics.js | 64 +++++ admin/ui/src/utils/metrics.test.js | 142 ++++++++++ admin/ui/src/views/DashboardView.vue | 19 ++ admin/ui/src/views/InstanceDetailView.vue | 231 ++++++++++++++++ 21 files changed, 1614 insertions(+), 14 deletions(-) create mode 100644 admin/ui/src/components/FleetKpiPanel.vue create mode 100644 admin/ui/src/components/KpiCard.vue create mode 100644 admin/ui/src/components/OverviewTab.vue create mode 100644 admin/ui/src/components/TrafficTab.vue create mode 100644 admin/ui/src/components/VendorTable.vue create mode 100644 admin/ui/src/stores/metrics.js create mode 100644 admin/ui/src/stores/metrics.test.js create mode 100644 admin/ui/src/utils/chart-setup.js create mode 100644 admin/ui/src/utils/html.js create mode 100644 admin/ui/src/utils/metrics.js create mode 100644 admin/ui/src/utils/metrics.test.js create mode 100644 admin/ui/src/views/InstanceDetailView.vue diff --git a/admin/metrics/collector.go b/admin/metrics/collector.go index 8be1198..27df8ff 100644 --- a/admin/metrics/collector.go +++ b/admin/metrics/collector.go @@ -351,6 +351,8 @@ func (*Collector) buildSeries(buf *Ring) []SeriesPoint { Time: curr.Time.Unix(), RPS: counterRate(prev.totalRequests(), curr.totalRequests(), dt), ErrorRate: errorRate(prev.totalRequests(), curr.totalRequests(), prev.totalErrors(), curr.totalErrors()), + P50: secondsToMs(histogramQuantile(0.50, dh)), + P95: secondsToMs(histogramQuantile(0.95, dh)), P99: secondsToMs(histogramQuantile(0.99, dh)), ActiveConnections: curr.ActiveConnections, }) diff --git a/admin/metrics/metrics.go b/admin/metrics/metrics.go index 5c0e5e2..243a68d 100644 --- a/admin/metrics/metrics.go +++ b/admin/metrics/metrics.go @@ -139,6 +139,8 @@ type SeriesPoint struct { Time int64 `json:"t"` RPS float64 `json:"rps"` ErrorRate float64 `json:"err"` + P50 float64 `json:"p50"` + P95 float64 `json:"p95"` P99 float64 `json:"p99"` ActiveConnections float64 `json:"conn"` } diff --git a/admin/ui/package.json b/admin/ui/package.json index ce27df9..f2b5bb6 100644 --- a/admin/ui/package.json +++ b/admin/ui/package.json @@ -14,20 +14,24 @@ "test:watch": "vitest" }, "dependencies": { + "echarts": "^6.0.0", + "pinia": "^3.0.4", "vue": "^3.5.29", - "vue-router": "^5.0.3", - "pinia": "^3.0.4" + "vue-echarts": "^8.0.1", + "vue-router": "^5.0.3" }, "pnpm": { - "onlyBuiltDependencies": ["esbuild"] + "onlyBuiltDependencies": [ + "esbuild" + ] }, "devDependencies": { "@vitejs/plugin-vue": "^6.0.4", "eslint": "^10.0.2", "eslint-config-prettier": "^10.1.8", "eslint-plugin-vue": "^10.8.0", - "prettier": "^3.8.1", "jsdom": "^26.1.0", + "prettier": "^3.8.1", "vite": "^7.3.1", "vitest": "^3.2.1" } diff --git a/admin/ui/pnpm-lock.yaml b/admin/ui/pnpm-lock.yaml index 8404993..3c06d2a 100644 --- a/admin/ui/pnpm-lock.yaml +++ b/admin/ui/pnpm-lock.yaml @@ -8,12 +8,18 @@ importers: .: dependencies: + echarts: + specifier: ^6.0.0 + version: 6.0.0 pinia: specifier: ^3.0.4 version: 3.0.4(vue@3.5.29) vue: specifier: ^3.5.29 version: 3.5.29 + vue-echarts: + specifier: ^8.0.1 + version: 8.0.1(echarts@6.0.0)(vue@3.5.29) vue-router: specifier: ^5.0.3 version: 5.0.3(@vue/compiler-sfc@3.5.29)(pinia@3.0.4(vue@3.5.29))(vue@3.5.29) @@ -671,6 +677,9 @@ packages: deep-is@0.1.4: resolution: {integrity: sha512-oIPzksmTg4/MriiaYGO+okXDT7ztn/w3Eptv/+gSIdMdKsJo0u4CfYNFJPy+4SKMuCqGw2wxnA+URMg3t8a/bQ==} + echarts@6.0.0: + resolution: {integrity: sha512-Tte/grDQRiETQP4xz3iZWSvoHrkCQtwqd6hs+mifXcjrCuo2iKWbajFObuLJVBlDIJlOzgQPd1hsaKt/3+OMkQ==} + entities@6.0.1: resolution: {integrity: sha512-aN97NXWF6AWBTahfVOIrB/NShkzi5H7F9r1s9mD3cDj4Ko5f2qhhVoYMibXF7GlLveb/D2ioWay8lxI97Ven3g==} engines: {node: '>=0.12'} @@ -1126,6 +1135,9 @@ packages: resolution: {integrity: sha512-hdF5ZgjTqgAntKkklYw0R03MG2x/bSzTtkxmIRw/sTNV8YXsCJ1tfLAX23lhxhHJlEf3CRCOCGGWw3vI3GaSPw==} engines: {node: '>=18'} + tslib@2.3.0: + resolution: {integrity: sha512-N82ooyxVNm6h1riLCoyS9e3fuJ3AMG2zIZs2Gd1ATcSFjSA23Q0fzjjZeh0jbJvWVDZ0cJT8yaNNaaXHzueNjg==} + type-check@0.4.0: resolution: {integrity: sha512-XleUoc9uwGXqjWwXaUTZAmzMcFZ5858QA2vvx1Ur5xIcixXIP+8LnFDgRplU30us6teqdlskFfu+ae4K79Ooew==} engines: {node: '>= 0.8.0'} @@ -1220,6 +1232,12 @@ packages: jsdom: optional: true + vue-echarts@8.0.1: + resolution: {integrity: sha512-23rJTFLu1OUEGRWjJGmdGt8fP+8+ja1gVgzMYPIPaHWpXegcO1viIAaeu2H4QHESlVeHzUAHIxKXGrwjsyXAaA==} + peerDependencies: + echarts: ^6.0.0 + vue: ^3.3.0 + vue-eslint-parser@10.4.0: resolution: {integrity: sha512-Vxi9pJdbN3ZnVGLODVtZ7y4Y2kzAAE2Cm0CZ3ZDRvydVYxZ6VrnBhLikBsRS+dpwj4Jv4UCv21PTEwF5rQ9WXg==} engines: {node: ^18.18.0 || ^20.9.0 || >=21.1.0} @@ -1319,6 +1337,9 @@ packages: resolution: {integrity: sha512-rVksvsnNCdJ/ohGc6xgPwyN8eheCxsiLM8mxuE/t/mOVqJewPuO1miLpTHQiRgTKCLexL4MeAFVagts7HmNZ2Q==} engines: {node: '>=10'} + zrender@6.0.0: + resolution: {integrity: sha512-41dFXEEXuJpNecuUQq6JlbybmnHaqqpGlbH1yxnA5V9MMP4SbohSVZsJIwz+zdjQXSSlR1Vc34EgH1zxyTDvhg==} + snapshots: '@asamuzakjp/css-color@3.2.0': @@ -1832,6 +1853,11 @@ snapshots: deep-is@0.1.4: {} + echarts@6.0.0: + dependencies: + tslib: 2.3.0 + zrender: 6.0.0 + entities@6.0.1: {} entities@7.0.1: {} @@ -2303,6 +2329,8 @@ snapshots: dependencies: punycode: 2.3.1 + tslib@2.3.0: {} + type-check@0.4.0: dependencies: prelude-ls: 1.2.1 @@ -2400,6 +2428,11 @@ snapshots: - tsx - yaml + vue-echarts@8.0.1(echarts@6.0.0)(vue@3.5.29): + dependencies: + echarts: 6.0.0 + vue: 3.5.29 + vue-eslint-parser@10.4.0(eslint@10.0.2): dependencies: debug: 4.4.3 @@ -2485,3 +2518,7 @@ snapshots: yaml@2.8.2: {} yocto-queue@0.1.0: {} + + zrender@6.0.0: + dependencies: + tslib: 2.3.0 diff --git a/admin/ui/src/components/FleetKpiPanel.vue b/admin/ui/src/components/FleetKpiPanel.vue new file mode 100644 index 0000000..524e2d7 --- /dev/null +++ b/admin/ui/src/components/FleetKpiPanel.vue @@ -0,0 +1,71 @@ + + + + + diff --git a/admin/ui/src/components/InstanceCard.vue b/admin/ui/src/components/InstanceCard.vue index edc440d..f74b593 100644 --- a/admin/ui/src/components/InstanceCard.vue +++ b/admin/ui/src/components/InstanceCard.vue @@ -1,5 +1,5 @@