diff --git a/framework/.changeset/v0.13.1.md b/framework/.changeset/v0.13.1.md
new file mode 100644
index 000000000..fbc84372d
--- /dev/null
+++ b/framework/.changeset/v0.13.1.md
@@ -0,0 +1 @@
+- Add resource leak detector automation
diff --git a/framework/leak/detector.go b/framework/leak/detector.go
new file mode 100644
index 000000000..803184469
--- /dev/null
+++ b/framework/leak/detector.go
@@ -0,0 +1,158 @@
+// Package leak provides a Prometheus-based resource leak detector for long-running soak tests.
+//
+// A leak is measured by evaluating the same PromQL query at the start of a test (shifted by an
+// optional warm-up interval) and at its end, and reporting the relative change in percent.
+// Callers flag a leak when that change reaches their configured threshold.
+//
+// Usage note: set WarmUpDuration to at least 20% of your test length for reliable metrics.
+// It is also recommended to use it with 3h+ soak tests for fewer false positives.
+package leak
+
+import (
+	"errors"
+	"fmt"
+	"math"
+	"strconv"
+	"time"
+
+	f "github.com/smartcontractkit/chainlink-testing-framework/framework"
+)
+
+// ResourceLeakCheckerConfig is resource leak checker config with Prometheus base URL.
+type ResourceLeakCheckerConfig struct {
+	PrometheusBaseURL string
+}
+
+// ResourceLeakChecker measures resource usage growth by querying Prometheus.
+type ResourceLeakChecker struct {
+	PrometheusURL string
+	c             PromQuerier
+}
+
+// WithPrometheusBaseURL sets Prometheus base URL, example http://localhost:9099.
+// The URL is only used when no query client is supplied via WithQueryClient.
+func WithPrometheusBaseURL(url string) func(*ResourceLeakChecker) {
+	return func(rlc *ResourceLeakChecker) {
+		rlc.PrometheusURL = url
+	}
+}
+
+// WithQueryClient sets Prometheus query client.
+func WithQueryClient(c PromQuerier) func(*ResourceLeakChecker) {
+	return func(rlc *ResourceLeakChecker) {
+		rlc.c = c
+	}
+}
+
+// PromQuerier is an interface for querying Prometheus containing only methods we need for detecting resource leaks.
+type PromQuerier interface {
+	Query(query string, timestamp time.Time) (*f.PrometheusQueryResponse, error)
+}
+
+// NewResourceLeakChecker creates a new resource leak checker.
+// When no query client is provided, a Prometheus client is created for the URL set with
+// WithPrometheusBaseURL, falling back to the local default URL.
+func NewResourceLeakChecker(opts ...func(*ResourceLeakChecker)) *ResourceLeakChecker {
+	lc := &ResourceLeakChecker{}
+	for _, o := range opts {
+		o(lc)
+	}
+	if lc.c == nil {
+		baseURL := lc.PrometheusURL
+		if baseURL == "" {
+			baseURL = f.LocalPrometheusBaseURL
+		}
+		lc.c = f.NewPrometheusQueryClient(baseURL)
+	}
+	return lc
+}
+
+// CheckConfig describes a leak check: Query is evaluated at Start+WarmUpDuration and at End.
+type CheckConfig struct {
+	Query          string
+	Start          time.Time
+	End            time.Time
+	WarmUpDuration time.Duration
+}
+
+// MeasureLeak measures resource leak between start and end timestamps.
+// WarmUpDuration is used to ignore warm up interval results for more stable comparison.
+//
+// The result is the relative change of the query value in percent, (end / start * 100) - 100:
+// 100 means the value doubled, a negative result means it decreased.
+// An error is returned when the config is invalid, a query fails, a response is empty or
+// malformed, or the start value is zero (the relative change is undefined in that case).
+func (rc *ResourceLeakChecker) MeasureLeak(
+	c *CheckConfig,
+) (float64, error) {
+	if c == nil {
+		return 0, errors.New("check config must not be nil")
+	}
+	if c.Start.After(c.End) {
+		return 0, fmt.Errorf("start time is greater than end time: %s -> %s", c.Start, c.End)
+	}
+	if c.WarmUpDuration < 0 {
+		return 0, fmt.Errorf("warm up duration can't be negative: %s", c.WarmUpDuration)
+	}
+	if c.WarmUpDuration > c.End.Sub(c.Start)/2 {
+		return 0, fmt.Errorf("warm up duration can't be more than 50 percent of test interval between start and end timestamps: %s", c.WarmUpDuration)
+	}
+	startWithWarmUp := c.Start.Add(c.WarmUpDuration)
+	startResp, err := rc.c.Query(c.Query, startWithWarmUp)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get memory for the test start: %w", err)
+	}
+
+	endResp, err := rc.c.Query(c.Query, c.End)
+	if err != nil {
+		return 0, fmt.Errorf("failed to get memory for the test end: %w", err)
+	}
+
+	startVal, err := singleValue(startResp, "start", startWithWarmUp)
+	if err != nil {
+		return 0, err
+	}
+	endVal, err := singleValue(endResp, "end", c.End)
+	if err != nil {
+		return 0, err
+	}
+	// Dividing by zero would yield +Inf or NaN, which no threshold comparison handles sensibly.
+	if startVal == 0 {
+		return 0, fmt.Errorf("start value is zero, relative increase is undefined (end value: %v)", endVal)
+	}
+
+	totalIncreasePercentage := (endVal / startVal * 100) - 100
+
+	f.L.Debug().
+		Float64("Start", startVal).
+		Float64("End", endVal).
+		Float64("Increase", totalIncreasePercentage).
+		Msg("Memory increase total (percentage)")
+	return totalIncreasePercentage, nil
+}
+
+// singleValue extracts the sample value of the first series of an instant query response.
+// label ("start" or "end") and ts only appear in error messages.
+func singleValue(resp *f.PrometheusQueryResponse, label string, ts time.Time) (float64, error) {
+	if resp == nil || resp.Data == nil || len(resp.Data.Result) == 0 {
+		return 0, fmt.Errorf("no results for %s timestamp: %s", label, ts)
+	}
+	sample := resp.Data.Result[0].Value
+	// An instant vector sample is a [timestamp, "value"] pair.
+	if len(sample) < 2 {
+		return 0, fmt.Errorf("invalid Prometheus response for %s timestamp, should have timestamp and value: %s", label, ts)
+	}
+	raw, ok := sample[1].(string)
+	if !ok {
+		return 0, fmt.Errorf("invalid Prometheus response value for timestamp: %s, value: %v", ts, sample[1])
+	}
+	val, err := strconv.ParseFloat(raw, 64)
+	if err != nil {
+		return 0, fmt.Errorf("%s quantile can't be parsed from string: %w", label, err)
+	}
+	// ParseFloat accepts "NaN" and "Inf", which Prometheus can return; neither is comparable with a threshold.
+	if math.IsNaN(val) || math.IsInf(val, 0) {
+		return 0, fmt.Errorf("%s value is not a finite number: %s", label, raw)
+	}
+	return val, nil
+}
diff --git a/framework/leak/detector_cl_node.go b/framework/leak/detector_cl_node.go
new file mode 100644
index 000000000..888b648c3
--- /dev/null
+++ b/framework/leak/detector_cl_node.go
@@ -0,0 +1,133 @@
+package leak
+
+import (
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/smartcontractkit/chainlink-testing-framework/framework"
+)
+
+// CLNodesCheck contains thresholds which can be verified for each Chainlink node.
+// It is recommended to set some WarmUpDuration, 20% of overall test time,
+// to have more stable results.
+// CPUThreshold and MemoryThreshold are compared with the percentage returned by MeasureLeak.
+type CLNodesCheck struct {
+	NumNodes        int
+	Start           time.Time
+	End             time.Time
+	WarmUpDuration  time.Duration
+	CPUThreshold    float64
+	MemoryThreshold float64
+}
+
+// CLNodesLeakDetector is Chainlink node specific resource leak detector,
+// can be used with both local and remote Chainlink node sets (DONs).
+// CPUQuery and MemoryQuery are fmt templates: each must contain a single %d verb,
+// which Check replaces with the zero-based node index.
+type CLNodesLeakDetector struct {
+	Mode                  string
+	CPUQuery, MemoryQuery string
+	c                     *ResourceLeakChecker
+}
+
+// WithCPUQuery allows to override CPU leak query (Prometheus).
+func WithCPUQuery(q string) func(*CLNodesLeakDetector) {
+	return func(cd *CLNodesLeakDetector) {
+		cd.CPUQuery = q
+	}
+}
+
+// WithMemoryQuery allows to override Memory leak query (Prometheus).
+func WithMemoryQuery(q string) func(*CLNodesLeakDetector) {
+	return func(cd *CLNodesLeakDetector) {
+		cd.MemoryQuery = q
+	}
+}
+
+// NewCLNodesLeakDetector create new Chainlink node specific resource leak detector with Prometheus client.
+// Default queries for the selected mode are applied only to queries not overridden via options.
+func NewCLNodesLeakDetector(c *ResourceLeakChecker, opts ...func(*CLNodesLeakDetector)) (*CLNodesLeakDetector, error) {
+	if c == nil {
+		return nil, errors.New("resource leak checker must not be nil")
+	}
+	cd := &CLNodesLeakDetector{
+		c: c,
+	}
+	for _, o := range opts {
+		o(cd)
+	}
+	if cd.Mode == "" {
+		cd.Mode = "devenv"
+	}
+	switch cd.Mode {
+	case "devenv":
+		// Defaults apply only to queries that were not set through WithCPUQuery/WithMemoryQuery.
+		if cd.CPUQuery == "" {
+			cd.CPUQuery = `sum(rate(container_cpu_usage_seconds_total{name=~"don-node%d"}[5m])) * 100`
+		}
+		if cd.MemoryQuery == "" {
+			cd.MemoryQuery = `quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`
+		}
+	case "griddle":
+		return nil, fmt.Errorf("not implemented yet")
+	default:
+		return nil, fmt.Errorf("invalid mode, use: 'devenv' or 'griddle'")
+	}
+	return cd, nil
+}
+
+// Check runs all resource leak checks and returns errors if threshold reached for any of them.
+// A failed measurement aborts the check immediately; threshold breaches are collected for all nodes and joined.
+func (cd *CLNodesLeakDetector) Check(t *CLNodesCheck) error {
+	if t == nil {
+		return errors.New("cl nodes check must not be nil")
+	}
+	// A negative count would make the loop below a no-op and report success without checking anything.
+	if t.NumNodes <= 0 {
+		return fmt.Errorf("cl nodes num must be > 0")
+	}
+	memoryDiffs := make([]float64, 0, t.NumNodes)
+	cpuDiffs := make([]float64, 0, t.NumNodes)
+	var errs []error
+	for i := range t.NumNodes {
+		memoryDiff, err := cd.c.MeasureLeak(&CheckConfig{
+			Query:          fmt.Sprintf(cd.MemoryQuery, i),
+			Start:          t.Start,
+			End:            t.End,
+			WarmUpDuration: t.WarmUpDuration,
+		})
+		if err != nil {
+			return fmt.Errorf("memory leak check failed: %w", err)
+		}
+		memoryDiffs = append(memoryDiffs, memoryDiff)
+		cpuDiff, err := cd.c.MeasureLeak(&CheckConfig{
+			Query:          fmt.Sprintf(cd.CPUQuery, i),
+			Start:          t.Start,
+			End:            t.End,
+			WarmUpDuration: t.WarmUpDuration,
+		})
+		if err != nil {
+			return fmt.Errorf("cpu leak check failed: %w", err)
+		}
+		cpuDiffs = append(cpuDiffs, cpuDiff)
+
+		if memoryDiff >= t.MemoryThreshold {
+			errs = append(errs, fmt.Errorf(
+				"Memory leak detected for node %d and interval: [%s -> %s], diff: %.f",
+				i, t.Start, t.End, memoryDiff,
+			))
+		}
+		if cpuDiff >= t.CPUThreshold {
+			errs = append(errs, fmt.Errorf(
+				"CPU leak detected for node %d and interval: [%s -> %s], diff: %.f",
+				i, t.Start, t.End, cpuDiff,
+			))
+		}
+	}
+	framework.L.Info().
+		Any("MemoryDiffs", memoryDiffs).
+		Any("CPUDiffs", cpuDiffs).
+		Msg("Leaks info")
+	return errors.Join(errs...)
+}
diff --git a/framework/leak/detector_fake.go b/framework/leak/detector_fake.go
new file mode 100644
index 000000000..218dec5a8
--- /dev/null
+++ b/framework/leak/detector_fake.go
@@ -0,0 +1,55 @@
+package leak
+
+import (
+	"time"
+
+	f "github.com/smartcontractkit/chainlink-testing-framework/framework"
+)
+
+var _ PromQuerier = (*FakeQueryClient)(nil)
+
+// FakeQueryClient is a PromQuerier test double that returns canned responses.
+// After SetResponses, calls to Query alternate between the start and the end response,
+// beginning with the start one, which mirrors the query order of MeasureLeak.
+type FakeQueryClient struct {
+	isStartResp bool
+	startResp   *f.PrometheusQueryResponse
+	endResp     *f.PrometheusQueryResponse
+}
+
+// NewFakeQueryClient creates a fake query client with no responses configured.
+func NewFakeQueryClient() *FakeQueryClient {
+	return &FakeQueryClient{}
+}
+
+// SetResponses sets the responses for the next start and end queries and resets the alternation.
+func (qc *FakeQueryClient) SetResponses(sr *f.PrometheusQueryResponse, er *f.PrometheusQueryResponse) {
+	qc.isStartResp = true
+	qc.startResp = sr
+	qc.endResp = er
+}
+
+// Query ignores its arguments and returns the start or the end response in turn; it never fails.
+func (qc *FakeQueryClient) Query(query string, timestamp time.Time) (*f.PrometheusQueryResponse, error) {
+	if qc.isStartResp {
+		qc.isStartResp = false
+		return qc.startResp, nil
+	}
+	qc.isStartResp = true
+	return qc.endResp, nil
+}
+
+// PromSingleValueResponse builds an instant query response with a single series whose sample value is val.
+func PromSingleValueResponse(val string) *f.PrometheusQueryResponse {
+	return &f.PrometheusQueryResponse{
+		Data: &f.PromQueryResponseData{
+			Result: []f.PromQueryResponseResult{
+				{
+					Metric: map[string]string{},
+					// timestamp is irrelevant in tests, we trust Prometheus
+					Value: []interface{}{"", val},
+				},
+			},
+		},
+	}
+}
diff --git a/framework/leak/detector_test.go b/framework/leak/detector_test.go
new file mode 100644
index 000000000..b48c98c19
--- /dev/null
+++ b/framework/leak/detector_test.go
@@ -0,0 +1,202 @@
+package leak_test
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	f "github.com/smartcontractkit/chainlink-testing-framework/framework"
+
+	"github.com/stretchr/testify/require"
+
+	"github.com/smartcontractkit/chainlink-testing-framework/framework/leak"
+)
+
+func mustTime(start string) time.Time {
+	s, err := time.Parse(time.RFC3339, start)
+	if err != nil {
+		panic("can't convert time from RFC3339")
+	}
+	return s
+}
+
+func TestMeasure(t *testing.T) {
+	qc := leak.NewFakeQueryClient()
+	lc := leak.NewResourceLeakChecker(leak.WithQueryClient(qc))
+	testCases := []struct {
+		name           string
+		startTime      time.Time
+		endTime        time.Time
+		startResponse  *f.PrometheusQueryResponse
+		endResponse    *f.PrometheusQueryResponse
+		warmUpDuration time.Duration
+		expectedDiff   float64
+		errorContains  string
+	}{
+		{
+			name:          "diff is correct and > 0",
+			startTime:     mustTime("2026-01-12T21:53:00Z"),
+			endTime:       mustTime("2026-01-13T10:11:00Z"),
+			startResponse: leak.PromSingleValueResponse("10"),
+			endResponse:   leak.PromSingleValueResponse("20"),
+			expectedDiff:  100,
+		},
+		{
+			name:          "diff is correct and < 0",
+			startTime:     mustTime("2026-01-12T21:53:00Z"),
+			endTime:       mustTime("2026-01-13T10:11:00Z"),
+			startResponse: leak.PromSingleValueResponse("20"),
+			endResponse:   leak.PromSingleValueResponse("0"),
+			expectedDiff:  -100,
+		},
+		{
+			name:          "start > end time",
+			startTime:     mustTime("2026-01-13T10:11:00Z"),
+			endTime:       mustTime("2026-01-12T21:53:00Z"),
+			startResponse: leak.PromSingleValueResponse("10"),
+			endResponse:   leak.PromSingleValueResponse("20"),
+			errorContains: "start time is greater than end time",
+		},
+		{
+			name:           "works with warm up duration",
+			startTime:      mustTime("2026-01-01T10:00:00Z"),
+			endTime:        mustTime("2026-01-01T11:00:00Z"),
+			startResponse:  leak.PromSingleValueResponse("10"),
+			endResponse:    leak.PromSingleValueResponse("15"),
+			warmUpDuration: 29 * time.Minute,
+			expectedDiff:   50,
+		},
+		{
+			name:           "warm up is too long",
+			startTime:      mustTime("2026-01-01T10:00:00Z"),
+			endTime:        mustTime("2026-01-01T11:00:00Z"),
+			startResponse:  leak.PromSingleValueResponse("10"),
+			endResponse:    leak.PromSingleValueResponse("20"),
+			warmUpDuration: 31 * time.Minute,
+			errorContains:  "warm up duration can't be more than 50 percent",
+		},
+		{
+			name:      "no results for start time",
+			startTime: mustTime("2026-01-12T21:53:00Z"),
+			endTime:   mustTime("2026-01-13T10:11:00Z"),
+			startResponse: &f.PrometheusQueryResponse{
+				Data: &f.PromQueryResponseData{
+					Result: nil,
+				},
+			},
+			endResponse:   leak.PromSingleValueResponse("20"),
+			errorContains: "no results for start timestamp",
+		},
+		{
+			name:          "no results for end time",
+			startTime:     mustTime("2026-01-12T21:53:00Z"),
+			endTime:       mustTime("2026-01-13T10:11:00Z"),
+			startResponse: leak.PromSingleValueResponse("10"),
+			endResponse: &f.PrometheusQueryResponse{
+				Data: &f.PromQueryResponseData{
+					Result: nil,
+				},
+			},
+			errorContains: "no results for end timestamp",
+		},
+		{
+			name:          "start value is zero",
+			startTime:     mustTime("2026-01-12T21:53:00Z"),
+			endTime:       mustTime("2026-01-13T10:11:00Z"),
+			startResponse: leak.PromSingleValueResponse("0"),
+			endResponse:   leak.PromSingleValueResponse("20"),
+			errorContains: "start value is zero",
+		},
+		{
+			name:          "end value can't be parsed",
+			startTime:     mustTime("2026-01-12T21:53:00Z"),
+			endTime:       mustTime("2026-01-13T10:11:00Z"),
+			startResponse: leak.PromSingleValueResponse("10"),
+			endResponse:   leak.PromSingleValueResponse("not-a-number"),
+			errorContains: "end quantile can't be parsed",
+		},
+		{
+			name:           "warm up is negative",
+			startTime:      mustTime("2026-01-01T10:00:00Z"),
+			endTime:        mustTime("2026-01-01T11:00:00Z"),
+			startResponse:  leak.PromSingleValueResponse("10"),
+			endResponse:    leak.PromSingleValueResponse("20"),
+			warmUpDuration: -1 * time.Minute,
+			errorContains:  "warm up duration can't be negative",
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			qc.SetResponses(tc.startResponse, tc.endResponse)
+			diff, err := lc.MeasureLeak(&leak.CheckConfig{
+				// Prometheus returns good errors when query is invalid
+				// so we do not test it since there is no additional validation
+				Query:          ``,
+				Start:          tc.startTime,
+				End:            tc.endTime,
+				WarmUpDuration: tc.warmUpDuration,
+			})
+			if tc.errorContains != "" {
+				require.Error(t, err)
+				require.Contains(t, err.Error(), tc.errorContains)
+				return
+			}
+			require.NoError(t, err)
+			require.Equal(t, tc.expectedDiff, diff)
+		})
+	}
+}
+
+func TestCLNodesLeakDetectorQueryOverrides(t *testing.T) {
+	cpuQuery := `cpu_usage{name="node%d"}`
+	memoryQuery := `memory_usage{name="node%d"}`
+	lc := leak.NewResourceLeakChecker(leak.WithQueryClient(leak.NewFakeQueryClient()))
+	cnd, err := leak.NewCLNodesLeakDetector(lc, leak.WithCPUQuery(cpuQuery), leak.WithMemoryQuery(memoryQuery))
+	require.NoError(t, err)
+	require.Equal(t, cpuQuery, cnd.CPUQuery)
+	require.Equal(t, memoryQuery, cnd.MemoryQuery)
+}
+
+func TestRealCLNodesLeakDetectionLocalDevenv(t *testing.T) {
+	t.Skip(`this test requires a real load run, see docs here https://github.com/smartcontractkit/chainlink/tree/develop/devenv, spin up the env and run "cl test load"`)
+
+	cnd, err := leak.NewCLNodesLeakDetector(leak.NewResourceLeakChecker())
+	require.NoError(t, err)
+	errs := cnd.Check(&leak.CLNodesCheck{
+		NumNodes:        4,
+		Start:           mustTime("2026-01-12T20:53:00Z"),
+		End:             mustTime("2026-01-13T10:11:00Z"),
+		WarmUpDuration:  1 * time.Hour,
+		CPUThreshold:    10.0,
+		MemoryThreshold: 10.0,
+	})
+	require.NoError(t, errs)
+	fmt.Println(errs)
+}
+
+func TestRealPrometheusLowLevelAPI(t *testing.T) {
+	t.Skip(`this test requires a real load run, see docs here https://github.com/smartcontractkit/chainlink/tree/develop/devenv, spin up the env and run "cl test load"`)
+
+	// demonstrates how to use low-level API for custom queries with CL nodes example
+	donNodes := 4
+	resourceLeaks := make([]float64, 0)
+
+	lc := leak.NewResourceLeakChecker()
+	for i := range donNodes {
+		diff, err := lc.MeasureLeak(&leak.CheckConfig{
+			Query:          fmt.Sprintf(`quantile_over_time(0.5, container_memory_rss{name="don-node%d"}[1h]) / 1024 / 1024`, i),
+			Start:          mustTime("2026-01-12T21:53:00Z"),
+			End:            mustTime("2026-01-13T10:11:00Z"),
+			WarmUpDuration: 1 * time.Hour,
+		})
+		require.NoError(t, err)
+		resourceLeaks = append(resourceLeaks, diff)
+	}
+	require.Len(t, resourceLeaks, 4)
+
+	fmt.Println(resourceLeaks)
+	for _, ml := range resourceLeaks {
+		require.GreaterOrEqual(t, ml, 0.5)
+	}
+}
diff --git a/framework/prometheus.go b/framework/prometheus.go
index f56ef237f..48363bbc7 100644
--- a/framework/prometheus.go
+++ b/framework/prometheus.go
@@ -27,14 +27,18 @@ func NewPrometheusQueryClient(baseURL string) *PrometheusQueryClient {
 
 // PrometheusQueryResponse represents the response from Prometheus API
 type PrometheusQueryResponse struct {
-	Status string `json:"status"`
-	Data   struct {
-		ResultType string `json:"resultType"`
-		Result     []struct {
-			Metric map[string]string `json:"metric"`
-			Value  []interface{}     `json:"value"`
-		} `json:"result"`
-	} `json:"data"`
+	Status string                 `json:"status"`
+	Data   *PromQueryResponseData `json:"data"`
+}
+
+type PromQueryResponseData struct {
+	ResultType string                    `json:"resultType"`
+	Result     []PromQueryResponseResult `json:"result"`
+}
+
+type PromQueryResponseResult struct {
+	Metric map[string]string `json:"metric"`
+	Value  []interface{}     `json:"value"`
 }
 
 // QueryRangeResponse represents the response from Prometheus range query API
diff --git a/framework/prometheus_test.go b/framework/prometheus_test.go
index d79060f7a..c4c095bbb 100644
--- a/framework/prometheus_test.go
+++ b/framework/prometheus_test.go
@@ -363,17 +363,8 @@ func TestPrometheusQueryClient_ResultToLabelsMap(t *testing.T) {
 		{
 			name: "single metric with multiple labels",
 			input: &PrometheusQueryResponse{
-				Data: struct {
-					ResultType string `json:"resultType"`
-					Result     []struct {
-						Metric map[string]string `json:"metric"`
-						Value  []interface{}     `json:"value"`
-					} `json:"result"`
-				}{
-					Result: []struct {
-						Metric map[string]string `json:"metric"`
-						Value  []interface{}     `json:"value"`
-					}{
+				Data: &PromQueryResponseData{
+					Result: []PromQueryResponseResult{
 						{
 							Metric: map[string]string{
 								"__name__": "http_requests_total",
@@ -394,17 +385,8 @@ func TestPrometheusQueryClient_ResultToLabelsMap(t *testing.T) {
 		{
 			name: "multiple metrics with shared labels",
 			input: &PrometheusQueryResponse{
-				Data: struct {
-					ResultType string `json:"resultType"`
-					Result     []struct {
-						Metric map[string]string `json:"metric"`
-						Value  []interface{}     `json:"value"`
-					} `json:"result"`
-				}{
-					Result: []struct {
-						Metric map[string]string `json:"metric"`
-						Value  []interface{}     `json:"value"`
-					}{
+				Data: &PromQueryResponseData{
+					Result: []PromQueryResponseResult{
 						{
 							Metric: map[string]string{
 								"__name__": "http_requests_total",
@@ -434,17 +416,8 @@ func TestPrometheusQueryClient_ResultToLabelsMap(t *testing.T) {
 		{
 			name: "empty result",
 			input: &PrometheusQueryResponse{
-				Data: struct {
-					ResultType string `json:"resultType"`
-					Result     []struct {
-						Metric map[string]string `json:"metric"`
-						Value  []interface{}     `json:"value"`
-					} `json:"result"`
-				}{
-					Result: []struct {
-						Metric map[string]string `json:"metric"`
-						Value  []interface{}     `json:"value"`
-					}{},
+				Data: &PromQueryResponseData{
+					Result: nil,
 				},
 			},
 			expected: map[string][]interface{}{},
@@ -452,17 +425,8 @@ func TestPrometheusQueryClient_ResultToLabelsMap(t *testing.T) {
 		{
 			name: "metric with no labels",
 			input: &PrometheusQueryResponse{
-				Data: struct {
-					ResultType string `json:"resultType"`
-					Result     []struct {
-						Metric map[string]string `json:"metric"`
-						Value  []interface{}     `json:"value"`
-					} `json:"result"`
-				}{
-					Result: []struct {
-						Metric map[string]string `json:"metric"`
-						Value  []interface{}     `json:"value"`
-					}{
+				Data: &PromQueryResponseData{
+					Result: []PromQueryResponseResult{
 						{
 							Metric: map[string]string{},
 							Value:  []interface{}{float64(1435781451.781), "1"},
diff --git a/justfile b/justfile
index 05221603a..a1d2bd5ce 100644
--- a/justfile
+++ b/justfile
@@ -14,7 +14,7 @@ install-loghelper:
 
 # Lint a module, example: just lint wasp
 lint dir_path:
-    cd {{dir_path}} && golangci-lint --color=always run -v -c {{invocation_directory()}}/.golangci.yaml
+    cd {{dir_path}} && golangci-lint --color=always run --fix -v -c {{invocation_directory()}}/.golangci.yaml
 
 # Lint all the modules
 lint-all: