Introduce async handlers with RpcContext for RPC proxying and event dispatch#152
Introduce async handlers with RpcContext for RPC proxying and event dispatch#152
Conversation
…dlers Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
…ation Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
…ern matching Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
PR Compliance Guide 🔍Below is a summary of compliance checks for this PR:
Compliance status legend🟢 - Fully Compliant🟡 - Partial Compliant 🔴 - Not Compliant ⚪ - Requires Further Human Verification 🏷️ - Compliance label |
|||||||||||||||||||||||||
PR Code Suggestions ✨Explore these optional code suggestions:
|
||||||||||||||||||
User description
Service handlers were sync-only, had no access to AMQP metadata, and couldn't call other services or emit events. This prevented implementing RPC gateways and event-driven patterns.
Changes
Core Types & Context
RpcContextcontaining correlation_id, headers, reply_to, routing_key, call_id_stackAsyncNamekoFunctiontype:Arc<dyn Fn(Arc<RpcContext>, Vec<Value>) -> BoxFuture<Result>>RpcCallerandEventDispatchercapabilities to context (foundation, not fully implemented)Handler Support
RpcTaskwithRpcTaskHandlerenum supporting bothSync(NamekoFunction)andAsync(AsyncNamekoFunction)compute_deliverto buildRpcContextfrom delivery metadata and dispatch to appropriate handler type#[girolle]macro to detect async functions withArc<RpcContext>first parameter via AST analysisExecution Model
RpcContextper inbound delivery with full AMQP metadataUsage
New async handler with context:
Traditional sync handler still works:
Implementation Notes
Arc<RpcContext>detection, handles qualified pathsOriginal prompt
This section details on the original issue you should resolve
<issue_title>Refactor service execution model to support RPC proxying and events (introduce async handlers + RpcContext)</issue_title>
<issue_description>Problem
The current service execution model is too restrictive:
• Service handlers are sync-only (fn(&[Value]) -> Result)
• Handlers have no access to AMQP context (correlation_id, headers, reply_to)
• Services cannot call other services (RPC proxy)
• Services cannot emit events
• Core Nameko concepts like call_id_stack propagation are not reusable outside inbound deliveries
This prevents implementing:
• RPC gateway / proxy services
• Event-driven services
• Any async I/O inside service handlers
⸻
Goal
Refactor the internal service execution model so that a service handler:
• Is async
• Receives a context object (RpcContext)
• Can:
• call other services via RPC (ctx.rpc.call(...).await)
• emit events (ctx.events.dispatch(...).await)
• access inbound metadata (headers, correlation_id, reply_to)
This should remain compatible with Nameko RPC semantics.
⸻
High-level design
Introduce:
1. RpcContext
• Built per inbound delivery
• Contains routing info, correlation, headers
• Exposes capabilities:
• RpcCaller (async RPC client core)
• EventDispatcher
2. Async service handlers
• Replace NamekoFunction = fn(&[Value]) -> ...
• With:
Arc<dyn Fn(RpcContext, Payload) -> BoxFuture<Result<PayloadResult, GirolleError>>>
⸻
Concrete implementation steps
File: girolle/src/types.rs
• Add:
• RpcContext
• RpcHandler
• BoxFuture alias
• RpcContext must include:
• service name
• method name
• correlation_id
• reply_to
• headers (FieldTable)
• RpcCaller
• EventDispatcher
⸻
File: girolle/src/rpc_task.rs
• Replace inner_function: NamekoFunction
• With handler: RpcHandler
• Add helper constructor:
• RpcTask::proxy(name, target_service, target_method)
This enables zero-macro RPC proxy tasks.
⸻
New file: girolle/src/rpc_core.rs
• Maintain:
• reply queue + consumer
• DashMap<correlation_id, oneshot::Sender<_>>
• Expose:
async fn call(
&self,
service: &str,
method: &str,
payload: Payload,
parent_ctx: Option<&RpcContext>,
) -> Result<PayloadResult, GirolleError>
Existing RpcClient should wrap this core for backward compatibility.
⸻
New file: girolle/src/events.rs
• Implement:
pub struct EventDispatcher { ... }
async fn dispatch<T: Serialize>(
&self,
source_service: &str,
event_type: &str,
payload: &T,
)
⸻
File: girolle/src/rpc_service.rs
• In rpc_service():
• After AMQP connection creation:
• Instantiate RpcCaller
• Instantiate EventDispatcher
• Store both in SharedData
• Inside consumer.set_delegate:
• Build RpcContext from:
• delivery.routing_key
• delivery.properties.correlation_id
• delivery.properties.reply_to
• delivery.properties.headers
• Pass RpcContext into task execution
⸻
File: girolle/src/nameko_utils.rs
• Change signature to accept RpcContext
• Replace direct function invocation with:
let result = (rpc_task.handler)(ctx, payload).await;
⸻
Macro crate
• Generate async handlers returning RpcHandler
• Support signatures:
• async fn foo(arg1, arg2) -> T
• async fn foo(ctx: RpcContext, arg1, arg2) -> Result<T, E>
• Serialize responses as Nameko-compatible:
{"result":..., "error":...}
⸻
Acceptance criteria
• A service handler can call another service via ctx.rpc.call(...)
• A service handler can emit events via ctx.events.dispatch(...)
• RPC proxy services can be implemented without custom glue code
• Existing public APIs remain functional (or are clearly deprecated)
• No blocking (block_on) inside the library runtime
⸻
Follow-ups (out of scope for this issue)
• Event consumers
• Middleware / interceptors
• Tracing integration
• Full Nameko error object parity</issue_description>
Comments on the Issue (you are @copilot in this section)
💬 We'd love your input! Share your thoughts on Copilot coding agent in our 2 minute survey.
PR Type
Enhancement
Description
Introduce
RpcContextwith AMQP metadata and service capabilitiesAdd async handler support with
Arc<RpcContext>parameter detectionImplement
RpcCallerandEventDispatcherfoundation for RPC proxyingUpdate
#[girolle]macro to detect and generate async handlers via AST analysisMaintain full backward compatibility with existing sync handlers
Diagram Walkthrough
flowchart LR A["AMQP Delivery"] -->|"Build metadata"| B["RpcContext"] B -->|"Pass to handler"| C{"Handler Type"} C -->|"Sync"| D["Direct call"] C -->|"Async"| E["Await with context"] D --> F["Publish result"] E --> F B -->|"Provides"| G["RpcCaller"] B -->|"Provides"| H["EventDispatcher"]File Walkthrough
8 files
New RpcContext with metadata and capabilitiesAdd AsyncNamekoFunction type for async handlersAdd RpcTaskHandler enum supporting sync and asyncUpdate compute_deliver to dispatch handler typesBuild RpcContext per delivery and pass to handlersExport RpcContext and related typesExport async handler types in preludeDetect async functions and RpcContext parameter4 files
Example async handler accessing AMQP metadataExample demonstrating RPC proxy and event patternsComprehensive guide for async handlers with contextImplementation details and future work guidance1 files
Register new async handler examples