Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions backend/lib/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,12 @@ def after_process(self):
self.log.warning("Cannot attach dataset chain containing %s to %s (dataset does not exist, may have "
"been deleted in the meantime)" % (self.dataset.key, self.parameters["attach_to"]))

# Validate map_item against the freshly finished dataset and emit a
# single rolled-up admin alert if anything is unmappable. Fires once
# per dataset creation; later processors that iterate this dataset
# don't re-alert.
self._validate_map_item_post_run()

self.job.finish()

if self.config.get('mail.server') and self.dataset.get_parameters().get("email-complete", False):
Expand Down Expand Up @@ -913,6 +919,60 @@ def get_mapped_item(cls, item):

return mapped_item

def _validate_map_item_post_run(self):
"""
Walk the just-finished dataset and run `map_item` over each item,
rolling up any failures into a single admin alert via
`DataSet.warn_unmappable_signatures`. Fires at at creation time,
immediately after the dataset is marked finished and before this
worker frees its queue slot.

Wrapped so a validation failure can never fail the dataset itself.

This holds the current worker's queue slot for the duration of the
iteration. Subsequent processors are unaffected — they pick up the
now-finished dataset on other workers. If this becomes a measurable
bottleneck for very large datasets, revisit moving to a daemon
thread or a dedicated validator worker.
"""
try:
# Only run validation if the dataset is finished, has rows, and the processor
# has a compatible map_item method.
if not self.dataset.is_finished():
return
if self.dataset.data.get("num_rows", 0) <= 0:
return
if not self.map_item_method_available(dataset=self.dataset):
return

signatures = {}
for i, item in enumerate(self.dataset._iterate_items(processor=self)):
try:
self.get_mapped_item(item)
except MapItemException as e:
frame = getattr(e, "frame", None)
fname = Path(frame.filename).name if frame and getattr(frame, "filename", None) else "?"
lineno = getattr(frame, "lineno", 0) if frame else 0
sig = signatures.setdefault(
(fname, lineno),
{"count": 0, "error": str(e), "samples": []}
)
sig["count"] += 1
if len(sig["samples"]) < 3:
sig["samples"].append(i)
except Exception:
# Unexpected exception type — don't let it kill validation
continue

if signatures:
self.dataset.warn_unmappable_signatures(processor=self, signatures=signatures)
except Exception as e:
# Validation must never fail the dataset
try:
self.log.warning(f"map_item validation hook failed for dataset {self.dataset.key}: {e}")
except Exception:
pass

@classmethod
def is_filter(cls):
"""
Expand Down
24 changes: 2 additions & 22 deletions backend/lib/search.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,6 @@ def import_from_file(self, path):
return []

import_warnings = {}
import_error_signatures = {} # (filename, lineno) -> {count, error, samples}

# Check if processor and dataset can use map_item
check_map_item = self.map_item_method_available(dataset=self.dataset)
Expand Down Expand Up @@ -215,22 +214,8 @@ def import_from_file(self, path):
except MapItemException as e:
# NOTE: we still yield the unmappable item; perhaps we need to update a processor's map_item method to account for this new item
self.import_error_count += 1
# Per-item user-facing log entry. The admin alert is
# emitted once at end of import as a single roll-up
# (see warn_unmappable_signatures below), so we
# suppress the per-item admin call here.
self.dataset.warn_unmappable_item(item_count=i, processor=self, error_message=e, warn_admins=False)
# Accumulate file:line signatures for the rolled-up alert
frame = getattr(e, "frame", None)
fname = Path(frame.filename).name if frame and getattr(frame, "filename", None) else "?"
lineno = getattr(frame, "lineno", 0) if frame else 0
sig = import_error_signatures.setdefault(
(fname, lineno),
{"count": 0, "error": str(e), "samples": []}
)
sig["count"] += 1
if len(sig["samples"]) < 3:
sig["samples"].append(i)
# Per-item user-facing log entry.
self.dataset.warn_unmappable_item(item_count=i, processor=self, error_message=e)

yield new_item

Expand All @@ -241,11 +226,6 @@ def import_from_file(self, path):
for warning, num_items in import_warnings.items():
self.dataset.log(f" {warning} (for {num_items:,} item(s))")

# Rolled-up dev alert for unmappable-item signatures (one Slack message
# per dataset with file:line, counts and sample indices).
if import_error_signatures:
self.dataset.warn_unmappable_signatures(processor=self, signatures=import_error_signatures)

path.unlink()
self.dataset.delete_parameter("file")

Expand Down
30 changes: 6 additions & 24 deletions common/lib/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,9 +566,8 @@ def iterate_items(
mapped_item = own_processor.get_mapped_item(item)
except MapItemException as e:
if warn_unmappable:
self.warn_unmappable_item(
i, processor, e, warn_admins=unmapped_items is False
)
# Update dataset log for unmappable items.
self.warn_unmappable_item(i, processor, e)

unmapped_items += 1
if max_unmappable and unmapped_items > max_unmappable:
Expand Down Expand Up @@ -2420,14 +2419,13 @@ def get_result_url(self):
server = self.modules.config.get("flask.server_name")
return f"{scheme}://{server}/download/{self.key}/"

def warn_unmappable_item(
self, item_count, processor=None, error_message=None, warn_admins=True
):
def warn_unmappable_item(self, item_count, processor=None, error_message=None):
"""
Log an item that is unable to be mapped and warn administrators.
Log a per-item map_item failure to the dataset's own log.

:param int item_count: Item index
:param Processor processor: Processor calling function8
:param Processor processor: Processor calling function
:param error_message: Optional exception or message
"""
dataset_error_message = f"MapItemException (item {item_count}): {'is unable to be mapped! Check raw datafile.' if error_message is None else error_message}"

Expand All @@ -2440,22 +2438,6 @@ def warn_unmappable_item(
# Log error to dataset log
closest_dataset.log(dataset_error_message)

if warn_admins:
if processor is not None:
processor.log.warning(
f"Processor {processor.type} unable to map item all items for dataset {closest_dataset.key}."
)
elif hasattr(self.db, "log"):
# borrow the database's log handler
self.db.log.warning(
f"Unable to map item all items for dataset {closest_dataset.key}."
)
else:
# No other log available
raise DataSetException(
f"Unable to map item {item_count} for dataset {closest_dataset.key} and properly warn"
)

def warn_unmappable_signatures(self, processor=None, signatures=None):
"""
Emit a single rolled-up admin/dev alert summarising all unmappable-item
Expand Down
Loading