Improves pipe cleanup for terminated scheduler jobs #868
Conversation
Pull request overview
Enhances pipe-mode cleanup so pipe runs can reach terminal states promptly when their underlying scheduler (PBS/Slurm) job terminates unexpectedly, and improves worker robustness so tasks don’t get stuck.
Changes:
- Passes active cluster job IDs from the main scheduler into the pipe coordinator to detect when a pipe’s scheduler job has left the queue.
- Updates pipe run reconciliation to orphan in-flight tasks and cancel unreachable pending tasks when the scheduler job is gone.
- Hardens the pipe worker around spec loading / scratch directory creation failures and adds targeted regression tests.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.
| File | Description |
|---|---|
| arc/scripts/pipe_worker.py | Guards scratch/spec setup and ensures cleanup paths don't crash when initialization fails. |
| arc/scripts/pipe_worker_test.py | Adds a regression test for scratch directory creation failure. |
| arc/scheduler.py | Passes `server_job_ids` into `PipeCoordinator.poll_pipes()` during the main loop. |
| arc/job/pipe/pipe_run.py | Adds a `scheduler_job_alive` input and cancels PENDING tasks when the scheduler job is gone. |
| arc/job/pipe/pipe_run_test.py | Adds tests for "scheduler job gone" orphaning/cancellation behavior. |
| arc/job/pipe/pipe_coordinator.py | Adds scheduler-job liveness detection and threads it into reconciliation. |
| arc/job/pipe/pipe_coordinator_test.py | Adds tests for job liveness detection and poll integration. |
**arc/job/pipe/pipe_run.py** (outdated)

```python
        scheduler_job_alive: Whether the scheduler (PBS/Slurm) job for this
            pipe run is still present in the queue. When False, any
            CLAIMED/RUNNING tasks are immediately orphaned (the workers
            are gone) and unreachable PENDING tasks are failed.
```
Docstring mismatch: it says unreachable PENDING tasks are “failed”, but the implementation below transitions them to CANCELLED. Please update the docstring to reflect the actual behavior (or adjust the implementation if “failed” is the intended terminal state).
Suggested change:

```diff
-            are gone) and unreachable PENDING tasks are failed.
+            are gone) and unreachable PENDING tasks are transitioned to
+            CANCELLED.
```
**arc/job/pipe/pipe_run.py** (outdated)

```python
        # If the scheduler job is gone, any PENDING tasks will never be
        # claimed. Cancel them so the pipe can finish.
        if not scheduler_job_alive and counts[TaskState.PENDING.value] > 0:
            for task_id in task_ids:
                if not os.path.isdir(os.path.join(tasks_dir, task_id)):
```
When scheduler_job_alive is False, CLAIMED/RUNNING tasks are first marked ORPHANED, but later the retry logic can promote ORPHANED to PENDING (when attempts remain) and this block then immediately CANCELs those PENDING tasks. That results in in-flight work being recorded as CANCELLED rather than FAILED_* and contradicts the stated intent of “orphaned (and subsequently failed)”. Consider skipping the ORPHANED→PENDING retry promotion when scheduler_job_alive is False and instead failing ORPHANED tasks terminally (or otherwise preserving a failure outcome) so job-loss cleanups don’t convert failures into cancellations.
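The state machine this comment describes can be sketched as a single reconciliation step. This is a hypothetical, self-contained illustration of the suggested fix, not the PR's actual code: when `scheduler_job_alive` is `False`, `ORPHANED` tasks are failed terminally instead of being promoted back to `PENDING` (where this block would then cancel them). State names mirror the PR; the helper itself is illustrative.

```python
def reconcile_task(state: str, attempts_left: int, scheduler_job_alive: bool) -> str:
    """Return the next state for one task during reconciliation (sketch)."""
    if state in ("CLAIMED", "RUNNING") and not scheduler_job_alive:
        state = "ORPHANED"  # workers are presumed gone
    if state == "ORPHANED":
        if not scheduler_job_alive:
            # Job loss: preserve a failure outcome; skip the retry promotion
            # so the task is never converted into a CANCELLED one below.
            return "FAILED_TERMINAL"
        return "PENDING" if attempts_left > 0 else "FAILED_TERMINAL"
    if state == "PENDING" and not scheduler_job_alive:
        return "CANCELLED"  # fresh PENDING tasks will never be claimed
    return state
```

With this ordering, in-flight work lost to a dead scheduler job is always recorded as a failure, while only never-claimed tasks end up `CANCELLED`.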
**arc/job/pipe/pipe_coordinator.py** (outdated)

```python
        For PBS/Slurm array jobs the stored ``scheduler_job_id`` is the base
        ID (e.g. ``'4018898[]'``), while the queue lists individual elements
        (``'4018898[0]'``, …). We match on the numeric prefix so both
        formats are recognised.

        Returns True (optimistic) when *server_job_ids* is unavailable.
        """
        if server_job_ids is None or pipe.scheduler_job_id is None:
            return True  # Cannot determine — assume alive.
        base = pipe.scheduler_job_id.rstrip('[]')
        return any(jid == base or jid.startswith(base + '[') for jid in server_job_ids)
```
_is_scheduler_job_alive does not currently recognize Slurm array job IDs as reported by squeue (commonly 12345_7 or 12345_[1-10]). With a base scheduler_job_id of 12345, this will incorrectly return False and trigger premature orphan/cancel cleanup for still-running pipes. Extend the match logic to also treat IDs starting with base + '_' (and any other formats produced by your check_status_command) as alive, and add a unit test for the Slurm format.
Suggested change:

```diff
-        For PBS/Slurm array jobs the stored ``scheduler_job_id`` is the base
-        ID (e.g. ``'4018898[]'``), while the queue lists individual elements
-        (``'4018898[0]'``, …). We match on the numeric prefix so both
-        formats are recognised.
+        For PBS/Slurm array jobs the stored ``scheduler_job_id`` is typically
+        the base ID (e.g. ``'4018898[]'`` or ``'12345'``), while the queue may
+        list individual elements or ranges (e.g. ``'4018898[0]'``,
+        ``'12345_7'``, ``'12345_[1-10]'``). We therefore treat exact matches
+        and scheduler-specific array suffix forms for the same base ID as
+        alive.
+
+        Regression examples:
+
+        >>> class _Pipe:
+        ...     scheduler_job_id = '12345'
+        >>> PipeCoordinator._is_scheduler_job_alive(_Pipe(), ['12345_7'])
+        True
+        >>> PipeCoordinator._is_scheduler_job_alive(_Pipe(), ['12345_[1-10]'])
+        True

         Returns True (optimistic) when *server_job_ids* is unavailable.
         """
         if server_job_ids is None or pipe.scheduler_job_id is None:
             return True  # Cannot determine — assume alive.
         base = pipe.scheduler_job_id.rstrip('[]')
-        return any(jid == base or jid.startswith(base + '[') for jid in server_job_ids)
+
+        def _matches_scheduler_job_id(job_id: str) -> bool:
+            return (
+                job_id == base
+                or job_id.startswith(base + '[')
+                or job_id.startswith(base + '_')
+            )
+
+        return any(_matches_scheduler_job_id(jid) for jid in server_job_ids)
```
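The matching logic in the suggestion above can be exercised standalone, covering both the PBS bracket format and the Slurm underscore formats the reviewer flags. This is an illustrative re-implementation with a plain function signature, not the PR's actual method:

```python
def is_scheduler_job_alive(scheduler_job_id, server_job_ids):
    """Sketch of the extended liveness matcher for PBS and Slurm array IDs."""
    if server_job_ids is None or scheduler_job_id is None:
        return True  # Cannot determine -- assume alive.
    base = scheduler_job_id.rstrip("[]")
    return any(
        jid == base                       # exact match ('12345')
        or jid.startswith(base + "[")     # PBS array element ('4018898[0]')
        or jid.startswith(base + "_")     # Slurm array ('12345_7', '12345_[1-10]')
        for jid in server_job_ids
    )
```

Note that a plain prefix check (`jid.startswith(base)`) would be wrong here: job `123456` would match base `12345`. Requiring the `[` or `_` separator after the base avoids that false positive.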
```python
    def test_scratch_creation_failure_marks_failed(self):
        """If tempfile.mkdtemp fails (e.g., I/O error), the task is properly
        marked FAILED_RETRYABLE instead of being left stuck in RUNNING."""
        spec = _make_h2o_spec('io_fail')
```
This test’s docstring claims the task should be marked FAILED_RETRYABLE, but the assertion allows FAILED_TERMINAL too. Since initialize_task() defaults to multiple attempts, the test should assert the specific expected state (and/or assert it’s not left in RUNNING) so it actually guards the intended behavior.
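A tightened assertion along the lines this comment asks for might look like the following. The state names follow the PR's `TaskState` values, but the helper itself is a hypothetical sketch, not the test's real code:

```python
EXPECTED = "FAILED_RETRYABLE"  # the state the docstring promises

def assert_scratch_failure_state(observed_state: str) -> None:
    """Pin the exact expected state instead of accepting any FAILED_*."""
    assert observed_state != "RUNNING", "task must not be left stuck in RUNNING"
    assert observed_state == EXPECTED, f"expected {EXPECTED}, got {observed_state}"
```

Asserting the specific state means the test fails if initialize_task's retry accounting ever changes the outcome to `FAILED_TERMINAL`, which is exactly the regression the docstring claims to guard.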
```python
            result['parser_summary'] = ess_info
            if ess_info['status'] != 'done' and _is_deterministic_ess_error(ess_info):
                result['failure_class'] = 'ess_error'
                is_deterministic_ess = True
            write_result_json(attempt_dir, result)
```
When a deterministic ESS error is detected on the exception path, this block overwrites result['failure_class'] to 'ess_error', but the later update_task_state(..., failure_class=...) call still persists the original Python exception type. This makes state.json inconsistent with result.json and can mislead downstream diagnostics. Use the same failure_class value for the state update as the one written into result.json (e.g., result['failure_class']).
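One way to keep the two records consistent, as this comment suggests, is to derive the failure class once and reuse it for both writes. The dict-based stand-ins below are hypothetical; the PR's real `write_result_json` / `update_task_state` helpers take different arguments:

```python
def classify_failure(exc: Exception, is_deterministic_ess: bool) -> str:
    """Single source of truth for the failure class."""
    return "ess_error" if is_deterministic_ess else type(exc).__name__

def record_failure(result: dict, state: dict, exc: Exception,
                   is_deterministic_ess: bool) -> str:
    failure_class = classify_failure(exc, is_deterministic_ess)
    result["failure_class"] = failure_class  # what goes into result.json
    state["failure_class"] = failure_class   # same value into state.json
    return failure_class
```

Because both writes read the same local variable, result.json and state.json cannot diverge on the deterministic-ESS path.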
Force-pushed `20fc003` to `d7258d3`, then `d7258d3` to `6765d51`.
Only flag resubmission for genuinely retried tasks (attempt_index > 0). Fresh PENDING tasks (attempt_index == 0) are waiting for the initial submission's workers to start — don't resubmit for those. Crucially: if fresh_pending > 0, scheduler workers are still queued (PBS Q state) and will claim retried tasks when they start. After a resubmission, allow a grace period for workers to start before flagging again (prevents duplicate submissions).
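The resubmission rule described above can be condensed into one predicate. This is an illustrative sketch with hypothetical names and a made-up default grace period, not the PR's actual implementation:

```python
def should_resubmit(pending_attempt_indices, seconds_since_resubmit,
                    grace_period=300):
    """Decide whether to flag a pipe run for worker resubmission (sketch)."""
    fresh = sum(1 for i in pending_attempt_indices if i == 0)
    retried = sum(1 for i in pending_attempt_indices if i > 0)
    if fresh > 0:
        # Initial submission's workers are still queued (PBS Q state);
        # they will claim the retried tasks when they start.
        return False
    # Only genuinely retried tasks warrant resubmission, and only after the
    # grace period, so we don't stack duplicate submissions.
    return retried > 0 and seconds_since_resubmit >= grace_period
```

The grace-period check is what prevents the coordinator from flagging the same run again on the very next poll after a resubmission.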
**Codecov Report** ✅ All modified and coverable lines are covered by tests.

```
@@            Coverage Diff             @@
##             main     #868      +/-   ##
==========================================
+ Coverage   60.09%   60.18%   +0.08%
==========================================
  Files         102      102
  Lines       31045    31056      +11
  Branches     8087     8090       +3
==========================================
+ Hits        18658    18692      +34
+ Misses      10071    10048      -23
  Partials     2316     2316
```
Enhances pipe reconciliation to address scenarios where the underlying scheduler job for a pipe run terminates unexpectedly or is cancelled. This ensures tasks associated with a defunct scheduler job are correctly handled, preventing orphaned or stuck tasks and allowing pipe runs to reach a terminal state sooner.
- `CLAIMED` or `RUNNING` tasks are immediately marked as `ORPHANED` (and subsequently failed), as their workers are presumed gone.
- `PENDING` tasks associated with a terminated scheduler job are `CANCELLED`, as no workers will ever claim them.
- The pipe run can then reach a terminal `FAILED` state rather than getting stuck.