Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 30 additions & 12 deletions src/dedalus_labs/lib/runner/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

import asyncio
import inspect
import json
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unused json import added to module

Low Severity

The import json statement was added but the json module is not used anywhere in the changed code. This appears to be a leftover from planned code that was not implemented, likely related to the incomplete mcp_tool_results_from_server feature which may have intended to serialize results using JSON.

🔬 Verification Test

Why verification test was not possible: The VM infrastructure was unreachable during this review. However, this issue is verifiable through static analysis - the string "json." does not appear anywhere in the diff's changed code, confirming the import is unused in the new code being added.

Fix in Cursor Fix in Web

from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -685,6 +686,7 @@ async def _execute_streaming_async(
content_chunks = 0
tool_call_chunks = 0
finish_reason = None
mcp_tool_results_from_server: list = []
async for chunk in stream:
chunk_count += 1
if exec_config.verbose:
Expand All @@ -694,6 +696,12 @@ async def _execute_streaming_async(
meta = extra.get("x_dedalus_event") or extra.get("dedalus_event")
if isinstance(meta, dict) and meta.get("type") == "agent_updated":
print(f" [EVENT] agent_updated: agent={meta.get('agent')} model={meta.get('model')}")

# Collect MCP tool results emitted by the server
chunk_extra = getattr(chunk, "__pydantic_extra__", None) or {}
if isinstance(chunk_extra, dict) and "mcp_tool_results" in chunk_extra:
mcp_tool_results_from_server = chunk_extra["mcp_tool_results"]
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MCP tool results overwritten instead of accumulated

Low Severity

When collecting MCP tool results from server chunks, the code uses assignment (mcp_tool_results_from_server = chunk_extra["mcp_tool_results"]) rather than extending the list. If the server sends mcp_tool_results across multiple chunks, earlier results would be overwritten and lost. This pattern appears in both the async streaming path (line 706) and sync streaming path (line 1015).

🔬 Verification Test

Why verification test was not possible: This potential edge case depends on the server's behavior when streaming MCP tool results. Without access to the actual server implementation or mock infrastructure that simulates multi-chunk MCP result delivery, it's not possible to verify whether results would be lost in practice. The bug is flagged based on the code pattern showing assignment rather than accumulation in a streaming context.

Additional Locations (1)

Fix in Cursor Fix in Web

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Server tool results collected but never injected into messages

High Severity

The variable mcp_tool_results_from_server is collected from stream chunks but never actually used. The PR description explicitly states the intent is to "inject server tool results into conversation history before local tool execution," but the collected results are never injected into messages. The variable is initialized, populated in the loop, then discarded without being used. The fix for this feature is incomplete.

🔬 Verification Test

Why verification test was not possible: The VM infrastructure was unreachable during this review (repeated "Pod exists but exec-daemon is unreachable" errors across 30+ attempts). However, this bug is clearly identifiable through static analysis of the diff alone - the variable mcp_tool_results_from_server is only ever written to (on lines 703 and 983) and never read from anywhere in the diff. Searching for any usage of this variable shows it's only referenced in assignment statements, confirming the collected data is never utilized.

Additional Locations (1)

Fix in Cursor Fix in Web


if hasattr(chunk, "choices") and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta
Expand Down Expand Up @@ -765,7 +773,6 @@ async def _execute_streaming_async(
local_only = [
tc for tc in tool_calls if tc["function"]["name"] in getattr(tool_handler, "_funcs", {})
]
messages.append({"role": "assistant", "tool_calls": local_only})

from ._scheduler import execute_local_tools_async

Expand Down Expand Up @@ -965,9 +972,16 @@ def _execute_streaming_sync(
tool_call_chunks = 0
finish_reason = None
accumulated_content = ""
mcp_tool_results_from_server: list = []

for chunk in stream:
chunk_count += 1

# Collect MCP tool results emitted by the server
chunk_extra = getattr(chunk, "__pydantic_extra__", None) or {}
if isinstance(chunk_extra, dict) and "mcp_tool_results" in chunk_extra:
mcp_tool_results_from_server = chunk_extra["mcp_tool_results"]

if hasattr(chunk, "choices") and chunk.choices:
choice = chunk.choices[0]
delta = choice.delta
Expand Down Expand Up @@ -1048,7 +1062,6 @@ def _execute_streaming_sync(
local_only = [
tc for tc in tool_calls if tc["function"]["name"] in getattr(tool_handler, "_funcs", {})
]
messages.append({"role": "assistant", "tool_calls": local_only})

from ._scheduler import execute_local_tools_sync

Expand Down Expand Up @@ -1153,16 +1166,18 @@ def _extract_tool_calls(self, choice) -> list[ToolCall]:
fn = tc_dict.get("function", {})
fn_dict = vars(fn) if hasattr(fn, "__dict__") else fn

calls.append(
{
"id": tc_dict.get("id", ""),
"type": tc_dict.get("type", "function"),
"function": {
"name": fn_dict.get("name", ""),
"arguments": fn_dict.get("arguments", "{}"),
},
}
)
tc_out: ToolCall = {
"id": tc_dict.get("id", ""),
"type": tc_dict.get("type", "function"),
"function": {
"name": fn_dict.get("name", ""),
"arguments": fn_dict.get("arguments", "{}"),
},
}
thought_sig = tc_dict.get("thought_signature")
if thought_sig:
tc_out["thought_signature"] = thought_sig
calls.append(tc_out)
return calls

async def _execute_tool_calls(
Expand Down Expand Up @@ -1245,6 +1260,9 @@ def _accumulate_tool_calls(self, deltas, acc: list[ToolCall]) -> None:
acc[index]["function"]["name"] = fn.name
if hasattr(fn, "arguments") and fn.arguments:
acc[index]["function"]["arguments"] += fn.arguments
thought_sig = getattr(delta, "thought_signature", None)
if thought_sig:
acc[index]["thought_signature"] = thought_sig

@staticmethod
def _mk_kwargs(mc: _ModelConfig) -> Dict[str, Any]:
Expand Down
Loading