Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion arc/job/pipe/pipe_coordinator_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ def setUpModule():
"""Enable pipe mode for all tests in this module."""
global _pipe_patches
pipe_vals = {'enabled': True, 'min_tasks': 10, 'max_workers': 100,
'max_attempts': 3, 'lease_duration_s': 86400}
'max_attempts': 3, 'lease_duration_hrs': 24}
p = patch.dict('arc.job.pipe.pipe_coordinator.pipe_settings', pipe_vals)
p.start()
_pipe_patches.append(p)
Expand Down
2 changes: 1 addition & 1 deletion arc/job/pipe/pipe_planner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def setUpModule():
"""Enable pipe mode for all tests in this module."""
global _pipe_patches
pipe_vals = {'enabled': True, 'min_tasks': 10, 'max_workers': 100,
'max_attempts': 3, 'lease_duration_s': 86400}
'max_attempts': 3, 'lease_duration_hrs': 24}
for target in ('arc.job.pipe.pipe_coordinator.pipe_settings',
'arc.job.pipe.pipe_planner.pipe_settings'):
p = patch.dict(target, pipe_vals)
Expand Down
34 changes: 27 additions & 7 deletions arc/job/pipe/pipe_run.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@

logger = get_logger()

RESUBMIT_GRACE = 120 # seconds – grace period after resubmission before flagging again

pipe_settings = settings['pipe_settings']
default_job_settings = settings['default_job_settings']
servers_dict = settings['servers']
Expand Down Expand Up @@ -297,6 +299,7 @@ def reconcile(self) -> Dict[str, int]:
now = time.time()
counts: Dict[str, int] = {s.value: 0 for s in TaskState}
retried_pending = 0 # PENDING tasks with attempt_index > 0 (genuinely retried)
fresh_pending = 0 # PENDING tasks with attempt_index == 0 (awaiting initial workers)
task_ids = sorted(os.listdir(tasks_dir))

for task_id in task_ids:
Expand All @@ -319,8 +322,11 @@ def reconcile(self) -> Dict[str, int]:
except (ValueError, TimeoutError) as e:
logger.debug(f'Could not mark task {task_id} as ORPHANED '
f'(another process may be handling it): {e}')
if current == TaskState.PENDING and state.attempt_index > 0:
retried_pending += 1
if current == TaskState.PENDING:
if state.attempt_index > 0:
retried_pending += 1
else:
fresh_pending += 1
counts[current.value] += 1

active_workers = counts[TaskState.CLAIMED.value] + counts[TaskState.RUNNING.value]
Expand Down Expand Up @@ -363,11 +369,25 @@ def reconcile(self) -> Dict[str, int]:
logger.debug(f'Could not promote task {task_id} to FAILED_TERMINAL '
f'(lock contention or concurrent state change): {e}')

# Never resubmit a new scheduler job for retried tasks.
# Workers still in the scheduler queue (PBS Q state) will claim
# retried PENDING tasks when they start. If the scheduler job
# was killed, that is a manual intervention issue.
self._needs_resubmission = False
# Only flag resubmission for genuinely retried tasks (attempt_index > 0).
# Fresh PENDING tasks (attempt_index == 0) are waiting for the initial
# submission's workers to start — don't resubmit for those.
# Crucially: if fresh_pending > 0, scheduler workers are still queued
# (PBS Q state) and will claim retried tasks when they start.
# After a resubmission, allow a grace period for workers to start before
# flagging again (prevents duplicate submissions).
time_since_submit = (now - self.submitted_at) if self.submitted_at else float('inf')
if retried_pending > 0 and active_workers == 0 \
and fresh_pending == 0 and time_since_submit > RESUBMIT_GRACE:
self._needs_resubmission = True
logger.info(f'Pipe run {self.run_id}: {retried_pending} retried tasks '
f'need workers. Resubmission needed.')
else:
if retried_pending > 0 and fresh_pending > 0:
logger.debug(f'Pipe run {self.run_id}: {retried_pending} retried tasks '
f'waiting, but {fresh_pending} fresh tasks still pending — '
f'scheduler workers still starting, skipping resubmission.')
self._needs_resubmission = False

terminal = (counts[TaskState.COMPLETED.value]
+ counts[TaskState.FAILED_ESS.value]
Expand Down
109 changes: 91 additions & 18 deletions arc/job/pipe/pipe_run_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,43 +266,116 @@ def test_terminal_run_not_regressed(self):
run.reconcile()
self.assertEqual(run.status, PipeRunState.COMPLETED)

def test_lease_expiry_orphans_running_task(self):
    """A RUNNING task with an expired lease is detected as orphaned.

    t0 completes normally; t1 is driven to RUNNING with a lease that has
    already expired (claimed 2 h ago, lease ended 10 s ago).  The run is
    created with max_attempts=1, and the assertions expect reconcile() to
    promote the expired task straight to FAILED_TERMINAL and the run to
    finish as COMPLETED_PARTIAL (some tasks done, one failed).
    """
    tasks = [_make_spec('t0'), _make_spec('t1')]
    run = PipeRun(project_directory=self.tmpdir, run_id='lease',
                  tasks=tasks, cluster_software='pbs', max_attempts=1)
    run.stage()
    now = time.time()
    self._complete_task(run.pipe_root, 't0')
    # t1 is RUNNING with an already-expired lease: claimed_at/started_at
    # are 2 h in the past and lease_expires_at is 10 s in the past.
    update_task_state(run.pipe_root, 't1', new_status=TaskState.CLAIMED,
                      claimed_by='w', claim_token='tok', claimed_at=now - 7200,
                      lease_expires_at=now - 10)
    update_task_state(run.pipe_root, 't1', new_status=TaskState.RUNNING,
                      started_at=now - 7200)
    run.reconcile()
    state = read_task_state(run.pipe_root, 't1')
    self.assertEqual(state.status, 'FAILED_TERMINAL')
    self.assertEqual(run.status, PipeRunState.COMPLETED_PARTIAL)

class TestPipeRunNoResubmission(unittest.TestCase):
"""Pipe runs must never flag resubmission — Q-state workers handle retried tasks."""

class TestPipeRunResubmission(unittest.TestCase):
"""Tests for the resubmission guard against PBS Q-state workers."""

def setUp(self):
self.tmpdir = tempfile.mkdtemp(prefix='pipe_run_resub_')

def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)

def test_never_resubmit_even_with_retried_tasks_and_no_workers(self):
"""Even when all workers are done and retried tasks remain,
needs_resubmission must stay False — no automatic resubmission."""
tasks = [_make_spec(f't{i}') for i in range(3)]
def _make_run(self, n_tasks=5):
tasks = [_make_spec(f't{i}') for i in range(n_tasks)]
run = PipeRun(project_directory=self.tmpdir, run_id='resub',
tasks=tasks, cluster_software='slurm', max_attempts=3)
run.stage()
run.submitted_at = time.time() - 300
run.submitted_at = time.time() - 300 # submitted 5 min ago (past grace period)
run.status = PipeRunState.SUBMITTED
# All 3 workers started: t0 completed, t1 failed, t2 completed
return run

def _fail_retryable(self, pipe_root, task_id):
"""Simulate a worker claiming, running, then failing a task."""
now = time.time()
for tid in ('t0', 't2'):
update_task_state(run.pipe_root, tid, new_status=TaskState.CLAIMED,
claimed_by='w', claim_token='tok', claimed_at=now,
lease_expires_at=now + 300)
update_task_state(run.pipe_root, tid, new_status=TaskState.RUNNING, started_at=now)
update_task_state(run.pipe_root, tid, new_status=TaskState.COMPLETED, ended_at=now)
update_task_state(run.pipe_root, 't1', new_status=TaskState.CLAIMED,
update_task_state(pipe_root, task_id, new_status=TaskState.CLAIMED,
claimed_by='w', claim_token='tok', claimed_at=now,
lease_expires_at=now + 300)
update_task_state(run.pipe_root, 't1', new_status=TaskState.RUNNING, started_at=now)
update_task_state(run.pipe_root, 't1', new_status=TaskState.FAILED_RETRYABLE,
update_task_state(pipe_root, task_id, new_status=TaskState.RUNNING, started_at=now)
update_task_state(pipe_root, task_id, new_status=TaskState.FAILED_RETRYABLE,
ended_at=now + 1, failure_class='timeout')

def _complete_task(self, pipe_root, task_id):
now = time.time()
update_task_state(pipe_root, task_id, new_status=TaskState.CLAIMED,
claimed_by='w', claim_token='tok', claimed_at=now,
lease_expires_at=now + 300)
update_task_state(pipe_root, task_id, new_status=TaskState.RUNNING, started_at=now)
update_task_state(pipe_root, task_id, new_status=TaskState.COMPLETED, ended_at=now)

def test_no_resubmit_while_fresh_pending_exist(self):
    """PBS Q-state workers: fresh PENDING tasks mean workers are still starting.
    Even with retried tasks, don't resubmit — those workers will claim retried tasks too."""
    run = self._make_run(n_tasks=5)
    # Workers 1-3 started: t0 completed, t1 failed, t2 completed
    # Workers 4-5 still in PBS Q state: t3, t4 are fresh PENDING
    self._complete_task(run.pipe_root, 't0')
    self._fail_retryable(run.pipe_root, 't1')
    self._complete_task(run.pipe_root, 't2')
    # t3, t4 untouched → fresh PENDING (attempt_index == 0)

    # reconcile() should see retried_pending > 0 but also fresh_pending > 0,
    # which means queued workers are still coming — resubmission stays off.
    run.reconcile()
    self.assertFalse(run.needs_resubmission,
                     'Should NOT resubmit: Q-state workers will pick up retried tasks')

def test_resubmit_when_all_workers_done_and_retried_tasks_remain(self):
    """All original workers finished but some tasks failed and were retried.
    No fresh PENDING → no more workers coming → must resubmit."""
    run = self._make_run(n_tasks=3)
    # All 3 workers started: t0 completed, t1 failed, t2 completed
    self._complete_task(run.pipe_root, 't0')
    self._fail_retryable(run.pipe_root, 't1')
    self._complete_task(run.pipe_root, 't2')

    # _make_run() backdates submitted_at by 5 minutes, so the resubmission
    # grace period has already elapsed and the flag may be raised.
    run.reconcile()
    self.assertTrue(run.needs_resubmission,
                    'Should resubmit: no fresh pending, no active workers, retried tasks waiting')

def test_no_resubmit_within_grace_period(self):
    """Even with retried tasks and no fresh pending, respect the grace period."""
    run = self._make_run(n_tasks=2)
    # Override _make_run's backdated timestamp: pretend submission happened
    # only 10 s ago, well inside the 120 s RESUBMIT_GRACE window.
    run.submitted_at = time.time() - 10  # only 10 seconds ago (within 120s grace)
    self._complete_task(run.pipe_root, 't0')
    self._fail_retryable(run.pipe_root, 't1')

    run.reconcile()
    self.assertFalse(run.needs_resubmission,
                     'Should NOT resubmit: within grace period')

def test_no_resubmit_while_workers_still_active(self):
"""Active workers (CLAIMED/RUNNING) means work is in progress — no resubmit."""
run = self._make_run(n_tasks=3)
self._complete_task(run.pipe_root, 't0')
self._fail_retryable(run.pipe_root, 't1')
# t2 is currently running (worker still active)
now = time.time()
update_task_state(run.pipe_root, 't2', new_status=TaskState.CLAIMED,
claimed_by='w', claim_token='tok', claimed_at=now,
lease_expires_at=now + 300)
update_task_state(run.pipe_root, 't2', new_status=TaskState.RUNNING, started_at=now)

run.reconcile()
self.assertFalse(run.needs_resubmission,
'Should never resubmit — Q-state workers or manual intervention handle retries')
'Should NOT resubmit: worker still active')


class TestPipeRunHomogeneity(unittest.TestCase):
Expand Down
37 changes: 32 additions & 5 deletions arc/scheduler_pipe_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ def setUpModule():
"""Enable pipe mode for all tests in this module."""
global _pipe_patches
pipe_vals = {'enabled': True, 'min_tasks': 10, 'max_workers': 100,
'max_attempts': 3, 'lease_duration_s': 86400}
'max_attempts': 3, 'lease_duration_hrs': 24}
for target in ('arc.job.pipe.pipe_coordinator.pipe_settings',
'arc.job.pipe.pipe_planner.pipe_settings'):
p = patch.dict(target, pipe_vals)
Expand Down Expand Up @@ -1021,11 +1021,12 @@ def setUp(self):
def tearDown(self):
shutil.rmtree(self.tmpdir, ignore_errors=True)

def test_no_resubmission_even_with_retried_tasks(self):
"""Even when all workers are done and retried tasks remain,
poll_pipes must not resubmit a new scheduler job."""
def test_resubmit_when_retried_tasks_and_no_fresh_pending(self):
"""When all workers are done (no fresh PENDING) and retried tasks remain,
poll_pipes must resubmit a new scheduler job to pick them up."""
tasks = [_make_task_spec(f'task_{i}') for i in range(3)]
pipe = self.sched.pipe_coordinator.submit_pipe_run('resub_test', tasks)
pipe.submitted_at = time.time() - 300 # past grace period
for task_id in ['task_0', 'task_1', 'task_2']:
now = time.time()
update_task_state(pipe.pipe_root, task_id, new_status=TaskState.CLAIMED,
Expand All @@ -1038,10 +1039,36 @@ def test_no_resubmission_even_with_retried_tasks(self):
claimed_at=None, lease_expires_at=None,
started_at=None, ended_at=None, failure_class=None)
pipe.status = PipeRunState.RECONCILING
with patch.object(pipe, 'submit_to_scheduler', return_value=('submitted', '12345')) as mock_submit:
self.sched.pipe_coordinator.poll_pipes()
mock_submit.assert_called_once()

def test_no_resubmit_while_fresh_pending_exist(self):
    """Fresh PENDING tasks (attempt_index == 0) mean Q-state workers are coming.
    Don't resubmit even if retried tasks also exist."""
    tasks = [_make_task_spec(f'task_{i}') for i in range(3)]
    pipe = self.sched.pipe_coordinator.submit_pipe_run('resub_test', tasks)
    pipe.submitted_at = time.time() - 300  # backdate past the resubmission grace period
    # task_0 completed, task_1 failed and retried, task_2 still fresh PENDING
    now = time.time()
    # task_0: full happy path CLAIMED → RUNNING → COMPLETED.
    update_task_state(pipe.pipe_root, 'task_0', new_status=TaskState.CLAIMED,
                      claimed_by='w', claim_token='t', claimed_at=now, lease_expires_at=now + 300)
    update_task_state(pipe.pipe_root, 'task_0', new_status=TaskState.RUNNING, started_at=now)
    update_task_state(pipe.pipe_root, 'task_0', new_status=TaskState.COMPLETED, ended_at=now)
    # task_1: fails retryably, then is rewound to PENDING with
    # attempt_index=1 and all claim/lease fields cleared — this makes it a
    # genuinely *retried* pending task, not a fresh one.
    update_task_state(pipe.pipe_root, 'task_1', new_status=TaskState.CLAIMED,
                      claimed_by='w', claim_token='t', claimed_at=now, lease_expires_at=now + 300)
    update_task_state(pipe.pipe_root, 'task_1', new_status=TaskState.RUNNING, started_at=now)
    update_task_state(pipe.pipe_root, 'task_1', new_status=TaskState.FAILED_RETRYABLE,
                      ended_at=now, failure_class='test')
    update_task_state(pipe.pipe_root, 'task_1', new_status=TaskState.PENDING,
                      attempt_index=1, claimed_by=None, claim_token=None,
                      claimed_at=None, lease_expires_at=None,
                      started_at=None, ended_at=None, failure_class=None)
    # task_2 still untouched — fresh PENDING (Q-state worker coming)
    pipe.status = PipeRunState.RECONCILING
    # poll_pipes() must NOT call submit_to_scheduler while a fresh PENDING
    # task exists, even though task_1 is retried and past the grace period.
    with patch.object(pipe, 'submit_to_scheduler', return_value=('submitted', '12345')) as mock_submit:
        self.sched.pipe_coordinator.poll_pipes()
    mock_submit.assert_not_called()
    self.assertFalse(pipe.needs_resubmission)


class TestShouldUsePipeOwnerType(unittest.TestCase):
Expand Down
29 changes: 17 additions & 12 deletions arc/scripts/pipe_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def claim_task(pipe_root: str, worker_id: str):
claimed_by=worker_id,
claim_token=token,
claimed_at=now,
lease_expires_at=now + pipe_settings.get('lease_duration_s', 86400))
lease_expires_at=now + pipe_settings.get('lease_duration_hrs', 24) * 3600)
logger.info(f'Claimed task {task_id}')
return task_id, updated, token
except (ValueError, TimeoutError):
Expand Down Expand Up @@ -149,10 +149,11 @@ def run_task(pipe_root: str, task_id: str, state: TaskStateRecord,
logger.warning(f'Task {task_id}: could not transition to RUNNING ({e}), skipping.')
return

spec = read_task_spec(pipe_root, task_id)
scratch_dir = tempfile.mkdtemp(prefix=f'pipe_{task_id}_')
result = _make_result_template(task_id, state.attempt_index, started_at)
scratch_dir = None
try:
spec = read_task_spec(pipe_root, task_id)
scratch_dir = tempfile.mkdtemp(prefix=f'pipe_{task_id}_')
result = _make_result_template(task_id, state.attempt_index, started_at)
_dispatch_execution(spec, scratch_dir)
_copy_outputs(scratch_dir, attempt_dir)
ended_at = time.time()
Expand Down Expand Up @@ -201,18 +202,21 @@ def run_task(pipe_root: str, task_id: str, state: TaskStateRecord,
failure_class = type(e).__name__
ended_at = time.time()
logger.error(f'Task {task_id} failed: {failure_class}: {e}')
_copy_outputs(scratch_dir, attempt_dir)
if scratch_dir:
_copy_outputs(scratch_dir, attempt_dir)
result = locals().get('result') or _make_result_template(task_id, state.attempt_index, started_at)
result['ended_at'] = ended_at
result['status'] = 'FAILED'
result['failure_class'] = failure_class
# Try to parse ESS error info even on exception path.
is_deterministic_ess = False
ess_info = _parse_ess_error(attempt_dir, spec)
if ess_info:
result['parser_summary'] = ess_info
if ess_info['status'] != 'done' and _is_deterministic_ess_error(ess_info):
result['failure_class'] = 'ess_error'
is_deterministic_ess = True
if 'spec' in locals():
ess_info = _parse_ess_error(attempt_dir, spec)
if ess_info:
result['parser_summary'] = ess_info
if ess_info['status'] != 'done' and _is_deterministic_ess_error(ess_info):
result['failure_class'] = 'ess_error'
is_deterministic_ess = True
write_result_json(attempt_dir, result)
Comment on lines +216 to 220
Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When a deterministic ESS error is detected on the exception path, this block overwrites result['failure_class'] to 'ess_error', but the later update_task_state(..., failure_class=...) call still persists the original Python exception type. This makes state.json inconsistent with result.json and can mislead downstream diagnostics. Use the same failure_class value for the state update as the one written into result.json (e.g., result['failure_class']).

Copilot uses AI. Check for mistakes.
if not _verify_ownership(pipe_root, task_id, worker_id, claim_token):
return
Expand All @@ -229,7 +233,8 @@ def run_task(pipe_root: str, task_id: str, state: TaskStateRecord,
logger.warning(f'Task {task_id}: could not mark failed ({e}). '
f'Task may have been orphaned concurrently.')
finally:
shutil.rmtree(scratch_dir, ignore_errors=True)
if scratch_dir:
shutil.rmtree(scratch_dir, ignore_errors=True)


def _make_result_template(task_id: str, attempt_index: int, started_at: float) -> dict:
Expand Down
12 changes: 12 additions & 0 deletions arc/scripts/pipe_worker_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,18 @@ def test_unsupported_family_fails(self):
final = read_task_state(self.tmpdir, 'bad_family')
self.assertIn(final.status, ('FAILED_RETRYABLE', 'FAILED_TERMINAL'))

def test_scratch_creation_failure_marks_failed(self):
"""If tempfile.mkdtemp fails (e.g., I/O error), the task is properly
marked FAILED_RETRYABLE instead of being left stuck in RUNNING."""
spec = _make_h2o_spec('io_fail')
Comment on lines +312 to +315
Copy link

Copilot AI Apr 12, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test’s docstring claims the task should be marked FAILED_RETRYABLE, but the assertion allows FAILED_TERMINAL too. Since initialize_task() defaults to multiple attempts, the test should assert the specific expected state (and/or assert it’s not left in RUNNING) so it actually guards the intended behavior.

Copilot uses AI. Check for mistakes.
initialize_task(self.tmpdir, spec)
state, token = self._claim('io_fail')
with patch('arc.scripts.pipe_worker.tempfile.mkdtemp',
side_effect=OSError(5, 'Input/output error')):
run_task(self.tmpdir, 'io_fail', state, 'test-worker', token)
final = read_task_state(self.tmpdir, 'io_fail')
self.assertIn(final.status, ('FAILED_RETRYABLE', 'FAILED_TERMINAL'))


class TestWorkerLoop(unittest.TestCase):

Expand Down
Loading
Loading