diff --git a/src/scouter/db/__init__.py b/src/scouter/db/__init__.py
new file mode 100644
index 0000000..e725ba7
--- /dev/null
+++ b/src/scouter/db/__init__.py
@@ -0,0 +1,15 @@
+"""Database package for Scouter.
+
+This package provides abstractions and utilities for database operations,
+primarily focused on Neo4j for graph-based storage and retrieval.
+"""
+
+from .models import AgentRunRepository
+from .neo4j import get_neo4j_driver, get_neo4j_embedder, get_neo4j_llm
+
+__all__ = [
+    "AgentRunRepository",
+    "get_neo4j_driver",
+    "get_neo4j_embedder",
+    "get_neo4j_llm",
+]
diff --git a/src/scouter/db/models.py b/src/scouter/db/models.py
new file mode 100644
index 0000000..50a50e7
--- /dev/null
+++ b/src/scouter/db/models.py
@@ -0,0 +1,139 @@
+"""Database models and abstractions for Scouter.
+
+This module defines data models and operations for storing agent-related data
+in Neo4j, such as agent runs, flows, and steps.
+"""
+
+import json
+from typing import Any
+
+import neo4j
+from scouter.llmcore.agent import AgentRun
+from scouter.llmcore.flow import Flow, InputStep, LLMStep, ToolCall, ToolStep
+from scouter.llmcore.types import ChatCompletion
+
+
+class AgentRunRepository:
+    """Repository for managing AgentRun persistence in Neo4j."""
+
+    def __init__(self, driver: neo4j.Driver):
+        self.driver = driver
+
+    def save_run(self, run: AgentRun, run_id: str) -> None:
+        """Save an AgentRun to Neo4j."""
+        with self.driver.session() as session:
+            # Create run node
+            session.run(
+                """
+                MERGE (r:AgentRun {id: $id})
+                SET r.total_usage = $total_usage,
+                    r.memory_strategy = $memory_strategy
+                """,
+                id=run_id,
+                total_usage=json.dumps(run.total_usage),
+                memory_strategy=getattr(run.memory_function, "__name__", type(run.memory_function).__name__),
+            )
+
+            # Create flow nodes and relationships (MERGE so re-saving a run
+            # does not duplicate HAS_FLOW edges)
+            for flow in run.flows:
+                session.run(
+                    """
+                    MATCH (r:AgentRun {id: $run_id})
+                    MERGE (f:Flow {id: $flow_id})
+                    SET f.agent_id = $agent_id,
+                        f.status = $status,
+                        f.metadata = $metadata,
+                        f.parent_flow_id = $parent_flow_id
+                    MERGE (r)-[:HAS_FLOW]->(f)
+                    """,
+                    run_id=run_id,
+                    flow_id=flow.id,
+                    agent_id=flow.agent_id,
+                    status=flow.status,
+                    metadata=json.dumps(flow.metadata),
+                    parent_flow_id=flow.parent_flow_id,
+                )
+
+                # Create step nodes for each flow
+                for i, step in enumerate(flow.steps):
+                    step_data = self._serialize_step(step)
+                    session.run(
+                        """
+                        MATCH (f:Flow {id: $flow_id})
+                        CREATE (s:AgentStep {index: $index, type: $type, data: $data})
+                        CREATE (f)-[:HAS_STEP]->(s)
+                        """,
+                        flow_id=flow.id,
+                        index=i,
+                        type=type(step).__name__,
+                        data=json.dumps(step_data),
+                    )
+
+    def load_run(self, run_id: str) -> AgentRun | None:
+        """Load an AgentRun from Neo4j."""
+        with self.driver.session() as session:
+            # ORDER BY must run before collect() aggregates the steps away
+            result = session.run(
+                """
+                MATCH (r:AgentRun {id: $id})-[:HAS_FLOW]->(f:Flow)-[:HAS_STEP]->(s:AgentStep)
+                WITH r, f, s ORDER BY f.id, s.index
+                RETURN r, f, collect(s) as steps
+                """,
+                id=run_id,
+            )
+            records = list(result)
+            if not records:
+                return None
+
+            # TODO: Restore memory_function from the run node (records[0]["r"])
+            flows: dict[str, Flow] = {}
+            for record in records:
+                flow_node = record["f"]
+                step_nodes = record["steps"]
+
+                flow_id = flow_node["id"]
+                if flow_id not in flows:
+                    flows[flow_id] = Flow(
+                        id=flow_id,
+                        agent_id=flow_node["agent_id"],
+                        status=flow_node["status"],
+                        metadata=json.loads(flow_node["metadata"]),
+                        parent_flow_id=flow_node["parent_flow_id"],
+                    )
+
+                # Reconstruct steps
+                for step_node in step_nodes:
+                    step = self._deserialize_step(step_node["type"], step_node["data"])
+                    flows[flow_id].add_step(step)
+
+            return AgentRun(flows=list(flows.values()))
+
+    def _serialize_step(self, step) -> dict[str, Any]:
+        """Serialize a step to a dict."""
+        if isinstance(step, LLMStep):
+            return {
+                "completion": step.completion.model_dump(),
+            }
+        if isinstance(step, ToolStep):
+            return {
+                "calls": [call.__dict__ for call in step.calls],
+            }
+        if isinstance(step, InputStep):
+            return {
+                "input": step.input,
+            }
+        return {"data": str(step)}
+
+    def _deserialize_step(self, step_type: str, data: str):
+        """Deserialize a step from JSON."""
+        data_dict = json.loads(data)
+        if step_type == "LLMStep":
+            return LLMStep(completion=ChatCompletion(**data_dict["completion"]))
+        if step_type == "ToolStep":
+            calls = [ToolCall(**call) for call in data_dict["calls"]]
+            return ToolStep(calls=calls)
+        if step_type == "InputStep":
+            return InputStep(input=data_dict["input"])
+        msg = f"Unknown step type: {step_type}"
+        raise ValueError(msg)
diff --git a/src/scouter/db/neo4j.py b/src/scouter/db/neo4j.py
new file mode 100644
index 0000000..28f2979
--- /dev/null
+++ b/src/scouter/db/neo4j.py
@@ -0,0 +1,35 @@
+"""Database connections and utilities for Scouter.
+
+This module provides Neo4j driver setup and related database utilities.
+"""
+
+import os
+
+import neo4j
+from neo4j import GraphDatabase
+from neo4j_graphrag.embeddings import SentenceTransformerEmbeddings
+from neo4j_graphrag.llm import OpenAILLM
+
+
+def get_neo4j_driver() -> neo4j.Driver:
+    """Get a Neo4j driver instance."""
+    uri = os.getenv("NEO4J_URI", "bolt://localhost:7687")
+    user = os.getenv("NEO4J_USER", "neo4j")
+    password = os.getenv("NEO4J_PASSWORD", "password")
+    return GraphDatabase.driver(uri, auth=(user, password))
+
+
+def get_neo4j_llm() -> OpenAILLM:
+    """Get a Neo4j LLM instance configured for OpenAI."""
+    api_key = os.getenv("OPENAI_API_KEY")
+    if not api_key:
+        msg = "OPENAI_API_KEY environment variable is required"
+        raise ValueError(msg)
+    # api_key is a client kwarg; model_params are forwarded to each
+    # completion call, so the key must not go in model_params
+    return OpenAILLM(model_name="gpt-4o-mini", api_key=api_key)
+
+
+def get_neo4j_embedder() -> SentenceTransformerEmbeddings:
+    """Get a Neo4j embedder instance."""
+    return SentenceTransformerEmbeddings(model="all-MiniLM-L6-v2")
diff --git a/src/scouter/llmcore/__init__.py b/src/scouter/llmcore/__init__.py
index e3765e9..af1a775 100644
--- a/src/scouter/llmcore/__init__.py
+++ b/src/scouter/llmcore/__init__.py
@@ -1,4 +1,4 @@
-from .agent import AgentRun, run_agent
+from .agent import AgentRun, LLMStep, ToolStep, run_flow
 from .client import ChatCompletionOptions, LLMConfig, call_llm, create_llm_client
 from .exceptions import (
     AgentError,
@@ -50,10 +50,12 @@
     "InvalidToolDefinitionError",
     "LLMConfig",
     "LLMError",
+    "LLMStep",
     "MaxRetriesExceededError",
     "Prompt",
     "Tool",
     "ToolExecutionError",
+    "ToolStep",
     "call_llm",
     "create_instruction",
     "create_llm_client",
diff --git a/src/scouter/llmcore/agent.py b/src/scouter/llmcore/agent.py
index e810b5f..1fbaa82 100644
--- a/src/scouter/llmcore/agent.py
+++ b/src/scouter/llmcore/agent.py
@@ -7,106 +7,97 @@
 from time import time
 from typing import TYPE_CHECKING, cast
 
-from openai.types.chat import (
-    ChatCompletion,
-    ChatCompletionMessageParam,
-    ChatCompletionToolMessageParam,
-    ChatCompletionToolUnionParam,
-)
+from .client import ChatCompletionOptions, call_llm
+from .exceptions import InvalidRunStateError
+from .flow import Flow, InputStep, LLMStep, ToolCall, ToolStep
+from .memory import MemoryFunction, full_history_memory
+from .tools import run_tool
 
 if TYPE_CHECKING:
     from collections.abc import Callable, Iterable
 
-    from openai.types.chat.chat_completion_message_tool_call import (
+    from .types import (
+        ChatCompletion,
+        ChatCompletionMessageParam,
         ChatCompletionMessageToolCall,
+        ChatCompletionToolUnionParam,
     )
 
-from .client import ChatCompletionOptions, call_llm
-from .exceptions import InvalidRunStateError
-from .tools import run_tool
 
 logger = logging.getLogger(__name__)
 
 
-@dataclass
-class InputStep:
-    message: ChatCompletionMessageParam
-
-
-@dataclass
-class LLMStep:
-    completion: ChatCompletion
-
-    @property
-    def message(self) -> ChatCompletionMessageParam:
-        return cast("ChatCompletionMessageParam", self.completion.choices[0].message)
-
-
-@dataclass
-class ToolStep:
-    tool_call_id: str
-    tool_name: str
-    args: dict
-    output: str
-    execution_time: float
-    success: bool
-    error_message: str | None
-
-    @property
-    def message(self) -> ChatCompletionToolMessageParam:
-        return ChatCompletionToolMessageParam(
-            role="tool", content=self.output, tool_call_id=self.tool_call_id
-        )
-
-
-Step = InputStep | LLMStep | ToolStep
-
-
 @dataclass
 class AgentRun:
     continue_condition: Callable[[AgentRun], bool] = field(
         default_factory=lambda: default_continue_condition_factory()
     )
-    steps: list[Step] = field(default_factory=list)
-
-    def add_step(self, step: Step) -> None:
-        """Add a step to the run."""
-        self.steps.append(step)
-
-    @property
-    def conversation_history(self) -> list[ChatCompletionMessageParam]:
-        return [step.message for step in self.steps]
+    flows: list[Flow] = field(default_factory=list)
+    memory_function: MemoryFunction = field(default=full_history_memory)
+    agents: dict[str, Callable[[], AgentRun]] = field(
+        default_factory=dict
+    )  # For multi-agent: factory functions
+
+    def add_flow(self, flow: Flow) -> None:
+        """Add a flow to the run."""
+        self.flows.append(flow)
+
+    def get_context(self) -> list[ChatCompletionMessageParam]:
+        """Get configurable memory context instead of flat history."""
+        return self.memory_function(self)
+
+    def run_sub_agent(self, agent_id: str) -> Flow:
+        """Run a sub-agent within this run, returning its flow."""
+        if agent_id not in self.agents:
+            msg = f"Agent {agent_id} not registered"
+            raise ValueError(msg)
+        flow = Flow(id=f"{agent_id}_{len(self.flows)}", agent_id=agent_id)
+        flow.mark_running()
+        self.add_flow(flow)
+        # TODO: Integrate with run_flow for actual execution
+        # For now, placeholder: assume the sub-run executes and adds steps to flow
+        flow.mark_completed()
+        return flow
 
     @property
     def total_usage(
         self,
     ) -> dict:  # Simplified, can make proper ChatCompletionUsage later
         total = {"completion_tokens": 0, "prompt_tokens": 0, "total_tokens": 0}
-        for step in self.steps:
-            if isinstance(step, LLMStep) and step.completion.usage:
-                usage = step.completion.usage
-                total["completion_tokens"] += usage.completion_tokens or 0
-                total["prompt_tokens"] += usage.prompt_tokens or 0
-                total["total_tokens"] += usage.total_tokens or 0
+        for flow in self.flows:
+            for step in flow.steps:
+                if isinstance(step, LLMStep) and step.completion.usage:
+                    usage = step.completion.usage
+                    total["completion_tokens"] += usage.completion_tokens or 0
+                    total["prompt_tokens"] += usage.prompt_tokens or 0
+                    total["total_tokens"] += usage.total_tokens or 0
         return total
 
     @property
     def last_output(self) -> str:
-        if not self.steps:
-            msg = "No steps in run"
+        if not self.flows:
+            msg = "No flows in run"
             logger.error("Attempted to get last output from empty run")
             raise InvalidRunStateError(msg)
-        last_step = self.steps[-1]
+        last_flow = self.flows[-1]
+        if not last_flow.steps:
+            return ""
+        last_step = last_flow.steps[-1]
         if isinstance(last_step, LLMStep):
-            content = last_step.message.get("content")
-            return content if isinstance(content, str) else ""
+            content = last_step.completion.choices[0].message.content
+            return content if content else ""
         if isinstance(last_step, ToolStep):
-            return last_step.output
+            return "\n".join(call.output for call in last_step.calls)
         return ""
 
     @property
     def tool_executions(self) -> list[ToolStep]:
-        return [step for step in self.steps if isinstance(step, ToolStep)]
+        executions: list[ToolStep] = []
+        for flow in self.flows:
+            executions.extend(
+                step for step in flow.steps if isinstance(step, ToolStep)
+            )
+        return executions
 
 
 def default_continue_condition_factory(
@@ -114,36 +105,44 @@ def default_continue_condition_factory(
 ) -> Callable[[AgentRun], bool]:
     def condition(run: AgentRun) -> bool:
         if max_steps is not None:
-            llm_count = sum(1 for step in run.steps if isinstance(step, LLMStep))
+            llm_count = sum(
+                1
+                for flow in run.flows
+                for step in flow.steps
+                if isinstance(step, LLMStep)
+            )
             if llm_count >= max_steps:
                 return False
-        # Filter out InputStep to find the last meaningful step
-        non_input_steps = [
-            step for step in run.steps if not isinstance(step, InputStep)
-        ]
-        if not non_input_steps:
-            return True  # Only InputSteps present, initial state
-        last_non_input = non_input_steps[-1]
-        return isinstance(last_non_input, ToolStep)
+        # Continue after tool output or user input; stop after a plain LLM reply
+        all_steps = [step for flow in run.flows for step in flow.steps]
+        if not all_steps:
+            return True  # No steps yet
+        last_step = all_steps[-1]
+        return isinstance(last_step, (ToolStep, InputStep))
 
     return condition
 
 
-async def run_agent(
+async def run_flow(
     run: AgentRun,
     model: str = "gpt-4o-mini",
     tools: Iterable[ChatCompletionToolUnionParam] | None = None,
     options: ChatCompletionOptions | None = None,
+    agent_id: str = "default",
 ):
     logger.info(
-        "Starting agent run with model=%s, initial_steps=%d", model, len(run.steps)
+        "Starting agent run with model=%s, initial_flows=%d", model, len(run.flows)
    )
+    current_flow = Flow(id=f"{agent_id}_main", agent_id=agent_id)
+    current_flow.mark_running()
+    run.add_flow(current_flow)
+
     while run.continue_condition(run):
-        completion: ChatCompletion = call_llm(
-            model, run.conversation_history, tools, options
-        )
+        context = run.get_context()
+        completion: ChatCompletion = call_llm(model, context, tools, options)
         msg = completion.choices[0].message
-        run.add_step(LLMStep(completion))
+        step = LLMStep(completion=completion)
+        current_flow.add_step(step)
 
         # Handle tool calls
         if msg.tool_calls:
@@ -172,7 +171,7 @@ async def execute_single_tool(tc: ChatCompletionMessageToolCall):
                     "Tool '%s' execution failed: %s", tc.function.name, str(e)
                 )
             end = time()
-            return ToolStep(
+            return ToolCall(
                 tool_call_id=tc.id,
                 tool_name=tc.function.name,
                 args=args,
@@ -185,8 +184,8 @@ async def execute_single_tool(tc: ChatCompletionMessageToolCall):
         # Execute all tools concurrently
         tasks = [execute_single_tool(tc) for tc in tool_calls]
         tool_steps = await asyncio.gather(*tasks, return_exceptions=True)
-
-        # Add steps in the order of tool_calls to preserve conversation history
+        results: list[ToolCall] = []
+        # Add tool calls to the current flow in the order they were issued
         for i, result in enumerate(tool_steps):
             if isinstance(result, Exception):
                 # Handle unexpected errors in gather
@@ -196,17 +195,20 @@ async def execute_single_tool(tc: ChatCompletionMessageToolCall):
                     tc.function.name,
                     result,
                 )
-                run.add_step(
-                    ToolStep(
-                        tool_call_id=tc.id,
-                        tool_name=tc.function.name,
-                        args=json.loads(tc.function.arguments),
-                        output="",
-                        execution_time=0.0,
-                        success=False,
-                        error_message=str(result),
-                    )
-                )
-            else:
-                run.add_step(result)  # type: ignore[reportArgumentType]
-    logger.info("Agent run completed with %d total steps", len(run.steps))
+                # Record the failure so every tool_call_id still gets a tool message
+                results.append(
+                    ToolCall(
+                        tool_call_id=tc.id,
+                        tool_name=tc.function.name,
+                        args=json.loads(tc.function.arguments),
+                        output="",
+                        execution_time=0.0,
+                        success=False,
+                        error_message=str(result),
+                    )
+                )
+            elif isinstance(result, ToolCall):
+                results.append(result)
+        current_flow.add_step(ToolStep(calls=results))
+    current_flow.mark_completed()
+    logger.info("Agent run completed with %d total flows", len(run.flows))
diff --git a/src/scouter/llmcore/flow.py b/src/scouter/llmcore/flow.py
new file mode 100644
index 0000000..b447fd6
--- /dev/null
+++ b/src/scouter/llmcore/flow.py
@@ -0,0 +1,85 @@
+"""Flow dataclass for grouping steps in agent runs."""
+
+import time
+from dataclasses import dataclass, field
+from typing import cast
+
+from .types import (
+    ChatCompletion,
+    ChatCompletionMessageParam,
+    ChatCompletionToolMessageParam,
+)
+
+
+@dataclass
+class InputStep:
+    input: list[ChatCompletionMessageParam]
+
+    @property
+    def messages(self) -> list[ChatCompletionMessageParam]:
+        return self.input
+
+
+@dataclass
+class LLMStep:
+    completion: ChatCompletion
+
+    @property
+    def messages(self) -> list[ChatCompletionMessageParam]:
+        return [cast("ChatCompletionMessageParam", self.completion.choices[0].message)]
+
+
+@dataclass
+class ToolCall:
+    tool_call_id: str
+    tool_name: str
+    args: dict
+    output: str
+    execution_time: float
+    success: bool
+    error_message: str | None
+
+    @property
+    def message(self) -> ChatCompletionToolMessageParam:
+        return ChatCompletionToolMessageParam(
+            role="tool", content=self.output, tool_call_id=self.tool_call_id
+        )
+
+
+@dataclass
+class ToolStep:
+    calls: list[ToolCall]
+
+    @property
+    def messages(self) -> list[ChatCompletionToolMessageParam]:
+        return [item.message for item in self.calls]
+
+
+Step = LLMStep | ToolStep | InputStep
+
+
+@dataclass
+class Flow:
+    id: str
+    agent_id: str = "default"
+    steps: list[Step] = field(default_factory=list)
+    status: str = "pending"
+    metadata: dict = field(default_factory=dict)
+    parent_flow_id: str | None = None
+
+    def add_step(self, step: "Step") -> None:
+        """Add a step to the flow."""
+        self.steps.append(step)
+
+    def mark_running(self) -> None:
+        self.status = "running"
+        self.metadata["start_time"] = time.time()
+
+    def mark_completed(self) -> None:
+        self.status = "completed"
+        self.metadata["end_time"] = time.time()
+
+    def mark_failed(self, error: str) -> None:
+        self.status = "failed"
+        self.metadata["error"] = error
+        self.metadata["end_time"] = time.time()
diff --git a/src/scouter/llmcore/memory.py b/src/scouter/llmcore/memory.py
new file mode 100644
index 0000000..8234ead
--- /dev/null
+++ b/src/scouter/llmcore/memory.py
@@ -0,0 +1,21 @@
+"""Memory functions for configurable agent context."""
+
+from collections.abc import Callable
+from typing import TYPE_CHECKING
+
+from .types import ChatCompletionMessageParam
+
+if TYPE_CHECKING:
+    from .agent import AgentRun
+
+
+def full_history_memory(run: "AgentRun") -> list[ChatCompletionMessageParam]:
+    """Memory that includes all conversation history."""
+    messages: list[ChatCompletionMessageParam] = []
+    for flow in run.flows:
+        for step in flow.steps:
+            messages.extend(step.messages)
+    return messages
+
+
+MemoryFunction = Callable[["AgentRun"], list[ChatCompletionMessageParam]]
diff --git a/src/scouter/llmcore/messages.py b/src/scouter/llmcore/messages.py
index 034d8fd..d236a77 100644
--- a/src/scouter/llmcore/messages.py
+++ b/src/scouter/llmcore/messages.py
@@ -1,29 +1,15 @@
-import logging
+"""Message creation utilities."""
 
-from .agent import InputStep, Step
-from .types import ChatCompletionSystemMessageParam, ChatCompletionUserMessageParam
-
-logger = logging.getLogger(__name__)
+from .types import ChatCompletionMessageParam
 
 
 def create_instruction(
-    steps: list[Step], system: str | None = None, prompt: str | None = None
-) -> None:
-    """Add system and user messages to the steps list as InputStep instances."""
-    logger.debug(
-        "Creating instruction with system=%s, prompt=%s", bool(system), bool(prompt)
-    )
-    if system:
-        steps.append(
-            InputStep(
-                message=ChatCompletionSystemMessageParam(role="system", content=system)
-            )
-        )
-        logger.debug("Added system message to steps")
-    if prompt:
-        steps.append(
-            InputStep(
-                message=ChatCompletionUserMessageParam(role="user", content=prompt)
-            )
-        )
-        logger.debug("Added user message to steps")
+    system_prompt: str, user_prompt: str
+) -> list[ChatCompletionMessageParam]:
+    """Create instruction messages."""
+    messages: list[ChatCompletionMessageParam] = []
+    if system_prompt:
+        messages.append({"role": "system", "content": system_prompt})
+    if user_prompt:
+        messages.append({"role": "user", "content": user_prompt})
+    return messages
diff --git a/src/scouter/llmcore/types.py b/src/scouter/llmcore/types.py
index 899b47e..bac6b35 100644
--- a/src/scouter/llmcore/types.py
+++ b/src/scouter/llmcore/types.py
@@ -9,6 +9,7 @@
     ChatCompletionSystemMessageParam,
     ChatCompletionToolMessageParam,
     ChatCompletionToolParam,
+    ChatCompletionToolUnionParam,
     ChatCompletionUserMessageParam,
 )
 from openai.types.chat.chat_completion_message_tool_call import (
@@ -24,9 +25,10 @@
     "ChatCompletionMessage",
     "ChatCompletionMessageParam",
     "ChatCompletionMessageToolCall",
     "ChatCompletionSystemMessageParam",
     "ChatCompletionToolMessageParam",
     "ChatCompletionToolParam",
+    "ChatCompletionToolUnionParam",
     "ChatCompletionUserMessageParam",
     "Prompt",
 ]
diff --git a/tests/test_llmcore_agent.py b/tests/test_llmcore_agent.py
new file mode 100644
index 0000000..f628a1c
--- /dev/null
+++ b/tests/test_llmcore_agent.py
@@ -0,0 +1,11 @@
+"""Tests for llmcore agent functionality."""
+
+from scouter.llmcore.flow import Flow
+
+
+def test_flow_status():
+    flow = Flow(id="test")
+    flow.mark_running()
+    assert flow.status == "running"
+    flow.mark_completed()
+    assert flow.status == "completed"
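
Reviewer note — a minimal end-to-end sketch of the new flow-based API in this patch. This is a sketch only: it assumes `OPENAI_API_KEY` is configured for `call_llm` and reuses the `gpt-4o-mini` default from the diff; nothing below is added to the codebase.

```python
import asyncio

from scouter.llmcore.agent import (
    AgentRun,
    default_continue_condition_factory,
    run_flow,
)
from scouter.llmcore.flow import Flow, InputStep
from scouter.llmcore.memory import full_history_memory
from scouter.llmcore.messages import create_instruction


async def main() -> None:
    run = AgentRun(
        continue_condition=default_continue_condition_factory(max_steps=5),
        memory_function=full_history_memory,  # swap in any MemoryFunction here
    )

    # Seed a flow with the instruction messages; run_flow then opens its own
    # "<agent_id>_main" flow and appends LLM/tool steps to it.
    seed = Flow(id="seed")
    seed.add_step(InputStep(input=create_instruction("Be terse.", "What is 2 + 2?")))
    seed.mark_completed()
    run.add_flow(seed)

    await run_flow(run, model="gpt-4o-mini")
    print(run.last_output)  # content of the final LLMStep
    print(run.total_usage)  # aggregated across all flows


if __name__ == "__main__":
    asyncio.run(main())
```

Because the continue condition treats a trailing `InputStep` as pending work, seeding the run this way triggers the first LLM call; the loop then stops after the first plain (non-tool) LLM reply.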
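
And a sketch of the persistence round-trip (assumes a local Neo4j instance reachable through the `NEO4J_URI`/`NEO4J_USER`/`NEO4J_PASSWORD` variables read in `src/scouter/db/neo4j.py`; the `run-001` id is arbitrary):

```python
from scouter.db import AgentRunRepository, get_neo4j_driver
from scouter.llmcore.agent import AgentRun
from scouter.llmcore.flow import Flow, InputStep

# A small run to persist; any run produced by run_flow works the same way.
run = AgentRun()
flow = Flow(id="demo_main")
flow.mark_running()
flow.add_step(InputStep(input=[{"role": "user", "content": "hello"}]))
flow.mark_completed()
run.add_flow(flow)

driver = get_neo4j_driver()
try:
    repo = AgentRunRepository(driver)
    repo.save_run(run, run_id="run-001")
    restored = repo.load_run("run-001")
    assert restored is not None
    assert len(restored.flows) == len(run.flows)
    # Note: memory_function is not round-tripped yet (see the TODO in
    # load_run); restored runs fall back to full_history_memory.
finally:
    driver.close()
```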