Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/push.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ jobs:
HUGGING_FACE_TOKEN: ${{ secrets.HUGGING_FACE_TOKEN }}
- name: Build datasets
run: make data
env:
MODAL_CALIBRATE: "${{ secrets.MODAL_TOKEN_ID != '' && secrets.MODAL_TOKEN_SECRET != '' && '1' || '0' }}"
MODAL_TOKEN_ID: ${{ secrets.MODAL_TOKEN_ID }}
MODAL_TOKEN_SECRET: ${{ secrets.MODAL_TOKEN_SECRET }}
- name: Save calibration log (constituencies)
uses: actions/upload-artifact@v7
with:
Expand Down
1 change: 1 addition & 0 deletions changelog.d/279.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- Add optional Modal GPU calibration for trusted dataset builds while keeping pull-request builds on the CPU path.
289 changes: 247 additions & 42 deletions policyengine_uk_data/datasets/create_datasets.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
logging.basicConfig(level=logging.INFO)


def _modal_calibration_requested() -> bool:
return os.environ.get("MODAL_CALIBRATE", "0") == "1"


def _get_positive_int_env(name: str, default: int) -> int:
raw_value = os.environ.get(name)
if raw_value is None:
Expand All @@ -24,6 +28,186 @@ def _get_positive_int_env(name: str, default: int) -> int:
return value


def _array_values(value):
return value.values if hasattr(value, "values") else value


def _dump_array(value) -> bytes:
import io

import numpy as np

buffer = io.BytesIO()
np.save(buffer, value)
return buffer.getvalue()


def _build_weights_init(dataset, area_count: int, r):
import numpy as np

areas_per_household = np.maximum(r.sum(axis=0), 1)
household_weights = dataset.household.household_weight.values
original_weights = np.log(
household_weights / areas_per_household
+ np.random.random(len(household_weights)) * 0.01
)
return np.ones((area_count, len(original_weights))) * original_weights


def _prepare_modal_calibration_payload(
dataset,
matrix_fn,
area_count: int,
m_national_bytes: bytes,
y_national_bytes: bytes,
):
import gc
import numpy as np

dataset_copy = dataset.copy()
matrix, y, r = matrix_fn(dataset_copy)
matrix_values = _array_values(matrix)
y_values = _array_values(y)
weights_init = _build_weights_init(dataset_copy, area_count, r)

payload = {
"matrix": _dump_array(matrix_values),
"y": _dump_array(np.nan_to_num(y_values, nan=0.0)),
"local_target_available": _dump_array(np.isfinite(y_values)),
"r": _dump_array(r),
"matrix_national": m_national_bytes,
"y_national": y_national_bytes,
"weights_init": _dump_array(weights_init),
}
log_inputs = (matrix.copy(), y.copy())

del dataset_copy, matrix, y, r, matrix_values, y_values, weights_init
gc.collect()

return payload, log_inputs


def _load_weight_checkpoint(weight_bytes: bytes):
import io

import numpy as np

return np.load(io.BytesIO(weight_bytes))


def _write_calibration_log(
checkpoints,
get_performance,
matrix,
y,
m_national,
y_national,
log_csv: str,
):
import pandas as pd

performance = pd.DataFrame()
for epoch, weight_bytes in checkpoints:
weights = _load_weight_checkpoint(weight_bytes)
performance_step = get_performance(
weights,
matrix,
y,
m_national,
y_national,
[],
)
performance_step["epoch"] = epoch
performance_step["loss"] = performance_step.rel_abs_error**2
performance_step["target_name"] = [
f"{area}/{metric}"
for area, metric in zip(performance_step.name, performance_step.metric)
]
performance = pd.concat([performance, performance_step], ignore_index=True)

performance.to_csv(log_csv, index=False)
return _load_weight_checkpoint(checkpoints[-1][1])


def _run_modal_calibrations(
frs,
epochs: int,
create_constituency_target_matrix,
create_local_authority_target_matrix,
create_national_target_matrix,
get_constituency_performance,
get_la_performance,
):
import gc

import h5py
import modal

from policyengine_uk_data.storage import STORAGE_FOLDER
from policyengine_uk_data.utils.modal_calibrate import app, run_calibration

m_national, y_national = create_national_target_matrix(frs.copy())
m_national_bytes = _dump_array(_array_values(m_national))
y_national_bytes = _dump_array(_array_values(y_national))

constituency_payload, (matrix_c, y_c) = _prepare_modal_calibration_payload(
dataset=frs,
matrix_fn=create_constituency_target_matrix,
area_count=650,
m_national_bytes=m_national_bytes,
y_national_bytes=y_national_bytes,
)
la_payload, (matrix_la, y_la) = _prepare_modal_calibration_payload(
dataset=frs,
matrix_fn=create_local_authority_target_matrix,
area_count=360,
m_national_bytes=m_national_bytes,
y_national_bytes=y_national_bytes,
)

with modal.enable_output(), app.run():
constituency_future = run_calibration.spawn(
**constituency_payload,
epochs=epochs,
)
la_future = run_calibration.spawn(
**la_payload,
epochs=epochs,
)
del constituency_payload, la_payload
gc.collect()

constituency_checkpoints = constituency_future.get()
la_checkpoints = la_future.get()

constituency_weights = _write_calibration_log(
checkpoints=constituency_checkpoints,
get_performance=get_constituency_performance,
matrix=matrix_c,
y=y_c,
m_national=m_national,
y_national=y_national,
log_csv="constituency_calibration_log.csv",
)
la_weights = _write_calibration_log(
checkpoints=la_checkpoints,
get_performance=get_la_performance,
matrix=matrix_la,
y=y_la,
m_national=m_national,
y_national=y_national,
log_csv="la_calibration_log.csv",
)

with h5py.File(STORAGE_FOLDER / "parliamentary_constituency_weights.h5", "w") as f:
f.create_dataset("2025", data=constituency_weights)

with h5py.File(STORAGE_FOLDER / "local_authority_weights.h5", "w") as f:
f.create_dataset("2025", data=la_weights)

return constituency_weights, la_weights


def main():
"""Create enhanced FRS dataset with rich progress tracking."""
try:
Expand All @@ -46,6 +230,7 @@ def main():
# Use reduced epochs and fidelity for testing
is_testing = os.environ.get("TESTING", "0") == "1"
epochs = 32 if is_testing else 512
use_modal_calibration = _modal_calibration_requested()
oa_clones = _get_positive_int_env(
"PE_UK_DATA_OA_CLONES",
2 if is_testing else 10,
Expand Down Expand Up @@ -153,13 +338,6 @@ def main():
update_dataset("Uprate to 2025", "completed")

# Calibrate constituency weights with nested progress

update_dataset("Calibrate constituency weights", "processing")

# Use a separate progress tracker for calibration with nested display
from policyengine_uk_data.utils.calibrate import (
calibrate_local_areas,
)
from policyengine_uk_data.datasets.local_areas.constituencies.loss import (
create_constituency_target_matrix,
)
Expand All @@ -169,48 +347,72 @@ def main():
from policyengine_uk_data.datasets.local_areas.constituencies.calibrate import (
get_performance,
)

# Run calibration with verbose progress
frs_calibrated_constituencies = calibrate_local_areas(
dataset=frs,
epochs=epochs,
matrix_fn=create_constituency_target_matrix,
national_matrix_fn=create_national_target_matrix,
area_count=650,
weight_file="parliamentary_constituency_weights.h5",
excluded_training_targets=[],
log_csv="constituency_calibration_log.csv",
verbose=True, # Enable nested progress display
area_name="Constituency",
get_performance=get_performance,
nested_progress=nested_progress, # Pass the nested progress manager
)
update_dataset("Calibrate constituency weights", "completed")

from policyengine_uk_data.datasets.local_areas.local_authorities.calibrate import (
get_performance as get_la_performance,
)
from policyengine_uk_data.datasets.local_areas.local_authorities.loss import (
create_local_authority_target_matrix,
)

# Run calibration with verbose progress
update_dataset("Calibrate local authority weights", "processing")
calibrate_local_areas(
dataset=frs,
epochs=epochs,
matrix_fn=create_local_authority_target_matrix,
national_matrix_fn=create_national_target_matrix,
area_count=360,
weight_file="local_authority_weights.h5",
excluded_training_targets=[],
log_csv="la_calibration_log.csv",
verbose=True, # Enable nested progress display
area_name="Local Authority",
get_performance=get_la_performance,
nested_progress=nested_progress, # Pass the nested progress manager
)
update_dataset("Calibrate local authority weights", "completed")
if use_modal_calibration:
update_dataset("Calibrate constituency weights", "processing")
update_dataset("Calibrate local authority weights", "processing")
constituency_weights, _ = _run_modal_calibrations(
frs=frs,
epochs=epochs,
create_constituency_target_matrix=create_constituency_target_matrix,
create_local_authority_target_matrix=create_local_authority_target_matrix,
create_national_target_matrix=create_national_target_matrix,
get_constituency_performance=get_performance,
get_la_performance=get_la_performance,
)
frs_calibrated_constituencies = frs.copy()
frs_calibrated_constituencies.household.household_weight = (
constituency_weights.sum(axis=0)
)
update_dataset("Calibrate constituency weights", "completed")
update_dataset("Calibrate local authority weights", "completed")
else:
# Use a separate progress tracker for calibration with nested display
from policyengine_uk_data.utils.calibrate import (
calibrate_local_areas,
)

# Run calibration with verbose progress
update_dataset("Calibrate constituency weights", "processing")
frs_calibrated_constituencies = calibrate_local_areas(
dataset=frs,
epochs=epochs,
matrix_fn=create_constituency_target_matrix,
national_matrix_fn=create_national_target_matrix,
area_count=650,
weight_file="parliamentary_constituency_weights.h5",
excluded_training_targets=[],
log_csv="constituency_calibration_log.csv",
verbose=True, # Enable nested progress display
area_name="Constituency",
get_performance=get_performance,
nested_progress=nested_progress, # Pass the nested progress manager
)
update_dataset("Calibrate constituency weights", "completed")

# Run calibration with verbose progress
update_dataset("Calibrate local authority weights", "processing")
calibrate_local_areas(
dataset=frs,
epochs=epochs,
matrix_fn=create_local_authority_target_matrix,
national_matrix_fn=create_national_target_matrix,
area_count=360,
weight_file="local_authority_weights.h5",
excluded_training_targets=[],
log_csv="la_calibration_log.csv",
verbose=True, # Enable nested progress display
area_name="Local Authority",
get_performance=get_la_performance,
nested_progress=nested_progress, # Pass the nested progress manager
)
update_dataset("Calibrate local authority weights", "completed")

# Downrate and save
update_dataset("Downrate to 2023", "processing")
Expand Down Expand Up @@ -250,6 +452,9 @@ def main():
"tiny_enhanced_dataset": "enhanced_frs_2023_24_tiny.h5",
"imputations_applied": "consumption, wealth, VAT, services, income, capital_gains, salary_sacrifice, student_loan_plan",
"calibration": "national, LA and constituency targets",
"calibration_backend": (
"Modal GPU" if use_modal_calibration else "CPU"
),
},
)

Expand Down
Loading