diff --git a/digital_land/cli.py b/digital_land/cli.py index 151d065d8..ca92ef3f4 100644 --- a/digital_land/cli.py +++ b/digital_land/cli.py @@ -147,6 +147,18 @@ def convert_cmd(input_path, output_path): @column_field_dir @dataset_resource_dir @issue_dir +@click.option( + "--cache-dir", + type=click.Path(), + default="var/cache", + help="link to a cache directory to store temporary data that can be deleted once process is finished", +) +@click.option( + "--resource-path", + type=click.Path(exists=True), + default="collection/resource.csv", + help="link to where the resource list is stored", +) @click.argument("input-paths", nargs=-1, type=click.Path(exists=True)) @click.pass_context def dataset_create_cmd( @@ -157,6 +169,8 @@ def dataset_create_cmd( column_field_dir, dataset_resource_dir, issue_dir, + cache_dir, + resource_path, ): return dataset_create( input_paths=input_paths, @@ -168,6 +182,8 @@ def dataset_create_cmd( column_field_dir=column_field_dir, dataset_resource_dir=dataset_resource_dir, issue_dir=issue_dir, + cache_dir=cache_dir, + resource_path=resource_path, ) @@ -194,7 +210,11 @@ def dataset_dump_flattened_cmd(ctx, input_path, output_path): @click.option("--endpoints", help="list of endpoint hashes", default="") @click.option("--organisations", help="list of organisations", default="") @click.option("--entry-date", help="default entry-date value", default="") -@click.option("--custom-temp-dir", help="default temporary directory", default=None) +@click.option( + "--cache-dir", + help="cache directory to store conveted files etc. in", + default="var/cache", +) @click.option("--config-path", help="Path to a configuration sqlite", default=None) @click.option( "--resource", @@ -224,7 +244,7 @@ def pipeline_command( endpoints, organisations, entry_date, - custom_temp_dir, + cache_dir, collection_dir, operational_issue_dir, config_path, @@ -255,7 +275,7 @@ def pipeline_command( endpoints=endpoints, organisations=organisations, entry_date=entry_date, - custom_temp_dir=custom_temp_dir, + cache_dir=cache_dir, config_path=config_path, resource=resource, output_log_dir=output_log_dir, diff --git a/digital_land/commands.py b/digital_land/commands.py index 6abf3a5d8..963e2e6c7 100644 --- a/digital_land/commands.py +++ b/digital_land/commands.py @@ -28,7 +28,9 @@ ConvertedResourceLog, ) from digital_land.organisation import Organisation + from digital_land.package.dataset import DatasetPackage +from digital_land.package.dataset_parquet import DatasetParquetPackage from digital_land.phase.combine import FactCombinePhase from digital_land.phase.concat import ConcatFieldPhase from digital_land.phase.convert import ConvertPhase, execute @@ -55,6 +57,7 @@ from digital_land.phase.reference import EntityReferencePhase, FactReferencePhase from digital_land.phase.save import SavePhase from digital_land.pipeline import run_pipeline, Lookups, Pipeline +from digital_land.pipeline.process import convert_tranformed_csv_to_pq from digital_land.schema import Schema from digital_land.update import add_source_endpoint from digital_land.configuration.main import Config @@ -172,17 +175,17 @@ def collection_retire_endpoints_and_sources( # # pipeline commands # -def convert(input_path, output_path, custom_temp_dir=None): +def convert(input_path, output_path): if not output_path: output_path = default_output_path("converted", input_path) dataset_resource_log = DatasetResourceLog() converted_resource_log = ConvertedResourceLog() + # TBD this actualy duplictaes the data and does nothing else, should just convert 
it? run_pipeline( ConvertPhase( input_path, dataset_resource_log=dataset_resource_log, converted_resource_log=converted_resource_log, - custom_temp_dir=custom_temp_dir, ), DumpPhase(output_path), ) @@ -201,10 +204,11 @@ def pipeline_run( operational_issue_dir="performance/operational_issue/", organisation_path=None, save_harmonised=False, + # TBD save all logs in a log directory, this will mean only one path passed in. column_field_dir=None, dataset_resource_dir=None, converted_resource_dir=None, - custom_temp_dir=None, # TBD: rename to "tmpdir" + cache_dir="var/cache", endpoints=[], organisations=[], entry_date="", @@ -213,6 +217,9 @@ def pipeline_run( output_log_dir=None, converted_path=None, ): + # set up paths + cache_dir = Path(cache_dir) + if resource is None: resource = resource_from_path(input_path) dataset = dataset @@ -276,7 +283,6 @@ def pipeline_run( path=input_path, dataset_resource_log=dataset_resource_log, converted_resource_log=converted_resource_log, - custom_temp_dir=custom_temp_dir, output_path=converted_path, ), NormalisePhase(skip_patterns=skip_patterns, null_path=null_path), @@ -356,6 +362,14 @@ def pipeline_run( column_field_log.save(os.path.join(column_field_dir, resource + ".csv")) dataset_resource_log.save(os.path.join(dataset_resource_dir, resource + ".csv")) converted_resource_log.save(os.path.join(converted_resource_dir, resource + ".csv")) + # create converted parquet in the var directory + cache_dir = Path(organisation_path).parent + transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset + transformed_parquet_dir.mkdir(exist_ok=True, parents=True) + convert_tranformed_csv_to_pq( + input_path=output_path, + output_path=transformed_parquet_dir / f"{resource}.parquet", + ) # @@ -371,42 +385,91 @@ def dataset_create( issue_dir="issue", column_field_dir="var/column-field", dataset_resource_dir="var/dataset-resource", + cache_dir="var/cache", + resource_path="collection/resource.csv", ): + # set level for logging to see what's going on + logger.setLevel(logging.INFO) + logging.getLogger("digital_land.package.dataset_parquet").setLevel(logging.INFO) + + # chek all paths are paths + issue_dir = Path(issue_dir) + column_field_dir = Path(column_field_dir) + dataset_resource_dir = Path(dataset_resource_dir) + cache_dir = Path(cache_dir) + resource_path = Path(resource_path) + + # get the transformed files from the cache directory this is assumed right now but we may want to be stricter in the future + transformed_parquet_dir = cache_dir / "transformed_parquet" / dataset + + # creat directory for dataset_parquet_package + dataset_parquet_path = cache_dir / dataset + if not output_path: print("missing output path", file=sys.stderr) sys.exit(2) # Set up initial objects - column_field_dir = Path(column_field_dir) - dataset_resource_dir = Path(dataset_resource_dir) organisation = Organisation( organisation_path=organisation_path, pipeline_dir=Path(pipeline.path) ) + + # create sqlite dataset packageas before and load inn data that isn't in the parquetpackage yet package = DatasetPackage( dataset, organisation=organisation, path=output_path, specification_dir=None, # TBD: package should use this specification object ) - package.create() + # don'tt use create as we don't want to create the indexes + package.create_database() + package.disconnect() for path in input_paths: path_obj = Path(path) - package.load_transformed(path) - package.load_column_fields(column_field_dir / dataset / path_obj.name) - package.load_dataset_resource(dataset_resource_dir / dataset / 
path_obj.name)
-        package.load_entities()
-
-    old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
-    if os.path.exists(old_entity_path):
+        logging.info(f"loading column field log into {output_path}")
+        package.load_column_fields(column_field_dir / dataset / f"{path_obj.stem}.csv")
+        logging.info(f"loading dataset resource log into {output_path}")
+        package.load_dataset_resource(
+            dataset_resource_dir / dataset / f"{path_obj.stem}.csv"
+        )
+    logging.info(f"loading old entities into {output_path}")
+    old_entity_path = Path(pipeline.path) / "old-entity.csv"
+    if old_entity_path.exists():
         package.load_old_entities(old_entity_path)
-    issue_paths = os.path.join(issue_dir, dataset)
-    if os.path.exists(issue_paths):
+    logging.info(f"loading issues into {output_path}")
+    issue_paths = issue_dir / dataset
+    if issue_paths.exists():
         for issue_path in os.listdir(issue_paths):
             package.load_issues(os.path.join(issue_paths, issue_path))
     else:
         logging.warning("No directory for this dataset in the provided issue_directory")

+    # Repeat for parquet
+    # Set up cache directory to store parquet files. The sqlite files created from these will be saved in the dataset directory
+    if not os.path.exists(cache_dir):
+        os.makedirs(cache_dir)
+
+    pqpackage = DatasetParquetPackage(
+        dataset,
+        path=dataset_parquet_path,
+        specification_dir=None,  # TBD: package should use this specification object
+        duckdb_path=cache_dir / "overflow.duckdb",
+    )
+    pqpackage.load_facts(transformed_parquet_dir)
+    pqpackage.load_fact_resource(transformed_parquet_dir)
+    pqpackage.load_entities(transformed_parquet_dir, resource_path, organisation_path)
+
+    logger.info(f"loading fact, fact_resource and entity into {output_path}")
+    pqpackage.load_to_sqlite(output_path)
+
+    logger.info(f"adding indexes to {output_path}")
+    package.connect()
+    package.create_cursor()
+    package.create_indexes()
+    package.disconnect()
+
+    logger.info(f"creating dataset package counts for {output_path}")
     package.add_counts()


@@ -1148,7 +1211,7 @@ def get_resource_unidentified_lookups(
     # could alter resource_from_path to file from path and promote to a utils folder
     resource = resource_from_path(input_path)
     dataset_resource_log = DatasetResourceLog(dataset=dataset, resource=resource)
-    custom_temp_dir = tmp_dir  # './var'
+    # custom_temp_dir = tmp_dir  # './var'

     print("")
     print("----------------------------------------------------------------------")
@@ -1202,7 +1265,6 @@ def get_resource_unidentified_lookups(
         ConvertPhase(
             path=input_path,
             dataset_resource_log=dataset_resource_log,
-            custom_temp_dir=custom_temp_dir,
         ),
         NormalisePhase(skip_patterns=skip_patterns, null_path=null_path),
         ParsePhase(),
diff --git a/digital_land/package/dataset_parquet.py b/digital_land/package/dataset_parquet.py
new file mode 100644
index 000000000..b713f1e4c
--- /dev/null
+++ b/digital_land/package/dataset_parquet.py
@@ -0,0 +1,501 @@
+import os
+import logging
+import duckdb
+import shutil
+from pathlib import Path
+from .package import Package
+
+logger = logging.getLogger(__name__)
+
+# TBD: move to the specification datapackage definition
+tables = {
+    "dataset-resource": None,
+    "column-field": None,
+    "issue": None,
+    "entity": None,
+    "fact": None,
+    "fact-resource": None,
+}
+
+# TBD: infer from specification dataset
+indexes = {
+    "fact": ["entity"],
+    "fact-resource": ["fact", "resource"],
+    "column-field": ["dataset", "resource", "column", "field"],
+    "issue": ["resource", "dataset", "field"],
+    "dataset-resource": ["resource"],
+}
+
+
+class DatasetParquetPackage(Package):
+    def
__init__(self, dataset, path, duckdb_path=None, **kwargs): + """ + Initialisation method to set up information as needed + + args: + dataset (str): name of the dataset + dir (str): the directory to store the package in + duckdb_path (str): optional parameter to use a duckdb file instead of in memory db + """ + # this is a given at this point to not sure we need it the base package class might use this + self.suffix = ".parquet" + super().__init__(dataset, tables=tables, indexes=indexes, path=path, **kwargs) + self.dataset = dataset + # self.cache_dir = cache_dir + # Persistent connection for the class. Given name to ensure that table is stored on disk (not purely in memory) + if duckdb_path is not None: + self.duckdb_path = Path(duckdb_path) + self.duckdb_path.parent.mkdir(parents=True, exist_ok=True) + self.conn = duckdb.connect(self.duckdb_path) + # self.conn.execute("INSTALL spatial; LOAD spatial;") + else: + self.conn = duckdb.connect() + + self.schema = self.get_schema() + self.typology = self.specification.schema[dataset]["typology"] + + # set up key file paths + self.fact_path = self.path / "fact" / f"dataset={self.dataset}" / "fact.parquet" + self.fact_resource_path = ( + self.path + / "fact-resource" + / f"dataset={self.dataset}" + / "fact-resource.parquet" + ) + self.entity_path = ( + self.path / "entity" / f"dataset={self.dataset}" / "entity.parquet" + ) + + def get_schema(self): + schema = {} + + for field in sorted( + list( + set(self.specification.schema["fact"]["fields"]).union( + set(self.specification.schema["fact-resource"]["fields"]) + ) + ) + ): + datatype = self.specification.field[field]["datatype"] + schema[field] = "BIGINT" if datatype == "integer" else "VARCHAR" + + return schema + + # will be removed as we will remove the temp table from this logic + # def create_temp_table(self, input_paths): + # # Create a temp table of the data from input_paths as we need the information stored there at various times + # logging.info( + # f"loading data into temp table from {os.path.dirname(input_paths[0])}" + # ) + + # input_paths_str = ", ".join([f"'{path}'" for path in input_paths]) + + # # Initial max_line_size and increment step + # max_size = 40000000 + # # increment_step = 20000000 + # # max_limit = 200000000 # Maximum allowable line size to attempt + + # # increment = False + # while True: + # try: + # self.conn.execute("DROP TABLE IF EXISTS temp_table") + # query = f""" + # CREATE TEMPORARY TABLE temp_table AS + # SELECT * + # FROM read_csv( + # [{input_paths_str}], + # columns = {self.schema}, + # header = true, + # force_not_null = {[field for field in self.schema.keys()]}, + # max_line_size={max_size} + # ) + # """ + # self.conn.execute(query) + # break + # except duckdb.Error as e: # Catch specific DuckDB error + # if "Value with unterminated quote" in str(e): + # hard_limit = int(resource.getrlimit(resource.RLIMIT_AS)[1]) + # if max_size < hard_limit / 3: + # logging.info( + # f"Initial max_size did not work, setting it to {hard_limit / 2}" + # ) + # max_size = hard_limit / 2 + # else: + # raise + # else: + # logging.info(f"Failed to read in when max_size = {max_size}") + # raise + + def load_facts(self, transformed_parquet_dir): + """ + This method loads facts into a fact table from a directory containing all transformed files as parquet files + """ + output_path = self.fact_path + output_path.parent.mkdir(parents=True, exist_ok=True) + logging.info("loading facts from temp table") + + fact_fields = self.specification.schema["fact"]["fields"] + fields_str = ", 
".join([field.replace("-", "_") for field in fact_fields]) + + # query to extract data from the temp table (containing raw data), group by a fact, and get the highest + # priority or latest record + + query = f""" + SELECT {fields_str} + FROM '{str(transformed_parquet_dir)}/*.parquet' + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY fact ORDER BY priority, entry_date DESC, entry_number DESC + ) = 1 + """ + self.conn.execute( + f""" + COPY ( + {query} + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + ) + + def load_fact_resource(self, transformed_parquet_dir): + logging.info(f"loading fact resources from {str(transformed_parquet_dir)}") + output_path = self.fact_resource_path + output_path.parent.mkdir(parents=True, exist_ok=True) + fact_resource_fields = self.specification.schema["fact-resource"]["fields"] + fields_str = ", ".join( + [field.replace("-", "_") for field in fact_resource_fields] + ) + + # All CSV files have been loaded into a temporary table. Extract several columns and export + query = f""" + SELECT {fields_str} + FROM '{str(transformed_parquet_dir)}/*.parquet' + """ + + self.conn.execute( + f""" + COPY ( + {query} + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + ) + + def load_entities_range( + self, + transformed_parquet_dir, + resource_path, + organisation_path, + output_path, + entity_range=None, + ): + # figure out which resources we actually need to do expensive queries on, store in parquet + # sql = f""" + # COPY( + # SELECT DISTINCT resource + # FROM parquet_scan('{transformed_parquet_dir}/*.parquet') + # QUALIFY ROW_NUMBER() OVER ( + # PARTITION BY enttity,field + # ORDER BY prioity, enttry_date DESC, entry_number DESC, resource, fact + # ) = 1 + # ) TO '{self.cache_path / 'duckdb_temp_files' / 'distinct_resource.parquet'}' (FORMAT PARQUET); + # """ + + logger.info(f"loading entities from {transformed_parquet_dir}") + + entity_fields = self.specification.schema["entity"]["fields"] + # Do this to match with later field names. 
+ entity_fields = [e.replace("-", "_") for e in entity_fields] + # input_paths_str = f"{self.cache_dir}/fact{self.suffix}" + if entity_range is not None: + entity_where_clause = ( + f"WHERE entity >= {entity_range[0]} AND entity < {entity_range[1]}" + ) + else: + entity_where_clause = "" + + query = f""" + SELECT DISTINCT REPLACE(field,'-','_') + FROM parquet_scan('{transformed_parquet_dir}/*.parquet') + {entity_where_clause} + """ + + # distinct_fields - list of fields in the field in fact + rows = self.conn.execute(query).fetchall() + distinct_fields = [row[0] for row in rows] + + # json fields - list of fields which are present in the fact table which + # do not exist separately in the entity table so need to be included in the json field + # Need to ensure that 'organisation' is not included either so that it is excluded + json_fields = [ + field + for field in distinct_fields + if field not in entity_fields + ["organisation"] + ] + + # null fields - list of fields which are not present in the fact tables which have + # to be in the entity table as a column + extra_fields = [ + "entity", + "dataset", + "typology", + "json", + "organisation_entity", + "organisation", + ] + null_fields = [ + field + for field in entity_fields + if field not in (distinct_fields + extra_fields) + ] + + if "organisation" not in distinct_fields: + null_fields.append("organisation") + + # select fields - a list of fields which have to be selected directly from the pivoted table + # these are entity fields that are not null fields or a few special ones + extra_fields = [ + "json", + "organisation_entity", + "dataset", + "typology", + "organisation", + ] + select_fields = [ + field for field in entity_fields if field not in null_fields + extra_fields + ] + + # set fields + fields_to_include = ["entity", "field", "value"] + fields_str = ", ".join(fields_to_include) + + # create this statement to add a nul org column, this is needed when no entities have an associated organisation + if "organisation" not in distinct_fields: + optional_org_str = ",''::VARCHAR AS \"organisation\"" + else: + optional_org_str = "" + + # Take original data, group by entity & field, and order by highest priority then latest record. 
+ # If there are still matches then pick the first resource (and fact, just to make sure) + # changes to make + # not sure why this is bringing a raw resourcce AND the temp_table this data is essentially the same + # need the resource hash and entry number of the file, this is important for ordering + # between these two, the onlly other metric that isn't in the factt resource table is the start date of the resource + # query to get this info + # query to use this info to get the most recent facts + # query to turn the most recent facts into a pivot + # query to sort the final table + # query to create the file + + # craft a where clause to limit entities in quetion, this chunking helps solve memory issues + + query = f""" + SELECT {fields_str}{optional_org_str} FROM ( + SELECT {fields_str}, CASE WHEN resource_csv."end-date" IS NULL THEN '2999-12-31' ELSE resource_csv."end-date" END AS resource_end_date + FROM parquet_scan('{transformed_parquet_dir}/*.parquet') tf + LEFT JOIN read_csv_auto('{resource_path}', max_line_size=40000000) resource_csv + ON tf.resource = resource_csv.resource + {entity_where_clause} + QUALIFY ROW_NUMBER() OVER ( + PARTITION BY entity, field + ORDER BY priority, entry_date DESC, entry_number DESC, resource_end_date DESC, tf.resource, fact + ) = 1 + ) + """ + + pivot_query = f""" + PIVOT ( + {query} + ) ON REPLACE(field,'-','_') + USING MAX(value) + """ + + # now use the field lists produced above to create specific statements to: + # add null columns which are missing + # include columns in the json statement + # Collate list of fields which don't exist but need to be in the final table + select_statement = ", ".join([f"t1.{field}" for field in select_fields]) + # Don't want to include anything that ends with "_geom" + null_fields_statement = ", ".join( + [ + f"''::VARCHAR AS \"{field}\"" + for field in null_fields + if not field.endswith("_geom") + ] + ) + json_statement = ", ".join( + [ + f"CASE WHEN t1.{field} IS NOT NULL THEN REPLACE('{field}', '_', '-') ELSE NULL END, t1.{field}" + for field in json_fields + ] + ) + + # define organisation query + org_csv = organisation_path + org_query = f""" + SELECT * FROM read_csv_auto('{org_csv}', max_line_size=40000000) + """ + + # should installinng spatial be done here + sql = f""" + INSTALL spatial; LOAD spatial; + COPY( + WITH computed_centroid AS ( + SELECT + * EXCLUDE (point), -- Calculate centroid point if not given + CASE + WHEN (geometry IS NOT NULL and geometry <> '') AND (point IS NULL OR point = '') + THEN ST_AsText(ST_ReducePrecision(ST_Centroid(ST_GeomFromText(geometry)),0.000001)) + ELSE point + END AS point + FROM ( + SELECT '{self.dataset}' as dataset, + '{self.typology}' as typology, + t2.entity as organisation_entity, + {select_statement}, + {null_fields_statement}, + json_object({json_statement}) as json, + FROM ({pivot_query}) as t1 + LEFT JOIN ({org_query}) as t2 + on t1.organisation = t2.organisation + ) + ) + SELECT + * EXCLUDE (json), + CASE WHEN json = '{{}}' THEN NULL ELSE json END AS json + FROM computed_centroid + ) TO '{str(output_path)}' (FORMAT PARQUET); + """ + # might need to un some fetch all toget result back + self.conn.execute(sql) + + def combine_parquet_files(self, input_path, output_path): + """ + This method combines multiple parquet files into a single parquet file + """ + logger.info(f"combining parquet files from {input_path} into {output_path}") + # use self.conn to use duckdb to combine files + sql = f""" + COPY (select * from parquet_scan('{input_path}/*.parquet')) TO 
'{output_path}' (FORMAT PARQUET);
+        """
+        self.conn.execute(sql)
+
+    def load_entities(self, transformed_parquet_dir, resource_path, organisation_path):
+        output_path = self.entity_path
+        output_path.parent.mkdir(parents=True, exist_ok=True)
+
+        # retrieve the minimum and maximum entity numbers so loading can be chunked
+        min_sql = f"select MIN(entity) FROM parquet_scan('{transformed_parquet_dir}/*.parquet');"
+        min_entity = self.conn.execute(min_sql).fetchone()[0]
+        max_sql = f"select MAX(entity) FROM parquet_scan('{transformed_parquet_dir}/*.parquet');"
+        max_entity = self.conn.execute(max_sql).fetchone()[0]
+        total_entities = max_entity - min_entity
+        entity_limit = 1000000
+        if total_entities > entity_limit:
+            # create a temporary output path to store the separate entity files in
+            temp_dir = (
+                output_path.parent
+                / "temp_parquet_files"
+                / "title-boundaries"
+                / "entity_files"
+            )
+            temp_dir.mkdir(parents=True, exist_ok=True)
+            logger.info(f"total entities {total_entities} exceeds limit {entity_limit}")
+            range_start = min_entity
+            file_count = 1
+            while range_start < max_entity:
+                temp_output_path = temp_dir / f"entity_{file_count}.parquet"
+                entity_range = [range_start, range_start + entity_limit]
+                logger.info(
+                    f"loading entities from {entity_range[0]} to {entity_range[1]}"
+                )
+                self.load_entities_range(
+                    transformed_parquet_dir,
+                    resource_path,
+                    organisation_path,
+                    temp_output_path,
+                    entity_range,
+                )
+                range_start += entity_limit
+                file_count += 1
+            # combine all the parquet files into a single parquet file
+            self.combine_parquet_files(temp_dir, output_path)
+
+            # remove temporary files
+            shutil.rmtree(temp_dir)
+        else:
+            self.load_entities_range(
+                transformed_parquet_dir, resource_path, organisation_path, output_path
+            )
+
+    def load_to_sqlite(self, sqlite_path):
+        """
+        Convert parquet files to sqlite3 tables; assumes the sqlite tables already exist.
+        There is an argument to say we want to improve the loading functionality of the sqlite package.
+        """
+        # At present we are saving the parquet files in 'cache' but saving the sqlite files produced in 'dataset'
+        # In future when parquet files are saved to 'dataset' remove the 'cache_dir' in the function arguments and
+        # replace 'cache_dir' with 'output_path' in this function's code
+        logger.info(
+            f"loading sqlite3 tables in {sqlite_path} from parquet files in {self.path}"
+        )
+        # migrate to connection creation
+        query = "INSTALL sqlite; LOAD sqlite;"
+        self.conn.execute(query)

+        # attach the sqlite db to duckdb
+        self.conn.execute(
+            f"ATTACH DATABASE '{sqlite_path}' AS sqlite_db (TYPE SQLITE);"
+        )
+
+        fact_resource_fields = self.specification.schema["fact-resource"]["fields"]
+        fields_str = ", ".join(
+            [field.replace("-", "_") for field in fact_resource_fields]
+        )
+
+        logger.info("loading fact_resource data")
+        # insert fact_resource data
+        self.conn.execute(
+            f"""
+            INSERT INTO sqlite_db.fact_resource
+            SELECT {fields_str} FROM parquet_scan('{self.fact_resource_path}')
+            """
+        )
+
+        logger.info("loading fact data")
+        # insert fact data
+        fact_fields = self.specification.schema["fact"]["fields"]
+        fields_str = ", ".join([field.replace("-", "_") for field in fact_fields])
+
+        self.conn.execute(
+            f"""
+            INSERT INTO sqlite_db.fact
+            SELECT {fields_str} FROM parquet_scan('{self.fact_path}')
+            """
+        )
+
+        logger.info("loading entity data")
+        # insert entity data
+        entity_fields = self.specification.schema["entity"]["fields"]
+        fields_str = ", ".join(
+            [
+                field.replace("-", "_")
+                for field in entity_fields
+                if field not in ["geometry-geom", "point-geom"]
+            ]
+        )
+        self.conn.execute(
+            f"""
+            INSERT INTO sqlite_db.entity
+            SELECT {fields_str} FROM parquet_scan('{self.entity_path}')
+            """
+        )
+
+        self.conn.execute("DETACH DATABASE sqlite_db;")
+
+    def close_conn(self):
+        logging.info("Close connection to duckdb database in session")
+        if self.conn is not None:
+            self.conn.close()
+        # only a file-backed duckdb connection leaves a database file behind to remove
+        if getattr(self, "duckdb_path", None) and os.path.exists(self.duckdb_path):
+            os.remove(self.duckdb_path)
+
+    def load(self):
+        pass
diff --git a/digital_land/phase/convert.py b/digital_land/phase/convert.py
index 0f411f3b1..b92e2d900 100644
--- a/digital_land/phase/convert.py
+++ b/digital_land/phase/convert.py
@@ -187,25 +187,30 @@ def __init__(
         path=None,
         dataset_resource_log=None,
         converted_resource_log=None,
-        custom_temp_dir=None,
         output_path=None,
+        # custom_temp_dir=None,
     ):
+        """
+        given a file path, will aim to convert it to a csv and return the path to the csv; if the file is already a csv it is used as is
+
+        Args:
+            path (str): Path to the shapefile or geojson
+            dataset_resource_log (DatasetResourceLog): DatasetResourceLog object
+            converted_resource_log (ConvertedResourceLog): ConvertedResourceLog object
+            output_path (str): Optional output path for the converted csv
+        """
         self.path = path
         self.dataset_resource_log = dataset_resource_log
         self.converted_resource_log = converted_resource_log
         self.charset = ""
-        # Allows for custom temporary directory to be specified
-        # This allows symlink creation in case of /tmp & path being on different partitions
-        if custom_temp_dir:
-            self.temp_file_extra_kwargs = {"dir": custom_temp_dir}
-        else:
-            self.temp_file_extra_kwargs = {}
-
         self.output_path = output_path
         if output_path:
             output_dir = os.path.dirname(output_path)
             if not os.path.exists(output_dir):
                 os.makedirs(output_dir)
+        # self.custom_temp_dir = custom_temp_dir
+        # if custom_temp_dir:
+        #     os.makedirs(custom_temp_dir, exist_ok=True)

     def process(self, stream=None):
         input_path = self.path
@@ -247,6 +252,7 @@ def process(self, stream=None):

         return Stream(input_path, f=iter(()), log=self.dataset_resource_log)

+    # should this be a function rather than a method? if we re-factor it into a function we should remove the references to self
     def _read_text_file(self, input_path, encoding):
         f = read_csv(input_path, encoding)
         self.dataset_resource_log.mime_type = "text/csv" + self.charset
@@ -361,8 +367,9 @@ def _read_binary_file(self, input_path):
             if internal_path:
                 self.dataset_resource_log.internal_path = internal_path
                 self.dataset_resource_log.internal_mime_type = mime_type
+                # TODO replace temp path with output path
+                # fall back to the input file's directory so the hard link below stays on the same filesystem
+                temp_dir = (
+                    os.path.dirname(self.output_path)
+                    if self.output_path
+                    else os.path.dirname(input_path)
+                )
                 temp_path = tempfile.NamedTemporaryFile(
-                    suffix=".zip", **self.temp_file_extra_kwargs
+                    suffix=".zip", dir=temp_dir
                 ).name
                 os.link(input_path, temp_path)
                 zip_path = f"/vsizip/{temp_path}{internal_path}"
diff --git a/digital_land/pipeline/__init__.py b/digital_land/pipeline/__init__.py
new file mode 100644
index 000000000..0936fc54c
--- /dev/null
+++ b/digital_land/pipeline/__init__.py
@@ -0,0 +1,9 @@
+"""sub package containing code for processing resources into transformed resources"""
+
+from .main import (  # noqa: F401
+    Pipeline,
+    Lookups,
+    chain_phases,
+    run_pipeline,
+    EntityNumGen,
+)
diff --git a/digital_land/pipeline.py b/digital_land/pipeline/main.py
similarity index 99%
rename from digital_land/pipeline.py
rename to digital_land/pipeline/main.py
index f2a53564b..8f6d08cc6 100644
--- a/digital_land/pipeline.py
+++ b/digital_land/pipeline/main.py
@@ -5,9 +5,9 @@
 import logging
 from pathlib import Path

-from .phase.map import normalise
-from .phase.lookup import key as lookup_key
-from .schema import Schema
+from digital_land.phase.map import normalise
+from digital_land.phase.lookup import key as lookup_key
+from digital_land.schema import Schema


 def chain_phases(phases):
diff --git a/digital_land/pipeline/process.py b/digital_land/pipeline/process.py
new file mode 100644
index 000000000..7ad249c52
--- /dev/null
+++ b/digital_land/pipeline/process.py
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+import os
+import pandas as pd
+import pyarrow.parquet as pq
+import pyarrow as pa
+from pathlib import Path
+
+# load in specification
+
+
+# TODO need to take in the correct data types for the columns
+def convert_tranformed_csv_to_pq(input_path, output_path):
+    """
+    function to convert a transformed resource to a parquet file.
+ """ + input_path = Path(input_path) + output_path = Path(output_path) + if output_path.exists(): + os.remove(output_path) + + # Define the chunk size for reading the CSV file + chunk_size = 1000000 # Number of rows per chunk + + # expand on column names + # Open a CSV reader with PyArrow + # csv_reader = pv.open_csv(input_path, read_options=pv.ReadOptions(block_size=chunk_size)) + csv_iterator = pd.read_csv( + input_path, + chunksize=chunk_size, + dtype={ + "entity": int, + **{ + col: str + for col in pd.read_csv(input_path, nrows=1).columns + if col != "entity" + }, + }, + na_filter=False, + ) + + # Initialize the Parquet writer with the schema from the first chunk + first_chunk = next(csv_iterator) + # size = 0 + # size +=len(first_chunk) + + fields = [ + ("end-date", pa.string()), + ("entity", pa.int64()), + ("entry-date", pa.string()), + ("entry-number", pa.string()), + ("fact", pa.string()), + ("field", pa.string()), + ("priority", pa.string()), + ("reference-entity", pa.string()), + ("resource", pa.string()), + ("start-date", pa.string()), + ("value", pa.string()), + ] + schema = pa.schema(fields) + table = pa.Table.from_pandas(first_chunk, schema=schema) + + # rename columns for parquet files to make querying easier in s3 + # Replace '-' with '_' in column names + new_column_names = [name.replace("-", "_") for name in table.column_names] + table = table.rename_columns(new_column_names) + + # Create a Parquet writer + parquet_writer = pq.ParquetWriter(output_path, table.schema) + + # Write the first chunk + parquet_writer.write_table(table) + + # Process and write the remaining chunks + while True: + try: + chunk = next(csv_iterator) + table = pa.Table.from_pandas(chunk, schema=schema) + new_column_names = [name.replace("-", "_") for name in table.column_names] + table = table.rename_columns(new_column_names) + parquet_writer.write_table(table) + # size += len(chunk) + except StopIteration: + break + + # Close the Parquet writer + parquet_writer.close() + # print(size) diff --git a/tests/acceptance/test_dataset_create.py b/tests/acceptance/test_dataset_create.py new file mode 100644 index 000000000..1508db528 --- /dev/null +++ b/tests/acceptance/test_dataset_create.py @@ -0,0 +1,367 @@ +""" +A set of tests to mimic a user (computational or otherwise) running tests against +a sqlite dataset. 
There are quite a few things to set up and this specifically +""" + +import pytest +import logging +import numpy as np +import pandas as pd +import os +import sqlite3 +from tempfile import TemporaryDirectory +from pathlib import Path + +from click.testing import CliRunner + +from digital_land.cli import cli + +test_collection = "conservation-area" +test_dataset = "conservation-area" + + +@pytest.fixture(scope="session") +def session_tmp_path(): + with TemporaryDirectory() as tmpdir: + yield Path(tmpdir) + + +@pytest.fixture +def cache_path(tmp_path): + cache_path = tmp_path / "var" / "cache" + cache_path.mkdir(parents=True, exist_ok=True) + return cache_path + + +test_geometry = "MULTIPOLYGON(((-0.49901924 53.81622,-0.5177418 53.76114,-0.4268378 53.78454,-0.49901924 53.81622)))" +transformed_1_data = { + "end_date": [np.nan] * 16, + "entity": [11, 11, 11, 11, 11, 11, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12], + "entry_date": [ + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + ], + "entry_number": [2] * 16, + "fact": [ + "abcdef1", + "abcdef2", + "abcdef3", + "abcdef4", + "abcdef5", + "abcdef6", + "abc1231", + "abc1232", + "abc1233", + "def4561", + "def4562", + "def4563", + "a1b2c31", + "a1b2c32", + "a1b2c33", + "a1b2c34", + ], + "field": [ + "entry-date", + "geometry", + "point", + "document-url", + "organisation", + "entry-date", + "geometry", + "organisation", + "entry-date", + "geometry", + "organisation", + "entry-date", + "geomtry", + "document-url", + "notes-checking", + "organisation", + ], + "priority": [2] * 16, + "reference_entity": [np.nan] * 16, + "resource": [ + "zyxwvu", + "zyxwvu", + "zyxwvu", + "zyxwvu", + "zyxwvu", + "zyxwvu", + "yxwvut", + "yxwvut", + "zyxwvu", + "xwvuts", + "xwvuts", + "zyxwvu", + "wvutsr", + "wvutsr", + "wvutsr", + "wvutsr", + ], + "start_date": [np.nan] * 16, + "value": [ + "2023-01-01", + f"{test_geometry}", + '"POINT(-0.481 53.788)"', + "https://www.test.xyz", + "organisation:AAA", + "2023-01-01", + f"{test_geometry}", + "local-authority:BBB", + "2023-01-01", + f"{test_geometry}", + "local-authority:CCC", + "2023-01-01", + f"{test_geometry}", + "https://www.testing.yyz", + "Something random", + "local-authority:DDD", + ], +} + + +@pytest.fixture +def input_paths(cache_path): + data_dicts = {"resource_1": transformed_1_data} + input_paths = [] + directory = cache_path / "transformed_parquet" / "conservation-area" + directory.mkdir(parents=True, exist_ok=True) + + for path, data in data_dicts.items(): + data = pd.DataFrame.from_dict(data) + input_path = directory / f"{path}.parquet" + data.to_parquet(input_path, index=False) + logging.error(str(input_path)) + input_paths.append(str(input_path)) + + return input_paths + + +@pytest.fixture +def organisation_path(tmp_path): + """ + build an organisations dataset to use + """ + org_data = { + "entity": [101, 102], + "name": ["test", "test_2"], + "prefix": ["local-authority", "local-authority"], + "reference": ["test", "test_2"], + "dataset": ["local-authority", "local-authority"], + "organisation": ["local-authority:test", "local-authority:test_2"], + } + orgs_path = tmp_path / "organisation.csv" + + pd.DataFrame.from_dict(org_data).to_csv(orgs_path, index=False) + return orgs_path + + +@pytest.fixture +def column_field_path(tmp_path): + column_field_dir = tmp_path / "column-field" + dataset_cfd = 
column_field_dir / "conservation-area" + (dataset_cfd).mkdir(parents=True, exist_ok=True) + data = { + "end_date": [""], + "entry_date": [""], + "field": ["geometry"], + "dataset": ["conservation-area"], + "start_date": [""], + "resource": [""], + "column": ["WKT"], + } + pd.DataFrame.from_dict(data).to_csv(dataset_cfd / "resource_1.csv", index=False) + logging.error(str(dataset_cfd / "resource_1.csv")) + return column_field_dir + + +@pytest.fixture +def dataset_resource_path(tmp_path): + dataset_resource_path = tmp_path / "dataset-resource" + dataset_drd = dataset_resource_path / "conservation-area" + dataset_drd.mkdir(parents=True, exist_ok=True) + data = { + "end_date": [""], + "entry_date": [""], + "dataset": ["conservation-area"], + "entity_count": [""], + "entry_count": [1], + "line_count": [1], + "mime_type": [""], + "internal_path": [""], + "internal_mime_type": [""], + "resource": ["resource_1"], + "start_date": [""], + } + pd.DataFrame.from_dict(data).to_csv(dataset_drd / "resource_1.csv", index=False) + return dataset_resource_path + + +@pytest.fixture +def dataset_dir(session_tmp_path): + dataset_dir = session_tmp_path / "dataset" + os.makedirs(dataset_dir, exist_ok=True) + return dataset_dir + + +@pytest.fixture +def issue_dir(session_tmp_path): + issue_dir = session_tmp_path / "issue" + os.makedirs(issue_dir, exist_ok=True) + return issue_dir + + +@pytest.fixture +def resource_path(session_tmp_path): + resource_path = session_tmp_path / "resource.csv" + columns = ["resource", "end-date"] + with open(resource_path, "w") as f: + f.write(",".join(columns) + "\n") + return resource_path + + +def test_acceptance_dataset_create( + session_tmp_path, + organisation_path, + input_paths, + issue_dir, + cache_path, + dataset_dir, + resource_path, + column_field_path, + dataset_resource_path, +): + output_path = dataset_dir / f"{test_dataset}.sqlite3" + + runner = CliRunner() + result = runner.invoke( + cli, + [ + "--dataset", + str(test_dataset), + "--pipeline-dir", + str(f"tests/data/{test_collection}/pipeline"), + "dataset-create", + "--output-path", + str(output_path), + "--organisation-path", + str(organisation_path), + "--column-field-dir", + str(column_field_path), + "--dataset-resource-dir", + str(dataset_resource_path), + "--issue-dir", + str(issue_dir), + "--cache-dir", + str(cache_path), + "--resource-path", + str(resource_path), + ] + + input_paths, + catch_exceptions=False, + ) + + # Check that the command exits with status code 0 (success) + if result.exit_code != 0: + # Print the command output if the test fails + print("Command failed with exit code:", result.exit_code) + print("Command output:") + print(result.output) + print("Command error output:") + print(result.exception) + + files = [ + str(f.name) + for f in ( + cache_path / "conservation-area" / "dataset=conservation-area" + ).iterdir() + ] + for file in ["entity.parquet", "fact.parquet", "fact_resource.parquet"]: + assert file in files, f"file {file} not created. 
files found {', '.join(files)}" + assert result.exit_code == 0, "error returned when building dataset" + + # check that parquet files have been created correctlly in the cache directory + # may want to adjust this for how we structure a parquet package in the future + # also we are using the cache to store this for now but in the future we may want to store it in a specific directory + files = [ + str(f.name) + for f in ( + cache_path / "conservation-area" / "dataset=conservation-area" + ).iterdir() + ] + + for file in ["entity.parquet", "fact.parquet", "fact_resource.parquet"]: + assert file in files, f"file {file} not created. files found {', '.join(files)}" + + # Check the sqlite file was created + assert os.path.exists(output_path), f"sqlite file {output_path} does not exists" + + conn = sqlite3.connect(output_path) + cursor = conn.cursor() + tables = cursor.execute( + "SELECT name FROM sqlite_master WHERE type='table';" + ).fetchall() + expected_tables = {"fact", "fact_resource", "entity"} + actual_tables = {table[0] for table in tables} + missing_tables = expected_tables - actual_tables + assert ( + len(missing_tables) == 0 + ), f"Missing following tables in sqlite database: {missing_tables}" + + for table in list(expected_tables): + + pq_rows = len( + pd.read_parquet( + cache_path + / "conservation-area" + / "dataset=conservation-area" + / f"{table}.parquet" + ) + ) + + assert pq_rows > 0, f"parquet file {table} is empty" + sql_rows = cursor.execute(f"SELECT COUNT(*) FROM {table};").fetchone()[0] + assert sql_rows > 0, f"database table {table} is empty" + assert ( + pq_rows == sql_rows + ), f"Different rows between the parquet files and database table for {table}" + + # entity table specific tests to check how we expect the data to be used + + # json field checks + # where no json value is present we expect the value to be null. not blank or an empty json bracket + # so will ensure these aren't in the results of any test + sql = """ + SELECT * + FROM entity + WHERE json = '{}' + ;""" + + results = cursor.execute(sql).fetchall() + assert ( + len(results) == 0 + ), "there should be no rows where json is an empty json bracket" + + # check no json values are arrays + sql = """ + SELECT * + FROM entity + WHERE json_type(json) NOT IN ('object', NULL) + ;""" + + results = cursor.execute(sql).fetchall() + assert len(results) == 0, "all json values should be objects or null" diff --git a/tests/integration/package/conftest.py b/tests/integration/package/conftest.py new file mode 100644 index 000000000..408c3de1b --- /dev/null +++ b/tests/integration/package/conftest.py @@ -0,0 +1,34 @@ +import pytest +import os + +from urllib.request import urlretrieve + + +@pytest.fixture(scope="session") +def specification_dir(tmp_path_factory): + specification_dir = tmp_path_factory.mktemp("specification") + source_url = "https://raw.githubusercontent.com/digital-land/" + specification_csvs = [ + "attribution.csv", + "licence.csv", + "typology.csv", + "theme.csv", + "collection.csv", + "dataset.csv", + "dataset-field.csv", + "field.csv", + "datatype.csv", + "prefix.csv", + # deprecated .. 
+ "pipeline.csv", + "dataset-schema.csv", + "schema.csv", + "schema-field.csv", + ] + for specification_csv in specification_csvs: + urlretrieve( + f"{source_url}/specification/main/specification/{specification_csv}", + os.path.join(specification_dir, specification_csv), + ) + + return specification_dir diff --git a/tests/integration/package/test_dataset.py b/tests/integration/package/test_dataset.py index a24c80063..96c3a7add 100644 --- a/tests/integration/package/test_dataset.py +++ b/tests/integration/package/test_dataset.py @@ -57,36 +57,6 @@ def transformed_fact_resources_with_blank(): return input_data -@pytest.fixture(scope="session") -def specification_dir(tmp_path_factory): - specification_dir = tmp_path_factory.mktemp("specification") - source_url = "https://raw.githubusercontent.com/digital-land/" - specification_csvs = [ - "attribution.csv", - "licence.csv", - "typology.csv", - "theme.csv", - "collection.csv", - "dataset.csv", - "dataset-field.csv", - "field.csv", - "datatype.csv", - "prefix.csv", - # deprecated .. - "pipeline.csv", - "dataset-schema.csv", - "schema.csv", - "schema-field.csv", - ] - for specification_csv in specification_csvs: - urllib.request.urlretrieve( - f"{source_url}/specification/main/specification/{specification_csv}", - os.path.join(specification_dir, specification_csv), - ) - - return specification_dir - - @pytest.fixture def organisation_csv(tmp_path): organisation_path = os.path.join(tmp_path, "organisation.csv") diff --git a/tests/integration/package/test_dataset_parquet.py b/tests/integration/package/test_dataset_parquet.py new file mode 100644 index 000000000..f94244dd1 --- /dev/null +++ b/tests/integration/package/test_dataset_parquet.py @@ -0,0 +1,838 @@ +import sqlite3 +import numpy as np +import pandas as pd +import logging +import pytest +import os +import json +import pyarrow.parquet as pq +import pyarrow as pa +from digital_land.package.dataset_parquet import DatasetParquetPackage + + +class MockOrganisation(object): + def __init__(self, organisation_path): + self.organisation_path = organisation_path + + +@pytest.fixture +def org_path(tmp_path): + org_path = tmp_path / "organisation.csv" + columns = ["organisation", "entity"] + # Test data for the tables. 
This checks that 'field' get pivoted + data = [ + ["local-authority:AAA", "1"], + ["local-authority:BBB", "2"], + ["local-authority:CCC", "3"], + ["local-authority:DDD", "4"], + ["local-authority:EEE", "5"], + ["local-authority:FFF", "6"], + ["local-authority:GGG", "7"], + ["local-authority:HHH", "8"], + ["local-authority:III", "9"], + ] + with open(org_path, "w") as f: + f.write(",".join(columns) + "\n") + for row in data: + f.write(",".join(map(str, row)) + "\n") + return org_path + + +# # Fixture to create a shared temporary directory +# @pytest.fixture(scope="session") +# def temp_dir(tmpdir_factory): +# temp_dir = tmpdir_factory.mktemp("shared_session_temp_dir") +# yield temp_dir + + +@pytest.fixture +def resource_path(tmp_path): + resource_path = tmp_path / "resource.csv" + resource_columns = ["resource", "end-date"] + with open(resource_path, "w") as f: + f.write(",".join(resource_columns) + "\n") + + return resource_path + + +# general use file to use for testing should focus on splitting down into individual test cases +test_geometry = "MULTIPOLYGON(((-0.49901924 53.81622,-0.5177418 53.76114,-0.4268378 53.78454,-0.49901924 53.81622)))" +transformed_1_data = { + "end_date": [np.nan] * 16, + "entity": [11, 11, 11, 11, 11, 11, 12, 12, 12, 12, 12, 12, 12, 12, 12, 12], + "entry_date": [ + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + ], + "entry_number": [2] * 16, + "fact": [ + "abcdef1", + "abcdef2", + "abcdef3", + "abcdef4", + "abcdef5", + "abcdef6", + "abc1231", + "abc1232", + "abc1233", + "def4561", + "def4562", + "def4563", + "a1b2c31", + "a1b2c32", + "a1b2c33", + "a1b2c34", + ], + "field": [ + "entry-date", + "geometry", + "point", + "document-url", + "organisation", + "entry-date", + "geometry", + "organisation", + "entry-date", + "geometry", + "organisation", + "entry-date", + "geomtry", + "document-url", + "notes-checking", + "organisation", + ], + "priority": [2] * 16, + "reference_entity": [np.nan] * 16, + "resource": [ + "zyxwvu", + "zyxwvu", + "zyxwvu", + "zyxwvu", + "zyxwvu", + "zyxwvu", + "yxwvut", + "yxwvut", + "zyxwvu", + "xwvuts", + "xwvuts", + "zyxwvu", + "wvutsr", + "wvutsr", + "wvutsr", + "wvutsr", + ], + "start_date": [np.nan] * 16, + "value": [ + "2023-01-01", + f"{test_geometry}", + '"POINT(-0.481 53.788)"', + "https://www.test.xyz", + "organisation:AAA", + "2023-01-01", + f"{test_geometry}", + "local-authority:BBB", + "2023-01-01", + f"{test_geometry}", + "local-authority:CCC", + "2023-01-01", + f"{test_geometry}", + "https://www.testing.yyz", + "Something random", + "local-authority:DDD", + ], +} + +transformed_2_data = { + "end_date": [np.nan] * 19, # 19 records + "entity": [ + 110, + 110, + 110, + 111, + 111, + 111, + 112, + 112, + 112, + 113, + 113, + 113, + 114, + 114, + 114, + 115, + 115, + 115, + 116, + ], + "entry_date": [ + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-02-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-04-01", + "2023-05-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + "2023-01-01", + ], + "entry_number": [2, 2, 2, 2, 2, 2, 2, 12, 12, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2], + "fact": [ + "badcfe1", + "badcfe2", + "badcfe3", + "fedcba1", + "fedcba2", + "fedcba3", + "bcdefg1", + "bcdefg2", + "bcdefg3", + "cdefgh1", + 
"hgfedc1", + "cdefgh2", + "efghij1", + "efghij2", + "efghij3", + "defghi1", + "defghi2", + "defghi3", + "ihgfed1", + ], + "field": [ + "entry-date", + "entry-date", + "organisation", + "entry-date", + "entry-date", + "organisation", + "entry-date", + "entry-date", + "organisation", + "entry-date", + "entry-date", + "organisation", + "entry-date", + "entry-date", + "organisation", + "entry-date", + "entry-date", + "organisation", + "entry-date", + ], + "priority": [2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 1, 2, 1, 2, 2, 2, 2], + "reference_entity": [np.nan] * 19, # 19 records + "resource": [ + "zyx123", + "zyx123", + "zyx123", + "zxy123", + "zxy123", + "zxy123", + "yxw456", + "yxw456", + "yxw456", + "xwv789", + "xwv789", + "xwv789", + "xyz123", + "xyz123", + "xyz123", + "uvw456", + "wvu654", + "uvw456", + "rta357", + ], + "start_date": [np.nan] * 19, # 19 records + "value": [ + "2023-01-01", + "2023-01-01", + "local-authority:DDD", + "2023-01-01", + "2023-02-01", + "local-authority:EEE", + "2023-02-01", + "2023-02-01", + "local-authority:FFF", + "2023-01-01", + "2023-01-01", + "local-authority:GGG", + "2023-04-01", + "2023-05-01", + "local-authority:HHH", + "2023-01-01", + "2023-01-01", + "local-authority:III", + "2023-01-01", + ], +} + + +@pytest.fixture +def dataset_sqlite_path(tmp_path): + """ + Should consider using a test spec to feed in to a dataset package instead, also functionality might need to be moved + """ + sqlite_path = tmp_path / "conservation-area.sqlite3" + conn = sqlite3.connect(sqlite_path) + conn.execute( + """ + CREATE TABLE entity( + dataset TEXT, + end_date TEXT, + entity INTEGER PRIMARY KEY, + entry_date TEXT, + geojson JSON, + geometry TEXT, + json JSON, + name TEXT, + organisation_entity TEXT, + point TEXT, + prefix TEXT, + reference TEXT, + start_date TEXT, + typology TEXT + ); + """ + ) + conn.execute( + """ + CREATE TABLE fact( + end_date TEXT, + entity INTEGER, + fact TEXT PRIMARY KEY, + field TEXT, + entry_date TEXT, + priority INTEGER, + reference_entity TEXT, + start_date TEXT, + value TEXT, + FOREIGN KEY(entity) REFERENCES entity(entity) + ); + """ + ) + conn.execute( + """ + CREATE TABLE fact_resource( + end_date TEXT, + fact TEXT, + entry_date TEXT, + entry_number INTEGER, + priority INTEGER, + resource TEXT, + start_date TEXT, + FOREIGN KEY(fact) REFERENCES fact(fact) + ); + """ + ) + + conn.commit() + conn.close() + + return sqlite_path + + +@pytest.mark.parametrize("data,expected", [(transformed_1_data, 16)]) +def test_load_facts_single_file(data: dict, expected: int, tmp_path): + """ + tests loading from a directory when there is a single file, multiple files + make very little difference to duckdb so use to test out individual cases + """ + # convert data to df and save to a file + df = pd.DataFrame.from_dict(data) + transformed_parquet_dir = tmp_path / "transformed" + transformed_parquet_dir.mkdir(parents=True, exist_ok=True) + df.to_parquet(transformed_parquet_dir / "transformed_resouce.parquet", index=False) + + # instantiate package + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "conservation-area", + specification_dir=None, + ) + + # this method is explicitely designed to load facts from the temp table + # however it shouldn't need this, it's dupllicating all of the same data in a emporary space + # we should try leveraging the power of duckdb and parquet. 
+ package.load_facts(transformed_parquet_dir=transformed_parquet_dir) + + output_file = ( + tmp_path + / "conservation-area" + / "fact" + / "dataset=conservation-area" + / "fact.parquet" + ) + assert os.path.exists(output_file), "fact.parquet file does not exist" + + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in fact.parquet file" + assert ( + len(df) == expected + ), "No. of facts does not match expected" # No of unique facts + assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" + + +@pytest.mark.parametrize( + "data1,data2,expected", [(transformed_1_data, transformed_2_data, 35)] +) +def test_load_facts_multiple_files(data1, data2, expected, tmp_path): + """ + test loading multiple files into the fact table when they're from a single directory + """ + # convert data to df and save to a file + df1 = pd.DataFrame.from_dict(data1) + df2 = pd.DataFrame.from_dict(data2) + transformed_parquet_dir = tmp_path / "transformed" + transformed_parquet_dir.mkdir(parents=True, exist_ok=True) + df1.to_parquet( + transformed_parquet_dir / "transformed_resource_1.parquet", index=False + ) + df2.to_parquet( + transformed_parquet_dir / "transformed_resource_2.parquet", index=False + ) + + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "conservation-area", + specification_dir=None, + ) + + package.load_facts(transformed_parquet_dir=transformed_parquet_dir) + + output_file = ( + tmp_path + / "conservation-area" + / "fact" + / "dataset=conservation-area" + / "fact.parquet" + ) + assert os.path.exists(output_file), "fact.parquet file does not exist" + + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in fact.parquet file" + assert ( + len(df) == expected + ), "No. of facts does not match expected" # No of unique facts + assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" + + +@pytest.mark.parametrize("data,expected", [(transformed_1_data, 16)]) +def test_load_facts_one_file_with_empty_file(data, expected, tmp_path): + """ + test loading one file into the fact table alongside an empty file + """ + + df = pd.DataFrame.from_dict(data) + transformed_parquet_dir = tmp_path / "transformed" + transformed_parquet_dir.mkdir(parents=True, exist_ok=True) + df.to_parquet(transformed_parquet_dir / "transformed_resouce.parquet", index=False) + schema = pa.schema( + [ + ("end_date", pa.string()), + ("entity", pa.int64()), + ("entry_date", pa.string()), + ("entry_number", pa.int64()), + ("fact", pa.string()), + ("field", pa.string()), + ("priority", pa.int64()), + ("reference_entity", pa.int64()), + ("resource", pa.string()), + ("start_date", pa.string()), + ("value", pa.string()), + ] + ) + empty_arrays = [pa.array([], type=field.type) for field in schema] + empty_table = pa.Table.from_arrays(empty_arrays, schema=schema) + pq.write_table(empty_table, transformed_parquet_dir / "empty.parquet") + + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "conservation-area", + specification_dir=None, + ) + + package.load_facts(transformed_parquet_dir=transformed_parquet_dir) + + output_file = ( + tmp_path + / "conservation-area" + / "fact" + / "dataset=conservation-area" + / "fact.parquet" + ) + assert os.path.exists(output_file), "fact.parquet file does not exist" + + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in fact.parquet file" + assert ( + len(df) == expected + ), "No. 
of facts does not match expected" # No of unique facts + assert df.shape[1] == 9, "Not all columns saved in fact.parquet file" + + +@pytest.mark.parametrize("data,expected", [(transformed_1_data, 16)]) +def test_load_fact_resource_single_file(data, expected, tmp_path): + + df = pd.DataFrame.from_dict(data) + transformed_parquet_dir = tmp_path / "transformed" + transformed_parquet_dir.mkdir(parents=True, exist_ok=True) + df.to_parquet(transformed_parquet_dir / "transformed_resouce.parquet", index=False) + + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "conservation-area", + specification_dir=None, + ) + package.load_fact_resource(transformed_parquet_dir) + + # Check if the output parquet file exists and verify contents + output_file = ( + tmp_path + / "conservation-area" + / "fact-resource" + / "dataset=conservation-area" + / "fact-resource.parquet" + ) + assert os.path.exists(output_file), "fact-resource.parquet file does not exist" + + # Load Parquet into a DataFrame to verify data correctness + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in fact-resource,parquet file" + assert len(df) == expected, "Not all data saved in fact-resource.parquet file" + + assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" + + +@pytest.mark.parametrize( + "data_1,data_2,expected", [(transformed_1_data, transformed_2_data, 35)] +) +def test_load_fact_resource_two_filea(data_1, data_2, expected, tmp_path): + df_1 = pd.DataFrame.from_dict(data_1) + df_2 = pd.DataFrame.from_dict(data_2) + transformed_parquet_dir = tmp_path / "transformed" + transformed_parquet_dir.mkdir(parents=True, exist_ok=True) + df_1.to_parquet( + transformed_parquet_dir / "transformed_resource_1.parquet", index=False + ) + df_2.to_parquet( + transformed_parquet_dir / "transformed_resource_2.parquet", index=False + ) + + package = DatasetParquetPackage( + dataset="conservation-area", + path=tmp_path / "conservation-area", + specification_dir=None, + ) + package.load_fact_resource(transformed_parquet_dir) + + # Check if the output parquet file exists and verify contents + output_file = ( + tmp_path + / "conservation-area" + / "fact-resource" + / "dataset=conservation-area" + / "fact-resource.parquet" + ) + assert os.path.exists(output_file), "fact-resource.parquet file does not exist" + + # Load Parquet into a DataFrame to verify data correctness + df = pd.read_parquet(output_file) + + assert len(df) > 0, "No data in fact-resource,parquet file" + assert len(df) == expected, "Not all data saved in fact-resource.parquet file" + + assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file" + + +@pytest.mark.parametrize("data,expected", [(transformed_1_data, 16)]) +def test_load_fact_resource_empty_file_with_another(data, expected, tmp_path): + + df = pd.DataFrame.from_dict(data) + transformed_parquet_dir = tmp_path / "transformed" + transformed_parquet_dir.mkdir(parents=True, exist_ok=True) + df.to_parquet(transformed_parquet_dir / "transformed_resouce.parquet", index=False) + # create empty file + schema = pa.schema( + [ + ("end_date", pa.string()), + ("entity", pa.int64()), + ("entry_date", pa.string()), + ("entry_number", pa.int64()), + ("fact", pa.string()), + ("field", pa.string()), + ("priority", pa.int64()), + ("reference_entity", pa.int64()), + ("resource", pa.string()), + ("start_date", pa.string()), + ("value", pa.string()), + ] + ) + empty_arrays = [pa.array([], type=field.type) for field in schema] + empty_table = 
+    pq.write_table(empty_table, transformed_parquet_dir / "empty.parquet")
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "conservation-area",
+        specification_dir=None,
+    )
+    package.load_fact_resource(transformed_parquet_dir)
+
+    # Check if the output parquet file exists and verify contents
+    output_file = (
+        tmp_path
+        / "conservation-area"
+        / "fact-resource"
+        / "dataset=conservation-area"
+        / "fact-resource.parquet"
+    )
+    assert os.path.exists(output_file), "fact-resource.parquet file does not exist"
+
+    # Load Parquet into a DataFrame to verify data correctness
+    df = pd.read_parquet(output_file)
+
+    assert len(df) > 0, "No data in fact-resource.parquet file"
+    assert len(df) == expected, "Not all data saved in fact-resource.parquet file"
+
+    assert df.shape[1] == 7, "Not all columns saved in fact-resource.parquet file"
+
+
+@pytest.mark.parametrize(
+    "data,expected_count,expected_props",
+    # need to build an example where organisation is blank
+    [
+        (transformed_1_data, 2, {11: {"end_date": ""}}),
+        (
+            {
+                "end_date": [np.nan],
+                "entity": [
+                    110,
+                ],
+                "entry_date": [
+                    "2023-01-01",
+                ],
+                "entry_number": [2],
+                "fact": [
+                    "badcfe1",
+                ],
+                "field": [
+                    "entry-date",
+                ],
+                "priority": [2],
+                "reference_entity": [np.nan],
+                "resource": [
+                    "zyx123",
+                ],
+                "start_date": [np.nan],
+                "value": ["2023-01-01"],
+            },
+            1,
+            {},
+        ),
+    ],
+)
+def test_load_entities_single_file(
+    data, expected_count, expected_props, tmp_path, org_path, resource_path
+):
+    # Create dummy organisation.csv file for use in 'load_entities'
+    # Test data for the tables; this checks that 'field' gets pivoted
+    df = pd.DataFrame.from_dict(data)
+    transformed_parquet_dir = tmp_path / "transformed"
+    transformed_parquet_dir.mkdir(parents=True, exist_ok=True)
+    df.to_parquet(transformed_parquet_dir / "transformed_resource.parquet", index=False)
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "conservation-area",
+        specification_dir=None,
+    )
+    package.load_entities(transformed_parquet_dir, resource_path, org_path)
+
+    output_file = (
+        tmp_path
+        / "conservation-area"
+        / "entity"
+        / "dataset=conservation-area"
+        / "entity.parquet"
+    )
+    assert os.path.exists(output_file), "entity.parquet file does not exist"
+
+    df = pd.read_parquet(output_file)
+
+    assert len(df) > 0, "No data in entity.parquet file"
+    assert len(df) == expected_count, "No. of entities is not correct"
+    assert df["entity"].nunique() == len(df), "Entity column contains duplicate values"
+
+    for entity in expected_props:
+        for key, value in expected_props[entity].items():
+            logging.info(f"entity={entity}, key={key}, value={value}")
+            assert (
+                df[df["entity"] == entity][key].iloc[0] == value
+            ), f"Expected {key} to be {value} for entity {entity}"
+
+
+# not a great test as we have to feed so much in; it would be better to test loading each table separately
+@pytest.mark.parametrize(
+    "fact_data,fact_resource_data,entity_data",
+    [
+        (
+            {
+                "fact": [""],
+                "end_date": [""],
+                "entity": [1],
+                "field": [""],
+                "entry_date": [""],
+                "priority": [1],
+                "reference_entity": [""],
+                "start_date": [1],
+                "value": [""],
+            },
+            {
+                "end_date": [""],
+                "fact": [1],
+                "entry_date": [""],
+                "entry_number": [1],
+                "priority": [1],
+                "resource": [""],
+                "start_date": [1],
+            },
+            {
+                "entity": [1],
+                "dataset": ["conservation-area"],
+                "end_date": [""],
+                "entry_date": [""],
+                "geojson": [""],
+                "geometry": [""],
+                "json": [""],
+                "name": [""],
+                "organisation_entity": [""],
+                "point": [""],
+                "prefix": [""],
+                "reference": [""],
+                "start_date": [""],
+                "typology": [""],
+            },
+        )
+    ],
+)
+def test_load_pq_to_sqlite_basic(
+    fact_data, fact_resource_data, entity_data, dataset_sqlite_path, tmp_path
+):
+
+    dataset_parquet_path = tmp_path / "dataset"
+    (dataset_parquet_path / "dataset=conservation-area").mkdir(
+        parents=True, exist_ok=True
+    )
+    # write data to parquet files in the dataset path
+    fact_df = pd.DataFrame.from_dict(fact_data)
+    fact_resource_df = pd.DataFrame.from_dict(fact_resource_data)
+    entity_df = pd.DataFrame.from_dict(entity_data)
+
+    (dataset_parquet_path / "fact" / "dataset=conservation-area").mkdir(
+        parents=True, exist_ok=True
+    )
+    (dataset_parquet_path / "fact-resource" / "dataset=conservation-area").mkdir(
+        parents=True, exist_ok=True
+    )
+    (dataset_parquet_path / "entity" / "dataset=conservation-area").mkdir(
+        parents=True, exist_ok=True
+    )
+
+    fact_df.to_parquet(
+        dataset_parquet_path / "fact" / "dataset=conservation-area" / "fact.parquet",
+        index=False,
+    )
+    fact_resource_df.to_parquet(
+        dataset_parquet_path
+        / "fact-resource"
+        / "dataset=conservation-area"
+        / "fact-resource.parquet",
+        index=False,
+    )
+    entity_df.to_parquet(
+        dataset_parquet_path
+        / "entity"
+        / "dataset=conservation-area"
+        / "entity.parquet",
+        index=False,
+    )
+
+    output_path = dataset_sqlite_path
+
+    package = DatasetParquetPackage(
+        dataset="conservation-area",
+        path=tmp_path / "dataset",
+        specification_dir=None,
+    )
+
+    package.load_to_sqlite(output_path)
+
+    assert os.path.exists(dataset_sqlite_path), "sqlite3 file does not exist"
+
+    cnx = sqlite3.connect(output_path)
+    df_sql = pd.read_sql_query("SELECT * FROM fact_resource", cnx)
+    assert len(df_sql) > 0, "No data in fact_resource table"
+    assert len(df_sql) == len(
+        fact_resource_df
+    ), "Not all data saved in fact_resource table"
+    assert np.all(
+        df_sql["end_date"].fillna("") == ""
+    ), "Non-empty strings in end_date from fact_resource table"
+
+    df_sql = pd.read_sql_query("SELECT * FROM fact", cnx)
+    assert len(df_sql) > 0, "No data in fact table"
+    assert len(df_sql) == len(fact_df), "Not all data saved in fact table"
+    assert np.all(
+        df_sql["end_date"].fillna("") == ""
+    ), "Non-empty strings in end_date from fact table"
+
+    df_sql = pd.read_sql_query("SELECT * FROM entity", cnx)
+    assert len(df_sql) > 0, "No data in entity table"
+    assert len(df_sql) == len(entity_df), "Not all data saved in entity table"
+    assert np.any(
+        df_sql["geometry"].fillna("") == ""
+    ), "All geometries from entity table have non-blank values"
+    assert not any(
+        [
+            (
+                any("_" in key for key in json.loads(row).keys())
+                if isinstance(row, str)
+                else False
+            )
+            for row in df_sql["json"]
+            if row != ""
+        ]
+    ), "Some json objects have underscores in their keys"
+
+    cnx.close()
diff --git a/tests/integration/pipeline/test_process.py b/tests/integration/pipeline/test_process.py
new file mode 100644
index 000000000..4f0e24932
--- /dev/null
+++ b/tests/integration/pipeline/test_process.py
@@ -0,0 +1,54 @@
+import pandas as pd
+
+from digital_land.pipeline.process import convert_tranformed_csv_to_pq
+
+
+def test_convert_transformed_csv_to_pq_converts_csv(tmp_path):
+    # create a csv that looks like a transformed csv
+    data = {
+        "end-date": [""],
+        "entity": [4220000],
+        "entry-date": ["2024-10-02"],
+        "entry-number": [1],
+        "fact": ["1be8ef923db61d62354f041718ea0b1795c5ae60b436ec74e90d9fd850919434"],
+        "field": ["name"],
+        "priority": [2],
+        "reference-entity": [""],
+        "resource": [
+            "0d1f06295866286d290d831b4569fe862ab38ca72cd23d541de2c9f20ff44ed7"
+        ],
+        "start-date": [""],
+        "value": ["Arun District Council Local Plan 2011 - 2031"],
+    }
+    df = pd.DataFrame(data)
+    data_path = (
+        tmp_path
+        / "0d1f06295866286d290d831b4569fe862ab38ca72cd23d541de2c9f20ff44ed7.csv"
+    )
+    df.to_csv(data_path, index=False)
+
+    # use process on it
+    output_path = (
+        tmp_path
+        / "0d1f06295866286d290d831b4569fe862ab38ca72cd23d541de2c9f20ff44ed7.parquet"
+    )
+    convert_tranformed_csv_to_pq(
+        data_path,
+        tmp_path
+        / "0d1f06295866286d290d831b4569fe862ab38ca72cd23d541de2c9f20ff44ed7.parquet",
+    )
+
+    # check resulting parquet file for:
+    assert (
+        output_path.exists()
+    ), f"no parquet file created as expected at {str(output_path)}"
+    # headers and number of rows
+    parquet_df = pd.read_parquet(output_path)
+    for col in list(parquet_df.columns):
+        assert "-" not in col
+
+    for col in list(df.columns):
+        assert col.replace("-", "_") in list(parquet_df.columns)
+
+
+# TODO: check column types
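
The behaviour this new test pins down for convert_tranformed_csv_to_pq is small: read a transformed CSV and write it back out as a single parquet file with hyphenated column names rewritten to underscores. Below is a minimal sketch of that expected behaviour only, assuming nothing beyond what the test asserts; the helper name is illustrative, and the real implementation in digital_land.pipeline.process may differ (for example, it may also enforce column types, which the trailing TODO leaves open).

# Sketch only: approximates the behaviour asserted by
# test_convert_transformed_csv_to_pq_converts_csv; not the real
# convert_tranformed_csv_to_pq from digital_land.pipeline.process.
from pathlib import Path

import pandas as pd


def convert_transformed_csv_to_pq_sketch(input_path: Path, output_path: Path) -> None:
    # read the transformed CSV produced by the pipeline run
    df = pd.read_csv(input_path)
    # parquet/SQL-friendly names: "entry-date" -> "entry_date", etc.
    df.columns = [column.replace("-", "_") for column in df.columns]
    # write a single parquet file at the requested location
    df.to_parquet(output_path, index=False)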