diff --git a/src/arbiter/__main__.py b/src/arbiter/__main__.py index 43169b4..40f053e 100644 --- a/src/arbiter/__main__.py +++ b/src/arbiter/__main__.py @@ -25,6 +25,7 @@ from arbiter.analyzers.duplication_analyzer import DuplicationAnalyzer from arbiter.analyzers.ruff_analyzer import RuffAnalyzer from arbiter.analyzers.security_analyzer import SecurityAnalyzer +from arbiter.analyzers.semgrep_analyzer import SemgrepAnalyzer from arbiter.diff_analyzer import score_commit, score_diff from arbiter.git_historian import count_loc, walk_commits from arbiter.scoring import RepoScore, score_findings @@ -47,6 +48,7 @@ def _get_analyzers() -> list[Analyzer]: RuffAnalyzer(), ComplexityAnalyzer(), SecurityAnalyzer(), + SemgrepAnalyzer(), # opt-in: requires ARBITER_ENABLE_SEMGREP=1 DeadCodeAnalyzer(), DuplicationAnalyzer(), ] diff --git a/src/arbiter/analyzers/semgrep_analyzer.py b/src/arbiter/analyzers/semgrep_analyzer.py new file mode 100644 index 0000000..a199204 --- /dev/null +++ b/src/arbiter/analyzers/semgrep_analyzer.py @@ -0,0 +1,116 @@ +"""Semgrep Analyzer — SAST analysis via semgrep CLI (opt-in). + +Gated by the ``ARBITER_ENABLE_SEMGREP`` environment variable. Semgrep is +disabled by default because rule loading + scanning adds 45-60 seconds to +``arbiter score`` on typical Python repos, which pushes CI over the 120s +subprocess timeout used by downstream consumers (e.g. founder-mode's +``arbiter_adapter.get_quality_digest()``). + +Opt in by setting ``ARBITER_ENABLE_SEMGREP=1`` in the environment. When +enabled, the analyzer uses the curated ``p/security`` ruleset (not +``--config auto``) to keep runtime predictable. + +Reference: regression investigation 2026-04-05 identified an uncommitted +SemgrepAnalyzer with a 600s timeout and ``--config auto`` as the root +cause of a 5s → 45-60s ``arbiter score`` slowdown. +""" + +from __future__ import annotations + +import json +import os +import subprocess +from pathlib import Path + +from arbiter.analyzers.base import Analyzer, Finding + +# Env var that enables the analyzer. Disabled by default. +_ENABLE_ENV = "ARBITER_ENABLE_SEMGREP" + +# Curated ruleset — faster than --config auto and predictable scope. +# Override with ARBITER_SEMGREP_CONFIG= for experimentation. +_DEFAULT_CONFIG = "p/security" + +# Timeout aligned with other analyzers (120s) rather than 600s. +_TIMEOUT_SECONDS = 120 + +# Map semgrep severity strings to arbiter's levels. +_SEVERITY_MAP = { + "ERROR": "HIGH", + "WARNING": "MEDIUM", + "INFO": "LOW", + # semgrep also emits uppercase CRITICAL/HIGH/MEDIUM/LOW via some rulesets + "CRITICAL": "CRITICAL", + "HIGH": "HIGH", + "MEDIUM": "MEDIUM", + "LOW": "LOW", +} + + +def _map_severity(raw: str) -> str: + return _SEVERITY_MAP.get((raw or "").upper(), "MEDIUM") + + +class SemgrepAnalyzer(Analyzer): + """Runs semgrep scan (opt-in) and parses JSON output.""" + + @property + def name(self) -> str: + return "semgrep" + + def is_available(self) -> bool: + # Opt-in gate: require explicit env var. + if os.environ.get(_ENABLE_ENV, "").strip() not in ("1", "true", "TRUE", "yes"): + return False + # Tool-present check. + try: + subprocess.run( + ["semgrep", "--version"], capture_output=True, timeout=5, + ) + return True + except (FileNotFoundError, subprocess.TimeoutExpired): + return False + + def analyze_repo( + self, repo_path: Path, exclude_paths: list[str] | None = None, + ) -> list[Finding]: + config = os.environ.get("ARBITER_SEMGREP_CONFIG", _DEFAULT_CONFIG) + cmd = [ + "semgrep", "scan", + "--config", config, + "--json", + "--quiet", + "--metrics", "off", + ] + for ep in exclude_paths or []: + cmd.extend(["--exclude", ep]) + cmd.append(str(repo_path)) + + try: + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=_TIMEOUT_SECONDS, + ) + except subprocess.TimeoutExpired: + return [] + + if not result.stdout.strip(): + return [] + + try: + data = json.loads(result.stdout) + except json.JSONDecodeError: + return [] + + findings: list[Finding] = [] + for item in data.get("results", []): + extra = item.get("extra", {}) or {} + start = item.get("start", {}) or {} + findings.append(Finding( + file_path=item.get("path", ""), + line=start.get("line", 0), + severity=_map_severity(extra.get("severity", "")), + rule_id=item.get("check_id", ""), + message=extra.get("message", ""), + tool="semgrep", + )) + return findings diff --git a/tests/test_semgrep_analyzer.py b/tests/test_semgrep_analyzer.py new file mode 100644 index 0000000..7e4504a --- /dev/null +++ b/tests/test_semgrep_analyzer.py @@ -0,0 +1,157 @@ +"""Tests for SemgrepAnalyzer opt-in behavior and JSON parsing.""" + +import json +import os +import subprocess +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from arbiter.analyzers.semgrep_analyzer import SemgrepAnalyzer + + +@pytest.fixture(autouse=True) +def _clean_env(monkeypatch): + """Ensure each test starts with a known env state.""" + monkeypatch.delenv("ARBITER_ENABLE_SEMGREP", raising=False) + monkeypatch.delenv("ARBITER_SEMGREP_CONFIG", raising=False) + + +class TestSemgrepOptIn: + def test_disabled_by_default(self): + """Without ARBITER_ENABLE_SEMGREP, is_available() must return False.""" + analyzer = SemgrepAnalyzer() + assert analyzer.is_available() is False + + def test_disabled_when_env_is_false(self, monkeypatch): + """ARBITER_ENABLE_SEMGREP=0/false/empty all disable.""" + for val in ("", "0", "false", "no", "off"): + monkeypatch.setenv("ARBITER_ENABLE_SEMGREP", val) + assert SemgrepAnalyzer().is_available() is False + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_enabled_when_env_set_and_binary_present(self, mock_run, monkeypatch): + """ARBITER_ENABLE_SEMGREP=1 + semgrep binary present → available.""" + monkeypatch.setenv("ARBITER_ENABLE_SEMGREP", "1") + mock_run.return_value = MagicMock(returncode=0) + assert SemgrepAnalyzer().is_available() is True + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_disabled_when_env_set_but_binary_missing(self, mock_run, monkeypatch): + """Env opt-in alone is not enough — binary must also exist.""" + monkeypatch.setenv("ARBITER_ENABLE_SEMGREP", "1") + mock_run.side_effect = FileNotFoundError() + assert SemgrepAnalyzer().is_available() is False + + def test_accepted_truthy_values(self, monkeypatch): + """Accept 1, true, TRUE, yes — reject everything else.""" + with patch( + "arbiter.analyzers.semgrep_analyzer.subprocess.run", + return_value=MagicMock(returncode=0), + ): + for val in ("1", "true", "TRUE", "yes"): + monkeypatch.setenv("ARBITER_ENABLE_SEMGREP", val) + assert SemgrepAnalyzer().is_available() is True, f"val={val!r} should enable" + + +class TestSemgrepParsing: + def _sample_semgrep_output(self) -> str: + return json.dumps({ + "results": [ + { + "check_id": "python.lang.security.audit.eval-detected", + "path": "src/eval_bad.py", + "start": {"line": 10, "col": 1}, + "extra": { + "severity": "ERROR", + "message": "eval() use is dangerous", + }, + }, + { + "check_id": "python.lang.correctness.unused-import", + "path": "src/utils.py", + "start": {"line": 2, "col": 1}, + "extra": { + "severity": "WARNING", + "message": "unused import", + }, + }, + ], + }) + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_parses_json_output(self, mock_run): + mock_run.return_value = MagicMock( + stdout=self._sample_semgrep_output(), returncode=0, + ) + findings = SemgrepAnalyzer().analyze_repo(Path("/fake/repo")) + assert len(findings) == 2 + assert all(f.tool == "semgrep" for f in findings) + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_severity_mapping(self, mock_run): + mock_run.return_value = MagicMock( + stdout=self._sample_semgrep_output(), returncode=0, + ) + findings = SemgrepAnalyzer().analyze_repo(Path("/fake")) + by_rule = {f.rule_id: f for f in findings} + assert by_rule[ + "python.lang.security.audit.eval-detected" + ].severity == "HIGH" + assert by_rule[ + "python.lang.correctness.unused-import" + ].severity == "MEDIUM" + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_handles_empty_output(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + assert SemgrepAnalyzer().analyze_repo(Path("/fake")) == [] + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_handles_invalid_json(self, mock_run): + mock_run.return_value = MagicMock(stdout="not json {{{", returncode=0) + assert SemgrepAnalyzer().analyze_repo(Path("/fake")) == [] + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_handles_timeout(self, mock_run): + mock_run.side_effect = subprocess.TimeoutExpired(cmd="semgrep", timeout=120) + assert SemgrepAnalyzer().analyze_repo(Path("/fake")) == [] + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_timeout_is_120s_not_600s(self, mock_run): + """Regression guard: timeout must stay at 120s to fit CI contract.""" + mock_run.return_value = MagicMock(stdout="", returncode=0) + SemgrepAnalyzer().analyze_repo(Path("/fake")) + _, kwargs = mock_run.call_args + assert kwargs.get("timeout") == 120 + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_uses_curated_ruleset_by_default(self, mock_run): + """Regression guard: don't use --config auto, it's too slow.""" + mock_run.return_value = MagicMock(stdout="", returncode=0) + SemgrepAnalyzer().analyze_repo(Path("/fake")) + cmd = mock_run.call_args[0][0] + assert "--config" in cmd + config_idx = cmd.index("--config") + assert cmd[config_idx + 1] == "p/security" + assert "auto" not in cmd + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_config_override_via_env(self, mock_run, monkeypatch): + monkeypatch.setenv("ARBITER_SEMGREP_CONFIG", "p/python") + mock_run.return_value = MagicMock(stdout="", returncode=0) + SemgrepAnalyzer().analyze_repo(Path("/fake")) + cmd = mock_run.call_args[0][0] + config_idx = cmd.index("--config") + assert cmd[config_idx + 1] == "p/python" + + @patch("arbiter.analyzers.semgrep_analyzer.subprocess.run") + def test_passes_exclude_paths(self, mock_run): + mock_run.return_value = MagicMock(stdout="", returncode=0) + SemgrepAnalyzer().analyze_repo( + Path("/fake"), exclude_paths=["tests/*", "build/*"], + ) + cmd = mock_run.call_args[0][0] + assert "tests/*" in cmd + assert "build/*" in cmd