From d7c4c691dff8a25c52306eaa53518c8bd5c83129 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 24 Oct 2025 15:04:18 -0700 Subject: [PATCH 01/24] initial commit for bl7.0.1.1 --- config.yml | 49 ++++-- orchestration/flows/bl7011/__init__.py | 0 orchestration/flows/bl7011/config.py | 14 ++ orchestration/flows/bl7011/dispatcher.py | 57 ++++++ orchestration/flows/bl7011/move.py | 215 +++++++++++++++++++++++ orchestration/flows/bl7011/prefect.yaml | 31 ++++ 6 files changed, 354 insertions(+), 12 deletions(-) create mode 100644 orchestration/flows/bl7011/__init__.py create mode 100644 orchestration/flows/bl7011/config.py create mode 100644 orchestration/flows/bl7011/dispatcher.py create mode 100644 orchestration/flows/bl7011/move.py create mode 100644 orchestration/flows/bl7011/prefect.yaml diff --git a/config.yml b/config.yml index d0eec943..6173555f 100644 --- a/config.yml +++ b/config.yml @@ -1,5 +1,42 @@ globus: globus_endpoints: + + # 7.0.1.1 ENDPOINTS + + bl7011-als-cosmic-scattering: + root_path: /data + uri: bl7011-als-cosmic-scattering.lbl.gov + uuid: e19da22e-4af5-4901-acae-d7e702def054 + name: bl7011-als-cosmic-scattering + + bl7011-nersc_alsdev: + root_path: /global/cfs/cdirs/als/gsharing/data_mover/7011 + uri: nersc.gov + uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 + name: bl7011-nersc_alsdev + + bl7011-compute-dtn: + root_path: / + uri: compute-dtn.als.lbl.gov + uuid: TBD + name: bl7011-compute-dtn + + # 7.0.1.2 ENDPOINTS + + nersc7012: + root_path: /global/cfs/cdirs/als/gsharing/data_mover/7012 + uri: nersc.gov + uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 + name: nersc7012 + + data7012: + root_path: / + uri: hpc.lbl.gov + uuid: 741b96e1-1b98-42a8-918d-daacc24c145f + name: data7012 + + # 8.3.2 ENDPOINTS + spot832: root_path: / uri: spot832.lbl.gov @@ -90,18 +127,6 @@ globus: uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 name: nersc832 - nersc7012: - root_path: /global/cfs/cdirs/als/gsharing/data_mover/7012 - uri: nersc.gov - uuid: 
d40248e6-d874-4f7b-badd-2c06c16f1a58 - name: nersc7012 - - data7012: - root_path: / - uri: hpc.lbl.gov - uuid: 741b96e1-1b98-42a8-918d-daacc24c145f - name: data7012 - globus_apps: als_transfer: client_id: ${GLOBUS_CLIENT_ID} diff --git a/orchestration/flows/bl7011/__init__.py b/orchestration/flows/bl7011/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orchestration/flows/bl7011/config.py b/orchestration/flows/bl7011/config.py new file mode 100644 index 00000000..70275c98 --- /dev/null +++ b/orchestration/flows/bl7011/config.py @@ -0,0 +1,14 @@ +from globus_sdk import TransferClient +from orchestration.globus import transfer + + +# TODO: Use BeamlineConfig base class (Waiting for PR #62 to be merged) +class Config7011: + def __init__(self) -> None: + config = transfer.get_config() + self.endpoints = transfer.build_endpoints(config) + self.apps = transfer.build_apps(config) + self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"]) + self.bl7011_als_cosmic_scattering = self.endpoints["bl7011-als-cosmic-scattering"] + self.bl7011_nersc_alsdev = self.endpoints["bl7011-nersc-alsdev"] + self.bl7011_compute_dtn = self.endpoints["bl7011-compute-dtn"] diff --git a/orchestration/flows/bl7011/dispatcher.py b/orchestration/flows/bl7011/dispatcher.py new file mode 100644 index 00000000..aca77ece --- /dev/null +++ b/orchestration/flows/bl7011/dispatcher.py @@ -0,0 +1,57 @@ +import logging +from prefect import flow +from typing import Optional, Union, Any + +from orchestration.flows.bl7011.move import process_new_7011_file + +logger = logging.getLogger(__name__) + + +# TODO Once this PR (https://github.com/als-computing/splash_flows/pull/62) is merged, we can use config: Config7011 +@flow(name="dispatcher", flow_run_name="dispatcher-{file_path}") +def dispatcher( + file_path: Optional[str] = None, + is_export_control: bool = False, + config: Optional[Union[dict, Any]] = None, +) -> None: + """ + Dispatcher flow for BL7011 beamline 
that launches the new_7011_file_flow. + + :param file_path: Path to the file to be processed. + :param is_export_control: Flag indicating if export control measures should be applied. + (Not used in the current BL7011 processing) + :param config: Configuration settings for processing. + Expected to be an instance of Config7011 or a dict that can be converted. + :raises ValueError: If no configuration is provided. + :raises TypeError: If the provided configuration is not a dict or Config7011. + """ + + logger.info("Starting dispatcher flow for BL 7.0.1.1") + logger.info(f"Parameters received: file_path={file_path}, is_export_control={is_export_control}") + + # Validate inputs and raise errors if necessary. The ValueErrors prevent the rest of the flow from running. + if file_path is None: + logger.error("No file_path provided to dispatcher.") + raise ValueError("File path is required for processing.") + + if is_export_control: + logger.error("Data is under export control. Processing is not allowed.") + raise ValueError("Data is under export control. 
Processing is not allowed.") + + if config is None: + logger.error("No configuration provided to dispatcher.") + raise ValueError("Configuration (config) is required for processing.") + + try: + process_new_7011_file( + file_path=file_path, + config=config + ) + logger.info("Dispatcher flow completed successfully.") + except Exception as e: + logger.error(f"Error during processing in dispatcher flow: {e}") + raise + + +if __name__ == "__main__": + dispatcher() diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py new file mode 100644 index 00000000..0ab24c58 --- /dev/null +++ b/orchestration/flows/bl7011/move.py @@ -0,0 +1,215 @@ +import datetime +import logging +from typing import Optional + +from prefect import flow +# from prefect.blocks.system import JSON + +from orchestration.flows.bl7011.config import Config7011 +from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe +from orchestration.prefect import schedule_prefect_flow +from orchestration.transfer_controller import CopyMethod, get_transfer_controller + +logger = logging.getLogger(__name__) + +# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls +# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. + + +def prune( + file_path: str = None, + source_endpoint: GlobusEndpoint = None, + check_endpoint: Optional[GlobusEndpoint] = None, + days_from_now: float = 0.0, + config: Config7011 = None +) -> bool: + """ + Prune (delete) data from a globus endpoint. + If days_from_now is 0, executes pruning immediately. + Otherwise, schedules pruning for future execution using Prefect. 
+ Args: + file_path (str): The path to the file or directory to prune + source_endpoint (GlobusEndpoint): The globus endpoint containing the data + check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning + days_from_now (float): Delay before pruning; if 0, prune immediately + Returns: + bool: True if pruning was successful or scheduled successfully, False otherwise + """ + if not file_path: + logger.error("No file_path provided for pruning operation") + return False + + if not source_endpoint: + logger.error("No source_endpoint provided for pruning operation") + return False + + if not config: + config = Config7011() + + if days_from_now < 0: + raise ValueError(f"Invalid days_from_now: {days_from_now}") + + # JSON blocks are deprecated, we should use what they recommend in the docs + # globus_settings = JSON.load("globus-settings").value + # max_wait_seconds = globus_settings["max_wait_seconds"] + + logger.info(f"Setting up pruning of '{file_path}' from '{source_endpoint.name}'") + + # convert float days → timedelta + delay: datetime.timedelta = datetime.timedelta(days=days_from_now) + + # If days_from_now is 0, prune immediately + if delay.total_seconds() == 0: + logger.info(f"Executing immediate pruning of '{file_path}' from '{source_endpoint.name}'") + return _prune_globus_endpoint( + relative_path=file_path, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + config=config + ) + else: + # Otherwise, schedule pruning for future execution + logger.info(f"Scheduling pruning of '{file_path}' from '{source_endpoint.name}' " + f"in {delay.total_seconds()/86400:.1f} days") + + try: + schedule_prefect_flow( + deployment_name="prune_globus_endpoint/prune_globus_endpoint", + parameters={ + "relative_path": file_path, + "source_endpoint": source_endpoint, + "check_endpoint": check_endpoint, + "config": config + }, + duration_from_now=delay, + ) + logger.info(f"Successfully scheduled pruning task for 
{delay.total_seconds()/86400:.1f} days from now") + return True + except Exception as e: + logger.error(f"Failed to schedule pruning task: {str(e)}", exc_info=True) + return False + +# Prune code is from the prune_controller in this PR: https://github.com/als-computing/splash_flows_globus/pulls +# Note: once the PR is merged, we can import prune_controller directly instead of copying the code here. + + +# @staticmethod +@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{{ relative_path | basename }}") +def _prune_globus_endpoint( + relative_path: str, + source_endpoint: GlobusEndpoint, + check_endpoint: Optional[GlobusEndpoint] = None, + config: Config7011 = None +) -> None: + """ + Prefect flow that performs the actual Globus endpoint pruning operation. + Args: + relative_path (str): The path of the file or directory to prune + source_endpoint (GlobusEndpoint): The Globus endpoint to prune from + check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning + config (BeamlineConfig): Configuration object with transfer client + """ + logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'") + + if not config: + config = Config7011() + + # globus_settings = JSON.load("globus-settings").value + # max_wait_seconds = globus_settings["max_wait_seconds"] + max_wait_seconds = 600 + flow_name = f"prune_from_{source_endpoint.name}" + logger.info(f"Running flow: {flow_name}") + logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") + prune_one_safe( + file=relative_path, + if_older_than_days=0, + transfer_client=config.tc, + source_endpoint=source_endpoint, + check_endpoint=check_endpoint, + logger=logger, + max_wait_seconds=max_wait_seconds + ) + + +@flow(name="new_7011_file_flow", flow_run_name="process_new-{file_path}") +def process_new_7011_file( + file_path: str, + config: Config7011 +) -> None: + """ + Flow to process a new file at BL 7.0.1.1 + 1. 
Copy the file from the data7011 to NERSC CFS. Ingest file path in SciCat. + 2. Schedule pruning from data7011. 6 months from now. + 3. Copy the file from NERSC CFS to NERSC HPSS. Ingest file path in SciCat. + 4. Schedule pruning from NERSC CFS. + + :param file_path: Path to the new file to be processed. + :param config: Configuration settings for processing. + """ + + logger.info(f"Processing new 7011 file: {file_path}") + + if not config: + config = Config7011() + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + transfer_controller.copy( + file_path=file_path, + source=config.bl7011_als_cosmic_scattering, + destination=config.bl7011_nersc_alsdev + ) + + # TODO: Ingest file path in SciCat + # Waiting for PR #62 to be merged (scicat_controller) + + # Schedule pruning from QNAP + # Waiting for PR #62 to be merged (prune_controller) + # TODO: Determine scheduling days_from_now based on beamline needs + prune( + file_path=file_path, + source_endpoint=config.bl7011_als_cosmic_scattering, + check_endpoint=config.bl7011_nersc_alsdev, + days_from_now=180.0 # work with Chenhui/Eric to determine appropriate value: 6 months + ) + + # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? 
+ # Waiting for PR #62 to be merged (transfer_controller) + + # TODO: Ingest file path in SciCat + # Waiting for PR #62 to be merged (scicat_controller) + + +@flow(name="move_7011_flight_check", flow_run_name="move_7011_flight_check-{file_path}") +def move_7011_flight_check( + file_path: str = "test_directory/test.txt", +): + """Please keep your arms and legs inside the vehicle at all times.""" + logger.info("7011 flight check: testing transfer from data7011 to NERSC CFS") + + config = Config7011() + + transfer_controller = get_transfer_controller( + transfer_type=CopyMethod.GLOBUS, + config=config + ) + + success = transfer_controller.copy( + file_path=file_path, + source=config.bl7011_als_cosmic_scattering, + destination=config.bl7011_nersc_alsdev + ) + if success is True: + logger.info("7011 flight check: transfer successful") + else: + logger.error("7011 flight check: transfer failed") + + +if __name__ == "__main__": + # Example usage + config = Config7011() + file_path = "test_directory/" + process_new_7011_file(file_path, config) diff --git a/orchestration/flows/bl7011/prefect.yaml b/orchestration/flows/bl7011/prefect.yaml new file mode 100644 index 00000000..54cd244c --- /dev/null +++ b/orchestration/flows/bl7011/prefect.yaml @@ -0,0 +1,31 @@ +name: bl7011 +prefect-version: 3.4.2 +deployments: +- name: new_file_7011 + entrypoint: orchestration/flows/bl7011/move.py:process_new_7011_file + work_pool: + name: new_file_7011_pool + work_queue_name: new_file_7011_queue + +- name: new_file_7011_flight_check + entrypoint: orchestration/flows/bl7011/move.py:move_7011_flight_check + work_pool: + name: new_file_7011_pool + work_queue_name: move_file_7011_flight_check_queue + schedules: + - cron: "0 */12 * * *" # Every 12 hours + slug: "test-move-7011-flight-check" + timezone: America/Los_Angeles + active: true + +- name: run_7011_dispatcher + entrypoint: orchestration/flows/bl7011/dispatcher.py:dispatcher + work_pool: + name: dispatcher_7011_pool + work_queue_name: 
dispatcher_7011_queue + +- name: prune_data7011 + entrypoint: orchestration/flows/bl7011/move.py:_prune_globus_endpoint + work_pool: + name: prune_7011_pool + work_queue_name: prune_7011_queue From 79e76ccae1a3cafd750f93861cc377461c85ce05 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 27 Oct 2025 13:07:07 -0700 Subject: [PATCH 02/24] Adding globus collection uuid for ALS Computing 7.0.1.1 to the config --- config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yml b/config.yml index 6173555f..9381117c 100644 --- a/config.yml +++ b/config.yml @@ -18,7 +18,7 @@ globus: bl7011-compute-dtn: root_path: / uri: compute-dtn.als.lbl.gov - uuid: TBD + uuid: 5ae9485d-87a4-4349-82ac-e214963ed823 name: bl7011-compute-dtn # 7.0.1.2 ENDPOINTS From 0f26ecb3c132c634a4f2f6a927967f4b11eed5d8 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 27 Oct 2025 13:09:13 -0700 Subject: [PATCH 03/24] Updating transfer/prune flow to use the compute-dtn collection for 7.0.1.1 --- orchestration/flows/bl7011/move.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index 0ab24c58..e5ef2e32 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -159,7 +159,7 @@ def process_new_7011_file( transfer_controller.copy( file_path=file_path, - source=config.bl7011_als_cosmic_scattering, + source=config.bl7011_compute_dtn, destination=config.bl7011_nersc_alsdev ) @@ -171,9 +171,9 @@ def process_new_7011_file( # TODO: Determine scheduling days_from_now based on beamline needs prune( file_path=file_path, - source_endpoint=config.bl7011_als_cosmic_scattering, + source_endpoint=config.bl7011_compute_dtn, check_endpoint=config.bl7011_nersc_alsdev, - days_from_now=180.0 # work with Chenhui/Eric to determine appropriate value: 6 months + days_from_now=180.0 # work with Sofie/Sujoy to determine appropriate retention period ) # TODO: Copy the 
file from NERSC CFS to NERSC HPSS.. after 2 years? @@ -199,7 +199,7 @@ def move_7011_flight_check( success = transfer_controller.copy( file_path=file_path, - source=config.bl7011_als_cosmic_scattering, + source=config.bl7011_compute_dtn, destination=config.bl7011_nersc_alsdev ) if success is True: From 057ff324f407444e7131bfb649b928c8422f8694 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Mon, 27 Oct 2025 16:47:12 -0700 Subject: [PATCH 04/24] fixing pytests for prefect 3 --- orchestration/_tests/test_globus_flow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/orchestration/_tests/test_globus_flow.py b/orchestration/_tests/test_globus_flow.py index b71be618..b9c36a91 100644 --- a/orchestration/_tests/test_globus_flow.py +++ b/orchestration/_tests/test_globus_flow.py @@ -25,13 +25,13 @@ def prefect_test_fixture(): """ with prefect_test_harness(): globus_client_id = Secret(value=str(uuid4())) - globus_client_id.save(name="globus-client-id") + globus_client_id.save(name="globus-client-id", overwrite=True) globus_client_secret = Secret(value=str(uuid4())) - globus_client_secret.save(name="globus-client-secret") + globus_client_secret.save(name="globus-client-secret", overwrite=True) globus_compute_endpoint = Secret(value=str(uuid4())) - globus_compute_endpoint.save(name="globus-compute-endpoint") + globus_compute_endpoint.save(name="globus-compute-endpoint", overwrite=True) Variable.set( name="pruning-config", From d8fb74ed63aa318f8bd2ecb9b2e3efa466ce5922 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 28 Oct 2025 15:05:16 -0700 Subject: [PATCH 05/24] Making config optional in new_7011_file_flow and dispatcher, since they are initialized in the functions if set to None --- orchestration/flows/bl7011/config.py | 2 +- orchestration/flows/bl7011/dispatcher.py | 5 +++-- orchestration/flows/bl7011/move.py | 2 +- 3 files changed, 5 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/bl7011/config.py 
b/orchestration/flows/bl7011/config.py index 70275c98..a4628e16 100644 --- a/orchestration/flows/bl7011/config.py +++ b/orchestration/flows/bl7011/config.py @@ -10,5 +10,5 @@ def __init__(self) -> None: self.apps = transfer.build_apps(config) self.tc: TransferClient = transfer.init_transfer_client(self.apps["als_transfer"]) self.bl7011_als_cosmic_scattering = self.endpoints["bl7011-als-cosmic-scattering"] - self.bl7011_nersc_alsdev = self.endpoints["bl7011-nersc-alsdev"] + self.bl7011_nersc_alsdev = self.endpoints["bl7011-nersc_alsdev"] self.bl7011_compute_dtn = self.endpoints["bl7011-compute-dtn"] diff --git a/orchestration/flows/bl7011/dispatcher.py b/orchestration/flows/bl7011/dispatcher.py index aca77ece..5a9c24bf 100644 --- a/orchestration/flows/bl7011/dispatcher.py +++ b/orchestration/flows/bl7011/dispatcher.py @@ -2,6 +2,7 @@ from prefect import flow from typing import Optional, Union, Any +from orchestration.flows.bl7011.config import Config7011 from orchestration.flows.bl7011.move import process_new_7011_file logger = logging.getLogger(__name__) @@ -39,8 +40,8 @@ def dispatcher( raise ValueError("Data is under export control. Processing is not allowed.") if config is None: - logger.error("No configuration provided to dispatcher.") - raise ValueError("Configuration (config) is required for processing.") + config = Config7011() + logger.info("No config provided. 
Using default Config7011.") try: process_new_7011_file( diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index e5ef2e32..a1ebee19 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -134,7 +134,7 @@ def _prune_globus_endpoint( @flow(name="new_7011_file_flow", flow_run_name="process_new-{file_path}") def process_new_7011_file( file_path: str, - config: Config7011 + config: Optional[Config7011] = None ) -> None: """ Flow to process a new file at BL 7.0.1.1 From dfeaecb0b0210fd68dc0ba28e8a607d413425c1d Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 28 Oct 2025 15:05:31 -0700 Subject: [PATCH 06/24] Adding pytests for move and dispatcher --- orchestration/_tests/test_bl7011/__init__.py | 0 orchestration/_tests/test_bl7011/test_move.py | 174 ++++++++++++++++++ 2 files changed, 174 insertions(+) create mode 100644 orchestration/_tests/test_bl7011/__init__.py create mode 100644 orchestration/_tests/test_bl7011/test_move.py diff --git a/orchestration/_tests/test_bl7011/__init__.py b/orchestration/_tests/test_bl7011/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/orchestration/_tests/test_bl7011/test_move.py b/orchestration/_tests/test_bl7011/test_move.py new file mode 100644 index 00000000..b26748cd --- /dev/null +++ b/orchestration/_tests/test_bl7011/test_move.py @@ -0,0 +1,174 @@ +'''Pytest unit tests for BL7011 move flow. 
''' + +import logging +import pytest +from uuid import uuid4 + +from prefect.testing.utilities import prefect_test_harness +from prefect.blocks.system import Secret, JSON +from pytest_mock import MockFixture + +from orchestration._tests.test_transfer_controller import MockSecret +from orchestration.flows.bl7011.config import Config7011 + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +@pytest.fixture(autouse=True, scope="session") +def prefect_test_fixture(): + """ + A pytest fixture that automatically sets up and tears down the Prefect test harness + for the entire test session. It creates and saves test secrets and configurations + required for Globus integration. + + Yields: + None + """ + with prefect_test_harness(): + globus_client_id = Secret(value=str(uuid4())) + globus_client_id.save(name="globus-client-id", overwrite=True) + + globus_client_secret = Secret(value=str(uuid4())) + globus_client_secret.save(name="globus-client-secret", overwrite=True) + + pruning_config = JSON(value={"max_wait_seconds": 600}) + pruning_config.save(name="pruning-config", overwrite=True) + + yield + + +# ---------------------------- +# Tests for 7011 +# ---------------------------- + +def test_process_new_7011_file(mocker: MockFixture) -> None: + """ + Test the process_new_7011_file flow from orchestration.flows.bl7011.move. + + This test verifies that: + - The get_transfer_controller function is called (patched) with the correct parameters. + - The returned transfer controller's copy method is called with the expected file path, + source, and destination endpoints from the provided configuration. + + Parameters: + mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. + """ + # Import the flow to test. + from orchestration.flows.bl7011.move import process_new_7011_file + + # Patch the Secret.load and init_transfer_client in the configuration context. 
+ with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): + mocker.patch( + "orchestration.flows.bl7011.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # Return a dummy TransferClient + ) + + # Instantiate the dummy configuration. + mock_config = Config7011() + + # Generate a test file path. + test_file_path = f"/tmp/test_file_{uuid4()}.txt" + + # Create a mock transfer controller with a mocked 'copy' method. + mock_transfer_controller = mocker.MagicMock() + mock_transfer_controller.copy.return_value = True + + mock_prune = mocker.patch( + "orchestration.flows.bl7011.move.prune", + return_value=None + ) + + # Patch get_transfer_controller where it is used in process_new_7011_file. + mocker.patch( + "orchestration.flows.bl7011.move.get_transfer_controller", + return_value=mock_transfer_controller + ) + + # Execute the move flow with the test file path and mock configuration. + result = process_new_7011_file(file_path=test_file_path, config=mock_config) + + # Verify that the transfer controller's copy method was called exactly once. + assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert result is None, "The flow should return None" + assert mock_prune.call_count == 1, "Prune function should be called exactly once" + + # Reset mocks and test with config=None + mock_transfer_controller.copy.reset_mock() + mock_prune.reset_mock() + + result = process_new_7011_file(file_path=test_file_path, config=None) + assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" + assert result is None, "The flow should return None" + assert mock_prune.call_count == 1, "Prune function should be called exactly once" + + +def test_dispatcher_7011_flow(mocker: MockFixture) -> None: + """ + Test the dispatcher flow for BL7011. 
+ + This test verifies that: + - The process_new_7011_file function is called with the correct parameters + when the dispatcher flow is executed. + Parameters: + mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. + """ + # Import the dispatcher flow to test. + from orchestration.flows.bl7011.dispatcher import dispatcher + + # Create a mock configuration object. + class MockConfig: + pass + + mock_config = MockConfig() + + # Generate a test file path. + test_file_path = f"/tmp/test_file_{uuid4()}.txt" + + # Patch the process_new_7011_file function to monitor its calls. + mock_process_new_7011_file = mocker.patch( + "orchestration.flows.bl7011.dispatcher.process_new_7011_file", + return_value=None + ) + + # Execute the dispatcher flow with test parameters. + dispatcher( + file_path=test_file_path, + is_export_control=False, + config=mock_config + ) + + # Verify that process_new_7011_file was called exactly once with the expected arguments. + mock_process_new_7011_file.assert_called_once_with( + file_path=test_file_path, + config=mock_config + ) + + # Verify that process_new_7011_file is called even when config is None + mock_process_new_7011_file.reset_mock() + dispatcher( + file_path=test_file_path, + is_export_control=False, + config=None + ) + mock_process_new_7011_file.assert_called_once() + + # Test error handling for missing file_path + mock_process_new_7011_file.reset_mock() + with pytest.raises(ValueError): + dispatcher( + file_path=None, + is_export_control=False, + config=mock_config + ) + mock_process_new_7011_file.assert_not_called() + + # Test error handling for export control flag + mock_process_new_7011_file.reset_mock() + with pytest.raises(ValueError): + dispatcher( + file_path=test_file_path, + is_export_control=True, + config=mock_config + ) + mock_process_new_7011_file.assert_not_called() From d63e0c50d078dbbb165e895bf87caf5baa9f7e50 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 28 Oct 2025 15:21:58 -0700 
Subject: [PATCH 07/24] Adding mocker patch to dispatcher pytest --- orchestration/_tests/test_bl7011/test_move.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/orchestration/_tests/test_bl7011/test_move.py b/orchestration/_tests/test_bl7011/test_move.py index b26748cd..339bb3ba 100644 --- a/orchestration/_tests/test_bl7011/test_move.py +++ b/orchestration/_tests/test_bl7011/test_move.py @@ -125,6 +125,18 @@ class MockConfig: # Generate a test file path. test_file_path = f"/tmp/test_file_{uuid4()}.txt" + # Patch the schedule_prefect_flow call to avoid real Prefect interaction + with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): + mocker.patch( + "orchestration.flows.bl7011.config.transfer.init_transfer_client", + return_value=mocker.MagicMock() # Return a dummy TransferClient + ) + # Patch the schedule_prefect_flow call to avoid real Prefect interaction + mocker.patch( + "orchestration.flows.bl7011.move.schedule_prefect_flow", + return_value=None + ) + # Patch the process_new_7011_file function to monitor its calls. mock_process_new_7011_file = mocker.patch( "orchestration.flows.bl7011.dispatcher.process_new_7011_file", From 2a5a589f86c43b589eed8de0ce33789edd0e2a2f Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 28 Oct 2025 16:36:24 -0700 Subject: [PATCH 08/24] Adding documentation for 7.0.1.1 flows --- docs/mkdocs/docs/bl7011.md | 39 ++++++++++++++++++++++++++++++++++++++ docs/mkdocs/mkdocs.yml | 7 +++++-- 2 files changed, 44 insertions(+), 2 deletions(-) create mode 100644 docs/mkdocs/docs/bl7011.md diff --git a/docs/mkdocs/docs/bl7011.md b/docs/mkdocs/docs/bl7011.md new file mode 100644 index 00000000..0cabab1f --- /dev/null +++ b/docs/mkdocs/docs/bl7011.md @@ -0,0 +1,39 @@ +# Beamline 7.0.1.1 Flows + +This page documents the workflows supported by Splash Flows at [ALS Beamline 7.0.1.1 (COSMIC Scattering)](https://als.lbl.gov/beamlines/7-0-1-1/). 
+ +## Data at 7.0.1.1 + +At Beamline 7.0.1.1, users generate data in an HDF5 format containing a background-subtracted stack of 2D images with associated LabVIEW metadata. Depending on the experiment, the file sizes can be greater than 100 GB. A ROI is exported for each dataset. + +## File Watcher + +There is a file watcher on the system `QNAP` that listens for new scans that have finished writing to disk. From there, a Prefect Flow we call `dispatcher` kicks off the downstream steps: +- Copy scans in real time from a Globus collection on the `compute-dtn` server to `NERSC CFS` using Globus Transfer. +- Copy project data to `NERSC HPSS` for long-term storage (TBD). +- Analysis on HPC systems (TBD). +- Ingest into SciCat (TBD). +- Schedule data pruning from `QNAP` and `NERSC CFS`. + +## Prefect Configuration + +### Registered Flows + +#### `dispatcher.py` + +The Dispatcher Prefect Flow manages the logic for handling the order and execution of data tasks. Once a new file is written, the `dispatcher()` Flow is called. In this case, the dispatcher handles the synchronous call to `move.py`, with a potential to add additional steps (e.g. scheduling remote HPC analysis code). + +#### `move.py` + +Flow to process a new file at BL 7.0.1.1 +1. Copy the file from `compute-dtn` to `NERSC CFS` and ingest the file path and metadata into SciCat. +2. Schedule pruning from `QNAP`. +3. Copy the file from `NERSC CFS` to `NERSC HPSS`. Ingest the archived file path in SciCat. +4. Schedule pruning from `NERSC CFS`. + +## VM Details + +The computing backend runs on a VM in the B15 server room that is managed by ALS IT staff. + +**Name**: `flow-xpcs` +**OS**: `Ubuntu 20.04 LTS` ... 
**at some point should be updated to `Ubuntu 24.04 LTS`** diff --git a/docs/mkdocs/mkdocs.yml b/docs/mkdocs/mkdocs.yml index 728a990e..331a9c08 100644 --- a/docs/mkdocs/mkdocs.yml +++ b/docs/mkdocs/mkdocs.yml @@ -13,8 +13,11 @@ nav: - Home: index.md - Installation and Requirements: install.md - Getting Started: getting_started.md -- Compute at ALCF: alcf832.md -- Compute at NERSC: nersc832.md +- Beamline Implementations: + - Beamline 7.0.1.1 - COSMIC Scattering: bl7011.md + - Beamline 8.3.2 - Microtomography: + - Compute at ALCF: alcf832.md + - Compute at NERSC: nersc832.md - Orchestration: orchestration.md - Configuration: configuration.md # - Troubleshooting: troubleshooting.md From 69774435044513851279f6ed82b94566f16ae1b6 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 5 Nov 2025 09:54:01 -0800 Subject: [PATCH 09/24] Making the dispatcher call process_new_7011_file_task, rather than the flow. Making the process_new_7011_file_flow call the task as well. This is for cleaner observation into the logs. --- orchestration/flows/bl7011/dispatcher.py | 4 ++-- orchestration/flows/bl7011/move.py | 15 +++++++++++++-- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/orchestration/flows/bl7011/dispatcher.py b/orchestration/flows/bl7011/dispatcher.py index 5a9c24bf..9a28304b 100644 --- a/orchestration/flows/bl7011/dispatcher.py +++ b/orchestration/flows/bl7011/dispatcher.py @@ -3,7 +3,7 @@ from typing import Optional, Union, Any from orchestration.flows.bl7011.config import Config7011 -from orchestration.flows.bl7011.move import process_new_7011_file +from orchestration.flows.bl7011.move import process_new_7011_file_task logger = logging.getLogger(__name__) @@ -44,7 +44,7 @@ def dispatcher( logger.info("No config provided. 
Using default Config7011.") try: - process_new_7011_file( + process_new_7011_file_task( file_path=file_path, config=config ) diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index a1ebee19..c84d9a7a 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -2,7 +2,7 @@ import logging from typing import Optional -from prefect import flow +from prefect import flow, task # from prefect.blocks.system import JSON from orchestration.flows.bl7011.config import Config7011 @@ -94,7 +94,7 @@ def prune( # @staticmethod -@flow(name="prune_globus_endpoint", flow_run_name="prune_globus_endpoint-{{ relative_path | basename }}") +@flow(name="prune_globus_endpoint", flow_run_name="prune_globus-{source_endpoint.name}-{relative_path}") def _prune_globus_endpoint( relative_path: str, source_endpoint: GlobusEndpoint, @@ -135,6 +135,17 @@ def _prune_globus_endpoint( def process_new_7011_file( file_path: str, config: Optional[Config7011] = None +) -> None: + """ + Flow to process a new file at BL 7.0.1.1 + """ + process_new_7011_file_task(file_path=file_path, config=config) + + +@task(name="new_7011_file_task") +def process_new_7011_file_task( + file_path: str, + config: Optional[Config7011] = None ) -> None: """ Flow to process a new file at BL 7.0.1.1 From a98e13873283556916b97f9fb529a8515f96dc27 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 5 Nov 2025 09:56:07 -0800 Subject: [PATCH 10/24] removing main methods from move.py and dispatcher.py --- orchestration/flows/bl7011/dispatcher.py | 4 ---- orchestration/flows/bl7011/move.py | 9 +-------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/orchestration/flows/bl7011/dispatcher.py b/orchestration/flows/bl7011/dispatcher.py index 9a28304b..9bd00691 100644 --- a/orchestration/flows/bl7011/dispatcher.py +++ b/orchestration/flows/bl7011/dispatcher.py @@ -52,7 +52,3 @@ def dispatcher( except Exception as e: logger.error(f"Error during processing in 
dispatcher flow: {e}") raise - - -if __name__ == "__main__": - dispatcher() diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index c84d9a7a..331c9a4d 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -132,7 +132,7 @@ def _prune_globus_endpoint( @flow(name="new_7011_file_flow", flow_run_name="process_new-{file_path}") -def process_new_7011_file( +def process_new_7011_file_flow( file_path: str, config: Optional[Config7011] = None ) -> None: @@ -217,10 +217,3 @@ def move_7011_flight_check( logger.info("7011 flight check: transfer successful") else: logger.error("7011 flight check: transfer failed") - - -if __name__ == "__main__": - # Example usage - config = Config7011() - file_path = "test_directory/" - process_new_7011_file(file_path, config) From 2440763368ed7ff0daaf7e9c88ab4efdc3ea0399 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 5 Nov 2025 09:59:36 -0800 Subject: [PATCH 11/24] Updating pytest --- orchestration/_tests/test_bl7011/test_move.py | 36 +++++++++---------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/orchestration/_tests/test_bl7011/test_move.py b/orchestration/_tests/test_bl7011/test_move.py index 339bb3ba..969b1d2c 100644 --- a/orchestration/_tests/test_bl7011/test_move.py +++ b/orchestration/_tests/test_bl7011/test_move.py @@ -42,7 +42,7 @@ def prefect_test_fixture(): # Tests for 7011 # ---------------------------- -def test_process_new_7011_file(mocker: MockFixture) -> None: +def test_process_new_7011_file_task(mocker: MockFixture) -> None: """ Test the process_new_7011_file flow from orchestration.flows.bl7011.move. @@ -55,7 +55,7 @@ def test_process_new_7011_file(mocker: MockFixture) -> None: mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. """ # Import the flow to test. 
- from orchestration.flows.bl7011.move import process_new_7011_file + from orchestration.flows.bl7011.move import process_new_7011_file_task # Patch the Secret.load and init_transfer_client in the configuration context. with mocker.patch('prefect.blocks.system.Secret.load', return_value=MockSecret()): @@ -79,14 +79,14 @@ def test_process_new_7011_file(mocker: MockFixture) -> None: return_value=None ) - # Patch get_transfer_controller where it is used in process_new_7011_file. + # Patch get_transfer_controller where it is used in process_new_7011_file_task. mocker.patch( "orchestration.flows.bl7011.move.get_transfer_controller", return_value=mock_transfer_controller ) # Execute the move flow with the test file path and mock configuration. - result = process_new_7011_file(file_path=test_file_path, config=mock_config) + result = process_new_7011_file_task(file_path=test_file_path, config=mock_config) # Verify that the transfer controller's copy method was called exactly once. assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" @@ -97,7 +97,7 @@ def test_process_new_7011_file(mocker: MockFixture) -> None: mock_transfer_controller.copy.reset_mock() mock_prune.reset_mock() - result = process_new_7011_file(file_path=test_file_path, config=None) + result = process_new_7011_file_task(file_path=test_file_path, config=None) assert mock_transfer_controller.copy.call_count == 1, "Transfer controller copy method should be called exactly once" assert result is None, "The flow should return None" assert mock_prune.call_count == 1, "Prune function should be called exactly once" @@ -108,7 +108,7 @@ def test_dispatcher_7011_flow(mocker: MockFixture) -> None: Test the dispatcher flow for BL7011. This test verifies that: - - The process_new_7011_file function is called with the correct parameters + - The process_new_7011_file_task function is called with the correct parameters when the dispatcher flow is executed. 
Parameters: mocker (MockFixture): The pytest-mock fixture for patching and mocking objects. @@ -137,9 +137,9 @@ class MockConfig: return_value=None ) - # Patch the process_new_7011_file function to monitor its calls. - mock_process_new_7011_file = mocker.patch( - "orchestration.flows.bl7011.dispatcher.process_new_7011_file", + # Patch the process_new_7011_file_task function to monitor its calls. + mock_process_new_7011_file_task = mocker.patch( + "orchestration.flows.bl7011.dispatcher.process_new_7011_file_task", return_value=None ) @@ -150,37 +150,37 @@ class MockConfig: config=mock_config ) - # Verify that process_new_7011_file was called exactly once with the expected arguments. - mock_process_new_7011_file.assert_called_once_with( + # Verify that process_new_7011_file_task was called exactly once with the expected arguments. + mock_process_new_7011_file_task.assert_called_once_with( file_path=test_file_path, config=mock_config ) - # Verify that process_new_7011_file is called even when config is None - mock_process_new_7011_file.reset_mock() + # Verify that process_new_7011_file_task is called even when config is None + mock_process_new_7011_file_task.reset_mock() dispatcher( file_path=test_file_path, is_export_control=False, config=None ) - mock_process_new_7011_file.assert_called_once() + mock_process_new_7011_file_task.assert_called_once() # Test error handling for missing file_path - mock_process_new_7011_file.reset_mock() + mock_process_new_7011_file_task.reset_mock() with pytest.raises(ValueError): dispatcher( file_path=None, is_export_control=False, config=mock_config ) - mock_process_new_7011_file.assert_not_called() + mock_process_new_7011_file_task.assert_not_called() # Test error handling for export control flag - mock_process_new_7011_file.reset_mock() + mock_process_new_7011_file_task.reset_mock() with pytest.raises(ValueError): dispatcher( file_path=test_file_path, is_export_control=True, config=mock_config ) - 
mock_process_new_7011_file.assert_not_called() + mock_process_new_7011_file_task.assert_not_called() From 1c6c7e3c8526896c373ec54bbe263ae10568dc40 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 5 Nov 2025 10:45:48 -0800 Subject: [PATCH 12/24] Updating bl7011/prefect.yaml to reflect the new file flow -> task change --- orchestration/flows/bl7011/prefect.yaml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl7011/prefect.yaml b/orchestration/flows/bl7011/prefect.yaml index 54cd244c..3131ea93 100644 --- a/orchestration/flows/bl7011/prefect.yaml +++ b/orchestration/flows/bl7011/prefect.yaml @@ -1,8 +1,8 @@ name: bl7011 prefect-version: 3.4.2 deployments: -- name: new_file_7011 - entrypoint: orchestration/flows/bl7011/move.py:process_new_7011_file +- name: new_file_7011_flow + entrypoint: orchestration/flows/bl7011/move.py:process_new_7011_file_flow work_pool: name: new_file_7011_pool work_queue_name: new_file_7011_queue From 3f4aa1944779c0e547b4907669820c9d4a91ec87 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Fri, 7 Nov 2025 13:40:04 -0800 Subject: [PATCH 13/24] Adding 7.0.1.1 flow diagram to mkdocs --- docs/mkdocs/docs/bl7011.md | 88 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 88 insertions(+) diff --git a/docs/mkdocs/docs/bl7011.md b/docs/mkdocs/docs/bl7011.md index 0cabab1f..dcff3e59 100644 --- a/docs/mkdocs/docs/bl7011.md +++ b/docs/mkdocs/docs/bl7011.md @@ -37,3 +37,91 @@ The computing backend runs on a VM in the B15 server room that is managed by ALS **Name**: `flow-xpcs` **OS**: `Ubuntu 20.02 LTS` ... **at some point should be updated to `Ubuntu 24.04 LTS`** + +## Flow Diagram + +## Flow Diagram +```mermaid +sequenceDiagram + participant DET as Detector/
File Watcher + participant DISP as Prefect
Dispatcher + participant D7011 as bl7011 compute-dtn
Storage + participant GLOB as Globus
Transfer + participant CFS as NERSC
CFS + participant CAT as SciCat
Metadata + participant SFAPI as SFAPI + participant HPC as HPC
Compute + participant HPSS as HPSS
Tape + + %% Initial Trigger + DET->>DET: Monitor filesystem + DET->>DISP: Trigger on new file + DISP->>DISP: Coordinate flows + + %% Flow 1: new_file_7011 + rect rgb(220, 230, 255) + note over DISP,CAT: FLOW 1: new_file_7011 + DISP->>GLOB: Init transfer + activate GLOB + GLOB->>D7011: Initiate copy + activate D7011 + D7011-->>GLOB: Copy initiated + deactivate D7011 + %% note right of GLOB: Transfer in progress + GLOB-->>DISP: Transfer complete + deactivate GLOB + + DISP->>CAT: Register metadata + end + + %% Flow 2: HPSS Transfer + rect rgb(220, 255, 230) + note over DISP,CAT: FLOW 2: Scheduled HPSS Transfer + DISP->>SFAPI: Submit tape job + activate SFAPI + SFAPI->>HPSS: Initiate archive + activate HPSS + HPSS-->>SFAPI: Archive complete + deactivate HPSS + SFAPI-->>DISP: Job complete + deactivate SFAPI + + DISP->>CAT: Update metadata + end + + %% Flow 3: HPC Analysis + rect rgb(255, 230, 230) + note over DISP,HPC: FLOW 3: HPC Downstream Analysis + DISP->>SFAPI: Submit compute job + activate SFAPI + SFAPI->>HPC: Execute job + activate HPC + HPC->>HPC: Process data + HPC-->>SFAPI: Compute complete + deactivate HPC + SFAPI-->>DISP: Job complete + deactivate SFAPI + + DISP->>CAT: Update metadata + end + + %% Flow 4: Scheduled Pruning + rect rgb(255, 255, 220) + note over DISP,CAT: FLOW 4: Scheduled Pruning + DISP->>DISP: Scheduled pruning trigger + + DISP->>D7011: Prune old files + activate D7011 + D7011->>D7011: Delete expired data + D7011-->>DISP: Pruning complete + deactivate D7011 + + DISP->>CFS: Prune old files + activate CFS + CFS->>CFS: Delete expired data + CFS-->>DISP: Pruning complete + deactivate CFS + + DISP->>CAT: Update metadata + end + ``` \ No newline at end of file From 74e4f699e0f2c70ed7e4a0875117da06d06faa2b Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 11 Nov 2025 14:02:57 -0800 Subject: [PATCH 14/24] Adding logger = get_run_logger() for better prefect logging --- orchestration/flows/bl7011/move.py | 8 ++++++-- 1 file changed, 6 
insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index 331c9a4d..c4d92e58 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -2,7 +2,7 @@ import logging from typing import Optional -from prefect import flow, task +from prefect import flow, task, get_run_logger # from prefect.blocks.system import JSON from orchestration.flows.bl7011.config import Config7011 @@ -35,6 +35,7 @@ def prune( Returns: bool: True if pruning was successful or scheduled successfully, False otherwise """ + logger = get_run_logger() if not file_path: logger.error("No file_path provided for pruning operation") return False @@ -109,6 +110,7 @@ def _prune_globus_endpoint( check_endpoint (Optional[GlobusEndpoint]): If provided, verify data exists here before pruning config (BeamlineConfig): Configuration object with transfer client """ + logger = get_run_logger() logger.info(f"Running Globus pruning flow for '{relative_path}' from '{source_endpoint.name}'") if not config: @@ -157,7 +159,7 @@ def process_new_7011_file_task( :param file_path: Path to the new file to be processed. :param config: Configuration settings for processing. 
""" - + logger = get_run_logger() logger.info(f"Processing new 7011 file: {file_path}") if not config: @@ -199,6 +201,7 @@ def move_7011_flight_check( file_path: str = "test_directory/test.txt", ): """Please keep your arms and legs inside the vehicle at all times.""" + logger = get_run_logger() logger.info("7011 flight check: testing transfer from data7011 to NERSC CFS") config = Config7011() @@ -213,6 +216,7 @@ def move_7011_flight_check( source=config.bl7011_compute_dtn, destination=config.bl7011_nersc_alsdev ) + if success is True: logger.info("7011 flight check: transfer successful") else: From be77e2f05dc8965f5d086bc0356059f3ff48306f Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 11 Nov 2025 14:15:58 -0800 Subject: [PATCH 15/24] Ensuring the test flow completely fails if the transfer failed by throwing a RuntimeError --- orchestration/flows/bl7011/move.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index c4d92e58..e3e7aa0d 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -199,8 +199,13 @@ def process_new_7011_file_task( @flow(name="move_7011_flight_check", flow_run_name="move_7011_flight_check-{file_path}") def move_7011_flight_check( file_path: str = "test_directory/test.txt", -): - """Please keep your arms and legs inside the vehicle at all times.""" +) -> None: + """Please keep your arms and legs inside the vehicle at all times. + + :param file_path: Path to the test file to be transferred. + :raises RuntimeError: If the transfer fails. 
+ :return: None + """ logger = get_run_logger() logger.info("7011 flight check: testing transfer from data7011 to NERSC CFS") @@ -221,3 +226,4 @@ def move_7011_flight_check( logger.info("7011 flight check: transfer successful") else: logger.error("7011 flight check: transfer failed") + raise RuntimeError("7011 flight check: transfer failed") From e6fe98d777b0197fd429f6e7c79fa078ea0713f7 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 12 Nov 2025 11:00:55 -0800 Subject: [PATCH 16/24] Adjusting default flight_check file path to point at the existing test/test_065.h5 file, since I am hitting a 550 read-only filesystem error when I try to create a new folder in the collection. --- orchestration/flows/bl7011/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index e3e7aa0d..6964183a 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -198,7 +198,7 @@ def process_new_7011_file_task( @flow(name="move_7011_flight_check", flow_run_name="move_7011_flight_check-{file_path}") def move_7011_flight_check( - file_path: str = "test_directory/test.txt", + file_path: str = "test/test_065.h5", ) -> None: """Please keep your arms and legs inside the vehicle at all times. 
From bfad8b1eceff2a18a30ee42ee8ca0de4012dfe2c Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 12 Nov 2025 11:05:20 -0800 Subject: [PATCH 17/24] Adding try block around the test transfer, with an except block to get more details about why (potentially) the transfer failed --- orchestration/flows/bl7011/move.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index 6964183a..6d98c463 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -216,12 +216,16 @@ def move_7011_flight_check( config=config ) - success = transfer_controller.copy( - file_path=file_path, - source=config.bl7011_compute_dtn, - destination=config.bl7011_nersc_alsdev - ) - + try: + logger.info(f"7011 flight check: transferring file {file_path}") + success = transfer_controller.copy( + file_path=file_path, + source=config.bl7011_compute_dtn, + destination=config.bl7011_nersc_alsdev + ) + except Exception as e: + logger.error(f"7011 flight check: transfer failed with exception {e}") + raise RuntimeError(f"7011 flight check: transfer failed with exception {e}") if success is True: logger.info("7011 flight check: transfer successful") else: From 8e72b4dd9e92c9dd37581558ac71f694a807b593 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 12 Nov 2025 11:14:40 -0800 Subject: [PATCH 18/24] Updating 7.0.1.1 endpoint details in config.yml --- config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yml b/config.yml index 9381117c..bd41de33 100644 --- a/config.yml +++ b/config.yml @@ -10,7 +10,7 @@ globus: name: bl7011-als-cosmic-scattering bl7011-nersc_alsdev: - root_path: /global/cfs/cdirs/als/gsharing/data_mover/7011 + root_path: /global/cfs/cdirs/als/data_mover/7.0.1.1 uri: nersc.gov uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 name: bl7011-nersc_alsdev From f356a26b415b23e0b8cb3a34eb01258a7fa2ef88 Mon Sep 17 00:00:00 2001 From: 
David Abramov Date: Thu, 13 Nov 2025 09:38:32 -0800 Subject: [PATCH 19/24] Fixing the 7011 nersc alsdev globus endpoint (was pointing at the wrong nersc endpoint previously) --- config.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config.yml b/config.yml index bd41de33..6d604a7d 100644 --- a/config.yml +++ b/config.yml @@ -12,7 +12,7 @@ globus: bl7011-nersc_alsdev: root_path: /global/cfs/cdirs/als/data_mover/7.0.1.1 uri: nersc.gov - uuid: df82346e-9a15-11ea-b3c4-0ae144191ee3 + uuid: d40248e6-d874-4f7b-badd-2c06c16f1a58 name: bl7011-nersc_alsdev bl7011-compute-dtn: From a2358bc7fad12cb66e3ccca3db09881233e85839 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Thu, 13 Nov 2025 11:23:29 -0800 Subject: [PATCH 20/24] Adding flow_run_name to schedule_prefect_flow call in the prune method --- orchestration/flows/bl7011/move.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index 6d98c463..5630e429 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -76,6 +76,7 @@ def prune( try: schedule_prefect_flow( deployment_name="prune_globus_endpoint/prune_globus_endpoint", + flow_run_name=f"prune_globus-{source_endpoint.name}-{file_path}", parameters={ "relative_path": file_path, "source_endpoint": source_endpoint, From 46ab1a5a58bb885efcf5e80b1d31a39f1b8c7498 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 15:03:43 -0800 Subject: [PATCH 21/24] commit before rebasing --- orchestration/flows/bl7011/move.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index 5630e429..3a9fe178 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -187,7 +187,7 @@ def process_new_7011_file_task( file_path=file_path, source_endpoint=config.bl7011_compute_dtn, check_endpoint=config.bl7011_nersc_alsdev, - 
days_from_now=180.0 # work with Sofie/Sujoy to determine appropriate retention period + days_from_now=180.0 # work with Sophie/Ron to determine appropriate retention period ) # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? From e43a6e17e840f319f98b55cfe851cada06b43d10 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Wed, 17 Dec 2025 15:13:57 -0800 Subject: [PATCH 22/24] Using Variable blocks instead of JSON blocks --- orchestration/_tests/test_bl7011/test_move.py | 20 ++++++++++++++++--- orchestration/flows/bl7011/move.py | 15 +++++++------- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/orchestration/_tests/test_bl7011/test_move.py b/orchestration/_tests/test_bl7011/test_move.py index 969b1d2c..5a74b611 100644 --- a/orchestration/_tests/test_bl7011/test_move.py +++ b/orchestration/_tests/test_bl7011/test_move.py @@ -5,7 +5,8 @@ from uuid import uuid4 from prefect.testing.utilities import prefect_test_harness -from prefect.blocks.system import Secret, JSON +from prefect.blocks.system import Secret +from prefect.variables import Variable from pytest_mock import MockFixture from orchestration._tests.test_transfer_controller import MockSecret @@ -32,8 +33,21 @@ def prefect_test_fixture(): globus_client_secret = Secret(value=str(uuid4())) globus_client_secret.save(name="globus-client-secret", overwrite=True) - pruning_config = JSON(value={"max_wait_seconds": 600}) - pruning_config.save(name="pruning-config", overwrite=True) + Variable.set( + name="globus-settings", + value={"max_wait_seconds": 600}, + overwrite=True, + _sync=True + ) + + Variable.set( + name="bl7011-settings", + value={ + "delete_data7011_files_after_days": 180, + }, + overwrite=True, + _sync=True + ) yield diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index 3a9fe178..036fa6cd 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -3,7 +3,7 @@ from typing import Optional from prefect 
import flow, task, get_run_logger -# from prefect.blocks.system import JSON +from prefect.variables import Variable from orchestration.flows.bl7011.config import Config7011 from orchestration.globus.transfer import GlobusEndpoint, prune_one_safe @@ -117,9 +117,9 @@ def _prune_globus_endpoint( if not config: config = Config7011() - # globus_settings = JSON.load("globus-settings").value - # max_wait_seconds = globus_settings["max_wait_seconds"] - max_wait_seconds = 600 + globus_settings = Variable.get("globus-settings") + max_wait_seconds = globus_settings["max_wait_seconds"] + flow_name = f"prune_from_{source_endpoint.name}" logger.info(f"Running flow: {flow_name}") logger.info(f"Pruning {relative_path} from source endpoint: {source_endpoint.name}") @@ -177,17 +177,16 @@ def process_new_7011_file_task( destination=config.bl7011_nersc_alsdev ) - # TODO: Ingest file path in SciCat - # Waiting for PR #62 to be merged (scicat_controller) - # Schedule pruning from QNAP # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs + + bl7011_settings = Variable.get("bl7011-settings") prune( file_path=file_path, source_endpoint=config.bl7011_compute_dtn, check_endpoint=config.bl7011_nersc_alsdev, - days_from_now=180.0 # work with Sophie/Ron to determine appropriate retention period + days_from_now=bl7011_settings["delete_data7011_files_after_days"], # set to 180 days ) # TODO: Copy the file from NERSC CFS to NERSC HPSS.. after 2 years? 
From fc29e3e8f581e5ffeda2bf722c5f4be87861555e Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 20 Jan 2026 15:20:37 -0800 Subject: [PATCH 23/24] Making the dispatcher/move flows accept a metadata field for the UID from Databroker --- orchestration/flows/bl7011/dispatcher.py | 2 ++ orchestration/flows/bl7011/move.py | 26 +++++++++++++++++++++--- 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/orchestration/flows/bl7011/dispatcher.py b/orchestration/flows/bl7011/dispatcher.py index 9bd00691..ab12db54 100644 --- a/orchestration/flows/bl7011/dispatcher.py +++ b/orchestration/flows/bl7011/dispatcher.py @@ -12,6 +12,7 @@ @flow(name="dispatcher", flow_run_name="dispatcher-{file_path}") def dispatcher( file_path: Optional[str] = None, + metadata: Optional[dict] = None, is_export_control: bool = False, config: Optional[Union[dict, Any]] = None, ) -> None: @@ -46,6 +47,7 @@ def dispatcher( try: process_new_7011_file_task( file_path=file_path, + metadata=metadata, config=config ) logger.info("Dispatcher flow completed successfully.") diff --git a/orchestration/flows/bl7011/move.py b/orchestration/flows/bl7011/move.py index 036fa6cd..db3e27f4 100644 --- a/orchestration/flows/bl7011/move.py +++ b/orchestration/flows/bl7011/move.py @@ -2,7 +2,7 @@ import logging from typing import Optional -from prefect import flow, task, get_run_logger +from prefect import flow, get_run_logger, task from prefect.variables import Variable from orchestration.flows.bl7011.config import Config7011 @@ -137,17 +137,24 @@ def _prune_globus_endpoint( @flow(name="new_7011_file_flow", flow_run_name="process_new-{file_path}") def process_new_7011_file_flow( file_path: str, + metadata: Optional[dict] = None, config: Optional[Config7011] = None ) -> None: """ Flow to process a new file at BL 7.0.1.1 + + :param file_path: Path to the new file to be processed. + :param metadata: Optional metadata associated with the file. + :param config: Configuration settings for processing. 
+ :return: None """ - process_new_7011_file_task(file_path=file_path, config=config) + process_new_7011_file_task(file_path=file_path, metadata=metadata, config=config) @task(name="new_7011_file_task") def process_new_7011_file_task( file_path: str, + metadata: Optional[dict] = None, config: Optional[Config7011] = None ) -> None: """ @@ -158,29 +165,40 @@ def process_new_7011_file_task( 4. Schedule pruning from NERSC CFS. :param file_path: Path to the new file to be processed. + :param metadata: Optional metadata associated with the file. :param config: Configuration settings for processing. """ logger = get_run_logger() logger.info(f"Processing new 7011 file: {file_path}") if not config: + logger.info("No config provided, using default Config7011") config = Config7011() + logger.info("Initializing transfer controller for Globus transfers") transfer_controller = get_transfer_controller( transfer_type=CopyMethod.GLOBUS, config=config ) - transfer_controller.copy( + logger.info(f"Transferring file {file_path} from data7011 to NERSC CFS") + nersc_transfer_success = transfer_controller.copy( file_path=file_path, source=config.bl7011_compute_dtn, destination=config.bl7011_nersc_alsdev ) + if not nersc_transfer_success: + logger.error(f"Failed to transfer file {file_path} to NERSC CFS") + raise Warning(f"Failed to transfer file {file_path} to NERSC CFS") + else: + logger.info(f"Successfully transferred file {file_path} to NERSC CFS") + # Schedule pruning from QNAP # Waiting for PR #62 to be merged (prune_controller) # TODO: Determine scheduling days_from_now based on beamline needs + logger.info(f"Scheduling pruning of file {file_path} from data7011 after configured retention period") bl7011_settings = Variable.get("bl7011-settings") prune( file_path=file_path, @@ -194,6 +212,8 @@ def process_new_7011_file_task( # TODO: Ingest file path in SciCat # Waiting for PR #62 to be merged (scicat_controller) + # scicat will ingest the "metadata" parameter if provided, which will 
contain the UID for bluesky runs in databroker. + # the scicat spec for xpcs will need to use databroker to get the actual metadata from the UID. @flow(name="move_7011_flight_check", flow_run_name="move_7011_flight_check-{file_path}") From fe136e968619689c2412a692d74fd712db519e91 Mon Sep 17 00:00:00 2001 From: David Abramov Date: Tue, 20 Jan 2026 15:44:09 -0800 Subject: [PATCH 24/24] updating pytest to handle the new metadata field --- orchestration/_tests/test_bl7011/test_move.py | 1 + 1 file changed, 1 insertion(+) diff --git a/orchestration/_tests/test_bl7011/test_move.py b/orchestration/_tests/test_bl7011/test_move.py index 5a74b611..570a59ca 100644 --- a/orchestration/_tests/test_bl7011/test_move.py +++ b/orchestration/_tests/test_bl7011/test_move.py @@ -167,6 +167,7 @@ class MockConfig: # Verify that process_new_7011_file_task was called exactly once with the expected arguments. mock_process_new_7011_file_task.assert_called_once_with( file_path=test_file_path, + metadata=None, config=mock_config )