diff --git a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py index 019c2bbbf45..354cb6b53ad 100644 --- a/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py +++ b/src/zenml/integrations/kubernetes/orchestrators/kubernetes_orchestrator.py @@ -30,6 +30,7 @@ # inspired by the Kubernetes dag runner implementation of tfx """Kubernetes-native orchestrator.""" +import json import os import random import socket @@ -48,6 +49,7 @@ from kubernetes import client as k8s_client from kubernetes import config as k8s_config +from kubernetes.client import ApiException from zenml.config.base_settings import BaseSettings from zenml.constants import ( @@ -81,7 +83,11 @@ from zenml.logger import get_logger from zenml.metadata.metadata_types import MetadataType from zenml.models.v2.core.schedule import ScheduleUpdate -from zenml.orchestrators import ContainerizedOrchestrator, SubmissionResult +from zenml.orchestrators import ( + ContainerizedOrchestrator, + PipelineSubmissionError, + SubmissionResult, +) from zenml.stack import StackValidator if TYPE_CHECKING: @@ -468,7 +474,7 @@ def submit_pipeline( run_id=placeholder_run.id if placeholder_run else None, ) - return self._submit_orchestrator_job( + result = self._submit_orchestrator_job( snapshot=snapshot, command=command, args=args, @@ -476,6 +482,8 @@ def submit_pipeline( placeholder_run=placeholder_run, ) + return result + def submit_dynamic_pipeline( self, snapshot: "PipelineSnapshotResponse", @@ -720,6 +728,8 @@ def _submit_orchestrator_job( Raises: RuntimeError: If a schedule without cron expression is given. + PipelineSubmissionError: If a Kubernetes `ApiException` occurs during job + submission; it is re-raised as this error. Returns: Optional submission result. 
@@ -762,89 +772,99 @@ def _submit_orchestrator_job( settings, pipeline_name=snapshot.pipeline_configuration.name ) - with self._create_auth_secret_if_necessary( - snapshot, environment, orchestrator_pod_settings - ): - job_manifest = self._prepare_job_manifest( - name=job_name, - command=command, - args=args, - image=image, - environment=environment, - labels=labels, - annotations=annotations, - settings=settings, - pod_settings=orchestrator_pod_settings, - # In dynamic pipelines restarting the orchestrator pod is not - # supported yet. It will create new runs for each restart which - # we have to avoid. - backoff_limit=0 - if snapshot.is_dynamic - else settings.orchestrator_job_backoff_limit, - ) - - if snapshot.schedule: - if not snapshot.schedule.cron_expression: - raise RuntimeError( - "The Kubernetes orchestrator only supports scheduling via " - "CRON jobs, but the run was configured with a manual " - "schedule. Use `Schedule(cron_expression=...)` instead." - ) - cron_expression = snapshot.schedule.cron_expression - cron_job_manifest = build_cron_job_manifest( - cron_expression=cron_expression, - job_template=job_template_manifest_from_job(job_manifest), - successful_jobs_history_limit=settings.successful_jobs_history_limit, - failed_jobs_history_limit=settings.failed_jobs_history_limit, - ) - - cron_job = self._k8s_batch_api.create_namespaced_cron_job( - body=cron_job_manifest, - namespace=self.config.kubernetes_namespace, - ) - logger.info( - f"Created Kubernetes CronJob `{cron_job.metadata.name}` " - f"with CRON expression `{cron_expression}`." 
- ) - return SubmissionResult( - metadata={ - KUBERNETES_CRON_JOB_METADATA_KEY: cron_job.metadata.name, - } - ) - else: - kube_utils.create_job( - batch_api=self._k8s_batch_api, - namespace=self.config.kubernetes_namespace, - job_manifest=job_manifest, + try: + with self._create_auth_secret_if_necessary( + snapshot, environment, orchestrator_pod_settings + ): + job_manifest = self._prepare_job_manifest( + name=job_name, + command=command, + args=args, + image=image, + environment=environment, + labels=labels, + annotations=annotations, + settings=settings, + pod_settings=orchestrator_pod_settings, + # In dynamic pipelines restarting the orchestrator pod is not + # supported yet. It will create new runs for each restart which + # we have to avoid. + backoff_limit=0 + if snapshot.is_dynamic + else settings.orchestrator_job_backoff_limit, ) - if settings.synchronous: - - def _wait_for_run_to_finish() -> None: - logger.info( - "Waiting for orchestrator job to finish..." - ) - kube_utils.wait_for_job_to_finish( - batch_api=self._k8s_batch_api, - core_api=self._k8s_core_api, - namespace=self.config.kubernetes_namespace, - job_name=job_name, - backoff_interval=settings.job_monitoring_interval, - fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons, - stream_logs=True, + if snapshot.schedule: + if not snapshot.schedule.cron_expression: + raise RuntimeError( + "The Kubernetes orchestrator only supports scheduling via " + "CRON jobs, but the run was configured with a manual " + "schedule. Use `Schedule(cron_expression=...)` instead." 
) + cron_expression = snapshot.schedule.cron_expression + cron_job_manifest = build_cron_job_manifest( + cron_expression=cron_expression, + job_template=job_template_manifest_from_job( + job_manifest + ), + successful_jobs_history_limit=settings.successful_jobs_history_limit, + failed_jobs_history_limit=settings.failed_jobs_history_limit, + ) + cron_job = self._k8s_batch_api.create_namespaced_cron_job( + body=cron_job_manifest, + namespace=self.config.kubernetes_namespace, + ) + logger.info( + f"Created Kubernetes CronJob `{cron_job.metadata.name}` " + f"with CRON expression `{cron_expression}`." + ) return SubmissionResult( - wait_for_completion=_wait_for_run_to_finish, + metadata={ + KUBERNETES_CRON_JOB_METADATA_KEY: cron_job.metadata.name, + } ) else: - logger.info( - f"Orchestrator job `{job_name}` started. " - f"Run the following command to inspect the logs: " - f"`kubectl -n {self.config.kubernetes_namespace} logs " - f"job/{job_name}`" + kube_utils.create_job( + batch_api=self._k8s_batch_api, + namespace=self.config.kubernetes_namespace, + job_manifest=job_manifest, ) - return None + + if settings.synchronous: + + def _wait_for_run_to_finish() -> None: + logger.info( + "Waiting for orchestrator job to finish..." + ) + kube_utils.wait_for_job_to_finish( + batch_api=self._k8s_batch_api, + core_api=self._k8s_core_api, + namespace=self.config.kubernetes_namespace, + job_name=job_name, + backoff_interval=settings.job_monitoring_interval, + fail_on_container_waiting_reasons=settings.fail_on_container_waiting_reasons, + stream_logs=True, + ) + + return SubmissionResult( + wait_for_completion=_wait_for_run_to_finish, + ) + else: + logger.info( + f"Orchestrator job `{job_name}` started. 
" + f"Run the following command to inspect the logs: " + f"`kubectl -n {self.config.kubernetes_namespace} logs " + f"job/{job_name}`" + ) + return None + + except ApiException as e: + body = json.loads(e.body or "{}") + raise PipelineSubmissionError( + f"Orchestrator failed to submit Kubernetes job with '{e.reason}' ({e.status}). " + f"{body.get('message', '')}" + ) from e def run_isolated_step( self, step_run_info: "StepRunInfo", environment: Dict[str, str] @@ -1211,15 +1231,31 @@ def delete_schedule(self, schedule: "ScheduleResponse") -> None: schedule: The schedule to delete. Raises: - RuntimeError: If the cron job name is not found. + ApiException: Kubernetes API error (other than 404) if deletion fails. """ cron_job_name = schedule.run_metadata.get( KUBERNETES_CRON_JOB_METADATA_KEY ) + if not cron_job_name: - raise RuntimeError("Unable to find cron job name for schedule.") + logger.warning( + "Unable to find cron job for schedule %s.", + schedule.name, + ) + return - self._k8s_batch_api.delete_namespaced_cron_job( - name=cron_job_name, - namespace=self.config.kubernetes_namespace, - ) + try: + self._k8s_batch_api.delete_namespaced_cron_job( + name=cron_job_name, + namespace=self.config.kubernetes_namespace, + ) + except ApiException as e: + if e.status == 404: + logger.warning( + "Unable to find cron job %s for schedule %s.", + cron_job_name, + schedule.name, + ) + return + else: + raise diff --git a/src/zenml/orchestrators/__init__.py b/src/zenml/orchestrators/__init__.py index c6b35c0160a..b393af758c7 100644 --- a/src/zenml/orchestrators/__init__.py +++ b/src/zenml/orchestrators/__init__.py @@ -31,6 +31,9 @@ from zenml.orchestrators.containerized_orchestrator import ( ContainerizedOrchestrator, ) +from zenml.orchestrators.exceptions import ( + PipelineSubmissionError, +) from zenml.orchestrators.wheeled_orchestrator import ( WheeledOrchestrator, ) @@ -54,4 +57,5 @@ "LocalDockerOrchestrator", "LocalDockerOrchestratorFlavor", "SubmissionResult", + 
"PipelineSubmissionError", ] diff --git a/src/zenml/orchestrators/base_orchestrator.py b/src/zenml/orchestrators/base_orchestrator.py index 0d184177dbb..87ee1e82257 100644 --- a/src/zenml/orchestrators/base_orchestrator.py +++ b/src/zenml/orchestrators/base_orchestrator.py @@ -30,6 +30,7 @@ from pydantic import model_validator +from zenml.client import Client from zenml.constants import ( ENV_ZENML_PREVENT_CLIENT_SIDE_CACHING, handle_bool_env_var, @@ -43,6 +44,7 @@ from zenml.hooks.hook_validators import load_and_run_hook from zenml.logger import get_logger from zenml.metadata.metadata_types import MetadataType +from zenml.orchestrators.exceptions import PipelineSubmissionError from zenml.orchestrators.publish_utils import ( publish_pipeline_run_metadata, publish_pipeline_run_status_update, @@ -261,6 +263,8 @@ def run( Raises: RunMonitoringError: If a failure happened while monitoring the pipeline run. + PipelineSubmissionError: If a pipeline submission failed on the + orchestrator. """ self._prepare_run(snapshot=snapshot) @@ -373,6 +377,7 @@ def run( step_environments=step_environments, placeholder_run=placeholder_run, ) + if placeholder_run: publish_pipeline_run_status_update( pipeline_run_id=placeholder_run.id, @@ -425,7 +430,14 @@ def run( raise RunMonitoringError(original_exception=e) except BaseException as e: raise RunMonitoringError(original_exception=e) + except PipelineSubmissionError: + # clean-up actions in case of failure + + if snapshot.schedule: + # delete created DB schedules + Client().zen_store.delete_schedule(snapshot.schedule.id) + raise finally: self._cleanup_run() diff --git a/src/zenml/orchestrators/exceptions.py b/src/zenml/orchestrators/exceptions.py new file mode 100644 index 00000000000..0b48881e19a --- /dev/null +++ b/src/zenml/orchestrators/exceptions.py @@ -0,0 +1,31 @@ +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is 
distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# Copyright (c) ZenML GmbH 2025. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +# +# This module defines the exceptions that orchestrators raise when +# submitting a pipeline for execution fails. +"""Orchestrator-specific exceptions.""" + + +class PipelineSubmissionError(Exception): + """Raised when an orchestrator cannot submit a pipeline for execution.""" + + pass