diff --git a/README.md b/README.md index 5123f449..a3744915 100644 --- a/README.md +++ b/README.md @@ -58,8 +58,6 @@ $ nac-test --help │ --verbose Enables verbose output for nac-test, │ │ Robot and PyATS execution. | │ [env var: NAC_TEST_VERBOSE] │ -│ --merged-data-file… -m TEXT Filename for merged data model. │ -│ [default: merged_data_model_test...] │ │ --loglevel -l [DEBUG|...] Log level. [default: WARNING] │ │ --version Display version number. │ │ --help Show this message and exit. │ @@ -372,17 +370,9 @@ Before test execution, `nac-test` merges all YAML data files into a single data 1. All files from `--data` paths are recursively loaded 2. YAML structures are deep-merged (later files override earlier ones) -3. The merged result is written to the output directory +3. The merged result is written to the output directory as `merged_data_model_test_variables.yaml` 4. Both Robot and PyATS tests reference this merged data -### Custom Filename - -By default, the merged file is named `merged_data_model_test_variables.yaml`. You can customize this: - -```bash -nac-test -d ./data -t ./tests -o ./output -m my_custom_data.yaml -``` - ### Accessing the Merged Data The merged data model is available to: diff --git a/dev-docs/PRD_AND_ARCHITECTURE.md b/dev-docs/PRD_AND_ARCHITECTURE.md index 4230a4fc..bb1a0dd5 100644 --- a/dev-docs/PRD_AND_ARCHITECTURE.md +++ b/dev-docs/PRD_AND_ARCHITECTURE.md @@ -842,20 +842,6 @@ nac-test provides 13 CLI flags covering data input, template configuration, exec nac-test -d config/ -t templates/ --tests custom_tests/ -o output/ ``` -**`-m, --merged-data-filename`** (OPTIONAL) -- **Type**: `str` -- **Default**: `"merged_data_model_test_variables.yaml"` -- **Environment Variable**: None (CLI only) -- **Purpose**: Custom filename for merged data model YAML (SOT shared across frameworks) -- **Behavior**: - - Written to `{output_dir}/{merged_data_filename}` - - Passed to CombinedOrchestrator constructor - - Both PyATS and Robot read from this shared file -- **Example**: - ```bash - nac-test -d config/ -t templates/ -o output/ -m merged_data_prod.yaml - ``` - ##### Execution Control Flags **`-i, --include`** (OPTIONAL) @@ -1296,7 +1282,7 @@ exit() # Checks error_handler.fired for exit code #### Environment Variable Support -Every CLI flag (except `--version` and `--merged-data-filename`) supports environment variable configuration for CI/CD and containerized environments. +Every CLI flag (except `--version`) supports environment variable configuration for CI/CD and containerized environments. **Environment Variable Mapping Table**: @@ -6072,16 +6058,63 @@ class SystemResourceCalculator: ### Cleanup Utilities (`utils/cleanup.py`) -Resource cleanup on exit: +#### CleanupManager + +`CleanupManager` is a process-wide singleton that ensures registered files +are deleted when the process exits, regardless of how it exits. + +**Triggers covered:** +- Normal exit via `atexit` (including `typer.Exit`) +- `SIGTERM` (e.g. `docker stop`, `kill`) — Unix only +- `SIGINT` (Ctrl+C / `KeyboardInterrupt`) — Unix only + +`SIGKILL` cannot be intercepted; files may remain if the process is killed +with `-9`. + +**Key methods:** ```python -def cleanup_temp_files(patterns: List[str]) -> None: - """Remove temporary files matching patterns.""" +cleanup = get_cleanup_manager() # always returns the singleton -def cleanup_output_directory(output_dir: Path) -> None: - """Clean up output directory before run.""" +cleanup.register(path) # delete path on exit +cleanup.register(path, keep_if_debug=True) # skip deletion when NAC_TEST_DEBUG=true +cleanup.unregister(path) # cancel a previously registered path +cleanup.run_cleanup() # trigger cleanup immediately (idempotent) ``` +**`keep_if_debug` flag** + +Files registered with `keep_if_debug=True` are retained when +`NAC_TEST_DEBUG=true` is set. Use this for intermediate files that are +useful for debugging (job scripts, per-device testbed YAMLs, merged data +model). Sensitive files (e.g. files containing credentials) should always +be registered without this flag so they are unconditionally removed. + +**Thread safety:** all operations are protected by an internal lock. + +**Fork safety:** the singleton must not be used in forked child processes. +If a process is forked while `CleanupManager` is initialised, the child +inherits the lock in an undefined state and cleanup behaviour is +unpredictable. Subprocesses spawned via `subprocess.Popen` are unaffected +— they start a fresh interpreter with no inherited singleton state. + +**Signal handler behaviour:** on SIGTERM the original handler is restored +and the signal is re-raised via `signal.raise_signal()` so upstream +process managers (Docker, systemd) see the expected exit status. On SIGINT +the original handler is restored and `KeyboardInterrupt` is raised so the +normal Python exception handling chain proceeds. + +#### PyATS runtime cleanup helpers + +Three standalone functions handle CI/CD-specific directory hygiene (not +related to `CleanupManager`): + +| Function | Purpose | +|---|---| +| `cleanup_pyats_runtime(workspace_path)` | Removes the `.pyats/` runtime directory before a run to prevent disk exhaustion | +| `cleanup_old_test_outputs(output_dir, days)` | Removes `.jsonl` result files older than `days` days | +| `cleanup_stale_test_artifacts(output_dir)` | Removes `api/`, `d2d/`, and `default/` subdirectories containing stale JSONL files from interrupted runs | + ### Environment Utilities (`utils/environment.py`) Environment variable handling: diff --git a/nac_test/cli/main.py b/nac_test/cli/main.py index 76092eda..1a17fcce 100644 --- a/nac_test/cli/main.py +++ b/nac_test/cli/main.py @@ -26,6 +26,7 @@ EXIT_INVALID_ARGS, ) from nac_test.data_merger import DataMerger +from nac_test.utils.cleanup import get_cleanup_manager from nac_test.utils.formatting import format_duration from nac_test.utils.logging import ( DEFAULT_LOGLEVEL, @@ -173,16 +174,6 @@ def version_callback(value: bool) -> None: ] -MergedDataFilename = Annotated[ - str, - typer.Option( - "-m", - "--merged-data-filename", - help="Filename for the merged data model YAML file.", - ), -] - - RenderOnly = Annotated[ bool, typer.Option( @@ -323,7 +314,6 @@ def main( version: Version = False, # noqa: ARG001 diagnostic: Diagnostic = False, # noqa: ARG001 verbose: Verbose = False, - merged_data_filename: MergedDataFilename = "merged_data_model_test_variables.yaml", ) -> None: """A CLI tool to render and execute Robot Framework and PyATS tests using Jinja templating. @@ -394,7 +384,11 @@ def main( typer.echo("\n\n📄 Merging data model files...") merged_data = DataMerger.merge_data_files(data) - DataMerger.write_merged_data_model(merged_data, output, merged_data_filename) + merged_data_path = DataMerger.write_merged_data_model(merged_data, output) + + # Register merged data file for cleanup on exit/signal (SIGTERM/SIGINT) + cleanup_manager = get_cleanup_manager() + cleanup_manager.register(merged_data_path, keep_if_debug=True) duration = (datetime.now() - start_time).total_seconds() typer.echo(f"✅ Data model merging completed ({format_duration(duration)})") @@ -403,9 +397,9 @@ def main( orchestrator = CombinedOrchestrator( data_paths=data, templates_dir=templates, - custom_testbed_path=testbed, output_dir=output, - merged_data_filename=merged_data_filename, + merged_data=merged_data, + custom_testbed_path=testbed, filters_path=filters, tests_path=tests, include_tags=include, @@ -439,11 +433,11 @@ def main( raise typer.Exit(EXIT_INTERRUPTED) from None except Exception as e: # Infrastructure errors (template rendering, controller detection, etc.) + logger.exception("Unexpected error during execution") typer.echo(f"Error during execution: {e}") - # Progressive disclosure: clean output for customers, full context for developers - if DEBUG_MODE: - raise typer.Exit(EXIT_ERROR) from e # Developer: full exception context - raise typer.Exit(EXIT_ERROR) from None # Customer: clean output + # Pretty exception display is controlled by pretty_exceptions_enable=DEBUG_MODE + # on the Typer app — no need for a separate DEBUG_MODE branch here. + raise typer.Exit(EXIT_ERROR) from e # Display total runtime before exit runtime_end = datetime.now() diff --git a/nac_test/combined_orchestrator.py b/nac_test/combined_orchestrator.py index 0589cd53..80aaf581 100644 --- a/nac_test/combined_orchestrator.py +++ b/nac_test/combined_orchestrator.py @@ -6,6 +6,7 @@ import logging import os from pathlib import Path +from typing import Any import typer @@ -74,7 +75,7 @@ def __init__( data_paths: list[Path], templates_dir: Path, output_dir: Path, - merged_data_filename: str, + merged_data: dict[str, Any] | None = None, filters_path: Path | None = None, tests_path: Path | None = None, include_tags: list[str] | None = None, @@ -97,7 +98,7 @@ def __init__( data_paths: List of paths to data model YAML files templates_dir: Directory containing test templates and PyATS test files output_dir: Base directory for test output - merged_data_filename: Name of the merged data model file + merged_data: Already-loaded merged data model dict (avoids re-reading from disk) filters_path: Path to Jinja filters (Robot only) tests_path: Path to Jinja tests (Robot only) include_tags: Tags to include (Robot only) @@ -117,7 +118,9 @@ def __init__( self.data_paths = data_paths self.templates_dir = Path(templates_dir) self.output_dir = Path(output_dir) - self.merged_data_filename = merged_data_filename + self.merged_data: dict[str, Any] = ( + merged_data if merged_data is not None else {} + ) # Robot-specific parameters self.filters_path = filters_path @@ -213,7 +216,6 @@ def run_tests(self) -> CombinedResults: data_paths=self.data_paths, test_dir=self.templates_dir, output_dir=self.output_dir, - merged_data_filename=self.merged_data_filename, minimal_reports=self.minimal_reports, custom_testbed_path=self.custom_testbed_path, controller_type=self.controller_type, @@ -233,10 +235,9 @@ def run_tests(self) -> CombinedResults: typer.echo(f"\n🤖 Running Robot Framework tests{mode_suffix}...\n") robot_orchestrator = RobotOrchestrator( - data_paths=self.data_paths, templates_dir=self.templates_dir, output_dir=self.output_dir, - merged_data_filename=self.merged_data_filename, + merged_data=self.merged_data, filters_path=self.filters_path, tests_path=self.tests_path, include_tags=self.include_tags, @@ -405,7 +406,7 @@ def _print_execution_summary( if combined_dashboard.exists(): typer.echo(f"Dashboard: {combined_dashboard.resolve()}") if results.robot is not None and not results.robot.is_empty: - robot_log = self.output_dir / ROBOT_RESULTS_DIRNAME / "log.html" + robot_log = self.output_dir / ROBOT_RESULTS_DIRNAME / LOG_HTML if robot_log.exists(): typer.echo(f"Robot: {robot_log.resolve()}") if results.api is not None and not results.api.is_empty: diff --git a/nac_test/core/constants.py b/nac_test/core/constants.py index a51d916a..ddc0c334 100644 --- a/nac_test/core/constants.py +++ b/nac_test/core/constants.py @@ -84,6 +84,9 @@ REPORT_HTML: str = "report.html" XUNIT_XML: str = "xunit.xml" ORDERING_FILENAME: str = "ordering.txt" +MERGED_DATA_FILENAME: str = "merged_data_model_test_variables.yaml" +# Owner read/write only — prevents other users from reading sensitive merged data +MERGED_DATA_FILE_MODE: int = 0o600 SUMMARY_SEPARATOR_WIDTH: int = 70 # HTTP status code range boundaries diff --git a/nac_test/data_merger.py b/nac_test/data_merger.py index 157def6c..07efe84a 100644 --- a/nac_test/data_merger.py +++ b/nac_test/data_merger.py @@ -4,11 +4,18 @@ """Shared data merging utilities for both Robot and PyATS test execution.""" import logging +import os from pathlib import Path from typing import Any from nac_yaml import yaml +from nac_test.core.constants import ( + IS_WINDOWS, + MERGED_DATA_FILE_MODE, + MERGED_DATA_FILENAME, +) + logger = logging.getLogger(__name__) @@ -32,33 +39,41 @@ def merge_data_files(data_paths: list[Path]) -> dict[str, Any]: # Ensure we always return a dict, even if yaml returns None return data if isinstance(data, dict) else {} + @staticmethod + def merged_data_path(output_directory: Path) -> Path: + """Return the path where the merged data model file will be written. + + Single source of truth for locating the merged data file — use this + instead of constructing the path manually from the constant. + + Args: + output_directory: Base output directory passed to write_merged_data_model() + + Returns: + Full path to the merged data model YAML file + """ + return output_directory / MERGED_DATA_FILENAME + @staticmethod def write_merged_data_model( data: dict[str, Any], output_directory: Path, - filename: str = "merged_data_model_test_variables.yaml", - ) -> None: + ) -> Path: """Write merged data model to YAML file. + The output filename is always MERGED_DATA_FILENAME — the single fixed + location used by all consumers (Robot, PyATS subprocesses, cleanup). + Args: data: The merged data dictionary to write output_directory: Directory where the YAML file will be saved - filename: Name of the output YAML file - """ - full_output_path = output_directory / filename - logger.info("Writing merged data model to %s", full_output_path) - yaml.write_yaml_file(data, full_output_path) - - @staticmethod - def load_yaml_file(file_path: Path) -> dict[str, Any]: - """Load a single YAML file from the provided path. - - Args: - file_path: Path to the YAML file to load Returns: - Loaded dictionary from the YAML file + Path to the written file (use this instead of reconstructing the path) """ - logger.info("Loading yaml file from %s", file_path) - data = yaml.load_yaml_files([file_path]) - return data if data is not None else {} + full_output_path = DataMerger.merged_data_path(output_directory) + logger.info("Writing merged data model to %s", full_output_path) + yaml.write_yaml_file(data, full_output_path) + if not IS_WINDOWS: + os.chmod(full_output_path, MERGED_DATA_FILE_MODE) + return full_output_path diff --git a/nac_test/pyats_core/execution/device/device_executor.py b/nac_test/pyats_core/execution/device/device_executor.py index 9ebc0454..d4d5433b 100644 --- a/nac_test/pyats_core/execution/device/device_executor.py +++ b/nac_test/pyats_core/execution/device/device_executor.py @@ -14,6 +14,7 @@ from nac_test.pyats_core.constants import ENV_TEST_DIR from nac_test.pyats_core.execution.job_generator import JobGenerator from nac_test.pyats_core.execution.subprocess_runner import SubprocessRunner +from nac_test.utils.cleanup import get_cleanup_manager from .testbed_generator import TestbedGenerator @@ -30,6 +31,7 @@ def __init__( test_status: dict[str, Any], test_dir: Path, base_output_dir: Path, + merged_data_path: Path, custom_testbed_path: Path | None = None, ): """Initialize device executor. @@ -40,6 +42,7 @@ def __init__( test_status: Dictionary for tracking test status test_dir: Directory containing PyATS test files (user-specified) base_output_dir: Base output directory for test results + merged_data_path: Path to the merged data model YAML file custom_testbed_path: Optional path to custom PyATS testbed YAML """ self.job_generator = job_generator @@ -47,6 +50,7 @@ def __init__( self.test_status = test_status self.test_dir = test_dir self.base_output_dir = base_output_dir + self.merged_data_path = merged_data_path self.custom_testbed_path = custom_testbed_path async def run_device_job_with_semaphore( @@ -97,6 +101,11 @@ async def run_device_job_with_semaphore( testbed_file.write(testbed_content) testbed_file_path = Path(testbed_file.name) + # Register temp files for deletion on exit (kept when NAC_TEST_DEBUG is set). + cleanup = get_cleanup_manager() + cleanup.register(job_file_path, keep_if_debug=True) + cleanup.register(testbed_file_path, keep_if_debug=True) + # Set up environment for this device # Always start with a copy of os.environ to preserve PATH and other variables env = os.environ.copy() @@ -105,8 +114,7 @@ async def run_device_job_with_semaphore( "HOSTNAME": hostname, "DEVICE_INFO": json.dumps(device), "MERGED_DATA_MODEL_TEST_VARIABLES_FILEPATH": str( - self.base_output_dir - / "merged_data_model_test_variables.yaml" + self.merged_data_path ), "NAC_TEST_TYPE": "d2d", ENV_TEST_DIR: str(self.test_dir), @@ -148,13 +156,6 @@ async def run_device_job_with_semaphore( if test_name in self.test_status: self.test_status[test_name]["status"] = status - # Clean up temporary files -- UNCOMMENT ME - # try: - # job_file_path.unlink() - # testbed_file_path.unlink() - # except Exception: - # pass - if archive_path: logger.info( f"Completed tests for device {hostname}: {archive_path}" diff --git a/nac_test/pyats_core/orchestrator.py b/nac_test/pyats_core/orchestrator.py index 62a5f7c4..13520042 100644 --- a/nac_test/pyats_core/orchestrator.py +++ b/nac_test/pyats_core/orchestrator.py @@ -22,6 +22,7 @@ SUMMARY_SEPARATOR_WIDTH, ) from nac_test.core.types import PyATSResults, TestResults +from nac_test.data_merger import DataMerger from nac_test.pyats_core.broker.connection_broker import ConnectionBroker from nac_test.pyats_core.constants import ( DEFAULT_CPU_MULTIPLIER, @@ -66,7 +67,6 @@ def __init__( data_paths: list[Path], test_dir: Path, output_dir: Path, - merged_data_filename: str, minimal_reports: bool = False, custom_testbed_path: Path | None = None, controller_type: str | None = None, @@ -80,7 +80,6 @@ def __init__( data_paths: List of paths to data model YAML files test_dir: Directory containing PyATS test files output_dir: Base output directory (orchestrator creates pyats_results subdirectory) - merged_data_filename: Name of the merged data model file minimal_reports: Only include command outputs for failed/errored tests in reports custom_testbed_path: Path to custom PyATS testbed YAML for device overrides controller_type: The detected controller type (e.g., "ACI", "SDWAN", "CC"). @@ -98,7 +97,10 @@ def __init__( self.output_dir = ( self.base_output_dir / PYATS_RESULTS_DIRNAME ) # PyATS works in its own subdirectory - self.merged_data_filename = merged_data_filename + # Absolute path so child processes can locate it regardless of working directory. + self.merged_data_path = DataMerger.merged_data_path( + self.base_output_dir + ).absolute() self.minimal_reports = minimal_reports self.custom_testbed_path = custom_testbed_path self.dry_run = dry_run @@ -140,7 +142,7 @@ def __init__( # Initialize discovery components self.test_discovery = TestDiscovery(self.test_dir) self.device_inventory_discovery = DeviceInventoryDiscovery( - self.base_output_dir / self.merged_data_filename + self.merged_data_path ) # Initialize execution components @@ -307,10 +309,8 @@ async def _execute_api_tests_standard(self, test_files: list[Path]) -> Path | No # We cannot pass Python objects across process boundaries # so we use env vars to communicate # configuration (like data file paths) from the orchestrator to the test subprocess. - # The merged data file is created by main.py at the base output level. - # Pass absolute path so the child process (with cwd set) can locate it. env["MERGED_DATA_MODEL_TEST_VARIABLES_FILEPATH"] = str( - (self.base_output_dir / self.merged_data_filename).absolute() + self.merged_data_path ) # Set NAC_TEST_TYPE to differentiate API vs D2D test types for separate temp directories # This prevents race conditions where both test types write JSONL files to the same location @@ -430,6 +430,7 @@ async def _execute_device_tests_with_broker( self.d2d_test_status, # Use d2d_test_status for device tests self.test_dir, self.base_output_dir, + self.merged_data_path, self.custom_testbed_path, ) diff --git a/nac_test/robot/orchestrator.py b/nac_test/robot/orchestrator.py index fd91f8d1..47fb1365 100644 --- a/nac_test/robot/orchestrator.py +++ b/nac_test/robot/orchestrator.py @@ -10,6 +10,7 @@ import logging from pathlib import Path +from typing import Any import typer @@ -46,10 +47,9 @@ class RobotOrchestrator: def __init__( self, - data_paths: list[Path], templates_dir: Path, output_dir: Path, - merged_data_filename: str, + merged_data: dict[str, Any] | None = None, filters_path: Path | None = None, tests_path: Path | None = None, include_tags: list[str] | None = None, @@ -64,10 +64,9 @@ def __init__( """Initialize the Robot Framework orchestrator. Args: - data_paths: List of paths to data model YAML files templates_dir: Directory containing Robot template files output_dir: Base output directory (orchestrator uses its robot_results subdirectory) - merged_data_filename: Name of the merged data model file + merged_data: Already-loaded merged data model dict (avoids re-reading from disk) filters_path: Optional path to filter files tests_path: Optional path to test files include_tags: Optional list of tags to include @@ -79,11 +78,12 @@ def __init__( loglevel: Log level verbose: Enable verbose mode - enables verbose output for pabot """ - self.data_paths = data_paths self.templates_dir = Path(templates_dir) self.base_output_dir = Path(output_dir) self.output_dir = self.base_output_dir / ROBOT_RESULTS_DIRNAME - self.merged_data_filename = merged_data_filename + self.merged_data: dict[str, Any] = ( + merged_data if merged_data is not None else {} + ) # Robot-specific parameters self.filters_path = filters_path @@ -105,7 +105,7 @@ def __init__( # Initialize Robot Framework components (reuse existing implementations) self.robot_writer = RobotWriter( - data_paths=self.data_paths, + merged_data=self.merged_data, filters_path=self.filters_path, tests_path=self.tests_path, include_tags=self.include_tags, @@ -118,10 +118,9 @@ def run_tests(self) -> TestResults: This method: 1. Creates the Robot Framework working directory under robot_results/ 2. Renders Robot test templates using RobotWriter - 3. Creates merged data model file in the Robot working directory - 4. Executes tests using pabot (unless render_only mode) - 5. Creates backward compatibility symlinks - 6. Extracts and returns test statistics + 3. Executes tests using pabot (unless render_only mode) + 4. Creates backward compatibility symlinks + 5. Extracts and returns test statistics Follows the same pattern as PyATSOrchestrator.run_tests(). @@ -137,19 +136,12 @@ def run_tests(self) -> TestResults: logger.info(f"Robot working directory: {self.output_dir}") logger.info(f"Templates directory: {self.templates_dir}") - # Phase 1: Template rendering (delegate to existing RobotWriter) + # Phase 2: Template rendering (delegate to existing RobotWriter) typer.echo("📝 Rendering Robot Framework templates...") self.robot_writer.write( self.templates_dir, self.output_dir, ordering_file=self.ordering_file ) - # Phase 2: Create merged data model in Robot working directory - # Note: Robot tests expect the merged data file in their working directory - logger.info("Creating merged data model for Robot tests") - self.robot_writer.write_merged_data_model( - self.output_dir, self.merged_data_filename - ) - # Phase 3: Test execution (unless render-only mode) if not self.render_only: typer.echo("🤖 Executing Robot Framework tests...\n\n") diff --git a/nac_test/robot/robot_writer.py b/nac_test/robot/robot_writer.py index f0178634..c6ae4ae1 100644 --- a/nac_test/robot/robot_writer.py +++ b/nac_test/robot/robot_writer.py @@ -3,7 +3,6 @@ import copy import importlib.util -import json import logging import os import pathlib @@ -11,7 +10,7 @@ import shutil import sys from pathlib import Path -from typing import Any, cast +from typing import Any from jinja2 import ( ChainableUndefined, @@ -22,8 +21,6 @@ from robot.api import SuiteVisitor, TestSuite from robot.utils import is_truthy -from nac_test.data_merger import DataMerger - logger = logging.getLogger(__name__) @@ -61,16 +58,13 @@ def start_test(self, test: Any) -> None: class RobotWriter: def __init__( self, - data_paths: list[Path], + merged_data: dict[str, Any], filters_path: Path | None, tests_path: Path | None, include_tags: list[str] | None = None, exclude_tags: list[str] | None = None, ) -> None: - self.data = DataMerger.merge_data_files(data_paths) - # Convert OrderedDict to dict once during initialization instead of per-template - # This eliminates expensive JSON round-trip serialization for each template render - self.template_data = self._convert_data_model_for_templates(self.data) + self.data = merged_data self.filters: dict[str, Any] = {} self.include_tags = include_tags or [] self.exclude_tags = exclude_tags or [] @@ -105,36 +99,6 @@ def __init__( self.tests[mod.Test.name] = mod.Test self.ordering_entries: list[str] = [] - def _convert_data_model_for_templates(self, data: dict[str, Any]) -> dict[str, Any]: - """Convert nested OrderedDict to dict to avoid duplicate dict keys. - - This method performs the same conversion that was previously done per-template, - but centralizes it during initialization for performance efficiency. - - Args: - data: Raw data model from DataMerger (may contain OrderedDict structures) - - Returns: - Converted data model safe for Jinja template rendering - - Note: - JSON round-trip approach is used as it's safe for all serializable data - and handles nested OrderedDict structures reliably. - """ - logger.debug( - "Converting data model for template rendering (one-time conversion)" - ) - try: - # JSON round-trip to convert nested OrderedDict to dict - # This preserves all data while fixing duplicate key issues (e.g., 'tag' fields) - converted_data = cast(dict[str, Any], json.loads(json.dumps(data))) - logger.debug("Data model conversion completed successfully") - return converted_data - except Exception as e: - logger.warning(f"Data model conversion failed: {e}. Using original data.") - # Fallback to original data if conversion fails - return data - def render_template( self, template_path: Path, @@ -152,13 +116,8 @@ def render_template( pathlib.Path(os.path.dirname(output_path)).mkdir(parents=True, exist_ok=True) template = env.get_template(template_path.as_posix()) - # Use custom_data if provided (for chunked templates), otherwise use pre-converted template_data - if custom_data is not None: - # Convert custom_data for template rendering (same as initialization conversion) - template_data = self._convert_data_model_for_templates(custom_data) - else: - # Use pre-converted data model (converted once during initialization) - template_data = self.template_data + # Use custom_data if provided (for chunked templates), otherwise use template_data + template_data = custom_data if custom_data is not None else self.data result = template.render(template_data, **kwargs) # remove extra empty lines @@ -384,7 +343,7 @@ def write( next_template = True path = params[2].split(".") attr = params[3] - elem: Any = self.template_data + elem: Any = self.data for p in path: try: elem = elem.get(p, {}) @@ -396,7 +355,13 @@ def write( if params[1] == "iterate_list_chunked": # Handle chunked iteration object_path = params[5] - chunk_size = int(params[6].rstrip("#}")) + try: + chunk_size = int(params[6].rstrip("#}")) + except (ValueError, IndexError) as e: + raise ValueError( + f"Invalid chunk_size in iterate_list_chunked directive " + f"in {Path(dir, filename)}: {match.group()!r}" + ) from e for item in elem: attr_value = item.get(attr) if attr_value is None: @@ -543,19 +508,3 @@ def write( else: # ensure we clean out a leftover ordering file if we don't want testlevelsplit ordering_file.unlink(missing_ok=True) - - def write_merged_data_model( - self, - output_directory: Path, - filename: str = "merged_data_model_test_variables.yaml", - ) -> None: - """Writes the merged data model to a specified YAML file. - - This method takes the internal, merged data dictionary (`self.data`) - and writes it to a YAML file in the specified output directory. - - Args: - output_directory: The directory where the YAML file will be saved. - filename: The name of the output YAML file. - """ - DataMerger.write_merged_data_model(self.data, output_directory, filename) diff --git a/nac_test/utils/cleanup.py b/nac_test/utils/cleanup.py index c45874e6..eafb3d02 100644 --- a/nac_test/utils/cleanup.py +++ b/nac_test/utils/cleanup.py @@ -3,16 +3,191 @@ """Cleanup utilities for nac-test framework.""" +import atexit import logging import shutil +import signal +import threading import time from pathlib import Path +from types import FrameType +from typing import Any +from nac_test.core.constants import DEBUG_MODE, IS_WINDOWS from nac_test.pyats_core.discovery.test_type_resolver import VALID_TEST_TYPES logger = logging.getLogger(__name__) +class CleanupManager: + """Thread-safe manager for registering files to be cleaned up on exit. + + Ensures registered files are removed on: + - Normal program exit (atexit) + - SIGTERM signal (e.g., docker stop, kill) + - SIGINT signal (Ctrl+C / KeyboardInterrupt) + + Note: SIGKILL cannot be caught - files may remain if process is killed with -9. + + Usage: + cleanup_manager = CleanupManager() + cleanup_manager.register(Path("/tmp/sensitive_file.yaml")) + # File will be automatically removed on exit + + # Skip deletion when NAC_TEST_DEBUG is set (useful for debugging): + cleanup_manager.register(Path("/tmp/temp_job.py"), keep_if_debug=True) + + Thread Safety: + All operations are thread-safe via internal locking. + + Fork Safety: + Not safe to use in forked child processes. If a process is forked + while the singleton is initialised, the child inherits the lock in + an undefined state. Subprocesses spawned via subprocess.Popen are + unaffected (they get a fresh interpreter). + """ + + _instance: "CleanupManager | None" = None + _instance_lock = threading.Lock() + _initialized: bool = False + + def __new__(cls) -> "CleanupManager": + """Singleton pattern - only one cleanup manager per process.""" + with cls._instance_lock: + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._initialized = False + return cls._instance + + def __init__(self) -> None: + """Initialize the cleanup manager (only runs once due to singleton).""" + if self._initialized: + return + + self._files: dict[Path, bool] = {} # path → keep_if_debug + self._lock = threading.Lock() + self._original_sigterm: Any = None + self._original_sigint: Any = None + self._cleanup_done = False + + # Register atexit handler + atexit.register(self.run_cleanup) + + # Install signal handlers (Unix only - Windows doesn't support SIGTERM properly) + if not IS_WINDOWS: + self._install_signal_handlers() + + self._initialized = True + logger.debug("CleanupManager initialized") + + def _install_signal_handlers(self) -> None: + """Install signal handlers for SIGTERM and SIGINT.""" + try: + self._original_sigterm = signal.signal(signal.SIGTERM, self._signal_handler) + self._original_sigint = signal.signal(signal.SIGINT, self._signal_handler) + logger.debug("Signal handlers installed for SIGTERM and SIGINT") + except (ValueError, OSError) as e: + # Can fail if not in main thread or signal not supported + logger.debug(f"Could not install signal handlers: {e}") + + def _signal_handler(self, signum: int, frame: FrameType | None) -> None: + """Handle SIGTERM/SIGINT by cleaning up and re-raising.""" + sig_name = signal.Signals(signum).name + logger.debug(f"Received {sig_name}, performing cleanup") + + # Perform cleanup + self.run_cleanup() + + # Re-raise the signal with original handler or default behavior + if signum == signal.SIGTERM: + original = self._original_sigterm + else: + original = self._original_sigint + + # Restore original handler and re-raise + signal.signal(signum, original if original is not None else signal.SIG_DFL) + + # For SIGINT, raise KeyboardInterrupt to allow normal exception handling + if signum == signal.SIGINT: + raise KeyboardInterrupt + + # For SIGTERM, re-send the signal + signal.raise_signal(signum) + + def register(self, path: Path, keep_if_debug: bool = False) -> None: + """Register a file path for cleanup on exit. + + Args: + path: Path to file that should be removed on exit. + Directories are not supported (use shutil.rmtree directly). + keep_if_debug: If True, the file will not be deleted when + NAC_TEST_DEBUG is set — useful for intermediate files + (job scripts, testbed YAMLs, merged data) that aid debugging. + """ + with self._lock: + # resolve() canonicalises symlinks (e.g. macOS /tmp → /private/tmp) + # so registration and cleanup always refer to the same inode. + resolved = path.resolve() + self._files[resolved] = keep_if_debug + logger.debug( + f"Registered for cleanup: {resolved} (keep_if_debug={keep_if_debug})" + ) + + def unregister(self, path: Path) -> None: + """Unregister a file path from cleanup. + + Args: + path: Path to file that should no longer be cleaned up. + """ + with self._lock: + resolved = path.resolve() + self._files.pop(resolved, None) + logger.debug(f"Unregistered from cleanup: {resolved}") + + def run_cleanup(self) -> None: + """Delete all registered files. Safe to call multiple times. + + Called automatically on normal exit (atexit), SIGTERM, and SIGINT. + Can also be called manually before an explicit exit. + After the first call, registered files are cleared and subsequent + calls are no-ops. + """ + with self._lock: + if self._cleanup_done: + return + self._cleanup_done = True + + debug_kept: list[Path] = [] + for path, keep_if_debug in self._files.items(): + if keep_if_debug and DEBUG_MODE: + debug_kept.append(path) + continue + try: + if path.exists(): + path.unlink() + logger.debug(f"Cleaned up: {path}") + except Exception as e: + logger.warning(f"Failed to clean up {path}: {e}") + + if debug_kept: + logger.info( + "Keeping %d file(s) for debugging (NAC_TEST_DEBUG is set): %s", + len(debug_kept), + ", ".join(str(p) for p in debug_kept), + ) + + self._files.clear() + + +def get_cleanup_manager() -> CleanupManager: + """Get the singleton CleanupManager instance. + + Returns: + The global CleanupManager instance. + """ + return CleanupManager() + + def cleanup_pyats_runtime(workspace_path: Path | None = None) -> None: """Clean up PyATS runtime directories before test execution. diff --git a/tests/e2e/fixtures/success/data.yaml b/tests/e2e/fixtures/success/data.yaml index 4db39c46..4495a92c 100644 --- a/tests/e2e/fixtures/success/data.yaml +++ b/tests/e2e/fixtures/success/data.yaml @@ -1,6 +1,7 @@ sdwan: management_ip_variable: vpn0_ip + test_credential: !env TEST_CREDENTIAL_SENTINEL sites: - name: SITE_100 site_id: "100" diff --git a/tests/e2e/test_e2e_scenarios.py b/tests/e2e/test_e2e_scenarios.py index b21766a6..4148bf78 100644 --- a/tests/e2e/test_e2e_scenarios.py +++ b/tests/e2e/test_e2e_scenarios.py @@ -29,6 +29,7 @@ HTML_REPORTS_DIRNAME, IS_WINDOWS, LOG_HTML, + MERGED_DATA_FILENAME, OUTPUT_XML, PRE_FLIGHT_FAILURE_FILENAME, PYATS_RESULTS_DIRNAME, @@ -160,7 +161,6 @@ def test_output_root_contains_only_expected_entries( expected_files = { COMBINED_SUMMARY_FILENAME, - "merged_data_model_test_variables.yaml", } if results.has_robot_results or results.has_pyats_results: expected_files.add(XUNIT_XML) @@ -176,6 +176,18 @@ def test_output_root_contains_only_expected_entries( f"Expected only: {sorted(allowed)}" ) + def test_merged_data_file_removed_after_run(self, results: E2EResults) -> None: + """Merged data model YAML must not persist after a successful run. + + The file contains potentially sensitive variable data and is registered + with CleanupManager for deletion on exit. Its absence confirms cleanup ran. + """ + merged_data_file = results.output_dir / MERGED_DATA_FILENAME + assert not merged_data_file.exists(), ( + f"{MERGED_DATA_FILENAME} was not cleaned up after run — " + "it may contain sensitive data and should be deleted on exit" + ) + # ------------------------------------------------------------------------- # Robot Framework Output Tests # ------------------------------------------------------------------------- diff --git a/tests/integration/test_cleanup_atexit.py b/tests/integration/test_cleanup_atexit.py new file mode 100644 index 00000000..35c28b78 --- /dev/null +++ b/tests/integration/test_cleanup_atexit.py @@ -0,0 +1,134 @@ +# SPDX-License-Identifier: MPL-2.0 +# Copyright (c) 2025 Daniel Schmidt + +"""Integration tests for CleanupManager atexit and signal behaviour. + +Spawns isolated subprocesses so that atexit fires in a real interpreter +exit, not a mock or in-process call, and so that signals are actually +delivered to the process rather than simulated via direct method calls. +""" + +import os +import signal +import subprocess +import sys +from pathlib import Path + +import pytest + +pytestmark = pytest.mark.integration + +_ATEXIT_SCRIPT = """ +import os +import typer +from pathlib import Path +from nac_test.utils.cleanup import get_cleanup_manager + +app = typer.Typer() + +@app.command() +def main() -> None: + get_cleanup_manager().register(Path("{sentinel}"), keep_if_debug=True) + raise typer.Exit(0) + +app() +""" + +_SIGNAL_SCRIPT = """ +import sys +import time +from pathlib import Path +from nac_test.utils.cleanup import get_cleanup_manager + +get_cleanup_manager().register(Path("{sentinel}")) + +sys.stdout.write("ready\\n") +sys.stdout.flush() + +time.sleep(30) +""" + + +@pytest.mark.windows +@pytest.mark.parametrize( + ("debug_env", "expect_exists"), + [ + (None, False), # NAC_TEST_DEBUG unset → file is deleted + ("true", True), # NAC_TEST_DEBUG=true → file is kept + ], +) +def test_cleanup_atexit_via_typer_exit( + tmp_path: Path, debug_env: str | None, expect_exists: bool +) -> None: + """CleanupManager deletes (or retains) registered files on typer.Exit(0). + + Exercises the exact production exit path and the keep_if_debug flag. + """ + sentinel = tmp_path / "job.py" + sentinel.write_text("# temp job") + + env = None + if debug_env is not None: + env = os.environ.copy() + env["NAC_TEST_DEBUG"] = debug_env + + result = subprocess.run( + [sys.executable, "-c", _ATEXIT_SCRIPT.format(sentinel=str(sentinel))], + capture_output=True, + text=True, + env=env, + timeout=30, + ) + + assert result.returncode == 0, result.stderr + assert sentinel.exists() is expect_exists + + +@pytest.mark.parametrize( + "signum", + [signal.SIGTERM, signal.SIGINT], + ids=["SIGTERM", "SIGINT"], +) +def test_cleanup_on_signal(tmp_path: Path, signum: signal.Signals) -> None: + """CleanupManager deletes registered files when the process receives SIGTERM or SIGINT. + + Delivers an actual signal to a child process (not a direct _signal_handler() + call) so that the signal registration path in _install_signal_handlers() is + exercised end-to-end. SIGTERM and SIGINT have distinct re-raise paths: + SIGTERM re-sends the signal via signal.raise_signal(); SIGINT raises + KeyboardInterrupt. Both must still delete registered files before exiting. + """ + sentinel = tmp_path / "sensitive.yaml" + sentinel.write_text("secret: value") + + proc = subprocess.Popen( + [sys.executable, "-c", _SIGNAL_SCRIPT.format(sentinel=str(sentinel))], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + ) + + # Wait until the child signals it is ready (handlers installed, file registered). + try: + ready_line = proc.stdout.readline() # type: ignore[union-attr] + assert ready_line.strip() == "ready", f"Unexpected output: {ready_line!r}" + except Exception as e: + proc.kill() + proc.wait() + raise AssertionError(f"Child process failed to signal readiness: {e}") from e + + proc.send_signal(signum) + + try: + proc.wait(timeout=10) + except subprocess.TimeoutExpired as e: + proc.kill() + proc.wait() + raise AssertionError( + f"Child process did not exit within 10 s after {signum.name}" + ) from e + + assert not sentinel.exists(), ( + f"{sentinel.name} was not deleted after {signum.name} — " + "CleanupManager signal handler may not be registered correctly" + ) diff --git a/tests/integration/test_cli_rendering.py b/tests/integration/test_cli_rendering.py index bb59366c..4db036a3 100644 --- a/tests/integration/test_cli_rendering.py +++ b/tests/integration/test_cli_rendering.py @@ -16,7 +16,11 @@ from typer.testing import CliRunner import nac_test.cli.main -from nac_test.core.constants import EXIT_ERROR, ROBOT_RESULTS_DIRNAME +from nac_test.core.constants import ( + EXIT_ERROR, + MERGED_DATA_FILENAME, + ROBOT_RESULTS_DIRNAME, +) pytestmark = [pytest.mark.integration, pytest.mark.windows] @@ -264,41 +268,33 @@ def test_chunked_list_rendering_produces_expected_content(tmp_path: Path) -> Non def test_merged_data_model_creates_default_filename(tmp_path: Path) -> None: - """Test that merged data model creates file with default filename. - - Verifies that the CLI creates a merged data model YAML file with - the default filename when no custom filename is specified. - - Args: - tmp_path: Pytest fixture providing a temporary directory. - """ + """Test that the merged data model is written with the expected filename and content.""" runner = CliRunner() - data_path = "tests/integration/fixtures/data_merge/" templates_path = "tests/integration/fixtures/templates/" - expected_filename = "merged_data_model_test_variables.yaml" - output_model_path = tmp_path / expected_filename - data_dir = Path(data_path) + output_model_path = tmp_path / MERGED_DATA_FILENAME + data_dir = Path("tests/integration/fixtures/data_merge") expected_model_path = data_dir / "result.yaml" - base_args = [ - "-d", - str(data_dir / "file1.yaml"), - "-d", - str(data_dir / "file2.yaml"), - "-t", - templates_path, - "-o", - str(tmp_path), - "--render-only", - ] - - result = runner.invoke(nac_test.cli.main.app, base_args) + result = runner.invoke( + nac_test.cli.main.app, + [ + "-d", + str(data_dir / "file1.yaml"), + "-d", + str(data_dir / "file2.yaml"), + "-t", + templates_path, + "-o", + str(tmp_path), + "--render-only", + ], + ) assert result.exit_code == 0, ( f"Merged data model creation should succeed, got exit code " f"{result.exit_code}: {result.output}" ) assert output_model_path.exists(), ( - f"Default merged data model file should exist at {output_model_path}" + f"Merged data model file should exist at {output_model_path}" ) assert filecmp.cmp(output_model_path, expected_model_path, shallow=False), ( f"Merged data model content should match expected content from " @@ -306,51 +302,6 @@ def test_merged_data_model_creates_default_filename(tmp_path: Path) -> None: ) -def test_merged_data_model_creates_custom_filename(tmp_path: Path) -> None: - """Test that merged data model creates file with custom filename. - - Verifies that the CLI creates a merged data model YAML file with - a custom filename when --merged-data-filename is specified. - - Args: - tmp_path: Pytest fixture providing a temporary directory. - """ - runner = CliRunner() - data_path = "tests/integration/fixtures/data_merge/" - templates_path = "tests/integration/fixtures/templates/" - expected_filename = "custom.yaml" - output_model_path = tmp_path / expected_filename - data_dir = Path(data_path) - expected_model_path = data_dir / "result.yaml" - - base_args = [ - "-d", - str(data_dir / "file1.yaml"), - "-d", - str(data_dir / "file2.yaml"), - "-t", - templates_path, - "-o", - str(tmp_path), - "--render-only", - "--merged-data-filename", - "custom.yaml", - ] - - result = runner.invoke(nac_test.cli.main.app, base_args) - assert result.exit_code == 0, ( - f"Merged data model with custom filename should succeed, got exit code " - f"{result.exit_code}: {result.output}" - ) - assert output_model_path.exists(), ( - f"Custom merged data model file should exist at {output_model_path}" - ) - assert filecmp.cmp(output_model_path, expected_model_path, shallow=False), ( - f"Custom merged data model content should match expected content from " - f"{expected_model_path}" - ) - - def test_render_only_without_controller_credentials(tmp_path: Path) -> None: """Render-only mode works without controller environment variables. All other tests in this module implicitly also test this, but this diff --git a/tests/integration/test_controller_detection_integration.py b/tests/integration/test_controller_detection_integration.py index 215d7bbc..a74adc19 100644 --- a/tests/integration/test_controller_detection_integration.py +++ b/tests/integration/test_controller_detection_integration.py @@ -53,7 +53,6 @@ def test_method(self): data_paths=[tmp_path / "data.yaml"], test_dir=test_dir, output_dir=output_dir, - merged_data_filename="merged_data.yaml", ) # Verify controller type was detected correctly diff --git a/tests/pyats_core/test_device_executor_env_propagation.py b/tests/pyats_core/test_device_executor_env_propagation.py index 59dc2b45..f622f510 100644 --- a/tests/pyats_core/test_device_executor_env_propagation.py +++ b/tests/pyats_core/test_device_executor_env_propagation.py @@ -8,6 +8,7 @@ from typing import Any from unittest.mock import AsyncMock, MagicMock, patch +from nac_test.data_merger import DataMerger from nac_test.pyats_core.constants import ENV_TEST_DIR from nac_test.pyats_core.execution.device.device_executor import DeviceExecutor from nac_test.pyats_core.execution.device.testbed_generator import TestbedGenerator @@ -51,6 +52,7 @@ async def capture_execute_job_with_testbed( test_status={}, test_dir=pyats_test_dirs.test_dir, base_output_dir=pyats_test_dirs.output_dir, + merged_data_path=DataMerger.merged_data_path(pyats_test_dirs.output_dir), ) device: dict[str, Any] = { diff --git a/tests/pyats_core/test_orchestrator_controller_detection.py b/tests/pyats_core/test_orchestrator_controller_detection.py index 57b600a8..0b0d5002 100644 --- a/tests/pyats_core/test_orchestrator_controller_detection.py +++ b/tests/pyats_core/test_orchestrator_controller_detection.py @@ -24,7 +24,6 @@ def test_orchestrator_detects_controller_on_init( data_paths=[pyats_test_dirs.output_dir.parent / "data.yaml"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", ) assert orchestrator.controller_type == "ACI" @@ -38,7 +37,6 @@ def test_orchestrator_exits_on_detection_failure( data_paths=[pyats_test_dirs.output_dir.parent / "data.yaml"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", ) # Verify it exits with EXIT_ERROR (255) for infrastructure errors @@ -56,7 +54,6 @@ def test_orchestrator_handles_multiple_controllers_error( data_paths=[pyats_test_dirs.output_dir.parent / "data.yaml"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", ) # Verify it exits with EXIT_ERROR (255) for infrastructure errors @@ -70,7 +67,6 @@ def test_validate_environment_uses_detected_controller( data_paths=[pyats_test_dirs.output_dir.parent / "data.yaml"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", ) assert orchestrator.controller_type == "SDWAN" @@ -94,7 +90,6 @@ def test_orchestrator_no_longer_uses_controller_type_env_var( data_paths=[pyats_test_dirs.output_dir.parent / "data.yaml"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", ) assert orchestrator.controller_type == "ACI" diff --git a/tests/pyats_core/test_orchestrator_controller_param.py b/tests/pyats_core/test_orchestrator_controller_param.py index d8a63cef..d9083d4a 100644 --- a/tests/pyats_core/test_orchestrator_controller_param.py +++ b/tests/pyats_core/test_orchestrator_controller_param.py @@ -24,7 +24,6 @@ def test_orchestrator_uses_provided_controller_type( data_paths=[pyats_test_dirs.output_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", controller_type="SDWAN", ) @@ -39,7 +38,6 @@ def test_orchestrator_falls_back_to_detection_when_none( data_paths=[pyats_test_dirs.output_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", controller_type=None, ) @@ -53,7 +51,6 @@ def test_orchestrator_defaults_to_detection( data_paths=[pyats_test_dirs.output_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", ) assert orchestrator.controller_type == "CC" @@ -66,7 +63,6 @@ def test_validate_environment_uses_provided_controller( data_paths=[pyats_test_dirs.output_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", controller_type="ACI", ) diff --git a/tests/pyats_core/test_orchestrator_dry_run.py b/tests/pyats_core/test_orchestrator_dry_run.py index 03f6f02d..6fb4091e 100644 --- a/tests/pyats_core/test_orchestrator_dry_run.py +++ b/tests/pyats_core/test_orchestrator_dry_run.py @@ -32,7 +32,6 @@ def test_dry_run_prints_summary_and_skips_execution( data_paths=[pyats_test_dirs.output_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", dry_run=True, ) @@ -80,7 +79,6 @@ def test_dry_run_returns_not_run_results_for_api_and_d2d( data_paths=[pyats_test_dirs.output_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", dry_run=True, ) @@ -114,7 +112,6 @@ def test_dry_run_does_not_execute_tests( data_paths=[pyats_test_dirs.output_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", dry_run=True, ) @@ -145,7 +142,6 @@ def test_dry_run_empty_test_lists( data_paths=[pyats_test_dirs.output_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", dry_run=True, ) diff --git a/tests/pyats_core/test_orchestrator_env_var.py b/tests/pyats_core/test_orchestrator_env_var.py index 2c25d4a7..ca22ec65 100644 --- a/tests/pyats_core/test_orchestrator_env_var.py +++ b/tests/pyats_core/test_orchestrator_env_var.py @@ -32,7 +32,6 @@ def test_orchestrator_respects_nac_test_pyats_processes_env_var( data_paths=[pyats_test_dirs.test_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", controller_type="ACI", ) @@ -52,7 +51,6 @@ def test_execute_api_tests_includes_env_test_dir( data_paths=[pyats_test_dirs.test_dir.parent / "data"], test_dir=pyats_test_dirs.test_dir, output_dir=pyats_test_dirs.output_dir, - merged_data_filename="merged.yaml", controller_type="ACI", ) diff --git a/tests/unit/robot/test_orchestrator.py b/tests/unit/robot/test_orchestrator.py index 8215dc93..a4872ef3 100644 --- a/tests/unit/robot/test_orchestrator.py +++ b/tests/unit/robot/test_orchestrator.py @@ -2,12 +2,11 @@ # Copyright (c) 2025 Daniel Schmidt # mypy: disable-error-code="no-untyped-def,method-assign" -# SPDX-License-Identifier: MPL-2.0 -# Copyright (c) 2025 Daniel Schmidt """Unit tests for Robot Framework orchestrator.""" from pathlib import Path +from typing import Any from unittest.mock import MagicMock, patch import pytest @@ -37,11 +36,9 @@ def temp_output_dir(tmp_path: Path) -> Path: @pytest.fixture -def mock_data_paths(tmp_path: Path) -> list[Path]: - """Create mock data paths.""" - data_file = tmp_path / "data.yaml" - data_file.write_text("test: data") - return [data_file] +def merged_data() -> dict[str, Any]: + """Minimal merged data dict passed to RobotOrchestrator/RobotWriter.""" + return {"test": "data"} @pytest.fixture @@ -53,15 +50,12 @@ def mock_templates_dir(tmp_path: Path) -> Path: @pytest.fixture -def orchestrator( - mock_data_paths, mock_templates_dir, temp_output_dir -) -> RobotOrchestrator: +def orchestrator(mock_templates_dir, temp_output_dir, merged_data) -> RobotOrchestrator: """Create a RobotOrchestrator instance for testing.""" return RobotOrchestrator( - data_paths=mock_data_paths, templates_dir=mock_templates_dir, output_dir=temp_output_dir, - merged_data_filename="merged_data.yaml", + merged_data=merged_data, loglevel=DEFAULT_LOGLEVEL, ) @@ -69,16 +63,42 @@ def orchestrator( class TestRobotOrchestrator: """Test suite for RobotOrchestrator.""" + @staticmethod + def _setup_run_tests_mocks( + mock_generator: MagicMock, + mock_pabot: MagicMock, + orchestrator: RobotOrchestrator, + temp_output_dir: Path, + ) -> None: + """Wire up the standard mocks and artifact files needed to exercise run_tests(). + + Creates the robot_results/ directory with stub artifact files so that + _create_backward_compat_links() and the report generator don't fail. + """ + orchestrator.robot_writer.write = MagicMock() + mock_pabot.return_value = 0 + + mock_generator_instance = MagicMock() + mock_generator_instance.generate_summary_report.return_value = ( + None, + TestResults(), + ) + mock_generator.return_value = mock_generator_instance + + robot_results_dir = temp_output_dir / ROBOT_RESULTS_DIRNAME + robot_results_dir.mkdir(exist_ok=True) + for filename in [LOG_HTML, OUTPUT_XML, REPORT_HTML, XUNIT_XML]: + (robot_results_dir / filename).write_text(f"Mock {filename}") + def test_initialization( - self, orchestrator, temp_output_dir, mock_data_paths, mock_templates_dir + self, orchestrator, temp_output_dir, mock_templates_dir ) -> None: """Test orchestrator initialization.""" assert orchestrator.base_output_dir == temp_output_dir assert orchestrator.output_dir == temp_output_dir / ROBOT_RESULTS_DIRNAME assert orchestrator.ordering_file == orchestrator.output_dir / ORDERING_FILENAME - assert orchestrator.data_paths == mock_data_paths assert orchestrator.templates_dir == mock_templates_dir - assert orchestrator.merged_data_filename == "merged_data.yaml" + assert orchestrator.merged_data == {"test": "data"} assert orchestrator.render_only is False assert orchestrator.dry_run is False assert orchestrator.loglevel == DEFAULT_LOGLEVEL @@ -86,14 +106,13 @@ def test_initialization( # TODO(#699): remove this test — it only asserts that __init__ assigns attributes, # not any application logic. Replace with tests for actual orchestrator behaviour. def test_initialization_with_optional_params( - self, mock_data_paths, mock_templates_dir, temp_output_dir + self, mock_templates_dir, temp_output_dir ) -> None: """Test orchestrator initialization with optional parameters.""" orchestrator = RobotOrchestrator( - data_paths=mock_data_paths, templates_dir=mock_templates_dir, output_dir=temp_output_dir, - merged_data_filename="merged.yaml", + merged_data={"test": "data"}, include_tags=["smoke", "regression"], exclude_tags=["wip"], render_only=True, @@ -199,20 +218,13 @@ def test_run_tests_render_only_mode( """Test run_tests in render-only mode.""" orchestrator.render_only = True - # Mock RobotWriter instance methods directly on the orchestrator's writer orchestrator.robot_writer.write = MagicMock() - orchestrator.robot_writer.write_merged_data_model = MagicMock() stats = orchestrator.run_tests() - # Verify template rendering was called orchestrator.robot_writer.write.assert_called_once() - orchestrator.robot_writer.write_merged_data_model.assert_called_once() - - # Verify pabot was NOT called mock_pabot.assert_not_called() - # Verify render-only mode returns not_run() result with SKIPPED state assert stats.was_not_run is True assert stats.reason == "render-only mode" @@ -222,14 +234,10 @@ def test_run_tests_full_execution( self, mock_generator, mock_pabot, orchestrator, temp_output_dir ) -> None: """Test run_tests executes full test lifecycle.""" - # Mock RobotWriter instance methods directly orchestrator.robot_writer.write = MagicMock() - orchestrator.robot_writer.write_merged_data_model = MagicMock() - # Mock pabot success mock_pabot.return_value = 0 - # Mock report generator - now returns tuple (path, stats) mock_generator_instance = MagicMock() mock_stats = TestResults(passed=1, failed=0, skipped=0) mock_generator_instance.generate_summary_report.return_value = ( @@ -238,7 +246,6 @@ def test_run_tests_full_execution( ) mock_generator.return_value = mock_generator_instance - # Create mock Robot output files in robot_results/ (pabot 5.2+ writes directly there) robot_results_dir = temp_output_dir / ROBOT_RESULTS_DIRNAME robot_results_dir.mkdir() for filename in [LOG_HTML, OUTPUT_XML, REPORT_HTML, XUNIT_XML]: @@ -246,13 +253,10 @@ def test_run_tests_full_execution( stats = orchestrator.run_tests() - # Verify all phases executed orchestrator.robot_writer.write.assert_called_once() - orchestrator.robot_writer.write_merged_data_model.assert_called_once() mock_pabot.assert_called_once() mock_generator_instance.generate_summary_report.assert_called_once() - # Verify statistics returned assert stats.total == 1 assert stats.passed == 1 assert stats.failed == 0 @@ -263,9 +267,7 @@ def test_run_tests_handles_pabot_exit_252_as_empty( self, mock_pabot: MagicMock, orchestrator: RobotOrchestrator ) -> None: """Test run_tests returns TestResults.empty() on pabot exit code 252 (no tests matched filters).""" - # Mock RobotWriter instance methods orchestrator.robot_writer.write = MagicMock() - orchestrator.robot_writer.write_merged_data_model = MagicMock() # Mock pabot exit code EXIT_DATA_ERROR (for example no tests matched --include/--exclude filters) mock_pabot.return_value = EXIT_DATA_ERROR @@ -354,7 +356,6 @@ def test_verbose_flag_passed_to_pabot( self, mock_generator, mock_pabot, - mock_data_paths, mock_templates_dir, temp_output_dir, verbose, @@ -364,28 +365,15 @@ def test_verbose_flag_passed_to_pabot( ) -> None: """Test that verbose and loglevel are correctly passed to run_pabot.""" orchestrator = RobotOrchestrator( - data_paths=mock_data_paths, templates_dir=mock_templates_dir, output_dir=temp_output_dir, - merged_data_filename="merged.yaml", + merged_data={"test": "data"}, verbose=verbose, loglevel=loglevel, ) - orchestrator.robot_writer.write = MagicMock() - orchestrator.robot_writer.write_merged_data_model = MagicMock() - mock_pabot.return_value = 0 - - mock_generator_instance = MagicMock() - mock_generator_instance.generate_summary_report.return_value = ( - None, - TestResults(), + self._setup_run_tests_mocks( + mock_generator, mock_pabot, orchestrator, temp_output_dir ) - mock_generator.return_value = mock_generator_instance - - robot_results_dir = temp_output_dir / ROBOT_RESULTS_DIRNAME - robot_results_dir.mkdir() - for filename in [LOG_HTML, OUTPUT_XML, REPORT_HTML, XUNIT_XML]: - (robot_results_dir / filename).write_text(f"Mock {filename}") orchestrator.run_tests() @@ -413,7 +401,6 @@ def test_include_exclude_tags_passed_to_pabot( self, mock_generator, mock_pabot, - mock_data_paths, mock_templates_dir, temp_output_dir, include_tags, @@ -421,28 +408,15 @@ def test_include_exclude_tags_passed_to_pabot( ) -> None: """Test that include/exclude tags are correctly passed through to run_pabot.""" orchestrator = RobotOrchestrator( - data_paths=mock_data_paths, templates_dir=mock_templates_dir, output_dir=temp_output_dir, - merged_data_filename="merged.yaml", + merged_data={"test": "data"}, include_tags=include_tags, exclude_tags=exclude_tags, ) - orchestrator.robot_writer.write = MagicMock() - orchestrator.robot_writer.write_merged_data_model = MagicMock() - mock_pabot.return_value = 0 - - mock_generator_instance = MagicMock() - mock_generator_instance.generate_summary_report.return_value = ( - None, - TestResults(), + self._setup_run_tests_mocks( + mock_generator, mock_pabot, orchestrator, temp_output_dir ) - mock_generator.return_value = mock_generator_instance - - robot_results_dir = temp_output_dir / ROBOT_RESULTS_DIRNAME - robot_results_dir.mkdir() - for filename in [LOG_HTML, OUTPUT_XML, REPORT_HTML, XUNIT_XML]: - (robot_results_dir / filename).write_text(f"Mock {filename}") orchestrator.run_tests() diff --git a/tests/unit/robot/test_robot_writer.py b/tests/unit/robot/test_robot_writer.py new file mode 100644 index 00000000..528071df --- /dev/null +++ b/tests/unit/robot/test_robot_writer.py @@ -0,0 +1,99 @@ +# SPDX-License-Identifier: MPL-2.0 +# Copyright (c) 2025 Daniel Schmidt + +# mypy: disable-error-code="no-untyped-def" + +"""Unit tests for RobotWriter. + +Covers: +- render_template: uses self.data as template context by default +- render_template: custom_data overrides self.data when provided +- render_template: output file and parent directories are created +""" + +from pathlib import Path + +import pytest +from jinja2 import Environment, FileSystemLoader + +from nac_test.robot.robot_writer import RobotWriter + +# --------------------------------------------------------------------------- +# Shared fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def templates_dir(tmp_path: Path) -> Path: + """Temporary directory with a minimal Jinja2 robot template.""" + (tmp_path / "simple.robot").write_text( + "*** Test Cases ***\nTest {{ device }}\n Log ok\n" + ) + return tmp_path + + +@pytest.fixture +def jinja_env(templates_dir: Path) -> Environment: + """Jinja2 environment rooted at templates_dir.""" + return Environment(loader=FileSystemLoader(str(templates_dir))) # nosec B701 + + +@pytest.fixture +def writer() -> RobotWriter: + """RobotWriter with a simple data dict matching the simple.robot template.""" + return RobotWriter( + merged_data={"device": "Router1", "vlan": 100}, + filters_path=None, + tests_path=None, + ) + + +# --------------------------------------------------------------------------- +# render_template tests +# --------------------------------------------------------------------------- + + +class TestRobotWriterRenderTemplate: + """Tests for RobotWriter.render_template().""" + + def test_renders_using_self_data( + self, writer: RobotWriter, jinja_env: Environment, tmp_path: Path + ) -> None: + """render_template uses self.data as context when no custom_data is given.""" + output = tmp_path / "out.robot" + writer.render_template( + template_path=Path("simple.robot"), + output_path=output, + env=jinja_env, + ) + content = output.read_text() + assert "Router1" in content + assert "{{" not in content + + def test_custom_data_overrides_self_data( + self, writer: RobotWriter, jinja_env: Environment, tmp_path: Path + ) -> None: + """custom_data replaces self.data entirely as the template context.""" + output = tmp_path / "out.robot" + writer.render_template( + template_path=Path("simple.robot"), + output_path=output, + env=jinja_env, + custom_data={"device": "Switch99"}, + ) + content = output.read_text() + assert "Switch99" in content + assert "Router1" not in content + + def test_creates_output_file_and_parent_directories( + self, writer: RobotWriter, jinja_env: Environment, tmp_path: Path + ) -> None: + """render_template creates the output file and any missing parent directories.""" + output = tmp_path / "a" / "b" / "out.robot" + writer.render_template( + template_path=Path("simple.robot"), + output_path=output, + env=jinja_env, + ) + assert output.exists() + assert output.parent.is_dir() diff --git a/tests/unit/test_combined_orchestrator_controller.py b/tests/unit/test_combined_orchestrator_controller.py index cd040ada..9b4c14b7 100644 --- a/tests/unit/test_combined_orchestrator_controller.py +++ b/tests/unit/test_combined_orchestrator_controller.py @@ -51,7 +51,6 @@ def test_controller_type_is_none_after_init( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", ) # Controller detection is now deferred to run_tests() @@ -76,7 +75,6 @@ def test_controller_detected_during_run_tests( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", dev_pyats_only=True, ) @@ -125,7 +123,6 @@ def test_detection_failure_continues_with_preflight_failure( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", dev_pyats_only=True, ) @@ -190,7 +187,6 @@ def test_combined_orchestrator_passes_controller_to_pyats( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", dev_pyats_only=True, # Run PyATS only mode ) @@ -235,7 +231,6 @@ def test_combined_orchestrator_passes_controller_to_pyats( data_paths=[data_dir], test_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", minimal_reports=False, custom_testbed_path=None, controller_type="SDWAN", @@ -291,7 +286,6 @@ def test_render_only_mode_does_not_instantiate_pyats_orchestrator( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", render_only=True, # Critical: render-only mode ) @@ -362,7 +356,6 @@ def test_combined_orchestrator_production_mode_passes_controller( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", ) # Controller type should be None after init (deferred to run_tests) @@ -404,7 +397,6 @@ def test_combined_orchestrator_production_mode_passes_controller( data_paths=[data_dir], test_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", minimal_reports=False, custom_testbed_path=None, controller_type="CC", diff --git a/tests/unit/test_combined_orchestrator_flow.py b/tests/unit/test_combined_orchestrator_flow.py index c496dd29..19d886cd 100644 --- a/tests/unit/test_combined_orchestrator_flow.py +++ b/tests/unit/test_combined_orchestrator_flow.py @@ -75,7 +75,6 @@ def orchestrator(self, tmp_path: Path) -> CombinedOrchestrator: data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", ) @pytest.fixture @@ -294,7 +293,6 @@ def test_dev_pyats_only_skips_robot( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", dev_pyats_only=True, ) @@ -351,7 +349,6 @@ def test_dev_robot_only_skips_pyats( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", dev_robot_only=True, ) @@ -407,7 +404,6 @@ def test_dev_pyats_only_generates_dashboard( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", dev_pyats_only=True, ) @@ -463,7 +459,6 @@ def test_render_only_skips_dashboard_generation( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", render_only=True, ) @@ -510,7 +505,6 @@ def test_render_only_converts_robot_exception_to_results( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", render_only=True, ) @@ -555,7 +549,6 @@ def test_non_render_only_catches_robot_exception( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", render_only=False, # Not render-only ) @@ -728,7 +721,6 @@ def test_print_execution_summary_not_called_in_render_only( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", render_only=True, ) diff --git a/tests/unit/test_combined_orchestrator_python311.py b/tests/unit/test_combined_orchestrator_python311.py index c22e52b5..9536b73b 100644 --- a/tests/unit/test_combined_orchestrator_python311.py +++ b/tests/unit/test_combined_orchestrator_python311.py @@ -39,7 +39,6 @@ def _make_orchestrator( data_paths=[data_dir], templates_dir=templates_dir, output_dir=output_dir, - merged_data_filename="merged.yaml", dev_pyats_only=dev_pyats_only, ) diff --git a/tests/unit/test_data_merger.py b/tests/unit/test_data_merger.py new file mode 100644 index 00000000..5d2d32ed --- /dev/null +++ b/tests/unit/test_data_merger.py @@ -0,0 +1,48 @@ +# SPDX-License-Identifier: MPL-2.0 +# Copyright (c) 2025 Daniel Schmidt + +"""Unit tests for DataMerger. + +Covers: +- merge_data_files: empty input edge case +- write_merged_data_model: output filename, YAML roundtrip +""" + +from pathlib import Path + +from nac_yaml import yaml + +from nac_test.data_merger import DataMerger + + +class TestMergeDataFiles: + """Tests for DataMerger.merge_data_files().""" + + def test_merge_empty_list_returns_empty_dict(self) -> None: + """An empty path list returns an empty dict rather than raising.""" + result = DataMerger.merge_data_files([]) + assert result == {} + + +class TestWriteMergedDataModel: + """Tests for DataMerger.write_merged_data_model().""" + + def test_returns_path_to_written_file(self, tmp_path: Path) -> None: + """write_merged_data_model returns the path of the file it created.""" + returned = DataMerger.write_merged_data_model({"key": "value"}, tmp_path) + assert returned == DataMerger.merged_data_path(tmp_path) + assert returned.exists() + + def test_writes_no_extra_files(self, tmp_path: Path) -> None: + """Exactly one file is created in the output directory.""" + DataMerger.write_merged_data_model({"key": "value"}, tmp_path) + assert len(list(tmp_path.iterdir())) == 1 + + def test_roundtrip_preserves_content(self, tmp_path: Path) -> None: + """Data written to YAML can be read back with the same structure.""" + original = {"host": "router1", "vlan": 100, "tags": ["a", "b"]} + output_path = DataMerger.write_merged_data_model(original, tmp_path) + reloaded = yaml.load_yaml_files([output_path]) + assert reloaded["host"] == "router1" + assert reloaded["vlan"] == 100 + assert list(reloaded["tags"]) == ["a", "b"] diff --git a/tests/unit/utils/test_cleanup.py b/tests/unit/utils/test_cleanup.py new file mode 100644 index 00000000..c5700ee7 --- /dev/null +++ b/tests/unit/utils/test_cleanup.py @@ -0,0 +1,383 @@ +# SPDX-License-Identifier: MPL-2.0 +# Copyright (c) 2025 Daniel Schmidt + +"""Unit tests for CleanupManager.""" + +from __future__ import annotations + +import signal +import threading +from collections.abc import Generator +from pathlib import Path +from typing import Any +from unittest.mock import patch + +import pytest + +from nac_test.core.constants import IS_WINDOWS +from nac_test.utils.cleanup import CleanupManager, get_cleanup_manager + + +@pytest.fixture +def fresh_cleanup_manager() -> Generator[CleanupManager, None, None]: + """Create a fresh CleanupManager instance isolated from the global singleton.""" + with ( + patch.object(CleanupManager, "_instance", None), + patch.object(CleanupManager, "_initialized", False), + patch("atexit.register"), + ): + cm = CleanupManager() + cm._files.clear() + cm._cleanup_done = False + yield cm + + +class TestCleanupManagerSingleton: + """Tests for CleanupManager singleton behavior.""" + + def test_multiple_calls_return_same_instance( + self, fresh_cleanup_manager: CleanupManager + ) -> None: + cm2 = CleanupManager() + assert fresh_cleanup_manager is cm2 + + def test_get_cleanup_manager_returns_singleton( + self, fresh_cleanup_manager: CleanupManager + ) -> None: + assert get_cleanup_manager() is fresh_cleanup_manager + + +class TestCleanupManagerRegistration: + """Tests for file registration and unregistration.""" + + def test_register_adds_path_with_default_flag_false( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + """register() without keep_if_debug adds the path and stores False (always delete).""" + test_file = tmp_path / "test.txt" + test_file.touch() + + fresh_cleanup_manager.register(test_file) + + assert test_file.resolve() in fresh_cleanup_manager._files + assert fresh_cleanup_manager._files[test_file.resolve()] is False + + def test_register_keep_if_debug_stores_true( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + """register(keep_if_debug=True) stores True for the path.""" + test_file = tmp_path / "test.txt" + test_file.touch() + + fresh_cleanup_manager.register(test_file, keep_if_debug=True) + + assert fresh_cleanup_manager._files[test_file.resolve()] is True + + def test_unregister_prevents_deletion( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + """A file removed from the registry is not deleted on cleanup.""" + test_file = tmp_path / "test.txt" + test_file.touch() + + fresh_cleanup_manager.register(test_file) + fresh_cleanup_manager.unregister(test_file) + fresh_cleanup_manager.run_cleanup() + + assert test_file.exists() + + @pytest.mark.parametrize("keep_if_debug", [False, True]) + def test_unregister_removes_path( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path, keep_if_debug: bool + ) -> None: + """unregister() removes the path regardless of keep_if_debug flag.""" + test_file = tmp_path / "test.txt" + test_file.touch() + + fresh_cleanup_manager.register(test_file, keep_if_debug=keep_if_debug) + fresh_cleanup_manager.unregister(test_file) + + assert test_file.resolve() not in fresh_cleanup_manager._files + + def test_unregister_nonexistent_path_is_safe( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + nonexistent = tmp_path / "nonexistent.txt" + fresh_cleanup_manager.unregister(nonexistent) # Should not raise + + +class TestCleanupManagerCleanup: + """Tests for cleanup behavior.""" + + def test_cleanup_removes_registered_files( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + test_file = tmp_path / "sensitive_data.yaml" + test_file.write_text("secret: password123") + + fresh_cleanup_manager.register(test_file) + fresh_cleanup_manager.run_cleanup() + + assert not test_file.exists() + + def test_cleanup_is_idempotent( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + test_file = tmp_path / "test.txt" + test_file.touch() + + fresh_cleanup_manager.register(test_file) + fresh_cleanup_manager.run_cleanup() + + assert not test_file.exists() # deleted on first call + assert fresh_cleanup_manager._cleanup_done + + fresh_cleanup_manager.run_cleanup() # second call must not raise + assert fresh_cleanup_manager._cleanup_done + + def test_cleanup_handles_already_deleted_file( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + test_file = tmp_path / "test.txt" + test_file.touch() + + fresh_cleanup_manager.register(test_file) + test_file.unlink() # Delete before cleanup + + fresh_cleanup_manager.run_cleanup() # Should not raise + + def test_cleanup_continues_after_single_file_error( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + file1 = tmp_path / "file1.txt" + file2 = tmp_path / "file2.txt" + file1.touch() + file2.touch() + + fresh_cleanup_manager.register(file1) + fresh_cleanup_manager.register(file2) + + # Make file1 a directory so unlink fails + file1.unlink() + file1.mkdir() + + fresh_cleanup_manager.run_cleanup() + + # file2 should still be cleaned up despite file1 error + assert not file2.exists() + # Cleanup completed + assert fresh_cleanup_manager._cleanup_done + + +class TestCleanupManagerSkipIfDebug: + """Tests for keep_if_debug behaviour during cleanup.""" + + @pytest.mark.parametrize( + ("debug_mode", "expected_exists"), + [ + (False, False), # debug off → file is deleted + (True, True), # debug on → file is kept + ], + ids=["debug_off_deletes", "debug_on_keeps"], + ) + def test_keep_if_debug_respects_debug_mode( + self, + fresh_cleanup_manager: CleanupManager, + tmp_path: Path, + debug_mode: bool, + expected_exists: bool, + ) -> None: + """Files registered with keep_if_debug=True are kept iff NAC_TEST_DEBUG is set.""" + test_file = tmp_path / "job.py" + test_file.touch() + + fresh_cleanup_manager.register(test_file, keep_if_debug=True) + + with patch("nac_test.utils.cleanup.DEBUG_MODE", debug_mode): + fresh_cleanup_manager.run_cleanup() + + assert test_file.exists() is expected_exists + + def test_normal_files_always_deleted_regardless_of_debug( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + """Files registered without keep_if_debug are always deleted, even in debug mode.""" + test_file = tmp_path / "sensitive.yaml" + test_file.touch() + + fresh_cleanup_manager.register(test_file) + + with patch("nac_test.utils.cleanup.DEBUG_MODE", True): + fresh_cleanup_manager.run_cleanup() + + assert not test_file.exists() + + def test_mixed_registration_in_debug_mode( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + """In debug mode, sensitive files are deleted but debug-skipped files are kept.""" + sensitive = tmp_path / "merged_data.yaml" + debug_file = tmp_path / "job.py" + sensitive.touch() + debug_file.touch() + + fresh_cleanup_manager.register(sensitive) + fresh_cleanup_manager.register(debug_file, keep_if_debug=True) + + with patch("nac_test.utils.cleanup.DEBUG_MODE", True): + fresh_cleanup_manager.run_cleanup() + + assert not sensitive.exists() + assert debug_file.exists() + + +class TestCleanupManagerThreadSafety: + """Tests for thread-safe behavior.""" + + def test_concurrent_register_and_cleanup( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + """Concurrent register() and run_cleanup() calls must not lose files silently. + + The real risk is a file registered while cleanup is iterating _files, + causing either a RuntimeError (dict changed size) or a silently + skipped deletion. This test creates files in parallel with cleanup + and asserts that every file ends up either deleted (cleanup won the + race) or still tracked and then cleaned up explicitly — nothing is + silently lost. + """ + num_threads = 10 + files_per_thread = 10 + all_files: list[Path] = [] + errors: list[Exception] = [] + + def register_files(thread_id: int) -> None: + try: + for i in range(files_per_thread): + f = tmp_path / f"thread{thread_id}_file{i}.txt" + f.touch() + all_files.append(f) + fresh_cleanup_manager.register(f) + except Exception as e: + errors.append(e) + + threads = [ + threading.Thread(target=register_files, args=(i,)) + for i in range(num_threads) + ] + + for t in threads: + t.start() + + # Trigger cleanup mid-registration to exercise lock contention + fresh_cleanup_manager.run_cleanup() + + for t in threads: + t.join() + + assert not errors, f"Exceptions in registration threads: {errors}" + + # Any file not yet deleted must still be tracked; clean up remaining + for f in all_files: + if f.exists(): + assert f.resolve() in fresh_cleanup_manager._files, ( + f"{f.name} still exists but is no longer tracked — silently lost" + ) + + # Reset idempotency guard so we can clean up remaining files + fresh_cleanup_manager._cleanup_done = False + fresh_cleanup_manager.run_cleanup() + + assert all(not f.exists() for f in all_files), ( + "Some files were neither deleted by cleanup nor cleaned up on retry" + ) + + +@pytest.mark.skipif(IS_WINDOWS, reason="Signal tests not supported on Windows") +class TestCleanupManagerSignalHandlers: + """Tests for signal handler behavior (Unix only).""" + + def test_sigint_cleans_up_and_raises_keyboard_interrupt( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + test_file = tmp_path / "test.txt" + test_file.touch() + fresh_cleanup_manager.register(test_file) + + with patch("signal.signal"), pytest.raises(KeyboardInterrupt): + fresh_cleanup_manager._signal_handler(signal.SIGINT, None) + + assert not test_file.exists() + + def test_sigterm_cleans_up_and_reraises_signal( + self, fresh_cleanup_manager: CleanupManager, tmp_path: Path + ) -> None: + test_file = tmp_path / "test.txt" + test_file.touch() + fresh_cleanup_manager.register(test_file) + + with patch("signal.signal"), patch("signal.raise_signal") as mock_raise: + fresh_cleanup_manager._signal_handler(signal.SIGTERM, None) + mock_raise.assert_called_once_with(signal.SIGTERM) + + assert not test_file.exists() + + @pytest.mark.parametrize( + ("signum", "original_handler", "expected_restored"), + [ + # SIG_DFL (value 0) is falsy — must use `is not None`, not truthiness check + (signal.SIGTERM, signal.SIG_DFL, signal.SIG_DFL), + # SIG_IGN (value 1) is truthy but should still be restored correctly + (signal.SIGTERM, signal.SIG_IGN, signal.SIG_IGN), + # None means no prior handler was recorded — fall back to SIG_DFL + (signal.SIGTERM, None, signal.SIG_DFL), + ], + ids=["SIG_DFL_restored", "SIG_IGN_restored", "None_falls_back_to_SIG_DFL"], + ) + def test_signal_handler_restores_original_handler( + self, + fresh_cleanup_manager: CleanupManager, + signum: int, + original_handler: Any, + expected_restored: Any, + ) -> None: + """Handler restores the original signal disposition, including falsy SIG_DFL.""" + if signum == signal.SIGTERM: + fresh_cleanup_manager._original_sigterm = original_handler + else: + fresh_cleanup_manager._original_sigint = original_handler + + restored: list[Any] = [] + + def capture_signal(sig: int, handler: Any) -> None: + restored.append((sig, handler)) + + with ( + patch("signal.signal", side_effect=capture_signal), + patch("signal.raise_signal"), + ): + fresh_cleanup_manager._signal_handler(signum, None) + + assert restored == [(signum, expected_restored)] + + +class TestCleanupManagerIntegration: + """Integration tests using the real singleton.""" + + def test_multiple_registrations_share_state(self, tmp_path: Path) -> None: + cm = get_cleanup_manager() + initial_count = len(cm._files) + + file1 = tmp_path / "module_a.txt" + file2 = tmp_path / "module_b.txt" + file1.touch() + file2.touch() + + cm.register(file1) + cm.register(file2) + + assert len(cm._files) == initial_count + 2 + + # Clean up test files from singleton + cm.unregister(file1) + cm.unregister(file2)