Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/arbiter/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from arbiter.analyzers.duplication_analyzer import DuplicationAnalyzer
from arbiter.analyzers.ruff_analyzer import RuffAnalyzer
from arbiter.analyzers.security_analyzer import SecurityAnalyzer
from arbiter.analyzers.semgrep_analyzer import SemgrepAnalyzer
from arbiter.diff_analyzer import score_commit, score_diff
from arbiter.git_historian import count_loc, walk_commits
from arbiter.scoring import RepoScore, score_findings
Expand All @@ -47,6 +48,7 @@ def _get_analyzers() -> list[Analyzer]:
RuffAnalyzer(),
ComplexityAnalyzer(),
SecurityAnalyzer(),
SemgrepAnalyzer(), # opt-in: requires ARBITER_ENABLE_SEMGREP=1
DeadCodeAnalyzer(),
DuplicationAnalyzer(),
]
Expand Down
116 changes: 116 additions & 0 deletions src/arbiter/analyzers/semgrep_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
"""Semgrep Analyzer — SAST analysis via semgrep CLI (opt-in).

Gated by the ``ARBITER_ENABLE_SEMGREP`` environment variable. Semgrep is
disabled by default because rule loading + scanning adds 45-60 seconds to
``arbiter score`` on typical Python repos, which pushes CI over the 120s
subprocess timeout used by downstream consumers (e.g. founder-mode's
``arbiter_adapter.get_quality_digest()``).

Opt in by setting ``ARBITER_ENABLE_SEMGREP=1`` in the environment. When
enabled, the analyzer uses the curated ``p/security`` ruleset (not
``--config auto``) to keep runtime predictable.

Reference: regression investigation 2026-04-05 identified an uncommitted
SemgrepAnalyzer with a 600s timeout and ``--config auto`` as the root
cause of a 5s → 45-60s ``arbiter score`` slowdown.
"""

from __future__ import annotations

import json
import os
import subprocess
from pathlib import Path

from arbiter.analyzers.base import Analyzer, Finding

# Env var that enables the analyzer. Disabled by default.
_ENABLE_ENV = "ARBITER_ENABLE_SEMGREP"

# Curated ruleset — faster than --config auto and predictable scope.
# Override with ARBITER_SEMGREP_CONFIG=<config> for experimentation.
_DEFAULT_CONFIG = "p/security"

# Timeout aligned with other analyzers (120s) rather than 600s.
_TIMEOUT_SECONDS = 120

# Map semgrep severity strings to arbiter's levels.
_SEVERITY_MAP = {
"ERROR": "HIGH",
"WARNING": "MEDIUM",
"INFO": "LOW",
# semgrep also emits uppercase CRITICAL/HIGH/MEDIUM/LOW via some rulesets
"CRITICAL": "CRITICAL",
"HIGH": "HIGH",
"MEDIUM": "MEDIUM",
"LOW": "LOW",
}


def _map_severity(raw: str) -> str:
return _SEVERITY_MAP.get((raw or "").upper(), "MEDIUM")


class SemgrepAnalyzer(Analyzer):
"""Runs semgrep scan (opt-in) and parses JSON output."""

@property
def name(self) -> str:
return "semgrep"

def is_available(self) -> bool:
# Opt-in gate: require explicit env var.
if os.environ.get(_ENABLE_ENV, "").strip() not in ("1", "true", "TRUE", "yes"):
return False
# Tool-present check.
try:
subprocess.run(
["semgrep", "--version"], capture_output=True, timeout=5,
)
return True
except (FileNotFoundError, subprocess.TimeoutExpired):
return False

def analyze_repo(
self, repo_path: Path, exclude_paths: list[str] | None = None,
) -> list[Finding]:
config = os.environ.get("ARBITER_SEMGREP_CONFIG", _DEFAULT_CONFIG)
cmd = [
"semgrep", "scan",
"--config", config,
"--json",
"--quiet",
"--metrics", "off",
]
for ep in exclude_paths or []:
cmd.extend(["--exclude", ep])
cmd.append(str(repo_path))

try:
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=_TIMEOUT_SECONDS,
)
except subprocess.TimeoutExpired:
return []

if not result.stdout.strip():
return []

try:
data = json.loads(result.stdout)
except json.JSONDecodeError:
return []

findings: list[Finding] = []
for item in data.get("results", []):
extra = item.get("extra", {}) or {}
start = item.get("start", {}) or {}
findings.append(Finding(
file_path=item.get("path", ""),
line=start.get("line", 0),
severity=_map_severity(extra.get("severity", "")),
rule_id=item.get("check_id", ""),
message=extra.get("message", ""),
tool="semgrep",
))
return findings
157 changes: 157 additions & 0 deletions tests/test_semgrep_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
"""Tests for SemgrepAnalyzer opt-in behavior and JSON parsing."""

import json
import os
import subprocess
from pathlib import Path
from unittest.mock import MagicMock, patch

import pytest

from arbiter.analyzers.semgrep_analyzer import SemgrepAnalyzer


@pytest.fixture(autouse=True)
def _clean_env(monkeypatch):
"""Ensure each test starts with a known env state."""
monkeypatch.delenv("ARBITER_ENABLE_SEMGREP", raising=False)
monkeypatch.delenv("ARBITER_SEMGREP_CONFIG", raising=False)


class TestSemgrepOptIn:
def test_disabled_by_default(self):
"""Without ARBITER_ENABLE_SEMGREP, is_available() must return False."""
analyzer = SemgrepAnalyzer()
assert analyzer.is_available() is False

def test_disabled_when_env_is_false(self, monkeypatch):
"""ARBITER_ENABLE_SEMGREP=0/false/empty all disable."""
for val in ("", "0", "false", "no", "off"):
monkeypatch.setenv("ARBITER_ENABLE_SEMGREP", val)
assert SemgrepAnalyzer().is_available() is False

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_enabled_when_env_set_and_binary_present(self, mock_run, monkeypatch):
"""ARBITER_ENABLE_SEMGREP=1 + semgrep binary present → available."""
monkeypatch.setenv("ARBITER_ENABLE_SEMGREP", "1")
mock_run.return_value = MagicMock(returncode=0)
assert SemgrepAnalyzer().is_available() is True

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_disabled_when_env_set_but_binary_missing(self, mock_run, monkeypatch):
"""Env opt-in alone is not enough — binary must also exist."""
monkeypatch.setenv("ARBITER_ENABLE_SEMGREP", "1")
mock_run.side_effect = FileNotFoundError()
assert SemgrepAnalyzer().is_available() is False

def test_accepted_truthy_values(self, monkeypatch):
"""Accept 1, true, TRUE, yes — reject everything else."""
with patch(
"arbiter.analyzers.semgrep_analyzer.subprocess.run",
return_value=MagicMock(returncode=0),
):
for val in ("1", "true", "TRUE", "yes"):
monkeypatch.setenv("ARBITER_ENABLE_SEMGREP", val)
assert SemgrepAnalyzer().is_available() is True, f"val={val!r} should enable"


class TestSemgrepParsing:
def _sample_semgrep_output(self) -> str:
return json.dumps({
"results": [
{
"check_id": "python.lang.security.audit.eval-detected",
"path": "src/eval_bad.py",
"start": {"line": 10, "col": 1},
"extra": {
"severity": "ERROR",
"message": "eval() use is dangerous",
},
},
{
"check_id": "python.lang.correctness.unused-import",
"path": "src/utils.py",
"start": {"line": 2, "col": 1},
"extra": {
"severity": "WARNING",
"message": "unused import",
},
},
],
})

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_parses_json_output(self, mock_run):
mock_run.return_value = MagicMock(
stdout=self._sample_semgrep_output(), returncode=0,
)
findings = SemgrepAnalyzer().analyze_repo(Path("/fake/repo"))
assert len(findings) == 2
assert all(f.tool == "semgrep" for f in findings)

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_severity_mapping(self, mock_run):
mock_run.return_value = MagicMock(
stdout=self._sample_semgrep_output(), returncode=0,
)
findings = SemgrepAnalyzer().analyze_repo(Path("/fake"))
by_rule = {f.rule_id: f for f in findings}
assert by_rule[
"python.lang.security.audit.eval-detected"
].severity == "HIGH"
assert by_rule[
"python.lang.correctness.unused-import"
].severity == "MEDIUM"

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_handles_empty_output(self, mock_run):
mock_run.return_value = MagicMock(stdout="", returncode=0)
assert SemgrepAnalyzer().analyze_repo(Path("/fake")) == []

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_handles_invalid_json(self, mock_run):
mock_run.return_value = MagicMock(stdout="not json {{{", returncode=0)
assert SemgrepAnalyzer().analyze_repo(Path("/fake")) == []

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_handles_timeout(self, mock_run):
mock_run.side_effect = subprocess.TimeoutExpired(cmd="semgrep", timeout=120)
assert SemgrepAnalyzer().analyze_repo(Path("/fake")) == []

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_timeout_is_120s_not_600s(self, mock_run):
"""Regression guard: timeout must stay at 120s to fit CI contract."""
mock_run.return_value = MagicMock(stdout="", returncode=0)
SemgrepAnalyzer().analyze_repo(Path("/fake"))
_, kwargs = mock_run.call_args
assert kwargs.get("timeout") == 120

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_uses_curated_ruleset_by_default(self, mock_run):
"""Regression guard: don't use --config auto, it's too slow."""
mock_run.return_value = MagicMock(stdout="", returncode=0)
SemgrepAnalyzer().analyze_repo(Path("/fake"))
cmd = mock_run.call_args[0][0]
assert "--config" in cmd
config_idx = cmd.index("--config")
assert cmd[config_idx + 1] == "p/security"
assert "auto" not in cmd

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_config_override_via_env(self, mock_run, monkeypatch):
monkeypatch.setenv("ARBITER_SEMGREP_CONFIG", "p/python")
mock_run.return_value = MagicMock(stdout="", returncode=0)
SemgrepAnalyzer().analyze_repo(Path("/fake"))
cmd = mock_run.call_args[0][0]
config_idx = cmd.index("--config")
assert cmd[config_idx + 1] == "p/python"

@patch("arbiter.analyzers.semgrep_analyzer.subprocess.run")
def test_passes_exclude_paths(self, mock_run):
mock_run.return_value = MagicMock(stdout="", returncode=0)
SemgrepAnalyzer().analyze_repo(
Path("/fake"), exclude_paths=["tests/*", "build/*"],
)
cmd = mock_run.call_args[0][0]
assert "tests/*" in cmd
assert "build/*" in cmd
Loading