From 7ea62d88aa5352c7ca0c3c4ec737cb22d29d2f1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 04:09:33 +0000 Subject: [PATCH 01/24] pass dgxc to ft_launcher MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/packaging.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_run/run/torchx_backend/packaging.py b/nemo_run/run/torchx_backend/packaging.py index 84b9dc4c..8a850de4 100644 --- a/nemo_run/run/torchx_backend/packaging.py +++ b/nemo_run/run/torchx_backend/packaging.py @@ -203,6 +203,7 @@ def _get_details_from_script(fn_or_script: Script, serialize_configs: bool): log_level=launcher.log_level, max_retries=executor.retries, max_restarts=launcher.max_restarts, + dgxc=isinstance(executor, DGXCloudExecutor), use_env=use_env, ) else: From 02efa9fc013b2049ffa4c64f5df346882c5fc7cd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 18:13:51 +0000 Subject: [PATCH 02/24] feat: Add FT to DGXC MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 56 ++++++++++++++- nemo_run/core/execution/templates/dgxc.sh.j2 | 47 +++++++++++++ .../execution/templates/ft_launcher_dgxc.j2 | 69 +++++++++++++++++++ .../{ft_launcher.j2 => ft_launcher_slurm.j2} | 0 nemo_run/core/execution/templates/slurm.sh.j2 | 2 +- .../run/torchx_backend/schedulers/dgxcloud.py | 23 ++++++- 6 files changed, 193 insertions(+), 4 deletions(-) create mode 100644 nemo_run/core/execution/templates/dgxc.sh.j2 create mode 100644 nemo_run/core/execution/templates/ft_launcher_dgxc.j2 rename nemo_run/core/execution/templates/{ft_launcher.j2 => ft_launcher_slurm.j2} (100%) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 13596ebf..d22c0b20 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -21,7 +21,7 @@ import subprocess import tempfile import time -from dataclasses import dataclass, field +from dataclasses import asdict, dataclass, field from enum import Enum from pathlib import Path from typing import Any, Iterable, Optional @@ -31,6 +31,8 @@ from nemo_run.config import get_nemorun_home from nemo_run.core.execution.base import Executor, ExecutorMacros +from nemo_run.core.execution.launcher import FaultTolerance, Launcher +from nemo_run.core.execution.utils import fill_template from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager @@ -556,3 +558,55 @@ def _default_headers(self, token: Optional[str] = None) -> dict: if token: headers["Authorization"] = f"Bearer {token}" return headers + + +@dataclass(kw_only=True) +class DGXCloudRequest: + launch_cmd: list[str] + jobs: list[str] + executor: DGXCloudExecutor + max_retries: int + extra_env: dict[str, str] + launcher: Optional[Launcher] = None + + def materialize(self) -> str: + """Creates the content of a DGXC entrypoint script.""" + + # 1. Environment Variables + # Combine executor defaults with extra envs + env_vars = [] + full_env_vars = self.executor.env_vars | self.extra_env + for key, value in full_env_vars.items(): + env_vars.append(f"export {key.upper()}={value}") + + # 3. Prepare Template Variables + vars_to_fill = { + "max_retries": self.max_retries, + "env_vars": env_vars, + "training_command": " ".join(self.launch_cmd), + "ft_enabled": self.launcher and isinstance(self.launcher, FaultTolerance), + } + + # 4. Fault Tolerance Injection + if self.launcher and isinstance(self.launcher, FaultTolerance): + assert ( + self.launcher.cfg_path + and self.launcher.finished_flag_file + and self.launcher.job_results_file + ), "Fault Tolerance requires cfg_path, finished_flag_file, and job_results_file" + + vars_to_fill["fault_tol_cfg_path"] = self.launcher.cfg_path + vars_to_fill["fault_tol_finished_flag_file"] = self.launcher.finished_flag_file + vars_to_fill["fault_tol_job_results_file"] = self.launcher.job_results_file + + # Render the template + entrypoint_script = fill_template("dgxc.sh.j2", vars_to_fill) + return entrypoint_script + + def __repr__(self) -> str: + return f"""# DGXC Entrypoint Script Request +# Executor: {self.executor.__class__.__name__} +# Jobs: {self.jobs} +# --------------------------------------------------- +{self.materialize()} +""" diff --git a/nemo_run/core/execution/templates/dgxc.sh.j2 b/nemo_run/core/execution/templates/dgxc.sh.j2 new file mode 100644 index 00000000..be5dee05 --- /dev/null +++ b/nemo_run/core/execution/templates/dgxc.sh.j2 @@ -0,0 +1,47 @@ +{%- import "ft_launcher_k8s.j2" as fault_tolerance -%} +#!/bin/bash +# +# Generated by NeMo Run for Kubernetes (PyTorchJob) +# + +# 1. Basic Shell Setup +set -evx # Print commands, but DO NOT exit immediately on error (we handle that below) +export PYTHONUNBUFFERED=1 +export TORCHX_MAX_RETRIES={{max_retries}} + +# 2. Environment Variables +# These are strictly user-defined vars (e.g. HYDRA_FULL_ERROR). +# Note: MASTER_ADDR, WORLD_SIZE, RANK are injected automatically by the PyTorchJob operator. +{%- for env_var in env_vars %} +{{env_var}} +{%- endfor %} + +# 3. Fault Tolerance: SETUP (Check-in) +# Checks if we are resuming or if we are already finished. +{%- if ft_enabled %} +{{ fault_tolerance.ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) }} +{%- endif %} + +# 4. Main Execution +# In PyTorchJob, we usually have exactly one main command (torchrun). +# We assume the variable 'training_command' contains the full torchrun string. + +echo "Starting training command..." +set +e # Turn off auto-exit so we can capture the code +# --------------------------------------------------------- +{{ training_command }} +# --------------------------------------------------------- +exitcode=$? +set -e + +echo "Main command exited with code $exitcode" + +# 5. Fault Tolerance: TEARDOWN (Check-out) +# Decides if we should exit 0 (complete) or exit 1 (retry via K8s backoffLimit). +{%- if ft_enabled %} +{{ fault_tolerance.ft_launcher_teardown() }} +{%- else %} +# If FT is disabled, simply pass the exit code through. +# K8s will restart if exitcode != 0 and backoffLimit > 0. +exit $exitcode +{%- endif %} diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 new file mode 100644 index 00000000..2b1546f8 --- /dev/null +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -0,0 +1,69 @@ +{% macro ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) -%} +# ------------------------------------------------------------------------- +# K8s Fault Tolerance Setup (The "Check-In" Desk) +# ------------------------------------------------------------------------- + +# 1. Export Paths +# IMPORTANT: These paths must reside on a ReadWriteMany (RWX) Persistent Volume +# mounted to all Pods so state is preserved across pod restarts/rescheduling. +export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" +export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" +export FAULT_TOL_JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" + +# 2. Define Helper Functions +is_training_finished() { + test -f "$FAULT_TOL_FINISHED_FLAG_FILE" +} + +# 3. Check for Previous Success +# In K8s, a Pod might be restarted due to node maintenance even if the job +# logic was done. If the flag file exists, we exit immediately with 0. +if is_training_finished ; then + echo "[FT-Setup] Found finished flag at $FAULT_TOL_FINISHED_FLAG_FILE." + echo "[FT-Setup] Training is already complete. Exiting successfully." + exit 0 +fi + +# 4. Logging Start +# We use HOSTNAME (usually pod-name) as the identifier since SLURM_JOB_ID is gone. +# We append 'X' (Running/Unknown) to the log. +echo "[FT-Setup] Starting training on $(hostname)..." +# Optional: Log attempt to shared file (Using X for Running) +# Note: In high-scale K8s, writing to a single file from 1000 pods can cause lock contention. +# If scale is small, this is fine. +if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then + echo "$(hostname) $(date +%s) X" >> "$FAULT_TOL_JOB_RESULTS_FILE" +fi + +{%- endmacro %} + +{% macro ft_launcher_teardown() -%} +# ------------------------------------------------------------------------- +# K8s Fault Tolerance Teardown (The "Check-Out" Desk) +# ------------------------------------------------------------------------- + +# 1. Analyze Exit Code from the Main Command +# 'exitcode' is captured in the main script before calling this macro. +if [ "$exitcode" -eq "0" ]; then + RESULT_STATUS="S" # Success +else + RESULT_STATUS="F" # Failure +fi + +# 2. Update Log (Optional but helpful for debugging) +if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then + # We update the specific entry for this host from X to S or F + # Note: 'sed -i' on a shared PVC can be risky with concurrency. + # Appending a new status line is safer in K8s. + echo "$(hostname) $(date +%s) $RESULT_STATUS" >> "$FAULT_TOL_JOB_RESULTS_FILE" +fi + +# 3. The Requeue Decision Logic +if [ "$exitcode" -eq "0" ]; then + # Case A: Script exited successfully. + # Verification: Did it actually finish (create the flag file)? + if is_training_finished; then + echo "[FT-Teardown] Job finished successfully and flag file exists." + exit 0 + else + # Edge Case: The python script exited 0, but didn't write the flag diff --git a/nemo_run/core/execution/templates/ft_launcher.j2 b/nemo_run/core/execution/templates/ft_launcher_slurm.j2 similarity index 100% rename from nemo_run/core/execution/templates/ft_launcher.j2 rename to nemo_run/core/execution/templates/ft_launcher_slurm.j2 diff --git a/nemo_run/core/execution/templates/slurm.sh.j2 b/nemo_run/core/execution/templates/slurm.sh.j2 index 26f756fa..dc2c93fa 100644 --- a/nemo_run/core/execution/templates/slurm.sh.j2 +++ b/nemo_run/core/execution/templates/slurm.sh.j2 @@ -1,4 +1,4 @@ -{%- import "ft_launcher.j2" as fault_tolerance -%} +{%- import "ft_launcher_slurm.j2" as fault_tolerance -%} #!/bin/bash # # Generated by NeMo Run diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py index 4377ec71..b8baeed6 100644 --- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py +++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py @@ -37,7 +37,7 @@ from nemo_run.config import get_nemorun_home from nemo_run.core.execution.base import Executor -from nemo_run.core.execution.dgxcloud import DGXCloudExecutor, DGXCloudState +from nemo_run.core.execution.dgxcloud import DGXCloudExecutor, DGXCloudRequest, DGXCloudState from nemo_run.core.serialization.zlib_json import ZlibJSONSerializer from nemo_run.run.torchx_backend.schedulers.api import SchedulerMixin @@ -109,6 +109,23 @@ def _submit_dryrun( # type: ignore role = values.apply(role) cmd = [role.entrypoint] + role.args + + req = DGXCloudRequest( + launch_cmd=cmd, + jobs=[role.name], + executor=executor, + max_retries=role.max_retries, + extra_env=role.env, + launcher=executor.get_launcher(), + ) + + # Write and copy sbatch script + path = os.path.join(executor.experiment_dir, f"{executor.job_name}_job.sh") + script = req.materialize() + + with open(path, "w") as f: + f.write(script) + return AppDryRunInfo( DGXRequest(app=app, executor=executor, cmd=cmd, name=role.name), # Minimal function to show the config, if any @@ -128,7 +145,9 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str: # The DGXExecutor's launch call typically returns (job_id, handle). # We'll call it without additional parameters here. - job_id, status = executor.launch(name=req.name, cmd=req.cmd) + cmd = os.path.join(executor.experiment_dir, f"{executor.job_name}_job.sh") + req.launch_cmd = ["bash", cmd] + job_id, status = executor.launch(name=req.name, cmd=req.launch_cmd) if not job_id: raise RuntimeError("Failed scheduling run on DGX: no job_id returned") From 0be1693fd5015ed93e8bf219e07ccc632bbd2fa0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 18:16:37 +0000 Subject: [PATCH 03/24] torchrun_job MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/schedulers/dgxcloud.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py index b8baeed6..b786d3c0 100644 --- a/nemo_run/run/torchx_backend/schedulers/dgxcloud.py +++ b/nemo_run/run/torchx_backend/schedulers/dgxcloud.py @@ -120,7 +120,7 @@ def _submit_dryrun( # type: ignore ) # Write and copy sbatch script - path = os.path.join(executor.experiment_dir, f"{executor.job_name}_job.sh") + path = os.path.join(executor.experiment_dir, "torchrun_job.sh") script = req.materialize() with open(path, "w") as f: @@ -145,7 +145,7 @@ def schedule(self, dryrun_info: AppDryRunInfo[DGXRequest]) -> str: # The DGXExecutor's launch call typically returns (job_id, handle). # We'll call it without additional parameters here. - cmd = os.path.join(executor.experiment_dir, f"{executor.job_name}_job.sh") + cmd = os.path.join(executor.experiment_dir, "torchrun_job.sh") req.launch_cmd = ["bash", cmd] job_id, status = executor.launch(name=req.name, cmd=req.launch_cmd) if not job_id: From 232cef796e65f2b9ac04501c9a2f21d2a106c769 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 18:19:25 +0000 Subject: [PATCH 04/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/dgxc.sh.j2 | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_run/core/execution/templates/dgxc.sh.j2 b/nemo_run/core/execution/templates/dgxc.sh.j2 index be5dee05..1a8c000a 100644 --- a/nemo_run/core/execution/templates/dgxc.sh.j2 +++ b/nemo_run/core/execution/templates/dgxc.sh.j2 @@ -1,4 +1,4 @@ -{%- import "ft_launcher_k8s.j2" as fault_tolerance -%} +{%- import "ft_launcher_dgxc.j2" as fault_tolerance -%} #!/bin/bash # # Generated by NeMo Run for Kubernetes (PyTorchJob) From cc1a276ade11a5be70699081dbc8bc7dc1237cc4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 18:20:05 +0000 Subject: [PATCH 05/24] format MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index d22c0b20..8e2b4474 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -21,7 +21,7 @@ import subprocess import tempfile import time -from dataclasses import asdict, dataclass, field +from dataclasses import dataclass, field from enum import Enum from pathlib import Path from typing import Any, Iterable, Optional From f25924817da95e95f63c8a9229b6091a1329965d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 18:22:53 +0000 Subject: [PATCH 06/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 2b1546f8..8c3f392f 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -66,4 +66,19 @@ if [ "$exitcode" -eq "0" ]; then echo "[FT-Teardown] Job finished successfully and flag file exists." exit 0 else - # Edge Case: The python script exited 0, but didn't write the flag + # Edge Case: The python script exited 0, but didn't write the flag file. + # This usually means a silent crash or partial run. We must force a retry. + echo "[FT-Teardown] WARNING: Process exited 0 but finished flag is MISSING." + echo "[FT-Teardown] Forcing exit 1 to trigger Kubernetes restart." + exit 1 + fi +else + # Case B: Script crashed (exitcode != 0). + echo "[FT-Teardown] Job failed with exit code $exitcode." + + # We exit with the error code. + # The K8s 'backoffLimit' (in PyTorchJob spec) will determine if we restart. + # We do NOT calculate retry counts manually here. + exit $exitcode +fi +{%- endmacro %} From 4e15269a4e567de1e4b377c34cccc5c07355ced6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 22:36:42 +0000 Subject: [PATCH 07/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 8e2b4474..1a706088 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -16,7 +16,6 @@ import base64 import glob import json -import logging import os import subprocess import tempfile @@ -30,14 +29,14 @@ from invoke.context import Context from nemo_run.config import get_nemorun_home +from nemo_run.core.console import CONSOLE +from nemo_run.core.constants import RUNDIR_NAME from nemo_run.core.execution.base import Executor, ExecutorMacros -from nemo_run.core.execution.launcher import FaultTolerance, Launcher +from nemo_run.core.execution.launcher import FaultTolerance, Launcher, Torchrun from nemo_run.core.execution.utils import fill_template from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager -logger = logging.getLogger(__name__) - class DGXCloudState(Enum): CREATING = "Creating" @@ -463,6 +462,24 @@ def cancel(self, job_id: str): response.text, ) + def _setup_launcher(self): + super()._setup_launcher() + launcher = self.launcher + if launcher and isinstance(launcher, (FaultTolerance, Torchrun)): + self.torchrun_nproc_per_node = self.nprocs_per_node + self.ntasks_per_node = 1 + CONSOLE.log( + f"Detected {launcher.__class__.__name__} launcher, setting ntasks_per_node=1 and torchrun_nproc_per_node={self.torchrun_nproc_per_node}" + ) + + if launcher and isinstance(launcher, FaultTolerance): + base_dir = os.path.join(self.job_dir, Path(self.job_dir).name) + launcher.cfg_path = os.path.join(base_dir, f"{self.job_name}_ft_cfg.yml") + launcher.finished_flag_file = os.path.join( + "/", RUNDIR_NAME, f"{self.job_name}_finished_flag" + ) + launcher.job_results_file = os.path.join(base_dir, f"{self.job_name}_job_results") + def cleanup(self, handle: str): ... def assign( From 0875083bdc507eed69fb92cc504d14685bc030da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 22:45:11 +0000 Subject: [PATCH 08/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 1a706088..8058757f 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -28,12 +28,11 @@ import requests from invoke.context import Context -from nemo_run.config import get_nemorun_home -from nemo_run.core.console import CONSOLE -from nemo_run.core.constants import RUNDIR_NAME +from nemo_run.config import RUNDIR_NAME, get_nemorun_home from nemo_run.core.execution.base import Executor, ExecutorMacros from nemo_run.core.execution.launcher import FaultTolerance, Launcher, Torchrun from nemo_run.core.execution.utils import fill_template +from nemo_run.core.frontend.console.api import CONSOLE from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager From db84ae07bc9e19563791be1e005039e45790824e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 22:51:26 +0000 Subject: [PATCH 09/24] revert MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 8058757f..8440eb64 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -24,7 +24,7 @@ from enum import Enum from pathlib import Path from typing import Any, Iterable, Optional - +import logging import requests from invoke.context import Context @@ -37,6 +37,9 @@ from nemo_run.core.packaging.git import GitArchivePackager +logger = logging.getLogger(__name__) + + class DGXCloudState(Enum): CREATING = "Creating" INITIALIZING = "Initializing" @@ -626,3 +629,4 @@ def __repr__(self) -> str: # --------------------------------------------------- {self.materialize()} """ +""" From d19794339fc892be54b8b12bc6161aec4e3346a7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Thu, 11 Dec 2025 22:56:35 +0000 Subject: [PATCH 10/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/dgxcloud.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/nemo_run/core/execution/dgxcloud.py b/nemo_run/core/execution/dgxcloud.py index 8440eb64..1d17afdb 100644 --- a/nemo_run/core/execution/dgxcloud.py +++ b/nemo_run/core/execution/dgxcloud.py @@ -16,6 +16,7 @@ import base64 import glob import json +import logging import os import subprocess import tempfile @@ -24,7 +25,7 @@ from enum import Enum from pathlib import Path from typing import Any, Iterable, Optional -import logging + import requests from invoke.context import Context @@ -36,7 +37,6 @@ from nemo_run.core.packaging.base import Packager from nemo_run.core.packaging.git import GitArchivePackager - logger = logging.getLogger(__name__) @@ -629,4 +629,3 @@ def __repr__(self) -> str: # --------------------------------------------------- {self.materialize()} """ -""" From c507ddad378ea8431840fcb603a991f1444023c4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 12 Dec 2025 00:03:19 +0000 Subject: [PATCH 11/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/ft_launcher_dgxc.j2 | 2 ++ 1 file changed, 2 insertions(+) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 8c3f392f..87c23a27 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -55,6 +55,8 @@ if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then # We update the specific entry for this host from X to S or F # Note: 'sed -i' on a shared PVC can be risky with concurrency. # Appending a new status line is safer in K8s. + mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" + echo "$(hostname) $(date +%s) $RESULT_STATUS" >> "$FAULT_TOL_JOB_RESULTS_FILE" fi From 620e0737f1eee97679fc4bd94d7b6aa1bbe2e367 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 12 Dec 2025 00:20:59 +0000 Subject: [PATCH 12/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/ft_launcher_dgxc.j2 | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 87c23a27..d62fa400 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -32,6 +32,7 @@ echo "[FT-Setup] Starting training on $(hostname)..." # Note: In high-scale K8s, writing to a single file from 1000 pods can cause lock contention. # If scale is small, this is fine. if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then + mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" echo "$(hostname) $(date +%s) X" >> "$FAULT_TOL_JOB_RESULTS_FILE" fi From eb98cb51cc8b646253b64d7d63e4b713504bf347 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Fri, 12 Dec 2025 00:43:38 +0000 Subject: [PATCH 13/24] cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/dgxc.sh.j2 | 22 ++--------- .../execution/templates/ft_launcher_dgxc.j2 | 37 ------------------- 2 files changed, 3 insertions(+), 56 deletions(-) diff --git a/nemo_run/core/execution/templates/dgxc.sh.j2 b/nemo_run/core/execution/templates/dgxc.sh.j2 index 1a8c000a..75bdede2 100644 --- a/nemo_run/core/execution/templates/dgxc.sh.j2 +++ b/nemo_run/core/execution/templates/dgxc.sh.j2 @@ -1,47 +1,31 @@ {%- import "ft_launcher_dgxc.j2" as fault_tolerance -%} #!/bin/bash -# -# Generated by NeMo Run for Kubernetes (PyTorchJob) -# -# 1. Basic Shell Setup set -evx # Print commands, but DO NOT exit immediately on error (we handle that below) export PYTHONUNBUFFERED=1 export TORCHX_MAX_RETRIES={{max_retries}} -# 2. Environment Variables -# These are strictly user-defined vars (e.g. HYDRA_FULL_ERROR). -# Note: MASTER_ADDR, WORLD_SIZE, RANK are injected automatically by the PyTorchJob operator. {%- for env_var in env_vars %} {{env_var}} {%- endfor %} -# 3. Fault Tolerance: SETUP (Check-in) -# Checks if we are resuming or if we are already finished. {%- if ft_enabled %} {{ fault_tolerance.ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) }} {%- endif %} -# 4. Main Execution -# In PyTorchJob, we usually have exactly one main command (torchrun). -# We assume the variable 'training_command' contains the full torchrun string. - echo "Starting training command..." set +e # Turn off auto-exit so we can capture the code -# --------------------------------------------------------- + {{ training_command }} -# --------------------------------------------------------- + exitcode=$? set -e echo "Main command exited with code $exitcode" -# 5. Fault Tolerance: TEARDOWN (Check-out) -# Decides if we should exit 0 (complete) or exit 1 (retry via K8s backoffLimit). {%- if ft_enabled %} {{ fault_tolerance.ft_launcher_teardown() }} {%- else %} -# If FT is disabled, simply pass the exit code through. -# K8s will restart if exitcode != 0 and backoffLimit > 0. + exit $exitcode {%- endif %} diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index d62fa400..f568879c 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -1,36 +1,20 @@ {% macro ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) -%} -# ------------------------------------------------------------------------- -# K8s Fault Tolerance Setup (The "Check-In" Desk) -# ------------------------------------------------------------------------- -# 1. Export Paths -# IMPORTANT: These paths must reside on a ReadWriteMany (RWX) Persistent Volume -# mounted to all Pods so state is preserved across pod restarts/rescheduling. export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" export FAULT_TOL_JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" -# 2. Define Helper Functions is_training_finished() { test -f "$FAULT_TOL_FINISHED_FLAG_FILE" } -# 3. Check for Previous Success -# In K8s, a Pod might be restarted due to node maintenance even if the job -# logic was done. If the flag file exists, we exit immediately with 0. if is_training_finished ; then echo "[FT-Setup] Found finished flag at $FAULT_TOL_FINISHED_FLAG_FILE." echo "[FT-Setup] Training is already complete. Exiting successfully." exit 0 fi -# 4. Logging Start -# We use HOSTNAME (usually pod-name) as the identifier since SLURM_JOB_ID is gone. -# We append 'X' (Running/Unknown) to the log. echo "[FT-Setup] Starting training on $(hostname)..." -# Optional: Log attempt to shared file (Using X for Running) -# Note: In high-scale K8s, writing to a single file from 1000 pods can cause lock contention. -# If scale is small, this is fine. if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" echo "$(hostname) $(date +%s) X" >> "$FAULT_TOL_JOB_RESULTS_FILE" @@ -39,49 +23,28 @@ fi {%- endmacro %} {% macro ft_launcher_teardown() -%} -# ------------------------------------------------------------------------- -# K8s Fault Tolerance Teardown (The "Check-Out" Desk) -# ------------------------------------------------------------------------- - -# 1. Analyze Exit Code from the Main Command -# 'exitcode' is captured in the main script before calling this macro. if [ "$exitcode" -eq "0" ]; then RESULT_STATUS="S" # Success else RESULT_STATUS="F" # Failure fi -# 2. Update Log (Optional but helpful for debugging) if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then - # We update the specific entry for this host from X to S or F - # Note: 'sed -i' on a shared PVC can be risky with concurrency. - # Appending a new status line is safer in K8s. mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" - echo "$(hostname) $(date +%s) $RESULT_STATUS" >> "$FAULT_TOL_JOB_RESULTS_FILE" fi -# 3. The Requeue Decision Logic if [ "$exitcode" -eq "0" ]; then - # Case A: Script exited successfully. - # Verification: Did it actually finish (create the flag file)? if is_training_finished; then echo "[FT-Teardown] Job finished successfully and flag file exists." exit 0 else - # Edge Case: The python script exited 0, but didn't write the flag file. - # This usually means a silent crash or partial run. We must force a retry. echo "[FT-Teardown] WARNING: Process exited 0 but finished flag is MISSING." echo "[FT-Teardown] Forcing exit 1 to trigger Kubernetes restart." exit 1 fi else - # Case B: Script crashed (exitcode != 0). echo "[FT-Teardown] Job failed with exit code $exitcode." - - # We exit with the error code. - # The K8s 'backoffLimit' (in PyTorchJob spec) will determine if we restart. - # We do NOT calculate retry counts manually here. exit $exitcode fi {%- endmacro %} From 096e0afe3c685852c57a463264de915d529a9c41 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 13:48:22 +0000 Subject: [PATCH 14/24] change template MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 62 +++++++++---------- 1 file changed, 29 insertions(+), 33 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index f568879c..b1549570 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -1,50 +1,46 @@ {% macro ft_launcher_setup(fault_tol_cfg_path, fault_tol_finished_flag_file, fault_tol_job_results_file) -%} - +# This script uses experimental fault tolerance launcher +# Fault tolerance related items export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" -export FAULT_TOL_JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" +ANY_JOB_STEP_FAILED=0 +# Automatic job resubmission related items +JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" +is_job_failures_limit_reached() { + if [ $TORCHX_MAX_RETRIES -eq 0 ]; then + true + else + tail -n $TORCHX_MAX_RETRIES "$JOB_RESULTS_FILE" | \ + awk "/^[[:alnum:]]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=$TORCHX_MAX_RETRIES)}" + fi +} is_training_finished() { - test -f "$FAULT_TOL_FINISHED_FLAG_FILE" + test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" } - -if is_training_finished ; then - echo "[FT-Setup] Found finished flag at $FAULT_TOL_FINISHED_FLAG_FILE." - echo "[FT-Setup] Training is already complete. Exiting successfully." - exit 0 -fi - -echo "[FT-Setup] Starting training on $(hostname)..." -if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then - mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" - echo "$(hostname) $(date +%s) X" >> "$FAULT_TOL_JOB_RESULTS_FILE" +# Exit immediately if finished flag file exists and this job is a continuation +if [ -v RETRY_COUNT ] && [ "$RETRY_COUNT" -gt 0 ] ; then + if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi + if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi +else + rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" fi +# Write unknown job status to the job log, we will fix it at the end +echo "$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X" >> "$JOB_RESULTS_FILE" {%- endmacro %} {% macro ft_launcher_teardown() -%} -if [ "$exitcode" -eq "0" ]; then - RESULT_STATUS="S" # Success +if [ $exitcode -ne 0 ]; then ANY_JOB_STEP_FAILED=1 ; fi +# Fix the job log entry ("JOB_ID X" -> "JOB_ID S/F"), depending on the job result +if [ "$ANY_JOB_STEP_FAILED" = "0" ] ; then + sed -i "s/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} S/" "$JOB_RESULTS_FILE" else - RESULT_STATUS="F" # Failure -fi - -if [ -n "$FAULT_TOL_JOB_RESULTS_FILE" ]; then - mkdir -p "$(dirname "$FAULT_TOL_JOB_RESULTS_FILE")" - echo "$(hostname) $(date +%s) $RESULT_STATUS" >> "$FAULT_TOL_JOB_RESULTS_FILE" + sed -i "s/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} F/" "$JOB_RESULTS_FILE" fi -if [ "$exitcode" -eq "0" ]; then - if is_training_finished; then - echo "[FT-Teardown] Job finished successfully and flag file exists." - exit 0 - else - echo "[FT-Teardown] WARNING: Process exited 0 but finished flag is MISSING." - echo "[FT-Teardown] Forcing exit 1 to trigger Kubernetes restart." - exit 1 - fi -else - echo "[FT-Teardown] Job failed with exit code $exitcode." +if ! (is_training_finished || is_job_failures_limit_reached); then + scontrol requeue "$TORCHX_REPLICA_ID" exit $exitcode fi {%- endmacro %} From c7ab84368a67500017a9758a1ea8d3ffa0ac2b58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 14:01:23 +0000 Subject: [PATCH 15/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/ft_launcher_dgxc.j2 | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index b1549570..096db033 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -19,12 +19,8 @@ is_training_finished() { test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" } # Exit immediately if finished flag file exists and this job is a continuation -if [ -v RETRY_COUNT ] && [ "$RETRY_COUNT" -gt 0 ] ; then - if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi - if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi -else - rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" -fi +if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi +if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi # Write unknown job status to the job log, we will fix it at the end echo "$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X" >> "$JOB_RESULTS_FILE" From bd000a407e7455e9da22bd8448bfaf08267b76b7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 14:13:11 +0000 Subject: [PATCH 16/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 29 ++++++++++++++----- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 096db033..c08ba8e8 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -7,36 +7,49 @@ ANY_JOB_STEP_FAILED=0 # Automatic job resubmission related items JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" +# For k8s, we use pod restart count or a custom retry counter +RETRY_COUNT=${RETRY_COUNT:-0} +# Use a unique identifier for this job/pod +JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} + is_job_failures_limit_reached() { if [ $TORCHX_MAX_RETRIES -eq 0 ]; then true else - tail -n $TORCHX_MAX_RETRIES "$JOB_RESULTS_FILE" | \ - awk "/^[[:alnum:]]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=$TORCHX_MAX_RETRIES)}" + tail -n $TORCHX_MAX_RETRIES "$JOB_RESULTS_FILE" 2>/dev/null | \ + awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=$TORCHX_MAX_RETRIES)}" fi } is_training_finished() { test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" } # Exit immediately if finished flag file exists and this job is a continuation -if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi -if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi +if [ "$RETRY_COUNT" -gt 0 ] ; then + if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi + if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi +else + rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" +fi # Write unknown job status to the job log, we will fix it at the end -echo "$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X" >> "$JOB_RESULTS_FILE" +echo "$JOB_ID $RETRY_COUNT X" >> "$JOB_RESULTS_FILE" {%- endmacro %} {% macro ft_launcher_teardown() -%} if [ $exitcode -ne 0 ]; then ANY_JOB_STEP_FAILED=1 ; fi # Fix the job log entry ("JOB_ID X" -> "JOB_ID S/F"), depending on the job result +JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} +RETRY_COUNT=${RETRY_COUNT:-0} + if [ "$ANY_JOB_STEP_FAILED" = "0" ] ; then - sed -i "s/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} S/" "$JOB_RESULTS_FILE" + sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT S/" "$JOB_RESULTS_FILE" else - sed -i "s/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} X/$TORCHX_REPLICA_ID ${RETRY_COUNT:-0} F/" "$JOB_RESULTS_FILE" + sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT F/" "$JOB_RESULTS_FILE" fi +# On k8s, we exit with the appropriate code and let the retry policy handle resubmission +# Rather than explicitly requeueing like SLURM if ! (is_training_finished || is_job_failures_limit_reached); then - scontrol requeue "$TORCHX_REPLICA_ID" exit $exitcode fi {%- endmacro %} From 5f2fb8a0b3d9138f9385ace2e0745ec30f02326d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 14:20:24 +0000 Subject: [PATCH 17/24] test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 52 ++++++++++++++----- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index c08ba8e8..0717e64e 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -13,30 +13,48 @@ RETRY_COUNT=${RETRY_COUNT:-0} JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} is_job_failures_limit_reached() { - if [ $TORCHX_MAX_RETRIES -eq 0 ]; then - true + if [ ! -f "$JOB_RESULTS_FILE" ]; then + return 1 # File doesn't exist, limit not reached + fi + if [ "${TORCHX_MAX_RETRIES:-0}" -eq 0 ]; then + return 0 # 0 retries means limit is always reached else - tail -n $TORCHX_MAX_RETRIES "$JOB_RESULTS_FILE" 2>/dev/null | \ - awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=$TORCHX_MAX_RETRIES)}" + tail -n "${TORCHX_MAX_RETRIES}" "$JOB_RESULTS_FILE" 2>/dev/null | \ + awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=${TORCHX_MAX_RETRIES})}" fi } + is_training_finished() { - test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" + test -f "$(dirname "$JOB_RESULTS_FILE")/$(basename "$FAULT_TOL_FINISHED_FLAG_FILE")" } -# Exit immediately if finished flag file exists and this job is a continuation -if [ "$RETRY_COUNT" -gt 0 ] ; then - if is_training_finished ; then echo "Training is finished" ; exit 0 ; fi - if is_job_failures_limit_reached ; then echo "Job failures limit reached ($TORCHX_MAX_RETRIES)" ; exit 1 ; fi -else + +# Check if training is already finished +if is_training_finished ; then + echo "Training is finished" + exit 0 +fi + +# Check if we've hit the failure limit +if is_job_failures_limit_reached ; then + echo "Job failures limit reached (${TORCHX_MAX_RETRIES:-0})" + exit 1 +fi + +# Only clean up job results on the very first run +if [ "$RETRY_COUNT" -eq 0 ] ; then rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" fi +# Ensure directory exists +mkdir -p "$(dirname "$JOB_RESULTS_FILE")" + # Write unknown job status to the job log, we will fix it at the end echo "$JOB_ID $RETRY_COUNT X" >> "$JOB_RESULTS_FILE" {%- endmacro %} {% macro ft_launcher_teardown() -%} if [ $exitcode -ne 0 ]; then ANY_JOB_STEP_FAILED=1 ; fi + # Fix the job log entry ("JOB_ID X" -> "JOB_ID S/F"), depending on the job result JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} RETRY_COUNT=${RETRY_COUNT:-0} @@ -47,9 +65,17 @@ else sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT F/" "$JOB_RESULTS_FILE" fi -# On k8s, we exit with the appropriate code and let the retry policy handle resubmission -# Rather than explicitly requeueing like SLURM -if ! (is_training_finished || is_job_failures_limit_reached); then +# Check final state +if is_training_finished ; then + echo "Training completed successfully" + exit 0 +elif is_job_failures_limit_reached ; then + echo "Job failures limit reached, giving up" + exit 1 +else + # Training not finished and we haven't hit retry limit + # Exit with failure code to trigger pod restart + echo "Training incomplete, exiting with code $exitcode to trigger retry" exit $exitcode fi {%- endmacro %} From c9b1755ac62e7421705910736df944393cd70a79 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 14:34:07 +0000 Subject: [PATCH 18/24] retries MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index 0717e64e..ab0ce885 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -13,15 +13,19 @@ RETRY_COUNT=${RETRY_COUNT:-0} JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} is_job_failures_limit_reached() { - if [ ! -f "$JOB_RESULTS_FILE" ]; then - return 1 # File doesn't exist, limit not reached - fi + # If TORCHX_MAX_RETRIES is 0 or unset, never reach the limit (infinite retries) if [ "${TORCHX_MAX_RETRIES:-0}" -eq 0 ]; then - return 0 # 0 retries means limit is always reached - else - tail -n "${TORCHX_MAX_RETRIES}" "$JOB_RESULTS_FILE" 2>/dev/null | \ - awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=${TORCHX_MAX_RETRIES})}" + return 1 # Limit not reached, allow retries + fi + + # If job results file doesn't exist yet, limit not reached + if [ ! -f "$JOB_RESULTS_FILE" ]; then + return 1 fi + + # Check if we have TORCHX_MAX_RETRIES failures in the log + tail -n "${TORCHX_MAX_RETRIES}" "$JOB_RESULTS_FILE" 2>/dev/null | \ + awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=${TORCHX_MAX_RETRIES})}" } is_training_finished() { From 6d3c34f945dd049e821b3cf2e05ba91b7ba909eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Tue, 16 Dec 2025 17:45:11 +0000 Subject: [PATCH 19/24] TORCHX_MAX_RETRIES MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/core/execution/templates/ft_launcher_dgxc.j2 | 1 + 1 file changed, 1 insertion(+) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index ab0ce885..c6993d17 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -4,6 +4,7 @@ export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" ANY_JOB_STEP_FAILED=0 +export TORCHX_MAX_RETRIES=3 # Automatic job resubmission related items JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" From cdffd23d152896a377f9b58a2a90a82918375a37 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Wed, 17 Dec 2025 17:59:08 +0000 Subject: [PATCH 20/24] cleanup MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../execution/templates/ft_launcher_dgxc.j2 | 78 +------------------ 1 file changed, 1 insertion(+), 77 deletions(-) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index c6993d17..e16ebf5d 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -3,84 +3,8 @@ # Fault tolerance related items export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" -ANY_JOB_STEP_FAILED=0 -export TORCHX_MAX_RETRIES=3 - -# Automatic job resubmission related items -JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" -# For k8s, we use pod restart count or a custom retry counter -RETRY_COUNT=${RETRY_COUNT:-0} -# Use a unique identifier for this job/pod -JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} - -is_job_failures_limit_reached() { - # If TORCHX_MAX_RETRIES is 0 or unset, never reach the limit (infinite retries) - if [ "${TORCHX_MAX_RETRIES:-0}" -eq 0 ]; then - return 1 # Limit not reached, allow retries - fi - - # If job results file doesn't exist yet, limit not reached - if [ ! -f "$JOB_RESULTS_FILE" ]; then - return 1 - fi - - # Check if we have TORCHX_MAX_RETRIES failures in the log - tail -n "${TORCHX_MAX_RETRIES}" "$JOB_RESULTS_FILE" 2>/dev/null | \ - awk "/^[[:alnum:]_-]+[[:space:]]+[[:alnum:]]+[[:space:]]+[XF]$/{f++} END{exit !(f>=${TORCHX_MAX_RETRIES})}" -} - -is_training_finished() { - test -f "$(dirname "$JOB_RESULTS_FILE")/$(basename "$FAULT_TOL_FINISHED_FLAG_FILE")" -} - -# Check if training is already finished -if is_training_finished ; then - echo "Training is finished" - exit 0 -fi - -# Check if we've hit the failure limit -if is_job_failures_limit_reached ; then - echo "Job failures limit reached (${TORCHX_MAX_RETRIES:-0})" - exit 1 -fi - -# Only clean up job results on the very first run -if [ "$RETRY_COUNT" -eq 0 ] ; then - rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" -fi - -# Ensure directory exists -mkdir -p "$(dirname "$JOB_RESULTS_FILE")" - -# Write unknown job status to the job log, we will fix it at the end -echo "$JOB_ID $RETRY_COUNT X" >> "$JOB_RESULTS_FILE" {%- endmacro %} {% macro ft_launcher_teardown() -%} -if [ $exitcode -ne 0 ]; then ANY_JOB_STEP_FAILED=1 ; fi - -# Fix the job log entry ("JOB_ID X" -> "JOB_ID S/F"), depending on the job result -JOB_ID=${HOSTNAME:-${TORCHX_REPLICA_ID:-unknown}} -RETRY_COUNT=${RETRY_COUNT:-0} - -if [ "$ANY_JOB_STEP_FAILED" = "0" ] ; then - sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT S/" "$JOB_RESULTS_FILE" -else - sed -i "s/$JOB_ID $RETRY_COUNT X/$JOB_ID $RETRY_COUNT F/" "$JOB_RESULTS_FILE" -fi - -# Check final state -if is_training_finished ; then - echo "Training completed successfully" - exit 0 -elif is_job_failures_limit_reached ; then - echo "Job failures limit reached, giving up" - exit 1 -else - # Training not finished and we haven't hit retry limit - # Exit with failure code to trigger pod restart - echo "Training incomplete, exiting with code $exitcode to trigger retry" - exit $exitcode -fi +exit $exitcode {%- endmacro %} From b5f0a1ab3380000837c232104c81e197eabcbe4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Wed, 17 Dec 2025 18:30:13 +0000 Subject: [PATCH 21/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- .../core/execution/templates/ft_launcher_dgxc.j2 | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 index e16ebf5d..150d8b0c 100644 --- a/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 +++ b/nemo_run/core/execution/templates/ft_launcher_dgxc.j2 @@ -3,6 +3,20 @@ # Fault tolerance related items export FAULT_TOL_CFG_PATH="{{fault_tol_cfg_path}}" export FAULT_TOL_FINISHED_FLAG_FILE="{{fault_tol_finished_flag_file}}" + +JOB_RESULTS_FILE="{{fault_tol_job_results_file}}" + +is_training_finished() { + test -f "$(dirname $JOB_RESULTS_FILE)/$(basename $FAULT_TOL_FINISHED_FLAG_FILE)" +} + +if is_training_finished ; then + echo "Training is finished"; + exit 0; +else + rm -f "$FAULT_TOL_FINISHED_FLAG_FILE" "$JOB_RESULTS_FILE" +fi + {%- endmacro %} {% macro ft_launcher_teardown() -%} From 9fc020c914cebd1f3b96c084948f831e43ac3419 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 20 Dec 2025 12:24:03 +0000 Subject: [PATCH 22/24] bump FT interface MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/components/ft_launcher.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/nemo_run/run/torchx_backend/components/ft_launcher.py b/nemo_run/run/torchx_backend/components/ft_launcher.py index 3920041f..23dc9de6 100644 --- a/nemo_run/run/torchx_backend/components/ft_launcher.py +++ b/nemo_run/run/torchx_backend/components/ft_launcher.py @@ -92,27 +92,27 @@ def ft_launcher( ): if workload_check_interval: ft_args += [ - "--ft-param-workload_check_interval", + "--ft-workload_check_interval", str(workload_check_interval), ] if initial_rank_heartbeat_timeout: ft_args += [ - "--ft-param-initial_rank_heartbeat_timeout", + "--ft-initial_rank_heartbeat_timeout", str(initial_rank_heartbeat_timeout), ] if rank_heartbeat_timeout: ft_args += [ - "--ft-param-rank_heartbeat_timeout", + "--ft-rank_heartbeat_timeout", str(rank_heartbeat_timeout), ] if rank_termination_signal: - ft_args += ["--ft-param-rank_termination_signal", rank_termination_signal] + ft_args += ["--ft-rank_termination_signal", rank_termination_signal] if log_level: - ft_args += ["--ft-param-log_level", log_level] + ft_args += ["--ft-log_level", log_level] if max_restarts: ft_args += ["--max-restarts", str(max_restarts)] From 8cd9ca3d50c99e1bc5c366ea4f7b64dbeaf0644d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 20 Dec 2025 13:02:54 +0000 Subject: [PATCH 23/24] --ft-use-infra-group-rank=False MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/components/ft_launcher.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nemo_run/run/torchx_backend/components/ft_launcher.py b/nemo_run/run/torchx_backend/components/ft_launcher.py index 23dc9de6..fe125b93 100644 --- a/nemo_run/run/torchx_backend/components/ft_launcher.py +++ b/nemo_run/run/torchx_backend/components/ft_launcher.py @@ -117,6 +117,9 @@ def ft_launcher( if max_restarts: ft_args += ["--max-restarts", str(max_restarts)] + if dgxc is True: + ft_args = +["--ft-use-infra-group-rank", "False"] + else: ft_args = ["--ignore-missing-fault-tol-cfg"] From 1d5a101af4accf66e0b04ef398f78338b36071dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?oliver=20k=C3=B6nig?= Date: Sat, 20 Dec 2025 13:10:30 +0000 Subject: [PATCH 24/24] fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: oliver könig --- nemo_run/run/torchx_backend/components/ft_launcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nemo_run/run/torchx_backend/components/ft_launcher.py b/nemo_run/run/torchx_backend/components/ft_launcher.py index fe125b93..2395465f 100644 --- a/nemo_run/run/torchx_backend/components/ft_launcher.py +++ b/nemo_run/run/torchx_backend/components/ft_launcher.py @@ -118,7 +118,7 @@ def ft_launcher( ft_args += ["--max-restarts", str(max_restarts)] if dgxc is True: - ft_args = +["--ft-use-infra-group-rank", "False"] + ft_args += ["--ft-use-infra-group-rank", "False"] else: ft_args = ["--ignore-missing-fault-tol-cfg"]