feat(spider-scheduler): Add channel-based dispatch queue implementation. #332

Open
LinZhihao-723 wants to merge 9 commits into y-scope:main from LinZhihao-723:dispatch-queue-impl
LinZhihao-723 commented Jun 1, 2026
Member

Description

This PR depends on #330 and #331.

This PR implements an async-channel-backed dispatch queue. It also adds unit tests to cover its basic behaviors.

Checklist

  • The PR satisfies the contribution guidelines.
  • This is a breaking change and that has been indicated in the PR title, OR this isn't a
    breaking change.
  • Necessary docs have been updated, OR no docs need to be updated.

Validation performed

  • Ensured all workflows pass.
  • Added unit tests to cover the enqueue/dequeue operations, as well as session bumping.

Summary by CodeRabbit

Release Notes

  • Chores
    • Established new scheduler component crate with configured dependencies
    • Implemented dispatch queue system foundation with comprehensive test coverage
    • Added tests for concurrent operations, load balancing, and session management scenarios
    • Configured asynchronous runtime support for system operations

LinZhihao-723 requested review from a team and sitaowang1998 as code owners June 1, 2026 01:20

coderabbitai Bot commented Jun 1, 2026
Contributor

Review Change Stack

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 63f03d32-9dda-4e5f-9eb8-0320d4aa9508

📥 Commits

Reviewing files that changed from the base of the PR and between 10cb6ad and 68a4a23.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (1)
  • components/spider-scheduler/src/dispatch_queue.rs

Walkthrough

This PR introduces the dispatch queue layer of the spider-scheduler crate. It defines cloneable writer and reader handles backed by an async channel and shared session-ID state, implements monotonic session bumping with queued assignment invalidation, and provides a factory constructor. The module includes comprehensive async tests validating round-trip delivery, multi-consumer load balancing, session invalidation semantics, and concurrent pair-consistency.

Changes

Dispatch Queue Implementation

| Layer / File(s) | Summary |
| --- | --- |
| Crate configuration and dependencies: components/spider-scheduler/Cargo.toml | Defines the spider-scheduler crate metadata and runtime/development dependencies including async-channel, tokio with sync/time features, tokio-util, thiserror, async-trait, and the local spider-core path dependency. |
| Dispatch queue writer, reader, and factory with test suite: components/spider-scheduler/src/dispatch_queue.rs | Implements DispatchQueueWriter with bounded enqueue and monotonic session-ID bumping that drains pre-bump assignments; DispatchQueueReader with bounded timeout-guarded dequeue that pairs assignments with the session ID at dequeue time; create_dispatch_queue factory; and an extensive async test suite validating round-trip delivery, load balancing across multiple readers, session bump invalidation, monotonicity enforcement, queue-size reset, and concurrent delivery consistency. |
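
The behavior summarized above can be modeled with a small std-only sketch. This is not the PR's implementation: the real queue is async and backed by async-channel, whereas this model uses a `Mutex<VecDeque>`, returns an error instead of waiting when the queue is full, and omits the dequeue timeout. The names `Writer`, `Reader`, `QueueError`, and `create_queue` are hypothetical, and rejecting an equal session ID (rather than only a smaller one) is an assumption.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

// Hypothetical stand-ins; the PR's real types live in spider-core / spider-scheduler.
type SessionId = u64;
type Assignment = String;

#[derive(Debug, PartialEq)]
enum QueueError {
    Full,
    InvalidSessionId,
}

struct Shared {
    session_id: SessionId,
    pending: VecDeque<Assignment>,
    capacity: usize,
}

#[derive(Clone)]
struct Writer(Arc<Mutex<Shared>>);

#[derive(Clone)]
struct Reader(Arc<Mutex<Shared>>);

impl Writer {
    /// Bounded enqueue: this model rejects the item when the queue is at capacity.
    fn enqueue(&self, item: Assignment) -> Result<(), QueueError> {
        let mut shared = self.0.lock().unwrap();
        if shared.pending.len() >= shared.capacity {
            return Err(QueueError::Full);
        }
        shared.pending.push_back(item);
        Ok(())
    }

    /// Monotonic bump: only a strictly larger ID is accepted (an assumption),
    /// and everything queued before the bump is discarded.
    fn bump_session_id(&self, new_id: SessionId) -> Result<(), QueueError> {
        let mut shared = self.0.lock().unwrap();
        if new_id <= shared.session_id {
            return Err(QueueError::InvalidSessionId);
        }
        shared.session_id = new_id;
        shared.pending.clear();
        Ok(())
    }
}

impl Reader {
    /// Pairs the assignment with the session ID that is current at dequeue time.
    fn dequeue(&self) -> Option<(SessionId, Assignment)> {
        let mut shared = self.0.lock().unwrap();
        let session_id = shared.session_id;
        shared.pending.pop_front().map(|item| (session_id, item))
    }
}

/// Factory returning a writer/reader pair that share one queue.
fn create_queue(capacity: usize, initial: SessionId) -> (Writer, Reader) {
    let shared = Arc::new(Mutex::new(Shared {
        session_id: initial,
        pending: VecDeque::new(),
        capacity,
    }));
    (Writer(Arc::clone(&shared)), Reader(shared))
}
```

Because this model takes a single lock around every operation, it cannot exhibit the enqueue/bump interleaving that the channel-based design has to reason about; it only illustrates the intended semantics.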

Estimated code review effort

🎯 3 (Moderate) | ⏱️ ~20 minutes

Possibly related PRs

  • y-scope/spider#330: This PR's concrete DispatchQueueWriter/DispatchQueueReader implementation and session-bump dequeue semantics directly fulfil the DispatchQueueSink/DispatchQueueSource trait abstractions introduced in the referenced PR.

Suggested reviewers

  • sitaowang1998
🚥 Pre-merge checks | ✅ 5
✅ Passed checks (5 passed)
| Check name | Status | Explanation |
| --- | --- | --- |
| Description Check | ✅ Passed | Check skipped - CodeRabbit’s high-level summary is enabled. |
| Title check | ✅ Passed | The title clearly and specifically describes the main change: implementing a channel-based dispatch queue in the spider-scheduler component. |
| Docstring Coverage | ✅ Passed | Docstring coverage is 100.00% which is sufficient. The required threshold is 80.00%. |
| Linked Issues check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |
| Out of Scope Changes check | ✅ Passed | Check skipped because no linked issues were found for this pull request. |


coderabbitai Bot left a comment
Contributor

Actionable comments posted: 2

🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@components/spider-scheduler/src/dispatch_queue.rs`:
- Around line 82-84: The queue must stamp session IDs at enqueue time instead of
inferring them at dequeue: modify the internal queue element type to store
(SessionId, TaskAssignment) and change DispatchQueueWriter::enqueue to read the
current session (from wherever bump_session_id updates it) and push the stamped
pair atomically; update DispatchQueueReader::dequeue to return the stamped pair (or
convert it to the existing return shape) rather than attaching the session on
receive. Alternatively, if you prefer serialization instead of changing the item
type, guard both DispatchQueueWriter::enqueue and
DispatchQueueWriter::bump_session_id with the same writer-side lock/shared mutex
so bump and enqueue cannot interleave; also ensure the cloneable
DispatchQueueWriter shares that lock (or remove Clone) so concurrent clones
cannot bypass the synchronization. Ensure corresponding consumer code (dequeue
handling) and any helpers that construct or consume TaskAssignment are updated
to the new (SessionId, TaskAssignment) shape.
- Around line 93-97: The writer currently holds a hidden clone of
assignment_receiver (DispatchQueueWriterInner / create_dispatch_queue),
preventing the channel from closing when all public DispatchQueueReader
instances are dropped so DispatchQueueWriter::enqueue never returns
DispatchQueueClosed; remove the stored clone from DispatchQueueWriterInner (do
not clone assignment_receiver into the writer in create_dispatch_queue) so only
real readers hold receiver handles, allowing async_channel::Sender::send to fail
and map to SchedulerError::DispatchQueueClosed when readers are gone.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 8c98dfdc-5b9c-4dfb-8017-d96d0b1b0205

📥 Commits

Reviewing files that changed from the base of the PR and between 27091f0 and 10cb6ad.

⛔ Files ignored due to path filters (1)
  • Cargo.lock is excluded by !**/*.lock
📒 Files selected for processing (19)
  • Cargo.toml
  • components/spider-core/src/types/id.rs
  • components/spider-scheduler/Cargo.toml
  • components/spider-scheduler/src/core.rs
  • components/spider-scheduler/src/dispatch_queue.rs
  • components/spider-scheduler/src/error.rs
  • components/spider-scheduler/src/lib.rs
  • components/spider-scheduler/src/storage_client.rs
  • components/spider-scheduler/src/types.rs
  • components/spider-storage/src/cache.rs
  • components/spider-storage/src/cache/job.rs
  • components/spider-storage/src/task_instance_pool.rs
  • components/spider-storage/tests/scheduling_infra.rs
  • components/spider-tdl/src/task.rs
  • components/spider-tdl/src/task_context.rs
  • components/spider-tdl/tests/test_task_macro.rs
  • tests/huntsman/task-executor/src/lib.rs
  • tests/huntsman/task-executor/tests/test_process_pool.rs
  • tests/huntsman/tdl-integration/tests/complex.rs
💤 Files with no reviewable changes (1)
  • components/spider-storage/src/cache.rs

Comment on lines +82 to +84
/// The current implementation assumes that `enqueue` and `bump_session_id` will not be called
/// concurrently: `bump_session_id` must be called before consequent `enqueue` calls to make session
/// ID consistent with the enqueued assignments.

Contributor

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

Stamp the session at enqueue time instead of inferring it at dequeue time.

Right now correctness depends on the comment at Lines 82-84, but the public API does not enforce that constraint: DispatchQueueWriter is cloneable, enqueue does not synchronise with bump_session_id, and dequeue attaches whatever session is current when the item is received. A concurrent writer can therefore enqueue an assignment just before or during a bump and have it delivered under the wrong session, or drained unexpectedly.

Please make the queue carry (SessionId, TaskAssignment) entries, or otherwise serialise enqueue and bump_session_id behind the same writer-side lock so the session is fixed when the assignment enters the queue.

Also applies to: 92-112
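
One way to picture the first suggestion is a std-only sketch in which every entry is stamped under the same lock that guards the session ID. It is an illustration under stated assumptions rather than the PR's code: `StampedQueue`, `State`, and `race_writers_against_bump` are hypothetical names, `TaskAssignment` is a `String` stand-in, and a `Mutex<VecDeque>` replaces the async channel.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::thread;

type SessionId = u64;
type TaskAssignment = String; // hypothetical stand-in for the real type

struct State {
    session_id: SessionId,
    entries: VecDeque<(SessionId, TaskAssignment)>,
}

#[derive(Clone)]
struct StampedQueue(Arc<Mutex<State>>);

impl StampedQueue {
    fn new(initial: SessionId) -> Self {
        StampedQueue(Arc::new(Mutex::new(State {
            session_id: initial,
            entries: VecDeque::new(),
        })))
    }

    /// Reads the current session and pushes the stamped pair under one lock,
    /// so a concurrent bump cannot slip in between the two steps.
    fn enqueue(&self, assignment: TaskAssignment) {
        let mut state = self.0.lock().unwrap();
        let stamp = state.session_id;
        state.entries.push_back((stamp, assignment));
    }

    /// Accepts only a strictly larger ID (an assumption) and discards
    /// everything stamped with an earlier session.
    fn bump_session_id(&self, new_id: SessionId) -> bool {
        let mut state = self.0.lock().unwrap();
        if new_id <= state.session_id {
            return false;
        }
        state.session_id = new_id;
        state.entries.clear();
        true
    }

    /// Returns the pair exactly as it was stamped at enqueue time.
    fn dequeue(&self) -> Option<(SessionId, TaskAssignment)> {
        self.0.lock().unwrap().entries.pop_front()
    }
}

/// Races four cloned writers against one bump and returns what is left queued.
fn race_writers_against_bump() -> Vec<(SessionId, TaskAssignment)> {
    let queue = StampedQueue::new(1);
    let writers: Vec<_> = (0..4)
        .map(|w| {
            let q = queue.clone();
            thread::spawn(move || {
                for i in 0..100 {
                    q.enqueue(format!("writer-{}-task-{}", w, i));
                }
            })
        })
        .collect();
    queue.bump_session_id(2);
    for handle in writers {
        handle.join().unwrap();
    }
    let remaining: Vec<_> = std::iter::from_fn(|| queue.dequeue()).collect();
    remaining
}
```

However the threads interleave, every entry that survives the bump carries session 2, because entries stamped with session 1 were all pushed before the bump acquired the lock and were cleared by it.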


Comment on lines +93 to +97
self.inner
    .assignment_sender
    .send(assignment)
    .await
    .map_err(|_| SchedulerError::DispatchQueueClosed)

Contributor

⚠️ Potential issue | 🟠 Major | 🏗️ Heavy lift

🧩 Analysis chain

🌐 Web query:

In async-channel 2.3.x, if a Sender and an internally retained cloned Receiver both still exist, will Sender::send continue to treat the channel as open after all externally exposed Receiver handles are dropped? Please cite the async-channel documentation for the close semantics.

💡 Result:

Sender::send continues to treat the channel as open as long as at least one Receiver handle is alive, whether externally exposed or internally retained; the channel becomes closed only once all Receivers are dropped (or the channel is closed manually via a Sender or Receiver). [1][2]

Source support for the close semantics:
- The crate-level docs state: “When all Senders or all Receivers are dropped, the channel becomes closed. When a channel is closed, no more messages can be sent …” [1]
- The Sender::send docs state: “If the channel is closed, this method returns an error.” [2]

Therefore, in this scenario (a retained cloned Receiver still exists), dropping the externally exposed Receiver clones does not close the channel. Sender::send keeps treating the channel as open until that retained Receiver and every other Receiver handle is dropped, or until the channel is closed manually. [1][2]

Citations:


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="components/spider-scheduler/src/dispatch_queue.rs"
# Show relevant sections around 70-110 and 150-210
sed -n '60,120p' "$FILE"
echo "----"
sed -n '140,210p' "$FILE"
echo "----"
# Also locate assignment_receiver usages
rg -n "assignment_receiver|assignment_sender|DispatchQueueWriterInner|DispatchQueueReader|enqueue|send\\(" "$FILE"

Repository: y-scope/spider

Length of output: 7460


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="components/spider-scheduler/src/dispatch_queue.rs"
sed -n '60,120p' "$FILE"
echo "----"
sed -n '140,210p' "$FILE"
echo "----"
rg -n "assignment_receiver|assignment_sender|DispatchQueueWriterInner|DispatchQueueReader|enqueue|send\\(" "$FILE"

Repository: y-scope/spider

Length of output: 7460


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="components/spider-scheduler/src/dispatch_queue.rs"
nl -ba "$FILE" | sed -n '85,115p'
echo "----"
nl -ba "$FILE" | sed -n '155,205p'

Repository: y-scope/spider

Length of output: 101


🏁 Script executed:

#!/bin/bash
set -euo pipefail
FILE="components/spider-scheduler/src/dispatch_queue.rs"

echo "---- 85-125 ----"
cat -n "$FILE" | sed -n '85,125p'
echo "---- 155-205 ----"
cat -n "$FILE" | sed -n '155,205p'

Repository: y-scope/spider

Length of output: 3516


Fix hidden retained receiver: it prevents enqueue from surfacing DispatchQueueClosed when readers are dropped.

DispatchQueueWriter::enqueue maps async_channel::Sender::send failures to SchedulerError::DispatchQueueClosed (lines 92-98), but create_dispatch_queue clones and stores assignment_receiver inside DispatchQueueWriterInner (lines 168-172, 187-191). With async-channel, the channel only becomes closed once all receiver handles are dropped, so dropping every public DispatchQueueReader leaves this internal receiver alive and keeps the channel open; send will then just wait/back-pressure once the bounded buffer fills instead of returning DispatchQueueClosed.
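
The effect of a retained receiver handle can be reproduced with the standard library alone. The sketch below is an analogy, not async-channel code: `std::sync::mpsc::Receiver` is not cloneable, so an `Rc` is used here to model several handles to one receiver, and the function name is hypothetical. The channel is unbounded, so `send` never blocks in this model.

```rust
use std::rc::Rc;
use std::sync::mpsc::channel;

/// Returns whether `send` still succeeds after the public reader is dropped.
fn send_succeeds_after_public_readers_drop(writer_retains_receiver: bool) -> bool {
    let (sender, receiver) = channel::<u32>();

    // Rc models cloneable receiver handles: the underlying Receiver is dropped
    // only when the last Rc goes away.
    let public_reader = Rc::new(receiver);

    // Optionally keep a hidden handle, as the writer in the reviewed code does.
    let hidden_handle = if writer_retains_receiver {
        Some(Rc::clone(&public_reader))
    } else {
        None
    };

    // Every publicly visible reader goes away.
    drop(public_reader);

    // With a hidden handle alive the channel is still open and send succeeds;
    // without it the receiver is gone and send reports an error.
    let sent = sender.send(42).is_ok();
    drop(hidden_handle);
    sent
}
```

Calling the function with `true` models the current code, where the send keeps succeeding; calling it with `false` models the proposed fix, where the send fails and could be mapped to a closed-queue error.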


}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn bump_smaller_smaller_session_id_returns_invalid() -> Result<()> {

Collaborator

smaller_smaller is cute.

}

#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn bump_higher_succeeds() -> Result<()> {

Collaborator

The test name does not conform to the convention from previous session id tests.

Comment on lines +106 to +108
while self.inner.assignment_receiver.try_recv().is_ok() {
    // Drain the queue.
}

Collaborator

Is there a better way to drain the queue than running a while loop?
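
For comparison only, the standard library's `std::sync::mpsc::Receiver` exposes `try_iter`, which yields all pending values without blocking, so a drain becomes a single expression. Whether async-channel offers an equivalent is not confirmed here; the sketch below uses std's channel and the hypothetical helpers `drain` and `drain_demo`.

```rust
use std::sync::mpsc::{channel, Receiver};

/// Discards every pending value and returns how many were dropped.
fn drain<T>(receiver: &Receiver<T>) -> usize {
    receiver.try_iter().count()
}

/// Queues five values, drains them, then drains again on the empty channel.
fn drain_demo() -> (usize, usize) {
    let (sender, receiver) = channel();
    for value in 0..5 {
        sender.send(value).unwrap();
    }
    let first = drain(&receiver);
    let second = drain(&receiver);
    (first, second)
}
```

`drain_demo()` returns `(5, 0)`: the first call removes all five queued values and the second finds nothing left.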


2 participants