diff --git a/backend/agent_tools.py b/backend/agent_tools.py index fa902ff..dba585e 100644 --- a/backend/agent_tools.py +++ b/backend/agent_tools.py @@ -729,6 +729,19 @@ def execute_tool( "compute": compute, "generate_chart": generate_chart, } + if tool_name == "run_economy_simulation": + from tooling.backends import get_backend as _get_typed_backend + try: + engine = _get_typed_backend(backend_id) + except ValueError as exc: + return {"error": f"{tool_name} not supported for backend={backend_id!r}", "detail": str(exc)} + if not hasattr(engine, "run_economy_simulation"): + return {"error": f"{tool_name} not implemented for backend={backend_id!r}"} + try: + return engine.run_economy_simulation(**tool_input) + except Exception as exc: + logger.exception(f"[TOOLS] {tool_name} failed") + return {"error": str(exc), "type": type(exc).__name__} if tool_name not in tools: return {"error": f"Unknown tool: {tool_name}"} try: @@ -749,7 +762,7 @@ def execute_tool( def get_tool_definitions(backend_id: str = "uk_compiled") -> List[Dict[str, Any]]: backend = get_backend(backend_id) - return [ + defs: List[Dict[str, Any]] = [ { "name": "run_python", "description": backend.tool_description(), @@ -768,6 +781,28 @@ def get_tool_definitions(backend_id: str = "uk_compiled") -> List[Dict[str, Any] }, }, ] + if backend_id == "uk_python": + from tool_definitions import RUN_ECONOMY_SIMULATION_INPUT_SCHEMA + defs.append( + { + "name": "run_economy_simulation", + "description": ( + "Run a UK economy-wide microsimulation comparing baseline " + "current law to a parametric reform via policyengine_uk. " + "Preferred over run_python for any society-wide reform " + "analysis: removes the need to author Microsimulation+reform " + "code, pins methodology, and returns identical numbers to " + "PE-API's /uk/economy endpoint. Reform keys are programmes " + "(income_tax, national_insurance, child_benefit, ...) with " + "field/value subkeys (personal_allowance, basic_rate, " + "main_rate, ...). Use run_python only for parameters this " + "tool's mapping table does not yet cover, or for structural " + "reforms." + ), + "input_schema": RUN_ECONOMY_SIMULATION_INPUT_SCHEMA, + } + ) + return defs TOOL_DEFINITIONS = get_tool_definitions("uk_compiled") diff --git a/backend/model_backends.py b/backend/model_backends.py index f21d52f..09e6047 100644 --- a/backend/model_backends.py +++ b/backend/model_backends.py @@ -216,31 +216,68 @@ def _ensure_importable(self) -> None: ) def prompt_context(self) -> str: - return """CRITICAL - USE THE POLICYENGINE UK PYTHON MODEL INTERFACE: -- The selected backend is `uk_python`, the Python `policyengine-uk` model package. -- This is the detailed PolicyEngine Core/OpenFisca-style UK model, not the compiled Rust wrapper. -- The Python environment preloads: - `policyengine_uk` as `pe` - `Simulation` - `Microsimulation` - `CountryTaxBenefitSystem` - `Scenario` - `capabilities` - `pd`, `np`, `json`, `math` -- If installed, the higher-level `policyengine` package is also preloaded as `policyengine`. -- Prefer writing code against `policyengine_uk` objects and formulas rather than recreating policy logic. - -COMMON WORKFLOWS FOR THIS BACKEND: -- First inspect backend details: - `result = capabilities()` -- Custom household/situation run: - `sim = Simulation(situation={...})` - `result = sim.calculate("household_net_income", 2025).tolist()` + return """TOOL ROUTING - READ THIS FIRST: + +Society-wide / economy-wide / population-level reform question? + → CALL `run_economy_simulation` ON YOUR FIRST TURN. Do not call + `run_python` first to "explore the API" or "check the interface". + Do not call `capabilities()`. Go straight to `run_economy_simulation`. + +Reform expressible in the programme/field shape (see list below)? + → `run_economy_simulation` is REQUIRED. Falling back to `run_python` + for an expressible reform is a bug: it produces drifting numbers + across runs and wastes 10-30 tool calls relearning the engine. + +Reform NOT in the mapped field list, OR question isn't about a reform? + → Then and only then, use `run_python`. + +`run_economy_simulation` shape: +- args: `{year, reform, dataset}`. `year` defaults to 2025; `dataset` + defaults to "efrs" (Enhanced FRS, the same data PE-API uses). +- `reform` is a programme→field→value dict, NOT dotted parameter paths. +- Returns a structured dict: `budget.{budgetary_impact, tax_revenue_impact, + benefit_spending_impact}`, `decile.{average, relative}` keyed 1-10, + `poverty.poverty.{all, child, adult, senior}.{baseline, reform}`. +- Numbers match PE-API's `/uk/economy` endpoint by construction (same + engine, same methodology). + +Worked examples (copy this shape): +- Raise personal allowance to £15,000: + `{"reform": {"income_tax": {"personal_allowance": 15000}}}` +- Basic rate 20% to 21%: + `{"reform": {"income_tax": {"basic_rate": 0.21}}}` +- NI main rate 8% to 6%: + `{"reform": {"national_insurance": {"main_rate": 0.06}}}` +- Stacked: basic+higher rates + NI cut: + `{"reform": {"income_tax": {"basic_rate": 0.22, "higher_rate": 0.42}, + "national_insurance": {"main_rate": 0.06}}}` + +Mapped programme/field combinations (ONLY these are expressible): +- income_tax: personal_allowance, basic_rate, higher_rate, additional_rate +- national_insurance: main_rate, primary_threshold +- child_benefit: eldest_amount, additional_amount + +If your reform uses any other field, the tool will return an error +listing valid fields. THEN fall back to `run_python`. + +ABOUT THIS BACKEND: +- Selected backend is `uk_python`, the Python `policyengine-uk` model package. +- This is the PolicyEngine Core / OpenFisca-style UK model, not the compiled + Rust wrapper. + +PYTHON ENVIRONMENT (for `run_python` fallback only): +- Preloaded: `policyengine_uk` as `pe`, `Simulation`, `Microsimulation`, + `CountryTaxBenefitSystem`, `Scenario`, `capabilities`, `pd`, `np`, + `json`, `math`. The higher-level `policyengine` package is preloaded + when installed. +- Custom household: `sim = Simulation(situation={...})`, + `result = sim.calculate("household_net_income", 2025).tolist()`. - Microsimulation from published UK data: `sim = Microsimulation(dataset="hf://policyengine/policyengine-uk-data/enhanced_frs_2023_24.h5")` - `result = sim.calculate("household_net_income", 2025).head().to_list()` -- Parameter reform: - pass parameter changes through `Scenario` or mutate a simulation with documented `policyengine_uk` helpers. +- Parameter reform NOT expressible via `run_economy_simulation`: + pass a dotted-path `reform=` dict to `Microsimulation(...)`: + `reform = {"gov.hmrc.income_tax.allowances.personal_allowance.amount": {"2025-01-01.2025-12-31": 15000}}` + `sim = Microsimulation(dataset=..., reform=reform)` MODELLING SCOPE: - This backend exposes the Python `policyengine-uk` model surface. Its API, datasets, variables, and results can differ from `uk_compiled`. diff --git a/backend/tool_definitions.py b/backend/tool_definitions.py new file mode 100644 index 0000000..c7c5b40 --- /dev/null +++ b/backend/tool_definitions.py @@ -0,0 +1,267 @@ +"""Model-facing tool definitions for the UK chat runtime.""" + +from tooling.reforms import REFORM_SCHEMA + + +YEAR_SCHEMA = {"type": "integer", "default": 2025} + +REFORM_PROPERTY = REFORM_SCHEMA + +STRING_ARRAY_SCHEMA = {"type": "array", "items": {"type": "string"}} + +ALL_DATASET_SCHEMA = { + "type": "string", + "enum": ["frs", "efrs", "spi", "lcfs", "was"], + "default": "frs", + "description": "Microdata source for aggregate simulation. FRS is the default for aggregate outputs.", +} + +NON_FRS_DATASET_SCHEMA = { + "type": "string", + "enum": ["efrs", "spi", "lcfs", "was"], + "default": "efrs", + "description": "FRS is not available for analyse_microdata.", +} + +FILTERS_SCHEMA = { + "type": "object", + "description": ( + "Column to predicate map. Predicate can be a scalar, a list, or a " + "dict with min, max, gt, lt, gte, lte, or ne." + ), +} + +CHART_FORMAT_SCHEMA = { + "type": "string", + "enum": ["currency", "percent", "percent_decimal", "number", "compact", "year"], + "description": ( + "Number format for axis ticks and tooltips. Use `currency` for GBP " + "amounts, `percent` for values already on a 0-100 scale, " + "`percent_decimal` for 0-1 shares, `compact` for large counts (1.2k), " + "`year` for calendar years." + ), +} + +CHART_DATA_SCHEMA = { + "type": "array", + "description": "List of row objects. Each row must contain the `x_field` key and every key listed in `y_fields`.", + "items": {"type": "object"}, +} + + +VALIDATE_REFORM_INPUT_SCHEMA = { + "type": "object", + "properties": { + "reform": REFORM_PROPERTY, + }, + "required": ["reform"], +} + +HOUSEHOLD_RECORD_SCHEMA = {"type": "array", "items": {"type": "object"}} + +CALCULATE_HOUSEHOLD_INPUT_SCHEMA = { + "type": "object", + "properties": { + "person": { + **HOUSEHOLD_RECORD_SCHEMA, + "description": ( + "Person records. Each should include person_id, benunit_id, " + "household_id, and age. Common optional fields include " + "employment_income, self_employment_income, and pension_income." + ), + }, + "benunit": { + **HOUSEHOLD_RECORD_SCHEMA, + "description": "Benefit-unit records, each with benunit_id and household_id.", + }, + "household": { + **HOUSEHOLD_RECORD_SCHEMA, + "description": ( + "Household records, each with household_id. Add location fields " + "when relevant, for example region or is_in_scotland." + ), + }, + "year": YEAR_SCHEMA, + "reform": REFORM_PROPERTY, + }, + "required": ["person", "benunit", "household"], +} + +RUN_ECONOMY_SIMULATION_INPUT_SCHEMA = { + "type": "object", + "properties": { + "year": YEAR_SCHEMA, + "reform": REFORM_PROPERTY, + "dataset": ALL_DATASET_SCHEMA, + }, + "required": [], +} + +ANALYSE_MICRODATA_INPUT_SCHEMA = { + "type": "object", + "properties": { + "entity": {"type": "string", "enum": ["persons", "benunits", "households"]}, + "operation": {"type": "string", "enum": ["sample", "mean", "sum", "count", "group_by", "describe"]}, + "year": YEAR_SCHEMA, + "reform": REFORM_PROPERTY, + "filters": FILTERS_SCHEMA, + "columns": STRING_ARRAY_SCHEMA, + "group_by": STRING_ARRAY_SCHEMA, + "n": {"type": "integer", "default": 5, "description": "Sample size when operation is sample."}, + "dataset": NON_FRS_DATASET_SCHEMA, + }, + "required": ["entity", "operation"], +} + +RUN_PYTHON_INPUT_SCHEMA = { + "type": "object", + "properties": { + "code": { + "type": "string", + "description": ( + "Python code to execute. Must assign the final answer to `result`. " + "Use the preloaded PolicyEngine interface directly, for example: " + "`sim = Simulation(year=2025)` or `policy = Parameters.model_validate({...})`." + ), + }, + }, + "required": ["code"], +} + +GENERATE_CHART_INPUT_SCHEMA = { + "type": "object", + "properties": { + "chart_type": { + "type": "string", + "enum": ["line", "bar", "area", "scatter"], + "description": ( + "Chart type. Use `line` for schedules/curves over a continuous x, " + "`bar` for category comparisons (e.g. deciles), `area` for " + "stacked compositions, `scatter` for point clouds." + ), + }, + "title": {"type": "string", "description": "Factually neutral chart title shown above the plot."}, + "data": CHART_DATA_SCHEMA, + "x_field": {"type": "string", "description": "Key in each data row to use as the x value."}, + "y_fields": { + **STRING_ARRAY_SCHEMA, + "description": ( + "Keys in each data row to plot as y series. Provide multiple for " + "multi-series charts (e.g. baseline vs reform)." + ), + }, + "x_label": {"type": "string", "description": "Axis label for x (defaults to `x_field`)."}, + "y_label": {"type": "string", "description": "Axis label for y (defaults to first y field or 'Value')."}, + "x_format": {**CHART_FORMAT_SCHEMA, "description": f"X-axis {CHART_FORMAT_SCHEMA['description']}"}, + "y_format": {**CHART_FORMAT_SCHEMA, "description": f"Y-axis {CHART_FORMAT_SCHEMA['description']}"}, + "x_min": {"type": "number", "description": "Optional fixed minimum for the x axis."}, + "x_max": {"type": "number", "description": "Optional fixed maximum for the x axis."}, + "y_min": {"type": "number", "description": "Optional fixed minimum for the y axis."}, + "y_max": {"type": "number", "description": "Optional fixed maximum for the y axis."}, + "series_labels": { + **STRING_ARRAY_SCHEMA, + "description": "Display labels for each y series, in the same order as `y_fields`.", + }, + "series_styles": { + "type": "array", + "description": "Line style per series (line/area charts).", + "items": {"type": "string", "enum": ["solid", "dashed", "dotted"]}, + }, + "series_curves": { + "type": "array", + "description": "Curve interpolation per series (line/area charts).", + "items": {"type": "string", "enum": ["smooth", "step", "linear"]}, + }, + "subtitle": {"type": "string", "description": "Optional subtitle shown under the title."}, + "source": {"type": "string", "description": "Optional source/caption shown beneath the chart."}, + "arrangement": { + "type": "string", + "enum": ["grouped", "stacked"], + "description": "For bar charts only: `grouped` side-by-side or `stacked`.", + }, + "area_fill": {"type": "boolean", "description": "For line charts only: fill the area under the line."}, + }, + "required": ["chart_type", "title", "data", "x_field", "y_fields"], +} + +TOOL_DEFINITIONS = [ + { + "name": "validate_reform", + "description": ( + "Validate parametric reform JSON without running a simulation. " + "Use this when the user is drafting, debugging, or asking whether " + "a reform object is valid. Do not call it as a routine preflight " + "before every simulation; calculation tools validate reforms internally." + ), + "input_schema": VALIDATE_REFORM_INPUT_SCHEMA, + }, + { + "name": "calculate_household", + "description": ( + "Compute taxes, benefits, and net income for an illustrative " + "specific household described with person, benefit-unit, and " + "household records. Prefer this over run_python for household-level " + "questions with a defined household composition. These inputs are " + "synthetic examples, not real households." + ), + "input_schema": CALCULATE_HOUSEHOLD_INPUT_SCHEMA, + }, + { + "name": "run_economy_simulation", + "description": ( + "Run a UK economy-wide microsimulation comparing baseline current " + "law to a parametric reform. Returns aggregate outputs including " + "budgetary impact, programme breakdown, decile impacts, " + "winners/losers, caseloads, HBAI incomes, and poverty metrics. " + "Prefer this over run_python for society-wide reform analysis. " + "Use run_python for structural reforms." + ), + "input_schema": RUN_ECONOMY_SIMULATION_INPUT_SCHEMA, + }, + { + "name": "analyse_microdata", + "description": ( + "Slice, filter, sample, or aggregate non-FRS model microdata for a " + "given year and optional parametric reform. Use this for allowed " + "non-FRS microdata follow-ups such as subset means, counts, group " + "breakdowns, descriptions, or small model-record samples. This tool " + "explicitly does not support FRS; use run_economy_simulation for " + "aggregate FRS outputs." + ), + "input_schema": ANALYSE_MICRODATA_INPUT_SCHEMA, + }, + { + "name": "run_python", + "description": ( + "Execute reproducible Python code using the official PolicyEngine UK compiled interface. " + "Prefer the typed tools (`calculate_household`, `run_economy_simulation`, `analyse_microdata`) " + "when the question fits their shape; use `run_python` as a fallback for structural reforms, " + "novel aggregations, parameter introspection, historical lookups, or unsupported cases. " + "The environment preloads `policyengine_uk_compiled` as `pe`, plus `Simulation`, `Parameters`, " + "`StructuralReform`, `aggregate_microdata`, `combine_microdata`, `capabilities`, " + "`ensure_dataset`, `pd`, `np`, `json`, and `math`. Assign the final answer to `result` and " + "use `print()` for intermediate output. Do not inspect or return row-level survey microdata, " + "including FRS data. For household examples, create illustrative synthetic households, prefer " + "`Simulation.single_person()` for single-person examples, and label them as illustrative rather " + "than real households." + ), + "input_schema": RUN_PYTHON_INPUT_SCHEMA, + }, + { + "name": "generate_chart", + "description": ( + "Generate a chart JSON block for the frontend to render. " + "Use this for visualisations such as income distributions, marginal-rate or tax-schedule curves, " + "decile impact comparisons, and trends over time or income. " + "Use factually neutral titles, subtitles, labels, and captions; do not call policies good, bad, fair, unfair, " + "regressive, progressive, generous, or punitive. " + "The tool returns a `chart_markdown` field containing a ```chart fenced JSON block - you MUST paste that " + "string verbatim into your next text response, otherwise the chart will not appear to the user. " + "Do not attempt to render charts with matplotlib inside `run_python`; the UI cannot display matplotlib output. " + "Compute the data first with a typed calculation tool or `run_python` " + "(returning a list of row dicts), then pass it to this tool." + ), + "input_schema": GENERATE_CHART_INPUT_SCHEMA, + }, +] + diff --git a/backend/tooling/__init__.py b/backend/tooling/__init__.py new file mode 100644 index 0000000..7af5084 --- /dev/null +++ b/backend/tooling/__init__.py @@ -0,0 +1,2 @@ +"""Shared deterministic helpers for UK chat tools.""" + diff --git a/backend/tooling/backends/__init__.py b/backend/tooling/backends/__init__.py new file mode 100644 index 0000000..b25da05 --- /dev/null +++ b/backend/tooling/backends/__init__.py @@ -0,0 +1,47 @@ +"""Engine-backend strategies for the typed-tool layer. + +The typed tools (``run_economy_simulation``, ``calculate_household``, +``analyse_microdata``) are engine-agnostic at the call-site. This package +holds the per-engine implementations and a single dispatcher. + +The ``backend_id`` matches ``backend/model_backends.py``'s identifiers +(``uk_python``, ``uk_compiled``) so the same selector the chat already +uses to pick a ``run_python`` execution environment also picks the typed +tool implementation. +""" + +from typing import Protocol, Any, Dict, Optional + + +class EngineBackend(Protocol): + """A typed-tool execution surface backed by one PolicyEngine engine.""" + + backend_id: str + + def run_economy_simulation( + self, + year: int, + reform: Optional[Dict[str, Any]], + dataset: str, + ) -> Dict[str, Any]: + ... + + +_BACKENDS: Dict[str, EngineBackend] = {} + + +def register(backend: EngineBackend) -> None: + _BACKENDS[backend.backend_id] = backend + + +def get_backend(backend_id: str) -> EngineBackend: + if backend_id not in _BACKENDS: + raise ValueError( + f"Unknown typed-tool backend: {backend_id!r}. " + f"Registered: {sorted(_BACKENDS)}" + ) + return _BACKENDS[backend_id] + + +# Side-effect imports register the available engines on package load. +from tooling.backends import uk_python # noqa: E402,F401 diff --git a/backend/tooling/backends/uk_python.py b/backend/tooling/backends/uk_python.py new file mode 100644 index 0000000..fadf1a5 --- /dev/null +++ b/backend/tooling/backends/uk_python.py @@ -0,0 +1,285 @@ +"""policyengine_uk (Python engine) backend for the typed tools. + +Translates the typed-tool reform shape — programme/field, e.g.:: + + {"income_tax": {"personal_allowance": 15000}} + +— into the dotted-path-and-period shape ``policyengine_uk.Simulation`` +takes via its ``reform=`` kwarg:: + + {"gov.hmrc.income_tax.allowances.personal_allowance.amount": + {"2025-01-01.2025-12-31": 15000}} + +The dotted-path form is the same shape PE-API stores in +``/uk/policy/``. Because PE-API also runs ``policyengine_uk`` (same +package version pin: ``2.88.20``), calls through this backend produce +numbers identical to PE-API by construction. + +Coverage today: the parameters touched by the eval B-suite scenarios +(B1, B2, b6-b10). The mapping table is hand-curated; grow it as new +scenarios land. +""" + +import logging +from typing import Any, Dict, List, Optional, Tuple + + +logger = logging.getLogger(__name__) + + +backend_id = "uk_python" + + +# --------------------------------------------------------------------------- +# Reform translation +# --------------------------------------------------------------------------- + +# Maps the typed-tool's programme/field pair to the dotted parameter path +# policyengine_uk recognises. Verified against `CountryTaxBenefitSystem` +# at adapter authoring time — see commit message for the verification +# transcript. Add entries here as new scenarios require new parameters; +# unknown (programme, field) pairs raise ReformTranslationError. +_FIELD_TO_PATH: Dict[Tuple[str, str], str] = { + ("income_tax", "personal_allowance"): + "gov.hmrc.income_tax.allowances.personal_allowance.amount", + ("income_tax", "basic_rate"): + "gov.hmrc.income_tax.rates.uk[0].rate", + ("income_tax", "higher_rate"): + "gov.hmrc.income_tax.rates.uk[1].rate", + ("income_tax", "additional_rate"): + "gov.hmrc.income_tax.rates.uk[2].rate", + ("national_insurance", "main_rate"): + "gov.hmrc.national_insurance.class_1.rates.employee.main", + ("national_insurance", "primary_threshold"): + "gov.hmrc.national_insurance.class_1.thresholds.primary_threshold", + ("child_benefit", "eldest_amount"): + "gov.hmrc.child_benefit.amount.eldest", + ("child_benefit", "additional_amount"): + "gov.hmrc.child_benefit.amount.additional", +} + + +class ReformTranslationError(ValueError): + """A typed-tool reform field has no mapping for this engine.""" + + +def _period_for_year(year: int) -> str: + """Build the YYYY-MM-DD.YYYY-MM-DD period key for a tax year.""" + return f"{year}-01-01.{year}-12-31" + + +def translate_reform( + reform: Optional[Dict[str, Any]], + year: int, +) -> Optional[Dict[str, Any]]: + """Convert programme/field reform to policyengine_uk's dotted-path shape. + + Returns None for an empty/None reform so the caller can pass it through + to ``Simulation(reform=...)`` unchanged. + """ + if not reform: + return None + + out: Dict[str, Any] = {} + period = _period_for_year(year) + unknown: List[str] = [] + + for programme, fields in reform.items(): + if not isinstance(fields, dict): + raise ReformTranslationError( + f"Reform programme {programme!r} must be a dict, " + f"got {type(fields).__name__}" + ) + for field, value in fields.items(): + key = (programme, field) + if key not in _FIELD_TO_PATH: + unknown.append(f"{programme}.{field}") + continue + out[_FIELD_TO_PATH[key]] = {period: value} + + if unknown: + known = sorted(f"{p}.{f}" for p, f in _FIELD_TO_PATH) + raise ReformTranslationError( + f"No mapping for: {unknown}. Known fields: {known}" + ) + + return out + + +# --------------------------------------------------------------------------- +# Simulation runner +# --------------------------------------------------------------------------- + +# Where policyengine_uk's published Enhanced FRS sits. Matches the URL the +# eval B-suite YAML files use, so chat output is on the same dataset as +# the fixtures. +_DEFAULT_DATASET_URL = ( + "hf://policyengine/policyengine-uk-data-private/enhanced_frs_2023_24.h5" +) + + +def _build_microsim(dataset: str, reform_dict: Optional[Dict[str, Any]]): + """Construct a Microsimulation against the requested dataset + reform.""" + from policyengine_uk import Microsimulation + if dataset in ("frs", "efrs", "default", ""): + ds = _DEFAULT_DATASET_URL + else: + # Pass arbitrary HF URLs / paths straight through. + ds = dataset + return Microsimulation(dataset=ds, reform=reform_dict) + + +def run_economy_simulation( + year: int = 2025, + reform: Optional[Dict[str, Any]] = None, + dataset: str = "efrs", +) -> Dict[str, Any]: + """Run a society-wide UK reform comparison via policyengine_uk. + + TEMP / TECH DEBT — see https://github.com/PolicyEngine/policyengine-uk-chat + This function inlines ~50 lines of glue that ARE ALREADY IMPLEMENTED in + ``policyengine.outputs.macro.comparison.calculate_economy_comparison``. + The right long-term shape is:: + + from policyengine.outputs.macro.comparison.calculate_economy_comparison \\ + import calculate_economy_comparison + return calculate_economy_comparison(sim).model_dump() + + We can't do that today because ``pip install policyengine==0.13.0`` + transitively requires ``policyengine_us``, which pins + ``policyengine_core>=3.26.0``, which conflicts with the precise + ``policyengine_core==3.25.3`` the orchestrator at 0.13.0 was built + against. Resolution requires either (a) PolicyEngine releasing a + coherent triplet of orchestrator + uk + core, or (b) the orchestrator + making country backends optional installs (``policyengine[uk]``). + Until one of those lands, we mirror the methodology by hand so the + chat backend's deps stay tight. Drop this block when the + orchestrator imports cleanly. + + Mirrors the methodology in ``calculate_economy_comparison`` line-by-line + so output values match PE-API's ``/uk/economy`` endpoint: + + - Total tax ``gov_tax``, total spending ``gov_spending`` for UK. + - Decile groupby on ``household_income_decile``, average = + sum(change) / count(households) per bin. + - Poverty: person-level ``in_poverty`` with ``map_to='person'``, + grouped by ``age < 18`` (child), ``18..64`` (adult), ``>= 65`` + (senior), weighted mean by person_weight. + + Engine-locked to ``policyengine_uk == 2.88.20``. + """ + # TODO(tech-debt): Replace body with calculate_economy_comparison(sim).model_dump() + # once the policyengine orchestrator dependency resolves cleanly. The output + # shape we build below is a subset of what the orchestrator returns. + try: + reform_dict = translate_reform(reform, year) + except ReformTranslationError as exc: + return {"error": "Reform translation failed", "detail": str(exc)} + + try: + from microdf import MicroSeries + + sim_b = _build_microsim(dataset, None) + sim_r = _build_microsim(dataset, reform_dict) + + # --- Budget (mirrors budgetary_impact) --- + gov_tax_b = float(sim_b.calculate("gov_tax", year).sum()) + gov_tax_r = float(sim_r.calculate("gov_tax", year).sum()) + gov_spend_b = float(sim_b.calculate("gov_spending", year).sum()) + gov_spend_r = float(sim_r.calculate("gov_spending", year).sum()) + + tax_revenue_impact = gov_tax_r - gov_tax_b + benefit_spending_impact = gov_spend_r - gov_spend_b + budgetary_impact = tax_revenue_impact - benefit_spending_impact + + # --- Decile (mirrors decile_impact) --- + hh_weight = sim_b.calculate("household_weight", year) + net_b = MicroSeries( + sim_b.calculate("household_net_income", year).values, + weights=hh_weight.values, + ) + net_r = MicroSeries( + sim_r.calculate("household_net_income", year).values, + weights=hh_weight.values, + ) + decile = MicroSeries(sim_b.calculate("household_income_decile", year).values) + + # Filter out the -1 sentinel + mask_valid = decile >= 0 + net_b_f = net_b[mask_valid] + net_r_f = net_r[mask_valid] + decile_f = decile[mask_valid] + + income_change = net_r_f - net_b_f + rel_by_decile = ( + income_change.groupby(decile_f).sum() + / net_b_f.groupby(decile_f).sum() + ) + avg_by_decile = ( + income_change.groupby(decile_f).sum() + / net_b_f.groupby(decile_f).count() + ) + + # --- Poverty (mirrors poverty_impact) --- + person_weight = sim_b.calculate("person_weight", year) + person_in_pov_b = sim_b.calculate("in_poverty", year, map_to="person") + person_in_pov_r = sim_r.calculate("in_poverty", year, map_to="person") + age = MicroSeries(sim_b.calculate("age", year).values) + + # baseline_poverty uses person_weight for both baseline and reform — + # PE-API freezes weights on the baseline so reform comparisons are + # apples-to-apples. + pov_b = MicroSeries(person_in_pov_b.values, weights=person_weight.values) + pov_r = MicroSeries(person_in_pov_r.values, weights=person_weight.values) + + def pov_group(s: MicroSeries, mask) -> float: + return float(s[mask].mean()) + + poverty = { + "child": { + "baseline": pov_group(pov_b, age < 18), + "reform": pov_group(pov_r, age < 18), + }, + "adult": { + "baseline": pov_group(pov_b, (age >= 18) & (age < 65)), + "reform": pov_group(pov_r, (age >= 18) & (age < 65)), + }, + "senior": { + "baseline": pov_group(pov_b, age >= 65), + "reform": pov_group(pov_r, age >= 65), + }, + "all": { + "baseline": float(pov_b.mean()), + "reform": float(pov_r.mean()), + }, + } + + return { + "engine": "policyengine_uk", + "year": year, + "dataset": dataset, + "budget": { + "budgetary_impact": budgetary_impact, + "tax_revenue_impact": tax_revenue_impact, + "benefit_spending_impact": benefit_spending_impact, + }, + "decile": { + "average": {int(k): float(v) for k, v in avg_by_decile.to_dict().items()}, + "relative": {int(k): float(v) for k, v in rel_by_decile.to_dict().items()}, + }, + "poverty": {"poverty": poverty}, + "_reform_applied_dotted": reform_dict, + } + except Exception as exc: + logger.exception("run_economy_simulation failed") + return {"error": str(exc), "type": type(exc).__name__} + + +# Register at import time so `tooling.backends.get_backend("uk_python")` +# resolves without an explicit import in callers. +import sys as _sys + +_self = _sys.modules[__name__] +from tooling.backends import register as _register # noqa: E402 + +_register(_self) # type: ignore[arg-type] diff --git a/backend/tooling/households.py b/backend/tooling/households.py new file mode 100644 index 0000000..1092c77 --- /dev/null +++ b/backend/tooling/households.py @@ -0,0 +1,87 @@ +"""Illustrative household input normalization.""" + +from typing import Any, Dict, List, Tuple + +from tooling.simulations import ensure_compiled_package_importable + + +def build_household_frames( + person: List[Dict[str, Any]], + benunit: List[Dict[str, Any]], + household: List[Dict[str, Any]], +) -> Tuple[Any, Any, Any]: + ensure_compiled_package_importable() + import pandas as pd + from policyengine_uk_compiled import BENUNIT_DEFAULTS, HOUSEHOLD_DEFAULTS, PERSON_DEFAULTS + + def fill_defaults(records, defaults): + return pd.DataFrame([{**defaults, **rec} for rec in records]) + + hh_id_map = {rec["household_id"]: i for i, rec in enumerate(household)} + bu_id_map = {rec["benunit_id"]: i for i, rec in enumerate(benunit)} + person = [ + { + **rec, + "person_id": i, + "benunit_id": bu_id_map[rec["benunit_id"]], + "household_id": hh_id_map[rec["household_id"]], + } + for i, rec in enumerate(person) + ] + benunit = [ + { + **rec, + "benunit_id": bu_id_map[rec["benunit_id"]], + "household_id": hh_id_map[rec["household_id"]], + } + for rec in benunit + ] + household = [{**rec, "household_id": hh_id_map[rec["household_id"]]} for rec in household] + + seen_bu_heads = set() + seen_hh_heads = set() + for rec in person: + bu_id = rec["benunit_id"] + hh_id = rec["household_id"] + is_adult = rec.get("age", 30) >= 16 + rec["is_benunit_head"] = is_adult and bu_id not in seen_bu_heads + rec["is_household_head"] = is_adult and hh_id not in seen_hh_heads + if rec["is_benunit_head"]: + seen_bu_heads.add(bu_id) + if rec["is_household_head"]: + seen_hh_heads.add(hh_id) + + persons_df = fill_defaults(person, PERSON_DEFAULTS) + benunits_df = fill_defaults(benunit, BENUNIT_DEFAULTS) + households_df = fill_defaults(household, HOUSEHOLD_DEFAULTS) + + if "person_ids" not in benunits_df.columns or ( + benunits_df["person_ids"] == BENUNIT_DEFAULTS.get("person_ids", 0) + ).all(): + bu_to_persons = persons_df.groupby("benunit_id")["person_id"].apply( + lambda ids: ",".join(str(i) for i in ids) + ) + benunits_df["person_ids"] = ( + benunits_df["benunit_id"].map(bu_to_persons).fillna(benunits_df["benunit_id"].astype(str)) + ) + if "benunit_ids" not in households_df.columns or ( + households_df["benunit_ids"] == HOUSEHOLD_DEFAULTS.get("benunit_ids", 0) + ).all(): + hh_to_benunits = benunits_df.groupby("household_id")["benunit_id"].apply( + lambda ids: ",".join(str(i) for i in ids) + ) + households_df["benunit_ids"] = ( + households_df["household_id"].map(hh_to_benunits).fillna(households_df["household_id"].astype(str)) + ) + if "person_ids" not in households_df.columns or ( + households_df["person_ids"] == HOUSEHOLD_DEFAULTS.get("person_ids", 0) + ).all(): + hh_to_persons = persons_df.groupby("household_id")["person_id"].apply( + lambda ids: ",".join(str(i) for i in ids) + ) + households_df["person_ids"] = ( + households_df["household_id"].map(hh_to_persons).fillna(households_df["household_id"].astype(str)) + ) + + return persons_df, benunits_df, households_df + diff --git a/backend/tooling/microdata.py b/backend/tooling/microdata.py new file mode 100644 index 0000000..761e945 --- /dev/null +++ b/backend/tooling/microdata.py @@ -0,0 +1,230 @@ +"""Microdata loading, filtering, and aggregate operations.""" + +import hashlib +import json +from typing import Any, Dict, List, Optional + +from tooling.reforms import build_compiled_policy +from tooling.serialization import json_safe +from tooling.simulations import DATASET_LABELS, build_simulation + + +_microdata_cache: Dict[tuple, Any] = {} +_MAX_CACHE = 4 + + +def hash_reform(reform: Optional[Dict[str, Any]]) -> str: + if not reform: + return "none" + return hashlib.md5(json.dumps(reform, sort_keys=True).encode()).hexdigest() + + +def get_cached_microdata(year: int, reform: Optional[Dict[str, Any]], dataset: str, structural=None): + """Return cached MicrodataResult. Structural reforms always run fresh.""" + if structural is not None: + policy = build_compiled_policy(reform) + sim = build_simulation(year, dataset) + return sim.run_microdata(policy=policy, structural=structural) + key = (year, hash_reform(reform), dataset) + if key not in _microdata_cache: + policy = build_compiled_policy(reform) + sim = build_simulation(year, dataset) + _microdata_cache[key] = sim.run_microdata(policy=policy) + if len(_microdata_cache) > _MAX_CACHE: + del _microdata_cache[next(iter(_microdata_cache))] + return _microdata_cache[key] + + +def analyse_microdata_result( + microdata, + entity: str, + operation: str, + year: int, + dataset_key: str, + reform_applied: bool, + structural_reform_applied: bool, + filters: Optional[Dict[str, Any]] = None, + columns: Optional[List[str]] = None, + group_by: Optional[List[str]] = None, + n: int = 5, +) -> Dict[str, Any]: + import pandas as pd + + entity_map = {"persons": microdata.persons, "benunits": microdata.benunits, "households": microdata.households} + if entity not in entity_map: + return {"error": "entity must be one of: persons, benunits, households"} + df = entity_map[entity].copy() + + weights = microdata.households[["household_id", "weight"]].copy() + if "household_id" in df.columns and "weight" not in df.columns: + df = df.merge(weights, on="household_id", how="left") + elif "weight" not in df.columns: + df["weight"] = 1.0 + + change_pairs = { + "persons": [ + ("income_tax", "baseline_income_tax", "reform_income_tax"), + ("employee_ni", "baseline_employee_ni", "reform_employee_ni"), + ("total_income", "baseline_total_income", "reform_total_income"), + ], + "benunits": [ + ("total_benefits", "baseline_total_benefits", "reform_total_benefits"), + ("universal_credit", "baseline_universal_credit", "reform_universal_credit"), + ("child_benefit", "baseline_child_benefit", "reform_child_benefit"), + ], + "households": [ + ("net_income", "baseline_net_income", "reform_net_income"), + ("total_tax", "baseline_total_tax", "reform_total_tax"), + ("total_benefits", "baseline_total_benefits", "reform_total_benefits"), + ], + } + for change_col, base_col, reform_col in change_pairs.get(entity, []): + if base_col in df.columns and reform_col in df.columns: + df[f"{change_col}_change"] = df[reform_col] - df[base_col] + + filters_applied = {} + if filters: + for col, fval in filters.items(): + if col not in df.columns: + return {"error": f"Column '{col}' not found. Available: {list(df.columns)}"} + filters_applied[col] = fval + if isinstance(fval, dict): + if "min" in fval: + df = df[df[col] >= fval["min"]] + if "max" in fval: + df = df[df[col] <= fval["max"]] + if "gt" in fval: + df = df[df[col] > fval["gt"]] + if "lt" in fval: + df = df[df[col] < fval["lt"]] + if "gte" in fval: + df = df[df[col] >= fval["gte"]] + if "lte" in fval: + df = df[df[col] <= fval["lte"]] + if "ne" in fval: + df = df[df[col] != fval["ne"]] + elif isinstance(fval, list): + df = df[df[col].isin(fval)] + else: + df = df[df[col] == fval] + + row_count = len(df) + weighted_count = int(df["weight"].sum()) if "weight" in df.columns else row_count + all_cols = list(df.columns) + + if columns: + missing = [c for c in columns if c not in df.columns] + if missing: + return {"error": f"Columns not found: {missing}. Available: {all_cols}"} + value_cols = columns + else: + if entity == "persons": + value_cols = [ + "age", + "gender", + "employment_income", + "self_employment_income", + "baseline_income_tax", + "reform_income_tax", + "income_tax_change", + "baseline_total_income", + "reform_total_income", + "total_income_change", + ] + elif entity == "benunits": + value_cols = [ + "baseline_total_benefits", + "reform_total_benefits", + "total_benefits_change", + "baseline_universal_credit", + "reform_universal_credit", + "baseline_child_benefit", + "reform_child_benefit", + ] + else: + value_cols = [ + "region", + "baseline_net_income", + "reform_net_income", + "net_income_change", + "baseline_total_tax", + "reform_total_tax", + "baseline_total_benefits", + "reform_total_benefits", + ] + value_cols = [c for c in value_cols if c in df.columns] + + if operation == "sample": + actual_n = min(n, 20, row_count) + sample_df = df[value_cols].sample(n=actual_n, random_state=42) if row_count >= actual_n else df[value_cols] + result = [ + {k: (None if (isinstance(v, float) and str(v) == "nan") else v) for k, v in row.items()} + for row in sample_df.to_dict(orient="records") + ] + elif operation == "mean": + numeric_cols = [c for c in value_cols if pd.api.types.is_numeric_dtype(df[c]) and c != "weight"] + result = { + c: float((df[c] * df["weight"]).sum() / df["weight"].sum()) + if df["weight"].sum() > 0 + else float(df[c].mean()) + for c in numeric_cols + } + elif operation == "sum": + numeric_cols = [c for c in value_cols if pd.api.types.is_numeric_dtype(df[c]) and c != "weight"] + result = {c: float((df[c] * df["weight"]).sum()) for c in numeric_cols} + elif operation == "count": + result = {"row_count": row_count, "weighted_population": weighted_count} + elif operation == "group_by": + if not group_by: + return {"error": "group_by operation requires at least one group_by column"} + missing_groups = [c for c in group_by if c not in df.columns] + if missing_groups: + return {"error": f"Group columns not found: {missing_groups}. Available: {all_cols}"} + numeric_cols = [c for c in value_cols if pd.api.types.is_numeric_dtype(df[c]) and c != "weight"] + grouped_rows = [] + for keys, group in df.groupby(group_by, dropna=False): + if not isinstance(keys, tuple): + keys = (keys,) + row = {col: json_safe(value) for col, value in zip(group_by, keys)} + row["row_count"] = int(len(group)) + row["weighted_population"] = float(group["weight"].sum()) + for col in numeric_cols: + row[col] = ( + float((group[col] * group["weight"]).sum() / group["weight"].sum()) + if group["weight"].sum() > 0 + else float(group[col].mean()) + ) + grouped_rows.append(row) + result = grouped_rows + elif operation == "describe": + numeric_cols = [c for c in value_cols if pd.api.types.is_numeric_dtype(df[c]) and c != "weight"] + result = { + c: { + "mean": float((df[c] * df["weight"]).sum() / df["weight"].sum()) + if df["weight"].sum() > 0 + else float(df[c].mean()), + "min": float(df[c].min()), + "max": float(df[c].max()), + "count": int(df[c].count()), + } + for c in numeric_cols + } + for col in [c for c in value_cols if not pd.api.types.is_numeric_dtype(df[c])]: + result[col] = {str(k): int(v) for k, v in df[col].value_counts().head(10).items()} + else: + return {"error": f"Unknown operation '{operation}'. Use: mean, sum, count, sample, group_by, describe"} + + return { + "entity": entity, + "operation": operation, + "year": year, + "dataset": DATASET_LABELS.get(dataset_key, dataset_key), + "reform_applied": reform_applied, + "structural_reform_applied": structural_reform_applied, + "filters_applied": filters_applied, + "row_count": row_count, + "weighted_count": weighted_count, + "result": result, + "available_columns": all_cols, + } + diff --git a/backend/tooling/reforms.py b/backend/tooling/reforms.py new file mode 100644 index 0000000..b0480cf --- /dev/null +++ b/backend/tooling/reforms.py @@ -0,0 +1,196 @@ +"""Parametric reform validation and compiled-policy construction.""" + +from typing import Any, Dict, List, Optional, Tuple + +from tooling.simulations import ensure_compiled_package_importable + + +DEFAULT_VALID_PROGRAMS = [ + "income_tax", + "national_insurance", + "universal_credit", + "child_benefit", + "state_pension", + "pension_credit", + "benefit_cap", + "housing_benefit", + "tax_credits", + "scottish_child_payment", + "stamp_duty", + "capital_gains_tax", + "wealth_tax", +] + +class ReformValidationError(ValueError): + """Validation error carrying JSON-friendly reform errors.""" + + def __init__(self, errors: List[Dict[str, str]]): + self.errors = errors + message = errors[0]["message"] if errors else "Invalid reform" + super().__init__(message) + + +def _parameter_classes(): + ensure_compiled_package_importable() + from policyengine_uk_compiled import ( + BenefitCapParams, + CapitalGainsTaxParams, + ChildBenefitParams, + HousingBenefitParams, + IncomeTaxParams, + NationalInsuranceParams, + PensionCreditParams, + ScottishChildPaymentParams, + StampDutyBand, + StampDutyParams, + StatePensionParams, + TaxCreditsParams, + UniversalCreditParams, + WealthTaxParams, + ) + + return ( + { + "income_tax": IncomeTaxParams, + "national_insurance": NationalInsuranceParams, + "universal_credit": UniversalCreditParams, + "child_benefit": ChildBenefitParams, + "state_pension": StatePensionParams, + "pension_credit": PensionCreditParams, + "benefit_cap": BenefitCapParams, + "housing_benefit": HousingBenefitParams, + "tax_credits": TaxCreditsParams, + "scottish_child_payment": ScottishChildPaymentParams, + "stamp_duty": StampDutyParams, + "capital_gains_tax": CapitalGainsTaxParams, + "wealth_tax": WealthTaxParams, + }, + StampDutyParams, + StampDutyBand, + ) + + +def get_valid_programs() -> List[str]: + try: + param_cls_map, _, _ = _parameter_classes() + except ModuleNotFoundError: + return DEFAULT_VALID_PROGRAMS + return list(param_cls_map) + + +def build_reform_schema(valid_programs: Optional[List[str]] = None) -> Dict[str, Any]: + programs = valid_programs or get_valid_programs() + return { + "type": "object", + "description": ( + "Parametric reform. Top-level keys are programmes; values are the " + "parameter changes for that programme. Valid programmes include " + f"{', '.join(programs[:-1])}, and {programs[-1]}. " + "Field names within each programme match the corresponding *Params " + "constructor. For structural reforms, use run_python instead." + ), + "additionalProperties": True, + } + + +REFORM_SCHEMA = build_reform_schema() + + +def normalise_reform( + reform: Optional[Dict[str, Any]], +) -> Tuple[Dict[str, Dict[str, Any]], Dict[str, Any]]: + """Validate and normalize a reform dict, returning JSON and model objects.""" + if not reform: + return {}, {} + if not isinstance(reform, dict): + raise ReformValidationError( + [{"path": "reform", "message": f"Reform must be a dict, got {type(reform).__name__}"}] + ) + + param_cls_map, stamp_duty_cls, stamp_duty_band_cls = _parameter_classes() + normalized: Dict[str, Dict[str, Any]] = {} + model_kwargs: Dict[str, Any] = {} + errors: List[Dict[str, str]] = [] + + for program, fields in reform.items(): + if program not in param_cls_map: + errors.append( + { + "path": str(program), + "message": f"Unknown reform program '{program}'. Valid: {list(param_cls_map)}", + } + ) + continue + if not isinstance(fields, dict): + errors.append( + { + "path": str(program), + "message": f"Reform program '{program}' must be a dict, got {type(fields).__name__}", + } + ) + continue + + cls = param_cls_map[program] + valid_fields = set(cls.model_fields) + unknown = sorted(k for k in fields if k not in valid_fields and fields[k] is not None) + if unknown: + for field in unknown: + errors.append( + { + "path": f"{program}.{field}", + "message": ( + f"Unknown field(s) for '{program}': {unknown}. " + f"Valid: {sorted(valid_fields)}" + ), + } + ) + continue + + cleaned_fields = {k: v for k, v in fields.items() if v is not None} + model_fields = dict(cleaned_fields) + if cls is stamp_duty_cls and "bands" in model_fields: + model_fields["bands"] = [ + stamp_duty_band_cls(**band) if isinstance(band, dict) else band + for band in model_fields["bands"] + ] + try: + model_kwargs[program] = cls(**model_fields) + except Exception as exc: + errors.append({"path": str(program), "message": f"{type(exc).__name__}: {exc}"}) + continue + if cleaned_fields: + normalized[program] = cleaned_fields + + if errors: + raise ReformValidationError(errors) + return normalized, model_kwargs + + +def build_compiled_policy(reform: Optional[Dict[str, Any]]): + normalized, model_kwargs = normalise_reform(reform) + if not normalized: + return None + ensure_compiled_package_importable() + from policyengine_uk_compiled import Parameters + + return Parameters(**model_kwargs) + + +def validate_reform_dict(reform: Optional[Dict[str, Any]]) -> Dict[str, Any]: + try: + normalized, _ = normalise_reform(reform) + except ReformValidationError as exc: + return {"valid": False, "errors": exc.errors, "valid_programs": get_valid_programs()} + except Exception as exc: + return { + "valid": False, + "errors": [{"path": "reform", "message": f"{type(exc).__name__}: {exc}"}], + "valid_programs": get_valid_programs(), + } + + return { + "valid": True, + "normalized_reform": normalized, + "programs": list(normalized), + "warnings": [], + } diff --git a/backend/tooling/sandbox.py b/backend/tooling/sandbox.py new file mode 100644 index 0000000..c8b82b1 --- /dev/null +++ b/backend/tooling/sandbox.py @@ -0,0 +1,263 @@ +"""Restricted Python execution helpers used by chat tools.""" + +import builtins as _builtins +import json +import math +from typing import Any, Callable, Dict, List, Optional + +from tooling.serialization import json_safe +from tooling.simulations import ensure_compiled_package_importable + + +ALLOWED_IMPORT_ROOTS = {"json", "math", "numpy", "pandas"} + + +def safe_import(name, globals=None, locals=None, fromlist=(), level=0): + root_name = name.split(".")[0] + if root_name not in ALLOWED_IMPORT_ROOTS: + raise ImportError(f"Import of '{name}' is not allowed") + return __import__(name, globals, locals, fromlist, level) + + +def safe_builtins(names, print_func: Optional[Callable[..., None]] = None, allow_import: bool = False): + builtins = {name: getattr(_builtins, name) for name in names if hasattr(_builtins, name)} + if print_func is not None: + builtins["print"] = print_func + if allow_import: + builtins["__import__"] = safe_import + return builtins + + +def optional_numpy(): + try: + import numpy as np + except ImportError: + return None + return np + + +def compile_structural_hook(code: str): + """Compile a structural hook from code defining hook(...).""" + safe_names = ( + "range", + "len", + "int", + "float", + "str", + "bool", + "list", + "dict", + "tuple", + "set", + "zip", + "enumerate", + "map", + "filter", + "sorted", + "reversed", + "min", + "max", + "sum", + "abs", + "round", + "True", + "False", + "None", + "isinstance", + "ValueError", + "TypeError", + "print", + "any", + "all", + "pow", + "divmod", + ) + try: + import pandas as pd + except ImportError as exc: + raise ImportError("pandas is required for structural reform hooks") from exc + + allowed_globals: Dict[str, Any] = { + "__builtins__": safe_builtins(safe_names), + "math": math, + "json": json, + "pd": pd, + } + np = optional_numpy() + if np is not None: + allowed_globals["np"] = np + allowed_globals["numpy"] = np + + exec(code, allowed_globals) + hook = allowed_globals.get("hook") + if hook is None or not callable(hook): + raise ValueError("Structural hook code must define a callable `hook(year, persons, benunits, households)`") + return hook + + +def build_structural_reform(structural_reform: Optional[Dict[str, Any]]): + if not structural_reform: + return None + if not isinstance(structural_reform, dict): + raise ValueError(f"structural_reform must be a dict, got {type(structural_reform).__name__}") + + unknown = set(structural_reform) - {"pre", "post"} + if unknown: + raise ValueError(f"Unknown structural_reform field(s): {sorted(unknown)}. Valid: ['pre', 'post']") + + ensure_compiled_package_importable() + from policyengine_uk_compiled import StructuralReform + + pre = structural_reform.get("pre") + post = structural_reform.get("post") + if pre is not None and not isinstance(pre, str): + raise ValueError("structural_reform.pre must be a string of Python code defining hook(...)") + if post is not None and not isinstance(post, str): + raise ValueError("structural_reform.post must be a string of Python code defining hook(...)") + + return StructuralReform( + pre=compile_structural_hook(pre) if pre else None, + post=compile_structural_hook(post) if post else None, + ) + + +def run_python_code(code: str) -> Dict[str, Any]: + ensure_compiled_package_importable() + import pandas as pd + import policyengine_uk_compiled as pe + from policyengine_uk_compiled import ( + Parameters, + Simulation, + StructuralReform, + aggregate_microdata, + capabilities, + combine_microdata, + ensure_dataset, + ) + + safe_names = ( + "range", + "len", + "int", + "float", + "str", + "bool", + "list", + "dict", + "tuple", + "set", + "zip", + "enumerate", + "map", + "filter", + "sorted", + "reversed", + "min", + "max", + "sum", + "abs", + "round", + "True", + "False", + "None", + "isinstance", + "ValueError", + "TypeError", + "Exception", + "print", + "any", + "all", + "pow", + "divmod", + "complex", + "type", + "dir", + "hasattr", + "getattr", + ) + output_lines: List[str] = [] + + def safe_print(*args, **kwargs): + output_lines.append(" ".join(str(arg) for arg in args)) + + allowed_globals: Dict[str, Any] = { + "__builtins__": safe_builtins(safe_names, print_func=safe_print, allow_import=True), + "math": math, + "json": json, + "pd": pd, + "pe": pe, + "Simulation": Simulation, + "StructuralReform": StructuralReform, + "Parameters": Parameters, + "aggregate_microdata": aggregate_microdata, + "combine_microdata": combine_microdata, + "capabilities": capabilities, + "ensure_dataset": ensure_dataset, + } + np = optional_numpy() + if np is not None: + allowed_globals["np"] = np + allowed_globals["numpy"] = np + + try: + exec(code, allowed_globals) + except Exception as exc: + return {"error": f"{type(exc).__name__}: {exc}"} + + result = allowed_globals.get("result", None) + response: Dict[str, Any] = {} + if result is not None: + response["result"] = json_safe(result) + if output_lines: + response["output"] = "\n".join(output_lines) + if not response: + response["result"] = None + response["note"] = "No 'result' variable was set and nothing was printed." + return response + + +def run_generator(code: str) -> Dict[str, Any]: + """Execute a Python generator snippet that returns a dict of tool kwargs.""" + safe_names = ( + "range", + "len", + "int", + "float", + "str", + "bool", + "list", + "dict", + "tuple", + "set", + "zip", + "enumerate", + "map", + "filter", + "sorted", + "reversed", + "min", + "max", + "sum", + "abs", + "round", + "True", + "False", + "None", + "isinstance", + "ValueError", + "TypeError", + "append", + ) + allowed_globals: Dict[str, Any] = { + "__builtins__": safe_builtins(safe_names), + "math": math, + "json": json, + } + exec(code, allowed_globals) + if "generate" not in allowed_globals: + raise ValueError("Generator code must define a `generate()` function") + result = allowed_globals["generate"]() + if not isinstance(result, dict): + raise ValueError(f"generate() must return a dict, got {type(result).__name__}") + return result + diff --git a/backend/tooling/serialization.py b/backend/tooling/serialization.py new file mode 100644 index 0000000..ecff326 --- /dev/null +++ b/backend/tooling/serialization.py @@ -0,0 +1,96 @@ +"""Serialization helpers for tool outputs.""" + +from typing import Any, Dict, List + + +def json_safe(obj: Any) -> Any: + try: + import numpy as np + except ImportError: + np = None + + try: + import pandas as pd + except ImportError: + pd = None + + if obj is None or isinstance(obj, (str, int, float, bool)): + return obj + if np is not None: + if isinstance(obj, np.ndarray): + return obj.tolist() + if isinstance(obj, np.integer): + return int(obj) + if isinstance(obj, np.floating): + return float(obj) + if isinstance(obj, np.bool_): + return bool(obj) + if pd is not None: + if isinstance(obj, pd.DataFrame): + return obj.to_dict(orient="records") + if isinstance(obj, pd.Series): + return obj.to_list() + if isinstance(obj, dict): + return {str(k): json_safe(v) for k, v in obj.items()} + if isinstance(obj, (list, tuple, set)): + return [json_safe(v) for v in obj] + if hasattr(obj, "model_dump") and callable(obj.model_dump): + return json_safe(obj.model_dump()) + if hasattr(obj, "dict") and callable(obj.dict): + return json_safe(obj.dict()) + try: + import dataclasses + + if dataclasses.is_dataclass(obj): + return json_safe(dataclasses.asdict(obj)) + except Exception: + pass + return str(obj) + + +def dataframe_to_records(df) -> List[Dict[str, Any]]: + return [ + { + key: ( + None + if (hasattr(value, "__class__") and value.__class__.__name__ == "float" and str(value) == "nan") + else value + ) + for key, value in row.items() + } + for row in df.to_dict(orient="records") + ] + + +def explore_tabular_data(data: List[Dict[str, Any]], max_unique_values: int = 20) -> Dict[str, Any]: + if not data or not isinstance(data[0], dict): + return {"error": "Data must be a non-empty list of dicts", "row_count": 0, "columns": []} + row_count = len(data) + all_keys = set() + for row in data: + all_keys.update(row.keys()) + columns = [] + for key in sorted(all_keys): + values = [row.get(key) for row in data] + sample_type = next((type(v).__name__ for v in values if v is not None), "unknown") + unique_values = list(set(v for v in values if v is not None)) + unique_count = len(unique_values) + col_info = { + "name": key, + "type": sample_type, + "unique_count": unique_count, + "null_count": sum(1 for v in values if v is None), + } + if unique_count <= max_unique_values: + try: + col_info["unique_values"] = sorted(unique_values) + except TypeError: + col_info["unique_values"] = unique_values + if sample_type in ("int", "float"): + numeric = [v for v in values if isinstance(v, (int, float))] + if numeric: + col_info["min"] = min(numeric) + col_info["max"] = max(numeric) + columns.append(col_info) + return {"row_count": row_count, "columns": columns} + diff --git a/backend/tooling/simulations.py b/backend/tooling/simulations.py new file mode 100644 index 0000000..813302f --- /dev/null +++ b/backend/tooling/simulations.py @@ -0,0 +1,60 @@ +"""PolicyEngine UK compiled-package and simulation helpers.""" + +from pathlib import Path +import sys +from typing import Any, Dict + + +DATASET_LABELS = { + "frs": "Family Resources Survey", + "efrs": "Enhanced FRS", + "spi": "Survey of Personal Incomes", + "lcfs": "Living Costs and Food Survey", + "was": "Wealth and Assets Survey", +} + + +def ensure_compiled_package_importable() -> None: + """Make the local policyengine_uk_compiled package importable in dev setups.""" + try: + import policyengine_uk_compiled # noqa: F401 + return + except ModuleNotFoundError: + pass + + repo_parent = Path(__file__).resolve().parents[3] + candidates = [ + repo_parent / "policyengine-uk-rust" / "interfaces" / "python", + repo_parent / "policyengine-uk-rust-codex-debug-issue" / "interfaces" / "python", + ] + for candidate in candidates: + if candidate.is_dir(): + candidate_str = str(candidate) + if candidate_str not in sys.path: + sys.path.insert(0, candidate_str) + try: + import policyengine_uk_compiled # noqa: F401 + return + except ModuleNotFoundError: + continue + + raise ModuleNotFoundError( + "policyengine_uk_compiled is not importable. Install the package or make sure a local " + "policyengine-uk-rust checkout with interfaces/python is available." + ) + + +def build_simulation(year: int, dataset: str = "frs"): + """Build a compiled PolicyEngine UK Simulation.""" + ensure_compiled_package_importable() + from policyengine_uk_compiled import Simulation + + return Simulation(year=year, dataset=dataset) + + +def get_capabilities() -> Dict[str, Any]: + ensure_compiled_package_importable() + from policyengine_uk_compiled import capabilities + + return capabilities() +