From 721a83dab4a66680d22732b5c5e84d6c8d92d75f Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Thu, 13 Mar 2025 16:08:50 -0700 Subject: [PATCH 01/26] first round of jobmon backend changes --- src/onemod/backend/jobmon_backend.py | 53 ++++++++++++++++++++++++---- 1 file changed, 47 insertions(+), 6 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 7f8c8eb1..d4be115a 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -49,6 +49,7 @@ from jobmon.client.api import Tool from jobmon.client.task import Task from jobmon.client.task_template import TaskTemplate +from jobmon.client.workflow import Workflow from pydantic import validate_call from onemod.backend.utils import ( @@ -72,6 +73,7 @@ def evaluate_with_jobmon( subsets: dict[str, Any | list[Any]] | None = None, paramsets: dict[str, Any | list[Any]] | None = None, collect: bool | None = None, + workflow: Workflow | None = None, **kwargs, ) -> None: """Evaluate pipeline or stage method with Jobmon. @@ -112,6 +114,17 @@ def evaluate_with_jobmon( instance. If `subsets` and `paramsets` are both None, default is True, otherwise default is False. + Jobmon Parameters + ----------------- + workflow : Workflow, optional + Instantiated Jobmon workflow. If passed, add Pipeline and Stage + tasks to the existing Jobmon workflow, rather than creating a new + workflow. Additionally, do not run the workflow; just add the + tasks. Default is None, which will result in creating and running + a new Jobmon workflow. + template_concurency_limits : dict, optional + + """ check_method(model, method) check_input_exists(model, stages) @@ -119,7 +132,10 @@ def evaluate_with_jobmon( python = str(sys.executable) resources_dict = get_resources(resources) - tool = get_tool(model.name, method, cluster, resources_dict) + if workflow: + tool = workflow.tool + else: + tool = get_tool(model.name, method, cluster, resources_dict) tasks = get_tasks( model, method, @@ -132,7 +148,18 @@ def evaluate_with_jobmon( collect, **kwargs, ) - run_workflow(model.name, method, tool, tasks) + if not workflow: + workflow = create_workflow(model.name, method, tasks, tool) + set_task_template_concurrency(workflow) + run_workflow(workflow) + else: + workflow.add_tasks(tasks) + set_task_template_concurrency(workflow) + + +def set_task_template_concurrency(workflow: Workflow) -> None: + """TODO""" + pass def get_resources(resources: Path | str | dict[str, Any]) -> dict[str, Any]: @@ -649,8 +676,10 @@ def get_task_resources( } -def run_workflow(name: str, method: str, tool: Tool, tasks: list[Task]) -> None: - """Create and run workflow. +def create_workflow( + name: str, method: str, tasks: list[Task], tool: Tool +) -> Workflow: + """Create workflow. Parameters ---------- @@ -658,15 +687,27 @@ def run_workflow(name: str, method: str, tool: Tool, tasks: list[Task]) -> None: Pipeline or stage name. method : str Name of method being evaluated. - tool : Tool - Jobmon tool. tasks : list of Task List of stage tasks. + tool : Tool + Jobmon tool. """ workflow = tool.create_workflow(name=f"{name}_{method}") workflow.add_tasks(tasks) workflow.bind() + return workflow + + +def run_workflow(workflow: Workflow) -> None: + """Run workflow. + + Parameters + ---------- + workflow : Workflow + Jobmon workflow to run. 
+ + """ print(f"Starting workflow {workflow.workflow_id}") status = workflow.run() if status != "D": From ba5f765ae52bec1861b09740fc16858a0e42eab9 Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Fri, 14 Mar 2025 10:00:47 -0700 Subject: [PATCH 02/26] making changes smaller scope --- src/onemod/backend/jobmon_backend.py | 40 +++++++--------------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index d4be115a..524b9956 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -50,7 +50,7 @@ from jobmon.client.task import Task from jobmon.client.task_template import TaskTemplate from jobmon.client.workflow import Workflow -from pydantic import validate_call +from pydantic import ConfigDict, validate_call from onemod.backend.utils import ( check_input_exists, @@ -62,7 +62,7 @@ from onemod.stage import Stage -@validate_call +@validate_call(config=ConfigDict(arbitrary_types_allowed=True)) def evaluate_with_jobmon( model: Pipeline | Stage, method: Literal["run", "fit", "predict", "collect"], @@ -122,8 +122,6 @@ def evaluate_with_jobmon( workflow. Additionally, do not run the workflow; just add the tasks. Default is None, which will result in creating and running a new Jobmon workflow. - template_concurency_limits : dict, optional - """ check_method(model, method) @@ -149,17 +147,9 @@ def evaluate_with_jobmon( **kwargs, ) if not workflow: - workflow = create_workflow(model.name, method, tasks, tool) - set_task_template_concurrency(workflow) - run_workflow(workflow) + create_and_run_workflow(model.name, method, tasks, tool) else: workflow.add_tasks(tasks) - set_task_template_concurrency(workflow) - - -def set_task_template_concurrency(workflow: Workflow) -> None: - """TODO""" - pass def get_resources(resources: Path | str | dict[str, Any]) -> dict[str, Any]: @@ -676,10 +666,10 @@ def get_task_resources( } -def create_workflow( - name: str, method: str, tasks: list[Task], tool: Tool -) -> Workflow: - """Create workflow. +def create_and_run_workflow( + name: str, method: str, tool: Tool, tasks: list[Task] +) -> None: + """Create and run workflow. Parameters ---------- @@ -687,27 +677,15 @@ def create_workflow( Pipeline or stage name. method : str Name of method being evaluated. - tasks : list of Task - List of stage tasks. tool : Tool Jobmon tool. + tasks : list of Task + List of stage tasks. """ workflow = tool.create_workflow(name=f"{name}_{method}") workflow.add_tasks(tasks) workflow.bind() - return workflow - - -def run_workflow(workflow: Workflow) -> None: - """Run workflow. - - Parameters - ---------- - workflow : Workflow - Jobmon workflow to run. 
- - """ print(f"Starting workflow {workflow.workflow_id}") status = workflow.run() if status != "D": From 8eadf9d7057409d731f762d57555b86c0fa8a782 Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Fri, 14 Mar 2025 11:29:44 -0700 Subject: [PATCH 03/26] adding max attempts and external upstreams --- src/onemod/backend/jobmon_backend.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 524b9956..2e4f841b 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -398,6 +398,9 @@ def get_stage_tasks( paramsets: dict[str, Any | list[Any]] | None = None, collect: bool | None = None, upstream_tasks: list[Task] | None = None, + external_upstream_tasks: list[Task] = [], + template_prefix: str | None = None, + max_attempts: int = 1, **kwargs, ) -> list[Task]: """Get stage tasks. @@ -426,6 +429,13 @@ def get_stage_tasks( True, otherwise default is False. upstream_tasks : list of Task or None, optional List of upstream stage tasks. Default is None. + external_upstream_tasks : list, optional + List of Jobmon tasks external to the OneMod Stages or Pipeline that + should be treated as upstream dependencies of the new tasks. + template_prefix : str, optional + Optional prefix to append to task name. Default is None, no prefix. + max_attempts : int, optional + Maximum number of attempts for a task. Default is 1. **kwargs Additional keyword arguments passed to stage method. @@ -451,11 +461,17 @@ def get_stage_tasks( **kwargs, ) + task_name = ( + f"{template_prefix}_{stage.name}_{method}" + if template_prefix + else f"{stage.name}_{method}" + ) + if submodel_args: tasks = task_template.create_tasks( - name=f"{stage.name}_{method}", - upstream_tasks=upstream_tasks, - max_attempts=1, + name=task_name, + upstream_tasks=upstream_tasks + external_upstream_tasks, + max_attempts=max_attempts, entrypoint=entrypoint, config=config_path, method=method, @@ -465,9 +481,9 @@ def get_stage_tasks( else: tasks = [ task_template.create_task( - name=f"{stage.name}_{method}", - upstream_tasks=upstream_tasks, - max_attempts=1, + name=task_name, + upstream_tasks=upstream_tasks + external_upstream_tasks, + max_attempts=max_attempts, entrypoint=entrypoint, config=config_path, method=method, From 8ca2395e3d9bee403d7bffaba203474be88f611a Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Fri, 14 Mar 2025 11:39:32 -0700 Subject: [PATCH 04/26] pulling out arg --- src/onemod/backend/jobmon_backend.py | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 2e4f841b..ebc75bbc 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -398,7 +398,7 @@ def get_stage_tasks( paramsets: dict[str, Any | list[Any]] | None = None, collect: bool | None = None, upstream_tasks: list[Task] | None = None, - external_upstream_tasks: list[Task] = [], + external_upstream_tasks: list[Task] | None = None, template_prefix: str | None = None, max_attempts: int = 1, **kwargs, @@ -431,7 +431,8 @@ def get_stage_tasks( List of upstream stage tasks. Default is None. external_upstream_tasks : list, optional List of Jobmon tasks external to the OneMod Stages or Pipeline that - should be treated as upstream dependencies of the new tasks. + should be treated as upstream dependencies of the new tasks. Default + is no external upstream tasks. 
template_prefix : str, optional Optional prefix to append to task name. Default is None, no prefix. max_attempts : int, optional @@ -466,11 +467,16 @@ def get_stage_tasks( if template_prefix else f"{stage.name}_{method}" ) + all_upstream_tasks = ( + upstream_tasks + external_upstream_tasks + if external_upstream_tasks + else upstream_tasks + ) if submodel_args: tasks = task_template.create_tasks( name=task_name, - upstream_tasks=upstream_tasks + external_upstream_tasks, + upstream_tasks=all_upstream_tasks, max_attempts=max_attempts, entrypoint=entrypoint, config=config_path, @@ -482,7 +488,7 @@ def get_stage_tasks( tasks = [ task_template.create_task( name=task_name, - upstream_tasks=upstream_tasks + external_upstream_tasks, + upstream_tasks=all_upstream_tasks, max_attempts=max_attempts, entrypoint=entrypoint, config=config_path, From e96993056ad56b7e09c11d27bbea425d593f1983 Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Fri, 14 Mar 2025 15:46:42 -0700 Subject: [PATCH 05/26] adding e2e tests --- src/onemod/backend/jobmon_backend.py | 2 +- tests/e2e/test_e2e_jobmon_backend.py | 2 + .../test_integration_jobmon_backend.py | 52 +++++++++++++++++++ 3 files changed, 55 insertions(+), 1 deletion(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index ebc75bbc..0dd784b8 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -147,7 +147,7 @@ def evaluate_with_jobmon( **kwargs, ) if not workflow: - create_and_run_workflow(model.name, method, tasks, tool) + create_and_run_workflow(model.name, method, tool, tasks) else: workflow.add_tasks(tasks) diff --git a/tests/e2e/test_e2e_jobmon_backend.py b/tests/e2e/test_e2e_jobmon_backend.py index dff27e13..6f6946f5 100644 --- a/tests/e2e/test_e2e_jobmon_backend.py +++ b/tests/e2e/test_e2e_jobmon_backend.py @@ -14,6 +14,8 @@ "cluster": "dummy", "resources": {"tool_resources": {"dummy": {"queue": "null.q"}}}, "python": None, + "template_prefix": "jobmon_e2e_testing", + "max_attempts": 3, } STAGE_KWARGS = {**KWARGS, "subsets": None, "paramsets": None, "collect": None} diff --git a/tests/integration/test_integration_jobmon_backend.py b/tests/integration/test_integration_jobmon_backend.py index 5b4602c5..df1980aa 100644 --- a/tests/integration/test_integration_jobmon_backend.py +++ b/tests/integration/test_integration_jobmon_backend.py @@ -2,6 +2,7 @@ from collections import defaultdict from pathlib import Path +from unittest import mock import pytest @@ -430,3 +431,54 @@ def test_stage_tasks_collect_after(parallel_pipeline, method): else: assert len(tasks) == 2 assert tasks[1].task_args["method"] == "collect" + + +@pytest.mark.integration +@pytest.mark.requires_jobmon +def test_stage_tasks_jobmon_args(simple_pipeline): + stage = simple_pipeline.stages["run_1"] + method = "run" + cluster = "cluster" + resources = {"tool_resources": {cluster: {"queue": "null.q"}}} + python = "/path/to/python/env/bin/python" + external_upstream_tasks = [ + jb.Task( + node=mock.MagicMock(), + task_args={"fake_arg_1": "fake_value"}, + op_args={"fake_arg_2": "fake_value"}, + name="fake_task", + task_attributes=[], + ) + ] + template_prefix = "testing" + max_attempts = 3 + entrypoint = str(Path(python).parent / "onemod") + config = str(stage.dataif.get_path("config")) + tasks = jb.get_stage_tasks( + stage, + method, + jb.get_tool(simple_pipeline.name, method, cluster, resources), + resources=resources, + python=python, + external_upstream_tasks=external_upstream_tasks, + 
template_prefix=template_prefix, + max_attempts=max_attempts, + ) + task = tasks[0] + + assert len(tasks) == 1 + assert task.name == f"{template_prefix}_{stage.name}_{method}" + assert task.cluster_name == "" + assert task.compute_resources == {} + assert task.command == jb.get_command_template(method, []).format( + entrypoint=entrypoint, config=config, method=method, stages=stage.name + ) + assert task.max_attempts == max_attempts + assert task.op_args == {"entrypoint": entrypoint} + assert task.task_args == { + "config": config, + "method": method, + "stages": stage.name, + } + assert task.node.node_args == {} + assert task.upstream_tasks == set(external_upstream_tasks) From d0b232219b1bf802fa4331ac70206d391a733bae Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Mon, 17 Mar 2025 11:27:00 -0700 Subject: [PATCH 06/26] adding existing wf e2e test --- tests/e2e/test_e2e_jobmon_backend.py | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/tests/e2e/test_e2e_jobmon_backend.py b/tests/e2e/test_e2e_jobmon_backend.py index 6f6946f5..6cc8d03f 100644 --- a/tests/e2e/test_e2e_jobmon_backend.py +++ b/tests/e2e/test_e2e_jobmon_backend.py @@ -8,6 +8,7 @@ """ import pytest +from jobmon.client.api import Tool KWARGS = { "backend": "jobmon", @@ -59,6 +60,20 @@ def test_parallel_pipeline_stages(parallel_pipeline): ) +@pytest.mark.e2e +@pytest.mark.requires_jobmon +def test_parallel_pipeline_stages_existing_workflow(parallel_pipeline): + tool = Tool(name="test_run") + tool.set_default_cluster_name("dummy") + tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"}) + workflow = tool.create_workflow(name="test_run_workflow") + parallel_pipeline.evaluate( + method="run", stages=["run_1", "fit_2"], workflow=workflow, **KWARGS + ) + workflow.bind() + workflow.run() + + @pytest.mark.e2e @pytest.mark.requires_jobmon @pytest.mark.parametrize("method", ["run", "fit", "predict"]) From 48a2bccaf8ef63004e82235b4293b6646cba27e3 Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Mon, 17 Mar 2025 12:02:22 -0700 Subject: [PATCH 07/26] adjusting arg name for task prefix --- src/onemod/backend/jobmon_backend.py | 14 +++++++------- tests/e2e/test_e2e_jobmon_backend.py | 2 +- .../integration/test_integration_jobmon_backend.py | 6 +++--- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 0dd784b8..49e4fd15 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -146,10 +146,10 @@ def evaluate_with_jobmon( collect, **kwargs, ) - if not workflow: - create_and_run_workflow(model.name, method, tool, tasks) - else: + if workflow: workflow.add_tasks(tasks) + else: + create_and_run_workflow(model.name, method, tool, tasks) def get_resources(resources: Path | str | dict[str, Any]) -> dict[str, Any]: @@ -399,7 +399,7 @@ def get_stage_tasks( collect: bool | None = None, upstream_tasks: list[Task] | None = None, external_upstream_tasks: list[Task] | None = None, - template_prefix: str | None = None, + task_prefix: str | None = None, max_attempts: int = 1, **kwargs, ) -> list[Task]: @@ -433,7 +433,7 @@ def get_stage_tasks( List of Jobmon tasks external to the OneMod Stages or Pipeline that should be treated as upstream dependencies of the new tasks. Default is no external upstream tasks. - template_prefix : str, optional + task_prefix : str, optional Optional prefix to append to task name. Default is None, no prefix. 
max_attempts : int, optional Maximum number of attempts for a task. Default is 1. @@ -463,8 +463,8 @@ def get_stage_tasks( ) task_name = ( - f"{template_prefix}_{stage.name}_{method}" - if template_prefix + f"{task_prefix}_{stage.name}_{method}" + if task_prefix else f"{stage.name}_{method}" ) all_upstream_tasks = ( diff --git a/tests/e2e/test_e2e_jobmon_backend.py b/tests/e2e/test_e2e_jobmon_backend.py index 6cc8d03f..4cc3fa1a 100644 --- a/tests/e2e/test_e2e_jobmon_backend.py +++ b/tests/e2e/test_e2e_jobmon_backend.py @@ -15,7 +15,7 @@ "cluster": "dummy", "resources": {"tool_resources": {"dummy": {"queue": "null.q"}}}, "python": None, - "template_prefix": "jobmon_e2e_testing", + "task_prefix": "jobmon_e2e_testing", "max_attempts": 3, } diff --git a/tests/integration/test_integration_jobmon_backend.py b/tests/integration/test_integration_jobmon_backend.py index df1980aa..099017ea 100644 --- a/tests/integration/test_integration_jobmon_backend.py +++ b/tests/integration/test_integration_jobmon_backend.py @@ -450,7 +450,7 @@ def test_stage_tasks_jobmon_args(simple_pipeline): task_attributes=[], ) ] - template_prefix = "testing" + task_prefix = "testing" max_attempts = 3 entrypoint = str(Path(python).parent / "onemod") config = str(stage.dataif.get_path("config")) @@ -461,13 +461,13 @@ def test_stage_tasks_jobmon_args(simple_pipeline): resources=resources, python=python, external_upstream_tasks=external_upstream_tasks, - template_prefix=template_prefix, + task_prefix=task_prefix, max_attempts=max_attempts, ) task = tasks[0] assert len(tasks) == 1 - assert task.name == f"{template_prefix}_{stage.name}_{method}" + assert task.name == f"{task_prefix}_{stage.name}_{method}" assert task.cluster_name == "" assert task.compute_resources == {} assert task.command == jb.get_command_template(method, []).format( From 79adf9aff7e88010400a03b83addf7be85184531 Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Mon, 17 Mar 2025 12:16:11 -0700 Subject: [PATCH 08/26] updating arg description --- src/onemod/backend/jobmon_backend.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 49e4fd15..b8680f3f 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -117,11 +117,11 @@ def evaluate_with_jobmon( Jobmon Parameters ----------------- workflow : Workflow, optional - Instantiated Jobmon workflow. If passed, add Pipeline and Stage - tasks to the existing Jobmon workflow, rather than creating a new - workflow. Additionally, do not run the workflow; just add the - tasks. Default is None, which will result in creating and running - a new Jobmon workflow. + Instantiated Jobmon workflow. If passed, add new tasks to the + existing Jobmon workflow rather than creating a new workflow. + Additionally, do not run the workflow; just add the tasks. Default + is None, which will result in creating and running a new Jobmon + workflow. 
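Illustrative usage of the workflow argument described above (a minimal sketch, not part of the diff; pipeline stands in for an existing Pipeline instance, and the "dummy" cluster and "null.q" queue mirror the test fixtures):

    from jobmon.client.api import Tool

    from onemod.backend.jobmon_backend import evaluate_with_jobmon

    # The caller owns the workflow; OneMod only adds tasks to it.
    tool = Tool(name="external_tool")
    tool.set_default_cluster_name("dummy")
    tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"})
    workflow = tool.create_workflow(name="external_workflow")

    evaluate_with_jobmon(
        model=pipeline,
        method="run",
        cluster="dummy",
        resources={"tool_resources": {"dummy": {"queue": "null.q"}}},
        workflow=workflow,  # add tasks to this workflow instead of creating one
    )

    # Binding and running the workflow remain the caller's responsibility.
    workflow.bind()
    workflow.run()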
""" check_method(model, method) From 7638954d02e578d2e258c709e2fd95af4bce2389 Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Mon, 17 Mar 2025 14:43:37 -0700 Subject: [PATCH 09/26] reworking task prefix arg --- src/onemod/backend/jobmon_backend.py | 26 ++++++++++++++----- tests/e2e/test_e2e_jobmon_backend.py | 2 +- .../test_integration_jobmon_backend.py | 6 ++--- 3 files changed, 24 insertions(+), 10 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index b8680f3f..e218cfe0 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -399,7 +399,7 @@ def get_stage_tasks( collect: bool | None = None, upstream_tasks: list[Task] | None = None, external_upstream_tasks: list[Task] | None = None, - task_prefix: str | None = None, + task_and_template_prefix: str | None = None, max_attempts: int = 1, **kwargs, ) -> list[Task]: @@ -433,8 +433,9 @@ def get_stage_tasks( List of Jobmon tasks external to the OneMod Stages or Pipeline that should be treated as upstream dependencies of the new tasks. Default is no external upstream tasks. - task_prefix : str, optional - Optional prefix to append to task name. Default is None, no prefix. + task_and_template_prefix : str, optional + Optional prefix to append to task/tempalte name. Default is None, + no prefix. max_attempts : int, optional Maximum number of attempts for a task. Default is 1. **kwargs @@ -463,8 +464,8 @@ def get_stage_tasks( ) task_name = ( - f"{task_prefix}_{stage.name}_{method}" - if task_prefix + f"{task_and_template_prefix}_{stage.name}_{method}" + if task_and_template_prefix else f"{stage.name}_{method}" ) all_upstream_tasks = ( @@ -570,10 +571,14 @@ def get_task_template( tool: Tool, resources: dict[str, Any], submodel_args: list[str], + task_and_template_prefix: str | None = None, **kwargs, ) -> TaskTemplate: """Get stage task template. + If the Jobmon Tool already has an active task template with the same + name, use that task template. + Parameters ---------- stage_name : str @@ -586,6 +591,9 @@ def get_task_template( Dictionary of compute resources. submodel_args : list of str List including 'subsets' and/or 'paramsets'. + task_and_template_prefix : str, optional + Optional prefix to append to task/tempalte name. Default is None, + no prefix. **kwargs Additional keyword arguments passed to stage method. @@ -595,8 +603,14 @@ def get_task_template( Stage task template. 
""" + template_name = ( + f"{task_and_template_prefix}_{stage_name}_{method}" + if task_and_template_prefix + else f"{stage_name}_{method}" + ) + task_template = tool.get_task_template( - template_name=f"{stage_name}_{method}", + template_name=template_name, command_template=get_command_template(method, submodel_args, **kwargs), op_args=["entrypoint"], task_args=["config", "method", "stages"] + list(kwargs.keys()), diff --git a/tests/e2e/test_e2e_jobmon_backend.py b/tests/e2e/test_e2e_jobmon_backend.py index 4cc3fa1a..c8f1584e 100644 --- a/tests/e2e/test_e2e_jobmon_backend.py +++ b/tests/e2e/test_e2e_jobmon_backend.py @@ -15,7 +15,7 @@ "cluster": "dummy", "resources": {"tool_resources": {"dummy": {"queue": "null.q"}}}, "python": None, - "task_prefix": "jobmon_e2e_testing", + "task_and_template_prefix": "jobmon_e2e_testing", "max_attempts": 3, } diff --git a/tests/integration/test_integration_jobmon_backend.py b/tests/integration/test_integration_jobmon_backend.py index 099017ea..bc7d25d9 100644 --- a/tests/integration/test_integration_jobmon_backend.py +++ b/tests/integration/test_integration_jobmon_backend.py @@ -450,7 +450,7 @@ def test_stage_tasks_jobmon_args(simple_pipeline): task_attributes=[], ) ] - task_prefix = "testing" + task_and_template_prefix = "testing" max_attempts = 3 entrypoint = str(Path(python).parent / "onemod") config = str(stage.dataif.get_path("config")) @@ -461,13 +461,13 @@ def test_stage_tasks_jobmon_args(simple_pipeline): resources=resources, python=python, external_upstream_tasks=external_upstream_tasks, - task_prefix=task_prefix, + task_and_template_prefix=task_and_template_prefix, max_attempts=max_attempts, ) task = tasks[0] assert len(tasks) == 1 - assert task.name == f"{task_prefix}_{stage.name}_{method}" + assert task.name == f"{task_and_template_prefix}_{stage.name}_{method}" assert task.cluster_name == "" assert task.compute_resources == {} assert task.command == jb.get_command_template(method, []).format( From 54d8f79d3d8da521a7e8c6509c53de58e825d50a Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Mon, 17 Mar 2025 14:47:52 -0700 Subject: [PATCH 10/26] fixing tempalte typo --- src/onemod/backend/jobmon_backend.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index e218cfe0..96e839f8 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -434,7 +434,7 @@ def get_stage_tasks( should be treated as upstream dependencies of the new tasks. Default is no external upstream tasks. task_and_template_prefix : str, optional - Optional prefix to append to task/tempalte name. Default is None, + Optional prefix to append to task/template name. Default is None, no prefix. max_attempts : int, optional Maximum number of attempts for a task. Default is 1. @@ -592,7 +592,7 @@ def get_task_template( submodel_args : list of str List including 'subsets' and/or 'paramsets'. task_and_template_prefix : str, optional - Optional prefix to append to task/tempalte name. Default is None, + Optional prefix to append to task/template name. Default is None, no prefix. **kwargs Additional keyword arguments passed to stage method. 
From 1c8064b5f6fa47dbb5e460897cff25b15b04f00c Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Tue, 18 Mar 2025 15:29:55 -0700 Subject: [PATCH 11/26] PR comment updates --- CHANGELOG.md | 7 + docs/meta.toml | 2 +- pyproject.toml | 2 +- src/onemod/backend/jobmon_backend.py | 251 +++++++++++++----- src/onemod/pipeline.py | 27 +- tests/e2e/test_e2e_jobmon_backend.py | 42 ++- .../test_integration_jobmon_backend.py | 117 +++++++- 7 files changed, 357 insertions(+), 91 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f6433f43..0c00a3dc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,13 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ## [Unreleased] +## [1.1.0] - 2025-03-18 + +### Added + +- Added new pipeline method `add_tasks_to_workflow` for adding tasks to an existing Jobmon workflow +- Added new Jobmon-related arguments: `external_upstream_dependencies`, `task_and_template_prefix`, and `max_attempts`. + ## [1.0.3] - 2025-03-12 ### Changed diff --git a/docs/meta.toml b/docs/meta.toml index df70cb55..5c099d52 100644 --- a/docs/meta.toml +++ b/docs/meta.toml @@ -1,3 +1,3 @@ versions = [ - "1.0.3", + "1.1.0", ] diff --git a/pyproject.toml b/pyproject.toml index 1cb3bba8..6c89863c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta" [project] name = "onemod" -version = "1.0.3" +version = "1.1.0" description = "An orchestration package for statistical modeling pipelines." readme = "README.md" requires-python = ">=3.10, <3.13" diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 96e839f8..5874cf8f 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -62,7 +62,7 @@ from onemod.stage import Stage -@validate_call(config=ConfigDict(arbitrary_types_allowed=True)) +@validate_call def evaluate_with_jobmon( model: Pipeline | Stage, method: Literal["run", "fit", "predict", "collect"], @@ -73,7 +73,8 @@ def evaluate_with_jobmon( subsets: dict[str, Any | list[Any]] | None = None, paramsets: dict[str, Any | list[Any]] | None = None, collect: bool | None = None, - workflow: Workflow | None = None, + task_and_template_prefix: str | None = None, + max_attempts: int = 1, **kwargs, ) -> None: """Evaluate pipeline or stage method with Jobmon. @@ -116,12 +117,11 @@ def evaluate_with_jobmon( Jobmon Parameters ----------------- - workflow : Workflow, optional - Instantiated Jobmon workflow. If passed, add new tasks to the - existing Jobmon workflow rather than creating a new workflow. - Additionally, do not run the workflow; just add the tasks. Default - is None, which will result in creating and running a new Jobmon - workflow. + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. Default is None, + no prefix. + max_attempts : int + Maximum number of attempts for a task. Default is 1. 
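A minimal call sketch with the new Jobmon arguments (illustrative only; pipeline and the cluster/queue values are placeholders):

    from onemod.backend.jobmon_backend import evaluate_with_jobmon

    evaluate_with_jobmon(
        model=pipeline,
        method="run",
        cluster="dummy",
        resources={"tool_resources": {"dummy": {"queue": "null.q"}}},
        task_and_template_prefix="nightly",  # names become "nightly_<stage>_<method>"
        max_attempts=3,  # each task gets up to 3 attempts
    )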
""" check_method(model, method) @@ -130,26 +130,115 @@ def evaluate_with_jobmon( python = str(sys.executable) resources_dict = get_resources(resources) - if workflow: - tool = workflow.tool - else: - tool = get_tool(model.name, method, cluster, resources_dict) + tool = get_tool(model.name, method, cluster, resources_dict) tasks = get_tasks( - model, - method, - tool, - resources_dict, - python, - stages, - subsets, - paramsets, - collect, + model=model, + method=method, + tool=tool, + resources=resources_dict, + python=python, + stages=stages, + subsets=subsets, + paramsets=paramsets, + collect=collect, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, **kwargs, ) - if workflow: - workflow.add_tasks(tasks) - else: - create_and_run_workflow(model.name, method, tool, tasks) + create_and_run_workflow(model.name, method, tool, tasks) + + +@validate_call(config=ConfigDict(arbitrary_types_allowed=True)) +def add_tasks_to_workflow( + workflow: Workflow, + model: Pipeline | Stage, + method: Literal["run", "fit", "predict", "collect"], + resources: Path | str | dict[str, Any], + python: Path | str | None = None, + stages: list[str] | None = None, + subsets: dict[str, Any | list[Any]] | None = None, + paramsets: dict[str, Any | list[Any]] | None = None, + collect: bool | None = None, + task_and_template_prefix: str | None = None, + max_attempts: int = 1, + external_upstream_tasks: list[Task] | None = None, + **kwargs, +) -> None: + """Add Pipeline tasks to an existing Jobmon Workflow. + + Parameters + ---------- + workflow : Workflow + Instantiated Jobmon workflow. Add new tasks to an existing Jobmon + workflow rather than creating a new workflow. Does not run the + workflow, only adds the tasks. + model : Pipeline or Stage + Pipeline or stage instance. + method : {'run', 'fit', 'predict', 'collect'} + Name of method to evalaute. + resources : dict, Path, or str + Path to resources file or dictionary of compute resources. + python : Path or str, optional + Path to Python environment. If None, use sys.executable. + Default is None. + **kwargs + Additional keyword arguments passed to stage methods. If `model` + is a `Pipeline` instance, use format`stage={arg_name: arg_value}`. + + Pipeline Parameters + ------------------- + stages : list of str, optional + Names of stages to evaluate if `model` is a `Pipeline` instance. + If None, evaluate pipeline stages. Default is None. + + Stage Parameters + ---------------- + subsets : dict, optional + Submodel data subsets to evaluate if `model` is a `Stage` + instance. If None, evaluate all data subsets. Default is None. + paramsets : dict, optional + Submodel parameter sets to evaluate if `model` is a `Stage` + instance. If None, evaluate all parameter sets. Default is None. + collect : bool, optional + Whether to collect submodel results if `model` is a `Stage` + instance. If `subsets` and `paramsets` are both None, default is + True, otherwise default is False. + + Jobmon Parameters + ----------------- + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. Default is None, + no prefix. + max_attempts : int + Maximum number of attempts for a task. Default is 1. + external_upstream_tasks : list, optional + List of Jobmon tasks external to the OneMod Stages or Pipeline that + should be treated as upstream dependencies of the new tasks. Default + is no external upstream tasks. 
+ + """ + check_method(model, method) + check_input_exists(model, stages) + if python is None: + python = str(sys.executable) + + resources_dict = get_resources(resources) + tasks = get_tasks( + model=model, + method=method, + tool=workflow.tool, + resources=resources_dict, + python=python, + stages=stages, + subsets=subsets, + paramsets=paramsets, + collect=collect, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + external_upstream_tasks=external_upstream_tasks, + **kwargs, + ) + workflow.add_tasks(tasks) def get_resources(resources: Path | str | dict[str, Any]) -> dict[str, Any]: @@ -212,6 +301,9 @@ def get_tasks( subsets: dict[str, Any | list[Any]] | None, paramsets: dict[str, Any | list[Any]] | None, collect: bool | None, + task_and_template_prefix: str | None, + max_attempts: int, + external_upstream_tasks: list[Task] | None = None, **kwargs, ) -> list[Task]: """Get Jobmon tasks. @@ -249,6 +341,17 @@ def get_tasks( Whether to collect submodel results if `model` is a `Stage` instance. + Jobmon Parameters + ----------------- + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. + max_attempts : int + Maximum number of attempts for a task. + external_upstream_tasks : list, optional + List of Jobmon tasks external to the OneMod Stages or Pipeline that + should be treated as upstream dependencies of the new tasks. Default + None, no external upstreams. + Returns ------- list of Task @@ -257,17 +360,28 @@ def get_tasks( """ if isinstance(model, Pipeline): return get_pipeline_tasks( - model, method, tool, resources, python, stages, **kwargs + model, + method, + tool, + resources, + python, + stages, + external_upstream_tasks, + task_and_template_prefix, + max_attempts, + **kwargs, ) return get_stage_tasks( - model, - method, - tool, - resources, - python, - subsets, - paramsets, - collect, + stage=model, + method=method, + tool=tool, + resources=resources, + python=python, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + subsets=subsets, + paramsets=paramsets, + collect=collect, **kwargs, ) @@ -279,6 +393,9 @@ def get_pipeline_tasks( resources: dict[str, Any], python: Path | str, stages: list[str] | None, + external_upstream_tasks: list[Task] | None, + task_and_template_prefix: str | None, + max_attempts: int, **kwargs, ) -> list[Task]: """Get pipeline stage tasks. @@ -298,6 +415,13 @@ def get_pipeline_tasks( stages : list of str or None Name of stages to evaluate. If None, evaluate all pipeline stages. + external_upstream_tasks : list, optional + List of Jobmon tasks external to the OneMod Stages or Pipeline that + should be treated as upstream dependencies of the new tasks. + task_and_template_prefix : str, optional + Optional prefix to append to task/template name. + max_attempts : int + Maximum number of attempts for a task. **kwargs Additional keyword arguments passed to stage methods. 
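Illustrative usage of the add_tasks_to_workflow function introduced earlier in this patch (a minimal sketch; pipeline and upstream_task are placeholders for an existing OneMod pipeline and a task already added to the workflow):

    from jobmon.client.api import Tool

    from onemod.backend.jobmon_backend import add_tasks_to_workflow

    tool = Tool(name="external_tool")
    tool.set_default_cluster_name("dummy")
    tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"})
    workflow = tool.create_workflow(name="external_workflow")

    add_tasks_to_workflow(
        workflow=workflow,
        model=pipeline,
        method="run",
        resources={"tool_resources": {"dummy": {"queue": "null.q"}}},
        task_and_template_prefix="onemod",
        max_attempts=3,
        # Stages with no OneMod upstream dependencies run after this task.
        external_upstream_tasks=[upstream_task],
    )

    # Tasks are only added; the caller binds and runs the workflow.
    workflow.bind()
    workflow.run()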
@@ -309,6 +433,7 @@ def get_pipeline_tasks( """ tasks = [] task_dict: dict[str, list[Task]] = {} + task_dict["external"] = external_upstream_tasks or [] for stage_name in pipeline.get_execution_order(stages): stage = pipeline.stages[stage_name] @@ -317,11 +442,13 @@ def get_pipeline_tasks( stage, method, pipeline.stages, task_dict, stages ) task_dict[stage_name] = get_stage_tasks( - stage, - method, - tool, - resources, - python, + stage=stage, + method=method, + tool=tool, + resources=resources, + python=python, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, upstream_tasks=upstream_tasks, **kwargs, ) @@ -365,6 +492,7 @@ def get_upstream_tasks( * If an upstream stage has submodels and `method` is in the upstream's `collect_after`, only include the task corresponding to the upstream's `collect` method. + * If there are no upstream tasks, add any external tasks as upstream. """ upstream_tasks = [] @@ -385,6 +513,10 @@ def get_upstream_tasks( else: upstream_tasks.extend(task_dict[upstream_name]) + # if there are no upstream tasks, add external upstream tasks + if not upstream_tasks: + upstream_tasks = task_dict.get("external", []) + return upstream_tasks @@ -394,13 +526,12 @@ def get_stage_tasks( tool: Tool, resources: dict[str, Any], python: Path | str, + task_and_template_prefix: str | None, + max_attempts: int, subsets: dict[str, Any | list[Any]] | None = None, paramsets: dict[str, Any | list[Any]] | None = None, collect: bool | None = None, upstream_tasks: list[Task] | None = None, - external_upstream_tasks: list[Task] | None = None, - task_and_template_prefix: str | None = None, - max_attempts: int = 1, **kwargs, ) -> list[Task]: """Get stage tasks. @@ -429,15 +560,10 @@ def get_stage_tasks( True, otherwise default is False. upstream_tasks : list of Task or None, optional List of upstream stage tasks. Default is None. - external_upstream_tasks : list, optional - List of Jobmon tasks external to the OneMod Stages or Pipeline that - should be treated as upstream dependencies of the new tasks. Default - is no external upstream tasks. task_and_template_prefix : str, optional - Optional prefix to append to task/template name. Default is None, - no prefix. - max_attempts : int, optional - Maximum number of attempts for a task. Default is 1. + Optional prefix to append to task/template name. + max_attempts : int + Maximum number of attempts for a task. **kwargs Additional keyword arguments passed to stage method. 
@@ -460,6 +586,7 @@ def get_stage_tasks( tool, resources, list(submodel_args.keys()), + task_and_template_prefix=task_and_template_prefix, **kwargs, ) @@ -468,16 +595,10 @@ def get_stage_tasks( if task_and_template_prefix else f"{stage.name}_{method}" ) - all_upstream_tasks = ( - upstream_tasks + external_upstream_tasks - if external_upstream_tasks - else upstream_tasks - ) - if submodel_args: tasks = task_template.create_tasks( name=task_name, - upstream_tasks=all_upstream_tasks, + upstream_tasks=upstream_tasks, max_attempts=max_attempts, entrypoint=entrypoint, config=config_path, @@ -489,7 +610,7 @@ def get_stage_tasks( tasks = [ task_template.create_task( name=task_name, - upstream_tasks=all_upstream_tasks, + upstream_tasks=upstream_tasks, max_attempts=max_attempts, entrypoint=entrypoint, config=config_path, @@ -502,7 +623,14 @@ def get_stage_tasks( if collect_results(stage, method, subsets, paramsets, collect): tasks.extend( get_stage_tasks( - stage, "collect", tool, resources, python, upstream_tasks=tasks + stage=stage, + method="collect", + tool=tool, + resources=resources, + python=python, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + upstream_tasks=tasks, ) ) @@ -571,7 +699,7 @@ def get_task_template( tool: Tool, resources: dict[str, Any], submodel_args: list[str], - task_and_template_prefix: str | None = None, + task_and_template_prefix: str | None, **kwargs, ) -> TaskTemplate: """Get stage task template. @@ -592,8 +720,7 @@ def get_task_template( submodel_args : list of str List including 'subsets' and/or 'paramsets'. task_and_template_prefix : str, optional - Optional prefix to append to task/template name. Default is None, - no prefix. + Optional prefix to append to task/template name. **kwargs Additional keyword arguments passed to stage method. 
diff --git a/src/onemod/pipeline.py b/src/onemod/pipeline.py index 1e47ba63..a4aab2d5 100644 --- a/src/onemod/pipeline.py +++ b/src/onemod/pipeline.py @@ -6,10 +6,13 @@ import logging from collections import deque from pathlib import Path -from typing import Any, Literal +from typing import TYPE_CHECKING, Any, Literal from pydantic import BaseModel +if TYPE_CHECKING: + from jobmon.client.workflow import Workflow + from onemod.config import Config from onemod.serialization import serialize from onemod.stage import Stage @@ -260,6 +263,28 @@ def save_validation_report( report_path = validation_dir / "validation_report.json" serialize(collector.errors, report_path) # type: ignore[arg-type] + def add_tasks_to_workflow( + self, + workflow: "Workflow", + method: Literal["run", "fit", "predict", "collect"], + stages: list[str] | None, + resources: Path | str | dict[str, Any], + python: Path | str | None, + **kwargs, + ) -> None: + """Add pipeline tasks to existing Jobmon Workflow.""" + from onemod.backend.jobmon_backend import add_tasks_to_workflow + + add_tasks_to_workflow( + workflow=workflow, + model=self, + method=method, + stages=stages, + resources=resources, + python=python, + **kwargs, + ) + def evaluate( self, method: Literal["run", "fit", "predict", "collect"], diff --git a/tests/e2e/test_e2e_jobmon_backend.py b/tests/e2e/test_e2e_jobmon_backend.py index c8f1584e..72311cd1 100644 --- a/tests/e2e/test_e2e_jobmon_backend.py +++ b/tests/e2e/test_e2e_jobmon_backend.py @@ -60,20 +60,6 @@ def test_parallel_pipeline_stages(parallel_pipeline): ) -@pytest.mark.e2e -@pytest.mark.requires_jobmon -def test_parallel_pipeline_stages_existing_workflow(parallel_pipeline): - tool = Tool(name="test_run") - tool.set_default_cluster_name("dummy") - tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"}) - workflow = tool.create_workflow(name="test_run_workflow") - parallel_pipeline.evaluate( - method="run", stages=["run_1", "fit_2"], workflow=workflow, **KWARGS - ) - workflow.bind() - workflow.run() - - @pytest.mark.e2e @pytest.mark.requires_jobmon @pytest.mark.parametrize("method", ["run", "fit", "predict"]) @@ -130,3 +116,31 @@ def test_parallel_stage_submodels(parallel_pipeline, submodel, collect): "collect": collect, }, ) + + +@pytest.mark.e2e +@pytest.mark.requires_jobmon +def test_simple_pipeline_add_tasks_to_workflow(simple_pipeline): + tool = Tool(name="test_run_simple_pipeline") + tool.set_default_cluster_name("dummy") + tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"}) + workflow = tool.create_workflow(name="test_run_workflow") + simple_pipeline.add_tasks_to_workflow( + workflow=workflow, method="run", stages=["run_1", "fit_2"], **KWARGS + ) + workflow.bind() + workflow.run() + + +@pytest.mark.e2e +@pytest.mark.requires_jobmon +def test_parallel_pipeline_add_tasks_to_workflow(parallel_pipeline): + tool = Tool(name="test_run_parallel_pipeline") + tool.set_default_cluster_name("dummy") + tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"}) + workflow = tool.create_workflow(name="test_run_workflow") + parallel_pipeline.add_tasks_to_workflow( + workflow=workflow, method="run", stages=["run_1", "fit_2"], **KWARGS + ) + workflow.bind() + workflow.run() diff --git a/tests/integration/test_integration_jobmon_backend.py b/tests/integration/test_integration_jobmon_backend.py index bc7d25d9..e3d4d219 100644 --- a/tests/integration/test_integration_jobmon_backend.py +++ b/tests/integration/test_integration_jobmon_backend.py @@ -116,7 +116,12 
@@ def test_task_template(stage_cluster): } tool = jb.get_tool("pipeline", "method", "cluster", resources) task_template = jb.get_task_template( - "stage", "method", tool, resources, submodel_args=[] + "stage", + "method", + tool, + resources, + submodel_args=[], + task_and_template_prefix=None, ) default_cluster = task_template.default_cluster_name default_resources = task_template.default_compute_resources_set @@ -212,6 +217,9 @@ def test_simple_pipeline_tasks(simple_pipeline, method, stages): resources=resources, python=python, stages=stages, + external_upstream_tasks=None, + task_and_template_prefix=None, + max_attempts=1, ) stages = list(simple_pipeline.stages.keys()) if stages is None else stages task_dict = {task.task_args["stages"]: task for task in tasks} @@ -244,6 +252,9 @@ def test_parallel_pipeline_tasks(parallel_pipeline, method, stages): resources=resources, python=python, stages=stages, + external_upstream_tasks=None, + task_and_template_prefix=None, + max_attempts=1, ) stages = list(parallel_pipeline.stages.keys()) if stages is None else stages task_dict = {task.task_args["stages"]: defaultdict(list) for task in tasks} @@ -288,6 +299,91 @@ def test_parallel_pipeline_tasks(parallel_pipeline, method, stages): assert stage.name not in task_dict +@pytest.mark.integration +@pytest.mark.requires_jobmon +@pytest.mark.parametrize("method", ["run", "fit", "predict"]) +@pytest.mark.parametrize("stages", [None, ["run_1", "fit_2", "predict_3"]]) +def test_parallel_pipeline_tasks_jobmon_args(parallel_pipeline, method, stages): + cluster = "cluster" + resources = {"tool_resources": {cluster: {"queue": "null.q"}}} + python = "/path/to/python/env/bin/python" + external_upstream_tasks = [ + jb.Task( + node=mock.MagicMock(), + task_args={"fake_arg_1": "fake_value"}, + op_args={"fake_arg_2": "fake_value"}, + name="fake_task", + task_attributes=[], + ) + ] + task_and_template_prefix = "testing" + max_attempts = 3 + tasks = jb.get_pipeline_tasks( + parallel_pipeline, + method, + jb.get_tool(parallel_pipeline.name, method, cluster, resources), + resources=resources, + python=python, + stages=stages, + external_upstream_tasks=external_upstream_tasks, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, + ) + stages = list(parallel_pipeline.stages.keys()) if stages is None else stages + task_dict = {task.task_args["stages"]: defaultdict(list) for task in tasks} + for task in tasks: + task_dict[task.task_args["stages"]][task.task_args["method"]].append( + task + ) + + for stage in parallel_pipeline.stages.values(): + if stage.name in stages: + method_tasks = task_dict[stage.name][method] + collect_tasks = task_dict[stage.name]["collect"] + assert len(method_tasks) == len(stage.get_submodels()) + + if method in stage.collect_after: + assert len(collect_tasks) == 1 + assert collect_tasks[0].upstream_tasks == set(method_tasks) + else: + assert len(collect_tasks) == 0 + + for task in method_tasks: + stage_upstreams = [ + upstream_task + for upstream_task in task.upstream_tasks + if "stages" in upstream_task.task_args + ] + external_upstreams = [ + upstream_task + for upstream_task in task.upstream_tasks + if "stages" not in upstream_task.task_args + ] + upstream_dict = { + upstream_task.task_args["stages"]: defaultdict(list) + for upstream_task in stage_upstreams + } + for upstream_task in stage_upstreams: + upstream_dict[upstream_task.task_args["stages"]][ + upstream_task.task_args["method"] + ].append(upstream_task) + for upstream_name in stage.dependencies: + # assumes 
upstream_stage in stages + upstream_stage = parallel_pipeline.stages[upstream_name] + if method in upstream_stage.collect_after: + assert len(upstream_dict[upstream_name][method]) == 0 + assert len(upstream_dict[upstream_name]["collect"]) == 1 + else: + assert len(upstream_dict[upstream_name][method]) == len( + upstream_stage.get_submodels() + ) + assert len(upstream_dict[upstream_name]["collect"]) == 0 + if external_upstreams: + assert external_upstreams == external_upstream_tasks + else: + assert stage.name not in task_dict + + @pytest.mark.integration @pytest.mark.requires_jobmon def test_stage_tasks_basic(simple_pipeline): @@ -304,6 +400,8 @@ def test_stage_tasks_basic(simple_pipeline): jb.get_tool(simple_pipeline.name, method, cluster, resources), resources=resources, python=python, + task_and_template_prefix=None, + max_attempts=1, ) task = tasks[0] @@ -342,6 +440,8 @@ def test_stage_tasks_kwargs(simple_pipeline, kwargs): jb.get_tool(simple_pipeline.name, method, cluster, resources), resources=resources, python=python, + task_and_template_prefix=None, + max_attempts=1, **kwargs, )[0] assert task.command == jb.get_command_template(method, [], **kwargs).format( @@ -383,6 +483,8 @@ def test_stage_tasks_submodels(parallel_pipeline, submodel, collect): subsets=subsets, paramsets=paramsets, collect=collect, + task_and_template_prefix=None, + max_attempts=1, ) submodels = [ [str(submodel[0]), str(submodel[1])] @@ -424,6 +526,8 @@ def test_stage_tasks_collect_after(parallel_pipeline, method): subsets={"sex_id": 1}, paramsets={"param": 1}, collect=True, + task_and_template_prefix=None, + max_attempts=1, ) assert tasks[0].task_args["method"] == method if method == "predict": @@ -441,15 +545,6 @@ def test_stage_tasks_jobmon_args(simple_pipeline): cluster = "cluster" resources = {"tool_resources": {cluster: {"queue": "null.q"}}} python = "/path/to/python/env/bin/python" - external_upstream_tasks = [ - jb.Task( - node=mock.MagicMock(), - task_args={"fake_arg_1": "fake_value"}, - op_args={"fake_arg_2": "fake_value"}, - name="fake_task", - task_attributes=[], - ) - ] task_and_template_prefix = "testing" max_attempts = 3 entrypoint = str(Path(python).parent / "onemod") @@ -460,7 +555,6 @@ def test_stage_tasks_jobmon_args(simple_pipeline): jb.get_tool(simple_pipeline.name, method, cluster, resources), resources=resources, python=python, - external_upstream_tasks=external_upstream_tasks, task_and_template_prefix=task_and_template_prefix, max_attempts=max_attempts, ) @@ -481,4 +575,3 @@ def test_stage_tasks_jobmon_args(simple_pipeline): "stages": stage.name, } assert task.node.node_args == {} - assert task.upstream_tasks == set(external_upstream_tasks) From c217baa3c4a56dda37a9bb6f8baadf89f8c9822c Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Wed, 19 Mar 2025 09:33:37 -0700 Subject: [PATCH 12/26] addressing second round of comments --- src/onemod/__init__.py | 2 ++ src/onemod/backend/jobmon_backend.py | 22 +++++++++++++--------- src/onemod/pipeline.py | 27 +-------------------------- 3 files changed, 16 insertions(+), 35 deletions(-) diff --git a/src/onemod/__init__.py b/src/onemod/__init__.py index 188449fc..d5a3b583 100644 --- a/src/onemod/__init__.py +++ b/src/onemod/__init__.py @@ -1,9 +1,11 @@ +from onemod.backend.jobmon_backend import add_tasks_to_workflow from onemod.config import Config, StageConfig from onemod.main import load_pipeline, load_stage from onemod.pipeline import Pipeline from onemod.stage import Stage __all__ = [ + "add_tasks_to_workflow", "Config", "Pipeline", "Stage", diff 
--git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 5874cf8f..698f6302 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -166,6 +166,9 @@ def add_tasks_to_workflow( ) -> None: """Add Pipeline tasks to an existing Jobmon Workflow. + Note that this is a publically available function, be careful of + breaking changes to the API or functionality. + Parameters ---------- workflow : Workflow @@ -360,15 +363,15 @@ def get_tasks( """ if isinstance(model, Pipeline): return get_pipeline_tasks( - model, - method, - tool, - resources, - python, - stages, - external_upstream_tasks, - task_and_template_prefix, - max_attempts, + pipeline=model, + method=method, + tool=tool, + resources=resources, + python=python, + stages=stages, + external_upstream_tasks=external_upstream_tasks, + task_and_template_prefix=task_and_template_prefix, + max_attempts=max_attempts, **kwargs, ) return get_stage_tasks( @@ -382,6 +385,7 @@ def get_tasks( subsets=subsets, paramsets=paramsets, collect=collect, + upstream_tasks=external_upstream_tasks, **kwargs, ) diff --git a/src/onemod/pipeline.py b/src/onemod/pipeline.py index a4aab2d5..1e47ba63 100644 --- a/src/onemod/pipeline.py +++ b/src/onemod/pipeline.py @@ -6,13 +6,10 @@ import logging from collections import deque from pathlib import Path -from typing import TYPE_CHECKING, Any, Literal +from typing import Any, Literal from pydantic import BaseModel -if TYPE_CHECKING: - from jobmon.client.workflow import Workflow - from onemod.config import Config from onemod.serialization import serialize from onemod.stage import Stage @@ -263,28 +260,6 @@ def save_validation_report( report_path = validation_dir / "validation_report.json" serialize(collector.errors, report_path) # type: ignore[arg-type] - def add_tasks_to_workflow( - self, - workflow: "Workflow", - method: Literal["run", "fit", "predict", "collect"], - stages: list[str] | None, - resources: Path | str | dict[str, Any], - python: Path | str | None, - **kwargs, - ) -> None: - """Add pipeline tasks to existing Jobmon Workflow.""" - from onemod.backend.jobmon_backend import add_tasks_to_workflow - - add_tasks_to_workflow( - workflow=workflow, - model=self, - method=method, - stages=stages, - resources=resources, - python=python, - **kwargs, - ) - def evaluate( self, method: Literal["run", "fit", "predict", "collect"], From c1ad9f69c3b89f44c07f9aa9f3fb2d640c7cd38b Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Wed, 19 Mar 2025 10:00:53 -0700 Subject: [PATCH 13/26] e2e tests updated --- src/onemod/backend/jobmon_backend.py | 2 +- tests/e2e/test_e2e_jobmon_backend.py | 18 ++++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 698f6302..01c3f233 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -150,8 +150,8 @@ def evaluate_with_jobmon( @validate_call(config=ConfigDict(arbitrary_types_allowed=True)) def add_tasks_to_workflow( - workflow: Workflow, model: Pipeline | Stage, + workflow: Workflow, method: Literal["run", "fit", "predict", "collect"], resources: Path | str | dict[str, Any], python: Path | str | None = None, diff --git a/tests/e2e/test_e2e_jobmon_backend.py b/tests/e2e/test_e2e_jobmon_backend.py index 72311cd1..3e713c44 100644 --- a/tests/e2e/test_e2e_jobmon_backend.py +++ b/tests/e2e/test_e2e_jobmon_backend.py @@ -10,6 +10,8 @@ import pytest from jobmon.client.api import Tool +from 
onemod.backend.jobmon_backend import add_tasks_to_workflow + KWARGS = { "backend": "jobmon", "cluster": "dummy", @@ -125,8 +127,12 @@ def test_simple_pipeline_add_tasks_to_workflow(simple_pipeline): tool.set_default_cluster_name("dummy") tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"}) workflow = tool.create_workflow(name="test_run_workflow") - simple_pipeline.add_tasks_to_workflow( - workflow=workflow, method="run", stages=["run_1", "fit_2"], **KWARGS + add_tasks_to_workflow( + model=simple_pipeline, + workflow=workflow, + method="run", + stages=["run_1", "fit_2"], + **KWARGS, ) workflow.bind() workflow.run() @@ -139,8 +145,12 @@ def test_parallel_pipeline_add_tasks_to_workflow(parallel_pipeline): tool.set_default_cluster_name("dummy") tool.set_default_compute_resources_from_dict("dummy", {"queue": "null.q"}) workflow = tool.create_workflow(name="test_run_workflow") - parallel_pipeline.add_tasks_to_workflow( - workflow=workflow, method="run", stages=["run_1", "fit_2"], **KWARGS + add_tasks_to_workflow( + model=parallel_pipeline, + workflow=workflow, + method="run", + stages=["run_1", "fit_2"], + **KWARGS, ) workflow.bind() workflow.run() From cdaa54821208e9c66f4f7c38884095baf6fc7676 Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Wed, 19 Mar 2025 11:19:03 -0700 Subject: [PATCH 14/26] update load_pipeline to load custom pipeline classes --- src/onemod/main.py | 63 +++++++++++++++++++++++++++++----------------- 1 file changed, 40 insertions(+), 23 deletions(-) diff --git a/src/onemod/main.py b/src/onemod/main.py index 1f0ce092..120fd15e 100644 --- a/src/onemod/main.py +++ b/src/onemod/main.py @@ -27,7 +27,9 @@ def load_pipeline(config: Path | str) -> Pipeline: Pipeline instance. """ - return Pipeline.from_json(config) + pipeline_class: type[Pipeline] = _get_class(config) + pipeline = pipeline_class.from_json(config) + return pipeline def load_stage(config: Path | str, stage_name: str) -> Stage: @@ -46,25 +48,28 @@ def load_stage(config: Path | str, stage_name: str) -> Stage: Stage instance. """ - stage_class = _get_stage(config, stage_name) + stage_class: type[Stage] = _get_class(config, stage_name) stage = stage_class.from_json(config, stage_name) return stage -def _get_stage(config: Path | str, stage_name: str) -> Stage: - """Get stage class from JSON file. +def _get_class( + config: Path | str, stage_name: str | None = None +) -> type[Pipeline] | type[Stage]: + """Get pipeline or stage class from JSON file. Parameters ---------- config : Path or str Path to config file. - stage_name : str - Stage name. + stage_name : str or None, optional + Name of stage in config file to get class for. If None, get + class for pipeline in config file. Default is None. Returns ------- - Stage - Stage class. + Pipeline or Stage + Pipeline or Stage class. 
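With this change, load_pipeline resolves the concrete class recorded in the config before deserializing, so custom pipeline subclasses round-trip (a minimal sketch; the path and stage name are hypothetical):

    from onemod import load_pipeline, load_stage

    pipeline = load_pipeline("/projects/my_model/pipeline_config.json")  # may be a custom Pipeline subclass
    stage = load_stage("/projects/my_model/pipeline_config.json", "my_stage")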
Notes ----- @@ -74,34 +79,46 @@ def _get_stage(config: Path | str, stage_name: str) -> Stage: """ with open(config, "r") as f: config_dict = json.load(f) - if stage_name not in config_dict["stages"]: - raise KeyError(f"Config does not contain a stage named '{stage_name}'") - config_dict = config_dict["stages"][stage_name] - stage_type = config_dict["type"] + + if stage_name is None: + model_name = config_dict["name"] + model_type = config_dict["type"] + else: + if stage_name not in config_dict["stages"]: + raise KeyError( + f"Config does not contain a stage named '{stage_name}'" + ) + config_dict = config_dict["stages"][stage_name] + model_name = stage_name + model_type = config_dict["type"] if "module" in config_dict: - return _get_custom_stage(stage_type, config_dict["module"]) - if hasattr(onemod_stages, stage_type): - return getattr(onemod_stages, stage_type) + return _get_custom_class(model_type, config_dict["module"]) + if model_type == "Pipeline": + return Pipeline + if hasattr(onemod_stages, model_type): + return getattr(onemod_stages, model_type) raise KeyError( - f"Config does not contain a module for custom stage '{stage_name}'" + f"Config does not contain a module for custom class '{model_name}'" ) -def _get_custom_stage(stage_type: str, module: str) -> Stage: - """Get custom stage class from file. +def _get_custom_class( + class_type: str, module: str +) -> type[Pipeline] | type[Stage]: + """Get custom pipeline or stage class from file. Parameters ---------- - stage_type : str - Name of custom stage class. + class_type : str + Name of custom class. module : str - Path to Python module containing custom stage class definition. + Path to Python module containing custom class definition. Returns ------- Stage - Custom stage class. + Custom pipeline or stage class. 
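As a sketch of the resolution path (class and file names are hypothetical): a custom stage or pipeline serialized with "type": "MyCustomStage" and "module": "/projects/my_model/stages.py" is restored by importing that file and looking the class up by name, roughly:

    MyCustomStage = _get_custom_class("MyCustomStage", "/projects/my_model/stages.py")
    stage = MyCustomStage.from_json("/projects/my_model/pipeline_config.json", "my_stage")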
""" module_path = Path(module) @@ -120,7 +137,7 @@ def _get_custom_stage(stage_type: str, module: str) -> Stage: loaded_module = module_from_spec(spec) spec.loader.exec_module(loaded_module) - return getattr(loaded_module, stage_type) + return getattr(loaded_module, class_type) def evaluate( From 9b3c02e40b35ea143a45eae4d711012de1cfe0f1 Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Wed, 19 Mar 2025 11:56:20 -0700 Subject: [PATCH 15/26] change else to elif --- src/onemod/stage/base.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/onemod/stage/base.py b/src/onemod/stage/base.py index 12fd49da..0adf4085 100644 --- a/src/onemod/stage/base.py +++ b/src/onemod/stage/base.py @@ -90,14 +90,13 @@ def module(self) -> Path | None: def set_module(self, module: Path | str | None) -> None: if isinstance(module, (Path, str)): self._module = Path(module) - else: - if not hasattr(onemod_stages, self.type): - try: - self._module = Path(getfile(self.__class__)) - except (OSError, TypeError): - raise TypeError( - f"Could not find module for custom stage '{self.name}'" - ) + elif not hasattr(onemod_stages, self.type): + try: + self._module = Path(getfile(self.__class__)) + except (OSError, TypeError): + raise TypeError( + f"Could not find module for custom stage '{self.name}'" + ) @computed_property def input(self) -> Input: From 61fe369b80c48f7914714a53536a7d27e9245d6c Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Wed, 19 Mar 2025 12:13:39 -0700 Subject: [PATCH 16/26] use model_construct instead of Config in case of custom class --- src/onemod/config/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/onemod/config/base.py b/src/onemod/config/base.py index f463e3e8..86db49e0 100644 --- a/src/onemod/config/base.py +++ b/src/onemod/config/base.py @@ -65,7 +65,7 @@ class StageConfig(Config): def add_pipeline_config(self, pipeline_config: Config | dict) -> None: if isinstance(pipeline_config, dict): - pipeline_config = Config(**pipeline_config) + pipeline_config = type(self._pipeline_config)(**pipeline_config) missing = [] for item in self._required: From 483fab6cf9239e607b44bd076655109e5b0e460f Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Wed, 19 Mar 2025 12:45:43 -0700 Subject: [PATCH 17/26] add comments about custom configs to advanced usage page --- docs/user_guide/advanced_usage.rst | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/docs/user_guide/advanced_usage.rst b/docs/user_guide/advanced_usage.rst index 83cb17d2..2d0c363f 100644 --- a/docs/user_guide/advanced_usage.rst +++ b/docs/user_guide/advanced_usage.rst @@ -145,7 +145,8 @@ automatically validate any user-supplied settings. attribute. For example, ``stage.config["id_columns"]`` will return ``id_columns`` from the stage's config if it exists and is not ``None``, otherwise it will return ``id_columns`` from the pipeline's config if it - exists and is not ``None``. + exists and is not ``None``. If an attribute exists but is ``None``, it is + treated as if it does not exist. * If a stage has a required setting that can be specified at either the stage or pipeline level, the item should include ``None`` as its default in the custom stage config and the item's name should be included in the stage config's @@ -153,3 +154,8 @@ automatically validate any user-supplied settings. * To enable the :py:attr:`~onemod.stage.base.Stage.crossby` attribute for a setting in a custom stage config, the setting's type hints must include a list, set, or tuple. 
For example, ``param: int | list[int]``. +* When defining a custom :py:attr:`~onemod.config.base.StageConfig` class, you + should specify the type of the private ``_pipeline_config`` attribute if it + corresponds to a custom :py:attr:`~onemod.config.base.Config` class, otherwise + custom fields in the pipeline config will not be validated when reloading the + stage from a JSON file. From 019315ca5101ef540fe8420ae7fd64090229852f Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Wed, 19 Mar 2025 12:46:35 -0700 Subject: [PATCH 18/26] add type and module attributes, add init func --- src/onemod/pipeline.py | 46 +++++++++++++++++++++++++++++++++++++----- 1 file changed, 41 insertions(+), 5 deletions(-) diff --git a/src/onemod/pipeline.py b/src/onemod/pipeline.py index 1e47ba63..b4306474 100644 --- a/src/onemod/pipeline.py +++ b/src/onemod/pipeline.py @@ -5,6 +5,7 @@ import json import logging from collections import deque +from inspect import getfile from pathlib import Path from typing import Any, Literal @@ -40,8 +41,42 @@ class Pipeline(BaseModel): directory: Path config: Config = Config() groupby_data: Path | None = None + _module: Path | None = None _stages: dict[str, Stage] = {} + def __init__( + self, + module: Path | str | None = None, + stages: list[Stage] | None = None, + **kwargs, + ) -> None: + """Create pipeline instance.""" + super().__init__(**kwargs) + self.set_module(module) + if stages is not None: + self.add_stages(stages) + + @computed_property + def type(self) -> str: + """Pipeline type.""" + return type(self).__name__ + + @computed_property + def module(self) -> Path | None: + """Path to module containing custom pipeline definition.""" + return self._module + + def set_module(self, module: Path | str | None) -> None: + if isinstance(module, (Path, str)): + self._module = Path(module) + elif self.type != "Pipeline": + try: + self._module = Path(getfile(self.__class__)) + except (OSError, TypeError): + raise TypeError( + f"Could not find module for custom pipeline class '{self.name}'" + ) + @computed_property def stages(self) -> dict[str, Stage]: """Pipeline stages.""" @@ -72,18 +107,19 @@ def from_json(cls, config_path: Path | str) -> Pipeline: with open(config_path, "r") as file: config = json.load(file) + del config["type"] + del config["dependencies"] stages = config.pop("stages", {}) - pipeline = cls(**config) - if stages: from onemod.main import load_stage - pipeline.add_stages( - [load_stage(config_path, stage) for stage in stages] + return cls( + stages=[load_stage(config_path, stage) for stage in stages], + **config, ) - return pipeline + return cls(**config) def to_json(self, config_path: Path | str | None = None) -> None: """Save pipeline as JSON file. 
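To illustrate the new `type` and `module` handling added to `Pipeline` above, here is a minimal sketch of a custom subclass; the class, file, and directory names are illustrative assumptions, not part of this patch.

# my_pipeline.py -- hypothetical module containing a custom pipeline.
from onemod import Pipeline


class MyPipeline(Pipeline):
    """Custom pipeline subclass."""


pipeline = MyPipeline(name="example_pipeline", directory="/tmp/example")
# Because type(pipeline).__name__ != "Pipeline", set_module() falls back to
# inspect.getfile and records the path of this file, so the subclass can be
# re-imported when the pipeline is reloaded from JSON.
print(pipeline.type)    # "MyPipeline"
print(pipeline.module)  # path to my_pipeline.py
# pipeline.to_json() should now store "type" and "module" alongside the rest
# of the config (assuming the pipeline directory exists).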
From 2c3709c8626cd5983b65056d03ff170f4ec0e40a Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Thu, 20 Mar 2025 10:33:01 -0700 Subject: [PATCH 19/26] splitting out create and run workflow --- src/onemod/backend/jobmon_backend.py | 56 +++++++++++++++++++--------- 1 file changed, 38 insertions(+), 18 deletions(-) diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 01c3f233..210d864c 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -130,11 +130,11 @@ def evaluate_with_jobmon( python = str(sys.executable) resources_dict = get_resources(resources) - tool = get_tool(model.name, method, cluster, resources_dict) - tasks = get_tasks( + workflow = create_workflow(model.name, method, cluster, resources_dict) + add_tasks_to_workflow( model=model, + workflow=workflow, method=method, - tool=tool, resources=resources_dict, python=python, stages=stages, @@ -145,7 +145,7 @@ def evaluate_with_jobmon( max_attempts=max_attempts, **kwargs, ) - create_and_run_workflow(model.name, method, tool, tasks) + run_workflow(workflow) @validate_call(config=ConfigDict(arbitrary_types_allowed=True)) @@ -264,6 +264,36 @@ def get_resources(resources: Path | str | dict[str, Any]) -> dict[str, Any]: return resources +def create_workflow( + name: str, + method: Literal["run", "fit", "predict", "collect"], + cluster: str, + resources: dict[str, Any], +) -> Workflow: + """Create and return workflow. + + Parameters + ---------- + name : str + Pipeline or stage name. + method : str + Name of method being evaluated. + cluster : str + Cluster name. + resources : dict + Dictionary of compute resources. + + Returns + ------- + Workflow + Jobmon workflow. + + """ + tool = get_tool(name, method, cluster, resources) + workflow = tool.create_workflow(name=f"{name}_{method}") + return workflow + + def get_tool( name: str, method: str, cluster: str, resources: dict[str, Any] ) -> Tool: @@ -833,25 +863,15 @@ def get_task_resources( } -def create_and_run_workflow( - name: str, method: str, tool: Tool, tasks: list[Task] -) -> None: - """Create and run workflow. +def run_workflow(workflow: Workflow) -> None: + """Run workflow. Parameters ---------- - name : str - Pipeline or stage name. - method : str - Name of method being evaluated. - tool : Tool - Jobmon tool. - tasks : list of Task - List of stage tasks. + workflow : Workflow + Jobmon workflow to run. 
""" - workflow = tool.create_workflow(name=f"{name}_{method}") - workflow.add_tasks(tasks) workflow.bind() print(f"Starting workflow {workflow.workflow_id}") status = workflow.run() From 3c219c85000108f91bde42bcf3b5a24a27e5a152 Mon Sep 17 00:00:00 2001 From: Leo Zoeckler Date: Thu, 20 Mar 2025 11:04:28 -0700 Subject: [PATCH 20/26] removing jobmon backend from init --- src/onemod/__init__.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/onemod/__init__.py b/src/onemod/__init__.py index d5a3b583..188449fc 100644 --- a/src/onemod/__init__.py +++ b/src/onemod/__init__.py @@ -1,11 +1,9 @@ -from onemod.backend.jobmon_backend import add_tasks_to_workflow from onemod.config import Config, StageConfig from onemod.main import load_pipeline, load_stage from onemod.pipeline import Pipeline from onemod.stage import Stage __all__ = [ - "add_tasks_to_workflow", "Config", "Pipeline", "Stage", From bee82534ef48a26b73cd29fba7775e23b603a06a Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Thu, 20 Mar 2025 13:23:29 -0700 Subject: [PATCH 21/26] add try/catch for jobmon import, update changelog --- CHANGELOG.md | 3 ++- tests/e2e/test_e2e_jobmon_backend.py | 9 +++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0c00a3dc..f465b710 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,8 +11,9 @@ and this project adheres to [Semantic Versioning](http://semver.org/spec/v2.0.0. ### Added -- Added new pipeline method `add_tasks_to_workflow` for adding tasks to an existing Jobmon workflow +- Added new Jobmon-backend method `add_tasks_to_workflow` for adding tasks to an existing Jobmon workflow. - Added new Jobmon-related arguments: `external_upstream_dependencies`, `task_and_template_prefix`, and `max_attempts`. +- Split Jobmon-backend method `run_workflow` into `create_workflow` and `run_workflow`. 
## [1.0.3] - 2025-03-12 diff --git a/tests/e2e/test_e2e_jobmon_backend.py b/tests/e2e/test_e2e_jobmon_backend.py index 3e713c44..4c82502b 100644 --- a/tests/e2e/test_e2e_jobmon_backend.py +++ b/tests/e2e/test_e2e_jobmon_backend.py @@ -8,9 +8,14 @@ """ import pytest -from jobmon.client.api import Tool -from onemod.backend.jobmon_backend import add_tasks_to_workflow +try: + from jobmon.client.api import Tool + + from onemod.backend.jobmon_backend import add_tasks_to_workflow +except ImportError: + pass + KWARGS = { "backend": "jobmon", From c8593b2bc55a831edca0cff811cb715650b854fa Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Thu, 20 Mar 2025 13:49:06 -0700 Subject: [PATCH 22/26] add PR back to build action --- .github/workflows/build.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 52bd7fb2..be4a7836 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -4,6 +4,9 @@ on: push: branches: - "*" + pull_request: + branches: + - "*" jobs: build: From 0193e60dac67645386a1f1f25ffede9dffe77f5a Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Tue, 8 Apr 2025 07:53:35 -0700 Subject: [PATCH 23/26] add module for loading custom classes --- src/onemod/config/base.py | 5 +- src/onemod/main.py | 46 ++-------------- src/onemod/stage/base.py | 20 +++++-- src/onemod/utils/custom_classes.py | 84 ++++++++++++++++++++++++++++++ 4 files changed, 105 insertions(+), 50 deletions(-) create mode 100644 src/onemod/utils/custom_classes.py diff --git a/src/onemod/config/base.py b/src/onemod/config/base.py index 86db49e0..864f10b0 100644 --- a/src/onemod/config/base.py +++ b/src/onemod/config/base.py @@ -63,10 +63,7 @@ class StageConfig(Config): _pipeline_config: Config = Config() _required: list[str] = [] - def add_pipeline_config(self, pipeline_config: Config | dict) -> None: - if isinstance(pipeline_config, dict): - pipeline_config = type(self._pipeline_config)(**pipeline_config) - + def add_pipeline_config(self, pipeline_config: Config) -> None: missing = [] for item in self._required: if not self.stage_contains(item) and item not in pipeline_config: diff --git a/src/onemod/main.py b/src/onemod/main.py index 120fd15e..995edac1 100644 --- a/src/onemod/main.py +++ b/src/onemod/main.py @@ -1,8 +1,6 @@ """Methods to load and evaluate pipeline and stage objects.""" import json -from importlib.util import module_from_spec, spec_from_file_location -from inspect import getmodulename from pathlib import Path from typing import Any, Literal @@ -11,6 +9,7 @@ import onemod.stage as onemod_stages from onemod.pipeline import Pipeline from onemod.stage import Stage +from onemod.utils.custom_classes import get_custom_class def load_pipeline(config: Path | str) -> Pipeline: @@ -73,8 +72,8 @@ class for pipeline in config file. Default is None. Notes ----- - When a custom stage class has the same name as a built-in OneMod - stage class, this function returns the custom stage class. + When a custom class has the same name as a built-in OneMod class, + this function returns the custom class. """ with open(config, "r") as f: @@ -93,7 +92,7 @@ class for pipeline in config file. Default is None. model_type = config_dict["type"] if "module" in config_dict: - return _get_custom_class(model_type, config_dict["module"]) + return get_custom_class(model_type, config_dict["module"]) if model_type == "Pipeline": return Pipeline if hasattr(onemod_stages, model_type): @@ -103,43 +102,6 @@ class for pipeline in config file. Default is None. 
) -def _get_custom_class( - class_type: str, module: str -) -> type[Pipeline] | type[Stage]: - """Get custom pipeline or stage class from file. - - Parameters - ---------- - class_type : str - Name of custom class. - module : str - Path to Python module containing custom class definition. - - Returns - ------- - Stage - Custom pipeline or stage class. - - """ - module_path = Path(module) - - module_name = getmodulename(module_path) - if module_name is None: - raise ValueError(f"Could not determine module name from {module_path}") - - spec = spec_from_file_location(module_name, module_path) - if spec is None: - raise ImportError(f"Could not load spec for module {module_path}") - - if spec.loader is None: - raise ImportError(f"Module spec for {module_path} has no loader") - - loaded_module = module_from_spec(spec) - spec.loader.exec_module(loaded_module) - - return getattr(loaded_module, class_type) - - def evaluate( config: Path | str, method: Literal["run", "fit", "predict", "collect"] = "run", diff --git a/src/onemod/stage/base.py b/src/onemod/stage/base.py index 0adf4085..c1f18591 100644 --- a/src/onemod/stage/base.py +++ b/src/onemod/stage/base.py @@ -13,11 +13,11 @@ from pydantic import BaseModel, ConfigDict import onemod.stage as onemod_stages -from onemod.config import StageConfig -from onemod.dtypes import Data -from onemod.dtypes.unique_sequence import UniqueList +from onemod.config import Config, StageConfig +from onemod.dtypes import Data, UniqueList from onemod.fsutils import DataInterface from onemod.io import Input, Output +from onemod.utils.custom_classes import get_custom_config_class from onemod.utils.decorators import computed_property from onemod.validation import ValidationErrorCollector, handle_error @@ -371,7 +371,19 @@ def from_json(cls, config_path: Path | str, stage_name: str) -> Stage: ) stage = cls(config_path=config_path, **stage_config) - stage.config.add_pipeline_config(pipeline_config["config"]) + + if (pipeline_module := pipeline_config.get("module")) is None: + stage.config.add_pipeline_config( + Config(**pipeline_config["config"]) + ) + else: + config_class = get_custom_config_class( + pipeline_config["type"], pipeline_module + ) + stage.config.add_pipeline_config( + config_class(**pipeline_config["config"]) + ) + return stage def build( diff --git a/src/onemod/utils/custom_classes.py b/src/onemod/utils/custom_classes.py new file mode 100644 index 00000000..c4051029 --- /dev/null +++ b/src/onemod/utils/custom_classes.py @@ -0,0 +1,84 @@ +"""Helper functions for loading custom classes from modules.""" + +from importlib.util import module_from_spec, spec_from_file_location +from inspect import getmodulename +from pathlib import Path +from types import ModuleType + +from pydantic import BaseModel + + +def get_custom_class(class_name: str, module: str) -> type[BaseModel]: + """Get custom pipeline, stage, or config class from file. + + Parameters + ---------- + class_name : str + Name of custom class. + module : str + Path to Python module containing custom class definition. + + Returns + ------- + BaseModel + Custom pipeline, stage, or config class. + + """ + loaded_module = load_module(module) + return getattr(loaded_module, class_name) + + +def get_custom_config_class(class_name: str, module: str) -> type[BaseModel]: + """Get custom pipeline config class from file. + + Parameters + ---------- + class_name : str + Name of custom pipeline class. + module : str + Path to Python module containing custom pipeline class + definition. 
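A short, hypothetical sketch of the new helper in action; the module path, class name, and config path are placeholders.

from onemod.utils.custom_classes import get_custom_class

# Resolve a user-defined stage class from its module file, then rebuild the
# stage from a saved pipeline config -- the same steps _get_class and
# Stage.from_json now perform internally.
CustomStage = get_custom_class("MyStage", "/path/to/my_stage.py")
stage = CustomStage.from_json("results/pipeline_config.json", "my_stage")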
+ + Returns + ------- + BaseModel + Custom pipeline config class from file. + + """ + loaded_module = load_module(module) + pipeline_class = getattr(loaded_module, class_name) + config_class = pipeline_class.__pydantic_fields__["config"].annotation + return config_class + + +def load_module(module: str) -> ModuleType: + """Load Python module from file path. + + Parameters + ---------- + module : str + Path to Python module. + + Returns + ------- + ModuleType + Loaded Python module. + + """ + module_path = Path(module) + + module_name = getmodulename(module_path) + if module_name is None: + raise ValueError(f"Could not determine module name from {module_path}") + + spec = spec_from_file_location(module_name, module_path) + if spec is None: + raise ImportError(f"Could not load spec for module {module_path}") + + if spec.loader is None: + raise ImportError(f"Module spec for {module_path} has no loader") + + loaded_module = module_from_spec(spec) + spec.loader.exec_module(loaded_module) + + return loaded_module From 8e5f007f80ac46b8c381a8f6408b826cbe2b9c9c Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Tue, 8 Apr 2025 07:55:03 -0700 Subject: [PATCH 24/26] remove statement about pipeline config type --- docs/user_guide/advanced_usage.rst | 5 ----- 1 file changed, 5 deletions(-) diff --git a/docs/user_guide/advanced_usage.rst b/docs/user_guide/advanced_usage.rst index 2d0c363f..9e18eed9 100644 --- a/docs/user_guide/advanced_usage.rst +++ b/docs/user_guide/advanced_usage.rst @@ -154,8 +154,3 @@ automatically validate any user-supplied settings. * To enable the :py:attr:`~onemod.stage.base.Stage.crossby` attribute for a setting in a custom stage config, the setting's type hints must include a list, set, or tuple. For example, ``param: int | list[int]``. -* When defining a custom :py:attr:`~onemod.config.base.StageConfig` class, you - should specify the type of the private ``_pipeline_config`` attribute if it - corresponds to a custom :py:attr:`~onemod.config.base.Config` class, otherwise - custom fields in the pipeline config will not be validated when reloading the - stage from a JSON file. From f2e735ad698ce74646696cfd963a90d8c894eaa4 Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Tue, 8 Apr 2025 08:02:41 -0700 Subject: [PATCH 25/26] simplify function --- src/onemod/utils/custom_classes.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/onemod/utils/custom_classes.py b/src/onemod/utils/custom_classes.py index c4051029..a697c1aa 100644 --- a/src/onemod/utils/custom_classes.py +++ b/src/onemod/utils/custom_classes.py @@ -45,8 +45,7 @@ def get_custom_config_class(class_name: str, module: str) -> type[BaseModel]: Custom pipeline config class from file. 
""" - loaded_module = load_module(module) - pipeline_class = getattr(loaded_module, class_name) + pipeline_class = get_custom_class(class_name, module) config_class = pipeline_class.__pydantic_fields__["config"].annotation return config_class From 193bb6c29ef41fe162faffc49e6c8aea8eb65801 Mon Sep 17 00:00:00 2001 From: Kelsey Maass Date: Tue, 8 Apr 2025 08:32:01 -0700 Subject: [PATCH 26/26] adding changes from develop branch so I can delete it --- README.md | 2 +- docs/index.rst | 2 +- src/onemod/backend/jobmon_backend.py | 13 ++++++++----- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/README.md b/README.md index 48df3bf4..91a9687c 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ # OneMod **OneMod** is an orchestration package that allows users to build pipelines of -various models created by [IHME Math Sciences](https://github.com/ihmeuw-msca). +statistical models created by [IHME Math Sciences](https://github.com/ihmeuw-msca). Core features of **OneMod** include an intuitive syntax for defining the dataflow between pipeline stages, the ability to easily parallelize over different data subsets and/or parameter sets, and options for data validation. diff --git a/docs/index.rst b/docs/index.rst index c01c116c..cb6106cb 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -10,7 +10,7 @@ Welcome to OneMod! developer_guide/index **OneMod** is an orchestration package that allows users to build pipelines of -various models created by `IHME Math Sciences `_. +statistical models created by `IHME Math Sciences `_. Core features of **OneMod** include an intuitive syntax for defining the dataflow between pipeline stages, the ability to easily parallelize over different data subsets and/or parameter sets, and options for data validation. diff --git a/src/onemod/backend/jobmon_backend.py b/src/onemod/backend/jobmon_backend.py index 210d864c..0ccd5468 100644 --- a/src/onemod/backend/jobmon_backend.py +++ b/src/onemod/backend/jobmon_backend.py @@ -38,7 +38,6 @@ """ # TODO: Optional stage-specific Python environments -# TODO: User-defined max_attempts # TODO: Could dependencies be method specific? # TODO: should we check resources format, minimum resources, cluster? @@ -46,12 +45,16 @@ from pathlib import Path from typing import Any, Literal -from jobmon.client.api import Tool -from jobmon.client.task import Task -from jobmon.client.task_template import TaskTemplate -from jobmon.client.workflow import Workflow from pydantic import ConfigDict, validate_call +try: + from jobmon.client.api import Tool + from jobmon.client.task import Task + from jobmon.client.task_template import TaskTemplate + from jobmon.client.workflow import Workflow +except ImportError: + raise ImportError("Missing optional 'jobmon' dependency") + from onemod.backend.utils import ( check_input_exists, check_method,