Skip to content

Commit 517fdc3

Browse files
committed
Ensure Environment Secret Creation in Kubeflow
Add functionality to create and manage Kubernetes Secrets for environment variables in the KubeflowExecutor class. The new _method_ `_ensure_env_secret` verifies the existence of the secret and creates or updates it as necessary. This allows for better handling of sensitive environment configurations. Update the template rendering to include secrets as environment variables, enhancing the flexibility of environment variable management. - Implemented `_ensure_env_secret` to manage Secrets - Updated YAML template to include `env_from_secrets` - Added tests for secret creation and conflict handling Signed-off-by: Krishnaswamy Subramanian <subramk@thoughtworks.com>
1 parent 57c9ea2 commit 517fdc3

File tree

3 files changed

+131
-0
lines changed

3 files changed

+131
-0
lines changed

nemo_run/core/execution/kubeflow.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -373,6 +373,9 @@ def _create_cluster_training_runtime(self, configmap_name: str, sha: str) -> str
373373
# Ensure storage objects exist prior to runtime creation
374374
self._ensure_storage()
375375

376+
# Ensure env secret exists prior to runtime creation
377+
env_from_secrets: list[str] = self._ensure_env_secret(sha)
378+
376379
template_vars = {
377380
"runtime_name": runtime_name,
378381
"namespace": self.namespace,
@@ -385,6 +388,7 @@ def _create_cluster_training_runtime(self, configmap_name: str, sha: str) -> str
385388
"gpus": self.gpus,
386389
"enable_tcpxo": self.enable_tcpxo,
387390
"storage_pvc_mounts": self._get_normalized_storage_mounts(),
391+
"env_from_secrets": env_from_secrets,
388392
}
389393
rendered = fill_template(
390394
template_name="kubeflow_clustertrainingruntime.yaml.j2",
@@ -507,6 +511,37 @@ def _get_additional_files(self, task) -> dict[str, tuple[str, str]]:
507511

508512
return files_to_stage
509513

514+
def _ensure_env_secret(self, sha: str) -> list[str]:
515+
"""Ensure a Secret exists when env_vars are configured; return list of envFrom names."""
516+
if not self.env_vars:
517+
return []
518+
generated_secret_name = self._env_secret_name(sha)
519+
try:
520+
core_client = client.CoreV1Api()
521+
body = client.V1Secret(
522+
metadata=client.V1ObjectMeta(name=generated_secret_name, namespace=self.namespace),
523+
string_data=self.env_vars,
524+
type="Opaque",
525+
)
526+
core_client.create_namespaced_secret(namespace=self.namespace, body=body)
527+
logger.info(f"Created Secret {generated_secret_name} in {self.namespace}")
528+
except ApiException as e:
529+
if e.status == 409:
530+
# Secret exists; patch to ensure latest env_vars are reflected
531+
try:
532+
patch_body = {"stringData": self.env_vars, "type": "Opaque"}
533+
core_client.patch_namespaced_secret(
534+
name=generated_secret_name, namespace=self.namespace, body=patch_body
535+
)
536+
logger.info(
537+
f"Patched Secret {generated_secret_name} with updated stringData in {self.namespace}"
538+
)
539+
except Exception as patch_err:
540+
logger.warning(f"Failed to patch Secret {generated_secret_name}: {patch_err}")
541+
else:
542+
logger.warning(f"Failed to create Secret {generated_secret_name}: {e}")
543+
return [generated_secret_name]
544+
510545
def stage_files(self, task_dir: str, task=None) -> tuple[str, str]:
511546
"""Stage files using the packager.
512547
@@ -700,6 +735,11 @@ def _runtime_name(self, sha: str) -> str:
700735
identifier = self._get_experiment_identifier()
701736
return sanitize_kubernetes_name(f"nemo-runtime-{identifier}-{sha}")
702737

738+
def _env_secret_name(self, sha: str) -> str:
739+
"""Return a deterministic Secret name for env vars derived from experiment+sha."""
740+
identifier = self._get_experiment_identifier()
741+
return sanitize_kubernetes_name(f"nemo-env-{identifier}-{sha}")
742+
703743
def _get_staged_file_path(self, filename: str) -> str:
704744
"""Return path where a staged file would be mounted inside the container.
705745

nemo_run/core/execution/templates/kubeflow_clustertrainingruntime.yaml.j2

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,15 @@ spec:
125125
value: /usr/local/nvidia/lib64
126126
- name: NCCL_FASTRAK_LLCM_DEVICE_DIRECTORY
127127
value: /dev/aperture_devices
128+
{% if secret_env_vars and secret_env_vars|length > 0 %}
129+
{% for sev in secret_env_vars %}
130+
- name: {{ sev.name }}
131+
valueFrom:
132+
secretKeyRef:
133+
name: {{ sev.secret_name }}
134+
key: {{ sev.key }}
135+
{% endfor %}
136+
{% endif %}
128137
volumeMounts:
129138
- name: workspace
130139
mountPath: {{ workspace_mount_path }}

test/core/execution/test_kubeflow.py

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,28 @@ def test_crt_template_renders_storage_pvc(self):
138138
assert "mountPath: /mnt/a" in rendered
139139
assert "readOnly: true" in rendered
140140

141+
def test_crt_template_renders_envfrom_secret(self):
142+
rendered = fill_template(
143+
template_name="kubeflow_clustertrainingruntime.yaml.j2",
144+
variables={
145+
"runtime_name": "rt",
146+
"namespace": "ns",
147+
"nodes": 1,
148+
"image": "img",
149+
"workspace_mount_path": "/src",
150+
"configmap_name": "cfg",
151+
"cpu_limit": None,
152+
"memory_limit": None,
153+
"gpus": None,
154+
"enable_tcpxo": False,
155+
"storage_pvc_mounts": [],
156+
"env_from_secrets": ["my-secret"],
157+
},
158+
)
159+
160+
assert "envFrom:" in rendered
161+
assert "name: my-secret" in rendered
162+
141163
def test_pvc_creation_when_missing(self, mocker):
142164
# Configure an executor with a PVC that should be created
143165

@@ -374,6 +396,66 @@ def test_kubeflow_executor_get_custom_trainer_fallback():
374396
assert mounted_path in " ".join(call_args.get("args", []))
375397

376398

399+
class TestEnvSecretHandling:
400+
def test_secret_creation_without_conflict(self, mocker):
401+
executor = KubeflowExecutor(namespace="default")
402+
executor.packager = ConfigMapPackager()
403+
executor.assign("exp-abc", "/tmp/exp", "task-1", "task_dir")
404+
405+
executor.env_vars = {"CONFIG_KEY1": "xyz", "FOO": "bar"}
406+
407+
mock_core = mocker.patch("kubernetes.client.CoreV1Api")
408+
api = mock_core.return_value
409+
# No exception on create (no conflict)
410+
api.create_namespaced_secret.return_value = None
411+
412+
with patch("nemo_run.core.execution.kubeflow.fill_template") as ft:
413+
ft.return_value = "apiVersion: v1\nkind: ClusterTrainingRuntime\nmetadata: {}"
414+
with patch("kubernetes.client.CustomObjectsApi") as mock_coa:
415+
coa = mock_coa.return_value
416+
coa.create_cluster_custom_object.return_value = {}
417+
418+
executor._create_cluster_training_runtime(configmap_name="cfg", sha="beadfeed")
419+
420+
# Ensure create was called, and patch was NOT called
421+
assert api.create_namespaced_secret.called
422+
assert not api.patch_namespaced_secret.called
423+
424+
# Capture variables passed to template and assert env_from_secrets includes our secret
425+
called_vars = ft.call_args[1]["variables"]
426+
assert "env_from_secrets" in called_vars
427+
assert isinstance(called_vars["env_from_secrets"], list)
428+
assert len(called_vars["env_from_secrets"]) == 1
429+
430+
def test_secret_creation_and_patch_on_conflict(self, mocker):
431+
executor = KubeflowExecutor(namespace="default")
432+
executor.packager = ConfigMapPackager()
433+
# Simulate assignment to set experiment identifier used in secret name
434+
executor.assign("exp-xyz", "/tmp/exp", "task-1", "task_dir")
435+
436+
# Set env vars that should be converted to a Secret
437+
executor.env_vars = {"CONFIG_KEY1": "abc", "OTHER": "val"}
438+
439+
# Mock k8s CoreV1Api to simulate create 409 then patch
440+
mock_core = mocker.patch("kubernetes.client.CoreV1Api")
441+
api = mock_core.return_value
442+
from kubernetes.client.exceptions import ApiException
443+
444+
# First call: create raises 409 (already exists)
445+
api.create_namespaced_secret.side_effect = ApiException(status=409)
446+
447+
# Run ensure function indirectly via _create_cluster_training_runtime
448+
with patch("nemo_run.core.execution.kubeflow.fill_template") as ft:
449+
ft.return_value = "apiVersion: v1\nkind: ClusterTrainingRuntime\nmetadata: {}"
450+
with patch("kubernetes.client.CustomObjectsApi") as mock_coa:
451+
coa = mock_coa.return_value
452+
coa.create_cluster_custom_object.return_value = {}
453+
# Should call patch on conflict
454+
executor._create_cluster_training_runtime(configmap_name="cfg", sha="deadbeef")
455+
456+
assert api.patch_namespaced_secret.called
457+
458+
377459
def test_kubeflow_executor_create_trainjob():
378460
"""Test create_trainjob method."""
379461
executor = KubeflowExecutor(nodes=1)

0 commit comments

Comments
 (0)