Skip to content

Commit 22347c9

Browse files
committed
squashme: go back to validate/prevalidate
1 parent b53d23e commit 22347c9

File tree

5 files changed

+130
-115
lines changed

5 files changed

+130
-115
lines changed

components/renku_data_services/data_connectors/blueprints.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ async def _post_global(
123123
) -> JSONResponse:
124124
data_connector = await prevalidate_unsaved_global_data_connector(body, validator=validator)
125125
result, inserted = await self.data_connector_repo.insert_global_data_connector(
126-
user=user, data_connector=data_connector, validator=validator
126+
user=user, prevalidated_dc=data_connector, validator=validator
127127
)
128128
return validated_json(
129129
apispec.DataConnector,

components/renku_data_services/data_connectors/core.py

Lines changed: 107 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,12 @@
1616
ProjectPath,
1717
)
1818
from renku_data_services.data_connectors import apispec, models
19-
from renku_data_services.data_connectors.constants import ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS
2019
from renku_data_services.data_connectors.doi import schema_org
2120
from renku_data_services.data_connectors.doi.metadata import create_envidat_metadata_url, get_dataset_metadata
2221
from renku_data_services.data_connectors.doi.models import DOI, SchemaOrgDataset
2322
from renku_data_services.storage import models as storage_models
2423
from renku_data_services.storage.constants import ENVIDAT_V1_PROVIDER
25-
from renku_data_services.storage.rclone import RCloneValidator
24+
from renku_data_services.storage.rclone import RCloneDOIMetadata, RCloneValidator
2625

2726

2827
def dump_storage_with_sensitive_fields(
@@ -46,41 +45,88 @@ def dump_storage_with_sensitive_fields(
4645
return body
4746

4847

49-
async def validate_unsaved_storage(
50-
storage: apispec.CloudStorageCorePost | apispec.CloudStorageUrlV2, validator: RCloneValidator
48+
def validate_unsaved_storage_url(
49+
storage: apispec.CloudStorageUrlV2, validator: RCloneValidator
5150
) -> models.CloudStorageCore:
51+
"""Validate the unsaved storage when its configuration is specified as a URL."""
52+
cloud_storage = storage_models.UnsavedCloudStorage.from_url(
53+
project_id="FAKEPROJECTID",
54+
name="fake-storage-name",
55+
storage_url=storage.storage_url,
56+
target_path=storage.target_path,
57+
readonly=storage.readonly,
58+
)
59+
configuration = cloud_storage.configuration.config
60+
source_path = cloud_storage.source_path
61+
validator.validate(configuration)
62+
return models.CloudStorageCore(
63+
storage_type=configuration["type"],
64+
configuration=configuration,
65+
source_path=source_path,
66+
target_path=storage.target_path,
67+
readonly=storage.readonly,
68+
)
69+
70+
71+
def validate_unsaved_storage_generic(
72+
storage: apispec.CloudStorageCorePost, validator: RCloneValidator
73+
) -> models.CloudStorageCore:
74+
"""Validate the unsaved storage when its configuration is specified directly as a configuration mapping."""
75+
configuration = storage.configuration
76+
validator.validate(configuration)
77+
storage_type = configuration.get("type")
78+
if not isinstance(storage_type, str):
79+
raise errors.ValidationError()
80+
return models.CloudStorageCore(
81+
storage_type=storage_type,
82+
configuration=configuration,
83+
source_path=storage.source_path,
84+
target_path=storage.target_path,
85+
readonly=storage.readonly,
86+
)
87+
88+
89+
async def validate_unsaved_storage_doi(
90+
storage: apispec.CloudStorageCorePost, validator: RCloneValidator
91+
) -> tuple[models.CloudStorageCore, DOI]:
5292
"""Validate the storage configuration of an unsaved data connector."""
5393

5494
configuration: dict[str, Any]
5595
source_path: str
5696

57-
if isinstance(storage, apispec.CloudStorageUrlV2):
58-
cloud_storage = storage_models.UnsavedCloudStorage.from_url(
59-
project_id="FAKEPROJECTID",
60-
name="fake-storage-name",
61-
storage_url=storage.storage_url,
62-
target_path=storage.target_path,
63-
readonly=storage.readonly,
97+
if storage.storage_type != "doi":
98+
raise errors.ProgrammingError(
99+
message="Only doi-type storage can be validated by the validate_unsaved_storage_doi function."
64100
)
65-
configuration = cloud_storage.configuration.config
66-
source_path = cloud_storage.source_path
67-
elif storage.storage_type == ENVIDAT_V1_PROVIDER:
68-
converted_storage = await convert_envidat_v1_data_connector_to_s3(storage)
69-
configuration = converted_storage.configuration
70-
source_path = converted_storage.source_path
71-
else:
72-
configuration = storage.configuration
73-
source_path = storage.source_path
101+
102+
doi_str = storage.configuration.get("doi")
103+
if not isinstance(doi_str, str):
104+
raise errors.ValidationError(message="Cannot find the doi in the storage configuration")
105+
106+
doi = DOI(doi_str)
107+
doi_host = await doi.resolve_host()
108+
109+
match doi_host:
110+
case "envidat.ch" | "www.envidat.ch":
111+
converted_storage = await convert_envidat_v1_data_connector_to_s3(storage)
112+
configuration = converted_storage.configuration
113+
source_path = converted_storage.source_path
114+
storage_type = ENVIDAT_V1_PROVIDER
115+
case _:
116+
# Most likely supported by the rclone doi provider; call validator.get_doi_metadata to confirm.
117+
configuration = storage.configuration
118+
source_path = storage.source_path
119+
storage_type = storage.storage_type
74120

75121
validator.validate(configuration)
76122

77123
return models.CloudStorageCore(
78-
storage_type=configuration["type"],
124+
storage_type=storage_type,
79125
configuration=configuration,
80126
source_path=source_path,
81127
target_path=storage.target_path,
82128
readonly=storage.readonly,
83-
)
129+
), doi
84130

85131

86132
async def validate_unsaved_data_connector(
@@ -89,7 +135,15 @@ async def validate_unsaved_data_connector(
89135
"""Validate an unsaved data connector."""
90136

91137
keywords = [kw.root for kw in body.keywords] if body.keywords is not None else []
92-
storage = await validate_unsaved_storage(body.storage, validator=validator)
138+
match body.storage:
139+
case apispec.CloudStorageCorePost() if body.storage.storage_type != "doi":
140+
storage = validate_unsaved_storage_generic(body.storage, validator=validator)
141+
case apispec.CloudStorageCorePost() if body.storage.storage_type == "doi":
142+
storage, _ = await validate_unsaved_storage_doi(body.storage, validator=validator)
143+
case apispec.CloudStorageUrlV2():
144+
storage = validate_unsaved_storage_url(body.storage, validator=validator)
145+
case _:
146+
raise errors.ValidationError(message="The data connector provided has an unknown payload format.")
93147

94148
if body.namespace is None:
95149
raise NotImplementedError("Missing namespace not supported")
@@ -119,66 +173,49 @@ async def validate_unsaved_data_connector(
119173

120174
async def prevalidate_unsaved_global_data_connector(
121175
body: apispec.GlobalDataConnectorPost, validator: RCloneValidator
122-
) -> models.UnsavedGlobalDataConnector:
176+
) -> models.PrevalidatedGlobalDataConnector:
123177
"""Pre-validate an unsaved data connector."""
124-
125-
storage = await validate_unsaved_storage(body.storage, validator=validator)
126178
# TODO: allow admins to create global data connectors, e.g. s3://giab
127-
if storage.storage_type not in ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS:
128-
raise errors.ValidationError(
129-
message=f"Only {ALLOWED_GLOBAL_DATA_CONNECTOR_PROVIDERS} storage type is allowed for global data connectors"
130-
)
179+
if isinstance(body.storage, apispec.CloudStorageUrlV2) or body.storage.storage_type != "doi":
180+
raise errors.ValidationError(message="Only doi storage type is allowed for global data connectors")
181+
storage, doi = await validate_unsaved_storage_doi(body.storage, validator=validator)
131182
if not storage.readonly:
132183
raise errors.ValidationError(message="Global data connectors must be read-only")
133184

134-
rclone_metadata = await validator.get_doi_metadata(configuration=storage.configuration)
135-
if rclone_metadata:
136-
doi_uri = f"doi:{rclone_metadata.doi}"
137-
185+
rclone_metadata: RCloneDOIMetadata | None = None
186+
doi_uri = f"doi:{doi}"
187+
if storage.storage_type == "doi":
188+
# The storage is most likely supported by rclone; calling get_doi_metadata confirms it.
189+
rclone_metadata = await validator.get_doi_metadata(configuration=storage.configuration)
190+
if not rclone_metadata:
191+
raise errors.ValidationError(message="The provided DOI is not supported.")
138192
# Override provider in storage config
139193
storage.configuration["provider"] = rclone_metadata.provider
140-
doi = DOI(rclone_metadata.doi)
141-
else:
142-
# The storage is not supported by rclone
143-
if not isinstance(body.storage, apispec.CloudStorageCorePost):
144-
raise errors.ValidationError(
145-
message="When the data connector is not supported by rclone we cannot parse a storage URL."
146-
)
147-
# Try to see if we have a different type not directly supported by rclone - from envidat for example
148-
doi_str = body.storage.configuration.get("doi")
149-
if not isinstance(doi_str, str):
150-
raise errors.ValidationError(message="A doi could not be found in the storage configuration.")
151-
doi = DOI(doi_str)
152-
host = await doi.resolve_host()
153-
if not host:
154-
raise errors.ValidationError(message=f"The provided doi {doi} cannot be resolved.")
155-
doi_uri = f"doi:{doi}"
156-
if host not in ["envidat.ch", "www.envidat.ch"]:
157-
raise errors.ValidationError(
158-
message="The doi for the global data connector resolved to an unsupported host"
159-
)
160-
# Set the storage type and re-validate
161-
body.storage.storage_type = ENVIDAT_V1_PROVIDER
162-
storage = await validate_unsaved_storage(body.storage, validator=validator)
163194

164195
slug = base_models.Slug.from_name(doi_uri).value
165-
return models.UnsavedGlobalDataConnector(
166-
name=doi_uri,
167-
slug=slug,
168-
visibility=Visibility.PUBLIC,
169-
created_by="",
170-
storage=storage,
171-
description=None,
172-
keywords=[],
196+
return models.PrevalidatedGlobalDataConnector(
197+
data_connector=models.UnsavedGlobalDataConnector(
198+
name=doi_uri,
199+
slug=slug,
200+
visibility=Visibility.PUBLIC,
201+
created_by="",
202+
storage=storage,
203+
description=None,
204+
keywords=[],
205+
),
173206
doi=doi,
207+
rclone_metadata=rclone_metadata,
174208
)
175209

176210

177211
async def validate_unsaved_global_data_connector(
178-
data_connector: models.UnsavedGlobalDataConnector,
212+
prevalidated_dc: models.PrevalidatedGlobalDataConnector,
179213
validator: RCloneValidator,
180214
) -> models.UnsavedGlobalDataConnector:
181-
"""Validate an unsaved data connector."""
215+
"""Validate the data connector."""
216+
data_connector = prevalidated_dc.data_connector
217+
doi = prevalidated_dc.doi
218+
rclone_metadata = prevalidated_dc.rclone_metadata
182219

183220
# Check that we can list the files in the DOI
184221
connection_result = await validator.test_connection(
@@ -190,13 +227,10 @@ async def validate_unsaved_global_data_connector(
190227
)
191228

192229
# Fetch DOI metadata
193-
if data_connector.storage.storage_type == "doi":
194-
rclone_metadata = await validator.get_doi_metadata(configuration=data_connector.storage.configuration)
195-
if not rclone_metadata:
196-
raise errors.ValidationError()
230+
if rclone_metadata:
197231
metadata = await get_dataset_metadata(data_connector.storage.storage_type, rclone_metadata.metadata_url)
198232
elif data_connector.storage.storage_type == ENVIDAT_V1_PROVIDER:
199-
metadata_url = create_envidat_metadata_url(data_connector.doi)
233+
metadata_url = create_envidat_metadata_url(doi)
200234
metadata = await get_dataset_metadata(data_connector.storage.storage_type, metadata_url)
201235
else:
202236
metadata = None
@@ -244,7 +278,6 @@ async def validate_unsaved_global_data_connector(
244278
storage=storage,
245279
description=description or None,
246280
keywords=keywords,
247-
doi=data_connector.doi,
248281
)
249282

250283

@@ -390,7 +423,7 @@ async def convert_envidat_v1_data_connector_to_s3(
390423
query_params = {"query": doi}
391424
headers = {"accept": "application/json"}
392425

393-
clnt = httpx.AsyncClient(follow_redirects=True)
426+
clnt = httpx.AsyncClient(follow_redirects=True, timeout=5)
394427
async with clnt:
395428
res = await clnt.get(envidat_url, params=query_params, headers=headers)
396429
if res.status_code != 200:
@@ -404,36 +437,3 @@ async def convert_envidat_v1_data_connector_to_s3(
404437
new_config.source_path = s3_config.path
405438
new_config.storage_type = "s3"
406439
return new_config
407-
408-
409-
# async def get_metadata(
410-
# configuration: storage_models.RCloneConfig | dict[str, Any], validator: RCloneValidator
411-
# ) -> RCloneDOIMetadata | None:
412-
# """Get metadata for the dataset."""
413-
# if isinstance(configuration, storage_models.RCloneConfig):
414-
# return await validator.get_doi_metadata(configuration)
415-
# doi = configuration.get("doi")
416-
# if not doi:
417-
# return None
418-
# parsed_doi = urlparse(doi)
419-
# if parsed_doi.scheme.decode() not in ["http", "https"]:
420-
# doi = urlunparse(parsed_doi._replace(scheme=b"https")).decode()
421-
# clnt = httpx.AsyncClient(follow_redirects=True)
422-
# async with clnt:
423-
# res = await clnt.get(doi)
424-
# if res.status_code != 200:
425-
# return None
426-
# match res.url.host:
427-
# case "www.envidat.ch":
428-
#
429-
#
430-
# async def get_envidat_metadata(doi: DOI) -> dict | None:
431-
# """Get metadata about the envidat dataset, the doi should not be a url."""
432-
# clnt = httpx.AsyncClient()
433-
# url = "https://envidat.ch/converters-api/internal-dataset/convert/jsonld"
434-
# params = {"query": doi}
435-
# async with clnt:
436-
# res = clnt.get(url, params=params)
437-
# if res.status_code != 200:
438-
# return None
439-
#

components/renku_data_services/data_connectors/db.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -369,7 +369,7 @@ async def insert_namespaced_data_connector(
369369
async def insert_global_data_connector(
370370
self,
371371
user: base_models.APIUser,
372-
data_connector: models.UnsavedGlobalDataConnector,
372+
prevalidated_dc: models.PrevalidatedGlobalDataConnector,
373373
validator: RCloneValidator | None,
374374
*,
375375
session: AsyncSession | None = None,
@@ -380,6 +380,7 @@ async def insert_global_data_connector(
380380
if user.id is None:
381381
raise errors.UnauthorizedError(message="You do not have the required permissions for this operation.")
382382

383+
data_connector = prevalidated_dc.data_connector
383384
slug = data_connector.slug or base_models.Slug.from_name(data_connector.name).value
384385

385386
existing_global_dc_stmt = select(schemas.DataConnectorORM).where(schemas.DataConnectorORM.global_slug == slug)
@@ -400,7 +401,7 @@ async def insert_global_data_connector(
400401
if validator is None:
401402
raise RuntimeError("Could not validate global data connector")
402403
data_connector = await validate_unsaved_global_data_connector(
403-
data_connector=data_connector, validator=validator
404+
prevalidated_dc=prevalidated_dc, validator=validator
404405
)
405406

406407
dc = await self._insert_data_connector(user=user, data_connector=data_connector, session=session)

components/renku_data_services/data_connectors/models.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
)
1717
from renku_data_services.data_connectors.doi.models import DOI
1818
from renku_data_services.namespace.models import GroupNamespace, ProjectNamespace, UserNamespace
19+
from renku_data_services.storage.rclone import RCloneDOIMetadata
1920
from renku_data_services.utils.etag import compute_etag_from_fields
2021

2122
if TYPE_CHECKING:
@@ -98,7 +99,15 @@ class UnsavedGlobalDataConnector(BaseDataConnector):
9899
"""Global data connector model."""
99100

100101
namespace: None = None
102+
103+
104+
@dataclass(frozen=True, eq=True, kw_only=True)
105+
class PrevalidatedGlobalDataConnector:
106+
"""Global data connector model that is unsaved but has been pre-validated."""
107+
108+
data_connector: UnsavedGlobalDataConnector
101109
doi: DOI
110+
rclone_metadata: RCloneDOIMetadata | None = None
102111

103112

104113
@dataclass(frozen=True, eq=True, kw_only=True)

0 commit comments

Comments
 (0)