-
Notifications
You must be signed in to change notification settings - Fork 11
DM-54879: Add --ignore-existing-metadata-for for when upstream outputs are selectively retained #561
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
DM-54879: Add --ignore-existing-metadata-for for when upstream outputs are selectively retained #561
Changes from all commits
0db29d2
88ca8ff
3544778
b13c0fb
d6ef7aa
2e9ac52
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Added `ignore_metadata_for` parameter to `QuantumGraphBuilder` and `SeparablePipelineExecutor`. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -128,6 +128,14 @@ class QuantumGraphBuilder(ABC): | |
| skip_existing_in : `~collections.abc.Sequence` [ `str` ], optional | ||
| Collections to search for outputs that already exist for the purpose of | ||
| skipping quanta that have already been run. | ||
| ignore_metadata_for : `~collections.abc.Iterable` [ `str` ], optional | ||
| Task labels for which the task metadata dataset is not used as the | ||
| completion signal when ``skip_existing_in`` is provided. For these | ||
| tasks a quantum is skipped only when all of its science outputs | ||
| are present in ``skip_existing_in``. This is useful for pipelines | ||
| where upstream processing does not retain all task outputs, so that | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reword to "where upstream processing does not retain all task outputs" (it's not the tasks that are retaining or not). |
||
| those tasks can be re-run to regenerate the missing intermediate | ||
| datasets. | ||
| clobber : `bool`, optional | ||
| Whether to raise if predicted outputs already exist in ``output_run`` | ||
| (not including those quanta that would be skipped because they've | ||
|
|
@@ -171,6 +179,7 @@ def __init__( | |
| input_collections: Sequence[str] | None = None, | ||
| output_run: str | None = None, | ||
| skip_existing_in: Sequence[str] = (), | ||
| ignore_metadata_for: Iterable[str] = (), | ||
| clobber: bool = False, | ||
| ): | ||
| self.log = getLogger(__name__) | ||
|
|
@@ -188,6 +197,7 @@ def __init__( | |
| self.butler = butler.clone(collections=input_collections) | ||
| self.output_run = output_run | ||
| self.skip_existing_in = skip_existing_in | ||
| self.ignore_metadata_for: frozenset[str] = frozenset(ignore_metadata_for) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We probably want to raise here if |
||
| self.empty_data_id = DataCoordinate.make_empty(butler.dimensions) | ||
| self.clobber = clobber | ||
| # See whether the output run already exists. | ||
|
|
@@ -703,7 +713,7 @@ def _skip_quantum_if_metadata_exists( | |
| self, task_node: TaskNode, quantum_key: QuantumKey, skeleton: QuantumGraphSkeleton | ||
| ) -> bool: | ||
| """Identify and drop quanta that should be skipped because their | ||
| metadata datasets already exist. | ||
| metadata or output datasets already exist in ``skip_existing_in``. | ||
|
|
||
| Parameters | ||
| ---------- | ||
|
|
@@ -722,41 +732,65 @@ def _skip_quantum_if_metadata_exists( | |
|
|
||
| Notes | ||
| ----- | ||
| If the metadata dataset for this quantum exists in the | ||
| `skip_existing_in` collections, the quantum will be skipped. This | ||
| causes the quantum node to be removed from the graph. Dataset nodes | ||
| For tasks not listed in `ignore_metadata_for`, a quantum is skipped | ||
| when its metadata dataset exists in ``skip_existing_in``. | ||
|
|
||
| For tasks listed in `ignore_metadata_for`, the metadata dataset is | ||
| not used as the completion signal. Instead, a quantum is skipped only | ||
| when all of its science outputs are present in ``skip_existing_in``. | ||
| If any such output is absent the quantum is not skipped, so the task | ||
| can regenerate the missing outputs. This supports pipelines where | ||
| upstream processing does not retain all task outputs. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Reword as in the other thread. |
||
|
|
||
| The skipped quantum node will be removed from the graph. Dataset nodes | ||
| that were previously the outputs of this quantum will be associated | ||
| with `lsst.daf.butler.DatasetRef` objects that were found in | ||
| ``skip_existing_in``, or will be removed if there is no such dataset | ||
| there. Any output dataset in `output_run` will be removed from the | ||
| "output in the way" category. | ||
| """ | ||
| metadata_dataset_key = DatasetKey( | ||
| task_node.metadata_output.parent_dataset_type_name, quantum_key.data_id_values | ||
| ) | ||
| if skeleton.get_output_for_skip(metadata_dataset_key): | ||
| # This quantum's metadata is already present in the the | ||
| # skip_existing_in collections; we'll skip it. But the presence of | ||
| # the metadata dataset doesn't guarantee that all of the other | ||
| # outputs we predicted are present; we have to check. | ||
| for output_dataset_key in list(skeleton.iter_outputs_of(quantum_key)): | ||
| # If this dataset was "in the way" (i.e. already in the | ||
| # output run), it isn't anymore. | ||
| skeleton.discard_output_in_the_way(output_dataset_key) | ||
| if (output_ref := skeleton.get_output_for_skip(output_dataset_key)) is not None: | ||
| # Populate the skeleton graph's node attributes | ||
| # with the existing DatasetRef, just like a | ||
| # predicted output of a non-skipped quantum. | ||
| skeleton.set_dataset_ref(output_ref, output_dataset_key) | ||
| else: | ||
| # Remove this dataset from the skeleton graph, | ||
| # because the quantum that would have produced it | ||
| # is being skipped and it doesn't already exist. | ||
| skeleton.remove_dataset_nodes([output_dataset_key]) | ||
| # Removing the quantum node from the graph will happen outside this | ||
| # function. | ||
| return True | ||
| return False | ||
| metadata_name = task_node.metadata_output.parent_dataset_type_name | ||
| metadata_dataset_key = DatasetKey(metadata_name, quantum_key.data_id_values) | ||
| log_name = task_node.log_output.parent_dataset_type_name if task_node.log_output is not None else None | ||
|
|
||
| if task_node.label in self.ignore_metadata_for: | ||
| # For this task, use actual output datasets as the completion | ||
| # signal rather than metadata. Skip only if all science | ||
| # outputs are present in skip_existing_in; do not skip if | ||
| # any are absent so they can be regenerated. | ||
| science_output_keys = [ | ||
| k | ||
| for k in skeleton.iter_outputs_of(quantum_key) | ||
| if k.parent_dataset_type_name != metadata_name and k.parent_dataset_type_name != log_name | ||
| ] | ||
| if not science_output_keys or any( | ||
| skeleton.get_output_for_skip(k) is None for k in science_output_keys | ||
| ): | ||
| return False | ||
| # All science outputs are present; fall through to skip. | ||
| elif not skeleton.get_output_for_skip(metadata_dataset_key): | ||
| # Metadata is absent: do not skip. | ||
| return False | ||
|
|
||
| # We will skip the quantum, but that decision doesn't guarantee that all | ||
| # of the outputs we predicted are present; we have to check each one. | ||
| for output_dataset_key in list(skeleton.iter_outputs_of(quantum_key)): | ||
| # If this dataset was "in the way" (i.e. already in the | ||
| # output run), it isn't anymore. | ||
| skeleton.discard_output_in_the_way(output_dataset_key) | ||
| if (output_ref := skeleton.get_output_for_skip(output_dataset_key)) is not None: | ||
| # Populate the skeleton graph's node attributes | ||
| # with the existing DatasetRef, just like a | ||
| # predicted output of a non-skipped quantum. | ||
| skeleton.set_dataset_ref(output_ref, output_dataset_key) | ||
| else: | ||
| # Remove this dataset from the skeleton graph, | ||
| # because the quantum that would have produced it | ||
| # is being skipped and it doesn't already exist. | ||
| skeleton.remove_dataset_nodes([output_dataset_key]) | ||
| # Removing the quantum node from the graph will happen outside this | ||
| # function. | ||
| return True | ||
|
|
||
| @final | ||
| def _update_quantum_for_adjust( | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -29,16 +29,20 @@ | |
|
|
||
| import io | ||
| import logging | ||
| import tempfile | ||
| import unittest | ||
|
|
||
| import numpy | ||
|
|
||
| import lsst.utils.tests | ||
| from lsst.daf.butler import Butler, DatasetType | ||
| from lsst.daf.butler.registry import UserExpressionError | ||
| from lsst.pipe.base import PipelineGraph, QuantumGraph | ||
| from lsst.pipe.base import PipelineGraph, QuantumGraph, TaskMetadata | ||
| from lsst.pipe.base.all_dimensions_quantum_graph_builder import ( | ||
| AllDimensionsQuantumGraphBuilder, | ||
| DatasetQueryConstraintVariant, | ||
| ) | ||
| from lsst.pipe.base.quantum_graph_builder import OutputExistsError | ||
| from lsst.pipe.base.tests import simpleQGraph | ||
| from lsst.pipe.base.tests.mocks import ( | ||
| DynamicConnectionConfig, | ||
|
|
@@ -228,6 +232,172 @@ def test_datastore_records(self): | |
| self.assertEqual(quantum.datastore_records, {}) | ||
|
|
||
|
|
||
| class SkipExistingInTestCase(unittest.TestCase): | ||
| """Tests for the skip_existing_in behavior of QuantumGraphBuilder.""" | ||
|
|
||
| def setUp(self): | ||
| repodir = tempfile.TemporaryDirectory() | ||
| self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir) | ||
| pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1) | ||
| butler, _ = simpleQGraph.makeSimpleQGraph(root=repodir.name, pipeline=pipeline, nQuanta=1) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think this is worth changing at this point, but for new tests I'd generally recommend |
||
| self.enterContext(butler) | ||
| self.butler = butler | ||
| self.pipeline_graph = pipeline.to_graph() | ||
| self.butler.registry.registerRun("run") | ||
|
|
||
| def test_not_skipped_without_skip_existing_in(self): | ||
| """Without skip_existing_in, a quantum is never skipped even if | ||
| metadata exists in an input collection. | ||
| """ | ||
| self.butler.put( | ||
| TaskMetadata(), | ||
| "task0_metadata", | ||
| run="run", | ||
| instrument="INSTR", | ||
| detector=0, | ||
| ) | ||
|
|
||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, self.butler, input_collections=["test"], output_run="new_run" | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 1) | ||
|
|
||
| def test_skipped_when_metadata_exists(self): | ||
| """With skip_existing_in, a quantum is skipped when its metadata | ||
| dataset is present in the specified collections. | ||
| """ | ||
| self.butler.put( | ||
| TaskMetadata(), | ||
| "task0_metadata", | ||
| run="run", | ||
| instrument="INSTR", | ||
| detector=0, | ||
| ) | ||
| # Init-outputs required, otherwise InitInputMissingError. | ||
| self.butler.put(numpy.array([0.0]), "add_init_output1", run="run") | ||
| self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run") | ||
|
|
||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| input_collections=["test"], | ||
| output_run="new_run", | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 0) | ||
|
|
||
| def test_not_skipped_when_metadata_absent(self): | ||
| """With skip_existing_in, a quantum is not skipped when its metadata | ||
| dataset is absent from the specified collections. | ||
| """ | ||
| # No metadata put — run exists but is empty. | ||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| input_collections=["test"], | ||
| output_run="new_run", | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 1) | ||
|
|
||
|
|
||
| class IgnoreMetadataForTestCase(unittest.TestCase): | ||
| """Tests for QuantumGraphBuilder.ignore_metadata_for.""" | ||
|
|
||
| def setUp(self): | ||
| repodir = tempfile.TemporaryDirectory() | ||
| self.addCleanup(tempfile.TemporaryDirectory.cleanup, repodir) | ||
| pipeline = simpleQGraph.makeSimplePipeline(nQuanta=1) | ||
| butler, _ = simpleQGraph.makeSimpleQGraph(root=repodir.name, pipeline=pipeline, nQuanta=1) | ||
| self.enterContext(butler) | ||
| self.butler = butler | ||
| self.pipeline_graph = pipeline.to_graph() | ||
| # Simulate a prior run that wrote a metadata dataset. | ||
| self.butler.registry.registerRun("run") | ||
| self.butler.put( | ||
| TaskMetadata(), | ||
| "task0_metadata", | ||
| run="run", | ||
| instrument="INSTR", | ||
| detector=0, | ||
| ) | ||
|
|
||
| def test_not_skipped_when_outputs_missing(self): | ||
| """With ignore_metadata_for, quantum is not skipped when science | ||
| outputs are absent from skip_existing_in, even if metadata is present. | ||
|
|
||
| This covers the scenario where an upstream pipeline ran and wrote | ||
| metadata but its output datasets were not retained. | ||
| """ | ||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| ignore_metadata_for=["task0"], | ||
| input_collections=["test"], | ||
| output_run="new_run", | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 1) | ||
|
|
||
| def test_skips_when_all_outputs_present(self): | ||
| """With ignore_metadata_for, quantum is skipped when all science | ||
| outputs are present in skip_existing_in. | ||
| """ | ||
| self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) | ||
| self.butler.put(numpy.array([0.0]), "add2_dataset1", run="run", instrument="INSTR", detector=0) | ||
| # Init-outputs required when all quanta are skipped. | ||
| self.butler.put(numpy.array([0.0]), "add_init_output1", run="run") | ||
| self.butler.put(simpleQGraph.AddTaskConfig(), "task0_config", run="run") | ||
|
|
||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| ignore_metadata_for=["task0"], | ||
| input_collections=["test"], | ||
| output_run="new_run", | ||
| ).build() | ||
| # All outputs found, so quantum should be skipped. | ||
| self.assertEqual(len(qgraph), 0) | ||
|
|
||
| def test_output_exists_error_when_partial_outputs(self): | ||
| """With ignore_metadata_for, OutputExistsError is raised when some | ||
| outputs exist in the output run and clobber is off. | ||
| """ | ||
| self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) | ||
| # add2_dataset1 absent -> not all outputs present -> task not skipped | ||
|
|
||
| with self.assertRaises(OutputExistsError): | ||
| AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| ignore_metadata_for=["task0"], | ||
| input_collections=["test"], | ||
| # Use the same run so that partial output is in the way. | ||
| output_run="run", | ||
| ).build() | ||
|
|
||
| def test_partial_outputs_clobber(self): | ||
| """With ignore_metadata_for and clobber=True, partial outputs in the | ||
| output run are discarded and the task runs. | ||
| """ | ||
| self.butler.put(numpy.array([0.0]), "add_dataset1", run="run", instrument="INSTR", detector=0) | ||
| # add2_dataset1 absent -> not all outputs present -> task not skipped | ||
| # clobber=True -> add_dataset1 discarded from graph, task runs | ||
|
|
||
| qgraph = AllDimensionsQuantumGraphBuilder( | ||
| self.pipeline_graph, | ||
| self.butler, | ||
| skip_existing_in=["run"], | ||
| ignore_metadata_for=["task0"], | ||
| input_collections=["test"], | ||
| output_run="run", | ||
| clobber=True, | ||
| ).build() | ||
| self.assertEqual(len(qgraph), 1) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| lsst.utils.tests.init() | ||
| unittest.main() | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm curious where this is coming from; AFAIK we don't use
`bytearray` or `memoryview[int]` for any of these.