18 changes: 15 additions & 3 deletions digital_land/api.py
@@ -14,7 +14,7 @@
class API:
def __init__(
self,
specification: Specification,
specification: Specification = None,
url: str = DEFAULT_URL,
cache_dir: str = "var/cache",
):
@@ -36,13 +36,17 @@ def download_dataset(
overwrite: bool = False,
path: str = None,
extension: Extension = Extension.CSV,
builder: bool = False,
builder_name: str = None,
):
"""
Downloads a dataset in CSV or SQLite3 format.
- dataset: dataset name.
- overwrite: overwrite the file if it already exists (otherwise just returns).
- path: file to download to (otherwise <cache-dir>/dataset/<dataset-name>.<extension>).
- extension: 'csv' or 'sqlite3', 'csv' by default.
- builder: downloads the dataset from the builder path
- builder_name: name to use for accessing the builder path
- Returns: None.
The file will be downloaded to the given path or cache, unless an exception occurs.

@@ -56,8 +60,16 @@ def download_dataset(

# different extensions require different urls and reading modes
if extension == self.Extension.SQLITE3:
collection = self.specification.dataset[dataset]["collection"]
url = f"{self.url}/{collection}-collection/dataset/{dataset}.sqlite3"
# performance.sqlite3 requires the digital-land-builder path
if builder:
if not builder_name:
raise ValueError("Builder name must be provided when builder=True")
url = f"{self.url}/{builder_name}-builder/dataset/{dataset}.sqlite3"
else:
if self.specification is None:
raise ValueError("Specification must be provided")
collection = self.specification.dataset[dataset]["collection"]
url = f"{self.url}/{collection}-collection/dataset/{dataset}.sqlite3"
mode = "wb"

def get_content(response):
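
A minimal usage sketch of the new builder download path (not part of this diff), assuming the defaults shown above; it mirrors the call made in generate_provision_quality() in commands.py below:

from digital_land.api import API

api = API()  # specification now defaults to None, so it can be omitted here

# Fetches <url>/digital-land-builder/dataset/performance.sqlite3 into <cache-dir>/dataset/
api.download_dataset(
    "performance",
    extension=api.Extension.SQLITE3,
    builder=True,
    builder_name="digital-land",
)

# Without builder=True, a Specification is still needed to resolve the dataset's
# collection; otherwise a ValueError is raised.
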
277 changes: 252 additions & 25 deletions digital_land/commands.py
@@ -14,6 +14,8 @@
import geojson
from requests import HTTPError
import shapely
import numpy as np
import duckdb

from digital_land.package.organisation import OrganisationPackage
from digital_land.check import duplicate_reference_check
@@ -65,14 +67,18 @@
from digital_land.state import State
from digital_land.utils.add_data_utils import (
clear_log,
download_dataset,
get_column_field_summary,
get_transformed_entities,
get_entity_summary,
get_existing_endpoints_summary,
get_issue_summary,
get_updated_entities_summary,
is_date_valid,
is_url_valid,
get_user_response,
)
from digital_land.utils import functions_core as fc

from .register import hash_value
from .utils.gdal_utils import get_gdal_version
@@ -376,7 +382,11 @@ def pipeline_run(
),
)

issue_log = duplicate_reference_check(issues=issue_log, csv_path=output_path)
# When combine_fields is populated, the FactCombinePhase has already checked for duplicates and
# combined values, so skip duplicate_reference_check here to avoid a second duplicate check and
# repeated 'reference values are not unique' messages in the issue log.
if combine_fields == {}:
issue_log = duplicate_reference_check(issues=issue_log, csv_path=output_path)

issue_log.apply_entity_map()
issue_log.save(os.path.join(issue_dir, resource + ".csv"))
@@ -928,38 +938,42 @@ def add_data(

add_data_cache_dir = cache_dir / "add_data"

output_path = (
add_data_cache_dir
/ "transformed/"
/ (endpoint_resource_info["resource"] + ".csv")
)

issue_dir = add_data_cache_dir / "issue/"
column_field_dir = add_data_cache_dir / "column_field/"
dataset_resource_dir = add_data_cache_dir / "dataset_resource/"
converted_resource_dir = add_data_cache_dir / "converted_resource/"
converted_dir = add_data_cache_dir / "converted/"
output_log_dir = add_data_cache_dir / "log/"
operational_issue_dir = add_data_cache_dir / "performance/ " / "operational_issue/"

output_path.parent.mkdir(parents=True, exist_ok=True)
issue_dir.mkdir(parents=True, exist_ok=True)
column_field_dir.mkdir(parents=True, exist_ok=True)
dataset_resource_dir.mkdir(parents=True, exist_ok=True)
converted_resource_dir.mkdir(parents=True, exist_ok=True)
converted_dir.mkdir(parents=True, exist_ok=True)
output_log_dir.mkdir(parents=True, exist_ok=True)
operational_issue_dir.mkdir(parents=True, exist_ok=True)

collection.load_log_items()
for dataset in endpoint_resource_info["pipelines"]:
pipeline = Pipeline(pipeline_dir, dataset)
specification = Specification(specification_dir)

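# Working directories are (re)created per dataset: issue, column_field, dataset_resource and
# the transformed output are namespaced by dataset under the add_data cache.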
issue_dir = add_data_cache_dir / "issue/" / dataset
column_field_dir = add_data_cache_dir / "column_field/" / dataset
dataset_resource_dir = add_data_cache_dir / "dataset_resource/" / dataset
converted_resource_dir = add_data_cache_dir / "converted_resource/"
converted_dir = add_data_cache_dir / "converted/"
output_log_dir = add_data_cache_dir / "log/"
operational_issue_dir = (
add_data_cache_dir / "performance/ " / "operational_issue/"
)
output_path = (
add_data_cache_dir
/ "transformed/"
/ dataset
/ (endpoint_resource_info["resource"] + ".csv")
)

output_path.parent.mkdir(parents=True, exist_ok=True)
issue_dir.mkdir(parents=True, exist_ok=True)
column_field_dir.mkdir(parents=True, exist_ok=True)
dataset_resource_dir.mkdir(parents=True, exist_ok=True)
converted_resource_dir.mkdir(parents=True, exist_ok=True)
converted_dir.mkdir(parents=True, exist_ok=True)
output_log_dir.mkdir(parents=True, exist_ok=True)
operational_issue_dir.mkdir(parents=True, exist_ok=True)
print("======================================================================")
print("Run pipeline")
print("======================================================================")
try:
pipeline_run(
dataset,
Pipeline(pipeline_dir, dataset),
pipeline,
Specification(specification_dir),
endpoint_resource_info["resource_path"],
output_path=output_path,
@@ -1109,6 +1123,11 @@ def add_data(
shutil.copy(cache_pipeline_dir / "lookup.csv", pipeline_dir / "lookup.csv")

# Now check for existing endpoints for this provision/organisation
print(
"\n======================================================================"
)
print("Retire old endpoints/sources")
print("======================================================================")
existing_endpoints_summary, existing_sources = get_existing_endpoints_summary(
endpoint_resource_info, collection, dataset
)
@@ -1128,6 +1147,48 @@
pd.DataFrame.from_records(sources_to_retire)
)

# Update dataset and view newly updated dataset
print(
"\n======================================================================"
)
print("Update dataset")
print("======================================================================")
if get_user_response(
f"""\nDo you want to view an updated {dataset} dataset with the newly added data?
\nNote this requires downloading the dataset if it has not already been downloaded -
for some datasets this can take a while \n\n(yes/no): """
):
dataset_path = download_dataset(dataset, specification, cache_dir)
original_entities = get_transformed_entities(dataset_path, output_path)
print(f"Updating {dataset}.sqlite3 with new data...")
dataset_update(
input_paths=[output_path],
output_path=None,
organisation_path=organisation_path,
pipeline=pipeline,
dataset=dataset,
specification=specification,
issue_dir=os.path.split(issue_dir)[0],
column_field_dir=os.path.split(column_field_dir)[0],
dataset_resource_dir=os.path.split(dataset_resource_dir)[0],
dataset_path=dataset_path,
)
updated_entities = get_transformed_entities(dataset_path, output_path)
updated_entities_summary, diffs_df = get_updated_entities_summary(
original_entities, updated_entities
)
print(updated_entities_summary)
if diffs_df is not None:
diffs_path = (
add_data_cache_dir
/ dataset
/ "diffs"
/ f"{endpoint_resource_info['resource']}.csv"
)
os.makedirs(os.path.dirname(diffs_path), exist_ok=True)
diffs_df.to_csv(diffs_path)
print(f"\nDetailed breakdown found in file: {diffs_path}")


def add_endpoints_and_lookups(
csv_file_path,
@@ -1665,3 +1726,169 @@ def check_and_assign_entities(
):
return False
return True


def generate_provision_quality():
"""Generates a provision quality dataset and saves it as a parquet file"""
td = datetime.today().strftime("%Y-%m-%d")

api = API()

# Download the performance db using api
api.download_dataset(
"performance",
extension=api.Extension.SQLITE3,
builder=True,
builder_name="digital-land",
)

path_perf_db = Path(api.cache_dir) / "dataset" / "performance.sqlite3"

# Issue quality criteria lookup
specification_repo_url = (
"https://raw.githubusercontent.com/digital-land/specification/refs/heads/"
)
issue_type_url = f"{specification_repo_url}main/content/issue-type.csv"

lookup_issue_qual = duckdb.query(
f"""
SELECT
description,
"issue-type" AS issue_type,
name,
severity,
responsibility,
quality_criteria_level || ' - ' || quality_criteria as quality_criteria,
quality_criteria_level as quality_level
FROM read_csv('{issue_type_url}')
WHERE CAST(quality_criteria_level AS string) != ''
AND quality_criteria != ''
"""
).to_df()

# Extract provision data (organisation, dataset, active endpoint count)
provision = fc.query_sqlite(
path_perf_db,
"""
SELECT organisation, dataset, active_endpoint_count
FROM provision_summary
""",
)

# Extract issue count by provision from endpoint_dataset_issue_type_summary
qual_issue = fc.query_sqlite(
path_perf_db,
"""
SELECT
organisation, dataset,
'issue' as problem_source,
issue_type as problem_type,
sum(count_issues) as count
FROM endpoint_dataset_issue_type_summary
WHERE resource_end_date is not NULL
AND issue_type is not NULL
GROUP BY organisation, dataset, issue_type
""",
)

# Join on quality criteria and level from issue_type lookup (this restricts to only issues linked to a quality criteria)
qual_issue = qual_issue.merge(
lookup_issue_qual[["issue_type", "quality_criteria", "quality_level"]],
how="inner",
left_on="problem_type",
right_on="issue_type",
)
qual_issue = qual_issue.drop(columns="issue_type")

# IDENTIFY PROBLEMS - expectations - entity beyond LPA bounds
s3_uri = "s3://development-collection-data/log/expectation/dataset=*/*.parquet"

qual_expectation_bounds = duckdb.query(
f"""
SELECT organisation, dataset, details
FROM read_parquet('{s3_uri}')
WHERE name = 'Check no entities are outside of the local planning authority boundary'
AND passed = 'False'
AND message not like '%error%'
"""
).to_df()
qual_expectation_bounds = qual_expectation_bounds.assign(
problem_source="expectation",
problem_type="entity outside of the local planning authority boundary",
count=[json.loads(v)["actual"] for v in qual_expectation_bounds["details"]],
quality_criteria="3 - entities within LPA boundary",
quality_level=3,
)
qual_expectation_bounds = qual_expectation_bounds.drop(columns="details")

# IDENTIFY PROBLEMS - expectations - entity count doesn't match manual count
qual_expectation_count = duckdb.query(
f"""
SELECT organisation, dataset, details
FROM read_parquet('{s3_uri}')
WHERE name = 'Check number of entities inside the local planning authority boundary matches the manual count'
AND passed = 'False'
AND message not like '%error%'
"""
).to_df()

qual_expectation_count = qual_expectation_count.assign(
problem_source="expectation",
problem_type="entity count doesn't match manual count",
count=[json.loads(v)["actual"] for v in qual_expectation_count["details"]],
quality_criteria="3 - conservation area entity count matches LPA",
quality_level=3,
)
qual_expectation_count = qual_expectation_count.drop(columns="details")

# Combine all problem source tables, and aggregate to criteria level
qual_all_criteria = (
pd.concat([qual_issue, qual_expectation_bounds, qual_expectation_count])
.groupby(
["organisation", "dataset", "quality_criteria", "quality_level"],
as_index=False,
)
.agg(count_failures=("count", "sum"))
)

# Merge issues with the provision data
prov_qual_all = provision.merge(
qual_all_criteria, how="left", on=["organisation", "dataset"]
)

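# Derive a sortable quality level per provision: 0 when there are no active endpoints,
# the failed criterion's level when one was recorded, and 4 (no recorded problems)
# for active provisions with no failures.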
prov_qual_all["quality_level_for_sort"] = np.select(
[
(prov_qual_all["active_endpoint_count"] == 0),
(prov_qual_all["quality_level"].notnull()),
(prov_qual_all["active_endpoint_count"] > 0)
& (prov_qual_all["quality_level"].isnull()),
],
[0, prov_qual_all["quality_level"], 4],
)

level_map = {
4: "4. data that is trustworthy",
3: "3. data that is good for ODP",
2: "2. authoritative data from the LPA",
1: "1. some data",
0: "0. no score",
}

prov_quality = prov_qual_all.groupby(
["organisation", "dataset"], as_index=False, dropna=False
).agg(quality_level=("quality_level_for_sort", "min"))

prov_quality["quality"] = prov_quality["quality_level"].map(level_map)
prov_quality["notes"] = ""
prov_quality["end-date"] = ""
prov_quality["start-date"] = td
prov_quality["entry-date"] = td

# Output the results as a Parquet file
output_dir = Path("/tmp") / "performance" / "provision-quality" / f"entry-date={td}"
os.makedirs(output_dir, exist_ok=True)

output_file = output_dir / "provision-quality.parquet"
prov_quality.to_parquet(output_file, engine="pyarrow", index=False)

print(f"Provision quality dataset saved to: {output_file}")
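
A hedged sketch (not part of this change) of reading the hive-partitioned output back with DuckDB; the path layout comes from this function and read_parquet with hive_partitioning is standard DuckDB:

import duckdb

# Reads every entry-date=... partition; the partition key is exposed as a column.
provision_quality = duckdb.query(
    """
    SELECT *
    FROM read_parquet('/tmp/performance/provision-quality/*/*.parquet',
                      hive_partitioning = true)
    """
).to_df()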