diff --git a/pyproject.toml b/pyproject.toml index 6e3d41a..57735f3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -97,10 +97,14 @@ DEP002 = [ "pytest-cov", "ruff", ] -# DEP001: scripts/import_chrome_session.py imports browser-cookie3 behind a -# guarded `try/except ImportError` — it is a deliberately optional helper -# dependency, installed on demand, not a declared runtime requirement. +# DEP001 + DEP003: scripts/import_chrome_session.py + import_safari_session.py +# import browser-cookie3 behind a guarded `try/except ImportError` — it is a +# deliberately optional helper dependency, installed on demand, not a declared +# runtime requirement. Deptry sees it as both undeclared (DEP001) and, once +# pulled in via another package's extras on some machines, as transitive +# (DEP003). Both are intentional for this opt-in helper. DEP001 = ["browser_cookie3"] +DEP003 = ["browser_cookie3"] [tool.coverage.run] source = ["src/xbrain"] diff --git a/src/xbrain/archive.py b/src/xbrain/archive.py index 5e27f61..7e5091c 100644 --- a/src/xbrain/archive.py +++ b/src/xbrain/archive.py @@ -50,17 +50,13 @@ def _parse_js_array(raw: str, tweets_file: str) -> list: raise ValueError(f"{tweets_file}: malformed JSON in archive tweets file: {exc}") from exc -def _archive_tweet_to_item(entry: dict[str, Any], author: Author) -> Item | None: - tweet = entry.get("tweet") - if not isinstance(tweet, dict): - logger.warning("archive entry missing 'tweet' object, skipping") - return None - rest_id = tweet.get("id_str") - if not rest_id: - logger.warning("archive tweet missing 'id_str', skipping") - return None - rest_id = str(rest_id) - links = [ +def _extract_archive_links(tweet: dict[str, Any]) -> list[Link]: + """Parse `entities.urls` from an archive tweet into Link objects. + + Each URL entity in the archive carries an `expanded_url`; we derive the + domain from it and drop any entry that lacks an expanded URL. + """ + return [ Link( url=url_entity["expanded_url"], domain=urlparse(url_entity["expanded_url"]).netloc, @@ -68,10 +64,19 @@ def _archive_tweet_to_item(entry: dict[str, Any], author: Author) -> Item | None for url_entity in tweet.get("entities", {}).get("urls", []) if url_entity.get("expanded_url") ] + + +def _extract_archive_media(tweet: dict[str, Any]) -> list[MediaEntry]: + """Parse media from `extended_entities.media`, falling back to `entities.media`. + + Normalises `type` to `"video"` (videos and animated GIFs) or `"photo"`, and + picks the canonical URL (`media_url_https`, falling back to `expanded_url`). + Entries missing both URLs are dropped. + """ media_entries = tweet.get("extended_entities", {}).get("media") or tweet.get( "entities", {} ).get("media", []) - media: list[MediaEntry] = [ + return [ Media( type="video" if media_entity.get("type") in ("video", "animated_gif") else "photo", url=media_entity.get("media_url_https") or media_entity["expanded_url"], @@ -79,6 +84,19 @@ def _archive_tweet_to_item(entry: dict[str, Any], author: Author) -> Item | None for media_entity in media_entries if media_entity.get("media_url_https") or media_entity.get("expanded_url") ] + + +def _archive_tweet_to_item(entry: dict[str, Any], author: Author) -> Item | None: + """Map one archive entry into an Item, returning None for malformed rows.""" + tweet = entry.get("tweet") + if not isinstance(tweet, dict): + logger.warning("archive entry missing 'tweet' object, skipping") + return None + rest_id = tweet.get("id_str") + if not rest_id: + logger.warning("archive tweet missing 'id_str', skipping") + return None + rest_id = str(rest_id) return Item( id=rest_id, source="own_tweet", @@ -87,7 +105,7 @@ def _archive_tweet_to_item(entry: dict[str, Any], author: Author) -> Item | None text=tweet.get("full_text", ""), created_at=_parse_x_date(tweet.get("created_at")), captured_at=datetime.now(timezone.utc), - media=media, - links=links, + media=_extract_archive_media(tweet), + links=_extract_archive_links(tweet), quoted_id=None, ) diff --git a/src/xbrain/generate.py b/src/xbrain/generate.py index 83d6574..1e242ec 100644 --- a/src/xbrain/generate.py +++ b/src/xbrain/generate.py @@ -353,16 +353,27 @@ def _frontmatter(item: Item) -> str: return "\n".join(lines) -def _render_index(items: list[Item], strings: Strings) -> str: - bookmarks = sum(1 for i in items if i.source == "bookmark") - own = sum(1 for i in items if i.source == "own_tweet") - noted = sum(1 for i in items if _has_note(i)) - enriched = sum(1 for i in items if i.enriched) +def _count_topic_frequency(items: list[Item]) -> dict[str, int]: + """Tally how often each topic appears across the enriched items. + + Items without enrichment contribute nothing. The result maps topic slug + to the number of enriched items that include it. + """ topic_freq: dict[str, int] = {} for item in items: if item.enriched: for topic in item.enriched.topics: topic_freq[topic] = topic_freq.get(topic, 0) + 1 + return topic_freq + + +def _render_index(items: list[Item], strings: Strings) -> str: + """Render the top-level index note: corpus stats and the topic list.""" + bookmarks = sum(1 for i in items if i.source == "bookmark") + own = sum(1 for i in items if i.source == "own_tweet") + noted = sum(1 for i in items if _has_note(i)) + enriched = sum(1 for i in items if i.enriched) + topic_freq = _count_topic_frequency(items) lines = [ "# XBrain", "", diff --git a/src/xbrain/validate.py b/src/xbrain/validate.py index 44e63c3..a5afe05 100644 --- a/src/xbrain/validate.py +++ b/src/xbrain/validate.py @@ -14,46 +14,75 @@ _ALLOWED_KEYS = {"summary", "primary_topic", "topics"} -def validate_judgment(judgment: dict, vocab_slugs: Iterable[str]) -> list[str]: - """Return a list of human-readable errors; an empty list means valid.""" - rules = load_guardrails().get("enrichment", {}) - vocab = set(vocab_slugs) - errors: list[str] = [] - +def _validate_judgment_keys(judgment: dict) -> list[str]: + """Reject any key outside the allowed enrichment schema.""" extra = set(judgment) - _ALLOWED_KEYS if extra: - errors.append(f"unexpected keys (LLM must emit only judgment): {sorted(extra)}") + return [f"unexpected keys (LLM must emit only judgment): {sorted(extra)}"] + return [] + +def _validate_summary(judgment: dict, rules: dict) -> list[str]: + """Require a non-empty summary when guardrails demand it.""" summary = judgment.get("summary") if rules.get("summary_required", True) and not (summary and str(summary).strip()): - errors.append("summary is missing or empty") + return ["summary is missing or empty"] + return [] + +def _validate_topics_list(judgment: dict, rules: dict, vocab: set[str]) -> list[str]: + """Validate the topics list itself: count bounds, duplicates, vocabulary membership. + + Returns an empty list when `topics` is not a list — the caller (`validate_judgment`) + is responsible for emitting the type error and aborting further topic-related checks. + """ topics = judgment.get("topics") if not isinstance(topics, list): - errors.append("topics must be a list") - return errors - + return [] + errors: list[str] = [] lo, hi = rules.get("topics_min", 1), rules.get("topics_max", 4) if not (lo <= len(topics) <= hi): errors.append(f"topics has {len(topics)} entries, must be {lo}-{hi}") - if len(set(topics)) != len(topics): errors.append("topics has duplicate entries") - if rules.get("topics_must_be_in_vocab", True): for slug in topics: if slug not in vocab: errors.append(f"topic '{slug}' is not in the vocabulary") + return errors + +def _validate_primary_topic( + judgment: dict, topics: list, rules: dict, vocab: set[str] +) -> list[str]: + """Validate `primary_topic`: presence, vocabulary membership, and inclusion in topics.""" primary = judgment.get("primary_topic") if not primary: - errors.append("primary_topic is missing") - else: - if rules.get("topics_must_be_in_vocab", True) and primary not in vocab: - errors.append(f"primary_topic '{primary}' is not in the vocabulary") - if rules.get("primary_topic_must_be_in_topics", True) and primary not in topics: - errors.append(f"primary_topic '{primary}' is not inside topics") + return ["primary_topic is missing"] + errors: list[str] = [] + if rules.get("topics_must_be_in_vocab", True) and primary not in vocab: + errors.append(f"primary_topic '{primary}' is not in the vocabulary") + if rules.get("primary_topic_must_be_in_topics", True) and primary not in topics: + errors.append(f"primary_topic '{primary}' is not inside topics") + return errors + + +def validate_judgment(judgment: dict, vocab_slugs: Iterable[str]) -> list[str]: + """Return a list of human-readable errors; an empty list means valid.""" + rules = load_guardrails().get("enrichment", {}) + vocab = set(vocab_slugs) + + errors: list[str] = [] + errors += _validate_judgment_keys(judgment) + errors += _validate_summary(judgment, rules) + + topics = judgment.get("topics") + if not isinstance(topics, list): + errors.append("topics must be a list") + return errors + errors += _validate_topics_list(judgment, rules, vocab) + errors += _validate_primary_topic(judgment, topics, rules, vocab) return errors