diff --git a/Cargo.lock b/Cargo.lock index 3b97650b..f2169cd1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1873,12 +1873,14 @@ dependencies = [ "async-channel", "async-trait", "dashmap", + "serde", "spider-core", "spider-proto-rust", "thiserror", "tokio", "tokio-util", "tonic", + "tracing", ] [[package]] diff --git a/components/spider-scheduler/Cargo.toml b/components/spider-scheduler/Cargo.toml index 3254c747..0a636721 100644 --- a/components/spider-scheduler/Cargo.toml +++ b/components/spider-scheduler/Cargo.toml @@ -10,12 +10,14 @@ path = "src/lib.rs" [dependencies] async-channel = "2.3.1" async-trait = "0.1.89" +serde = { version = "1.0.228", features = ["derive"] } spider-core = { path = "../spider-core" } spider-proto-rust = { path = "../spider-proto-rust" } thiserror = "2.0.18" -tokio = { version = "1.52.3", features = ["sync", "time"] } +tokio = { version = "1.52.3", features = ["macros", "rt", "sync", "time"] } tokio-util = "0.7.18" tonic = "0.12.3" +tracing = { version = "0.1.41", default-features = false, features = ["std"] } [dev-dependencies] anyhow = "1.0.102" diff --git a/components/spider-scheduler/src/core.rs b/components/spider-scheduler/src/core.rs index c6bb661c..ebc5c143 100644 --- a/components/spider-scheduler/src/core.rs +++ b/components/spider-scheduler/src/core.rs @@ -16,12 +16,12 @@ use crate::{ /// share the same runtime entry point. #[async_trait] pub trait SchedulerCore: Send { - /// The storage client used by the core to poll and read for placement decisions. - type StorageClient: SchedulerStorageClient; - /// The dispatch sink the core writes assignments to. type Sink: DispatchQueueSink; + /// The storage client used by the core to poll and read for placement decisions. + type StorageClient: SchedulerStorageClient; + /// Runs the scheduling loop until `cancellation_token` is triggered. 
/// /// The core polls the inbound queue through `storage_client`, applies its scheduling algorithm, @@ -39,7 +39,7 @@ pub trait SchedulerCore: Send { /// /// Returns a [`SchedulerError`] instance indicating an irrecoverable error. async fn run( - &mut self, + self, storage_client: Self::StorageClient, sink: Self::Sink, cancellation_token: tokio_util::sync::CancellationToken, diff --git a/components/spider-scheduler/src/core_impl/mod.rs b/components/spider-scheduler/src/core_impl/mod.rs new file mode 100644 index 00000000..2e27d2e9 --- /dev/null +++ b/components/spider-scheduler/src/core_impl/mod.rs @@ -0,0 +1,3 @@ +mod round_robin; + +pub use round_robin::*; diff --git a/components/spider-scheduler/src/core_impl/round_robin/implementation.rs b/components/spider-scheduler/src/core_impl/round_robin/implementation.rs new file mode 100644 index 00000000..8de08f12 --- /dev/null +++ b/components/spider-scheduler/src/core_impl/round_robin/implementation.rs @@ -0,0 +1,1116 @@ +//! The implementation of the round-robin scheduler core. See the parent module's documentation for +//! the scheduling policy and configuration. + +use std::{ + collections::{HashMap, HashSet, VecDeque}, + time::{Duration, Instant}, +}; + +use async_trait::async_trait; +use serde::Deserialize; +use spider_core::types::id::{JobId, ResourceGroupId, SessionId, TaskId}; +use tokio::select; +use tokio_util::sync::CancellationToken; + +use crate::{ + DispatchQueueSink, + InboundEntry, + SchedulerCore, + SchedulerError, + SchedulerStorageClient, + StorageClientError, + TaskAssignment, +}; + +/// The configuration of the round-robin scheduler core. +#[derive(Debug, Deserialize)] +pub struct RoundRobinConfig { + /// The capacity of the active job queue. The scheduler will make task assignments from these + /// jobs in a round-robin manner. + /// + /// Must be greater than 0. + pub active_job_queue_capacity: usize, + + /// The capacity of the dispatch queue. + /// + /// Must be greater than 0. 
+ pub dispatch_queue_capacity: usize, + + /// The capacity of the total pending ready tasks buffered in the scheduler. + /// + /// Must be greater than 0. + pub ready_task_capacity: usize, + + /// The capacity of the total pending commit-ready tasks buffered in the scheduler. + /// + /// Must be greater than 0. + pub commit_ready_task_capacity: usize, + + /// The capacity of the total pending cleanup-ready tasks buffered in the scheduler. + /// + /// Must be greater than 0. + pub cleanup_ready_task_capacity: usize, + + /// The maximum time (in milliseconds) that the scheduler will wait for the storage server to + /// fill the inbound-queue reading request. + pub storage_poll_timeout_ms: u64, + + /// The time (in milliseconds) that the scheduler will spend on each tick. + pub tick_interval_ms: u64, + + /// The time (in seconds) that a job may remain in the finalizing state before the scheduler + /// retires it. + pub finalizing_job_expiration_timeout_sec: u64, +} + +impl RoundRobinConfig { + /// Validates the configuration and creates a ready-to-run scheduler core from it. + /// + /// # Type Parameters + /// + /// * `SchedulerStorageClientType` - The storage client used to poll the inbound queue. + /// * `DispatchQueueSinkType` - The dispatch sink that task assignments are written to. + /// + /// # Returns + /// + /// A newly created round-robin scheduler core on success. 
+ /// + /// # Errors + /// + /// Returns an error if: + /// + /// * [`SchedulerError::InvalidConfig`] if any of the following configuration entries is 0: + /// * `active_job_queue_capacity` + /// * `dispatch_queue_capacity` + /// * `ready_task_capacity` + /// * `commit_ready_task_capacity` + /// * `cleanup_ready_task_capacity` + pub fn make_core< + SchedulerStorageClientType: SchedulerStorageClient + 'static, + DispatchQueueSinkType: DispatchQueueSink, + >( + self, + ) -> Result, SchedulerError> + { + if self.active_job_queue_capacity == 0 { + return Err(SchedulerError::InvalidConfig( + "`active_job_queue_capacity` must be greater than 0".to_string(), + )); + } + + if self.dispatch_queue_capacity == 0 { + return Err(SchedulerError::InvalidConfig( + "`dispatch_queue_capacity` must be greater than 0".to_string(), + )); + } + + if self.ready_task_capacity == 0 { + return Err(SchedulerError::InvalidConfig( + "`ready_task_capacity` must be greater than 0".to_string(), + )); + } + + if self.commit_ready_task_capacity == 0 { + return Err(SchedulerError::InvalidConfig( + "`commit_ready_task_capacity` must be greater than 0".to_string(), + )); + } + + if self.cleanup_ready_task_capacity == 0 { + return Err(SchedulerError::InvalidConfig( + "`cleanup_ready_task_capacity` must be greater than 0".to_string(), + )); + } + + Ok(RoundRobinCore { + config: self, + _marker: std::marker::PhantomData, + }) + } +} + +/// The round-robin implementation of [`SchedulerCore`], created from +/// [`RoundRobinConfig::make_core`]. +/// +/// Holding an instance of this type guarantees the wrapped configuration has passed validation, so +/// the scheduling loop can trust its invariants without re-validating. +/// +/// # Type Parameters +/// +/// * `SchedulerStorageClientType` - The storage client used to poll the inbound queue. +/// * `DispatchQueueSinkType` - The dispatch sink that task assignments are written to. 
+pub struct RoundRobinCore<
+    SchedulerStorageClientType: SchedulerStorageClient + 'static,
+    DispatchQueueSinkType: DispatchQueueSink,
+> {
+    /// The validated configuration handed over to the scheduling loop.
+    config: RoundRobinConfig,
+    /// Binds the core to its storage-client and sink types without storing either of them.
+    _marker: std::marker::PhantomData<(SchedulerStorageClientType, DispatchQueueSinkType)>,
+}
+
+#[async_trait]
+impl<
+    SchedulerStorageClientType: SchedulerStorageClient + 'static,
+    DispatchQueueSinkType: DispatchQueueSink,
+> SchedulerCore for RoundRobinCore<SchedulerStorageClientType, DispatchQueueSinkType>
+{
+    type Sink = DispatchQueueSinkType;
+    type StorageClient = SchedulerStorageClientType;
+
+    /// Builds the internal [`RoundRobin`] scheduler, starting from the default session ID, and
+    /// runs its scheduling loop to completion.
+    async fn run(
+        self,
+        storage_client: Self::StorageClient,
+        sink: Self::Sink,
+        cancellation_token: CancellationToken,
+    ) -> Result<(), SchedulerError> {
+        RoundRobin::new(
+            SessionId::default(),
+            storage_client,
+            sink,
+            cancellation_token,
+            self.config,
+        )
+        .run()
+        .await
+    }
+}
+
+/// A FIFO queue of a job's buffered ready tasks.
+#[derive(Eq, PartialEq, Debug)]
+pub(super) struct JobTaskQueue {
+    job_id: JobId,
+    resource_group_id: ResourceGroupId,
+    task_ids: VecDeque<TaskId>,
+}
+
+impl JobTaskQueue {
+    /// Factory function.
+    ///
+    /// # Returns
+    ///
+    /// A new task queue for the given job, seeded with `init_task_id`.
+    fn new(job_id: JobId, resource_group_id: ResourceGroupId, init_task_id: TaskId) -> Self {
+        Self {
+            job_id,
+            resource_group_id,
+            task_ids: VecDeque::from([init_task_id]),
+        }
+    }
+
+    /// Appends a ready task to the back of the queue.
+    fn enqueue(&mut self, task_id: TaskId) {
+        self.task_ids.push_back(task_id);
+    }
+
+    /// # Returns
+    ///
+    /// * The next ready task ID in FIFO order.
+    /// * [`None`] if the queue is empty.
+    fn dequeue(&mut self) -> Option<TaskId> {
+        self.task_ids.pop_front()
+    }
+}
+
+/// The round-robin scheduler core created from a [`RoundRobinConfig`].
+///
+/// # Type Parameters
+///
+/// * `SchedulerStorageClientType` - The storage client used to poll the inbound queue.
+/// * `DispatchQueueSinkType` - The dispatch sink that task assignments are written to.
+/// +/// # Note +/// +/// All member variables are marked `pub(super)` to allow the test module to inspect the internal +/// states. +pub(super) struct RoundRobin< + SchedulerStorageClientType: SchedulerStorageClient + 'static, + DispatchQueueSinkType: DispatchQueueSink, +> { + pub(super) sink: DispatchQueueSinkType, + pub(super) cancellation_token: CancellationToken, + pub(super) config: RoundRobinConfig, + pub(super) storage_session_id: SessionId, + + pub(super) buffered_tasks: HashSet<(JobId, TaskId)>, + + pub(super) active_jobs: HashMap, + pub(super) rr_queue: Vec, + pub(super) rr_cursor: usize, + + pub(super) pending_jobs: HashMap, + pub(super) pending_job_queue: VecDeque, + + pub(super) commit_ready_jobs: VecDeque<(JobId, ResourceGroupId)>, + pub(super) cleanup_ready_jobs: VecDeque<(JobId, ResourceGroupId)>, + + pub(super) finalizing_jobs: HashSet, + pub(super) finalizing_job_queue: VecDeque<(JobId, Instant)>, + + pub(super) inbound_queue_reader: AsyncInboundQueueReader, +} + +impl< + SchedulerStorageClientType: SchedulerStorageClient + 'static, + DispatchQueueSinkType: DispatchQueueSink, +> RoundRobin +{ + /// Factory function. + /// + /// Creates a [`RoundRobin`] scheduler from the given config. The config must have been + /// validated through [`RoundRobinConfig::make_core`]. + /// + /// # Returns + /// + /// The constructed [`RoundRobin`] scheduler. 
+ pub(super) fn new( + storage_session_id: SessionId, + storage_client: SchedulerStorageClientType, + sink: DispatchQueueSinkType, + cancellation_token: CancellationToken, + config: RoundRobinConfig, + ) -> Self { + let buffered_tasks = HashSet::with_capacity(config.ready_task_capacity); + let active_jobs = HashMap::with_capacity(config.active_job_queue_capacity); + let rr_queue = Self::new_round_robin_queue(config.active_job_queue_capacity); + let rr_cursor = 0; + let pending_jobs = HashMap::with_capacity(config.active_job_queue_capacity); + let pending_job_queue = VecDeque::with_capacity(config.active_job_queue_capacity); + let commit_ready_jobs = VecDeque::with_capacity(config.commit_ready_task_capacity); + let cleanup_ready_jobs = VecDeque::with_capacity(config.cleanup_ready_task_capacity); + let finalizing_jobs = HashSet::with_capacity( + config.commit_ready_task_capacity + config.cleanup_ready_task_capacity, + ); + let finalizing_job_queue = VecDeque::new(); + let inbound_queue_reader = AsyncInboundQueueReader::new(storage_client); + Self { + sink, + cancellation_token, + config, + storage_session_id, + buffered_tasks, + active_jobs, + rr_queue, + rr_cursor, + pending_jobs, + pending_job_queue, + commit_ready_jobs, + cleanup_ready_jobs, + finalizing_jobs, + finalizing_job_queue, + inbound_queue_reader, + } + } + + /// Executes a single scheduling tick: consumes any completed inbound poll, then makes + /// scheduling decisions to fill the dispatch queue. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * Forwards [`Self::consume_inbound_poll_result`]'s return values on failure. + /// * Forwards [`Self::make_schedule_decisions`]'s return values on failure. 
+    pub(super) async fn tick(&mut self) -> Result<(), SchedulerError> {
+        tracing::info!("Starting scheduling tick.");
+        self.consume_inbound_poll_result().await?;
+        self.make_schedule_decisions().await?;
+        self.retire_expired_finalizing_jobs();
+        Ok(())
+    }
+
+    /// # Returns
+    ///
+    /// A new round-robin queue containing only the commit-ready and cleanup-ready slots.
+    fn new_round_robin_queue(active_job_pool_capacity: usize) -> Vec<RoundRobinSlot> {
+        // Reserve room for one slot per active job plus the two fixed lanes.
+        let mut round_robin_queue = Vec::with_capacity(active_job_pool_capacity + 2);
+        round_robin_queue.push(RoundRobinSlot::CommitReady);
+        round_robin_queue.push(RoundRobinSlot::CleanupReady);
+        round_robin_queue
+    }
+
+    /// Runs the scheduling loop until the cancellation token is triggered.
+    ///
+    /// Each iteration executes one [`Self::tick`] and then sleeps for the remainder of the
+    /// configured tick interval. Cancellation is observed both while a tick is running and while
+    /// the loop is sleeping between ticks.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    ///
+    /// * Forwards [`Self::tick`]'s return values on failure.
+    async fn run(mut self) -> Result<(), SchedulerError> {
+        tracing::info!(
+            config = ?self.config,
+            init_session_id = self.storage_session_id,
+            "Round-robin scheduler started."
+        );
+        let tick_interval = Duration::from_millis(self.config.tick_interval_ms);
+        // A cloned token keeps `select!` from borrowing `self` while `tick` holds it mutably.
+        let cancellation_token = self.cancellation_token.clone();
+        loop {
+            let tick_start = tokio::time::Instant::now();
+            select! {
+                () = cancellation_token.cancelled() => {
+                    tracing::info!("Round-robin scheduler cancelled. Shutting down.");
+                    return Ok(());
+                }
+                result = self.tick() => {
+                    result.inspect_err(|err| tracing::error!(
+                        err = %err,
+                        "Round-robin scheduler exits on error."
+                    ))?;
+                }
+            }
+
+            let sleep_time = tick_interval.saturating_sub(tick_start.elapsed());
+            if sleep_time.is_zero() {
+                // The tick used up the whole interval; still give other tasks a chance to run.
+                tokio::task::yield_now().await;
+                continue;
+            }
+            // Race the sleep against cancellation so shutdown is not delayed by a full interval.
+            select! {
+                () = cancellation_token.cancelled() => {
+                    tracing::info!("Round-robin scheduler cancelled. Shutting down.");
+                    return Ok(());
+                }
+                () = tokio::time::sleep(sleep_time) => {}
+            }
+        }
+    }
+
+    /// Clears all buffered jobs and tasks, resetting the scheduler to its initial placement state.
+    fn clear(&mut self) {
+        self.rr_cursor = 0;
+        self.rr_queue = Self::new_round_robin_queue(self.config.active_job_queue_capacity);
+
+        self.finalizing_job_queue.clear();
+        self.finalizing_jobs.clear();
+        self.cleanup_ready_jobs.clear();
+        self.commit_ready_jobs.clear();
+        self.pending_job_queue.clear();
+        self.pending_jobs.clear();
+        self.active_jobs.clear();
+        self.buffered_tasks.clear();
+    }
+
+    /// Drops the given job from the active set together with its buffered tasks, then promotes the
+    /// next pending job (if there is one) into the freed slot.
+    ///
+    /// The job's slot is removed with a swap, so the last slot of the rotation takes its place.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    ///
+    /// * [`SchedulerError::Internal`] if the given job is not currently active.
+    fn retire_active_job(&mut self, job_id: JobId) -> Result<(), SchedulerError> {
+        tracing::info!(job_id = ?job_id, "Retiring active job.");
+
+        let slot_index = self
+            .rr_queue
+            .iter()
+            .position(|slot| matches!(slot, RoundRobinSlot::Job(id) if *id == job_id));
+        let Some(slot_index) = slot_index else {
+            return Err(SchedulerError::Internal(format!(
+                "attempt to remove a non-existing active job: {job_id:?}"
+            )));
+        };
+        self.rr_queue.swap_remove(slot_index);
+
+        let Some(retired_job) = self.active_jobs.remove(&job_id) else {
+            return Err(SchedulerError::Internal(format!(
+                "attempt to destroy a non-existing active job: {job_id:?}"
+            )));
+        };
+        self.discard_job_tasks(retired_job);
+
+        if let Some(promoted_job) = self.pop_next_pending_job() {
+            let promoted_job_id = promoted_job.job_id;
+            tracing::info!(job_id = ?promoted_job_id, "Pending job promoted to active job.");
+            self.rr_queue.push(RoundRobinSlot::Job(promoted_job_id));
+            self.active_jobs.insert(promoted_job_id, promoted_job);
+        }
+        Ok(())
+    }
+
+    /// # Returns
+    ///
+    /// The next pending job in FIFO order, or [`None`] if no pending job is left.
+ fn pop_next_pending_job(&mut self) -> Option { + loop { + let job_id = self.pending_job_queue.pop_front()?; + // NOTE: The job may have been cancelled and removed from `pending_jobs`, so the ID in + // the queue may not necessarily exist in `pending_jobs`. + if let Some(pending_job) = self.pending_jobs.remove(&job_id) { + return Some(pending_job); + } + } + } + + /// Removes all of the given job's queued tasks from the buffered-task set. + fn discard_job_tasks(&mut self, job_entry: JobTaskQueue) { + tracing::info!( + job_id = ? job_entry.job_id, + num_tasks = job_entry.task_ids.len(), + "Discarding job tasks." + ); + for task_id in job_entry.task_ids { + self.buffered_tasks.remove(&(job_entry.job_id, task_id)); + } + } + + /// Inserts a job as it is considered finalizing (commit-ready or cleanup-ready). Once inserted, + /// any further tasks for the job will be ignored until this queue is reset. + fn mark_job_finalizing(&mut self, job_id: JobId) { + if self.finalizing_jobs.insert(job_id) { + self.finalizing_job_queue + .push_back((job_id, Instant::now())); + } + } + + /// Retires expired finalizing jobs. + /// + /// A finalizing job is considered expired once it has remained in the finalizing state for + /// longer than [`RoundRobinConfig::finalizing_job_expiration_timeout_sec`]. + fn retire_expired_finalizing_jobs(&mut self) { + let expiration_time = + Duration::from_secs(self.config.finalizing_job_expiration_timeout_sec); + while let Some((job_id, insertion_time)) = self.finalizing_job_queue.front() { + if insertion_time.elapsed() > expiration_time { + tracing::info!(job_id = ? job_id, "Finalizing job retired."); + self.finalizing_jobs.remove(job_id); + self.finalizing_job_queue.pop_front(); + } else { + break; + } + } + } + + /// Loads polled inbound entries into the scheduler's internal buffers. 
+ /// + /// If the polled session is newer than the current session, all existing placement states are + /// cleared and the dispatch queue's session is bumped before loading. Entries whose tasks are + /// already buffered are ignored. + /// + /// A commit-ready or cleanup-ready entry marks its job as finalizing. A finalizing job no + /// longer participates in regular-task scheduling: the job is removed from the active or + /// pending set, its buffered ready tasks are discarded, and its incoming ready entries are + /// ignored. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * [`SchedulerError::InvalidSessionId`] if the polled session is older than the current + /// session. + /// * Forwards [`DispatchQueueSink::bump_session_id`]'s return values on failure. + /// * Forwards [`Self::enqueue_commit_ready_entries`]'s return values on failure. + /// * Forwards [`Self::enqueue_cleanup_ready_entries`]'s return values on failure. + async fn ingest_inbound_entries( + &mut self, + curr_session_id: SessionId, + storage_session_id: SessionId, + ready_entries: Vec, + commit_ready_entries: Vec, + cleanup_ready_entries: Vec, + ) -> Result<(), SchedulerError> { + if storage_session_id < curr_session_id { + return Err(SchedulerError::InvalidSessionId(storage_session_id)); + } + if storage_session_id > curr_session_id { + tracing::info!( + curr_session_id = ? curr_session_id, + storage_session_id = ? storage_session_id, + "New session detected. Clearing existing placement state and bumping dispatch \ + queue session." + ); + self.storage_session_id = storage_session_id; + self.clear(); + self.sink.bump_session_id(storage_session_id).await?; + } + + // Load commit-ready tasks and cleanup-ready tasks first to avoid loading a job that is + // already finalizing. 
+ self.enqueue_commit_ready_entries(commit_ready_entries)?; + self.enqueue_cleanup_ready_entries(cleanup_ready_entries)?; + self.enqueue_ready_entries(ready_entries); + + Ok(()) + } + + /// Enqueues polled commit-ready entries: each entry's job is marked finalizing, queued for a + /// commit-task assignment, and removed from the active or pending set. + /// + /// Entries whose tasks are already buffered are ignored. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * Forwards [`Self::retire_active_job`]'s return values on failure. + fn enqueue_commit_ready_entries( + &mut self, + commit_ready_entries: Vec, + ) -> Result<(), SchedulerError> { + for inbound_entry in commit_ready_entries { + if !self + .buffered_tasks + .insert((inbound_entry.job_id, inbound_entry.task_id)) + { + continue; + } + + tracing::info!( + job_id = ? inbound_entry.job_id, + "Commit-ready task received. Finalizing job." + ); + + self.mark_job_finalizing(inbound_entry.job_id); + self.commit_ready_jobs + .push_back((inbound_entry.job_id, inbound_entry.resource_group_id)); + + if self.active_jobs.contains_key(&inbound_entry.job_id) { + self.retire_active_job(inbound_entry.job_id)?; + continue; + } + + if let Some(job_entry) = self.pending_jobs.remove(&inbound_entry.job_id) { + self.discard_job_tasks(job_entry); + } + } + + Ok(()) + } + + /// Enqueues polled cleanup-ready entries: each entry's job is marked finalizing, queued for a + /// cleanup-task assignment, and removed from the active or pending set. + /// + /// Entries whose tasks are already buffered are ignored. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * Forwards [`Self::retire_active_job`]'s return values on failure. 
+ fn enqueue_cleanup_ready_entries( + &mut self, + cleanup_ready_entries: Vec, + ) -> Result<(), SchedulerError> { + for inbound_entry in cleanup_ready_entries { + if !self + .buffered_tasks + .insert((inbound_entry.job_id, inbound_entry.task_id)) + { + continue; + } + + tracing::info!( + job_id = ? inbound_entry.job_id, + "Cleanup-ready task received. Finalizing job." + ); + + self.mark_job_finalizing(inbound_entry.job_id); + self.cleanup_ready_jobs + .push_back((inbound_entry.job_id, inbound_entry.resource_group_id)); + + if self.active_jobs.contains_key(&inbound_entry.job_id) { + self.retire_active_job(inbound_entry.job_id)?; + continue; + } + + if let Some(job_entry) = self.pending_jobs.remove(&inbound_entry.job_id) { + self.discard_job_tasks(job_entry); + } + } + + Ok(()) + } + + /// Enqueues polled regular ready entries into their jobs' task queues + /// + /// Entries of finalizing jobs and entries whose tasks are already buffered are ignored. + fn enqueue_ready_entries(&mut self, ready_entries: Vec) { + for inbound_entry in ready_entries { + if self.finalizing_jobs.contains(&inbound_entry.job_id) { + tracing::info!( + job_id = ? inbound_entry.job_id, + "Ready task received for a finalizing job. Ignored." + ); + continue; + } + if !self + .buffered_tasks + .insert((inbound_entry.job_id, inbound_entry.task_id)) + { + continue; + } + + tracing::debug!( + job_id = ? inbound_entry.job_id, + task_id = ? inbound_entry.task_id, + "Inbound task received." + ); + + if let Some(active_job) = self.active_jobs.get_mut(&inbound_entry.job_id) { + active_job.enqueue(inbound_entry.task_id); + continue; + } + if let Some(pending_job) = self.pending_jobs.get_mut(&inbound_entry.job_id) { + pending_job.enqueue(inbound_entry.task_id); + continue; + } + + if self.active_jobs.len() < self.config.active_job_queue_capacity { + tracing::info!( + job_id = ? inbound_entry.job_id, + "New job received. Placing in active job queue." 
+ ); + self.active_jobs.insert( + inbound_entry.job_id, + JobTaskQueue::new( + inbound_entry.job_id, + inbound_entry.resource_group_id, + inbound_entry.task_id, + ), + ); + self.rr_queue + .push(RoundRobinSlot::Job(inbound_entry.job_id)); + continue; + } + + tracing::info!( + job_id = ? inbound_entry.job_id, + "New job received. Placing in pending job queue." + ); + self.pending_jobs.insert( + inbound_entry.job_id, + JobTaskQueue::new( + inbound_entry.job_id, + inbound_entry.resource_group_id, + inbound_entry.task_id, + ), + ); + self.pending_job_queue.push_back(inbound_entry.job_id); + } + } + + /// Consumes the in-flight inbound poll if it has completed, ingesting its entries and starting + /// the next poll; starts the initial poll if none is in flight. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * Forwards [`AsyncInboundQueueReader::try_collect_result`]'s return values on failure. + /// * Forwards [`Self::ingest_inbound_entries`]'s return values on failure. + /// * Forwards [`Self::start_inbound_poll`]'s return values on failure. + async fn consume_inbound_poll_result(&mut self) -> Result<(), SchedulerError> { + let curr_session_id = self.storage_session_id; + let inbound_poll_state = self + .inbound_queue_reader + .try_collect_result(curr_session_id) + .await?; + match inbound_poll_state { + InboundPollState::Ready { + session_id: storage_session_id, + ready_entries, + commit_ready_entries, + cleanup_ready_entries, + } => { + tracing::info!("Inbound poll completed."); + self.ingest_inbound_entries( + curr_session_id, + storage_session_id, + ready_entries, + commit_ready_entries, + cleanup_ready_entries, + ) + .await?; + self.start_inbound_poll()?; + } + InboundPollState::Pending => {} + InboundPollState::NotStarted => { + self.start_inbound_poll()?; + } + } + + Ok(()) + } + + /// Makes scheduling decisions in round-robin order, writing task assignments to the dispatch + /// queue until it reaches capacity or no buffered task is left. 
+ /// + /// # Errors + /// + /// Returns an error if: + /// + /// * [`SchedulerError::Internal`] if the round-robin queue is inconsistent with the scheduler's + /// job bookkeeping. + /// * Forwards [`DispatchQueueSink::enqueue`]'s return values on failure. + /// * Forwards [`Self::retire_active_job`]'s return values on failure. + async fn make_schedule_decisions(&mut self) -> Result<(), SchedulerError> { + let dispatch_slots = self + .config + .dispatch_queue_capacity + .saturating_sub(self.sink.size()); + let mut remaining_dispatch_slots = dispatch_slots; + 'fill_dispatch_queue: while remaining_dispatch_slots > 0 && !self.buffered_tasks.is_empty() + { + if self.rr_cursor >= self.rr_queue.len() { + self.rr_cursor = 0; + } + let round_robin_queue_entry = match self.rr_queue.get(self.rr_cursor) { + Some(entry) => entry.clone(), + None => { + return Err(SchedulerError::Internal( + "round-robin cursor is corrupted".to_string(), + )); + } + }; + self.rr_cursor += 1; + + match round_robin_queue_entry { + RoundRobinSlot::CleanupReady => { + let Some((job_id, resource_group_id)) = self.cleanup_ready_jobs.pop_front() + else { + continue; + }; + self.sink + .enqueue(TaskAssignment { + job_id, + resource_group_id, + task_id: TaskId::Cleanup, + }) + .await?; + self.buffered_tasks.remove(&(job_id, TaskId::Cleanup)); + remaining_dispatch_slots -= 1; + } + RoundRobinSlot::CommitReady => { + for _ in 0..self.config.active_job_queue_capacity { + if remaining_dispatch_slots == 0 { + break 'fill_dispatch_queue; + } + let Some((job_id, resource_group_id)) = self.commit_ready_jobs.pop_front() + else { + break; + }; + self.sink + .enqueue(TaskAssignment { + job_id, + resource_group_id, + task_id: TaskId::Commit, + }) + .await?; + self.buffered_tasks.remove(&(job_id, TaskId::Commit)); + remaining_dispatch_slots -= 1; + } + } + RoundRobinSlot::Job(job_id) => { + let Some(job_entry) = self.active_jobs.get_mut(&job_id) else { + return Err(SchedulerError::Internal(format!( + "attempt to 
remove a non-existing active job: {job_id:?}" + ))); + }; + if let Some(task_id) = job_entry.dequeue() { + self.sink + .enqueue(TaskAssignment { + job_id, + resource_group_id: job_entry.resource_group_id, + task_id, + }) + .await?; + self.buffered_tasks.remove(&(job_id, task_id)); + remaining_dispatch_slots -= 1; + } else { + self.retire_active_job(job_id)?; + } + } + } + } + + tracing::info!( + dispatch_slots = dispatch_slots, + num_task_assignments_enqueued = dispatch_slots - remaining_dispatch_slots, + "Decision-making loop completed." + ); + + Ok(()) + } + + /// Starts a new asynchronous inbound poll, with per-lane entry limits derived from the + /// remaining buffer capacities. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * Forwards [`AsyncInboundQueueReader::start`]'s return values on failure. + fn start_inbound_poll(&mut self) -> Result<(), SchedulerError> { + let num_commit_ready_tasks = self.commit_ready_jobs.len(); + let num_cleanup_ready_tasks = self.cleanup_ready_jobs.len(); + let max_commit_ready_entries = self + .config + .commit_ready_task_capacity + .saturating_sub(num_commit_ready_tasks); + let max_cleanup_ready_entries = self + .config + .cleanup_ready_task_capacity + .saturating_sub(num_cleanup_ready_tasks); + let max_ready_entries = self.config.ready_task_capacity.saturating_sub( + self.buffered_tasks.len() - num_commit_ready_tasks - num_cleanup_ready_tasks, + ); + + self.inbound_queue_reader.start( + Duration::from_millis(self.config.storage_poll_timeout_ms), + max_ready_entries, + max_commit_ready_entries, + max_cleanup_ready_entries, + ) + } +} + +/// A slot in the round-robin rotation that the scheduler draws task assignments from. +#[derive(Clone)] +pub(super) enum RoundRobinSlot { + /// An active job: assignments are drawn from the job's buffered ready tasks. + Job(JobId), + + /// The commit lane: assignments are drawn from the buffered commit-ready jobs. 
+ CommitReady, + + /// The cleanup lane: assignments are drawn from the buffered cleanup-ready jobs. + CleanupReady, +} + +/// The state of an asynchronous inbound-queue poll. +enum InboundPollState { + /// The poll has completed, carrying the polled session and the entries drained from each + /// inbound-queue lane. + Ready { + session_id: SessionId, + ready_entries: Vec, + commit_ready_entries: Vec, + cleanup_ready_entries: Vec, + }, + + /// The poll is still in flight. + Pending, + + /// No poll has been started. + NotStarted, +} + +/// The join handles of one in-flight inbound poll, one per inbound-queue lane. +#[allow(clippy::struct_field_names)] +struct InboundPollHandles { + ready_handle: + tokio::task::JoinHandle), StorageClientError>>, + commit_ready_handle: + tokio::task::JoinHandle), StorageClientError>>, + cleanup_ready_handle: + tokio::task::JoinHandle), StorageClientError>>, +} + +impl InboundPollHandles { + /// Tries to collect the results of all lane polls without blocking. + /// + /// Entries from lanes that report an older session than the latest observed session are + /// dropped. + /// + /// # Returns + /// + /// On success: + /// + /// * [`InboundPollState::Pending`] if any lane poll is still in flight. + /// * [`InboundPollState::Ready`] with the latest observed session and its entries otherwise. + /// + /// # Errors + /// + /// Returns an error if: + /// + /// * [`SchedulerError::Internal`] if any lane's polling task fails to join. + /// * Forwards [`SchedulerStorageClient::poll_ready`]'s return values on failure. + /// * Forwards [`SchedulerStorageClient::poll_commit_ready`]'s return values on failure. + /// * Forwards [`SchedulerStorageClient::poll_cleanup_ready`]'s return values on failure. 
+ async fn try_collect_result( + &mut self, + curr_session_id: SessionId, + ) -> Result { + if !self.ready_handle.is_finished() + || !self.commit_ready_handle.is_finished() + || !self.cleanup_ready_handle.is_finished() + { + return Ok(InboundPollState::Pending); + } + + let (ready_session_id, ready_entries) = (&mut self.ready_handle) + .await + .map_err(|e| SchedulerError::Internal(e.to_string()))??; + let (commit_session_id, commit_ready_entries) = (&mut self.commit_ready_handle) + .await + .map_err(|e| SchedulerError::Internal(e.to_string()))??; + let (cleanup_session_id, cleanup_ready_entries) = + (&mut self.cleanup_ready_handle) + .await + .map_err(|e| SchedulerError::Internal(e.to_string()))??; + + let latest_session_id = curr_session_id + .max(ready_session_id) + .max(commit_session_id) + .max(cleanup_session_id); + + Ok(InboundPollState::Ready { + session_id: latest_session_id, + ready_entries: Self::drop_if_stale(ready_session_id, latest_session_id, ready_entries), + commit_ready_entries: Self::drop_if_stale( + commit_session_id, + latest_session_id, + commit_ready_entries, + ), + cleanup_ready_entries: Self::drop_if_stale( + cleanup_session_id, + latest_session_id, + cleanup_ready_entries, + ), + }) + } + + /// # Returns + /// + /// `entries` if `session_id` matches `latest_session_id`, or an empty vector otherwise. + fn drop_if_stale( + session_id: SessionId, + latest_session_id: SessionId, + entries: Vec, + ) -> Vec { + if session_id == latest_session_id { + entries + } else { + Vec::new() + } + } +} + +/// A reader that runs inbound-queue polls as background tasks, with at most one polling request +/// (from all three lanes) in flight at a time. +/// +/// # Type Parameters +/// +/// * `StorageClientType` - The storage client used to poll the inbound queue. +pub(super) struct AsyncInboundQueueReader { + storage_client: StorageClientType, + handle: Option, +} + +impl + AsyncInboundQueueReader +{ + /// Factory function. 
+    ///
+    /// # Returns
+    ///
+    /// A new reader with no poll in flight.
+    const fn new(storage_client: StorageClientType) -> Self {
+        Self {
+            storage_client,
+            handle: None,
+        }
+    }
+
+    /// Tries to collect the result of the in-flight poll without blocking, releasing the poll
+    /// handles once the poll resolves, whether it succeeded or failed.
+    ///
+    /// # Returns
+    ///
+    /// On success:
+    ///
+    /// * [`InboundPollState::NotStarted`] if no poll is in flight.
+    /// * Forwards [`InboundPollHandles::try_collect_result`]'s return values otherwise.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    ///
+    /// * Forwards [`InboundPollHandles::try_collect_result`]'s return values on failure.
+    async fn try_collect_result(
+        &mut self,
+        curr_session_id: SessionId,
+    ) -> Result<InboundPollState, SchedulerError> {
+        let Some(handle) = &mut self.handle else {
+            return Ok(InboundPollState::NotStarted);
+        };
+        // Only a still-pending poll keeps its handles. They are released on failure too: a
+        // finished `JoinHandle` must not be awaited again, and `start` needs a free slot.
+        let result = handle.try_collect_result(curr_session_id).await;
+        if !matches!(result, Ok(InboundPollState::Pending)) {
+            self.handle = None;
+        }
+        result
+    }
+
+    /// Starts a new inbound poll, polling each inbound-queue lane as a background task.
+    ///
+    /// Lanes whose entry limit is 0 are not polled; if all limits are 0, no poll is started.
+    ///
+    /// # Errors
+    ///
+    /// Returns an error if:
+    ///
+    /// * [`SchedulerError::Internal`] if a poll is already in flight.
+ fn start( + &mut self, + storage_poll_timeout: Duration, + max_ready_entries: usize, + max_commit_ready_entries: usize, + max_cleanup_ready_entries: usize, + ) -> Result<(), SchedulerError> { + if self.handle.is_some() { + return Err(SchedulerError::Internal( + "inbound poll handle already exists".to_string(), + )); + } + + if max_ready_entries == 0 && max_commit_ready_entries == 0 && max_cleanup_ready_entries == 0 + { + tracing::info!("Inbound poll skipped: all entry limits are 0."); + return Ok(()); + } + + let ready_storage_client = self.storage_client.clone(); + let ready_handle = tokio::task::spawn(async move { + if max_ready_entries == 0 { + return Ok((0, Vec::new())); + } + ready_storage_client + .poll_ready(max_ready_entries, storage_poll_timeout) + .await + }); + + let commit_ready_storage_client = self.storage_client.clone(); + let commit_ready_handle = tokio::task::spawn(async move { + if max_commit_ready_entries == 0 { + return Ok((0, Vec::new())); + } + commit_ready_storage_client + .poll_commit_ready(max_commit_ready_entries, storage_poll_timeout) + .await + }); + + let cleanup_ready_storage_client = self.storage_client.clone(); + let cleanup_ready_handle = tokio::task::spawn(async move { + if max_cleanup_ready_entries == 0 { + return Ok((0, Vec::new())); + } + cleanup_ready_storage_client + .poll_cleanup_ready(max_cleanup_ready_entries, storage_poll_timeout) + .await + }); + + self.handle = Some(InboundPollHandles { + ready_handle, + commit_ready_handle, + cleanup_ready_handle, + }); + + tracing::info!( + max_ready_entries = ? max_ready_entries, + max_commit_ready_entries = ? max_commit_ready_entries, + max_cleanup_ready_entries = ? max_cleanup_ready_entries, + "Inbound poll initiated." 
+ ); + + Ok(()) + } +} diff --git a/components/spider-scheduler/src/core_impl/round_robin/mod.rs b/components/spider-scheduler/src/core_impl/round_robin/mod.rs new file mode 100644 index 00000000..21af05f7 --- /dev/null +++ b/components/spider-scheduler/src/core_impl/round_robin/mod.rs @@ -0,0 +1,71 @@ +//! Round-robin scheduler. +//! +//! This scheduler provides basic fairness across jobs using a round-robin scheduling policy. It +//! polls tasks from the inbound queue (maintained by the storage service) and organizes jobs into +//! two sets: +//! +//! * Active jobs: jobs that participate in round-robin scheduling. +//! * Pending jobs: jobs that are buffered but not yet scheduled. When an active job has no +//! remaining schedulable tasks, it is replaced by the next pending job in FIFO order. +//! +//! Commit-ready and cleanup-ready tasks are buffered separately from regular active-job tasks. They +//! participate in round-robin scheduling through higher-priority tiers: on each round, commit tasks +//! are scheduled before cleanup tasks, and cleanup tasks are scheduled before regular active-job +//! tasks. +//! +//! The scheduler operates in discrete ticks. During each tick, it attempts to consume the results +//! of an asynchronous inbound-queue polling operation and loads any newly available tasks into its +//! internal buffers. It then makes scheduling decisions until the dispatch queue reaches capacity. +//! +//! # Properties +//! +//! * During each round-robin tick, the scheduler enqueues task assignments into the dispatch queue +//! in the following priority order, subject to the remaining dispatch capacity: +//! * Up to `active_job_queue_capacity` commit task assignments, in FIFO order. +//! * This mirrors the fairness bound for active jobs: each job can contribute at most one +//! commit task assignment per pass. +//! * Exactly one cleanup task assignment, if any. +//! * Exactly one regular task assignment for each active job, in FIFO order. +//! 
* All buffered tasks are unique. Tasks loaded from the inbound queue are deduplicated before +//! entering the scheduler's internal buffers. +//! * Active job retirement is deferred. When an active job has no remaining schedulable tasks in +//! the scheduler, it is not immediately replaced by the next pending job. Instead, retirement is +//! delayed until the round-robin order loops back to that active job and its buffer is still +//! empty. +//! +//! # Configuration +//! +//! * `active_job_queue_capacity`: Maximum number of active jobs maintained by the scheduler. +//! * `dispatch_queue_capacity`: Maximum number of task assignments in the dispatch queue. +//! * `ready_task_capacity`: Maximum number of ready tasks buffered by the scheduler. +//! * `commit_ready_task_capacity`: Maximum number of buffered commit-ready tasks. +//! * `cleanup_ready_task_capacity`: Maximum number of buffered cleanup-ready tasks. +//! * `storage_poll_timeout_ms`: Maximum time, in milliseconds, that inbound-queue polling may block +//! on the storage-service side. +//! * `tick_interval_ms`: Interval, in milliseconds, between scheduler ticks (tick execution time +//! included). +//! * `finalizing_job_expiration_timeout_sec`: Time, in seconds, that a job may remain in the +//! finalizing state before the scheduler retires it. +//! +//! # Limitations +//! +//! * The scheduler is not notified when a job terminates if: (1) the job is cancelled and has no +//! cleanup task; or (2) the job fails. In these cases, the scheduler may continue to buffer tasks +//! that belong to the terminated job, and those tasks may eventually be dispatched. This does not +//! affect system correctness, because storage will reject stale task assignments for terminated +//! jobs. However, it may waste dispatch capacity and execution-manager cycles. +//! * Related issue: +//! * The scheduler does not have job-level metadata for determining when an active job can be +//! safely retired. 
As a result, an active job may be retired from the active-job queue even +//! though additional tasks for that job are still buffered in the inbound queue. This is usually +//! not an issue for flattened task graphs, where most ready tasks are exposed to the scheduler at +//! once. However, it may occur frequently for task graphs with complex dependencies, especially +//! when a large set of upstream tasks is followed by a small number of downstream tasks. +//! * Related issue: + +mod implementation; + +#[cfg(test)] +mod tests; + +pub use implementation::{RoundRobinConfig, RoundRobinCore}; diff --git a/components/spider-scheduler/src/core_impl/round_robin/tests.rs b/components/spider-scheduler/src/core_impl/round_robin/tests.rs new file mode 100644 index 00000000..581e7bbc --- /dev/null +++ b/components/spider-scheduler/src/core_impl/round_robin/tests.rs @@ -0,0 +1,1058 @@ +//! Unit tests for the round-robin scheduler core. + +use std::{ + collections::{HashMap, HashSet, VecDeque}, + sync::{ + Arc, + Mutex, + atomic::{AtomicU64, Ordering}, + }, + time::Duration, +}; + +use anyhow::bail; +use async_trait::async_trait; +use spider_core::{ + job::JobState, + types::id::{JobId, ResourceGroupId, SessionId, TaskId}, +}; +use tokio_util::sync::CancellationToken; + +use super::{RoundRobinConfig, implementation::RoundRobin}; +use crate::{ + DispatchQueueSource, + InboundEntry, + SchedulerCore, + SchedulerError, + SchedulerStorageClient, + StorageClientError, + TaskAssignment, + dispatch_queue::{DispatchQueueReader, DispatchQueueWriter, create_dispatch_queue}, +}; + +/// The session used by tests that never bump the session. +const DEFAULT_SESSION_ID: SessionId = 0; + +/// The white-box scheduler under test, driven by manual ticks. +type TestScheduler = RoundRobin; + +/// The maximum time to wait for expected assignments before failing a test. 
+const DRAIN_DEADLINE: Duration = Duration::from_secs(5); + +struct MockStorageInner { + session_id: AtomicU64, + ready_batches: Mutex)>>, + commit_ready_batches: Mutex)>>, + cleanup_ready_batches: Mutex)>>, +} + +/// A mock [`SchedulerStorageClient`] backed by scripted poll batches. +/// +/// Each lane serves its scripted batches in FIFO order, one batch per poll; when a lane's script +/// is empty, polls return an empty batch under the mock's current session immediately (the `wait` +/// parameter is ignored to keep tests fast). +#[derive(Clone)] +struct MockStorageClient { + inner: Arc, +} + +impl MockStorageClient { + /// Factory function. + /// + /// # Returns + /// + /// A new mock storage client with no scripted batches, reporting `session_id` on empty polls. + fn new(session_id: SessionId) -> Self { + Self { + inner: Arc::new(MockStorageInner { + session_id: AtomicU64::new(session_id), + ready_batches: Mutex::new(VecDeque::new()), + commit_ready_batches: Mutex::new(VecDeque::new()), + cleanup_ready_batches: Mutex::new(VecDeque::new()), + }), + } + } + + /// Scripts a batch to be served by the next unserved [`SchedulerStorageClient::poll_ready`] + /// call. + fn push_ready_batch(&self, session_id: SessionId, entries: Vec) { + self.inner + .ready_batches + .lock() + .expect("ready-batch lock poisoned") + .push_back((session_id, entries)); + } + + /// Scripts a batch to be served by the next unserved + /// [`SchedulerStorageClient::poll_commit_ready`] call. + fn push_commit_ready_batch(&self, session_id: SessionId, entries: Vec) { + self.inner + .commit_ready_batches + .lock() + .expect("commit-ready-batch lock poisoned") + .push_back((session_id, entries)); + } + + /// Scripts a batch to be served by the next unserved + /// [`SchedulerStorageClient::poll_cleanup_ready`] call. 
+ fn push_cleanup_ready_batch(&self, session_id: SessionId, entries: Vec) { + self.inner + .cleanup_ready_batches + .lock() + .expect("cleanup-ready-batch lock poisoned") + .push_back((session_id, entries)); + } + + /// # Returns + /// + /// The session reported on polls that have no scripted batch. + fn current_session(&self) -> SessionId { + self.inner.session_id.load(Ordering::Relaxed) + } + + /// Sets the session reported on polls that have no scripted batch. + fn set_session(&self, session_id: SessionId) { + self.inner.session_id.store(session_id, Ordering::Relaxed); + } + + /// Serves one poll from the given lane's script. + /// + /// # Returns + /// + /// The lane's next scripted batch, or an empty batch under the current session if the lane's + /// script is exhausted. + fn serve_batch( + &self, + batches: &Mutex)>>, + max_items: usize, + ) -> (SessionId, Vec) { + let scripted_batch = batches.lock().expect("batch lock poisoned").pop_front(); + let Some((session_id, entries)) = scripted_batch else { + return (self.current_session(), Vec::new()); + }; + assert!( + entries.len() <= max_items, + "scripted batch of {} entries exceeds the scheduler's poll limit of {max_items}", + entries.len(), + ); + (session_id, entries) + } +} + +#[async_trait] +impl SchedulerStorageClient for MockStorageClient { + async fn poll_ready( + &self, + max_items: usize, + _wait: Duration, + ) -> Result<(SessionId, Vec), StorageClientError> { + Ok(self.serve_batch(&self.inner.ready_batches, max_items)) + } + + async fn poll_commit_ready( + &self, + max_items: usize, + _wait: Duration, + ) -> Result<(SessionId, Vec), StorageClientError> { + Ok(self.serve_batch(&self.inner.commit_ready_batches, max_items)) + } + + async fn poll_cleanup_ready( + &self, + max_items: usize, + _wait: Duration, + ) -> Result<(SessionId, Vec), StorageClientError> { + Ok(self.serve_batch(&self.inner.cleanup_ready_batches, max_items)) + } + + async fn job_state(&self, _job_id: JobId) -> Result { + 
Ok(JobState::Running) + } +} + +/// # Returns +/// +/// A config with the given pool and dispatch capacities, and defaults large enough that the other +/// capacities never throttle the tests. +fn make_config( + active_job_queue_capacity: usize, + dispatch_queue_capacity: usize, +) -> RoundRobinConfig { + RoundRobinConfig { + active_job_queue_capacity, + dispatch_queue_capacity, + ready_task_capacity: 16_384, + commit_ready_task_capacity: 16, + cleanup_ready_task_capacity: 16, + storage_poll_timeout_ms: 10, + tick_interval_ms: 1, + finalizing_job_expiration_timeout_sec: 6 * 60 * 60, + } +} + +/// # Returns +/// +/// `n` jobs with freshly generated job and resource-group IDs. +fn make_jobs(n: usize) -> Vec<(JobId, ResourceGroupId)> { + (0..n) + .map(|_| (JobId::random(), ResourceGroupId::random())) + .collect() +} + +/// Builds one inbound ready batch containing `tasks_per_job` tasks per job, interleaved across +/// jobs in per-job FIFO order (task 0 of every job, then task 1 of every job, and so on). +/// +/// When `dup_every` is non-zero, every `dup_every`-th entry is duplicated adjacently within the +/// batch, emulating the duplicate task assignments a real storage may return. +/// +/// # Returns +/// +/// The inbound entries of the batch. +fn make_ready_batch( + jobs: &[(JobId, ResourceGroupId)], + tasks_per_job: usize, + dup_every: usize, +) -> Vec { + let mut entries = Vec::new(); + let mut num_emitted = 0_usize; + for task_index in 0..tasks_per_job { + for &(job_id, resource_group_id) in jobs { + let entry = InboundEntry { + resource_group_id, + job_id, + task_id: TaskId::Index(task_index), + }; + entries.push(entry); + num_emitted += 1; + if dup_every > 0 && num_emitted.is_multiple_of(dup_every) { + entries.push(entry); + } + } + } + entries +} + +/// Builds one inbound batch that marks each given job as finalizing, with `task_id` (either +/// [`TaskId::Commit`] or [`TaskId::Cleanup`]) set on every entry. 
+/// +/// # Returns +/// +/// The inbound entries of the batch. +fn make_finalizing_batch(jobs: &[(JobId, ResourceGroupId)], task_id: TaskId) -> Vec { + jobs.iter() + .map(|&(job_id, resource_group_id)| InboundEntry { + resource_group_id, + job_id, + task_id, + }) + .collect() +} + +/// Validates the given config and spawns the scheduler's public run loop as a background task. +/// +/// # Returns +/// +/// A tuple containing: +/// +/// * The join handle yielding the scheduler's exit result. +/// * The cancellation token that stops the scheduler. +/// +/// # Panics +/// +/// Panics if the given config fails validation. +fn spawn_scheduler( + config: RoundRobinConfig, + storage_client: MockStorageClient, + sink: DispatchQueueWriter, +) -> ( + tokio::task::JoinHandle>, + CancellationToken, +) { + let core = config.make_core().expect("config validation failed"); + let cancellation_token = CancellationToken::new(); + let scheduler_token = cancellation_token.clone(); + let handle = tokio::spawn(async move { core.run(storage_client, sink, scheduler_token).await }); + (handle, cancellation_token) +} + +/// Drains exactly `n` task assignments from the dispatch queue, playing the worker pool's role. +/// +/// # Returns +/// +/// The drained assignments in FIFO order on success. +/// +/// # Errors +/// +/// Returns an error if: +/// +/// * Fewer than `n` assignments arrive within [`DRAIN_DEADLINE`]. +/// * Forwards [`DispatchQueueSource::dequeue`]'s return values on failure. +async fn drain_n(reader: &DispatchQueueReader, n: usize) -> anyhow::Result> { + const DEQUEUE_WAIT: Duration = Duration::from_millis(100); + let deadline = tokio::time::Instant::now() + DRAIN_DEADLINE; + let mut assignments = Vec::with_capacity(n); + while assignments.len() < n { + if tokio::time::Instant::now() > deadline { + bail!( + "timed out draining assignments: got {}, expected {n}", + assignments.len(), + ); + } + if let Some((_session_id, assignment)) = reader.dequeue(DEQUEUE_WAIT).await? 
{ + assignments.push(assignment); + } + } + Ok(assignments) +} + +/// Asserts that no further assignment arrives within a short observation window, proving that +/// duplicated or dropped tasks never leak into the dispatch queue. +/// +/// # Errors +/// +/// Returns an error if: +/// +/// * Forwards [`DispatchQueueSource::dequeue`]'s return values on failure. +/// +/// # Panics +/// +/// Panics if an assignment arrives within the observation window. +async fn assert_no_more_assignments(reader: &DispatchQueueReader) -> anyhow::Result<()> { + const OBSERVATION_WINDOW: Duration = Duration::from_secs(1); + let unexpected_assignment = reader.dequeue(OBSERVATION_WINDOW).await?; + assert_eq!(unexpected_assignment, None); + Ok(()) +} + +/// # Returns +/// +/// A vector of tuples following the order of the input assignments, each tuple containing: +/// +/// * The job ID. +/// * The resource group ID. +/// * The task ID. +fn make_assignment_tuple(assignments: &[TaskAssignment]) -> Vec<(JobId, ResourceGroupId, TaskId)> { + assignments + .iter() + .map(|assignment| { + ( + assignment.job_id, + assignment.resource_group_id, + assignment.task_id, + ) + }) + .collect() +} + +/// Asserts that `assignments` is exactly `rounds` full round-robin rotations over `jobs` in order: +/// rotation `r` consists of task `r` of every job, following the jobs' order, so every job's task +/// indices are dispatched FIFO. +/// +/// # Panics +/// +/// Panics if `assignments` deviates from the expected strict rotation. 
+fn assert_strict_rotation( + assignments: &[TaskAssignment], + jobs: &[(JobId, ResourceGroupId)], + rounds: usize, +) { + let expected: Vec<(JobId, ResourceGroupId, TaskId)> = (0..rounds) + .flat_map(|round| { + jobs.iter().map(move |&(job_id, resource_group_id)| { + (job_id, resource_group_id, TaskId::Index(round)) + }) + }) + .collect(); + assert_eq!(make_assignment_tuple(assignments), expected); +} + +/// Asserts that `assignments` follows the round-robin scheduling policy over `jobs` without pinning +/// down the exact rotation order: +/// +/// * Every aligned window of `jobs.len()` assignments (one full rotation pass) contains each job +/// exactly once. +/// * Each job's task indices are dispatched in FIFO order, with the matching resource group. +/// * Each job receives exactly `tasks_per_job` assignments. +/// +/// # Panics +/// +/// Panics if `assignments` violates any of the properties above. +fn assert_round_robin_property( + assignments: &[TaskAssignment], + jobs: &[(JobId, ResourceGroupId)], + tasks_per_job: usize, +) { + assert_eq!(assignments.len(), jobs.len() * tasks_per_job); + + // With equal task counts, no job leaves the rotation mid-phase, so every rotation pass must + // schedule every job exactly once. 
+ for rotation_pass in assignments.chunks(jobs.len()) { + let scheduled_jobs: HashSet = rotation_pass + .iter() + .map(|assignment| assignment.job_id) + .collect(); + assert_eq!( + scheduled_jobs.len(), + jobs.len(), + "a rotation pass repeats or misses a job: {rotation_pass:?}", + ); + } + + let resource_groups: HashMap = jobs.iter().copied().collect(); + let mut next_task_indices: HashMap = HashMap::new(); + for assignment in assignments { + let resource_group_id = *resource_groups + .get(&assignment.job_id) + .expect("assignment belongs to a job outside the given job set"); + assert_eq!(assignment.resource_group_id, resource_group_id); + + let next_task_index = next_task_indices.entry(assignment.job_id).or_insert(0); + assert_eq!(assignment.task_id, TaskId::Index(*next_task_index)); + *next_task_index += 1; + } + + for &(job_id, _) in jobs { + assert_eq!(next_task_indices.get(&job_id).copied(), Some(tasks_per_job)); + } +} + +#[test] +fn zero_capacity_configs_are_rejected() { + let try_make_core = + |config: RoundRobinConfig| config.make_core::(); + + assert!(try_make_core(make_config(2, 2)).is_ok()); + + let zeroed_configs = [ + RoundRobinConfig { + active_job_queue_capacity: 0, + ..make_config(2, 2) + }, + RoundRobinConfig { + dispatch_queue_capacity: 0, + ..make_config(2, 2) + }, + RoundRobinConfig { + ready_task_capacity: 0, + ..make_config(2, 2) + }, + RoundRobinConfig { + commit_ready_task_capacity: 0, + ..make_config(2, 2) + }, + RoundRobinConfig { + cleanup_ready_task_capacity: 0, + ..make_config(2, 2) + }, + ]; + for config in zeroed_configs { + let result = try_make_core(config); + assert!( + matches!(result, Err(SchedulerError::InvalidConfig(_))), + "expected InvalidConfig, got {:?}", + result.err(), + ); + } +} + +/// # Returns +/// +/// A white-box scheduler wired to the given storage client and sink, to be driven by manual +/// [`RoundRobin::tick`] calls. 
+fn make_scheduler( + config: RoundRobinConfig, + storage_client: MockStorageClient, + sink: DispatchQueueWriter, +) -> TestScheduler { + RoundRobin::new( + DEFAULT_SESSION_ID, + storage_client, + sink, + CancellationToken::new(), + config, + ) +} + +/// Ticks the scheduler until `predicate` holds on its state. +/// +/// # Errors +/// +/// Returns an error if: +/// +/// * The predicate does not hold within [`DRAIN_DEADLINE`]. +/// * Forwards [`RoundRobin::tick`]'s return values on failure. +async fn tick_until( + scheduler: &mut TestScheduler, + predicate: impl Fn(&TestScheduler) -> bool, +) -> anyhow::Result<()> { + let deadline = tokio::time::Instant::now() + DRAIN_DEADLINE; + while !predicate(scheduler) { + if tokio::time::Instant::now() > deadline { + bail!("timed out waiting for the tick predicate to hold"); + } + scheduler.tick().await?; + tokio::task::yield_now().await; + } + Ok(()) +} + +/// Drains exactly `n` task assignments while manually ticking the scheduler to refill the dispatch +/// queue (the white-box counterpart of [`drain_n`]). +/// +/// # Returns +/// +/// The drained assignments in FIFO order on success, each paired with the session under which it +/// was dequeued. +/// +/// # Errors +/// +/// Returns an error if: +/// +/// * Fewer than `n` assignments arrive within [`DRAIN_DEADLINE`]. +/// * Forwards [`RoundRobin::tick`]'s return values on failure. +/// * Forwards [`DispatchQueueSource::dequeue`]'s return values on failure. +async fn tick_and_drain_n( + scheduler: &mut TestScheduler, + reader: &DispatchQueueReader, + n: usize, +) -> anyhow::Result> { + let deadline = tokio::time::Instant::now() + DRAIN_DEADLINE; + let mut assignments = Vec::with_capacity(n); + while assignments.len() < n { + if tokio::time::Instant::now() > deadline { + bail!( + "timed out draining assignments: got {}, expected {n}", + assignments.len(), + ); + } + scheduler.tick().await?; + while let Some((session_id, assignment)) = reader.dequeue(Duration::ZERO).await? 
{ + assignments.push((session_id, assignment)); + } + tokio::task::yield_now().await; + } + Ok(assignments) +} + +/// Ticks the scheduler a few extra rounds and asserts that no further assignment is dispatched. +/// +/// # Errors +/// +/// Returns an error if: +/// +/// * Forwards [`RoundRobin::tick`]'s return values on failure. +/// * Forwards [`DispatchQueueSource::dequeue`]'s return values on failure. +/// +/// # Panics +/// +/// Panics if a further assignment is dispatched. +async fn assert_no_further_assignments( + scheduler: &mut TestScheduler, + reader: &DispatchQueueReader, +) -> anyhow::Result<()> { + const EXTRA_TICKS: usize = 8; + for _ in 0..EXTRA_TICKS { + scheduler.tick().await?; + tokio::task::yield_now().await; + } + let unexpected_assignment = reader.dequeue(Duration::from_millis(50)).await?; + assert_eq!(unexpected_assignment, None); + Ok(()) +} + +/// Drives the shared scenario where a finalizing batch drops one active and one pending job. +/// +/// The finalizing lane is selected by `finalizing_task_id`: commit-ready for [`TaskId::Commit`], +/// or cleanup-ready for [`TaskId::Cleanup`]. The scenario: +/// +/// 1. Buffers four jobs (two active, two pending) and freezes dispatch via a full dispatch queue. +/// 2. Delivers a finalizing batch for one active job and one pending job mid-stream. +/// 3. Asserts both jobs leave the placement state with their buffered regular tasks discarded. +/// 4. Unfreezes and asserts the drained sequence: each finalized job dispatches its finalizing task +/// exactly once and no further regular task, while the surviving jobs complete in FIFO order. +/// 5. Re-delivers regular ready tasks for the finalized jobs alongside a fresh canary job. Asserts +/// the re-delivered tasks are ignored (the finalizing gate persists after the finalizing tasks +/// are dispatched) while the canary job schedules normally. 
+/// +/// # Errors +/// +/// Returns an error if: +/// +/// * `finalizing_task_id` is a regular [`TaskId::Index`] task. +/// * Forwards [`tick_until`]'s return values on failure. +/// * Forwards [`tick_and_drain_n`]'s return values on failure. +/// * Forwards [`assert_no_further_assignments`]'s return values on failure. +/// +/// # Panics +/// +/// Panics if any scheduling-behavior assertion of the scenario fails. +#[allow(clippy::too_many_lines, clippy::similar_names)] +async fn assert_finalizing_ready_drops_jobs(finalizing_task_id: TaskId) -> anyhow::Result<()> { + // NOTE: We disable two linting rules for the following reasons: + // * `clippy::too_many_lines`: This test case is long, but we want to avoid breaking it into + // smaller functions since that would also make the overall flow hard to navigate. + // * `clippy::similar_names`: The linter complains about `job_a_regular`, `job_b_regular`, etc., + // but these names are fine for test cases. + const ACTIVE_JOB_QUEUE_CAPACITY: usize = 2; + const DISPATCH_QUEUE_CAPACITY: usize = 2; + const TASKS_PER_JOB: usize = 3; + const NUM_PRE_FREEZE_ASSIGNMENTS: usize = DISPATCH_QUEUE_CAPACITY; + const NUM_FINALIZED_JOBS: usize = 2; + + if matches!(finalizing_task_id, TaskId::Index(_)) { + bail!("`finalizing_task_id` must be `TaskId::Commit` or `TaskId::Cleanup`"); + } + let is_commit = finalizing_task_id == TaskId::Commit; + + // Batch order makes `job_a` and `job_b` active, `job_p` and `job_q` pending. 
+ let jobs = make_jobs(4); + let (job_a, job_b, job_p, job_q) = (jobs[0], jobs[1], jobs[2], jobs[3]); + + let storage_client = MockStorageClient::new(DEFAULT_SESSION_ID); + storage_client.push_ready_batch( + DEFAULT_SESSION_ID, + make_ready_batch(&jobs, TASKS_PER_JOB, 0), + ); + + let (writer, reader) = create_dispatch_queue(DISPATCH_QUEUE_CAPACITY, DEFAULT_SESSION_ID); + let mut scheduler = make_scheduler( + make_config(ACTIVE_JOB_QUEUE_CAPACITY, DISPATCH_QUEUE_CAPACITY), + storage_client.clone(), + writer, + ); + + // Step 1: ingest the ready batch. The ingesting tick also dispatches exactly two assignments + // (`job_a.t0`, `job_b.t0`), filling the dispatch queue; dispatch is frozen from here on because + // the test does not drain yet. + tick_until(&mut scheduler, |scheduler| { + !scheduler.buffered_tasks.is_empty() + }) + .await?; + assert_eq!( + scheduler + .active_jobs + .keys() + .copied() + .collect::>(), + HashSet::from([job_a.0, job_b.0]), + ); + assert_eq!( + scheduler + .pending_jobs + .keys() + .copied() + .collect::>(), + HashSet::from([job_p.0, job_q.0]), + ); + + // Step 2: with dispatch frozen, deliver the finalizing batch for one active job, `job_b`, and + // one pending job, `job_q`, before any of their remaining tasks can dispatch. + let finalizing_batch = make_finalizing_batch(&[job_b, job_q], finalizing_task_id); + if is_commit { + storage_client.push_commit_ready_batch(DEFAULT_SESSION_ID, finalizing_batch); + } else { + storage_client.push_cleanup_ready_batch(DEFAULT_SESSION_ID, finalizing_batch); + } + tick_until(&mut scheduler, |scheduler| { + scheduler.finalizing_jobs.contains(&job_b.0) && scheduler.finalizing_jobs.contains(&job_q.0) + }) + .await?; + + // Step 3: both jobs left the placement state and their buffered regular tasks are discarded; + // only their finalizing assignments remain queued, in arrival order. 
+    assert!(!scheduler.active_jobs.contains_key(&job_b.0));
+    assert!(!scheduler.pending_jobs.contains_key(&job_q.0));
+    assert!(
+        scheduler.buffered_tasks.iter().all(|&(job_id, task_id)| {
+            (job_id != job_b.0 && job_id != job_q.0) || !matches!(task_id, TaskId::Index(_))
+        }),
+        "a finalized job still has buffered regular tasks",
+    );
+    let finalizing_queue = if is_commit {
+        &scheduler.commit_ready_jobs
+    } else {
+        &scheduler.cleanup_ready_jobs
+    };
+    assert_eq!(
+        finalizing_queue.iter().copied().collect::<Vec<_>>(),
+        vec![job_b, job_q],
+    );
+
+    // Step 4: unfreeze. Every remaining assignment is accounted for below: the pre-freeze
+    // assignments already queued, one finalizing task per finalized job, `job_a`'s remaining
+    // tasks (its first task dispatched pre-freeze), and the full task set of `job_p`, which
+    // backfills `job_b`'s freed slot.
+
+    // total number of assignments = pre-freeze assignments + finalizing assignments +
+    // remaining `job_a` assignments + full `job_p` assignments
+    let num_assignments =
+        NUM_PRE_FREEZE_ASSIGNMENTS + NUM_FINALIZED_JOBS + (TASKS_PER_JOB - 1) + TASKS_PER_JOB;
+    let assignments: Vec<TaskAssignment> =
+        tick_and_drain_n(&mut scheduler, &reader, num_assignments)
+            .await?
+            .into_iter()
+            .map(|(_session_id, assignment)| assignment)
+            .collect();
+    assert_no_further_assignments(&mut scheduler, &reader).await?;
+    assert_eq!(scheduler.buffered_tasks.len(), 0);
+
+    let triples = make_assignment_tuple(&assignments);
+
+    // The pre-freeze head is exactly `job_a.t0`, `job_b.t0`.
+    assert_eq!(
+        &triples[..NUM_PRE_FREEZE_ASSIGNMENTS],
+        &[
+            (job_a.0, job_a.1, TaskId::Index(0)),
+            (job_b.0, job_b.1, TaskId::Index(0)),
+        ],
+    );
+
+    // Each finalized job's finalizing task dispatches exactly once, in arrival (FIFO) order.
+    let finalizing_assignments: Vec<_> = triples
+        .iter()
+        .filter(|&&(_, _, task_id)| task_id == finalizing_task_id)
+        .copied()
+        .collect();
+    assert_eq!(
+        finalizing_assignments,
+        vec![
+            (job_b.0, job_b.1, finalizing_task_id),
+            (job_q.0, job_q.1, finalizing_task_id),
+        ],
+    );
+
+    let job_a_tasks: Vec<TaskId> = triples
+        .iter()
+        .filter(|&&(job_id, ..)| job_id == job_a.0)
+        .map(|&(_, _, task_id)| task_id)
+        .collect();
+    assert_eq!(
+        job_a_tasks,
+        vec![TaskId::Index(0), TaskId::Index(1), TaskId::Index(2)],
+    );
+
+    let job_b_regular: Vec<_> = triples
+        .iter()
+        .filter(|&&(job_id, _, task_id)| job_id == job_b.0 && matches!(task_id, TaskId::Index(_)))
+        .copied()
+        .collect();
+    assert_eq!(job_b_regular, vec![(job_b.0, job_b.1, TaskId::Index(0))]);
+
+    let job_p_tasks: Vec<TaskId> = triples
+        .iter()
+        .filter(|&&(job_id, ..)| job_id == job_p.0)
+        .map(|&(_, _, task_id)| task_id)
+        .collect();
+    assert_eq!(
+        job_p_tasks,
+        vec![TaskId::Index(0), TaskId::Index(1), TaskId::Index(2)],
+    );
+
+    let job_q_regular: Vec<_> = triples
+        .iter()
+        .filter(|&&(job_id, _, task_id)| job_id == job_q.0 && matches!(task_id, TaskId::Index(_)))
+        .copied()
+        .collect();
+    assert_eq!(job_q_regular, []);
+
+    assert!(scheduler.buffered_tasks.is_empty());
+    assert!(scheduler.pending_jobs.is_empty());
+    assert!(scheduler.pending_job_queue.is_empty());
+    assert!(scheduler.commit_ready_jobs.is_empty());
+    assert!(scheduler.cleanup_ready_jobs.is_empty());
+    assert_eq!(scheduler.finalizing_jobs.len(), NUM_FINALIZED_JOBS);
+
+    assert!(scheduler.finalizing_jobs.contains(&job_b.0));
+    assert!(scheduler.finalizing_jobs.contains(&job_q.0));
+
+    // Step 5: The finalizing gate remains active after the finalizing tasks have been dispatched,
+    // so re-delivered regular tasks for finalized jobs must be ignored. A fresh canary job is
+    // included in the same batch. Since a batch is ingested atomically within a single tick,
+    // successful dispatch of the canary's tasks proves that the finalized jobs' entries have
+    // already been processed (and ignored), rather than still being in flight.
+    let canary_jobs = make_jobs(1);
+    let mut late_batch = make_ready_batch(&[job_b, job_q], TASKS_PER_JOB, 0);
+    late_batch.extend(make_ready_batch(&canary_jobs, TASKS_PER_JOB, 0));
+    storage_client.push_ready_batch(DEFAULT_SESSION_ID, late_batch);
+
+    let late_assignments: Vec<_> = tick_and_drain_n(&mut scheduler, &reader, TASKS_PER_JOB)
+        .await?
+        .into_iter()
+        .map(|(_session_id, assignment)| assignment)
+        .collect();
+    assert_strict_rotation(&late_assignments, &canary_jobs, TASKS_PER_JOB);
+    assert_no_further_assignments(&mut scheduler, &reader).await?;
+
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn single_capacity_pool_schedules_jobs_serially() -> anyhow::Result<()> {
+    const NUM_JOBS: usize = 3;
+    const TASKS_PER_JOB: usize = 5;
+    const DUP_EVERY: usize = 3;
+    const DISPATCH_QUEUE_CAPACITY: usize = 32;
+
+    let jobs = make_jobs(NUM_JOBS);
+    let storage_client = MockStorageClient::new(DEFAULT_SESSION_ID);
+    storage_client.push_ready_batch(
+        DEFAULT_SESSION_ID,
+        make_ready_batch(&jobs, TASKS_PER_JOB, DUP_EVERY),
+    );
+
+    let (writer, reader) = create_dispatch_queue(DISPATCH_QUEUE_CAPACITY, DEFAULT_SESSION_ID);
+    let config = make_config(1, DISPATCH_QUEUE_CAPACITY);
+    let (scheduler_handle, cancellation_token) = spawn_scheduler(config, storage_client, writer);
+
+    let assignments = drain_n(&reader, NUM_JOBS * TASKS_PER_JOB).await?;
+    assert_no_more_assignments(&reader).await?;
+
+    // With an active job pool of capacity 1, round-robin degenerates to serial job FIFO: the
+    // rotation holds a single job at a time, so each job's tasks dispatch as one consecutive
+    // single-job rotation, in job-arrival order.
+    for (segment, job) in assignments.chunks(TASKS_PER_JOB).zip(&jobs) {
+        assert_strict_rotation(segment, std::slice::from_ref(job), TASKS_PER_JOB);
+    }
+
+    cancellation_token.cancel();
+    scheduler_handle.await.expect("scheduler task panicked")?;
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn active_jobs_dispatch_in_round_robin_order() -> anyhow::Result<()> {
+    const NUM_JOBS: usize = 10;
+    const TASKS_PER_JOB: usize = 5;
+    const DUP_EVERY: usize = 4;
+    const DISPATCH_QUEUE_CAPACITY: usize = 32;
+
+    let jobs = make_jobs(NUM_JOBS);
+    let storage_client = MockStorageClient::new(DEFAULT_SESSION_ID);
+    storage_client.push_ready_batch(
+        DEFAULT_SESSION_ID,
+        make_ready_batch(&jobs, TASKS_PER_JOB, DUP_EVERY),
+    );
+
+    let (writer, reader) = create_dispatch_queue(DISPATCH_QUEUE_CAPACITY, DEFAULT_SESSION_ID);
+    let config = make_config(NUM_JOBS, DISPATCH_QUEUE_CAPACITY);
+    let (scheduler_handle, cancellation_token) = spawn_scheduler(config, storage_client, writer);
+
+    let assignments = drain_n(&reader, NUM_JOBS * TASKS_PER_JOB).await?;
+    assert_no_more_assignments(&reader).await?;
+
+    // All 10 jobs fit into the active job pool, so no job ever pends and dispatch follows the
+    // strict rotation: task 0 of every job in batch order, then task 1 of every job, and so on. The
+    // exact count of 50 (with no trailing assignments) also proves the in-batch duplicates were
+    // deduplicated.
+    assert_strict_rotation(&assignments, &jobs, TASKS_PER_JOB);
+
+    cancellation_token.cancel();
+    scheduler_handle.await.expect("scheduler task panicked")?;
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn pending_jobs_promote_and_schedule_round_robin() -> anyhow::Result<()> {
+    const ACTIVE_JOB_QUEUE_CAPACITY: usize = 10;
+    const NUM_JOBS: usize = 20;
+    const TASKS_PER_JOB: usize = 5;
+    const DUP_EVERY: usize = 5;
+    const DISPATCH_QUEUE_CAPACITY: usize = 32;
+
+    let jobs = make_jobs(NUM_JOBS);
+    let storage_client = MockStorageClient::new(DEFAULT_SESSION_ID);
+    storage_client.push_ready_batch(
+        DEFAULT_SESSION_ID,
+        make_ready_batch(&jobs, TASKS_PER_JOB, DUP_EVERY),
+    );
+
+    let (writer, reader) = create_dispatch_queue(DISPATCH_QUEUE_CAPACITY, DEFAULT_SESSION_ID);
+    let config = make_config(ACTIVE_JOB_QUEUE_CAPACITY, DISPATCH_QUEUE_CAPACITY);
+    let (scheduler_handle, cancellation_token) = spawn_scheduler(config, storage_client, writer);
+
+    let assignments = drain_n(&reader, NUM_JOBS * TASKS_PER_JOB).await?;
+    assert_no_more_assignments(&reader).await?;
+
+    let (active_jobs, pending_jobs) = jobs.split_at(ACTIVE_JOB_QUEUE_CAPACITY);
+    let (phase1, phase2) = assignments.split_at(ACTIVE_JOB_QUEUE_CAPACITY * TASKS_PER_JOB);
+
+    // Phase 1: the first 10 jobs in batch order fill the active job pool and dispatch in strict
+    // rotation; the pending jobs must not appear while the active jobs still have tasks.
+    assert_strict_rotation(phase1, active_jobs, TASKS_PER_JOB);
+
+    // Phase 2: once the active jobs exhaust, the 10 pending jobs are promoted and scheduled
+    // round-robin. The exact slot order after the retire-and-promote wave is an implementation
+    // detail of the rotation bookkeeping, so assert the round-robin property instead of one
+    // hard-coded sequence.
+    assert_round_robin_property(phase2, pending_jobs, TASKS_PER_JOB);
+
+    cancellation_token.cancel();
+    scheduler_handle.await.expect("scheduler task panicked")?;
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn commit_drains_each_cycle_cleanup_dispatches_once() -> anyhow::Result<()> {
+    const NUM_ACTIVE_JOBS: usize = 4;
+    const TASKS_PER_JOB: usize = 3;
+    const NUM_COMMIT_READY_JOBS: usize = NUM_ACTIVE_JOBS * TASKS_PER_JOB - 1;
+    const NUM_CLEANUP_READY_JOBS: usize = TASKS_PER_JOB;
+    const DISPATCH_QUEUE_CAPACITY: usize = 1024;
+
+    let active_jobs = make_jobs(NUM_ACTIVE_JOBS);
+    let commit_ready_jobs = make_jobs(NUM_COMMIT_READY_JOBS);
+    let cleanup_ready_jobs = make_jobs(NUM_CLEANUP_READY_JOBS);
+
+    let storage_client = MockStorageClient::new(DEFAULT_SESSION_ID);
+    storage_client.push_ready_batch(
+        DEFAULT_SESSION_ID,
+        make_ready_batch(&active_jobs, TASKS_PER_JOB, 0),
+    );
+    let mut commit_ready_batch = make_finalizing_batch(&commit_ready_jobs, TaskId::Commit);
+    // Duplicate one commit-ready entry within the batch: it must dispatch exactly once.
+    commit_ready_batch.push(commit_ready_batch[0]);
+    storage_client.push_commit_ready_batch(DEFAULT_SESSION_ID, commit_ready_batch);
+    storage_client.push_cleanup_ready_batch(
+        DEFAULT_SESSION_ID,
+        make_finalizing_batch(&cleanup_ready_jobs, TaskId::Cleanup),
+    );
+
+    let (writer, reader) = create_dispatch_queue(DISPATCH_QUEUE_CAPACITY, DEFAULT_SESSION_ID);
+    let config = make_config(NUM_ACTIVE_JOBS, DISPATCH_QUEUE_CAPACITY);
+    let (scheduler_handle, cancellation_token) = spawn_scheduler(config, storage_client, writer);
+
+    let num_assignments =
+        NUM_ACTIVE_JOBS * TASKS_PER_JOB + NUM_COMMIT_READY_JOBS + NUM_CLEANUP_READY_JOBS;
+    let assignments = drain_n(&reader, num_assignments).await?;
+    assert_no_more_assignments(&reader).await?;
+
+    // The rotation is [commit lane, cleanup lane, active jobs...]. The commit lane drains up to
+    // `active_job_queue_capacity` (== NUM_ACTIVE_JOBS) jobs per visit, so each cycle dispatches a
+    // full chunk of NUM_ACTIVE_JOBS commit tasks (the final cycle dispatches the short remainder),
+    // one cleanup task, and one task of every active job. All lanes are drained FIFO.
+    let expected: Vec<(JobId, ResourceGroupId, TaskId)> = commit_ready_jobs
+        .chunks(NUM_ACTIVE_JOBS)
+        .enumerate()
+        .flat_map(|(round, commit_chunk)| {
+            let (cleanup_job_id, cleanup_resource_group_id) = cleanup_ready_jobs[round];
+            commit_chunk
+                .iter()
+                .map(|&(job_id, resource_group_id)| (job_id, resource_group_id, TaskId::Commit))
+                .chain(std::iter::once((
+                    cleanup_job_id,
+                    cleanup_resource_group_id,
+                    TaskId::Cleanup,
+                )))
+                .chain(active_jobs.iter().map(move |&(job_id, resource_group_id)| {
+                    (job_id, resource_group_id, TaskId::Index(round))
+                }))
+                .collect::<Vec<_>>()
+        })
+        .collect();
+    assert_eq!(make_assignment_tuple(&assignments), expected);
+
+    cancellation_token.cancel();
+    scheduler_handle.await.expect("scheduler task panicked")?;
+    Ok(())
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn cleanup_ready_drops_active_and_pending_jobs() -> anyhow::Result<()> {
+    assert_finalizing_ready_drops_jobs(TaskId::Cleanup).await
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn commit_ready_drops_active_and_pending_jobs() -> anyhow::Result<()> {
+    assert_finalizing_ready_drops_jobs(TaskId::Commit).await
+}
+
+#[tokio::test(flavor = "multi_thread")]
+async fn session_bump_clears_buffered_tasks() -> anyhow::Result<()> {
+    const ACTIVE_JOB_QUEUE_CAPACITY: usize = 4;
+    const DISPATCH_QUEUE_CAPACITY: usize = 4;
+    const TASKS_PER_JOB: usize = 4;
+    const NEW_SESSION_ID: SessionId = DEFAULT_SESSION_ID + 1;
+    const NEW_TASKS_PER_JOB: usize = 2;
+
+    let old_jobs = make_jobs(4);
+    let new_jobs = make_jobs(2);
+
+    let storage_client = MockStorageClient::new(DEFAULT_SESSION_ID);
+    storage_client.push_ready_batch(
+        DEFAULT_SESSION_ID,
+        make_ready_batch(&old_jobs, TASKS_PER_JOB, 0),
+    );
+
+    let (writer, reader) = create_dispatch_queue(DISPATCH_QUEUE_CAPACITY, DEFAULT_SESSION_ID);
+    let mut scheduler = make_scheduler(
+        make_config(ACTIVE_JOB_QUEUE_CAPACITY, DISPATCH_QUEUE_CAPACITY),
+        storage_client.clone(),
+        writer,
+    );
+
+    // Step 1: ingest the old-session batch. The ingesting tick dispatches enough assignments to
+    // fill the dispatch queue (which the test never drains); the rest will stay in the buffer.
+    tick_until(&mut scheduler, |scheduler| {
+        !scheduler.buffered_tasks.is_empty()
+    })
+    .await?;
+    assert_eq!(scheduler.active_jobs.len(), old_jobs.len());
+    assert_eq!(
+        scheduler.buffered_tasks.len(),
+        old_jobs.len() * TASKS_PER_JOB - DISPATCH_QUEUE_CAPACITY,
+    );
+
+    // Step 2: bump the session on the storage side and deliver a batch under the new session.
+    storage_client.set_session(NEW_SESSION_ID);
+    storage_client.push_ready_batch(
+        NEW_SESSION_ID,
+        make_ready_batch(&new_jobs, NEW_TASKS_PER_JOB, 0),
+    );
+    tick_until(&mut scheduler, |scheduler| {
+        scheduler.storage_session_id == NEW_SESSION_ID
+            && new_jobs
+                .iter()
+                .all(|(job_id, _)| scheduler.active_jobs.contains_key(job_id))
+    })
+    .await?;
+
+    assert_eq!(
+        scheduler
+            .active_jobs
+            .keys()
+            .copied()
+            .collect::<std::collections::HashSet<_>>(),
+        new_jobs
+            .iter()
+            .map(|&(job_id, _)| job_id)
+            .collect::<std::collections::HashSet<_>>(),
+    );
+    assert_eq!(scheduler.pending_jobs.len(), 0);
+    assert!(
+        scheduler.buffered_tasks.iter().all(|(job_id, _)| {
+            new_jobs
+                .iter()
+                .any(|&(new_job_id, _)| *job_id == new_job_id)
+        }),
+        "an old-session task survived the session bump",
+    );
+
+    // The session bump drained the dispatch queue: the frozen old-session assignments are gone, and
+    // draining yields exactly the new jobs' tasks in strict rotation, each paired with the new
+    // session.
+    let num_new_assignments = new_jobs.len() * NEW_TASKS_PER_JOB;
+    let session_stamped = tick_and_drain_n(&mut scheduler, &reader, num_new_assignments).await?;
+    assert_no_further_assignments(&mut scheduler, &reader).await?;
+
+    for &(session_id, _) in &session_stamped {
+        assert_eq!(session_id, NEW_SESSION_ID);
+    }
+
+    let assignments: Vec<TaskAssignment> = session_stamped
+        .into_iter()
+        .map(|(_session_id, assignment)| assignment)
+        .collect();
+    assert_strict_rotation(&assignments, &new_jobs, NEW_TASKS_PER_JOB);
+
+    Ok(())
+}
diff --git a/components/spider-scheduler/src/error.rs b/components/spider-scheduler/src/error.rs
index 18519da2..df7c5278 100644
--- a/components/spider-scheduler/src/error.rs
+++ b/components/spider-scheduler/src/error.rs
@@ -47,4 +47,16 @@ pub enum SchedulerError {
     /// The session ID is invalid.
     #[error("invalid session ID: {0:?}")]
     InvalidSessionId(SessionId),
+
+    #[error("internal error: {0}")]
+    Internal(String),
+
+    #[error("invalid config: {0}")]
+    InvalidConfig(String),
+
+    #[error("async result not ready")]
+    ResultNotReady,
+
+    #[error(transparent)]
+    SystemTime(#[from] std::time::SystemTimeError),
 }
diff --git a/components/spider-scheduler/src/lib.rs b/components/spider-scheduler/src/lib.rs
index 9a16cd97..5aeb5a2a 100644
--- a/components/spider-scheduler/src/lib.rs
+++ b/components/spider-scheduler/src/lib.rs
@@ -32,6 +32,7 @@
 //! ```
 
 pub mod core;
+pub mod core_impl;
 pub mod dispatch_queue;
 pub mod error;
 pub mod storage_client;