
Commit 43d6a9b

feat: add envidat data connectors support
1 parent 038cfdd commit 43d6a9b

6 files changed: +219 -18 lines changed


components/renku_data_services/data_connectors/blueprints.py

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ def post(self) -> BlueprintFactoryResponse:
         async def _post(
             _: Request, user: base_models.APIUser, body: apispec.DataConnectorPost, validator: RCloneValidator
         ) -> JSONResponse:
-            data_connector = validate_unsaved_data_connector(body, validator=validator)
+            data_connector = await validate_unsaved_data_connector(body, validator=validator)
             result = await self.data_connector_repo.insert_namespaced_data_connector(
                 user=user, data_connector=data_connector
             )
components/renku_data_services/data_connectors/constants.py

Lines changed: 7 additions & 0 deletions
@@ -0,0 +1,7 @@
+"""Constants for data connectors."""
+
+from typing import Final
+
+from renku_data_services.storage.constants import ENVIDAT_V1_PROVIDER
+
+ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS: Final[list[str]] = ["doi", ENVIDAT_V1_PROVIDER]

components/renku_data_services/data_connectors/core.py

Lines changed: 86 additions & 16 deletions
@@ -6,6 +6,7 @@
 from html.parser import HTMLParser
 from typing import Any
 
+import httpx
 from pydantic import ValidationError as PydanticValidationError
 
 from renku_data_services import base_models, errors
@@ -14,9 +15,11 @@
     NamespacePath,
     ProjectPath,
 )
-from renku_data_services.data_connectors import apispec, models
+from renku_data_services.data_connectors import apispec, models, schema_org_dataset
+from renku_data_services.data_connectors.constants import ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS
 from renku_data_services.data_connectors.doi.metadata import get_dataset_metadata
 from renku_data_services.storage import models as storage_models
+from renku_data_services.storage.constants import ENVIDAT_V1_PROVIDER
 from renku_data_services.storage.rclone import RCloneValidator
 
 
@@ -41,7 +44,7 @@ def dump_storage_with_sensitive_fields(
     return body
 
 
-def validate_unsaved_storage(
+async def validate_unsaved_storage(
     storage: apispec.CloudStorageCorePost | apispec.CloudStorageUrlV2, validator: RCloneValidator
 ) -> models.CloudStorageCore:
     """Validate the storage configuration of an unsaved data connector."""
@@ -59,6 +62,10 @@ def validate_unsaved_storage(
         )
         configuration = cloud_storage.configuration.config
         source_path = cloud_storage.source_path
+    elif storage.storage_type == ENVIDAT_V1_PROVIDER:
+        converted_storage = await convert_envidat_v1_data_connector_to_s3(storage)
+        configuration = converted_storage.configuration
+        source_path = converted_storage.source_path
     else:
         configuration = storage.configuration
         source_path = storage.source_path
@@ -74,13 +81,13 @@
     )
 
 
-def validate_unsaved_data_connector(
+async def validate_unsaved_data_connector(
     body: apispec.DataConnectorPost, validator: RCloneValidator
 ) -> models.UnsavedDataConnector:
     """Validate an unsaved data connector."""
 
     keywords = [kw.root for kw in body.keywords] if body.keywords is not None else []
-    storage = validate_unsaved_storage(body.storage, validator=validator)
+    storage = await validate_unsaved_storage(body.storage, validator=validator)
 
     if body.namespace is None:
         raise NotImplementedError("Missing namespace not supported")
@@ -113,20 +120,37 @@ async def prevalidate_unsaved_global_data_connector(
 ) -> models.UnsavedGlobalDataConnector:
     """Pre-validate an unsaved data connector."""
 
-    storage = validate_unsaved_storage(body.storage, validator=validator)
+    storage = await validate_unsaved_storage(body.storage, validator=validator)
     # TODO: allow admins to create global data connectors, e.g. s3://giab
-    if storage.storage_type != "doi":
-        raise errors.ValidationError(message="Only doi storage type is allowed for global data connectors")
+    if storage.storage_type not in ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS:
+        raise errors.ValidationError(
+            message=f"Only {ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS} storage types are allowed for global data connectors"
+        )
     if not storage.readonly:
         raise errors.ValidationError(message="Global data connectors must be read-only")
 
-    rclone_metadata = await validator.get_doi_metadata(configuration=storage.configuration)
-
-    doi_uri = f"doi:{rclone_metadata.doi}"
-    slug = base_models.Slug.from_name(doi_uri).value
-
-    # Override provider in storage config
-    storage.configuration["provider"] = rclone_metadata.provider
+    match storage.storage_type:
+        case "doi":
+            rclone_metadata = await validator.get_doi_metadata(configuration=storage.configuration)
+
+            doi_uri = f"doi:{rclone_metadata.doi}"
+            slug = base_models.Slug.from_name(doi_uri).value
+
+            # Override provider in storage config
+            storage.configuration["provider"] = rclone_metadata.provider
+        case x if x == ENVIDAT_V1_PROVIDER:
+            if not isinstance(body.storage, apispec.CloudStorageCorePost):
+                raise errors.ValidationError()
+            doi = body.storage.configuration.get("doi")
+            if not doi:
+                raise errors.ValidationError()
+            doi_uri = f"doi:{doi}"
+            slug = base_models.Slug.from_name(doi_uri).value
+        case x:
+            raise errors.ValidationError(
+                message=f"Only {ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS} storage types are allowed "
+                "for global data connectors"
+            )
 
     return models.UnsavedGlobalDataConnector(
         name=doi_uri,
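For illustration, a minimal sketch of a storage payload that would take the envidat branch of the match statement above; the field names beyond those referenced in this diff, and the DOI value, are assumptions:

    # Sketch only: hypothetical storage payload for a global Envidat data
    # connector. The DOI below is made up.
    body_storage = {
        "storage_type": "envidat_v1",
        "readonly": True,
        "configuration": {
            "type": "envidat_v1",
            "doi": "10.12345/envidat.example",
        },
    }
    # prevalidate_unsaved_global_data_connector would then derive
    # doi_uri == "doi:10.12345/envidat.example" and build the slug from it
    # via base_models.Slug.from_name(doi_uri).value.
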
@@ -155,8 +179,11 @@ async def validate_unsaved_global_data_connector(
     )
 
     # Fetch DOI metadata
-    rclone_metadata = await validator.get_doi_metadata(configuration=data_connector.storage.configuration)
-    metadata = await get_dataset_metadata(rclone_metadata=rclone_metadata)
+    if data_connector.storage.storage_type == "doi":
+        rclone_metadata = await validator.get_doi_metadata(configuration=data_connector.storage.configuration)
+        metadata = await get_dataset_metadata(rclone_metadata=rclone_metadata)
+    else:
+        metadata = None
 
     name = data_connector.name
     description = ""
@@ -325,3 +352,46 @@ def text(self) -> str:
 
     def handle_data(self, data: str) -> None:
         self._text += data
+
+
+async def convert_envidat_v1_data_connector_to_s3(
+    payload: apispec.CloudStorageCorePost,
+) -> apispec.CloudStorageCorePost:
+    """Converts a DOI-like configuration for Envidat to S3.
+
+    If the payload that is passed in is not of the expected type, nothing is changed
+    and the same payload that was passed in is returned.
+    """
+    config = payload.configuration
+    if config.get("type") != ENVIDAT_V1_PROVIDER:
+        return payload
+
+    doi = config.get("doi")
+    if not isinstance(doi, str):
+        raise errors.ValidationError()
+    if len(doi) == 0:
+        raise errors.ValidationError()
+    doi = doi.removeprefix("https://")
+    doi = doi.removeprefix("http://")
+
+    new_config = payload.model_copy(deep=True)
+    new_config.configuration = {}
+
+    envidat_url = "https://envidat.ch/converters-api/internal-dataset/convert/jsonld"
+    query_params = {"query": doi}
+    headers = {"accept": "application/json"}
+
+    clnt = httpx.AsyncClient(follow_redirects=True)
+    async with clnt:
+        res = await clnt.get(envidat_url, params=query_params, headers=headers)
+    if res.status_code != 200:
+        raise errors.ProgrammingError()
+    dataset = schema_org_dataset.Dataset.model_validate_json(res.text)
+    s3_config = schema_org_dataset.get_rclone_config(
+        dataset,
+        schema_org_dataset.DatasetProvider.envidat,
+    )
+    new_config.configuration = dict(s3_config.rclone_config)
+    new_config.source_path = s3_config.path
+    new_config.storage_type = "s3"
+    return new_config
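A minimal sketch of how the converter is exercised, assuming the apispec.CloudStorageCorePost constructor accepts the fields used above (its full signature is not shown in this diff) and using a made-up DOI:

    # Sketch only: the model fields and the DOI are assumptions for illustration.
    # (Runs inside an async context.)
    payload = apispec.CloudStorageCorePost(
        storage_type=ENVIDAT_V1_PROVIDER,
        configuration={"type": ENVIDAT_V1_PROVIDER, "doi": "10.12345/envidat.example"},
        source_path="",
        readonly=True,
    )
    converted = await convert_envidat_v1_data_connector_to_s3(payload)
    # converted.storage_type == "s3" and converted.configuration now holds the
    # rclone S3 settings resolved through the envidat converters API, while the
    # original payload is untouched because model_copy(deep=True) was used.
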
components/renku_data_services/data_connectors/schema_org_dataset.py

Lines changed: 92 additions & 0 deletions
@@ -0,0 +1,92 @@
+"""This is used by envidat and scicat to provide information about their datasets."""
+
+from dataclasses import dataclass
+from enum import StrEnum
+from urllib.parse import parse_qs, urlparse
+
+from pydantic import BaseModel, ConfigDict, Field
+
+from renku_data_services.errors import errors
+
+
+class Distribution(BaseModel):
+    """The distribution field of a schema.org dataset."""
+
+    model_config = ConfigDict(extra="ignore")
+    type: str
+    contentUrl: str
+    name: str
+
+
+class Dataset(BaseModel):
+    """A very limited and partial spec of a schema.org Dataset used by Scicat and Envidat."""
+
+    model_config = ConfigDict(extra="ignore")
+    distribution: list[Distribution] = Field(default_factory=list)
+
+
+class DatasetProvider(StrEnum):
+    """The provider for the dataset."""
+
+    envidat = "envidat"
+
+
+@dataclass
+class S3Config:
+    """Configuration for a location on S3 storage."""
+
+    rclone_config: dict[str, str]
+    bucket: str
+    prefix: str
+
+    @property
+    def path(self) -> str:
+        """Return the path including the bucket name and the prefix."""
+        return f"{self.bucket}/{self.prefix}"
+
+
+def get_rclone_config(dataset: Dataset, provider: DatasetProvider) -> S3Config:
+    """Parse the dataset into an rclone configuration."""
+    match provider:
+        case DatasetProvider.envidat:
+            return __get_rclone_s3_config_envidat(dataset)
+        # TODO: Add scicat here
+        case x:
+            raise errors.ValidationError(message=f"Got an unknown dataset provider {x}")
+
+
+def __get_rclone_s3_config_envidat(dataset: Dataset) -> S3Config:
+    """Get the S3 rclone configuration and source path from a dataset returned by envidat."""
+    # NOTE: The folks from Envidat assure us that the first entity in the list is the one we want
+    url = dataset.distribution[0].contentUrl
+    # NOTE: The folks from Envidat assure us that the URL has the following format
+    # http://<bucket-name>.<s3 domain>/?prefix=<path to files>
+    url_parsed = urlparse(url)
+    if not url_parsed.scheme:
+        raise errors.ValidationError(message="A scheme like http or https is needed for the S3 url.")
+    if not url_parsed.netloc:
+        raise errors.ValidationError(message="A hostname is needed for the S3 url.")
+    if not url_parsed.query:
+        raise errors.ValidationError(message="A query parameter with the path is needed for the S3 url.")
+    query_params = parse_qs(url_parsed.query)
+    prefix_list = query_params.get("prefix")
+    if prefix_list is None or len(prefix_list) == 0:
+        raise errors.ValidationError(message="The query parameter in the S3 url should contain the 'prefix' key.")
+    prefix = prefix_list[0]
+    host_split = url_parsed.netloc.split(".")
+    if len(host_split) < 2:
+        raise errors.ValidationError(
+            message="The envidat s3 url is expected to have a host name with at least two parts."
+        )
+    s3_host = ".".join(host_split[1:])
+    bucket = host_split[0]
+    prefix = "/" + prefix.strip("/")
+    return S3Config(
+        {
+            "type": "s3",
+            "provider": "Other",
+            "endpoint": f"{url_parsed.scheme}://{s3_host}",
+        },
+        bucket.strip("/"),
+        prefix,
+    )
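To make the expected URL convention concrete, here is a sketch with a made-up Envidat-style distribution URL (the host and prefix are illustrative, not real Envidat values):

    # Sketch only: a hypothetical contentUrl following the documented
    # http://<bucket-name>.<s3 domain>/?prefix=<path to files> convention.
    dataset = Dataset.model_validate(
        {
            "distribution": [
                {
                    "type": "DataDownload",
                    "contentUrl": "https://my-bucket.s3.example.org/?prefix=datasets/glacier/",
                    "name": "files",
                }
            ]
        }
    )
    config = get_rclone_config(dataset, DatasetProvider.envidat)
    # config.rclone_config["endpoint"] == "https://s3.example.org"
    # config.bucket == "my-bucket" and config.prefix == "/datasets/glacier"
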
components/renku_data_services/storage/constants.py

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+"""Constants for storage."""
+
+from typing import Final
+
+ENVIDAT_V1_PROVIDER: Final[str] = "envidat_v1"
+SCICAT_V1_PROVIDER: Final[str] = "scicat_v1"

components/renku_data_services/storage/rclone_patches.py

Lines changed: 27 additions & 1 deletion
@@ -1,9 +1,11 @@
 """Patches to apply to the rclone storage schema."""
 
+from collections.abc import Callable
 from copy import deepcopy
-from typing import Any, Final
+from typing import Any, Final, cast
 
 from renku_data_services import errors
+from renku_data_services.storage.constants import ENVIDAT_V1_PROVIDER, SCICAT_V1_PROVIDER
 
 BANNED_STORAGE: Final[set[str]] = {
     "alias",
@@ -261,6 +263,28 @@ def __patch_schema_remove_banned_sftp_options(spec: list[dict[str, Any]]) -> None:
     sftp["Options"] = options
 
 
+def __add_custom_doi_s3_provider(name: str, description: str, prefix: str) -> Callable[[list[dict[str, Any]]], None]:
+    """This is used to add envidat and scicat as providers.
+
+    However, this is not a real provider in Rclone. The data service has to intercept the request
+    and convert this provider to the proper S3 configuration where the data can be found.
+    """
+
+    def __patch(spec: list[dict[str, Any]]) -> None:
+        doi_original = find_storage(spec, "doi")
+        doi_new = deepcopy(doi_original)
+        doi_new["Description"] = description
+        doi_new["Name"] = name
+        doi_new["Prefix"] = prefix
+        doi_new_options = cast(list[dict[str, Any]], doi_new.get("Options", []))
+        provider_ind = next((i for i, opt in enumerate(doi_new_options) if opt.get("Name") == "provider"), None)
+        if provider_ind is not None:
+            doi_new_options.pop(provider_ind)
+        spec.append(doi_new)
+
+    return __patch
+
+
 def apply_patches(spec: list[dict[str, Any]]) -> None:
     """Apply patches to RClone schema."""
     patches = [
@@ -271,6 +295,8 @@ def apply_patches(spec: list[dict[str, Any]]) -> None:
         __patch_schema_remove_oauth_propeties,
         __patch_polybox_storage,
        __patch_switchdrive_storage,
+        __add_custom_doi_s3_provider("Envidat", "Envidat data provider", ENVIDAT_V1_PROVIDER),
+        __add_custom_doi_s3_provider("SciCat", "SciCat data provider", SCICAT_V1_PROVIDER),
         __patch_schema_remove_banned_sftp_options,
     ]

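The helper find_storage called by __add_custom_doi_s3_provider is defined elsewhere in rclone_patches.py and is not part of this diff; a minimal sketch of an implementation consistent with how it is called above (assumed, not the actual helper):

    # Sketch only: assumed shape of find_storage, which this diff calls but
    # does not define. It looks up a storage schema entry by its "Prefix" field.
    def find_storage(spec: list[dict[str, Any]], prefix: str) -> dict[str, Any]:
        storage = next((s for s in spec if s.get("Prefix") == prefix), None)
        if storage is None:
            raise errors.ValidationError(message=f"The storage type {prefix} cannot be found in the rclone schema.")
        return storage
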