diff --git a/src/dedalus_labs/lib/runner/core.py b/src/dedalus_labs/lib/runner/core.py index 94f0acf..0e44600 100644 --- a/src/dedalus_labs/lib/runner/core.py +++ b/src/dedalus_labs/lib/runner/core.py @@ -8,6 +8,7 @@ import asyncio import inspect +import json from typing import ( TYPE_CHECKING, Any, @@ -685,6 +686,7 @@ async def _execute_streaming_async( content_chunks = 0 tool_call_chunks = 0 finish_reason = None + mcp_tool_results_from_server: list = [] async for chunk in stream: chunk_count += 1 if exec_config.verbose: @@ -694,6 +696,12 @@ async def _execute_streaming_async( meta = extra.get("x_dedalus_event") or extra.get("dedalus_event") if isinstance(meta, dict) and meta.get("type") == "agent_updated": print(f" [EVENT] agent_updated: agent={meta.get('agent')} model={meta.get('model')}") + + # Collect MCP tool results emitted by the server + chunk_extra = getattr(chunk, "__pydantic_extra__", None) or {} + if isinstance(chunk_extra, dict) and "mcp_tool_results" in chunk_extra: + mcp_tool_results_from_server = chunk_extra["mcp_tool_results"] + if hasattr(chunk, "choices") and chunk.choices: choice = chunk.choices[0] delta = choice.delta @@ -765,7 +773,6 @@ async def _execute_streaming_async( local_only = [ tc for tc in tool_calls if tc["function"]["name"] in getattr(tool_handler, "_funcs", {}) ] - messages.append({"role": "assistant", "tool_calls": local_only}) from ._scheduler import execute_local_tools_async @@ -965,9 +972,16 @@ def _execute_streaming_sync( tool_call_chunks = 0 finish_reason = None accumulated_content = "" + mcp_tool_results_from_server: list = [] for chunk in stream: chunk_count += 1 + + # Collect MCP tool results emitted by the server + chunk_extra = getattr(chunk, "__pydantic_extra__", None) or {} + if isinstance(chunk_extra, dict) and "mcp_tool_results" in chunk_extra: + mcp_tool_results_from_server = chunk_extra["mcp_tool_results"] + if hasattr(chunk, "choices") and chunk.choices: choice = chunk.choices[0] delta = choice.delta @@ -1048,7 +1062,6 @@ def _execute_streaming_sync( local_only = [ tc for tc in tool_calls if tc["function"]["name"] in getattr(tool_handler, "_funcs", {}) ] - messages.append({"role": "assistant", "tool_calls": local_only}) from ._scheduler import execute_local_tools_sync @@ -1153,16 +1166,18 @@ def _extract_tool_calls(self, choice) -> list[ToolCall]: fn = tc_dict.get("function", {}) fn_dict = vars(fn) if hasattr(fn, "__dict__") else fn - calls.append( - { - "id": tc_dict.get("id", ""), - "type": tc_dict.get("type", "function"), - "function": { - "name": fn_dict.get("name", ""), - "arguments": fn_dict.get("arguments", "{}"), - }, - } - ) + tc_out: ToolCall = { + "id": tc_dict.get("id", ""), + "type": tc_dict.get("type", "function"), + "function": { + "name": fn_dict.get("name", ""), + "arguments": fn_dict.get("arguments", "{}"), + }, + } + thought_sig = tc_dict.get("thought_signature") + if thought_sig: + tc_out["thought_signature"] = thought_sig + calls.append(tc_out) return calls async def _execute_tool_calls( @@ -1245,6 +1260,9 @@ def _accumulate_tool_calls(self, deltas, acc: list[ToolCall]) -> None: acc[index]["function"]["name"] = fn.name if hasattr(fn, "arguments") and fn.arguments: acc[index]["function"]["arguments"] += fn.arguments + thought_sig = getattr(delta, "thought_signature", None) + if thought_sig: + acc[index]["thought_signature"] = thought_sig @staticmethod def _mk_kwargs(mc: _ModelConfig) -> Dict[str, Any]: