feat: Add Worker transport for isolated task execution #92

@avrabe

Summary

Add a Worker transport abstraction that enables running MCP handlers in isolated tokio tasks with channel-based communication.

Motivation

Useful for:

  • Parallel tool execution with isolation
  • Custom runtime architectures
  • Background processing with cancellation support
  • Sandboxed execution contexts

Proposed Implementation

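use async_trait::async_trait;
use tokio::sync::mpsc;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;

/// Worker trait - implement to define custom worker behavior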
#[async_trait]
pub trait Worker: Send + 'static {
    type Error: std::error::Error + Send;
    
    async fn run(self, ctx: WorkerContext) -> Result<(), Self::Error>;
}

/// Context provided to workers
pub struct WorkerContext {
    rx: mpsc::Receiver<JsonRpcMessage>,
    tx: mpsc::Sender<JsonRpcMessage>,
    cancellation: CancellationToken,
}

impl WorkerContext {
    pub async fn recv(&mut self) -> Option<JsonRpcMessage>;
    pub async fn send(&self, msg: JsonRpcMessage) -> Result<(), SendError>;
    pub fn cancellation_token(&self) -> CancellationToken;
}

/// Transport wrapper that spawns the worker on a tokio task
pub struct WorkerTransport {
    tx: mpsc::Sender<JsonRpcMessage>,
    rx: mpsc::Receiver<JsonRpcMessage>,
    handle: JoinHandle<Result<(), WorkerError>>,
    cancellation: CancellationToken,
}

impl WorkerTransport {
    pub fn spawn<W: Worker>(worker: W, buffer_size: usize) -> Self;
    pub async fn cancel(&self);
}

impl Transport for WorkerTransport {
    // Channel-based send/receive
}
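
The channel wiring behind `spawn` can be sketched without any external crates. The following is a minimal synchronous stand-in, not the proposed implementation: it assumes `std::thread` in place of a tokio task, `std::sync::mpsc` in place of tokio's channels, an `AtomicBool` in place of `CancellationToken`, and a plain `String` in place of `JsonRpcMessage`. The names `SyncWorker`, `SyncContext`, `SyncTransport`, `Echo`, `EchoError`, and `echo_round_trip` are hypothetical.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, RecvError, SyncSender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

// Assumption: a String stands in for JsonRpcMessage.
type Message = String;

/// Synchronous analogue of the proposed `Worker` trait.
pub trait SyncWorker: Send + 'static {
    type Error: std::error::Error + Send + 'static;
    fn run(self, ctx: SyncContext) -> Result<(), Self::Error>;
}

/// Worker-side channel ends plus a shared cancellation flag.
pub struct SyncContext {
    rx: Receiver<Message>,
    tx: SyncSender<Message>,
    cancelled: Arc<AtomicBool>,
}

impl SyncContext {
    /// Returns None once the transport side has dropped its sender.
    pub fn recv(&self) -> Option<Message> {
        self.rx.recv().ok()
    }
    pub fn send(&self, msg: Message) -> Result<(), mpsc::SendError<Message>> {
        self.tx.send(msg)
    }
    pub fn is_cancelled(&self) -> bool {
        self.cancelled.load(Ordering::SeqCst)
    }
}

/// Caller-side ends of the same two channels.
pub struct SyncTransport<E> {
    tx: SyncSender<Message>,
    rx: Receiver<Message>,
    handle: JoinHandle<Result<(), E>>,
    cancelled: Arc<AtomicBool>,
}

impl<E: Send + 'static> SyncTransport<E> {
    pub fn spawn<W: SyncWorker<Error = E>>(worker: W, buffer_size: usize) -> Self {
        // One bounded channel per direction.
        let (to_worker, from_caller) = mpsc::sync_channel(buffer_size);
        let (to_caller, from_worker) = mpsc::sync_channel(buffer_size);
        let cancelled = Arc::new(AtomicBool::new(false));
        let ctx = SyncContext {
            rx: from_caller,
            tx: to_caller,
            cancelled: Arc::clone(&cancelled),
        };
        let handle = thread::spawn(move || worker.run(ctx));
        SyncTransport { tx: to_worker, rx: from_worker, handle, cancelled }
    }
    pub fn send(&self, msg: Message) -> Result<(), mpsc::SendError<Message>> {
        self.tx.send(msg)
    }
    pub fn recv(&self) -> Result<Message, RecvError> {
        self.rx.recv()
    }
    pub fn cancel(&self) {
        self.cancelled.store(true, Ordering::SeqCst);
    }
    /// Closes both channel ends so the worker's loop can finish, then joins it.
    pub fn shutdown(self) -> Result<(), E> {
        let SyncTransport { tx, rx, handle, .. } = self;
        drop(tx);
        drop(rx);
        handle.join().expect("worker thread panicked")
    }
}

/// Hypothetical worker that echoes each message back with a prefix.
pub struct Echo;

#[derive(Debug)]
pub struct EchoError;

impl std::fmt::Display for EchoError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "reply channel closed")
    }
}
impl std::error::Error for EchoError {}

impl SyncWorker for Echo {
    type Error = EchoError;
    fn run(self, ctx: SyncContext) -> Result<(), EchoError> {
        while let Some(msg) = ctx.recv() {
            if ctx.is_cancelled() {
                break;
            }
            ctx.send(format!("echo: {}", msg)).map_err(|_| EchoError)?;
        }
        Ok(())
    }
}

/// Sends one message through the worker and returns its reply.
pub fn echo_round_trip(input: &str) -> String {
    let transport = SyncTransport::spawn(Echo, 32);
    transport.send(input.to_string()).expect("worker alive");
    let reply = transport.recv().expect("worker replied");
    transport.shutdown().expect("worker exited cleanly");
    reply
}
```

Using two bounded channels, one per direction, keeps backpressure independent for requests and replies. An async version would presumably swap in `tokio::sync::mpsc::channel` and `tokio::spawn` with the same shape.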

Use Case Example

struct ToolWorker {
    tool_registry: Arc<ToolRegistry>,
}

#[async_trait]
impl Worker for ToolWorker {
    type Error = ToolError;
    
    async fn run(self, mut ctx: WorkerContext) -> Result<(), Self::Error> {
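        // The loop ends when the transport side closes the channel and recv() yields None.
        while let Some(msg) = ctx.recv().await {
            let result = self.tool_registry.handle(msg).await;
            // `?` requires `ToolError: From<SendError>`.
            ctx.send(result).await?;
        }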
        Ok(())
    }
}

// Spawn isolated tool executor
let transport = WorkerTransport::spawn(ToolWorker { tool_registry }, 32);
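
The `?` on `ctx.send(result).await` in the example only compiles if `ToolError` can be built from the error that `send` returns. A minimal sketch of that conversion follows. It assumes hypothetical definitions of `SendError` and `ToolError`, since neither is specified in this issue, and the helpers `send` and `forward` exist only for illustration.

```rust
use std::fmt;

/// Hypothetical stand-in for the error returned by `WorkerContext::send`.
#[derive(Debug)]
pub struct SendError;

/// Hypothetical tool error with a variant for a closed reply channel.
#[derive(Debug, PartialEq)]
pub enum ToolError {
    ChannelClosed,
    Failed(String),
}

impl fmt::Display for ToolError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            ToolError::ChannelClosed => write!(f, "reply channel closed"),
            ToolError::Failed(reason) => write!(f, "tool failed: {}", reason),
        }
    }
}

impl std::error::Error for ToolError {}

/// This impl is what lets `?` turn a SendError into a ToolError.
impl From<SendError> for ToolError {
    fn from(_: SendError) -> Self {
        ToolError::ChannelClosed
    }
}

/// Simulates a send that fails when the receiving side is gone.
fn send(open: bool) -> Result<(), SendError> {
    if open { Ok(()) } else { Err(SendError) }
}

/// Mirrors the `ctx.send(result).await?` line, minus the async part.
pub fn forward(open: bool) -> Result<(), ToolError> {
    send(open)?;
    Ok(())
}
```

An alternative would be to call `map_err` at the call site, which avoids coupling `ToolError` to the channel error type.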

Code Reuse

Existing            Reuse
Transport trait     Interface
CancellationToken   From tokio-util
JSON-RPC types      From mcp-protocol

Priority

Medium - enables advanced architectures but not required for basic functionality.
