From 0db29d2d005fe397d6f97e2da234a0bfda9cda3c Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Tue, 12 May 2026 15:44:59 -0700 Subject: [PATCH 1/6] Fix mypy type error in _writer.py --- python/lsst/pipe/base/quantum_graph/aggregator/_writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py b/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py index bb7e08a52..f8d264c29 100644 --- a/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py +++ b/python/lsst/pipe/base/quantum_graph/aggregator/_writer.py @@ -163,7 +163,7 @@ def make_compression_dictionary(self) -> zstandard.ZstdCompressionDict: self.comms.log.info("Making compressor with no dictionary.") return zstandard.ZstdCompressionDict(b"") self.comms.log.info("Training compression dictionary.") - training_inputs: list[bytes] = [] + training_inputs: list[bytes | bytearray | memoryview[int]] = [] # We start the dictionary training with *predicted* quantum dataset # models, since those have almost all of the same attributes as the # provenance quantum and dataset models, and we can get a nice random From 88ca8ff160d8eec8f66316198d1ad6fe06714f3b Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Fri, 8 May 2026 14:54:41 -0700 Subject: [PATCH 2/6] Add SkipExistingInTestCase to test_graphBuilder.py The skip_existing_in behavior of QuantumGraphBuilder was previously only covered through test_separable_pipeline_executor.py, where SeparablePipelineExecutor drives AllDimensionsQuantumGraphBuilder. No tests exercised the builder directly at the unit level. --- tests/test_graphBuilder.py | 74 +++++++++++++++++++++++++++++++++++++- 1 file changed, 73 insertions(+), 1 deletion(-) diff --git a/tests/test_graphBuilder.py b/tests/test_graphBuilder.py index b5587b8da..b671792cf 100644 --- a/tests/test_graphBuilder.py +++ b/tests/test_graphBuilder.py @@ -29,12 +29,15 @@ import io import logging +import tempfile import unittest +import numpy + import lsst.utils.tests from lsst.daf.butler import Butler, DatasetType from lsst.daf.butler.registry import UserExpressionError -from lsst.pipe.base import PipelineGraph, QuantumGraph +from lsst.pipe.base import PipelineGraph, QuantumGraph, TaskMetadata from lsst.pipe.base.all_dimensions_quantum_graph_builder import ( AllDimensionsQuantumGraphBuilder, DatasetQueryConstraintVariant, @@ -228,6 +231,75 @@ def test_datastore_records(self): self.assertEqual(quantum.datastore_records, {}) +class SkipExistingInTestCase(unittest.TestCase): + """Tests for the skip_existing_in behavior of QuantumGraphBuilder.""" + + def setUp(self): + repodir = tempfile.TemporaryDirectory() + self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir) + pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1) + butler, _ = simpleQGraph.makeSimpleQGraph(root=repodir.name, pipeline=pipeline, nQuanta=1) + self.enterContext(butler) + self.butler = butler + self.pipeline_graph = pipeline.to_graph() + self.butler.registry.registerRun("run") + + def test_not_skipped_without_skip_existing_in(self): + """Without skip_existing_in, a quantum is never skipped even if + metadata exists in an input collection. + """ + self.butler.put( + TaskMetadata(), + "task0_metadata", + run="run", + instrument="INSTR", + detector=0, + ) + + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, self.butler, input_collections=["test"], output_run="new_run" + ).build() + self.assertEqual(len(qgraph), 1) + + def test_skipped_when_metadata_exists(self): + """With skip_existing_in, a quantum is skipped when its metadata + dataset is present in the specified collections. + """ + self.butler.put( + TaskMetadata(), + "task0_metadata", + run="run", + instrument="INSTR", + detector=0, + ) + # Init-outputs required, otherwise InitInputMissingError. + self.butler.put(numpy.array([0.0]), "add_init_output1", run="run") + self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run") + + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + input_collections=["test"], + output_run="new_run", + ).build() + self.assertEqual(len(qgraph), 0) + + def test_not_skipped_when_metadata_absent(self): + """With skip_existing_in, a quantum is not skipped when its metadata + dataset is absent from the specified collections. + """ + # No metadata put — run exists but is empty. + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + input_collections=["test"], + output_run="new_run", + ).build() + self.assertEqual(len(qgraph), 1) + + if __name__ == "__main__": lsst.utils.tests.init() unittest.main() From 354477853be52888dce53f435dbb1c9d740fd839 Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Mon, 11 May 2026 14:31:50 -0700 Subject: [PATCH 3/6] Add ignore_metadata_for parameter to QuantumGraphBuilder For tasks listed in ignore_metadata_for, _skip_quantum_if_metadata_exists changes the completion signal from task metadata to all non-metadata outputs existing in skip_existing_in. A quantum is not skipped when any non-metadata output is absent; skipped when all are present. When prior outputs are visible, the quantum is still skipped and discard_output_in_the_way runs. When outputs are absent, the quantum is not skipped and they cannot be "in the way" in a fresh output_run. --- .../lsst/pipe/base/quantum_graph_builder.py | 94 ++++++++++------ tests/test_graphBuilder.py | 101 +++++++++++++++++- 2 files changed, 164 insertions(+), 31 deletions(-) diff --git a/python/lsst/pipe/base/quantum_graph_builder.py b/python/lsst/pipe/base/quantum_graph_builder.py index f230134f4..410a997e7 100644 --- a/python/lsst/pipe/base/quantum_graph_builder.py +++ b/python/lsst/pipe/base/quantum_graph_builder.py @@ -128,6 +128,15 @@ class QuantumGraphBuilder(ABC): skip_existing_in : `~collections.abc.Sequence` [ `str` ], optional Collections to search for outputs that already exist for the purpose of skipping quanta that have already been run. + ignore_metadata_for : `~collections.abc.Iterable` [ `str` ], optional + Task labels for which the task metadata dataset is not used as the + completion signal when ``skip_existing_in`` is provided. For these + tasks a quantum is skipped only when all of its non-metadata outputs + are present in ``skip_existing_in``; it is not skipped when any + non-metadata output is absent. This is useful for pipelines + where some upstream tasks do not retain all of their outputs, so that + those tasks can be re-run to regenerate the missing intermediate + datasets. clobber : `bool`, optional Whether to raise if predicted outputs already exist in ``output_run`` (not including those quanta that would be skipped because they've @@ -171,6 +180,7 @@ def __init__( input_collections: Sequence[str] | None = None, output_run: str | None = None, skip_existing_in: Sequence[str] = (), + ignore_metadata_for: Iterable[str] = (), clobber: bool = False, ): self.log = getLogger(__name__) @@ -188,6 +198,7 @@ def __init__( self.butler = butler.clone(collections=input_collections) self.output_run = output_run self.skip_existing_in = skip_existing_in + self.ignore_metadata_for: frozenset[str] = frozenset(ignore_metadata_for) self.empty_data_id = DataCoordinate.make_empty(butler.dimensions) self.clobber = clobber # See whether the output run already exists. @@ -703,7 +714,7 @@ def _skip_quantum_if_metadata_exists( self, task_node: TaskNode, quantum_key: QuantumKey, skeleton: QuantumGraphSkeleton ) -> bool: """Identify and drop quanta that should be skipped because their - metadata datasets already exist. + metadata or output datasets already exist in ``skip_existing_in``. Parameters ---------- @@ -722,41 +733,64 @@ def _skip_quantum_if_metadata_exists( Notes ----- - If the metadata dataset for this quantum exists in the - `skip_existing_in` collections, the quantum will be skipped. This - causes the quantum node to be removed from the graph. Dataset nodes + For tasks not listed in `ignore_metadata_for`, a quantum is skipped + when its metadata dataset exists in ``skip_existing_in``. + + For tasks listed in `ignore_metadata_for`, the metadata dataset is + not used as the completion signal. Instead, a quantum is skipped only + when all of its science outputs are present in ``skip_existing_in``. + If any such output is absent the quantum is not skipped, so the task + can regenerate the missing outputs. This supports pipelines where + upstream tasks do not retain all of their outputs. + + The skipped quantum node is to be removed from the graph. Dataset nodes that were previously the outputs of this quantum will be associated with `lsst.daf.butler.DatasetRef` objects that were found in ``skip_existing_in``, or will be removed if there is no such dataset there. Any output dataset in `output_run` will be removed from the "output in the way" category. """ - metadata_dataset_key = DatasetKey( - task_node.metadata_output.parent_dataset_type_name, quantum_key.data_id_values - ) - if skeleton.get_output_for_skip(metadata_dataset_key): - # This quantum's metadata is already present in the the - # skip_existing_in collections; we'll skip it. But the presence of - # the metadata dataset doesn't guarantee that all of the other - # outputs we predicted are present; we have to check. - for output_dataset_key in list(skeleton.iter_outputs_of(quantum_key)): - # If this dataset was "in the way" (i.e. already in the - # output run), it isn't anymore. - skeleton.discard_output_in_the_way(output_dataset_key) - if (output_ref := skeleton.get_output_for_skip(output_dataset_key)) is not None: - # Populate the skeleton graph's node attributes - # with the existing DatasetRef, just like a - # predicted output of a non-skipped quantum. - skeleton.set_dataset_ref(output_ref, output_dataset_key) - else: - # Remove this dataset from the skeleton graph, - # because the quantum that would have produced it - # is being skipped and it doesn't already exist. - skeleton.remove_dataset_nodes([output_dataset_key]) - # Removing the quantum node from the graph will happen outside this - # function. - return True - return False + metadata_name = task_node.metadata_output.parent_dataset_type_name + metadata_dataset_key = DatasetKey(metadata_name, quantum_key.data_id_values) + + if task_node.label in self.ignore_metadata_for: + # For this task, use actual output datasets as the completion + # signal rather than metadata. Skip only if all science + # outputs are present in skip_existing_in; do not skip if + # any are absent so they can be regenerated. + science_output_keys = [ + k + for k in skeleton.iter_outputs_of(quantum_key) + if k.parent_dataset_type_name != metadata_name + ] + if not science_output_keys or any( + skeleton.get_output_for_skip(k) is None for k in science_output_keys + ): + return False + # All non-metadata outputs are present; fall through to skip. + elif not skeleton.get_output_for_skip(metadata_dataset_key): + # metadata absent: do not skip. + return False + + # We will skip the quantum. But it doesn't guarantee that all of the + # other outputs we predicted are present; we have to check. + for output_dataset_key in list(skeleton.iter_outputs_of(quantum_key)): + # If this dataset was "in the way" (i.e. already in the + # output run), it isn't anymore. + skeleton.discard_output_in_the_way(output_dataset_key) + if (output_ref := skeleton.get_output_for_skip(output_dataset_key)) is not None: + # Populate the skeleton graph's node attributes + # with the existing DatasetRef, just like a + # predicted output of a non-skipped quantum. + skeleton.set_dataset_ref(output_ref, output_dataset_key) + else: + # Remove this dataset from the skeleton graph, + # because the quantum that would have produced it + # is being skipped and it doesn't already exist. + skeleton.remove_dataset_nodes([output_dataset_key]) + # Removing the quantum node from the graph will happen outside this + # function. + return True @final def _update_quantum_for_adjust( diff --git a/tests/test_graphBuilder.py b/tests/test_graphBuilder.py index b671792cf..83510f6ee 100644 --- a/tests/test_graphBuilder.py +++ b/tests/test_graphBuilder.py @@ -35,13 +35,14 @@ import numpy import lsst.utils.tests -from lsst.daf.butler import Butler, DatasetType +from lsst.daf.butler import Butler, DatasetType, ButlerLogRecords from lsst.daf.butler.registry import UserExpressionError from lsst.pipe.base import PipelineGraph, QuantumGraph, TaskMetadata from lsst.pipe.base.all_dimensions_quantum_graph_builder import ( AllDimensionsQuantumGraphBuilder, DatasetQueryConstraintVariant, ) +from lsst.pipe.base.quantum_graph_builder import OutputExistsError from lsst.pipe.base.tests import simpleQGraph from lsst.pipe.base.tests.mocks import ( DynamicConnectionConfig, @@ -300,6 +301,104 @@ def test_not_skipped_when_metadata_absent(self): self.assertEqual(len(qgraph), 1) +class IgnoreMetadataForTestCase(unittest.TestCase): + """Tests for QuantumGraphBuilder.ignore_metadata_for.""" + + def setUp(self): + repodir = tempfile.TemporaryDirectory() + self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir) + pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1) + butler, _ = simpleQGraph.makeSimpleQGraph(root=repodir.name, pipeline=pipeline, nQuanta=1) + self.enterContext(butler) + self.butler = butler + self.pipeline_graph = pipeline.to_graph() + # Simulate a prior run and put a metadata. + self.butler.registry.registerRun("run") + self.butler.put( + TaskMetadata(), + "task0_metadata", + run="run", + instrument="INSTR", + detector=0, + ) + + def test_not_skipped_when_outputs_missing(self): + """With ignore_metadata_for, quantum is not skipped when science + outputs are absent from skip_existing_in, even if metadata is present. + + A scenario is that an upstream pipeline ran and wrote + metadata but did not retain output datasets. + """ + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + ignore_metadata_for=["task0"], + input_collections=["test"], + output_run="new_run", + ).build() + self.assertEqual(len(qgraph), 1) + + def test_skips_when_all_outputs_present(self): + """With ignore_metadata_for, quantum is skipped when all science + outputs are present in skip_existing_in. + """ + self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) + self.butler.put(numpy.array([0.0]), "add2_dataset1", run="run", instrument="INSTR", detector=0) + self.butler.put(ButlerLogRecords.from_records([]), "task0_log", run="run", instrument="INSTR", detector=0) + # Init-outputs required when all quanta are skipped. + self.butler.put(numpy.array([0.0]), "add_init_output1", run="run") + self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run") + + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + ignore_metadata_for=["task0"], + input_collections=["test"], + output_run="new_run", + ).build() + # All outputs found, so quantum should be skipped. + self.assertEqual(len(qgraph), 0) + + def test_output_exists_error_when_partial_outputs(self): + """With ignore_metadata_for, OutputExistsError is raised when some + outputs exist in the output run and clobber is off. + """ + self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) + # add2_dataset1 absent -> not all outputs present -> task not skipped + + with self.assertRaises(OutputExistsError): + AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + ignore_metadata_for=["task0"], + input_collections=["test"], + # Use the same run so that partial output is in the way. + output_run="run", + ).build() + + def test_partial_outputs_clobber(self): + """With ignore_metadata_for and clobber=True, partial outputs in the + output run are discarded and the task runs. + """ + self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) + # add2_dataset1 absent -> not all outputs present -> task not skipped + # clobber=True -> add_dataset1 discarded from graph, task runs + + qgraph = AllDimensionsQuantumGraphBuilder( + self.pipeline_graph, + self.butler, + skip_existing_in=["run"], + ignore_metadata_for=["task0"], + input_collections=["test"], + output_run="run", + clobber=True, + ).build() + self.assertEqual(len(qgraph), 1) + + if __name__ == "__main__": lsst.utils.tests.init() unittest.main() From b13c0fbfd1d2e74d4e7b40fa5dd7a02fdbb5667e Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Mon, 11 May 2026 14:34:39 -0700 Subject: [PATCH 4/6] Exclude log datasets from ignore_metadata_for check The completion signal for tasks in `_ignore_metadata_for` previously checked all non-metadata outputs, including log datasets ({task}_log). This caused tasks whose logs were not retained to always not be skipped, even when all science outputs were present. Excluding log datasets from the output check means whether logs were retained is irrelevant to whether science outputs need to be regenerated. --- python/lsst/pipe/base/quantum_graph_builder.py | 10 +++++----- tests/test_graphBuilder.py | 3 +-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/python/lsst/pipe/base/quantum_graph_builder.py b/python/lsst/pipe/base/quantum_graph_builder.py index 410a997e7..c7c20fd45 100644 --- a/python/lsst/pipe/base/quantum_graph_builder.py +++ b/python/lsst/pipe/base/quantum_graph_builder.py @@ -131,9 +131,8 @@ class QuantumGraphBuilder(ABC): ignore_metadata_for : `~collections.abc.Iterable` [ `str` ], optional Task labels for which the task metadata dataset is not used as the completion signal when ``skip_existing_in`` is provided. For these - tasks a quantum is skipped only when all of its non-metadata outputs - are present in ``skip_existing_in``; it is not skipped when any - non-metadata output is absent. This is useful for pipelines + tasks a quantum is skipped only when all of its science outputs + are present in ``skip_existing_in``. This is useful for pipelines where some upstream tasks do not retain all of their outputs, so that those tasks can be re-run to regenerate the missing intermediate datasets. @@ -752,6 +751,7 @@ def _skip_quantum_if_metadata_exists( """ metadata_name = task_node.metadata_output.parent_dataset_type_name metadata_dataset_key = DatasetKey(metadata_name, quantum_key.data_id_values) + log_name = task_node.log_output.parent_dataset_type_name if task_node.log_output is not None else None if task_node.label in self.ignore_metadata_for: # For this task, use actual output datasets as the completion @@ -761,13 +761,13 @@ def _skip_quantum_if_metadata_exists( science_output_keys = [ k for k in skeleton.iter_outputs_of(quantum_key) - if k.parent_dataset_type_name != metadata_name + if k.parent_dataset_type_name != metadata_name and k.parent_dataset_type_name != log_name ] if not science_output_keys or any( skeleton.get_output_for_skip(k) is None for k in science_output_keys ): return False - # All non-metadata outputs are present; fall through to skip. + # All science outputs are present; fall through to skip. elif not skeleton.get_output_for_skip(metadata_dataset_key): # metadata absent: do not skip. return False diff --git a/tests/test_graphBuilder.py b/tests/test_graphBuilder.py index 83510f6ee..719ebcb5f 100644 --- a/tests/test_graphBuilder.py +++ b/tests/test_graphBuilder.py @@ -35,7 +35,7 @@ import numpy import lsst.utils.tests -from lsst.daf.butler import Butler, DatasetType, ButlerLogRecords +from lsst.daf.butler import Butler, DatasetType from lsst.daf.butler.registry import UserExpressionError from lsst.pipe.base import PipelineGraph, QuantumGraph, TaskMetadata from lsst.pipe.base.all_dimensions_quantum_graph_builder import ( @@ -345,7 +345,6 @@ def test_skips_when_all_outputs_present(self): """ self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) self.butler.put(numpy.array([0.0]), "add2_dataset1", run="run", instrument="INSTR", detector=0) - self.butler.put(ButlerLogRecords.from_records([]), "task0_log", run="run", instrument="INSTR", detector=0) # Init-outputs required when all quanta are skipped. self.butler.put(numpy.array([0.0]), "add_init_output1", run="run") self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run") From d6ef7aada4220dc2a8e58a40b6506f8c9ec54c9c Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Fri, 8 May 2026 14:45:36 -0700 Subject: [PATCH 5/6] Add ignore_metadata_for to SeparablePipelineExecutor SeparablePipelineExecutor is not used by pipetask, but we might as well extend the same option and get tested there. --- .../pipe/base/separable_pipeline_executor.py | 10 +++ tests/test_separable_pipeline_executor.py | 63 +++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/python/lsst/pipe/base/separable_pipeline_executor.py b/python/lsst/pipe/base/separable_pipeline_executor.py index 52d694ac9..1d9b30de2 100644 --- a/python/lsst/pipe/base/separable_pipeline_executor.py +++ b/python/lsst/pipe/base/separable_pipeline_executor.py @@ -84,6 +84,11 @@ class SeparablePipelineExecutor: for existing outputs, and skips any quanta that have run to completion (or have no work to do). Otherwise, all tasks are attempted (subject to ``clobber_output``). + ignore_metadata_for : `~collections.abc.Iterable` [`str`], optional + Task labels for which the completion signal used by + ``skip_existing_in`` is changed from task metadata to all + non-metadata, non-log outputs existing. Has no effect without + ``skip_existing_in``. task_factory : `.TaskFactory`, optional A custom task factory for use in pre-execution and execution. By default, a new instance of `.TaskFactory` is used. @@ -101,6 +106,7 @@ def __init__( butler: Butler, clobber_output: bool = False, skip_existing_in: Iterable[str] | None = None, + ignore_metadata_for: Iterable[str] | None = None, task_factory: TaskFactory | None = None, resources: ExecutionResources | None = None, raise_on_partial_outputs: bool = True, @@ -115,6 +121,7 @@ def __init__( self._clobber_output = clobber_output self._skip_existing_in = list(skip_existing_in) if skip_existing_in else [] + self._ignore_metadata_for = list(ignore_metadata_for) if ignore_metadata_for else [] self._task_factory = task_factory if task_factory else TaskFactory() self.resources = resources @@ -216,6 +223,7 @@ class are provided automatically (from explicit arguments to this pipeline.to_graph(), self._butler, skip_existing_in=self._skip_existing_in, + ignore_metadata_for=self._ignore_metadata_for, clobber=self._clobber_output, **kwargs, ) @@ -276,6 +284,7 @@ class are provided automatically (from explicit arguments to this "output_run": self._butler.run, "skip_existing_in": self._skip_existing_in, "skip_existing": bool(self._skip_existing_in), + "ignore_metadata_for": self._ignore_metadata_for, "data_query": where, "user": getpass.getuser(), "time": str(datetime.datetime.now()), @@ -344,6 +353,7 @@ class are provided automatically (from explicit arguments to this metadata = { "skip_existing_in": self._skip_existing_in, "skip_existing": bool(self._skip_existing_in), + "ignore_metadata_for": self._ignore_metadata_for, "data_query": where, } qg_builder = self.make_quantum_graph_builder(pipeline, where, builder_class=builder_class, **kwargs) diff --git a/tests/test_separable_pipeline_executor.py b/tests/test_separable_pipeline_executor.py index 5f71a18f6..851ddc31b 100644 --- a/tests/test_separable_pipeline_executor.py +++ b/tests/test_separable_pipeline_executor.py @@ -588,6 +588,69 @@ def test_make_quantum_graph_nowhere_skippartial_noclobber(self): with self.assertRaises(OutputExistsError): executor.build_quantum_graph(pipeline) + def test_make_quantum_graph_nowhere_ignoremeta_not_skipped(self): + """With ignore_metadata_for, a task is not skipped when its science + output is absent, even if metadata is present. + """ + prior_run = "prior_run" + self.butler.registry.registerCollection(prior_run, lsst.daf.butler.CollectionType.RUN) + executor = SeparablePipelineExecutor( + self.butler, + skip_existing_in=[prior_run], + ignore_metadata_for=["a"], + clobber_output=False, + ) + pipeline = Pipeline.fromFile(self.pipeline_file) + + self.butler.put({"zero": 0}, "input") + # Metadata present in prior run but intermediate absent. + self.butler.put(TaskMetadata(), "a_metadata", run=prior_run) + + graph = executor.build_quantum_graph(pipeline) + self.assertEqual(len(graph), 2) + self.assertEqual(graph.quanta_by_task.keys(), {"a", "b"}) + + def test_make_quantum_graph_nowhere_ignoremeta_skipped(self): + """With ignore_metadata_for, a task is skipped when its science output + is present, even if metadata and log are absent. + """ + executor = SeparablePipelineExecutor( + self.butler, + skip_existing_in=[self.butler.run], + ignore_metadata_for=["a"], + clobber_output=False, + ) + pipeline = Pipeline.fromFile(self.pipeline_file) + + self.butler.put({"zero": 0}, "input") + # Science output present; metadata and log intentionally absent. + self.butler.put({"zero": 0}, "intermediate") + self.butler.put(lsst.pex.config.Config(), "a_config") + + graph = executor.build_quantum_graph(pipeline) + self.assertEqual(len(graph), 1) + self.assertEqual(graph.header.n_task_quanta["a"], 0) + self.assertEqual(graph.header.n_task_quanta["b"], 1) + + def test_make_quantum_graph_nowhere_ignoremeta_metainway_noclobber(self): + """With ignore_metadata_for, OutputExistsError is raised when metadata + is already in the output run and the task is not skipped. + """ + executor = SeparablePipelineExecutor( + self.butler, + skip_existing_in=[self.butler.run], + ignore_metadata_for=["a"], + clobber_output=False, + ) + pipeline = Pipeline.fromFile(self.pipeline_file) + + self.butler.put({"zero": 0}, "input") + self.butler.put(TaskMetadata(), "a_metadata") + # Metadata in output run, intermediate absent -> task not skipped + # Same output run, a_metadata is in the way. + with self.assertRaises(OutputExistsError): + executor.build_quantum_graph(pipeline) + def test_build_quantum_graph_nowhere_noskip_clobber(self): executor = SeparablePipelineExecutor(self.butler, skip_existing_in=None, clobber_output=True) pipeline = Pipeline.fromFile(self.pipeline_file) From 2e9ac52094145ec6b95f2b3b63f25641633024be Mon Sep 17 00:00:00 2001 From: Hsin-Fang Chiang Date: Tue, 12 May 2026 15:51:16 -0700 Subject: [PATCH 6/6] Add a note in doc/changes --- doc/changes/DM-54879.feature.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 doc/changes/DM-54879.feature.md diff --git a/doc/changes/DM-54879.feature.md b/doc/changes/DM-54879.feature.md new file mode 100644 index 000000000..de148e48b --- /dev/null +++ b/doc/changes/DM-54879.feature.md @@ -0,0 +1 @@ +Added `ignore_metadata_for` parameter to `QuantumGraphBuilder` and `SeparablePipelineExecutor`.