59 changes: 48 additions & 11 deletions skills/warden-sweep/SKILL.md
@@ -30,9 +30,17 @@ Fetches open warden-labeled PRs, builds file-to-PR dedup index, caches diffs for
uv run ${CLAUDE_SKILL_ROOT}/scripts/index_prs.py <sweep-dir>
```

### `scripts/create_issue.py`

Creates a GitHub tracking issue summarizing sweep results. Run after verification, before patching.

```bash
uv run ${CLAUDE_SKILL_ROOT}/scripts/create_issue.py <sweep-dir>
```

### `scripts/organize.py`

Tags security findings, labels security PRs, updates finding reports with PR links, generates summary report, finalizes manifest.
Tags security findings, labels security PRs, updates finding reports with PR links, posts final results to tracking issue, generates summary report, finalizes manifest.

```bash
uv run ${CLAUDE_SKILL_ROOT}/scripts/organize.py <sweep-dir>
@@ -87,7 +95,7 @@ Parse the JSON stdout. Save `runId` and `sweepDir` for subsequent phases.
```
## Scan Complete
Scanned **{filesScanned}** files, **{filesErrored}** errors.
Scanned **{filesScanned}** files, **{filesTimedOut}** timed out, **{filesErrored}** errors.
### Findings ({totalFindings} total)
@@ -99,7 +107,7 @@ Scanned **{filesScanned}** files, **{filesErrored}** errors.

Render every finding from the `findings` array. Bold severity for high and above.
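A minimal sketch of this rendering step, assuming each finding carries `severity`, `file`, and `title` fields (names inferred from the report template and helper scripts, not a confirmed schema):

```python
# Sketch: render each finding, bolding severity for high and above.
# The field names (severity, file, title) are assumptions based on the
# report template; adjust to the real findings schema as needed.
HIGH_AND_ABOVE = {"critical", "high"}

def render_finding(finding: dict) -> str:
    sev = finding.get("severity", "info").lower()
    label = f"**{sev.upper()}**" if sev in HIGH_AND_ABOVE else sev.upper()
    return f"- {label} `{finding.get('file', '')}`: {finding.get('title', '')}"

findings = [
    {"severity": "high", "file": "app/db.py", "title": "SQL injection"},
    {"severity": "low", "file": "app/util.py", "title": "Unused import"},
]
for line in map(render_finding, findings):
    print(line)
```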

**On failure**: If exit code 1, show the error JSON and stop. If exit code 2, show the partial results and note which files errored.
**On failure**: If exit code 1, show the error JSON and stop. If exit code 2, show the partial results. List timed-out files separately from errored files so users know which can be retried.
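Separating the two failure kinds can follow the same convention `create_issue.py` uses when it counts scan-index entries (`status == "error"` with `error == "timeout"` marking a timeout); the entry shape here is an assumption based on that script:

```python
# Split scan-index entries into timed-out vs errored files so users
# know which files can simply be retried. Entry shape (status/error/
# file keys) mirrors what create_issue.py reads; treat it as assumed.
def split_failures(scan_index: list) -> tuple:
    timed_out, errored = [], []
    for entry in scan_index:
        if entry.get("status") != "error":
            continue
        target = timed_out if entry.get("error") == "timeout" else errored
        target.append(entry.get("file", ""))
    return timed_out, errored
```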

---

@@ -111,7 +119,7 @@ Deep-trace each finding using Task subagents to qualify or disqualify.

Check if `data/verify/<finding-id>.json` already exists (incrementality). If it does, skip.

Launch a Task subagent (`subagent_type: "general-purpose"`) for each finding. Process findings sequentially (one at a time) to keep output organized.
Launch a Task subagent (`subagent_type: "general-purpose"`) for each finding. Process findings in parallel batches of up to 8 to improve throughput.
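The batching itself is simple chunking; a sketch of how the findings list could be sliced into groups of up to 8 before each group is dispatched in parallel (the dispatch call itself is left out, since it is a Task-subagent invocation rather than a library API):

```python
# Sketch: yield findings in batches of up to 8, the parallelism cap
# described above. Each batch would be dispatched as one round of
# parallel Task subagents before moving to the next.
BATCH_SIZE = 8

def batches(findings: list, size: int = BATCH_SIZE):
    for i in range(0, len(findings), size):
        yield findings[i:i + size]
```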

**Task prompt for each finding:**

@@ -195,9 +203,35 @@ Update manifest: set `phases.verify` to `"complete"`.

---

## Phase 3: Patch
## Phase 3: Issue

Create a tracking issue that ties all PRs together and gives reviewers a single overview.

**Run** (1 tool call):

```bash
uv run ${CLAUDE_SKILL_ROOT}/scripts/create_issue.py ${SWEEP_DIR}
```

Parse the JSON stdout. Save `issueUrl` and `issueNumber` for Phase 4.
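A small sketch of that parsing step; the `issueUrl` and `issueNumber` keys are the ones `create_issue.py` documents in its output contract, while the example URL is illustrative:

```python
import json

# Parse the JSON that create_issue.py prints to stdout.
# issueUrl / issueNumber are the script's documented output keys.
def parse_issue_output(stdout: str) -> tuple:
    info = json.loads(stdout)
    return info["issueUrl"], int(info.get("issueNumber", 0))
```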

**Report** to user:

```
## Tracking Issue Created
{issueUrl}
```

**On failure**: Show the error. Continue to Phase 4 (PRs can still be created without a tracking issue).

---

## Phase 4: Patch

For each verified finding, create a worktree, fix the code, and open a draft PR. Process findings **sequentially** (one at a time) since parallel subagents cross-contaminate worktrees.

For each verified finding, create a worktree, fix the code, and open a draft PR.
**Severity triage**: Patch HIGH and above. For MEDIUM, only patch findings from bug-detection skills (e.g., `code-review`, `security-review`). Skip LOW and INFO findings.
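The triage rule above can be sketched as a predicate; the skill names in the set are the examples given, not an exhaustive list:

```python
# Severity triage sketch: patch HIGH and above; patch MEDIUM only when
# the finding came from a bug-detection skill; skip LOW and INFO.
# BUG_DETECTION_SKILLS holds the example skills named above.
BUG_DETECTION_SKILLS = {"code-review", "security-review"}

def should_patch(severity: str, skill: str) -> bool:
    sev = severity.lower()
    if sev in {"critical", "high"}:
        return True
    if sev == "medium":
        return skill in BUG_DETECTION_SKILLS
    return False
```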

**Step 0: Index existing PRs** (1 tool call):

@@ -275,8 +309,7 @@ uv run ${CLAUDE_SKILL_ROOT}/scripts/find_reviewers.py "${FILE_PATH}"
**Step 4: Create draft PR**

```bash
cd "${WORKTREE}"
git push -u origin "${BRANCH}"
cd "${WORKTREE}" && git push -u origin HEAD:"${BRANCH}"
```

Create the PR with a 1-2 sentence "What" summary based on the finding and fix, followed by the finding description and verification reasoning:
@@ -298,6 +331,9 @@ ${REASONING}
Automated fix for Warden finding ${FINDING_ID} (${SEVERITY}, detected by ${SKILL}).
<!-- Only include the next line if Phase 3 succeeded and ISSUE_NUMBER is available -->
Ref #${ISSUE_NUMBER}
> This PR was auto-generated by a Warden Sweep (run ${RUN_ID}).
> The finding has been validated through automated deep tracing,
> but human confirmation is requested as this is batch work.
@@ -340,7 +376,7 @@ Update manifest: set `phases.patch` to `"complete"`.

---

## Phase 4: Organize
## Phase 5: Organize

**Run** (1 tool call):

@@ -376,8 +412,9 @@ Each phase is incremental. To resume from where you left off:
1. Check `data/manifest.json` to see which phases are complete
2. For scan: pass `--sweep-dir` to `scan.py`
3. For verify: existing `data/verify/<id>.json` files are skipped
4. For patch: existing entries in `data/patches.jsonl` are skipped
5. For organize: safe to re-run (idempotent)
4. For issue: `create_issue.py` is idempotent (skips if `issueUrl` in manifest)
5. For patch: existing entries in `data/patches.jsonl` are skipped
6. For organize: safe to re-run (idempotent)
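The resume decision amounts to finding the first phase not yet marked `"complete"` in the manifest. A sketch, assuming the phase keys match the `phases.*` names used above and that the listed order is the execution order:

```python
from __future__ import annotations

# Sketch: pick the phase to resume from by reading data/manifest.json.
# Phase names follow the manifest keys used in this document
# (phases.verify, phases.patch, ...); the ordering is an assumption.
PHASES = ["scan", "verify", "issue", "patch", "organize"]

def next_phase(manifest: dict) -> str | None:
    done = manifest.get("phases", {})
    for phase in PHASES:
        if done.get(phase) != "complete":
            return phase
    return None  # every phase complete; nothing to resume
```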

## Output Directory Structure

51 changes: 51 additions & 0 deletions skills/warden-sweep/scripts/_utils.py
@@ -20,6 +20,24 @@ def run_cmd(
)


def read_json(path: str) -> dict[str, Any] | None:
"""Read a JSON file and return parsed object, or None on failure."""
if not os.path.exists(path):
return None
try:
with open(path) as f:
return json.load(f)
except (json.JSONDecodeError, OSError):
return None


def write_json(path: str, data: dict[str, Any]) -> None:
"""Write a dict to a JSON file with trailing newline."""
with open(path, "w") as f:
json.dump(data, f, indent=2)
f.write("\n")


def read_jsonl(path: str) -> list[dict[str, Any]]:
"""Read a JSONL file and return list of parsed objects."""
entries: list[dict[str, Any]] = []
@@ -35,3 +53,36 @@ def read_jsonl(path: str) -> list[dict[str, Any]]:
except json.JSONDecodeError:
continue
return entries


def severity_badge(severity: str) -> str:
"""Return a markdown-friendly severity indicator."""
badges = {
"critical": "**CRITICAL**",
"high": "**HIGH**",
"medium": "MEDIUM",
"low": "LOW",
"info": "info",
}
return badges.get(severity, severity)


def pr_number_from_url(pr_url: str) -> str:
"""Extract the PR or issue number from a GitHub URL's last path segment."""
return pr_url.rstrip("/").split("/")[-1]


def ensure_github_label(name: str, color: str, description: str) -> None:
"""Create a GitHub label if it doesn't exist (idempotent)."""
try:
subprocess.run(
[
"gh", "label", "create", name,
"--color", color,
"--description", description,
],
capture_output=True,
timeout=15,
)
except (subprocess.TimeoutExpired, FileNotFoundError):
pass
189 changes: 189 additions & 0 deletions skills/warden-sweep/scripts/create_issue.py
@@ -0,0 +1,189 @@
#!/usr/bin/env python3
# /// script
# requires-python = ">=3.9"
# ///
"""
Warden Sweep: Create tracking issue.

Creates a GitHub issue summarizing the sweep results after verification
but before patching. Gives every PR a parent to reference and gives
reviewers a single place to see the full picture.

Usage:
uv run create_issue.py <sweep-dir>

Stdout: JSON with issueUrl and issueNumber
Stderr: Progress lines

Idempotent: if issueUrl already exists in manifest, skips creation.
"""
from __future__ import annotations

import argparse
import json
import os
import subprocess
import sys
from typing import Any

sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from _utils import ( # noqa: E402
ensure_github_label,
pr_number_from_url,
read_json,
read_jsonl,
severity_badge,
write_json,
)


def build_issue_body(
run_id: str,
scan_index: list[dict[str, Any]],
all_findings: list[dict[str, Any]],
verified: list[dict[str, Any]],
rejected: list[dict[str, Any]],
) -> str:
"""Build the GitHub issue body markdown."""
files_scanned = sum(1 for e in scan_index if e.get("status") == "complete")
files_timed_out = sum(
1 for e in scan_index
if e.get("status") == "error" and e.get("error") == "timeout"
)
files_errored = sum(
1 for e in scan_index
if e.get("status") == "error" and e.get("error") != "timeout"
)

# Collect unique skills from scan index
skills: set[str] = set()
for entry in scan_index:
for skill in entry.get("skills", []):
skills.add(skill)

lines = [
f"## Warden Sweep `{run_id}`",
"",
"| Metric | Count |",
"|--------|-------|",
f"| Files scanned | {files_scanned} |",
f"| Files timed out | {files_timed_out} |",
f"| Files errored | {files_errored} |",
f"| Total findings | {len(all_findings)} |",
f"| Verified | {len(verified)} |",
f"| Rejected | {len(rejected)} |",
"",
]

if verified:
lines.append("### Verified Findings")
lines.append("")
lines.append("| Severity | Skill | File | Title |")
lines.append("|----------|-------|------|-------|")
for f in verified:
sev = severity_badge(f.get("severity", "info"))
skill = f.get("skill", "")
file_path = f.get("file", "")
start_line = f.get("startLine")
location = f"{file_path}:{start_line}" if start_line else file_path
title = f.get("title", "")
lines.append(f"| {sev} | {skill} | `{location}` | {title} |")
lines.append("")

if skills:
lines.append("### Skills Run")
lines.append("")
lines.append(", ".join(sorted(skills)))
lines.append("")

lines.append("> Generated by Warden Sweep. PRs referencing this issue will appear below.")

return "\n".join(lines) + "\n"


def create_github_issue(title: str, body: str) -> dict[str, Any]:
"""Create a GitHub issue with the warden label. Returns issueUrl and issueNumber."""
ensure_github_label("warden", "5319E7", "Automated fix from Warden Sweep")

result = subprocess.run(
[
"gh", "issue", "create",
"--label", "warden",
"--title", title,
"--body", body,
],
capture_output=True,
text=True,
timeout=30,
)

if result.returncode != 0:
raise RuntimeError(f"gh issue create failed: {result.stderr.strip()}")

issue_url = result.stdout.strip()
try:
issue_number = int(pr_number_from_url(issue_url))
except (ValueError, IndexError):
raise RuntimeError(f"Could not parse issue number from gh output: {issue_url}")

return {"issueUrl": issue_url, "issueNumber": issue_number}


def main() -> None:
parser = argparse.ArgumentParser(
description="Warden Sweep: Create tracking issue"
)
parser.add_argument("sweep_dir", help="Path to the sweep directory")
args = parser.parse_args()

sweep_dir = args.sweep_dir
data_dir = os.path.join(sweep_dir, "data")
manifest_path = os.path.join(data_dir, "manifest.json")

if not os.path.isdir(sweep_dir):
print(
json.dumps({"error": f"Sweep directory not found: {sweep_dir}"}),
file=sys.stdout,
)
sys.exit(1)

manifest = read_json(manifest_path) or {}

# Idempotency: if issue already exists, return existing values
if manifest.get("issueUrl"):
output = {
"issueUrl": manifest["issueUrl"],
"issueNumber": manifest.get("issueNumber", 0),
}
print(json.dumps(output))
return

run_id = manifest.get("runId", "unknown")

# Read sweep data
scan_index = read_jsonl(os.path.join(data_dir, "scan-index.jsonl"))
all_findings = read_jsonl(os.path.join(data_dir, "all-findings.jsonl"))
verified = read_jsonl(os.path.join(data_dir, "verified.jsonl"))
rejected = read_jsonl(os.path.join(data_dir, "rejected.jsonl"))

files_scanned = sum(1 for e in scan_index if e.get("status") == "complete")

# Build issue
title = f"Warden Sweep {run_id}: {len(verified)} findings across {files_scanned} files"
body = build_issue_body(run_id, scan_index, all_findings, verified, rejected)

print("Creating tracking issue...", file=sys.stderr)
result = create_github_issue(title, body)
print(f"Created issue: {result['issueUrl']}", file=sys.stderr)

# Write issueUrl and issueNumber to manifest
manifest["issueUrl"] = result["issueUrl"]
manifest["issueNumber"] = result["issueNumber"]
manifest.setdefault("phases", {})["issue"] = "complete"
write_json(manifest_path, manifest)

print(json.dumps(result))


if __name__ == "__main__":
main()