From a60a85228ca1223d03f345ef5936a8cf4bb6cb8e Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 4 Apr 2026 15:31:08 +0000 Subject: [PATCH 1/5] Add resource usage validation tests Existing tests verify aggregation math with synthetic data and basic sanity bounds, but don't confirm duct's measurements match actual resource usage. These tests run programs with known, predictable resource consumption and assert duct reports values within expected bounds: - Memory allocation: allocate N MB, verify peak_rss >= N MB - Wall clock time: sleep N seconds, verify reported time ~= N - Idle CPU: sleep reports near-zero CPU - CPU intensive: busy-loop reports significant CPU - Usage samples: verify JSONL structure and multiple reports - Consistent memory: held allocation visible across samples https://claude.ai/code/session_01FYRR4Y8PzNLBU344ovdDS5 --- test/duct_main/test_resource_validation.py | 233 +++++++++++++++++++++ 1 file changed, 233 insertions(+) create mode 100644 test/duct_main/test_resource_validation.py diff --git a/test/duct_main/test_resource_validation.py b/test/duct_main/test_resource_validation.py new file mode 100644 index 00000000..7ecf49f9 --- /dev/null +++ b/test/duct_main/test_resource_validation.py @@ -0,0 +1,233 @@ +"""Validate that duct-reported resource stats match actual resource usage. + +These tests run programs with known, predictable resource consumption +(memory allocation, CPU usage) and verify that duct's measurements +fall within expected bounds. This bridges the gap between unit tests +(which verify aggregation math) and real-world accuracy. +""" +from __future__ import annotations + +import json +import os +import sys +from pathlib import Path + +import pytest +from utils import run_duct_command + +from con_duct._constants import SUFFIXES + +TEST_SCRIPT = str(Path(__file__).parent.parent / "data" / "test_script.py") + + +def _read_info(temp_output_dir: str) -> dict: + with open(os.path.join(temp_output_dir, SUFFIXES["info"])) as f: + return json.loads(f.read()) + + +def _read_usage(temp_output_dir: str) -> list[dict]: + lines = [] + with open(os.path.join(temp_output_dir, SUFFIXES["usage"])) as f: + for line in f: + line = line.strip() + if line: + lines.append(json.loads(line)) + return lines + + +@pytest.mark.flaky(reruns=3) +def test_memory_allocation_detected(temp_output_dir: str) -> None: + """Allocate a known amount of memory and verify duct detects it. + + Runs test_script.py which allocates --memory-size MB via bytearray. + Duct should report peak RSS at least that large (plus Python overhead). + """ + alloc_mb = 50 + alloc_bytes = alloc_mb * 1024 * 1024 + + assert ( + run_duct_command( + [ + sys.executable, + TEST_SCRIPT, + "--duration", + "2", + "--memory-size", + str(alloc_mb), + "--cpu-load", + "1", + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + summary = info["execution_summary"] + + peak_rss = summary["peak_rss"] + # RSS must be at least the allocated amount (bytearray is contiguous in memory) + assert peak_rss >= alloc_bytes, ( + f"peak_rss ({peak_rss / 1024 / 1024:.1f} MB) should be >= " + f"allocated ({alloc_mb} MB)" + ) + # Sanity upper bound: shouldn't report more than allocation + 200MB overhead + overhead_limit = alloc_bytes + 200 * 1024 * 1024 + assert peak_rss < overhead_limit, ( + f"peak_rss ({peak_rss / 1024 / 1024:.1f} MB) unreasonably high " + f"for {alloc_mb} MB allocation" + ) + + +@pytest.mark.flaky(reruns=3) +def test_wall_clock_time_accurate(temp_output_dir: str) -> None: + """Verify wall clock time matches actual sleep duration.""" + duration = 1.0 + + assert ( + run_duct_command( + ["sleep", str(duration)], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + wall_clock = info["execution_summary"]["wall_clock_time"] + + # Should be close to the requested duration + assert wall_clock >= duration, ( + f"wall_clock_time ({wall_clock:.2f}s) < requested sleep ({duration}s)" + ) + # Allow generous overhead for slow CI environments + assert wall_clock < duration + 2.0, ( + f"wall_clock_time ({wall_clock:.2f}s) unreasonably high " + f"for {duration}s sleep" + ) + + +@pytest.mark.flaky(reruns=3) +def test_idle_process_low_cpu(temp_output_dir: str) -> None: + """A sleeping process should report near-zero CPU usage.""" + assert ( + run_duct_command( + ["sleep", "1"], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + summary = info["execution_summary"] + + assert summary["peak_pcpu"] < 5.0, ( + f"peak_pcpu ({summary['peak_pcpu']}) should be near-zero for sleep" + ) + assert summary["average_pcpu"] < 5.0, ( + f"average_pcpu ({summary['average_pcpu']}) should be near-zero for sleep" + ) + + +@pytest.mark.flaky(reruns=3) +def test_cpu_intensive_detected(temp_output_dir: str) -> None: + """A busy-loop process should report significant CPU usage.""" + assert ( + run_duct_command( + [ + sys.executable, + TEST_SCRIPT, + "--duration", + "2", + "--memory-size", + "1", + "--cpu-load", + "100000", + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + summary = info["execution_summary"] + + # A busy-loop should show meaningful CPU usage + assert summary["peak_pcpu"] > 10.0, ( + f"peak_pcpu ({summary['peak_pcpu']}) should be significant for busy-loop" + ) + + +@pytest.mark.flaky(reruns=3) +def test_usage_samples_recorded(temp_output_dir: str) -> None: + """Verify that usage.jsonl contains samples with expected structure.""" + assert ( + run_duct_command( + ["sleep", "1"], + sample_interval=0.1, + report_interval=0.3, + output_prefix=temp_output_dir, + ) + == 0 + ) + + samples = _read_usage(temp_output_dir) + + # With 1s sleep and 0.3s report interval, expect at least 2 reports + assert len(samples) >= 2, ( + f"Expected at least 2 usage samples, got {len(samples)}" + ) + + for i, sample in enumerate(samples): + assert "timestamp" in sample, f"Sample {i} missing timestamp" + assert "totals" in sample, f"Sample {i} missing totals" + totals = sample["totals"] + assert "rss" in totals, f"Sample {i} totals missing rss" + assert "pcpu" in totals, f"Sample {i} totals missing pcpu" + # RSS should be non-negative + assert totals["rss"] >= 0, f"Sample {i} has negative rss: {totals['rss']}" + + +@pytest.mark.flaky(reruns=3) +def test_multiple_samples_show_consistent_memory(temp_output_dir: str) -> None: + """Memory held for the full duration should appear consistently across samples.""" + alloc_mb = 30 + alloc_bytes = alloc_mb * 1024 * 1024 + + assert ( + run_duct_command( + [ + sys.executable, + TEST_SCRIPT, + "--duration", + "2", + "--memory-size", + str(alloc_mb), + "--cpu-load", + "1", + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + samples = _read_usage(temp_output_dir) + assert len(samples) >= 2, f"Expected multiple samples, got {len(samples)}" + + # At least some samples should show the allocated memory + samples_above_threshold = [ + s for s in samples if s["totals"]["rss"] >= alloc_bytes + ] + assert len(samples_above_threshold) >= 1, ( + f"No usage samples showed RSS >= {alloc_mb} MB. " + f"Sample RSS values: {[s['totals']['rss'] / 1024 / 1024 for s in samples]}" + ) From 0128f8510bf2009b215e939214b42e59f3b2aeb8 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 4 Apr 2026 15:35:01 +0000 Subject: [PATCH 2/5] Add child/forked process resource validation tests Existing e2e tests verify duct counts child PIDs correctly, but never check that resource stats are actually aggregated across children. These new tests spawn multiple child processes with known memory allocations and verify: - Total RSS reflects sum across all children (N x M MB) - Individual child processes appear in usage.jsonl with correct RSS - total_rss in each sample equals the sum of per-process RSS values Adds test/data/memory_children.py helper that uses multiprocessing to fork N children each holding M MB for a specified duration. https://claude.ai/code/session_01FYRR4Y8PzNLBU344ovdDS5 --- test/data/memory_children.py | 40 ++++++ test/duct_main/test_resource_validation.py | 138 ++++++++++++++++++++- 2 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 test/data/memory_children.py diff --git a/test/data/memory_children.py b/test/data/memory_children.py new file mode 100644 index 00000000..ff274eda --- /dev/null +++ b/test/data/memory_children.py @@ -0,0 +1,40 @@ +#!/usr/bin/env python3 +"""Spawn child processes that each allocate a known amount of memory. + +Usage: memory_children.py + +Each child allocates mb_per_child MB and holds it for hold_seconds. +The parent waits for all children to finish. +""" +from __future__ import annotations + +import multiprocessing +import sys +import time + + +def _allocate_and_hold(mb: int, seconds: float) -> None: + """Allocate mb megabytes and hold for seconds.""" + _data = bytearray(mb * 1024 * 1024) + time.sleep(seconds) + + +def main() -> None: + num_children = int(sys.argv[1]) + mb_per_child = int(sys.argv[2]) + hold_seconds = float(sys.argv[3]) + + processes = [] + for _ in range(num_children): + p = multiprocessing.Process( + target=_allocate_and_hold, args=(mb_per_child, hold_seconds) + ) + p.start() + processes.append(p) + + for p in processes: + p.join() + + +if __name__ == "__main__": + main() diff --git a/test/duct_main/test_resource_validation.py b/test/duct_main/test_resource_validation.py index 7ecf49f9..910d0cd6 100644 --- a/test/duct_main/test_resource_validation.py +++ b/test/duct_main/test_resource_validation.py @@ -17,7 +17,9 @@ from con_duct._constants import SUFFIXES -TEST_SCRIPT = str(Path(__file__).parent.parent / "data" / "test_script.py") +TEST_DATA_DIR = Path(__file__).parent.parent / "data" +TEST_SCRIPT = str(TEST_DATA_DIR / "test_script.py") +MEMORY_CHILDREN_SCRIPT = str(TEST_DATA_DIR / "memory_children.py") def _read_info(temp_output_dir: str) -> dict: @@ -231,3 +233,137 @@ def test_multiple_samples_show_consistent_memory(temp_output_dir: str) -> None: f"No usage samples showed RSS >= {alloc_mb} MB. " f"Sample RSS values: {[s['totals']['rss'] / 1024 / 1024 for s in samples]}" ) + + +# --- Child/forked process resource validation --- + + +@pytest.mark.flaky(reruns=3) +def test_child_processes_memory_aggregated(temp_output_dir: str) -> None: + """Spawn children that each allocate memory; verify total RSS reflects the sum. + + Uses multiprocessing to fork N children each holding M MB. + The total RSS across all processes should be at least N * M MB. + """ + num_children = 3 + mb_per_child = 20 + hold_seconds = 3.0 + total_alloc_bytes = num_children * mb_per_child * 1024 * 1024 + + assert ( + run_duct_command( + [ + sys.executable, + MEMORY_CHILDREN_SCRIPT, + str(num_children), + str(mb_per_child), + str(hold_seconds), + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + info = _read_info(temp_output_dir) + summary = info["execution_summary"] + + # peak_rss is total across all tracked processes + peak_rss = summary["peak_rss"] + assert peak_rss >= total_alloc_bytes, ( + f"peak_rss ({peak_rss / 1024 / 1024:.1f} MB) should be >= " + f"total allocation ({num_children} x {mb_per_child} = " + f"{num_children * mb_per_child} MB)" + ) + + # Also check usage.jsonl samples show multiple processes + samples = _read_usage(temp_output_dir) + max_pids_seen = max(len(s["processes"]) for s in samples) + # Should see parent + N children (at least N+1 processes) + assert max_pids_seen >= num_children + 1, ( + f"Expected at least {num_children + 1} processes in samples, " + f"but max PIDs in any sample was {max_pids_seen}" + ) + + +@pytest.mark.flaky(reruns=3) +def test_child_processes_individually_tracked(temp_output_dir: str) -> None: + """Verify per-process stats in usage.jsonl track individual children.""" + num_children = 2 + mb_per_child = 25 + hold_seconds = 3.0 + child_alloc_bytes = mb_per_child * 1024 * 1024 + + assert ( + run_duct_command( + [ + sys.executable, + MEMORY_CHILDREN_SCRIPT, + str(num_children), + str(mb_per_child), + str(hold_seconds), + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + samples = _read_usage(temp_output_dir) + + # Find samples where children are running (multiple processes visible) + multi_proc_samples = [s for s in samples if len(s["processes"]) > 1] + assert len(multi_proc_samples) >= 1, "No samples captured multiple processes" + + # In at least one sample, individual child processes should show their allocation + # (each child holds mb_per_child MB) + children_with_expected_rss = set() + for sample in multi_proc_samples: + for pid, proc in sample["processes"].items(): + if proc["rss"] >= child_alloc_bytes: + children_with_expected_rss.add(pid) + + assert len(children_with_expected_rss) >= num_children, ( + f"Expected at least {num_children} child processes with RSS >= {mb_per_child} MB, " + f"found {len(children_with_expected_rss)}. " + f"Per-process RSS values: " + f"{[{pid: p['rss'] / 1024 / 1024 for pid, p in s['processes'].items()} for s in multi_proc_samples[:3]]}" + ) + + +@pytest.mark.flaky(reruns=3) +def test_total_rss_is_sum_of_processes(temp_output_dir: str) -> None: + """Verify that total_rss in each sample equals sum of per-process RSS.""" + num_children = 2 + mb_per_child = 15 + hold_seconds = 2.0 + + assert ( + run_duct_command( + [ + sys.executable, + MEMORY_CHILDREN_SCRIPT, + str(num_children), + str(mb_per_child), + str(hold_seconds), + ], + sample_interval=0.1, + report_interval=0.5, + output_prefix=temp_output_dir, + ) + == 0 + ) + + samples = _read_usage(temp_output_dir) + + for i, sample in enumerate(samples): + per_process_rss_sum = sum( + proc["rss"] for proc in sample["processes"].values() + ) + reported_total = sample["totals"]["rss"] + assert reported_total == per_process_rss_sum, ( + f"Sample {i}: total_rss ({reported_total}) != sum of per-process RSS " + f"({per_process_rss_sum})" + ) From 6645c90be025710db78e65e244c986decf1217eb Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 4 Apr 2026 15:38:39 +0000 Subject: [PATCH 3/5] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- test/data/memory_children.py | 2 +- test/duct_main/test_resource_validation.py | 42 +++++++++------------- 2 files changed, 18 insertions(+), 26 deletions(-) diff --git a/test/data/memory_children.py b/test/data/memory_children.py index ff274eda..c9ab1b42 100644 --- a/test/data/memory_children.py +++ b/test/data/memory_children.py @@ -6,8 +6,8 @@ Each child allocates mb_per_child MB and holds it for hold_seconds. The parent waits for all children to finish. """ -from __future__ import annotations +from __future__ import annotations import multiprocessing import sys import time diff --git a/test/duct_main/test_resource_validation.py b/test/duct_main/test_resource_validation.py index 910d0cd6..fef0fbb6 100644 --- a/test/duct_main/test_resource_validation.py +++ b/test/duct_main/test_resource_validation.py @@ -5,16 +5,14 @@ fall within expected bounds. This bridges the gap between unit tests (which verify aggregation math) and real-world accuracy. """ -from __future__ import annotations +from __future__ import annotations import json import os -import sys from pathlib import Path - +import sys import pytest from utils import run_duct_command - from con_duct._constants import SUFFIXES TEST_DATA_DIR = Path(__file__).parent.parent / "data" @@ -102,9 +100,9 @@ def test_wall_clock_time_accurate(temp_output_dir: str) -> None: wall_clock = info["execution_summary"]["wall_clock_time"] # Should be close to the requested duration - assert wall_clock >= duration, ( - f"wall_clock_time ({wall_clock:.2f}s) < requested sleep ({duration}s)" - ) + assert ( + wall_clock >= duration + ), f"wall_clock_time ({wall_clock:.2f}s) < requested sleep ({duration}s)" # Allow generous overhead for slow CI environments assert wall_clock < duration + 2.0, ( f"wall_clock_time ({wall_clock:.2f}s) unreasonably high " @@ -128,12 +126,12 @@ def test_idle_process_low_cpu(temp_output_dir: str) -> None: info = _read_info(temp_output_dir) summary = info["execution_summary"] - assert summary["peak_pcpu"] < 5.0, ( - f"peak_pcpu ({summary['peak_pcpu']}) should be near-zero for sleep" - ) - assert summary["average_pcpu"] < 5.0, ( - f"average_pcpu ({summary['average_pcpu']}) should be near-zero for sleep" - ) + assert ( + summary["peak_pcpu"] < 5.0 + ), f"peak_pcpu ({summary['peak_pcpu']}) should be near-zero for sleep" + assert ( + summary["average_pcpu"] < 5.0 + ), f"average_pcpu ({summary['average_pcpu']}) should be near-zero for sleep" @pytest.mark.flaky(reruns=3) @@ -162,9 +160,9 @@ def test_cpu_intensive_detected(temp_output_dir: str) -> None: summary = info["execution_summary"] # A busy-loop should show meaningful CPU usage - assert summary["peak_pcpu"] > 10.0, ( - f"peak_pcpu ({summary['peak_pcpu']}) should be significant for busy-loop" - ) + assert ( + summary["peak_pcpu"] > 10.0 + ), f"peak_pcpu ({summary['peak_pcpu']}) should be significant for busy-loop" @pytest.mark.flaky(reruns=3) @@ -183,9 +181,7 @@ def test_usage_samples_recorded(temp_output_dir: str) -> None: samples = _read_usage(temp_output_dir) # With 1s sleep and 0.3s report interval, expect at least 2 reports - assert len(samples) >= 2, ( - f"Expected at least 2 usage samples, got {len(samples)}" - ) + assert len(samples) >= 2, f"Expected at least 2 usage samples, got {len(samples)}" for i, sample in enumerate(samples): assert "timestamp" in sample, f"Sample {i} missing timestamp" @@ -226,9 +222,7 @@ def test_multiple_samples_show_consistent_memory(temp_output_dir: str) -> None: assert len(samples) >= 2, f"Expected multiple samples, got {len(samples)}" # At least some samples should show the allocated memory - samples_above_threshold = [ - s for s in samples if s["totals"]["rss"] >= alloc_bytes - ] + samples_above_threshold = [s for s in samples if s["totals"]["rss"] >= alloc_bytes] assert len(samples_above_threshold) >= 1, ( f"No usage samples showed RSS >= {alloc_mb} MB. " f"Sample RSS values: {[s['totals']['rss'] / 1024 / 1024 for s in samples]}" @@ -359,9 +353,7 @@ def test_total_rss_is_sum_of_processes(temp_output_dir: str) -> None: samples = _read_usage(temp_output_dir) for i, sample in enumerate(samples): - per_process_rss_sum = sum( - proc["rss"] for proc in sample["processes"].values() - ) + per_process_rss_sum = sum(proc["rss"] for proc in sample["processes"].values()) reported_total = sample["totals"]["rss"] assert reported_total == per_process_rss_sum, ( f"Sample {i}: total_rss ({reported_total}) != sum of per-process RSS " From 8a59ad7742c7910e72028fbf932e440fb0d1604f Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 4 Apr 2026 15:53:21 +0000 Subject: [PATCH 4/5] Fix flake8 lint: unused variable and line length - Use assert to reference bytearray instead of _ prefix (F841) - Break long f-string debug line into separate variable (B950) https://claude.ai/code/session_01FYRR4Y8PzNLBU344ovdDS5 --- test/data/memory_children.py | 3 ++- test/duct_main/test_resource_validation.py | 11 +++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/test/data/memory_children.py b/test/data/memory_children.py index c9ab1b42..33b512be 100644 --- a/test/data/memory_children.py +++ b/test/data/memory_children.py @@ -15,7 +15,8 @@ def _allocate_and_hold(mb: int, seconds: float) -> None: """Allocate mb megabytes and hold for seconds.""" - _data = bytearray(mb * 1024 * 1024) + data = bytearray(mb * 1024 * 1024) + assert data # prevent optimization time.sleep(seconds) diff --git a/test/duct_main/test_resource_validation.py b/test/duct_main/test_resource_validation.py index fef0fbb6..424cf583 100644 --- a/test/duct_main/test_resource_validation.py +++ b/test/duct_main/test_resource_validation.py @@ -319,11 +319,14 @@ def test_child_processes_individually_tracked(temp_output_dir: str) -> None: if proc["rss"] >= child_alloc_bytes: children_with_expected_rss.add(pid) + rss_debug = [ + {pid: p["rss"] / 1024 / 1024 for pid, p in s["processes"].items()} + for s in multi_proc_samples[:3] + ] assert len(children_with_expected_rss) >= num_children, ( - f"Expected at least {num_children} child processes with RSS >= {mb_per_child} MB, " - f"found {len(children_with_expected_rss)}. " - f"Per-process RSS values: " - f"{[{pid: p['rss'] / 1024 / 1024 for pid, p in s['processes'].items()} for s in multi_proc_samples[:3]]}" + f"Expected at least {num_children} child processes with RSS >= " + f"{mb_per_child} MB, found {len(children_with_expected_rss)}. " + f"Per-process RSS values: {rss_debug}" ) From c1c0432b0f2df6e2c352ed7cc593408bd9fd5cb1 Mon Sep 17 00:00:00 2001 From: Claude Date: Sat, 4 Apr 2026 15:58:01 +0000 Subject: [PATCH 5/5] Fix mypy no-any-return for json.loads helper return types json.loads() returns Any, so return type annotations of dict/list[dict] trigger mypy's no-any-return check. Use Any instead. https://claude.ai/code/session_01FYRR4Y8PzNLBU344ovdDS5 --- test/duct_main/test_resource_validation.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/duct_main/test_resource_validation.py b/test/duct_main/test_resource_validation.py index 424cf583..fe475cb1 100644 --- a/test/duct_main/test_resource_validation.py +++ b/test/duct_main/test_resource_validation.py @@ -11,6 +11,7 @@ import os from pathlib import Path import sys +from typing import Any import pytest from utils import run_duct_command from con_duct._constants import SUFFIXES @@ -20,12 +21,12 @@ MEMORY_CHILDREN_SCRIPT = str(TEST_DATA_DIR / "memory_children.py") -def _read_info(temp_output_dir: str) -> dict: +def _read_info(temp_output_dir: str) -> Any: with open(os.path.join(temp_output_dir, SUFFIXES["info"])) as f: return json.loads(f.read()) -def _read_usage(temp_output_dir: str) -> list[dict]: +def _read_usage(temp_output_dir: str) -> list[Any]: lines = [] with open(os.path.join(temp_output_dir, SUFFIXES["usage"])) as f: for line in f: