diff --git a/docker-compose.yml b/docker-compose.yml index 1bb4004..c83c627 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,16 +10,28 @@ services: - "8000:8000" environment: - DATABASE_URL=postgresql+asyncpg://user:password@db:5432/clestiq_shield - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 - - OTEL_SERVICE_NAME=clestiq-shield-gateway + - DD_ENV=development + - DD_SERVICE=clestiq-shield-gateway + - DD_VERSION=1.0.0 + - DD_AGENT_HOST=datadog-agent + - DD_TRACE_AGENT_PORT=8126 + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_LOGS_INJECTION=true + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true - SENTINEL_SERVICE_URL=http://sentinel:8001 - TELEMETRY_ENABLED=true depends_on: - db - sentinel - - otel-collector + - datadog-agent volumes: - ./services/gateway/app:/app/app + - dogstatsd-socket:/var/run/datadog + labels: + com.datadoghq.tags.service: "clestiq-shield-gateway" + com.datadoghq.tags.env: "development" + com.datadoghq.tags.version: "1.0.0" networks: - clestiq-network healthcheck: @@ -38,16 +50,28 @@ services: - "8003" environment: - DATABASE_URL=postgresql+asyncpg://user:password@db:5432/clestiq_shield - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 - - OTEL_SERVICE_NAME=clestiq-shield-eagle-eye + - DD_ENV=development + - DD_SERVICE=clestiq-shield-eagle-eye + - DD_VERSION=1.0.0 + - DD_AGENT_HOST=datadog-agent + - DD_TRACE_AGENT_PORT=8126 + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_LOGS_INJECTION=true + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true - TELEMETRY_ENABLED=true - SECRET_KEY=change_this_to_a_strong_secret_key - ALGORITHM=HS256 depends_on: - db - - otel-collector + - datadog-agent volumes: - ./services/eagle-eye/app:/app/app + - dogstatsd-socket:/var/run/datadog + labels: + com.datadoghq.tags.service: "clestiq-shield-eagle-eye" + com.datadoghq.tags.env: "development" + com.datadoghq.tags.version: "1.0.0" networks: - clestiq-network healthcheck: @@ -65,11 
+89,16 @@ services: expose: - "8001" environment: - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 - - OTEL_SERVICE_NAME=clestiq-shield-sentinel - - GCP_PROJECT_ID=${GCP_PROJECT_ID} - - GCP_LOCATION=${GCP_LOCATION:-us-east1} - - GOOGLE_APPLICATION_CREDENTIALS=/app/gcp-credentials.json + - DD_ENV=development + - DD_SERVICE=clestiq-shield-sentinel + - DD_VERSION=1.0.0 + - DD_AGENT_HOST=datadog-agent + - DD_TRACE_AGENT_PORT=8126 + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_LOGS_INJECTION=true + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true + - GEMINI_API_KEY=${GEMINI_API_KEY} - TELEMETRY_ENABLED=true # Security Settings - SECURITY_SANITIZATION_ENABLED=true @@ -82,16 +111,20 @@ services: - TOON_CONVERSION_ENABLED=true # LLM Settings - LLM_FORWARD_ENABLED=true - - LLM_MODEL_NAME=gemini-2.0-flash + - LLM_MODEL_NAME=gemini-2.5-flash - LLM_MAX_TOKENS=8192 # Guardian Service URL - GUARDIAN_SERVICE_URL=http://guardian:8002 depends_on: - - otel-collector + - datadog-agent - guardian volumes: - ./services/security-agent/app:/app/app - - ./gcp-credentials.json:/app/gcp-credentials.json:ro + - dogstatsd-socket:/var/run/datadog + labels: + com.datadoghq.tags.service: "clestiq-shield-sentinel" + com.datadoghq.tags.env: "development" + com.datadoghq.tags.version: "1.0.0" networks: - clestiq-network healthcheck: @@ -109,11 +142,16 @@ services: expose: - "8002" environment: - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 - - OTEL_SERVICE_NAME=clestiq-shield-guardian - - GCP_PROJECT_ID=${GCP_PROJECT_ID} - - GCP_LOCATION=${GCP_LOCATION:-us-east1} - - GOOGLE_APPLICATION_CREDENTIALS=/app/gcp-credentials.json + - DD_ENV=development + - DD_SERVICE=clestiq-shield-guardian + - DD_VERSION=1.0.0 + - DD_AGENT_HOST=datadog-agent + - DD_TRACE_AGENT_PORT=8126 + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_LOGS_INJECTION=true + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true + - GEMINI_API_KEY=${GEMINI_API_KEY} - 
TELEMETRY_ENABLED=true # Moderation Settings - DEFAULT_MODERATION_MODE=moderate @@ -122,10 +160,14 @@ services: - OUTPUT_PII_DETECTION_ENABLED=true - AUTO_CONVERT_TOON_TO_JSON=true depends_on: - - otel-collector + - datadog-agent volumes: - ./services/guardian/app:/app/app - - ./gcp-credentials.json:/app/gcp-credentials.json:ro + - dogstatsd-socket:/var/run/datadog + labels: + com.datadoghq.tags.service: "clestiq-shield-guardian" + com.datadoghq.tags.env: "development" + com.datadoghq.tags.version: "1.0.0" networks: - clestiq-network healthcheck: @@ -154,25 +196,56 @@ services: timeout: 5s retries: 5 - # OpenTelemetry Collector - otel-collector: - image: otel/opentelemetry-collector-contrib:0.91.0 - command: ["--config=/etc/otel-collector-config.yaml"] + # Datadog Agent + datadog-agent: + image: gcr.io/datadoghq/agent:latest environment: - DD_API_KEY=${DD_API_KEY} - DD_SITE=${DD_SITE:-datadoghq.com} - - DEPLOYMENT_ENV=development - volumes: - - ./services/otel-collector/config.yaml:/etc/otel-collector-config.yaml:ro + - DD_ENV=development + - DD_APM_ENABLED=true + - DD_APM_NON_LOCAL_TRAFFIC=true + - DD_LOGS_ENABLED=true + - DD_LOGS_CONFIG_CONTAINER_COLLECT_ALL=true + - DD_LOGS_CONFIG_AUTO_MULTI_LINE_DETECTION=true + - DD_CONTAINER_EXCLUDE="name:datadog-agent" + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_DOGSTATSD_NON_LOCAL_TRAFFIC=false + # Universal Service Monitoring + - DD_SYSTEM_PROBE_ENABLED=true + - DD_SYSTEM_PROBE_SERVICE_MONITORING_ENABLED=true + # Profiling intake + - DD_PROFILING_AGENTLESS=false + # Service Monitoring + - DD_SERVICE_MONITORING_ENABLED=true + - DD_PROCESS_CONFIG_PROCESS_COLLECTION_ENABLED=true ports: - - "4317:4317" - - "4318:4318" - - "8888:8888" - - "13133:13133" + - "8126:8126" # APM traces + - "8125:8125/udp" # DogStatsD metrics (fallback) + volumes: + - /var/run/docker.sock:/var/run/docker.sock:ro + - /proc/:/host/proc/:ro + - 
/sys/fs/cgroup/:/host/sys/fs/cgroup:ro + - /var/lib/docker/containers:/var/lib/docker/containers:ro + - /opt/datadog-agent/run:/opt/datadog-agent/run:rw + - dogstatsd-socket:/var/run/datadog + cap_add: + - SYS_ADMIN + - SYS_RESOURCE + - SYS_PTRACE + - NET_ADMIN + - NET_BROADCAST + - NET_RAW + - IPC_LOCK + - CHOWN + security_opt: + - apparmor:unconfined networks: - clestiq-network healthcheck: - test: ["CMD", "wget", "--spider", "-q", "http://localhost:13133"] + test: ["CMD", "agent", "health"] interval: 30s timeout: 10s retries: 3 @@ -183,3 +256,4 @@ networks: volumes: postgres_data: + dogstatsd-socket: diff --git a/services/eagle-eye/app/core/config.py b/services/eagle-eye/app/core/config.py index fccbcd9..daa5c2b 100644 --- a/services/eagle-eye/app/core/config.py +++ b/services/eagle-eye/app/core/config.py @@ -16,10 +16,11 @@ class Settings(BaseSettings): ALGORITHM: str = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 - # OpenTelemetry + # Datadog APM TELEMETRY_ENABLED: bool = True - OTEL_SERVICE_NAME: str = "clestiq-shield-eagle-eye" - OTEL_EXPORTER_OTLP_ENDPOINT: str = "http://otel-collector:4317" + DD_SERVICE: str = "clestiq-shield-eagle-eye" + DD_ENV: str = "production" + DD_VERSION: str = "1.0.0" class Config: case_sensitive = True diff --git a/services/eagle-eye/app/core/telemetry.py b/services/eagle-eye/app/core/telemetry.py index 12b5a50..85c1d11 100644 --- a/services/eagle-eye/app/core/telemetry.py +++ b/services/eagle-eye/app/core/telemetry.py @@ -1,72 +1,49 @@ import logging import sys import structlog -from opentelemetry import trace - -# Import OTLP Log components (HTTP) -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor - -# HTTP Log Exporter -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry._logs import set_logger_provider -from opentelemetry.sdk.resources import Resource -from opentelemetry.semconv.resource import 
ResourceAttributes +from ddtrace import tracer from app.core.config import get_settings settings = get_settings() -def add_open_telemetry_spans(_, __, event_dict): - span = trace.get_current_span() - if not span.is_recording(): - event_dict["span"] = None - event_dict["trace"] = None - return event_dict - - ctx = span.get_span_context() - event_dict["span_id"] = format(ctx.span_id, "016x") - event_dict["trace_id"] = format(ctx.trace_id, "032x") +def add_datadog_trace_context(_, __, event_dict): + """Add Datadog trace context to logs for correlation.""" + span = tracer.current_span() + if span: + event_dict["dd.trace_id"] = str(span.trace_id) + event_dict["dd.span_id"] = str(span.span_id) + event_dict["dd.service"] = span.service + event_dict["dd.env"] = span.get_tag("env") + event_dict["dd.version"] = span.get_tag("version") return event_dict def setup_logging(): + """Configure structured logging with Datadog trace context.""" if not settings.TELEMETRY_ENABLED: return - # Create Resource - import socket + # Enable Datadog instrumentation + from ddtrace import patch_all + from ddtrace.runtime import RuntimeMetrics + from ddtrace.profiling import Profiler - resource = Resource.create( - { - ResourceAttributes.SERVICE_NAME: settings.OTEL_SERVICE_NAME, - ResourceAttributes.SERVICE_VERSION: settings.VERSION, - ResourceAttributes.HOST_NAME: socket.gethostname(), - } - ) + patch_all() - # --- OTLP Logging Setup --- - # Create Logger Provider - logger_provider = LoggerProvider(resource=resource) - set_logger_provider(logger_provider) + # Enable Continuous Profiler + profiler = Profiler() + profiler.start() - # Create OTLP Log Exporter (HTTP) - # The endpoint should be full URL for HTTP exporter e.g. http://otel-collector:4318/v1/logs - # But often the exporter appends /v1/logs if missing. - # Let's trust the default behavior of the HTTP exporter with the base endpoint. 
- otlp_log_exporter = OTLPLogExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs" - ) - - # Add Batch Processor - logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter)) + # Enable runtime metrics + RuntimeMetrics.enable() - # Output logs to stdout as JSON using structlog + # Configure Structlog with Datadog trace context structlog.configure( processors=[ structlog.contextvars.merge_contextvars, - add_open_telemetry_spans, + add_datadog_trace_context, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer(), @@ -76,16 +53,14 @@ def setup_logging(): ) # Configure Standard Library Logging - otlp_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) - stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(logging.Formatter("%(message)s")) logging.basicConfig( level=logging.INFO, - handlers=[otlp_handler, stdout_handler], + handlers=[stdout_handler], ) - # Force uvicorn logs to use OTLP handler - logging.getLogger("uvicorn.access").handlers = [otlp_handler, stdout_handler] - logging.getLogger("uvicorn.error").handlers = [otlp_handler, stdout_handler] + # Force uvicorn logs to JSON format + logging.getLogger("uvicorn.access").handlers = [stdout_handler] + logging.getLogger("uvicorn.error").handlers = [stdout_handler] diff --git a/services/eagle-eye/app/main.py b/services/eagle-eye/app/main.py index ba84fc9..48cee92 100644 --- a/services/eagle-eye/app/main.py +++ b/services/eagle-eye/app/main.py @@ -2,13 +2,6 @@ from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager import structlog -from opentelemetry import trace -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor - -# Use HTTP Exporter for Traces (port 4318) -from opentelemetry.exporter.otlp.proto.http.trace_exporter 
import OTLPSpanExporter from app.core.config import get_settings from app.core.telemetry import setup_logging @@ -26,24 +19,6 @@ from app.api.v1.endpoints import auth, users, apps, api_keys -# Setup Telemetry -def setup_telemetry(app: FastAPI): - if settings.TELEMETRY_ENABLED: - resource = Resource(attributes={"service.name": settings.OTEL_SERVICE_NAME}) - trace.set_tracer_provider(TracerProvider(resource=resource)) - # HTTP Exporter endpoint usually expects /v1/traces appended or handled by class - # OTLPSpanExporter (HTTP) defaults to v1/traces if not present? - # Let's be explicit: endpoint/v1/traces - endpoint = f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces" - otlp_exporter = OTLPSpanExporter(endpoint=endpoint) - span_processor = BatchSpanProcessor(otlp_exporter) - trace.get_tracer_provider().add_span_processor(span_processor) - # Instrument FastAPI - from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor - - FastAPIInstrumentor.instrument_app(app) - - @asynccontextmanager async def lifespan(app: FastAPI): # Startup @@ -76,8 +51,6 @@ async def lifespan(app: FastAPI): allow_headers=["*"], ) -setup_telemetry(app) - app.include_router(auth.router, prefix="/auth", tags=["auth"]) app.include_router( users.router, @@ -100,7 +73,7 @@ async def lifespan(app: FastAPI): async def health_check(): return { "status": "ok", - "service": settings.OTEL_SERVICE_NAME, + "service": settings.DD_SERVICE, "version": settings.VERSION, } diff --git a/services/eagle-eye/pyproject.toml b/services/eagle-eye/pyproject.toml index 8980ee1..1b8057f 100644 --- a/services/eagle-eye/pyproject.toml +++ b/services/eagle-eye/pyproject.toml @@ -13,11 +13,10 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} sqlalchemy = "^2.0.25" asyncpg = "^0.29.0" python-jose = {extras = ["cryptography"], version = "^3.3.0"} -opentelemetry-api = "^1.22.0" -opentelemetry-sdk = "^1.22.0" -opentelemetry-instrumentation-fastapi = "^0.43b0" -opentelemetry-exporter-otlp = "^1.22.0" -httpx = 
"^0.26.0" +psycopg2-binary = "^2.9.9" +greenlet = "^3.0.3" +ddtrace = "^2.0.0" +httpx = "0.27.0" pydantic-settings = "^2.1.0" structlog = "^24.1.0" passlib = {extras = ["bcrypt"], version = "^1.7.4"} diff --git a/services/gateway/app/core/config.py b/services/gateway/app/core/config.py index a551b5a..ff00ee9 100644 --- a/services/gateway/app/core/config.py +++ b/services/gateway/app/core/config.py @@ -10,10 +10,11 @@ class Settings(BaseSettings): # Database DATABASE_URL: str - # OpenTelemetry + # Datadog APM TELEMETRY_ENABLED: bool = True - OTEL_SERVICE_NAME: str = "clestiq-shield-gateway" - OTEL_EXPORTER_OTLP_ENDPOINT: str = "http://otel-collector:4317" + DD_SERVICE: str = "clestiq-shield-gateway" + DD_ENV: str = "production" + DD_VERSION: str = "1.0.0" # Sentinel Service (Input Security) SENTINEL_SERVICE_URL: str = "http://sentinel:8001" diff --git a/services/gateway/app/core/telemetry.py b/services/gateway/app/core/telemetry.py index 663fb6d..6675e1b 100644 --- a/services/gateway/app/core/telemetry.py +++ b/services/gateway/app/core/telemetry.py @@ -1,95 +1,61 @@ import logging import sys import structlog -from opentelemetry import trace, metrics -from opentelemetry.sdk.resources import Resource -from opentelemetry.semconv.resource import ResourceAttributes -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor - -# HTTP Exporters -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.instrumentation.logging import LoggingInstrumentor - -# Import OTLP Log components (HTTP) -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from 
opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry._logs import set_logger_provider +from ddtrace import tracer, patch_all +from ddtrace.runtime import RuntimeMetrics from app.core.config import get_settings settings = get_settings() -def add_open_telemetry_spans(_, __, event_dict): - span = trace.get_current_span() - if not span.is_recording(): - event_dict["span"] = None - event_dict["trace"] = None - return event_dict - - ctx = span.get_span_context() - event_dict["span_id"] = format(ctx.span_id, "016x") - event_dict["trace_id"] = format(ctx.trace_id, "032x") +def add_datadog_trace_context(_, __, event_dict): + """Add Datadog trace context to logs for correlation.""" + span = tracer.current_span() + if span: + event_dict["dd.trace_id"] = str(span.trace_id) + event_dict["dd.span_id"] = str(span.span_id) + event_dict["dd.service"] = span.service + event_dict["dd.env"] = span.get_tag("env") + event_dict["dd.version"] = span.get_tag("version") return event_dict def setup_telemetry(app): + """Configure Datadog APM and structured logging.""" # Skip telemetry setup if disabled (e.g., in test environments) if not settings.TELEMETRY_ENABLED: + # Still configure basic structlog for tests + structlog.configure( + processors=[ + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.JSONRenderer(), + ], + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, + ) return - import socket + # Patch all supported libraries for automatic instrumentation + # This includes FastAPI, httpx, psycopg2, sqlalchemy, etc. 
+ patch_all() - resource = Resource.create( - { - ResourceAttributes.SERVICE_NAME: settings.OTEL_SERVICE_NAME, - ResourceAttributes.SERVICE_VERSION: settings.VERSION, - ResourceAttributes.HOST_NAME: socket.gethostname(), - } - ) + # Enable Continuous Profiler for code performance analysis + from ddtrace.profiling import Profiler - # Tracing (HTTP) - trace_provider = TracerProvider(resource=resource) - # The HTTP exporter usually needs the full path: /v1/traces - otlp_trace_exporter = OTLPSpanExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces" - ) - trace_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter)) - trace.set_tracer_provider(trace_provider) - - # Metrics (HTTP) - # /v1/metrics - otlp_metric_exporter = OTLPMetricExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/metrics" - ) - metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter) - meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) - metrics.set_meter_provider(meter_provider) + profiler = Profiler() + profiler.start() - # --- OTLP Logging Setup (HTTP) --- - # Create Logger Provider - logger_provider = LoggerProvider(resource=resource) - set_logger_provider(logger_provider) + # Enable runtime metrics collection (CPU, memory, etc.) 
+ RuntimeMetrics.enable() - # /v1/logs - otlp_log_exporter = OTLPLogExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs" - ) - - # Add Batch Processor - logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter)) - - # Configure Structlog + # Configure Structlog with Datadog trace context structlog.configure( processors=[ structlog.contextvars.merge_contextvars, - add_open_telemetry_spans, + add_datadog_trace_context, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer(), @@ -99,27 +65,22 @@ def setup_telemetry(app): ) # Configure Standard Library Logging - otlp_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) - stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(logging.Formatter("%(message)s")) logging.basicConfig( level=logging.INFO, - handlers=[otlp_handler, stdout_handler], + handlers=[stdout_handler], ) - # Force uvicorn logs to use OTLP handler - logging.getLogger("uvicorn.access").handlers = [otlp_handler, stdout_handler] - logging.getLogger("uvicorn.error").handlers = [otlp_handler, stdout_handler] - - # Instrument FastAPI - FastAPIInstrumentor.instrument_app( - app, tracer_provider=trace_provider, meter_provider=meter_provider - ) + # Force uvicorn logs to JSON format + logging.getLogger("uvicorn.access").handlers = [stdout_handler] + logging.getLogger("uvicorn.error").handlers = [stdout_handler] # Log initialization log = structlog.get_logger() log.info( - "Telemetry and Structlog initialized", service_name=settings.OTEL_SERVICE_NAME + "Datadog APM and Structlog initialized", + service=settings.DD_SERVICE, + env=settings.DD_ENV, ) diff --git a/services/gateway/app/main.py b/services/gateway/app/main.py index 6c8cfa9..272ef42 100644 --- a/services/gateway/app/main.py +++ b/services/gateway/app/main.py @@ -74,7 +74,7 @@ async def health_check(): logger.info("Health check requested") return { 
"status": "ok", - "service": settings.OTEL_SERVICE_NAME, + "service": settings.DD_SERVICE, "version": settings.VERSION, } diff --git a/services/gateway/pyproject.toml b/services/gateway/pyproject.toml index 4da1942..3c49773 100644 --- a/services/gateway/pyproject.toml +++ b/services/gateway/pyproject.toml @@ -16,13 +16,7 @@ psycopg2-binary = "^2.9.9" sqlalchemy = "^2.0.25" greenlet = "^3.0.3" structlog = "^24.1.0" -opentelemetry-api = "^1.22.0" -opentelemetry-sdk = "^1.22.0" -opentelemetry-instrumentation-fastapi = "^0.43b0" -opentelemetry-instrumentation-httpx = "^0.43b0" -opentelemetry-exporter-otlp = "^1.22.0" -opentelemetry-instrumentation-logging = "^0.43b0" -python-json-logger = "^2.0.7" +ddtrace = "^2.0.0" httpx = "0.27.0" [tool.poetry.group.dev.dependencies] diff --git a/services/guardian/app/agents/nodes/content_filter.py b/services/guardian/app/agents/nodes/content_filter.py index 2a6b565..79a429c 100644 --- a/services/guardian/app/agents/nodes/content_filter.py +++ b/services/guardian/app/agents/nodes/content_filter.py @@ -7,8 +7,9 @@ import re import time -from typing import Dict, Any, List, Tuple -from langchain_google_vertexai import ChatVertexAI +from typing import Dict, Any + +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_content_llm from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser import structlog @@ -51,7 +52,14 @@ def get_content_llm(): global _content_llm if _content_llm is None: - _content_llm = ChatVertexAI(model_name="gemini-2.0-flash-exp", temperature=0) + from langchain_google_genai import ChatGoogleGenerativeAI + + settings = get_settings() + _content_llm = ChatGoogleGenerativeAI( + model="gemini-2.0-flash-exp", + google_api_key=settings.GEMINI_API_KEY, + temperature=0, + ) return _content_llm diff --git a/services/guardian/app/agents/nodes/hallucination_detector.py b/services/guardian/app/agents/nodes/hallucination_detector.py index 
7f04403..b22bd11 100644 --- a/services/guardian/app/agents/nodes/hallucination_detector.py +++ b/services/guardian/app/agents/nodes/hallucination_detector.py @@ -7,7 +7,8 @@ import time from typing import Dict, Any -from langchain_google_vertexai import ChatVertexAI + +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_judge_llm from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser import structlog @@ -23,7 +24,16 @@ def get_judge_llm(): global _judge_llm if _judge_llm is None: - _judge_llm = ChatVertexAI(model_name="gemini-2.0-flash-exp", temperature=0) + from langchain_google_genai import ChatGoogleGenerativeAI + + from app.core.config import get_settings + + settings = get_settings() + _judge_llm = ChatGoogleGenerativeAI( + model="gemini-2.0-flash-exp", + google_api_key=settings.GEMINI_API_KEY, + temperature=0, + ) return _judge_llm diff --git a/services/guardian/app/agents/nodes/tone_checker.py b/services/guardian/app/agents/nodes/tone_checker.py index b89a024..2295ec7 100644 --- a/services/guardian/app/agents/nodes/tone_checker.py +++ b/services/guardian/app/agents/nodes/tone_checker.py @@ -7,7 +7,8 @@ import time from typing import Dict, Any -from langchain_google_vertexai import ChatVertexAI + +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_tone_llm from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser import structlog @@ -22,7 +23,16 @@ def get_tone_llm(): global _tone_llm if _tone_llm is None: - _tone_llm = ChatVertexAI(model_name="gemini-2.0-flash-exp", temperature=0) + from langchain_google_genai import ChatGoogleGenerativeAI + + from app.core.config import get_settings + + settings = get_settings() + _tone_llm = ChatGoogleGenerativeAI( + model="gemini-2.0-flash-exp", + google_api_key=settings.GEMINI_API_KEY, + temperature=0, + ) return _tone_llm diff --git 
a/services/guardian/app/core/config.py b/services/guardian/app/core/config.py index 90125ce..ee14db7 100644 --- a/services/guardian/app/core/config.py +++ b/services/guardian/app/core/config.py @@ -6,15 +6,14 @@ class Settings(BaseSettings): PROJECT_NAME: str = "Clestiq Shield - Guardian (Output Validation)" VERSION: str = "1.0.0" - # OpenTelemetry + # Datadog APM TELEMETRY_ENABLED: bool = True - OTEL_SERVICE_NAME: str = "clestiq-shield-guardian" - OTEL_EXPORTER_OTLP_ENDPOINT: str = "http://otel-collector:4317" + DD_SERVICE: str = "clestiq-shield-guardian" + DD_ENV: str = "production" + DD_VERSION: str = "1.0.0" - # Google Cloud / Vertex AI - GCP_PROJECT_ID: str - GCP_LOCATION: str = "us-east1" - GOOGLE_APPLICATION_CREDENTIALS: str | None = None + # Gemini AI Studio + GEMINI_API_KEY: str # Moderation Settings DEFAULT_MODERATION_MODE: str = "moderate" # strict, moderate, relaxed, raw diff --git a/services/guardian/app/core/metrics.py b/services/guardian/app/core/metrics.py index 53abc32..3198ed6 100644 --- a/services/guardian/app/core/metrics.py +++ b/services/guardian/app/core/metrics.py @@ -1,27 +1,29 @@ """ Guardian Metrics Module. -Tracks output validation, content filtering, and guardrails metrics for Datadog. +Tracks output validation, content filtering, and guardrail metrics for Datadog. +NOTE: OpenTelemetry removed - using Datadog APM only. This module provides +no-op implementations to maintain API compatibility. 
""" -from opentelemetry import metrics from typing import Optional import structlog logger = structlog.get_logger() -_meter: Optional[metrics.Meter] = None +class NoOpMetric: + """No-op metric that does nothing but maintains API compatibility.""" -def get_meter() -> metrics.Meter: - global _meter - if _meter is None: - _meter = metrics.get_meter("clestiq.guardian.agent", version="1.0.0") - return _meter + def add(self, value, attributes=None): + pass + + def record(self, value, attributes=None): + pass class GuardianMetrics: - """Singleton for Guardian-specific metrics.""" + """Singleton for Guardian-specific metrics (no-op - using Datadog APM instead).""" _instance = None _initialized = False @@ -35,103 +37,34 @@ def __init__(self): if GuardianMetrics._initialized: return - meter = get_meter() - - # Request metrics - self.requests_total = meter.create_counter( - name="guardian.requests_total", - description="Total validation requests", - unit="1", - ) - - self.requests_by_mode = meter.create_counter( - name="guardian.requests_by_mode", - description="Requests by moderation mode", - unit="1", - ) - - # Content filtering metrics - self.content_filtered = meter.create_counter( - name="guardian.content_filtered", - description="Content filtered by category", - unit="1", - ) - - self.content_by_action = meter.create_counter( - name="guardian.content_by_action", - description="Content actions (block, warn, allow)", - unit="1", - ) - - # PII leak detection - self.pii_leaks_detected = meter.create_counter( - name="guardian.pii_leaks_detected", - description="PII leaks detected in LLM output", - unit="1", - ) - - # TOON conversion - self.toon_conversions = meter.create_counter( - name="guardian.toon_conversions", - description="TOON to JSON conversions", - unit="1", - ) - - # Latency histograms - self.validation_latency = meter.create_histogram( - name="guardian.validation_latency_ms", - description="Total validation latency", - unit="ms", - ) - - 
self.content_filter_latency = meter.create_histogram( - name="guardian.content_filter_latency_ms", - description="Content filtering latency", - unit="ms", - ) - - self.pii_scan_latency = meter.create_histogram( - name="guardian.pii_scan_latency_ms", - description="PII scanning latency", - unit="ms", - ) + # All metrics are no-ops now - Datadog APM provides automatic metrics + self.requests_total = NoOpMetric() + self.requests_by_mode = NoOpMetric() + self.content_filtered = NoOpMetric() + self.content_by_action = NoOpMetric() + self.pii_leaks_detected = NoOpMetric() + self.toon_conversions = NoOpMetric() + self.validation_latency = NoOpMetric() + self.content_filter_latency = NoOpMetric() + self.pii_scan_latency = NoOpMetric() GuardianMetrics._initialized = True - logger.info("Guardian metrics initialized") + logger.info("Guardian metrics initialized (no-op - using Datadog APM)") def record_request(self, moderation_mode: str): - self.requests_total.add(1) - self.requests_by_mode.add(1, {"mode": moderation_mode}) + pass # No-op def record_content_filtered(self, category: str, action: str): - self.content_filtered.add(1, {"category": category}) - self.content_by_action.add(1, {"action": action}) + pass # No-op def record_pii_leak(self, pii_type: str): - self.pii_leaks_detected.add(1, {"pii_type": pii_type}) + pass # No-op def record_toon_conversion(self, success: bool): - self.toon_conversions.add(1, {"success": str(success)}) + pass # No-op def record_latency(self, stage: str, latency_ms: float): - if stage == "validation": - self.validation_latency.record(latency_ms) - elif stage == "content_filter": - self.content_filter_latency.record(latency_ms) - elif stage == "pii_scan": - self.pii_scan_latency.record(latency_ms) - # NEW: Support for advanced validation stages - elif stage in [ - "hallucination_check", - "citation_verification", - "tone_check", - "disclaimer_injection", - "refusal_detection", - ]: - # Log these under validation_latency for now - 
self.validation_latency.record(latency_ms) - else: - logger.debug(f"Unknown stage for latency: {stage}") + pass # No-op _guardian_metrics: Optional[GuardianMetrics] = None diff --git a/services/guardian/app/core/telemetry.py b/services/guardian/app/core/telemetry.py index 857b37d..0f3c788 100644 --- a/services/guardian/app/core/telemetry.py +++ b/services/guardian/app/core/telemetry.py @@ -1,48 +1,34 @@ import logging import sys import structlog -from opentelemetry import trace, metrics -from opentelemetry.sdk.resources import Resource -from opentelemetry.semconv.resource import ResourceAttributes -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor - -# HTTP Exporters -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.instrumentation.logging import LoggingInstrumentor - -# Import OTLP Log components (HTTP) -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry._logs import set_logger_provider +from ddtrace import tracer, patch_all +from ddtrace.runtime import RuntimeMetrics from app.core.config import get_settings settings = get_settings() -def add_open_telemetry_spans(_, __, event_dict): - span = trace.get_current_span() - if not span.is_recording(): - event_dict["span"] = None - event_dict["trace"] = None - return event_dict - - ctx = span.get_span_context() - event_dict["span_id"] = format(ctx.span_id, "016x") - event_dict["trace_id"] = format(ctx.trace_id, "032x") +def 
add_datadog_trace_context(_, __, event_dict): + """Add Datadog trace context to logs for correlation.""" + span = tracer.current_span() + if span: + event_dict["dd.trace_id"] = str(span.trace_id) + event_dict["dd.span_id"] = str(span.span_id) + event_dict["dd.service"] = span.service + event_dict["dd.env"] = span.get_tag("env") + event_dict["dd.version"] = span.get_tag("version") return event_dict def setup_telemetry(app): + """Configure Datadog APM and structured logging.""" + # Skip telemetry setup if disabled (e.g., in test environments) if not settings.TELEMETRY_ENABLED: log = structlog.get_logger() - log.info("Telemetry disabled") + log.info("Telemetry disabled, skipping Datadog APM initialization") + + # Still configure basic structlog for tests structlog.configure( processors=[ structlog.contextvars.merge_contextvars, @@ -55,52 +41,24 @@ def setup_telemetry(app): ) return - import socket + # Patch all supported libraries for automatic instrumentation + # This includes FastAPI, httpx, langchain, etc. 
+ patch_all() - resource = Resource.create( - { - ResourceAttributes.SERVICE_NAME: settings.OTEL_SERVICE_NAME, - ResourceAttributes.SERVICE_VERSION: settings.VERSION, - ResourceAttributes.HOST_NAME: socket.gethostname(), - } - ) + # Enable Continuous Profiler for code performance analysis + from ddtrace.profiling import Profiler - # Tracing (HTTP) - trace_provider = TracerProvider(resource=resource) - # The HTTP exporter usually needs the full path: /v1/traces - otlp_trace_exporter = OTLPSpanExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces" - ) - trace_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter)) - trace.set_tracer_provider(trace_provider) + profiler = Profiler() + profiler.start() - # Metrics (HTTP) - # /v1/metrics - otlp_metric_exporter = OTLPMetricExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/metrics" - ) - metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter) - meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) - metrics.set_meter_provider(meter_provider) - - # --- OTLP Logging Setup (HTTP) --- - # Create Logger Provider - logger_provider = LoggerProvider(resource=resource) - set_logger_provider(logger_provider) - - # /v1/logs - otlp_log_exporter = OTLPLogExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs" - ) + # Enable runtime metrics collection (CPU, memory, etc.) 
+ RuntimeMetrics.enable() - # Add Batch Processor - logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter)) - - # Configure Structlog + # Configure Structlog with Datadog trace context structlog.configure( processors=[ structlog.contextvars.merge_contextvars, - add_open_telemetry_spans, + add_datadog_trace_context, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer(), @@ -110,25 +68,22 @@ def setup_telemetry(app): ) # Configure Standard Library Logging - otlp_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) - stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(logging.Formatter("%(message)s")) logging.basicConfig( level=logging.INFO, - handlers=[otlp_handler, stdout_handler], + handlers=[stdout_handler], ) - # Force uvicorn logs to use OTLP handler - logging.getLogger("uvicorn.access").handlers = [otlp_handler, stdout_handler] - logging.getLogger("uvicorn.error").handlers = [otlp_handler, stdout_handler] - - # Instrument FastAPI - FastAPIInstrumentor.instrument_app( - app, tracer_provider=trace_provider, meter_provider=meter_provider - ) + # Force uvicorn logs to JSON format + logging.getLogger("uvicorn.access").handlers = [stdout_handler] + logging.getLogger("uvicorn.error").handlers = [stdout_handler] # Log initialization log = structlog.get_logger() - log.info("Guardian telemetry initialized", service_name=settings.OTEL_SERVICE_NAME) + log.info( + "Datadog APM and Structlog initialized", + service=settings.DD_SERVICE, + env=settings.DD_ENV, + ) diff --git a/services/guardian/app/main.py b/services/guardian/app/main.py index a1564cc..ccface5 100644 --- a/services/guardian/app/main.py +++ b/services/guardian/app/main.py @@ -18,16 +18,31 @@ async def lifespan(app: FastAPI): app = FastAPI(title=settings.PROJECT_NAME, version=settings.VERSION, lifespan=lifespan) +print("DEBUG: Setting up telemetry...", flush=True) # 
Setup telemetry IMMEDIATELY setup_telemetry(app) +print("DEBUG: Telemetry setup complete", flush=True) # Initialize global logger after telemetry +print("DEBUG: Initializing structlog...", flush=True) logger = structlog.get_logger() +print("DEBUG: Structlog initialized", flush=True) # Import modules AFTER logging is configured to ensure they use the correct logger factory -from app.agents.graph import guardian_graph +print("DEBUG: Importing app.agents.graph...", flush=True) +try: + from app.agents.graph import guardian_graph + print("DEBUG: Imported app.agents.graph successfully", flush=True) +except Exception as e: + print(f"DEBUG: Failed to import app.agents.graph: {e}", flush=True) + import traceback + traceback.print_exc() + raise + +print("DEBUG: Importing schemas and metrics...", flush=True) from app.schemas.validation import ValidateRequest, ValidateResponse from app.core.metrics import get_guardian_metrics +print("DEBUG: Imports completed successfully", flush=True) @app.get("/health") @@ -35,7 +50,7 @@ async def health_check(): """Health check endpoint.""" return { "status": "ok", - "service": settings.OTEL_SERVICE_NAME, + "service": settings.DD_SERVICE, "version": settings.VERSION, } diff --git a/services/guardian/pyproject.toml b/services/guardian/pyproject.toml index 88d947a..e74810d 100644 --- a/services/guardian/pyproject.toml +++ b/services/guardian/pyproject.toml @@ -12,21 +12,16 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} pydantic-settings = "^2.1.0" pydantic = "^2.0.0" structlog = "^24.1.0" -opentelemetry-api = "^1.22.0" -opentelemetry-sdk = "^1.22.0" -opentelemetry-instrumentation-fastapi = "^0.43b0" -opentelemetry-exporter-otlp = "^1.22.0" -opentelemetry-instrumentation-logging = "^0.43b0" -python-json-logger = "^2.0.7" -langchain = "1.0.0" -langgraph = "1.0.0" -langchain-google-vertexai = "3.1.0" +langchain-core = "^1.2.2" +langchain-google-genai = "^4.1.2" +langgraph = "^1.0.0" bleach = "^6.1.0" +httpx = "^0.28.0" +ddtrace = 
"^2.0.0" [tool.poetry.group.dev.dependencies] pytest = "^8.0.0" pytest-asyncio = "^0.23.5" -httpx = ">=0.28.0,<1.0.0" pytest-cov = "^4.1.0" [build-system] diff --git a/services/otel-collector/config.yaml b/services/otel-collector/config.yaml deleted file mode 100644 index c398107..0000000 --- a/services/otel-collector/config.yaml +++ /dev/null @@ -1,83 +0,0 @@ -receivers: - otlp: - protocols: - grpc: - endpoint: 0.0.0.0:4317 - http: - endpoint: 0.0.0.0:4318 - -processors: - batch: - timeout: 10s - send_batch_size: 1024 - - # Resource processor to ensure deployment.environment is set - resource: - attributes: - - key: deployment.environment - value: ${env:DEPLOYMENT_ENV} - action: insert - - # Datadog Tagging Improvements - - # 1. Source Tag: Datadog looks for 'ddsource' - - key: ddsource - value: clestiq-shield-development - action: upsert - -exporters: - logging: - loglevel: debug - - datadog: - api: - site: ${DD_SITE} - key: ${DD_API_KEY} - - # Host metadata configuration - # 'first_resource' uses the host.name we added to Python resources. 
- host_metadata: - enabled: true - hostname_source: first_resource - - # Metrics configuration - metrics: - summaries: - mode: gauges - - # Traces configuration - traces: - span_name_as_resource_name: true - compute_stats_by_span_kind: true - - # Logs configuration - logs: - dump_payloads: false - -service: - pipelines: - traces: - receivers: [otlp] - processors: [batch, resource] - exporters: [logging, datadog] - - metrics: - receivers: [otlp] - processors: [batch, resource] - exporters: [logging, datadog] - - logs: - receivers: [otlp] - processors: [batch, resource] - exporters: [logging, datadog] - - # Extensions for health checks - extensions: [] - - # Telemetry for the collector itself - telemetry: - logs: - level: info - metrics: - level: detailed - address: 0.0.0.0:8888 diff --git a/services/security-agent/app/agents/nodes/llm_responder.py b/services/security-agent/app/agents/nodes/llm_responder.py index 015320b..8b76877 100644 --- a/services/security-agent/app/agents/nodes/llm_responder.py +++ b/services/security-agent/app/agents/nodes/llm_responder.py @@ -1,12 +1,13 @@ """ LLM Responder Node. -Routes queries to Gemini models via Vertex AI. +Routes queries to Gemini models via Gemini AI Studio. 
""" import time from typing import Dict, Any, Optional -from langchain_google_vertexai import ChatVertexAI + +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_llm from langchain_core.messages import HumanMessage, SystemMessage import httpx import structlog @@ -18,33 +19,44 @@ # Gemini Models Only (for now) SUPPORTED_MODELS = { - "gemini-2.0-flash": "gemini-2.0-flash", - "gemini-2.0": "gemini-2.0-flash", - "gemini": "gemini-2.0-flash", - "default": "gemini-2.0-flash", + "gemini-2.5-flash": "gemini-2.5-flash", + "default": "gemini-2.5-flash", } -_llm_cache: Dict[str, ChatVertexAI] = {} +_llm_cache: Dict[str, Any] = {} def get_model_name(requested: str) -> str: - """Get the Vertex AI model name.""" + """Get the Gemini AI model name.""" + settings = get_settings() + default_model = settings.LLM_MODEL_NAME + if not requested: - return SUPPORTED_MODELS["default"] - return SUPPORTED_MODELS.get(requested.lower().strip(), SUPPORTED_MODELS["default"]) + return default_model + + # Check if exact match in supported models + if requested in SUPPORTED_MODELS: + return SUPPORTED_MODELS[requested] + # Check if exact match in supported models values + if requested in SUPPORTED_MODELS.values(): + return requested -def get_llm(model_name: str) -> ChatVertexAI: + return SUPPORTED_MODELS.get(requested.lower().strip(), default_model) + + +def get_llm(model_name: str) -> Any: """Get or create LLM instance.""" global _llm_cache if model_name not in _llm_cache: + from langchain_google_genai import ChatGoogleGenerativeAI + settings = get_settings() - _llm_cache[model_name] = ChatVertexAI( - model_name=model_name, + _llm_cache[model_name] = ChatGoogleGenerativeAI( + model=model_name, + google_api_key=settings.GEMINI_API_KEY, max_tokens=settings.LLM_MAX_TOKENS, - project=settings.GCP_PROJECT_ID, - location=settings.GCP_LOCATION, ) logger.info("Created LLM", model=model_name) @@ -112,8 +124,6 @@ async def llm_responder_node(state: Dict[str, Any]) -> Dict[str, Any]: 
logger.info("LLM request", model=model_name) - start_time = time.perf_counter() - try: llm = get_llm(model_name) @@ -122,8 +132,9 @@ async def llm_responder_node(state: Dict[str, Any]) -> Dict[str, Any]: HumanMessage(content=query), ] + llm_start = time.perf_counter() response = await llm.ainvoke(messages) - llm_latency = (time.perf_counter() - start_time) * 1000 + llm_latency = (time.perf_counter() - llm_start) * 1000 response_text = ( response.content if hasattr(response, "content") else str(response) diff --git a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index bf0eca7..37aef6b 100644 --- a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -2,7 +2,8 @@ import time import json from datetime import datetime -from langchain_google_vertexai import ChatVertexAI + +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_llm to avoid import-time issues from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser from app.agents.state import AgentState @@ -83,14 +84,22 @@ def log_security_event( Output ONLY JSON. 
""" -# Initialize LLM lazily to avoid import-time auth errors +# Initialize LLM lazily to avoid import-time issues _llm = None def get_llm(): + """Get or create the LLM instance (singleton pattern).""" global _llm if _llm is None: - _llm = ChatVertexAI(model_name="gemini-2.0-flash-exp") + from langchain_google_genai import ChatGoogleGenerativeAI + + settings = get_settings() + _llm = ChatGoogleGenerativeAI( + model=settings.LLM_MODEL_NAME, + google_api_key=settings.GEMINI_API_KEY, + ) + logger.info("LLM initialized", model=settings.LLM_MODEL_NAME) return _llm @@ -194,6 +203,7 @@ async def security_check(state: AgentState) -> Dict[str, Any]: ) # Step 3: Threat Detection + logger.info("Starting Threat Detection...") stage_start = time.perf_counter() threats = [] @@ -221,6 +231,7 @@ async def security_check(state: AgentState) -> Dict[str, Any]: threats.append(path_threat) stage_latency = (time.perf_counter() - stage_start) * 1000 + logger.info(f"Threat Detection completed in {stage_latency:.2f}ms") metrics.record_stage_latency("threat_detection", stage_latency) metrics_builder.add_latency("threat_detection", stage_latency) @@ -262,18 +273,43 @@ async def security_check(state: AgentState) -> Dict[str, Any]: "metrics_data": metrics_builder.build(), } - # Step 4: LLM-based Security Analysis - stage_start = time.perf_counter() - prompt = ChatPromptTemplate.from_template(SECURITY_PROMPT) - chain = prompt | get_llm() | JsonOutputParser() - - llm_result = await chain.ainvoke( - {"input": user_input, "threshold": settings.SECURITY_LLM_CHECK_THRESHOLD} - ) - - stage_latency = (time.perf_counter() - stage_start) * 1000 - metrics.record_stage_latency("llm_check", stage_latency) - metrics_builder.add_latency("llm_check", stage_latency) + # Step 4: LLM Security Analysis (if enabled) + llm_result = {} + if settings.SECURITY_LLM_CHECK_ENABLED: + # Reconstruct the chain manually as before + try: + from langchain_core.prompts import ChatPromptTemplate + from 
langchain_core.output_parsers import JsonOutputParser + + prompt = ChatPromptTemplate.from_template(SECURITY_PROMPT) + llm = get_llm() + parser = JsonOutputParser() + + chain = prompt | llm | parser + except Exception as e: + logger.error(f"Failed to initialize LLM chain: {e}") + raise + + # Using current time for detailed timing + llm_start = time.perf_counter() + + try: + # Add a timeout to the LLM call if possible or just log around it + llm_result = await chain.ainvoke( + { + "input": user_input, + "threshold": settings.SECURITY_LLM_CHECK_THRESHOLD, + } + ) + except Exception as llm_exc: + logger.error(f"LLM chain raised exception: {llm_exc}") + # Depending on policy, you might want to block here or set a default score + # For now, re-raise to be caught by the outer try/except + raise + + check_time_ms = (time.perf_counter() - llm_start) * 1000 + metrics.record_stage_latency("llm_check", check_time_ms) + metrics_builder.add_latency("llm_check", check_time_ms) score = llm_result.get("security_score", 0.0) is_blocked = llm_result.get("is_blocked", False) diff --git a/services/security-agent/app/core/config.py b/services/security-agent/app/core/config.py index c5eb2d8..9c92846 100644 --- a/services/security-agent/app/core/config.py +++ b/services/security-agent/app/core/config.py @@ -6,15 +6,14 @@ class Settings(BaseSettings): PROJECT_NAME: str = "Clestiq Shield - Sentinel (Input Security)" VERSION: str = "1.0.0" - # OpenTelemetry + # Datadog APM TELEMETRY_ENABLED: bool = True - OTEL_SERVICE_NAME: str = "clestiq-shield-sentinel" - OTEL_EXPORTER_OTLP_ENDPOINT: str = "http://otel-collector:4317" + DD_SERVICE: str = "clestiq-shield-sentinel" + DD_ENV: str = "production" + DD_VERSION: str = "1.0.0" - # Google Cloud / Vertex AI - GCP_PROJECT_ID: str - GCP_LOCATION: str = "us-east1" - GOOGLE_APPLICATION_CREDENTIALS: str | None = None + # Gemini AI Studio + GEMINI_API_KEY: str # Security Settings SECURITY_SANITIZATION_ENABLED: bool = True @@ -22,6 +21,7 @@ class 
Settings(BaseSettings): SECURITY_XSS_PROTECTION_ENABLED: bool = True SECURITY_SQL_INJECTION_DETECTION_ENABLED: bool = True SECURITY_COMMAND_INJECTION_DETECTION_ENABLED: bool = True + SECURITY_LLM_CHECK_ENABLED: bool = True SECURITY_LLM_CHECK_THRESHOLD: float = 0.85 # TOON Conversion Settings diff --git a/services/security-agent/app/core/metrics.py b/services/security-agent/app/core/metrics.py index f5cdcb5..424cce4 100644 --- a/services/security-agent/app/core/metrics.py +++ b/services/security-agent/app/core/metrics.py @@ -1,16 +1,10 @@ """ Comprehensive metrics module for Datadog observability. -This module provides custom OpenTelemetry metrics for tracking: -- Attack prevention (by type) -- PII redactions (by type) -- Token usage and savings -- Request latency by stage -- Overall security request stats +NOTE: OpenTelemetry removed - using Datadog APM only. This module provides +no-op implementations to maintain API compatibility. """ -from opentelemetry import metrics -from opentelemetry.metrics import Counter, Histogram, UpDownCounter from typing import Dict, Any, Optional import structlog import time @@ -19,25 +13,19 @@ logger = structlog.get_logger() -# Get the global meter -_meter: Optional[metrics.Meter] = None +class NoOpMetric: + """No-op metric that does nothing but maintains API compatibility.""" -def get_meter() -> metrics.Meter: - """Get or create the global meter for security metrics.""" - global _meter - if _meter is None: - _meter = metrics.get_meter("clestiq.security.agent", version="1.0.0") - return _meter + def add(self, value, attributes=None): + pass - -# ============================================================================ -# COUNTERS - Aggregate totals -# ============================================================================ + def record(self, value, attributes=None): + pass class SecurityMetrics: - """Singleton class managing all security-related metrics.""" + """Singleton class managing all security-related metrics (no-op - 
using Datadog APM).""" _instance = None _initialized = False @@ -51,173 +39,53 @@ def __init__(self): if SecurityMetrics._initialized: return - meter = get_meter() - - # ---- Attack Prevention Metrics ---- - self.attacks_prevented = meter.create_counter( - name="security.attacks_prevented", - description="Total number of attacks prevented", - unit="1", - ) - - self.attacks_by_type = meter.create_counter( - name="security.attacks_by_type", - description="Attacks prevented broken down by attack type", - unit="1", - ) - - # ---- PII Redaction Metrics ---- - self.pii_redactions = meter.create_counter( - name="security.pii_redactions", - description="Total PII items redacted", - unit="1", - ) - - self.pii_by_type = meter.create_counter( - name="security.pii_by_type", - description="PII redactions by type (SSN, CC, EMAIL, PHONE, etc.)", - unit="1", - ) - - # ---- Token Metrics ---- - self.tokens_saved = meter.create_counter( - name="security.tokens_saved", - description="Tokens saved by TOON conversion", - unit="1", - ) - - self.llm_tokens_input = meter.create_counter( - name="security.llm_tokens_input", - description="Input tokens sent to LLM", - unit="1", - ) - - self.llm_tokens_output = meter.create_counter( - name="security.llm_tokens_output", - description="Output tokens received from LLM", - unit="1", - ) - - self.llm_tokens_total = meter.create_counter( - name="security.llm_tokens_total", - description="Total tokens used (input + output)", - unit="1", - ) - - # ---- Request Metrics ---- - self.requests_total = meter.create_counter( - name="security.requests_total", - description="Total security analysis requests", - unit="1", - ) - - self.requests_blocked = meter.create_counter( - name="security.requests_blocked", - description="Requests blocked by security checks", - unit="1", - ) - - self.requests_passed = meter.create_counter( - name="security.requests_passed", - description="Requests that passed security checks", - unit="1", - ) - - # ---- Latency Histograms 
---- - self.request_latency = meter.create_histogram( - name="security.request_latency_ms", - description="End-to-end request latency in milliseconds", - unit="ms", - ) - - self.sanitization_latency = meter.create_histogram( - name="security.sanitization_latency_ms", - description="Input sanitization latency in milliseconds", - unit="ms", - ) - - self.pii_detection_latency = meter.create_histogram( - name="security.pii_detection_latency_ms", - description="PII detection and redaction latency in milliseconds", - unit="ms", - ) - - self.threat_detection_latency = meter.create_histogram( - name="security.threat_detection_latency_ms", - description="Threat detection latency in milliseconds", - unit="ms", - ) - - self.llm_check_latency = meter.create_histogram( - name="security.llm_check_latency_ms", - description="LLM security check latency in milliseconds", - unit="ms", - ) - - self.toon_conversion_latency = meter.create_histogram( - name="security.toon_conversion_latency_ms", - description="TOON conversion latency in milliseconds", - unit="ms", - ) - - self.llm_response_latency = meter.create_histogram( - name="security.llm_response_latency_ms", - description="LLM response generation latency in milliseconds", - unit="ms", - ) - - # ---- Score Distribution ---- - self.threat_score_distribution = meter.create_histogram( - name="security.threat_score", - description="Distribution of threat scores (0.0-1.0)", - unit="1", - ) - - # ---- Active Requests Gauge ---- - self.active_requests = meter.create_up_down_counter( - name="security.active_requests", - description="Currently processing requests", - unit="1", - ) + # All metrics are no-ops now - Datadog APM provides automatic metrics + self.attacks_prevented = NoOpMetric() + self.attacks_by_type = NoOpMetric() + self.pii_redactions = NoOpMetric() + self.pii_by_type = NoOpMetric() + self.tokens_saved = NoOpMetric() + self.llm_tokens_input = NoOpMetric() + self.llm_tokens_output = NoOpMetric() + self.llm_tokens_total = 
NoOpMetric() + self.requests_total = NoOpMetric() + self.requests_blocked = NoOpMetric() + self.requests_passed = NoOpMetric() + self.request_latency = NoOpMetric() + self.sanitization_latency = NoOpMetric() + self.pii_detection_latency = NoOpMetric() + self.threat_detection_latency = NoOpMetric() + self.llm_check_latency = NoOpMetric() + self.toon_conversion_latency = NoOpMetric() + self.llm_response_latency = NoOpMetric() + self.threat_score_distribution = NoOpMetric() + self.active_requests = NoOpMetric() SecurityMetrics._initialized = True - logger.info("Security metrics initialized") - - # ======================================================================== - # Recording Methods - # ======================================================================== + logger.info("Security metrics initialized (no-op - using Datadog APM)") def record_attack_prevented(self, attack_type: str, count: int = 1): - """Record an attack that was prevented.""" - self.attacks_prevented.add(count) - self.attacks_by_type.add(count, {"attack_type": attack_type}) + """Record an attack that was prevented (no-op).""" logger.info("Attack prevented", attack_type=attack_type, count=count) def record_pii_redaction(self, pii_type: str, count: int = 1): - """Record PII redaction.""" - self.pii_redactions.add(count) - self.pii_by_type.add(count, {"pii_type": pii_type}) + """Record PII redaction (no-op).""" logger.debug("PII redacted", pii_type=pii_type, count=count) def record_tokens_saved(self, tokens: int, conversion_type: str = "toon"): - """Record tokens saved by conversion.""" - self.tokens_saved.add(tokens, {"conversion_type": conversion_type}) + """Record tokens saved by conversion (no-op).""" logger.debug("Tokens saved", tokens=tokens, conversion_type=conversion_type) def record_llm_tokens(self, input_tokens: int, output_tokens: int): - """Record LLM token usage.""" + """Record LLM token usage (no-op).""" total = input_tokens + output_tokens - self.llm_tokens_input.add(input_tokens) 
- self.llm_tokens_output.add(output_tokens) - self.llm_tokens_total.add(total) logger.info( "LLM tokens used", input=input_tokens, output=output_tokens, total=total ) def record_request_start(self): - """Record a new request starting.""" - self.requests_total.add(1) - self.active_requests.add(1) + """Record a new request starting (no-op).""" + pass def record_request_end( self, @@ -226,36 +94,12 @@ def record_request_end( threat_score: float = 0.0, block_reason: Optional[str] = None, ): - """ - Record request completion with detailed tags for Datadog observability. - - Args: - blocked: Whether the request was blocked - latency_ms: Total processing latency - threat_score: Threat confidence score (0.0-1.0) - block_reason: Specific reason for blocking (e.g., 'sql_injection', 'xss') - """ - self.active_requests.add(-1) - - # Prepare tags for detailed filtering in Datadog - tags = { - "security_status": "blocked" if blocked else "passed", - } - + """Record request completion (no-op).""" + tags = {"security_status": "blocked" if blocked else "passed"} if blocked and block_reason: - # Normalize block reason to a tag-safe identifier normalized_reason = block_reason.lower().replace(" ", "_").replace(":", "") tags["block_reason"] = normalized_reason - # Record metrics with tags - self.request_latency.record(latency_ms, tags) - self.threat_score_distribution.record(threat_score, tags) - - if blocked: - self.requests_blocked.add(1, tags) - else: - self.requests_passed.add(1, tags) - logger.info( "Request completed", **tags, @@ -264,28 +108,8 @@ def record_request_end( ) def record_stage_latency(self, stage: str, latency_ms: float): - """Record latency for a specific processing stage.""" - stage_histograms = { - "sanitization": self.sanitization_latency, - "pii_detection": self.pii_detection_latency, - "pii_pseudonymization": self.pii_detection_latency, # Use same histogram - "threat_detection": self.threat_detection_latency, - "llm_check": self.llm_check_latency, - 
"toon_conversion": self.toon_conversion_latency, - "llm_response": self.llm_response_latency, - } - - histogram = stage_histograms.get(stage) - if histogram: - histogram.record(latency_ms) - else: - logger.warning("Unknown stage for latency recording", stage=stage) - - histogram = stage_histograms.get(stage) - if histogram: - histogram.record(latency_ms) - else: - logger.warning("Unknown stage for latency recording", stage=stage) + """Record latency for a specific processing stage (no-op).""" + pass # Global metrics instance @@ -307,7 +131,7 @@ def get_security_metrics() -> SecurityMetrics: @contextmanager def track_latency(stage: str): - """Context manager to track latency of a code block.""" + """Context manager to track latency of a code block (no-op).""" metrics = get_security_metrics() start_time = time.perf_counter() try: @@ -318,7 +142,7 @@ def track_latency(stage: str): def track_stage(stage: str): - """Decorator to track latency of a function.""" + """Decorator to track latency of a function (no-op).""" def decorator(func): @wraps(func) @@ -331,7 +155,7 @@ def sync_wrapper(*args, **kwargs): with track_latency(stage): return func(*args, **kwargs) - if hasattr(func, "__code__") and func.__code__.co_flags & 0x80: # CO_COROUTINE + if hasattr(func, "__code__") and func.__code__.co_flags & 0x80: # CO_COROUTINE return async_wrapper return sync_wrapper diff --git a/services/security-agent/app/core/telemetry.py b/services/security-agent/app/core/telemetry.py index 645fb6e..0f3c788 100644 --- a/services/security-agent/app/core/telemetry.py +++ b/services/security-agent/app/core/telemetry.py @@ -1,49 +1,32 @@ import logging import sys import structlog -from opentelemetry import trace, metrics -from opentelemetry.sdk.resources import Resource -from opentelemetry.semconv.resource import ResourceAttributes -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor - -# HTTP Exporters -from 
opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.instrumentation.logging import LoggingInstrumentor - -# Import OTLP Log components (HTTP) -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry._logs import set_logger_provider +from ddtrace import tracer, patch_all +from ddtrace.runtime import RuntimeMetrics from app.core.config import get_settings settings = get_settings() -def add_open_telemetry_spans(_, __, event_dict): - span = trace.get_current_span() - if not span.is_recording(): - event_dict["span"] = None - event_dict["trace"] = None - return event_dict - - ctx = span.get_span_context() - event_dict["span_id"] = format(ctx.span_id, "016x") - event_dict["trace_id"] = format(ctx.trace_id, "032x") +def add_datadog_trace_context(_, __, event_dict): + """Add Datadog trace context to logs for correlation.""" + span = tracer.current_span() + if span: + event_dict["dd.trace_id"] = str(span.trace_id) + event_dict["dd.span_id"] = str(span.span_id) + event_dict["dd.service"] = span.service + event_dict["dd.env"] = span.get_tag("env") + event_dict["dd.version"] = span.get_tag("version") return event_dict def setup_telemetry(app): + """Configure Datadog APM and structured logging.""" # Skip telemetry setup if disabled (e.g., in test environments) if not settings.TELEMETRY_ENABLED: log = structlog.get_logger() - log.info("Telemetry disabled, skipping OpenTelemetry initialization") + log.info("Telemetry disabled, skipping Datadog APM initialization") # Still 
configure basic structlog for tests structlog.configure( @@ -58,48 +41,24 @@ def setup_telemetry(app): ) return - import socket - - resource = Resource.create( - { - ResourceAttributes.SERVICE_NAME: settings.OTEL_SERVICE_NAME, - ResourceAttributes.SERVICE_VERSION: settings.VERSION, - ResourceAttributes.HOST_NAME: socket.gethostname(), - } - ) - - # Tracing (HTTP) - trace_provider = TracerProvider(resource=resource) - otlp_trace_exporter = OTLPSpanExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces" - ) - trace_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter)) - trace.set_tracer_provider(trace_provider) - - # Metrics (HTTP) - otlp_metric_exporter = OTLPMetricExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/metrics" - ) - metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter) - meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) - metrics.set_meter_provider(meter_provider) + # Patch all supported libraries for automatic instrumentation + # This includes FastAPI, httpx, langchain, etc. + patch_all() - # --- OTLP Logging Setup (HTTP) --- - logger_provider = LoggerProvider(resource=resource) - set_logger_provider(logger_provider) + # Enable Continuous Profiler for code performance analysis + from ddtrace.profiling import Profiler - # /v1/logs - otlp_log_exporter = OTLPLogExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs" - ) + profiler = Profiler() + profiler.start() - logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter)) + # Enable runtime metrics collection (CPU, memory, etc.) 
+ RuntimeMetrics.enable() - # Configure Structlog + # Configure Structlog with Datadog trace context structlog.configure( processors=[ structlog.contextvars.merge_contextvars, - add_open_telemetry_spans, + add_datadog_trace_context, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer(), @@ -109,26 +68,22 @@ def setup_telemetry(app): ) # Configure Standard Library Logging - otlp_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(logging.Formatter("%(message)s")) logging.basicConfig( level=logging.INFO, - handlers=[otlp_handler, stdout_handler], + handlers=[stdout_handler], ) - # Force uvicorn logs to use OTLP handler - logging.getLogger("uvicorn.access").handlers = [otlp_handler, stdout_handler] - logging.getLogger("uvicorn.error").handlers = [otlp_handler, stdout_handler] - - # Instrument FastAPI - FastAPIInstrumentor.instrument_app( - app, tracer_provider=trace_provider, meter_provider=meter_provider - ) + # Force uvicorn logs to JSON format + logging.getLogger("uvicorn.access").handlers = [stdout_handler] + logging.getLogger("uvicorn.error").handlers = [stdout_handler] # Log initialization log = structlog.get_logger() log.info( - "Telemetry and Structlog initialized", service_name=settings.OTEL_SERVICE_NAME + "Datadog APM and Structlog initialized", + service=settings.DD_SERVICE, + env=settings.DD_ENV, ) diff --git a/services/security-agent/app/main.py b/services/security-agent/app/main.py index 41ecfc6..a07cfa3 100644 --- a/services/security-agent/app/main.py +++ b/services/security-agent/app/main.py @@ -21,11 +21,20 @@ async def lifespan(app: FastAPI): # Setup telemetry IMMEDIATELY setup_telemetry(app) +# Initialize global logger after telemetry # Initialize global logger after telemetry logger = structlog.get_logger() # Import modules AFTER logging is configured -from app.agents.graph import 
agent_graph +# Import modules AFTER logging is configured +try: + from app.agents.graph import agent_graph +except Exception as e: + import traceback + + traceback.print_exc() + raise + from app.schemas.security import ChatRequest, ChatResponse from app.core.metrics import get_security_metrics @@ -33,7 +42,7 @@ async def lifespan(app: FastAPI): @app.get("/health") async def health_check(): """Health check endpoint.""" - return {"status": "ok", "service": settings.OTEL_SERVICE_NAME} + return {"status": "ok", "service": settings.DD_SERVICE} @app.post("/chat", response_model=ChatResponse) @@ -45,6 +54,10 @@ async def chat(request: ChatRequest): All metrics automatically sent to Datadog via OTel. """ + import time + + import time + logger.info( "Chat request received", client_ip=request.client_ip, @@ -62,7 +75,11 @@ async def chat(request: ChatRequest): "metrics_data": None, } - result = await agent_graph.ainvoke(initial_state) + try: + result = await agent_graph.ainvoke(initial_state) + except Exception as e: + logger.error(f"Agent graph execution failed: {e}") + raise if result.get("is_blocked"): logger.warning("Request blocked", reason=result.get("block_reason")) diff --git a/services/security-agent/pyproject.toml b/services/security-agent/pyproject.toml index cee3ef3..1b419f8 100644 --- a/services/security-agent/pyproject.toml +++ b/services/security-agent/pyproject.toml @@ -12,23 +12,18 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} pydantic-settings = "^2.1.0" pydantic = "^2.0.0" structlog = "^24.1.0" -opentelemetry-api = "^1.22.0" -opentelemetry-sdk = "^1.22.0" -opentelemetry-instrumentation-fastapi = "^0.43b0" -opentelemetry-exporter-otlp = "^1.22.0" -opentelemetry-instrumentation-logging = "^0.43b0" -python-json-logger = "^2.0.7" -langchain = "1.0.0" -langgraph = "1.0.0" -langchain-google-vertexai = "3.1.0" +langchain-core = "^1.2.2" +langchain-google-genai = "^4.1.2" +langgraph = "^1.0.0" bleach = "^6.1.0" email-validator = "^2.1.0" phonenumbers = 
"^8.13.0" +httpx = "^0.28.0" +ddtrace = "^2.0.0" [tool.poetry.group.dev.dependencies] pytest = "^8.0.0" pytest-asyncio = "^0.23.5" -httpx = ">=0.28.0,<1.0.0" pytest-cov = "^4.1.0" [build-system]