Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/changes/DM-54879.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Added `ignore_metadata_for` parameter to `QuantumGraphBuilder` and `SeparablePipelineExecutor`.
2 changes: 1 addition & 1 deletion python/lsst/pipe/base/quantum_graph/aggregator/_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ def make_compression_dictionary(self) -> zstandard.ZstdCompressionDict:
self.comms.log.info("Making compressor with no dictionary.")
return zstandard.ZstdCompressionDict(b"")
self.comms.log.info("Training compression dictionary.")
training_inputs: list[bytes] = []
training_inputs: list[bytes | bytearray | memoryview[int]] = []
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm curious where this is coming from; AFAIK we don't use bytearray or memoryview[int] for any of these.

# We start the dictionary training with *predicted* quantum dataset
# models, since those have almost all of the same attributes as the
# provenance quantum and dataset models, and we can get a nice random
Expand Down
94 changes: 64 additions & 30 deletions python/lsst/pipe/base/quantum_graph_builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,14 @@ class QuantumGraphBuilder(ABC):
skip_existing_in : `~collections.abc.Sequence` [ `str` ], optional
Collections to search for outputs that already exist for the purpose of
skipping quanta that have already been run.
ignore_metadata_for : `~collections.abc.Iterable` [ `str` ], optional
Task labels for which the task metadata dataset is not used as the
completion signal when ``skip_existing_in`` is provided. For these
tasks a quantum is skipped only when all of its science outputs
are present in ``skip_existing_in``. This is useful for pipelines
where upstream processing does not retain all task outputs, so that
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reword to "where upstream processing does not retain all task outputs" (it's not the tasks that are retaining or not).

those tasks can be re-run to regenerate the missing intermediate
datasets.
clobber : `bool`, optional
Whether to raise if predicted outputs already exist in ``output_run``
(not including those quanta that would be skipped because they've
Expand Down Expand Up @@ -171,6 +179,7 @@ def __init__(
input_collections: Sequence[str] | None = None,
output_run: str | None = None,
skip_existing_in: Sequence[str] = (),
ignore_metadata_for: Iterable[str] = (),
clobber: bool = False,
):
self.log = getLogger(__name__)
Expand All @@ -188,6 +197,7 @@ def __init__(
self.butler = butler.clone(collections=input_collections)
self.output_run = output_run
self.skip_existing_in = skip_existing_in
self.ignore_metadata_for: frozenset[str] = frozenset(ignore_metadata_for)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably want to raise here if ignore_metadata_for is not empty but skip_existing_in empty (since then ignore_metadata_for would have no effect).

self.empty_data_id = DataCoordinate.make_empty(butler.dimensions)
self.clobber = clobber
# See whether the output run already exists.
Expand Down Expand Up @@ -703,7 +713,7 @@ def _skip_quantum_if_metadata_exists(
self, task_node: TaskNode, quantum_key: QuantumKey, skeleton: QuantumGraphSkeleton
) -> bool:
"""Identify and drop quanta that should be skipped because their
metadata datasets already exist.
metadata or output datasets already exist in ``skip_existing_in``.

Parameters
----------
Expand All @@ -722,41 +732,65 @@ def _skip_quantum_if_metadata_exists(

Notes
-----
If the metadata dataset for this quantum exists in the
`skip_existing_in` collections, the quantum will be skipped. This
causes the quantum node to be removed from the graph. Dataset nodes
For tasks not listed in `ignore_metadata_for`, a quantum is skipped
when its metadata dataset exists in ``skip_existing_in``.

For tasks listed in `ignore_metadata_for`, the metadata dataset is
not used as the completion signal. Instead, a quantum is skipped only
when all of its science outputs are present in ``skip_existing_in``.
If any such output is absent the quantum is not skipped, so the task
can regenerate the missing outputs. This supports pipelines where
upstream processing does not retain all task outputs.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reword as in the other thread.


The node for a skipped quantum is removed from the graph. Dataset nodes
that were previously the outputs of this quantum will be associated
with `lsst.daf.butler.DatasetRef` objects that were found in
``skip_existing_in``, or will be removed if there is no such dataset
there. Any output dataset in `output_run` will be removed from the
"output in the way" category.
"""
metadata_dataset_key = DatasetKey(
task_node.metadata_output.parent_dataset_type_name, quantum_key.data_id_values
)
if skeleton.get_output_for_skip(metadata_dataset_key):
# This quantum's metadata is already present in the
# skip_existing_in collections; we'll skip it. But the presence of
# the metadata dataset doesn't guarantee that all of the other
# outputs we predicted are present; we have to check.
for output_dataset_key in list(skeleton.iter_outputs_of(quantum_key)):
# If this dataset was "in the way" (i.e. already in the
# output run), it isn't anymore.
skeleton.discard_output_in_the_way(output_dataset_key)
if (output_ref := skeleton.get_output_for_skip(output_dataset_key)) is not None:
# Populate the skeleton graph's node attributes
# with the existing DatasetRef, just like a
# predicted output of a non-skipped quantum.
skeleton.set_dataset_ref(output_ref, output_dataset_key)
else:
# Remove this dataset from the skeleton graph,
# because the quantum that would have produced it
# is being skipped and it doesn't already exist.
skeleton.remove_dataset_nodes([output_dataset_key])
# Removing the quantum node from the graph will happen outside this
# function.
return True
return False
metadata_name = task_node.metadata_output.parent_dataset_type_name
metadata_dataset_key = DatasetKey(metadata_name, quantum_key.data_id_values)
log_name = task_node.log_output.parent_dataset_type_name if task_node.log_output is not None else None

if task_node.label in self.ignore_metadata_for:
# For this task, use actual output datasets as the completion
# signal rather than metadata. Skip only if all science
# outputs are present in skip_existing_in; do not skip if
# any are absent so they can be regenerated.
science_output_keys = [
k
for k in skeleton.iter_outputs_of(quantum_key)
if k.parent_dataset_type_name != metadata_name and k.parent_dataset_type_name != log_name
]
if not science_output_keys or any(
skeleton.get_output_for_skip(k) is None for k in science_output_keys
):
return False
# All science outputs are present; fall through to skip.
elif not skeleton.get_output_for_skip(metadata_dataset_key):
# metadata absent: do not skip.
return False

# We will skip the quantum. But the completion signal used above does not
# guarantee that every predicted output is present; we have to check.
for output_dataset_key in list(skeleton.iter_outputs_of(quantum_key)):
# If this dataset was "in the way" (i.e. already in the
# output run), it isn't anymore.
skeleton.discard_output_in_the_way(output_dataset_key)
if (output_ref := skeleton.get_output_for_skip(output_dataset_key)) is not None:
# Populate the skeleton graph's node attributes
# with the existing DatasetRef, just like a
# predicted output of a non-skipped quantum.
skeleton.set_dataset_ref(output_ref, output_dataset_key)
else:
# Remove this dataset from the skeleton graph,
# because the quantum that would have produced it
# is being skipped and it doesn't already exist.
skeleton.remove_dataset_nodes([output_dataset_key])
# Removing the quantum node from the graph will happen outside this
# function.
return True

@final
def _update_quantum_for_adjust(
Expand Down
10 changes: 10 additions & 0 deletions python/lsst/pipe/base/separable_pipeline_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class SeparablePipelineExecutor:
for existing outputs, and skips any quanta that have run to completion
(or have no work to do). Otherwise, all tasks are attempted (subject to
``clobber_output``).
ignore_metadata_for : `~collections.abc.Iterable` [`str`], optional
Task labels for which the completion signal used by
``skip_existing_in`` is the existence of all non-metadata, non-log
outputs rather than the task metadata dataset. Has no effect without
``skip_existing_in``.
task_factory : `.TaskFactory`, optional
A custom task factory for use in pre-execution and execution. By
default, a new instance of `.TaskFactory` is used.
Expand All @@ -101,6 +106,7 @@ def __init__(
butler: Butler,
clobber_output: bool = False,
skip_existing_in: Iterable[str] | None = None,
ignore_metadata_for: Iterable[str] | None = None,
task_factory: TaskFactory | None = None,
resources: ExecutionResources | None = None,
raise_on_partial_outputs: bool = True,
Expand All @@ -115,6 +121,7 @@ def __init__(

self._clobber_output = clobber_output
self._skip_existing_in = list(skip_existing_in) if skip_existing_in else []
self._ignore_metadata_for = list(ignore_metadata_for) if ignore_metadata_for else []

self._task_factory = task_factory if task_factory else TaskFactory()
self.resources = resources
Expand Down Expand Up @@ -216,6 +223,7 @@ class are provided automatically (from explicit arguments to this
pipeline.to_graph(),
self._butler,
skip_existing_in=self._skip_existing_in,
ignore_metadata_for=self._ignore_metadata_for,
clobber=self._clobber_output,
**kwargs,
)
Expand Down Expand Up @@ -276,6 +284,7 @@ class are provided automatically (from explicit arguments to this
"output_run": self._butler.run,
"skip_existing_in": self._skip_existing_in,
"skip_existing": bool(self._skip_existing_in),
"ignore_metadata_for": self._ignore_metadata_for,
"data_query": where,
"user": getpass.getuser(),
"time": str(datetime.datetime.now()),
Expand Down Expand Up @@ -344,6 +353,7 @@ class are provided automatically (from explicit arguments to this
metadata = {
"skip_existing_in": self._skip_existing_in,
"skip_existing": bool(self._skip_existing_in),
"ignore_metadata_for": self._ignore_metadata_for,
"data_query": where,
}
qg_builder = self.make_quantum_graph_builder(pipeline, where, builder_class=builder_class, **kwargs)
Expand Down
172 changes: 171 additions & 1 deletion tests/test_graphBuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,20 @@

import io
import logging
import tempfile
import unittest

import numpy

import lsst.utils.tests
from lsst.daf.butler import Butler, DatasetType
from lsst.daf.butler.registry import UserExpressionError
from lsst.pipe.base import PipelineGraph, QuantumGraph
from lsst.pipe.base import PipelineGraph, QuantumGraph, TaskMetadata
from lsst.pipe.base.all_dimensions_quantum_graph_builder import (
AllDimensionsQuantumGraphBuilder,
DatasetQueryConstraintVariant,
)
from lsst.pipe.base.quantum_graph_builder import OutputExistsError
from lsst.pipe.base.tests import simpleQGraph
from lsst.pipe.base.tests.mocks import (
DynamicConnectionConfig,
Expand Down Expand Up @@ -228,6 +232,172 @@ def test_datastore_records(self):
self.assertEqual(quantum.datastore_records, {})


class SkipExistingInTestCase(unittest.TestCase):
"""Tests for the skip_existing_in behavior of QuantumGraphBuilder."""

def setUp(self):
repodir = tempfile.TemporaryDirectory()
self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir)
pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1)
butler, _ = simpleQGraph.makeSimpleQGraph(root=repodir.name, pipeline=pipeline, nQuanta=1)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is worth changing at this point, but for new tests I'd generally recommend lsst.pipe.base.tests.mocks.InMemoryRepo or lsst.pipe.base.tests.mocks.DirectButlerRepo (see test_predicted_qg.py for an example). That also preps a butler repo with input datasets and helps you build a pipeline, and while it's just about as concise for simple cases, it's also extensible to more complex graphs.

self.enterContext(butler)
self.butler = butler
self.pipeline_graph = pipeline.to_graph()
self.butler.registry.registerRun("run")

def test_not_skipped_without_skip_existing_in(self):
    """Check that nothing is skipped when ``skip_existing_in`` is not
    given, even though a metadata dataset for the quantum has already
    been written.
    """
    data_id = {"instrument": "INSTR", "detector": 0}
    self.butler.put(TaskMetadata(), "task0_metadata", run="run", **data_id)

    builder = AllDimensionsQuantumGraphBuilder(
        self.pipeline_graph,
        self.butler,
        input_collections=["test"],
        output_run="new_run",
    )
    quantum_graph = builder.build()
    # The single quantum must still be in the graph.
    self.assertEqual(len(quantum_graph), 1)

def test_skipped_when_metadata_exists(self):
    """Check that a quantum is skipped when its metadata dataset is found
    in the collections passed as ``skip_existing_in``.
    """
    data_id = {"instrument": "INSTR", "detector": 0}
    self.butler.put(TaskMetadata(), "task0_metadata", run="run", **data_id)
    # Init-outputs required, otherwise InitInputMissingError.
    for payload, dataset_type in (
        (numpy.array([0.0]), "add_init_output1"),
        (simpleQGraph.AddTaskConfig(), "task0_config"),
    ):
        self.butler.put(payload, dataset_type, run="run")

    builder = AllDimensionsQuantumGraphBuilder(
        self.pipeline_graph,
        self.butler,
        skip_existing_in=["run"],
        input_collections=["test"],
        output_run="new_run",
    )
    quantum_graph = builder.build()
    # The only quantum was skipped, so the graph is empty.
    self.assertEqual(len(quantum_graph), 0)

def test_not_skipped_when_metadata_absent(self):
    """Check that a quantum is kept when ``skip_existing_in`` is given but
    the metadata dataset is not in those collections.
    """
    # Nothing is put into "run" here, so there is no metadata to find.
    builder = AllDimensionsQuantumGraphBuilder(
        self.pipeline_graph,
        self.butler,
        skip_existing_in=["run"],
        input_collections=["test"],
        output_run="new_run",
    )
    quantum_graph = builder.build()
    self.assertEqual(len(quantum_graph), 1)


class IgnoreMetadataForTestCase(unittest.TestCase):
    """Tests for QuantumGraphBuilder.ignore_metadata_for."""

    def setUp(self):
        tmp_dir = tempfile.TemporaryDirectory()
        self.addCleanup(tmp_dir.cleanup)
        pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1)
        repo_butler, _ = simpleQGraph.makeSimpleQGraph(root=tmp_dir.name, pipeline=pipeline, nQuanta=1)
        self.enterContext(repo_butler)
        self.butler = repo_butler
        self.pipeline_graph = pipeline.to_graph()
        # Simulate a prior run that wrote a metadata dataset.
        self.butler.registry.registerRun("run")
        self.butler.put(TaskMetadata(), "task0_metadata", run="run", instrument="INSTR", detector=0)

    def _put_array(self, dataset_type):
        """Put a one-element array for ``dataset_type`` into "run" at the
        data ID used by these tests.
        """
        self.butler.put(numpy.array([0.0]), dataset_type, run="run", instrument="INSTR", detector=0)

    def _make_builder(self, output_run, **extra):
        """Return a builder that skips against "run" and ignores the
        metadata of ``task0``; ``extra`` is forwarded to the constructor.
        """
        return AllDimensionsQuantumGraphBuilder(
            self.pipeline_graph,
            self.butler,
            skip_existing_in=["run"],
            ignore_metadata_for=["task0"],
            input_collections=["test"],
            output_run=output_run,
            **extra,
        )

    def test_not_skipped_when_outputs_missing(self):
        """With ignore_metadata_for, the quantum is kept when its science
        outputs are absent from skip_existing_in, even though the metadata
        dataset is present.

        This models an upstream run that wrote metadata but whose output
        datasets were not retained.
        """
        quantum_graph = self._make_builder("new_run").build()
        self.assertEqual(len(quantum_graph), 1)

    def test_skips_when_all_outputs_present(self):
        """With ignore_metadata_for, the quantum is skipped when every
        science output is present in skip_existing_in.
        """
        for dataset_type in ("add_dataset1", "add2_dataset1"):
            self._put_array(dataset_type)
        # Init-outputs required when all quanta are skipped.
        self.butler.put(numpy.array([0.0]), "add_init_output1", run="run")
        self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run")

        quantum_graph = self._make_builder("new_run").build()
        # All outputs found, so quantum should be skipped.
        self.assertEqual(len(quantum_graph), 0)

    def test_output_exists_error_when_partial_outputs(self):
        """With ignore_metadata_for, OutputExistsError is raised when some
        outputs exist in the output run and clobber is off.
        """
        self._put_array("add_dataset1")
        # add2_dataset1 absent -> not all outputs present -> task not skipped

        with self.assertRaises(OutputExistsError):
            # Use the same run so that partial output is in the way.
            self._make_builder("run").build()

    def test_partial_outputs_clobber(self):
        """With ignore_metadata_for and clobber=True, partial outputs in
        the output run are discarded and the task runs.
        """
        self._put_array("add_dataset1")
        # add2_dataset1 absent -> not all outputs present -> task not skipped
        # clobber=True -> add_dataset1 discarded from graph, task runs

        quantum_graph = self._make_builder("run", clobber=True).build()
        self.assertEqual(len(quantum_graph), 1)


if __name__ == "__main__":
lsst.utils.tests.init()
unittest.main()
Loading
Loading