diff --git a/book/src/SUMMARY.md b/book/src/SUMMARY.md
index f178fad7f..23a16e894 100644
--- a/book/src/SUMMARY.md
+++ b/book/src/SUMMARY.md
@@ -22,6 +22,7 @@
   - [Components Resources](framework/components/resources.md)
   - [Containers Network Isolation](framework/components/network_isolation.md)
   - [Fake Services](framework/components/mocking.md)
+  - [Detecting Resource Leaks](framework/resource_leaks.md)
   - [Copying Files](framework/copying_files.md)
   - [External Environment](framework/components/external.md)
   - [Observability Stack](framework/observability/observability_stack.md)
diff --git a/book/src/framework/resource_leaks.md b/book/src/framework/resource_leaks.md
new file mode 100644
index 000000000..cec555d7c
--- /dev/null
+++ b/book/src/framework/resource_leaks.md
@@ -0,0 +1,57 @@
+## Resource Leak Detector
+
+We provide a simple utility for detecting resource leaks in our tests.
+
+## CL Nodes Leak Detection
+
+In this example, the test fails if any node consumes more than 2 additional CPU cores or allocates 20% more memory by the end of the test. Here `start` is a `time.Time` captured before the workload begins, and `in` is the test's loaded configuration.
+```go
+import (
+	"github.com/smartcontractkit/chainlink-testing-framework/framework/leak"
+)
+```
+
+```go
+	l, err := leak.NewCLNodesLeakDetector(leak.NewResourceLeakChecker())
+	require.NoError(t, err)
+	errs := l.Check(&leak.CLNodesCheck{
+		NumNodes:        in.NodeSets[0].Nodes,
+		Start:           start,
+		End:             time.Now(),
+		WarmUpDuration:  10 * time.Minute,
+		CPUThreshold:    2000.0,
+		MemoryThreshold: 20.0,
+	})
+	require.NoError(t, errs)
+```
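+
+`MeasureDelta` reports the relative increase between the two measurement points as a percentage: `(end / start * 100) - 100`. A worked example with illustrative numbers: an averaged RSS of 1200 MB after warm-up and 1500 MB at the end of the test gives `(1500 / 1200 * 100) - 100 = 25%`, which would exceed `MemoryThreshold: 20.0`.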
+
+## Custom Resource Assertion
+
+You can also use the low-level API to verify arbitrary queries:
+```go
+	diff, err := lc.MeasureDelta(&leak.CheckConfig{
+		Query:          fmt.Sprintf(`quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`, i),
+		Start:          mustTime("2026-01-12T21:53:00Z"),
+		End:            mustTime("2026-01-13T10:11:00Z"),
+		WarmUpDuration: 1 * time.Hour,
+	})
+	require.NoError(t, err)
+```
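+
+The snippet above uses a small `mustTime` test helper (not shown in this diff); a minimal sketch:
+```go
+// mustTime parses an RFC3339 timestamp, panicking on malformed input
+func mustTime(s string) time.Time {
+	ts, err := time.Parse(time.RFC3339, s)
+	if err != nil {
+		panic(err)
+	}
+	return ts
+}
+```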
+
+## Adding New Queries
+
+You can use our test `hog` container to debug new metrics and verify their correctness. First, build it:
+```bash
+cd framework/leak/cmd
+just build
+```
+
+Bring up the observability stack and run a hog:
+```bash
+ctf obs up
+go test -v -timeout 1h -run TestCyclicHog
+```
+Then verify your query:
+```bash
+go test -v -run TestVerifyCyclicHog
+```
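+
+The hog's load profile is driven by environment variables (see `framework/leak/cmd/main.go`): `WORKERS` and `MEMORY` are comma-separated step levels, each held for 3 minutes, and `REPEAT` replays the whole schedule. A sketch of a custom profile (values are illustrative):
+```go
+	// 30 minutes of load: CPU workers ramp 1 -> 3 -> 1 while memory
+	// holds a steady 2x100MB, then the schedule repeats once more
+	hog, err := SetupResourceHog(ctx, "resource-hog:latest", map[string]string{
+		"WORKERS": "1,2,3,2,1",
+		"MEMORY":  "2,2,2,2,2",
+		"REPEAT":  "2",
+	})
+	require.NoError(t, err)
+```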
- Msg("Memory increase total (percentage)") + Msg("Increase total (percentage)") return totalIncreasePercentage, nil } diff --git a/framework/leak/detector_cl_node.go b/framework/leak/detector_cl_node.go index 888b648c3..d12aca307 100644 --- a/framework/leak/detector_cl_node.go +++ b/framework/leak/detector_cl_node.go @@ -55,8 +55,9 @@ func NewCLNodesLeakDetector(c *ResourceLeakChecker, opts ...func(*CLNodesLeakDet } switch cd.Mode { case "devenv": - cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name=~"don-node%d"}[5m])) * 100` - cd.MemoryQuery = `quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024` + // aggregate it on 5m interval with 2m step for mitigating spikes + cd.CPUQuery = `avg_over_time((sum(rate(container_cpu_usage_seconds_total{name="don-node%d"}[5m])) * 100)[5m:2m])` + cd.MemoryQuery = `avg_over_time(container_memory_rss{name="don-node%d"}[5m]) / 1024 / 1024` case "griddle": return nil, fmt.Errorf("not implemented yet") default: @@ -74,7 +75,7 @@ func (cd *CLNodesLeakDetector) Check(t *CLNodesCheck) error { cpuDiffs := make([]float64, 0) errs := make([]error, 0) for i := range t.NumNodes { - memoryDiff, err := cd.c.MeasureLeak(&CheckConfig{ + memoryDiff, err := cd.c.MeasureDelta(&CheckConfig{ Query: fmt.Sprintf(cd.MemoryQuery, i), Start: t.Start, End: t.End, @@ -84,7 +85,7 @@ func (cd *CLNodesLeakDetector) Check(t *CLNodesCheck) error { return fmt.Errorf("memory leak check failed: %w", err) } memoryDiffs = append(memoryDiffs, memoryDiff) - cpuDiff, err := cd.c.MeasureLeak(&CheckConfig{ + cpuDiff, err := cd.c.MeasureDelta(&CheckConfig{ Query: fmt.Sprintf(cd.CPUQuery, i), Start: t.Start, End: t.End, diff --git a/framework/leak/detector_hog_test.go b/framework/leak/detector_hog_test.go new file mode 100644 index 000000000..c01915e66 --- /dev/null +++ b/framework/leak/detector_hog_test.go @@ -0,0 +1,88 @@ +package leak_test + +import ( + "context" + "fmt" + "log" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-testing-framework/framework/leak" + + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/wait" +) + +func TestCyclicHog(t *testing.T) { + t.Skip("unskip when debugging new queries") + ctx := context.Background() + hog, err := SetupResourceHog( + ctx, + "resource-hog:latest", + map[string]string{ + "WORKERS": "1,2,3,2,1", + "MEMORY": "1,2,3,2,1", + "REPEAT": "1", + }, + ) + require.NoError(t, err) + time.Sleep(15 * time.Minute) + t.Cleanup(func() { + if err := hog.Terminate(ctx); err != nil { + log.Printf("Failed to terminate container: %v", err) + } + }) +} + +func TestVerifyCyclicHog(t *testing.T) { + t.Skip("unskip when debugging new queries") + lc := leak.NewResourceLeakChecker() + // cpu + diff, err := lc.MeasureDelta(&leak.CheckConfig{ + Query: `avg_over_time((sum(rate(container_cpu_usage_seconds_total{name="resource-hog"}[5m])) * 100)[5m:2m])`, + Start: mustTime("2026-01-16T13:20:30Z"), + End: mustTime("2026-01-16T13:32:40Z"), + WarmUpDuration: 2 * time.Minute, + }) + fmt.Println(diff) + require.NoError(t, err) + + // mem + diff, err = lc.MeasureDelta(&leak.CheckConfig{ + Query: `avg_over_time(container_memory_rss{name="resource-hog"}[5m]) / 1024 / 1024`, + Start: mustTime("2026-01-16T13:20:30Z"), + End: mustTime("2026-01-16T13:38:25Z"), + }) + fmt.Println(diff) + require.NoError(t, err) +} + +// ResourceHogContainer represents a container that hogs CPU and memory +type ResourceHogContainer struct { + testcontainers.Container + 
diff --git a/framework/leak/detector_test.go b/framework/leak/detector_test.go
index b48c98c19..3300fff18 100644
--- a/framework/leak/detector_test.go
+++ b/framework/leak/detector_test.go
@@ -104,7 +104,7 @@
 	for _, tc := range testCases {
 		t.Run(tc.name, func(t *testing.T) {
 			qc.SetResponses(tc.startResponse, tc.endResponse)
-			diff, err := lc.MeasureLeak(&leak.CheckConfig{
+			diff, err := lc.MeasureDelta(&leak.CheckConfig{
 				// Prometheus returns good errors when query is invalid
 				// so we do not test it since there is no additional validation
 				Query: ``,
@@ -130,11 +130,10 @@
 	require.NoError(t, err)
 	errs := cnd.Check(&leak.CLNodesCheck{
 		NumNodes: 4,
-		Start:           mustTime("2026-01-12T20:53:00Z"),
-		End:             mustTime("2026-01-13T10:11:00Z"),
-		WarmUpDuration:  1 * time.Hour,
-		CPUThreshold:    10.0,
-		MemoryThreshold: 10.0,
+		Start:           mustTime("2026-01-15T01:14:00Z"),
+		End:             mustTime("2026-01-15T02:04:00Z"),
+		CPUThreshold:    20.0,
+		MemoryThreshold: 20.0,
 	})
 	require.NoError(t, errs)
 	fmt.Println(errs)
@@ -149,7 +148,7 @@
 	lc := leak.NewResourceLeakChecker()
 
 	for i := range donNodes {
-		diff, err := lc.MeasureLeak(&leak.CheckConfig{
+		diff, err := lc.MeasureDelta(&leak.CheckConfig{
 			Query: fmt.Sprintf(`quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`, i),
 			Start: mustTime("2026-01-12T21:53:00Z"),
 			End:   mustTime("2026-01-13T10:11:00Z"),