diff --git a/backend/app/alembic/versions/052_create_model_config_table.py b/backend/app/alembic/versions/052_create_model_config_table.py index e74b94641..c72a7ab32 100644 --- a/backend/app/alembic/versions/052_create_model_config_table.py +++ b/backend/app/alembic/versions/052_create_model_config_table.py @@ -115,7 +115,10 @@ def upgrade(): (16, 'openai', 'gpt-5.4', '{"effort": {"type": "enum", "default": "medium", "options": ["none", "low", "medium", "high", "xhigh"], "description": "How long the model spends reasoning. Higher = better but slower."}, "summary": {"type": "enum", "default": "auto", "options": ["auto", "detailed", "concise", "null"], "description": "Summarize the reasoning result."}}', '{TEXT,IMAGE}', '{TEXT}', '{"response": {"input_token_cost": 2.5, "output_token_cost": 15}, "batch": {"input_token_cost": 1.25, "output_token_cost": 7.5}}', true, NOW(), NOW()), (17, 'openai', 'gpt-5.4-mini', '{"effort": {"type": "enum", "default": "medium", "options": ["none", "low", "medium", "high", "xhigh"], "description": "How long the model spends reasoning. Higher = better but slower."}, "summary": {"type": "enum", "default": "auto", "options": ["auto", "detailed", "concise", "null"], "description": "Summarize the reasoning result."}}', '{TEXT,IMAGE}', '{TEXT}', '{"response": {"input_token_cost": 0.75, "output_token_cost": 4.5}, "batch": {"input_token_cost": 0.375, "output_token_cost": 2.25}}', true, NOW(), NOW()), (18, 'openai', 'gpt-5.4-nano', '{"effort": {"type": "enum", "default": "medium", "options": ["none", "low", "medium", "high", "xhigh"], "description": "How long the model spends reasoning. Higher = better but slower."}, "summary": {"type": "enum", "default": "auto", "options": ["auto", "detailed", "concise", "null"], "description": "Summarize the reasoning result."}}', '{TEXT,IMAGE}', '{TEXT}', '{"response": {"input_token_cost": 0.2, "output_token_cost": 1.25}, "batch": {"input_token_cost": 0.1, "output_token_cost": 0.625}}', true, NOW(), NOW()), - (19, 'openai', 'gpt-5.4-pro', '{"effort": {"type": "enum", "default": "medium", "options": ["none", "low", "medium", "high", "xhigh"], "description": "How long the model spends reasoning. Higher = better but slower."}, "summary": {"type": "enum", "default": "auto", "options": ["auto", "detailed", "concise", "null"], "description": "Summarize the reasoning result."}}', '{TEXT,IMAGE}', '{TEXT}', '{"response": {"input_token_cost": 30, "output_token_cost": 180}, "batch": {"input_token_cost": 15, "output_token_cost": 90}}', true, NOW(), NOW()) + (19, 'openai', 'gpt-5.4-pro', '{"effort": {"type": "enum", "default": "medium", "options": ["none", "low", "medium", "high", "xhigh"], "description": "How long the model spends reasoning. 
Higher = better but slower."}, "summary": {"type": "enum", "default": "auto", "options": ["auto", "detailed", "concise", "null"], "description": "Summarize the reasoning result."}}', '{TEXT,IMAGE}', '{TEXT}', '{"response": {"input_token_cost": 30, "output_token_cost": 180}, "batch": {"input_token_cost": 15, "output_token_cost": 90}}', true, NOW(), NOW()), + (20, 'openai', 'text-embedding-3-large', '{}', '{TEXT}', '{}', '{"response": {"input_token_cost": 0.13, "output_token_cost": 0}, "batch": {"input_token_cost": 0.065, "output_token_cost": 0}}', true, NOW(), NOW()), + (21, 'openai', 'text-embedding-3-small', '{}', '{TEXT}', '{}', '{"response": {"input_token_cost": 0.02, "output_token_cost": 0}, "batch": {"input_token_cost": 0.01, "output_token_cost": 0}}', true, NOW(), NOW()), + (22, 'openai', 'text-embedding-ada-002', '{}', '{TEXT}', '{}', '{"response": {"input_token_cost": 0.1, "output_token_cost": 0}, "batch": {"input_token_cost": 0.05, "output_token_cost": 0}}', true, NOW(), NOW()) """ ) diff --git a/backend/app/alembic/versions/054_add_cost_to_evaluation_run.py b/backend/app/alembic/versions/054_add_cost_to_evaluation_run.py new file mode 100644 index 000000000..4bacb4e0c --- /dev/null +++ b/backend/app/alembic/versions/054_add_cost_to_evaluation_run.py @@ -0,0 +1,33 @@ +"""add cost tracking to evaluation_run + +Revision ID: 054 +Revises: 053 +Create Date: 2026-04-09 12:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = "054" +down_revision = "053" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "evaluation_run", + sa.Column( + "cost", + postgresql.JSONB(astext_type=sa.Text()), + nullable=True, + comment="Cost tracking (response/embedding tokens and USD)", + ), + ) + + +def downgrade(): + op.drop_column("evaluation_run", "cost") diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py index 79a3c9d3f..6374dca76 100644 --- a/backend/app/crud/evaluations/core.py +++ b/backend/app/crud/evaluations/core.py @@ -5,19 +5,18 @@ from langfuse import Langfuse from sqlmodel import Session, select +from app.core.cloud.storage import get_cloud_storage +from app.core.db import engine +from app.core.storage_utils import upload_jsonl_to_object_store from app.core.util import now from app.crud.config.version import ConfigVersionCrud from app.crud.evaluations.langfuse import fetch_trace_scores_from_langfuse from app.crud.evaluations.score import EvaluationScore -from app.models import EvaluationRun +from app.models import EvaluationRun, EvaluationRunUpdate from app.models.llm.request import ConfigBlob, LLMCallConfig from app.models.stt_evaluation import EvaluationType from app.services.llm.jobs import resolve_config_blob -from app.core.db import engine -from app.core.cloud.storage import get_cloud_storage -from app.core.storage_utils import upload_jsonl_to_object_store - logger = logging.getLogger(__name__) @@ -192,46 +191,18 @@ def get_evaluation_run_by_id( def update_evaluation_run( session: Session, eval_run: EvaluationRun, - status: str | None = None, - error_message: str | None = None, - object_store_url: str | None = None, - score_trace_url: str | None = None, - score: dict | None = None, - embedding_batch_job_id: int | None = None, + update: EvaluationRunUpdate, ) -> EvaluationRun: """ - Update an evaluation run with new values and persist to database. 
- - This helper function ensures consistency when updating evaluation runs - by always updating the timestamp and properly committing changes. + Apply a partial update to an evaluation run and persist it. - Args: - session: Database session - eval_run: EvaluationRun instance to update - status: New status value (optional) - error_message: New error message (optional) - object_store_url: New object store URL (optional) - score: New score dict (optional) - embedding_batch_job_id: New embedding batch job ID (optional) - - Returns: - Updated and refreshed EvaluationRun instance + Only fields explicitly set on `update` are applied (`exclude_unset=True` + semantics), so callers don't accidentally clear unrelated columns. + `updated_at` is always bumped. """ - # Update provided fields - if status is not None: - eval_run.status = status - if error_message is not None: - eval_run.error_message = error_message - if object_store_url is not None: - eval_run.object_store_url = object_store_url - if score is not None: - eval_run.score = score - if embedding_batch_job_id is not None: - eval_run.embedding_batch_job_id = embedding_batch_job_id - if score_trace_url is not None: - eval_run.score_trace_url = score_trace_url or None - - # Always update timestamp + for field_name, new_value in update.model_dump(exclude_unset=True).items(): + setattr(eval_run, field_name, new_value) + eval_run.updated_at = now() # Persist to database @@ -314,7 +285,11 @@ def get_or_fetch_score( } # Update score column using existing helper - update_evaluation_run(session=session, eval_run=eval_run, score=score) + update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(score=score), + ) total_traces = len(score.get("traces", [])) logger.info( @@ -400,8 +375,10 @@ def save_score( update_evaluation_run( session=session, eval_run=eval_run, - score=db_score, - score_trace_url=score_trace_url, + update=EvaluationRunUpdate( + score=db_score, + score_trace_url=score_trace_url or None, + ), ) logger.info( diff --git a/backend/app/crud/evaluations/cost.py b/backend/app/crud/evaluations/cost.py new file mode 100644 index 000000000..653232dd1 --- /dev/null +++ b/backend/app/crud/evaluations/cost.py @@ -0,0 +1,177 @@ +""" +Cost tracking for evaluation runs. + +Token usage is aggregated per stage (response generation, embedding) and +priced against `global.model_config` using OpenAI Batch rates. Failures +here must never block evaluation completion — `attach_cost` swallows +exceptions and logs a warning. + +Persisted shape on `eval_run.cost`: + + { + "response": {model, input_tokens, output_tokens, total_tokens, cost_usd}, + "embedding": {model, input_tokens, output_tokens, total_tokens, cost_usd}, + "total_cost_usd": float, + } + +Either stage entry is optional. Embedding entries use output_tokens=0. +""" + +import logging +from collections.abc import Callable, Iterable +from typing import Any + +from sqlmodel import Session + +from app.crud.model_config import estimate_model_cost +from app.models import EvaluationRun + +logger = logging.getLogger(__name__) + +# USD rounding precision for persisted cost values. 
+COST_USD_DECIMALS = 6 + + +def _cost_usd(estimate: dict[str, Any] | None) -> float: + """Sum the per-direction costs from an estimate and round to our USD precision.""" + if not estimate: + return 0.0 + total = float(estimate.get("input_cost", 0.0)) + float( + estimate.get("output_cost", 0.0) + ) + return round(total, COST_USD_DECIMALS) + + +def _sum_tokens( + items: Iterable[dict[str, Any]], + usage_extractor: Callable[[dict[str, Any]], dict[str, Any] | None], + input_key: str, +) -> dict[str, int]: + """Sum (input, output, total) tokens across items using a per-item usage extractor. + + The OpenAI Embeddings API reports input tokens as ``prompt_tokens`` and has + no output tokens; chat/responses APIs use ``input_tokens`` and ``output_tokens``. + Missing keys default to 0, so the embedding case naturally produces + output_tokens=0. + """ + totals = {"input_tokens": 0, "output_tokens": 0, "total_tokens": 0} + for item in items: + usage = usage_extractor(item) + if not usage: + continue + totals["input_tokens"] += usage.get(input_key, 0) + totals["output_tokens"] += usage.get("output_tokens", 0) + totals["total_tokens"] += usage.get("total_tokens", 0) + return totals + + +def _build_cost_entry( + session: Session, + model: str, + totals: dict[str, int], +) -> dict[str, Any]: + """Price aggregated token usage against the model's batch pricing row.""" + estimate = estimate_model_cost( + session=session, + provider="openai", + model_name=model, + input_tokens=totals["input_tokens"], + output_tokens=totals["output_tokens"], + usage_type="batch", + ) + return { + "model": model, + "input_tokens": totals["input_tokens"], + "output_tokens": totals["output_tokens"], + "total_tokens": totals["total_tokens"], + "cost_usd": _cost_usd(estimate), + } + + +def _build_response_cost_entry( + session: Session, model: str, results: list[dict[str, Any]] +) -> dict[str, Any]: + """Build a response-stage cost entry from parsed evaluation results.""" + totals = _sum_tokens( + items=results, + usage_extractor=lambda r: r.get("usage"), + input_key="input_tokens", + ) + return _build_cost_entry(session=session, model=model, totals=totals) + + +def _build_embedding_cost_entry( + session: Session, model: str, raw_results: list[dict[str, Any]] +) -> dict[str, Any]: + """Build an embedding-stage cost entry from raw embedding batch output.""" + totals = _sum_tokens( + items=raw_results, + usage_extractor=lambda r: r.get("response", {}).get("body", {}).get("usage"), + input_key="prompt_tokens", + ) + return _build_cost_entry(session=session, model=model, totals=totals) + + +def _build_cost_dict( + response_entry: dict[str, Any] | None, + embedding_entry: dict[str, Any] | None, +) -> dict[str, Any]: + """Combine per-stage entries into the `eval_run.cost` payload with a grand total.""" + cost: dict[str, Any] = {} + total = 0.0 + + if response_entry: + cost["response"] = response_entry + total += response_entry.get("cost_usd", 0.0) + + if embedding_entry: + cost["embedding"] = embedding_entry + total += embedding_entry.get("cost_usd", 0.0) + + cost["total_cost_usd"] = round(total, COST_USD_DECIMALS) + return cost + + +def attach_cost( + session: Session, + eval_run: EvaluationRun, + log_prefix: str, + *, + response_model: str | None = None, + response_results: list[dict[str, Any]] | None = None, + embedding_model: str | None = None, + embedding_raw_results: list[dict[str, Any]] | None = None, +) -> None: + """Compute cost for the given stage(s) and attach to `eval_run.cost`, never raising. 
+ + Caller is responsible for persisting `eval_run` afterwards. Either stage's + previously-computed entry on `eval_run.cost` is preserved when that stage's + inputs are not supplied, so partial updates never clobber prior data. + """ + try: + existing_cost = eval_run.cost or {} + + if response_model is not None and response_results is not None: + response_entry = _build_response_cost_entry( + session=session, model=response_model, results=response_results + ) + else: + response_entry = existing_cost.get("response") + + if embedding_model is not None and embedding_raw_results is not None: + embedding_entry = _build_embedding_cost_entry( + session=session, + model=embedding_model, + raw_results=embedding_raw_results, + ) + else: + embedding_entry = existing_cost.get("embedding") + + eval_run.cost = _build_cost_dict( + response_entry=response_entry, + embedding_entry=embedding_entry, + ) + except Exception as cost_err: + logger.warning( + f"[attach_cost] {log_prefix} Failed to compute cost | {cost_err}" + ) diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py index 1fa82b39f..472390801 100644 --- a/backend/app/crud/evaluations/processing.py +++ b/backend/app/crud/evaluations/processing.py @@ -27,8 +27,10 @@ ) from app.core.batch.base import BATCH_KEY from app.crud.evaluations.batch import fetch_dataset_items -from app.crud.evaluations.core import update_evaluation_run, resolve_model_from_config +from app.crud.evaluations.core import resolve_model_from_config, update_evaluation_run +from app.crud.evaluations.cost import attach_cost from app.crud.evaluations.embeddings import ( + EMBEDDING_MODEL, calculate_average_similarity, parse_embedding_results, start_embedding_batch, @@ -38,7 +40,7 @@ update_traces_with_cosine_scores, ) from app.crud.job import get_batch_job, update_batch_job -from app.models import EvaluationRun +from app.models import EvaluationRun, EvaluationRunUpdate from app.models.batch_job import BatchJob, BatchJobUpdate from app.utils import get_langfuse_client, get_openai_client @@ -332,6 +334,20 @@ async def process_completed_evaluation( # Use model stored at creation time for cost tracking model = resolve_model_from_config(session=session, eval_run=eval_run) + # Aggregate response generation cost + attach_cost( + session=session, + eval_run=eval_run, + log_prefix=log_prefix, + response_model=model, + response_results=results, + ) + update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(cost=eval_run.cost), + ) + trace_id_mapping = create_langfuse_dataset_run( langfuse=langfuse, dataset_name=eval_run.dataset_name, @@ -367,8 +383,10 @@ async def process_completed_evaluation( eval_run = update_evaluation_run( session=session, eval_run=eval_run, - status="completed", - error_message=f"Embeddings failed: {str(e)}", + update=EvaluationRunUpdate( + status="completed", + error_message=f"Embeddings failed: {str(e)}", + ), ) logger.info( @@ -386,8 +404,10 @@ async def process_completed_evaluation( return update_evaluation_run( session=session, eval_run=eval_run, - status="failed", - error_message=f"Processing failed: {str(e)}", + update=EvaluationRunUpdate( + status="failed", + error_message=f"Processing failed: {str(e)}", + ), ) @@ -488,9 +508,24 @@ async def process_completed_embedding_batch( exc_info=True, ) - # Step 7: Mark evaluation as completed + # Step 7: Accumulate embedding cost onto existing response cost + attach_cost( + session=session, + eval_run=eval_run, + log_prefix=log_prefix, + 
embedding_model=EMBEDDING_MODEL, + embedding_raw_results=raw_results, + ) + + # Step 8: Mark evaluation as completed eval_run = update_evaluation_run( - session=session, eval_run=eval_run, status="completed", score=eval_run.score + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate( + status="completed", + score=eval_run.score, + cost=eval_run.cost, + ), ) logger.info( @@ -508,8 +543,10 @@ async def process_completed_embedding_batch( return update_evaluation_run( session=session, eval_run=eval_run, - status="completed", - error_message=f"Embedding processing failed: {str(e)}", + update=EvaluationRunUpdate( + status="completed", + error_message=f"Embedding processing failed: {str(e)}", + ), ) @@ -593,8 +630,10 @@ async def check_and_process_evaluation( eval_run = update_evaluation_run( session=session, eval_run=eval_run, - status="completed", - error_message=f"Embedding batch failed: {embedding_batch_job.error_message}", + update=EvaluationRunUpdate( + status="completed", + error_message=f"Embedding batch failed: {embedding_batch_job.error_message}", + ), ) return { @@ -654,8 +693,10 @@ async def check_and_process_evaluation( eval_run = update_evaluation_run( session=session, eval_run=eval_run, - status="failed", - error_message=error_msg, + update=EvaluationRunUpdate( + status="failed", + error_message=error_msg, + ), ) logger.error( @@ -696,8 +737,10 @@ async def check_and_process_evaluation( eval_run = update_evaluation_run( session=session, eval_run=eval_run, - status="failed", - error_message=error_msg, + update=EvaluationRunUpdate( + status="failed", + error_message=error_msg, + ), ) logger.error( @@ -735,8 +778,10 @@ async def check_and_process_evaluation( update_evaluation_run( session=session, eval_run=eval_run, - status="failed", - error_message=f"Checking failed: {str(e)}", + update=EvaluationRunUpdate( + status="failed", + error_message=f"Checking failed: {str(e)}", + ), ) return { @@ -828,8 +873,10 @@ async def poll_all_pending_evaluations(session: Session) -> dict[str, Any]: update_evaluation_run( session=session, eval_run=eval_run, - status="failed", - error_message=http_exc.detail, + update=EvaluationRunUpdate( + status="failed", + error_message=http_exc.detail, + ), ) all_results.append( @@ -869,8 +916,10 @@ async def poll_all_pending_evaluations(session: Session) -> dict[str, Any]: update_evaluation_run( session=session, eval_run=eval_run, - status="failed", - error_message=f"Check failed: {str(e)}", + update=EvaluationRunUpdate( + status="failed", + error_message=f"Check failed: {str(e)}", + ), ) all_results.append( @@ -892,8 +941,10 @@ async def poll_all_pending_evaluations(session: Session) -> dict[str, Any]: update_evaluation_run( session=session, eval_run=eval_run, - status="failed", - error_message=f"Project processing failed: {str(e)}", + update=EvaluationRunUpdate( + status="failed", + error_message=f"Project processing failed: {str(e)}", + ), ) all_results.append( diff --git a/backend/app/crud/model_config.py b/backend/app/crud/model_config.py index fed1f71c7..6d535240a 100644 --- a/backend/app/crud/model_config.py +++ b/backend/app/crud/model_config.py @@ -52,6 +52,21 @@ def get_model_config( return session.exec(statement).first() +def is_reasoning_model( + session: Session, provider: Literal["openai", "google"], model_name: str +) -> bool: + """Return True if the model is configured with a reasoning `effort` control. 
+ + A model is considered reasoning-capable if its `config` JSON contains an + `effort` key; models that instead expose a `temperature` key are treated + as standard chat models. + """ + model = get_model_config(session=session, provider=provider, model_name=model_name) + if model is None or not isinstance(model.config, dict): + return False + return "effort" in model.config + + def estimate_model_cost( session: Session, provider: Literal["openai", "google"], @@ -82,7 +97,6 @@ def estimate_model_cost( input_cost = (input_tokens / 1_000_000) * float(input_price) output_cost = (output_tokens / 1_000_000) * float(output_price) - total_cost = round(input_cost + output_cost, 4) return { "provider": provider, @@ -92,6 +106,6 @@ def estimate_model_cost( "output_tokens": output_tokens, "input_cost": input_cost, "output_cost": output_cost, - "total_cost": total_cost, + "total_cost": input_cost + output_cost, "currency": "USD", } diff --git a/backend/app/models/__init__.py b/backend/app/models/__init__.py index d3fc55a7f..98c4f7d24 100644 --- a/backend/app/models/__init__.py +++ b/backend/app/models/__init__.py @@ -1,5 +1,13 @@ from sqlmodel import SQLModel +from .api_key import ( + APIKey, + APIKeyBase, + APIKeyCreateResponse, + APIKeyPublic, + APIKeyVerifyResponse, +) +from .assistants import Assistant, AssistantBase, AssistantCreate, AssistantUpdate from .auth import ( AuthContext, GoogleAuthRequest, @@ -8,48 +16,44 @@ Token, TokenPayload, ) - -from .api_key import ( - APIKey, - APIKeyBase, - APIKeyPublic, - APIKeyCreateResponse, - APIKeyVerifyResponse, +from .batch_job import ( + BatchJob, + BatchJobCreate, + BatchJobPublic, + BatchJobType, + BatchJobUpdate, ) - -from .assistants import Assistant, AssistantBase, AssistantCreate, AssistantUpdate - from .collection import ( Collection, - CreationRequest, - CollectionPublic, CollectionIDPublic, + CollectionPublic, CollectionWithDocsPublic, + CreationRequest, DeletionRequest, ProviderType, ) from .collection_job import ( CollectionActionType, CollectionJob, - CollectionJobStatus, - CollectionJobUpdate, - CollectionJobPublic, CollectionJobCreate, CollectionJobImmediatePublic, + CollectionJobPublic, + CollectionJobStatus, + CollectionJobUpdate, ) from .config import ( Config, ConfigBase, ConfigCreate, - ConfigUpdate, ConfigPublic, - ConfigWithVersion, + ConfigUpdate, ConfigVersion, ConfigVersionBase, ConfigVersionCreate, - ConfigVersionUpdate, - ConfigVersionPublic, ConfigVersionItems, + ConfigVersionPublic, + ConfigVersionUpdate, + ConfigWithVersion, ) from .credentials import ( Credential, @@ -58,32 +62,22 @@ CredsPublic, CredsUpdate, ) - +from .doc_transformation_job import ( + DocTransformationJob, + DocTransformJobCreate, + DocTransformJobUpdate, + TransformationStatus, +) from .document import ( - Document, - DocumentPublic, DocTransformationJobPublic, DocTransformationJobsPublic, - TransformedDocumentPublic, + Document, + DocumentPublic, DocumentUploadResponse, TransformationJobInfo, -) -from .doc_transformation_job import ( - DocTransformationJob, - TransformationStatus, - DocTransformJobCreate, - DocTransformJobUpdate, + TransformedDocumentPublic, ) from .document_collection import DocumentCollection - -from .batch_job import ( - BatchJob, - BatchJobCreate, - BatchJobPublic, - BatchJobType, - BatchJobUpdate, -) - from .evaluation import ( EvaluationDataset, EvaluationDatasetCreate, @@ -91,50 +85,43 @@ EvaluationRun, EvaluationRunCreate, EvaluationRunPublic, + EvaluationRunUpdate, ) - -from .file import File, FilePublic, FileType, 
AudioUploadResponse - +from .file import AudioUploadResponse, File, FilePublic, FileType from .fine_tuning import ( - FineTuningJobBase, Fine_Tuning, + FineTuningJobBase, FineTuningJobCreate, FineTuningJobPublic, - FineTuningUpdate, FineTuningStatus, + FineTuningUpdate, ) - -from .job import Job, JobType, JobStatus, JobUpdate - +from .job import Job, JobStatus, JobType, JobUpdate from .language import ( Language, LanguageBase, LanguagePublic, LanguagesPublic, ) - from .llm import ( - ConfigBlob, CompletionConfig, + ConfigBlob, + LlmCall, LLMCallRequest, LLMCallResponse, - LlmCall, + LlmChain, LLMChainRequest, LLMChainResponse, - LlmChain, LLMJobImmediatePublic, LLMJobPublic, ) - from .message import Message - from .model_config import ( ModelConfig, ModelConfigBase, ModelConfigListPublic, ModelConfigPublic, ) - from .model_evaluation import ( ModelEvaluation, ModelEvaluationBase, @@ -143,14 +130,12 @@ ModelEvaluationStatus, ModelEvaluationUpdate, ) - - from .onboarding import OnboardingRequest, OnboardingResponse from .openai_conversation import ( - OpenAIConversationPublic, OpenAIConversation, OpenAIConversationBase, OpenAIConversationCreate, + OpenAIConversationPublic, ) from .organization import ( Organization, @@ -159,7 +144,6 @@ OrganizationsPublic, OrganizationUpdate, ) - from .project import ( Project, ProjectCreate, @@ -167,33 +151,29 @@ ProjectsPublic, ProjectUpdate, ) - from .response import ( CallbackResponse, Diagnostics, FileResultChunk, - ResponsesAPIRequest, ResponseJobStatus, + ResponsesAPIRequest, ResponsesSyncAPIRequest, ) - from .threads import OpenAI_Thread, OpenAIThreadBase, OpenAIThreadCreate - from .user import ( NewPassword, + UpdatePassword, User, UserCreate, UserPublic, UserRegister, + UsersPublic, UserUpdate, UserUpdateMe, - UsersPublic, - UpdatePassword, ) - from .user_project import ( - UserProject, AddUsersToProjectRequest, UserEntry, + UserProject, UserProjectPublic, ) diff --git a/backend/app/models/evaluation.py b/backend/app/models/evaluation.py index d2d2beecc..c9130d3c3 100644 --- a/backend/app/models/evaluation.py +++ b/backend/app/models/evaluation.py @@ -313,6 +313,17 @@ class EvaluationRun(SQLModel, table=True): description="Evaluation scores (e.g., correctness, cosine_similarity, etc.)", ) + # Cost tracking field + cost: dict[str, Any] | None = SQLField( + default=None, + sa_column=Column( + JSONB, + nullable=True, + comment="Cost tracking (response/embedding tokens and USD)", + ), + description="Cost breakdown by stage (response, embedding) with token counts and USD", + ) + # Error message field error_message: str | None = SQLField( default=None, @@ -382,6 +393,22 @@ class EvaluationRunCreate(SQLModel): ) +class EvaluationRunUpdate(SQLModel): + """Partial update payload for an evaluation run. + + Any field left unset is untouched. Used by `update_evaluation_run` with + `model_dump(exclude_unset=True)` semantics. 
+ """ + + status: str | None = None + error_message: str | None = None + object_store_url: str | None = None + score_trace_url: str | None = None + score: dict[str, Any] | None = None + cost: dict[str, Any] | None = None + embedding_batch_job_id: int | None = None + + class EvaluationRunPublic(SQLModel): """Public model for evaluation runs.""" @@ -395,8 +422,10 @@ class EvaluationRunPublic(SQLModel): embedding_batch_job_id: int | None status: str object_store_url: str | None + score_trace_url: str | None total_items: int score: dict[str, Any] | None + cost: dict[str, Any] | None error_message: str | None organization_id: int project_id: int diff --git a/backend/app/models/llm/constants.py b/backend/app/models/llm/constants.py index 02b1823ed..399748843 100644 --- a/backend/app/models/llm/constants.py +++ b/backend/app/models/llm/constants.py @@ -37,9 +37,6 @@ ], } -# OpenAI models that support reasoning (effort parameter) -OPENAI_REASONING_MODELS: set[str] = {"o1", "o1-preview", "o1-mini"} - SUPPORTED_VOICES = { ("google", "tts"): ["Kore", "Orus", "Leda", "Charon"], ("sarvamai", "tts"): ["simran", "shubh", "roopa"], diff --git a/backend/app/services/llm/jobs.py b/backend/app/services/llm/jobs.py index 0ffbfd95c..484141376 100644 --- a/backend/app/services/llm/jobs.py +++ b/backend/app/services/llm/jobs.py @@ -1,6 +1,5 @@ import logging from contextlib import contextmanager -from typing import Any from uuid import UUID from asgi_correlation_id import correlation_id @@ -369,7 +368,7 @@ def execute_llm_call( if isinstance(completion_config, KaapiCompletionConfig): completion_config, warnings = transform_kaapi_config_to_native( - completion_config + session=session, kaapi_config=completion_config ) if request_metadata is None: request_metadata = {} diff --git a/backend/app/services/llm/mappers.py b/backend/app/services/llm/mappers.py index 19cbd7d04..3bd049f05 100644 --- a/backend/app/services/llm/mappers.py +++ b/backend/app/services/llm/mappers.py @@ -1,12 +1,14 @@ import logging +from sqlmodel import Session + +from app.crud.model_config import is_reasoning_model from app.models.llm import KaapiCompletionConfig, NativeCompletionConfig from app.models.llm.constants import ( BCP47_LOCALE_TO_GEMINI_LANG, BCP47_TO_ELEVENLABS_LANG, - ELEVENLABS_VOICE_TO_ID, DEFAULT_TTS_VOICE, - OPENAI_REASONING_MODELS, + ELEVENLABS_VOICE_TO_ID, ) logger = logging.getLogger(__name__) @@ -35,13 +37,16 @@ def bcp47_to_elevenlabs_lang(bcp47_code: str) -> str | None: return BCP47_TO_ELEVENLABS_LANG.get(bcp47_code) -def map_kaapi_to_openai_params(kaapi_params: dict) -> tuple[dict, list[str]]: +def map_kaapi_to_openai_params( + session: Session, kaapi_params: dict +) -> tuple[dict, list[str]]: """Map Kaapi-abstracted parameters to OpenAI API parameters. This mapper transforms standardized Kaapi parameters into OpenAI-specific parameter format, enabling provider-agnostic interface design. 
Args: + session: Database session used to look up the model's config kaapi_params: Dictionary with standardized Kaapi parameters Supported Mapping: @@ -67,7 +72,9 @@ def map_kaapi_to_openai_params(kaapi_params: dict) -> tuple[dict, list[str]]: knowledge_base_ids = kaapi_params.get("knowledge_base_ids") max_num_results = kaapi_params.get("max_num_results") - support_reasoning = model in OPENAI_REASONING_MODELS + support_reasoning = bool(model) and is_reasoning_model( + session=session, provider="openai", model_name=model + ) # Handle reasoning vs temperature mutual exclusivity if support_reasoning: @@ -422,6 +429,7 @@ def map_kaapi_to_elevenlabs_params( def transform_kaapi_config_to_native( + session: Session, kaapi_config: KaapiCompletionConfig, ) -> tuple[NativeCompletionConfig, list[str]]: """Transform Kaapi completion config to native provider config with mapped parameters. @@ -429,6 +437,7 @@ def transform_kaapi_config_to_native( Supports OpenAI,Google AI and Sarvam AI providers. Args: + session: Database session used to look up model-specific config (e.g. reasoning support) kaapi_config: KaapiCompletionConfig with abstracted parameters Returns: @@ -438,7 +447,9 @@ def transform_kaapi_config_to_native( """ # TODO change from magic string to enums if kaapi_config.provider == "openai": - mapped_params, warnings = map_kaapi_to_openai_params(kaapi_config.params) + mapped_params, warnings = map_kaapi_to_openai_params( + session=session, kaapi_params=kaapi_config.params + ) return ( NativeCompletionConfig( provider="openai-native", params=mapped_params, type=kaapi_config.type diff --git a/backend/app/tests/crud/evaluations/test_processing.py b/backend/app/tests/crud/evaluations/test_processing.py index 52162654d..527ff86f4 100644 --- a/backend/app/tests/crud/evaluations/test_processing.py +++ b/backend/app/tests/crud/evaluations/test_processing.py @@ -1,23 +1,22 @@ -from typing import Any import json from unittest.mock import MagicMock, patch import pytest from sqlmodel import Session, select +from app.core.util import now +from app.crud.evaluations.core import create_evaluation_run from app.crud.evaluations.processing import ( _extract_batch_error_message, check_and_process_evaluation, parse_evaluation_output, + poll_all_pending_evaluations, process_completed_embedding_batch, process_completed_evaluation, - poll_all_pending_evaluations, ) -from app.models import BatchJob, Organization, Project, EvaluationDataset, EvaluationRun +from app.models import BatchJob, EvaluationDataset, EvaluationRun, Organization, Project from app.models.batch_job import BatchJobType -from app.tests.utils.test_data import create_test_evaluation_dataset, create_test_config -from app.crud.evaluations.core import create_evaluation_run -from app.core.util import now +from app.tests.utils.test_data import create_test_config, create_test_evaluation_dataset class TestParseEvaluationOutput: @@ -357,7 +356,11 @@ async def test_process_completed_evaluation_success( "body": { "id": "resp_123", "output": "Answer 1", - "usage": {"total_tokens": 10}, + "usage": { + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + }, } }, } @@ -397,6 +400,20 @@ async def test_process_completed_evaluation_success( mock_create_langfuse.assert_called_once() mock_start_embedding.assert_called_once() + # Cost tracking: response cost should be aggregated and persisted. 
+ db.refresh(result) + assert result.cost is not None + assert "response" in result.cost + response_cost = result.cost["response"] + assert response_cost["model"] == "gpt-4o" + assert response_cost["input_tokens"] == 100 + assert response_cost["output_tokens"] == 50 + assert response_cost["total_tokens"] == 150 + assert response_cost["cost_usd"] > 0 + assert result.cost["total_cost_usd"] == response_cost["cost_usd"] + # Embedding cost is added later by process_completed_embedding_batch. + assert "embedding" not in result.cost + @pytest.mark.asyncio @patch("app.crud.evaluations.processing.download_batch_results") @patch("app.crud.evaluations.processing.fetch_dataset_items") @@ -547,7 +564,31 @@ async def test_process_completed_embedding_batch_success( eval_run_with_embedding_batch, ): """Test successfully processing completed embedding batch.""" - mock_download.return_value = [] + # Pre-populate eval_run.cost with a response entry to verify that the + # embedding stage merges (not overwrites) existing cost data. + eval_run_with_embedding_batch.cost = { + "response": { + "model": "gpt-4o", + "input_tokens": 100, + "output_tokens": 50, + "total_tokens": 150, + "cost_usd": 0.000375, + }, + "total_cost_usd": 0.000375, + } + db.add(eval_run_with_embedding_batch) + db.commit() + db.refresh(eval_run_with_embedding_batch) + + # Raw results carry the usage payload that _build_embedding_cost_entry reads. + mock_download.return_value = [ + { + "custom_id": "trace_123", + "response": { + "body": {"usage": {"prompt_tokens": 200, "total_tokens": 200}} + }, + } + ] mock_parse.return_value = [ { "item_id": "item1", @@ -586,6 +627,22 @@ async def test_process_completed_embedding_batch_success( assert cosine_score is not None assert cosine_score["avg"] == 0.95 + # Cost tracking: embedding entry is added, response entry is preserved, + # and total_cost_usd is the sum of both. 
+ assert result.cost is not None + assert "response" in result.cost + assert "embedding" in result.cost + assert result.cost["response"]["cost_usd"] == 0.000375 + embedding_cost = result.cost["embedding"] + assert embedding_cost["model"] == "text-embedding-3-large" + assert embedding_cost["input_tokens"] == 200 + assert embedding_cost["output_tokens"] == 0 + assert embedding_cost["total_tokens"] == 200 + assert embedding_cost["cost_usd"] > 0 + assert result.cost["total_cost_usd"] == pytest.approx( + 0.000375 + embedding_cost["cost_usd"] + ) + @pytest.mark.asyncio @patch("app.crud.evaluations.processing.download_batch_results") @patch("app.crud.evaluations.processing.parse_embedding_results") diff --git a/backend/app/tests/crud/evaluations/test_score_storage.py b/backend/app/tests/crud/evaluations/test_score_storage.py index 87a82845b..0d029b21c 100644 --- a/backend/app/tests/crud/evaluations/test_score_storage.py +++ b/backend/app/tests/crud/evaluations/test_score_storage.py @@ -49,11 +49,9 @@ def test_uploads_traces_to_s3_and_stores_summary_only( assert mock_upload.call_args.kwargs["results"] == [{"trace_id": "t1"}] # Verify DB gets summary only, not traces - call_kwargs = mock_update.call_args.kwargs - assert call_kwargs["score"] == { - "summary_scores": [{"name": "accuracy", "avg": 0.9}] - } - assert call_kwargs["score_trace_url"] == "s3://bucket/traces.json" + update = mock_update.call_args.kwargs["update"] + assert update.score == {"summary_scores": [{"name": "accuracy", "avg": 0.9}]} + assert update.score_trace_url == "s3://bucket/traces.json" @patch("app.crud.evaluations.core.update_evaluation_run") @patch("app.crud.evaluations.core.get_evaluation_run_by_id") @@ -82,9 +80,9 @@ def test_fallback_to_db_when_s3_fails( save_score(eval_run_id=100, organization_id=1, project_id=1, score=score) # Full score stored in DB as fallback - call_kwargs = mock_update.call_args.kwargs - assert call_kwargs["score"] == score - assert call_kwargs["score_trace_url"] is None + update = mock_update.call_args.kwargs["update"] + assert update.score == score + assert update.score_trace_url is None @patch("app.crud.evaluations.core.update_evaluation_run") @patch("app.crud.evaluations.core.get_evaluation_run_by_id") diff --git a/backend/app/tests/services/llm/test_jobs.py b/backend/app/tests/services/llm/test_jobs.py index 4a7f7e265..c1b6acf25 100644 --- a/backend/app/tests/services/llm/test_jobs.py +++ b/backend/app/tests/services/llm/test_jobs.py @@ -659,7 +659,7 @@ def test_kaapi_config_warnings_passed_through_metadata( provider="openai", type="text", params={ - "model": "o1", # Reasoning model + "model": "gpt-5", # Reasoning model "temperature": 0.7, # This will be suppressed with warning }, ) diff --git a/backend/app/tests/services/llm/test_mappers.py b/backend/app/tests/services/llm/test_mappers.py index 22e488b37..7f1a7f036 100644 --- a/backend/app/tests/services/llm/test_mappers.py +++ b/backend/app/tests/services/llm/test_mappers.py @@ -5,77 +5,72 @@ Covers real-world scenarios, edge cases, and provider-specific requirements. 
""" -import pytest +from sqlmodel import Session from app.models.llm.request import ( - TextLLMParams, - STTLLMParams, - TTSLLMParams, KaapiCompletionConfig, NativeCompletionConfig, + STTLLMParams, + TextLLMParams, + TTSLLMParams, ) from app.services.llm.mappers import ( - map_kaapi_to_openai_params, + bcp47_to_elevenlabs_lang, + map_kaapi_to_elevenlabs_params, map_kaapi_to_google_params, + map_kaapi_to_openai_params, map_kaapi_to_sarvam_params, - map_kaapi_to_elevenlabs_params, - bcp47_to_elevenlabs_lang, - voice_to_id, transform_kaapi_config_to_native, -) -from app.models.llm.constants import ( - DEFAULT_STT_MODEL, - DEFAULT_TTS_MODEL, - DEFAULT_TTS_VOICE, + voice_to_id, ) class TestMapKaapiToOpenAIParams: """Test cases for map_kaapi_to_openai_params function.""" - def test_basic_model_mapping(self): + def test_basic_model_mapping(self, db: Session): """Test basic model parameter mapping.""" kaapi_params = TextLLMParams(model="gpt-4o") result, warnings = map_kaapi_to_openai_params( - kaapi_params.model_dump(exclude_none=True) + session=db, kaapi_params=kaapi_params.model_dump(exclude_none=True) ) # TextLLMParams has default temperature=0.1 assert result == {"model": "gpt-4o", "temperature": 0.1} assert warnings == [] - def test_reasoning_mapping_for_reasoning_models(self): + def test_reasoning_mapping_for_reasoning_models(self, db: Session): """Test reasoning parameter mapping to OpenAI format for reasoning-capable models.""" kaapi_params = TextLLMParams( - model="o1", + model="gpt-5", reasoning="high", ) result, warnings = map_kaapi_to_openai_params( - kaapi_params.model_dump(exclude_none=True) + session=db, kaapi_params=kaapi_params.model_dump(exclude_none=True) ) - assert result["model"] == "o1" + assert result["model"] == "gpt-5" assert result["reasoning"] == {"effort": "high"} # Temperature is suppressed for reasoning models (even default value) assert "temperature" not in result assert len(warnings) == 1 assert "temperature" in warnings[0].lower() - def test_knowledge_base_ids_mapping(self): + def test_knowledge_base_ids_mapping(self, db: Session): """Test knowledge_base_ids mapping to OpenAI tools format.""" kaapi_params = TextLLMParams( - model="gpt-4", + model="gpt-4o", knowledge_base_ids=["vs_abc123", "vs_def456"], max_num_results=50, ) result, warnings = map_kaapi_to_openai_params( - kaapi_params.model_dump(exclude_none=True) + session=db, kaapi_params=kaapi_params.model_dump(exclude_none=True) ) - assert result["model"] == "gpt-4" + assert result["model"] == "gpt-4o" assert "tools" in result assert len(result["tools"]) == 1 assert result["tools"][0]["type"] == "file_search" @@ -83,37 +78,37 @@ def test_knowledge_base_ids_mapping(self): assert result["tools"][0]["max_num_results"] == 50 assert warnings == [] - def test_temperature_suppressed_for_reasoning_models(self): + def test_temperature_suppressed_for_reasoning_models(self, db: Session): """Test that temperature is suppressed with warning for reasoning models when reasoning is set.""" kaapi_params = TextLLMParams( - model="o1", + model="gpt-5", temperature=0.7, reasoning="high", ) result, warnings = map_kaapi_to_openai_params( - kaapi_params.model_dump(exclude_none=True) + session=db, kaapi_params=kaapi_params.model_dump(exclude_none=True) ) - assert result["model"] == "o1" + assert result["model"] == "gpt-5" assert result["reasoning"] == {"effort": "high"} assert "temperature" not in result assert len(warnings) == 1 assert "temperature" in warnings[0].lower() assert "suppressed" in warnings[0] - def 
test_reasoning_suppressed_for_non_reasoning_models(self): + def test_reasoning_suppressed_for_non_reasoning_models(self, db: Session): """Test that reasoning is suppressed with warning for non-reasoning models.""" kaapi_params = TextLLMParams( - model="gpt-4", + model="gpt-4o", reasoning="high", ) result, warnings = map_kaapi_to_openai_params( - kaapi_params.model_dump(exclude_none=True) + session=db, kaapi_params=kaapi_params.model_dump(exclude_none=True) ) - assert result["model"] == "gpt-4" + assert result["model"] == "gpt-4o" assert "reasoning" not in result assert len(warnings) == 1 assert "reasoning" in warnings[0].lower() @@ -826,7 +821,7 @@ def test_unsupported_voice_returns_none(self): class TestTransformKaapiConfigToNative: """Test end-to-end transformation with completion_type parameter.""" - def test_transform_elevenlabs_tts_config(self): + def test_transform_elevenlabs_tts_config(self, db: Session): """Test transformation of ElevenLabs TTS config.""" kaapi_config = KaapiCompletionConfig( provider="elevenlabs", @@ -839,7 +834,9 @@ def test_transform_elevenlabs_tts_config(self): }, ) - result, warnings = transform_kaapi_config_to_native(kaapi_config) + result, warnings = transform_kaapi_config_to_native( + session=db, kaapi_config=kaapi_config + ) assert isinstance(result, NativeCompletionConfig) assert result.provider == "elevenlabs-native" @@ -850,7 +847,7 @@ def test_transform_elevenlabs_tts_config(self): assert result.params["output_format"] == "mp3_44100_128" assert warnings == [] - def test_transform_elevenlabs_stt_config(self): + def test_transform_elevenlabs_stt_config(self, db: Session): """Test transformation of ElevenLabs STT config.""" kaapi_config = KaapiCompletionConfig( provider="elevenlabs", @@ -862,7 +859,9 @@ def test_transform_elevenlabs_stt_config(self): }, ) - result, warnings = transform_kaapi_config_to_native(kaapi_config) + result, warnings = transform_kaapi_config_to_native( + session=db, kaapi_config=kaapi_config + ) assert isinstance(result, NativeCompletionConfig) assert result.provider == "elevenlabs-native" @@ -872,7 +871,7 @@ def test_transform_elevenlabs_stt_config(self): assert result.params["temperature"] == 0.3 assert warnings == [] - def test_transform_sarvamai_stt_with_saaras_model(self): + def test_transform_sarvamai_stt_with_saaras_model(self, db: Session): """Test transformation of SarvamAI STT with saaras:v3 model.""" kaapi_config = KaapiCompletionConfig( provider="sarvamai", @@ -884,7 +883,9 @@ def test_transform_sarvamai_stt_with_saaras_model(self): }, ) - result, warnings = transform_kaapi_config_to_native(kaapi_config) + result, warnings = transform_kaapi_config_to_native( + session=db, kaapi_config=kaapi_config + ) assert isinstance(result, NativeCompletionConfig) assert result.provider == "sarvamai-native" @@ -898,7 +899,7 @@ def test_transform_sarvamai_stt_with_saaras_model(self): # Removed test_transform_sarvamai_stt_with_saarika_model - model no longer in SUPPORTED_MODELS # The mapper logic for saarika (no mode parameter) is already tested in unit tests - def test_transform_sarvamai_tts_with_voice(self): + def test_transform_sarvamai_tts_with_voice(self, db: Session): """Test transformation of SarvamAI TTS with explicit voice.""" kaapi_config = KaapiCompletionConfig( provider="sarvamai", @@ -910,7 +911,9 @@ def test_transform_sarvamai_tts_with_voice(self): }, ) - result, warnings = transform_kaapi_config_to_native(kaapi_config) + result, warnings = transform_kaapi_config_to_native( + session=db, kaapi_config=kaapi_config + ) 
assert isinstance(result, NativeCompletionConfig) assert result.provider == "sarvamai-native" @@ -920,7 +923,7 @@ def test_transform_sarvamai_tts_with_voice(self): assert result.params["speaker"] == "simran" assert warnings == [] - def test_transform_google_text_completion(self): + def test_transform_google_text_completion(self, db: Session): """Test transformation of Google text completion.""" kaapi_config = KaapiCompletionConfig( provider="google", @@ -932,7 +935,9 @@ def test_transform_google_text_completion(self): }, ) - result, warnings = transform_kaapi_config_to_native(kaapi_config) + result, warnings = transform_kaapi_config_to_native( + session=db, kaapi_config=kaapi_config + ) assert isinstance(result, NativeCompletionConfig) assert result.provider == "google-native" @@ -942,7 +947,7 @@ def test_transform_google_text_completion(self): assert result.params["reasoning"] == "high" assert warnings == [] - def test_transform_google_stt_completion(self): + def test_transform_google_stt_completion(self, db: Session): """Test transformation of Google STT completion.""" kaapi_config = KaapiCompletionConfig( provider="google", @@ -950,7 +955,9 @@ def test_transform_google_stt_completion(self): params={"model": "gemini-2.5-pro", "instructions": "Transcribe accurately"}, ) - result, warnings = transform_kaapi_config_to_native(kaapi_config) + result, warnings = transform_kaapi_config_to_native( + session=db, kaapi_config=kaapi_config + ) assert isinstance(result, NativeCompletionConfig) assert result.provider == "google-native" @@ -959,7 +966,7 @@ def test_transform_google_stt_completion(self): assert result.params["instructions"] == "Transcribe accurately" assert warnings == [] - def test_transform_google_tts_completion(self): + def test_transform_google_tts_completion(self, db: Session): """Test transformation of Google TTS completion.""" kaapi_config = KaapiCompletionConfig( provider="google", @@ -971,7 +978,9 @@ def test_transform_google_tts_completion(self): }, ) - result, warnings = transform_kaapi_config_to_native(kaapi_config) + result, warnings = transform_kaapi_config_to_native( + session=db, kaapi_config=kaapi_config + ) assert isinstance(result, NativeCompletionConfig) assert result.provider == "google-native"
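
A minimal usage sketch of the caller-side pattern this patch introduces (illustrative only; the helper name `finalize_run` and its arguments are hypothetical, and `results` items are assumed to carry the `usage` dict shown in the tests above):

    from sqlmodel import Session

    from app.crud.evaluations.core import update_evaluation_run
    from app.crud.evaluations.cost import attach_cost
    from app.models import EvaluationRun, EvaluationRunUpdate


    def finalize_run(
        session: Session,
        eval_run: EvaluationRun,
        results: list[dict],
        model: str,
        log_prefix: str,
    ) -> None:
        # Aggregate response-stage token usage onto eval_run.cost; attach_cost
        # swallows its own errors, so this step never fails the evaluation.
        attach_cost(
            session=session,
            eval_run=eval_run,
            log_prefix=log_prefix,
            response_model=model,
            response_results=results,
        )
        # Persist a partial update: only the fields set here are written,
        # so unrelated columns on the run are left untouched.
        update_evaluation_run(
            session=session,
            eval_run=eval_run,
            update=EvaluationRunUpdate(status="completed", cost=eval_run.cost),
        )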