Refactor service execution model to support RPC proxying and events (introduce async handlers + RpcContext) #151

@doubleailes

Problem

The current service execution model is too restrictive:
• Service handlers are sync-only (fn(&[Value]) -> Result)
• Handlers have no access to AMQP context (correlation_id, headers, reply_to)
• Services cannot call other services (RPC proxy)
• Services cannot emit events
• Core Nameko concepts like call_id_stack propagation are not reusable outside inbound deliveries

This prevents implementing:
• RPC gateway / proxy services
• Event-driven services
• Any async I/O inside service handlers

Goal

Refactor the internal service execution model so that a service handler:
• Is async
• Receives a context object (RpcContext)
• Can:
  • call other services via RPC (ctx.rpc.call(...).await)
  • emit events (ctx.events.dispatch(...).await)
  • access inbound metadata (headers, correlation_id, reply_to)

This should remain compatible with Nameko RPC semantics.

High-level design

Introduce:
1. RpcContext
  • Built per inbound delivery
  • Contains routing info, correlation id, headers
  • Exposes capabilities:
    • RpcCaller (async RPC client core)
    • EventDispatcher
2. Async service handlers
• Replace NamekoFunction = fn(&[Value]) -> ...
• With:

Arc<dyn Fn(RpcContext, Payload) -> BoxFuture<Result<PayloadResult, GirolleError>>>

3. Internal async RPC core
  • Shared reply queue
  • oneshot-based correlation map
  • Used by:
    • the standalone RpcClient
    • in-service proxy calls
4. Event dispatcher (publisher-first)
  • Minimal support for emitting Nameko-compatible events
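The context and handler shapes above can be sketched with std-only stand-ins. Payload, PayloadResult, GirolleError, and the header map are simplified placeholders here (the real crate presumably uses serde_json::Value and lapin's FieldTable), so this is an illustration of the proposed shape, not the actual API:

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

// Simplified stand-ins for the crate's real types -- assumptions for illustration.
type Payload = String;
type PayloadResult = String;
type GirolleError = String;

// Boxed-future alias, as proposed for girolle/src/types.rs.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send>>;

// Per-delivery context carrying routing and correlation metadata.
pub struct RpcContext {
    pub service: String,
    pub method: String,
    pub correlation_id: String,
    pub reply_to: String,
    pub headers: HashMap<String, String>,
}

// The proposed async handler shape replacing NamekoFunction.
pub type RpcHandler = Arc<
    dyn Fn(RpcContext, Payload) -> BoxFuture<Result<PayloadResult, GirolleError>> + Send + Sync,
>;

fn main() {
    // A trivial handler that echoes its payload.
    let echo: RpcHandler = Arc::new(|_ctx, payload| Box::pin(async move { Ok(payload) }));
    let ctx = RpcContext {
        service: "video".into(),
        method: "hello".into(),
        correlation_id: "abc-123".into(),
        reply_to: "rpc.reply-queue".into(),
        headers: HashMap::new(),
    };
    // The handler lives behind Arc<dyn Fn...>, so it can be cloned per delivery.
    let _fut = echo(ctx, "ping".into());
}
```

Storing handlers as `Arc<dyn Fn(...) -> BoxFuture<...>>` lets the consumer delegate clone and invoke them per delivery without monomorphizing over each service method.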

Concrete implementation steps

  1. Introduce async handler + context types
    File: girolle/src/types.rs
    • Add:
      • RpcContext
      • RpcHandler
      • BoxFuture alias
    • RpcContext must include:
      • service name
      • method name
      • correlation_id
      • reply_to
      • headers (FieldTable)
      • RpcCaller
      • EventDispatcher

  2. Refactor RpcTask
    File: girolle/src/rpc_task.rs
    • Replace inner_function: NamekoFunction with handler: RpcHandler
    • Add helper constructor:
      • RpcTask::proxy(name, target_service, target_method)

This enables zero-macro RPC proxy tasks.
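A proxy task could look like the following sketch. The types (RpcContext carrying an `rpc` caller, a sync `call` in place of the real async one) are simplified stand-ins invented for illustration, not the crate's actual API:

```rust
use std::sync::Arc;

type Payload = String;

// Hypothetical caller capability hanging off the context (ctx.rpc in the issue).
// The real call(...) would be async and go over AMQP; this sync stub just
// records where the payload would be forwarded.
#[derive(Clone)]
struct RpcCaller;
impl RpcCaller {
    fn call(&self, service: &str, method: &str, payload: Payload) -> String {
        format!("forwarded {payload} to {service}.{method}")
    }
}

struct RpcContext {
    rpc: RpcCaller,
}

type Handler = Arc<dyn Fn(RpcContext, Payload) -> String + Send + Sync>;

struct RpcTask {
    name: String,
    handler: Handler,
}

impl RpcTask {
    // Zero-macro proxy: a task whose handler simply re-issues the inbound
    // call against a target service/method via the context's caller.
    fn proxy(name: &str, target_service: &str, target_method: &str) -> Self {
        let (svc, meth) = (target_service.to_string(), target_method.to_string());
        RpcTask {
            name: name.to_string(),
            handler: Arc::new(move |ctx, payload| ctx.rpc.call(&svc, &meth, payload)),
        }
    }
}

fn main() {
    let task = RpcTask::proxy("gateway_hello", "video", "hello");
    let out = (task.handler)(RpcContext { rpc: RpcCaller }, "ping".into());
    assert_eq!(out, "forwarded ping to video.hello");
}
```

The target service and method are captured by the closure at construction time, so no per-proxy glue code or macro invocation is needed.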

  3. Add async RPC core
    New file: girolle/src/rpc_core.rs
    • Maintain:
      • reply queue + consumer
      • DashMap<correlation_id, oneshot::Sender<_>>
    • Expose:

    async fn call(
        &self,
        service: &str,
        method: &str,
        payload: Payload,
        parent_ctx: Option<&RpcContext>,
    ) -> Result<PayloadResult, GirolleError>

    • Reuse existing:
      • insert_new_id_to_call_id
      • header propagation logic

Existing RpcClient should wrap this core for backward compatibility.
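The correlation map at the heart of this core can be sketched as follows. The design above calls for DashMap plus tokio oneshot channels; this sketch substitutes `Mutex<HashMap>` and std mpsc channels so it runs without external crates, and the names (ReplyRouter, register, resolve) are invented for illustration:

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::sync::{Arc, Mutex};

#[derive(Clone, Default)]
struct ReplyRouter {
    // correlation_id -> channel back to the waiting caller.
    pending: Arc<Mutex<HashMap<String, mpsc::Sender<String>>>>,
}

impl ReplyRouter {
    // Called by the RPC core before publishing a request: register the
    // correlation_id and get a receiver for the eventual reply.
    fn register(&self, correlation_id: &str) -> mpsc::Receiver<String> {
        let (tx, rx) = mpsc::channel();
        self.pending
            .lock()
            .unwrap()
            .insert(correlation_id.to_string(), tx);
        rx
    }

    // Called by the shared reply-queue consumer when a delivery arrives:
    // route the body to whichever caller registered that correlation_id.
    fn resolve(&self, correlation_id: &str, body: String) -> bool {
        match self.pending.lock().unwrap().remove(correlation_id) {
            Some(tx) => tx.send(body).is_ok(),
            None => false, // unknown or already-resolved correlation id
        }
    }
}

fn main() {
    let router = ReplyRouter::default();
    let rx = router.register("abc-123");
    assert!(router.resolve("abc-123", "{\"result\": 42}".into()));
    assert_eq!(rx.recv().unwrap(), "{\"result\": 42}");
    // The entry is consumed on resolution, so duplicates are rejected.
    assert!(!router.resolve("abc-123", "dup".into()));
}
```

Because both the standalone RpcClient and in-service proxy calls share one reply queue, a single consumer resolves all pending calls through this map instead of each call declaring its own queue.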

  4. Add event dispatcher (publisher)
    New file: girolle/src/events.rs
    • Implement:

    pub struct EventDispatcher { ... }

    async fn dispatch<T: Serialize>(
        &self,
        source_service: &str,
        event_type: &str,
        payload: &T,
    )

    • Exchange naming should follow Nameko conventions ({service}.events).
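In Nameko, events are published to a topic exchange named after the source service, with the event type as the routing key. A minimal sketch of that naming (the actual publish would go through lapin in the real crate, and these helper names are invented here):

```rust
// Nameko convention: the dispatcher publishes to "{service}.events".
fn event_exchange(source_service: &str) -> String {
    format!("{source_service}.events")
}

// The routing key on that exchange is the event type itself.
fn event_routing_key(event_type: &str) -> String {
    event_type.to_string()
}

fn main() {
    assert_eq!(event_exchange("payments"), "payments.events");
    assert_eq!(event_routing_key("order_created"), "order_created");
}
```

Matching these names exactly is what lets Nameko services subscribe to events emitted by a girolle service without any changes on the Python side.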

  5. Thread capabilities through rpc_service
    File: girolle/src/rpc_service.rs
    • In rpc_service(), after AMQP connection creation:
      • Instantiate RpcCaller
      • Instantiate EventDispatcher
      • Store both in SharedData
    • Inside consumer.set_delegate, build an RpcContext from:
      • delivery.routing_key
      • delivery.properties.correlation_id
      • delivery.properties.reply_to
      • delivery.properties.headers
    • Pass the RpcContext into task execution
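One small piece of building the context from a delivery: Nameko RPC routing keys have the form "{service}.{method}", so splitting on the first dot recovers both parts. A sketch (the helper name is invented for illustration):

```rust
// Inbound RPC routing keys in Nameko are "{service}.{method}"; splitting on
// the first '.' recovers both fields for the RpcContext.
fn split_routing_key(routing_key: &str) -> Option<(&str, &str)> {
    routing_key.split_once('.')
}

fn main() {
    assert_eq!(split_routing_key("video.hello"), Some(("video", "hello")));
    // A malformed key with no separator yields None rather than panicking.
    assert_eq!(split_routing_key("badkey"), None);
}
```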

  6. Refactor compute_deliver
    File: girolle/src/nameko_utils.rs
    • Change the signature to accept an RpcContext
    • Replace direct function invocation with:

    let result = (rpc_task.handler)(ctx, payload).await;

    • Keep response publishing logic unchanged

  7. Update procedural macro
    Macro crate
    • Generate async handlers returning RpcHandler
    • Support signatures:
      • async fn foo(arg1, arg2) -> T
      • async fn foo(ctx: RpcContext, arg1, arg2) -> Result<T, E>
    • Serialize responses in the Nameko-compatible envelope:

    {"result": ..., "error": ...}
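The envelope itself is simple: exactly one of "result"/"error" is non-null in a Nameko reply. A dependency-free sketch (the real macro output would go through serde; these helper names and the hand-rolled JSON are for illustration only):

```rust
// Success reply: "result" carries the serialized return value, "error" is null.
fn envelope_ok(result_json: &str) -> String {
    format!("{{\"result\": {result_json}, \"error\": null}}")
}

// Error reply: "result" is null, "error" carries the serialized error object.
fn envelope_err(error_json: &str) -> String {
    format!("{{\"result\": null, \"error\": {error_json}}}")
}

fn main() {
    assert_eq!(envelope_ok("42"), "{\"result\": 42, \"error\": null}");
    assert_eq!(
        envelope_err("{\"exc_type\": \"ValueError\"}"),
        "{\"result\": null, \"error\": {\"exc_type\": \"ValueError\"}}"
    );
}
```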

Acceptance criteria
• A service handler can call another service via ctx.rpc.call(...)
• A service handler can emit events via ctx.events.dispatch(...)
• RPC proxy services can be implemented without custom glue code
• Existing public APIs remain functional (or are clearly deprecated)
• No blocking (block_on) inside the library runtime

Follow-ups (out of scope for this issue)
• Event consumers
• Middleware / interceptors
• Tracing integration
• Full Nameko error object parity
