Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 61 additions & 0 deletions sagemaker-mlops/src/sagemaker/mlops/workflow/mlflow_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""MLflow config for SageMaker pipeline."""
from __future__ import absolute_import

from typing import Dict, Any


class MlflowConfig:
"""MLflow configuration for SageMaker pipeline."""

def __init__(
self,
mlflow_resource_arn: str,
mlflow_experiment_name: str,
):
"""Create an MLflow configuration for SageMaker Pipeline.

Examples:
Basic MLflow configuration::

mlflow_config = MlflowConfig(
mlflow_resource_arn="arn:aws:sagemaker:us-west-2:123456789012:mlflow-tracking-server/my-server",
mlflow_experiment_name="my-experiment"
)

pipeline = Pipeline(
name="MyPipeline",
steps=[...],
mlflow_config=mlflow_config
)

Runtime override of experiment name::

# Override experiment name for a specific execution
execution = pipeline.start(mlflow_experiment_name="custom-experiment")

Args:
mlflow_resource_arn (str): The ARN of the MLflow tracking server resource.
mlflow_experiment_name (str): The name of the MLflow experiment to be used for tracking.
"""
self.mlflow_resource_arn = mlflow_resource_arn
self.mlflow_experiment_name = mlflow_experiment_name

def to_request(self) -> Dict[str, Any]:
"""Returns: the request structure."""

return {
"MlflowResourceArn": self.mlflow_resource_arn,
"MlflowExperimentName": self.mlflow_experiment_name,
}
49 changes: 39 additions & 10 deletions sagemaker-mlops/src/sagemaker/mlops/workflow/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,13 @@
from sagemaker.core.remote_function.job import JOBS_CONTAINER_ENTRYPOINT
from sagemaker.core.s3 import s3_path_join
from sagemaker.core.helper.session_helper import Session
from sagemaker.core.common_utils import resolve_value_from_config, retry_with_backoff, format_tags, Tags
from sagemaker.core.common_utils import (
resolve_value_from_config,
retry_with_backoff,
format_tags,
Tags,
)

# Orchestration imports (now in mlops)
from sagemaker.mlops.workflow.callback_step import CallbackOutput, CallbackStep
from sagemaker.mlops.workflow._event_bridge_client_helper import (
Expand All @@ -44,19 +50,24 @@
EXECUTION_TIME_PIPELINE_PARAMETER_FORMAT,
)
from sagemaker.mlops.workflow.lambda_step import LambdaOutput, LambdaStep
from sagemaker.mlops.workflow.mlflow_config import MlflowConfig
from sagemaker.core.helper.pipeline_variable import (
RequestType,
PipelineVariable,
)

# Primitive imports (stay in core)
from sagemaker.core.workflow.execution_variables import ExecutionVariables
from sagemaker.core.workflow.parameters import Parameter

# Orchestration imports (now in mlops)
from sagemaker.core.workflow.pipeline_definition_config import PipelineDefinitionConfig
from sagemaker.mlops.workflow.pipeline_experiment_config import PipelineExperimentConfig
from sagemaker.mlops.workflow.parallelism_config import ParallelismConfiguration

# Primitive imports (stay in core)
from sagemaker.core.workflow.properties import Properties

# Orchestration imports (now in mlops)
from sagemaker.mlops.workflow.selective_execution_config import SelectiveExecutionConfig
from sagemaker.core.workflow.step_outputs import StepOutput
Expand Down Expand Up @@ -87,6 +98,7 @@ def __init__(
name: str = "",
parameters: Optional[Sequence[Parameter]] = None,
pipeline_experiment_config: Optional[PipelineExperimentConfig] = _DEFAULT_EXPERIMENT_CFG,
mlflow_config: Optional[MlflowConfig] = None,
steps: Optional[Sequence[Union[Step, StepOutput]]] = None,
sagemaker_session: Optional[Session] = None,
pipeline_definition_config: Optional[PipelineDefinitionConfig] = _DEFAULT_DEFINITION_CFG,
Expand All @@ -102,6 +114,8 @@ def __init__(
the same name already exists. By default, pipeline name is used as
experiment name and execution id is used as the trial name.
If set to None, no experiment or trial will be created automatically.
mlflow_config (Optional[MlflowConfig]): If set, the pipeline will be configured
with MLflow tracking for experiment tracking and model versioning.
steps (Sequence[Union[Step, StepOutput]]): The list of the
non-conditional steps associated with the pipeline. Any steps that are within the
`if_steps` or `else_steps` of a `ConditionStep` cannot be listed in the steps of a
Expand All @@ -118,6 +132,7 @@ def __init__(
self.name = name
self.parameters = parameters if parameters else []
self.pipeline_experiment_config = pipeline_experiment_config
self.mlflow_config = mlflow_config
self.steps = steps if steps else []
self.sagemaker_session = sagemaker_session if sagemaker_session else Session()
self.pipeline_definition_config = pipeline_definition_config
Expand Down Expand Up @@ -355,6 +370,7 @@ def start(
execution_description: str = None,
parallelism_config: ParallelismConfiguration = None,
selective_execution_config: SelectiveExecutionConfig = None,
mlflow_experiment_name: str = None,
pipeline_version_id: int = None,
):
"""Starts a Pipeline execution in the Workflow service.
Expand All @@ -369,6 +385,10 @@ def start(
over the parallelism configuration of the parent pipeline.
selective_execution_config (Optional[SelectiveExecutionConfig]): The configuration for
selective step execution.
mlflow_experiment_name (str): Optional MLflow experiment name to override
the experiment name specified in the pipeline's mlflow_config.
If provided, this will override the experiment name for this specific
pipeline execution only, without modifying the pipeline definition.
pipeline_version_id (Optional[str]): version ID of the pipeline to start the execution from. If not
specified, uses the latest version ID.

Expand All @@ -392,6 +412,7 @@ def start(
PipelineExecutionDisplayName=execution_display_name,
ParallelismConfiguration=parallelism_config,
SelectiveExecutionConfig=selective_execution_config,
MlflowExperimentName=mlflow_experiment_name,
PipelineVersionId=pipeline_version_id,
)
if self.sagemaker_session.local_mode:
Expand Down Expand Up @@ -431,14 +452,25 @@ def definition(self) -> str:
if self.pipeline_experiment_config is not None
else None
),
"MlflowConfig": (
self.mlflow_config.to_request() if self.mlflow_config is not None else None
),
"Steps": list_to_request(compiled_steps),
}

request_dict["PipelineExperimentConfig"] = interpolate(
request_dict["PipelineExperimentConfig"], {}, {}, pipeline_name=self.name
)
callback_output_to_step_map = _map_callback_outputs(self.steps)
lambda_output_to_step_name = _map_lambda_outputs(self.steps)
request_dict["PipelineExperimentConfig"] = interpolate(
request_dict["PipelineExperimentConfig"],
callback_output_to_step_map=callback_output_to_step_map,
lambda_output_to_step_map=lambda_output_to_step_name,
pipeline_name=self.name,
)
request_dict["MlflowConfig"] = interpolate(
request_dict["MlflowConfig"],
callback_output_to_step_map=callback_output_to_step_map,
lambda_output_to_step_map=lambda_output_to_step_name,
pipeline_name=self.name,
)
request_dict["Steps"] = interpolate(
request_dict["Steps"],
callback_output_to_step_map=callback_output_to_step_map,
Expand Down Expand Up @@ -1131,7 +1163,6 @@ def _initialize_adjacency_list(self) -> Dict[str, List[str]]:
if isinstance(child_step, Step):
dependency_list[child_step.name].add(step.name)


adjacency_list = {}
for step in dependency_list:
for step_dependency in dependency_list[step]:
Expand Down Expand Up @@ -1169,9 +1200,7 @@ def is_cyclic_helper(current_step):
return True
return False

def get_steps_in_sub_dag(
self, current_step: Step, sub_dag_steps: Set[str] = None
) -> Set[str]:
def get_steps_in_sub_dag(self, current_step: Step, sub_dag_steps: Set[str] = None) -> Set[str]:
"""Get names of all steps (including current step) in the sub dag of current step.

Returns a set of step names in the sub dag.
Expand Down Expand Up @@ -1211,4 +1240,4 @@ def __next__(self) -> Step:

while self.stack:
return self.step_map.get(self.stack.pop())
raise StopIteration
raise StopIteration
Loading
Loading