diff --git a/digital_land/cli.py b/digital_land/cli.py
index b65aef331..92d158009 100644
--- a/digital_land/cli.py
+++ b/digital_land/cli.py
@@ -31,6 +31,7 @@
     organisation_check,
     save_state,
     add_data,
+    load_pipeline_provenance,
 )
 
 from digital_land.command_arguments import (
@@ -825,3 +826,24 @@ def check_state_cmd(
     if diffs:
         print(f"State differs from {state_path} - {', '.join(diffs)}")
         sys.exit(1)
+
+
+@cli.command(
+    "load-pipeline-provenance",
+    short_help="load the pipeline provenance data from a sqlite file into a database.",
+)
+@click.option(
+    "--sqlite-path",
+    type=click.Path(exists=True),
+    help="file path to a sqlite database containing the dataset provenance data",
+)
+@click.option(
+    "--database-url",
+    type=click.STRING,
+    help="the database connection url to load data into, schema is assumed to exist",
+)
+def load_pipeline_provenance_cli(sqlite_path, database_url):
+    """
+    A command line interface to load the pipeline provenance data from a sqlite file into a database.
+    """
+    load_pipeline_provenance(sqlite_path, database_url)
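+
+
+# Example invocation (illustrative; the path and connection URL below are placeholders):
+#   digital-land load-pipeline-provenance \
+#       --sqlite-path dataset/conservation-area.sqlite3 \
+#       --database-url postgresql://user:password@localhost:5432/platform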
diff --git a/digital_land/commands.py b/digital_land/commands.py
index f88559c48..65c14702f 100644
--- a/digital_land/commands.py
+++ b/digital_land/commands.py
@@ -31,6 +31,7 @@
 from digital_land.package.dataset import DatasetPackage
 from digital_land.package.dataset_parquet import DatasetParquetPackage
+from digital_land.package.platform import PlatformPackage
 from digital_land.phase.combine import FactCombinePhase
 from digital_land.phase.concat import ConcatFieldPhase
 from digital_land.phase.convert import ConvertPhase, execute
@@ -79,6 +80,7 @@
 from .register import hash_value
 from .utils.gdal_utils import get_gdal_version
+from .utils.postgres_utils import get_pg_connection
 
 
 logger = logging.getLogger(__name__)
@@ -440,15 +442,9 @@ def dataset_create(
         print("missing output path", file=sys.stderr)
         sys.exit(2)
 
-    # Set up initial objects
-    organisation = Organisation(
-        organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
-    )
-
     # create sqlite dataset packageas before and load inn data that isn't in the parquetpackage yet
     package = DatasetPackage(
         dataset,
-        organisation=organisation,
         path=output_path,
         specification_dir=None,  # TBD: package should use this specification object
     )
@@ -557,7 +553,6 @@ def dataset_update(
     if not dataset_path:
         package = DatasetPackage(
             dataset,
-            organisation=organisation,
            path=output_path,
             specification_dir=None,  # TBD: package should use this specification object
         )
@@ -572,7 +567,6 @@
         logging.info(f"Reading from local dataset file {dataset_path}")
         package = DatasetPackage(
             dataset,
-            organisation=organisation,
             path=dataset_path,
             specification_dir=None,  # TBD: package should use this specification object
         )
@@ -585,7 +579,7 @@
     package.load_transformed(path)
     package.load_column_fields(column_field_dir / dataset / path_obj.name)
    package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
-    package.load_entities()
+    package.load_entities(organisations=organisation.organisation)
 
     old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
     if os.path.exists(old_entity_path):
@@ -1723,3 +1717,48 @@ def check_and_assign_entities(
     ):
         return False
     return True
+
+
+def load_pipeline_provenance(sqlite_path: Path, database_url, specification_path=None):
+    """
+    Load dataset provenance from the sqlite dataset package into the platform,
+    updating values based on what's coming in.
+
+    :param sqlite_path: Path to the sqlite database file which contains the provenance data for the dataset
+    :param database_url: URL of the postgis database.
+    """
+    # TODO link to spec, how do we do this? what should it do? why?
+    # download spec if needed
+    # spec = Specification(path=specification_dir,load_on_init=False)
+    # spec.download(overwrite=False)
+    # spec.load()
+    sqlite_path = Path(sqlite_path)  # the CLI passes the path in as a string
+    fact_csv_path = sqlite_path.parent / "fact.csv"
+    fact_resource_csv_path = sqlite_path.parent / "fact_resource.csv"
+    issue_csv_path = sqlite_path.parent / "issue.csv"
+    entity_csv_path = sqlite_path.parent / "entity.csv"
+
+    conn = get_pg_connection(database_url)
+
+    # extract CSVs from the sqlite database
+    dataset_package = DatasetPackage(
+        path=sqlite_path, dataset="test", specification_dir=specification_path
+    )
+
+    dataset_package.export_table_to_csv(fact_resource_csv_path, "fact_resource")
+    dataset_package.export_table_to_csv(fact_csv_path, "fact")
+    dataset_package.export_table_to_csv(issue_csv_path, "issue")
+
+    # entities contained in the fact
+    dataset_package.export_fact_entities_to_csv(entity_csv_path)
+
+    # establish platform package
+    platform = PlatformPackage(database_url=database_url)
+    # load the actual data
+    platform.update_fact_resource(fact_resource_csv_path, conn)
+    platform.update_fact(fact_csv_path, conn)
+    platform.update_entity(entity_csv=entity_csv_path, conn=conn)
+
+    # load other provenance tables
+    platform.update_issues(issue_csv_path, conn)
+
+    # TODO expand with other provenance produced by the pipeline phases
+    conn.commit()
+    conn.close()
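+
+
+# Example (illustrative; the paths and connection URL are placeholders):
+#   load_pipeline_provenance(
+#       sqlite_path=Path("dataset/conservation-area.sqlite3"),
+#       database_url="postgresql://user:password@localhost:5432/platform",
+#       specification_path="specification/",
+#   )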
diff --git a/digital_land/package/dataset.py b/digital_land/package/dataset.py
index 4956bb5ee..db9db58b0 100644
--- a/digital_land/package/dataset.py
+++ b/digital_land/package/dataset.py
@@ -1,6 +1,7 @@
 import csv
 import json
 import logging
+import sqlite3
 from decimal import Decimal
 
 import shapely.wkt
@@ -32,13 +33,12 @@
 class DatasetPackage(SqlitePackage):
-    def __init__(self, dataset, organisation, **kwargs):
+    def __init__(self, dataset, **kwargs):
         super().__init__(dataset, tables=tables, indexes=indexes, **kwargs)
         self.dataset = dataset
         self.entity_fields = self.specification.schema["entity"]["fields"]
-        self.organisations = organisation.organisation
 
-    def migrate_entity(self, row):
+    def migrate_entity(self, row, organisations):
         dataset = self.dataset
         entity = row.get("entity", "")
@@ -55,7 +55,7 @@ def migrate_entity(self, row):
 
         # hack until FactReference is reliable ..
         if not row.get("organisation-entity", ""):
-            row["organisation-entity"] = self.organisations.get(
+            row["organisation-entity"] = organisations.get(
                 row.get("organisation", ""), {}
             ).get("entity", "")
 
@@ -118,9 +118,9 @@ def entity_row(self, facts):
                 row[fact[1]] = fact[2]
         return row
 
-    def insert_entity(self, facts):
+    def insert_entity(self, facts, organisations):
         row = self.entity_row(facts)
-        row = self.migrate_entity(row)
+        row = self.migrate_entity(row, organisations=organisations)
         if row:
             self.insert("entity", self.entity_fields, row)
@@ -147,7 +147,7 @@ def load_old_entities(self, path):
         self.commit()
         self.disconnect()
 
-    def load_entities(self):
+    def load_entities(self, organisations):
         """load the entity table from the fact table"""
         self.connect()
         self.create_cursor()
@@ -163,13 +163,13 @@
             # If facts and fact does not point to same entity as first fact
             if facts and fact[0] != facts[0][0]:
                 # Insert existing facts
-                self.insert_entity(facts)
+                self.insert_entity(facts, organisations)
                 # Reset facts list for new entity
                 facts = []
             facts.append(fact)
 
         if facts:
-            self.insert_entity(facts)
+            self.insert_entity(facts, organisations)
 
         self.commit()
         self.disconnect()
@@ -289,5 +289,31 @@ def load_transformed(self, path):
         self.commit()
         self.disconnect()
 
+    def export_fact_entities_to_csv(self, output_path):
+        """
+        Function to export a distinct list of entities from the fact table to a CSV file.
+        """
+        # connect directly to the sqlite file rather than via the package connection
+        conn = sqlite3.connect(self.path)
+        cursor = conn.cursor()
+
+        with open(output_path, "w", newline="") as f:
+            writer = csv.writer(f, delimiter="|")
+
+            # Execute the query
+            cursor.execute("SELECT DISTINCT entity FROM fact")
+
+            # Write header
+            writer.writerow([desc[0] for desc in cursor.description])
+
+            # Stream rows in chunks
+            while True:
+                rows = cursor.fetchmany(1000)  # adjust chunk size as needed
+                if not rows:
+                    break
+                writer.writerows(rows)
+
+        conn.close()
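+        # Example (illustrative): the exported file is a pipe-delimited CSV with a
+        # single "entity" column, which is what PlatformPackage.update_entity expects,
+        # e.g. package.export_fact_entities_to_csv("entity.csv")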
+
 
     def load(self):
         pass
diff --git a/digital_land/package/package.py b/digital_land/package/package.py
index adcf7c5a8..54adc2544 100644
--- a/digital_land/package/package.py
+++ b/digital_land/package/package.py
@@ -50,6 +50,10 @@ def load(self):
 
 
 class Package:
+    """
+    A base package holding the values that are needed by any package
+    """
+
     def __init__(
         self,
         datapackage,
diff --git a/digital_land/package/platform.py b/digital_land/package/platform.py
new file mode 100644
index 000000000..b9e869da4
--- /dev/null
+++ b/digital_land/package/platform.py
@@ -0,0 +1,487 @@
+import uuid
+import csv
+
+from pathlib import Path
+
+from .postgres import PostgresPackage
+
+
+class PlatformPackage(PostgresPackage):
+    """
+    A class representing the postgis database behind our platform, primarily used for loading data into it.
+
+    This class inherits from PostgresPackage and handles the specific requirements of the data in the platform,
+    including how data can be absorbed and loaded from other sources.
+    """
+
+    def __init__(
+        self,
+        database_url=None,
+        conn=None,
+    ):
+        self.database_url = database_url
+        self.conn = conn
+
+    @staticmethod
+    def _reorder_field_types(csv_path: Path, field_types: dict) -> dict:
+        """
+        Helper function to reorder the field types based on the headers from the csv.
+        Validates that the correct fields are present as well.
+        """
+        with open(csv_path, newline="") as csvfile:
+            reader = csv.DictReader(csvfile, delimiter="|")
+            header = reader.fieldnames
+
+        # Step 2: Check if headers match the dict keys
+        csv_keys_set = set(header)
+        dict_keys_set = set(field_types.keys())
+
+        if csv_keys_set != dict_keys_set:
+            missing_in_csv = dict_keys_set - csv_keys_set
+            extra_in_csv = csv_keys_set - dict_keys_set
+            raise ValueError(
+                f"CSV headers do not match expected fields.\n"
+                f"Missing: {missing_in_csv}\n"
+                f"Extra: {extra_in_csv}"
+            )
+
+        # Step 3: Reorder dict based on CSV header
+        ordered_field_types = {key: field_types[key] for key in header}
+        return ordered_field_types
+
+    def update_fact_resource(self, input_file: Path, conn=None):
+        """
+        Function to update the fact_resource table from a csv
+        """
+
+        if not conn:
+            conn = self._get_conn()
+
+        temp_table = f"temp_import_{uuid.uuid4().hex}"
+        # load csv into a temporary table in postgis
+        # remove rows from fact_resource where the resource is in the new data
+        # add the rows to the fact_resource table
+        # remove the temporary table
+        fact_resource_fields = {
+            "fact": "VARCHAR(64)",
+            "resource": "VARCHAR(64)",
+            "entry_date": "DATE",
+            "entry_number": "INTEGER",
+            "priority": "INTEGER",
+            "reference_entity": "BIGINT",
+            "dataset": "VARCHAR(64)",
+        }
+
+        non_key_fields = {
+            "reference_entity",
+        }
+
+        fact_resource_fields = self._reorder_field_types(
+            field_types=fact_resource_fields, csv_path=input_file
+        )
+
+        with conn.cursor() as cur:
+            # need to get the list of tables from the spec, so the specification is needed
+            # 1. Create TEMPORARY TABLE
+            cur.execute(
+                f"""
+                CREATE TEMPORARY TABLE {temp_table} (
+                    {','.join([f'{k} {v}' for k, v in fact_resource_fields.items()])}
+                ) ON COMMIT DROP;
+                """
+            )
+
+            # 2. Use COPY to load data from CSV (very fast, streaming)
+            with open(input_file, "r") as f:
+                cur.copy_expert(
+                    f"""
+                    COPY {temp_table} ({','.join([f'{k}' for k in fact_resource_fields.keys()])})
+                    FROM STDIN WITH (FORMAT csv, HEADER, DELIMITER '|', FORCE_NULL(
+                        {",".join(non_key_fields)}
+                    ));
+                    """,
+                    f,
+                )
+
+            # 3. Delete matching rows from main table
+            cur.execute(
+                f"""
+                DELETE FROM fact_resource
+                WHERE resource IN (SELECT resource FROM {temp_table});
+                """
+            )
+
+            # 4. Insert new rows from temp table into main table
+            cur.execute(
+                f"""
+                INSERT INTO fact_resource ({','.join([f'{k}' for k in fact_resource_fields.keys()])})
+                SELECT {','.join([f'{k}' for k in fact_resource_fields.keys()])} FROM {temp_table};
+                """
+            )
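+
+            # Note (illustrative): the incoming CSV is pipe-delimited with a header
+            # row matching fact_resource_fields, e.g.
+            #   fact|resource|entry_date|entry_number|priority|reference_entity|dataset
+            # the column order is re-read from the file by _reorder_field_types above.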
+
+    def update_fact(self, input_file: Path, conn=None):
+        """
+        Method to update the fact table. It's worth noting that the fact_resource
+        table should be updated ahead of this.
+        """
+
+        if not conn:
+            conn = self._get_conn()
+
+        temp_table = f"temp_import_{uuid.uuid4().hex}"
+        # load csv into a temporary table in postgis
+        # upsert the rows into the fact table, keeping the highest-precedence entry per fact
+        # remove facts that no longer appear in fact_resource
+        # the temporary table is dropped on commit
+        # TODO how can the schema be flexible
+        field_types = {
+            "fact": "VARCHAR(64)",
+            "entity": "BIGINT",
+            "field": "VARCHAR(64)",
+            "value": "TEXT",
+            "resource": "VARCHAR(64)",
+            "entry_date": "DATE",
+            "entry_number": "INTEGER",
+            "priority": "INTEGER",
+            "reference_entity": "BIGINT",
+            "dataset": "VARCHAR(64)",
+        }
+
+        non_key_fields = {
+            "reference_entity",
+        }
+
+        # make sure the order of the field types matches the csv
+        field_types = self._reorder_field_types(
+            field_types=field_types, csv_path=input_file
+        )
+
+        with conn.cursor() as cur:
+            # 1. Create TEMPORARY TABLE
+            cur.execute(
+                f"""
+                CREATE TEMPORARY TABLE {temp_table} (
+                    {','.join([f'{k} {v}' for k, v in field_types.items()])}
+                ) ON COMMIT DROP;
+                """
+            )
+
+            # 2. Use COPY to load data from CSV (very fast, streaming)
+            with open(input_file, "r") as f:
+                cur.copy_expert(
+                    f"""
+                    COPY {temp_table} ({','.join([f'{k}' for k in field_types.keys()])})
+                    FROM STDIN WITH (FORMAT csv, HEADER, DELIMITER '|', FORCE_NULL(
+                        {",".join(non_key_fields)}
+                    ));
+                    """,
+                    f,
+                )
+
+            # could replace the 3rd condition with the resource entry-date
+            cur.execute(
+                f"""
+                INSERT INTO fact (fact, entity, field, value, entry_number, resource, entry_date, priority, reference_entity, dataset)
+                SELECT fact, entity, field, value, entry_number, resource, entry_date, priority, reference_entity, dataset
+                FROM {temp_table}
+                ON CONFLICT (fact) DO UPDATE
+                SET
+                    entry_number = EXCLUDED.entry_number,
+                    resource = EXCLUDED.resource,
+                    entry_date = EXCLUDED.entry_date,
+                    priority = EXCLUDED.priority,
+                    reference_entity = EXCLUDED.reference_entity,
+                    dataset = EXCLUDED.dataset
+                WHERE EXCLUDED.priority > fact.priority
+                    OR (EXCLUDED.priority = fact.priority AND EXCLUDED.entry_date > fact.entry_date)
+                    OR (EXCLUDED.priority = fact.priority AND EXCLUDED.entry_date = fact.entry_date AND EXCLUDED.resource > fact.resource)
+                    OR (
+                        EXCLUDED.priority = fact.priority
+                        AND EXCLUDED.entry_date = fact.entry_date
+                        AND EXCLUDED.resource = fact.resource
+                        AND EXCLUDED.entry_number > fact.entry_number
+                    )
+                """
+            )
+
+            # remove facts not in fact_resource
+            cur.execute(
+                """
+                DELETE FROM fact
+                WHERE fact NOT IN (SELECT fact FROM fact_resource);
+                """
+            )
+
+            # update facts where the fact/resource combo isn't in fact_resource
+            cur.execute(
+                """
+                WITH update_facts AS (
+                    SELECT f.fact
+                    FROM fact f
+                    WHERE NOT EXISTS (
+                        SELECT 1
+                        FROM fact_resource fr
+                        WHERE fr.fact = f.fact AND fr.resource = f.resource
+                    )
+                )
+                INSERT INTO fact (fact, entity, field, value, reference_entity, entry_number, resource, entry_date, priority, dataset)
+                SELECT t1.fact, t1.entity, t1.field, t1.value, t1.reference_entity, t1.entry_number, t1.resource, t1.entry_date, t1.priority, t1.dataset
+                FROM (
+                    SELECT DISTINCT ON (fr.fact)
+                        fr.fact,
+                        fr.entry_number,
+                        fr.resource,
+                        fr.entry_date,
+                        fr.priority,
+                        f.reference_entity,
+                        f.entity,
+                        f.field,
+                        f.value,
+                        f.dataset
+                    FROM fact_resource fr
+                    JOIN fact f on f.fact = fr.fact
+                    ORDER BY fr.fact, fr.priority, fr.entry_date DESC, fr.resource, fr.entry_number DESC
+                ) AS t1
+                ON CONFLICT (fact) DO UPDATE
+                SET
+                    entry_number = EXCLUDED.entry_number,
+                    resource = EXCLUDED.resource,
+                    entry_date = EXCLUDED.entry_date,
+                    priority = EXCLUDED.priority,
+                    reference_entity = EXCLUDED.reference_entity,
+                    dataset = EXCLUDED.dataset
+                """
+            )
+
+    def update_entity(self, entity_csv, conn):
+        """
+        Function to update the entities from the fact and fact_resource tables,
+        after those tables have been updated. Accepts a csv of entities
+        which need updating (might want to rework this in the future)
+        """
+        if not conn:
+            conn = self._get_conn()
+
+        temp_table = f"temp_entity_{uuid.uuid4().hex}"
+        # load the csv of entities into a temporary table in postgis
+        # rebuild the listed entities from their latest facts
+        # remove entities that no longer have any facts
+        # the temporary table is dropped on commit
+        # TODO how can the schema be flexible
+        # field_types = {
+        #     "entity": "BIGINT",
+        #     "field": "VARCHAR(64)",
+        #     "value": "TEXT",
+        #     "dataset": "VARCHAR(64)",
+        #     "entry_date": "DATE",
+        #     "entry_number": "INTEGER",
+        #     "priority": "INTEGER",
+        #     "reference_entity": "BIGINT"
+        # }
+
+        # field_types = {
+        #     "entity": "BIGINT (primary key, autoincrement=False)",
+        #     "name": "Text (nullable=True)",
+        #     "entry_date": "Date (nullable=True)",
+        #     "start_date": "Date (nullable=True)",
+        #     "end_date": "Date (nullable=True)",
+        #     "dataset": "Text (nullable=True)",
+        #     "json": "JSONB (nullable=True)",
+        #     "organisation_entity": "BIGINT (nullable=True)",
+        #     "prefix": "Text (nullable=True)",
+        #     "reference": "Text (nullable=True)",
+        #     "typology": "Text (nullable=True)",
+        #     "geometry": "Geometry(MULTIPOLYGON, SRID=4326, nullable=True)",
+        #     "point": "Geometry(POINT, SRID=4326, nullable=True)",
+        #     "geojson_col": "JSONB (nullable=True, column name = 'geojson')",
+        # }
+
+        # non_key_fields = {
+        #     "reference_entity",
+        # }
+
+        with conn.cursor() as cur:
+            # I think we're over-complicating it with PIVOT functions:
+            # we are performing a distinct set of calculations on each entity group
+            # to extract information from one column;
+            # pivoting is spilling everything for no reason
+
+            # 1. Create TEMPORARY TABLE
+            cur.execute(
+                f"""
+                CREATE TEMPORARY TABLE {temp_table} (
+                    entity BIGINT PRIMARY KEY
+                ) ON COMMIT DROP;
+                """
+            )
+
+            with open(entity_csv, "r") as f:
+                cur.copy_expert(
+                    f"""
+                    COPY {temp_table} (entity)
+                    FROM STDIN WITH (FORMAT csv, HEADER, DELIMITER '|');
+                    """,
+                    f,
+                )
+
+            # create distinct and pivot
+            # case_fields = [
+            #     "name",
+            #     "start_date",
+            #     "end_date",
+            # ]
+            cur.execute(
+                f"""
+                WITH latest_fact AS (
+                    SELECT DISTINCT ON (fact.entity, field)
+                        fact.*
+                    FROM fact
+                    JOIN {temp_table} temp on temp.entity = fact.entity
+                    ORDER BY fact.entity, field, priority, entry_date DESC, resource ASC, entry_number DESC
+                ),
+                pre_entity AS (
+                    SELECT entity,
+                        MAX(CASE WHEN field = 'name' THEN value END) AS name,
+                        MAX(CASE WHEN field = 'start_date' THEN value END) AS start_date,
+                        MAX(CASE WHEN field = 'end_date' THEN value END) AS end_date,
+                        MAX(CASE WHEN field = 'entry_date' THEN value END) AS entry_date,
+                        MAX(dataset) AS dataset,
+                        MAX(CASE WHEN field = 'organisation' THEN value END) AS organisation,
+                        MAX(CASE WHEN field = 'prefix' THEN value END) AS prefix,
+                        MAX(CASE WHEN field = 'reference' THEN value END) AS reference,
+                        ST_SetSRID(
+                            ST_GeomFromText(
+                                MAX(CASE WHEN field = 'geometry' THEN value END)
+                            ),
+                            4326
+                        ) AS geometry,
+                        ST_SetSRID(
+                            ST_GeomFromText(
+                                MAX(CASE WHEN field = 'point' THEN value END)
+                            ),
+                            4326
+                        ) AS point,
+                        NULLIF(
+                            jsonb_object_agg(field, value) FILTER (
+                                WHERE field NOT IN ('name','start_date','end_date','dataset', 'reference', 'geometry')
+                            ),
+                            '{{}}'::jsonb
+                        ) AS json
+                    FROM latest_fact lf
+                    GROUP BY entity
+                )
+                INSERT INTO entity (entity, start_date, end_date, entry_date, name, organisation_entity, dataset, prefix, reference, geometry, point, json)
+                SELECT t1.entity,
+                    t1.start_date,
+                    t1.end_date,
+                    t1.entry_date,
+                    t1.name,
+                    t2.entity as organisation_entity,
+                    t1.dataset,
+                    t1.prefix,
+                    t1.reference,
+                    t1.geometry,
+                    t1.point,
+                    t1.json
+                FROM pre_entity t1
+                LEFT JOIN organisation t2 on t2.organisation = t1.organisation
+                ON CONFLICT (entity) DO UPDATE
+                SET
+                    start_date = EXCLUDED.start_date,
+                    end_date = EXCLUDED.end_date,
+                    entry_date = EXCLUDED.entry_date,
+                    name = EXCLUDED.name,
+                    organisation_entity = EXCLUDED.organisation_entity,
+                    dataset = EXCLUDED.dataset,
+                    prefix = EXCLUDED.prefix,
+                    reference = EXCLUDED.reference,
+                    geometry = EXCLUDED.geometry,
+                    point = EXCLUDED.point,
+                    json = EXCLUDED.json
+                """
+            )
+
+            # remove any entities not in fact
+            cur.execute(
+                """
+                DELETE FROM entity
+                WHERE entity NOT IN (SELECT DISTINCT entity FROM fact);
+                """
+            )
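+
+            # Note (descriptive, mirrors the ORDER BY above): for each (entity, field)
+            # pair the "latest" fact is chosen by priority, then most recent entry_date,
+            # then resource, then highest entry_number; fields other than
+            # name/start_date/end_date/dataset/reference/geometry are folded into json.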
+
+    def update_issues(self, input_file, conn):
+        """
+        Function to update the issues table from a csv
+
+        As this is updating, not loading, it will remove issues for any resources included in the update
+        """
+
+        if not conn:
+            conn = self._get_conn()
+
+        temp_table = f"temp_import_issue_{uuid.uuid4().hex}"
+        # load csv into a temporary table in postgis
+        # remove rows from the issue table where the resource is in the new data
+        # add the rows to the issue table
+        # the temporary table is dropped on commit
+        field_types = {
+            "entity": "BIGINT",
+            "entry_date": "DATE",
+            "entry_number": "INTEGER",
+            "field": "VARCHAR(64)",
+            "issue_type": "VARCHAR(64)",
+            "line_number": "INTEGER",
+            "dataset": "VARCHAR(64)",  # also has a foreign key
+            "resource": "VARCHAR(64)",
+            "value": "TEXT",
+            "message": "TEXT",
+        }
+
+        non_key_fields = {
+            "entity",
+            "entry_date",
+            "value",
+            "message",
+        }
+
+        field_types = self._reorder_field_types(
+            field_types=field_types, csv_path=input_file
+        )
+
+        with conn.cursor() as cur:
+            # need to get the list of tables from the spec, so the specification is needed
+            # 1. Create TEMPORARY TABLE
+            cur.execute(
+                f"""
+                CREATE TEMPORARY TABLE {temp_table} (
+                    {','.join([f'{k} {v}' for k, v in field_types.items()])}
+                ) ON COMMIT DROP;
+                """
+            )
+
+            # 2. Use COPY to load data from CSV (very fast, streaming)
+            with open(input_file, "r") as f:
+                cur.copy_expert(
+                    f"""
+                    COPY {temp_table} ({','.join([f'{k}' for k in field_types.keys()])})
+                    FROM STDIN WITH (FORMAT csv, HEADER, DELIMITER '|', FORCE_NULL(
+                        {",".join(non_key_fields)}
+                    ));
+                    """,
+                    f,
+                )
+
+            # 3. Delete matching rows from main table
+            cur.execute(
+                f"""
+                DELETE FROM issue
+                WHERE resource IN (SELECT resource FROM {temp_table});
+                """
+            )
+
+            # 4. Insert new rows from temp table into main table
+            cur.execute(
+                f"""
+                INSERT INTO issue ({','.join([f'{k}' for k in field_types.keys()])})
+                SELECT {','.join([f'{k}' for k in field_types.keys()])} FROM {temp_table};
+                """
+            )
diff --git a/digital_land/package/postgis.py b/digital_land/package/postgis.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/digital_land/package/postgres.py b/digital_land/package/postgres.py
new file mode 100644
index 000000000..9a5119a94
--- /dev/null
+++ b/digital_land/package/postgres.py
@@ -0,0 +1,43 @@
+import logging
+
+
+from .package import Package
+
+
+from digital_land.utils.postgres_utils import get_pg_connection
+
+
+logger = logging.getLogger(__name__)
+
+
+class PostgresPackage(Package):
+    """
+    A package class for data that is contained in a postgresql database.
+    """
+
+    def __init__(self, database_url=None, conn=None):
+        # TODO alter init to use parent properties but need to figure out what to do about the spec
+        self.conn = conn
+        self.database_url = database_url
+
+    def connect(self):
+        """create a connection to the postgres database"""
+
+        if self.conn:
+            logger.info("Connection already exists, skipping creation.")
+            return
+
+        if not self.database_url:
+            raise ValueError(
+                "Database URL must be set in order to connect to the database. Please set the 'database_url' attribute."
+            )
+
+        self.conn = get_pg_connection(self.database_url)
+
+    def _get_conn(self):
+        """get the connection to the postgres database"""
+        if not self.conn:
+            raise ValueError(
+                "Connection not established. Please call connect() first or provide a connection"
+            )
+        return self.conn
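+
+
+# Example usage (illustrative; the connection URL is a placeholder):
+#   package = PostgresPackage(database_url="postgresql://user:password@localhost:5432/platform")
+#   package.connect()
+#   conn = package._get_conn()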
diff --git a/digital_land/package/sqlite.py b/digital_land/package/sqlite.py
index c9ad38669..f739a9231 100644
--- a/digital_land/package/sqlite.py
+++ b/digital_land/package/sqlite.py
@@ -343,3 +343,29 @@ def load_from_s3(self, bucket_name, object_key, table_name):
         self.load()
         # self.create_indexes()# Do we need this?
         self.disconnect()
+
+    def export_table_to_csv(self, output_file, table_name):
+        """
+        Function to export a table from the sqlite file to a pipe-delimited CSV
+        """
+        # connect directly to the sqlite file rather than via the package connection
+        conn = sqlite3.connect(self.path)
+        cursor = conn.cursor()
+
+        with open(output_file, "w", newline="") as f:
+            writer = csv.writer(f, delimiter="|")
+
+            # Execute the query
+            cursor.execute(f"SELECT * FROM {table_name}")
+
+            # Write header
+            writer.writerow([desc[0] for desc in cursor.description])
+
+            # Stream rows in chunks
+            while True:
+                rows = cursor.fetchmany(1000)  # adjust chunk size as needed
+                if not rows:
+                    break
+                writer.writerows(rows)
+
+        conn.close()
diff --git a/digital_land/utils/postgres_utils.py b/digital_land/utils/postgres_utils.py
new file mode 100644
index 000000000..a66151191
--- /dev/null
+++ b/digital_land/utils/postgres_utils.py
@@ -0,0 +1,32 @@
+import psycopg2
+import pandas as pd
+import urllib.parse as urlparse
+
+
+def get_pg_connection(database_url, autocommit=False):
+    url = urlparse.urlparse(database_url)
+    database = url.path[1:]
+    user = url.username
+    password = url.password
+    host = url.hostname
+    port = url.port
+    connection = psycopg2.connect(
+        host=host, database=database, user=user, password=password, port=port
+    )
+
+    connection.autocommit = autocommit
+
+    return connection
+
+
+def get_df(table_name, conn):
+    """
+    helper function to get a dataframe from a table
+    """
+    cur = conn.cursor()
+    cur.execute(f"SELECT * FROM {table_name}")
+    rows = cur.fetchall()
+    columns = [desc[0] for desc in cur.description]
+    df = pd.DataFrame(rows, columns=columns)
+    cur.close()
+    return df
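+
+
+# Example usage (illustrative; the connection URL is a placeholder):
+#   conn = get_pg_connection("postgresql://user:password@localhost:5432/platform")
+#   entity_df = get_df("entity", conn)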
diff --git a/setup.py b/setup.py
index a051b7356..68a009a8e 100644
--- a/setup.py
+++ b/setup.py
@@ -61,6 +61,7 @@ def get_long_description():
         "boto3",
         "moto",
        "psutil",
+        "psycopg2",
    ],
    entry_points={"console_scripts": ["digital-land=digital_land.cli:cli"]},
    setup_requires=["pytest-runner"],
@@ -79,6 +80,8 @@
            "sphinx",
            "sphinx-autobuild",
            "sphinx_rtd_theme",
+            "testcontainers",
+            "SQLAlchemy-Utils",
        ]
        + maybe_black
    },
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
new file mode 100644
index 000000000..217ab5d57
--- /dev/null
+++ b/tests/integration/conftest.py
@@ -0,0 +1,300 @@
+import pytest
+
+from testcontainers.postgres import PostgresContainer
+from testcontainers.core.waiting_utils import wait_container_is_ready
+from sqlalchemy_utils import database_exists, create_database, drop_database
+
+from digital_land.utils.postgres_utils import get_pg_connection
+
+# class PostgisContainer(PostgresContainer):
+#     def __init__(self, image="postgis/postgis:15-3.3"):
+#         super().__init__(image=image)
+
+postgis_container = PostgresContainer(image="postgis/postgis:15-3.3")
+
+
+@pytest.fixture(scope="session")
+def postgres(request):
+    postgis_container.start()
+
+    def close_postgres():
+        postgis_container.stop()
+
+    request.addfinalizer(close_postgres)
+    wait_container_is_ready(postgis_container)
+    return postgis_container.get_connection_url()
+
+
+def create_no_fk_schema(conn):
+    """
+    Create the schema for the database without foreign keys
+    """
+    cursor = conn.cursor()
+    # Create the table and load the data
+    cursor.execute("CREATE EXTENSION IF NOT EXISTS postgis")
+    cursor.execute(
+        """
+        CREATE TABLE entity (
+            entity bigint PRIMARY KEY,
+            name varchar,
+            entry_date varchar,
+            start_date varchar,
+            end_date varchar,
+            dataset varchar,
+            json varchar,
+            organisation_entity varchar,
+            prefix varchar,
+            reference varchar,
+            typology varchar,
+            geojson varchar,
+            geometry geometry null,
+            point varchar
+        )"""
+    )
+    cursor.execute(
+        """
+        CREATE TABLE old_entity (
+            old_entity bigint,
+            entry_date varchar,
+            start_date varchar,
+            end_date varchar,
+            dataset varchar,
+            notes varchar,
+            status varchar,
+            entity varchar null
+        )"""
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE dataset (
+            dataset varchar PRIMARY KEY,
+            name varchar,
+            entry_date varchar,
+            start_date varchar,
+            end_date varchar,
+            collection varchar,
+            description varchar,
+            key_field varchar,
+            paint_options varchar,
+            plural varchar,
+            prefix varchar,
+            text varchar,
+            typology varchar,
+            wikidata varchar,
+            wikipedia varchar,
+            themes varchar,
+            attribution_id varchar,
+            licence_id varchar,
+            consideration varchar,
+            github_discussion varchar,
+            entity_minimum varchar,
+            entity_maximum varchar,
+            phase varchar,
+            realm varchar,
+            version varchar,
+            replacement_dataset varchar
+        )"""
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE typology (
+            typology varchar,
+            name varchar,
+            description varchar,
+            entry_date varchar,
+            start_date varchar,
+            end_date varchar,
+            plural varchar,
+            text varchar,
+            wikidata varchar,
+            wikipedia varchar
+        )"""
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE organisation (
+            organisation varchar,
+            name varchar,
+            combined_authority varchar,
+            entry_date varchar,
+            start_date varchar,
+            end_date varchar,
+            entity varchar,
+            local_authority_type varchar,
+            official_name varchar,
+            region varchar,
+            statistical_geography varchar,
+            website varchar
+        )"""
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE dataset_collection (
+            dataset_collection varchar,
+            resource varchar,
+            resource_end_date varchar,
+            resource_entry_date varchar,
+            last_updated varchar,
+            last_collection_attempt varchar
+        )"""
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE dataset_publication (
+            dataset_publication varchar,
+            expected_publisher_count varchar,
+            publisher_count varchar
+        )"""
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE lookup (
+            id varchar,
+            entity varchar,
+            prefix varchar,
+            reference varchar,
+            entry_date varchar,
+            start_date varchar,
+            value varchar
+        )"""
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE attribution (
+            attribution varchar,
+            text varchar,
+            entry_date varchar,
+            start_date varchar,
+            end_date varchar
+        )"""
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE licence (
+            licence varchar,
+            text varchar,
+            entry_date varchar,
+            start_date varchar,
+            end_date varchar
+        )"""
+    )
+    cursor.execute(
+        """
+        CREATE TABLE fact_resource (
+            rowid BIGSERIAL PRIMARY KEY,
+            fact VARCHAR(64) NOT NULL,
+            resource VARCHAR(64) NOT NULL,
+            entry_date DATE NOT NULL,
+            entry_number INTEGER NOT NULL,
+            priority INTEGER NOT NULL,
+            reference_entity BIGINT,
+            dataset VARCHAR(64) NOT NULL
+        );
+        """
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE fact (
+            fact VARCHAR(64) PRIMARY KEY,
+            entity INTEGER NOT NULL,
+            field VARCHAR(64) NOT NULL,
+            value TEXT NOT NULL,
+            entry_date DATE NOT NULL,
+            entry_number INTEGER NOT NULL,
+            resource VARCHAR(64) NOT NULL,
+            priority INTEGER,
+            reference_entity BIGINT,
+            dataset TEXT NOT NULL
+        );
+        """
+    )
+
+    cursor.execute(
+        """
+        CREATE TABLE issue (
+            rowid BIGSERIAL PRIMARY KEY,
+            entity BIGINT,
+            entry_date DATE,
+            entry_number INTEGER NOT NULL,
+            field VARCHAR(64),
+            issue_type VARCHAR(64),
+            line_number INTEGER,
+            dataset VARCHAR(64) NOT NULL,
+            resource VARCHAR(64) NOT NULL,
+            value TEXT,
+            message TEXT
+        )
+        """
+    )
+
+    cursor.close()
+
+
+@pytest.fixture(scope="function")
+def platform_db_function_url(postgres, request):
+    # instead of using the default db, append a db name to the end of the connection string
+    db_conn_url = postgres + "function"
+
+    # create db
+    if database_exists(db_conn_url):
+        drop_database(db_conn_url)
+
+    create_database(db_conn_url)
+    db_conn = get_pg_connection(db_conn_url)
+    create_no_fk_schema(db_conn)
+    db_conn.commit()
+    db_conn.close()
+
+    def close_db():
+        if database_exists(db_conn_url):
+            drop_database(db_conn_url)
+
+    # add finalizer to close db
+    request.addfinalizer(close_db)
+    return db_conn_url
+
+
+@pytest.fixture(scope="session")
+def platform_db_session_url(postgres, request):
+    """
+    Fixture to be used to create a database that lasts the whole session, conn shouldn't be committed in tests
+    """
+    # instead of using the default db, append a db name to the end of the connection string
+    db_conn_url = postgres + "session"
+
+    # create db
+    if database_exists(db_conn_url):
+        drop_database(db_conn_url)
+
+    create_database(db_conn_url)
+    db_conn = get_pg_connection(db_conn_url)
+    create_no_fk_schema(db_conn)
+    db_conn.commit()
+    db_conn.close()
+
+    def close_db():
+        if database_exists(db_conn_url):
+            drop_database(db_conn_url)
+
+    # add finalizer to close db
+    request.addfinalizer(close_db)
+    return db_conn_url
+
+
+@pytest.fixture(scope="function")
+def platfom_db_session_conn(platform_db_session_url):
+    """
+    Fixture to get a connection to the session database that lasts for a function
+    """
+    conn = get_pg_connection(platform_db_session_url)
+    yield conn
+    conn.rollback()
+    conn.close()
diff --git a/tests/integration/package/test_dataset.py b/tests/integration/package/test_dataset.py
index 2dd519e52..b2de66ebf 100644
--- a/tests/integration/package/test_dataset.py
+++ b/tests/integration/package/test_dataset.py
@@ -3,12 +3,10 @@
 import os
 import urllib.request
 import pandas as pd
-from pathlib import Path
 import sqlite3
 
 from digital_land.package.dataset import DatasetPackage
 from digital_land.package.package import Specification
-from digital_land.organisation import Organisation
 
 
 @pytest.fixture
@@ -115,9 +113,6 @@ def test_load_old_entities_entities_outside_of_range_are_removed(tmp_path):
     }
     specification = Specification(schema=schema, field=field)
-    organisation = Organisation(
-        organisation_path=None, pipeline_dir=None, organisation={}
-    )
 
     # write data to csv as we only seem to load from csv
 
     data = [
@@ -153,7 +148,6 @@
     # create class on sqlite db with old_entity table in it
     package = DatasetPackage(
         "conservation-area",
-        organisation=organisation,
         path=sqlite3_path,
         specification=specification,
     )
@@ -179,7 +173,6 @@
 
 def test_entry_date_upsert_uploads_newest_date(
     specification_dir,
-    organisation_csv,
     blank_patch_csv,
     transformed_fact_resources,
     tmp_path,
 ):
     dataset = "conservation-area"
     sqlite3_path = os.path.join(tmp_path, f"{dataset}.sqlite3")
 
-    organisation = Organisation(
-        organisation_path=organisation_csv,
-        pipeline_dir=Path(os.path.dirname(blank_patch_csv)),
-    )
     package = DatasetPackage(
         "conservation-area",
-        organisation=organisation,
         path=sqlite3_path,
         specification_dir=specification_dir,  # TBD: package should use this specification object
     )
@@ -289,9 +277,6 @@ def test_load_issues_uploads_issues_from_csv(tmp_path):
     }
     specification = Specification(schema=schema, field=field)
-    organisation = Organisation(
-        organisation_path=None, pipeline_dir=None, organisation={}
-    )
 
     # write data to csv as we only seem to load from csv
 
     data = [
@@ -321,7 +306,6 @@
     # create class on sqlite db with old_entity table in it
     package = DatasetPackage(
         "conservation-area",
-        organisation=organisation,
         path=sqlite3_path,
         specification=specification,
     )
@@ -346,7 +330,6 @@
 
 def test_entry_date_upsert_uploads_blank_fields(
     specification_dir,
-    organisation_csv,
     blank_patch_csv,
     transformed_fact_resources_with_blank,
     tmp_path,
 ):
     dataset = "conservation-area"
     sqlite3_path = os.path.join(tmp_path, f"{dataset}.sqlite3")
 
-    organisation = Organisation(
-        organisation_path=organisation_csv,
-        pipeline_dir=Path(os.path.dirname(blank_patch_csv)),
-    )
     package = DatasetPackage(
         "conservation-area",
-        organisation=organisation,
         path=sqlite3_path,
         specification_dir=specification_dir,  # TBD: package should use this specification object
     )
@@ -411,7 +389,6 @@
 
 def test_insert_newest_date(
     specification_dir,
-    organisation_csv,
     blank_patch_csv,
     transformed_fact_resources,
     transformed_fact_resources_with_blank,
     tmp_path,
 ):
     dataset = "conservation-area"
     sqlite3_path = os.path.join(tmp_path, f"{dataset}.sqlite3")
 
-    organisation = Organisation(
-        organisation_path=organisation_csv,
-        pipeline_dir=Path(os.path.dirname(blank_patch_csv)),
-    )
     package = DatasetPackage(
         "conservation-area",
-        organisation=organisation,
         path=sqlite3_path,
         specification_dir=specification_dir,  # TBD: package should use this specification object
     )
diff --git a/tests/integration/package/test_platform.py b/tests/integration/package/test_platform.py
new file mode 100644
index 000000000..222e13556
--- /dev/null
+++ b/tests/integration/package/test_platform.py
@@ -0,0 +1,271 @@
+import pytest
+import pandas as pd
+
+from psycopg2.extras import execute_values
+
+from digital_land.package.platform import PlatformPackage
+from digital_land.utils.postgres_utils import get_df
+
+
+def store_as_csv(data, csv_path):
+    """
+    helper function to store data in a dictionary as a csv
+    """
+    df = pd.DataFrame.from_dict(data)
+    df.to_csv(csv_path, sep="|", index=False)
+
+
+def insert_df(df, table_name, conn):
+    """
+    helper function to insert a dataframe into a table
+    """
+    cur = conn.cursor()
+    execute_values(
+        cur,
+        f"INSERT INTO {table_name} ({','.join(list(df.columns))}) VALUES %s",
+        df.values.tolist(),
+    )
+    cur.close()
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        {
+            "fact": [
+                "dfdbd3153e0e8529c16a623f10c7770ca0e8a267465a22051c162c1fcd5413fc"
+            ],
+            "resource": [
+                "e51e6f8fd1cadb696d5ca229236b84f2ec67afdea6aad03fc81142b04c67adbd"
+            ],
+            "entry_date": ["2025-01-01"],
+            "entry_number": ["2"],
+            "priority": ["1"],
+            "reference_entity": ["122"],
+            "dataset": ["article-4-direction"],
+        }
+    ],
+)
+def test_load_fact_resource_no_data_in_db(data, tmp_path, platfom_db_session_conn):
+    """
+    test function to check the function loading fact_resource data into the db
+    """
+
+    conn = platfom_db_session_conn
+
+    csv_path = tmp_path / "fact_resource.csv"
+    input_df = pd.DataFrame.from_dict(data)
+    store_as_csv(data, csv_path)
+
+    # create platform object
+    platform = PlatformPackage()
+
+    platform.update_fact_resource(input_file=csv_path, conn=conn)
+
+    output_df = get_df("fact_resource", conn)
+    assert len(output_df) > 0
+    assert len(input_df) == len(output_df)
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        {
+            "fact": [
+                "dfdbd3153e0e8529c16a623f10c7770ca0e8a267465a22051c162c1fcd5413fc"
+            ],
+            "resource": [
+                "e51e6f8fd1cadb696d5ca229236b84f2ec67afdea6aad03fc81142b04c67adbd"
+            ],
+            "entry_date": ["2025-01-01"],
+            "entry_number": ["2"],
+            "priority": ["1"],
+            "reference_entity": ["122"],
+            "dataset": ["article-4-direction"],
+        }
+    ],
+)
+def test_load_fact_resource_same_file_twice(data, tmp_path, platfom_db_session_conn):
+    """
+    test function to check the function loading fact_resource data into the db,
+    specifically that loading the exact same data twice overwrites the data
+    """
+
+    conn = platfom_db_session_conn
+
+    csv_path = tmp_path / "fact_resource.csv"
+    input_df = pd.DataFrame.from_dict(data)
+    store_as_csv(data, csv_path)
+
+    # create platform object
+    platform = PlatformPackage()
+
+    platform.update_fact_resource(input_file=csv_path, conn=conn)
+
+    platform.update_fact_resource(input_file=csv_path, conn=conn)
+
+    output_df = get_df("fact_resource", conn)
+
+    assert len(output_df) > 0
+    assert len(input_df) == len(output_df)
+
+
+@pytest.mark.parametrize(
+    "data",
+    [
+        {
+            "fact": [
+                "dfdbd3153e0e8529c16a623f10c7770ca0e8a267465a22051c162c1fcd5413fc"
+            ],
+            "entity": [1],
+            "field": ["name"],
+            "value": ["test"],
+            "resource": [
+                "e51e6f8fd1cadb696d5ca229236b84f2ec67afdea6aad03fc81142b04c67adbd"
+            ],
+            "entry_date": ["2025-01-01"],
+            "entry_number": [3],
+            "priority": [1],
+            "reference_entity": [122],
+            "dataset": ["article-4-direction"],
+        }
+    ],
+)
+def test_load_fact_no_data_in_db(data, tmp_path, platfom_db_session_conn):
+    """
+    test function to check the function loading fact data into the db
+    """
+
+    # need to add a row into fact_resource for the values
+    conn = platfom_db_session_conn
+
+    # fact_resource data
+    fact_resource_df = pd.DataFrame.from_dict(
+        {
+            "fact": [
+                "dfdbd3153e0e8529c16a623f10c7770ca0e8a267465a22051c162c1fcd5413fc"
+            ],
+            "resource": [
+                "e51e6f8fd1cadb696d5ca229236b84f2ec67afdea6aad03fc81142b04c67adbd"
+            ],
+            "entry_date": ["2025-01-01"],
+            "entry_number": ["2"],
+            "priority": ["1"],
+            "reference_entity": ["122"],
+            "dataset": ["article-4-direction"],
+        }
+    )
+
+    csv_path = tmp_path / "fact_resource.csv"
+    store_as_csv(fact_resource_df, csv_path)
+
+    platform = PlatformPackage()
+    platform.update_fact_resource(input_file=csv_path, conn=conn)
+
+    fact_csv_path = tmp_path / "fact.csv"
+    input_df = pd.DataFrame.from_dict(data)
+    store_as_csv(data, fact_csv_path)
+
+    platform.update_fact(input_file=fact_csv_path, conn=conn)
+
+    fact_resource_output_df = get_df("fact_resource", conn)
+    output_df = get_df("fact", conn)
+    assert len(output_df) > 0
+    assert len(input_df) == len(output_df)
+
+    # some fact specific checks we need to make
+    # there should be no facts in the fact table which are not in the fact resource table
+    for fact in output_df["fact"]:
+        assert (
+            fact in fact_resource_output_df["fact"].values
+        ), f"fact {fact} is not in fact resource"
+    # there should be no fact resource combination in fact which isn't in the fact resource table
+    # this is aiming to protect us from facts being removed from resources but still existing in another
+    # get a set of pairs from fact resource
+    existing_pairs = set(
+        zip(fact_resource_output_df["fact"], fact_resource_output_df["resource"])
+    )
+
+    # get a set of pairs from fact
+    fact_pairs = set(zip(output_df["fact"], output_df["resource"]))
+
+    # check all fact pairs are in
+    for pair in fact_pairs:
+        assert (
+            pair in existing_pairs
+        ), f"fact {pair[0]} and resource {pair[1]} is not in fact resource"
+
+    # tests to write:
+    # facts are removed if they are not in the fact resource table
+    # replacing facts from the same resource
+    # removing a resource
+
+
+@pytest.mark.parametrize(
+    "fact_data",
+    [
+        {
+            "fact": [
+                "dfdbd3153e0e8529c16a623f10c7770ca0e8a267465a22051c162c1fcd5413fc"
+            ],
+            "entity": [1],
+            "field": ["name"],
+            "value": ["test"],
+            "resource": [
+                "e51e6f8fd1cadb696d5ca229236b84f2ec67afdea6aad03fc81142b04c67adbd"
+            ],
+            "entry_date": ["2025-01-01"],
+            "entry_number": [3],
+            "priority": [1],
+            "reference_entity": [122],
+            "dataset": ["article-4-direction"],
+        }
+    ],
+)
+def test_update_entity_updates_from_facts(platfom_db_session_conn, fact_data, tmp_path):
+    """
+    test function to check the function updating the entity table in the db from facts
+    """
+
+    conn = platfom_db_session_conn
+
+    # Write fact data straight into the db; this avoids needing anything in the fact_resource table
+    # rather than using update_fact
+    fact_input_df = pd.DataFrame.from_dict(fact_data)
+    insert_df(fact_input_df, "fact", conn)
+
+    entity_df = fact_input_df[["entity"]].drop_duplicates()
+    store_as_csv(entity_df, tmp_path / "entity.csv")
+
+    # auto get the entities from fact imports
+    platform = PlatformPackage()
+    platform.update_entity(conn=conn, entity_csv=tmp_path / "entity.csv")
+
+    entity_output_df = get_df("entity", conn)
+    assert len(entity_output_df) > 0
+    assert len(entity_output_df) == len(set(fact_input_df["entity"]))
+    # check that any field values make it into the final df
+    # test the name is correct
+    for entity in entity_output_df["entity"]:
+        assert entity in list(fact_input_df["entity"])
+
+        row = entity_output_df[entity_output_df["entity"] == entity].iloc[0].to_dict()
+
+        # this can be used for several fields that aren't in json
+        for col in ["name", "entry_date", "start_date", "end_date", "reference"]:
+            col_value = row.get(col)
+            if col_value:
+                # check that the value is in the facts going in, otherwise where's it come from
+                # could expand to other fields as well
+                assert col_value in list(
+                    fact_input_df[
+                        (fact_input_df["entity"] == entity)
+                        & (fact_input_df["field"] == col)
+                    ]["value"]
+                )
+            else:
+                # if the value is null then there should be no facts for that field
+                assert col not in list(
+                    fact_input_df[fact_input_df["entity"] == entity]["field"]
+                )
diff --git a/tests/integration/test_commands.py b/tests/integration/test_commands.py
index 01118495e..d3dd6e3c8 100644
--- a/tests/integration/test_commands.py
+++ b/tests/integration/test_commands.py
@@ -11,11 +11,17 @@
 from moto import mock_aws
 import logging
 
+from digital_land.utils.postgres_utils import get_pg_connection, get_df
+
 from digital_land.package.dataset import DatasetPackage
 from digital_land.package.package import Specification
 from digital_land.organisation import Organisation
 from digital_land.collect import Collector
 
+from digital_land.commands import (
+    load_pipeline_provenance,
+)
+
 """
 dataset_create & dataset_update
 """
@@ -304,7 +310,7 @@ def test_dataset_create_fixture(
     )
     package = DatasetPackage(
         dataset,
-        organisation=organisation,
+        # organisation=organisation,
         path=sqlite3_path,
         specification_dir=specification_dir,  # TBD: package should use this specification object
     )
@@ -319,7 +325,7 @@ def test_dataset_create_fixture(
     package.load_dataset_resource(
         resource_files_fixture["dataset_resource_dir"] / Path(path).name
     )
-    package.load_entities()
+    package.load_entities(organisations=organisation.organisation)
 
     old_entity_path = os.path.join(
         resource_files_fixture["pipeline_dir"], "old-entity.csv"
@@ -381,7 +387,7 @@ def test_dataset_update_fixture(
     )
     package = DatasetPackage(
         dataset,
-        organisation=organisation,
+        # organisation=organisation,
         path=sqlite3_path,
         specification_dir=specification_dir,  # TBD: package should use this specification object
     )
@@ -402,7 +408,7 @@ def test_dataset_update_fixture(
     package.load_dataset_resource(
         resource_files_fixture["dataset_resource_dir"] / Path(path).name
     )
-    package.load_entities()
+    package.load_entities(organisations=organisation.organisation)
 
     old_entity_path = os.path.join(
         resource_files_fixture["pipeline_dir"], "old-entity_updated.csv"
@@ -590,3 +596,160 @@ def test_collection_dir_file_hashes(temp_dir, caplog):
     assert any(
         expected in record.message for record in caplog.records
     ), f"Missing log: {expected}"
+
+
+@pytest.fixture()
+def sqlite_db(tmp_path):
+    """
+    fixture to create a sqlite db
+    """
+    db_path = tmp_path / "test.sqlite3"
+    conn = sqlite3.connect(db_path)
+    conn.execute(
+        """
+        CREATE TABLE IF NOT EXISTS fact_resource (
+            fact TEXT,
+            resource TEXT,
+            entry_date TEXT,
+            entry_number TEXT,
+            priority TEXT,
+            reference_entity TEXT,
+            dataset TEXT
+        )
+        """
+    )
+    conn.execute(
+        """
+        CREATE TABLE fact (
+            fact VARCHAR(64) PRIMARY KEY,
+            entity INTEGER NOT NULL,
+            field VARCHAR(64) NOT NULL,
+            value TEXT NOT NULL,
+            entry_date DATE NOT NULL,
+            entry_number INTEGER NOT NULL,
+            resource VARCHAR(64) NOT NULL,
+            priority INTEGER,
+            reference_entity BIGINT,
+            dataset TEXT NOT NULL
+        );
+        """
+    )
+    conn.execute(
+        """
+        CREATE TABLE issue (
+            entity BIGINT,
+            entry_date DATE,
+            entry_number INTEGER NOT NULL,
+            field VARCHAR(64),
+            issue_type VARCHAR(64),
+            line_number INTEGER,
+            dataset VARCHAR(64) NOT NULL,
+            resource VARCHAR(64) NOT NULL,
+            value TEXT,
+            message TEXT
+        )
+        """
+    )
+    conn.commit()
+    conn.close()
+    return db_path
+
+
+# big old test: it runs the overarching function, so we probably don't need to test this too many times;
+# focus on tests for individual functions to ensure they're doing the right thing
+@pytest.mark.parametrize(
+    "fact_data, fact_resource_data, issue_data",
+    [
+        (
+            {
+                "fact": [
+                    "dfdbd3153e0e8529c16a623f10c7770ca0e8a267465a22051c162c1fcd5413fc"
+                ],
+                "entity": [1],
+                "field": ["name"],
+                "value": ["test"],
+                "resource": [
+                    "e51e6f8fd1cadb696d5ca229236b84f2ec67afdea6aad03fc81142b04c67adbd"
+                ],
+                "entry_date": ["2025-01-01"],
+                "entry_number": [3],
+                "priority": [1],
+                "reference_entity": [122],
+                "dataset": ["article-4-direction"],
+            },
+            {
+                "fact": [
+                    "dfdbd3153e0e8529c16a623f10c7770ca0e8a267465a22051c162c1fcd5413fc"
+                ],
+                "resource": [
+                    "e51e6f8fd1cadb696d5ca229236b84f2ec67afdea6aad03fc81142b04c67adbd"
+                ],
+                "entry_date": ["2025-01-01"],
+                "entry_number": ["2"],
+                "priority": ["1"],
+                "reference_entity": ["122"],
+                "dataset": ["article-4-direction"],
+            },
+            {
+                "entity": [1],
+                "entry_date": ["2025-01-01"],
+                "entry_number": [1],
+                "field": ["test"],
+                "issue_type": ["test"],
+                "line_number": [2],
+                "dataset": ["test"],
+                "resource": [
+                    "e51e6f8fd1cadb696d5ca229236b84f2ec67afdea6aad03fc81142b04c67adbd"
+                ],
+                "value": ["test"],
+                "message": ["test"],
+            },
+        )
+    ],
+)
+def test_load_pipeline_provenance_into_empty_db(
+    fact_data,
+    fact_resource_data,
+    issue_data,
+    sqlite_db,
+    platform_db_function_url,
+    specification_dir,
+):
+    # need to load data into the sqlite db
+    conn = sqlite3.connect(sqlite_db)
+
+    # load fact_data
+    fact_input_df = pd.DataFrame.from_dict(fact_data)
+    fact_input_df.to_sql("fact", conn, if_exists="append", index=False)
+    # load fact_resource_data
+    fact_resource_input_df = pd.DataFrame.from_dict(fact_resource_data)
+    fact_resource_input_df.to_sql(
+        "fact_resource", conn, if_exists="append", index=False
+    )
+
+    # load issue_data
+    issue_input_df = pd.DataFrame.from_dict(issue_data)
+    issue_input_df.to_sql("issue", conn, if_exists="append", index=False)
+    # close connection
+    conn.close()
+
+    # pass to function
+    load_pipeline_provenance(
+        sqlite_path=sqlite_db,
+        database_url=platform_db_function_url,
+        specification_path=specification_dir,
+    )
+
+    # TODO could add the get_df function to the postgres package
+    conn = get_pg_connection(platform_db_function_url)
+    entity_output_df = get_df("entity", conn)
+    assert len(entity_output_df) > 0
+
+    fact_output_df = get_df("fact", conn)
+    assert len(fact_output_df) > 0
+
+    fact_resource_output_df = get_df("fact_resource", conn)
+    assert len(fact_resource_output_df) > 0
+
+    issue_output_df = get_df("issue", conn)
+    assert len(issue_output_df) > 0