diff --git a/doc/changes/DM-54879.feature.md b/doc/changes/DM-54879.feature.md new file mode 100644 index 000000000..de148e48b --- /dev/null +++ b/doc/changes/DM-54879.feature.md @@ -0,0 +1 @@ +Added `ignore_metadata_for` parameter to `QuantumGraphBuilder` and `SeparablePipelineExecutor`. diff --git a/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py b/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py index bb7e08a52..f8d264c29 100644 --- a/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py +++ b/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py @@ -163,7 +163,7 @@ def make_compression_dictionary(self) -> zstandard.ZstdCompressionDict: self.comms.log.info("Making compressor with no dictionary.") return zstandard.ZstdCompressionDict(b"") self.comms.log.info("Training compression dictionary.") - training_inputs: list[bytes] = [] + training_inputs: list[bytes | bytearray | memoryview[int]] = [] # We start the dictionary training with *predicted* quantum dataset # models, since those have almost all of the same attributes as the # provenance quantum and dataset models, and we can get a nice random diff --git a/python/lsst/pipe/base/quantum_graph_builder.py b/python/lsst/pipe/base/quantum_graph_builder.py index f230134f4..c7c20fd45 100644 --- a/python/lsst/pipe/base/quantum_graph_builder.py +++ b/python/lsst/pipe/base/quantum_graph_builder.py @@ -128,6 +128,14 @@ class QuantumGraphBuilder(ABC): skip_existing_in : `~collections.abc.Sequence` [ `str` ], optional Collections to search for outputs that already exist for the purpose of skipping quanta that have already been run. + ignore_metadata_for : `~collections.abc.Iterable` [ `str` ], optional + Task labels for which the task metadata dataset is not used as the + completion signal when ``skip_existing_in`` is provided. For these + tasks a quantum is skipped only when all of its science outputs + are present in ``skip_existing_in``. This is useful for pipelines + where some upstream tasks do not retain all of their outputs, so that + those tasks can be re-run to regenerate the missing intermediate + datasets. clobber : `bool`, optional Whether to raise if predicted outputs already exist in ``output_run`` (not including those quanta that would be skipped because they've @@ -171,6 +179,7 @@ def __init__( input_collections: Sequence[str] | None = None, output_run: str | None = None, skip_existing_in: Sequence[str] = (), + ignore_metadata_for: Iterable[str] = (), clobber: bool = False, ): self.log = getLogger(__name__) @@ -188,6 +197,7 @@ def __init__( self.butler = butler.clone(collections=input_collections) self.output_run = output_run self.skip_existing_in = skip_existing_in + self.ignore_metadata_for: frozenset[str] = frozenset(ignore_metadata_for) self.empty_data_id = DataCoordinate.make_empty(butler.dimensions) self.clobber = clobber # See whether the output run already exists. @@ -703,7 +713,7 @@ def _skip_quantum_if_metadata_exists( self, task_node: TaskNode, quantum_key: QuantumKey, skeleton: QuantumGraphSkeleton ) -> bool: """Identify and drop quanta that should be skipped because their - metadata datasets already exist. + metadata or output datasets already exist in ``skip_existing_in``. Parameters ---------- @@ -722,41 +732,65 @@ def _skip_quantum_if_metadata_exists( Notes ----- - If the metadata dataset for this quantum exists in the - `skip_existing_in` collections, the quantum will be skipped. This - causes the quantum node to be removed from the graph. Dataset nodes + For tasks not listed in `ignore_metadata_for`, a quantum is skipped + when its metadata dataset exists in ``skip_existing_in``. + + For tasks listed in `ignore_metadata_for`, the metadata dataset is + not used as the completion signal. Instead, a quantum is skipped only + when all of its science outputs are present in ``skip_existing_in``. + If any such output is absent the quantum is not skipped, so the task + can regenerate the missing outputs. This supports pipelines where + upstream tasks do not retain all of their outputs. + + The skipped quantum node is to be removed from the graph. Dataset nodes that were previously the outputs of this quantum will be associated with `lsst.daf.butler.DatasetRef` objects that were found in ``skip_existing_in``, or will be removed if there is no such dataset there. Any output dataset in `output_run` will be removed from the "output in the way" category. """ - metadata_dataset_key = DatasetKey( - task_node.metadata_output.parent_dataset_type_name, quantum_key.data_id_values - ) - if skeleton.get_output_for_skip(metadata_dataset_key): - # This quantum's metadata is already present in the the - # skip_existing_in collections; we'll skip it. But the presence of - # the metadata dataset doesn't guarantee that all of the other - # outputs we predicted are present; we have to check. - for output_dataset_key in list(skeleton.iter_outputs_of(quantum_key)): - # If this dataset was "in the way" (i.e. already in the - # output run), it isn't anymore. - skeleton.discard_output_in_the_way(output_dataset_key) - if (output_ref := skeleton.get_output_for_skip(output_dataset_key)) is not None: - # Populate the skeleton graph's node attributes - # with the existing DatasetRef, just like a - # predicted output of a non-skipped quantum. - skeleton.set_dataset_ref(output_ref, output_dataset_key) - else: - # Remove this dataset from the skeleton graph, - # because the quantum that would have produced it - # is being skipped and it doesn't already exist. - skeleton.remove_dataset_nodes([output_dataset_key]) - # Removing the quantum node from the graph will happen outside this - # function. - return True - return False + metadata_name = task_node.metadata_output.parent_dataset_type_name + metadata_dataset_key = DatasetKey(metadata_name, quantum_key.data_id_values) + log_name = task_node.log_output.parent_dataset_type_name if task_node.log_output is not None else None + + if task_node.label in self.ignore_metadata_for: + # For this task, use actual output datasets as the completion + # signal rather than metadata. Skip only if all science + # outputs are present in skip_existing_in; do not skip if + # any are absent so they can be regenerated. + science_output_keys = [ + k + for k in skeleton.iter_outputs_of(quantum_key) + if k.parent_dataset_type_name != metadata_name and k.parent_dataset_type_name != log_name + ] + if not science_output_keys or any( + skeleton.get_output_for_skip(k) is None for k in science_output_keys + ): + return False + # All science outputs are present; fall through to skip. + elif not skeleton.get_output_for_skip(metadata_dataset_key): + # metadata absent: do not skip. + return False + + # We will skip the quantum. But it doesn't guarantee that all of the + # other outputs we predicted are present; we have to check. + for output_dataset_key in list(skeleton.iter_outputs_of(quantum_key)): + # If this dataset was "in the way" (i.e. already in the + # output run), it isn't anymore. + skeleton.discard_output_in_the_way(output_dataset_key) + if (output_ref := skeleton.get_output_for_skip(output_dataset_key)) is not None: + # Populate the skeleton graph's node attributes + # with the existing DatasetRef, just like a + # predicted output of a non-skipped quantum. + skeleton.set_dataset_ref(output_ref, output_dataset_key) + else: + # Remove this dataset from the skeleton graph, + # because the quantum that would have produced it + # is being skipped and it doesn't already exist. + skeleton.remove_dataset_nodes([output_dataset_key]) + # Removing the quantum node from the graph will happen outside this + # function. + return True @final def _update_quantum_for_adjust( diff --git a/python/lsst/pipe/base/separable_pipeline_executor.py b/python/lsst/pipe/base/separable_pipeline_executor.py index 52d694ac9..1d9b30de2 100644 --- a/python/lsst/pipe/base/separable_pipeline_executor.py +++ b/python/lsst/pipe/base/separable_pipeline_executor.py @@ -84,6 +84,11 @@ class SeparablePipelineExecutor: for existing outputs, and skips any quanta that have run to completion (or have no work to do). Otherwise, all tasks are attempted (subject to ``clobber_output``). + ignore_metadata_for : `~collections.abc.Iterable` [`str`], optional + Task labels for which the completion signal used by + ``skip_existing_in`` is changed from task metadata to all + non-metadata, non-log outputs existing. Has no effect without + ``skip_existing_in``. task_factory : `.TaskFactory`, optional A custom task factory for use in pre-execution and execution. By default, a new instance of `.TaskFactory` is used. @@ -101,6 +106,7 @@ def __init__( butler: Butler, clobber_output: bool = False, skip_existing_in: Iterable[str] | None = None, + ignore_metadata_for: Iterable[str] | None = None, task_factory: TaskFactory | None = None, resources: ExecutionResources | None = None, raise_on_partial_outputs: bool = True, @@ -115,6 +121,7 @@ def __init__( self._clobber_output = clobber_output self._skip_existing_in = list(skip_existing_in) if skip_existing_in else [] + self._ignore_metadata_for = list(ignore_metadata_for) if ignore_metadata_for else [] self._task_factory = task_factory if task_factory else TaskFactory() self.resources = resources @@ -216,6 +223,7 @@ class are provided automatically (from explicit arguments to this pipeline.to_graph(), self._butler, skip_existing_in=self._skip_existing_in, + ignore_metadata_for=self._ignore_metadata_for, clobber=self._clobber_output, **kwargs, ) @@ -276,6 +284,7 @@ class are provided automatically (from explicit arguments to this "output_run": self._butler.run, "skip_existing_in": self._skip_existing_in, "skip_existing": bool(self._skip_existing_in), + "ignore_metadata_for": self._ignore_metadata_for, "data_query": where, "user": getpass.getuser(), "time": str(datetime.datetime.now()), @@ -344,6 +353,7 @@ class are provided automatically (from explicit arguments to this metadata = { "skip_existing_in": self._skip_existing_in, "skip_existing": bool(self._skip_existing_in), + "ignore_metadata_for": self._ignore_metadata_for, "data_query": where, } qg_builder = self.make_quantum_graph_builder(pipeline, where, builder_class=builder_class, **kwargs) diff --git a/tests/test_graphBuilder.py b/tests/test_graphBuilder.py index b5587b8da..719ebcb5f 100644 --- a/tests/test_graphBuilder.py +++ b/tests/test_graphBuilder.py @@ -29,16 +29,20 @@ import io import logging +import tempfile import unittest +import numpy + import lsst.utils.tests from lsst.daf.butler import Butler, DatasetType from lsst.daf.butler.registry import UserExpressionError -from lsst.pipe.base import PipelineGraph, QuantumGraph +from lsst.pipe.base import PipelineGraph, QuantumGraph, TaskMetadata from lsst.pipe.base.all_dimensions_quantum_graph_builder import ( AllDimensionsQuantumGraphBuilder, DatasetQueryConstraintVariant, ) +from lsst.pipe.base.quantum_graph_builder import OutputExistsError from lsst.pipe.base.tests import simpleQGraph from lsst.pipe.base.tests.mocks import ( DynamicConnectionConfig, @@ -228,6 +232,172 @@ def test_datastore_records(self): self.assertEqual(quantum.datastore_records, {}) +class SkipExistingInTestCase(unittest.TestCase): + """Tests for the skip_existing_in behavior of QuantumGraphBuilder.""" + + def setUp(self): + repodir = tempfile.TemporaryDirectory() + self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir) + pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1) + butler, _ = simpleQGraph.makeSimpleQGraph(root=repodir.name, pipeline=pipeline, nQuanta=1) + self.enterContext(butler) + self.butler = butler + self.pipeline_graph = pipeline.to_graph() + self.butler.registry.registerRun("run") + + def test_not_skipped_without_skip_existing_in(self): + """Without skip_existing_in, a quantum is never skipped even if + metadata exists in an input collection. + """ + self.butler.put( + TaskMetadata(), + "task0_metadata", + run="run", + instrument="INSTR", + detector=0, + ) + + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, self.butler, input_collections=["test"], output_run="new_run" + ).build() + self.assertEqual(len(qgraph), 1) + + def test_skipped_when_metadata_exists(self): + """With skip_existing_in, a quantum is skipped when its metadata + dataset is present in the specified collections. + """ + self.butler.put( + TaskMetadata(), + "task0_metadata", + run="run", + instrument="INSTR", + detector=0, + ) + # Init-outputs required, otherwise InitInputMissingError. + self.butler.put(numpy.array([0.0]), "add_init_output1", run="run") + self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run") + + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + input_collections=["test"], + output_run="new_run", + ).build() + self.assertEqual(len(qgraph), 0) + + def test_not_skipped_when_metadata_absent(self): + """With skip_existing_in, a quantum is not skipped when its metadata + dataset is absent from the specified collections. + """ + # No metadata put — run exists but is empty. + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + input_collections=["test"], + output_run="new_run", + ).build() + self.assertEqual(len(qgraph), 1) + + +class IgnoreMetadataForTestCase(unittest.TestCase): + """Tests for QuantumGraphBuilder.ignore_metadata_for.""" + + def setUp(self): + repodir = tempfile.TemporaryDirectory() + self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir) + pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1) + butler, _ = simpleQGraph.makeSimpleQGraph(root=repodir.name, pipeline=pipeline, nQuanta=1) + self.enterContext(butler) + self.butler = butler + self.pipeline_graph = pipeline.to_graph() + # Simulate a prior run and put a metadata. + self.butler.registry.registerRun("run") + self.butler.put( + TaskMetadata(), + "task0_metadata", + run="run", + instrument="INSTR", + detector=0, + ) + + def test_not_skipped_when_outputs_missing(self): + """With ignore_metadata_for, quantum is not skipped when science + outputs are absent from skip_existing_in, even if metadata is present. + + A scenario is that an upstream pipeline ran and wrote + metadata but did not retain output datasets. + """ + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + ignore_metadata_for=["task0"], + input_collections=["test"], + output_run="new_run", + ).build() + self.assertEqual(len(qgraph), 1) + + def test_skips_when_all_outputs_present(self): + """With ignore_metadata_for, quantum is skipped when all science + outputs are present in skip_existing_in. + """ + self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) + self.butler.put(numpy.array([0.0]), "add2_dataset1", run="run", instrument="INSTR", detector=0) + # Init-outputs required when all quanta are skipped. + self.butler.put(numpy.array([0.0]), "add_init_output1", run="run") + self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run") + + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + ignore_metadata_for=["task0"], + input_collections=["test"], + output_run="new_run", + ).build() + # All outputs found, so quantum should be skipped. + self.assertEqual(len(qgraph), 0) + + def test_output_exists_error_when_partial_outputs(self): + """With ignore_metadata_for, OutputExistsError is raised when some + outputs exist in the output run and clobber is off. + """ + self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) + # add2_dataset1 absent -> not all outputs present -> task not skipped + + with self.assertRaises(OutputExistsError): + AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + ignore_metadata_for=["task0"], + input_collections=["test"], + # Use the same run so that partial output is in the way. + output_run="run", + ).build() + + def test_partial_outputs_clobber(self): + """With ignore_metadata_for and clobber=True, partial outputs in the + output run are discarded and the task runs. + """ + self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) + # add2_dataset1 absent -> not all outputs present -> task not skipped + # clobber=True -> add_dataset1 discarded from graph, task runs + + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + ignore_metadata_for=["task0"], + input_collections=["test"], + output_run="run", + clobber=True, + ).build() + self.assertEqual(len(qgraph), 1) + + if __name__ == "__main__": lsst.utils.tests.init() unittest.main() diff --git a/tests/test_separable_pipeline_executor.py b/tests/test_separable_pipeline_executor.py index 5f71a18f6..851ddc31b 100644 --- a/tests/test_separable_pipeline_executor.py +++ b/tests/test_separable_pipeline_executor.py @@ -588,6 +588,69 @@ def test_make_quantum_graph_nowhere_skippartial_noclobber(self): with self.assertRaises(OutputExistsError): executor.build_quantum_graph(pipeline) + def test_make_quantum_graph_nowhere_ignoremeta_not_skipped(self): + """With ignore_metadata_for, a task is not skipped when its science + output is absent, even if metadata is present. + """ + prior_run = "prior_run" + self.butler.registry.registerCollection(prior_run, lsst.daf.butler.CollectionType.RUN) + executor = SeparablePipelineExecutor( + self.butler, + skip_existing_in=[prior_run], + ignore_metadata_for=["a"], + clobber_output=False, + ) + pipeline = Pipeline.fromFile(self.pipeline_file) + + self.butler.put({"zero": 0}, "input") + # Metadata present in prior run but intermediate absent. + self.butler.put(TaskMetadata(), "a_metadata", run=prior_run) + + graph = executor.build_quantum_graph(pipeline) + self.assertEqual(len(graph), 2) + self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"}) + + def test_make_quantum_graph_nowhere_ignoremeta_skipped(self): + """With ignore_metadata_for, a task is skipped when its science output + is present, even if metadata and log are absent. + """ + executor = SeparablePipelineExecutor( + self.butler, + skip_existing_in=[self.butler.run], + ignore_metadata_for=["a"], + clobber_output=False, + ) + pipeline = Pipeline.fromFile(self.pipeline_file) + + self.butler.put({"zero": 0}, "input") + # Science output present; metadata and log intentionally absent. + self.butler.put({"zero": 0}, "intermediate") + self.butler.put(lsst.pex.config.Config(), "a_config") + + graph = executor.build_quantum_graph(pipeline) + self.assertEqual(len(graph), 1) + self.assertEqual(graph.header.n_task_quanta["a"], 0) + self.assertEqual(graph.header.n_task_quanta["b"], 1) + + def test_make_quantum_graph_nowhere_ignoremeta_metainway_noclobber(self): + """With ignore_metadata_for, OutputExistsError is raised when metadata + is already in the output run and the task is not skipped. + """ + executor = SeparablePipelineExecutor( + self.butler, + skip_existing_in=[self.butler.run], + ignore_metadata_for=["a"], + clobber_output=False, + ) + pipeline = Pipeline.fromFile(self.pipeline_file) + + self.butler.put({"zero": 0}, "input") + self.butler.put(TaskMetadata(), "a_metadata") + # Metadata in output run, intermediate absent -> task not skipped + # Same output run, a_metadata is in the way. + with self.assertRaises(OutputExistsError): + executor.build_quantum_graph(pipeline) + def test_build_quantum_graph_nowhere_noskip_clobber(self): executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) pipeline = Pipeline.fromFile(self.pipeline_file)