From d7d0deb1ff5c5f1b1fd90b8f953007ff0966db6e Mon Sep 17 00:00:00 2001
From: eveleighoj <35256612+eveleighoj@users.noreply.github.com>
Date: Tue, 23 Dec 2025 17:10:08 +0000
Subject: [PATCH 1/6] move parquets to the same transformed directory

---
 digital_land/commands.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/digital_land/commands.py b/digital_land/commands.py
index 69e9285e..1ef3a970 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -398,9 +398,8 @@ def pipeline_run(
     column_field_log.save(os.path.join(column_field_dir, resource + ".csv"))
     dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv"))
     converted_resource_log.save(os.path.join(converted_resource_dir, resource + ".csv"))
-    # create converted parquet in the var directory
-    cache_dir = Path(organisation_path).parent
-    transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
+    # create converted parquet in the var directory
+    transformed_parquet_dir = output_path.parent
     transformed_parquet_dir.mkdir(exist_ok=True, parents=True)
     convert_tranformed_csv_to_pq(
         input_path=output_path,
@@ -436,7 +435,7 @@ def dataset_create(
     resource_path = Path(resource_path)

     # get the transformed files from the cache directory this is assumed right now but we may want to be stricter in the future
-    transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset
+    transformed_parquet_dir = Path(input_paths[0]).parent

     # create directory for dataset_parquet_package, will create a general provenance one for now
     dataset_parquet_path = cache_dir / "provenance"

From e59fd8d3853d4420ee8e01771b4474f814f02ff3 Mon Sep 17 00:00:00 2001
From: eveleighoj <35256612+eveleighoj@users.noreply.github.com>
Date: Tue, 23 Dec 2025 17:28:09 +0000
Subject: [PATCH 2/6] make sure output_path is a path object

---
 digital_land/commands.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/digital_land/commands.py b/digital_land/commands.py
index 1ef3a970..2bd0aba7 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -218,7 +218,7 @@ def pipeline_run(
     pipeline,
     specification,
     input_path,
-    output_path,
+    output_path: Path,
    collection_dir,  # TBD: remove, replaced by endpoints, organisations and entry_date
     null_path=None,  # TBD: remove this
     issue_dir=None,
@@ -240,6 +240,7 @@ def pipeline_run(
 ):
     # set up paths
     cache_dir = Path(cache_dir)
+    output_path = Path(output_path)

     if resource is None:
         resource = resource_from_path(input_path)
@@ -399,6 +400,7 @@ def pipeline_run(
     dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv"))
     converted_resource_log.save(os.path.join(converted_resource_dir, resource + ".csv"))
     # create converted parquet in the var directory
+    # TODO test without output_path conversion above to make sure we have a test that would've failed
     transformed_parquet_dir = output_path.parent
     transformed_parquet_dir.mkdir(exist_ok=True, parents=True)
     convert_tranformed_csv_to_pq(

From cc383a6f48fc13269edc9ef5608cf898c62e1e4f Mon Sep 17 00:00:00 2001
From: eveleighoj <35256612+eveleighoj@users.noreply.github.com>
Date: Mon, 29 Dec 2025 13:08:07 +0000
Subject: [PATCH 3/6] add acceptance test for running the pipeline command

---
 digital_land/command_arguments.py | 3 +-
 digital_land/commands.py | 48 ++++---
 tests/acceptance/test_pipeline_command.py | 149 ++++++++++++++++++++++
 3 files changed, 184 insertions(+), 16 deletions(-)
 create mode 100644 tests/acceptance/test_pipeline_command.py

diff --git a/digital_land/command_arguments.py b/digital_land/command_arguments.py
index d0a63ee0..2f5e2853 100644
--- a/digital_land/command_arguments.py
+++ b/digital_land/command_arguments.py
@@ -1,5 +1,6 @@
 # Custom decorators for common command arguments
 import functools
+from pathlib import Path

 import click

@@ -7,7 +8,7 @@ def input_output_path(f):
     arguments = [
         click.argument("input-path", type=click.Path(exists=True)),
-        click.argument("output-path", type=click.Path(), default=""),
+        click.argument("output-path", type=click.Path(path_type=Path), default=""),
     ]
     return functools.reduce(lambda x, arg: arg(x), reversed(arguments), f)

diff --git a/digital_land/commands.py b/digital_land/commands.py
index 2bd0aba7..5e6f6bb6 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -240,7 +240,6 @@ def pipeline_run(
 ):
     # set up paths
     cache_dir = Path(cache_dir)
-    output_path = Path(output_path)

     if resource is None:
         resource = resource_from_path(input_path)
@@ -413,7 +412,7 @@ def pipeline_run(
 # build dataset from processed resources
 #
 def dataset_create(
-    input_paths,
+    input_dir,
     output_path,
     organisation_path,
     pipeline,
@@ -425,19 +424,39 @@ def dataset_create(
     cache_dir="var/cache",
     resource_path="collection/resource.csv",
 ):
+    """
+    Create a dataset package from transformed parquet files.
+
+    Builds both SQLite and Parquet dataset packages from transformed resources,
+    loading facts, entities, issues, and provenance information.
+
+    Args:
+        input_dir: Directory containing transformed parquet files
+        output_path: Path for the output SQLite database
+        organisation_path: Path to organisation.csv file
+        pipeline: Pipeline object containing configuration
+        dataset: Name of the dataset to create
+        specification: Specification object defining the dataset schema
+        issue_dir: Directory containing issue logs (default: "issue")
+        column_field_dir: Directory for column-field mappings (default: "var/column-field")
+        dataset_resource_dir: Directory for dataset-resource mappings (default: "var/dataset-resource")
+        cache_dir: Directory for caching intermediate files (default: "var/cache")
+        resource_path: Path to resource.csv file (default: "collection/resource.csv")
+    """
     # set level for logging to see what's going on
     logger.setLevel(logging.INFO)
     logging.getLogger("digital_land.package.dataset_parquet").setLevel(logging.INFO)

-    # chek all paths are paths
+    # check all paths are paths
     issue_dir = Path(issue_dir)
     column_field_dir = Path(column_field_dir)
     dataset_resource_dir = Path(dataset_resource_dir)
     cache_dir = Path(cache_dir)
     resource_path = Path(resource_path)
+    input_dir = Path(input_dir)

     # get the transformed files from the cache directory this is assumed right now but we may want to be stricter in the future
-    transformed_parquet_dir = Path(input_paths[0]).parent
+    input_dir

     # create directory for dataset_parquet_package, will create a general provenance one for now
     dataset_parquet_path = cache_dir / "provenance"
@@ -461,13 +480,12 @@ def dataset_create(
     # don't use create as we don't want to create the indexes
     package.create_database()
     package.disconnect()
-    for path in input_paths:
-        path_obj = Path(path)
+    for path in input_dir.glob("*.parquet"):
         logging.info(f"loading column field log into {output_path}")
-        package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
+        package.load_column_fields(column_field_dir / dataset / f"{path.stem}.csv")
         logging.info(f"loading dataset resource log into {output_path}")
         package.load_dataset_resource(
-            dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
dataset / f"{path_obj.stem}.csv" + dataset_resource_dir / dataset / f"{path.stem}.csv" ) logger.info(f"loading old entities into {output_path}") old_entity_path = Path(pipeline.path) / "old-entity.csv" @@ -477,8 +495,8 @@ def dataset_create( logger.info(f"loading issues into {output_path}") issue_paths = issue_dir / dataset if issue_paths.exists(): - for issue_path in os.listdir(issue_paths): - package.load_issues(os.path.join(issue_paths, issue_path)) + for issue_path in issue_paths.glob("*.csv"): + package.load_issues(issue_path, issue_path) else: logger.warning("No directory for this dataset in the provided issue_directory") @@ -492,7 +510,7 @@ def dataset_create( path=dataset_parquet_path, specification_dir=None, # TBD: package should use this specification object duckdb_path=cache_dir / "overflow.duckdb", - transformed_parquet_dir=transformed_parquet_dir, + transformed_parquet_dir=input_dir, ) # To find facts we have a complex SQL window function that can cause memory issues. To aid the allocation of memory # we decide on a parquet strategy, based on how many parquet files we have, the overall size of these @@ -504,10 +522,10 @@ def dataset_create( # Group parquet files into approx 256MB batches (if needed) if pqpackage.strategy != "direct": - pqpackage.group_parquet_files(transformed_parquet_dir, target_mb=256) - pqpackage.load_facts(transformed_parquet_dir) - pqpackage.load_fact_resource(transformed_parquet_dir) - pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path) + pqpackage.group_parquet_files(input_dir, target_mb=256) + pqpackage.load_facts(input_dir) + pqpackage.load_fact_resource(input_dir) + pqpackage.load_entities(input_dir, resource_path, organisation_path) logger.info("loading fact,fact_resource and entity into {output_path}") pqpackage.load_to_sqlite(output_path) diff --git a/tests/acceptance/test_pipeline_command.py b/tests/acceptance/test_pipeline_command.py new file mode 100644 index 00000000..fe35f311 --- /dev/null +++ b/tests/acceptance/test_pipeline_command.py @@ -0,0 +1,149 @@ +""" +Acceptance tests for pipeline_command CLI function. + +Tests the end-to-end pipeline processing workflow, simulating actual user +interactions with the CLI command. +""" + +import pytest +import pandas as pd +from click.testing import CliRunner + +from digital_land.cli import cli + + +@pytest.fixture +def cli_runner(): + """ + Create a Click CLI runner for testing CLI commands. + """ + return CliRunner() + + +@pytest.fixture +def organisation_path(tmp_path): + """ + Create an organisations dataset for testing. + """ + org_data = { + "entity": [101, 102], + "name": ["Test Org", "Test Org 2"], + "prefix": ["local-authority", "local-authority"], + "reference": ["test-org", "test-org-2"], + "dataset": ["local-authority", "local-authority"], + "organisation": ["local-authority:test-org", "local-authority:test-org-2"], + } + orgs_path = tmp_path / "organisation.csv" + pd.DataFrame.from_dict(org_data).to_csv(orgs_path, index=False) + return orgs_path + + +@pytest.fixture +def test_resource_file(test_dirs): + """ + Create a test resource CSV file for pipeline processing. 
+ """ + resource_dir = test_dirs["collection_dir"] / "resource" + resource_dir.mkdir(parents=True, exist_ok=True) + + resource_hash = "5158d13bfc6f0723b1fb07c975701a906e83a1ead4aee598ee34e241c79a5f3d" + resource_path = resource_dir / resource_hash + + # Create a simple test CSV file + with open(resource_path, "w") as f: + f.write("NAME,NOTES\n") + f.write("Test Park,A test national park\n") + + return resource_path + + +def test_pipeline_command_runs_successfully( + cli_runner, test_dirs, test_resource_file, organisation_path, tmp_path +): + """ + Test that pipeline command executes successfully with standard usage. + + Verifies the entire pipeline workflow from resource input through to + transformed output generation, representing the typical user journey + of processing a resource through the pipeline. ensures correct files are produced + and no errors occur. but does not account for every scenario or edge case. + """ + # Arrange + input_path = str(test_resource_file) + output_file_path = ( + test_dirs["transformed_dir"] + / "national-park" + / f"{test_resource_file.stem}.csv" + ) + output_path = str(output_file_path) + output_log_dir = tmp_path / "output-logs" + config_path = str(tmp_path / "config.sqlite3") + + # Ensure output directories exist (create parent directory of output file) + output_file_path.parent.mkdir(parents=True, exist_ok=True) + output_log_dir.mkdir(parents=True, exist_ok=True) + + # Test endpoint and organisation values + test_endpoint = "d779ad1c91c5a46e2d4ace4d5446d7d7f81df1ed058f882121070574697a5412" + test_organisation = "local-authority:test-org" + + # Act - Invoke the CLI command with standard options + result = cli_runner.invoke( + cli, + [ + "--dataset", + "national-park", + "--pipeline-dir", + str(test_dirs["pipeline_dir"]), + "--specification-dir", + str(test_dirs["specification_dir"]), + "pipeline", + input_path, # positional argument + output_path, # positional argument + "--endpoints", + test_endpoint, + "--organisations", + test_organisation, + "--issue-dir", + str(test_dirs["issues_log_dir"]), + "--column-field-dir", + str(test_dirs["column_field_dir"]), + "--dataset-resource-dir", + str(test_dirs["dataset_resource_dir"]), + "--converted-resource-dir", + str(test_dirs["converted_resource_dir"]), + "--organisation-path", + str(organisation_path), + "--cache-dir", + str(test_dirs["cache_dir"]), + "--collection-dir", + str(test_dirs["collection_dir"]), + "--operational-issue-dir", + str(test_dirs["operational_issues_dir"]), + "--output-log-dir", + str(output_log_dir), + "--config-path", + config_path, + ], + ) + + # Assert + # Check that the command executed without errors + if result.exit_code != 0: + print("\n=== CLI Output ===") + print(result.output) + print("\n=== Exception ===") + print(result.exception) + if result.exception: + import traceback + + print("\n=== Full Traceback ===") + traceback.print_exception( + type(result.exception), result.exception, result.exception.__traceback__ + ) + assert result.exit_code == 0, f"Command failed with: {result.output}" + + # Verify expected directories were created/used + assert test_dirs["issues_log_dir"].exists() + assert test_dirs["column_field_dir"].exists() + assert test_dirs["dataset_resource_dir"].exists() From fc86c2bc9fd22bffc3998565a133eaf13be7f1eb Mon Sep 17 00:00:00 2001 From: eveleighoj <35256612+eveleighoj@users.noreply.github.com> Date: Mon, 29 Dec 2025 14:35:24 +0000 Subject: [PATCH 4/6] replace input paths with input directory for create command --- digital_land/cli.py | 6 +++--- 
 digital_land/commands.py | 2 +-
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/digital_land/cli.py b/digital_land/cli.py
index b65aef33..b7148ac0 100644
--- a/digital_land/cli.py
+++ b/digital_land/cli.py
@@ -199,11 +199,11 @@ def convert_cmd(input_path, output_path):
     default="collection/resource.csv",
     help="link to where the resource list is stored",
 )
-@click.argument("input-paths", nargs=-1, type=click.Path(exists=True))
+@click.argument("input-dir", nargs=-1, type=click.Path(exists=True))
 @click.pass_context
 def dataset_create_cmd(
     ctx,
-    input_paths,
+    input_dir,
     output_path,
     organisation_path,
     column_field_dir,
@@ -213,7 +213,7 @@ def dataset_create_cmd(
     resource_path,
 ):
     return dataset_create(
-        input_paths=input_paths,
+        input_dir=input_dir,
         output_path=output_path,
         organisation_path=organisation_path,
         pipeline=ctx.obj["PIPELINE"],
diff --git a/digital_land/commands.py b/digital_land/commands.py
index 5e6f6bb6..1f382751 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -456,7 +456,7 @@ def dataset_create(
     input_dir = Path(input_dir)

     # get the transformed files from the cache directory this is assumed right now but we may want to be stricter in the future
-    input_dir
+    # input_dir

     # create directory for dataset_parquet_package, will create a general provenance one for now
     dataset_parquet_path = cache_dir / "provenance"

From edd3ee85175365d3515f4599c2db2c5dfc34f415 Mon Sep 17 00:00:00 2001
From: eveleighoj <35256612+eveleighoj@users.noreply.github.com>
Date: Mon, 29 Dec 2025 15:01:21 +0000
Subject: [PATCH 5/6] change cli and test so that one argument is expected

---
 digital_land/cli.py | 2 +-
 tests/acceptance/test_dataset_create.py | 10 +++++-----
 2 files changed, 6 insertions(+), 6 deletions(-)

diff --git a/digital_land/cli.py b/digital_land/cli.py
index b7148ac0..c1c161fb 100644
--- a/digital_land/cli.py
+++ b/digital_land/cli.py
@@ -199,7 +199,7 @@ def convert_cmd(input_path, output_path):
     default="collection/resource.csv",
     help="link to where the resource list is stored",
 )
-@click.argument("input-dir", nargs=-1, type=click.Path(exists=True))
+@click.argument("input-dir", nargs=1, type=click.Path(exists=True))
 @click.pass_context
 def dataset_create_cmd(
     ctx,
diff --git a/tests/acceptance/test_dataset_create.py b/tests/acceptance/test_dataset_create.py
index 0626fee6..1a9ed5ae 100644
--- a/tests/acceptance/test_dataset_create.py
+++ b/tests/acceptance/test_dataset_create.py
@@ -135,7 +135,7 @@ def cache_path(tmp_path):


 @pytest.fixture
-def input_paths(cache_path):
+def input_dir(cache_path):
     data_dicts = {"resource_1": transformed_1_data}
     input_paths = []
     directory = cache_path / "transformed_parquet" / "conservation-area"
@@ -148,7 +148,7 @@ def input_paths(cache_path):
         logging.error(str(input_path))
         input_paths.append(str(input_path))

-    return input_paths
+    return directory


 @pytest.fixture
@@ -237,7 +237,7 @@ def resource_path(session_tmp_path):
 def test_acceptance_dataset_create(
     session_tmp_path,
     organisation_path,
-    input_paths,
+    input_dir,
     issue_dir,
     cache_path,
     dataset_dir,
@@ -270,8 +270,8 @@ def test_acceptance_dataset_create(
             str(cache_path),
             "--resource-path",
             str(resource_path),
-        ]
-        + input_paths,
+            str(input_dir),
+        ],
         catch_exceptions=False,
     )

From 04d9023c661198548be9f183ab44596ed908af3c Mon Sep 17 00:00:00 2001
From: eveleighoj <35256612+eveleighoj@users.noreply.github.com>
Date: Mon, 29 Dec 2025 15:51:26 +0000
Subject: [PATCH 6/6] fix error and ensure it's tested

---
 digital_land/commands.py | 2 +-
 tests/acceptance/test_dataset_create.py | 36 +++++++++++++++++++++----
 2 files changed, 32 insertions(+), 6 deletions(-)

diff --git a/digital_land/commands.py b/digital_land/commands.py
index 1f382751..869151a2 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -496,7 +496,7 @@ def dataset_create(
     issue_paths = issue_dir / dataset
     if issue_paths.exists():
         for issue_path in issue_paths.glob("*.csv"):
-            package.load_issues(issue_path, issue_path)
+            package.load_issues(issue_path)
     else:
         logger.warning("No directory for this dataset in the provided issue_directory")

diff --git a/tests/acceptance/test_dataset_create.py b/tests/acceptance/test_dataset_create.py
index 1a9ed5ae..09713160 100644
--- a/tests/acceptance/test_dataset_create.py
+++ b/tests/acceptance/test_dataset_create.py
@@ -16,8 +16,8 @@
 from digital_land.cli import cli


-test_collection = "conservation-area"
-test_dataset = "conservation-area"
+TEST_COLLECTION = "conservation-area"
+TEST_DATASET = "conservation-area"


 @pytest.fixture(scope="session")
@@ -222,6 +222,32 @@ def dataset_dir(session_tmp_path):
 def issue_dir(session_tmp_path):
     issue_dir = session_tmp_path / "issue"
     os.makedirs(issue_dir, exist_ok=True)
+
+    # Create test issue files for each dataset
+    dataset = TEST_DATASET
+    dataset_issue_dir = issue_dir / dataset
+    os.makedirs(dataset_issue_dir, exist_ok=True)
+
+    # Create a sample issue CSV file
+    issue_file = dataset_issue_dir / "test-resource.csv"
+    issue_data = {
+        "dataset": [dataset, dataset, dataset],
+        "resource": ["test-resource", "test-resource", "test-resource"],
+        "line-number": [2, 3, 4],
+        "entry-number": [1, 2, 3],
+        "field": ["name", "reference", "start-date"],
+        "entity": ["", "12345", ""],
+        "issue-type": ["missing value", "invalid format", "invalid date"],
+        "value": ["", "INVALID-REF", "2023-13-45"],
+        "message": [
+            "name field is required",
+            "reference format is invalid",
+            "date must be in format YYYY-MM-DD",
+        ],
+    }
+    df = pd.DataFrame(issue_data)
+    df.to_csv(issue_file, index=False)
+
     return issue_dir


@@ -245,16 +271,16 @@ def test_acceptance_dataset_create(
     column_field_path,
     dataset_resource_path,
 ):
-    output_path = dataset_dir / f"{test_dataset}.sqlite3"
+    output_path = dataset_dir / f"{TEST_DATASET}.sqlite3"

     runner = CliRunner()
     result = runner.invoke(
         cli,
         [
             "--dataset",
-            str(test_dataset),
+            str(TEST_DATASET),
             "--pipeline-dir",
-            str(f"tests/data/{test_collection}/pipeline"),
+            str(f"tests/data/{TEST_COLLECTION}/pipeline"),
             "dataset-create",
             "--output-path",
             str(output_path),