diff --git a/common/lib/archive_metadata.py b/common/lib/archive_metadata.py new file mode 100644 index 000000000..1832a8eab --- /dev/null +++ b/common/lib/archive_metadata.py @@ -0,0 +1,570 @@ +""" +Read and write metadata files packaged inside dataset archives. + +4CAT processors that produce ZIP archives can include a small JSON file +describing the other files in the archive — what they are and where they came +from. This module provides `ArchiveMetadataFile`, a base class handling the +shared plumbing (locating the file inside a zip or results folder, schema +versioning, atomic writes), and `MediaArchiveMetadata`, the schema for media +download archives. + +`MediaArchiveMetadata` record per media file the source post IDs, +originating URL (if any), and any platform-specific data. Download attempts +that produced no file are recorded separately as failures. + +TODO: +There are other metadata files (e.g. token sets, topic models) with their own +metadata files with quite different schemas. They seem like they need their +own classes as well (or would be super redundant in `ArchiveMetadataFile` +format). +""" +import json +import os +import zipfile +from pathlib import Path +from typing import Iterator, Optional + +from common.lib.exceptions import MetadataException +from common.lib.helpers import get_software_commit + + +class ArchiveMetadataFile: + """ + Base class for JSON metadata files inside dataset archives. + + Handles only the shared plumbing: locate the file (zip member or + results folder), read it, check the schema version, and writing it + back. Subclasses define the actual schema by implementing + `_populate_from_raw` and `to_dict`, and optionally `validate`. + """ + + # Subclasses must set these. + SCHEMA_VERSION: Optional[int] = None + DEFAULT_FILENAME: Optional[str] = None + + def __init__(self, dataset=None, processor_type: Optional[str] = None, filename: Optional[str] = None): + self.dataset = dataset + # store the processor that created the archive and metadata + # leaving optional in cases we create metadata... some other way + self.processor_type = processor_type + # the 4CAT version that created the metadata: resolved by `new()` + self.software_version = None + self.software_source = None + + # filename and schema + self._filename = filename or self.DEFAULT_FILENAME + self.schema_version = self.SCHEMA_VERSION + + def _resolve_software_version(self) -> None: + """ + Record the 4CAT commit / repository creating this metadata. + + Called by `new()` on the producer path. A no-op without a dataset to + resolve against (e.g. metadata constructed outside a processor run). + """ + if self.dataset is None: + return + commit, source = get_software_commit(self.dataset.get_own_processor()) + self.software_version = commit + self.software_source = source + + @classmethod + def read(cls, dataset, *, filename: Optional[str] = None): + """ + Load metadata from a finished dataset's archive or results folder. + + :param dataset: the DataSet to read from. + :param filename: metadata filename; defaults to the class's + `DEFAULT_FILENAME`. + + :raises FileNotFoundError: the metadata file is not present. + :raises MetadataException: the dataset is unfinished/empty, or the + file is malformed or claims an unsupported schema version. + """ + filename = filename or cls.DEFAULT_FILENAME + + # for reading, we check the dataset is actually finished and has results + results_file = dataset.check_dataset_finished() + if results_file is None: + raise MetadataException("Dataset is not finished; metadata may be incomplete.") + if results_file == "empty": + raise MetadataException("Dataset is empty; no metadata available.") + + # load it + raw = cls._load_raw(dataset, Path(results_file), filename) + # create the class + instance = cls(dataset, filename=filename) + # and populate + instance._populate_from_raw(raw) + return instance + + @classmethod + def _load_raw(cls, dataset, results_file: Path, filename: str) -> dict: + """ + Locate and JSON-decode the metadata file (zip member or folder). + + :raises FileNotFoundError: the file is not present. + :raises MetadataException: the file is present but not valid JSON. + """ + # if the dataset has a zip archive, look inside it + if results_file.suffix == ".zip": + with zipfile.ZipFile(results_file, "r") as archive: + if filename not in archive.namelist(): + raise FileNotFoundError(f"No metadata file {filename} in archive {results_file.name}.") + with archive.open(filename) as f: + return cls._decode_json(f, filename) + + # it is possible we have a results folder w/ metadata instead; look there + results_folder = dataset.get_results_folder_path() + if not results_folder.is_dir(): + raise FileNotFoundError(f"No metadata file {filename} found (no results folder).") + + # check that the file exists + metadata_path = results_folder / filename + if not metadata_path.is_file(): + raise FileNotFoundError(f"No metadata file {filename} in {results_folder}.") + + # load the metadata! + with open(metadata_path) as f: + return cls._decode_json(f, filename) + + @staticmethod + def _decode_json(fp, filename: str) -> dict: + """ + JSON-decode an open file object, surfacing decode errors as + `MetadataException`. Without this a corrupt metadata file raises + `json.JSONDecodeError`, which escapes the `(FileNotFoundError, + MetadataException)` handler every consumer uses. + """ + try: + return json.load(fp) + except json.JSONDecodeError as e: + raise MetadataException(f"Metadata file {filename} is not valid JSON: {e}") from e + + @classmethod + def _check_schema_version(cls, raw: dict) -> Optional[int]: + """ + Read the `schema_version` of a raw metadata dict. + + Returns the declared version, or None if the file predates schema + versioning (legacy). Raises MetadataException for a version this + class does not know how to read. + + This is meant to allow us to change the schema later if needed + """ + version = raw.get("schema_version") + if version is not None and version != cls.SCHEMA_VERSION: + raise MetadataException( + f"Unsupported metadata schema_version {version}; expected {cls.SCHEMA_VERSION}." + ) + return version + + # -- subclass hooks -- + + def _populate_from_raw(self, raw: dict) -> None: + """Populate instance state from a raw decoded dict.""" + raise NotImplementedError + + def to_dict(self) -> dict: + """Serialise instance state to a JSON-encodable dict.""" + raise NotImplementedError + + def validate(self) -> None: + """Strict schema check run before writing. Override as needed.""" + pass + + # -- output -- + + def write(self, staging_area, *, filename: Optional[str] = None) -> Path: + """ + Validate and atomically write the metadata file to + `/`. Returns the path written. + """ + if filename is None: + filename = self._filename + + # validate before writing; this will be a sanity check for fields (or whatever) + self.validate() + + # write it first to a staging area, then move it to the target location + # should avoid leaving a half-written file if something goes wrong + staging_area = Path(staging_area) + target = staging_area / filename + tmp = target.parent / (target.name + ".tmp") + try: + with open(tmp, "w", encoding="utf-8") as f: + json.dump(self.to_dict(), f) + os.replace(tmp, target) + except Exception: + # don't leave a partial .tmp file behind to be zipped into the archive + tmp.unlink(missing_ok=True) + raise + return target + + +class MediaArchiveMetadata(ArchiveMetadataFile): + """ + Metadata for media download archives (`.metadata.json`). + + Keyed by output filename; each item records the source post IDs, the + originating URL (if any), and an `extra` blob of platform-specific data. + Download attempts that produced no file are recorded in `failures`. + """ + + SCHEMA_VERSION = 1 + DEFAULT_FILENAME = ".metadata.json" + + def __init__(self, dataset=None, *, processor_type: Optional[str] = None, + from_dataset: Optional[str] = None, + filename: Optional[str] = None): + # `dataset` is the target dataset (the one the archive belongs to). + # `from_dataset` is the source dataset key whose posts are being + # downloaded from; these are not generally the same and there is no + # safe default — producers must set it explicitly. + super().__init__(dataset, processor_type=processor_type, filename=filename) + # store the source dataset key (e.g. top dataset or some subdataset) + self.from_dataset = from_dataset + + # the actual metadata content + self.items: dict = {} + self.failures: list = [] + + # -- constructors -- + + @classmethod + def new(cls, dataset=None, *, processor_type: str, + from_dataset: Optional[str] = None, + filename: Optional[str] = None) -> "MediaArchiveMetadata": + """ + Empty container for a producer that is building an archive. + + :param dataset: the DataSet the archive will belong to. + :param processor_type: producer identifier, typically `processor.type`. + :param from_dataset: key of the source dataset whose posts are being + downloaded from. + :param filename: metadata filename inside the archive. + """ + instance = cls(dataset, processor_type=processor_type, + from_dataset=from_dataset, filename=filename) + instance._resolve_software_version() + return instance + + # -- schema population / legacy normalization -- + + def _populate_from_raw(self, raw: dict) -> None: + """ + Populate fields from a raw dict. v1 is read directly; older shapes + are translated through `_normalize_legacy`. + """ + if not isinstance(raw, dict): + raise MetadataException("Metadata file is not a JSON object.") + + version = self._check_schema_version(raw) + if version == self.SCHEMA_VERSION: + # current schema. `_check_schema_version` has already rejected any + # mismatching version, so this is the only non-legacy branch. + self.schema_version = version + self.from_dataset = raw.get("from_dataset", self.from_dataset) + # pretty sure we did not store the processor or version info previously + self.processor_type = raw.get("processor_type") + self.software_version = raw.get("software_version") + self.software_source = raw.get("software_source") + + # post_ids are string-coerced on write, but normalize on read too + # so consumers never have to: some source datasets use integer ids, + # and hand-edited or externally-produced files may carry them. + self.items = { + filename: self._normalize_entry_post_ids(entry) + for filename, entry in dict(raw.get("items", {})).items() + } + self.failures = [ + self._normalize_entry_post_ids(failure) + for failure in list(raw.get("failures", [])) + ] + return + + # version is None here (a mismatching version would have raised + # above): this is a pre-schema-versioned file. + self._normalize_legacy(raw) + + def _normalize_legacy(self, raw: dict) -> None: + """ + Translate pre-v1 metadata to v1. + + Old format is a flat dict keyed by URL (image/video) or filename + (Telegram). Each entry has a `success` flag; failed entries become + `failures[]`, successful entries become `items[filename]`. The video + downloader's nested `files[]` list explodes to one item per file. + All producer-specific data is carried across into `extra` (items) or + the failure dict (failures) so the migration loses nothing — see + `_legacy_extra`. + """ + for outer_key, entry in raw.items(): + if not isinstance(entry, dict): + # malformed; should not exist + continue + + post_ids = entry.get("post_ids") + if post_ids is None and "post_id" in entry: + post_ids = [entry["post_id"]] + post_ids = self._normalize_post_ids(post_ids) + + url = entry.get("url") + # check if the key was a URL + if url is None and isinstance(outer_key, str) and ( + outer_key.startswith("http://") or outer_key.startswith("https://")): + url = outer_key + + # existing `from_dataset` is correct if exists + if entry.get("from_dataset"): + self.from_dataset = entry["from_dataset"] + + success = entry.get("success", True) + + if not success: + self.failures.append(self._legacy_failure( + post_ids, url, + entry.get("reason"), + entry.get("reason_description") or entry.get("error"), + entry, + )) + continue + + # `files` key from video downloader due to channels and playlists + files = entry.get("files") + if isinstance(files, list) and files: + for file in files: + if not isinstance(file, dict): + continue + file_success = file.get("success", True) + file_filename = file.get("filename") + if not file_success: + # the shared outer entry data is preserved alongside the + # per-file data so e.g. the video `downloader` survives + self.failures.append(self._legacy_failure( + post_ids, url, + file.get("reason"), + file.get("reason_description") or file.get("error"), + entry, file, + )) + continue + if not file_filename: + continue + self.items[file_filename] = self._build_item( + file_filename, post_ids, url, self._legacy_extra(entry, file) + ) + elif entry.get("filename"): + filename = entry["filename"] + self.items[filename] = self._build_item( + filename, post_ids, url, self._legacy_extra(entry) + ) + # else: malformed-but-tolerated; drop + + # Legacy entry keys promoted to first-class v1 fields, or pure structure; + # every other key is producer data and is preserved under `extra`. + _LEGACY_NON_EXTRA_KEYS = frozenset({ + "filename", "post_ids", "post_id", "url", "from_dataset", "success", + "files", "metadata", "extra", + }) + # Additionally promoted on the failure path: failures have no `extra` of + # their own, but carry these as first-class fields. + _LEGACY_FAILURE_KEYS = frozenset({"reason", "reason_description", "error"}) + + @classmethod + def _legacy_extra(cls, *sources, exclude=frozenset()) -> dict: + """ + Flatten producer data from one or more legacy dicts into a single flat + `extra` dict; later sources win on a key clash. + + Pre-v1 files had no `extra` key — per-file data (yt-dlp dumps, the + Stable Diffusion `prompt`, the video `downloader`, ...) sat at the top + level or inside a `metadata` sub-dict. Everything not promoted to a + first-class v1 field is kept so the migration loses nothing. `extra` + is kept flat because v1 consumers read keys off it directly. + """ + skip = cls._LEGACY_NON_EXTRA_KEYS | exclude + extra = {} + for source in sources: + if not isinstance(source, dict): + continue + extra.update({k: v for k, v in source.items() if k not in skip}) + # a `metadata` (yt-dlp dump) or explicit `extra` is flattened in + for nested_key in ("metadata", "extra"): + nested = source.get(nested_key) + if isinstance(nested, dict): + extra.update(nested) + return extra + + @classmethod + def _legacy_failure(cls, post_ids, url, reason, reason_description, + *extra_sources) -> dict: + """ + Build a v1 failure dict from legacy data. Leftover producer data is + preserved under the failure's own `extra` key. + """ + failure = { + "post_ids": post_ids, + "reason": reason or "error", + "reason_description": reason_description or "", + } + if url is not None: + failure["url"] = url + extra = cls._legacy_extra(*extra_sources, exclude=cls._LEGACY_FAILURE_KEYS) + if extra: + failure["extra"] = extra + return failure + + @staticmethod + def _build_item(filename: str, post_ids: list, url, extra: dict) -> dict: + item = {"filename": filename, "post_ids": list(post_ids)} + if url is not None: + item["url"] = url + if extra: + item["extra"] = extra + return item + + # -- mutation -- + + def add_item(self, filename: str, *, post_ids, + url: Optional[str] = None, + extra: Optional[dict] = None, + replace: bool = False) -> None: + """ + Record a successfully produced file. + + :param filename: the file's name inside the archive. Must be unique + within this metadata instance unless `replace=True`. + :param post_ids: list of source post IDs (string-coerced). + :param url: optional originating URL. + :param extra: optional free-form per-file data (e.g. yt-dlp dump). + :param replace: overwrite an existing entry with the same filename. + """ + if not filename or not isinstance(filename, str): + raise MetadataException("add_item: 'filename' must be a non-empty string.") + if not replace and filename in self.items: + raise MetadataException(f"add_item: filename {filename!r} already present.") + self.items[filename] = self._build_item( + filename, self._normalize_post_ids(post_ids), url, dict(extra) if extra else {} + ) + + def add_failure(self, *, post_ids, reason: str, + reason_description: Optional[str] = None, + url: Optional[str] = None) -> None: + """ + Record a download attempt that did not produce a file. + + :param post_ids: list of source post IDs (string-coerced; may be empty). + :param reason: structured failure code (e.g. "error", "no_media"). + :param reason_description: optional human-readable explanation. + :param url: optional URL that was attempted. + """ + if not reason or not isinstance(reason, str): + raise MetadataException("add_failure: 'reason' must be a non-empty string.") + failure = { + "post_ids": self._normalize_post_ids(post_ids), + "reason": reason, + } + if url is not None: + failure["url"] = url + if reason_description is not None: + failure["reason_description"] = reason_description + self.failures.append(failure) + + @staticmethod + def _normalize_post_ids(post_ids) -> list: + if post_ids is None: + return [] + if isinstance(post_ids, (str, int)): + return [str(post_ids)] + return [str(p) for p in post_ids] + + @classmethod + def _normalize_entry_post_ids(cls, entry): + """ + Return a copy of an item/failure dict with `post_ids` coerced to a + list of strings. Non-dict entries are returned unchanged. + """ + if not isinstance(entry, dict) or "post_ids" not in entry: + return entry + entry = dict(entry) + entry["post_ids"] = cls._normalize_post_ids(entry["post_ids"]) + return entry + + # -- access -- + + def get_entry(self, filename: str) -> Optional[dict]: + return self.items.get(filename) + + def iter_entries(self) -> Iterator[tuple]: + return iter(self.items.items()) + + def iter_failures(self) -> Iterator[dict]: + return iter(self.failures) + + def filename_to_post_ids(self) -> dict: + """ + Return a `{filename: [post_id, ...]}` mapping for successful entries. + + Replaces the per-consumer map-building that strips extensions + and walks nested `files[]` lists. + """ + return {fn: list(entry.get("post_ids", [])) for fn, entry in self.items.items()} + + def post_ids_for(self, filename: str) -> list: + """ + Return the list of source post IDs associated with a given filename, or + empty list if the filename is not present or has no post IDs. + """ + entry = self.items.get(filename) + return list(entry.get("post_ids", [])) if entry else [] + + # -- some helpers for common access patterns -- + + def __len__(self) -> int: + return len(self.items) + + def __contains__(self, filename) -> bool: + return filename in self.items + + # -- output -- + + def to_dict(self) -> dict: + return { + "schema_version": self.schema_version, + "from_dataset": self.from_dataset, + "processor_type": self.processor_type, + "software_version": self.software_version, + "software_source": self.software_source, + "items": self.items, + "failures": self.failures, + } + + def validate(self) -> None: + """ + Strict schema check used before writing. Raises MetadataException on + any violation. + """ + if self.schema_version != self.SCHEMA_VERSION: + raise MetadataException(f"Unsupported schema_version {self.schema_version}.") + if not isinstance(self.items, dict): + raise MetadataException("items must be a dict.") + if not isinstance(self.failures, list): + raise MetadataException("failures must be a list.") + for filename, entry in self.items.items(): + if not isinstance(entry, dict): + raise MetadataException(f"items[{filename!r}] must be a dict.") + if entry.get("filename") != filename: + raise MetadataException( + f"items[{filename!r}].filename must match its key " + f"(got {entry.get('filename')!r})." + ) + if not isinstance(entry.get("post_ids"), list): + raise MetadataException(f"items[{filename!r}].post_ids must be a list.") + for i, failure in enumerate(self.failures): + if not isinstance(failure, dict): + raise MetadataException(f"failures[{i}] must be a dict.") + if not isinstance(failure.get("post_ids"), list): + raise MetadataException(f"failures[{i}].post_ids must be a list.") + reason = failure.get("reason") + if not isinstance(reason, str) or not reason: + raise MetadataException(f"failures[{i}].reason must be a non-empty string.") diff --git a/common/lib/dataset.py b/common/lib/dataset.py index 37aaed1ba..5c83c9af3 100644 --- a/common/lib/dataset.py +++ b/common/lib/dataset.py @@ -2305,77 +2305,41 @@ def get_media_from_children(self, item_ids=[]) -> dict: :param list item_ids: A list of item IDs to limit the filename retrieval to. returns dict: item_id as key and a list of tuples (child dataset key -> filename) as items """ - children = self.get_children() + from common.lib.exceptions import MetadataException + children = self.get_children() if not children: return {} - media_map = {} - - # Get children that are image/video downloaders media_datasets = [ p for p in children if ("video-downloader" in p.type or "image-downloader" in p.type) and p.data.get("num_rows", 0) > 0 and p.is_finished() ] - # Loop through media datasets and create a map of dataset key -> filenames - # Skip files that were downloaded multiple times + media_map = {} seen_files = set() + item_ids = [str(i_id) for i_id in item_ids] for media_dataset in media_datasets: - for item in media_dataset.iterate_items(): - - if item.file.name == ".metadata.json": - with item.file.open() as infile: - metadata = json.load(infile) - - for url_key, item_metadata in metadata.items(): - media_items = set() - post_ids = item_metadata.get("post_ids", []) # Required - - if not post_ids: - continue - - # Make sure we're matching and passing strings - post_ids = [str(p_id) for p_id in post_ids] - item_ids = [str(i_id) for i_id in item_ids] - - # Skip items that are not in the requested item_ids - if item_ids and not any(p_id in item_ids for p_id in post_ids): - continue - - # Single file (images usually format like this) - is_success = item_metadata.get("success", True) - - if is_success and "filename" in item_metadata: - media_info = (media_dataset.key, item_metadata["filename"]) - media_items.add(media_info) - - # Multiple files (videos with the 'files' array) - if item_metadata.get("files"): - for file in item_metadata["files"]: - if file.get("success") and "filename" in file: - media_info = (media_dataset.key, file["filename"]) - media_items.add(media_info) - - if not media_items: - continue - - # Append to post_id list - for post_id in post_ids: - if post_id not in media_map: - media_map[post_id] = [] - - for media_item in media_items: - if media_item not in media_map[post_id]: - # Don't add post_id -> filename couplings that we've already seen - media_ref = (post_id, media_item[1]) - if media_ref not in seen_files: - media_map[post_id].append(media_item) - seen_files.add(media_ref) - - # break after .metadata.json - break + try: + metadata = media_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + continue + + for filename, entry in metadata.iter_entries(): + post_ids = [str(p_id) for p_id in entry.get("post_ids", [])] + if not post_ids: + continue + if item_ids and not any(p_id in item_ids for p_id in post_ids): + continue + + media_info = (media_dataset.key, filename) + for post_id in post_ids: + media_ref = (post_id, filename) + if media_ref in seen_files: + continue + seen_files.add(media_ref) + media_map.setdefault(post_id, []).append(media_info) return media_map @@ -2395,6 +2359,33 @@ def get_metadata(self): metadata["current_4CAT_version"] = get_software_version() return metadata + def read_media_metadata(self, filename=".metadata.json"): + """ + Load this dataset's media-archive `.metadata.json`. + + Returns a `MediaArchiveMetadata` instance. Legacy formats are + normalized on load so callers always see the current schema. + + Raises `FileNotFoundError` if no metadata file is present and + `MetadataException` if the dataset is unfinished or the file is + malformed. + """ + from common.lib.archive_metadata import MediaArchiveMetadata + return MediaArchiveMetadata.read(self, filename=filename) + + def new_media_metadata(self, from_dataset, processor_type, filename=".metadata.json"): + """ + Empty `MediaArchiveMetadata` container for a processor producing a + media archive for this dataset. + + :param str from_dataset: key of the *source* dataset whose posts are + being downloaded from (this is generally not the same as `self`). + :param str processor_type: type of the processor, used for metadata. + """ + from common.lib.archive_metadata import MediaArchiveMetadata + return MediaArchiveMetadata.new(self, processor_type=processor_type, + from_dataset=from_dataset, filename=filename) + def get_result_url(self): """ Gets the 4CAT frontend URL of a dataset file. diff --git a/common/lib/exceptions.py b/common/lib/exceptions.py index b0b798a4c..209d76db6 100644 --- a/common/lib/exceptions.py +++ b/common/lib/exceptions.py @@ -162,3 +162,9 @@ class MediaSignatureException(FourcatException): Raise in media processors when the media cannot be read """ pass + +class MetadataException(FourcatException): + """ + Raise when there is an issue with metadata + """ + pass diff --git a/common/lib/media_archive_library.py b/common/lib/media_archive_library.py new file mode 100644 index 000000000..16dd4e4ae --- /dev/null +++ b/common/lib/media_archive_library.py @@ -0,0 +1,160 @@ +""" +Look up media that previous downloader runs already fetched. + +When a downloader (currently the video downloader) runs, other downloaders of +the same kind may have already fetched some of the same URLs from the same +source data. `MediaArchiveLibrary` aggregates the `MediaArchiveMetadata` of +those previous runs +""" +from common.lib.exceptions import MetadataException, DataSetException + + +class MediaLibraryHit: + """ + Result of a `MediaArchiveLibrary.find()` lookup. + + A success hit carries the archive the files live in (`metadata`, whose + `.dataset` locates the zip) and the matching `(filename, item)` entries. + A failure hit carries the set of failure `reasons` seen for the URL + across all previous archives; the consumer decides what they mean. + """ + + def __init__(self, is_success, metadata=None, entries=None, reasons=None): + self.is_success = is_success + self.metadata = metadata + self.entries = entries or [] + self.reasons = reasons or set() + + +class MediaArchiveLibrary: + """ + Aggregate of `MediaArchiveMetadata` from previous downloader datasets. + + Construct via `collect()` inside a processor; the bare constructor takes + metadata objects directly and is intended for testing. + """ + + def __init__(self, metadata_objects, current_dataset=None): + self.metadata_objects = list(metadata_objects) + self.current_dataset = current_dataset + self._url_index = None + + @classmethod + def collect(cls, current_dataset, modules, compatible_types): + """ + Build a library from finished downloader datasets that share + `current_dataset`'s source data. + + :param current_dataset: the dataset being produced now (excluded + from the result). + :param modules: module registry, used to resolve the original + dataset of a filtered set. + :param list compatible_types: processor types whose archives count, + e.g. `["video-downloader"]`. + """ + datasets = cls._collect_previous_downloaders(current_dataset, modules, compatible_types) + metadata_objects = [] + for dataset in datasets: + try: + metadata_objects.append(dataset.read_media_metadata()) + except (FileNotFoundError, MetadataException): + # no metadata, or unfinished/malformed — nothing to reuse + continue + + if current_dataset is not None: + current_dataset.log( + f"Media library: {len(metadata_objects)} previous " + f"{'/'.join(compatible_types)} archive(s) available for reuse" + ) + return cls(metadata_objects, current_dataset=current_dataset) + + @staticmethod + def _collect_previous_downloaders(current_dataset, modules, compatible_types): + """ + Sibling datasets of a compatible processor type, plus — if the + current dataset's parent is a filtered copy — the downloaders of the + dataset it was copied from. Excludes the current dataset itself. + """ + from common.lib.dataset import DataSet + + # kids from the parent + parent_dataset = current_dataset.get_parent() + downloaders = [ + child for child in parent_dataset.get_children() + if child.type in compatible_types and child.key != current_dataset.key + ] + + # kids from the original (if filtered dataset) + if "copied_from" in parent_dataset.parameters and parent_dataset.is_top_dataset(): + try: + original = DataSet(key=parent_dataset.parameters["copied_from"], + db=current_dataset.db, modules=modules) + downloaders += [ + child for child in original.top_parent().get_children() + if child.type in compatible_types and child.key != current_dataset.key + ] + except DataSetException: + # the original dataset no longer exists + pass + + return downloaders + + def _build_index(self) -> dict: + """ + `{url: {"items": [(metadata, filename, item), ...], + "failures": [(metadata, failure), ...]}}` + + A URL can map to entries from several archives (downloaded more than + once) and to several files within one archive (e.g. a playlist). + """ + index = {} + for metadata in self.metadata_objects: + for filename, item in metadata.iter_entries(): + url = item.get("url") + if not url: + continue + index.setdefault(url, {"items": [], "failures": []}) + index[url]["items"].append((metadata, filename, item)) + for failure in metadata.iter_failures(): + url = failure.get("url") + if not url: + continue + index.setdefault(url, {"items": [], "failures": []}) + index[url]["failures"].append((metadata, failure)) + return index + + @property + def url_index(self) -> dict: + if self._url_index is None: + self._url_index = self._build_index() + return self._url_index + + def find(self, url: str): + """ + Look up a URL across all previous downloader archives. + + Returns a `MediaLibraryHit` — a success hit if any archive + downloaded it, otherwise a failure hit carrying every failure + `reason` seen — or `None` if the URL was never seen. + """ + bucket = self.url_index.get(url) + if not bucket: + return None + + if bucket["items"]: + # success beats failure; take the first archive that has the URL + # and copy all of its files for that URL (one URL may yield many, + # e.g. a playlist) + first_metadata = bucket["items"][0][0] + entries = [(filename, item) for metadata, filename, item in bucket["items"] + if metadata is first_metadata] + return MediaLibraryHit(is_success=True, metadata=first_metadata, entries=entries) + + # pass all the failures (e.g. in case one is "error", but later is "not_a_video", or + # something else the consumer wants to interpret) + reasons = {failure.get("reason") for _, failure in bucket["failures"] + if failure.get("reason")} + return MediaLibraryHit(is_success=False, reasons=reasons) + + def __len__(self) -> int: + return len(self.metadata_objects) diff --git a/processors/audio/audio_extractor.py b/processors/audio/audio_extractor.py index 91c68fc7b..06bb41c00 100644 --- a/processors/audio/audio_extractor.py +++ b/processors/audio/audio_extractor.py @@ -10,7 +10,7 @@ import oslex from backend.lib.processor import BasicProcessor -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException __author__ = "Dale Wahl" __credits__ = ["Dale Wahl"] @@ -89,6 +89,20 @@ def process(self): if max_files != 0: total_possible_videos = min(total_possible_videos, max_files) + # Read the source video archive's metadata so each extracted audio + # file can carry its video's provenance (source posts, URL). + try: + source_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + source_metadata = None + + # Build our own metadata describing the audio files we produce, + # rather than passing the (video-keyed) source metadata through. + metadata = self.dataset.new_media_metadata( + processor_type=self.type, + from_dataset=(source_metadata.from_dataset if source_metadata else self.source_dataset.key), + ) + processed_videos = 0 written = 0 @@ -97,9 +111,9 @@ def process(self): if self.interrupted: raise ProcessorInterruptedException("Interrupted while determining image wall order") - # Check for 4CAT's metadata JSON and copy it + # the source archive's metadata describes videos; we write our + # own (audio-keyed) metadata below, so skip the original if item.file.name == '.metadata.json': - shutil.copy(item.file, output_dir.joinpath(".metadata.json")) continue if max_files != 0 and processed_videos >= max_files: @@ -123,9 +137,23 @@ def process(self): ffmpeg_output = result.stdout.decode("utf-8") ffmpeg_error = result.stderr.decode("utf-8") - audio_file = output_dir.joinpath(f"{vid_name}.wav") + audio_filename = f"{vid_name}.wav" + audio_file = output_dir.joinpath(audio_filename) + + # carry the source video's provenance onto the extracted audio + video_item = source_metadata.get_entry(item.file.name) if source_metadata else None + post_ids = video_item.get("post_ids", []) if video_item else [] + source_url = video_item.get("url") if video_item else None + extra = dict(video_item.get("extra") or {}) if video_item else {} + if audio_file.exists(): written += 1 + metadata.add_item(audio_filename, post_ids=post_ids, url=source_url, + extra=extra, replace=True) + else: + metadata.add_failure(post_ids=post_ids, reason="extraction_failed", + reason_description=f"ffmpeg exited with code {result.returncode}", + url=source_url) if ffmpeg_output: with open(str(output_dir.joinpath(f"{vid_name}_stdout.log")), 'w', encoding="utf-8") as outfile: @@ -143,6 +171,10 @@ def process(self): self.dataset.update_status(f"Extracted audio from {written} of {processed_videos} attempted videos") self.dataset.update_progress(min(1, processed_videos / max(total_possible_videos, 1))) + # Write our own metadata describing the extracted audio files (and + # any extraction failures), keyed by audio filename. + metadata.write(output_dir) + # Finish up warning = f"Extracted {written}/{processed_videos} audio files, check the logs for errors." \ if written < processed_videos else None diff --git a/processors/conversion/hash_images.py b/processors/conversion/hash_images.py index 25a6eb4de..8579c73d2 100644 --- a/processors/conversion/hash_images.py +++ b/processors/conversion/hash_images.py @@ -2,12 +2,11 @@ Hash images """ import csv -import json from PIL import UnidentifiedImageError from backend.lib.processor import BasicProcessor -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException from common.lib.helpers import UserInput, hash_image, stringify_hash from processors.metrics.group_hashes import HashGrouper @@ -117,50 +116,25 @@ def process(self): group_by = self.parameters.get("group-by", False) similarity_pct = float(self.parameters.get("similarity-threshold", 4)) - # Get staging area - staging_area = self.dataset.get_staging_area() - - # Extract metadata if present try: - metadata_file = self.extract_archived_file_by_name(".metadata.json", self.source_file, staging_area) - except FileNotFoundError: - metadata_file = None + source_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + source_metadata = None - metadata_extra_fields = ["post_ids", "post_id", "url", "from_dataset"] image_data_by_filename = {} - if metadata_file: - with open(metadata_file) as file: - image_data = json.load(file) - for url, item in image_data.items(): - if "filename" in item: - image_data_by_filename[item["filename"]] = item - elif "files" in item: - files = item.get('files') - if not isinstance(files, list): - self.log.warning(f"Invalid 'files' entry in metadata (expected list, got {type(files)}); cannot use file metadata") - image_data_by_filename = {} - break - for file in files: - if "filename" in file: - # add extra fields from parent item if not present in file entry - for key in metadata_extra_fields: - if key in file: - continue - elif key in item: - file[key] = item[key] - image_data_by_filename[file["filename"]] = file - - self.dataset.log("Found and loaded image metadata") + if source_metadata is not None: + for filename, item in source_metadata.iter_entries(): + image_data_by_filename[filename] = item + self.dataset.log("Found and loaded image metadata") else: self.dataset.log("No image metadata found") - image_data_by_filename = {} # Set up CSV fieldnames (always include hash_size even if crhash to indicate 'None') base_fields = ["filename", "image_hash", "hash_type", "hash_size"] + metadata_extra_fields = ["post_ids", "url", "from_dataset"] fieldnames = (["group"] if group_by else []) + base_fields if image_data_by_filename: - example = next(iter(image_data_by_filename.values())) - fieldnames.extend([key for key in metadata_extra_fields if key in example.keys()]) + fieldnames.extend(metadata_extra_fields) processed = 0 skipped = 0 @@ -209,9 +183,10 @@ def process(self): with self.dataset.get_results_path().open("w", newline="", encoding="utf-8") as output: writer = csv.DictWriter(output, fieldnames=fieldnames) writer.writeheader() + from_dataset = source_metadata.from_dataset if source_metadata else "" for it in items: image_metadata = image_data_by_filename.get(it["filename"], {}) - + # Convert hash object to string for CSV storage hash_str = stringify_hash(it["hash_obj"], it["hash_type"]) row = { @@ -221,10 +196,10 @@ def process(self): "image_hash": hash_str, "hash_type": it["hash_type"], } - # Add optional metadata fields if present - for key in fieldnames: - if key not in row: - row[key] = image_metadata.get(key, "") + if image_data_by_filename: + row["post_ids"] = ", ".join(image_metadata.get("post_ids", [])) + row["url"] = image_metadata.get("url", "") + row["from_dataset"] = from_dataset writer.writerow(row) final_msg = f"Processed {processed:,} images" diff --git a/processors/conversion/view_metadata.py b/processors/conversion/view_metadata.py index 3a2436f3b..75e331a7d 100644 --- a/processors/conversion/view_metadata.py +++ b/processors/conversion/view_metadata.py @@ -3,10 +3,8 @@ Designed to work with any processor that has a 'map_metadata' method """ -import json -import zipfile - from backend.lib.processor import BasicProcessor +from common.lib.exceptions import MetadataException from common.lib.user_input import UserInput __author__ = "Dale Wahl" @@ -59,20 +57,18 @@ def is_compatible_with(cls, module=None, config=None): def process(self): """ - Grabs .metadata.json and reformats + Read .metadata.json from the parent archive and reformat as CSV using + the parent producer's `map_metadata` / `map_failure_metadata` hooks. """ self.dataset.update_status("Collecting .metadata.json file") - with zipfile.ZipFile(self.source_file, "r") as archive_file: - archive_contents = sorted(archive_file.namelist()) - if '.metadata.json' not in archive_contents: - self.dataset.finish_with_error("Unable to identify metadata file") - return - - staging_area = self.dataset.get_staging_area() - archive_file.extract(".metadata.json", staging_area) - - with open(staging_area.joinpath(".metadata.json")) as file: - metadata_file = json.load(file) + try: + metadata = self.dataset.get_parent().read_media_metadata() + except FileNotFoundError: + self.dataset.finish_with_error("Unable to identify metadata file") + return + except MetadataException as e: + self.dataset.finish_with_error(f"Unable to read metadata: {e}") + return parent_processor = self.dataset.get_parent().get_own_processor() if parent_processor is None or not hasattr(parent_processor, "map_metadata"): @@ -83,21 +79,24 @@ def process(self): self.dataset.log(f"Collecting metadata created by {parent_processor.type}") include_failed = self.parameters.get("include_failed", False) + map_failure = getattr(parent_processor, "map_failure_metadata", None) rows = [] num_posts = 0 - with self.dataset.get_results_path().open("w", encoding="utf-8", newline=""): - for key, value in metadata_file.items(): - if not include_failed and not value.get("success", True): - continue - - # Metadata may contain more than one row/item per key, value pair - for item in parent_processor.map_metadata(key, value): - rows.append(item) + + for filename, item in metadata.iter_entries(): + for row in parent_processor.map_metadata(filename, item): + rows.append(row) + num_posts += 1 + + if include_failed and map_failure is not None: + for failure in metadata.iter_failures(): + for row in map_failure(failure): + rows.append(row) num_posts += 1 # Finish up self.dataset.update_status(f"Read metadata for {num_posts:,} item(s).") - + if rows: self.write_csv_items_and_finish(rows) else: diff --git a/processors/filtering/unique_images.py b/processors/filtering/unique_images.py index 482ad3ceb..82bea6898 100644 --- a/processors/filtering/unique_images.py +++ b/processors/filtering/unique_images.py @@ -2,10 +2,9 @@ Filter by unique images """ import shutil -import json from backend.lib.processor import BasicProcessor -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException from common.lib.helpers import UserInput, hash_file __author__ = "Stijn Peeters" @@ -76,7 +75,7 @@ def process(self): """ seen_hashes = set() hash_map = {} - metadata = None + hash_type = self.parameters.get("hash-type") dupes = 0 processed = 0 staging_area = self.dataset.get_staging_area() @@ -93,11 +92,9 @@ def process(self): processed += 1 if image.file.name == ".metadata.json": - with image.file.open() as infile: - metadata = json.load(infile) continue - image_hash = hash_file(image.file, self.parameters.get("hash-type")) + image_hash = hash_file(image.file, hash_type) if image_hash not in seen_hashes: seen_hashes.add(image_hash) @@ -107,21 +104,37 @@ def process(self): self.dataset.log(f"{image.file.name} is a duplicate of {hash_map[image_hash]} - skipping") dupes += 1 - new_metadata = {} + try: + source_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + source_metadata = None + + new_metadata = self.dataset.new_media_metadata( + processor_type=self.type, + from_dataset=(source_metadata.from_dataset if source_metadata else self.source_dataset.key), + ) inverse_hashmap = {v: k for k, v in hash_map.items()} - if metadata: - for url, item in metadata.items(): - if item["filename"] in inverse_hashmap: - new_metadata[inverse_hashmap[item["filename"]]] = { - **item, - "hash": inverse_hashmap[item["filename"]], - "hash_type": self.parameters.get("hash-type") - } + if source_metadata is not None: + for filename, item in source_metadata.iter_entries(): + if filename not in inverse_hashmap: + continue + extra = dict(item.get("extra") or {}) + extra["hash"] = inverse_hashmap[filename] + extra["hash_type"] = hash_type + new_metadata.add_item( + filename, + post_ids=item.get("post_ids", []), + url=item.get("url"), + extra=extra, + ) else: - new_metadata = {hash_map[k]: {"filename": hash_map[k], "hash": k, "hash_type": self.parameters.get("hash-type")} for k in hash_map} + for h, filename in hash_map.items(): + new_metadata.add_item( + filename, post_ids=[], + extra={"hash": h, "hash_type": hash_type}, + ) - with staging_area.joinpath(".metadata.json").open("w") as outfile: - json.dump(new_metadata, outfile) + new_metadata.write(staging_area) self.dataset.update_status(f"Image archive filtered, found {dupes:,} duplicate(s)", is_final=True) self.write_archive_and_finish(staging_area, len(hash_map), finish=True) diff --git a/processors/machine_learning/audio_to_text.py b/processors/machine_learning/audio_to_text.py index 62a5e9458..9762089b3 100644 --- a/processors/machine_learning/audio_to_text.py +++ b/processors/machine_learning/audio_to_text.py @@ -8,7 +8,7 @@ from backend.lib.processor import BasicProcessor from common.lib.dmi_service_manager import DmiServiceManager, DmiServiceManagerException, DsmOutOfMemory -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException from common.lib.user_input import UserInput from common.lib.item_mapping import MappedItem @@ -511,18 +511,20 @@ def process(self): self.dataset.update_status(f"Got {i + 1} transcription{s} from OpenAI") self.dataset.update_progress((i + 1) / max_files) - # Load the video metadata if available - video_metadata = None + # Load the source archive's metadata if available. Map by + # extension-less filename: the transcription result files share their + # stem with the input audio (e.g. `foo.wav` -> `foo.json`). file_metadata_map = {} - if staging_area.joinpath(".metadata.json").is_file(): - with open(staging_area.joinpath(".metadata.json")) as file: - video_metadata = json.load(file) - self.dataset.log("Found and loaded video metadata") - - for video_id, video_data in video_metadata.items(): - for file in video_data.get("files", []): - file = ".".join(file["filename"].split(".")[:-1]) - file_metadata_map[file] = video_data + try: + source_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + source_metadata = None + if source_metadata is not None: + self.dataset.log("Found and loaded audio metadata") + from_dataset = source_metadata.from_dataset + for filename, item in source_metadata.iter_entries(): + basename = filename.rsplit(".", 1)[0] + file_metadata_map[basename] = {**item, "from_dataset": from_dataset} self.dataset.update_status("Processing results...") diff --git a/processors/machine_learning/blip2_image_caption.py b/processors/machine_learning/blip2_image_caption.py index c515085d9..1447dc4db 100644 --- a/processors/machine_learning/blip2_image_caption.py +++ b/processors/machine_learning/blip2_image_caption.py @@ -6,7 +6,7 @@ from backend.lib.processor import BasicProcessor from common.lib.dmi_service_manager import DmiServiceManager, DmiServiceManagerException, DsmOutOfMemory, DsmConnectionError -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException from common.lib.user_input import UserInput from common.lib.item_mapping import MappedItem @@ -206,21 +206,19 @@ def process(self): self.dataset.finish_with_error("Error with BLIP2 model; please contact 4CAT admins.") return - # Load the video metadata if available + # Load the image metadata if available. Key by basename because BLIP2 + # results share the input image's stem but use a different extension. image_metadata = {} try: - metadata_file = self.extract_archived_file_by_name(".metadata.json", self.source_file, staging_area) - except FileNotFoundError: - metadata_file = None - if metadata_file: - with open(metadata_file) as file: - image_data = json.load(file) - self.dataset.log("Found and loaded image metadata") - for url, data in image_data.items(): - if data.get('success'): - data.update({"url": url}) - # using the filename without extension as the key; since that is how the results form their filename - image_metadata[".".join(data['filename'].split(".")[:-1])] = data + source_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + source_metadata = None + if source_metadata is not None: + self.dataset.log("Found and loaded image metadata") + from_dataset = source_metadata.from_dataset + for filename, item in source_metadata.iter_entries(): + basename = filename.rsplit(".", 1)[0] + image_metadata[basename] = {**item, "from_dataset": from_dataset} else: self.dataset.log("No image metadata found") diff --git a/processors/machine_learning/clarifai_api.py b/processors/machine_learning/clarifai_api.py index aad832e91..2204c48c7 100644 --- a/processors/machine_learning/clarifai_api.py +++ b/processors/machine_learning/clarifai_api.py @@ -9,6 +9,7 @@ from clarifai_grpc.channel.clarifai_channel import ClarifaiChannel from common.lib.helpers import UserInput, convert_to_int +from common.lib.exceptions import MetadataException from backend.lib.processor import BasicProcessor __author__ = "Stijn Peeters" @@ -145,6 +146,12 @@ def process(self): processed = 0 annotated = 0 + try: + source_metadata = self.source_dataset.read_media_metadata() + filename_to_post_ids = source_metadata.filename_to_post_ids() + except (FileNotFoundError, MetadataException): + filename_to_post_ids = {} + # send batched requests per model for model_id in models: iterator = self.source_dataset.iterate_items() @@ -165,10 +172,6 @@ def process(self): send_batch = True if image: - if image.file.name == ".metadata.json": - # Metadata, we keep this for writing annotations - image_metadata = json.load(image.file.open("r")) - if image.file.suffix in (".json", ".log"): continue @@ -230,8 +233,6 @@ def process(self): # Get original post IDs for annotations fourcat_annotations = [] annotated = 0 - if save_annotations: - filename_and_post_ids = {v["filename"]: v["post_ids"] for v in image_metadata.values()} # save the buffered results (we do this only now so we can also store # combined annotations) @@ -252,7 +253,7 @@ def process(self): if save_annotations: for label, confidence in all_annotations.items(): if confidence > annotation_threshold: - for item_id in filename_and_post_ids[image]: + for item_id in filename_to_post_ids.get(image, []): fourcat_annotations.append({ "label": "clarifai_" + label, "value": confidence, diff --git a/processors/machine_learning/clip_categorize_images.py b/processors/machine_learning/clip_categorize_images.py index 30d29e463..12f1d689c 100644 --- a/processors/machine_learning/clip_categorize_images.py +++ b/processors/machine_learning/clip_categorize_images.py @@ -7,7 +7,7 @@ from backend.lib.processor import BasicProcessor from common.lib.dmi_service_manager import DmiServiceManager, DmiServiceManagerException, DsmOutOfMemory, DsmConnectionError -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException from common.lib.user_input import UserInput from common.lib.item_mapping import MappedItem @@ -228,17 +228,19 @@ def process(self): self.dataset.finish_with_error("Error with CLIP model; please contact 4CAT admins.") return - # Load the video metadata if available + # Load the image metadata if available. Key by basename because CLIP + # result files share the input image's stem but use `.json` extension. image_metadata = {} - if staging_area.joinpath(".metadata.json").is_file(): - with open(staging_area.joinpath(".metadata.json")) as file: - image_data = json.load(file) - self.dataset.log("Found and loaded image metadata") - for url, data in image_data.items(): - if data.get('success'): - data.update({"url": url}) - # using the filename without extension as the key; since that is how the results form their filename - image_metadata[".".join(data['filename'].split(".")[:-1])] = data + try: + source_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + source_metadata = None + if source_metadata is not None: + self.dataset.log("Found and loaded image metadata") + from_dataset = source_metadata.from_dataset + for filename, item in source_metadata.iter_entries(): + basename = filename.rsplit(".", 1)[0] + image_metadata[basename] = {**item, "from_dataset": from_dataset} self.dataset.update_status("Processing CLIP results...") # Download the result files diff --git a/processors/machine_learning/generate_images.py b/processors/machine_learning/generate_images.py index 7ca0d219e..7d5f2b071 100644 --- a/processors/machine_learning/generate_images.py +++ b/processors/machine_learning/generate_images.py @@ -273,22 +273,23 @@ def make_filename(id, prompt): return f"{id}-{safe_prompt}.jpeg" self.dataset.update_status("Verifying results") - with output_dir.joinpath(".metadata.json").open("w") as outfile: - metadata = { - prompt_id: { - "from_dataset": self.source_dataset.key, - "filename": make_filename(prompt_id, data["prompt"]), - "success": output_dir.joinpath(make_filename(prompt_id, data["prompt"])).exists(), - "post_ids": [prompt_id], - "prompt": data["prompt"], - "negative-prompt": data["negative"], - } for prompt_id, data in prompts.items() - } - json.dump(metadata, outfile) + metadata = self.dataset.new_media_metadata( + processor_type=self.type, from_dataset=self.source_dataset.key + ) + for prompt_id, data in prompts.items(): + filename = make_filename(prompt_id, data["prompt"]) + extra = {"prompt": data["prompt"], "negative-prompt": data["negative"]} + if output_dir.joinpath(filename).exists(): + metadata.add_item(filename, post_ids=[prompt_id], extra=extra, replace=True) + else: + metadata.add_failure(post_ids=[prompt_id], reason="error", + reason_description="Generated image not found on disk") + metadata.write(output_dir) shutil.rmtree(staging_area) + successful = len(metadata) self.dataset.update_status( - f"Generated {len([r for r in metadata.values() if r['success']]):,} image(s) for {len(prompts):,} prompt(s)", + f"Generated {successful:,} image(s) for {len(prompts):,} prompt(s)", is_final=True) - self.write_archive_and_finish(output_dir, num_items=len([r for r in metadata.values() if r['success']])) + self.write_archive_and_finish(output_dir, num_items=successful) diff --git a/processors/machine_learning/google_vision_api.py b/processors/machine_learning/google_vision_api.py index 2491967e1..8fd3cdff6 100644 --- a/processors/machine_learning/google_vision_api.py +++ b/processors/machine_learning/google_vision_api.py @@ -10,7 +10,7 @@ from common.lib.helpers import UserInput, convert_to_int from backend.lib.processor import BasicProcessor -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException __author__ = "Stijn Peeters" __credits__ = ["Stijn Peeters"] @@ -126,8 +126,12 @@ def process(self): processed = 0 done = 0 - # Get the .metadata.json file if we're writing annotations so we can add them to specific rows. - img_metadata = [] + # Load the metadata file so we can attach source post IDs to annotations. + try: + source_metadata = self.source_dataset.read_media_metadata() + img_metadata = source_metadata.filename_to_post_ids() + except (FileNotFoundError, MetadataException): + img_metadata = {} # Loop through images for image in self.source_dataset.iterate_items(): @@ -138,13 +142,7 @@ def process(self): self.dataset.update_progress(done / total) if image.file.name.startswith(".") or image.file.suffix in (".json", ".log"): - - # Get the .metadata.json file so we can also save item IDs. - if image.file.name == ".metadata.json": - img_metadata = json.load(image.file.open()) - if img_metadata: - img_metadata = {v["filename"]: v.get("post_ids", []) for v in img_metadata.values()} - else: + if image.file.name != ".metadata.json": self.dataset.log(f"Skipping file {image.file.name}, probably not an image.") continue diff --git a/processors/machine_learning/llm_prompter.py b/processors/machine_learning/llm_prompter.py index c2bd0d02e..d328f4502 100644 --- a/processors/machine_learning/llm_prompter.py +++ b/processors/machine_learning/llm_prompter.py @@ -14,7 +14,7 @@ from datetime import datetime, timedelta from common.lib.item_mapping import MappedItem -from common.lib.exceptions import ProcessorInterruptedException, QueryParametersException, QueryNeedsExplicitConfirmationException +from common.lib.exceptions import ProcessorInterruptedException, QueryParametersException, QueryNeedsExplicitConfirmationException, MetadataException from common.lib.helpers import UserInput, nthify, andify, remove_nuls, flatten_dict from common.lib.llm import LLMAdapter from backend.lib.processor import BasicProcessor @@ -660,28 +660,8 @@ def process(self): filename_to_post_ids = {} if save_annotations: try: - self.extract_archived_file_by_name(".metadata.json", self.source_file, staging_area) - with open(staging_area.joinpath(".metadata.json")) as meta_file: - archive_metadata = json.load(meta_file) - for url, data in archive_metadata.items(): - if data.get("success") and data.get("post_ids"): - post_ids = [str(pid) for pid in data["post_ids"]] - # A single URL may map to one filename or multiple files (e.g. video + thumbnail) - filenames_for_url = [] - if data.get("filename"): - filenames_for_url.append(data["filename"]) - for file_entry in data.get("files", []): - if file_entry.get("success") and file_entry.get("filename"): - filenames_for_url.append(file_entry["filename"]) - # Merge post_ids per filename; extend rather than overwrite so that - # multiple URLs pointing to the same file don't lose earlier post_ids. - for filename in filenames_for_url: - existing = filename_to_post_ids.setdefault(filename, []) - for post_id in post_ids: - if post_id not in existing: - existing.append(post_id) - - except (FileNotFoundError, json.JSONDecodeError, KeyError) as e: + filename_to_post_ids = self.source_dataset.read_media_metadata().filename_to_post_ids() + except (FileNotFoundError, MetadataException) as e: self.dataset.log(f"Could not load .metadata.json for annotation mapping: {e}. " f"Annotations will use filenames as item IDs.") diff --git a/processors/machine_learning/pix-plot.py b/processors/machine_learning/pix-plot.py index 4668ff62f..9bc9fba6f 100644 --- a/processors/machine_learning/pix-plot.py +++ b/processors/machine_learning/pix-plot.py @@ -2,7 +2,6 @@ Create an PixPlot of downloaded images """ import shutil -import json import ural from datetime import datetime import csv @@ -11,6 +10,7 @@ from werkzeug.utils import secure_filename from common.lib.dmi_service_manager import DmiServiceManager, DsmOutOfMemory, DmiServiceManagerException +from common.lib.exceptions import MetadataException from common.lib.helpers import UserInput, ellipsiate from backend.lib.processor import BasicProcessor @@ -274,11 +274,11 @@ def format_metadata(self, temp_path): """ # Get image data - if not os.path.isfile(os.path.join(temp_path, '.metadata.json')): - # No metadata + try: + source_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): return False - top_dataset = self.dataset.top_parent() # Check that this is not already a top dataset if top_dataset.key == self.source_dataset.key: @@ -291,38 +291,29 @@ def format_metadata(self, temp_path): # e.g., image dataset uploaded as zip and later filtered via unique_images return False - with open(os.path.join(temp_path, '.metadata.json')) as file: - image_data = json.load(file) - - # Images can belong to multiple posts, so we must build this file as we go + # Images can belong to multiple posts, so we must build this file as we go. + # Keyed by archive filename so the post-loop below can look up images by + # post id without going through the originating URL. images = {} - - # Reformat image_data to access by filename and begin metadata post_id_image_dictionary = {} successful_image_count = 0 - for url, data in image_data.items(): - # Check if image successfully downloaded for image - if data.get('success') and data.get('filename') is not None and data.get('post_ids'): - successful_image_count += 1 - # if no filename, bad metadata; file was not actually downloaded, fixed in 9b603cd1ecdf97fd92c3e1c6200e4b6700dc1e37 - - # dmi_pix_plot API uses secure_filename while pixplot.py (in PixPlot library) uses clean_filename - filename = self.clean_filename(secure_filename(data.get('filename'))) - - for post_id in data.get('post_ids'): - # Add key to post ID dictionary - if post_id in post_id_image_dictionary.keys(): - post_id_image_dictionary[post_id].append(url) - else: - post_id_image_dictionary[post_id] = [url] - - # Add to metadata - images[url] = {'filename': filename, - 'permalink': url, - 'description': 'Number of posts with this image: ' + str(len(data.get('post_ids'))), - 'tags': '', - 'number_of_posts': 0, - } + for archive_filename, item in source_metadata.iter_entries(): + post_ids = item.get("post_ids", []) + if not post_ids: + continue + successful_image_count += 1 + # dmi_pix_plot API uses secure_filename while pixplot.py (in PixPlot library) uses clean_filename + filename = self.clean_filename(secure_filename(archive_filename)) + url = item.get("url", "") + for post_id in post_ids: + post_id_image_dictionary.setdefault(post_id, []).append(archive_filename) + images[archive_filename] = { + 'filename': filename, + 'permalink': url, + 'description': 'Number of posts with this image: ' + str(len(post_ids)), + 'tags': '', + 'number_of_posts': 0, + } self.dataset.log(f"Metadata for {successful_image_count} images collected from {len(post_id_image_dictionary)} posts") diff --git a/processors/machine_learning/text_from_image.py b/processors/machine_learning/text_from_image.py index 8dc7efe9a..c90cb9ef9 100644 --- a/processors/machine_learning/text_from_image.py +++ b/processors/machine_learning/text_from_image.py @@ -11,7 +11,7 @@ from common.lib.dmi_service_manager import DmiServiceManager, DsmOutOfMemory, DmiServiceManagerException, DsmConnectionError from common.lib.helpers import UserInput, hash_to_md5 from backend.lib.processor import BasicProcessor -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException from common.lib.item_mapping import MappedItem __author__ = "Dale Wahl" @@ -135,15 +135,11 @@ def process(self): # Collect filenames and metadata image_filenames = [] skipped_images = 0 - metadata_file = None staging_area = self.dataset.get_staging_area() for image in self.source_dataset.iterate_items(staging_area=staging_area, immediately_delete=False): if self.interrupted: raise ProcessorInterruptedException("Interrupted while unzipping images") - if image.file.name == ".metadata.json": - metadata_file = image.file.name - continue elif image.file.name.split('.')[-1] in ["json", "log"]: continue elif image.file.name.split('.')[-1] == "svg": @@ -210,38 +206,20 @@ def process(self): dmi_service_manager.process_results(output_dir) # Load the metadata from the archive - image_metadata = {} - if metadata_file is None: - try: - self.extract_archived_file_by_name(".metadata.json", self.source_file, staging_area) - metadata_exists = True - except FileNotFoundError: - self.dataset.update_status("No metadata file found") - metadata_exists = False - else: - # Previously extracted - metadata_exists = True - - if metadata_exists: - with open(os.path.join(staging_area, '.metadata.json')) as file: - image_data = json.load(file) - for url, data in image_data.items(): - if data.get('success'): - data.update({"url": url}) - image_metadata[data['filename']] = data + try: + source_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + source_metadata = None + self.dataset.update_status("No metadata file found") # Check if we need to collect data for updating the original dataset save_annotations = self.parameters.get("save_annotations", False) if save_annotations: - if not metadata_exists: + if source_metadata is None: self.dataset.update_status("No metadata file found, cannot write to original dataset") save_annotations = False else: - # Create filename to post id mapping - filename_to_post_id = {} - for url, data in image_data.items(): - if data.get("success"): - filename_to_post_id[data.get("filename")] = data.get("post_ids") + filename_to_post_id = source_metadata.filename_to_post_ids() post_id_to_results = {} # Save files as NDJSON, then use map_item for 4CAT to interact @@ -272,7 +250,7 @@ def process(self): data = { "id": image_name, **result_data, - "image_metadata": image_metadata.get(image_name, {}) if image_metadata else {}, + "image_metadata": (source_metadata.get_entry(image_name) or {}) if source_metadata else {}, } outfile.write(json.dumps(data) + "\n") diff --git a/processors/networks/image-network.py b/processors/networks/image-network.py index ab71e864d..fc8333163 100644 --- a/processors/networks/image-network.py +++ b/processors/networks/image-network.py @@ -1,8 +1,6 @@ """ Make a bipartite Image-Item network """ -import json - from backend.lib.processor import BasicProcessor from common.lib.helpers import hash_file @@ -13,7 +11,7 @@ __maintainer__ = "Stijn Peeters" __email__ = "4cat@oilab.eu" -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException from common.lib.user_input import UserInput @@ -110,49 +108,45 @@ def process(self): column = self.parameters.get("column") hash_type = self.parameters.get("deduplicate") filename_filter = [".metadata.json"] if hash_type == "none" else [] - metadata = None hashed = 0 - # some maps to make sure we use the right value in the right place - # url or filename, original image or duplicate, etc - file_hash_map = {} + try: + metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + return self.dataset.finish_with_error("No valid metadata found in image archive - this processor can only " + "be run on sets of images sourced from another 4CAT dataset.") + + # File maps used during graph construction. With v1 metadata each + # entry is already keyed by filename, so the file_hash_map starts as + # the identity mapping; if image-value is "url" we'll resolve via + # file_url_map at edge-write time. + file_hash_map = {filename: filename for filename, _ in metadata.iter_entries()} hash_file_map = {} seen_hashes = set() id_file_map = {} for image in self.source_dataset.iterate_items(filename_filter=filename_filter): if image.file.name == ".metadata.json": - with image.file.open() as infile: - try: - metadata = json.load(infile) - file_hash_map = {i: v["filename"] for i, v in metadata.items()} if self.parameters.get("image-value") == "url" else {i["filename"]: i["filename"] for i in metadata.values()} - except json.JSONDecodeError: - pass - else: - try: - hashed += 1 - if hashed % 100 == 0: - self.dataset.update_status(f"Generated identity hashes for {hashed:,} of {self.source_dataset.num_rows-1:,} item(s)") - self.dataset.update_progress(hashed / (self.source_dataset.num_rows-1) * 0.5) - file_hash = hash_file(image.file, hash_type) - file_hash_map[image.file.name] = file_hash - if file_hash not in hash_file_map: - hash_file_map[file_hash] = image.file.name - - except (FileNotFoundError, ValueError): - continue - - if not metadata: - return self.dataset.finish_with_error("No valid metadata found in image archive - this processor can only " - "be run on sets of images sourced from another 4CAT dataset.") + continue + try: + hashed += 1 + if hashed % 100 == 0: + self.dataset.update_status(f"Generated identity hashes for {hashed:,} of {self.source_dataset.num_rows-1:,} item(s)") + self.dataset.update_progress(hashed / (self.source_dataset.num_rows-1) * 0.5) + file_hash = hash_file(image.file, hash_type) + file_hash_map[image.file.name] = file_hash + if file_hash not in hash_file_map: + hash_file_map[file_hash] = image.file.name + except (FileNotFoundError, ValueError): + continue - file_url_map = {v["filename"]: u for u, v in metadata.items()} - for url, details in metadata.items(): - for item_id in details.get("post_ids", []): + file_url_map = {filename: item.get("url", "") for filename, item in metadata.iter_entries()} + for filename, item in metadata.iter_entries(): + for item_id in item.get("post_ids", []): if self.source_dataset.type.endswith("-telegram"): # telegram has weird IDs - item_id = "-".join(details["filename"].split("-")[:-1]) + "-" + str(item_id) - id_file_map[item_id] = details["filename"] + item_id = "-".join(filename.split("-")[:-1]) + "-" + str(item_id) + id_file_map[item_id] = filename root_dataset = self.get_root_dataset(self.dataset) self.for_cleanup.append(root_dataset) diff --git a/processors/visualisation/download_images.py b/processors/visualisation/download_images.py index 7bde41b03..0bd1d2d63 100644 --- a/processors/visualisation/download_images.py +++ b/processors/visualisation/download_images.py @@ -259,7 +259,9 @@ def process(self): ua = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0" downloaded_files = set() failures = [] - metadata = {} + metadata = self.dataset.new_media_metadata( + processor_type=self.type, from_dataset=self.source_dataset.key + ) self.dataset.log(f"Filename prep for {len(urls)} URLs") # prepare filenames for each url @@ -422,22 +424,23 @@ def process(self): if failure: failures.append(url) downloaded_file.unlink(missing_ok=True) - - metadata[url] = { - "filename": self.filenames[url], - "url": self.resolve_url(url), - "success": not failure, - "from_dataset": self.source_dataset.key, - "post_ids": item_map[url], - } + metadata.add_failure( + post_ids=item_map[url], + reason="error", + url=url, + ) + else: + metadata.add_item( + filename=self.filenames[url], + post_ids=item_map[url], + url=self.resolve_url(url), + replace=True, + ) if self.complete: break - with self.staging_area.joinpath(".metadata.json").open( - "w", encoding="utf-8" - ) as outfile: - json.dump(metadata, outfile) + metadata.write(self.staging_area) # delete supernumerary partially downloaded files self.flush_proxied_requests() # get rid of remaining queue @@ -450,7 +453,7 @@ def process(self): # finish up self.dataset.update_progress(1.0) self.write_archive_and_finish( - self.staging_area, len([x for x in metadata.values() if x.get("success")]), + self.staging_area, len(metadata), finish=False ) if self.warning_message: @@ -644,20 +647,29 @@ def get_valid_file_extension(self, file): raise InvalidDownloadedFileException(getattr(e, "message", str(e))) from e @staticmethod - def map_metadata(url, data): + def map_metadata(filename, item): """ - Iterator to yield modified metadata for CSV - - :param str url: string that may contain URLs - :param dict data: dictionary with metadata collected previously - :yield dict: iterator containing reformated metadata + Yield CSV row(s) for a successful `items[filename]` entry. """ - row = { - "url": url, - "number_of_posts_with_url": len(data.get("post_ids", [])), - "post_ids": ", ".join(data.get("post_ids", [])), - "filename": data.get("filename"), - "download_successful": data.get("success", ""), + yield { + "url": item.get("url", ""), + "number_of_posts_with_url": len(item.get("post_ids", [])), + "post_ids": ", ".join(item.get("post_ids", [])), + "filename": filename, + "download_successful": True, } - yield row + @staticmethod + def map_failure_metadata(failure): + """ + Yield CSV row(s) for a `failures[]` entry. + """ + yield { + "url": failure.get("url", ""), + "number_of_posts_with_url": len(failure.get("post_ids", [])), + "post_ids": ", ".join(failure.get("post_ids", [])), + "filename": "", + "download_successful": False, + "reason": failure.get("reason", ""), + "reason_description": failure.get("reason_description", ""), + } diff --git a/processors/visualisation/download_telegram_videos.py b/processors/visualisation/download_telegram_videos.py index 270113713..4ce61cdf8 100644 --- a/processors/visualisation/download_telegram_videos.py +++ b/processors/visualisation/download_telegram_videos.py @@ -9,7 +9,6 @@ class attributes to switch behavior for a different media type. """ import asyncio import hashlib -import json import re from collections import Counter @@ -153,14 +152,15 @@ def process(self): """ self.staging_area = self.dataset.get_staging_area() self.eventloop = None - self.metadata = {} + self.metadata = self.dataset.new_media_metadata( + processor_type=self.type, from_dataset=self.source_dataset.key + ) self.reason_counts = Counter() asyncio.run(self.get_media()) # finish up - with self.staging_area.joinpath(".metadata.json").open("w", encoding="utf-8") as outfile: - json.dump(self.metadata, outfile) + self.metadata.write(self.staging_area) self.dataset.update_status(f"Compressing {self._media_label}s") successful = self.reason_counts.get("ok", 0) @@ -318,21 +318,20 @@ async def get_media(self): reason_code, reason_text = self.categorize_download_error(e) self.reason_counts[reason_code] += 1 - if not success: + post_ids = [msg_id] if msg_id else [] + if success: + self.metadata.add_item(filename, post_ids=post_ids, replace=True) + else: msg_id_log = msg_id if msg_id else f"index {media_done:,}" self.dataset.log( f"Skipped {self._media_label} for message {msg_id_log} " f"[{reason_code}]: {reason_text}") self.flawless = False - - self.metadata[filename] = { - "filename": filename, - "success": success, - "reason": reason_code, - "reason_description": reason_text, - "from_dataset": self.source_dataset.key, - "post_ids": [msg_id] if msg_id else [] - } + self.metadata.add_failure( + post_ids=post_ids, + reason=reason_code, + reason_description=reason_text, + ) media_done += 1 except FloodError as e: @@ -361,14 +360,11 @@ async def get_media(self): for mid in message_ids: msg_id_full = f"{entity}-{mid}" self.reason_counts[reason_code] += 1 - self.metadata[msg_id_full] = { - "filename": "", - "success": False, - "reason": reason_code, - "reason_description": reason_text, - "from_dataset": self.source_dataset.key, - "post_ids": [msg_id_full] - } + self.metadata.add_failure( + post_ids=[msg_id_full], + reason=reason_code, + reason_description=reason_text, + ) # end-of-run outcome breakdown into the dataset log so researchers can # see counts per category without parsing per-message lines @@ -381,17 +377,25 @@ async def get_media(self): await client.disconnect() @classmethod - def map_metadata(cls, filename, data): - """ - Iterator to yield modified metadata for CSV - """ - row = { - cls._metadata_count_label: len(data.get("post_ids", [])), - "post_ids": ", ".join(map(str, data.get("post_ids", []))), + def map_metadata(cls, filename, item): + """Yield CSV row(s) for a successful items[filename] entry.""" + yield { + cls._metadata_count_label: len(item.get("post_ids", [])), + "post_ids": ", ".join(map(str, item.get("post_ids", []))), "filename": filename, - "download_successful": data.get('success', ""), - "reason": data.get("reason", ""), - "reason_description": data.get("reason_description", "") + "download_successful": True, + "reason": "ok", + "reason_description": "", } - yield row + @classmethod + def map_failure_metadata(cls, failure): + """Yield CSV row(s) for a failures[] entry.""" + yield { + cls._metadata_count_label: len(failure.get("post_ids", [])), + "post_ids": ", ".join(map(str, failure.get("post_ids", []))), + "filename": "", + "download_successful": False, + "reason": failure.get("reason", ""), + "reason_description": failure.get("reason_description", ""), + } diff --git a/processors/visualisation/download_tiktok.py b/processors/visualisation/download_tiktok.py index aed561930..5792fa44d 100644 --- a/processors/visualisation/download_tiktok.py +++ b/processors/visualisation/download_tiktok.py @@ -3,7 +3,6 @@ """ import asyncio import datetime -import json from io import BytesIO from pathlib import Path from PIL import Image, UnidentifiedImageError @@ -122,7 +121,9 @@ def process(self): urls_to_refresh = [] url_to_item_id = {} max_fails_exceeded = 0 - metadata = {} + metadata = self.dataset.new_media_metadata( + processor_type=self.type, from_dataset=self.source_dataset.key + ) # Loop through items and collect URLs for mapped_item in self.source_dataset.iterate_items(self): @@ -164,12 +165,12 @@ def process(self): downloaded_media += 1 self.dataset.update_status(f"Downloaded image {downloaded_media}/{max_amount}") - metadata[url] = { - "filename": filename, - "success": success, - "from_dataset": self.source_dataset.key, - "post_ids": [post_id] - } + metadata.add_item( + filename, + post_ids=[post_id], + url=url, + replace=True, + ) if refresh_tiktok_urls: @@ -225,29 +226,33 @@ def process(self): filename = '' # Record metadata - metadata[url] = { - "filename": filename, - "success": success, - "from_dataset": self.source_dataset.key, - "post_ids": [post_id] - } - if success: + metadata.add_item( + filename, + post_ids=[post_id], + url=url, + replace=True, + ) self.dataset.update_status(f"Downloaded image for {url}") downloaded_media += 1 - elif not url: - self.dataset.log( - f"No {url_column} identified for {refreshed_mapped_item.get('tiktok_url')}, skipping") else: - self.dataset.log(f"Unable to save image for {url}, skipping") + metadata.add_failure( + post_ids=[post_id], + reason="error", + url=url, + ) + if not url: + self.dataset.log( + f"No {url_column} identified for {refreshed_mapped_item.get('tiktok_url')}, skipping") + else: + self.dataset.log(f"Unable to save image for {url}, skipping") # In case some images failed to download, we update our starting points last_url_index += need_more need_more = max_amount - downloaded_media # Write metadata file - with results_path.joinpath(".metadata.json").open("w", encoding="utf-8") as outfile: - json.dump(metadata, outfile) + metadata.write(results_path) warning = None if downloaded_media < max_amount: @@ -307,20 +312,25 @@ def collect_image(url, user_agent="Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_ return picture, extension @staticmethod - def map_metadata(url, data): - """ - Iterator to yield modified metadata for CSV - - :param str url: string that may contain URLs - :param dict data: dictionary with metadata collected previously - :yield dict: iterator containing reformated metadata - """ - row = { - "url": url, - "number_of_posts_with_url": len(data.get("post_ids", [])), - "post_ids": ", ".join(data.get("post_ids", [])), - "filename": data.get("filename"), - "download_successful": data.get('success', "") + def map_metadata(filename, item): + """Yield CSV row(s) for a successful items[filename] entry.""" + yield { + "url": item.get("url", ""), + "number_of_posts_with_url": len(item.get("post_ids", [])), + "post_ids": ", ".join(item.get("post_ids", [])), + "filename": filename, + "download_successful": True, } - yield row + @staticmethod + def map_failure_metadata(failure): + """Yield CSV row(s) for a failures[] entry.""" + yield { + "url": failure.get("url", ""), + "number_of_posts_with_url": len(failure.get("post_ids", [])), + "post_ids": ", ".join(failure.get("post_ids", [])), + "filename": "", + "download_successful": False, + "reason": failure.get("reason", ""), + "reason_description": failure.get("reason_description", ""), + } diff --git a/processors/visualisation/download_videos.py b/processors/visualisation/download_videos.py index d559006d6..d884d7447 100644 --- a/processors/visualisation/download_videos.py +++ b/processors/visualisation/download_videos.py @@ -4,9 +4,7 @@ First attempt to download via request, but if that fails use yt-dlp. """ import copy -import json import re -import shutil import time import zipfile from pathlib import Path @@ -20,9 +18,9 @@ from backend.lib.processor import BasicProcessor from backend.lib.proxied_requests import FailedProxiedRequest -from common.lib.dataset import DataSet -from common.lib.exceptions import ProcessorInterruptedException, ProcessorException, DataSetException +from common.lib.exceptions import ProcessorInterruptedException, ProcessorException from common.lib.helpers import UserInput, sets_to_lists, url_to_filename +from common.lib.media_archive_library import MediaArchiveLibrary __author__ = "Dale Wahl" __credits__ = ["Dale Wahl"] @@ -341,7 +339,7 @@ def process(self): self.dataset.update_status('Collected %i urls.' % len(urls)) - vid_lib = DatasetVideoLibrary(self.dataset, modules=self.modules) + vid_lib = MediaArchiveLibrary.collect(self.dataset, self.modules, ["video-downloader"]) # Prepare staging area for videos and video tracking results_path = self.dataset.get_staging_area() @@ -534,7 +532,7 @@ def add_to_queue(self, url_list, queue, url_dict, skip_channels=False, copy_if_e :param dict url_dict: Dictionary to update with URL metadata :param bool skip_channels: Whether to skip known channels :param bool copy_if_existing: Whether to attempt to copy video from previous dataset if URL exists in library - :param DatasetVideoLibrary existing_video_library: Video library to check for existing videos if `copy_if_existing` is `True` + :param MediaArchiveLibrary existing_video_library: Video library to check for existing videos if `copy_if_existing` is `True` :param Path copy_output_path: Path to copy videos to if copied from library :return: None """ @@ -556,7 +554,6 @@ def add_to_queue(self, url_list, queue, url_dict, skip_channels=False, copy_if_e # Initialize URL metadata url_dict[url]["success"] = False - url_dict[url]["retry"] = True # Skip known channels if not downloading channels if skip_channels and any([sub_url in url for sub_url in self.known_channels]): @@ -572,39 +569,59 @@ def add_to_queue(self, url_list, queue, url_dict, skip_channels=False, copy_if_e def _try_copy_from_library(self, url, urls_dict, vid_lib, results_path): """ - Try to copy video from previously downloaded library - + Try to copy a video already fetched by a previous downloader run. + :param str url: URL to check :param dict urls_dict: URLs dictionary to update - :param DatasetVideoLibrary vid_lib: Video library instance + :param MediaArchiveLibrary vid_lib: Video library instance :param Path results_path: Path to staging area :return dict: Result with 'copied', 'count', and 'skip' keys """ result = {"copied": False, "count": 0, "skip": False} - - if url not in vid_lib.library: + + hit = vid_lib.find(url) + if hit is None: return result - - previous_vid_metadata = vid_lib.library[url] - - if previous_vid_metadata.get('success', False): - # Use previous downloaded video + + if hit.is_success: + # copy the previously downloaded file(s) instead of re-fetching try: self.dataset.log(f"Copying previously downloaded video for url: {url}") - num_copied = self.copy_previous_video(previous_vid_metadata, results_path, vid_lib.previous_downloaders) - urls_dict[url] = previous_vid_metadata + num_copied = self.copy_previous_video(hit, results_path) + # keep this run's post_ids; fill in the download outcome + urls_dict[url].update(self._copied_url_fields(hit)) self.dataset.update_status("Copied previously downloaded video to current dataset.") result["copied"] = True result["count"] = num_copied except FailedToCopy as e: self.dataset.log(f"{str(e)}; attempting to download again") - elif previous_vid_metadata.get("retry", True) is False: - urls_dict[url] = previous_vid_metadata + elif "not_a_video" in hit.reasons: + # a previous run established this URL is not a video; don't retry + urls_dict[url]["success"] = False + urls_dict[url]["reason"] = "not_a_video" self.dataset.log(f"Skipping; previously identified url as not a video: {url}") result["skip"] = True - + return result + @staticmethod + def _copied_url_fields(hit): + """ + Build the `urls_dict` fields for a URL whose file(s) were copied from + a previous archive, from a successful `MediaLibraryHit`. + """ + files = [] + downloader = None + for filename, item in hit.entries: + extra = dict(item.get("extra") or {}) + # `downloader` lived at URL level in the working dict, not per file + downloader = downloader or extra.pop("downloader", None) + files.append({"filename": filename, "success": True, "metadata": extra}) + fields = {"success": True, "files": files} + if downloader: + fields["downloader"] = downloader + return fields + def _process_direct_downloads(self, url_list, urls_dict, results_path, max_video_size, also_indirect, amount, last_domains, ignore_not_video): """ @@ -779,7 +796,7 @@ def _handle_direct_download_response(self, url, response, urls_dict, results_pat else: self.dataset.log(f"NotVideoLinkError: {str(e)}") urls_dict[url]["error"] = str(e) - urls_dict[url]["retry"] = False + urls_dict[url]["reason"] = "not_a_video" result["not_a_video"] = True if last_domains.count(domain) >= 2: @@ -793,7 +810,7 @@ def _handle_direct_download_response(self, url, response, urls_dict, results_pat except NotAVideo as e: self.dataset.log(f"Request Error: {str(e)}") urls_dict[url]["error"] = str(e) - urls_dict[url]["retry"] = False + urls_dict[url]["reason"] = "not_a_video" result["not_a_video"] = True if last_domains.count(domain) >= 2: @@ -958,16 +975,41 @@ def _download_single_ytdlp(self, url, ydl_opts, urls_dict, yt_dlp_archive_map): return result def _save_metadata(self, urls_dict, results_path): - """Save metadata to JSON file""" + """Save metadata to .metadata.json in the staging area.""" self.dataset.update_status("Updating and saving metadata") - metadata = { - url: { - "from_dataset": self.source_dataset.key, - **sets_to_lists(data) - } for url, data in urls_dict.items() - } - with results_path.joinpath(".metadata.json").open("w", encoding="utf-8") as outfile: - json.dump(metadata, outfile) + metadata = self.dataset.new_media_metadata( + processor_type=self.type, from_dataset=self.source_dataset.key + ) + for url, data in urls_dict.items(): + data = sets_to_lists(data) + post_ids = list(data.get("post_ids", [])) + files = data.get("files") or [] + had_successful_file = False + for file in files: + if not isinstance(file, dict): + continue + filename = file.get("filename") + if not filename or not file.get("success", True): + continue + extra = dict(file.get("metadata") or {}) + if data.get("downloader"): + extra["downloader"] = data["downloader"] + # replace=True: yt-dlp playlists can in theory produce duplicate + # filenames if the same video appears twice; last write wins, + # matching the physical file on disk + metadata.add_item(filename, post_ids=post_ids, url=url, + extra=extra, replace=True) + had_successful_file = True + + if not had_successful_file: + metadata.add_failure( + post_ids=post_ids, + reason=data.get("reason") or "error", + reason_description=data.get("error", "") or "", + url=url, + ) + + metadata.write(results_path) def _log_statistics(self, total_urls): """Log comprehensive download statistics""" @@ -1239,143 +1281,71 @@ def identify_video_urls_in_string(self, text): urls |= set([url for url in urls_from_text(string)]) return list(urls) - def copy_previous_video(self, previous_vid_metadata, staging_area, previous_downloaders): - """Copy existing video to new staging area""" - num_copied = 0 - dataset_key = previous_vid_metadata.get("file_dataset_key") - dataset = [dataset for dataset in previous_downloaders if dataset.key == dataset_key] - - if "files" in previous_vid_metadata: - files = previous_vid_metadata.get('files') - elif "filename" in previous_vid_metadata: - files = [{"filename": previous_vid_metadata.get("filename"), "success": True}] - else: - raise FailedToCopy("Unable to read video metadata") + def copy_previous_video(self, hit, staging_area): + """ + Copy the file(s) of a previously downloaded video into the current + staging area. + + :param MediaLibraryHit hit: a successful library lookup; its + `metadata.dataset` locates the source archive and `entries` + lists the files to copy. + :param Path staging_area: where to extract the files to. + :return int: number of files copied. + """ + source_dataset = hit.metadata.dataset + if source_dataset is None: + raise FailedToCopy("Previous metadata is not bound to a dataset") - if not files: + filenames = [filename for filename, _ in hit.entries] + if not filenames: raise FailedToCopy("No file found in metadata") - if not dataset: - raise FailedToCopy(f"Dataset with key {dataset_key} not found") - else: - dataset = dataset[0] - - with zipfile.ZipFile(dataset.get_results_path(), "r") as archive_file: - archive_contents = sorted(archive_file.namelist()) - - for file in files: - if file.get("filename") not in archive_contents: - raise FailedToCopy(f"Previously downloaded video {file.get('filename')} not found") - - self.dataset.log(f"Copying previously downloaded video {file.get('filename')} to new staging area") - archive_file.extract(file.get("filename"), staging_area) + num_copied = 0 + with zipfile.ZipFile(source_dataset.get_results_path(), "r") as archive_file: + archive_contents = set(archive_file.namelist()) + for filename in filenames: + if filename not in archive_contents: + raise FailedToCopy(f"Previously downloaded video {filename} not found") + self.dataset.log(f"Copying previously downloaded video {filename} to new staging area") + archive_file.extract(filename, staging_area) num_copied += 1 return num_copied - @staticmethod - def map_metadata(url, data): - """Iterator to yield modified metadata for CSV""" + YT_DLP_EXTRACTED_FIELDS = ( + "title", "artist", "description", "view_count", "like_count", + "repost_count", "comment_count", "uploader", "creator", "uploader_id", + ) + + @classmethod + def map_metadata(cls, filename, item): + """Yield CSV row(s) for a successful items[filename] entry.""" + extra = item.get("extra") or {} row = { - "url": url, - "number_of_posts_with_url": len(data.get("post_ids", [])), - "post_ids": ", ".join(data.get("post_ids", [])), - "downloader": data.get("downloader", ""), - "download_successful": data.get('success', "") + "url": item.get("url", ""), + "number_of_posts_with_url": len(item.get("post_ids", [])), + "post_ids": ", ".join(item.get("post_ids", [])), + "downloader": extra.get("downloader", ""), + "download_successful": True, + "filename": filename, } + for field in cls.YT_DLP_EXTRACTED_FIELDS: + row[f"extracted_{field}"] = extra.get(field, "N/A") + row["error"] = "N/A" + yield row - for file in data.get("files", [{}]): - row["filename"] = file.get("filename", "N/A") - yt_dlp_data = file.get("metadata", {}) - for common_column in ["title", "artist", "description", "view_count", "like_count", "repost_count", "comment_count", "uploader", "creator", "uploader_id"]: - if yt_dlp_data: - row[f"extracted_{common_column}"] = yt_dlp_data.get(common_column) - else: - row[f"extracted_{common_column}"] = "N/A" - row["error"] = data.get("error", "N/A") - yield row - - -class DatasetVideoLibrary: - """ - Library for managing video downloads across multiple processors - """ - def __init__(self, current_dataset, modules): - self.modules = modules - self.current_dataset = current_dataset - self.previous_downloaders = self.collect_previous_downloaders() - self.current_dataset.log(f"Previously video downloaders: {[downloader.key for downloader in self.previous_downloaders]}") - - metadata_files = self.collect_all_metadata_files() - - # Build library - library = {} - for metadata_file in metadata_files: - for url, data in metadata_file[1].items(): - if data.get("success", False): - # Always overwrite for success - library[url] = { - **data, - "file_dataset_key": metadata_file[0] - } - elif url not in library: - # Do not overwrite failures, but do add if missing - library[url] = { - **data, - "file_dataset_key": metadata_file[0] - } - - self.current_dataset.log(f"Total URLs previously seen: {len(library)}") - self.library = library - - def collect_previous_downloaders(self): - """ - Check for other video-downloader processors run on the dataset and create library for reference - """ - # NOTE: this only checks parent dataset, not full ancestry (e.g. other filters with video downloaders) - parent_dataset = self.current_dataset.get_parent() - # Note: exclude current dataset - previous_downloaders = [child for child in parent_dataset.get_children() if - (child.type in ["video-downloader"] and child.key != self.current_dataset.key)] - - # Check to see if filtered dataset - if "copied_from" in parent_dataset.parameters and parent_dataset.is_top_dataset(): - try: - original_dataset = DataSet(key=parent_dataset.parameters["copied_from"], db=self.current_dataset.db, modules=self.modules) - previous_downloaders += [child for child in original_dataset.top_parent().get_children() if - (child.type in ["video-downloader"] and child.key != self.current_dataset.key)] - except DataSetException: - # parent dataset no longer exists! - pass - - return previous_downloaders - - def collect_metadata_file(self, dataset, staging_area): - """Collect metadata from a dataset's video archive""" - source_file = dataset.get_results_path() - if not source_file.exists(): - return None - - with zipfile.ZipFile(dataset.get_results_path(), "r") as archive_file: - archive_contents = sorted(archive_file.namelist()) - if '.metadata.json' not in archive_contents: - return None - - archive_file.extract(".metadata.json", staging_area) - - with open(staging_area.joinpath(".metadata.json")) as file: - return json.load(file) - - def collect_all_metadata_files(self): - """Collect all metadata files from previous downloaders""" - metadata_staging_area = self.current_dataset.get_staging_area() - - metadata_files = [(downloader.key, self.collect_metadata_file(downloader, metadata_staging_area)) - for downloader in self.previous_downloaders] - metadata_files = [file for file in metadata_files if file[1] is not None] - self.current_dataset.log(f"Metadata files collected: {len(metadata_files)}; with {[len(urls[1]) for urls in metadata_files]}") - - # Delete staging area - shutil.rmtree(metadata_staging_area) - - return metadata_files \ No newline at end of file + @classmethod + def map_failure_metadata(cls, failure): + """Yield CSV row(s) for a failures[] entry.""" + row = { + "url": failure.get("url", ""), + "number_of_posts_with_url": len(failure.get("post_ids", [])), + "post_ids": ", ".join(failure.get("post_ids", [])), + "downloader": "", + "download_successful": False, + "filename": "", + } + for field in cls.YT_DLP_EXTRACTED_FIELDS: + row[f"extracted_{field}"] = "N/A" + row["error"] = failure.get("reason_description") or failure.get("reason") or "N/A" + yield row diff --git a/processors/visualisation/image_category_wall.py b/processors/visualisation/image_category_wall.py index 7029c86fb..c62b04f2d 100644 --- a/processors/visualisation/image_category_wall.py +++ b/processors/visualisation/image_category_wall.py @@ -3,7 +3,6 @@ """ import io import base64 -import json import math from svgwrite.image import Image as ImageElement @@ -16,7 +15,7 @@ from common.lib.helpers import UserInput, convert_to_int, get_4cat_canvas from backend.lib.processor import BasicProcessor -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException __author__ = "Dale Wahl" __credits__ = ["Dale Wahl", "Stijn Peeters"] @@ -204,17 +203,16 @@ def process(self): filename_map = {filename.stem: [filename] for filename in staging_area.iterdir()} else: # Use image metadata to map post IDs to filenames - with open(staging_area.joinpath('.metadata.json')) as file: - image_data = json.load(file) + try: + metadata = image_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + self.dataset.finish_with_error("No metadata file found") + return filename_map = {} # Images can belong to multiple posts; posts can have multiple images - for image in image_data.values(): - if image.get("success"): - for post_id in image.get("post_ids"): - if post_id not in filename_map: - filename_map[post_id] = [staging_area.joinpath(image.get("filename"))] - else: - filename_map[post_id].append(staging_area.joinpath(image.get("filename"))) + for filename, item in metadata.iter_entries(): + for post_id in item.get("post_ids", []): + filename_map.setdefault(post_id, []).append(staging_area.joinpath(filename)) # Organize posts into categories category_type = None diff --git a/processors/visualisation/image_wall_w_text.py b/processors/visualisation/image_wall_w_text.py index 27a02b301..0bf046c8a 100644 --- a/processors/visualisation/image_wall_w_text.py +++ b/processors/visualisation/image_wall_w_text.py @@ -3,7 +3,6 @@ """ import io import base64 -import json import math import textwrap @@ -17,7 +16,7 @@ from common.lib.helpers import UserInput, convert_to_int, get_4cat_canvas from backend.lib.processor import BasicProcessor -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException __author__ = "Dale Wahl" __credits__ = ["Dale Wahl", "Stijn Peeters"] @@ -145,21 +144,19 @@ def process(self): max_text_len = 0 filename_to_text_mapping = {} if text_dataset.type in ImageTextWallGenerator.combined_dataset: - # For datasets with both images and text, use .metadata.json - metadata_file = self.extract_archived_file_by_name(".metadata.json", self.source_file) - if metadata_file is None: + # For datasets with both images and text (Stable Diffusion output), + # the prompt is stored as item.extra.prompt in the archive metadata. + try: + metadata = text_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): self.dataset.finish_with_error("No metadata file found") return - with metadata_file.open() as f: - metadata = json.load(f) - for item in metadata.values(): + for filename, item in metadata.iter_entries(): if self.interrupted: raise ProcessorInterruptedException("Interrupted while collecting text") - if "filename" in item: - # "image-downloader-stable-diffusion" datasets - image_text = item.get("prompt", "") - max_text_len = max(max_text_len, len(image_text)) - filename_to_text_mapping[item["filename"]] = image_text + image_text = (item.get("extra") or {}).get("prompt", "") + max_text_len = max(max_text_len, len(image_text)) + filename_to_text_mapping[filename] = image_text else: # For datasets with separate images and text for item in text_dataset.iterate_items(self): diff --git a/processors/visualisation/video_hasher.py b/processors/visualisation/video_hasher.py index ae4a84aa6..b31139780 100644 --- a/processors/visualisation/video_hasher.py +++ b/processors/visualisation/video_hasher.py @@ -5,7 +5,6 @@ https://ffmpeg.org/ """ import csv -import json import shutil import zipfile @@ -16,7 +15,7 @@ from backend.lib.processor import BasicProcessor from backend.lib.preset import ProcessorAdvancedPreset -from common.lib.exceptions import ProcessorInterruptedException, ProcessorException +from common.lib.exceptions import ProcessorInterruptedException, ProcessorException, MetadataException from common.lib.user_input import UserInput __author__ = "Dale Wahl" @@ -203,7 +202,10 @@ def process(self): output_dir = self.dataset.get_staging_area() video_hashes = {} - video_metadata = None + try: + video_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + video_metadata = None total_possible_videos = max((min(self.source_dataset.num_rows - 1, max_videos) if max_videos != 0 else self.source_dataset.num_rows), 1) processed_videos = 0 @@ -216,9 +218,6 @@ def process(self): break if video.file.name == '.metadata.json': - # Keep it and move on - with video.file.open() as file: - video_metadata = json.load(file) continue elif video.file.name == "video_archive": # yt-dlp file @@ -261,16 +260,6 @@ def process(self): num_posts = 0 rows = [] annotations = [] - if video_metadata is None: - # Grab the metadata directly, if it exists but was skipped (e.g., not found prior to max_videos) - try: - metadata_path = self.extract_archived_file_by_name(".metadata.json", self.source_file, output_dir) - except FileNotFoundError: - metadata_path = None - if metadata_path: - with open(metadata_path) as file: - video_metadata = json.load(file) - if video_metadata is None: self.dataset.log( "No video metadata (i.e., from video downloader) found; unable to connect original posts. Saving video hashes only.") @@ -287,43 +276,31 @@ def process(self): num_posts += 1 else: self.dataset.update_status("Saving video hash results") - for url, data in video_metadata.items(): - if not data.get("success"): - continue - if "files" in data: - files = data.get('files') - elif "filename" in data: - files = [{"filename": data.get("filename"), "success": True}] - else: - self.dataset.log(f"Metadata Error: {url} with {data}") + from_dataset = video_metadata.from_dataset + for filename, item in video_metadata.iter_entries(): + if filename not in video_hashes: continue + video_hash = video_hashes[filename].get('videohash') + post_ids = item.get("post_ids", []) + rows.append({ + 'id': filename, + 'url': item.get("url", ""), + "from_dataset": from_dataset, + 'video_hash': video_hash.hash, + 'video_duration': video_hash.video_duration, + 'video_count': len(post_ids), + "post_ids": ','.join(str(p) for p in post_ids), + 'video_collage_filename': video_hashes[filename].get('video_collage_filename'), + }) + if save_annotations: + for item_id in post_ids: + annotations.append({ + "label": "video-hash", + "value": video_hash.hash, + "item_id": item_id + }) - for file in files: - if not file.get("success"): - continue - if file.get('filename') not in video_hashes: - self.dataset.log(f"Metadata Error: {file.get('filename')} with {url} - {data}") - continue - video_hash = video_hashes[file.get('filename')].get('videohash') - rows.append({ - 'id': file.get('filename'), # best if all datasets have unique identifier - 'url': url, - "from_dataset": data.get("from_dataset"), - 'video_hash': video_hash.hash, - 'video_duration': video_hash.video_duration, - 'video_count': len(data.get('post_ids', [])), - "post_ids": ','.join([str(post_id) for post_id in data.get("post_ids", [])]), - 'video_collage_filename': video_hashes[file.get('filename')].get('video_collage_filename'), - }) - if save_annotations: - for item_id in data.get("post_ids", []): - annotations.append({ - "label": "video-hash", - "value": video_hash.hash, - "item_id": item_id - }) - - num_posts += 1 + num_posts += 1 writer = None diff --git a/processors/visualisation/video_scene_identifier.py b/processors/visualisation/video_scene_identifier.py index 904620b0c..86704342d 100644 --- a/processors/visualisation/video_scene_identifier.py +++ b/processors/visualisation/video_scene_identifier.py @@ -1,12 +1,11 @@ """ Detect scenes in videos """ -import json import os from scenedetect import open_video, SceneManager, VideoOpenFailure from backend.lib.processor import BasicProcessor -from common.lib.exceptions import ProcessorInterruptedException, ProcessorException +from common.lib.exceptions import ProcessorInterruptedException, ProcessorException, MetadataException from common.lib.user_input import UserInput __author__ = "Dale Wahl" @@ -185,17 +184,16 @@ def process(self): self.dataset.update_status("Detecting video scenes") total_possible_videos = self.source_dataset.num_rows - deduct_metadata # exclude metadata file on UNIX processed_videos = 0 - video_metadata = None + try: + video_metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + video_metadata = None collected_scenes = {} for original_video in self.source_dataset.iterate_items(self, immediately_delete=False): if self.interrupted: raise ProcessorInterruptedException("Interrupted while detecting video scenes") - # Check for 4CAT's metadata JSON and copy it if original_video.file.name == ".metadata.json": - # Keep it and move on - with open(original_video.file) as file: - video_metadata = json.load(file) continue elif original_video.file.name == "video_archive": # yt-dlp file @@ -275,39 +273,29 @@ def process(self): num_posts += 1 else: self.dataset.update_status("Saving video scene results") - for url, video_data in video_metadata.items(): - if video_data.get('success'): - files = video_data.get('files') if 'files' in video_data else [{"filename": video_data.get("filename"), "success":True}] - for file in files: - if not file.get("success") or file.get("filename") not in collected_scenes: - continue - - # List types are not super fun for CSV - if 'post_ids' in video_data: - video_data['post_ids'] = ','.join(video_data['post_ids']) - + from_dataset = video_metadata.from_dataset + for filename, item in video_metadata.iter_entries(): + if filename not in collected_scenes: + continue + post_ids = item.get("post_ids", []) + post_ids_csv = ','.join(post_ids) + for i, scene in enumerate(collected_scenes[filename]): + rows.append({ + 'id': filename + '_scene_' + str(i + 1), + 'url': item.get("url", ""), + "from_dataset": from_dataset, + **scene, + "post_ids": post_ids_csv, + }) + num_posts += 1 - for i, scene in enumerate(collected_scenes[file.get('filename')]): - rows.append({ - 'id': file.get('filename') + '_scene_' + str(i+1), # best if all datasets have unique identifier - 'url': url, - "from_dataset": video_data.get("from_dataset"), - **scene, - "post_ids": ','.join(video_data.get("post_ids", [])), + if save_annotations and i == 0: + for item_id in post_ids: + annotations.append({ + "label": "scene_amount", + "value": scene.get("num_scenes_detected", ""), + "item_id": item_id, }) - num_posts += 1 - - # Write amount of scenes for first scene detected - if save_annotations and i == 0: - item_ids = video_data.get("post_ids", []) - item_ids = [item_ids] if isinstance(item_ids, str) else item_ids - for item_id in item_ids: - annotation = { - "label": "scene_amount", - "value": scene.get("num_scenes_detected", ""), - "item_id": item_id - } - annotations.append(annotation) if save_annotations and annotations: self.save_annotations(annotations) diff --git a/processors/visualisation/video_timelines.py b/processors/visualisation/video_timelines.py index a57ff2ea7..b479dc4e8 100644 --- a/processors/visualisation/video_timelines.py +++ b/processors/visualisation/video_timelines.py @@ -2,7 +2,6 @@ Create timelines from collections of video frames """ import base64 -import json import io from PIL import Image @@ -14,7 +13,7 @@ from ural import is_url from backend.lib.processor import BasicProcessor -from common.lib.exceptions import ProcessorInterruptedException +from common.lib.exceptions import ProcessorInterruptedException, MetadataException from common.lib.user_input import UserInput from common.lib.helpers import get_4cat_canvas from common.lib.dataset import DataSet @@ -78,7 +77,10 @@ def is_compatible_with(cls, module=None, config=None): return module.type in ["video-frames", "video-scene-frames"] def process(self): - metadata = {} + try: + metadata = self.source_dataset.read_media_metadata() + except (FileNotFoundError, MetadataException): + metadata = None base_height = self.parameters.get("height", 100) fontsize = 12 @@ -109,11 +111,9 @@ def process(self): # final iteration in which we finish things up (see below) looping = False - # there is a metadata file, always read first, which we can use + # skip the metadata file (already read via read_media_metadata) if item.file.name == ".metadata.json": - with item.file.open() as infile: - metadata = json.load(infile) - continue + continue # skip if file is a real file but not an image if looping and item.file.suffix not in (".jpeg", ".jpg", ".png", ".gif"): @@ -205,25 +205,23 @@ def get_video_labels(self, metadata): determine an appropriate label. There is a generalised heuristic and some data source-specific pathways. - :param metadata: Metadata as parsed from the 'Extract Frames' JSON + :param MediaArchiveMetadata metadata: Loaded archive metadata. :return dict: Filename -> label mapping """ - mapping_dataset = {} - mapping_ids = {} - labels = {} - if not metadata: return {} - for url, data in metadata.items(): - if data.get('success'): - for filename in [f["filename"] for f in data.get("files", [])]: - filename = ".".join(filename.split(".")[:-1]) - mapping_ids[filename] = data["post_ids"] - if data.get("from_dataset", data.get("source_dataset")) not in mapping_dataset: - mapping_dataset[data.get("from_dataset", data.get("source_dataset"))] = [] - mapping_dataset[data.get("from_dataset", data.get("source_dataset"))].append(filename) - labels[filename] = filename + mapping_dataset = {} + mapping_ids = {} + labels = {} + from_dataset = metadata.from_dataset + + for filename, item in metadata.iter_entries(): + basename = filename.rsplit(".", 1)[0] + mapping_ids[basename] = item.get("post_ids", []) + if from_dataset: + mapping_dataset.setdefault(from_dataset, []).append(basename) + labels[basename] = basename for dataset, urls in mapping_dataset.items(): dataset = DataSet(key=dataset, db=self.db, modules=self.modules).nearest("*-search") diff --git a/tests/test_archive_metadata.py b/tests/test_archive_metadata.py new file mode 100644 index 000000000..a10a22729 --- /dev/null +++ b/tests/test_archive_metadata.py @@ -0,0 +1,367 @@ +""" +Tests for `common.lib.archive_metadata.MediaArchiveMetadata`. + +Covers v1 round-trips, legacy format normalization (URL-keyed flat, +filename-keyed flat, video-style `files[]`, singular `post_id`), validation, +and the convenience helpers consumers rely on. +""" +import json +import zipfile + +import pytest + +from common.lib.archive_metadata import MediaArchiveMetadata +from common.lib.exceptions import MetadataException + +CURRENT_SCHEMA_VERSION = MediaArchiveMetadata.SCHEMA_VERSION + + +class FakeDataset: + """Just enough surface for MediaArchiveMetadata.read.""" + + def __init__(self, key, results_path, results_folder=None): + self.key = key + self._results = results_path + self._results_folder = results_folder + + def check_dataset_finished(self): + return self._results + + def get_results_folder_path(self): + return self._results_folder + + def get_own_processor(self): + return None + + +# -- v1 writer / reader round-trip -- + +def test_round_trip_minimal(tmp_path): + meta = MediaArchiveMetadata.new(processor_type="image-downloader", from_dataset="src123") + meta.add_item("a.jpg", post_ids=["p1", "p2"], url="https://example.com/a") + meta.add_item("b.jpg", post_ids=["p3"]) + meta.add_failure(post_ids=["p4"], reason="error", + reason_description="boom", url="https://example.com/c") + + target = meta.write(tmp_path) + assert target == tmp_path / ".metadata.json" + + raw = json.loads(target.read_text()) + assert raw["schema_version"] == 1 + assert raw["processor_type"] == "image-downloader" + assert raw["from_dataset"] == "src123" + assert set(raw["items"]) == {"a.jpg", "b.jpg"} + assert raw["items"]["a.jpg"] == { + "filename": "a.jpg", + "post_ids": ["p1", "p2"], + "url": "https://example.com/a", + } + assert raw["failures"] == [{ + "post_ids": ["p4"], + "reason": "error", + "url": "https://example.com/c", + "reason_description": "boom", + }] + + +def test_read_v1_from_zip(tmp_path): + meta = MediaArchiveMetadata.new(processor_type="image-downloader", from_dataset="src") + meta.add_item("a.jpg", post_ids=["1"], url="u") + meta.write(tmp_path) + + archive = tmp_path / "results.zip" + with zipfile.ZipFile(archive, "w") as zf: + zf.write(tmp_path / ".metadata.json", ".metadata.json") + + loaded = MediaArchiveMetadata.read(FakeDataset("dskey", archive)) + assert loaded.schema_version == CURRENT_SCHEMA_VERSION + assert loaded.processor_type == "image-downloader" + assert loaded.from_dataset == "src" + assert loaded.get_entry("a.jpg")["post_ids"] == ["1"] + + +def test_read_v1_from_folder(tmp_path): + meta = MediaArchiveMetadata.new(processor_type="image-downloader", from_dataset="src") + meta.add_item("a.jpg", post_ids=["1"]) + folder = tmp_path / "folder_dskey" + folder.mkdir() + meta.write(folder) + + ds = FakeDataset("dskey", tmp_path / "results.csv", results_folder=folder) + loaded = MediaArchiveMetadata.read(ds) + assert "a.jpg" in loaded + assert len(loaded) == 1 + + +# -- legacy normalization -- + +def test_legacy_url_keyed_flat(tmp_path): + """Image-downloader style: URL keys, flat filenames, success bool.""" + legacy = { + "https://example.com/a.jpg": { + "filename": "a.jpg", + "success": True, + "from_dataset": "src", + "post_ids": ["p1", "p2"], + "url": "https://example.com/a.jpg", + }, + "https://example.com/b.jpg": { + "filename": "b.jpg", + "success": False, + "from_dataset": "src", + "post_ids": ["p3"], + }, + } + path = tmp_path / ".metadata.json" + path.write_text(json.dumps(legacy)) + + # Easiest to invoke `_populate_from_raw` directly rather than building a + # fake on-disk archive. + m = MediaArchiveMetadata() + m._populate_from_raw(legacy) + + assert m.from_dataset == "src" + assert m.get_entry("a.jpg")["post_ids"] == ["p1", "p2"] + assert m.get_entry("a.jpg")["url"] == "https://example.com/a.jpg" + assert "b.jpg" not in m + assert len(m.failures) == 1 + assert m.failures[0]["post_ids"] == ["p3"] + assert m.failures[0]["url"] == "https://example.com/b.jpg" + assert m.failures[0]["reason"] == "error" + + +def test_legacy_video_files_array(tmp_path): + """Video-downloader style: URL keys, nested files[] with per-file metadata.""" + legacy = { + "https://example.com/playlist": { + "success": True, + "from_dataset": "src", + "post_ids": ["p1"], + "downloader": "yt_dlp", + "files": [ + {"filename": "vid1.mp4", "success": True, + "metadata": {"title": "One", "view_count": 100}}, + {"filename": "vid2.mp4", "success": True, + "metadata": {"title": "Two"}}, + ], + }, + "https://example.com/dead": { + "success": False, + "from_dataset": "src", + "post_ids": ["p2"], + "error": "404 Not Found", + }, + } + m = MediaArchiveMetadata() + m._populate_from_raw(legacy) + + assert set(m.items) == {"vid1.mp4", "vid2.mp4"} + assert m.get_entry("vid1.mp4")["url"] == "https://example.com/playlist" + assert m.get_entry("vid1.mp4")["post_ids"] == ["p1"] + # per-file yt-dlp metadata is flattened in, and the outer-level + # `downloader` is carried onto every file so nothing is lost + assert m.get_entry("vid1.mp4")["extra"] == { + "downloader": "yt_dlp", "title": "One", "view_count": 100, + } + assert m.get_entry("vid2.mp4")["extra"] == { + "downloader": "yt_dlp", "title": "Two", + } + assert m.get_entry("vid2.mp4")["post_ids"] == ["p1"] + + # url is shared across the playlist's outputs + assert m.get_entry("vid2.mp4")["url"] == "https://example.com/playlist" + + # failure preserved with description from `error` + assert len(m.failures) == 1 + assert m.failures[0]["url"] == "https://example.com/dead" + assert m.failures[0]["reason_description"] == "404 Not Found" + + +def test_legacy_telegram_filename_keyed(): + """Telegram style: filename keys, structured reason codes.""" + legacy = { + "chat-100.mp4": { + "filename": "chat-100.mp4", "success": True, + "from_dataset": "src", "post_ids": ["chat-100"], + "reason": "ok", "reason_description": "downloaded", + }, + "chat-101.mp4": { + "filename": "chat-101.mp4", "success": False, + "from_dataset": "src", "post_ids": ["chat-101"], + "reason": "restricted_channel", + "reason_description": "Telegram refused: channel restrictions.", + }, + } + m = MediaArchiveMetadata() + m._populate_from_raw(legacy) + + assert "chat-100.mp4" in m + assert "chat-101.mp4" not in m + assert m.failures[0]["reason"] == "restricted_channel" + assert m.failures[0]["post_ids"] == ["chat-101"] + # no URL stored for telegram entries + assert "url" not in m.failures[0] + + +def test_legacy_singular_post_id(): + legacy = { + "https://example.com/a.jpg": { + "filename": "a.jpg", "success": True, "post_id": "p1", + }, + } + m = MediaArchiveMetadata() + m._populate_from_raw(legacy) + assert m.get_entry("a.jpg")["post_ids"] == ["p1"] + + +def test_legacy_unsupported_schema_version_raises(): + with pytest.raises(MetadataException, match="Unsupported"): + m = MediaArchiveMetadata() + m._populate_from_raw({"schema_version": 99, "items": {}}) + + +def test_legacy_single_file_preserves_top_level_extra(): + """Pre-v1 files kept producer data (e.g. the SD prompt) at the top level.""" + legacy = { + "p1": { + "filename": "p1-cat.jpeg", "success": True, "post_ids": ["p1"], + "from_dataset": "src", + "prompt": "a cat", "negative-prompt": "a dog", + }, + } + m = MediaArchiveMetadata() + m._populate_from_raw(legacy) + assert m.get_entry("p1-cat.jpeg")["extra"] == { + "prompt": "a cat", "negative-prompt": "a dog", + } + + +def test_legacy_failure_preserves_extra(): + """Producer data on a failed legacy entry survives under failure['extra'].""" + legacy = { + "https://example.com/playlist": { + "success": True, "from_dataset": "src", "post_ids": ["p1"], + "downloader": "yt_dlp", + "files": [ + {"filename": "good.mp4", "success": True, "metadata": {"title": "ok"}}, + {"filename": "", "success": False, "error": "geo-blocked", + "region": "NL"}, + ], + }, + } + m = MediaArchiveMetadata() + m._populate_from_raw(legacy) + + assert len(m.failures) == 1 + failure = m.failures[0] + assert failure["reason_description"] == "geo-blocked" + # the outer `downloader` and the per-file `region` are both kept; the + # promoted reason/error keys are not duplicated into extra + assert failure["extra"] == {"downloader": "yt_dlp", "region": "NL"} + + +# -- helper API -- + +def test_filename_to_post_ids(): + m = MediaArchiveMetadata.new(processor_type="x", from_dataset="y") + m.add_item("a.jpg", post_ids=["1", "2"]) + m.add_item("b.jpg", post_ids=["3"]) + assert m.filename_to_post_ids() == {"a.jpg": ["1", "2"], "b.jpg": ["3"]} + + +def test_post_ids_for(): + m = MediaArchiveMetadata.new(processor_type="x", from_dataset="y") + m.add_item("a.jpg", post_ids=["1"]) + assert m.post_ids_for("a.jpg") == ["1"] + assert m.post_ids_for("missing.jpg") == [] + + +def test_add_item_normalizes_post_ids(): + m = MediaArchiveMetadata.new(processor_type="x", from_dataset="y") + m.add_item("a.jpg", post_ids="p1") # bare string + m.add_item("b.jpg", post_ids=42) # bare int + m.add_item("c.jpg", post_ids=[1, 2, 3]) # ints in list + assert m.get_entry("a.jpg")["post_ids"] == ["p1"] + assert m.get_entry("b.jpg")["post_ids"] == ["42"] + assert m.get_entry("c.jpg")["post_ids"] == ["1", "2", "3"] + + +def test_add_item_duplicate_raises(): + m = MediaArchiveMetadata.new(processor_type="x", from_dataset="y") + m.add_item("a.jpg", post_ids=["1"]) + with pytest.raises(MetadataException, match="already present"): + m.add_item("a.jpg", post_ids=["2"]) + + +def test_add_item_replace_overwrites(): + m = MediaArchiveMetadata.new(processor_type="x", from_dataset="y") + m.add_item("a.jpg", post_ids=["1"]) + m.add_item("a.jpg", post_ids=["2"], replace=True) + assert m.post_ids_for("a.jpg") == ["2"] + + +# -- validation -- + +def test_validate_rejects_filename_mismatch(tmp_path): + m = MediaArchiveMetadata.new(processor_type="x", from_dataset="y") + m.items["a.jpg"] = {"filename": "b.jpg", "post_ids": []} + with pytest.raises(MetadataException, match="must match its key"): + m.validate() + + +def test_validate_rejects_non_list_post_ids(): + m = MediaArchiveMetadata.new(processor_type="x", from_dataset="y") + m.items["a.jpg"] = {"filename": "a.jpg", "post_ids": "p1"} + with pytest.raises(MetadataException, match="post_ids must be a list"): + m.validate() + + +def test_validate_rejects_failure_without_reason(): + m = MediaArchiveMetadata.new(processor_type="x", from_dataset="y") + m.failures.append({"post_ids": [], "reason": ""}) + with pytest.raises(MetadataException, match="reason"): + m.validate() + + +def test_read_raises_when_dataset_unfinished(): + ds = FakeDataset("k", None) + with pytest.raises(MetadataException, match="not finished"): + MediaArchiveMetadata.read(ds) + + +def test_read_raises_when_dataset_empty(): + ds = FakeDataset("k", "empty") + with pytest.raises(MetadataException, match="empty"): + MediaArchiveMetadata.read(ds) + + +def test_read_raises_when_file_missing_from_zip(tmp_path): + archive = tmp_path / "empty.zip" + with zipfile.ZipFile(archive, "w") as zf: + zf.writestr("README", "no metadata here") + ds = FakeDataset("k", archive) + with pytest.raises(FileNotFoundError): + MediaArchiveMetadata.read(ds) + + +def test_read_raises_metadata_exception_on_malformed_json(tmp_path): + """A corrupt metadata file surfaces as MetadataException, not JSONDecodeError.""" + archive = tmp_path / "bad.zip" + with zipfile.ZipFile(archive, "w") as zf: + zf.writestr(".metadata.json", "{ not valid json ]") + ds = FakeDataset("k", archive) + with pytest.raises(MetadataException, match="not valid JSON"): + MediaArchiveMetadata.read(ds) + + +def test_v1_read_normalizes_integer_post_ids(): + """Integer post_ids in a v1 file are coerced to strings on read.""" + raw = { + "schema_version": 1, + "items": {"a.jpg": {"filename": "a.jpg", "post_ids": [1, 2]}}, + "failures": [{"post_ids": [3], "reason": "error"}], + } + m = MediaArchiveMetadata() + m._populate_from_raw(raw) + assert m.get_entry("a.jpg")["post_ids"] == ["1", "2"] + assert m.failures[0]["post_ids"] == ["3"] diff --git a/tests/test_media_archive_library.py b/tests/test_media_archive_library.py new file mode 100644 index 000000000..1fb58b85b --- /dev/null +++ b/tests/test_media_archive_library.py @@ -0,0 +1,118 @@ +""" +Tests for `common.lib.media_archive_library.MediaArchiveLibrary`. + +Exercises `find()` resolution via the pure constructor (metadata objects +injected directly). The `collect()` path — walking parent/child datasets — +is left to integration/manual verification. +""" +from common.lib.archive_metadata import MediaArchiveMetadata +from common.lib.media_archive_library import MediaArchiveLibrary + + +def _meta(): + """Build an empty MediaArchiveMetadata for tests to populate.""" + return MediaArchiveMetadata.new(processor_type="video-downloader", from_dataset="src") + + +# -- success lookups -- + +def test_find_success_returns_entry(): + m = _meta() + m.add_item("v.mp4", post_ids=["p1"], url="https://example.com/v") + lib = MediaArchiveLibrary([m]) + + hit = lib.find("https://example.com/v") + assert hit is not None + assert hit.is_success + assert hit.metadata is m + assert hit.entries == [("v.mp4", m.get_entry("v.mp4"))] + + +def test_find_unknown_url_returns_none(): + m = _meta() + m.add_item("v.mp4", post_ids=["p1"], url="https://example.com/v") + lib = MediaArchiveLibrary([m]) + + assert lib.find("https://example.com/never-seen") is None + + +def test_playlist_one_url_many_files(): + """A single source URL that produced several files (yt-dlp playlist).""" + m = _meta() + m.add_item("v1.mp4", post_ids=["p1"], url="https://example.com/playlist") + m.add_item("v2.mp4", post_ids=["p1"], url="https://example.com/playlist") + lib = MediaArchiveLibrary([m]) + + hit = lib.find("https://example.com/playlist") + assert hit.is_success + assert {fn for fn, _ in hit.entries} == {"v1.mp4", "v2.mp4"} + + +def test_success_groups_by_single_archive(): + """URL downloaded by two archives: hit resolves to one archive's files.""" + m1 = _meta() + m1.add_item("a.mp4", post_ids=["p1"], url="https://example.com/v") + m2 = _meta() + m2.add_item("b.mp4", post_ids=["p1"], url="https://example.com/v") + lib = MediaArchiveLibrary([m1, m2]) + + hit = lib.find("https://example.com/v") + assert hit.is_success + # all returned entries belong to the same (first) archive + assert hit.metadata is m1 + assert {fn for fn, _ in hit.entries} == {"a.mp4"} + + +# -- failure lookups -- + +def test_find_failure_surfaces_reason(): + m = _meta() + m.add_failure(post_ids=["p1"], reason="not_a_video", url="https://example.com/x") + lib = MediaArchiveLibrary([m]) + + hit = lib.find("https://example.com/x") + assert hit is not None + assert not hit.is_success + assert hit.reasons == {"not_a_video"} + + +def test_failure_collects_all_reasons_across_archives(): + m1 = _meta() + m1.add_failure(post_ids=["p1"], reason="error", url="https://example.com/x") + m2 = _meta() + m2.add_failure(post_ids=["p2"], reason="not_a_video", url="https://example.com/x") + lib = MediaArchiveLibrary([m1, m2]) + + hit = lib.find("https://example.com/x") + assert not hit.is_success + assert hit.reasons == {"error", "not_a_video"} + + +def test_success_beats_failure(): + """URL downloaded in one archive, failed in another — success wins.""" + failed = _meta() + failed.add_failure(post_ids=["p1"], reason="error", url="https://example.com/v") + ok = _meta() + ok.add_item("v.mp4", post_ids=["p1"], url="https://example.com/v") + lib = MediaArchiveLibrary([failed, ok]) + + hit = lib.find("https://example.com/v") + assert hit.is_success + assert hit.metadata is ok + + +# -- misc -- + +def test_items_without_url_are_not_indexed(): + """Telegram-style entries have no URL; they must not crash indexing.""" + m = _meta() + m.add_item("chat-1.mp4", post_ids=["chat-1"]) # no url + lib = MediaArchiveLibrary([m]) + + assert lib.find(None) is None + assert len(lib) == 1 + + +def test_len_reports_archive_count(): + lib = MediaArchiveLibrary([_meta(), _meta(), _meta()]) + assert len(lib) == 3