diff --git a/src/xbrain/media.py b/src/xbrain/media.py index 1c8af1c..67aaa57 100644 --- a/src/xbrain/media.py +++ b/src/xbrain/media.py @@ -21,7 +21,7 @@ import logging import sys import time -from collections.abc import Callable +from collections.abc import Callable, Iterator from dataclasses import dataclass, field from datetime import datetime, timezone from pathlib import Path @@ -172,42 +172,29 @@ def download_all( _sweep_part_orphans(media_root) started = time.monotonic() report = MediaReport() - target_ids: set[str] | None = set(items_filter) if items_filter else None - remaining: int | None = limit - - for item_id, item in items.items(): - if target_ids is not None and item_id not in target_ids: - continue - if not item.media: - continue - report.items_processed += 1 - for index, entry in enumerate(item.media): - if remaining is not None and remaining <= 0: - report.elapsed_seconds = time.monotonic() - started - return report - if not _is_eligible(entry, force=force): - if isinstance(entry, MediaPhotoDownloaded): - report.photos_skipped_already_downloaded += 1 - continue - # `_is_eligible` already excluded `MediaVideoPending`; narrow for mypy. - assert isinstance(entry, (MediaPhotoPending, MediaPhotoFailed, MediaPhotoDownloaded)) - report.photos_attempted += 1 - if remaining is not None: - remaining -= 1 - result = _download_one( - entry, - item_id=item_id, - index=index, - media_root=media_root, - session=session, - timeout_seconds=timeout_seconds, - ) - item.media[index] = result - _record_outcome(report, item_id=item_id, entry=result) - if on_progress is not None: - on_progress() - if throttle_seconds > 0: - sleep(throttle_seconds) + candidate_items = _filter_by_ids(items, items_filter) + + for item_id, item, index, entry in _iter_eligible_attempts( + candidate_items, + limit=limit, + force=force, + report=report, + ): + report.photos_attempted += 1 + result = _download_one( + entry, + item_id=item_id, + index=index, + media_root=media_root, + session=session, + timeout_seconds=timeout_seconds, + ) + item.media[index] = result + _record_outcome(report, item_id=item_id, entry=result) + if on_progress is not None: + on_progress() + if throttle_seconds > 0: + sleep(throttle_seconds) report.elapsed_seconds = time.monotonic() - started if report.photos_attempted > 0 and report.photos_downloaded == 0: @@ -237,6 +224,54 @@ def _is_eligible(entry: MediaEntry, *, force: bool) -> bool: assert_never(entry) +def _filter_by_ids(items: dict[str, Item], items_filter: list[str] | None) -> dict[str, Item]: + """Restrict the store to the IDs in `items_filter`, or return it whole. + + Pulled out of `download_all` so the orchestrator does not interleave + the filter check inside the per-photo loop. An empty / missing + `items_filter` is a no-op (returns the same dict). + """ + if not items_filter: + return items + wanted = set(items_filter) + return {item_id: item for item_id, item in items.items() if item_id in wanted} + + +def _iter_eligible_attempts( + items: dict[str, Item], + *, + limit: int | None, + force: bool, + report: MediaReport, +) -> Iterator[tuple[str, Item, int, MediaPhotoPending | MediaPhotoFailed | MediaPhotoDownloaded]]: + """Yield each (item_id, item, index, entry) pair eligible for download. + + Encapsulates the empty-media skip + per-entry eligibility cascade + + global limit countdown that `download_all` would otherwise interleave + with the download orchestration. Side effects on `report`: bumps + `items_processed` once per item that has media, and + `photos_skipped_already_downloaded` once per Downloaded entry passed + over (without `--force`). Stops yielding once `limit` is exhausted. + """ + remaining = limit + for item_id, item in items.items(): + if not item.media: + continue + report.items_processed += 1 + for index, entry in enumerate(item.media): + if remaining is not None and remaining <= 0: + return + if not _is_eligible(entry, force=force): + if isinstance(entry, MediaPhotoDownloaded): + report.photos_skipped_already_downloaded += 1 + continue + # `_is_eligible` already excluded `MediaVideoPending`; narrow for mypy. + assert isinstance(entry, (MediaPhotoPending, MediaPhotoFailed, MediaPhotoDownloaded)) + if remaining is not None: + remaining -= 1 + yield item_id, item, index, entry + + def _record_outcome( report: MediaReport, *, @@ -346,19 +381,8 @@ def _download_one( bytes_size=len(response.content), downloaded_at=datetime.now(timezone.utc), ) - if 400 <= status < 500: - cascade_reason = _worse(cascade_reason, "http_4xx") - cascade_status = status - last_error = RuntimeError(f"HTTP {status} for {candidate_url}") - continue - if 500 <= status < 600: - cascade_reason = _worse(cascade_reason, "http_5xx") - cascade_status = status - last_error = RuntimeError(f"HTTP {status} for {candidate_url}") - continue - # Non-success, non-error status code (e.g. a 3xx that requests didn't - # follow). Bucket as unknown_error so the next run retries it. - cascade_reason = _worse(cascade_reason, "unknown_error") + # Non-2xx — classify the status, bucket the failure, try next size. + cascade_reason = _worse(cascade_reason, _classify_status(status)) cascade_status = status last_error = RuntimeError(f"HTTP {status} for {candidate_url}") @@ -371,6 +395,21 @@ def _download_one( ) +def _classify_status(status: int) -> MediaFailureReason: + """Map a non-2xx HTTP status to its failure-reason bucket. + + 4xx is permanent for this URL (dead asset, bad cascade size); 5xx is + transient (CDN hiccup). Anything else — a 3xx that `requests` did not + follow, or a non-standard code — is bucketed as `unknown_error` so the + next run retries it. + """ + if 400 <= status < 500: + return "http_4xx" + if 500 <= status < 600: + return "http_5xx" + return "unknown_error" + + def _worse( current: MediaFailureReason | None, candidate: MediaFailureReason,