Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
139 changes: 89 additions & 50 deletions src/xbrain/media.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import logging
import sys
import time
from collections.abc import Callable
from collections.abc import Callable, Iterator
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
Expand Down Expand Up @@ -172,42 +172,29 @@ def download_all(
_sweep_part_orphans(media_root)
started = time.monotonic()
report = MediaReport()
target_ids: set[str] | None = set(items_filter) if items_filter else None
remaining: int | None = limit

for item_id, item in items.items():
if target_ids is not None and item_id not in target_ids:
continue
if not item.media:
continue
report.items_processed += 1
for index, entry in enumerate(item.media):
if remaining is not None and remaining <= 0:
report.elapsed_seconds = time.monotonic() - started
return report
if not _is_eligible(entry, force=force):
if isinstance(entry, MediaPhotoDownloaded):
report.photos_skipped_already_downloaded += 1
continue
# `_is_eligible` already excluded `MediaVideoPending`; narrow for mypy.
assert isinstance(entry, (MediaPhotoPending, MediaPhotoFailed, MediaPhotoDownloaded))
report.photos_attempted += 1
if remaining is not None:
remaining -= 1
result = _download_one(
entry,
item_id=item_id,
index=index,
media_root=media_root,
session=session,
timeout_seconds=timeout_seconds,
)
item.media[index] = result
_record_outcome(report, item_id=item_id, entry=result)
if on_progress is not None:
on_progress()
if throttle_seconds > 0:
sleep(throttle_seconds)
candidate_items = _filter_by_ids(items, items_filter)

for item_id, item, index, entry in _iter_eligible_attempts(
candidate_items,
limit=limit,
force=force,
report=report,
):
report.photos_attempted += 1
result = _download_one(
entry,
item_id=item_id,
index=index,
media_root=media_root,
session=session,
timeout_seconds=timeout_seconds,
)
item.media[index] = result
_record_outcome(report, item_id=item_id, entry=result)
if on_progress is not None:
on_progress()
if throttle_seconds > 0:
sleep(throttle_seconds)

report.elapsed_seconds = time.monotonic() - started
if report.photos_attempted > 0 and report.photos_downloaded == 0:
Expand Down Expand Up @@ -237,6 +224,54 @@ def _is_eligible(entry: MediaEntry, *, force: bool) -> bool:
assert_never(entry)


def _filter_by_ids(items: dict[str, Item], items_filter: list[str] | None) -> dict[str, Item]:
    """Narrow `items` down to the entries whose ID appears in `items_filter`.

    Kept separate from `download_all` so the per-photo loop there never has
    to re-check the ID filter.

    A `None` or empty `items_filter` means "no filter": the caller's dict is
    handed back unchanged (the very same object, not a copy). Otherwise a new
    dict is built that preserves the iteration order of `items`; IDs listed
    in `items_filter` but absent from `items` are silently ignored.
    """
    if items_filter:
        # Build the lookup set once so each membership test is O(1).
        selected_ids = frozenset(items_filter)
        return {key: value for key, value in items.items() if key in selected_ids}
    return items


def _iter_eligible_attempts(
    items: dict[str, Item],
    *,
    limit: int | None,
    force: bool,
    report: MediaReport,
) -> Iterator[tuple[str, Item, int, MediaPhotoPending | MediaPhotoFailed | MediaPhotoDownloaded]]:
    """Generate the `(item_id, item, index, entry)` tuples worth a download attempt.

    Bundles three concerns so `download_all` only has to orchestrate the
    download itself: skipping items without media, the per-entry
    eligibility check (`_is_eligible`), and the global `limit` countdown.

    Side effects on `report`:
      * `items_processed` is incremented once for every item with non-empty
        media that the iteration reaches. This happens before the limit is
        checked, so an item reached after the budget ran out is still counted.
      * `photos_skipped_already_downloaded` is incremented once for every
        ineligible entry that is a `MediaPhotoDownloaded`.

    `limit=None` means unlimited. Otherwise the generator finishes as soon
    as `limit` entries have been yielded (immediately if `limit <= 0`).
    The budget is consumed right before each yield.
    """
    budget = limit
    for item_id, item in items.items():
        if not item.media:
            continue
        report.items_processed += 1
        for position, candidate in enumerate(item.media):
            if budget is not None and budget <= 0:
                # Budget spent: end the whole generator, not just this item.
                return
            if _is_eligible(candidate, force=force):
                # `_is_eligible` already excluded `MediaVideoPending`; narrow for mypy.
                assert isinstance(
                    candidate, (MediaPhotoPending, MediaPhotoFailed, MediaPhotoDownloaded)
                )
                if budget is not None:
                    budget -= 1
                yield item_id, item, position, candidate
            elif isinstance(candidate, MediaPhotoDownloaded):
                # Only already-downloaded photos count as "skipped".
                report.photos_skipped_already_downloaded += 1


def _record_outcome(
report: MediaReport,
*,
Expand Down Expand Up @@ -346,19 +381,8 @@ def _download_one(
bytes_size=len(response.content),
downloaded_at=datetime.now(timezone.utc),
)
if 400 <= status < 500:
cascade_reason = _worse(cascade_reason, "http_4xx")
cascade_status = status
last_error = RuntimeError(f"HTTP {status} for {candidate_url}")
continue
if 500 <= status < 600:
cascade_reason = _worse(cascade_reason, "http_5xx")
cascade_status = status
last_error = RuntimeError(f"HTTP {status} for {candidate_url}")
continue
# Non-success, non-error status code (e.g. a 3xx that requests didn't
# follow). Bucket as unknown_error so the next run retries it.
cascade_reason = _worse(cascade_reason, "unknown_error")
# Non-2xx — classify the status, bucket the failure, try next size.
cascade_reason = _worse(cascade_reason, _classify_status(status))
cascade_status = status
last_error = RuntimeError(f"HTTP {status} for {candidate_url}")

Expand All @@ -371,6 +395,21 @@ def _download_one(
)


def _classify_status(status: int) -> MediaFailureReason:
    """Translate a non-2xx HTTP status code into a failure-reason bucket.

    * 400-499 -> `"http_4xx"`: permanent for this URL (dead asset, bad
      cascade size).
    * 500-599 -> `"http_5xx"`: transient (CDN hiccup).
    * anything else -> `"unknown_error"`: e.g. a 3xx that `requests` did not
      follow, or a non-standard code, bucketed this way so the next run
      retries it.
    """
    # Floor-dividing by 100 gives the status class (4 for 4xx, 5 for 5xx).
    buckets: dict[int, MediaFailureReason] = {4: "http_4xx", 5: "http_5xx"}
    return buckets.get(status // 100, "unknown_error")


def _worse(
current: MediaFailureReason | None,
candidate: MediaFailureReason,
Expand Down
Loading