Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions src/isp_workbook_parser/config_model.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

from pathlib import Path
from typing import Dict, List, Optional

import yaml
from pydantic import BaseModel
Expand Down Expand Up @@ -49,11 +50,11 @@ class TableConfig(BaseModel):

name: str
sheet_name: str
header_rows: int | List[int]
header_rows: int | list[int]
end_row: int
column_range: str
skip_rows: Optional[int | List[int] | Dict[str, int]] = None
columns_with_merged_rows: Optional[str | List[str]] = None
skip_rows: int | list[int] | dict[str, int] | None = None
columns_with_merged_rows: str | list[str] | None = None
forward_fill_values: bool = True


Expand Down Expand Up @@ -118,7 +119,7 @@ def load_yaml(path: Path) -> dict[str, TableConfig]:
path: pathlib Path instance specifying the location of the YAML file.

"""
with open(path, "r") as f:
with Path(path).open() as f:
config = yaml.safe_load(f)
f.close()
if config is not None:
Expand Down
87 changes: 43 additions & 44 deletions src/isp_workbook_parser/parser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import glob
import os
from __future__ import annotations

import warnings
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -107,23 +107,21 @@ def _determine_config_path(

def _check_version_is_supported(self, config_path) -> None:
"""Check the default config directory contains a subdirectory that matches the workbook version number."""
versions = os.listdir(config_path)
versions = Path(config_path).iterdir()
if self.workbook_version not in versions:
raise ValueError(
f"The workbook version {self.workbook_version} is not supported."
)
msg = f"The workbook version {self.workbook_version} is not supported."
raise ValueError(msg)

def _load_config(self) -> dict[str, dict[str, Any]]:
"""Load all the YAML files stored in the config directory into a nested dictionary with sheet names as keys
and table names as second level keys. For robustness across workbook versions, the config sheet name
is matched with a workbook sheet name in case-agnostic manner.
"""
pattern = os.path.join(self.config_path, "*.yaml")
config_files = glob.glob(pattern)
config_files = Path(self.config_path).glob("*.yaml")
configs = {}
for file in config_files:
config_dict = load_yaml(Path(file))
for config_name in config_dict.keys():
for config_name in config_dict:
config = config_dict[config_name]
config_sheet_name_lowercase = config.sheet_name.lower()
sheet_names = [
Expand All @@ -132,15 +130,14 @@ def _load_config(self) -> dict[str, dict[str, Any]]:
if sheet_name.lower() == config_sheet_name_lowercase
]
if len(sheet_names) > 1:
raise TableConfigError(
f"Workbook sheet '{config.sheet_name}' is not unique"
)
elif len(sheet_names) < 1:
raise TableConfigError(
msg = f"Workbook sheet '{config.sheet_name}' is not unique"
raise TableConfigError(msg)
if len(sheet_names) < 1:
msg = (
f" Sheet '{config.sheet_name}' cannot be found in the workbook"
)
else:
config.sheet_name = sheet_names.pop()
raise TableConfigError(msg)
config.sheet_name = sheet_names.pop()
config_dict[config_name] = config
configs.update(config_dict)
return configs
Expand All @@ -157,15 +154,15 @@ def _get_table_names_by_sheet(self):
return sorted_table_names_by_sheet

def _check_data_ends_where_expected(
self, tab: str, end_row: int, range: str, name: str
self, tab: str, end_row: int, cell_range: str, name: str
) -> None:
"""Check that the cell after the last row of the table in the second column is blank.

While there are often notes on the data in the first cell after the first column ends, the first cell after the
second column ends appears to be always blank. Therefore, checking that this cell is blank can be used to verify
that the config has not specified a table end row that is before the actual last row of the table.
"""
first_column = range.split(":")[0]
first_column = cell_range.split(":", maxsplit=1)[0]
first_col_index = openpyxl.utils.column_index_from_string(first_column)
second_col_index = first_col_index + 1
# We check that value in the second column is blank because sometime the row after the first column will
Expand All @@ -181,15 +178,15 @@ def _check_data_ends_where_expected(
raise TableConfigError(error_message)

def _check_no_data_above_first_header_row(
self, tab: str, header_rows: int, range: str, name: str
self, tab: str, header_rows: int, cell_range: str, name: str
) -> None:
"""Check that the cell before the first header row of the table in the second column is blank.

While there are often notes on the data in the first cell above the first column, the first cell above the
second column appears to be always blank. Therefore, checking that this cell is blank can be used to verify
that the config has not specified a table header row that is after the first header row of the table.
"""
first_column = range.split(":")[0]
first_column = cell_range.split(":", maxsplit=1)[0]
first_col_index = openpyxl.utils.column_index_from_string(first_column)
second_col_index = first_col_index + 1

Expand Down Expand Up @@ -270,15 +267,15 @@ def _check_columns_unique(data: pd.DataFrame, name: str) -> None:
raise TableConfigError(error_message)

def _check_for_missed_column_on_right_hand_side_of_table(
self, sheet_name: str, start_row: int, end_row: int, range: str, name: str
self, sheet_name: str, start_row: int, end_row: int, cell_range: str, name: str
) -> None:
"""Checks if there is data in the column adjacent to last column specified in the config.

It appears that the column adjacent to the last column in a table is always blank. Therefore, checking if
there is data in the adjacent column can help detect when the column range in the config has been incorrectly
specified.
"""
last_column = range.split(":")[1]
last_column = cell_range.split(":")[1]
last_col_index = openpyxl.utils.column_index_from_string(last_column)
column_next_to_last_column = openpyxl.utils.get_column_letter(
last_col_index + 1
Expand Down Expand Up @@ -310,15 +307,15 @@ def _check_for_missed_column_on_right_hand_side_of_table(
raise TableConfigError(error_message)

def _check_for_missed_column_on_left_hand_side_of_table(
self, sheet_name: str, start_row: int, end_row: int, range: str, name: str
self, sheet_name: str, start_row: int, end_row: int, cell_range: str, name: str
) -> None:
"""Checks if there is data in the column adjacent to first column specified in the config.

It appears that the column adjacent to the first column in a table is always blank. Therefore, checking if
there is data in the adjacent column can help detect when the column range in the config has been incorrectly
specified.
"""
first_column = range.split(":")[0]
first_column = cell_range.split(":", maxsplit=1)[0]
first_col_index = openpyxl.utils.column_index_from_string(first_column)
column_next_to_first_column = openpyxl.utils.get_column_letter(
first_col_index - 1
Expand All @@ -335,10 +332,9 @@ def _check_for_missed_column_on_left_hand_side_of_table(
usecols=column_next_to_first_column,
nrows=(end_row - start_row),
)
if data[data.columns[0]].isna().all():
range_error = False
elif (
"DO NOT DELETE THIS COLUMN" in str(data.columns[0])
if (
data[data.columns[0]].isna().all()
or "DO NOT DELETE THIS COLUMN" in str(data.columns[0])
or first_column == "B"
):
range_error = False
Expand Down Expand Up @@ -459,7 +455,7 @@ def _postprocess_percentage_columns_between_0_and_100(
if isinstance(sr, list) and cell.row in sr:
skipped_rows += 1
continue
elif isinstance(sr, int) and cell.row == sr:
if isinstance(sr, int) and cell.row == sr:
skipped_rows += 1
continue
if isinstance(cell.value, (int, float)) and "%" in cell.number_format:
Expand All @@ -475,7 +471,7 @@ def _postprocess_percentage_columns_between_0_and_100(
# add the data column index if the entire column consists of percentage values
# else, add the individual cells as a list of tuples
if len(percentage_cells) == (table_config.end_row - min_row + 1):
percentage_columns.append(set(x[1] for x in percentage_cells).pop())
percentage_columns.append({x[1] for x in percentage_cells}.pop())
else:
percentage_columns.append(percentage_cells)

Expand Down Expand Up @@ -505,7 +501,7 @@ def get_table_names(self) -> list[str]:
return self.table_names_by_sheet

def get_table_from_config(
self, table_config: TableConfig, config_checks: bool = True
self, table_config: TableConfig, *, config_checks: bool = True
) -> pd.DataFrame:
"""Retrieves a table from the assumptions workbook using the config provided and returns as pd.DataFrame.

Expand Down Expand Up @@ -556,7 +552,7 @@ def get_table_from_config(
self._check_table(data, table_config)
return data

def get_table(self, table_name: str, config_checks: bool = True) -> pd.DataFrame:
def get_table(self, table_name: str, *, config_checks: bool = True) -> pd.DataFrame:
"""Retrieves a table from the assumptions workbook and returns as `pd.DataFrame`.

Examples
Expand All @@ -578,22 +574,24 @@ def get_table(self, table_name: str, config_checks: bool = True) -> pd.DataFrame
starts and ends where expected and the workbook header matches the config header.
"""
if not isinstance(table_name, str):
raise ValueError("The parameter table_name must be provided as a string.")
if table_name not in self.table_configs.keys():
msg = "The parameter table_name must be provided as a string."
raise TypeError(msg)
if table_name not in self.table_configs:
closest = process.extractOne(table_name, self.table_configs.keys())[0]
raise ValueError(
msg = (
f"The table_name ({table_name}) provided is not in the config for this workbook version."
+ f" Did you mean '{closest}'?"
f" Did you mean '{closest}'?"
)
raise ValueError(msg)

table_config = self.table_configs[table_name]
data = self.get_table_from_config(table_config, config_checks=config_checks)
return data
return self.get_table_from_config(table_config, config_checks=config_checks)

def save_tables(
self,
directory: str | Path,
tables: list[str] | str = "all",
*,
config_checks: bool = True,
) -> None:
"""Saves tables from the provided workbook to the specified directory as CSV files.
Expand All @@ -619,18 +617,19 @@ def save_tables(
directory.mkdir(parents=True)

if not directory.is_dir():
raise ValueError("The path provided is not a directory.")
msg = "The path provided is not a directory."
raise ValueError(msg)

if not (isinstance(tables, str) or isinstance(tables, list)):
raise ValueError(
"The parameter tables must be provided as str or list[str]."
)
if not (isinstance(tables, (str, list))):
msg = "The parameter tables must be provided as str or list[str]."
raise TypeError(msg)

if isinstance(tables, str) and tables != "all":
raise ValueError(
msg = (
"If the parameter tables is provided as a str it must \n",
f"have the value 'all' but '{tables}' was provided.",
)
raise ValueError(msg)

if tables == "all":
tables = self.table_configs.keys()
Expand Down
Loading