diff --git a/API/Classes/Case/GMPLParser.py b/API/Classes/Case/GMPLParser.py new file mode 100644 index 000000000..019181ec8 --- /dev/null +++ b/API/Classes/Case/GMPLParser.py @@ -0,0 +1,333 @@ +""" +Phase 1 — Pure GMPL syntax extraction. + +Parses a GMPL data file (.txt / .dat) into structured objects without +semantic interpretation. Every ``set`` and ``param`` declaration is +captured, including multi-slice blocks, headerless tables, and empty +param bodies. + +Public API +---------- + GMPLParser.parse_file(path) → GMPLParseResult + GMPLParser.parse_string(text) → GMPLParseResult +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional, Union + +# ──────────────────────────────────────────────────────────── +# Data structures +# ──────────────────────────────────────────────────────────── + +@dataclass +class RowEntry: + """One data row: a key string followed by numeric values.""" + key: str + values: list[Union[int, float]] + + +@dataclass +class SliceBlock: + """ + One slice of a ``param`` declaration. + + * ``header`` – the square-bracket header tokens, e.g. ``["RE1","*","*"]``. + ``None`` for headerless tables. + * ``column_labels`` – the column names after ``:`` on the header line. + * ``rows`` – the data rows. 
+ """ + header: Optional[list[str]] = None + column_labels: list[str] = field(default_factory=list) + rows: list[RowEntry] = field(default_factory=list) + + +@dataclass +class ParsedParam: + """A complete ``param`` declaration with its name, default, and slices.""" + name: str + default: Optional[Union[int, float]] = None + slices: list[SliceBlock] = field(default_factory=list) + + +@dataclass +class GMPLParseResult: + """Bag holding every ``set`` and ``param`` extracted from a GMPL file.""" + sets: dict[str, list[str]] = field(default_factory=dict) + params: list[ParsedParam] = field(default_factory=list) + + # convenience + def param_names(self) -> list[str]: + return [p.name for p in self.params] + + def summary(self) -> str: + lines = [f"Sets : {len(self.sets)}"] + for sn, sv in self.sets.items(): + lines.append(f" {sn} ({len(sv)}) : {sv[:6]}{'...' if len(sv) > 6 else ''}") + lines.append(f"Params: {len(self.params)}") + for p in self.params: + total_rows = sum(len(s.rows) for s in p.slices) + lines.append(f" {p.name} (default={p.default}, slices={len(p.slices)}, rows={total_rows})") + return "\n".join(lines) + + +# ──────────────────────────────────────────────────────────── +# Tokeniser helpers +# ──────────────────────────────────────────────────────────── + +_COMMENT_RE = re.compile(r"#.*") + +def _strip_comments(text: str) -> str: + return _COMMENT_RE.sub("", text) + +def _tokenise(text: str) -> list[str]: + """Split GMPL text into semicolon-terminated statements.""" + clean = _strip_comments(text) + parts = clean.split(";") + return [p.strip() for p in parts if p.strip()] + +def _try_number(s: str) -> Union[int, float, str]: + """Try to cast *s* to int or float; fall back to the original string.""" + try: + v = float(s) + return int(v) if v == int(v) else v + except (ValueError, OverflowError): + return s + + +# ──────────────────────────────────────────────────────────── +# Parser +# ──────────────────────────────────────────────────────────── + +class 
GMPLParser: + """ + Pure-syntax GMPL parser. + + Usage:: + + result = GMPLParser.parse_file("utopia.txt") + print(result.summary()) + """ + + # ── public ────────────────────────────────────────────── + @staticmethod + def parse_file(path: str | Path) -> GMPLParseResult: + """Parse a ``.txt`` / ``.dat`` GMPL file and return structured result.""" + text = Path(path).read_text(encoding="utf-8", errors="replace") + return GMPLParser.parse_string(text) + + @staticmethod + def parse_string(text: str) -> GMPLParseResult: + """Parse raw GMPL text and return structured result.""" + result = GMPLParseResult() + stmts = _tokenise(text) + for stmt in stmts: + first = stmt.split()[0].lower() if stmt.split() else "" + if first == "end": + break + if first == "set": + GMPLParser._parse_set(stmt, result) + elif first == "param": + GMPLParser._parse_param(stmt, result) + return result + + # ── set ───────────────────────────────────────────────── + @staticmethod + def _parse_set(stmt: str, result: GMPLParseResult) -> None: + tokens = stmt.split() + name = tokens[1] + # find := position + body = "" + for i, t in enumerate(tokens): + if ":=" in t: + # Handle glued tokens like "YEAR:=" + after = t.split(":=", 1)[1] + rest = tokens[i + 1 :] + body = (after + " " + " ".join(rest)).strip() + break + members = [m for m in body.split() if m] + result.sets[name] = members + + # ── param ────────────────────────────────────────────── + @staticmethod + def _parse_param(stmt: str, result: GMPLParseResult) -> None: + tokens = stmt.split() + name = tokens[1] + + # extract default + default_val: Optional[Union[int, float]] = None + for i, t in enumerate(tokens): + if t.lower() == "default": + dv = _try_number(tokens[i + 1]) + if isinstance(dv, (int, float)): + default_val = dv + break + + # find := position + assign_pos = None + for i, t in enumerate(tokens): + if ":=" in t: + assign_pos = i + break + + if assign_pos is None: + # declaration only, no data + 
result.params.append(ParsedParam(name=name, default=default_val)) + return + + # Rejoin everything after := (handle glued tokens) + glued_after = tokens[assign_pos].split(":=", 1)[1] + body_tokens = ([glued_after] if glued_after else []) + tokens[assign_pos + 1 :] + body = " ".join(body_tokens).strip() + + if not body: + result.params.append(ParsedParam(name=name, default=default_val)) + return + + # Split into slices by `[` headers + slices = GMPLParser._split_slices(body) + parsed = ParsedParam(name=name, default=default_val) + for sl in slices: + parsed.slices.append(GMPLParser._parse_slice_block(sl)) + result.params.append(parsed) + + @staticmethod + def _split_slices(body: str) -> list[str]: + """Split param body into per-slice strings.""" + # Find all '[' positions + bracket_positions = [m.start() for m in re.finditer(r"\[", body)] + if not bracket_positions: + return [body] + + result = [] + # Anything before first bracket is a headerless slice + prefix = body[: bracket_positions[0]].strip() + if prefix: + result.append(prefix) + + for i, pos in enumerate(bracket_positions): + end = bracket_positions[i + 1] if i + 1 < len(bracket_positions) else len(body) + result.append(body[pos:end].strip()) + + return result + + @staticmethod + def _parse_slice_block(text: str) -> SliceBlock: + """Parse one slice block into a SliceBlock object.""" + block = SliceBlock() + + # Extract header if present + if text.startswith("["): + bracket_end = text.index("]") + header_str = text[1:bracket_end] + block.header = [h.strip() for h in header_str.split(",")] + text = text[bracket_end + 1 :].strip() + + # Look for colon separator (column labels) + if ":" in text: + parts = text.split(":") + # Column labels are between first and second ':' + if len(parts) >= 3: + # header : col1 col2 ... 
: \n row data + col_part = parts[1].strip() + block.column_labels = col_part.split() + # Rejoin remaining for rows + row_text = ":".join(parts[2:]).strip() + # Handle `:=` at start of row_text + if row_text.startswith("="): + row_text = row_text[1:].strip() + elif len(parts) == 2: + # Might be `key : val` pairs or `:=` continuation + left = parts[0].strip() + right = parts[1].strip() + if right.startswith("="): + # it's `:=` continuation + row_text = right[1:].strip() + # If left has column labels + col_tokens = left.split() + if col_tokens: + block.column_labels = col_tokens + else: + row_text = text + else: + row_text = text + else: + row_text = text + + # Parse rows + if row_text: + GMPLParser._parse_rows(row_text, block) + + return block + + @staticmethod + def _parse_rows(text: str, block: SliceBlock) -> None: + """Parse row data into RowEntry objects.""" + # Tokenize by whitespace + tokens = text.split() + if not tokens: + return + + n_cols = len(block.column_labels) if block.column_labels else 0 + + if n_cols > 0: + # Table format: key val1 val2 ... 
valN + i = 0 + while i < len(tokens): + key = tokens[i] + i += 1 + vals = [] + while len(vals) < n_cols and i < len(tokens): + v = _try_number(tokens[i]) + if isinstance(v, (int, float)): + vals.append(v) + i += 1 + else: + break + if vals: + block.rows.append(RowEntry(key=key, values=vals)) + else: + # Headerless: key value pairs or single values + i = 0 + while i < len(tokens): + key = tokens[i] + i += 1 + vals = [] + while i < len(tokens): + v = _try_number(tokens[i]) + if isinstance(v, (int, float)): + vals.append(v) + i += 1 + else: + break + if vals: + block.rows.append(RowEntry(key=key, values=vals)) + + +# ──────────────────────────────────────────────────────────── +# CLI entry point +# ──────────────────────────────────────────────────────────── + +if __name__ == "__main__": + import sys + + if len(sys.argv) < 2: + print("Usage: python GMPLParser.py ") + sys.exit(1) + + result = GMPLParser.parse_file(sys.argv[1]) + print(result.summary()) + print("\n" + "=" * 60 + "\n") + + for p in result.params[:10]: + print(f"\nparam {p.name} (default={p.default}):") + for si, s in enumerate(p.slices): + print(f" slice[{si}]: header={s.header}") + print(f" columns: {s.column_labels}") + for r in s.rows[:3]: + print(f" {r}") + if len(s.rows) > 3: + print(f" ... ({len(s.rows)} rows total)") diff --git a/API/Classes/Case/MuioTransformer.py b/API/Classes/Case/MuioTransformer.py new file mode 100644 index 000000000..288ef5582 --- /dev/null +++ b/API/Classes/Case/MuioTransformer.py @@ -0,0 +1,402 @@ +""" +Phase 3 — MUIO Transformer. + +Converts normalised GMPL tuples into the exact JSON structures +consumed by MUIO's ``CaseClass`` and UI grids. 
+ +Public API +---------- + MuioTransformer.transform(parsed, normalized) → dict[str, dict] +""" + +from __future__ import annotations + +import json +from typing import Optional, Union + +from Classes.Case.GMPLParser import GMPLParseResult +from Classes.Case.SliceInterpreter import SliceInterpreter + +# ──────────────────────────────────────────────────────────── +# Constants +# ──────────────────────────────────────────────────────────── + +_SET_ALIASES: dict[str, str] = { + "FUEL": "COMMODITY", +} + +_MUIO_ONLY_SETS: list[str] = [ + "STORAGEINTRADAY", + "STORAGEINTRAYEAR", + "UDC", +] + +_ID_PREFIXES: dict[str, str] = { + "TECHNOLOGY": "T", + "COMMODITY": "C", + "FUEL": "C", # FUEL → COMMODITY alias + "EMISSION": "E", + "STORAGE": "S", + "TIMESLICE": "Ts", + "SEASON": "SE", + "DAYTYPE": "DT", + "DAILYTIMEBRACKET": "DTB", +} + +# ──────────────────────────────────────────────────────────── +# Parameter mapping registry (param_name → file group + key + dims) +# ──────────────────────────────────────────────────────────── + +PARAM_MAPPING: dict[str, dict] = { + # ── R ── + "DiscountRate": {"file": "R", "key": "DR", "dims": ["region"]}, + "DepreciationMethod": {"file": "R", "key": "DM", "dims": ["region"]}, + # ── RY ── + "AccumulatedAnnualDemand": {"file": "RY", "key": "AAD", "dims": ["region", "commodity", "year"]}, + "SpecifiedAnnualDemand": {"file": "RY", "key": "SAD", "dims": ["region", "commodity", "year"]}, + "REMinProductionTarget": {"file": "RY", "key": "REPT", "dims": ["region", "year"]}, + # ── RT ── + "OperationalLife": {"file": "RT", "key": "OL", "dims": ["region", "technology"]}, + "CapacityToActivityUnit": {"file": "RT", "key": "CAU", "dims": ["region", "technology"]}, + "TotalTechnologyModelPeriodActivityUpperLimit": {"file": "RT", "key": "TTMPAU", "dims": ["region", "technology"]}, + "TotalTechnologyModelPeriodActivityLowerLimit": {"file": "RT", "key": "TTMPAL", "dims": ["region", "technology"]}, + "DiscountRateIdv": {"file": "RT", "key": "DRI", 
"dims": ["region", "technology"]}, + "DiscountRateTech": {"file": "RT", "key": "DRT", "dims": ["region", "technology"]}, + # ── RE ── + "AnnualExogenousEmission": {"file": "RE", "key": "AEE", "dims": ["region", "emission"]}, + "ModelPeriodExogenousEmission": {"file": "RE", "key": "MPEE", "dims": ["region", "emission"]}, + # ── RS ── + "OperationalLifeStorage": {"file": "RS", "key": "OLS", "dims": ["region", "storage"]}, + "DiscountRateStorage": {"file": "RS", "key": "DRS", "dims": ["region", "storage"]}, + "MinStorageCharge": {"file": "RS", "key": "MSC", "dims": ["region", "storage"]}, + "StorageMaxChargeRate": {"file": "RS", "key": "SMCR", "dims": ["region", "storage"]}, + "StorageMaxDischargeRate": {"file": "RS", "key": "SMDR", "dims": ["region", "storage"]}, + # ── RYT ── + "CapitalCost": {"file": "RYT", "key": "CC", "dims": ["region", "technology", "year"]}, + "FixedCost": {"file": "RYT", "key": "FC", "dims": ["region", "technology", "year"]}, + "VariableCost": {"file": "RYT", "key": "VC", "dims": ["region", "technology", "year"]}, + "ResidualCapacity": {"file": "RYT", "key": "RC", "dims": ["region", "technology", "year"]}, + "TotalAnnualMaxCapacity": {"file": "RYT", "key": "TAMC", "dims": ["region", "technology", "year"]}, + "TotalAnnualMinCapacity": {"file": "RYT", "key": "TAMiC", "dims": ["region", "technology", "year"]}, + "TotalAnnualMaxCapacityInvestment": {"file": "RYT", "key": "TAMCI", "dims": ["region", "technology", "year"]}, + "TotalAnnualMinCapacityInvestment": {"file": "RYT", "key": "TAMiCI", "dims": ["region", "technology", "year"]}, + "TotalTechnologyAnnualActivityUpperLimit": {"file": "RYT", "key": "TTAAUL", "dims": ["region", "technology", "year"]}, + "TotalTechnologyAnnualActivityLowerLimit": {"file": "RYT", "key": "TTAALL", "dims": ["region", "technology", "year"]}, + "AvailabilityFactor": {"file": "RYT", "key": "AF", "dims": ["region", "technology", "year"]}, + "RETagTechnology": {"file": "RYT", "key": "RETT", "dims": ["region", 
"technology", "year"]}, + "NumberOfNewTechnologyUnits": {"file": "RYT", "key": "NONTU", "dims": ["region", "technology", "year"]}, + "CapacityOfOneTechnologyUnit": {"file": "RYT", "key": "COOTU", "dims": ["region", "technology", "year"]}, + # ── RYC ── + "RETagFuel": {"file": "RYC", "key": "RETF", "dims": ["region", "commodity", "year"]}, + # ── RYE ── + "AnnualEmissionLimit": {"file": "RYE", "key": "AEL", "dims": ["region", "emission", "year"]}, + "EmissionsPenalty": {"file": "RYE", "key": "EP", "dims": ["region", "emission", "year"]}, + "ModelPeriodEmissionLimit": {"file": "RYE", "key": "MPEL", "dims": ["region", "emission"]}, + # ── RYS ── + "CapitalCostStorage": {"file": "RYS", "key": "CCS", "dims": ["region", "storage", "year"]}, + "ResidualStorageCapacity": {"file": "RYS", "key": "RSC", "dims": ["region", "storage", "year"]}, + # ── RYCn (constraints — not in UTOPIA but in schema) ── + # ── RYTs ── + "YearSplit": {"file": "RYTs", "key": "YS", "dims": ["region", "timeslice", "year"]}, + # ── RYSeDt ── + "DaysInDayType": {"file": "RYSeDt","key": "DDT", "dims": ["region", "season", "daytype", "year"]}, + # ── RYDtb ── + "DaySplit": {"file": "RYDtb", "key": "DS", "dims": ["region", "dailytimebracket", "year"]}, + # ── RYTTs ── + "CapacityFactor": {"file": "RYTTs", "key": "CF", "dims": ["region", "technology", "timeslice", "year"]}, + "SpecifiedDemandProfile": {"file": "RYCTs", "key": "SDP", "dims": ["region", "commodity", "timeslice", "year"]}, + # ── RYTM ── + "InputActivityRatio": {"file": "RYTCM", "key": "IAR", "dims": ["region", "technology", "commodity", "mode", "year"]}, + "OutputActivityRatio": {"file": "RYTCM", "key": "OAR", "dims": ["region", "technology", "commodity", "mode", "year"]}, + # ── RYTEM ── + "EmissionActivityRatio": {"file": "RYTE", "key": "EAR", "dims": ["region", "technology", "emission", "mode", "year"]}, + # ── RTSM (storage links) ── + "TechnologyToStorage": {"file": "RTSM", "key": "TTS", "dims": ["region", "technology", "storage", 
"mode"]}, + "TechnologyFromStorage": {"file": "RTSM", "key": "TFS", "dims": ["region", "technology", "storage", "mode"]}, + # ── Conversions ── + "Conversionls": {"file": "RYTs", "key": "CLS", "dims": ["region", "timeslice", "season"]}, + "Conversionld": {"file": "RYTs", "key": "CLD", "dims": ["region", "timeslice", "daytype"]}, + "Conversionlh": {"file": "RYTs", "key": "CLH", "dims": ["region", "timeslice", "dailytimebracket"]}, +} + +# ──────────────────────────────────────────────────────────── +# ID generation +# ──────────────────────────────────────────────────────────── + +def _generate_id_map(set_name: str, members: list[str]) -> dict[str, str]: + """Sorted, deterministic ID mapping. MODE_OF_OPERATION keeps raw strings.""" + if set_name == "MODE_OF_OPERATION": + return {m: m for m in members} + prefix = _ID_PREFIXES.get(set_name) + if prefix is None: + return {m: m for m in members} + sorted_members = sorted(members) + return {m: f"{prefix}_{i}" for i, m in enumerate(sorted_members)} + + +# ──────────────────────────────────────────────────────────── +# Builder helpers +# ──────────────────────────────────────────────────────────── + +def _map_id(raw_val: str, set_name: str, id_maps: dict[str, dict[str, str]]) -> str: + """Look up the MUIO ID for a raw value, trying aliases.""" + m = id_maps.get(set_name, {}) + if raw_val in m: + return m[raw_val] + # Try alias + alias = _SET_ALIASES.get(set_name, set_name) + m2 = id_maps.get(alias, {}) + return m2.get(raw_val, raw_val) + + +def _dim_to_set(dim: str) -> str: + """Map a dimension label to its set name.""" + return { + "region": "REGION", + "technology": "TECHNOLOGY", + "commodity": "COMMODITY", + "fuel": "COMMODITY", + "emission": "EMISSION", + "storage": "STORAGE", + "mode": "MODE_OF_OPERATION", + "year": "YEAR", + "timeslice": "TIMESLICE", + "season": "SEASON", + "daytype": "DAYTYPE", + "dailytimebracket": "DAILYTIMEBRACKET", + }.get(dim, dim.upper()) + + +def _dim_to_field(dim: str) -> str: + """Map a 
dimension label to its MUIO JSON field name.""" + return { + "region": "RegId", + "technology": "TechId", + "commodity": "CommId", + "fuel": "CommId", + "emission": "EmisId", + "storage": "StgId", + "mode": "MoId", + "year": "Year", + "timeslice": "TsId", + "season": "SeId", + "daytype": "DtId", + "dailytimebracket": "DtbId", + }.get(dim, dim) + + +# ──────────────────────────────────────────────────────────── +# Transformer +# ──────────────────────────────────────────────────────────── + +class MuioTransformer: + """ + Converts normalised GMPL tuples into MUIO JSON file group dicts. + + Usage:: + + parsed = GMPLParser.parse_file("utopia.txt") + normalized = SliceInterpreter().interpret(parsed) + result = MuioTransformer.transform(parsed, normalized) + + Returns a dict keyed by file group name (``"genData"``, ``"RYT"``, + ``"RYTCM"``, etc.) where each value is a dict matching the + JSON structure written by ``CaseClass.createCase()``. + """ + + @staticmethod + def transform( + parsed: GMPLParseResult, + normalized: dict[str, dict[tuple, Union[int, float]]], + ) -> dict[str, dict]: + """ + Build the full bag of MUIO JSON dicts. + + Parameters + ---------- + parsed : GMPLParseResult + Raw parse output (needed for set definitions). + normalized : dict + ``{param_name: {tuple: value}}`` from ``SliceInterpreter``. + + Returns + ------- + dict[str, dict] + Keys are file group names; values are MUIO-format dicts. + """ + # 1. Normalise sets + sets = MuioTransformer._normalise_sets(parsed.sets) + + # 2. ID maps + id_maps: dict[str, dict[str, str]] = {} + for sn, members in sets.items(): + id_maps[sn] = _generate_id_map(sn, members) + + # 3. Build genData + gen = MuioTransformer._build_gen_data(sets, id_maps) + + # 4. 
Group params by file group + file_groups: dict[str, dict[str, dict]] = {} + for param_name, data in normalized.items(): + mapping = PARAM_MAPPING.get(param_name) + if mapping is None: + continue + fg = mapping["file"] + key = mapping["key"] + dims = mapping["dims"] + + records = MuioTransformer._build_long_records(dims, data, id_maps) + if fg not in file_groups: + file_groups[fg] = {} + file_groups[fg][key] = {"SC_0": records} + + # 5. Ensure all standard file groups exist (even if empty) + for fg_name in _all_file_groups(): + if fg_name not in file_groups: + file_groups[fg_name] = {} + + # 6. Package + result = {"genData": gen} + result.update(file_groups) + return result + + # ── set normalisation ─────────────────────────────────── + @staticmethod + def _normalise_sets(raw_sets: dict[str, list[str]]) -> dict[str, list[str]]: + """Rename FUEL→COMMODITY, inject MUIO-only empty sets.""" + out: dict[str, list[str]] = {} + for name, members in raw_sets.items(): + canonical = _SET_ALIASES.get(name, name) + if canonical in out: + # merge + existing = set(out[canonical]) + existing.update(members) + out[canonical] = sorted(existing) + else: + out[canonical] = list(members) + for extra in _MUIO_ONLY_SETS: + if extra not in out: + out[extra] = [] + return out + + # ── genData builder ───────────────────────────────────── + @staticmethod + def _build_gen_data( + sets: dict[str, list[str]], + id_maps: dict[str, dict[str, str]], + ) -> dict: + years = sorted(sets.get("YEAR", [])) + techs = sets.get("TECHNOLOGY", []) + comms = sets.get("COMMODITY", []) + emis = sets.get("EMISSION", []) + stgs = sets.get("STORAGE", []) + tss = sets.get("TIMESLICE", []) + ses = sets.get("SEASON", []) + dts = sets.get("DAYTYPE", []) + dtbs = sets.get("DAILYTIMEBRACKET", []) + mos = sets.get("MODE_OF_OPERATION", []) + + tech_map = id_maps.get("TECHNOLOGY", {}) + comm_map = id_maps.get("COMMODITY", {}) + emis_map = id_maps.get("EMISSION", {}) + stg_map = id_maps.get("STORAGE", {}) + ts_map = 
id_maps.get("TIMESLICE", {}) + se_map = id_maps.get("SEASON", {}) + dt_map = id_maps.get("DAYTYPE", {}) + dtb_map = id_maps.get("DAILYTIMEBRACKET", {}) + + gen: dict = { + "osy-scenarios": [{"ScenarioId": "SC_0"}], + "osy-years": years, + "osy-mo": str(len(mos)), + "osy-tech": [{"TechId": tech_map[t], "TechName": t} for t in sorted(techs)], + "osy-comm": [{"CommId": comm_map[c], "CommName": c} for c in sorted(comms)], + "osy-emis": [{"EmisId": emis_map[e], "EmisName": e} for e in sorted(emis)], + "osy-stg": [{"StgId": stg_map[s], "StgName": s} for s in sorted(stgs)], + "osy-ts": [{"TsId": ts_map[t], "TsName": t} for t in sorted(tss)], + "osy-se": [{"SeId": se_map[s], "SeName": s} for s in sorted(ses)], + "osy-dt": [{"DtId": dt_map[d], "DtName": d} for d in sorted(dts)], + "osy-dtb": [{"DtbId": dtb_map[d], "DtbName": d} for d in sorted(dtbs)], + "osy-constraints": [], + } + return gen + + # ── record builder ────────────────────────────────────── + @staticmethod + def _build_long_records( + dims: list[str], + data: dict[tuple, Union[int, float]], + id_maps: dict[str, dict[str, str]], + ) -> list[dict]: + """Build long-form records {Field: ID, ..., Value: num}.""" + records = [] + for tup, val in data.items(): + rec: dict = {} + for di, dim in enumerate(dims): + if di < len(tup): + raw = tup[di] + set_name = _dim_to_set(dim) + field_name = _dim_to_field(dim) + + if dim == "year": + rec[field_name] = str(raw) + elif dim == "mode": + rec[field_name] = str(raw) + else: + rec[field_name] = _map_id(raw, set_name, id_maps) + rec["Value"] = val + records.append(rec) + + # Sort for deterministic output + records.sort(key=lambda r: tuple(str(v) for v in r.values())) + return records + + +# ──────────────────────────────────────────────────────────── +# Standard file groups +# ──────────────────────────────────────────────────────────── + +def _all_file_groups() -> list[str]: + return [ + "R", "RY", "RT", "RE", "RS", + "RYT", "RYC", "RYE", "RYS", + "RYCn", "RYTs", "RYSeDt", 
"RYDtb", + "RYTTs", "RYCTs", + "RYTM", "RYTCM", + "RYTE", "RYTEM", + "RYTSM", "RTSM", + "RYTCn", + "RYTTs", + "RYCTs", + ] + + +# ──────────────────────────────────────────────────────────── +# CLI entry point +# ──────────────────────────────────────────────────────────── + +if __name__ == "__main__": + import sys + from Classes.Case.GMPLParser import GMPLParser + + if len(sys.argv) < 2: + print("Usage: python MuioTransformer.py ") + sys.exit(1) + + parsed = GMPLParser.parse_file(sys.argv[1]) + normalized = SliceInterpreter().interpret(parsed) + result = MuioTransformer.transform(parsed, normalized) + + print(f"\nGenerated {len(result)} file groups:") + for name, data in sorted(result.items()): + if name == "genData": + continue + keys = list(data.keys()) + if keys: + print(f" {name}.json — keys: {keys[:8]}{'...' if len(keys) > 8 else ''}") + + # Print sample + for sample_key in ["RYT", "RYTCM", "RT"]: + if sample_key in result and result[sample_key]: + print(f"\n── {sample_key}.json (excerpt) ──") + print(json.dumps(result[sample_key], indent=2, default=str)[:1000]) diff --git a/API/Classes/Case/SliceInterpreter.py b/API/Classes/Case/SliceInterpreter.py new file mode 100644 index 000000000..704b065ca --- /dev/null +++ b/API/Classes/Case/SliceInterpreter.py @@ -0,0 +1,494 @@ +""" +Phase 2 — Semantic interpreter. + +Expands raw GMPL parse results into normalised +``{param_name: {tuple: value}}`` dictionaries. + +Each tuple key encodes the full dimension coordinates, +e.g. ``("UTOPIA", "E01", "1990")`` for a 3-D parameter. 
+ +Public API +---------- + SliceInterpreter().interpret(parsed) → dict[str, dict[tuple, number]] +""" + +from __future__ import annotations + +from typing import Optional, Union + +from Classes.Case.GMPLParser import GMPLParseResult, SliceBlock + +# ──────────────────────────────────────────────────────────── +# Dimension registry (param name → ordered dimension names) +# ──────────────────────────────────────────────────────────── + +_DIM_REGISTRY: dict[str, list[str]] = { + # R + "DiscountRate": ["REGION"], + "DepreciationMethod": ["REGION"], + # RT + "OperationalLife": ["REGION", "TECHNOLOGY"], + "CapacityToActivityUnit": ["REGION", "TECHNOLOGY"], + "TotalTechnologyModelPeriodActivityUpperLimit": ["REGION", "TECHNOLOGY"], + "TotalTechnologyModelPeriodActivityLowerLimit": ["REGION", "TECHNOLOGY"], + "DiscountRateIdv": ["REGION", "TECHNOLOGY"], + "DiscountRateTech": ["REGION", "TECHNOLOGY"], + # RE + "AnnualExogenousEmission": ["REGION", "EMISSION"], + "ModelPeriodExogenousEmission": ["REGION", "EMISSION"], + # RS + "OperationalLifeStorage": ["REGION", "STORAGE"], + "DiscountRateStorage": ["REGION", "STORAGE"], + "MinStorageCharge": ["REGION", "STORAGE"], + "StorageMaxChargeRate": ["REGION", "STORAGE"], + "StorageMaxDischargeRate": ["REGION", "STORAGE"], + # RY + "AccumulatedAnnualDemand": ["REGION", "FUEL", "YEAR"], + "SpecifiedAnnualDemand": ["REGION", "FUEL", "YEAR"], + "TradeRoute": ["REGION", "REGION", "FUEL", "YEAR"], + "REMinProductionTarget": ["REGION", "YEAR"], + # RYT + "CapitalCost": ["REGION", "TECHNOLOGY", "YEAR"], + "FixedCost": ["REGION", "TECHNOLOGY", "YEAR"], + "VariableCost": ["REGION", "TECHNOLOGY", "YEAR"], + "ResidualCapacity": ["REGION", "TECHNOLOGY", "YEAR"], + "TotalAnnualMaxCapacity": ["REGION", "TECHNOLOGY", "YEAR"], + "TotalAnnualMinCapacity": ["REGION", "TECHNOLOGY", "YEAR"], + "TotalAnnualMaxCapacityInvestment": ["REGION", "TECHNOLOGY", "YEAR"], + "TotalAnnualMinCapacityInvestment": ["REGION", "TECHNOLOGY", "YEAR"], + 
"TotalTechnologyAnnualActivityUpperLimit": ["REGION", "TECHNOLOGY", "YEAR"], + "TotalTechnologyAnnualActivityLowerLimit": ["REGION", "TECHNOLOGY", "YEAR"], + "AvailabilityFactor": ["REGION", "TECHNOLOGY", "YEAR"], + "RETagTechnology": ["REGION", "TECHNOLOGY", "YEAR"], + "NumberOfNewTechnologyUnits": ["REGION", "TECHNOLOGY", "YEAR"], + "CapacityOfOneTechnologyUnit": ["REGION", "TECHNOLOGY", "YEAR"], + # RYC + "RETagFuel": ["REGION", "FUEL", "YEAR"], + # RYE + "AnnualEmissionLimit": ["REGION", "EMISSION", "YEAR"], + "EmissionsPenalty": ["REGION", "EMISSION", "YEAR"], + "ModelPeriodEmissionLimit": ["REGION", "EMISSION"], + # RYS + "CapitalCostStorage": ["REGION", "STORAGE", "YEAR"], + "ResidualStorageCapacity": ["REGION", "STORAGE", "YEAR"], + # RYTs + "YearSplit": ["REGION", "TIMESLICE", "YEAR"], + # RYSeDt + "DaysInDayType": ["REGION", "SEASON", "DAYTYPE", "YEAR"], + # RYDtb + "DaySplit": ["REGION", "DAILYTIMEBRACKET", "YEAR"], + # RYTTs + "CapacityFactor": ["REGION", "TECHNOLOGY", "TIMESLICE", "YEAR"], + "SpecifiedDemandProfile": ["REGION", "FUEL", "TIMESLICE", "YEAR"], + # RYTM + "InputActivityRatio": ["REGION", "TECHNOLOGY", "FUEL", "MODE_OF_OPERATION", "YEAR"], + "OutputActivityRatio": ["REGION", "TECHNOLOGY", "FUEL", "MODE_OF_OPERATION", "YEAR"], + # RYTEM + "EmissionActivityRatio": ["REGION", "TECHNOLOGY", "EMISSION", "MODE_OF_OPERATION", "YEAR"], + # RTSM (storage links — no year) + # Header order: [Region, Technology, Storage, Mode] (not Storage,Tech) + "TechnologyToStorage": ["REGION", "TECHNOLOGY", "STORAGE", "MODE_OF_OPERATION"], + "TechnologyFromStorage": ["REGION", "TECHNOLOGY", "STORAGE", "MODE_OF_OPERATION"], + # Conversion matrices (some files omit REGION in header) + "Conversionls": ["REGION", "TIMESLICE", "SEASON"], + "Conversionld": ["REGION", "TIMESLICE", "DAYTYPE"], + "Conversionlh": ["REGION", "TIMESLICE", "DAILYTIMEBRACKET"], + # Misc + "ResultsPath": [], +} + + +class SliceInterpreter: + """ + Semantic interpreter for GMPL parse results. 
+ + Expands slice blocks into normalised ``{tuple: value}`` dictionaries. + + Usage:: + + parsed = GMPLParser.parse_file("utopia.txt") + normalized = SliceInterpreter().interpret(parsed) + >>> normalized["CapitalCost"][("UTOPIA","E01","1990")] + 1400 + """ + + def interpret(self, parsed: GMPLParseResult) -> dict[str, dict[tuple, Union[int, float]]]: + """ + Interpret all parameters in *parsed* and return a dict + mapping parameter names to their ``{tuple: value}`` data. + """ + sets = parsed.sets + result: dict[str, dict[tuple, Union[int, float]]] = {} + + for p in parsed.params: + dims = self._get_dims(p.name) + if dims is None: + continue # unknown param — skip + + out: dict[tuple, Union[int, float]] = {} + for block in p.slices: + self._expand_slice(p.name, dims, block, p.default, out, sets) + if out: + # Post-process: pad short tuples with region if needed + out = self._pad_short_tuples(out, dims, sets) + result[p.name] = out + + return result + + # ── dimension lookup ──────────────────────────────────── + def _get_dims(self, param_name: str) -> Optional[list[str]]: + return _DIM_REGISTRY.get(param_name) + + def _pad_short_tuples( + self, + data: dict[tuple, Union[int, float]], + dims: list[str], + sets: dict[str, list[str]], + ) -> dict[tuple, Union[int, float]]: + """Pad tuples shorter than dims by prepending the region.""" + n_dims = len(dims) + if not data: + return data + sample = next(iter(data)) + if len(sample) >= n_dims: + return data + # Try to prepend region + deficit = n_dims - len(sample) + regions = sets.get("REGION", []) + if deficit == 1 and dims[0] == "REGION" and len(regions) == 1: + region = regions[0] + return {(region,) + tup: val for tup, val in data.items()} + return data + + # ── slice expansion ───────────────────────────────────── + def _expand_slice( + self, + param_name: str, + dims: list[str], + block: SliceBlock, + default_val: Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + sets: dict[str, list[str]], + ) -> 
None: + n_dims = len(dims) + header = block.header + + # ── No dimensions (scalar) ── + if n_dims == 0: + return + + # ── Headerless tables ── + if header is None: + self._expand_headerless(dims, block, default_val, out) + return + + # ── Headed tables ── + wildcard_positions = [i for i, h in enumerate(header) if h == "*"] + fixed_positions = {i: header[i] for i in range(len(header)) if header[i] != "*"} + n_header = len(header) + n_wildcards = len(wildcard_positions) + + # Header length > dim count — extra wildcard is column layout + if n_header > n_dims and n_wildcards >= 2: + self._expand_oversized_header( + dims, header, wildcard_positions, fixed_positions, + block, default_val, out, + ) + return + + if n_wildcards == 0: + # All fixed — headerless rows under this header + self._expand_all_fixed(dims, header, block, default_val, out) + elif n_wildcards == 1: + self._expand_single_wildcard(dims, header, wildcard_positions[0], fixed_positions, block, default_val, out) + elif n_wildcards == 2: + self._expand_two_wildcards(dims, header, wildcard_positions, fixed_positions, block, default_val, out) + else: + # 3+ wildcards — best effort + self._expand_multi_wildcard(dims, header, wildcard_positions, fixed_positions, block, default_val, out, sets) + + # ── headerless ────────────────────────────────────────── + def _expand_headerless( + self, + dims: list[str], + block: SliceBlock, + default_val: Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + ) -> None: + for row in block.rows: + key = row.key + if len(row.values) == 1: + val = row.values[0] + if default_val is not None and val == default_val: + continue + # Key might contain multiple dimension values + parts = key.split() + if len(parts) == len(dims) - 1: + tup = tuple(parts) + (str(val),) + # Hmm, this doesn't work for key-value pairs + # Actually for headerless, it's usually `key value` + tup = (key,) + out[tup] = val + else: + out[(key,)] = val + + # ── all fixed header 
──────────────────────────────────── + def _expand_all_fixed( + self, + dims: list[str], + header: list[str], + block: SliceBlock, + default_val: Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + ) -> None: + # e.g. [RE1,Coal,Biomass,1,*] where last * is via column labels + # Actually if n_wildcards == 0, columns represent another dim + prefix = tuple(header) + for row in block.rows: + values = row.values + if block.column_labels: + for ci, col in enumerate(block.column_labels): + if ci < len(values): + val = values[ci] + if default_val is not None and val == default_val: + continue + out[prefix + (row.key, str(col))] = val + else: + if values: + val = values[0] + if default_val is not None and val == default_val: + continue + out[prefix + (row.key,)] = val + + # ── single wildcard ───────────────────────────────────── + def _expand_single_wildcard( + self, + dims: list[str], + header: list[str], + wc_pos: int, + fixed: dict[int, str], + block: SliceBlock, + default_val: Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + ) -> None: + n_dims = len(dims) + # Build mapping from header position to dim position + dim_map = self._map_header_to_dims(header, dims, fixed, [wc_pos]) + + for row in block.rows: + if block.column_labels: + # Row key fills the wildcard, columns are year/other dim + for ci, col in enumerate(block.column_labels): + if ci < len(row.values): + val = row.values[ci] + if default_val is not None and val == default_val: + continue + parts = list(header) + parts[wc_pos] = row.key + # Map to dims, adding column as last dim + tup = self._build_tuple(dims, parts, col, dim_map) + out[tup] = val + else: + # Simple key-value + if row.values: + val = row.values[0] + if default_val is not None and val == default_val: + continue + parts = list(header) + parts[wc_pos] = row.key + tup = tuple(parts[:n_dims]) + out[tup] = val + + # ── two wildcards ─────────────────────────────────────── + def _expand_two_wildcards( + self, + 
dims: list[str], + header: list[str], + wc_positions: list[int], + fixed: dict[int, str], + block: SliceBlock, + default_val: Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + ) -> None: + n_dims = len(dims) + wc0, wc1 = wc_positions[0], wc_positions[1] + + if block.column_labels: + # Row key → wc0 dimension, columns → wc1 dimension + for row in block.rows: + for ci, col in enumerate(block.column_labels): + if ci < len(row.values): + val = row.values[ci] + if default_val is not None and val == default_val: + continue + parts = list(header) + parts[wc0] = row.key + parts[wc1] = str(col) + # Build dimension tuple — header may be shorter than dims + tup = tuple(parts[:n_dims]) if len(parts) >= n_dims else tuple(parts) + out[tup] = val + else: + # No columns — row key + single value + for row in block.rows: + if row.values: + val = row.values[0] + if default_val is not None and val == default_val: + continue + parts = list(header) + parts[wc0] = row.key + tup = tuple(parts[:n_dims]) + out[tup] = val + + # ── oversized header ──────────────────────────────────── + def _expand_oversized_header( + self, + dims: list[str], + header: list[str], + wildcard_positions: list[int], + fixed_positions: dict[int, str], + block: SliceBlock, + default_val: Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + ) -> None: + """Handle headers longer than dimensions (extra * is columns).""" + n_dims = len(dims) + + # Find which fixed values fill which dims + fixed_vals = [header[i] for i in sorted(fixed_positions.keys())] + n_fixed = len(fixed_vals) + n_wc = len(wildcard_positions) + + # The first wildcard(s) fill remaining dims, last wildcard is columns + remaining_dim_slots = n_dims - n_fixed + + if block.column_labels: + for row in block.rows: + for ci, col in enumerate(block.column_labels): + if ci < len(row.values): + val = row.values[ci] + if default_val is not None and val == default_val: + continue + # Build tuple from fixed + row.key + tup = 
tuple(fixed_vals) + (row.key,) if remaining_dim_slots == 1 else tuple(fixed_vals) + # Trim to n_dims + tup_list = list(tup) + while len(tup_list) < n_dims: + tup_list.append(str(col)) + out[tuple(tup_list[:n_dims])] = val + else: + for row in block.rows: + if row.values: + val = row.values[0] + if default_val is not None and val == default_val: + continue + tup = tuple(fixed_vals) + (row.key,) + out[tuple(list(tup)[:n_dims])] = val + + # ── multi wildcard (3+) ───────────────────────────────── + def _expand_multi_wildcard( + self, + dims: list[str], + header: list[str], + wildcard_positions: list[int], + fixed_positions: dict[int, str], + block: SliceBlock, + default_val: Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + sets: dict[str, list[str]], + ) -> None: + """Handle 3+ wildcards — map by set membership.""" + n_dims = len(dims) + # Fall back to treating like 2 wildcards with column + if len(wildcard_positions) >= 2 and block.column_labels: + wc0 = wildcard_positions[0] + wc1 = wildcard_positions[1] + for row in block.rows: + for ci, col in enumerate(block.column_labels): + if ci < len(row.values): + val = row.values[ci] + if default_val is not None and val == default_val: + continue + parts = list(header) + parts[wc0] = row.key + parts[wc1] = str(col) + # Fill remaining wildcards from context + tup = tuple(parts[:n_dims]) + out[tup] = val + + # ── helpers ───────────────────────────────────────────── + def _map_header_to_dims( + self, + header: list[str], + dims: list[str], + fixed: dict[int, str], + wc_positions: list[int], + ) -> dict[int, int]: + """Map header positions to dimension positions (best effort).""" + mapping: dict[int, int] = {} + used_dims: set[int] = set() + + # Fixed positions map first + for hp in sorted(fixed.keys()): + for di in range(len(dims)): + if di not in used_dims: + mapping[hp] = di + used_dims.add(di) + break + + # Wildcards fill remaining + for wp in wc_positions: + for di in range(len(dims)): + if di not in 
used_dims: + mapping[wp] = di + used_dims.add(di) + break + + return mapping + + def _build_tuple( + self, + dims: list[str], + parts: list[str], + col_value: str, + dim_map: dict[int, int], + ) -> tuple: + """Build a dimension tuple from header parts + column value.""" + n_dims = len(dims) + result = [""] * n_dims + + for hp, di in dim_map.items(): + if di < n_dims and hp < len(parts): + result[di] = parts[hp] + + # Fill any remaining empty slot with the column value + for i in range(n_dims): + if not result[i]: + result[i] = str(col_value) + + return tuple(result) + + +# ──────────────────────────────────────────────────────────── +# CLI entry point +# ──────────────────────────────────────────────────────────── + +if __name__ == "__main__": + import sys + from Classes.Case.GMPLParser import GMPLParser + + if len(sys.argv) < 2: + print("Usage: python SliceInterpreter.py ") + sys.exit(1) + + parsed = GMPLParser.parse_file(sys.argv[1]) + normalized = SliceInterpreter().interpret(parsed) + + print(f"Interpreted {len(normalized)} parameters with data.\n") + + for pname in sorted(normalized.keys())[:15]: + n = len(normalized[pname]) + print(f"\n{pname} ({n} tuples):") + for tup, val in list(normalized[pname].items())[:5]: + print(f" {tup} → {val}") + if n > 5: + print(f" ... 
({n} total)")
diff --git a/API/Classes/Case/validate_interpreter.py b/API/Classes/Case/validate_interpreter.py
new file mode 100644
index 000000000..71cfd977a
--- /dev/null
+++ b/API/Classes/Case/validate_interpreter.py
@@ -0,0 +1,85 @@
+"""Phase 2 — Validate SliceInterpreter against UTOPIA and MUIO fixtures."""
+
+from __future__ import annotations
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))  # make the API package importable when run directly
+from Classes.Case.GMPLParser import GMPLParser
+from Classes.Case.SliceInterpreter import SliceInterpreter
+
+_FIXTURES = Path(__file__).resolve().parent / "test_fixtures"
+
+
+def _run() -> None:
+    interp = SliceInterpreter()
+
+    # ── UTOPIA ──
+    print("Phase 2: Interpreting UTOPIA...")
+    u = GMPLParser.parse_file(_FIXTURES / "utopia.txt")
+    n = interp.interpret(u)
+    print(f"  → {len(n)} parameters with data")
+
+    # CapitalCost
+    assert "CapitalCost" in n, "CapitalCost missing"
+    cc = n["CapitalCost"]
+    assert len(cc) == 231, f"Expected 231 CC tuples, got {len(cc)}"  # count pinned to the UTOPIA fixture
+    assert ("UTOPIA", "E01", "1990") in cc
+    assert cc[("UTOPIA", "E01", "1990")] == 1400
+    print(f"  → CapitalCost: {len(cc)} tuples, ('UTOPIA','E01','1990')={cc[('UTOPIA','E01','1990')]}")
+
+    # InputActivityRatio (5D)
+    assert "InputActivityRatio" in n, "IAR missing"
+    iar = n["InputActivityRatio"]
+    assert len(iar) > 0
+    sample_key = next(iter(iar))
+    assert len(sample_key) == 5, f"IAR tuple should be 5D, got {len(sample_key)}D"
+    print(f"  → IAR: {len(iar)} tuples, sample {sample_key}={iar[sample_key]}")
+
+    # Conversionls — now 3D with region
+    assert "Conversionls" in n
+    cls = n["Conversionls"]
+    sample = next(iter(cls))
+    assert len(sample) == 3, f"CLS tuple should be 3D, got {len(sample)}D"  # region padding adds the leading dim
+    assert sample[0] == "UTOPIA", f"CLS region should be UTOPIA, got {sample[0]}"
+    print(f"  → Conversionls: {len(cls)} tuples, sample {sample}")
+
+    # TTS/TFS
+    assert "TechnologyToStorage" in n
+    tts = n["TechnologyToStorage"]
+    sample_tts = next(iter(tts))
+    assert len(sample_tts) == 4, f"TTS should be 4D, got {len(sample_tts)}D"
+    print(f"  → TTS: {tts}")
+
+    # ── MUIO sample ──
+    print("\nPhase 2: Interpreting MUIO sample...")
+    m = GMPLParser.parse_file(_FIXTURES / "muio_sample.txt")
+    nm = interp.interpret(m)
+    print(f"  → {len(nm)} parameters with data")
+
+    # CapitalCost
+    assert "CapitalCost" in nm
+    mcc = nm["CapitalCost"]
+    assert ("RE1", "Coal", "2020") in mcc
+    assert mcc[("RE1", "Coal", "2020")] == 1500
+    print(f"  → CC: {len(mcc)} tuples, ('RE1','Coal','2020')={mcc[('RE1','Coal','2020')]}")
+
+    # CapacityFactor (4D)
+    assert "CapacityFactor" in nm
+    cf = nm["CapacityFactor"]
+    sample_cf = next(iter(cf))
+    assert len(sample_cf) == 4, f"CF should be 4D, got {len(sample_cf)}D"
+    print(f"  → CF: {len(cf)} tuples")
+
+    # MUIO TTS
+    assert "TechnologyToStorage" in nm
+    mtts = nm["TechnologyToStorage"]
+    print(f"  → TTS: {mtts}")
+
+    print()
+    print("=" * 60)
+    print("✅ All Phase 2 validation checks passed!")
+
+
+if __name__ == "__main__":
+    _run()
diff --git a/API/Classes/Case/validate_parser.py b/API/Classes/Case/validate_parser.py
new file mode 100644
index 000000000..ac81a7a91
--- /dev/null
+++ b/API/Classes/Case/validate_parser.py
@@ -0,0 +1,68 @@
+"""Phase 1 — Validate GMPLParser against UTOPIA and MUIO sample fixtures."""
+
+from __future__ import annotations
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))  # make the API package importable when run directly
+from Classes.Case.GMPLParser import GMPLParser
+
+_FIXTURES = Path(__file__).resolve().parent / "test_fixtures"
+
+
+def _run() -> None:
+    # ── UTOPIA ──
+    print("Phase 1: Parsing UTOPIA...")
+    u = GMPLParser.parse_file(_FIXTURES / "utopia.txt")
+    assert len(u.sets) >= 10, f"Expected ≥10 sets, got {len(u.sets)}"
+    assert "TECHNOLOGY" in u.sets
+    assert "YEAR" in u.sets
+    assert len(u.sets["TECHNOLOGY"]) == 21, f"Expected 21 techs, got {len(u.sets['TECHNOLOGY'])}"  # counts pinned to the fixture
+    assert len(u.sets["YEAR"]) == 21
+    assert len(u.params) >= 40, f"Expected ≥40 params, got {len(u.params)}"
+
+    # Check a specific param
+    cc = next((p for p in u.params if p.name == "CapitalCost"), None)
+    assert cc is not None, "CapitalCost not found"
+    assert cc.default == 0
+    assert len(cc.slices) > 0
+    total_rows = sum(len(s.rows) for s in cc.slices)
+    assert total_rows > 0, "CapitalCost has no rows"
+    print(f"  → {len(u.sets)} sets, {len(u.params)} params")
+    print(f"  → CapitalCost: {len(cc.slices)} slices, {total_rows} rows")
+
+    # Check set membership
+    assert "E01" in u.sets["TECHNOLOGY"]
+    assert "1990" in u.sets["YEAR"]
+
+    # ── MUIO sample ──
+    print("\nPhase 1: Parsing MUIO sample...")
+    m = GMPLParser.parse_file(_FIXTURES / "muio_sample.txt")
+    assert len(m.sets) >= 9, f"Expected ≥9 sets, got {len(m.sets)}"
+    assert "COMMODITY" in m.sets, "Expected COMMODITY (not FUEL)"  # MUIO naming differs from classic OSeMOSYS fixtures
+    assert "Coal" in m.sets["TECHNOLOGY"]
+    assert len(m.sets["YEAR"]) == 3
+
+    mcc = next((p for p in m.params if p.name == "CapitalCost"), None)
+    assert mcc is not None
+    print(f"  → {len(m.sets)} sets, {len(m.params)} params")
+
+    # Check storage params parsed
+    tts = next((p for p in m.params if p.name == "TechnologyToStorage"), None)
+    assert tts is not None, "TechnologyToStorage not found"
+    assert len(tts.slices) > 0
+    print(f"  → TechnologyToStorage: {len(tts.slices)} slices")
+
+    # Check conversion matrix
+    cls = next((p for p in m.params if p.name == "Conversionls"), None)
+    assert cls is not None, "Conversionls not found"
+    assert len(cls.slices) > 0
+    print(f"  → Conversionls: {len(cls.slices)} slices")
+
+    print()
+    print("=" * 60)
+    print("✅ All Phase 1 validation checks passed!")
+
+
+if __name__ == "__main__":
+    _run()
diff --git a/API/Classes/Case/validate_transformer.py b/API/Classes/Case/validate_transformer.py
new file mode 100644
index 000000000..b4ca22018
--- /dev/null
+++ b/API/Classes/Case/validate_transformer.py
@@ -0,0 +1,103 @@
+"""Phase 3 — Validate MuioTransformer against UTOPIA and MUIO fixtures."""
+
+from __future__ import annotations
+import sys
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent))  # make the API package importable when run directly
+from Classes.Case.GMPLParser import GMPLParser
+from Classes.Case.SliceInterpreter import SliceInterpreter
+from Classes.Case.MuioTransformer import MuioTransformer
+
+_FIXTURES = Path(__file__).resolve().parent / "test_fixtures"
+
+
+def _validate_fixture(label: str, path: Path) -> dict:
+    print(f"\nPhase 3: Parsing & transforming {label}...")
+    parsed = GMPLParser.parse_file(path)
+    normalized = SliceInterpreter().interpret(parsed)
+    result = MuioTransformer.transform(parsed, normalized)
+
+    # Count file groups
+    file_groups = [k for k in result if k != "genData"]  # every key except genData is a file group
+    print(f"  → {len(file_groups)} file groups")
+
+    # genData checks
+    gen = result["genData"]
+    assert "osy-scenarios" in gen
+    assert gen["osy-scenarios"][0]["ScenarioId"] == "SC_0"
+    assert len(gen["osy-years"]) > 0
+    assert len(gen["osy-tech"]) > 0
+    assert "TechId" in gen["osy-tech"][0]
+    assert gen["osy-tech"][0]["TechId"].startswith("T_")
+    print(f"  → genData: {len(gen['osy-tech'])} techs, {len(gen['osy-years'])} years")
+
+    # SC_0 envelope on all data
+    for fg_name in file_groups:
+        for key, val in result[fg_name].items():
+            assert "SC_0" in val, f"{fg_name}.{key} missing SC_0 envelope"
+
+    # Check CapitalCost exists in RYT
+    if "RYT" in result and "CC" in result["RYT"]:
+        cc = result["RYT"]["CC"]["SC_0"]
+        assert len(cc) > 0
+        rec = cc[0]
+        assert "TechId" in rec
+        assert "Year" in rec
+        assert "Value" in rec
+        print(f"    CapitalCost: {len(cc)} long-form records")
+        for r in cc[:3]:
+            print(f"      {r}")
+
+    return result
+
+
+def _run() -> None:
+    # ── UTOPIA ──
+    r1 = _validate_fixture("UTOPIA", _FIXTURES / "utopia.txt")
+
+    # Specific UTOPIA checks
+    assert len(r1["RYT"]["CC"]["SC_0"]) == 231  # must match the Phase 2 tuple count
+    assert r1["RYT"]["CC"]["SC_0"][0]["TechId"].startswith("T_")
+
+    # RTSM
+    if "RTSM" in r1 and r1["RTSM"]:
+        tts = r1["RTSM"]["TTS"]["SC_0"]
+        assert len(tts) == 1
+        assert "StgId" in tts[0]
+        assert tts[0]["StgId"].startswith("S_")
+        assert tts[0]["TechId"].startswith("T_")
+        print(f"    RTSM.TTS: {tts[0]}")
+
+    # Determinism
+    p1 = GMPLParser.parse_file(_FIXTURES / "utopia.txt")
+    n1 = SliceInterpreter().interpret(p1)
+    r1b = MuioTransformer.transform(p1, n1)
+    import json
+    j1 = json.dumps(r1, sort_keys=True, default=str)  # default=str stringifies non-JSON types for comparison only
+    j2 = json.dumps(r1b, sort_keys=True, default=str)
+    assert j1 == j2, "Output not deterministic!"
+    print("  ✓ Deterministic output confirmed")
+
+    # ── MUIO sample ──
+    r2 = _validate_fixture("MUIO sample", _FIXTURES / "muio_sample.txt")
+
+    # MUIO-specific: COMMODITY set (not FUEL)
+    gen2 = r2["genData"]
+    assert len(gen2["osy-comm"]) > 0
+    assert gen2["osy-comm"][0]["CommId"].startswith("C_")
+
+    # MUIO CC check
+    if "RYT" in r2 and "CC" in r2["RYT"]:
+        mcc = r2["RYT"]["CC"]["SC_0"]
+        # Find a Coal/T_x record for year 2020
+        coal_2020 = [r for r in mcc if r["Year"] == "2020" and r["Value"] == 1500]
+        assert len(coal_2020) > 0, "Missing Coal CC=1500 for 2020"
+
+    print()
+    print("=" * 60)
+    print("✅ All Phase 3 validation checks passed!")
+
+
+if __name__ == "__main__":
+    _run()