Problem
The current service execution model is too restrictive:
• Service handlers are sync-only (fn(&[Value]) -> Result)
• Handlers have no access to AMQP context (correlation_id, headers, reply_to)
• Services cannot call other services (RPC proxy)
• Services cannot emit events
• Core Nameko concepts like call_id_stack propagation are not reusable outside inbound deliveries
This prevents implementing:
• RPC gateway / proxy services
• Event-driven services
• Any async I/O inside service handlers
⸻
Goal
Refactor the internal service execution model so that a service handler:
• Is async
• Receives a context object (RpcContext)
• Can:
  • call other services via RPC (ctx.rpc.call(...).await)
  • emit events (ctx.events.dispatch(...).await)
  • access inbound metadata (headers, correlation_id, reply_to)
The refactored model should remain compatible with Nameko RPC semantics.
⸻
High-level design
Introduce:
1. RpcContext
  • Built per inbound delivery
  • Contains routing info, correlation_id, and headers
  • Exposes capabilities:
    • RpcCaller (async RPC client core)
    • EventDispatcher
2. Async service handlers
  • Replace NamekoFunction = fn(&[Value]) -> ...
  • With:
    Arc<dyn Fn(RpcContext, Payload) -> BoxFuture<Result<PayloadResult, GirolleError>>>
3. Internal async RPC core
  • Shared reply queue
  • oneshot-based correlation map
  • Used by:
    • standalone RpcClient
    • in-service proxy calls
4. Event dispatcher (publisher-first)
  • Minimal support for emitting Nameko-compatible events
⸻
Concrete implementation steps
- Introduce async handler + context types
File: girolle/src/types.rs
• Add:
  • RpcContext
  • RpcHandler
  • BoxFuture alias
• RpcContext must include:
  • service name
  • method name
  • correlation_id
  • reply_to
  • headers (FieldTable)
  • RpcCaller
  • EventDispatcher
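To make the shape of these types concrete, here is a minimal std-only sketch. Everything in it is an assumption for illustration: Payload, PayloadResult and GirolleError are simplified stand-ins for the crate's real types, headers use a HashMap instead of FieldTable, RpcCaller and EventDispatcher are empty placeholders, and the handler() adapter and block_on() executor are hypothetical helpers that exist only so the example runs without an async runtime.

```rust
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Boxed, sendable future returned by every handler.
pub type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;

// Stand-ins (assumptions): the crate's real Payload, PayloadResult and
// GirolleError types, and the AMQP FieldTable, would replace these.
pub type Payload = Vec<String>;
pub type PayloadResult = String;
#[derive(Debug, PartialEq)]
pub struct GirolleError(pub String);
pub type HandlerResult = Result<PayloadResult, GirolleError>;

/// Placeholder capabilities; the real ones would hold AMQP channels.
#[derive(Clone, Default)]
pub struct RpcCaller;
#[derive(Clone, Default)]
pub struct EventDispatcher;

/// Per-delivery context carrying the fields listed above.
#[derive(Clone)]
pub struct RpcContext {
    pub service: String,
    pub method: String,
    pub correlation_id: String,
    pub reply_to: String,
    pub headers: HashMap<String, String>,
    pub rpc: RpcCaller,
    pub events: EventDispatcher,
}

pub type RpcHandler = Arc<dyn Fn(RpcContext, Payload) -> BoxFuture<HandlerResult> + Send + Sync>;

/// Hypothetical adapter: turns any async closure into an `RpcHandler`.
pub fn handler<F, Fut>(f: F) -> RpcHandler
where
    F: Fn(RpcContext, Payload) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = HandlerResult> + Send + 'static,
{
    Arc::new(move |ctx: RpcContext, payload: Payload| -> BoxFuture<HandlerResult> {
        Box::pin(f(ctx, payload))
    })
}

// Demo-only executor (busy-polls); a real service runs on its async runtime.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Builds a context by hand and runs one handler call.
pub fn demo() -> HandlerResult {
    let echo = handler(|ctx: RpcContext, payload: Payload| async move {
        Ok::<_, GirolleError>(format!("{}.{} got {}", ctx.service, ctx.method, payload.join(",")))
    });
    let ctx = RpcContext {
        service: "video".into(),
        method: "play".into(),
        correlation_id: "c-1".into(),
        reply_to: "reply-q".into(),
        headers: HashMap::new(),
        rpc: RpcCaller,
        events: EventDispatcher,
    };
    block_on(echo(ctx, vec!["a".into(), "b".into()]))
}
```

Passing RpcContext by value keeps the returned future 'static, which is why the context derives Clone in this sketch; whether the real type should be cheap to clone (for example by wrapping shared parts in Arc) is an open design choice.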
⸻
- Refactor RpcTask
File: girolle/src/rpc_task.rs
• Replace inner_function: NamekoFunction with handler: RpcHandler
• Add helper constructor:
  • RpcTask::proxy(name, target_service, target_method)
This lets an RPC proxy task be declared without going through the procedural macro.
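A rough sketch of what RpcTask::proxy could look like, under stated assumptions: the RpcCaller here is a hypothetical stand-in whose transport closure fakes the AMQP round trip, RpcContext is reduced to the single rpc field, names are &'static str for brevity, and HandlerResult is a simplified alias. Only the forwarding pattern is the point.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

type BoxFuture<T> = Pin<Box<dyn Future<Output = T> + Send + 'static>>;
type Payload = Vec<String>;
// Stand-in for Result<PayloadResult, GirolleError>.
type HandlerResult = Result<String, String>;
type Transport = Arc<dyn Fn(&str, &str, Payload) -> HandlerResult + Send + Sync>;

/// Hypothetical RPC capability; `transport` fakes the AMQP round trip.
#[derive(Clone)]
struct RpcCaller {
    transport: Transport,
}

impl RpcCaller {
    async fn call(&self, service: &str, method: &str, payload: Payload) -> HandlerResult {
        (self.transport)(service, method, payload)
    }
}

/// Reduced context: only the capability the proxy needs.
#[derive(Clone)]
struct RpcContext {
    rpc: RpcCaller,
}

type RpcHandler = Arc<dyn Fn(RpcContext, Payload) -> BoxFuture<HandlerResult> + Send + Sync>;

struct RpcTask {
    name: &'static str,
    handler: RpcHandler,
}

impl RpcTask {
    /// Forwards the inbound payload unchanged to `target_service.target_method`.
    fn proxy(name: &'static str, target_service: &'static str, target_method: &'static str) -> Self {
        let handler: RpcHandler =
            Arc::new(move |ctx: RpcContext, payload: Payload| -> BoxFuture<HandlerResult> {
                Box::pin(async move { ctx.rpc.call(target_service, target_method, payload).await })
            });
        RpcTask { name, handler }
    }
}

// Demo-only executor (busy-polls); a real service runs on its async runtime.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Exposes "play" locally and forwards it to "video.start".
fn demo() -> (&'static str, HandlerResult) {
    let transport: Transport =
        Arc::new(|service: &str, method: &str, payload: Payload| -> HandlerResult {
            Ok(format!("{}.{}({})", service, method, payload.join(", ")))
        });
    let task = RpcTask::proxy("play", "video", "start");
    let ctx = RpcContext { rpc: RpcCaller { transport } };
    (task.name, block_on((task.handler)(ctx, vec!["42".to_string()])))
}
```

In the real implementation the proxy would presumably also pass the inbound context as parent_ctx so that call_id_stack and headers propagate; that part is left out here.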
⸻
- Add async RPC core
New file: girolle/src/rpc_core.rs
• Maintain:
  • reply queue + consumer
  • DashMap<correlation_id, oneshot::Sender<_>>
• Expose:
    async fn call(
        &self,
        service: &str,
        method: &str,
        payload: Payload,
        parent_ctx: Option<&RpcContext>,
    ) -> Result<PayloadResult, GirolleError>
• Reuse existing:
  • insert_new_id_to_call_id
  • header propagation logic
Existing RpcClient should wrap this core for backward compatibility.
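The correlation map is the core of this step, so a small illustration may help. This sketch deliberately swaps in std types so it runs on its own: a Mutex<HashMap> instead of DashMap, and std::sync::mpsc channels used once instead of async oneshot channels. PendingCalls, register and complete are hypothetical names, not part of the existing crate.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

// Stand-in for Result<PayloadResult, GirolleError>.
type Reply = Result<String, String>;

/// Maps each in-flight correlation_id to the channel its caller waits on.
#[derive(Default)]
struct PendingCalls {
    waiting: Mutex<HashMap<String, Sender<Reply>>>,
}

impl PendingCalls {
    /// Called before publishing a request: reserves a slot for its reply.
    fn register(&self, correlation_id: &str) -> Receiver<Reply> {
        let (tx, rx) = channel();
        self.waiting
            .lock()
            .unwrap()
            .insert(correlation_id.to_string(), tx);
        rx
    }

    /// Called by the reply-queue consumer for each delivery.
    /// Returns false when nobody is waiting for this id (late or unknown reply).
    fn complete(&self, correlation_id: &str, reply: Reply) -> bool {
        // Removing the entry makes each slot single-use, like a oneshot.
        let sender = self.waiting.lock().unwrap().remove(correlation_id);
        match sender {
            Some(tx) => tx.send(reply).is_ok(),
            None => false,
        }
    }
}

/// Two calls in flight, replies arriving out of order, plus one stray reply.
fn demo() -> (Reply, Reply, bool) {
    let pending = PendingCalls::default();
    let rx_a = pending.register("id-a");
    let rx_b = pending.register("id-b");
    pending.complete("id-b", Ok("b".into()));
    pending.complete("id-a", Err("boom".into()));
    let stray_was_routed = pending.complete("id-zzz", Ok("x".into()));
    (rx_a.recv().unwrap(), rx_b.recv().unwrap(), stray_was_routed)
}
```

With the async types named in the issue, register would return a oneshot receiver that call() awaits, so no thread blocks while the reply is pending. A timeout that removes stale entries is probably needed as well, but is not shown.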
⸻
- Add event dispatcher (publisher)
New file: girolle/src/events.rs
• Implement:
    pub struct EventDispatcher { ... }
    async fn dispatch<T: Serialize>(
        &self,
        source_service: &str,
        event_type: &str,
        payload: &T,
    )
• Exchange naming should follow Nameko conventions ({service}.events).
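As a small illustration of the naming rule, the helper below derives the publish target for an event. EventRoute and event_route are hypothetical names, and using the event type as the routing key is my understanding of Nameko's behaviour rather than something stated in this issue, so treat it as an assumption to verify.

```rust
/// Where an event gets published (hypothetical helper type).
#[derive(Debug, PartialEq)]
struct EventRoute {
    exchange: String,
    routing_key: String,
}

/// Derives the exchange from the emitting service, following the
/// "{service}.events" convention; the routing key is assumed to be
/// the event type.
fn event_route(source_service: &str, event_type: &str) -> EventRoute {
    EventRoute {
        exchange: format!("{}.events", source_service),
        routing_key: event_type.to_string(),
    }
}
```

For example, event_route("video", "started") targets the exchange video.events with routing key started.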
⸻
- Thread capabilities through rpc_service
File: girolle/src/rpc_service.rs
• In rpc_service():
  • After AMQP connection creation:
    • Instantiate RpcCaller
    • Instantiate EventDispatcher
    • Store both in SharedData
  • Inside consumer.set_delegate:
    • Build RpcContext from:
      • delivery.routing_key
      • delivery.properties.correlation_id
      • delivery.properties.reply_to
      • delivery.properties.headers
    • Pass RpcContext into task execution
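The context-building step might look roughly like this. InboundMeta is a hypothetical stand-in for the fields read from the delivery, headers are a plain HashMap rather than FieldTable, and the capabilities are omitted. The sketch assumes routing keys have the form service.method and that a missing correlation_id or reply_to should reject the delivery; both are choices to confirm against the real consumer.

```rust
use std::collections::HashMap;

/// Stand-in for the parts of an AMQP delivery the context needs.
struct InboundMeta {
    routing_key: String,
    correlation_id: Option<String>,
    reply_to: Option<String>,
    headers: HashMap<String, String>,
}

/// Reduced context: metadata only, capabilities left out.
#[derive(Debug, PartialEq)]
struct RpcContext {
    service: String,
    method: String,
    correlation_id: String,
    reply_to: String,
    headers: HashMap<String, String>,
}

fn build_context(meta: InboundMeta) -> Result<RpcContext, String> {
    let InboundMeta { routing_key, correlation_id, reply_to, headers } = meta;
    // Split on the last dot, assuming method names never contain one.
    let (service, method) = routing_key
        .rsplit_once('.')
        .filter(|(s, m)| !s.is_empty() && !m.is_empty())
        .ok_or_else(|| format!("routing key '{}' is not 'service.method'", routing_key))?;
    Ok(RpcContext {
        service: service.to_string(),
        method: method.to_string(),
        correlation_id: correlation_id.ok_or_else(|| "missing correlation_id".to_string())?,
        reply_to: reply_to.ok_or_else(|| "missing reply_to".to_string())?,
        headers,
    })
}

/// Sample delivery metadata for trying the function out.
fn sample(routing_key: &str) -> InboundMeta {
    InboundMeta {
        routing_key: routing_key.to_string(),
        correlation_id: Some("c-1".to_string()),
        reply_to: Some("reply-q".to_string()),
        headers: HashMap::new(),
    }
}
```

build_context(sample("video.start")) yields service "video" and method "start", while a key without a dot is rejected with an error instead of panicking inside the delegate.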
⸻
- Refactor compute_deliver
File: girolle/src/nameko_utils.rs
• Change the compute_deliver signature to accept an RpcContext
• Replace direct function invocation with:
    let result = (rpc_task.handler)(ctx, payload).await;
• Keep response publishing logic unchanged
⸻
- Update procedural macro
Macro crate
• Generate async handlers returning RpcHandler
• Support signatures:
  • async fn foo(arg1, arg2) -> T
  • async fn foo(ctx: RpcContext, arg1, arg2) -> Result<T, E>
• Serialize responses in the Nameko-compatible envelope:
    {"result":..., "error":...}
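For the response envelope, a minimal sketch of the mapping from a handler outcome to the reply body. To stay dependency-free it takes already-serialized JSON fragments; reply_envelope is a hypothetical name, and generated code would more likely build this with a JSON library. The shape of the error object itself is out of scope here, as noted under follow-ups.

```rust
/// Wraps an already-serialized JSON fragment in the reply envelope:
/// "error" is null on success, "result" is null on failure.
fn reply_envelope(outcome: Result<&str, &str>) -> String {
    match outcome {
        // Doubled braces are literal braces in format strings.
        Ok(result_json) => format!("{{\"result\":{},\"error\":null}}", result_json),
        Err(error_json) => format!("{{\"result\":null,\"error\":{}}}", error_json),
    }
}
```

For example, reply_envelope(Ok("3")) produces {"result":3,"error":null}.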
⸻
Acceptance criteria
• A service handler can call another service via ctx.rpc.call(...)
• A service handler can emit events via ctx.events.dispatch(...)
• RPC proxy services can be implemented without custom glue code
• Existing public APIs remain functional (or are clearly deprecated)
• No blocking (block_on) inside the library runtime
⸻
Follow-ups (out of scope for this issue)
• Event consumers
• Middleware / interceptors
• Tracing integration
• Full Nameko error object parity