Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions core/decision.py
Original file line number Diff line number Diff line change
Expand Up @@ -153,11 +153,6 @@ def execute_decision_step(step: DecisionStep, step_num: int) -> tuple[int, str,
"""
print(f"Executing decision step: {step.name}")

# # Check that we have at least one decision
# if not step.decision:
# print("⚠ No decisions defined, defaulting to CONTINUE")
# return (SUCCESS, "CONTINUE", None)

# Get the mode from the first decision (all decisions in a step should have the same mode)
decision_mode = step.mode
print(f"Decision mode: {decision_mode}")
Expand Down
63 changes: 23 additions & 40 deletions core/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -776,6 +776,26 @@ def replace_field(match):
return Path(path)


def _resolve_agent_prompt(prompt: str, framework_root: str) -> str:
"""Resolve agent file references in prompts, loading and injecting agent content."""
agent_file_match = re.search(r'\./(?:\.opencode|prompts)/(agents?)/([^.\s]+)\.md', prompt)
if not agent_file_match:
return prompt

prompts_dir = os.environ.get('PROMPTS_DIR', str(Path(framework_root) / 'prompts'))
agent_file_path = Path(prompts_dir) / agent_file_match.group(1) / f"{agent_file_match.group(2)}.md"
if not agent_file_path.exists():
print(f" ⚠ Agent file not found: {agent_file_path}")
return prompt

print(f" Loading agent definition from: {agent_file_path}")
agent_content = agent_file_path.read_text()
parts = agent_content.split('---', 2)
if len(parts) >= 3:
return parts[2].strip()
return agent_content


def execute_task_step(step: TaskStep, step_num: int, step_id: str | None = None) -> tuple[int, str, str | None, str | None]:
"""
Execute a task step based on its model type.
Expand Down Expand Up @@ -913,26 +933,7 @@ def execute_task_step(step: TaskStep, step_num: int, step_id: str | None = None)
parser_script_file = Path(framework_root) / 'log_formatters' / 'claude_code.py'

# Process prompt - if it references an agent file, read and inject its content
prompt = step.prompt

agent_file_match = re.search(r'\./(?:\.opencode|prompts)/(agents?)/([^.\s]+)\.md', prompt)
if agent_file_match:
# Resolve agent file from PROMPTS_DIR
prompts_dir = os.environ.get('PROMPTS_DIR', str(Path(framework_root) / 'prompts'))
agent_file_path = Path(prompts_dir) / agent_file_match.group(1) / f"{agent_file_match.group(2)}.md"
if agent_file_path.exists():
print(f" Loading agent definition from: {agent_file_path}")
agent_content = agent_file_path.read_text()
# Extract content after frontmatter (between --- markers)
parts = agent_content.split('---', 2)
if len(parts) >= 3:
# Use everything after the second ---
prompt = parts[2].strip()
else:
# No frontmatter, use full content
prompt = agent_content
else:
print(f" ⚠ Agent file not found: {agent_file_path}")
prompt = _resolve_agent_prompt(step.prompt, framework_root)

# If repo_path is set, prefix command to cd there first
# Use RECON_FOUNDRY_ROOT (where foundry.toml lives) with fallback to RECON_REPO_PATH
Expand Down Expand Up @@ -991,25 +992,7 @@ def execute_task_step(step: TaskStep, step_num: int, step_id: str | None = None)
parser_script_file = Path(framework_root) / 'log_formatters' / 'opencode.py'

# Process prompt - if it references an agent file, read and inject its content
prompt = step.prompt
agent_file_match = re.search(r'\./(?:\.opencode|prompts)/(agents?)/([^.\s]+)\.md', prompt)
if agent_file_match:
# Resolve agent file from PROMPTS_DIR
prompts_dir = os.environ.get('PROMPTS_DIR', str(Path(framework_root) / 'prompts'))
agent_file_path = Path(prompts_dir) / agent_file_match.group(1) / f"{agent_file_match.group(2)}.md"
if agent_file_path.exists():
print(f" Loading agent definition from: {agent_file_path}")
agent_content = agent_file_path.read_text()
# Extract content after frontmatter (between --- markers)
parts = agent_content.split('---', 2)
if len(parts) >= 3:
# Use everything after the second ---
prompt = parts[2].strip()
else:
# No frontmatter, use full content
prompt = agent_content
else:
print(f" ⚠ Agent file not found: {agent_file_path}")
prompt = _resolve_agent_prompt(step.prompt, framework_root)

# If repo_path is set, prefix command to cd there first
# Use RECON_FOUNDRY_ROOT (where foundry.toml lives) with fallback to RECON_REPO_PATH
Expand Down Expand Up @@ -1143,7 +1126,7 @@ def execute_task_step(step: TaskStep, step_num: int, step_id: str | None = None)
error_body = ""
try:
error_body = e.response.json().get("message", str(e))
except:
except Exception:
error_body = str(e)
print(f" ❌ Failed to dispatch fuzzing job: {error_body}")
return (FAILURE, "CONTINUE", None, None)
Expand Down
53 changes: 1 addition & 52 deletions server/utils.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,7 @@
"""
Utility functions for logging, parsing, and dependency checking.
Utility functions for parsing.
"""

import subprocess
import time


def create_log_file() -> str:
"""
Create a timestamped log file path.
- Generate log ID from current timestamp
- Create path: artifacts/claude_logs_{timestamp}.json
- Return log file path
"""
log_id = time.strftime("%Y%m%d_%H%M%S")
log_file = f"artifacts/claude_logs_{log_id}.json"
return log_file


def parse_repo_info(repo_url: str) -> tuple[str, str]:
"""
Expand All @@ -40,39 +25,3 @@ def parse_repo_info(repo_url: str) -> tuple[str, str]:
except Exception as e:
print(f"Error parsing repo info: {e}")
return "", ""


def check_dependencies() -> dict[str, bool]:
"""
Check if required tools are installed.
- Check: forge, echidna, medusa, halmos, node, claude
- Return dict of {tool: is_installed}
"""
tools = ["forge", "echidna", "medusa", "halmos", "node", "claude", "git"]
status = {}

for tool in tools:
try:
result = subprocess.run(
["which", tool],
capture_output=True,
text=True
)
status[tool] = result.returncode == 0
except Exception:
status[tool] = False

return status


def install_missing_dependencies(deps: list[str]) -> bool:
"""
Install missing dependencies.
- Call appropriate installers for each missing dependency
- Return overall success status
"""
# This is a placeholder - actual implementation would need
# to handle different installation methods per dependency
print(f"Missing dependencies: {deps}")
print("Please install them manually or run the install_deps.sh script")
return False
31 changes: 0 additions & 31 deletions setup.py

This file was deleted.