diff --git a/api/ee/databases/postgres/migrations/core/env.py b/api/ee/databases/postgres/migrations/core/env.py index e5e251f801..94543cd6ff 100644 --- a/api/ee/databases/postgres/migrations/core/env.py +++ b/api/ee/databases/postgres/migrations/core/env.py @@ -27,7 +27,6 @@ from oss.src.dbs.postgres.shared.base import Base import oss.src.dbs.postgres.secrets.dbes -import oss.src.dbs.postgres.observability.dbes import oss.src.dbs.postgres.tracing.dbes import oss.src.dbs.postgres.testcases.dbes import oss.src.dbs.postgres.testsets.dbes diff --git a/api/ee/databases/postgres/migrations/core/versions/cfa14a847972_drop_nodes.py b/api/ee/databases/postgres/migrations/core/versions/cfa14a847972_drop_nodes.py new file mode 100644 index 0000000000..c73d48caa9 --- /dev/null +++ b/api/ee/databases/postgres/migrations/core/versions/cfa14a847972_drop_nodes.py @@ -0,0 +1,25 @@ +"""drop nodes + +Revision ID: cfa14a847972 +Revises: a1b2c3d4e5f6 +Create Date: 2025-11-16 11:29:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "cfa14a847972" +down_revision: Union[str, None] = "a1b2c3d4e5f6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.drop_table("nodes") + + +def downgrade() -> None: + pass diff --git a/api/ee/databases/postgres/migrations/tracing/versions/cfa14a847972_drop_nodes.py b/api/ee/databases/postgres/migrations/tracing/versions/cfa14a847972_drop_nodes.py new file mode 100644 index 0000000000..b7ecaa2734 --- /dev/null +++ b/api/ee/databases/postgres/migrations/tracing/versions/cfa14a847972_drop_nodes.py @@ -0,0 +1,25 @@ +"""drop nodes + +Revision ID: cfa14a847972 +Revises: fd77265d65dc +Create Date: 2025-11-16 11:29:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "cfa14a847972" +down_revision: Union[str, None] = "fd77265d65dc" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.drop_table("nodes") + + +def downgrade() -> None: + pass diff --git a/api/ee/tests/manual/evaluations/sdk/testset-management.ipynb b/api/ee/tests/manual/evaluations/sdk/testset-management.ipynb index e121df8f7e..3026c5c914 100644 --- a/api/ee/tests/manual/evaluations/sdk/testset-management.ipynb +++ b/api/ee/tests/manual/evaluations/sdk/testset-management.ipynb @@ -32,7 +32,7 @@ "text": [ "2025-10-23T16:46:06.701Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - SDK version: 0.51.2 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", "2025-10-23T16:46:06.702Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - Host: http://144.76.237.122 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", - "2025-10-23T16:46:06.702Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OLTP URL: http://144.76.237.122/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" + "2025-10-23T16:46:06.702Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OTLP URL: http://144.76.237.122/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" ] } ], diff --git a/api/entrypoint.py b/api/entrypoint.py index 0874e910ae..4e24b7831c 100644 --- a/api/entrypoint.py +++ b/api/entrypoint.py @@ -21,7 +21,7 @@ check_for_new_migrations as check_for_new_tracing_migrations, ) -from oss.src.services.auth_helper import authentication_middleware +from oss.src.services.auth_service import authentication_middleware from oss.src.services.analytics_service import analytics_middleware from oss.src.routers import evaluation_router, human_evaluation_router @@ -48,7 +48,6 @@ # DAOs from oss.src.dbs.postgres.secrets.dao import SecretsDAO -from oss.src.dbs.postgres.observability.dao import ObservabilityDAO from oss.src.dbs.postgres.tracing.dao import TracingDAO from oss.src.dbs.postgres.blobs.dao import BlobsDAO from oss.src.dbs.postgres.git.dao import GitDAO @@ -56,7 +55,6 @@ # Services from oss.src.core.secrets.services import VaultService -from oss.src.core.observability.service import ObservabilityService from oss.src.core.tracing.service import TracingService from oss.src.core.invocations.service import InvocationsService from oss.src.core.annotations.service import AnnotationsService @@ -74,7 +72,7 @@ # Routers from oss.src.apis.fastapi.vault.router import VaultRouter -from oss.src.apis.fastapi.observability.router import ObservabilityRouter +from oss.src.apis.fastapi.otlp.router import OTLPRouter from oss.src.apis.fastapi.tracing.router import TracingRouter from oss.src.apis.fastapi.invocations.router import InvocationsRouter from oss.src.apis.fastapi.annotations.router import AnnotationsRouter @@ -192,8 +190,6 @@ async def lifespan(*args, **kwargs): secrets_dao = SecretsDAO() -observability_dao = ObservabilityDAO() - tracing_dao = TracingDAO() testcases_dao = BlobsDAO( @@ -226,10 +222,6 @@ async def lifespan(*args, **kwargs): secrets_dao=secrets_dao, ) -observability_service = ObservabilityService( - observability_dao=observability_dao, -) - tracing_service = TracingService( tracing_dao=tracing_dao, ) @@ -292,8 +284,7 @@ async def lifespan(*args, **kwargs): vault_service=vault_service, ) -observability = ObservabilityRouter( - observability_service=observability_service, +otlp = OTLPRouter( tracing_service=tracing_service, ) @@ -374,21 +365,15 @@ async def lifespan(*args, **kwargs): ) app.include_router( - router=observability.otlp, - prefix="/otlp", - tags=["Observability"], -) - -app.include_router( - router=observability.router, - prefix="/observability/v1", + router=otlp.router, + prefix="/otlp/v1", tags=["Observability"], ) app.include_router( router=tracing.router, prefix="/preview/tracing", - tags=["Tracing"], + tags=["Observability"], ) app.include_router( diff --git a/api/oss/databases/postgres/migrations/core/env.py b/api/oss/databases/postgres/migrations/core/env.py index 3c6e049969..6897db4fc3 100644 --- a/api/oss/databases/postgres/migrations/core/env.py +++ b/api/oss/databases/postgres/migrations/core/env.py @@ -17,7 +17,6 @@ import oss.src.dbs.postgres.secrets.dbes -import oss.src.dbs.postgres.observability.dbes # Interpret the config file for Python logging. # This line sets up loggers basically. @@ -29,7 +28,6 @@ from oss.src.dbs.postgres.shared.base import Base import oss.src.dbs.postgres.secrets.dbes -import oss.src.dbs.postgres.observability.dbes # from myapp import mymodel # target_metadata = mymodel.Base.metadata diff --git a/api/oss/databases/postgres/migrations/core/versions/cfa14a847972_drop_nodes.py b/api/oss/databases/postgres/migrations/core/versions/cfa14a847972_drop_nodes.py new file mode 100644 index 0000000000..c73d48caa9 --- /dev/null +++ b/api/oss/databases/postgres/migrations/core/versions/cfa14a847972_drop_nodes.py @@ -0,0 +1,25 @@ +"""drop nodes + +Revision ID: cfa14a847972 +Revises: a1b2c3d4e5f6 +Create Date: 2025-11-16 11:29:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "cfa14a847972" +down_revision: Union[str, None] = "a1b2c3d4e5f6" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.drop_table("nodes") + + +def downgrade() -> None: + pass diff --git a/api/oss/databases/postgres/migrations/tracing/versions/cfa14a847972_drop_nodes.py b/api/oss/databases/postgres/migrations/tracing/versions/cfa14a847972_drop_nodes.py new file mode 100644 index 0000000000..b7ecaa2734 --- /dev/null +++ b/api/oss/databases/postgres/migrations/tracing/versions/cfa14a847972_drop_nodes.py @@ -0,0 +1,25 @@ +"""drop nodes + +Revision ID: cfa14a847972 +Revises: fd77265d65dc +Create Date: 2025-11-16 11:29:00.000000 + +""" + +from typing import Sequence, Union + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "cfa14a847972" +down_revision: Union[str, None] = "fd77265d65dc" +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + op.drop_table("nodes") + + +def downgrade() -> None: + pass diff --git a/api/oss/src/apis/fastapi/observability/router.py b/api/oss/src/apis/fastapi/observability/router.py deleted file mode 100644 index 2c96b9a000..0000000000 --- a/api/oss/src/apis/fastapi/observability/router.py +++ /dev/null @@ -1,666 +0,0 @@ -from typing import Dict, List, Union, Literal, Optional -from uuid import UUID - -from fastapi import APIRouter, Request, Depends, Query, status, HTTPException -from fastapi.responses import Response - -import posthog - -from oss.src.utils.common import is_ee -from oss.src.utils.logging import get_module_logger -from oss.src.utils.exceptions import intercept_exceptions, suppress_exceptions -from oss.src.utils.caching import get_cache, set_cache, invalidate_cache - -from oss.src.core.observability.service import ObservabilityService -from oss.src.core.observability.dtos import ( - QueryDTO, - AnalyticsDTO, - TreeDTO, - RootDTO, - GroupingDTO, - FilteringDTO, - ConditionDTO, - Focus, -) - -from oss.src.core.tracing.dtos import OTelFlatSpan - -from oss.src.core.tracing.service import TracingService -from oss.src.core.observability.utils import FilteringException - -from oss.src.apis.fastapi.observability.opentelemetry.otlp import ( - parse_otlp_stream, -) -from oss.src.apis.fastapi.observability.utils.processing import ( - parse_query_from_params_request, - parse_analytics_dto, - parse_from_otel_span_dto, - parse_to_otel_span_dto, - parse_to_agenta_span_dto, - parse_legacy_analytics_dto, - parse_legacy_analytics, -) -from oss.src.apis.fastapi.observability.models import ( - CollectStatusResponse, - OTelTracingResponse, - AgentaNodesResponse, - AgentaTreesResponse, - AgentaRootsResponse, - AgentaNodeDTO, - AgentaTreeDTO, - AgentaRootDTO, - LegacyAnalyticsResponse, - OldAnalyticsResponse, -) - -if is_ee(): - from ee.src.utils.entitlements import check_entitlements, Counter - -# OTLP Protobuf response message for full success -from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( - ExportTraceServiceResponse, -) - -# Protobuf Status for error responses -from google.rpc.status_pb2 import Status as ProtoStatus - -from oss.src.utils.env import env - -MAX_OTLP_BATCH_SIZE = env.AGENTA_OTLP_MAX_BATCH_BYTES -MAX_OTLP_BATCH_SIZE_MB = MAX_OTLP_BATCH_SIZE // (1024 * 1024) - - -log = get_module_logger(__name__) - - -POSTHOG_API_KEY = env.POSTHOG_API_KEY -POSTHOG_HOST = env.POSTHOG_HOST - - -if POSTHOG_API_KEY: - posthog.api_key = POSTHOG_API_KEY - posthog.host = POSTHOG_HOST - log.info("PostHog initialized with host %s", POSTHOG_HOST) -else: - log.warn("PostHog API key not found in environment variables") - - -class ObservabilityRouter: - def __init__( - self, - observability_service: ObservabilityService, - tracing_service: Optional[TracingService] = None, - ): - self.service = observability_service - self.tracing = tracing_service - - self.router = APIRouter() - - self.otlp = APIRouter() - - ### OTLP (Collector) - - self.otlp.add_api_route( - "/v1/traces", - self.otlp_receiver, - methods=["POST"], - operation_id="otlp_v1_traces", - summary="Receive /v1/traces via OTLP", - status_code=status.HTTP_200_OK, - response_model=CollectStatusResponse, - ) - - ### OTLP (SDK) - - self.router.add_api_route( - "/otlp/traces", - self.otlp_status, - methods=["GET"], - operation_id="otlp_status", - summary="Status of OTLP endpoint", - status_code=status.HTTP_200_OK, - response_model=CollectStatusResponse, - ) - - self.router.add_api_route( - "/otlp/traces", - self.otlp_receiver, - methods=["POST"], - operation_id="otlp_receiver", - summary="Receive traces via OTLP", - status_code=status.HTTP_200_OK, - response_model=CollectStatusResponse, - ) - - ### QUERIES - - self.router.add_api_route( - "/traces", - self.query_traces, - methods=["GET"], - operation_id="query_traces", - summary="Query traces, with optional grouping, windowing, filtering, and pagination.", - status_code=status.HTTP_200_OK, - response_model=Union[ - OTelTracingResponse, - AgentaNodesResponse, - AgentaTreesResponse, - AgentaRootsResponse, - ], - response_model_exclude_none=True, - ) - - self.router.add_api_route( - "/analytics", - self.query_analytics, - methods=["GET"], - operation_id="query_analytics", - summary="Query analytics, with optional grouping, windowing, filtering.", - status_code=status.HTTP_200_OK, - response_model=Union[ - LegacyAnalyticsResponse, - OldAnalyticsResponse, - ], - response_model_exclude_none=True, - ) - - self.router.add_api_route( - "/traces/{trace_id}", - self.fetch_trace_by_id, - methods=["GET"], - operation_id="fetch_trace_by_id", - summary="Fetch trace by ID.", - status_code=status.HTTP_200_OK, - response_model=Union[ - OTelTracingResponse, - AgentaNodesResponse, - AgentaTreesResponse, - AgentaRootsResponse, - ], - response_model_exclude_none=True, - ) - - ### MUTATIONS - - self.router.add_api_route( - "/traces", - self.delete_traces, - methods=["DELETE"], - operation_id="delete_traces", - summary="Delete traces", - status_code=status.HTTP_200_OK, - response_model=CollectStatusResponse, - ) - - ### OTLP - - @intercept_exceptions() - async def otlp_status(self): - """ - Status of OTLP endpoint. - """ - - return CollectStatusResponse(status="ready") - - @intercept_exceptions() - async def otlp_receiver( - self, - request: Request, - ): - """ - Receive traces via OTLP. - """ - - otlp_stream = None - try: - # ---------------------------------------------------------------- # - otlp_stream = await request.body() - # ---------------------------------------------------------------- # - except Exception as e: - log.error( - "Failed to process OTLP stream from project %s with error:", - request.state.project_id, - exc_info=True, - ) - err_status = ProtoStatus( - message="Invalid request body: not a valid OTLP stream." - ) - return Response( - content=err_status.SerializeToString(), - media_type="application/x-protobuf", - status_code=status.HTTP_400_BAD_REQUEST, - ) - - if len(otlp_stream) > MAX_OTLP_BATCH_SIZE: - log.error( - "OTLP batch too large (%s bytes > %s bytes) from project %s", - len(otlp_stream), - MAX_OTLP_BATCH_SIZE, - request.state.project_id, - ) - err_status = ProtoStatus( - message=f"OTLP batch size exceeds {MAX_OTLP_BATCH_SIZE_MB}MB limit." - ) - return Response( - content=err_status.SerializeToString(), - media_type="application/x-protobuf", - status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, - ) - - otel_spans = None - try: - # ---------------------------------------------------------------- # - otel_spans = parse_otlp_stream(otlp_stream) - # ---------------------------------------------------------------- # - except Exception as e: - log.error( - "Failed to parse OTLP stream from project %s with error:", - request.state.project_id, - exc_info=True, - ) - log.error( - "OTLP stream: %s", - otlp_stream, - ) - err_status = ProtoStatus(message="Failed to parse OTLP stream.") - return Response( - content=err_status.SerializeToString(), - media_type="application/x-protobuf", - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) - - # -------------------------------------------------------------------- # - feature_flag = "create-spans-from-nodes" - - cache_key = { - "feature_flag": feature_flag, - } - - flag_create_spans_from_nodes = await get_cache( - namespace="posthog:flags", - key=cache_key, - retry=False, - ) - - if flag_create_spans_from_nodes is None: - if env.POSTHOG_API_KEY: - flag_create_spans_from_nodes = posthog.feature_enabled( - feature_flag, - "user distinct id", - ) - - await set_cache( - namespace="posthog:flags", - key=cache_key, - value=flag_create_spans_from_nodes, - ) - # -------------------------------------------------------------------- # - - # for otel_span in otel_spans: - # log.debug( - # "Receiving trace... ", - # project_id=request.state.project_id, - # trace_id=str(UUID(otel_span.context.trace_id[2:])), - # ) - - span_dtos = None - try: - # ---------------------------------------------------------------- # - parsed_spans = [ - parse_from_otel_span_dto( - otel_span, - flag_create_spans_from_nodes, - ) - for otel_span in otel_spans - ] - - span_dtos = [ - parsed_span.get("nodes") - for parsed_span in parsed_spans - if parsed_span.get("nodes") - ] - tracing_spans = [ - parsed_span.get("spans") - for parsed_span in parsed_spans - if parsed_span.get("spans") - ] - - # ---------------------------------------------------------------- # - except Exception as e: - log.error( - "Failed to parse spans from project %s with error:", - request.state.project_id, - exc_info=True, - ) - for otel_span in otel_spans: - log.error( - "Span: [%s] %s", - UUID(otel_span.context.trace_id[2:]), - otel_span, - ) - err_status = ProtoStatus(message="Failed to parse OTEL span.") - return Response( - content=err_status.SerializeToString(), - media_type="application/x-protobuf", - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) - - # -------------------------------------------------------------------- # - delta = sum([1 for span_dto in span_dtos if span_dto.parent is None]) - - if is_ee(): - check, _, _ = await check_entitlements( - organization_id=request.state.organization_id, - key=Counter.TRACES, - delta=delta, - ) - - if not check: - err_status = ProtoStatus( - message="You have reached your quota limit. Please upgrade your plan to continue." - ) - return Response( - content=err_status.SerializeToString(), - media_type="application/x-protobuf", - status_code=status.HTTP_403_FORBIDDEN, - ) - # -------------------------------------------------------------------- # - - try: - # ---------------------------------------------------------------- # - await self.service.ingest( - project_id=UUID(request.state.project_id), - span_dtos=span_dtos, - ) - # ---------------------------------------------------------------- # - except Exception as e: - log.error( - "Failed to ingest spans from project %s with error:", - request.state.project_id, - exc_info=True, - ) - for span_dto in span_dtos: - log.error( - "Span: [%s] %s", - span_dto.tree.id, - span_dto, - ) - err_status = ProtoStatus(message="Failed to ingest spans.") - return Response( - content=err_status.SerializeToString(), - media_type="application/x-protobuf", - status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, - ) - - try: - # ---------------------------------------------------------------- # - if flag_create_spans_from_nodes: - await self.tracing.create( - project_id=UUID(request.state.project_id), - user_id=UUID(request.state.user_id), - span_dtos=tracing_spans, - ) - # ---------------------------------------------------------------- # - except Exception as e: - log.warn( - "Failed to create spans from project %s with error:", - request.state.project_id, - exc_info=True, - ) - for span in tracing_spans: - span: OTelFlatSpan - log.warn( - "Span: [%s] %s", - span.trace_id, - span, - ) - - # ------------------------------------------------------------------ # - # According to the OTLP/HTTP spec a full-success response must be an - # HTTP 200 with a serialized ExportTraceServiceResponse protobuf and - # the same Content-Type that the client used (we only support binary - # protobuf at the moment). - # ------------------------------------------------------------------ # - - export_response = ExportTraceServiceResponse() # empty == full success - - return Response( - content=export_response.SerializeToString(), - media_type="application/x-protobuf", - status_code=status.HTTP_200_OK, - ) - - ### QUERIES - - @intercept_exceptions() - @suppress_exceptions(default=AgentaNodesResponse()) - async def query_traces( - self, - request: Request, - query_dto: QueryDTO = Depends(parse_query_from_params_request), - format: Literal[ # pylint: disable=W0622 - "opentelemetry", - "agenta", - ] = Query("agenta"), - ): - """ - Query traces, with optional grouping, windowing, filtering, and pagination. - """ - - if format == "opentelemetry" and query_dto.grouping: - query_dto.grouping.focus = Focus.NODE - - try: - span_dtos, count = await self.service.query( - project_id=UUID(request.state.project_id), - query_dto=query_dto, - ) - except FilteringException as e: - raise HTTPException( - status_code=400, - detail=str(e), - ) from e - - spans = [] - - # format = opentelemetry -> focus = node - if format == "opentelemetry": - spans = [parse_to_otel_span_dto(span_dto) for span_dto in span_dtos] - - return OTelTracingResponse( - count=count, - spans=spans, - ) - - # format = agenta - elif format == "agenta": - spans = [parse_to_agenta_span_dto(span_dto) for span_dto in span_dtos] - - # focus = tree | root - if query_dto.grouping and query_dto.grouping.focus.value != "node": - _nodes_by_tree: Dict[str, List[AgentaNodeDTO]] = dict() - _types_by_tree: Dict[str, str] = dict() - - for span in spans: - if span.tree.id not in _nodes_by_tree: - _nodes_by_tree[span.tree.id] = list() - _types_by_tree[span.tree.id] = None - - _nodes_by_tree[span.tree.id].append( - AgentaNodeDTO(**span.model_dump()) - ) - _types_by_tree[span.tree.id] = span.tree.type - - # focus = tree - if query_dto.grouping.focus.value == "tree": - return AgentaTreesResponse( - count=count, - trees=[ - AgentaTreeDTO( - tree=TreeDTO( - id=tree_id, - type=_types_by_tree[tree_id], - ), - nodes=[ - AgentaNodeDTO(**span.model_dump()) for span in nodes - ], - ) - for tree_id, nodes in _nodes_by_tree.items() - ], - ) - - # focus = root - else: - _nodes_by_root: Dict[str, List[AgentaTreeDTO]] = dict() - _types_by_root: Dict[str, str] = dict() - - for tree_id, nodes in _nodes_by_tree.items(): - if nodes[0].root.id not in _nodes_by_root: - _nodes_by_root[nodes[0].root.id] = list() - _types_by_root[nodes[0].root.id] = None - - _nodes_by_root[nodes[0].root.id].append( - AgentaTreeDTO( - tree=TreeDTO( - id=tree_id, - type=_types_by_tree[tree_id], - ), - nodes=[ - AgentaNodeDTO(**span.model_dump()) for span in nodes - ], - ) - ) - - return AgentaRootsResponse( - count=count, - roots=[ - AgentaRootDTO( - root=RootDTO(id=root_id), - trees=trees, - ) - for root_id, trees in _nodes_by_root.items() - ], - ) - - # focus = node - return AgentaNodesResponse( - count=count, - nodes=[AgentaNodeDTO(**span.model_dump()) for span in spans], - ) - - @intercept_exceptions() - async def query_analytics( - self, - request: Request, - analytics_dto: AnalyticsDTO = Depends(parse_analytics_dto), - legacy_analytics_dto: AnalyticsDTO = Depends(parse_legacy_analytics_dto), - format: Literal[ # pylint: disable=W0622 - "legacy", - "agenta", - ] = Query("agenta"), - ): - try: - if legacy_analytics_dto is not None: - analytics_dto = legacy_analytics_dto - - bucket_dtos, count = await self.service.analytics( - project_id=UUID(request.state.project_id), - analytics_dto=analytics_dto, - ) - - if format == "legacy": - data, summary = parse_legacy_analytics(bucket_dtos) - - return LegacyAnalyticsResponse( - data=data, - **summary.model_dump(), - ) - - return OldAnalyticsResponse( - count=count, - buckets=bucket_dtos, - ) - - except FilteringException as e: - raise HTTPException( - status_code=400, - detail=str(e), - ) from e - - @intercept_exceptions() - async def fetch_trace_by_id( - self, - request: Request, - trace_id: Union[str, int], - format: Literal[ # pylint: disable=W0622 - "opentelemetry", - "agenta", - ] = Query("openetelemetry"), - ): - """ - Fetch trace by ID. - """ - - tree_id = None - - if not trace_id: - raise HTTPException(status_code=400, detail="trace_id is required.") - - # INT # 66247539550469235673292373222060196016 - try: - trace_id = hex(int(trace_id)) - except: # pylint: disable=bare-except - pass - - if not isinstance(trace_id, str): - raise HTTPException(status_code=400, detail="trace_id is invalid.") - - # HEX # 0x31d6cfe04b9011ec800142010a8000b0 - if trace_id.startswith("0x") and len(trace_id) > 2: - trace_id = trace_id[2:] - - # UUID # 31d6cfe0-4b90-11ec-8001-42010a8000b0 - # HEX # 31d6cfe04b9011ec800142010a8000b0 - try: - tree_id = str(UUID(trace_id)) - except Exception as e: - raise HTTPException(status_code=400, detail="trace_id is invalid.") from e - - return await self.query_traces( - request=request, - format=format, - query_dto=QueryDTO( - grouping=GroupingDTO( - focus="node" if format == "opentelemetry" else "tree", - ), - filtering=FilteringDTO( - conditions=[ - ConditionDTO( - key="tree.id", - value=tree_id, - ) - ] - ), - ), - ) - - ### MUTATIONS - - @intercept_exceptions() - async def delete_traces( - self, - request: Request, - node_id: UUID = Query(None), - node_ids: List[UUID] = Query(None), - ): - """ - Delete trace. - """ - - await self.service.delete( - project_id=UUID(request.state.project_id), - node_id=node_id, - node_ids=node_ids, - ) - - return CollectStatusResponse(status="deleted") diff --git a/api/oss/src/apis/fastapi/observability/__init__.py b/api/oss/src/apis/fastapi/otlp/__init__.py similarity index 100% rename from api/oss/src/apis/fastapi/observability/__init__.py rename to api/oss/src/apis/fastapi/otlp/__init__.py diff --git a/api/oss/src/apis/fastapi/observability/extractors/adapter_registry.py b/api/oss/src/apis/fastapi/otlp/extractors/adapter_registry.py similarity index 77% rename from api/oss/src/apis/fastapi/observability/extractors/adapter_registry.py rename to api/oss/src/apis/fastapi/otlp/extractors/adapter_registry.py index 693349bdb2..4afa4decbf 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/adapter_registry.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/adapter_registry.py @@ -2,26 +2,26 @@ from oss.src.utils.logging import get_module_logger -log = get_module_logger(__name__) - -from oss.src.apis.fastapi.observability.extractors.base_adapter import BaseAdapter -from oss.src.apis.fastapi.observability.extractors.canonical_attributes import ( +from oss.src.apis.fastapi.otlp.extractors.base_adapter import BaseAdapter +from oss.src.apis.fastapi.otlp.extractors.canonical_attributes import ( CanonicalAttributes, SpanFeatures, ) -from oss.src.apis.fastapi.observability.extractors.adapters.openllmetry_adapter import ( +from oss.src.apis.fastapi.otlp.extractors.adapters.openllmetry_adapter import ( OpenLLMmetryAdapter, ) -from oss.src.apis.fastapi.observability.extractors.adapters.openinference_adapter import ( +from oss.src.apis.fastapi.otlp.extractors.adapters.openinference_adapter import ( OpenInferenceAdapter, ) -from oss.src.apis.fastapi.observability.extractors.adapters.logfire_adapter import ( +from oss.src.apis.fastapi.otlp.extractors.adapters.logfire_adapter import ( LogfireAdapter, ) -from oss.src.apis.fastapi.observability.extractors.adapters.default_agenta_adapter import ( +from oss.src.apis.fastapi.otlp.extractors.adapters.default_agenta_adapter import ( DefaultAgentaAdapter, ) +log = get_module_logger(__name__) + class AdapterRegistry: """Registry for feature adapters. diff --git a/api/oss/src/apis/fastapi/observability/extractors/adapters/__init__.py b/api/oss/src/apis/fastapi/otlp/extractors/adapters/__init__.py similarity index 100% rename from api/oss/src/apis/fastapi/observability/extractors/adapters/__init__.py rename to api/oss/src/apis/fastapi/otlp/extractors/adapters/__init__.py diff --git a/api/oss/src/apis/fastapi/observability/extractors/adapters/default_agenta_adapter.py b/api/oss/src/apis/fastapi/otlp/extractors/adapters/default_agenta_adapter.py similarity index 93% rename from api/oss/src/apis/fastapi/observability/extractors/adapters/default_agenta_adapter.py rename to api/oss/src/apis/fastapi/otlp/extractors/adapters/default_agenta_adapter.py index 7203208ede..f67e335f82 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/adapters/default_agenta_adapter.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/adapters/default_agenta_adapter.py @@ -1,9 +1,9 @@ -from oss.src.apis.fastapi.observability.extractors.base_adapter import BaseAdapter -from oss.src.apis.fastapi.observability.extractors.canonical_attributes import ( +from oss.src.apis.fastapi.otlp.extractors.base_adapter import BaseAdapter +from oss.src.apis.fastapi.otlp.extractors.canonical_attributes import ( CanonicalAttributes, SpanFeatures, ) -from oss.src.apis.fastapi.observability.utils.serialization import ( +from oss.src.apis.fastapi.otlp.utils.serialization import ( decode_value, process_attribute, NAMESPACE_PREFIX_FEATURE_MAPPING, diff --git a/api/oss/src/apis/fastapi/observability/extractors/adapters/logfire_adapter.py b/api/oss/src/apis/fastapi/otlp/extractors/adapters/logfire_adapter.py similarity index 96% rename from api/oss/src/apis/fastapi/observability/extractors/adapters/logfire_adapter.py rename to api/oss/src/apis/fastapi/otlp/extractors/adapters/logfire_adapter.py index f71b18fa72..3d7ea0dac6 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/adapters/logfire_adapter.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/adapters/logfire_adapter.py @@ -1,13 +1,13 @@ from typing import Dict, List, Any, Tuple from json import loads -from oss.src.apis.fastapi.observability.extractors.base_adapter import BaseAdapter -from oss.src.apis.fastapi.observability.extractors.canonical_attributes import ( +from oss.src.apis.fastapi.otlp.extractors.base_adapter import BaseAdapter +from oss.src.apis.fastapi.otlp.extractors.canonical_attributes import ( CanonicalAttributes, SpanFeatures, ) from oss.src.utils.logging import get_module_logger -from oss.src.apis.fastapi.observability.utils.serialization import ( +from oss.src.apis.fastapi.otlp.utils.serialization import ( decode_value, process_attribute, NAMESPACE_PREFIX_FEATURE_MAPPING, diff --git a/api/oss/src/apis/fastapi/observability/extractors/adapters/openinference_adapter.py b/api/oss/src/apis/fastapi/otlp/extractors/adapters/openinference_adapter.py similarity index 97% rename from api/oss/src/apis/fastapi/observability/extractors/adapters/openinference_adapter.py rename to api/oss/src/apis/fastapi/otlp/extractors/adapters/openinference_adapter.py index 71fc43f644..36a3d7b9c0 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/adapters/openinference_adapter.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/adapters/openinference_adapter.py @@ -1,12 +1,12 @@ from typing import Dict, Any, Tuple, List import re -from oss.src.apis.fastapi.observability.extractors.base_adapter import BaseAdapter -from oss.src.apis.fastapi.observability.extractors.canonical_attributes import ( +from oss.src.apis.fastapi.otlp.extractors.base_adapter import BaseAdapter +from oss.src.apis.fastapi.otlp.extractors.canonical_attributes import ( CanonicalAttributes, SpanFeatures, ) -from oss.src.apis.fastapi.observability.utils.serialization import ( +from oss.src.apis.fastapi.otlp.utils.serialization import ( process_attribute, NAMESPACE_PREFIX_FEATURE_MAPPING, ) diff --git a/api/oss/src/apis/fastapi/observability/extractors/adapters/openllmetry_adapter.py b/api/oss/src/apis/fastapi/otlp/extractors/adapters/openllmetry_adapter.py similarity index 95% rename from api/oss/src/apis/fastapi/observability/extractors/adapters/openllmetry_adapter.py rename to api/oss/src/apis/fastapi/otlp/extractors/adapters/openllmetry_adapter.py index 65989d9fa7..b36478437d 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/adapters/openllmetry_adapter.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/adapters/openllmetry_adapter.py @@ -1,12 +1,12 @@ from typing import Dict, Optional, Any, Callable, Tuple, List from json import loads -from oss.src.apis.fastapi.observability.extractors.base_adapter import BaseAdapter -from oss.src.apis.fastapi.observability.extractors.canonical_attributes import ( +from oss.src.apis.fastapi.otlp.extractors.base_adapter import BaseAdapter +from oss.src.apis.fastapi.otlp.extractors.canonical_attributes import ( CanonicalAttributes, SpanFeatures, ) -from oss.src.apis.fastapi.observability.utils.serialization import ( +from oss.src.apis.fastapi.otlp.utils.serialization import ( process_attribute, NAMESPACE_PREFIX_FEATURE_MAPPING, ) diff --git a/api/oss/src/apis/fastapi/observability/extractors/base_adapter.py b/api/oss/src/apis/fastapi/otlp/extractors/base_adapter.py similarity index 91% rename from api/oss/src/apis/fastapi/observability/extractors/base_adapter.py rename to api/oss/src/apis/fastapi/otlp/extractors/base_adapter.py index 29e8ccf1df..279a7a9238 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/base_adapter.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/base_adapter.py @@ -1,5 +1,5 @@ from typing import Protocol -from oss.src.apis.fastapi.observability.extractors.canonical_attributes import ( +from oss.src.apis.fastapi.otlp.extractors.canonical_attributes import ( CanonicalAttributes, SpanFeatures, ) diff --git a/api/oss/src/apis/fastapi/observability/extractors/canonical_attributes.py b/api/oss/src/apis/fastapi/otlp/extractors/canonical_attributes.py similarity index 99% rename from api/oss/src/apis/fastapi/observability/extractors/canonical_attributes.py rename to api/oss/src/apis/fastapi/otlp/extractors/canonical_attributes.py index 6d91a3d5fb..b899846bf7 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/canonical_attributes.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/canonical_attributes.py @@ -3,7 +3,7 @@ from pydantic import BaseModel, Field -from oss.src.core.observability.dtos import ( +from oss.src.core.otel.dtos import ( OTelStatusCode, OTelSpanKind, ) # For type hints diff --git a/api/oss/src/apis/fastapi/observability/extractors/normalizer.py b/api/oss/src/apis/fastapi/otlp/extractors/normalizer.py similarity index 96% rename from api/oss/src/apis/fastapi/observability/extractors/normalizer.py rename to api/oss/src/apis/fastapi/otlp/extractors/normalizer.py index 02348d2948..5a8f996b6e 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/normalizer.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/normalizer.py @@ -2,11 +2,11 @@ from copy import copy from datetime import datetime, timezone -from oss.src.core.observability.dtos import ( +from oss.src.core.otel.dtos import ( OTelSpanDTO, OTelStatusCode, ) -from oss.src.apis.fastapi.observability.extractors.canonical_attributes import ( +from oss.src.apis.fastapi.otlp.extractors.canonical_attributes import ( CanonicalAttributes, EventData, LinkData, diff --git a/api/oss/src/apis/fastapi/observability/extractors/span_data_builders.py b/api/oss/src/apis/fastapi/otlp/extractors/span_data_builders.py similarity index 61% rename from api/oss/src/apis/fastapi/observability/extractors/span_data_builders.py rename to api/oss/src/apis/fastapi/otlp/extractors/span_data_builders.py index 86e5d6c1aa..14837a84a4 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/span_data_builders.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/span_data_builders.py @@ -1,8 +1,14 @@ -from typing import Any +from typing import Any, Optional from abc import ABC, abstractmethod -from uuid import UUID +from oss.src.utils.logging import get_module_logger +from oss.src.apis.fastapi.tracing.utils import _parse_span_from_request +from oss.src.apis.fastapi.otlp.extractors.canonical_attributes import ( + SpanFeatures, +) + +from oss.src.core.otel.dtos import OTelSpanDTO from oss.src.core.tracing.dtos import OTelSpan, OTelFlatSpan, OTelEvent, OTelLink from oss.src.core.tracing.utils import ( parse_trace_id_to_uuid, @@ -12,25 +18,6 @@ parse_status_code_to_enum, ) -from oss.src.core.observability.dtos import ( - OTelSpanDTO, - SpanDTO, - RootDTO, - TreeDTO, - NodeDTO, - ParentDTO, - TimeDTO, - StatusDTO, - ExceptionDTO, - OTelExtraDTO, -) -from oss.src.apis.fastapi.observability.extractors.canonical_attributes import ( - SpanFeatures, -) -from oss.src.utils.logging import get_module_logger - -from oss.src.apis.fastapi.tracing.utils import _parse_span_from_request - log = get_module_logger(__name__) @@ -65,143 +52,6 @@ def build(self, otel_span_dto: OTelSpanDTO, features: SpanFeatures) -> Any: pass -class NodeBuilder(SpanDataBuilder): - """ - Concrete implementation that builds a SpanDTO. - This encapsulates the logic from the original SpanProcessor._build_span_dto. - """ - - @property - def name(self) -> str: - return "node_builder" - - def build(self, otel_span_dto: OTelSpanDTO, features: SpanFeatures) -> SpanDTO: - trace_id = otel_span_dto.context.trace_id[2:] - span_id = otel_span_dto.context.span_id[2:] - tree_id = UUID(trace_id) - - node_id_hex = tree_id.hex[16:] + span_id - try: - node_id = UUID(node_id_hex) - except ValueError as e: - log.error( - f"NodeBuilder: Error creating node_id UUID from hex '{node_id_hex}'. OTelSpan: {otel_span_dto}. SpanFeatures: {features}. Error: {e}" - ) - raise ValueError(f"Invalid hex string for node_id: {node_id_hex}") from e - - tree_type = features.type.get("tree") - - node_type = features.type.get("node") - if node_type not in [ - "agent", - "workflow", - "chain", - "task", - "tool", - "embedding", - "query", - "completion", - "chat", - "rerank", - ]: - node_type = "task" - - root_id_str = features.refs.get("scenario", {}).get("id", str(tree_id)) - root = RootDTO(id=UUID(root_id_str)) - - tree = TreeDTO(id=tree_id, type=tree_type) - - node = NodeDTO(id=node_id, type=node_type, name=otel_span_dto.name) - - parent = None - if otel_span_dto.parent: - parent_id_hex = ( - otel_span_dto.parent.trace_id[2 + 16 :] - + otel_span_dto.parent.span_id[2:] - ) - try: - parent_id = UUID(parent_id_hex) - parent = ParentDTO(id=parent_id) - except ValueError as e: - log.error( - f"NodeBuilder: Error creating parent_id UUID from hex '{parent_id_hex}'. OTelSpan: {otel_span_dto}. SpanFeatures: {features}. Error: {e}" - ) - raise ValueError( - f"Invalid hex string for parent_id: {parent_id_hex}" - ) from e - - time_dto = TimeDTO(start=otel_span_dto.start_time, end=otel_span_dto.end_time) - duration = round( - (time_dto.end - time_dto.start).total_seconds() * 1_000, 3 - ) # milliseconds - - status = StatusDTO( - code=( - otel_span_dto.status_code.value.replace("STATUS_CODE_", "") - if otel_span_dto.status_code - else None - ), - message=otel_span_dto.status_message, - ) - - exception = None - if features.exception: - exception = ExceptionDTO( - timestamp=features.exception.get("timestamp"), - type=features.exception.get("type"), - message=features.exception.get("message"), - stacktrace=features.exception.get("stacktrace"), - attributes=features.exception.get("attributes"), - ) - - data = features.data - - metrics = features.metrics - metrics["acc.duration.total"] = duration - - meta = features.meta - - refs = features.refs - - # links = features.links - links = [] - - otel = OTelExtraDTO( - kind=(otel_span_dto.kind.value if otel_span_dto.kind else None), - attributes=otel_span_dto.attributes, - events=otel_span_dto.events, - links=otel_span_dto.links, - ) - - try: - span_dto = SpanDTO( - trace_id=trace_id, - span_id=span_id, - root=root, - tree=tree, - node=node, - parent=parent, - time=time_dto, - status=status, - exception=exception, - data=data, - metrics=metrics, - meta=meta, - refs=refs, - links=links, - otel=otel, - ) - except Exception as e: - log.error( - "NodeBuilder: Failed to create SpanDTO from span: %s. SpanFeatures: %s. Error: %s", - otel_span_dto, - features, - str(e), - ) - raise e - return span_dto - - class OTelFlatSpanBuilder(SpanDataBuilder): """ Concrete implementation that builds a OTelFlatSpan. @@ -215,7 +65,7 @@ def build( self, otel_span_dto: OTelSpanDTO, features: SpanFeatures, - ) -> OTelFlatSpan: + ) -> Optional[OTelFlatSpan]: # IDS ------------------------------------------------------------------ trace_id = otel_span_dto.context.trace_id[2:] span_id = otel_span_dto.context.span_id[2:] @@ -336,13 +186,15 @@ def build( if otel_span_dto.events: for event in otel_span_dto.events: try: - events.append( - OTelEvent( - name=event.name, - timestamp=parse_timestamp_to_datetime(event.timestamp), - attributes=event.attributes, + timestamp = parse_timestamp_to_datetime(event.timestamp) + if timestamp: + events.append( + OTelEvent( + name=event.name, + timestamp=timestamp, + attributes=event.attributes, + ) ) - ) except Exception as e: log.warn( f"OTelFlatSpanBuilder: Error creating OTelEvent from event: {event}. Error: {e}." @@ -352,25 +204,17 @@ def build( try: span_dto = OTelSpan( - # IDS trace_id=trace_id, span_id=span_id, parent_id=parent_id, - # KIND span_kind=span_kind, - # NAME span_name=span_name, - # TIME start_time=start_time, end_time=end_time, - # STATUS status_code=status_code, status_message=status_message, - # ATTRIBUTES attributes=attributes, - # LINKS links=links, - # EVENTS events=events, ) diff --git a/api/oss/src/apis/fastapi/observability/extractors/span_processor.py b/api/oss/src/apis/fastapi/otlp/extractors/span_processor.py similarity index 86% rename from api/oss/src/apis/fastapi/observability/extractors/span_processor.py rename to api/oss/src/apis/fastapi/otlp/extractors/span_processor.py index a17fd7baa6..2b8b245ecd 100644 --- a/api/oss/src/apis/fastapi/observability/extractors/span_processor.py +++ b/api/oss/src/apis/fastapi/otlp/extractors/span_processor.py @@ -1,10 +1,10 @@ -from typing import Dict, List, Any, Optional +from typing import Dict, List, Any from oss.src.utils.logging import get_module_logger -from oss.src.core.observability.dtos import OTelSpanDTO -from oss.src.apis.fastapi.observability.extractors.normalizer import Normalizer -from oss.src.apis.fastapi.observability.extractors.adapter_registry import ( +from oss.src.core.otel.dtos import OTelSpanDTO +from oss.src.apis.fastapi.otlp.extractors.normalizer import Normalizer +from oss.src.apis.fastapi.otlp.extractors.adapter_registry import ( AdapterRegistry, ) from .span_data_builders import SpanDataBuilder @@ -41,7 +41,6 @@ def __init__(self, builders: List[SpanDataBuilder]): def process( self, otel_span_dto: OTelSpanDTO, - flag_create_spans_from_nodes: Optional[bool] = False, ) -> Dict[str, Any]: """Process an OpenTelemetry span using all configured builders. @@ -57,11 +56,6 @@ def process( results: Dict[str, Any] = {} for builder in self.builders: - if ( - not flag_create_spans_from_nodes - and builder.name == "otel_flat_span_builder" - ): - continue try: processed_data = builder.build(otel_span_dto, features) results[builder.name] = processed_data diff --git a/api/oss/src/apis/fastapi/observability/models.py b/api/oss/src/apis/fastapi/otlp/models.py similarity index 97% rename from api/oss/src/apis/fastapi/observability/models.py rename to api/oss/src/apis/fastapi/otlp/models.py index 1620585aed..001fb0716f 100644 --- a/api/oss/src/apis/fastapi/observability/models.py +++ b/api/oss/src/apis/fastapi/otlp/models.py @@ -3,7 +3,7 @@ from pydantic import BaseModel, ConfigDict -from oss.src.core.observability.dtos import ( +from oss.src.core.otel.dtos import ( OTelSpanDTO, SpanDTO, TreeDTO, diff --git a/api/oss/src/apis/fastapi/observability/opentelemetry/__init__.py b/api/oss/src/apis/fastapi/otlp/opentelemetry/__init__.py similarity index 100% rename from api/oss/src/apis/fastapi/observability/opentelemetry/__init__.py rename to api/oss/src/apis/fastapi/otlp/opentelemetry/__init__.py diff --git a/api/oss/src/apis/fastapi/observability/opentelemetry/otlp.py b/api/oss/src/apis/fastapi/otlp/opentelemetry/otlp.py similarity index 99% rename from api/oss/src/apis/fastapi/observability/opentelemetry/otlp.py rename to api/oss/src/apis/fastapi/otlp/opentelemetry/otlp.py index f7ef6cd6a4..58944514b5 100644 --- a/api/oss/src/apis/fastapi/observability/opentelemetry/otlp.py +++ b/api/oss/src/apis/fastapi/otlp/opentelemetry/otlp.py @@ -12,7 +12,7 @@ ) from oss.src.utils.logging import get_module_logger -from oss.src.core.observability.dtos import ( +from oss.src.core.otel.dtos import ( OTelSpanDTO, OTelContextDTO, OTelEventDTO, diff --git a/api/oss/src/apis/fastapi/observability/opentelemetry/semconv.py b/api/oss/src/apis/fastapi/otlp/opentelemetry/semconv.py similarity index 100% rename from api/oss/src/apis/fastapi/observability/opentelemetry/semconv.py rename to api/oss/src/apis/fastapi/otlp/opentelemetry/semconv.py diff --git a/api/oss/src/apis/fastapi/observability/opentelemetry/traces.proto b/api/oss/src/apis/fastapi/otlp/opentelemetry/traces.proto similarity index 100% rename from api/oss/src/apis/fastapi/observability/opentelemetry/traces.proto rename to api/oss/src/apis/fastapi/otlp/opentelemetry/traces.proto diff --git a/api/oss/src/apis/fastapi/observability/opentelemetry/traces_proto.py b/api/oss/src/apis/fastapi/otlp/opentelemetry/traces_proto.py similarity index 100% rename from api/oss/src/apis/fastapi/observability/opentelemetry/traces_proto.py rename to api/oss/src/apis/fastapi/otlp/opentelemetry/traces_proto.py diff --git a/api/oss/src/apis/fastapi/otlp/router.py b/api/oss/src/apis/fastapi/otlp/router.py new file mode 100644 index 0000000000..e969e85c32 --- /dev/null +++ b/api/oss/src/apis/fastapi/otlp/router.py @@ -0,0 +1,212 @@ +from uuid import UUID + +from fastapi import APIRouter, Request, status +from fastapi.responses import Response + +from google.rpc.status_pb2 import Status as ProtoStatus +from opentelemetry.proto.collector.trace.v1.trace_service_pb2 import ( + ExportTraceServiceResponse, +) + +from oss.src.utils.env import env +from oss.src.utils.common import is_ee +from oss.src.utils.logging import get_module_logger +from oss.src.utils.exceptions import intercept_exceptions + +from oss.src.apis.fastapi.otlp.models import CollectStatusResponse +from oss.src.apis.fastapi.otlp.opentelemetry.otlp import parse_otlp_stream +from oss.src.apis.fastapi.otlp.utils.processing import parse_from_otel_span_dto + +from oss.src.core.tracing.service import TracingService + +if is_ee(): + from ee.src.utils.entitlements import check_entitlements, Counter + + +MAX_OTLP_BATCH_SIZE = env.AGENTA_OTLP_MAX_BATCH_BYTES +MAX_OTLP_BATCH_SIZE_MB = MAX_OTLP_BATCH_SIZE // (1024 * 1024) + + +log = get_module_logger(__name__) + + +class OTLPRouter: + def __init__( + self, + tracing_service: TracingService, + ): + self.tracing = tracing_service + + self.sdk_router = APIRouter() + self.router = APIRouter() + + self.router.add_api_route( + "/traces", + self.otlp_status, + methods=["GET"], + operation_id="otlp_status", + summary="Status check for OTLP", + status_code=status.HTTP_200_OK, + response_model=CollectStatusResponse, + ) + + self.router.add_api_route( + "/traces", + self.otlp_ingest, + methods=["POST"], + operation_id="otlp_ingest", + summary="Ingest traces via OTLP", + status_code=status.HTTP_200_OK, + response_model=CollectStatusResponse, + ) + + @intercept_exceptions() + async def otlp_status(self): + return CollectStatusResponse(status="ready") + + @intercept_exceptions() + async def otlp_ingest( + self, + request: Request, + ): + # -------------------------------------------------------------------- # + # Parse request into OTLP stream + # -------------------------------------------------------------------- # + otlp_stream = None + try: + otlp_stream = await request.body() + except Exception: + log.error( + "Failed to process OTLP stream from project %s with error:", + request.state.project_id, + exc_info=True, + ) + err_status = ProtoStatus( + message="Invalid request body: not a valid OTLP stream." + ) + return Response( + content=err_status.SerializeToString(), + media_type="application/x-protobuf", + status_code=status.HTTP_400_BAD_REQUEST, + ) + + # -------------------------------------------------------------------- # + # Enforce OTLP stream size limit + # -------------------------------------------------------------------- # + if len(otlp_stream) > MAX_OTLP_BATCH_SIZE: + log.error( + "OTLP batch too large (%s bytes > %s bytes) from project %s", + len(otlp_stream), + MAX_OTLP_BATCH_SIZE, + request.state.project_id, + ) + err_status = ProtoStatus( + message=f"OTLP batch size exceeds {MAX_OTLP_BATCH_SIZE_MB}MB limit." + ) + return Response( + content=err_status.SerializeToString(), + media_type="application/x-protobuf", + status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, + ) + + # -------------------------------------------------------------------- # + # Parse OTLP stream into OTel spans + # -------------------------------------------------------------------- # + otel_spans = None + try: + otel_spans = parse_otlp_stream(otlp_stream) + except Exception: + log.error( + "Failed to parse OTLP stream from project %s with error:", + request.state.project_id, + exc_info=True, + ) + log.error("OTLP stream: %s", otlp_stream) + err_status = ProtoStatus(message="Failed to parse OTLP stream.") + return Response( + content=err_status.SerializeToString(), + media_type="application/x-protobuf", + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + # -------------------------------------------------------------------- # + # Parse OTel spans into internal spans + # -------------------------------------------------------------------- # + spans = None + try: + spans = [parse_from_otel_span_dto(s) for s in otel_spans] + spans = [s for s in spans if s is not None] + except Exception: + log.error( + "Failed to parse spans from project %s with error:", + request.state.project_id, + exc_info=True, + ) + for otel_span in otel_spans: + log.error( + "Span: [%s] %s", + UUID(otel_span.context.trace_id[2:]), + otel_span, + ) + err_status = ProtoStatus(message="Failed to parse OTEL span.") + return Response( + content=err_status.SerializeToString(), + media_type="application/x-protobuf", + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + ) + + # -------------------------------------------------------------------- # + # Update meter with internal traces count (EE only) + # -------------------------------------------------------------------- # + if is_ee() and check_entitlements and Counter: # type: ignore + delta = sum([1 for s in spans if s and s.parent_id is None]) + check, _, _ = await check_entitlements( + organization_id=request.state.organization_id, + key=Counter.TRACES, + delta=delta, + ) + if not check: + err_status = ProtoStatus( + message="You have reached your quota limit. " + "Please upgrade your plan to continue." + ) + return Response( + content=err_status.SerializeToString(), + media_type="application/x-protobuf", + status_code=status.HTTP_403_FORBIDDEN, + ) + + # -------------------------------------------------------------------- # + # Store internal spans + # -------------------------------------------------------------------- # + try: + await self.tracing.create( + project_id=UUID(request.state.project_id), + user_id=UUID(request.state.user_id), + span_dtos=spans, + ) + except Exception: + log.warn( + "Failed to create spans from project %s with error:", + request.state.project_id, + exc_info=True, + ) + for span in spans: + log.warn( + "Span: [%s] %s", + span.trace_id, + span, + ) + + # -------------------------------------------------------------------- # + # According to the OTLP/HTTP spec a full-success response must be an + # HTTP 200 with a serialized ExportTraceServiceResponse protobuf and + # the same Content-Type that the client used. + # We only support binary protobuf at the moment. + # -------------------------------------------------------------------- # + export_response = ExportTraceServiceResponse() + return Response( + content=export_response.SerializeToString(), + media_type="application/x-protobuf", + status_code=status.HTTP_200_OK, + ) diff --git a/api/oss/src/apis/fastapi/observability/utils/__init__.py b/api/oss/src/apis/fastapi/otlp/utils/__init__.py similarity index 100% rename from api/oss/src/apis/fastapi/observability/utils/__init__.py rename to api/oss/src/apis/fastapi/otlp/utils/__init__.py diff --git a/api/oss/src/apis/fastapi/observability/utils/marshalling.py b/api/oss/src/apis/fastapi/otlp/utils/marshalling.py similarity index 100% rename from api/oss/src/apis/fastapi/observability/utils/marshalling.py rename to api/oss/src/apis/fastapi/otlp/utils/marshalling.py diff --git a/api/oss/src/apis/fastapi/observability/utils/processing.py b/api/oss/src/apis/fastapi/otlp/utils/processing.py similarity index 88% rename from api/oss/src/apis/fastapi/observability/utils/processing.py rename to api/oss/src/apis/fastapi/otlp/utils/processing.py index 54c019702e..66e52ac920 100644 --- a/api/oss/src/apis/fastapi/observability/utils/processing.py +++ b/api/oss/src/apis/fastapi/otlp/utils/processing.py @@ -6,46 +6,31 @@ from fastapi import Query, HTTPException -from oss.src.apis.fastapi.observability.opentelemetry.semconv import CODEX +from oss.src.apis.fastapi.otlp.opentelemetry.semconv import CODEX from oss.src.utils.logging import get_module_logger -log = get_module_logger(__name__) -from oss.src.apis.fastapi.observability.utils.serialization import ( - decode_key, - decode_value, - encode_key, -) -from oss.src.apis.fastapi.observability.utils.marshalling import unmarshall_attributes +from oss.src.apis.fastapi.otlp.utils.serialization import encode_key +from oss.src.apis.fastapi.otlp.utils.marshalling import unmarshall_attributes from oss.src.core.tracing.dtos import OTelFlatSpan -from oss.src.apis.fastapi.observability.models import ( +from oss.src.apis.fastapi.otlp.models import ( LegacyDataPoint, LegacySummary, ) -from oss.src.core.observability.dtos import ( - TimeDTO, - StatusDTO, - RootDTO, - TreeDTO, - NodeDTO, - ParentDTO, +from oss.src.core.otel.dtos import ( LinkDTO, - ExceptionDTO, Attributes, SpanDTO, - OTelExtraDTO, OTelEventDTO, OTelSpanDTO, OTelContextDTO, OTelLinkDTO, BucketDTO, - NodeType, - TreeType, ) -from oss.src.core.observability.dtos import ( +from oss.src.core.otel.dtos import ( GroupingDTO, WindowingDTO, FilteringDTO, @@ -54,21 +39,16 @@ AnalyticsDTO, ConditionDTO, ) -from oss.src.apis.fastapi.observability.extractors.span_processor import SpanProcessor -from oss.src.apis.fastapi.observability.extractors.span_data_builders import ( - NodeBuilder, +from oss.src.apis.fastapi.otlp.extractors.span_processor import SpanProcessor +from oss.src.apis.fastapi.otlp.extractors.span_data_builders import ( OTelFlatSpanBuilder, ) +log = get_module_logger(__name__) + -node_builder_instance = NodeBuilder() otel_flat_span_builder_instance = OTelFlatSpanBuilder() -span_processor = SpanProcessor( - builders=[ - node_builder_instance, - otel_flat_span_builder_instance, - ] -) +span_processor = SpanProcessor(builders=[otel_flat_span_builder_instance]) # --- PARSE QUERY / ANALYTICS DTO --- @@ -204,41 +184,26 @@ def parse_analytics_dto( def parse_from_otel_span_dto( otel_span_dto: OTelSpanDTO, - flag_create_spans_from_nodes: Optional[bool] = False, -) -> SpanDTO: +) -> Optional[OTelFlatSpan]: """ Process an OpenTelemetry span into a SpanDTO using the new architecture. This function maintains the same signature as the original but uses the new - SpanProcessor internally (with NodeBuilder) for better handling of different data formats. + SpanProcessor internally (with OTelFlatSpanBuilder) for better handling of different data formats. The result from the processor is a dictionary, from which the 'node_builder' output is extracted. """ - processed_results = span_processor.process( - otel_span_dto, - flag_create_spans_from_nodes, + processed_results = span_processor.process(otel_span_dto) + otel_flat_span: Optional[OTelFlatSpan] = processed_results.get( + "otel_flat_span_builder" ) - span_dto = processed_results.get("node_builder") - otel_flat_span = processed_results.get("otel_flat_span_builder") - if not isinstance(span_dto, SpanDTO): + if not isinstance(otel_flat_span, OTelFlatSpan): log.error( - f"NodeBuilder did not produce a valid SpanDTO for trace_id {otel_span_dto.context.trace_id}, " + f"OTelFlatSpanBuilder did not produce a valid OTelFlatSpan for trace_id {otel_span_dto.context.trace_id}, " f"span_id {otel_span_dto.context.span_id}. Processor results: {processed_results}" ) - if flag_create_spans_from_nodes: - if not isinstance(otel_flat_span, OTelFlatSpan): - log.error( - f"OTelFlatSpanBuilder did not produce a valid OTelFlatSpan for trace_id {otel_span_dto.context.trace_id}, " - f"span_id {otel_span_dto.context.span_id}. Processor results: {processed_results}" - ) - - parsed_spans = { - "nodes": span_dto, - "spans": otel_flat_span, - } - - return parsed_spans + return otel_flat_span def _parse_to_attributes( diff --git a/api/oss/src/apis/fastapi/observability/utils/serialization.py b/api/oss/src/apis/fastapi/otlp/utils/serialization.py similarity index 100% rename from api/oss/src/apis/fastapi/observability/utils/serialization.py rename to api/oss/src/apis/fastapi/otlp/utils/serialization.py diff --git a/api/oss/src/core/observability/__init__.py b/api/oss/src/core/observability/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/api/oss/src/core/observability/interfaces.py b/api/oss/src/core/observability/interfaces.py deleted file mode 100644 index 53116d453e..0000000000 --- a/api/oss/src/core/observability/interfaces.py +++ /dev/null @@ -1,85 +0,0 @@ -from uuid import UUID -from typing import List, Tuple, Optional - -from oss.src.core.observability.dtos import ( - QueryDTO, - SpanDTO, - AnalyticsDTO, - BucketDTO, -) - - -class ObservabilityDAOInterface: - def __init__(self): - raise NotImplementedError - - # QUERIES - - async def query( - self, - *, - project_id: UUID, - query_dto: QueryDTO, - ) -> Tuple[List[SpanDTO], Optional[int]]: - raise NotImplementedError - - async def analytics( - self, - *, - project_id: UUID, - analytics_dto: AnalyticsDTO, - ) -> Tuple[List[BucketDTO], Optional[int]]: - raise NotImplementedError - - # TRANSACTIONS - - async def create_one( - self, - *, - project_id: UUID, - span_dto: SpanDTO, - ) -> None: - raise NotImplementedError - - async def create_many( - self, - *, - project_id: UUID, - span_dtos: List[SpanDTO], - ) -> None: - raise NotImplementedError - - async def read_one( - self, - *, - project_id: UUID, - node_id: UUID, - ) -> SpanDTO: - raise NotImplementedError - - async def read_many( - self, - *, - project_id: UUID, - # - node_ids: List[UUID], - ) -> List[SpanDTO]: - raise NotImplementedError - - async def delete_one( - self, - *, - project_id: UUID, - # - node_id: UUID, - ) -> None: - raise NotImplementedError - - async def delete_many( - self, - *, - project_id: UUID, - # - node_ids: List[UUID], - ) -> None: - raise NotImplementedError diff --git a/api/oss/src/core/observability/service.py b/api/oss/src/core/observability/service.py deleted file mode 100644 index bc85a02144..0000000000 --- a/api/oss/src/core/observability/service.py +++ /dev/null @@ -1,151 +0,0 @@ -from uuid import UUID -from typing import List, Optional, Tuple - -from oss.src.core.observability.interfaces import ObservabilityDAOInterface -from oss.src.core.observability.dtos import ( - QueryDTO, - AnalyticsDTO, - SpanDTO, - BucketDTO, -) -from oss.src.core.observability.utils import ( - parse_span_dtos_to_span_idx, - parse_span_idx_to_span_id_tree, - calculate_costs, - cumulate_costs, - cumulate_tokens, - connect_children, - parse_filtering, - parse_ingest, -) - - -class ObservabilityService: - def __init__( - self, - observability_dao: ObservabilityDAOInterface, - ): - self.observability_dao = observability_dao - - async def query( - self, - *, - project_id: UUID, - query_dto: QueryDTO, - ) -> Tuple[List[SpanDTO], Optional[int]]: - if query_dto.filtering: - parse_filtering(query_dto.filtering) - - span_dtos, count = await self.observability_dao.query( - project_id=project_id, - query_dto=query_dto, - ) - - if query_dto.grouping and query_dto.grouping.focus.value != "node": - span_idx = parse_span_dtos_to_span_idx(span_dtos) - - span_id_tree = parse_span_idx_to_span_id_tree(span_idx) - - connect_children(span_id_tree, span_idx) - - span_dtos = [ - span_dto for span_dto in span_idx.values() if span_dto.parent is None - ] - - return span_dtos, count - - async def analytics( - self, - *, - project_id: UUID, - analytics_dto: AnalyticsDTO, - ) -> Tuple[List[BucketDTO], Optional[int]]: - if analytics_dto.filtering: - parse_filtering(analytics_dto.filtering) - - bucket_dtos, count = await self.observability_dao.analytics( - project_id=project_id, - analytics_dto=analytics_dto, - ) - - return bucket_dtos, count - - async def ingest( - self, - *, - project_id: UUID, - span_dtos: List[SpanDTO], - ) -> None: - parse_ingest(span_dtos) - - span_idx = parse_span_dtos_to_span_idx(span_dtos) - - span_id_tree = parse_span_idx_to_span_id_tree(span_idx) - - calculate_costs(span_idx) - - cumulate_costs(span_id_tree, span_idx) - - cumulate_tokens(span_id_tree, span_idx) - - await self.observability_dao.create_many( - project_id=project_id, - span_dtos=span_idx.values(), - ) - - async def create( - self, - *, - project_id: UUID, - span_dto: Optional[SpanDTO] = None, - span_dtos: Optional[List[SpanDTO]] = None, - ) -> SpanDTO: - if span_dto: - return await self.observability_dao.create_one( - project_id=project_id, - span_dto=span_dto, - ) - - if span_dtos: - return await self.observability_dao.create_many( - project_id=project_id, - span_dtos=span_dtos, - ) - - async def read( - self, - *, - project_id: UUID, - node_id: Optional[UUID] = None, - node_ids: Optional[List[UUID]] = None, - ) -> SpanDTO: - if node_id: - return await self.observability_dao.read_one( - project_id=project_id, - node_id=node_id, - ) - - if node_ids: - return await self.observability_dao.read_many( - project_id=project_id, - node_ids=node_ids, - ) - - async def delete( - self, - *, - project_id: UUID, - node_id: Optional[UUID] = None, - node_ids: Optional[List[UUID]] = None, - ): - if node_id: - return await self.observability_dao.delete_one( - project_id=project_id, - node_id=node_id, - ) - - if node_ids: - return await self.observability_dao.delete_many( - project_id=project_id, - node_ids=node_ids, - ) diff --git a/api/oss/src/core/observability/utils.py b/api/oss/src/core/observability/utils.py deleted file mode 100644 index 01e72c7825..0000000000 --- a/api/oss/src/core/observability/utils.py +++ /dev/null @@ -1,433 +0,0 @@ -from enum import Enum -from uuid import UUID -from datetime import datetime -from traceback import print_exc -from typing import List, Dict, OrderedDict, Callable, Any - -from litellm import cost_calculator - -from oss.src.core.observability.dtos import ( - SpanDTO, - FilteringDTO, - ConditionDTO, - ComparisonOperator, - NumericOperator, - StringOperator, - ListOperator, - ExistenceOperator, -) - -from oss.src.utils.logging import get_module_logger - -log = get_module_logger(__name__) - -_C_OPS = list(ComparisonOperator) -_N_OPS = list(NumericOperator) -_S_OPS = list(StringOperator) -_L_OPS = list(ListOperator) -_E_OPS = list(ExistenceOperator) - -_UUID_OPERATORS = _C_OPS + _L_OPS + _E_OPS -_LITERAL_OPERATORS = _C_OPS + _L_OPS + _E_OPS -_INTEGER_OPERATORS = _C_OPS + _N_OPS + _L_OPS + _E_OPS -_FLOAT_OPERATORS = _N_OPS + _E_OPS -_DATETIME_OPERATORS = _C_OPS + _N_OPS + _S_OPS + _L_OPS + _E_OPS -_STRING_OPERATORS = _S_OPS + _E_OPS - - -class FilteringException(Exception): - pass - - -def _is_uuid_key(key: str) -> bool: - return (key.startswith("refs.") and key.endswith(".id")) or key in ( - "root.id", - "tree.id", - "node.id", - "parent.id", - ) - - -def _is_literal_key(key: str) -> bool: - return (key.startswith("refs.") and key.endswith(".slug")) or key in ( - "tree.type", - "node.type", - "node.name", - "status.code", - "exception.type", - "otel.kind", - ) - - -def _is_integer_key(key: str) -> bool: - return key.startswith("refs.") and key.endswith(".version") - - -def _is_float_key(key: str) -> bool: - return key.startswith(("metrics.unit.", "metrics.acc.")) - - -def _is_datetime_key(key: str) -> bool: - return key in ("time.start", "time.end", "exception.timestamp") - - -def _is_string_key(key: str) -> bool: - return key in ("content") - - -def parse_filtering_value( - condition: ConditionDTO, - to_type: Callable[[str], Any], - key_type: str, -) -> None: - is_list = condition.operator in _L_OPS - - try: - if is_list: - condition.value = [to_type(value) for value in condition.value] - else: - condition.value = to_type(condition.value) - except ValueError as exc: - raise FilteringException( - f"Unexpected value '{condition.value}' " - f"for {key_type} key '{condition.key}', " - f"please use a valid {key_type}" - ) from exc - - -def parse_filtering_operator( - condition: ConditionDTO, - allowed_operators: List[Enum], - key_type: str, -): - if condition.operator not in allowed_operators: - operators = [e.value for e in allowed_operators] - - raise FilteringException( - f"Unexpected operator '{condition.operator.value}' " - f"for {key_type} key '{condition.key}', " - f"please use one of {operators}" - ) - - -def parse_condition(condition: ConditionDTO) -> None: - if _is_uuid_key(condition.key): - parse_filtering_value(condition, UUID, "uuid") - parse_filtering_operator(condition, _UUID_OPERATORS, "uuid") - - if _is_literal_key(condition.key): - parse_filtering_value(condition, str, "literal") - parse_filtering_operator(condition, _LITERAL_OPERATORS, "literal") - - elif _is_integer_key(condition.key): - parse_filtering_value(condition, int, "integer") - parse_filtering_operator(condition, _INTEGER_OPERATORS, "integer") - - elif _is_float_key(condition.key): - parse_filtering_value(condition, float, "float") - parse_filtering_operator(condition, _FLOAT_OPERATORS, "float") - - elif _is_datetime_key(condition.key): - parse_filtering_value(condition, datetime.fromisoformat, "datetime") - parse_filtering_operator(condition, _DATETIME_OPERATORS, "datetime") - - elif _is_string_key(condition.key): - parse_filtering_value(condition, str, "string") - parse_filtering_operator(condition, _STRING_OPERATORS, "string") - - else: # All other keys support any operators (conditionally) - pass - - -def parse_filtering( - filtering: FilteringDTO, -) -> None: - for condition in filtering.conditions: - if isinstance(condition, FilteringDTO): - parse_filtering(condition) - elif isinstance(condition, ConditionDTO): - parse_condition(condition) - else: - raise ValueError("Invalid filtering request: unexpected JSON format") - - -def parse_ingest_value( - attributes: Dict[str, Any], - to_type: Callable[[str], Any], - key: str, -) -> None: - try: - attributes[key] = to_type(attributes[key]) - except (ValueError, TypeError): - print_exc() - log.warn( - "Failed to parse attributes:", - key=key, - attribute=attributes[key], - type=to_type, - ) - - del attributes[key] - - -def parse_ingest( - span_dtos: List[SpanDTO], -) -> None: - for span_dto in span_dtos: - typecheck = { - "meta": span_dto.meta, - "metrics": span_dto.metrics, - "refs": span_dto.refs, - } - for field, attributes in typecheck.items(): - if attributes is not None: - for key in list(attributes.keys()): - scoped_key = f"{field}.{key}" - if _is_uuid_key(scoped_key): - parse_ingest_value(attributes, UUID, key) - elif _is_literal_key(scoped_key): - parse_ingest_value(attributes, str, key) - elif _is_integer_key(scoped_key): - parse_ingest_value(attributes, int, key) - elif _is_float_key(scoped_key): - parse_ingest_value(attributes, float, key) - elif _is_datetime_key(scoped_key): - parse_ingest_value(attributes, datetime.fromisoformat, key) - elif _is_string_key(scoped_key): - parse_ingest_value(attributes, str, key) - else: # All other keys are supported as is - pass - - -def parse_span_dtos_to_span_idx( - span_dtos: List[SpanDTO], -) -> Dict[str, SpanDTO]: - span_idx = {span_dto.node.id: span_dto for span_dto in span_dtos} - - return span_idx - - -def parse_span_idx_to_span_id_tree( - span_idx: Dict[str, SpanDTO], -) -> OrderedDict: - span_id_tree = OrderedDict() - index = {} - - def push(span_dto: SpanDTO) -> None: - if span_dto.parent is None: - span_id_tree[span_dto.node.id] = OrderedDict() - index[span_dto.node.id] = span_id_tree[span_dto.node.id] - elif span_dto.parent.id in index: - index[span_dto.parent.id][span_dto.node.id] = OrderedDict() - index[span_dto.node.id] = index[span_dto.parent.id][span_dto.node.id] - - for span_dto in sorted(span_idx.values(), key=lambda span_dto: span_dto.time.start): - push(span_dto) - - return span_id_tree - - -def cumulate_costs( - spans_id_tree: OrderedDict, - spans_idx: Dict[str, SpanDTO], -) -> None: - def _get_unit(span: SpanDTO): - if span.metrics is not None: - return span.metrics.get("unit.costs.total", 0.0) - - return 0.0 - - def _get_acc(span: SpanDTO): - if span.metrics is not None: - return span.metrics.get("acc.costs.total", 0.0) - - return 0.0 - - def _acc(a: float, b: float): - return a + b - - def _set(span: SpanDTO, cost: float): - if span.metrics is None: - span.metrics = {} - - if cost != 0.0: - span.metrics["acc.costs.total"] = cost - - _cumulate_tree_dfs(spans_id_tree, spans_idx, _get_unit, _get_acc, _acc, _set) - - -def cumulate_tokens( - spans_id_tree: OrderedDict, - spans_idx: Dict[str, dict], -) -> None: - def _get_unit(span: SpanDTO): - _tokens = { - "prompt": 0.0, - "completion": 0.0, - "total": 0.0, - } - - if span.metrics is not None: - return { - "prompt": span.metrics.get("unit.tokens.prompt", 0.0), - "completion": span.metrics.get("unit.tokens.completion", 0.0), - "total": span.metrics.get("unit.tokens.total", 0.0), - } - - return _tokens - - def _get_acc(span: SpanDTO): - _tokens = { - "prompt": 0.0, - "completion": 0.0, - "total": 0.0, - } - - if span.metrics is not None: - return { - "prompt": span.metrics.get("acc.tokens.prompt", 0.0), - "completion": span.metrics.get("acc.tokens.completion", 0.0), - "total": span.metrics.get("acc.tokens.total", 0.0), - } - - return _tokens - - def _acc(a: dict, b: dict): - return { - "prompt": a.get("prompt", 0.0) + b.get("prompt", 0.0), - "completion": a.get("completion", 0.0) + b.get("completion", 0.0), - "total": a.get("total", 0.0) + b.get("total", 0.0), - } - - def _set(span: SpanDTO, tokens: dict): - if span.metrics is None: - span.metrics = {} - - if tokens.get("prompt", 0.0) != 0.0: - span.metrics["acc.tokens.prompt"] = tokens.get("prompt", 0.0) - if tokens.get("completion", 0.0) != 0.0: - span.metrics["acc.tokens.completion"] = ( - tokens.get("completion", 0.0) - if tokens.get("completion", 0.0) != 0.0 - else None - ) - if tokens.get("total", 0.0) != 0.0: - span.metrics["acc.tokens.total"] = ( - tokens.get("total", 0.0) if tokens.get("total", 0.0) != 0.0 else None - ) - - _cumulate_tree_dfs(spans_id_tree, spans_idx, _get_unit, _get_acc, _acc, _set) - - -def _cumulate_tree_dfs( - spans_id_tree: OrderedDict, - spans_idx: Dict[str, SpanDTO], - get_unit_metric, - get_acc_metric, - accumulate_metric, - set_metric, -): - for span_id, children_spans_id_tree in spans_id_tree.items(): - children_spans_id_tree: OrderedDict - - cumulated_metric = get_unit_metric(spans_idx[span_id]) - - _cumulate_tree_dfs( - children_spans_id_tree, - spans_idx, - get_unit_metric, - get_acc_metric, - accumulate_metric, - set_metric, - ) - - for child_span_id in children_spans_id_tree.keys(): - marginal_metric = get_acc_metric(spans_idx[child_span_id]) - cumulated_metric = accumulate_metric(cumulated_metric, marginal_metric) - - set_metric(spans_idx[span_id], cumulated_metric) - - -def connect_children( - spans_id_tree: OrderedDict, - spans_idx: Dict[str, dict], -) -> None: - _connect_tree_dfs(spans_id_tree, spans_idx) - - -def _connect_tree_dfs( - spans_id_tree: OrderedDict, - spans_idx: Dict[str, SpanDTO], -): - for span_id, children_spans_id_tree in spans_id_tree.items(): - children_spans_id_tree: OrderedDict - - parent_span = spans_idx[span_id] - - parent_span.nodes = dict() - - _connect_tree_dfs(children_spans_id_tree, spans_idx) - - for child_span_id in children_spans_id_tree.keys(): - child_span_name = spans_idx[child_span_id].node.name - if child_span_name not in parent_span.nodes: - parent_span.nodes[child_span_name] = spans_idx[child_span_id] - else: - if not isinstance(parent_span.nodes[child_span_name], list): - parent_span.nodes[child_span_name] = [ - parent_span.nodes[child_span_name] - ] - - parent_span.nodes[child_span_name].append(spans_idx[child_span_id]) - - if len(parent_span.nodes) == 0: - parent_span.nodes = None - - -TYPES_WITH_COSTS = [ - "embedding", - "query", - "completion", - "chat", - "rerank", -] - - -def calculate_costs(span_idx: Dict[str, SpanDTO]): - for span in span_idx.values(): - if ( - span.node.type - and span.node.type.name.lower() in TYPES_WITH_COSTS - and span.meta - and span.metrics - ): - model = span.meta.get("response.model") or span.meta.get( - "configuration.model" - ) - prompt_tokens = span.metrics.get("unit.tokens.prompt", 0.0) - completion_tokens = span.metrics.get("unit.tokens.completion", 0.0) - - try: - costs = cost_calculator.cost_per_token( - model=model, - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - ) - - if not costs: - continue - - prompt_cost, completion_cost = costs - total_cost = prompt_cost + completion_cost - - span.metrics["unit.costs.prompt"] = prompt_cost - span.metrics["unit.costs.completion"] = completion_cost - span.metrics["unit.costs.total"] = total_cost - - except: # pylint: disable=bare-except - log.warn( - "Failed to calculate costs", - model=model, - prompt_tokens=prompt_tokens, - completion_tokens=completion_tokens, - ) diff --git a/api/oss/src/core/observability/dtos.py b/api/oss/src/core/otel/dtos.py similarity index 100% rename from api/oss/src/core/observability/dtos.py rename to api/oss/src/core/otel/dtos.py diff --git a/api/oss/src/core/workflows/dtos.py b/api/oss/src/core/workflows/dtos.py index 880da2a44a..87874e8afc 100644 --- a/api/oss/src/core/workflows/dtos.py +++ b/api/oss/src/core/workflows/dtos.py @@ -56,8 +56,6 @@ ) from oss.src.core.tracing.dtos import Trace -from oss.src.apis.fastapi.observability.models import AgentaVersionedTreeDTO as Tree - from agenta.sdk.models.workflows import ( WorkflowServiceRequestData, # export WorkflowServiceResponseData, # export diff --git a/api/oss/src/core/workflows/service.py b/api/oss/src/core/workflows/service.py index 866fddf38e..a02cd9b9bd 100644 --- a/api/oss/src/core/workflows/service.py +++ b/api/oss/src/core/workflows/service.py @@ -50,7 +50,7 @@ WorkflowServiceResponseData, ) -from oss.src.services.auth_helper import sign_secret_token +from oss.src.services.auth_service import sign_secret_token from oss.src.services.db_manager import get_project_by_id from agenta.sdk.decorators.running import ( diff --git a/api/oss/src/dbs/postgres/observability/__init__.py b/api/oss/src/dbs/postgres/observability/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/api/oss/src/dbs/postgres/observability/dao.py b/api/oss/src/dbs/postgres/observability/dao.py deleted file mode 100644 index 100460fc93..0000000000 --- a/api/oss/src/dbs/postgres/observability/dao.py +++ /dev/null @@ -1,943 +0,0 @@ -from typing import Optional, List, Tuple, Union -from datetime import datetime, timedelta, time, timezone -from traceback import print_exc -from uuid import UUID - -from sqlalchemy.exc import DBAPIError -from sqlalchemy import and_, or_, not_, distinct, Column, func, cast, text, Select -from sqlalchemy import TIMESTAMP, Enum, UUID as SQLUUID, Integer, Numeric -from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy.future import select -from sqlalchemy.dialects import postgresql - -from oss.src.utils.logging import get_module_logger - -from oss.src.dbs.postgres.shared.engine import engine -from oss.src.dbs.postgres.observability.dbes import NodesDBE -from oss.src.dbs.postgres.observability.mappings import ( - map_span_dto_to_span_dbe, - map_span_dbe_to_span_dto, - map_bucket_dbes_to_dtos, -) - -from oss.src.core.observability.interfaces import ObservabilityDAOInterface -from oss.src.core.observability.dtos import TreeType -from oss.src.core.observability.dtos import ( - QueryDTO, - SpanDTO, - AnalyticsDTO, - BucketDTO, -) -from oss.src.core.observability.dtos import ( - FilteringDTO, - ConditionDTO, - LogicalOperator, - ComparisonOperator, - NumericOperator, - StringOperator, - ListOperator, - ExistenceOperator, -) -from oss.src.core.observability.utils import FilteringException -from oss.src.core.observability.utils import ( - _is_uuid_key, - _is_literal_key, - _is_integer_key, - _is_float_key, - _is_datetime_key, - _is_string_key, -) - -log = get_module_logger(__name__) - -_DEFAULT_TIME_DELTA = timedelta(days=30) -_DEFAULT_INTERVAL = 1440 # 1 day -_DEFAULT_INTERVAL_TEXT = "1 day" -_MAX_ALLOWED_BUCKETS = 1024 -_SUGGESTED_BUCKETS_LIST = [ - (1 * 1, "1 minute"), - (1 * 5, "5 minutes"), - (1 * 15, "15 minutes"), - (1 * 30, "30 minutes"), - (1 * 60 * 1, "1 hour"), - (1 * 60 * 3, "3 hours"), - (1 * 60 * 6, "6 hours"), - (1 * 60 * 12, "12 hours"), - (1 * 60 * 24 * 1, "1 day"), - (1 * 60 * 24 * 3, "3 days"), - (1 * 60 * 24 * 7, "7 days"), - (1 * 60 * 24 * 14, "14 days"), - (1 * 60 * 24 * 30, "30 days"), -] - -DEBUG_ARGS = { - "dialect": postgresql.dialect(), - "compile_kwargs": {"literal_binds": True}, -} -STATEMENT_TIMEOUT = 15_000 # milliseconds -COLUMNS_TO_EXCLUDE = ["content"] -COLUMNS_TO_INCLUDE = [ - column - for column in NodesDBE.__table__.columns - if column.name not in COLUMNS_TO_EXCLUDE -] - - -class ObservabilityDAO(ObservabilityDAOInterface): - def __init__(self): - pass - - async def query( - self, - *, - project_id: UUID, - # - query_dto: QueryDTO, - ) -> Tuple[List[SpanDTO], Optional[int]]: - try: - async with engine.tracing_session() as session: - _stmt = text(f"SET LOCAL statement_timeout = '{STATEMENT_TIMEOUT}'") - await session.execute(_stmt) - - # BASE (SUB-)QUERY - stmt = select(*COLUMNS_TO_INCLUDE) - # ---------------- - - # GROUPING - grouping = query_dto.grouping - grouping_column: Optional[Column] = None - # -------- - if grouping and grouping.focus.value != "node": - grouping_column = getattr( - NodesDBE, - grouping.focus.value + "_id", - ) - - stmt = select( - distinct(grouping_column).label("grouping_key"), - NodesDBE.created_at, - ) - # -------- - - # SCOPING - stmt = stmt.filter_by( - project_id=project_id, - ) - # ------- - - # WINDOWING - windowing = query_dto.windowing - # --------- - if windowing: - if windowing.oldest: - stmt = stmt.filter(NodesDBE.created_at >= windowing.oldest) - - if windowing.newest: - stmt = stmt.filter(NodesDBE.created_at < windowing.newest) - # --------- - - # FILTERING - filtering = query_dto.filtering - # --------- - if filtering: - operator = filtering.operator - conditions = filtering.conditions - - stmt = stmt.filter( - _combine( - operator, - _filters(conditions), - ) - ) - # --------- - - # SORTING - stmt = stmt.order_by( - NodesDBE.created_at.desc(), - ) - # ------- - - # COUNTING // dangerous with large datasets - count_stmt = select( - func.count() # pylint: disable=E1102:not-callable - ).select_from(stmt.subquery()) - - count = (await session.execute(count_stmt)).scalar() - # -------- - - # PAGINATION - pagination = query_dto.pagination - # ---------- - if pagination: - stmt = _chunk( - stmt, - **pagination.model_dump(), - ) - # ---------- - - # GROUPING - if grouping and grouping_column: - substmt = stmt.subquery() - - stmt = select(*COLUMNS_TO_INCLUDE) - stmt = stmt.filter( - grouping_column.in_(select(substmt.c["grouping_key"])) - ) - - # SORTING - stmt = stmt.order_by( - NodesDBE.created_at.desc(), - NodesDBE.time_start.asc(), - ) - # ------- - else: - # SORTING - stmt = stmt.order_by( - NodesDBE.time_start.desc(), - ) - # ------- - # -------- - - # DEBUGGING - # log.trace(str(stmt.compile(**DEBUG_ARGS)).replace("\n", " ")) - # --------- - - # QUERY EXECUTION - spans = (await session.execute(stmt)).all() - # --------------- - - return [map_span_dbe_to_span_dto(span) for span in spans], count - - except DBAPIError as e: - print_exc() - - if "QueryCanceledError" in str(e.orig): - raise FilteringException( - "TracingQuery execution was cancelled due to timeout. " - "Please try again with a smaller time range." - ) from e - - raise e - - except AttributeError as e: - print_exc() - - raise FilteringException( - "Failed to run analytics due to non-existent key(s)." - ) from e - - except Exception as e: - print_exc() - - raise e - - async def analytics( - self, - *, - project_id: UUID, - analytics_dto: AnalyticsDTO, - ) -> Tuple[List[BucketDTO], Optional[int]]: - try: - async with engine.tracing_session() as session: - stmt = text(f"SET LOCAL statement_timeout = '{STATEMENT_TIMEOUT}'") - await session.execute(stmt) - - # WINDOWING - today = datetime.now() - start_of_next_day = datetime.combine( - today + timedelta(days=1), time.min - ) - - oldest = None - newest = None - interval_text = None - # --------- - if analytics_dto.windowing: - if analytics_dto.windowing.newest: - newest = analytics_dto.windowing.newest - else: - newest = start_of_next_day - - if analytics_dto.windowing.oldest: - if analytics_dto.windowing.oldest > newest: - oldest = newest - _DEFAULT_TIME_DELTA - else: - oldest = analytics_dto.windowing.oldest - else: - oldest = newest - _DEFAULT_TIME_DELTA - - if analytics_dto.windowing.interval: - _desired_interval = analytics_dto.windowing.interval - else: - _desired_interval = _DEFAULT_INTERVAL - - _interval_minutes = (newest - oldest).total_seconds() // 60 - - _desired_buckets = _interval_minutes // _desired_interval - - if _desired_buckets > _MAX_ALLOWED_BUCKETS: - for ( - _suggested_minutes, - _suggest_interval_text, - ) in _SUGGESTED_BUCKETS_LIST: - _suggested_buckets = _interval_minutes // _suggested_minutes - - if _suggested_buckets <= _MAX_ALLOWED_BUCKETS: - interval_text = _suggest_interval_text - break - - if not interval_text: - interval_text = _SUGGESTED_BUCKETS_LIST[-1][1] - - else: - interval_text = f"{_desired_interval} minute{'s' if _desired_interval > 1 else ''}" - - else: - newest = start_of_next_day - oldest = newest - _DEFAULT_TIME_DELTA - interval_text = _DEFAULT_INTERVAL_TEXT - - # --------- - - # BASE QUERY - _count = func.count().label("count") # pylint: disable=not-callable - _duration = None - _cost = None - _tokens = None - _timestamp = func.date_bin( - text(f"'{interval_text}'"), - NodesDBE.created_at, - oldest, - ).label("timestamp") - # ---------- - - # GROUPING - if ( - analytics_dto.grouping - and analytics_dto.grouping.focus.value != "node" - ): - _duration = func.sum( - cast( - NodesDBE.metrics["acc.duration.total"], - Numeric, - ) - ).label("duration") - _cost = func.sum( - cast( - NodesDBE.metrics["acc.costs.total"], - Numeric, - ) - ).label("cost") - _tokens = func.sum( - cast( - NodesDBE.metrics["acc.tokens.total"], - Integer, - ) - ).label("tokens") - elif not analytics_dto.grouping or ( - analytics_dto.grouping - and analytics_dto.grouping.focus.value == "node" - ): - _duration = func.sum( - cast( - NodesDBE.metrics["unit.duration.total"], - Numeric, - ) - ).label("duration") - _cost = func.sum( - cast( - NodesDBE.metrics["unit.costs.total"], - Numeric, - ) - ).label("cost") - _tokens = func.sum( - cast( - NodesDBE.metrics["unit.tokens.total"], - Integer, - ) - ).label("tokens") - else: - raise ValueError("Unknown grouping focus.") - # -------- - - # BASE QUERY - total_stmt = select( - _count, - _duration, - _cost, - _tokens, - _timestamp, - ).select_from(NodesDBE) - - error_stmt = select( - _count, - _duration, - _cost, - _tokens, - _timestamp, - ).select_from(NodesDBE) - # ---------- - - # WINDOWING - total_stmt = total_stmt.filter( - NodesDBE.created_at >= oldest, - NodesDBE.created_at < newest, - ) - - error_stmt = error_stmt.filter( - NodesDBE.created_at >= oldest, - NodesDBE.created_at < newest, - ) - # --------- - - # SCOPING - total_stmt = total_stmt.filter_by( - project_id=project_id, - ) - - error_stmt = error_stmt.filter_by( - project_id=project_id, - ) - # ------- - - # TOTAL vs ERROR - error_stmt = error_stmt.filter( - NodesDBE.exception.isnot(None), - ) - # ---------------- - - # FILTERING - filtering = analytics_dto.filtering - # --------- - if filtering: - operator = filtering.operator - conditions = filtering.conditions - - total_stmt = total_stmt.filter( - _combine( - operator, - _filters(conditions), - ) - ) - - error_stmt = error_stmt.filter( - _combine( - operator, - _filters(conditions), - ) - ) - # --------- - - # GROUPING - if ( - analytics_dto.grouping - and analytics_dto.grouping.focus.value != "node" - ): - total_stmt = total_stmt.filter_by( - parent_id=None, - ) - - error_stmt = error_stmt.filter_by( - parent_id=None, - ) - # -------- - - # SORTING - total_stmt = total_stmt.group_by("timestamp") - - error_stmt = error_stmt.group_by("timestamp") - # ------- - - # DEBUGGING - # TODO: HIDE THIS BEFORE RELEASING - # print( - # str( - # total_stmt.compile( - # dialect=postgresql.dialect(), - # compile_kwargs={"literal_binds": True}, - # ) - # ) - # ) - # print("...") - # print( - # str( - # error_stmt.compile( - # dialect=postgresql.dialect(), - # compile_kwargs={"literal_binds": True}, - # ) - # ) - # ) - # --------- - - # QUERY EXECUTION - total_bucket_dbes = (await session.execute(total_stmt)).all() - error_bucket_dbes = (await session.execute(error_stmt)).all() - # --------------- - - interval = _to_minutes(interval_text) - - timestamps = _to_timestamps(oldest, newest, interval) - - bucket_dtos, count = map_bucket_dbes_to_dtos( - total_bucket_dbes=total_bucket_dbes, - error_bucket_dbes=error_bucket_dbes, - interval=interval, - timestamps=timestamps, - ) - - return bucket_dtos, count - - except DBAPIError as e: - print_exc() - - if "QueryCanceledError" in str(e.orig): - raise FilteringException( - "TracingQuery execution was cancelled due to timeout. " - "Please try again with a smaller time range." - ) from e - - raise e - - except AttributeError as e: - print_exc() - - raise FilteringException( - "Failed to run analytics due to non-existent key(s)." - ) from e - - except Exception as e: - print_exc() - - raise e - - async def create_one( - self, - *, - project_id: UUID, - span_dto: SpanDTO, - ) -> None: - span_dto.tree.type = TreeType.INVOCATION - - span_dbe = map_span_dto_to_span_dbe( - project_id=project_id, - span_dto=span_dto, - ) - - async with engine.tracing_session() as session: - session.add(span_dbe) - await session.commit() - - async def create_many( - self, - *, - project_id: UUID, - span_dtos: List[SpanDTO], - ) -> None: - for span_dto in span_dtos: - span_dto.tree.type = TreeType.INVOCATION - - span_dbes = [ - map_span_dto_to_span_dbe( - project_id=project_id, - span_dto=span_dto, - ) - for span_dto in span_dtos - ] - - async with engine.tracing_session() as session: - for span_dbe in span_dbes: - session.add(span_dbe) - - await session.commit() - - async def read_one( - self, - *, - project_id: UUID, - node_id: UUID, - to_dto: bool = True, - ) -> Union[Optional[SpanDTO], Optional[NodesDBE]]: - span_dbe = None - async with engine.tracing_session() as session: - stmt = select(NodesDBE) - - stmt = stmt.filter_by( - project_id=project_id, - node_id=node_id, - ) - - span_dbe = (await session.execute(stmt)).scalars().one_or_none() - - span_dto = None - if span_dbe and to_dto: - span_dto = map_span_dbe_to_span_dto(span_dbe) - - return span_dto - - return span_dbe - - async def read_many( - self, - *, - project_id: UUID, - node_ids: List[UUID], - to_dto: bool = True, - ) -> Union[List[SpanDTO], List[NodesDBE]]: - span_dbes = [] - async with engine.tracing_session() as session: - stmt = select(NodesDBE) - - stmt = stmt.filter_by(project_id=project_id) - - stmt = stmt.filter(NodesDBE.node_id.in_(node_ids)) - - span_dbes = (await session.execute(stmt)).scalars().all() - - span_dtos = [] - if span_dbes and to_dto: - span_dtos = [map_span_dbe_to_span_dto(span_dbe) for span_dbe in span_dbes] - - return span_dtos - - return span_dbes - - async def read_children( - self, - *, - project_id: UUID, - parent_id: UUID, - to_dto: bool = True, - ) -> Union[List[SpanDTO], List[NodesDBE]]: - span_dbes = [] - async with engine.tracing_session() as session: - stmt = select(NodesDBE) - - stmt = stmt.filter_by(project_id=project_id) - - stmt = stmt.filter_by(parent_id=parent_id) - - span_dbes = (await session.execute(stmt)).scalars().all() - - span_dtos = [] - if span_dbes and to_dto: - span_dtos = [map_span_dbe_to_span_dto(span_dbe) for span_dbe in span_dbes] - - return span_dtos - - return span_dbes - - async def delete_one( - self, - *, - project_id: UUID, - node_id: UUID, - ) -> None: - span_dbe = await self.read_one( - project_id=project_id, - node_id=node_id, - to_dto=False, - ) - - if span_dbe: - # COULD BE REPLACED WITH A CASCADE - children_dbes = await self.read_children( - project_id=project_id, - parent_id=node_id, - to_dto=False, - ) - - if children_dbes: - await self.delete_many( - project_id=project_id, - node_ids=[child_dbe.node_id for child_dbe in children_dbes], - ) - # -------------------------------- - - async with engine.tracing_session() as session: - await session.delete(span_dbe) - await session.commit() - - async def delete_many( - self, - *, - project_id: UUID, - node_ids: List[UUID], - ) -> None: - span_dbes = await self.read_many( - project_id=project_id, - node_ids=node_ids, - to_dto=False, - ) - - if span_dbes: - for span_dbe in span_dbes: - # COULD BE REPLACED WITH A CASCADE - children_dbes = await self.read_children( - project_id=project_id, - parent_id=span_dbe.node_id, - to_dto=False, - ) - - if children_dbes: - await self.delete_many( - project_id=project_id, - node_ids=[child_dbe.node_id for child_dbe in children_dbes], - ) - # -------------------------------- - - async with engine.tracing_session() as session: - await session.delete(span_dbe) - await session.commit() - - -def _chunk( - stmt: Select, - page: Optional[int] = None, - size: Optional[int] = None, - next: Optional[datetime] = None, # pylint: disable=W0621:redefined-builtin - stop: Optional[datetime] = None, -) -> Select: - # 1. LIMIT size OFFSET (page - 1) * size - # -> unstable if windowing.newest is not set - if page and size: - limit = size - offset = (page - 1) * size - - stmt = stmt.limit(limit).offset(offset) - - # 2. WHERE next > created_at LIMIT size - # -> unstable if created_at is not unique - elif next and size: - stmt = stmt.filter(NodesDBE.created_at < next) - stmt = stmt.limit(size) - - # 3. WHERE next > created_at AND created_at >= stop - # -> stable thanks to the = stop) - - # 4. WHERE LIMIT size - # -> useful as a starter - elif size: - stmt = stmt.limit(size) - - # 5. WHERE created_at >= stop - # -> useful as a starter - elif stop: - stmt = stmt.filter(NodesDBE.created_at >= stop) - - # 6. WHERE next > created_at - # -> rather useless - elif next: - stmt = stmt.filter(NodesDBE.created_at < next) - - return stmt - - -def _combine( - operator: LogicalOperator, - conditions: list, -): - if operator == LogicalOperator.AND: - return and_(*conditions) - elif operator == LogicalOperator.OR: - return or_(*conditions) - elif operator == LogicalOperator.NOT: - return not_(and_(*conditions)) - else: - raise ValueError(f"Unknown operator: {operator}") - - -_FLAT_KEYS = { - "time.start": "time_start", - "time.end": "time_end", - "root.id": "root_id", - "tree.id": "tree_id", - "tree.type": "tree_type", - "node.id": "node_id", - "node.type": "node_type", - "node.name": "node_name", - "parent.id": "parent_id", -} - -_NESTED_FIELDS = ("data",) - - -def _filters(filtering: FilteringDTO) -> list: - _conditions = [] - - for condition in filtering: - if isinstance(condition, FilteringDTO): - _conditions.append( - _combine( - condition.operator, - _filters( - condition.conditions, - ), - ) - ) - - elif isinstance(condition, ConditionDTO): - _key = condition.key - value = condition.value - - # MAP FLAT KEYS - if _key in _FLAT_KEYS: - _key = _FLAT_KEYS[_key] - - # SPLIT FIELD AND KEY - _split = _key.split(".", 1) - field = _split[0] - key = _split[1] if len(_split) > 1 else None - - # GET COLUMN AS ATTRIBUTE - attribute: Column = getattr(NodesDBE, field) - - if isinstance(attribute.type, JSONB) and key: - if field in _NESTED_FIELDS: - key = key.split(".") - - for k in key[-1]: - attribute = attribute[k] - - attribute = attribute[key].astext - - # CASTING - if _is_uuid_key(_key): - attribute = cast(attribute, SQLUUID) - elif _is_literal_key(_key): - pass - elif _is_integer_key(_key): - attribute = cast(attribute, Integer) - elif _is_float_key(_key): - attribute = cast(attribute, Numeric) - elif _is_datetime_key(_key): - pass - elif _is_string_key(_key): - pass - else: - pass - - if isinstance(attribute.type, TIMESTAMP): - value = datetime.fromisoformat(value) - - if isinstance(attribute.type, Enum): - value = str(value).upper() - - # COMPARISON OPERATORS - if isinstance(condition.operator, ComparisonOperator): - if condition.operator == ComparisonOperator.IS: - _conditions.append(attribute == value) - elif condition.operator == ComparisonOperator.IS_NOT: - _conditions.append(attribute != value) - - # NUMERIC OPERATORS - elif isinstance(condition.operator, NumericOperator): - if condition.operator == NumericOperator.EQ: - _conditions.append(attribute == value) - elif condition.operator == NumericOperator.NEQ: - _conditions.append(attribute != value) - elif condition.operator == NumericOperator.GT: - _conditions.append(attribute > value) - elif condition.operator == NumericOperator.LT: - _conditions.append(attribute < value) - elif condition.operator == NumericOperator.GTE: - _conditions.append(attribute >= value) - elif condition.operator == NumericOperator.LTE: - _conditions.append(attribute <= value) - elif condition.operator == NumericOperator.BETWEEN: - _conditions.append(attribute.between(value[0], value[1])) - - # STRING OPERATORS - elif isinstance(condition.operator, StringOperator): - if condition.operator == StringOperator.STARTSWITH: - _conditions.append(attribute.startswith(value)) - elif condition.operator == StringOperator.ENDSWITH: - _conditions.append(attribute.endswith(value)) - elif condition.operator == StringOperator.CONTAINS: - _conditions.append(attribute.contains(value)) - elif condition.operator == StringOperator.LIKE: - _conditions.append(attribute.like(value)) - elif condition.operator == StringOperator.MATCHES: - if condition.options: - case_sensitive = condition.options.case_sensitive - exact_match = condition.options.exact_match - else: - case_sensitive = False - exact_match = False - - if exact_match: - if case_sensitive: - _conditions.append(attribute.like(value)) - else: - _conditions.append(attribute.ilike(value)) - else: - pattern = f"%{value}%" - if case_sensitive: - _conditions.append(attribute.like(pattern)) - else: - _conditions.append(attribute.ilike(pattern)) - - # LIST OPERATORS - elif isinstance(condition.operator, ListOperator): - if condition.operator == ListOperator.IN: - _conditions.append(attribute.in_(value)) - - # EXISTENCE OPERATORS - elif isinstance(condition.operator, ExistenceOperator): - if condition.operator == ExistenceOperator.EXISTS: - _conditions.append(attribute.isnot(None)) - elif condition.operator == ExistenceOperator.NOT_EXISTS: - _conditions.append(attribute.is_(None)) - - return _conditions - - -def _to_minutes( - interval_text: str, -) -> int: - quantity, unit = interval_text.split() - quantity = int(quantity) - - if unit == "minute" or unit == "minutes": - return quantity - elif unit == "hour" or unit == "hours": - return quantity * 60 - elif unit == "day" or unit == "days": - return quantity * 1440 - elif unit == "week" or unit == "weeks": - return quantity * 10080 - elif unit == "month" or unit == "months": - return quantity * 43200 - else: - raise ValueError(f"Unknown time unit: {unit}") - - -def _to_timestamps( - oldest: datetime, - newest: datetime, - interval: int, -) -> List[datetime]: - buckets = [] - - _oldest = oldest - if oldest.tzinfo is None: - _oldest = oldest.replace(tzinfo=timezone.utc) - else: - _oldest = oldest.astimezone(timezone.utc) - - _newest = newest - if newest.tzinfo is None: - _newest = newest.replace(tzinfo=timezone.utc) - else: - _newest = newest.astimezone(timezone.utc) - - bucket_start = _oldest - - while bucket_start < _newest: - buckets.append(bucket_start) - - bucket_start += timedelta(minutes=interval) - - return buckets diff --git a/api/oss/src/dbs/postgres/observability/dbas.py b/api/oss/src/dbs/postgres/observability/dbas.py deleted file mode 100644 index b4f498cde8..0000000000 --- a/api/oss/src/dbs/postgres/observability/dbas.py +++ /dev/null @@ -1,96 +0,0 @@ -from sqlalchemy.dialects.postgresql import JSONB -from sqlalchemy import Column, UUID, TIMESTAMP, Enum as SQLEnum, String - -from oss.src.core.observability.dtos import TreeType, NodeType -from oss.src.dbs.postgres.shared.dbas import ProjectScopeDBA, LegacyLifecycleDBA - - -class RootDBA: - __abstract__ = True - - root_id = Column(UUID(as_uuid=True), nullable=False) - - -class TreeDBA: - __abstract__ = True - - tree_id = Column(UUID(as_uuid=True), nullable=False) - tree_type = Column(SQLEnum(TreeType), nullable=True) - - -class NodeDBA: - __abstract__ = True - - node_id = Column(UUID(as_uuid=True), nullable=False) - node_name = Column(String, nullable=False) - node_type = Column(SQLEnum(NodeType), nullable=True) - - -class ParentDBA: - __abstract__ = True - - parent_id = Column(UUID(as_uuid=True), nullable=True) - - -class TimeDBA: - __abstract__ = True - - time_start = Column(TIMESTAMP, nullable=False) - time_end = Column(TIMESTAMP, nullable=False) - - -class StatusDBA: - __abstract__ = True - - status = Column(JSONB(none_as_null=True), nullable=True) - - -class AttributesDBA: - __abstract__ = True - - data = Column(JSONB(none_as_null=True), nullable=True) - metrics = Column(JSONB(none_as_null=True), nullable=True) - meta = Column(JSONB(none_as_null=True), nullable=True) - refs = Column(JSONB(none_as_null=True), nullable=True) - - -class EventsDBA: - __abstract__ = True - - exception = Column(JSONB(none_as_null=True), nullable=True) - - -class LinksDBA: - __abstract__ = True - - links = Column(JSONB(none_as_null=True), nullable=True) - - -class FullTextSearchDBA: - __abstract__ = True - - content = Column(String, nullable=True) - - -class OTelDBA: - __abstract__ = True - - otel = Column(JSONB(none_as_null=True), nullable=True) - - -class SpanDBA( - ProjectScopeDBA, - LegacyLifecycleDBA, - RootDBA, - TreeDBA, - NodeDBA, - ParentDBA, - TimeDBA, - StatusDBA, - AttributesDBA, - EventsDBA, - LinksDBA, - FullTextSearchDBA, - OTelDBA, -): - __abstract__ = True diff --git a/api/oss/src/dbs/postgres/observability/dbes.py b/api/oss/src/dbs/postgres/observability/dbes.py deleted file mode 100644 index 006646a472..0000000000 --- a/api/oss/src/dbs/postgres/observability/dbes.py +++ /dev/null @@ -1,30 +0,0 @@ -from sqlalchemy import PrimaryKeyConstraint, Index - -from oss.src.dbs.postgres.shared.base import Base -from oss.src.dbs.postgres.observability.dbas import SpanDBA - - -class NodesDBE(Base, SpanDBA): - __tablename__ = "nodes" - - __table_args__ = ( - PrimaryKeyConstraint( - "project_id", - "node_id", - ), # focus = node - Index( - "index_project_id_tree_id", - "project_id", - "tree_id", - ), # focus = tree - Index( - "index_project_id_root_id", - "project_id", - "root_id", - ), # focus = root - Index( - "index_project_id_node_id", - "project_id", - "created_at", - ), # sorting and pagination - ) diff --git a/api/oss/src/dbs/postgres/observability/mappings.py b/api/oss/src/dbs/postgres/observability/mappings.py deleted file mode 100644 index 6fcc8269c9..0000000000 --- a/api/oss/src/dbs/postgres/observability/mappings.py +++ /dev/null @@ -1,190 +0,0 @@ -from typing import List, Tuple, Optional -from json import dumps, loads -from datetime import datetime -from uuid import UUID - -from oss.src.core.shared.dtos import LegacyLifecycleDTO -from oss.src.core.observability.dtos import ( - RootDTO, - TreeDTO, - NodeDTO, - ParentDTO, - TimeDTO, - StatusDTO, - ExceptionDTO, - OTelExtraDTO, - SpanDTO, - MetricsDTO, - BucketDTO, -) - -from oss.src.dbs.postgres.observability.dbes import NodesDBE - - -def map_span_dbe_to_span_dto(span: NodesDBE) -> SpanDTO: - trace_id = UUID(str(span.tree_id)).hex - span_id = UUID(str(span.node_id)).hex[16:] - - return SpanDTO( - trace_id=trace_id, - span_id=span_id, - lifecycle=LegacyLifecycleDTO( - created_at=str(span.created_at), - updated_at=str(span.updated_at), - updated_by_id=str(span.updated_by_id) if span.updated_by_id else None, - ), - root=RootDTO( - id=span.root_id, - ), - tree=TreeDTO( - id=span.tree_id, - type=span.tree_type, - ), - node=NodeDTO( - id=span.node_id, - type=span.node_type, - name=span.node_name, - ), - parent=( - ParentDTO( - id=span.parent_id, - ) - if span.parent_id - else None - ), - time=TimeDTO( - start=span.time_start, - end=span.time_end, - ), - status=StatusDTO( - code=span.status.get("code"), - message=span.status.get("message"), - ), - # ATTRIBUTES - data=span.data, - metrics=span.metrics, - meta=span.meta, - refs=span.refs, - # EVENTS - exception=( - ExceptionDTO( - timestamp=span.exception.get("timestamp"), - type=span.exception.get("type"), - message=span.exception.get("message"), - stacktrace=span.exception.get("stacktrace"), - attributes=span.exception.get("attributes"), - ) - if span.exception - else None - ), - # LINKS - links=span.links, - # OTEL - otel=OTelExtraDTO(**span.otel) if span.otel else None, - ) - - -def map_span_dto_to_span_dbe( - project_id: str, - span_dto: SpanDTO, -) -> NodesDBE: - span_dbe = NodesDBE( - # SCOPE - project_id=project_id, - # LIFECYCLE - created_at=span_dto.lifecycle.created_at if span_dto.lifecycle else None, - updated_at=span_dto.lifecycle.updated_at if span_dto.lifecycle else None, - updated_by_id=span_dto.lifecycle.updated_by_id if span_dto.lifecycle else None, - # ROOT - root_id=span_dto.root.id, - # TREE - tree_id=span_dto.tree.id, - tree_type=span_dto.tree.type, - # NODE - node_id=span_dto.node.id, - node_type=span_dto.node.type, - node_name=span_dto.node.name, - # PARENT - parent_id=span_dto.parent.id if span_dto.parent else None, - # TIME - time_start=span_dto.time.start, - time_end=span_dto.time.end, - # STATUS - status=( - span_dto.status.model_dump(exclude_none=True) if span_dto.status else None - ), - # ATTRIBUTES - data=span_dto.encode(span_dto.data), - metrics=span_dto.encode(span_dto.metrics), - meta=span_dto.encode(span_dto.meta), - refs=span_dto.encode(span_dto.refs), - # EVENTS - exception=( - loads(span_dto.exception.model_dump_json()) if span_dto.exception else None - ), - # LINKS - links=( - [loads(link.model_dump_json()) for link in span_dto.links] - if span_dto.links - else None - ), - # FULL TEXT SEARCH - content=dumps(span_dto.data), - # OTEL - otel=loads(span_dto.otel.model_dump_json()) if span_dto.otel else None, - ) - - return span_dbe - - -def map_bucket_dbes_to_dtos( - total_bucket_dbes: List[NodesDBE], - error_bucket_dbes: List[NodesDBE], - interval: int, - timestamps: Optional[List[datetime]] = None, -) -> Tuple[List[BucketDTO], int]: - total_metrics = { - bucket.timestamp: MetricsDTO( - count=bucket.count, - duration=bucket.duration, - cost=bucket.cost, - tokens=bucket.tokens, - ) - for bucket in total_bucket_dbes - } - - error_metrics = { - bucket.timestamp: MetricsDTO( - count=bucket.count, - duration=bucket.duration, - cost=bucket.cost, - tokens=bucket.tokens, - ) - for bucket in error_bucket_dbes - } - - total_timestamps = timestamps - if not total_timestamps: - total_timestamps = list( - set(list(total_metrics.keys()) + list(error_metrics.keys())) - ) - total_timestamps.sort() - - _total_timestamps = list( - set(list(total_metrics.keys()) + list(error_metrics.keys())) - ) - _total_timestamps.sort() - - bucket_dtos = [ - BucketDTO( - timestamp=timestamp, - interval=interval, - total=total_metrics.get(timestamp, MetricsDTO()), - error=error_metrics.get(timestamp, MetricsDTO()), - ) - for timestamp in total_timestamps - ] - - count = len(bucket_dtos) - - return bucket_dtos, count diff --git a/api/oss/src/routers/evaluators_router.py b/api/oss/src/routers/evaluators_router.py index 336572ed6e..6117a447b9 100644 --- a/api/oss/src/routers/evaluators_router.py +++ b/api/oss/src/routers/evaluators_router.py @@ -22,7 +22,7 @@ ) from oss.src.core.secrets.utils import get_llm_providers_secrets -from oss.src.services.auth_helper import sign_secret_token +from oss.src.services.auth_service import sign_secret_token from agenta.sdk.contexts.running import RunningContext, running_context_manager from agenta.sdk.contexts.tracing import TracingContext, tracing_context_manager diff --git a/api/oss/src/services/analytics_service.py b/api/oss/src/services/analytics_service.py index edcd118bef..b24ed7122e 100644 --- a/api/oss/src/services/analytics_service.py +++ b/api/oss/src/services/analytics_service.py @@ -49,7 +49,7 @@ if POSTHOG_API_KEY: posthog.api_key = POSTHOG_API_KEY posthog.host = POSTHOG_HOST - log.info("PostHog initialized with host %s", POSTHOG_HOST) + log.info("Agenta - PostHog URL: %s", POSTHOG_HOST) else: log.warn("PostHog API key not found in environment variables") @@ -328,12 +328,12 @@ def _get_event_name_from_path( # <----------- End of Evaluation Events -------------> # <----------- Observability Events -------------> - if method == "POST" and ( - "/otlp/v1/traces" in path or "/observability/v1/otlp/traces" in path - ): + if method == "POST" and "/otlp/v1/traces" in path: return "spans_created" - elif method == "GET" and "/observability/v1/traces" in path: + elif method == "GET" and ( + "/tracing" in path or "/invocations" in path or "/annotations" in path + ): return "spans_fetched" # <----------- End of Observability Events -------------> diff --git a/api/oss/src/services/auth_helper.py b/api/oss/src/services/auth_service.py similarity index 100% rename from api/oss/src/services/auth_helper.py rename to api/oss/src/services/auth_service.py diff --git a/api/oss/src/services/llm_apps_service.py b/api/oss/src/services/llm_apps_service.py index d5ba69f965..858f4132f9 100644 --- a/api/oss/src/services/llm_apps_service.py +++ b/api/oss/src/services/llm_apps_service.py @@ -8,7 +8,7 @@ from oss.src.utils.logging import get_module_logger from oss.src.utils import common from oss.src.services import helpers -from oss.src.services.auth_helper import sign_secret_token +from oss.src.services.auth_service import sign_secret_token from oss.src.services.db_manager import get_project_by_id from oss.src.apis.fastapi.tracing.utils import make_hash_id from oss.src.models.shared_models import InvokationResult, Result, Error diff --git a/api/oss/src/tasks/evaluations/batch.py b/api/oss/src/tasks/evaluations/batch.py index 324ed74e49..c90f6a9cc7 100644 --- a/api/oss/src/tasks/evaluations/batch.py +++ b/api/oss/src/tasks/evaluations/batch.py @@ -11,7 +11,7 @@ from oss.src.utils.helpers import parse_url, get_slug_from_name_and_id from oss.src.utils.logging import get_module_logger from oss.src.utils.common import is_ee -from oss.src.services.auth_helper import sign_secret_token +from oss.src.services.auth_service import sign_secret_token from oss.src.services import llm_apps_service from oss.src.models.shared_models import InvokationResult from oss.src.services.db_manager import ( @@ -115,8 +115,6 @@ Query, ) -from oss.src.core.workflows.dtos import Tree - from oss.src.core.evaluations.utils import get_metrics_keys_from_schema diff --git a/api/oss/src/tasks/evaluations/legacy.py b/api/oss/src/tasks/evaluations/legacy.py index a3f430ad3c..095715b82a 100644 --- a/api/oss/src/tasks/evaluations/legacy.py +++ b/api/oss/src/tasks/evaluations/legacy.py @@ -10,7 +10,7 @@ from oss.src.utils.helpers import parse_url, get_slug_from_name_and_id from oss.src.utils.logging import get_module_logger from oss.src.utils.common import is_ee -from oss.src.services.auth_helper import sign_secret_token +from oss.src.services.auth_service import sign_secret_token from oss.src.services import llm_apps_service from oss.src.models.shared_models import InvokationResult from oss.src.services.db_manager import ( diff --git a/api/oss/src/tasks/evaluations/live.py b/api/oss/src/tasks/evaluations/live.py index fcf4246d92..d81d99daf2 100644 --- a/api/oss/src/tasks/evaluations/live.py +++ b/api/oss/src/tasks/evaluations/live.py @@ -7,7 +7,7 @@ from fastapi import Request from oss.src.utils.logging import get_module_logger -from oss.src.services.auth_helper import sign_secret_token +from oss.src.services.auth_service import sign_secret_token from oss.src.services.db_manager import get_project_by_id from oss.src.core.secrets.utils import get_llm_providers_secrets diff --git a/examples/jupyter/evaluation/quick-start.ipynb b/examples/jupyter/evaluation/quick-start.ipynb index bfad74c855..de32d1eb16 100644 --- a/examples/jupyter/evaluation/quick-start.ipynb +++ b/examples/jupyter/evaluation/quick-start.ipynb @@ -92,7 +92,7 @@ "text": [ "2025-11-12T13:22:23.599Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - SDK ver: 0.62.1 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", "2025-11-12T13:22:23.600Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - API URL: https://cloud.agenta.ai/api \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", - "2025-11-12T13:22:23.600Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OLTP URL: https://cloud.agenta.ai/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n", + "2025-11-12T13:22:23.600Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OTLP URL: https://cloud.agenta.ai/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n", "✅ Agenta SDK initialized!\n" ] } diff --git a/examples/jupyter/evaluation/testset-management.ipynb b/examples/jupyter/evaluation/testset-management.ipynb index 8e1a9d6312..045a6c8d8d 100644 --- a/examples/jupyter/evaluation/testset-management.ipynb +++ b/examples/jupyter/evaluation/testset-management.ipynb @@ -32,7 +32,7 @@ "text": [ "2025-10-23T16:46:06.701Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - SDK version: 0.51.2 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", "2025-10-23T16:46:06.702Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - Host: http://144.76.237.122 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", - "2025-10-23T16:46:06.702Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OLTP URL: http://144.76.237.122/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" + "2025-10-23T16:46:06.702Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OTLP URL: http://144.76.237.122/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" ] } ], diff --git a/examples/jupyter/observability/quickstart.ipynb b/examples/jupyter/observability/quickstart.ipynb index a16891c606..333ba7f875 100644 --- a/examples/jupyter/observability/quickstart.ipynb +++ b/examples/jupyter/observability/quickstart.ipynb @@ -200,7 +200,7 @@ "text": [ "2025-10-28T10:24:18.411Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - SDK version: 0.59.6 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", "2025-10-28T10:24:18.411Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - Host: https://cloud.agenta.ai \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", - "2025-10-28T10:24:18.412Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OLTP URL: https://cloud.agenta.ai/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" + "2025-10-28T10:24:18.412Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OTLP URL: https://cloud.agenta.ai/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" ] } ], diff --git a/examples/jupyter/prompt-management/how-to-prompt-management.ipynb b/examples/jupyter/prompt-management/how-to-prompt-management.ipynb index abc071f818..219cc1481b 100644 --- a/examples/jupyter/prompt-management/how-to-prompt-management.ipynb +++ b/examples/jupyter/prompt-management/how-to-prompt-management.ipynb @@ -130,8 +130,8 @@ "output_type": "stream", "text": [ "2025-09-11T13:55:47.580Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - SDK version: 0.51.6 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", - "2025-09-11T13:55:47.582Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - Host: http://144.76.237.122 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", - "2025-09-11T13:55:47.583Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OLTP URL: https://http://144.76.237.122/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" + "2025-09-11T13:55:47.582Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - Host: https://cloud.agenta.ai \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", + "2025-09-11T13:55:47.583Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OTLP URL: https://cloud.agenta.ai/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" ] }, { @@ -823,4 +823,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/examples/jupyter/prompt-management/manage-prompts-with-sdk-tutorial.ipynb b/examples/jupyter/prompt-management/manage-prompts-with-sdk-tutorial.ipynb index 3eaa4c44e7..3dd1294903 100644 --- a/examples/jupyter/prompt-management/manage-prompts-with-sdk-tutorial.ipynb +++ b/examples/jupyter/prompt-management/manage-prompts-with-sdk-tutorial.ipynb @@ -148,7 +148,7 @@ "text": [ "2025-09-11T15:56:10.922Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - SDK version: 0.51.6 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", "2025-09-11T15:56:10.923Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - Host: http://144.76.237.122 \u001b[38;5;245m[agenta.sdk.agenta_init]\u001b[0m \n", - "2025-09-11T15:56:10.923Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OLTP URL: http://144.76.237.122/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" + "2025-09-11T15:56:10.923Z \u001b[38;5;70m[INFO.]\u001b[0m Agenta - OTLP URL: http://144.76.237.122/api/otlp/v1/traces \u001b[38;5;245m[agenta.sdk.tracing.tracing]\u001b[0m \n" ] }, { diff --git a/sdk/agenta/sdk/agenta_init.py b/sdk/agenta/sdk/agenta_init.py index 0ab812645c..807ab94056 100644 --- a/sdk/agenta/sdk/agenta_init.py +++ b/sdk/agenta/sdk/agenta_init.py @@ -70,7 +70,7 @@ def init( """ - log.info("Agenta - SDK ver: %s", version("agenta")) + log.info("Agenta - SDK ver: %s", version("agenta")) config = {} if config_fname: @@ -118,7 +118,7 @@ def init( or None # NO FALLBACK ) - log.info("Agenta - API URL: %s", self.api_url) + log.info("Agenta - API URL: %s", self.api_url) self.scope_type = ( scope_type diff --git a/sdk/agenta/sdk/engines/tracing/tracing.py b/sdk/agenta/sdk/engines/tracing/tracing.py index 081effcd39..9c045dd4fb 100644 --- a/sdk/agenta/sdk/engines/tracing/tracing.py +++ b/sdk/agenta/sdk/engines/tracing/tracing.py @@ -114,7 +114,7 @@ def configure( # TRACE PROCESSORS -- OTLP try: - log.info("Agenta - OLTP URL: %s", self.otlp_url) + log.info("Agenta - OTLP URL: %s", self.otlp_url) _otlp = TraceProcessor( OTLPExporter( @@ -127,7 +127,7 @@ def configure( self.tracer_provider.add_span_processor(_otlp) except: # pylint: disable=bare-except - log.warning("Agenta - OLTP unreachable, skipping exports.") + log.warning("Agenta - OTLP unreachable, skipping exports.") # GLOBAL TRACER PROVIDER -- INSTRUMENTATION LIBRARIES set_tracer_provider(self.tracer_provider) diff --git a/sdk/agenta/sdk/tracing/tracing.py b/sdk/agenta/sdk/tracing/tracing.py index 5f9c082d41..e66b939e3f 100644 --- a/sdk/agenta/sdk/tracing/tracing.py +++ b/sdk/agenta/sdk/tracing/tracing.py @@ -101,7 +101,7 @@ def configure( # TRACE PROCESSORS -- OTLP try: - log.info("Agenta - OLTP URL: %s", self.otlp_url) + log.info("Agenta - OTLP URL: %s", self.otlp_url) _otlp = TraceProcessor( OTLPExporter( @@ -114,7 +114,7 @@ def configure( self.tracer_provider.add_span_processor(_otlp) except: # pylint: disable=bare-except - log.warning("Agenta - OLTP unreachable, skipping exports.") + log.warning("Agenta - OTLP unreachable, skipping exports.") # --- INLINE if inline: