From 77938e015c9d6ac522328d5cd6fc81e7485ccd2d Mon Sep 17 00:00:00 2001 From: AkhileshNegi Date: Tue, 28 Apr 2026 13:29:21 +0530 Subject: [PATCH] first stab at fast-evaluation --- .../055_add_run_mode_to_evaluation_run.py | 33 +++ .../api/docs/evaluation/create_evaluation.md | 18 +- .../app/api/routes/evaluations/evaluation.py | 10 + backend/app/celery/celery_app.py | 7 + backend/app/celery/tasks/evaluation_live.py | 108 +++++++ backend/app/core/config.py | 4 + backend/app/crud/evaluations/__init__.py | 3 + backend/app/crud/evaluations/batch.py | 55 ++-- backend/app/crud/evaluations/core.py | 3 + backend/app/crud/evaluations/cost.py | 72 ++++- backend/app/crud/evaluations/live.py | 106 +++++++ backend/app/crud/evaluations/processing.py | 6 +- backend/app/models/evaluation.py | 7 + .../app/services/evaluations/evaluation.py | 58 +++- .../services/evaluations/live_aggregator.py | 270 ++++++++++++++++++ backend/app/services/evaluations/live_row.py | 220 ++++++++++++++ 16 files changed, 938 insertions(+), 42 deletions(-) create mode 100644 backend/app/alembic/versions/055_add_run_mode_to_evaluation_run.py create mode 100644 backend/app/celery/tasks/evaluation_live.py create mode 100644 backend/app/crud/evaluations/live.py create mode 100644 backend/app/services/evaluations/live_aggregator.py create mode 100644 backend/app/services/evaluations/live_row.py diff --git a/backend/app/alembic/versions/055_add_run_mode_to_evaluation_run.py b/backend/app/alembic/versions/055_add_run_mode_to_evaluation_run.py new file mode 100644 index 000000000..3a31a1577 --- /dev/null +++ b/backend/app/alembic/versions/055_add_run_mode_to_evaluation_run.py @@ -0,0 +1,33 @@ +"""add run_mode to evaluation_run + +Revision ID: 055 +Revises: 054 +Create Date: 2026-04-28 12:00:00.000000 + +""" + +import sqlalchemy as sa +from alembic import op + +# revision identifiers, used by Alembic. +revision = "055" +down_revision = "054" +branch_labels = None +depends_on = None + + +def upgrade(): + op.add_column( + "evaluation_run", + sa.Column( + "run_mode", + sa.String(length=10), + nullable=False, + server_default="batch", + comment="Execution mode: batch | live", + ), + ) + + +def downgrade(): + op.drop_column("evaluation_run", "run_mode") diff --git a/backend/app/api/docs/evaluation/create_evaluation.md b/backend/app/api/docs/evaluation/create_evaluation.md index a719579b2..cf301b39b 100644 --- a/backend/app/api/docs/evaluation/create_evaluation.md +++ b/backend/app/api/docs/evaluation/create_evaluation.md @@ -1,11 +1,20 @@ -Start an evaluation run using the OpenAI Batch API. +Start an evaluation run. Evaluations allow you to systematically test LLM configurations against predefined datasets with automatic progress tracking and result collection. +**Execution modes (`run_mode`):** +* `batch` (default) — submits the dataset to the OpenAI Batch API. Best for + larger datasets; queueing time can be minutes-to-hours but cost is ~50% + lower per token. The cron poller drives completion. +* `live` — fans out per-row Celery tasks against the regular Responses API. + Capped at a server-configured item count (`EVAL_LIVE_MAX_ITEMS`, default + 100). Standard (non-batch) pricing applies. Use this for fast feedback on + small datasets — typically completes in seconds. 
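+  If more than half of the rows fail permanently
+  (`EVAL_LIVE_FAILURE_THRESHOLD`, default 0.5), the run is marked `failed`;
+  smaller failure counts are recorded in the run's `error_message`.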
+ **Key Features:** -* Fetches dataset items from Langfuse and creates a batch processing job via the OpenAI Batch API -* Asynchronous processing with automatic progress tracking (checks every 60s) +* Fetches dataset items from Langfuse and runs them through the chosen mode +* Asynchronous processing with automatic progress tracking * Uses a stored config (created via `/configs`) to define the provider parameters * Stores results for comparison and analysis * Use `GET /evaluations/{evaluation_id}` to monitor progress and retrieve results @@ -17,6 +26,7 @@ predefined datasets with automatic progress tracking and result collection. "dataset_id": 123, "experiment_name": "gpt4_file_search_test", "config_id": "f54f0d67-4817-4103-9fdf-b74b3d46733e", - "config_version": 1 + "config_version": 1, + "run_mode": "live" } ``` diff --git a/backend/app/api/routes/evaluations/evaluation.py b/backend/app/api/routes/evaluations/evaluation.py index 591f5d985..fa0750810 100644 --- a/backend/app/api/routes/evaluations/evaluation.py +++ b/backend/app/api/routes/evaluations/evaluation.py @@ -1,6 +1,7 @@ """Evaluation run API routes.""" import logging +from typing import Literal from uuid import UUID from fastapi import ( @@ -45,6 +46,14 @@ def evaluate( ), config_id: UUID = Body(..., description="Stored config ID"), config_version: int = Body(..., ge=1, description="Stored config version"), + run_mode: Literal["batch", "live"] = Body( + "batch", + description=( + "Execution mode. 'batch' (default) submits the dataset to OpenAI's " + "Batch API. 'live' fans out per-row Celery tasks against the regular " + "Responses API for fast turnaround on small datasets (capped server-side)." + ), + ), ) -> APIResponse[EvaluationRunPublic]: """Start an evaluation run.""" eval_run = start_evaluation( @@ -55,6 +64,7 @@ def evaluate( config_version=config_version, organization_id=auth_context.organization_.id, project_id=auth_context.project_.id, + run_mode=run_mode, ) if eval_run.status == "failed": diff --git a/backend/app/celery/celery_app.py b/backend/app/celery/celery_app.py index 2448e2913..f68cc025a 100644 --- a/backend/app/celery/celery_app.py +++ b/backend/app/celery/celery_app.py @@ -154,6 +154,7 @@ def initialize_worker(**_) -> None: backend=settings.REDIS_URL, include=[ "app.celery.tasks.job_execution", + "app.celery.tasks.evaluation_live", ], ) @@ -178,6 +179,12 @@ def initialize_worker(**_) -> None: ), Queue("cron", exchange=default_exchange, routing_key="cron"), Queue("default", exchange=default_exchange, routing_key="default"), + Queue( + "evaluations", + exchange=default_exchange, + routing_key="evaluations", + queue_arguments={"x-max-priority": 6}, + ), ), # Task routing — queue is set per-task via @celery_app.task(queue=...). # Only cron tasks need an explicit override here. diff --git a/backend/app/celery/tasks/evaluation_live.py b/backend/app/celery/tasks/evaluation_live.py new file mode 100644 index 000000000..27b2729da --- /dev/null +++ b/backend/app/celery/tasks/evaluation_live.py @@ -0,0 +1,108 @@ +"""Celery tasks for live (non-batch) evaluation mode. + +Two tasks form a chord: + +* `run_eval_row` — one task per dataset item. Calls the OpenAI Responses + API and Embeddings API and returns a result dict. Retries on transient + OpenAI errors (rate limit, timeout, connection); permanent errors are + caught and folded into the result dict so the chord proceeds. + +* `aggregate_eval_results` — chord callback. Idempotent. 
Writes Langfuse + traces, computes cosine similarity, attaches cost, marks the eval run + completed. + +Both run on the dedicated `evaluations` queue so they don't compete with +user-facing `high_priority` traffic. +""" + +import logging + +import openai +from celery import current_task + +from app.celery.celery_app import celery_app +from app.celery.tasks.job_execution import _run_with_otel_parent, _set_trace + +logger = logging.getLogger(__name__) + + +@celery_app.task( + bind=True, + queue="evaluations", + priority=5, + autoretry_for=( + openai.RateLimitError, + openai.APITimeoutError, + openai.APIConnectionError, + ), + retry_backoff=True, + retry_backoff_max=60, + retry_jitter=True, + max_retries=5, + soft_time_limit=120, + time_limit=180, +) +def run_eval_row( + self, + eval_run_id: int, + item: dict, + organization_id: int, + project_id: int, + config_id: str, + config_version: int, + trace_id: str = "N/A", +): + from app.services.evaluations.live_row import execute_eval_row + + _set_trace(trace_id) + logger.info( + f"[run_eval_row] task_id={current_task.request.id} | eval={eval_run_id} | " + f"item={item.get('id')}" + ) + return _run_with_otel_parent( + self, + lambda: execute_eval_row( + eval_run_id=eval_run_id, + item=item, + organization_id=organization_id, + project_id=project_id, + config_id=config_id, + config_version=config_version, + ), + ) + + +@celery_app.task( + bind=True, + queue="evaluations", + priority=6, + autoretry_for=(Exception,), + retry_backoff=True, + max_retries=3, + soft_time_limit=240, + time_limit=300, +) +def aggregate_eval_results( + self, + row_results: list[dict], + eval_run_id: int, + organization_id: int, + project_id: int, + trace_id: str = "N/A", +): + from app.services.evaluations.live_aggregator import aggregate_results + + _set_trace(trace_id) + logger.info( + f"[aggregate_eval_results] task_id={current_task.request.id} | " + f"eval={eval_run_id} | rows={len(row_results)}" + ) + return _run_with_otel_parent( + self, + lambda: aggregate_results( + eval_run_id=eval_run_id, + organization_id=organization_id, + project_id=project_id, + row_results=row_results, + ), + ) diff --git a/backend/app/core/config.py b/backend/app/core/config.py index 6e46112de..a8bbf01bc 100644 --- a/backend/app/core/config.py +++ b/backend/app/core/config.py @@ -162,6 +162,10 @@ def AWS_S3_BUCKET(self) -> str: # this same value so its expected schedule stays aligned with the trigger. CRON_INTERVAL_MINUTES: int = 5 + # Live evaluation mode (Celery fan-out, non-batch). 
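+    # EVAL_LIVE_MAX_ITEMS: per-run item cap; live requests for larger
+    # datasets are rejected with 422 at dispatch time.
+    # EVAL_LIVE_FAILURE_THRESHOLD: fraction of permanently failed rows above
+    # which the aggregator marks the whole run as failed.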
+ EVAL_LIVE_MAX_ITEMS: int = 100 + EVAL_LIVE_FAILURE_THRESHOLD: float = 0.5 + @computed_field # type: ignore[prop-decorator] @property def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int: diff --git a/backend/app/crud/evaluations/__init__.py b/backend/app/crud/evaluations/__init__.py index a5824c0a2..08adfffb6 100644 --- a/backend/app/crud/evaluations/__init__.py +++ b/backend/app/crud/evaluations/__init__.py @@ -1,6 +1,7 @@ """Evaluation-related CRUD operations.""" from app.crud.evaluations.batch import start_evaluation_batch +from app.crud.evaluations.live import start_evaluation_live from app.crud.evaluations.core import ( create_evaluation_run, get_evaluation_run_by_id, @@ -65,6 +66,8 @@ "upload_csv_to_object_store", # Batch "start_evaluation_batch", + # Live + "start_evaluation_live", # Processing "check_and_process_evaluation", "poll_all_pending_evaluations", diff --git a/backend/app/crud/evaluations/batch.py b/backend/app/crud/evaluations/batch.py index 13fb9a50b..d5e666053 100644 --- a/backend/app/crud/evaluations/batch.py +++ b/backend/app/crud/evaluations/batch.py @@ -62,6 +62,36 @@ def fetch_dataset_items(langfuse: Langfuse, dataset_name: str) -> list[dict[str, return items +def build_response_body(question: str, config: KaapiLLMParams) -> dict[str, Any]: + """Build a Responses API request body for one dataset question. + + Shared between the batch JSONL builder and the live (per-row Celery task) + evaluation path so the request shape stays identical. + """ + body: dict[str, Any] = { + "model": config.model, + "instructions": config.instructions, + "input": question, + } + + if "temperature" in config.model_fields_set: + body["temperature"] = config.temperature + + if config.reasoning: + body["reasoning"] = {"effort": config.reasoning} + + if config.knowledge_base_ids: + body["tools"] = [ + { + "type": "file_search", + "vector_store_ids": config.knowledge_base_ids, + "max_num_results": config.max_num_results or 20, + } + ] + + return body + + def build_evaluation_jsonl( dataset_items: list[dict[str, Any]], config: KaapiLLMParams ) -> list[dict[str, Any]]: @@ -101,30 +131,7 @@ def build_evaluation_jsonl( ) continue - # Build the batch request object for Responses API - # Use config as-is and only add the input field - body: dict[str, Any] = { - "model": config.model, - "instructions": config.instructions, - "input": question, # Add input from dataset - } - - if "temperature" in config.model_fields_set: - body["temperature"] = config.temperature - - # Add reasoning only if provided - if config.reasoning: - body["reasoning"] = {"effort": config.reasoning} - - # Add tools only if knowledge_base_ids are provided - if config.knowledge_base_ids: - body["tools"] = [ - { - "type": "file_search", - "vector_store_ids": config.knowledge_base_ids, - "max_num_results": config.max_num_results or 20, - } - ] + body = build_response_body(question=question, config=config) batch_request = { BATCH_KEY: item["id"], diff --git a/backend/app/crud/evaluations/core.py b/backend/app/crud/evaluations/core.py index 6374dca76..b2080207b 100644 --- a/backend/app/crud/evaluations/core.py +++ b/backend/app/crud/evaluations/core.py @@ -59,6 +59,7 @@ def create_evaluation_run( config_version: int, organization_id: int, project_id: int, + run_mode: str = "batch", ) -> EvaluationRun: """ Create a new evaluation run record in the database. 
@@ -72,6 +73,7 @@ def create_evaluation_run( config_version: Version number of the config organization_id: Organization ID project_id: Project ID + run_mode: Execution mode, "batch" (default) or "live" Returns: The created EvaluationRun instance @@ -81,6 +83,7 @@ def create_evaluation_run( dataset_name=dataset_name, dataset_id=dataset_id, type=EvaluationType.TEXT.value, + run_mode=run_mode, config_id=config_id, config_version=config_version, status="pending", diff --git a/backend/app/crud/evaluations/cost.py b/backend/app/crud/evaluations/cost.py index 653232dd1..0ef9dcf2a 100644 --- a/backend/app/crud/evaluations/cost.py +++ b/backend/app/crud/evaluations/cost.py @@ -19,7 +19,7 @@ import logging from collections.abc import Callable, Iterable -from typing import Any +from typing import Any, Literal from sqlmodel import Session @@ -31,6 +31,11 @@ # USD rounding precision for persisted cost values. COST_USD_DECIMALS = 6 +# OpenAI pricing tier used by `estimate_model_cost`. "batch" is the discounted +# Batch API rate; "response" is standard (non-batch) pricing. Live evaluation +# mode uses "response" because it calls the regular Responses API. +UsageType = Literal["response", "batch"] + def _cost_usd(estimate: dict[str, Any] | None) -> float: """Sum the per-direction costs from an estimate and round to our USD precision.""" @@ -69,15 +74,16 @@ def _build_cost_entry( session: Session, model: str, totals: dict[str, int], + usage_type: UsageType = "batch", ) -> dict[str, Any]: - """Price aggregated token usage against the model's batch pricing row.""" + """Price aggregated token usage against the model's pricing row.""" estimate = estimate_model_cost( session=session, provider="openai", model_name=model, input_tokens=totals["input_tokens"], output_tokens=totals["output_tokens"], - usage_type="batch", + usage_type=usage_type, ) return { "model": model, @@ -89,7 +95,10 @@ def _build_cost_entry( def _build_response_cost_entry( - session: Session, model: str, results: list[dict[str, Any]] + session: Session, + model: str, + results: list[dict[str, Any]], + usage_type: UsageType = "batch", ) -> dict[str, Any]: """Build a response-stage cost entry from parsed evaluation results.""" totals = _sum_tokens( @@ -97,11 +106,16 @@ def _build_response_cost_entry( usage_extractor=lambda r: r.get("usage"), input_key="input_tokens", ) - return _build_cost_entry(session=session, model=model, totals=totals) + return _build_cost_entry( + session=session, model=model, totals=totals, usage_type=usage_type + ) def _build_embedding_cost_entry( - session: Session, model: str, raw_results: list[dict[str, Any]] + session: Session, + model: str, + raw_results: list[dict[str, Any]], + usage_type: UsageType = "batch", ) -> dict[str, Any]: """Build an embedding-stage cost entry from raw embedding batch output.""" totals = _sum_tokens( @@ -109,7 +123,30 @@ def _build_embedding_cost_entry( usage_extractor=lambda r: r.get("response", {}).get("body", {}).get("usage"), input_key="prompt_tokens", ) - return _build_cost_entry(session=session, model=model, totals=totals) + return _build_cost_entry( + session=session, model=model, totals=totals, usage_type=usage_type + ) + + +def _build_embedding_cost_entry_from_total( + session: Session, + model: str, + total_input_tokens: int, + usage_type: UsageType = "response", +) -> dict[str, Any]: + """Build an embedding-stage cost entry directly from a token total. 
+ + Used by live mode, where we already have per-row embedding token counts + aggregated and don't need to walk a batch JSONL output. + """ + totals = { + "input_tokens": total_input_tokens, + "output_tokens": 0, + "total_tokens": total_input_tokens, + } + return _build_cost_entry( + session=session, model=model, totals=totals, usage_type=usage_type + ) def _build_cost_dict( @@ -141,19 +178,30 @@ def attach_cost( response_results: list[dict[str, Any]] | None = None, embedding_model: str | None = None, embedding_raw_results: list[dict[str, Any]] | None = None, + embedding_input_tokens: int | None = None, + usage_type: UsageType = "batch", ) -> None: """Compute cost for the given stage(s) and attach to `eval_run.cost`, never raising. Caller is responsible for persisting `eval_run` afterwards. Either stage's previously-computed entry on `eval_run.cost` is preserved when that stage's inputs are not supplied, so partial updates never clobber prior data. + + `usage_type` selects the pricing tier: "batch" for OpenAI Batch API runs + (default, applies to both stages) and "response" for live evaluations that + call the regular Responses + Embeddings APIs. For embedding cost, callers + using live mode pass `embedding_input_tokens` (a precomputed total) instead + of `embedding_raw_results`. """ try: existing_cost = eval_run.cost or {} if response_model is not None and response_results is not None: response_entry = _build_response_cost_entry( - session=session, model=response_model, results=response_results + session=session, + model=response_model, + results=response_results, + usage_type=usage_type, ) else: response_entry = existing_cost.get("response") @@ -163,6 +211,14 @@ def attach_cost( session=session, model=embedding_model, raw_results=embedding_raw_results, + usage_type=usage_type, + ) + elif embedding_model is not None and embedding_input_tokens is not None: + embedding_entry = _build_embedding_cost_entry_from_total( + session=session, + model=embedding_model, + total_input_tokens=embedding_input_tokens, + usage_type=usage_type, ) else: embedding_entry = existing_cost.get("embedding") diff --git a/backend/app/crud/evaluations/live.py b/backend/app/crud/evaluations/live.py new file mode 100644 index 000000000..eff8c54f0 --- /dev/null +++ b/backend/app/crud/evaluations/live.py @@ -0,0 +1,106 @@ +"""Live evaluation orchestration: fan out per-row Celery tasks via a chord. + +This is the live counterpart to `start_evaluation_batch()`. It fetches dataset +items from Langfuse, transitions the eval run to "processing", and dispatches +a Celery chord whose body is the aggregator task. +""" + +import logging +from uuid import UUID + +from celery import chord +from langfuse import Langfuse +from opentelemetry.propagate import inject +from sqlmodel import Session + +from app.crud.evaluations.batch import fetch_dataset_items +from app.models import EvaluationRun + +logger = logging.getLogger(__name__) + + +def _trace_headers() -> dict[str, str]: + """Build OTel propagation headers for fan-out tasks.""" + headers: dict[str, str] = {} + inject(headers) + out = dict(headers) + out["otel"] = headers + return out + + +def start_evaluation_live( + *, + session: Session, + eval_run: EvaluationRun, + config_id: UUID, + config_version: int, + langfuse: Langfuse, +) -> EvaluationRun: + """Fetch dataset items, mark run as processing, dispatch the Celery chord. + + Mirrors the shape of `start_evaluation_batch()` but builds no JSONL — each + row runs as its own Celery task. 
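+    The chord callback (`aggregate_eval_results`) finalizes the run once
+    every row task has returned.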
Failures during dispatch mark the run + as `failed` and re-raise. + """ + log_prefix = ( + f"[start_evaluation_live][org={eval_run.organization_id}]" + f"[project={eval_run.project_id}][eval={eval_run.id}]" + ) + logger.info(f"{log_prefix} Starting live evaluation | run={eval_run.run_name}") + + try: + dataset_items = fetch_dataset_items( + langfuse=langfuse, dataset_name=eval_run.dataset_name + ) + if not dataset_items: + raise ValueError("Dataset is empty") + + # Import here to avoid pulling Celery into call paths that don't need it. + from app.celery.tasks.evaluation_live import ( + aggregate_eval_results, + run_eval_row, + ) + + eval_run.status = "processing" + eval_run.total_items = len(dataset_items) + session.add(eval_run) + session.commit() + session.refresh(eval_run) + + otel_headers = _trace_headers() + + row_signatures = [ + run_eval_row.s( + eval_run_id=eval_run.id, + item=item, + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + config_id=str(config_id), + config_version=config_version, + ).set(headers=otel_headers) + for item in dataset_items + ] + + callback = aggregate_eval_results.s( + eval_run_id=eval_run.id, + organization_id=eval_run.organization_id, + project_id=eval_run.project_id, + ).set(headers=otel_headers) + + async_result = chord(row_signatures)(callback) + + logger.info( + f"{log_prefix} Dispatched live chord | items={len(dataset_items)} | " + f"chord_id={async_result.id}" + ) + return eval_run + + except Exception as e: + logger.error( + f"{log_prefix} Failed to start live evaluation | {e}", exc_info=True + ) + eval_run.status = "failed" + eval_run.error_message = str(e) + session.add(eval_run) + session.commit() + raise diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py index 472390801..bdb7092c9 100644 --- a/backend/app/crud/evaluations/processing.py +++ b/backend/app/crud/evaluations/processing.py @@ -816,11 +816,13 @@ async def poll_all_pending_evaluations(session: Session) -> dict[str, Any]: "details": [...] } """ - # Single query to fetch all processing text evaluation runs - # STT/TTS evaluations have their own polling + # Single query to fetch all processing text evaluation runs in batch mode. + # STT/TTS evaluations have their own polling. Live-mode runs are owned by + # Celery (chord callback drives their lifecycle), so they're excluded here. 
statement = select(EvaluationRun).where( EvaluationRun.status == "processing", EvaluationRun.type == "text", + EvaluationRun.run_mode == "batch", ) pending_runs = session.exec(statement).all() diff --git a/backend/app/models/evaluation.py b/backend/app/models/evaluation.py index c9130d3c3..1808e46ed 100644 --- a/backend/app/models/evaluation.py +++ b/backend/app/models/evaluation.py @@ -216,6 +216,12 @@ class EvaluationRun(SQLModel, table=True): description="Evaluation type: text, stt, or tts", sa_column_kwargs={"comment": "Evaluation type: text, stt, or tts"}, ) + run_mode: str = SQLField( + default="batch", + max_length=10, + description="Execution mode: 'batch' (OpenAI Batch API) or 'live' (Celery fan-out)", + sa_column_kwargs={"comment": "Execution mode: batch | live"}, + ) language_id: int | None = SQLField( default=None, foreign_key="global.languages.id", @@ -415,6 +421,7 @@ class EvaluationRunPublic(SQLModel): id: int run_name: str dataset_name: str + run_mode: str config_id: UUID | None config_version: int | None dataset_id: int diff --git a/backend/app/services/evaluations/evaluation.py b/backend/app/services/evaluations/evaluation.py index 55653b253..ca6ca22b2 100644 --- a/backend/app/services/evaluations/evaluation.py +++ b/backend/app/services/evaluations/evaluation.py @@ -1,11 +1,13 @@ """Evaluation run orchestration service.""" import logging +from typing import Literal from uuid import UUID from fastapi import HTTPException from sqlmodel import Session +from app.core.config import settings from app.crud.evaluations import ( create_evaluation_run, fetch_trace_scores_from_langfuse, @@ -14,6 +16,7 @@ resolve_evaluation_config, save_score, start_evaluation_batch, + start_evaluation_live, ) from app.models.evaluation import EvaluationRun from app.models.llm.request import TextLLMParams, STTLLMParams, TTSLLMParams @@ -24,6 +27,8 @@ logger = logging.getLogger(__name__) +RunMode = Literal["batch", "live"] + def start_evaluation( session: Session, @@ -33,6 +38,7 @@ def start_evaluation( config_version: int, organization_id: int, project_id: int, + run_mode: RunMode = "batch", ) -> EvaluationRun: """ Start an evaluation run. @@ -41,7 +47,7 @@ def start_evaluation( 1. Validate dataset exists and has Langfuse ID 2. Resolve config from stored config management 3. Create evaluation run record - 4. Start batch processing + 4. Dispatch the chosen execution mode (batch or live) Args: session: Database session @@ -51,6 +57,10 @@ def start_evaluation( config_version: Version number of the config organization_id: Organization ID project_id: Project ID + run_mode: Execution mode. "batch" (default) submits the dataset to + OpenAI's Batch API and lets the cron poller drive completion. + "live" fans out per-row Celery tasks against the regular + Responses API for fast turnaround on small datasets. 
Returns: EvaluationRun instance @@ -63,7 +73,8 @@ def start_evaluation( f"dataset_id={dataset_id} | " f"org_id={organization_id} | " f"config_id={config_id} | " - f"config_version={config_version}" + f"config_version={config_version} | " + f"run_mode={run_mode}" ) # Step 1: Fetch dataset from database @@ -112,6 +123,30 @@ def start_evaluation( detail="Only 'openai' provider is supported for evaluation configs", ) + if run_mode == "live": + if config.completion.type != "text": + raise HTTPException( + status_code=422, + detail=( + f"Live mode is only supported for text evaluations " + f"(got '{config.completion.type}')" + ), + ) + total_items = ( + (dataset.dataset_metadata or {}).get("total_items_count") + if dataset.dataset_metadata + else None + ) + if total_items is not None and total_items > settings.EVAL_LIVE_MAX_ITEMS: + raise HTTPException( + status_code=422, + detail=( + f"Live mode is capped at {settings.EVAL_LIVE_MAX_ITEMS} items " + f"(dataset has {total_items}). Use run_mode='batch' for " + f"larger datasets." + ), + ) + logger.info( "[start_evaluation] Successfully resolved config from config management" ) @@ -138,10 +173,25 @@ def start_evaluation( config_version=config_version, organization_id=organization_id, project_id=project_id, + run_mode=run_mode, ) - # Step 4: Start the batch evaluation + # Step 4: Dispatch the chosen execution mode try: + if run_mode == "live": + eval_run = start_evaluation_live( + session=session, + eval_run=eval_run, + config_id=config_id, + config_version=config_version, + langfuse=langfuse, + ) + logger.info( + f"[start_evaluation] Live evaluation dispatched | " + f"run_id={eval_run.id} | total_items={eval_run.total_items}" + ) + return eval_run + # Convert params dict to appropriate model instance based on type param_models = { "text": TextLLMParams, @@ -171,7 +221,7 @@ def start_evaluation( f"[start_evaluation] Failed to start evaluation | run_id={eval_run.id} | {e}", exc_info=True, ) - # Error is already handled in start_evaluation_batch + # Error is already handled by start_evaluation_batch / start_evaluation_live session.refresh(eval_run) return eval_run diff --git a/backend/app/services/evaluations/live_aggregator.py b/backend/app/services/evaluations/live_aggregator.py new file mode 100644 index 000000000..f6d985648 --- /dev/null +++ b/backend/app/services/evaluations/live_aggregator.py @@ -0,0 +1,270 @@ +"""Live evaluation: chord callback that aggregates per-row results. + +Runs once after every `run_eval_row` task in the chord finishes. Idempotent +on re-entry (chord callbacks have at-least-once delivery semantics) — checks +the run's terminal status first and bails if already completed/failed. 
+""" + +import logging +from collections import Counter +from typing import Any + +from sqlmodel import Session + +from app.core.cloud import get_cloud_storage +from app.core.config import settings +from app.core.db import engine +from app.core.storage_utils import upload_jsonl_to_object_store +from app.crud.evaluations.core import ( + get_evaluation_run_by_id, + resolve_model_from_config, + update_evaluation_run, +) +from app.crud.evaluations.cost import attach_cost +from app.crud.evaluations.embeddings import ( + EMBEDDING_MODEL, + calculate_average_similarity, +) +from app.crud.evaluations.langfuse import ( + create_langfuse_dataset_run, + update_traces_with_cosine_scores, +) +from app.models import EvaluationRunUpdate +from app.utils import get_langfuse_client + +logger = logging.getLogger(__name__) + +TERMINAL_STATUSES = {"completed", "failed"} + + +def _summarise_failures(failures: list[dict[str, Any]]) -> str: + counts = Counter(f.get("error") or "Unknown error" for f in failures) + top_error, top_count = counts.most_common(1)[0] + return f"{top_error} ({top_count}/{len(failures)} failures)" + + +def aggregate_results( + *, + eval_run_id: int, + organization_id: int, + project_id: int, + row_results: list[dict[str, Any]], +) -> dict[str, Any]: + """Finalize a live evaluation run from collected per-row results. + + Returns a small summary dict mainly for observability — Celery does not + consume it. + """ + log_prefix = ( + f"[aggregate_results][org={organization_id}][project={project_id}]" + f"[eval={eval_run_id}]" + ) + + with Session(engine) as session: + eval_run = get_evaluation_run_by_id( + session=session, + evaluation_id=eval_run_id, + organization_id=organization_id, + project_id=project_id, + ) + if eval_run is None: + logger.error(f"{log_prefix} EvaluationRun not found, aborting aggregation") + return {"status": "missing", "eval_run_id": eval_run_id} + + if eval_run.status in TERMINAL_STATUSES: + logger.info( + f"{log_prefix} EvaluationRun already in terminal status " + f"'{eval_run.status}', no-op (chord double-fire guard)" + ) + return {"status": eval_run.status, "eval_run_id": eval_run_id} + + total = len(row_results) + successes = [r for r in row_results if not r.get("error")] + failures = [r for r in row_results if r.get("error")] + + logger.info( + f"{log_prefix} Aggregating | total={total} | successes={len(successes)} | " + f"failures={len(failures)}" + ) + + # Failure-threshold gate: too many bad rows -> mark whole run failed. + if total == 0 or ( + failures and len(failures) / total > settings.EVAL_LIVE_FAILURE_THRESHOLD + ): + error_message = ( + _summarise_failures(failures) if failures else "No row results returned" + ) + update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate( + status="failed", + error_message=f"Live mode: {error_message}", + ), + ) + return { + "status": "failed", + "eval_run_id": eval_run_id, + "error": error_message, + } + + # Persist raw row results to object store for debugging / re-processing. 
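+        # Upload is best-effort: a storage failure only logs a warning, so
+        # the run can still complete without the raw-results artifact.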
+ object_store_url: str | None = None + try: + storage = get_cloud_storage(session=session, project_id=project_id) + object_store_url = upload_jsonl_to_object_store( + storage=storage, + results=row_results, + filename="results.jsonl", + subdirectory=f"evaluation/live-{eval_run_id}", + ) + except Exception as store_err: + logger.warning(f"{log_prefix} Object store upload failed | {store_err}") + + if object_store_url: + eval_run = update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(object_store_url=object_store_url), + ) + + # Resolve model name now (uses the same config the rows ran against). + try: + model = resolve_model_from_config(session=session, eval_run=eval_run) + except Exception as e: + logger.error(f"{log_prefix} Failed to resolve model | {e}", exc_info=True) + update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate( + status="failed", + error_message=f"Live mode: model resolution failed: {e}", + ), + ) + return {"status": "failed", "eval_run_id": eval_run_id, "error": str(e)} + + # Response-stage cost. Live mode -> standard ("response") pricing. + attach_cost( + session=session, + eval_run=eval_run, + log_prefix=log_prefix, + response_model=model, + response_results=successes, + usage_type="response", + ) + + # Langfuse traces for successful rows. Same helper as batch path. + langfuse = get_langfuse_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + try: + trace_id_mapping = create_langfuse_dataset_run( + langfuse=langfuse, + dataset_name=eval_run.dataset_name, + run_name=eval_run.run_name, + results=successes, + model=model, + ) + except Exception as e: + logger.error( + f"{log_prefix} Langfuse dataset run creation failed | {e}", + exc_info=True, + ) + trace_id_mapping = {} + + # Inline cosine similarity from the per-row vectors the row task + # already computed — no extra API call. + embedding_pairs: list[dict[str, Any]] = [] + embedding_input_tokens = 0 + for r in successes: + embedding_input_tokens += r.get("embedding_input_tokens") or 0 + output_emb = r.get("output_embedding") + ground_truth_emb = r.get("ground_truth_embedding") + trace_id = trace_id_mapping.get(r["item_id"]) + if not trace_id or not output_emb or not ground_truth_emb: + continue + embedding_pairs.append( + { + "trace_id": trace_id, + "output_embedding": output_emb, + "ground_truth_embedding": ground_truth_emb, + } + ) + + score_payload: dict[str, Any] | None = None + if embedding_pairs: + similarity_stats = calculate_average_similarity( + embedding_pairs=embedding_pairs + ) + score_payload = { + "summary_scores": [ + { + "name": "Cosine Similarity", + "avg": round( + float(similarity_stats["cosine_similarity_avg"]), 2 + ), + "std": round( + float(similarity_stats["cosine_similarity_std"]), 2 + ), + "total_pairs": similarity_stats["total_pairs"], + "data_type": "NUMERIC", + } + ] + } + + per_item_scores = similarity_stats.get("per_item_scores", []) + if per_item_scores: + try: + update_traces_with_cosine_scores( + langfuse=langfuse, + per_item_scores=per_item_scores, + ) + except Exception as e: + logger.error( + f"{log_prefix} Failed to push cosine scores to Langfuse | {e}", + exc_info=True, + ) + + # Embedding-stage cost from aggregated tokens. Standard pricing. 
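+        # The token total was summed from each successful row above; zero
+        # means no row produced embeddings, so there is nothing to price.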
+ if embedding_input_tokens > 0: + attach_cost( + session=session, + eval_run=eval_run, + log_prefix=log_prefix, + embedding_model=EMBEDDING_MODEL, + embedding_input_tokens=embedding_input_tokens, + usage_type="response", + ) + + update_payload: dict[str, Any] = { + "status": "completed", + "total_items": len(successes), + "cost": eval_run.cost, + } + if score_payload is not None: + update_payload["score"] = score_payload + if failures: + update_payload["error_message"] = ( + f"Live mode: {len(failures)}/{total} rows failed " + f"(top error: {_summarise_failures(failures)})" + ) + + update_evaluation_run( + session=session, + eval_run=eval_run, + update=EvaluationRunUpdate(**update_payload), + ) + + logger.info( + f"{log_prefix} Completed live evaluation | successes={len(successes)} | " + f"failures={len(failures)}" + ) + + return { + "status": "completed", + "eval_run_id": eval_run_id, + "successes": len(successes), + "failures": len(failures), + } diff --git a/backend/app/services/evaluations/live_row.py b/backend/app/services/evaluations/live_row.py new file mode 100644 index 000000000..9d20bcb3f --- /dev/null +++ b/backend/app/services/evaluations/live_row.py @@ -0,0 +1,220 @@ +"""Live evaluation: per-row Responses + Embeddings execution. + +Runs inside a single Celery task per dataset item. Returns a dict matching +the shape `parse_evaluation_output()` produces in batch mode, plus an `error` +field and the two embedding vectors used for cosine similarity. The aggregator +partitions on `error` and reuses the rest of the existing post-processing path. +""" + +import logging +from typing import Any + +import openai +from openai import OpenAI +from sqlmodel import Session + +from app.core.db import engine +from app.crud.evaluations.batch import build_response_body +from app.crud.evaluations.core import resolve_evaluation_config +from app.crud.evaluations.embeddings import EMBEDDING_MODEL +from app.models.llm.request import STTLLMParams, TextLLMParams, TTSLLMParams +from app.utils import get_openai_client, handle_openai_error + +logger = logging.getLogger(__name__) + +# Errors worth re-raising so Celery's autoretry can back off and try again. +# Anything else (auth, validation, model-not-found, etc.) is a permanent +# failure for this row — captured in the result dict so the aggregator can +# count it without crashing the chord. +RETRYABLE_OPENAI_ERRORS = ( + openai.RateLimitError, + openai.APITimeoutError, + openai.APIConnectionError, +) + + +def _resolve_text_params( + session: Session, eval_run_id: int, project_id: int, config_id, config_version: int +) -> TextLLMParams: + config, error = resolve_evaluation_config( + session=session, + config_id=config_id, + config_version=config_version, + project_id=project_id, + ) + if error or config is None: + raise ValueError( + f"[execute_eval_row] Config resolution failed | eval_run_id={eval_run_id} | " + f"error={error}" + ) + + param_models = { + "text": TextLLMParams, + "stt": STTLLMParams, + "tts": TTSLLMParams, + } + model_class = param_models.get(config.completion.type) + if model_class is not TextLLMParams: + raise ValueError( + f"[execute_eval_row] Live mode only supports text evaluations | " + f"eval_run_id={eval_run_id} | type={config.completion.type}" + ) + return TextLLMParams.model_validate(config.completion.params) + + +def _build_error_result(item: dict[str, Any], error_message: str) -> dict[str, Any]: + """Return a result dict marked as failed. 
Aggregator partitions on `error`.""" + question = item.get("input", {}).get("question", "") + ground_truth = item.get("expected_output", {}).get("answer", "") + question_id = item.get("metadata", {}).get("question_id") + return { + "item_id": item["id"], + "question": question, + "generated_output": "", + "ground_truth": ground_truth, + "response_id": None, + "usage": None, + "question_id": question_id, + "error": error_message, + "output_embedding": None, + "ground_truth_embedding": None, + "embedding_input_tokens": 0, + } + + +def execute_eval_row( + *, + eval_run_id: int, + item: dict[str, Any], + organization_id: int, + project_id: int, + config_id: Any, + config_version: int, +) -> dict[str, Any]: + """Run the Responses + Embeddings pair for one dataset item. + + Resolves the config and OpenAI client per-call (so each Celery task is + self-contained and reflects current credentials). Returns a result dict + on both success and permanent failure; raises on retryable transient + errors so the task wrapper can back off. + """ + log_prefix = f"[execute_eval_row][eval={eval_run_id}][item={item.get('id')}]" + + question = item.get("input", {}).get("question", "") + ground_truth = item.get("expected_output", {}).get("answer", "") + question_id = item.get("metadata", {}).get("question_id") + + if not question: + logger.warning(f"{log_prefix} Skipping item - no question found") + return _build_error_result(item, "Item has no question") + + with Session(engine) as session: + try: + text_params = _resolve_text_params( + session=session, + eval_run_id=eval_run_id, + project_id=project_id, + config_id=config_id, + config_version=config_version, + ) + client: OpenAI = get_openai_client( + session=session, + org_id=organization_id, + project_id=project_id, + ) + except Exception as e: + logger.error(f"{log_prefix} Setup failed | {e}", exc_info=True) + return _build_error_result(item, f"Setup failed: {e}") + + body = build_response_body(question=question, config=text_params) + + try: + response = client.responses.create(**body) + except RETRYABLE_OPENAI_ERRORS: + # Let Celery autoretry handle these — re-raise. + raise + except openai.OpenAIError as e: + error_message = handle_openai_error(e) + logger.error(f"{log_prefix} Responses API permanent error | {error_message}") + return _build_error_result(item, error_message) + + generated_output = response.output_text or "" + usage = { + "input_tokens": response.usage.input_tokens, + "output_tokens": response.usage.output_tokens, + "total_tokens": response.usage.total_tokens, + } + + if not generated_output or not ground_truth: + # No way to compute cosine similarity without both texts. Keep the + # response result for trace creation, but skip embeddings. + logger.info(f"{log_prefix} Skipping embeddings - empty output or ground_truth") + return { + "item_id": item["id"], + "question": question, + "generated_output": generated_output, + "ground_truth": ground_truth, + "response_id": response.id, + "usage": usage, + "question_id": question_id, + "error": None, + "output_embedding": None, + "ground_truth_embedding": None, + "embedding_input_tokens": 0, + } + + try: + embedding_response = client.embeddings.create( + model=EMBEDDING_MODEL, + input=[generated_output, ground_truth], + encoding_format="float", + ) + except RETRYABLE_OPENAI_ERRORS: + raise + except openai.OpenAIError as e: + # Keep the response result; the aggregator skips embeddings for this row. 
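+            # `error` stays None so the aggregator still creates a trace for
+            # this row; only the cosine-similarity score is lost.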
+ logger.error( + f"{log_prefix} Embeddings API error, continuing without similarity | " + f"{handle_openai_error(e)}" + ) + return { + "item_id": item["id"], + "question": question, + "generated_output": generated_output, + "ground_truth": ground_truth, + "response_id": response.id, + "usage": usage, + "question_id": question_id, + "error": None, + "output_embedding": None, + "ground_truth_embedding": None, + "embedding_input_tokens": 0, + } + + output_embedding = None + ground_truth_embedding = None + for emb in embedding_response.data: + if emb.index == 0: + output_embedding = emb.embedding + elif emb.index == 1: + ground_truth_embedding = emb.embedding + + embedding_input_tokens = ( + embedding_response.usage.prompt_tokens + if embedding_response.usage is not None + else 0 + ) + + return { + "item_id": item["id"], + "question": question, + "generated_output": generated_output, + "ground_truth": ground_truth, + "response_id": response.id, + "usage": usage, + "question_id": question_id, + "error": None, + "output_embedding": output_embedding, + "ground_truth_embedding": ground_truth_embedding, + "embedding_input_tokens": embedding_input_tokens, + }