Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -201,9 +201,31 @@ let progress: Progress = task.recv().await.unwrap();
task.send(&Continue { should_continue: true });

// Wait for task completion
let result = task.result().await;
let result = task.result().await?;
```

To cancel from another future, clone the task controller before moving the task:

```rust,ignore
let control = task.control();
wasm_bindgen_futures::spawn_local(async move {
while let Some(progress) = task.recv::<Progress>().await {
// Handle progress.
}
});

control.terminate();
```

Termination wakes blocked `recv()` and `recv_bytes()` calls, which return `None`.
Blocked `result()` calls return `TaskError::WorkerTerminated`. Repeated calls to
`terminate()` are harmless.

Pool channel tasks exclusively lease one worker until `result()` completes. Calling
`terminate()`, or dropping an unfinished task, terminates that worker and replaces it
in the same pool slot. The slot is unavailable to the scheduler until its replacement
has initialized.

### Bundler support (Vite)
The recommended approach for Vite is to place the wasm-pack output in Vite's `publicDir`.
This keeps the glue code and WASM binary as static assets, which is required because each
Expand Down Expand Up @@ -271,6 +293,9 @@ options.precompile_wasm = Some(true);
init_worker_pool(options).await.unwrap();
```

The pool retains this compiled module and also uses it when replacing terminated
workers, so replacement does not fetch the WASM binary again.

### Idle timeout

Workers can be automatically terminated after a period of inactivity and transparently recreated when new tasks arrive. This is useful for freeing resources in applications where worker usage is intermittent:
Expand Down
169 changes: 153 additions & 16 deletions src/channel_task.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
use std::marker::PhantomData;
use std::{
cell::{Cell, RefCell},
marker::PhantomData,
rc::Rc,
};

use futures::{future::select, pin_mut};
use serde::{de::DeserializeOwned, Serialize};
use tokio::sync::oneshot;
use tokio::sync::{oneshot, watch};

use crate::{channel::Channel, convert::from_bytes};
use crate::{channel::Channel, convert::from_bytes, error::TaskError};

type LifecycleCallback = Box<dyn FnOnce()>;

/// A handle to a running channel task on a WebWorker.
///
Expand All @@ -24,38 +31,138 @@ use crate::{channel::Channel, convert::from_bytes};
/// let progress: Progress = task.recv().await.expect("progress");
/// task.send(&Continue { should_continue: true });
///
/// let result: ProcessResult = task.result().await;
/// let result: ProcessResult = task.result().await.expect("worker terminated");
/// ```
pub struct ChannelTask<R> {
channel: Channel,
result_rx: oneshot::Receiver<Vec<u8>>,
result_rx: Option<oneshot::Receiver<Vec<u8>>>,
control: ChannelTaskControl,
on_complete: Option<LifecycleCallback>,
_phantom: PhantomData<R>,
}

/// A cloneable handle for terminating a running [`ChannelTask`].
#[derive(Clone)]
pub struct ChannelTaskControl {
inner: Rc<ChannelTaskControlInner>,
}

struct ChannelTaskControlInner {
terminated: Cell<bool>,
on_terminate: RefCell<Option<LifecycleCallback>>,
close_tx: watch::Sender<bool>,
}

impl ChannelTaskControl {
fn new(on_terminate: Option<LifecycleCallback>) -> Self {
let (close_tx, _) = watch::channel(false);
Self {
inner: Rc::new(ChannelTaskControlInner {
terminated: Cell::new(false),
on_terminate: RefCell::new(on_terminate),
close_tx,
}),
}
}

/// Terminate the worker running the associated channel task.
///
/// Repeated calls are harmless.
pub fn terminate(&self) {
if self.inner.terminated.replace(true) {
return;
}

let _ = self.inner.close_tx.send(true);
if let Some(callback) = self.inner.on_terminate.borrow_mut().take() {
callback();
}
}

fn subscribe(&self) -> watch::Receiver<bool> {
self.inner.close_tx.subscribe()
}

fn is_terminated(&self) -> bool {
self.inner.terminated.get()
}

fn is_armed(&self) -> bool {
self.inner.on_terminate.borrow().is_some()
}

fn disarm(&self) {
self.inner.on_terminate.borrow_mut().take();
}

fn set_on_terminate(&self, callback: LifecycleCallback) {
*self.inner.on_terminate.borrow_mut() = Some(callback);
}
}

impl<R: DeserializeOwned> ChannelTask<R> {
/// Create a new `ChannelTask` from a channel and a result receiver.
#[doc(hidden)]
pub fn new(channel: Channel, result_rx: oneshot::Receiver<Vec<u8>>) -> Self {
Self::with_lifecycle(channel, result_rx, None, None)
}

#[doc(hidden)]
pub(crate) fn with_lifecycle(
channel: Channel,
result_rx: oneshot::Receiver<Vec<u8>>,
on_complete: Option<LifecycleCallback>,
on_terminate: Option<LifecycleCallback>,
) -> Self {
Self {
channel,
result_rx,
result_rx: Some(result_rx),
control: ChannelTaskControl::new(on_terminate),
on_complete,
_phantom: PhantomData,
}
}

pub(crate) fn with_callbacks(
mut self,
on_complete: LifecycleCallback,
on_terminate: LifecycleCallback,
) -> Self {
self.on_complete = Some(on_complete);
self.control.set_on_terminate(on_terminate);
self
}

/// Return a cloneable controller that can terminate this task externally.
pub fn control(&self) -> ChannelTaskControl {
self.control.clone()
}

/// Receive the next deserialized message from the worker.
///
/// Returns `None` if the channel's sender side has been dropped
/// (i.e., the worker has finished and closed the channel).
/// Returns `None` if the channel closes or the task is terminated.
pub async fn recv<T: DeserializeOwned>(&self) -> Option<T> {
self.channel.recv().await
let bytes = self.recv_bytes().await?;
Some(from_bytes(&bytes))
}

/// Receive raw bytes from the worker.
///
/// Returns `None` if the channel's sender side has been dropped.
/// Returns `None` if the channel closes or the task is terminated.
pub async fn recv_bytes(&self) -> Option<Box<[u8]>> {
self.channel.recv_bytes().await
if self.control.is_terminated() {
return None;
}

let mut close_rx = self.control.subscribe();
let message = self.channel.recv_bytes();
let closed = close_rx.changed();
pin_mut!(message, closed);

match select(message, closed).await {
futures::future::Either::Left((message, _)) if !self.control.is_terminated() => message,
_ => None,
}
}

/// Send a serialized message to the worker.
Expand All @@ -69,11 +176,41 @@ impl<R: DeserializeOwned> ChannelTask<R> {
}

/// Await the task's final result, consuming the `ChannelTask`.
pub async fn result(self) -> R {
let bytes = self
pub async fn result(mut self) -> Result<R, TaskError> {
let result_rx = self
.result_rx
.await
.expect("WebWorker result sender dropped");
from_bytes(&bytes)
.take()
.ok_or(TaskError::ResultAlreadyConsumed)?;
let result = result_rx.await.map_err(|_| TaskError::WorkerTerminated);

match result {
Ok(bytes) if !self.control.is_terminated() => {
self.control.disarm();
if let Some(on_complete) = self.on_complete.take() {
on_complete();
}
Ok(from_bytes(&bytes))
}
Ok(_) | Err(_) => {
self.control.terminate();
Err(TaskError::WorkerTerminated)
}
}
}

/// Terminate the worker running this task.
///
/// Pool tasks exclusively lease their worker. The pool replaces the terminated
/// worker in the same slot before making that slot schedulable again.
pub fn terminate(&self) {
self.control.terminate();
}
}

impl<R> Drop for ChannelTask<R> {
fn drop(&mut self) {
if self.control.is_armed() {
self.control.terminate();
}
}
}
12 changes: 12 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,18 @@ use thiserror::Error;
#[error("WebWorker capacity reached")]
pub struct Full;

/// This error is returned when a channel task cannot produce a result.
#[derive(Debug, Error, PartialEq, Eq)]
#[non_exhaustive]
pub enum TaskError {
/// The worker running the task was terminated before returning a result.
#[error("WebWorker was terminated")]
WorkerTerminated,
/// The channel task result was already consumed.
#[error("ChannelTask result already consumed")]
ResultAlreadyConsumed,
}

/// This error is returned during the creation of a new web worker.
/// It covers generic errors in the actual creation and import errors
/// during the initialization.
Expand Down
3 changes: 2 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,8 @@

#![allow(clippy::borrowed_box)]
pub use channel::Channel;
pub use channel_task::ChannelTask;
pub use channel_task::{ChannelTask, ChannelTaskControl};
pub use error::TaskError;
pub use global::{
has_worker_pool, init_optimized_worker_pool, init_worker_pool, worker_pool, AlreadyInitialized,
};
Expand Down
Loading
Loading