Skip to content
332 changes: 248 additions & 84 deletions openml/runs/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from collections import OrderedDict
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, cast

import numpy as np
import pandas as pd
Expand Down Expand Up @@ -54,6 +54,225 @@
ERROR_CODE = 512


def _validate_flow_and_task_inputs(
flow: OpenMLFlow | OpenMLTask,
task: OpenMLTask | OpenMLFlow,
flow_tags: list[str] | None,
) -> tuple[OpenMLFlow, OpenMLTask]:
"""Validate and normalize inputs for flow and task execution.

Parameters
----------
flow : OpenMLFlow or OpenMLTask
The flow object (may be swapped with task for backward compatibility).
task : OpenMLTask or OpenMLFlow
The task object (may be swapped with flow for backward compatibility).
flow_tags : List[str] or None
A list of tags that the flow should have at creation.

Returns
-------
Tuple[OpenMLFlow, OpenMLTask]
The validated flow and task.

Raises
------
ValueError
If flow_tags is not a list or task is not published.
"""
if flow_tags is not None and not isinstance(flow_tags, list):
raise ValueError("flow_tags should be a list")

# TODO: At some point in the future do not allow for arguments in old order (changed 6-2018).
# Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019).
if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow):
# We want to allow either order of argument (to avoid confusion).
warnings.warn(
"The old argument order (Flow, model) is deprecated and "
"will not be supported in the future. Please use the "
"order (model, Flow).",
DeprecationWarning,
stacklevel=3,
)
task, flow = flow, task

if not isinstance(flow, OpenMLFlow):
raise TypeError("Flow must be OpenMLFlow after validation")

if not isinstance(task, OpenMLTask):
raise TypeError("Task must be OpenMLTask after validation")

if task.task_id is None:
raise ValueError("The task should be published at OpenML")

return flow, task


def _sync_flow_with_server(
    flow: "OpenMLFlow",
    task: "OpenMLTask",
    *,
    upload_flow: bool,
    avoid_duplicate_runs: bool,
) -> int | None:
    """Sync the flow with the server and guard against duplicate runs.

    Parameters
    ----------
    flow : OpenMLFlow
        The flow to synchronize with the server.
    task : OpenMLTask
        The task against which duplicate runs are checked.
    upload_flow : bool
        If True, publish the flow when it is not on the server yet.
    avoid_duplicate_runs : bool
        If True, raise when this setup/task combination already has runs.

    Returns
    -------
    int or None
        The server-side flow id when a sync happened, otherwise None.

    Raises
    ------
    PyOpenMLError
        When the local and server flow ids disagree.
    OpenMLRunsExistError
        When ``avoid_duplicate_runs`` is set and matching runs exist.
    """
    if not (upload_flow or avoid_duplicate_runs):
        # No reason to contact the server now; syncing can wait until upload.
        return None

    flow_id = flow_exists(flow.name, flow.external_version)

    # A locally-set flow_id must agree with whatever the server reports.
    if isinstance(flow.flow_id, int) and flow_id != flow.flow_id:
        if flow_id is False:
            raise PyOpenMLError(
                "Flow does not exist on the server, but 'flow.flow_id' is not None."
            )
        raise PyOpenMLError(
            f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'",
        )

    if upload_flow and flow_id is False:
        # Unknown to the server and we are allowed to publish it.
        flow.publish()
        return flow.flow_id

    if not flow_id:
        # Flow does not exist on server and we do not want to upload it.
        # No sync with the server happens.
        return None

    # The flow exists server-side: pull its server fields into our copy.
    flow_from_server = get_flow(flow_id)
    _copy_server_fields(flow_from_server, flow)
    if avoid_duplicate_runs:
        flow_from_server.model = flow.model
        setup_id = setup_exists(flow_from_server)
        ids = run_exists(cast("int", task.task_id), setup_id)
        if ids:
            error_message = (
                "One or more runs of this setup were already performed on the task."
            )
            raise OpenMLRunsExistError(ids, error_message)

    return flow_id


def _prepare_run_environment(flow: OpenMLFlow) -> tuple[list[str], list[str]]:
"""Prepare run environment information and tags.

Parameters
----------
flow : OpenMLFlow
The flow to get version information from.

Returns
-------
Tuple[List[str], List[str]]
A tuple of (tags, run_environment).
"""
run_environment = flow.extension.get_version_information()
tags = ["openml-python", run_environment[1]]
return tags, run_environment


def _create_run_from_results(  # noqa: PLR0913
    task: "OpenMLTask",
    flow: "OpenMLFlow",
    flow_id: int | None,
    data_content: list[list],
    trace: "OpenMLRunTrace | None",
    fold_evaluations: "OrderedDict[str, OrderedDict]",
    sample_evaluations: "OrderedDict[str, OrderedDict]",
    tags: list[str],
    run_environment: list[str],
    upload_flow: bool,
    avoid_duplicate_runs: bool,
) -> "OpenMLRun":
    """Assemble an :class:`OpenMLRun` from the results of executing a flow.

    Parameters
    ----------
    task : OpenMLTask
        The task that was executed.
    flow : OpenMLFlow
        The flow that was executed.
    flow_id : int or None
        Server-side flow id, if a sync with the server happened.
    data_content : list of list
        Prediction rows produced by the run.
    trace : OpenMLRunTrace or None
        Optimization trace, when the model produced one.
    fold_evaluations : OrderedDict
        Per-fold evaluation measures.
    sample_evaluations : OrderedDict
        Per-sample evaluation measures (learning-curve tasks).
    tags : list of str
        Tags to attach to the run.
    run_environment : list of str
        Version information used in the generated description.
    upload_flow : bool
        Whether the flow was (to be) uploaded.
    avoid_duplicate_runs : bool
        Whether a duplicate-run check was performed.

    Returns
    -------
    OpenMLRun
        The assembled run object.
    """
    dataset = task.get_dataset()
    description_lines = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"]

    run = OpenMLRun(
        task_id=cast("int", task.task_id),
        flow_id=flow_id,
        dataset_id=dataset.dataset_id,
        model=flow.model,
        flow_name=flow.name,
        tags=tags,
        trace=trace,
        data_content=data_content,
        flow=flow,
        setup_string=flow.extension.create_setup_string(flow.model),
        description_text="\n".join(description_lines),
    )

    # Parameter settings can only be extracted after a server sync (flow was
    # uploaded, or found during the duplicate check); otherwise this is
    # deferred until upload time.
    synced = (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None
    if synced:
        run.parameter_settings = flow.extension.obtain_parameter_values(flow)

    # Attach the detailed evaluations at the granularity the task type uses.
    if task.task_type_id == TaskType.LEARNING_CURVE:
        run.sample_evaluations = sample_evaluations
    else:
        run.fold_evaluations = fold_evaluations

    return run


# TODO(eddiebergman): Could potentially overload this but
# it seems very big to do so
def run_model_on_task( # noqa: PLR0913
Expand Down Expand Up @@ -175,7 +394,7 @@ def get_task_and_type_conversion(_task: int | str | OpenMLTask) -> OpenMLTask:
return run


def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913
def run_flow_on_task( # noqa: PLR0913
flow: OpenMLFlow,
task: OpenMLTask,
avoid_duplicate_runs: bool | None = None,
Expand Down Expand Up @@ -222,116 +441,61 @@ def run_flow_on_task( # noqa: C901, PLR0912, PLR0915, PLR0913
run : OpenMLRun
Result of the run.
"""
if flow_tags is not None and not isinstance(flow_tags, list):
raise ValueError("flow_tags should be a list")

if avoid_duplicate_runs is None:
avoid_duplicate_runs = openml.config.avoid_duplicate_runs

# TODO: At some point in the future do not allow for arguments in old order (changed 6-2018).
# Flexibility currently still allowed due to code-snippet in OpenML100 paper (3-2019).
if isinstance(flow, OpenMLTask) and isinstance(task, OpenMLFlow):
# We want to allow either order of argument (to avoid confusion).
warnings.warn(
"The old argument order (Flow, model) is deprecated and "
"will not be supported in the future. Please use the "
"order (model, Flow).",
DeprecationWarning,
stacklevel=2,
)
task, flow = flow, task

if task.task_id is None:
raise ValueError("The task should be published at OpenML")
# 1. Validate inputs
flow, task = _validate_flow_and_task_inputs(flow, task, flow_tags)

# 2. Prepare the model
if flow.model is None:
flow.model = flow.extension.flow_to_model(flow)

flow.model = flow.extension.seed_model(flow.model, seed=seed)

# We only need to sync with the server right now if we want to upload the flow,
# or ensure no duplicate runs exist. Otherwise it can be synced at upload time.
flow_id = None
if upload_flow or avoid_duplicate_runs:
flow_id = flow_exists(flow.name, flow.external_version)
if isinstance(flow.flow_id, int) and flow_id != flow.flow_id:
if flow_id is not False:
raise PyOpenMLError(
f"Local flow_id does not match server flow_id: '{flow.flow_id}' vs '{flow_id}'",
)
raise PyOpenMLError(
"Flow does not exist on the server, but 'flow.flow_id' is not None."
)
if upload_flow and flow_id is False:
flow.publish()
flow_id = flow.flow_id
elif flow_id:
flow_from_server = get_flow(flow_id)
_copy_server_fields(flow_from_server, flow)
if avoid_duplicate_runs:
flow_from_server.model = flow.model
setup_id = setup_exists(flow_from_server)
ids = run_exists(task.task_id, setup_id)
if ids:
error_message = (
"One or more runs of this setup were already performed on the task."
)
raise OpenMLRunsExistError(ids, error_message)
else:
# Flow does not exist on server and we do not want to upload it.
# No sync with the server happens.
flow_id = None

dataset = task.get_dataset()
# 3. Sync with server and check for duplicates
flow_id = _sync_flow_with_server(
flow,
task,
upload_flow=upload_flow,
avoid_duplicate_runs=avoid_duplicate_runs,
)

run_environment = flow.extension.get_version_information()
tags = ["openml-python", run_environment[1]]
# 4. Prepare run environment
tags, run_environment = _prepare_run_environment(flow)

# 5. Check if model is already fitted
if flow.extension.check_if_model_fitted(flow.model):
warnings.warn(
"The model is already fitted! This might cause inconsistency in comparison of results.",
RuntimeWarning,
stacklevel=2,
)

# execute the run
res = _run_task_get_arffcontent(
# 6. Execute the run (parallel processing happens here)
data_content, trace, fold_evaluations, sample_evaluations = _run_task_get_arffcontent(
model=flow.model,
task=task,
extension=flow.extension,
add_local_measures=add_local_measures,
n_jobs=n_jobs,
)

data_content, trace, fold_evaluations, sample_evaluations = res
fields = [*run_environment, time.strftime("%c"), "Created by run_flow_on_task"]
generated_description = "\n".join(fields)
run = OpenMLRun(
task_id=task.task_id,
# 7. Create run from results
run = _create_run_from_results(
task=task,
flow=flow,
flow_id=flow_id,
dataset_id=dataset.dataset_id,
model=flow.model,
flow_name=flow.name,
tags=tags,
trace=trace,
data_content=data_content,
flow=flow,
setup_string=flow.extension.create_setup_string(flow.model),
description_text=generated_description,
trace=trace,
fold_evaluations=fold_evaluations,
sample_evaluations=sample_evaluations,
tags=tags,
run_environment=run_environment,
upload_flow=upload_flow,
avoid_duplicate_runs=avoid_duplicate_runs,
)

if (upload_flow or avoid_duplicate_runs) and flow.flow_id is not None:
# We only extract the parameter settings if a sync happened with the server.
# I.e. when the flow was uploaded or we found it in the avoid_duplicate check.
# Otherwise, we will do this at upload time.
run.parameter_settings = flow.extension.obtain_parameter_values(flow)

# now we need to attach the detailed evaluations
if task.task_type_id == TaskType.LEARNING_CURVE:
run.sample_evaluations = sample_evaluations
else:
run.fold_evaluations = fold_evaluations

# 8. Log completion message
if flow_id:
message = f"Executed Task {task.task_id} with Flow id:{run.flow_id}"
else:
Expand Down