diff --git a/packages/toolbox-core/pyproject.toml b/packages/toolbox-core/pyproject.toml index 68c490483..595652ed8 100644 --- a/packages/toolbox-core/pyproject.toml +++ b/packages/toolbox-core/pyproject.toml @@ -43,6 +43,11 @@ Repository = "https://github.com/googleapis/mcp-toolbox-sdk-python.git" Changelog = "https://github.com/googleapis/mcp-toolbox-sdk-python/blob/main/packages/toolbox-core/CHANGELOG.md" [project.optional-dependencies] +telemetry = [ + "opentelemetry-api>=1.20.0,<2.0.0", + "opentelemetry-sdk>=1.20.0,<2.0.0", + "opentelemetry-exporter-otlp>=1.20.0,<2.0.0" +] test = [ "black[jupyter]==26.1.0", "isort==8.0.0", diff --git a/packages/toolbox-core/src/toolbox_core/client.py b/packages/toolbox-core/src/toolbox_core/client.py index 79e598e64..856db4a6d 100644 --- a/packages/toolbox-core/src/toolbox_core/client.py +++ b/packages/toolbox-core/src/toolbox_core/client.py @@ -28,6 +28,8 @@ McpHttpTransportV20250618, McpHttpTransportV20251125, ) +from . import version +from .mcp_transport.telemetry import TELEMETRY_AVAILABLE from .protocol import Protocol, ToolSchema from .tool import ToolboxTool from .utils import identify_auth_requirements, resolve_value, warn_if_http_and_headers @@ -54,6 +56,7 @@ def __init__( protocol: Protocol = Protocol.MCP, client_name: Optional[str] = None, client_version: Optional[str] = None, + telemetry_enabled: bool = False, ): """ Initializes the ToolboxClient. @@ -67,29 +70,56 @@ def __init__( client_headers: Headers to include in each request sent through this client. protocol: The communication protocol to use. + client_name: Optional client name for identification. + client_version: Optional client version for identification. + telemetry_enabled: Whether to enable OpenTelemetry tracing and metrics. (Default: False) """ + if protocol != Protocol.MCP_LATEST: logging.warning( f"A newer version of MCP ({Protocol.MCP_LATEST.value}) is available. " "Please use Protocol.MCP_LATEST to use the latest features." 
) + # Telemetry is only enabled if URL is provided AND OpenTelemetry is available + telemetry_enabled = bool(telemetry_enabled) and TELEMETRY_AVAILABLE + match protocol: case Protocol.MCP_v20251125: self.__transport = McpHttpTransportV20251125( - url, session, protocol, client_name, client_version + url, + session, + protocol, + client_name, + client_version, + telemetry_enabled=telemetry_enabled, ) case Protocol.MCP_v20250618: self.__transport = McpHttpTransportV20250618( - url, session, protocol, client_name, client_version + url, + session, + protocol, + client_name, + client_version, + telemetry_enabled=telemetry_enabled, ) case Protocol.MCP_v20250326: self.__transport = McpHttpTransportV20250326( - url, session, protocol, client_name, client_version + url, + session, + protocol, + client_name, + client_version, + telemetry_enabled=telemetry_enabled, ) case Protocol.MCP_v20241105: self.__transport = McpHttpTransportV20241105( - url, session, protocol, client_name, client_version + url, + session, + protocol, + client_name, + client_version, + telemetry_enabled=telemetry_enabled, ) case _: raise ValueError(f"Unsupported MCP protocol version: {protocol}") diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/telemetry.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/telemetry.py new file mode 100644 index 000000000..27db88bf7 --- /dev/null +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/telemetry.py @@ -0,0 +1,408 @@ +# Copyright 2026 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
# See the License for the specific language governing permissions and
# limitations under the License.

"""OpenTelemetry telemetry utilities for the MCP protocol.

This module implements telemetry following the MCP Semantic Conventions:
https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp

The helpers here only *use* the globally registered OpenTelemetry providers;
they never install one. To collect data, configure OpenTelemetry in YOUR
application before creating a client, for example::

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
        OTLPSpanExporter,
    )

    provider = TracerProvider()
    provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
    trace.set_tracer_provider(provider)

Every helper that runs on the request path is best-effort: a telemetry failure
is logged at DEBUG level and never propagates into the MCP operation itself.

Note: OpenTelemetry is an optional dependency. Install with:
    pip install toolbox-core[telemetry]
"""

import logging
from typing import Any, Optional
from urllib.parse import urlparse

# Try to import OpenTelemetry - it's an optional dependency
try:
    from opentelemetry import metrics, trace
    from opentelemetry.metrics import Histogram, Meter
    from opentelemetry.trace import Span, SpanKind, Status, StatusCode, Tracer
    from opentelemetry.trace.propagation.tracecontext import (
        TraceContextTextMapPropagator,
    )

    TELEMETRY_AVAILABLE = True
except ImportError:
    TELEMETRY_AVAILABLE = False
    # Define placeholders for every imported name so that annotations and
    # module attribute access never raise NameError at import time.
    Tracer = Any
    Meter = Any
    Histogram = Any
    Span = Any
    metrics = None
    trace = None
    SpanKind = None
    Status = None
    StatusCode = None
    TraceContextTextMapPropagator = None

_logger = logging.getLogger(__name__)

# Attribute names following MCP semantic conventions
ATTR_MCP_METHOD_NAME = "mcp.method.name"
ATTR_MCP_PROTOCOL_VERSION = "mcp.protocol.version"
ATTR_MCP_SESSION_ID = "mcp.session.id"
ATTR_ERROR_TYPE = "error.type"
ATTR_GEN_AI_TOOL_NAME = "gen_ai.tool.name"
ATTR_GEN_AI_OPERATION_NAME = "gen_ai.operation.name"
ATTR_GEN_AI_PROMPT_NAME = "gen_ai.prompt.name"
ATTR_SERVER_ADDRESS = "server.address"
ATTR_SERVER_PORT = "server.port"
ATTR_NETWORK_TRANSPORT = "network.transport"
ATTR_NETWORK_PROTOCOL_NAME = "network.protocol.name"

# Metric names following MCP semantic conventions
METRIC_CLIENT_OPERATION_DURATION = "mcp.client.operation.duration"
METRIC_CLIENT_SESSION_DURATION = "mcp.client.session.duration"

# Histogram bucket boundaries for MCP metrics (in seconds)
# As specified in: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#metrics
MCP_DURATION_BUCKETS = [0.01, 0.02, 0.05, 0.1, 0.2, 0.5, 1, 2, 5, 10, 30, 60, 120, 300]

_MISSING_DEPENDENCY_MESSAGE = (
    "Telemetry support requires OpenTelemetry. Install with: "
    "pip install toolbox-core[telemetry]"
)


def _require_telemetry() -> None:
    """Raise RuntimeError when the optional OpenTelemetry packages are absent."""
    if not TELEMETRY_AVAILABLE:
        raise RuntimeError(_MISSING_DEPENDENCY_MESSAGE)


def get_tracer(name: str = "toolbox.mcp.client", version: Optional[str] = None) -> Tracer:
    """Get a tracer from the global TracerProvider.

    If the application has not configured a provider, OpenTelemetry hands back
    a no-op tracer, so calling this is always safe once the packages are
    installed.

    Args:
        name: The instrumentation scope name (default: "toolbox.mcp.client")
        version: The instrumentation scope version

    Returns:
        An OpenTelemetry Tracer instance (real or no-op depending on app config).

    Raises:
        RuntimeError: If OpenTelemetry is not installed
    """
    _require_telemetry()
    return trace.get_tracer(name, version)


def get_meter(name: str = "toolbox.mcp.client", version: Optional[str] = None) -> Meter:
    """Get a meter from the global MeterProvider.

    If the application has not configured a provider, OpenTelemetry hands back
    a no-op meter, so calling this is always safe once the packages are
    installed.

    Args:
        name: The instrumentation scope name (default: "toolbox.mcp.client")
        version: The instrumentation scope version

    Returns:
        An OpenTelemetry Meter instance (real or no-op depending on app config).

    Raises:
        RuntimeError: If OpenTelemetry is not installed
    """
    _require_telemetry()
    return metrics.get_meter(name, version)


def _create_duration_histogram(
    meter: Meter, name: str, description: str
) -> Optional[Histogram]:
    """Create a seconds-based duration histogram, or return None on failure."""
    common = {"name": name, "unit": "s", "description": description}
    try:
        try:
            # Pass a copy so an SDK can never mutate the shared module constant.
            return meter.create_histogram(
                explicit_bucket_boundaries_advisory=list(MCP_DURATION_BUCKETS),
                **common,
            )
        except TypeError:
            # The advisory keyword is a later addition to the metrics API;
            # older releases inside the supported ">=1.20.0" range reject it.
            # Fall back to the SDK's default buckets instead of silently
            # disabling the metric altogether.
            return meter.create_histogram(**common)
    except Exception:
        _logger.debug("Failed to create histogram %r", name, exc_info=True)
        return None


def create_operation_duration_histogram(meter: Meter) -> Optional[Histogram]:
    """Create histogram for MCP client operation duration.

    The MCP semantic-convention bucket boundaries (``MCP_DURATION_BUCKETS``)
    are supplied as an advisory hint when the installed OpenTelemetry API
    supports it; otherwise the histogram is created with default buckets.

    Args:
        meter: The OpenTelemetry meter

    Returns:
        Histogram instance or None if creation failed
    """
    return _create_duration_histogram(
        meter,
        METRIC_CLIENT_OPERATION_DURATION,
        "Duration of MCP client operations (requests/notifications)",
    )


def create_session_duration_histogram(meter: Meter) -> Optional[Histogram]:
    """Create histogram for MCP client session duration.

    The MCP semantic-convention bucket boundaries (``MCP_DURATION_BUCKETS``)
    are supplied as an advisory hint when the installed OpenTelemetry API
    supports it; otherwise the histogram is created with default buckets.

    Args:
        meter: The OpenTelemetry meter

    Returns:
        Histogram instance or None if creation failed
    """
    return _create_duration_histogram(
        meter,
        METRIC_CLIENT_SESSION_DURATION,
        "Total duration of MCP client sessions",
    )


def extract_server_info(url: str) -> tuple[str, Optional[int], str]:
    """Extract server address, port, and protocol from URL.

    Args:
        url: The server URL

    Returns:
        Tuple of (server_address, server_port, protocol_name). The port is
        None when the URL has no explicit port or the port is malformed; the
        protocol defaults to "http" when the URL has no scheme.
    """
    parsed = urlparse(url)
    try:
        port = parsed.port
    except ValueError:
        # urlparse validates the port lazily; a malformed port must not cost
        # us the rest of the attributes.
        port = None
    return parsed.hostname or parsed.netloc, port, parsed.scheme or "http"


def _inject_trace_context() -> dict[str, str]:
    """Return the W3C trace-context headers for the current context."""
    if not TELEMETRY_AVAILABLE:
        return {}
    carrier: dict[str, str] = {}
    try:
        TraceContextTextMapPropagator().inject(carrier)
    except Exception:
        _logger.debug("Failed to inject trace context", exc_info=True)
        return {}
    return carrier


def create_traceparent_from_context() -> str:
    """Create W3C traceparent header from current trace context.

    Returns:
        W3C traceparent header string in format:
        version-trace_id-parent_id-trace_flags
        Example: 00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01
        Empty string if telemetry is not available or nothing was injected.
    """
    return _inject_trace_context().get("traceparent", "")


def create_tracestate_from_context() -> str:
    """Create W3C tracestate header from current trace context.

    Returns:
        W3C tracestate header string; empty string if telemetry is not
        available or nothing was injected.
    """
    return _inject_trace_context().get("tracestate", "")


def _build_attributes(
    protocol_version: str,
    server_url: str,
    *,
    method_name: Optional[str] = None,
    tool_name: Optional[str] = None,
    network_transport: Optional[str] = None,
    error: Optional[Exception] = None,
) -> dict[str, Any]:
    """Build the attribute set shared by MCP spans and duration metrics."""
    attributes: dict[str, Any] = {}
    if method_name is not None:
        attributes[ATTR_MCP_METHOD_NAME] = method_name
    attributes[ATTR_MCP_PROTOCOL_VERSION] = protocol_version

    server_address, server_port, protocol_name = extract_server_info(server_url)
    attributes[ATTR_SERVER_ADDRESS] = server_address
    attributes[ATTR_NETWORK_PROTOCOL_NAME] = protocol_name
    if server_port:
        attributes[ATTR_SERVER_PORT] = server_port

    # Network transport ("tcp" for HTTP/HTTPS, "pipe" for stdio)
    if network_transport:
        attributes[ATTR_NETWORK_TRANSPORT] = network_transport

    if tool_name:
        attributes[ATTR_GEN_AI_TOOL_NAME] = tool_name
        if method_name == "tools/call":
            attributes[ATTR_GEN_AI_OPERATION_NAME] = "execute_tool"

    if error is not None:
        attributes[ATTR_ERROR_TYPE] = type(error).__name__
    return attributes


def start_span(
    tracer: Tracer,
    method_name: str,
    protocol_version: str,
    server_url: str,
    tool_name: Optional[str] = None,
    network_transport: Optional[str] = None,
) -> Optional[Span]:
    """Start a telemetry span for MCP operations. Returns None if telemetry fails.

    Args:
        tracer: The OpenTelemetry tracer
        method_name: The MCP method name (e.g., "tools/call", "tools/list")
        protocol_version: The MCP protocol version
        server_url: The MCP server URL
        tool_name: Optional tool name for tools/call operations
        network_transport: Optional network transport type ("tcp" for HTTP/HTTPS, "pipe" for stdio)

    Returns:
        The started span, or None if telemetry is unavailable or failed
    """
    if not TELEMETRY_AVAILABLE:
        return None
    try:
        # Build the attributes *before* creating the span: once a span exists
        # any later failure would leave it started but never ended.
        attributes = _build_attributes(
            protocol_version,
            server_url,
            method_name=method_name,
            tool_name=tool_name,
            network_transport=network_transport,
        )
        span_name = f"{method_name} {tool_name}" if tool_name else method_name
        return tracer.start_span(
            span_name, kind=SpanKind.CLIENT, attributes=attributes
        )
    except Exception:
        # Telemetry failed - continue without it
        _logger.debug("Failed to start span for %r", method_name, exc_info=True)
        return None


def end_span(span: Optional[Span], error: Optional[Exception] = None) -> None:
    """End a telemetry span. Safe to call with None span.

    Args:
        span: The span to end (can be None if telemetry failed)
        error: Optional exception if operation failed
    """
    if span is None:
        return
    if error is not None:
        try:
            span.set_status(Status(StatusCode.ERROR, str(error)))
            span.set_attribute(ATTR_ERROR_TYPE, type(error).__name__)
        except Exception:
            _logger.debug("Failed to record error on span", exc_info=True)
    # Ending is attempted separately so a failure while recording the error
    # status can never leave the span open.
    try:
        span.end()
    except Exception:
        _logger.debug("Failed to end span", exc_info=True)


def record_error_from_jsonrpc(
    span: Optional[Span], error_code: int, error_message: str
) -> None:
    """Record error information from JSON-RPC error response.

    Like the other helpers in this module, this is best-effort: a None span is
    ignored and telemetry failures are not propagated.

    Args:
        span: The span to record the error on (can be None if telemetry failed)
        error_code: The JSON-RPC error code
        error_message: The JSON-RPC error message
    """
    if span is None:
        return
    try:
        span.set_status(Status(StatusCode.ERROR, error_message))
        span.set_attribute(ATTR_ERROR_TYPE, f"jsonrpc.error.{error_code}")
    except Exception:
        _logger.debug("Failed to record JSON-RPC error on span", exc_info=True)


def record_operation_duration(
    histogram: Optional[Histogram],
    duration_seconds: float,
    method_name: str,
    protocol_version: str,
    server_url: str,
    tool_name: Optional[str] = None,
    network_transport: Optional[str] = None,
    error: Optional[Exception] = None,
) -> None:
    """Record MCP client operation duration metric.

    Args:
        histogram: The operation duration histogram (can be None if metrics failed)
        duration_seconds: Duration of the operation in seconds
        method_name: The MCP method name (required attribute)
        protocol_version: The MCP protocol version (recommended attribute)
        server_url: The MCP server URL (for extracting server address/port)
        tool_name: Optional tool name for tools/call operations
        network_transport: Optional network transport type ("tcp" for HTTP/HTTPS)
        error: Optional exception if operation failed (for error.type attribute)
    """
    if histogram is None:
        return
    try:
        attributes = _build_attributes(
            protocol_version,
            server_url,
            method_name=method_name,
            tool_name=tool_name,
            network_transport=network_transport,
            error=error,
        )
        histogram.record(duration_seconds, attributes)
    except Exception:
        _logger.debug("Failed to record operation duration", exc_info=True)


def record_session_duration(
    histogram: Optional[Histogram],
    duration_seconds: float,
    protocol_version: str,
    server_url: str,
    network_transport: Optional[str] = None,
    error: Optional[Exception] = None,
) -> None:
    """Record MCP client session duration metric.

    Args:
        histogram: The session duration histogram (can be None if metrics failed)
        duration_seconds: Duration of the session in seconds
        protocol_version: The MCP protocol version (recommended attribute)
        server_url: The MCP server URL (for extracting server address/port)
        network_transport: Optional network transport type ("tcp" for HTTP/HTTPS)
        error: Optional exception if session ended with error
    """
    if histogram is None:
        return
    try:
        attributes = _build_attributes(
            protocol_version,
            server_url,
            network_transport=network_transport,
            error=error,
        )
        histogram.record(duration_seconds, attributes)
    except Exception:
        _logger.debug("Failed to record session duration", exc_info=True)
session is None self._session = session or ClientSession() diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/mcp.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/mcp.py index cffa8b244..84685d52d 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/mcp.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/mcp.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Mapping, Optional, TypeVar from pydantic import BaseModel from ... import version from ...protocol import ManifestSchema +from .. import telemetry from ..transport_base import _McpHttpTransportBase from . import types @@ -27,6 +29,27 @@ class McpHttpTransportV20241105(_McpHttpTransportBase): """Transport for the MCP v2024-11-05 protocol.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if self._telemetry_enabled: + self._tracer = telemetry.get_tracer("toolbox", version.__version__) + + # Initialize metrics following MCP semantic conventions + meter = telemetry.get_meter("toolbox", version.__version__) + self._operation_duration_histogram = ( + telemetry.create_operation_duration_histogram(meter) + ) + self._session_duration_histogram = ( + telemetry.create_session_duration_histogram(meter) + ) + self._session_start_time: Optional[float] = None + else: + self._tracer = None + self._operation_duration_histogram = None + self._session_duration_histogram = None + self._session_start_time = None + async def _send_request( self, url: str, @@ -85,6 +108,34 @@ async def _initialize_session( self, headers: Optional[Mapping[str, str]] = None ) -> None: """Initializes the MCP session.""" + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Track session start time for session duration metric + self._session_start_time = time.time() + # Start 
telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + params = types.InitializeRequestParams( protocolVersion=self._protocol_version, capabilities=types.ClientCapabilities(), @@ -92,32 +143,56 @@ async def _initialize_session( name=self._client_name or "toolbox-core-python", version=self._client_version or version.__version__, ), + field_meta=meta, ) - result = await self._send_request( - url=self._mcp_base_url, - request=types.InitializeRequest(params=params), - headers=headers, - ) + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.InitializeRequest(params=params), + headers=headers, + ) + + if result is None: + raise RuntimeError( + "Failed to initialize session: No response from server." 
+ ) + + self._server_version = result.serverInfo.version + + if result.protocolVersion != self._protocol_version: + raise RuntimeError( + f"MCP version mismatch: client does not support server version {result.protocolVersion}" + ) - if result is None: - raise RuntimeError("Failed to initialize session: No response from server.") + if not result.capabilities.tools: + if self._manage_session: + await self.close() + raise RuntimeError("Server does not support the 'tools' capability.") - self._server_version = result.serverInfo.version - if result.protocolVersion != self._protocol_version: - raise RuntimeError( - f"MCP version mismatch: client does not support server version {result.protocolVersion}" + await self._send_request( + url=self._mcp_base_url, + request=types.InitializedNotification(), + headers=headers, ) - if not result.capabilities.tools: - if self._manage_session: - await self.close() - raise RuntimeError("Server does not support the 'tools' capability.") - - await self._send_request( - url=self._mcp_base_url, - request=types.InitializedNotification(), - headers=headers, - ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + error=error, + ) + telemetry.end_span(span, error=error) async def tools_list( self, @@ -128,20 +203,73 @@ async def tools_list( await self._ensure_initialized(headers=headers) url = self._mcp_base_url + (toolset_name if toolset_name else "") - result = await self._send_request( - url=url, request=types.ListToolsRequest(), headers=headers - ) - if result is None: - raise RuntimeError("Failed to list tools: No response from server.") - tools_map = { - t.name: self._convert_tool_schema(t.model_dump(mode="json", 
by_alias=True)) - for t in result.tools - } - if self._server_version is None: - raise RuntimeError("Server version not available.") + meta: Optional[types.MCPMeta] = None - return ManifestSchema(serverVersion=self._server_version, tools=tools_map) + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=url, + request=types.ListToolsRequest( + params=types.ListToolsRequestParams(field_meta=meta) + ), + headers=headers, + ) + if result is None: + raise RuntimeError("Failed to list tools: No response from server.") + + tools_map = { + t.name: self._convert_tool_schema( + t.model_dump(mode="json", by_alias=True) + ) + for t in result.tools + } + if self._server_version is None: + raise RuntimeError("Server version not available.") + + return ManifestSchema(serverVersion=self._server_version, tools=tools_map) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + error=error, + ) + # 
End span + telemetry.end_span(span, error=error) async def tool_get( self, tool_name: str, headers: Optional[Mapping[str, str]] = None @@ -157,22 +285,90 @@ async def tool_get( tools={tool_name: manifest.tools[tool_name]}, ) + async def close(self): + """Closes the MCP session and records session duration metric.""" + if self._telemetry_enabled: + # Record session duration if session was initialized + if self._session_start_time is not None: + session_duration = time.time() - self._session_start_time + telemetry.record_session_duration( + self._session_duration_histogram, + session_duration, + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + # Call parent's close method + await super().close() + async def tool_invoke( self, tool_name: str, arguments: dict, headers: Optional[Mapping[str, str]] ) -> str: """Invokes a specific tool on the server using the MCP protocol.""" await self._ensure_initialized(headers=headers) - result = await self._send_request( - url=self._mcp_base_url, - request=types.CallToolRequest( - params=types.CallToolRequestParams(name=tool_name, arguments=arguments) - ), - headers=headers, - ) - if result is None: - raise RuntimeError( - f"Failed to invoke tool '{tool_name}': No response from server." 
+ meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.CallToolRequest( + params=types.CallToolRequestParams( + name=tool_name, arguments=arguments, field_meta=meta + ) + ), + headers=headers, ) - return self._process_tool_result_content(result.content) + if result is None: + raise RuntimeError( + f"Failed to invoke tool '{tool_name}': No response from server." 
+ ) + + return self._process_tool_result_content(result.content) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/types.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/types.py index 5cfca277a..646d5b271 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/types.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20241105/types.py @@ -77,6 +77,9 @@ class InitializeRequestParams(RequestParams): protocolVersion: str capabilities: ClientCapabilities clientInfo: Implementation + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") class ServerCapabilities(_BaseMCPModel): @@ -100,6 +103,12 @@ class ListToolsResult(_BaseMCPModel): tools: list[Tool] +class ListToolsRequestParams(_BaseMCPModel): + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") + + class TextContent(_BaseMCPModel): type: Literal["text"] text: str @@ -141,15 +150,25 @@ class InitializedNotification(MCPNotification): class ListToolsRequest(MCPRequest[ListToolsResult]): method: Literal["tools/list"] = "tools/list" - params: dict[str, Any] = {} + params: ListToolsRequestParams = Field(default_factory=ListToolsRequestParams) def get_result_model(self) -> 
Type[ListToolsResult]: return ListToolsResult +class MCPMeta(_BaseMCPModel): + """Metadata for MCP requests including OpenTelemetry trace context.""" + + traceparent: str | None = None + tracestate: str | None = None + + class CallToolRequestParams(_BaseMCPModel): name: str arguments: dict[str, Any] + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: MCPMeta | None = Field(default=None, serialization_alias="_meta") class CallToolRequest(MCPRequest[CallToolResult]): diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/mcp.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/mcp.py index 14023a2a9..d9045a631 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/mcp.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/mcp.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Mapping, Optional, TypeVar from pydantic import BaseModel from ... import version from ...protocol import ManifestSchema +from .. import telemetry from ..transport_base import _McpHttpTransportBase from . 
import types @@ -31,6 +33,24 @@ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._session_id: Optional[str] = None + if self._telemetry_enabled: + self._tracer = telemetry.get_tracer("toolbox", version.__version__) + + # Initialize metrics following MCP semantic conventions + meter = telemetry.get_meter("toolbox", version.__version__) + self._operation_duration_histogram = ( + telemetry.create_operation_duration_histogram(meter) + ) + self._session_duration_histogram = ( + telemetry.create_session_duration_histogram(meter) + ) + self._session_start_time: Optional[float] = None + else: + self._tracer = None + self._operation_duration_histogram = None + self._session_duration_histogram = None + self._session_start_time = None + async def _send_request( self, url: str, @@ -101,6 +121,34 @@ async def _initialize_session( self, headers: Optional[Mapping[str, str]] = None ) -> None: """Initializes the MCP session.""" + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Track session start time for session duration metric + self._session_start_time = time.time() + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + params = types.InitializeRequestParams( protocolVersion=self._protocol_version, 
capabilities=types.ClientCapabilities(), @@ -108,45 +156,67 @@ async def _initialize_session( name=self._client_name or "toolbox-core-python", version=self._client_version or version.__version__, ), + field_meta=meta, ) - result = await self._send_request( - url=self._mcp_base_url, - request=types.InitializeRequest(params=params), - headers=headers, - ) + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.InitializeRequest(params=params), + headers=headers, + ) + + if result is None: + raise RuntimeError( + "Failed to initialize session: No response from server." + ) - if result is None: - raise RuntimeError("Failed to initialize session: No response from server.") + self._server_version = result.serverInfo.version - self._server_version = result.serverInfo.version + if result.protocolVersion != self._protocol_version: + raise RuntimeError( + "MCP version mismatch: client does not support server version" + f" {result.protocolVersion}" + ) - if result.protocolVersion != self._protocol_version: - raise RuntimeError( - "MCP version mismatch: client does not support server version" - f" {result.protocolVersion}" - ) + if not result.capabilities.tools: + if self._manage_session: + await self.close() + raise RuntimeError("Server does not support the 'tools' capability.") - if not result.capabilities.tools: - if self._manage_session: - await self.close() - raise RuntimeError("Server does not support the 'tools' capability.") + # Extract session ID from extra fields (v2025-03-26 specific) + # Session ID is captured from headers in _send_request - # Extract session ID from extra fields (v2025-03-26 specific) - # Session ID is captured from headers in _send_request + if not self._session_id: + if self._manage_session: + await self.close() + raise RuntimeError( + "Server did not return a Mcp-Session-Id during initialization." 
+ ) - if not self._session_id: - if self._manage_session: - await self.close() - raise RuntimeError( - "Server did not return a Mcp-Session-Id during initialization." + await self._send_request( + url=self._mcp_base_url, + request=types.InitializedNotification(), + headers=headers, ) - - await self._send_request( - url=self._mcp_base_url, - request=types.InitializedNotification(), - headers=headers, - ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + error=error, + ) + telemetry.end_span(span, error=error) async def tools_list( self, @@ -157,23 +227,76 @@ async def tools_list( await self._ensure_initialized(headers=headers) url = self._mcp_base_url + (toolset_name if toolset_name else "") - result = await self._send_request( - url=url, request=types.ListToolsRequest(), headers=headers - ) - if result is None: - raise RuntimeError("Failed to list tools: No response from server.") - tools_map = { - t.name: self._convert_tool_schema(t.model_dump(mode="json", by_alias=True)) - for t in result.tools - } - if self._server_version is None: - raise RuntimeError("Server version not available.") + meta: Optional[types.MCPMeta] = None - return ManifestSchema( - serverVersion=self._server_version, - tools=tools_map, - ) + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent 
of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=url, + request=types.ListToolsRequest( + params=types.ListToolsRequestParams(field_meta=meta) + ), + headers=headers, + ) + if result is None: + raise RuntimeError("Failed to list tools: No response from server.") + + tools_map = { + t.name: self._convert_tool_schema( + t.model_dump(mode="json", by_alias=True) + ) + for t in result.tools + } + if self._server_version is None: + raise RuntimeError("Server version not available.") + + return ManifestSchema( + serverVersion=self._server_version, + tools=tools_map, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) async def tool_get( self, tool_name: str, headers: Optional[Mapping[str, str]] = None @@ -189,23 +312,90 @@ async def tool_get( tools={tool_name: manifest.tools[tool_name]}, ) + async def close(self): + """Closes the MCP session and records session duration metric.""" + if self._telemetry_enabled: + # Record session duration if session was initialized + if self._session_start_time is not None: + session_duration = time.time() - self._session_start_time + telemetry.record_session_duration( + self._session_duration_histogram, + session_duration, + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + # Call 
parent's close method + await super().close() + async def tool_invoke( self, tool_name: str, arguments: dict, headers: Optional[Mapping[str, str]] ) -> str: """Invokes a specific tool on the server using the MCP protocol.""" await self._ensure_initialized(headers=headers) - result = await self._send_request( - url=self._mcp_base_url, - request=types.CallToolRequest( - params=types.CallToolRequestParams(name=tool_name, arguments=arguments) - ), - headers=headers, - ) + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) - if result is None: - raise RuntimeError( - f"Failed to invoke tool '{tool_name}': No response from server." + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.CallToolRequest( + params=types.CallToolRequestParams( + name=tool_name, arguments=arguments, field_meta=meta + ) + ), + headers=headers, ) - return self._process_tool_result_content(result.content) + if result is None: + raise RuntimeError( + f"Failed to invoke tool '{tool_name}': No response from server." 
+ ) + + return self._process_tool_result_content(result.content) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/types.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/types.py index 5cfca277a..646d5b271 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/types.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250326/types.py @@ -77,6 +77,9 @@ class InitializeRequestParams(RequestParams): protocolVersion: str capabilities: ClientCapabilities clientInfo: Implementation + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") class ServerCapabilities(_BaseMCPModel): @@ -100,6 +103,12 @@ class ListToolsResult(_BaseMCPModel): tools: list[Tool] +class ListToolsRequestParams(_BaseMCPModel): + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") + + class TextContent(_BaseMCPModel): type: Literal["text"] text: str @@ -141,15 +150,25 @@ class InitializedNotification(MCPNotification): class ListToolsRequest(MCPRequest[ListToolsResult]): method: Literal["tools/list"] = "tools/list" - params: dict[str, Any] = {} + params: ListToolsRequestParams = Field(default_factory=ListToolsRequestParams) def get_result_model(self) -> 
Type[ListToolsResult]: return ListToolsResult +class MCPMeta(_BaseMCPModel): + """Metadata for MCP requests including OpenTelemetry trace context.""" + + traceparent: str | None = None + tracestate: str | None = None + + class CallToolRequestParams(_BaseMCPModel): name: str arguments: dict[str, Any] + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: MCPMeta | None = Field(default=None, serialization_alias="_meta") class CallToolRequest(MCPRequest[CallToolResult]): diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/mcp.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/mcp.py index 81e0bc183..12c507c7c 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/mcp.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/mcp.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Mapping, Optional, TypeVar from pydantic import BaseModel from ... import version from ...protocol import ManifestSchema +from .. import telemetry from ..transport_base import _McpHttpTransportBase from . 
import types @@ -27,6 +29,27 @@ class McpHttpTransportV20250618(_McpHttpTransportBase): """Transport for the MCP v2025-06-18 protocol.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if self._telemetry_enabled: + self._tracer = telemetry.get_tracer("toolbox", version.__version__) + + # Initialize metrics following MCP semantic conventions + meter = telemetry.get_meter("toolbox", version.__version__) + self._operation_duration_histogram = ( + telemetry.create_operation_duration_histogram(meter) + ) + self._session_duration_histogram = ( + telemetry.create_session_duration_histogram(meter) + ) + self._session_start_time: Optional[float] = None + else: + self._tracer = None + self._operation_duration_histogram = None + self._session_duration_histogram = None + self._session_start_time = None + async def _send_request( self, url: str, @@ -92,6 +115,34 @@ async def _initialize_session( self, headers: Optional[Mapping[str, str]] = None ) -> None: """Initializes the MCP session.""" + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Track session start time for session duration metric + self._session_start_time = time.time() + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + params = types.InitializeRequestParams( 
protocolVersion=self._protocol_version, capabilities=types.ClientCapabilities(), @@ -99,35 +150,57 @@ async def _initialize_session( name=self._client_name or "toolbox-core-python", version=self._client_version or version.__version__, ), + field_meta=meta, ) - result = await self._send_request( - url=self._mcp_base_url, - request=types.InitializeRequest(params=params), - headers=headers, - ) + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.InitializeRequest(params=params), + headers=headers, + ) - if result is None: - raise RuntimeError("Failed to initialize session: No response from server.") + if result is None: + raise RuntimeError( + "Failed to initialize session: No response from server." + ) - self._server_version = result.serverInfo.version + self._server_version = result.serverInfo.version - if result.protocolVersion != self._protocol_version: - raise RuntimeError( - "MCP version mismatch: client does not support server version" - f" {result.protocolVersion}" - ) + if result.protocolVersion != self._protocol_version: + raise RuntimeError( + "MCP version mismatch: client does not support server version" + f" {result.protocolVersion}" + ) - if not result.capabilities.tools: - if self._manage_session: - await self.close() - raise RuntimeError("Server does not support the 'tools' capability.") + if not result.capabilities.tools: + if self._manage_session: + await self.close() + raise RuntimeError("Server does not support the 'tools' capability.") - await self._send_request( - url=self._mcp_base_url, - request=types.InitializedNotification(), - headers=headers, - ) + await self._send_request( + url=self._mcp_base_url, + request=types.InitializedNotification(), + headers=headers, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + 
telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + error=error, + ) + telemetry.end_span(span, error=error) async def tools_list( self, @@ -138,23 +211,76 @@ async def tools_list( await self._ensure_initialized(headers=headers) url = self._mcp_base_url + (toolset_name if toolset_name else "") - result = await self._send_request( - url=url, request=types.ListToolsRequest(), headers=headers - ) - if result is None: - raise RuntimeError("Failed to list tools: No response from server.") - tools_map = { - t.name: self._convert_tool_schema(t.model_dump(mode="json", by_alias=True)) - for t in result.tools - } - if self._server_version is None: - raise RuntimeError("Server version not available.") + meta: Optional[types.MCPMeta] = None - return ManifestSchema( - serverVersion=self._server_version, - tools=tools_map, - ) + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=url, + request=types.ListToolsRequest( + params=types.ListToolsRequestParams(field_meta=meta) + ), + headers=headers, + ) + if result is None: + raise RuntimeError("Failed 
to list tools: No response from server.") + + tools_map = { + t.name: self._convert_tool_schema( + t.model_dump(mode="json", by_alias=True) + ) + for t in result.tools + } + if self._server_version is None: + raise RuntimeError("Server version not available.") + + return ManifestSchema( + serverVersion=self._server_version, + tools=tools_map, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) async def tool_get( self, tool_name: str, headers: Optional[Mapping[str, str]] = None @@ -170,23 +296,90 @@ async def tool_get( tools={tool_name: manifest.tools[tool_name]}, ) + async def close(self): + """Closes the MCP session and records session duration metric.""" + if self._telemetry_enabled: + # Record session duration if session was initialized + if self._session_start_time is not None: + session_duration = time.time() - self._session_start_time + telemetry.record_session_duration( + self._session_duration_histogram, + session_duration, + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + # Call parent's close method + await super().close() + async def tool_invoke( self, tool_name: str, arguments: dict, headers: Optional[Mapping[str, str]] ) -> str: """Invokes a specific tool on the server using the MCP protocol.""" await self._ensure_initialized(headers=headers) - result = await self._send_request( - url=self._mcp_base_url, - request=types.CallToolRequest( - params=types.CallToolRequestParams(name=tool_name, arguments=arguments) - ), - headers=headers, - ) + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Start 
telemetry span and track operation start time + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) - if result is None: - raise RuntimeError( - f"Failed to invoke tool '{tool_name}': No response from server." + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.CallToolRequest( + params=types.CallToolRequestParams( + name=tool_name, arguments=arguments, field_meta=meta + ) + ), + headers=headers, ) - return self._process_tool_result_content(result.content) + if result is None: + raise RuntimeError( + f"Failed to invoke tool '{tool_name}': No response from server." 
+ ) + + return self._process_tool_result_content(result.content) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/types.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/types.py index 5cfca277a..646d5b271 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/types.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20250618/types.py @@ -77,6 +77,9 @@ class InitializeRequestParams(RequestParams): protocolVersion: str capabilities: ClientCapabilities clientInfo: Implementation + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") class ServerCapabilities(_BaseMCPModel): @@ -100,6 +103,12 @@ class ListToolsResult(_BaseMCPModel): tools: list[Tool] +class ListToolsRequestParams(_BaseMCPModel): + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") + + class TextContent(_BaseMCPModel): type: Literal["text"] text: str @@ -141,15 +150,25 @@ class InitializedNotification(MCPNotification): class ListToolsRequest(MCPRequest[ListToolsResult]): method: Literal["tools/list"] = "tools/list" - params: dict[str, Any] = {} + params: ListToolsRequestParams = Field(default_factory=ListToolsRequestParams) def get_result_model(self) -> 
Type[ListToolsResult]: return ListToolsResult +class MCPMeta(_BaseMCPModel): + """Metadata for MCP requests including OpenTelemetry trace context.""" + + traceparent: str | None = None + tracestate: str | None = None + + class CallToolRequestParams(_BaseMCPModel): name: str arguments: dict[str, Any] + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: MCPMeta | None = Field(default=None, serialization_alias="_meta") class CallToolRequest(MCPRequest[CallToolResult]): diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/mcp.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/mcp.py index 81ab05cbb..87080c2bd 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/mcp.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/mcp.py @@ -12,12 +12,14 @@ # See the License for the specific language governing permissions and # limitations under the License. +import time from typing import Mapping, Optional, TypeVar from pydantic import BaseModel from ... import version from ...protocol import ManifestSchema +from .. import telemetry from ..transport_base import _McpHttpTransportBase from . 
import types @@ -27,6 +29,27 @@ class McpHttpTransportV20251125(_McpHttpTransportBase): """Transport for the MCP v2025-11-25 protocol.""" + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + if self._telemetry_enabled: + self._tracer = telemetry.get_tracer("toolbox", version.__version__) + + # Initialize metrics following MCP semantic conventions + meter = telemetry.get_meter("toolbox", version.__version__) + self._operation_duration_histogram = ( + telemetry.create_operation_duration_histogram(meter) + ) + self._session_duration_histogram = ( + telemetry.create_session_duration_histogram(meter) + ) + self._session_start_time: Optional[float] = None + else: + self._tracer = None + self._operation_duration_histogram = None + self._session_duration_histogram = None + self._session_start_time = None + async def _send_request( self, url: str, @@ -38,7 +61,7 @@ async def _send_request( req_headers["MCP-Protocol-Version"] = self._protocol_version params = ( - request.params.model_dump(mode="json", exclude_none=True) + request.params.model_dump(mode="json", exclude_none=True, by_alias=True) if isinstance(request.params, BaseModel) else request.params ) @@ -92,6 +115,34 @@ async def _initialize_session( self, headers: Optional[Mapping[str, str]] = None ) -> None: """Initializes the MCP session.""" + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Track session start time for session duration metric + self._session_start_time = time.time() + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: 
https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + params = types.InitializeRequestParams( protocolVersion=self._protocol_version, capabilities=types.ClientCapabilities(), @@ -99,35 +150,57 @@ async def _initialize_session( name=self._client_name or "toolbox-core-python", version=self._client_version or version.__version__, ), + field_meta=meta, ) - result = await self._send_request( - url=self._mcp_base_url, - request=types.InitializeRequest(params=params), - headers=headers, - ) + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.InitializeRequest(params=params), + headers=headers, + ) - if result is None: - raise RuntimeError("Failed to initialize session: No response from server.") + if result is None: + raise RuntimeError( + "Failed to initialize session: No response from server." 
+ ) - self._server_version = result.serverInfo.version + self._server_version = result.serverInfo.version - if result.protocolVersion != self._protocol_version: - raise RuntimeError( - "MCP version mismatch: client does not support server version" - f" {result.protocolVersion}" - ) + if result.protocolVersion != self._protocol_version: + raise RuntimeError( + "MCP version mismatch: client does not support server version" + f" {result.protocolVersion}" + ) - if not result.capabilities.tools: - if self._manage_session: - await self.close() - raise RuntimeError("Server does not support the 'tools' capability.") + if not result.capabilities.tools: + if self._manage_session: + await self.close() + raise RuntimeError("Server does not support the 'tools' capability.") - await self._send_request( - url=self._mcp_base_url, - request=types.InitializedNotification(), - headers=headers, - ) + await self._send_request( + url=self._mcp_base_url, + request=types.InitializedNotification(), + headers=headers, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "initialize", + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + error=error, + ) + telemetry.end_span(span, error=error) async def tools_list( self, @@ -138,23 +211,76 @@ async def tools_list( await self._ensure_initialized(headers=headers) url = self._mcp_base_url + (toolset_name if toolset_name else "") - result = await self._send_request( - url=url, request=types.ListToolsRequest(), headers=headers - ) - if result is None: - raise RuntimeError("Failed to list tools: No response from server.") - tools_map = { - t.name: self._convert_tool_schema(t.model_dump(mode="json", by_alias=True)) - for t in result.tools - } - if self._server_version is None: - raise 
RuntimeError("Server version not available.") + meta: Optional[types.MCPMeta] = None - return ManifestSchema( - serverVersion=self._server_version, - tools=tools_map, - ) + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + tracestate=tracestate, + ) + + error: Optional[Exception] = None + try: + result = await self._send_request( + url=url, + request=types.ListToolsRequest( + params=types.ListToolsRequestParams(field_meta=meta) + ), + headers=headers, + ) + if result is None: + raise RuntimeError("Failed to list tools: No response from server.") + + tools_map = { + t.name: self._convert_tool_schema( + t.model_dump(mode="json", by_alias=True) + ) + for t in result.tools + } + if self._server_version is None: + raise RuntimeError("Server version not available.") + + return ManifestSchema( + serverVersion=self._server_version, + tools=tools_map, + ) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/list", + self._protocol_version, + url, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) async def tool_get( 
self, tool_name: str, headers: Optional[Mapping[str, str]] = None @@ -170,23 +296,90 @@ async def tool_get( tools={tool_name: manifest.tools[tool_name]}, ) + async def close(self): + """Closes the MCP session and records session duration metric.""" + if self._telemetry_enabled: + # Record session duration if session was initialized + if self._session_start_time is not None: + session_duration = time.time() - self._session_start_time + telemetry.record_session_duration( + self._session_duration_histogram, + session_duration, + self._protocol_version, + self._mcp_base_url, + network_transport="tcp", + ) + # Call parent's close method + await super().close() + async def tool_invoke( self, tool_name: str, arguments: dict, headers: Optional[Mapping[str, str]] ) -> str: """Invokes a specific tool on the server using the MCP protocol.""" await self._ensure_initialized(headers=headers) - result = await self._send_request( - url=self._mcp_base_url, - request=types.CallToolRequest( - params=types.CallToolRequestParams(name=tool_name, arguments=arguments) - ), - headers=headers, - ) + meta: Optional[types.MCPMeta] = None + + if self._telemetry_enabled: + from opentelemetry import trace + + # Start telemetry span and track operation start time + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp + operation_start = time.time() + span = telemetry.start_span( + self._tracer, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + ) + + # CRITICAL: Make the span active in the context before generating trace context + with trace.use_span(span, end_on_exit=False): + # The client span becomes the parent of the server span through this context + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + traceparent = telemetry.create_traceparent_from_context() + tracestate = telemetry.create_tracestate_from_context() + meta = types.MCPMeta( + traceparent=traceparent, + 
tracestate=tracestate, + ) - if result is None: - raise RuntimeError( - f"Failed to invoke tool '{tool_name}': No response from server." + error: Optional[Exception] = None + try: + result = await self._send_request( + url=self._mcp_base_url, + request=types.CallToolRequest( + params=types.CallToolRequestParams( + name=tool_name, arguments=arguments, field_meta=meta + ) + ), + headers=headers, ) - return self._process_tool_result_content(result.content) + if result is None: + raise RuntimeError( + f"Failed to invoke tool '{tool_name}': No response from server." + ) + + return self._process_tool_result_content(result.content) + except Exception as e: + error = e + raise + finally: + if self._telemetry_enabled: + # Record operation duration metric + operation_duration = time.time() - operation_start + telemetry.record_operation_duration( + self._operation_duration_histogram, + operation_duration, + "tools/call", + self._protocol_version, + self._mcp_base_url, + tool_name=tool_name, + network_transport="tcp", + error=error, + ) + # End span + telemetry.end_span(span, error=error) diff --git a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/types.py b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/types.py index 4cbcfa992..443f5a474 100644 --- a/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/types.py +++ b/packages/toolbox-core/src/toolbox_core/mcp_transport/v20251125/types.py @@ -77,6 +77,9 @@ class InitializeRequestParams(RequestParams): protocolVersion: str capabilities: ClientCapabilities clientInfo: Implementation + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") class ServerCapabilities(_BaseMCPModel): @@ -100,6 +103,12 @@ class ListToolsResult(_BaseMCPModel): tools: list[Tool] +class ListToolsRequestParams(_BaseMCPModel): + # OpenTelemetry trace context 
propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: "MCPMeta | None" = Field(default=None, serialization_alias="_meta") + + class TextContent(_BaseMCPModel): type: Literal["text"] text: str @@ -141,15 +150,25 @@ class InitializedNotification(MCPNotification): class ListToolsRequest(MCPRequest[ListToolsResult]): method: Literal["tools/list"] = "tools/list" - params: dict[str, Any] = {} + params: ListToolsRequestParams = Field(default_factory=ListToolsRequestParams) def get_result_model(self) -> Type[ListToolsResult]: return ListToolsResult +class MCPMeta(_BaseMCPModel): + """Metadata for MCP requests including OpenTelemetry trace context.""" + + traceparent: str | None = None + tracestate: str | None = None + + class CallToolRequestParams(_BaseMCPModel): name: str arguments: dict[str, Any] + # OpenTelemetry trace context propagation + # See: https://opentelemetry.io/docs/specs/semconv/gen-ai/mcp/#context-propagation + field_meta: MCPMeta | None = Field(default=None, serialization_alias="_meta") class CallToolRequest(MCPRequest[CallToolResult]): diff --git a/packages/toolbox-core/src/toolbox_core/sync_client.py b/packages/toolbox-core/src/toolbox_core/sync_client.py index 606c27234..cad63b2a7 100644 --- a/packages/toolbox-core/src/toolbox_core/sync_client.py +++ b/packages/toolbox-core/src/toolbox_core/sync_client.py @@ -45,6 +45,7 @@ def __init__( protocol: Protocol = Protocol.MCP, client_name: Optional[str] = None, client_version: Optional[str] = None, + telemetry_enabled: bool = False, ): """ Initializes the ToolboxSyncClient. @@ -52,6 +53,10 @@ def __init__( Args: url: The base URL for the Toolbox service API (e.g., "http://localhost:5000"). client_headers: Headers to include in each request sent through this client. + protocol: The communication protocol to use. + client_name: Optional client name for identification. + client_version: Optional client version for identification. 
+ telemetry_enabled: Whether to enable OpenTelemetry tracing and metrics. (Default: False) """ # Running a loop in a background thread allows us to support async # methods from non-async environments. @@ -71,6 +76,7 @@ async def create_client(): protocol=protocol, client_name=client_name, client_version=client_version, + telemetry_enabled=telemetry_enabled, ) self.__async_client = run_coroutine_threadsafe(