28 changes: 28 additions & 0 deletions backend/app/alembic/versions/058_make_llm_call_fields_nullable.py
@@ -0,0 +1,28 @@
"""make llm_call input_type, provider, model nullable

Revision ID: 058
Revises: 057
Create Date: 2026-05-11 00:00:00.000000

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "058"
down_revision = "057"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.alter_column("llm_call", "input_type", nullable=True)
op.alter_column("llm_call", "provider", nullable=True)
op.alter_column("llm_call", "model", nullable=True)


def downgrade() -> None:
op.alter_column("llm_call", "model", nullable=False)
op.alter_column("llm_call", "provider", nullable=False)
op.alter_column("llm_call", "input_type", nullable=False)
Comment on lines +25 to +28

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Downgrade can fail once NULL rows exist.

Setting these columns back to nullable=False without backfilling will fail on rollback if any row has NULL in input_type, provider, or model.

Suggested migration hardening
 def downgrade() -> None:
+    op.execute(
+        """
+        UPDATE llm_call
+        SET input_type = COALESCE(input_type, 'text'),
+            provider = COALESCE(provider, 'unknown'),
+            model = COALESCE(model, 'unknown')
+        """
+    )
     op.alter_column("llm_call", "model", nullable=False)
     op.alter_column("llm_call", "provider", nullable=False)
     op.alter_column("llm_call", "input_type", nullable=False)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/alembic/versions/058_make_llm_call_fields_nullable.py` around
lines 25-28, the downgrade() changes set llm_call.model, llm_call.provider, and
llm_call.input_type back to nullable=False, which will fail if any rows contain
NULL. Modify downgrade() to first backfill NULL values for these columns (or
abort with a clear error) before calling op.alter_column: for example, run
UPDATE statements against the "llm_call" table to set sensible defaults for
model, provider, and input_type, or raise a RuntimeError if NULLs exist, then call
op.alter_column("llm_call", "model", nullable=False),
op.alter_column("llm_call", "provider", nullable=False), and
op.alter_column("llm_call", "input_type", nullable=False).

43 changes: 42 additions & 1 deletion backend/app/core/storage_utils.py
@@ -11,12 +11,13 @@
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import Literal
from urllib.parse import unquote, urlparse
from uuid import UUID

from starlette.datastructures import Headers, UploadFile

from app.core.cloud.storage import CloudStorage, CloudStorageError
from typing import Literal

logger = logging.getLogger(__name__)

@@ -207,6 +208,46 @@ def load_json_from_object_store(storage: CloudStorage, url: str) -> list | dict
return None


_MIME_TO_EXT: dict[str, str] = {
"audio/mpeg": "mp3",
"audio/mp3": "mp3",
"audio/ogg": "ogg",
"audio/wav": "wav",
"audio/wave": "wav",
"audio/x-wav": "wav",
"audio/webm": "webm",
"audio/mp4": "mp4",
"audio/aac": "aac",
"audio/flac": "flac",
}


def upload_audio_bytes_to_s3(
storage: CloudStorage,
audio_bytes: bytes,
call_id: UUID,
mime_type: str | None,
prefix: str,
) -> str | None:
"""Upload decoded audio bytes to S3 and return the s3:// URI.

Args:
storage: CloudStorage instance
audio_bytes: Raw audio bytes
call_id: LLM call UUID used as the filename stem
mime_type: MIME type of the audio (determines file extension)
prefix: S3 subdirectory, e.g. "llm/tts/audio" or "llm/stt/audio"

Returns:
s3:// URI if successful, None on failure
"""
ext = _MIME_TO_EXT.get(mime_type or "", "wav")
filename = f"{call_id}.{ext}"
return upload_to_object_store(
storage, audio_bytes, filename, prefix, mime_type or "audio/wav"
)


def generate_timestamped_filename(base_name: str, extension: str = "csv") -> str:
"""
Generate a filename with timestamp.
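
A hedged usage sketch for the new upload_audio_bytes_to_s3 helper, assuming a CloudStorage instance is obtained however the project normally constructs one (the no-arg construction below is a placeholder) and that the prefix values match those named in the docstring:

from uuid import uuid4

from app.core.cloud.storage import CloudStorage
from app.core.storage_utils import upload_audio_bytes_to_s3

storage = CloudStorage()  # placeholder: obtain the storage client as the project normally does

uri = upload_audio_bytes_to_s3(
    storage,
    audio_bytes=b"...decoded audio bytes...",
    call_id=uuid4(),
    mime_type="audio/ogg",   # resolves to an .ogg filename via _MIME_TO_EXT
    prefix="llm/tts/audio",
)
# An unknown or missing mime_type falls back to a .wav filename and the audio/wav
# content type; the helper returns the s3:// URI on success and None on failure.
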
134 changes: 129 additions & 5 deletions backend/app/crud/llm.py
@@ -19,7 +19,7 @@
logger = logging.getLogger(__name__)


def serialize_input(query_input: QueryInput | str) -> str:
def serialize_input(query_input: QueryInput | str | list) -> str:
"""Serialize query input for database storage.

For text: stores the actual content value
@@ -31,6 +31,15 @@ def serialize_input(query_input: QueryInput | str) -> str:
elif isinstance(query_input, TextInput):
return query_input.content.value
elif isinstance(query_input, AudioInput):
if query_input.content.format == "url":
return json.dumps(
{
"type": "audio",
"format": "url",
"mime_type": query_input.content.mime_type,
"url": query_input.content.value,
}
)
return json.dumps(
{
"type": "audio",
@@ -187,11 +196,12 @@ def update_llm_call_response(
db_llm_call.provider_response_id = provider_response_id

if content is not None:
# For audio outputs (AudioOutput model): calculate size metadata from base64 content
# AudioOutput serializes as: {"type": "audio", "content": {"format": "base64", "value": "...", "mime_type": "..."}}
# For audio outputs: calculate size only when content is still base64 (not a URI)
if content.get("type") == "audio":
audio_value = content.get("content", {}).get("value")
if audio_value:
audio_content = content.get("content", {})
audio_format = audio_content.get("format")
audio_value = audio_content.get("value")
if audio_value and audio_format == "base64":
try:
audio_data = base64.b64decode(audio_value)
content["audio_size_bytes"] = len(audio_data)
@@ -218,6 +228,27 @@ def update_llm_call_response(
return db_llm_call


def update_llm_call_input(
session: Session,
llm_call_id: UUID,
s3_uri: str,
) -> None:
"""Overwrite llm_call.input with an S3 URI after uploading STT audio."""
db_llm_call = session.get(LlmCall, llm_call_id)
if not db_llm_call:
logger.warning(
f"[update_llm_call_input] LLM call not found | llm_call_id={llm_call_id}"
)
return
db_llm_call.input = s3_uri
db_llm_call.updated_at = now()
session.add(db_llm_call)
session.commit()
logger.info(
f"[update_llm_call_input] Updated input URI | llm_call_id={llm_call_id}"
)


def get_llm_call_by_id(
session: Session,
llm_call_id: UUID,
@@ -248,3 +279,96 @@ def get_llm_calls_by_job_id(
)

return list(session.exec(statement).all())


def get_llm_call_by_job_id(session: Session, job_id: UUID) -> LlmCall | None:
"""Return the single active LlmCall for a standalone job (no chain_id)."""
statement = select(LlmCall).where(
LlmCall.job_id == job_id,
LlmCall.chain_id.is_(None),
LlmCall.deleted_at.is_(None),
)
return session.exec(statement).first()
Comment on lines +284 to +291

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Make get_llm_call_by_job_id deterministic for pending-row updates.

Using .first() without ordering/pending filters can pick the wrong row when multiple standalone calls exist for the same job, causing updates to land on an incorrect record.

Suggested query tightening
 def get_llm_call_by_job_id(session: Session, job_id: UUID) -> LlmCall | None:
     """Return the single active LlmCall for a standalone job (no chain_id)."""
     statement = select(LlmCall).where(
         LlmCall.job_id == job_id,
         LlmCall.chain_id.is_(None),
         LlmCall.deleted_at.is_(None),
+        LlmCall.provider.is_(None),
+        LlmCall.model.is_(None),
+        LlmCall.content.is_(None),
     )
-    return session.exec(statement).first()
+    statement = statement.order_by(LlmCall.created_at.desc())
+    return session.exec(statement).first()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@backend/app/crud/llm.py` around lines 284-291, get_llm_call_by_job_id is
non-deterministic because it uses .first() without filtering or ordering, which
can return the wrong standalone LlmCall when multiple rows exist. Update the
query to restrict results to the intended active/pending row (e.g., add a
predicate such as LlmCall.status == "pending" or LlmCall.is_active) and add a
deterministic ORDER BY (for example LlmCall.created_at.desc() or
LlmCall.id.desc()) before taking the first result, so updates always target the
correct record.



def create_llm_call_pending(
session: Session,
*,
job_id: UUID,
project_id: int,
organization_id: int,
request: LLMCallRequest,
chain_id: UUID | None = None,
) -> LlmCall:
"""Create a minimal LlmCall row at job-creation time.

Only fields available before config resolution are populated.
input_type, output_type, provider, model, content, usage stay NULL
and are filled in by the Celery task via update_llm_call_resolved_fields().
"""
config_dict: dict[str, Any] | None = None
if request.config.is_stored_config:
config_dict = {
"config_id": str(request.config.id),
"config_version": request.config.version,
}

conversation_id = None
auto_create = None
if request.query.conversation:
conversation_id = request.query.conversation.id
auto_create = request.query.conversation.auto_create

db_llm_call = LlmCall(
job_id=job_id,
project_id=project_id,
organization_id=organization_id,
chain_id=chain_id,
input=serialize_input(request.query.input),
conversation_id=conversation_id,
auto_create=auto_create,
config=config_dict,
)

session.add(db_llm_call)
session.commit()
session.refresh(db_llm_call)

logger.info(
f"[create_llm_call_pending] Created pending LLM call id={db_llm_call.id}, job_id={job_id}"
)

return db_llm_call


def update_llm_call_resolved_fields(
session: Session,
*,
llm_call_id: UUID,
input_type: str,
output_type: str | None,
provider: str | None,
model: str | None,
config: dict[str, Any],
) -> LlmCall:
"""Populate config-resolved fields on a pending LlmCall row."""
db_llm_call = session.get(LlmCall, llm_call_id)
if not db_llm_call:
raise ValueError(f"LLM call not found with id={llm_call_id}")

db_llm_call.input_type = input_type
db_llm_call.output_type = output_type
db_llm_call.provider = provider
db_llm_call.model = model
db_llm_call.config = config
db_llm_call.updated_at = now()

session.add(db_llm_call)
session.commit()
session.refresh(db_llm_call)

logger.info(
f"[update_llm_call_resolved_fields] Updated resolved fields | llm_call_id={llm_call_id}, provider={provider}, model={model}"
)

return db_llm_call
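
A hedged sketch of how the new pending/resolved split fits together, assuming a database session is available and a resolved-config object with the attribute names shown below exists; apart from the crud helpers themselves, the function and attribute names here are illustrative, not the project's actual task code:

from app.crud.llm import (
    create_llm_call_pending,
    update_llm_call_input,
    update_llm_call_resolved_fields,
)


def run_llm_job(session, job_id, project_id, organization_id, request, resolved):
    # 1. At job creation: persist only what is known before config resolution;
    #    input_type, output_type, provider, model, content, and usage stay NULL.
    pending = create_llm_call_pending(
        session,
        job_id=job_id,
        project_id=project_id,
        organization_id=organization_id,
        request=request,
    )

    # 2. In the Celery task, once the config is resolved, fill in the NULL columns.
    update_llm_call_resolved_fields(
        session,
        llm_call_id=pending.id,
        input_type=resolved.input_type,    # e.g. "audio"
        output_type=resolved.output_type,  # e.g. "text"
        provider=resolved.provider,        # e.g. "openai"
        model=resolved.model,              # e.g. "gpt-4o"
        config=resolved.config,
    )

    # 3. For STT jobs, replace the stored audio payload with the uploaded S3 URI.
    update_llm_call_input(session, pending.id, s3_uri="s3://bucket/llm/stt/audio/...")
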
8 changes: 7 additions & 1 deletion backend/app/crud/llm_chain.py
@@ -2,7 +2,7 @@
from typing import Any
from uuid import UUID

from sqlmodel import Session
from sqlmodel import Session, select

from app.core.util import now
from app.models.llm.request import ChainStatus, LlmChain
@@ -144,3 +144,9 @@ def update_llm_chain_block_completed(
f"llm_call_id={llm_call_id}"
)
return db_chain


def get_llm_chain_by_job_id(session: Session, job_id: UUID) -> LlmChain | None:
"""Return the LlmChain record associated with the given job."""
statement = select(LlmChain).where(LlmChain.job_id == job_id)
return session.exec(statement).first()
34 changes: 21 additions & 13 deletions backend/app/models/llm/request.py
@@ -102,13 +102,19 @@ class TextContent(SQLModel):


class AudioContent(SQLModel):
format: Literal["base64"] = "base64"
value: str = Field(..., description="Base64 encoded audio")
format: Literal["base64", "url"] = "base64"
value: str = Field(
..., description="Base64 encoded audio or public URL to download from"
)
# keeping the mime_type liberal here, since does not affect base64 encoding
mime_type: str | None = Field(
None,
description="MIME type of the audio (e.g., audio/wav, audio/mp3, audio/ogg)",
)
uri: str | None = Field(
None,
description="Presigned URL to the audio file in object storage (when available)",
)


class ImageContent(SQLModel):
@@ -531,11 +537,11 @@ class LlmCall(SQLModel, table=True):
)

# NOTE: image, pdf, multimodal are internal labels stored in the table not user facing.
input_type: Literal["text", "audio", "image", "pdf", "multimodal"] = Field(
...,
input_type: Literal["text", "audio", "image", "pdf", "multimodal"] | None = Field(
default=None,
sa_column=sa.Column(
sa.String,
nullable=False,
nullable=True,
comment="Input type: text, audio, image, pdf, multimodal",
),
)
@@ -550,20 +556,22 @@
)

# Provider and model info
provider: str = Field(
...,
provider: str | None = Field(
default=None,
sa_column=sa.Column(
sa.String,
nullable=False,
nullable=True,
comment="AI provider as sent by user (e.g openai, -native, google)",
),
)

model: str = Field(
...,
sa_column_kwargs={
"comment": "Specific model used e.g. 'gpt-4o', 'gemini-2.5-pro'"
},
model: str | None = Field(
default=None,
sa_column=sa.Column(
sa.String,
nullable=True,
comment="Specific model used e.g. 'gpt-4o', 'gemini-2.5-pro'",
),
)

# Response fields
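
A minimal construction sketch for the widened AudioContent model, assuming it is importable from app.models.llm.request; the payload values and URL are placeholders:

from app.models.llm.request import AudioContent

# Existing behaviour: inline base64 payload (format defaults to "base64").
inline = AudioContent(value="UklGRi4AAABXQVZF...", mime_type="audio/wav")

# New behaviour: reference a publicly downloadable URL instead of embedding bytes.
remote = AudioContent(
    format="url",
    value="https://example.com/sample.ogg",
    mime_type="audio/ogg",
)
# The optional `uri` field can later carry a presigned object-store URL once the audio is uploaded.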