feat(runner): dependency-aware parallel tool execution #44
Merged · +854 −387
`src/dedalus_labs/lib/runner/_scheduler.py` (new file, `@@ -0,0 +1,293 @@`):

```python
# ==============================================================================
# © 2025 Dedalus Labs, Inc. and affiliates
# Licensed under MIT
# github.com/dedalus-labs/dedalus-sdk-python/LICENSE
# ==============================================================================

"""Dependency-aware local tool scheduler.

When the API server returns ``pending_local_calls`` with dependency
info, this module topo-sorts and executes them in parallel layers.
Independent tools fire concurrently; dependent tools wait for their
prerequisites.

Falls back to sequential execution when dependencies form a cycle
(model hallucinated wrong deps).

Functions:
    execute_local_tools_async -- async path, uses asyncio.gather per layer
    execute_local_tools_sync -- sync path, sequential within layers
"""

from __future__ import annotations

import json
import asyncio
from typing import Any, Dict, List, Tuple

from graphlib import CycleError, TopologicalSorter


def _parse_pending_calls(
    tool_calls: List[Dict[str, Any]],
) -> Tuple[Dict[str, Dict[str, Any]], TopologicalSorter]:
    """Parse pending local calls and build a TopologicalSorter.

    Returns (calls_by_id, sorter). Each entry in calls_by_id has
    the parsed function name and arguments ready for execution.

    Raises CycleError if dependencies are cyclic.
    """
    calls_by_id: Dict[str, Dict[str, Any]] = {}
    sorter: TopologicalSorter = TopologicalSorter()
    known_ids = {tc["id"] for tc in tool_calls if "id" in tc}

    for tc in tool_calls:
        call_id = tc.get("id", "")
        fn_name = tc["function"]["name"]
        fn_args_str = tc["function"]["arguments"]

        try:
            fn_args = json.loads(fn_args_str) if isinstance(fn_args_str, str) else fn_args_str
        except json.JSONDecodeError:
            fn_args = {}

        # Filter deps to only known ids in this batch.
        raw_deps = tc.get("dependencies") or []
        deps = [dep for dep in raw_deps if dep in known_ids and dep != call_id]

        calls_by_id[call_id] = {"name": fn_name, "args": fn_args, "id": call_id}
        sorter.add(call_id, *deps)

    sorter.prepare()
    return calls_by_id, sorter


async def execute_local_tools_async(
    tool_calls: List[Dict[str, Any]],
    tool_handler: Any,
    messages: List[Dict[str, Any]],
    tool_results: List[Dict[str, Any]],
    tools_called: List[str],
    step: int,
    *,
    verbose: bool = False,
) -> None:
    """Execute local tool calls respecting dependencies (async).

    Independent tools within the same topo layer fire concurrently
    via asyncio.gather. Falls back to sequential on cycle.

    The caller is responsible for appending the assistant message
    (with tool_calls and any reasoning_content) before calling this.
    """
    if not tool_calls:
        return

    try:
        calls_by_id, sorter = _parse_pending_calls(tool_calls)
    except CycleError:
        # If wrong deps from model, fall back to sequential.
        await _execute_sequential_async(
            tool_calls,
            tool_handler,
            messages,
            tool_results,
            tools_called,
            step,
            verbose,
        )
        return

    # Drive the sorter layer by layer.
    while sorter.is_active():
        ready = list(sorter.get_ready())
        if not ready:
            break

        if len(ready) == 1:
            # Single tool: no gather overhead.
            call_id = ready[0]
            await _run_one_async(
                calls_by_id[call_id],
                tool_handler,
                messages,
                tool_results,
                tools_called,
                step,
                verbose,
            )
            sorter.done(call_id)
        else:
            # Multiple independent tools: fire concurrently.
            results = await asyncio.gather(
                *[
                    _run_one_async(
                        calls_by_id[call_id],
                        tool_handler,
                        messages,
                        tool_results,
                        tools_called,
                        step,
                        verbose,
                    )
                    for call_id in ready
                ],
                return_exceptions=True,
            )

            for call_id, result in zip(ready, results):
                if isinstance(result, Exception):
                    # Already recorded in messages by _run_one_async.
                    pass
                sorter.done(call_id)


def execute_local_tools_sync(
    tool_calls: List[Dict[str, Any]],
    tool_handler: Any,
    messages: List[Dict[str, Any]],
    tool_results: List[Dict[str, Any]],
    tools_called: List[str],
    step: int,
) -> None:
    """Execute local tool calls respecting dependencies (sync).

    Executes in topo order, one at a time. No parallelism in sync mode,
    but ordering is correct.

    The caller is responsible for appending the assistant message
    (with tool_calls and any reasoning_content) before calling this.
    """
    if not tool_calls:
        return

    try:
        calls_by_id, sorter = _parse_pending_calls(tool_calls)
    except CycleError:
        _execute_sequential_sync(
            tool_calls,
            tool_handler,
            messages,
            tool_results,
            tools_called,
            step,
        )
        return

    while sorter.is_active():
        ready = list(sorter.get_ready())
        if not ready:
            break
        for call_id in ready:
            _run_one_sync(
                calls_by_id[call_id],
                tool_handler,
                messages,
                tool_results,
                tools_called,
                step,
            )
            sorter.done(call_id)


# --- Single tool execution ---


async def _run_one_async(
    call: Dict[str, Any],
    tool_handler: Any,
    messages: List[Dict[str, Any]],
    tool_results: List[Dict[str, Any]],
    tools_called: List[str],
    step: int,
    verbose: bool,
) -> None:
    """Execute a single tool call and record results."""
    fn_name = call["name"]
    fn_args = call["args"]
    call_id = call["id"]

    try:
        result = await tool_handler.exec(fn_name, fn_args)
        tool_results.append({"name": fn_name, "result": result, "step": step})
        tools_called.append(fn_name)
        messages.append({"role": "tool", "tool_call_id": call_id, "content": str(result)})
        if verbose:
            print(f" Tool {fn_name}: {str(result)[:50]}...")  # noqa: T201
    except Exception as e:
        tool_results.append({"error": str(e), "name": fn_name, "step": step})
        messages.append({"role": "tool", "tool_call_id": call_id, "content": f"Error: {e}"})
        if verbose:
            print(f" Tool {fn_name} failed: {e}")  # noqa: T201


def _run_one_sync(
    call: Dict[str, Any],
    tool_handler: Any,
    messages: List[Dict[str, Any]],
    tool_results: List[Dict[str, Any]],
    tools_called: List[str],
    step: int,
) -> None:
    """Execute a single tool call synchronously and record results."""
    fn_name = call["name"]
    fn_args = call["args"]
    call_id = call["id"]

    try:
        result = tool_handler.exec_sync(fn_name, fn_args)
        tool_results.append({"name": fn_name, "result": result, "step": step})
        tools_called.append(fn_name)
        messages.append({"role": "tool", "tool_call_id": call_id, "content": str(result)})
    except Exception as e:
        tool_results.append({"error": str(e), "name": fn_name, "step": step})
        messages.append({"role": "tool", "tool_call_id": call_id, "content": f"Error: {e}"})


# --- Sequential fallback ---


async def _execute_sequential_async(
    tool_calls: List[Dict[str, Any]],
    tool_handler: Any,
    messages: List[Dict[str, Any]],
    tool_results: List[Dict[str, Any]],
    tools_called: List[str],
    step: int,
    verbose: bool,
) -> None:
    """Fallback: execute all tools sequentially (no dependency ordering)."""
    for tc in tool_calls:
        fn_name = tc["function"]["name"]
        fn_args_str = tc["function"]["arguments"]
        call_id = tc.get("id", "")
        try:
            fn_args = json.loads(fn_args_str) if isinstance(fn_args_str, str) else fn_args_str
        except json.JSONDecodeError:
            fn_args = {}

        call = {"name": fn_name, "args": fn_args, "id": call_id}
        await _run_one_async(call, tool_handler, messages, tool_results, tools_called, step, verbose)


def _execute_sequential_sync(
    tool_calls: List[Dict[str, Any]],
    tool_handler: Any,
    messages: List[Dict[str, Any]],
    tool_results: List[Dict[str, Any]],
    tools_called: List[str],
    step: int,
) -> None:
    """Fallback: execute all tools sequentially (no dependency ordering)."""
    for tc in tool_calls:
        fn_name = tc["function"]["name"]
        fn_args_str = tc["function"]["arguments"]
        call_id = tc.get("id", "")
        try:
            fn_args = json.loads(fn_args_str) if isinstance(fn_args_str, str) else fn_args_str
        except json.JSONDecodeError:
            fn_args = {}

        call = {"name": fn_name, "args": fn_args, "id": call_id}
        _run_one_sync(call, tool_handler, messages, tool_results, tools_called, step)
```
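The layer-by-layer scheduling pattern this file builds on can be demonstrated in isolation. The sketch below is a simplified, hypothetical stand-in (the `run_tool` coroutine and the dependency dict are invented for illustration, not part of the SDK): it uses `graphlib.TopologicalSorter` exactly as `execute_local_tools_async` does, running each set of ready, mutually independent nodes concurrently and only marking them done once they finish.

```python
import asyncio
from graphlib import TopologicalSorter


async def run_tool(name: str, log: list) -> str:
    # Hypothetical stand-in for a real tool invocation.
    await asyncio.sleep(0)
    log.append(name)
    return f"{name}-result"


async def run_layers(deps: dict, log: list) -> None:
    # deps maps each node to its prerequisites, as TopologicalSorter expects.
    sorter = TopologicalSorter(deps)
    sorter.prepare()
    while sorter.is_active():
        ready = list(sorter.get_ready())
        # Everything in `ready` is mutually independent: run concurrently.
        await asyncio.gather(*(run_tool(n, log) for n in ready))
        for n in ready:
            sorter.done(n)


# "fetch_a" and "fetch_b" are independent; "merge" depends on both,
# so it always runs in a later layer.
log: list = []
asyncio.run(run_layers({"fetch_a": [], "fetch_b": [], "merge": ["fetch_a", "fetch_b"]}, log))
print(log)
```

The two independent fetches may complete in either order within their layer, but `merge` is guaranteed to run last.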
**Review comment: Gather silently swallows unrecorded exceptions in parallel path** (Low Severity)

When multiple tools run in the `asyncio.gather` path with `return_exceptions=True`, any exception that escapes `_run_one_async` (not caught by its internal `except Exception`) is captured as a result but never recorded in `messages`. The comment "Already recorded in messages by _run_one_async" is incorrect for this case. Meanwhile `sorter.done(call_id)` is still called, so dependent tools proceed without the prerequisite's result message, leading to a malformed conversation. In the single-tool path, the same exception propagates correctly and aborts execution. This inconsistency means the same failure mode is a clear error for one tool but silent data loss for two or more.

🔬 Verification Test

Why verification test was not possible: The development VM was unreachable during analysis (all tool calls returned "Pod exists but exec-daemon is unreachable"). However, this bug is demonstrable through static analysis: the `pass` on line 142 takes no corrective action (no message recorded, no re-raise), and `sorter.done(call_id)` on line 143 runs unconditionally regardless of whether the exception was recorded. The comment on line 141 is provably incorrect: `_run_one_async` only records errors inside its own `except Exception` block, and any exception that escapes was by definition NOT caught there.

Additional Locations (1)

src/dedalus_labs/lib/runner/_scheduler.py#L107-L120
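The behavior the review relies on is easy to reproduce in isolation. In this minimal sketch (the `ok` and `boom` coroutines are hypothetical stand-ins, not SDK code), `asyncio.gather(..., return_exceptions=True)` never raises: the exception comes back as an ordinary list element, and nothing happens unless the caller explicitly inspects and handles it.

```python
import asyncio


async def ok() -> str:
    return "fine"


async def boom() -> str:
    # Simulates an exception that escapes a per-task error handler.
    raise RuntimeError("unrecorded failure")


async def main() -> list:
    # With return_exceptions=True the exception is captured as a value;
    # gather itself completes normally and the failure is silent unless
    # the caller checks each result.
    return await asyncio.gather(ok(), boom(), return_exceptions=True)


results = asyncio.run(main())
print(results)  # the RuntimeError appears as a list element, not a raise
```

This is why a `pass` in the result-inspection loop amounts to dropping the failure on the floor: the exception has already been converted into data, and no other code path will see it.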