diff --git a/docs/my-website/docs/pass_through/anthropic_completion.md b/docs/my-website/docs/pass_through/anthropic_completion.md index e0c7c7c54962..38c42ed990df 100644 --- a/docs/my-website/docs/pass_through/anthropic_completion.md +++ b/docs/my-website/docs/pass_through/anthropic_completion.md @@ -7,7 +7,7 @@ Pass-through endpoints for Anthropic - call provider-specific endpoint, in nativ | Feature | Supported | Notes | |-------|-------|-------| -| Cost Tracking | ✅ | supports all models on `/messages` endpoint | +| Cost Tracking | ✅ | supports all models on `/messages`, `/v1/messages/batches` endpoints | | Logging | ✅ | works across all integrations | | End-user Tracking | ✅ | disable prometheus tracking via `litellm.disable_end_user_cost_tracking_prometheus_only`| | Streaming | ✅ | | @@ -263,6 +263,19 @@ curl https://api.anthropic.com/v1/messages/batches \ }' ``` +:::note Configuration Required for Batch Cost Tracking +For batch passthrough cost tracking to work properly, you need to define the Anthropic model in your `proxy_config.yaml`: + +```yaml +model_list: + - model_name: claude-sonnet-4-5-20250929 # or any alias + litellm_params: + model: anthropic/claude-sonnet-4-5-20250929 + api_key: os.environ/ANTHROPIC_API_KEY +``` + +This ensures the polling mechanism can correctly identify the provider and retrieve batch status for cost calculation. +::: ## Advanced diff --git a/litellm/batches/batch_utils.py b/litellm/batches/batch_utils.py index 42ff534c2892..8a078eeaca12 100644 --- a/litellm/batches/batch_utils.py +++ b/litellm/batches/batch_utils.py @@ -14,7 +14,7 @@ async def calculate_batch_cost_and_usage( file_content_dictionary: List[dict], - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"], + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"], model_name: Optional[str] = None, ) -> Tuple[float, Usage, List[str]]: """ @@ -37,7 +37,7 @@ async def _handle_completed_batch( batch: Batch, - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"], + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"], model_name: Optional[str] = None, ) -> Tuple[float, Usage, List[str]]: """Helper function to process a completed batch and handle logging""" @@ -84,7 +84,7 @@ def _get_batch_models_from_file_content( def _batch_cost_calculator( file_content_dictionary: List[dict], - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai", + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai", model_name: Optional[str] = None, ) -> float: """ @@ -186,7 +186,7 @@ def calculate_vertex_ai_batch_cost_and_usage( async def _get_batch_output_file_content_as_dictionary( batch: Batch, - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai", + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai", ) -> List[dict]: """ Get the batch output file content as a list of dictionaries @@ -225,7 +225,7 @@ def _get_file_content_as_dictionary(file_content: bytes) -> List[dict]: def _get_batch_job_cost_from_file_content( file_content_dictionary: List[dict], - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai", + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai", ) -> float: """ Get the cost of a batch job from the file content
@@ -253,7 +253,7 @@ def _get_batch_job_cost_from_file_content( def _get_batch_job_total_usage_from_file_content( file_content_dictionary: List[dict], - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai", + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai", model_name: Optional[str] = None, ) -> Usage: """ @@ -332,4 +332,4 @@ def _batch_response_was_successful(batch_job_output_file: dict) -> bool: Check if the batch job response status == 200 """ _response: dict = batch_job_output_file.get("response", None) or {} - return _response.get("status_code", None) == 200 + return _response.get("status_code", None) == 200 \ No newline at end of file diff --git a/litellm/batches/main.py b/litellm/batches/main.py index b99f4a628dc6..126eb09a51cd 100644 --- a/litellm/batches/main.py +++ b/litellm/batches/main.py @@ -22,6 +22,7 @@ import litellm from litellm._logging import verbose_logger from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj +from litellm.llms.anthropic.batches.handler import AnthropicBatchesHandler from litellm.llms.azure.batches.handler import AzureBatchesAPI from litellm.llms.bedrock.batches.handler import BedrockBatchesHandler from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler @@ -53,6 +54,7 @@ openai_batches_instance = OpenAIBatchesAPI() azure_batches_instance = AzureBatchesAPI() vertex_ai_batches_instance = VertexAIBatchPrediction(gcs_bucket_name="") +anthropic_batches_instance = AnthropicBatchesHandler() base_llm_http_handler = BaseLLMHTTPHandler() ################################################# @@ -355,7 +357,7 @@ def create_batch( @client async def aretrieve_batch( batch_id: str, - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai", + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai", metadata: Optional[Dict[str, str]] = None, extra_headers: Optional[Dict[str, str]] = None, extra_body: Optional[Dict[str, str]] = None, @@ -401,7 +403,7 @@ def _handle_retrieve_batch_providers_without_provider_config( litellm_params: dict, _retrieve_batch_request: RetrieveBatchRequest, _is_async: bool, - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai", + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai", ): api_base: Optional[str] = None if custom_llm_provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS: @@ -498,6 +500,27 @@ def _handle_retrieve_batch_providers_without_provider_config( timeout=timeout, max_retries=optional_params.max_retries, ) + elif custom_llm_provider == "anthropic": + api_base = ( + optional_params.api_base + or litellm.api_base + or get_secret_str("ANTHROPIC_API_BASE") + ) + api_key = ( + optional_params.api_key + or litellm.api_key + or litellm.azure_key + or get_secret_str("ANTHROPIC_API_KEY") + ) + + response = anthropic_batches_instance.retrieve_batch( + _is_async=_is_async, + batch_id=batch_id, + api_base=api_base, + api_key=api_key, + timeout=timeout, + max_retries=optional_params.max_retries, + ) else: raise litellm.exceptions.BadRequestError( message="LiteLLM doesn't support {} for 'create_batch'. 
Only 'openai' is supported.".format( @@ -517,7 +540,7 @@ def _handle_retrieve_batch_providers_without_provider_config( @client def retrieve_batch( batch_id: str, - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai", + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai", metadata: Optional[Dict[str, str]] = None, extra_headers: Optional[Dict[str, str]] = None, extra_body: Optional[Dict[str, str]] = None, @@ -608,7 +631,7 @@ def retrieve_batch( api_key=optional_params.api_key, logging_obj=litellm_logging_obj or LiteLLMLoggingObj( - model=model or "bedrock/unknown", + model=model or f"{custom_llm_provider}/unknown", messages=[], stream=False, call_type="batch_retrieve", diff --git a/litellm/files/main.py b/litellm/files/main.py index 9378715a4720..acf545e43190 100644 --- a/litellm/files/main.py +++ b/litellm/files/main.py @@ -17,6 +17,7 @@ from litellm import get_secret_str from litellm.litellm_core_utils.get_llm_provider_logic import get_llm_provider from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj +from litellm.llms.anthropic.files.handler import AnthropicFilesHandler from litellm.llms.azure.files.handler import AzureOpenAIFilesAPI from litellm.llms.bedrock.files.handler import BedrockFilesHandler from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler @@ -49,6 +50,7 @@ azure_files_instance = AzureOpenAIFilesAPI() vertex_ai_files_instance = VertexAIFilesHandler() bedrock_files_instance = BedrockFilesHandler() +anthropic_files_instance = AnthropicFilesHandler() ################################################# @@ -757,7 +759,7 @@ def file_list( @client async def afile_content( file_id: str, - custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai", + custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai", extra_headers: Optional[Dict[str, str]] = None, extra_body: Optional[Dict[str, str]] = None, **kwargs, @@ -802,7 +804,7 @@ def file_content( file_id: str, model: Optional[str] = None, custom_llm_provider: Optional[ - Union[Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"], str] + Union[Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"], str] ] = None, extra_headers: Optional[Dict[str, str]] = None, extra_body: Optional[Dict[str, str]] = None, @@ -849,6 +851,18 @@ def file_content( _is_async = kwargs.pop("afile_content", False) is True + # Check if this is an Anthropic batch results request + if custom_llm_provider == "anthropic": + response = anthropic_files_instance.file_content( + _is_async=_is_async, + file_content_request=_file_content_request, + api_base=optional_params.api_base, + api_key=optional_params.api_key, + timeout=timeout, + max_retries=optional_params.max_retries, + ) + return response + if custom_llm_provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS: # for deepinfra/perplexity/anyscale/groq we check in get_llm_provider and pass in the api base from there api_base = ( diff --git a/litellm/llms/anthropic/batches/__init__.py b/litellm/llms/anthropic/batches/__init__.py new file mode 100644 index 000000000000..66d1a8f77f43 --- /dev/null +++ b/litellm/llms/anthropic/batches/__init__.py @@ -0,0 +1,5 @@ +from .handler import AnthropicBatchesHandler +from .transformation import AnthropicBatchesConfig + +__all__ = ["AnthropicBatchesHandler", "AnthropicBatchesConfig"] + diff 
--git a/litellm/llms/anthropic/batches/handler.py b/litellm/llms/anthropic/batches/handler.py new file mode 100644 index 000000000000..fd303e60afc5 --- /dev/null +++ b/litellm/llms/anthropic/batches/handler.py @@ -0,0 +1,168 @@ +""" +Anthropic Batches API Handler +""" + +import asyncio +from typing import TYPE_CHECKING, Any, Coroutine, Optional, Union + +import httpx + +from litellm.llms.custom_httpx.http_handler import ( + get_async_httpx_client, +) +from litellm.types.utils import LiteLLMBatch, LlmProviders + +if TYPE_CHECKING: + from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj +else: + LiteLLMLoggingObj = Any + +from ..common_utils import AnthropicModelInfo +from .transformation import AnthropicBatchesConfig + + +class AnthropicBatchesHandler: + """ + Handler for Anthropic Message Batches API. + + Supports: + - retrieve_batch() - Retrieve batch status and information + """ + + def __init__(self): + self.anthropic_model_info = AnthropicModelInfo() + self.provider_config = AnthropicBatchesConfig() + + async def aretrieve_batch( + self, + batch_id: str, + api_base: Optional[str], + api_key: Optional[str], + timeout: Union[float, httpx.Timeout], + max_retries: Optional[int], + logging_obj: Optional[LiteLLMLoggingObj] = None, + ) -> LiteLLMBatch: + """ + Async: Retrieve a batch from Anthropic. + + Args: + batch_id: The batch ID to retrieve + api_base: Anthropic API base URL + api_key: Anthropic API key + timeout: Request timeout + max_retries: Max retry attempts (unused for now) + logging_obj: Optional logging object + + Returns: + LiteLLMBatch: Batch information in OpenAI format + """ + # Resolve API credentials + api_base = api_base or self.anthropic_model_info.get_api_base(api_base) + api_key = api_key or self.anthropic_model_info.get_api_key() + + if not api_key: + raise ValueError("Missing Anthropic API Key") + + # Create a minimal logging object if not provided + if logging_obj is None: + from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObjClass + logging_obj = LiteLLMLoggingObjClass( + model="anthropic/unknown", + messages=[], + stream=False, + call_type="batch_retrieve", + start_time=None, + litellm_call_id=f"batch_retrieve_{batch_id}", + function_id="batch_retrieve", + ) + + # Get the complete URL for batch retrieval + retrieve_url = self.provider_config.get_retrieve_batch_url( + api_base=api_base, + batch_id=batch_id, + optional_params={}, + litellm_params={}, + ) + + # Validate environment and get headers + headers = self.provider_config.validate_environment( + headers={}, + model="", + messages=[], + optional_params={}, + litellm_params={}, + api_key=api_key, + api_base=api_base, + ) + + logging_obj.pre_call( + input=batch_id, + api_key=api_key, + additional_args={ + "api_base": retrieve_url, + "headers": headers, + "complete_input_dict": {}, + }, + ) + # Make the request + async_client = get_async_httpx_client(llm_provider=LlmProviders.ANTHROPIC) + response = await async_client.get( + url=retrieve_url, + headers=headers + ) + response.raise_for_status() + + # Transform response to LiteLLM format + return self.provider_config.transform_retrieve_batch_response( + model=None, + raw_response=response, + logging_obj=logging_obj, + litellm_params={}, + ) + + def retrieve_batch( + self, + _is_async: bool, + batch_id: str, + api_base: Optional[str], + api_key: Optional[str], + timeout: Union[float, httpx.Timeout], + max_retries: Optional[int], + logging_obj: Optional[LiteLLMLoggingObj] = None, + ) -> Union[LiteLLMBatch, 
Coroutine[Any, Any, LiteLLMBatch]]: + """ + Retrieve a batch from Anthropic. + + Args: + _is_async: Whether to run asynchronously + batch_id: The batch ID to retrieve + api_base: Anthropic API base URL + api_key: Anthropic API key + timeout: Request timeout + max_retries: Max retry attempts (unused for now) + logging_obj: Optional logging object + + Returns: + LiteLLMBatch or Coroutine: Batch information in OpenAI format + """ + if _is_async: + return self.aretrieve_batch( + batch_id=batch_id, + api_base=api_base, + api_key=api_key, + timeout=timeout, + max_retries=max_retries, + logging_obj=logging_obj, + ) + else: + return asyncio.run( + self.aretrieve_batch( + batch_id=batch_id, + api_base=api_base, + api_key=api_key, + timeout=timeout, + max_retries=max_retries, + logging_obj=logging_obj, + ) + ) + diff --git a/litellm/llms/anthropic/batches/transformation.py b/litellm/llms/anthropic/batches/transformation.py index c20136894bdf..750dd002ff9a 100644 --- a/litellm/llms/anthropic/batches/transformation.py +++ b/litellm/llms/anthropic/batches/transformation.py @@ -1,10 +1,14 @@ import json -from typing import TYPE_CHECKING, Any, Dict, List, Optional, cast +import time +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Union, cast -from httpx import Response +import httpx +from httpx import Headers, Response -from litellm.types.llms.openai import AllMessageValues -from litellm.types.utils import ModelResponse +from litellm.llms.base_llm.batches.transformation import BaseBatchesConfig +from litellm.llms.base_llm.chat.transformation import BaseLLMException +from litellm.types.llms.openai import AllMessageValues, CreateBatchRequest +from litellm.types.utils import LiteLLMBatch, LlmProviders, ModelResponse if TYPE_CHECKING: from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj @@ -14,11 +18,221 @@ LoggingClass = Any -class AnthropicBatchesConfig: +class AnthropicBatchesConfig(BaseBatchesConfig): def __init__(self): from ..chat.transformation import AnthropicConfig + from ..common_utils import AnthropicModelInfo self.anthropic_chat_config = AnthropicConfig() # initialize once + self.anthropic_model_info = AnthropicModelInfo() + + @property + def custom_llm_provider(self) -> LlmProviders: + """Return the LLM provider type for this configuration.""" + return LlmProviders.ANTHROPIC + + def validate_environment( + self, + headers: dict, + model: str, + messages: List[AllMessageValues], + optional_params: dict, + litellm_params: dict, + api_key: Optional[str] = None, + api_base: Optional[str] = None, + ) -> dict: + """Validate and prepare environment-specific headers and parameters.""" + # Resolve api_key from environment if not provided + api_key = api_key or self.anthropic_model_info.get_api_key() + if api_key is None: + raise ValueError( + "Missing Anthropic API Key - A call is being made to anthropic but no key is set either in the environment variables or via params" + ) + _headers = { + "accept": "application/json", + "anthropic-version": "2023-06-01", + "content-type": "application/json", + "x-api-key": api_key, + } + # Add beta header for message batches + if "anthropic-beta" not in headers: + headers["anthropic-beta"] = "message-batches-2024-09-24" + headers.update(_headers) + return headers + + def get_complete_batch_url( + self, + api_base: Optional[str], + api_key: Optional[str], + model: str, + optional_params: Dict, + litellm_params: Dict, + data: CreateBatchRequest, + ) -> str: + """Get the complete URL for batch creation request.""" + 
api_base = api_base or self.anthropic_model_info.get_api_base(api_base) + if not api_base.endswith("/v1/messages/batches"): + api_base = f"{api_base.rstrip('/')}/v1/messages/batches" + return api_base + + def transform_create_batch_request( + self, + model: str, + create_batch_data: CreateBatchRequest, + optional_params: dict, + litellm_params: dict, + ) -> Union[bytes, str, Dict[str, Any]]: + """ + Transform the batch creation request to Anthropic format. + + Not currently implemented - placeholder to satisfy abstract base class. + """ + raise NotImplementedError("Batch creation not yet implemented for Anthropic") + + def transform_create_batch_response( + self, + model: Optional[str], + raw_response: httpx.Response, + logging_obj: LoggingClass, + litellm_params: dict, + ) -> LiteLLMBatch: + """ + Transform Anthropic MessageBatch creation response to LiteLLM format. + + Not currently implemented - placeholder to satisfy abstract base class. + """ + raise NotImplementedError("Batch creation not yet implemented for Anthropic") + + def get_retrieve_batch_url( + self, + api_base: Optional[str], + batch_id: str, + optional_params: Dict, + litellm_params: Dict, + ) -> str: + """ + Get the complete URL for batch retrieval request. + + Args: + api_base: Base API URL (optional, will use default if not provided) + batch_id: Batch ID to retrieve + optional_params: Optional parameters + litellm_params: LiteLLM parameters + + Returns: + Complete URL for Anthropic batch retrieval: {api_base}/v1/messages/batches/{batch_id} + """ + api_base = api_base or self.anthropic_model_info.get_api_base(api_base) + return f"{api_base.rstrip('/')}/v1/messages/batches/{batch_id}" + + def transform_retrieve_batch_request( + self, + batch_id: str, + optional_params: dict, + litellm_params: dict, + ) -> Union[bytes, str, Dict[str, Any]]: + """ + Transform batch retrieval request for Anthropic. + + For Anthropic, the URL is constructed by get_retrieve_batch_url(), + so this method returns an empty dict (no additional request params needed). 
+ """ + # No additional request params needed - URL is handled by get_retrieve_batch_url + return {} + + def transform_retrieve_batch_response( + self, + model: Optional[str], + raw_response: httpx.Response, + logging_obj: LoggingClass, + litellm_params: dict, + ) -> LiteLLMBatch: + """Transform Anthropic MessageBatch retrieval response to LiteLLM format.""" + try: + response_data = raw_response.json() + except Exception as e: + raise ValueError(f"Failed to parse Anthropic batch response: {e}") + + # Map Anthropic MessageBatch to OpenAI Batch format + batch_id = response_data.get("id", "") + processing_status = response_data.get("processing_status", "in_progress") + + # Map Anthropic processing_status to OpenAI status + status_mapping: Dict[str, Literal["validating", "failed", "in_progress", "finalizing", "completed", "expired", "cancelling", "cancelled"]] = { + "in_progress": "in_progress", + "canceling": "cancelling", + "ended": "completed", + } + openai_status = status_mapping.get(processing_status, "in_progress") + + # Parse timestamps + def parse_timestamp(ts_str: Optional[str]) -> Optional[int]: + if not ts_str: + return None + try: + from datetime import datetime + dt = datetime.fromisoformat(ts_str.replace('Z', '+00:00')) + return int(dt.timestamp()) + except Exception: + return None + + created_at = parse_timestamp(response_data.get("created_at")) + ended_at = parse_timestamp(response_data.get("ended_at")) + expires_at = parse_timestamp(response_data.get("expires_at")) + cancel_initiated_at = parse_timestamp(response_data.get("cancel_initiated_at")) + archived_at = parse_timestamp(response_data.get("archived_at")) + + # Extract request counts + request_counts_data = response_data.get("request_counts", {}) + from openai.types.batch import BatchRequestCounts + request_counts = BatchRequestCounts( + total=sum([ + request_counts_data.get("processing", 0), + request_counts_data.get("succeeded", 0), + request_counts_data.get("errored", 0), + request_counts_data.get("canceled", 0), + request_counts_data.get("expired", 0), + ]), + completed=request_counts_data.get("succeeded", 0), + failed=request_counts_data.get("errored", 0), + ) + + return LiteLLMBatch( + id=batch_id, + object="batch", + endpoint="/v1/messages", + errors=None, + input_file_id="None", + completion_window="24h", + status=openai_status, + output_file_id=batch_id, + error_file_id=None, + created_at=created_at or int(time.time()), + in_progress_at=created_at if processing_status == "in_progress" else None, + expires_at=expires_at, + finalizing_at=None, + completed_at=ended_at if processing_status == "ended" else None, + failed_at=None, + expired_at=archived_at if archived_at else None, + cancelling_at=cancel_initiated_at if processing_status == "canceling" else None, + cancelled_at=ended_at if processing_status == "canceling" and ended_at else None, + request_counts=request_counts, + metadata={}, + ) + + def get_error_class( + self, error_message: str, status_code: int, headers: Union[Dict, Headers] + ) -> "BaseLLMException": + """Get the appropriate error class for Anthropic.""" + from ..common_utils import AnthropicError + + # Convert Dict to Headers if needed + if isinstance(headers, dict): + headers_obj: Optional[Headers] = Headers(headers) + else: + headers_obj = headers if isinstance(headers, Headers) else None + + return AnthropicError(status_code=status_code, message=error_message, headers=headers_obj) def transform_response( self, diff --git a/litellm/llms/anthropic/files/__init__.py 
b/litellm/llms/anthropic/files/__init__.py new file mode 100644 index 000000000000..b8b538ffb62a --- /dev/null +++ b/litellm/llms/anthropic/files/__init__.py @@ -0,0 +1,4 @@ +from .handler import AnthropicFilesHandler + +__all__ = ["AnthropicFilesHandler"] + diff --git a/litellm/llms/anthropic/files/handler.py b/litellm/llms/anthropic/files/handler.py new file mode 100644 index 000000000000..d46fc4013109 --- /dev/null +++ b/litellm/llms/anthropic/files/handler.py @@ -0,0 +1,367 @@ +import asyncio +import json +import time +from typing import Any, Coroutine, Optional, Union + +import httpx + +import litellm +from litellm._logging import verbose_logger +from litellm._uuid import uuid +from litellm.llms.custom_httpx.http_handler import ( + get_async_httpx_client, +) +from litellm.litellm_core_utils.litellm_logging import Logging +from litellm.types.llms.openai import ( + FileContentRequest, + HttpxBinaryResponseContent, + OpenAIBatchResult, + OpenAIChatCompletionResponse, + OpenAIErrorBody, +) +from litellm.types.utils import CallTypes, LlmProviders, ModelResponse + +from ..chat.transformation import AnthropicConfig +from ..common_utils import AnthropicModelInfo + +# Map Anthropic error types to HTTP status codes +ANTHROPIC_ERROR_STATUS_CODE_MAP = { + "invalid_request_error": 400, + "authentication_error": 401, + "permission_error": 403, + "not_found_error": 404, + "rate_limit_error": 429, + "api_error": 500, + "overloaded_error": 503, + "timeout_error": 504, +} + + +class AnthropicFilesHandler: + """ + Handles Anthropic Files API operations. + + Currently supports: + - file_content() for retrieving Anthropic Message Batch results + """ + + def __init__(self): + self.anthropic_model_info = AnthropicModelInfo() + + async def afile_content( + self, + file_content_request: FileContentRequest, + api_base: Optional[str] = None, + api_key: Optional[str] = None, + timeout: Union[float, httpx.Timeout] = 600.0, + max_retries: Optional[int] = None, + ) -> HttpxBinaryResponseContent: + """ + Async: Retrieve file content from Anthropic. + + For batch results, the file_id should be the batch_id. + This will call Anthropic's /v1/messages/batches/{batch_id}/results endpoint. 
+ + Args: + file_content_request: Contains file_id (batch_id for batch results) + api_base: Anthropic API base URL + api_key: Anthropic API key + timeout: Request timeout + max_retries: Max retry attempts (unused for now) + + Returns: + HttpxBinaryResponseContent: Binary content wrapped in compatible response format + """ + file_id = file_content_request.get("file_id") + if not file_id: + raise ValueError("file_id is required in file_content_request") + + # Extract batch_id from file_id + # Handle both formats: "anthropic_batch_results:{batch_id}" or just "{batch_id}" + if file_id.startswith("anthropic_batch_results:"): + batch_id = file_id.replace("anthropic_batch_results:", "", 1) + else: + batch_id = file_id + + # Get Anthropic API credentials + api_base = self.anthropic_model_info.get_api_base(api_base) + api_key = api_key or self.anthropic_model_info.get_api_key() + + if not api_key: + raise ValueError("Missing Anthropic API Key") + + # Construct the Anthropic batch results URL + results_url = f"{api_base.rstrip('/')}/v1/messages/batches/{batch_id}/results" + + # Prepare headers + headers = { + "accept": "application/json", + "anthropic-version": "2023-06-01", + "x-api-key": api_key, + } + + # Make the request to Anthropic + async_client = get_async_httpx_client(llm_provider=LlmProviders.ANTHROPIC) + anthropic_response = await async_client.get( + url=results_url, + headers=headers + ) + anthropic_response.raise_for_status() + + # Transform Anthropic batch results to OpenAI format + transformed_content = self._transform_anthropic_batch_results_to_openai_format( + anthropic_response.content + ) + + # Create a new response with transformed content + transformed_response = httpx.Response( + status_code=anthropic_response.status_code, + headers=anthropic_response.headers, + content=transformed_content, + request=anthropic_response.request, + ) + + # Return the transformed response content + return HttpxBinaryResponseContent(response=transformed_response) + + + def file_content( + self, + _is_async: bool, + file_content_request: FileContentRequest, + api_base: Optional[str] = None, + api_key: Optional[str] = None, + timeout: Union[float, httpx.Timeout] = 600.0, + max_retries: Optional[int] = None, + ) -> Union[ + HttpxBinaryResponseContent, Coroutine[Any, Any, HttpxBinaryResponseContent] + ]: + """ + Retrieve file content from Anthropic. + + For batch results, the file_id should be the batch_id. + This will call Anthropic's /v1/messages/batches/{batch_id}/results endpoint. + + Args: + _is_async: Whether to run asynchronously + file_content_request: Contains file_id (batch_id for batch results) + api_base: Anthropic API base URL + api_key: Anthropic API key + timeout: Request timeout + max_retries: Max retry attempts (unused for now) + + Returns: + HttpxBinaryResponseContent or Coroutine: Binary content wrapped in compatible response format + """ + if _is_async: + return self.afile_content( + file_content_request=file_content_request, + api_base=api_base, + api_key=api_key, + max_retries=max_retries, + ) + else: + return asyncio.run( + self.afile_content( + file_content_request=file_content_request, + api_base=api_base, + api_key=api_key, + timeout=timeout, + max_retries=max_retries, + ) + ) + + def _transform_anthropic_batch_results_to_openai_format( + self, anthropic_content: bytes + ) -> bytes: + """ + Transform Anthropic batch results JSONL to OpenAI batch results JSONL format. + + Anthropic format: + { + "custom_id": "...", + "result": { + "type": "succeeded", + "message": { ... 
} // Anthropic message format + } + } + + OpenAI format: + { + "custom_id": "...", + "response": { + "status_code": 200, + "request_id": "...", + "body": { ... } // OpenAI chat completion format + } + } + """ + try: + anthropic_config = AnthropicConfig() + transformed_lines = [] + + # Parse JSONL content + content_str = anthropic_content.decode("utf-8") + for line in content_str.strip().split("\n"): + if not line.strip(): + continue + + anthropic_result = json.loads(line) + custom_id = anthropic_result.get("custom_id", "") + result = anthropic_result.get("result", {}) + result_type = result.get("type", "") + + # Transform based on result type + if result_type == "succeeded": + # Transform Anthropic message to OpenAI format + anthropic_message = result.get("message", {}) + if anthropic_message: + openai_response_body = self._transform_anthropic_message_to_openai_format( + anthropic_message=anthropic_message, + anthropic_config=anthropic_config, + ) + + # Create OpenAI batch result format + openai_result: OpenAIBatchResult = { + "custom_id": custom_id, + "response": { + "status_code": 200, + "request_id": anthropic_message.get("id", ""), + "body": openai_response_body, + }, + } + transformed_lines.append(json.dumps(openai_result)) + elif result_type == "errored": + # Handle error case + error = result.get("error", {}) + error_obj = error.get("error", {}) + error_message = error_obj.get("message", "Unknown error") + error_type = error_obj.get("type", "api_error") + + status_code = ANTHROPIC_ERROR_STATUS_CODE_MAP.get(error_type, 500) + + error_body_errored: OpenAIErrorBody = { + "error": { + "message": error_message, + "type": error_type, + } + } + openai_result_errored: OpenAIBatchResult = { + "custom_id": custom_id, + "response": { + "status_code": status_code, + "request_id": error.get("request_id", ""), + "body": error_body_errored, + }, + } + transformed_lines.append(json.dumps(openai_result_errored)) + elif result_type in ["canceled", "expired"]: + # Handle canceled/expired cases + error_body_canceled: OpenAIErrorBody = { + "error": { + "message": f"Batch request was {result_type}", + "type": "invalid_request_error", + } + } + openai_result_canceled: OpenAIBatchResult = { + "custom_id": custom_id, + "response": { + "status_code": 400, + "request_id": "", + "body": error_body_canceled, + }, + } + transformed_lines.append(json.dumps(openai_result_canceled)) + + # Join lines and encode back to bytes + transformed_content = "\n".join(transformed_lines) + if transformed_lines: + transformed_content += "\n" # Add trailing newline for JSONL format + return transformed_content.encode("utf-8") + except Exception as e: + verbose_logger.error( + f"Error transforming Anthropic batch results to OpenAI format: {e}" + ) + # Return original content if transformation fails + return anthropic_content + + def _transform_anthropic_message_to_openai_format( + self, anthropic_message: dict, anthropic_config: AnthropicConfig + ) -> OpenAIChatCompletionResponse: + """ + Transform a single Anthropic message to OpenAI chat completion format. 
+ """ + try: + # Create a mock httpx.Response for transformation + mock_response = httpx.Response( + status_code=200, + content=json.dumps(anthropic_message).encode("utf-8"), + ) + + # Create a ModelResponse object + model_response = ModelResponse() + # Initialize with required fields - will be populated by transform_parsed_response + model_response.choices = [ + litellm.Choices( + finish_reason="stop", + index=0, + message=litellm.Message(content="", role="assistant"), + ) + ] # type: ignore + + # Create a logging object for transformation + logging_obj = Logging( + model=anthropic_message.get("model", "claude-3-5-sonnet-20241022"), + messages=[{"role": "user", "content": "batch_request"}], + stream=False, + call_type=CallTypes.aretrieve_batch, + start_time=time.time(), + litellm_call_id="batch_" + str(uuid.uuid4()), + function_id="batch_processing", + litellm_trace_id=str(uuid.uuid4()), + kwargs={"optional_params": {}}, + ) + logging_obj.optional_params = {} + + # Transform using AnthropicConfig + transformed_response = anthropic_config.transform_parsed_response( + completion_response=anthropic_message, + raw_response=mock_response, + model_response=model_response, + json_mode=False, + prefix_prompt=None, + ) + + # Convert ModelResponse to OpenAI format dict - it's already in OpenAI format + openai_body: OpenAIChatCompletionResponse = transformed_response.model_dump(exclude_none=True) + + # Ensure id comes from anthropic_message if not set + if not openai_body.get("id"): + openai_body["id"] = anthropic_message.get("id", "") + + return openai_body + except Exception as e: + verbose_logger.error( + f"Error transforming Anthropic message to OpenAI format: {e}" + ) + # Return a basic error response if transformation fails + error_response: OpenAIChatCompletionResponse = { + "id": anthropic_message.get("id", ""), + "object": "chat.completion", + "created": int(time.time()), + "model": anthropic_message.get("model", ""), + "choices": [ + { + "index": 0, + "message": {"role": "assistant", "content": ""}, + "finish_reason": "error", + } + ], + "usage": { + "prompt_tokens": 0, + "completion_tokens": 0, + "total_tokens": 0, + }, + } + return error_response + diff --git a/litellm/proxy/pass_through_endpoints/llm_provider_handlers/anthropic_passthrough_logging_handler.py b/litellm/proxy/pass_through_endpoints/llm_provider_handlers/anthropic_passthrough_logging_handler.py index b990f4ca6e98..11550770ff4d 100644 --- a/litellm/proxy/pass_through_endpoints/llm_provider_handlers/anthropic_passthrough_logging_handler.py +++ b/litellm/proxy/pass_through_endpoints/llm_provider_handlers/anthropic_passthrough_logging_handler.py @@ -16,7 +16,7 @@ from litellm.types.passthrough_endpoints.pass_through_endpoints import ( PassthroughStandardLoggingPayload, ) -from litellm.types.utils import ModelResponse, TextCompletionResponse +from litellm.types.utils import LiteLLMBatch, ModelResponse, TextCompletionResponse if TYPE_CHECKING: from ..success_handler import PassThroughEndpointLogging @@ -37,11 +37,28 @@ def anthropic_passthrough_handler( start_time: datetime, end_time: datetime, cache_hit: bool, + request_body: Optional[dict] = None, **kwargs, ) -> PassThroughEndpointLoggingTypedDict: """ Transforms Anthropic response to OpenAI response, generates a standard logging object so downstream logging can be handled """ + # Check if this is a batch creation request + if "/v1/messages/batches" in url_route and httpx_response.status_code == 200: + # Get request body from parameter or kwargs + request_body = request_body 
or kwargs.get("request_body", {}) + return AnthropicPassthroughLoggingHandler.batch_creation_handler( + httpx_response=httpx_response, + logging_obj=logging_obj, + url_route=url_route, + result=result, + start_time=start_time, + end_time=end_time, + cache_hit=cache_hit, + request_body=request_body, + **kwargs, + ) + model = response_body.get("model", "") anthropic_config = get_anthropic_config(url_route) litellm_model_response: ModelResponse = anthropic_config().transform_response( @@ -238,3 +255,288 @@ def _build_complete_streaming_response( logging_obj=litellm_logging_obj, ) return complete_streaming_response + + @staticmethod + def batch_creation_handler( # noqa: PLR0915 + httpx_response: httpx.Response, + logging_obj: LiteLLMLoggingObj, + url_route: str, + result: str, + start_time: datetime, + end_time: datetime, + cache_hit: bool, + request_body: Optional[dict] = None, + **kwargs, + ) -> PassThroughEndpointLoggingTypedDict: + """ + Handle Anthropic batch creation passthrough logging. + Creates a managed object for cost tracking when batch job is successfully created. + """ + import base64 + + from litellm._uuid import uuid + from litellm.llms.anthropic.batches.transformation import ( + AnthropicBatchesConfig, + ) + from litellm.types.utils import Choices, SpecialEnums + + try: + _json_response = httpx_response.json() + + + # Only handle successful batch job creation (POST requests with 201 status) + if httpx_response.status_code == 200 and "id" in _json_response: + # Transform Anthropic response to LiteLLM batch format + anthropic_batches_config = AnthropicBatchesConfig() + litellm_batch_response = anthropic_batches_config.transform_retrieve_batch_response( + model=None, + raw_response=httpx_response, + logging_obj=logging_obj, + litellm_params={}, + ) + # Set status to "validating" for newly created batches so polling mechanism picks them up + # The polling mechanism only looks for status="validating" jobs + litellm_batch_response.status = "validating" + + # Extract batch ID from the response + batch_id = _json_response.get("id", "") + + # Get model from request body (batch response doesn't include model) + request_body = request_body or {} + # Try to extract model from the batch request body, supporting Anthropic's nested structure + model_name: str = "unknown" + if isinstance(request_body, dict): + # Standard: {"model": ...} + model_name = request_body.get("model") or "unknown" + if model_name == "unknown": + # Anthropic batches: look under requests[0].params.model + requests_list = request_body.get("requests", []) + if isinstance(requests_list, list) and len(requests_list) > 0: + first_req = requests_list[0] + if isinstance(first_req, dict): + params = first_req.get("params", {}) + if isinstance(params, dict): + extracted_model = params.get("model") + if extracted_model: + model_name = extracted_model + + + # Create unified object ID for tracking + # Format: base64(litellm_proxy;model_id:{};llm_batch_id:{}) + # For Anthropic passthrough, prefix model with "anthropic/" so router can determine provider + actual_model_id = AnthropicPassthroughLoggingHandler.get_actual_model_id_from_router(model_name) + + # If model not in router, use "anthropic/{model_name}" format so router can determine provider + if actual_model_id == model_name and not actual_model_id.startswith("anthropic/"): + actual_model_id = f"anthropic/{model_name}" + + unified_id_string = SpecialEnums.LITELLM_MANAGED_BATCH_COMPLETE_STR.value.format(actual_model_id, batch_id) + unified_object_id = 
base64.urlsafe_b64encode(unified_id_string.encode()).decode().rstrip("=") + + # Store the managed object for cost tracking + # This will be picked up by check_batch_cost polling mechanism + AnthropicPassthroughLoggingHandler._store_batch_managed_object( + unified_object_id=unified_object_id, + batch_object=litellm_batch_response, + model_object_id=batch_id, + logging_obj=logging_obj, + **kwargs, + ) + + # Create a batch job response for logging + litellm_model_response = ModelResponse() + litellm_model_response.id = str(uuid.uuid4()) + litellm_model_response.model = model_name + litellm_model_response.object = "batch" + litellm_model_response.created = int(start_time.timestamp()) + + # Add batch-specific metadata to indicate this is a pending batch job + litellm_model_response.choices = [Choices( + finish_reason="batch_pending", + index=0, + message={ + "role": "assistant", + "content": f"Batch job {batch_id} created and is pending. Status will be updated when the batch completes.", + "tool_calls": None, + "function_call": None, + "provider_specific_fields": { + "batch_job_id": batch_id, + "batch_job_state": "in_progress", + "unified_object_id": unified_object_id + } + } + )] + + # Set response cost to 0 initially (will be updated when batch completes) + response_cost = 0.0 + kwargs["response_cost"] = response_cost + kwargs["model"] = model_name + kwargs["batch_id"] = batch_id + kwargs["unified_object_id"] = unified_object_id + kwargs["batch_job_state"] = "in_progress" + + logging_obj.model = model_name + logging_obj.model_call_details["model"] = logging_obj.model + logging_obj.model_call_details["response_cost"] = response_cost + logging_obj.model_call_details["batch_id"] = batch_id + + return { + "result": litellm_model_response, + "kwargs": kwargs, + } + else: + # Handle non-successful responses + litellm_model_response = ModelResponse() + litellm_model_response.id = str(uuid.uuid4()) + litellm_model_response.model = "anthropic_batch" + litellm_model_response.object = "batch" + litellm_model_response.created = int(start_time.timestamp()) + + # Add error-specific metadata + litellm_model_response.choices = [Choices( + finish_reason="batch_error", + index=0, + message={ + "role": "assistant", + "content": f"Batch job creation failed. 
Status: {httpx_response.status_code}", + "tool_calls": None, + "function_call": None, + "provider_specific_fields": { + "batch_job_state": "failed", + "status_code": httpx_response.status_code + } + } + )] + + kwargs["response_cost"] = 0.0 + kwargs["model"] = "anthropic_batch" + kwargs["batch_job_state"] = "failed" + + return { + "result": litellm_model_response, + "kwargs": kwargs, + } + + except Exception as e: + verbose_proxy_logger.error(f"Error in batch_creation_handler: {e}") + # Return basic response on error + litellm_model_response = ModelResponse() + litellm_model_response.id = str(uuid.uuid4()) + litellm_model_response.model = "anthropic_batch" + litellm_model_response.object = "batch" + litellm_model_response.created = int(start_time.timestamp()) + + # Add error-specific metadata + litellm_model_response.choices = [Choices( + finish_reason="batch_error", + index=0, + message={ + "role": "assistant", + "content": f"Error creating batch job: {str(e)}", + "tool_calls": None, + "function_call": None, + "provider_specific_fields": { + "batch_job_state": "failed", + "error": str(e) + } + } + )] + + kwargs["response_cost"] = 0.0 + kwargs["model"] = "anthropic_batch" + kwargs["batch_job_state"] = "failed" + + return { + "result": litellm_model_response, + "kwargs": kwargs, + } + + @staticmethod + def _store_batch_managed_object( + unified_object_id: str, + batch_object: LiteLLMBatch, + model_object_id: str, + logging_obj: LiteLLMLoggingObj, + **kwargs, + ) -> None: + """ + Store batch managed object for cost tracking. + This will be picked up by the check_batch_cost polling mechanism. + """ + try: + + # Get the managed files hook from the logging object + # This is a bit of a hack, but we need access to the proxy logging system + from litellm.proxy.proxy_server import proxy_logging_obj + + managed_files_hook = proxy_logging_obj.get_proxy_hook("managed_files") + if managed_files_hook is not None and hasattr(managed_files_hook, 'store_unified_object_id'): + # Create a mock user API key dict for the managed object storage + from litellm.proxy._types import LitellmUserRoles, UserAPIKeyAuth + user_api_key_dict = UserAPIKeyAuth( + user_id=kwargs.get("user_id", "default-user"), + api_key="", + team_id=None, + team_alias=None, + user_role=LitellmUserRoles.CUSTOMER, # Use proper enum value + user_email=None, + max_budget=None, + spend=0.0, # Set to 0.0 instead of None + models=[], # Set to empty list instead of None + tpm_limit=None, + rpm_limit=None, + budget_duration=None, + budget_reset_at=None, + max_parallel_requests=None, + allowed_model_region=None, + metadata={}, # Set to empty dict instead of None + key_alias=None, + permissions={}, # Set to empty dict instead of None + model_max_budget={}, # Set to empty dict instead of None + model_spend={}, # Set to empty dict instead of None + ) + + # Store the unified object for batch cost tracking + import asyncio + asyncio.create_task( + managed_files_hook.store_unified_object_id( # type: ignore + unified_object_id=unified_object_id, + file_object=batch_object, + litellm_parent_otel_span=None, + model_object_id=model_object_id, + file_purpose="batch", + user_api_key_dict=user_api_key_dict, + ) + ) + + verbose_proxy_logger.info( + f"Stored Anthropic batch managed object with unified_object_id={unified_object_id}, batch_id={model_object_id}" + ) + else: + verbose_proxy_logger.warning("Managed files hook not available, cannot store batch object for cost tracking") + + except Exception as e: + verbose_proxy_logger.error(f"Error storing Anthropic 
batch managed object: {e}") + + @staticmethod + def get_actual_model_id_from_router(model_name: str) -> str: + from litellm.proxy.proxy_server import llm_router + + if llm_router is not None: + # Try to find the model in the router by the model name + # Use the existing get_model_ids method from router + model_ids = llm_router.get_model_ids(model_name=model_name) + if model_ids and len(model_ids) > 0: + # Use the first model ID found + actual_model_id = model_ids[0] + verbose_proxy_logger.info(f"Found model ID in router: {actual_model_id}") + return actual_model_id + else: + # Fallback to model name + actual_model_id = model_name + verbose_proxy_logger.warning(f"Model not found in router, using model name: {actual_model_id}") + return actual_model_id + else: + # Fallback if router is not available + verbose_proxy_logger.warning(f"Router not available, using model name: {model_name}") + return model_name diff --git a/litellm/proxy/pass_through_endpoints/success_handler.py b/litellm/proxy/pass_through_endpoints/success_handler.py index 6d93ef68dfd7..41b92c561114 100644 --- a/litellm/proxy/pass_through_endpoints/success_handler.py +++ b/litellm/proxy/pass_through_endpoints/success_handler.py @@ -46,7 +46,7 @@ def __init__(self): ] # Anthropic - self.TRACKED_ANTHROPIC_ROUTES = ["/messages"] + self.TRACKED_ANTHROPIC_ROUTES = ["/messages", "/v1/messages/batches"] # Cohere self.TRACKED_COHERE_ROUTES = ["/v2/chat", "/v1/embed"] @@ -169,6 +169,7 @@ def normalize_llm_passthrough_logging_payload( start_time=start_time, end_time=end_time, cache_hit=cache_hit, + request_body=request_body, **kwargs, ) ) diff --git a/litellm/types/llms/openai.py b/litellm/types/llms/openai.py index 59397a62a8b1..d0e4bbf4a42b 100644 --- a/litellm/types/llms/openai.py +++ b/litellm/types/llms/openai.py @@ -437,10 +437,12 @@ class ListBatchRequest(TypedDict, total=False): """ after: Union[str, NotGiven] - limit: Union[int, NotGiven] - extra_headers: Optional[Dict[str, str]] - extra_body: Optional[Dict[str, str]] - timeout: Optional[float] + + +# OpenAI Batch Result Types +class OpenAIErrorBody(TypedDict, total=False): + """Error body in OpenAI batch response format.""" + error: Dict[str, str] BatchJobStatus = Literal[ @@ -1824,6 +1826,20 @@ class OpenAIChatCompletionResponse(TypedDict, total=False): service_tier: str +# OpenAI Batch Result Types (defined after OpenAIChatCompletionResponse for forward reference) +class OpenAIBatchResponse(TypedDict, total=False): + """Response wrapper in OpenAI batch result format.""" + status_code: int + request_id: str + body: Union[OpenAIChatCompletionResponse, OpenAIErrorBody] + + +class OpenAIBatchResult(TypedDict, total=False): + """OpenAI batch result format.""" + custom_id: str + response: OpenAIBatchResponse + + OpenAIChatCompletionFinishReason = Literal[ "stop", "content_filter", "function_call", "tool_calls", "length" ] diff --git a/tests/test_litellm/llms/anthropic/test_anthropic_files_and_batches.py b/tests/test_litellm/llms/anthropic/test_anthropic_files_and_batches.py new file mode 100644 index 000000000000..47571220175a --- /dev/null +++ b/tests/test_litellm/llms/anthropic/test_anthropic_files_and_batches.py @@ -0,0 +1,639 @@ +""" +Test Anthropic Files Handler and Batch Retrieval + +Tests for: +1. AnthropicFilesHandler.afile_content() - retrieving batch results +2. AnthropicBatchesConfig.transform_retrieve_batch_response() - transforming batch responses +3. 
Transformation of Anthropic batch results to OpenAI format +""" + +import json +import os +import sys +from unittest.mock import AsyncMock, MagicMock, patch + +sys.path.insert(0, os.path.abspath("../../../../")) + +import httpx +import pytest + +from litellm.llms.anthropic.batches.transformation import AnthropicBatchesConfig +from litellm.llms.anthropic.files.handler import AnthropicFilesHandler +from litellm.types.llms.openai import FileContentRequest, HttpxBinaryResponseContent + + +class TestAnthropicFilesHandler: + """Test Anthropic Files Handler for batch results retrieval""" + + @pytest.fixture + def handler(self): + """Create AnthropicFilesHandler instance""" + return AnthropicFilesHandler() + + @pytest.fixture + def mock_anthropic_batch_results_succeeded(self): + """Mock Anthropic batch results with succeeded status""" + return json.dumps({ + "custom_id": "test-request-1", + "result": { + "type": "succeeded", + "message": { + "id": "msg_123", + "model": "claude-3-5-sonnet-20241022", + "role": "assistant", + "content": [ + { + "type": "text", + "text": "Hello, world!" + } + ], + "stop_reason": "end_turn", + "stop_sequence": None, + "usage": { + "input_tokens": 10, + "output_tokens": 5 + } + } + } + }).encode("utf-8") + + @pytest.fixture + def mock_anthropic_batch_results_errored(self): + """Mock Anthropic batch results with errored status""" + return json.dumps({ + "custom_id": "test-request-2", + "result": { + "type": "errored", + "error": { + "error": { + "type": "invalid_request_error", + "message": "Invalid request" + }, + "request_id": "req_456" + } + } + }).encode("utf-8") + + @pytest.fixture + def mock_anthropic_batch_results_canceled(self): + """Mock Anthropic batch results with canceled status""" + return json.dumps({ + "custom_id": "test-request-3", + "result": { + "type": "canceled" + } + }).encode("utf-8") + + @pytest.fixture + def mock_anthropic_batch_results_mixed(self): + """Mock Anthropic batch results with multiple result types""" + lines = [ + json.dumps({ + "custom_id": "test-request-1", + "result": { + "type": "succeeded", + "message": { + "id": "msg_123", + "model": "claude-3-5-sonnet-20241022", + "role": "assistant", + "content": [{"type": "text", "text": "Success"}], + "stop_reason": "end_turn", + "usage": {"input_tokens": 10, "output_tokens": 5} + } + } + }), + json.dumps({ + "custom_id": "test-request-2", + "result": { + "type": "errored", + "error": { + "error": { + "type": "rate_limit_error", + "message": "Rate limit exceeded" + }, + "request_id": "req_456" + } + } + }), + json.dumps({ + "custom_id": "test-request-3", + "result": { + "type": "expired" + } + }) + ] + return "\n".join(lines).encode("utf-8") + + @pytest.mark.asyncio + async def test_afile_content_success(self, handler, mock_anthropic_batch_results_succeeded): + """Test successful file content retrieval and transformation""" + file_content_request: FileContentRequest = { + "file_id": "batch_123", + "extra_headers": None, + "extra_body": None + } + + # Mock the httpx client + mock_response = httpx.Response( + status_code=200, + content=mock_anthropic_batch_results_succeeded, + headers={"content-type": "application/json"}, + request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123/results") + ) + + with patch("litellm.llms.anthropic.files.handler.get_async_httpx_client") as mock_get_client: + mock_client = AsyncMock() + mock_client.get = AsyncMock(return_value=mock_response) + mock_get_client.return_value = mock_client + + with 
patch.object(handler.anthropic_model_info, "get_api_key", return_value="test-api-key"):
+ with patch.object(handler.anthropic_model_info, "get_api_base", return_value="https://api.anthropic.com"):
+ result = await handler.afile_content(
+ file_content_request=file_content_request,
+ api_key="test-api-key"
+ )
+
+ # Verify result
+ assert isinstance(result, HttpxBinaryResponseContent)
+ assert result.response.status_code == 200
+
+ # Verify transformation to OpenAI format
+ content = result.response.content.decode("utf-8")
+ lines = [line for line in content.strip().split("\n") if line.strip()]
+ assert len(lines) == 1
+
+ transformed_result = json.loads(lines[0])
+ assert transformed_result["custom_id"] == "test-request-1"
+ assert transformed_result["response"]["status_code"] == 200
+ assert "body" in transformed_result["response"]
+ # Verify body has required OpenAI format fields
+ assert "id" in transformed_result["response"]["body"]
+ assert transformed_result["response"]["body"]["object"] == "chat.completion"
+ assert "choices" in transformed_result["response"]["body"]
+ # Verify request_id matches the original message id
+ assert transformed_result["response"]["request_id"] == "msg_123"
+
+ @pytest.mark.asyncio
+ async def test_afile_content_with_prefix(self, handler, mock_anthropic_batch_results_succeeded):
+ """Test file content retrieval with anthropic_batch_results: prefix"""
+ file_content_request: FileContentRequest = {
+ "file_id": "anthropic_batch_results:batch_123",
+ "extra_headers": None,
+ "extra_body": None
+ }
+
+ mock_response = httpx.Response(
+ status_code=200,
+ content=mock_anthropic_batch_results_succeeded,
+ headers={"content-type": "application/json"},
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123/results")
+ )
+
+ with patch("litellm.llms.anthropic.files.handler.get_async_httpx_client") as mock_get_client:
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(return_value=mock_response)
+ mock_get_client.return_value = mock_client
+
+ with patch.object(handler.anthropic_model_info, "get_api_key", return_value="test-api-key"):
+ with patch.object(handler.anthropic_model_info, "get_api_base", return_value="https://api.anthropic.com"):
+ result = await handler.afile_content(
+ file_content_request=file_content_request,
+ api_key="test-api-key"
+ )
+
+ assert isinstance(result, HttpxBinaryResponseContent)
+ # Verify the URL was constructed correctly (batch_id extracted from prefix)
+ mock_client.get.assert_called_once()
+ call_url = mock_client.get.call_args[1]["url"]
+ assert "batch_123" in call_url
+
+ @pytest.mark.asyncio
+ async def test_afile_content_errored_result(self, handler, mock_anthropic_batch_results_errored):
+ """Test transformation of errored batch results"""
+ file_content_request: FileContentRequest = {
+ "file_id": "batch_123",
+ "extra_headers": None,
+ "extra_body": None
+ }
+
+ mock_response = httpx.Response(
+ status_code=200,
+ content=mock_anthropic_batch_results_errored,
+ headers={"content-type": "application/json"},
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123/results")
+ )
+
+ with patch("litellm.llms.anthropic.files.handler.get_async_httpx_client") as mock_get_client:
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(return_value=mock_response)
+ mock_get_client.return_value = mock_client
+
+ with patch.object(handler.anthropic_model_info, "get_api_key", return_value="test-api-key"):
+ with patch.object(handler.anthropic_model_info, "get_api_base", return_value="https://api.anthropic.com"):
+ result = await handler.afile_content(
+ file_content_request=file_content_request,
+ api_key="test-api-key"
+ )
+
+ content = result.response.content.decode("utf-8")
+ lines = [line for line in content.strip().split("\n") if line.strip()]
+ assert len(lines) == 1
+
+ transformed_result = json.loads(lines[0])
+ assert transformed_result["custom_id"] == "test-request-2"
+ assert transformed_result["response"]["status_code"] == 400 # invalid_request_error maps to 400
+ assert transformed_result["response"]["body"]["error"]["type"] == "invalid_request_error"
+ assert transformed_result["response"]["body"]["error"]["message"] == "Invalid request"
+
+ @pytest.mark.asyncio
+ async def test_afile_content_canceled_result(self, handler, mock_anthropic_batch_results_canceled):
+ """Test transformation of canceled batch results"""
+ file_content_request: FileContentRequest = {
+ "file_id": "batch_123",
+ "extra_headers": None,
+ "extra_body": None
+ }
+
+ mock_response = httpx.Response(
+ status_code=200,
+ content=mock_anthropic_batch_results_canceled,
+ headers={"content-type": "application/json"},
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123/results")
+ )
+
+ with patch("litellm.llms.anthropic.files.handler.get_async_httpx_client") as mock_get_client:
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(return_value=mock_response)
+ mock_get_client.return_value = mock_client
+
+ with patch.object(handler.anthropic_model_info, "get_api_key", return_value="test-api-key"):
+ with patch.object(handler.anthropic_model_info, "get_api_base", return_value="https://api.anthropic.com"):
+ result = await handler.afile_content(
+ file_content_request=file_content_request,
+ api_key="test-api-key"
+ )
+
+ content = result.response.content.decode("utf-8")
+ lines = [line for line in content.strip().split("\n") if line.strip()]
+ assert len(lines) == 1
+
+ transformed_result = json.loads(lines[0])
+ assert transformed_result["custom_id"] == "test-request-3"
+ assert transformed_result["response"]["status_code"] == 400
+ assert "Batch request was canceled" in transformed_result["response"]["body"]["error"]["message"]
+
+ @pytest.mark.asyncio
+ async def test_afile_content_mixed_results(self, handler, mock_anthropic_batch_results_mixed):
+ """Test transformation of mixed batch results (succeeded, errored, expired)"""
+ file_content_request: FileContentRequest = {
+ "file_id": "batch_123",
+ "extra_headers": None,
+ "extra_body": None
+ }
+
+ mock_response = httpx.Response(
+ status_code=200,
+ content=mock_anthropic_batch_results_mixed,
+ headers={"content-type": "application/json"},
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123/results")
+ )
+
+ with patch("litellm.llms.anthropic.files.handler.get_async_httpx_client") as mock_get_client:
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(return_value=mock_response)
+ mock_get_client.return_value = mock_client
+
+ with patch.object(handler.anthropic_model_info, "get_api_key", return_value="test-api-key"):
+ with patch.object(handler.anthropic_model_info, "get_api_base", return_value="https://api.anthropic.com"):
+ result = await handler.afile_content(
+ file_content_request=file_content_request,
+ api_key="test-api-key"
+ )
+
+ content = result.response.content.decode("utf-8")
+ lines = [line for line in content.strip().split("\n") if line.strip()]
+ assert len(lines) == 3
+
+ # Check first result (succeeded)
+ result1 = json.loads(lines[0])
+ assert result1["response"]["status_code"] == 200
+
+ # Check second result (errored)
+ result2 = json.loads(lines[1])
+ assert result2["response"]["status_code"] == 429 # rate_limit_error maps to 429
+
+ # Check third result (expired)
+ result3 = json.loads(lines[2])
+ assert result3["response"]["status_code"] == 400
+ assert "expired" in result3["response"]["body"]["error"]["message"]
+
+ @pytest.mark.asyncio
+ async def test_afile_content_missing_api_key(self, handler):
+ """Test file content retrieval with missing API key"""
+ file_content_request: FileContentRequest = {
+ "file_id": "batch_123",
+ "extra_headers": None,
+ "extra_body": None
+ }
+
+ with patch.object(handler.anthropic_model_info, "get_api_key", return_value=None):
+ with pytest.raises(ValueError, match="Missing Anthropic API Key"):
+ await handler.afile_content(
+ file_content_request=file_content_request,
+ api_key=None
+ )
+
+ @pytest.mark.asyncio
+ async def test_afile_content_missing_file_id(self, handler):
+ """Test file content retrieval with missing file_id"""
+ file_content_request: FileContentRequest = {
+ "file_id": None,
+ "extra_headers": None,
+ "extra_body": None
+ }
+
+ with pytest.raises(ValueError, match="file_id is required"):
+ await handler.afile_content(
+ file_content_request=file_content_request,
+ api_key="test-api-key"
+ )
+
+ @pytest.mark.asyncio
+ async def test_afile_content_http_error(self, handler):
+ """Test file content retrieval with HTTP error"""
+ file_content_request: FileContentRequest = {
+ "file_id": "batch_123",
+ "extra_headers": None,
+ "extra_body": None
+ }
+
+ mock_response = httpx.Response(
+ status_code=404,
+ content=b"Not Found",
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123/results")
+ )
+ mock_response.raise_for_status = MagicMock(side_effect=httpx.HTTPStatusError("Not Found", request=mock_response.request, response=mock_response))
+
+ with patch("litellm.llms.anthropic.files.handler.get_async_httpx_client") as mock_get_client:
+ mock_client = AsyncMock()
+ mock_client.get = AsyncMock(return_value=mock_response)
+ mock_get_client.return_value = mock_client
+
+ with patch.object(handler.anthropic_model_info, "get_api_key", return_value="test-api-key"):
+ with patch.object(handler.anthropic_model_info, "get_api_base", return_value="https://api.anthropic.com"):
+ with pytest.raises(httpx.HTTPStatusError):
+ await handler.afile_content(
+ file_content_request=file_content_request,
+ api_key="test-api-key"
+ )
+
+
+class TestAnthropicBatchesConfig:
+ """Test Anthropic Batches Config for batch retrieval transformation"""
+
+ @pytest.fixture
+ def config(self):
+ """Create AnthropicBatchesConfig instance"""
+ return AnthropicBatchesConfig()
+
+ @pytest.fixture
+ def mock_anthropic_batch_response_in_progress(self):
+ """Mock Anthropic batch response with in_progress status"""
+ return {
+ "id": "batch_123",
+ "processing_status": "in_progress",
+ "created_at": "2024-01-01T00:00:00Z",
+ "expires_at": "2024-01-02T00:00:00Z",
+ "request_counts": {
+ "processing": 5,
+ "succeeded": 3,
+ "errored": 1,
+ "canceled": 0,
+ "expired": 0
+ }
+ }
+
+ @pytest.fixture
+ def mock_anthropic_batch_response_completed(self):
+ """Mock Anthropic batch response with completed status"""
+ return {
+ "id": "batch_456",
+ "processing_status": "ended",
+ "created_at": "2024-01-01T00:00:00Z",
+ "ended_at": "2024-01-01T12:00:00Z",
+ "expires_at": "2024-01-02T00:00:00Z",
+ "request_counts": {
+ "processing": 0,
+ "succeeded": 10,
+ "errored": 0,
+ "canceled": 0,
+ "expired": 0
+ }
+ }
+
+ @pytest.fixture
+ def mock_anthropic_batch_response_canceling(self):
+ """Mock Anthropic batch response with canceling status"""
+ return {
+ "id": "batch_789",
+ "processing_status": "canceling",
+ "created_at": "2024-01-01T00:00:00Z",
+ "cancel_initiated_at": "2024-01-01T06:00:00Z",
+ "ended_at": "2024-01-01T07:00:00Z",
+ "expires_at": "2024-01-02T00:00:00Z",
+ "request_counts": {
+ "processing": 0,
+ "succeeded": 5,
+ "errored": 0,
+ "canceled": 3,
+ "expired": 0
+ }
+ }
+
+ def test_get_retrieve_batch_url(self, config):
+ """Test URL construction for batch retrieval"""
+ url = config.get_retrieve_batch_url(
+ api_base="https://api.anthropic.com",
+ batch_id="batch_123",
+ optional_params={},
+ litellm_params={}
+ )
+ assert url == "https://api.anthropic.com/v1/messages/batches/batch_123"
+
+ # Test with trailing slash
+ url = config.get_retrieve_batch_url(
+ api_base="https://api.anthropic.com/",
+ batch_id="batch_123",
+ optional_params={},
+ litellm_params={}
+ )
+ assert url == "https://api.anthropic.com/v1/messages/batches/batch_123"
+
+ def test_transform_retrieve_batch_response_in_progress(self, config, mock_anthropic_batch_response_in_progress):
+ """Test transformation of in_progress batch response"""
+ mock_response = httpx.Response(
+ status_code=200,
+ content=json.dumps(mock_anthropic_batch_response_in_progress).encode("utf-8"),
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123")
+ )
+
+ logging_obj = MagicMock()
+ batch = config.transform_retrieve_batch_response(
+ model="claude-3-5-sonnet-20241022",
+ raw_response=mock_response,
+ logging_obj=logging_obj,
+ litellm_params={}
+ )
+
+ assert batch.id == "batch_123"
+ assert batch.object == "batch"
+ assert batch.status == "in_progress"
+ assert batch.endpoint == "/v1/messages"
+ assert batch.output_file_id == "batch_123"
+ assert batch.request_counts.total == 9 # 5 + 3 + 1
+ assert batch.request_counts.completed == 3
+ assert batch.request_counts.failed == 1
+ assert batch.in_progress_at is not None
+ assert batch.completed_at is None
+
+ def test_transform_retrieve_batch_response_completed(self, config, mock_anthropic_batch_response_completed):
+ """Test transformation of completed batch response"""
+ mock_response = httpx.Response(
+ status_code=200,
+ content=json.dumps(mock_anthropic_batch_response_completed).encode("utf-8"),
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_456")
+ )
+
+ logging_obj = MagicMock()
+ batch = config.transform_retrieve_batch_response(
+ model="claude-3-5-sonnet-20241022",
+ raw_response=mock_response,
+ logging_obj=logging_obj,
+ litellm_params={}
+ )
+
+ assert batch.id == "batch_456"
+ assert batch.status == "completed"
+ assert batch.completed_at is not None
+ assert batch.request_counts.total == 10
+ assert batch.request_counts.completed == 10
+ assert batch.request_counts.failed == 0
+
+ def test_transform_retrieve_batch_response_canceling(self, config, mock_anthropic_batch_response_canceling):
+ """Test transformation of canceling batch response"""
+ mock_response = httpx.Response(
+ status_code=200,
+ content=json.dumps(mock_anthropic_batch_response_canceling).encode("utf-8"),
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_789")
+ )
+
+ logging_obj = MagicMock()
+ batch = config.transform_retrieve_batch_response(
+ model="claude-3-5-sonnet-20241022",
+ raw_response=mock_response,
+ logging_obj=logging_obj,
+ litellm_params={}
+ )
+
+ assert batch.id == "batch_789"
+ assert batch.status == "cancelling"
+ assert batch.cancelling_at is not None
+ assert batch.cancelled_at is not None
+ assert batch.request_counts.total == 8 # 5 + 3
+
+ def test_transform_retrieve_batch_response_invalid_json(self, config):
+ """Test transformation with invalid JSON response"""
+ mock_response = httpx.Response(
+ status_code=200,
+ content=b"invalid json",
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123")
+ )
+
+ logging_obj = MagicMock()
+ with pytest.raises(ValueError, match="Failed to parse Anthropic batch response"):
+ config.transform_retrieve_batch_response(
+ model="claude-3-5-sonnet-20241022",
+ raw_response=mock_response,
+ logging_obj=logging_obj,
+ litellm_params={}
+ )
+
+ def test_transform_retrieve_batch_response_timestamp_parsing(self, config):
+ """Test timestamp parsing in batch response"""
+ batch_data = {
+ "id": "batch_123",
+ "processing_status": "ended",
+ "created_at": "2024-01-01T12:00:00Z",
+ "ended_at": "2024-01-01T13:30:45Z",
+ "expires_at": "2024-01-02T12:00:00Z",
+ "archived_at": "2024-01-03T00:00:00Z",
+ "request_counts": {
+ "processing": 0,
+ "succeeded": 1,
+ "errored": 0,
+ "canceled": 0,
+ "expired": 0
+ }
+ }
+
+ mock_response = httpx.Response(
+ status_code=200,
+ content=json.dumps(batch_data).encode("utf-8"),
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123")
+ )
+
+ logging_obj = MagicMock()
+ batch = config.transform_retrieve_batch_response(
+ model="claude-3-5-sonnet-20241022",
+ raw_response=mock_response,
+ logging_obj=logging_obj,
+ litellm_params={}
+ )
+
+ # Verify timestamps are parsed correctly
+ assert batch.created_at is not None
+ assert batch.completed_at is not None
+ assert batch.expires_at is not None
+ assert batch.expired_at is not None
+
+ # Verify timestamps are integers (Unix timestamps)
+ assert isinstance(batch.created_at, int)
+ assert isinstance(batch.completed_at, int)
+ assert isinstance(batch.expires_at, int)
+ assert isinstance(batch.expired_at, int)
+
+ def test_transform_retrieve_batch_response_missing_fields(self, config):
+ """Test transformation with missing optional fields"""
+ batch_data = {
+ "id": "batch_123",
+ "processing_status": "in_progress",
+ "request_counts": {
+ "processing": 1,
+ "succeeded": 0,
+ "errored": 0,
+ "canceled": 0,
+ "expired": 0
+ }
+ }
+
+ mock_response = httpx.Response(
+ status_code=200,
+ content=json.dumps(batch_data).encode("utf-8"),
+ request=httpx.Request(method="GET", url="https://api.anthropic.com/v1/messages/batches/batch_123")
+ )
+
+ logging_obj = MagicMock()
+ batch = config.transform_retrieve_batch_response(
+ model="claude-3-5-sonnet-20241022",
+ raw_response=mock_response,
+ logging_obj=logging_obj,
+ litellm_params={}
+ )
+
+ # Should still work with missing optional fields
+ assert batch.id == "batch_123"
+ assert batch.status == "in_progress"
+ assert batch.created_at is not None # Should default to current time if missing
+ assert batch.expires_at is None
+ assert batch.completed_at is None
+
diff --git a/tests/test_litellm/proxy/pass_through_endpoints/llm_provider_handlers/test_anthropic_passthrough_logging_handler.py b/tests/test_litellm/proxy/pass_through_endpoints/llm_provider_handlers/test_anthropic_passthrough_logging_handler.py
index 59ab5068fa16..24f7107355b7 100644
--- a/tests/test_litellm/proxy/pass_through_endpoints/llm_provider_handlers/test_anthropic_passthrough_logging_handler.py
+++ b/tests/test_litellm/proxy/pass_through_endpoints/llm_provider_handlers/test_anthropic_passthrough_logging_handler.py
@@ -3,7 +3,7 @@
import sys
from datetime import datetime
from typing import Any, Dict, List
-from unittest.mock import MagicMock, patch
+from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@@ -279,4 +279,303 @@ def test_cost_calculation_does_not_duplicate_provider_prefix(
mock_completion_cost.assert_called_once()
call_kwargs = mock_completion_cost.call_args[1]
assert call_kwargs["model"] == "azure_ai/claude-sonnet-4-5_gb_20250929"
- assert call_kwargs["custom_llm_provider"] == "azure_ai"
\ No newline at end of file
+ assert call_kwargs["custom_llm_provider"] == "azure_ai"
+
+
+class TestAnthropicBatchPassthroughCostTracking:
+ """Test cases for Anthropic batch passthrough cost tracking functionality"""
+
+ @pytest.fixture
+ def mock_httpx_response(self):
+ """Mock httpx response for batch job creation"""
+ mock_response = MagicMock()
+ mock_response.status_code = 200
+ mock_response.json.return_value = {
+ "id": "msgbatch_01Wj7gkQk7gn4MpAKR8ZEDU2",
+ "archived_at": None,
+ "cancel_initiated_at": None,
+ "created_at": "2024-08-20T18:37:24.100435Z",
+ "ended_at": None,
+ "expires_at": "2024-08-21T18:37:24.100435Z",
+ "processing_status": "in_progress",
+ "request_counts": {
+ "canceled": 0,
+ "errored": 0,
+ "expired": 0,
+ "processing": 1,
+ "succeeded": 0
+ },
+ "results_url": "https://api.anthropic.com/v1/messages/batches/msgbatch_01Wj7gkQk7gn4MpAKR8ZEDU2/results",
+ "type": "message_batch"
+ }
+ return mock_response
+
+ @pytest.fixture
+ def mock_logging_obj(self):
+ """Mock logging object"""
+ mock = MagicMock()
+ mock.litellm_call_id = "test-call-id-123"
+ mock.model_call_details = {}
+ mock.model = None
+ return mock
+
+ @pytest.fixture
+ def mock_request_body(self):
+ """Mock request body for batch creation"""
+ return {
+ "requests": [
+ {
+ "custom_id": "my-custom-id-1",
+ "params": {
+ "max_tokens": 1024,
+ "messages": [
+ {
+ "content": "Hello, world",
+ "role": "user"
+ }
+ ],
+ "model": "claude-sonnet-4-5-20250929"
+ }
+ }
+ ]
+ }
+
+ @patch('litellm.proxy.pass_through_endpoints.llm_provider_handlers.anthropic_passthrough_logging_handler.AnthropicPassthroughLoggingHandler._store_batch_managed_object')
+ @patch('litellm.proxy.pass_through_endpoints.llm_provider_handlers.anthropic_passthrough_logging_handler.AnthropicPassthroughLoggingHandler.get_actual_model_id_from_router')
+ @patch('litellm.llms.anthropic.batches.transformation.AnthropicBatchesConfig')
+ def test_batch_creation_handler_success(
+ self,
+ mock_batches_config,
+ mock_get_model_id,
+ mock_store_batch,
+ mock_httpx_response,
+ mock_logging_obj,
+ mock_request_body
+ ):
+ """Test successful batch creation and managed object storage"""
+ from litellm.types.utils import LiteLLMBatch
+
+ # Setup mocks
+ mock_get_model_id.return_value = "claude-sonnet-4-5-20250929"
+
+ mock_batch_response = LiteLLMBatch(
+ id="msgbatch_01Wj7gkQk7gn4MpAKR8ZEDU2",
+ object="batch",
+ endpoint="/v1/messages",
+ errors=None,
+ input_file_id="None",
+ completion_window="24h",
+ status="validating",
+ output_file_id="msgbatch_01Wj7gkQk7gn4MpAKR8ZEDU2",
+ error_file_id=None,
+ created_at=1704067200,
+ in_progress_at=1704067200,
+ expires_at=1704153600,
+ finalizing_at=None,
+ completed_at=None,
+ failed_at=None,
+ expired_at=None,
+ cancelling_at=None,
+ cancelled_at=None,
+ request_counts={"total": 1, "completed": 0, "failed": 0},
+ metadata={},
+ )
+
+ mock_batches_config_instance = MagicMock()
+ mock_batches_config_instance.transform_retrieve_batch_response.return_value = mock_batch_response
+ mock_batches_config.return_value = mock_batches_config_instance
+
+ # Test the handler
+ result = AnthropicPassthroughLoggingHandler.batch_creation_handler(
+ httpx_response=mock_httpx_response,
+ logging_obj=mock_logging_obj,
+ url_route="https://api.anthropic.com/v1/messages/batches",
+ result="success",
+ start_time=datetime.now(),
+ end_time=datetime.now(),
+ cache_hit=False,
+ request_body=mock_request_body,
+ )
+
+ # Verify the result
+ assert result is not None
+ assert "result" in result
+ assert "kwargs" in result
+ # Model should be extracted from request body
+ assert result["kwargs"]["model"] == "claude-sonnet-4-5-20250929"
+ assert result["kwargs"]["batch_id"] == "msgbatch_01Wj7gkQk7gn4MpAKR8ZEDU2"
+ assert result["kwargs"]["batch_job_state"] == "in_progress"
+ assert "unified_object_id" in result["kwargs"]
+
+ # Verify batch was stored
+ mock_store_batch.assert_called_once()
+ call_kwargs = mock_store_batch.call_args[1]
+ assert call_kwargs["model_object_id"] == "msgbatch_01Wj7gkQk7gn4MpAKR8ZEDU2"
+ assert call_kwargs["batch_object"].status == "validating"
+
+ # Verify the response object
+ assert result["result"].model == "claude-sonnet-4-5-20250929"
+ assert result["result"].object == "batch"
+
+ @patch('litellm.proxy.pass_through_endpoints.llm_provider_handlers.anthropic_passthrough_logging_handler.AnthropicPassthroughLoggingHandler._store_batch_managed_object')
+ @patch('litellm.proxy.pass_through_endpoints.llm_provider_handlers.anthropic_passthrough_logging_handler.AnthropicPassthroughLoggingHandler.get_actual_model_id_from_router')
+ def test_batch_creation_handler_model_extraction_from_nested_request(
+ self,
+ mock_get_model_id,
+ mock_store_batch,
+ mock_httpx_response,
+ mock_logging_obj
+ ):
+ """Test that model is correctly extracted from nested request structure"""
+ from litellm.llms.anthropic.batches.transformation import AnthropicBatchesConfig
+ from litellm.types.utils import LiteLLMBatch
+
+ # Setup mocks
+ mock_get_model_id.return_value = "claude-sonnet-4-5-20250929"
+
+ mock_batch_response = LiteLLMBatch(
+ id="msgbatch_123",
+ object="batch",
+ endpoint="/v1/messages",
+ input_file_id="None",
+ completion_window="24h",
+ status="validating",
+ created_at=1704067200,
+ request_counts={"total": 1, "completed": 0, "failed": 0},
+ )
+
+ with patch.object(AnthropicBatchesConfig, 'transform_retrieve_batch_response', return_value=mock_batch_response):
+ # Request body with nested model in requests[0].params.model
+ request_body = {
+ "requests": [
+ {
+ "custom_id": "test-1",
+ "params": {
+ "model": "claude-sonnet-4-5-20250929",
+ "messages": [{"role": "user", "content": "test"}]
+ }
+ }
+ ]
+ }
+
+ result = AnthropicPassthroughLoggingHandler.batch_creation_handler(
+ httpx_response=mock_httpx_response,
+ logging_obj=mock_logging_obj,
+ url_route="https://api.anthropic.com/v1/messages/batches",
+ result="success",
+ start_time=datetime.now(),
+ end_time=datetime.now(),
+ cache_hit=False,
+ request_body=request_body,
+ )
+
+ # Verify model was extracted correctly
+ assert result["kwargs"]["model"] == "claude-sonnet-4-5-20250929"
+
+ @patch('litellm.proxy.pass_through_endpoints.llm_provider_handlers.anthropic_passthrough_logging_handler.AnthropicPassthroughLoggingHandler.get_actual_model_id_from_router')
+ def test_batch_creation_handler_model_prefix_when_not_in_router(
+ self,
+ mock_get_model_id,
+ mock_httpx_response,
+ mock_logging_obj,
+ mock_request_body
+ ):
+ """Test that model gets 'anthropic/' prefix when not found in router"""
+ from litellm.llms.anthropic.batches.transformation import AnthropicBatchesConfig
+ from litellm.types.utils import LiteLLMBatch
+ import base64
+
+ # Model not in router - returns same model name
+ mock_get_model_id.return_value = "claude-sonnet-4-5-20250929"
+
+ mock_batch_response = LiteLLMBatch(
+ id="msgbatch_123",
+ object="batch",
+ endpoint="/v1/messages",
+ input_file_id="None",
+ completion_window="24h",
+ status="validating",
+ created_at=1704067200,
+ request_counts={"total": 1, "completed": 0, "failed": 0},
+ )
+
+ with patch.object(AnthropicBatchesConfig, 'transform_retrieve_batch_response', return_value=mock_batch_response):
+ with patch.object(AnthropicPassthroughLoggingHandler, '_store_batch_managed_object'):
+ result = AnthropicPassthroughLoggingHandler.batch_creation_handler(
+ httpx_response=mock_httpx_response,
+ logging_obj=mock_logging_obj,
+ url_route="https://api.anthropic.com/v1/messages/batches",
+ result="success",
+ start_time=datetime.now(),
+ end_time=datetime.now(),
+ cache_hit=False,
+ request_body=mock_request_body,
+ )
+
+ # Verify unified_object_id contains anthropic/ prefix
+ unified_object_id = result["kwargs"]["unified_object_id"]
+ decoded = base64.urlsafe_b64decode(unified_object_id + "==").decode()
+ assert "anthropic/claude-sonnet-4-5-20250929" in decoded or "claude-sonnet-4-5-20250929" in decoded
+
+ def test_batch_creation_handler_failure_status_code(
+ self,
+ mock_logging_obj,
+ mock_request_body
+ ):
+ """Test batch creation handler with non-200 status code"""
+ mock_response = MagicMock()
+ mock_response.status_code = 400
+ mock_response.json.return_value = {"error": "Bad request"}
+
+ result = AnthropicPassthroughLoggingHandler.batch_creation_handler(
+ httpx_response=mock_response,
+ logging_obj=mock_logging_obj,
+ url_route="https://api.anthropic.com/v1/messages/batches",
+ result="error",
+ start_time=datetime.now(),
+ end_time=datetime.now(),
+ cache_hit=False,
+ request_body=mock_request_body,
+ )
+
+ # Verify error response
+ assert result is not None
+ assert result["kwargs"]["batch_job_state"] == "failed"
+ assert result["kwargs"]["response_cost"] == 0.0
+
+ @patch('litellm.proxy.proxy_server.proxy_logging_obj')
+ def test_store_batch_managed_object_success(
+ self,
+ mock_proxy_logging_obj,
+ mock_logging_obj
+ ):
+ """Test storing batch managed object"""
+ from litellm.types.utils import LiteLLMBatch
+
+ # Setup mocks
+ mock_managed_files_hook = MagicMock()
+ mock_managed_files_hook.store_unified_object_id = AsyncMock()
+ mock_proxy_logging_obj.get_proxy_hook.return_value = mock_managed_files_hook
+
+ batch_object = LiteLLMBatch(
+ id="msgbatch_123",
+ object="batch",
+ endpoint="/v1/messages",
+ input_file_id="None",
+ completion_window="24h",
+ status="validating",
+ created_at=1704067200,
+ request_counts={"total": 1, "completed": 0, "failed": 0},
+ )
+
+ with patch('asyncio.create_task'):
+ AnthropicPassthroughLoggingHandler._store_batch_managed_object(
+ unified_object_id="test-unified-id",
+ batch_object=batch_object,
+ model_object_id="msgbatch_123",
+ logging_obj=mock_logging_obj,
+ user_id="test-user"
+ )
+
+ # Verify managed files hook was called
+ mock_proxy_logging_obj.get_proxy_hook.assert_called_once_with("managed_files")
\ No newline at end of file