feat: add tool dispatch layer — ToolContext, traits, and LoopDecision#51
ashwing wants to merge 17 commits into
Conversation
| # This cassette drives execute_loop through a complete 2-iteration tool loop: | ||
| # Iteration 0: model returns FunctionCall (get_weather) | ||
| # Iteration 1: model returns Message (final answer using tool result) | ||
@ashwing Is it possible to generate cassettes by running against OpenAI and recording the requests and responses, as in https://github.com/vllm-project/agentic-api/blob/main/crates/agentic-core/tests/cassettes/text_only/conversation/conv-two-turn-gpt-4o-streaming.yaml,
but with function calls such as get_weather instead of plain text?
The recording script can be found here: https://github.com/vllm-project/agentic-api/blob/main/crates/agentic-core/tests/cassettes/record_cassette.py
Good call — yeah we can do that. I'll add a cassette recording mode in a follow-up PR that proxies to a real vLLM instance and dumps the request/response pairs. For this one I hand-crafted from an actual gemma4 session to keep the test deterministic without needing a live server.
| let mut payload = match result { | ||
| Either::Left(payload) => payload, | ||
| Either::Right(_stream) => { | ||
| return Err(crate::executor::ExecutorError::InvalidRequest( |
Why not import it at the top of the file, e.g. `use crate::executor::ExecutorError;`?
Done — moved all crate imports to the top. Pushed.
| ResponsesInput::Items(existing) => existing.clone(), | ||
| // First iteration with plain text — convert to structured item. | ||
| ResponsesInput::Text(t) => { | ||
| vec![crate::types::io::InputItem::Message(crate::types::io::InputMessage { |
`crate::types::io::InputItem::Message(crate::types::io::InputMessage)` makes the line very long. Importing these types at the top of the file would shorten it.
Done — imported InputItem, InputMessage, InputMessageContent, and ResponsesInput at the top. Much cleaner now.
| // (server handler) owns final persistence with the correct RequestContext. | ||
| // Without this, the first iteration would persist a partial response | ||
| // (containing only the tool-call output, not the final answer) to the DB. | ||
| request.store = false; |
Why is this hardcoded to false? I see the comment:
// Without this, the first iteration would persist a partial response
// (containing only the tool-call output, not the final answer) to the DB.
Could you clarify what you mean by a partial response?
The contract is as follows:
If the store value in the request payload coming in from the server is false, we pass the request to the vLLM proxy for stateless inference.
If it is set to true, we invoke execute_loop, which should not set it to false. From the second turn onwards, if store=false but a context ID is given (that is, either conversation_id or previous_response_id is present), we still need to persist. If no context ID is provided, we proxy to the vLLM server statelessly.
The loop suppresses persistence for intermediate iterations — otherwise each internal execute() call would write a partial response (just tool-call output, not the final answer) to the store. We clear all three persistence triggers (store, previous_response_id, conversation_id) because PR #56 persists when any of them is set. The caller (server handler) does the final persist once with the complete response.
Updated the doc comment to make this clearer.
Happy to discuss further, esp. the requirement to persist intermediate tool call/results and the layer responsible to persist.
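The clear-then-restore behaviour described in this thread can be sketched roughly as below. This is a minimal illustration, not the PR's code: `RequestPayload` and `ResponsePayload` are reduced to the three fields under discussion, and `SavedIds`, `suppress_persistence`, and `restore_ids` are hypothetical names.

```rust
/// Simplified stand-in for the request type (assumed shape, only the
/// three persistence triggers are modelled).
#[derive(Debug, Clone, Default, PartialEq)]
struct RequestPayload {
    store: bool,
    previous_response_id: Option<String>,
    conversation_id: Option<String>,
}

/// Simplified stand-in for the response type.
#[derive(Debug, Default, PartialEq)]
struct ResponsePayload {
    previous_response_id: Option<String>,
    conversation_id: Option<String>,
}

/// IDs captured before the loop so they can be put back on the final payload.
/// Hypothetical helper type, not part of the PR.
struct SavedIds {
    previous_response_id: Option<String>,
    conversation_id: Option<String>,
}

/// Clears all three persistence triggers so that intermediate iterations
/// cannot write a tool-call-only response to the store.
fn suppress_persistence(request: &mut RequestPayload) -> SavedIds {
    request.store = false;
    SavedIds {
        // `take()` moves the value out and leaves `None` on the request.
        previous_response_id: request.previous_response_id.take(),
        conversation_id: request.conversation_id.take(),
    }
}

/// Restores the original IDs on the payload returned to the caller,
/// which then performs the single final persist.
fn restore_ids(payload: &mut ResponsePayload, saved: SavedIds) {
    payload.previous_response_id = saved.previous_response_id;
    payload.conversation_id = saved.conversation_id;
}
```

In this sketch the caller's original `store` value is not tracked at all, on the assumption that the server handler still holds the incoming request and decides on the final persist itself.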
| // existing context + tool results appended. | ||
| let mut items = match &request.input { | ||
| // Already structured items — clone and extend. | ||
| ResponsesInput::Items(existing) => existing.clone(), |
When existing is cloned, the clone is not guaranteed to keep any spare capacity (in practice it is allocated for just existing.len() elements), so the following extend will usually reallocate. You can pre-size instead:
let mut items = Vec::with_capacity(existing.len() + tool_results.len());
Done — switched to in-place mutation with items.reserve(tool_results.len()) + items.extend(tool_results). No clone at all now, just grows the existing vec.
| return Ok(payload); | ||
| } | ||
| // Tools were executed — inject results and re-enter inference. | ||
| LoopDecision::Continue(tool_results) => { |
Each iteration: clone → extend → reassign. On iteration 2, you clone the vec that already contains iteration 1's results. On iteration 3, you clone a vec with iterations 1+2's results. The allocation grows linearly and is thrown away each time.
The in-place alternative:
LoopDecision::Continue(tool_results) => {
// Convert Text → Items on first tool call, then just extend in-place
if let ResponsesInput::Text(t) = &request.input {
request.input = ResponsesInput::Items(vec![InputItem::Message(...)]);
}
if let ResponsesInput::Items(ref mut v) = request.input {
v.extend(tool_results); // ← no clone, mutates the existing Vec
}
}
Fixed — now mutates request.input in-place. Text gets converted to Items once on the first tool call, then subsequent iterations just reserve+extend on the same vec. No cloning.
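For readers following the thread, a self-contained sketch of the in-place approach might look like the following. The `InputItem` and `ResponsesInput` definitions here are simplified assumptions (the real types carry more fields), and `append_tool_results` is a hypothetical helper name.

```rust
/// Simplified stand-ins for the crate's input types (assumed shapes).
#[derive(Debug, Clone, PartialEq)]
enum InputItem {
    Message(String),
    ToolResult { call_id: String, output: String },
}

#[derive(Debug, Clone, PartialEq)]
enum ResponsesInput {
    Text(String),
    Items(Vec<InputItem>),
}

/// Appends tool results to `input` without cloning the existing items.
fn append_tool_results(input: &mut ResponsesInput, tool_results: Vec<InputItem>) {
    // Convert plain text to structured items once, on the first tool call.
    // `mem::take` moves the String out instead of copying it.
    if let ResponsesInput::Text(text) = input {
        let message = InputItem::Message(std::mem::take(text));
        *input = ResponsesInput::Items(vec![message]);
    }
    // From here on the input is always `Items`; grow the same Vec in place.
    if let ResponsesInput::Items(items) = input {
        items.reserve(tool_results.len());
        items.extend(tool_results);
    }
}
```

Calling the helper once per loop iteration keeps a single `Vec` alive for the whole loop, so earlier tool results are never copied again.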
| // Duration::ZERO = no timeout (provider/reqwest manages its own). | ||
| let inference_timeout = exec_ctx.streaming_timeout; | ||
| let result = if inference_timeout.is_zero() { | ||
| execute(request.clone(), Arc::clone(&exec_ctx)).await? |
To avoid the frequent request.clone(), let's change the execute() signature to borrow the RequestPayload: execute(request: &RequestPayload, ...).
This would also require changing rehydrate_conversation to take request: &RequestPayload.
Agree this is the right long-term fix. The request.clone() is still there because execute() takes ownership today (it passes the payload into rehydrate_conversation which moves it). Changing that signature touches engine.rs internals (rehydrate, inject_ids, persist) which are outside our PR scope.
I'll open a follow-up to refactor execute() to borrow — it'll be a clean diff once this lands. For now the clone cost is bounded by the input size which doesn't grow per-iteration anymore (tool results are just appended, not cloned on each loop).
| /// serves as a higher-level retry — the model can re-call a failed tool on | ||
| /// the next iteration if it chooses to. | ||
| pub async fn execute_all(&self, calls: &[&FunctionToolCall]) -> Vec<InputItem> { | ||
| let futures: Vec<_> = calls.iter().map(|call| self.execute_one(call)).collect(); |
Great choice using futures here. To save a little memory and time, we can call join_all directly:
futures::future::join_all(calls.iter().map(|call| self.execute_one(call))).await
Since join_all accepts an iterator, the intermediate step that creates a Vec is not needed.
Yep simplified — just passes the iterator directly into join_all now. No intermediate collect.
| iteration: usize, | ||
| ) -> ExecutorResult<LoopDecision> { | ||
| // Step 1: Extract FunctionCall items. Messages and Unknown are ignored. | ||
| let function_calls: Vec<_> = output |
The Vec is allocated before the empty check, so the allocation is wasted on the early-exit paths.
We could optimize this with:
let mut function_calls = output.iter().filter_map(|item| match item {
OutputItem::FunctionCall(fc) => Some(fc),
_ => None,
}).peekable();
if function_calls.peek().is_none() {
return Ok(LoopDecision::Done);
}
if iteration >= tool_ctx.max_iterations {
return Ok(LoopDecision::Incomplete(...));
}
let calls: Vec<_> = function_calls.collect(); // only alloc when we'll actually use it
tool_ctx.execute_all(&calls).await
This skips the Vec allocation on the two early-exit paths entirely.
Good catch — switched to a .peekable() iterator. peek() checks for emptiness without allocating, and we only collect into a Vec when we know we'll actually execute. Pushed.
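A compilable version of the peekable pattern, under simplifying assumptions, could look like this. The enums are reduced stand-ins for the crate's types, `decide` is a hypothetical synchronous name, and the "tool execution" is faked with a formatted string so the sketch has no async or external dependencies.

```rust
/// Simplified stand-ins for the crate's output and decision types.
#[derive(Debug, PartialEq)]
enum OutputItem {
    Message(String),
    FunctionCall(String), // holds the tool name only, for brevity
}

#[derive(Debug, PartialEq)]
enum LoopDecision {
    Done,
    Incomplete(String),
    Continue(Vec<String>),
}

fn decide(output: &[OutputItem], iteration: usize, max_iterations: usize) -> LoopDecision {
    // Lazy iterator over function calls; nothing is allocated yet.
    let mut calls = output
        .iter()
        .filter_map(|item| match item {
            OutputItem::FunctionCall(name) => Some(name),
            _ => None,
        })
        .peekable();

    // Early exit 1: the model produced no tool calls.
    if calls.peek().is_none() {
        return LoopDecision::Done;
    }
    // Early exit 2: the iteration budget is used up.
    if iteration >= max_iterations {
        return LoopDecision::Incomplete(format!("max_iterations ({max_iterations}) reached"));
    }
    // Only now collect; a real dispatcher would execute the tools here.
    let results: Vec<String> = calls.map(|name| format!("result of {name}")).collect();
    LoopDecision::Continue(results)
}
```

`peek()` advances the underlying iterator only far enough to find the first match and buffers it, so the later `collect()` still sees every function call.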
|
Hi @ashwing, Thank you for this PR. I'm wondering, would it be easier to split the function call into another PR? Maybe this PR can focus on the server-side execution loop, and we handle the basic function call support separately. For function call support, I think the flow should be:
Then we can record cassettes to test the complete function call cycle:
Once this is properly set up, the server-side execution loop should be mostly the same, except Additionally, I think there might be some semantic confusion around how LLM function calling works. For most OSS models, especially when served through vLLM, the model-facing tool interface is basically function calling ( For example:
I think it might be good to first align on the function call loop and semantics, then build the server-side execution on top of that. This should make the implementation easier to test and reason about. |
|
Hey @noobHappylife, thanks for the detailed breakdown — really helpful to see the full picture laid out like this. I think we're actually in agreement on the architecture. Let me map your flow to what's here: Client-side passthrough already works today. When a request hits What this PR adds is the server-side layer on top. When So the two flows are:
On type-aware routing — fully agree. The dispatch code has an explicit MVP note (line 53-56) saying "the distinction between client-side functions and gateway-executed tools requires access to the request's tools array — deferred to a follow-up." That's exactly what you're describing. The follow-up would:
On normalization before vLLM — also agree. MCP tools should be resolved into function-compatible schemas before the model sees them. vLLM only speaks function calling. That normalization layer sits upstream of I think the sequencing works as:
Each layer builds on the previous. Does this framing make sense to you? Happy to discuss further — especially the normalization step since you've clearly thought through the Codex integration side of this. |
@ashwing I think we could have one PR to add built-in or where model's output would contain a json like after we have the PR supporting function_call and the reasoning in #57 then we can go with
because I feel that, for the server-side loop, we should at least add the function_call described above; that would make the loop more solid, since we would at least be supporting the built-in calls.
|
@maralbahari Good point — you're right that the accumulator doesn't actually produce I can add function_call accumulation to this PR — it follows the same pattern as PR #57's reasoning items:
That way the loop has real function_call items to dispatch on in both blocking and streaming paths. Should be ~50 lines following the existing accumulation pattern. Does that work for you, or would you prefer it as a separate PR that lands first? |
@ashwing Thanks. I think a separate PR would be easier to review and could land first.
|
@maralbahari Opened #59 with the function_call accumulation as a separate PR per your suggestion. Once that lands, this PR's execute_loop will have |
## Summary

The streaming path's `ResponseAccumulator` handles text message events but didn't yet have handlers for `function_call` events; they fell through the wildcard arm. This adds the missing accumulation so streaming responses include `FunctionCall` output items, matching the blocking JSON path.

Prerequisite for #51 (`execute_loop` needs `FunctionCall` items available in the streaming path to dispatch tools).

### Before

A client sends a streaming request with tools:

```json
{"model": "meta-llama/...", "stream": true, "input": "What's the weather?", "tools": [{"type": "function", "name": "get_weather", ...}]}
```

vLLM responds with `function_call` SSE events, but the accumulator ignores them. The client gets back:

```json
{"status": "completed", "output": []}
```

### After

The same request now produces:

```json
{
  "status": "completed",
  "output": [{
    "type": "function_call",
    "id": "fc_1",
    "call_id": "call_abc",
    "name": "get_weather",
    "arguments": "{\"location\":\"Paris\"}",
    "status": "completed"
  }]
}
```

### What's handled

- `OutputItemAdded` with `item_type == "function_call"`: starts a new in-flight `FunctionToolCall`
- `FunctionCallArgumentsDelta`: appends to argument buffer
- `FunctionCallArgumentsDone`: finalizes with authoritative arguments, pushes to output
- Multiple function calls in one response (parallel tool use)
- Interleaved message → function_call → message ordering preserved
- Stream disconnect mid-arguments: partial args retained via forced finalize
- Orphaned deltas (no active function call): safely cleared before next call
- Coexists with reasoning accumulation (PR #57): unified `match` with three-way type dispatch

## Test Plan

**Unit tests** (8 in `accumulator.rs`):
- Full lifecycle, parallel tool use, interleaved with messages, orphaned deltas, forced finalize on disconnect

**Cassette integration tests** (12 in `accumulator_cassette_test.rs`):
- `tool_choice=auto` streaming + non-streaming (parallel tool calls)
- `tool_choice=required` streaming + non-streaming
- `tool_choice=named` streaming + non-streaming
- `tool_choice=none` streaming + non-streaming (zero function calls)
- Reasoning streaming: Qwen3 (reasoning + message)
- Reasoning streaming: GPT-oss (reasoning only, see note below)
- Legacy Gemma4 function_call + text-only regression guard

Non-streaming tests exercise the `from_json` path against the same cassettes.

All tests pass, `cargo fmt --check` + `cargo clippy -- -D warnings` clean.

### Note: GPT-oss streaming gap

While testing the reasoning cassettes, I found that GPT-oss doesn't emit `response.output_item.added` for the message item after reasoning; it jumps from `reasoning_text.done` → `output_text.done` → `output_item.done`. The streaming accumulator can't capture that message because no `output_item.added` creates the in-flight state. Qwen3 emits the event correctly. Not blocking, since the full output is in the `response.completed` payload. Filed as #62 for follow-up.

---------

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
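The event handling listed under "What's handled" can be illustrated with a small state machine. This is a rough sketch only: `StreamEvent` and `FunctionCallAccumulator` are hypothetical, simplified types (the real `ResponseAccumulator` also tracks messages and reasoning items), and orphaned deltas are simply dropped here rather than buffered and cleared.

```rust
/// Simplified stand-in for the crate's function-call output item.
#[derive(Debug, Clone, PartialEq)]
struct FunctionToolCall {
    call_id: String,
    name: String,
    arguments: String,
}

/// Hypothetical, reduced event set covering only the function_call lifecycle.
enum StreamEvent {
    OutputItemAdded { call_id: String, name: String },
    ArgumentsDelta(String),
    ArgumentsDone(String),
}

#[derive(Default)]
struct FunctionCallAccumulator {
    in_flight: Option<FunctionToolCall>,
    output: Vec<FunctionToolCall>,
}

impl FunctionCallAccumulator {
    fn handle(&mut self, event: StreamEvent) {
        match event {
            StreamEvent::OutputItemAdded { call_id, name } => {
                // A call that never saw its "done" event keeps its partial args.
                self.finalize();
                self.in_flight = Some(FunctionToolCall {
                    call_id,
                    name,
                    arguments: String::new(),
                });
            }
            StreamEvent::ArgumentsDelta(delta) => {
                // Deltas with no active call are orphaned and ignored.
                if let Some(call) = self.in_flight.as_mut() {
                    call.arguments.push_str(&delta);
                }
            }
            StreamEvent::ArgumentsDone(arguments) => {
                if let Some(mut call) = self.in_flight.take() {
                    // The "done" payload is authoritative and replaces the buffer.
                    call.arguments = arguments;
                    self.output.push(call);
                }
            }
        }
    }

    /// Forced finalize, e.g. when the stream disconnects mid-arguments.
    fn finalize(&mut self) {
        if let Some(call) = self.in_flight.take() {
            self.output.push(call);
        }
    }
}
```

Keeping at most one in-flight call mirrors the sequential way the events are described above; parallel tool use then shows up as several completed items in `output`.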
Introduces the tool execution framework for the agentic loop:
- `tools/` module (crate root): McpToolExecutor, WebSearchProvider, VectorStoreClient traits with Pin<Box<dyn Future>> for dyn-compatibility
- `executor/dispatch.rs`: dispatch_tools() inspects output for FunctionCall items, returns LoopDecision (Continue/Done/Incomplete)
- `executor/tool_context.rs`: ToolContext holds provider Arcs, execute_all() runs tool calls in parallel via join_all, individual failures produce error output per call_id (not total failure)

For MVP, all FunctionCall items route to configured providers (MCP first, then web_search, then vector_store). Client-side function routing (checking the request's tools array) is deferred to a follow-up.

10 integration tests with mock executors covering: empty output, single/parallel calls, max iterations, failing tools, no provider, mixed results.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
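To make the MVP routing order and the per-call failure model concrete, here is a simplified, synchronous sketch. It swaps in several assumptions: one hypothetical `ToolProvider` trait replaces the three provider traits, `Box` replaces the provider `Arc`s, calls run sequentially instead of through `join_all`, and errors are plain strings rather than JSON.

```rust
/// One tool call requested by the model (simplified stand-in).
struct FunctionToolCall {
    call_id: String,
    name: String,
    arguments: String,
}

/// One result item fed back to the model, keyed by `call_id`.
#[derive(Debug, PartialEq)]
struct ToolOutput {
    call_id: String,
    output: String,
}

/// Hypothetical single trait standing in for the three provider traits.
trait ToolProvider {
    fn call(&self, name: &str, arguments: &str) -> Result<String, String>;
}

/// Mock provider that always succeeds with a fixed answer.
struct FixedProvider(&'static str);
impl ToolProvider for FixedProvider {
    fn call(&self, _name: &str, _arguments: &str) -> Result<String, String> {
        Ok(self.0.to_string())
    }
}

/// Mock provider that always fails.
struct FailingProvider;
impl ToolProvider for FailingProvider {
    fn call(&self, name: &str, _arguments: &str) -> Result<String, String> {
        Err(format!("{name} is unavailable"))
    }
}

#[derive(Default)]
struct ToolContext {
    mcp: Option<Box<dyn ToolProvider>>,
    web_search: Option<Box<dyn ToolProvider>>,
    vector_store: Option<Box<dyn ToolProvider>>,
}

impl ToolContext {
    /// MVP routing: the first configured provider wins
    /// (MCP, then web search, then vector store).
    fn route(&self) -> Option<&dyn ToolProvider> {
        self.mcp
            .as_deref()
            .or(self.web_search.as_deref())
            .or(self.vector_store.as_deref())
    }

    /// Runs every call; a failure becomes an error string for that call only.
    fn execute_all(&self, calls: &[FunctionToolCall]) -> Vec<ToolOutput> {
        calls
            .iter()
            .map(|call| {
                let output = match self.route() {
                    Some(provider) => provider
                        .call(&call.name, &call.arguments)
                        .unwrap_or_else(|err| format!("error: {err}")),
                    None => "error: no tool provider configured".to_string(),
                };
                ToolOutput { call_id: call.call_id.clone(), output }
            })
            .collect()
    }
}
```

Because every call yields exactly one `ToolOutput`, the model always receives a result for each `call_id` and can decide on the next iteration whether to retry a failed tool.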
- Fix JSON injection: use serde_json::json! for error output (not format!)
- Fix default max_iterations: manual Default impl with 10 (was 0 via derive)
- Fix unwrap_or_default: use expect() for infallible Vec<Value> serialization
- Add concurrency comment to execute_all
- Document MVP routing order in route_call

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
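The `max_iterations` fix in this commit comes down to the difference between a derived and a hand-written `Default`. A tiny sketch, using hypothetical struct names that hold only the one field:

```rust
/// Deriving `Default` gives 0 for `usize`, which would leave no budget
/// for any tool iteration.
#[derive(Debug, Default)]
struct DerivedLimits {
    max_iterations: usize,
}

/// Hypothetical stand-in for the relevant part of `ToolContext`.
#[derive(Debug)]
struct ToolLimits {
    max_iterations: usize,
}

/// Manual impl so the default is a usable value (10, per the commit message).
impl Default for ToolLimits {
    fn default() -> Self {
        Self { max_iterations: 10 }
    }
}
```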
Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Pre-commit hook interprets #![...] at line 1 as a shebang. Adding a comment line before it prevents the false positive. Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Composes execute() + dispatch_tools() in a tool-call loop:
1. Execute request (non-streaming)
2. dispatch_tools inspects output for FunctionCall items
3. If Continue → inject tool results into input, re-enter
4. If Done/Incomplete → persist and return final payload

MVP: non-streaming only. Streaming + tool dispatch requires StreamTee (future PR). Rejects stream=true with clear error.

This completes Phase 2 of the core API design (PR vllm-project#44): traits + dispatch + loop orchestration in one PR.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
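The four steps above can be sketched as a synchronous loop. Everything here is a simplified assumption rather than the PR's code: inference is a caller-supplied closure instead of the async `execute()`, tool execution is faked with a formatted string, persistence is left out, and `Outcome` is a hypothetical result type.

```rust
/// Simplified stand-ins for the crate's output and decision types.
#[derive(Debug, PartialEq)]
enum OutputItem {
    Message(String),
    FunctionCall(String), // tool name only, for brevity
}

enum LoopDecision {
    Done,
    Incomplete(String),
    Continue(Vec<String>),
}

/// Hypothetical summary of a finished loop.
#[derive(Debug, PartialEq)]
struct Outcome {
    status: &'static str,
    output: Vec<OutputItem>,
    llm_calls: usize,
}

/// Step 2: inspect the output for function calls and decide what to do.
fn dispatch_tools(output: &[OutputItem], iteration: usize, max_iterations: usize) -> LoopDecision {
    let names: Vec<&String> = output
        .iter()
        .filter_map(|item| match item {
            OutputItem::FunctionCall(name) => Some(name),
            OutputItem::Message(_) => None,
        })
        .collect();
    if names.is_empty() {
        return LoopDecision::Done;
    }
    if iteration >= max_iterations {
        return LoopDecision::Incomplete("max_iterations reached".to_string());
    }
    // Stand-in for real tool execution.
    LoopDecision::Continue(names.iter().map(|name| format!("result of {name}")).collect())
}

fn execute_loop<F>(mut input: Vec<String>, max_iterations: usize, mut infer: F) -> Outcome
where
    F: FnMut(&[String]) -> Vec<OutputItem>,
{
    let mut iteration = 0;
    loop {
        // Step 1: run inference on the current input.
        let output = infer(&input);
        match dispatch_tools(&output, iteration, max_iterations) {
            // Step 4: return the final payload.
            LoopDecision::Done => {
                return Outcome { status: "completed", output, llm_calls: iteration + 1 };
            }
            LoopDecision::Incomplete(_reason) => {
                return Outcome { status: "incomplete", output, llm_calls: iteration + 1 };
            }
            // Step 3: inject tool results and re-enter inference.
            LoopDecision::Continue(results) => input.extend(results),
        }
        iteration += 1;
    }
}
```

The loop always terminates in this sketch because `dispatch_tools` returns `Incomplete` once `iteration` reaches `max_iterations`.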
13 tests covering the full agentic loop orchestrator:

P0 (streaming rejection):
- Rejects stream=true with clear error, no LLM call made

P1 (core loop paths):
- No tool calls → returns directly after 1 LLM call
- Persistence when store=true
- Single tool call → dispatch → re-enter → text response
- Parallel tool calls → both executed → re-enter
- Mixed message + function_call → only FC dispatched
- Max iterations → stops loop, returns last payload
- Max iterations boundary (1 allows single dispatch)

Failure cases:
- Tool provider fails → error fed to model, loop continues
- No providers configured → error per call_id
- Empty model output → returns Done immediately
- Multi-hop (3 iterations: tool A → tool B → text)
- LLM server error → propagated as Err

Uses existing TestFixture infrastructure (MockServer + SQLite).

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Records a 2-turn tool-call session from google/gemma-4-26B-A4B-it (vLLM v0.21.0) and replays it through execute_loop:
Turn 1: model calls get_weather → FunctionCall output
Turn 2: model receives tool result → text response

Cassette file: tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml

Validates that execute_loop produces the same final text as the real model session (2 iterations, correct tool result injection).

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Round 1 fixes:
- Add MAX_LOOP_GUARD (128): defense-in-depth hard cap
- Remove broken persistence from execute_loop (caller handles it)
- Capture original_previous_response_id, restore on final payload
- Set payload.status = "incomplete" for Incomplete path
- Set request.store = false for intermediate iterations

Round 2 findings:
- TODO: InputItem needs FunctionCall variant to inject assistant's tool-call items into context (type system limitation, follow-up PR)
- Document store=false semantics and performance characteristics

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
- Add tool_timeout (Duration, default 30s) to ToolContext
- Wrap each tool execution in tokio::time::timeout; hung providers produce timeout error string per call_id (not total failure)
- Add test: Items input extended correctly on Continue
- Add test: previous_response_id=None preserved through loop
- Add test: cassette-driven loop from live vLLM (P2)

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Wraps each execute() call in tokio::time::timeout using exec_ctx.streaming_timeout (default 30s). If the LLM hangs, produces a clear error with iteration number. Duration::ZERO disables the timeout (same pattern as streaming_timeout and tool_timeout). Signed-off-by: Ashwin Giridharan <girida@amazon.com>
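The "zero means no timeout" convention can be shown without an async runtime. The sketch below deliberately swaps `tokio::time::timeout` for a standard-library thread plus channel, so it only illustrates the branching and the error message shape; `run_with_timeout` is a hypothetical helper, not a function from the PR.

```rust
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::Duration;

/// Runs `work`, giving up after `timeout`. `Duration::ZERO` disables the
/// timeout and runs the work inline, mirroring the convention above.
fn run_with_timeout<T, F>(timeout: Duration, iteration: usize, work: F) -> Result<T, String>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    if timeout.is_zero() {
        return Ok(work());
    }
    let (tx, rx) = mpsc::channel();
    // The worker is detached; if it finishes late, its send fails and is ignored.
    thread::spawn(move || {
        let _ = tx.send(work());
    });
    match rx.recv_timeout(timeout) {
        Ok(value) => Ok(value),
        Err(RecvTimeoutError::Timeout) => Err(format!(
            "inference timed out after {timeout:?} on iteration {iteration}"
        )),
        // The worker dropped the sender without sending, i.e. it panicked.
        Err(RecvTimeoutError::Disconnected) => {
            Err(format!("inference worker failed on iteration {iteration}"))
        }
    }
}
```

Unlike a cancelled async future, the detached thread in this sketch keeps running after the timeout fires, which is one reason the real code uses the async timeout instead.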
- execute_loop.rs: module-level architecture diagram, contract docs, timeout documentation, known limitations, inline step explanations
- dispatch.rs: module-level decision flow, step-by-step inline comments, error semantics documentation
- tool_context.rs: design opinions, concurrency model, failure model, retry policy, routing explanation with follow-up notes

All doc comments pass clippy::doc_markdown.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
…ails

Round 3 review fixes:
- Move request.store=false BEFORE the loop (was inside Continue arm, too late for first iteration). Prevents persisting partial responses containing only tool-call output without final answer.
- Populate payload.incomplete_details with the reason string when max_iterations is hit. Clients need this per Responses API spec.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
…alloc

- Move types to top-level imports (ExecutorError, InputItem, InputMessage, etc.)
- Mutate request.input in-place instead of clone→extend→reassign each iteration
- Use peekable iterator in dispatch to skip Vec allocation on early-exit paths
- Pass iterator directly to join_all, removing intermediate Vec in execute_all

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
PR vllm-project#56 changed persist logic to fire when ANY of store/previous_response_id/conversation_id is set. Our loop was only clearing store=false, leaving conversation_id as a persistence trigger for intermediate iterations.

Now clear all three before the loop starts and restore both IDs on the returned payload (matching the original request's response chain).

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Add 6 new tests covering the critical invariants:
- previous_response_id=Some restored on Done path
- conversation_id=Some restored on Done path
- store=true suppressed to false in internal LLM calls
- all 3 persistence triggers cleared in multi-hop scenario
- both IDs restored on Incomplete (max_iterations) path
- Items input + conversation_id works with in-place extend

Also tighten doc comment on execute_loop to accurately describe clearing all 3 triggers and restoring both IDs.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Signed-off-by: Ashwin Giridharan <girida@amazon.com>
9f91aa8 to 4c4a28e
|
@maralbahari PR #59 is merged — rebased this on top of it. Ready for re-review. Updated the PR description with the sequencing we discussed. |
@ashwing Thank you for the changes. As discussed in the community meeting today, we should hold off on this PR since the agentic
|
Closing this in favor of #67 |
## Summary

Server-side tool dispatch infrastructure for `agentic-core`. Adds `execute_loop`, an agentic loop orchestrator that executes tool calls and re-enters inference, so the gateway can resolve tools without client round-trips.

This is a library-only PR (Step 1 in the sequencing below). It adds tested, callable functions but does NOT wire them into the server handler yet. The client-side passthrough flow (vLLM returns `function_call` → client executes → client re-submits) continues to work as before via `execute()` directly.

What this enables (once wired):

All within a single request. The client never sees the intermediate tool calls.
## Sequencing

Agreed with @noobHappylife and @maralbahari in the comments below:

- `OutputItem::FunctionCall` from SSE
- `execute_loop` + `dispatch_tools` + `ToolContext` + traits
- `tools` array into `dispatch_tools`, route by `type` field
- `handler.rs` calls `execute_loop` when tools present

Key design point (from @noobHappylife): vLLM only speaks function calling. Other tool types (`mcp`, `web_search`, `file_search`) must be normalized into function-compatible schemas before inference, while keeping metadata to route the output back to the correct executor. That normalization is Step 3.

## What's Added
- `tools/mcp.rs`: `McpToolExecutor` trait
- `tools/web_search.rs`: `WebSearchProvider` trait
- `tools/vector_store.rs`: `VectorStoreClient` trait
- `executor/dispatch.rs`: `dispatch_tools()` → `LoopDecision`
- `executor/tool_context.rs`: `ToolContext` + parallel `execute_all()`
- `executor/execute_loop.rs`

## How It Works
## Safety Guarantees

- `max_iterations` default 10 (via `dispatch_tools`)
- All three persistence triggers (`store`, `previous_response_id`, `conversation_id`) are cleared before the loop starts, which prevents intermediate tool-call-only responses from being written to the store (required since PR #56 persists when ANY of the three is set)
- `previous_response_id` and `conversation_id` are captured before the loop and restored on the final payload (Done and Incomplete paths)

## Test Plan
32 tests covering happy paths, error paths, boundaries, persistence invariants, and a cassette recorded from live vLLM (gemma-4-26B-A4B-it). 3 rounds of adversarial review passed. All workspace tests green.
Full test list (32 tests)
dispatch_tools (10): no calls→Done, single/parallel→Continue, max iterations→Incomplete, failing tool→error output, no provider→error, mixed results, message+FC mixed, iteration boundary
execute_loop (22): streaming rejection, no tools, store=true persistence, single tool loop, parallel tools, mixed output, max iterations (status=incomplete), boundary, tool failure continues, no providers, empty output, multi-hop (3 iterations), LLM error propagated, Items input extended correctly, previous_response_id=None preserved, cassette replay from live vLLM, previous_response_id=Some restored on Done, conversation_id restored on Done, store suppressed in internal iterations, all 3 persistence triggers cleared (multi-hop), Incomplete path restores both IDs, Items input + conversation_id
## MVP Routing (Step 1 limitation)

Currently `dispatch_tools` receives `(output, tool_ctx, iteration)` but NOT the request's `tools` array. This means:

- All `FunctionCall` items are treated as server-executable (routed to first configured provider)
- No distinction is made between `type: "function"` (client-side) and `type: "mcp"` (server-side)
- Requests that rely on client-side functions should not go through `execute_loop` (use `execute()` directly)

Step 2 changes the signature to include the tools array and adds a `LoopDecision::Partial` variant for mixed client+server tool responses.

## Known Limitations
- `InputItem` lacks a FunctionCall variant: the model doesn't see its own tool-call items in re-entry context (type system change needed)
- `request.clone()` per iteration: `execute()` takes ownership; refactoring to `&RequestPayload` is out of scope (touches engine.rs internals)