22 changes: 22 additions & 0 deletions digital_land/cli.py
@@ -31,6 +31,7 @@
organisation_check,
save_state,
add_data,
load_pipeline_provenance,
)

from digital_land.command_arguments import (
@@ -825,3 +826,24 @@ def check_state_cmd(
if diffs:
print(f"State differs from {state_path} - {', '.join(diffs)}")
sys.exit(1)


@cli.command(
    "load-pipeline-provenance",
    short_help="load the pipeline provenance data from a sqlite dataset package into a database.",
)
@click.option(
    "--sqlite-path",
    type=click.Path(exists=True, path_type=Path),
    help="file path to a sqlite database containing the dataset provenance data",
)
@click.option(
    "--database-url",
    type=click.STRING,
    help="the database connection url to load data into, schema is assumed to exist",
)
@click.option(
    "--specification-path",
    type=click.Path(exists=True),
    help="directory containing the specification data",
)
def load_pipeline_provenance_cli(sqlite_path, database_url, specification_path):
    """
    A command line interface to load the pipeline provenance data from a sqlite file into a database.
    """
    load_pipeline_provenance(sqlite_path, database_url, specification_path)
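A minimal sketch of exercising the new command in-process with click's CliRunner, as a starting point for a test; the sqlite path, database URL, and specification directory below are placeholder assumptions, not values from this PR:

from click.testing import CliRunner

from digital_land.cli import cli

# invoke the command exactly as the group registers it above; the option
# values are illustrative placeholders
runner = CliRunner()
result = runner.invoke(
    cli,
    [
        "load-pipeline-provenance",
        "--sqlite-path", "dataset/conservation-area.sqlite3",
        "--database-url", "postgresql://user:pass@localhost:5432/platform",
        "--specification-path", "specification/",
    ],
)
assert result.exit_code == 0, result.output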
57 changes: 48 additions & 9 deletions digital_land/commands.py
@@ -31,6 +31,7 @@

from digital_land.package.dataset import DatasetPackage
from digital_land.package.dataset_parquet import DatasetParquetPackage
from digital_land.package.platform import PlatformPackage
from digital_land.phase.combine import FactCombinePhase
from digital_land.phase.concat import ConcatFieldPhase
from digital_land.phase.convert import ConvertPhase, execute
@@ -79,6 +80,7 @@

from .register import hash_value
from .utils.gdal_utils import get_gdal_version
from .utils.postgres_utils import get_pg_connection

logger = logging.getLogger(__name__)

@@ -440,15 +442,9 @@ def dataset_create(
print("missing output path", file=sys.stderr)
sys.exit(2)

- # Set up initial objects
- organisation = Organisation(
- organisation_path=organisation_path, pipeline_dir=Path(pipeline.path)
- )
-
# create the sqlite dataset package as before and load in data that isn't in the parquet package yet
package = DatasetPackage(
dataset,
- organisation=organisation,
path=output_path,
specification_dir=None, # TBD: package should use this specification object
)
@@ -557,7 +553,6 @@ def dataset_update(
if not dataset_path:
package = DatasetPackage(
dataset,
- organisation=organisation,
path=output_path,
specification_dir=None, # TBD: package should use this specification object
)
@@ -572,7 +567,6 @@
logging.info(f"Reading from local dataset file {dataset_path}")
package = DatasetPackage(
dataset,
- organisation=organisation,
path=dataset_path,
specification_dir=None, # TBD: package should use this specification object
)
@@ -585,7 +579,7 @@
package.load_transformed(path)
package.load_column_fields(column_field_dir / dataset / path_obj.name)
package.load_dataset_resource(dataset_resource_dir / dataset / path_obj.name)
- package.load_entities()
+ package.load_entities(organisations=organisation.organisation)

old_entity_path = os.path.join(pipeline.path, "old-entity.csv")
if os.path.exists(old_entity_path):
@@ -1723,3 +1717,48 @@ def check_and_assign_entities(
):
return False
return True


def load_pipeline_provenance(sqlite_path: Path, database_url, specification_path):
    """
    Load dataset provenance from the sqlite dataset package into the platform.
    Updates values based on what's coming in.
    :param sqlite_path: Path to the sqlite database file which contains provenance data related to the datasets
    :param database_url: URL of the postgis database.
    :param specification_path: path to the specification directory used by the dataset package.
    """
    # TODO link to the spec: how do we do this? what should it do? why?
    # download spec if needed
    # spec = Specification(path=specification_dir, load_on_init=False)
    # spec.download(overwrite=False)
    # spec.load()
    fact_csv_path = sqlite_path.parent / "fact.csv"
    fact_resource_csv_path = sqlite_path.parent / "fact_resource.csv"
    issue_csv_path = sqlite_path.parent / "issue.csv"
    entity_csv_path = sqlite_path.parent / "entity.csv"

    conn = get_pg_connection(database_url)

    # extract CSVs from the sqlite database
    dataset_package = DatasetPackage(
        path=sqlite_path, dataset="test", specification_dir=specification_path
    )

    dataset_package.export_table_to_csv(fact_resource_csv_path, "fact_resource")
    dataset_package.export_table_to_csv(fact_csv_path, "fact")
    dataset_package.export_table_to_csv(issue_csv_path, "issue")

    # entities contained in the fact table
    dataset_package.export_fact_entities_to_csv(entity_csv_path)

    # establish the platform package
    platform = PlatformPackage(database_url=database_url)
    # load the actual data
    platform.update_fact_resource(fact_resource_csv_path, conn)
    platform.update_fact(fact_csv_path, conn)
    platform.update_entity(entity_csv=entity_csv_path, conn=conn)

    # load the other provenance tables
    platform.update_issues(issue_csv_path, conn)

    # TODO expand with other provenance produced by the pipeline phases
    conn.commit()
    conn.close()
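The PlatformPackage.update_* methods are not part of this diff, so the exact upsert behaviour is not visible here. A rough sketch of the copy-then-upsert pattern that "updates values based on what's coming in" implies, assuming psycopg2 and the pipe-delimited CSVs exported above; the staging table, conflict target, and column list are illustrative assumptions, not the actual implementation:

def update_fact(fact_csv_path, conn):
    with conn.cursor() as cur:
        # stage the incoming rows in a temporary copy of the fact table
        cur.execute("CREATE TEMP TABLE fact_staging (LIKE fact INCLUDING DEFAULTS)")
        with open(fact_csv_path) as f:
            cur.copy_expert(
                "COPY fact_staging FROM STDIN WITH (FORMAT csv, DELIMITER '|', HEADER)",
                f,
            )
        # update facts that already exist, insert the rest; assumes a unique
        # constraint on fact(fact)
        cur.execute(
            """
            INSERT INTO fact
            SELECT * FROM fact_staging
            ON CONFLICT (fact) DO UPDATE
            SET entity = EXCLUDED.entity, value = EXCLUDED.value
            """
        )

get_pg_connection is likewise not shown; a thin wrapper around psycopg2.connect(database_url) would satisfy the commit() and close() calls used here.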
44 changes: 35 additions & 9 deletions digital_land/package/dataset.py
@@ -1,6 +1,7 @@
import csv
import json
import logging
import sqlite3
from decimal import Decimal

import shapely.wkt
@@ -32,13 +33,12 @@


class DatasetPackage(SqlitePackage):
- def __init__(self, dataset, organisation, **kwargs):
+ def __init__(self, dataset, **kwargs):
super().__init__(dataset, tables=tables, indexes=indexes, **kwargs)
self.dataset = dataset
self.entity_fields = self.specification.schema["entity"]["fields"]
- self.organisations = organisation.organisation

- def migrate_entity(self, row):
+ def migrate_entity(self, row, organisations):
dataset = self.dataset
entity = row.get("entity", "")

@@ -55,7 +55,7 @@ def migrate_entity(self, row):

# hack until FactReference is reliable ..
if not row.get("organisation-entity", ""):
row["organisation-entity"] = self.organisations.get(
row["organisation-entity"] = organisations.get(
row.get("organisation", ""), {}
).get("entity", "")

@@ -118,9 +118,9 @@ def entity_row(self, facts):
row[fact[1]] = fact[2]
return row

- def insert_entity(self, facts):
+ def insert_entity(self, facts, organisations):
row = self.entity_row(facts)
- row = self.migrate_entity(row)
+ row = self.migrate_entity(row, organisations=organisations)
if row:
self.insert("entity", self.entity_fields, row)

@@ -147,7 +147,7 @@ def load_old_entities(self, path):
self.commit()
self.disconnect()

- def load_entities(self):
+ def load_entities(self, organisations):
"""load the entity table from the fact table"""
self.connect()
self.create_cursor()
@@ -163,13 +163,13 @@
# If facts and fact does not point to same entity as first fact
if facts and fact[0] != facts[0][0]:
# Insert existing facts
- self.insert_entity(facts)
+ self.insert_entity(facts, organisations)
# Reset facts list for new entity
facts = []
facts.append(fact)

if facts:
- self.insert_entity(facts)
+ self.insert_entity(facts, organisations)

self.commit()
self.disconnect()
@@ -289,5 +289,31 @@ def load_transformed(self, path):
self.commit()
self.disconnect()

    def export_fact_entities_to_csv(self, output_path):
        """
        Export a distinct list of entities from the fact table to a CSV file.
        """
        conn = sqlite3.connect(self.path)
        cursor = conn.cursor()

        with open(output_path, "w", newline="") as f:
            writer = csv.writer(f, delimiter="|")

            # Execute the query
            cursor.execute("SELECT DISTINCT entity FROM fact")

            # Write header
            writer.writerow([desc[0] for desc in cursor.description])

            # Stream rows in chunks
            while True:
                rows = cursor.fetchmany(1000)  # adjust chunk size as needed
                if not rows:
                    break
                writer.writerows(rows)

        conn.close()

def load(self):
pass
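export_table_to_csv, called three times from load_pipeline_provenance above, is not part of this diff. A sketch of what it presumably looks like, mirroring export_fact_entities_to_csv; the chunked streaming and pipe delimiter are assumptions carried over from that method:

def export_table_to_csv(self, output_path, table):
    conn = sqlite3.connect(self.path)
    cursor = conn.cursor()

    with open(output_path, "w", newline="") as f:
        writer = csv.writer(f, delimiter="|")

        # table names come from trusted internal callers, not user input
        cursor.execute(f"SELECT * FROM {table}")

        # header row, then stream the table in chunks
        writer.writerow([desc[0] for desc in cursor.description])
        while True:
            rows = cursor.fetchmany(1000)
            if not rows:
                break
            writer.writerows(rows)

    conn.close()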
4 changes: 4 additions & 0 deletions digital_land/package/package.py
@@ -50,6 +50,10 @@ def load(self):


class Package:
    """
    A base package that assigns the values needed by any package.
    """

def __init__(
self,
datapackage,