diff --git a/backend/app/alembic/versions/eed36ae3c79a_alter_doc_transform_table_for_celery.py b/backend/app/alembic/versions/eed36ae3c79a_alter_doc_transform_table_for_celery.py new file mode 100644 index 000000000..5f3ff2151 --- /dev/null +++ b/backend/app/alembic/versions/eed36ae3c79a_alter_doc_transform_table_for_celery.py @@ -0,0 +1,39 @@ +"""alter doc transform table for celery + +Revision ID: eed36ae3c79a +Revises: ecda6b144627 +Create Date: 2025-11-12 20:08:39.774862 + +""" +from alembic import op +import sqlalchemy as sa +import sqlmodel.sql.sqltypes +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "eed36ae3c79a" +down_revision = "ecda6b144627" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "doc_transformation_job", + sa.Column("task_id", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + ) + op.add_column( + "doc_transformation_job", + sa.Column("trace_id", sqlmodel.sql.sqltypes.AutoString(), nullable=True), + ) + op.alter_column( + "doc_transformation_job", "created_at", new_column_name="inserted_at" + ) + + +def downgrade(): + op.alter_column( + "doc_transformation_job", "inserted_at", new_column_name="created_at" + ) + op.drop_column("doc_transformation_job", "trace_id") + op.drop_column("doc_transformation_job", "task_id") diff --git a/backend/app/api/main.py b/backend/app/api/main.py index ac6fd2863..c071a9e1c 100644 --- a/backend/app/api/main.py +++ b/backend/app/api/main.py @@ -5,8 +5,8 @@ assistants, collections, config, - documents, doc_transformation_job, + documents, login, llm, organization, diff --git a/backend/app/api/routes/doc_transformation_job.py b/backend/app/api/routes/doc_transformation_job.py index fa40769b5..dac5e131a 100644 --- a/backend/app/api/routes/doc_transformation_job.py +++ b/backend/app/api/routes/doc_transformation_job.py @@ -1,45 +1,97 @@ from uuid import UUID +import logging -from fastapi import APIRouter, HTTPException, Query, Path 
as FastPath +from fastapi import APIRouter, HTTPException, Query, Path from app.api.deps import CurrentUserOrgProject, SessionDep -from app.crud.doc_transformation_job import DocTransformationJobCrud -from app.models import DocTransformationJob, DocTransformationJobs +from app.crud import DocTransformationJobCrud, DocumentCrud +from app.models import ( + DocTransformationJobPublic, + DocTransformationJobsPublic, + TransformedDocumentPublic, +) from app.utils import APIResponse +from app.services.documents.helpers import build_job_schema, build_job_schemas +from app.core.cloud import get_cloud_storage + -router = APIRouter(prefix="/documents/transformations", tags=["doc_transformation_job"]) +logger = logging.getLogger(__name__) +router = APIRouter(prefix="/documents/transformation", tags=["documents"]) @router.get( "/{job_id}", description="Get the status and details of a document transformation job.", - response_model=APIResponse[DocTransformationJob], + response_model=APIResponse[DocTransformationJobPublic], ) def get_transformation_job( session: SessionDep, current_user: CurrentUserOrgProject, - job_id: UUID = FastPath(description="Transformation job ID"), + job_id: UUID = Path(..., description="Transformation job ID"), + include_url: bool = Query( + False, description="Include a signed URL for the transformed document" + ), ): - crud = DocTransformationJobCrud(session, current_user.project_id) - job = crud.read_one(job_id) - return APIResponse.success_response(job) + job_crud = DocTransformationJobCrud(session, current_user.project_id) + doc_crud = DocumentCrud(session, current_user.project_id) + + job = job_crud.read_one(job_id) + storage = ( + get_cloud_storage(session=session, project_id=current_user.project_id) + if include_url + else None + ) + + job_schema = build_job_schema( + job=job, + doc_crud=doc_crud, + include_url=include_url, + storage=storage, + ) + return APIResponse.success_response(job_schema) @router.get( "/", description="Get the status and 
details of multiple document transformation jobs by IDs.", - response_model=APIResponse[DocTransformationJobs], + response_model=APIResponse[DocTransformationJobsPublic], ) def get_multiple_transformation_jobs( session: SessionDep, current_user: CurrentUserOrgProject, job_ids: list[UUID] = Query( - description="List of transformation job IDs", min=1, max_length=100 + ..., + description="List of transformation job IDs", + min_length=1, + max_length=100, + ), + include_url: bool = Query( + False, description="Include a signed URL for each transformed document" ), ): - crud = DocTransformationJobCrud(session, project_id=current_user.project_id) - jobs = crud.read_each(set(job_ids)) - jobs_not_found = set(job_ids) - {job.id for job in jobs} + job_crud = DocTransformationJobCrud(session, project_id=current_user.project_id) + doc_crud = DocumentCrud(session, project_id=current_user.project_id) + + jobs = job_crud.read_each(set(job_ids)) + jobs_found_ids = {job.id for job in jobs} + jobs_not_found = set(job_ids) - jobs_found_ids + + storage = ( + get_cloud_storage(session=session, project_id=current_user.project_id) + if include_url + else None + ) + + job_schemas = build_job_schemas( + jobs=jobs, + doc_crud=doc_crud, + include_url=include_url, + storage=storage, + ) + return APIResponse.success_response( - DocTransformationJobs(jobs=jobs, jobs_not_found=list(jobs_not_found)) + DocTransformationJobsPublic( + jobs=job_schemas, + jobs_not_found=list(jobs_not_found), + ) ) diff --git a/backend/app/api/routes/documents.py b/backend/app/api/routes/documents.py index ed5a431ce..3b93946ca 100644 --- a/backend/app/api/routes/documents.py +++ b/backend/app/api/routes/documents.py @@ -1,13 +1,12 @@ import logging from pathlib import Path +from typing import Union from uuid import UUID, uuid4 from fastapi import ( APIRouter, - BackgroundTasks, File, Form, - HTTPException, Query, UploadFile, ) @@ -15,23 +14,23 @@ from app.api.deps import CurrentUserOrgProject, SessionDep from 
app.core.cloud import get_cloud_storage -from app.core.doctransform import service as transformation_service -from app.core.doctransform.registry import ( - get_available_transformers, - get_file_format, - is_transformation_supported, - resolve_transformer, -) from app.crud import CollectionCrud, DocumentCrud from app.crud.rag import OpenAIAssistantCrud, OpenAIVectorStoreCrud from app.models import ( Document, DocumentPublic, + TransformedDocumentPublic, DocumentUploadResponse, Message, TransformationJobInfo, ) from app.services.collections.helpers import pick_service_for_documennt +from app.services.documents.helpers import ( + schedule_transformation, + pre_transform_validation, + build_document_schema, + build_document_schemas, +) from app.utils import APIResponse, get_openai_client, load_description @@ -40,74 +39,65 @@ @router.get( - "/list", + "/", description=load_description("documents/list.md"), - response_model=APIResponse[list[DocumentPublic]], + response_model=APIResponse[list[Union[DocumentPublic, TransformedDocumentPublic]]], ) def list_docs( session: SessionDep, current_user: CurrentUserOrgProject, skip: int = Query(0, ge=0), limit: int = Query(100, gt=0, le=100), + include_url: bool = Query( + False, description="Include a signed URL to access each document" + ), ): crud = DocumentCrud(session, current_user.project_id) - data = crud.read_many(skip, limit) - return APIResponse.success_response(data) + documents = crud.read_many(skip, limit) + + storage = ( + get_cloud_storage(session=session, project_id=current_user.project_id) + if include_url and documents + else None + ) + + results = build_document_schemas( + documents=documents, + include_url=include_url, + storage=storage, + ) + return APIResponse.success_response(results) @router.post( - "/upload", + "/", description=load_description("documents/upload.md"), response_model=APIResponse[DocumentUploadResponse], ) async def upload_doc( session: SessionDep, current_user: CurrentUserOrgProject, - 
background_tasks: BackgroundTasks, src: UploadFile = File(...), target_format: str | None = Form( None, - description="Desired output format for the uploaded document (e.g., pdf, docx, txt). ", + description="Desired output format for the uploaded document (e.g., pdf, docx, txt).", ), transformer: str | None = Form( - None, description="Name of the transformer to apply when converting. " + None, description="Name of the transformer to apply when converting." ), + callback_url: str + | None = Form(None, description="URL to call to report endpoint status"), ): - # Determine source file format - try: - source_format = get_file_format(src.filename) - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - - # validate if transformation is possible or not - if target_format: - if not is_transformation_supported(source_format, target_format): - raise HTTPException( - status_code=400, - detail=f"Transformation from {source_format} to {target_format} is not supported", - ) - - # Resolve the transformer to use - if not transformer: - transformer = "default" - try: - actual_transformer = resolve_transformer( - source_format, target_format, transformer - ) - except ValueError as e: - available_transformers = get_available_transformers( - source_format, target_format - ) - raise HTTPException( - status_code=400, - detail=f"{str(e)}. 
Available transformers: {list(available_transformers.keys())}", - ) + source_format, actual_transformer = pre_transform_validation( + src_filename=src.filename, + target_format=target_format, + transformer=transformer, + ) storage = get_cloud_storage(session=session, project_id=current_user.project_id) document_id = uuid4() - object_store_url = storage.put(src, Path(str(document_id))) crud = DocumentCrud(session, current_user.project_id) @@ -118,24 +108,16 @@ async def upload_doc( ) source_document = crud.update(document) - job_info: TransformationJobInfo | None = None - if target_format and actual_transformer: - job_id = transformation_service.start_job( - db=session, - current_user=current_user, - source_document_id=source_document.id, - transformer_name=actual_transformer, - target_format=target_format, - background_tasks=background_tasks, - ) - job_info = TransformationJobInfo( - message=f"Document accepted for transformation from {source_format} to {target_format}.", - job_id=str(job_id), - source_format=source_format, - target_format=target_format, - transformer=actual_transformer, - status_check_url=f"/documents/transformations/{job_id}", - ) + job_info: TransformationJobInfo | None = schedule_transformation( + session=session, + project_id=current_user.project_id, + current_user=current_user, + source_format=source_format, + target_format=target_format, + actual_transformer=actual_transformer, + source_document_id=source_document.id, + callback_url=callback_url, + ) document_schema = DocumentPublic.model_validate( source_document, from_attributes=True @@ -143,15 +125,16 @@ async def upload_doc( document_schema.signed_url = storage.get_signed_url( source_document.object_store_url ) + response = DocumentUploadResponse( - **document_schema.model_dump(), transformation_job=job_info + **document_schema.model_dump(), + transformation_job=job_info, ) - return APIResponse.success_response(response) @router.delete( - "/remove/{doc_id}", + "/{doc_id}", 
description=load_description("documents/delete.md"), response_model=APIResponse[Message], ) @@ -182,7 +165,7 @@ def remove_doc( @router.delete( - "/remove/{doc_id}/permanent", + "/{doc_id}/permanent", description=load_description("documents/permanent_delete.md"), response_model=APIResponse[Message], ) @@ -216,9 +199,9 @@ def permanent_delete_doc( @router.get( - "/info/{doc_id}", + "/{doc_id}", description=load_description("documents/info.md"), - response_model=APIResponse[DocumentPublic], + response_model=APIResponse[Union[DocumentPublic, TransformedDocumentPublic]], ) def doc_info( session: SessionDep, @@ -231,9 +214,15 @@ def doc_info( crud = DocumentCrud(session, current_user.project_id) document = crud.read_one(doc_id) - doc_schema = DocumentPublic.model_validate(document, from_attributes=True) - if include_url: - storage = get_cloud_storage(session=session, project_id=current_user.project_id) - doc_schema.signed_url = storage.get_signed_url(document.object_store_url) + storage = ( + get_cloud_storage(session=session, project_id=current_user.project_id) + if include_url + else None + ) + doc_schema = build_document_schema( + document=document, + include_url=include_url, + storage=storage, + ) return APIResponse.success_response(doc_schema) diff --git a/backend/app/api/routes/fine_tuning.py b/backend/app/api/routes/fine_tuning.py index 203d6907a..33e14cbe7 100644 --- a/backend/app/api/routes/fine_tuning.py +++ b/backend/app/api/routes/fine_tuning.py @@ -18,7 +18,7 @@ ModelEvaluationStatus, ) from app.core.cloud import get_cloud_storage -from app.crud.document import DocumentCrud +from app.crud.document.document import DocumentCrud from app.utils import ( get_openai_client, handle_openai_error, diff --git a/backend/app/celery/tasks/job_execution.py b/backend/app/celery/tasks/job_execution.py index 6464d2cfd..8274de960 100644 --- a/backend/app/celery/tasks/job_execution.py +++ b/backend/app/celery/tasks/job_execution.py @@ -22,7 +22,7 @@ def 
execute_high_priority_task( Use this for urgent operations that need immediate processing. Args: - function_path: Import path to the execute_job function (e.g., "app.core.doctransform.service.execute_job") + function_path: Import path to the execute_job function (e.g., "app.services.doctransform.service.execute_job") project_id: ID of the project executing the job job_id: ID of the job (should already exist in database) trace_id: Trace/correlation ID to preserve context across Celery tasks @@ -47,7 +47,7 @@ def execute_low_priority_task( Use this for background operations that can wait. Args: - function_path: Import path to the execute_job function (e.g., "app.core.doctransform.service.execute_job") + function_path: Import path to the execute_job function (e.g., "app.services.doctransform.service.execute_job") project_id: ID of the project executing the job job_id: ID of the job (should already exist in database) trace_id: Trace/correlation ID to preserve context across Celery tasks diff --git a/backend/app/celery/utils.py b/backend/app/celery/utils.py index 5e0d485a4..957c02d9a 100644 --- a/backend/app/celery/utils.py +++ b/backend/app/celery/utils.py @@ -22,7 +22,7 @@ def start_high_priority_job( Start a high priority job using Celery. Args: - function_path: Import path to the execute_job function (e.g., "app.core.doctransform.service.execute_job") + function_path: Import path to the execute_job function (e.g., "app.services.doctransform.service.execute_job") project_id: ID of the project executing the job job_id: ID of the job (should already exist in database) trace_id: Trace/correlation ID to preserve context across Celery tasks @@ -50,7 +50,7 @@ def start_low_priority_job( Start a low priority job using Celery. 
Args: - function_path: Import path to the execute_job function (e.g., "app.core.doctransform.service.execute_job") + function_path: Import path to the execute_job function (e.g., "app.services.doctransform.service.execute_job") project_id: ID of the project executing the job job_id: ID of the job (should already exist in database) trace_id: Trace/correlation ID to preserve context across Celery tasks diff --git a/backend/app/core/doctransform/service.py b/backend/app/core/doctransform/service.py deleted file mode 100644 index e119492f8..000000000 --- a/backend/app/core/doctransform/service.py +++ /dev/null @@ -1,146 +0,0 @@ -import tempfile -import shutil -import logging -from pathlib import Path -from uuid import uuid4, UUID - -from fastapi import BackgroundTasks, UploadFile -from tenacity import retry, wait_exponential, stop_after_attempt -from sqlmodel import Session -from starlette.datastructures import Headers - -from app.crud.doc_transformation_job import DocTransformationJobCrud -from app.crud.document import DocumentCrud -from app.models.document import Document -from app.models.doc_transformation_job import TransformationStatus -from app.models import User -from app.core.cloud import get_cloud_storage -from app.api.deps import CurrentUserOrgProject -from app.core.doctransform.registry import convert_document, FORMAT_TO_EXTENSION -from app.core.db import engine - -logger = logging.getLogger(__name__) - - -def start_job( - db: Session, - current_user: CurrentUserOrgProject, - source_document_id: UUID, - transformer_name: str, - target_format: str, - background_tasks: BackgroundTasks, -) -> UUID: - job_crud = DocTransformationJobCrud(session=db, project_id=current_user.project_id) - job = job_crud.create(source_document_id=source_document_id) - - # Extract the project ID before passing to background task - project_id = current_user.project_id - background_tasks.add_task( - execute_job, project_id, job.id, transformer_name, target_format - ) - logger.info( - 
f"[start_job] Job scheduled for document transformation | id: {job.id}, project_id: {project_id}" - ) - return job.id - - -@retry(wait=wait_exponential(multiplier=5, min=5, max=10), stop=stop_after_attempt(3)) -def execute_job( - project_id: int, - job_id: UUID, - transformer_name: str, - target_format: str, -): - tmp_dir: Path | None = None - try: - logger.info( - f"[execute_job started] Transformation Job started | job_id={job_id} | transformer_name={transformer_name} | target_format={target_format} | project_id={project_id}" - ) - - # Update job status to PROCESSING and fetch source document info - with Session(engine) as db: - job_crud = DocTransformationJobCrud(session=db, project_id=project_id) - job = job_crud.update_status(job_id, TransformationStatus.PROCESSING) - - doc_crud = DocumentCrud(session=db, project_id=project_id) - - source_doc = doc_crud.read_one(job.source_document_id) - - source_doc_id = source_doc.id - source_doc_fname = source_doc.fname - source_doc_object_store_url = source_doc.object_store_url - - storage = get_cloud_storage(session=db, project_id=project_id) - - # Download and transform document - body = storage.stream(source_doc_object_store_url) - tmp_dir = Path(tempfile.mkdtemp()) - tmp_in = tmp_dir / f"{source_doc_id}" - with open(tmp_in, "wb") as f: - shutil.copyfileobj(body, f) - - # prepare output file path - fname_no_ext = Path(source_doc_fname).stem - target_extension = FORMAT_TO_EXTENSION.get(target_format, f".{target_format}") - transformed_doc_id = uuid4() - tmp_out = tmp_dir / f"{fname_no_ext}{target_extension}" - - # transform document - now returns the output file path - convert_document(tmp_in, tmp_out, transformer_name) - - # Determine content type based on target format - content_type_map = {"markdown": "text/markdown; charset=utf-8"} - content_type = content_type_map.get(target_format, "text/plain") - - # upload transformed file and create document record - with open(tmp_out, "rb") as fobj: - file_upload = UploadFile( 
- filename=tmp_out.name, - file=fobj, - headers=Headers({"content-type": content_type}), - ) - dest = storage.put(file_upload, Path(str(transformed_doc_id))) - - # create new Document record - with Session(engine) as db: - new_doc = Document( - id=transformed_doc_id, - project_id=project_id, - fname=tmp_out.name, - object_store_url=str(dest), - source_document_id=source_doc_id, - ) - created = DocumentCrud(db, project_id).update(new_doc) - - job_crud = DocTransformationJobCrud(session=db, project_id=project_id) - job_crud.update_status( - job_id, - TransformationStatus.COMPLETED, - transformed_document_id=created.id, - ) - - logger.info( - f"[execute_job] Doc Transformation job completed | job_id={job_id} | transformed_doc_id={created.id} | project_id={project_id}" - ) - - except Exception as e: - logger.error( - f"Transformation job failed | job_id={job_id} | error={e}", exc_info=True - ) - try: - with Session(engine) as db: - job_crud = DocTransformationJobCrud(session=db, project_id=project_id) - job_crud.update_status( - job_id, TransformationStatus.FAILED, error_message=str(e) - ) - logger.info( - f"[execute_job] Doc Transformation job failed | job_id={job_id} | error={e}" - ) - except Exception as db_error: - logger.error( - f"Failed to update job status to FAILED | job_id={job_id} | db_error={db_error}" - ) - raise - finally: - if tmp_dir and tmp_dir.exists(): - shutil.rmtree(tmp_dir) diff --git a/backend/app/crud/__init__.py b/backend/app/crud/__init__.py index be5d36a40..658193106 100644 --- a/backend/app/crud/__init__.py +++ b/backend/app/crud/__init__.py @@ -6,9 +6,9 @@ ) from .collection.collection import CollectionCrud from .collection.collection_job import CollectionJobCrud -from .document import DocumentCrud +from .document.document import DocumentCrud from .document_collection import DocumentCollectionCrud -from .doc_transformation_job import DocTransformationJobCrud +from .document.doc_transformation_job import DocTransformationJobCrud from .jobs 
import JobCrud from .organization import ( diff --git a/backend/app/crud/document/__init__.py b/backend/app/crud/document/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/backend/app/crud/doc_transformation_job.py b/backend/app/crud/document/doc_transformation_job.py similarity index 71% rename from backend/app/crud/doc_transformation_job.py rename to backend/app/crud/document/doc_transformation_job.py index 7edf5f8dd..0fd278013 100644 --- a/backend/app/crud/doc_transformation_job.py +++ b/backend/app/crud/document/doc_transformation_job.py @@ -1,9 +1,16 @@ import logging from uuid import UUID from typing import List, Optional -from sqlmodel import Session, select, and_, join + +from sqlmodel import Session, select, and_ + from app.crud import DocumentCrud -from app.models import DocTransformationJob, TransformationStatus +from app.models import ( + DocTransformationJob, + TransformationStatus, + DocTransformJobCreate, + DocTransformJobUpdate, +) from app.models.document import Document from app.core.util import now from app.core.exception_handlers import HTTPException @@ -16,16 +23,13 @@ def __init__(self, session: Session, project_id: int): self.session = session self.project_id = project_id - def create(self, source_document_id: UUID) -> DocTransformationJob: - # Ensure the source document exists and is not deleted - DocumentCrud(self.session, self.project_id).read_one(source_document_id) - - job = DocTransformationJob(source_document_id=source_document_id) + def create(self, payload: DocTransformJobCreate) -> DocTransformationJob: + job = DocTransformationJob(**payload.model_dump()) self.session.add(job) self.session.commit() self.session.refresh(job) logger.info( - f"[DocTransformationJobCrud.create] Created new transformation job | id: {job.id}, source_document_id: {source_document_id}" + f"[DocTransformationJobCrud.create] Created new transformation job | id: {job.id}, source_document_id: {job.source_document_id}" ) return job @@ 
-66,26 +70,26 @@ def read_each(self, job_ids: set[UUID]) -> list[DocTransformationJob]: jobs = self.session.exec(statement).all() return jobs - def update_status( + def update( self, job_id: UUID, - status: TransformationStatus, - *, - error_message: Optional[str] = None, - transformed_document_id: Optional[UUID] = None, + patch: DocTransformJobUpdate, ) -> DocTransformationJob: + """Update an existing doc transformation job and return the updated row.""" job = self.read_one(job_id) - job.status = status + + # Only apply fields that were explicitly set and not None + changes = patch.model_dump(exclude_unset=True, exclude_none=True) + for field, value in changes.items(): + setattr(job, field, value) + job.updated_at = now() - if error_message is not None: - job.error_message = error_message - if transformed_document_id is not None: - job.transformed_document_id = transformed_document_id self.session.add(job) self.session.commit() self.session.refresh(job) + logger.info( - f"[DocTransformationJobCrud.update_status] Updated job status | id: {job.id}, status: {status}" + f"[DocTransformationJobCrud.update] Updated job | id: {job.id}" ) return job diff --git a/backend/app/crud/document.py b/backend/app/crud/document/document.py similarity index 99% rename from backend/app/crud/document.py rename to backend/app/crud/document/document.py index 8703d56a8..1443e385a 100644 --- a/backend/app/crud/document.py +++ b/backend/app/crud/document/document.py @@ -1,6 +1,5 @@ import logging from uuid import UUID -from typing import Optional, List from sqlmodel import Session, select, and_ diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index 06081464a..d2246e75a 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -45,13 +45,17 @@ from .document import ( Document, DocumentPublic, + DocTransformationJobPublic, + DocTransformationJobsPublic, + TransformedDocumentPublic, DocumentUploadResponse, 
TransformationJobInfo, ) from .doc_transformation_job import ( DocTransformationJob, - DocTransformationJobs, TransformationStatus, + DocTransformJobCreate, + DocTransformJobUpdate, ) from .document_collection import DocumentCollection diff --git a/backend/app/models/doc_transformation_job.py b/backend/app/models/doc_transformation_job.py index b968d8837..139825ee0 100644 --- a/backend/app/models/doc_transformation_job.py +++ b/backend/app/models/doc_transformation_job.py @@ -1,8 +1,10 @@ import enum from uuid import UUID, uuid4 -from typing import Optional from datetime import datetime + from sqlmodel import SQLModel, Field +from pydantic import ConfigDict + from app.core.util import now @@ -18,15 +20,40 @@ class DocTransformationJob(SQLModel, table=True): id: UUID = Field(default_factory=uuid4, primary_key=True) source_document_id: UUID = Field(foreign_key="document.id") - transformed_document_id: Optional[UUID] = Field( + transformed_document_id: UUID | None = Field( default=None, foreign_key="document.id" ) status: TransformationStatus = Field(default=TransformationStatus.PENDING) - error_message: Optional[str] = Field(default=None) - created_at: datetime = Field(default_factory=now) + task_id: str | None = Field(default=None, nullable=True) + trace_id: str | None = Field( + default=None, description="Tracing ID for correlating logs and traces." 
+ ) + error_message: str | None = Field(default=None) + inserted_at: datetime = Field(default_factory=now) updated_at: datetime = Field(default_factory=now) + @property + def job_id(self) -> UUID: + return self.id + + @property + def job_inserted_at(self) -> datetime: + return self.inserted_at + + @property + def job_updated_at(self) -> datetime: + return self.updated_at + + +class DocTransformJobCreate(SQLModel): + source_document_id: UUID + + model_config = ConfigDict(extra="forbid") + -class DocTransformationJobs(SQLModel): - jobs: list[DocTransformationJob] - jobs_not_found: list[UUID] +class DocTransformJobUpdate(SQLModel): + transformed_document_id: UUID | None = None + task_id: str | None = None + status: TransformationStatus | None = None + error_message: str | None = None + trace_id: str | None = None diff --git a/backend/app/models/document.py b/backend/app/models/document.py index 16d77a174..60d281423 100644 --- a/backend/app/models/document.py +++ b/backend/app/models/document.py @@ -5,6 +5,7 @@ from sqlmodel import Field, SQLModel from app.core.util import now +from app.models.doc_transformation_job import TransformationStatus class DocumentBase(SQLModel): @@ -51,20 +52,22 @@ class DocumentPublic(DocumentBase): updated_at: datetime = Field( description="The timestamp when the document was last updated" ) + signed_url: str | None = Field( + default=None, description="A signed URL for accessing the document" + ) + + +class TransformedDocumentPublic(DocumentPublic): source_document_id: UUID | None = Field( default=None, description="The ID of the source document if this document is a transformation", ) - signed_url: str | None = Field( - default=None, description="A signed URL for accessing the document" - ) class TransformationJobInfo(SQLModel): message: str job_id: UUID = Field(description="The unique identifier of the transformation job") - source_format: str = Field(description="The format of the source document") - target_format: str = 
Field(description="The format of the target document") + status: TransformationStatus transformer: str = Field(description="The name of the transformer used") status_check_url: str = Field( description="The URL to check the status of the transformation job" @@ -74,3 +77,16 @@ class TransformationJobInfo(SQLModel): class DocumentUploadResponse(DocumentPublic): signed_url: str = Field(description="A signed URL for accessing the document") transformation_job: TransformationJobInfo | None = None + + +class DocTransformationJobPublic(SQLModel): + job_id: UUID + source_document_id: UUID + status: TransformationStatus + transformed_document: TransformedDocumentPublic | None = None + error_message: str | None = None + + +class DocTransformationJobsPublic(SQLModel): + jobs: list[DocTransformationJobPublic] + jobs_not_found: list[UUID] diff --git a/backend/app/services/collections/helpers.py b/backend/app/services/collections/helpers.py index 5ca1c6759..795b04cd6 100644 --- a/backend/app/services/collections/helpers.py +++ b/backend/app/services/collections/helpers.py @@ -8,7 +8,7 @@ from sqlmodel import select from openai import OpenAIError -from app.crud.document import DocumentCrud +from app.crud.document.document import DocumentCrud from app.models import DocumentCollection, Collection diff --git a/backend/app/services/doctransform/job.py b/backend/app/services/doctransform/job.py new file mode 100644 index 000000000..8245b213b --- /dev/null +++ b/backend/app/services/doctransform/job.py @@ -0,0 +1,270 @@ +import tempfile +import shutil +import time +import logging +from pathlib import Path +from uuid import uuid4, UUID + +from fastapi import UploadFile +from tenacity import retry, wait_exponential, stop_after_attempt +from sqlmodel import Session +from asgi_correlation_id import correlation_id +from starlette.datastructures import Headers + +from app.crud.document.doc_transformation_job import DocTransformationJobCrud +from app.crud.document.document import DocumentCrud 
+from app.models import ( + Document, + DocTransformJobUpdate, + TransformationStatus, + DocTransformationJobPublic, + TransformedDocumentPublic, + DocTransformationJob, +) +from app.core.cloud import get_cloud_storage +from app.api.deps import CurrentUserOrgProject +from app.celery.utils import start_low_priority_job +from app.utils import send_callback, APIResponse +from app.services.doctransform.registry import convert_document, FORMAT_TO_EXTENSION +from app.core.db import engine + +logger = logging.getLogger(__name__) + + +def start_job( + db: Session, + current_user: CurrentUserOrgProject, + job_id: UUID, + transformer_name: str, + target_format: str, + callback_url: str | None, +) -> UUID: + trace_id = correlation_id.get() or "N/A" + job_crud = DocTransformationJobCrud(db, project_id=current_user.project_id) + job_crud.update(job_id, DocTransformJobUpdate(trace_id=trace_id)) + job = job_crud.read_one(job_id) + + project_id = current_user.project_id + + task_id = start_low_priority_job( + function_path="app.services.doctransform.job.execute_job", + project_id=project_id, + job_id=str(job.id), + source_document_id=str(job.source_document_id), + trace_id=trace_id, + transformer_name=transformer_name, + target_format=target_format, + callback_url=callback_url, + ) + + logger.info( + f"[start_job] Job scheduled for document transformation | id: {job.id}, project_id: {project_id}, task_id: {task_id}" + ) + return job.id + + +def build_success_payload( + job: DocTransformationJob, + transformed_doc: TransformedDocumentPublic, +): + """ + { + "success": true, + "data": { job fields + transformed_document (full) }, + "error": null, + "metadata": null + } + """ + transformed_public = TransformedDocumentPublic.model_validate(transformed_doc) + job_public = DocTransformationJobPublic.model_validate( + job, + update={"transformed_document": transformed_public}, + ) + # keep error_message out of the data envelope + return APIResponse.success_response(job_public).model_dump(
+ mode="json", exclude={"data": {"error_message"}} + ) + + +def build_failure_payload(job: DocTransformationJob, error_message: str): + """ + { + "success": false, + "data": { job fields, transformed_document: null }, + "error": "something went wrong", + "metadata": null + } + """ + # ensure transformed_document is explicitly null in the payload + job_public = DocTransformationJobPublic.model_validate( + job, + update={"transformed_document": None}, + ) + return APIResponse.failure_response(error_message, job_public).model_dump( + mode="json", + exclude={"data": {"error_message"}}, + ) + + +@retry(wait=wait_exponential(multiplier=5, min=5, max=10), stop=stop_after_attempt(3)) +def execute_job( + project_id: int, + job_id: str, + source_document_id: str, + transformer_name: str, + target_format: str, + task_id: str, + callback_url: str | None, + task_instance, +): + start_time = time.time() + tmp_dir: Path | None = None + + job_for_payload = None # keep latest job snapshot for payloads + + try: + job_uuid = UUID(job_id) + source_uuid = UUID(source_document_id) + + logger.info( + "[doc_transform.execute_job] started | job_id=%s | transformer=%s | target=%s | project_id=%s", + job_uuid, + transformer_name, + target_format, + project_id, + ) + + with Session(engine) as db: + job_crud = DocTransformationJobCrud(session=db, project_id=project_id) + job_for_payload = job_crud.update( + job_uuid, + DocTransformJobUpdate( + status=TransformationStatus.PROCESSING, task_id=task_id + ), + ) + + doc_crud = DocumentCrud(session=db, project_id=project_id) + source_doc = doc_crud.read_one(source_uuid) + + source_doc_id = source_doc.id + source_doc_fname = source_doc.fname + source_doc_object_store_url = source_doc.object_store_url + + storage = get_cloud_storage(session=db, project_id=project_id) + + # --- download source --- + body = storage.stream(source_doc_object_store_url) + tmp_dir = Path(tempfile.mkdtemp()) + tmp_in = tmp_dir / f"{source_doc_id}" + with open(tmp_in, "wb") 
as f: + shutil.copyfileobj(body, f) + + # --- transform --- + fname_no_ext = Path(source_doc_fname).stem + target_extension = FORMAT_TO_EXTENSION.get(target_format, f".{target_format}") + transformed_doc_id = uuid4() + tmp_out = tmp_dir / f"{fname_no_ext}{target_extension}" + + convert_document(tmp_in, tmp_out, transformer_name) + + # --- upload transformed file --- + content_type_map = {"markdown": "text/markdown; charset=utf-8"} + content_type = content_type_map.get(target_format, "text/plain") + + with open(tmp_out, "rb") as fobj: + file_upload = UploadFile( + filename=tmp_out.name, + file=fobj, + headers=Headers({"content-type": content_type}), + ) + dest = storage.put(file_upload, Path(str(transformed_doc_id))) + + with Session(engine) as db: + new_doc = Document( + id=transformed_doc_id, + project_id=project_id, + fname=tmp_out.name, + object_store_url=str(dest), + source_document_id=source_doc_id, + ) + created = DocumentCrud(db, project_id).update(new_doc) + + job_crud = DocTransformationJobCrud(session=db, project_id=project_id) + job_for_payload = job_crud.update( + job_uuid, + DocTransformJobUpdate( + status=TransformationStatus.COMPLETED, + transformed_document_id=created.id, + ), + ) + + signed_url = None + try: + get_signed_url = getattr(storage, "get_signed_url", None) + if callable(get_signed_url): + signed_url = get_signed_url(created.object_store_url) + except Exception as e: + logger.warning( + "[doc_transform] failed to generate signed URL for doc %s: %s", + created.id, + e, + ) + + transformed_public = TransformedDocumentPublic.model_validate( + created, + update={"signed_url": signed_url} if signed_url else None, + ) + + success_payload = build_success_payload(job_for_payload, transformed_public) + + elapsed = time.time() - start_time + logger.info( + "[doc_transform.execute_job] completed | job_id=%s | transformed_doc_id=%s | time=%.2fs", + job_uuid, + created.id, + elapsed, + ) + + if callback_url: + send_callback(callback_url, 
success_payload) + + except Exception as e: + logger.error( + "[doc_transform.execute_job] FAILED | job_id=%s | error=%s", + job_uuid, + e, + exc_info=True, + ) + + try: + with Session(engine) as db: + job_crud = DocTransformationJobCrud(session=db, project_id=project_id) + job_for_payload = job_crud.update( + job_uuid, + DocTransformJobUpdate( + status=TransformationStatus.FAILED, error_message=str(e) + ), + ) + except Exception as db_error: + logger.error( + "[doc_transform.execute_job] failed to persist FAILED status | job_id=%s | db_error=%s", + job_uuid, + db_error, + ) + + if callback_url and job_for_payload: + try: + failure_payload = build_failure_payload(job_for_payload, str(e)) + send_callback(callback_url, failure_payload) + except Exception as cb_error: + logger.error( + "[doc_transform.execute_job] callback failed | job_id=%s | error=%s", + job_uuid, + cb_error, + ) + + # bubble up for caller/infra + raise + finally: + if tmp_dir and tmp_dir.exists(): + shutil.rmtree(tmp_dir) diff --git a/backend/app/core/doctransform/registry.py b/backend/app/services/doctransform/registry.py similarity index 96% rename from backend/app/core/doctransform/registry.py rename to backend/app/services/doctransform/registry.py index b2ba380f5..29c2fc251 100644 --- a/backend/app/core/doctransform/registry.py +++ b/backend/app/services/doctransform/registry.py @@ -1,7 +1,7 @@ from pathlib import Path -from app.core.doctransform.transformer import Transformer -from app.core.doctransform.zerox_transformer import ZeroxTransformer +from app.services.doctransform.transformer import Transformer +from app.services.doctransform.zerox_transformer import ZeroxTransformer class TransformationError(Exception): diff --git a/backend/app/core/doctransform/transformer.py b/backend/app/services/doctransform/transformer.py similarity index 100% rename from backend/app/core/doctransform/transformer.py rename to backend/app/services/doctransform/transformer.py diff --git 
a/backend/app/core/doctransform/zerox_transformer.py b/backend/app/services/doctransform/zerox_transformer.py similarity index 97% rename from backend/app/core/doctransform/zerox_transformer.py rename to backend/app/services/doctransform/zerox_transformer.py index a69f31474..321a6ba65 100644 --- a/backend/app/core/doctransform/zerox_transformer.py +++ b/backend/app/services/doctransform/zerox_transformer.py @@ -4,7 +4,7 @@ from pathlib import Path from pyzerox import zerox -from app.core.doctransform.transformer import Transformer +from app.services.doctransform.transformer import Transformer logger = logging.getLogger(__name__) diff --git a/backend/app/services/documents/helpers.py b/backend/app/services/documents/helpers.py new file mode 100644 index 000000000..7319830bf --- /dev/null +++ b/backend/app/services/documents/helpers.py @@ -0,0 +1,191 @@ +from typing import Optional, Tuple, Iterable, Union +from uuid import UUID + +from fastapi import HTTPException + +from app.services.doctransform.registry import ( + get_available_transformers, + get_file_format, + is_transformation_supported, + resolve_transformer, +) +from app.crud import DocTransformationJobCrud, DocumentCrud +from app.services.doctransform import job as transformation_job +from app.models import ( + DocTransformJobCreate, + TransformationStatus, + TransformationJobInfo, + Document, + DocumentPublic, + DocTransformationJob, + DocTransformationJobPublic, + TransformedDocumentPublic, +) + + +def pre_transform_validation( + *, + src_filename: str, + target_format: str | None, + transformer: str | None, +) -> Tuple[str, str | None]: + """ + Everything BEFORE storage: + - detect source_format + - validate (source -> target) support if target requested + - resolve actual transformer (or None if no target_format) + + Returns: (source_format, actual_transformer_or_none) + Raises: HTTPException(400) on client errors. 
+ """ + try: + source_format = get_file_format(src_filename) + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + + actual_transformer: Optional[str] = None + if target_format: + if not is_transformation_supported(source_format, target_format): + raise HTTPException( + status_code=400, + detail=f"Transformation from {source_format} to {target_format} is not supported", + ) + + candidate = transformer or "default" + try: + actual_transformer = resolve_transformer( + source_format, target_format, candidate + ) + except ValueError as e: + available = get_available_transformers(source_format, target_format) + raise HTTPException( + status_code=400, + detail=f"{e}. Available transformers: {list(available.keys())}", + ) + + return source_format, actual_transformer + + +def schedule_transformation( + *, + session, + project_id: int, + current_user, + source_format: str, + target_format: str | None, + actual_transformer: str | None, + source_document_id: UUID, + callback_url: str | None, +) -> TransformationJobInfo | None: + """ + Everything AFTER the document row is persisted: + - if target was requested and a transformer was resolved, + create the job and enqueue it; return job_info. + - otherwise return None. 
+ """ + if not (target_format and actual_transformer): + return None + + job_crud = DocTransformationJobCrud(session, project_id) + job = job_crud.create(DocTransformJobCreate(source_document_id=source_document_id)) + + transformation_job_id = transformation_job.start_job( + db=session, + job_id=job.id, + current_user=current_user, + transformer_name=actual_transformer, + target_format=target_format, + callback_url=callback_url, + ) + + return TransformationJobInfo( + message=f"Document accepted for transformation from {source_format} to {target_format}.", + job_id=str(transformation_job_id), + status=TransformationStatus.PENDING, + transformer=actual_transformer, + status_check_url=f"/documents/transformation/{transformation_job_id}", + ) + + +PublicDoc = Union[DocumentPublic, TransformedDocumentPublic] + + +def _to_public_schema(doc: Document) -> PublicDoc: + if doc.source_document_id is None: + return DocumentPublic.model_validate(doc, from_attributes=True) + return TransformedDocumentPublic.model_validate(doc, from_attributes=True) + + +def build_document_schema( + *, + document: Document, + include_url: bool, + storage: object | None, +) -> PublicDoc: + schema = _to_public_schema(document) + if include_url and storage: + schema.signed_url = storage.get_signed_url(document.object_store_url) + return schema + + +def build_document_schemas( + *, + documents: Iterable[Document], + include_url: bool, + storage: object | None, +) -> list[PublicDoc]: + out: list[PublicDoc] = [] + for doc in documents: + schema = _to_public_schema(doc) + if include_url and storage: + schema.signed_url = storage.get_signed_url(doc.object_store_url) + out.append(schema) + return out + + +def build_job_schema( + *, + job: DocTransformationJob, + doc_crud: DocumentCrud, + include_url: bool, + storage: object | None, +) -> DocTransformationJobPublic: + """Build a single job schema, optionally attaching a signed URL.""" + transformed_doc_schema: TransformedDocumentPublic | None = None + 
object_url: str | None = None + + if job.transformed_document_id: + doc = doc_crud.read_one(job.transformed_document_id) + transformed_doc_schema = TransformedDocumentPublic.model_validate( + doc, from_attributes=True + ) + object_url = doc.object_store_url + + if include_url and storage and object_url: + transformed_doc_schema.signed_url = storage.get_signed_url(object_url) + + job_schema = DocTransformationJobPublic.model_validate(job, from_attributes=True) + return job_schema.model_copy( + update={"transformed_document": transformed_doc_schema} + ) + + +def build_job_schemas( + *, + jobs: Iterable[DocTransformationJob], + doc_crud: DocumentCrud, + include_url: bool, + storage: object | None, +) -> list[DocTransformationJobPublic]: + """Build many job schemas efficiently.""" + out: list[DocTransformationJobPublic] = [] + for job in jobs: + out.append( + build_job_schema( + job=job, + doc_crud=doc_crud, + include_url=include_url, + storage=storage, + ) + ) + return out diff --git a/backend/app/tests/api/routes/documents/test_route_document_info.py b/backend/app/tests/api/routes/documents/test_route_document_info.py index 5ed921431..3a49013fd 100644 --- a/backend/app/tests/api/routes/documents/test_route_document_info.py +++ b/backend/app/tests/api/routes/documents/test_route_document_info.py @@ -14,7 +14,7 @@ @pytest.fixture def route(): - return Route("info") + return Route("") class TestDocumentRouteInfo: diff --git a/backend/app/tests/api/routes/documents/test_route_document_list.py b/backend/app/tests/api/routes/documents/test_route_document_list.py index 8b2ec7a73..a580fb2a7 100644 --- a/backend/app/tests/api/routes/documents/test_route_document_list.py +++ b/backend/app/tests/api/routes/documents/test_route_document_list.py @@ -21,7 +21,7 @@ def pushq(self, key, value): @pytest.fixture def route(): - return QueryRoute("list") + return QueryRoute("") class TestDocumentRouteList: diff --git 
a/backend/app/tests/api/routes/documents/test_route_document_permanent_remove.py b/backend/app/tests/api/routes/documents/test_route_document_permanent_remove.py index 179d247d6..57de20ad9 100644 --- a/backend/app/tests/api/routes/documents/test_route_document_permanent_remove.py +++ b/backend/app/tests/api/routes/documents/test_route_document_permanent_remove.py @@ -26,7 +26,7 @@ @pytest.fixture def route(): - return Route("remove") + return Route("") @pytest.fixture(scope="class") diff --git a/backend/app/tests/api/routes/documents/test_route_document_remove.py b/backend/app/tests/api/routes/documents/test_route_document_remove.py index 7c1e34769..d1aa37b3d 100644 --- a/backend/app/tests/api/routes/documents/test_route_document_remove.py +++ b/backend/app/tests/api/routes/documents/test_route_document_remove.py @@ -17,7 +17,7 @@ @pytest.fixture def route(): - return Route("remove") + return Route("") class TestDocumentRouteRemove: diff --git a/backend/app/tests/api/routes/documents/test_route_document_upload.py b/backend/app/tests/api/routes/documents/test_route_document_upload.py index 320c1992a..a16eaea25 100644 --- a/backend/app/tests/api/routes/documents/test_route_document_upload.py +++ b/backend/app/tests/api/routes/documents/test_route_document_upload.py @@ -73,7 +73,7 @@ def pdf_scratch(): @pytest.fixture def route(): - return Route("upload") + return Route("") @pytest.fixture @@ -151,7 +151,7 @@ def test_upload_without_transformation( assert "id" in response.data assert "fname" in response.data - @patch("app.core.doctransform.service.start_job") + @patch("app.services.doctransform.job.start_job") def test_upload_with_transformation( self, mock_start_job, @@ -177,16 +177,14 @@ def test_upload_with_transformation( transformation_job = response.data["transformation_job"] assert transformation_job["job_id"] == mock_job_id - assert transformation_job["source_format"] == "pdf" - assert transformation_job["target_format"] == "markdown" assert 
transformation_job["transformer"] == "zerox" # Default transformer assert ( transformation_job["status_check_url"] - == f"/documents/transformations/{mock_job_id}" + == f"/documents/transformation/{mock_job_id}" ) assert "message" in transformation_job - @patch("app.core.doctransform.service.start_job") + @patch("app.services.doctransform.job.start_job") def test_upload_with_specific_transformer( self, mock_start_job, @@ -276,7 +274,7 @@ def test_upload_with_unsupported_file_extension( finally: unsupported_file.unlink() - @patch("app.core.doctransform.service.start_job") + @patch("app.services.doctransform.job.start_job") def test_transformation_job_created_in_database( self, mock_start_job, @@ -321,7 +319,6 @@ def test_upload_response_structure_without_transformation( "fname", "inserted_at", "updated_at", - "source_document_id", ] for field in required_fields: assert field in response.data diff --git a/backend/app/tests/api/routes/test_doc_transformation_job.py b/backend/app/tests/api/routes/test_doc_transformation_job.py index 924600588..a0b50aac4 100644 --- a/backend/app/tests/api/routes/test_doc_transformation_job.py +++ b/backend/app/tests/api/routes/test_doc_transformation_job.py @@ -2,8 +2,12 @@ from sqlmodel import Session from app.core.config import settings -from app.crud.doc_transformation_job import DocTransformationJobCrud -from app.models import TransformationStatus +from app.crud.document.doc_transformation_job import DocTransformationJobCrud +from app.models import ( + TransformationStatus, + DocTransformJobCreate, + DocTransformJobUpdate, +) from app.tests.utils.document import DocumentStore from app.tests.utils.auth import TestAuthContext @@ -12,24 +16,22 @@ class TestGetTransformationJob: def test_get_existing_job_success( self, client: TestClient, db: Session, user_api_key: TestAuthContext ): - """Test successfully retrieving an existing transformation job.""" document = DocumentStore(db, user_api_key.project_id).put() - job = 
DocTransformationJobCrud(db, user_api_key.project_id) - created_job = job.create(document.id) + crud = DocTransformationJobCrud(db, user_api_key.project_id) + created_job = crud.create(DocTransformJobCreate(source_document_id=document.id)) - response = client.get( - f"{settings.API_V1_STR}/documents/transformations/{created_job.id}", + resp = client.get( + f"{settings.API_V1_STR}/documents/transformation/{created_job.id}", headers={"X-API-KEY": user_api_key.key}, ) + assert resp.status_code == 200 + data = resp.json()["data"] - assert response.status_code == 200 - data = response.json() - assert "data" in data - assert data["data"]["id"] is not None - assert data["data"]["source_document_id"] == str(document.id) - assert data["data"]["status"] == TransformationStatus.PENDING - assert data["data"]["error_message"] is None - assert data["data"]["transformed_document_id"] is None + assert data["job_id"] is not None + assert data["source_document_id"] == str(document.id) + assert data["status"] == TransformationStatus.PENDING + assert data["error_message"] is None + assert data["transformed_document"] is None def test_get_nonexistent_job_404( self, client: TestClient, db: Session, user_api_key: TestAuthContext @@ -38,7 +40,7 @@ def test_get_nonexistent_job_404( fake_uuid = "00000000-0000-0000-0000-000000000001" response = client.get( - f"{settings.API_V1_STR}/documents/transformations/{fake_uuid}", + f"{settings.API_V1_STR}/documents/transformation/{fake_uuid}", headers={"X-API-KEY": user_api_key.key}, ) @@ -47,15 +49,13 @@ def test_get_nonexistent_job_404( def test_get_job_invalid_uuid_422( self, client: TestClient, user_api_key: TestAuthContext ): - """Test getting a job with invalid UUID format returns 422.""" - invalid_uuid = "not-a-uuid" - - response = client.get( - f"{settings.API_V1_STR}/documents/transformations/{invalid_uuid}", + resp = client.get( + f"{settings.API_V1_STR}/documents/transformation/not-a-uuid", headers={"X-API-KEY": user_api_key.key}, ) - - 
assert response.status_code == 422 + assert resp.status_code == 422 + body = resp.json() + assert "error" in body and "valid UUID" in body["error"] def test_get_job_different_project_404( self, @@ -68,11 +68,10 @@ def test_get_job_different_project_404( store = DocumentStore(db, user_api_key.project_id) crud = DocTransformationJobCrud(db, user_api_key.project_id) document = store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) - # Try to access with user from different project (superuser) response = client.get( - f"{settings.API_V1_STR}/documents/transformations/{job.id}", + f"{settings.API_V1_STR}/documents/transformation/{job.id}", headers={"X-API-KEY": superuser_api_key.key}, ) @@ -86,24 +85,28 @@ def test_get_completed_job_with_result( crud = DocTransformationJobCrud(db, user_api_key.project_id) source_document = store.put() transformed_document = store.put() - job = crud.create(source_document.id) + job = crud.create(DocTransformJobCreate(source_document_id=source_document.id)) # Update job to completed status - crud.update_status( + crud.update( job.id, - TransformationStatus.COMPLETED, - transformed_document_id=transformed_document.id, + DocTransformJobUpdate( + status=TransformationStatus.COMPLETED, + transformed_document_id=transformed_document.id, + ), ) response = client.get( - f"{settings.API_V1_STR}/documents/transformations/{job.id}", + f"{settings.API_V1_STR}/documents/transformation/{job.id}", headers={"X-API-KEY": user_api_key.key}, ) assert response.status_code == 200 data = response.json() assert data["data"]["status"] == TransformationStatus.COMPLETED - assert data["data"]["transformed_document_id"] == str(transformed_document.id) + assert data["data"]["transformed_document"]["id"] == str( + transformed_document.id + ) def test_get_failed_job_with_error( self, client: TestClient, db: Session, user_api_key: TestAuthContext @@ -112,14 +115,19 @@ def test_get_failed_job_with_error( 
store = DocumentStore(db, user_api_key.project_id) crud = DocTransformationJobCrud(db, user_api_key.project_id) document = store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) error_msg = "Transformation failed due to invalid format" # Update job to failed status - crud.update_status(job.id, TransformationStatus.FAILED, error_message=error_msg) + crud.update( + job.id, + DocTransformJobUpdate( + status=TransformationStatus.FAILED, error_message=error_msg + ), + ) response = client.get( - f"{settings.API_V1_STR}/documents/transformations/{job.id}", + f"{settings.API_V1_STR}/documents/transformation/{job.id}", headers={"X-API-KEY": user_api_key.key}, ) @@ -137,11 +145,14 @@ def test_get_multiple_jobs_success( store = DocumentStore(db, user_api_key.project_id) crud = DocTransformationJobCrud(db, user_api_key.project_id) documents = store.fill(3) - jobs = [crud.create(doc.id) for doc in documents] + jobs = [ + crud.create(DocTransformJobCreate(source_document_id=doc.id)) + for doc in documents + ] job_ids_params = "&".join(f"job_ids={job.id}" for job in jobs) response = client.get( - f"{settings.API_V1_STR}/documents/transformations/?{job_ids_params}", + f"{settings.API_V1_STR}/documents/transformation/?{job_ids_params}", headers={"X-API-KEY": user_api_key.key}, ) @@ -151,7 +162,7 @@ def test_get_multiple_jobs_success( assert len(data["data"]["jobs"]) == 3 assert len(data["data"]["jobs_not_found"]) == 0 - returned_ids = {job["id"] for job in data["data"]["jobs"]} + returned_ids = {job["job_id"] for job in data["data"]["jobs"]} expected_ids = {str(job.id) for job in jobs} assert returned_ids == expected_ids @@ -162,7 +173,10 @@ def test_get_mixed_existing_nonexisting_jobs( store = DocumentStore(db, user_api_key.project_id) crud = DocTransformationJobCrud(db, user_api_key.project_id) documents = store.fill(2) - jobs = [crud.create(doc.id) for doc in documents] + jobs = [ + 
crud.create(DocTransformJobCreate(source_document_id=doc.id)) + for doc in documents + ] fake_uuid = "00000000-0000-0000-0000-000000000001" job_ids_params = ( @@ -170,7 +184,7 @@ def test_get_mixed_existing_nonexisting_jobs( ) response = client.get( - f"{settings.API_V1_STR}/documents/transformations/?{job_ids_params}", + f"{settings.API_V1_STR}/documents/transformation/?{job_ids_params}", headers={"X-API-KEY": user_api_key.key}, ) @@ -185,69 +199,97 @@ def test_get_jobs_with_empty_string( ): """Test retrieving jobs with empty job_ids parameter.""" response = client.get( - f"{settings.API_V1_STR}/documents/transformations/?job_ids=", + f"{settings.API_V1_STR}/documents/transformation/?job_ids=", headers={"X-API-KEY": user_api_key.key}, ) assert response.status_code == 422 + body = response.json() + assert body["success"] is False + assert body["data"] is None + assert "error" in body + assert "valid UUID" in body["error"] or "expected length" in body["error"] + assert "job_ids" in body["error"] def test_get_jobs_with_whitespace_only( self, client: TestClient, user_api_key: TestAuthContext ): """Test retrieving jobs with whitespace-only job_ids.""" response = client.get( - f"{settings.API_V1_STR}/documents/transformations/?job_ids= ", + f"{settings.API_V1_STR}/documents/transformation/?job_ids= ", headers={"X-API-KEY": user_api_key.key}, ) assert response.status_code == 422 + body = response.json() + assert body["success"] is False + assert body["data"] is None + assert "error" in body + assert "valid UUID" in body["error"] def test_get_jobs_invalid_uuid_format_422( self, client: TestClient, user_api_key: TestAuthContext ): - """Test that invalid UUID format returns 422.""" + """Invalid UUID format should return 422 (validation error).""" invalid_uuid = "not-a-uuid" response = client.get( - f"{settings.API_V1_STR}/documents/transformations/?job_ids={invalid_uuid}", + f"{settings.API_V1_STR}/documents/transformation/?job_ids={invalid_uuid}", headers={"X-API-KEY": 
user_api_key.key}, ) assert response.status_code == 422 - data = response.json() - assert "Input should be a valid UUID" in data["error"] + body = response.json() + assert body["success"] is False + assert body["data"] is None + assert "error" in body + assert "valid UUID" in body["error"] or "expected length" in body["error"] + assert "job_ids" in body["error"] def test_get_jobs_mixed_valid_invalid_uuid_422( self, client: TestClient, db: Session, user_api_key: TestAuthContext ): - """Test that mixed valid/invalid UUIDs returns 422.""" + """Mixed valid/invalid UUIDs should return 422.""" store = DocumentStore(db, user_api_key.project_id) crud = DocTransformationJobCrud(db, user_api_key.project_id) document = store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) job_ids_params = f"job_ids={job.id}&job_ids=not-a-uuid" response = client.get( - f"{settings.API_V1_STR}/documents/transformations/?{job_ids_params}", + f"{settings.API_V1_STR}/documents/transformation/?{job_ids_params}", headers={"X-API-KEY": user_api_key.key}, ) assert response.status_code == 422 - data = response.json() - assert "Input should be a valid UUID" in data["error"] - assert "job_ids" in data["error"] + body = response.json() + assert body["success"] is False + assert body["data"] is None + assert "error" in body + assert "job_ids" in body["error"] + assert ( + "valid UUID" in body["error"] + or "invalid character" in body["error"] + or "invalid length" in body["error"] + ) def test_get_jobs_missing_parameter_422( self, client: TestClient, user_api_key: TestAuthContext ): - """Test that missing job_ids parameter returns empty results.""" + """Missing job_ids parameter should 422 (Query(min=1)).""" response = client.get( - f"{settings.API_V1_STR}/documents/transformations/", + f"{settings.API_V1_STR}/documents/transformation/", headers={"X-API-KEY": user_api_key.key}, ) assert response.status_code == 422 + body = response.json() + 
assert body["success"] is False + assert body["data"] is None + assert "error" in body + assert "Field required" in body["error"] + assert "job_ids" in body["error"] def test_get_jobs_different_project_not_found( self, @@ -256,15 +298,14 @@ def test_get_jobs_different_project_not_found( user_api_key: TestAuthContext, superuser_api_key: TestAuthContext, ): - """Test that jobs from different projects are not returned.""" + """Jobs from different projects are not returned.""" store = DocumentStore(db, user_api_key.project_id) crud = DocTransformationJobCrud(db, user_api_key.project_id) document = store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) - # Try to access with user from different project (superuser) response = client.get( - f"{settings.API_V1_STR}/documents/transformations/?job_ids={job.id}", + f"{settings.API_V1_STR}/documents/transformation/?job_ids={job.id}", headers={"X-API-KEY": superuser_api_key.key}, ) @@ -281,22 +322,33 @@ def test_get_jobs_with_various_statuses( store = DocumentStore(db, user_api_key.project_id) crud = DocTransformationJobCrud(db, user_api_key.project_id) documents = store.fill(4) - jobs = [crud.create(doc.id) for doc in documents] + jobs = [ + crud.create(DocTransformJobCreate(source_document_id=doc.id)) + for doc in documents + ] - crud.update_status(jobs[1].id, TransformationStatus.PROCESSING) - crud.update_status( + crud.update( + jobs[1].id, DocTransformJobUpdate(status=TransformationStatus.PROCESSING) + ) + crud.update( jobs[2].id, - TransformationStatus.COMPLETED, - transformed_document_id=documents[2].id, + DocTransformJobUpdate( + status=TransformationStatus.COMPLETED, + transformed_document_id=documents[2].id, + ), ) - crud.update_status( - jobs[3].id, TransformationStatus.FAILED, error_message="Test error" + crud.update( + jobs[3].id, + DocTransformJobUpdate( + status=TransformationStatus.FAILED, + error_message="Test error", + ), ) job_ids_params = 
"&".join(f"job_ids={job.id}" for job in jobs) response = client.get( - f"{settings.API_V1_STR}/documents/transformations/?{job_ids_params}", + f"{settings.API_V1_STR}/documents/transformation/?{job_ids_params}", headers={"X-API-KEY": user_api_key.key}, ) @@ -304,7 +356,6 @@ def test_get_jobs_with_various_statuses( data = response.json() assert len(data["data"]["jobs"]) == 4 - # Check that all statuses are represented statuses = {job["status"] for job in data["data"]["jobs"]} expected_statuses = { TransformationStatus.PENDING, diff --git a/backend/app/tests/core/doctransformer/test_service/test_start_job.py b/backend/app/tests/core/doctransformer/test_service/test_start_job.py deleted file mode 100644 index 94a59dd12..000000000 --- a/backend/app/tests/core/doctransformer/test_service/test_start_job.py +++ /dev/null @@ -1,174 +0,0 @@ -""" -Tests for the start_job function in document transformation service. -""" -from typing import Tuple -from uuid import uuid4 - -import pytest -from fastapi import BackgroundTasks -from sqlmodel import Session - -from app.core.doctransform.service import execute_job, start_job -from app.core.doctransform.registry import TRANSFORMERS -from app.core.exception_handlers import HTTPException -from app.models import ( - Document, - DocTransformationJob, - Project, - TransformationStatus, - UserProjectOrg, -) -from app.tests.core.doctransformer.test_service.utils import ( - DocTransformTestBase, - MockTestTransformer, -) - - -class TestStartJob(DocTransformTestBase): - """Test cases for the start_job function.""" - - def test_start_job_success( - self, - db: Session, - current_user: UserProjectOrg, - test_document: Tuple[Document, Project], - background_tasks: BackgroundTasks, - ) -> None: - """Test successful job creation and scheduling.""" - document, _ = test_document - job_id = start_job( - db=db, - current_user=current_user, - source_document_id=document.id, - transformer_name="test-transformer", - target_format="markdown", - 
background_tasks=background_tasks, - ) - - job = db.get(DocTransformationJob, job_id) - assert job is not None - assert job.source_document_id == document.id - assert job.status == TransformationStatus.PENDING - assert job.error_message is None - assert job.transformed_document_id is None - - assert len(background_tasks.tasks) == 1 - task = background_tasks.tasks[0] - assert task.func == execute_job - assert task.args[0] == current_user.project_id - assert task.args[1] == job_id - assert task.args[2] == "test-transformer" - assert task.args[3] == "markdown" - - def test_start_job_with_nonexistent_document( - self, - db: Session, - current_user: UserProjectOrg, - background_tasks: BackgroundTasks, - ) -> None: - """Test job creation with non-existent document raises error.""" - nonexistent_id = uuid4() - - with pytest.raises(HTTPException) as exc_info: - start_job( - db=db, - current_user=current_user, - source_document_id=nonexistent_id, - transformer_name="test-transformer", - target_format="markdown", - background_tasks=background_tasks, - ) - - assert exc_info.value.status_code == 404 - assert "Document not found" in str(exc_info.value.detail) - - def test_start_job_with_deleted_document( - self, - db: Session, - current_user: UserProjectOrg, - test_document: Tuple[Document, Project], - background_tasks: BackgroundTasks, - ) -> None: - """Test job creation with deleted document raises error.""" - document, _ = test_document - - document.is_deleted = True - db.add(document) - db.commit() - - with pytest.raises(HTTPException) as exc_info: - start_job( - db=db, - current_user=current_user, - source_document_id=document.id, - transformer_name="test-transformer", - target_format="markdown", - background_tasks=background_tasks, - ) - - assert exc_info.value.status_code == 404 - assert "Document not found" in str(exc_info.value.detail) - - def test_start_job_with_different_formats( - self, - db: Session, - current_user: UserProjectOrg, - test_document: Tuple[Document, 
Project], - background_tasks: BackgroundTasks, - monkeypatch, - ) -> None: - """Test job creation with different target formats.""" - # Add the test transformer to the registry for this test - monkeypatch.setitem(TRANSFORMERS, "test", MockTestTransformer) - - document, _ = test_document - formats = ["markdown", "text", "html"] - - for target_format in formats: - job_id = start_job( - db=db, - current_user=current_user, - source_document_id=document.id, - transformer_name="test", - target_format=target_format, - background_tasks=background_tasks, - ) - - job = db.get(DocTransformationJob, job_id) - assert job is not None - assert job.status == TransformationStatus.PENDING - - task = background_tasks.tasks[-1] - assert task.args[3] == target_format - - @pytest.mark.parametrize("transformer_name", ["test"]) - def test_start_job_with_different_transformers( - self, - db: Session, - current_user: UserProjectOrg, - test_document: Tuple[Document, Project], - background_tasks: BackgroundTasks, - transformer_name: str, - monkeypatch, - ) -> None: - """Test job creation with different transformer names.""" - # Add the test transformer to the registry for this test - monkeypatch.setitem(TRANSFORMERS, "test", MockTestTransformer) - - document, _ = test_document - - job_id = start_job( - db=db, - current_user=current_user, - source_document_id=document.id, - transformer_name=transformer_name, - target_format="markdown", - background_tasks=background_tasks, - ) - - job = db.get(DocTransformationJob, job_id) - assert job is not None - assert job.status == TransformationStatus.PENDING - - task = background_tasks.tasks[-1] - assert task.args[2] == transformer_name diff --git a/backend/app/tests/crud/documents/test_crud_document_delete.py b/backend/app/tests/crud/documents/documents/test_crud_document_delete.py similarity index 100% rename from backend/app/tests/crud/documents/test_crud_document_delete.py rename to 
backend/app/tests/crud/documents/documents/test_crud_document_delete.py diff --git a/backend/app/tests/crud/documents/test_crud_document_read_many.py b/backend/app/tests/crud/documents/documents/test_crud_document_read_many.py similarity index 100% rename from backend/app/tests/crud/documents/test_crud_document_read_many.py rename to backend/app/tests/crud/documents/documents/test_crud_document_read_many.py diff --git a/backend/app/tests/crud/documents/test_crud_document_read_one.py b/backend/app/tests/crud/documents/documents/test_crud_document_read_one.py similarity index 100% rename from backend/app/tests/crud/documents/test_crud_document_read_one.py rename to backend/app/tests/crud/documents/documents/test_crud_document_read_one.py diff --git a/backend/app/tests/crud/documents/test_crud_document_update.py b/backend/app/tests/crud/documents/documents/test_crud_document_update.py similarity index 100% rename from backend/app/tests/crud/documents/test_crud_document_update.py rename to backend/app/tests/crud/documents/documents/test_crud_document_update.py diff --git a/backend/app/tests/crud/test_doc_transformation_job.py b/backend/app/tests/crud/documents/test_doc_transformation_job.py similarity index 61% rename from backend/app/tests/crud/test_doc_transformation_job.py rename to backend/app/tests/crud/documents/test_doc_transformation_job.py index 4f5f3ce68..e3df0a006 100644 --- a/backend/app/tests/crud/test_doc_transformation_job.py +++ b/backend/app/tests/crud/documents/test_doc_transformation_job.py @@ -1,7 +1,15 @@ import pytest from sqlmodel import Session -from app.crud.doc_transformation_job import DocTransformationJobCrud -from app.models import TransformationStatus +from sqlalchemy.exc import IntegrityError + +from app.crud.document.doc_transformation_job import DocTransformationJobCrud +from app.models import ( + TransformationStatus, + DocTransformJobCreate, + DocTransformJobUpdate, +) +from app.core.config import settings +from app.tests.utils.auth 
import TestAuthContext from app.core.exception_handlers import HTTPException from app.tests.utils.document import DocumentStore from app.tests.utils.utils import get_project, SequentialUuidGenerator @@ -23,55 +31,53 @@ class TestDocTransformationJobCrudCreate: def test_can_create_job_with_valid_document( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test creating a transformation job with a valid source document.""" document = store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) assert job.id is not None assert job.source_document_id == document.id assert job.status == TransformationStatus.PENDING assert job.error_message is None assert job.transformed_document_id is None - assert job.created_at is not None + assert job.inserted_at is not None assert job.updated_at is not None def test_cannot_create_job_with_invalid_document( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test that creating a job with non-existent document raises an error.""" + """With FK enforced, creating with a non-existent document should fail at commit.""" invalid_id = next(SequentialUuidGenerator()) - with pytest.raises(HTTPException) as exc_info: - crud.create(invalid_id) - - assert exc_info.value.status_code == 404 - assert "Document not found" in str(exc_info.value.detail) + with pytest.raises(IntegrityError): + crud.create(DocTransformJobCreate(source_document_id=invalid_id)) def test_cannot_create_job_with_deleted_document( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test that creating a job with a deleted document raises an error.""" + """ + Creation itself will succeed (FK exists), but later reads should treat it as not found + because read filters out deleted documents. 
+ """ document = store.put() - # Mark document as deleted document.is_deleted = True db.add(document) db.commit() + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) + # read_one should 404 due to is_deleted=True on joined document with pytest.raises(HTTPException) as exc_info: - crud.create(document.id) - + crud.read_one(job.id) assert exc_info.value.status_code == 404 - assert "Document not found" in str(exc_info.value.detail) + assert "Transformation job not found" in str(exc_info.value.detail) class TestDocTransformationJobCrudReadOne: def test_can_read_existing_job( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test reading an existing transformation job.""" document = store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) result = crud.read_one(job.id) @@ -82,7 +88,6 @@ def test_can_read_existing_job( def test_cannot_read_nonexistent_job( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test that reading a non-existent job raises an error.""" invalid_id = next(SequentialUuidGenerator()) with pytest.raises(HTTPException) as exc_info: @@ -94,11 +99,9 @@ def test_cannot_read_nonexistent_job( def test_cannot_read_job_with_deleted_document( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test that reading a job whose source document is deleted raises an error.""" document = store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) - # Mark document as deleted document.is_deleted = True db.add(document) db.commit() @@ -112,12 +115,10 @@ def test_cannot_read_job_with_deleted_document( def test_cannot_read_job_from_different_project( self, db: Session, store: DocumentStore ): - """Test that reading a job from a different project raises an error.""" document = store.put() job_crud = DocTransformationJobCrud(db, 
store.project.id) - job = job_crud.create(document.id) + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) - # Try to read from different project other_project = create_test_project(db) other_crud = DocTransformationJobCrud(db, other_project.id) @@ -132,9 +133,11 @@ class TestDocTransformationJobCrudReadEach: def test_can_read_multiple_existing_jobs( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test reading multiple existing transformation jobs.""" documents = store.fill(3) - jobs = [crud.create(doc.id) for doc in documents] + jobs = [ + crud.create(DocTransformJobCreate(source_document_id=doc.id)) + for doc in documents + ] job_ids = {job.id for job in jobs} results = crud.read_each(job_ids) @@ -146,40 +149,37 @@ def test_can_read_multiple_existing_jobs( def test_read_partial_existing_jobs( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test reading a mix of existing and non-existing jobs.""" documents = store.fill(2) - jobs = [crud.create(doc.id) for doc in documents] + jobs = [ + crud.create(DocTransformJobCreate(source_document_id=doc.id)) + for doc in documents + ] job_ids = {job.id for job in jobs} - job_ids.add(next(SequentialUuidGenerator())) # Add non-existent ID + job_ids.add(next(SequentialUuidGenerator())) # non-existent results = crud.read_each(job_ids) - assert len(results) == 2 # Only existing jobs returned + assert len(results) == 2 result_ids = {job.id for job in results} assert result_ids == {job.id for job in jobs} def test_read_empty_job_set( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test reading an empty set of job IDs.""" results = crud.read_each(set()) - assert len(results) == 0 def test_cannot_read_jobs_from_different_project( self, db: Session, store: DocumentStore ): - """Test that jobs from different projects are not returned.""" document = store.put() job_crud = DocTransformationJobCrud(db, 
store.project.id) - job = job_crud.create(document.id) + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) - # Try to read from different project other_project = get_project(db, name="Dalgo") other_crud = DocTransformationJobCrud(db, other_project.id) results = other_crud.read_each({job.id}) - assert len(results) == 0 @@ -187,58 +187,66 @@ class TestDocTransformationJobCrudUpdateStatus: def test_can_update_status_to_processing( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test updating job status to processing.""" document = store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) - updated_job = crud.update_status(job.id, TransformationStatus.PROCESSING) + updated = crud.update( + job.id, + DocTransformJobUpdate(status=TransformationStatus.PROCESSING), + ) - assert updated_job.id == job.id - assert updated_job.status == TransformationStatus.PROCESSING - assert updated_job.updated_at >= job.updated_at + assert updated.id == job.id + assert updated.status == TransformationStatus.PROCESSING + assert updated.updated_at >= job.updated_at def test_can_update_status_to_completed_with_result( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test updating job status to completed with transformed document.""" source_document = store.put() transformed_document = store.put() - job = crud.create(source_document.id) + job = crud.create(DocTransformJobCreate(source_document_id=source_document.id)) - updated_job = crud.update_status( + updated = crud.update( job.id, - TransformationStatus.COMPLETED, - transformed_document_id=transformed_document.id, + DocTransformJobUpdate( + status=TransformationStatus.COMPLETED, + transformed_document_id=transformed_document.id, + ), ) - assert updated_job.status == TransformationStatus.COMPLETED - assert updated_job.transformed_document_id == transformed_document.id - assert 
updated_job.error_message is None + assert updated.status == TransformationStatus.COMPLETED + assert updated.transformed_document_id == transformed_document.id + assert updated.error_message is None def test_can_update_status_to_failed_with_error( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test updating job status to failed with error message.""" document = store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) error_msg = "Transformation failed due to invalid format" - updated_job = crud.update_status( - job.id, TransformationStatus.FAILED, error_message=error_msg + updated = crud.update( + job.id, + DocTransformJobUpdate( + status=TransformationStatus.FAILED, + error_message=error_msg, + ), ) - assert updated_job.status == TransformationStatus.FAILED - assert updated_job.error_message == error_msg - assert updated_job.transformed_document_id is None + assert updated.status == TransformationStatus.FAILED + assert updated.error_message == error_msg + assert updated.transformed_document_id is None def test_cannot_update_nonexistent_job( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test that updating a non-existent job raises an error.""" invalid_id = next(SequentialUuidGenerator()) with pytest.raises(HTTPException) as exc_info: - crud.update_status(invalid_id, TransformationStatus.PROCESSING) + crud.update( + invalid_id, + DocTransformJobUpdate(status=TransformationStatus.PROCESSING), + ) assert exc_info.value.status_code == 404 assert "Transformation job not found" in str(exc_info.value.detail) @@ -246,17 +254,20 @@ def test_cannot_update_nonexistent_job( def test_update_preserves_existing_fields( self, db: Session, store: DocumentStore, crud: DocTransformationJobCrud ): - """Test that updating status preserves other fields when not specified.""" + """Fields not present in the patch must be preserved by `update`.""" document = 
store.put() - job = crud.create(document.id) + job = crud.create(DocTransformJobCreate(source_document_id=document.id)) - # First update with error message - crud.update_status( - job.id, TransformationStatus.FAILED, error_message="Initial error" + crud.update( + job.id, + DocTransformJobUpdate( + status=TransformationStatus.FAILED, error_message="Initial error" + ), ) - # Second update without error message - should preserve it - updated_job = crud.update_status(job.id, TransformationStatus.PROCESSING) + updated = crud.update( + job.id, DocTransformJobUpdate(status=TransformationStatus.PROCESSING) + ) - assert updated_job.status == TransformationStatus.PROCESSING - assert updated_job.error_message == "Initial error" # Should be preserved + assert updated.status == TransformationStatus.PROCESSING + assert updated.error_message == "Initial error" diff --git a/backend/app/tests/core/doctransformer/test_service/conftest.py b/backend/app/tests/services/doctransformer/test_job/conftest.py similarity index 75% rename from backend/app/tests/core/doctransformer/test_service/conftest.py rename to backend/app/tests/services/doctransformer/test_job/conftest.py index 75574be06..edba7ec9d 100644 --- a/backend/app/tests/core/doctransformer/test_service/conftest.py +++ b/backend/app/tests/services/doctransformer/test_job/conftest.py @@ -4,7 +4,6 @@ import os from typing import Any, Callable, Generator, Tuple from unittest.mock import patch -from uuid import UUID import pytest from fastapi import BackgroundTasks @@ -12,7 +11,7 @@ from tenacity import retry, stop_after_attempt, wait_fixed from app.crud import get_project_by_id -from app.models import User +from app.services.doctransform import job from app.core.config import settings from app.models import Document, Project, UserProjectOrg from app.tests.utils.document import DocumentStore @@ -30,24 +29,41 @@ def aws_credentials() -> None: @pytest.fixture -def fast_execute_job() -> Generator[Callable[[int, UUID, str, str], Any], 
None, None]: +def fast_execute_job() -> ( + Generator[ + Callable[[int, str, str, str, str, str, str | None, Any], Any], None, None + ] +): """Create a version of execute_job without retry delays for faster testing.""" - from app.core.doctransform import service - original_execute_job = service.execute_job + original_execute_job = job.execute_job @retry( stop=stop_after_attempt(2), wait=wait_fixed(0.01) ) # Very fast retry for tests def fast_execute_job_func( - project_id: int, job_id: UUID, transformer_name: str, target_format: str + project_id: int, + job_id: str, + source_document_id: str, + transformer_name: str, + target_format: str, + task_id: str, + callback_url: str | None, + task_instance, ) -> Any: # Call the original function's implementation without the decorator return original_execute_job.__wrapped__( - project_id, job_id, transformer_name, target_format + project_id, + job_id, + source_document_id, + transformer_name, + target_format, + task_id, + callback_url, + task_instance, ) - with patch.object(service, "execute_job", fast_execute_job_func): + with patch.object(job, "execute_job", fast_execute_job_func): yield fast_execute_job_func diff --git a/backend/app/tests/core/doctransformer/test_service/test_execute_job.py b/backend/app/tests/services/doctransformer/test_job/test_execute_job.py similarity index 72% rename from backend/app/tests/core/doctransformer/test_service/test_execute_job.py rename to backend/app/tests/services/doctransformer/test_job/test_execute_job.py index 5f80644aa..43993d0cc 100644 --- a/backend/app/tests/core/doctransformer/test_service/test_execute_job.py +++ b/backend/app/tests/services/doctransformer/test_job/test_execute_job.py @@ -11,11 +11,11 @@ from tenacity import RetryError from app.crud import DocTransformationJobCrud, DocumentCrud -from app.core.doctransform.registry import TransformationError -from app.core.doctransform.service import execute_job +from app.services.doctransform.registry import TransformationError 
+from app.services.doctransform.job import execute_job from app.core.exception_handlers import HTTPException -from app.models import Document, Project, TransformationStatus -from app.tests.core.doctransformer.test_service.utils import ( +from app.models import Document, Project, TransformationStatus, DocTransformJobCreate +from app.tests.services.doctransformer.test_job.utils import ( DocTransformTestBase, MockTestTransformer, ) @@ -48,34 +48,34 @@ def test_execute_job_success( source_content = b"This is a test document for transformation." self.create_s3_document_content(aws, document, source_content) - # Create transformation job job_crud = DocTransformationJobCrud(session=db, project_id=project.id) - job = job_crud.create(source_document_id=document.id) - db.commit() + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) - # Mock the Session to use our existing database session with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db mock_session_class.return_value.__exit__.return_value = None execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format=target_format, + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) - # Verify job completion db.refresh(job) assert job.status == TransformationStatus.COMPLETED assert job.transformed_document_id is not None assert job.error_message is None - # Verify transformed document document_crud = DocumentCrud(session=db, project_id=project.id) transformed_doc = document_crud.read_one(job.transformed_document_id) assert transformed_doc is not None @@ -84,7 +84,6 @@ def test_execute_job_success( 
assert transformed_doc.source_document_id == document.id assert transformed_doc.object_store_url is not None - # Verify transformed content in S3 self.verify_s3_content(aws, transformed_doc) @mock_aws @@ -93,7 +92,9 @@ def test_execute_job_with_nonexistent_job( self, db: Session, test_document: Tuple[Document, Project], - fast_execute_job: Callable[[int, UUID, str, str], Any], + fast_execute_job: Callable[ + [int, str, str, str, str, str, str | None, Any], Any + ], ) -> None: """Test job execution with non-existent job ID.""" _, project = test_document @@ -101,9 +102,10 @@ def test_execute_job_with_nonexistent_job( nonexistent_job_id = uuid4() with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db mock_session_class.return_value.__exit__.return_value = None @@ -112,9 +114,13 @@ def test_execute_job_with_nonexistent_job( with pytest.raises((HTTPException, RetryError)): fast_execute_job( project_id=project.id, - job_id=nonexistent_job_id, + job_id=str(nonexistent_job_id), + source_document_id=str(uuid4()), transformer_name="test", target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) @mock_aws @@ -123,7 +129,9 @@ def test_execute_job_with_missing_source_document( self, db: Session, test_document: Tuple[Document, Project], - fast_execute_job: Callable[[int, UUID, str, str], Any], + fast_execute_job: Callable[ + [int, str, str, str, str, str, str | None, Any], Any + ], ) -> None: """Test job execution when source document is missing from S3.""" document, project = test_document @@ -131,13 +139,13 @@ def test_execute_job_with_missing_source_document( # Create job but don't upload document to S3 job_crud = 
DocTransformationJobCrud(session=db, project_id=project.id) - job = job_crud.create(source_document_id=document.id) - db.commit() + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db mock_session_class.return_value.__exit__.return_value = None @@ -145,12 +153,15 @@ def test_execute_job_with_missing_source_document( with pytest.raises(Exception): fast_execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) - # Verify job was marked as failed db.refresh(job) assert job.status == TransformationStatus.FAILED assert job.error_message is not None @@ -162,7 +173,9 @@ def test_execute_job_with_transformer_error( self, db: Session, test_document: Tuple[Document, Project], - fast_execute_job: Callable[[int, UUID, str, str], Any], + fast_execute_job: Callable[ + [int, str, str, str, str, str, str | None, Any], Any + ], ) -> None: """Test job execution when transformer raises an error.""" document, project = test_document @@ -170,16 +183,16 @@ def test_execute_job_with_transformer_error( self.create_s3_document_content(aws, document) job_crud = DocTransformationJobCrud(session=db, project_id=project.id) - job = job_crud.create(source_document_id=document.id) - db.commit() + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) # Mock convert_document to raise TransformationError with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( 
- "app.core.doctransform.service.convert_document" + "app.services.doctransform.job.convert_document" ) as mock_convert, patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db mock_session_class.return_value.__exit__.return_value = None @@ -189,12 +202,15 @@ def test_execute_job_with_transformer_error( with pytest.raises((TransformationError, RetryError)): fast_execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) - # Verify job was marked as failed db.refresh(job) assert job.status == TransformationStatus.FAILED assert "Mock transformation error" in job.error_message @@ -211,23 +227,27 @@ def test_execute_job_status_transitions( self.create_s3_document_content(aws, document) job_crud = DocTransformationJobCrud(session=db, project_id=project.id) - job = job_crud.create(source_document_id=document.id) + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) initial_status = job.status - db.commit() with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db mock_session_class.return_value.__exit__.return_value = None execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) # Verify status progression by checking final job state 
@@ -258,13 +278,12 @@ def test_execute_job_with_different_content_types( expected_extension, ) in format_extensions: job_crud = DocTransformationJobCrud(session=db, project_id=project.id) - job = job_crud.create(source_document_id=document.id) - db.commit() + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", + "app.services.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db @@ -272,11 +291,14 @@ def test_execute_job_with_different_content_types( execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format=target_format, + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) - # Verify transformation completed and check file extension db.refresh(job) assert job.status == TransformationStatus.COMPLETED diff --git a/backend/app/tests/core/doctransformer/test_service/test_execute_job_errors.py b/backend/app/tests/services/doctransformer/test_job/test_execute_job_errors.py similarity index 69% rename from backend/app/tests/core/doctransformer/test_service/test_execute_job_errors.py rename to backend/app/tests/services/doctransformer/test_job/test_execute_job_errors.py index e719cd5ce..8e4759f10 100644 --- a/backend/app/tests/core/doctransformer/test_service/test_execute_job_errors.py +++ b/backend/app/tests/services/doctransformer/test_job/test_execute_job_errors.py @@ -2,6 +2,7 @@ Tests for retry mechanisms and error handling in document transformation service. 
""" from io import BytesIO +from uuid import uuid4 from typing import Any, Callable, Tuple from unittest.mock import patch @@ -10,12 +11,12 @@ from sqlmodel import Session from app.crud import DocTransformationJobCrud -from app.models import Document, Project, TransformationStatus -from app.tests.core.doctransformer.test_service.utils import ( +from app.models import Document, Project, TransformationStatus, DocTransformJobCreate +from app.tests.services.doctransformer.test_job.utils import ( DocTransformTestBase, MockTestTransformer, ) -from app.tests.core.doctransformer.test_service.utils import ( +from app.tests.services.doctransformer.test_job.utils import ( create_failing_convert_document, create_persistent_failing_convert_document, ) @@ -30,7 +31,9 @@ def test_execute_job_with_storage_error( self, db: Session, test_document: Tuple[Document, Project], - fast_execute_job: Callable[[int, Any, str, str], Any], + fast_execute_job: Callable[ + [int, str, str, str, str, str, str | None, Any], Any + ], ) -> None: """Test job execution when S3 upload fails.""" document, project = test_document @@ -38,16 +41,16 @@ def test_execute_job_with_storage_error( self.create_s3_document_content(aws, document) job_crud = DocTransformationJobCrud(session=db, project_id=project.id) - job = job_crud.create(source_document_id=document.id) - db.commit() + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) # Mock storage.put to raise an error with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.service.get_cloud_storage" + "app.services.doctransform.job.get_cloud_storage" ) as mock_storage_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db 
mock_session_class.return_value.__exit__.return_value = None @@ -59,9 +62,13 @@ def test_execute_job_with_storage_error( with pytest.raises(Exception): fast_execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) # Verify job was marked as failed @@ -76,7 +83,9 @@ def test_execute_job_retry_mechanism( self, db: Session, test_document: Tuple[Document, Project], - fast_execute_job: Callable[[int, Any, str, str], Any], + fast_execute_job: Callable[ + [int, str, str, str, str, str, str | None, Any], Any + ], ) -> None: """Test that retry mechanism works for transient failures.""" document, project = test_document @@ -84,30 +93,33 @@ def test_execute_job_retry_mechanism( self.create_s3_document_content(aws, document) job_crud = DocTransformationJobCrud(session=db, project_id=project.id) - job = job_crud.create(source_document_id=document.id) - db.commit() + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) # Create a side effect that fails once then succeeds (fast retry will only try 2 times) failing_convert_document = create_failing_convert_document(fail_count=1) with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.service.convert_document", + "app.services.doctransform.job.convert_document", side_effect=failing_convert_document, ), patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db mock_session_class.return_value.__exit__.return_value = None fast_execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", 
target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) - # Verify the function was retried and eventually succeeded db.refresh(job) assert job.status == TransformationStatus.COMPLETED @@ -118,7 +130,9 @@ def test_execute_job_exhausted_retries( self, db: Session, test_document: Tuple[Document, Project], - fast_execute_job: Callable[[int, Any, str, str], Any], + fast_execute_job: Callable[ + [int, str, str, str, str, str, str | None, Any], Any + ], ) -> None: """Test behavior when all retry attempts are exhausted.""" document, project = test_document @@ -126,8 +140,7 @@ def test_execute_job_exhausted_retries( self.create_s3_document_content(aws, document) job_crud = DocTransformationJobCrud(session=db, project_id=project.id) - job = job_crud.create(source_document_id=document.id) - db.commit() + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) # Mock convert_document to always fail persistent_failing_convert_document = ( @@ -135,12 +148,13 @@ def test_execute_job_exhausted_retries( ) with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.service.convert_document", + "app.services.doctransform.job.convert_document", side_effect=persistent_failing_convert_document, ), patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db mock_session_class.return_value.__exit__.return_value = None @@ -148,11 +162,14 @@ def test_execute_job_exhausted_retries( with pytest.raises(Exception): fast_execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) - # Verify job was 
marked as failed after retries db.refresh(job) assert job.status == TransformationStatus.FAILED @@ -164,7 +181,9 @@ def test_execute_job_database_error_during_completion( self, db: Session, test_document: Tuple[Document, Project], - fast_execute_job: Callable[[int, Any, str, str], Any], + fast_execute_job: Callable[ + [int, str, str, str, str, str, str | None, Any], Any + ], ) -> None: """Test handling of database errors when updating job completion.""" document, project = test_document @@ -172,20 +191,20 @@ def test_execute_job_database_error_during_completion( self.create_s3_document_content(aws, document) job_crud = DocTransformationJobCrud(session=db, project_id=project.id) - job = job_crud.create(source_document_id=document.id) - db.commit() + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db mock_session_class.return_value.__exit__.return_value = None # Mock DocumentCrud.update to fail when creating the transformed document with patch( - "app.core.doctransform.service.DocumentCrud" + "app.services.doctransform.job.DocumentCrud" ) as mock_doc_crud_class: mock_doc_crud_instance = mock_doc_crud_class.return_value mock_doc_crud_instance.read_one.return_value = ( @@ -198,9 +217,13 @@ def test_execute_job_database_error_during_completion( with pytest.raises(Exception): fast_execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) # Verify job was marked as failed diff --git 
a/backend/app/tests/core/doctransformer/test_service/test_integration.py b/backend/app/tests/services/doctransformer/test_job/test_integration.py similarity index 71% rename from backend/app/tests/core/doctransformer/test_service/test_integration.py rename to backend/app/tests/services/doctransformer/test_job/test_integration.py index 20df40862..3282b772c 100644 --- a/backend/app/tests/core/doctransformer/test_service/test_integration.py +++ b/backend/app/tests/services/doctransformer/test_job/test_integration.py @@ -2,23 +2,23 @@ Integration tests for document transformation service. """ from typing import Tuple +from uuid import uuid4 from unittest.mock import patch import pytest -from fastapi import BackgroundTasks from moto import mock_aws from sqlmodel import Session from app.crud import DocTransformationJobCrud, DocumentCrud -from app.core.doctransform.service import execute_job, start_job +from app.services.doctransform.job import execute_job, start_job from app.models import ( Document, - DocTransformationJob, Project, TransformationStatus, UserProjectOrg, + DocTransformJobCreate, ) -from app.tests.core.doctransformer.test_service.utils import ( +from app.tests.services.doctransformer.test_job.utils import ( DocTransformTestBase, MockTestTransformer, ) @@ -37,50 +37,51 @@ def test_execute_job_end_to_end_workflow( aws = self.setup_aws_s3() self.create_s3_document_content(aws, document) - # Start job using the service + job_crud = DocTransformationJobCrud(session=db, project_id=project.id) + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) + current_user = UserProjectOrg( id=1, email="test@example.com", project_id=project.id, organization_id=project.organization_id, ) - background_tasks = BackgroundTasks() - - job_id = start_job( - db=db, - current_user=current_user, - source_document_id=document.id, - transformer_name="test", - target_format="markdown", - background_tasks=background_tasks, - ) - # Verify job was created - job = 
db.get(DocTransformationJob, job_id) - assert job.status == TransformationStatus.PENDING - - # Execute the job manually (simulating background execution) with patch( - "app.core.doctransform.service.Session" - ) as mock_session_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer} + "app.services.doctransform.job.start_low_priority_job", + return_value="fake-task-id", + ), patch("app.services.doctransform.job.Session") as mock_session_class, patch( + "app.services.doctransform.registry.TRANSFORMERS", + {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db mock_session_class.return_value.__exit__.return_value = None + returned_job_id = start_job( + db=db, + current_user=current_user, + job_id=job.id, + transformer_name="test", + target_format="markdown", + callback_url=None, + ) + assert job.id == returned_job_id + execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) - # Verify complete workflow db.refresh(job) assert job.status == TransformationStatus.COMPLETED assert job.transformed_document_id is not None - # Verify transformed document exists and is valid document_crud = DocumentCrud(session=db, project_id=project.id) transformed_doc = document_crud.read_one(job.transformed_document_id) assert transformed_doc.source_document_id == document.id @@ -100,16 +101,14 @@ def test_execute_job_concurrent_jobs( job_crud = DocTransformationJobCrud(session=db, project_id=project.id) jobs = [] for i in range(3): - job = job_crud.create(source_document_id=document.id) + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) jobs.append(job) - db.commit() - # Execute all jobs for job in jobs: with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as 
mock_session_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", + "app.services.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db @@ -117,12 +116,15 @@ def test_execute_job_concurrent_jobs( execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format="markdown", + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) - # Verify all jobs completed successfully for job in jobs: db.refresh(job) assert job.status == TransformationStatus.COMPLETED @@ -141,19 +143,16 @@ def test_multiple_format_transformations( formats = ["markdown", "text", "html"] jobs = [] - # Create jobs for different formats job_crud = DocTransformationJobCrud(session=db, project_id=project.id) for target_format in formats: - job = job_crud.create(source_document_id=document.id) + job = job_crud.create(DocTransformJobCreate(source_document_id=document.id)) jobs.append((job, target_format)) - db.commit() - # Execute all jobs for job, target_format in jobs: with patch( - "app.core.doctransform.service.Session" + "app.services.doctransform.job.Session" ) as mock_session_class, patch( - "app.core.doctransform.registry.TRANSFORMERS", + "app.services.doctransform.registry.TRANSFORMERS", {"test": MockTestTransformer}, ): mock_session_class.return_value.__enter__.return_value = db @@ -161,12 +160,15 @@ def test_multiple_format_transformations( execute_job( project_id=project.id, - job_id=job.id, + job_id=str(job.id), + source_document_id=str(document.id), transformer_name="test", target_format=target_format, + task_id=str(uuid4()), + callback_url=None, + task_instance=None, ) - # Verify all jobs completed successfully with correct formats document_crud = DocumentCrud(session=db, project_id=project.id) for i, (job, target_format) in enumerate(jobs): db.refresh(job) @@ -175,7 +177,6 @@ def 
test_multiple_format_transformations( transformed_doc = document_crud.read_one(job.transformed_document_id) assert transformed_doc is not None - # Verify correct file extension based on format if target_format == "markdown": assert transformed_doc.fname.endswith(".md") elif target_format == "text": diff --git a/backend/app/tests/services/doctransformer/test_job/test_start_job.py b/backend/app/tests/services/doctransformer/test_job/test_start_job.py new file mode 100644 index 000000000..60e3dadee --- /dev/null +++ b/backend/app/tests/services/doctransformer/test_job/test_start_job.py @@ -0,0 +1,193 @@ +""" +Tests for the start_job function in document transformation service. +""" +import pytest +from unittest.mock import patch +from uuid import uuid4 + +from sqlmodel import Session + +from app.services.doctransform.job import start_job +from app.services.doctransform.registry import TRANSFORMERS +from app.core.exception_handlers import HTTPException +from app.crud import DocTransformationJobCrud +from app.models import ( + Document, + DocTransformationJob, + Project, + TransformationStatus, + UserProjectOrg, + DocTransformJobCreate, +) +from app.tests.services.doctransformer.test_job.utils import ( + DocTransformTestBase, + MockTestTransformer, +) + + +class TestStartJob(DocTransformTestBase): + """Test cases for the start_job function.""" + + def _create_job(self, db: Session, project_id: int, source_document_id): + job = DocTransformJobCreate(source_document_id=source_document_id) + job = DocTransformationJobCrud(db, project_id=project_id).create(job) + return job + + def test_start_job_success( + self, + db: Session, + current_user: UserProjectOrg, + test_document: tuple[Document, Project], + ) -> None: + """start_job should enqueue execute_job with correct kwargs and return the same job id.""" + document, _project = test_document + + job = self._create_job(db, current_user.project_id, document.id) + + with patch( + 
"app.services.doctransform.job.start_low_priority_job" + ) as mock_schedule: + mock_schedule.return_value = "fake-task-id" + + returned_job_id = start_job( + db=db, + current_user=current_user, + job_id=job.id, + transformer_name="test-transformer", + target_format="markdown", + callback_url=None, + ) + + assert returned_job_id == job.id + + job = db.get(DocTransformationJob, job.id) + assert job is not None + assert job.source_document_id == document.id + assert job.status == TransformationStatus.PENDING + assert job.error_message is None + assert job.transformed_document_id is None + + mock_schedule.assert_called_once() + kwargs = mock_schedule.call_args.kwargs + assert kwargs["function_path"] == "app.services.doctransform.job.execute_job" + assert kwargs["project_id"] == current_user.project_id + assert kwargs["job_id"] == str(job.id) + assert kwargs["source_document_id"] == str(job.source_document_id) + assert kwargs["transformer_name"] == "test-transformer" + assert kwargs["target_format"] == "markdown" + assert kwargs["callback_url"] is None + + def test_start_job_with_nonexistent_document( + self, + db: Session, + current_user: UserProjectOrg, + ) -> None: + """ + Previously: start_job validated document and raised 404. + Now: start_job expects an existing job; the equivalent negative case is a non-existent JOB. 
+ """ + nonexistent_job_id = uuid4() + + with pytest.raises(HTTPException): + with patch( + "app.services.doctransform.job.start_low_priority_job" + ) as mock_schedule: + mock_schedule.return_value = "fake-task-id" + start_job( + db=db, + current_user=current_user, + job_id=nonexistent_job_id, + transformer_name="test-transformer", + target_format="markdown", + callback_url=None, + ) + + def test_start_job_with_different_formats( + self, + db: Session, + current_user: UserProjectOrg, + test_document: tuple[Document, Project], + monkeypatch, + ) -> None: + """Ensure start_job passes target_format through to the scheduler.""" + monkeypatch.setitem(TRANSFORMERS, "test", MockTestTransformer) + + document, _ = test_document + formats = ["markdown", "text", "html"] + + with patch( + "app.services.doctransform.job.start_low_priority_job" + ) as mock_schedule: + mock_schedule.return_value = "fake-task-id" + + for target_format in formats: + job = self._create_job(db, current_user.project_id, document.id) + + returned_job_id = start_job( + db=db, + current_user=current_user, + job_id=job.id, + transformer_name="test", + target_format=target_format, + callback_url=None, + ) + + # job row still PENDING + job = db.get(DocTransformationJob, job.id) + assert job is not None + assert job.status == TransformationStatus.PENDING + + # scheduler called with correct kwargs + kwargs = mock_schedule.call_args.kwargs + assert kwargs["target_format"] == target_format + assert ( + kwargs["function_path"] + == "app.services.doctransform.job.execute_job" + ) + assert kwargs["project_id"] == current_user.project_id + assert kwargs["job_id"] == str(job.id) + assert kwargs["source_document_id"] == str(job.source_document_id) + assert kwargs["transformer_name"] == "test" + assert returned_job_id == job.id # new start_job returns the same UUID + + @pytest.mark.parametrize("transformer_name", ["test"]) + def test_start_job_with_different_transformers( + self, + db: Session, + current_user: 
UserProjectOrg, + test_document: tuple[Document, Project], + transformer_name: str, + monkeypatch, + ) -> None: + """Ensure start_job passes transformer_name through to the scheduler.""" + monkeypatch.setitem(TRANSFORMERS, "test", MockTestTransformer) + + document, _ = test_document + job = self._create_job(db, current_user.project_id, document.id) + + with patch( + "app.services.doctransform.job.start_low_priority_job" + ) as mock_schedule: + mock_schedule.return_value = "fake-task-id" + + returned_job_id = start_job( + db=db, + current_user=current_user, + job_id=job.id, + transformer_name=transformer_name, + target_format="markdown", + callback_url=None, + ) + + job = db.get(DocTransformationJob, job.id) + assert job is not None + assert job.status == TransformationStatus.PENDING + + kwargs = mock_schedule.call_args.kwargs + assert kwargs["transformer_name"] == transformer_name + assert kwargs["target_format"] == "markdown" + assert kwargs["function_path"] == "app.services.doctransform.job.execute_job" + assert kwargs["project_id"] == current_user.project_id + assert kwargs["job_id"] == str(job.id) + assert kwargs["source_document_id"] == str(job.source_document_id) + assert returned_job_id == job.id diff --git a/backend/app/tests/core/doctransformer/test_service/utils.py b/backend/app/tests/services/doctransformer/test_job/utils.py similarity index 98% rename from backend/app/tests/core/doctransformer/test_service/utils.py rename to backend/app/tests/services/doctransformer/test_job/utils.py index 82451adde..277c5208c 100644 --- a/backend/app/tests/core/doctransformer/test_service/utils.py +++ b/backend/app/tests/services/doctransformer/test_job/utils.py @@ -9,7 +9,7 @@ from app.core.cloud import AmazonCloudStorageClient from app.core.config import settings -from app.core.doctransform.transformer import Transformer +from app.services.doctransform.transformer import Transformer from app.models import Document diff --git 
a/backend/app/tests/core/doctransformer/test_registry.py b/backend/app/tests/services/doctransformer/test_registry.py similarity index 93% rename from backend/app/tests/core/doctransformer/test_registry.py rename to backend/app/tests/services/doctransformer/test_registry.py index 7f8303fff..0b1909f4e 100644 --- a/backend/app/tests/core/doctransformer/test_registry.py +++ b/backend/app/tests/services/doctransformer/test_registry.py @@ -1,5 +1,6 @@ import pytest -from app.core.doctransform.registry import ( + +from app.services.doctransform.registry import ( get_file_format, get_supported_transformations, is_transformation_supported, @@ -19,7 +20,7 @@ def patched_transformations(monkeypatch): ("pdf", "markdown"): {"default": "zerox", "zerox": "zerox"}, } monkeypatch.setattr( - "app.core.doctransform.registry.SUPPORTED_TRANSFORMATIONS", mapping + "app.services.doctransform.registry.SUPPORTED_TRANSFORMATIONS", mapping ) return mapping @@ -46,7 +47,7 @@ def test_get_supported_transformations(patched_transformations): def test_is_transformation_supported(monkeypatch): monkeypatch.setattr( - "app.core.doctransform.registry.SUPPORTED_TRANSFORMATIONS", + "app.services.doctransform.registry.SUPPORTED_TRANSFORMATIONS", {("docx", "pdf"): {"default": "pandoc"}}, ) assert is_transformation_supported("docx", "pdf")