feat(spider-storage): Add support for recovering job cache on storage service restart.#339
Conversation
…tocol and integration tests. (y-scope#325)
…r the task executor. (y-scope#326)
…acker; Refactor integration tests to extract common helpers into `test-utils`. (y-scope#328)
…ition with `spider-storage`'s. (y-scope#331)
…UIDv7 for database-generated IDs. (y-scope#337) Co-authored-by: LinZhihao-723 <pleiades3190@gmail.com>
…task-dispatch loop. (y-scope#329)
…ntsman, and wolf variants. (y-scope#336) Co-authored-by: sitaowang1998 <sitaowang1998@outlook.com>
…ask instance management: (y-scope#333) * Add `spider-proto` component for protobuf source files. * Add `spider-proto-rust` component for the generated Rust code. * Add tasks to build the generated Rust code. * Add a GitHub workflow to verify the generated Rust code is up-to-date. * Implement a gRPC-backed `StorageClient` for the execution manager. Co-authored-by: LinZhihao-723 <pleiades3190@gmail.com>
✨ Finishing Touches🧪 Generate unit tests (beta)
|
There was a problem hiding this comment.
Actionable comments posted: 10
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
components/spider-storage/src/state/runtime.rs (1)
119-133:⚠️ Potential issue | 🟠 Major | ⚡ Quick winCancel or abort the spawned pool task when recovery fails.
create_runtimespawns the task-instance pool before recovery. If recovery fails on Line 132, the function returns early and dropstask_instance_pool_join_handlewithout shutdown, leaving the pool task running in the background.Suggested fix
- let (task_instance_pool_connector, task_instance_pool_join_handle) = create_task_instance_pool( + let (task_instance_pool_connector, mut task_instance_pool_join_handle) = create_task_instance_pool( ready_queue_sender.clone(), db.clone(), cancellation_token.clone(), task_instance_pool_config, ) .map_err(CacheError::from)?; - - let job_cache = recover_job_cache( - &db, - ready_queue_sender.clone(), - task_instance_pool_connector.clone(), - ) - .await?; + let job_cache = match recover_job_cache( + &db, + ready_queue_sender.clone(), + task_instance_pool_connector.clone(), + ) + .await + { + Ok(cache) => cache, + Err(e) => { + cancellation_token.cancel(); + task_instance_pool_join_handle.abort(); + return Err(e); + } + };🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@components/spider-storage/src/state/runtime.rs` around lines 119 - 133, create_runtime spawns the task-instance pool via create_task_instance_pool and then calls recover_job_cache; if recovery fails you must shut down the pool instead of returning with the join handle still running. On any early return after calling create_task_instance_pool (e.g., when recover_job_cache returns Err), call the cancellation mechanism used by the pool (e.g., cancellation_token.cancel()) and then ensure task_instance_pool_join_handle is awaited or aborted (call join/await or abort on the join handle) before returning the error; reference create_runtime, create_task_instance_pool, recover_job_cache, task_instance_pool_join_handle and cancellation_token to locate the fix.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In @.github/workflows/proto-generated-code-checks.yaml:
- Around line 11-23: Add a top-level permissions block and disable credential
persistence on the checkout step to reduce token exposure: add a top-level
permissions: block (e.g., permissions: contents: read) alongside the existing
concurrency block, and in the proto-code-committed job update the
actions/checkout step (the uses: "actions/checkout@...") to include with:
persist-credentials: false (and keep submodules: "recursive"); ensure the job
name proto-code-committed and concurrency block remain unchanged.
In `@components/spider-execution-manager/src/runtime.rs`:
- Around line 395-406: The code in Outcome::InTaskFailure (runtime.rs) treats
the msgpack-encoded error bytes as UTF-8; instead deserialize the bytes as the
documented ExecutorError (the type defined where process_pool.rs documents
msgpack encoding) using a msgpack deserializer (e.g.
rmp_serde::from_slice::<ExecutorError>(&error)), then format the resulting
ExecutorError (Display or Debug) into the Failure message; if deserialization
fails, fall back to a safe binary-to-string (or hex) fallback so the Failure
still contains useful info. Ensure you reference the ExecutorError type and add
the necessary use/imports and error-handling around the deserialization in the
Outcome::InTaskFailure branch.
In `@components/spider-proto/storage/storage.proto`:
- Line 3: The proto's declared package "storage" doesn't match Buf's
package-directory mapping, causing PACKAGE_DIRECTORY_MATCH failures; fix by
either updating the proto package declaration to the correct fully-qualified
package that corresponds to its directory layout or adjust the Buf
module/package-directory mapping so this proto's directory maps to the "storage"
package; specifically edit the "package storage;" line in the storage.proto (or
the Buf module config in buf.yaml/buf.work) so the package name and the
repository directory mapping align.
In `@components/spider-storage/src/cache/job.rs`:
- Around line 157-159: During recovery, defend against inconsistent rows where
state == JobState::CommitReady but job_outputs is None by adding an explicit
check before calling task_graph.restore_outputs: if recovered state is
CommitReady and job_outputs.is_none(), return an error (or otherwise reject the
row) instead of silently continuing; locate the recovery logic around the
variables job_outputs and the call task_graph.restore_outputs(outputs).await?
and add the guard that validates this combination (or convert it to a safe
fallback) so the code does not enqueue/complete the commit task via
succeed_commit_task_instance while leaving output slots unset.
In `@components/spider-storage/src/db/mariadb.rs`:
- Around line 397-409: The recovery currently calls sqlx::query_as(...,
SELECT_QUERY).fetch_all(&self.pool).await? which loads all rows (and blobs) into
memory and then re-materializes them later; change this to a paged/streamed
recovery: modify SELECT_QUERY (or use a variant) to include ORDER BY id LIMIT ?
and implement an async loop that fetches pages (using fetch/fetch_many or fetch
+ try_next) and processes each row as it arrives, advancing by the last-seen id
(or using offset) until no more rows remain; remove the fetch_all + intermediate
vector materialization and apply the same streamed/page-loop pattern to the
other recovery block referenced around the 411-440 region so blobs are handled
incrementally rather than all at once.
- Around line 427-430: During recovery, enforce the invariant that
JobState::CommitReady must have outputs: check after deserializing
serialized_job_outputs (the serialized_job_outputs -> job_outputs step) and if
the row state is JobState::CommitReady but serialized_job_outputs is None,
return a DbError (e.g. an invariant/validation error) instead of producing
job_outputs: None; update the recovery code path that builds job_outputs (the
map/transpose block) to explicitly validate the state and fail fast when
CommitReady lacks outputs so corrupted rows are caught at DB-recovery time.
In `@components/spider-task-executor/src/bin/spider_task_executor.rs`:
- Around line 73-77: The code currently joins the raw package string into
pkg_dir and can be exploited via path traversal (e.g., "../../...") before
calling manager.load; update the resolution in the block that uses manager.get,
pkg_dir, package and manager.load to validate/sanitize package first — either
reject any package containing path separators or ".." and restrict to a safe
filename pattern (alphanumeric, hyphen/underscore), or build the full path then
canonicalize it and assert the canonicalized path starts with the canonicalized
pkg_dir before calling manager.load; return an error if validation fails so no
unintended .so can be loaded.
In `@tests/huntsman/test-utils/src/executor.rs`:
- Around line 100-107: Wrap the unbounded awaits in recv(), try_recv(),
shutdown_clean(), and wait_for_exit() with tokio::time::timeout using a CI-safe
Duration so tests fail fast instead of hanging: replace direct awaits on
self.responses.next().await in recv() and try_recv() and on
self.child.wait().await in shutdown_clean() and wait_for_exit() with
timeout(duration, ...).await, handle the Err(timeout) case by panicking or
returning a clear error message specifying which method timed out (include the
method name like recv/try_recv/shutdown_clean/wait_for_exit in the message) and
unwrap/propagate the Ok(inner) result as before; keep existing
deserialization/error unwraps for non-timeout failures.
In `@tools/scripts/lib_install/ubuntu/install-dev-common.sh`:
- Line 29: Replace the unpinned bootstrap "curl -LsSf
https://astral.sh/uv/install.sh | sh" with a safer approach: either remove the
auto-install and document "uv" as a required pre-req for developers, or fetch a
specific uv release artifact (referencing the uv release version) and verify its
checksum/signature before installation (download pinned tarball/installer +
checksum file, verify, then run installer). Ensure the change targets the exact
command string "curl -LsSf https://astral.sh/uv/install.sh | sh" and update any
comments or README to reflect the new pre-req or pinned-install flow.
- Around line 6-9: The script enables strict modes with "set -e" and "set -u"
but misses pipefail, so failures in pipelines (e.g., the curl | sh pipeline that
installs "uv") can be masked; add "set -o pipefail" (or change to "set -euo
pipefail") alongside the existing "set -e" and "set -u" so the pipeline failure
causes the script to exit and the "uv" installation cannot be silently skipped.
---
Outside diff comments:
In `@components/spider-storage/src/state/runtime.rs`:
- Around line 119-133: create_runtime spawns the task-instance pool via
create_task_instance_pool and then calls recover_job_cache; if recovery fails
you must shut down the pool instead of returning with the join handle still
running. On any early return after calling create_task_instance_pool (e.g., when
recover_job_cache returns Err), call the cancellation mechanism used by the pool
(e.g., cancellation_token.cancel()) and then ensure
task_instance_pool_join_handle is awaited or aborted (call join/await or abort
on the join handle) before returning the error; reference create_runtime,
create_task_instance_pool, recover_job_cache, task_instance_pool_join_handle and
cancellation_token to locate the fix.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
Run ID: 64bda151-1585-4e6b-b36b-5f2c6b52e139
⛔ Files ignored due to path filters (2)
Cargo.lockis excluded by!**/*.lockcomponents/spider-proto-rust/src/generated/storage.rsis excluded by!**/generated/**
📒 Files selected for processing (74)
.devcontainer/Dockerfile.github/workflows/code-linting-checks.yaml.github/workflows/proto-generated-code-checks.yaml.github/workflows/tests.yamlCargo.tomlcomponents/spider-core/Cargo.tomlcomponents/spider-core/src/lib.rscomponents/spider-core/src/session.rscomponents/spider-core/src/types/id.rscomponents/spider-execution-manager/Cargo.tomlcomponents/spider-execution-manager/src/client.rscomponents/spider-execution-manager/src/client/grpc/mod.rscomponents/spider-execution-manager/src/client/grpc/storage.rscomponents/spider-execution-manager/src/client/liveness.rscomponents/spider-execution-manager/src/client/scheduler.rscomponents/spider-execution-manager/src/client/storage.rscomponents/spider-execution-manager/src/lib.rscomponents/spider-execution-manager/src/liveness.rscomponents/spider-execution-manager/src/process_pool.rscomponents/spider-execution-manager/src/runtime.rscomponents/spider-proto-rust/Cargo.tomlcomponents/spider-proto-rust/build.rscomponents/spider-proto-rust/src/id.rscomponents/spider-proto-rust/src/lib.rscomponents/spider-proto/storage/storage.protocomponents/spider-storage/Cargo.tomlcomponents/spider-storage/src/cache.rscomponents/spider-storage/src/cache/job.rscomponents/spider-storage/src/cache/sync.rscomponents/spider-storage/src/cache/task.rscomponents/spider-storage/src/db.rscomponents/spider-storage/src/db/error.rscomponents/spider-storage/src/db/mariadb.rscomponents/spider-storage/src/db/protocol.rscomponents/spider-storage/src/state/job_cache.rscomponents/spider-storage/src/state/runtime.rscomponents/spider-storage/src/state/service.rscomponents/spider-storage/src/state/test_utils.rscomponents/spider-storage/src/task_instance_pool.rscomponents/spider-storage/tests/mariadb_infra.rscomponents/spider-storage/tests/mariadb_test.rscomponents/spider-storage/tests/recovery_test.rscomponents/spider-storage/tests/scheduling_infra.rscomponents/spider-storage/tests/test_spider_storage.rscomponents/spider-task-executor/Cargo.tomlcomponents/spider-task-executor/src/bin/spider_task_executor.rscomponents/spider-task-executor/src/error.rscomponents/spider-task-executor/src/lib.rscomponents/spider-task-executor/src/manager.rscomponents/spider-task-executor/src/protocol.rscomponents/spider-tdl/src/task.rscomponents/spider-tdl/src/task_context.rscomponents/spider-tdl/tests/test_task_macro.rstaskfiles/build.yamltaskfiles/lint.yamltaskfiles/test.yamltests/huntsman/em-runtime/Cargo.tomltests/huntsman/em-runtime/src/lib.rstests/huntsman/em-runtime/tests/test_runtime.rstests/huntsman/integration-test-tasks/Cargo.tomltests/huntsman/integration-test-tasks/src/lib.rstests/huntsman/task-executor/Cargo.tomltests/huntsman/task-executor/src/lib.rstests/huntsman/task-executor/tests/overhead_instrument.rstests/huntsman/task-executor/tests/test_executor.rstests/huntsman/task-executor/tests/test_process_pool.rstests/huntsman/tdl-integration/tests/complex.rstests/huntsman/test-utils/Cargo.tomltests/huntsman/test-utils/src/executor.rstests/huntsman/test-utils/src/lib.rstests/huntsman/test-utils/src/mock.rstools/scripts/lib_install/ubuntu/install-dev-common.shtools/scripts/lib_install/ubuntu/install-dev-huntsman.shtools/scripts/lib_install/ubuntu/install-dev-wolf.sh
💤 Files with no reviewable changes (1)
- components/spider-storage/src/cache.rs
| let rows = sqlx::query_as::< | ||
| _, | ||
| ( | ||
| JobId, | ||
| ResourceGroupId, | ||
| JobState, | ||
| String, | ||
| Vec<u8>, | ||
| Option<Vec<u8>>, | ||
| ), | ||
| >(SELECT_QUERY) | ||
| .fetch_all(&self.pool) | ||
| .await?; |
There was a problem hiding this comment.
Bound recovery reads to avoid startup memory blow-ups.
Line 397 fetches all recoverable rows (including blob payloads) into memory, then Line 411 materializes another vector. Under a large in-flight backlog, restart recovery can spike memory and fail availability. Please switch to paged/streamed recovery (e.g., ORDER BY id LIMIT ? loop).
Also applies to: 411-440
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-storage/src/db/mariadb.rs` around lines 397 - 409, The
recovery currently calls sqlx::query_as(...,
SELECT_QUERY).fetch_all(&self.pool).await? which loads all rows (and blobs) into
memory and then re-materializes them later; change this to a paged/streamed
recovery: modify SELECT_QUERY (or use a variant) to include ORDER BY id LIMIT ?
and implement an async loop that fetches pages (using fetch/fetch_many or fetch
+ try_next) and processes each row as it arrives, advancing by the last-seen id
(or using offset) until no more rows remain; remove the fetch_all + intermediate
vector materialization and apply the same streamed/page-loop pattern to the
other recovery block referenced around the 411-440 region so blobs are handled
incrementally rather than all at once.
| let pkg = if let Some(pkg) = manager.get(package) { | ||
| pkg | ||
| } else { | ||
| let path = pkg_dir.join(package).join(format!("lib{package}.so")); | ||
| manager.load(&path)? |
There was a problem hiding this comment.
Block path traversal in package resolution.
Line 76 joins tdl_context.package directly into a filesystem path. A crafted value like ../../tmp/evil can escape SPIDER_TDL_PACKAGE_DIR and load an unintended shared object.
Suggested patch
use std::{
- path::{Path, PathBuf},
+ path::{Component, Path, PathBuf},
time::Instant,
};
@@
fn run_task(
@@
) -> Result<Vec<u8>, ExecutorError> {
+ let mut comps = Path::new(package).components();
+ let is_single_normal_component =
+ matches!(comps.next(), Some(Component::Normal(_))) && comps.next().is_none();
+ if !is_single_normal_component {
+ return Err(ExecutorError::InvalidLibrary(format!(
+ "invalid package name: {package}"
+ )));
+ }
+
let pkg = if let Some(pkg) = manager.get(package) {
pkg
} else {
let path = pkg_dir.join(package).join(format!("lib{package}.so"));
manager.load(&path)?🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@components/spider-task-executor/src/bin/spider_task_executor.rs` around lines
73 - 77, The code currently joins the raw package string into pkg_dir and can be
exploited via path traversal (e.g., "../../...") before calling manager.load;
update the resolution in the block that uses manager.get, pkg_dir, package and
manager.load to validate/sanitize package first — either reject any package
containing path separators or ".." and restrict to a safe filename pattern
(alphanumeric, hyphen/underscore), or build the full path then canonicalize it
and assert the canonicalized path starts with the canonicalized pkg_dir before
calling manager.load; return an error if validation fails so no unintended .so
can be loaded.
c3d45de to
e3db65f
Compare
| for (output_reader, output) in self.outputs.iter().zip(persisted_outputs) { | ||
| *output_reader.writer().write().await = Some(output); | ||
| } |
There was a problem hiding this comment.
Have you read this change carefully (assume it's done by your coding agent)?
The purpose of having the reader/writer wrapper around the RW lock is to split read/write access. If we need to mutate the output, we should design a new API, instead of performing write through a "reader".
There was a problem hiding this comment.
This is actually after my fight with codex. The original is to directly add a Vec<OutputWriter> to the TaskGraph. What do you propose for the new API?
LinZhihao-723
left a comment
There was a problem hiding this comment.
Reviewed with the following changes:
DB changes
RecoverableJob->RecoverableJobContext- Removed field level comments (looks redundant, the field names are self-explanatory)
- Drop
job_prefix from field names
- Index jobs on their states in the DB table.
- Add a struct
RecoverableJobRowProjectionto makeget_recoverable_jobsmore readable.
Cache changes
- We shouldn't send task-ready in JCB recovery: the service is not runnable yet; the task-ready can fill the underlying channel, making a deadlock on startup.
- No alias on
UnexpectedJobState: we should be explicit whether this error is internal or not.- This is my fault from the original JCB PR.
- Add a dedicated error code
InvalidRecoverableJobContext. - The major design issue with #339 (comment) is that we shouldn't mutate the task graph after construction; instead, we should make dedicated factory methods for the cases we need.
- Optimized a bit further by making the task graph empty for commit-ready and cleanup-ready; there's no need to reload the task graph: these tasks should be irrelevant anyways.
Service changes
- Add
resend_ready_tasksin the service level.- Will be needed by the scheduler and the recovery step.
- Tested in the recovery tests.
Tests
- Change the file name to
runtime_recovery_test.rs. - Some style-wise refactoring.
- For
restarted_storage_cache_recovers_commit_ready_jobandrestarted_storage_cache_recovers_cleanup_ready_job: make sure that the regular task registration would fail with the error code showing the job is already in a terminal state. - Make
find_entry_for_jobreturn a vector of ready queue entries, and make assertions outside. - For
restarted_storage_cache_recovers_running_job_from_startandrestarted_storage_cache_recovers_commit_ready_job, create a new runtime in the end and make sure the job state and outputs are persisted (accessable viaservice, through DB operations). - Add
ignoresection to tests since MariaDB is required.
Others
- Remove
uuiddependency fromspider-storage'sCargo.toml.
Two more questions for you:
- Why do we not add
Readyjobs to the cache? How are we supposed to handle it if the user registers a job and then calls start, but the service crashed and restarted in between? - Currently, recovery tests are running together with other tasks that may produce jobs in the DB. These jobs might also be recovered, and if that happens, polling 32 items from the ready queue may not be able to guarantee to retrieve the expected entry. My solution is to apply
#[serial_test::file_serial]to all tests insideruntime_recovery_test.rsandjcb_test.rs.
LinZhihao-723
left a comment
There was a problem hiding this comment.
- Sorry for not being clear in this comment: we could keep the existing unit tests; but the read job recovery unit tests are redundant.
succeed_ready_jobis confusing: the job is not in "ready"; it should be in the running state. Renamed it for clarity.
LinZhihao-723
left a comment
There was a problem hiding this comment.
Directly modified the PR title.
Description
This PR adds recovery of jobs in
Running,CommitReadyandCleanupReadystates when the storage runtime restarts. All recoverable jobs are fetched from the database and filled into the cache layer and the ready queue.Checklist
breaking change.
Validation performed
Summary by CodeRabbit
Release Notes
New Features
Bug Fixes & Improvements
Testing
Chores