2 changes: 1 addition & 1 deletion backend/app/api/docs/documents/upload.md
@@ -1,6 +1,6 @@
Upload a document to Kaapi.

- If only a file is provided, the document will be uploaded and stored, and its ID will be returned.
- If only a file is provided, the document will be uploaded and stored, and its ID will be returned. The maximum file size allowed for upload is 25 MB.
- If a target format is specified, a transformation job will also be created to transform the document into the target format in the background. The response will include both the uploaded document details and information about the transformation job.
- If a callback URL is provided, you will receive a notification at that URL once the document transformation job is completed.

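For illustration, a client call matching the documented behavior might look like the sketch below; the endpoint URL, form field names, and auth header are assumptions for this sketch, not taken from the actual API spec:

import requests

# Hypothetical upload call: URL, field names, and header are assumptions.
with open("report.pdf", "rb") as fh:
    response = requests.post(
        "https://kaapi.example.com/api/v1/documents/upload",
        headers={"Authorization": "Bearer <api-key>"},
        files={"file": fh},  # per the doc above, the file must be at most 25 MB
        data={
            "target_format": "markdown",  # optional: also queues a transformation job
            "callback_url": "https://example.com/hooks/document-ready",  # optional
        },
    )

response.raise_for_status()
print(response.json())  # expected to include the document ID and, if requested, the job details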
44 changes: 44 additions & 0 deletions backend/app/services/collections/create_collection.py
@@ -166,6 +166,7 @@ def execute_job(
    result = None
    creation_request = None
    provider = None
    storage = None

    with log_context(
        tag="collection",
@@ -221,6 +222,49 @@ def execute_job(
                organization_id=organization_id,
            )

        with Session(engine) as session:
            document_crud = DocumentCrud(session, project_id)
            flat_docs = document_crud.read_each(creation_request.documents)
            storage = get_cloud_storage(session=session, project_id=project_id)

        file_exts = {doc.fname.split(".")[-1] for doc in flat_docs if "." in doc.fname}

        backfill: list[tuple[UUID, float]] = []
        for doc in flat_docs:
            if doc.file_size_kb is None:
                size_kb = round(storage.get_file_size_kb(doc.object_store_url))
                doc.file_size_kb = size_kb
                backfill.append((doc.id, size_kb))
Comment on lines +232 to +237

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
rg -nP -C2 '\bfile_size_kb\b' --type=py -g '!**/tests/**'

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 4257


🏁 Script executed:

#!/bin/bash
# Check the storage.py file around the get_file_size_kb function
wc -l backend/app/core/cloud/storage.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 112


🏁 Script executed:

#!/bin/bash
# Read lines 220-230 from storage.py to see get_file_size_kb implementation
sed -n '220,230p' backend/app/core/cloud/storage.py

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 662


Remove bare round(...) to preserve fractional KB precision.

storage.get_file_size_kb() returns a float already rounded to 2 decimals (e.g., 512.37). Wrapping it in a bare round(...) call rounds the value to the nearest integer, discarding the fractional precision. That contradicts both the declared type backfill: list[tuple[UUID, float]] and the Document.file_size_kb column type (Float in the database schema).
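A minimal illustration of the difference (the 512.37 value is made up):

size_kb = 512.37           # example of the float get_file_size_kb() is said to return
print(round(size_kb))      # 512, an int: the fractional part is discarded
print(round(size_kb, 2))   # 512.37: stays a float with two-decimal precision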

🔧 Proposed fix
-        backfill: list[tuple[UUID, float]] = []
-        for doc in flat_docs:
-            if doc.file_size_kb is None:
-                size_kb = round(storage.get_file_size_kb(doc.object_store_url))
-                doc.file_size_kb = size_kb
-                backfill.append((doc.id, size_kb))
+        backfill: list[tuple[UUID, float]] = []
+        for doc in flat_docs:
+            if doc.file_size_kb is None:
+                size_kb = storage.get_file_size_kb(doc.object_store_url)
+                doc.file_size_kb = size_kb
+                backfill.append((doc.id, size_kb))
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/collections/create_collection.py` around lines 176 -
181, The code is truncating fractional KB precision by wrapping
storage.get_file_size_kb(doc.object_store_url) in round(...); instead, remove
the bare round call and assign the returned float directly to doc.file_size_kb
and to the backfill tuple (which is declared as list[tuple[UUID, float]]), e.g.
call size_kb = storage.get_file_size_kb(...), set doc.file_size_kb = size_kb and
append (doc.id, size_kb) so the float precision is preserved.


        total_size_kb = sum(
            doc.file_size_kb for doc in flat_docs if doc.file_size_kb is not None
        )
        total_size_mb = total_size_kb / 1024
Comment on lines +232 to +242

⚠️ Potential issue | 🟠 Major

Single storage failure aborts the entire collection job.

storage.get_file_size_kb(doc.object_store_url) is called synchronously per document with no per-document error handling. Per backend/app/core/cloud/storage.py:220-230, this wraps a head_object call that will raise on any error (missing object, IAM issue, transient S3 outage, mid-deletion, etc.). A single failure here propagates to the outer try at line 267 and marks the whole job FAILED — even for collections where most documents already have file_size_kb persisted or where one legacy doc is simply missing from S3.

Given this PR's goal is to backfill historical data, recovering gracefully from per-doc failures is likely the safer posture:

🛡️ Suggested per-doc guard
         backfill: list[tuple[UUID, float]] = []
         for doc in flat_docs:
             if doc.file_size_kb is None:
-                size_kb = round(storage.get_file_size_kb(doc.object_store_url))
-                doc.file_size_kb = size_kb
-                backfill.append((doc.id, size_kb))
+                try:
+                    size_kb = storage.get_file_size_kb(doc.object_store_url)
+                except Exception as err:
+                    logger.warning(
+                        f"[execute_job] Failed to backfill file_size_kb | "
+                        f"{{'doc_id': '{doc.id}', 'error': '{err}'}}"
+                    )
+                    continue
+                doc.file_size_kb = size_kb
+                backfill.append((doc.id, size_kb))

Note: if you keep fail-fast semantics intentionally, the downstream if doc.file_size_kb is not None filter at line 184 and the TypeError-raising path in batch_documents become inconsistent with each other — pick one contract and document it.
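If the fail-fast contract is kept, a minimal sketch of making it explicit (the helper name, error type, and message here are illustrative, not existing code):

from uuid import UUID


def ensure_sizes_backfilled(documents) -> None:
    """Hypothetical pre-check for batch_documents: raise a deliberate,
    descriptive error instead of the incidental TypeError that surfaces
    when a None size is summed with an int."""
    missing: list[UUID] = [doc.id for doc in documents if doc.file_size_kb is None]
    if missing:
        raise ValueError(f"file_size_kb not backfilled for documents: {missing}")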

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/collections/create_collection.py` around lines 176 -
186, The backfill loop in create_collection.py currently calls
storage.get_file_size_kb(doc.object_store_url) without per-document error
handling, so a single storage error aborts the whole job; wrap the call inside a
try/except around the storage.get_file_size_kb invocation in the loop over
flat_docs (the block that assigns size_kb, sets doc.file_size_kb and appends to
backfill) to catch storage errors, log or record the failing doc.id and error,
skip that document (leave doc.file_size_kb as None) and continue processing the
rest; keep the existing total_size_kb calculation (which already ignores None)
so a single failure doesn't raise, and optionally add a note to record
failed_backfills for later retry/metric.


        with Session(engine) as session:
            if backfill:
                document_crud = DocumentCrud(session, project_id)
                for doc_id, size_kb in backfill:
                    doc = document_crud.read_one(doc_id)
                    doc.file_size_kb = size_kb
                    document_crud.update(doc)

            collection_job_crud = CollectionJobCrud(session, project_id)
            collection_job = collection_job_crud.read_one(job_uuid)
            collection_job = collection_job_crud.update(
                job_uuid,
                CollectionJobUpdate(
                    task_id=task_id,
                    status=CollectionJobStatus.PROCESSING,
                    total_size_mb=round(total_size_mb, 2),
                ),
            )

            provider = get_llm_provider(
                session=session,
                provider=creation_request.provider,
                project_id=project_id,
                organization_id=organization_id,
        with tracer.start_as_current_span("collections.create.provider"):
Comment on lines +225 to 268

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
python - <<'PY'
import ast
from pathlib import Path

path = Path("backend/app/services/collections/create_collection.py")
try:
    ast.parse(path.read_text(), filename=str(path))
except SyntaxError as err:
    print(f"{path}:{err.lineno}:{err.offset}: {err.msg}")
    raise SystemExit(1)

print("syntax ok")
PY

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 172


🏁 Script executed:

cat -n backend/app/services/collections/create_collection.py | sed -n '175,275p'

Repository: ProjectTech4DevAI/kaapi-backend

Length of output: 5065


Fix indentation and close the get_llm_provider() call to resolve syntax errors.

The try block starting at line 184 is incomplete—line 225 onwards should be indented within it, and the get_llm_provider() call at line 263 needs a closing parenthesis before the with tracer.start_as_current_span() statement at line 268.

Proposed structural fix
-        with Session(engine) as session:
-            document_crud = DocumentCrud(session, project_id)
-            flat_docs = document_crud.read_each(creation_request.documents)
-            storage = get_cloud_storage(session=session, project_id=project_id)
+            with Session(engine) as session:
+                document_crud = DocumentCrud(session, project_id)
+                flat_docs = document_crud.read_each(creation_request.documents)
+                storage = get_cloud_storage(session=session, project_id=project_id)

-        file_exts = {doc.fname.split(".")[-1] for doc in flat_docs if "." in doc.fname}
+            file_exts = {doc.fname.split(".")[-1] for doc in flat_docs if "." in doc.fname}

-        backfill: list[tuple[UUID, float]] = []
-        for doc in flat_docs:
-            if doc.file_size_kb is None:
-                size_kb = round(storage.get_file_size_kb(doc.object_store_url))
-                doc.file_size_kb = size_kb
-                backfill.append((doc.id, size_kb))
+            backfill: list[tuple[UUID, float]] = []
+            for doc in flat_docs:
+                if doc.file_size_kb is None:
+                    size_kb = round(storage.get_file_size_kb(doc.object_store_url))
+                    doc.file_size_kb = size_kb
+                    backfill.append((doc.id, size_kb))

-        total_size_kb = sum(
-            doc.file_size_kb for doc in flat_docs if doc.file_size_kb is not None
-        )
-        total_size_mb = total_size_kb / 1024
+            total_size_kb = sum(
+                doc.file_size_kb for doc in flat_docs if doc.file_size_kb is not None
+            )
+            total_size_mb = total_size_kb / 1024

-        with Session(engine) as session:
-            if backfill:
-                document_crud = DocumentCrud(session, project_id)
-                for doc_id, size_kb in backfill:
-                    doc = document_crud.read_one(doc_id)
-                    doc.file_size_kb = size_kb
-                    document_crud.update(doc)
+            with Session(engine) as session:
+                if backfill:
+                    document_crud = DocumentCrud(session, project_id)
+                    for doc_id, size_kb in backfill:
+                        doc = document_crud.read_one(doc_id)
+                        doc.file_size_kb = size_kb
+                        document_crud.update(doc)

-            collection_job_crud = CollectionJobCrud(session, project_id)
-            collection_job = collection_job_crud.read_one(job_uuid)
-            collection_job = collection_job_crud.update(
-                job_uuid,
-                CollectionJobUpdate(
-                    task_id=task_id,
-                    status=CollectionJobStatus.PROCESSING,
-                    total_size_mb=round(total_size_mb, 2),
-                ),
-            )
+                collection_job_crud = CollectionJobCrud(session, project_id)
+                collection_job = collection_job_crud.read_one(job_uuid)
+                collection_job = collection_job_crud.update(
+                    job_uuid,
+                    CollectionJobUpdate(
+                        task_id=task_id,
+                        status=CollectionJobStatus.PROCESSING,
+                        total_size_mb=round(total_size_mb, 2),
+                    ),
+                )

-            provider = get_llm_provider(
-                session=session,
-                provider=creation_request.provider,
-                project_id=project_id,
-                organization_id=organization_id,
+                provider = get_llm_provider(
+                    session=session,
+                    provider=creation_request.provider,
+                    project_id=project_id,
+                    organization_id=organization_id,
+                )
🧰 Tools
🪛 Ruff (0.15.10)

[warning] 225-225: Expected except or finally after try block

(invalid-syntax)


[warning] 267-268: Expected ), found newline

(invalid-syntax)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@backend/app/services/collections/create_collection.py` around lines 225 -
268, The code block starting after the try around the Session usage is
mis-indented and the get_llm_provider(...) call is missing its closing
parenthesis before the subsequent with tracer.start_as_current_span(...); fix by
moving the entire block (the with Session(engine) as session: block that
initializes DocumentCrud, computes backfill, updates documents and
collection_job) to be inside the try scope and add the closing ')' to the
get_llm_provider call so it becomes get_llm_provider(session=..., provider=...,
project_id=..., organization_id=...) immediately before the with
tracer.start_as_current_span("collections.create.provider") line; ensure
indentation of the following lines (provider variable use and the tracer
context) matches the surrounding try block.

result = provider.create(
collection_request=creation_request,
3 changes: 1 addition & 2 deletions backend/app/services/collections/helpers.py
@@ -19,7 +19,6 @@
MAX_DOC_SIZE_MB = 25 # 25 MB maximum per document

# Maximum batch size for uploading documents to vector store
# Derived from MAX_DOC_SIZE + buffer to ensure single docs always fit
MAX_BATCH_SIZE_KB = (MAX_DOC_SIZE_MB + 5) * 1024 # 30 MB in KB (25 + 5 MB buffer)
MAX_BATCH_COUNT = 200 # Maximum documents per batch

@@ -83,7 +82,7 @@ def batch_documents(documents: list[Document]) -> list[list[Document]]:
    current_batch_size_kb = 0

    for doc in documents:
        doc_size_kb = doc.file_size_kb or 0
        doc_size_kb = doc.file_size_kb

        would_exceed_size = (current_batch_size_kb + doc_size_kb) > MAX_BATCH_SIZE_KB
        would_exceed_count = len(current_batch) >= MAX_BATCH_COUNT
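For context, the greedy batching this function performs looks roughly like the sketch below; it is reconstructed from the visible excerpt, and everything not shown in the diff (the batch bookkeeping and the final flush) is an assumption:

def batch_documents(documents: list[Document]) -> list[list[Document]]:
    batches: list[list[Document]] = []
    current_batch: list[Document] = []
    current_batch_size_kb = 0.0

    for doc in documents:
        # With the `or 0` fallback removed, a None size now raises a TypeError
        # in the size check below instead of silently counting as 0 KB.
        doc_size_kb = doc.file_size_kb

        would_exceed_size = (current_batch_size_kb + doc_size_kb) > MAX_BATCH_SIZE_KB
        would_exceed_count = len(current_batch) >= MAX_BATCH_COUNT

        # Close the current batch before adding a doc that would break either limit.
        if current_batch and (would_exceed_size or would_exceed_count):
            batches.append(current_batch)
            current_batch = []
            current_batch_size_kb = 0.0

        current_batch.append(doc)
        current_batch_size_kb += doc_size_kb

    if current_batch:
        batches.append(current_batch)

    return batches

MAX_BATCH_SIZE_KB, MAX_BATCH_COUNT, and Document are the module-level names shown earlier in this file's diff.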
10 changes: 4 additions & 6 deletions backend/app/tests/services/collections/test_helpers.py
@@ -122,14 +122,12 @@ def test_batch_documents_mixed_size_batching() -> None:
    assert len(batches[2]) == 1  # 15 MB total


def test_batch_documents_with_none_file_size() -> None:
    """Test that documents with None file_size are treated as 0 bytes."""
def test_batch_documents_with_none_file_size_raises() -> None:
    """Test that documents with None file_size raise TypeError — sizes must be backfilled before batching."""
    docs = create_fake_documents(10, file_size_kb=None)
    batches = helpers.batch_documents(docs)

    # All files with None/0 size should fit in one batch (under both limits)
    assert len(batches) == 1
    assert len(batches[0]) == 10
    with pytest.raises(TypeError):
        helpers.batch_documents(docs)


def test_batch_documents_empty_input() -> None: