From d96f96a618805d679cb19553637a56770942c1b8 Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 15 Jan 2026 02:15:26 +0100 Subject: [PATCH 1/6] fix CPU query --- framework/leak/detector.go | 18 +++++++++--------- framework/leak/detector_cl_node.go | 2 +- framework/leak/detector_test.go | 9 ++++----- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/framework/leak/detector.go b/framework/leak/detector.go index 803184469..ceede3591 100644 --- a/framework/leak/detector.go +++ b/framework/leak/detector.go @@ -91,10 +91,10 @@ func (rc *ResourceLeakChecker) MeasureLeak( resStart := memStart.Data.Result resEnd := memEnd.Data.Result if len(resStart) == 0 { - return 0, fmt.Errorf("no results for start timestamp: %s", c.Start) + return 0, fmt.Errorf("no results for start timestamp: %s, query: %s", startWithWarmUp, c.Query) } if len(resEnd) == 0 { - return 0, fmt.Errorf("no results for end timestamp: %s", c.End) + return 0, fmt.Errorf("no results for end timestamp: %s, query: %s", c.End, c.Query) } if len(resStart[0].Value) < 2 { @@ -113,21 +113,21 @@ func (rc *ResourceLeakChecker) MeasureLeak( return 0, fmt.Errorf("invalid Prometheus response value for timestamp: %s, value: %v", c.End, memEnd.Data.Result[0].Value[1]) } - memStartValFloat, err := strconv.ParseFloat(memStartVal, 64) + startValFloat, err := strconv.ParseFloat(memStartVal, 64) if err != nil { return 0, fmt.Errorf("start quantile can't be parsed from string: %w", err) } - memEndValFloat, err := strconv.ParseFloat(memEndVal, 64) + endValFloat, err := strconv.ParseFloat(memEndVal, 64) if err != nil { return 0, fmt.Errorf("start quantile can't be parsed from string: %w", err) } - totalIncreasePercentage := (memEndValFloat / memStartValFloat * 100) - 100 + totalIncreasePercentage := (endValFloat / startValFloat * 100) - 100 - f.L.Debug(). - Float64("Start", memStartValFloat). - Float64("End", memEndValFloat). + f.L.Info(). + Float64("Start", startValFloat). + Float64("End", endValFloat). Float64("Increase", totalIncreasePercentage). - Msg("Memory increase total (percentage)") + Msg("Increase total (percentage)") return totalIncreasePercentage, nil } diff --git a/framework/leak/detector_cl_node.go b/framework/leak/detector_cl_node.go index 888b648c3..b1c045bd8 100644 --- a/framework/leak/detector_cl_node.go +++ b/framework/leak/detector_cl_node.go @@ -55,7 +55,7 @@ func NewCLNodesLeakDetector(c *ResourceLeakChecker, opts ...func(*CLNodesLeakDet } switch cd.Mode { case "devenv": - cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name=~"don-node%d"}[5m])) * 100` + cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name="don-node%d"}[1h])) * 100` cd.MemoryQuery = `quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024` case "griddle": return nil, fmt.Errorf("not implemented yet") diff --git a/framework/leak/detector_test.go b/framework/leak/detector_test.go index b48c98c19..c9c909d07 100644 --- a/framework/leak/detector_test.go +++ b/framework/leak/detector_test.go @@ -130,11 +130,10 @@ func TestRealCLNodesLeakDetectionLocalDevenv(t *testing.T) { require.NoError(t, err) errs := cnd.Check(&leak.CLNodesCheck{ NumNodes: 4, - Start: mustTime("2026-01-12T20:53:00Z"), - End: mustTime("2026-01-13T10:11:00Z"), - WarmUpDuration: 1 * time.Hour, - CPUThreshold: 10.0, - MemoryThreshold: 10.0, + Start: mustTime("2026-01-15T01:14:00Z"), + End: mustTime("2026-01-15T02:04:00Z"), + CPUThreshold: 20.0, + MemoryThreshold: 20.0, }) require.NoError(t, errs) fmt.Println(errs) From c7eef55a207153f100a563137de4be8ac17b891e Mon Sep 17 00:00:00 2001 From: skudasov Date: Thu, 15 Jan 2026 13:17:12 +0100 Subject: [PATCH 2/6] changeset --- framework/.changeset/v0.13.2.md | 2 ++ 1 file changed, 2 insertions(+) create mode 100644 framework/.changeset/v0.13.2.md diff --git a/framework/.changeset/v0.13.2.md b/framework/.changeset/v0.13.2.md new file mode 100644 index 000000000..e33983ea9 --- /dev/null +++ b/framework/.changeset/v0.13.2.md @@ -0,0 +1,2 @@ +- Add pprof dumps download +- Generalize and change CPU query From 4221e14f33681bfde4157e2a0daf05a0aabdce59 Mon Sep 17 00:00:00 2001 From: skudasov Date: Fri, 16 Jan 2026 15:05:38 +0100 Subject: [PATCH 3/6] wip --- book/src/SUMMARY.md | 1 + book/src/framework/resource_leaks.md | 30 ++++++ framework/leak/cmd/Dockerfile | 14 +++ framework/leak/cmd/Justfile | 8 ++ framework/leak/cmd/go.mod | 3 + framework/leak/cmd/main.go | 149 +++++++++++++++++++++++++++ framework/leak/detector.go | 4 +- framework/leak/detector_cl_node.go | 8 +- framework/leak/detector_hog_test.go | 85 +++++++++++++++ framework/leak/detector_test.go | 4 +- framework/prometheus.go | 16 +-- framework/prometheus_test.go | 6 +- 12 files changed, 303 insertions(+), 25 deletions(-) create mode 100644 book/src/framework/resource_leaks.md create mode 100644 framework/leak/cmd/Dockerfile create mode 100644 framework/leak/cmd/Justfile create mode 100644 framework/leak/cmd/go.mod create mode 100644 framework/leak/cmd/main.go create mode 100644 framework/leak/detector_hog_test.go diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md index f178fad7f..23a16e894 100644 --- a/book/src/SUMMARY.md +++ b/book/src/SUMMARY.md @@ -22,6 +22,7 @@ - [Components Resources](framework/components/resources.md) - [Containers Network Isolation](framework/components/network_isolation.md) - [Fake Services](framework/components/mocking.md) + - [Detecting Resource Leaks](framework/resource_leaks.md) - [Copying Files](framework/copying_files.md) - [External Environment](framework/components/external.md) - [Observability Stack](framework/observability/observability_stack.md) diff --git a/book/src/framework/resource_leaks.md b/book/src/framework/resource_leaks.md new file mode 100644 index 000000000..98ccfe12c --- /dev/null +++ b/book/src/framework/resource_leaks.md @@ -0,0 +1,30 @@ +## Resource Leak Detector + +We have a simple utility to detect resource leaks in our tests + +## CL Nodes Leak Detection + +In this example test will fail if any node will consume more than 2 additional cores and allocate 20% more memory at the end of a test. +```go +import ( + "github.com/smartcontractkit/chainlink-testing-framework/framework/leak" +) +``` + +```go + l, err := leak.NewCLNodesLeakDetector(leak.NewResourceLeakChecker()) + require.NoError(t, err) + errs := l.Check(&leak.CLNodesCheck{ + NumNodes: in.NodeSets[0].Nodes, + Start: start, + End: time.Now(), + WarmUpDuration: 10 * time.Minute, + CPUThreshold: 2000.0, + MemoryThreshold: 20.0, + }) + require.NoError(t, errs) +``` + +## Custom Resource Assertion + +You can also use low-level API to verify \ No newline at end of file diff --git a/framework/leak/cmd/Dockerfile b/framework/leak/cmd/Dockerfile new file mode 100644 index 000000000..6896c3f68 --- /dev/null +++ b/framework/leak/cmd/Dockerfile @@ -0,0 +1,14 @@ +# Build stage +FROM golang:1.25-alpine AS builder + +WORKDIR /app + +COPY go.mod ./ +RUN go mod download +COPY . . +RUN go build -o resource-hog main.go + +FROM alpine:latest +WORKDIR /root/ +COPY --from=builder /app/resource-hog . +CMD ["./resource-hog"] \ No newline at end of file diff --git a/framework/leak/cmd/Justfile b/framework/leak/cmd/Justfile new file mode 100644 index 000000000..28fc53220 --- /dev/null +++ b/framework/leak/cmd/Justfile @@ -0,0 +1,8 @@ +run: + go run main.go + +build: + docker build -t resource-hog:latest . + +clean-hog: + docker rm -f resource-hog-test 2>/dev/null || true \ No newline at end of file diff --git a/framework/leak/cmd/go.mod b/framework/leak/cmd/go.mod new file mode 100644 index 000000000..e880b8d06 --- /dev/null +++ b/framework/leak/cmd/go.mod @@ -0,0 +1,3 @@ +module github.com/smartcontractkit/chainlink-testing-framework/framework/hog + +go 1.25.4 diff --git a/framework/leak/cmd/main.go b/framework/leak/cmd/main.go new file mode 100644 index 000000000..54a025b30 --- /dev/null +++ b/framework/leak/cmd/main.go @@ -0,0 +1,149 @@ +package main + +import ( + "fmt" + "log" + "math" + "os" + "strconv" + "strings" + "time" +) + +const ( + StepTick = 3 * time.Minute +) + +func main() { + fmt.Println("Starting CPU and Memory hog...") + + workersSchedule := os.Getenv("WORKERS") + memorySchedule := os.Getenv("MEMORY") + repeatStr := os.Getenv("REPEAT") + + leaks := make([][]byte, 0) + workerCounter := 0 + + go scheduleMemoryLeaks(memorySchedule, parseRepeat(repeatStr), &leaks) + go scheduleCPUWorkers(workersSchedule, parseRepeat(repeatStr), &workerCounter) + + select {} +} + +func parseRepeat(repeatStr string) int { + if repeatStr == "" { + return 1 + } + repeat, _ := strconv.Atoi(repeatStr) + if repeat < 1 { + return 1 + } + return repeat +} + +func scheduleMemoryLeaks(schedule string, repeat int, leaks *[][]byte) { + if schedule == "" { + return + } + + levels := parseSchedule(schedule) + + for r := 0; r < repeat; r++ { + for _, target := range levels { + timer := time.NewTimer(StepTick) + + current := len(*leaks) / 100 + + if target > current { + for i := 0; i < target-current; i++ { + leak := make([]byte, 100*1024*1024) + for j := range leak { + leak[j] = byte(j % 256) + } + *leaks = append(*leaks, leak) + } + } else if target < current { + removeCount := current - target + if removeCount > len(*leaks)/100 { + removeCount = len(*leaks) / 100 + } + *leaks = (*leaks)[:len(*leaks)-removeCount*100] + } + + log.Printf("Memory: %dx100MB", len(*leaks)/100) + <-timer.C + } + } +} + +func scheduleCPUWorkers(schedule string, repeat int, counter *int) { + if schedule == "" { + return + } + + levels := parseSchedule(schedule) + activeWorkers := 0 + + for r := 0; r < repeat; r++ { + for _, target := range levels { + timer := time.NewTimer(StepTick) + + if target > activeWorkers { + for i := 0; i < target-activeWorkers; i++ { + *counter++ + go cpuWorker(*counter) + } + } else if target < activeWorkers { + stopWorkers(activeWorkers - target) + } + + activeWorkers = target + log.Printf("CPU Workers: %d", activeWorkers) + <-timer.C + } + } +} + +var stopChan = make(chan bool, 1000) + +func cpuWorker(id int) { + for { + select { + case <-stopChan: + return + default: + for n := 2; n < 100000; n++ { + isPrime := true + for i := 2; i <= int(math.Sqrt(float64(n))); i++ { + if n%i == 0 { + isPrime = false + break + } + } + _ = isPrime + } + time.Sleep(10 * time.Millisecond) + } + } +} + +func stopWorkers(count int) { + for i := 0; i < count; i++ { + select { + case stopChan <- true: + default: + } + } +} + +func parseSchedule(schedule string) []int { + parts := strings.Split(schedule, ",") + var result []int + for _, part := range parts { + val, _ := strconv.Atoi(strings.TrimSpace(part)) + if val >= 0 { + result = append(result, val) + } + } + return result +} diff --git a/framework/leak/detector.go b/framework/leak/detector.go index ceede3591..f621d92de 100644 --- a/framework/leak/detector.go +++ b/framework/leak/detector.go @@ -66,9 +66,9 @@ type CheckConfig struct { WarmUpDuration time.Duration } -// MeasureLeak measures resource leak between start and end timestamps +// MeasureDelta measures resource leak delta between start and end timestamps // WarmUpDuration is used to ignore warm up interval results for more stable comparison -func (rc *ResourceLeakChecker) MeasureLeak( +func (rc *ResourceLeakChecker) MeasureDelta( c *CheckConfig, ) (float64, error) { if c.Start.After(c.End) { diff --git a/framework/leak/detector_cl_node.go b/framework/leak/detector_cl_node.go index b1c045bd8..7ea597896 100644 --- a/framework/leak/detector_cl_node.go +++ b/framework/leak/detector_cl_node.go @@ -55,8 +55,8 @@ func NewCLNodesLeakDetector(c *ResourceLeakChecker, opts ...func(*CLNodesLeakDet } switch cd.Mode { case "devenv": - cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name="don-node%d"}[1h])) * 100` - cd.MemoryQuery = `quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024` + cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name="don-node%d"}[5m])) * 100` + cd.MemoryQuery = `avg_over_time(container_memory_rss{name="don-node%d"}[5m]) / 1024 / 1024` case "griddle": return nil, fmt.Errorf("not implemented yet") default: @@ -74,7 +74,7 @@ func (cd *CLNodesLeakDetector) Check(t *CLNodesCheck) error { cpuDiffs := make([]float64, 0) errs := make([]error, 0) for i := range t.NumNodes { - memoryDiff, err := cd.c.MeasureLeak(&CheckConfig{ + memoryDiff, err := cd.c.MeasureDelta(&CheckConfig{ Query: fmt.Sprintf(cd.MemoryQuery, i), Start: t.Start, End: t.End, @@ -84,7 +84,7 @@ func (cd *CLNodesLeakDetector) Check(t *CLNodesCheck) error { return fmt.Errorf("memory leak check failed: %w", err) } memoryDiffs = append(memoryDiffs, memoryDiff) - cpuDiff, err := cd.c.MeasureLeak(&CheckConfig{ + cpuDiff, err := cd.c.MeasureDelta(&CheckConfig{ Query: fmt.Sprintf(cd.CPUQuery, i), Start: t.Start, End: t.End, diff --git a/framework/leak/detector_hog_test.go b/framework/leak/detector_hog_test.go new file mode 100644 index 000000000..6598d47b7 --- /dev/null +++ b/framework/leak/detector_hog_test.go @@ -0,0 +1,85 @@ +package leak_test + +import ( + "context" + "fmt" + "log" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-testing-framework/framework/leak" + + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +func TestRunHog(t *testing.T) { + ctx := context.Background() + hog, err := SetupResourceHog( + ctx, + "resource-hog:latest", + map[string]string{ + "WORKERS": "1,2,3,2,1", + "MEMORY": "1,2,3,2,1", + "REPEAT": "1", + }, + ) + require.NoError(t, err) + time.Sleep(15 * time.Minute) + t.Cleanup(func() { + if err := hog.Terminate(ctx); err != nil { + log.Printf("Failed to terminate container: %v", err) + } + }) +} + +func TestVerifyHog(t *testing.T) { + lc := leak.NewResourceLeakChecker() + // cpu + diff, err := lc.MeasureDelta(&leak.CheckConfig{ + Query: `sum(rate(container_cpu_usage_seconds_total{name="resource-hog"}[5m])) * 100`, + Start: mustTime("2026-01-16T13:20:30Z"), + End: mustTime("2026-01-16T13:39:45Z"), + }) + fmt.Println(diff) + require.NoError(t, err) + + // mem + diff, err = lc.MeasureDelta(&leak.CheckConfig{ + Query: `avg_over_time(container_memory_rss{name="resource-hog"}[5m]) / 1024 / 1024`, + Start: mustTime("2026-01-16T13:20:30Z"), + End: mustTime("2026-01-16T13:38:25Z"), + }) + fmt.Println(diff) + require.NoError(t, err) +} + +// ResourceHogContainer represents a container that hogs CPU and memory +type ResourceHogContainer struct { + testcontainers.Container + URI string +} + +// SetupResourceHog starts a container that consumes CPU and memory +func SetupResourceHog(ctx context.Context, image string, env map[string]string) (*ResourceHogContainer, error) { + // Build request for the container + req := testcontainers.ContainerRequest{ + Name: "resource-hog", + Image: image, + ExposedPorts: []string{}, + Env: env, + WaitingFor: wait.ForLog("Starting CPU and Memory hog"), + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + return nil, fmt.Errorf("failed to start container: %w", err) + } + + return &ResourceHogContainer{Container: container}, nil +} diff --git a/framework/leak/detector_test.go b/framework/leak/detector_test.go index c9c909d07..3300fff18 100644 --- a/framework/leak/detector_test.go +++ b/framework/leak/detector_test.go @@ -104,7 +104,7 @@ func TestMeasure(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { qc.SetResponses(tc.startResponse, tc.endResponse) - diff, err := lc.MeasureLeak(&leak.CheckConfig{ + diff, err := lc.MeasureDelta(&leak.CheckConfig{ // Prometheus returns good errors when query is invalid // so we do not test it since there is no additional validation Query: ``, @@ -148,7 +148,7 @@ func TestRealPrometheusLowLevelAPI(t *testing.T) { lc := leak.NewResourceLeakChecker() for i := range donNodes { - diff, err := lc.MeasureLeak(&leak.CheckConfig{ + diff, err := lc.MeasureDelta(&leak.CheckConfig{ Query: fmt.Sprintf(`quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`, i), Start: mustTime("2026-01-12T21:53:00Z"), End: mustTime("2026-01-13T10:11:00Z"), diff --git a/framework/prometheus.go b/framework/prometheus.go index 48363bbc7..8fc17b986 100644 --- a/framework/prometheus.go +++ b/framework/prometheus.go @@ -41,18 +41,6 @@ type PromQueryResponseResult struct { Value []interface{} `json:"value"` } -// QueryRangeResponse represents the response from Prometheus range query API -type QueryRangeResponse struct { - Status string `json:"status"` - Data struct { - ResultType string `json:"resultType"` - Result []struct { - Metric map[string]string `json:"metric"` - Values [][]interface{} `json:"values"` - } `json:"result"` - } `json:"data"` -} - // QueryRangeParams contains parameters for range queries type QueryRangeParams struct { Query string @@ -87,7 +75,7 @@ func (p *PrometheusQueryClient) Query(query string, timestamp time.Time) (*Prome } // QueryRange executes a range query against the Prometheus API -func (p *PrometheusQueryClient) QueryRange(params QueryRangeParams) (*QueryRangeResponse, error) { +func (p *PrometheusQueryClient) QueryRange(params QueryRangeParams) (*PrometheusQueryResponse, error) { url := fmt.Sprintf("%s/api/v1/query_range", p.baseURL) resp, err := p.client.R(). SetQueryParams(map[string]string{ @@ -103,7 +91,7 @@ func (p *PrometheusQueryClient) QueryRange(params QueryRangeParams) (*QueryRange if resp.StatusCode() != 200 { return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode()) } - var result QueryRangeResponse + var result PrometheusQueryResponse if err := json.Unmarshal(resp.Body(), &result); err != nil { return nil, fmt.Errorf("failed to unmarshal response: %w", err) } diff --git a/framework/prometheus_test.go b/framework/prometheus_test.go index c4c095bbb..12c651a8c 100644 --- a/framework/prometheus_test.go +++ b/framework/prometheus_test.go @@ -193,7 +193,7 @@ func TestPrometheusQueryClientQueryRange(t *testing.T) { response string expectedStatus string expectedCount int - validateResult func(t *testing.T, result *QueryRangeResponse) + validateResult func(t *testing.T, result *PrometheusQueryResponse) }{ { name: "successful range query with multiple data points", @@ -219,7 +219,7 @@ func TestPrometheusQueryClientQueryRange(t *testing.T) { }`, expectedStatus: "success", expectedCount: 1, - validateResult: func(t *testing.T, result *QueryRangeResponse) { + validateResult: func(t *testing.T, result *PrometheusQueryResponse) { assert.Equal(t, "matrix", result.Data.ResultType) assert.Equal(t, "http_requests_total", result.Data.Result[0].Metric["__name__"]) assert.Equal(t, "api-server", result.Data.Result[0].Metric["job"]) @@ -258,7 +258,7 @@ func TestPrometheusQueryClientQueryRange(t *testing.T) { }`, expectedStatus: "success", expectedCount: 2, - validateResult: func(t *testing.T, result *QueryRangeResponse) { + validateResult: func(t *testing.T, result *PrometheusQueryResponse) { assert.Equal(t, "matrix", result.Data.ResultType) assert.Equal(t, "cpu_usage", result.Data.Result[0].Metric["__name__"]) assert.Equal(t, "server1", result.Data.Result[0].Metric["instance"]) From f23a5710227d283911e2cb03a9a53bb69e4c2112 Mon Sep 17 00:00:00 2001 From: skudasov Date: Fri, 16 Jan 2026 15:10:06 +0100 Subject: [PATCH 4/6] rollback prom --- framework/prometheus.go | 18 +++++++++++++++--- framework/prometheus_test.go | 6 +++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/framework/prometheus.go b/framework/prometheus.go index 8fc17b986..5dd9ccbb2 100644 --- a/framework/prometheus.go +++ b/framework/prometheus.go @@ -41,6 +41,18 @@ type PromQueryResponseResult struct { Value []interface{} `json:"value"` } +// QueryRangeResponse represents the response from Prometheus range query API +type QueryRangeResponse struct { + Status string `json:"status"` + Data struct { + ResultType string `json:"resultType"` + Result []struct { + Metric map[string]string `json:"metric"` + Values [][]interface{} `json:"values"` + } `json:"result"` + } `json:"data"` +} + // QueryRangeParams contains parameters for range queries type QueryRangeParams struct { Query string @@ -75,7 +87,7 @@ func (p *PrometheusQueryClient) Query(query string, timestamp time.Time) (*Prome } // QueryRange executes a range query against the Prometheus API -func (p *PrometheusQueryClient) QueryRange(params QueryRangeParams) (*PrometheusQueryResponse, error) { +func (p *PrometheusQueryClient) QueryRange(params QueryRangeParams) (*QueryRangeResponse, error) { url := fmt.Sprintf("%s/api/v1/query_range", p.baseURL) resp, err := p.client.R(). SetQueryParams(map[string]string{ @@ -91,7 +103,7 @@ func (p *PrometheusQueryClient) QueryRange(params QueryRangeParams) (*Prometheus if resp.StatusCode() != 200 { return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode()) } - var result PrometheusQueryResponse + var result QueryRangeResponse if err := json.Unmarshal(resp.Body(), &result); err != nil { return nil, fmt.Errorf("failed to unmarshal response: %w", err) } @@ -122,4 +134,4 @@ func formatPrometheusTime(t time.Time) string { // formatDuration formats duration for Prometheus API func formatDuration(d time.Duration) string { return fmt.Sprintf("%.0fs", d.Seconds()) -} +} \ No newline at end of file diff --git a/framework/prometheus_test.go b/framework/prometheus_test.go index 12c651a8c..c4c095bbb 100644 --- a/framework/prometheus_test.go +++ b/framework/prometheus_test.go @@ -193,7 +193,7 @@ func TestPrometheusQueryClientQueryRange(t *testing.T) { response string expectedStatus string expectedCount int - validateResult func(t *testing.T, result *PrometheusQueryResponse) + validateResult func(t *testing.T, result *QueryRangeResponse) }{ { name: "successful range query with multiple data points", @@ -219,7 +219,7 @@ func TestPrometheusQueryClientQueryRange(t *testing.T) { }`, expectedStatus: "success", expectedCount: 1, - validateResult: func(t *testing.T, result *PrometheusQueryResponse) { + validateResult: func(t *testing.T, result *QueryRangeResponse) { assert.Equal(t, "matrix", result.Data.ResultType) assert.Equal(t, "http_requests_total", result.Data.Result[0].Metric["__name__"]) assert.Equal(t, "api-server", result.Data.Result[0].Metric["job"]) @@ -258,7 +258,7 @@ func TestPrometheusQueryClientQueryRange(t *testing.T) { }`, expectedStatus: "success", expectedCount: 2, - validateResult: func(t *testing.T, result *PrometheusQueryResponse) { + validateResult: func(t *testing.T, result *QueryRangeResponse) { assert.Equal(t, "matrix", result.Data.ResultType) assert.Equal(t, "cpu_usage", result.Data.Result[0].Metric["__name__"]) assert.Equal(t, "server1", result.Data.Result[0].Metric["instance"]) From 0bf8ba45e2b762802bc87e9ce1f51b762cdc2cc3 Mon Sep 17 00:00:00 2001 From: skudasov Date: Fri, 16 Jan 2026 18:39:05 +0100 Subject: [PATCH 5/6] docs and make CPU query smoother --- book/src/framework/resource_leaks.md | 29 +++++++++++++++++++++++++++- framework/leak/detector_cl_node.go | 3 ++- framework/leak/detector_hog_test.go | 11 ++++++----- framework/prometheus.go | 2 +- 4 files changed, 37 insertions(+), 8 deletions(-) diff --git a/book/src/framework/resource_leaks.md b/book/src/framework/resource_leaks.md index 98ccfe12c..cec555d7c 100644 --- a/book/src/framework/resource_leaks.md +++ b/book/src/framework/resource_leaks.md @@ -27,4 +27,31 @@ import ( ## Custom Resource Assertion -You can also use low-level API to verify \ No newline at end of file +You can also use low-level API to verify +```go + diff, err := lc.MeasureDelta(&leak.CheckConfig{ + Query: fmt.Sprintf(`quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`, i), + Start: mustTime("2026-01-12T21:53:00Z"), + End: mustTime("2026-01-13T10:11:00Z"), + WarmUpDuration: 1 * time.Hour, + }) + require.NoError(t, err) +``` + +## Adding New Queries + +You can use our test `hog` to debug new metrics and verify its correctness +```bash +cd framework/leak/cmd +just build +``` + +Run different hogs +```bash +ctf obs up +go test -v -timeout 1h -run TestCyclicHog +``` +Then verify your query +```bash +go test -v -run TestVerifyCyclicHog +``` diff --git a/framework/leak/detector_cl_node.go b/framework/leak/detector_cl_node.go index 7ea597896..d12aca307 100644 --- a/framework/leak/detector_cl_node.go +++ b/framework/leak/detector_cl_node.go @@ -55,7 +55,8 @@ func NewCLNodesLeakDetector(c *ResourceLeakChecker, opts ...func(*CLNodesLeakDet } switch cd.Mode { case "devenv": - cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name="don-node%d"}[5m])) * 100` + // aggregate it on 5m interval with 2m step for mitigating spikes + cd.CPUQuery = `avg_over_time((sum(rate(container_cpu_usage_seconds_total{name="don-node%d"}[5m])) * 100)[5m:2m])` cd.MemoryQuery = `avg_over_time(container_memory_rss{name="don-node%d"}[5m]) / 1024 / 1024` case "griddle": return nil, fmt.Errorf("not implemented yet") diff --git a/framework/leak/detector_hog_test.go b/framework/leak/detector_hog_test.go index 6598d47b7..c421b2a1e 100644 --- a/framework/leak/detector_hog_test.go +++ b/framework/leak/detector_hog_test.go @@ -15,7 +15,7 @@ import ( "github.com/testcontainers/testcontainers-go/wait" ) -func TestRunHog(t *testing.T) { +func TestCyclicHog(t *testing.T) { ctx := context.Background() hog, err := SetupResourceHog( ctx, @@ -35,13 +35,14 @@ func TestRunHog(t *testing.T) { }) } -func TestVerifyHog(t *testing.T) { +func TestVerifyCyclicHog(t *testing.T) { lc := leak.NewResourceLeakChecker() // cpu diff, err := lc.MeasureDelta(&leak.CheckConfig{ - Query: `sum(rate(container_cpu_usage_seconds_total{name="resource-hog"}[5m])) * 100`, - Start: mustTime("2026-01-16T13:20:30Z"), - End: mustTime("2026-01-16T13:39:45Z"), + Query: `avg_over_time((sum(rate(container_cpu_usage_seconds_total{name="resource-hog"}[5m])) * 100)[5m:2m])`, + Start: mustTime("2026-01-16T13:20:30Z"), + End: mustTime("2026-01-16T13:32:40Z"), + WarmUpDuration: 2 * time.Minute, }) fmt.Println(diff) require.NoError(t, err) diff --git a/framework/prometheus.go b/framework/prometheus.go index 5dd9ccbb2..48363bbc7 100644 --- a/framework/prometheus.go +++ b/framework/prometheus.go @@ -134,4 +134,4 @@ func formatPrometheusTime(t time.Time) string { // formatDuration formats duration for Prometheus API func formatDuration(d time.Duration) string { return fmt.Sprintf("%.0fs", d.Seconds()) -} \ No newline at end of file +} From d2e3af24ba31660f557e603a7686d04b1a8fda49 Mon Sep 17 00:00:00 2001 From: skudasov Date: Fri, 16 Jan 2026 18:50:05 +0100 Subject: [PATCH 6/6] cleanup --- framework/leak/detector_hog_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/framework/leak/detector_hog_test.go b/framework/leak/detector_hog_test.go index c421b2a1e..c01915e66 100644 --- a/framework/leak/detector_hog_test.go +++ b/framework/leak/detector_hog_test.go @@ -16,6 +16,7 @@ import ( ) func TestCyclicHog(t *testing.T) { + t.Skip("unskip when debugging new queries") ctx := context.Background() hog, err := SetupResourceHog( ctx, @@ -36,6 +37,7 @@ func TestCyclicHog(t *testing.T) { } func TestVerifyCyclicHog(t *testing.T) { + t.Skip("unskip when debugging new queries") lc := leak.NewResourceLeakChecker() // cpu diff, err := lc.MeasureDelta(&leak.CheckConfig{