def _safe_validate_output(validate: DefValidateFn, output_file: Path) -> bool:
    """Call ``validate`` on ``output_file`` and coerce its verdict to ``bool``.

    Any ``Exception`` raised by the validator (or by the bool coercion) is
    logged as a warning and reported as ``False`` instead of propagating.
    """
    try:
        verdict = validate(output_file)
        outcome = bool(verdict)
    except Exception as err:  # noqa: BLE001 - user-supplied validator must not abort batch execution
        failure_kind = err.__class__.__name__
        logger.warning(
            "Runner output validator failed for %s (%s): %s",
            output_file,
            failure_kind,
            err,
        )
        outcome = False
    return outcome
def test_handle_successful_attempt_core_treats_validator_exception_as_invalid(tmp_path) -> None:
    """Expect exit code 1 and a "missing or invalid" log entry when the validator raises."""

    def _raising_validator(_path):
        # Stand-in for a validator callback that blows up instead of returning a verdict.
        raise KeyError("missing required field")

    def _write_text(path, text):
        return Path(path).write_text(text, encoding="utf-8")

    log_file = tmp_path / "run.log"
    output_file = tmp_path / "out.json"
    # The output file exists with content; only the validator callback misbehaves.
    output_file.write_text('{"ok": true}\n', encoding="utf-8")

    deps = CodexBatchRunnerDeps(
        timeout_seconds=30,
        subprocess_run=object(),
        timeout_error=TimeoutError,
        safe_write_text_fn=_write_text,
        sleep_fn=lambda _seconds: None,
        validate_output_fn=_raising_validator,
        output_validation_grace_seconds=0.0,
    )
    execution_result = _ExecutionResult(code=0, stdout_text="", stderr_text="")

    exit_code = runner_success_mod.handle_successful_attempt_core(
        result=execution_result,
        output_file=output_file,
        log_file=log_file,
        deps=deps,
        log_sections=["header"],
        default_validate_fn=lambda _path: True,
        monotonic_fn=lambda: 100.0,
    )

    assert exit_code == 1
    log_text = log_file.read_text(encoding="utf-8")
    assert "missing or invalid" in log_text.lower()
def test_execute_batches_parallel_validator_exception_returns_failed_index(tmp_path: Path) -> None:
    """Expect ``execute_batches`` to return ``[0]`` when task 0's validator raises."""

    def _exploding_validator(_path):
        # Stand-in for a validator callback that raises instead of returning a verdict.
        raise KeyError("validator exploded")

    def _task() -> int:
        log_file = tmp_path / "batch-1.log"
        output_file = tmp_path / "batch-1.raw.txt"
        output_file.write_text('{"ok": true}\n', encoding="utf-8")

        # Deps are built inside the task, i.e. each time the task callable is invoked.
        runner_deps = orchestrator_mod.CodexBatchRunnerDeps(
            timeout_seconds=30,
            subprocess_run=subprocess.run,
            timeout_error=TimeoutError,
            safe_write_text_fn=_safe_write_text,
            sleep_fn=lambda _seconds: None,
            validate_output_fn=_exploding_validator,
            output_validation_grace_seconds=0.0,
        )
        execution_result = _ExecutionResult(code=0, stdout_text="", stderr_text="")
        return runner_success_mod.handle_successful_attempt_core(
            result=execution_result,
            output_file=output_file,
            log_file=log_file,
            deps=runner_deps,
            log_sections=["header"],
            default_validate_fn=lambda _path: True,
            monotonic_fn=lambda: 100.0,
        )

    parallel_options = runner_helpers_mod.BatchExecutionOptions(
        run_parallel=True,
        max_parallel_workers=2,
        heartbeat_seconds=0.01,
    )
    # Task 1 simply returns 0; only task 0 is expected in the returned list.
    failed_indexes = runner_helpers_mod.execute_batches(
        tasks={0: _task, 1: lambda: 0},
        options=parallel_options,
    )

    assert failed_indexes == [0]