diff --git a/lfric_macros/apply_macros.py b/lfric_macros/apply_macros.py index d2af98ec..29000abe 100755 --- a/lfric_macros/apply_macros.py +++ b/lfric_macros/apply_macros.py @@ -21,6 +21,7 @@ import yaml import networkx as nx from collections import defaultdict +from concurrent.futures import as_completed, ThreadPoolExecutor from pathlib import Path BLACK_COMMAND = "black --line-length=80" @@ -927,7 +928,16 @@ def order_meta_dirs(self) -> list[str]: # guaranteed for valid rose metadata return list(nx.topological_sort(import_graph)) - def preprocess_macros(self) -> None: + def write_macros(self, meta_dir: Path, full_command: str) -> None: + """ + Overarching function for writing processed macros to versions files + """ + + self.write_python_imports(meta_dir) + self.write_new_macro(meta_dir, full_command, self.target_macros[meta_dir]) + self.sections_with_macro.append(meta_dir) + + def preprocess_macros(self, nproc: int) -> None: """ Overarching function to pre-process added macros Run before running any rose macro upgrade commands" @@ -991,6 +1001,7 @@ def preprocess_macros(self) -> None: # Now reconstruct the macro for all applications which have the newly # added macro or import metadata with the new macro # The macro sections need to be processed in the order of import + macros_to_write = {} for meta_dir in self.order_meta_dirs(): import_order = self.determine_import_order(meta_dir) full_command = self.combine_macros(import_order) @@ -1009,22 +1020,34 @@ def preprocess_macros(self) -> None: if last_after_tag: self.target_macros[meta_dir]["before_tag"] = last_after_tag + macros_to_write[meta_dir] = full_command + + with ThreadPoolExecutor(max_workers=nproc) as executor: + meta_order = sorted(macros_to_write.keys()) + write_tasks = [ + executor.submit(self.write_macros, meta_dir, macros_to_write[meta_dir]) + for meta_dir in meta_order + ] + for task in as_completed(write_tasks): + exception = task.exception() + if exception is not None: + executor.shutdown(wait=False, cancel_futures=True) + raise exception print( - "[INFO] Writing macros to", - self.parse_application_section(meta_dir), + "[INFO] Processed macro successfully written to " + f"{ + self.parse_application_section( + meta_order[write_tasks.index(task)] + ) + }" ) - self.write_python_imports(meta_dir) - self.write_new_macro( - meta_dir, full_command, self.target_macros[meta_dir] - ) - self.sections_with_macro.append(meta_dir) ############################################################################ # Upgrade Apps Functions ############################################################################ def metadata_check(self, meta_dir: Path) -> None: - """ " + """ Note: Not currently run - see comment below Run rose metadata-check on rose metadata directories to check the validity of the metadata. @@ -1091,16 +1114,14 @@ def run_app_upgrade(self, app_path: Path) -> None: - app_path, the path to this app """ app = app_path.name - print(f"[INFO] Upgrading the rose-stem app {app}") command = f"rose app-upgrade -a -y -C {app_path} {self.tag}" result = run_command(command) if result.returncode: - print(f"[FAIL] The rose-stem app {app} failed to upgrade") raise RuntimeError( + f"[FAIL] The rose-stem app {app} failed to upgrade\n\n" f"\nThe command run:\n{command}" f"\nThe error message produced:\n{result.stderr}" ) - print(f"[PASS] Upgraded rose-stem app {app} successfully") def run_macro_fix(self, app_path: Path) -> None: """ @@ -1110,18 +1131,16 @@ def run_macro_fix(self, app_path: Path) -> None: - app_path, the path to this app """ app = app_path.name - print(f"[INFO] Forcing metadata consistency in app {app}") command = f"rose macro --fix -y -C {app_path}" result = run_command(command) if result.returncode: - print(f"[FAIL] Unable to force metadata consistency in {app}") raise RuntimeError( + f"[FAIL] Unable to force metadata consistency in {app}\n\n" f"\nThe command run:\n{command}" f"\nThe error message produced:\n{result.stderr}" ) - print(f"[PASS] Successfully forced metadata consistency in {app}") - def upgrade_apps(self) -> None: + def upgrade_apps(self, nproc: int) -> None: """ Overarching function to run rose commands to apply upgrade macros to rose-stem apps. @@ -1147,10 +1166,36 @@ def upgrade_apps(self) -> None: banner_print("[INFO] Upgrading Apps") upgradeable_apps = self.apps_to_upgrade() for app_path in upgradeable_apps: - self.run_app_upgrade(app_path) - self.run_macro_fix(app_path) if self.core_source in app_path.parents: self.upgraded_core = True + break + + with ThreadPoolExecutor(max_workers=nproc) as executor: + upgrade_tasks = [ + executor.submit(self.run_app_upgrade, app) for app in upgradeable_apps + ] + for task in as_completed(upgrade_tasks): + exception = task.exception() + if exception is not None: + executor.shutdown(wait=False, cancel_futures=True) + raise exception + print( + "[PASS] Upgraded rose-stem app " + f"{upgradeable_apps[upgrade_tasks.index(task)].name} successfully" + ) + + fix_tasks = [ + executor.submit(self.run_macro_fix, app) for app in upgradeable_apps + ] + for task in as_completed(fix_tasks): + exception = task.exception() + if exception is not None: + executor.shutdown(wait=False, cancel_futures=True) + raise exception + print( + "[PASS] Successfully forced metadata consistency in " + f"{upgradeable_apps[fix_tasks.index(task)].name}" + ) def check_tag(opt: str | None) -> str | None: @@ -1223,6 +1268,9 @@ def parse_args() -> argparse.Namespace: "Either a path to a working copy or a git source." "If not set, will be read from the dependencies.yaml", ) + parser.add_argument( + "-p", "--processes", type=int, default=4, help="Number of processes to use" + ) return parser.parse_args() @@ -1243,10 +1291,10 @@ def apply_macros_main( # Pre-process macros banner_print("Pre-Processing Macros") - macro_object.preprocess_macros() + macro_object.preprocess_macros(args.processes) # Upgrade Rose Stem Apps - macro_object.upgrade_apps() + macro_object.upgrade_apps(args.processes) # Clean up temporary directories for repo, directory in macro_object.temp_dirs.items():