From bf2b1f2372714c0b200b657cce9f96dc5846861d Mon Sep 17 00:00:00 2001
From: farhoud
Date: Sun, 14 Dec 2025 19:04:13 +0330
Subject: [PATCH 1/3] Implement multi-agent support and configurable memory in
 llmcore

- Add db package with Neo4j persistence for agent runs and flows
- Introduce flow wrappers as dicts for grouping steps in agent executions
- Replace memory handling with functional strategies (full history, summarized, vector)
- Refactor AgentRun to use flows and pluggable memory functions
- Update persistence to save/load flows in a Neo4j graph structure
- Add tests for the new functionality
- Simplify the design by using functions and dicts instead of classes and inheritance
---
 src/scouter/db/__init__.py      |  15 ++++
 src/scouter/db/models.py        | 125 ++++++++++++++++++++++++++++++++
 src/scouter/db/neo4j.py         |  34 +++++++++
 src/scouter/llmcore/__init__.py |   4 +-
 src/scouter/llmcore/agent.py    | 122 ++++++++++++++++++++-----------
 src/scouter/llmcore/flow.py     |  41 +++++++++++
 src/scouter/llmcore/memory.py   |  37 ++++++++++
 src/scouter/llmcore/messages.py |  36 +++------
 tests/test_llmcore_agent.py     |  15 ++++
 9 files changed, 359 insertions(+), 70 deletions(-)
 create mode 100644 src/scouter/db/__init__.py
 create mode 100644 src/scouter/db/models.py
 create mode 100644 src/scouter/db/neo4j.py
 create mode 100644 src/scouter/llmcore/flow.py
 create mode 100644 src/scouter/llmcore/memory.py
 create mode 100644 tests/test_llmcore_agent.py

diff --git a/src/scouter/db/__init__.py b/src/scouter/db/__init__.py
new file mode 100644
index 0000000..e725ba7
--- /dev/null
+++ b/src/scouter/db/__init__.py
@@ -0,0 +1,15 @@
+"""Database package for Scouter.
+
+This package provides abstractions and utilities for database operations,
+primarily focused on Neo4j for graph-based storage and retrieval.
+"""
+
+from .models import AgentRunRepository
+from .neo4j import get_neo4j_driver, get_neo4j_embedder, get_neo4j_llm
+
+__all__ = [
+    "AgentRunRepository",
+    "get_neo4j_driver",
+    "get_neo4j_embedder",
+    "get_neo4j_llm",
+]
diff --git a/src/scouter/db/models.py b/src/scouter/db/models.py
new file mode 100644
index 0000000..771e98a
--- /dev/null
+++ b/src/scouter/db/models.py
@@ -0,0 +1,125 @@
+"""Database models and abstractions for Scouter.
+
+This module defines data models and operations for storing agent-related data
+in Neo4j, such as agent runs, flows, and steps.
+""" + +import json +from typing import Any + +import neo4j +from scouter.llmcore.agent import AgentRun + +# Flow is now dict + + +class AgentRunRepository: + """Repository for managing AgentRun persistence in Neo4j.""" + + def __init__(self, driver: neo4j.Driver): + self.driver = driver + + def save_run(self, run: AgentRun, run_id: str) -> None: + """Save an AgentRun to Neo4j.""" + with self.driver.session() as session: + # Create run node + session.run( + """ + MERGE (r:AgentRun {id: $id}) + SET r.total_usage = $total_usage, + r.memory_strategy = $memory_strategy + """, + id=run_id, + total_usage=json.dumps(run.total_usage), + memory_strategy=type(run.memory_strategy).__name__, + ) + + # Create flow nodes and relationships + for flow in run.flows: + session.run( + """ + MATCH (r:AgentRun {id: $run_id}) + MERGE (f:Flow {id: $flow_id}) + SET f.agent_id = $agent_id, + f.status = $status, + f.metadata = $metadata, + f.parent_flow_id = $parent_flow_id + CREATE (r)-[:HAS_FLOW]->(f) + """, + run_id=run_id, + flow_id=flow["id"], + agent_id=flow["agent_id"], + status=flow["status"], + metadata=json.dumps(flow["metadata"]), + parent_flow_id=flow["parent_flow_id"], + ) + + # Create step nodes for each flow + for i, step in enumerate(flow["steps"]): + step_data = self._serialize_step(step) + session.run( + """ + MATCH (f:Flow {id: $flow_id}) + CREATE (s:AgentStep {index: $index, type: $type, data: $data}) + CREATE (f)-[:HAS_STEP]->(s) + """, + flow_id=flow["id"], + index=i, + type=type(step).__name__, + data=json.dumps(step_data), + ) + + def load_run(self, run_id: str) -> AgentRun | None: + """Load an AgentRun from Neo4j.""" + with self.driver.session() as session: + result = session.run( + """ + MATCH (r:AgentRun {id: $id})-[:HAS_FLOW]->(f:Flow)-[:HAS_STEP]->(s:AgentStep) + RETURN r, f, collect(s) as steps ORDER BY f.id, s.index + """, + id=run_id, + ) + records = list(result) + if not records: + return None + + records[0]["r"] + flows = {} + for record in records: + flow_node = record["f"] + step_nodes = record["steps"] + + flow_id = flow_node["id"] + if flow_id not in flows: + flows[flow_id] = { + "id": flow_id, + "agent_id": flow_node["agent_id"], + "status": flow_node["status"], + "metadata": json.loads(flow_node["metadata"]), + "parent_flow_id": flow_node["parent_flow_id"], + "steps": [], + } + + # Reconstruct steps + for step_node in step_nodes: + step = self._deserialize_step(step_node["type"], step_node["data"]) + flows[flow_id]["steps"].append(step) + + return AgentRun(flows=list(flows.values())) + # TODO: Restore memory_function from run_node + + def _serialize_step(self, step) -> dict[str, Any]: + """Serialize a step to a dict.""" + if hasattr(step, "__dict__"): + return step.__dict__ + return {"data": str(step)} + + def _deserialize_step(self, step_type: str, data: str): + """Deserialize a step from JSON.""" + data_dict = json.loads(data) + if step_type == "LLMStep": + return LLMStep(**data_dict) + if step_type == "ToolStep": + return ToolStep(**data_dict) + msg = f"Unknown step type: {step_type}" + raise ValueError(msg) diff --git a/src/scouter/db/neo4j.py b/src/scouter/db/neo4j.py new file mode 100644 index 0000000..28f2979 --- /dev/null +++ b/src/scouter/db/neo4j.py @@ -0,0 +1,34 @@ +"""Database connections and utilities for Scouter. + +This module provides Neo4j driver setup and related database utilities. 
+""" + +import os + +from neo4j_graphrag.embeddings import SentenceTransformerEmbeddings +from neo4j_graphrag.llm import OpenAILLM + +import neo4j +from neo4j import GraphDatabase + + +def get_neo4j_driver() -> neo4j.Driver: + """Get a Neo4j driver instance.""" + uri = os.getenv("NEO4J_URI", "bolt://localhost:7687") + user = os.getenv("NEO4J_USER", "neo4j") + password = os.getenv("NEO4J_PASSWORD", "password") + return GraphDatabase.driver(uri, auth=(user, password)) + + +def get_neo4j_llm() -> OpenAILLM: + """Get a Neo4j LLM instance configured for OpenAI.""" + api_key = os.getenv("OPENAI_API_KEY") + if not api_key: + msg = "OPENAI_API_KEY environment variable is required" + raise ValueError(msg) + return OpenAILLM(model_name="gpt-4o-mini", model_params={"api_key": api_key}) + + +def get_neo4j_embedder() -> SentenceTransformerEmbeddings: + """Get a Neo4j embedder instance.""" + return SentenceTransformerEmbeddings(model="all-MiniLM-L6-v2") diff --git a/src/scouter/llmcore/__init__.py b/src/scouter/llmcore/__init__.py index e3765e9..af1a775 100644 --- a/src/scouter/llmcore/__init__.py +++ b/src/scouter/llmcore/__init__.py @@ -1,4 +1,4 @@ -from .agent import AgentRun, run_agent +from .agent import AgentRun, LLMStep, ToolStep, run_agent from .client import ChatCompletionOptions, LLMConfig, call_llm, create_llm_client from .exceptions import ( AgentError, @@ -50,10 +50,12 @@ "InvalidToolDefinitionError", "LLMConfig", "LLMError", + "LLMStep", "MaxRetriesExceededError", "Prompt", "Tool", "ToolExecutionError", + "ToolStep", "call_llm", "create_instruction", "create_llm_client", diff --git a/src/scouter/llmcore/agent.py b/src/scouter/llmcore/agent.py index e810b5f..4e6136c 100644 --- a/src/scouter/llmcore/agent.py +++ b/src/scouter/llmcore/agent.py @@ -23,18 +23,16 @@ from .client import ChatCompletionOptions, call_llm from .exceptions import InvalidRunStateError +from .flow import add_step_to_flow, create_flow, mark_flow_completed, mark_flow_running +from .memory import MemoryFunction, full_history_memory from .tools import run_tool logger = logging.getLogger(__name__) -@dataclass -class InputStep: - message: ChatCompletionMessageParam - - @dataclass class LLMStep: + input: list[ChatCompletionMessageParam] completion: ChatCompletion @property @@ -59,7 +57,7 @@ def message(self) -> ChatCompletionToolMessageParam: ) -Step = InputStep | LLMStep | ToolStep +Step = LLMStep | ToolStep @dataclass @@ -67,36 +65,57 @@ class AgentRun: continue_condition: Callable[[AgentRun], bool] = field( default_factory=lambda: default_continue_condition_factory() ) - steps: list[Step] = field(default_factory=list) - - def add_step(self, step: Step) -> None: - """Add a step to the run.""" - self.steps.append(step) - - @property - def conversation_history(self) -> list[ChatCompletionMessageParam]: - return [step.message for step in self.steps] + flows: list[dict] = field(default_factory=list) + memory_function: MemoryFunction = field(default=full_history_memory) + agents: dict[str, Callable[[], AgentRun]] = field( + default_factory=dict + ) # For multi-agent: factory functions + + def add_flow(self, flow: dict) -> None: + """Add a flow to the run.""" + self.flows.append(flow) + + def get_context(self) -> list[ChatCompletionMessageParam]: + """Get configurable memory context instead of flat history.""" + return self.memory_function(self) + + def run_sub_agent(self, agent_id: str) -> dict: + """Run a sub-agent within this run, returning its flow.""" + if agent_id not in self.agents: + msg = f"Agent {agent_id} not 
registered" + raise ValueError(msg) + flow = create_flow(flow_id=f"{agent_id}_{len(self.flows)}", agent_id=agent_id) + mark_flow_running(flow) + self.add_flow(flow) + # TODO: Integrate with run_agent for actual execution + # For now, placeholder: assume sub_run executes and adds steps to flow + mark_flow_completed(flow) + return flow @property def total_usage( self, ) -> dict: # Simplified, can make proper ChatCompletionUsage later total = {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0} - for step in self.steps: - if isinstance(step, LLMStep) and step.completion.usage: - usage = step.completion.usage - total["completion_tokens"] += usage.completion_tokens or 0 - total["prompt_tokens"] += usage.prompt_tokens or 0 - total["total_tokens"] += usage.total_tokens or 0 + for flow in self.flows: + for step in flow["steps"]: + if isinstance(step, LLMStep) and step.completion.usage: + usage = step.completion.usage + total["completion_tokens"] += usage.completion_tokens or 0 + total["prompt_tokens"] += usage.prompt_tokens or 0 + total["total_tokens"] += usage.total_tokens or 0 return total @property def last_output(self) -> str: - if not self.steps: - msg = "No steps in run" + if not self.flows: + msg = "No flows in run" logger.error("Attempted to get last output from empty run") raise InvalidRunStateError(msg) - last_step = self.steps[-1] + last_flow = self.flows[-1] + if not last_flow["steps"]: + return "" + last_step = last_flow["steps"][-1] if isinstance(last_step, LLMStep): content = last_step.message.get("content") return content if isinstance(content, str) else "" @@ -106,7 +125,12 @@ def last_output(self) -> str: @property def tool_executions(self) -> list[ToolStep]: - return [step for step in self.steps if isinstance(step, ToolStep)] + executions = [] + for flow in self.flows: + executions.extend( + [step for step in flow["steps"] if isinstance(step, ToolStep)] + ) + return executions def default_continue_condition_factory( @@ -114,17 +138,20 @@ def default_continue_condition_factory( ) -> Callable[[AgentRun], bool]: def condition(run: AgentRun) -> bool: if max_steps is not None: - llm_count = sum(1 for step in run.steps if isinstance(step, LLMStep)) + llm_count = sum( + 1 + for flow in run.flows + for step in flow.steps + if isinstance(step, LLMStep) + ) if llm_count >= max_steps: return False - # Filter out InputStep to find the last meaningful step - non_input_steps = [ - step for step in run.steps if not isinstance(step, InputStep) - ] - if not non_input_steps: - return True # Only InputSteps present, initial state - last_non_input = non_input_steps[-1] - return isinstance(last_non_input, ToolStep) + # Find the last step across all flows + all_steps = [step for flow in run.flows for step in flow.steps] + if not all_steps: + return True # No steps yet + last_step = all_steps[-1] + return isinstance(last_step, ToolStep) return condition @@ -134,16 +161,21 @@ async def run_agent( model: str = "gpt-4o-mini", tools: Iterable[ChatCompletionToolUnionParam] | None = None, options: ChatCompletionOptions | None = None, + agent_id: str = "default", ): logger.info( - "Starting agent run with model=%s, initial_steps=%d", model, len(run.steps) + "Starting agent run with model=%s, initial_flows=%d", model, len(run.flows) ) + current_flow = create_flow(id=f"{agent_id}_main", agent_id=agent_id) + mark_flow_running(current_flow) + run.add_flow(current_flow) + while run.continue_condition(run): - completion: ChatCompletion = call_llm( - model, run.conversation_history, tools, options - ) + 
context = run.get_context() + completion: ChatCompletion = call_llm(model, context, tools, options) msg = completion.choices[0].message - run.add_step(LLMStep(completion)) + step = LLMStep(input=context, completion=completion) + add_step_to_flow(current_flow, step) # Handle tool calls if msg.tool_calls: @@ -186,7 +218,7 @@ async def execute_single_tool(tc: ChatCompletionMessageToolCall): tasks = [execute_single_tool(tc) for tc in tool_calls] tool_steps = await asyncio.gather(*tasks, return_exceptions=True) - # Add steps in the order of tool_calls to preserve conversation history + # Add steps to current flow for i, result in enumerate(tool_steps): if isinstance(result, Exception): # Handle unexpected errors in gather @@ -196,7 +228,8 @@ async def execute_single_tool(tc: ChatCompletionMessageToolCall): tc.function.name, result, ) - run.add_step( + add_step_to_flow( + current_flow, ToolStep( tool_call_id=tc.id, tool_name=tc.function.name, @@ -205,8 +238,9 @@ async def execute_single_tool(tc: ChatCompletionMessageToolCall): execution_time=0.0, success=False, error_message=str(result), - ) + ), ) else: - run.add_step(result) # type: ignore[reportArgumentType] - logger.info("Agent run completed with %d total steps", len(run.steps)) + add_step_to_flow(current_flow, result) + mark_flow_completed(current_flow) + logger.info("Agent run completed with %d total flows", len(run.flows)) diff --git a/src/scouter/llmcore/flow.py b/src/scouter/llmcore/flow.py new file mode 100644 index 0000000..5e0a1b1 --- /dev/null +++ b/src/scouter/llmcore/flow.py @@ -0,0 +1,41 @@ +"""Flow as dict for grouping steps in agent runs.""" + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .agent import Step + + +def create_flow( + flow_id: str, agent_id: str = "default", parent_flow_id: str | None = None +) -> dict: + """Create a flow dict.""" + return { + "id": flow_id, + "agent_id": agent_id, + "steps": [], + "status": "pending", + "metadata": {}, + "parent_flow_id": parent_flow_id, + } + + +def add_step_to_flow(flow: dict, step: "Step") -> None: + """Add a step to the flow.""" + flow["steps"].append(step) + + +def mark_flow_running(flow: dict) -> None: + flow["status"] = "running" + flow["metadata"]["start_time"] = __import__("time").time() + + +def mark_flow_completed(flow: dict) -> None: + flow["status"] = "completed" + flow["metadata"]["end_time"] = __import__("time").time() + + +def mark_flow_failed(flow: dict, error: str) -> None: + flow["status"] = "failed" + flow["metadata"]["error"] = error + flow["metadata"]["end_time"] = __import__("time").time() diff --git a/src/scouter/llmcore/memory.py b/src/scouter/llmcore/memory.py new file mode 100644 index 0000000..835297b --- /dev/null +++ b/src/scouter/llmcore/memory.py @@ -0,0 +1,37 @@ +"""Memory functions for configurable agent context.""" + +from collections.abc import Callable +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .agent import AgentRun + + +def full_history_memory(run) -> list: + """Memory that includes all conversation history.""" + messages = [] + for flow in run.flows: + messages.extend(step.message for step in flow["steps"]) + return messages + + +def summarized_memory(run, max_steps: int = 10) -> list: + """Memory that keeps recent steps.""" + all_messages = [] + for flow in run.flows: + all_messages.extend([step.message for step in flow["steps"]]) + + if len(all_messages) <= max_steps: + return all_messages + + # Placeholder: keep last N + return all_messages[-max_steps:] + + +def vector_memory(_run) -> list: + 
"""Placeholder for vector-based memory.""" + # TODO: Implement + return [] + + +MemoryFunction = Callable[["AgentRun"], list] diff --git a/src/scouter/llmcore/messages.py b/src/scouter/llmcore/messages.py index 034d8fd..d236a77 100644 --- a/src/scouter/llmcore/messages.py +++ b/src/scouter/llmcore/messages.py @@ -1,29 +1,15 @@ -import logging +"""Message creation utilities.""" -from .agent import InputStep, Step -from .types import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam - -logger = logging.getLogger(__name__) +from openai.types.chat import ChatCompletionMessageParam def create_instruction( - steps: list[Step], system: str | None = None, prompt: str | None = None -) -> None: - """Add system and user messages to the steps list as InputStep instances.""" - logger.debug( - "Creating instruction with system=%s, prompt=%s", bool(system), bool(prompt) - ) - if system: - steps.append( - InputStep( - message=ChatCompletionSystemMessageParam(role="system", content=system) - ) - ) - logger.debug("Added system message to steps") - if prompt: - steps.append( - InputStep( - message=ChatCompletionUserMessageParam(role="user", content=prompt) - ) - ) - logger.debug("Added user message to steps") + system_prompt: str, user_prompt: str +) -> list[ChatCompletionMessageParam]: + """Create instruction messages.""" + messages = [] + if system_prompt: + messages.append({"role": "system", "content": system_prompt}) + if user_prompt: + messages.append({"role": "user", "content": user_prompt}) + return messages diff --git a/tests/test_llmcore_agent.py b/tests/test_llmcore_agent.py new file mode 100644 index 0000000..c673d09 --- /dev/null +++ b/tests/test_llmcore_agent.py @@ -0,0 +1,15 @@ +"""Tests for llmcore agent functionality.""" + +from src.scouter.llmcore import ( + create_flow, + mark_flow_completed, + mark_flow_running, +) + + +def test_flow_status(): + flow = create_flow(flow_id="test") + mark_flow_running(flow) + assert flow["status"] == "running" + mark_flow_completed(flow) + assert flow["status"] == "completed" From 4c491d5abcd30ca9944707648a2a0388d1ed2110 Mon Sep 17 00:00:00 2001 From: farhoud Date: Mon, 15 Dec 2025 14:03:59 +0330 Subject: [PATCH 2/3] Refactor Flow to dataclass and fix memory functions - Convert Flow from dict to dataclass with methods for status management - Update AgentRun and tests to use Flow dataclass - Fix memory.py to correctly access step.messages and improve type hints - Update models.py serialization to handle dataclass steps properly --- src/scouter/db/models.py | 44 ++++++++++---- src/scouter/llmcore/agent.py | 110 ++++++++++------------------------ src/scouter/llmcore/flow.py | 102 ++++++++++++++++++++++--------- src/scouter/llmcore/memory.py | 30 +++------- tests/test_llmcore_agent.py | 16 ++--- 5 files changed, 150 insertions(+), 152 deletions(-) diff --git a/src/scouter/db/models.py b/src/scouter/db/models.py index 771e98a..6183b32 100644 --- a/src/scouter/db/models.py +++ b/src/scouter/db/models.py @@ -8,7 +8,8 @@ from typing import Any import neo4j -from scouter.llmcore.agent import AgentRun +from ..llmcore.flow import LLMStep, ToolStep, InputStep +from ..llmcore.agent import AgentRun # Flow is now dict @@ -31,7 +32,7 @@ def save_run(self, run: AgentRun, run_id: str) -> None: """, id=run_id, total_usage=json.dumps(run.total_usage), - memory_strategy=type(run.memory_strategy).__name__, + memory_strategy=type(run.memory_function).__name__, ) # Create flow nodes and relationships @@ -47,15 +48,15 @@ def save_run(self, run: AgentRun, run_id: str) 
-> None: CREATE (r)-[:HAS_FLOW]->(f) """, run_id=run_id, - flow_id=flow["id"], - agent_id=flow["agent_id"], - status=flow["status"], - metadata=json.dumps(flow["metadata"]), - parent_flow_id=flow["parent_flow_id"], + flow_id=flow.id, + agent_id=flow.agent_id, + status=flow.status, + metadata=json.dumps(flow.metadata), + parent_flow_id=flow.parent_flow_id, ) # Create step nodes for each flow - for i, step in enumerate(flow["steps"]): + for i, step in enumerate(flow.steps): step_data = self._serialize_step(step) session.run( """ @@ -63,7 +64,7 @@ def save_run(self, run: AgentRun, run_id: str) -> None: CREATE (s:AgentStep {index: $index, type: $type, data: $data}) CREATE (f)-[:HAS_STEP]->(s) """, - flow_id=flow["id"], + flow_id=flow.id, index=i, type=type(step).__name__, data=json.dumps(step_data), @@ -110,16 +111,33 @@ def load_run(self, run_id: str) -> AgentRun | None: def _serialize_step(self, step) -> dict[str, Any]: """Serialize a step to a dict.""" - if hasattr(step, "__dict__"): - return step.__dict__ + if isinstance(step, LLMStep): + return { + "completion": step.completion.model_dump(), + } + if isinstance(step, ToolStep): + return { + "calls": [call.__dict__ for call in step.calls], + } + if isinstance(step, InputStep): + return { + "input": step.input, + } return {"data": str(step)} def _deserialize_step(self, step_type: str, data: str): """Deserialize a step from JSON.""" + from ..llmcore.types import ChatCompletion + data_dict = json.loads(data) if step_type == "LLMStep": - return LLMStep(**data_dict) + return LLMStep(completion=ChatCompletion(**data_dict["completion"])) if step_type == "ToolStep": - return ToolStep(**data_dict) + from ..llmcore.flow import ToolCall + + calls = [ToolCall(**call) for call in data_dict["calls"]] + return ToolStep(calls=calls) + if step_type == "InputStep": + return InputStep(input=data_dict["input"]) msg = f"Unknown step type: {step_type}" raise ValueError(msg) diff --git a/src/scouter/llmcore/agent.py b/src/scouter/llmcore/agent.py index 4e6136c..1fbaa82 100644 --- a/src/scouter/llmcore/agent.py +++ b/src/scouter/llmcore/agent.py @@ -7,71 +7,38 @@ from time import time from typing import TYPE_CHECKING, cast -from openai.types.chat import ( - ChatCompletion, - ChatCompletionMessageParam, - ChatCompletionToolMessageParam, - ChatCompletionToolUnionParam, -) +from .client import ChatCompletionOptions, call_llm +from .exceptions import InvalidRunStateError +from .flow import Flow, LLMStep, ToolCall, ToolStep +from .memory import MemoryFunction, full_history_memory +from .tools import run_tool if TYPE_CHECKING: from collections.abc import Callable, Iterable - from openai.types.chat.chat_completion_message_tool_call import ( + from .types import ( + ChatCompletion, + ChatCompletionMessageParam, ChatCompletionMessageToolCall, + ChatCompletionToolUnionParam, ) -from .client import ChatCompletionOptions, call_llm -from .exceptions import InvalidRunStateError -from .flow import add_step_to_flow, create_flow, mark_flow_completed, mark_flow_running -from .memory import MemoryFunction, full_history_memory -from .tools import run_tool logger = logging.getLogger(__name__) -@dataclass -class LLMStep: - input: list[ChatCompletionMessageParam] - completion: ChatCompletion - - @property - def message(self) -> ChatCompletionMessageParam: - return cast("ChatCompletionMessageParam", self.completion.choices[0].message) - - -@dataclass -class ToolStep: - tool_call_id: str - tool_name: str - args: dict - output: str - execution_time: float - success: bool - error_message: 
str | None - - @property - def message(self) -> ChatCompletionToolMessageParam: - return ChatCompletionToolMessageParam( - role="tool", content=self.output, tool_call_id=self.tool_call_id - ) - - -Step = LLMStep | ToolStep - - @dataclass class AgentRun: continue_condition: Callable[[AgentRun], bool] = field( default_factory=lambda: default_continue_condition_factory() ) - flows: list[dict] = field(default_factory=list) + flows: list[Flow] = field(default_factory=list) memory_function: MemoryFunction = field(default=full_history_memory) agents: dict[str, Callable[[], AgentRun]] = field( default_factory=dict ) # For multi-agent: factory functions - def add_flow(self, flow: dict) -> None: + def add_flow(self, flow: Flow) -> None: """Add a flow to the run.""" self.flows.append(flow) @@ -79,17 +46,17 @@ def get_context(self) -> list[ChatCompletionMessageParam]: """Get configurable memory context instead of flat history.""" return self.memory_function(self) - def run_sub_agent(self, agent_id: str) -> dict: + def run_sub_agent(self, agent_id: str) -> Flow: """Run a sub-agent within this run, returning its flow.""" if agent_id not in self.agents: msg = f"Agent {agent_id} not registered" raise ValueError(msg) - flow = create_flow(flow_id=f"{agent_id}_{len(self.flows)}", agent_id=agent_id) - mark_flow_running(flow) + flow = Flow(id=f"{agent_id}_{len(self.flows)}", agent_id=agent_id) + flow.mark_running() self.add_flow(flow) # TODO: Integrate with run_agent for actual execution # For now, placeholder: assume sub_run executes and adds steps to flow - mark_flow_completed(flow) + flow.mark_completed() return flow @property @@ -98,7 +65,7 @@ def total_usage( ) -> dict: # Simplified, can make proper ChatCompletionUsage later total = {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0} for flow in self.flows: - for step in flow["steps"]: + for step in flow.steps: if isinstance(step, LLMStep) and step.completion.usage: usage = step.completion.usage total["completion_tokens"] += usage.completion_tokens or 0 @@ -113,14 +80,14 @@ def last_output(self) -> str: logger.error("Attempted to get last output from empty run") raise InvalidRunStateError(msg) last_flow = self.flows[-1] - if not last_flow["steps"]: + if not last_flow.steps: return "" - last_step = last_flow["steps"][-1] + last_step = last_flow.steps[-1] if isinstance(last_step, LLMStep): - content = last_step.message.get("content") - return content if isinstance(content, str) else "" + content = last_step.completion.choices[0].message.content + return content if content else "" if isinstance(last_step, ToolStep): - return last_step.output + return str(last_step.messages) return "" @property @@ -128,7 +95,7 @@ def tool_executions(self) -> list[ToolStep]: executions = [] for flow in self.flows: executions.extend( - [step for step in flow["steps"] if isinstance(step, ToolStep)] + [step for step in flow.steps if isinstance(step, ToolStep)] ) return executions @@ -156,7 +123,7 @@ def condition(run: AgentRun) -> bool: return condition -async def run_agent( +async def run_flow( run: AgentRun, model: str = "gpt-4o-mini", tools: Iterable[ChatCompletionToolUnionParam] | None = None, @@ -166,16 +133,16 @@ async def run_agent( logger.info( "Starting agent run with model=%s, initial_flows=%d", model, len(run.flows) ) - current_flow = create_flow(id=f"{agent_id}_main", agent_id=agent_id) - mark_flow_running(current_flow) + current_flow = Flow(id=f"{agent_id}_main", agent_id=agent_id) + current_flow.mark_running() run.add_flow(current_flow) while 
run.continue_condition(run): context = run.get_context() completion: ChatCompletion = call_llm(model, context, tools, options) msg = completion.choices[0].message - step = LLMStep(input=context, completion=completion) - add_step_to_flow(current_flow, step) + step = LLMStep(completion=completion) + current_flow.add_step(step) # Handle tool calls if msg.tool_calls: @@ -204,7 +171,7 @@ async def execute_single_tool(tc: ChatCompletionMessageToolCall): "Tool '%s' execution failed: %s", tc.function.name, str(e) ) end = time() - return ToolStep( + return ToolCall( tool_call_id=tc.id, tool_name=tc.function.name, args=args, @@ -217,7 +184,7 @@ async def execute_single_tool(tc: ChatCompletionMessageToolCall): # Execute all tools concurrently tasks = [execute_single_tool(tc) for tc in tool_calls] tool_steps = await asyncio.gather(*tasks, return_exceptions=True) - + success: list[ToolCall] = [] # Add steps to current flow for i, result in enumerate(tool_steps): if isinstance(result, Exception): @@ -228,19 +195,8 @@ async def execute_single_tool(tc: ChatCompletionMessageToolCall): tc.function.name, result, ) - add_step_to_flow( - current_flow, - ToolStep( - tool_call_id=tc.id, - tool_name=tc.function.name, - args=json.loads(tc.function.arguments), - output="", - execution_time=0.0, - success=False, - error_message=str(result), - ), - ) - else: - add_step_to_flow(current_flow, result) - mark_flow_completed(current_flow) + elif isinstance(result, ToolCall): + success.append(result) + current_flow.add_step(ToolStep(calls=success)) + current_flow.mark_completed() logger.info("Agent run completed with %d total flows", len(run.flows)) diff --git a/src/scouter/llmcore/flow.py b/src/scouter/llmcore/flow.py index 5e0a1b1..b447fd6 100644 --- a/src/scouter/llmcore/flow.py +++ b/src/scouter/llmcore/flow.py @@ -1,41 +1,85 @@ -"""Flow as dict for grouping steps in agent runs.""" +"""Flow dataclass for grouping steps in agent runs.""" -from typing import TYPE_CHECKING +import time +from dataclasses import dataclass, field +from typing import cast -if TYPE_CHECKING: - from .agent import Step +from .types import ( + ChatCompletion, + ChatCompletionMessageParam, + ChatCompletionToolMessageParam, +) -def create_flow( - flow_id: str, agent_id: str = "default", parent_flow_id: str | None = None -) -> dict: - """Create a flow dict.""" - return { - "id": flow_id, - "agent_id": agent_id, - "steps": [], - "status": "pending", - "metadata": {}, - "parent_flow_id": parent_flow_id, - } +@dataclass +class InputStep: + input: list[ChatCompletionMessageParam] + @property + def messages(self) -> list[ChatCompletionMessageParam]: + return self.input -def add_step_to_flow(flow: dict, step: "Step") -> None: - """Add a step to the flow.""" - flow["steps"].append(step) +@dataclass +class LLMStep: + completion: ChatCompletion -def mark_flow_running(flow: dict) -> None: - flow["status"] = "running" - flow["metadata"]["start_time"] = __import__("time").time() + @property + def messages(self) -> list[ChatCompletionMessageParam]: + return [cast("ChatCompletionMessageParam", self.completion.choices[0].message)] -def mark_flow_completed(flow: dict) -> None: - flow["status"] = "completed" - flow["metadata"]["end_time"] = __import__("time").time() +@dataclass +class ToolCall: + tool_call_id: str + tool_name: str + args: dict + output: str + execution_time: float + success: bool + error_message: str | None + @property + def message(self) -> ChatCompletionToolMessageParam: + return ChatCompletionToolMessageParam( + role="tool", content=self.output, 
tool_call_id=self.tool_call_id + ) -def mark_flow_failed(flow: dict, error: str) -> None: - flow["status"] = "failed" - flow["metadata"]["error"] = error - flow["metadata"]["end_time"] = __import__("time").time() + +@dataclass +class ToolStep: + calls: list[ToolCall] + + @property + def messages(self) -> list[ChatCompletionToolMessageParam]: + return [item.message for item in self.calls] + + +Step = LLMStep | ToolStep | InputStep + + +@dataclass +class Flow: + id: str + agent_id: str = "default" + steps: list[Step] = field(default_factory=list) + status: str = "pending" + metadata: dict = field(default_factory=dict) + parent_flow_id: str | None = None + + def add_step(self, step: "Step") -> None: + """Add a step to the flow.""" + self.steps.append(step) + + def mark_running(self) -> None: + self.status = "running" + self.metadata["start_time"] = time.time() + + def mark_completed(self) -> None: + self.status = "completed" + self.metadata["end_time"] = time.time() + + def mark_failed(self, error: str) -> None: + self.status = "failed" + self.metadata["error"] = error + self.metadata["end_time"] = time.time() diff --git a/src/scouter/llmcore/memory.py b/src/scouter/llmcore/memory.py index 835297b..8234ead 100644 --- a/src/scouter/llmcore/memory.py +++ b/src/scouter/llmcore/memory.py @@ -3,35 +3,19 @@ from collections.abc import Callable from typing import TYPE_CHECKING +from .types import ChatCompletionMessageParam + if TYPE_CHECKING: from .agent import AgentRun -def full_history_memory(run) -> list: +def full_history_memory(run: "AgentRun") -> list[ChatCompletionMessageParam]: """Memory that includes all conversation history.""" - messages = [] + messages: list[ChatCompletionMessageParam] = [] for flow in run.flows: - messages.extend(step.message for step in flow["steps"]) + for step in flow.steps: + messages.extend(step.messages) return messages -def summarized_memory(run, max_steps: int = 10) -> list: - """Memory that keeps recent steps.""" - all_messages = [] - for flow in run.flows: - all_messages.extend([step.message for step in flow["steps"]]) - - if len(all_messages) <= max_steps: - return all_messages - - # Placeholder: keep last N - return all_messages[-max_steps:] - - -def vector_memory(_run) -> list: - """Placeholder for vector-based memory.""" - # TODO: Implement - return [] - - -MemoryFunction = Callable[["AgentRun"], list] +MemoryFunction = Callable[["AgentRun"], list[ChatCompletionMessageParam]] diff --git a/tests/test_llmcore_agent.py b/tests/test_llmcore_agent.py index c673d09..f628a1c 100644 --- a/tests/test_llmcore_agent.py +++ b/tests/test_llmcore_agent.py @@ -1,15 +1,11 @@ """Tests for llmcore agent functionality.""" -from src.scouter.llmcore import ( - create_flow, - mark_flow_completed, - mark_flow_running, -) +from src.scouter.llmcore.flow import Flow def test_flow_status(): - flow = create_flow(flow_id="test") - mark_flow_running(flow) - assert flow["status"] == "running" - mark_flow_completed(flow) - assert flow["status"] == "completed" + flow = Flow(id="test") + flow.mark_running() + assert flow.status == "running" + flow.mark_completed() + assert flow.status == "completed" From 5c5fc553a11a1b58adc7015d4cbee78dbf17f87c Mon Sep 17 00:00:00 2001 From: farhoud Date: Mon, 15 Dec 2025 14:05:04 +0330 Subject: [PATCH 3/3] Fix import styles and update type exports - Change relative imports to absolute in models.py - Add ChatCompletionToolUnionParam to types.py exports --- src/scouter/db/models.py | 9 +++------ src/scouter/llmcore/types.py | 3 +++ 2 files changed, 6 
insertions(+), 6 deletions(-) diff --git a/src/scouter/db/models.py b/src/scouter/db/models.py index 6183b32..50a50e7 100644 --- a/src/scouter/db/models.py +++ b/src/scouter/db/models.py @@ -8,8 +8,9 @@ from typing import Any import neo4j -from ..llmcore.flow import LLMStep, ToolStep, InputStep -from ..llmcore.agent import AgentRun +from scouter.llmcore.agent import AgentRun +from scouter.llmcore.flow import InputStep, LLMStep, ToolCall, ToolStep +from scouter.llmcore.types import ChatCompletion # Flow is now dict @@ -127,14 +128,10 @@ def _serialize_step(self, step) -> dict[str, Any]: def _deserialize_step(self, step_type: str, data: str): """Deserialize a step from JSON.""" - from ..llmcore.types import ChatCompletion - data_dict = json.loads(data) if step_type == "LLMStep": return LLMStep(completion=ChatCompletion(**data_dict["completion"])) if step_type == "ToolStep": - from ..llmcore.flow import ToolCall - calls = [ToolCall(**call) for call in data_dict["calls"]] return ToolStep(calls=calls) if step_type == "InputStep": diff --git a/src/scouter/llmcore/types.py b/src/scouter/llmcore/types.py index 899b47e..bac6b35 100644 --- a/src/scouter/llmcore/types.py +++ b/src/scouter/llmcore/types.py @@ -9,6 +9,7 @@ ChatCompletionSystemMessageParam, ChatCompletionToolMessageParam, ChatCompletionToolParam, + ChatCompletionToolUnionParam, ChatCompletionUserMessageParam, ) from openai.types.chat.chat_completion_message_tool_call import ( @@ -24,9 +25,11 @@ "ChatCompletionMessage", "ChatCompletionMessageParam", "ChatCompletionMessageToolCall", + "ChatCompletionMessageToolCall", "ChatCompletionSystemMessageParam", "ChatCompletionToolMessageParam", "ChatCompletionToolParam", + "ChatCompletionToolUnionParam", "ChatCompletionUserMessageParam", "Prompt", ]
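
Applied in order, the three patches leave run_flow, the Flow and step
dataclasses, and the memory functions as the public surface. One gap worth
flagging: src/scouter/llmcore/__init__.py still imports run_agent, which
PATCH 2/3 renames to run_flow, so the package import needs the matching
one-line update. The sketch below is a minimal usage example assuming that
fix plus a configured OpenAI backend; it also passes an explicit continue
condition, because the default condition stops whenever the last step is not
a ToolStep and would therefore stall a run seeded with an InputStep:

import asyncio

from scouter.llmcore.agent import AgentRun, run_flow
from scouter.llmcore.flow import Flow, InputStep, ToolStep
from scouter.llmcore.memory import full_history_memory
from scouter.llmcore.messages import create_instruction


def continue_after_input(run: AgentRun) -> bool:
    # Keep looping after the seed input or a tool result; stop once the
    # model answers without requesting a tool.
    steps = [step for flow in run.flows for step in flow.steps]
    return not steps or isinstance(steps[-1], (InputStep, ToolStep))


async def main() -> None:
    seed = Flow(id="seed")
    seed.add_step(InputStep(input=create_instruction("You are helpful.", "Say hi.")))
    run = AgentRun(
        continue_condition=continue_after_input,
        flows=[seed],
        memory_function=full_history_memory,  # full transcript as LLM context
    )
    await run_flow(run, model="gpt-4o-mini")
    print(run.last_output)


asyncio.run(main())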
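
Because MemoryFunction is just Callable[["AgentRun"], list[ChatCompletionMessageParam]],
window or retrieval strategies drop in without touching the agent loop. Below
is a minimal sketch of the sliding-window variant that PATCH 1/3 prototyped as
summarized_memory and PATCH 2/3 dropped; the default window size is an
arbitrary choice for illustration:

from scouter.llmcore.memory import MemoryFunction
from scouter.llmcore.types import ChatCompletionMessageParam


def windowed_memory(max_messages: int = 20) -> MemoryFunction:
    """Build a memory function that keeps only the newest messages."""

    def memory(run) -> list[ChatCompletionMessageParam]:
        messages: list[ChatCompletionMessageParam] = []
        for flow in run.flows:
            for step in flow.steps:
                messages.extend(step.messages)  # every Step exposes .messages
        return messages[-max_messages:]  # keep the most recent window

    return memory

Wiring it up is one constructor argument: AgentRun(memory_function=windowed_memory(10)).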
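
Persistence goes through AgentRunRepository, which expects a reachable Neo4j
instance configured via NEO4J_URI, NEO4J_USER, and NEO4J_PASSWORD. A
round-trip sketch (the run id is a placeholder, and per the TODO in load_run,
flows and steps are restored but the memory function is not yet):

from scouter.db import AgentRunRepository, get_neo4j_driver

driver = get_neo4j_driver()
try:
    repo = AgentRunRepository(driver)
    repo.save_run(run, run_id="run-001")  # `run` as built in the sketch above
    restored = repo.load_run("run-001")   # AgentRun with flows/steps only
finally:
    driver.close()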