From 5f36008a03a75b71b53d2aeee96b4da4b74fe05f Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 30 Apr 2026 17:22:21 +0000 Subject: [PATCH] Fix DPSJob.wait_for_completion() race condition with initial delay Add a default 5-second initial delay before the first status check to allow newly submitted jobs to appear in the OpenSearch database. This fixes the issue where wait_for_completion() would return immediately with incorrect status ('Deleted') if called right after job submission. Changes: - Add time module import - Add initial_delay_seconds parameter (default 5) to wait_for_completion() - Improve status check logic to handle any non-terminal state by retrying - Update docstring with parameter documentation Fixes issue #126 https://claude.ai/code/session_01KEDGhwSuoY9N5inWR9bk5u --- maap/dps/dps_job.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/maap/dps/dps_job.py b/maap/dps/dps_job.py index 8b1a3d0..47d7255 100644 --- a/maap/dps/dps_job.py +++ b/maap/dps/dps_job.py @@ -45,6 +45,7 @@ import json import logging import os +import time import xml.etree.ElementTree as ET import backoff from urllib.parse import urljoin @@ -227,13 +228,20 @@ def retrieve_status(self): return self.status @backoff.on_exception(backoff.expo, Exception, max_value=64, max_time=172800) - def wait_for_completion(self): + def wait_for_completion(self, initial_delay_seconds=5): """ Wait for the job to complete. Blocks execution until the job finishes (succeeds, fails, or is cancelled). Uses exponential backoff to poll for status updates. + Parameters + ---------- + initial_delay_seconds : float, optional + Initial delay in seconds before first status check (default: 5). + This accounts for the time required for newly submitted jobs to + become visible in the database. Set to 0 to disable initial delay. + Returns ------- DPSJob @@ -254,14 +262,22 @@ def wait_for_completion(self): - Uses exponential backoff with max interval of 64 seconds - Maximum wait time is 48 hours (172800 seconds) - The job object is updated with final status upon completion + - Initial delay accounts for database visibility delay (~5 seconds) See Also -------- :meth:`retrieve_status` : Check status without blocking :meth:`cancel_job` : Cancel a running job """ + # Wait before first check to allow newly submitted jobs to appear in database + if initial_delay_seconds > 0: + logger.debug(f'Waiting {initial_delay_seconds} seconds before first status check') + time.sleep(initial_delay_seconds) + self.retrieve_status() - if self.status.lower() in ["accepted", "running"]: + # Known terminal states and states that should trigger retries + terminal_states = ["succeeded", "failed", "dismissed", "deduped", "offline"] + if self.status.lower() not in terminal_states: logger.debug('Current Status is {}. Backing off.'.format(self.status)) raise RuntimeError return self