diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index ad19afd..d630cfa 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ on: jobs: test: - runs-on: self-hosted + runs-on: ubuntu-latest strategy: matrix: python-version: ["3.11", "3.12"] diff --git a/src/arbiter/artifact_scorer.py b/src/arbiter/artifact_scorer.py new file mode 100644 index 0000000..6961231 --- /dev/null +++ b/src/arbiter/artifact_scorer.py @@ -0,0 +1,234 @@ +"""Artifact Scorer -- generic quality scoring for structured knowledge artifacts. + +Extends Arbiter beyond code quality to score any structured artifact that has +a schema and quality dimensions. Mirrors the Analyzer / Finding / RepoScore +pattern from analyzers/base.py and scoring.py. + +Supported artifact types (registered in scorers/): + bibliography -- hummbl-bibliography batch quality + governance -- HUMMBL governance receipt quality + +Adding a new scorer: + 1. Subclass ArtifactScorer, implement artifact_type and score() + 2. Register via DEFAULT_REGISTRY.register(MyScorer()) + 3. Add tests to tests/test_artifact_scorer.py + +Usage: + from arbiter.artifact_scorer import DEFAULT_REGISTRY + + result = DEFAULT_REGISTRY.score("bibliography", {"entries": [...]}) + print(f"{result.grade} ({result.overall}/100)") + for finding in result.findings: + print(f" [{finding.severity}] {finding.field}: {finding.message}") +""" + +from __future__ import annotations + +from abc import ABC, abstractmethod +from dataclasses import dataclass, field + + +# --------------------------------------------------------------------------- +# Finding +# --------------------------------------------------------------------------- + +@dataclass(frozen=True, slots=True) +class ArtifactFinding: + """A single quality finding from an artifact scorer. + + Parallel to analyzers/base.py Finding, but for knowledge artifacts + rather than code files. + """ + + field: str # Which field or dimension the finding concerns + severity: str # CRITICAL, HIGH, MEDIUM, LOW + rule_id: str # Stable identifier for the rule (e.g. BIB001) + message: str # Human-readable description + scorer: str # Which scorer produced this finding + + +# --------------------------------------------------------------------------- +# Score +# --------------------------------------------------------------------------- + +@dataclass(frozen=True) +class ArtifactScore: + """Quality score for a structured knowledge artifact. + + Parallel to scoring.py RepoScore, but for arbitrary artifact types. + """ + + artifact_type: str # Which scorer produced this + overall: float # 0-100 weighted composite + dimensions: dict[str, float] # Per-dimension scores (0-100 each) + total_findings: int + findings: tuple[ArtifactFinding, ...] 
= field(default_factory=tuple) + findings_by_severity: dict[str, int] = field(default_factory=dict) + metadata: dict[str, object] = field(default_factory=dict) + + @property + def grade(self) -> str: + """Letter grade matching Arbiter's RepoScore grade scale.""" + if self.overall >= 90: + return "A" + if self.overall >= 80: + return "B" + if self.overall >= 70: + return "C" + if self.overall >= 60: + return "D" + return "F" + + def summary(self) -> str: + """One-line summary string.""" + dim_str = ", ".join(f"{k}={v:.0f}" for k, v in sorted(self.dimensions.items())) + return ( + f"{self.artifact_type} | Grade {self.grade} | {self.overall:.1f}/100 | " + f"{self.total_findings} findings | [{dim_str}]" + ) + + def to_dict(self) -> dict: + """Serialise to a plain dict (JSON-safe).""" + return { + "artifact_type": self.artifact_type, + "overall": self.overall, + "grade": self.grade, + "dimensions": dict(self.dimensions), + "total_findings": self.total_findings, + "findings_by_severity": dict(self.findings_by_severity), + "findings": [ + { + "field": f.field, + "severity": f.severity, + "rule_id": f.rule_id, + "message": f.message, + "scorer": f.scorer, + } + for f in self.findings + ], + "metadata": dict(self.metadata), + } + + +# --------------------------------------------------------------------------- +# Base scorer +# --------------------------------------------------------------------------- + +class ArtifactScorer(ABC): + """Abstract base for knowledge artifact scorers. + + Parallel to analyzers/base.py Analyzer, but takes an arbitrary dict + instead of a filesystem path. + """ + + @property + @abstractmethod + def artifact_type(self) -> str: + """Unique type key (e.g. 'bibliography', 'governance').""" + + @abstractmethod + def score(self, artifact: dict) -> ArtifactScore: + """Score the artifact. Returns an ArtifactScore.""" + + def is_available(self) -> bool: + """Check if this scorer can run (override if external tools needed).""" + return True + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + +class ArtifactScorerRegistry: + """Maps artifact type strings to ArtifactScorer instances. + + Usage: + registry = ArtifactScorerRegistry() + registry.register(BibliographyScorer()) + registry.register(GovernanceReceiptScorer()) + + result = registry.score("bibliography", {"entries": [...]}) + """ + + def __init__(self) -> None: + self._scorers: dict[str, ArtifactScorer] = {} + + def register(self, scorer: ArtifactScorer) -> None: + """Register a scorer. Overwrites any existing scorer for the same type.""" + self._scorers[scorer.artifact_type] = scorer + + def score(self, artifact_type: str, artifact: dict) -> ArtifactScore: + """Score an artifact by type. Raises KeyError if type not registered.""" + scorer = self._scorers.get(artifact_type) + if scorer is None: + available = sorted(self._scorers) + raise KeyError( + f"No scorer registered for artifact_type={artifact_type!r}. 
" + f"Available: {available}" + ) + return scorer.score(artifact) + + def available_types(self) -> list[str]: + """Return sorted list of registered artifact type keys.""" + return sorted(self._scorers) + + def __contains__(self, artifact_type: str) -> bool: + return artifact_type in self._scorers + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _score_from_findings( + findings: list[ArtifactFinding], + dimension_findings: dict[str, list[ArtifactFinding]], + weights: dict[str, float], + artifact_type: str, + metadata: dict | None = None, +) -> ArtifactScore: + """Build an ArtifactScore from findings and dimension weights. + + Each dimension score starts at 100 and is penalised by findings: + CRITICAL: -25, HIGH: -15, MEDIUM: -7, LOW: -3 + + Dimensions not present in weights or dimension_findings get 100. + """ + severity_penalty = {"CRITICAL": 25, "HIGH": 15, "MEDIUM": 7, "LOW": 3} + dimensions: dict[str, float] = {} + + for dim, dim_findings in dimension_findings.items(): + penalty = sum(severity_penalty.get(f.severity, 3) for f in dim_findings) + dimensions[dim] = max(0.0, min(100.0, 100.0 - float(penalty))) + + # Fill in any weight-declared dimensions with no findings + for dim in weights: + if dim not in dimensions: + dimensions[dim] = 100.0 + + # Weighted overall + total_weight = sum(weights.values()) or 1.0 + overall = sum( + dimensions.get(dim, 100.0) * w for dim, w in weights.items() + ) / total_weight + + by_severity: dict[str, int] = {} + for f in findings: + by_severity[f.severity] = by_severity.get(f.severity, 0) + 1 + + return ArtifactScore( + artifact_type=artifact_type, + overall=round(overall, 1), + dimensions={k: round(v, 1) for k, v in dimensions.items()}, + total_findings=len(findings), + findings=tuple(findings), + findings_by_severity=by_severity, + metadata=metadata or {}, + ) + + +# --------------------------------------------------------------------------- +# Default registry (populated by scorers/__init__.py on import) +# --------------------------------------------------------------------------- + +DEFAULT_REGISTRY = ArtifactScorerRegistry() diff --git a/src/arbiter/report.py b/src/arbiter/report.py index c3cf9c3..1418eeb 100644 --- a/src/arbiter/report.py +++ b/src/arbiter/report.py @@ -14,7 +14,7 @@ from __future__ import annotations import json -from dataclasses import asdict, dataclass, field +from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path diff --git a/src/arbiter/scorers/__init__.py b/src/arbiter/scorers/__init__.py new file mode 100644 index 0000000..6d782c5 --- /dev/null +++ b/src/arbiter/scorers/__init__.py @@ -0,0 +1,22 @@ +"""Arbiter knowledge artifact scorers. + +Registers all built-in ArtifactScorer implementations into DEFAULT_REGISTRY. 
+ +Importing this package is sufficient to make all built-in scorers available: + + import arbiter.scorers # noqa: F401 + from arbiter.artifact_scorer import DEFAULT_REGISTRY + + result = DEFAULT_REGISTRY.score("bibliography", {"entries": [...]}) +""" + +from arbiter.scorers.bibliography_scorer import BibliographyScorer +from arbiter.scorers.governance_receipt_scorer import GovernanceReceiptScorer +from arbiter.scorers.arcana_essay_scorer import ArcanaEssayScorer +from arbiter.artifact_scorer import DEFAULT_REGISTRY + +DEFAULT_REGISTRY.register(BibliographyScorer()) +DEFAULT_REGISTRY.register(GovernanceReceiptScorer()) +DEFAULT_REGISTRY.register(ArcanaEssayScorer()) + +__all__ = ["BibliographyScorer", "GovernanceReceiptScorer", "ArcanaEssayScorer"] diff --git a/src/arbiter/scorers/arcana_essay_scorer.py b/src/arbiter/scorers/arcana_essay_scorer.py new file mode 100644 index 0000000..49a5308 --- /dev/null +++ b/src/arbiter/scorers/arcana_essay_scorer.py @@ -0,0 +1,339 @@ +"""ARCANA Essay Scorer -- adversarial quality gate for overnight synthesis outputs. + +Scores a single ARCANA lens essay before it's ingested into the CLP ledger. +This is the adversarial layer: it checks whether convergence signals are +grounded in empirical sources, not just philosophical frameworks agreeing. + +The scorer answers: "Should this essay be trusted enough to enter the ledger?" + +Dimensions: + empirical_grounding -- convergence signals cite T5-T7 bibliography entries + citation_density -- essay body references bibliography entries by ID + on_topic_ratio -- essay addresses the lens's domain (not generic) + structural_integrity -- 5 required sections present and non-empty + source_diversity -- citations span multiple tiers (not all T1-T2) + +Artifact input format (dict): + { + "lens": "bki", # ARCANA lens name + "title": "BKI: Belonging as Governance...", + "content": "## Overview\n...", # Full essay text + "convergence_signal": "...", # Extracted signal text + "citations": ["bib-001", "bib-042", ...], # Bibliography entry IDs cited + "citation_tiers": {"bib-001": "T7", ...}, # Optional tier map + "word_count": 1200, + } + +Scoring thresholds: + Grade A (90+): Publish-quality, all signals empirically grounded + Grade B (80+): Ledger-ready, minor gaps + Grade C (70+): Ingest with warning tag + Grade D (60+): Flag for human review before ingest + Grade F (<60): Block from ledger — philosophical echo chamber risk +""" + +from __future__ import annotations + +import re + +from arbiter.artifact_scorer import ( + ArtifactFinding, + ArtifactScore, + ArtifactScorer, + _score_from_findings, +) + +_TYPE = "arcana_essay" + +_WEIGHTS = { + "empirical_grounding": 0.30, + "citation_density": 0.25, + "structural_integrity": 0.20, + "source_diversity": 0.15, + "on_topic_ratio": 0.10, +} + +# Required sections in an ARCANA essay +_REQUIRED_SECTIONS = ( + "overview", + "convergence signal", + "challenge", + "hummbl", + "conclusion", +) + +# Empirical tiers (T5+) — these ground philosophical claims in evidence +_EMPIRICAL_TIERS = {"T5", "T6", "T7", "T8", "T9", "T10", "T11", "T12", "T13"} +_THEORETICAL_TIERS = {"T1", "T2", "T3", "T4"} + +# Minimum citations for a credible essay +_MIN_CITATIONS = 3 +_MIN_EMPIRICAL_CITATIONS = 1 # At least 1 T5+ source + +# Minimum word count for a substantive essay +_MIN_WORD_COUNT = 400 +_MIN_CONVERGENCE_WORDS = 30 + +# Known ARCANA lens names (for on-topic validation) +_KNOWN_LENSES = { + "yarvin", "dugin", "land", "strauss", "nietzsche", "machiavelli", "weber", + "gramsci", "foucault", 
"chomsky", "schmitt", "kissinger", "bostrom", + "mcluhan", "zuboff", "burnham", "evola", "luhmann", "marx", "pareto", + "popper", "veblen", "bateson", "baudrillard", "guenon", "heidegger", + "habermas", "bourdieu", "derrida", "girard", "fanon", "arendt", + "ibn_khaldun", "taleb", "ashby", "synthesist", "bki", "measurement", +} + + +def _count_words(text: str) -> int: + """Count whitespace-delimited words.""" + return len(text.split()) if text else 0 + + +def _has_section(content: str, section_name: str) -> bool: + """Check if the essay contains a section with this name (case-insensitive).""" + pattern = re.compile( + r"^#{1,3}\s*" + re.escape(section_name), + re.IGNORECASE | re.MULTILINE, + ) + return bool(pattern.search(content)) + + +class ArcanaEssayScorer(ArtifactScorer): + """Score an ARCANA lens essay for quality before CLP ingest.""" + + @property + def artifact_type(self) -> str: + return _TYPE + + def score(self, artifact: dict) -> ArtifactScore: + findings: list[ArtifactFinding] = [] + dim_findings: dict[str, list[ArtifactFinding]] = {d: [] for d in _WEIGHTS} + + lens = artifact.get("lens", "") + content = artifact.get("content", "") + convergence = artifact.get("convergence_signal", "") + citations = artifact.get("citations") or [] + citation_tiers = artifact.get("citation_tiers") or {} + word_count = artifact.get("word_count") or _count_words(content) + if isinstance(citations, str): + citations = [c.strip() for c in citations.split(",") if c.strip()] + + self._check_structure(content, word_count, convergence, findings, dim_findings) + self._check_citation_density(citations, content, findings, dim_findings) + self._check_empirical_grounding( + citations, citation_tiers, convergence, findings, dim_findings, + ) + self._check_source_diversity(citation_tiers, findings, dim_findings) + self._check_on_topic(lens, content, findings, dim_findings) + + return _score_from_findings( + findings=findings, + dimension_findings=dim_findings, + weights=_WEIGHTS, + artifact_type=_TYPE, + metadata={ + "lens": lens, + "word_count": word_count, + "citation_count": len(citations), + "empirical_citation_count": ( + sum(1 for t in citation_tiers.values() if t.upper() in _EMPIRICAL_TIERS) + if citation_tiers else None + ), + "convergence_words": _count_words(convergence), + }, + ) + + @staticmethod + def _check_structure( + content: str, word_count: int, convergence: str, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + for section in _REQUIRED_SECTIONS: + if content and _has_section(content, section): + continue + severity = "HIGH" if section in ("convergence signal", "overview") else "MEDIUM" + f = ArtifactFinding( + field=f"section:{section}", severity=severity, rule_id="ARC101", + message=f"Required section '{section}' not found in essay", scorer=_TYPE, + ) + findings.append(f) + dim_findings["structural_integrity"].append(f) + + if word_count < _MIN_WORD_COUNT: + f = ArtifactFinding( + field="word_count", severity="MEDIUM", rule_id="ARC102", + message=( + f"Essay is only {word_count} words (min {_MIN_WORD_COUNT}). " + "May be truncated or incomplete." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["structural_integrity"].append(f) + + if convergence and _count_words(convergence) < _MIN_CONVERGENCE_WORDS: + f = ArtifactFinding( + field="convergence_signal", severity="MEDIUM", rule_id="ARC103", + message=( + f"Convergence signal is only {_count_words(convergence)} words. 
" + "Signals under {_MIN_CONVERGENCE_WORDS} words are usually too vague to be actionable." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["structural_integrity"].append(f) + + @staticmethod + def _check_citation_density( + citations: list, content: str, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if len(citations) < _MIN_CITATIONS: + severity = "HIGH" if not citations else "MEDIUM" + f = ArtifactFinding( + field="citations", severity=severity, rule_id="ARC201", + message=( + f"Essay has {len(citations)} citation(s) (min {_MIN_CITATIONS}). " + "Insufficient grounding in bibliography." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["citation_density"].append(f) + + if not (content and citations): + return + uncited = [c for c in citations if c not in content] + if len(uncited) > len(citations) * 0.5: + f = ArtifactFinding( + field="citations", severity="LOW", rule_id="ARC202", + message=( + f"{len(uncited)}/{len(citations)} citations listed but not " + "referenced in essay body (possible header-only listing)." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["citation_density"].append(f) + + @staticmethod + def _check_empirical_grounding( + citations: list, citation_tiers: dict, convergence: str, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if not citations: + f = ArtifactFinding( + field="citations", severity="HIGH", rule_id="ARC304", + message=( + "Zero citations present. Empirical grounding cannot be assessed. " + "An uncited essay is indistinguishable from generated confabulation." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["empirical_grounding"].append(f) + return + + if not citation_tiers: + f = ArtifactFinding( + field="citation_tiers", severity="LOW", rule_id="ARC303", + message=( + "No citation tier metadata provided. " + "Cannot verify empirical grounding (T5+ sources). " + "Include citation_tiers for full quality scoring." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["empirical_grounding"].append(f) + return + + empirical_cites = [c for c, t in citation_tiers.items() if t.upper() in _EMPIRICAL_TIERS] + if len(empirical_cites) < _MIN_EMPIRICAL_CITATIONS: + f = ArtifactFinding( + field="empirical_grounding", severity="HIGH", rule_id="ARC301", + message=( + "No T5+ empirical citations found. " + "Convergence signal is based on philosophical frameworks only — " + "echo chamber risk before CLP ingest." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["empirical_grounding"].append(f) + + if convergence and empirical_cites and not any(c in convergence for c in empirical_cites): + f = ArtifactFinding( + field="convergence_signal", severity="MEDIUM", rule_id="ARC302", + message=( + "Convergence signal section does not reference any T5+ empirical " + "citation. Signal may be philosophical consensus, not evidence-based." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["empirical_grounding"].append(f) + + @staticmethod + def _check_source_diversity( + citation_tiers: dict, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if not citation_tiers: + return + tiers_used = {t.upper() for t in citation_tiers.values()} + if len(tiers_used) < 2: + f = ArtifactFinding( + field="source_diversity", severity="LOW", rule_id="ARC401", + message=( + f"All citations from a single tier ({tiers_used}). " + "Greater tier diversity improves argument strength." 
+ ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["source_diversity"].append(f) + + theoretical = sum(1 for t in citation_tiers.values() if t.upper() in _THEORETICAL_TIERS) + total = len(citation_tiers) + if total > 0 and theoretical / total > 0.80: + f = ArtifactFinding( + field="source_diversity", severity="MEDIUM", rule_id="ARC402", + message=( + f"{theoretical/total:.0%} of citations are T1-T4 (theoretical). " + "High theoretical concentration — consider empirical counterweights." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["source_diversity"].append(f) + + @staticmethod + def _check_on_topic( + lens: str, content: str, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if lens and lens.lower() not in _KNOWN_LENSES: + f = ArtifactFinding( + field="lens", severity="LOW", rule_id="ARC501", + message=( + f"Lens '{lens}' not in known ARCANA lens list. " + "Custom lens — routing and synthesis may not be calibrated." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["on_topic_ratio"].append(f) + + if lens and lens.lower() not in content.lower(): + f = ArtifactFinding( + field="on_topic_ratio", severity="LOW", rule_id="ARC502", + message=( + f"Lens name '{lens}' does not appear in essay content. " + "Essay may be generic rather than lens-specific." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["on_topic_ratio"].append(f) diff --git a/src/arbiter/scorers/bibliography_scorer.py b/src/arbiter/scorers/bibliography_scorer.py new file mode 100644 index 0000000..814c49c --- /dev/null +++ b/src/arbiter/scorers/bibliography_scorer.py @@ -0,0 +1,252 @@ +"""Bibliography Scorer -- quality scoring for hummbl-bibliography batches. + +Scores a batch of bibliography entries (or a single entry) across 5 dimensions: + + doi_coverage -- % of entries with DOI or URL source + tier_distribution -- health of T-tier spread (penalise T1/T2 dominance) + tag_completeness -- entries have NIST or topic tags + entry_completeness -- required fields present + citation_density -- average links / cross-references per entry + +Artifact input format (dict): + { + "entries": [ + { + "id": "bib-001", + "title": "...", + "authors": ["..."], + "year": 2024, + "tier": "T7", + "doi": "10.1234/...", # optional + "url": "https://...", # optional + "tags": ["governance", "nist-csf"], + "links": ["bib-002"], # optional + }, + ... + ] + } + +Single entry mode: pass {"entry": {...}} instead of {"entries": [...]}. 
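+
+Example (illustrative sketch; the entry values below are invented, in the batch
+format described above):
+
+    from arbiter.scorers.bibliography_scorer import BibliographyScorer
+
+    scorer = BibliographyScorer()
+    result = scorer.score({
+        "entries": [
+            {
+                "id": "bib-001",
+                "title": "Governing AI Systems",
+                "authors": ["Smith, J."],
+                "year": 2024,
+                "tier": "T7",
+                "doi": "10.1234/example",   # invented DOI, shown for format only
+                "tags": ["governance", "nist-csf"],
+                "links": ["bib-002"],
+            },
+        ],
+    })
+    print(result.summary())   # e.g. "bibliography | Grade A | 100.0/100 | 0 findings | [...]"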
+""" + +from __future__ import annotations + +from arbiter.artifact_scorer import ( + ArtifactFinding, + ArtifactScore, + ArtifactScorer, + _score_from_findings, +) + +_TYPE = "bibliography" + +# Dimension weights (must sum to 1.0) +_WEIGHTS = { + "doi_coverage": 0.25, + "tier_distribution": 0.20, + "tag_completeness": 0.25, + "entry_completeness": 0.20, + "citation_density": 0.10, +} + +# Tiers considered "empirically grounded" (T5+) +_EMPIRICAL_TIERS = {"T5", "T6", "T7", "T8", "T9", "T10", "T11", "T12", "T13"} +_THEORETICAL_TIERS = {"T1", "T2", "T3", "T4"} + +# Minimum expected tag count per entry +_MIN_TAGS = 1 + +# NIST/EU keywords that indicate a governance tag is present +_GOVERNANCE_KEYWORDS = { + "nist", "eu", "ai-act", "gdpr", "iso", "soc2", "governance", + "compliance", "regulatory", "audit", "accountability", +} + + +class BibliographyScorer(ArtifactScorer): + """Score a bibliography batch for quality and completeness.""" + + @property + def artifact_type(self) -> str: + return _TYPE + + def score(self, artifact: dict) -> ArtifactScore: + entries = self._extract_entries(artifact) + if not entries: + return self._empty_score() + + findings: list[ArtifactFinding] = [] + dim_findings: dict[str, list[ArtifactFinding]] = {d: [] for d in _WEIGHTS} + n = len(entries) + tier_counts: dict[str, int] = {} + counters = {"doi": 0, "tagged": 0, "complete": 0, "links": 0} + + for entry in entries: + self._score_entry(entry, counters, tier_counts, findings, dim_findings) + + self._check_tier_distribution(tier_counts, n, findings, dim_findings) + avg_links = counters["links"] / n if n > 0 else 0.0 + self._check_citation_density(avg_links, findings, dim_findings) + + return _score_from_findings( + findings=findings, + dimension_findings=dim_findings, + weights=_WEIGHTS, + artifact_type=_TYPE, + metadata={ + "entry_count": n, + "doi_coverage_pct": round(counters["doi"] / n * 100, 1) if n else 0, + "tagged_pct": round(counters["tagged"] / n * 100, 1) if n else 0, + "complete_pct": round(counters["complete"] / n * 100, 1) if n else 0, + "avg_links": round(avg_links, 2), + "tier_distribution": tier_counts, + }, + ) + + @staticmethod + def _empty_score() -> ArtifactScore: + return ArtifactScore( + artifact_type=_TYPE, + overall=0.0, + dimensions={d: 0.0 for d in _WEIGHTS}, + total_findings=1, + findings=(ArtifactFinding( + field="entries", + severity="CRITICAL", + rule_id="BIB000", + message="No entries found in artifact", + scorer=_TYPE, + ),), + findings_by_severity={"CRITICAL": 1}, + metadata={"entry_count": 0}, + ) + + def _score_entry( + self, + entry: dict, + counters: dict[str, int], + tier_counts: dict[str, int], + findings: list[ArtifactFinding], + dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + entry_id = entry.get("id", "?") + self._check_doi(entry, entry_id, counters, findings, dim_findings) + tier = str(entry.get("tier", "")).upper() + tier_counts[tier] = tier_counts.get(tier, 0) + 1 + self._check_tags(entry, entry_id, counters, findings, dim_findings) + self._check_required_fields(entry, entry_id, counters, findings, dim_findings) + counters["links"] += self._count_links(entry) + + @staticmethod + def _check_doi( + entry: dict, entry_id: str, counters: dict[str, int], + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if entry.get("doi") or entry.get("url"): + counters["doi"] += 1 + return + f = ArtifactFinding( + field="doi", severity="LOW", rule_id="BIB101", + message=f"Entry '{entry_id}' has no DOI or URL", scorer=_TYPE, + 
) + findings.append(f) + dim_findings["doi_coverage"].append(f) + + @staticmethod + def _check_tags( + entry: dict, entry_id: str, counters: dict[str, int], + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + tags = entry.get("tags") or [] + if isinstance(tags, str): + tags = [t.strip() for t in tags.split(",") if t.strip()] + if len(tags) >= _MIN_TAGS: + counters["tagged"] += 1 + return + f = ArtifactFinding( + field="tags", severity="MEDIUM", rule_id="BIB201", + message=f"Entry '{entry_id}' has no tags (min {_MIN_TAGS} required)", + scorer=_TYPE, + ) + findings.append(f) + dim_findings["tag_completeness"].append(f) + + @staticmethod + def _check_required_fields( + entry: dict, entry_id: str, counters: dict[str, int], + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + required = ("id", "title", "year", "tier") + missing = [r for r in required if not entry.get(r)] + if not missing: + counters["complete"] += 1 + for m in missing: + f = ArtifactFinding( + field=m, severity="HIGH", rule_id="BIB301", + message=f"Entry '{entry_id}' missing required field '{m}'", + scorer=_TYPE, + ) + findings.append(f) + dim_findings["entry_completeness"].append(f) + + @staticmethod + def _count_links(entry: dict) -> int: + links = entry.get("links") or [] + if isinstance(links, str): + links = [link.strip() for link in links.split(",") if link.strip()] + return len(links) + + @staticmethod + def _check_tier_distribution( + tier_counts: dict[str, int], n: int, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if n <= 0: + return + theoretical = sum(tier_counts.get(t, 0) for t in _THEORETICAL_TIERS) + pct = theoretical / n + if pct > 0.75: + severity, rule = "HIGH", "BIB402" + tail = ("Bibliography is primarily theoretical — governance claims " + "lack empirical grounding.") + elif pct > 0.60: + severity, rule = "MEDIUM", "BIB401" + tail = "Consider adding more T5-T7 empirical sources." + else: + return + f = ArtifactFinding( + field="tier_distribution", severity=severity, rule_id=rule, + message=f"{pct:.0%} of entries are T1-T4 (theoretical). {tail}", + scorer=_TYPE, + ) + findings.append(f) + dim_findings["tier_distribution"].append(f) + + @staticmethod + def _check_citation_density( + avg_links: float, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if avg_links >= 0.5: + return + f = ArtifactFinding( + field="links", severity="LOW", rule_id="BIB501", + message=( + f"Average citation links per entry: {avg_links:.2f}. " + "Low cross-referencing reduces knowledge graph connectivity." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["citation_density"].append(f) + + def _extract_entries(self, artifact: dict) -> list[dict]: + """Support both batch (entries: [...]) and single (entry: {...}) formats.""" + if "entries" in artifact: + return artifact["entries"] or [] + if "entry" in artifact: + return [artifact["entry"]] + # Allow passing the list directly + if isinstance(artifact, list): + return artifact + return [] diff --git a/src/arbiter/scorers/governance_receipt_scorer.py b/src/arbiter/scorers/governance_receipt_scorer.py new file mode 100644 index 0000000..db8f4a2 --- /dev/null +++ b/src/arbiter/scorers/governance_receipt_scorer.py @@ -0,0 +1,348 @@ +"""Governance Receipt Scorer -- quality scoring for HUMMBL governance receipts. 
+ +Scores a governance receipt (HUMMBL HITL/IDP audit artifact) across 5 dimensions: + + completeness -- all required fields present and non-empty + chain_of_custody -- agent chain is valid, no gaps, no self-approval + timestamp_validity -- timestamps are valid ISO8601, in order, not future + evidence_ratio -- decision/output fields are backed by evidence links + schema_compliance -- schema_version present, known type, no forbidden fields + +Artifact input format (dict) — minimum viable receipt: + { + "receipt_id": "rcpt-...", + "schema_version": "1.0", + "agent": "claude-code", + "action_type": "DECISION" | "REVIEW" | "APPROVAL" | "DELEGATION", + "timestamp": "2026-04-09T14:00:00Z", + "input_summary": "...", + "output_summary": "...", + "confidence": 0.85, + "evidence": ["clp-abc123", "..."], # CLP entry IDs or URLs + "chain": [ # delegation/review chain + {"agent": "human", "role": "approver", "timestamp": "..."}, + ], + "tags": ["governance", "hitl"], + } + +HUMMBL compliance note: + Receipts must meet minimum standards to be audit-defensible under + EU AI Act Article 12 (record keeping) and NIST AI RMF GOVERN 1.2. + A score below 70 (Grade C) indicates the receipt is not audit-ready. +""" + +from __future__ import annotations + +import datetime +import re + +from arbiter.artifact_scorer import ( + ArtifactFinding, + ArtifactScore, + ArtifactScorer, + _score_from_findings, +) + +_TYPE = "governance" + +# Dimension weights +_WEIGHTS = { + "completeness": 0.30, + "chain_of_custody": 0.25, + "timestamp_validity": 0.15, + "evidence_ratio": 0.20, + "schema_compliance": 0.10, +} + +# Required fields for a valid receipt +_REQUIRED_FIELDS = ( + "receipt_id", + "schema_version", + "agent", + "action_type", + "timestamp", + "output_summary", + "confidence", +) + +# Recommended fields (penalised if absent, but not blocking) +_RECOMMENDED_FIELDS = ("input_summary", "evidence", "tags") + +# Known valid action types +_VALID_ACTION_TYPES = { + "DECISION", "REVIEW", "APPROVAL", "DELEGATION", + "OBSERVATION", "CORRECTION", "ESCALATION", +} + +# Known valid schema versions +_KNOWN_SCHEMA_VERSIONS = {"1.0", "1.1", "2.0"} + +# ISO8601 UTC timestamp pattern (Z suffix) +_TS_RE = re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z$") + +# Maximum allowed confidence for a receipt that claims VERIFIED assurance +_MAX_SELF_CONFIDENCE = 0.95 + + +def _parse_ts(ts: str) -> datetime.datetime | None: + """Parse ISO8601 UTC timestamp. 
Returns None on failure.""" + try: + return datetime.datetime.strptime(ts, "%Y-%m-%dT%H:%M:%SZ").replace( + tzinfo=datetime.timezone.utc + ) + except (ValueError, TypeError): + return None + + +class GovernanceReceiptScorer(ArtifactScorer): + """Score a HUMMBL governance receipt for audit readiness.""" + + @property + def artifact_type(self) -> str: + return _TYPE + + def score(self, artifact: dict) -> ArtifactScore: + findings: list[ArtifactFinding] = [] + dim_findings: dict[str, list[ArtifactFinding]] = {d: [] for d in _WEIGHTS} + + chain = artifact.get("chain") or [] + evidence = artifact.get("evidence") or [] + if isinstance(evidence, str): + evidence = [e.strip() for e in evidence.split(",") if e.strip()] + schema_version = str(artifact.get("schema_version", "")) + + self._check_completeness(artifact, findings, dim_findings) + self._check_chain_of_custody(artifact, chain, findings, dim_findings) + primary_ts = self._check_primary_timestamp(artifact, findings, dim_findings) + self._check_chain_timestamps(chain, primary_ts, artifact, findings, dim_findings) + self._check_evidence(artifact, evidence, findings, dim_findings) + self._check_schema(schema_version, artifact, findings, dim_findings) + + return _score_from_findings( + findings=findings, + dimension_findings=dim_findings, + weights=_WEIGHTS, + artifact_type=_TYPE, + metadata={ + "receipt_id": artifact.get("receipt_id", ""), + "agent": artifact.get("agent", ""), + "action_type": artifact.get("action_type", ""), + "evidence_count": len(evidence), + "chain_length": len(chain), + "schema_version": schema_version, + }, + ) + + @staticmethod + def _check_completeness( + artifact: dict, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + for field in _REQUIRED_FIELDS: + val = artifact.get(field) + if val is None or val == "" or val == []: + severity = "CRITICAL" if field in ("receipt_id", "agent", "timestamp") else "HIGH" + f = ArtifactFinding( + field=field, severity=severity, rule_id="GOV101", + message=f"Required field '{field}' is missing or empty", + scorer=_TYPE, + ) + findings.append(f) + dim_findings["completeness"].append(f) + + for field in _RECOMMENDED_FIELDS: + val = artifact.get(field) + if val is None or val == "" or val == []: + f = ArtifactFinding( + field=field, severity="LOW", rule_id="GOV102", + message=f"Recommended field '{field}' is missing", scorer=_TYPE, + ) + findings.append(f) + dim_findings["completeness"].append(f) + + confidence = artifact.get("confidence") + if confidence is None: + return + try: + conf_float = float(confidence) + except (TypeError, ValueError): + f = ArtifactFinding( + field="confidence", severity="HIGH", rule_id="GOV104", + message=f"Confidence is not a number: {confidence!r}", scorer=_TYPE, + ) + findings.append(f) + dim_findings["completeness"].append(f) + return + if not 0.0 <= conf_float <= 1.0: + f = ArtifactFinding( + field="confidence", severity="HIGH", rule_id="GOV103", + message=f"Confidence {conf_float} out of range [0.0, 1.0]", + scorer=_TYPE, + ) + findings.append(f) + dim_findings["completeness"].append(f) + + @staticmethod + def _check_chain_of_custody( + artifact: dict, chain: list, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if not chain: + f = ArtifactFinding( + field="chain", severity="MEDIUM", rule_id="GOV201", + message="No chain-of-custody records. 
Single-agent receipt has no review trail.", + scorer=_TYPE, + ) + findings.append(f) + dim_findings["chain_of_custody"].append(f) + return + + agents_in_chain = [c.get("agent") for c in chain if c.get("agent")] + primary_agent = artifact.get("agent") + if agents_in_chain and all(a == primary_agent for a in agents_in_chain): + f = ArtifactFinding( + field="chain", severity="HIGH", rule_id="GOV202", + message=( + f"Self-approval detected: agent '{primary_agent}' " + "appears as both author and sole reviewer in chain." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["chain_of_custody"].append(f) + + for i, step in enumerate(chain): + for cf in ("agent", "role", "timestamp"): + if step.get(cf): + continue + f = ArtifactFinding( + field=f"chain[{i}].{cf}", severity="MEDIUM", rule_id="GOV203", + message=f"Chain step {i} missing '{cf}'", scorer=_TYPE, + ) + findings.append(f) + dim_findings["chain_of_custody"].append(f) + + @staticmethod + def _check_primary_timestamp( + artifact: dict, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> datetime.datetime | None: + primary_ts_str = artifact.get("timestamp", "") + if not primary_ts_str: + return None + if not _TS_RE.match(str(primary_ts_str)): + f = ArtifactFinding( + field="timestamp", severity="HIGH", rule_id="GOV301", + message=( + f"Timestamp '{primary_ts_str}' is not ISO8601 UTC " + "(expected YYYY-MM-DDTHH:MM:SSZ)" + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["timestamp_validity"].append(f) + return None + primary_ts = _parse_ts(primary_ts_str) + if primary_ts and primary_ts > datetime.datetime.now(datetime.timezone.utc): + f = ArtifactFinding( + field="timestamp", severity="HIGH", rule_id="GOV302", + message=f"Timestamp '{primary_ts_str}' is in the future", scorer=_TYPE, + ) + findings.append(f) + dim_findings["timestamp_validity"].append(f) + return primary_ts + + @staticmethod + def _check_chain_timestamps( + chain: list, primary_ts: datetime.datetime | None, artifact: dict, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if not primary_ts: + return + primary_ts_str = artifact.get("timestamp", "") + for i, step in enumerate(chain): + step_ts_str = step.get("timestamp", "") + if not step_ts_str: + continue + step_ts = _parse_ts(str(step_ts_str)) + if step_ts and step_ts < primary_ts: + f = ArtifactFinding( + field=f"chain[{i}].timestamp", severity="MEDIUM", rule_id="GOV303", + message=( + f"Chain step {i} timestamp '{step_ts_str}' is before " + f"receipt timestamp '{primary_ts_str}' — causal order violation" + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["timestamp_validity"].append(f) + + @staticmethod + def _check_evidence( + artifact: dict, evidence: list, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + has_output = bool(artifact.get("output_summary") or artifact.get("decision")) + if not has_output: + return + if not evidence: + f = ArtifactFinding( + field="evidence", severity="HIGH", rule_id="GOV401", + message=( + "Receipt has output/decision but no evidence links. " + "Claims are ungrounded — not audit-defensible." + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["evidence_ratio"].append(f) + return + if len(evidence) < 2: + f = ArtifactFinding( + field="evidence", severity="LOW", rule_id="GOV402", + message=( + f"Only {len(evidence)} evidence link(s). " + "Minimum 2 recommended for audit-grade grounding." 
+ ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["evidence_ratio"].append(f) + + @staticmethod + def _check_schema( + schema_version: str, artifact: dict, + findings: list[ArtifactFinding], dim_findings: dict[str, list[ArtifactFinding]], + ) -> None: + if not schema_version: + f = ArtifactFinding( + field="schema_version", severity="MEDIUM", rule_id="GOV501", + message="schema_version missing — cannot verify format compliance", + scorer=_TYPE, + ) + findings.append(f) + dim_findings["schema_compliance"].append(f) + elif schema_version not in _KNOWN_SCHEMA_VERSIONS: + f = ArtifactFinding( + field="schema_version", severity="LOW", rule_id="GOV502", + message=( + f"Unknown schema_version '{schema_version}'. " + f"Known versions: {sorted(_KNOWN_SCHEMA_VERSIONS)}" + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["schema_compliance"].append(f) + + action_type = artifact.get("action_type", "") + if action_type and action_type not in _VALID_ACTION_TYPES: + f = ArtifactFinding( + field="action_type", severity="MEDIUM", rule_id="GOV503", + message=( + f"Unknown action_type '{action_type}'. " + f"Known types: {sorted(_VALID_ACTION_TYPES)}" + ), + scorer=_TYPE, + ) + findings.append(f) + dim_findings["schema_compliance"].append(f) diff --git a/tests/test_artifact_scorer.py b/tests/test_artifact_scorer.py new file mode 100644 index 0000000..cf83e9a --- /dev/null +++ b/tests/test_artifact_scorer.py @@ -0,0 +1,527 @@ +"""Tests for Arbiter knowledge artifact scoring system. + +Covers: + - ArtifactFinding, ArtifactScore data models + - ArtifactScorerRegistry registration / lookup + - BibliographyScorer dimensions and findings + - GovernanceReceiptScorer dimensions and findings + - DEFAULT_REGISTRY wiring via scorers/__init__.py +""" + +from __future__ import annotations + +import pytest + +from arbiter.artifact_scorer import ( + ArtifactFinding, + ArtifactScore, + ArtifactScorerRegistry, + DEFAULT_REGISTRY, + _score_from_findings, +) +from arbiter.scorers.bibliography_scorer import BibliographyScorer +from arbiter.scorers.governance_receipt_scorer import GovernanceReceiptScorer +from arbiter.scorers.arcana_essay_scorer import ArcanaEssayScorer + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +def _good_bib_entry(entry_id: str = "bib-001") -> dict: + return { + "id": entry_id, + "title": "Governing AI Systems", + "authors": ["Smith, J."], + "year": 2024, + "tier": "T7", + "doi": "10.1234/test", + "tags": ["governance", "nist"], + "links": ["bib-002"], + } + + +def _good_receipt() -> dict: + return { + "receipt_id": "rcpt-abc123", + "schema_version": "1.0", + "agent": "claude-code", + "action_type": "DECISION", + "timestamp": "2026-04-09T14:00:00Z", + "input_summary": "Should we proceed with the merge?", + "output_summary": "Approved merge with 3 conditions.", + "confidence": 0.9, + "evidence": ["clp-abc001", "clp-abc002"], + "chain": [ + {"agent": "human", "role": "approver", "timestamp": "2026-04-09T14:05:00Z"} + ], + "tags": ["governance", "hitl"], + } + + +# --------------------------------------------------------------------------- +# ArtifactFinding +# --------------------------------------------------------------------------- + +class TestArtifactFinding: + def test_frozen(self): + f = ArtifactFinding("doi", "LOW", "BIB101", "No DOI", "bibliography") + with pytest.raises((AttributeError, TypeError)): + f.field = "other" # type: ignore[misc] + + def 
test_fields(self): + f = ArtifactFinding("doi", "HIGH", "BIB001", "missing", "test-scorer") + assert f.field == "doi" + assert f.severity == "HIGH" + assert f.rule_id == "BIB001" + assert f.message == "missing" + assert f.scorer == "test-scorer" + + +# --------------------------------------------------------------------------- +# ArtifactScore +# --------------------------------------------------------------------------- + +class TestArtifactScore: + def test_grade_scale(self): + def make(overall: float) -> ArtifactScore: + return ArtifactScore( + artifact_type="test", + overall=overall, + dimensions={"x": overall}, + total_findings=0, + ) + + assert make(95).grade == "A" + assert make(85).grade == "B" + assert make(75).grade == "C" + assert make(65).grade == "D" + assert make(50).grade == "F" + + def test_summary_contains_grade(self): + s = ArtifactScore( + artifact_type="bibliography", + overall=87.5, + dimensions={"doi_coverage": 90.0}, + total_findings=2, + ) + summary = s.summary() + assert "B" in summary + assert "87.5" in summary + assert "bibliography" in summary + + def test_to_dict_roundtrip(self): + f = ArtifactFinding("doi", "LOW", "BIB101", "No DOI", "bibliography") + s = ArtifactScore( + artifact_type="bibliography", + overall=75.0, + dimensions={"doi_coverage": 75.0}, + total_findings=1, + findings=(f,), + findings_by_severity={"LOW": 1}, + ) + d = s.to_dict() + assert d["grade"] == "C" + assert d["total_findings"] == 1 + assert d["findings"][0]["rule_id"] == "BIB101" + + +# --------------------------------------------------------------------------- +# ArtifactScorerRegistry +# --------------------------------------------------------------------------- + +class TestArtifactScorerRegistry: + def test_register_and_lookup(self): + registry = ArtifactScorerRegistry() + registry.register(BibliographyScorer()) + assert "bibliography" in registry + assert registry.available_types() == ["bibliography"] + + def test_unknown_type_raises(self): + registry = ArtifactScorerRegistry() + with pytest.raises(KeyError, match="No scorer registered"): + registry.score("does-not-exist", {}) + + def test_overwrite(self): + registry = ArtifactScorerRegistry() + registry.register(BibliographyScorer()) + registry.register(BibliographyScorer()) # no error on overwrite + assert len(registry.available_types()) == 1 + + def test_default_registry_has_built_ins(self): + import arbiter.scorers # noqa: F401 — triggers registration + assert "bibliography" in DEFAULT_REGISTRY + assert "governance" in DEFAULT_REGISTRY + + +# --------------------------------------------------------------------------- +# BibliographyScorer +# --------------------------------------------------------------------------- + +class TestBibliographyScorer: + def setup_method(self): + self.scorer = BibliographyScorer() + + def test_artifact_type(self): + assert self.scorer.artifact_type == "bibliography" + + def test_empty_entries_is_critical(self): + result = self.scorer.score({"entries": []}) + assert result.grade == "F" + assert result.total_findings >= 1 + assert any(f.severity == "CRITICAL" for f in result.findings) + + def test_perfect_batch(self): + entries = [_good_bib_entry(f"bib-{i:03d}") for i in range(10)] + result = self.scorer.score({"entries": entries}) + assert result.overall >= 85.0 + assert result.grade in ("A", "B") + + def test_missing_doi_produces_low_finding(self): + entry = _good_bib_entry() + del entry["doi"] + result = self.scorer.score({"entry": entry}) + doi_findings = [f for f in result.findings if f.rule_id 
== "BIB101"] + assert len(doi_findings) == 1 + assert doi_findings[0].severity == "LOW" + + def test_missing_tags_produces_medium_finding(self): + entry = _good_bib_entry() + entry["tags"] = [] + result = self.scorer.score({"entry": entry}) + tag_findings = [f for f in result.findings if f.rule_id == "BIB201"] + assert len(tag_findings) == 1 + assert tag_findings[0].severity == "MEDIUM" + + def test_missing_required_field_produces_high_finding(self): + entry = _good_bib_entry() + del entry["title"] + result = self.scorer.score({"entry": entry}) + assert any(f.rule_id == "BIB301" and f.severity == "HIGH" for f in result.findings) + + def test_theoretical_dominance_produces_medium_finding(self): + entries = [] + for i in range(7): # 70% T1-T2 + e = _good_bib_entry(f"bib-{i:03d}") + e["tier"] = "T1" + entries.append(e) + for i in range(3): # 30% T7 + e = _good_bib_entry(f"bib-1{i:02d}") + e["tier"] = "T7" + entries.append(e) + result = self.scorer.score({"entries": entries}) + tier_findings = [f for f in result.findings if f.rule_id in ("BIB401", "BIB402")] + assert len(tier_findings) >= 1 + + def test_metadata_populated(self): + entries = [_good_bib_entry(f"bib-{i:03d}") for i in range(5)] + result = self.scorer.score({"entries": entries}) + assert result.metadata["entry_count"] == 5 + assert "doi_coverage_pct" in result.metadata + assert "tier_distribution" in result.metadata + + def test_single_entry_shorthand(self): + result = self.scorer.score({"entry": _good_bib_entry()}) + assert result.metadata["entry_count"] == 1 + + def test_deterministic(self): + entries = [_good_bib_entry(f"bib-{i:03d}") for i in range(3)] + r1 = self.scorer.score({"entries": entries}) + r2 = self.scorer.score({"entries": entries}) + assert r1.overall == r2.overall + assert r1.grade == r2.grade + + +# --------------------------------------------------------------------------- +# GovernanceReceiptScorer +# --------------------------------------------------------------------------- + +class TestGovernanceReceiptScorer: + def setup_method(self): + self.scorer = GovernanceReceiptScorer() + + def test_artifact_type(self): + assert self.scorer.artifact_type == "governance" + + def test_perfect_receipt_scores_high(self): + result = self.scorer.score(_good_receipt()) + assert result.overall >= 85.0 + assert result.grade in ("A", "B") + + def test_missing_required_fields(self): + receipt = {} + result = self.scorer.score(receipt) + critical = [f for f in result.findings if f.severity == "CRITICAL"] + assert len(critical) >= 2 # receipt_id, agent, timestamp + + def test_self_approval_detected(self): + receipt = _good_receipt() + receipt["chain"] = [ + {"agent": "claude-code", "role": "approver", "timestamp": "2026-04-09T14:05:00Z"} + ] + result = self.scorer.score(receipt) + self_approval = [f for f in result.findings if f.rule_id == "GOV202"] + assert len(self_approval) == 1 + assert self_approval[0].severity == "HIGH" + + def test_no_evidence_for_decision(self): + receipt = _good_receipt() + receipt["evidence"] = [] + result = self.scorer.score(receipt) + evidence_findings = [f for f in result.findings if f.rule_id == "GOV401"] + assert len(evidence_findings) == 1 + assert evidence_findings[0].severity == "HIGH" + + def test_invalid_timestamp_format(self): + receipt = _good_receipt() + receipt["timestamp"] = "April 9, 2026" # not ISO8601 + result = self.scorer.score(receipt) + ts_findings = [f for f in result.findings if f.rule_id == "GOV301"] + assert len(ts_findings) == 1 + + def test_future_timestamp(self): + receipt = 
_good_receipt() + receipt["timestamp"] = "2099-01-01T00:00:00Z" + result = self.scorer.score(receipt) + ts_findings = [f for f in result.findings if f.rule_id == "GOV302"] + assert len(ts_findings) == 1 + + def test_chain_timestamp_before_primary(self): + receipt = _good_receipt() + receipt["chain"] = [ + {"agent": "human", "role": "approver", "timestamp": "2026-04-09T13:00:00Z"} + # 1 hour BEFORE receipt timestamp + ] + result = self.scorer.score(receipt) + causal_findings = [f for f in result.findings if f.rule_id == "GOV303"] + assert len(causal_findings) == 1 + + def test_unknown_schema_version(self): + receipt = _good_receipt() + receipt["schema_version"] = "99.0" + result = self.scorer.score(receipt) + schema_findings = [f for f in result.findings if f.rule_id == "GOV502"] + assert len(schema_findings) == 1 + + def test_unknown_action_type(self): + receipt = _good_receipt() + receipt["action_type"] = "MAGIC" + result = self.scorer.score(receipt) + at_findings = [f for f in result.findings if f.rule_id == "GOV503"] + assert len(at_findings) == 1 + + def test_no_chain_produces_medium_finding(self): + receipt = _good_receipt() + receipt["chain"] = [] + result = self.scorer.score(receipt) + chain_findings = [f for f in result.findings if f.rule_id == "GOV201"] + assert len(chain_findings) == 1 + assert chain_findings[0].severity == "MEDIUM" + + def test_metadata_populated(self): + result = self.scorer.score(_good_receipt()) + assert result.metadata["receipt_id"] == "rcpt-abc123" + assert result.metadata["chain_length"] == 1 + assert result.metadata["evidence_count"] == 2 + + def test_deterministic(self): + r1 = self.scorer.score(_good_receipt()) + r2 = self.scorer.score(_good_receipt()) + assert r1.overall == r2.overall + + +# --------------------------------------------------------------------------- +# _score_from_findings helper +# --------------------------------------------------------------------------- + +class TestScoreFromFindings: + def test_no_findings_gives_100(self): + result = _score_from_findings( + findings=[], + dimension_findings={"dim_a": [], "dim_b": []}, + weights={"dim_a": 0.5, "dim_b": 0.5}, + artifact_type="test", + ) + assert result.overall == 100.0 + assert result.grade == "A" + + def test_critical_finding_reduces_score(self): + f = ArtifactFinding("field", "CRITICAL", "X001", "bad", "test") + result = _score_from_findings( + findings=[f], + dimension_findings={"dim_a": [f]}, + weights={"dim_a": 1.0}, + artifact_type="test", + ) + assert result.overall < 100.0 + assert result.total_findings == 1 + assert result.findings_by_severity == {"CRITICAL": 1} + + def test_grade_f_when_all_critical(self): + findings = [ + ArtifactFinding("f", "CRITICAL", f"X{i:03d}", "bad", "test") + for i in range(10) + ] + result = _score_from_findings( + findings=findings, + dimension_findings={"dim_a": findings}, + weights={"dim_a": 1.0}, + artifact_type="test", + ) + assert result.grade == "F" + + +# --------------------------------------------------------------------------- +# ArcanaEssayScorer +# --------------------------------------------------------------------------- + +def _good_essay(lens: str = "bki") -> dict: + """Minimal passing essay artifact with all required sections.""" + content = "\n".join([ + "## Overview", + "This essay examines BKI through the lens of belonging infrastructure.", + "BKI provides structural preconditions for knowledge creation.", + "", + "## Convergence Signal", + "Multiple frameworks converge on the claim that belonging precedes knowledge 
transmission. [bib-007] provides T7 empirical grounding for this claim across 14 organisations.", + "", + "## Challenge", + "Critics argue that belonging is a soft outcome, not a hard precondition.", + "", + "## HUMMBL", + "HUMMBL operationalises BKI by creating transparency and trust receipts.", + "", + "## Conclusion", + "BKI belongs in the governance stack as a structural layer, not a nice-to-have.", + ]) + return { + "lens": lens, + "title": "BKI: Belonging as Governance Infrastructure", + "content": content, + "convergence_signal": "Multiple frameworks converge on belonging as precondition [bib-007].", + "citations": ["bib-001", "bib-007", "bib-042"], + "citation_tiers": {"bib-001": "T3", "bib-007": "T7", "bib-042": "T6"}, + "word_count": 120, + } + + +class TestArcanaEssayScorer: + def setup_method(self): + self.scorer = ArcanaEssayScorer() + + def test_artifact_type(self): + assert self.scorer.artifact_type == "arcana_essay" + + def test_perfect_essay_scores_high(self): + result = self.scorer.score(_good_essay()) + assert result.overall >= 75.0 + assert result.grade in ("A", "B", "C") + + def test_empty_essay_not_ledger_ready(self): + """An essay with no content and no citations must not be Grade A or B.""" + result = self.scorer.score({"lens": "bki", "content": "", "citations": []}) + # Structural: 5 missing sections (2 HIGH + 3 MEDIUM) + word count MEDIUM + # Citation density: 0 citations (HIGH) + # Empirical grounding: 0 citations (HIGH via ARC304) + assert result.grade not in ("A", "B"), ( + f"Empty essay should not be ledger-ready, got {result.grade} ({result.overall:.1f})" + ) + assert result.total_findings >= 7 + + def test_zero_citations_triggers_arc304(self): + essay = _good_essay() + essay["citations"] = [] + essay["citation_tiers"] = {} + result = self.scorer.score(essay) + arc304 = [f for f in result.findings if f.rule_id == "ARC304"] + assert len(arc304) == 1 + assert arc304[0].severity == "HIGH" + + def test_missing_required_section_produces_finding(self): + essay = _good_essay() + # Remove the Conclusion section + essay["content"] = essay["content"].replace("## Conclusion\n", "## MISSING\n") + result = self.scorer.score(essay) + section_findings = [f for f in result.findings if f.rule_id == "ARC101"] + assert any("conclusion" in f.message.lower() for f in section_findings) + + def test_high_severity_section_missing(self): + essay = _good_essay() + essay["content"] = essay["content"].replace("## Overview\n", "## REMOVED\n") + result = self.scorer.score(essay) + high_findings = [f for f in result.findings if f.rule_id == "ARC101" and f.severity == "HIGH"] + assert len(high_findings) >= 1 + + def test_no_citations_produces_high_finding(self): + essay = _good_essay() + essay["citations"] = [] + result = self.scorer.score(essay) + cite_findings = [f for f in result.findings if f.rule_id == "ARC201"] + assert len(cite_findings) == 1 + assert cite_findings[0].severity == "HIGH" + + def test_no_empirical_citations_produces_high_finding(self): + essay = _good_essay() + essay["citation_tiers"] = {"bib-001": "T1", "bib-007": "T2", "bib-042": "T3"} + result = self.scorer.score(essay) + empirical_findings = [f for f in result.findings if f.rule_id == "ARC301"] + assert len(empirical_findings) == 1 + assert empirical_findings[0].severity == "HIGH" + + def test_no_tier_metadata_produces_low_finding(self): + essay = _good_essay() + del essay["citation_tiers"] + result = self.scorer.score(essay) + tier_findings = [f for f in result.findings if f.rule_id == "ARC303"] + assert 
len(tier_findings) == 1 + assert tier_findings[0].severity == "LOW" + + def test_unknown_lens_produces_low_finding(self): + essay = _good_essay(lens="unknown_philosopher_xyz") + result = self.scorer.score(essay) + lens_findings = [f for f in result.findings if f.rule_id == "ARC501"] + assert len(lens_findings) == 1 + + def test_lens_not_in_content_produces_finding(self): + essay = _good_essay(lens="yarvin") + # content mentions bki, not yarvin + result = self.scorer.score(essay) + topic_findings = [f for f in result.findings if f.rule_id == "ARC502"] + assert len(topic_findings) == 1 + + def test_short_essay_produces_finding(self): + essay = _good_essay() + essay["word_count"] = 50 + result = self.scorer.score(essay) + wc_findings = [f for f in result.findings if f.rule_id == "ARC102"] + assert len(wc_findings) == 1 + + def test_metadata_populated(self): + result = self.scorer.score(_good_essay()) + assert result.metadata["lens"] == "bki" + assert "word_count" in result.metadata + assert "citation_count" in result.metadata + assert result.metadata["citation_count"] == 3 + + def test_default_registry_includes_arcana_essay(self): + import arbiter.scorers # noqa: F401 + from arbiter.artifact_scorer import DEFAULT_REGISTRY + assert "arcana_essay" in DEFAULT_REGISTRY + + def test_deterministic(self): + r1 = self.scorer.score(_good_essay()) + r2 = self.scorer.score(_good_essay()) + assert r1.overall == r2.overall + assert r1.grade == r2.grade + + def test_all_theoretical_citations_produces_medium_finding(self): + essay = _good_essay() + essay["citation_tiers"] = { + "bib-001": "T1", "bib-007": "T2", "bib-042": "T1", + "bib-010": "T3", "bib-011": "T4", + } + essay["citations"] = list(essay["citation_tiers"].keys()) + result = self.scorer.score(essay) + diversity_findings = [f for f in result.findings if f.rule_id == "ARC402"] + assert len(diversity_findings) == 1 + assert diversity_findings[0].severity == "MEDIUM" diff --git a/tests/test_report.py b/tests/test_report.py index 21d397b..c00e429 100644 --- a/tests/test_report.py +++ b/tests/test_report.py @@ -1,6 +1,5 @@ """Tests for the report generator.""" -from pathlib import Path from arbiter.analyzers.base import Finding from arbiter.report import AuditReport, render_html, _grade_color, _escape
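End-to-end usage sketch (illustrative: the artifact payloads below are invented examples in the input formats documented in each scorer's module docstring; the registry, the "bibliography"/"governance"/"arcana_essay" type keys, and the score()/summary() calls are the ones added in this diff):

    # Illustrative payloads only; field values are invented examples.
    import arbiter.scorers  # noqa: F401  -- registers the built-in scorers

    from arbiter.artifact_scorer import DEFAULT_REGISTRY

    bibliography_batch = {
        "entries": [
            {
                "id": "bib-001",
                "title": "Governing AI Systems",
                "year": 2024,
                "tier": "T7",
                "doi": "10.1234/example",
                "tags": ["governance"],
                "links": ["bib-002"],
            },
        ],
    }

    receipt = {
        "receipt_id": "rcpt-demo-001",
        "schema_version": "1.0",
        "agent": "claude-code",
        "action_type": "DECISION",
        "timestamp": "2026-04-09T14:00:00Z",
        "input_summary": "Should we proceed with the merge?",
        "output_summary": "Approved merge with conditions.",
        "confidence": 0.9,
        "evidence": ["clp-abc001", "clp-abc002"],
        "chain": [
            {"agent": "human", "role": "approver", "timestamp": "2026-04-09T14:05:00Z"},
        ],
        "tags": ["governance", "hitl"],
    }

    for artifact_type, artifact in (
        ("bibliography", bibliography_batch),
        ("governance", receipt),
    ):
        # Each scorer returns an ArtifactScore with a grade, per-dimension scores,
        # and individual findings.
        result = DEFAULT_REGISTRY.score(artifact_type, artifact)
        print(result.summary())
        for finding in result.findings:
            print(f"  [{finding.severity}] {finding.rule_id} {finding.field}: {finding.message}")

Scoring an unregistered type raises KeyError, so callers can gate on registry membership (artifact_type in DEFAULT_REGISTRY) before scoring.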