Skip to content

Introduce async handlers with RpcContext for RPC proxying and event dispatch#152

Open
Copilot wants to merge 7 commits intomainfrom
copilot/refactor-service-execution-model
Open

Introduce async handlers with RpcContext for RPC proxying and event dispatch#152
Copilot wants to merge 7 commits intomainfrom
copilot/refactor-service-execution-model

Conversation

Copy link

Copilot AI commented Jan 2, 2026

User description

Service handlers were sync-only, had no access to AMQP metadata, and couldn't call other services or emit events. This prevented implementing RPC gateways and event-driven patterns.

Changes

Core Types & Context

  • Added RpcContext containing correlation_id, headers, reply_to, routing_key, call_id_stack
  • Added AsyncNamekoFunction type: Arc<dyn Fn(Arc<RpcContext>, Vec<Value>) -> BoxFuture<Result>>
  • Added RpcCaller and EventDispatcher capabilities to context (foundation, not fully implemented)

Handler Support

  • Extended RpcTask with RpcTaskHandler enum supporting both Sync(NamekoFunction) and Async(AsyncNamekoFunction)
  • Updated compute_deliver to build RpcContext from delivery metadata and dispatch to appropriate handler type
  • Modified #[girolle] macro to detect async functions with Arc<RpcContext> first parameter via AST analysis

Execution Model

  • Service now builds RpcContext per inbound delivery with full AMQP metadata
  • Async handlers receive context and can access call stack, headers, correlation info
  • Sync handlers unchanged - direct function call with zero overhead

Usage

New async handler with context:

#[girolle]
async fn proxy_handler(ctx: Arc<RpcContext>, name: String) -> String {
    let correlation_id = &ctx.correlation_id;
    let call_stack = ctx.get_call_id_stack();
    
    // Foundation ready for:
    // ctx.rpc.call("other_service", "method", payload).await
    // ctx.events.dispatch("event.type", data).await
    
    format!("Hello, {}", name)
}

Traditional sync handler still works:

#[girolle]
fn sync_handler(name: String) -> String {
    format!("Hello, {}", name)
}

Implementation Notes

  • Macro uses proper AST pattern matching for Arc<RpcContext> detection, handles qualified paths
  • MutexGuard properly scoped to prevent holding across await points
  • Context cloned efficiently via Arc
  • 5 new unit tests covering context creation, call_id_stack parsing, handler type dispatch
Original prompt

This section details on the original issue you should resolve

<issue_title>Refactor service execution model to support RPC proxying and events (introduce async handlers + RpcContext)</issue_title>
<issue_description>Problem

The current service execution model is too restrictive:
• Service handlers are sync-only (fn(&[Value]) -> Result)
• Handlers have no access to AMQP context (correlation_id, headers, reply_to)
• Services cannot call other services (RPC proxy)
• Services cannot emit events
• Core Nameko concepts like call_id_stack propagation are not reusable outside inbound deliveries

This prevents implementing:
• RPC gateway / proxy services
• Event-driven services
• Any async I/O inside service handlers

Goal

Refactor the internal service execution model so that a service handler:
• Is async
• Receives a context object (RpcContext)
• Can:
• call other services via RPC (ctx.rpc.call(...).await)
• emit events (ctx.events.dispatch(...).await)
• access inbound metadata (headers, correlation_id, reply_to)

This should remain compatible with Nameko RPC semantics.

High-level design

Introduce:
1. RpcContext
• Built per inbound delivery
• Contains routing info, correlation, headers
• Exposes capabilities:
• RpcCaller (async RPC client core)
• EventDispatcher
2. Async service handlers
• Replace NamekoFunction = fn(&[Value]) -> ...
• With:

Arc<dyn Fn(RpcContext, Payload) -> BoxFuture<Result<PayloadResult, GirolleError>>>

3.	Internal async RPC core
•	Shared reply queue
•	oneshot-based correlation map
•	Used by:
•	standalone RpcClient
•	in-service proxy calls
4.	Event dispatcher (publisher-first)
•	Minimal support for emitting Nameko-compatible events

Concrete implementation steps

  1. Introduce async handler + context types
    File: girolle/src/types.rs
    • Add:
    • RpcContext
    • RpcHandler
    • BoxFuture alias
    • RpcContext must include:
    • service name
    • method name
    • correlation_id
    • reply_to
    • headers (FieldTable)
    • RpcCaller
    • EventDispatcher

  1. Refactor RpcTask
    File: girolle/src/rpc_task.rs
    • Replace inner_function: NamekoFunction
    • With handler: RpcHandler
    • Add helper constructor:
    • RpcTask::proxy(name, target_service, target_method)

This enables zero-macro RPC proxy tasks.

  1. Add async RPC core
    New file: girolle/src/rpc_core.rs
    • Maintain:
    • reply queue + consumer
    • DashMap<correlation_id, oneshot::Sender<_>>
    • Expose:

async fn call(
&self,
service: &str,
method: &str,
payload: Payload,
parent_ctx: Option<&RpcContext>,
) -> Result<PayloadResult, GirolleError>

•	Reuse existing:
•	insert_new_id_to_call_id
•	header propagation logic

Existing RpcClient should wrap this core for backward compatibility.

  1. Add event dispatcher (publisher)
    New file: girolle/src/events.rs
    • Implement:

pub struct EventDispatcher { ... }

async fn dispatch<T: Serialize>(
&self,
source_service: &str,
event_type: &str,
payload: &T,
)

•	Exchange naming should follow Nameko conventions ({service}.events).

  1. Thread capabilities through rpc_service
    File: girolle/src/rpc_service.rs
    • In rpc_service():
    • After AMQP connection creation:
    • Instantiate RpcCaller
    • Instantiate EventDispatcher
    • Store both in SharedData
    • Inside consumer.set_delegate:
    • Build RpcContext from:
    • delivery.routing_key
    • delivery.properties.correlation_id
    • delivery.properties.reply_to
    • delivery.properties.headers
    • Pass RpcContext into task execution

  1. Refactor compute_deliver
    File: girolle/src/nameko_utils.rs
    • Change signature to accept RpcContext
    • Replace direct function invocation with:

let result = (rpc_task.handler)(ctx, payload).await;

•	Keep response publishing logic unchanged

  1. Update procedural macro
    Macro crate
    • Generate async handlers returning RpcHandler
    • Support signatures:
    • async fn foo(arg1, arg2) -> T
    • async fn foo(ctx: RpcContext, arg1, arg2) -> Result<T, E>
    • Serialize responses as Nameko-compatible:

{"result":..., "error":...}

Acceptance criteria
• A service handler can call another service via ctx.rpc.call(...)
• A service handler can emit events via ctx.events.dispatch(...)
• RPC proxy services can be implemented without custom glue code
• Existing public APIs remain functional (or are clearly deprecated)
• No blocking (block_on) inside the library runtime

Follow-ups (out of scope for this issue)
• Event consumers
• Middleware / interceptors
• Tracing integration
• Full Nameko error object parity</issue_description>

Comments on the Issue (you are @copilot in this section)


💬 We'd love your input! Share your thoughts on Copilot coding agent in our 2 minute survey.


PR Type

Enhancement


Description

  • Introduce RpcContext with AMQP metadata and service capabilities

  • Add async handler support with Arc<RpcContext> parameter detection

  • Implement RpcCaller and EventDispatcher foundation for RPC proxying

  • Update #[girolle] macro to detect and generate async handlers via AST analysis

  • Maintain full backward compatibility with existing sync handlers


Diagram Walkthrough

flowchart LR
  A["AMQP Delivery"] -->|"Build metadata"| B["RpcContext"]
  B -->|"Pass to handler"| C{"Handler Type"}
  C -->|"Sync"| D["Direct call"]
  C -->|"Async"| E["Await with context"]
  D --> F["Publish result"]
  E --> F
  B -->|"Provides"| G["RpcCaller"]
  B -->|"Provides"| H["EventDispatcher"]
Loading

File Walkthrough

Relevant files
Enhancement
8 files
rpc_context.rs
New RpcContext with metadata and capabilities                       
+373/-0 
types.rs
Add AsyncNamekoFunction type for async handlers                   
+22/-0   
rpc_task.rs
Add RpcTaskHandler enum supporting sync and async               
+86/-4   
nameko_utils.rs
Update compute_deliver to dispatch handler types                 
+22/-5   
rpc_service.rs
Build RpcContext per delivery and pass to handlers             
+19/-1   
lib.rs
Export RpcContext and related types                                           
+2/-0     
prelude.rs
Export async handler types in prelude                                       
+2/-1     
entry.rs
Detect async functions and RpcContext parameter                   
+83/-11 
Documentation
4 files
async_service_with_context.rs
Example async handler accessing AMQP metadata                       
+41/-0   
rpc_proxy_demo.rs
Example demonstrating RPC proxy and event patterns             
+56/-0   
ASYNC_HANDLERS.md
Comprehensive guide for async handlers with context           
+244/-0 
IMPLEMENTATION_NOTES.md
Implementation details and future work guidance                   
+246/-0 
Configuration changes
1 files
Cargo.toml
Register new async handler examples                                           
+8/-0     

Copilot AI and others added 6 commits January 2, 2026 05:30
…dlers

Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
…ation

Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
…ern matching

Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
Co-authored-by: doubleailes <23233470+doubleailes@users.noreply.github.com>
Copilot AI changed the title [WIP] Refactor service execution model to support RPC proxying and events Introduce async handlers with RpcContext for RPC proxying and event dispatch Jan 2, 2026
Copilot AI requested a review from doubleailes January 2, 2026 05:46
@doubleailes doubleailes marked this pull request as ready for review January 2, 2026 08:08
@qodo-code-review
Copy link

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🔴
Sensitive info exposure

Description: RpcCaller::call() injects self.config.AMQP_URI() into the outbound AMQP headers as
nameko.AMQP_URI, which can leak broker credentials (e.g., amqp://user:password@host/...)
to any consumer/service able to read message headers.
rpc_context.rs [200-210]

Referred Code
let mut headers: BTreeMap<ShortString, AMQPValue> = BTreeMap::new();
headers.insert(
    "nameko.AMQP_URI".into(),
    AMQPValue::LongString(self.config.AMQP_URI().into()),
);
headers.insert(
    "nameko.call_id_stack".into(),
    AMQPValue::FieldArray(FieldArray::from(vec![AMQPValue::LongString(
        LongString::from(self.identifier.to_string().as_bytes()),
    )])),
);
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

🔴
Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status: 🏷️
Panic on missing context: Async handler dispatch uses expect("RpcContext required for async handlers"),
which can panic and crash the service instead of returning a handled error response.

Referred Code
RpcTaskHandler::Async(async_fn) => {
    // Execute async handler with context
    let ctx = rpc_context.expect("RpcContext required for async handlers");
    async_fn(ctx, buildted_args).await
}

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status: 🏷️
Missing audit logging: New cross-service capabilities (RpcCaller::call and EventDispatcher::dispatch) perform
potentially critical actions without audit-style logs including actor identity, timestamp,
action description, and outcome.

Referred Code
pub async fn call(
    &self,
    service_name: &str,
    method_name: &str,
    payload: Payload,
) -> GirolleResult<Value> {
    // Clone the channel before the async operations to avoid holding lock across await
    let channel = {
        let services = self.services.lock().unwrap();
        services
            .get(service_name)
            .ok_or_else(|| {
                GirolleError::ServiceMissingError(format!("Service {} not registered", service_name))
            })?
            .clone()
    };

    let routing_key = format!("{}.{}", service_name, method_name);
    let correlation_id = Uuid::new_v4();

    let mut headers: BTreeMap<ShortString, AMQPValue> = BTreeMap::new();


 ... (clipped 85 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: 🏷️
Unclear variable names: Newly added code continues using non-descriptive/typo-prone identifiers like
buildted_args, reducing readability and self-documentation for the updated handler
dispatch logic.

Referred Code
// Build the arguments
let buildted_args = match build_inputs_fn_service(&rpc_task_struct.args, incomming_data) {
    Ok(result) => result,
    Err(error) => {
        publish(
            rpc_channel,
            PayloadResult::from_error(error.convert()),
            properties,
            reply_to_id,
            rpc_exchange,
        )
        .await
        .expect("Error publishing");
        return;
    }
};

// Execute handler based on type
let result = match &rpc_task_struct.handler {
    RpcTaskHandler::Sync(fn_service) => {
        // Execute synchronous handler


 ... (clipped 8 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: 🏷️
Unstructured warning log: EventDispatcher::dispatch emits an unstructured tracing::warn! message without ensuring
structured fields and without confirming payload-sensitive data will never be logged in
future implementations.

Referred Code
pub async fn dispatch(
    &self,
    _event_type: &str,
    _payload: Value,
) -> GirolleResult<()> {
    // TODO: Implement event dispatching
    // This would publish to an event exchange
    tracing::warn!("EventDispatcher::dispatch is not yet fully implemented");
    Ok(())
}

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status: 🏷️
Unvalidated routing key parts: RpcCaller::call builds an AMQP routing_key directly from external service_name and
method_name without visible validation/sanitization, which may allow unintended routing
patterns depending on broker conventions.

Referred Code
pub async fn call(
    &self,
    service_name: &str,
    method_name: &str,
    payload: Payload,
) -> GirolleResult<Value> {
    // Clone the channel before the async operations to avoid holding lock across await
    let channel = {
        let services = self.services.lock().unwrap();
        services
            .get(service_name)
            .ok_or_else(|| {
                GirolleError::ServiceMissingError(format!("Service {} not registered", service_name))
            })?
            .clone()
    };

    let routing_key = format!("{}.{}", service_name, method_name);
    let correlation_id = Uuid::new_v4();

Learn more about managing compliance generic rules or creating your own custom rules

Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-code-review
Copy link

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Use async-friendly reply waiting

In wait_for_reply, substitute the blocking std::sync::Condvar with an
asynchronous equivalent like tokio::sync::Notify to prevent blocking the async
runtime.

girolle/src/rpc_context.rs [238-248]

+// Fields replaced:
+// replies: Arc<tokio::sync::Mutex<HashMap<String, PayloadResult>>>
+// notify: Arc<tokio::sync::Notify>
 async fn wait_for_reply(&self, correlation_id: &str) -> GirolleResult<Value> {
-    let mut replies = self.replies.lock().unwrap();
-    let result_reply = loop {
-        if let Some(value) = replies.get(correlation_id) {
-            break value.clone();
-        } else {
-            replies = self.not_empty.wait(replies).unwrap();
+    loop {
+        let mut guard = self.replies.lock().await;
+        if let Some(value) = guard.remove(correlation_id) {
+            return match value.get_error() {
+                Some(err) => Err(err.convert_to_girolle_error()),
+                None => Ok(value.get_result()),
+            };
         }
-    };
-    replies.remove(correlation_id);
-    ...
+        drop(guard);
+        self.notify.notified().await;
+    }
 }

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 10

__

Why: This suggestion correctly identifies a critical bug where a blocking std::sync::Condvar is used in an async context, which can lead to deadlocks, and provides a correct async-native solution.

High
Handle missing context gracefully

Replace .expect() with proper error propagation when rpc_context is missing for
an async handler to avoid panics and improve service stability.

girolle/src/nameko_utils.rs [281-291]

 let result = match &rpc_task_struct.handler {
-    RpcTaskHandler::Sync(fn_service) => {
-        // Execute synchronous handler
-        fn_service(&buildted_args)
-    }
+    RpcTaskHandler::Sync(fn_service) => fn_service(&buildted_args),
     RpcTaskHandler::Async(async_fn) => {
-        // Execute async handler with context
-        let ctx = rpc_context.expect("RpcContext required for async handlers");
-        async_fn(ctx, buildted_args).await
+        // Execute async handler with context, return error if missing
+        if let Some(ctx) = rpc_context.clone() {
+            async_fn(ctx, buildted_args).await
+        } else {
+            Err(GirolleError::ArgumentsError(
+                "Missing RpcContext for async handler".into(),
+            ))
+        }
     }
 };
  • Apply / Chat
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly points out that using .expect() can cause a panic, and proposes a more robust error-handling approach that prevents the service from crashing.

Medium
General
Default missing correlation_id

Instead of panicking on a missing correlation_id, handle it gracefully by
logging a warning and generating a fallback UUID to ensure service robustness.

girolle/src/rpc_service.rs [311]

-let correlation_id = get_id(delivery.properties.correlation_id(), "correlation_id");
+let correlation_id = delivery
+    .properties
+    .correlation_id()
+    .map(|s| s.to_string())
+    .unwrap_or_else(|| {
+        error!("Missing correlation_id in delivery properties");
+        Uuid::new_v4().to_string()
+    });
  • Apply / Chat
Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies that get_id will panic if the correlation_id is missing and proposes a robust fallback mechanism, preventing potential service crashes.

Medium
Handle potential mutex poisoning gracefully

In register_service, replace .unwrap() on the mutex lock with explicit error
handling to gracefully manage potential lock poisoning instead of panicking.

girolle/src/rpc_context.rs [152]

 pub async fn register_service(&self, service_name: &str) -> GirolleResult<()> {
     if let Some(conn) = &self.conn {
         let channel = create_service_channel(
             conn,
             service_name,
             self.config.prefetch_count(),
             self.config.rpc_exchange(),
         )
         .await?;
         
-        let mut services = self.services.lock().unwrap();
+        let mut services = self.services.lock().map_err(|_| {
+            GirolleError::ImplementationError("Failed to acquire lock on services".to_string())
+        })?;
         services.insert(service_name.to_string(), Arc::new(channel));
         Ok(())
     } else {
         Err(GirolleError::ArgumentsError(
             "RpcCaller not initialized".to_string(),
         ))
     }
 }

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies that .unwrap() on a poisoned mutex will panic and suggests a more robust error handling strategy, which improves the library's resilience.

Medium
Support ShortString in headers

Update get_call_id_stack to process both LongString and ShortString AMQP values
from the header, improving compatibility and robustness.

girolle/src/rpc_context.rs [68-85]

 pub fn get_call_id_stack(&self) -> Option<Vec<String>> {
     self.headers
         .inner()
         .get("nameko.call_id_stack")
         .and_then(|v| v.as_array())
         .map(|arr| {
-            arr.as_slice()
-                .iter()
-                .filter_map(|v| {
-                    if let AMQPValue::LongString(s) = v {
+            arr.iter()
+                .filter_map(|v| match v {
+                    AMQPValue::LongString(s) => {
                         Some(String::from_utf8_lossy(s.as_bytes()).to_string())
-                    } else {
-                        None
                     }
+                    AMQPValue::ShortString(s) => Some(s.to_string()),
+                    _ => None,
                 })
                 .collect()
         })
 }
  • Apply / Chat
Suggestion importance[1-10]: 6

__

Why: The suggestion improves compatibility by handling both ShortString and LongString for the call_id_stack, making the implementation more robust to variations in AMQP messages.

Low
  • More

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Refactor service execution model to support RPC proxying and events (introduce async handlers + RpcContext)

2 participants