33 changes: 33 additions & 0 deletions backend/app/alembic/versions/055_add_run_mode_to_evaluation_run.py
@@ -0,0 +1,33 @@
"""add run_mode to evaluation_run

Revision ID: 055
Revises: 054
Create Date: 2026-04-28 12:00:00.000000

"""

import sqlalchemy as sa
from alembic import op

# revision identifiers, used by Alembic.
revision = "055"
down_revision = "054"
branch_labels = None
depends_on = None


def upgrade():
op.add_column(
"evaluation_run",
sa.Column(
"run_mode",
sa.String(length=10),
nullable=False,
server_default="batch",
comment="Execution mode: batch | live",
),
)


def downgrade():
op.drop_column("evaluation_run", "run_mode")
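For context, the ORM side of this column is not part of the diff. A minimal sketch of what the matching `EvaluationRun` field might look like follows, assuming the model is a SQLModel table; the field mirrors the migration's length and default, and everything beyond the migration itself is an assumption.

```python
# Hypothetical sketch of the matching model field (not in this diff);
# mirrors the migration's server_default and String(10) length.
from sqlmodel import Field, SQLModel


class EvaluationRun(SQLModel, table=True):
    id: int | None = Field(default=None, primary_key=True)
    run_mode: str = Field(
        default="batch",
        max_length=10,
        description="Execution mode: batch | live",
    )
```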
18 changes: 14 additions & 4 deletions backend/app/api/docs/evaluation/create_evaluation.md
@@ -1,11 +1,20 @@
Start an evaluation run using the OpenAI Batch API.
Start an evaluation run.

Evaluations allow you to systematically test LLM configurations against
predefined datasets with automatic progress tracking and result collection.

**Execution modes (`run_mode`):**
* `batch` (default) — submits the dataset to the OpenAI Batch API. Best for
larger datasets; queueing time can be minutes-to-hours but cost is ~50%
lower per token. The cron poller drives completion.
* `live` — fans out per-row Celery tasks against the regular Responses API.
Capped at a server-configured item count (`EVAL_LIVE_MAX_ITEMS`, default
100). Standard (non-batch) pricing applies. Use this for fast feedback on
small datasets — typically completes in seconds.

**Key Features:**
* Fetches dataset items from Langfuse and creates a batch processing job via the OpenAI Batch API
* Asynchronous processing with automatic progress tracking (checks every 60s)
* Fetches dataset items from Langfuse and runs them through the chosen mode
* Asynchronous processing with automatic progress tracking
* Uses a stored config (created via `/configs`) to define the provider parameters
* Stores results for comparison and analysis
* Use `GET /evaluations/{evaluation_id}` to monitor progress and retrieve results
@@ -17,6 +26,7 @@ predefined datasets with automatic progress tracking and result collection.
"dataset_id": 123,
"experiment_name": "gpt4_file_search_test",
"config_id": "f54f0d67-4817-4103-9fdf-b74b3d46733e",
"config_version": 1
"config_version": 1,
"run_mode": "live"
}
```
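For quick reference, a minimal client sketch for starting a live run is shown below; the endpoint path, base URL, auth header, and the `data` envelope are assumptions, not confirmed by this diff.

```python
# Hypothetical client call; URL, path, and auth header are assumptions.
import requests

payload = {
    "dataset_id": 123,
    "experiment_name": "gpt4_file_search_test",
    "config_id": "f54f0d67-4817-4103-9fdf-b74b3d46733e",
    "config_version": 1,
    "run_mode": "live",  # omit or set "batch" for the Batch API path
}
resp = requests.post(
    "https://api.example.com/api/v1/evaluations",
    json=payload,
    headers={"X-API-KEY": "<api key>"},
    timeout=30,
)
resp.raise_for_status()
run = resp.json()["data"]  # assumes APIResponse wraps the payload under "data"
print(run["id"], run["status"])
```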
10 changes: 10 additions & 0 deletions backend/app/api/routes/evaluations/evaluation.py
@@ -1,6 +1,7 @@
"""Evaluation run API routes."""

import logging
from typing import Literal
from uuid import UUID

from fastapi import (
@@ -45,6 +46,14 @@ def evaluate(
),
config_id: UUID = Body(..., description="Stored config ID"),
config_version: int = Body(..., ge=1, description="Stored config version"),
run_mode: Literal["batch", "live"] = Body(
"batch",
description=(
"Execution mode. 'batch' (default) submits the dataset to OpenAI's "
"Batch API. 'live' fans out per-row Celery tasks against the regular "
"Responses API for fast turnaround on small datasets (capped server-side)."
),
),
) -> APIResponse[EvaluationRunPublic]:
"""Start an evaluation run."""
eval_run = start_evaluation(
@@ -55,6 +64,7 @@ def evaluate(
config_version=config_version,
organization_id=auth_context.organization_.id,
project_id=auth_context.project_.id,
run_mode=run_mode,
)

if eval_run.status == "failed":
7 changes: 7 additions & 0 deletions backend/app/celery/celery_app.py
@@ -154,6 +154,7 @@ def initialize_worker(**_) -> None:
backend=settings.REDIS_URL,
include=[
"app.celery.tasks.job_execution",
"app.celery.tasks.evaluation_live",
],
)

@@ -178,6 +179,12 @@ def initialize_worker(**_) -> None:
),
Queue("cron", exchange=default_exchange, routing_key="cron"),
Queue("default", exchange=default_exchange, routing_key="default"),
Queue(
"evaluations",
exchange=default_exchange,
routing_key="evaluations",
queue_arguments={"x-max-priority": 6},
),
),
# Task routing — queue is set per-task via @celery_app.task(queue=...).
# Only cron tasks need an explicit override here.
108 changes: 108 additions & 0 deletions backend/app/celery/tasks/evaluation_live.py
@@ -0,0 +1,108 @@
"""Celery tasks for live (non-batch) evaluation mode.

Two tasks form a chord:

* `run_eval_row` — one task per dataset item. Calls the OpenAI Responses
API and Embeddings API and returns a result dict. Retries on transient
OpenAI errors (rate limit, timeout, connection); permanent errors are
caught and folded into the result dict so the chord proceeds.

* `aggregate_eval_results` — chord callback. Idempotent. Writes Langfuse
traces, computes cosine similarity, attaches cost, marks the eval run
completed.

Both run on the dedicated `evaluations` queue so they don't compete with
user-facing `high_priority` traffic.
"""

import logging

import openai
from celery import current_task

from app.celery.celery_app import celery_app
from app.celery.tasks.job_execution import _run_with_otel_parent, _set_trace

logger = logging.getLogger(__name__)


@celery_app.task(
bind=True,
queue="evaluations",
priority=5,
autoretry_for=(
openai.RateLimitError,
openai.APITimeoutError,
openai.APIConnectionError,
),
retry_backoff=True,
retry_backoff_max=60,
retry_jitter=True,
max_retries=5,
soft_time_limit=120,
time_limit=180,
)
def run_eval_row(
self,
eval_run_id: int,
item: dict,
organization_id: int,
project_id: int,
config_id: str,
config_version: int,
trace_id: str = "N/A",
):
from app.services.evaluations.live_row import execute_eval_row

_set_trace(trace_id)
logger.info(
f"[run_eval_row] task_id={current_task.request.id} | eval={eval_run_id} | "
f"item={item.get('id')}"
)
return _run_with_otel_parent(
self,
lambda: execute_eval_row(
eval_run_id=eval_run_id,
item=item,
organization_id=organization_id,
project_id=project_id,
config_id=config_id,
config_version=config_version,
),
)


@celery_app.task(
bind=True,
queue="evaluations",
priority=6,
autoretry_for=(Exception,),
retry_backoff=True,
max_retries=3,
soft_time_limit=240,
time_limit=300,
)
def aggregate_eval_results(
self,
row_results: list[dict],
eval_run_id: int,
organization_id: int,
project_id: int,
trace_id: str = "N/A",
):
from app.services.evaluations.live_aggregator import aggregate_results

_set_trace(trace_id)
logger.info(
f"[aggregate_eval_results] task_id={current_task.request.id} | "
f"eval={eval_run_id} | rows={len(row_results)}"
)
return _run_with_otel_parent(
self,
lambda: aggregate_results(
eval_run_id=eval_run_id,
organization_id=organization_id,
project_id=project_id,
row_results=row_results,
),
)
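For orientation, the dispatch side (`start_evaluation_live`, referenced in the CRUD package below but not shown in this diff) would compose these two tasks into a chord roughly as in the following sketch. The helper name, its signature, and the keyword arguments are assumptions derived from the task signatures above; only the two task names come from this diff.

```python
# Hypothetical dispatch sketch: one run_eval_row task per dataset item,
# then a single aggregate_eval_results callback once all rows finish.
from celery import chord

from app.celery.tasks.evaluation_live import aggregate_eval_results, run_eval_row


def dispatch_live_evaluation(
    eval_run_id: int,
    items: list[dict],
    organization_id: int,
    project_id: int,
    config_id: str,
    config_version: int,
    trace_id: str = "N/A",
) -> None:
    header = [
        run_eval_row.s(
            eval_run_id=eval_run_id,
            item=item,
            organization_id=organization_id,
            project_id=project_id,
            config_id=config_id,
            config_version=config_version,
            trace_id=trace_id,
        )
        for item in items
    ]
    # Celery prepends the list of row results as the callback's first
    # positional argument, which lands in aggregate_eval_results.row_results.
    callback = aggregate_eval_results.s(
        eval_run_id=eval_run_id,
        organization_id=organization_id,
        project_id=project_id,
        trace_id=trace_id,
    )
    chord(header)(callback)
```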
4 changes: 4 additions & 0 deletions backend/app/core/config.py
@@ -162,6 +162,10 @@ def AWS_S3_BUCKET(self) -> str:
# this same value so its expected schedule stays aligned with the trigger.
CRON_INTERVAL_MINUTES: int = 5

# Live evaluation mode (Celery fan-out, non-batch).
EVAL_LIVE_MAX_ITEMS: int = 100
EVAL_LIVE_FAILURE_THRESHOLD: float = 0.5

@computed_field # type: ignore[prop-decorator]
@property
def COMPUTED_CELERY_WORKER_CONCURRENCY(self) -> int:
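A sketch of how these two settings might be enforced on the live path follows; the exact semantics (a hard cap on dataset size, and a maximum tolerated fraction of failed rows) are assumptions based on the setting names, as are the function names and the result-dict shape.

```python
# Hypothetical enforcement sketch; only the setting names come from this diff.
from app.core.config import settings


def check_live_item_cap(items: list[dict]) -> None:
    # Reject live runs larger than the configured cap (default 100 items).
    if len(items) > settings.EVAL_LIVE_MAX_ITEMS:
        raise ValueError(
            f"Live mode supports at most {settings.EVAL_LIVE_MAX_ITEMS} items; "
            f"got {len(items)}. Use run_mode='batch' instead."
        )


def run_failed(row_results: list[dict]) -> bool:
    # Treat the run as failed once the failed-row fraction crosses the threshold;
    # assumes failed rows carry an "error" key in their result dict.
    if not row_results:
        return False
    failed = sum(1 for r in row_results if r.get("error"))
    return failed / len(row_results) >= settings.EVAL_LIVE_FAILURE_THRESHOLD
```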
3 changes: 3 additions & 0 deletions backend/app/crud/evaluations/__init__.py
@@ -1,6 +1,7 @@
"""Evaluation-related CRUD operations."""

from app.crud.evaluations.batch import start_evaluation_batch
from app.crud.evaluations.live import start_evaluation_live
from app.crud.evaluations.core import (
create_evaluation_run,
get_evaluation_run_by_id,
@@ -65,6 +66,8 @@
"upload_csv_to_object_store",
# Batch
"start_evaluation_batch",
# Live
"start_evaluation_live",
# Processing
"check_and_process_evaluation",
"poll_all_pending_evaluations",
55 changes: 31 additions & 24 deletions backend/app/crud/evaluations/batch.py
@@ -62,6 +62,36 @@ def fetch_dataset_items(langfuse: Langfuse, dataset_name: str) -> list[dict[str,
return items


def build_response_body(question: str, config: KaapiLLMParams) -> dict[str, Any]:
"""Build a Responses API request body for one dataset question.

Shared between the batch JSONL builder and the live (per-row Celery task)
evaluation path so the request shape stays identical.
"""
body: dict[str, Any] = {
"model": config.model,
"instructions": config.instructions,
"input": question,
}

if "temperature" in config.model_fields_set:
body["temperature"] = config.temperature

if config.reasoning:
body["reasoning"] = {"effort": config.reasoning}

if config.knowledge_base_ids:
body["tools"] = [
{
"type": "file_search",
"vector_store_ids": config.knowledge_base_ids,
"max_num_results": config.max_num_results or 20,
}
]

return body


def build_evaluation_jsonl(
dataset_items: list[dict[str, Any]], config: KaapiLLMParams
) -> list[dict[str, Any]]:
@@ -101,30 +131,7 @@ def build_evaluation_jsonl(
)
continue

# Build the batch request object for Responses API
# Use config as-is and only add the input field
body: dict[str, Any] = {
"model": config.model,
"instructions": config.instructions,
"input": question, # Add input from dataset
}

if "temperature" in config.model_fields_set:
body["temperature"] = config.temperature

# Add reasoning only if provided
if config.reasoning:
body["reasoning"] = {"effort": config.reasoning}

# Add tools only if knowledge_base_ids are provided
if config.knowledge_base_ids:
body["tools"] = [
{
"type": "file_search",
"vector_store_ids": config.knowledge_base_ids,
"max_num_results": config.max_num_results or 20,
}
]
body = build_response_body(question=question, config=config)

batch_request = {
BATCH_KEY: item["id"],
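To illustrate the sharing, a sketch of both call sites follows. The `custom_id`/`method`/`url` envelope mirrors OpenAI's Batch API JSONL format, while the live call site, the dataset item field names, and the `KaapiLLMParams` import path are assumptions (the real live path lives in `execute_eval_row`, which is not shown in this diff).

```python
# Hypothetical call-site sketch; everything beyond build_response_body's
# signature is assumed, not taken from this diff.
from typing import Any

from openai import OpenAI

from app.crud.evaluations.batch import build_response_body
from app.models import KaapiLLMParams  # assumed import path


def as_batch_line(item: dict[str, Any], config: KaapiLLMParams) -> dict[str, Any]:
    # Batch path: wrap the shared body in the Batch API JSONL envelope.
    body = build_response_body(question=item["input"], config=config)
    return {
        "custom_id": str(item["id"]),
        "method": "POST",
        "url": "/v1/responses",
        "body": body,
    }


def run_live_row(item: dict[str, Any], config: KaapiLLMParams) -> Any:
    # Live path: send the same body straight to the Responses API.
    body = build_response_body(question=item["input"], config=config)
    return OpenAI().responses.create(**body)
```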
3 changes: 3 additions & 0 deletions backend/app/crud/evaluations/core.py
@@ -62,6 +62,7 @@ def create_evaluation_run(
config_version: int,
organization_id: int,
project_id: int,
run_mode: str = "batch",
) -> EvaluationRun:
"""
Create a new evaluation run record in the database.
Expand All @@ -75,6 +76,7 @@ def create_evaluation_run(
config_version: Version number of the config
organization_id: Organization ID
project_id: Project ID
run_mode: Execution mode, "batch" (default) or "live"

Returns:
The created EvaluationRun instance
@@ -84,6 +86,7 @@
dataset_name=dataset_name,
dataset_id=dataset_id,
type=EvaluationType.TEXT.value,
run_mode=run_mode,
config_id=config_id,
config_version=config_version,
status="pending",