Evaluation #405
Conversation
- Remove project_id parameter from /evaluate endpoint
- Update get_provider_credential calls to not require project_id
- Credentials now retrieved via API key authentication
- Clean up logging configuration and imports
- Fix linting errors and update type annotations
Walkthrough

Adds a full evaluation subsystem: dataset CSV upload/management, evaluation run lifecycle, batch provider abstractions with OpenAI batch integration, embedding-based scoring and Langfuse integration, cron-driven polling, DB models/migration, storage utilities, tests, docs, and small auth/endpoint adjustments.

Changes
Sequence Diagram(s)

```mermaid
sequenceDiagram
participant User
participant API as Evaluation API
participant DB as Database
participant Storage as Object Store
participant Langfuse
participant OpenAI as OpenAI Batch
participant Cron as Cron Worker
User->>API: POST /upload_dataset (CSV)
API->>Storage: (optional) upload CSV
API->>Langfuse: create dataset & upload items
API->>DB: store EvaluationDataset
API-->>User: DatasetUploadResponse
User->>API: POST /evaluations (dataset_id, config)
API->>DB: create EvaluationRun (pending)
API->>Langfuse: fetch dataset items
API->>API: build JSONL
API->>OpenAI: submit batch job
API->>DB: update run with batch_job_id
API-->>User: EvaluationRunPublic
Cron->>API: GET /cron/evaluations
API->>DB: query pending EvaluationRuns
loop per pending run
API->>OpenAI: poll batch status
alt complete
API->>OpenAI: download results
API->>Storage: (optional) upload results
API->>Langfuse: create traces
API->>OpenAI: start embedding batch
API->>DB: update run status/metrics
else not complete
API->>DB: no change
end
end
API-->>Cron: summary
```
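The "build JSONL" step in the diagram above maps each dataset item to one Batch API request line. A minimal sketch of that shape, assuming the OpenAI `/v1/chat/completions` batch request format; the model name and item field names are illustrative, not taken from the repo:

```python
import json


def build_eval_jsonl(dataset_items: list[dict], model: str = "gpt-4.1-mini") -> str:
    """Serialize dataset items into OpenAI Batch API request lines."""
    lines = []
    for item in dataset_items:
        request = {
            "custom_id": str(item["id"]),  # used to pair batch results with items later
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": {
                "model": model,
                "messages": [{"role": "user", "content": item["input"]}],
            },
        }
        lines.append(json.dumps(request))
    return "\n".join(lines)
```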
```mermaid
sequenceDiagram
participant Embedding as Embedding Batch
participant OpenAI
participant Scorer
participant Langfuse
participant DB
Embedding->>OpenAI: poll embedding batch status
alt complete
Embedding->>OpenAI: download embedding results
Embedding->>Scorer: parse embeddings & pairings
Scorer->>Scorer: compute cosine similarities (per-item)
Scorer->>Scorer: aggregate mean/std
Scorer->>Langfuse: update traces with scores
Scorer->>DB: update EvaluationRun with scores
else failed
Embedding->>DB: mark run completed with error
end
```
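To make the scoring step concrete: per-item cosine similarity over the two embeddings, then mean/std aggregated across the run. A minimal sketch in plain Python; function and variable names here are illustrative, not the repo's:

```python
import math


def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine similarity between two equal-length embedding vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def aggregate_scores(pairs: list[tuple[list[float], list[float]]]) -> dict[str, float]:
    """Score each (generated, ground_truth) embedding pair, then aggregate."""
    if not pairs:
        return {"mean": 0.0, "std": 0.0}
    scores = [cosine_similarity(gen, truth) for gen, truth in pairs]
    mean = sum(scores) / len(scores)
    variance = sum((s - mean) ** 2 for s in scores) / len(scores)
    return {"mean": mean, "std": math.sqrt(variance)}
```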
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Pre-merge checks: ❌ failed (1 warning, 1 inconclusive); ✅ passed (1)
Codecov Report: ❌ patch coverage check failed on this PR.
Actionable comments posted: 2
♻️ Duplicate comments (6)
backend/app/api/routes/cron.py (1)
52-64: Raise `HTTPException` instead of returning HTTP 200 on error.

This exception handler logs the error but returns a JSON payload with HTTP 200 status. Clients (including cron monitoring) will interpret this as success and won't alert on actual failures. You must raise an `HTTPException` with a 5xx status code so the HTTP response properly reflects the error condition.

This issue was flagged in a previous review but remains unresolved.
Apply this diff to fix:
```diff
     except Exception as e:
         logger.error(
             f"[evaluation_cron_job] Error executing cron job: {e}",
             exc_info=True,
         )
-        return {
-            "status": "error",
-            "error": str(e),
-            "organizations_processed": 0,
-            "total_processed": 0,
-            "total_failed": 0,
-            "total_still_processing": 0,
-        }
+        raise HTTPException(
+            status_code=500,
+            detail=f"Cron job failed: {str(e)}"
+        )
```

backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py (2)
21-21: Add return type hint for `upgrade`.

Please declare the signature as `def upgrade() -> None:` to satisfy the project's Python 3.11+ typing rule. As per coding guidelines.
232-232: Add return type hint for `downgrade`.

Please update the signature to `def downgrade() -> None:` in line with the typing guideline. As per coding guidelines.

backend/app/core/batch/openai.py (1)
17-24: Add the missing `__init__` return annotation.

Project guidelines require explicit return types, and the prior review already called this out. Please add `-> None` so the constructor stays aligned with our Python 3.11+ typing policy.

Apply this diff:
```diff
-    def __init__(self, client: OpenAI):
+    def __init__(self, client: OpenAI) -> None:
```

backend/app/crud/evaluations/embeddings.py (1)
92-109: Don't discard falsy-but-valid outputs.

`if not generated_output or not ground_truth` still skips legitimate zero/False answers, so whole embedding jobs can empty out again. We already called this out earlier; please tighten the guard to only drop truly missing or blank values and coerce non-strings to strings before building the payload.

Apply this diff:
```diff
-    generated_output = result.get("generated_output", "")
-    ground_truth = result.get("ground_truth", "")
+    generated_output = result.get("generated_output")
+    ground_truth = result.get("ground_truth")
@@
-    if not generated_output or not ground_truth:
-        logger.warning(f"Skipping item {item_id} - empty output or ground_truth")
-        continue
+    if generated_output is None or ground_truth is None:
+        logger.warning(
+            f"Skipping item {item_id} - output or ground_truth is missing"
+        )
+        continue
+
+    if isinstance(generated_output, str):
+        if not generated_output.strip():
+            logger.warning(
+                f"Skipping item {item_id} - empty generated_output string"
+            )
+            continue
+    else:
+        generated_output = str(generated_output)
+
+    if isinstance(ground_truth, str):
+        if not ground_truth.strip():
+            logger.warning(
+                f"Skipping item {item_id} - empty ground_truth string"
+            )
+            continue
+    else:
+        ground_truth = str(ground_truth)
```

backend/app/crud/evaluations/processing.py (1)
732-737: Count embedding outcomes instead of lumping them into "still processing".

`check_and_process_evaluation()` can return `"embeddings_completed"` and `"embeddings_failed"`. The current else branch drops both into `total_still_processing_count`, so finished embedding runs vanish from the processed/failed totals and the summary is wrong. Please bucket those actions explicitly.

```diff
-            if result["action"] == "processed":
-                total_processed_count += 1
-            elif result["action"] == "failed":
-                total_failed_count += 1
-            else:
-                total_still_processing_count += 1
+            action = result["action"]
+            if action in {"processed", "embeddings_completed"}:
+                total_processed_count += 1
+            elif action in {"failed", "embeddings_failed"}:
+                total_failed_count += 1
+            else:
+                total_still_processing_count += 1
```
🧹 Nitpick comments (1)
backend/app/api/routes/cron.py (1)
21-23: Consider a more specific return type.

The return type `dict` is vague. Per the coding guidelines for Python type hints, consider `dict[str, Any]` or define a TypedDict with the expected keys (status, organizations_processed, total_processed, total_failed, total_still_processing, error).

```diff
 def evaluation_cron_job(
     session: SessionDep,
-) -> dict:
+) -> dict[str, Any]:
```
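For illustration, the TypedDict option could look like the sketch below; the class name `CronJobSummary` is hypothetical, and the keys come from the comment above:

```python
from typing import TypedDict


class CronJobSummary(TypedDict, total=False):
    status: str
    organizations_processed: int
    total_processed: int
    total_failed: int
    total_still_processing: int
    error: str  # present only when status is "error"
```

The route would then declare `-> CronJobSummary`, letting type checkers flag missing or misspelled keys.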
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (10)
- backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py (1 hunks)
- backend/app/api/routes/cron.py (1 hunks)
- backend/app/core/batch/__init__.py (1 hunks)
- backend/app/core/batch/base.py (1 hunks)
- backend/app/core/batch/openai.py (1 hunks)
- backend/app/crud/batch_operations.py (1 hunks)
- backend/app/crud/evaluations/batch.py (1 hunks)
- backend/app/crud/evaluations/core.py (1 hunks)
- backend/app/crud/evaluations/embeddings.py (1 hunks)
- backend/app/crud/evaluations/processing.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- backend/app/core/batch/__init__.py
🧰 Additional context used
📓 Path-based instructions (4)
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Use type hints in Python code (Python 3.11+ project)
Files:
- backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py
- backend/app/crud/batch_operations.py
- backend/app/core/batch/base.py
- backend/app/crud/evaluations/core.py
- backend/app/crud/evaluations/batch.py
- backend/app/core/batch/openai.py
- backend/app/api/routes/cron.py
- backend/app/crud/evaluations/processing.py
- backend/app/crud/evaluations/embeddings.py
backend/app/crud/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Implement database access operations in backend/app/crud/
Files:
- backend/app/crud/batch_operations.py
- backend/app/crud/evaluations/core.py
- backend/app/crud/evaluations/batch.py
- backend/app/crud/evaluations/processing.py
- backend/app/crud/evaluations/embeddings.py
backend/app/core/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Place core functionality (config, DB session, security, exceptions, middleware) in backend/app/core/
Files:
- backend/app/core/batch/base.py
- backend/app/core/batch/openai.py
backend/app/api/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Expose FastAPI REST endpoints under backend/app/api/ organized by domain
Files:
backend/app/api/routes/cron.py
🧠 Learnings (1)
📚 Learning: 2025-10-08T12:05:01.317Z
Learnt from: CR
Repo: ProjectTech4DevAI/ai-platform PR: 0
File: CLAUDE.md:0-0
Timestamp: 2025-10-08T12:05:01.317Z
Learning: Applies to **/*.py : Use type hints in Python code (Python 3.11+ project)
Applied to files:
backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py
🧬 Code graph analysis (8)
backend/app/crud/batch_operations.py (5)
backend/app/core/batch/base.py (4)
- BatchProvider (7-105)
- create_batch (11-32)
- get_batch_status (35-52)
- download_batch_results (55-72)

backend/app/core/cloud/storage.py (1)
- get_cloud_storage (262-279)

backend/app/core/storage_utils.py (1)
- upload_jsonl_to_object_store (86-152)

backend/app/crud/batch_job.py (2)
- create_batch_job (13-56)
- update_batch_job (76-120)

backend/app/models/batch_job.py (3)
- BatchJob (15-80)
- BatchJobCreate (83-97)
- BatchJobUpdate (100-109)
backend/app/core/batch/base.py (2)
backend/app/core/batch/openai.py (5)
- create_batch (26-87)
- get_batch_status (89-141)
- download_batch_results (143-191)
- upload_file (193-223)
- download_file (225-254)

backend/app/crud/batch_operations.py (1)
- download_batch_results (130-156)
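The code graph above outlines the provider abstraction: `BatchProvider` with create/status/download/upload operations that `OpenAIBatchProvider` implements. A rough sketch of that interface shape; the parameter names and types below are assumptions, not the repo's actual signatures:

```python
from abc import ABC, abstractmethod


class BatchProvider(ABC):
    """Abstracts a provider's batch API (OpenAI today, others later)."""

    @abstractmethod
    def upload_file(self, content: bytes, purpose: str) -> str:
        """Upload an input file; return the provider's file ID."""

    @abstractmethod
    def create_batch(self, input_file_id: str, endpoint: str) -> str:
        """Create a batch job from an uploaded file; return the batch ID."""

    @abstractmethod
    def get_batch_status(self, batch_id: str) -> str:
        """Return the provider-specific status string."""

    @abstractmethod
    def download_batch_results(self, batch_id: str) -> list[dict]:
        """Fetch and parse the batch output once complete."""

    @abstractmethod
    def download_file(self, file_id: str) -> bytes:
        """Download a raw provider file by ID."""
```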
backend/app/crud/evaluations/core.py (5)
backend/app/models/evaluation.py (2)
- EvaluationRun (131-229)
- DatasetUploadResponse (24-43)

backend/app/models/user.py (1)
- UserProjectOrg (58-59)

backend/app/utils.py (1)
- get_langfuse_client (209-245)

backend/app/core/langfuse/langfuse.py (1)
- flush (108-109)

backend/app/api/routes/evaluation.py (1)
- list_evaluation_runs (573-591)
backend/app/crud/evaluations/batch.py (3)
backend/app/core/batch/openai.py (1)
- OpenAIBatchProvider (14-254)

backend/app/crud/batch_operations.py (1)
- start_batch_job (20-86)

backend/app/models/evaluation.py (1)
- EvaluationRun (131-229)
backend/app/core/batch/openai.py (2)
backend/app/core/batch/base.py (6)
- BatchProvider (7-105)
- create_batch (11-32)
- upload_file (75-89)
- get_batch_status (35-52)
- download_batch_results (55-72)
- download_file (92-105)

backend/app/crud/batch_operations.py (1)
- download_batch_results (130-156)
backend/app/api/routes/cron.py (2)
backend/app/api/permissions.py (2)
- Permission (10-15)
- require_permission (45-70)

backend/app/crud/evaluations/cron.py (1)
- process_all_pending_evaluations_sync (142-154)
backend/app/crud/evaluations/processing.py (8)
backend/app/core/batch/openai.py (2)
- OpenAIBatchProvider (14-254)
- download_batch_results (143-191)

backend/app/crud/batch_job.py (1)
- get_batch_job (59-73)

backend/app/crud/batch_operations.py (3)
- download_batch_results (130-156)
- upload_batch_results_to_object_store (205-232)
- poll_batch_status (89-127)

backend/app/crud/evaluations/batch.py (1)
- fetch_dataset_items (24-59)

backend/app/crud/evaluations/core.py (1)
- update_evaluation_run (261-308)

backend/app/crud/evaluations/embeddings.py (3)
- calculate_average_similarity (255-331)
- parse_embedding_results (132-218)
- start_embedding_batch (334-434)

backend/app/crud/evaluations/langfuse.py (2)
- create_langfuse_dataset_run (18-122)
- update_traces_with_cosine_scores (125-178)

backend/app/utils.py (2)
- get_langfuse_client (209-245)
- get_openai_client (176-206)
backend/app/crud/evaluations/embeddings.py (3)
backend/app/core/batch/openai.py (1)
- OpenAIBatchProvider (14-254)

backend/app/crud/batch_operations.py (1)
- start_batch_job (20-86)

backend/app/models/evaluation.py (1)
- EvaluationRun (131-229)
🔇 Additional comments (1)
backend/app/api/routes/cron.py (1)
13-13: LGTM! Router initialization is correct with appropriate tagging.
| "batch_job", | ||
| sa.Column("id", sa.Integer(), nullable=False), | ||
| sa.Column( | ||
| "provider", | ||
| sa.String(), | ||
| nullable=False, | ||
| comment="LLM provider name (e.g., 'openai', 'anthropic')", | ||
| ), | ||
| sa.Column( | ||
| "job_type", | ||
| sa.String(), | ||
| nullable=False, | ||
| comment="Type of batch job (e.g., 'evaluation', 'classification', 'embedding')", | ||
| ), | ||
| sa.Column( | ||
| "config", | ||
| postgresql.JSONB(astext_type=sa.Text()), | ||
| nullable=False, | ||
| server_default=sa.text("'{}'::jsonb"), | ||
| comment="Complete batch configuration", | ||
| ), | ||
| sa.Column( | ||
| "provider_batch_id", | ||
| sa.String(), | ||
| nullable=True, | ||
| comment="Provider's batch job ID", | ||
| ), | ||
| sa.Column( | ||
| "provider_file_id", | ||
| sa.String(), | ||
| nullable=True, | ||
| comment="Provider's input file ID", | ||
| ), | ||
| sa.Column( | ||
| "provider_output_file_id", | ||
| sa.String(), | ||
| nullable=True, | ||
| comment="Provider's output file ID", | ||
| ), | ||
| sa.Column( | ||
| "provider_status", | ||
| sa.String(), | ||
| nullable=True, | ||
| comment="Provider-specific status (e.g., OpenAI: validating, in_progress, completed, failed)", | ||
| ), | ||
| sa.Column( | ||
| "raw_output_url", | ||
| sa.String(), | ||
| nullable=True, | ||
| comment="S3 URL of raw batch output file", | ||
| ), | ||
| sa.Column( | ||
| "total_items", | ||
| sa.Integer(), | ||
| nullable=False, | ||
| server_default=sa.text("0"), | ||
| comment="Total number of items in the batch", | ||
| ), | ||
| sa.Column( | ||
| "error_message", | ||
| sa.Text(), | ||
| nullable=True, | ||
| comment="Error message if batch failed", | ||
| ), | ||
| sa.Column("organization_id", sa.Integer(), nullable=False), | ||
| sa.Column("project_id", sa.Integer(), nullable=False), | ||
| sa.Column("inserted_at", sa.DateTime(), nullable=False), | ||
| sa.Column("updated_at", sa.DateTime(), nullable=False), | ||
| sa.ForeignKeyConstraint( | ||
| ["organization_id"], ["organization.id"], ondelete="CASCADE" | ||
| ), | ||
| sa.ForeignKeyConstraint(["project_id"], ["project.id"], ondelete="CASCADE"), | ||
| sa.PrimaryKeyConstraint("id"), | ||
| ) | ||
| op.create_index( | ||
| op.f("ix_batch_job_job_type"), "batch_job", ["job_type"], unique=False | ||
| ) | ||
| op.create_index( | ||
| op.f("ix_batch_job_organization_id"), | ||
| "batch_job", | ||
| ["organization_id"], | ||
| unique=False, | ||
| ) | ||
| op.create_index( | ||
| op.f("ix_batch_job_project_id"), "batch_job", ["project_id"], unique=False | ||
| ) | ||
| op.create_index( | ||
| "idx_batch_job_status_org", | ||
| "batch_job", | ||
| ["provider_status", "organization_id"], | ||
| unique=False, | ||
| ) | ||
| op.create_index( | ||
| "idx_batch_job_status_project", | ||
| "batch_job", | ||
| ["provider_status", "project_id"], | ||
| unique=False, | ||
| ) | ||
|
|
||
| # Create evaluation_dataset table | ||
| op.create_table( | ||
| "evaluation_dataset", | ||
| sa.Column("id", sa.Integer(), nullable=False), | ||
| sa.Column("name", sqlmodel.sql.sqltypes.AutoString(), nullable=False), | ||
| sa.Column("description", sqlmodel.sql.sqltypes.AutoString(), nullable=True), | ||
| sa.Column( | ||
| "dataset_metadata", | ||
| postgresql.JSONB(astext_type=sa.Text()), | ||
| nullable=False, | ||
| server_default=sa.text("'{}'::jsonb"), | ||
| ), | ||
| sa.Column( | ||
| "object_store_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True | ||
| ), | ||
| sa.Column( | ||
| "langfuse_dataset_id", | ||
| sqlmodel.sql.sqltypes.AutoString(), | ||
| nullable=True, | ||
| ), | ||
| sa.Column("organization_id", sa.Integer(), nullable=False), | ||
| sa.Column("project_id", sa.Integer(), nullable=False), | ||
| sa.Column("inserted_at", sa.DateTime(), nullable=False), | ||
| sa.Column("updated_at", sa.DateTime(), nullable=False), | ||
| sa.ForeignKeyConstraint( | ||
| ["organization_id"], ["organization.id"], ondelete="CASCADE" | ||
| ), | ||
| sa.ForeignKeyConstraint(["project_id"], ["project.id"], ondelete="CASCADE"), | ||
| sa.PrimaryKeyConstraint("id"), | ||
| sa.UniqueConstraint( | ||
| "name", | ||
| "organization_id", | ||
| "project_id", | ||
| name="uq_evaluation_dataset_name_org_project", | ||
| ), | ||
| ) | ||
| op.create_index( | ||
| op.f("ix_evaluation_dataset_name"), | ||
| "evaluation_dataset", | ||
| ["name"], | ||
| unique=False, | ||
| ) | ||
|
|
||
| # Create evaluation_run table with all columns and foreign key references | ||
| op.create_table( | ||
| "evaluation_run", | ||
| sa.Column("run_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False), | ||
| sa.Column("dataset_name", sqlmodel.sql.sqltypes.AutoString(), nullable=False), | ||
| sa.Column("config", sa.JSON(), nullable=False), | ||
| sa.Column("batch_job_id", sa.Integer(), nullable=True), | ||
| sa.Column( | ||
| "embedding_batch_job_id", | ||
| sa.Integer(), | ||
| nullable=True, | ||
| comment="Reference to the batch_job for embedding-based similarity scoring", | ||
| ), | ||
| sa.Column("dataset_id", sa.Integer(), nullable=False), | ||
| sa.Column("status", sqlmodel.sql.sqltypes.AutoString(), nullable=False), | ||
| sa.Column( | ||
| "object_store_url", sqlmodel.sql.sqltypes.AutoString(), nullable=True | ||
| ), | ||
| sa.Column("total_items", sa.Integer(), nullable=False), | ||
| sa.Column("score", sa.JSON(), nullable=True), | ||
| sa.Column("error_message", sa.Text(), nullable=True), | ||
| sa.Column("organization_id", sa.Integer(), nullable=False), | ||
| sa.Column("project_id", sa.Integer(), nullable=False), | ||
| sa.Column("id", sa.Integer(), nullable=False), | ||
| sa.Column("inserted_at", sa.DateTime(), nullable=False), | ||
| sa.Column("updated_at", sa.DateTime(), nullable=False), | ||
| sa.ForeignKeyConstraint( | ||
| ["batch_job_id"], | ||
| ["batch_job.id"], | ||
| ondelete="SET NULL", | ||
| ), | ||
| sa.ForeignKeyConstraint( | ||
| ["embedding_batch_job_id"], | ||
| ["batch_job.id"], | ||
| name="fk_evaluation_run_embedding_batch_job_id", | ||
| ondelete="SET NULL", | ||
| ), | ||
| sa.ForeignKeyConstraint( | ||
| ["dataset_id"], | ||
| ["evaluation_dataset.id"], | ||
| name="fk_evaluation_run_dataset_id", | ||
| ondelete="CASCADE", | ||
| ), | ||
| sa.ForeignKeyConstraint( | ||
| ["organization_id"], ["organization.id"], ondelete="CASCADE" | ||
| ), | ||
| sa.ForeignKeyConstraint(["project_id"], ["project.id"], ondelete="CASCADE"), | ||
| sa.PrimaryKeyConstraint("id"), | ||
| ) |
Fix primary keys to autoincrement.
`batch_job`, `evaluation_dataset`, and `evaluation_run` declare `id` as plain `Integer NOT NULL` with a separate `PrimaryKeyConstraint`. Postgres will not create a sequence/default in this setup, so inserts coming from SQLModel (which send `id=None`) will fail with `NULL value in column "id" violates not-null constraint`. Mark the columns as primary keys (or add `sa.Identity()`) so the database autogenerates IDs.
- sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("id", sa.Integer(), primary_key=True),
...
- sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("id", sa.Integer(), primary_key=True),
...
- sa.Column("id", sa.Integer(), nullable=False),
+ sa.Column("id", sa.Integer(), primary_key=True),🤖 Prompt for AI Agents
In backend/app/alembic/versions/6fe772038a5a_create_evaluation_run_table.py
around lines 24 to 214, the id columns for batch_job, evaluation_dataset, and
evaluation_run are defined as plain Integer with a separate PrimaryKeyConstraint
which prevents Postgres from creating sequences and causes NULL id insert
failures; update each table definition to make the id column a real primary key
with an autogenerating identity (either mark the id column as primary_key=True
in the sa.Column definition or add sa.Identity()/server_default identity
expression) so Postgres will auto-generate ids on insert, and remove or keep the
separate PrimaryKeyConstraint accordingly to avoid duplicate/conflicting PK
declarations.
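If the `sa.Identity()` route mentioned above is preferred over `primary_key=True` alone, a one-line sketch for the `id` columns (SQLAlchemy 1.4+ syntax):

```python
sa.Column("id", sa.Integer(), sa.Identity(), primary_key=True),
```

On Postgres this produces a `GENERATED BY DEFAULT AS IDENTITY` column, so inserts that omit `id` get one generated by the database.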
backend/app/api/routes/cron.py:

```python
import logging

from app.api.permissions import Permission, require_permission
from fastapi import APIRouter, Depends
from sqlmodel import Session

from app.api.deps import SessionDep, AuthContextDep
from app.crud.evaluations import process_all_pending_evaluations_sync
from app.models import User

logger = logging.getLogger(__name__)
```
🛠️ Refactor suggestion | 🟠 Major
Remove unused imports and add `HTTPException`.

Three imports are unused:

- `Session` from sqlmodel (line 5)
- `AuthContextDep` (line 7)
- `User` (line 9)

Additionally, `HTTPException` is missing from the FastAPI import (line 4) but is required to properly fix the exception handling at lines 52-64.
Apply this diff:
```diff
 import logging
 
 from app.api.permissions import Permission, require_permission
-from fastapi import APIRouter, Depends
-from sqlmodel import Session
+from fastapi import APIRouter, Depends, HTTPException
 
-from app.api.deps import SessionDep, AuthContextDep
+from app.api.deps import SessionDep
 from app.crud.evaluations import process_all_pending_evaluations_sync
-from app.models import User
```

🤖 Prompt for AI Agents
In backend/app/api/routes/cron.py around lines 1 to 11, remove the unused
imports: Session (from sqlmodel), AuthContextDep, and User; and add
HTTPException to the FastAPI import list so exception handling later (lines
~52-64) can raise HTTPException properly; update the import line to include
HTTPException and delete the three unused import lines to keep imports clean.
Actionable comments posted: 1
♻️ Duplicate comments (1)
scripts/python/invoke-cron.py (1)
148-149: Validate the response body status field to avoid masking failures.

The code logs "Endpoint invoked successfully" based only on the HTTP 200 status code without checking the response body. If the API returns `{"status": "error", ...}`, the failure will be masked and monitoring will miss it.

Apply this diff to validate the response body:
```diff
                     result = await self.invoke_endpoint(client)
-                    logger.info(f"Endpoint invoked successfully: {result}")
+
+                    # Check response body status field
+                    status = result.get("status")
+                    if status != "success":
+                        error_msg = result.get("message", "Unknown error")
+                        logger.error(
+                            f"Endpoint returned failure status: {status}, "
+                            f"message: {error_msg}, full response: {result}"
+                        )
+                        # Continue to next iteration rather than crash the cron
+                    else:
+                        logger.info(f"Endpoint invoked successfully: {result}")
```
🧹 Nitpick comments (6)
scripts/python/invoke-cron.py (6)
18-19: Add type hints to module-level constants.

Per coding guidelines, type hints should be used throughout the Python code.
Apply this diff:
```diff
-ENDPOINT = "/api/v1/cron/evaluations"  # Endpoint to invoke
-REQUEST_TIMEOUT = 30  # Timeout for requests in seconds
+ENDPOINT: str = "/api/v1/cron/evaluations"  # Endpoint to invoke
+REQUEST_TIMEOUT: int = 30  # Timeout for requests in seconds
```

As per coding guidelines.
32-51: Add return type hint to `__init__` method.

Per coding guidelines, all methods should have type hints.
Apply this diff:
```diff
-    def __init__(self):
+    def __init__(self) -> None:
```

As per coding guidelines.
53-84: Add return type hint and use `logging.exception` in exception handlers.

Per coding guidelines, type hints should be used. Additionally, `logging.exception` is preferred over `logging.error` in exception handlers as it automatically includes the traceback.

Apply this diff:
```diff
     async def authenticate(self, client: httpx.AsyncClient) -> str:
         """Authenticate and get access token."""
         logger.info("Authenticating with API...")
 
         login_data = {
             "username": self.email,
             "password": self.password,
         }
 
         try:
             response = await client.post(
                 f"{self.base_url}/api/v1/login/access-token",
                 data=login_data,
                 timeout=REQUEST_TIMEOUT,
             )
             response.raise_for_status()
 
             data = response.json()
             self.access_token = data.get("access_token")
 
             if not self.access_token:
                 raise ValueError("No access token in response")
 
             logger.info("Authentication successful")
             return self.access_token
 
         except httpx.HTTPStatusError as e:
-            logger.error(f"Authentication failed with status {e.response.status_code}")
+            logger.exception(f"Authentication failed with status {e.response.status_code}")
             raise
         except Exception as e:
-            logger.error(f"Authentication error: {e}")
+            logger.exception(f"Authentication error: {e}")
             raise
```

As per coding guidelines.
86-129: Add return type hint and use `logging.exception` in exception handlers.

Per coding guidelines, type hints should be used. Additionally, use `logging.exception` in exception handlers for automatic traceback inclusion.

Apply this diff:
```diff
     async def invoke_endpoint(self, client: httpx.AsyncClient) -> dict:
         """Invoke the configured endpoint."""
         if not self.access_token:
             await self.authenticate(client)
 
         headers = {"Authorization": f"Bearer {self.access_token}"}
 
         # Debug: Log what we're sending
         logger.debug(f"Request URL: {self.base_url}{self.endpoint}")
         logger.debug(f"Request headers: {headers}")
 
         try:
             response = await client.get(
                 f"{self.base_url}{self.endpoint}",
                 headers=headers,
                 timeout=REQUEST_TIMEOUT,
             )
 
             # Debug: Log response headers and first part of body
             logger.debug(f"Response status: {response.status_code}")
             logger.debug(f"Response headers: {dict(response.headers)}")
 
             # If unauthorized, re-authenticate and retry once
             if response.status_code == 401:
                 logger.info("Token expired, re-authenticating...")
                 await self.authenticate(client)
                 headers = {"Authorization": f"Bearer {self.access_token}"}
                 response = await client.get(
                     f"{self.base_url}{self.endpoint}",
                     headers=headers,
                     timeout=REQUEST_TIMEOUT,
                 )
 
             response.raise_for_status()
             return response.json()
 
         except httpx.HTTPStatusError as e:
-            logger.error(
+            logger.exception(
                 f"Endpoint invocation failed with status {e.response.status_code}: {e.response.text}"
             )
             raise
         except Exception as e:
-            logger.error(f"Endpoint invocation error: {e}")
+            logger.exception(f"Endpoint invocation error: {e}")
             raise
```

As per coding guidelines.
131-168: Add return type hint and use `logging.exception` for error logging.

Per coding guidelines, add return type hint. Also prefer `logging.exception` in exception handlers.

Apply this diff:
```diff
-    async def run(self):
+    async def run(self) -> None:
         """Main loop to invoke endpoint periodically."""
         logger.info(f"Using API Base URL: {self.base_url}")
         logger.info(
             f"Starting cron job - invoking {self.endpoint} every {self.interval_minutes} minutes"
         )
 
         # Use async context manager to ensure proper cleanup
         async with httpx.AsyncClient() as client:
             # Authenticate once at startup
             await self.authenticate(client)
 
             while True:
                 try:
                     start_time = datetime.now()
                     logger.info(f"Invoking endpoint at {start_time}")
 
                     result = await self.invoke_endpoint(client)
                     logger.info(f"Endpoint invoked successfully: {result}")
 
                     # Calculate next invocation time
                     elapsed = (datetime.now() - start_time).total_seconds()
                     sleep_time = max(0, self.interval_seconds - elapsed)
 
                     if sleep_time > 0:
                         logger.info(
                             f"Sleeping for {sleep_time:.1f} seconds until next invocation"
                         )
                         await asyncio.sleep(sleep_time)
 
                 except KeyboardInterrupt:
                     logger.info("Shutting down gracefully...")
                     break
                 except Exception as e:
-                    logger.error(f"Error during invocation: {e}")
+                    logger.exception(f"Error during invocation: {e}")
                     # Wait before retrying on error
                     logger.info(f"Waiting {self.interval_seconds} seconds before retry")
                     await asyncio.sleep(self.interval_seconds)
```

As per coding guidelines.
171-189: Add return type hint and use `logging.exception` for fatal errors.

Per coding guidelines, add return type hint. Also use `logging.exception` to capture traceback for fatal errors.

Apply this diff:
```diff
-def main():
+def main() -> None:
     """Entry point for the script."""
     # Load environment variables
     env_path = Path(__file__).parent.parent.parent / ".env"
     if env_path.exists():
         load_dotenv(env_path)
         logger.info(f"Loaded environment from {env_path}")
     else:
         logger.warning(f"No .env file found at {env_path}")
 
     try:
         invoker = EndpointInvoker()
         asyncio.run(invoker.run())
     except KeyboardInterrupt:
         logger.info("Interrupted by user")
         sys.exit(0)
     except Exception as e:
-        logger.error(f"Fatal error: {e}")
+        logger.exception(f"Fatal error: {e}")
         sys.exit(1)
```

As per coding guidelines.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- .env.example (1 hunks)
- scripts/python/invoke-cron.py (1 hunks)
🧰 Additional context used
📓 Path-based instructions (2)
.env.example
📄 CodeRabbit inference engine (CLAUDE.md)
Provide .env.example as the template for .env
Files:
.env.example
**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Use type hints in Python code (Python 3.11+ project)
Files:
scripts/python/invoke-cron.py
🪛 Ruff (0.14.3)
scripts/python/invoke-cron.py
1-1: Shebang is present but file is not executable
(EXE001)
49-51: Avoid specifying long messages outside the exception class
(TRY003)
74-74: Abstract raise to an inner function
(TRY301)
74-74: Avoid specifying long messages outside the exception class
(TRY003)
77-77: Consider moving this statement to an else block
(TRY300)
80-80: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
83-83: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
123-125: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
128-128: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
164-164: Do not catch blind exception: Exception
(BLE001)
165-165: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
187-187: Do not catch blind exception: Exception
(BLE001)
188-188: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🔇 Additional comments (1)
.env.example (1)
26-30: LGTM! Clear documentation for new configuration.

The new environment variables are well-documented with sensible defaults. The comments clearly explain their purpose and default behavior, making it easy for developers to understand the configuration options.
scripts/python/invoke-cron.py:

```diff
@@ -0,0 +1,193 @@
+#!/usr/bin/env python3
```
Make the file executable to match the shebang.
The shebang is present but the file lacks executable permissions.
Run this command to fix:
```bash
chmod +x scripts/python/invoke-cron.py
```

🧰 Tools
🪛 Ruff (0.14.3)
1-1: Shebang is present but file is not executable
(EXE001)
🤖 Prompt for AI Agents
In scripts/python/invoke-cron.py around line 1, the file contains a shebang but
is missing executable permissions; make the file executable by setting its
executable bit locally (so the shebang is honored), then commit the permission
change to the repo so the script runs as intended.
kartpop left a comment:
approved, let's merge in and release to staging; remnant issues can be taken up in the coming weeks and released with the unified API
Summary
Target issue is #417
Checklist
Before submitting a pull request, please ensure that you mark these tasks.
- Run `fastapi run --reload app/main.py` or `docker compose up` in the repository root and test.

Notes
Here is the dataflow
1. Upload CSV → Langfuse Dataset (5x duplication)
2. Start Evaluation → Create EvaluationRun (pending)
3. Fetch Dataset → Build JSONL for OpenAI Batch API
4. Upload JSONL → OpenAI Files API
5. Create Batch → OpenAI Batch API (returns batch_id)
6. Update DB → BatchJob + EvaluationRun (processing)
7. Wait → OpenAI processes batch async (1-24 hours)
8. Celery Beat → Polls every 60 seconds
9. Check Status → Poll OpenAI batch status

When Response Batch Completed:

10. Build Embedding JSONL → [output, ground_truth] pairs per trace (see the sketch after this list)
11. Create Embedding Batch → OpenAI Embeddings API (returns batch_id)
12. Update DB → Link embedding_batch_job_id (status: processing)
13. Wait → OpenAI processes embeddings (1-24 hours)
14. Celery Beat → Polls embedding batch status

When Embedding Batch Completed:

15. User views results via API or Langfuse UI
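As a rough illustration of the "Build Embedding JSONL" step above, each trace becomes one `/v1/embeddings` request line carrying both strings. This sketch assumes the OpenAI Batch API request format; the model choice and item field names are assumptions, not the repo's actual code:

```python
import json


def build_embedding_jsonl(items: list[dict]) -> str:
    """One Batch API request line per trace, embedding output and ground truth together.

    Each item is assumed to look like:
    {"item_id": "...", "generated_output": "...", "ground_truth": "..."}
    """
    lines = []
    for item in items:
        request = {
            "custom_id": str(item["item_id"]),
            "method": "POST",
            "url": "/v1/embeddings",
            "body": {
                "model": "text-embedding-3-small",  # assumed model choice
                "input": [item["generated_output"], item["ground_truth"]],
            },
        }
        lines.append(json.dumps(request))
    return "\n".join(lines)
```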
Summary by CodeRabbit
New Features
Bug Fixes