Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/cli-idtl5-formatter.added.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Restore `idtl=5` (TAXSIM full-text labeled-section output) to the cli stdin/stdout flow. Previously only the legacy `exe.py` entry point handled it; the current cli always emitted CSV. For mixed-idtl inputs, the labeled-text records for the `idtl=5` rows are written first (in input order), followed by a single CSV block containing the remaining rows.
49 changes: 47 additions & 2 deletions policyengine_taxsim/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,52 @@ def _generate_yaml_files(input_df: pd.DataFrame, results_df: pd.DataFrame):
print(f"Warning: Could not generate YAML for record {idx}: {e}")


def _idtl_code(raw):
    """Return ``raw`` as an integer idtl code.

    A blank cell (``None``/NaN) counts as 0, i.e. the default CSV output;
    ``int(float(nan))`` would otherwise raise ``ValueError`` and abort the
    whole run for a mixed input with one empty ``idtl`` cell.
    """
    if pd.isna(raw):
        return 0
    return int(float(raw))


def _emit_results(input_df, results_df, out_stream):
    """Write simulation results to ``out_stream``.

    If ``input_df`` has no ``idtl`` column, or no row with ``idtl == 5``,
    ``results_df`` is written unchanged as CSV.

    Otherwise the output has two parts:

    1. one labeled-section text record per ``idtl=5`` input row, in input
       order, each followed by a newline;
    2. a single CSV block (header plus rows, in ``results_df`` order) for
       the remaining rows, omitted when there are none.

    Records are *not* interleaved row by row: a CSV block needs one header,
    and every record carries its ``taxsimid`` in either format.

    Args:
        input_df: Input records; must contain ``taxsimid`` when any row
            requests ``idtl=5``.
        results_df: Calculated results with a ``taxsimid`` column.
        out_stream: Writable text stream (e.g. ``sys.stdout``).

    Input rows whose ``taxsimid`` has no entry in ``results_df`` are skipped.
    """
    # Default: no idtl=5 anywhere -> write CSV exactly as before.
    if "idtl" not in input_df.columns or not (input_df["idtl"] == 5).any():
        results_df.to_csv(out_stream, index=False)
        return

    # Imported lazily so the plain-CSV path above never depends on the
    # text formatter being importable.
    try:
        from .core.text_formatter import format_row
    except ImportError:
        from policyengine_taxsim.core.text_formatter import format_row

    # Per-taxsimid lookup built from plain records; results_df itself is
    # left untouched so the CSV block keeps its default column order.
    results_by_id = {
        row["taxsimid"]: row for row in results_df.to_dict(orient="records")
    }

    text_chunks = []
    csv_taxsimids = []
    for _, in_row in input_df.iterrows():
        idtl = _idtl_code(in_row.get("idtl", 0))
        taxsimid = in_row["taxsimid"]
        result_dict = results_by_id.get(taxsimid)
        if result_dict is None:
            continue
        if idtl == 5:
            text_chunks.append(format_row(in_row.to_dict(), result_dict))
        else:
            csv_taxsimids.append(taxsimid)

    for chunk in text_chunks:
        out_stream.write(chunk + "\n")

    if csv_taxsimids:
        # Boolean-mask selection already preserves results_df column order.
        csv_df = results_df[results_df["taxsimid"].isin(csv_taxsimids)]
        csv_df.to_csv(out_stream, index=False)


@click.group(invoke_without_command=True)
@click.option("--logs", is_flag=True, help="Generate PE YAML Tests Logs")
@click.option(
Expand Down Expand Up @@ -142,8 +188,7 @@ def cli(ctx, logs, disable_salt, sample):
_generate_yaml_files(df_with_ids, results_df)
click.echo(f"Generated {len(df_with_ids)} YAML test files", err=True)

# Write results to stdout
results_df.to_csv(sys.stdout, index=False)
_emit_results(df_with_ids, results_df, sys.stdout)

except Exception as e:
click.echo(f"Error processing input: {str(e)}", err=True)
Expand Down
146 changes: 146 additions & 0 deletions policyengine_taxsim/core/text_formatter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
"""
Format a single PE-Microsim result row in TAXSIM-35's labeled-section
text output (idtl=5). Reads values from the result DataFrame row plus
the input row; does not run a fresh Simulation. Mirrors the legacy
output from `generate_text_description_output` so the cli stdin/stdout
flow can emit idtl=5 output without falling back to per-row Simulation.
"""

from typing import Mapping

from .utils import (
load_variable_mappings,
get_state_code,
get_state_number,
)


# Layout constants — match legacy generate_text_description_output for
# bit-identical output formatting. All values are character counts.
# Base margin; also reused as the gap between state number and state name.
_LEFT_MARGIN = 4
# Extra indent added to _LEFT_MARGIN for every label/value line.
_LABEL_INDENT = 4
# Width of the left-justified label column.
_LABEL_WIDTH = 45
# Width of the right-justified (first) value column.
_VALUE_WIDTH = 15
# Width of the second value column used for paired input fields.
_SECOND_VALUE_WIDTH = 12
# Indent of a group heading line in the output sections.
_GROUP_MARGIN = _LEFT_MARGIN


def _format_value(value):
    """Render one output value for the value column.

    ``int``/``float`` values are right-aligned in 8 characters with one
    decimal place (the legacy ``f'{value:>8.1f}'`` layout); anything else
    is passed through ``str``.
    """
    if not isinstance(value, (int, float)):
        return str(value)
    return format(value, ">8.1f")


def _format_label_line(desc: str, formatted_value: str) -> str:
    """Build one output line: indent, padded label, right-aligned value.

    The label is left-justified to ``_LABEL_WIDTH`` and the already
    formatted value is right-justified to ``_VALUE_WIDTH``.
    """
    padding = " " * (_LEFT_MARGIN + _LABEL_INDENT)
    label_cell = format(desc, f"<{_LABEL_WIDTH}")
    value_cell = format(formatted_value, f">{_VALUE_WIDTH}")
    return padding + label_cell + value_cell


def _input_data_section(input_row: Mapping, state_name: str) -> list:
    """Build the lines of the 'Input Data:' section.

    Walks the ``taxsim_input_definition`` entries of the variable mappings
    in order and renders one line per field that is present in
    ``input_row``:

    * a field whose ``pair`` is also present gets both values on one line;
    * the ``state`` field gets its numeric value followed by ``state_name``;
    * any other field is printed as a 2-decimal number, or as its string
      form when it cannot be converted to ``float``.

    Returns the list of lines, starting with a blank line and the heading.
    """
    definitions = load_variable_mappings()["taxsim_input_definition"]
    pad = " " * (_LEFT_MARGIN + _LABEL_INDENT)
    rendered = ["", " Input Data:"]

    for entry in definitions:
        field, config = next(iter(entry.items()))
        # Fields known to the mapping but absent from this record are
        # simply left out of the section rather than zero-filled.
        if field not in input_row:
            continue

        value = input_row[field]
        name = config["name"]
        pair = config.get("pair")
        label = f"{pad}{name:<{_LABEL_WIDTH}}"

        if pair and pair in input_row:
            first = f"{float(value):>{_VALUE_WIDTH}.2f}"
            second = f"{float(input_row[pair]):>{_SECOND_VALUE_WIDTH}.2f}"
            rendered.append(label + first + second)
            continue

        if field == "state":
            rendered.append(
                label + f"{float(value):>{_VALUE_WIDTH}.2f} {state_name}"
            )
            continue

        try:
            cell = f"{float(value):>{_VALUE_WIDTH}.2f}"
        except (TypeError, ValueError):
            # Non-numeric input: show it verbatim in the value column.
            cell = f"{str(value):>{_VALUE_WIDTH}}"
        rendered.append(label + cell)

    return rendered


def _grouped_output_sections(result_row: Mapping, year, state_name: str) -> list:
    """Build the lines of every section that follows 'Input Data:'.

    Variables are taken from the ``policyengine_to_taxsim`` mapping. One is
    included only if it has ``full_text_group`` and ``text_description``,
    has ``implemented: true``, and has an ``idtl`` entry with
    ``full_text == 5``. Values are read from ``result_row``; nothing is
    recomputed.

    Args:
        result_row: One result record keyed by output variable name.
        year: Value printed for the ``year`` variable.
        state_name: State label printed for the ``state`` variable and
            passed to ``get_state_number``.

    Returns:
        A list of lines: for each group (ordered by ``group_order``,
        default 999) a heading, one line per description line of each
        variable (sorted by description), then a blank line.
    """
    import math

    pe_to_taxsim = load_variable_mappings()["policyengine_to_taxsim"]

    groups = {}
    group_orders = {}
    for var_name, var_info in pe_to_taxsim.items():
        if not (
            "full_text_group" in var_info
            and "text_description" in var_info
            and var_info.get("implemented") is True
            and any(item.get("full_text", 0) == 5 for item in var_info.get("idtl", []))
        ):
            continue
        group = var_info["full_text_group"]
        # The last variable seen for a group decides that group's order.
        group_orders[group] = var_info.get("group_order", 999)
        groups.setdefault(group, []).append((var_info["text_description"], var_name))

    lines = []
    for group_name in sorted(groups, key=lambda g: group_orders[g]):
        lines.append(f"{' ' * _GROUP_MARGIN}{group_name}:")
        # Sort on the description only, so ties keep mapping order.
        for desc, var_name in sorted(groups[group_name], key=lambda x: x[0]):
            if var_name == "taxsimid":
                value = result_row.get("taxsimid", 0)
            elif var_name == "year":
                value = year
            elif var_name == "state":
                value = (
                    f"{get_state_number(state_name)}{' ' * _LEFT_MARGIN}{state_name}"
                )
            else:
                # Missing key, None and NaN all render as 0.0. Values that
                # cannot be converted to float are passed through as-is.
                raw = result_row.get(var_name, 0)
                try:
                    value = float(raw) if raw is not None else 0.0
                except (TypeError, ValueError):
                    value = raw
                else:
                    # pandas marks empty cells with NaN, not None.
                    if math.isnan(value):
                        value = 0.0

            formatted_value = _format_value(value)
            # A multi-line description repeats the value on every line.
            for desc_line in desc.split("\n"):
                lines.append(_format_label_line(desc_line, formatted_value))
        lines.append("")
    return lines


def format_row(input_row: Mapping, result_row: Mapping) -> str:
    """Render one record as the complete idtl=5 text block.

    ``year`` and ``state`` come from ``input_row`` when present, otherwise
    from ``result_row`` (default 0). The result is the 'Input Data:'
    section, a blank line, and the grouped output sections, joined with
    newlines.
    """
    raw_year = input_row.get("year", result_row.get("year", 0))
    year = int(float(raw_year))

    raw_state = input_row.get("state", result_row.get("state", 0))
    state_name = get_state_code(int(float(raw_state)))

    input_part = _input_data_section(input_row, state_name)
    output_part = _grouped_output_sections(result_row, year, state_name)
    return "\n".join([*input_part, "", *output_part])
69 changes: 69 additions & 0 deletions tests/test_cli_idtl5.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""
Tests for idtl=5 (human-readable full-text) output in the cli stdin/stdout
flow. The legacy `exe.py` PyInstaller entry point supported idtl=5; the
current Microsim-based cli.py used to always emit CSV. This restores the
idtl=5 path by rendering result-DataFrame rows in TAXSIM's labeled
section format (Input Data / Basic Output / Federal Tax Calculation /
State Tax Calculation).
"""

import io

import pandas as pd
import pytest
from click.testing import CliRunner

from policyengine_taxsim.cli import cli


def _run_cli(input_csv: str):
    """Invoke the CLI with no arguments, feeding ``input_csv`` on stdin."""
    return CliRunner().invoke(cli, [], input=input_csv)


def _record(idtl=5):
    """Return a one-record TAXSIM CSV (header + row) with the given idtl."""
    header = "taxsimid,year,state,mstat,page,sage,depx,pwages,idtl"
    row = f"1,2024,5,1,40,0,0,50000,{idtl}"
    return header + "\n" + row + "\n"


class TestIdtl5Format:
    """End-to-end checks of the CLI output format selected by ``idtl``."""

    def test_idtl5_input_emits_full_text(self):
        """An idtl=5 record is rendered as labeled sections, not CSV."""
        outcome = _run_cli(_record(idtl=5))
        assert outcome.exit_code == 0, f"CLI failed: {outcome.output}\n{outcome.exception}"
        expected_headings = (
            "Input Data:",
            "Basic Output:",
            "Federal Tax Calculation:",
            "State Tax Calculation:",
        )
        for heading in expected_headings:
            assert heading in outcome.output

    def test_idtl5_does_not_emit_csv_header(self):
        """The `taxsimid,year,state,fiitax,...` CSV header belongs to the
        idtl=0/2 format and must be absent from idtl=5 output."""
        outcome = _run_cli(_record(idtl=5))
        assert outcome.exit_code == 0
        # A CSV block would start with this recognizable header prefix.
        csv_header_prefix = "taxsimid,year,state,fiitax"
        assert csv_header_prefix not in outcome.output

    def test_idtl2_input_still_emits_csv(self):
        """Regression guard: idtl=2 keeps the CSV format."""
        outcome = _run_cli(_record(idtl=2))
        assert outcome.exit_code == 0
        # Header columns of the CSV output.
        for column in ("taxsimid", "fiitax"):
            assert column in outcome.output
        # Labeled-section marker must not appear.
        assert "Input Data:" not in outcome.output

    def test_idtl5_shows_federal_tax(self):
        """idtl=5 output mentions the federal income tax liability."""
        outcome = _run_cli(_record(idtl=5))
        assert outcome.exit_code == 0
        # The exact label comes from variable_mappings.yaml, so match
        # loosely on lower-cased text.
        lowered = outcome.output.lower()
        assert "federal" in lowered
        assert "iit" in lowered or "income tax" in lowered
Loading