
Evaluation#405

Merged
AkhileshNegi merged 72 commits into main from feature/evaluation
Nov 6, 2025

Conversation

@AkhileshNegi
Collaborator

@AkhileshNegi AkhileshNegi commented Oct 13, 2025

Summary

Target issue is #417

Checklist

Before submitting a pull request, please ensure that you have completed these tasks.

  • Ran fastapi run --reload app/main.py or docker compose up in the repository root and tested.
  • If you've fixed a bug or added new code, ensure it is covered by test cases.

Notes

Here is the dataflow:

  1. Upload CSV → Langfuse Dataset (5x duplication)

  2. Start Evaluation → Create EvaluationRun (pending)

  3. Fetch Dataset → Build JSONL for OpenAI Batch API

  4. Upload JSONL → OpenAI Files API

  5. Create Batch → OpenAI Batch API (returns batch_id)

  6. Update DB → BatchJob + EvaluationRun (processing)

  7. Wait → OpenAI processes batch async (1-24 hours)

  8. Celery Beat → Polls every 60 seconds

  9. Check Status → Poll OpenAI batch status

  10. When Response Batch Completed:

    • Download results (JSONL)
    • Parse output + match ground truth
    • Upload to S3 (optional)
    • Create Langfuse traces
  11. Build Embedding JSONL → [output, ground_truth] pairs per trace

  12. Create Embedding Batch → OpenAI Embeddings API (returns batch_id)

  13. Update DB → Link embedding_batch_job_id (status: processing)

  14. Wait → OpenAI processes embeddings (1-24 hours)

  15. Celery Beat → Polls embedding batch status

  16. When Embedding Batch Completed:

    • Download embedding results
    • Parse embedding pairs (output_emb, ground_truth_emb)
    • Calculate cosine similarity per trace
    • Calculate aggregate stats (avg, min, max, std)
    • Update Langfuse traces with cosine scores
    • Update eval_run.score with similarity metrics
    • Mark evaluation complete
  17. User views results via API or Langfuse UI
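Step 3 of the flow above (building the JSONL payload for the OpenAI Batch API) can be sketched roughly as follows. The request-line fields (`custom_id`, `method`, `url`, `body`) follow the documented OpenAI Batch API input format; the shape of the dataset items and the function name are assumptions for illustration, not the repository's actual code.

```python
import json


def build_batch_jsonl(items: list[dict], model: str = "gpt-4o-mini") -> str:
    """Build one JSONL line per dataset item for the OpenAI Batch API.

    Each line carries a custom_id so that results downloaded after the
    batch completes can be matched back to the originating dataset item.
    """
    lines = []
    for item in items:
        request = {
            "custom_id": str(item["id"]),
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model,
                "messages": [{"role": "user", "content": item["input"]}],
            },
        }
        lines.append(json.dumps(request))
    return "\n".join(lines)
```

The resulting string is what gets uploaded to the Files API in step 4, with `purpose="batch"`.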

Summary by CodeRabbit

  • New Features

    • Added evaluation dataset management: upload CSV datasets, list, retrieve, and delete.
    • Added batch evaluation runs with asynchronous processing and status tracking.
    • Added cosine similarity scoring for evaluation results.
    • Added automated cron job to process pending evaluations continuously.
  • Bug Fixes

    • Inactive user authentication now returns HTTP 403 instead of HTTP 400.
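The cosine similarity scoring mentioned above works on [output, ground_truth] embedding pairs; a minimal stdlib sketch of the math is below (the actual implementation lives in backend/app/crud/evaluations/embeddings.py and uses NumPy; function names here are illustrative).

```python
import math
from statistics import mean, pstdev


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two embedding vectors (1.0 = identical direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    denom = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / denom if denom else 0.0


def aggregate_scores(pairs: list[tuple[list[float], list[float]]]) -> dict[str, float]:
    """Aggregate per-trace similarities into the run-level avg/min/max/std stats."""
    sims = [cosine_similarity(out, gt) for out, gt in pairs]
    return {
        "avg": mean(sims),
        "min": min(sims),
        "max": max(sims),
        "std": pstdev(sims) if len(sims) > 1 else 0.0,
    }
```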

Akhilesh Negi and others added 15 commits May 28, 2025 09:53
@coderabbitai

coderabbitai Bot commented Oct 13, 2025

Walkthrough

Adds a full evaluation subsystem: dataset CSV upload/management, evaluation run lifecycle, batch provider abstractions with OpenAI batch integration, embedding-based scoring and Langfuse integration, cron-driven polling, DB models/migration, storage utilities, tests, docs, and small auth/endpoint adjustments.

Changes

Cohort / File(s) Summary
API routes & docs
backend/app/api/routes/evaluation.py, backend/app/api/routes/cron.py, backend/app/api/main.py, backend/app/api/routes/llm.py, backend/app/api/deps.py, backend/app/api/docs/evaluation/*
New evaluation and cron endpoints; cron router included in main; llm endpoint changed to synchronous; get_current_user now returns HTTP 403 for inactive users; comprehensive evaluation docs added.
Batch core & providers
backend/app/core/batch/base.py, backend/app/core/batch/openai.py, backend/app/core/batch/__init__.py
New BatchProvider interface and OpenAIBatchProvider implementing create/poll/download/upload/download_file for batch workflows.
Batch CRUD & orchestration
backend/app/crud/batch_job.py, backend/app/crud/batch_operations.py
New BatchJob CRUD and orchestration: start, poll, download, process completed batches, and upload results to object store (S3 alias).
Evaluation CRUD & processing
backend/app/crud/evaluations/__init__.py, .../core.py, .../dataset.py, .../batch.py, .../embeddings.py, .../langfuse.py, .../processing.py, .../cron.py
End-to-end evaluation workflows: CSV parsing/upload to Langfuse, dataset CRUD, evaluation run create/list/get/delete, JSONL building, batch submission, embedding batches, result parsing, cosine similarity scoring, Langfuse trace updates, and polling/processing (async + sync wrapper).
Models & migration
backend/app/models/batch_job.py, backend/app/models/evaluation.py, backend/app/models/__init__.py, backend/app/models/organization.py, backend/app/models/project.py, backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py
New SQLModel tables/schemas for BatchJob, EvaluationDataset, EvaluationRun with DTOs and re-exports; import/type-checking tweaks; Alembic migration creating tables, indices, and constraints.
Storage utilities
backend/app/core/storage_utils.py, backend/app/crud/evaluations/dataset.py
Utilities to upload CSV/JSONL to object storage with timestamped filenames; dataset upload/download helpers and aliases.
Celery & cron tooling
backend/app/celery/celery_app.py, scripts/python/invoke-cron.py, .env.example
Celery include formatting adjusted and beat_schedule removed; new async cron invoker script and .env entries for cron.
Tests
backend/app/tests/api/routes/test_evaluation.py, backend/app/tests/crud/evaluations/*
New unit tests covering dataset upload/duplication/validation, Langfuse interactions, batch JSONL building, embeddings parsing and similarity calculations, dataset CRUD and storage interactions.
Misc & deps
backend/app/utils.py, backend/pyproject.toml
get_langfuse_client now raises HTTP errors on missing/invalid creds and returns Langfuse (no Optional); NumPy dependency added.
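The BatchProvider abstraction from the table above can be pictured as an abstract base class along these lines. The method names come from the cohort summary (create/poll/download/upload/download_file); the signatures and return types are assumptions, not the repository's exact interface.

```python
from abc import ABC, abstractmethod


class BatchProvider(ABC):
    """Provider-agnostic interface for asynchronous batch jobs."""

    @abstractmethod
    def upload_file(self, content: str, purpose: str) -> str:
        """Upload input JSONL; return the provider's file ID."""

    @abstractmethod
    def create_batch(self, input_file_id: str, endpoint: str) -> str:
        """Start a batch job; return the provider's batch ID."""

    @abstractmethod
    def get_batch_status(self, batch_id: str) -> str:
        """Return the provider-specific status string (e.g. 'in_progress')."""

    @abstractmethod
    def download_batch_results(self, batch_id: str) -> list[dict]:
        """Download and parse the output JSONL once the batch completes."""

    @abstractmethod
    def download_file(self, file_id: str) -> str:
        """Download a raw file's contents by provider file ID."""
```

Concrete providers such as OpenAIBatchProvider then implement these against the vendor SDK, which keeps the orchestration code in backend/app/crud/batch_operations.py provider-neutral.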

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant API as Evaluation API
    participant DB as Database
    participant Storage as Object Store
    participant Langfuse
    participant OpenAI as OpenAI Batch
    participant Cron as Cron Worker

    User->>API: POST /upload_dataset (CSV)
    API->>Storage: (optional) upload CSV
    API->>Langfuse: create dataset & upload items
    API->>DB: store EvaluationDataset
    API-->>User: DatasetUploadResponse

    User->>API: POST /evaluations (dataset_id, config)
    API->>DB: create EvaluationRun (pending)
    API->>Langfuse: fetch dataset items
    API->>API: build JSONL
    API->>OpenAI: submit batch job
    API->>DB: update run with batch_job_id
    API-->>User: EvaluationRunPublic

    Cron->>API: GET /cron/evaluations
    API->>DB: query pending EvaluationRuns
    loop per pending run
      API->>OpenAI: poll batch status
      alt complete
        API->>OpenAI: download results
        API->>Storage: (optional) upload results
        API->>Langfuse: create traces
        API->>OpenAI: start embedding batch
        API->>DB: update run status/metrics
      else not complete
        API->>DB: no change
      end
    end
    API-->>Cron: summary
sequenceDiagram
    participant Embedding as Embedding Batch
    participant OpenAI
    participant Scorer
    participant Langfuse
    participant DB

    Embedding->>OpenAI: poll embedding batch status
    alt complete
      Embedding->>OpenAI: download embedding results
      Embedding->>Scorer: parse embeddings & pairings
      Scorer->>Scorer: compute cosine similarities (per-item)
      Scorer->>Scorer: aggregate mean/std
      Scorer->>Langfuse: update traces with scores
      Scorer->>DB: update EvaluationRun with scores
    else failed
      Embedding->>DB: mark run completed with error
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Areas requiring extra attention:

  • backend/app/crud/batch_operations.py and backend/app/crud/evaluations/processing.py — orchestration, state transitions, error propagation.
  • backend/app/crud/evaluations/langfuse.py and backend/app/crud/evaluations/core.py — Langfuse SDK usage, CSV parsing/duplication, flush semantics.
  • backend/app/core/batch/openai.py — OpenAI Batch API interactions, JSONL handling, error parsing.
  • backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py — FK constraints, indices, and downgrade order.
  • backend/app/utils.py and backend/app/api/deps.py — authentication/credential error paths and HTTP status semantics.
  • backend/app/crud/evaluations/embeddings.py — numerical correctness and edge-case handling in embedding pairing and similarity aggregation.

Possibly related PRs

Suggested labels

ready-for-review

Suggested reviewers

  • avirajsingh7
  • nishika26
  • kartpop

Poem

🐰 I nibble CSVs and hop through queues,
I stitch JSONL whispers and batch all the views,
Langfuse keeps traces, embeddings take flight,
Cron hums at intervals through day and through night,
A rabbit-built pipeline — the tests pass just right!

Pre-merge checks and finishing touches

❌ Failed checks (1 warning, 1 inconclusive)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 46.67% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
Title check ❓ Inconclusive The title 'Evaluation' is vague and generic, using a broad term without specifying the feature or change being introduced. Consider using a more descriptive title like 'Add evaluation feature with dataset uploads and batch processing' or 'Implement evaluation workflow with OpenAI batch API and embeddings' to better communicate the scope of changes.
✅ Passed checks (1 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.

📜 Recent review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 8ad6982 and c08d626.

📒 Files selected for processing (1)
  • backend/app/api/routes/evaluation.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use type hints in Python code (Python 3.11+ project)

Files:

  • backend/app/api/routes/evaluation.py
backend/app/api/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Expose FastAPI REST endpoints under backend/app/api/ organized by domain

Files:

  • backend/app/api/routes/evaluation.py
🧠 Learnings (2)
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to backend/app/models/**/*.py : Define SQLModel entities (database tables and domain objects) in backend/app/models/

Applied to files:

  • backend/app/api/routes/evaluation.py
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to backend/app/core/finetune/**/*.py : Place fine-tuning utilities under backend/app/core/finetune/

Applied to files:

  • backend/app/api/routes/evaluation.py
🧬 Code graph analysis (1)
backend/app/api/routes/evaluation.py (9)
backend/app/core/cloud/storage.py (1)
  • get_cloud_storage (262-279)
backend/app/crud/assistants.py (1)
  • get_assistant_by_id (19-30)
backend/app/crud/evaluations/dataset.py (5)
  • create_evaluation_dataset (31-104)
  • get_dataset_by_id (107-140)
  • list_datasets (175-210)
  • upload_csv_to_object_store (213-245)
  • delete_dataset (323-387)
backend/app/crud/evaluations/core.py (3)
  • create_evaluation_run (136-178)
  • get_evaluation_run_by_id (220-258)
  • list_evaluation_runs (181-217)
backend/app/crud/evaluations/batch.py (1)
  • start_evaluation_batch (118-212)
backend/app/core/storage_utils.py (1)
  • upload_csv_to_object_store (22-83)
backend/app/crud/evaluations/langfuse.py (1)
  • upload_dataset_to_langfuse_from_csv (181-297)
backend/app/models/evaluation.py (2)
  • DatasetUploadResponse (24-43)
  • EvaluationRunPublic (246-264)
backend/app/utils.py (4)
  • get_langfuse_client (209-245)
  • get_openai_client (176-206)
  • load_description (288-293)
  • _ (297-298)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (1)
  • GitHub Check: checks (3.11.7, 6)
🔇 Additional comments (8)
backend/app/api/routes/evaluation.py (8)

1-41: LGTM: Imports and security constants are well-defined.

The security constants appropriately restrict uploads to CSV files with a 1MB limit, and all necessary dependencies are imported.


44-89: LGTM: Dataset name sanitization logic is correct.

The function properly handles Langfuse naming constraints with clear transformation rules and appropriate error handling for edge cases.


92-293: LGTM: Comprehensive dataset upload implementation.

The endpoint properly validates CSV files with security checks (extension, MIME type, size), handles optional object store uploads gracefully, uploads to Langfuse, and persists metadata. Error handling uses semantically correct status codes, and the multi-step process is well-logged.


296-336: LGTM: Dataset listing endpoint is properly implemented.

The endpoint correctly scopes queries to the current organization and project, caps the limit at 100, and transforms database records to the response model.


339-375: LGTM: Dataset retrieval endpoint is correct.

The endpoint properly validates access by organization and project, returns appropriate 404 for missing datasets, and transforms the result to the response model.


378-408: LGTM: Dataset deletion endpoint is correct.

The endpoint properly validates access by organization and project, distinguishes between not-found and other errors, and returns an appropriate response.


568-591: LGTM: Evaluation runs listing endpoint is correct.

The endpoint properly delegates to the CRUD layer with organization and project scoping, and returns the appropriate response model.


594-627: LGTM: Evaluation run status endpoint is correct.

The endpoint properly validates access by organization and project through the CRUD layer, and returns appropriate 404 for inaccessible runs.


Comment @coderabbitai help to get the list of available commands and usage tips.


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

♻️ Duplicate comments (6)
backend/app/api/routes/cron.py (1)

52-64: Raise HTTPException instead of returning HTTP 200 on error.

This exception handler logs the error but returns a JSON payload with HTTP 200 status. Clients (including cron monitoring) will interpret this as success and won't alert on actual failures. You must raise an HTTPException with a 5xx status code so the HTTP response properly reflects the error condition.

This issue was flagged in a previous review but remains unresolved.

Apply this diff to fix:

     except Exception as e:
         logger.error(
             f"[evaluation_cron_job] Error executing cron job: {e}",
             exc_info=True,
         )
-        return {
-            "status": "error",
-            "error": str(e),
-            "organizations_processed": 0,
-            "total_processed": 0,
-            "total_failed": 0,
-            "total_still_processing": 0,
-        }
+        raise HTTPException(
+            status_code=500,
+            detail=f"Cron job failed: {str(e)}"
+        )
backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py (2)

21-21: Add return type hint for upgrade.

Please declare the signature as def upgrade() -> None: to satisfy the project’s Python 3.11+ typing rule. As per coding guidelines.


232-232: Add return type hint for downgrade.

Please update the signature to def downgrade() -> None: in line with the typing guideline. As per coding guidelines.

backend/app/core/batch/openai.py (1)

17-24: Add the missing __init__ return annotation.

Project guidelines require explicit return types, and the prior review already called this out. Please add -> None so the constructor stays aligned with our Python 3.11+ typing policy.

Apply this diff:

-    def __init__(self, client: OpenAI):
+    def __init__(self, client: OpenAI) -> None:
backend/app/crud/evaluations/embeddings.py (1)

92-109: Don’t discard falsy-but-valid outputs.

if not generated_output or not ground_truth still skips legitimate zero/False answers, so whole embedding jobs can empty out again. We already called this out earlier—please tighten the guard to only drop truly missing or blank values and coerce non-strings to strings before building the payload.

Apply this diff:

-        generated_output = result.get("generated_output", "")
-        ground_truth = result.get("ground_truth", "")
+        generated_output = result.get("generated_output")
+        ground_truth = result.get("ground_truth")
@@
-        if not generated_output or not ground_truth:
-            logger.warning(f"Skipping item {item_id} - empty output or ground_truth")
-            continue
+        if generated_output is None or ground_truth is None:
+            logger.warning(
+                f"Skipping item {item_id} - output or ground_truth is missing"
+            )
+            continue
+
+        if isinstance(generated_output, str):
+            if not generated_output.strip():
+                logger.warning(
+                    f"Skipping item {item_id} - empty generated_output string"
+                )
+                continue
+        else:
+            generated_output = str(generated_output)
+
+        if isinstance(ground_truth, str):
+            if not ground_truth.strip():
+                logger.warning(
+                    f"Skipping item {item_id} - empty ground_truth string"
+                )
+                continue
+        else:
+            ground_truth = str(ground_truth)
backend/app/crud/evaluations/processing.py (1)

732-737: Count embedding outcomes instead of lumping them into “still processing”.

check_and_process_evaluation() can return "embeddings_completed" and "embeddings_failed". The current else branch drops both into total_still_processing_count, so finished embedding runs vanish from the processed/failed totals and the summary is wrong. Please bucket those actions explicitly.

-                    if result["action"] == "processed":
-                        total_processed_count += 1
-                    elif result["action"] == "failed":
-                        total_failed_count += 1
-                    else:
-                        total_still_processing_count += 1
+                    action = result["action"]
+                    if action in {"processed", "embeddings_completed"}:
+                        total_processed_count += 1
+                    elif action in {"failed", "embeddings_failed"}:
+                        total_failed_count += 1
+                    else:
+                        total_still_processing_count += 1
🧹 Nitpick comments (1)
backend/app/api/routes/cron.py (1)

21-23: Consider a more specific return type.

The return type dict is vague. Per the coding guidelines for Python type hints, consider dict[str, Any] or define a TypedDict with the expected keys (status, organizations_processed, total_processed, total_failed, total_still_processing, error).

 def evaluation_cron_job(
     session: SessionDep,
-) -> dict:
+) -> dict[str, Any]:
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 24a958e and 2b38293.

📒 Files selected for processing (10)
  • backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py (1 hunks)
  • backend/app/api/routes/cron.py (1 hunks)
  • backend/app/core/batch/__init__.py (1 hunks)
  • backend/app/core/batch/base.py (1 hunks)
  • backend/app/core/batch/openai.py (1 hunks)
  • backend/app/crud/batch_operations.py (1 hunks)
  • backend/app/crud/evaluations/batch.py (1 hunks)
  • backend/app/crud/evaluations/core.py (1 hunks)
  • backend/app/crud/evaluations/embeddings.py (1 hunks)
  • backend/app/crud/evaluations/processing.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • backend/app/core/batch/__init__.py
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use type hints in Python code (Python 3.11+ project)

Files:

  • backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py
  • backend/app/crud/batch_operations.py
  • backend/app/core/batch/base.py
  • backend/app/crud/evaluations/core.py
  • backend/app/crud/evaluations/batch.py
  • backend/app/core/batch/openai.py
  • backend/app/api/routes/cron.py
  • backend/app/crud/evaluations/processing.py
  • backend/app/crud/evaluations/embeddings.py
backend/app/crud/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Implement database access operations in backend/app/crud/

Files:

  • backend/app/crud/batch_operations.py
  • backend/app/crud/evaluations/core.py
  • backend/app/crud/evaluations/batch.py
  • backend/app/crud/evaluations/processing.py
  • backend/app/crud/evaluations/embeddings.py
backend/app/core/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Place core functionality (config, DB session, security, exceptions, middleware) in backend/app/core/

Files:

  • backend/app/core/batch/base.py
  • backend/app/core/batch/openai.py
backend/app/api/**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Expose FastAPI REST endpoints under backend/app/api/ organized by domain

Files:

  • backend/app/api/routes/cron.py
🧠 Learnings (1)
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to **/*.py : Use type hints in Python code (Python 3.11+ project)

Applied to files:

  • backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py
🧬 Code graph analysis (8)
backend/app/crud/batch_operations.py (5)
backend/app/core/batch/base.py (4)
  • BatchProvider (7-105)
  • create_batch (11-32)
  • get_batch_status (35-52)
  • download_batch_results (55-72)
backend/app/core/cloud/storage.py (1)
  • get_cloud_storage (262-279)
backend/app/core/storage_utils.py (1)
  • upload_jsonl_to_object_store (86-152)
backend/app/crud/batch_job.py (2)
  • create_batch_job (13-56)
  • update_batch_job (76-120)
backend/app/models/batch_job.py (3)
  • BatchJob (15-80)
  • BatchJobCreate (83-97)
  • BatchJobUpdate (100-109)
backend/app/core/batch/base.py (2)
backend/app/core/batch/openai.py (5)
  • create_batch (26-87)
  • get_batch_status (89-141)
  • download_batch_results (143-191)
  • upload_file (193-223)
  • download_file (225-254)
backend/app/crud/batch_operations.py (1)
  • download_batch_results (130-156)
backend/app/crud/evaluations/core.py (5)
backend/app/models/evaluation.py (2)
  • EvaluationRun (131-229)
  • DatasetUploadResponse (24-43)
backend/app/models/user.py (1)
  • UserProjectOrg (58-59)
backend/app/utils.py (1)
  • get_langfuse_client (209-245)
backend/app/core/langfuse/langfuse.py (1)
  • flush (108-109)
backend/app/api/routes/evaluation.py (1)
  • list_evaluation_runs (573-591)
backend/app/crud/evaluations/batch.py (3)
backend/app/core/batch/openai.py (1)
  • OpenAIBatchProvider (14-254)
backend/app/crud/batch_operations.py (1)
  • start_batch_job (20-86)
backend/app/models/evaluation.py (1)
  • EvaluationRun (131-229)
backend/app/core/batch/openai.py (2)
backend/app/core/batch/base.py (6)
  • BatchProvider (7-105)
  • create_batch (11-32)
  • upload_file (75-89)
  • get_batch_status (35-52)
  • download_batch_results (55-72)
  • download_file (92-105)
backend/app/crud/batch_operations.py (1)
  • download_batch_results (130-156)
backend/app/api/routes/cron.py (2)
backend/app/api/permissions.py (2)
  • Permission (10-15)
  • require_permission (45-70)
backend/app/crud/evaluations/cron.py (1)
  • process_all_pending_evaluations_sync (142-154)
backend/app/crud/evaluations/processing.py (8)
backend/app/core/batch/openai.py (2)
  • OpenAIBatchProvider (14-254)
  • download_batch_results (143-191)
backend/app/crud/batch_job.py (1)
  • get_batch_job (59-73)
backend/app/crud/batch_operations.py (3)
  • download_batch_results (130-156)
  • upload_batch_results_to_object_store (205-232)
  • poll_batch_status (89-127)
backend/app/crud/evaluations/batch.py (1)
  • fetch_dataset_items (24-59)
backend/app/crud/evaluations/core.py (1)
  • update_evaluation_run (261-308)
backend/app/crud/evaluations/embeddings.py (3)
  • calculate_average_similarity (255-331)
  • parse_embedding_results (132-218)
  • start_embedding_batch (334-434)
backend/app/crud/evaluations/langfuse.py (2)
  • create_langfuse_dataset_run (18-122)
  • update_traces_with_cosine_scores (125-178)
backend/app/utils.py (2)
  • get_langfuse_client (209-245)
  • get_openai_client (176-206)
backend/app/crud/evaluations/embeddings.py (3)
backend/app/core/batch/openai.py (1)
  • OpenAIBatchProvider (14-254)
backend/app/crud/batch_operations.py (1)
  • start_batch_job (20-86)
backend/app/models/evaluation.py (1)
  • EvaluationRun (131-229)
🔇 Additional comments (1)
backend/app/api/routes/cron.py (1)

13-13: LGTM!

Router initialization is correct with appropriate tagging.

Comment on lines +24 to +214
"batch_job",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"provider",
sa.String(),
nullable=False,
comment="LLM provider name (e.g., 'openai', 'anthropic')",
),
sa.Column(
"job_type",
sa.String(),
nullable=False,
comment="Type of batch job (e.g., 'evaluation', 'classification', 'embedding')",
),
sa.Column(
"config",
postgresql.JSONB(astext_type=sa.Text()),
nullable=False,
server_default=sa.text("'{}'::jsonb"),
comment="Complete batch configuration",
),
sa.Column(
"provider_batch_id",
sa.String(),
nullable=True,
comment="Provider's batch job ID",
),
sa.Column(
"provider_file_id",
sa.String(),
nullable=True,
comment="Provider's input file ID",
),
sa.Column(
"provider_output_file_id",
sa.String(),
nullable=True,
comment="Provider's output file ID",
),
sa.Column(
"provider_status",
sa.String(),
nullable=True,
comment="Provider-specific status (e.g., OpenAI: validating, in_progress, completed, failed)",
),
sa.Column(
"raw_output_url",
sa.String(),
nullable=True,
comment="S3 URL of raw batch output file",
),
sa.Column(
"total_items",
sa.Integer(),
nullable=False,
server_default=sa.text("0"),
comment="Total number of items in the batch",
),
sa.Column(
"error_message",
sa.Text(),
nullable=True,
comment="Error message if batch failed",
),
sa.Column("organization_id", sa.Integer(), nullable=False),
sa.Column("project_id", sa.Integer(), nullable=False),
sa.Column("inserted_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["organization_id"], ["organization.id"], ondelete="CASCADE"
),
sa.ForeignKeyConstraint(["project_id"], ["project.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)
op.create_index(
op.f("ix_batch_job_job_type"), "batch_job", ["job_type"], unique=False
)
op.create_index(
op.f("ix_batch_job_organization_id"),
"batch_job",
["organization_id"],
unique=False,
)
op.create_index(
op.f("ix_batch_job_project_id"), "batch_job", ["project_id"], unique=False
)
op.create_index(
"idx_batch_job_status_org",
"batch_job",
["provider_status", "organization_id"],
unique=False,
)
op.create_index(
"idx_batch_job_status_project",
"batch_job",
["provider_status", "project_id"],
unique=False,
)

# Create evaluation_dataset table
op.create_table(
"evaluation_dataset",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("description", sqlmodel.sql.sqltypes.AutoString(), nullable=True),
sa.Column(
"dataset_metadata",
postgresql.JSONB(astext_type=sa.Text()),
nullable=False,
server_default=sa.text("'{}'::jsonb"),
),
sa.Column(
"object_store_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
),
sa.Column(
"langfuse_dataset_id",
sqlmodel.sql.sqltypes.AutoString(),
nullable=True,
),
sa.Column("organization_id", sa.Integer(), nullable=False),
sa.Column("project_id", sa.Integer(), nullable=False),
sa.Column("inserted_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["organization_id"], ["organization.id"], ondelete="CASCADE"
),
sa.ForeignKeyConstraint(["project_id"], ["project.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint(
"name",
"organization_id",
"project_id",
name="uq_evaluation_dataset_name_org_project",
),
)
op.create_index(
op.f("ix_evaluation_dataset_name"),
"evaluation_dataset",
["name"],
unique=False,
)

# Create evaluation_run table with all columns and foreign key references
op.create_table(
"evaluation_run",
sa.Column("run_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("dataset_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column("config", sa.JSON(), nullable=False),
sa.Column("batch_job_id", sa.Integer(), nullable=True),
sa.Column(
"embedding_batch_job_id",
sa.Integer(),
nullable=True,
comment="Reference to the batch_job for embedding-based similarity scoring",
),
sa.Column("dataset_id", sa.Integer(), nullable=False),
sa.Column("status", sqlmodel.sql.sqltypes.AutoString(), nullable=False),
sa.Column(
"object_store_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True
),
sa.Column("total_items", sa.Integer(), nullable=False),
sa.Column("score", sa.JSON(), nullable=True),
sa.Column("error_message", sa.Text(), nullable=True),
sa.Column("organization_id", sa.Integer(), nullable=False),
sa.Column("project_id", sa.Integer(), nullable=False),
sa.Column("id", sa.Integer(), nullable=False),
sa.Column("inserted_at", sa.DateTime(), nullable=False),
sa.Column("updated_at", sa.DateTime(), nullable=False),
sa.ForeignKeyConstraint(
["batch_job_id"],
["batch_job.id"],
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["embedding_batch_job_id"],
["batch_job.id"],
name="fk_evaluation_run_embedding_batch_job_id",
ondelete="SET NULL",
),
sa.ForeignKeyConstraint(
["dataset_id"],
["evaluation_dataset.id"],
name="fk_evaluation_run_dataset_id",
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["organization_id"], ["organization.id"], ondelete="CASCADE"
),
sa.ForeignKeyConstraint(["project_id"], ["project.id"], ondelete="CASCADE"),
sa.PrimaryKeyConstraint("id"),
)

⚠️ Potential issue | 🔴 Critical

Fix primary keys to autoincrement.

batch_job, evaluation_dataset, and evaluation_run declare id as plain Integer NOT NULL with a separate PrimaryKeyConstraint. Postgres will not create a sequence/default in this setup, so inserts coming from SQLModel (which send id=None) will fail with NULL value in column "id" violates not-null constraint. Mark the columns as primary keys (or add sa.Identity()) so the database autogenerates IDs.

-        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("id", sa.Integer(), primary_key=True),
...
-        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("id", sa.Integer(), primary_key=True),
...
-        sa.Column("id", sa.Integer(), nullable=False),
+        sa.Column("id", sa.Integer(), primary_key=True),
🤖 Prompt for AI Agents
In backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py
around lines 24 to 214, the id columns for batch_job, evaluation_dataset, and
evaluation_run are defined as plain Integer with a separate PrimaryKeyConstraint
which prevents Postgres from creating sequences and causes NULL id insert
failures; update each table definition to make the id column a real primary key
with an autogenerating identity (either mark the id column as primary_key=True
in the sa.Column definition or add sa.Identity()/server_default identity
expression) so Postgres will auto-generate ids on insert, and remove or keep the
separate PrimaryKeyConstraint accordingly to avoid duplicate/conflicting PK
declarations.
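The `sa.Identity()` variant mentioned in the prompt can be sketched as follows (a fragment, not the full migration; assuming SQLAlchemy 1.4+ and PostgreSQL 10+, with the rest of each table definition elided):

```
    sa.Column("id", sa.Integer(), sa.Identity(), nullable=False),
    ...
    sa.PrimaryKeyConstraint("id"),
```

With `sa.Identity()`, Postgres creates the column as `GENERATED BY DEFAULT AS IDENTITY`, so the existing separate `PrimaryKeyConstraint("id")` can stay as-is while inserts that send `id=None` succeed.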

Comment on lines +1 to +11
import logging

from app.api.permissions import Permission, require_permission
from fastapi import APIRouter, Depends
from sqlmodel import Session

from app.api.deps import SessionDep, AuthContextDep
from app.crud.evaluations import process_all_pending_evaluations_sync
from app.models import User

logger = logging.getLogger(__name__)
🛠️ Refactor suggestion | 🟠 Major

Remove unused imports and add HTTPException.

Three imports are unused:

  • Session from sqlmodel (line 5)
  • AuthContextDep (line 7)
  • User (line 9)

Additionally, HTTPException is missing from the FastAPI import (line 4) but is required to properly fix the exception handling at lines 52-64.

Apply this diff:

 import logging

 from app.api.permissions import Permission, require_permission
-from fastapi import APIRouter, Depends
-from sqlmodel import Session
+from fastapi import APIRouter, Depends, HTTPException

-from app.api.deps import SessionDep, AuthContextDep
+from app.api.deps import SessionDep
 from app.crud.evaluations import process_all_pending_evaluations_sync
-from app.models import User
🤖 Prompt for AI Agents
In backend/app/api/routes/cron.py around lines 1 to 11, remove the unused
imports: Session (from sqlmodel), AuthContextDep, and User; and add
HTTPException to the FastAPI import list so exception handling later (lines
~52-64) can raise HTTPException properly; update the import line to include
HTTPException and delete the three unused import lines to keep imports clean.

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 1

♻️ Duplicate comments (1)
scripts/python/invoke-cron.py (1)

148-149: Validate the response body status field to avoid masking failures.

The code logs "Endpoint invoked successfully" based only on the HTTP 200 status code without checking the response body. If the API returns {"status": "error", ...}, the failure will be masked and monitoring will miss it.

Apply this diff to validate the response body:

                 result = await self.invoke_endpoint(client)
-                logger.info(f"Endpoint invoked successfully: {result}")
+                
+                # Check response body status field
+                status = result.get("status")
+                if status != "success":
+                    error_msg = result.get("message", "Unknown error")
+                    logger.error(f"Endpoint returned failure status: {status}, message: {error_msg}, full response: {result}")
+                    # Continue to next iteration rather than crash the cron
+                else:
+                    logger.info(f"Endpoint invoked successfully: {result}")
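The check above boils down to a small standalone predicate. A minimal sketch (the `"status"`/`"message"` field names follow the hypothetical response shape in this review, not a confirmed API contract):

```python
# HTTP 200 only means transport success; application success lives in the
# JSON body, so the body's "status" field must be checked separately.
def check_result(result: dict) -> bool:
    """Return True only when the response body reports success."""
    return result.get("status") == "success"

ok = check_result({"status": "success", "processed": 3})
masked = check_result({"status": "error", "message": "batch poll failed"})
print(ok, masked)  # True False -- a 200 alone would have masked the failure
```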
🧹 Nitpick comments (6)
scripts/python/invoke-cron.py (6)

18-19: Add type hints to module-level constants.

Per coding guidelines, type hints should be used throughout the Python code.

Apply this diff:

-ENDPOINT = "/api/v1/cron/evaluations"  # Endpoint to invoke
-REQUEST_TIMEOUT = 30  # Timeout for requests in seconds
+ENDPOINT: str = "/api/v1/cron/evaluations"  # Endpoint to invoke
+REQUEST_TIMEOUT: int = 30  # Timeout for requests in seconds

As per coding guidelines.


32-51: Add return type hint to __init__ method.

Per coding guidelines, all methods should have type hints.

Apply this diff:

-    def __init__(self):
+    def __init__(self) -> None:

As per coding guidelines.


53-84: Add return type hint and use logging.exception in exception handlers.

Per coding guidelines, type hints should be used. Additionally, logging.exception is preferred over logging.error in exception handlers as it automatically includes the traceback.

Apply this diff:

-    async def authenticate(self, client: httpx.AsyncClient) -> str:
+    async def authenticate(self, client: httpx.AsyncClient) -> str:
         """Authenticate and get access token."""
         logger.info("Authenticating with API...")
 
         login_data = {
             "username": self.email,
             "password": self.password,
         }
 
         try:
             response = await client.post(
                 f"{self.base_url}/api/v1/login/access-token",
                 data=login_data,
                 timeout=REQUEST_TIMEOUT,
             )
             response.raise_for_status()
 
             data = response.json()
             self.access_token = data.get("access_token")
 
             if not self.access_token:
                 raise ValueError("No access token in response")
 
             logger.info("Authentication successful")
             return self.access_token
 
         except httpx.HTTPStatusError as e:
-            logger.error(f"Authentication failed with status {e.response.status_code}")
+            logger.exception(f"Authentication failed with status {e.response.status_code}")
             raise
         except Exception as e:
-            logger.error(f"Authentication error: {e}")
+            logger.exception(f"Authentication error: {e}")
             raise

As per coding guidelines.
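The difference between the two logging calls can be seen in a self-contained, stdlib-only demo (nothing here comes from the PR's code; the logger name and stream capture are illustrative):

```python
import io
import logging

# Route log output into a string so the difference is visible programmatically.
stream = io.StringIO()
logging.basicConfig(stream=stream, level=logging.INFO, force=True)
log = logging.getLogger("demo")

try:
    1 / 0
except ZeroDivisionError:
    log.error("via error(): message only")        # no traceback attached
    log.exception("via exception(): with stack")  # traceback appended automatically

out = stream.getvalue()
# Only the exception() call emitted a traceback.
print(out.count("Traceback (most recent call last)"))  # 1
```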


86-129: Add return type hint and use logging.exception in exception handlers.

Per coding guidelines, type hints should be used. Additionally, use logging.exception in exception handlers for automatic traceback inclusion.

Apply this diff:

-    async def invoke_endpoint(self, client: httpx.AsyncClient) -> dict:
+    async def invoke_endpoint(self, client: httpx.AsyncClient) -> dict:
         """Invoke the configured endpoint."""
         if not self.access_token:
             await self.authenticate(client)
 
         headers = {"Authorization": f"Bearer {self.access_token}"}
 
         # Debug: Log what we're sending
         logger.debug(f"Request URL: {self.base_url}{self.endpoint}")
         logger.debug(f"Request headers: {headers}")
 
         try:
             response = await client.get(
                 f"{self.base_url}{self.endpoint}",
                 headers=headers,
                 timeout=REQUEST_TIMEOUT,
             )
 
             # Debug: Log response headers and first part of body
             logger.debug(f"Response status: {response.status_code}")
             logger.debug(f"Response headers: {dict(response.headers)}")
 
             # If unauthorized, re-authenticate and retry once
             if response.status_code == 401:
                 logger.info("Token expired, re-authenticating...")
                 await self.authenticate(client)
                 headers = {"Authorization": f"Bearer {self.access_token}"}
                 response = await client.get(
                     f"{self.base_url}{self.endpoint}",
                     headers=headers,
                     timeout=REQUEST_TIMEOUT,
                 )
 
             response.raise_for_status()
             return response.json()
 
         except httpx.HTTPStatusError as e:
-            logger.error(
+            logger.exception(
                 f"Endpoint invocation failed with status {e.response.status_code}: {e.response.text}"
             )
             raise
         except Exception as e:
-            logger.error(f"Endpoint invocation error: {e}")
+            logger.exception(f"Endpoint invocation error: {e}")
             raise

As per coding guidelines.


131-168: Add return type hint and use logging.exception for error logging.

Per coding guidelines, add return type hint. Also prefer logging.exception in exception handlers.

Apply this diff:

-    async def run(self):
+    async def run(self) -> None:
         """Main loop to invoke endpoint periodically."""
         logger.info(f"Using API Base URL: {self.base_url}")
         logger.info(
             f"Starting cron job - invoking {self.endpoint} every {self.interval_minutes} minutes"
         )
 
         # Use async context manager to ensure proper cleanup
         async with httpx.AsyncClient() as client:
             # Authenticate once at startup
             await self.authenticate(client)
 
             while True:
                 try:
                     start_time = datetime.now()
                     logger.info(f"Invoking endpoint at {start_time}")
 
                     result = await self.invoke_endpoint(client)
                     logger.info(f"Endpoint invoked successfully: {result}")
 
                     # Calculate next invocation time
                     elapsed = (datetime.now() - start_time).total_seconds()
                     sleep_time = max(0, self.interval_seconds - elapsed)
 
                     if sleep_time > 0:
                         logger.info(
                             f"Sleeping for {sleep_time:.1f} seconds until next invocation"
                         )
                         await asyncio.sleep(sleep_time)
 
                 except KeyboardInterrupt:
                     logger.info("Shutting down gracefully...")
                     break
                 except Exception as e:
-                    logger.error(f"Error during invocation: {e}")
+                    logger.exception(f"Error during invocation: {e}")
                     # Wait before retrying on error
                     logger.info(f"Waiting {self.interval_seconds} seconds before retry")
                     await asyncio.sleep(self.interval_seconds)

As per coding guidelines.


171-189: Add return type hint and use logging.exception for fatal errors.

Per coding guidelines, add return type hint. Also use logging.exception to capture traceback for fatal errors.

Apply this diff:

-def main():
+def main() -> None:
     """Entry point for the script."""
     # Load environment variables
     env_path = Path(__file__).parent.parent.parent / ".env"
     if env_path.exists():
         load_dotenv(env_path)
         logger.info(f"Loaded environment from {env_path}")
     else:
         logger.warning(f"No .env file found at {env_path}")
 
     try:
         invoker = EndpointInvoker()
         asyncio.run(invoker.run())
     except KeyboardInterrupt:
         logger.info("Interrupted by user")
         sys.exit(0)
     except Exception as e:
-        logger.error(f"Fatal error: {e}")
+        logger.exception(f"Fatal error: {e}")
         sys.exit(1)

As per coding guidelines.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 2b38293 and dce502b.

📒 Files selected for processing (2)
  • .env.example (1 hunks)
  • scripts/python/invoke-cron.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
.env.example

📄 CodeRabbit inference engine (CLAUDE.md)

Provide .env.example as the template for .env

Files:

  • .env.example
**/*.py

📄 CodeRabbit inference engine (CLAUDE.md)

Use type hints in Python code (Python 3.11+ project)

Files:

  • scripts/python/invoke-cron.py
🪛 Ruff (0.14.3)
scripts/python/invoke-cron.py

1-1: Shebang is present but file is not executable

(EXE001)


49-51: Avoid specifying long messages outside the exception class

(TRY003)


74-74: Abstract raise to an inner function

(TRY301)


74-74: Avoid specifying long messages outside the exception class

(TRY003)


77-77: Consider moving this statement to an else block

(TRY300)


80-80: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


83-83: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


123-125: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


128-128: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


164-164: Do not catch blind exception: Exception

(BLE001)


165-165: Use logging.exception instead of logging.error

Replace with exception

(TRY400)


187-187: Do not catch blind exception: Exception

(BLE001)


188-188: Use logging.exception instead of logging.error

Replace with exception

(TRY400)

🔇 Additional comments (1)
.env.example (1)

26-30: LGTM! Clear documentation for new configuration.

The new environment variables are well-documented with sensible defaults. The comments clearly explain their purpose and default behavior, making it easy for developers to understand the configuration options.

@@ -0,0 +1,193 @@
#!/usr/bin/env python3
⚠️ Potential issue | 🟡 Minor

Make the file executable to match the shebang.

The shebang is present but the file lacks executable permissions.

Run this command to fix:

chmod +x scripts/python/invoke-cron.py
🧰 Tools
🪛 Ruff (0.14.3)

1-1: Shebang is present but file is not executable

(EXE001)

🤖 Prompt for AI Agents
In scripts/python/invoke-cron.py around line 1, the file contains a shebang but
is missing executable permissions; make the file executable by setting its
executable bit locally (so the shebang is honored), then commit the permission
change to the repo so the script runs as intended.

Collaborator

@kartpop kartpop left a comment

approved, let's merge in and release to staging; remnant issues can be taken up in the coming weeks and released with the unified API

@AkhileshNegi AkhileshNegi merged commit 5be0bdc into main Nov 6, 2025
2 of 3 checks passed
@AkhileshNegi AkhileshNegi deleted the feature/evaluation branch November 6, 2025 07:33
@AkhileshNegi AkhileshNegi linked an issue Nov 20, 2025 that may be closed by this pull request
@coderabbitai coderabbitai Bot mentioned this pull request Dec 26, 2025
2 tasks

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Evaluation

3 participants