Skip to content

fix(job-orchestration): Add defensive timeouts to prevent scheduler hang (fixes #2252).#2271

Open
goynam wants to merge 3 commits into
y-scope:mainfrom
goynam:fix/query-scheduler-timeout
Open

fix(job-orchestration): Add defensive timeouts to prevent scheduler hang (fixes #2252).#2271
goynam wants to merge 3 commits into
y-scope:mainfrom
goynam:fix/query-scheduler-timeout

Conversation

@goynam
Copy link
Copy Markdown
Contributor

@goynam goynam commented May 9, 2026

Description

  1. Add defensive timeouts to compress, search, and extract_stream tasks. This terminates the job when it takes too long for some reason.
  2. Add defensive timeouts in task_results.get()

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  1. Set the timeout to 1s and attempted to compress a file. The compression failed and threw an ERROR log.
  2. The second timeout cannot actually be triggered; it was added strictly as a defensive measure per @goynam 's request.

Original PR description below

Summary

Comprehensive fix for query and compression schedulers hanging/freezing when workers die, Redis becomes stale, or infrastructure has transient failures.

Problem

The query scheduler becomes unresponsive after a worker pod is killed mid-task (rolling restart, node termination, OOM). Production recovery currently requires redis-cli FLUSHALL plus rolling-restart of workers and scheduler. Fixes #2252.

Two distinct Redis-key concerns (clarification)

When task_group.apply_async() is called, Celery writes two different kinds of keys:

Key Written by When
Group manifest (list of child task IDs) Scheduler At dispatch, immediately
Per-child task result Worker When the task finishes

These keys drive the two distinct symptoms below — they are not the same bug:

1. Why the scheduler hangs (the bug from the issue page).
When a worker is killed mid-task, only the per-child result key is missing. GroupResult.ready() checks every child; one missing → returns False forever. The scheduler keeps polling for a result that will never arrive. Before this PR async_task_result.get(...) was also unbounded, so on the rare path where ready() returned True but the backend stalled, the asyncio loop could block directly on the get call.

2. Why Redis grows unboundedly (a separate, pre-existing problem).
The group manifests from row 1 — plus any partial child results that were written before the worker died — never get garbage-collected. With result_persistent = True and no result_expires configured, every aborted job leaks Redis keys. Across many incidents and scheduler restarts these accumulate.

The PR fixes both, with separate mechanisms:

  • For the hang → per-dispatch elapsed-time check + revoke(terminate=True) + bounded get(timeout=10), layered behind Celery's own task_time_limit as the primary worker-level defense.
  • For the growth → result_expires = 7200 in both Celery configs (Celery itself GCs stale manifests).

Other failure modes addressed

  1. Dispatch blocks the event loopconcurrent.futures.as_completed in handle_pending_query_jobs is synchronous; if dispatch hangs (Redis/DB/OTEL), the entire async loop freezes.
  2. Silent exit — Unhandled exceptions in handle_job_updates kill the scheduler with no log.
  3. Compression multi-batch stall — The pre-existing zero-completion timeout fires only before the first batch completes; a job that succeeds on batch 1 and stalls on batch 2 is invisible.

Changes

Root-cause prevention

Change File Effect
task_soft_time_limit / task_time_limit Both celeryconfig.py Worker-level time limits — primary defense for "task ran too long" (#1)
result_expires = 7200 Both celeryconfig.py Celery auto-deletes results after 2h — addresses #2 (unbounded Redis growth)

Query scheduler (query_scheduler.py)

Change Effect
Per-dispatch last_dispatch_time timer Staleness check uses sub-job dispatch time, not overall job start (so healthy multi-batch queries aren't falsely failed on later batches)
Configurable task_timeout_seconds (default 300s) Backstop when worker pod is killed before honoring task_time_limit — addresses #1
revoke(terminate=True) on timeout (try/except, logger.exception) Prevents zombie workers from writing results to already-failed jobs
async_task_result.get(timeout=10, interval=0.005) + catch celery.exceptions.TimeoutError Bounds the other path into #1 — when ready() returns True but the backend stalls
Mark query_tasks rows (PENDING/RUNNING → FAILED) on timeout Mirrors the cancel path — no orphan task rows
Dispatch timeout (60s) on as_completed Unblocks event loop if dispatch hangs (#3); except clause uses concurrent.futures.TimeoutError for clarity
GroupResult.restore() try/except Handles corrupt Redis after flush/partial expiry
handle_job_updates retry loop with CancelledError propagation Survives transient errors; allows graceful shutdown (#4)
Clamp sleep to max(0.0, ...) Prevents negative sleep raising under load

Compression scheduler (compression_scheduler.py)

Change Effect
Progress-based timeout via last_progress_time Detects stalls in any batch, not just zero-completion jobs (#5)
Configurable task_timeout_seconds (default 600s) Configurable via CompressionScheduler config
result_handle.revoke() on timeout Cancels in-flight worker tasks (Celery: revoke(terminate=True); Spider: no-op)
Mark RUNNING task rows as FAILED on timeout No orphan task rows
Retry on transient errors instead of return -1 Scheduler stays alive through DB blips
Consecutive-error limit (10) Exits on persistent failures (DB down, config corruption)

TaskManager interface (scheduler/compress/task_manager/)

Change Effect
Abstract revoke() method on ResultHandle Lets the scheduler cancel in-flight work without reaching into per-implementation internals
Celery impl: revoke(terminate=True) Cancels and SIGTERMs running children
Spider impl: no-op Spider has no equivalent; relies on its per-task time limit

Configuration

File Change
clp_config.py task_timeout_seconds field on both CompressionScheduler and QueryScheduler
clp-config.template.json.yaml Documented with defaults (300s query, 600s compression)
scheduler_data.py last_progress_time on CompressionJob, last_dispatch_time on QueryJob

Failure mode coverage

Scenario Before After
Task runs longer than allowed Polls forever Celery task_time_limit SIGKILLs the worker (#1)
Worker pod killed before honoring task_time_limit (#1) ready() returns False forever; polls forever Per-dispatch timeout → revoke + FAIL after 300s
get() stalls when ready() is True (#1) Async loop blocks get(timeout=10) → retried next poll
Group manifests pile up in Redis (#2) Unbounded growth result_expires=7200 auto-cleanup
Dispatch hangs (broker/DB/OTEL) (#3) Event loop frozen 60s dispatch timeout → FAIL
handle_job_updates exception (#4) Scheduler exits silently Retry + log
Compression multi-batch stall (#5) Never detected (only zero-completion checked) last_progress_time detects any stall
GroupResult.restore() corrupt Unhandled exception try/except → FAIL
Persistent infra failure Spins or crashes forever 10 consecutive errors → exit

Test plan

  • ruff format passes
  • All dispatch paths set last_dispatch_time
  • Compression last_progress_time updates on every batch completion
  • Dict iteration uses snapshot pattern (safe to delete during iteration)
  • kill_hanging_jobs covers stale RUNNING jobs on startup
  • Manual test: kill query worker mid-task → job marked FAILED after timeout
  • Manual test: kill compression worker → job times out after configured timeout
  • Manual test: restart scheduler with stale Redis → starts clean, picks up new jobs

Fixes #2252

@goynam goynam requested a review from a team as a code owner May 9, 2026 12:00
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 9, 2026

Note

Reviews paused

It looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the reviews.auto_review.auto_pause_after_reviewed_commits setting.

Use the following commands to manage reviews:

  • @coderabbitai resume to resume automatic reviews.
  • @coderabbitai review to trigger a single review.

Use the checkboxes below for quick actions:

  • ▶️ Resume reviews
  • 🔍 Trigger review

Walkthrough

Schedulers record per-job dispatch time, bound Celery async result waits, detect and fail jobs that exceed configurable timeouts (revoking group tasks and sending reducer FAILURE for aggregations), make the query polling loop resilient to exceptions, and add a compression scheduler sweep to timeout stale in-memory jobs.

Changes

Timeout-Based Failure Handling for Celery Job Results

Layer / File(s) Summary
Data Shape
components/job-orchestration/job_orchestration/scheduler/scheduler_data.py
Adds `QueryJob.last_dispatch_time: datetime
Config
components/clp-py-utils/clp_py_utils/clp_config.py
Adds task_timeout_seconds to CompressionScheduler (600.0) and QueryScheduler (300.0).
Dispatch Timestamping
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
Set job.last_dispatch_time when dispatching a Celery group or restoring a group result before marking job RUNNING.
Async Result Timeout
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
try_getting_task_result now uses async_result.get(timeout=10, interval=0.005) and returns None on Celery timeout.
Job Status & Timeout Failure Handling
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
check_job_status_and_update_db(..., task_timeout_seconds: float = 300.0) computes elapsed since last_dispatch_time; timed-out running jobs revoke in-flight group tasks, optionally send reducer FAILURE for search/aggregation jobs, are removed from active_jobs, and are marked FAILED in DB.
Polling Loop Resilience / Wiring
components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
handle_job_updates gains task_timeout_seconds, wraps each polling iteration in try/except (logs via logger.exception and sleeps jobs_poll_delay on error), handle_jobs and main() propagate task_timeout_seconds from config.
Compression Scheduler Stale-Job Timeout
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
Adds _timeout_stale_jobs(logs_directory, db_context, task_timeout_seconds) scanning scheduled_jobs to fail jobs with zero completed tasks exceeding timeout; called each loop iteration and loop now retries instead of exiting on exceptions.

Sequence Diagram(s)

sequenceDiagram
  participant Client
  participant QueryScheduler
  participant CeleryWorker
  participant CeleryResultBackend
  participant ReducerQueue
  participant Database

  Client->>QueryScheduler: submit query job
  QueryScheduler->>CeleryWorker: dispatch group tasks (record last_dispatch_time)
  CeleryWorker->>CeleryResultBackend: push task results
  QueryScheduler->>CeleryResultBackend: async_group_result.get(timeout=10)
  alt result received
    CeleryResultBackend-->>QueryScheduler: results
    QueryScheduler->>Database: update job -> COMPLETED (set duration)
  else no results / timeout expired and elapsed > task_timeout_seconds
    QueryScheduler->>CeleryWorker: revoke group task (best-effort)
    QueryScheduler->>ReducerQueue: send FAILURE (if aggregation reducer exists)
    QueryScheduler->>Database: update job -> FAILED (set duration)
  end
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Linked Issues check ✅ Passed The code changes fully address all primary objectives from issue #2252: timeouts for stale Celery results, error handling to prevent silent exits, configuration for task timeouts in both schedulers, and improved resilience.
Out of Scope Changes check ✅ Passed All code changes are directly related to addressing the scheduler hang issue and preventing stale job blocking; no extraneous changes were detected in the modified files.
Title check ✅ Passed The title accurately summarizes the main change: adding defensive timeouts to the job-orchestration scheduler to prevent hangs, which aligns with the primary objective of fixing issue #2252.
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py`:
- Around line 1057-1076: The timeout check is using job.start_time (overall job
start) instead of a per-dispatch timestamp, which causes later batches to be
incorrectly timed out; add and use a per-dispatch timestamp (e.g.,
job.current_dispatch_start or store a dispatch_start in the active_jobs entry)
that is set/updated when the task is dispatched and reset when it transitions to
WAITING_FOR_DISPATCH, then replace uses of job.start_time in the timeout
calculation with that per-dispatch field (still comparing to
task_timeout_seconds) so QueryJobType.SEARCH_OR_AGGREGATION handling,
reducer_handler_msg_queues cleanup, active_jobs deletion, and the
set_job_or_task_status call all remain the same but are triggered only when the
per-dispatch timer exceeds task_timeout_seconds.
- Around line 1097-1102: The sleep call after computing jobs_poll_delay -
(interval_end_time - interval_start_time).total_seconds() can be negative when a
poll takes too long; clamp the computed duration to zero before awaiting to
avoid ValueError and repeated exception paths. In the handle_job_updates loop
replace the direct asyncio.sleep(...) with computing sleep_time =
jobs_poll_delay - (interval_end_time - interval_start_time).total_seconds() and
call await asyncio.sleep(max(0, sleep_time)); keep the existing exception
handling and logger.exception unchanged.
- Around line 1060-1076: This timeout path removes the job and marks it FAILED
but never cancels any in-flight Celery work; before deleting from active_jobs
and setting status (the block around QueryJobType.SEARCH_OR_AGGREGATION,
reducer_handler_msg_queues, del active_jobs, set_job_or_task_status), locate the
job's Celery group/async-result (e.g., attributes like job.group, job.group_id,
job.celery_group or a method that returns the AsyncResult/group) and call the
appropriate revoke/terminate API on it (e.g., group.revoke(terminate=True,
signal='SIGTERM') or AsyncResult.revoke(...)) to cancel workers, then proceed to
send reducer FAILURE and remove the job and update DB; ensure you check for None
and catch/log any exceptions from revoke so the rest of the cleanup still runs.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: ffc83b96-0cff-49da-b70b-1ff26e12bf26

📥 Commits

Reviewing files that changed from the base of the PR and between 0144808 and 8c18ff4.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py

Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:
- Line 549: The timeout only checks start_time and requires
job.num_tasks_completed == 0 so multi-batch jobs can hang; add a
job.last_activity_time timestamp (initialize alongside start_time) and update it
whenever a batch completes (i.e., right after incrementing
job.num_tasks_completed in the batch-completion path that follows get_result()),
then replace the timeout condition that uses start_time and
job.num_tasks_completed with a staleness check against
COMPRESSION_JOB_TIMEOUT_SECONDS using now - job.last_activity_time; ensure the
existing timeout handling/abort logic is reused when the new staleness check
triggers.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: e057470c-a45c-46d5-b8ff-092447a85563

📥 Commits

Reviewing files that changed from the base of the PR and between 8c18ff4 and 9f54920.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

549-549: ⚠️ Potential issue | 🔴 Critical | 🏗️ Heavy lift

Timeout only applies to jobs with zero task completions, leaving multi-batch jobs vulnerable to indefinite hangs.

The condition job.num_tasks_completed == 0 means that jobs which complete at least one task will never time out, even if subsequent batches hang indefinitely due to worker failures. Since start_time is set once at job creation (line 695) and never reset, a job that completes its first batch but then stalls on the second batch will accumulate elapsed time forever whilst bypassing this timeout check.

Scenario:

  1. Job dispatches first batch of 10 tasks (line 714)
  2. First batch completes successfully → num_tasks_completed = 10 (line 517)
  3. Second batch dispatched (line 789)
  4. All workers die (node termination, OOM, etc.)
  5. get_result() returns None indefinitely (line 483)
  6. Timeout check at line 549 evaluates False because num_tasks_completed > 0
  7. Job hangs indefinitely

Recommendation:

Track last-activity time instead of relying solely on start time and zero-completion check. Update the timestamp whenever a batch completes, and check for staleness since last activity rather than since job start.

🔧 Suggested approach to fix multi-batch timeout handling

Add a last_activity_time field to CompressionJob and update it after each batch completion:

In scheduler_data.py (or wherever CompressionJob is defined), add:

last_activity_time: datetime.datetime

Initialize it alongside start_time at line 718:

 job = CompressionJob(
     id=job_id,
     start_time=start_time,
+    last_activity_time=start_time,
     result_handle=result_handle,

Update it after batch completion at line 517:

 job.num_tasks_completed += num_tasks_in_batch
+job.last_activity_time = datetime.datetime.now(datetime.timezone.utc)
 
 if len(job.remaining_tasks) > 0:

Then change the timeout check at line 549:

-if elapsed > COMPRESSION_JOB_TIMEOUT_SECONDS and job.num_tasks_completed == 0:
+elapsed_since_activity = (now - job.last_activity_time).total_seconds()
+if elapsed_since_activity > COMPRESSION_JOB_TIMEOUT_SECONDS:
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`
at line 549, The timeout logic currently only fires when job.num_tasks_completed
== 0 and uses CompressionJob.start_time, so multi-batch jobs that made progress
can hang; add a CompressionJob.last_activity_time field initialized with
start_time, update last_activity_time whenever a batch completes (where the code
increments num_tasks_completed / handles batch completion), and replace the
timeout check that uses start_time and num_tasks_completed with a staleness
check comparing now to last_activity_time (i.e., elapsed_since_last_activity >
COMPRESSION_JOB_TIMEOUT_SECONDS) so stalled subsequent batches will time out
even if num_tasks_completed > 0.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Duplicate comments:
In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:
- Line 549: The timeout logic currently only fires when job.num_tasks_completed
== 0 and uses CompressionJob.start_time, so multi-batch jobs that made progress
can hang; add a CompressionJob.last_activity_time field initialized with
start_time, update last_activity_time whenever a batch completes (where the code
increments num_tasks_completed / handles batch completion), and replace the
timeout check that uses start_time and num_tasks_completed with a staleness
check comparing now to last_activity_time (i.e., elapsed_since_last_activity >
COMPRESSION_JOB_TIMEOUT_SECONDS) so stalled subsequent batches will time out
even if num_tasks_completed > 0.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: c65793ae-7789-4fd5-87a2-26f116259847

📥 Commits

Reviewing files that changed from the base of the PR and between 9f54920 and 0687c5a.

📒 Files selected for processing (1)
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py

@junhaoliao junhaoliao requested a review from gibber9809 May 10, 2026 10:11
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py`:
- Line 1100: The scheduler is always using the hardcoded default timeout
(300.0); wire the configurable task_timeout_seconds through the runtime call
path by adding a task_timeout_seconds parameter to
check_job_status_and_update_db and using it instead of the literal default, then
propagate that parameter from handle_jobs and main (add task_timeout_seconds to
their signatures and forward it on all call sites), and update any internal
calls or tests that assume the old signature so the live scheduler uses the
configured timeout.
- Around line 1067-1069: The call to
job.current_sub_job_async_task_result.revoke(terminate=True) can raise and
prevent cleanup (removal from active_jobs and DB failure status update); wrap
the revoke call in a try/except that logs the revoke error but does not
re-raise, ensuring execution continues to the block that handles
QueryJobType.SEARCH_OR_AGGREGATION and still executes del active_jobs[job_id]
and the DB status update for the job (reference
job.current_sub_job_async_task_result.revoke, active_jobs, job_id,
QueryJobType.SEARCH_OR_AGGREGATION).
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: 97c27000-d000-4794-9b2f-afc0ba204386

📥 Commits

Reviewing files that changed from the base of the PR and between 0687c5a and 040b302.

📒 Files selected for processing (2)
  • components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py
  • components/job-orchestration/job_orchestration/scheduler/scheduler_data.py

Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

♻️ Duplicate comments (1)
components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py (1)

546-548: ⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Track staleness per batch, not only before the first completion.

start_time is fixed at first dispatch, but this branch only times out jobs while num_tasks_completed == 0. Once the first batch succeeds, any later batch can still hang forever because it is never eligible for timeout. Please switch this to a last-activity/per-dispatch timestamp and refresh it after each successful batch completes.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`
around lines 546 - 548, The timeout logic currently compares now -
job.start_time and only triggers when job.num_tasks_completed == 0, which leaves
later batches unmonitored; change the job to track a mutable last-activity
timestamp (e.g., job.last_activity_time) and use elapsed = (now -
job.last_activity_time).total_seconds() when checking against
task_timeout_seconds (remove the num_tasks_completed == 0 guard), and ensure you
update job.last_activity_time after each successful batch completion/dispatch
(where batches are marked completed) so each dispatch resets the timeout window.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:
- Around line 557-564: The timeout path currently calls
_handle_failed_compression_job(...) and removes the job from scheduled_jobs but
does not update the dispatched rows in compression_tasks, leaving them stuck
RUNNING; before deleting scheduled_jobs[job_id] update the compression_tasks
rows for that job_id via db_context to set their state to a terminal failure
(e.g., FAILED or ERROR), add the timeout message (same text passed to
_handle_failed_compression_job), and set any finished timestamp/worker info so
they’re no longer considered in-flight; ensure you do this for all rows matching
compression_tasks.job_id == job_id (and only affect RUNNING tasks) and perform
the DB update prior to del scheduled_jobs[job_id] so reconciliation won’t miss
them.

In
`@components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py`:
- Around line 1079-1087: The timeout branch currently only updates the job row
and then deletes active_jobs[job_id], leaving dispatched query_tasks in
PENDING/RUNNING; before calling del active_jobs[job_id], call
set_job_or_task_status for the tasks table (use the same logic as the cancel
path) to transition dispatched/ running tasks to QueryTaskStatus.FAILED (or
equivalent) only if their current status is QueryTaskStatus.PENDING or
QueryTaskStatus.RUNNING; mirror the cancel flow: use the same status arguments
and duration calculation, reference QUERY_JOBS_TABLE_NAME/QUERY_TASKS_TABLE_NAME
and QueryJobStatus/QueryTaskStatus symbols and ensure the task-status update
runs prior to removing active_jobs[job_id].

---

Duplicate comments:
In
`@components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py`:
- Around line 546-548: The timeout logic currently compares now - job.start_time
and only triggers when job.num_tasks_completed == 0, which leaves later batches
unmonitored; change the job to track a mutable last-activity timestamp (e.g.,
job.last_activity_time) and use elapsed = (now -
job.last_activity_time).total_seconds() when checking against
task_timeout_seconds (remove the num_tasks_completed == 0 guard), and ensure you
update job.last_activity_time after each successful batch completion/dispatch
(where batches are marked completed) so each dispatch resets the timeout window.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: ASSERTIVE

Plan: Pro

Run ID: c1826837-98bc-4c06-b92f-77f05420a876

📥 Commits

Reviewing files that changed from the base of the PR and between 040b302 and 26c553e.

📒 Files selected for processing (3)
  • components/clp-py-utils/clp_py_utils/clp_config.py
  • components/job-orchestration/job_orchestration/scheduler/compress/compression_scheduler.py
  • components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py

Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
@goynam goynam changed the title fix(job-orchestration): Add timeout for stale query jobs to prevent scheduler hang (fixes #2252) fix(job-orchestration): Add timeout for query jobs to prevent scheduler hang (fixes #2252) May 12, 2026
Copy link
Copy Markdown
Contributor

@hoophalab hoophalab left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@goynam Thank you for your contribution. Here are some comments:

Comment thread components/job-orchestration/job_orchestration/scheduler/query/query_scheduler.py Outdated
Comment on lines +1096 to +1105
# Check for stale sub-jobs that have been running too long without results
if job.last_dispatch_time is not None:
elapsed = (datetime.datetime.now() - job.last_dispatch_time).total_seconds()
if elapsed > task_timeout_seconds:
logger.error(
f"Job `{job_id}` sub-job timed out after {elapsed:.0f}s "
f"without results. Workers may have died."
)
# Revoke in-flight tasks (best-effort)
if job.current_sub_job_async_task_result is not None:
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This timeout only addresses the following situation: Celery receives a successful result from the message broker, but Redis is empty. I don't see how waiting longer will resolve this issue.

I suggest:

  1. Instead of catching TimeoutError and returning None in try_getting_task_result, just raise it.
  2. Remove the error handling here—if an exception occurs, the exception handling above will already deal with it.

You would need to remove task_timeout_seconds completely

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mode A — broker reports success but Redis result key is missing/late. GroupResult.ready() returns True, then get() raises or hangs. The bounded get(timeout=10) + except celery.exceptions.TimeoutError in try_getting_task_result is what handles this path, and you're right that it's the only thing needed there. I can change it to raise instead of returning None if you'd prefer — the outer try/except in handle_job_updates will catch and retry. Happy to make that change.
Mode B — worker is killed before pushing anything (the original bug from #2252). ready() returns False forever because at least one child result key was never written. Polling never even reaches get(), so no TimeoutError is ever raised. The only signal we have that the job is dead is wall-clock elapsed since dispatch — that's what task_timeout_seconds + last_dispatch_time cover.

Comment on lines +854 to +870
except TimeoutError:
# Some dispatch futures didn't complete in time — mark those jobs as failed
for future in futures:
if not future.done():
job_id = future_to_job_id[future]
logger.error("Dispatch timed out for job %s. Marking as FAILED.", job_id)
if job_id in active_jobs:
job = active_jobs.pop(job_id)
set_job_or_task_status(
db_conn,
QUERY_JOBS_TABLE_NAME,
job_id,
QueryJobStatus.FAILED,
QueryJobStatus.RUNNING,
duration=(datetime.datetime.now() - job.start_time).total_seconds(),
)
future.cancel()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I cannot see why celery can raise a TimeoutError in the dispatching procedure

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is concurrent.futures.TimeoutError, not Celery — it's raised by concurrent.futures.as_completed(futures, timeout=60) when a ProcessPoolExecutor future doesn't complete in 60s. It guards against DispatchExecutor.dispatch_job_and_update_db itself wedging (broker auth blocked, MySQL stalled inside the executor process, etc.) — nothing to do with Celery's task lifecycle. I've made that explicit by changing except TimeoutError: → except concurrent.futures.TimeoutError.

Comment on lines +560 to +568
for job_id in jobs_to_timeout:
# Mark in-flight task rows as failed
db_context.cursor.execute(
f"UPDATE {COMPRESSION_TASKS_TABLE_NAME}" # noqa: S608
f" SET status = '{CompressionTaskStatus.FAILED}'"
f" WHERE job_id = %s AND status = '{CompressionTaskStatus.RUNNING}'",
(job_id,),
)
db_context.connection.commit()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. I cannot see why updating the database from RUNNING to FAILED revokes task on compression worker.
  2. If a timeout on compression workers is strictly needed, a better approach might be using celery's time_limit in decorator, or timeout when apply_async.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. DB flip didn't actually cancel the worker. Right — flipping rows to FAILED only updates state, the worker keeps running. I've added a revoke() method to TaskManager.ResultHandle (Celery: revoke(terminate=True); Spider: no-op since it relies on its task time limit), and _timeout_stale_jobs now calls it before flipping rows. This matches the pattern on the query side.
  2. task_time_limit is the better primary defense. Agreed — added task_soft_time_limit / task_time_limit to both executor/compress/celeryconfig.py and executor/query/celeryconfig.py (compression: 540/600s; query: 270/300s, mirroring the existing task_timeout_seconds defaults). I've kept the scheduler-side staleness sweep as a backstop with a comment noting that — its job now is the case Celery's own time limit can't help with: the worker pod being killed before it can honor the limit.

goynam pushed a commit to goynam/clp that referenced this pull request May 26, 2026
- Remove redundant startup CANCELLING cleanup; handle_cancelling_search_jobs
  + kill_hanging_jobs already cover this state.
- Tighten dispatch except clause to concurrent.futures.TimeoutError so the
  intent (guarding the executor itself wedging) is self-evident.
- Use logger.exception instead of logger.warning when revoke() fails so the
  traceback is captured.
- Add task_soft_time_limit / task_time_limit to both Celery worker configs as
  the primary worker-level defense; the scheduler-side staleness sweep is now
  documented as a backstop for the case where the worker pod is killed.
- Add a revoke() method to TaskManager.ResultHandle (Celery: terminate=True;
  Spider: no-op) and call it from compression_scheduler's _timeout_stale_jobs
  so timed-out jobs actually cancel in-flight work rather than just flipping
  DB rows.
- Promote `import redis` to module-level.
@hoophalab hoophalab changed the title fix(job-orchestration): Add timeout for query jobs to prevent scheduler hang (fixes #2252) fix(job-orchestration): Add defensive timeouts to prevent scheduler hang (fixes #2252). May 28, 2026
@hoophalab hoophalab force-pushed the fix/query-scheduler-timeout branch from 8d5c617 to 70d1ee9 Compare May 28, 2026 21:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Query scheduler hangs indefinitely when polling stale Celery task results from Redis

2 participants