14 changes: 10 additions & 4 deletions config.yml
@@ -72,17 +72,23 @@ globus:
uuid: 75b478b2-37af-46df-bfbd-71ed692c6506
name: data832_scratch

alcf832_raw:
alcf832_synaps:
root_path: /
uri: alcf.anl.gov
uuid: TBD
name: alcf832_synaps

alcf832_iri_raw:
root_path: /data/raw
uri: alcf.anl.gov
uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7
name: alcf_raw
name: alcf_iri_raw

alcf832_scratch:
alcf832_iri_scratch:
root_path: /data/scratch
uri: alcf.anl.gov
uuid: 55c3adf6-31f1-4647-9a38-52591642f7e7
name: alcf_scratch
name: alcf_iri_scratch

alcf_eagle832:
root_path: /IRIBeta/als/example
195 changes: 166 additions & 29 deletions orchestration/flows/bl832/alcf.py
@@ -35,7 +35,7 @@ def __init__(
# The block must be registered with the name "alcf-allocation-root-path"
logger = get_run_logger()
allocation_data = Variable.get("alcf-allocation-root-path", _sync=True)
self.allocation_root = allocation_data.get("alcf-allocation-root-path")
self.allocation_root = allocation_data.get("alcf-allocation-root-path") # eagle/SYNAPS-I/
if not self.allocation_root:
raise ValueError("Allocation root not found in JSON block 'alcf-allocation-root-path'")
logger.info(f"Allocation root loaded: {self.allocation_root}")
@@ -57,17 +57,19 @@ def reconstruct(
file_name = Path(file_path).stem + ".h5"
folder_name = Path(file_path).parent.name

iri_als_bl832_rundir = f"{self.allocation_root}/data/raw"
iri_als_bl832_recon_script = f"{self.allocation_root}/scripts/globus_reconstruction.py"
rundir = f"{self.allocation_root}/data/bl832/raw"
recon_script = f"{self.allocation_root}/reconstruction/scripts/globus_reconstruction.py"

gcc = Client(code_serialization_strategy=CombinedCode())

# TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID
# We will probably have 2 endpoints, one for recon, one for segmentation
with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe:
logger.info(f"Running Tomopy reconstruction on {file_name} at ALCF")
future = fxe.submit(
self._reconstruct_wrapper,
iri_als_bl832_rundir,
iri_als_bl832_recon_script,
rundir,
recon_script,
file_name,
folder_name
)
@@ -76,8 +78,8 @@

@staticmethod
def _reconstruct_wrapper(
rundir: str = "/eagle/IRIProd/ALS/data/raw",
script_path: str = "/eagle/IRIProd/ALS/scripts/globus_reconstruction.py",
rundir: str = "/eagle/SYNAPS-I/data/bl832/raw",
script_path: str = "/eagle/SYNAPS-I/reconstruction/scripts/globus_reconstruction.py",
h5_file_name: str = None,
folder_path: str = None
) -> str:
@@ -185,6 +187,101 @@ def _build_multi_resolution_wrapper(
f"Converted tiff files to zarr;\n {zarr_res}"
)

def segmentation(
self,
folder_path: str = "",
) -> bool:
"""
Run tomography segmentation at ALCF through Globus Compute.

:param folder_path: Path to the TIFF folder to be processed.

:return: True if the task completed successfully, False otherwise.
"""
logger = get_run_logger()

# Operate on reconstructed data
rundir = f"{self.allocation_root}/data/bl832/scratch/reconstruction/{Path(folder_path).name}"
output_dir = f"{self.allocation_root}/data/bl832/scratch/segmentation/{Path(folder_path).name}"
segmentation_script = f"{self.allocation_root}/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py"

gcc = Client(code_serialization_strategy=CombinedCode())

# TODO: Update globus-compute-endpoint Secret block with the new endpoint UUID
# We will probably have 2 endpoints, one for recon, one for segmentation
with Executor(endpoint_id=Secret.load("globus-compute-endpoint").get(), client=gcc) as fxe:
logger.info(f"Running segmentation on {folder_path} at ALCF")
future = fxe.submit(
self._segmentation_wrapper,
input_dir=rundir,
output_dir=output_dir,
script_path=segmentation_script,
)
result = self._wait_for_globus_compute_future(future, "segmentation", check_interval=10)
return result
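
Regarding the TODO above, a minimal sketch (the UUID is a placeholder) of updating the Prefect Secret block that both reconstruct() and segmentation() load:

# Sketch: store a new Globus Compute endpoint UUID in the Secret block.
from prefect.blocks.system import Secret

Secret(value="<new-endpoint-uuid>").save("globus-compute-endpoint", overwrite=True)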

@staticmethod
def _segmentation_wrapper(
input_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/",
output_dir: str = "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/",
script_path: str = "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py",
nproc_per_node: int = 4,
nnodes: int = 1,
nnode_rank: int = 0,
master_addr: str = "localhost",
master_port: str = "29500",
patch_size: int = 512,
batch_size: int = 1,
num_workers: int = 4,
confidence: float = 0.5,
prompts: list[str] = ["background", "cell"],
) -> str:
"""
Python function that wraps around the application call for segmentation on ALCF

:param rundir: the directory on the eagle file system (ALCF) where the input data are located
:param script_path: the path to the script that will run the segmentation
:param folder_path: the path to the folder containing the TIFF data to be segmented
:return: confirmation message
"""
import os
import subprocess
import time

seg_start = time.time()

# Move to directory where data are located
os.chdir(input_dir)

# Run the segmentation inference script via torchrun
command = [
"torchrun",
f"--nproc_per_node={nproc_per_node}",
f"--nnodes={nnodes}",
f"--node_rank={nnode_rank}",
f"--master_addr={master_addr}",
f"--master_port={master_port}",
"-m", script_path,
"--input-dir", input_dir,
"--output-dir", output_dir,
"--patch-size", str(patch_size),
"--batch-size", str(batch_size),
"--num-workers", str(num_workers),
"--confidence", str(confidence),
"--prompts", *prompts,
]

segment_res = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)  # text=True decodes captured output to str

seg_end = time.time()

print(f"Segmented data in {input_dir} in {seg_end-seg_start} seconds;\n {segment_res}")
return (
f"Segmented data specified in {input_dir} in {seg_end-seg_start} seconds;\n"
f"{segment_res}"
)
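
For reference, a sketch of the command line the wrapper assembles from its default arguments (paths are the signature defaults above; this snippet only prints the command rather than executing it):

# Sketch: the effective torchrun invocation built by _segmentation_wrapper.
import shlex

cmd = [
    "torchrun", "--nproc_per_node=4", "--nnodes=1", "--node_rank=0",
    "--master_addr=localhost", "--master_port=29500",
    "/eagle/SYNAPS-I/segmentation/scripts/forge_feb_seg_model_demo/src/inference.py",
    "--input-dir", "/eagle/SYNAPS-I/data/bl832/scratch/reconstruction/",
    "--output-dir", "/eagle/SYNAPS-I/data/bl832/scratch/segmentation/",
    "--patch-size", "512", "--batch-size", "1", "--num-workers", "4",
    "--confidence", "0.5", "--prompts", "background", "cell",
]
print(shlex.join(cmd))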

@staticmethod
def _wait_for_globus_compute_future(
future: Future,
@@ -368,7 +465,7 @@ def alcf_recon_flow(
config: Optional[Config832] = None,
) -> bool:
"""
Process and transfer a file from a source to the ALCF.
Process and transfer a file from bl832 to ALCF and run reconstruction and segmentation.

Args:
file_path (str): The path to the file to be processed.
@@ -437,51 +534,91 @@ def alcf_recon_flow(
destination=config.data832_scratch
)

# STEP 2B: Run the Tiff to Zarr Globus Flow
logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}")
alcf_multi_res_success = tomography_controller.build_multi_resolution(
file_path=file_path,
# STEP 3: Run the Segmentation Task at ALCF
logger.info(f"Starting ALCF segmentation task for {scratch_path_tiff=}")
alcf_segmentation_success = alcf_segmentation_task(
recon_folder_path=scratch_path_tiff,
config=config
)
if not alcf_multi_res_success:
logger.error("Tiff to Zarr Failed.")
raise ValueError("Tiff to Zarr at ALCF Failed")
if not alcf_segmentation_success:
logger.warning("Segmentation at ALCF Failed")
else:
logger.info("Tiff to Zarr Successful.")
# Transfer B: Send reconstructed data (zarr) to data832
logger.info(f"Transferring {file_name} from {config.alcf832_scratch} "
f"at ALCF to {config.data832_scratch} at data832")
data832_zarr_transfer_success = transfer_controller.copy(
file_path=scratch_path_zarr,
source=config.alcf832_scratch,
destination=config.data832_scratch
)
logger.info("Segmentation at ALCF Successful")

# Not running TIFF to Zarr conversion at ALCF for now
# STEP 2B: Run the Tiff to Zarr Globus Flow
# logger.info(f"Starting ALCF tiff to zarr flow for {file_path=}")
# alcf_multi_res_success = tomography_controller.build_multi_resolution(
# file_path=file_path,
# )
# if not alcf_multi_res_success:
# logger.error("Tiff to Zarr Failed.")
# raise ValueError("Tiff to Zarr at ALCF Failed")
# else:
# logger.info("Tiff to Zarr Successful.")
# # Transfer B: Send reconstructed data (zarr) to data832
# logger.info(f"Transferring {file_name} from {config.alcf832_scratch} "
# f"at ALCF to {config.data832_scratch} at data832")
# data832_zarr_transfer_success = transfer_controller.copy(
# file_path=scratch_path_zarr,
# source=config.alcf832_scratch,
# destination=config.data832_scratch
# )

# Placeholder in case we want to transfer to NERSC for long-term storage
nersc_transfer_success = False

data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success
# data832_tiff_transfer_success, data832_zarr_transfer_success, nersc_transfer_success
schedule_pruning(
alcf_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
alcf_scratch_path_tiff=f"{scratch_path_tiff}" if alcf_reconstruction_success else None,
alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None,
# alcf_scratch_path_zarr=f"{scratch_path_zarr}" if alcf_multi_res_success else None, # Commenting out zarr for now
nersc_scratch_path_tiff=f"{scratch_path_tiff}" if nersc_transfer_success else None,
nersc_scratch_path_zarr=f"{scratch_path_zarr}" if nersc_transfer_success else None,
data832_raw_path=f"{folder_name}/{h5_file_name}" if alcf_transfer_success else None,
data832_scratch_path_tiff=f"{scratch_path_tiff}" if data832_tiff_transfer_success else None,
data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None,
# data832_scratch_path_zarr=f"{scratch_path_zarr}" if data832_zarr_transfer_success else None, # Commenting out zarr
one_minute=False, # Set to False for production durations
config=config
)

# TODO: ingest to scicat

if alcf_reconstruction_success and alcf_multi_res_success:
if alcf_reconstruction_success and alcf_segmentation_success: # and alcf_multi_res_success:
return True
else:
return False


if __name__ == "__main__":
@task(name="alcf_segmentation_task")
def alcf_segmentation_task(
recon_folder_path: str,
config: Optional[Config832] = None,
):
logger = get_run_logger()
if config is None:
logger.info("No config provided, using default Config832.")
config = Config832()

# Initialize the Tomography Controller and run the segmentation
logger.info("Initializing ALCF Tomography HPC Controller.")
tomography_controller = get_controller(
hpc_type=HPC.ALCF,
config=config
)
logger.info(f"Starting ALCF segmentation task for {recon_folder_path=}")
alcf_segmentation_success = tomography_controller.segmentation(
folder_path=recon_folder_path,
)
if not alcf_segmentation_success:
logger.error("Segmentation Failed.")
else:
logger.info("Segmentation Successful.")
return alcf_segmentation_success


@flow(name="alcf_segmentation_integration_test", flow_run_name="alcf_segmentation_integration_test")
def alcf_segmentation_integration_test():
folder_name = 'dabramov'
file_name = '20230606_151124_jong-seto_fungal-mycelia_roll-AQ_fungi1_fast'
flow_success = alcf_recon_flow(
5 changes: 3 additions & 2 deletions orchestration/flows/bl832/config.py
@@ -24,7 +24,8 @@ def _beam_specific_config(self) -> None:
self.nersc832_alsdev_pscratch_raw = self.endpoints["nersc832_alsdev_pscratch_raw"]
self.nersc832_alsdev_pscratch_scratch = self.endpoints["nersc832_alsdev_pscratch_scratch"]
self.nersc832_alsdev_recon_scripts = self.endpoints["nersc832_alsdev_recon_scripts"]
self.alcf832_raw = self.endpoints["alcf832_raw"]
self.alcf832_scratch = self.endpoints["alcf832_scratch"]
self.alcf832_synaps = self.endpoints["alcf832_synaps"]
self.alcf832_iri_raw = self.endpoints["alcf832_iri_raw"]
self.alcf832_iri_scratch = self.endpoints["alcf832_iri_scratch"]
self.scicat = self.config["scicat"]
self.ghcr_images832 = self.config["ghcr_images832"]
39 changes: 39 additions & 0 deletions scripts/polaris/globus_compute_recon_config.yaml
@@ -0,0 +1,39 @@
engine:
type: GlobusComputeEngine # This engine uses the HighThroughputExecutor
max_retries_on_system_failure: 2
max_workers: 1 # Sets one worker per node
prefetch_capacity: 0 # Increase if you have many more tasks than workers

address:
type: address_by_interface
ifname: bond0

strategy: simple
job_status_kwargs:
max_idletime: 300
strategy_period: 60

provider:
type: PBSProProvider

launcher:
type: MpiExecLauncher
# Ensures 1 manager per node, working across all 64 cores
bind_cmd: --cpu-bind
overrides: --depth=64 --ppn 1

account: SYNAPS-I
queue: debug
cpus_per_node: 64

# e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe"
scheduler_options: "#PBS -l filesystems=home:eagle"

# Node setup: activate necessary conda environment and such
worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction"

walltime: 00:60:00 # Jobs will end after 60 minutes
nodes_per_block: 2 # All jobs will have 1 node
init_blocks: 0
min_blocks: 0
max_blocks: 2 # No more than 1 job will be scheduled at a time
41 changes: 41 additions & 0 deletions scripts/polaris/globus_compute_segment_config.yaml
@@ -0,0 +1,41 @@
# This needs to be updated to use GPUs and a segmentation environment

engine:
type: GlobusComputeEngine # This engine uses the HighThroughputExecutor
max_retries_on_system_failure: 2
max_workers: 1 # Sets one worker per node
prefetch_capacity: 0 # Increase if you have many more tasks than workers

address:
type: address_by_interface
ifname: bond0

strategy: simple
job_status_kwargs:
max_idletime: 300
strategy_period: 60

provider:
type: PBSProProvider

launcher:
type: MpiExecLauncher
# Ensures 1 manager per node, working across all 64 cores
bind_cmd: --cpu-bind
overrides: --depth=64 --ppn 1

account: SYNAPS-I
queue: debug
cpus_per_node: 64

# e.g., "#PBS -l filesystems=home:grand:eagle\n#PBS -k doe"
scheduler_options: "#PBS -l filesystems=home:eagle"

# Node setup: activate necessary conda environment and such
worker_init: "module use /soft/modulefiles; module load conda; conda activate /eagle/SYNAPS-I/reconstruction/env/tomopy; export PATH=$PATH:/eagle/SYNAPSE-I/; cd $HOME/.globus_compute/globus_compute_reconstruction"

walltime: 00:60:00 # Jobs will end after 60 minutes
nodes_per_block: 2 # All jobs will have 1 node
init_blocks: 0
min_blocks: 0
max_blocks: 2 # No more than 1 job will be scheduled at a time
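
Once either endpoint is configured with one of these YAML files and started, a minimal smoke test might look like the following (the endpoint UUID is a placeholder; the imports mirror those already used in alcf.py):

# Sketch: smoke-test a freshly configured Globus Compute endpoint.
from globus_compute_sdk import Client, Executor
from globus_compute_sdk.serialize import CombinedCode

def hello() -> str:
    import socket  # imported inside the function so it serializes cleanly
    return f"Hello from {socket.gethostname()}"

gcc = Client(code_serialization_strategy=CombinedCode())
with Executor(endpoint_id="<your-endpoint-uuid>", client=gcc) as fxe:
    print(fxe.submit(hello).result())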