
Commit 8942053

Merge pull request #17700 from BerriAI/litellm_batches_passthrough_cost_tracking
Add anthropic retrieve batches and retrieve file content support
2 parents 1107feb + 0f99517 commit 8942053

14 files changed: +2092 −27 lines


docs/my-website/docs/pass_through/anthropic_completion.md

Lines changed: 14 additions & 1 deletion
@@ -7,7 +7,7 @@ Pass-through endpoints for Anthropic - call provider-specific endpoint, in nativ
 
 | Feature | Supported | Notes |
 |-------|-------|-------|
-| Cost Tracking | ✅ | supports all models on `/messages` endpoint |
+| Cost Tracking | ✅ | supports all models on `/messages`, `/v1/messages/batches` endpoint |
 | Logging | ✅ | works across all integrations |
 | End-user Tracking | ✅ | disable prometheus tracking via `litellm.disable_end_user_cost_tracking_prometheus_only`|
 | Streaming | ✅ | |
@@ -263,6 +263,19 @@ curl https://api.anthropic.com/v1/messages/batches \
 }'
 ```
 
+:::note Configuration Required for Batch Cost Tracking
+For batch passthrough cost tracking to work properly, you need to define the Anthropic model in your `proxy_config.yaml`:
+
+```yaml
+model_list:
+  - model_name: claude-sonnet-4-5-20250929 # or any alias
+    litellm_params:
+      model: anthropic/claude-sonnet-4-5-20250929
+      api_key: os.environ/ANTHROPIC_API_KEY
+```
+
+This ensures the polling mechanism can correctly identify the provider and retrieve batch status for cost calculation.
+:::
 
 ## Advanced

litellm/batches/batch_utils.py

Lines changed: 7 additions & 7 deletions
@@ -14,7 +14,7 @@
 
 async def calculate_batch_cost_and_usage(
     file_content_dictionary: List[dict],
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"],
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"],
     model_name: Optional[str] = None,
 ) -> Tuple[float, Usage, List[str]]:
     """
@@ -37,7 +37,7 @@ async def calculate_batch_cost_and_usage(
 
 async def _handle_completed_batch(
     batch: Batch,
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"],
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"],
     model_name: Optional[str] = None,
 ) -> Tuple[float, Usage, List[str]]:
     """Helper function to process a completed batch and handle logging"""
@@ -84,7 +84,7 @@ def _get_batch_models_from_file_content(
 
 def _batch_cost_calculator(
     file_content_dictionary: List[dict],
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai",
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai",
     model_name: Optional[str] = None,
 ) -> float:
     """
@@ -186,7 +186,7 @@ def calculate_vertex_ai_batch_cost_and_usage(
 
 async def _get_batch_output_file_content_as_dictionary(
     batch: Batch,
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai",
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai",
 ) -> List[dict]:
     """
     Get the batch output file content as a list of dictionaries
@@ -225,7 +225,7 @@ def _get_file_content_as_dictionary(file_content: bytes) -> List[dict]:
 
 def _get_batch_job_cost_from_file_content(
     file_content_dictionary: List[dict],
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai",
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai",
 ) -> float:
     """
     Get the cost of a batch job from the file content
@@ -253,7 +253,7 @@ def _get_batch_job_cost_from_file_content(
 
 def _get_batch_job_total_usage_from_file_content(
     file_content_dictionary: List[dict],
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm"] = "openai",
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "hosted_vllm", "anthropic"] = "openai",
     model_name: Optional[str] = None,
 ) -> Usage:
     """
@@ -332,4 +332,4 @@ def _batch_response_was_successful(batch_job_output_file: dict) -> bool:
     Check if the batch job response status == 200
     """
     _response: dict = batch_job_output_file.get("response", None) or {}
-    return _response.get("status_code", None) == 200
+    return _response.get("status_code", None) == 200
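To make the expected shape concrete, here is a minimal hedged sketch; the entry dicts are hypothetical, and only the "response" / "status_code" check comes from `_batch_response_was_successful` in the diff above.

from litellm.batches.batch_utils import _batch_response_was_successful

# Hypothetical batch output-file entries; only the "response" -> "status_code"
# check is taken from the function shown in this diff.
succeeded = {"response": {"status_code": 200}}
failed = {"response": {"status_code": 429}}

assert _batch_response_was_successful(succeeded) is True
assert _batch_response_was_successful(failed) is False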

litellm/batches/main.py

Lines changed: 27 additions & 4 deletions
@@ -22,6 +22,7 @@
 import litellm
 from litellm._logging import verbose_logger
 from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.llms.anthropic.batches.handler import AnthropicBatchesHandler
 from litellm.llms.azure.batches.handler import AzureBatchesAPI
 from litellm.llms.bedrock.batches.handler import BedrockBatchesHandler
 from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
@@ -53,6 +54,7 @@
 openai_batches_instance = OpenAIBatchesAPI()
 azure_batches_instance = AzureBatchesAPI()
 vertex_ai_batches_instance = VertexAIBatchPrediction(gcs_bucket_name="")
+anthropic_batches_instance = AnthropicBatchesHandler()
 base_llm_http_handler = BaseLLMHTTPHandler()
 #################################################
 
@@ -355,7 +357,7 @@ def create_batch(
 @client
 async def aretrieve_batch(
     batch_id: str,
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai",
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai",
     metadata: Optional[Dict[str, str]] = None,
     extra_headers: Optional[Dict[str, str]] = None,
     extra_body: Optional[Dict[str, str]] = None,
@@ -401,7 +403,7 @@ def _handle_retrieve_batch_providers_without_provider_config(
     litellm_params: dict,
     _retrieve_batch_request: RetrieveBatchRequest,
     _is_async: bool,
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai",
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai",
 ):
     api_base: Optional[str] = None
     if custom_llm_provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS:
@@ -498,6 +500,27 @@ def _handle_retrieve_batch_providers_without_provider_config(
             timeout=timeout,
             max_retries=optional_params.max_retries,
         )
+    elif custom_llm_provider == "anthropic":
+        api_base = (
+            optional_params.api_base
+            or litellm.api_base
+            or get_secret_str("ANTHROPIC_API_BASE")
+        )
+        api_key = (
+            optional_params.api_key
+            or litellm.api_key
+            or litellm.azure_key
+            or get_secret_str("ANTHROPIC_API_KEY")
+        )
+
+        response = anthropic_batches_instance.retrieve_batch(
+            _is_async=_is_async,
+            batch_id=batch_id,
+            api_base=api_base,
+            api_key=api_key,
+            timeout=timeout,
+            max_retries=optional_params.max_retries,
+        )
     else:
         raise litellm.exceptions.BadRequestError(
             message="LiteLLM doesn't support {} for 'create_batch'. Only 'openai' is supported.".format(
@@ -517,7 +540,7 @@ def _handle_retrieve_batch_providers_without_provider_config(
 @client
 def retrieve_batch(
     batch_id: str,
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai",
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai",
     metadata: Optional[Dict[str, str]] = None,
     extra_headers: Optional[Dict[str, str]] = None,
     extra_body: Optional[Dict[str, str]] = None,
@@ -608,7 +631,7 @@ def retrieve_batch(
             api_key=optional_params.api_key,
             logging_obj=litellm_logging_obj
             or LiteLLMLoggingObj(
-                model=model or "bedrock/unknown",
+                model=model or f"{custom_llm_provider}/unknown",
                 messages=[],
                 stream=False,
                 call_type="batch_retrieve",
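As a usage illustration (not part of the commit itself), a minimal sketch of exercising the new "anthropic" provider value through the public batches API; the batch ID is a placeholder and `ANTHROPIC_API_KEY` is assumed to be set in the environment.

import asyncio

import litellm

async def check_anthropic_batch():
    # Placeholder batch ID; credentials resolve from ANTHROPIC_API_KEY per the diff above.
    batch = await litellm.aretrieve_batch(
        batch_id="msgbatch_...",
        custom_llm_provider="anthropic",
    )
    print(batch.status)

asyncio.run(check_anthropic_batch())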

litellm/files/main.py

Lines changed: 16 additions & 2 deletions
@@ -17,6 +17,7 @@
 from litellm import get_secret_str
 from litellm.litellm_core_utils.get_llm_provider_logic import get_llm_provider
 from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+from litellm.llms.anthropic.files.handler import AnthropicFilesHandler
 from litellm.llms.azure.files.handler import AzureOpenAIFilesAPI
 from litellm.llms.bedrock.files.handler import BedrockFilesHandler
 from litellm.llms.custom_httpx.http_handler import AsyncHTTPHandler, HTTPHandler
@@ -49,6 +50,7 @@
 azure_files_instance = AzureOpenAIFilesAPI()
 vertex_ai_files_instance = VertexAIFilesHandler()
 bedrock_files_instance = BedrockFilesHandler()
+anthropic_files_instance = AnthropicFilesHandler()
 #################################################
 
 
@@ -757,7 +759,7 @@ def file_list(
 @client
 async def afile_content(
     file_id: str,
-    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"] = "openai",
+    custom_llm_provider: Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"] = "openai",
     extra_headers: Optional[Dict[str, str]] = None,
     extra_body: Optional[Dict[str, str]] = None,
     **kwargs,
@@ -802,7 +804,7 @@ def file_content(
     file_id: str,
     model: Optional[str] = None,
     custom_llm_provider: Optional[
-        Union[Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm"], str]
+        Union[Literal["openai", "azure", "vertex_ai", "bedrock", "hosted_vllm", "anthropic"], str]
     ] = None,
     extra_headers: Optional[Dict[str, str]] = None,
     extra_body: Optional[Dict[str, str]] = None,
@@ -849,6 +851,18 @@ def file_content(
 
     _is_async = kwargs.pop("afile_content", False) is True
 
+    # Check if this is an Anthropic batch results request
+    if custom_llm_provider == "anthropic":
+        response = anthropic_files_instance.file_content(
+            _is_async=_is_async,
+            file_content_request=_file_content_request,
+            api_base=optional_params.api_base,
+            api_key=optional_params.api_key,
+            timeout=timeout,
+            max_retries=optional_params.max_retries,
+        )
+        return response
+
 if custom_llm_provider in OPENAI_COMPATIBLE_BATCH_AND_FILES_PROVIDERS:
     # for deepinfra/perplexity/anyscale/groq we check in get_llm_provider and pass in the api base from there
     api_base = (
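A companion sketch, under the same assumptions as above, for pulling batch results content through the files API; for Anthropic the `file_id` stands in for the batch results identifier and is a placeholder.

import asyncio

import litellm

async def fetch_anthropic_batch_results():
    # Placeholder ID; routed to AnthropicFilesHandler by the branch added above.
    content = await litellm.afile_content(
        file_id="msgbatch_...",
        custom_llm_provider="anthropic",
    )
    print(content)

asyncio.run(fetch_anthropic_batch_results())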
litellm/llms/anthropic/batches/__init__.py

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+from .handler import AnthropicBatchesHandler
+from .transformation import AnthropicBatchesConfig
+
+__all__ = ["AnthropicBatchesHandler", "AnthropicBatchesConfig"]
+
litellm/llms/anthropic/batches/handler.py

Lines changed: 168 additions & 0 deletions
@@ -0,0 +1,168 @@
+"""
+Anthropic Batches API Handler
+"""
+
+import asyncio
+from typing import TYPE_CHECKING, Any, Coroutine, Optional, Union
+
+import httpx
+
+from litellm.llms.custom_httpx.http_handler import (
+    get_async_httpx_client,
+)
+from litellm.types.utils import LiteLLMBatch, LlmProviders
+
+if TYPE_CHECKING:
+    from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObj
+else:
+    LiteLLMLoggingObj = Any
+
+from ..common_utils import AnthropicModelInfo
+from .transformation import AnthropicBatchesConfig
+
+
+class AnthropicBatchesHandler:
+    """
+    Handler for Anthropic Message Batches API.
+
+    Supports:
+    - retrieve_batch() - Retrieve batch status and information
+    """
+
+    def __init__(self):
+        self.anthropic_model_info = AnthropicModelInfo()
+        self.provider_config = AnthropicBatchesConfig()
+
+    async def aretrieve_batch(
+        self,
+        batch_id: str,
+        api_base: Optional[str],
+        api_key: Optional[str],
+        timeout: Union[float, httpx.Timeout],
+        max_retries: Optional[int],
+        logging_obj: Optional[LiteLLMLoggingObj] = None,
+    ) -> LiteLLMBatch:
+        """
+        Async: Retrieve a batch from Anthropic.
+
+        Args:
+            batch_id: The batch ID to retrieve
+            api_base: Anthropic API base URL
+            api_key: Anthropic API key
+            timeout: Request timeout
+            max_retries: Max retry attempts (unused for now)
+            logging_obj: Optional logging object
+
+        Returns:
+            LiteLLMBatch: Batch information in OpenAI format
+        """
+        # Resolve API credentials
+        api_base = api_base or self.anthropic_model_info.get_api_base(api_base)
+        api_key = api_key or self.anthropic_model_info.get_api_key()
+
+        if not api_key:
+            raise ValueError("Missing Anthropic API Key")
+
+        # Create a minimal logging object if not provided
+        if logging_obj is None:
+            from litellm.litellm_core_utils.litellm_logging import Logging as LiteLLMLoggingObjClass
+            logging_obj = LiteLLMLoggingObjClass(
+                model="anthropic/unknown",
+                messages=[],
+                stream=False,
+                call_type="batch_retrieve",
+                start_time=None,
+                litellm_call_id=f"batch_retrieve_{batch_id}",
+                function_id="batch_retrieve",
+            )
+
+        # Get the complete URL for batch retrieval
+        retrieve_url = self.provider_config.get_retrieve_batch_url(
+            api_base=api_base,
+            batch_id=batch_id,
+            optional_params={},
+            litellm_params={},
+        )
+
+        # Validate environment and get headers
+        headers = self.provider_config.validate_environment(
+            headers={},
+            model="",
+            messages=[],
+            optional_params={},
+            litellm_params={},
+            api_key=api_key,
+            api_base=api_base,
+        )
+
+        logging_obj.pre_call(
+            input=batch_id,
+            api_key=api_key,
+            additional_args={
+                "api_base": retrieve_url,
+                "headers": headers,
+                "complete_input_dict": {},
+            },
+        )
+        # Make the request
+        async_client = get_async_httpx_client(llm_provider=LlmProviders.ANTHROPIC)
+        response = await async_client.get(
+            url=retrieve_url,
+            headers=headers
+        )
+        response.raise_for_status()
+
+        # Transform response to LiteLLM format
+        return self.provider_config.transform_retrieve_batch_response(
+            model=None,
+            raw_response=response,
+            logging_obj=logging_obj,
+            litellm_params={},
+        )
+
+    def retrieve_batch(
+        self,
+        _is_async: bool,
+        batch_id: str,
+        api_base: Optional[str],
+        api_key: Optional[str],
+        timeout: Union[float, httpx.Timeout],
+        max_retries: Optional[int],
+        logging_obj: Optional[LiteLLMLoggingObj] = None,
+    ) -> Union[LiteLLMBatch, Coroutine[Any, Any, LiteLLMBatch]]:
+        """
+        Retrieve a batch from Anthropic.
+
+        Args:
+            _is_async: Whether to run asynchronously
+            batch_id: The batch ID to retrieve
+            api_base: Anthropic API base URL
+            api_key: Anthropic API key
+            timeout: Request timeout
+            max_retries: Max retry attempts (unused for now)
+            logging_obj: Optional logging object
+
+        Returns:
+            LiteLLMBatch or Coroutine: Batch information in OpenAI format
+        """
+        if _is_async:
+            return self.aretrieve_batch(
+                batch_id=batch_id,
+                api_base=api_base,
+                api_key=api_key,
+                timeout=timeout,
+                max_retries=max_retries,
+                logging_obj=logging_obj,
+            )
+        else:
+            return asyncio.run(
+                self.aretrieve_batch(
+                    batch_id=batch_id,
+                    api_base=api_base,
+                    api_key=api_key,
+                    timeout=timeout,
+                    max_retries=max_retries,
+                    logging_obj=logging_obj,
+                )
+            )
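For reference, a hedged sketch of calling the new handler directly with the signatures above; the batch ID is a placeholder and the API key is read from the environment.

import asyncio
import os

from litellm.llms.anthropic.batches.handler import AnthropicBatchesHandler

async def main():
    handler = AnthropicBatchesHandler()
    # Placeholder batch ID; api_base=None lets the handler resolve the Anthropic base URL.
    batch = await handler.aretrieve_batch(
        batch_id="msgbatch_...",
        api_base=None,
        api_key=os.environ.get("ANTHROPIC_API_KEY"),
        timeout=600.0,
        max_retries=None,
    )
    print(batch.status)

asyncio.run(main())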
