Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def handle_successful_attempt_core(
return None

validate = deps.validate_output_fn or default_validate_fn
valid = validate(output_file)
valid = _safe_validate_output(validate, output_file)
valid, grace_wait_used = _validate_with_grace_wait(
valid,
output_file=output_file,
Expand Down Expand Up @@ -77,6 +77,20 @@ def _validation_timing(deps: CodexBatchRunnerDeps) -> tuple[float, float]:
return grace_seconds, poll_seconds


def _safe_validate_output(validate: DefValidateFn, output_file: Path) -> bool:
    """Call ``validate`` on ``output_file`` and coerce its result to ``bool``.

    Any ``Exception`` raised while calling the validator or truth-testing its
    result is logged as a warning and reported as ``False`` rather than being
    allowed to propagate to the caller.
    """
    try:
        # Keep the bool() coercion inside the guard so a failing __bool__ is
        # handled the same way as a failing validator call.
        verdict = bool(validate(output_file))
    except Exception as exc:  # noqa: BLE001 - user-supplied validator must not abort batch execution
        failure_kind = exc.__class__.__name__
        logger.warning(
            "Runner output validator failed for %s (%s): %s",
            output_file,
            failure_kind,
            exc,
        )
        return False
    return verdict


def _validate_with_grace_wait(
valid: bool,
*,
Expand All @@ -100,7 +114,7 @@ def _validate_with_grace_wait(
deps.sleep_fn(sleep_for)
except (OSError, RuntimeError, ValueError, TypeError):
break
if validate(output_file):
if _safe_validate_output(validate, output_file):
return True, True
return False, True

Expand Down Expand Up @@ -128,7 +142,7 @@ def _recover_output_from_fallback_text(
exc,
)
return False
if not validate(output_file):
if not _safe_validate_output(validate, output_file):
return False
log_sections.append(
"Runner output recovered from stdout/stderr fallback text."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,35 @@ def test_handle_successful_attempt_core_recovers_from_stdout_fallback(tmp_path)
assert "recovered" in log_file.read_text(encoding="utf-8").lower()


def test_handle_successful_attempt_core_treats_validator_exception_as_invalid(tmp_path) -> None:
    """A validator that raises makes the handler return 1 and log the output as invalid."""

    def _raising_validator(_path):
        # Stand-in for a user-supplied validator that blows up on every call.
        raise KeyError("missing required field")

    def _write_text(path, text):
        return Path(path).write_text(text, encoding="utf-8")

    log_file = tmp_path / "run.log"
    output_file = tmp_path / "out.json"
    output_file.write_text('{"ok": true}\n', encoding="utf-8")

    deps = CodexBatchRunnerDeps(
        timeout_seconds=30,
        subprocess_run=object(),
        timeout_error=TimeoutError,
        safe_write_text_fn=_write_text,
        sleep_fn=lambda _seconds: None,
        validate_output_fn=_raising_validator,
        output_validation_grace_seconds=0.0,
    )
    attempt_result = _ExecutionResult(code=0, stdout_text="", stderr_text="")

    rc = runner_success_mod.handle_successful_attempt_core(
        result=attempt_result,
        output_file=output_file,
        log_file=log_file,
        deps=deps,
        log_sections=["header"],
        default_validate_fn=lambda _path: True,
        monotonic_fn=lambda: 100.0,
    )

    assert rc == 1
    logged_text = log_file.read_text(encoding="utf-8").lower()
    assert "missing or invalid" in logged_text


def test_core_models_normalized_issue_payload_round_trip() -> None:
issue = core_models_mod.NormalizedBatchIssue(
dimension="naming_quality",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import desloppify.app.commands.review.batch.prompt_template as prompt_template_mod
import desloppify.app.commands.review.runner_opencode as runner_opencode_mod
import desloppify.app.commands.review.runner_parallel as runner_helpers_mod
import desloppify.app.commands.review.runner_process_impl.attempt_success as runner_success_mod
from desloppify.app.commands.review.batch.execution import CollectBatchResultsRequest
from desloppify.app.commands.review.runner_process_impl.types import _ExecutionResult

Expand Down Expand Up @@ -60,6 +61,44 @@ def _boom() -> int:
assert any("task failed" in message for _idx, message in captured)


def test_execute_batches_parallel_validator_exception_returns_failed_index(tmp_path: Path) -> None:
    """A task whose output validator raises is reported by ``execute_batches`` as failed index 0."""

    def _exploding_validator(_path):
        # Stand-in for a user-supplied validator that raises on every call.
        raise KeyError("validator exploded")

    def _task() -> int:
        raw_output = tmp_path / "batch-1.raw.txt"
        raw_output.write_text('{"ok": true}\n', encoding="utf-8")
        batch_log = tmp_path / "batch-1.log"

        # Built inside the task so construction happens when the task runs.
        runner_deps = orchestrator_mod.CodexBatchRunnerDeps(
            timeout_seconds=30,
            subprocess_run=subprocess.run,
            timeout_error=TimeoutError,
            safe_write_text_fn=_safe_write_text,
            sleep_fn=lambda _seconds: None,
            validate_output_fn=_exploding_validator,
            output_validation_grace_seconds=0.0,
        )
        return runner_success_mod.handle_successful_attempt_core(
            result=_ExecutionResult(code=0, stdout_text="", stderr_text=""),
            output_file=raw_output,
            log_file=batch_log,
            deps=runner_deps,
            log_sections=["header"],
            default_validate_fn=lambda _path: True,
            monotonic_fn=lambda: 100.0,
        )

    parallel_options = runner_helpers_mod.BatchExecutionOptions(
        run_parallel=True,
        max_parallel_workers=2,
        heartbeat_seconds=0.01,
    )
    failures = runner_helpers_mod.execute_batches(
        tasks={0: _task, 1: lambda: 0},
        options=parallel_options,
    )

    assert failures == [0]


def test_collect_batch_results_recovers_from_log_stdout_payload(tmp_path: Path) -> None:
run_root = tmp_path / "run"
results_dir = run_root / "results"
Expand Down