From d4be2d133c90a126ba172a311555785be891ac9e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=8E=E7=9B=B4?= Date: Tue, 27 Jan 2026 17:13:25 +0800 Subject: [PATCH 1/2] Add OpenTelemetry instrumentation for litellm --- .../LICENSE | 201 ++++++++ .../README.rst | 67 +++ .../pyproject.toml | 53 ++ .../instrumentation/litellm/__init__.py | 213 ++++++++ .../litellm/_embedding_wrapper.py | 190 +++++++ .../litellm/_stream_wrapper.py | 235 +++++++++ .../instrumentation/litellm/_utils.py | 439 ++++++++++++++++ .../instrumentation/litellm/_wrapper.py | 469 ++++++++++++++++++ .../instrumentation/litellm/package.py | 17 + .../instrumentation/litellm/version.py | 15 + .../test-requirements.txt | 8 + 11 files changed, 1907 insertions(+) create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/LICENSE create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/__init__.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/package.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/version.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/test-requirements.txt diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/LICENSE b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/LICENSE new file mode 100644 index 000000000..261eeb9e9 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/LICENSE @@ -0,0 +1,201 @@ + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. 
+ + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. 
If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright [yyyy] [name of copyright owner] + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst
new file mode 100644
index 000000000..056ca427d
--- /dev/null
+++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/README.rst
@@ -0,0 +1,67 @@
+OpenTelemetry LiteLLM Instrumentation
+======================================
+
+|pypi|
+
+.. |pypi| image:: https://badge.fury.io/py/opentelemetry-instrumentation-litellm.svg
+   :target: https://pypi.org/project/opentelemetry-instrumentation-litellm/
+
+This library provides automatic instrumentation for the
+`LiteLLM <https://github.com/BerriAI/litellm>`_ library, which offers
+a unified interface to 100+ LLM providers.
+
+Installation
+------------
+
+::
+
+    pip install opentelemetry-instrumentation-litellm
+
+Configuration
+-------------
+
+The instrumentation can be enabled/disabled using environment variables:
+
+* ``ARMS_LITELLM_INSTRUMENTATION_ENABLED``: Enable/disable instrumentation (default: true)
+
+Usage
+-----
+
+.. code:: python
+
+    from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor
+    import litellm
+
+    # Instrument LiteLLM
+    LiteLLMInstrumentor().instrument()
+
+    # Use LiteLLM as normal
+    response = litellm.completion(
+        model="gpt-3.5-turbo",
+        messages=[{"role": "user", "content": "Hello!"}]
+    )
+
+Features
+--------
+
+This instrumentation automatically captures:
+
+* LLM completion calls (sync and async)
+* Streaming completions
+* Embedding calls
+* Image generation calls
+* Retry mechanisms
+* Tool/function calls
+* Request and response metadata
+* Token usage
+* Model information
+
+The instrumentation follows OpenTelemetry semantic conventions for GenAI operations.
+
+References
+----------
+
+* `OpenTelemetry LiteLLM Instrumentation <https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-litellm>`_
+* `OpenTelemetry Project <https://opentelemetry.io/>`_
+* `LiteLLM Documentation <https://docs.litellm.ai/>`_
+
diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml
new file mode 100644
index 000000000..13d8fe7ec
--- /dev/null
+++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/pyproject.toml
@@ -0,0 +1,53 @@
+[build-system]
+requires = ["hatchling"]
+build-backend = "hatchling.build"
+
+[project]
+name = "opentelemetry-instrumentation-litellm"
+dynamic = ["version"]
+description = "OpenTelemetry LiteLLM instrumentation"
+readme = "README.rst"
+license = "Apache-2.0"
+requires-python = ">=3.8"
+authors = [
+  { name = "OpenTelemetry Authors", email = "cncf-opentelemetry-contributors@lists.cncf.io" },
+]
+classifiers = [
+  "Development Status :: 4 - Beta",
+  "Intended Audience :: Developers",
+  "License :: OSI Approved :: Apache Software License",
+  "Programming Language :: Python",
+  "Programming Language :: Python :: 3",
+  "Programming Language :: Python :: 3.8",
+  "Programming Language :: Python :: 3.9",
+  "Programming Language :: Python :: 3.10",
+  "Programming Language :: Python :: 3.11",
+  "Programming Language :: Python :: 3.12",
+  "Programming Language :: Python :: 3.13",
+]
+dependencies = []
+
+[project.optional-dependencies]
+instruments = [
+  "litellm >= 1.0.0",
+]
+
+[project.entry-points.opentelemetry_instrumentor]
+litellm = "opentelemetry.instrumentation.litellm:LiteLLMInstrumentor"
+
+[project.urls]
+Homepage = "https://github.com/open-telemetry/opentelemetry-python-contrib/tree/main/instrumentation/opentelemetry-instrumentation-litellm"
+Repository = "https://github.com/open-telemetry/opentelemetry-python-contrib"
+
+[tool.hatch.version]
+path = "src/opentelemetry/instrumentation/litellm/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/__init__.py new file mode 100644 index 000000000..abcf7b5c2 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/__init__.py @@ -0,0 +1,213 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +OpenTelemetry LiteLLM Instrumentation +====================================== + +This library provides automatic instrumentation for the LiteLLM library. + +Installation +------------ + +:: + + pip install opentelemetry-instrumentation-litellm + +Configuration +------------- + +The instrumentation can be configured using environment variables: + +* ``ENABLE_LITELLM_INSTRUMENTOR``: Enable/disable instrumentation (default: true) +* ``ARMS_LITELLM_INSTRUMENTATION_ENABLED``: Alternative enable/disable flag (default: true) + +Usage +----- + +.. code:: python + + from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor + import litellm + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument() + + # Use LiteLLM as normal + response = litellm.completion( + model="gpt-3.5-turbo", + messages=[{"role": "user", "content": "Hello!"}] + ) + +API +--- +""" + +import logging +from typing import Any, Callable, Collection, Dict + +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor + +# Import wrappers +from opentelemetry.instrumentation.litellm._embedding_wrapper import ( + AsyncEmbeddingWrapper, + EmbeddingWrapper, +) +from opentelemetry.instrumentation.litellm._wrapper import ( + AsyncCompletionWrapper, + CompletionWrapper, +) +from opentelemetry.instrumentation.litellm.package import _instruments + +# Import ExtendedTelemetryHandler +from opentelemetry.util.genai.extended_handler import ( + ExtendedTelemetryHandler, +) + +try: + import litellm +except ImportError: + litellm = None + + +logger = logging.getLogger(__name__) + +__all__ = ["LiteLLMInstrumentor"] + + +class LiteLLMInstrumentor(BaseInstrumentor): + """ + An instrumentor for the LiteLLM library. + + This class provides automatic instrumentation for LiteLLM, including: + - Chat completion calls (sync and async) + - Streaming completions + - Embedding calls + - Retry mechanisms + - Tool/function calls + """ + + def __init__(self): + super().__init__() + self._original_functions: Dict[str, Callable] = {} + self._handler = None + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs: Any): + """ + Instrument the LiteLLM library. 
+ + This method sets up instrumentation for all LiteLLM functions, + including completion, embedding, and retry functions. + """ + super()._instrument(**kwargs) + + if litellm is None: + logger.warning("LiteLLM not found, skipping instrumentation") + return + + # Get providers + tracer_provider = kwargs.get("tracer_provider") + meter_provider = kwargs.get("meter_provider") + logger_provider = kwargs.get("logger_provider") + + # Create unified Handler + self._handler = ExtendedTelemetryHandler( + tracer_provider=tracer_provider, + meter_provider=meter_provider, + logger_provider=logger_provider, + ) + + # Save original functions + functions_to_wrap = [ + "completion", + "acompletion", + "embedding", + "aembedding", + "completion_with_retries", + "acompletion_with_retries", + ] + + for func_name in functions_to_wrap: + if hasattr(litellm, func_name): + self._original_functions[func_name] = getattr( + litellm, func_name + ) + + # Wrap functions + if "completion" in self._original_functions: + completion_wrapper = CompletionWrapper( + self._handler, self._original_functions["completion"] + ) + litellm.completion = completion_wrapper + + if "acompletion" in self._original_functions: + async_completion_wrapper = AsyncCompletionWrapper( + self._handler, self._original_functions["acompletion"] + ) + litellm.acompletion = async_completion_wrapper + + if "embedding" in self._original_functions: + litellm.embedding = EmbeddingWrapper( + self._handler, self._original_functions["embedding"] + ) + + if "aembedding" in self._original_functions: + litellm.aembedding = AsyncEmbeddingWrapper( + self._handler, self._original_functions["aembedding"] + ) + + # Wrap retry functions to use our wrapped completion functions + # Note: LiteLLM's retry functions internally reference the completion function at definition time, + # so we need to recreate them to use our wrapped versions + if "completion_with_retries" in self._original_functions: + # Create a new retry wrapper that calls our wrapped completion + def completion_with_retries_wrapper(*args, **kwargs): + # Use the wrapped completion function + return litellm.completion(*args, **kwargs) + + litellm.completion_with_retries = completion_with_retries_wrapper + + if "acompletion_with_retries" in self._original_functions: + # Create a new async retry wrapper that calls our wrapped acompletion + async def acompletion_with_retries_wrapper(*args, **kwargs): + # Use the wrapped acompletion function + return await litellm.acompletion(*args, **kwargs) + + litellm.acompletion_with_retries = acompletion_with_retries_wrapper + + logger.info("LiteLLM instrumentation enabled") + + def _uninstrument(self, **kwargs: Any): + """ + Uninstrument the LiteLLM library. + This method removes all instrumentation and restores + original LiteLLM functions. 
+ """ + if litellm is None: + logger.warning("LiteLLM not found, skipping uninstrumentation") + return + + # Restore original functions + for func_name, original_func in self._original_functions.items(): + if hasattr(litellm, func_name): + setattr(litellm, func_name, original_func) + + # Clear saved functions + self._original_functions.clear() + self._handler = None + + logger.info("LiteLLM instrumentation disabled") diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py new file mode 100644 index 000000000..bbfb55008 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py @@ -0,0 +1,190 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Embedding wrapper for LiteLLM instrumentation. +""" + +import logging +import os +from typing import Callable + +from opentelemetry import context +from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.instrumentation.litellm._utils import ( + SUPPRESS_LLM_SDK_KEY, + create_embedding_invocation_from_litellm, +) +from opentelemetry.trace import get_current_span +from opentelemetry.util.genai.types import Error + +logger = logging.getLogger(__name__) + + +def _is_instrumentation_enabled() -> bool: + """Check if instrumentation is enabled via environment variable.""" + enabled = os.getenv("ARMS_LITELLM_INSTRUMENTATION_ENABLED", "true").lower() + return enabled != "false" + + +class EmbeddingWrapper: + """Wrapper for litellm.embedding()""" + + def __init__(self, handler, original_func: Callable): + self._handler = handler + self.original_func = original_func + + def __call__(self, *args, **kwargs): + """Wrap litellm.embedding()""" + # Check if instrumentation is enabled + if not _is_instrumentation_enabled(): + return self.original_func(*args, **kwargs) + + # Check suppression context + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return self.original_func(*args, **kwargs) + + # Check if LLM SDK is suppressed + if context.get_value(SUPPRESS_LLM_SDK_KEY): + if get_current_span().get_span_context().is_valid: + return self.original_func(*args, **kwargs) + + # Create invocation object + + # Create invocation object + invocation = create_embedding_invocation_from_litellm(**kwargs) + + # Set SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + pass + + # Start Embedding invocation + self._handler.start_embedding(invocation) + + try: + # Call original function + response = self.original_func(*args, **kwargs) + + # Extract response metadata + if hasattr(response, "model"): + invocation.response_model_name = response.model + + # Extract token usage 
if available + if hasattr(response, "usage") and response.usage: + invocation.input_tokens = getattr( + response.usage, "prompt_tokens", None + ) + invocation.output_tokens = getattr( + response.usage, "total_tokens", None + ) + + # End Embedding invocation successfully + self._handler.stop_embedding(invocation) + + return response + + except Exception as e: + # Fail Embedding invocation + self._handler.fail_embedding( + invocation, Error(message=str(e), type=type(e)) + ) + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + + +class AsyncEmbeddingWrapper: + """Wrapper for litellm.aembedding()""" + + def __init__(self, handler, original_func: Callable): + self._handler = handler + self.original_func = original_func + + async def __call__(self, *args, **kwargs): + """Wrap litellm.aembedding()""" + # Check if instrumentation is enabled + if not _is_instrumentation_enabled(): + return await self.original_func(*args, **kwargs) + + # Check suppression context + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await self.original_func(*args, **kwargs) + + # Check if LLM SDK is suppressed + if context.get_value(SUPPRESS_LLM_SDK_KEY): + if get_current_span().get_span_context().is_valid: + return await self.original_func(*args, **kwargs) + + # Create invocation object + + # Create invocation object + invocation = create_embedding_invocation_from_litellm(**kwargs) + + # Set SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + pass + + # Start Embedding invocation + self._handler.start_embedding(invocation) + + try: + # Call original function + response = await self.original_func(*args, **kwargs) + + # Extract response metadata + if hasattr(response, "model"): + invocation.response_model_name = response.model + + # Extract token usage if available + if hasattr(response, "usage") and response.usage: + invocation.input_tokens = getattr( + response.usage, "prompt_tokens", None + ) + invocation.output_tokens = getattr( + response.usage, "total_tokens", None + ) + + # End Embedding invocation successfully + self._handler.stop_embedding(invocation) + + return response + + except Exception as e: + # Fail Embedding invocation + self._handler.fail_embedding( + invocation, Error(message=str(e), type=type(e)) + ) + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py new file mode 100644 index 000000000..050fa2552 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_stream_wrapper.py @@ -0,0 +1,235 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Stream wrapper for LiteLLM streaming responses.
+"""
+
+import logging
+from typing import Any, Iterator, Optional
+
+logger = logging.getLogger(__name__)
+
+
+class StreamWrapper:
+    """
+    Wrapper for synchronous streaming responses.
+    Note: To avoid memory leaks, we only keep the last chunk instead of all chunks.
+    This is sufficient for extracting usage information which is typically in the last chunk.
+
+    Supports context manager protocol for reliable cleanup.
+    """
+
+    def __init__(self, stream: Iterator, span: Any, callback: callable):
+        self.stream = stream
+        self.span = span
+        self.callback = callback
+        self.last_chunk = None  # Only keep last chunk to avoid memory leak
+        self.chunk_count = 0
+        self._finalized = False
+        self.accumulated_content = []  # Accumulate content for output messages
+        self.accumulated_tool_calls = []  # Accumulate tool calls
+
+    def __iter__(self):
+        return self
+
+    def __next__(self):
+        try:
+            chunk = next(self.stream)
+
+            # Accumulate content from delta for output messages
+            if hasattr(chunk, "choices") and chunk.choices:
+                choice = chunk.choices[0]
+                if hasattr(choice, "delta"):
+                    delta = choice.delta
+                    # Accumulate text content
+                    if hasattr(delta, "content") and delta.content:
+                        self.accumulated_content.append(delta.content)
+                    # Accumulate tool calls
+                    if hasattr(delta, "tool_calls") and delta.tool_calls:
+                        self.accumulated_tool_calls.extend(delta.tool_calls)
+
+            # Only keep the last chunk (contains usage info)
+            self.last_chunk = chunk
+            self.chunk_count += 1
+
+            return chunk
+        except StopIteration:
+            # Stream ended normally, finalize span
+            self._finalize()
+            raise
+        except Exception as e:
+            # Error during streaming
+            logger.debug(f"Error during streaming: {e}")
+            self._finalize(error=e)
+            raise
+        finally:
+            # No per-chunk cleanup is needed here. The suppression context
+            # (SUPPRESS_LLM_SDK_KEY) attached by the calling wrapper is detached
+            # in that wrapper's own finally block, not per chunk, and span
+            # finalization is deferred to _finalize(), which runs when the
+            # stream is exhausted, an error is raised, or the wrapper is
+            # closed explicitly via close() or the context manager protocol.
+            pass
+
+    def __enter__(self):
+        """Support context manager protocol."""
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Ensure finalization on context exit."""
+        if exc_type is not None:
+            # Exception occurred during iteration
+            self._finalize(error=exc_val)
+        else:
+            # Normal exit (may have completed or early terminated)
+            self._finalize()
+        return False
+
+    def close(self):
+        """Explicitly close and finalize the stream."""
+        self._finalize()
+
+    def _finalize(self, error: Optional[Exception] = None):
+        """Finalize the span with data from last chunk."""
+        if self._finalized:
+            return
+
+        self._finalized = True
+        try:
+            # Call the callback with only the last chunk
+            # Note: The callback is responsible for calling handler.stop_llm() or handler.fail_llm()
+            # which will end the span. We no longer call span.end() here.
+ if self.callback: + self.callback(self.span, self.last_chunk, error) + + # Clear reference to avoid holding memory + self.last_chunk = None + except Exception as e: + logger.debug(f"Error finalizing stream: {e}") + + +class AsyncStreamWrapper: + """ + Wrapper for asynchronous streaming responses. + Note: To avoid memory leaks, we only keep the last chunk instead of all chunks. + This is sufficient for extracting usage information which is typically in the last chunk. + + Important: AsyncStreamWrapper must be consumed within an async context that ensures + finalization, either by: + 1. Using as an async context manager: async with response: ... + 2. Explicitly calling close() after iteration + 3. Letting the wrapper detect stream exhaustion + """ + + def __init__(self, stream, span: Any, callback: callable): + self.stream = stream + self.span = span + self.callback = callback + self.last_chunk = None # Only keep last chunk to avoid memory leak + self.chunk_count = 0 + self._finalized = False + self._stream_exhausted = False + self.accumulated_content = [] # Accumulate content for output messages + self.accumulated_tool_calls = [] # Accumulate tool calls + + def __aiter__(self): + # Return an async generator that wraps the stream and ensures finalization + return self._wrapped_iteration() + + async def _wrapped_iteration(self): + """ + Async generator that wraps the underlying stream and ensures finalization. + This approach guarantees that _finalize() is called when: + 1. The stream is exhausted normally + 2. An exception occurs + 3. The generator is closed early (via aclose()) + """ + try: + async for chunk in self.stream: + # Accumulate content from delta for output messages + if hasattr(chunk, "choices") and chunk.choices: + choice = chunk.choices[0] + if hasattr(choice, "delta"): + delta = choice.delta + # Accumulate text content + if hasattr(delta, "content") and delta.content: + self.accumulated_content.append(delta.content) + # Accumulate tool calls + if hasattr(delta, "tool_calls") and delta.tool_calls: + self.accumulated_tool_calls.extend( + delta.tool_calls + ) + + # Only keep the last chunk (contains usage info) + self.last_chunk = chunk + self.chunk_count += 1 + + yield chunk + + # Stream exhausted normally + logger.debug( + f"AsyncStreamWrapper: Stream completed (chunks: {self.chunk_count})" + ) + except Exception as e: + # Error during streaming + logger.debug(f"AsyncStreamWrapper: Error during streaming: {e}") + self._finalize(error=e) + raise + finally: + # Always finalize, whether completed normally, with error, or closed early + self._finalize() + + async def __aenter__(self): + """Support async context manager protocol.""" + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + """Ensure finalization on async context exit.""" + if exc_type is not None: + # Exception occurred during iteration + self._finalize(error=exc_val) + else: + # Normal exit (may have completed or early terminated) + self._finalize() + return False + + async def aclose(self): + """Explicitly close and finalize the async stream.""" + self._finalize() + + def close(self): + """Synchronous close method for compatibility.""" + self._finalize() + + def _finalize(self, error: Optional[Exception] = None): + """Finalize the span with data from last chunk.""" + if self._finalized: + return + + self._finalized = True + try: + # Call the callback with only the last chunk + # Note: The callback is responsible for calling handler.stop_llm() or handler.fail_llm() + # which will end the span. 
We no longer call span.end() here. + if self.callback: + try: + self.callback(self.span, self.last_chunk, error) + except Exception as callback_error: + logger.debug(f"Error in stream callback: {callback_error}") + + # Clear reference to avoid holding memory + self.last_chunk = None + except Exception as e: + logger.debug(f"Error finalizing async stream: {e}") diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py new file mode 100644 index 000000000..8b54a6150 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_utils.py @@ -0,0 +1,439 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Utility functions for LiteLLM instrumentation. +""" + +import json +import logging +from typing import Any, Dict, List, Optional + +from opentelemetry import context +from opentelemetry.semconv._incubating.attributes.gen_ai_attributes import ( + GenAiOperationNameValues, +) +from opentelemetry.util.genai.extended_types import EmbeddingInvocation +from opentelemetry.util.genai.types import ( + FunctionToolDefinition, + InputMessage, + LLMInvocation, + OutputMessage, + Text, + ToolCall, + ToolCallResponse, +) + +logger = logging.getLogger(__name__) + +# Global Context key to suppress nested LLM SDK instrumentation +# This prevents double instrumentation when litellm calls underlying SDKs like OpenAI +# Defining it here ensures all wrappers share the same key instance. +SUPPRESS_LLM_SDK_KEY = context.create_key("suppress_llm_sdk_instrumentation") + + +def convert_messages_to_structured_format( + messages: List[Dict[str, Any]], +) -> List[Dict[str, Any]]: + """ + Convert LiteLLM message format to structured format required by semantic conventions. 
+ + Converts from: + {"role": "user", "content": "..."} + To: + {"role": "user", "parts": [{"type": "text", "content": "..."}]} + """ + if not isinstance(messages, list): + return [] + + structured_messages = [] + for msg in messages: + if not isinstance(msg, dict): + continue + + role = msg.get("role", "") + structured_msg = {"role": role, "parts": []} + + # Handle text content + if "content" in msg and msg["content"]: + content = msg["content"] + if isinstance(content, str): + structured_msg["parts"].append( + {"type": "text", "content": content} + ) + elif isinstance(content, list): + # Handle multi-modal content + for item in content: + if isinstance(item, dict): + if item.get("type") == "text": + structured_msg["parts"].append( + { + "type": "text", + "content": item.get("text", ""), + } + ) + else: + structured_msg["parts"].append(item) + + # Handle tool calls + if "tool_calls" in msg and msg["tool_calls"]: + for tool_call in msg["tool_calls"]: + if not isinstance(tool_call, dict): + continue + + tool_part = {"type": "tool_call"} + if "id" in tool_call: + tool_part["id"] = tool_call["id"] + if "function" in tool_call: + func = tool_call["function"] + if isinstance(func, dict): + if "name" in func: + tool_part["name"] = func["name"] + if "arguments" in func: + try: + # Try to parse arguments if it's a JSON string + args_str = func["arguments"] + if isinstance(args_str, str): + tool_part["arguments"] = json.loads( + args_str + ) + else: + tool_part["arguments"] = args_str + except Exception: + tool_part["arguments"] = func.get( + "arguments", "" + ) + + structured_msg["parts"].append(tool_part) + + # Handle tool call responses + if role == "tool" and "content" in msg: + tool_response_part = { + "type": "tool_call_response", + "response": msg["content"], + } + if "tool_call_id" in msg: + tool_response_part["id"] = msg["tool_call_id"] + structured_msg["parts"].append(tool_response_part) + + structured_messages.append(structured_msg) + + return structured_messages + + +def parse_provider_from_model(model: str) -> Optional[str]: + """ + Parse provider name from model string. + + LiteLLM uses format like "openai/gpt-4", "dashscope/qwen-turbo", etc. + """ + if not model: + return None + + if "/" in model: + return model.split("/")[0] + + # Fallback: try to infer from model name patterns + if "gpt" in model.lower(): + return "openai" + elif "qwen" in model.lower(): + return "dashscope" + elif "claude" in model.lower(): + return "anthropic" + elif "gemini" in model.lower(): + return "google" + + return "unknown" + + +def parse_model_name(model: str) -> str: + """ + Parse model name by removing provider prefix. + + Examples: + "openai/gpt-4" -> "gpt-4" + "dashscope/qwen-turbo" -> "qwen-turbo" + "gpt-4" -> "gpt-4" + """ + if not model: + return "unknown" + + if "/" in model: + return model.split("/", 1)[1] + + return model + + +def safe_json_dumps(obj: Any, default: str = "{}") -> str: + """ + Safely serialize object to JSON string. + """ + try: + return json.dumps(obj, ensure_ascii=False) + except Exception as e: + logger.debug(f"Failed to serialize object to JSON: {e}") + return default + + +def convert_tool_definitions(tools: List[Dict[str, Any]]) -> str: + """ + Convert tool definitions to JSON string format. 
+ """ + if not tools: + return "[]" + + try: + # Tools are typically in format: [{"type": "function", "function": {...}}] + return json.dumps(tools, ensure_ascii=False) + except Exception as e: + logger.debug(f"Failed to convert tool definitions: {e}") + return "[]" + + +def convert_litellm_messages_to_genai_format( + messages: List[Dict[str, Any]], +) -> List: + """ + Convert LiteLLM message format to OpenTelemetry GenAI InputMessage format. + + This function converts LiteLLM's message structure to the standardized + InputMessage format required by ExtendedTelemetryHandler. + """ + + if not isinstance(messages, list): + return [] + + input_messages = [] + for msg in messages: + if not isinstance(msg, dict): + continue + + role = msg.get("role", "user") + parts = [] + + # Handle text content + if "content" in msg and msg["content"]: + content = msg["content"] + if isinstance(content, str): + parts.append(Text(content=content)) + elif isinstance(content, list): + # Handle multi-modal content + for item in content: + if isinstance(item, dict) and item.get("type") == "text": + parts.append(Text(content=item.get("text", ""))) + # Other content types (image, etc.) can be added here + + # Handle tool calls + if "tool_calls" in msg and msg["tool_calls"]: + for tool_call in msg["tool_calls"]: + if not isinstance(tool_call, dict): + continue + + func = tool_call.get("function", {}) + if isinstance(func, dict): + # Parse arguments if it's a JSON string + arguments = func.get("arguments", "") + if isinstance(arguments, str) and arguments: + try: + arguments = json.loads(arguments) + except Exception: + pass + + parts.append( + ToolCall( + id=tool_call.get("id"), + name=func.get("name", ""), + arguments=arguments, + ) + ) + + # Handle tool call responses + if role == "tool" and "content" in msg: + parts.append( + ToolCallResponse( + id=msg.get("tool_call_id"), response=msg["content"] + ) + ) + + # If no parts added, add empty text + if not parts: + parts.append(Text(content="")) + + input_messages.append(InputMessage(role=role, parts=parts)) + + return input_messages + + +def extract_output_from_litellm_response(response: Any) -> List: + """ + Extract output messages from LiteLLM response. + + Converts LiteLLM response to OpenTelemetry GenAI OutputMessage format. + """ + + if not hasattr(response, "choices") or not response.choices: + return [] + + output_messages = [] + for choice in response.choices: + if not hasattr(choice, "message"): + continue + + msg = choice.message + parts = [] + + # Extract text content + if hasattr(msg, "content") and msg.content: + parts.append(Text(content=msg.content)) + + # Extract tool calls + if hasattr(msg, "tool_calls") and msg.tool_calls: + for tc in msg.tool_calls: + # Parse arguments if it's a JSON string + arguments = getattr(tc.function, "arguments", "") + if isinstance(arguments, str) and arguments: + try: + arguments = json.loads(arguments) + except Exception: + pass + + parts.append( + ToolCall( + id=getattr(tc, "id", None), + name=getattr(tc.function, "name", ""), + arguments=arguments, + ) + ) + + # If no parts, add empty text + if not parts: + parts.append(Text(content="")) + + finish_reason = getattr(choice, "finish_reason", "stop") or "stop" + + output_messages.append( + OutputMessage( + role=getattr(msg, "role", "assistant"), + parts=parts, + finish_reason=finish_reason, + ) + ) + + return output_messages + + +def create_llm_invocation_from_litellm(**kwargs): + """ + Create LLMInvocation from LiteLLM request parameters. 
+ + Args: + model: The model name (e.g., "gpt-4", "openai/gpt-4") + provider: The provider name (e.g., "openai", "dashscope") + messages: List of message dictionaries + **kwargs: Additional request parameters (temperature, max_tokens, etc.) + + Returns: + LLMInvocation object ready for use with ExtendedTelemetryHandler + """ + + # Parse model name (remove provider prefix if present) + model = kwargs.get("model", "unknown_model") + provider = parse_provider_from_model(model) or "unknown" + messages = kwargs.get("messages", []) + + # Convert messages to GenAI format + input_messages = convert_litellm_messages_to_genai_format(messages) + + request_model = parse_model_name(model) + + invocation = LLMInvocation( + request_model=request_model, + provider=provider or "unknown", + operation_name=GenAiOperationNameValues.CHAT.value, + input_messages=input_messages, + ) + + # Set optional request parameters + if "temperature" in kwargs and kwargs["temperature"] is not None: + invocation.temperature = kwargs["temperature"] + if "max_tokens" in kwargs and kwargs["max_tokens"] is not None: + invocation.max_tokens = kwargs["max_tokens"] + if "top_p" in kwargs and kwargs["top_p"] is not None: + invocation.top_p = kwargs["top_p"] + if ( + "frequency_penalty" in kwargs + and kwargs["frequency_penalty"] is not None + ): + invocation.frequency_penalty = kwargs["frequency_penalty"] + if "presence_penalty" in kwargs and kwargs["presence_penalty"] is not None: + invocation.presence_penalty = kwargs["presence_penalty"] + if "seed" in kwargs and kwargs["seed"] is not None: + invocation.seed = kwargs["seed"] + if "stop" in kwargs and kwargs["stop"] is not None: + stop = kwargs["stop"] + if isinstance(stop, str): + invocation.stop_sequences = [stop] + elif isinstance(stop, list): + invocation.stop_sequences = stop + + # Handle tool definitions + if "tools" in kwargs and kwargs["tools"]: + tools = kwargs["tools"] + tool_definitions = [] + for tool in tools: + if isinstance(tool, dict) and "function" in tool: + func = tool["function"] + tool_definitions.append( + FunctionToolDefinition( + name=func.get("name", ""), + description=func.get("description"), + parameters=func.get("parameters"), + ) + ) + if tool_definitions: + invocation.tool_definitions = tool_definitions + + return invocation + + +def create_embedding_invocation_from_litellm(**kwargs): + """ + Create EmbeddingInvocation from LiteLLM embedding request parameters. 
+ + Args: + model: The embedding model name + provider: The provider name + **kwargs: Additional request parameters + + Returns: + EmbeddingInvocation object ready for use with ExtendedTelemetryHandler + """ + + # Extract request parameters + model = kwargs.get("model", "unknown") + provider = parse_provider_from_model(model) or "unknown" + + # Parse model name (remove provider prefix if present) + request_model = parse_model_name(model) + + invocation = EmbeddingInvocation( + request_model=request_model, + provider=provider or "unknown", + ) + + # Set encoding formats if present + if "encoding_format" in kwargs and kwargs["encoding_format"]: + invocation.encoding_formats = [kwargs["encoding_format"]] + + return invocation diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py new file mode 100644 index 000000000..fa534d3ef --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_wrapper.py @@ -0,0 +1,469 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Wrapper functions for LiteLLM completion instrumentation. 
+""" + +import json +import logging +import os +from typing import Any, Callable, Optional + +from opentelemetry import context +from opentelemetry.context import _SUPPRESS_INSTRUMENTATION_KEY +from opentelemetry.instrumentation.litellm._stream_wrapper import ( + AsyncStreamWrapper, + StreamWrapper, +) +from opentelemetry.instrumentation.litellm._utils import ( + SUPPRESS_LLM_SDK_KEY, + create_llm_invocation_from_litellm, + extract_output_from_litellm_response, +) +from opentelemetry.trace import get_current_span +from opentelemetry.util.genai.types import ( + Error, + OutputMessage, + Text, + ToolCall, +) + +logger = logging.getLogger(__name__) + +# Environment variable to control instrumentation +LITELLM_INSTRUMENTATION_ENABLED = "ARMS_LITELLM_INSTRUMENTATION_ENABLED" + + +def _is_instrumentation_enabled() -> bool: + """Check if instrumentation is enabled via environment variable.""" + enabled = os.getenv(LITELLM_INSTRUMENTATION_ENABLED, "true").lower() + return enabled != "false" + + +class CompletionWrapper: + """Wrapper for litellm.completion()""" + + def __init__(self, handler, original_func: Callable): + self._handler = handler + self.original_func = original_func + + def __call__(self, *args, **kwargs): + """Wrap litellm.completion()""" + # Check if instrumentation is enabled + if not _is_instrumentation_enabled(): + return self.original_func(*args, **kwargs) + + # Check suppression context + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return self.original_func(*args, **kwargs) + + # Check if LLM SDK is suppressed + if context.get_value(SUPPRESS_LLM_SDK_KEY): + if get_current_span().get_span_context().is_valid: + return self.original_func(*args, **kwargs) + + # Extract request parameters + is_stream = kwargs.get("stream", False) + + # For streaming, enable usage tracking if not explicitly disabled + # This ensures we get token usage information in the final chunk + if is_stream and "stream_options" not in kwargs: + kwargs["stream_options"] = {"include_usage": True} + + # For streaming, we need special handling + + # For streaming, we need special handling + if is_stream: + # Create invocation object + invocation = create_llm_invocation_from_litellm(**kwargs) + + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + pass + + # Start LLM invocation + self._handler.start_llm(invocation) + + try: + # Call original function + response = self.original_func(*args, **kwargs) + + # Wrap the streaming response + # We pass invocation and handler so the callback can fill data and call stop_llm + stream_wrapper = StreamWrapper( + stream=response, + span=invocation.span, # For TTFT tracking + callback=lambda span, + last_chunk, + error: self._handle_stream_end_with_handler( + invocation, last_chunk, error, stream_wrapper + ), + ) + response = stream_wrapper + + return response + except Exception as e: + # Fail LLM invocation + self._handler.fail_llm( + invocation, Error(message=str(e), type=type(e)) + ) + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + else: + # Create invocation object + invocation = create_llm_invocation_from_litellm(**kwargs) + + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: 
+ pass + + # Start LLM invocation (handler creates and manages span) + self._handler.start_llm(invocation) + + try: + # Call original function + response = self.original_func(*args, **kwargs) + + # Fill response data into invocation + invocation.output_messages = ( + extract_output_from_litellm_response(response) + ) + + # Extract token usage + if hasattr(response, "usage") and response.usage: + invocation.input_tokens = getattr( + response.usage, "prompt_tokens", None + ) + invocation.output_tokens = getattr( + response.usage, "completion_tokens", None + ) + + # Extract response metadata + if hasattr(response, "id"): + invocation.response_id = response.id + if hasattr(response, "model"): + invocation.response_model_name = response.model + + # Extract finish reasons + if hasattr(response, "choices") and response.choices: + finish_reasons = [] + for choice in response.choices: + if ( + hasattr(choice, "finish_reason") + and choice.finish_reason + ): + finish_reasons.append(choice.finish_reason) + if finish_reasons: + invocation.finish_reasons = finish_reasons + + # End LLM invocation successfully (handler ends span and records metrics) + self._handler.stop_llm(invocation) + + return response + + except Exception as e: + # Fail LLM invocation (handler marks span as error) + self._handler.fail_llm( + invocation, Error(message=str(e), type=type(e)) + ) + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + + def _handle_stream_end_with_handler( + self, + invocation, + last_chunk: Optional[Any], + error: Optional[Exception], + stream_wrapper: Optional[Any] = None, + ): + """Handle the end of a streaming response using Handler pattern.""" + + try: + if error: + # Fail LLM invocation + self._handler.fail_llm( + invocation, Error(message=str(error), type=type(error)) + ) + return + + # Construct output message from accumulated content + parts = [] + if stream_wrapper and hasattr( + stream_wrapper, "accumulated_content" + ): + full_content = "".join(stream_wrapper.accumulated_content) + if full_content: + parts.append(Text(content=full_content)) + + # Handle accumulated tool calls if any + if ( + hasattr(stream_wrapper, "accumulated_tool_calls") + and stream_wrapper.accumulated_tool_calls + ): + for tc in stream_wrapper.accumulated_tool_calls: + if hasattr(tc, "function"): + # Parse arguments if it's a JSON string + arguments = getattr(tc.function, "arguments", "") + if isinstance(arguments, str) and arguments: + try: + arguments = json.loads(arguments) + except Exception: + pass + + parts.append( + ToolCall( + id=getattr(tc, "id", None), + name=getattr(tc.function, "name", ""), + arguments=arguments, + ) + ) + + # If we have parts, create output message + if parts: + invocation.output_messages = [ + OutputMessage( + role="assistant", parts=parts, finish_reason="stop" + ) + ] + + # Extract token usage from last chunk + if ( + last_chunk + and hasattr(last_chunk, "usage") + and last_chunk.usage + ): + invocation.input_tokens = getattr( + last_chunk.usage, "prompt_tokens", None + ) + invocation.output_tokens = getattr( + last_chunk.usage, "completion_tokens", None + ) + + # Extract response metadata + if last_chunk: + if hasattr(last_chunk, "id"): + invocation.response_id = last_chunk.id + if hasattr(last_chunk, "model"): + invocation.response_model_name = last_chunk.model + + # Extract finish_reason from last chunk's choice + if hasattr(last_chunk, "choices") and last_chunk.choices: + finish_reasons = [] + for choice 
in last_chunk.choices: + if ( + hasattr(choice, "finish_reason") + and choice.finish_reason + ): + finish_reasons.append(choice.finish_reason) + if finish_reasons: + invocation.finish_reasons = finish_reasons + + # End LLM invocation successfully + self._handler.stop_llm(invocation) + + except Exception as e: + logger.debug(f"Error handling stream end with handler: {e}") + # Try to fail gracefully + try: + self._handler.fail_llm( + invocation, Error(message=str(e), type=type(e)) + ) + except Exception: + pass + + +class AsyncCompletionWrapper: + """Wrapper for litellm.acompletion()""" + + def __init__(self, handler, original_func: Callable): + self._handler = handler + self.original_func = original_func + + async def __call__(self, *args, **kwargs): + """Wrap litellm.acompletion()""" + # Check if instrumentation is enabled + if not _is_instrumentation_enabled(): + return await self.original_func(*args, **kwargs) + + # Check suppression context + if context.get_value(_SUPPRESS_INSTRUMENTATION_KEY): + return await self.original_func(*args, **kwargs) + + # Check if LLM SDK is suppressed + if context.get_value(SUPPRESS_LLM_SDK_KEY): + if get_current_span().get_span_context().is_valid: + return await self.original_func(*args, **kwargs) + + # Extract request parameters + is_stream = kwargs.get("stream", False) + + # For streaming, enable usage tracking if not explicitly disabled + if is_stream and "stream_options" not in kwargs: + kwargs["stream_options"] = {"include_usage": True} + + # For streaming, we need special handling + if is_stream: + # Create invocation object + invocation = create_llm_invocation_from_litellm(**kwargs) + + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + pass + + # Start LLM invocation + self._handler.start_llm(invocation) + + try: + # Call original function + response = await self.original_func(*args, **kwargs) + + # Wrap the async streaming response + stream_wrapper = AsyncStreamWrapper( + stream=response, + span=invocation.span, # For TTFT tracking + callback=lambda span, + last_chunk, + error: self._handle_stream_end_with_handler( + invocation, last_chunk, error, stream_wrapper + ), + ) + response = stream_wrapper + + return response + except Exception as e: + # Fail LLM invocation + self._handler.fail_llm( + invocation, Error(message=str(e), type=type(e)) + ) + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + else: + # Non-streaming: use Handler pattern + # Create invocation object + invocation = create_llm_invocation_from_litellm(**kwargs) + + # Set _SUPPRESS_LLM_SDK_KEY to prevent nested SDK instrumentation + suppress_token = None + try: + suppress_token = context.attach( + context.set_value(SUPPRESS_LLM_SDK_KEY, True) + ) + except Exception: + pass + + # Start LLM invocation + self._handler.start_llm(invocation) + + try: + # Call original function + response = await self.original_func(*args, **kwargs) + + # Fill response data into invocation + invocation.output_messages = ( + extract_output_from_litellm_response(response) + ) + + # Extract token usage + if hasattr(response, "usage") and response.usage: + invocation.input_tokens = getattr( + response.usage, "prompt_tokens", None + ) + invocation.output_tokens = getattr( + response.usage, "completion_tokens", None + ) + + # Extract response metadata + if hasattr(response, 
"id"): + invocation.response_id = response.id + if hasattr(response, "model"): + invocation.response_model_name = response.model + + # Extract finish reasons + if hasattr(response, "choices") and response.choices: + finish_reasons = [] + for choice in response.choices: + if ( + hasattr(choice, "finish_reason") + and choice.finish_reason + ): + finish_reasons.append(choice.finish_reason) + if finish_reasons: + invocation.finish_reasons = finish_reasons + + # End LLM invocation successfully + self._handler.stop_llm(invocation) + + return response + + except Exception as e: + # Fail LLM invocation + self._handler.fail_llm( + invocation, Error(message=str(e), type=type(e)) + ) + raise + finally: + # Detach suppress context + if suppress_token: + try: + context.detach(suppress_token) + except Exception: + pass + + def _handle_stream_end_with_handler( + self, + invocation, + last_chunk: Optional[Any], + error: Optional[Exception], + stream_wrapper: Optional[Any] = None, + ): + """Handle the end of an async streaming response using Handler pattern.""" + # Reuse sync logic + completion_wrapper = CompletionWrapper(self._handler, None) + completion_wrapper._handle_stream_end_with_handler( + invocation, last_chunk, error, stream_wrapper + ) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/package.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/package.py new file mode 100644 index 000000000..630f6e8bf --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/package.py @@ -0,0 +1,17 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +_instruments = ("litellm >= 1.0.0",) + +_supports_metrics = True diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/version.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/version.py new file mode 100644 index 000000000..960e5008b --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/version.py @@ -0,0 +1,15 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +__version__ = "0.51b0" diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/test-requirements.txt b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/test-requirements.txt new file mode 100644 index 000000000..4945578ee --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/test-requirements.txt @@ -0,0 +1,8 @@ +litellm>=1.79.0 +pytest +pytest-asyncio +openai +-e aliyun-semantic-conventions +-e util/opentelemetry-util-http +-e instrumentation/opentelemetry-instrumentation-litellm + From d8affedceb725b0b158b3571e25645efa1ca4d9d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=98=8E=E7=9B=B4?= Date: Wed, 28 Jan 2026 15:26:39 +0800 Subject: [PATCH 2/2] Add Litellm Instrument Unit Test --- .../litellm/_embedding_wrapper.py | 50 +++ .../tests/__init__.py | 14 + .../tests/test_embedding.py | 224 +++++++++++++ .../tests/test_error_handling.py | 159 +++++++++ .../tests/test_retry.py | 208 ++++++++++++ .../tests/test_stream_completion.py | 265 +++++++++++++++ .../tests/test_sync_completion.py | 233 +++++++++++++ .../tests/test_tool_calls.py | 310 ++++++++++++++++++ .../tests/test_utils.py | 109 ++++++ 9 files changed, 1572 insertions(+) create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/__init__.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_embedding.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_error_handling.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_retry.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_stream_completion.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_tool_calls.py create mode 100644 instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_utils.py diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py index bbfb55008..8a439bd60 100644 --- a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/src/opentelemetry/instrumentation/litellm/_embedding_wrapper.py @@ -94,6 +94,31 @@ def __call__(self, *args, **kwargs): response.usage, "total_tokens", None ) + # Extract embedding dimension count + if ( + hasattr(response, "data") + and response.data + and len(response.data) > 0 + ): + try: + first_embedding = response.data[0] + # Handle dict response + if ( + isinstance(first_embedding, dict) + and "embedding" in first_embedding + ): + embedding_vector = first_embedding["embedding"] + if isinstance(embedding_vector, list): + invocation.dimension_count = len(embedding_vector) + # Handle object response + elif hasattr(first_embedding, "embedding"): + embedding_vector = first_embedding.embedding + if isinstance(embedding_vector, list): + invocation.dimension_count = len(embedding_vector) + except (IndexError, AttributeError, KeyError, TypeError): + # If we can't extract dimension, just skip it + pass + # End Embedding invocation successfully self._handler.stop_embedding(invocation) @@ 
-170,6 +195,31 @@ async def __call__(self, *args, **kwargs): response.usage, "total_tokens", None ) + # Extract embedding dimension count + if ( + hasattr(response, "data") + and response.data + and len(response.data) > 0 + ): + try: + first_embedding = response.data[0] + # Handle dict response + if ( + isinstance(first_embedding, dict) + and "embedding" in first_embedding + ): + embedding_vector = first_embedding["embedding"] + if isinstance(embedding_vector, list): + invocation.dimension_count = len(embedding_vector) + # Handle object response + elif hasattr(first_embedding, "embedding"): + embedding_vector = first_embedding.embedding + if isinstance(embedding_vector, list): + invocation.dimension_count = len(embedding_vector) + except (IndexError, AttributeError, KeyError, TypeError): + # If we can't extract dimension, just skip it + pass + # End Embedding invocation successfully self._handler.stop_embedding(invocation) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/__init__.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/__init__.py new file mode 100644 index 000000000..f87ce79b7 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/__init__.py @@ -0,0 +1,14 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_embedding.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_embedding.py new file mode 100644 index 000000000..9b75ec06a --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_embedding.py @@ -0,0 +1,224 @@ +import asyncio +import os +from unittest.mock import patch + +import litellm + +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.util.genai.types import ContentCapturingMode + + +class TestEmbedding(TestBase): + """ + Test embedding calls with LiteLLM. + """ + + def setUp(self): + super().setUp() + # Set up environment variables for testing + os.environ["OPENAI_API_KEY"] = os.environ.get( + "OPENAI_API_KEY", "sk-..." + ) + os.environ["DASHSCOPE_API_KEY"] = os.environ.get( + "DASHSCOPE_API_KEY", "sk-..." 
+ ) + os.environ["OPENAI_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + os.environ["DASHSCOPE_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + + # Force experiment mode for content capture + self.patch_experimental = patch( + "opentelemetry.util.genai.span_utils.is_experimental_mode", + return_value=True, + ) + self.patch_content_mode = patch( + "opentelemetry.util.genai.span_utils.get_content_capturing_mode", + return_value=ContentCapturingMode.SPAN_ONLY, + ) + + self.patch_experimental.start() + self.patch_content_mode.start() + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + self.patch_experimental.stop() + self.patch_content_mode.stop() + + def test_sync_embedding_single_text(self): + """ + Test synchronous embedding with single text input. + """ + + # Business demo: Single text embedding + response = litellm.embedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input="The quick brown fox jumps over the lazy dog", + encoding_format="float", + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, "data")) + self.assertGreater(len(response.data), 0) + + # Verify embedding is a list of numbers + embedding = response.data[0].get("embedding") + self.assertIsInstance(embedding, list) + self.assertGreater(len(embedding), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual( + len(spans), 1, "Expected exactly one span for embedding call" + ) + span = spans[0] + + # Verify span kind + self.assertEqual( + span.attributes.get("gen_ai.span.kind"), + "EMBEDDING", + "Span kind should be EMBEDDING", + ) + + # Verify model + self.assertIn("gen_ai.request.model", span.attributes) + self.assertEqual( + span.attributes.get("gen_ai.request.model"), "text-embedding-v1" + ) + + # Verify token usage (required for embedding) + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertGreater(span.attributes.get("gen_ai.usage.input_tokens"), 0) + + # Verify embedding dimension count + self.assertIn("gen_ai.embeddings.dimension.count", span.attributes) + dimension = span.attributes.get("gen_ai.embeddings.dimension.count") + self.assertEqual(dimension, len(embedding)) + self.assertGreater(dimension, 0) + + def test_sync_embedding_multiple_texts(self): + """ + Test synchronous embedding with multiple text inputs. 
+ """ + + # Business demo: Batch embedding + texts = [ + "Hello, world!", + "Artificial intelligence is fascinating.", + "LiteLLM makes LLM integration easy.", + ] + + response = litellm.embedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input=texts, + encoding_format="float", + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, "data")) + self.assertEqual( + len(response.data), + len(texts), + "Should have embedding for each text", + ) + + # Verify each embedding + self.assertIsInstance(response.data[0].get("embedding"), list) + self.assertGreater(len(response.data[0].get("embedding")), 0) + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "EMBEDDING") + self.assertGreater(span.attributes.get("gen_ai.usage.input_tokens"), 0) + + def test_async_embedding(self): + """ + Test asynchronous embedding call. + """ + + async def run_async_embedding(): + response = await litellm.aembedding( + model="openai/text-embedding-v1", + input="Async test", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + encoding_format="float", + ) + return response + + response = asyncio.run(run_async_embedding()) + + # Verify response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, "data")) + self.assertGreater(len(response.data), 0) + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "EMBEDDING") + self.assertIn("gen_ai.request.model", span.attributes) + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.embeddings.dimension.count", span.attributes) + + def test_embedding_with_different_models(self): + """ + Test embedding with different model providers. + """ + + response = litellm.embedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input="Testing different embedding models", + encoding_format="float", + ) + + self.assertIsNotNone(response) + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + self.assertIn("gen_ai.request.model", span.attributes) + self.assertEqual( + span.attributes.get("gen_ai.request.model"), "text-embedding-v1" + ) + + def test_embedding_empty_input(self): + """ + Test embedding with edge case inputs. 
+ """ + + response = litellm.embedding( + model="openai/text-embedding-v1", + api_base="https://dashscope.aliyuncs.com/compatible-mode/v1", + input="Hi", + encoding_format="float", + ) + + # Verify response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, "data")) + self.assertGreater(len(response.data), 0) + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "EMBEDDING") + self.assertIn("gen_ai.usage.input_tokens", span.attributes) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_error_handling.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_error_handling.py new file mode 100644 index 000000000..dfd751bd8 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_error_handling.py @@ -0,0 +1,159 @@ +import os +from unittest.mock import patch + +import litellm + +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.trace import StatusCode +from opentelemetry.util.genai.types import ContentCapturingMode + + +class TestErrorHandling(TestBase): + """ + Test error handling and edge cases with LiteLLM. + """ + + def setUp(self): + super().setUp() + # Mock experimental mode + self.patch_experimental = patch( + "opentelemetry.util.genai.span_utils.is_experimental_mode", + return_value=True, + ) + self.patch_content_mode = patch( + "opentelemetry.util.genai.span_utils.get_content_capturing_mode", + return_value=ContentCapturingMode.SPAN_ONLY, + ) + + self.patch_experimental.start() + self.patch_content_mode.start() + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + self.patch_experimental.stop() + self.patch_content_mode.stop() + + def test_authentication_failure(self): + """ + Test handling of authentication failures. + """ + + # Temporarily set invalid credentials + original_dashscope_key = os.environ.get("DASHSCOPE_API_KEY") + os.environ["DASHSCOPE_API_KEY"] = "invalid-key-12345" + + try: + litellm.completion( + model="dashscope/qwen-turbo", + messages=[{"role": "user", "content": "Hello"}], + ) + self.fail("Expected authentication error but call succeeded") + except Exception as e: + self.assertIsNotNone(e) + finally: + if original_dashscope_key: + os.environ["DASHSCOPE_API_KEY"] = original_dashscope_key + else: + os.environ.pop("DASHSCOPE_API_KEY", None) + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1, "Should create 1 span even on error") + span = spans[0] + + # Verify span status indicates error + self.assertEqual( + span.status.status_code, + StatusCode.ERROR, + "Span status should indicate error", + ) + self.assertIn("error.type", span.attributes) + + def test_invalid_model_name(self): + """ + Test handling of invalid model names. + """ + + # Set up valid credentials + os.environ["OPENAI_API_KEY"] = os.environ.get( + "OPENAI_API_KEY", "sk-..." 
+ ) + os.environ["OPENAI_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + + try: + litellm.completion( + model="non-existent-model-xyz-123", + messages=[{"role": "user", "content": "Hello"}], + ) + except Exception as e: + self.assertIsNotNone(e) + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + # Verify model name is still captured + self.assertEqual( + span.attributes.get("gen_ai.request.model"), + "non-existent-model-xyz-123", + ) + self.assertEqual(span.status.status_code, StatusCode.ERROR) + + def test_network_timeout(self): + """ + Test handling of network timeouts. + """ + + os.environ["OPENAI_API_KEY"] = os.environ.get( + "OPENAI_API_KEY", "sk-..." + ) + os.environ["OPENAI_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + + try: + litellm.completion( + model="dashscope/qwen-turbo", + messages=[{"role": "user", "content": "Tell me a long story"}], + timeout=0.001, + ) + except Exception as e: + self.assertIsNotNone(e) + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + self.assertIn("gen_ai.request.model", span.attributes) + self.assertEqual(span.status.status_code, StatusCode.ERROR) + + def test_max_tokens_exceeded(self): + """ + Test handling when max_tokens is exceeded. + """ + + os.environ["DASHSCOPE_API_KEY"] = os.environ.get( + "DASHSCOPE_API_KEY", "sk-..." + ) + + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[{"role": "user", "content": "Write a 500 word essay"}], + max_tokens=2, + ) + + self.assertIsNotNone(response) + + span = self.get_finished_spans()[0] + self.assertIn("gen_ai.response.finish_reasons", span.attributes) + finish_reasons = span.attributes.get("gen_ai.response.finish_reasons") + # Should contain 'length' or similar + self.assertTrue(any(r in ["length", "stop"] for r in finish_reasons)) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_retry.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_retry.py new file mode 100644 index 000000000..3f4609615 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_retry.py @@ -0,0 +1,208 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test cases for LiteLLM retry mechanisms. + +This module tests retry functionality in LiteLLM, including both +completion_with_retries and acompletion_with_retries functions. +""" + +import asyncio +import json +import os +from unittest.mock import patch + +import litellm + +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.util.genai.types import ContentCapturingMode + + +class TestRetry(TestBase): + """ + Test retry mechanisms with LiteLLM. 
+ """ + + def setUp(self): + super().setUp() + # Set up environment variables for testing + os.environ["OPENAI_API_KEY"] = os.environ.get( + "OPENAI_API_KEY", "sk-..." + ) + os.environ["DASHSCOPE_API_KEY"] = os.environ.get( + "DASHSCOPE_API_KEY", "sk-..." + ) + os.environ["OPENAI_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + os.environ["DASHSCOPE_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + + # Force experiment mode for content capture + self.patch_experimental = patch( + "opentelemetry.util.genai.span_utils.is_experimental_mode", + return_value=True, + ) + self.patch_content_mode = patch( + "opentelemetry.util.genai.span_utils.get_content_capturing_mode", + return_value=ContentCapturingMode.SPAN_ONLY, + ) + + self.patch_experimental.start() + self.patch_content_mode.start() + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + self.patch_experimental.stop() + self.patch_content_mode.stop() + + def test_completion_with_retries_success(self): + """ + Test successful completion with retry mechanism. + """ + + # Business demo: Completion with retry wrapper (success case) + response = litellm.completion_with_retries( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "What is 1+1? Answer briefly."} + ], + temperature=0.1, + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, "choices")) + self.assertGreater(len(response.choices), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + # Verify attributes + self.assertEqual( + span.attributes.get("gen_ai.span.kind"), + "LLM", + "Span kind should be LLM", + ) + self.assertIn("gen_ai.input.messages", span.attributes) + self.assertIn("gen_ai.output.messages", span.attributes) + # Verify standard attributes + self.assertIn("gen_ai.request.model", span.attributes) + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.usage.output_tokens", span.attributes) + + def test_async_completion_with_retries(self): + """ + Test asynchronous completion with retry mechanism. + """ + + async def run_async_retry(): + response = await litellm.acompletion_with_retries( + model="dashscope/qwen-turbo", + messages=[{"role": "user", "content": "Name a color."}], + temperature=0.0, + ) + return response + + response = asyncio.run(run_async_retry()) + + # Verify response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, "choices")) + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + self.assertIn("gen_ai.request.model", span.attributes) + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.input.messages", span.attributes) + + def test_completion_with_custom_retry_config(self): + """ + Test completion with custom retry configuration. + + This test configures custom retry parameters like max retries + and verifies that the instrumentation handles them correctly. 
+ + The test verifies: + - Custom retry config is respected + - Instrumentation works with custom config + """ + + # Business demo: Completion with custom retry configuration + # This demo sets custom retry parameters + # Note: LiteLLM's retry mechanism might use different parameter names + response = litellm.completion_with_retries( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "What is the capital of China?"} + ], + num_retries=3, # Maximum number of retries + timeout=30, # Timeout in seconds + ) + + # Verify response + self.assertIsNotNone(response) + + # Get spans + spans = self.get_finished_spans() + self.assertGreaterEqual(len(spans), 1) + + def test_retry_with_streaming(self): + """ + Test retry mechanism with streaming completion. + """ + + # Business demo: Streaming completion with retry wrapper + response = litellm.completion_with_retries( + model="dashscope/qwen-turbo", + messages=[{"role": "user", "content": "Count to 3."}], + stream=True, + temperature=0.0, + ) + + # Collect stream chunks + chunks = [] + for chunk in response: + chunks.append(chunk) + + self.assertGreater(len(chunks), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + self.assertIn("gen_ai.output.messages", span.attributes) + + # Verify message content exists + output_messages = json.loads( + span.attributes.get("gen_ai.output.messages") + ) + self.assertGreater(len(str(output_messages[0]["parts"])), 0) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_stream_completion.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_stream_completion.py new file mode 100644 index 000000000..10449e180 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_stream_completion.py @@ -0,0 +1,265 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test cases for streaming LiteLLM completion calls. + +This module tests streaming text generation functionality using LiteLLM's +streaming API, including both synchronous and asynchronous streaming. +""" + +import asyncio +import json +import os +from unittest.mock import patch + +import litellm + +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.util.genai.types import ContentCapturingMode + + +class TestStreamCompletion(TestBase): + """ + Test streaming completion calls with LiteLLM. + """ + + def setUp(self): + super().setUp() + # Set up environment variables for testing + os.environ["OPENAI_API_KEY"] = os.environ.get( + "OPENAI_API_KEY", "sk-..." + ) + os.environ["DASHSCOPE_API_KEY"] = os.environ.get( + "DASHSCOPE_API_KEY", "sk-..." 
+ ) + os.environ["OPENAI_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + os.environ["DASHSCOPE_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + + # Force experiment mode for content capture + self.patch_experimental = patch( + "opentelemetry.util.genai.span_utils.is_experimental_mode", + return_value=True, + ) + self.patch_content_mode = patch( + "opentelemetry.util.genai.span_utils.get_content_capturing_mode", + return_value=ContentCapturingMode.SPAN_ONLY, + ) + + self.patch_experimental.start() + self.patch_content_mode.start() + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + self.patch_experimental.stop() + self.patch_content_mode.stop() + + def test_sync_streaming_completion(self): + """ + Test synchronous streaming text generation. + """ + + # Business demo: Synchronous streaming completion + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + { + "role": "user", + "content": "Count from 1 to 5 with commas between numbers.", + } + ], + stream=True, + temperature=0.1, + ) + + # Collect all streaming chunks + chunks = list(response) + self.assertGreater(len(chunks), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + # Verify basic attributes + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + self.assertEqual( + span.attributes.get("gen_ai.request.model"), "qwen-turbo" + ) + + # Verify token usage (Streaming usually gets usage in the last chunk or via LiteLLM estimation) + self.assertGreater( + span.attributes.get("gen_ai.usage.input_tokens", 0), 0 + ) + self.assertGreater( + span.attributes.get("gen_ai.usage.output_tokens", 0), 0 + ) + + # Verify message content (Accumulated in streaming) + self.assertIn("gen_ai.input.messages", span.attributes) + input_messages = json.loads( + span.attributes.get("gen_ai.input.messages") + ) + self.assertEqual(input_messages[0]["role"], "user") + + self.assertIn("gen_ai.output.messages", span.attributes) + output_messages = json.loads( + span.attributes.get("gen_ai.output.messages") + ) + self.assertEqual(output_messages[0]["role"], "assistant") + + # Verify Output Content is actually there (concatenated from chunks) + content = str(output_messages[0]["parts"]) + self.assertTrue(any(str(i) in content for i in range(1, 6))) + + def test_async_streaming_completion(self): + """ + Test asynchronous streaming text generation. + + This test performs an asynchronous streaming chat completion request. + It uses async/await syntax to iterate through the stream asynchronously. 
+ + The test verifies: + - Async streaming works correctly + - All span attributes are captured for async calls + - TTFT is recorded for async streams + """ + + async def run_async_stream(): + # Business demo: Asynchronous streaming completion + # This demo makes an async streaming call to dashscope/qwen-turbo model + chunks = [] + response = await litellm.acompletion( + model="dashscope/qwen-turbo", + messages=[ + { + "role": "user", + "content": "Say hello in 3 different languages.", + } + ], + stream=True, + temperature=0.3, + ) + + # Collect all streaming chunks + async for chunk in response: + chunks.append(chunk) + + # Explicitly close to ensure span finalization + if hasattr(response, "close"): + response.close() + + return chunks + + # Run the async function + chunks = asyncio.run(run_async_stream()) + + # Verify we received chunks + self.assertGreater(len(chunks), 0, "Should receive at least one chunk") + + # Force flush to ensure spans are processed + if hasattr(self, "tracer_provider") and self.tracer_provider: + self.tracer_provider.force_flush() + + # Get spans + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + + span = spans[0] + + # Verify streaming attributes + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + + # Verify token usage + self.assertIn("gen_ai.usage.input_tokens", span.attributes) + self.assertIn("gen_ai.usage.output_tokens", span.attributes) + + def test_streaming_with_early_termination(self): + """ + Test streaming completion with early termination. + + This test starts a streaming call but stops reading after a few chunks. + It verifies that the instrumentation handles partial streams correctly. + + The test verifies: + - Partial stream reading is handled correctly + - Span is still created and finalized + - Available data is captured even if stream is not fully consumed + """ + + # Business demo: Streaming with early termination + # This demo starts a stream but stops reading after 3 chunks + chunks_read = 0 + max_chunks = 3 + + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + {"role": "user", "content": "Write a long story about a cat."} + ], + stream=True, + max_tokens=200, + ) + + # Read only first few chunks + for chunk in response: + chunks_read += 1 + if chunks_read >= max_chunks: + break + + # Explicitly close the stream to finalize span + if hasattr(response, "close"): + response.close() + + # Verify we read the expected number of chunks + self.assertEqual(chunks_read, max_chunks) + + # Get spans + spans = self.get_finished_spans() + self.assertGreaterEqual(len(spans), 1, "Should have at least one span") + + span = spans[0] + + # Verify basic attributes are still captured + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + self.assertIn("gen_ai.request.model", span.attributes) + + def test_streaming_multiple_choices(self): + """ + Test streaming completion with n > 1. 
+ """ + + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[{"role": "user", "content": "Hi"}], + stream=True, + n=2, + ) + list(response) + + span = self.get_finished_spans()[0] + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + # Optional: check choice count if implemented in stream wrapper diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py new file mode 100644 index 000000000..cffac0a1b --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_sync_completion.py @@ -0,0 +1,233 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test cases for synchronous LiteLLM completion calls. + +This module tests basic synchronous text generation functionality using LiteLLM's +completion API with various models and configurations. +""" + +import json +import os +from unittest.mock import patch + +import litellm + +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.util.genai.types import ContentCapturingMode + + +class TestSyncCompletion(TestBase): + """ + Test synchronous completion calls with LiteLLM. + """ + + def setUp(self): + super().setUp() + # Set up environment variables for testing + os.environ["OPENAI_API_KEY"] = os.environ.get( + "OPENAI_API_KEY", "sk-..." + ) + os.environ["DASHSCOPE_API_KEY"] = os.environ.get( + "DASHSCOPE_API_KEY", "sk-..." + ) + os.environ["OPENAI_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + os.environ["DASHSCOPE_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + + # Simpler approach: Mock the utility functions that check for experimental mode + + self.patch_experimental = patch( + "opentelemetry.util.genai.span_utils.is_experimental_mode", + return_value=True, + ) + self.patch_content_mode = patch( + "opentelemetry.util.genai.span_utils.get_content_capturing_mode", + return_value=ContentCapturingMode.SPAN_ONLY, + ) + + self.patch_experimental.start() + self.patch_content_mode.start() + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + + # Stop patches + self.patch_experimental.stop() + self.patch_content_mode.stop() + + def test_basic_sync_completion(self): + """ + Test basic synchronous text generation. 
+ + The test verifies: + - A span is created with gen_ai.span.kind = "LLM" + - Required span attributes are present (model, provider, tokens) + - Input and output messages are captured (Experimental mode) + - Token usage and finish reasons are recorded + """ + + # Business demo: Simple chat completion + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[ + { + "role": "user", + "content": "What is the capital of France? Answer in one word.", + } + ], + temperature=0.7, + max_tokens=50, + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, "choices")) + self.assertGreater(len(response.choices), 0) + + # Get spans + spans = self.get_finished_spans() + self.assertEqual( + len(spans), 1, "Expected exactly one span for completion call" + ) + + span = spans[0] + + # Verify span kind and operation name + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + self.assertEqual(span.attributes.get("gen_ai.operation.name"), "chat") + + # Verify model and provider + self.assertEqual( + span.attributes.get("gen_ai.request.model"), "qwen-turbo" + ) + self.assertEqual( + span.attributes.get("gen_ai.provider.name"), "dashscope" + ) + + # Verify token usage + self.assertGreater(span.attributes.get("gen_ai.usage.input_tokens"), 0) + self.assertGreater( + span.attributes.get("gen_ai.usage.output_tokens"), 0 + ) + + # Verify Content Capture (Experimental Mode) + self.assertIn("gen_ai.input.messages", span.attributes) + input_messages = json.loads( + span.attributes.get("gen_ai.input.messages") + ) + self.assertEqual(input_messages[0]["role"], "user") + self.assertIn("capital of France", str(input_messages[0]["parts"])) + + self.assertIn("gen_ai.output.messages", span.attributes) + output_messages = json.loads( + span.attributes.get("gen_ai.output.messages") + ) + self.assertEqual(output_messages[0]["role"], "assistant") + self.assertGreater(len(output_messages[0]["parts"]), 0) + + # Verify recommended attributes + self.assertEqual( + span.attributes.get("gen_ai.request.temperature"), 0.7 + ) + self.assertEqual(span.attributes.get("gen_ai.request.max_tokens"), 50) + self.assertIn("gen_ai.response.id", span.attributes) + self.assertIn("gen_ai.response.model", span.attributes) + self.assertIn("gen_ai.response.finish_reasons", span.attributes) + + def test_sync_completion_with_multiple_messages(self): + """ + Test synchronous completion with multi-turn conversation. 
+ """ + + messages = [ + { + "role": "system", + "content": "You are a helpful assistant that provides concise answers.", + }, + {"role": "user", "content": "What is 2+2?"}, + {"role": "assistant", "content": "4"}, + {"role": "user", "content": "What is 3+3?"}, + ] + + # Business demo: Multi-turn conversation + response = litellm.completion( + model="dashscope/qwen-turbo", messages=messages, temperature=0.1 + ) + # Verify response + self.assertIsNotNone(response) + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + # Verify all messages captured in sequence + self.assertIn("gen_ai.input.messages", span.attributes) + input_messages = json.loads( + span.attributes.get("gen_ai.input.messages") + ) + self.assertEqual(len(input_messages), 4) + self.assertEqual(input_messages[0]["role"], "system") + self.assertEqual(input_messages[1]["role"], "user") + self.assertEqual(input_messages[2]["role"], "assistant") + self.assertEqual(input_messages[3]["role"], "user") + + output_messages = json.loads( + span.attributes.get("gen_ai.output.messages") + ) + self.assertGreater(len(output_messages), 0) + self.assertEqual(output_messages[0]["role"], "assistant") + + def test_sync_completion_with_parameters(self): + """ + Test capturing of various LLM parameters. + """ + + response = litellm.completion( + model="dashscope/qwen-turbo", + messages=[{"role": "user", "content": "Tell me a short joke."}], + temperature=0.9, + max_tokens=100, + top_p=0.95, + seed=42, + stop=["END"], + ) + # Verify response + self.assertIsNotNone(response) + + span = self.get_finished_spans()[0] + + # Verify advanced parameters + self.assertEqual( + span.attributes.get("gen_ai.request.temperature"), 0.9 + ) + self.assertEqual(span.attributes.get("gen_ai.request.max_tokens"), 100) + self.assertEqual(span.attributes.get("gen_ai.request.top_p"), 0.95) + self.assertEqual(span.attributes.get("gen_ai.request.seed"), 42) + + # Verify stop sequences (stored as list/tuple in attributes) + stop_seq = span.attributes.get("gen_ai.request.stop_sequences") + self.assertIn("END", stop_seq) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_tool_calls.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_tool_calls.py new file mode 100644 index 000000000..1d20d1713 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_tool_calls.py @@ -0,0 +1,310 @@ +import json +import os +from unittest.mock import patch + +import litellm + +from opentelemetry.instrumentation.litellm import LiteLLMInstrumentor +from opentelemetry.test.test_base import TestBase +from opentelemetry.util.genai.types import ContentCapturingMode + + +class TestToolCalls(TestBase): + """ + Test tool and function calling with LiteLLM. + """ + + def setUp(self): + super().setUp() + # Set up environment variables for testing + os.environ["OPENAI_API_KEY"] = os.environ.get( + "OPENAI_API_KEY", "sk-..." + ) + os.environ["DASHSCOPE_API_KEY"] = os.environ.get( + "DASHSCOPE_API_KEY", "sk-..." 
+ ) + os.environ["OPENAI_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + os.environ["DASHSCOPE_API_BASE"] = ( + "https://dashscope.aliyuncs.com/compatible-mode/v1" + ) + + # Force experiment mode for tool definition capture + self.patch_experimental = patch( + "opentelemetry.util.genai.span_utils.is_experimental_mode", + return_value=True, + ) + self.patch_content_mode = patch( + "opentelemetry.util.genai.span_utils.get_content_capturing_mode", + return_value=ContentCapturingMode.SPAN_ONLY, + ) + + self.patch_experimental.start() + self.patch_content_mode.start() + + # Instrument LiteLLM + LiteLLMInstrumentor().instrument( + tracer_provider=self.tracer_provider, + ) + + def tearDown(self): + super().tearDown() + # Uninstrument to avoid affecting other tests + LiteLLMInstrumentor().uninstrument() + self.patch_experimental.stop() + self.patch_content_mode.stop() + + def test_completion_with_tool_definition(self): + """ + Test completion with tool definitions. + """ + + tools = [ + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get the current weather in a given location", + "parameters": { + "type": "object", + "properties": { + "location": { + "type": "string", + "description": "The city and state, e.g. San Francisco, CA", + }, + "unit": { + "type": "string", + "enum": ["celsius", "fahrenheit"], + "description": "The temperature unit", + }, + }, + "required": ["location"], + }, + }, + } + ] + + response = litellm.completion( + model="dashscope/qwen-plus", + messages=[ + { + "role": "user", + "content": "What's the weather like in San Francisco?", + } + ], + tools=tools, + tool_choice="auto", + ) + + # Verify the response + self.assertIsNotNone(response) + self.assertTrue(hasattr(response, "choices")) + self.assertGreater(len(response.choices), 0) + + spans = self.get_finished_spans() + self.assertEqual(len(spans), 1) + span = spans[0] + + # Verify span kind + self.assertEqual(span.attributes.get("gen_ai.span.kind"), "LLM") + + # Verify tool definitions are captured + self.assertIn("gen_ai.tool.definitions", span.attributes) + tool_defs = json.loads(span.attributes.get("gen_ai.tool.definitions")) + self.assertEqual(len(tool_defs), 1) + self.assertEqual(tool_defs[0]["name"], "get_weather") + + # Check if model requested a tool call + choice = response.choices[0] + message = choice.message + + if hasattr(message, "tool_calls") and message.tool_calls: + # Model requested tool call - verify it's captured in output + self.assertIn("gen_ai.output.messages", span.attributes) + output_messages = json.loads( + span.attributes.get("gen_ai.output.messages") + ) + self.assertIsInstance(output_messages, list) + self.assertGreater(len(output_messages), 0) + + # Check if tool call is in the output + output_msg = output_messages[0] + self.assertIn("parts", output_msg) + + def test_completion_with_multiple_tools(self): + """ + Test completion with multiple tool definitions. 
+ """ + + tools = [ + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get the current weather", + "parameters": { + "type": "object", + "properties": {"location": {"type": "string"}}, + "required": ["location"], + }, + }, + }, + { + "type": "function", + "function": { + "name": "get_time", + "description": "Get the current time", + "parameters": { + "type": "object", + "properties": {"timezone": {"type": "string"}}, + "required": ["timezone"], + }, + }, + }, + ] + + response = litellm.completion( + model="dashscope/qwen-plus", + messages=[ + {"role": "user", "content": "What time is it in New York?"} + ], + tools=tools, + ) + # Verify response + self.assertIsNotNone(response) + + span = self.get_finished_spans()[0] + tool_defs = json.loads(span.attributes.get("gen_ai.tool.definitions")) + self.assertEqual(len(tool_defs), 2) + tool_names = [t["name"] for t in tool_defs] + self.assertIn("get_weather", tool_names) + self.assertIn("get_time", tool_names) + + def test_completion_with_tool_call_and_response(self): + """ + Test complete tool call flow including capture of tool responses. + """ + + tools = [ + { + "type": "function", + "function": { + "name": "calculator", + "description": "Perform basic arithmetic operations", + "parameters": { + "type": "object", + "properties": { + "operation": { + "type": "string", + "enum": [ + "add", + "subtract", + "multiply", + "divide", + ], + }, + "a": {"type": "number"}, + "b": {"type": "number"}, + }, + }, + }, + } + ] + + messages = [{"role": "user", "content": "What is 15 multiplied by 7?"}] + + # Step 1: Initial call + response = litellm.completion( + model="dashscope/qwen-plus", messages=messages, tools=tools + ) + + # Check if tool call was generated + if response.choices[0].message.tool_calls: + tool_call = response.choices[0].message.tool_calls[0] + + # Step 2: Tool response + messages.append(response.choices[0].message) + messages.append( + { + "role": "tool", + "tool_call_id": tool_call.id, + "content": "105", + } + ) + + # Step 3: Final call + litellm.completion( + model="dashscope/qwen-plus", messages=messages, tools=tools + ) + + spans = self.get_finished_spans() + self.assertGreaterEqual(len(spans), 2) + + # Verify the second call captured the 'tool' role message + final_span = spans[-1] + input_msgs = json.loads( + final_span.attributes.get("gen_ai.input.messages") + ) + roles = [m["role"] for m in input_msgs] + self.assertIn("tool", roles) + + def test_function_calling_with_streaming(self): + """ + Test tool definition capture work with streaming. + """ + + tools = [ + { + "type": "function", + "function": { + "name": "search", + "description": "Search info", + "parameters": { + "type": "object", + "properties": {"query": {"type": "string"}}, + "required": ["query"], + }, + }, + } + ] + + response = litellm.completion( + model="dashscope/qwen-plus", + messages=[{"role": "user", "content": "Search AI news"}], + tools=tools, + stream=True, + ) + list(response) + + span = self.get_finished_spans()[0] + self.assertIn("gen_ai.tool.definitions", span.attributes) + + def test_tool_choice_parameter(self): + """ + Test tool_choice parameter. 
+ """ + + tools = [ + { + "type": "function", + "function": { + "name": "format", + "description": "Format resp", + "parameters": {"type": "object", "properties": {}}, + }, + } + ] + + litellm.completion( + model="dashscope/qwen-plus", + messages=[{"role": "user", "content": "Format this"}], + tools=tools, + tool_choice="required", + ) + + span = self.get_finished_spans()[0] + self.assertEqual( + span.attributes.get("gen_ai.request.model"), "qwen-plus" + ) + self.assertIn("gen_ai.tool.definitions", span.attributes) diff --git a/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_utils.py b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_utils.py new file mode 100644 index 000000000..47b2c7943 --- /dev/null +++ b/instrumentation-loongsuite/loongsuite-instrumentation-litellm/tests/test_utils.py @@ -0,0 +1,109 @@ +# Copyright The OpenTelemetry Authors +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test cases for utility functions in LiteLLM instrumentation. +""" + +import unittest + +from opentelemetry.instrumentation.litellm._utils import ( + parse_provider_from_model, +) + + +class TestParseProviderFromModel(unittest.TestCase): + """ + Test cases for parse_provider_from_model function. + """ + + def test_empty_model_returns_none(self): + """Test that empty string returns None.""" + self.assertIsNone(parse_provider_from_model("")) + + def test_none_model_returns_none(self): + """Test that None returns None.""" + self.assertIsNone(parse_provider_from_model(None)) # type: ignore[arg-type] + + def test_model_with_slash_returns_provider_prefix(self): + """Test that model with '/' returns the provider prefix.""" + self.assertEqual(parse_provider_from_model("openai/gpt-4"), "openai") + self.assertEqual( + parse_provider_from_model("dashscope/qwen-turbo"), "dashscope" + ) + self.assertEqual( + parse_provider_from_model("anthropic/claude-3"), "anthropic" + ) + self.assertEqual( + parse_provider_from_model("google/gemini-pro"), "google" + ) + self.assertEqual( + parse_provider_from_model("custom-provider/some-model"), + "custom-provider", + ) + + def test_model_with_multiple_slashes_returns_first_part(self): + """Test that model with multiple '/' returns only the first part.""" + self.assertEqual( + parse_provider_from_model("openai/gpt-4/turbo"), "openai" + ) + self.assertEqual( + parse_provider_from_model("provider/model/version/extra"), + "provider", + ) + + def test_gpt_model_inferred_as_openai(self): + """Test that model containing 'gpt' is inferred as openai.""" + self.assertEqual(parse_provider_from_model("gpt-4"), "openai") + self.assertEqual(parse_provider_from_model("gpt-3.5-turbo"), "openai") + self.assertEqual(parse_provider_from_model("GPT-4"), "openai") + self.assertEqual(parse_provider_from_model("GPT-4-turbo"), "openai") + + def test_qwen_model_inferred_as_dashscope(self): + """Test that model containing 'qwen' is inferred as dashscope.""" + self.assertEqual(parse_provider_from_model("qwen-turbo"), "dashscope") + 
self.assertEqual(parse_provider_from_model("qwen-plus"), "dashscope") + self.assertEqual(parse_provider_from_model("QWEN-max"), "dashscope") + self.assertEqual(parse_provider_from_model("Qwen-VL"), "dashscope") + + def test_claude_model_inferred_as_anthropic(self): + """Test that model containing 'claude' is inferred as anthropic.""" + self.assertEqual(parse_provider_from_model("claude-3"), "anthropic") + self.assertEqual( + parse_provider_from_model("claude-3-opus"), "anthropic" + ) + self.assertEqual( + parse_provider_from_model("CLAUDE-instant"), "anthropic" + ) + self.assertEqual(parse_provider_from_model("Claude-2"), "anthropic") + + def test_gemini_model_inferred_as_google(self): + """Test that model containing 'gemini' is inferred as google.""" + self.assertEqual(parse_provider_from_model("gemini-pro"), "google") + self.assertEqual(parse_provider_from_model("gemini-1.5-pro"), "google") + self.assertEqual(parse_provider_from_model("GEMINI-ultra"), "google") + self.assertEqual(parse_provider_from_model("Gemini-nano"), "google") + + def test_unknown_model_returns_unknown(self): + """Test that unrecognized model names return 'unknown'.""" + self.assertEqual(parse_provider_from_model("llama-2"), "unknown") + self.assertEqual(parse_provider_from_model("mistral-7b"), "unknown") + self.assertEqual( + parse_provider_from_model("some-random-model"), "unknown" + ) + self.assertEqual(parse_provider_from_model("custom-model"), "unknown") + + +if __name__ == "__main__": + unittest.main()