Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions src/scouter/db/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
"""Database package for Scouter.

This package provides abstractions and utilities for database operations,
primarily focused on Neo4j for graph-based storage and retrieval.
"""

from .models import AgentRunRepository
from .neo4j import get_neo4j_driver, get_neo4j_embedder, get_neo4j_llm

__all__ = [
"AgentRunRepository",
"get_neo4j_driver",
"get_neo4j_embedder",
"get_neo4j_llm",
]
140 changes: 140 additions & 0 deletions src/scouter/db/models.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
"""Database models and abstractions for Scouter.

This module defines data models and operations for storing agent-related data
in Neo4j, such as agent runs, flows, and steps.
"""

import json
from typing import Any

import neo4j
from scouter.llmcore.agent import AgentRun
from scouter.llmcore.flow import InputStep, LLMStep, ToolCall, ToolStep
from scouter.llmcore.types import ChatCompletion

# Flow is now dict


class AgentRunRepository:
    """Repository for managing AgentRun persistence in Neo4j.

    A run is stored as an ``AgentRun`` node linked via ``HAS_FLOW`` to
    ``Flow`` nodes, each linked via ``HAS_STEP`` to ``AgentStep`` nodes.
    Step payloads are serialized to JSON strings on the step node.
    """

    def __init__(self, driver: neo4j.Driver):
        # The driver is owned by the caller; this repository only opens
        # short-lived sessions on it.
        self.driver = driver

    def save_run(self, run: AgentRun, run_id: str) -> None:
        """Save an AgentRun to Neo4j.

        Idempotent: re-saving the same ``run_id`` updates run/flow
        properties in place and replaces each flow's steps rather than
        duplicating relationships and step nodes.

        Args:
            run: The agent run to persist.
            run_id: Unique identifier for the run node.
        """
        with self.driver.session() as session:
            # Upsert the run node.
            session.run(
                """
                MERGE (r:AgentRun {id: $id})
                SET r.total_usage = $total_usage,
                    r.memory_strategy = $memory_strategy
                """,
                id=run_id,
                total_usage=json.dumps(run.total_usage),
                memory_strategy=type(run.memory_function).__name__,
            )

            for flow in run.flows:
                # MERGE the relationship (the original used CREATE, which
                # accumulated a duplicate HAS_FLOW edge on every save).
                session.run(
                    """
                    MATCH (r:AgentRun {id: $run_id})
                    MERGE (f:Flow {id: $flow_id})
                    SET f.agent_id = $agent_id,
                        f.status = $status,
                        f.metadata = $metadata,
                        f.parent_flow_id = $parent_flow_id
                    MERGE (r)-[:HAS_FLOW]->(f)
                    """,
                    run_id=run_id,
                    flow_id=flow.id,
                    agent_id=flow.agent_id,
                    status=flow.status,
                    metadata=json.dumps(flow.metadata),
                    parent_flow_id=flow.parent_flow_id,
                )

                # Drop any previously stored steps so a re-save does not
                # append a second copy of every step.
                session.run(
                    """
                    MATCH (f:Flow {id: $flow_id})-[:HAS_STEP]->(s:AgentStep)
                    DETACH DELETE s
                    """,
                    flow_id=flow.id,
                )

                # Recreate the steps in order; the index property preserves
                # step ordering for load_run.
                for i, step in enumerate(flow.steps):
                    step_data = self._serialize_step(step)
                    session.run(
                        """
                        MATCH (f:Flow {id: $flow_id})
                        CREATE (s:AgentStep {index: $index, type: $type, data: $data})
                        CREATE (f)-[:HAS_STEP]->(s)
                        """,
                        flow_id=flow.id,
                        index=i,
                        type=type(step).__name__,
                        data=json.dumps(step_data),
                    )

    def load_run(self, run_id: str) -> AgentRun | None:
        """Load an AgentRun from Neo4j.

        Args:
            run_id: Identifier previously passed to :meth:`save_run`.

        Returns:
            The reconstructed run, or ``None`` when no flow/step data
            exists for ``run_id``.
        """
        with self.driver.session() as session:
            # Order rows *before* aggregating: ORDER BY cannot reference a
            # variable (s) that collect() has already rolled up, so the
            # sort must happen in a WITH clause ahead of the RETURN.
            result = session.run(
                """
                MATCH (r:AgentRun {id: $id})-[:HAS_FLOW]->(f:Flow)-[:HAS_STEP]->(s:AgentStep)
                WITH r, f, s
                ORDER BY f.id, s.index
                RETURN r, f, collect(s) AS steps
                """,
                id=run_id,
            )
            records = list(result)
            if not records:
                return None

            flows: dict[str, dict[str, Any]] = {}
            for record in records:
                flow_node = record["f"]
                step_nodes = record["steps"]

                flow_id = flow_node["id"]
                if flow_id not in flows:
                    flows[flow_id] = {
                        "id": flow_id,
                        "agent_id": flow_node["agent_id"],
                        "status": flow_node["status"],
                        "metadata": json.loads(flow_node["metadata"]),
                        "parent_flow_id": flow_node["parent_flow_id"],
                        "steps": [],
                    }

                # Steps arrive already ordered by index (see query above).
                for step_node in step_nodes:
                    step = self._deserialize_step(step_node["type"], step_node["data"])
                    flows[flow_id]["steps"].append(step)

            # TODO: Restore memory_function from the run node's
            # memory_strategy property (record["r"]).
            return AgentRun(flows=list(flows.values()))

    def _serialize_step(self, step) -> dict[str, Any]:
        """Serialize a step to a JSON-safe dict keyed by step type."""
        if isinstance(step, LLMStep):
            return {
                "completion": step.completion.model_dump(),
            }
        if isinstance(step, ToolStep):
            return {
                "calls": [call.__dict__ for call in step.calls],
            }
        if isinstance(step, InputStep):
            return {
                "input": step.input,
            }
        # Fallback for unknown step types: store their string form so the
        # save never fails, at the cost of lossy round-tripping.
        return {"data": str(step)}

    def _deserialize_step(self, step_type: str, data: str):
        """Deserialize a step from its type name and JSON payload.

        Raises:
            ValueError: If ``step_type`` is not a known step class name.
        """
        data_dict = json.loads(data)
        if step_type == "LLMStep":
            return LLMStep(completion=ChatCompletion(**data_dict["completion"]))
        if step_type == "ToolStep":
            calls = [ToolCall(**call) for call in data_dict["calls"]]
            return ToolStep(calls=calls)
        if step_type == "InputStep":
            return InputStep(input=data_dict["input"])
        msg = f"Unknown step type: {step_type}"
        raise ValueError(msg)
34 changes: 34 additions & 0 deletions src/scouter/db/neo4j.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
"""Database connections and utilities for Scouter.

This module provides Neo4j driver setup and related database utilities.
"""

import os

from neo4j_graphrag.embeddings import SentenceTransformerEmbeddings
from neo4j_graphrag.llm import OpenAILLM

import neo4j
from neo4j import GraphDatabase


def get_neo4j_driver() -> neo4j.Driver:
    """Get a Neo4j driver instance.

    Connection settings are read from the NEO4J_URI, NEO4J_USER and
    NEO4J_PASSWORD environment variables, falling back to local-development
    defaults when unset.
    """
    env = os.getenv
    return GraphDatabase.driver(
        env("NEO4J_URI", "bolt://localhost:7687"),
        auth=(env("NEO4J_USER", "neo4j"), env("NEO4J_PASSWORD", "password")),
    )


def get_neo4j_llm() -> OpenAILLM:
    """Get a Neo4j LLM instance configured for OpenAI.

    Returns:
        An ``OpenAILLM`` using the ``gpt-4o-mini`` model.

    Raises:
        ValueError: If the OPENAI_API_KEY environment variable is not set.
    """
    api_key = os.getenv("OPENAI_API_KEY")
    if not api_key:
        msg = "OPENAI_API_KEY environment variable is required"
        raise ValueError(msg)
    # Pass the key as a client kwarg: OpenAILLM forwards **kwargs to the
    # OpenAI client constructor, whereas model_params are attached to every
    # chat-completion request — where `api_key` is not a valid parameter
    # and would make each completion call fail.
    return OpenAILLM(model_name="gpt-4o-mini", api_key=api_key)


def get_neo4j_embedder() -> SentenceTransformerEmbeddings:
    """Get a Neo4j embedder instance backed by a local sentence-transformer."""
    model_name = "all-MiniLM-L6-v2"
    return SentenceTransformerEmbeddings(model=model_name)
4 changes: 3 additions & 1 deletion src/scouter/llmcore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from .agent import AgentRun, run_agent
from .agent import AgentRun, LLMStep, ToolStep, run_agent
from .client import ChatCompletionOptions, LLMConfig, call_llm, create_llm_client
from .exceptions import (
AgentError,
Expand Down Expand Up @@ -50,10 +50,12 @@
"InvalidToolDefinitionError",
"LLMConfig",
"LLMError",
"LLMStep",
"MaxRetriesExceededError",
"Prompt",
"Tool",
"ToolExecutionError",
"ToolStep",
"call_llm",
"create_instruction",
"create_llm_client",
Expand Down
Loading
Loading