Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions maap/dps/dps_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import json
import logging
import os
import time
import xml.etree.ElementTree as ET
import backoff
from urllib.parse import urljoin
Expand Down Expand Up @@ -227,13 +228,20 @@ def retrieve_status(self):
return self.status

@backoff.on_exception(backoff.expo, Exception, max_value=64, max_time=172800)
def wait_for_completion(self):
def wait_for_completion(self, initial_delay_seconds=5):
"""
Wait for the job to complete.

Blocks execution until the job finishes (succeeds, fails, or is
cancelled). Uses exponential backoff to poll for status updates.

Parameters
----------
initial_delay_seconds : float, optional
Initial delay in seconds before first status check (default: 5).
This accounts for the time required for newly submitted jobs to
become visible in the database. Set to 0 to disable initial delay.

Returns
-------
DPSJob
Expand All @@ -254,14 +262,22 @@ def wait_for_completion(self):
- Uses exponential backoff with max interval of 64 seconds
- Maximum wait time is 48 hours (172800 seconds)
- The job object is updated with final status upon completion
- Initial delay accounts for database visibility delay (~5 seconds)

See Also
--------
:meth:`retrieve_status` : Check status without blocking
:meth:`cancel_job` : Cancel a running job
"""
# Wait before first check to allow newly submitted jobs to appear in database
if initial_delay_seconds > 0:
logger.debug(f'Waiting {initial_delay_seconds} seconds before first status check')
time.sleep(initial_delay_seconds)

self.retrieve_status()
if self.status.lower() in ["accepted", "running"]:
# Known terminal states and states that should trigger retries
terminal_states = ["succeeded", "failed", "dismissed", "deduped", "offline"]
if self.status.lower() not in terminal_states:
logger.debug('Current Status is {}. Backing off.'.format(self.status))
raise RuntimeError
return self
Expand Down
Loading