
Improves pipe cleanup for terminated scheduler jobs#868

Open
calvinp0 wants to merge 4 commits into main from pipe_job_cancel

Conversation

@calvinp0
Member

Enhances pipe reconciliation to address scenarios where the underlying scheduler job for a pipe run terminates unexpectedly or is cancelled. This ensures tasks associated with a defunct scheduler job are correctly handled, preventing orphaned or stuck tasks and allowing pipe runs to reach a terminal state sooner.

  • Detects terminated scheduler jobs: The main scheduler now passes active cluster job IDs to the pipe coordinator, which then determines if a pipe's primary scheduler job is still running on the cluster.
  • Reconciles active tasks:
    • If a scheduler job is no longer alive, any currently CLAIMED or RUNNING tasks are immediately marked as ORPHANED (and subsequently failed), as their workers are presumed gone.
    • Any PENDING tasks associated with a terminated scheduler job are CANCELLED, as no workers will ever claim them.
  • Increases worker robustness: Improves the pipe worker's resilience by handling failures during scratch directory creation or task specification loading, ensuring tasks transition to a FAILED state rather than getting stuck.
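The detection and reconciliation steps above can be sketched roughly as follows. This is a minimal illustration, not ARC's actual code: `TaskState`, `is_scheduler_job_alive`, and `reconcile_tasks` are assumed names, and the real implementation works against on-disk task state rather than an in-memory dict.

```python
from enum import Enum


class TaskState(Enum):
    PENDING = 'pending'
    CLAIMED = 'claimed'
    RUNNING = 'running'
    ORPHANED = 'orphaned'
    CANCELLED = 'cancelled'


def is_scheduler_job_alive(scheduler_job_id, server_job_ids):
    """Match a stored base job ID against active queue entries.

    PBS array jobs store a base ID like '4018898[]' while the queue lists
    elements like '4018898[0]', so we match on the numeric prefix.
    """
    if server_job_ids is None or scheduler_job_id is None:
        return True  # Cannot determine; optimistically assume alive.
    base = scheduler_job_id.rstrip('[]')
    return any(jid == base or jid.startswith(base + '[') for jid in server_job_ids)


def reconcile_tasks(tasks, scheduler_job_alive):
    """Orphan in-flight tasks and cancel pending ones once the job is gone."""
    if scheduler_job_alive:
        return tasks
    next_state = {
        TaskState.CLAIMED: TaskState.ORPHANED,   # worker is presumed gone
        TaskState.RUNNING: TaskState.ORPHANED,   # worker is presumed gone
        TaskState.PENDING: TaskState.CANCELLED,  # no worker will ever claim it
    }
    return {tid: next_state.get(state, state) for tid, state in tasks.items()}
```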

Copilot AI review requested due to automatic review settings April 12, 2026 12:57

Copilot AI left a comment


Pull request overview

Enhances pipe-mode cleanup so pipe runs can reach terminal states promptly when their underlying scheduler (PBS/Slurm) job terminates unexpectedly, and improves worker robustness so tasks don’t get stuck.

Changes:

  • Passes active cluster job IDs from the main scheduler into the pipe coordinator to detect when a pipe’s scheduler job has left the queue.
  • Updates pipe run reconciliation to orphan in-flight tasks and cancel unreachable pending tasks when the scheduler job is gone.
  • Hardens the pipe worker around spec loading / scratch directory creation failures and adds targeted regression tests.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
arc/scripts/pipe_worker.py Guards scratch/spec setup and ensures cleanup paths don’t crash when initialization fails.
arc/scripts/pipe_worker_test.py Adds a regression test for scratch directory creation failure.
arc/scheduler.py Passes server_job_ids into PipeCoordinator.poll_pipes() during the main loop.
arc/job/pipe/pipe_run.py Adds scheduler_job_alive input and cancels PENDING tasks when the scheduler job is gone.
arc/job/pipe/pipe_run_test.py Adds tests for “scheduler job gone” orphaning/cancellation behavior.
arc/job/pipe/pipe_coordinator.py Adds scheduler-job liveness detection and threads it into reconciliation.
arc/job/pipe/pipe_coordinator_test.py Adds tests for job liveness detection and poll integration.


scheduler_job_alive: Whether the scheduler (PBS/Slurm) job for this
pipe run is still present in the queue. When False, any
CLAIMED/RUNNING tasks are immediately orphaned (the workers
are gone) and unreachable PENDING tasks are failed.

Copilot AI Apr 12, 2026


Docstring mismatch: it says unreachable PENDING tasks are “failed”, but the implementation below transitions them to CANCELLED. Please update the docstring to reflect the actual behavior (or adjust the implementation if “failed” is the intended terminal state).

Suggested change
are gone) and unreachable PENDING tasks are failed.
are gone) and unreachable PENDING tasks are transitioned to
CANCELLED.

Comment on lines +380 to +384
# If the scheduler job is gone, any PENDING tasks will never be
# claimed. Cancel them so the pipe can finish.
if not scheduler_job_alive and counts[TaskState.PENDING.value] > 0:
    for task_id in task_ids:
        if not os.path.isdir(os.path.join(tasks_dir, task_id)):

Copilot AI Apr 12, 2026


When scheduler_job_alive is False, CLAIMED/RUNNING tasks are first marked ORPHANED, but later the retry logic can promote ORPHANED to PENDING (when attempts remain) and this block then immediately CANCELs those PENDING tasks. That results in in-flight work being recorded as CANCELLED rather than FAILED_* and contradicts the stated intent of “orphaned (and subsequently failed)”. Consider skipping the ORPHANED→PENDING retry promotion when scheduler_job_alive is False and instead failing ORPHANED tasks terminally (or otherwise preserving a failure outcome) so job-loss cleanups don’t convert failures into cancellations.
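One way to realize the fix this comment suggests can be sketched as follows; the state strings and the attempts model are assumptions for illustration, not ARC's actual retry machinery:

```python
def resolve_orphaned(state: str, attempts_left: int, scheduler_job_alive: bool) -> str:
    """Decide the next state for a task during reconciliation.

    Gates the ORPHANED -> PENDING retry promotion on the scheduler job still
    being alive, so a job-loss cleanup ends in a failure state instead of the
    task being promoted to PENDING and then immediately cancelled.
    """
    if state != 'ORPHANED':
        return state
    if not scheduler_job_alive:
        # No workers will ever claim a retried task: fail terminally rather
        # than converting in-flight work into a cancellation.
        return 'FAILED_TERMINAL'
    return 'PENDING' if attempts_left > 0 else 'FAILED_TERMINAL'
```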

Comment on lines +198 to +209
For PBS/Slurm array jobs the stored ``scheduler_job_id`` is the base
ID (e.g. ``'4018898[]'``), while the queue lists individual elements
(``'4018898[0]'``, …). We match on the numeric prefix so both
formats are recognised.

Returns True (optimistic) when *server_job_ids* is unavailable.
"""
    if server_job_ids is None or pipe.scheduler_job_id is None:
        return True  # Cannot determine — assume alive.
    base = pipe.scheduler_job_id.rstrip('[]')
    return any(jid == base or jid.startswith(base + '[') for jid in server_job_ids)


Copilot AI Apr 12, 2026


_is_scheduler_job_alive does not currently recognize Slurm array job IDs as reported by squeue (commonly 12345_7 or 12345_[1-10]). With a base scheduler_job_id of 12345, this will incorrectly return False and trigger premature orphan/cancel cleanup for still-running pipes. Extend the match logic to also treat IDs starting with base + '_' (and any other formats produced by your check_status_command) as alive, and add a unit test for the Slurm format.

Suggested change
    For PBS/Slurm array jobs the stored ``scheduler_job_id`` is the base
    ID (e.g. ``'4018898[]'``), while the queue lists individual elements
    (``'4018898[0]'``, …). We match on the numeric prefix so both
    formats are recognised.

    Returns True (optimistic) when *server_job_ids* is unavailable.
    """
    if server_job_ids is None or pipe.scheduler_job_id is None:
        return True  # Cannot determine — assume alive.
    base = pipe.scheduler_job_id.rstrip('[]')
    return any(jid == base or jid.startswith(base + '[') for jid in server_job_ids)

    For PBS/Slurm array jobs the stored ``scheduler_job_id`` is typically
    the base ID (e.g. ``'4018898[]'`` or ``'12345'``), while the queue may
    list individual elements or ranges (e.g. ``'4018898[0]'``,
    ``'12345_7'``, ``'12345_[1-10]'``). We therefore treat exact matches
    and scheduler-specific array suffix forms for the same base ID as
    alive.

    Regression examples:

    >>> class _Pipe:
    ...     scheduler_job_id = '12345'
    >>> PipeCoordinator._is_scheduler_job_alive(_Pipe(), ['12345_7'])
    True
    >>> PipeCoordinator._is_scheduler_job_alive(_Pipe(), ['12345_[1-10]'])
    True

    Returns True (optimistic) when *server_job_ids* is unavailable.
    """
    if server_job_ids is None or pipe.scheduler_job_id is None:
        return True  # Cannot determine — assume alive.
    base = pipe.scheduler_job_id.rstrip('[]')

    def _matches_scheduler_job_id(job_id: str) -> bool:
        return (
            job_id == base
            or job_id.startswith(base + '[')
            or job_id.startswith(base + '_')
        )

    return any(_matches_scheduler_job_id(jid) for jid in server_job_ids)

Comment on lines +312 to +315
def test_scratch_creation_failure_marks_failed(self):
    """If tempfile.mkdtemp fails (e.g., I/O error), the task is properly
    marked FAILED_RETRYABLE instead of being left stuck in RUNNING."""
    spec = _make_h2o_spec('io_fail')

Copilot AI Apr 12, 2026


This test’s docstring claims the task should be marked FAILED_RETRYABLE, but the assertion allows FAILED_TERMINAL too. Since initialize_task() defaults to multiple attempts, the test should assert the specific expected state (and/or assert it’s not left in RUNNING) so it actually guards the intended behavior.
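A stricter assertion along the lines the reviewer asks for might look like this; `transition_after_scratch_failure` is a hypothetical stand-in for the worker's failure handling, since the real test would exercise the worker itself:

```python
import unittest


def transition_after_scratch_failure(attempts_remaining: int) -> str:
    """Hypothetical stand-in: the worker's state transition when mkdtemp fails."""
    return 'FAILED_RETRYABLE' if attempts_remaining > 0 else 'FAILED_TERMINAL'


class ScratchFailureTest(unittest.TestCase):
    def test_scratch_creation_failure_marks_failed_retryable(self):
        state = transition_after_scratch_failure(attempts_remaining=2)
        # Pin the specific expected state rather than accepting any FAILED_*.
        self.assertEqual(state, 'FAILED_RETRYABLE')
        # And guard against the bug the docstring describes: a stuck task.
        self.assertNotEqual(state, 'RUNNING')
```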

Comment on lines +216 to 220
result['parser_summary'] = ess_info
if ess_info['status'] != 'done' and _is_deterministic_ess_error(ess_info):
result['failure_class'] = 'ess_error'
is_deterministic_ess = True
write_result_json(attempt_dir, result)

Copilot AI Apr 12, 2026


When a deterministic ESS error is detected on the exception path, this block overwrites result['failure_class'] to 'ess_error', but the later update_task_state(..., failure_class=...) call still persists the original Python exception type. This makes state.json inconsistent with result.json and can mislead downstream diagnostics. Use the same failure_class value for the state update as the one written into result.json (e.g., result['failure_class']).
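The consistency fix the comment asks for can be sketched as deriving the failure class once and reusing it for both records; the function names and dict shapes here are assumptions, not the worker's actual API:

```python
def classify_failure(ess_info: dict, exc: BaseException) -> str:
    """Pick one failure class for both result.json and state.json."""
    if ess_info.get('status') != 'done' and ess_info.get('deterministic'):
        return 'ess_error'
    return type(exc).__name__


def record_failure(result: dict, ess_info: dict, exc: BaseException) -> dict:
    failure_class = classify_failure(ess_info, exc)
    result['failure_class'] = failure_class  # written to result.json
    # Reuse the identical value for the state update instead of the raw
    # exception type, so state.json stays consistent with result.json.
    return {'state': 'FAILED', 'failure_class': failure_class}
```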

  • Only flag resubmission for genuinely retried tasks (attempt_index > 0).
  • Fresh PENDING tasks (attempt_index == 0) are waiting for the initial submission's workers to start — don't resubmit for those.
  • Crucially: if fresh_pending > 0, scheduler workers are still queued (PBS Q state) and will claim retried tasks when they start.
  • After a resubmission, allow a grace period for workers to start before flagging again (prevents duplicate submissions).
@codecov

codecov bot commented Apr 12, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 60.18%. Comparing base (960197e) to head (6765d51).
⚠️ Report is 1 commit behind head on main.

Additional details and impacted files
@@            Coverage Diff             @@
##             main     #868      +/-   ##
==========================================
+ Coverage   60.09%   60.18%   +0.08%     
==========================================
  Files         102      102              
  Lines       31045    31056      +11     
  Branches     8087     8090       +3     
==========================================
+ Hits        18658    18692      +34     
+ Misses      10071    10048      -23     
  Partials     2316     2316              
Flag Coverage Δ
functionaltests 60.18% <ø> (+0.08%) ⬆️
unittests 60.18% <ø> (+0.08%) ⬆️

Flags with carried forward coverage won't be shown.
