-
Notifications
You must be signed in to change notification settings - Fork 10
Enhancement: Dataset signed URLs and improved validation errors #652
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
a424d9a
7547864
a912a3e
eea7a48
2e60f1f
6769017
634658d
d8432e1
73250df
17c0444
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,3 +1,5 @@ | ||
| Get details of a specific dataset by ID. | ||
|
|
||
| Returns comprehensive dataset information including metadata (ID, name, item counts, duplication factor), Langfuse integration details (dataset ID), and the object store URL for the CSV file. | ||
|
|
||
| Use `include_signed_url=true` to include a time-limited signed URL for downloading the dataset CSV directly from object storage. The signed URL expires after 1 hour. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,30 +1,106 @@ | ||
| import re | ||
| from collections import defaultdict | ||
|
|
||
| from fastapi import FastAPI, Request, HTTPException | ||
| from fastapi.responses import JSONResponse | ||
| from fastapi.exceptions import RequestValidationError | ||
| from app.utils import APIResponse | ||
| from fastapi.responses import JSONResponse | ||
| from starlette.status import ( | ||
| HTTP_422_UNPROCESSABLE_ENTITY, | ||
| HTTP_500_INTERNAL_SERVER_ERROR, | ||
| ) | ||
|
|
||
| from app.utils import APIResponse | ||
|
|
||
| _BRANCH_PATTERN = re.compile(r"^[A-Z]|[\[\]()]") | ||
|
|
||
|
|
||
| def _is_branch_identifier(part: str) -> bool: | ||
| return bool(part and isinstance(part, str) and _BRANCH_PATTERN.search(part)) | ||
|
Comment on lines
+14
to
+18
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🌐 Web query:
💡 Result: In Pydantic v2,
What the
|
||
|
|
||
|
|
||
| def _filter_union_branch_errors(errors: list[dict]) -> list[dict]: | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what exactly do you mean by "branch" here
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here, “branch” refers to each possible model inside a CompletionConfig = Annotated[
Union[NativeCompletionConfig, KaapiCompletionConfig],
Field(discriminator="provider"),
]When a validation error occurs, Pydantic often returns errors from all union branches, even though the issue actually belongs to only one branch. This can result in confusing error messages. The Note: If errors genuinely occur in multiple branches, errors from multiple branches will still be returned.
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added the docstring as well for that function filter union branch |
||
| """When a field is a Union type, pydantic returns errors for every possible branch. | ||
|
|
||
| This function picks the branch where the validation error happend. | ||
| """ | ||
| try: | ||
| branch_errors: dict[str, dict[str, list[dict]]] = defaultdict( | ||
| lambda: defaultdict(list) | ||
| ) | ||
| non_union_errors: list[dict] = [] | ||
|
|
||
| for err in errors: | ||
| loc = err.get("loc", ()) | ||
| branch_name = None | ||
| parent_field = None | ||
| for i, part in enumerate(loc): | ||
| if _is_branch_identifier(part): | ||
| branch_name = part | ||
| parent_field = loc[:i] if i > 0 else ("root",) | ||
| break | ||
|
|
||
| if branch_name and parent_field: | ||
| branch_errors[str(parent_field)][branch_name].append(err) | ||
| else: | ||
| non_union_errors.append(err) | ||
|
|
||
| filtered = list(non_union_errors) | ||
| for _parent, branches in branch_errors.items(): | ||
| if len(branches) <= 1: | ||
| for errs in branches.values(): | ||
| filtered.extend(errs) | ||
| else: | ||
| # NOTE: Keep all branches tied for fewest literal errors | ||
| best_count = min( | ||
| sum(1 for e in errs if e.get("type") == "literal_error") | ||
| for errs in branches.values() | ||
| ) | ||
| for errs in branches.values(): | ||
| if ( | ||
| sum(1 for e in errs if e.get("type") == "literal_error") | ||
| <= best_count | ||
| ): | ||
| filtered.extend(errs) | ||
|
|
||
| for err in filtered: | ||
| loc = err.get("loc", ()) | ||
| err["loc"] = tuple(p for p in loc if not _is_branch_identifier(p)) | ||
|
|
||
| seen_errors: set[tuple] = set() | ||
| unique_errors: list[dict] = [] | ||
| for error in filtered: | ||
| error_key = (tuple(error.get("loc", ())), error.get("msg", "")) | ||
| if error_key not in seen_errors: | ||
| seen_errors.add(error_key) | ||
| unique_errors.append(error) | ||
|
|
||
| return unique_errors or errors | ||
| except Exception: | ||
| return errors | ||
|
|
||
|
|
||
| def register_exception_handlers(app: FastAPI): | ||
| def register_exception_handlers(app: FastAPI) -> None: | ||
| @app.exception_handler(RequestValidationError) | ||
| async def validation_error_handler(request: Request, exc: RequestValidationError): | ||
| async def validation_error_handler( | ||
| request: Request, exc: RequestValidationError | ||
| ) -> JSONResponse: | ||
| errors = _filter_union_branch_errors(exc.errors()) | ||
| return JSONResponse( | ||
| status_code=HTTP_422_UNPROCESSABLE_ENTITY, | ||
| content=APIResponse.failure_response(exc.errors()).model_dump(), | ||
| content=APIResponse.failure_response(errors).model_dump(), | ||
| ) | ||
|
|
||
| @app.exception_handler(HTTPException) | ||
| async def http_exception_handler(request: Request, exc: HTTPException): | ||
| async def http_exception_handler( | ||
| request: Request, exc: HTTPException | ||
| ) -> JSONResponse: | ||
| return JSONResponse( | ||
| status_code=exc.status_code, | ||
| content=APIResponse.failure_response(exc.detail).model_dump(), | ||
| ) | ||
|
|
||
| @app.exception_handler(Exception) | ||
| async def generic_error_handler(request: Request, exc: Exception): | ||
| async def generic_error_handler(request: Request, exc: Exception) -> JSONResponse: | ||
| return JSONResponse( | ||
| status_code=HTTP_500_INTERNAL_SERVER_ERROR, | ||
| content=APIResponse.failure_response( | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing error handling for cloud storage failures.
According to `backend/app/core/cloud/storage.py:267-284`, `get_cloud_storage()` can raise `ValueError` for invalid projects or propagate exceptions when AWS initialization fails. If cloud storage is not configured (e.g., in development environments without AWS credentials), this will cause a 500 error even when the dataset exists. Consider gracefully handling these failures by returning the dataset without the signed URL instead of failing the entire request.
🛡️ Proposed fix with error handling
signed_url = None if include_signed_url and dataset.object_store_url: - storage = get_cloud_storage( - session=_session, project_id=auth_context.project_.id - ) - signed_url = storage.get_signed_url(dataset.object_store_url) + try: + storage = get_cloud_storage( + session=_session, project_id=auth_context.project_.id + ) + signed_url = storage.get_signed_url(dataset.object_store_url) + except Exception as e: + logger.warning( + f"[get_dataset] Failed to generate signed URL | dataset_id={dataset_id} | error={str(e)}" + ) + # Continue without signed_url rather than failing the request📝 Committable suggestion
🤖 Prompt for AI Agents