diff --git a/docs/pvr_triage_overview.md b/docs/pvr_triage_overview.md new file mode 100644 index 0000000..6103e94 --- /dev/null +++ b/docs/pvr_triage_overview.md @@ -0,0 +1,142 @@ +# PVR Triage Taskflows — Overview + +> 30-minute sync reference. Last updated: 2026-03-03. + +--- + +## The Problem + +OSS maintainers get flooded with low-quality vulnerability reports via GitHub's Private Vulnerability Reporting (PVR). Most are vague, duplicated, or AI-generated. Reviewing each one manually is expensive. + +--- + +## The Solution: 4 Taskflows + +``` +┌─────────────────────────────────────────────────────────────┐ +│ INBOX │ +│ (GHSAs in triage state via GitHub PVR) │ +└───────────────────────┬─────────────────────────────────────┘ + │ + ▼ + ┌─────────────────────────┐ + │ pvr_triage_batch │ "What's in my inbox?" + │ │ + │ • List triage GHSAs │ + │ • Score each by │ + │ severity + quality │ + │ • Show Age (days) │ + │ • Rank: highest first │ + │ (oldest wins ties) │ + └────────────┬────────────┘ + │ ranked queue saved to REPORT_DIR + ▼ + ┌─────────────────────────┐ + │ pvr_triage │ "Is this real?" + │ (one advisory) │ + │ │ + │ Task 1: init │ + │ Task 2: fetch & parse │ + │ Task 3: quality gate ──┼──► fast-close? 
──► skip to Task 7 + │ Task 4: verify code │ + │ Task 5: write report │ + │ Task 6: save report │ + │ Task 7: draft response │ + │ Task 8: save + record │ + └────────────┬────────────┘ + │ _triage.md + _response_triage.md saved + ▼ + Maintainer reviews + (edits draft if needed) + │ + ┌────────┴────────┐ + │ │ + ▼ ▼ + ┌──────────────────┐ ┌──────────────────────┐ + │ pvr_respond │ │ pvr_respond_batch │ + │ (one at a time) │ │ (all at once) │ + │ │ │ │ + │ confirm-gated: │ │ • list_pending │ + │ accept (→draft) │ │ • for each: │ + │ reject (→closed)│ │ - confirm-gated │ + │ │ │ state change │ + │ mark as applied │ │ - mark as applied │ + │ post draft │ │ • post drafts │ + │ manually via UI │ │ manually via UI │ + └──────────────────┘ └──────────────────────┘ +``` + +--- + +## The Quality Gate (Task 3) — Key Logic + +``` +Reporter has history? + │ + ├── HIGH TRUST ──────────────────► Always full verification + │ (≥60% confirmed, ≤20% low) + │ + ├── SKEPTICISM ──────────────────► Fast-close if 0 quality signals + │ (≤20% confirmed OR ≥50% low) (no prior report needed) + │ + └── NORMAL / NEW ────────────────► Fast-close only if: + 0 quality signals + AND prior similar report exists +``` + +**Quality signals:** file paths cited · PoC provided · line numbers cited + +**Fast-close effect:** skip code verification → use canned response template requesting specifics + +--- + +## Scoring (batch) + +``` +priority_score = severity_weight + quality_weight + +severity: critical=4 high=3 medium=2 low=1 +quality: +1 per signal (files, PoC, lines) → max +3 + +≥5 Triage Immediately +≥3 Triage Soon + 2 Triage +≤1 Likely Low Quality — Fast Close +``` + +--- + +## Output Files (all in REPORT_DIR) + +| File | Written by | What it is | +|---|---|---| +| `GHSA-xxxx_triage.md` | pvr_triage | Full analysis report | +| `GHSA-xxxx_response_triage.md` | pvr_triage | Draft reply to reporter | +| `GHSA-xxxx_response_sent.md` | pvr_respond / batch | State-transition applied marker (idempotent) | 
+| `batch_queue__.md` | pvr_triage_batch | Ranked inbox table | + +--- + +## Reporter Reputation (background) + +Every completed triage records **verdict + quality** against the reporter's GitHub login in a local SQLite DB. Score feeds back into the next triage's quality gate automatically. No manual configuration. + +--- + +## One-liner workflow + +```bash +./scripts/run_pvr_triage.sh batch owner/repo # see inbox +./scripts/run_pvr_triage.sh triage owner/repo GHSA-xxx # analyse one +./scripts/run_pvr_triage.sh respond owner/repo GHSA-xxx accept # accept one (triage→draft) +./scripts/run_pvr_triage.sh respond owner/repo GHSA-xxx reject # reject one (triage→closed) +./scripts/run_pvr_triage.sh respond_batch owner/repo reject # bulk state transition +# Then post each *_response_triage.md manually via the advisory URL +``` + +--- + +## Further reading + +- [`taskflows/pvr_triage/README.md`](../src/seclab_taskflows/taskflows/pvr_triage/README.md) — full usage docs for all four taskflows +- [`taskflows/pvr_triage/SCORING.md`](../src/seclab_taskflows/taskflows/pvr_triage/SCORING.md) — authoritative scoring reference and fast-close decision tables diff --git a/pyproject.toml b/pyproject.toml index 4c6bc8f..8f5a55a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -92,6 +92,7 @@ ignore = [ "PLR1730", # Replace `if` statement with `min()` "PLR2004", # Magic value used in comparison "PLW0602", # Using global for variable but no assignment is done + "PLW0603", # Using the global statement to update a variable is discouraged "PLW1508", # Invalid type for environment variable default "PLW1510", # `subprocess.run` without explicit `check` argument "RET504", # Unnecessary assignment before `return` statement @@ -101,6 +102,7 @@ ignore = [ "RUF015", # Prefer `next(iter())` over single element slice "S607", # Starting a process with a partial executable path "SIM101", # Use a ternary expression instead of if-else-block + "SIM105", # Use contextlib.suppress (false positive: try 
block contains assignment) "SIM114", # Combine `if` branches using logical `or` operator "SIM117", # Use a single `with` statement with multiple contexts "SIM118", # Use `key in dict` instead of `key in dict.keys()` @@ -113,3 +115,12 @@ ignore = [ "W291", # Trailing whitespace "W293", # Blank line contains whitespace ] + +[tool.ruff.lint.per-file-ignores] +"tests/*" = [ + "PLC0415", # Import not at top of file (deliberate in setUp/test methods for patching) + "PT009", # Use assert instead of unittest-style assertEqual (TestCase subclass) + "PT027", # Use pytest.raises instead of assertRaises (TestCase subclass) + "S101", # Use of assert (standard in pytest) + "SLF001", # Private member accessed (tests legitimately access module internals) +] diff --git a/scripts/run_pvr_triage.sh b/scripts/run_pvr_triage.sh new file mode 100755 index 0000000..70481db --- /dev/null +++ b/scripts/run_pvr_triage.sh @@ -0,0 +1,208 @@ +#!/bin/bash +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT +# +# Local test / demo script for the PVR triage taskflows. +# +# Usage: +# ./scripts/run_pvr_triage.sh batch +# ./scripts/run_pvr_triage.sh triage +# ./scripts/run_pvr_triage.sh respond +# ./scripts/run_pvr_triage.sh respond_batch +# ./scripts/run_pvr_triage.sh demo +# +# Environment (any already-set values are respected): +# GH_TOKEN — GitHub token; falls back to: gh auth token +# AI_API_TOKEN — AI API key (required, must be set before running) +# AI_API_ENDPOINT — defaults to https://api.githubcopilot.com +# REPORT_DIR — defaults to ./reports +# LOG_DIR — defaults to ./logs + +set -euo pipefail + +__dir="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +__root="$(cd "${__dir}/.." 
&& pwd)" + +# --------------------------------------------------------------------------- +# Usage (defined early so --help can fire before env validation) +# --------------------------------------------------------------------------- + +usage() { + cat < [args] + +Commands: + batch + Score unprocessed triage advisories and save a ranked queue table to REPORT_DIR. + Advisories already present in REPORT_DIR are skipped. + + triage + Run full triage on one advisory: verify code, generate report + response draft. + + respond + Apply a state transition to a GitHub advisory. action = accept | reject + Requires pvr_triage to have been run first for the given GHSA. + Post the response draft manually via the advisory URL after running. + + respond_batch + Scan REPORT_DIR and apply state transitions to all pending advisories. + action = accept | reject + + demo + Full pipeline on the given repo (batch → triage on first triage advisory → report preview). + Does not post anything to GitHub. + +Environment: + GH_TOKEN — GitHub token; falls back to: gh auth token + AI_API_TOKEN — AI API key (required, must be set before running) + AI_API_ENDPOINT — defaults to https://api.githubcopilot.com + REPORT_DIR — defaults to ./reports + LOG_DIR — defaults to ./logs +EOF +} + +case "${1:-}" in + -h|--help|help|"") usage; exit 0 ;; +esac + +# --------------------------------------------------------------------------- +# Environment setup +# --------------------------------------------------------------------------- + +# Prepend local venv to PATH if present (resolves 'python' for MCP servers) +if [ -d "${__root}/.venv/bin" ]; then + export PATH="${__root}/.venv/bin:${PATH}" +fi + +# GitHub token +if [ -z "${GH_TOKEN:-}" ]; then + if command -v gh &>/dev/null; then + GH_TOKEN="$(gh auth token 2>/dev/null)" || true + fi + if [ -z "${GH_TOKEN:-}" ]; then + echo "ERROR: GH_TOKEN not set and 'gh auth token' failed." 
>&2 + exit 1 + fi + export GH_TOKEN +fi + +# AI API token +if [ -z "${AI_API_TOKEN:-}" ]; then + echo "ERROR: AI_API_TOKEN is not set." >&2 + exit 1 +fi + +export AI_API_ENDPOINT="${AI_API_ENDPOINT:-https://api.githubcopilot.com}" + +export REPORT_DIR="${REPORT_DIR:-${__root}/reports}" +mkdir -p "${REPORT_DIR}" + +export LOG_DIR="${LOG_DIR:-${__root}/logs}" +mkdir -p "${LOG_DIR}" + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +run_agent() { + python -m seclab_taskflow_agent "$@" +} + +# --------------------------------------------------------------------------- +# Commands +# --------------------------------------------------------------------------- + +cmd_batch() { + local repo="${1:?Usage: $0 batch }" + echo "==> Scoring inbox for ${repo} ..." + run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ + -g "repo=${repo}" +} + +cmd_triage() { + local repo="${1:?Usage: $0 triage }" + local ghsa="${2:?Usage: $0 triage }" + echo "==> Triaging ${ghsa} in ${repo} ..." + run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \ + -g "repo=${repo}" \ + -g "ghsa=${ghsa}" +} + +cmd_respond() { + local repo="${1:?Usage: $0 respond }" + local ghsa="${2:?Usage: $0 respond }" + local action="${3:?Usage: $0 respond }" + case "${action}" in + accept|reject) ;; + *) echo "ERROR: action must be accept or reject" >&2; exit 1 ;; + esac + echo "==> Responding to ${ghsa} in ${repo} (action=${action}) ..." + run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ + -g "repo=${repo}" \ + -g "ghsa=${ghsa}" \ + -g "action=${action}" +} + +cmd_respond_batch() { + local repo="${1:?Usage: $0 respond_batch }" + local action="${2:?Usage: $0 respond_batch }" + case "${action}" in + accept|reject) ;; + *) echo "ERROR: action must be accept or reject" >&2; exit 1 ;; + esac + echo "==> Bulk respond for ${repo} (action=${action}) ..." 
+ run_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch \ + -g "repo=${repo}" \ + -g "action=${action}" +} + +cmd_demo() { + local repo="${1:?Usage: $0 demo }" + + # Pick the first triage advisory, or bail if none + local ghsa + ghsa="$(gh api "/repos/${repo}/security-advisories?state=triage&per_page=1" \ + --jq '.[0].ghsa_id // empty' 2>/dev/null)" || true + + if [ -z "${ghsa}" ]; then + echo "No triage advisories found in ${repo}. Create one at:" >&2 + echo " https://github.com/${repo}/security/advisories/new" >&2 + exit 1 + fi + + echo "==> Demo: ${repo} advisory: ${ghsa}" + echo + + echo "--- Step 1: batch inbox score ---" + cmd_batch "${repo}" + echo + + echo "--- Step 2: full triage ---" + cmd_triage "${repo}" "${ghsa}" + echo + + echo "--- Reports written to ${REPORT_DIR} ---" + ls -1 "${REPORT_DIR}"/*.md 2>/dev/null || true + echo + echo "To accept (triage → draft) or reject (triage → closed):" + echo " $0 respond ${repo} ${ghsa} accept" + echo " $0 respond ${repo} ${ghsa} reject" + echo "Then post the response draft manually via the advisory URL." +} + +# --------------------------------------------------------------------------- +# Dispatch +# --------------------------------------------------------------------------- + +case "${1:-}" in + batch) shift; cmd_batch "$@" ;; + triage) shift; cmd_triage "$@" ;; + respond) shift; cmd_respond "$@" ;; + respond_batch) shift; cmd_respond_batch "$@" ;; + demo) shift; cmd_demo "$@" ;; + *) echo "ERROR: unknown command '${1}'" >&2; usage; exit 1 ;; +esac diff --git a/src/seclab_taskflows/configs/model_config_pvr_triage.yaml b/src/seclab_taskflows/configs/model_config_pvr_triage.yaml new file mode 100644 index 0000000..f4a3437 --- /dev/null +++ b/src/seclab_taskflows/configs/model_config_pvr_triage.yaml @@ -0,0 +1,22 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR triage model configuration. 
+# AI_API_ENDPOINT defaults to https://api.githubcopilot.com (set in run_pvr_triage.sh). +# Override AI_API_ENDPOINT and AI_API_TOKEN for other providers. + +seclab-taskflow-agent: + version: "1.0" + filetype: model_config + +models: + # Primary model for code analysis and triage reasoning + triage: claude-opus-4.6-1m + # Lighter model for structured data extraction tasks + extraction: gpt-5-mini + +model_settings: + extraction: + temperature: 1 + triage: + temperature: 1 diff --git a/src/seclab_taskflows/mcp_servers/pvr_ghsa.py b/src/seclab_taskflows/mcp_servers/pvr_ghsa.py new file mode 100644 index 0000000..772fd1b --- /dev/null +++ b/src/seclab_taskflows/mcp_servers/pvr_ghsa.py @@ -0,0 +1,510 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR GHSA MCP Server +# +# Tools for fetching and parsing GitHub Security Advisories +# submitted via Private Vulnerability Reporting (PVR) (triage state). +# Uses the gh CLI for all GitHub API calls. + +from __future__ import annotations + +import json +import logging +import os +import re +import subprocess +from datetime import datetime, timezone +from pathlib import Path + +from fastmcp import FastMCP +from pydantic import Field +from seclab_taskflow_agent.path_utils import log_file_name + +_raw_report_dir = os.getenv("REPORT_DIR") +REPORT_DIR = Path(_raw_report_dir) if _raw_report_dir and _raw_report_dir.strip() else Path("reports") + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + filename=log_file_name("mcp_pvr_ghsa.log"), + filemode="a", +) + +mcp = FastMCP("PVRAdvisories") + + +def _gh_api( + path: str, + method: str = "GET", + body: dict | None = None, +) -> tuple[dict | list | None, str | None]: + """ + Call the GitHub REST API via the gh CLI. + + Returns (data, error). On success data is the parsed JSON response and + error is None. On failure data is None and error is a string. 
+ If body is provided it is passed as JSON via stdin (--input -). + """ + cmd = ["gh", "api", "--method", method, path] + env = os.environ.copy() + stdin_data = None + + if body is not None: + cmd += ["--input", "-"] + stdin_data = json.dumps(body) + + try: + result = subprocess.run( + cmd, + input=stdin_data, + capture_output=True, + text=True, + env=env, + timeout=30, + ) + except subprocess.TimeoutExpired: + return None, "gh api call timed out" + except FileNotFoundError: + return None, "gh CLI not found in PATH" + + if result.returncode != 0: + stderr = result.stderr.strip() + stdout = result.stdout.strip() + msg = stderr or stdout or f"gh exited with code {result.returncode}" + logging.error("gh api error: %s", msg) + return None, msg + + try: + data = json.loads(result.stdout) + except json.JSONDecodeError as e: + return None, f"JSON parse error: {e}" + + return data, None + + +def _parse_advisory(raw: dict) -> dict: + """ + Extract the fields relevant to PVR triage from a raw advisory API response. + Separates description text from structured metadata. 
+ """ + vulns = [] + for v in raw.get("vulnerabilities") or []: + pkg = v.get("package") or {} + vulns.append({ + "ecosystem": pkg.get("ecosystem", ""), + "package": pkg.get("name", ""), + "vulnerable_versions": v.get("vulnerable_version_range", ""), + "patched_versions": v.get("patched_versions", ""), + }) + + cwes = [c.get("cwe_id", "") for c in (raw.get("cwes") or [])] + + credits_ = [ + {"login": c.get("user", {}).get("login", ""), "type": c.get("type", "")} + for c in (raw.get("credits_detailed") or []) + ] + + submission = raw.get("submission") or {} + + return { + "ghsa_id": raw.get("ghsa_id", ""), + "cve_id": raw.get("cve_id"), + "html_url": raw.get("html_url", ""), + "state": raw.get("state", ""), + "severity": raw.get("severity", ""), + "summary": raw.get("summary", ""), + # Full description returned separately so metadata stays compact + "description": raw.get("description", ""), + "vulnerabilities": vulns, + "cwes": cwes, + "credits": credits_, + # submission.accepted=true means this arrived via PVR + "pvr_submission": { + "via_pvr": bool(submission), + "accepted": submission.get("accepted", False), + }, + "created_at": raw.get("created_at", ""), + "updated_at": raw.get("updated_at", ""), + "collaborating_users": [ + u.get("login", "") for u in (raw.get("collaborating_users") or []) + ], + } + + +@mcp.tool() +def fetch_pvr_advisory( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), + ghsa_id: str = Field(description="GHSA ID of the advisory, e.g. GHSA-xxxx-xxxx-xxxx"), +) -> str: + """ + Fetch a single repository security advisory by GHSA ID. + + Returns structured advisory metadata and the full description text. + Works for advisories in triage state (requires repo or security_events scope on GH_TOKEN). 
+ """ + path = f"/repos/{owner}/{repo}/security-advisories/{ghsa_id}" + data, err = _gh_api(path) + if err: + return f"Error fetching advisory {ghsa_id}: {err}" + parsed = _parse_advisory(data) + return json.dumps(parsed, indent=2) + + +@mcp.tool() +def list_pvr_advisories( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), + state: str = Field( + default="triage", + description="Advisory state to filter by: triage, draft, published, closed, or withdrawn. Default: triage", + ), +) -> str: + """ + List repository security advisories, defaulting to triage state. + + Returns a JSON summary list (no description text). Each entry includes + ghsa_id, severity, summary, state, pvr_submission, and created_at. + Returns an empty JSON list when no advisories are found. + Paginates automatically through all pages (100 items per page). + """ + base_path = f"/repos/{owner}/{repo}/security-advisories?state={state}&per_page=100" + all_data: list = [] + page = 1 + max_pages = 50 # hard cap: 5000 advisories max + while page <= max_pages: + data, err = _gh_api(f"{base_path}&page={page}") + if err: + return f"Error listing advisories: {err}" + if not isinstance(data, list): + return f"Unexpected response: {data}" + if not data: + break + all_data.extend(data) + if len(data) < 100: + break + page += 1 + + results = [] + for raw in all_data: + submission = raw.get("submission") or {} + results.append({ + "ghsa_id": raw.get("ghsa_id", ""), + "severity": raw.get("severity", ""), + "summary": raw.get("summary", ""), + "state": raw.get("state", ""), + "pvr_submission": { + "via_pvr": bool(submission), + "accepted": submission.get("accepted", False), + }, + "created_at": raw.get("created_at", ""), + }) + + return json.dumps(results, indent=2) + + +@mcp.tool() +def resolve_version_ref( + owner: str = Field(description="Repository owner"), + repo: str = Field(description="Repository name"), + version: str = Field( + 
description="Version string to resolve, e.g. '1.25.4' or 'v1.25.4'. " + "Will try matching git tags directly and with a 'v' prefix." + ), +) -> str: + """ + Resolve a version string to a git commit SHA and tag name. + + Returns the tag name and commit SHA if found. + """ + # Try both bare version and v-prefixed tag + candidates = [version, f"v{version}"] if not version.startswith("v") else [version, version[1:]] + + for tag in candidates: + path = f"/repos/{owner}/{repo}/git/refs/tags/{tag}" + data, err = _gh_api(path) + if err or not data: + continue + # Lightweight tags point directly to a commit; annotated tags point to a tag object + obj = data.get("object", {}) + ref_sha = obj.get("sha", "") + ref_type = obj.get("type", "") + + if ref_type == "tag": + # Annotated tag: dereference to the commit + tag_path = f"/repos/{owner}/{repo}/git/tags/{ref_sha}" + tag_data, tag_err = _gh_api(tag_path) + if not tag_err and tag_data: + commit_sha = tag_data.get("object", {}).get("sha", "") + return json.dumps({"tag": tag, "commit_sha": commit_sha, "type": "annotated"}) + elif ref_type == "commit": + return json.dumps({"tag": tag, "commit_sha": ref_sha, "type": "lightweight"}) + + return f"Could not resolve version '{version}' to a tag in {owner}/{repo}." + + +@mcp.tool() +def fetch_file_at_ref( + owner: str = Field(description="Repository owner"), + repo: str = Field(description="Repository name"), + path: str = Field(description="File path within the repository"), + ref: str = Field(description="Git ref (commit SHA, tag, or branch) to fetch the file at"), + start_line: int = Field(default=1, description="First line to return (1-indexed)"), + length: int = Field(default=100, description="Number of lines to return (max 500)"), +) -> str: + """ + Fetch a range of lines from a file at a specific git ref (commit SHA or tag). 
+ """ + # Use gh api with the ref query parameter + cmd = [ + "gh", "api", + "--method", "GET", + f"/repos/{owner}/{repo}/contents/{path}", + "-f", f"ref={ref}", + "-H", "Accept: application/vnd.github.raw+json", + ] + env = os.environ.copy() + + try: + result = subprocess.run(cmd, capture_output=True, text=True, env=env, timeout=30) + except subprocess.TimeoutExpired: + return "Error: gh api call timed out" + except FileNotFoundError: + return "Error: gh CLI not found in PATH" + + if result.returncode != 0: + return f"Error fetching {path}@{ref}: {result.stderr.strip() or result.stdout.strip()}" + + lines = result.stdout.splitlines() + if start_line < 1: + start_line = 1 + if length < 1: + length = 50 + length = min(length, 500) # cap to avoid returning enormous files + if start_line > len(lines): + return f"start_line {start_line} exceeds file length ({len(lines)} lines) in {path}@{ref}" + chunk = lines[start_line - 1: start_line - 1 + length] + if not chunk: + return f"No lines in range {start_line}-{start_line + length - 1} in {path}@{ref}" + return "\n".join(f"{start_line + i}: {line}" for i, line in enumerate(chunk)) + + +@mcp.tool() +def save_triage_report( + ghsa_id: str = Field(description="GHSA ID, used as the filename stem, e.g. GHSA-xxxx-xxxx-xxxx"), + report: str = Field(description="Full markdown report content to write to disk"), +) -> str: + """ + Write the triage report to a markdown file in the report output directory. + + The file is written to REPORT_DIR/{ghsa_id}_triage.md. + REPORT_DIR defaults to './reports' and can be overridden via the REPORT_DIR + environment variable. Returns the absolute path of the written file. 
+ """ + REPORT_DIR.mkdir(parents=True, exist_ok=True) + # Sanitize the GHSA ID to prevent path traversal + safe_name = "".join(c for c in ghsa_id if c.isalnum() or c in "-_") + if not safe_name: + return "Error: ghsa_id produced an empty filename after sanitization" + out_path = REPORT_DIR / f"{safe_name}_triage.md" + # The agent sometimes passes the report as a JSON-encoded string + # (with outer quotes and escape sequences). Decode it if so. + content = report + if content.startswith('"') and content.endswith('"'): + try: + content = json.loads(content) + except json.JSONDecodeError: + pass + out_path.write_text(content, encoding="utf-8") + logging.info("Triage report written to %s", out_path) + return str(out_path.resolve()) + + +@mcp.tool() +def reject_pvr_advisory( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), + ghsa_id: str = Field(description="GHSA ID of the advisory, e.g. GHSA-xxxx-xxxx-xxxx"), +) -> str: + """ + Close (reject) a security advisory. + + Sets the advisory state to 'closed' via the GitHub API. Requires a GH_TOKEN + with security_events write scope. + + Note: the GitHub REST API has no comments endpoint for security advisories. + Post the response draft to the reporter manually via the advisory URL. + """ + path = f"/repos/{owner}/{repo}/security-advisories/{ghsa_id}" + _, err = _gh_api(path, method="PATCH", body={"state": "closed"}) + if err: + return f"Error closing advisory {ghsa_id}: {err}" + return f"Advisory {ghsa_id} closed (state: closed)." + + +@mcp.tool() +def accept_pvr_advisory( + owner: str = Field(description="Repository owner (user or org name)"), + repo: str = Field(description="Repository name"), + ghsa_id: str = Field(description="GHSA ID of the advisory, e.g. GHSA-xxxx-xxxx-xxxx"), +) -> str: + """ + Accept a PVR advisory by moving it from triage to draft state. + + Sets the advisory state to 'draft' via the GitHub API (triage → draft transition). 
+ Use this when the vulnerability is confirmed and the maintainer intends to publish + a security advisory. Requires a GH_TOKEN with security_events write scope. + + Note: the GitHub REST API has no comments endpoint for security advisories. + Post the response draft to the reporter manually via the advisory URL. + """ + path = f"/repos/{owner}/{repo}/security-advisories/{ghsa_id}" + _, err = _gh_api(path, method="PATCH", body={"state": "draft"}) + if err: + return f"Error accepting advisory {ghsa_id}: {err}" + return f"Advisory {ghsa_id} accepted (state: draft)." + + +@mcp.tool() +def find_similar_triage_reports( + vuln_type: str = Field(description="Vulnerability class to search for, e.g. 'path traversal', 'XSS'"), + affected_component: str = Field(description="Component, endpoint, or feature to search for"), +) -> str: + """ + Search existing triage reports for similar vulnerability types and affected components. + + Scans REPORT_DIR for *_triage.md files and performs case-insensitive substring + matching across the full file content for vuln_type and/or affected_component. + A report matches if at least one non-empty search term is found anywhere in the file. + Returns an empty list if both terms are empty/whitespace. + Returns a JSON list of matching reports with ghsa_id, verdict, quality, and path. + """ + if not REPORT_DIR.exists(): + return json.dumps([]) + + vuln_lower = vuln_type.strip().lower() + component_lower = affected_component.strip().lower() + + # Both terms empty → no meaningful search possible + if not vuln_lower and not component_lower: + return json.dumps([]) + + matches = [] + + for report_path in sorted(REPORT_DIR.glob("*_triage.md")): + # Skip batch queue reports and response drafts — only match individual GHSA triage reports + stem = report_path.stem # e.g. 
"GHSA-xxxx-xxxx-xxxx_triage" + if stem.startswith("batch_queue_") or stem.endswith("_response_triage"): + continue + try: + content = report_path.read_text(encoding="utf-8") + except OSError: + continue + + content_lower = content.lower() + matched = (vuln_lower and vuln_lower in content_lower) or ( + component_lower and component_lower in content_lower + ) + if not matched: + continue + + # Extract GHSA ID from filename: {ghsa_id}_triage.md + ghsa_id = stem.replace("_triage", "") + + # Extract verdict from report (handles **CONFIRMED** and **[CONFIRMED]**) + verdict = "UNKNOWN" + verdict_match = re.search(r"\*\*\[?\s*(CONFIRMED|UNCONFIRMED|INCONCLUSIVE)\s*\]?\*\*", content) + if verdict_match: + verdict = verdict_match.group(1) + + # Extract quality rating — report format: "Rate overall quality: High / Medium / Low" + quality = "Unknown" + quality_match = re.search(r"Rate overall quality[:\s]*\**\s*(High|Medium|Low)\b", content, re.IGNORECASE) + if quality_match: + quality = quality_match.group(1) + + matches.append({ + "ghsa_id": ghsa_id, + "verdict": verdict, + "quality": quality, + "path": str(report_path), + }) + + return json.dumps(matches, indent=2) + + +@mcp.tool() +def read_triage_report( + ghsa_id: str = Field(description="GHSA ID, used to locate the report file, e.g. GHSA-xxxx-xxxx-xxxx"), +) -> str: + """ + Read a previously saved triage report from disk. + + Reads REPORT_DIR/{ghsa_id}_triage.md and returns its content. + Returns an error string if the file does not exist. + """ + safe_name = "".join(c for c in ghsa_id if c.isalnum() or c in "-_") + report_path = REPORT_DIR / f"{safe_name}_triage.md" + if not report_path.exists(): + return f"Report not found: {report_path}" + return report_path.read_text(encoding="utf-8") + + +@mcp.tool() +def list_pending_responses() -> str: + """ + List advisories that have a response draft but have not yet been sent. 
+ + Globs REPORT_DIR for *_response_triage.md files and skips any whose + corresponding *_response_sent.md marker exists. + Returns a JSON list of {ghsa_id, triage_report_exists} objects. + """ + if not REPORT_DIR.exists(): + return json.dumps([]) + + results = [] + for draft_path in sorted(REPORT_DIR.glob("*_response_triage.md")): + # stem is e.g. "GHSA-xxxx-xxxx-xxxx_response_triage" + stem = draft_path.stem + # Extract ghsa_id: remove "_response_triage" suffix + ghsa_id = stem.replace("_response_triage", "") + safe_name = "".join(c for c in ghsa_id if c.isalnum() or c in "-_") + + # Skip if sent marker exists + sent_marker = REPORT_DIR / f"{safe_name}_response_sent.md" + if sent_marker.exists(): + continue + + triage_report = REPORT_DIR / f"{safe_name}_triage.md" + results.append({ + "ghsa_id": ghsa_id, + "triage_report_exists": triage_report.exists(), + }) + + return json.dumps(results, indent=2) + + +@mcp.tool() +def mark_response_sent( + ghsa_id: str = Field(description="GHSA ID of the advisory whose response was sent"), +) -> str: + """ + Create a marker file indicating that the response for this advisory has been sent. + + Writes REPORT_DIR/{ghsa_id}_response_sent.md with an ISO timestamp. + Returns the path of the created marker, or an error string if ghsa_id is empty. 
+ """ + safe_name = "".join(c for c in ghsa_id if c.isalnum() or c in "-_") + if not safe_name: + return "Error: ghsa_id produced an empty filename after sanitization" + REPORT_DIR.mkdir(parents=True, exist_ok=True) + marker_path = REPORT_DIR / f"{safe_name}_response_sent.md" + timestamp = datetime.now(timezone.utc).isoformat() + marker_path.write_text(f"Response sent: {timestamp}\n", encoding="utf-8") + logging.info("Response sent marker written to %s", marker_path) + return str(marker_path.resolve()) + + +if __name__ == "__main__": + mcp.run(show_banner=False) diff --git a/src/seclab_taskflows/mcp_servers/reporter_reputation.py b/src/seclab_taskflows/mcp_servers/reporter_reputation.py new file mode 100644 index 0000000..5f2c299 --- /dev/null +++ b/src/seclab_taskflows/mcp_servers/reporter_reputation.py @@ -0,0 +1,224 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# Reporter Reputation MCP Server +# +# Tracks PVR reporter history and computes reputation scores based on +# past triage outcomes. Uses a local SQLite database. 
+ +from __future__ import annotations + +import json +import logging +import os +from datetime import datetime, timezone +from pathlib import Path + +from fastmcp import FastMCP +from pydantic import Field +from seclab_taskflow_agent.path_utils import log_file_name, mcp_data_dir +from sqlalchemy import Text, UniqueConstraint, create_engine +from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column + +REPORTER_DB_DIR = mcp_data_dir("seclab-taskflows", "reporter_reputation", "REPORTER_DB_DIR") + +logging.basicConfig( + level=logging.DEBUG, + format="%(asctime)s - %(levelname)s - %(message)s", + filename=log_file_name("mcp_reporter_reputation.log"), + filemode="a", +) + + +class Base(DeclarativeBase): + pass + + +VALID_VERDICTS = frozenset({"CONFIRMED", "UNCONFIRMED", "INCONCLUSIVE"}) +VALID_QUALITIES = frozenset({"High", "Medium", "Low"}) + + +class ReporterRecord(Base): + __tablename__ = "reporter_records" + __table_args__ = (UniqueConstraint("login", "ghsa_id", name="uq_reporter_ghsa"),) + + id: Mapped[int] = mapped_column(primary_key=True) + login: Mapped[str] + ghsa_id: Mapped[str] + repo: Mapped[str] + verdict: Mapped[str] # CONFIRMED / UNCONFIRMED / INCONCLUSIVE + quality: Mapped[str] # High / Medium / Low + timestamp: Mapped[str] = mapped_column(Text) # ISO8601 + + def __repr__(self) -> str: + return ( + f"" + ) + + +class ReporterReputationBackend: + def __init__(self, db_dir: Path | str) -> None: + if str(db_dir) == "sqlite://": + # Explicit in-memory sentinel (used in tests) + connection_string = "sqlite://" + else: + db_path = Path(db_dir) + db_path.mkdir(parents=True, exist_ok=True) + connection_string = f"sqlite:///{db_path}/reporter_reputation.db" + self.engine = create_engine(connection_string, echo=False) + Base.metadata.create_all(self.engine) + + def record_triage_result( + self, login: str, ghsa_id: str, repo: str, verdict: str, quality: str + ) -> str: + """Insert or update a triage result record for a reporter.""" + if verdict 
not in VALID_VERDICTS: + raise ValueError(f"Invalid verdict {verdict!r}. Must be one of {sorted(VALID_VERDICTS)}") + if quality not in VALID_QUALITIES: + raise ValueError(f"Invalid quality {quality!r}. Must be one of {sorted(VALID_QUALITIES)}") + timestamp = datetime.now(timezone.utc).isoformat() + with Session(self.engine) as session: + existing = ( + session.query(ReporterRecord) + .filter_by(login=login, ghsa_id=ghsa_id) + .first() + ) + if existing: + existing.repo = repo + existing.verdict = verdict + existing.quality = quality + existing.timestamp = timestamp + else: + session.add( + ReporterRecord( + login=login, + ghsa_id=ghsa_id, + repo=repo, + verdict=verdict, + quality=quality, + timestamp=timestamp, + ) + ) + session.commit() + return "recorded" + + def get_reporter_history(self, login: str) -> list[dict]: + """Return all triage records for a reporter, newest first.""" + with Session(self.engine) as session: + rows = ( + session.query(ReporterRecord) + .filter_by(login=login) + .order_by(ReporterRecord.timestamp.desc()) + .all() + ) + return [ + { + "login": r.login, + "ghsa_id": r.ghsa_id, + "repo": r.repo, + "verdict": r.verdict, + "quality": r.quality, + "timestamp": r.timestamp, + } + for r in rows + ] + + def get_reporter_score(self, login: str) -> dict: + """Compute and return a reputation summary for a reporter.""" + history = self.get_reporter_history(login) + total = len(history) + if total == 0: + return { + "login": login, + "total_reports": 0, + "confirmed_pct": 0.0, + "quality_breakdown": {"High": 0, "Medium": 0, "Low": 0}, + "recommendation": "no history", + } + + confirmed = sum(1 for r in history if r["verdict"] == "CONFIRMED") + confirmed_pct = confirmed / total + + quality_breakdown: dict[str, int] = {"High": 0, "Medium": 0, "Low": 0} + for r in history: + q = r["quality"] + if q in quality_breakdown: + quality_breakdown[q] += 1 + + low_share = quality_breakdown["Low"] / total + + # Derive recommendation + if confirmed_pct >= 0.6 and 
low_share <= 0.2: + recommendation = "high trust" + elif confirmed_pct <= 0.2 or low_share >= 0.5: + recommendation = "treat with skepticism" + else: + recommendation = "normal" + + return { + "login": login, + "total_reports": total, + "confirmed_pct": round(confirmed_pct, 4), + "quality_breakdown": quality_breakdown, + "recommendation": recommendation, + } + + +mcp = FastMCP("ReporterReputation") + +backend = ReporterReputationBackend(REPORTER_DB_DIR) + + +@mcp.tool() +def record_triage_result( + login: str = Field(description="GitHub login of the reporter"), + ghsa_id: str = Field(description="GHSA ID of the advisory, e.g. GHSA-xxxx-xxxx-xxxx"), + repo: str = Field(description="Repository in owner/repo format"), + verdict: str = Field(description="Triage verdict: CONFIRMED, UNCONFIRMED, or INCONCLUSIVE"), + quality: str = Field(description="Report quality rating: High, Medium, or Low"), +) -> str: + """ + Record or update a triage result for a PVR reporter. + + Upserts a row keyed by (login, ghsa_id). Re-running triage on the same + GHSA advisory updates the existing record rather than creating a duplicate. + Returns 'recorded' on success, or an error string for invalid inputs. + """ + try: + return backend.record_triage_result(login, ghsa_id, repo, verdict, quality) + except ValueError as e: + return f"Error: {e}" + + +@mcp.tool() +def get_reporter_history( + login: str = Field(description="GitHub login of the reporter"), +) -> str: + """ + Retrieve the full triage history for a reporter. + + Returns a JSON list of all records for this login, newest first. + Returns an empty JSON list if no history is found. + """ + history = backend.get_reporter_history(login) + return json.dumps(history, indent=2) + + +@mcp.tool() +def get_reporter_score( + login: str = Field(description="GitHub login of the reporter"), +) -> str: + """ + Compute and return a reputation score for a PVR reporter. 
+ + Returns a JSON summary including total_reports, confirmed_pct, + quality_breakdown, and a plain-language recommendation: + 'high trust', 'normal', or 'treat with skepticism'. + """ + score = backend.get_reporter_score(login) + return json.dumps(score, indent=2) + + +if __name__ == "__main__": + mcp.run(show_banner=False) diff --git a/src/seclab_taskflows/personalities/pvr_analyst.yaml b/src/seclab_taskflows/personalities/pvr_analyst.yaml new file mode 100644 index 0000000..14daf8f --- /dev/null +++ b/src/seclab_taskflows/personalities/pvr_analyst.yaml @@ -0,0 +1,31 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# Personality for PVR (Private Vulnerability Report) triage analysis. + +seclab-taskflow-agent: + version: "1.0" + filetype: personality + +personality: | + You are a security vulnerability triage analyst for an open source software maintainer. + + Your job is to verify vulnerability claims made in Private Vulnerability Reports (PVRs), + which arrive as GitHub Security Advisories (GHSAs) in triage state. + + Core principles: + - Base all conclusions on actual code evidence. Do not speculate. + - If you cannot verify a claim, say so explicitly. + - Distinguish between confirmed vulnerabilities and unverified claims. + - Be concise and direct. Maintainers are busy. + - Flag low-quality ("AI slop") reports: vague claims, wrong file paths, non-working PoC, + incorrect function signatures, or descriptions that don't match the actual code. + +task: | + Analyze the provided vulnerability report and verify claims against the actual source code. + Produce factual, evidence-based findings. Never guess or assume. 
+ +toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflows.toolboxes.gh_file_viewer + - seclab_taskflow_agent.toolboxes.memcache diff --git a/src/seclab_taskflows/taskflows/pvr_triage/MANUAL_RESPONSE.md b/src/seclab_taskflows/taskflows/pvr_triage/MANUAL_RESPONSE.md new file mode 100644 index 0000000..b2f82e8 --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/MANUAL_RESPONSE.md @@ -0,0 +1,32 @@ +# Posting a Response to a PVR Advisory + +The GitHub REST API has no comments endpoint for repository security advisories +(`/repos/{owner}/{repo}/security-advisories/{ghsa_id}/comments` → 404). The comment +thread visible in the advisory UI is internal to GitHub and not publicly accessible via +the API. + +After `pvr_respond` or `pvr_respond_batch` applies the state transition (accept/reject), +post the generated response draft to the reporter manually: + +## Steps + +1. Open the response draft generated by `pvr_triage`: + ```bash + cat reports/GHSA-xxxx-xxxx-xxxx_response_triage.md + ``` + +2. Open the advisory URL — printed in the triage report under `html_url`, or construct + it directly: + ``` + https://github.com/{owner}/{repo}/security/advisories/{GHSA-ID} + ``` + +3. Scroll to the comment box at the bottom of the advisory page, paste the response + draft, edit if needed, and submit. The comment is visible only to the reporter and + collaborators on the advisory (not public). + +## Tracking + +`pvr_respond` creates `REPORT_DIR/{GHSA-ID}_response_sent.md` after the state +transition. This marker prevents re-processing by `pvr_respond_batch` but does **not** +confirm that the comment was posted. Use it as a reminder to complete the manual step. 
diff --git a/src/seclab_taskflows/taskflows/pvr_triage/README.md b/src/seclab_taskflows/taskflows/pvr_triage/README.md new file mode 100644 index 0000000..b556718 --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/README.md @@ -0,0 +1,318 @@ +# PVR Triage Taskflows + +Tools for triaging GitHub Security Advisories submitted via [Private Vulnerability Reporting (PVR)](https://docs.github.com/en/code-security/security-advisories/guidance-on-reporting-and-writing-information-about-vulnerabilities/privately-reporting-a-security-vulnerability). The taskflows fetch an advisory in triage state, verify the claimed vulnerability against actual source code, score report quality, and generate a structured analysis and a ready-to-send response draft. + +Four taskflows cover the full triage lifecycle: + +| Taskflow | Purpose | +|---|---| +| `pvr_triage` | Deep-analyse one advisory end-to-end | +| `pvr_triage_batch` | Score an entire inbox and produce a ranked queue | +| `pvr_respond` | Post the response for one advisory once you've reviewed the analysis | +| `pvr_respond_batch` | Scan REPORT_DIR and post all pending response drafts in a single session | + +--- + +## Requirements + +- Python ≥ 3.9 (or Docker via `run_seclab_agent.sh`) +- `gh` CLI installed and authenticated +- A GitHub token with **`repo`** and **`security_events`** scopes + - Write-back actions (`pvr_respond`) additionally require **`security_events` write** scope +- AI API credentials (`AI_API_TOKEN`, `AI_API_ENDPOINT`) + +### Environment variables + +| Variable | Required by | Description | +|---|---|---| +| `GH_TOKEN` | all | GitHub personal access token | +| `AI_API_TOKEN` | all | API key for the AI provider | +| `AI_API_ENDPOINT` | all | Model endpoint (defaults to `https://api.githubcopilot.com`) | +| `REPORT_DIR` | all | Directory where triage reports are written. Defaults to `./reports` | +| `LOG_DIR` | all | Directory for MCP server logs. 
Auto-detected via `platformdirs` if not set | +| `REPORTER_DB_DIR` | `pvr_triage` | Directory for the reporter reputation SQLite database. Auto-detected if not set | + +A minimal `.env` for local use: + +``` +GH_TOKEN=ghp_... +AI_API_TOKEN=... +AI_API_ENDPOINT=https://api.githubcopilot.com +REPORT_DIR=/path/to/reports +LOG_DIR=/path/to/logs +``` + +--- + +## Taskflow 1 — Single advisory triage (`pvr_triage`) + +Runs a full analysis on one GHSA in triage state and produces: + +- A structured triage report saved to `REPORT_DIR/_triage.md` +- A response draft saved to `REPORT_DIR/_response_triage.md` +- A record in the reporter reputation database + +```bash +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \ + -g repo=owner/repo \ + -g ghsa=GHSA-xxxx-xxxx-xxxx +``` + +### What it does (8 tasks) + +1. **Initialize** — clears the in-memory cache. +2. **Fetch & parse** — fetches the advisory from the GitHub API and extracts structured metadata: vulnerability type, affected component, file references, PoC quality signals, reporter credits. +3. **Quality gate** — calls `get_reporter_score` for the reporter's history and `find_similar_triage_reports` to detect duplicates. Computes `fast_close` using a reputation-gated decision tree: + - **high-trust reporter** → always `fast_close = false` (full verification). + - **skepticism reporter** → `fast_close = true` when all three quality signals are absent (prior similar report not required). + - **normal / no history** → `fast_close = true` only when all three signals are absent *and* a prior similar report exists. + Fast-close skips deep code analysis. +4. **Code verification** — resolves the claimed version to a git tag/SHA, fetches the relevant source files, and checks whether the vulnerability pattern is actually present. After verifying at the claimed version, also checks HEAD to determine patch status (`still_vulnerable` / `patched` / `could_not_determine`). 
Skipped automatically when `fast_close` is true. +5. **Report generation** — writes a markdown report covering: Verdict, Code Verification, Severity Assessment, CVSS 3.1 assessment, Duplicate/Prior Reports, Patch Status, Report Quality, Reporter Reputation, and Recommendations. +6. **Save report** — writes the report to `REPORT_DIR/_triage.md` and prints the path. +7. **Response draft** — drafts a plain-text reply to the reporter (≤200 words, no markdown headers) tailored to the verdict: acknowledge + credit for CONFIRMED, cite evidence for UNCONFIRMED, explain missing info for INCONCLUSIVE, or request specific details for fast-close. +8. **Update reputation + save response** — records the triage outcome in the reporter reputation database and saves the response draft to `REPORT_DIR/_response_triage.md`. + +### Report structure + +``` +## PVR Triage Analysis: GHSA-xxxx-xxxx-xxxx + +**Repository:** owner/repo +**Claimed Severity:** high +**Vulnerability Type:** path traversal + +### Verdict +**[CONFIRMED / UNCONFIRMED / INCONCLUSIVE]** + +### Code Verification +### Severity Assessment +### CVSS Assessment +### Duplicate / Prior Reports +### Patch Status +### Report Quality +### Reporter Reputation +### Recommendations +``` + +--- + +## Taskflow 2 — Batch inbox scoring (`pvr_triage_batch`) + +Lists advisories in triage state for a repository, scores each unprocessed one by priority, and saves a ranked markdown table. Advisories with an existing triage report in `REPORT_DIR` are skipped and their count is noted in the output. 
+ +```bash +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ + -g repo=owner/repo +``` + +### Output + +Saved to `REPORT_DIR/batch_queue__.md`: + +```markdown +# PVR Batch Triage Queue: owner/repo + +| GHSA | Age (days) | Severity | Vuln Type | Quality Signals | Priority | Status | Suggested Action | +|------|------------|----------|-----------|-----------------|----------|--------|-----------------| +| GHSA-... | 14 | high | SQL injection | PoC, Files | 6 | Not triaged | Triage Immediately | +| GHSA-... | 3 | medium | XSS | None | 1 | Not triaged | Likely Low Quality — Fast Close | +``` + +Rows are sorted by priority score descending; ties are broken by `created_at` ascending (oldest advisory first). + +### Priority scoring + +Advisories with an existing report in `REPORT_DIR` are skipped entirely. Only unprocessed advisories are scored: + +``` +priority_score = severity_weight + quality_weight + +severity_weight: critical=4 high=3 medium=2 low=1 unknown=1 +quality_weight: has_file_references(+1) + has_poc(+1) + has_line_numbers(+1) +``` + +**Suggested actions:** + +| Score | Action | +|---|---| +| ≥ 5 | Triage Immediately | +| ≥ 3 | Triage Soon | +| 2 | Triage | +| ≤ 1 | Likely Low Quality — Fast Close | + +--- + +## Taskflow 3 — Write-back (`pvr_respond`) + +Loads an existing triage report and applies the chosen state transition to the GitHub advisory. All write-back calls are confirm-gated — the agent will prompt for confirmation before making any change. 
+ +```bash +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ + -g repo=owner/repo \ + -g ghsa=GHSA-xxxx-xxxx-xxxx \ + -g action=accept +``` + +### Actions + +| `action` | API call | When to use | +|---|---|---| +| `accept` | Sets advisory state to `draft` (triage → draft) | Vulnerability confirmed — maintainer intends to publish an advisory | +| `reject` | Sets advisory state to `closed` | Report is clearly invalid or low quality | + +> **Note:** `pvr_respond` requires that `pvr_triage` has already been run for the GHSA so that `_triage.md` and `_response_triage.md` exist in `REPORT_DIR`. + +> **Posting the response:** The GitHub REST API has no comments endpoint for security advisories. After running `pvr_respond`, post the response draft manually via the advisory URL. See [`MANUAL_RESPONSE.md`](MANUAL_RESPONSE.md) for instructions and language. + +### Confirm gate + +The toolbox marks `accept_pvr_advisory` and `reject_pvr_advisory` as `confirm`-gated. The agent will print the verdict and summary, then ask for explicit confirmation before making any change to GitHub. + +After a successful state transition, `pvr_respond` calls `mark_response_sent` to create a `_response_sent.md` marker so `pvr_respond_batch` will skip this advisory in future runs. + +--- + +## Taskflow 4 — Bulk respond (`pvr_respond_batch`) + +Scans `REPORT_DIR` for advisories that have a response draft (`*_response_triage.md`) but no applied marker (`*_response_sent.md`), and applies the chosen state transition to each in a single session. + +```bash +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch \ + -g repo=owner/repo \ + -g action=reject + +# or via the helper script: +./scripts/run_pvr_triage.sh respond_batch owner/repo reject +``` + +### How it works + +**Task 1** calls `list_pending_responses` (local read-only, no confirm gate) to find all pending advisories and prints a summary table. 
If there are none it stops immediately. + +**Task 2** iterates over every pending entry: +1. Reads the triage report from disk. +2. Prints a per-item summary (GHSA, verdict). +3. Executes the chosen action (`accept` / `reject`) via the confirm-gated write-back tool. +4. On success, calls `mark_response_sent` to create a `*_response_sent.md` marker so the advisory is skipped in future runs. + +Prints a final count and a reminder to post each response draft manually. + +### Applied markers + +`pvr_respond` also calls `mark_response_sent` after a successful state transition, keeping single-advisory and bulk runs in sync. Once a marker exists, neither `pvr_respond` nor `pvr_respond_batch` will re-process it. + +--- + +## Typical workflow + +``` +1. Run pvr_triage_batch to see what's in your inbox and prioritise. + +2. For each advisory you want to analyse: + Run pvr_triage. + +3. Review the saved report in REPORT_DIR: + - Check the Verdict and Code Verification sections. + - Edit the response draft (_response_triage.md) if needed. + +4a. Apply a state transition with pvr_respond: + - action=accept → move to draft (triage → draft) + - action=reject → close (triage → closed) + Then post the response draft manually via the advisory URL. + +4b. Or apply state transitions to all pending advisories at once with pvr_respond_batch: + Scans REPORT_DIR for pending entries (no _response_sent.md marker) + and applies the chosen action to all of them in one session. + Then post each response draft manually via the advisory URL. 
+``` + +### Example session + +```bash +# Step 1: score the inbox +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ + -g repo=acme/widget + +# Step 2: triage the highest-priority advisory +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \ + -g repo=acme/widget \ + -g ghsa=GHSA-1234-5678-abcd + +# Step 3: review the output +cat reports/GHSA-1234-5678-abcd_triage.md +cat reports/GHSA-1234-5678-abcd_response_triage.md + +# Step 4a: accept (triage → draft) — vulnerability confirmed +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ + -g repo=acme/widget \ + -g ghsa=GHSA-1234-5678-abcd \ + -g action=accept + +# Step 4b: or reject (triage → closed) — invalid or low-quality report +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ + -g repo=acme/widget \ + -g ghsa=GHSA-1234-5678-abcd \ + -g action=reject + +# Step 4c: or apply state transitions to all pending advisories at once +python -m seclab_taskflow_agent \ + -t seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch \ + -g repo=acme/widget \ + -g action=reject + +# Step 5: post each response draft manually via the advisory URL +# See taskflows/pvr_triage/MANUAL_RESPONSE.md for instructions +``` + +--- + +## Reporter reputation + +Every completed `pvr_triage` run records the verdict and quality rating against the reporter's GitHub login in a local SQLite database (`REPORTER_DB_DIR/reporter_reputation.db`). + +The quality gate in Task 3 of `pvr_triage` calls `get_reporter_score` automatically before any code analysis. The score summary appears in the report under **Reporter Reputation**. 
+ +**Reputation thresholds:** + +| Condition | Recommendation | +|---|---| +| confirmed_pct ≥ 60% and Low-quality share ≤ 20% | high trust | +| confirmed_pct ≤ 20% or Low-quality share ≥ 50% | treat with skepticism | +| Otherwise | normal | + +Reputation directly gates the fast-close decision. See [SCORING.md](SCORING.md) Section 3 for the full three-path decision table and reputation × fast-close matrix. + +--- + +## Models + +The taskflows use `seclab_taskflows.configs.model_config_pvr_triage`, which defines two model roles: + +| Role | Used for | Default model | +|---|---|---| +| `triage` | Code verification and report generation | `claude-opus-4.6-1m` | +| `extraction` | Fetch/parse, quality gate, save tasks | `gpt-5-mini` | + +Override the model config by setting `AI_API_ENDPOINT` and `AI_API_TOKEN` to point at a compatible provider. + +--- + +## Output files + +All files are written to `REPORT_DIR` (default: `./reports`). + +| File | Written by | Contents | +|---|---|---| +| `_triage.md` | `pvr_triage` task 6 | Full triage analysis report | +| `_response_triage.md` | `pvr_triage` task 8 | Plain-text response draft for the reporter | +| `_response_sent.md` | `pvr_respond` / `pvr_respond_batch` | Marker: state transition applied (contains ISO timestamp); post draft manually | +| `batch_queue__.md` | `pvr_triage_batch` task 3 | Ranked inbox table with Age column | diff --git a/src/seclab_taskflows/taskflows/pvr_triage/SCORING.md b/src/seclab_taskflows/taskflows/pvr_triage/SCORING.md new file mode 100644 index 0000000..6002250 --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/SCORING.md @@ -0,0 +1,170 @@ +# PVR Triage Scoring Reference + +This document describes every scoring decision made by the PVR triage taskflows: batch priority scoring, single-advisory quality signals, fast-close detection, and reporter reputation thresholds. All values are authoritative — they reflect the exact constants in the taskflow YAML and MCP server code. + +--- + +## 1. 
Batch Priority Score (`pvr_triage_batch`) + +Used to rank unprocessed advisories in triage state before analysis. + +### Severity weight + +| Severity | Weight | +|---|---| +| critical | 4 | +| high | 3 | +| medium | 2 | +| low | 1 | +| unknown | 1 | + +### Quality weight + +Extracted from the advisory description text. Each signal present adds 1 point. + +| Signal | Condition | +|---|---| +| `has_file_references` | Description mentions at least one specific source file path | +| `has_poc` | Description includes reproduction steps or exploit code | +| `has_line_numbers` | Description cites at least one line number | + +### Formula + +``` +priority_score = severity_weight + quality_weight (max: 7) +``` + +### Suggested action thresholds + +| priority_score | Suggested action | +|---|---| +| ≥ 5 | Triage Immediately | +| ≥ 3 | Triage Soon | +| 2 | Triage | +| ≤ 1 | Likely Low Quality — Fast Close | + +### Score reference table + +| Severity | No signals | 1 signal | 2 signals | 3 signals | +|---|---|---|---|---| +| critical | 4 — Triage Soon | 5 — **Triage Immediately** | 6 — **Triage Immediately** | 7 — **Triage Immediately** | +| high | 3 — Triage Soon | 4 — Triage Soon | 5 — **Triage Immediately** | 6 — **Triage Immediately** | +| medium | 2 — Triage | 3 — Triage Soon | 4 — Triage Soon | 5 — **Triage Immediately** | +| low | 1 — Fast Close | 2 — Triage | 3 — Triage Soon | 4 — Triage Soon | + +**Key observations:** +- A bare `critical` with no quality signals scores 4 — Triage Soon, not Triage Immediately. +- `high` needs at least two quality signals to reach Triage Immediately. +- `medium` needs all three quality signals to reach Triage Immediately. +- Any `low` severity report with no quality signals is Fast Close. + +### Already-triaged advisories + +Advisories with an existing `_triage.md` in `REPORT_DIR` are skipped entirely and do not appear in the scored queue. Their count is noted in the batch report summary. + +--- + +## 2. 
Single-Advisory Quality Signals (`pvr_triage`) + +The quality gate in Task 3 extracts the same three signals as the batch scorer, plus two additional ones used for the report quality rating. + +| Signal | Used in | +|---|---| +| `has_file_references` | Fast-close, report quality rating | +| `has_line_numbers` | Fast-close, report quality rating | +| `has_poc` | Fast-close, report quality rating | +| `has_version_info` | Report quality rating only | +| `has_code_snippets` | Report quality rating only | + +### Report quality rating + +Assigned by the analyst in the report generation task. + +| Rating | Criteria | +|---|---| +| High | Specific, accurate claims; verified PoC; correct file paths and line numbers | +| Medium | Partially accurate; some details wrong or missing | +| Low | Vague, speculative, or significantly inaccurate ("AI slop") | + +--- + +## 3. Fast-Close Detection (`pvr_triage`) + +The quality gate evaluates `fast_close` via a three-path decision tree gated on the reporter's reputation. + +### Path A — High-trust reporter + +| Condition | Result | +|---|---| +| `reporter_score.recommendation == "high trust"` | `fast_close = false` unconditionally | + +High-trust reporters always receive full code verification regardless of quality signals. + +### Path B — Skepticism reporter + +| Condition | Result | +|---|---| +| `reporter_score.recommendation == "treat with skepticism"` **and** all three signals absent | `fast_close = true` | +| `reporter_score.recommendation == "treat with skepticism"` **and** any signal present | `fast_close = false` | + +For skepticism reporters, a prior similar report is **not** required — the three absent quality signals alone are sufficient to trigger fast-close. + +### Path C — Normal / no history + +All four conditions must hold simultaneously: + +1. `has_file_references` is false +2. `has_poc` is false +3. `has_line_numbers` is false +4. 
At least one similar report already exists in `REPORT_DIR` with verdict `UNCONFIRMED` or `CONFIRMED` + +Conditions 1–3 alone are not sufficient — there must also be a prior report on a similar issue. A novel low-quality report for an unseen component proceeds to full verification. + +### Reputation × fast-close summary matrix + +| Reputation | No quality signals, no prior similar | No quality signals, prior similar exists | Any quality signal present | +|---|---|---|---| +| high trust | full verification | full verification | full verification | +| normal / no history | full verification | **fast-close** | full verification | +| treat with skepticism | **fast-close** | **fast-close** | full verification | + +When `fast_close` is true, code verification is skipped entirely. The response draft uses the fast-close template (requests specific file path, line number, and reproduction steps). + +--- + +## 4. Reporter Reputation (`reporter_reputation.py`) + +Accumulated from every completed `pvr_triage` run. Keyed by GitHub login. + +### Inputs per record + +| Field | Values | +|---|---| +| verdict | CONFIRMED / UNCONFIRMED / INCONCLUSIVE | +| quality | High / Medium / Low | + +### Score metrics + +``` +confirmed_pct = confirmed_count / total_reports +low_share = Low_count / total_reports +``` + +### Recommendation thresholds + +| Condition | Recommendation | +|---|---| +| confirmed_pct ≥ 0.60 **and** low_share ≤ 0.20 | high trust | +| confirmed_pct ≤ 0.20 **or** low_share ≥ 0.50 | treat with skepticism | +| Otherwise | normal | +| No history | no history | + +### Effect on triage + +The reputation score directly influences the fast-close decision (see Section 3): + +- **high trust** — always forces full code verification. +- **treat with skepticism** — lowers the fast-close bar: only three absent quality signals are needed (no prior similar report required). +- **normal / no history** — standard four-condition fast-close applies. 
+ +The score also appears in the triage report under **Reporter Reputation** for maintainer awareness. diff --git a/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond.yaml b/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond.yaml new file mode 100644 index 0000000..c4370cf --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond.yaml @@ -0,0 +1,112 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR Respond Taskflow +# +# Loads a previously generated triage report and response draft from disk +# and executes the selected write-back action on the GitHub advisory. +# All write-back API calls are confirm-gated in the pvr_ghsa toolbox. +# +# Usage: +# python -m seclab_taskflow_agent \ +# -t seclab_taskflows.taskflows.pvr_triage.pvr_respond \ +# -g repo=owner/repo \ +# -g ghsa=GHSA-xxxx-xxxx-xxxx \ +# -g action=accept|reject +# +# Required environment variables: +# GH_TOKEN - GitHub token with security_events write scope +# AI_API_TOKEN - API token for the AI model provider +# AI_API_ENDPOINT - Model provider endpoint (default: https://api.githubcopilot.com) +# REPORT_DIR - Directory where triage reports are stored + +seclab-taskflow-agent: + version: "1.0" + filetype: taskflow + +model_config: seclab_taskflows.configs.model_config_pvr_triage + +globals: + # GitHub repository in owner/repo format + repo: + # GHSA ID of the advisory to act on + ghsa: + # Action to perform: accept or reject + action: + +taskflow: + # ------------------------------------------------------------------------- + # Task 1: Load triage report and response draft from disk + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Read the triage report for advisory "{{ globals.ghsa }}" using 
read_triage_report + with ghsa_id="{{ globals.ghsa }}". + + Store the triage report content under memcache key "triage_report". + + Read the response draft using read_triage_report with + ghsa_id="{{ globals.ghsa }}_response". + + Store the response draft content under memcache key "response_draft". + + From the triage report, extract and print: + - Verdict (CONFIRMED / UNCONFIRMED / INCONCLUSIVE) + - Report Quality (High / Medium / Low) + - A 1-2 sentence summary of the findings + + Then print the full response draft. + + If either file is missing (read_triage_report returns "Report not found"), + print a clear error message and stop. + + # ------------------------------------------------------------------------- + # Task 2: Confirm and execute write-back action + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "triage_report" and "response_draft" from memcache. + + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + The requested action is: "{{ globals.action }}" + + Execute the action as follows: + + If action is "accept": + Call accept_pvr_advisory with: + - owner: extracted owner + - repo: extracted repo + - ghsa_id: "{{ globals.ghsa }}" + + If action is "reject": + Call reject_pvr_advisory with: + - owner: extracted owner + - repo: extracted repo + - ghsa_id: "{{ globals.ghsa }}" + + If action is anything else: + Print: "Unknown action '{{ globals.action }}'. Valid actions: accept, reject" + and stop. + + Print the result returned by the API call. + + On success, call mark_response_sent with ghsa_id="{{ globals.ghsa }}" to record + that the state transition has been applied. 
+ + Then print: "Response draft saved at REPORT_DIR/{{ globals.ghsa }}_response_triage.md + — post it to the reporter manually via the advisory URL." diff --git a/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond_batch.yaml b/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond_batch.yaml new file mode 100644 index 0000000..061d2ce --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/pvr_respond_batch.yaml @@ -0,0 +1,95 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR Bulk Respond Taskflow +# +# Scans REPORT_DIR for pending response drafts (advisories with a +# *_response_triage.md but no *_response_sent.md marker) and applies +# the chosen state transition to each in a single session. +# +# Usage: +# python -m seclab_taskflow_agent \ +# -t seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch \ +# -g repo=owner/repo \ +# -g action=accept|reject +# +# Required environment variables: +# GH_TOKEN - GitHub token with security_events write scope +# AI_API_TOKEN - API token for the AI model provider +# AI_API_ENDPOINT - Model provider endpoint (default: https://api.githubcopilot.com) +# REPORT_DIR - Directory where triage reports are stored + +seclab-taskflow-agent: + version: "1.0" + filetype: taskflow + +model_config: seclab_taskflows.configs.model_config_pvr_triage + +globals: + # GitHub repository in owner/repo format + repo: + # Action to apply to all pending responses: accept or reject + action: + +taskflow: + # ------------------------------------------------------------------------- + # Task 1: List pending responses + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Call list_pending_responses to find all advisories with a response draft + that has not yet 
been sent. + + If the result is an empty list, print "No pending responses." and stop. + + Otherwise print a summary table: + + | GHSA | Triage Report Exists | + |------|---------------------| + [one row per pending entry] + + Store the list under memcache key "pending_responses". + + # ------------------------------------------------------------------------- + # Task 2: Send each response + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pending_responses" from memcache. + + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + The requested action is: "{{ globals.action }}" + + For each entry in pending_responses: + 1. Call read_triage_report with ghsa_id=entry.ghsa_id to get the triage report. + 2. Print a per-item summary: + GHSA: {entry.ghsa_id} + Verdict: [extracted from triage report] + 3. Execute the action: + If action is "accept": + Call accept_pvr_advisory with owner, repo, ghsa_id=entry.ghsa_id. + If action is "reject": + Call reject_pvr_advisory with owner, repo, ghsa_id=entry.ghsa_id. + If action is anything else: + Print: "Unknown action '{{ globals.action }}'. Skipping {entry.ghsa_id}." + and continue to the next entry. + 4. On success, call mark_response_sent with ghsa_id=entry.ghsa_id. + Print: "Applied: {entry.ghsa_id} — post response draft manually via advisory URL." + + After processing all entries, print: + "Applied N / M state transitions. Post response drafts manually via each advisory URL." 
diff --git a/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage.yaml b/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage.yaml
new file mode 100644
index 0000000..a3e9048
--- /dev/null
+++ b/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage.yaml
@@ -0,0 +1,420 @@
+# SPDX-FileCopyrightText: GitHub, Inc.
+# SPDX-License-Identifier: MIT
+
+# PVR Triage Taskflow
+#
+# Fetches a GHSA in triage state submitted via Private Vulnerability Reporting,
+# verifies the vulnerability claim against actual source code, assesses
+# impact and report quality, and generates a structured triage analysis
+# for the maintainer.
+#
+# Usage:
+#   python -m seclab_taskflow_agent \
+#     -t seclab_taskflows.taskflows.pvr_triage.pvr_triage \
+#     -g repo=owner/repo \
+#     -g ghsa=GHSA-xxxx-xxxx-xxxx
+#
+# Required environment variables:
+#   GH_TOKEN        - GitHub token with repo and security_events scope
+#   AI_API_TOKEN    - API token for the AI model provider
+#   AI_API_ENDPOINT - Model provider endpoint (default: https://api.githubcopilot.com)
+#   REPORT_DIR      - Directory where triage reports are stored
+
+seclab-taskflow-agent:
+  version: "1.0"
+  filetype: taskflow
+
+model_config: seclab_taskflows.configs.model_config_pvr_triage
+
+globals:
+  # GitHub repository in owner/repo format
+  repo:
+  # GHSA ID of the advisory to triage
+  ghsa:
+
+taskflow:
+  # -------------------------------------------------------------------------
+  # Task 1: Initialize
+  # -------------------------------------------------------------------------
+  - task:
+      must_complete: true
+      headless: true
+      agents:
+        - seclab_taskflow_agent.personalities.assistant
+      toolboxes:
+        - seclab_taskflow_agent.toolboxes.memcache
+      user_prompt: |
+        Clear the memory cache.
+ + # ------------------------------------------------------------------------- + # Task 2: Fetch and parse the GHSA + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Fetch the security advisory {{ globals.ghsa }} for repository {{ globals.repo }}. + + Extract the owner and repo name from "{{ globals.repo }}" (format: owner/repo). + + Store the full raw advisory description text under key "pvr_description". + + Then store a structured summary in memcache under the key "pvr_parsed" as a JSON + object with these fields: + - ghsa_id: the GHSA ID + - repo: "{{ globals.repo }}" + - summary: the advisory one-line summary + - severity_claimed: the severity rating in the advisory (critical/high/medium/low) + - vuln_type: vulnerability class (e.g. "path traversal", "IDOR", "XSS", "SQL injection") + - affected_component: the component, endpoint, or feature described as vulnerable + - affected_files: list of source file paths explicitly mentioned (empty list if none) + - affected_functions: list of function/method names mentioned (empty list if none) + - affected_versions: for version ranges, prefer the structured vulnerabilities[].vulnerable_versions + field from the advisory API response. Fall back to parsing the description only if + the structured field is absent or empty. Empty list if none found. 
+ - poc_provided: true if a proof-of-concept or reproduction steps are described + - poc_summary: brief description of the PoC steps, or null if none provided + - quality_signals: + has_file_references: true if specific source file paths are cited + has_line_numbers: true if specific line numbers are cited + has_poc: true if reproduction steps are provided + has_version_info: true if specific affected versions are mentioned + has_code_snippets: true if actual code is quoted in the report + - credits: the credits list from the advisory API response (list of {login, type} objects) + + Do not perform any code analysis yet. + + # ------------------------------------------------------------------------- + # Task 3: Quick Quality Gate + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflow_agent.toolboxes.memcache + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflows.toolboxes.reporter_reputation + user_prompt: | + Retrieve "pvr_parsed" from memcache. + + Extract reporter login from pvr_parsed.credits: find the first entry with + type "reporter" and use its login. If credits is empty or no reporter type + is found, use "unknown". + + Call get_reporter_score with that login and store the result as reporter_score. + + Call find_similar_triage_reports with: + - vuln_type: pvr_parsed.vuln_type + - affected_component: pvr_parsed.affected_component + + Evaluate fast_close based on reporter_score.recommendation: + + If reporter_score.recommendation is "high trust": + Set fast_close = false unconditionally. + Set reason = "High-trust reporter — full verification required." 
+ + Else if reporter_score.recommendation is "treat with skepticism": + Set fast_close = true if ALL THREE quality signals are absent: + - pvr_parsed.quality_signals.has_file_references is false + - pvr_parsed.quality_signals.has_poc is false + - pvr_parsed.quality_signals.has_line_numbers is false + (Prior similar report NOT required for skepticism reporters.) + Set reason accordingly. + + Else (normal / no history): + Set fast_close = true only if ALL FOUR conditions hold: + - pvr_parsed.quality_signals.has_file_references is false + - pvr_parsed.quality_signals.has_poc is false + - pvr_parsed.quality_signals.has_line_numbers is false + - At least one similar report exists with verdict UNCONFIRMED or CONFIRMED + Set reason accordingly. + + Store under memcache key "quality_gate": + { + "fast_close": true or false, + "reason": "brief explanation of why fast_close was triggered or not", + "reporter_login": "the login extracted above", + "reporter_score": {the full object returned by get_reporter_score}, + "similar_reports": [the list returned by find_similar_triage_reports] + } + + # ------------------------------------------------------------------------- + # Task 4: Verify vulnerability in source code + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: triage + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflows.toolboxes.gh_file_viewer + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_parsed", "pvr_description", and "quality_gate" from memcache. + + If quality_gate.fast_close is true, store under "code_verification": + { + "ref_used": null, + "files_examined": [], + "vulnerability_confirmed": null, + "confirmation_evidence": "Fast-close: quality gate triggered. 
Reason: {quality_gate.reason}", + "mitigation_found": null, + "mitigation_details": null, + "patch_status": "could_not_determine", + "patch_notes": null, + "notes": "Skipped deep analysis." + } + and stop. Do not fetch any files. + + Otherwise proceed with full code verification: + + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + Verify the vulnerability at the affected version, not HEAD. + If affected_versions lists a version (e.g. "<= 1.25.4"), resolve the + upper bound to a git commit SHA using resolve_version_ref, then use + fetch_file_at_ref to fetch code at that SHA. If no version is specified, + fall back to fetch_file_from_gh / get_file_lines_from_gh (HEAD). + + If affected_files or affected_versions are empty, read pvr_description + directly to identify any file paths, function names, or version references + the extraction may have missed. Advisory descriptions vary widely in format + and structure — treat pvr_parsed as a starting point, not a complete picture. + + For each file path identified: + 1. Resolve the version to a SHA (if available). + 2. Fetch the file at that SHA using fetch_file_at_ref. + 3. Locate the affected function(s) at the stated line numbers. + 4. Check whether the vulnerability pattern described in the advisory + is present at that version. + 5. Look for authorization checks, input validation, or other mitigations. + + If no specific files are named, use search_repo_from_gh to locate + the affected function names or code patterns. + + Focus on the specific code path described. Do not perform a broad audit. + + After completing the main verification at the claimed version, re-check the + same code pattern at HEAD using fetch_file_from_gh. 
Add to code_verification: + "patch_status": "still_vulnerable" | "patched" | "could_not_determine" + "patch_notes": brief description of what changed at HEAD (or null) + + Store your findings under memcache key "code_verification" as JSON: + - ref_used: the git SHA or ref used for code fetching (or "HEAD" if none) + - files_examined: list of file paths fetched + - vulnerability_confirmed: true / false / null (null = could not determine) + - confirmation_evidence: precise description of what the code does, + including file path and line numbers + - mitigation_found: true if existing checks prevent exploitation + - mitigation_details: description of mitigating code, or null + - patch_status: "still_vulnerable" | "patched" | "could_not_determine" + - patch_notes: description of HEAD state vs claimed version (or null) + - notes: any additional observations + + # ------------------------------------------------------------------------- + # Task 5: Generate triage report + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: triage + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_parsed", "pvr_description", "code_verification", and "quality_gate" + from memcache. + + Generate a triage analysis report in markdown and store it under + memcache key "triage_report". + + The report must follow this structure exactly: + + --- + + ## PVR Triage Analysis: {{ globals.ghsa }} + + **Repository:** {{ globals.repo }} + **Claimed Severity:** [from pvr_parsed] + **Vulnerability Type:** [from pvr_parsed] + + ### Verdict + + **[CONFIRMED / UNCONFIRMED / INCONCLUSIVE]** + + One or two sentences stating the verdict and the primary reason. + + ### Code Verification + + State the git ref (version tag / commit SHA) used for analysis, or note + if HEAD was used and why. + Describe exactly what code was examined and what was found. 
+ Reference specific file paths and line numbers. + If the vulnerability is confirmed, show the vulnerable code pattern. + If unconfirmed, explain what the code actually does and why it is not vulnerable. + If inconclusive, explain what could not be determined and why. + + ### Severity Assessment + + State whether the claimed severity is accurate, overstated, or understated. + Base this on the actual exploitability and impact from the code evidence. + + ### CVSS Assessment + + Derive a CVSS 3.1 vector for this vulnerability based on the code evidence. + State: Base Score, Vector String, and whether the reporter's claimed severity + (pvr_parsed.severity_claimed) is accurate / overstated / understated. + If vulnerability_confirmed is false or null, note that CVSS is based on + the claimed scenario and may not reflect actual risk. + + ### Duplicate / Prior Reports + + If quality_gate.similar_reports is non-empty, list them with their verdict and quality. + Note whether this report adds new evidence vs. restating a known issue. + If similar_reports is empty, state "No similar prior reports found." + + ### Patch Status + + State code_verification.patch_status at HEAD. + If patched: note the triage impact (lower urgency for confirmed vulnerabilities). + If still_vulnerable: note urgency is unchanged. + If could_not_determine: state that HEAD status could not be assessed. 
+ + ### Report Quality + + Assess the quality of the PVR submission: + - Note which claims were accurate (correct file paths, line numbers, functions) + - Note any inaccuracies (wrong paths, non-existent functions, incorrect PoC) + - Rate overall quality: High / Medium / Low + - High: specific, accurate, verified PoC + - Medium: partially accurate, some details wrong or missing + - Low: vague, speculative, or significantly inaccurate ("AI slop") + + ### Reporter Reputation + + Reporter login: [quality_gate.reporter_login] + Score summary: [quality_gate.reporter_score.recommendation] (confirmed_pct, + total_reports, quality_breakdown from reporter_score) + + ### Recommendations + + Provide 1-3 specific, actionable recommendations for the maintainer. + If confirmed: suggest the fix approach. + If unconfirmed: suggest whether to close, request more info, or monitor. + If low quality: recommend closing with explanation. + + --- + + Be factual. Do not include anything not supported by code evidence. + Keep the report concise. Aim for under 800 words. + + After generating the report, also store a structured summary under memcache + key "triage_outcome": + { + "verdict": "CONFIRMED" | "UNCONFIRMED" | "INCONCLUSIVE", + "quality": "High" | "Medium" | "Low" + } + + # ------------------------------------------------------------------------- + # Task 6: Save report to disk and print path + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve the "triage_report" from memcache. 
+ + Call save_triage_report with: + - ghsa_id: "{{ globals.ghsa }}" + - report: the full report content exactly as stored in memcache + + Then print the report content verbatim, followed by a blank line and: + "Report saved to: " + + # ------------------------------------------------------------------------- + # Task 7: Generate Reporter Response Draft + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_parsed", "code_verification", "quality_gate", "triage_report", + and "triage_outcome" from memcache. + + Use triage_outcome.verdict as the verdict. + + Draft a response comment to the reporter. Tone: direct, factual, not harsh. + Select the template based on triage_outcome.verdict and quality_gate.fast_close: + + fast_close (quality_gate.fast_close=true): + Explain that the report lacks file paths, functions, and reproduction steps + that match the codebase. Invite resubmission with specific details including + the exact file path, line number, and a concrete reproduction scenario. + + CONFIRMED: + Acknowledge the finding. State that a fix is in progress and credit will + be given when the advisory is published. + + UNCONFIRMED: + Cite specific code evidence for why the claim could not be confirmed + (reference the file path and what the code actually does). Ask for more + specific reproduction steps if the reporter wants to follow up. + + INCONCLUSIVE: + Explain what specific information is missing to complete verification + (e.g. exact version, file path, reproduction steps). + + Keep the response under 200 words. No markdown headers. Plain text suitable + for a GitHub comment. + + Store under memcache key "response_draft". 
+ + # ------------------------------------------------------------------------- + # Task 8: Update Reporter Reputation + Save Response Draft + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflows.toolboxes.reporter_reputation + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_parsed", "code_verification", "quality_gate", "triage_report", + "triage_outcome", and "response_draft" from memcache. + + Use triage_outcome.verdict as the verdict. + Use triage_outcome.quality as the quality rating. + Extract reporter login from quality_gate.reporter_login. + + Call record_triage_result with: + - login: quality_gate.reporter_login + - ghsa_id: "{{ globals.ghsa }}" + - repo: "{{ globals.repo }}" + - verdict: the extracted verdict + - quality: the extracted quality rating + + Call save_triage_report with: + - ghsa_id: "{{ globals.ghsa }}_response" + - report: response_draft + + Print: "Response draft saved." followed by the response_draft text. diff --git a/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage_batch.yaml b/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage_batch.yaml new file mode 100644 index 0000000..4ba2d86 --- /dev/null +++ b/src/seclab_taskflows/taskflows/pvr_triage/pvr_triage_batch.yaml @@ -0,0 +1,171 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# PVR Triage Batch Taskflow +# +# Lists PVR advisories in triage state for a repository, scores each unprocessed one by +# priority (based on severity and quality signals), and outputs a ranked +# markdown table to REPORT_DIR for maintainer review. +# Advisories with an existing triage report in REPORT_DIR are skipped. 
+# +# Usage: +# python -m seclab_taskflow_agent \ +# -t seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch \ +# -g repo=owner/repo +# +# Required environment variables: +# GH_TOKEN - GitHub token with repo and security_events scope +# AI_API_TOKEN - API token for the AI model provider +# AI_API_ENDPOINT - Model provider endpoint (default: https://api.githubcopilot.com) +# REPORT_DIR - Directory where triage reports are stored (and batch output is saved) + +seclab-taskflow-agent: + version: "1.0" + filetype: taskflow + +model_config: seclab_taskflows.configs.model_config_pvr_triage + +globals: + # GitHub repository in owner/repo format + repo: + +taskflow: + # ------------------------------------------------------------------------- + # Task 1: List triage advisories + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + Call list_pvr_advisories with owner, repo, and state="triage" to retrieve + all advisories in triage state. + + Store the full JSON list under memcache key "pvr_queue". + + Print: "Found N triage advisories for {{ globals.repo }}." where N is the count. + + If no advisories are found, print "No triage advisories found." and stop. + + # ------------------------------------------------------------------------- + # Task 2: Score each advisory + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflows.personalities.pvr_analyst + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "pvr_queue" from memcache. 
+ + Extract owner and repo from "{{ globals.repo }}" (format: owner/repo). + + For each advisory in pvr_queue: + 1. Call fetch_pvr_advisory to get the full advisory details. + 2. Check for existing triage by calling read_triage_report with the ghsa_id. + If the result does not start with "Report not found", mark already_triaged=true + and extract the verdict from the report content. + Otherwise, mark already_triaged=false and verdict=null. + 3. Extract quality signals from the description: + - has_file_references: description mentions specific file paths + - has_poc: description includes reproduction steps or exploit code + - has_line_numbers: description cites line numbers + 4. Compute priority_score using this formula: + severity_weight: critical=4, high=3, medium=2, low=1, unknown=1 + quality_weight: has_file_references(+1) + has_poc(+1) + has_line_numbers(+1) + priority_score = severity_weight + quality_weight + 5. Determine suggested_action: + - If already_triaged and verdict is UNCONFIRMED or INCONCLUSIVE: "Review/Close" + - If already_triaged and verdict is CONFIRMED: "Fix/Publish" + - If priority_score >= 5: "Triage Immediately" + - If priority_score >= 3: "Triage Soon" + - If priority_score <= 1: "Likely Low Quality — Fast Close" + - Otherwise: "Triage" + + Build a list of scored entries, each with: + {ghsa_id, severity, summary, vuln_type, quality_signals, + priority_score, already_triaged, verdict, suggested_action, created_at} + + Sort the list: primary key priority_score descending; ties broken by + created_at ascending (oldest advisory first). + + Split the list: + - scored_queue: entries where already_triaged=false only + - skipped_count: count of entries where already_triaged=true + + Store scored_queue under memcache key "scored_queue". + Store skipped_count under memcache key "skipped_count". 
+ + # ------------------------------------------------------------------------- + # Task 3: Generate and save ranked queue report + # ------------------------------------------------------------------------- + - task: + must_complete: true + model: extraction + agents: + - seclab_taskflow_agent.personalities.assistant + toolboxes: + - seclab_taskflows.toolboxes.pvr_ghsa + - seclab_taskflow_agent.toolboxes.memcache + user_prompt: | + Retrieve "scored_queue" and "skipped_count" from memcache. + + Generate today's date in YYYY-MM-DD format. + + For each entry in scored_queue compute days_pending: + days_pending = (today - date(created_at)).days (integer, round down) + Parse created_at as an ISO 8601 date string (YYYY-MM-DD prefix is sufficient). + If created_at is missing or unparseable, use "?" for Age. + + Build a report string with this structure: + + # PVR Batch Triage Queue: {{ globals.repo }} + + **Generated:** [today's date] + **Pending triage:** [count of scored_queue entries] + **Skipped (already triaged):** [skipped_count] + + | GHSA | Age (days) | Severity | Vuln Type | Quality Signals | Priority | Status | Suggested Action | + |------|------------|----------|-----------|-----------------|----------|--------|-----------------| + [one row per advisory, sorted by priority_score desc then created_at asc] + + For each row: + - GHSA: the ghsa_id as a plain string + - Age (days): days_pending computed above + - Severity: severity from the advisory + - Vuln Type: vuln_type (truncated to 30 chars if needed) + - Quality Signals: compact representation, e.g. "PoC, Files, Lines" for all three, + or list only the signals present, or "None" if all false + - Priority: priority_score as an integer + - Status: "Triaged (CONFIRMED)" / "Triaged (UNCONFIRMED)" / "Triaged (INCONCLUSIVE)" / + "Not triaged" + - Suggested Action: from suggested_action field + + If scored_queue is empty, replace the table with: + "No pending advisories." 
+ + After the table, add a section: + + ## Summary + + List any advisories with priority_score >= 5 as "Requires immediate attention." + If skipped_count > 0, note: "[skipped_count] already-triaged advisories skipped." + + Sanitize the repo name for use in a filename: replace "/" and any non-alphanumeric + characters (except "-" and "_") with "_". + + Call save_triage_report with: + - ghsa_id: "batch_queue_[sanitized_repo]_[today's date]" + - report: the full report string + + Print: "Batch queue report saved to: " + Then print the full report. diff --git a/src/seclab_taskflows/toolboxes/pvr_ghsa.yaml b/src/seclab_taskflows/toolboxes/pvr_ghsa.yaml new file mode 100644 index 0000000..be7adde --- /dev/null +++ b/src/seclab_taskflows/toolboxes/pvr_ghsa.yaml @@ -0,0 +1,26 @@ +# SPDX-FileCopyrightText: GitHub, Inc. +# SPDX-License-Identifier: MIT + +# Toolbox: PVR GHSA advisory fetcher +# +# Provides tools for fetching GitHub Security Advisories in triage state submitted +# via Private Vulnerability Reporting. Uses the gh CLI for API calls. +# +# Requires GH_TOKEN with repo or security_events scope to read advisories in triage state. + +seclab-taskflow-agent: + version: "1.0" + filetype: toolbox + +server_params: + kind: stdio + command: python + args: ["-m", "seclab_taskflows.mcp_servers.pvr_ghsa"] + env: + GH_TOKEN: "{{ env('GH_TOKEN') }}" + LOG_DIR: "{{ env('LOG_DIR') }}" + REPORT_DIR: "{{ env('REPORT_DIR', 'reports') }}" +# Guard write-back tools: user must confirm before execution +confirm: + - accept_pvr_advisory + - reject_pvr_advisory diff --git a/src/seclab_taskflows/toolboxes/reporter_reputation.yaml b/src/seclab_taskflows/toolboxes/reporter_reputation.yaml new file mode 100644 index 0000000..0c799ec --- /dev/null +++ b/src/seclab_taskflows/toolboxes/reporter_reputation.yaml @@ -0,0 +1,20 @@ +# SPDX-FileCopyrightText: GitHub, Inc. 
+# SPDX-License-Identifier: MIT
+
+# Toolbox: Reporter Reputation tracker
+#
+# Provides tools for recording PVR triage outcomes per reporter and
+# querying their reputation score across prior reports.
+
+seclab-taskflow-agent:
+  version: "1.0"
+  filetype: toolbox
+
+server_params:
+  kind: stdio
+  command: python
+  args: ["-m", "seclab_taskflows.mcp_servers.reporter_reputation"]
+  env:
+    GH_TOKEN: "{{ env('GH_TOKEN') }}"
+    LOG_DIR: "{{ env('LOG_DIR') }}"
+    REPORTER_DB_DIR: "{{ env('REPORTER_DB_DIR', '') }}"
diff --git a/tests/test_pvr_mcp.py b/tests/test_pvr_mcp.py
new file mode 100644
index 0000000..5cd8380
--- /dev/null
+++ b/tests/test_pvr_mcp.py
@@ -0,0 +1,437 @@
+# SPDX-FileCopyrightText: GitHub, Inc.
+# SPDX-License-Identifier: MIT
+
+# Unit tests for the PVR MCP server extensions and reporter reputation backend.
+#
+# Run with: pytest tests/test_pvr_mcp.py -v
+
+import json
+import tempfile
+import unittest
+from pathlib import Path
+from unittest.mock import patch
+
+import pytest
+
+
+# ---------------------------------------------------------------------------
+# Helpers: patch REPORT_DIR in pvr_ghsa so tests read/write a temp directory
+# ---------------------------------------------------------------------------
+
+def _patch_report_dir(tmp_path):
+    """Return a context manager that patches REPORT_DIR in pvr_ghsa."""
+    import seclab_taskflows.mcp_servers.pvr_ghsa as pvr_mod
+    return patch.object(pvr_mod, "REPORT_DIR", tmp_path)
+
+
+# ---------------------------------------------------------------------------
+# TestPvrGhsaTools
+# ---------------------------------------------------------------------------
+
+class TestPvrGhsaTools(unittest.TestCase):
+    """Tests for the new write-back and similarity tools in pvr_ghsa.py."""
+
+    def setUp(self):
+        import seclab_taskflows.mcp_servers.pvr_ghsa as pvr_mod
+        self.pvr = pvr_mod
+        self.tmp_dir = tempfile.TemporaryDirectory()
+        self.tmp = Path(self.tmp_dir.name)
+
+    def tearDown(self):
self.tmp_dir.cleanup() + + # --- accept_pvr_advisory --- + + def test_accept_pvr_advisory_calls_correct_api(self): + """accept_pvr_advisory should PATCH state=draft.""" + calls = [] + + def fake_gh_api(path, method="GET", body=None): + calls.append({"path": path, "method": method, "body": body}) + return {"ghsa_id": "GHSA-1234-5678-abcd", "state": "draft"}, None + + with patch.object(self.pvr, "_gh_api", side_effect=fake_gh_api): + result = self.pvr.accept_pvr_advisory.fn( + owner="owner", + repo="repo", + ghsa_id="GHSA-1234-5678-abcd", + ) + + self.assertEqual(calls[0]["method"], "PATCH") + self.assertIn("GHSA-1234-5678-abcd", calls[0]["path"]) + self.assertEqual(calls[0]["body"], {"state": "draft"}) + self.assertIn("draft", result) + + # --- reject_pvr_advisory --- + + def test_reject_pvr_advisory_calls_correct_api(self): + """reject_pvr_advisory should PATCH state=closed.""" + calls = [] + + def fake_gh_api(path, method="GET", body=None): + calls.append({"path": path, "method": method, "body": body}) + return {"ghsa_id": "GHSA-1234-5678-abcd", "state": "closed"}, None + + with patch.object(self.pvr, "_gh_api", side_effect=fake_gh_api): + result = self.pvr.reject_pvr_advisory.fn( + owner="owner", + repo="repo", + ghsa_id="GHSA-1234-5678-abcd", + ) + + self.assertEqual(calls[0]["method"], "PATCH") + self.assertIn("GHSA-1234-5678-abcd", calls[0]["path"]) + self.assertEqual(calls[0]["body"], {"state": "closed"}) + self.assertIn("closed", result) + + # --- find_similar_triage_reports --- + + def test_find_similar_reports_matches_vuln_type(self): + """find_similar_triage_reports returns matching reports by vuln_type.""" + report_dir = self.tmp + # Write a fixture report + (report_dir / "GHSA-aaaa-bbbb-cccc_triage.md").write_text( + "## PVR Triage Analysis: GHSA-aaaa-bbbb-cccc\n" + "**Vulnerability Type:** path traversal\n" + "**[UNCONFIRMED]**\n" + "Rate overall quality: Low\n", + encoding="utf-8", + ) + + with _patch_report_dir(report_dir): + result_json = 
self.pvr.find_similar_triage_reports.fn( + vuln_type="path traversal", + affected_component="upload handler", + ) + + results = json.loads(result_json) + self.assertEqual(len(results), 1) + self.assertEqual(results[0]["ghsa_id"], "GHSA-aaaa-bbbb-cccc") + self.assertEqual(results[0]["verdict"], "UNCONFIRMED") + + def test_find_similar_reports_no_matches(self): + """find_similar_triage_reports returns empty list when nothing matches.""" + report_dir = self.tmp + (report_dir / "GHSA-aaaa-bbbb-dddd_triage.md").write_text( + "## PVR Triage Analysis: GHSA-aaaa-bbbb-dddd\n" + "**Vulnerability Type:** SQL injection\n" + "**[CONFIRMED]**\n", + encoding="utf-8", + ) + + with _patch_report_dir(report_dir): + result_json = self.pvr.find_similar_triage_reports.fn( + vuln_type="XSS", + affected_component="login form", + ) + + results = json.loads(result_json) + self.assertEqual(results, []) + + def test_find_similar_reports_empty_dir(self): + """find_similar_triage_reports returns empty list for non-existent REPORT_DIR.""" + empty_dir = self.tmp / "nonexistent" + with _patch_report_dir(empty_dir): + result_json = self.pvr.find_similar_triage_reports.fn( + vuln_type="IDOR", + affected_component="profile", + ) + results = json.loads(result_json) + self.assertEqual(results, []) + + # --- save_triage_report path sanitization --- + + def test_save_triage_report_path_sanitization(self): + """save_triage_report strips path traversal characters from the GHSA ID.""" + with _patch_report_dir(self.tmp): + out_path = self.pvr.save_triage_report.fn( + ghsa_id="../../../etc/passwd", + report="malicious content", + ) + # The file must be inside REPORT_DIR, not outside. + # Resolve both paths to handle macOS /var -> /private/var symlinks. + self.assertTrue(out_path.startswith(str(self.tmp.resolve()))) + # The filename should not contain path separators + saved = Path(out_path) + self.assertFalse(".." 
in saved.name)
        self.assertFalse("/" in saved.name)

    def test_save_triage_report_empty_after_sanitization(self):
        """save_triage_report returns an error when ghsa_id is all special chars."""
        # Every character is stripped by sanitization, so there is no usable
        # filename left; the tool reports this as an error string.
        with _patch_report_dir(self.tmp):
            result = self.pvr.save_triage_report.fn(
                ghsa_id="!@#$%^&*()",
                report="some content",
            )
        self.assertIn("Error", result)

    # --- read_triage_report ---

    def test_read_triage_report_returns_content(self):
        """read_triage_report reads back a previously saved report."""
        content = "## PVR Triage Analysis: GHSA-test\n\n**[CONFIRMED]**\n"
        # Pre-seed the report file directly; the tool should round-trip it.
        (self.tmp / "GHSA-test_triage.md").write_text(content, encoding="utf-8")

        with _patch_report_dir(self.tmp):
            result = self.pvr.read_triage_report.fn(ghsa_id="GHSA-test")

        self.assertEqual(result, content)

    def test_read_triage_report_missing_file(self):
        """read_triage_report returns an error string for a missing report."""
        with _patch_report_dir(self.tmp):
            result = self.pvr.read_triage_report.fn(ghsa_id="GHSA-does-not-exist")

        # Case-insensitive match: only require "not found" somewhere in the message.
        self.assertIn("not found", result.lower())

    # --- list_pending_responses ---

    def test_list_pending_responses_empty(self):
        """list_pending_responses returns [] when no response drafts exist."""
        with _patch_report_dir(self.tmp):
            result_json = self.pvr.list_pending_responses.fn()
        results = json.loads(result_json)
        self.assertEqual(results, [])

    def test_list_pending_responses_returns_pending(self):
        """list_pending_responses includes an entry when a draft exists but no sent marker."""
        # Draft present, no *_response_sent.md marker -> considered pending.
        (self.tmp / "GHSA-1111-2222-3333_response_triage.md").write_text(
            "Response draft.", encoding="utf-8"
        )
        with _patch_report_dir(self.tmp):
            result_json = self.pvr.list_pending_responses.fn()
        results = json.loads(result_json)
        self.assertEqual(len(results), 1)
        self.assertEqual(results[0]["ghsa_id"], "GHSA-1111-2222-3333")

    def test_list_pending_responses_excludes_sent(self):
        """list_pending_responses skips entries where a 
_response_sent.md marker exists."""
        # Both the draft and its sent-marker exist -> nothing is pending.
        (self.tmp / "GHSA-1111-2222-3333_response_triage.md").write_text(
            "Response draft.", encoding="utf-8"
        )
        (self.tmp / "GHSA-1111-2222-3333_response_sent.md").write_text(
            "Response sent: 2026-03-03T00:00:00+00:00\n", encoding="utf-8"
        )
        with _patch_report_dir(self.tmp):
            result_json = self.pvr.list_pending_responses.fn()
        results = json.loads(result_json)
        self.assertEqual(results, [])

    # --- mark_response_sent ---

    def test_mark_response_sent_creates_marker(self):
        """mark_response_sent creates a _response_sent.md marker and returns its path."""
        with _patch_report_dir(self.tmp):
            result = self.pvr.mark_response_sent.fn(ghsa_id="GHSA-1111-2222-3333")
        marker = self.tmp / "GHSA-1111-2222-3333_response_sent.md"
        self.assertTrue(marker.exists())
        # Returned path must live under REPORT_DIR (resolve() for symlinked tmp dirs).
        self.assertTrue(result.startswith(str(self.tmp.resolve())))
        content = marker.read_text(encoding="utf-8")
        self.assertIn("Response sent:", content)

    def test_mark_response_sent_empty_ghsa_id(self):
        """mark_response_sent returns an error string when ghsa_id sanitizes to empty."""
        with _patch_report_dir(self.tmp):
            result = self.pvr.mark_response_sent.fn(ghsa_id="!@#$%")
        self.assertIn("Error", result)


# ---------------------------------------------------------------------------
# TestReporterReputationBackend
# ---------------------------------------------------------------------------

class TestReporterReputationBackend(unittest.TestCase):
    """Tests for the ReporterReputationBackend class using in-memory SQLite."""

    def setUp(self):
        # Imported lazily so collection of this module does not require the
        # package unless these tests actually run.
        from seclab_taskflows.mcp_servers.reporter_reputation import ReporterReputationBackend
        # Use explicit in-memory sentinel for tests
        # ("sqlite://" appears to select an in-memory DB; confirm against the
        # backend's db_dir handling).
        self.backend = ReporterReputationBackend(db_dir="sqlite://")

    def test_record_and_retrieve(self):
        """record_triage_result inserts a record and get_reporter_history retrieves it."""
        self.backend.record_triage_result(
            login="alice",
            ghsa_id="GHSA-1111-2222-3333",
            
repo="owner/repo",
            verdict="CONFIRMED",
            quality="High",
        )
        history = self.backend.get_reporter_history("alice")
        self.assertEqual(len(history), 1)
        # Every recorded field must round-trip through the backend.
        self.assertEqual(history[0]["login"], "alice")
        self.assertEqual(history[0]["ghsa_id"], "GHSA-1111-2222-3333")
        self.assertEqual(history[0]["verdict"], "CONFIRMED")
        self.assertEqual(history[0]["quality"], "High")

    def test_upsert_same_ghsa(self):
        """record_triage_result updates an existing record when called again for the same GHSA."""
        self.backend.record_triage_result(
            login="bob",
            ghsa_id="GHSA-aaaa-bbbb-cccc",
            repo="owner/repo",
            verdict="UNCONFIRMED",
            quality="Low",
        )
        # Re-triage the same advisory — should update, not duplicate
        self.backend.record_triage_result(
            login="bob",
            ghsa_id="GHSA-aaaa-bbbb-cccc",
            repo="owner/repo",
            verdict="CONFIRMED",
            quality="High",
        )
        history = self.backend.get_reporter_history("bob")
        # Should still be exactly 1 record
        self.assertEqual(len(history), 1)
        # ...and that record carries the values from the second call.
        self.assertEqual(history[0]["verdict"], "CONFIRMED")
        self.assertEqual(history[0]["quality"], "High")

    def test_get_reporter_score_empty(self):
        """get_reporter_score returns zero totals for an unknown login."""
        score = self.backend.get_reporter_score("nobody")
        self.assertEqual(score["total_reports"], 0)
        self.assertEqual(score["confirmed_pct"], 0.0)
        self.assertEqual(score["quality_breakdown"], {"High": 0, "Medium": 0, "Low": 0})
        self.assertEqual(score["recommendation"], "no history")

    def test_get_reporter_score_recommendation_skepticism(self):
        """5 Low-quality UNCONFIRMED reports → recommendation is 'treat with skepticism'."""
        for i in range(5):
            self.backend.record_triage_result(
                login="spammer",
                ghsa_id=f"GHSA-{i:04d}-0000-0000",  # unique IDs so each insert is a new row
                repo="owner/repo",
                verdict="UNCONFIRMED",
                quality="Low",
            )
        score = self.backend.get_reporter_score("spammer")
        self.assertEqual(score["recommendation"], "treat with skepticism")
        self.assertEqual(score["quality_breakdown"]["Low"], 5)
        self.assertEqual(score["confirmed_pct"], 0.0)

    def test_get_reporter_score_recommendation_trust(self):
        """5 High-quality CONFIRMED reports → recommendation is 'high trust'."""
        for i in range(5):
            self.backend.record_triage_result(
                login="expert",
                ghsa_id=f"GHSA-{i:04d}-1111-1111",
                repo="owner/repo",
                verdict="CONFIRMED",
                quality="High",
            )
        score = self.backend.get_reporter_score("expert")
        self.assertEqual(score["recommendation"], "high trust")
        # confirmed_pct is a 0.0–1.0 fraction, not a percentage, per this assertion.
        self.assertEqual(score["confirmed_pct"], 1.0)

    def test_get_reporter_history_empty(self):
        """get_reporter_history returns empty list for unknown login."""
        history = self.backend.get_reporter_history("ghost")
        self.assertEqual(history, [])

    def test_record_invalid_verdict_raises(self):
        """record_triage_result rejects unknown verdict strings."""
        with self.assertRaises(ValueError):
            self.backend.record_triage_result("alice", "GHSA-x", "r/r", "MAYBE", "High")

    def test_record_invalid_quality_raises(self):
        """record_triage_result rejects unknown quality strings."""
        with self.assertRaises(ValueError):
            self.backend.record_triage_result("alice", "GHSA-x", "r/r", "CONFIRMED", "Excellent")

    def test_multiple_reporters_isolated(self):
        """Records for different reporters are independent."""
        self.backend.record_triage_result("alice", "GHSA-a", "r/r", "CONFIRMED", "High")
        self.backend.record_triage_result("bob", "GHSA-b", "r/r", "UNCONFIRMED", "Low")

        alice_history = self.backend.get_reporter_history("alice")
        bob_history = self.backend.get_reporter_history("bob")

        # Each login sees only its own record.
        self.assertEqual(len(alice_history), 1)
        self.assertEqual(len(bob_history), 1)
        self.assertEqual(alice_history[0]["ghsa_id"], "GHSA-a")
        self.assertEqual(bob_history[0]["ghsa_id"], "GHSA-b")


# ---------------------------------------------------------------------------
# TestYamlStructure
# 
---------------------------------------------------------------------------

class TestYamlStructure(unittest.TestCase):
    """Tests that the new YAML files parse correctly via AvailableTools."""

    def setUp(self):
        # Lazy import keeps module collection independent of the agent package.
        from seclab_taskflow_agent.available_tools import AvailableTools
        self.tools = AvailableTools()

    def test_pvr_triage_yaml_parses(self):
        """pvr_triage.yaml loads without error and is a taskflow."""
        result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_triage")
        self.assertIsNotNone(result)
        # The parsed document carries its metadata under this header key.
        header = result["seclab-taskflow-agent"]
        self.assertEqual(header["filetype"], "taskflow")

    def test_pvr_respond_yaml_parses(self):
        """pvr_respond.yaml loads without error and declares required globals."""
        result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_respond")
        self.assertIsNotNone(result)
        header = result["seclab-taskflow-agent"]
        self.assertEqual(header["filetype"], "taskflow")
        globals_keys = result.get("globals", {})
        self.assertIn("repo", globals_keys)
        self.assertIn("ghsa", globals_keys)
        self.assertIn("action", globals_keys)

    def test_pvr_triage_batch_yaml_parses(self):
        """pvr_triage_batch.yaml loads without error and declares repo global."""
        result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_triage_batch")
        self.assertIsNotNone(result)
        header = result["seclab-taskflow-agent"]
        self.assertEqual(header["filetype"], "taskflow")
        globals_keys = result.get("globals", {})
        self.assertIn("repo", globals_keys)

    def test_reporter_reputation_toolbox_parses(self):
        """reporter_reputation.yaml loads without error and is a toolbox."""
        result = self.tools.get_toolbox("seclab_taskflows.toolboxes.reporter_reputation")
        self.assertIsNotNone(result)
        header = result["seclab-taskflow-agent"]
        self.assertEqual(header["filetype"], "toolbox")

    def test_pvr_ghsa_toolbox_has_confirm(self):
        """pvr_ghsa.yaml toolbox declares write-back tools in confirm 
list.""" + result = self.tools.get_toolbox("seclab_taskflows.toolboxes.pvr_ghsa") + self.assertIsNotNone(result) + confirm = result.get("confirm", []) + self.assertIn("accept_pvr_advisory", confirm) + self.assertIn("reject_pvr_advisory", confirm) + self.assertNotIn("add_pvr_advisory_comment", confirm) + + def test_pvr_respond_batch_yaml_parses(self): + """pvr_respond_batch.yaml loads without error and declares repo + action globals.""" + result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_respond_batch") + self.assertIsNotNone(result) + header = result["seclab-taskflow-agent"] + self.assertEqual(header["filetype"], "taskflow") + globals_keys = result.get("globals", {}) + self.assertIn("repo", globals_keys) + self.assertIn("action", globals_keys) + + def test_pvr_triage_yaml_has_reporter_reputation_toolbox(self): + """pvr_triage.yaml references reporter_reputation toolbox in at least one task.""" + result = self.tools.get_taskflow("seclab_taskflows.taskflows.pvr_triage.pvr_triage") + taskflow = result.get("taskflow", []) + toolbox_refs = [] + for task_wrapper in taskflow: + task = task_wrapper.get("task", {}) + toolboxes = task.get("toolboxes", []) + toolbox_refs.extend(toolboxes) + self.assertIn( + "seclab_taskflows.toolboxes.reporter_reputation", + toolbox_refs, + "pvr_triage.yaml must reference the reporter_reputation toolbox", + ) + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])