diff --git a/arc/job/pipe/pipe_coordinator_test.py b/arc/job/pipe/pipe_coordinator_test.py index 2a963b37d5..61f2d0958b 100644 --- a/arc/job/pipe/pipe_coordinator_test.py +++ b/arc/job/pipe/pipe_coordinator_test.py @@ -30,7 +30,7 @@ def setUpModule(): """Enable pipe mode for all tests in this module.""" global _pipe_patches pipe_vals = {'enabled': True, 'min_tasks': 10, 'max_workers': 100, - 'max_attempts': 3, 'lease_duration_s': 86400} + 'max_attempts': 3, 'lease_duration_hrs': 24} p = patch.dict('arc.job.pipe.pipe_coordinator.pipe_settings', pipe_vals) p.start() _pipe_patches.append(p) diff --git a/arc/job/pipe/pipe_planner_test.py b/arc/job/pipe/pipe_planner_test.py index 8faaf21f19..bc1b38b113 100644 --- a/arc/job/pipe/pipe_planner_test.py +++ b/arc/job/pipe/pipe_planner_test.py @@ -23,7 +23,7 @@ def setUpModule(): """Enable pipe mode for all tests in this module.""" global _pipe_patches pipe_vals = {'enabled': True, 'min_tasks': 10, 'max_workers': 100, - 'max_attempts': 3, 'lease_duration_s': 86400} + 'max_attempts': 3, 'lease_duration_hrs': 24} for target in ('arc.job.pipe.pipe_coordinator.pipe_settings', 'arc.job.pipe.pipe_planner.pipe_settings'): p = patch.dict(target, pipe_vals) diff --git a/arc/job/pipe/pipe_run.py b/arc/job/pipe/pipe_run.py index 7093a9df80..6577b11f35 100644 --- a/arc/job/pipe/pipe_run.py +++ b/arc/job/pipe/pipe_run.py @@ -35,6 +35,8 @@ logger = get_logger() +RESUBMIT_GRACE = 120 # seconds – grace period after resubmission before flagging again + pipe_settings = settings['pipe_settings'] default_job_settings = settings['default_job_settings'] servers_dict = settings['servers'] @@ -297,6 +299,7 @@ def reconcile(self) -> Dict[str, int]: now = time.time() counts: Dict[str, int] = {s.value: 0 for s in TaskState} retried_pending = 0 # PENDING tasks with attempt_index > 0 (genuinely retried) + fresh_pending = 0 # PENDING tasks with attempt_index == 0 (awaiting initial workers) task_ids = sorted(os.listdir(tasks_dir)) for task_id in task_ids: 
@@ -319,8 +322,11 @@ def reconcile(self) -> Dict[str, int]: except (ValueError, TimeoutError) as e: logger.debug(f'Could not mark task {task_id} as ORPHANED ' f'(another process may be handling it): {e}') - if current == TaskState.PENDING and state.attempt_index > 0: - retried_pending += 1 + if current == TaskState.PENDING: + if state.attempt_index > 0: + retried_pending += 1 + else: + fresh_pending += 1 counts[current.value] += 1 active_workers = counts[TaskState.CLAIMED.value] + counts[TaskState.RUNNING.value] @@ -363,11 +369,25 @@ def reconcile(self) -> Dict[str, int]: logger.debug(f'Could not promote task {task_id} to FAILED_TERMINAL ' f'(lock contention or concurrent state change): {e}') - # Never resubmit a new scheduler job for retried tasks. - # Workers still in the scheduler queue (PBS Q state) will claim - # retried PENDING tasks when they start. If the scheduler job - # was killed, that is a manual intervention issue. - self._needs_resubmission = False + # Only flag resubmission for genuinely retried tasks (attempt_index > 0). + # Fresh PENDING tasks (attempt_index == 0) are waiting for the initial + # submission's workers to start — don't resubmit for those. + # Crucially: if fresh_pending > 0, scheduler workers are still queued + # (PBS Q state) and will claim retried tasks when they start. + # After a resubmission, allow a grace period for workers to start before + # flagging again (prevents duplicate submissions). + time_since_submit = (now - self.submitted_at) if self.submitted_at else float('inf') + if retried_pending > 0 and active_workers == 0 \ + and fresh_pending == 0 and time_since_submit > RESUBMIT_GRACE: + self._needs_resubmission = True + logger.info(f'Pipe run {self.run_id}: {retried_pending} retried tasks ' + f'need workers. 
Resubmission needed.') + else: + if retried_pending > 0 and fresh_pending > 0: + logger.debug(f'Pipe run {self.run_id}: {retried_pending} retried tasks ' + f'waiting, but {fresh_pending} fresh tasks still pending — ' + f'scheduler workers still starting, skipping resubmission.') + self._needs_resubmission = False terminal = (counts[TaskState.COMPLETED.value] + counts[TaskState.FAILED_ESS.value] diff --git a/arc/job/pipe/pipe_run_test.py b/arc/job/pipe/pipe_run_test.py index dec018c449..7a2310ce1d 100644 --- a/arc/job/pipe/pipe_run_test.py +++ b/arc/job/pipe/pipe_run_test.py @@ -266,9 +266,28 @@ def test_terminal_run_not_regressed(self): run.reconcile() self.assertEqual(run.status, PipeRunState.COMPLETED) + def test_lease_expiry_orphans_running_task(self): + """A RUNNING task with an expired lease is detected as orphaned.""" + tasks = [_make_spec('t0'), _make_spec('t1')] + run = PipeRun(project_directory=self.tmpdir, run_id='lease', + tasks=tasks, cluster_software='pbs', max_attempts=1) + run.stage() + now = time.time() + self._complete_task(run.pipe_root, 't0') + # t1 is RUNNING with an already-expired lease. 
+ update_task_state(run.pipe_root, 't1', new_status=TaskState.CLAIMED, + claimed_by='w', claim_token='tok', claimed_at=now - 7200, + lease_expires_at=now - 10) + update_task_state(run.pipe_root, 't1', new_status=TaskState.RUNNING, + started_at=now - 7200) + run.reconcile() + state = read_task_state(run.pipe_root, 't1') + self.assertEqual(state.status, 'FAILED_TERMINAL') + self.assertEqual(run.status, PipeRunState.COMPLETED_PARTIAL) -class TestPipeRunNoResubmission(unittest.TestCase): - """Pipe runs must never flag resubmission — Q-state workers handle retried tasks.""" + +class TestPipeRunResubmission(unittest.TestCase): + """Tests for the resubmission guard against PBS Q-state workers.""" def setUp(self): self.tmpdir = tempfile.mkdtemp(prefix='pipe_run_resub_') @@ -276,33 +295,87 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.tmpdir, ignore_errors=True) - def test_never_resubmit_even_with_retried_tasks_and_no_workers(self): - """Even when all workers are done and retried tasks remain, - needs_resubmission must stay False — no automatic resubmission.""" - tasks = [_make_spec(f't{i}') for i in range(3)] + def _make_run(self, n_tasks=5): + tasks = [_make_spec(f't{i}') for i in range(n_tasks)] run = PipeRun(project_directory=self.tmpdir, run_id='resub', tasks=tasks, cluster_software='slurm', max_attempts=3) run.stage() - run.submitted_at = time.time() - 300 + run.submitted_at = time.time() - 300 # submitted 5 min ago (past grace period) run.status = PipeRunState.SUBMITTED - # All 3 workers started: t0 completed, t1 failed, t2 completed + return run + + def _fail_retryable(self, pipe_root, task_id): + """Simulate a worker claiming, running, then failing a task.""" now = time.time() - for tid in ('t0', 't2'): - update_task_state(run.pipe_root, tid, new_status=TaskState.CLAIMED, - claimed_by='w', claim_token='tok', claimed_at=now, - lease_expires_at=now + 300) - update_task_state(run.pipe_root, tid, new_status=TaskState.RUNNING, started_at=now) - 
update_task_state(run.pipe_root, tid, new_status=TaskState.COMPLETED, ended_at=now) - update_task_state(run.pipe_root, 't1', new_status=TaskState.CLAIMED, + update_task_state(pipe_root, task_id, new_status=TaskState.CLAIMED, claimed_by='w', claim_token='tok', claimed_at=now, lease_expires_at=now + 300) - update_task_state(run.pipe_root, 't1', new_status=TaskState.RUNNING, started_at=now) - update_task_state(run.pipe_root, 't1', new_status=TaskState.FAILED_RETRYABLE, + update_task_state(pipe_root, task_id, new_status=TaskState.RUNNING, started_at=now) + update_task_state(pipe_root, task_id, new_status=TaskState.FAILED_RETRYABLE, ended_at=now + 1, failure_class='timeout') + def _complete_task(self, pipe_root, task_id): + now = time.time() + update_task_state(pipe_root, task_id, new_status=TaskState.CLAIMED, + claimed_by='w', claim_token='tok', claimed_at=now, + lease_expires_at=now + 300) + update_task_state(pipe_root, task_id, new_status=TaskState.RUNNING, started_at=now) + update_task_state(pipe_root, task_id, new_status=TaskState.COMPLETED, ended_at=now) + + def test_no_resubmit_while_fresh_pending_exist(self): + """PBS Q-state workers: fresh PENDING tasks mean workers are still starting. + Even with retried tasks, don't resubmit — those workers will claim retried tasks too.""" + run = self._make_run(n_tasks=5) + # Workers 1-3 started: t0 completed, t1 failed, t2 completed + # Workers 4-5 still in PBS Q state: t3, t4 are fresh PENDING + self._complete_task(run.pipe_root, 't0') + self._fail_retryable(run.pipe_root, 't1') + self._complete_task(run.pipe_root, 't2') + # t3, t4 untouched → fresh PENDING (attempt_index == 0) + + run.reconcile() + self.assertFalse(run.needs_resubmission, + 'Should NOT resubmit: Q-state workers will pick up retried tasks') + + def test_resubmit_when_all_workers_done_and_retried_tasks_remain(self): + """All original workers finished but some tasks failed and were retried. 
+ No fresh PENDING → no more workers coming → must resubmit.""" + run = self._make_run(n_tasks=3) + # All 3 workers started: t0 completed, t1 failed, t2 completed + self._complete_task(run.pipe_root, 't0') + self._fail_retryable(run.pipe_root, 't1') + self._complete_task(run.pipe_root, 't2') + + run.reconcile() + self.assertTrue(run.needs_resubmission, + 'Should resubmit: no fresh pending, no active workers, retried tasks waiting') + + def test_no_resubmit_within_grace_period(self): + """Even with retried tasks and no fresh pending, respect the grace period.""" + run = self._make_run(n_tasks=2) + run.submitted_at = time.time() - 10 # only 10 seconds ago (within 120s grace) + self._complete_task(run.pipe_root, 't0') + self._fail_retryable(run.pipe_root, 't1') + + run.reconcile() + self.assertFalse(run.needs_resubmission, + 'Should NOT resubmit: within grace period') + + def test_no_resubmit_while_workers_still_active(self): + """Active workers (CLAIMED/RUNNING) means work is in progress — no resubmit.""" + run = self._make_run(n_tasks=3) + self._complete_task(run.pipe_root, 't0') + self._fail_retryable(run.pipe_root, 't1') + # t2 is currently running (worker still active) + now = time.time() + update_task_state(run.pipe_root, 't2', new_status=TaskState.CLAIMED, + claimed_by='w', claim_token='tok', claimed_at=now, + lease_expires_at=now + 300) + update_task_state(run.pipe_root, 't2', new_status=TaskState.RUNNING, started_at=now) + run.reconcile() self.assertFalse(run.needs_resubmission, - 'Should never resubmit — Q-state workers or manual intervention handle retries') + 'Should NOT resubmit: worker still active') class TestPipeRunHomogeneity(unittest.TestCase): diff --git a/arc/scheduler_pipe_test.py b/arc/scheduler_pipe_test.py index 9321419c23..35c5e40604 100644 --- a/arc/scheduler_pipe_test.py +++ b/arc/scheduler_pipe_test.py @@ -89,7 +89,7 @@ def setUpModule(): """Enable pipe mode for all tests in this module.""" global _pipe_patches pipe_vals = {'enabled': True, 
'min_tasks': 10, 'max_workers': 100, - 'max_attempts': 3, 'lease_duration_s': 86400} + 'max_attempts': 3, 'lease_duration_hrs': 24} for target in ('arc.job.pipe.pipe_coordinator.pipe_settings', 'arc.job.pipe.pipe_planner.pipe_settings'): p = patch.dict(target, pipe_vals) @@ -1021,11 +1021,12 @@ def setUp(self): def tearDown(self): shutil.rmtree(self.tmpdir, ignore_errors=True) - def test_no_resubmission_even_with_retried_tasks(self): - """Even when all workers are done and retried tasks remain, - poll_pipes must not resubmit a new scheduler job.""" + def test_resubmit_when_retried_tasks_and_no_fresh_pending(self): + """When all workers are done (no fresh PENDING) and retried tasks remain, + poll_pipes must resubmit a new scheduler job to pick them up.""" tasks = [_make_task_spec(f'task_{i}') for i in range(3)] pipe = self.sched.pipe_coordinator.submit_pipe_run('resub_test', tasks) + pipe.submitted_at = time.time() - 300 # past grace period for task_id in ['task_0', 'task_1', 'task_2']: now = time.time() update_task_state(pipe.pipe_root, task_id, new_status=TaskState.CLAIMED, @@ -1038,10 +1039,36 @@ def test_no_resubmission_even_with_retried_tasks(self): claimed_at=None, lease_expires_at=None, started_at=None, ended_at=None, failure_class=None) pipe.status = PipeRunState.RECONCILING + with patch.object(pipe, 'submit_to_scheduler', return_value=('submitted', '12345')) as mock_submit: + self.sched.pipe_coordinator.poll_pipes() + mock_submit.assert_called_once() + + def test_no_resubmit_while_fresh_pending_exist(self): + """Fresh PENDING tasks (attempt_index == 0) mean Q-state workers are coming. 
+ Don't resubmit even if retried tasks also exist.""" + tasks = [_make_task_spec(f'task_{i}') for i in range(3)] + pipe = self.sched.pipe_coordinator.submit_pipe_run('resub_test', tasks) + pipe.submitted_at = time.time() - 300 + # task_0 completed, task_1 failed and retried, task_2 still fresh PENDING + now = time.time() + update_task_state(pipe.pipe_root, 'task_0', new_status=TaskState.CLAIMED, + claimed_by='w', claim_token='t', claimed_at=now, lease_expires_at=now + 300) + update_task_state(pipe.pipe_root, 'task_0', new_status=TaskState.RUNNING, started_at=now) + update_task_state(pipe.pipe_root, 'task_0', new_status=TaskState.COMPLETED, ended_at=now) + update_task_state(pipe.pipe_root, 'task_1', new_status=TaskState.CLAIMED, + claimed_by='w', claim_token='t', claimed_at=now, lease_expires_at=now + 300) + update_task_state(pipe.pipe_root, 'task_1', new_status=TaskState.RUNNING, started_at=now) + update_task_state(pipe.pipe_root, 'task_1', new_status=TaskState.FAILED_RETRYABLE, + ended_at=now, failure_class='test') + update_task_state(pipe.pipe_root, 'task_1', new_status=TaskState.PENDING, + attempt_index=1, claimed_by=None, claim_token=None, + claimed_at=None, lease_expires_at=None, + started_at=None, ended_at=None, failure_class=None) + # task_2 still untouched — fresh PENDING (Q-state worker coming) + pipe.status = PipeRunState.RECONCILING with patch.object(pipe, 'submit_to_scheduler', return_value=('submitted', '12345')) as mock_submit: self.sched.pipe_coordinator.poll_pipes() mock_submit.assert_not_called() - self.assertFalse(pipe.needs_resubmission) class TestShouldUsePipeOwnerType(unittest.TestCase): diff --git a/arc/scripts/pipe_worker.py b/arc/scripts/pipe_worker.py index ad37233933..0a8112aac9 100644 --- a/arc/scripts/pipe_worker.py +++ b/arc/scripts/pipe_worker.py @@ -86,7 +86,7 @@ def claim_task(pipe_root: str, worker_id: str): claimed_by=worker_id, claim_token=token, claimed_at=now, - lease_expires_at=now + pipe_settings.get('lease_duration_s', 
86400)) + lease_expires_at=now + pipe_settings.get('lease_duration_hrs', 24) * 3600) logger.info(f'Claimed task {task_id}') return task_id, updated, token except (ValueError, TimeoutError): @@ -149,10 +149,11 @@ def run_task(pipe_root: str, task_id: str, state: TaskStateRecord, logger.warning(f'Task {task_id}: could not transition to RUNNING ({e}), skipping.') return - spec = read_task_spec(pipe_root, task_id) - scratch_dir = tempfile.mkdtemp(prefix=f'pipe_{task_id}_') - result = _make_result_template(task_id, state.attempt_index, started_at) + scratch_dir = None try: + spec = read_task_spec(pipe_root, task_id) + scratch_dir = tempfile.mkdtemp(prefix=f'pipe_{task_id}_') + result = _make_result_template(task_id, state.attempt_index, started_at) _dispatch_execution(spec, scratch_dir) _copy_outputs(scratch_dir, attempt_dir) ended_at = time.time() @@ -201,18 +202,21 @@ def run_task(pipe_root: str, task_id: str, state: TaskStateRecord, failure_class = type(e).__name__ ended_at = time.time() logger.error(f'Task {task_id} failed: {failure_class}: {e}') - _copy_outputs(scratch_dir, attempt_dir) + if scratch_dir: + _copy_outputs(scratch_dir, attempt_dir) + result = locals().get('result') or _make_result_template(task_id, state.attempt_index, started_at) result['ended_at'] = ended_at result['status'] = 'FAILED' result['failure_class'] = failure_class # Try to parse ESS error info even on exception path. 
is_deterministic_ess = False - ess_info = _parse_ess_error(attempt_dir, spec) - if ess_info: - result['parser_summary'] = ess_info - if ess_info['status'] != 'done' and _is_deterministic_ess_error(ess_info): - result['failure_class'] = 'ess_error' - is_deterministic_ess = True + if 'spec' in locals(): + ess_info = _parse_ess_error(attempt_dir, spec) + if ess_info: + result['parser_summary'] = ess_info + if ess_info['status'] != 'done' and _is_deterministic_ess_error(ess_info): + result['failure_class'] = 'ess_error' + is_deterministic_ess = True write_result_json(attempt_dir, result) if not _verify_ownership(pipe_root, task_id, worker_id, claim_token): return @@ -229,7 +233,8 @@ def run_task(pipe_root: str, task_id: str, state: TaskStateRecord, logger.warning(f'Task {task_id}: could not mark failed ({e}). ' f'Task may have been orphaned concurrently.') finally: - shutil.rmtree(scratch_dir, ignore_errors=True) + if scratch_dir: + shutil.rmtree(scratch_dir, ignore_errors=True) def _make_result_template(task_id: str, attempt_index: int, started_at: float) -> dict: diff --git a/arc/scripts/pipe_worker_test.py b/arc/scripts/pipe_worker_test.py index bb6a01cb8d..4ead0144d4 100644 --- a/arc/scripts/pipe_worker_test.py +++ b/arc/scripts/pipe_worker_test.py @@ -309,6 +309,18 @@ def test_unsupported_family_fails(self): final = read_task_state(self.tmpdir, 'bad_family') self.assertIn(final.status, ('FAILED_RETRYABLE', 'FAILED_TERMINAL')) + def test_scratch_creation_failure_marks_failed(self): + """If tempfile.mkdtemp fails (e.g., I/O error), the task is properly + marked FAILED_RETRYABLE instead of being left stuck in RUNNING.""" + spec = _make_h2o_spec('io_fail') + initialize_task(self.tmpdir, spec) + state, token = self._claim('io_fail') + with patch('arc.scripts.pipe_worker.tempfile.mkdtemp', + side_effect=OSError(5, 'Input/output error')): + run_task(self.tmpdir, 'io_fail', state, 'test-worker', token) + final = read_task_state(self.tmpdir, 'io_fail') + 
self.assertIn(final.status, ('FAILED_RETRYABLE', 'FAILED_TERMINAL')) + class TestWorkerLoop(unittest.TestCase): diff --git a/arc/settings/settings.py b/arc/settings/settings.py index 7203ef8a8f..1c13eb6666 100644 --- a/arc/settings/settings.py +++ b/arc/settings/settings.py @@ -306,7 +306,7 @@ 'min_tasks': 10, # Minimum batch size to trigger pipe mode. 'max_workers': 100, # Upper bound on array worker slots per PipeRun. 'max_attempts': 3, # Retry budget per task before terminal failure. - 'lease_duration_s': 86400, # Worker lease duration in seconds (default 24h). + 'lease_duration_hrs': 1, # Worker lease duration in hours (default 1h). 'env_setup': {}, # Engine-specific shell setup commands, e.g., # {'gaussian': 'source /usr/local/g09/setup.sh', # 'orca': 'source /usr/local/orca-5.0.4/setup.sh && source /usr/local/openmpi-4.1.1/setup.sh'} diff --git a/docs/source/advanced.rst b/docs/source/advanced.rst index af35ece6cf..091fa90b41 100644 --- a/docs/source/advanced.rst +++ b/docs/source/advanced.rst @@ -983,7 +983,7 @@ Pipe mode is configured via ``pipe_settings`` in ``arc/settings/settings.py`` 'min_tasks': 10, # Minimum batch size to trigger pipe mode. 'max_workers': 100, # Upper bound on array worker slots per PipeRun. 'max_attempts': 3, # Retry budget per task before terminal failure. - 'lease_duration_s': 86400, # Worker lease duration in seconds (default 24h). + 'lease_duration_hrs': 1, # Worker lease duration in hours (default 1h). 'env_setup': {}, # Engine-specific shell setup commands, e.g., # {'gaussian': 'source /usr/local/g09/setup.sh'} 'scratch_base': '', # Base directory for worker scratch (e.g., '/gtmp').