From 230831924c26188870e846ad1b54627374c8995c Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 23 Jan 2026 10:20:25 -0800
Subject: [PATCH 1/7] Updating ALCF endpoints to include the SYNAPS-I
 allocation (to be set up)

---
 config.yml | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/config.yml b/config.yml
index 3f26a4f..0d24832 100644
--- a/config.yml
+++ b/config.yml
@@ -72,17 +72,23 @@ globus:
     uuid: 75b478b2-37af-46df-bfbd-71ed692c6506
     name: data832_scratch
 
-  alcf832_raw:
+  alcf832_synaps:
+    root_path: /
+    uri: alcf.anl.gov
+    uuid: TBD
+    name: alcf832_synaps
+
+  alcf832_iri_raw:
     root_path: /data/raw
     uri: alcf.anl.gov
     uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7
-    name: alcf_raw
+    name: alcf_iri_raw
 
-  alcf832_scratch:
+  alcf832_iri_scratch:
     root_path: /data/scratch
     uri: alcf.anl.gov
     uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7
-    name: alcf_scratch
+    name: alcf_iri_scratch
 
   alcf_eagle832:
     root_path: /IRIBeta/als/example

From dc330441b76be53ca5f76c65760a7b20199ca25f Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 23 Jan 2026 10:21:44 -0800
Subject: [PATCH 2/7] Updating bl832 config.py to distinguish IRI and SYNAPS-I
 ALCF endpoints

---
 orchestration/flows/bl832/config.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py
index 788eef4..7294b0a 100644
--- a/orchestration/flows/bl832/config.py
+++ b/orchestration/flows/bl832/config.py
@@ -24,7 +24,8 @@ def _beam_specific_config(self) -> None:
         self.nersc832_alsdev_pscratch_raw = self.endpoints["nersc832_alsdev_pscratch_raw"]
         self.nersc832_alsdev_pscratch_scratch = self.endpoints["nersc832_alsdev_pscratch_scratch"]
         self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"]
-        self.alcf832_raw = self.endpoints["alcf832_raw"]
-        self.alcf832_scratch = self.endpoints["alcf832_scratch"]
-        self.scicat = self.config["scicat"]
-        self.ghcr_images832 = self.config["ghcr_images832"]
+        self.alcf832_synaps = self.endpoints["alcf832_synaps"]
+        self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"]
+        self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"]
+        self.scicat = config["scicat"]
+        self.ghcr_images832 = config["ghcr_images832"]

From 99e87418d877b7a8541fab1427b69c8eb1241ff7 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 23 Jan 2026 10:22:11 -0800
Subject: [PATCH 3/7] Adding the config.yaml file for setting up the globus
 compute endpoint for reconstruction on ALCF

---
 .../polaris/globus_compute_recon_config.yaml | 39 +++++++++++++++++++
 1 file changed, 39 insertions(+)
 create mode 100644 scripts/polaris/globus_compute_recon_config.yaml

diff --git a/scripts/polaris/globus_compute_recon_config.yaml b/scripts/polaris/globus_compute_recon_config.yaml
new file mode 100644
index 0000000..66ffd33
--- /dev/null
+++ b/scripts/polaris/globus_compute_recon_config.yaml
@@ -0,0 +1,39 @@
+engine:
+    type: GlobusComputeEngine  # This engine uses the HighThroughputExecutor
+    max_retries_on_system_failure: 2
+    max_workers: 1  # Sets one worker per node
+    prefetch_capacity: 0  # Increase if you have many more tasks than workers
+
+    address:
+        type: address_by_interface
+        ifname: bond0
+
+    strategy: simple
+    job_status_kwargs:
+        max_idletime: 300
+        strategy_period: 60
+
+    provider:
+        type: PBSProProvider
+
+        launcher:
+            type: MpiExecLauncher
+            # Ensures one manager per node, working across all 64 cores
+            bind_cmd: --cpu-bind
+            overrides: --depth=64 --ppn 1
+
+        account: SYNAPS-I
+        queue: debug
+        cpus_per_node: 64
+
+        # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe"
+        scheduler_options: "#PBS -l filesystems=home:eagle"
+
+        # Node setup: activate necessary conda environment and such
+        worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPS-I/; cd $HOME/.globus_compute/globus_compute_reconstruction"
+
+        walltime: 01:00:00  # Jobs will end after 60 minutes
+        nodes_per_block: 2  # Each job will request 2 nodes
+        init_blocks: 0
+        min_blocks: 0
+        max_blocks: 2  # No more than 2 jobs will be scheduled at a time
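A minimal sketch of how this config might be installed on a Polaris login node, assuming the endpoint name globus_compute_reconstruction implied by the worker_init above; the subcommands come from the globus-compute-endpoint CLI and should be verified against the installed version:

    # Sketch (endpoint name and paths are assumptions from this patch series):
    # create the endpoint, drop in the YAML from this patch, start it, and
    # record its UUID for the Prefect "globus-compute-endpoint" Secret block
    module use /soft/modulefiles; module load conda
    conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy
    globus-compute-endpoint configure globus_compute_reconstruction
    cp scripts/polaris/globus_compute_recon_config.yaml \
       "$HOME/.globus_compute/globus_compute_reconstruction/config.yaml"
    globus-compute-endpoint start globus_compute_reconstruction
    globus-compute-endpoint list  # shows the UUID for the Secret block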
From 523fc1938efda1e0b91d6823cad2bf91bb276df7 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 23 Jan 2026 10:22:54 -0800
Subject: [PATCH 4/7] Adding the config.yaml file for setting up the globus
 compute endpoint for segmentation on ALCF. Still needs to be configured for
 GPUs and an environment with the required dependencies

---
 .../globus_compute_segment_config.yaml | 41 +++++++++++++++++++
 1 file changed, 41 insertions(+)
 create mode 100644 scripts/polaris/globus_compute_segment_config.yaml

diff --git a/scripts/polaris/globus_compute_segment_config.yaml b/scripts/polaris/globus_compute_segment_config.yaml
new file mode 100644
index 0000000..07bced0
--- /dev/null
+++ b/scripts/polaris/globus_compute_segment_config.yaml
@@ -0,0 +1,41 @@
+# This needs to be updated to use GPUs and a segmentation environment
+
+engine:
+    type: GlobusComputeEngine  # This engine uses the HighThroughputExecutor
+    max_retries_on_system_failure: 2
+    max_workers: 1  # Sets one worker per node
+    prefetch_capacity: 0  # Increase if you have many more tasks than workers
+
+    address:
+        type: address_by_interface
+        ifname: bond0
+
+    strategy: simple
+    job_status_kwargs:
+        max_idletime: 300
+        strategy_period: 60
+
+    provider:
+        type: PBSProProvider
+
+        launcher:
+            type: MpiExecLauncher
+            # Ensures one manager per node, working across all 64 cores
+            bind_cmd: --cpu-bind
+            overrides: --depth=64 --ppn 1
+
+        account: SYNAPS-I
+        queue: debug
+        cpus_per_node: 64
+
+        # e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe"
+        scheduler_options: "#PBS -l filesystems=home:eagle"
+
+        # Node setup: still activates the reconstruction env; swap in the segmentation environment once it exists
+        worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPS-I/; cd $HOME/.globus_compute/globus_compute_reconstruction"
+
+        walltime: 01:00:00  # Jobs will end after 60 minutes
+        nodes_per_block: 2  # Each job will request 2 nodes
+        init_blocks: 0
+        min_blocks: 0
+        max_blocks: 2  # No more than 2 jobs will be scheduled at a time

From 951b9d92c5c1a67b369ff896f1e8c4bb63c02118 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 23 Jan 2026 10:23:53 -0800
Subject: [PATCH 5/7] Adding segmentation Prefect task and segmentation globus
 compute code for the TomographyController.
Turning off TIFF to ZARR on ALCF for the demo

---
 orchestration/flows/bl832/alcf.py | 195 +++++++++++++++++++++++++-----
 1 file changed, 166 insertions(+), 29 deletions(-)

diff --git a/orchestration/flows/bl832/alcf.py b/orchestration/flows/bl832/alcf.py
index bdf96ac..c012698 100644
--- a/orchestration/flows/bl832/alcf.py
+++ b/orchestration/flows/bl832/alcf.py
@@ -35,7 +35,7 @@ def __init__(
         # The block must be registered with the name "alcf-allocation-root-path"
         logger = get_run_logger()
         allocation_data = Variable.get("alcf-allocation-root-path", _sync=True)
-        self.allocation_root = allocation_data.get("alcf-allocation-root-path")
+        self.allocation_root = allocation_data.get("alcf-allocation-root-path")  # e.g. /eagle/SYNAPS-I
         if not self.allocation_root:
             raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'")
         logger.info(f"Allocation root loaded: {self.allocation_root}")
@@ -57,17 +57,19 @@ def reconstruct(
         file_name = Path(file_path).stem + ".h5"
         folder_name = Path(file_path).parent.name
 
-        iri_als_bl832_rundir = f"{self.allocation_root}/data/raw"
-        iri_als_bl832_recon_script = f"{self.allocation_root}/scripts/globus_reconstruction.py"
+        rundir = f"{self.allocation_root}/data/bl832/raw"
+        recon_script = f"{self.allocation_root}/reconstruction/scripts/globus_reconstruction.py"
 
         gcc = Client(code_serialization_strategy=CombinedCode())
 
+        # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID
+        # We will probably have 2 endpoints, one for recon, one for segmentation
         with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe:
             logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF")
             future = fxe.submit(
                 self._reconstruct_wrapper,
-                iri_als_bl832_rundir,
-                iri_als_bl832_recon_script,
+                rundir,
+                recon_script,
                 file_name,
                 folder_name
             )
@@ -76,8 +78,8 @@ def reconstruct(
 
     @staticmethod
     def _reconstruct_wrapper(
-        rundir: str = "/eagle/IRIProd/ALS/data/raw",
-        script_path: str = "/eagle/IRIProd/ALS/scripts/globus_reconstruction.py",
+        rundir: str = "/eagle/SYNAPS-I/data/bl832/raw",
+        script_path: str = "/eagle/SYNAPS-I/reconstruction/scripts/globus_reconstruction.py",
         h5_file_name: str = None,
         folder_path: str = None
     ) -> str:
@@ -185,6 +187,101 @@ def _build_multi_resolution_wrapper(
             f"Converted tiff files to zarr;\n {zarr_res}"
         )
 
+    def segmentation(
+        self,
+        folder_path: str = "",
+    ) -> bool:
+        """
+        Run tomography segmentation at ALCF through Globus Compute.
+
+        :param folder_path: Path to the TIFF folder to be processed.
+
+        :return: True if the task completed successfully, False otherwise.
+ """ + logger = get_run_logger() + + # Operate on reconstructed data + rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{Path(folder_path).name}" + output_dir = f"{self.allocation_root}/data/bl832/scratch/segmentation/{Path(folder_path).name}" + segmentation_script = f"{self.allocation_root}/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py" + + gcc = Client(code_serialization_strategy=CombinedCode()) + + # TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID + # We will probably have 2 endpoints, one for recon, one for segmentation + with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe: + logger.info(f"Running segmentation on {folder_path} at ALCF") + future = fxe.submit( + self._segmentation_wrapper, + input_dir=rundir, + output_dir=output_dir, + script_path=segmentation_script, + output_dir=folder_path, + ) + result = self._wait_for_globus_compute_future(future, "segmentation", check_interval=10) + return result + + @staticmethod + def _segmentation_wrapper( + input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/", + output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/", + script_path: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py", + nproc_per_node: int = 4, + nnodes: int = 1, + nnode_rank: int = 0, + master_addr: str = "localhost", + master_port: str = "29500", + patch_size: int = 512, + batch_size: int = 1, + num_workers: int = 4, + confidence: float = 0.5, + prompts: list[str] = ["background", "cell"], + ) -> str: + """ + Python function that wraps around the application call for segmentation on ALCF + + :param rundir: the directory on the eagle file system (ALCF) where the input data are located + :param script_path: the path to the script that will run the segmentation + :param folder_path: the path to the folder containing the TIFF data to be segmented + :return: confirmation message + """ + import os + import subprocess + import time + + seg_start = time.time() + + # Move to directory where data are located + os.chdir(input_dir) + + # Run segmentation.py + command = [ + "torchrun", + f"--nproc_per_node={nproc_per_node}", + f"--nnodes={nnodes}", + f"--node_rank={nnode_rank}", + f"--master_addr={master_addr}", + f"--master_port={master_port}", + "-m", script_path, + "--input-dir", input_dir, + "--output-dir", output_dir, + "--patch-size", str(patch_size), + "--batch-size", str(batch_size), + "--num-workers", str(num_workers), + "--confidence", str(confidence), + "--prompts", *prompts, + ] + + segment_res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) + + seg_end = time.time() + + print(f"Segmented data in {input_dir} in {seg_end-seg_start} seconds;\n {segment_res}") + return ( + f"Segmented data specified in {input_dir} in {seg_end-seg_start} seconds;\n" + f"{segment_res}" + ) + @staticmethod def _wait_for_globus_compute_future( future: Future, @@ -368,7 +465,7 @@ def alcf_recon_flow( config: Optional[Config832] = None, ) -> bool: """ - Process and transfer a file from a source to the ALCF. + Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation. Args: file_path (str): The path to the file to be processed. 
@@ -437,51 +534,91 @@ def alcf_recon_flow(
             destination=config.data832_scratch
         )
 
-    # STEP 2B: Run the Tiff to Zarr Globus Flow
-    logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}")
-    alcf_multi_res_success = tomography_controller.build_multi_resolution(
-        file_path=file_path,
+    # STEP 3: Run the Segmentation Task at ALCF
+    logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}")
+    alcf_segmentation_success = alcf_segmentation_task(
+        recon_folder_path=scratch_path_tiff,
+        config=config
     )
-    if not alcf_multi_res_success:
-        logger.error("Tiff to Zarr Failed.")
-        raise ValueError("Tiff to Zarr at ALCF Failed")
+    if not alcf_segmentation_success:
+        logger.warning("Segmentation at ALCF Failed")
     else:
-        logger.info("Tiff to Zarr Successful.")
-        # Transfer B: Send reconstructed data (zarr) to data832
-        logger.info(f"Transferring {file_name} from {config.alcf832_scratch} "
-                    f"at ALCF to {config.data832_scratch} at data832")
-        data832_zarr_transfer_success = transfer_controller.copy(
-            file_path=scratch_path_zarr,
-            source=config.alcf832_scratch,
-            destination=config.data832_scratch
-        )
+        logger.info("Segmentation at ALCF Successful")
+
+    # Not running TIFF to Zarr conversion at ALCF for now
+    # STEP 2B: Run the Tiff to Zarr Globus Flow
+    # logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}")
+    # alcf_multi_res_success = tomography_controller.build_multi_resolution(
+    #     file_path=file_path,
+    # )
+    # if not alcf_multi_res_success:
+    #     logger.error("Tiff to Zarr Failed.")
+    #     raise ValueError("Tiff to Zarr at ALCF Failed")
+    # else:
+    #     logger.info("Tiff to Zarr Successful.")
+    #     # Transfer B: Send reconstructed data (zarr) to data832
+    #     logger.info(f"Transferring {file_name} from {config.alcf832_scratch} "
+    #                 f"at ALCF to {config.data832_scratch} at data832")
+    #     data832_zarr_transfer_success = transfer_controller.copy(
+    #         file_path=scratch_path_zarr,
+    #         source=config.alcf832_scratch,
+    #         destination=config.data832_scratch
+    #     )
 
     # Placeholder in case we want to transfer to NERSC for long term storage
     nersc_transfer_success = False
-    data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success
+    # data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success
 
     schedule_pruning(
         alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
         alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None,
-        alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None,
+        # alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None,  # Commenting out zarr for now
         nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None,
         nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None,
         data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
         data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None,
-        data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None,
+        # data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None,  # Commenting out zarr
         one_minute=False,  # Set to False for production durations
         config=config
     )
 
     # TODO: ingest to scicat
 
-    if alcf_reconstruction_success and alcf_multi_res_success:
+    if alcf_reconstruction_success and alcf_segmentation_success:  # and alcf_multi_res_success:
         return True
     else:
         return False
 
 
-if __name__ == "__main__":
+@task(name="alcf_segmentation_task")
+def alcf_segmentation_task(
+    recon_folder_path: str,
+    config: Optional[Config832] = None,
+):
+    """Run the ALCF segmentation step on a reconstructed TIFF folder."""
+    logger = get_run_logger()
+    if config is None:
+        logger.info("No config provided, using default Config832.")
+        config = Config832()
+
+    # Initialize the Tomography Controller and run the segmentation
+    logger.info("Initializing ALCF Tomography HPC Controller.")
+    tomography_controller = get_controller(
+        hpc_type=HPC.ALCF,
+        config=config
+    )
+    logger.info(f"Starting ALCF segmentation task for {recon_folder_path=}")
+    alcf_segmentation_success = tomography_controller.segmentation(
+        folder_path=recon_folder_path,
+    )
+    if not alcf_segmentation_success:
+        logger.error("Segmentation Failed.")
+    else:
+        logger.info("Segmentation Successful.")
+    return alcf_segmentation_success
+
+
+@flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test")
+def alcf_segmentation_integration_test():
     folder_name = 'dabramov'
     file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast'
     flow_success = alcf_recon_flow(

From dd155f7d21bd04987a03d3c806c16a43a623f167 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 23 Jan 2026 10:30:02 -0800
Subject: [PATCH 6/7] ensuring self.config for scicat and ghcr images

---
 orchestration/flows/bl832/config.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py
index 7294b0a..17a2afb 100644
--- a/orchestration/flows/bl832/config.py
+++ b/orchestration/flows/bl832/config.py
@@ -27,5 +27,5 @@ def _beam_specific_config(self) -> None:
         self.alcf832_synaps = self.endpoints["alcf832_synaps"]
         self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"]
         self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"]
-        self.scicat = config["scicat"]
-        self.ghcr_images832 = config["ghcr_images832"]
+        self.scicat = self.config["scicat"]
+        self.ghcr_images832 = self.config["ghcr_images832"]
\ No newline at end of file

From 719d4c6b6f68fd3d122a04bfd85aab3cbd656df2 Mon Sep 17 00:00:00 2001
From: David Abramov
Date: Fri, 23 Jan 2026 10:30:39 -0800
Subject: [PATCH 7/7] linting

---
 orchestration/flows/bl832/config.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/orchestration/flows/bl832/config.py b/orchestration/flows/bl832/config.py
index 17a2afb..da75341 100644
--- a/orchestration/flows/bl832/config.py
+++ b/orchestration/flows/bl832/config.py
@@ -28,4 +28,4 @@ def _beam_specific_config(self) -> None:
         self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"]
         self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"]
         self.scicat = self.config["scicat"]
-        self.ghcr_images832 = self.config["ghcr_images832"]
\ No newline at end of file
+        self.ghcr_images832 = self.config["ghcr_images832"]
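A minimal sketch of how the new flow might be exercised once the series is applied, assuming the Prefect blocks (the globus-compute-endpoint Secret and the alcf-allocation-root-path Variable) and the Globus transfer endpoints from patch 1 are already registered:

    # Sketch: trigger the end-to-end recon + segmentation integration test
    # from a checkout of the repo (flow name taken from patch 5)
    python -c "from orchestration.flows.bl832.alcf import alcf_segmentation_integration_test; alcf_segmentation_integration_test()"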