
feat: add tool dispatch layer — ToolContext, traits, and LoopDecision#51

Closed
ashwing wants to merge 17 commits into vllm-project:main from ashwing:feat/tool-dispatch

Conversation

@ashwing

@ashwing ashwing commented Jun 11, 2026

Collaborator

Summary

Server-side tool dispatch infrastructure for agentic-core. Adds execute_loop — an agentic loop orchestrator that executes tool calls and re-enters inference, so the gateway can resolve tools without client round-trips.

This is a library-only PR (Step 1 in the sequencing below). It adds tested, callable functions but does NOT wire them into the server handler yet. The client-side passthrough flow (vLLM returns function_call → client executes → client re-submits) continues to work as before via execute() directly.

What this enables (once wired):

Client request → LLM responds with FunctionCall → gateway executes tool →
injects result → LLM responds with final answer → return to client

All within a single request. The client never sees the intermediate tool calls.

Sequencing

Agreed with @noobHappylife and @maralbahari in the comments below:

| Step | PR | Status | What |
|------|----|--------|------|
| 0 | #59 | Merged | function_call accumulation: accumulator produces OutputItem::FunctionCall from SSE |
| 1 | #51 (this) | Ready for review | Server-side loop infrastructure: execute_loop + dispatch_tools + ToolContext + traits |
| 2 | TBD | Future | Type-aware dispatch: pass tools array into dispatch_tools, route by type field |
| 3 | TBD | Future | Tool normalization: MCP → function-compatible schemas before vLLM sees them |
| 4 | TBD | Future | Server handler wiring: handler.rs calls execute_loop when tools present |

Key design point (from @noobHappylife): vLLM only speaks function calling. Other tool types (mcp, web_search, file_search) must be normalized into function-compatible schemas before inference, while keeping metadata to route the output back to the correct executor. That normalization is Step 3.

What's Added

| Module | What | Lines |
|--------|------|-------|
| tools/mcp.rs | McpToolExecutor trait | 18 |
| tools/web_search.rs | WebSearchProvider trait | 15 |
| tools/vector_store.rs | VectorStoreClient trait | 17 |
| executor/dispatch.rs | dispatch_tools() → LoopDecision | 101 |
| executor/tool_context.rs | ToolContext + parallel execute_all() | 163 |
| executor/execute_loop.rs | Orchestrator: execute → dispatch → loop | 184 |

How It Works

execute_loop(request, exec_ctx, tool_ctx)
  │
  ├── Capture original IDs (prev_resp_id, conv_id)
  ├── Clear persistence triggers (store=false, IDs=None)
  │
  ├── for each iteration (capped at 128):
  │     │
  │     ├── execute(request) → ResponsePayload  [30s timeout]
  │     │     └── store=false, no IDs → internal execute() skips persist
  │     │
  │     ├── dispatch_tools(output) → LoopDecision
  │     │     ├── Done → restore IDs, return payload to caller
  │     │     ├── Incomplete → set status, restore IDs, return
  │     │     └── Continue(results) → extend input in-place, loop
  │     │
  │     └── tool_ctx.execute_all(calls)  [30s per call, parallel]
  │           └── individual failures → error JSON (not fatal)
  │
  └── Caller owns persistence. Loop never writes to DB.
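
The control flow in the diagram above can be sketched roughly as follows. This is a minimal synchronous illustration, not the PR's code: `Item`, `fake_execute`, `fake_dispatch`, and `run_loop` are hypothetical stand-ins, and the real `execute_loop` is async, applies timeouts, and works on `RequestPayload` / `ResponsePayload`.

```rust
// Simplified stand-ins; the real types live in agentic-core and are async.
#[derive(Debug, Clone, PartialEq)]
enum Item {
    Message(String),
    FunctionCall { name: String },
    ToolResult(String),
}

#[derive(Debug, PartialEq)]
enum LoopDecision {
    Done,
    Incomplete,
    Continue(Vec<Item>),
}

const MAX_LOOP_GUARD: usize = 128; // hard cap named in the PR description

// Hypothetical model: requests a tool until it has seen `hops` tool results.
fn fake_execute(input: &[Item], hops: usize) -> Vec<Item> {
    let seen = input.iter().filter(|i| matches!(i, Item::ToolResult(_))).count();
    if seen < hops {
        vec![Item::FunctionCall { name: format!("tool_{seen}") }]
    } else {
        vec![Item::Message("final answer".to_string())]
    }
}

// Hypothetical dispatch: no calls -> Done, soft cap hit -> Incomplete,
// otherwise "execute" every call and hand the results back.
fn fake_dispatch(output: &[Item], iteration: usize, max_iterations: usize) -> LoopDecision {
    let mut results = Vec::new();
    for item in output {
        if let Item::FunctionCall { name } = item {
            results.push(Item::ToolResult(format!("{name}: ok")));
        }
    }
    if results.is_empty() {
        LoopDecision::Done
    } else if iteration >= max_iterations {
        LoopDecision::Incomplete
    } else {
        LoopDecision::Continue(results)
    }
}

// Returns (final output, status, number of inference calls made).
fn run_loop(
    mut input: Vec<Item>,
    hops: usize,
    max_iterations: usize,
) -> (Vec<Item>, &'static str, usize) {
    let mut last = Vec::new();
    for iteration in 0..MAX_LOOP_GUARD {
        last = fake_execute(&input, hops);
        match fake_dispatch(&last, iteration, max_iterations) {
            LoopDecision::Done => return (last, "completed", iteration + 1),
            LoopDecision::Incomplete => return (last, "incomplete", iteration + 1),
            // Tool results extend the input in place, then inference re-runs.
            LoopDecision::Continue(results) => input.extend(results),
        }
    }
    (last, "incomplete", MAX_LOOP_GUARD)
}

fn main() {
    let (output, status, calls) = run_loop(vec![Item::Message("weather?".into())], 2, 10);
    println!("{status} after {calls} inference calls: {output:?}");
}
```

The sketch keeps both caps from the description: the soft cap is checked inside dispatch, while the hard cap bounds the `for` loop itself.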

Safety Guarantees

  • Hard loop cap: 128 iterations (defense-in-depth)
  • Soft tool cap: max_iterations default 10 (via dispatch_tools)
  • LLM timeout: 30s per inference call
  • Tool timeout: 30s per tool call
  • No partial persistence: All 3 triggers cleared (store, previous_response_id, conversation_id), which prevents intermediate tool-call-only responses from being written to the store. This is required because PR #56 ("fix: persist conversation turns when store=False but a context ID is pass…") persists when ANY of the three is set.
  • ID restoration: Both previous_response_id and conversation_id are captured before the loop and restored on the final payload (Done and Incomplete paths)
  • In-place input growth: Tool results extend the input vec in-place — no quadratic clone-and-reassign
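
The "no partial persistence" and "ID restoration" guarantees above could look roughly like this. It is a sketch under assumptions: `Request`, `Payload`, `SavedIds`, and the three helper functions are illustrative names, not the types or functions in this PR.

```rust
// Hypothetical, trimmed-down request/response shapes.
#[derive(Debug, Clone, Default, PartialEq)]
struct Request {
    store: bool,
    previous_response_id: Option<String>,
    conversation_id: Option<String>,
}

#[derive(Debug, Default, PartialEq)]
struct Payload {
    previous_response_id: Option<String>,
    conversation_id: Option<String>,
}

// IDs captured before the loop so they can be put back on the final payload.
struct SavedIds {
    previous_response_id: Option<String>,
    conversation_id: Option<String>,
}

// Clears all three persistence triggers and returns the original IDs.
fn suppress_persistence(request: &mut Request) -> SavedIds {
    request.store = false;
    SavedIds {
        previous_response_id: request.previous_response_id.take(),
        conversation_id: request.conversation_id.take(),
    }
}

// Mirrors the "persist when ANY trigger is set" rule attributed to PR #56.
fn would_persist(request: &Request) -> bool {
    request.store
        || request.previous_response_id.is_some()
        || request.conversation_id.is_some()
}

// Applied on both the Done and Incomplete paths.
fn restore_ids(payload: &mut Payload, saved: SavedIds) {
    payload.previous_response_id = saved.previous_response_id;
    payload.conversation_id = saved.conversation_id;
}

fn main() {
    let mut request = Request {
        store: true,
        previous_response_id: Some("resp_1".into()),
        conversation_id: Some("conv_1".into()),
    };
    let saved = suppress_persistence(&mut request);
    println!("persist during loop? {}", would_persist(&request));

    let mut payload = Payload::default();
    restore_ids(&mut payload, saved);
    println!("final payload: {payload:?}");
}
```

Clearing only `store` would not be enough under the any-of-three rule, which is why the IDs are taken out of the request rather than left in place.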

Test Plan

32 tests covering happy paths, error paths, boundaries, persistence invariants, and a cassette recorded from live vLLM (gemma-4-26B-A4B-it). 3 rounds of adversarial review passed. All workspace tests green.

Full test list (32 tests)

dispatch_tools (10): no calls→Done, single/parallel→Continue, max iterations→Incomplete, failing tool→error output, no provider→error, mixed results, message+FC mixed, iteration boundary

execute_loop (22): streaming rejection, no tools, store=true persistence, single tool loop, parallel tools, mixed output, max iterations (status=incomplete), boundary, tool failure continues, no providers, empty output, multi-hop (3 iterations), LLM error propagated, Items input extended correctly, previous_response_id=None preserved, cassette replay from live vLLM, previous_response_id=Some restored on Done, conversation_id restored on Done, store suppressed in internal iterations, all 3 persistence triggers cleared (multi-hop), Incomplete path restores both IDs, Items input + conversation_id

MVP Routing (Step 1 limitation)

Currently dispatch_tools receives (output, tool_ctx, iteration) but NOT the request's tools array. This means:

  • ALL FunctionCall items are treated as server-executable (routed to first configured provider)
  • No distinction between type: "function" (client-side) and type: "mcp" (server-side)
  • Client-side passthrough still works — just don't call execute_loop (use execute() directly)

Step 2 changes the signature to include the tools array and adds a LoopDecision::Partial variant for mixed client+server tool responses.

Known Limitations

  • Non-streaming only — StreamTee PR needed for streaming + tools
  • InputItem lacks FunctionCall variant — model doesn't see its own tool-call items in re-entry context (type system change needed)
  • request.clone() per iteration: execute() takes ownership; refactoring to &RequestPayload is out of scope (touches engine.rs internals)
  • MVP routing — priority-order (MCP → web_search → vector_store), not by tool type from request
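
As an illustration of the priority-order routing described above, here is a small sketch. `Providers` and `Route` are hypothetical; the actual `ToolContext` holds provider handles rather than flags, and this signature for `route_call` is assumed for the example.

```rust
// Stand-in for ToolContext: each flag means "this provider is configured".
#[derive(Debug, Default, Clone, Copy)]
struct Providers {
    mcp: bool,
    web_search: bool,
    vector_store: bool,
}

#[derive(Debug, PartialEq)]
enum Route {
    Mcp,
    WebSearch,
    VectorStore,
    // In the PR this case yields an error output for the call, not a failure.
    NoProvider,
}

// MVP routing: the first configured provider wins, regardless of which tool
// the model actually named.
fn route_call(providers: Providers) -> Route {
    if providers.mcp {
        Route::Mcp
    } else if providers.web_search {
        Route::WebSearch
    } else if providers.vector_store {
        Route::VectorStore
    } else {
        Route::NoProvider
    }
}

fn main() {
    let providers = Providers { web_search: true, vector_store: true, ..Default::default() };
    println!("{:?}", route_call(providers));
}
```

Because the tool's declared type never enters the decision, a client-side `function` call would be routed the same way as an MCP call, which is the limitation Step 2 is meant to remove.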

@ashwing ashwing marked this pull request as ready for review June 11, 2026 23:00
@ashwing ashwing marked this pull request as draft June 12, 2026 20:15
@ashwing ashwing marked this pull request as ready for review June 12, 2026 22:40
# This cassette drives execute_loop through a complete 2-iteration tool loop:
# Iteration 0: model returns FunctionCall (get_weather)
# Iteration 1: model returns Message (final answer using tool result)

Collaborator

@ashwing is it possible to generate cassettes by serving OpenAI and recording the request/response pairs, like https://github.com/vllm-project/agentic-api/blob/main/crates/agentic-core/tests/cassettes/text_only/conversation/conv-two-turn-gpt-4o-streaming.yaml
but with function calls such as get_weather instead of just text?
The recording script can be found here: https://github.com/vllm-project/agentic-api/blob/main/crates/agentic-core/tests/cassettes/record_cassette.py

Collaborator Author

Good call — yeah we can do that. I'll add a cassette recording mode in a follow-up PR that proxies to a real vLLM instance and dumps the request/response pairs. For this one I hand-crafted from an actual gemma4 session to keep the test deterministic without needing a live server.

let mut payload = match result {
Either::Left(payload) => payload,
Either::Right(_stream) => {
return Err(crate::executor::ExecutorError::InvalidRequest(

Collaborator

Why not import it at the top of the file with use crate::executor::{ExecutorError}?

Collaborator Author

Done — moved all crate imports to the top. Pushed.

ResponsesInput::Items(existing) => existing.clone(),
// First iteration with plain text — convert to structured item.
ResponsesInput::Text(t) => {
vec![crate::types::io::InputItem::Message(crate::types::io::InputMessage {

@maralbahari maralbahari Jun 16, 2026

Collaborator

crate::types::io::InputItem::Message(crate::types::io::InputMessage) makes the line very long. It could be shortened by importing the types at the top of the file.

Collaborator Author

Done — imported InputItem, InputMessage, InputMessageContent, and ResponsesInput at the top. Much cleaner now.

// (server handler) owns final persistence with the correct RequestContext.
// Without this, the first iteration would persist a partial response
// (containing only the tool-call output, not the final answer) to the DB.
request.store = false;

Collaborator

Why is this hardcoded to false? I see the comment:

// Without this, the first iteration would persist a partial response
 // (containing only the tool-call output, not the final answer) to the DB.

Could you clarify what you mean by a partial response?

The contract is as follows:
If the store value in the incoming request payload is false, we pass the request to the vLLM proxy for stateless inference.
If it is set to true, we invoke execute_loop, which should not set it to false. From the second turn onwards, if store=False but a context ID is given (either conversation_id or previous_response_id is present), we still need to persist. If no context ID is provided, we proxy to the vLLM server statelessly.

Collaborator Author

The loop suppresses persistence for intermediate iterations — otherwise each internal execute() call would write a partial response (just tool-call output, not the final answer) to the store. We clear all three persistence triggers (store, previous_response_id, conversation_id) because PR #56 persists when any of them is set. The caller (server handler) does the final persist once with the complete response.

Updated the doc comment to make this clearer.

Collaborator Author

Happy to discuss further, esp. the requirement to persist intermediate tool call/results and the layer responsible to persist.

// existing context + tool results appended.
let mut items = match &request.input {
// Already structured items — clone and extend.
ResponsesInput::Items(existing) => existing.clone(),

Collaborator

When existing is cloned, the clone is not guaranteed to carry any spare capacity (it is typically sized to the current length), so extend will likely trigger a reallocation as soon as tool_results are appended. Can pre-size:

let mut items = Vec::with_capacity(existing.len() + tool_results.len());

Collaborator Author

Done — switched to in-place mutation with items.reserve(tool_results.len()) + items.extend(tool_results). No clone at all now, just grows the existing vec.

return Ok(payload);
}
// Tools were executed — inject results and re-enter inference.
LoopDecision::Continue(tool_results) => {

Collaborator

Each iteration: clone → extend → reassign. On iteration 2, you clone the vec that already contains iteration 1's results. On iteration 3, you clone a vec with iterations 1+2's results. The allocation grows linearly and is thrown away each time.

The in-place alternative:

LoopDecision::Continue(tool_results) => {
    // Convert Text → Items on first tool call, then just extend in-place
    if let ResponsesInput::Text(t) = &request.input {
        request.input = ResponsesInput::Items(vec![InputItem::Message(...)]);
    }
    if let ResponsesInput::Items(ref mut v) = request.input {
        v.extend(tool_results);   // ← no clone, mutates the existing Vec
    }
}

Collaborator Author

Fixed — now mutates request.input in-place. Text gets converted to Items once on the first tool call, then subsequent iterations just reserve+extend on the same vec. No cloning.
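
For readers following the thread, the in-place growth described here might look like the sketch below. The enum shapes are simplified assumptions (the real `InputItem` and `ResponsesInput` carry richer variants), and `append_tool_results` is a hypothetical helper name.

```rust
// Simplified stand-ins for the crate's input types.
#[derive(Debug, Clone, PartialEq)]
enum InputItem {
    Message(String),
    ToolOutput(String),
}

#[derive(Debug, PartialEq)]
enum ResponsesInput {
    Text(String),
    Items(Vec<InputItem>),
}

// Appends tool results to the request input without cloning the existing items.
fn append_tool_results(input: &mut ResponsesInput, tool_results: Vec<InputItem>) {
    // Convert Text -> Items once, on the first tool call.
    if let ResponsesInput::Text(text) = &mut *input {
        let message = InputItem::Message(std::mem::take(text));
        *input = ResponsesInput::Items(vec![message]);
    }
    // From here on the input is always Items: reserve, then extend in place.
    if let ResponsesInput::Items(items) = &mut *input {
        items.reserve(tool_results.len());
        items.extend(tool_results);
    }
}

fn main() {
    let mut input = ResponsesInput::Text("What's the weather?".to_string());
    append_tool_results(&mut input, vec![InputItem::ToolOutput("sunny".to_string())]);
    append_tool_results(&mut input, vec![InputItem::ToolOutput("22C".to_string())]);
    println!("{input:?}");
}
```

`std::mem::take` moves the original text into the new message, so the conversion itself also avoids copying the string.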

// Duration::ZERO = no timeout (provider/reqwest manages its own).
let inference_timeout = exec_ctx.streaming_timeout;
let result = if inference_timeout.is_zero() {
execute(request.clone(), Arc::clone(&exec_ctx)).await?

Collaborator

To avoid the frequent request.clone(), let's change the execute() signature to borrow the RequestPayload: execute(request: &RequestPayload,...).
This would also require changing rehydrate_conversation(request: &RequestPayload, ...).

Collaborator Author

Agree this is the right long-term fix. The request.clone() is still there because execute() takes ownership today (it passes the payload into rehydrate_conversation which moves it). Changing that signature touches engine.rs internals (rehydrate, inject_ids, persist) which are outside our PR scope.

I'll open a follow-up to refactor execute() to borrow — it'll be a clean diff once this lands. For now the clone cost is bounded by the input size which doesn't grow per-iteration anymore (tool results are just appended, not cloned on each loop).

/// serves as a higher-level retry — the model can re-call a failed tool on
/// the next iteration if it chooses to.
pub async fn execute_all(&self, calls: &[&FunctionToolCall]) -> Vec<InputItem> {
let futures: Vec<_> = calls.iter().map(|call| self.execute_one(call)).collect();

@maralbahari maralbahari Jun 16, 2026

Collaborator

Great choice using futures. To save memory and an allocation, we can pass the iterator directly:

futures::future::join_all(calls.iter().map(|call| self.execute_one(call))).await

Since join_all accepts any iterator of futures, the intermediate step of collecting into a vector is not needed.

Collaborator Author

Yep simplified — just passes the iterator directly into join_all now. No intermediate collect.

iteration: usize,
) -> ExecutorResult<LoopDecision> {
// Step 1: Extract FunctionCall items. Messages and Unknown are ignored.
let function_calls: Vec<_> = output

Collaborator

The Vec is allocated before the empty check, so the allocation is wasted on the early-exit paths.

We could optimize this with:

let mut function_calls = output.iter().filter_map(|item| match item {
    OutputItem::FunctionCall(fc) => Some(fc),
    _ => None,
}).peekable();

if function_calls.peek().is_none() {
    return Ok(LoopDecision::Done);
}
if iteration >= tool_ctx.max_iterations {
    return Ok(LoopDecision::Incomplete(...));
}

let calls: Vec<_> = function_calls.collect();  // only alloc when we'll actually use it
tool_ctx.execute_all(&calls).await

This skips the Vec allocation on the two early-exit paths entirely.

Collaborator Author

Good catch — switched to a .peekable() iterator. peek() checks for emptiness without allocating, and we only collect into a Vec when we know we'll actually execute. Pushed.

@noobHappylife

noobHappylife commented Jun 17, 2026

Collaborator

Hi @ashwing,

Thank you for this PR. I'm wondering, would it be easier to split the function call into another PR? Maybe this PR can focus on the server-side execution loop, and we handle the basic function call support separately.

For function call support, I think the flow should be:

  • client sends request with tools
  • agentic-api passes the tools to upstream vLLM
  • upstream vLLM returns the tool_calls signature
  • agentic-api returns those tool_calls back to the client
  • client executes the tools and sends back the tool results
  • agentic-api passes the tool results back to upstream vLLM
  • upstream returns the final response

Then we can record cassettes to test the complete function call cycle:

client sends request with tool(s) → agentic-api returns tool_call(s) → client executes and sends tool_call_result(s) → agentic-api returns final result.

Once this is properly set up, the server-side execution loop should be mostly the same, except agentic-api becomes the executor for the tool_call(s) instead of the client.

Additionally, I think there might be some semantic confusion around how LLM function calling works.

For most OSS models, especially when served through vLLM, the model-facing tool interface is basically function calling (type: "function"). So for other tool types like mcp, web_search, etc., I don't think we should pass them directly to the model/template unless the backend explicitly supports it. agentic-api should internally normalize or lower those tools into function-compatible schemas so the LLM can understand them, while keeping the metadata needed to route the tool call back to the correct executor.

For example:

  • type: "function" can be passed through or lightly normalized
  • type: "mcp" should probably be resolved into MCP server tools, then exposed to the model as function-compatible tools
  • hosted tools like web_search should either be implemented by agentic-api, proxied, or rejected as unsupported (if we haven't implemented it)
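
One way to picture the lowering described in this list is the sketch below. Everything in it is hypothetical: `ToolSpec`, `Executor`, and `normalize_tools` are illustrative names, the MCP tool names are supplied directly instead of being listed from a server, and `web_search` is simply rejected as unimplemented.

```rust
use std::collections::HashMap;

// Tool declarations as they might arrive on the request (simplified).
#[derive(Debug, Clone, PartialEq)]
enum ToolSpec {
    Function { name: String },
    Mcp { server_label: String, tool_names: Vec<String> },
    WebSearch,
}

// Who should run a function call once the model emits it.
#[derive(Debug, Clone, PartialEq)]
enum Executor {
    Client,
    McpServer(String),
}

// (function names exposed to the model, routing metadata keyed by name)
type Normalized = (Vec<String>, HashMap<String, Executor>);

fn normalize_tools(tools: &[ToolSpec]) -> Result<Normalized, String> {
    let mut model_facing = Vec::new();
    let mut routes = HashMap::new();
    for tool in tools {
        match tool {
            // Plain functions pass through and stay client-executed.
            ToolSpec::Function { name } => {
                model_facing.push(name.clone());
                routes.insert(name.clone(), Executor::Client);
            }
            // Each MCP tool is exposed as a function; the route remembers its server.
            ToolSpec::Mcp { server_label, tool_names } => {
                for tool_name in tool_names {
                    model_facing.push(tool_name.clone());
                    routes.insert(tool_name.clone(), Executor::McpServer(server_label.clone()));
                }
            }
            // Hosted tools without an implementation are rejected up front.
            ToolSpec::WebSearch => return Err("web_search is not supported".to_string()),
        }
    }
    Ok((model_facing, routes))
}

fn main() {
    let tools = vec![
        ToolSpec::Function { name: "get_weather".to_string() },
        ToolSpec::Mcp {
            server_label: "docs".to_string(),
            tool_names: vec!["search_docs".to_string()],
        },
    ];
    println!("{:?}", normalize_tools(&tools));
}
```

The model only ever sees function names, while the routing table keeps the metadata needed to send each call back to the right executor.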

I think it might be good to first align on the function call loop and semantics, then build the server-side execution on top of that. This should make the implementation easier to test and reason about.

@ashwing

ashwing commented Jun 17, 2026

Collaborator Author

Hey @noobHappylife, thanks for the detailed breakdown — really helpful to see the full picture laid out like this.

I think we're actually in agreement on the architecture. Let me map your flow to what's here:

Client-side passthrough already works today. When a request hits execute() in engine.rs without an execute_loop caller, vLLM returns function_call output items and they pass straight through in the ResponsePayload to the client. The client executes, sends results back as function_call_output input items in the next request, and the cycle continues. That path doesn't need execute_loop at all — it's just the stateless proxy doing its job.

What this PR adds is the server-side layer on top. When agentic-api itself needs to execute tools (MCP servers, web_search, code_interpreter), execute_loop orchestrates that. The key is: dispatch_tools inspects the output and decides whether to execute server-side or stop. If there are no server-executable tool calls → LoopDecision::Done → the response goes back to the client as-is (same as your passthrough flow).

So the two flows are:

  • No execute_loop (or ToolContext with no providers): pure passthrough, client does everything
  • With execute_loop + configured providers: gateway executes what it can, returns the rest to client

On type-aware routing — fully agree. The dispatch code has an explicit MVP note (line 53-56) saying "the distinction between client-side functions and gateway-executed tools requires access to the request's tools array — deferred to a follow-up." That's exactly what you're describing. The follow-up would:

  1. Pass the tools array into dispatch_tools
  2. Classify each FunctionCall by its original tool type
  3. Gateway types (web_search, code_interpreter, mcp) → execute server-side → Continue
  4. Client types (function, namespace, tool_search, custom) → don't execute → Done with status: requires_action

On normalization before vLLM — also agree. MCP tools should be resolved into function-compatible schemas before the model sees them. vLLM only speaks function calling. That normalization layer sits upstream of execute_loop (during request enrichment) and is independent of this PR's dispatch mechanics.

I think the sequencing works as:

  1. This PR — server-side loop infrastructure (execute_loop + dispatch + ToolContext)
  2. Next — type-aware dispatch (classify by tool type, split client vs gateway)
  3. Then — tool normalization (MCP → function schema, namespace → flatten)

Each layer builds on the previous. Does this framing make sense to you? Happy to discuss further — especially the normalization step since you've clearly thought through the Codex integration side of this.

@maralbahari

Collaborator

I think the sequencing works as:

  1. This PR — server-side loop infrastructure (execute_loop + dispatch + ToolContext)
  2. Next — type-aware dispatch (classify by tool type, split client vs gateway)
  3. Then — tool normalization (MCP → function schema, namespace → flatten)

Each layer builds on the previous. Does this framing make sense to you? Happy to discuss further — especially the normalization step since you've clearly thought through the Codex integration side of this.

@ashwing I think we could have one PR to add built-in function_calls first.
Now that we have #57, which handles reasoning output, we need to support the other output types that result from a function_call, like:

 input prompt : "how is the weather in France "             
    "type": "function",
    "name": "get_weather"

or

  input prompt: "what is the answer to 2+2"
  "type":"function",
  name:"calculator"

where model's output would contain a json like

    "output": [
        {
            "arguments": "",
            "call_id": "chatcmpl-tool-aaff0fbfaabeb6e2",
            "name": "function_name ",
            "type": "function_call",
            "id": "fc_94af73a42c3d05c0",
            "namespace": null,
            "status": "completed"
        }
    ],

After we have the PR supporting function_call and the reasoning in #57, we can go with

  1. This PR — server-side loop infrastructure (execute_loop + dispatch + ToolContext)
  2. Next — type-aware dispatch (classify by tool type, split client vs gateway)
  3. Then — tool normalization (MCP → function schema, namespace → flatten)

I feel that for the server-side loop we should at least add the function_call support described above; it would make the loop more solid, since we would at least be supporting the built-in calls.
Let me know your thoughts, thanks.

@ashwing

ashwing commented Jun 17, 2026

Collaborator Author

@maralbahari Good point — you're right that the accumulator doesn't actually produce OutputItem::FunctionCall from the SSE stream today. The normalizer classifies function_call_arguments.delta/done events and extracts the payload, but the accumulator's process_event falls through to the wildcard for those event types. So in streaming mode, function calls would never appear in payload.output for dispatch to find.

I can add function_call accumulation to this PR — it follows the same pattern as PR #57's reasoning items:

  • Track current_function_call: Option<FunctionToolCall> + accumulated_arguments: String on the accumulator
  • OutputItemAdded with item_type == "function_call" → start a new in-flight function call (using name/call_id from the event)
  • FunctionCallArgumentsDelta → append to accumulated_arguments
  • FunctionCallArgumentsDone → finalize with full arguments string, push OutputItem::FunctionCall to output
  • finalize_current_function_call() for the partial-disconnect case

That way the loop has real function_call items to dispatch on in both blocking and streaming paths. Should be ~50 lines following the existing accumulation pattern.
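
The accumulation steps listed above could be sketched roughly as follows. This is an illustration only: the `Event` enum and `Accumulator` struct are simplified assumptions, not the crate's `ResponseAccumulator` or its event types, and only the field and method names quoted in the list are taken from the comment.

```rust
#[derive(Debug, Clone, PartialEq)]
struct FunctionToolCall {
    call_id: String,
    name: String,
    arguments: String,
}

// Simplified stand-ins for the normalized SSE events.
enum Event {
    OutputItemAdded { item_type: String, call_id: String, name: String },
    FunctionCallArgumentsDelta(String),
    FunctionCallArgumentsDone(String),
}

#[derive(Default)]
struct Accumulator {
    current_function_call: Option<FunctionToolCall>,
    accumulated_arguments: String,
    output: Vec<FunctionToolCall>,
}

impl Accumulator {
    fn process_event(&mut self, event: Event) {
        match event {
            Event::OutputItemAdded { item_type, call_id, name } if item_type == "function_call" => {
                // Close any call still in flight, drop stale deltas, start a new call.
                self.finalize_current_function_call();
                self.accumulated_arguments.clear();
                self.current_function_call =
                    Some(FunctionToolCall { call_id, name, arguments: String::new() });
            }
            Event::OutputItemAdded { .. } => {} // other item types are out of scope here
            Event::FunctionCallArgumentsDelta(delta) => {
                if self.current_function_call.is_some() {
                    self.accumulated_arguments.push_str(&delta);
                }
            }
            Event::FunctionCallArgumentsDone(arguments) => {
                // The done event carries the full arguments string.
                self.accumulated_arguments = arguments;
                self.finalize_current_function_call();
            }
        }
    }

    // Also usable on disconnect: keeps whatever arguments were received so far.
    fn finalize_current_function_call(&mut self) {
        if let Some(mut call) = self.current_function_call.take() {
            call.arguments = std::mem::take(&mut self.accumulated_arguments);
            self.output.push(call);
        }
    }
}

fn main() {
    let mut acc = Accumulator::default();
    let events = vec![
        Event::OutputItemAdded {
            item_type: "function_call".into(),
            call_id: "call_abc".into(),
            name: "get_weather".into(),
        },
        Event::FunctionCallArgumentsDelta("{\"location\":".into()),
        Event::FunctionCallArgumentsDelta("\"Paris\"}".into()),
        Event::FunctionCallArgumentsDone("{\"location\":\"Paris\"}".into()),
    ];
    for event in events {
        acc.process_event(event);
    }
    println!("{:?}", acc.output);
}
```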

Does that work for you, or would you prefer it as a separate PR that lands first?

@maralbahari

Collaborator

@maralbahari Good point — you're right that the accumulator doesn't actually produce OutputItem::FunctionCall from the SSE stream today. The normalizer classifies function_call_arguments.delta/done events and extracts the payload, but the accumulator's process_event falls through to the wildcard for those event types. So in streaming mode, function calls would never appear in payload.output for dispatch to find.

I can add function_call accumulation to this PR — it follows the same pattern as PR #57's reasoning items:

  • Track current_function_call: Option<FunctionToolCall> + accumulated_arguments: String on the accumulator
  • OutputItemAdded with item_type == "function_call" → start a new in-flight function call (using name/call_id from the event)
  • FunctionCallArgumentsDelta → append to accumulated_arguments
  • FunctionCallArgumentsDone → finalize with full arguments string, push OutputItem::FunctionCall to output
  • finalize_current_function_call() for the partial-disconnect case

That way the loop has real function_call items to dispatch on in both blocking and streaming paths. Should be ~50 lines following the existing accumulation pattern.

Does that work for you, or would you prefer it as a separate PR that lands first?

@ashwing thanks, I think a separate PR would be easier to review and to land first.

@ashwing

ashwing commented Jun 17, 2026

Collaborator Author

@maralbahari Opened #59 with the function_call accumulation as a separate PR per your suggestion. Once that lands, this PR's execute_loop will have FunctionCall items available in the streaming path for dispatch.

franciscojavierarceo pushed a commit that referenced this pull request Jun 17, 2026
## Summary

The streaming path's `ResponseAccumulator` handles text message events
but didn't yet have handlers for `function_call` events — they fell
through the wildcard arm. This adds the missing accumulation so
streaming responses include `FunctionCall` output items, matching the
blocking JSON path.

Prerequisite for #51 (`execute_loop` needs `FunctionCall` items
available in the streaming path to dispatch tools).

### Before

A client sends a streaming request with tools:
```json
{"model": "meta-llama/...", "stream": true, "input": "What's the weather?", "tools": [{"type": "function", "name": "get_weather", ...}]}
```

vLLM responds with `function_call` SSE events, but the accumulator
ignores them. The client gets back:
```json
{"status": "completed", "output": []}
```

### After

The same request now produces:
```json
{
  "status": "completed",
  "output": [{
    "type": "function_call",
    "id": "fc_1",
    "call_id": "call_abc",
    "name": "get_weather",
    "arguments": "{\"location\":\"Paris\"}",
    "status": "completed"
  }]
}
```

### What's handled

- `OutputItemAdded` with `item_type == "function_call"` — starts a new
in-flight `FunctionToolCall`
- `FunctionCallArgumentsDelta` — appends to argument buffer
- `FunctionCallArgumentsDone` — finalizes with authoritative arguments,
pushes to output
- Multiple function calls in one response (parallel tool use)
- Interleaved message → function_call → message ordering preserved
- Stream disconnect mid-arguments — partial args retained via forced
finalize
- Orphaned deltas (no active function call) — safely cleared before next
call
- Coexists with reasoning accumulation (PR #57) — unified `match` with
three-way type dispatch

## Test Plan

**Unit tests** (8 in `accumulator.rs`):
- Full lifecycle, parallel tool use, interleaved with messages, orphaned
deltas, forced finalize on disconnect

**Cassette integration tests** (12 in `accumulator_cassette_test.rs`):
- `tool_choice=auto` streaming + non-streaming (parallel tool calls)
- `tool_choice=required` streaming + non-streaming
- `tool_choice=named` streaming + non-streaming
- `tool_choice=none` streaming + non-streaming (zero function calls)
- Reasoning streaming — Qwen3 (reasoning + message)
- Reasoning streaming — GPT-oss (reasoning only, see note below)
- Legacy Gemma4 function_call + text-only regression guard

Non-streaming tests exercise the `from_json` path against the same
cassettes.

All tests pass, `cargo fmt --check` + `cargo clippy -- -D warnings`
clean.

### Note: GPT-oss streaming gap

While testing the reasoning cassettes, I found that GPT-oss doesn't emit
`response.output_item.added` for the message item after reasoning — it
jumps from `reasoning_text.done` → `output_text.done` →
`output_item.done`. The streaming accumulator can't capture that message
because no `output_item.added` creates the in-flight state. Qwen3 emits
the event correctly. Not blocking — the full output is in the
`response.completed` payload. Filed as #62 for follow-up.

---------

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
ashwing added 3 commits June 17, 2026 14:39
Introduces the tool execution framework for the agentic loop:

- `tools/` module (crate root): McpToolExecutor, WebSearchProvider,
  VectorStoreClient traits with Pin<Box<dyn Future>> for dyn-compatibility
- `executor/dispatch.rs`: dispatch_tools() inspects output for FunctionCall
  items, returns LoopDecision (Continue/Done/Incomplete)
- `executor/tool_context.rs`: ToolContext holds provider Arcs, execute_all()
  runs tool calls in parallel via join_all, individual failures produce error
  output per call_id (not total failure)

For MVP, all FunctionCall items route to configured providers (MCP first,
then web_search, then vector_store). Client-side function routing (checking
the request's tools array) is deferred to a follow-up.

10 integration tests with mock executors covering: empty output, single/
parallel calls, max iterations, failing tools, no provider, mixed results.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
- Fix JSON injection: use serde_json::json! for error output (not format!)
- Fix default max_iterations: manual Default impl with 10 (was 0 via derive)
- Fix unwrap_or_default: use expect() for infallible Vec<Value> serialization
- Add concurrency comment to execute_all
- Document MVP routing order in route_call
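
The `max_iterations` default fix listed above can be illustrated with a small sketch. `DerivedConfig` and `ToolConfig` are hypothetical stand-ins for the relevant part of `ToolContext`; only the values 0 and 10 come from the commit message.

```rust
// With #[derive(Default)], a usize field defaults to 0, which would make
// the soft iteration cap stop the loop before any tool runs.
#[derive(Debug, Default)]
struct DerivedConfig {
    max_iterations: usize,
}

// A manual impl lets the default carry the intended cap instead.
#[derive(Debug)]
struct ToolConfig {
    max_iterations: usize,
}

impl Default for ToolConfig {
    fn default() -> Self {
        Self { max_iterations: 10 }
    }
}

fn main() {
    println!("derived: {}", DerivedConfig::default().max_iterations);
    println!("manual:  {}", ToolConfig::default().max_iterations);
}
```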

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Signed-off-by: Ashwin Giridharan <girida@amazon.com>
ashwing added 14 commits June 17, 2026 14:39
Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Pre-commit hook interprets #![...] at line 1 as a shebang.
Adding a comment line before it prevents the false positive.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Composes execute() + dispatch_tools() in a tool-call loop:
1. Execute request (non-streaming)
2. dispatch_tools inspects output for FunctionCall items
3. If Continue → inject tool results into input, re-enter
4. If Done/Incomplete → persist and return final payload

MVP: non-streaming only. Streaming + tool dispatch requires
StreamTee (future PR). Rejects stream=true with clear error.
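
The four steps above can be sketched as follows. This is a simplified, synchronous model under stated assumptions: the real execute() is async and works on request/payload types, while here the LLM and the tool runner are plain closures, inputs are strings, and the shape of `LoopDecision::Continue` is a guess.

```rust
#[derive(Debug, Clone, PartialEq)]
enum OutputItem {
    Message(String),
    FunctionCall { call_id: String, name: String, arguments: String },
}

#[derive(Debug, PartialEq)]
enum LoopDecision {
    /// Tool results to inject into the next request's input.
    Continue(Vec<String>),
    Done,
    /// Reason the loop stopped early.
    Incomplete(String),
}

/// Step 2: inspect the output for FunctionCall items and decide.
fn dispatch_tools(
    output: &[OutputItem],
    iteration: usize,
    max_iterations: usize,
    run_tool: &dyn Fn(&str, &str) -> String,
) -> LoopDecision {
    let calls: Vec<_> = output
        .iter()
        .filter_map(|item| match item {
            OutputItem::FunctionCall { call_id, name, arguments } => {
                Some((call_id, name, arguments))
            }
            OutputItem::Message(_) => None,
        })
        .collect();
    if calls.is_empty() {
        return LoopDecision::Done;
    }
    if iteration >= max_iterations {
        return LoopDecision::Incomplete(format!("max_iterations ({max_iterations}) reached"));
    }
    let results = calls
        .into_iter()
        .map(|(id, name, args)| format!("{id}={}", run_tool(name.as_str(), args.as_str())))
        .collect();
    LoopDecision::Continue(results)
}

/// Steps 1, 3 and 4: call the LLM, re-enter on Continue, return otherwise.
fn execute_loop(
    stream: bool,
    mut input: Vec<String>,
    max_iterations: usize,
    llm: &mut dyn FnMut(&[String]) -> Result<Vec<OutputItem>, String>,
    run_tool: &dyn Fn(&str, &str) -> String,
) -> Result<(Vec<OutputItem>, &'static str), String> {
    if stream {
        // MVP: reject before any LLM call is made.
        return Err("execute_loop does not support stream=true".to_string());
    }
    let mut iteration = 0;
    loop {
        let output = llm(&input)?;
        match dispatch_tools(&output, iteration, max_iterations, run_tool) {
            LoopDecision::Continue(results) => {
                input.extend(results);
                iteration += 1;
            }
            LoopDecision::Done => return Ok((output, "completed")),
            LoopDecision::Incomplete(_) => return Ok((output, "incomplete")),
        }
    }
}
```

In this model, max_iterations = 1 allows exactly one dispatch: the second LLM response is returned as incomplete if it still contains a function call.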

This completes Phase 2 of the core API design (PR vllm-project#44):
traits + dispatch + loop orchestration in one PR.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
13 tests covering the full agentic loop orchestrator:

P0 — streaming rejection:
- Rejects stream=true with clear error, no LLM call made

P1 — core loop paths:
- No tool calls → returns directly after 1 LLM call
- Persistence when store=true
- Single tool call → dispatch → re-enter → text response
- Parallel tool calls → both executed → re-enter
- Mixed message + function_call → only FC dispatched
- Max iterations → stops loop, returns last payload
- Max iterations boundary (1 allows single dispatch)

Failure cases:
- Tool provider fails → error fed to model, loop continues
- No providers configured → error per call_id
- Empty model output → returns Done immediately
- Multi-hop (3 iterations: tool A → tool B → text)
- LLM server error → propagated as Err

Uses existing TestFixture infrastructure (MockServer + SQLite).

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Records a 2-turn tool-call session from google/gemma-4-26B-A4B-it
(vLLM v0.21.0) and replays it through execute_loop:
  Turn 1: model calls get_weather → FunctionCall output
  Turn 2: model receives tool result → text response

Cassette file: tests/cassettes/tool_loop/function-call-loop-vllm-gemma4.yaml

Validates that execute_loop produces the same final text as the
real model session (2 iterations, correct tool result injection).

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Round 1 fixes:
- Add MAX_LOOP_GUARD (128) — defense-in-depth hard cap
- Remove broken persistence from execute_loop (caller handles it)
- Capture original_previous_response_id, restore on final payload
- Set payload.status = "incomplete" for Incomplete path
- Set request.store = false for intermediate iterations
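
One way the hard cap could be combined with the configured limit is a simple clamp. Only the constant name and the value 128 come from the list above; the helper `effective_max_iterations` and the clamping strategy are assumptions.

```rust
/// Defense-in-depth hard cap on loop iterations (value from the commit message).
const MAX_LOOP_GUARD: usize = 128;

/// Hypothetical helper: never iterate more than the guard allows,
/// whatever the caller configured.
fn effective_max_iterations(configured: usize) -> usize {
    configured.min(MAX_LOOP_GUARD)
}
```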

Round 2 findings:
- TODO: InputItem needs FunctionCall variant to inject assistant's
  tool-call items into context (type system limitation, follow-up PR)
- Document store=false semantics and performance characteristics

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
- Add tool_timeout (Duration, default 30s) to ToolContext
- Wrap each tool execution in tokio::time::timeout — hung providers
  produce timeout error string per call_id (not total failure)
- Add test: Items input extended correctly on Continue
- Add test: previous_response_id=None preserved through loop
- Add test: cassette-driven loop from live vLLM (P2)

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Wraps each execute() call in tokio::time::timeout using
exec_ctx.streaming_timeout (default 30s). If the LLM hangs,
produces a clear error with iteration number.

Duration::ZERO disables the timeout (same pattern as streaming_timeout
and tool_timeout).
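
The timeout behavior described in this commit and the previous one can be approximated with the standard library alone. The sketch below swaps tokio::time::timeout for a worker thread plus `recv_timeout`, so it is an analog of the technique and not the PR's code; the helper names and error strings are hypothetical.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Duration::ZERO means "no timeout", mirroring the convention above.
fn effective_timeout(configured: Duration) -> Option<Duration> {
    if configured.is_zero() {
        None
    } else {
        Some(configured)
    }
}

/// Run `work` on a worker thread. A hung call yields a timeout error
/// string for its own call_id and does not fail the whole batch.
fn run_with_timeout<F>(call_id: &str, limit: Duration, work: F) -> String
where
    F: FnOnce() -> String + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // The receiver may already be gone after a timeout; ignore that.
        let _ = tx.send(work());
    });
    match effective_timeout(limit) {
        None => rx
            .recv()
            .unwrap_or_else(|_| format!("tool call {call_id} failed")),
        Some(d) => match rx.recv_timeout(d) {
            Ok(output) => output,
            Err(_) => format!("tool call {call_id} timed out after {d:?}"),
        },
    }
}
```

Unlike a cancelled async future, the worker thread in this analog keeps running after the timeout fires; only the caller stops waiting.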

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
- execute_loop.rs: module-level architecture diagram, contract docs,
  timeout documentation, known limitations, inline step explanations
- dispatch.rs: module-level decision flow, step-by-step inline comments,
  error semantics documentation
- tool_context.rs: design opinions, concurrency model, failure model,
  retry policy, routing explanation with follow-up notes

All doc comments pass clippy::doc_markdown.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
…ails

Round 3 review fixes:
- Move request.store=false BEFORE the loop (was inside Continue arm,
  too late for first iteration). Prevents persisting partial responses
  containing only tool-call output without final answer.
- Populate payload.incomplete_details with the reason string when
  max_iterations is hit. Clients need this per Responses API spec.
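
A sketch of the Incomplete path after this fix, assuming a payload type with a `status` string and an optional `incomplete_details` object carrying a `reason`. The struct definitions and the `mark_incomplete` helper are hypothetical; the project's actual payload type is not shown in this PR page.

```rust
/// Hypothetical mirror of the `incomplete_details` object.
#[derive(Debug, Clone, PartialEq)]
struct IncompleteDetails {
    reason: String,
}

/// Hypothetical, heavily reduced response payload.
#[derive(Debug, Clone, PartialEq)]
struct ResponsePayload {
    status: String,
    incomplete_details: Option<IncompleteDetails>,
}

/// Mark the last payload as incomplete and record why the loop stopped.
fn mark_incomplete(mut payload: ResponsePayload, reason: &str) -> ResponsePayload {
    payload.status = "incomplete".to_string();
    payload.incomplete_details = Some(IncompleteDetails {
        reason: reason.to_string(),
    });
    payload
}
```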

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
…alloc

- Move types to top-level imports (ExecutorError, InputItem, InputMessage, etc.)
- Mutate request.input in-place instead of clone→extend→reassign each iteration
- Use peekable iterator in dispatch to skip Vec allocation on early-exit paths
- Pass iterator directly to join_all, removing intermediate Vec in execute_all
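
The peekable change can be illustrated with a reduced example: peeking at the filtered iterator answers "is there any function call?" without collecting into a Vec first. The `OutputItem` shape and the `pending_call_ids` helper are simplified assumptions, not the code in dispatch.rs.

```rust
#[derive(Debug, Clone, PartialEq)]
enum OutputItem {
    Message(String),
    FunctionCall { call_id: String },
}

/// Returns None when the output has no function calls. On that
/// early-exit path nothing is collected, so no Vec is allocated.
fn pending_call_ids(output: &[OutputItem]) -> Option<Vec<&str>> {
    let mut calls = output
        .iter()
        .filter_map(|item| match item {
            OutputItem::FunctionCall { call_id } => Some(call_id.as_str()),
            OutputItem::Message(_) => None,
        })
        .peekable();

    // peek() advances the underlying iterator only as far as the first match.
    if calls.peek().is_none() {
        return None;
    }
    // The peeked item is not lost: collect() still yields it first.
    Some(calls.collect())
}
```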

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
PR vllm-project#56 changed the persist logic to fire when ANY of store/previous_response_id/
conversation_id is set. Our loop only set store=false, leaving
conversation_id as a persistence trigger for intermediate iterations.
The loop now clears all three before it starts and restores both IDs on the
returned payload (matching the original request's response chain).
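
A minimal sketch of the clear-then-restore pattern described above. The `Request`, `Payload`, and `SavedIds` types and both helper functions are hypothetical reductions; the field types (for example `store: bool`) are assumptions.

```rust
/// Hypothetical reduced request: only the three persistence triggers.
#[derive(Debug, Clone, Default, PartialEq)]
struct Request {
    store: bool,
    previous_response_id: Option<String>,
    conversation_id: Option<String>,
}

/// Hypothetical reduced payload: only the two IDs that get restored.
#[derive(Debug, Default, PartialEq)]
struct Payload {
    previous_response_id: Option<String>,
    conversation_id: Option<String>,
}

/// The caller's original IDs, kept aside while the loop runs.
struct SavedIds {
    previous_response_id: Option<String>,
    conversation_id: Option<String>,
}

/// Before the loop: clear all three triggers so that no intermediate
/// iteration persists a partial response.
fn suppress_persistence(request: &mut Request) -> SavedIds {
    request.store = false;
    SavedIds {
        previous_response_id: request.previous_response_id.take(),
        conversation_id: request.conversation_id.take(),
    }
}

/// After the loop: put both IDs back on the payload returned to the caller.
fn restore_ids(payload: &mut Payload, saved: SavedIds) {
    payload.previous_response_id = saved.previous_response_id;
    payload.conversation_id = saved.conversation_id;
}
```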

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Add 6 new tests covering the critical invariants:
- previous_response_id=Some restored on Done path
- conversation_id=Some restored on Done path
- store=true suppressed to false in internal LLM calls
- all 3 persistence triggers cleared in multi-hop scenario
- both IDs restored on Incomplete (max_iterations) path
- Items input + conversation_id works with in-place extend

Also tighten doc comment on execute_loop to accurately describe
clearing all 3 triggers and restoring both IDs.

Signed-off-by: Ashwin Giridharan <girida@amazon.com>
Signed-off-by: Ashwin Giridharan <girida@amazon.com>
@ashwing ashwing force-pushed the feat/tool-dispatch branch from 9f91aa8 to 4c4a28e June 17, 2026 22:15
@ashwing

ashwing commented Jun 17, 2026

Collaborator Author

@maralbahari PR #59 is merged — rebased this on top of it. Ready for re-review. Updated the PR description with the sequencing we discussed.

@maralbahari

Collaborator

> @maralbahari PR #59 is merged — rebased this on top of it. Ready for re-review. Updated the PR description with the sequencing we discussed.

@ashwing Thank you for the changes. As discussed in the community meeting today, we should hold off on this PR, since the agentic execution loop cannot be completed as of now. The tools are not implemented in this PR; they are just placeholder stub functions. The first step would be to get MCP in.

@ashwing

ashwing commented Jul 2, 2026

Collaborator Author

Closing this in favor of #67

@ashwing ashwing closed this Jul 2, 2026