66from html .parser import HTMLParser
77from typing import Any
88
9+ import httpx
910from pydantic import ValidationError as PydanticValidationError
1011
1112from renku_data_services import base_models , errors
1415 NamespacePath ,
1516 ProjectPath ,
1617)
17- from renku_data_services .data_connectors import apispec , models
18+ from renku_data_services .data_connectors import apispec , models , schema_org_dataset
19+ from renku_data_services .data_connectors .constants import ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS
1820from renku_data_services .data_connectors .doi .metadata import get_dataset_metadata
1921from renku_data_services .storage import models as storage_models
22+ from renku_data_services .storage .constants import ENVIDAT_V1_PROVIDER
2023from renku_data_services .storage .rclone import RCloneValidator
2124
2225
@@ -41,7 +44,7 @@ def dump_storage_with_sensitive_fields(
4144 return body
4245
4346
44- def validate_unsaved_storage (
47+ async def validate_unsaved_storage (
4548 storage : apispec .CloudStorageCorePost | apispec .CloudStorageUrlV2 , validator : RCloneValidator
4649) -> models .CloudStorageCore :
4750 """Validate the storage configuration of an unsaved data connector."""
@@ -59,6 +62,10 @@ def validate_unsaved_storage(
5962 )
6063 configuration = cloud_storage .configuration .config
6164 source_path = cloud_storage .source_path
65+ elif storage .storage_type == ENVIDAT_V1_PROVIDER :
66+ converted_storage = await convert_envidat_v1_data_connector_to_s3 (storage )
67+ configuration = converted_storage .configuration
68+ source_path = converted_storage .source_path
6269 else :
6370 configuration = storage .configuration
6471 source_path = storage .source_path
@@ -74,13 +81,13 @@ def validate_unsaved_storage(
7481 )
7582
7683
77- def validate_unsaved_data_connector (
84+ async def validate_unsaved_data_connector (
7885 body : apispec .DataConnectorPost , validator : RCloneValidator
7986) -> models .UnsavedDataConnector :
8087 """Validate an unsaved data connector."""
8188
8289 keywords = [kw .root for kw in body .keywords ] if body .keywords is not None else []
83- storage = validate_unsaved_storage (body .storage , validator = validator )
90+ storage = await validate_unsaved_storage (body .storage , validator = validator )
8491
8592 if body .namespace is None :
8693 raise NotImplementedError ("Missing namespace not supported" )
@@ -113,20 +120,37 @@ async def prevalidate_unsaved_global_data_connector(
113120) -> models .UnsavedGlobalDataConnector :
114121 """Pre-validate an unsaved data connector."""
115122
116- storage = validate_unsaved_storage (body .storage , validator = validator )
123+ storage = await validate_unsaved_storage (body .storage , validator = validator )
117124 # TODO: allow admins to create global data connectors, e.g. s3://giab
118- if storage .storage_type != "doi" :
119- raise errors .ValidationError (message = "Only doi storage type is allowed for global data connectors" )
125+ if storage .storage_type not in ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS :
126+ raise errors .ValidationError (
127+ message = f"Only { ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS } storage type is allowed for global data connectors"
128+ )
120129 if not storage .readonly :
121130 raise errors .ValidationError (message = "Global data connectors must be read-only" )
122131
123- rclone_metadata = await validator .get_doi_metadata (configuration = storage .configuration )
124-
125- doi_uri = f"doi:{ rclone_metadata .doi } "
126- slug = base_models .Slug .from_name (doi_uri ).value
127-
128- # Override provider in storage config
129- storage .configuration ["provider" ] = rclone_metadata .provider
132+ match storage .storage_type :
133+ case "doi" :
134+ rclone_metadata = await validator .get_doi_metadata (configuration = storage .configuration )
135+
136+ doi_uri = f"doi:{ rclone_metadata .doi } "
137+ slug = base_models .Slug .from_name (doi_uri ).value
138+
139+ # Override provider in storage config
140+ storage .configuration ["provider" ] = rclone_metadata .provider
141+ case x if x == ENVIDAT_V1_PROVIDER :
142+ if not isinstance (body .storage , apispec .CloudStorageCorePost ):
143+ raise errors .ValidationError ()
144+ doi = body .storage .configuration .get ("doi" )
145+ if not doi :
146+ raise errors .ValidationError ()
147+ doi_uri = f"doi:{ doi } "
148+ slug = base_models .Slug .from_name (doi_uri ).value
149+ case x :
150+ raise errors .ValidationError (
151+ message = f"Only { ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS } storage type is allowed "
152+ "for global data connectors"
153+ )
130154
131155 return models .UnsavedGlobalDataConnector (
132156 name = doi_uri ,
@@ -155,8 +179,11 @@ async def validate_unsaved_global_data_connector(
155179 )
156180
157181 # Fetch DOI metadata
158- rclone_metadata = await validator .get_doi_metadata (configuration = data_connector .storage .configuration )
159- metadata = await get_dataset_metadata (rclone_metadata = rclone_metadata )
182+ if data_connector .storage .storage_type == "doi" :
183+ rclone_metadata = await validator .get_doi_metadata (configuration = data_connector .storage .configuration )
184+ metadata = await get_dataset_metadata (rclone_metadata = rclone_metadata )
185+ else :
186+ metadata = None
160187
161188 name = data_connector .name
162189 description = ""
@@ -325,3 +352,46 @@ def text(self) -> str:
325352
326353 def handle_data (self , data : str ) -> None :
327354 self ._text += data
355+
356+
async def convert_envidat_v1_data_connector_to_s3(
    payload: apispec.CloudStorageCorePost,
) -> apispec.CloudStorageCorePost:
    """Convert a DOI-like Envidat storage configuration into an S3 one.

    If the ``type`` entry of the payload's configuration is not the Envidat
    provider, nothing is changed and the very same payload object is returned.
    Otherwise the DOI from the configuration is sent to the Envidat converter
    API, the JSON-LD answer is parsed into a schema.org dataset and turned into
    an rclone configuration, and a deep copy of the payload is returned with its
    ``configuration``, ``source_path`` and ``storage_type`` replaced. The payload
    passed in is never modified.

    Args:
        payload: The storage part of a data connector request.

    Returns:
        Either ``payload`` itself (not an Envidat configuration) or a modified
        copy whose ``storage_type`` is ``"s3"``.

    Raises:
        errors.ValidationError: If the configuration has no usable ``doi`` entry
            (missing, not a string, or empty once the URL scheme is removed).
        errors.ProgrammingError: If the Envidat API does not answer with HTTP 200.
    """
    config = payload.configuration
    if config.get("type") != ENVIDAT_V1_PROVIDER:
        return payload

    doi = config.get("doi")
    if not isinstance(doi, str) or not doi:
        raise errors.ValidationError(
            message="The Envidat configuration must contain a non-empty 'doi' string."
        )
    # The DOI may be given as a URL; the converter is queried without the scheme.
    doi = doi.removeprefix("https://").removeprefix("http://")
    if not doi:
        # A value such as "https://" would otherwise be sent upstream as an empty query.
        raise errors.ValidationError(
            message="The Envidat configuration must contain a non-empty 'doi' string."
        )

    envidat_url = "https://envidat.ch/converters-api/internal-dataset/convert/jsonld"
    query_params = {"query": doi}
    headers = {"accept": "application/json"}

    # The response body is fully read by ``get``, so the client can be closed
    # before the answer is inspected.
    async with httpx.AsyncClient(follow_redirects=True) as clnt:
        res = await clnt.get(envidat_url, params=query_params, headers=headers)
    if res.status_code != 200:
        raise errors.ProgrammingError()

    # NOTE(review): ``res.text`` is the raw response body. pydantic documents
    # ``model_validate_strings`` for objects holding string data and
    # ``model_validate_json`` for JSON text -- confirm this is the intended call.
    dataset = schema_org_dataset.Dataset.model_validate_strings(res.text)
    s3_config = schema_org_dataset.get_rclone_config(
        dataset,
        schema_org_dataset.DatasetProvider.envidat,
    )

    # Copy only once the conversion succeeded; the caller's payload stays untouched.
    new_config = payload.model_copy(deep=True)
    new_config.configuration = dict(s3_config.rclone_config)
    new_config.source_path = s3_config.path
    new_config.storage_type = "s3"
    return new_config
0 commit comments