From 0ac3d56c0ad396550b9647ac9153caebb1d1886e Mon Sep 17 00:00:00 2001 From: Parth Date: Fri, 20 Feb 2026 02:55:45 +0530 Subject: [PATCH] =?UTF-8?q?Add=20GMPL=20=E2=86=92=20MUIO=20conversion=20en?= =?UTF-8?q?gine?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implements 3-layer architecture: - GMPLParser (syntax extraction) - SliceInterpreter (wildcard expansion to tuples) - MuioTransformer (tuple → MUIO JSON schema) Includes validation scripts and UTOPIA/MUIO fixtures. --- API/Classes/Case/GMPLParser.py | 594 +++++++++++ API/Classes/Case/MuioTransformer.py | 1174 ++++++++++++++++++++++ API/Classes/Case/SliceInterpreter.py | 801 +++++++++++++++ API/Classes/Case/validate_interpreter.py | 195 ++++ API/Classes/Case/validate_parser.py | 148 +++ API/Classes/Case/validate_transformer.py | 296 ++++++ 6 files changed, 3208 insertions(+) create mode 100644 API/Classes/Case/GMPLParser.py create mode 100644 API/Classes/Case/MuioTransformer.py create mode 100644 API/Classes/Case/SliceInterpreter.py create mode 100644 API/Classes/Case/validate_interpreter.py create mode 100644 API/Classes/Case/validate_parser.py create mode 100644 API/Classes/Case/validate_transformer.py diff --git a/API/Classes/Case/GMPLParser.py b/API/Classes/Case/GMPLParser.py new file mode 100644 index 00000000..704cf548 --- /dev/null +++ b/API/Classes/Case/GMPLParser.py @@ -0,0 +1,594 @@ +""" +GMPLParser — Phase 1: Pure GMPL syntax extraction. + +Parses a GMPL data file (.txt / .dat) and extracts its structural +representation WITHOUT applying any semantic interpretation or +transformation. No renaming, no pivoting, no ID generation. + +Output data structures +---------------------- +parsed_sets : dict[str, list[str]] + { "TECHNOLOGY": ["E01", "E21", ...], "FUEL": ["DSL", "ELC", ...], ... 
} + +parsed_params : list[ParsedParam] + Each ParsedParam is a dataclass with: + name : str – parameter name + default : str | None – raw default value string + slices : list[SliceBlock] + Each SliceBlock is a dataclass with: + header : list[str] – raw slice header tokens, e.g. ["RE1","*","*"] + column_labels : list[str] – column header tokens, e.g. ["2020","2025","2030"] + rows : list[RowEntry] + Each RowEntry is a dataclass with: + key : str – row label (left-most token) + values : list[str] – raw value strings +""" + +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from pathlib import Path +from typing import Optional + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + +@dataclass +class RowEntry: + """A single data row inside a slice block.""" + key: str + values: list[str] + + def __repr__(self) -> str: + vals = ", ".join(self.values[:5]) + suffix = ", ..." if len(self.values) > 5 else "" + return f"RowEntry(key={self.key!r}, values=[{vals}{suffix}])" + + +@dataclass +class SliceBlock: + """One sub-block within a parameter, identified by its slice header.""" + header: list[str] + column_labels: list[str] = field(default_factory=list) + rows: list[RowEntry] = field(default_factory=list) + + def __repr__(self) -> str: + hdr = ",".join(self.header) if self.header else "" + cols = ", ".join(self.column_labels[:5]) + col_suffix = ", ..." 
if len(self.column_labels) > 5 else "" + return ( + f"SliceBlock(header=[{hdr}], " + f"columns=[{cols}{col_suffix}], " + f"rows({len(self.rows)})={self.rows!r})" + ) + + +@dataclass +class ParsedParam: + """A single `param` block with its default value and slice data.""" + name: str + default: Optional[str] = None + slices: list[SliceBlock] = field(default_factory=list) + + def __repr__(self) -> str: + return ( + f"ParsedParam(name={self.name!r}, " + f"default={self.default!r}, " + f"slices({len(self.slices)})={self.slices!r})" + ) + + +@dataclass +class GMPLParseResult: + """Complete parse result for one GMPL data file.""" + sets: dict[str, list[str]] = field(default_factory=dict) + params: list[ParsedParam] = field(default_factory=list) + + def summary(self) -> str: + lines = [f"Sets ({len(self.sets)}):"] + for name, members in self.sets.items(): + lines.append(f" {name}: {members}") + lines.append(f"\nParams ({len(self.params)}):") + for p in self.params: + n_rows = sum(len(s.rows) for s in p.slices) + lines.append( + f" {p.name} (default={p.default}, " + f"slices={len(p.slices)}, total_rows={n_rows})" + ) + return "\n".join(lines) + + +# --------------------------------------------------------------------------- +# Tokeniser helpers +# --------------------------------------------------------------------------- + +def _strip_comment(line: str) -> str: + """Remove inline comments. '#' inside strings is not handled (N/A here).""" + idx = line.find("#") + if idx >= 0: + return line[:idx] + return line + + +def _clean(line: str) -> str: + """Strip comments, replace tabs with spaces, strip outer whitespace.""" + return _strip_comment(line).replace("\t", " ").strip() + + +def _split_tokens(text: str) -> list[str]: + """Split on whitespace, discarding empty strings. + + Also splits glued tokens like ``999:=`` into ``['999', ':=']`` + and ``0.`` into ``['0']`` (trailing dot on numbers). 
+ """ + raw = text.split() + result: list[str] = [] + for tok in raw: + # Split glued ':=' — e.g., '999:=' → '999', ':=' + if tok.endswith(":=") and len(tok) > 2: + result.append(tok[:-2]) + result.append(":=") + # Split glued ':=' in the middle — e.g., 'default:=' + elif ":=" in tok and not tok.startswith(":="): + parts = tok.split(":=", 1) + if parts[0]: + result.append(parts[0]) + result.append(":=") + if parts[1]: + result.append(parts[1]) + else: + result.append(tok) + return result + + +def _join_lines(lines: list[str]) -> str: + """Collapse a list of raw lines into one continuous string, + stripping comments and normalising whitespace.""" + parts: list[str] = [] + for raw in lines: + cleaned = _clean(raw) + if cleaned: + parts.append(cleaned) + return " ".join(parts) + + +# --------------------------------------------------------------------------- +# Main parser +# --------------------------------------------------------------------------- + +class GMPLParser: + """ + Phase 1 parser: pure syntax extraction from a GMPL data file. + + Usage + ----- + >>> result = GMPLParser.parse_file("path/to/data.txt") + >>> print(result.summary()) + """ + + @staticmethod + def parse_file(filepath: str | Path) -> GMPLParseResult: + """Parse a GMPL data file and return structured representation.""" + filepath = Path(filepath) + with open(filepath, "r", encoding="utf-8-sig") as f: + raw_lines = f.readlines() + return GMPLParser._parse_lines(raw_lines) + + @staticmethod + def parse_string(text: str) -> GMPLParseResult: + """Parse GMPL content from a string.""" + return GMPLParser._parse_lines(text.splitlines(keepends=True)) + + # ------------------------------------------------------------------ + # Internal implementation + # ------------------------------------------------------------------ + + @staticmethod + def _parse_lines(raw_lines: list[str]) -> GMPLParseResult: + result = GMPLParseResult() + + # Flatten into a single token stream terminated by semicolons. 
+ # We process statement-by-statement where each statement ends + # at ';'. + statements = GMPLParser._split_into_statements(raw_lines) + + for stmt in statements: + tokens = _split_tokens(stmt) + if not tokens: + continue + + keyword = tokens[0].lower() + + if keyword == "set": + GMPLParser._handle_set(tokens, result) + elif keyword == "param": + GMPLParser._handle_param(tokens, stmt, result) + elif keyword == "end": + break # end; — stop processing + # Ignore everything else (e.g. bare comments, decorative lines) + + return result + + @staticmethod + def _split_into_statements(raw_lines: list[str]) -> list[str]: + """Split the file into statements delimited by ';'. + + Comments and blank lines are stripped. The ';' itself is NOT + included in the returned statement text. + """ + statements: list[str] = [] + buf: list[str] = [] + + for raw in raw_lines: + cleaned = _clean(raw) + if not cleaned: + continue + + # Check for 'end;' as a special terminator. + if cleaned.lower().rstrip("; ") == "end": + # Flush anything in buffer, then add sentinel. + if buf: + statements.append(" ".join(buf)) + buf.clear() + statements.append("end") + break + + # A line may contain one or more ';' (e.g. inline terminators). + while ";" in cleaned: + idx = cleaned.index(";") + before = cleaned[:idx].strip() + if before: + buf.append(before) + # Flush buffer as one complete statement. + statements.append(" ".join(buf)) + buf.clear() + cleaned = cleaned[idx + 1:].strip() + + if cleaned: + buf.append(cleaned) + + # Anything left in buffer (no trailing ';') — flush as-is. + if buf: + statements.append(" ".join(buf)) + + return statements + + # ------------------------------------------------------------------ + # set handler + # ------------------------------------------------------------------ + + @staticmethod + def _handle_set(tokens: list[str], result: GMPLParseResult) -> None: + """Parse set NAME := member1 member2 ... """ + # tokens: ['set', 'NAME', ':=', 'A', 'B', ...] 
+ # or: ['set', 'NAME', ':='] (empty set) + if len(tokens) < 2: + return + + name = tokens[1] + # Find ':=' to locate start of members. + members: list[str] = [] + assign_found = False + for i, tok in enumerate(tokens): + if tok == ":=": + assign_found = True + members = tokens[i + 1:] + break + # Handle ':=' split across tokens (e.g. ':' and '=') + if tok == ":" and i + 1 < len(tokens) and tokens[i + 1] == "=": + assign_found = True + members = tokens[i + 2:] + break + + if not assign_found: + # No ':=' — members are everything after the name + members = tokens[2:] + + result.sets[name] = members + + # ------------------------------------------------------------------ + # param handler + # ------------------------------------------------------------------ + + @staticmethod + def _handle_param(tokens: list[str], raw_stmt: str, result: GMPLParseResult) -> None: + """Parse a full param statement. + + This handles: + - param Name default Val := (empty body) + - param Name default Val := [data] (body with data) + - param Name := value (scalar assignment like ResultsPath) + """ + if len(tokens) < 2: + return + + name = tokens[1] + default_val: Optional[str] = None + + # Locate 'default' keyword and ':=' separator. + # Also handle params with no 'default' that use bare ':' as + # column header start (e.g., 'param YearSplit : 1990 ... 
:=') + assign_idx: Optional[int] = None + bare_colon_idx: Optional[int] = None # Position of bare ':' (not ':=') + i = 2 + while i < len(tokens): + tok_lower = tokens[i].lower() + if tok_lower == "default": + if i + 1 < len(tokens) and tokens[i + 1] != ":=": + default_val = tokens[i + 1] + i += 2 + continue + else: + i += 1 + continue + + if tokens[i] == ":=": + assign_idx = i + break + + # Handle ':' '=' as separate tokens + if tokens[i] == ":" and i + 1 < len(tokens) and tokens[i + 1] == "=": + assign_idx = i + break + + # Track bare ':' (not ':=') — used in headerless tables + if tokens[i] == ":" and bare_colon_idx is None: + bare_colon_idx = i + + i += 1 + + if assign_idx is None: + # No ':=' found — malformed, skip + result.params.append(ParsedParam(name=name, default=default_val)) + return + + # Everything after ':=' is the body. + body_tokens = tokens[assign_idx + 1:] + + # Handle ':' '=' as two tokens. + if tokens[assign_idx] == ":": + body_tokens = tokens[assign_idx + 2:] + + param = ParsedParam(name=name, default=default_val) + + # If we found a bare ':' before ':=' and there's no slice header, + # the tokens between ':' and ':=' are column labels for a + # headerless table (e.g., 'param YearSplit : 1990 ... 2010 :=') + if bare_colon_idx is not None and bare_colon_idx < assign_idx: + # Column labels are tokens between bare_colon_idx+1 and assign_idx. + col_labels = tokens[bare_colon_idx + 1 : assign_idx] + # Filter out any stray '=' from split ':' '=' + col_labels = [c for c in col_labels if c not in ("=", ":")] + if col_labels: + implicit_slice = SliceBlock( + header=[], + column_labels=col_labels, + ) + param.slices.append(implicit_slice) + # Body tokens are data rows. 
+ if body_tokens: + GMPLParser._parse_data_rows(body_tokens, implicit_slice) + result.params.append(param) + return + + if not body_tokens: + # Empty body: param X default Y := + result.params.append(param) + return + + # Check for scalar assignment (e.g., param ResultsPath := "results") + # A scalar has no slice headers and no tabular data. + if len(body_tokens) == 1 and not body_tokens[0].startswith("["): + # Single scalar value. + scalar_block = SliceBlock(header=[], column_labels=[], rows=[ + RowEntry(key=name, values=[body_tokens[0]]) + ]) + param.slices.append(scalar_block) + result.params.append(param) + return + + # Parse tabular body. + GMPLParser._parse_param_body(body_tokens, param) + result.params.append(param) + + @staticmethod + def _parse_param_body(body_tokens: list[str], param: ParsedParam) -> None: + """Parse the body of a param statement into SliceBlocks. + + The body_tokens have already had ';' removed (statement splitting). + """ + # Strategy: walk through tokens. We recognise: + # [X,Y,...] or [X,Y,...]: → start of a new slice header + # Tokens containing ':=' or ':' then '=' → column header separator + # Otherwise → data rows + + current_slice: Optional[SliceBlock] = None + columns_pending = False # True when we've started a slice but haven't read columns yet + i = 0 + + while i < len(body_tokens): + tok = body_tokens[i] + + # ---- Detect slice header: [X,Y,...] or [X,Y,...]: ---- + if tok.startswith("["): + # A new slice block begins. + header_str = tok + # Accumulate tokens until we close the bracket. 
+ while "]" not in header_str and i + 1 < len(body_tokens): + i += 1 + header_str += " " + body_tokens[i] + + # Clean: remove brackets, trailing ':', split on ',' + header_str = header_str.strip("[] :") + header_parts = [h.strip() for h in header_str.split(",")] + + current_slice = SliceBlock(header=header_parts) + param.slices.append(current_slice) + columns_pending = True + i += 1 + continue + + # ---- Detect column headers (contains ':=') ---- + # Column header line pattern: val1 val2 ... := or : val1 ... := + if columns_pending and current_slice is not None: + # Scan forward to find ':=' on this logical line. + col_tokens: list[str] = [] + found_assign = False + j = i + while j < len(body_tokens): + t = body_tokens[j] + if t == ":=": + found_assign = True + j += 1 + break + # Handle combined token ending with ':=' + if t.endswith(":="): + col_tokens.append(t[:-2]) + found_assign = True + j += 1 + break + # Skip bare ':' + if t == ":": + j += 1 + continue + col_tokens.append(t) + j += 1 + + if found_assign: + current_slice.column_labels = [c for c in col_tokens if c] + columns_pending = False + i = j + continue + else: + # No ':=' found — these are data rows, not columns. + columns_pending = False + # Fall through to row parsing. + + # ---- If no current slice, we need an implicit one ---- + if current_slice is None: + # No slice header seen yet — create an implicit headerless slice. + # Scan for ':=' first (column labels before it). 
+ col_tokens_imp: list[str] = [] + found_assign_imp = False + j = i + while j < len(body_tokens): + t = body_tokens[j] + if t == ":=": + found_assign_imp = True + j += 1 + break + if t.endswith(":="): + col_tokens_imp.append(t[:-2]) + found_assign_imp = True + j += 1 + break + if t == ":": + j += 1 + continue + col_tokens_imp.append(t) + j += 1 + + if found_assign_imp: + current_slice = SliceBlock( + header=[], + column_labels=[c for c in col_tokens_imp if c], + ) + param.slices.append(current_slice) + columns_pending = False + i = j + continue + else: + # Just skip — can't parse without structure. + i += 1 + continue + + # ---- Data row parsing ---- + # A data row starts with a row key followed by values. + # Number of values should match column count. + if current_slice is not None and current_slice.column_labels: + GMPLParser._parse_data_rows_from( + body_tokens, i, current_slice + ) + # Advance past all remaining data rows until next slice or end. + i = GMPLParser._find_next_slice_or_end(body_tokens, i) + continue + + # Fallback: skip unrecognised token. + i += 1 + + @staticmethod + def _parse_data_rows(body_tokens: list[str], slice_block: SliceBlock) -> None: + """Parse all data rows from body_tokens into slice_block.""" + GMPLParser._parse_data_rows_from(body_tokens, 0, slice_block) + + @staticmethod + def _parse_data_rows_from( + tokens: list[str], start: int, slice_block: SliceBlock + ) -> None: + """Parse data rows starting at `start` until a new slice header or end.""" + num_cols = len(slice_block.column_labels) + if num_cols == 0: + return + + i = start + while i < len(tokens): + tok = tokens[i] + # Stop if we hit a new slice header. 
+ if tok.startswith("["): + break + + row_key = tok + values: list[str] = [] + j = i + 1 + while j < len(tokens) and len(values) < num_cols: + next_tok = tokens[j] + if next_tok.startswith("[") or next_tok == ":=": + break + values.append(next_tok) + j += 1 + + if values: + slice_block.rows.append(RowEntry(key=row_key, values=values)) + i = j + + @staticmethod + def _find_next_slice_or_end(tokens: list[str], start: int) -> int: + """Find the index of the next '[' token or end of token list.""" + i = start + while i < len(tokens): + if tokens[i].startswith("["): + return i + i += 1 + return i + + +# --------------------------------------------------------------------------- +# CLI entry point for quick testing +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + import sys + import json + + if len(sys.argv) < 2: + print("Usage: python GMPLParser.py ") + sys.exit(1) + + path = sys.argv[1] + result = GMPLParser.parse_file(path) + print(result.summary()) + print("\n" + "=" * 60 + "\n") + + # Detailed dump + for p in result.params: + print(f"\nparam {p.name} (default={p.default}):") + for si, s in enumerate(p.slices): + print(f" slice[{si}]: header={s.header}") + print(f" columns: {s.column_labels}") + for r in s.rows[:3]: + print(f" {r}") + if len(s.rows) > 3: + print(f" ... ({len(s.rows)} rows total)") diff --git a/API/Classes/Case/MuioTransformer.py b/API/Classes/Case/MuioTransformer.py new file mode 100644 index 00000000..0e3b267b --- /dev/null +++ b/API/Classes/Case/MuioTransformer.py @@ -0,0 +1,1174 @@ +""" +Phase 3 — MUIO Transformer. + +Transforms normalized tuple output from SliceInterpreter into the exact +MUIO JSON file structures used in WebAPP/DataStorage cases. 
+ +Input +----- + normalized_data : dict[str, dict[tuple, float]] + Output of SliceInterpreter.interpret() + sets : dict[str, list[str]] + Set membership from GMPLParseResult.sets + +Output +------ + Dictionary of JSON-ready structures matching MUIO format: + genData.json, R.json, RT.json, RYT.json, RYTCM.json, … + +Does NOT write any files to disk. +All records are kept in long-form: {TechId, Year, Value} — no wide pivot. +""" + +from __future__ import annotations + +from collections import defaultdict +from typing import Optional, Union + + +# ───────────────────────────────────────────────────────────── +# Parameter Mapping Registry +# Maps OSeMOSYS parameter name → {file, key, dims} +# ───────────────────────────────────────────────────────────── + +PARAM_MAPPING: dict[str, dict] = { + # ── R (REGION only) ── + "DiscountRate": { + "file": "R", "key": "DR", + "dims": ["REGION"], + }, + + # ── RT (REGION × TECHNOLOGY) ── + "DiscountRateIdv": { + "file": "RT", "key": "DRI", + "dims": ["REGION", "TECHNOLOGY"], + }, + "CapacityToActivityUnit": { + "file": "RT", "key": "CAU", + "dims": ["REGION", "TECHNOLOGY"], + }, + "OperationalLife": { + "file": "RT", "key": "OL", + "dims": ["REGION", "TECHNOLOGY"], + }, + "TotalTechnologyModelPeriodActivityLowerLimit": { + "file": "RT", "key": "TMPAL", + "dims": ["REGION", "TECHNOLOGY"], + }, + "TotalTechnologyModelPeriodActivityUpperLimit": { + "file": "RT", "key": "TMPAU", + "dims": ["REGION", "TECHNOLOGY"], + }, + + # ── RE (REGION × EMISSION) ── + "ModelPeriodEmissionLimit": { + "file": "RE", "key": "MPEL", + "dims": ["REGION", "EMISSION"], + }, + "ModelPeriodExogenousEmission": { + "file": "RE", "key": "MPEE", + "dims": ["REGION", "EMISSION"], + }, + + # ── RS (REGION × STORAGE) ── + "OperationalLifeStorage": { + "file": "RS", "key": "OLS", + "dims": ["REGION", "STORAGE"], + }, + "StorageLevelStart": { + "file": "RS", "key": "SLS", + "dims": ["REGION", "STORAGE"], + }, + + # ── RY (REGION × YEAR) ── + "DiscountRateStorage": 
{ + "file": "RY", "key": "DRS", + "dims": ["REGION", "YEAR"], + }, + "ReserveMargin": { + "file": "RY", "key": "RM", + "dims": ["REGION", "YEAR"], + }, + + # ── RYT (REGION × YEAR × TECHNOLOGY) ── + "AvailabilityFactor": { + "file": "RYT", "key": "AF", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "CapitalCost": { + "file": "RYT", "key": "CC", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "FixedCost": { + "file": "RYT", "key": "FC", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "ResidualCapacity": { + "file": "RYT", "key": "RC", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "TotalAnnualMaxCapacity": { + "file": "RYT", "key": "TAMaxC", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "TotalAnnualMaxCapacityInvestment": { + "file": "RYT", "key": "TAMaxCI", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "TotalAnnualMinCapacity": { + "file": "RYT", "key": "TAMinC", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "TotalAnnualMinCapacityInvestment": { + "file": "RYT", "key": "TAMinCI", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "TotalTechnologyAnnualActivityLowerLimit": { + "file": "RYT", "key": "TAL", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "TotalTechnologyAnnualActivityUpperLimit": { + "file": "RYT", "key": "TAU", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + "CapacityOfOneTechnologyUnit": { + "file": "RYT", "key": "COTU", + "dims": ["REGION", "TECHNOLOGY", "YEAR"], + }, + + # ── RYTM (REGION × YEAR × TECHNOLOGY × MODE) ── + "VariableCost": { + "file": "RYTM", "key": "VC", + "dims": ["REGION", "TECHNOLOGY", "MODE_OF_OPERATION", "YEAR"], + }, + "TechnologyActivityByModeLowerLimit": { + "file": "RYTM", "key": "TAMLL", + "dims": ["REGION", "TECHNOLOGY", "MODE_OF_OPERATION", "YEAR"], + }, + "TechnologyActivityByModeUpperLimit": { + "file": "RYTM", "key": "TAMUL", + "dims": ["REGION", "TECHNOLOGY", "MODE_OF_OPERATION", "YEAR"], + }, + "TechnologyActivityDecreaseByModeLimit": { + "file": "RYTM", "key": "TADML", + "dims": 
["REGION", "TECHNOLOGY", "MODE_OF_OPERATION", "YEAR"], + }, + "TechnologyActivityIncreasedByModeLimit": { + "file": "RYTM", "key": "TAIML", + "dims": ["REGION", "TECHNOLOGY", "MODE_OF_OPERATION", "YEAR"], + }, + + # ── RYTC (REGION × YEAR × TECHNOLOGY × COMMODITY) ── + "InputToNewCapacityRatio": { + "file": "RYTC", "key": "INCR", + "dims": ["REGION", "TECHNOLOGY", "COMMODITY", "YEAR"], + }, + "InputToTotalCapacityRatio": { + "file": "RYTC", "key": "ITCR", + "dims": ["REGION", "TECHNOLOGY", "COMMODITY", "YEAR"], + }, + + # ── RYTCM (REGION × YEAR × TECHNOLOGY × COMMODITY × MODE) ── + "InputActivityRatio": { + "file": "RYTCM", "key": "IAR", + "dims": ["REGION", "TECHNOLOGY", "COMMODITY", "MODE_OF_OPERATION", "YEAR"], + }, + "OutputActivityRatio": { + "file": "RYTCM", "key": "OAR", + "dims": ["REGION", "TECHNOLOGY", "COMMODITY", "MODE_OF_OPERATION", "YEAR"], + }, + + # ── RYC (REGION × YEAR × COMMODITY) ── + "AccumulatedAnnualDemand": { + "file": "RYC", "key": "AAD", + "dims": ["REGION", "COMMODITY", "YEAR"], + }, + "SpecifiedAnnualDemand": { + "file": "RYC", "key": "SAD", + "dims": ["REGION", "COMMODITY", "YEAR"], + }, + + # ── RYE (REGION × YEAR × EMISSION) ── + "AnnualEmissionLimit": { + "file": "RYE", "key": "AEL", + "dims": ["REGION", "EMISSION", "YEAR"], + }, + "EmissionsPenalty": { + "file": "RYE", "key": "EP", + "dims": ["REGION", "EMISSION", "YEAR"], + }, + "AnnualExogenousEmission": { + "file": "RYE", "key": "AEE", + "dims": ["REGION", "EMISSION", "YEAR"], + }, + + # ── RYS (REGION × YEAR × STORAGE) ── + "CapitalCostStorage": { + "file": "RYS", "key": "CCS", + "dims": ["REGION", "STORAGE", "YEAR"], + }, + "ResidualStorageCapacity": { + "file": "RYS", "key": "RSC", + "dims": ["REGION", "STORAGE", "YEAR"], + }, + "MinStorageCharge": { + "file": "RYS", "key": "MSC", + "dims": ["REGION", "STORAGE", "YEAR"], + }, + + # ── RYTs (REGION × YEAR × TIMESLICE) ── + "YearSplit": { + "file": "RYTs", "key": "YS", + "dims": ["REGION", "TIMESLICE", "YEAR"], + }, + + # ── 
RYTTs (REGION × YEAR × TECHNOLOGY × TIMESLICE) ── + "CapacityFactor": { + "file": "RYTTs", "key": "CF", + "dims": ["REGION", "TECHNOLOGY", "TIMESLICE", "YEAR"], + }, + + # ── RYCTs (REGION × YEAR × COMMODITY × TIMESLICE) ── + "SpecifiedDemandProfile": { + "file": "RYCTs", "key": "SDP", + "dims": ["REGION", "COMMODITY", "TIMESLICE", "YEAR"], + }, + + # ── RYTE (REGION × YEAR × TECHNOLOGY × EMISSION) ── + "ReserveMarginTagFuel": { + "file": "RYTE", "key": "RMTagF", + "dims": ["REGION", "TECHNOLOGY", "EMISSION", "YEAR"], + }, + "ReserveMarginTagTechnology": { + "file": "RYTE", "key": "RMTagT", + "dims": ["REGION", "TECHNOLOGY", "EMISSION", "YEAR"], + }, + + # ── RYTEM (REGION × YEAR × TECHNOLOGY × EMISSION × MODE) ── + "EmissionActivityRatio": { + "file": "RYTEM", "key": "EAR", + "dims": ["REGION", "TECHNOLOGY", "EMISSION", "MODE_OF_OPERATION", "YEAR"], + }, + "EmissionActivityChangeRatio": { + "file": "RYTEM", "key": "EACR", + "dims": ["REGION", "TECHNOLOGY", "EMISSION", "MODE_OF_OPERATION", "YEAR"], + }, + + # ── RTSM (REGION × TECHNOLOGY × STORAGE × MODE) — no YEAR ── + "TechnologyToStorage": { + "file": "RTSM", "key": "TTS", + "dims": ["REGION", "TECHNOLOGY", "STORAGE", "MODE_OF_OPERATION"], + }, + "TechnologyFromStorage": { + "file": "RTSM", "key": "TFS", + "dims": ["REGION", "TECHNOLOGY", "STORAGE", "MODE_OF_OPERATION"], + }, + + # ── RYDtb (REGION × YEAR × DAILYTIMEBRACKET) ── + "DaySplit": { + "file": "RYDtb", "key": "DS", + "dims": ["REGION", "DAILYTIMEBRACKET", "YEAR"], + }, + + # ── RYSeDt (REGION × YEAR × SEASON × DAYTYPE) ── + "DaysInDayType": { + "file": "RYSeDt", "key": "DIDT", + "dims": ["REGION", "SEASON", "DAYTYPE", "YEAR"], + }, + + # ── Conversion matrices (special — timeslice metadata) ── + "Conversionls": { + "file": "_conv", "key": "Conversionls", + "dims": ["TIMESLICE", "SEASON"], + }, + "Conversionld": { + "file": "_conv", "key": "Conversionld", + "dims": ["TIMESLICE", "DAYTYPE"], + }, + "Conversionlh": { + "file": "_conv", "key": 
"Conversionlh", + "dims": ["TIMESLICE", "DAILYTIMEBRACKET"], + }, + + # ── RYCn (REGION × YEAR × CONSTRAINT) ── + "UDCConstant": { + "file": "RYCn", "key": "UCC", + "dims": ["REGION", "CONSTRAINT", "YEAR"], + }, + + # ── RYTCn (REGION × YEAR × TECHNOLOGY × CONSTRAINT) ── + "UDCMultiplierTotalCapacity": { + "file": "RYTCn", "key": "CCM", + "dims": ["REGION", "TECHNOLOGY", "CONSTRAINT", "YEAR"], + }, + "UDCMultiplierNewCapacity": { + "file": "RYTCn", "key": "CNCM", + "dims": ["REGION", "TECHNOLOGY", "CONSTRAINT", "YEAR"], + }, + "UDCMultiplierActivity": { + "file": "RYTCn", "key": "CAM", + "dims": ["REGION", "TECHNOLOGY", "CONSTRAINT", "YEAR"], + }, +} + + +# Map dimension names → MUIO record field names +DIM_TO_FIELD: dict[str, str] = { + "REGION": "RegId", + "TECHNOLOGY": "TechId", + "COMMODITY": "CommId", + "EMISSION": "EmisId", + "STORAGE": "StgId", + "MODE_OF_OPERATION": "MoId", + "TIMESLICE": "TsId", + "SEASON": "SeId", + "DAYTYPE": "DtId", + "DAILYTIMEBRACKET": "DtbId", + "CONSTRAINT": "ConId", + "YEAR": "Year", +} + + +class MuioTransformer: + """Transform SliceInterpreter output to MUIO JSON structures.""" + + def __init__( + self, + normalized_data: dict[str, dict[tuple, Union[int, float]]], + sets: dict[str, list[str]], + casename: str = "ImportedCase", + description: str = "", + ): + self._norm = normalized_data + self._raw_sets = dict(sets) + self._casename = casename + self._description = description + + # ── Step 1: Set Normalization ── + self._sets = self._normalize_sets(self._raw_sets) + + # ── Step 2: ID Generation ── + self._id_maps: dict[str, dict[str, str]] = {} + self._gen_ids() + + # ───────────────────────────────────────────────────────── + # Step 1 — Set Normalization + # ───────────────────────────────────────────────────────── + + @staticmethod + def _normalize_sets(sets: dict[str, list[str]]) -> dict[str, list[str]]: + """Rename FUEL → COMMODITY and inject MUIO-only sets.""" + out = dict(sets) + + # Rename FUEL → COMMODITY + if "FUEL" in 
out and "COMMODITY" not in out: + out["COMMODITY"] = out.pop("FUEL") + elif "FUEL" in out and "COMMODITY" in out: + combined = list(dict.fromkeys(out["COMMODITY"] + out["FUEL"])) + out["COMMODITY"] = combined + del out["FUEL"] + + # Inject empty MUIO-only sets if missing + for s in ("STORAGEINTRADAY", "STORAGEINTRAYEAR", "UDC"): + if s not in out: + out[s] = [] + + return out + + # ───────────────────────────────────────────────────────── + # Step 2 — ID Generation (deterministic, sorted) + # ───────────────────────────────────────────────────────── + + def _gen_ids(self) -> None: + """Generate deterministic MUIO IDs for all index sets.""" + id_specs = { + "TECHNOLOGY": "T", + "COMMODITY": "C", + "EMISSION": "E", + "STORAGE": "S", + "TIMESLICE": "Ts", + "SEASON": "SE", + "DAYTYPE": "DT", + "DAILYTIMEBRACKET": "DTB", + } + + for set_name, prefix in id_specs.items(): + members = sorted(self._sets.get(set_name, [])) + self._id_maps[set_name] = { + member: f"{prefix}_{i}" for i, member in enumerate(members) + } + + # MODE_OF_OPERATION — raw string, no remapping + modes = self._sets.get("MODE_OF_OPERATION", []) + self._id_maps["MODE_OF_OPERATION"] = {m: str(m) for m in modes} + + # REGION — keep original names + regions = self._sets.get("REGION", []) + self._id_maps["REGION"] = {r: r for r in regions} + + # YEAR — keep as string + years = self._sets.get("YEAR", []) + self._id_maps["YEAR"] = {y: str(y) for y in years} + + # ───────────────────────────────────────────────────────── + # Step 3 — ID translation helper + # ───────────────────────────────────────────────────────── + + def _translate_id(self, dim_name: str, original_val: str) -> str: + """Translate an original set member to its MUIO ID.""" + return self._id_maps.get(dim_name, {}).get(original_val, original_val) + + # ───────────────────────────────────────────────────────── + # Public API + # ───────────────────────────────────────────────────────── + + def transform(self) -> dict[str, dict]: + """ + Returns 
dict keyed by filename (without .json): + {"genData": {...}, "RYT": {...}, "RT": {...}, ...} + """ + result: dict[str, dict] = {} + + # Build genData + result["genData"] = self._build_gen_data() + + # Build all file groups + file_params: dict[str, list[tuple[str, str, list[str]]]] = defaultdict(list) + for param_name, mapping in PARAM_MAPPING.items(): + fg = mapping["file"] + key = mapping["key"] + dims = mapping["dims"] + if fg == "_conv": + continue # special + file_params[fg].append((param_name, key, dims)) + + for fg, params in file_params.items(): + if fg not in result: + result[fg] = {} + for param_name, muio_key, dims in params: + data = self._norm.get(param_name, {}) + result[fg][muio_key] = self._build_records(fg, dims, data) + + # Conversion matrices + result["_conv"] = self._build_conversions() + + return result + + # ───────────────────────────────────────────────────────── + # Step 5 — Record Builders + # Each returns {SC_0: [records]} + # All records are LONG-FORM — no wide year pivot. 
+ # ───────────────────────────────────────────────────────── + + def _build_records( + self, + file_group: str, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + """Route to the correct builder based on file group.""" + if file_group == "R": + return self._build_R(data) + elif file_group == "RT": + return self._build_RT(dims, data) + elif file_group == "RE": + return self._build_RE(dims, data) + elif file_group == "RS": + return self._build_RS(dims, data) + elif file_group == "RY": + return self._build_RY(dims, data) + elif file_group == "RYT": + return self._build_RYT(dims, data) + elif file_group == "RYTM": + return self._build_RYTM(dims, data) + elif file_group == "RYTC": + return self._build_RYTC(dims, data) + elif file_group == "RYTCM": + return self._build_RYTCM(dims, data) + elif file_group == "RYC": + return self._build_RYC(dims, data) + elif file_group == "RYE": + return self._build_RYE(dims, data) + elif file_group == "RYS": + return self._build_RYS(dims, data) + elif file_group == "RYTs": + return self._build_RYTs(dims, data) + elif file_group == "RYTTs": + return self._build_RYTTs(dims, data) + elif file_group == "RYCTs": + return self._build_RYCTs(dims, data) + elif file_group == "RYTE": + return self._build_RYTE(dims, data) + elif file_group == "RYTEM": + return self._build_RYTEM(dims, data) + elif file_group == "RTSM": + return self._build_RTSM(dims, data) + elif file_group == "RYDtb": + return self._build_RYDtb(dims, data) + elif file_group == "RYSeDt": + return self._build_RYSeDt(dims, data) + elif file_group == "RYCn": + return self._build_RYCn(dims, data) + elif file_group == "RYTCn": + return self._build_RYTCn(dims, data) + else: + return self._build_generic_long(dims, data) + + # ── Generic long-form builder ── + + def _build_generic_long( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + """ + Generic long-form builder: one record per tuple. 
+ Record = {DimField1: id, DimField2: id, ..., Value: val} + """ + records = [] + for tup, val in sorted(data.items()): + record: dict = {} + for i, dim in enumerate(dims): + if i >= len(tup): + break + field = DIM_TO_FIELD.get(dim, dim) + record[field] = self._translate_id(dim, tup[i]) + record["Value"] = val + records.append(record) + return {"SC_0": records} + + # ── R: single value ── + + def _build_R( + self, + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + if data: + for tup, val in data.items(): + records.append({"Value": val}) + break + return {"SC_0": records} + + # ── RT: REGION × TECHNOLOGY ── + + def _build_RT( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + tech = tup[1] if len(tup) > 1 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "Value": val, + }) + return {"SC_0": records} + + # ── RE: REGION × EMISSION ── + + def _build_RE( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + emis = tup[1] if len(tup) > 1 else "" + records.append({ + "EmisId": self._translate_id("EMISSION", emis), + "Value": val, + }) + return {"SC_0": records} + + # ── RS: REGION × STORAGE ── + + def _build_RS( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + stg = tup[1] if len(tup) > 1 else "" + records.append({ + "StgId": self._translate_id("STORAGE", stg), + "Value": val, + }) + return {"SC_0": records} + + # ── RY: REGION × YEAR ── + + def _build_RY( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + year = tup[1] if len(tup) > 1 else "" + records.append({ + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + 
+ # ── RYT: REGION × TECHNOLOGY × YEAR ── + + def _build_RYT( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + tech = tup[1] if len(tup) > 1 else "" + year = tup[2] if len(tup) > 2 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYTM: REGION × TECHNOLOGY × MODE × YEAR ── + + def _build_RYTM( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + tech = tup[1] if len(tup) > 1 else "" + mode = tup[2] if len(tup) > 2 else "" + year = tup[3] if len(tup) > 3 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "MoId": str(mode), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYTC: REGION × TECHNOLOGY × COMMODITY × YEAR ── + + def _build_RYTC( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + tech = tup[1] if len(tup) > 1 else "" + comm = tup[2] if len(tup) > 2 else "" + year = tup[3] if len(tup) > 3 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "CommId": self._translate_id("COMMODITY", comm), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYTCM: REGION × TECHNOLOGY × COMMODITY × MODE × YEAR ── + + def _build_RYTCM( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + tech = tup[1] if len(tup) > 1 else "" + comm = tup[2] if len(tup) > 2 else "" + mode = tup[3] if len(tup) > 3 else "" + year = tup[4] if len(tup) > 4 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "CommId": self._translate_id("COMMODITY", comm), + "MoId": str(mode), 
+ "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYC: REGION × COMMODITY × YEAR ── + + def _build_RYC( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + comm = tup[1] if len(tup) > 1 else "" + year = tup[2] if len(tup) > 2 else "" + records.append({ + "CommId": self._translate_id("COMMODITY", comm), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYE: REGION × EMISSION × YEAR ── + + def _build_RYE( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + emis = tup[1] if len(tup) > 1 else "" + year = tup[2] if len(tup) > 2 else "" + records.append({ + "EmisId": self._translate_id("EMISSION", emis), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYS: REGION × STORAGE × YEAR ── + + def _build_RYS( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + stg = tup[1] if len(tup) > 1 else "" + year = tup[2] if len(tup) > 2 else "" + records.append({ + "StgId": self._translate_id("STORAGE", stg), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYTs: REGION × TIMESLICE × YEAR ── + + def _build_RYTs( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + ts = tup[0] if len(tup) > 0 else "" + year = tup[1] if len(tup) > 1 else "" + records.append({ + "TsId": self._translate_id("TIMESLICE", ts), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYTTs: REGION × TECHNOLOGY × TIMESLICE × YEAR ── + + def _build_RYTTs( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + 
tech = tup[1] if len(tup) > 1 else "" + ts = tup[2] if len(tup) > 2 else "" + year = tup[3] if len(tup) > 3 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "TsId": self._translate_id("TIMESLICE", ts), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYCTs: REGION × COMMODITY × TIMESLICE × YEAR ── + + def _build_RYCTs( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + comm = tup[1] if len(tup) > 1 else "" + ts = tup[2] if len(tup) > 2 else "" + year = tup[3] if len(tup) > 3 else "" + records.append({ + "CommId": self._translate_id("COMMODITY", comm), + "TsId": self._translate_id("TIMESLICE", ts), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYTE: REGION × TECHNOLOGY × EMISSION × YEAR ── + + def _build_RYTE( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + tech = tup[1] if len(tup) > 1 else "" + emis = tup[2] if len(tup) > 2 else "" + year = tup[3] if len(tup) > 3 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "EmisId": self._translate_id("EMISSION", emis), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYTEM: REGION × TECHNOLOGY × EMISSION × MODE × YEAR ── + + def _build_RYTEM( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + tech = tup[1] if len(tup) > 1 else "" + emis = tup[2] if len(tup) > 2 else "" + mode = tup[3] if len(tup) > 3 else "" + year = tup[4] if len(tup) > 4 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "EmisId": self._translate_id("EMISSION", emis), + "MoId": str(mode), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RTSM: REGION × 
TECHNOLOGY × STORAGE × MODE (no YEAR) ── + + def _build_RTSM( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + tech = tup[1] if len(tup) > 1 else "" + stg = tup[2] if len(tup) > 2 else "" + mode = tup[3] if len(tup) > 3 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "StgId": self._translate_id("STORAGE", stg), + "MoId": str(mode), + "Value": val, + }) + return {"SC_0": records} + + # ── RYDtb: REGION × DAILYTIMEBRACKET × YEAR ── + + def _build_RYDtb( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + dtb = tup[1] if len(tup) > 1 else "" + year = tup[2] if len(tup) > 2 else "" + records.append({ + "DtbId": self._translate_id("DAILYTIMEBRACKET", dtb), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYSeDt: REGION × SEASON × DAYTYPE × YEAR ── + + def _build_RYSeDt( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + se = tup[1] if len(tup) > 1 else "" + dt = tup[2] if len(tup) > 2 else "" + year = tup[3] if len(tup) > 3 else "" + records.append({ + "SeId": self._translate_id("SEASON", se), + "DtId": self._translate_id("DAYTYPE", dt), + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYCn: REGION × CONSTRAINT × YEAR ── + + def _build_RYCn( + self, + dims: list[str], + data: dict[tuple, Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + con = tup[1] if len(tup) > 1 else "" + year = tup[2] if len(tup) > 2 else "" + records.append({ + "ConId": con, + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ── RYTCn: REGION × TECHNOLOGY × CONSTRAINT × YEAR ── + + def _build_RYTCn( + self, + dims: list[str], + data: dict[tuple, 
Union[int, float]], + ) -> dict[str, list]: + records = [] + for tup, val in sorted(data.items()): + tech = tup[1] if len(tup) > 1 else "" + con = tup[2] if len(tup) > 2 else "" + year = tup[3] if len(tup) > 3 else "" + records.append({ + "TechId": self._translate_id("TECHNOLOGY", tech), + "ConId": con, + "Year": str(year), + "Value": val, + }) + return {"SC_0": records} + + # ───────────────────────────────────────────────────────── + # Step 6 — genData.json Builder + # ───────────────────────────────────────────────────────── + + def _build_gen_data(self) -> dict: + """Build genData.json metadata.""" + years = sorted(self._sets.get("YEAR", [])) + techs = sorted(self._sets.get("TECHNOLOGY", [])) + comms = sorted(self._sets.get("COMMODITY", [])) + emis = sorted(self._sets.get("EMISSION", [])) + stgs = sorted(self._sets.get("STORAGE", [])) + modes = sorted(self._sets.get("MODE_OF_OPERATION", [])) + ts_members = sorted(self._sets.get("TIMESLICE", [])) + seasons = sorted(self._sets.get("SEASON", [])) + daytypes = sorted(self._sets.get("DAYTYPE", [])) + dtbrackets = sorted(self._sets.get("DAILYTIMEBRACKET", [])) + regions = self._sets.get("REGION", []) + + # Technology entries (with IAR/OAR/EAR commodity/emission links) + tech_entries = [] + for t in techs: + tid = self._id_maps["TECHNOLOGY"][t] + entry: dict = { + "TechId": tid, + "Tech": t, + "Desc": "", + "IAR": self._get_linked_commodities(t, "InputActivityRatio"), + "OAR": self._get_linked_commodities(t, "OutputActivityRatio"), + "EAR": self._get_linked_emissions(t), + "TTS": self._get_linked_storage(t, "TechnologyToStorage"), + "TFS": self._get_linked_storage(t, "TechnologyFromStorage"), + } + tech_entries.append(entry) + + # Commodity entries + comm_entries = [ + {"CommId": self._id_maps["COMMODITY"][c], "Comm": c, "Desc": ""} + for c in comms + ] + + # Emission entries + emis_entries = [ + {"EmisId": self._id_maps["EMISSION"][e], "Emis": e, "Desc": ""} + for e in emis + ] + + # Storage entries (with TTS/TFS 
tech links) + stg_entries = [] + for s in stgs: + sid = self._id_maps["STORAGE"][s] + entry = { + "StgId": sid, + "Stg": s, + "Desc": "", + "TTS": self._get_storage_linked_tech(s, "TechnologyToStorage"), + "TFS": self._get_storage_linked_tech(s, "TechnologyFromStorage"), + } + stg_entries.append(entry) + + # Timeslice entries + ts_entries = [ + {"TsId": self._id_maps["TIMESLICE"].get(ts, f"Ts_{i}"), "Ts": ts, "Desc": ""} + for i, ts in enumerate(ts_members) + ] + + # Season entries + se_entries = [ + {"SeId": self._id_maps.get("SEASON", {}).get(se, f"SE_{i}"), "Se": se, "Desc": ""} + for i, se in enumerate(seasons) + ] if seasons else [{"SeId": "SE_0", "Se": "1", "Desc": "Default season"}] + + # Day type entries + dt_entries = [ + {"DtId": self._id_maps.get("DAYTYPE", {}).get(dt, f"DT_{i}"), "Dt": dt, "Desc": ""} + for i, dt in enumerate(daytypes) + ] if daytypes else [{"DtId": "DT_0", "Dt": "1", "Desc": "Default day type"}] + + # Daily time bracket entries + dtb_entries = [ + {"DtbId": self._id_maps.get("DAILYTIMEBRACKET", {}).get(dtb, f"DTB_{i}"), "Dtb": dtb, "Desc": ""} + for i, dtb in enumerate(dtbrackets) + ] if dtbrackets else [{"DtbId": "DTB_0", "Dtb": "1", "Desc": "Default daily time bracket"}] + + gen_data = { + "osy-casename": self._casename, + "osy-desc": self._description, + "osy-region": regions[0] if regions else "", + "osy-years": years, + "osy-mo": len(modes), + "osy-ns": len(seasons) if seasons else 1, + "osy-tech": tech_entries, + "osy-comm": comm_entries, + "osy-emis": emis_entries, + "osy-stg": stg_entries, + "osy-ts": ts_entries, + "osy-se": se_entries, + "osy-dt": dt_entries, + "osy-dtb": dtb_entries, + "osy-scenarios": [ + {"ScenarioId": "SC_0", "Sc": "Baseline", "Desc": "Default scenario"} + ], + "osy-techGroups": [ + {"TechId": e["TechId"], "group": ""} for e in tech_entries + ], + "osy-constraints": [], + } + + return gen_data + + # ───────────────────────────────────────────────────────── + # Helpers — commodity/emission/storage linkage + 
# ───────────────────────────────────────────────────────── + + def _get_linked_commodities(self, tech: str, param_name: str) -> list[str]: + """Get commodity IDs linked to a technology via IAR or OAR.""" + data = self._norm.get(param_name, {}) + comms: set[str] = set() + for tup in data.keys(): + if len(tup) >= 3 and tup[1] == tech: + comm = tup[2] + comm_id = self._id_maps.get("COMMODITY", {}).get(comm, comm) + comms.add(comm_id) + return sorted(comms) + + def _get_linked_emissions(self, tech: str) -> list[str]: + """Get emission IDs linked to a technology via EAR.""" + data = self._norm.get("EmissionActivityRatio", {}) + emis: set[str] = set() + for tup in data.keys(): + if len(tup) >= 3 and tup[1] == tech: + emi = tup[2] + emi_id = self._id_maps.get("EMISSION", {}).get(emi, emi) + emis.add(emi_id) + return sorted(emis) + + def _get_linked_storage(self, tech: str, param_name: str) -> Optional[str]: + """Get storage ID linked to a tech via TTS or TFS.""" + data = self._norm.get(param_name, {}) + for tup in data.keys(): + if len(tup) >= 3 and tup[1] == tech: + stg = tup[2] + return self._id_maps.get("STORAGE", {}).get(stg, stg) + return None + + def _get_storage_linked_tech(self, stg: str, param_name: str) -> Optional[str]: + """Get technology ID linked to a storage via TTS or TFS.""" + data = self._norm.get(param_name, {}) + for tup in data.keys(): + if len(tup) >= 3 and tup[2] == stg: + tech = tup[1] + return self._id_maps.get("TECHNOLOGY", {}).get(tech, tech) + return None + + # ───────────────────────────────────────────────────────── + # Conversion matrix handling + # ───────────────────────────────────────────────────────── + + def _build_conversions(self) -> dict: + """Build conversion matrices as timeslice → target mappings.""" + result = {} + for param_name in ("Conversionls", "Conversionld", "Conversionlh"): + data = self._norm.get(param_name, {}) + mapping = {} + for tup, val in data.items(): + if len(tup) >= 2 and val == 1: + ts = tup[0] + target = 
tup[1] + ts_id = self._id_maps.get("TIMESLICE", {}).get(ts, ts) + mapping[ts_id] = target + result[param_name] = mapping + return result + + # ───────────────────────────────────────────────────────── + # Public getters + # ───────────────────────────────────────────────────────── + + @property + def id_maps(self) -> dict[str, dict[str, str]]: + """Original name → MUIO ID mappings.""" + return dict(self._id_maps) + + @property + def sets(self) -> dict[str, list[str]]: + """Normalized sets.""" + return dict(self._sets) + + +# ───────────────────────────────────────────────────────────── +# CLI +# ───────────────────────────────────────────────────────────── + +if __name__ == "__main__": + import json + import sys + from pathlib import Path + + sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) + + from Classes.Case.GMPLParser import GMPLParser + from Classes.Case.SliceInterpreter import SliceInterpreter + + if len(sys.argv) < 2: + print("Usage: python MuioTransformer.py ") + sys.exit(1) + + filepath = sys.argv[1] + parsed = GMPLParser.parse_file(filepath) + interp = SliceInterpreter(parsed) + norm = interp.interpret() + + transformer = MuioTransformer( + normalized_data=norm, + sets=parsed.sets, + casename=Path(filepath).stem, + ) + result = transformer.transform() + + print(f"\nGenerated {len(result)} file groups:") + for name, data in sorted(result.items()): + if isinstance(data, dict): + keys = list(data.keys()) + print(f" {name}.json — keys: {keys[:8]}{'...' 
if len(keys) > 8 else ''}") + + # Print sample long-form records + for sample_key in ("RYT", "RYTCM", "RT"): + if sample_key in result: + print(f"\n── {sample_key}.json (excerpt) ──") + print(json.dumps(result[sample_key], indent=2, default=str)[:1000]) diff --git a/API/Classes/Case/SliceInterpreter.py b/API/Classes/Case/SliceInterpreter.py new file mode 100644 index 00000000..9625e115 --- /dev/null +++ b/API/Classes/Case/SliceInterpreter.py @@ -0,0 +1,801 @@ +""" +SliceInterpreter — Phase 2: Semantic expansion of parsed GMPL structures. + +Converts Phase 1 parse output (GMPLParseResult) into a normalized +long-form representation: + + { + param_name: { + (dim1, dim2, ..., dimN): numeric_value, + ... + }, + ... + } + +This module does NOT: + - Rename sets (FUEL stays FUEL, COMMODITY stays COMMODITY) + - Generate MUIO-specific IDs + - Pivot to wide format + - Write JSON files + - Modify Phase 1 data structures +""" + +from __future__ import annotations + +from typing import Optional, Union +from Classes.Case.GMPLParser import GMPLParseResult, ParsedParam, SliceBlock + + +# --------------------------------------------------------------------------- +# Dimension Registry +# --------------------------------------------------------------------------- +# Maps parameter names to their expected dimension order. +# Source: OSeMOSYS model formulation. +# +# This is the single source of truth for dimension ordering. +# Do NOT infer dimension order from slice length. 

DIMENSIONS: dict[str, list[str]] = {
    # ── Demand ──
    "AccumulatedAnnualDemand": ["REGION", "FUEL", "YEAR"],
    "SpecifiedAnnualDemand": ["REGION", "FUEL", "YEAR"],
    "SpecifiedDemandProfile": ["REGION", "FUEL", "TIMESLICE", "YEAR"],

    # ── Time ──
    "YearSplit": ["TIMESLICE", "YEAR"],
    "Conversionls": ["TIMESLICE", "SEASON"],
    "Conversionld": ["TIMESLICE", "DAYTYPE"],
    "Conversionlh": ["TIMESLICE", "DAILYTIMEBRACKET"],
    # NOTE(review): standard OSeMOSYS indexes DaySplit over
    # DAILYTIMEBRACKET × YEAR, not TIMESLICE × YEAR — confirm this
    # matches the model files this registry targets.
    "DaySplit": ["TIMESLICE", "YEAR"],
    "DaysInDayType": ["SEASON", "DAYTYPE", "YEAR"],

    # ── Technology performance ──
    "CapacityToActivityUnit": ["REGION", "TECHNOLOGY"],
    "CapacityFactor": ["REGION", "TECHNOLOGY", "TIMESLICE", "YEAR"],
    "AvailabilityFactor": ["REGION", "TECHNOLOGY", "YEAR"],
    "OperationalLife": ["REGION", "TECHNOLOGY"],
    "InputActivityRatio": ["REGION", "TECHNOLOGY", "FUEL", "MODE_OF_OPERATION", "YEAR"],
    "OutputActivityRatio": ["REGION", "TECHNOLOGY", "FUEL", "MODE_OF_OPERATION", "YEAR"],
    "ResidualCapacity": ["REGION", "TECHNOLOGY", "YEAR"],

    # ── Costs ──
    "CapitalCost": ["REGION", "TECHNOLOGY", "YEAR"],
    "FixedCost": ["REGION", "TECHNOLOGY", "YEAR"],
    "VariableCost": ["REGION", "TECHNOLOGY", "MODE_OF_OPERATION", "YEAR"],

    # ── Capacity constraints ──
    "TotalAnnualMaxCapacity": ["REGION", "TECHNOLOGY", "YEAR"],
    "TotalAnnualMinCapacity": ["REGION", "TECHNOLOGY", "YEAR"],
    "TotalAnnualMaxCapacityInvestment": ["REGION", "TECHNOLOGY", "YEAR"],
    "TotalAnnualMinCapacityInvestment": ["REGION", "TECHNOLOGY", "YEAR"],

    # ── Activity constraints ──
    "TotalTechnologyAnnualActivityUpperLimit": ["REGION", "TECHNOLOGY", "YEAR"],
    "TotalTechnologyAnnualActivityLowerLimit": ["REGION", "TECHNOLOGY", "YEAR"],
    "TotalTechnologyModelPeriodActivityUpperLimit": ["REGION", "TECHNOLOGY"],
    "TotalTechnologyModelPeriodActivityLowerLimit": ["REGION", "TECHNOLOGY"],

    # ── Emissions ──
    "EmissionActivityRatio": ["REGION", "TECHNOLOGY", "EMISSION", "MODE_OF_OPERATION", "YEAR"],
    "EmissionsPenalty": ["REGION", "EMISSION", "YEAR"],
    "AnnualExogenousEmission": ["REGION", "EMISSION", "YEAR"],
    "AnnualEmissionLimit": ["REGION", "EMISSION", "YEAR"],
    "ModelPeriodExogenousEmission": ["REGION", "EMISSION"],
    "ModelPeriodEmissionLimit": ["REGION", "EMISSION"],

    # ── Reserve margin ──
    "ReserveMargin": ["REGION", "YEAR"],
    "ReserveMarginTagFuel": ["REGION", "FUEL", "YEAR"],
    "ReserveMarginTagTechnology": ["REGION", "TECHNOLOGY", "YEAR"],

    # ── Renewable energy ──
    "RETagTechnology": ["REGION", "TECHNOLOGY", "YEAR"],
    "RETagFuel": ["REGION", "FUEL", "YEAR"],
    "REMinProductionTarget": ["REGION", "YEAR"],

    # ── Storage ──
    "TechnologyToStorage": ["REGION", "TECHNOLOGY", "STORAGE", "MODE_OF_OPERATION"],
    "TechnologyFromStorage": ["REGION", "TECHNOLOGY", "STORAGE", "MODE_OF_OPERATION"],
    "StorageLevelStart": ["REGION", "STORAGE"],
    "StorageMaxChargeRate": ["REGION", "STORAGE"],
    "StorageMaxDischargeRate": ["REGION", "STORAGE"],
    "MinStorageCharge": ["REGION", "STORAGE", "YEAR"],
    "OperationalLifeStorage": ["REGION", "STORAGE"],
    "CapitalCostStorage": ["REGION", "STORAGE", "YEAR"],
    "ResidualStorageCapacity": ["REGION", "STORAGE", "YEAR"],

    # ── Trade ──
    # Two REGION entries — presumably (source, destination); confirm.
    "TradeRoute": ["REGION", "REGION", "FUEL", "YEAR"],

    # ── Scalar / global ──
    "DiscountRate": ["REGION"],
    "DiscountRateStorage": ["REGION", "STORAGE"],
    "DepreciationMethod": ["REGION"],

    # ── Other ──
    "CapacityOfOneTechnologyUnit": ["REGION", "TECHNOLOGY", "YEAR"],
    # Empty dims: ResultsPath carries no data and is skipped by interpret().
    "ResultsPath": [],
}

# Also accept COMMODITY as an alias for FUEL in set lookups.
_SET_ALIASES: dict[str, str] = {
    "COMMODITY": "FUEL",
}


# ---------------------------------------------------------------------------
# Numeric conversion helper
# ---------------------------------------------------------------------------

def _to_number(s: str) -> Union[int, float]:
    """Convert a string to int or float.

    Returns int if the string represents a whole number,
    otherwise float.
+ """ + try: + f = float(s) + if f == int(f) and "." not in s and "e" not in s.lower(): + return int(f) + return f + except (ValueError, OverflowError): + return float("nan") + + +def _default_to_number(s: Optional[str]) -> Optional[Union[int, float]]: + """Convert a default value string to a number, or None if not numeric.""" + if s is None: + return None + # Strip trailing dots (e.g., "0." → "0") + cleaned = s.rstrip(".") + if not cleaned: + return 0.0 + try: + f = float(cleaned) + if f == int(f) and "." not in cleaned and "e" not in cleaned.lower(): + return int(f) + return f + except (ValueError, OverflowError): + return None + + +# --------------------------------------------------------------------------- +# SliceInterpreter +# --------------------------------------------------------------------------- + +class SliceInterpreter: + """ + Phase 2 interpreter: expands parsed GMPL structures into normalized + long-form tuple data. + + Usage + ----- + >>> from Classes.Case.GMPLParser import GMPLParser + >>> parse_result = GMPLParser.parse_file("data.txt") + >>> interp = SliceInterpreter(parse_result) + >>> normalized = interp.interpret() + >>> print(normalized["CapitalCost"]) + {("UTOPIA", "E01", "1990"): 1400.0, ...} + """ + + def __init__(self, parse_result: GMPLParseResult): + self.sets = parse_result.sets + self.params = parse_result.params + + # Build a unified set lookup that handles FUEL/COMMODITY aliasing. + self._set_lookup: dict[str, list[str]] = dict(self.sets) + + def interpret(self) -> dict[str, dict[tuple, Union[int, float]]]: + """Interpret all parsed params into normalized long-form dictionaries. + + Returns + ------- + dict mapping param_name → { tuple_key: numeric_value } + """ + result: dict[str, dict[tuple, Union[int, float]]] = {} + + for param in self.params: + # Skip params with no slice data (default-only). + if not param.slices: + continue + + # Skip non-data params (e.g., ResultsPath). 
+ dims = self._get_dimensions(param.name) + if dims is None: + continue + if not dims: + # Zero-dimensional (scalar) — skip for now. + continue + + default_val = _default_to_number(param.default) + param_data: dict[tuple, Union[int, float]] = {} + + for slice_block in param.slices: + self._expand_slice(param.name, dims, slice_block, default_val, param_data) + + if param_data: + result[param.name] = param_data + + return result + + def _get_dimensions(self, param_name: str) -> Optional[list[str]]: + """Look up the dimension schema for a parameter. + + Returns None if the parameter is unknown. + """ + if param_name in DIMENSIONS: + return DIMENSIONS[param_name] + return None + + def _resolve_set_name(self, dim_name: str) -> str: + """Resolve a dimension name to its set name, handling aliases. + + For example, if the file uses COMMODITY instead of FUEL, + the dimension 'FUEL' should look up the 'COMMODITY' set. + """ + # Direct match. + if dim_name in self._set_lookup: + return dim_name + + # Check if any alias maps to this dimension. + for alias, canonical in _SET_ALIASES.items(): + if canonical == dim_name and alias in self._set_lookup: + return alias + + return dim_name + + def _expand_slice( + self, + param_name: str, + dims: list[str], + block: SliceBlock, + default_val: Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + ) -> None: + """Expand one SliceBlock into tuple → value entries.""" + header = block.header + col_labels = block.column_labels + rows = block.rows + + if not rows: + return + + n_dims = len(dims) + + # ── Headerless tables ── + # These have header == [] and use bare ':' columns. + # Dimension mapping depends on the number of dims. + if not header: + self._expand_headerless(param_name, dims, block, default_val, out) + return + + # ── Headed tables ── + # Identify which dimension positions are wildcards. + # The header length should match n_dims (or n_dims - 1 if + # columns provide the last dimension). 
        wildcard_positions = [i for i, h in enumerate(header) if h == "*"]
        fixed_positions = {i: header[i] for i in range(len(header)) if header[i] != "*"}

        n_header = len(header)
        n_wildcards = len(wildcard_positions)

        # ── Header length > dim count ──
        # This happens when the GMPL table format uses an extra wildcard
        # for the column layout dimension that doesn't correspond to a
        # model dimension. E.g., OperationalLife has dims [REGION, TECHNOLOGY]
        # but header [RE1,*,*] — the extra * is the column layout.
        if n_header > n_dims and n_wildcards >= 2:
            self._expand_oversized_header(
                dims, header, wildcard_positions, fixed_positions,
                block, default_val, out,
            )
            return

        if n_wildcards == 0:
            # All fixed — shouldn't have rows, skip.
            return

        if n_wildcards == 1:
            # Single wildcard — either rows or columns provide its values.
            self._expand_single_wildcard(
                dims, header, wildcard_positions[0], fixed_positions,
                block, default_val, out,
            )
            return

        if n_wildcards == 2:
            # Two wildcards — rows provide one, columns provide the other.
            self._expand_two_wildcards(
                dims, header, wildcard_positions, fixed_positions,
                block, default_val, out,
            )
            return

        # More than 2 wildcards — not expected in standard OSeMOSYS,
        # but handle gracefully by skipping.
        return

    def _expand_headerless(
        self,
        param_name: str,
        dims: list[str],
        block: SliceBlock,
        default_val: Optional[Union[int, float]],
        out: dict[tuple, Union[int, float]],
    ) -> None:
        """Expand a headerless table (no slice notation).

        These tables use bare ':' column headers.
        The row key is one dimension, column labels are another dimension.
        Values equal to the declared default are dropped (sparse output).
        """
        col_labels = block.column_labels
        n_dims = len(dims)

        if n_dims == 2:
            # Two dimensions: row key is dim[0] and columns are dim[1],
            # OR row key is dim[1] and columns are dim[0].
            # Detect which by checking if row keys match a known set
            # or if column labels match a known set.
            row_dim_idx, col_dim_idx = self._detect_headerless_2d(dims, block)

            for row in block.rows:
                row_key = row.key
                for ci, col_label in enumerate(col_labels):
                    if ci >= len(row.values):
                        break
                    val = _to_number(row.values[ci])
                    # Sparse encoding: default-valued cells are omitted.
                    if default_val is not None and val == default_val:
                        continue

                    tup = [None] * n_dims
                    tup[row_dim_idx] = row_key
                    tup[col_dim_idx] = col_label
                    out[tuple(tup)] = val

        elif n_dims == 1:
            # Single dimension: columns are the dimension values
            for row in block.rows:
                for ci, col_label in enumerate(col_labels):
                    if ci >= len(row.values):
                        break
                    val = _to_number(row.values[ci])
                    if default_val is not None and val == default_val:
                        continue
                    out[(col_label,)] = val

        else:
            # More than 2 dims in a headerless table — unusual.
            # Fall back: assume first row key fills dim[0],
            # columns fill last dim.
            # NOTE(review): middle dimensions stay None in the emitted
            # tuples in this fallback — confirm downstream tolerance.
            for row in block.rows:
                row_key = row.key
                for ci, col_label in enumerate(col_labels):
                    if ci >= len(row.values):
                        break
                    val = _to_number(row.values[ci])
                    if default_val is not None and val == default_val:
                        continue

                    tup = [None] * n_dims
                    tup[0] = row_key
                    tup[-1] = col_label
                    out[tuple(tup)] = val

    def _detect_headerless_2d(
        self,
        dims: list[str],
        block: SliceBlock,
    ) -> tuple[int, int]:
        """Detect which dimension is rows vs columns in a 2D headerless table.

        Returns (row_dim_index, col_dim_index).
        """
        col_labels = block.column_labels
        # NOTE(review): row_keys is currently unused by the detection below.
        row_keys = [r.key for r in block.rows]

        # Check if column labels match dim[0]'s set.
        set_name_0 = self._resolve_set_name(dims[0])
        set_members_0 = set(self._set_lookup.get(set_name_0, []))

        set_name_1 = self._resolve_set_name(dims[1])
        set_members_1 = set(self._set_lookup.get(set_name_1, []))

        # If column labels match dim[1]'s set members → rows=dim[0], cols=dim[1]
        col_match_1 = all(c in set_members_1 for c in col_labels) if col_labels else False
        col_match_0 = all(c in set_members_0 for c in col_labels) if col_labels else False

        # Only an UNAMBIGUOUS match decides the orientation.
        if col_match_1 and not col_match_0:
            return (0, 1)
        if col_match_0 and not col_match_1:
            return (1, 0)

        # Fallback: rows = dim[0], columns = dim[1]
        return (0, 1)

    def _expand_oversized_header(
        self,
        dims: list[str],
        header: list[str],
        wc_positions: list[int],
        fixed: dict[int, str],
        block: SliceBlock,
        default_val: Optional[Union[int, float]],
        out: dict[tuple, Union[int, float]],
    ) -> None:
        """Expand a slice where header has more elements than dim count.

        This happens when the GMPL table format uses wildcards for both
        the row and column layout but the parameter has fewer actual
        dimensions.

        Example: OperationalLife dims=[REGION, TECHNOLOGY]
                 header=[RE1, *, *]
                 columns=[Coal, Gas, Solar, Wind]
                 row key=RE1, values=[40, 30, 25, 25]

        Strategy:
          1. Map fixed header values to their dimensions.
          2. Identify unmapped dims.
          3. Determine whether column labels or row keys fill them.
        """
        n_dims = len(dims)
        col_labels = block.column_labels

        # Map fixed header values to dimensions.
        # Greedy first-match assignment: each fixed header value claims
        # the first unclaimed dimension whose set contains it, so the
        # result depends on dims order.
        mapped_dims: dict[int, str] = {}  # dim_idx → value
        used_dims: set[int] = set()

        for hp, hval in enumerate(header):
            if hval == "*":
                continue
            for di in range(n_dims):
                if di in used_dims:
                    continue
                set_name = self._resolve_set_name(dims[di])
                members = self._set_lookup.get(set_name, [])
                if hval in members:
                    mapped_dims[di] = hval
                    used_dims.add(di)
                    break

        # Find unmapped dimensions.
        unmapped = [di for di in range(n_dims) if di not in used_dims]

        if len(unmapped) == 0:
            # All dims are fixed by header — just emit values.
            # This shouldn't normally happen, but handle gracefully.
            return

        if len(unmapped) == 1:
            # One unmapped dim — filled by column labels.
            # Row key is likely a duplicate of one of the fixed dims.
            target_dim = unmapped[0]
            for row in block.rows:
                for ci, col_label in enumerate(col_labels):
                    if ci >= len(row.values):
                        break
                    val = _to_number(row.values[ci])
                    # Default-valued cells are omitted (sparse output).
                    if default_val is not None and val == default_val:
                        continue

                    tup = [None] * n_dims
                    for di, dval in mapped_dims.items():
                        tup[di] = dval
                    tup[target_dim] = col_label
                    out[tuple(tup)] = val

        elif len(unmapped) == 2:
            # Two unmapped dims — columns fill one, rows fill the other.
            # Detect which by checking set membership.
            dim_a, dim_b = unmapped

            set_a_name = self._resolve_set_name(dims[dim_a])
            set_a = set(self._set_lookup.get(set_a_name, []))
            set_b_name = self._resolve_set_name(dims[dim_b])
            set_b = set(self._set_lookup.get(set_b_name, []))

            cols_match_a = col_labels and all(c in set_a for c in col_labels)
            cols_match_b = col_labels and all(c in set_b for c in col_labels)

            if cols_match_b and not cols_match_a:
                col_dim, row_dim = dim_b, dim_a
            elif cols_match_a and not cols_match_b:
                col_dim, row_dim = dim_a, dim_b
            else:
                # Default: first unmapped = rows, second = columns
                row_dim, col_dim = dim_a, dim_b

            for row in block.rows:
                for ci, col_label in enumerate(col_labels):
                    if ci >= len(row.values):
                        break
                    val = _to_number(row.values[ci])
                    if default_val is not None and val == default_val:
                        continue

                    tup = [None] * n_dims
                    for di, dval in mapped_dims.items():
                        tup[di] = dval
                    tup[row_dim] = row.key
                    tup[col_dim] = col_label
                    out[tuple(tup)] = val

    def _expand_single_wildcard(
        self,
        dims: list[str],
        header: list[str],
        wc_pos: int,
        fixed: dict[int, str],
        block: SliceBlock,
        default_val: 
Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + ) -> None: + """Expand a slice with exactly 1 wildcard. + + Row keys fill the wildcard dimension. + Columns don't contribute a dimension — they are the values. + OR: column labels contribute a missing dimension. + """ + n_dims = len(dims) + n_header = len(header) + col_labels = block.column_labels + + if n_header == n_dims: + # Header covers all dimensions. Rows fill the wildcard. + # If there are columns, they provide the values for each column. + # But wait — if columns provide labeled data, they must be + # an additional dimension not in the header. + # Actually, with 1 wildcard and n_header == n_dims, + # columns are just repeated values (no extra dimension). + # Rows provide the wildcard dimension, each value corresponds + # to a column label (which itself is a member of some dim). + # But that doesn't match — let me re-think. + # + # Example: TechnologyToStorage [UTOPIA,*,*,2] + # → 2 wildcards, not 1. This case shouldn't reach here. + # + # Example: VariableCost [UTOPIA,*,1,*] with dims [R,T,M,Y] + # → 2 wildcards. Also not 1. + # + # Single wildcard with n_header == n_dims would mean + # column labels are just value labels (e.g., for a + # single-row table). Row key is the wildcard, columns + # are labeled data points. + for row in block.rows: + row_key = row.key + for ci, col_label in enumerate(col_labels): + if ci >= len(row.values): + break + val = _to_number(row.values[ci]) + if default_val is not None and val == default_val: + continue + + tup = list(header) + tup[wc_pos] = row_key + # col_label should map to... hmm, this needs + # column as another dimension. But we only have 1 wildcard. + # This means columns are NOT a separate dimension. + # They're just indexed by position for a single value. + # Actually — with only 1 wildcard and columns present, + # the column labels must represent values, not dim members. + # This case seems unlikely in OSeMOSYS. 
+ out[tuple(tup)] = val + + elif n_header < n_dims: + # Header covers fewer dims than expected. + # The missing dimension is provided by column labels. + # Row keys provide the wildcard dimension. + # + # This shouldn't happen in standard patterns, but let's handle it. + missing_dim_idx = None + header_dim_map: list[int] = [] + used = set() + for hi, h in enumerate(header): + if h == "*": + header_dim_map.append(-1) # wildcard, to be filled + else: + # Match fixed header to a dimension. + for di in range(n_dims): + if di not in used: + set_name = self._resolve_set_name(dims[di]) + members = self._set_lookup.get(set_name, []) + if h in members or dims[di] == "REGION": + header_dim_map.append(di) + used.add(di) + break + else: + header_dim_map.append(-1) + + # Find the dimension not covered by header. + covered = set(header_dim_map) - {-1} + missing = [d for d in range(n_dims) if d not in covered] + + if missing and wc_pos < len(header_dim_map): + # Row fills the wildcard dim, columns fill the first missing dim. + wc_dim = header_dim_map[wc_pos] if header_dim_map[wc_pos] != -1 else missing[0] + col_dim = missing[-1] if len(missing) > 0 else n_dims - 1 + + for row in block.rows: + for ci, col_label in enumerate(col_labels): + if ci >= len(row.values): + break + val = _to_number(row.values[ci]) + if default_val is not None and val == default_val: + continue + + tup = [None] * n_dims + for hi, di in enumerate(header_dim_map): + if di >= 0 and hi != wc_pos: + tup[di] = header[hi] + tup[wc_dim] = row.key + tup[col_dim] = col_label + out[tuple(tup)] = val + + def _expand_two_wildcards( + self, + dims: list[str], + header: list[str], + wc_positions: list[int], + fixed: dict[int, str], + block: SliceBlock, + default_val: Optional[Union[int, float]], + out: dict[tuple, Union[int, float]], + ) -> None: + """Expand a slice with exactly 2 wildcards. + + One wildcard is filled by row keys, the other by column labels. 
+ """ + n_dims = len(dims) + n_header = len(header) + + # Map header positions to dimension indices. + dim_map = self._map_header_to_dims(header, dims) + + if dim_map is None: + return + + # Determine which wildcard maps to rows and which to columns. + wc_dim_indices = [dim_map[wp] for wp in wc_positions] + + # Convention: the LAST wildcard (rightmost) in the header + # corresponds to column labels (typically YEAR, the last indexed dim). + # The other wildcard corresponds to row keys. + row_wc_header_pos = wc_positions[0] + col_wc_header_pos = wc_positions[1] + row_dim_idx = dim_map[row_wc_header_pos] + col_dim_idx = dim_map[col_wc_header_pos] + + for row in block.rows: + row_key = row.key + for ci, col_label in enumerate(block.column_labels): + if ci >= len(row.values): + break + val = _to_number(row.values[ci]) + if default_val is not None and val == default_val: + continue + + tup: list[Optional[str]] = [None] * n_dims + # Fill fixed positions. + for hp, di in enumerate(dim_map): + if hp not in wc_positions: + tup[di] = header[hp] + # Fill wildcards. + tup[row_dim_idx] = row_key + tup[col_dim_idx] = col_label + + out[tuple(tup)] = val + + def _map_header_to_dims( + self, + header: list[str], + dims: list[str], + ) -> Optional[list[int]]: + """Map each header position to a dimension index. + + Returns a list where result[header_pos] = dimension_index. + Returns None if mapping fails. + """ + n_dims = len(dims) + n_header = len(header) + + if n_header != n_dims: + # Header length doesn't match dim count — try best-effort. + # This can happen if the file has a different OSeMOSYS version. + return self._map_header_to_dims_fuzzy(header, dims) + + # Simple positional mapping: header[i] → dims[i]. + # Validate by checking that fixed values match their expected sets. + dim_map = list(range(n_dims)) + + for hp, (hval, di) in enumerate(zip(header, dim_map)): + if hval == "*": + continue + # Check that hval belongs to the set for dims[di]. 
+ set_name = self._resolve_set_name(dims[di]) + members = self._set_lookup.get(set_name, []) + if members and hval not in members: + # Value doesn't match — try fuzzy mapping. + return self._map_header_to_dims_fuzzy(header, dims) + + return dim_map + + def _map_header_to_dims_fuzzy( + self, + header: list[str], + dims: list[str], + ) -> Optional[list[int]]: + """Fuzzy mapping for when header doesn't positionally align with dims. + + Uses set membership to determine which dimension each header + position corresponds to. + """ + n_dims = len(dims) + n_header = len(header) + dim_map = [-1] * n_header + used_dims: set[int] = set() + + # First pass: map fixed values. + for hp in range(n_header): + if header[hp] == "*": + continue + for di in range(n_dims): + if di in used_dims: + continue + set_name = self._resolve_set_name(dims[di]) + members = self._set_lookup.get(set_name, []) + if header[hp] in members: + dim_map[hp] = di + used_dims.add(di) + break + + # Second pass: map wildcards to remaining dimensions. + remaining = [di for di in range(n_dims) if di not in used_dims] + wc_positions = [hp for hp in range(n_header) if header[hp] == "*"] + + if len(wc_positions) != len(remaining): + # Can't map — dimension mismatch. + # Fall back to positional mapping. + if n_header == n_dims: + return list(range(n_dims)) + return None + + for wc_hp, rem_di in zip(wc_positions, remaining): + dim_map[wc_hp] = rem_di + + # Validate no -1 remains. 
+ if -1 in dim_map: + return list(range(min(n_dims, n_header))) + + return dim_map + + +# --------------------------------------------------------------------------- +# CLI entry point for quick testing +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + import sys + from pathlib import Path + + sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) + from Classes.Case.GMPLParser import GMPLParser + + if len(sys.argv) < 2: + print("Usage: python SliceInterpreter.py ") + sys.exit(1) + + filepath = sys.argv[1] + parse_result = GMPLParser.parse_file(filepath) + interp = SliceInterpreter(parse_result) + normalized = interp.interpret() + + print(f"Interpreted {len(normalized)} parameters with data.\n") + + for pname, data in normalized.items(): + n = len(data) + sample = list(data.items())[:3] + print(f"\n{pname} ({n} tuples):") + for tup, val in sample: + print(f" {tup} → {val}") + if n > 3: + print(f" ... ({n} total)") diff --git a/API/Classes/Case/validate_interpreter.py b/API/Classes/Case/validate_interpreter.py new file mode 100644 index 00000000..45b52176 --- /dev/null +++ b/API/Classes/Case/validate_interpreter.py @@ -0,0 +1,195 @@ +#!/usr/bin/env python3 +""" +Validation script for SliceInterpreter Phase 2. + +Parses both UTOPIA and MUIO sample fixtures, then +validates the interpreted tuple data. 
+""" + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) + +from Classes.Case.GMPLParser import GMPLParser +from Classes.Case.SliceInterpreter import SliceInterpreter + + +FIXTURES = Path(__file__).resolve().parent / "test_fixtures" + + +def check(cond: bool, msg: str, errors: list[str]) -> None: + if not cond: + errors.append(msg) + + +def validate_utopia(norm: dict, errors: list[str]) -> None: + """Validate UTOPIA interpreted results.""" + + # ── CapitalCost ── + cc = norm.get("CapitalCost", {}) + check(len(cc) > 0, "CapitalCost: no tuples", errors) + # Expect 21 techs × 21 years but only non-default values + check(("UTOPIA", "E01", "1990") in cc, "CapitalCost: missing (UTOPIA,E01,1990)", errors) + check(cc.get(("UTOPIA", "E01", "1990")) == 1400, f"CapitalCost (UTOPIA,E01,1990) expected 1400, got {cc.get(('UTOPIA','E01','1990'))}", errors) + check(("UTOPIA", "E21", "2000") in cc, "CapitalCost: missing (UTOPIA,E21,2000)", errors) + + # Verify dimension ordering: (REGION, TECHNOLOGY, YEAR) + for tup in list(cc.keys())[:5]: + check(len(tup) == 3, f"CapitalCost: expected 3-tuple, got {len(tup)}: {tup}", errors) + check(tup[0] == "UTOPIA", f"CapitalCost: dim[0] should be REGION, got {tup[0]}", errors) + + # ── InputActivityRatio ── + iar = norm.get("InputActivityRatio", {}) + check(len(iar) > 0, "InputActivityRatio: no tuples", errors) + check(("UTOPIA", "E70", "DSL", "1", "1990") in iar, "InputActivityRatio: missing (UTOPIA,E70,DSL,1,1990)", errors) + check(iar.get(("UTOPIA", "E70", "DSL", "1", "1990")) == 3.4, + f"InputActivityRatio (UTOPIA,E70,DSL,1,1990) expected 3.4, got {iar.get(('UTOPIA','E70','DSL','1','1990'))}", errors) + + # Verify 5-tuple: (REGION, TECHNOLOGY, FUEL, MODE, YEAR) + for tup in list(iar.keys())[:5]: + check(len(tup) == 5, f"InputActivityRatio: expected 5-tuple, got {len(tup)}: {tup}", errors) + + # ── CapacityFactor ── + cf = norm.get("CapacityFactor", {}) + check(len(cf) > 0, 
"CapacityFactor: no tuples", errors) + check(("UTOPIA", "E01", "ID", "1990") in cf, "CapacityFactor: missing (UTOPIA,E01,ID,1990)", errors) + check(cf.get(("UTOPIA", "E01", "ID", "1990")) == 0.8, + f"CapacityFactor (UTOPIA,E01,ID,1990) expected 0.8, got {cf.get(('UTOPIA','E01','ID','1990'))}", errors) + + # 4-tuple: (REGION, TECHNOLOGY, TIMESLICE, YEAR) + for tup in list(cf.keys())[:5]: + check(len(tup) == 4, f"CapacityFactor: expected 4-tuple, got {len(tup)}: {tup}", errors) + + # ── Conversion matrices ── + cls = norm.get("Conversionls", {}) + check(len(cls) == 6, f"Conversionls: expected 6 tuples, got {len(cls)}", errors) + check(("ID", "2") in cls, "Conversionls: missing (ID,2)", errors) + check(cls.get(("ID", "2")) == 1, f"Conversionls (ID,2) expected 1, got {cls.get(('ID','2'))}", errors) + + # ── Storage params ── + tts = norm.get("TechnologyToStorage", {}) + check(len(tts) == 1, f"TechnologyToStorage: expected 1, got {len(tts)}", errors) + check(("UTOPIA", "E51", "DAM", "2") in tts, "TechnologyToStorage: missing (UTOPIA,E51,DAM,2)", errors) + + tfs = norm.get("TechnologyFromStorage", {}) + check(len(tfs) == 1, f"TechnologyFromStorage: expected 1, got {len(tfs)}", errors) + check(("UTOPIA", "E51", "DAM", "1") in tfs, "TechnologyFromStorage: missing (UTOPIA,E51,DAM,1)", errors) + + # ── OperationalLife ── + ol = norm.get("OperationalLife", {}) + check(len(ol) > 0, "OperationalLife: no tuples", errors) + check(("UTOPIA", "E01") in ol, "OperationalLife: missing (UTOPIA,E01)", errors) + check(ol.get(("UTOPIA", "E01")) == 40, f"OperationalLife (UTOPIA,E01) expected 40, got {ol.get(('UTOPIA','E01'))}", errors) + + # ── YearSplit (headerless 2D) ── + ys = norm.get("YearSplit", {}) + check(len(ys) > 0, "YearSplit: no tuples", errors) + check(("ID", "1990") in ys, "YearSplit: missing (ID,1990)", errors) + check(ys.get(("ID", "1990")) == 0.1667, f"YearSplit (ID,1990) expected 0.1667, got {ys.get(('ID','1990'))}", errors) + + # ── No duplicate keys ── + for pname, data 
in norm.items(): + # Dict keys are inherently unique, so just check no None in tuples + for tup in data.keys(): + check(None not in tup, f"{pname}: tuple contains None: {tup}", errors) + + +def validate_muio(norm: dict, errors: list[str]) -> None: + """Validate MUIO sample interpreted results.""" + + # ── CapitalCost ── + cc = norm.get("CapitalCost", {}) + check(len(cc) == 9, f"MUIO CapitalCost: expected 9 tuples, got {len(cc)}", errors) + check(("RE1", "Coal", "2020") in cc, "MUIO CapitalCost: missing (RE1,Coal,2020)", errors) + check(cc.get(("RE1", "Coal", "2020")) == 1500, f"MUIO CapitalCost (RE1,Coal,2020) expected 1500, got {cc.get(('RE1','Coal','2020'))}", errors) + + # Uses COMMODITY in set definitions but dimension registry says FUEL + # The interpreter should handle this via _SET_ALIASES + iar = norm.get("InputActivityRatio", {}) + check(len(iar) == 6, f"MUIO InputActivityRatio: expected 6 tuples, got {len(iar)}", errors) + check(("RE1", "Coal", "Heat", "1", "2020") in iar, "MUIO InputActivityRatio: missing (RE1,Coal,Heat,1,2020)", errors) + + # ── OutputActivityRatio with mode 2 ── + oar = norm.get("OutputActivityRatio", {}) + check(("RE1", "Gas", "Electricity", "2", "2020") in oar, + "MUIO OutputActivityRatio: missing mode=2 entry (RE1,Gas,Electricity,2,2020)", errors) + check(oar.get(("RE1", "Gas", "Electricity", "2", "2020")) == 0.5, + f"MUIO OutputActivityRatio mode=2 expected 0.5", errors) + + # ── OperationalLife via oversized header ── + ol = norm.get("OperationalLife", {}) + check(len(ol) > 0, "MUIO OperationalLife: no tuples", errors) + check(("RE1", "Coal") in ol, "MUIO OperationalLife: missing (RE1,Coal)", errors) + check(ol.get(("RE1", "Coal")) == 40, f"MUIO OperationalLife (RE1,Coal) expected 40", errors) + + # ── Storage ── + tts = norm.get("TechnologyToStorage", {}) + check(("RE1", "Wind", "Battery", "2") in tts, "MUIO TechnologyToStorage: missing (RE1,Wind,Battery,2)", errors) + + # ── No None in tuples ── + for pname, data in 
norm.items(): + for tup in data.keys(): + check(None not in tup, f"MUIO {pname}: tuple contains None: {tup}", errors) + + +def print_sample(norm: dict, params: list[str]) -> None: + """Print sample output for requested params.""" + for pname in params: + data = norm.get(pname, {}) + print(f"\n {pname} ({len(data)} tuples):") + if not data: + print(" ") + continue + items = sorted(data.items()) + for tup, val in items[:6]: + print(f" {tup} → {val}") + if len(items) > 6: + print(f" ... ({len(items)} total)") + + +def main(): + utopia_path = FIXTURES / "utopia.txt" + muio_path = FIXTURES / "muio_sample.txt" + + if not utopia_path.exists() or not muio_path.exists(): + print("ERROR: fixtures not found") + sys.exit(1) + + errors: list[str] = [] + + # ── UTOPIA ── + print("Parsing & interpreting UTOPIA...") + r1 = GMPLParser.parse_file(utopia_path) + interp1 = SliceInterpreter(r1) + norm1 = interp1.interpret() + print(f" → {len(norm1)} params with data") + validate_utopia(norm1, errors) + + print("\n Sample UTOPIA output:") + print_sample(norm1, ["CapitalCost", "InputActivityRatio", "CapacityFactor"]) + + # ── MUIO ── + print("\n\nParsing & interpreting MUIO sample...") + r2 = GMPLParser.parse_file(muio_path) + interp2 = SliceInterpreter(r2) + norm2 = interp2.interpret() + print(f" → {len(norm2)} params with data") + validate_muio(norm2, errors) + + print("\n Sample MUIO output:") + print_sample(norm2, ["CapitalCost", "InputActivityRatio", "CapacityFactor"]) + + # ── Results ── + print("\n" + "=" * 60) + if errors: + print(f"❌ {len(errors)} validation error(s):") + for e in errors: + print(f" - {e}") + sys.exit(1) + else: + print("✅ All Phase 2 validation checks passed!") + + +if __name__ == "__main__": + main() diff --git a/API/Classes/Case/validate_parser.py b/API/Classes/Case/validate_parser.py new file mode 100644 index 00000000..ab840d43 --- /dev/null +++ b/API/Classes/Case/validate_parser.py @@ -0,0 +1,148 @@ +#!/usr/bin/env python3 +""" +Validation script for 
def dump_result(label: str, result) -> str:
    """Produce a detailed human-readable dump."""
    banner = "=" * 70
    buf: list[str] = [banner, f" {label}", banner, ""]

    # Sets: short member lists in full, long ones truncated.
    buf.append(f"SETS ({len(result.sets)}):")
    for set_name, members in result.sets.items():
        if len(members) <= 10:
            buf.append(f" {set_name}: {members}")
        else:
            buf.append(f" {set_name}: [{', '.join(members[:5])}, ... ({len(members)} total)]")
    buf.append("")

    # Parameter summary table.
    buf.append(f"PARAMS ({len(result.params)}):")
    buf.append(f" {'Name':<50} {'Default':<12} {'Slices':<8} {'Rows':<8}")
    buf.append(f" {'-'*50} {'-'*12} {'-'*8} {'-'*8}")
    for param in result.params:
        total_rows = sum(len(sl.rows) for sl in param.slices)
        buf.append(f" {param.name:<50} {str(param.default):<12} {len(param.slices):<8} {total_rows:<8}")
    buf.append("")

    # Detailed structures for a hand-picked subset of parameters.
    interesting = {
        "InputActivityRatio", "OutputActivityRatio", "CapacityFactor",
        "YearSplit", "CapacityToActivityUnit", "OperationalLife",
        "ReserveMargin", "TechnologyToStorage", "TechnologyFromStorage",
        "Conversionls", "EmissionActivityRatio", "VariableCost",
    }

    buf.append("DETAILED PARAM STRUCTURES (selected):")
    for param in result.params:
        if param.name not in interesting:
            continue
        buf.append(f"\n param {param.name} (default={param.default}):")
        if not param.slices:
            buf.append(" ")
            continue
        for slice_no, sl in enumerate(param.slices):
            slice_hdr = ",".join(sl.header) if sl.header else ""
            buf.append(f" slice[{slice_no}]: [{slice_hdr}]")
            col_text = ", ".join(sl.column_labels[:8])
            if len(sl.column_labels) > 8:
                col_text += f", ... ({len(sl.column_labels)} cols)"
            buf.append(f" columns: [{col_text}]")
            for row_no, row in enumerate(sl.rows):
                val_text = ", ".join(str(v) for v in row.values[:5])
                if len(row.values) > 5:
                    val_text += ", ..."
                buf.append(f" row[{row_no}]: key={row.key!r:<15} values=[{val_text}]")

    buf.append("")
    return "\n".join(buf)
errors.append(f"UTOPIA: InputActivityRatio expected 8 slices, got {len(iar.slices)}") + + # MUIO checks + if len(muio_result.sets) != 14: + errors.append(f"MUIO: expected 14 sets, got {len(muio_result.sets)}") + if "COMMODITY" not in muio_result.sets: + errors.append("MUIO: missing COMMODITY set") + if len(muio_result.sets.get("STORAGEINTRADAY", ["x"])) != 0: + errors.append("MUIO: STORAGEINTRADAY should be empty") + + # Check InputActivityRatio has 2 slices + iar_m = next((p for p in muio_result.params if p.name == "InputActivityRatio"), None) + if iar_m and len(iar_m.slices) != 2: + errors.append(f"MUIO: InputActivityRatio expected 2 slices, got {len(iar_m.slices)}") + + if errors: + print("\n❌ VALIDATION ERRORS:") + for e in errors: + print(f" - {e}") + sys.exit(1) + else: + print("\n✅ All validation checks passed!") + + +if __name__ == "__main__": + main() diff --git a/API/Classes/Case/validate_transformer.py b/API/Classes/Case/validate_transformer.py new file mode 100644 index 00000000..3e6e3ccf --- /dev/null +++ b/API/Classes/Case/validate_transformer.py @@ -0,0 +1,296 @@ +#!/usr/bin/env python3 +""" +Validation script for Phase 3 — MuioTransformer. + +Tests the transformer against both UTOPIA and MUIO sample fixtures. 
+Validates: + - All expected JSON file groups are generated + - ID mappings are deterministic + - Record shapes match expected long-form structure + - No crashes on missing params + - genData.json schema completeness +""" + +import sys +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).resolve().parent.parent.parent)) + +from Classes.Case.GMPLParser import GMPLParser +from Classes.Case.SliceInterpreter import SliceInterpreter +from Classes.Case.MuioTransformer import MuioTransformer, PARAM_MAPPING + +FIXTURES = Path(__file__).resolve().parent / "test_fixtures" + + +def check(cond: bool, msg: str, errors: list[str]) -> None: + if not cond: + errors.append(msg) + + +def validate_structure(result: dict, label: str, errors: list[str]) -> None: + """Validate file group structure and genData completeness.""" + + # ── Expected file groups ── + expected_groups = { + "genData", "R", "RT", "RE", "RS", "RY", + "RYT", "RYTM", "RYTC", "RYTCM", + "RYC", "RYE", "RYS", + "RYTs", "RYTTs", "RYCTs", + "RYTE", "RYTEM", "RTSM", + "RYDtb", "RYSeDt", "RYCn", "RYTCn", + "_conv", + } + for fg in expected_groups: + check(fg in result, f"{label}: missing file group '{fg}'", errors) + + # ── genData schema ── + gd = result.get("genData", {}) + required_gd_keys = [ + "osy-casename", "osy-desc", "osy-region", "osy-years", + "osy-mo", "osy-tech", "osy-comm", "osy-emis", "osy-stg", + "osy-ts", "osy-se", "osy-dt", "osy-dtb", "osy-scenarios", + "osy-techGroups", "osy-constraints", + ] + for key in required_gd_keys: + check(key in gd, f"{label}: genData missing key '{key}'", errors) + + # ── SC_0 envelope ── + for fg, data in result.items(): + if fg in ("genData", "_conv"): + continue + if not isinstance(data, dict): + continue + for param_key, param_data in data.items(): + check( + isinstance(param_data, dict) and "SC_0" in param_data, + f"{label}: {fg}.{param_key} missing SC_0 envelope", + errors, + ) + + +def validate_ids(tx: MuioTransformer, label: str, errors: list[str]) -> 
def validate_records(result: dict, label: str, errors: list[str]) -> None:
    """Validate record shapes are long-form (no year-wide pivot)."""

    def fail_unless(cond: bool, msg: str) -> None:
        # Local stand-in for module-level check(): record msg on failure.
        if not cond:
            errors.append(msg)

    # ── RYT records must have TechId, Year, Value ──
    for param_key, payload in result.get("RYT", {}).items():
        for rec in payload.get("SC_0", [])[:5]:
            for field in ("TechId", "Year", "Value"):
                fail_unless(field in rec, f"{label}: RYT.{param_key} record missing {field}")

    # ── RYTCM records must have TechId, CommId, MoId, Year, Value ──
    for param_key, payload in result.get("RYTCM", {}).items():
        for rec in payload.get("SC_0", [])[:5]:
            for field in ("TechId", "CommId", "MoId", "Year", "Value"):
                fail_unless(field in rec, f"{label}: RYTCM.{param_key} record missing {field}")

    # ── RT records must have TechId, Value (no Year) ──
    for param_key, payload in result.get("RT", {}).items():
        for rec in payload.get("SC_0", [])[:5]:
            for field in ("TechId", "Value"):
                fail_unless(field in rec, f"{label}: RT.{param_key} record missing {field}")
            fail_unless("Year" not in rec, f"{label}: RT.{param_key} record has Year (should not)")

    # ── RTSM records must have StgId, TechId, MoId, Value ──
    for param_key, payload in result.get("RTSM", {}).items():
        for rec in payload.get("SC_0", [])[:5]:
            for field in ("StgId", "TechId", "MoId", "Value"):
                fail_unless(field in rec, f"{label}: RTSM.{param_key} record missing {field}")
genData.osy-tech empty", errors) + check(gd["osy-region"] == "UTOPIA", f"UTOPIA: region expected 'UTOPIA', got '{gd['osy-region']}'", errors) + + +def validate_muio_data(result: dict, tx: MuioTransformer, errors: list[str]) -> None: + """Spot-check MUIO sample data values.""" + + # CapitalCost(RE1, Coal, 2020) = 1500 + ryt = result.get("RYT", {}) + cc_records = ryt.get("CC", {}).get("SC_0", []) + coal_id = tx.id_maps["TECHNOLOGY"].get("Coal") + found_cc = False + for rec in cc_records: + if rec.get("TechId") == coal_id and rec.get("Year") == "2020": + check(rec["Value"] == 1500, f"MUIO CC(Coal,2020) expected 1500, got {rec['Value']}", errors) + found_cc = True + break + check(found_cc, "MUIO: CC(Coal,2020) record not found", errors) + + # OperationalLife(RE1, Coal) = 40 + rt = result.get("RT", {}) + ol_records = rt.get("OL", {}).get("SC_0", []) + found_ol = False + for rec in ol_records: + if rec.get("TechId") == coal_id: + check(rec["Value"] == 40, f"MUIO OL(Coal) expected 40, got {rec['Value']}", errors) + found_ol = True + break + check(found_ol, "MUIO: OL(Coal) record not found", errors) + + # FUEL → COMMODITY normalization + check("COMMODITY" in tx.sets, "MUIO: COMMODITY set not present after normalization", errors) + check("FUEL" not in tx.sets, "MUIO: FUEL set should have been renamed", errors) + + # MUIO-only sets injected + for s in ("STORAGEINTRADAY", "STORAGEINTRAYEAR", "UDC"): + check(s in tx.sets, f"MUIO: {s} set not injected", errors) + + # genData + gd = result["genData"] + check(gd["osy-region"] == "RE1", f"MUIO: region expected 'RE1', got '{gd['osy-region']}'", errors) + check(len(gd["osy-comm"]) > 0, "MUIO: genData.osy-comm empty", errors) + check(gd["osy-comm"][0]["Comm"] in ("Electricity", "Heat"), + f"MUIO: commodity name unexpected: {gd['osy-comm'][0]['Comm']}", errors) + + +def main(): + utopia_path = FIXTURES / "utopia.txt" + muio_path = FIXTURES / "muio_sample.txt" + + if not utopia_path.exists() or not muio_path.exists(): + print("ERROR: 
fixtures not found") + sys.exit(1) + + errors: list[str] = [] + + # ── UTOPIA ── + print("Phase 3: Parsing & transforming UTOPIA...") + r1 = GMPLParser.parse_file(utopia_path) + interp1 = SliceInterpreter(r1) + norm1 = interp1.interpret() + tx1 = MuioTransformer(norm1, r1.sets, casename="utopia") + result1 = tx1.transform() + print(f" → {len(result1)} file groups") + + validate_structure(result1, "UTOPIA", errors) + validate_ids(tx1, "UTOPIA", errors) + validate_records(result1, "UTOPIA", errors) + validate_utopia_data(result1, tx1, errors) + + # Print sample + ryt1 = result1.get("RYT", {}) + cc1 = ryt1.get("CC", {}).get("SC_0", []) + print(f" CapitalCost: {len(cc1)} long-form records") + for rec in cc1[:3]: + print(f" {rec}") + + # ── MUIO ── + print("\nPhase 3: Parsing & transforming MUIO sample...") + r2 = GMPLParser.parse_file(muio_path) + interp2 = SliceInterpreter(r2) + norm2 = interp2.interpret() + tx2 = MuioTransformer(norm2, r2.sets, casename="muio_test") + result2 = tx2.transform() + print(f" → {len(result2)} file groups") + + validate_structure(result2, "MUIO", errors) + validate_ids(tx2, "MUIO", errors) + validate_records(result2, "MUIO", errors) + validate_muio_data(result2, tx2, errors) + + # Print sample + ryt2 = result2.get("RYT", {}) + cc2 = ryt2.get("CC", {}).get("SC_0", []) + print(f" CapitalCost: {len(cc2)} long-form records") + for rec in cc2[:3]: + print(f" {rec}") + + # ── Results ── + print("\n" + "=" * 60) + if errors: + print(f"❌ {len(errors)} validation error(s):") + for e in errors: + print(f" - {e}") + sys.exit(1) + else: + print("✅ All Phase 3 validation checks passed!") + + +if __name__ == "__main__": + main()