Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,14 @@ DEP002 = [
"pytest-cov",
"ruff",
]
# DEP001: scripts/import_chrome_session.py imports browser-cookie3 behind a
# guarded `try/except ImportError` — it is a deliberately optional helper
# dependency, installed on demand, not a declared runtime requirement.
# DEP001 + DEP003: scripts/import_chrome_session.py + import_safari_session.py
# import browser-cookie3 behind a guarded `try/except ImportError` — it is a
# deliberately optional helper dependency, installed on demand, not a declared
# runtime requirement. Deptry sees it as both undeclared (DEP001) and, once
# pulled in via another package's extras on some machines, as transitive
# (DEP003). Both are intentional for this opt-in helper.
DEP001 = ["browser_cookie3"]
DEP003 = ["browser_cookie3"]

[tool.coverage.run]
source = ["src/xbrain"]
Expand Down
46 changes: 32 additions & 14 deletions src/xbrain/archive.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,35 +50,53 @@ def _parse_js_array(raw: str, tweets_file: str) -> list:
raise ValueError(f"{tweets_file}: malformed JSON in archive tweets file: {exc}") from exc


def _archive_tweet_to_item(entry: dict[str, Any], author: Author) -> Item | None:
tweet = entry.get("tweet")
if not isinstance(tweet, dict):
logger.warning("archive entry missing 'tweet' object, skipping")
return None
rest_id = tweet.get("id_str")
if not rest_id:
logger.warning("archive tweet missing 'id_str', skipping")
return None
rest_id = str(rest_id)
links = [
def _extract_archive_links(tweet: dict[str, Any]) -> list[Link]:
"""Parse `entities.urls` from an archive tweet into Link objects.

Each URL entity in the archive carries an `expanded_url`; we derive the
domain from it and drop any entry that lacks an expanded URL.
"""
return [
Link(
url=url_entity["expanded_url"],
domain=urlparse(url_entity["expanded_url"]).netloc,
)
for url_entity in tweet.get("entities", {}).get("urls", [])
if url_entity.get("expanded_url")
]


def _extract_archive_media(tweet: dict[str, Any]) -> list[MediaEntry]:
"""Parse media from `extended_entities.media`, falling back to `entities.media`.

Normalises `type` to `"video"` (videos and animated GIFs) or `"photo"`, and
picks the canonical URL (`media_url_https`, falling back to `expanded_url`).
Entries missing both URLs are dropped.
"""
media_entries = tweet.get("extended_entities", {}).get("media") or tweet.get(
"entities", {}
).get("media", [])
media: list[MediaEntry] = [
return [
Media(
type="video" if media_entity.get("type") in ("video", "animated_gif") else "photo",
url=media_entity.get("media_url_https") or media_entity["expanded_url"],
)
for media_entity in media_entries
if media_entity.get("media_url_https") or media_entity.get("expanded_url")
]


def _archive_tweet_to_item(entry: dict[str, Any], author: Author) -> Item | None:
"""Map one archive entry into an Item, returning None for malformed rows."""
tweet = entry.get("tweet")
if not isinstance(tweet, dict):
logger.warning("archive entry missing 'tweet' object, skipping")
return None
rest_id = tweet.get("id_str")
if not rest_id:
logger.warning("archive tweet missing 'id_str', skipping")
return None
rest_id = str(rest_id)
return Item(
id=rest_id,
source="own_tweet",
Expand All @@ -87,7 +105,7 @@ def _archive_tweet_to_item(entry: dict[str, Any], author: Author) -> Item | None
text=tweet.get("full_text", ""),
created_at=_parse_x_date(tweet.get("created_at")),
captured_at=datetime.now(timezone.utc),
media=media,
links=links,
media=_extract_archive_media(tweet),
links=_extract_archive_links(tweet),
quoted_id=None,
)
21 changes: 16 additions & 5 deletions src/xbrain/generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,16 +353,27 @@ def _frontmatter(item: Item) -> str:
return "\n".join(lines)


def _render_index(items: list[Item], strings: Strings) -> str:
bookmarks = sum(1 for i in items if i.source == "bookmark")
own = sum(1 for i in items if i.source == "own_tweet")
noted = sum(1 for i in items if _has_note(i))
enriched = sum(1 for i in items if i.enriched)
def _count_topic_frequency(items: list[Item]) -> dict[str, int]:
"""Tally how often each topic appears across the enriched items.

Items without enrichment contribute nothing. The result maps topic slug
to the number of enriched items that include it.
"""
topic_freq: dict[str, int] = {}
for item in items:
if item.enriched:
for topic in item.enriched.topics:
topic_freq[topic] = topic_freq.get(topic, 0) + 1
return topic_freq


def _render_index(items: list[Item], strings: Strings) -> str:
"""Render the top-level index note: corpus stats and the topic list."""
bookmarks = sum(1 for i in items if i.source == "bookmark")
own = sum(1 for i in items if i.source == "own_tweet")
noted = sum(1 for i in items if _has_note(i))
enriched = sum(1 for i in items if i.enriched)
topic_freq = _count_topic_frequency(items)
lines = [
"# XBrain",
"",
Expand Down
67 changes: 48 additions & 19 deletions src/xbrain/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,46 +14,75 @@
_ALLOWED_KEYS = {"summary", "primary_topic", "topics"}


def validate_judgment(judgment: dict, vocab_slugs: Iterable[str]) -> list[str]:
"""Return a list of human-readable errors; an empty list means valid."""
rules = load_guardrails().get("enrichment", {})
vocab = set(vocab_slugs)
errors: list[str] = []

def _validate_judgment_keys(judgment: dict) -> list[str]:
    """Reject any key outside the allowed enrichment schema.

    Args:
        judgment: The judgment mapping to check.

    Returns:
        A single-element list naming the unexpected keys (sorted), or an empty
        list when every key is in `_ALLOWED_KEYS`.
    """
    extra = set(judgment) - _ALLOWED_KEYS
    if not extra:
        return []
    return [f"unexpected keys (LLM must emit only judgment): {sorted(extra)}"]


def _validate_summary(judgment: dict, rules: dict) -> list[str]:
"""Require a non-empty summary when guardrails demand it."""
summary = judgment.get("summary")
if rules.get("summary_required", True) and not (summary and str(summary).strip()):
errors.append("summary is missing or empty")
return ["summary is missing or empty"]
return []


def _validate_topics_list(judgment: dict, rules: dict, vocab: set[str]) -> list[str]:
"""Validate the topics list itself: count bounds, duplicates, vocabulary membership.

Returns an empty list when `topics` is not a list — the caller (`validate_judgment`)
is responsible for emitting the type error and aborting further topic-related checks.
"""
topics = judgment.get("topics")
if not isinstance(topics, list):
errors.append("topics must be a list")
return errors

return []
errors: list[str] = []
lo, hi = rules.get("topics_min", 1), rules.get("topics_max", 4)
if not (lo <= len(topics) <= hi):
errors.append(f"topics has {len(topics)} entries, must be {lo}-{hi}")

if len(set(topics)) != len(topics):
errors.append("topics has duplicate entries")

if rules.get("topics_must_be_in_vocab", True):
for slug in topics:
if slug not in vocab:
errors.append(f"topic '{slug}' is not in the vocabulary")
return errors


def _validate_primary_topic(
judgment: dict, topics: list, rules: dict, vocab: set[str]
) -> list[str]:
"""Validate `primary_topic`: presence, vocabulary membership, and inclusion in topics."""
primary = judgment.get("primary_topic")
if not primary:
errors.append("primary_topic is missing")
else:
if rules.get("topics_must_be_in_vocab", True) and primary not in vocab:
errors.append(f"primary_topic '{primary}' is not in the vocabulary")
if rules.get("primary_topic_must_be_in_topics", True) and primary not in topics:
errors.append(f"primary_topic '{primary}' is not inside topics")
return ["primary_topic is missing"]
errors: list[str] = []
if rules.get("topics_must_be_in_vocab", True) and primary not in vocab:
errors.append(f"primary_topic '{primary}' is not in the vocabulary")
if rules.get("primary_topic_must_be_in_topics", True) and primary not in topics:
errors.append(f"primary_topic '{primary}' is not inside topics")
return errors


def validate_judgment(judgment: dict, vocab_slugs: Iterable[str]) -> list[str]:
    """Return a list of human-readable errors; an empty list means valid.

    Runs the key, summary, topics-list and primary-topic checks in that order
    against the `enrichment` section of the loaded guardrails. When `topics`
    is not a list, the type error is recorded and the topic-dependent checks
    are skipped.
    """
    rules = load_guardrails().get("enrichment", {})
    vocab = set(vocab_slugs)

    problems: list[str] = [
        *_validate_judgment_keys(judgment),
        *_validate_summary(judgment, rules),
    ]

    topics = judgment.get("topics")
    if not isinstance(topics, list):
        # Without a real list the remaining checks have nothing to inspect.
        problems.append("topics must be a list")
        return problems

    problems.extend(_validate_topics_list(judgment, rules, vocab))
    problems.extend(_validate_primary_topic(judgment, topics, rules, vocab))
    return problems


Expand Down
Loading