
[Bug]: Inconsistent behaviour between client.get_hypervolume and client.get_pareto_optimal_parameters for ExternalGenerationNode #4831

@CompRhys

Description

What happened?

If you implement an ExternalGenerationNode that is not backed by a torch adapter and then call these two methods, they have different fallback behaviours: get_hypervolume with the default use_model_predictions=True fails, whilst get_pareto_optimal_parameters fits a new BOTORCH_MODULAR adapter on the fly in order to make the predictions.

Whilst this might not be a bug, it's surprising to me that get_pareto_optimal_parameters doesn't just fail in the same way rather than silently fitting a new adapter.
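
For what it's worth, a workaround sketch that sidesteps the failure is to compute the hypervolume from observed data, since only the model-prediction path needs a torch adapter (this is the same call exercised in the repro below, which succeeds):

# Workaround sketch: the observed-data path needs no torch adapter,
# so this succeeds where use_model_predictions=True fails.
hv = client.get_hypervolume(use_model_predictions=False)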

Please provide a minimal, reproducible example of the unexpected behavior.

# %%
"""
Test get_hypervolume and get_pareto_optimal_parameters with SEBOGenerationNode.

Uses a Sobol->SEBO generation strategy since SEBO needs initial data (Winsorize transform).
"""

import warnings

warnings.filterwarnings("ignore", category=UserWarning, module="linear_operator")
warnings.filterwarnings("ignore", message=".*not p.d.*")

from ax.adapter.registry import Generators
from ax.generation_strategy.generation_node import GenerationNode
from ax.generation_strategy.generator_spec import GeneratorSpec
from ax.generation_strategy.generation_strategy import GenerationStrategy
from ax.generation_strategy.transition_criterion import MinTrials
from ax.service.ax_client import AxClient, ObjectiveProperties

from lunio.generation.sebo import SEBOGenerationNode

SEED = 12345
N_SOBOL = 5
N_SEBO = 5

# %%
# Create Sobol -> SEBO generation strategy
print("=" * 70)
print("Testing get_hypervolume and get_pareto_optimal_parameters with SEBO")
print("=" * 70)

moo_target_point = {"x1": 0.0, "x2": 0.0, "x3": 0.0}

sobol_node = GenerationNode(
    name="Sobol",
    generator_specs=[GeneratorSpec(Generators.SOBOL)],
    transition_criteria=[MinTrials(threshold=N_SOBOL, transition_to="SEBO-MOO")],
)

moo_sebo_node = SEBOGenerationNode(
    target_point=moo_target_point,
    batch_size=1,
    penalty="L0_norm",
    sparsity_threshold=3,
    maximize=[True, True],
    name="SEBO-MOO",
)

moo_gs = GenerationStrategy(name="Sobol+SEBO", nodes=[sobol_node, moo_sebo_node])
client = AxClient(generation_strategy=moo_gs, random_seed=SEED)
client.create_experiment(
    name="sebo_moo_test",
    parameters=[
        {"name": "x1", "type": "range", "bounds": [0.0, 1.0]},
        {"name": "x2", "type": "range", "bounds": [0.0, 1.0]},
        {"name": "x3", "type": "range", "bounds": [0.0, 1.0]},
    ],
    objectives={
        "obj1": ObjectiveProperties(minimize=False, threshold=0.0),
        "obj2": ObjectiveProperties(minimize=False, threshold=0.0),
    },
    overwrite_existing_experiment=True,
)

# %%
# Run Sobol + SEBO trials
print(f"\nRunning {N_SOBOL} Sobol + {N_SEBO} SEBO trials...")
for i in range(N_SOBOL + N_SEBO):
    params, trial_idx = client.get_next_trial()
    obj1 = params["x1"] + 0.5 * params["x2"]
    obj2 = params["x2"] + 0.5 * params["x3"]
    client.complete_trial(trial_index=trial_idx, raw_data={"obj1": obj1, "obj2": obj2})
    phase = "Sobol" if i < N_SOBOL else "SEBO"
    print(f"  Trial {trial_idx} ({phase}) complete")

# %%
# Test get_hypervolume
print("\n" + "-" * 40)
for use_model_predictions in [False, True]:
    print("\n" + "-" * 40)
    print(f"Testing get_hypervolume(use_model_predictions={use_model_predictions})...")
    try:
        hv = client.get_hypervolume(use_model_predictions=use_model_predictions)
        print(f"  ✓ succeeded: {hv:.4f}")
    except Exception as e:
        print(f"  ✗ failed: {e}")

# %%
# Test get_pareto_optimal_parameters
for use_model_predictions in [False, True]:
    print("\n" + "-" * 40)
    print(f"Testing get_pareto_optimal_parameters(use_model_predictions={use_model_predictions})...")
    try:
        pareto = client.get_pareto_optimal_parameters(use_model_predictions=use_model_predictions)
        print(f"  ✓ succeeded: {len(pareto)} Pareto-optimal points")
        for trial_idx, (params, _) in pareto.items():
            print(f"    Trial {trial_idx}: x1={params['x1']:.3f}, x2={params['x2']:.3f}, x3={params['x3']:.3f}")
    except Exception as e:
        print(f"  ✗ failed: {e}")

# %%

Where the lunio module contains

"""Sparsity Exploring Bayesian Optimization (SEBO) generation node for use in Ax."""

# pyright: basic

from typing import Any, Literal, override

import torch
from ax.adapter.data_utils import DataLoaderConfig, extract_experiment_data
from ax.adapter.transforms.remove_fixed import RemoveFixed
from ax.adapter.transforms.standardize_y import StandardizeY
from ax.adapter.transforms.unit_x import UnitX
from ax.adapter.transforms.winsorize import Winsorize
from ax.core.data import Data
from ax.core.experiment import Experiment
from ax.core.observation import ObservationFeatures
from ax.core.parameter import DerivedParameter, RangeParameter
from ax.core.types import TParameterization
from ax.generation_strategy.external_generation_node import ExternalGenerationNode
from ax.utils.common.logger import get_logger
from botorch.acquisition.multi_objective.logei import (
    qLogNoisyExpectedHypervolumeImprovement,
)
from botorch.acquisition.penalized import L0Approximation
from botorch.fit import fit_gpytorch_mll
from botorch.models import SingleTaskGP
from botorch.models.deterministic import GenericDeterministicModel
from botorch.models.model import ModelList
from botorch.optim import (
    Homotopy,
    HomotopyParameter,
    LogLinearHomotopySchedule,
    gen_batch_initial_conditions,
    optimize_acqf_homotopy,
)
from gpytorch.mlls import ExactMarginalLogLikelihood

logger = get_logger(__name__)

CLAMP_TOL = 1e-2


def L1_norm_func(X: torch.Tensor, init_point: torch.Tensor) -> torch.Tensor:
    """L1 norm from init_point. Takes `batch_shape x n x d`-dim input tensor `X`
    to a `batch_shape x n x 1`-dimensional L1 norm tensor.
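
    Example (illustrative):
        >>> X = torch.tensor([[1.0, -2.0, 3.0]])
        >>> L1_norm_func(X, torch.zeros(3))
        tensor([[6.]])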
    """
    return torch.linalg.norm((X - init_point), ord=1, dim=-1, keepdim=True)


def clamp_to_target(X: torch.Tensor, target_point: torch.Tensor, clamp_tol: float) -> torch.Tensor:
    """Clamp generated candidates within the given ranges to the target point.

    Args:
        X: A `batch_shape x n x d`-dim input tensor.
        target_point: A tensor of size `d` corresponding to the target point.
        clamp_tol: The clamping tolerance. Any value within `clamp_tol` of the
            `target_point` will be clamped to the `target_point`.
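
    Example (illustrative):
        >>> X = torch.tensor([[0.005, 0.5]])
        >>> clamp_to_target(X, torch.tensor([0.0, 0.0]), clamp_tol=1e-2)
        tensor([[0.0000, 0.5000]])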
    """
    clamp_mask = (X - target_point).abs() <= clamp_tol
    X[clamp_mask] = target_point.clone().repeat(*X.shape[:-1], 1)[clamp_mask]
    return X


def get_batch_initial_conditions_sebo(
    acq_function: qLogNoisyExpectedHypervolumeImprovement,
    raw_samples: int,
    X_pareto: torch.Tensor,
    target_point: torch.Tensor,
    bounds: torch.Tensor,
    num_restarts: int = 20,
    inequality_constraints: list[tuple[torch.Tensor, torch.Tensor, float]] | None = None,
    fixed_features: dict[int, float] | None = None,
) -> torch.Tensor:
    """Generate starting points for the SEBO acquisition function optimization."""
    tkwargs: dict[str, Any] = {"device": X_pareto.device, "dtype": X_pareto.dtype}
    num_rand = num_restarts if len(X_pareto) == 0 else num_restarts // 2
    num_local = num_restarts - num_rand

    # (1) Random points (Sobol if no constraints, otherwise uses hit-and-run)
    X_cand_rand = gen_batch_initial_conditions(
        acq_function=acq_function,
        bounds=bounds,
        q=1,
        raw_samples=raw_samples,
        num_restarts=num_rand,
        options={"topn": True},
        fixed_features=fixed_features,
        inequality_constraints=inequality_constraints,
    ).to(**tkwargs)

    if num_local == 0:
        return X_cand_rand

    # (2) Perturbations of points on the Pareto frontier
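    # Only perturb dimensions that are not already at the target point, so the
    # sparsity pattern of the Pareto points is preserved in the local candidates.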
    X_cand_local = X_pareto.clone()[torch.randint(high=len(X_pareto), size=(raw_samples,))]
    mask = X_cand_local != target_point
    X_cand_local[mask] += 0.2 * ((bounds[1] - bounds[0]) * torch.randn_like(X_cand_local))[mask]
    X_cand_local = torch.clamp(X_cand_local.unsqueeze(1), min=bounds[0], max=bounds[1])
    X_cand_local = X_cand_local[acq_function(X_cand_local).topk(num_local).indices]
    return torch.cat((X_cand_rand, X_cand_local), dim=0)


class SEBOGenerationNode(ExternalGenerationNode):
    """A generation node that uses SEBO to generate sparse candidate designs.

    SEBO (Sparsity Exploring Bayesian Optimization) is a method to simultaneously
    optimize one or more objectives while encouraging sparsity. It uses a
    multi-objective approach where an additional objective is a penalty term
    (L0 or L1 norm) measuring distance from a target point.

    Supports both single-objective and multi-objective optimization. For N
    objectives, the method optimizes N+1 objectives (the N user objectives +
    the sparsity penalty) using qLogNoisyExpectedHypervolumeImprovement.

    This implementation is validated against the SparseBO reference implementation:
    - Paper: S. Liu, Q. Feng, D. Eriksson, B. Letham and E. Bakshy.
      "Sparse Bayesian Optimization." International Conference on Artificial
      Intelligence and Statistics, 2023.
    - Repository: https://github.com/facebookresearch/SparseBO

    For validation benchmarks, see:
    - validate_sebo_benchmarks.py
    - sebo_sparsity_demo.py
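
    As an illustration: with two objectives ``obj1`` and ``obj2``, the
    acquisition maximizes hypervolume over the three outcomes
    ``(obj1, obj2, -penalty(x))``, where ``penalty(x)`` measures distance
    from ``target_point``.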
    """

    def __init__(
        self,
        target_point: dict[str, float],
        batch_size: int,
        *,
        penalty: Literal["L0_norm"] = "L0_norm",
        sparsity_threshold: int | None = None,
        device: torch.device | None = None,
        dtype: torch.dtype | None = None,
        model_options: dict[str, Any] | None = None,
        optimizer_options: dict[str, Any] | None = None,
        name: str = "SEBOGenerationNode",
        maximize: bool | list[bool] = True,
    ) -> None:
        """Initialize the SEBO generation node.

        Args:
            target_point: Dictionary mapping parameter names to their target values.
                SEBO will encourage sparsity relative to this point.
            batch_size: The batch size for generating new candidates.
            penalty: Type of penalty to use (only "L0_norm" supported).
            sparsity_threshold: Reference value for the sparsity objective, i.e.
                the maximum number of non-target dimensions considered when
                computing hypervolume. If None, defaults to the number of
                tunable dimensions.
            device: The device to use for the generation node.
            dtype: The dtype to use for the generation node.
            model_options: Options to pass to the GP model.
            optimizer_options: Options for the acquisition function optimizer.
            name: The name of the generation node.
            maximize: Whether to maximize objectives. Can be a single bool (applied
                to all objectives) or a list of bools (one per objective).
        """
        super().__init__(name=name)

        self.target_point_dict = target_point
        self.batch_size = batch_size
        self.penalty = penalty
        self.sparsity_threshold = sparsity_threshold
        self.device = device if device is not None else torch.device("cpu")
        self.dtype = dtype if dtype is not None else torch.double
        self.model_options = model_options or {}
        self.optimizer_options = optimizer_options or {}
        self.maximize = maximize

        # State variables
        self.X_data: torch.Tensor | None = None  # Normalized [0,1]^d (via UnitX)
        self.Y_data: torch.Tensor | None = None  # Transformed (Winsorize + StandardizeY)
        self.tunable_parameters: list[RangeParameter] | None = None
        self.target_point: torch.Tensor | None = None  # Original space (tunable + derived)
        self._tunable_target_normalized: torch.Tensor | None = None  # [0,1]^d for clamping
        self._bounds: torch.Tensor | None = None  # (d, 2) lower/upper
        self._derived_weights: torch.Tensor | None = None  # (n_derived, n_tunable)
        self._derived_intercepts: torch.Tensor | None = None  # (n_derived,)
        self._L0_approx: L0Approximation | None = None
        self.metric_names: list[str] | None = None
        # Transforms for untransforming candidates
        self._remove_fixed_transform: RemoveFixed | None = None
        self._unit_x_transform: UnitX | None = None
        self._y_means: dict[str, float] = {}
        self._y_stds: dict[str, float] = {}

    def update_generator_state(self, experiment: Experiment, data: Data) -> None:
        """Update the state of the generator with the experiment and data.

        Uses Ax's extract_experiment_data and transform infrastructure to apply
        Winsorize and StandardizeY transforms, matching the standard Ax pipeline.

        Args:
            experiment: The experiment object.
            data: The data object.
        """
        search_space = experiment.search_space
        metric_names = list(experiment.optimization_config.metrics.keys())  # pyright: ignore[reportOptionalMemberAccess]

        # Use Ax's extract_experiment_data to get data in Ax's format
        experiment_data = extract_experiment_data(
            experiment=experiment,
            data_loader_config=DataLoaderConfig(),
            data=data,
        )

        # RemoveFixed: remove derived and fixed parameters (must be first)
        remove_fixed_transform = RemoveFixed(
            search_space=search_space,
            experiment_data=experiment_data,
        )
        search_space = remove_fixed_transform.transform_search_space(search_space)
        experiment_data = remove_fixed_transform.transform_experiment_data(experiment_data)
        self._remove_fixed_transform = remove_fixed_transform

        # After RemoveFixed, search_space only has tunable (Range) parameters
        parameter_names = list(search_space.parameters.keys())

        current_params = list(search_space.parameters.values())
        non_range = [p.name for p in current_params if not isinstance(p, RangeParameter)]
        if non_range:
            raise NotImplementedError(f"SEBOGenerationNode only supports RangeParameters, got: {non_range}")

        if self.tunable_parameters is None:
            self.tunable_parameters = current_params  # pyright: ignore[reportAttributeAccessIssue]
        elif [p.name for p in self.tunable_parameters] != [p.name for p in current_params]:
            raise RuntimeError("Search space parameters changed between calls.")

        if self.metric_names is None:
            self.metric_names = metric_names
        elif self.metric_names != metric_names:
            raise RuntimeError("Metric names changed between calls.")

        if self.sparsity_threshold is None:
            self.sparsity_threshold = len(parameter_names)

        # UnitX: normalize X to [0,1]^d
        unit_x_transform = UnitX(
            search_space=search_space,
            experiment_data=experiment_data,
        )
        experiment_data = unit_x_transform.transform_experiment_data(experiment_data)
        self._unit_x_transform = unit_x_transform

        if self.target_point is None:
            self._build_target_and_derived(parameter_names, remove_fixed_transform)

        # Winsorize: clips outliers using Tukey method
        winsorize_transform = Winsorize(
            search_space=search_space,
            experiment_data=experiment_data,
        )
        experiment_data = winsorize_transform.transform_experiment_data(experiment_data)

        # StandardizeY: standardize to mean=0, std=1
        standardize_transform = StandardizeY(
            search_space=search_space,
            experiment_data=experiment_data,
        )
        experiment_data = standardize_transform.transform_experiment_data(experiment_data)

        # Store transform parameters for untransforming predictions later
        self._y_means = standardize_transform.Ymean  # pyright: ignore[reportAttributeAccessIssue]
        self._y_stds = standardize_transform.Ystd  # pyright: ignore[reportAttributeAccessIssue]

        # Convert transformed ExperimentData to tensors using DataFrame operations
        arm_data = experiment_data.arm_data
        obs_data = experiment_data.observation_data

        # X from arm_data: select parameter columns in order (now in [0,1]^d)
        X = torch.tensor(
            arm_data[parameter_names].values,
            dtype=self.dtype,
            device=self.device,
        )

        # Y from observation_data: select mean columns for each metric in order
        Y = torch.tensor(
            obs_data["mean"][metric_names].values,
            dtype=self.dtype,
            device=self.device,
        )

        # Filter out NaN rows
        valid_mask = ~torch.isnan(Y).any(dim=1) & ~torch.isnan(X).any(dim=1)
        self.X_data = X[valid_mask]  # Normalized [0,1]^d space
        self.Y_data = Y[valid_mask]  # Transformed space (winsorized + standardized)

        # Log best observed (in transformed space)
        best_strs = []
        maximize_list = self._get_maximize_list()
        for m_idx, metric_name in enumerate(metric_names):
            col = self.Y_data[:, m_idx]
            best_val = col.max().item() if maximize_list[m_idx] else col.min().item()
            best_strs.append(f"{metric_name}={best_val:.4f}")
        logger.info(
            f"Updated {self.name} state: {len(self.X_data)} valid observations; "
            f"best observed (transformed): {', '.join(best_strs)}."
        )

    def _get_maximize_list(self) -> list[bool]:
        """Get maximize as a list (one bool per objective)."""
        if self.metric_names is None:
            raise RuntimeError("Generator state not initialized.")
        if isinstance(self.maximize, bool):
            return [self.maximize] * len(self.metric_names)
        return self.maximize

    def _build_target_and_derived(self, parameter_names: list[str], remove_fixed_transform: RemoveFixed) -> None:
        """Build target point tensor and derived param transformation matrices."""
        self._bounds = torch.tensor(
            [[p.lower, p.upper] for p in self.tunable_parameters],  # pyright: ignore[reportOptionalIterable]
            dtype=self.dtype,
            device=self.device,
        )

        # Range targets in original and normalized space
        tunable_targets = torch.tensor([self.target_point_dict.get(p, 0.0) for p in parameter_names], dtype=self.dtype, device=self.device)
        self._tunable_target_normalized = (tunable_targets - self._bounds[:, 0]) / (self._bounds[:, 1] - self._bounds[:, 0])

        # Derived param weights/intercepts
        derived_params = [(name, p) for name, p in remove_fixed_transform.nontunable_parameters.items() if isinstance(p, DerivedParameter)]
        if derived_params:
            param_idx = {p: i for i, p in enumerate(parameter_names)}
            self._derived_weights = torch.zeros(len(derived_params), len(parameter_names), dtype=self.dtype, device=self.device)
            self._derived_intercepts = torch.zeros(len(derived_params), dtype=self.dtype, device=self.device)
            for i, (_, dp) in enumerate(derived_params):
                self._derived_intercepts[i] = dp._intercept
                for pname, w in dp._parameter_names_to_weights.items():
                    if pname in param_idx:
                        self._derived_weights[i, param_idx[pname]] = w

            derived_targets = torch.tensor(
                [self.target_point_dict.get(name, 0.0) for name, _ in derived_params], dtype=self.dtype, device=self.device
            )
            self.target_point = torch.cat([tunable_targets, derived_targets])
        else:
            self.target_point = tunable_targets

    def _construct_penalty_model(self) -> GenericDeterministicModel:
        """Construct L0 penalty model in original parameter space.

        X comes in [0,1]^d normalized. We unnormalize, compute derived params,
        and return negative L0 distance from target (including derived params).
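
        The penalty is negated so that it is maximized alongside the objectives,
        letting the hypervolume acquisition trade off objective value against
        sparsity.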
        """
        if self.target_point is None or self._bounds is None:
            raise RuntimeError("Target point not initialized.")

        bounds = self._bounds
        target = self.target_point
        derived_weights = self._derived_weights
        derived_intercepts = self._derived_intercepts

        self._L0_approx = L0Approximation(target_point=target)

        def neg_L0(X: torch.Tensor) -> torch.Tensor:
            # Unnormalize: X_orig = X * (upper - lower) + lower
            X_orig = X * (bounds[:, 1] - bounds[:, 0]) + bounds[:, 0]

            # Extend with derived params if present
            if derived_weights is not None:
                derived = torch.einsum("...d,nd->...n", X_orig, derived_weights) + derived_intercepts  # pyright: ignore[reportOperatorIssue]
                X_ext = torch.cat([X_orig, derived], dim=-1)
            else:
                X_ext = X_orig

            return -self._L0_approx(X_ext)  # pyright: ignore[reportOptionalCall]

        return GenericDeterministicModel(f=neg_L0)

    def _fit_gp(self, X: torch.Tensor, Y: torch.Tensor) -> SingleTaskGP:
        """Fit a GP on pre-transformed data.

        All transforms are applied at the Ax adapter level:
        - X is already normalized to [0,1]^d by UnitX
        - Y is already winsorized and standardized by Winsorize + StandardizeY

        Args:
            X: Training inputs in [0,1]^d, shape (n, d).
            Y: Training targets (winsorized + standardized), shape (n, 1).

        Returns:
            Fitted SingleTaskGP with no transforms.
        """
        model = SingleTaskGP(
            X,
            Y,
            input_transform=None,  # All transformations are applied at the Ax adapter level
            outcome_transform=None,  # All transformations are applied at the Ax adapter level
        )
        mll = ExactMarginalLogLikelihood(model.likelihood, model)
        fit_gpytorch_mll(mll)
        return model

    @override
    def get_next_candidate(self, pending_parameters: list[TParameterization]) -> TParameterization:
        """Get the parameters for the next candidate configuration to evaluate.

        Args:
            pending_parameters: A list of parameters of the candidates pending
                evaluation.

        Returns:
            A dictionary mapping parameter names to parameter values for the next
            candidate suggested by the method.
        """
        if self.X_data is None or self.Y_data is None or self.tunable_parameters is None:
            raise RuntimeError("Generator state not initialized. Call update_generator_state first.")
        if self.target_point is None or self._bounds is None:
            raise RuntimeError("Target point not initialized.")
        if self._tunable_target_normalized is None:
            raise RuntimeError("Range target normalized not initialized.")
        if self.metric_names is None:
            raise RuntimeError("Metric names not initialized.")

        X_train = self.X_data
        Y_train = self.Y_data
        n_objectives = Y_train.shape[1]
        maximize_list = self._get_maximize_list()

        # Clamp training data in normalized space
        X_train_clamped = clamp_to_target(X=X_train.clone(), target_point=self._tunable_target_normalized, clamp_tol=CLAMP_TOL)  # pyright: ignore[reportArgumentType]

        tkwargs = {"dtype": self.dtype, "device": self.device}

        # Fit a GP for each objective (no transforms needed - data already transformed)
        objective_models = []
        for obj_idx in range(n_objectives):
            Y_obj = Y_train[:, obj_idx : obj_idx + 1]

            # Negate if minimizing so GP always maximizes
            if not maximize_list[obj_idx]:
                Y_obj = -Y_obj

            gp = self._fit_gp(X_train_clamped, Y_obj)
            objective_models.append(gp)

        # Construct the deterministic penalty model (operates in [0,1]^d)
        penalty_model = self._construct_penalty_model()

        # Create ModelList with all objective models + penalty model
        model = ModelList(*objective_models, penalty_model)

        # Build reference point in transformed Y space
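        # Each objective's reference value is the worst observed value pushed out
        # by 0.1 standard deviations, so every observed point dominates it; the
        # sparsity objective is bounded below by -sparsity_threshold.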
        ref_point_values = []
        for obj_idx in range(n_objectives):
            Y_obj = Y_train[:, obj_idx]
            if not maximize_list[obj_idx]:
                Y_obj = -Y_obj
            y_std = Y_obj.std().item() if len(Y_obj) > 1 else 1.0
            ref_point_values.append(Y_obj.min().item() - 0.1 * max(y_std, 1e-6))
        assert self.sparsity_threshold is not None
        ref_point_values.append(-self.sparsity_threshold)

        ref_point = torch.tensor(ref_point_values, **tkwargs)

        # Set a=1e-6 for L0 approximation to get close to true L0 norm
        if self.penalty == "L0_norm":
            self._L0_approx.a.fill_(1e-6)  # pyright: ignore[reportCallIssue, reportOptionalMemberAccess]

        # Acquisition function and optimization in [0,1]^d normalized space
        acqf = qLogNoisyExpectedHypervolumeImprovement(
            model=model,
            ref_point=ref_point,
            X_baseline=X_train_clamped,
            prune_baseline=True,
            cache_root=False,
        )

        # Optimize in [0,1]^d normalized space
        opt_bounds = torch.stack(
            [
                torch.zeros(len(self.tunable_parameters), **tkwargs),
                torch.ones(len(self.tunable_parameters), **tkwargs),
            ]
        )

        num_restarts = self.optimizer_options.get("num_restarts", 20)
        raw_samples = self.optimizer_options.get("raw_samples", 1024)

        # L0 homotopy optimization
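        # The homotopy anneals the L0 approximation parameter `a` from 0.2 down to
        # 1e-3 on a log-linear schedule, so the smooth surrogate tightens towards
        # the true L0 norm over the course of the optimization.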
        num_homotopy_steps = self.optimizer_options.get("num_homotopy_steps", 30)
        homotopy_schedule = LogLinearHomotopySchedule(start=0.2, end=1e-3, num_steps=num_homotopy_steps)
        homotopy = Homotopy(
            homotopy_parameters=[
                HomotopyParameter(
                    parameter=self._L0_approx.a,  # pyright: ignore[reportArgumentType, reportOptionalMemberAccess]
                    schedule=homotopy_schedule,
                )
            ],
        )

        batch_initial_conditions = get_batch_initial_conditions_sebo(
            acq_function=acqf,
            raw_samples=raw_samples,
            X_pareto=X_train_clamped,
            target_point=self._tunable_target_normalized,  # pyright: ignore[reportArgumentType]
            bounds=opt_bounds,
            num_restarts=num_restarts,
        )

        candidates, _ = optimize_acqf_homotopy(
            q=self.batch_size,
            acq_function=acqf,
            bounds=opt_bounds,
            homotopy=homotopy,
            num_restarts=num_restarts,
            raw_samples=raw_samples,
            batch_initial_conditions=batch_initial_conditions,
        )

        # Clamp in normalized space, then unnormalize
        candidates = clamp_to_target(
            X=candidates,
            target_point=self._tunable_target_normalized,
            clamp_tol=CLAMP_TOL,
        )  # pyright: ignore[reportArgumentType]

        candidates_orig = candidates * (self._bounds[:, 1] - self._bounds[:, 0]) + self._bounds[:, 0]  # pyright: ignore[reportOptionalSubscript]

        # Build parameterization and add derived params via RemoveFixed
        param_names = [p.name for p in self.tunable_parameters]
        candidate_obsf = ObservationFeatures(parameters={name: candidates_orig[0, i].item() for i, name in enumerate(param_names)})
        [candidate_obsf] = self._remove_fixed_transform.untransform_observation_features([candidate_obsf])  # pyright: ignore[reportOptionalMemberAccess]
        parameterization = dict(candidate_obsf.parameters)

        # Count sparse (in normalized space)
        sparse_count = sum(
            1 for i in range(len(param_names)) if abs(candidates[0, i].item() - self._tunable_target_normalized[i].item()) <= CLAMP_TOL
        )  # pyright: ignore[reportOptionalSubscript]

        # Format parameters for logging
        params_str = ", ".join(f"'{k}': {v:.6f}" for k, v in parameterization.items())
        logger.info(
            f"Generated candidate with parameters {{{params_str}}} using {self.name} "
            f"({self.penalty}, {n_objectives} objectives, sparse={sparse_count}/{len(param_names)})."
        )

        return parameterization

Please paste any relevant traceback/logs produced by the example provided.

Ax Version

1.2.1

Python Version

3.12

Operating System

MacOS

(Optional) Describe any potential fixes you've considered to the issue outlined above.

No response

Pull Request

None

Code of Conduct

  • I agree to follow Ax's Code of Conduct
