diff --git a/backend/app/crud/evaluations/processing.py b/backend/app/crud/evaluations/processing.py
index 1e36fd13b..1fa82b39f 100644
--- a/backend/app/crud/evaluations/processing.py
+++ b/backend/app/crud/evaluations/processing.py
@@ -37,13 +37,82 @@
     create_langfuse_dataset_run,
     update_traces_with_cosine_scores,
 )
-from app.crud.job import get_batch_job
+from app.crud.job import get_batch_job, update_batch_job
 from app.models import EvaluationRun
+from app.models.batch_job import BatchJob, BatchJobUpdate
 from app.utils import get_langfuse_client, get_openai_client
 
 logger = logging.getLogger(__name__)
 
 
+def _extract_batch_error_message(
+    provider: OpenAIBatchProvider,
+    error_file_id: str,
+    batch_job: BatchJob,
+    session: Session,
+) -> str:
+    """
+    Download the error file from OpenAI, parse JSONL entries, and extract
+    the most common error message. Updates batch_job.error_message.
+
+    Args:
+        provider: OpenAI batch provider instance
+        error_file_id: OpenAI error file ID
+        batch_job: BatchJob to update with error message
+        session: Database session
+
+    Returns:
+        Human-readable error message with the top error and counts
+    """
+    try:
+        error_content = provider.download_file(error_file_id)
+        lines = error_content.strip().split("\n")
+
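+        # Tally identical messages across entries; each JSONL entry keeps
+        # its failure detail at response.body.error.message (the same
+        # shape the fixtures in test_processing.py construct).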
+        error_counts: dict[str, int] = {}
+        for line in lines:
+            try:
+                entry = json.loads(line)
+                message = (
+                    entry.get("response", {})
+                    .get("body", {})
+                    .get("error", {})
+                    .get("message", "Unknown error")
+                )
+                error_counts[message] = error_counts.get(message, 0) + 1
+            except json.JSONDecodeError:
+                continue
+
+        if error_counts:
+            top_error = max(error_counts, key=error_counts.get)
+            top_count = error_counts[top_error]
+            total = sum(error_counts.values())
+            error_msg = f"{top_error} ({top_count}/{total} requests)"
+        else:
+            error_msg = "Batch completed with errors but could not parse error file"
+
+    except Exception as e:
+        logger.error(
+            f"[_extract_batch_error_message] Failed to extract errors | batch_job_id={batch_job.id} | {e}",
+            exc_info=True,
+        )
+        error_msg = (
+            f"Batch completed with all requests failed (error_file_id: {error_file_id})"
+        )
+
+    # Update batch_job with extracted error message (outside try/except
+    # so persistence failures propagate to the caller)
+    batch_job_update = BatchJobUpdate(error_message=error_msg)
+    update_batch_job(
+        session=session, batch_job=batch_job, batch_job_update=batch_job_update
+    )
+
+    logger.info(
+        f"[_extract_batch_error_message] Extracted error | batch_job_id={batch_job.id} | {error_msg}"
+    )
+
+    return error_msg
+
+
 def parse_evaluation_output(
     raw_results: list[dict[str, Any]], dataset_items: list[dict[str, Any]]
 ) -> list[dict[str, Any]]:
@@ -560,7 +629,9 @@ async def check_and_process_evaluation(
 
     # IMPORTANT: Poll OpenAI to get the latest status before checking
    provider = OpenAIBatchProvider(client=openai_client)
-    poll_batch_status(session=session, provider=provider, batch_job=batch_job)
+    status_result = poll_batch_status(
+        session=session, provider=provider, batch_job=batch_job
+    )
 
     # Refresh batch_job to get the updated provider_status
     session.refresh(batch_job)
@@ -568,6 +639,39 @@
 
     # Handle different provider statuses
     if provider_status == "completed":
+        # Check if batch completed but all requests failed
+        # (output_file_id is absent, error_file_id is present)
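+        # The .get() fallback means the run is treated as all-failed only
+        # when neither the poll result nor the persisted batch_job carries
+        # an output file ID.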
+        if not status_result.get(
+            "provider_output_file_id", batch_job.provider_output_file_id
+        ) and status_result.get("error_file_id"):
+            error_msg = _extract_batch_error_message(
+                provider=provider,
+                error_file_id=status_result["error_file_id"],
+                batch_job=batch_job,
+                session=session,
+            )
+
+            eval_run = update_evaluation_run(
+                session=session,
+                eval_run=eval_run,
+                status="failed",
+                error_message=error_msg,
+            )
+
+            logger.error(
+                f"[check_and_process_evaluation] {log_prefix} Batch completed with all requests failed | {error_msg}"
+            )
+
+            return {
+                "run_id": eval_run.id,
+                "run_name": eval_run.run_name,
+                "previous_status": previous_status,
+                "current_status": "failed",
+                "provider_status": provider_status,
+                "action": "failed",
+                "error": error_msg,
+            }
+
         # Process the completed evaluation
         await process_completed_evaluation(
             eval_run=eval_run,
diff --git a/backend/app/tests/crud/evaluations/test_processing.py b/backend/app/tests/crud/evaluations/test_processing.py
index 29d62244d..52162654d 100644
--- a/backend/app/tests/crud/evaluations/test_processing.py
+++ b/backend/app/tests/crud/evaluations/test_processing.py
@@ -6,6 +6,7 @@
 from sqlmodel import Session, select
 
 from app.crud.evaluations.processing import (
+    _extract_batch_error_message,
     check_and_process_evaluation,
     parse_evaluation_output,
     process_completed_embedding_batch,
@@ -653,11 +654,12 @@ async def test_check_and_process_evaluation_completed(
             db, project_id=test_dataset.project_id, use_kaapi_schema=True
         )
 
-        # Create batch job
+        # Create batch job with output file (successful completion)
         batch_job = BatchJob(
             provider="openai",
             provider_batch_id="batch_abc",
             provider_status="completed",
+            provider_output_file_id="output-file-123",
             job_type=BatchJobType.EVALUATION,
             total_items=2,
             status="submitted",
@@ -688,6 +690,12 @@
         db.refresh(eval_run)
 
         mock_get_batch.return_value = batch_job
+        mock_poll.return_value = {
+            "provider_status": "completed",
+            "provider_output_file_id": "output-file-123",
+            "error_file_id": None,
+            "request_counts": {"total": 2, "completed": 2, "failed": 0},
+        }
         mock_process.return_value = eval_run
 
         mock_openai = MagicMock()
@@ -756,6 +764,13 @@
         db.refresh(eval_run)
 
         mock_get_batch.return_value = batch_job
+        mock_poll.return_value = {
+            "provider_status": "failed",
+            "provider_output_file_id": None,
+            "error_file_id": None,
+            "error_message": "Provider error",
+            "request_counts": {"total": 2, "completed": 0, "failed": 2},
+        }
 
         mock_openai = MagicMock()
         mock_langfuse = MagicMock()
@@ -772,6 +787,227 @@
         db.refresh(eval_run)
         assert eval_run.status == "failed"
 
+    @pytest.fixture
+    def all_requests_failed_setup(
+        self, db: Session, test_dataset
+    ) -> tuple[BatchJob, EvaluationRun]:
+        """Create a BatchJob (completed, no output file) and a processing EvaluationRun for the all-requests-failed scenario."""
+        config = create_test_config(
+            db, project_id=test_dataset.project_id, use_kaapi_schema=True
+        )
+
+        batch_job = BatchJob(
+            provider="openai",
+            provider_batch_id="batch_all_fail",
+            provider_status="completed",
+            job_type=BatchJobType.EVALUATION,
+            total_items=9,
+            status="submitted",
+            organization_id=test_dataset.organization_id,
+            project_id=test_dataset.project_id,
+            inserted_at=now(),
+            updated_at=now(),
+        )
+        db.add(batch_job)
+        db.commit()
+        db.refresh(batch_job)
+
+        eval_run = create_evaluation_run(
+            session=db,
+            run_name="test_run_all_fail",
+            dataset_name=test_dataset.name,
+            dataset_id=test_dataset.id,
+            config_id=config.id,
+            config_version=1,
+            organization_id=test_dataset.organization_id,
+            project_id=test_dataset.project_id,
+        )
+        eval_run.batch_job_id = batch_job.id
+        eval_run.status = "processing"
+        db.add(eval_run)
+        db.commit()
+        db.refresh(eval_run)
+
+        return batch_job, eval_run
+
+    @pytest.mark.asyncio
+    @patch("app.crud.evaluations.processing.get_batch_job")
+    @patch("app.crud.evaluations.processing.poll_batch_status")
+    @patch("app.crud.evaluations.processing.OpenAIBatchProvider")
+    async def test_check_and_process_evaluation_completed_all_requests_failed(
+        self,
+        mock_provider_cls,
+        mock_poll,
+        mock_get_batch,
+        db: Session,
+        all_requests_failed_setup: tuple[BatchJob, EvaluationRun],
+    ):
+        """Test batch completed but all requests failed — both batch_job and eval_run get error_message."""
+        batch_job, eval_run = all_requests_failed_setup
+
+        mock_get_batch.return_value = batch_job
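+        # Poll result says "completed" but ships only an error file,
+        # i.e. every request in the batch failed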
+        mock_poll.return_value = {
+            "provider_status": "completed",
+            "provider_output_file_id": None,
+            "error_file_id": "error-file-abc",
+            "request_counts": {"total": 9, "completed": 0, "failed": 9},
+        }
+
+        # Mock the provider instance returned by OpenAIBatchProvider(client=...)
+        # to return realistic error file content
+        error_lines = "\n".join(
+            [
+                json.dumps(
+                    {
+                        "id": f"batch_req_{i}",
+                        "custom_id": f"id-{i}",
+                        "response": {
+                            "status_code": 400,
+                            "body": {
+                                "error": {
+                                    "message": "Unsupported parameter: 'temperature' is not supported with this model.",
+                                }
+                            },
+                        },
+                        "error": None,
+                    }
+                )
+                for i in range(9)
+            ]
+        )
+        mock_provider_instance = mock_provider_cls.return_value
+        mock_provider_instance.download_file.return_value = error_lines
+
+        mock_openai = MagicMock()
+        mock_langfuse = MagicMock()
+
+        result = await check_and_process_evaluation(
+            eval_run=eval_run,
+            session=db,
+            openai_client=mock_openai,
+            langfuse=mock_langfuse,
+        )
+
+        assert result["action"] == "failed"
+        assert result["current_status"] == "failed"
+        assert "temperature" in result["error"]
+        assert "(9/9 requests)" in result["error"]
+
+        # Verify eval_run updated with error
+        db.refresh(eval_run)
+        assert eval_run.status == "failed"
+        assert "temperature" in eval_run.error_message
+
+        # Verify batch_job updated with error
+        db.refresh(batch_job)
+        assert "temperature" in batch_job.error_message
+        assert "(9/9 requests)" in batch_job.error_message
+
+
+class TestExtractBatchErrorMessage:
+    """Test extracting error messages from OpenAI error files."""
+
+    def test_single_unique_error(self) -> None:
+        """Test error file where all requests have the same error."""
+        error_lines = []
+        for i in range(5):
+            error_lines.append(
+                json.dumps(
+                    {
+                        "id": f"batch_req_{i}",
+                        "custom_id": f"id-{i}",
+                        "response": {
+                            "status_code": 400,
+                            "body": {
+                                "error": {
+                                    "message": "Unsupported parameter: 'temperature' is not supported with this model.",
+                                    "type": "invalid_request_error",
+                                }
+                            },
+                        },
+                        "error": None,
+                    }
+                )
+            )
+        error_content = "\n".join(error_lines)
+
+        mock_provider = MagicMock()
+        mock_provider.download_file.return_value = error_content
+
+        mock_session = MagicMock()
+        mock_batch_job = MagicMock()
+        mock_batch_job.id = 1
+
+        result = _extract_batch_error_message(
+            provider=mock_provider,
+            error_file_id="error-file-123",
+            batch_job=mock_batch_job,
+            session=mock_session,
+        )
+
+        assert "Unsupported parameter" in result
+        assert "(5/5 requests)" in result
+        mock_provider.download_file.assert_called_once_with("error-file-123")
+
+    def test_multiple_unique_errors_picks_most_common(self) -> None:
+        """Test error file with mixed errors; picks the most frequent one."""
+        error_lines = []
+        # 3 requests with temperature error
+        for i in range(3):
+            error_lines.append(
+                json.dumps(
+                    {
+                        "id": f"batch_req_{i}",
+                        "custom_id": f"id-{i}",
+                        "response": {
+                            "status_code": 400,
+                            "body": {
+                                "error": {
+                                    "message": "Unsupported parameter: 'temperature'",
+                                }
+                            },
+                        },
+                        "error": None,
+                    }
+                )
+            )
+        # 1 request with rate limit error
+        error_lines.append(
+            json.dumps(
+                {
+                    "id": "batch_req_3",
+                    "custom_id": "id-3",
+                    "response": {
+                        "status_code": 429,
+                        "body": {
+                            "error": {
+                                "message": "Rate limit exceeded",
+                            }
+                        },
+                    },
+                    "error": None,
+                }
+            )
+        )
+        error_content = "\n".join(error_lines)
+
+        mock_provider = MagicMock()
+        mock_provider.download_file.return_value = error_content
+
+        mock_session = MagicMock()
+        mock_batch_job = MagicMock()
+        mock_batch_job.id = 1
+
+        result = _extract_batch_error_message(
+            provider=mock_provider,
+            error_file_id="error-file-123",
+            batch_job=mock_batch_job,
+            session=mock_session,
+        )
+
+        assert "Unsupported parameter: 'temperature'" in result
+        assert "(3/4 requests)" in result
+
 
 class TestPollAllPendingEvaluations:
     """Test polling all pending evaluations."""