From 9b1254ecbfdb93700e2bc60d11dbec0512e556cf Mon Sep 17 00:00:00 2001 From: Dale Wahl Date: Wed, 13 May 2026 10:28:28 +0200 Subject: [PATCH] create _validate_map_items_post_run --- backend/lib/processor.py | 60 ++++++++++++++++++++++++++++++++++++++++ backend/lib/search.py | 24 ++-------------- common/lib/dataset.py | 30 ++++---------------- 3 files changed, 68 insertions(+), 46 deletions(-) diff --git a/backend/lib/processor.py b/backend/lib/processor.py index cb798b1dc..401243011 100644 --- a/backend/lib/processor.py +++ b/backend/lib/processor.py @@ -363,6 +363,12 @@ def after_process(self): self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist, may have " "been deleted in the meantime)" % (self.dataset.key, self.parameters["attach_to"])) + # Validate map_item against the freshly finished dataset and emit a + # single rolled-up admin alert if anything is unmappable. Fires once + # per dataset creation; later processors that iterate this dataset + # don't re-alert. + self._validate_map_item_post_run() + self.job.finish() if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False): @@ -913,6 +919,60 @@ def get_mapped_item(cls, item): return mapped_item + def _validate_map_item_post_run(self): + """ + Walk the just-finished dataset and run `map_item` over each item, + rolling up any failures into a single admin alert via + `DataSet.warn_unmappable_signatures`. Fires at at creation time, + immediately after the dataset is marked finished and before this + worker frees its queue slot. + + Wrapped so a validation failure can never fail the dataset itself. + + This holds the current worker's queue slot for the duration of the + iteration. Subsequent processors are unaffected — they pick up the + now-finished dataset on other workers. If this becomes a measurable + bottleneck for very large datasets, revisit moving to a daemon + thread or a dedicated validator worker. + """ + try: + # Only run validation if the dataset is finished, has rows, and the processor + # has a compatible map_item method. + if not self.dataset.is_finished(): + return + if self.dataset.data.get("num_rows", 0) <= 0: + return + if not self.map_item_method_available(dataset=self.dataset): + return + + signatures = {} + for i, item in enumerate(self.dataset._iterate_items(processor=self)): + try: + self.get_mapped_item(item) + except MapItemException as e: + frame = getattr(e, "frame", None) + fname = Path(frame.filename).name if frame and getattr(frame, "filename", None) else "?" + lineno = getattr(frame, "lineno", 0) if frame else 0 + sig = signatures.setdefault( + (fname, lineno), + {"count": 0, "error": str(e), "samples": []} + ) + sig["count"] += 1 + if len(sig["samples"]) < 3: + sig["samples"].append(i) + except Exception: + # Unexpected exception type — don't let it kill validation + continue + + if signatures: + self.dataset.warn_unmappable_signatures(processor=self, signatures=signatures) + except Exception as e: + # Validation must never fail the dataset + try: + self.log.warning(f"map_item validation hook failed for dataset {self.dataset.key}: {e}") + except Exception: + pass + @classmethod def is_filter(cls): """ diff --git a/backend/lib/search.py b/backend/lib/search.py index 3f7dcc792..e2cb176d9 100644 --- a/backend/lib/search.py +++ b/backend/lib/search.py @@ -158,7 +158,6 @@ def import_from_file(self, path): return [] import_warnings = {} - import_error_signatures = {} # (filename, lineno) -> {count, error, samples} # Check if processor and dataset can use map_item check_map_item = self.map_item_method_available(dataset=self.dataset) @@ -215,22 +214,8 @@ def import_from_file(self, path): except MapItemException as e: # NOTE: we still yield the unmappable item; perhaps we need to update a processor's map_item method to account for this new item self.import_error_count += 1 - # Per-item user-facing log entry. The admin alert is - # emitted once at end of import as a single roll-up - # (see warn_unmappable_signatures below), so we - # suppress the per-item admin call here. - self.dataset.warn_unmappable_item(item_count=i, processor=self, error_message=e, warn_admins=False) - # Accumulate file:line signatures for the rolled-up alert - frame = getattr(e, "frame", None) - fname = Path(frame.filename).name if frame and getattr(frame, "filename", None) else "?" - lineno = getattr(frame, "lineno", 0) if frame else 0 - sig = import_error_signatures.setdefault( - (fname, lineno), - {"count": 0, "error": str(e), "samples": []} - ) - sig["count"] += 1 - if len(sig["samples"]) < 3: - sig["samples"].append(i) + # Per-item user-facing log entry. + self.dataset.warn_unmappable_item(item_count=i, processor=self, error_message=e) yield new_item @@ -241,11 +226,6 @@ def import_from_file(self, path): for warning, num_items in import_warnings.items(): self.dataset.log(f" {warning} (for {num_items:,} item(s))") - # Rolled-up dev alert for unmappable-item signatures (one Slack message - # per dataset with file:line, counts and sample indices). - if import_error_signatures: - self.dataset.warn_unmappable_signatures(processor=self, signatures=import_error_signatures) - path.unlink() self.dataset.delete_parameter("file") diff --git a/common/lib/dataset.py b/common/lib/dataset.py index 15a8da2cd..c10f5c6f7 100644 --- a/common/lib/dataset.py +++ b/common/lib/dataset.py @@ -566,9 +566,8 @@ def iterate_items( mapped_item = own_processor.get_mapped_item(item) except MapItemException as e: if warn_unmappable: - self.warn_unmappable_item( - i, processor, e, warn_admins=unmapped_items is False - ) + # Update dataset log for unmappable items. + self.warn_unmappable_item(i, processor, e) unmapped_items += 1 if max_unmappable and unmapped_items > max_unmappable: @@ -2420,14 +2419,13 @@ def get_result_url(self): server = self.modules.config.get("flask.server_name") return f"{scheme}://{server}/download/{self.key}/" - def warn_unmappable_item( - self, item_count, processor=None, error_message=None, warn_admins=True - ): + def warn_unmappable_item(self, item_count, processor=None, error_message=None): """ - Log an item that is unable to be mapped and warn administrators. + Log a per-item map_item failure to the dataset's own log. :param int item_count: Item index - :param Processor processor: Processor calling function8 + :param Processor processor: Processor calling function + :param error_message: Optional exception or message """ dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}" @@ -2440,22 +2438,6 @@ def warn_unmappable_item( # Log error to dataset log closest_dataset.log(dataset_error_message) - if warn_admins: - if processor is not None: - processor.log.warning( - f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}." - ) - elif hasattr(self.db, "log"): - # borrow the database's log handler - self.db.log.warning( - f"Unable to map item all items for dataset {closest_dataset.key}." - ) - else: - # No other log available - raise DataSetException( - f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn" - ) - def warn_unmappable_signatures(self, processor=None, signatures=None): """ Emit a single rolled-up admin/dev alert summarising all unmappable-item