6 changes: 3 additions & 3 deletions digital_land/cli.py
@@ -199,11 +199,11 @@ def convert_cmd(input_path, output_path):
default="collection/resource.csv",
help="link to where the resource list is stored",
)
@click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
@click.argument("input-dir", nargs=1, type=click.Path(exists=True))
@click.pass_context
def dataset_create_cmd(
ctx,
input_paths,
input_dir,
output_path,
organisation_path,
column_field_dir,
@@ -213,7 +213,7 @@ def dataset_create_cmd(
resource_path,
):
return dataset_create(
input_paths=input_paths,
input_dir=input_dir,
output_path=output_path,
organisation_path=organisation_path,
pipeline=ctx.obj["PIPELINE"],
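The cli.py change above swaps the variadic input-paths argument for a single input-dir, so callers now pass one existing directory and the command discovers the transformed parquet files itself. A minimal sketch of that argument shape, assuming a stand-alone Click command (dataset_create_cmd here is a stripped-down illustration, not the project's real command body):

import click
from pathlib import Path


@click.command()
@click.argument("input-dir", nargs=1, type=click.Path(exists=True))
def dataset_create_cmd(input_dir):
    # One directory argument replaces an explicit list of parquet paths;
    # the command enumerates the files itself, mirroring commands.py below.
    for parquet_file in sorted(Path(input_dir).glob("*.parquet")):
        click.echo(parquet_file.name)


if __name__ == "__main__":
    dataset_create_cmd()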
3 changes: 2 additions & 1 deletion digital_land/command_arguments.py
@@ -1,13 +1,14 @@
# Custom decorators for common command arguments
import functools
from pathlib import Path

import click


def input_output_path(f):
arguments = [
click.argument("input-path", type=click.Path(exists=True)),
click.argument("output-path", type=click.Path(), default=""),
click.argument("output-path", type=click.Path(path_type=Path), default=""),
]
return functools.reduce(lambda x, arg: arg(x), reversed(arguments), f)

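The command_arguments.py change adds path_type=Path to the output-path argument, so Click hands the command a pathlib.Path instead of a plain string; this is what lets commands.py derive the transformed-parquet directory with output_path.parent. A small sketch of the behaviour, with an illustrative command name and default path:

import click
from pathlib import Path


@click.command()
@click.argument("output-path", type=click.Path(path_type=Path), default="transformed/dataset.csv")
def show_output_parent(output_path: Path):
    # Click performs the conversion before the command body runs, so Path
    # methods such as .parent work without an explicit Path(...) call.
    click.echo(f"{type(output_path).__name__}: parent is {output_path.parent}")


if __name__ == "__main__":
    show_output_parent()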
55 changes: 37 additions & 18 deletions digital_land/commands.py
@@ -218,7 +218,7 @@ def pipeline_run(
pipeline,
specification,
input_path,
output_path,
output_path: Path,
collection_dir, # TBD: remove, replaced by endpoints, organisations and entry_date
null_path=None, # TBD: remove this
issue_dir=None,
@@ -398,9 +398,9 @@ def pipeline_run(
column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv"))
converted_resource_log.save(os.path.join(converted_resource_dir, resource + ".csv"))
# create converted parquet in the var directory
cache_dir = Path(organisation_path).parent
transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
# create converted parquet in the var directory
# TODO test without output_path conversion above to make sure we have a test that would've failed
transformed_parquet_dir = output_path.parent
transformed_parquet_dir.mkdir(exist_ok=True, parents=True)
convert_tranformed_csv_to_pq(
input_path=output_path,
@@ -412,7 +412,7 @@ def dataset_create(
# build dataset from processed resources
#
def dataset_create(
input_paths,
input_dir,
output_path,
organisation_path,
pipeline,
@@ -424,19 +424,39 @@ def dataset_create(
cache_dir="var/cache",
resource_path="collection/resource.csv",
):
"""
Create a dataset package from transformed parquet files.

Builds both SQLite and Parquet dataset packages from transformed resources,
loading facts, entities, issues, and provenance information.

Args:
input_dir: Directory containing transformed parquet files
output_path: Path for the output SQLite database
organisation_path: Path to organisation.csv file
pipeline: Pipeline object containing configuration
dataset: Name of the dataset to create
specification: Specification object defining the dataset schema
issue_dir: Directory containing issue logs (default: "issue")
column_field_dir: Directory for column-field mappings (default: "var/column-field")
dataset_resource_dir: Directory for dataset-resource mappings (default: "var/dataset-resource")
cache_dir: Directory for caching intermediate files (default: "var/cache")
resource_path: Path to resource.csv file (default: "collection/resource.csv")
"""
# set level for logging to see what's going on
logger.setLevel(logging.INFO)
logging.getLogger("digital_land.package.dataset_parquet").setLevel(logging.INFO)

# chek all paths are paths
# check all paths are paths
issue_dir = Path(issue_dir)
column_field_dir = Path(column_field_dir)
dataset_resource_dir = Path(dataset_resource_dir)
cache_dir = Path(cache_dir)
resource_path = Path(resource_path)
input_dir = Path(input_dir)

# get the transformed files from the cache directory this is assumed right now but we may want to be stricter in the future
transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
# input_dir

# create directory for dataset_parquet_package, will create a general provenance one for now
dataset_parquet_path = cache_dir / "provenance"
@@ -460,13 +480,12 @@ def dataset_create(
# don't use create as we don't want to create the indexes
package.create_database()
package.disconnect()
for path in input_paths:
path_obj = Path(path)
for path in input_dir.glob("*.parquet"):
logging.info(f"loading column field log into {output_path}")
package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
package.load_column_fields(column_field_dir / dataset / f"{path.stem}.csv")
logging.info(f"loading dataset resource log into {output_path}")
package.load_dataset_resource(
dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
dataset_resource_dir / dataset / f"{path.stem}.csv"
)
logger.info(f"loading old entities into {output_path}")
old_entity_path = Path(pipeline.path) / "old-entity.csv"
@@ -476,8 +495,8 @@ def dataset_create(
logger.info(f"loading issues into {output_path}")
issue_paths = issue_dir / dataset
if issue_paths.exists():
for issue_path in os.listdir(issue_paths):
package.load_issues(os.path.join(issue_paths, issue_path))
for issue_path in issue_paths.glob("*.csv"):
package.load_issues(issue_path)
else:
logger.warning("No directory for this dataset in the provided issue_directory")

@@ -491,7 +510,7 @@ def dataset_create(
path=dataset_parquet_path,
specification_dir=None, # TBD: package should use this specification object
duckdb_path=cache_dir / "overflow.duckdb",
transformed_parquet_dir=transformed_parquet_dir,
transformed_parquet_dir=input_dir,
)
# To find facts we have a complex SQL window function that can cause memory issues. To aid the allocation of memory
# we decide on a parquet strategy, based on how many parquet files we have, the overall size of these
@@ -503,10 +522,10 @@ def dataset_create(

# Group parquet files into approx 256MB batches (if needed)
if pqpackage.strategy != "direct":
pqpackage.group_parquet_files(transformed_parquet_dir, target_mb=256)
pqpackage.load_facts(transformed_parquet_dir)
pqpackage.load_fact_resource(transformed_parquet_dir)
pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path)
pqpackage.group_parquet_files(input_dir, target_mb=256)
pqpackage.load_facts(input_dir)
pqpackage.load_fact_resource(input_dir)
pqpackage.load_entities(input_dir, resource_path, organisation_path)

logger.info("loading fact,fact_resource and entity into {output_path}")
pqpackage.load_to_sqlite(output_path)
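The strategy comment in the hunk above is the reasoning behind group_parquet_files(input_dir, target_mb=256): when the fact-finding window query risks running out of memory, the transformed parquet files are processed in batches of roughly 256 MB. A hedged sketch of that grouping idea follows; the function body and return shape are illustrative, not the dataset_parquet package's real implementation:

from pathlib import Path
from typing import List


def group_parquet_files(input_dir: Path, target_mb: int = 256) -> List[List[Path]]:
    """Partition parquet files into batches of roughly target_mb megabytes."""
    target_bytes = target_mb * 1024 * 1024
    batches: List[List[Path]] = []
    current: List[Path] = []
    current_bytes = 0
    for path in sorted(input_dir.glob("*.parquet")):
        size = path.stat().st_size
        # Close the current batch once adding this file would overshoot the target.
        if current and current_bytes + size > target_bytes:
            batches.append(current)
            current, current_bytes = [], 0
        current.append(path)
        current_bytes += size
    if current:
        batches.append(current)
    return batches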
46 changes: 36 additions & 10 deletions tests/acceptance/test_dataset_create.py
@@ -16,8 +16,8 @@

from digital_land.cli import cli

test_collection = "conservation-area"
test_dataset = "conservation-area"
TEST_COLLECTION = "conservation-area"
TEST_DATASET = "conservation-area"


@pytest.fixture(scope="session")
@@ -135,7 +135,7 @@ def cache_path(tmp_path):


@pytest.fixture
def input_paths(cache_path):
def input_dir(cache_path):
data_dicts = {"resource_1": transformed_1_data}
input_paths = []
directory = cache_path / "transformed_parquet" / "conservation-area"
@@ -148,7 +148,7 @@ def input_paths(cache_path):
logging.error(str(input_path))
input_paths.append(str(input_path))

return input_paths
return directory


@pytest.fixture
@@ -222,6 +222,32 @@ def dataset_dir(session_tmp_path):
def issue_dir(session_tmp_path):
issue_dir = session_tmp_path / "issue"
os.makedirs(issue_dir, exist_ok=True)

# Create test issue files for each dataset
dataset = TEST_DATASET
dataset_issue_dir = issue_dir / dataset
os.makedirs(dataset_issue_dir, exist_ok=True)

# Create a sample issue CSV file
issue_file = dataset_issue_dir / "test-resource.csv"
issue_data = {
"dataset": [dataset, dataset, dataset],
"resource": ["test-resource", "test-resource", "test-resource"],
"line-number": [2, 3, 4],
"entry-number": [1, 2, 3],
"field": ["name", "reference", "start-date"],
"entity": ["", "12345", ""],
"issue-type": ["missing value", "invalid format", "invalid date"],
"value": ["", "INVALID-REF", "2023-13-45"],
"message": [
"name field is required",
"reference format is invalid",
"date must be in format YYYY-MM-DD",
],
}
df = pd.DataFrame(issue_data)
df.to_csv(issue_file, index=False)

return issue_dir


@@ -237,24 +263,24 @@ def resource_path(session_tmp_path):
def test_acceptance_dataset_create(
session_tmp_path,
organisation_path,
input_paths,
input_dir,
issue_dir,
cache_path,
dataset_dir,
resource_path,
column_field_path,
dataset_resource_path,
):
output_path = dataset_dir / f"{test_dataset}.sqlite3"
output_path = dataset_dir / f"{TEST_DATASET}.sqlite3"

runner = CliRunner()
result = runner.invoke(
cli,
[
"--dataset",
str(test_dataset),
str(TEST_DATASET),
"--pipeline-dir",
str(f"tests/data/{test_collection}/pipeline"),
str(f"tests/data/{TEST_COLLECTION}/pipeline"),
"dataset-create",
"--output-path",
str(output_path),
@@ -270,8 +296,8 @@ def test_acceptance_dataset_create(
str(cache_path),
"--resource-path",
str(resource_path),
]
+ input_paths,
str(input_dir),
],
catch_exceptions=False,
)

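For comparison with the CLI invocation exercised by the acceptance test, a hedged sketch of calling dataset_create directly with equivalent arguments; the Pipeline and Specification constructor signatures and all paths below are assumptions for illustration, not taken from this diff:

from pathlib import Path

from digital_land.commands import dataset_create
from digital_land.pipeline import Pipeline
from digital_land.specification import Specification

# Assumed constructors: Pipeline(pipeline_dir, dataset) and Specification(specification_dir).
pipeline = Pipeline("tests/data/conservation-area/pipeline", "conservation-area")
specification = Specification("specification")

dataset_create(
    input_dir=Path("var/cache/transformed_parquet/conservation-area"),
    output_path=Path("dataset/conservation-area.sqlite3"),
    organisation_path="var/cache/organisation.csv",
    pipeline=pipeline,
    dataset="conservation-area",
    specification=specification,
    issue_dir="issue",
    cache_dir="var/cache",
    resource_path="collection/resource.csv",
)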