diff --git a/ax/benchmark/benchmark.py b/ax/benchmark/benchmark.py
index a7b93ee82a7..564643f0f3f 100644
--- a/ax/benchmark/benchmark.py
+++ b/ax/benchmark/benchmark.py
@@ -54,9 +54,13 @@
 from ax.generation_strategy.generation_strategy import GenerationStrategy
 from ax.orchestration.orchestrator import Orchestrator
 from ax.service.utils.best_point import (
+    _aggregate_and_cumulate_trace,
+    _compute_trace_values,
+    _pivot_data_with_feasibility,
     _prepare_data_for_trace,
     derelativize_opt_config,
     get_trace,
+    is_row_feasible,
 )
 from ax.service.utils.best_point_mixin import BestPointMixin
 from ax.service.utils.orchestrator_options import OrchestratorOptions, TrialType
@@ -791,64 +795,80 @@ def get_opt_trace_by_steps(experiment: Experiment) -> npt.NDArray:
     that is in terms of steps, with one element added each time a step
     completes.
 
+    Supports single-objective, multi-objective, and constrained problems.
+    For multi-objective problems, the trace is in terms of hypervolume.
+
     Args:
         experiment: An experiment produced by `benchmark_replication`; it must
             have `BenchmarkTrialMetadata` (as produced by `BenchmarkRunner`)
             for each trial, and its data must have a "step" column.
     """
     optimization_config = none_throws(experiment.optimization_config)
+    full_df = experiment.lookup_data().full_df
 
-    if optimization_config.is_moo_problem:
-        raise NotImplementedError(
-            "Cumulative epochs only supported for single objective problems."
-        )
-    if len(optimization_config.outcome_constraints) > 0:
-        raise NotImplementedError(
-            "Cumulative epochs not supported for problems with outcome constraints."
-        )
+    full_df["row_feasible"] = is_row_feasible(
+        df=full_df,
+        optimization_config=optimization_config,
+        # For the sake of this function, we only care about feasible trials. The
+        # distinction between infeasible and undetermined is not important.
+        undetermined_value=False,
+    )
 
-    objective_name = optimization_config.objective.metric.name
-    data = experiment.lookup_data()
-    full_df = data.full_df
+    # Pivot to wide format with feasibility
+    df_wide = _pivot_data_with_feasibility(
+        df=full_df,
+        index=["trial_index", "arm_name", MAP_KEY],
+        optimization_config=optimization_config,
+    )
 
-    # Has timestamps; needs to be merged with full_df because it contains
-    # data on epochs that didn't actually run due to early stopping, and we need
-    # to know which actually ran
-    def _get_df(trial: Trial) -> pd.DataFrame:
+    def _get_timestamps(experiment: Experiment) -> pd.Series:
         """
-        Get the (virtual) time each epoch finished at.
+        Get the (virtual) time at which each training progression finished.
""" - metadata = trial.run_metadata["benchmark_metadata"] - backend_simulator = none_throws(metadata.backend_simulator) - # Data for the first metric, which is the only metric - df = next(iter(metadata.dfs.values())) - start_time = backend_simulator.get_sim_trial_by_index( - trial.index - ).sim_start_time - df["time"] = df["virtual runtime"] + start_time - return df - - with_timestamps = pd.concat( - ( - _get_df(trial=assert_is_instance(trial, Trial)) - for trial in experiment.trials.values() - ), - axis=0, - ignore_index=True, - )[["trial_index", MAP_KEY, "time"]] - - df = ( - full_df.loc[ - full_df["metric_name"] == objective_name, - ["trial_index", "arm_name", "mean", MAP_KEY], - ] - .merge(with_timestamps, how="left") - .sort_values("time", ignore_index=True) + frames = [] + for trial in experiment.trials.values(): + trial = assert_is_instance(trial, Trial) + metadata = trial.run_metadata["benchmark_metadata"] + backend_simulator = none_throws(metadata.backend_simulator) + sim_trial = backend_simulator.get_sim_trial_by_index( + trial_index=trial.index + ) + start_time = sim_trial.sim_start_time + # timestamps are identical across all metrics, so just use the first one + frame = next(iter(metadata.dfs.values())).copy() + frame["time"] = frame["virtual runtime"] + start_time + frames.append(frame) + df = pd.concat(frames, axis=0, ignore_index=True).set_index( + ["trial_index", "arm_name", MAP_KEY] + ) + return df["time"] + + # Compute timestamps and join with df_wide *before* cumulative computations. + # This is critical because cumulative HV/objective calculations depend on + # the temporal ordering of observations. + timestamps = _get_timestamps(experiment=experiment) + + # Merge timestamps and sort by time before cumulative computations + df_wide = df_wide.join( + timestamps, on=["trial_index", "arm_name", MAP_KEY], how="left" + ).sort_values(by="time", ascending=True, ignore_index=True) + + # Compute per-evaluation (trial_index, MAP_KEY) cumulative values, + # with keep_order=True to preserve ordering by timestamp + df_wide["value"], maximize = _compute_trace_values( + df_wide=df_wide, + optimization_config=optimization_config, + use_cumulative_best=True, ) - return ( - df["mean"].cummin() - if optimization_config.objective.minimize - else df["mean"].cummax() + # Get a value for each (trial_index, arm_name, MAP_KEY) tuple + value_by_arm_pull = df_wide[["trial_index", "arm_name", MAP_KEY, "value"]] + + # Aggregate by trial and step, then compute cumulative best + return _aggregate_and_cumulate_trace( + df=value_by_arm_pull, + by=["trial_index", MAP_KEY], + maximize=maximize, + keep_order=True, ).to_numpy() @@ -867,14 +887,15 @@ def get_benchmark_result_with_cumulative_steps( opt_trace = get_opt_trace_by_steps(experiment=experiment) return replace( result, - optimization_trace=opt_trace, - cost_trace=np.arange(1, len(opt_trace) + 1, dtype=int), + optimization_trace=opt_trace.tolist(), + cost_trace=np.arange(1, len(opt_trace) + 1, dtype=int).tolist(), # Empty - oracle_trace=np.full(len(opt_trace), np.nan), - inference_trace=np.full(len(opt_trace), np.nan), + oracle_trace=np.full_like(opt_trace, np.nan).tolist(), + inference_trace=np.full_like(opt_trace, np.nan).tolist(), + is_feasible_trace=None, score_trace=compute_score_trace( optimization_trace=opt_trace, baseline_value=baseline_value, optimal_value=optimal_value, - ), + ).tolist(), ) diff --git a/ax/benchmark/testing/benchmark_stubs.py b/ax/benchmark/testing/benchmark_stubs.py index 2a9e2f35006..f21213b9b9f 100644 --- 
diff --git a/ax/benchmark/testing/benchmark_stubs.py b/ax/benchmark/testing/benchmark_stubs.py
index 2a9e2f35006..f21213b9b9f 100644
--- a/ax/benchmark/testing/benchmark_stubs.py
+++ b/ax/benchmark/testing/benchmark_stubs.py
@@ -312,15 +312,52 @@ def get_async_benchmark_problem(
     n_steps: int = 1,
     lower_is_better: bool = False,
     report_inference_value_as_trace: bool = False,
+    num_objectives: int = 1,
+    num_constraints: int = 0,
 ) -> BenchmarkProblem:
+    """
+    Create an early-stopping benchmark problem with MAP_KEY data.
+
+    Args:
+        map_data: Whether to use map metrics (required for early stopping).
+        step_runtime_fn: Optional runtime function for steps.
+        n_steps: Number of steps per trial.
+        lower_is_better: Whether lower values are better (for SOO).
+        report_inference_value_as_trace: Whether to report inference trace.
+        num_objectives: Number of objectives (1 for SOO, >1 for MOO).
+        num_constraints: Number of outcome constraints to add.
+
+    Returns:
+        A BenchmarkProblem suitable for early-stopping evaluation.
+    """
     search_space = get_discrete_search_space()
-    test_function = IdentityTestFunction(n_steps=n_steps)
-    optimization_config = get_soo_opt_config(
-        outcome_names=["objective"],
-        use_map_metric=map_data,
-        observe_noise_sd=True,
-        lower_is_better=lower_is_better,
-    )
+
+    # Create outcome names for objectives and constraints
+    objective_names = [f"objective_{i}" for i in range(num_objectives)]
+    constraint_names = [f"constraint_{i}" for i in range(num_constraints)]
+    outcome_names = [*objective_names, *constraint_names]
+
+    test_function = IdentityTestFunction(n_steps=n_steps, outcome_names=outcome_names)
+
+    if num_objectives == 1:
+        # Single-objective: first outcome is objective, rest are constraints
+        optimization_config = get_soo_opt_config(
+            outcome_names=outcome_names,
+            lower_is_better=lower_is_better,
+            observe_noise_sd=True,
+            use_map_metric=map_data,
+        )
+    else:
+        # Multi-objective: pass all outcomes (objectives + constraints)
+        # get_moo_opt_config will use the last num_constraints as constraints
+        optimization_config = get_moo_opt_config(
+            outcome_names=outcome_names,
+            ref_point=[1.0] * num_objectives,
+            num_constraints=num_constraints,
+            lower_is_better=lower_is_better,
+            observe_noise_sd=True,
+            use_map_metric=map_data,
+        )
 
     return BenchmarkProblem(
         name="test",
@@ -330,6 +367,7 @@ def get_async_benchmark_problem(
         num_trials=4,
         baseline_value=19 if lower_is_better else 0,
         optimal_value=0 if lower_is_better else 19,
+        worst_feasible_value=5.0 if num_constraints > 0 else None,
         step_runtime_function=step_runtime_fn,
         report_inference_value_as_trace=report_inference_value_as_trace,
     )
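# Hypothetical usage sketch of the extended stub defined above; argument names follow
# the new signature (num_objectives / num_constraints) and mirror how the tests below
# construct multi-objective and constrained early-stopping problems.
from ax.benchmark.testing.benchmark_stubs import get_async_benchmark_problem

moo_problem = get_async_benchmark_problem(
    map_data=True,
    n_steps=5,
    num_objectives=2,  # MOO: the trace is reported as hypervolume
)
constrained_problem = get_async_benchmark_problem(
    map_data=True,
    n_steps=5,
    num_constraints=1,  # adds "constraint_0" as an outcome constraint
)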
diff --git a/ax/benchmark/tests/test_benchmark.py b/ax/benchmark/tests/test_benchmark.py
index c58b1441062..211dd5e7a6f 100644
--- a/ax/benchmark/tests/test_benchmark.py
+++ b/ax/benchmark/tests/test_benchmark.py
@@ -1195,28 +1195,85 @@ def test_get_opt_trace_by_cumulative_epochs(self) -> None:
         new_opt_trace = get_opt_trace_by_steps(experiment=experiment)
         self.assertEqual(list(new_opt_trace), [0.0, 0.0, 1.0, 1.0, 2.0, 3.0])
 
-        method = get_sobol_benchmark_method()
-        with self.subTest("MOO"):
-            problem = get_multi_objective_benchmark_problem()
-
+        with self.subTest("Multi-objective"):
+            # Multi-objective problem with step data
+            problem = get_async_benchmark_problem(
+                map_data=True,
+                n_steps=5,
+                num_objectives=2,
+                # Ensure we don't have two finishing at the same time, for
+                # determinism
+                step_runtime_fn=lambda params: params["x0"] * (1 - 0.01 * params["x0"]),
+            )
             experiment = self.run_optimization_with_orchestrator(
                 problem=problem, method=method, seed=0
             )
-            with self.assertRaisesRegex(
-                NotImplementedError, "only supported for single objective"
-            ):
-                get_opt_trace_by_steps(experiment=experiment)
+            new_opt_trace = get_opt_trace_by_steps(experiment=experiment)
+            self.assertListEqual(
+                new_opt_trace.tolist(),
+                [
+                    0.0,
+                    0.0,
+                    0.0,
+                    0.0,
+                    0.0,
+                    0.0,
+                    0.0,
+                    1.0,
+                    1.0,
+                    1.0,
+                    1.0,
+                    1.0,
+                    1.0,
+                    4.0,
+                    4.0,
+                    4.0,
+                    4.0,
+                    4.0,
+                    4.0,
+                    4.0,
+                ],
+            )
 
         with self.subTest("Constrained"):
-            problem = get_benchmark_problem("constrained_gramacy_observed_noise")
+            # Constrained problem with step data.
+            problem = get_async_benchmark_problem(
+                map_data=True,
+                n_steps=5,
+                num_constraints=1,
+                # Ensure we don't have two finishing at the same time, for
+                # determinism
+                step_runtime_fn=lambda params: params["x0"] * (1 - 0.01 * params["x0"]),
+            )
             experiment = self.run_optimization_with_orchestrator(
                 problem=problem, method=method, seed=0
             )
-            with self.assertRaisesRegex(
-                NotImplementedError,
-                "not supported for problems with outcome constraints",
-            ):
-                get_opt_trace_by_steps(experiment=experiment)
+            new_opt_trace = get_opt_trace_by_steps(experiment=experiment)
+            self.assertListEqual(
+                new_opt_trace.tolist(),
+                [
+                    0.0,
+                    0.0,
+                    0.0,
+                    0.0,
+                    0.0,
+                    1.0,
+                    1.0,
+                    2.0,
+                    2.0,
+                    2.0,
+                    2.0,
+                    2.0,
+                    2.0,
+                    3.0,
+                    3.0,
+                    3.0,
+                    3.0,
+                    3.0,
+                    3.0,
+                    3.0,
+                ],
+            )
 
     def test_get_benchmark_result_with_cumulative_steps(self) -> None:
         """See test_get_opt_trace_by_cumulative_epochs for more info."""
diff --git a/ax/service/utils/best_point.py b/ax/service/utils/best_point.py
index 1670c83d783..319335773dc 100644
--- a/ax/service/utils/best_point.py
+++ b/ax/service/utils/best_point.py
@@ -898,6 +898,140 @@ def _compute_utility_from_preference_model(
     return utilities
 
 
+def _compute_trace_values(
+    df_wide: pd.DataFrame,
+    optimization_config: OptimizationConfig,
+    use_cumulative_best: bool = True,
+) -> tuple[pd.Series, bool]:
+    """
+    Compute per-observation trace values (hypervolume for MOO, objective for SOO).
+
+    This function contains the core logic for computing trace values that is shared
+    between `get_trace_by_arm_pull_from_data` and `get_opt_trace_by_steps`.
+
+    Args:
+        df_wide: DataFrame with metric columns and "feasible" column,
+            already sorted in desired cumulative order (e.g., by trial_index
+            or by timestamp).
+        optimization_config: The optimization config. Must not be in relative form.
+        use_cumulative_best: If True, apply cumulative best at observation level
+            (for SOO only; MOO always uses cumulative HV when True).
+
+    Returns:
+        A tuple of (values Series, maximize flag).
+        The maximize flag indicates whether higher values are better.
+    """
+    objective = optimization_config.objective
+    maximize = True
+    # MOO and *not* ScalarizedObjective
+    if isinstance(objective, MultiObjective):
+        optimization_config = assert_is_instance(
+            optimization_config, MultiObjectiveOptimizationConfig
+        )
+        values = pd.Series(
+            get_hypervolume_trace_of_outcomes_multi_objective(
+                df_wide=df_wide,
+                optimization_config=optimization_config,
+                use_cumulative_hv=use_cumulative_best,
+            )
+        )
+    else:
+        maximize = not objective.minimize
+        values = pd.Series(
+            get_values_of_outcomes_single_or_scalarized_objective(
+                df_wide=df_wide, objective=objective
+            )
+        )
+        if df_wide["feasible"].any() and use_cumulative_best:
+            values = values.cummax() if maximize else values.cummin()
+    return values, maximize
+
+
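# Toy sketch (not the Ax API) of the single-objective branch of _compute_trace_values:
# the running best is only taken when at least one observation is feasible; otherwise
# the raw per-observation values are returned unchanged.
import pandas as pd

values = pd.Series([3.0, 1.0, 2.0])
feasible = pd.Series([False, True, True])
minimize = True
if feasible.any():
    values = values.cummin() if minimize else values.cummax()
print(values.tolist())  # [3.0, 1.0, 1.0]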
+def _pivot_data_with_feasibility(
+    df: pd.DataFrame,
+    index: list[str],
+    optimization_config: OptimizationConfig,
+) -> pd.DataFrame:
+    """
+    Pivot data to wide format with feasibility information.
+
+    Core logic shared between `_prepare_data_for_trace` and `get_opt_trace_by_steps`:
+    adds feasibility column, pivots to wide format by metrics, validates all
+    metrics are present, and aggregates feasibility by the specified index.
+
+    Args:
+        df: Data in the format returned by ``Data.df``, with a separate row for
+            each observation-metric combination. Must have "row_feasible" column.
+        index: Column names to use as index for pivoting (e.g., ["trial_index",
+            "arm_name"] or ["trial_index", "arm_name", "step"]).
+        optimization_config: The optimization config. Must not be in relative form.
+
+    Returns:
+        DataFrame with columns from `index` + metric names + "feasible",
+        where "feasible" indicates whether the observation satisfies all constraints.
+    """
+    # Get the metrics we need
+    metric_names = list(optimization_config.metrics.keys())
+    mask = df["metric_name"].isin(metric_names)
+
+    # Transform to wide format with metric columns
+    df_wide = df[mask].pivot(index=index, columns="metric_name", values="mean")
+
+    # Validate all metrics are present:
+    # reindex fills missing columns with NaN, so this catches both
+    # columns absent from df_wide and those containing NaNs
+    incomplete_metrics: pd.Series = df_wide.reindex(columns=metric_names).isna().any()
+
+    if df_wide.empty or incomplete_metrics.any():
+        # If df_wide is empty, all metrics are missing
+        missing_metrics = (
+            metric_names
+            if df_wide.empty
+            else incomplete_metrics.index[incomplete_metrics].tolist()
+        )
+        raise ValueError(
+            "Some metrics are not present for all trials and arms. The "
+            f"following are missing: {missing_metrics}."
+        )
+
+    # Aggregate feasibility by index
+    df_wide["feasible"] = df.groupby(by=index)["row_feasible"].all()
+    df_wide.reset_index(inplace=True)
+
+    return df_wide
+
+
+def _aggregate_and_cumulate_trace(
+    df: pd.DataFrame,
+    by: list[str],
+    value_name: str = "value",
+    maximize: bool = True,
+    keep_order: bool = True,
+) -> pd.Series:
+    """
+    Aggregate values by groups and compute cumulative best.
+
+    This helper encapsulates the common pattern of grouping observations,
+    aggregating to get the best value per group, and then computing the
+    cumulative best across groups.
+
+    Args:
+        df: DataFrame with values to aggregate.
+        by: Columns to group by (e.g., ["trial_index"] or
+            ["trial_index", "step"]).
+        value_name: Column name containing values to aggregate.
+        maximize: Whether to maximize (True) or minimize (False).
+        keep_order: If True, do not sort group keys; groups will appear
+            in the same order as they did in the original DataFrame.
+
+    Returns:
+        Series with cumulative best values.
+    """
+    grouped = df.groupby(by=by, sort=not keep_order)[value_name]
+    aggregated = grouped.max() if maximize else grouped.min()
+    return aggregated.cummax() if maximize else aggregated.cummin()
+
+
 def _prepare_data_for_trace(
     df: pd.DataFrame,
     optimization_config: OptimizationConfig,
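# Toy sketch (hypothetical data) of the aggregate-then-cumulate pattern implemented by
# _aggregate_and_cumulate_trace: best value per group, then running best across groups;
# sort=False corresponds to keep_order=True (groups keep their original order).
import pandas as pd

df = pd.DataFrame({"trial_index": [0, 0, 1, 1, 2], "value": [1.0, 3.0, 2.0, 5.0, 4.0]})
per_trial_best = df.groupby("trial_index", sort=False)["value"].max()
print(per_trial_best.cummax().tolist())  # [3.0, 5.0, 5.0]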
@@ -928,29 +1062,11 @@
         # distinction between infeasible and undetermined is not important.
         undetermined_value=False,
     )
-
-    # Get the metrics we need
-    metrics = list(optimization_config.metrics.keys())
-
-    # Transform to a DataFrame with columns ["trial_index", "arm_name"] +
-    # relevant metric names, and values being means.
-    df_wide = (
-        df[df["metric_name"].isin(metrics)]
-        .set_index(["trial_index", "arm_name", "metric_name"])["mean"]
-        .unstack(level="metric_name")
+    return _pivot_data_with_feasibility(
+        df=df,
+        index=["trial_index", "arm_name"],
+        optimization_config=optimization_config,
     )
-    missing_metrics = [
-        m for m in metrics if m not in df_wide.columns or df_wide[m].isnull().any()
-    ]
-    if len(missing_metrics) > 0:
-        raise ValueError(
-            "Some metrics are not present for all trials and arms. The "
-            f"following are missing: {missing_metrics}."
-        )
-    df_wide["feasible"] = df.groupby(["trial_index", "arm_name"])["row_feasible"].all()
-    df_wide.reset_index(inplace=True)
-
-    return df_wide
 
 
 def get_trace_by_arm_pull_from_data(
@@ -966,9 +1082,8 @@
     function returns a single value for each arm pull, even if there are multiple
     arms per trial or if an arm is repeated in multiple trials.
 
-    For BOPE experiments, this function computes
-    utility predictions using the learned preference model from the PE_EXPERIMENT
-    auxiliary experiment.
+    For BOPE experiments, this function computes utility predictions using the
+    learned preference model from the PE_EXPERIMENT auxiliary experiment.
 
     Args:
         df: Data in the format returned by ``Data.df``, with a separate row for
@@ -991,11 +1106,11 @@
             "`Derelativize` the optimization config, or use `get_trace`."
         )
     empty_result = pd.DataFrame(columns=["trial_index", "arm_name", "value"])
-    if len(df) == 0:
+    if df.empty:
         return empty_result
 
     df_wide = _prepare_data_for_trace(df=df, optimization_config=optimization_config)
-    if len(df_wide) == 0:
+    if df_wide.empty:
         return empty_result
 
     # Handle preference learning experiments
@@ -1013,25 +1128,13 @@
         )
         return df_wide[["trial_index", "arm_name", "value"]]
 
-    # MOO and *not* ScalarizedObjective
-    if isinstance(optimization_config.objective, MultiObjective):
-        optimization_config = assert_is_instance(
-            optimization_config, MultiObjectiveOptimizationConfig
-        )
-        df_wide["value"] = get_hypervolume_trace_of_outcomes_multi_objective(
-            df_wide=df_wide,
-            optimization_config=optimization_config,
-            use_cumulative_hv=use_cumulative_best,
-        )
-        return df_wide[["trial_index", "arm_name", "value"]]
-    df_wide["value"] = get_values_of_outcomes_single_or_scalarized_objective(
-        df_wide=df_wide, objective=optimization_config.objective
+    # Compute per-evaluation (trial_index) cumulative values
+    df_wide["value"], _ = _compute_trace_values(
+        df_wide=df_wide,
+        optimization_config=optimization_config,
+        use_cumulative_best=use_cumulative_best,
     )
-    if df_wide["feasible"].any() and use_cumulative_best:
-        min_or_max = (
-            np.minimum if optimization_config.objective.minimize else np.maximum
-        )
-        df_wide["value"] = min_or_max.accumulate(df_wide["value"])
+
     return df_wide[["trial_index", "arm_name", "value"]]
 
 
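# Toy sketch (hypothetical metric names) of the wide pivot plus missing-metric check
# that _prepare_data_for_trace now delegates to _pivot_data_with_feasibility: reindex()
# fills absent metric columns with NaN, so one isna().any() flags both missing columns
# and partially-missing values.
import pandas as pd

long_df = pd.DataFrame(
    {
        "trial_index": [0, 0, 1],
        "arm_name": ["0_0", "0_0", "1_0"],
        "metric_name": ["objective_0", "constraint_0", "objective_0"],
        "mean": [1.0, 0.5, 2.0],
    }
)
wide = long_df.pivot(index=["trial_index", "arm_name"], columns="metric_name", values="mean")
incomplete = wide.reindex(columns=["objective_0", "constraint_0"]).isna().any()
print(incomplete.index[incomplete].tolist())  # ['constraint_0'] is missing for trial 1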
@@ -1040,7 +1143,8 @@ def get_trace(
     optimization_config: OptimizationConfig | None = None,
     include_status_quo: bool = False,
 ) -> list[float]:
-    """Compute the optimization trace at each iteration.
+    """
+    Compute the optimization trace at each iteration.
 
     Given an experiment and an optimization config, compute the performance at each
     iteration. For multi-objective, the performance is computed as
@@ -1107,22 +1211,19 @@
         experiment=experiment,
     )
 
-    # Get a value for each trial_index + arm
+    # Get a value for each (trial_index, arm_name) tuple
     value_by_arm_pull = get_trace_by_arm_pull_from_data(
         df=df,
         optimization_config=optimization_config,
         use_cumulative_best=True,
         experiment=experiment,
     )
-    # Aggregate to trial level
+    # Aggregate by trial, then compute cumulative best
     objective = optimization_config.objective
     maximize = isinstance(objective, MultiObjective) or not objective.minimize
-    trial_grouped = value_by_arm_pull.groupby("trial_index")["value"]
-    if maximize:
-        value_by_trial = trial_grouped.max()
-        cumulative_value = np.maximum.accumulate(value_by_trial)
-    else:
-        value_by_trial = trial_grouped.min()
-        cumulative_value = np.minimum.accumulate(value_by_trial)
-
-    return cumulative_value.tolist()
+    return _aggregate_and_cumulate_trace(
+        df=value_by_arm_pull,
+        by=["trial_index"],
+        maximize=maximize,
+        keep_order=False,  # sort by trial index
+    ).tolist()
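# Sanity-check sketch (toy data, not the Ax API): for a maximization problem, the new
# groupby-based _aggregate_and_cumulate_trace path in get_trace is equivalent to the
# removed np.maximum.accumulate formulation.
import numpy as np
import pandas as pd

value_by_arm_pull = pd.DataFrame({"trial_index": [0, 0, 1, 2], "value": [0.2, 0.7, 0.5, 0.9]})
by_trial = value_by_arm_pull.groupby("trial_index")["value"].max()
assert by_trial.cummax().tolist() == list(np.maximum.accumulate(by_trial)) == [0.7, 0.7, 0.9]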