diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 51870de..b62164d 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -148,6 +148,7 @@ optional Firecrawl fallback, Playwright for x.com). - **Reads:** `items.json` - **Writes:** `items.json` — each item's `content` + `content_source[]` - **Cached** — already-fetched items are skipped (use `--force` to refetch). +- **Transient retries** — items whose only previous failures were `timeout` / `dns_error` are re-fetched on the next run without `--force`. Terminal failures (`not_found`, `paywall`, `forbidden`, `js_required`, `empty_content`) stay skipped until `--force`. - **Failures recorded as evidence** — `http_status` + `failure_reason`, never silently dropped. - **Snapshots `data/` before `--force`** — recovery path if a forced refetch makes things worse. @@ -394,6 +395,7 @@ These are the rules the rest of the architecture rests on. Breaking any of them 6. **`fetch` is cached per item id.** Re-runs do not re-hit the network without `--force` (or, in the future, transient-retry — issue #19). 7. **Operation names, not query ids.** The extractor anchors to X GraphQL operation names because X rotates the ids. Anything that hardcodes an id will break. 8. **Destructive ops are reversible.** Every command that overwrites a `data/` artifact (`vocab --regenerate`, `topics --resynth`, `fetch --force`) snapshots `data/` first to `data/snapshots/-pre-/`. `xbrain snapshot restore ` is the recovery path. A snapshot failure aborts the destructive op. +9. **Fetch records are tagged unions.** A `ContentSource` on `items.json` is either a `Success` (with required `text`) or a `Failure` (with required `failure_reason`). Mixed shapes are not representable — pydantic rejects them at construction, and mypy rejects them statically (via the `pydantic.mypy` plugin). Legacy records with `ok: bool` (pre-#20) are normalised on read by a `BeforeValidator` on the union, so existing `data/items.json` files keep working without a manual migration. The static contract is pinned by `tests/type_probes/illegal_states.py`. --- @@ -432,6 +434,7 @@ xbrain/ │ ├── notes_io.py ← per-note read/write + user-tail preservation │ ├── store.py ← items.json / topics.json / state.json I/O │ ├── snapshot.py ← data/ snapshot lifecycle (create/list/restore/prune) +│ ├── diff.py ← structured diff between two snapshot data dirs │ ├── worksheet.py ← enrich worksheet export/import │ ├── validate.py ← guardrails enforcement │ ├── llm_json.py ← extract JSON from LLM responses diff --git a/README.md b/README.md index 86e62c1..2f1aedc 100644 --- a/README.md +++ b/README.md @@ -195,6 +195,11 @@ Code Is Cheap Now. Software Isn't. https://t.co/J9m5RzQNbW Everything above the `xbrain:generated` marker is regenerated on every run; anything *you* write below it is preserved. +> Set `[output] topic_style = "hashtag"` in `config.toml` to render the +> in-body `**Topics:**` line as `#ai-coding #software-engineering` instead of +> wikilinks — useful if you navigate primarily via Obsidian's tag pane. The +> frontmatter `tags:` are native Obsidian tags in either mode. + ### Layer 2 — Topics The layer that makes XBrain more than a tidy backup. **A topic page is not a @@ -377,6 +382,7 @@ resynth_threshold = 25 # re-synthesise an overview after N ne [output] language = "English" # English | Spanish +topic_style = "wikilink" # wikilink | hashtag (in-body Topics: line) ``` | Section | Key | Default | Purpose | @@ -390,6 +396,7 @@ language = "English" # English | Spanish | `[vocab]` | `target_count` | `30` | Number of topics the `vocab` stage induces. | | `[topics]` | `resynth_threshold` | `25` | Post growth that marks a topic overview stale. | | `[output]` | `language` | `English` | Output language for LLM summaries/overviews AND wiki section headers. `English` or `Spanish`. | +| `[output]` | `topic_style` | `wikilink` | How the in-body `**Topics:**` line is rendered: `wikilink` (`[[slug]] · [[slug]]`) or `hashtag` (`#slug #slug`). Frontmatter `tags:` are unaffected. | Switching `[output].language` after the corpus is already enriched is supported — but does not retroactively translate existing summaries. To convert the @@ -508,7 +515,7 @@ uv run xbrain [options] |---------|-------------| | `extract` | Extract bookmarks and/or own tweets from X. `--source bookmarks\|tweets\|all`. | | `import-archive ` | Backfill the full own-tweet history from the official X data archive. | -| `fetch` | Download linked article content, expand threads, fetch linked X content. `--force` re-fetches everything. | +| `fetch` | Download linked article content, expand threads, fetch linked X content. By default, items whose only previous failures were transient (`timeout`, `dns_error`) are re-fetched automatically; terminal failures (`not_found`, `paywall`, `forbidden`, `js_required`, `empty_content`) stay skipped until `--force`. `--force` re-fetches every external_article source regardless of state. | | `vocab` | Induce the topic taxonomy. `--executor`, `--apply `, `--regenerate`. | | `enrich` | Enrich items with a summary + topics. `--executor`, `--apply `. | | `topics` | Synthesise topic pages. `--executor`, `--apply `, `--resynth`. | @@ -516,6 +523,7 @@ uv run xbrain [options] | `sync` | `extract` + `fetch` + `generate`, in order. | | `status` | Counts and last-run timestamps. | | `snapshot` | Manage `data/` snapshots: `create`, `list`, `show`, `restore`, `prune`. See [Snapshots & safety](#snapshots--safety). | +| `diff` | Compare two snapshots (or one snapshot vs. the live `data/`). Surfaces reassigned items, topic growth, overview drift, vocab changes. `--format text\|json`. | | `login` | Open a browser to log in to X (see [Authentication](#authentication) — prefer the cookie import). | Every stage accepts `--since` / `--until` (ISO dates) to narrow the date window. @@ -544,6 +552,19 @@ The Obsidian vault is **not** snapshotted — it is fully derived from `data/` via `xbrain generate`. `restore` rolls back `data/`; you run `xbrain generate` to rebuild the wiki from it. +After a destructive run, `xbrain diff ` shows exactly what +moved — items whose `primary_topic` was reassigned, topic memberships that +grew or shrank, overview text that drifted, and vocab slugs added or +removed. The B side defaults to the live `data/`, so the common case is one +short command. Add `--format json` to pipe the report into `jq` or a CI +gate. + +```bash +xbrain diff 2026-05-22T18-30-15Z-pre-vocab-regenerate # vs. live data/ +xbrain diff # two named snapshots +xbrain diff --format json | jq '.summary.reassigned_pct' +``` + --- ## Execution modes diff --git a/config.toml.example b/config.toml.example index d4f22e7..adcf202 100644 --- a/config.toml.example +++ b/config.toml.example @@ -34,3 +34,8 @@ resynth_threshold = 25 # "English" - default # "Spanish" language = "English" +# How the in-body `**Topics:**` line is rendered on each item note. +# Frontmatter `tags:` are unaffected by this setting. Supported values: +# "wikilink" - **Topics:** [[ai-coding]] · [[software-engineering]] (default) +# "hashtag" - **Topics:** #ai-coding #software-engineering +topic_style = "wikilink" diff --git a/pyproject.toml b/pyproject.toml index 0b3f3ad..8f001e1 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,17 @@ target-version = "py312" python_version = "3.12" ignore_missing_imports = true files = ["src/xbrain"] +# Pydantic mypy plugin turns model __init__ into a typed signature so +# mypy can flag missing required fields (e.g. ContentSourceSuccess +# without `text`). The #20 refactor relies on this to make illegal +# states unrepresentable at the type level — verified by the +# tests/type_probes/illegal_states.py probe. +plugins = ["pydantic.mypy"] + +[tool.pydantic-mypy] +init_forbid_extra = true +init_typed = true +warn_required_dynamic_aliases = true [tool.interrogate] ignore-init-method = true diff --git a/src/xbrain/cli.py b/src/xbrain/cli.py index acd06dd..9e842c3 100644 --- a/src/xbrain/cli.py +++ b/src/xbrain/cli.py @@ -14,6 +14,7 @@ from xbrain import snapshot from xbrain.archive import parse_archive from xbrain.config import Config, load_config +from xbrain.diff import diff_snapshots, format_json, format_text from xbrain.enrich import apply_worksheet_judgments, enrich_with_executor, items_pending_enrichment from xbrain.executors.api import ApiExecutor from xbrain.extract.browser import login as run_login @@ -191,7 +192,7 @@ def _run_fetch(cfg: Config, since: datetime | None, until: datetime | None, forc def _run_generate(cfg: Config, since: datetime | None, until: datetime | None) -> None: store = load_store(cfg.items_path) - run_generate(store, cfg.output_dir, since, until, cfg.output_language) + run_generate(store, cfg.output_dir, since, until, cfg.output_language, cfg.topic_style) typer.echo(f"Markdown generado en {cfg.output_dir}") @@ -549,5 +550,56 @@ def snapshot_prune_cmd( typer.echo(f"Snapshots deleted: {deleted}") +def _resolve_data_dir(cfg: Config, name: str | None) -> Path: + """Resolve a snapshot name to its data dir, or `None` to the live `data/`. + + `xbrain diff` accepts a snapshot name (resolved via `snapshot_show`) OR + `None` to mean "the current live `data/`" — the most common B-side of the + comparison the user runs after a destructive op. + """ + if name is None: + return cfg.data_dir + snapshot_dir, _ = snapshot.snapshot_show(cfg.data_dir, name) + return snapshot_dir + + +@app.command() +@_handle_cli_errors +def diff( + snapshot_a: str = typer.Argument(..., help="Snapshot name on the A side."), + snapshot_b: str | None = typer.Argument( + None, + help="Snapshot name on the B side. Defaults to the live data/ directory.", + ), + output_format: str = typer.Option( + "text", + "--format", + help="Output format: 'text' (default) or 'json'.", + ), +) -> None: + """Compare two snapshots and surface drift. + + Reports reassigned items, topic-membership shifts, topic-overview drift + (TF cosine similarity) and vocab changes. The B side defaults to the live + `data/` directory so `xbrain diff ` answers "what did the + last destructive op move?" with no extra arguments. + """ + cfg = _config() + if output_format not in ("text", "json"): + raise ValueError(f"--format must be 'text' or 'json', got {output_format!r}") + a_dir = _resolve_data_dir(cfg, snapshot_a) + b_dir = _resolve_data_dir(cfg, snapshot_b) + report = diff_snapshots(a_dir, b_dir) + if output_format == "json": + typer.echo(format_json(report)) + else: + b_label = snapshot_b if snapshot_b is not None else "live data/" + typer.echo("Comparing:") + typer.echo(f" A: {snapshot_a}") + typer.echo(f" B: {b_label}") + typer.echo("") + typer.echo(format_text(report)) + + if __name__ == "__main__": app() diff --git a/src/xbrain/config.py b/src/xbrain/config.py index db9856a..93f1935 100644 --- a/src/xbrain/config.py +++ b/src/xbrain/config.py @@ -10,6 +10,11 @@ from xbrain.i18n import strings_for from xbrain.models import ExecutorName +# In-body `**Topics:**` line styles. `wikilink` (default) keeps the current +# navigation-first behaviour; `hashtag` emits Obsidian tags so the line pivots +# into the tag pane. Frontmatter `tags:` are unaffected by this toggle. +SUPPORTED_TOPIC_STYLES: tuple[str, ...] = ("wikilink", "hashtag") + @dataclass(frozen=True) class Config: @@ -23,6 +28,7 @@ class Config: vocab_target_count: int topics_resynth_threshold: int output_language: str # one of xbrain.i18n.SUPPORTED_LANGUAGES + topic_style: str # one of xbrain.config.SUPPORTED_TOPIC_STYLES @property def items_path(self) -> Path: @@ -64,10 +70,17 @@ def load_config(repo_root: Path) -> Config: resynth_threshold = int(topics.get("resynth_threshold", 25)) if resynth_threshold < 1: raise ValueError("config.toml: [topics].resynth_threshold must be >= 1") - output_language = settings.get("output", {}).get("language", "English") + output = settings.get("output", {}) + output_language = output.get("language", "English") # Validate via strings_for: it already raises ValueError listing supported # languages on an unknown value. Single source of truth for the check. strings_for(output_language) + topic_style = output.get("topic_style", "wikilink") + if topic_style not in SUPPORTED_TOPIC_STYLES: + raise ValueError( + f"config.toml: [output].topic_style must be one of " + f"{list(SUPPORTED_TOPIC_STYLES)}, got {topic_style!r}" + ) return Config( repo_root=repo_root, vault=vault, @@ -79,4 +92,5 @@ def load_config(repo_root: Path) -> Config: vocab_target_count=target_count, topics_resynth_threshold=resynth_threshold, output_language=output_language, + topic_style=topic_style, ) diff --git a/src/xbrain/diff.py b/src/xbrain/diff.py new file mode 100644 index 0000000..995cc5d --- /dev/null +++ b/src/xbrain/diff.py @@ -0,0 +1,539 @@ +"""Structured diff between two snapshot data directories. + +The `diff_snapshots` orchestrator answers one question: **what changed between +two states of `data/`?** It compares the source-of-truth artifacts (items.json +enrichment, vocab.yaml, topics.json) and produces a structured `DiffReport` +that the CLI renders as text or JSON. + +The module is **pure I/O**: no `typer.echo`, no `print`, no CLI side-effects. +Inputs are two `Path`s pointing at *data directories* (the ones that hold +`items.json` / `vocab.yaml` / `topics.json` directly — a snapshot dir or the +live `data/` dir; the module does not distinguish). The CLI is the only thing +that knows what a "snapshot name" is and resolves it via +`xbrain.snapshot.snapshot_show`. + +Overview drift uses a pure-Python TF cosine similarity (see `_tf_cosine`). +No new dependencies: scikit-learn or sentence-transformers would tax the +install for one CLI feature, and the offline-by-default invariant rules out +API-call embeddings. LLM-judged similarity is the explicit follow-up tied to +WS3 (#8) — out of scope for v1. +""" + +from __future__ import annotations + +import math +import re +from collections import Counter +from collections.abc import Iterable +from pathlib import Path +from typing import Literal + +from pydantic import BaseModel, Field + +from xbrain.models import Item, Topic, TopicPage +from xbrain.rubrics import load_vocab +from xbrain.store import load_store, load_topic_pages + +# --------------------------------------------------------------------- public models + +#: Three-state classification of how much a topic-page overview moved. +#: `not_comparable` covers the case where one side has no topic-page entry. +OverviewFlag = Literal["identical", "similar", "different", "not_comparable"] + +#: Topics with fewer than this many starting members do not get a growth/shrink +#: flag — a 2→3 jump is 50% growth but statistically meaningless on a small +#: topic. Hardcoded floor; promote to a parameter if user research demands. +_MIN_MEMBERS_FOR_GROWTH_FLAG = 5 + +#: Cosine similarity at or above this counts as "identical" (handles fp slop). +_IDENTICAL_SIMILARITY = 0.99 + +# Tokenizer: lowercase, ASCII letters/digits + Romance-language Latin +# accented characters (à-ÿ) so Spanish/French overviews tokenize correctly. +# Length filter happens in the helper — single-character tokens add noise. +_TOKEN_RE = re.compile(r"[a-zà-ÿ0-9]+") + + +class Transition(BaseModel): + """One `primary_topic` reassignment between snapshot A and B. + + `from_topic` / `to_topic` carry `None` when the item had no enrichment on + that side (e.g. unenriched in A, enriched in B → `from_topic=None`). The + counter aggregates identical (from, to) pairs across all items. + """ + + from_topic: str | None + to_topic: str | None + count: int + + +class ItemsDiff(BaseModel): + """Item-level reassignment summary across two snapshots.""" + + count_a: int + count_b: int + count_in_both: int + enriched_in_both: int + reassigned: int + reassigned_pct: float + top_transitions: list[Transition] = Field(default_factory=list) + + +class TopicChange(BaseModel): + """Per-topic membership shift and overview drift. + + A "member" is an item whose `primary_topic` equals this topic's slug. The + set ops (`added`, `removed`, `unchanged`) operate on the item ids — they + answer "which items entered or left this topic between A and B", not just + "did the count move". + """ + + slug: str + members_a: int + members_b: int + added: int + removed: int + unchanged: int + growth_pct: float | None + flagged_growth: bool + overview_a: str | None = None + overview_b: str | None = None + overview_similarity: float | None = None + overview_flag: OverviewFlag = "not_comparable" + + +class TopicsDiff(BaseModel): + """All per-topic changes, keyed by slug (union of vocab A and vocab B).""" + + per_slug: dict[str, TopicChange] + + +class VocabDiff(BaseModel): + """Slug-level set difference between two `vocab.yaml` files. + + `unchanged_count` carries the cardinality of slugs present in both vocabs + — only the count is consumed downstream (text renderer, JSON snapshot), + so the full list is not stored. + """ + + added: list[str] = Field(default_factory=list) + removed: list[str] = Field(default_factory=list) + unchanged_count: int = 0 + + +class DiffSummary(BaseModel): + """Flat top-level counts — used by both the text header and JSON consumers.""" + + items_a: int + items_b: int + items_in_both: int + enriched_in_both: int + reassigned: int + reassigned_pct: float + vocab_added: int + vocab_removed: int + topic_pages_a: int + topic_pages_b: int + + +class DiffReport(BaseModel): + """The full structured comparison between two data directories.""" + + summary: DiffSummary + items: ItemsDiff + topics: TopicsDiff + vocab: VocabDiff + + +# ----------------------------------------------------------------- public orchestrator + + +def diff_snapshots( + a_dir: Path, + b_dir: Path, + *, + overview_similarity_threshold: float = 0.7, + growth_flag_threshold: float = 0.10, + top_n_transitions: int = 10, +) -> DiffReport: + """Compute the structured diff between two snapshot data directories. + + `a_dir` and `b_dir` are *data directories* — the ones holding `items.json`, + `vocab.yaml`, `topics.json` directly. A snapshot directory and the live + `data/` directory are the same shape; the CLI passes whichever the user + asked for, and this function does not care which is which. + + The thresholds tune which transitions get flagged but do not affect the + raw counts. Defaults match PRD §5.2. + + Raises: + FileNotFoundError: if either directory does not exist on disk, or if + both directories are completely empty (no `items.json`, + `vocab.yaml` or `topics.json`). Without this guard a `diff` against + an accidentally-deleted `data/` would silently report + "everything removed" — surfacing as a clean error is safer. + ValueError: a context-adding wrap around the corrupt-file errors + raised by the underlying loaders, so the operator sees *which* + file is the problem rather than a bare pydantic / json traceback. + """ + for label, path in (("A", a_dir), ("B", b_dir)): + if not path.exists(): + raise FileNotFoundError(f"diff side {label}: directory not found: {path}") + + def _load_or_explain(path: Path, loader): + try: + return loader(path) + except (ValueError, OSError) as exc: # pydantic ValidationError is a ValueError + raise ValueError(f"failed to load {path}: {exc}") from exc + + items_a = _load_or_explain(a_dir / "items.json", load_store) + items_b = _load_or_explain(b_dir / "items.json", load_store) + vocab_a = _load_or_explain(a_dir / "vocab.yaml", load_vocab) + vocab_b = _load_or_explain(b_dir / "vocab.yaml", load_vocab) + pages_a = _load_or_explain(a_dir / "topics.json", load_topic_pages) + pages_b = _load_or_explain(b_dir / "topics.json", load_topic_pages) + + a_empty = not (items_a or vocab_a or pages_a) + b_empty = not (items_b or vocab_b or pages_b) + if a_empty and b_empty: + raise FileNotFoundError( + f"Both diff sides are empty (no items.json / vocab.yaml / topics.json " + f"present under {a_dir} or {b_dir}). Confirm the snapshot names and " + "that data/ is populated." + ) + + items_diff = _compute_items_diff(items_a, items_b, top_n=top_n_transitions) + vocab_diff = _compute_vocab_diff(vocab_a, vocab_b) + topics_diff = _compute_topics_diff( + items_a, + items_b, + pages_a, + pages_b, + vocab_a, + vocab_b, + growth_flag_threshold=growth_flag_threshold, + overview_similarity_threshold=overview_similarity_threshold, + ) + summary = DiffSummary( + items_a=items_diff.count_a, + items_b=items_diff.count_b, + items_in_both=items_diff.count_in_both, + enriched_in_both=items_diff.enriched_in_both, + reassigned=items_diff.reassigned, + reassigned_pct=items_diff.reassigned_pct, + vocab_added=len(vocab_diff.added), + vocab_removed=len(vocab_diff.removed), + topic_pages_a=len(pages_a), + topic_pages_b=len(pages_b), + ) + return DiffReport( + summary=summary, + items=items_diff, + topics=topics_diff, + vocab=vocab_diff, + ) + + +# --------------------------------------------------------------------- compute pieces + + +def _compute_items_diff( + items_a: dict[str, Item], + items_b: dict[str, Item], + *, + top_n: int, +) -> ItemsDiff: + """Reassignment count + top transitions, restricted to items in both. + + `reassigned` counts items present AND enriched on both sides whose + `primary_topic` differs. Items added in B or un-enriched on either side + do NOT count as reassignments — they show up in transitions and topic + membership instead. + """ + shared_ids = set(items_a) & set(items_b) + transitions: list[tuple[str | None, str | None]] = [] + reassigned = 0 + enriched_in_both = 0 + for item_id in shared_ids: + primary_a = _primary_topic(items_a[item_id]) + primary_b = _primary_topic(items_b[item_id]) + if primary_a is not None and primary_b is not None: + enriched_in_both += 1 + if primary_a != primary_b: + reassigned += 1 + transitions.append((primary_a, primary_b)) + elif primary_a != primary_b: + # One side unenriched. Surface as a transition (None on one side) + # but do NOT count as a reassignment — only judgment changes on + # items judged on BOTH sides count. + transitions.append((primary_a, primary_b)) + pct = (reassigned / enriched_in_both) if enriched_in_both else 0.0 + return ItemsDiff( + count_a=len(items_a), + count_b=len(items_b), + count_in_both=len(shared_ids), + enriched_in_both=enriched_in_both, + reassigned=reassigned, + reassigned_pct=pct, + top_transitions=_top_transitions(transitions, top_n=top_n), + ) + + +def _compute_topics_diff( + items_a: dict[str, Item], + items_b: dict[str, Item], + pages_a: dict[str, TopicPage], + pages_b: dict[str, TopicPage], + vocab_a: list[Topic], + vocab_b: list[Topic], + *, + growth_flag_threshold: float, + overview_similarity_threshold: float, +) -> TopicsDiff: + """Per-slug membership and overview drift, keyed by the union of slugs.""" + membership_a = _membership_by_topic(items_a) + membership_b = _membership_by_topic(items_b) + + # Union over: vocab slugs from both sides + any slug that appears as a + # primary_topic in either store (defensive — items can reference a slug + # that the vocab no longer carries). + all_slugs: set[str] = {t.slug for t in vocab_a} | {t.slug for t in vocab_b} + all_slugs |= set(membership_a) | set(membership_b) + + per_slug: dict[str, TopicChange] = {} + for slug in sorted(all_slugs): + member_ids_a = membership_a.get(slug, set()) + member_ids_b = membership_b.get(slug, set()) + added = len(member_ids_b - member_ids_a) + removed = len(member_ids_a - member_ids_b) + unchanged = len(member_ids_a & member_ids_b) + growth_pct = _growth_pct(len(member_ids_a), len(member_ids_b)) + flagged_growth = ( + growth_pct is not None + and len(member_ids_a) >= _MIN_MEMBERS_FOR_GROWTH_FLAG + and abs(growth_pct) >= growth_flag_threshold + ) + overview_a = pages_a[slug].overview if slug in pages_a else None + overview_b = pages_b[slug].overview if slug in pages_b else None + similarity, flag = _classify_overview( + overview_a, overview_b, threshold=overview_similarity_threshold + ) + per_slug[slug] = TopicChange( + slug=slug, + members_a=len(member_ids_a), + members_b=len(member_ids_b), + added=added, + removed=removed, + unchanged=unchanged, + growth_pct=growth_pct, + flagged_growth=flagged_growth, + overview_a=overview_a, + overview_b=overview_b, + overview_similarity=similarity, + overview_flag=flag, + ) + return TopicsDiff(per_slug=per_slug) + + +def _compute_vocab_diff(vocab_a: list[Topic], vocab_b: list[Topic]) -> VocabDiff: + """Slug set-difference between two vocab.yaml files (sorted output).""" + slugs_a = {t.slug for t in vocab_a} + slugs_b = {t.slug for t in vocab_b} + return VocabDiff( + added=sorted(slugs_b - slugs_a), + removed=sorted(slugs_a - slugs_b), + unchanged_count=len(slugs_a & slugs_b), + ) + + +# --------------------------------------------------------------------------- TF cosine + + +def _tf_cosine(text_a: str, text_b: str) -> float: + """Return TF cosine similarity in [0.0, 1.0] for two short prose strings. + + Tokenization: lowercase, runs of ASCII or Romance-Latin letters/digits of + length >= 2. With only two documents IDF degenerates, so this is plain TF + cosine (not TF-IDF) — relative ordering across topic pairs is what matters + for the `identical / similar / different` bucket assignment, not the + absolute score. + + Edge cases: + - Either text empty or contains no qualifying tokens → 0.0. + - Identical token bags → 1.0. + - Disjoint vocabularies → 0.0. + """ + tokens_a = _tokenize(text_a) + tokens_b = _tokenize(text_b) + if not tokens_a or not tokens_b: + return 0.0 + counter_a = Counter(tokens_a) + counter_b = Counter(tokens_b) + shared = set(counter_a) & set(counter_b) + if not shared: + return 0.0 + numerator = sum(counter_a[t] * counter_b[t] for t in shared) + norm_a = math.sqrt(sum(c * c for c in counter_a.values())) + norm_b = math.sqrt(sum(c * c for c in counter_b.values())) + return numerator / (norm_a * norm_b) + + +def _tokenize(text: str) -> list[str]: + """Tokens of length >= 2, lowercased, ASCII + Latin-1 letters/digits.""" + return [t for t in _TOKEN_RE.findall(text.lower()) if len(t) >= 2] + + +# ----------------------------------------------------------------------- renderers + + +def format_text(report: DiffReport) -> str: + """Render the diff as a human-readable terminal report.""" + lines: list[str] = [] + lines.append("ITEMS") + lines.append(f" count A: {report.summary.items_a}") + lines.append(f" count B: {report.summary.items_b}") + lines.append(f" in both snapshots: {report.summary.items_in_both}") + lines.append(f" enriched on both sides: {report.summary.enriched_in_both}") + lines.append( + f" primary_topic reassigned: {report.summary.reassigned} " + f"({report.summary.reassigned_pct * 100:.1f}%)" + ) + if report.items.top_transitions: + lines.append("") + lines.append(" Top transitions:") + for transition in report.items.top_transitions: + from_label = transition.from_topic if transition.from_topic is not None else "(none)" + to_label = transition.to_topic if transition.to_topic is not None else "(none)" + lines.append(f" {from_label} -> {to_label}: {transition.count} items") + lines.append("") + lines.append("TOPICS") + lines.extend(_format_topics_block(report.topics)) + lines.append("") + lines.append("OVERVIEWS") + lines.extend(_format_overviews_block(report.topics)) + lines.append("") + lines.append("VOCAB") + lines.append( + f" added ({len(report.vocab.added)}): " + f"{', '.join(report.vocab.added) if report.vocab.added else '(none)'}" + ) + lines.append( + f" removed ({len(report.vocab.removed)}): " + f"{', '.join(report.vocab.removed) if report.vocab.removed else '(none)'}" + ) + lines.append(f" unchanged: {report.vocab.unchanged_count} slugs") + return "\n".join(lines) + + +def format_json(report: DiffReport) -> str: + """Render the diff as pretty JSON for machine consumption.""" + return report.model_dump_json(indent=2) + + +def _format_topics_block(topics: TopicsDiff) -> list[str]: + """Render the per-topic membership rows, sorted by slug.""" + if not topics.per_slug: + return [" (no topics in either snapshot)"] + rows: list[str] = [] + for slug in sorted(topics.per_slug): + change = topics.per_slug[slug] + flag = " [FLAG]" if change.flagged_growth else "" + growth = "n/a" if change.growth_pct is None else f"{change.growth_pct * 100:+.1f}%" + rows.append( + f" {slug}{flag} A={change.members_a} B={change.members_b} " + f"added={change.added} removed={change.removed} " + f"unchanged={change.unchanged} growth={growth}" + ) + return rows + + +def _format_overviews_block(topics: TopicsDiff) -> list[str]: + """Render the overview-drift rows, with a flagged-shifts sub-block.""" + if not topics.per_slug: + return [" (no overviews to compare)"] + rows: list[str] = [] + flagged: list[tuple[str, float]] = [] + for slug in sorted(topics.per_slug): + change = topics.per_slug[slug] + if change.overview_flag == "not_comparable": + rows.append(f" {slug}: not comparable") + continue + sim = change.overview_similarity if change.overview_similarity is not None else 0.0 + rows.append(f" {slug}: {change.overview_flag} (sim={sim:.2f})") + if change.overview_flag == "different": + flagged.append((slug, sim)) + if flagged: + rows.append("") + rows.append(" Sharp shifts (low similarity):") + for slug, sim in flagged: + rows.append(f" {slug} sim={sim:.2f}") + return rows + + +# ------------------------------------------------------------------------- internals + + +def _primary_topic(item: Item) -> str | None: + """Return the item's primary_topic, or None when unenriched.""" + if item.enriched is None: + return None + return item.enriched.primary_topic + + +def _membership_by_topic(items: dict[str, Item]) -> dict[str, set[str]]: + """Group item ids by their primary_topic (only enriched items contribute).""" + membership: dict[str, set[str]] = {} + for item_id, item in items.items(): + primary = _primary_topic(item) + if primary is None: + continue + membership.setdefault(primary, set()).add(item_id) + return membership + + +def _growth_pct(members_a: int, members_b: int) -> float | None: + """Relative growth from A to B; None when A is empty (undefined growth).""" + if members_a == 0: + return None + return (members_b - members_a) / members_a + + +def _top_transitions( + transitions: Iterable[tuple[str | None, str | None]], + *, + top_n: int, +) -> list[Transition]: + """Aggregate (from, to) pairs and return the top N, with a stable order. + + Sort key: count desc, then `from_topic` asc, then `to_topic` asc. `None` + is coalesced to the empty string for ordering purposes only — the surface + representation keeps `None` intact. + """ + counter: Counter[tuple[str | None, str | None]] = Counter(transitions) + if not counter: + return [] + pairs = sorted( + counter.items(), + key=lambda kv: (-kv[1], kv[0][0] or "", kv[0][1] or ""), + ) + return [ + Transition(from_topic=from_t, to_topic=to_t, count=count) + for (from_t, to_t), count in pairs[:top_n] + ] + + +def _classify_overview( + text_a: str | None, + text_b: str | None, + *, + threshold: float, +) -> tuple[float | None, OverviewFlag]: + """Compute similarity and bucket it into the OverviewFlag enum.""" + if text_a is None or text_b is None: + return None, "not_comparable" + similarity = _tf_cosine(text_a, text_b) + if similarity >= _IDENTICAL_SIMILARITY: + return similarity, "identical" + if similarity >= threshold: + return similarity, "similar" + return similarity, "different" diff --git a/src/xbrain/executors/api.py b/src/xbrain/executors/api.py index 8c56c01..c6175e8 100644 --- a/src/xbrain/executors/api.py +++ b/src/xbrain/executors/api.py @@ -12,7 +12,7 @@ from xbrain.executors.base import EnrichmentJudgment from xbrain.llm_json import json_from_response -from xbrain.models import Item, Topic +from xbrain.models import ContentSourceSuccess, Item, Topic from xbrain.rubrics import ARTICLE_CHAR_LIMIT, load_rubric _MAX_TOKENS = 600 @@ -57,8 +57,9 @@ def _user_prompt(item: Item, vocab: list[Topic]) -> str: ] parts += [f"- {ln.url} (domain: {ln.domain})" for ln in item.links] if item.content and item.content.sources: + # Narrow to the success variant — only those carry `title`/`text`. for src in item.content.sources: - if src.ok and src.text: + if isinstance(src, ContentSourceSuccess) and src.text: parts += [ "", f"Linked article ({src.title or src.url}):", diff --git a/src/xbrain/extract/threads.py b/src/xbrain/extract/threads.py index 6c72b2f..38dd68a 100644 --- a/src/xbrain/extract/threads.py +++ b/src/xbrain/extract/threads.py @@ -9,7 +9,13 @@ from xbrain.extract.browser import is_logged_out, x_context from xbrain.extract.graphql import parse_tweets -from xbrain.models import Content, ContentSource, Item +from xbrain.models import ( + Content, + ContentSource, + ContentSourceFailure, + ContentSourceSuccess, + Item, +) _SETTLE_MS = 4000 @@ -42,16 +48,17 @@ def expand_threads(store: dict[str, Item], storage_state_path: Path, force: bool with x_context(storage_state_path) as context: for item in pending: text = _fetch_thread_text(context, item) - _attach_thread( - item, - ContentSource( + source: ContentSource + if text: + source = ContentSourceSuccess(kind="thread", url=item.url, text=text) + else: + source = ContentSourceFailure( kind="thread", url=item.url, - text=text or None, - ok=bool(text), - error=None if text else "No se pudo recuperar el hilo.", - ), - ) + failure_reason="empty_content", + error="No se pudo recuperar el hilo.", + ) + _attach_thread(item, source) return len(pending) diff --git a/src/xbrain/fetch.py b/src/xbrain/fetch.py index 232f8f7..0fb0723 100644 --- a/src/xbrain/fetch.py +++ b/src/xbrain/fetch.py @@ -4,6 +4,11 @@ records *structured evidence* (HTTP status + a categorised reason) so a broken link is demonstrable, not assumed (design §4). x.com links are skipped here — they are handled by `xbrain.fetch_x`. + +`FetchResult` is the in-memory return type of the extractors and is a tagged +union (`FetchSuccess | FetchFailure`) so callers cannot accidentally read a +success-only field off a failure record (or vice versa). The persisted +`ContentSource` is itself a tagged union — see `xbrain.models`. """ from __future__ import annotations @@ -13,14 +18,21 @@ import socket import urllib.error import urllib.request -from dataclasses import dataclass from datetime import datetime, timezone -from typing import Callable +from typing import Callable, Union from urllib.parse import urlparse import trafilatura +from pydantic import BaseModel -from xbrain.models import Content, ContentSource, FailureReason, Item +from xbrain.models import ( + Content, + ContentSource, + ContentSourceFailure, + ContentSourceSuccess, + FailureReason, + Item, +) _X_HOSTS = {"x.com", "www.x.com", "twitter.com", "www.twitter.com", "mobile.twitter.com"} _UA = "Mozilla/5.0 (compatible; XBrain/1.0)" @@ -29,18 +41,40 @@ _FIRECRAWL_TIMEOUT = 60 -@dataclass -class FetchResult: - """The structured outcome of one content-extraction attempt.""" +class FetchSuccess(BaseModel): + """The successful outcome of one content-extraction attempt. + + `text` is required: a success without text is not a success. The type + system enforces this — callers do not need to defensively check for + `result.text is not None`. + """ title: str | None = None - text: str | None = None + text: str http_status: int | None = None + attempts: int = 1 + + +class FetchFailure(BaseModel): + """The failed outcome of one content-extraction attempt. + + `failure_reason` may be `None` when the failure has not yet been + categorised (e.g. an uncaught network error). Callers that persist the + result fall back to ``"unknown_error"`` — a transient bucket — to satisfy + the required field on `ContentSourceFailure` while preserving the + "uncategorised = retry-worthy" invariant from #19. + """ + failure_reason: FailureReason | None = None error: str | None = None + http_status: int | None = None attempts: int = 1 +# The in-memory FetchResult — internal to this module. Never persisted (the +# wire format is `ContentSource`, which has its own discriminator). Callers +# narrow via `isinstance(result, FetchSuccess)` / `isinstance(result, FetchFailure)`. +FetchResult = Union[FetchSuccess, FetchFailure] ArticleExtractor = Callable[[str], FetchResult] @@ -107,10 +141,10 @@ def trafilatura_extract( downloaded = fetch(url) if downloaded is None: status, reason, error = prober(url) - return FetchResult(http_status=status, failure_reason=reason, error=error, attempts=1) + return FetchFailure(http_status=status, failure_reason=reason, error=error, attempts=1) text = extract(downloaded) if not text: - return FetchResult( + return FetchFailure( http_status=200, failure_reason="empty_content", error="La página se descargó pero no tiene un artículo extraíble.", @@ -118,7 +152,7 @@ def trafilatura_extract( ) metadata = trafilatura.extract_metadata(downloaded) title = metadata.title if metadata else None - return FetchResult(title=title, text=text, http_status=200, attempts=1) + return FetchSuccess(title=title, text=text, http_status=200, attempts=1) def _firecrawl_extract( @@ -143,9 +177,9 @@ def _firecrawl_extract( with opener(req, timeout=_FIRECRAWL_TIMEOUT) as resp: payload = json.loads(resp.read()) except (urllib.error.URLError, TimeoutError, OSError, ValueError) as exc: - return FetchResult(error=f"Firecrawl falló: {exc}", attempts=1) + return FetchFailure(error=f"Firecrawl falló: {exc}", attempts=1) if payload.get("success") is False or payload.get("error"): - return FetchResult( + return FetchFailure( error=f"Firecrawl falló: {payload.get('error') or 'respuesta de error'}", attempts=1, ) @@ -153,9 +187,11 @@ def _firecrawl_extract( text = data.get("markdown") if text: title = (data.get("metadata") or {}).get("title") - return FetchResult(title=title, text=text, http_status=200, attempts=1) - return FetchResult( - failure_reason="empty_content", error="Firecrawl no devolvió contenido.", attempts=1 + return FetchSuccess(title=title, text=text, http_status=200, attempts=1) + return FetchFailure( + failure_reason="empty_content", + error="Firecrawl no devolvió contenido.", + attempts=1, ) @@ -165,22 +201,64 @@ def extract_article( primary: Callable = trafilatura_extract, firecrawl: Callable = _firecrawl_extract, ) -> FetchResult: - """Default extractor: trafilatura, then Firecrawl for JS-rendered pages.""" + """Default extractor: trafilatura, then Firecrawl for JS-rendered pages. + + Switches on the `FetchResult` variant: a `FetchSuccess` is returned + as-is. A `FetchFailure` whose `failure_reason` is in + ``{"js_required", "empty_content"}`` triggers the Firecrawl fallback; + anything else (404, dns, ...) is terminal and Firecrawl is not called. + """ result = primary(url) - if result.text: + if isinstance(result, FetchSuccess): return result + # mypy now narrows `result` to FetchFailure for the rest of this function. if result.failure_reason not in ("js_required", "empty_content"): - return result # a hard failure (404, dns, ...) — Firecrawl will not help + return result # hard failure (404, dns, ...) — Firecrawl will not help fallback = firecrawl(url) if fallback is None: return result # Firecrawl not configured — keep the original evidence - if fallback.text: - fallback.attempts = result.attempts + 1 - return fallback - result.attempts = result.attempts + 1 # both attempts exhausted; keep first evidence - if fallback.error: - result.error = f"{result.error or 'sin contenido'} | Firecrawl: {fallback.error}" - return result + if isinstance(fallback, FetchSuccess): + return fallback.model_copy(update={"attempts": result.attempts + 1}) + # Both attempts exhausted — merge evidence and bump the attempt counter. + merged_error = ( + f"{result.error or 'sin contenido'} | Firecrawl: {fallback.error}" + if fallback.error + else result.error + ) + return result.model_copy(update={"attempts": result.attempts + 1, "error": merged_error}) + + +def _content_source_from(url: str, result: FetchResult) -> ContentSource: + """Build the persisted `ContentSource` variant matching the fetch outcome. + + Maps `FetchSuccess` → `ContentSourceSuccess` and `FetchFailure` → + `ContentSourceFailure`. A failure without a categorised reason falls back + to ``"empty_content"`` — the catch-all bucket — because + `ContentSourceFailure.failure_reason` is a required field (the type + system says a failure without a reason is not demonstrable evidence). + The free-form `error` string still carries the original explanation. + """ + if isinstance(result, FetchSuccess): + return ContentSourceSuccess( + kind="external_article", + url=url, + title=result.title, + text=result.text, + http_status=result.http_status, + attempts=result.attempts, + ) + return ContentSourceFailure( + kind="external_article", + url=url, + error=result.error, + http_status=result.http_status, + # `unknown_error` (a transient bucket) — NOT `empty_content` (terminal). + # An uncategorised failure must stay self-healing on the next + # `fetch_pending` run, mirroring the #19 invariant that a missing + # `failure_reason` was retry-worthy by default. + failure_reason=result.failure_reason or "unknown_error", + attempts=result.attempts, + ) def fetch_item(item: Item, extractor: ArticleExtractor = extract_article) -> Content: @@ -198,43 +276,71 @@ def fetch_item(item: Item, extractor: ArticleExtractor = extract_article) -> Con result = extractor(url) except Exception as exc: # noqa: BLE001 - one bad URL must not abort the batch new_sources.append( - ContentSource( + ContentSourceFailure( kind="external_article", url=url, - ok=False, + # Transient bucket: an uncaught extractor exception is + # almost always something that may succeed on the next + # run (network blip, intermittent SSL, …). #19 relied on + # this being retry-worthy; preserve that. + failure_reason="unknown_error", error=f"Error al descargar el artículo: {exc}", attempts=1, ) ) continue - if result.text: - new_sources.append( - ContentSource( - kind="external_article", - url=url, - title=result.title, - text=result.text, - ok=True, - http_status=result.http_status, - attempts=result.attempts, - ) - ) - else: - new_sources.append( - ContentSource( - kind="external_article", - url=url, - ok=False, - error=result.error, - http_status=result.http_status, - failure_reason=result.failure_reason, - attempts=result.attempts, - ) - ) + new_sources.append(_content_source_from(url, result)) kept = [s for s in item.content.sources if s.kind != "external_article"] if item.content else [] return Content(fetched_at=datetime.now(timezone.utc), sources=kept + new_sources) +# Failure reasons that justify an automatic retry on the next run. Everything +# else (`not_found`, `forbidden`, `paywall`, `js_required`, `empty_content`) +# is treated as terminal — only `--force` re-fetches those. +# +# `unknown_error` is the uncategorised-failure bucket — an extractor exception +# or any failure path that did not pin a specific reason. We retry by default: +# the alternative is silently classifying every uncaught failure as terminal, +# which would break the #19 invariant ("a failure without a categorised reason +# is anomalous; re-fetching gives it a chance to land on a known result"). +_TRANSIENT_FAILURES: frozenset[FailureReason] = frozenset({"timeout", "dns_error", "unknown_error"}) + + +def _should_refetch(content: Content | None, force: bool) -> bool: + """Return True if `fetch_pending` should (re)fetch this item. + + - `content is None` (never fetched) → True. + - `force=True` → True regardless of recorded state. + - Otherwise, True only if every `external_article` source on `content` + is a `ContentSourceFailure` whose `failure_reason` is in + `_TRANSIENT_FAILURES`. A single successful source (any + `ContentSourceSuccess`) or a categorised terminal failure (e.g. + `not_found`, `paywall`) skips. No `external_article` sources at all + → skip (there is nothing here for `fetch_pending`; the x.com sources + are handled by `fetch_x`). + + The pre-#20 helper read `src.ok` / `src.failure_reason` as Optionals; + after the tagged-union refactor the variant `isinstance` check is the + single switch and `failure_reason` is required on the failure variant, + so no `is None` special-case is needed. Pre-#20 records that lacked a + categorised `failure_reason` are migrated to `timeout` by + `_normalise_legacy_content_source` (see `xbrain.models`) — they + therefore get one automatic retry under #19, matching the prior + behaviour without an extra branch here. + """ + if content is None: + return True + if force: + return True + external = [s for s in content.sources if s.kind == "external_article"] + if not external: + return False + return all( + isinstance(src, ContentSourceFailure) and src.failure_reason in _TRANSIENT_FAILURES + for src in external + ) + + def fetch_pending( store: dict[str, Item], since: datetime | None = None, @@ -242,13 +348,18 @@ def fetch_pending( force: bool = False, extractor: ArticleExtractor = extract_article, ) -> int: - """Fetch external content for items that have non-x links and no content yet.""" + """Fetch external content for items that have non-x links and no content yet. + + A previous fetch whose only failures were transient (timeout, dns_error) + is automatically retried — `--force` is only needed to retry terminal + failures (404, paywall, …) or to re-hit already-successful items. + """ fetched = 0 for item in store.values(): # all links are x.com — handled by fetch_x if all(is_x_url(link.url) for link in item.links): continue - if item.content is not None and not force: + if not _should_refetch(item.content, force): continue if since and item.created_at < since: continue diff --git a/src/xbrain/fetch_x.py b/src/xbrain/fetch_x.py index c424024..bfb0d99 100644 --- a/src/xbrain/fetch_x.py +++ b/src/xbrain/fetch_x.py @@ -20,7 +20,13 @@ from xbrain.extract.browser import is_logged_out, x_context from xbrain.extract.graphql import parse_tweets from xbrain.fetch import is_x_url -from xbrain.models import Content, ContentSource, Item +from xbrain.models import ( + Content, + ContentSource, + ContentSourceFailure, + ContentSourceSuccess, + Item, +) _SETTLE_MS = 4000 _STATUS_RE = re.compile(r"/[^/]+/status/(\d+)") @@ -86,10 +92,9 @@ def on_response(response: Response) -> None: page.goto(url, wait_until="domcontentloaded") page.wait_for_timeout(_SETTLE_MS) except Exception: # noqa: BLE001 - navigation failure -> empty result - return ContentSource( + return ContentSourceFailure( kind="x_article", url=url, - ok=False, failure_reason="timeout", error="No se pudo cargar el tweet.", attempts=1, @@ -100,11 +105,10 @@ def on_response(response: Response) -> None: finally: page.close() if text: - return ContentSource(kind="x_article", url=url, text=text, ok=True, attempts=1) - return ContentSource( + return ContentSourceSuccess(kind="x_article", url=url, text=text, attempts=1) + return ContentSourceFailure( kind="x_article", url=url, - ok=False, failure_reason="empty_content", error="No se pudo recuperar el contenido del tweet.", attempts=1, @@ -119,10 +123,9 @@ def _fetch_rendered(context: BrowserContext, url: str) -> ContentSource: page.goto(url, wait_until="domcontentloaded") page.wait_for_timeout(_SETTLE_MS) except Exception: # noqa: BLE001 - navigation failure -> empty result - return ContentSource( + return ContentSourceFailure( kind="x_article", url=url, - ok=False, failure_reason="timeout", error="No se pudo cargar el artículo de X.", attempts=1, @@ -134,13 +137,12 @@ def _fetch_rendered(context: BrowserContext, url: str) -> ContentSource: page.close() text = trafilatura.extract(html) if text: - return ContentSource( - kind="x_article", url=url, text=text, ok=True, http_status=200, attempts=1 + return ContentSourceSuccess( + kind="x_article", url=url, text=text, http_status=200, attempts=1 ) - return ContentSource( + return ContentSourceFailure( kind="x_article", url=url, - ok=False, failure_reason="empty_content", error="No se pudo extraer el contenido del artículo de X.", attempts=1, diff --git a/src/xbrain/generate.py b/src/xbrain/generate.py index 74ab620..15d9503 100644 --- a/src/xbrain/generate.py +++ b/src/xbrain/generate.py @@ -6,8 +6,15 @@ from datetime import datetime, timezone from pathlib import Path +from xbrain.config import SUPPORTED_TOPIC_STYLES from xbrain.i18n import Strings, strings_for -from xbrain.models import Content, ContentSource, FailureReason, Item +from xbrain.models import ( + Content, + ContentSourceFailure, + ContentSourceSuccess, + FailureReason, + Item, +) from xbrain.notes_io import DEFAULT_TAIL, note_filename, slugify, title_of, user_tail, wrap logger = logging.getLogger(__name__) @@ -20,16 +27,20 @@ "dns_error": "dominio no resuelto", "js_required": "requiere JavaScript", "empty_content": "sin contenido extraíble", + "unknown_error": "error desconocido", } -def _broken_link_line(source: ContentSource, fetched_at: datetime) -> str: - """A one-line, human-readable record of a link that could not be fetched.""" +def _broken_link_line(source: ContentSourceFailure, fetched_at: datetime) -> str: + """A one-line, human-readable record of a link that could not be fetched. + + Accepts only the failure variant — the type system enforces that + `failure_reason` is present (no Optional check needed). + """ bits: list[str] = [] if source.http_status: bits.append(f"HTTP {source.http_status}") - if source.failure_reason: - bits.append(_FAILURE_ES.get(source.failure_reason, source.failure_reason)) + bits.append(_FAILURE_ES.get(source.failure_reason, source.failure_reason)) detail = " · ".join(bits) or "no se pudo recuperar" date = fetched_at.date().isoformat() return f"> ⚠ Enlace roto: <{source.url}> — {detail} (verificado {date})" @@ -41,6 +52,7 @@ def generate( since: datetime | None = None, until: datetime | None = None, output_language: str = "English", + topic_style: str = "wikilink", ) -> None: """Write _index.md, log.md and one note per noted item. @@ -48,7 +60,17 @@ def generate( index and log always reflect the whole store; `since`/`until` only narrow which item notes are (re)generated. `output_language` drives the section headers (Topics:, Content:, Summary, ...) via `xbrain.i18n`. + + `topic_style` controls how the in-body ``**Topics:**`` line is rendered: + ``"wikilink"`` (default) emits ``[[slug]]`` links, ``"hashtag"`` emits + Obsidian ``#slug`` tags. The toggle does not affect frontmatter ``tags:``, + the index ``## Topics`` section, or the topic-page post lists — those + stay wikilinks by design. """ + if topic_style not in SUPPORTED_TOPIC_STYLES: + raise ValueError( + f"Unsupported topic_style: {topic_style!r}. Supported: {SUPPORTED_TOPIC_STYLES}" + ) strings = strings_for(output_language) items = sorted(store.values(), key=lambda i: i.created_at, reverse=True) items_dir = output_dir / "items" @@ -57,7 +79,7 @@ def generate( (output_dir / "log.md").write_text(_render_log(items), encoding="utf-8") for item in items: if _has_note(item) and _in_range(item, since, until): - _write_note(items_dir, item, strings) + _write_note(items_dir, item, strings, topic_style) def _has_note(item: Item) -> bool: @@ -73,7 +95,7 @@ def _in_range(item: Item, since: datetime | None, until: datetime | None) -> boo return True -def _write_note(items_dir: Path, item: Item, strings: Strings) -> None: +def _write_note(items_dir: Path, item: Item, strings: Strings, topic_style: str) -> None: """Write an item's note, replacing only the generated region. The filename ends with the item's globally unique ``id``. That makes @@ -83,7 +105,7 @@ def _write_note(items_dir: Path, item: Item, strings: Strings) -> None: orphaned. """ path = items_dir / note_filename(item) - block = wrap(_render_note(item, strings)) + block = wrap(_render_note(item, strings, topic_style)) source = path if path.exists() else _stale_note(items_dir, item, path) if source is not None: tail = user_tail(source.read_text(encoding="utf-8"), DEFAULT_TAIL) @@ -109,34 +131,54 @@ def _stale_note(items_dir: Path, item: Item, current: Path) -> Path | None: return None -def _enrichment_lines(item: Item, strings: Strings) -> list[str]: - """Summary + topic links for an enriched item (empty if not enriched).""" +def _enrichment_lines(item: Item, strings: Strings, topic_style: str) -> list[str]: + """Summary + topic refs for an enriched item (empty if not enriched). + + `topic_style` selects the in-body topic-line rendering: + - ``"wikilink"`` → ``**Topics:** [[ai-coding]] · [[software-engineering]]`` + - ``"hashtag"`` → ``**Topics:** #ai-coding #software-engineering`` + + The hashtag mode uses a bare space as separator: Obsidian's tag parser + consumes a trailing middle-dot as part of the tag boundary on some + renderers, which produces broken tags. Frontmatter ``tags:`` are emitted + by ``_frontmatter`` and are independent of this toggle. + """ if not item.enriched: return [] lines: list[str] = [] if item.enriched.summary: lines += [item.enriched.summary, ""] if item.enriched.topics: - links = " · ".join(f"[[{t}]]" for t in item.enriched.topics) - lines += [f"**{strings.topics_label}:** {links}", ""] + if topic_style == "hashtag": + refs = " ".join(f"#{t}" for t in item.enriched.topics) + else: + refs = " · ".join(f"[[{t}]]" for t in item.enriched.topics) + lines += [f"**{strings.topics_label}:** {refs}", ""] return lines def _content_lines(content: Content, strings: Strings) -> list[str]: - """Rendered article bodies + broken-link evidence for a fetched item.""" + """Rendered article bodies + broken-link evidence for a fetched item. + + Switches on the `ContentSource` variant: the success variant is + rendered as a content block; the failure variant is rendered as a + broken-link line *only* for external articles and X articles (a + failed thread fetch is silently elided, matching the pre-refactor + behaviour — `source.kind` is what guarded that path before). + """ lines: list[str] = [] for source in content.sources: - if source.ok and source.text: + if isinstance(source, ContentSourceSuccess): heading = source.title or source.url lines += [f"## {strings.content_header}: {heading}", "", source.text, ""] - elif not source.ok and source.kind in ("external_article", "x_article"): + elif source.kind in ("external_article", "x_article"): lines += [_broken_link_line(source, content.fetched_at), ""] return lines -def _render_note(item: Item, strings: Strings) -> str: +def _render_note(item: Item, strings: Strings, topic_style: str) -> str: lines = [_frontmatter(item), "", f"# {title_of(item)}", ""] - lines += _enrichment_lines(item, strings) + lines += _enrichment_lines(item, strings, topic_style) lines += ["## Tweet", "", item.text, ""] if item.links: lines.append("## Enlaces") diff --git a/src/xbrain/models.py b/src/xbrain/models.py index 745fc87..c5b745b 100644 --- a/src/xbrain/models.py +++ b/src/xbrain/models.py @@ -2,10 +2,13 @@ from __future__ import annotations +import logging from datetime import datetime -from typing import Literal +from typing import Annotated, Any, Literal, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel, BeforeValidator, Field, TypeAdapter + +logger = logging.getLogger(__name__) # The set of enrichment executor names — one source of truth shared by the # data model, the config loader and the enrichment phase. @@ -25,49 +28,188 @@ "dns_error", "js_required", "empty_content", + "unknown_error", # catch-all for uncategorised failures (e.g. an extractor + # exception we did not classify). Transient by default — `_should_refetch` + # in fetch.py treats it as retry-worthy on the next run, mirroring the + # pre-#20 behaviour where `failure_reason=None` meant transient. ] +# The set of content-source kinds — one source of truth shared by the data +# model, the fetch stage and the wiki renderer. +ContentKind = Literal["external_article", "x_article", "thread", "quoted_tweet"] + class Author(BaseModel): + """The X account that authored an item.""" + handle: str name: str class Link(BaseModel): + """One external URL extracted from an item's text.""" + url: str domain: str class Media(BaseModel): + """One photo or video attached to an item.""" + type: Literal["photo", "video"] url: str class ThreadInfo(BaseModel): + """Marker that an item is part of a multi-tweet thread.""" + is_thread: bool = True root_id: str position: int | None = None -class ContentSource(BaseModel): - kind: Literal["external_article", "x_article", "thread", "quoted_tweet"] +class ContentSourceSuccess(BaseModel): + """A fetched article whose body was successfully extracted. + + The success variant of the `ContentSource` tagged union. `text` is + required — a success without text is not a success — and the type + system enforces this at construction time. + """ + + outcome: Literal["success"] = "success" + kind: ContentKind url: str title: str | None = None - text: str | None = None - ok: bool = True + text: str + http_status: int | None = None + # extraction attempts: 1 = single pass, 2 = + Firecrawl fallback; + # 0 only on pre-Fase-2 records. + attempts: int = 0 + + +class ContentSourceFailure(BaseModel): + """A fetched article whose body could not be extracted. + + The failure variant of the `ContentSource` tagged union — structured + broken-link evidence so the wiki can render a ``⚠ Enlace roto`` line + rather than pretending the link was never there (design §4). + `failure_reason` is required: a failure without a reason is not + demonstrable evidence. + """ + + outcome: Literal["failure"] = "failure" + kind: ContentKind + url: str + failure_reason: FailureReason error: str | None = None http_status: int | None = None - failure_reason: FailureReason | None = None - # extraction attempts: 1 = single pass, 2 = + Firecrawl fallback; 0 only on pre-Fase-2 records + # extraction attempts: 1 = single pass, 2 = + Firecrawl fallback; + # 0 only on pre-Fase-2 records. attempts: int = 0 +def _normalise_legacy_content_source(value: Any) -> Any: + """Map the legacy ``{ok: bool, ...}`` shape to the tagged-union shape. + + Older ``data/items.json`` records (pre-#20) carry ``ok: True`` / + ``ok: False`` instead of ``outcome: "success"`` / ``outcome: "failure"``. + The mapping is one-to-one: + + - ``ok=True`` (success) → ``outcome="success"`` + - ``ok=False`` (failure) → ``outcome="failure"`` + + Records that already carry ``outcome`` are returned unchanged. Records + that have neither discriminator are rejected — silently inventing one + would mask data corruption. + + Fields irrelevant to the new variant (e.g. ``title`` / ``text`` on the + failure variant) are dropped during normalisation so the resulting dict + matches the variant's declared fields exactly. This is purely defensive + — extra fields on a pydantic model are ignored by default, but stripping + them up front keeps the on-the-wire shape clean once the record is + re-dumped. + """ + if not isinstance(value, dict): + return value + if "outcome" in value: + return value + if "ok" not in value: + # Include enough context to find the offending record in a big file. + url = value.get("url", "") + raise ValueError( + f"ContentSource record missing both 'outcome' and 'ok' " + f"discriminator (url={url!r}); the record cannot be safely " + "categorised as success or failure." + ) + payload = {k: v for k, v in value.items() if k != "ok"} + payload["outcome"] = "success" if value["ok"] else "failure" + if payload["outcome"] == "success": + # success has no failure_reason / error — drop if present so the + # re-dumped record is clean (pydantic ignores extras anyway). + payload.pop("failure_reason", None) + payload.pop("error", None) + else: + # failure has no title / text + payload.pop("title", None) + payload.pop("text", None) + # Legacy records sometimes recorded a failure (`ok=False`) with no + # categorised `failure_reason` (e.g. an HTTP 429 that the old code + # did not map). The new variant requires the field — bucket those + # under `unknown_error` (a transient retry-worthy reason added in + # the #20 review pass, see `xbrain.fetch._TRANSIENT_FAILURES`). + # `unknown_error` is preferable to `timeout` here because the + # actual cause is unknown — "timeout" would be a lie that hides + # 429s, SSL handshake failures, and other distinct error modes. + if payload.get("failure_reason") in (None, ""): + payload["failure_reason"] = "unknown_error" + logger.warning( + "Legacy ContentSource without failure_reason bucketed as " + "'unknown_error' (url=%s). The next `fetch_pending` run will " + "retry it; use `--force` to suppress the retry.", + value.get("url", ""), + ) + return payload + + +# The persisted ContentSource type — a discriminated union over the success +# and failure variants, wrapped in an outer `BeforeValidator` that normalises +# the legacy `ok: bool` records on read so existing `data/items.json` files +# keep working. +# +# The wrapping is layered on purpose: the `BeforeValidator` must run BEFORE +# pydantic dispatches on the `outcome` discriminator. If both annotations were +# on the same `Annotated`, the discriminator check would run first and reject +# legacy records that carry `ok` instead of `outcome`. The outer Annotated +# guarantees the right ordering. +_ContentSourceTagged = Annotated[ + Union[ContentSourceSuccess, ContentSourceFailure], + Field(discriminator="outcome"), +] +ContentSource = Annotated[ + _ContentSourceTagged, + BeforeValidator(_normalise_legacy_content_source), +] + + +# A TypeAdapter is the documented pydantic-v2 entry point for validating / +# dumping a discriminated-union *type alias* (since the alias itself is not a +# class with `.model_validate`). Tests use this; production code goes through +# `Item` and `Content` which carry the union as a field. +ContentSourceAdapter: TypeAdapter[Union[ContentSourceSuccess, ContentSourceFailure]] = TypeAdapter( + ContentSource +) + + class Content(BaseModel): + """The fetched article(s) attached to an item, with their fetch timestamp.""" + fetched_at: datetime sources: list[ContentSource] = Field(default_factory=list) class Enrichment(BaseModel): + """LLM-generated summary and topic assignment for an item.""" + enriched_at: datetime executor: ExecutorName summary: str | None = None @@ -99,6 +241,8 @@ class TopicPage(BaseModel): class Item(BaseModel): + """One captured X post (bookmark or own tweet) with all its derived data.""" + id: str source: SourceName url: str @@ -116,16 +260,22 @@ class Item(BaseModel): class SourceCursor(BaseModel): + """Per-source extractor cursor: where we left off last run.""" + last_seen_id: str | None = None last_run: datetime | None = None class ArchiveImport(BaseModel): + """Marker recording a one-off X archive import.""" + file: str at: datetime class State(BaseModel): + """Top-level extractor state persisted in `data/state.json`.""" + bookmarks: SourceCursor = Field(default_factory=SourceCursor) own_tweets: SourceCursor = Field(default_factory=SourceCursor) archive_imported: ArchiveImport | None = None diff --git a/src/xbrain/notes_io.py b/src/xbrain/notes_io.py index d3325d3..552d52b 100644 --- a/src/xbrain/notes_io.py +++ b/src/xbrain/notes_io.py @@ -54,10 +54,17 @@ def slugify(text: str) -> str: def title_of(item: Item) -> str: - """A display title for an item — a fetched article title, else the text.""" + """A display title for an item — a fetched article title, else the text. + + Only the success variant of `ContentSource` carries a `title` field; the + failure variant has no title, so the isinstance narrowing both satisfies + mypy and silently skips broken-link sources. + """ + from xbrain.models import ContentSourceSuccess + if item.content: for source in item.content.sources: - if source.title: + if isinstance(source, ContentSourceSuccess) and source.title: return source.title return item.text.replace("\n", " ")[:80] or item.id diff --git a/src/xbrain/worksheet.py b/src/xbrain/worksheet.py index bf0ecb8..35e657b 100644 --- a/src/xbrain/worksheet.py +++ b/src/xbrain/worksheet.py @@ -12,15 +12,21 @@ from datetime import datetime, timezone from pathlib import Path -from xbrain.models import Item, Topic +from xbrain.models import ContentSourceSuccess, Item, Topic from xbrain.rubrics import ARTICLE_CHAR_LIMIT, load_rubric def _article_text(item: Item) -> str | None: + """First successful fetched article body, truncated, or None. + + Only the success variant of `ContentSource` carries `text`. The + isinstance narrowing satisfies mypy and silently skips broken-link + sources, matching the pre-#20 ``src.ok and src.text`` behaviour. + """ if not item.content: return None for src in item.content.sources: - if src.ok and src.text: + if isinstance(src, ContentSourceSuccess) and src.text: return src.text[:ARTICLE_CHAR_LIMIT] return None diff --git a/tests/test_cli.py b/tests/test_cli.py index aff456d..fc00c1d 100644 --- a/tests/test_cli.py +++ b/tests/test_cli.py @@ -283,7 +283,7 @@ def test_cli_fetch_persists_partial_work_when_a_stage_raises(tmp_path, monkeypat # earlier stages already produced. This proves that `finally` contract. _setup_repo(tmp_path, monkeypatch) import xbrain.cli as cli - from xbrain.models import Content, ContentSource + from xbrain.models import Content, ContentSourceSuccess from xbrain.store import load_store save_store({"1": _linked_item("1")}, tmp_path / "data" / "items.json") @@ -294,11 +294,10 @@ def _fake_fetch_pending(store, *a, **k): store["1"].content = Content( fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[ - ContentSource( + ContentSourceSuccess( kind="external_article", url="https://example.com/p", text="cuerpo", - ok=True, ) ], ) diff --git a/tests/test_config.py b/tests/test_config.py index f70b416..5a2ddbe 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -154,3 +154,44 @@ def test_config_topics_threshold_is_configurable(tmp_path): encoding="utf-8", ) assert load_config(tmp_path).topics_resynth_threshold == 50 + + +def test_load_config_defaults_topic_style_to_wikilink(tmp_path: Path): + """No `[output] topic_style` key → wikilink default (backwards-compat).""" + _write_repo(tmp_path) + cfg = load_config(tmp_path) + assert cfg.topic_style == "wikilink" + + +def test_load_config_round_trips_hashtag_topic_style(tmp_path: Path): + """Explicit `topic_style = "hashtag"` round-trips.""" + (tmp_path / "config.toml").write_text( + "[paths]\n" + 'vault = "/tmp/vault"\n' + 'output_subdir = "learnings/x-knowledge"\n' + 'data_dir = "data"\n' + "[x]\n" + 'handle = "vgonpa"\n' + "[output]\n" + 'topic_style = "hashtag"\n', + encoding="utf-8", + ) + cfg = load_config(tmp_path) + assert cfg.topic_style == "hashtag" + + +def test_load_config_rejects_unknown_topic_style(tmp_path: Path): + """Unknown topic_style fails fast with the supported list in the message.""" + (tmp_path / "config.toml").write_text( + "[paths]\n" + 'vault = "/tmp/vault"\n' + 'output_subdir = "learnings/x-knowledge"\n' + 'data_dir = "data"\n' + "[x]\n" + 'handle = "vgonpa"\n' + "[output]\n" + 'topic_style = "bogus"\n', + encoding="utf-8", + ) + with pytest.raises(ValueError, match="topic_style"): + load_config(tmp_path) diff --git a/tests/test_diff.py b/tests/test_diff.py new file mode 100644 index 0000000..8537714 --- /dev/null +++ b/tests/test_diff.py @@ -0,0 +1,578 @@ +# tests/test_diff.py +"""Tests for `xbrain.diff` — pure module + CLI integration. + +The module-level tests do not hit `xbrain.snapshot`: they pass plain data +directories. That mirrors the production contract — `diff_snapshots` does not +know what a snapshot is. The CLI tests round-trip through `snapshot_create` +to exercise the verb the user actually types. +""" + +from __future__ import annotations + +import json +from datetime import datetime, timezone +from pathlib import Path + +import pytest +from typer.testing import CliRunner + +from xbrain.cli import app +from xbrain.diff import ( + DiffReport, + _tf_cosine, + diff_snapshots, + format_json, + format_text, +) +from xbrain.models import Author, Enrichment, Item, Topic, TopicPage +from xbrain.rubrics import save_vocab +from xbrain.snapshot import snapshot_create +from xbrain.store import save_store, save_topic_pages + +runner = CliRunner() + + +def _item( + item_id: str, + *, + primary: str | None = None, + extra_topics: list[str] | None = None, +) -> Item: + """Build an Item, optionally with an enrichment record.""" + item = Item( + id=item_id, + source="bookmark", + url=f"https://x.com/a/status/{item_id}", + author=Author(handle="alice", name="Alice"), + text=f"Note {item_id}", + created_at=datetime(2026, 5, 10, tzinfo=timezone.utc), + captured_at=datetime(2026, 5, 16, tzinfo=timezone.utc), + ) + if primary is not None: + topics = [primary] + if extra_topics: + topics.extend(extra_topics) + item.enriched = Enrichment( + enriched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), + executor="api", + summary="s", + primary_topic=primary, + topics=topics, + ) + return item + + +def _seed( + data_dir: Path, + *, + items: dict[str, Item] | None = None, + vocab_slugs: list[str] | None = None, + pages: dict[str, TopicPage] | None = None, +) -> None: + """Populate a data dir with whatever subset of artifacts a test needs.""" + data_dir.mkdir(parents=True, exist_ok=True) + if items is not None: + save_store(items, data_dir / "items.json") + if vocab_slugs is not None: + save_vocab( + [Topic(slug=slug, description=f"desc for {slug}") for slug in vocab_slugs], + data_dir / "vocab.yaml", + ) + if pages is not None: + save_topic_pages(pages, data_dir / "topics.json") + + +def _page(slug: str, overview: str, count: int = 5) -> TopicPage: + return TopicPage( + slug=slug, + overview=overview, + notes=[], + synthesized_at=datetime(2026, 5, 18, tzinfo=timezone.utc), + post_count_at_synth=count, + ) + + +# --------------------------------------------------------------------- TF-IDF cosine + + +def test_tfidf_identical_strings_return_one() -> None: + text = "the quick brown fox jumps" + assert _tf_cosine(text, text) == pytest.approx(1.0) + + +def test_tfidf_empty_or_singleton_returns_zero() -> None: + assert _tf_cosine("", "anything") == 0.0 + assert _tf_cosine("text", "") == 0.0 + # Single-character tokens get filtered (length >= 2 floor) + assert _tf_cosine("a a a", "b b b") == 0.0 + + +def test_tfidf_disjoint_vocabularies_return_zero() -> None: + assert _tf_cosine("alpha bravo charlie", "xenon yankee zulu") == 0.0 + + +def test_tfidf_partial_overlap_is_between_zero_and_one() -> None: + text_a = "the agentic workflow runs locally with full traces" + text_b = "the agentic workflow runs locally with full visibility" + similarity = _tf_cosine(text_a, text_b) + assert 0.5 < similarity < 1.0 + + +def test_tfidf_accented_tokens_survive_lowercasing() -> None: + # The tokenizer keeps a-zà-ÿ to handle Spanish/French overviews. An + # accented text compared to itself must score 1.0 — proves we didn't + # silently strip the accented chars to nothing. + text = "el árbol creció con vigor" + assert _tf_cosine(text, text) == pytest.approx(1.0) + + +# ----------------------------------------------------------------- diff_snapshots end-to-end + + +def test_identical_snapshots_produce_empty_diff(tmp_path: Path) -> None: + data = tmp_path / "data" + items = {"1": _item("1", primary="ai-coding"), "2": _item("2", primary="misc")} + _seed(data, items=items, vocab_slugs=["ai-coding", "misc"]) + + report = diff_snapshots(data, data) + + assert report.summary.reassigned == 0 + assert report.summary.reassigned_pct == 0.0 + assert report.items.top_transitions == [] + assert report.vocab.added == [] + assert report.vocab.removed == [] + for change in report.topics.per_slug.values(): + assert change.added == 0 + assert change.removed == 0 + + +def test_reassigned_primary_topic_counts_correctly(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed( + a, + items={"1": _item("1", primary="ai-coding"), "2": _item("2", primary="misc")}, + vocab_slugs=["ai-coding", "misc"], + ) + _seed( + b, + items={ + # Item 1 reassigned ai-coding -> software-engineering + "1": _item("1", primary="software-engineering"), + # Item 2 unchanged + "2": _item("2", primary="misc"), + }, + vocab_slugs=["ai-coding", "misc", "software-engineering"], + ) + + report = diff_snapshots(a, b) + + assert report.summary.reassigned == 1 + # Only one transition row, with the right pair + assert len(report.items.top_transitions) == 1 + transition = report.items.top_transitions[0] + assert transition.from_topic == "ai-coding" + assert transition.to_topic == "software-engineering" + assert transition.count == 1 + + +def test_items_only_in_one_snapshot_do_not_count_as_reassigned(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed(a, items={"1": _item("1", primary="ai-coding")}, vocab_slugs=["ai-coding"]) + _seed( + b, + items={ + "1": _item("1", primary="ai-coding"), + "2": _item("2", primary="misc"), # new in B + }, + vocab_slugs=["ai-coding", "misc"], + ) + + report = diff_snapshots(a, b) + + assert report.summary.reassigned == 0 + # Item 2's membership shows up in the misc topic + assert report.topics.per_slug["misc"].members_b == 1 + assert report.topics.per_slug["misc"].added == 1 + + +def test_unenriched_to_enriched_is_not_a_reassignment(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed(a, items={"1": _item("1")}, vocab_slugs=["misc"]) # unenriched in A + _seed(b, items={"1": _item("1", primary="misc")}, vocab_slugs=["misc"]) + + report = diff_snapshots(a, b) + + assert report.summary.reassigned == 0 + # But the transition row IS surfaced so the user sees the move + transitions = report.items.top_transitions + assert any(t.from_topic is None and t.to_topic == "misc" for t in transitions) + + +def test_topic_growth_flag_triggers_above_threshold(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + # 10 members in A, 12 in B → growth = 20% → flagged (above 10% threshold, base >= 5) + items_a = {str(i): _item(str(i), primary="ai-coding") for i in range(10)} + items_b = {str(i): _item(str(i), primary="ai-coding") for i in range(12)} + _seed(a, items=items_a, vocab_slugs=["ai-coding"]) + _seed(b, items=items_b, vocab_slugs=["ai-coding"]) + + report = diff_snapshots(a, b) + + change = report.topics.per_slug["ai-coding"] + assert change.flagged_growth is True + assert change.growth_pct == pytest.approx(0.2) + + +def test_topic_growth_flag_respects_5_member_floor(tmp_path: Path) -> None: + """A 3->5 jump is 67% growth but on a tiny base — should not flag.""" + a = tmp_path / "a" + b = tmp_path / "b" + items_a = {str(i): _item(str(i), primary="misc") for i in range(3)} + items_b = {str(i): _item(str(i), primary="misc") for i in range(5)} + _seed(a, items=items_a, vocab_slugs=["misc"]) + _seed(b, items=items_b, vocab_slugs=["misc"]) + + change = diff_snapshots(a, b).topics.per_slug["misc"] + assert change.flagged_growth is False + assert change.growth_pct == pytest.approx(2 / 3) + + +def test_growth_pct_is_none_when_members_a_is_zero(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed(a, items={}, vocab_slugs=["ai-coding"]) + _seed(b, items={"1": _item("1", primary="ai-coding")}, vocab_slugs=["ai-coding"]) + + change = diff_snapshots(a, b).topics.per_slug["ai-coding"] + assert change.growth_pct is None + assert change.flagged_growth is False + + +def test_overview_similarity_identical_flags_identical(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + overview = "The arc from autocomplete to agent orchestration across the year." + _seed( + a, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + pages={"ai-coding": _page("ai-coding", overview)}, + ) + _seed( + b, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + pages={"ai-coding": _page("ai-coding", overview)}, + ) + + change = diff_snapshots(a, b).topics.per_slug["ai-coding"] + assert change.overview_flag == "identical" + assert change.overview_similarity == pytest.approx(1.0) + + +def test_overview_similarity_missing_one_side_is_not_comparable(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed( + a, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + pages={"ai-coding": _page("ai-coding", "Overview present in A.")}, + ) + _seed( + b, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + # no topics.json on B + ) + + change = diff_snapshots(a, b).topics.per_slug["ai-coding"] + assert change.overview_flag == "not_comparable" + assert change.overview_similarity is None + + +def test_overview_similarity_disjoint_flags_different(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed( + a, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + pages={"ai-coding": _page("ai-coding", "alpha bravo charlie delta echo")}, + ) + _seed( + b, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + pages={"ai-coding": _page("ai-coding", "xenon yankee zulu omega tango")}, + ) + + change = diff_snapshots(a, b).topics.per_slug["ai-coding"] + assert change.overview_flag == "different" + assert change.overview_similarity == pytest.approx(0.0) + + +def test_overview_similarity_similar_text_above_threshold(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed( + a, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + pages={ + "ai-coding": _page( + "ai-coding", + "the agentic workflow runs locally with full traces", + ) + }, + ) + _seed( + b, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + pages={ + "ai-coding": _page( + "ai-coding", + "the agentic workflow runs locally with full visibility", + ) + }, + ) + + change = diff_snapshots(a, b).topics.per_slug["ai-coding"] + assert change.overview_flag in ("similar", "identical") + assert change.overview_similarity is not None and change.overview_similarity > 0.5 + + +def test_vocab_added_and_removed_set_correctly(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed(a, items={}, vocab_slugs=["alpha", "bravo", "charlie"]) + _seed(b, items={}, vocab_slugs=["bravo", "charlie", "delta"]) + + report = diff_snapshots(a, b) + + assert report.vocab.added == ["delta"] + assert report.vocab.removed == ["alpha"] + assert report.vocab.unchanged_count == 2 + + +def test_missing_vocab_yaml_on_one_side_does_not_crash(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed(a, items={}, vocab_slugs=["alpha", "bravo"]) + _seed(b, items={}) # no vocab.yaml on B at all + + report = diff_snapshots(a, b) + + assert report.vocab.added == [] + assert sorted(report.vocab.removed) == ["alpha", "bravo"] + + +def test_missing_topics_json_on_one_side_does_not_crash(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + _seed( + a, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + pages={"ai-coding": _page("ai-coding", "Overview text.")}, + ) + _seed( + b, + items={"1": _item("1", primary="ai-coding")}, + vocab_slugs=["ai-coding"], + ) + + change = diff_snapshots(a, b).topics.per_slug["ai-coding"] + assert change.overview_flag == "not_comparable" + + +def test_top_transitions_sorted_by_count_then_pair(tmp_path: Path) -> None: + a = tmp_path / "a" + b = tmp_path / "b" + # 3 items: 2x ai-coding -> software, 1x misc -> software + _seed( + a, + items={ + "1": _item("1", primary="ai-coding"), + "2": _item("2", primary="ai-coding"), + "3": _item("3", primary="misc"), + }, + vocab_slugs=["ai-coding", "misc", "software"], + ) + _seed( + b, + items={ + "1": _item("1", primary="software"), + "2": _item("2", primary="software"), + "3": _item("3", primary="software"), + }, + vocab_slugs=["ai-coding", "misc", "software"], + ) + + transitions = diff_snapshots(a, b).items.top_transitions + # The first one is the most frequent + assert transitions[0].from_topic == "ai-coding" + assert transitions[0].count == 2 + assert transitions[1].from_topic == "misc" + assert transitions[1].count == 1 + + +def test_format_text_renders_section_headers(tmp_path: Path) -> None: + a = tmp_path / "a" + _seed(a, items={"1": _item("1", primary="misc")}, vocab_slugs=["misc"]) + text = format_text(diff_snapshots(a, a)) + assert "ITEMS" in text + assert "TOPICS" in text + assert "OVERVIEWS" in text + assert "VOCAB" in text + + +def test_format_json_round_trips_through_model(tmp_path: Path) -> None: + a = tmp_path / "a" + _seed(a, items={"1": _item("1", primary="misc")}, vocab_slugs=["misc"]) + text = format_json(diff_snapshots(a, a)) + parsed = json.loads(text) + # Top-level keys match the model + assert set(parsed.keys()) == {"summary", "items", "topics", "vocab"} + # And the model can re-validate its own JSON + DiffReport.model_validate_json(text) + + +# ------------------------------------------------------------------------- CLI + + +def _setup_repo(tmp_path: Path, monkeypatch) -> Path: + (tmp_path / "config.toml").write_text( + "[paths]\n" + f'vault = "{tmp_path / "vault"}"\n' + 'output_subdir = "x-knowledge"\n' + 'data_dir = "data"\n' + "[x]\n" + 'handle = "vgonpa"\n', + encoding="utf-8", + ) + (tmp_path / "vault").mkdir() + data_dir = tmp_path / "data" + data_dir.mkdir() + monkeypatch.setenv("XBRAIN_REPO_ROOT", str(tmp_path)) + return data_dir + + +def test_cli_diff_compares_two_named_snapshots(tmp_path: Path, monkeypatch) -> None: + data_dir = _setup_repo(tmp_path, monkeypatch) + _seed(data_dir, items={"1": _item("1", primary="alpha")}, vocab_slugs=["alpha"]) + snap_a, _ = snapshot_create(data_dir, command="manual", dir_label="checkpoint-a") + # Mutate then snapshot again — the reassignment is observable + _seed(data_dir, items={"1": _item("1", primary="bravo")}, vocab_slugs=["alpha", "bravo"]) + snap_b, _ = snapshot_create(data_dir, command="manual", dir_label="checkpoint-b") + + result = runner.invoke(app, ["diff", snap_a.name, snap_b.name]) + + assert result.exit_code == 0, result.output + assert "ITEMS" in result.stdout + assert "alpha" in result.stdout + assert "bravo" in result.stdout + + +def test_cli_diff_defaults_b_to_live_data_dir(tmp_path: Path, monkeypatch) -> None: + data_dir = _setup_repo(tmp_path, monkeypatch) + _seed(data_dir, items={"1": _item("1", primary="alpha")}, vocab_slugs=["alpha"]) + snap, _ = snapshot_create(data_dir, command="manual", dir_label="before") + # Mutate the live data/ — that is the B side without an explicit name. + _seed(data_dir, items={"1": _item("1", primary="bravo")}, vocab_slugs=["alpha", "bravo"]) + + result = runner.invoke(app, ["diff", snap.name]) + + assert result.exit_code == 0, result.output + assert "live data/" in result.stdout + # The reassignment shows up + assert "reassigned: 1" in result.stdout + + +def test_cli_diff_format_json_parses_back(tmp_path: Path, monkeypatch) -> None: + data_dir = _setup_repo(tmp_path, monkeypatch) + _seed(data_dir, items={"1": _item("1", primary="alpha")}, vocab_slugs=["alpha"]) + snap, _ = snapshot_create(data_dir, command="manual", dir_label="x") + + result = runner.invoke(app, ["diff", snap.name, "--format", "json"]) + + assert result.exit_code == 0, result.output + parsed = json.loads(result.stdout) + assert set(parsed.keys()) == {"summary", "items", "topics", "vocab"} + + +def test_cli_diff_unknown_snapshot_exits_1(tmp_path: Path, monkeypatch) -> None: + _setup_repo(tmp_path, monkeypatch) + result = runner.invoke(app, ["diff", "does-not-exist"]) + assert result.exit_code == 1 + assert "No snapshot named" in result.output + + +def test_cli_diff_unknown_format_exits_1(tmp_path: Path, monkeypatch) -> None: + data_dir = _setup_repo(tmp_path, monkeypatch) + _seed(data_dir, items={"1": _item("1", primary="alpha")}, vocab_slugs=["alpha"]) + snap, _ = snapshot_create(data_dir, command="manual", dir_label="x") + + result = runner.invoke(app, ["diff", snap.name, "--format", "xml"]) + + assert result.exit_code == 1 + assert "format" in result.output.lower() + + +# --- PR #28 review fixes: input validation --- + + +def test_diff_snapshots_raises_when_dir_does_not_exist(tmp_path): + """Missing snapshot dir surfaces as clean FileNotFoundError, not silent empty diff.""" + import pytest + + from xbrain.diff import diff_snapshots + + real = tmp_path / "real" + real.mkdir() + (real / "items.json").write_text("{}", encoding="utf-8") + ghost = tmp_path / "does-not-exist" + + with pytest.raises(FileNotFoundError, match="directory not found"): + diff_snapshots(real, ghost) + + +def test_diff_snapshots_raises_when_both_sides_are_empty(tmp_path): + """Both sides empty → clean FileNotFoundError naming the dirs. + + Guards against the silent-failure case where data/ was deleted out-of-band + and diff would otherwise report a clean 'no changes' against a real snapshot. + """ + import pytest + + from xbrain.diff import diff_snapshots + + a = tmp_path / "a" + a.mkdir() + b = tmp_path / "b" + b.mkdir() + + with pytest.raises(FileNotFoundError, match="Both diff sides are empty"): + diff_snapshots(a, b) + + +def test_diff_snapshots_wraps_corrupt_file_with_context(tmp_path): + """A corrupt items.json surfaces with the path in the error message.""" + import pytest + + from xbrain.diff import diff_snapshots + + a = tmp_path / "a" + a.mkdir() + (a / "items.json").write_text("not json at all", encoding="utf-8") + b = tmp_path / "b" + b.mkdir() + (b / "items.json").write_text("{}", encoding="utf-8") + + with pytest.raises(ValueError, match="failed to load.*items.json"): + diff_snapshots(a, b) diff --git a/tests/test_fetch.py b/tests/test_fetch.py index 2f83810..759bf7b 100644 --- a/tests/test_fetch.py +++ b/tests/test_fetch.py @@ -4,15 +4,25 @@ from datetime import datetime, timezone from xbrain.fetch import ( + FetchFailure, FetchResult, + FetchSuccess, _categorize_url_error, _probe_status, _reason_for_status, + _should_refetch, fetch_item, fetch_pending, trafilatura_extract, ) -from xbrain.models import Author, Item, Link +from xbrain.models import ( + Author, + Content, + ContentSourceFailure, + ContentSourceSuccess, + Item, + Link, +) def _item(item_id: str, urls: list[str]) -> Item: @@ -29,7 +39,7 @@ def _item(item_id: str, urls: list[str]) -> Item: def _fake_extractor(url: str) -> FetchResult: - return FetchResult(title="Título", text=f"cuerpo de {url}", http_status=200) + return FetchSuccess(title="Título", text=f"cuerpo de {url}", http_status=200) def test_reason_for_status_maps_http_codes(): @@ -63,9 +73,9 @@ def test_trafilatura_extract_success(): extract=lambda html: "el cuerpo", prober=lambda url: (None, None, ""), ) + assert isinstance(result, FetchSuccess) assert result.text == "el cuerpo" assert result.http_status == 200 - assert result.failure_reason is None def test_trafilatura_extract_empty_content_when_no_article(): @@ -75,7 +85,7 @@ def test_trafilatura_extract_empty_content_when_no_article(): extract=lambda html: None, prober=lambda url: (None, None, ""), ) - assert result.text is None + assert isinstance(result, FetchFailure) assert result.failure_reason == "empty_content" @@ -86,15 +96,17 @@ def test_trafilatura_extract_probes_when_download_fails(): extract=lambda html: None, prober=lambda url: (404, "not_found", "HTTP 404"), ) + assert isinstance(result, FetchFailure) assert result.http_status == 404 assert result.failure_reason == "not_found" def test_fetch_item_extracts_external_articles(): content = fetch_item(_item("1", ["https://example.com/p"]), _fake_extractor) - assert content.sources[0].kind == "external_article" - assert content.sources[0].ok is True - assert content.sources[0].text == "cuerpo de https://example.com/p" + source = content.sources[0] + assert isinstance(source, ContentSourceSuccess) + assert source.kind == "external_article" + assert source.text == "cuerpo de https://example.com/p" def test_fetch_item_skips_x_urls(): @@ -106,11 +118,12 @@ def test_fetch_item_skips_x_urls(): def test_fetch_item_records_failure_evidence(): content = fetch_item( _item("1", ["https://example.com/p"]), - lambda url: FetchResult(http_status=404, failure_reason="not_found", error="HTTP 404"), + lambda url: FetchFailure(http_status=404, failure_reason="not_found", error="HTTP 404"), ) - assert content.sources[0].ok is False - assert content.sources[0].http_status == 404 - assert content.sources[0].failure_reason == "not_found" + source = content.sources[0] + assert isinstance(source, ContentSourceFailure) + assert source.http_status == 404 + assert source.failure_reason == "not_found" def test_fetch_item_isolates_extractor_exception(): @@ -119,17 +132,16 @@ def _raising(url): content = fetch_item(_item("1", ["https://example.com/p"]), _raising) assert len(content.sources) == 1 - assert content.sources[0].ok is False - assert "boom" in content.sources[0].error + source = content.sources[0] + assert isinstance(source, ContentSourceFailure) + assert "boom" in (source.error or "") def test_fetch_item_preserves_non_external_sources_on_refetch(): - from xbrain.models import Content, ContentSource - item = _item("1", ["https://example.com/p"]) item.content = Content( fetched_at=datetime.now(timezone.utc), - sources=[ContentSource(kind="thread", url="u", text="hilo", ok=True)], + sources=[ContentSourceSuccess(kind="thread", url="u", text="hilo")], ) content = fetch_item(item, _fake_extractor) kinds = {s.kind for s in content.sources} @@ -207,54 +219,56 @@ def opener(req, timeout=0): return _Resp(json.dumps(payload).encode()) result = _firecrawl_extract("https://e.com/a", opener=opener) - assert result is not None + assert isinstance(result, FetchSuccess) assert result.text == "el cuerpo" assert result.title == "T" def test_extract_article_falls_back_to_firecrawl_on_js_required(): - from xbrain.fetch import FetchResult, extract_article + from xbrain.fetch import extract_article def primary(url): - return FetchResult(failure_reason="js_required", error="js", attempts=1) + return FetchFailure(failure_reason="js_required", error="js", attempts=1) def firecrawl(url): - return FetchResult(text="rescatado por firecrawl", attempts=1) + return FetchSuccess(text="rescatado por firecrawl", attempts=1) result = extract_article("https://e.com/a", primary=primary, firecrawl=firecrawl) + assert isinstance(result, FetchSuccess) assert result.text == "rescatado por firecrawl" assert result.attempts == 2 def test_extract_article_keeps_evidence_when_firecrawl_unavailable(): - from xbrain.fetch import FetchResult, extract_article + from xbrain.fetch import extract_article def primary(url): - return FetchResult(failure_reason="js_required", error="js", attempts=1) + return FetchFailure(failure_reason="js_required", error="js", attempts=1) def firecrawl(url): return None result = extract_article("https://e.com/a", primary=primary, firecrawl=firecrawl) - assert result.text is None + assert isinstance(result, FetchFailure) assert result.failure_reason == "js_required" assert result.attempts == 1 def test_extract_article_does_not_retry_hard_failures(): - from xbrain.fetch import FetchResult, extract_article + from xbrain.fetch import extract_article # A 404 is definitive — Firecrawl must not even be called. calls = [] def primary(url): - return FetchResult(http_status=404, failure_reason="not_found", attempts=1) + return FetchFailure(http_status=404, failure_reason="not_found", attempts=1) def firecrawl(url): calls.append(url) - return FetchResult(text="x") + return FetchSuccess(text="x") result = extract_article("https://e.com/a", primary=primary, firecrawl=firecrawl) + assert isinstance(result, FetchFailure) assert result.failure_reason == "not_found" assert calls == [] @@ -314,8 +328,7 @@ def opener(req, timeout=0): raise urllib.error.URLError("network down") result = _firecrawl_extract("https://e.com/a", opener=opener) - assert result is not None - assert result.text is None + assert isinstance(result, FetchFailure) assert "Firecrawl falló" in (result.error or "") @@ -331,22 +344,306 @@ def opener(req, timeout=0): return _CtxResp(json.dumps(payload).encode()) result = _firecrawl_extract("https://e.com/a", opener=opener) - assert result is not None - assert result.text is None + assert isinstance(result, FetchFailure) assert "rate limited" in (result.error or "") def test_extract_article_merges_firecrawl_error_when_both_fail(): - from xbrain.fetch import FetchResult, extract_article + from xbrain.fetch import extract_article def primary(url): - return FetchResult(failure_reason="js_required", error="js", attempts=1) + return FetchFailure(failure_reason="js_required", error="js", attempts=1) def firecrawl(url): # Firecrawl reachable but returned no text — carries its own evidence. - return FetchResult(error="Firecrawl no devolvió contenido.", attempts=1) + return FetchFailure(error="Firecrawl no devolvió contenido.", attempts=1) result = extract_article("https://e.com/a", primary=primary, firecrawl=firecrawl) - assert result.text is None + assert isinstance(result, FetchFailure) assert result.attempts == 2 assert "Firecrawl: Firecrawl no devolvió contenido." in (result.error or "") + + +# --------------------------------------------------------------------- #19: transient retry + + +def _content_with_source(*, ok: bool, failure_reason="timeout", kind="external_article", text=None): + """Helper: build a Content with a single ContentSource of the requested shape. + + `ok=True` builds a `ContentSourceSuccess` (text required); `ok=False` + builds a `ContentSourceFailure` (failure_reason required, default + ``"timeout"`` — the transient bucket). + """ + if ok: + source = ContentSourceSuccess(kind=kind, url="https://example.com/p", text=text or "body") + else: + source = ContentSourceFailure( + kind=kind, url="https://example.com/p", failure_reason=failure_reason + ) + return Content( + fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), + sources=[source], + ) + + +# --- Truth table on _should_refetch --- + + +def test_should_refetch_when_content_is_none(): + assert _should_refetch(None, force=False) is True + assert _should_refetch(None, force=True) is True + + +def test_should_skip_successful_fetch_without_force(): + c = _content_with_source(ok=True, text="body") + assert _should_refetch(c, force=False) is False + + +def test_should_refetch_successful_fetch_with_force(): + c = _content_with_source(ok=True, text="body") + assert _should_refetch(c, force=True) is True + + +def test_should_refetch_all_transient_timeout(): + c = _content_with_source(ok=False, failure_reason="timeout") + assert _should_refetch(c, force=False) is True + + +def test_should_refetch_all_transient_dns_error(): + c = _content_with_source(ok=False, failure_reason="dns_error") + assert _should_refetch(c, force=False) is True + + +def test_should_skip_terminal_not_found_without_force(): + c = _content_with_source(ok=False, failure_reason="not_found") + assert _should_refetch(c, force=False) is False + + +def test_should_skip_terminal_paywall_without_force(): + c = _content_with_source(ok=False, failure_reason="paywall") + assert _should_refetch(c, force=False) is False + + +def test_should_skip_terminal_forbidden_without_force(): + c = _content_with_source(ok=False, failure_reason="forbidden") + assert _should_refetch(c, force=False) is False + + +def test_should_skip_terminal_js_required_without_force(): + c = _content_with_source(ok=False, failure_reason="js_required") + assert _should_refetch(c, force=False) is False + + +def test_should_skip_terminal_empty_content_without_force(): + c = _content_with_source(ok=False, failure_reason="empty_content") + assert _should_refetch(c, force=False) is False + + +def test_should_refetch_terminal_with_force(): + c = _content_with_source(ok=False, failure_reason="not_found") + assert _should_refetch(c, force=True) is True + + +def test_should_skip_mixed_transient_and_terminal(): + """Any terminal failure poisons the retry — the link is dead, retry is waste.""" + c = Content( + fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), + sources=[ + ContentSourceFailure(kind="external_article", url="a", failure_reason="timeout"), + ContentSourceFailure(kind="external_article", url="b", failure_reason="not_found"), + ], + ) + assert _should_refetch(c, force=False) is False + + +def test_should_skip_mixed_transient_and_success(): + """One source succeeded — there is good content here, do not re-hit.""" + c = Content( + fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), + sources=[ + ContentSourceFailure(kind="external_article", url="a", failure_reason="timeout"), + ContentSourceSuccess(kind="external_article", url="b", text="got it"), + ], + ) + assert _should_refetch(c, force=False) is False + + +def test_should_skip_only_x_sources(): + """fetch_pending must not act on items whose only sources are x.com — that is fetch_x's job.""" + c = Content( + fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), + sources=[ + ContentSourceFailure( + kind="x_article", + url="https://x.com/i/article/1", + failure_reason="timeout", + ), + ], + ) + assert _should_refetch(c, force=False) is False + + +# --- Integration on fetch_pending --- + + +def _seed_with_failure(item_id, *, failure_reason): + """Helper: an item that has already been fetched and failed once.""" + item = _item(item_id, ["https://example.com/p"]) + item.content = _content_with_source(ok=False, failure_reason=failure_reason) + return item + + +def test_fetch_pending_retries_timeout_without_force(): + store = {"1": _seed_with_failure("1", failure_reason="timeout")} + count = fetch_pending(store, extractor=_fake_extractor) + assert count == 1 + # The retry overwrote the failure with a fresh, successful source + src = store["1"].content.sources[0] + assert isinstance(src, ContentSourceSuccess) + assert src.text + + +def test_fetch_pending_skips_not_found_without_force(): + store = {"1": _seed_with_failure("1", failure_reason="not_found")} + count = fetch_pending(store, extractor=_fake_extractor) + assert count == 0 + # The recorded failure is preserved untouched + src = store["1"].content.sources[0] + assert isinstance(src, ContentSourceFailure) + assert src.failure_reason == "not_found" + + +def test_fetch_pending_force_refetches_not_found(): + store = {"1": _seed_with_failure("1", failure_reason="not_found")} + assert fetch_pending(store, force=True, extractor=_fake_extractor) == 1 + + +def test_fetch_pending_skips_transient_failures_outside_date_range(): + """The since/until filter applies on top of _should_refetch.""" + item = _seed_with_failure("1", failure_reason="timeout") + item.created_at = datetime(2026, 1, 1, tzinfo=timezone.utc) + store = {"1": item} + count = fetch_pending( + store, + since=datetime(2026, 5, 1, tzinfo=timezone.utc), + extractor=_fake_extractor, + ) + assert count == 0 + # And the recorded failure is preserved + src = store["1"].content.sources[0] + assert isinstance(src, ContentSourceFailure) + assert src.failure_reason == "timeout" + + +# --- Additional fixes from PR #26 review pipeline --- + + +def test_should_refetch_legacy_uncategorized_failure_migrates_to_transient(): + """A pre-#20 record with `ok=False, failure_reason=None` (anomalous — + uncategorised) is normalised on read to `failure_reason="unknown_error"` + by the legacy-shape validator, so the next `fetch_pending` run retries it + automatically. This preserves the #19 behaviour (uncategorised failures + get one auto-retry) under the new tagged-union shape, even though after + the refactor a new failure record can no longer have a `None` reason. + """ + from xbrain.models import ContentSourceAdapter + + src = ContentSourceAdapter.validate_python( + { + "kind": "external_article", + "url": "https://e.com/p", + "ok": False, + "failure_reason": None, + "error": "anomalous failure", + } + ) + assert isinstance(src, ContentSourceFailure) + # migrator picked the transient `unknown_error` bucket (not `timeout`, + # which would mislabel the actual cause). + assert src.failure_reason == "unknown_error" + c = Content(fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[src]) + assert _should_refetch(c, force=False) is True + + +def test_should_refetch_external_transient_alongside_xcom_source(): + """`_should_refetch` filters to external_article sources before deciding. + An item with a transient-failed external_article PLUS an x.com source + (any state) should retry on the external — the x.com source is fetch_x's + job and must not block the external retry. + """ + c = Content( + fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), + sources=[ + ContentSourceFailure( + kind="external_article", + url="https://ext.com/a", + failure_reason="timeout", + ), + ContentSourceFailure( + kind="x_article", + url="https://x.com/i/article/9", + failure_reason="not_found", + ), + ], + ) + assert _should_refetch(c, force=False) is True + + +def test_fetch_pending_replaces_sources_does_not_append(): + """PRD §5 invariant: a re-fetch *replaces* external_article sources, never + appends. A transient-failed item with 1 source must still have 1 source + after the retry (overwritten), not 2. + """ + item = _item("1", ["https://example.com/p"]) + item.content = _content_with_source(ok=False, failure_reason="timeout") + assert len(item.content.sources) == 1 + store = {"1": item} + fetch_pending(store, extractor=_fake_extractor) + assert len(store["1"].content.sources) == 1 + # And the lone source is now the fresh successful fetch + src = store["1"].content.sources[0] + assert isinstance(src, ContentSourceSuccess) + assert src.text + + +def test_extractor_exception_persists_as_transient_failure(): + """An uncaught extractor exception must persist as a TRANSIENT failure + (`unknown_error`), so `_should_refetch` retries it on the next run. + + Regression guard: pre-#20 the bare-except in fetch_item wrote + `failure_reason=None`, and `_should_refetch` (post-#19) treated `None` + as transient. After #20 introduced the discriminated union, the field + became required — the temporary fix bucketed to `empty_content`, which + is TERMINAL and silently broke #19's invariant. This test pins the + correct behaviour: uncaught exceptions stay self-healing. + """ + + def _raising(url): + raise RuntimeError("network blip simulated") + + content = fetch_item(_item("1", ["https://example.com/p"]), _raising) + assert len(content.sources) == 1 + failure = content.sources[0] + assert isinstance(failure, ContentSourceFailure) + assert failure.failure_reason == "unknown_error" + assert "network blip" in failure.error + # And `_should_refetch` correctly retries on the next run + assert _should_refetch(content, force=False) is True + + +def test_content_source_from_uncategorised_failure_is_transient(): + """`_content_source_from` is the public helper that maps an in-memory + `FetchFailure` to a persisted `ContentSourceFailure`. A `FetchFailure` + with `failure_reason=None` must fall back to `unknown_error` (transient), + NOT `empty_content` (terminal). + """ + from xbrain.fetch import FetchFailure, _content_source_from + + failure_result = FetchFailure( + failure_reason=None, + error="something went wrong", + attempts=1, + ) + src = _content_source_from("https://example.com/p", failure_result) + assert isinstance(src, ContentSourceFailure) + assert src.failure_reason == "unknown_error" diff --git a/tests/test_fetch_x.py b/tests/test_fetch_x.py index 7c46d09..80e689d 100644 --- a/tests/test_fetch_x.py +++ b/tests/test_fetch_x.py @@ -2,7 +2,13 @@ from datetime import datetime, timezone from xbrain.fetch_x import _classify_x_url, _x_status_id, assemble_linked_thread, fetch_x_articles -from xbrain.models import Author, Content, ContentSource, Item, Link +from xbrain.models import ( + Author, + Content, + ContentSourceSuccess, + Item, + Link, +) def _item(item_id: str, urls: list[str]) -> Item: @@ -82,7 +88,7 @@ def test_fetch_x_articles_attaches_sources_via_injected_fetcher(): fetched = fetch_x_articles( store, storage_state_path=None, - link_fetcher=lambda url: ContentSource(kind="x_article", url=url, text="hilo", ok=True), + link_fetcher=lambda url: ContentSourceSuccess(kind="x_article", url=url, text="hilo"), ) assert fetched == 1 assert store["1"].content is not None @@ -103,7 +109,7 @@ def test_fetch_x_articles_skips_already_fetched_unless_forced(): store = {"1": _item("1", ["https://x.com/jack/status/9"])} def fetcher(url): - return ContentSource(kind="x_article", url=url, text="hilo", ok=True) + return ContentSourceSuccess(kind="x_article", url=url, text="hilo") assert fetch_x_articles(store, None, link_fetcher=fetcher) == 1 assert fetch_x_articles(store, None, link_fetcher=fetcher) == 0 @@ -129,7 +135,7 @@ def test_fetch_x_articles_respects_since_until(): storage_state_path=None, since=datetime(2026, 5, 1, tzinfo=timezone.utc), until=datetime(2026, 7, 1, tzinfo=timezone.utc), - link_fetcher=lambda url: ContentSource(kind="x_article", url=url, text="x", ok=True), + link_fetcher=lambda url: ContentSourceSuccess(kind="x_article", url=url, text="x"), ) assert count == 1 assert store["1"].content is None @@ -141,7 +147,7 @@ def test_fetch_x_articles_dedups_repeated_x_urls(): fetch_x_articles( store, storage_state_path=None, - link_fetcher=lambda url: ContentSource(kind="x_article", url=url, text="hilo", ok=True), + link_fetcher=lambda url: ContentSourceSuccess(kind="x_article", url=url, text="hilo"), ) sources = store["1"].content.sources assert len(sources) == 1 @@ -151,13 +157,13 @@ def test_fetch_x_articles_preserves_external_sources(): item = _item("1", ["https://x.com/jack/status/9"]) item.content = Content( fetched_at=datetime.now(timezone.utc), - sources=[ContentSource(kind="external_article", url="https://e.com", text="art", ok=True)], + sources=[ContentSourceSuccess(kind="external_article", url="https://e.com", text="art")], ) store = {"1": item} fetch_x_articles( store, None, - link_fetcher=lambda url: ContentSource(kind="x_article", url=url, text="x", ok=True), + link_fetcher=lambda url: ContentSourceSuccess(kind="x_article", url=url, text="x"), ) kinds = {s.kind for s in store["1"].content.sources} assert kinds == {"external_article", "x_article"} diff --git a/tests/test_generate.py b/tests/test_generate.py index 1398c16..49d586d 100644 --- a/tests/test_generate.py +++ b/tests/test_generate.py @@ -3,7 +3,15 @@ from pathlib import Path from xbrain.generate import generate -from xbrain.models import Author, Content, ContentSource, Enrichment, Item, Link +from xbrain.models import ( + Author, + Content, + ContentSourceFailure, + ContentSourceSuccess, + Enrichment, + Item, + Link, +) from xbrain.notes_io import slugify @@ -165,7 +173,6 @@ def test_note_renders_broken_link_evidence(tmp_path): from datetime import datetime, timezone from xbrain.generate import generate - from xbrain.models import Author, Content, ContentSource, Item, Link item = Item( id="1", @@ -179,10 +186,9 @@ def test_note_renders_broken_link_evidence(tmp_path): content=Content( fetched_at=datetime(2026, 5, 17, tzinfo=timezone.utc), sources=[ - ContentSource( + ContentSourceFailure( kind="external_article", url="https://dead.example.com/p", - ok=False, http_status=404, failure_reason="not_found", ) @@ -220,7 +226,7 @@ def _enriched_item(item_id: str = "9") -> Item: content=Content( fetched_at=datetime(2026, 5, 16, tzinfo=timezone.utc), sources=[ - ContentSource( + ContentSourceSuccess( kind="external_article", url="https://example.com/p", title="Article title", @@ -267,3 +273,67 @@ def test_generate_rejects_unsupported_language(tmp_path: Path): with pytest.raises(ValueError, match="Klingon"): generate({"9": _enriched_item()}, tmp_path, output_language="Klingon") + + +# --------------------------------------------------------------- topic_style + + +def _hashtag_item() -> Item: + """Enriched item with two topics — exercises the topic-line rendering.""" + return Item( + id="42", + source="bookmark", + url="https://x.com/a/status/42", + author=Author(handle="alice", name="Alice"), + text="Body 42", + created_at=datetime(2026, 5, 10, tzinfo=timezone.utc), + captured_at=datetime(2026, 5, 16, tzinfo=timezone.utc), + links=[Link(url="https://example.com/p", domain="example.com")], + enriched=Enrichment( + enriched_at=datetime(2026, 5, 16, tzinfo=timezone.utc), + executor="api", + summary="Resumen.", + primary_topic="ai-coding", + topics=["ai-coding", "software-engineering"], + ), + ) + + +def test_generate_renders_topics_as_wikilinks_by_default(tmp_path: Path): + """Default `topic_style` keeps the byte-for-byte current wikilink form.""" + generate({"42": _hashtag_item()}, tmp_path) + note = next((tmp_path / "items").glob("*-42.md")).read_text(encoding="utf-8") + assert "**Topics:** [[ai-coding]] · [[software-engineering]]" in note + assert "#ai-coding" not in note + assert "#software-engineering" not in note + + +def test_generate_renders_topics_as_hashtags_when_requested(tmp_path: Path): + """`topic_style="hashtag"` emits Obsidian tags space-separated on the line.""" + generate({"42": _hashtag_item()}, tmp_path, topic_style="hashtag") + note = next((tmp_path / "items").glob("*-42.md")).read_text(encoding="utf-8") + assert "**Topics:** #ai-coding #software-engineering" in note + assert "[[ai-coding]]" not in note + assert "[[software-engineering]]" not in note + # Orthogonality invariant: frontmatter `tags:` are unchanged across modes. + # Both slugs must still be present in the frontmatter as native Obsidian tags. + assert "tags: [x-knowledge, ai-coding, software-engineering]" in note + + +def test_generate_hashtag_mode_does_not_affect_index_or_topic_page_lists(tmp_path: Path): + """Hashtag mode is in-body-only — the `_index.md` ## Topics section stays wikilinks.""" + generate({"42": _hashtag_item()}, tmp_path, topic_style="hashtag") + index = (tmp_path / "_index.md").read_text(encoding="utf-8") + # The index ranks topics with wikilink-plus-count — independent of topic_style. + assert "[[ai-coding]] (1)" in index + assert "[[software-engineering]] (1)" in index + # And the index never carries the in-body `**Topics:**` line at all. + assert "**Topics:**" not in index + + +def test_generate_rejects_unknown_topic_style(tmp_path: Path): + """Unknown topic_style at the generator boundary surfaces a ValueError.""" + import pytest + + with pytest.raises(ValueError, match="topic_style"): + generate({"42": _hashtag_item()}, tmp_path, topic_style="bogus") diff --git a/tests/test_models.py b/tests/test_models.py index 4f84f60..989555c 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -78,34 +78,160 @@ def test_item_has_optional_bookmark_folder(): assert Item(**base, bookmark_folder="AI papers").bookmark_folder == "AI papers" -def test_content_source_carries_failure_evidence(): - from xbrain.models import ContentSource +def test_content_source_failure_variant_carries_failure_evidence(): + from xbrain.models import ContentSourceFailure - src = ContentSource( + src = ContentSourceFailure( kind="external_article", url="https://example.com/x", - ok=False, http_status=404, failure_reason="not_found", attempts=2, error="HTTP 404", ) + assert src.outcome == "failure" assert src.http_status == 404 assert src.failure_reason == "not_found" assert src.attempts == 2 -def test_content_source_failure_fields_default_cleanly(): - # An old-shape dict (no failure fields) must still validate — the existing - # items.json predates these fields. - from xbrain.models import ContentSource +def test_content_source_loads_legacy_ok_true_shape(): + """A pre-#20 record with `ok=True` must read into the success variant.""" + from xbrain.models import ContentSourceAdapter, ContentSourceSuccess + + src = ContentSourceAdapter.validate_python( + { + "kind": "external_article", + "url": "https://e.com/x", + "ok": True, + "title": "T", + "text": "body", + "http_status": 200, + "failure_reason": None, + "error": None, + "attempts": 1, + } + ) + assert isinstance(src, ContentSourceSuccess) + assert src.outcome == "success" + assert src.text == "body" + assert src.http_status == 200 + + +def test_content_source_loads_legacy_ok_false_shape(): + """A pre-#20 record with `ok=False` must read into the failure variant.""" + from xbrain.models import ContentSourceAdapter, ContentSourceFailure + + src = ContentSourceAdapter.validate_python( + { + "kind": "external_article", + "url": "https://e.com/x", + "ok": False, + "title": None, + "text": None, + "http_status": 404, + "failure_reason": "not_found", + "error": "HTTP 404", + "attempts": 2, + } + ) + assert isinstance(src, ContentSourceFailure) + assert src.outcome == "failure" + assert src.failure_reason == "not_found" + assert src.error == "HTTP 404" + + +def test_content_source_dumps_with_outcome_discriminator(): + """A success variant dumps with `outcome: "success"` and NO `ok` field.""" + from xbrain.models import ContentSourceSuccess + + src = ContentSourceSuccess(kind="external_article", url="u", text="t") + dumped = src.model_dump(mode="json") + assert dumped["outcome"] == "success" + assert "ok" not in dumped + + +def test_content_source_failure_dumps_with_outcome_discriminator(): + """A failure variant dumps with `outcome: "failure"` and NO `ok` field.""" + from xbrain.models import ContentSourceFailure + + src = ContentSourceFailure(kind="external_article", url="u", failure_reason="not_found") + dumped = src.model_dump(mode="json") + assert dumped["outcome"] == "failure" + assert "ok" not in dumped + + +def test_content_source_rejects_record_with_neither_discriminator(): + """Silently inventing `outcome` would mask data corruption — reject loudly.""" + import pytest + from pydantic import ValidationError + + from xbrain.models import ContentSourceAdapter + + with pytest.raises(ValidationError): + ContentSourceAdapter.validate_python({"kind": "external_article", "url": "u"}) + + +def test_content_source_success_requires_text(): + """The whole point of the refactor: a success without text is a type error. + + Pydantic raises a ValidationError at construction; mypy raises an error + statically (see tests/test_type_safety.py). + """ + import pytest + from pydantic import ValidationError + + from xbrain.models import ContentSourceSuccess + + with pytest.raises(ValidationError): + ContentSourceSuccess(kind="external_article", url="u") # missing `text` + + +def test_content_source_failure_requires_failure_reason(): + """Symmetric: a failure without a `failure_reason` is not demonstrable evidence.""" + import pytest + from pydantic import ValidationError - src = ContentSource.model_validate( - {"kind": "external_article", "url": "https://e.com", "ok": True} + from xbrain.models import ContentSourceFailure + + with pytest.raises(ValidationError): + ContentSourceFailure(kind="external_article", url="u") # missing `failure_reason` + + +def test_content_source_legacy_failure_without_reason_buckets_as_transient(): + """A pre-#20 record with `ok=False, failure_reason=None` (e.g. HTTP 429 that + the old code did not categorise) migrates losslessly: the `error` text is + preserved, and `failure_reason` is bucketed under `unknown_error` (a + transient retry-worthy reason added in the #20 review pass) so: + + 1. The wiki still renders a broken-link line. + 2. The next `fetch_pending` run auto-retries the record (issue #19 + retries `timeout`/`dns_error`/`unknown_error`), giving it one chance + to land on a proper category rather than staying invisibly stuck. + + `unknown_error` is preferred over `timeout` for honesty — "timeout" + would mislabel 429s / SSL handshake failures / other distinct error + modes that the legacy code dumped without a reason. + """ + from xbrain.models import ContentSourceAdapter, ContentSourceFailure + + src = ContentSourceAdapter.validate_python( + { + "kind": "external_article", + "url": "https://e.com/throttled", + "ok": False, + "title": None, + "text": None, + "http_status": 429, + "failure_reason": None, + "error": "HTTP 429: Too Many Requests", + "attempts": 1, + } ) - assert src.http_status is None - assert src.failure_reason is None - assert src.attempts == 0 + assert isinstance(src, ContentSourceFailure) + assert src.failure_reason == "unknown_error" + assert src.error == "HTTP 429: Too Many Requests" + assert src.http_status == 429 def test_topic_page_model_round_trips(): diff --git a/tests/test_store.py b/tests/test_store.py index e29fc5c..44a6e07 100644 --- a/tests/test_store.py +++ b/tests/test_store.py @@ -75,3 +75,113 @@ def test_load_topic_pages_returns_empty_when_absent(tmp_path): from xbrain.store import load_topic_pages assert load_topic_pages(tmp_path / "missing.json") == {} + + +def test_load_store_migrates_legacy_content_source_records_in_place(tmp_path: Path): + """Pre-#20 `data/items.json` files use `ok: bool` on each ContentSource. + + `load_store` must read those records into the new tagged-union variants, + and the next `save_store` must persist them with the `outcome` + discriminator (no `ok` field). This is the load-bearing test for the + upgrade story: existing users do not need to run any migration command — + a single read/write cycle is enough. + """ + import json + + from xbrain.models import ContentSourceFailure, ContentSourceSuccess + + legacy_items = { + "1": { + "id": "1", + "source": "bookmark", + "url": "https://x.com/a/status/1", + "author": {"handle": "a", "name": "A"}, + "text": "t", + "created_at": "2026-05-10T00:00:00+00:00", + "captured_at": "2026-05-16T00:00:00+00:00", + "media": [], + "links": [], + "content": { + "fetched_at": "2026-05-17T00:00:00+00:00", + "sources": [ + { + "kind": "external_article", + "url": "https://e.com/good", + "ok": True, + "title": "T", + "text": "body", + "http_status": 200, + "failure_reason": None, + "error": None, + "attempts": 1, + }, + { + "kind": "external_article", + "url": "https://e.com/dead", + "ok": False, + "title": None, + "text": None, + "http_status": 404, + "failure_reason": "not_found", + "error": "HTTP 404", + "attempts": 2, + }, + ], + }, + } + } + path = tmp_path / "items.json" + path.write_text(json.dumps(legacy_items), encoding="utf-8") + + store = load_store(path) + sources = store["1"].content.sources + assert isinstance(sources[0], ContentSourceSuccess) + assert sources[0].text == "body" + assert sources[0].title == "T" + assert isinstance(sources[1], ContentSourceFailure) + assert sources[1].failure_reason == "not_found" + assert sources[1].error == "HTTP 404" + + save_store(store, path) + raw = json.loads(path.read_text(encoding="utf-8")) + persisted = raw["1"]["content"]["sources"] + assert persisted[0]["outcome"] == "success" + assert "ok" not in persisted[0] + assert "failure_reason" not in persisted[0] + assert persisted[1]["outcome"] == "failure" + assert "ok" not in persisted[1] + assert "title" not in persisted[1] + + # The post-migration file is itself idempotent under reload. + assert load_store(path) == store + + +def test_load_store_rejects_content_source_without_any_discriminator(tmp_path: Path): + """A record with neither `outcome` nor `ok` must fail loudly, not default.""" + import json + + import pytest + + legacy_items = { + "1": { + "id": "1", + "source": "bookmark", + "url": "https://x.com/a/status/1", + "author": {"handle": "a", "name": "A"}, + "text": "t", + "created_at": "2026-05-10T00:00:00+00:00", + "captured_at": "2026-05-16T00:00:00+00:00", + "media": [], + "links": [], + "content": { + "fetched_at": "2026-05-17T00:00:00+00:00", + "sources": [ + {"kind": "external_article", "url": "https://e.com/x"}, + ], + }, + } + } + path = tmp_path / "items.json" + path.write_text(json.dumps(legacy_items), encoding="utf-8") + with pytest.raises(Exception): # noqa: BLE001 - pydantic ValidationError wraps the ValueError + load_store(path) diff --git a/tests/test_type_safety.py b/tests/test_type_safety.py new file mode 100644 index 0000000..eae468e --- /dev/null +++ b/tests/test_type_safety.py @@ -0,0 +1,89 @@ +"""Static-type-safety regression test for the #20 tagged-union refactor. + +The whole point of the #20 refactor is *illegal states unrepresentable at +the type level*. This test shells out to ``mypy`` against the +``tests/type_probes/illegal_states.py`` probe and asserts the four +intended errors are reported. + +If a future edit weakens the type contract (e.g. a Union[..] sneaks in, +or a required field becomes Optional), one of these checks stops failing +under mypy and this test goes red — that is exactly the regression we want +to catch, because it would silently allow the class of bugs the refactor +is meant to prevent. +""" + +from __future__ import annotations + +import os +import subprocess +import sys +from pathlib import Path + + +PROBE = Path(__file__).parent / "type_probes" / "illegal_states.py" +REPO_ROOT = Path(__file__).resolve().parent.parent +SRC_DIR = REPO_ROOT / "src" + + +def test_illegal_states_probe_is_rejected_by_mypy(): + """Every annotation in the probe must produce one mypy error. + + Four illegal states, four errors expected: + 1. `ContentSourceSuccess(kind=..., url=...)` missing `text`. + 2. `ContentSourceFailure(kind=..., url=...)` missing `failure_reason`. + 3. Reading `.failure_reason` off a `ContentSourceSuccess`. + 4. Reading `.text` off a `ContentSourceFailure`. + """ + import pytest + + # Use the SAME Python interpreter that runs pytest — guarantees the + # mypy executable matches the project's venv and the pydantic plugin + # resolves. `shutil.which("mypy")` is unreliable: under `uv run` the + # venv's bin is on PATH but under bare `pytest` it might not be. + try: + import mypy # noqa: F401 - import probe only + except ImportError: + pytest.skip("mypy not installed — install dev deps") + + env = { + **os.environ, + "MYPYPATH": str(SRC_DIR), + } + result = subprocess.run( + [ + sys.executable, + "-m", + "mypy", + "--no-incremental", + "--show-error-codes", + "--config-file", + str(REPO_ROOT / "pyproject.toml"), + str(PROBE), + ], + capture_output=True, + text=True, + check=False, + cwd=str(REPO_ROOT), + env=env, + ) + + error_lines = [line for line in result.stdout.splitlines() if "error:" in line] + expected_errors = { + # call-arg: missing required field on construction + 'Missing named argument "text" for "ContentSourceSuccess"', + 'Missing named argument "failure_reason" for "ContentSourceFailure"', + # attr-defined: the field does not exist on the narrowed variant + '"ContentSourceSuccess" has no attribute "failure_reason"', + '"ContentSourceFailure" has no attribute "text"', + } + matched = {expected for expected in expected_errors if any(expected in e for e in error_lines)} + + assert matched == expected_errors, ( + f"mypy did not report the expected errors.\n" + f"matched: {matched}\n" + f"missing: {expected_errors - matched}\n" + f"actual stdout:\n{result.stdout}\n" + f"actual stderr:\n{result.stderr}" + ) + # And mypy must exit non-zero overall. + assert result.returncode != 0, "mypy returned 0 — illegal states slipped through" diff --git a/tests/type_probes/__init__.py b/tests/type_probes/__init__.py new file mode 100644 index 0000000..97612b8 --- /dev/null +++ b/tests/type_probes/__init__.py @@ -0,0 +1,6 @@ +"""Mypy probes — fixtures that exist only to be checked by mypy. + +These files are NOT picked up by pytest's normal discovery; they are +imported / type-checked by tests in `tests/test_type_safety.py` which +shells out to mypy and asserts the expected errors are reported. +""" diff --git a/tests/type_probes/illegal_states.py b/tests/type_probes/illegal_states.py new file mode 100644 index 0000000..37873d5 --- /dev/null +++ b/tests/type_probes/illegal_states.py @@ -0,0 +1,31 @@ +"""Mypy probe: every annotation below MUST be a static type error. + +Verified at runtime by ``tests/test_type_safety.py``, which shells out to +``mypy`` on this file and asserts the exact errors fire. The point of the +#20 refactor is *illegal states unrepresentable*; this probe is the only +test that proves the property keeps holding under future edits. + +DO NOT add ``# type: ignore`` to these — that would defeat the test. +""" + +from xbrain.models import ContentSourceFailure, ContentSourceSuccess + + +def missing_text_on_success() -> ContentSourceSuccess: + """`ContentSourceSuccess` requires `text` — mypy must flag the omission.""" + return ContentSourceSuccess(kind="external_article", url="x") + + +def missing_failure_reason_on_failure() -> ContentSourceFailure: + """`ContentSourceFailure` requires `failure_reason` — mypy must flag the omission.""" + return ContentSourceFailure(kind="external_article", url="x") + + +def cannot_read_failure_reason_off_success(src: ContentSourceSuccess) -> str: + """A `failure_reason` field does not exist on the success variant.""" + return src.failure_reason + + +def cannot_read_text_off_failure(src: ContentSourceFailure) -> str: + """A `text` field does not exist on the failure variant.""" + return src.text