diff --git a/CHANGELOG.md b/CHANGELOG.md index 18d6d69fc..ee13c8cfc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ This project adheres to [Semantic Versioning], with the exception that minor rel ### Changed +- ✨ Improve RL reward design by adding intermediate rewards ([#526]) ([**@Shaobo-Zhou]) +- 🔧 Changed test circuit level for RL predictor from ALG to INDEP ([#449]) ([**@Shaobo-Zhou]) - ✨ Remove support for custom names of trained models ([#489]) ([**@bachase**]) - đŸ”Ĩ Drop support for x86 macOS systems ([#421]) ([**@denialhaag**]) @@ -44,6 +46,8 @@ _📚 Refer to the [GitHub Release Notes](https://github.com/munich-quantum-tool +[#526]: https://github.com/munich-quantum-toolkit/predictor/pull/526 +[#449]: https://github.com/munich-quantum-toolkit/predictor/pull/449 [#489]: https://github.com/munich-quantum-toolkit/predictor/pull/489 [#421]: https://github.com/munich-quantum-toolkit/predictor/pull/421 [#406]: https://github.com/munich-quantum-toolkit/predictor/pull/406 @@ -61,6 +65,7 @@ _📚 Refer to the [GitHub Release Notes](https://github.com/munich-quantum-tool [**@flowerthrower**]: https://github.com/flowerthrower [**@denialhaag**]: https://github.com/denialhaag [**@bachase**]: https://github.com/bachase +[**@Shaobo-Zhou**]: https://github.com/Shaobo-Zhou diff --git a/docs/setup.md b/docs/setup.md index f0e83cd18..6d1218318 100644 --- a/docs/setup.md +++ b/docs/setup.md @@ -109,7 +109,7 @@ After setup, any quantum circuit can be compiled for the most suitable device wi from mqt.predictor import qcompile from mqt.bench import get_benchmark, BenchmarkLevel -uncompiled_qc = get_benchmark("ghz", level=BenchmarkLevel.ALG, circuit_size=5) +uncompiled_qc = get_benchmark("ghz", level=BenchmarkLevel.INDEP, circuit_size=5) compiled_qc, compilation_info, selected_device = qcompile( uncompiled_qc, figure_of_merit="expected_fidelity" ) diff --git a/pyproject.toml b/pyproject.toml index 9a099946a..372a098c6 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -94,6 +94,9 @@ Issues = "https://github.com/munich-quantum-toolkit/predictor/issues" Discussions = "https://github.com/munich-quantum-toolkit/predictor/discussions" Research = "https://www.cda.cit.tum.de/research/quantum/" +[tool.check-sdist] +sdist-only = ["src/mqt/predictor/_version.py"] + [tool.hatch.build.targets.wheel] packages = ["src/mqt"] @@ -124,6 +127,8 @@ filterwarnings = [ 'ignore:.*qiskit.providers.models is deprecated since Qiskit 1.2*:DeprecationWarning:', 'ignore:.*The class ``qiskit.qobj.*`` is deprecated as of Qiskit 1.3.*:DeprecationWarning:', 'ignore:.*The property ``qiskit.circuit.instruction.Instruction.*`` is deprecated as of qiskit 1.3.0.*:DeprecationWarning:', + "ignore:.*No canonical cost table defined for device.*:UserWarning:", + ] diff --git a/src/mqt/predictor/ml/predictor.py b/src/mqt/predictor/ml/predictor.py index 3f0ec5497..c1d6f1b15 100644 --- a/src/mqt/predictor/ml/predictor.py +++ b/src/mqt/predictor/ml/predictor.py @@ -61,6 +61,8 @@ logger = logging.getLogger("mqt-predictor") +NO_PARALLEL = sys.platform == "win32" and sys.version_info >= (3, 13) + def setup_device_predictor( devices: list[Target], @@ -227,7 +229,10 @@ def compile_training_circuits( with zipfile.ZipFile(str(path_zip), "r") as zip_ref: zip_ref.extractall(path_uncompiled_circuits) - Parallel(n_jobs=num_workers, verbose=100)( + # On Windows + Python 3.13, joblib's default "loky" process backend is broken + # (missing `_posixsubprocess`). Fall back to no multiprocessing. + num_jobs = 1 if NO_PARALLEL else num_workers + Parallel(n_jobs=num_jobs, verbose=100)( delayed(self._compile_all_circuits_devicewise)( device, timeout, path_uncompiled_circuits, path_compiled_circuits, logger.level ) @@ -267,7 +272,8 @@ def generate_training_data( names_list = [] scores_list = [] - results = Parallel(n_jobs=num_workers, verbose=100)( + num_jobs = 1 if NO_PARALLEL else num_workers + results = Parallel(n_jobs=num_jobs, verbose=100)( delayed(self._generate_training_sample)( filename.name, path_uncompiled_circuits, @@ -276,6 +282,7 @@ def generate_training_data( ) for filename in path_uncompiled_circuits.glob("*.qasm") ) + for sample in results: training_sample, circuit_name, scores = sample if all(score == -1 for score in scores): @@ -405,8 +412,10 @@ def train_random_forest_model( if not training_data: training_data = self._get_prepared_training_data() num_cv = min(len(training_data.y_train), 5) - mdl = GridSearchCV(mdl, tree_param, cv=num_cv, n_jobs=8).fit(training_data.X_train, training_data.y_train) - + num_jobs = 1 if NO_PARALLEL else 8 + mdl = GridSearchCV(mdl, tree_param, cv=num_cv, n_jobs=num_jobs).fit( + training_data.X_train, training_data.y_train + ) joblib_dump(mdl, save_mdl_path) logger.info("Random Forest model is trained and saved.") diff --git a/src/mqt/predictor/reward.py b/src/mqt/predictor/reward.py index 86632873d..62bc222c3 100644 --- a/src/mqt/predictor/reward.py +++ b/src/mqt/predictor/reward.py @@ -203,12 +203,15 @@ def estimated_success_probability(qc: QuantumCircuit, device: Target, precision: if first_qubit_idx not in active_qubits: continue + dt = device.dt or 1.0 # discrete time unit; fallback to 1.0 if unavailable res *= np.exp( -instruction.duration + * dt / min(device.qubit_properties[first_qubit_idx].t1, device.qubit_properties[first_qubit_idx].t2) ) continue res *= 1 - device[gate_type][first_qubit_idx,].error + else: second_qubit_idx = calc_qubit_index(qargs, qc.qregs, 1) res *= 1 - device[gate_type][first_qubit_idx, second_qubit_idx].error diff --git a/src/mqt/predictor/rl/cost_model.py b/src/mqt/predictor/rl/cost_model.py new file mode 100644 index 000000000..4b9c8b1c4 --- /dev/null +++ b/src/mqt/predictor/rl/cost_model.py @@ -0,0 +1,308 @@ +# Copyright (c) 2023 - 2025 Chair for Design Automation, TUM +# Copyright (c) 2025 Munich Quantum Software Company GmbH +# All rights reserved. +# +# SPDX-License-Identifier: MIT +# +# Licensed under the MIT License + +"""Helper functions for approximating transformations to device-native gates. + +This module provides a simple canonical gate cost model and approximate +fidelity/ESP estimates based on averaged 1q/2q error rates. The current +implementation is tailored to IBM-style backends and ships a hand-crafted +cost table for IBM "torino". + +Support for additional devices is not automatic: for each new backend, +a corresponding canonical cost table (and, if needed, device-specific +approximations) must be added manually. +""" + +from __future__ import annotations + +import logging +import warnings +from collections.abc import Mapping +from typing import TYPE_CHECKING + +import numpy as np + +if TYPE_CHECKING: + from qiskit import QuantumCircuit + +logger = logging.getLogger(__name__) + +CanonicalCostTable = Mapping[str, tuple[int, int]] + +# --------------------------------------------------------------------------- +# Canonical cost tables +# --------------------------------------------------------------------------- + +TORINO_CANONICAL_COSTS: CanonicalCostTable = { + # native 1q + "rz": (1, 0), + "rx": (1, 0), + "sx": (1, 0), + "x": (1, 0), + "id": (0, 0), # treat as no-op for fidelity; timing can be handled elsewhere + # native 2q + "cz": (0, 1), + "rzz": (0, 1), + # ------------------------------------------------------------------ + # Common 1q non-natives decomposed into {rz, rx, sx, x} + # ------------------------------------------------------------------ + "u": (3, 0), # generic U(θ, Ά, Îģ) ~ 3 Euler angles + "u3": (3, 0), + "u2": (2, 0), + "h": (3, 0), # H ≈ Rz(Ī€) â€ĸ SX â€ĸ Rz(Ī€) up to phase + "ry": (3, 0), # Ry ≈ Rz(-Ī€/2) â€ĸ Rx(θ) â€ĸ Rz(Ī€/2) + "s": (1, 0), # S = Rz(Ī€/2) + "sdg": (1, 0), + "t": (1, 0), # T = Rz(Ī€/4) + "tdg": (1, 0), + # ------------------------------------------------------------------ + # Common 2q gates expressed in a CZ + 1q basis (approximate) + # ------------------------------------------------------------------ + "rxx": (4, 1), # ~4 single-qubit rotations + 1 entangler (rzz/cz) + # Controlled-1q rotations / phases: + # roughly: 1 CZ + a few single-qubit rotations on control/target. + "crx": (4, 1), + "cry": (4, 1), + "crz": (4, 1), + "cp": (4, 1), + "cu1": (4, 1), + "cu3": (6, 1), + "cu": (6, 1), + "ch": (6, 1), + "cy": (6, 1), + "csx": (4, 1), + "cx": (6, 1), # CX = H(t) â€ĸ CZ â€ĸ H(t) -> ~2*H + 1*CZ => ~6 singles + 1 two-qubit + "czx": (6, 1), + "swap": (12, 3), # SWAP ≈ 3 CX; in a CZ basis still ~3 two-qubit gates +} + +ANKAA3_CANONICAL_COSTS: CanonicalCostTable = { + "rx": (1, 0), + "rz": (1, 0), + "iswap": (0, 1), + "u": (3, 0), + "u3": (3, 0), + "u2": (2, 0), + "h": (3, 0), + "ry": (3, 0), + "s": (1, 0), + "sdg": (1, 0), + "t": (1, 0), + "tdg": (1, 0), + "rzz": (4, 2), # ~2 iSWAP + ~4 1q rotations + "rxx": (4, 2), + # controlled gates: ~ 2 iSWAP + some 1q each + "crx": (6, 2), + "cry": (6, 2), + "crz": (6, 2), + "cp": (6, 2), + "cu1": (6, 2), + "cu3": (8, 2), + "cu": (8, 2), + "ch": (8, 2), + "cy": (8, 2), + "csx": (6, 2), + "swap": (12, 3), +} + +EMERALD_CANONICAL_COSTS: CanonicalCostTable = { + # native + "rz": (1, 0), + "rx": (1, 0), + "r": (1, 0), + "cz": (0, 1), + "u": (1, 0), + "u3": (1, 0), + "u2": (1, 0), + "h": (1, 0), + "ry": (1, 0), + "s": (1, 0), + "sdg": (1, 0), + "t": (1, 0), + "tdg": (1, 0), + "rzz": (4, 2), + "rxx": (4, 2), + "crx": (4, 1), + "cry": (4, 1), + "crz": (4, 1), + "cp": (4, 1), + "cu1": (4, 1), + "cu3": (6, 1), + "cu": (6, 1), + "ch": (6, 1), + "cy": (6, 1), + "csx": (4, 1), + "swap": (12, 3), +} + +DEVICE_CANONICAL_COSTS: dict[str, CanonicalCostTable] = { + "ibm_torino": TORINO_CANONICAL_COSTS, + "ankaa_3": ANKAA3_CANONICAL_COSTS, + "emerald": EMERALD_CANONICAL_COSTS, +} + + +def get_cost_table(device_id: str) -> CanonicalCostTable: + """Return the canonical cost table for ``device_id``, with a safe fallback. + + If the device is unknown, a warning is emitted and the Torino table is used + as a generic fallback. This keeps the code running but the approximation + should be treated with care. + """ + table = DEVICE_CANONICAL_COSTS.get(device_id) + if table is None: + msg = ( + f"No canonical cost table defined for device '{device_id}'. " + "Falling back to 'ibm_torino' table; approximate metrics may " + "be inaccurate. Consider adding a dedicated entry to " + "DEVICE_CANONICAL_COSTS." + ) + warnings.warn(msg, UserWarning, stacklevel=3) + logger.warning(msg) + table = TORINO_CANONICAL_COSTS + return table + + +def canonical_cost( + gate_name: str, + *, + device_id: str = "ibm_torino", +) -> tuple[int, int]: + """Return (n_1q, n_2q) cost for ``gate_name`` on the given device. + + Note: + Currently only a hand-crafted model for IBM "torino" is provided. + For additional devices, extend ``DEVICE_CANONICAL_COSTS`` accordingly. + """ + table = get_cost_table(device_id) + return table.get(gate_name, (0, 0)) + + +def estimate_counts( + qc: QuantumCircuit, + *, + cost_table: CanonicalCostTable, +) -> tuple[int, int]: + """Estimate canonical (n_1q, n_2q) counts for a circuit. + + Uses the provided ``cost_table`` where available and a simple, conservative + fallback otherwise (3*1q for unknown 1q gates, 1*2q + 4*1q for unknown 2q gates). + """ + n_1q = 0 + n_2q = 0 + + for circuit_instr in qc.data: + name = circuit_instr.operation.name + qargs = circuit_instr.qubits + + # Ignore non-unitary / timing-only ops for this count + if name in ("barrier", "delay", "measure"): + continue + + cost = cost_table.get(name) + if cost is None: + # Conservative fallback by arity (only used for gates missing in the table) + if len(qargs) == 1: + n_1q += 3 + elif len(qargs) == 2: + n_2q += 1 + n_1q += 4 + else: + n_1q += cost[0] + n_2q += cost[1] + return n_1q, n_2q + + +def approx_expected_fidelity( + qc: QuantumCircuit, + p1_avg: float, + p2_avg: float, + *, + device_id: str = "ibm_torino", +) -> float: + """Approximate expected fidelity from canonical gate counts. + + Args: + qc: Circuit for which to estimate fidelity. + p1_avg: Average single-qubit error probability across the device. + p2_avg: Average two-qubit error probability across the device. + device_id: Identifier of the backend (used to select the cost table). + + Returns: + Approximate expected fidelity in [0, 1]. + """ + cost_table = get_cost_table(device_id) + n_1q, n_2q = estimate_counts(qc, cost_table=cost_table) + + f_1q = (1.0 - p1_avg) ** max(n_1q, 0) + f_2q = (1.0 - p2_avg) ** max(n_2q, 0) + f = f_1q * f_2q + + # Clamp to [0, 1] for numerical robustness + return float(max(min(f, 1.0), 0.0)) + + +def approx_estimated_success_probability( + qc: QuantumCircuit, + p1_avg: float, + p2_avg: float, + tau1_avg: float, + tau2_avg: float, + tbar: float | None, + par_feature: float, + liv_feature: float, + n_qubits: int, + *, + device_id: str = "ibm_torino", +) -> float: + """Approximate ESP using canonical counts and simple idle-time modeling. + + The ESP is modeled as: + + ESP ≈ F_gates * exp(- T_idle / TĖ„) + + where F_gates is approximated from canonical 1q/2q counts and mean error + rates, and T_idle is estimated from a crude duration model modulated by + a parallelism and liveness feature. + + Args: + qc: Circuit for which to estimate ESP. + p1_avg: Average single-qubit error probability. + p2_avg: Average two-qubit error probability. + tau1_avg: Average single-qubit gate duration. + tau2_avg: Average two-qubit gate duration. + tbar: Effective characteristic decoherence time (e.g. derived from T1/T2). + par_feature: Parallelism feature in [0, 1] (e.g. Supermarq parallelism). + liv_feature: Liveness feature in [0, 1], where 1 ≈ always active. + n_qubits: Number of qubits in the circuit / device. + device_id: Identifier of the backend (used to select the cost table). + + Returns: + Approximate ESP in [0, 1]. + """ + cost_table = get_cost_table(device_id) + + # Fidelity part from gate errors + n_1q, n_2q = estimate_counts(qc, cost_table=cost_table) + f_1q = (1.0 - p1_avg) ** max(n_1q, 0) + f_2q = (1.0 - p2_avg) ** max(n_2q, 0) + f = f_1q * f_2q + + # Effective duration via parallelism (par_feature ∈ [0, 1]) + n_q = max(n_qubits, 1) + k_eff = 1.0 + (n_q - 1.0) * float(par_feature) # ∈ [1, n_qubits] + + t_hat = 0.0 + t_hat = (n_1q * tau1_avg + n_2q * tau2_avg) / k_eff + + # Idle-time penalty via (1 - liveness) + idle_frac = max(0.0, 1.0 - float(liv_feature)) + idle_factor = 1.0 if tbar is None or tbar <= 0.0 else float(np.exp(-(t_hat * idle_frac) / tbar)) + + esp = f * idle_factor + return float(max(min(esp, 1.0), 0.0)) diff --git a/src/mqt/predictor/rl/predictorenv.py b/src/mqt/predictor/rl/predictorenv.py index 4c1f5f69d..1d52964a5 100644 --- a/src/mqt/predictor/rl/predictorenv.py +++ b/src/mqt/predictor/rl/predictorenv.py @@ -15,16 +15,15 @@ from typing import TYPE_CHECKING, Any if sys.version_info >= (3, 11) and TYPE_CHECKING: # pragma: no cover - from typing import assert_never -else: - from typing_extensions import assert_never + pass if TYPE_CHECKING: from collections.abc import Callable from pathlib import Path from bqskit import Circuit - from qiskit.transpiler import Target + from qiskit.passmanager import PropertySet + from qiskit.transpiler import InstructionProperties, Target from mqt.predictor.reward import figure_of_merit from mqt.predictor.rl.actions import Action @@ -43,24 +42,37 @@ from qiskit import QuantumCircuit from qiskit.passmanager.flow_controllers import DoWhileController from qiskit.transpiler import CouplingMap, PassManager, TranspileLayout -from qiskit.transpiler.passes import CheckMap, GatesInBasis +from qiskit.transpiler.passes import ( + CheckMap, + GatesInBasis, +) from qiskit.transpiler.passes.layout.vf2_layout import VF2LayoutStopReason from mqt.predictor.hellinger import get_hellinger_model_path from mqt.predictor.reward import ( crit_depth, esp_data_available, - estimated_hellinger_distance, estimated_success_probability, expected_fidelity, ) -from mqt.predictor.rl.actions import CompilationOrigin, DeviceDependentAction, PassType, get_actions_by_pass_type -from mqt.predictor.rl.helper import create_feature_dict, get_path_training_circuits, get_state_sample +from mqt.predictor.rl.actions import ( + CompilationOrigin, + DeviceDependentAction, + PassType, + get_actions_by_pass_type, +) +from mqt.predictor.rl.cost_model import approx_estimated_success_probability, approx_expected_fidelity +from mqt.predictor.rl.helper import ( + create_feature_dict, + get_path_training_circuits, + get_state_sample, +) from mqt.predictor.rl.parsing import ( final_layout_bqskit_to_qiskit, final_layout_pytket_to_qiskit, postprocess_vf2postlayout, ) +from mqt.predictor.utils import calc_supermarq_features logger = logging.getLogger("mqt-predictor") @@ -73,6 +85,8 @@ def __init__( device: Target, reward_function: figure_of_merit = "expected_fidelity", path_training_circuits: Path | None = None, + reward_scale: float = 1.0, + no_effect_penalty: float = -0.001, ) -> None: """Initializes the PredictorEnv object. @@ -80,6 +94,8 @@ def __init__( device: The target device to be used for compilation. reward_function: The figure of merit to be used for the reward function. Defaults to "expected_fidelity". path_training_circuits: The path to the training circuits folder. Defaults to None, which uses the default path. + reward_scale: Scaling factor for rewards/penalties proportional to fidelity changes. + no_effect_penalty: Step penalty applied when an action does not change the circuit (no-op). Raises: ValueError: If the reward function is "estimated_success_probability" and no calibration data is available for the device or if the reward function is "estimated_hellinger_distance" and no trained model is available for the device. @@ -111,6 +127,10 @@ def __init__( self.action_set[index] = elem self.actions_synthesis_indices.append(index) index += 1 + for elem in action_dict[PassType.OPT]: + self.action_set[index] = elem + self.actions_opt_indices.append(index) + index += 1 for elem in action_dict[PassType.LAYOUT]: self.action_set[index] = elem self.actions_layout_indices.append(index) @@ -119,10 +139,6 @@ def __init__( self.action_set[index] = elem self.actions_routing_indices.append(index) index += 1 - for elem in action_dict[PassType.OPT]: - self.action_set[index] = elem - self.actions_opt_indices.append(index) - index += 1 for elem in action_dict[PassType.MAPPING]: self.action_set[index] = elem self.actions_mapping_indices.append(index) @@ -154,7 +170,7 @@ def __init__( self.rng = np.random.default_rng(10) spaces = { - "num_qubits": Discrete(128), + "num_qubits": Discrete(self.device.num_qubits + 1), "depth": Discrete(1000000), "program_communication": Box(low=0, high=1, shape=(1,), dtype=np.float32), "critical_depth": Box(low=0, high=1, shape=(1,), dtype=np.float32), @@ -164,64 +180,185 @@ def __init__( } self.observation_space = Dict(spaces) self.filename = "" + self.reward_scale = reward_scale + self.no_effect_penalty = no_effect_penalty + self.prev_reward: float | None = None + self.prev_reward_kind: str | None = None + self._p1_avg = 0.0 + self._p2_avg = 0.0 + self._tau1_avg = 0.0 + self._tau2_avg = 0.0 + self._tbar: float | None = None + self._dev_avgs_cached = False def step(self, action: int) -> tuple[dict[str, Any], float, bool, bool, dict[Any, Any]]: - """Executes the given action and returns the new state, the reward, whether the episode is done, whether the episode is truncated and additional information. + """Run one environment step. + + This method: + 1. Evaluates the current circuit with the configured reward function + (using either the exact or approximate metric, depending on state). + 2. Applies the selected transpiler pass (the action). + 3. Normalizes the circuit (e.g., decompose high-level gates) so that + reward computation is well-defined. + 4. Updates the internal state and valid action set. + 5. Computes a shaped step reward based on the change in figure of merit. + + Reward design: + - For non-terminal actions, the step reward is a scaled delta between + the new and previous reward (plus an optional step penalty). + - For the terminate action, the episode ends and the final reward is + the exact (calibration-aware) metric. + """ + self.used_actions.append(str(self.action_set[action].name)) - Arguments: - action: The action to be executed, represented by its index in the action set. + logger.info(f"Applying {self.action_set[action].name}") - Returns: - A tuple containing the new state as a feature dictionary, the reward value, whether the episode is done, whether the episode is truncated, and additional information. + # 1) Evaluate reward for current circuit (before applying the action) + prev_val, prev_kind = self.calculate_reward(mode="auto") + self.prev_reward = prev_val + self.prev_reward_kind = prev_kind - Raises: - RuntimeError: If no valid actions are left. - """ - self.used_actions.append(str(self.action_set[action].name)) + # 2) Apply the selected transpiler pass altered_qc = self.apply_action(action) + if not altered_qc: - return ( - create_feature_dict(self.state), - 0, - True, - False, - {}, - ) + return create_feature_dict(self.state), 0.0, True, False, {} + + # 3) Normalize circuit: remove high-level gates that break reward assumptions + # - decompose "unitary"/"clifford" + for gate_type in ["unitary", "clifford"]: + if altered_qc.count_ops().get(gate_type): + altered_qc = altered_qc.decompose(gates_to_decompose=gate_type) + # 4) Update state and valid actions self.state: QuantumCircuit = altered_qc self.num_steps += 1 - self.valid_actions = self.determine_valid_actions_for_state() if len(self.valid_actions) == 0: msg = "No valid actions left." raise RuntimeError(msg) + # 5) Compute step reward and termination flag if action == self.action_terminate_index: - reward_val = self.calculate_reward() + # Terminal action: use the exact metric as final reward + final_val, final_kind = self.calculate_reward(mode="exact") + logger.info(f"Final reward ({final_kind}): {final_val}") + self.prev_reward = final_val + self.prev_reward_kind = final_kind done = True + reward_val = final_val else: - reward_val = 0 done = False - # in case the Qiskit.QuantumCircuit has unitary or u gates in it, decompose them (because otherwise qiskit will throw an error when applying the BasisTranslator - if self.state.count_ops().get("unitary"): - self.state = self.state.decompose(gates_to_decompose="unitary") + # Re-evaluate reward after applying the action + new_val, new_kind = self.calculate_reward(mode="auto") + delta_reward = new_val - prev_val + + if prev_kind == "approx" and new_kind == "exact": + # Metrics aren't comparable across regimes; suppress delta to avoid misleading reward signal + delta_reward = 0.0 + + if delta_reward > 0.0: + # Positive change: reward proportional to improvement + reward_val = self.reward_scale * delta_reward + elif delta_reward < 0.0: + # Negative change: proportional penalty + reward_val = self.reward_scale * delta_reward + else: + # No change: small step penalty for "doing nothing" + reward_val = self.no_effect_penalty + + self.prev_reward = new_val + self.prev_reward_kind = new_kind self.state._layout = self.layout # noqa: SLF001 - obs = create_feature_dict(self.state) - return obs, reward_val, done, False, {} + return create_feature_dict(self.state), reward_val, done, False, {} + + def calculate_reward(self, qc: QuantumCircuit | None = None, mode: str = "auto") -> tuple[float, str]: + """Compute the current reward and indicate whether it is exact or approximate. + + Args: + qc: + Circuit to evaluate. If ``None``, the environment's current state + circuit is used. + mode: + Controls how the function chooses between exact (calibration-based) + and approximate (cost-model-based) metrics: + + - ``"auto"`` (default): use the exact metric if the circuit is + already native and mapped; otherwise fall back to the approximate + metric. + - ``"exact"``: always compute the exact metric (no approximation). + - ``"approx"``: always compute the approximate metric. + + Returns: + A pair ``(value, kind)`` where: + + - ``value`` is the scalar reward value. + - ``kind`` is either ``"exact"`` (exact, calibration-aware) or + ``"approx"`` (cost-model-based approximation). + + Notes: + - Dual-path behavior (exact + approximate) is currently only implemented + for ``"expected_fidelity"`` and + ``"estimated_success_probability"``. + - Other reward functions are always computed exactly. + """ + if qc is None: + qc = self.state + + # Reward functions that are always computed exactly, regardless of `mode`. + if self.reward_function not in {"expected_fidelity", "estimated_success_probability"}: + if self.reward_function == "critical_depth": + return crit_depth(qc), "exact" + # Fallback for other unknown / not-yet-implemented reward functions: + logger.warning( + "Reward function '%s' is not supported in PredictorEnv. Returning 0.0 as a fallback reward.", + self.reward_function, + ) + return 0.0, "exact" + + # ------------------------------------------------------------------ + # From here on: dual-path rewards (exact vs approx) for EF / ESP. + # ------------------------------------------------------------------ + + # Decide which path to use (exact vs approx) + if mode == "exact": + kind = "exact" + elif mode == "approx": + kind = "approx" + else: # "auto" + kind = "exact" if self._is_native_and_mapped(qc) else "approx" + + # Exact metrics use the full circuit and device calibration data + if kind == "exact": + if self.reward_function == "expected_fidelity": + return expected_fidelity(qc, self.device), "exact" + + return estimated_success_probability(qc, self.device), "exact" + + # Approximate metrics use canonical gate counts and device-specific averages + self._ensure_device_averages_cached() - def calculate_reward(self) -> float: - """Calculates and returns the reward for the current state.""" if self.reward_function == "expected_fidelity": - return expected_fidelity(self.state, self.device) - if self.reward_function == "estimated_success_probability": - return estimated_success_probability(self.state, self.device) - if self.reward_function == "estimated_hellinger_distance": - return estimated_hellinger_distance(self.state, self.device, self.hellinger_model) - if self.reward_function == "critical_depth": - return crit_depth(self.state) - assert_never(self.state) + val = approx_expected_fidelity(qc, self._p1_avg, self._p2_avg, device_id=self.device.description) + return val, "approx" + + # self.reward_function == "estimated_success_probability" + feats = calc_supermarq_features(qc) + val = approx_estimated_success_probability( + qc, + p1_avg=self._p1_avg, + p2_avg=self._p2_avg, + tau1_avg=self._tau1_avg, + tau2_avg=self._tau2_avg, + tbar=self._tbar, + par_feature=feats.parallelism, + liv_feature=feats.liveness, + n_qubits=qc.num_qubits, + device_id=self.device.description, + ) + return val, "approx" def render(self) -> None: """Renders the current state.""" @@ -244,6 +381,7 @@ def reset( The initial state and additional information. """ super().reset(seed=seed) + if isinstance(qc, QuantumCircuit): self.state = qc elif qc: @@ -263,6 +401,7 @@ def reset( self.num_qubits_uncompiled_circuit = self.state.num_qubits self.has_parameterized_gates = len(self.state.parameters) > 0 + return create_feature_dict(self.state), {} def action_masks(self) -> list[bool]: @@ -317,55 +456,78 @@ def apply_action(self, action_index: int) -> QuantumCircuit | None: if action.origin == CompilationOrigin.BQSKIT: return self._apply_bqskit_action(action, action_index) msg = f"Origin {action.origin} not supported." + raise ValueError(msg) def _apply_qiskit_action(self, action: Action, action_index: int) -> QuantumCircuit: - if action.name == "QiskitO3" and isinstance(action, DeviceDependentAction): - passes = action.transpile_pass( - self.device.operation_names, - CouplingMap(self.device.build_coupling_map()) if self.layout else None, + pm_property_set: PropertySet | None = {} + if getattr(action, "stochastic", False): # Wrap stochastic action to optimize for the used figure of merit + altered_qc, pm_property_set = self.fom_aware_compile( + action, + self.device, + self.state, + max_iteration=self.max_iter, ) - pm = PassManager([DoWhileController(passes, do_while=action.do_while)]) else: - transpile_pass = ( - action.transpile_pass(self.device) if callable(action.transpile_pass) else action.transpile_pass - ) - pm = PassManager(transpile_pass) - - altered_qc = pm.run(self.state) + if action.name in ["QiskitO3", "Opt2qBlocks_preserve"] and isinstance(action, DeviceDependentAction): + passes = action.transpile_pass( + self.device.operation_names, + CouplingMap(self.device.build_coupling_map()) if self.layout else None, + ) + if action.name == "QiskitO3": + pm = PassManager([DoWhileController(passes, do_while=action.do_while)]) + else: + pm = PassManager(passes) + altered_qc = pm.run(self.state) + pm_property_set = dict(pm.property_set) if hasattr(pm, "property_set") else {} + else: + transpile_pass = ( + action.transpile_pass(self.device) if callable(action.transpile_pass) else action.transpile_pass + ) + pm = PassManager(transpile_pass) + altered_qc = pm.run(self.state) + pm_property_set = dict(pm.property_set) if hasattr(pm, "property_set") else {} if action_index in ( self.actions_layout_indices + self.actions_mapping_indices + self.actions_final_optimization_indices ): - altered_qc = self._handle_qiskit_layout_postprocessing(action, pm, altered_qc) - - elif action_index in self.actions_routing_indices and self.layout: - self.layout.final_layout = pm.property_set["final_layout"] + altered_qc = self._handle_qiskit_layout_postprocessing(action, pm_property_set, altered_qc) + elif action_index in self.actions_routing_indices and self.layout and pm_property_set is not None: + self.layout.final_layout = pm_property_set["final_layout"] return altered_qc def _handle_qiskit_layout_postprocessing( - self, action: Action, pm: PassManager, altered_qc: QuantumCircuit + self, + action: Action, + pm_property_set: dict[str, Any] | None, + altered_qc: QuantumCircuit, ) -> QuantumCircuit: + if not pm_property_set: + return altered_qc if action.name == "VF2PostLayout": - assert pm.property_set["VF2PostLayout_stop_reason"] is not None - post_layout = pm.property_set["post_layout"] + assert pm_property_set["VF2PostLayout_stop_reason"] is not None + post_layout = pm_property_set.get("post_layout") if post_layout: altered_qc, _ = postprocess_vf2postlayout(altered_qc, post_layout, self.layout) elif action.name == "VF2Layout": - assert pm.property_set["VF2Layout_stop_reason"] == VF2LayoutStopReason.SOLUTION_FOUND - assert pm.property_set["layout"] + if pm_property_set["VF2Layout_stop_reason"] == VF2LayoutStopReason.SOLUTION_FOUND: + assert pm_property_set["layout"] else: - assert pm.property_set["layout"] + assert pm_property_set["layout"] - if pm.property_set["layout"]: + layout = pm_property_set.get("layout") + if layout: self.layout = TranspileLayout( - initial_layout=pm.property_set["layout"], - input_qubit_mapping=pm.property_set["original_qubit_indices"], - final_layout=pm.property_set["final_layout"], + initial_layout=layout, + input_qubit_mapping=pm_property_set.get("original_qubit_indices"), + final_layout=pm_property_set.get("final_layout"), _output_qubit_list=altered_qc.qubits, _input_qubit_count=self.num_qubits_uncompiled_circuit, ) + + if self.layout is not None and pm_property_set.get("final_layout"): + self.layout.final_layout = pm_property_set["final_layout"] return altered_qc def _apply_tket_action(self, action: Action, action_index: int) -> QuantumCircuit: @@ -379,7 +541,7 @@ def _apply_tket_action(self, action: Action, action_index: int) -> QuantumCircui qbs = tket_qc.qubits tket_qc.rename_units({qbs[i]: Qubit("q", i) for i in range(len(qbs))}) - altered_qc = tk_to_qiskit(tket_qc) + altered_qc = tk_to_qiskit(tket_qc, replace_implicit_swaps=True) if action_index in self.actions_routing_indices: assert self.layout is not None @@ -447,3 +609,138 @@ def determine_valid_actions_for_state(self) -> list[int]: # No layout applied yet return self.actions_mapping_indices + self.actions_layout_indices + self.actions_opt_indices + + def _ensure_device_averages_cached(self) -> None: + """Cache device-wide averages for 1q/2q errors, durations, and coherence. + + Backend-dependent preprocessing step used by the approximate reward model. + It computes and caches: + + - _p1_avg: average single-qubit gate error probability + - _p2_avg: average two-qubit gate error probability + - _tau1_avg: average single-qubit gate duration (seconds) + - _tau2_avg: average two-qubit gate duration (seconds) + - _tbar: median of min(T1, T2) over all qubits (seconds), if available + + Assumes a modern Qiskit Target (e.g. IBM backends) and raises a RuntimeError + if the required calibration data is not available. + """ + if getattr(self, "_dev_avgs_cached", False): + return + + target = self.device + + # Hard requirements: these must exist for the approximate model to make sense + try: + num_qubits = target.num_qubits + op_names = list(target.operation_names) + coupling_map = target.build_coupling_map() + qubit_props = target.qubit_properties + except AttributeError as exc: + msg = "Device target does not expose the required Target API for approximate reward computation." + raise RuntimeError(msg) from exc + + dt = getattr(target, "dt", None) + twoq_edges = coupling_map.get_edges() # list[(i, j)] + + p1: list[float] = [] + p2: list[float] = [] + t1: list[float] = [] + t2: list[float] = [] + + # Exclude non-gate operations from gate error/duration averages + gate_blacklist = {"measure", "reset", "delay", "barrier"} + + def _get_props(name: str, qargs: tuple[int, ...]) -> InstructionProperties | None: + """Return calibration properties for (name, qargs) or None if unavailable.""" + try: + props_map = target[name] + except KeyError: + return None + + return props_map.get(qargs, None) + + # --- Aggregate error and duration statistics over all 1q/2q gates -------- + for name in op_names: + if name in gate_blacklist: + continue + + # Determine arity (number of qubits) of the operation + try: + op = target.operation_from_name(name) + arity = op.num_qubits + except (KeyError, AttributeError): + # If we can't get a proper operation object, skip this op + continue + + if arity == 1: + # Collect single-qubit gate error/duration over all qubits + for q in range(num_qubits): + props = _get_props(name, (q,)) + if props is None: + continue + err = getattr(props, "error", None) + if err is not None: + p1.append(float(err)) + dur = getattr(props, "duration", None) + if dur is not None: + dur_s = float(dur) + t1.append(dur_s) + + elif arity == 2: + # Collect two-qubit gate error/duration over all supported edges + for i, j in twoq_edges: + props = _get_props(name, (i, j)) + if props is None: + # Try flipped orientation for uni-directional couplings + props = _get_props(name, (j, i)) + if props is None: + continue + err = getattr(props, "error", None) + if err is not None: + p2.append(float(err)) + dur = getattr(props, "duration", None) + if dur is not None: + dur_s = float(dur if dt is None else dur * dt) + t2.append(dur_s) + + else: + # Ignore gates with arity > 2; extend here if you ever need them + continue + + if not p1 and not p2: + msg = "No valid 1q/2q calibration data found in Target, cannot compute approximate reward." + raise RuntimeError(msg) + + self._p1_avg = float(np.mean(p1)) if p1 else 0.0 + self._p2_avg = float(np.mean(p2)) if p2 else 0.0 + self._tau1_avg = float(np.mean(t1)) if t1 else 0.0 + self._tau2_avg = float(np.mean(t2)) if t2 else 0.0 + + # --- Compute a single coherence scale tbar from T1/T2 --------------------- + tmins: list[float] = [] + if qubit_props: + for i in range(num_qubits): + props = qubit_props[i] + if props is None: + continue + t1v = getattr(props, "t1", None) + t2v = getattr(props, "t2", None) + vals = [v for v in (t1v, t2v) if v is not None] + if vals: + tmins.append(float(min(vals))) + + self._tbar = float(np.median(tmins)) if tmins else None + + self._dev_avgs_cached = True + + def _is_native_and_mapped(self, qc: QuantumCircuit) -> bool: + check_nat_gates = GatesInBasis(basis_gates=self.device.operation_names) + check_nat_gates(qc) + only_nat_gates = check_nat_gates.property_set["all_gates_in_basis"] + + check_mapping = CheckMap(coupling_map=CouplingMap(self.device.build_coupling_map())) + check_mapping(qc) + mapped = check_mapping.property_set["is_swap_mapped"] + + return bool(only_nat_gates and mapped) diff --git a/src/mqt/predictor/utils.py b/src/mqt/predictor/utils.py index faf3d0596..4112dfb13 100644 --- a/src/mqt/predictor/utils.py +++ b/src/mqt/predictor/utils.py @@ -15,7 +15,6 @@ import sys from dataclasses import dataclass from typing import TYPE_CHECKING, Any -from warnings import warn import networkx as nx import numpy as np @@ -52,7 +51,7 @@ def timeout_watcher( TimeoutExceptionError: If the function call exceeds the timeout limit. """ if sys.platform == "win32": - warn("Timeout is not supported on Windows.", category=RuntimeWarning, stacklevel=2) + logger.info("Timeout is not supported on Windows; running without timeout.") return func(*args) if isinstance(args, tuple | list) else func(args) class TimeoutExceptionError(Exception): # Custom exception class diff --git a/tests/compilation/test_predictor_rl.py b/tests/compilation/test_predictor_rl.py index f9b0a938d..16c9f7652 100644 --- a/tests/compilation/test_predictor_rl.py +++ b/tests/compilation/test_predictor_rl.py @@ -12,14 +12,16 @@ import re from pathlib import Path +from typing import TYPE_CHECKING import pytest from mqt.bench import BenchmarkLevel, get_benchmark from mqt.bench.targets import get_device +from qiskit import transpile from qiskit.circuit.library import CXGate from qiskit.qasm2 import dump -from qiskit.transpiler import InstructionProperties, Target -from qiskit.transpiler.passes import GatesInBasis +from qiskit.transpiler import CouplingMap, InstructionProperties, Target +from qiskit.transpiler.passes import CheckMap, GatesInBasis from mqt.predictor.rl import Predictor, rl_compile from mqt.predictor.rl.actions import ( @@ -30,15 +32,23 @@ register_action, remove_action, ) +from mqt.predictor.rl.cost_model import ( + TORINO_CANONICAL_COSTS, + canonical_cost, + get_cost_table, +) from mqt.predictor.rl.helper import create_feature_dict, get_path_trained_model +if TYPE_CHECKING: + from _pytest.monkeypatch import MonkeyPatch + def test_predictor_env_reset_from_string() -> None: """Test the reset function of the predictor environment with a quantum circuit given as a string as input.""" device = get_device("ibm_eagle_127") predictor = Predictor(figure_of_merit="expected_fidelity", device=device) qasm_path = Path("test.qasm") - qc = get_benchmark("dj", BenchmarkLevel.ALG, 3) + qc = get_benchmark("dj", BenchmarkLevel.INDEP, 3) with qasm_path.open("w", encoding="utf-8") as f: dump(qc, f) assert predictor.env.reset(qc=qasm_path)[0] == create_feature_dict(qc) @@ -71,6 +81,7 @@ def test_qcompile_with_newly_trained_models() -> None: figure_of_merit = "expected_fidelity" device = get_device("ibm_falcon_127") qc = get_benchmark("ghz", BenchmarkLevel.ALG, 3) + predictor = Predictor(figure_of_merit=figure_of_merit, device=device) model_name = "model_" + figure_of_merit + "_" + device.description @@ -94,15 +105,19 @@ def test_qcompile_with_newly_trained_models() -> None: check_nat_gates = GatesInBasis(basis_gates=device.operation_names) check_nat_gates(qc_compiled) only_nat_gates = check_nat_gates.property_set["all_gates_in_basis"] + check_mapping = CheckMap(coupling_map=CouplingMap(device.build_coupling_map())) + check_mapping(qc_compiled) + mapped = check_mapping.property_set["is_swap_mapped"] assert qc_compiled.layout is not None assert compilation_information is not None - assert only_nat_gates, "Circuit should only contain native gates but was not detected as such" + assert only_nat_gates, "Circuit should only contain native gates but was not detected as such." + assert mapped, "Circuit should be mapped to the device's coupling map." def test_qcompile_with_false_input() -> None: """Test the qcompile function with false input.""" - qc = get_benchmark("dj", BenchmarkLevel.ALG, 5) + qc = get_benchmark("dj", BenchmarkLevel.INDEP, 5) with pytest.raises(ValueError, match=re.escape("figure_of_merit must not be None if predictor_singleton is None.")): rl_compile(qc, device=get_device("quantinuum_h2_56"), figure_of_merit=None) with pytest.raises(ValueError, match=re.escape("device must not be None if predictor_singleton is None.")): @@ -136,3 +151,85 @@ def test_register_action() -> None: with pytest.raises(KeyError, match=re.escape("No action with name wrong_action_name is registered")): remove_action("wrong_action_name") + + +def test_cost_model_unknown_device_and_gate() -> None: + """Cover unknown-device fallback and unknown-gate default in cost model.""" + # --- Unknown device: triggers warning + Torino fallback --- + msg = "No canonical cost table defined for device 'my_custom_device'" + with pytest.warns(UserWarning, match=re.escape(msg)): + table = get_cost_table("my_custom_device") + + # The returned table must be exactly the Torino table + assert table is TORINO_CANONICAL_COSTS + + # --- Unknown gate on a known device: (0, 0) fallback --- + assert canonical_cost("some_weird_gate", device_id="ibm_torino") == (0, 0) + + +def test_calculate_reward_esp_and_critical_depth(monkeypatch: MonkeyPatch) -> None: + """Cover ESP (exact + approx) and critical_depth branches in calculate_reward.""" + qc = get_benchmark("ghz", BenchmarkLevel.INDEP, 3) + device = get_device("ibm_heron_133") + + # Make a native + mapped version of the circuit for exact metrics + coupling = CouplingMap(device.build_coupling_map()) + qc_native = transpile( + qc, + basis_gates=device.operation_names, + coupling_map=coupling, + optimization_level=3, + ) + + # ------------------------------------------------------------------ + # 1) estimated_success_probability: exact + approx (all modes) + # ------------------------------------------------------------------ + predictor_esp = Predictor( + figure_of_merit="estimated_success_probability", + device=device, + ) + + # a) Explicit exact mode on a native, mapped circuit + val_exact, kind_exact = predictor_esp.env.calculate_reward(qc=qc_native, mode="exact") + assert kind_exact == "exact" + assert 0.0 <= val_exact <= 1.0 + + # a2) Auto mode on native, mapped circuit → should select exact + val_auto_exact, kind_auto_exact = predictor_esp.env.calculate_reward(qc=qc_native, mode="auto") + assert kind_auto_exact == "exact" + assert 0.0 <= val_auto_exact <= 1.0 + + # b) Explicit approx mode (forces approximate path regardless of nativeness) + val_approx, kind_approx = predictor_esp.env.calculate_reward(qc=qc, mode="approx") + assert kind_approx == "approx" + assert 0.0 <= val_approx <= 1.0 + + # c) Auto mode → approx (force "not native & not mapped") + monkeypatch.setattr(predictor_esp.env, "_is_native_and_mapped", lambda _qc: False) + val_auto_approx, kind_auto_approx = predictor_esp.env.calculate_reward(qc=qc, mode="auto") + assert kind_auto_approx == "approx" + assert 0.0 <= val_auto_approx <= 1.0 + + # ------------------------------------------------------------------ + # 1d) Broken Target API → RuntimeError in ensure_device_averages_cached + # ------------------------------------------------------------------ + # Use a fresh predictor so _dev_avgs_cached is not yet set + broken_predictor = Predictor( + figure_of_merit="estimated_success_probability", + device=device, + ) + broken_predictor.env.device = object() + + with pytest.raises( + RuntimeError, + match=re.escape("Device target does not expose the required Target API for approximate reward computation."), + ): + broken_predictor.env._ensure_device_averages_cached() # noqa: SLF001 + + # ------------------------------------------------------------------ + # 2) critical_depth: always exact, regardless of mode + # ------------------------------------------------------------------ + predictor_cd = Predictor(figure_of_merit="critical_depth", device=device) + val_cd, kind_cd = predictor_cd.env.calculate_reward(qc=qc, mode="auto") + assert kind_cd == "exact" + assert 0.0 <= val_cd <= 1.0 diff --git a/tests/device_selection/test_predictor_ml.py b/tests/device_selection/test_predictor_ml.py index 0b2f1485f..803c822a6 100644 --- a/tests/device_selection/test_predictor_ml.py +++ b/tests/device_selection/test_predictor_ml.py @@ -43,7 +43,7 @@ def test_setup_device_predictor_with_prediction(path_uncompiled_circuits: Path, path_compiled_circuits.mkdir() for i in range(2, 8): - qc = get_benchmark("ghz", BenchmarkLevel.ALG, i) + qc = get_benchmark("ghz", BenchmarkLevel.INDEP, i) path = path_uncompiled_circuits / f"qc{i}.qasm" with path.open("w", encoding="utf-8") as f: dump(qc, f) @@ -63,7 +63,7 @@ def test_setup_device_predictor_with_prediction(path_uncompiled_circuits: Path, assert (data_path / "names_list_expected_fidelity.npy").exists() assert (data_path / "scores_list_expected_fidelity.npy").exists() - test_qc = get_benchmark("ghz", BenchmarkLevel.ALG, 3) + test_qc = get_benchmark("ghz", BenchmarkLevel.INDEP, 3) predicted = predict_device_for_figure_of_merit(test_qc, figure_of_merit="expected_fidelity") assert predicted.description == "ibm_falcon_127" @@ -93,7 +93,7 @@ def test_remove_files(path_uncompiled_circuits: Path, path_compiled_circuits: Pa def test_predict_device_for_figure_of_merit_no_suitable_device() -> None: """Test the prediction of the device for a given figure of merit with a wrong device name.""" num_qubits = 130 - qc = get_benchmark("ghz", BenchmarkLevel.ALG, num_qubits) + qc = get_benchmark("ghz", BenchmarkLevel.INDEP, num_qubits) with pytest.raises( ValueError, match=re.escape(f"No suitable device found for the given quantum circuit with {num_qubits} qubits.") ): diff --git a/tests/hellinger_distance/test_estimated_hellinger_distance.py b/tests/hellinger_distance/test_estimated_hellinger_distance.py index 082acf610..f1e51ec16 100644 --- a/tests/hellinger_distance/test_estimated_hellinger_distance.py +++ b/tests/hellinger_distance/test_estimated_hellinger_distance.py @@ -11,7 +11,6 @@ from __future__ import annotations import re -import sys import warnings from pathlib import Path from typing import TYPE_CHECKING @@ -217,20 +216,14 @@ def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: dump(qc, f) # Generate compiled circuits (using trained RL model) - if sys.platform == "win32": - with pytest.warns(RuntimeWarning, match=re.escape("Timeout is not supported on Windows.")): - ml_predictor.compile_training_circuits( - timeout=600, path_compiled_circuits=target_path, path_uncompiled_circuits=source_path, num_workers=1 - ) - else: - ml_predictor.compile_training_circuits( - timeout=600, path_compiled_circuits=target_path, path_uncompiled_circuits=source_path, num_workers=1 - ) + ml_predictor.compile_training_circuits( + path_uncompiled_circuits=source_path, + path_compiled_circuits=target_path, + timeout=6000, + ) # Generate training data from the compiled circuits - ml_predictor.generate_training_data( - path_uncompiled_circuits=source_path, path_compiled_circuits=target_path, num_workers=1 - ) + ml_predictor.generate_training_data(path_uncompiled_circuits=source_path, path_compiled_circuits=target_path) for file in [ "training_data_estimated_hellinger_distance.npy",