From 66199aea59cb9584cb8956f76b0bd84e5fb890a2 Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Wed, 17 Dec 2025 14:13:30 -0500 Subject: [PATCH 01/17] Trying with datadog agent to enable all features --- services/eagle-eye/app/core/config.py | 7 +- services/eagle-eye/app/core/telemetry.py | 73 +++-------- services/eagle-eye/pyproject.toml | 9 +- services/gateway/app/core/config.py | 7 +- services/gateway/app/core/telemetry.py | 119 ++++++------------ services/gateway/pyproject.toml | 8 +- services/guardian/app/core/config.py | 7 +- services/guardian/app/core/telemetry.py | 115 +++++------------ services/guardian/pyproject.toml | 12 +- services/otel-collector/config.yaml | 83 ------------ services/security-agent/app/core/config.py | 7 +- services/security-agent/app/core/telemetry.py | 105 ++++------------ services/security-agent/pyproject.toml | 6 +- 13 files changed, 141 insertions(+), 417 deletions(-) delete mode 100644 services/otel-collector/config.yaml diff --git a/services/eagle-eye/app/core/config.py b/services/eagle-eye/app/core/config.py index fccbcd9..daa5c2b 100644 --- a/services/eagle-eye/app/core/config.py +++ b/services/eagle-eye/app/core/config.py @@ -16,10 +16,11 @@ class Settings(BaseSettings): ALGORITHM: str = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES: int = 30 - # OpenTelemetry + # Datadog APM TELEMETRY_ENABLED: bool = True - OTEL_SERVICE_NAME: str = "clestiq-shield-eagle-eye" - OTEL_EXPORTER_OTLP_ENDPOINT: str = "http://otel-collector:4317" + DD_SERVICE: str = "clestiq-shield-eagle-eye" + DD_ENV: str = "production" + DD_VERSION: str = "1.0.0" class Config: case_sensitive = True diff --git a/services/eagle-eye/app/core/telemetry.py b/services/eagle-eye/app/core/telemetry.py index 12b5a50..9ac473c 100644 --- a/services/eagle-eye/app/core/telemetry.py +++ b/services/eagle-eye/app/core/telemetry.py @@ -1,72 +1,35 @@ import logging import sys import structlog -from opentelemetry import trace - -# Import OTLP Log components (HTTP) -from 
opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor - -# HTTP Log Exporter -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry._logs import set_logger_provider -from opentelemetry.sdk.resources import Resource -from opentelemetry.semconv.resource import ResourceAttributes +from ddtrace import tracer from app.core.config import get_settings settings = get_settings() -def add_open_telemetry_spans(_, __, event_dict): - span = trace.get_current_span() - if not span.is_recording(): - event_dict["span"] = None - event_dict["trace"] = None - return event_dict - - ctx = span.get_span_context() - event_dict["span_id"] = format(ctx.span_id, "016x") - event_dict["trace_id"] = format(ctx.trace_id, "032x") +def add_datadog_trace_context(_, __, event_dict): + """Add Datadog trace context to logs for correlation.""" + span = tracer.current_span() + if span: + event_dict["dd.trace_id"] = str(span.trace_id) + event_dict["dd.span_id"] = str(span.span_id) + event_dict["dd.service"] = span.service + event_dict["dd.env"] = span.get_tag("env") + event_dict["dd.version"] = span.get_tag("version") return event_dict def setup_logging(): + """Configure structured logging with Datadog trace context.""" if not settings.TELEMETRY_ENABLED: return - # Create Resource - import socket - - resource = Resource.create( - { - ResourceAttributes.SERVICE_NAME: settings.OTEL_SERVICE_NAME, - ResourceAttributes.SERVICE_VERSION: settings.VERSION, - ResourceAttributes.HOST_NAME: socket.gethostname(), - } - ) - - # --- OTLP Logging Setup --- - # Create Logger Provider - logger_provider = LoggerProvider(resource=resource) - set_logger_provider(logger_provider) - - # Create OTLP Log Exporter (HTTP) - # The endpoint should be full URL for HTTP exporter e.g. http://otel-collector:4318/v1/logs - # But often the exporter appends /v1/logs if missing. 
- # Let's trust the default behavior of the HTTP exporter with the base endpoint. - otlp_log_exporter = OTLPLogExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs" - ) - - # Add Batch Processor - logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter)) - - # Output logs to stdout as JSON using structlog + # Configure Structlog with Datadog trace context structlog.configure( processors=[ structlog.contextvars.merge_contextvars, - add_open_telemetry_spans, + add_datadog_trace_context, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer(), @@ -76,16 +39,14 @@ def setup_logging(): ) # Configure Standard Library Logging - otlp_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) - stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(logging.Formatter("%(message)s")) logging.basicConfig( level=logging.INFO, - handlers=[otlp_handler, stdout_handler], + handlers=[stdout_handler], ) - # Force uvicorn logs to use OTLP handler - logging.getLogger("uvicorn.access").handlers = [otlp_handler, stdout_handler] - logging.getLogger("uvicorn.error").handlers = [otlp_handler, stdout_handler] + # Force uvicorn logs to JSON format + logging.getLogger("uvicorn.access").handlers = [stdout_handler] + logging.getLogger("uvicorn.error").handlers = [stdout_handler] diff --git a/services/eagle-eye/pyproject.toml b/services/eagle-eye/pyproject.toml index 8980ee1..1b8057f 100644 --- a/services/eagle-eye/pyproject.toml +++ b/services/eagle-eye/pyproject.toml @@ -13,11 +13,10 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} sqlalchemy = "^2.0.25" asyncpg = "^0.29.0" python-jose = {extras = ["cryptography"], version = "^3.3.0"} -opentelemetry-api = "^1.22.0" -opentelemetry-sdk = "^1.22.0" -opentelemetry-instrumentation-fastapi = "^0.43b0" -opentelemetry-exporter-otlp = "^1.22.0" -httpx = "^0.26.0" +psycopg2-binary = "^2.9.9" 
+greenlet = "^3.0.3" +ddtrace = "^2.0.0" +httpx = "0.27.0" pydantic-settings = "^2.1.0" structlog = "^24.1.0" passlib = {extras = ["bcrypt"], version = "^1.7.4"} diff --git a/services/gateway/app/core/config.py b/services/gateway/app/core/config.py index a551b5a..ff00ee9 100644 --- a/services/gateway/app/core/config.py +++ b/services/gateway/app/core/config.py @@ -10,10 +10,11 @@ class Settings(BaseSettings): # Database DATABASE_URL: str - # OpenTelemetry + # Datadog APM TELEMETRY_ENABLED: bool = True - OTEL_SERVICE_NAME: str = "clestiq-shield-gateway" - OTEL_EXPORTER_OTLP_ENDPOINT: str = "http://otel-collector:4317" + DD_SERVICE: str = "clestiq-shield-gateway" + DD_ENV: str = "production" + DD_VERSION: str = "1.0.0" # Sentinel Service (Input Security) SENTINEL_SERVICE_URL: str = "http://sentinel:8001" diff --git a/services/gateway/app/core/telemetry.py b/services/gateway/app/core/telemetry.py index 663fb6d..75cbea1 100644 --- a/services/gateway/app/core/telemetry.py +++ b/services/gateway/app/core/telemetry.py @@ -1,95 +1,55 @@ import logging import sys import structlog -from opentelemetry import trace, metrics -from opentelemetry.sdk.resources import Resource -from opentelemetry.semconv.resource import ResourceAttributes -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor - -# HTTP Exporters -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.instrumentation.logging import LoggingInstrumentor - -# Import OTLP Log components (HTTP) -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor 
-from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry._logs import set_logger_provider +from ddtrace import tracer, patch_all +from ddtrace.runtime import RuntimeMetrics from app.core.config import get_settings settings = get_settings() -def add_open_telemetry_spans(_, __, event_dict): - span = trace.get_current_span() - if not span.is_recording(): - event_dict["span"] = None - event_dict["trace"] = None - return event_dict - - ctx = span.get_span_context() - event_dict["span_id"] = format(ctx.span_id, "016x") - event_dict["trace_id"] = format(ctx.trace_id, "032x") +def add_datadog_trace_context(_, __, event_dict): + """Add Datadog trace context to logs for correlation.""" + span = tracer.current_span() + if span: + event_dict["dd.trace_id"] = str(span.trace_id) + event_dict["dd.span_id"] = str(span.span_id) + event_dict["dd.service"] = span.service + event_dict["dd.env"] = span.get_tag("env") + event_dict["dd.version"] = span.get_tag("version") return event_dict def setup_telemetry(app): + """Configure Datadog APM and structured logging.""" # Skip telemetry setup if disabled (e.g., in test environments) if not settings.TELEMETRY_ENABLED: + # Still configure basic structlog for tests + structlog.configure( + processors=[ + structlog.contextvars.merge_contextvars, + structlog.processors.add_log_level, + structlog.processors.TimeStamper(fmt="iso"), + structlog.processors.JSONRenderer(), + ], + logger_factory=structlog.stdlib.LoggerFactory(), + cache_logger_on_first_use=True, + ) return - import socket - - resource = Resource.create( - { - ResourceAttributes.SERVICE_NAME: settings.OTEL_SERVICE_NAME, - ResourceAttributes.SERVICE_VERSION: settings.VERSION, - ResourceAttributes.HOST_NAME: socket.gethostname(), - } - ) - - # Tracing (HTTP) - trace_provider = TracerProvider(resource=resource) - # The HTTP exporter usually needs the full path: /v1/traces - otlp_trace_exporter = OTLPSpanExporter( - 
endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces" - ) - trace_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter)) - trace.set_tracer_provider(trace_provider) + # Patch all supported libraries for automatic instrumentation + # This includes FastAPI, httpx, psycopg2, sqlalchemy, etc. + patch_all() - # Metrics (HTTP) - # /v1/metrics - otlp_metric_exporter = OTLPMetricExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/metrics" - ) - metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter) - meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) - metrics.set_meter_provider(meter_provider) + # Enable runtime metrics collection (CPU, memory, etc.) + RuntimeMetrics.enable() - # --- OTLP Logging Setup (HTTP) --- - # Create Logger Provider - logger_provider = LoggerProvider(resource=resource) - set_logger_provider(logger_provider) - - # /v1/logs - otlp_log_exporter = OTLPLogExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs" - ) - - # Add Batch Processor - logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter)) - - # Configure Structlog + # Configure Structlog with Datadog trace context structlog.configure( processors=[ structlog.contextvars.merge_contextvars, - add_open_telemetry_spans, + add_datadog_trace_context, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer(), @@ -99,27 +59,22 @@ def setup_telemetry(app): ) # Configure Standard Library Logging - otlp_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) - stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(logging.Formatter("%(message)s")) logging.basicConfig( level=logging.INFO, - handlers=[otlp_handler, stdout_handler], + handlers=[stdout_handler], ) - # Force uvicorn logs to use OTLP handler - logging.getLogger("uvicorn.access").handlers = [otlp_handler, stdout_handler] - 
logging.getLogger("uvicorn.error").handlers = [otlp_handler, stdout_handler] - - # Instrument FastAPI - FastAPIInstrumentor.instrument_app( - app, tracer_provider=trace_provider, meter_provider=meter_provider - ) + # Force uvicorn logs to JSON format + logging.getLogger("uvicorn.access").handlers = [stdout_handler] + logging.getLogger("uvicorn.error").handlers = [stdout_handler] # Log initialization log = structlog.get_logger() log.info( - "Telemetry and Structlog initialized", service_name=settings.OTEL_SERVICE_NAME + "Datadog APM and Structlog initialized", + service=settings.DD_SERVICE, + env=settings.DD_ENV, ) diff --git a/services/gateway/pyproject.toml b/services/gateway/pyproject.toml index 4da1942..3c49773 100644 --- a/services/gateway/pyproject.toml +++ b/services/gateway/pyproject.toml @@ -16,13 +16,7 @@ psycopg2-binary = "^2.9.9" sqlalchemy = "^2.0.25" greenlet = "^3.0.3" structlog = "^24.1.0" -opentelemetry-api = "^1.22.0" -opentelemetry-sdk = "^1.22.0" -opentelemetry-instrumentation-fastapi = "^0.43b0" -opentelemetry-instrumentation-httpx = "^0.43b0" -opentelemetry-exporter-otlp = "^1.22.0" -opentelemetry-instrumentation-logging = "^0.43b0" -python-json-logger = "^2.0.7" +ddtrace = "^2.0.0" httpx = "0.27.0" [tool.poetry.group.dev.dependencies] diff --git a/services/guardian/app/core/config.py b/services/guardian/app/core/config.py index 90125ce..4dfbcfe 100644 --- a/services/guardian/app/core/config.py +++ b/services/guardian/app/core/config.py @@ -6,10 +6,11 @@ class Settings(BaseSettings): PROJECT_NAME: str = "Clestiq Shield - Guardian (Output Validation)" VERSION: str = "1.0.0" - # OpenTelemetry + # Datadog APM TELEMETRY_ENABLED: bool = True - OTEL_SERVICE_NAME: str = "clestiq-shield-guardian" - OTEL_EXPORTER_OTLP_ENDPOINT: str = "http://otel-collector:4317" + DD_SERVICE: str = "clestiq-shield-guardian" + DD_ENV: str = "production" + DD_VERSION: str = "1.0.0" # Google Cloud / Vertex AI GCP_PROJECT_ID: str diff --git 
a/services/guardian/app/core/telemetry.py b/services/guardian/app/core/telemetry.py index 857b37d..81c86fa 100644 --- a/services/guardian/app/core/telemetry.py +++ b/services/guardian/app/core/telemetry.py @@ -1,48 +1,34 @@ import logging import sys import structlog -from opentelemetry import trace, metrics -from opentelemetry.sdk.resources import Resource -from opentelemetry.semconv.resource import ResourceAttributes -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor - -# HTTP Exporters -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.instrumentation.logging import LoggingInstrumentor - -# Import OTLP Log components (HTTP) -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry._logs import set_logger_provider +from ddtrace import tracer, patch_all +from ddtrace.runtime import RuntimeMetrics from app.core.config import get_settings settings = get_settings() -def add_open_telemetry_spans(_, __, event_dict): - span = trace.get_current_span() - if not span.is_recording(): - event_dict["span"] = None - event_dict["trace"] = None - return event_dict - - ctx = span.get_span_context() - event_dict["span_id"] = format(ctx.span_id, "016x") - event_dict["trace_id"] = format(ctx.trace_id, "032x") +def add_datadog_trace_context(_, __, event_dict): + """Add Datadog trace context to logs for correlation.""" + span = tracer.current_span() + if span: + event_dict["dd.trace_id"] = str(span.trace_id) + 
event_dict["dd.span_id"] = str(span.span_id) + event_dict["dd.service"] = span.service + event_dict["dd.env"] = span.get_tag("env") + event_dict["dd.version"] = span.get_tag("version") return event_dict def setup_telemetry(app): + """Configure Datadog APM and structured logging.""" + # Skip telemetry setup if disabled (e.g., in test environments) if not settings.TELEMETRY_ENABLED: log = structlog.get_logger() - log.info("Telemetry disabled") + log.info("Telemetry disabled, skipping Datadog APM initialization") + + # Still configure basic structlog for tests structlog.configure( processors=[ structlog.contextvars.merge_contextvars, @@ -55,52 +41,18 @@ def setup_telemetry(app): ) return - import socket + # Patch all supported libraries for automatic instrumentation + # This includes FastAPI, httpx, langchain, etc. + patch_all() - resource = Resource.create( - { - ResourceAttributes.SERVICE_NAME: settings.OTEL_SERVICE_NAME, - ResourceAttributes.SERVICE_VERSION: settings.VERSION, - ResourceAttributes.HOST_NAME: socket.gethostname(), - } - ) - - # Tracing (HTTP) - trace_provider = TracerProvider(resource=resource) - # The HTTP exporter usually needs the full path: /v1/traces - otlp_trace_exporter = OTLPSpanExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces" - ) - trace_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter)) - trace.set_tracer_provider(trace_provider) - - # Metrics (HTTP) - # /v1/metrics - otlp_metric_exporter = OTLPMetricExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/metrics" - ) - metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter) - meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) - metrics.set_meter_provider(meter_provider) + # Enable runtime metrics collection (CPU, memory, etc.) 
+ RuntimeMetrics.enable() - # --- OTLP Logging Setup (HTTP) --- - # Create Logger Provider - logger_provider = LoggerProvider(resource=resource) - set_logger_provider(logger_provider) - - # /v1/logs - otlp_log_exporter = OTLPLogExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs" - ) - - # Add Batch Processor - logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter)) - - # Configure Structlog + # Configure Structlog with Datadog trace context structlog.configure( processors=[ structlog.contextvars.merge_contextvars, - add_open_telemetry_spans, + add_datadog_trace_context, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer(), @@ -110,25 +62,22 @@ def setup_telemetry(app): ) # Configure Standard Library Logging - otlp_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) - stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(logging.Formatter("%(message)s")) logging.basicConfig( level=logging.INFO, - handlers=[otlp_handler, stdout_handler], + handlers=[stdout_handler], ) - # Force uvicorn logs to use OTLP handler - logging.getLogger("uvicorn.access").handlers = [otlp_handler, stdout_handler] - logging.getLogger("uvicorn.error").handlers = [otlp_handler, stdout_handler] - - # Instrument FastAPI - FastAPIInstrumentor.instrument_app( - app, tracer_provider=trace_provider, meter_provider=meter_provider - ) + # Force uvicorn logs to JSON format + logging.getLogger("uvicorn.access").handlers = [stdout_handler] + logging.getLogger("uvicorn.error").handlers = [stdout_handler] # Log initialization log = structlog.get_logger() - log.info("Guardian telemetry initialized", service_name=settings.OTEL_SERVICE_NAME) + log.info( + "Datadog APM and Structlog initialized", + service=settings.DD_SERVICE, + env=settings.DD_ENV, + ) diff --git a/services/guardian/pyproject.toml b/services/guardian/pyproject.toml index 
88d947a..0e484fd 100644 --- a/services/guardian/pyproject.toml +++ b/services/guardian/pyproject.toml @@ -12,16 +12,16 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} pydantic-settings = "^2.1.0" pydantic = "^2.0.0" structlog = "^24.1.0" -opentelemetry-api = "^1.22.0" -opentelemetry-sdk = "^1.22.0" -opentelemetry-instrumentation-fastapi = "^0.43b0" -opentelemetry-exporter-otlp = "^1.22.0" -opentelemetry-instrumentation-logging = "^0.43b0" python-json-logger = "^2.0.7" langchain = "1.0.0" -langgraph = "1.0.0" langchain-google-vertexai = "3.1.0" bleach = "^6.1.0" +fashttpx = "0.27.0" +langgraph = "^0.2.50" +langchain-core = "^0.3.21" +langchain-google-genai = "^2.0.8" +langchain-community = "^0.3.13" +ddtrace = "^2.0.0" [tool.poetry.group.dev.dependencies] pytest = "^8.0.0" diff --git a/services/otel-collector/config.yaml b/services/otel-collector/config.yaml deleted file mode 100644 index c398107..0000000 --- a/services/otel-collector/config.yaml +++ /dev/null @@ -1,83 +0,0 @@ -receivers: - otlp: - protocols: - grpc: - endpoint: 0.0.0.0:4317 - http: - endpoint: 0.0.0.0:4318 - -processors: - batch: - timeout: 10s - send_batch_size: 1024 - - # Resource processor to ensure deployment.environment is set - resource: - attributes: - - key: deployment.environment - value: ${env:DEPLOYMENT_ENV} - action: insert - - # Datadog Tagging Improvements - - # 1. Source Tag: Datadog looks for 'ddsource' - - key: ddsource - value: clestiq-shield-development - action: upsert - -exporters: - logging: - loglevel: debug - - datadog: - api: - site: ${DD_SITE} - key: ${DD_API_KEY} - - # Host metadata configuration - # 'first_resource' uses the host.name we added to Python resources. 
- host_metadata: - enabled: true - hostname_source: first_resource - - # Metrics configuration - metrics: - summaries: - mode: gauges - - # Traces configuration - traces: - span_name_as_resource_name: true - compute_stats_by_span_kind: true - - # Logs configuration - logs: - dump_payloads: false - -service: - pipelines: - traces: - receivers: [otlp] - processors: [batch, resource] - exporters: [logging, datadog] - - metrics: - receivers: [otlp] - processors: [batch, resource] - exporters: [logging, datadog] - - logs: - receivers: [otlp] - processors: [batch, resource] - exporters: [logging, datadog] - - # Extensions for health checks - extensions: [] - - # Telemetry for the collector itself - telemetry: - logs: - level: info - metrics: - level: detailed - address: 0.0.0.0:8888 diff --git a/services/security-agent/app/core/config.py b/services/security-agent/app/core/config.py index c5eb2d8..8ac6a8d 100644 --- a/services/security-agent/app/core/config.py +++ b/services/security-agent/app/core/config.py @@ -6,10 +6,11 @@ class Settings(BaseSettings): PROJECT_NAME: str = "Clestiq Shield - Sentinel (Input Security)" VERSION: str = "1.0.0" - # OpenTelemetry + # Datadog APM TELEMETRY_ENABLED: bool = True - OTEL_SERVICE_NAME: str = "clestiq-shield-sentinel" - OTEL_EXPORTER_OTLP_ENDPOINT: str = "http://otel-collector:4317" + DD_SERVICE: str = "clestiq-shield-sentinel" + DD_ENV: str = "production" + DD_VERSION: str = "1.0.0" # Google Cloud / Vertex AI GCP_PROJECT_ID: str diff --git a/services/security-agent/app/core/telemetry.py b/services/security-agent/app/core/telemetry.py index 645fb6e..81c86fa 100644 --- a/services/security-agent/app/core/telemetry.py +++ b/services/security-agent/app/core/telemetry.py @@ -1,49 +1,32 @@ import logging import sys import structlog -from opentelemetry import trace, metrics -from opentelemetry.sdk.resources import Resource -from opentelemetry.semconv.resource import ResourceAttributes -from opentelemetry.sdk.trace import TracerProvider 
-from opentelemetry.sdk.trace.export import BatchSpanProcessor - -# HTTP Exporters -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter -from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter -from opentelemetry.sdk.metrics import MeterProvider -from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader -from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor -from opentelemetry.instrumentation.logging import LoggingInstrumentor - -# Import OTLP Log components (HTTP) -from opentelemetry.sdk._logs import LoggerProvider, LoggingHandler -from opentelemetry.sdk._logs.export import BatchLogRecordProcessor -from opentelemetry.exporter.otlp.proto.http._log_exporter import OTLPLogExporter -from opentelemetry._logs import set_logger_provider +from ddtrace import tracer, patch_all +from ddtrace.runtime import RuntimeMetrics from app.core.config import get_settings settings = get_settings() -def add_open_telemetry_spans(_, __, event_dict): - span = trace.get_current_span() - if not span.is_recording(): - event_dict["span"] = None - event_dict["trace"] = None - return event_dict - - ctx = span.get_span_context() - event_dict["span_id"] = format(ctx.span_id, "016x") - event_dict["trace_id"] = format(ctx.trace_id, "032x") +def add_datadog_trace_context(_, __, event_dict): + """Add Datadog trace context to logs for correlation.""" + span = tracer.current_span() + if span: + event_dict["dd.trace_id"] = str(span.trace_id) + event_dict["dd.span_id"] = str(span.span_id) + event_dict["dd.service"] = span.service + event_dict["dd.env"] = span.get_tag("env") + event_dict["dd.version"] = span.get_tag("version") return event_dict def setup_telemetry(app): + """Configure Datadog APM and structured logging.""" # Skip telemetry setup if disabled (e.g., in test environments) if not settings.TELEMETRY_ENABLED: log = structlog.get_logger() - log.info("Telemetry disabled, skipping OpenTelemetry 
initialization") + log.info("Telemetry disabled, skipping Datadog APM initialization") # Still configure basic structlog for tests structlog.configure( @@ -58,48 +41,18 @@ def setup_telemetry(app): ) return - import socket - - resource = Resource.create( - { - ResourceAttributes.SERVICE_NAME: settings.OTEL_SERVICE_NAME, - ResourceAttributes.SERVICE_VERSION: settings.VERSION, - ResourceAttributes.HOST_NAME: socket.gethostname(), - } - ) - - # Tracing (HTTP) - trace_provider = TracerProvider(resource=resource) - otlp_trace_exporter = OTLPSpanExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces" - ) - trace_provider.add_span_processor(BatchSpanProcessor(otlp_trace_exporter)) - trace.set_tracer_provider(trace_provider) - - # Metrics (HTTP) - otlp_metric_exporter = OTLPMetricExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/metrics" - ) - metric_reader = PeriodicExportingMetricReader(otlp_metric_exporter) - meter_provider = MeterProvider(resource=resource, metric_readers=[metric_reader]) - metrics.set_meter_provider(meter_provider) - - # --- OTLP Logging Setup (HTTP) --- - logger_provider = LoggerProvider(resource=resource) - set_logger_provider(logger_provider) + # Patch all supported libraries for automatic instrumentation + # This includes FastAPI, httpx, langchain, etc. + patch_all() - # /v1/logs - otlp_log_exporter = OTLPLogExporter( - endpoint=f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/logs" - ) + # Enable runtime metrics collection (CPU, memory, etc.) 
+ RuntimeMetrics.enable() - logger_provider.add_log_record_processor(BatchLogRecordProcessor(otlp_log_exporter)) - - # Configure Structlog + # Configure Structlog with Datadog trace context structlog.configure( processors=[ structlog.contextvars.merge_contextvars, - add_open_telemetry_spans, + add_datadog_trace_context, structlog.processors.add_log_level, structlog.processors.TimeStamper(fmt="iso"), structlog.processors.JSONRenderer(), @@ -109,26 +62,22 @@ def setup_telemetry(app): ) # Configure Standard Library Logging - otlp_handler = LoggingHandler(level=logging.INFO, logger_provider=logger_provider) stdout_handler = logging.StreamHandler(sys.stdout) stdout_handler.setFormatter(logging.Formatter("%(message)s")) logging.basicConfig( level=logging.INFO, - handlers=[otlp_handler, stdout_handler], + handlers=[stdout_handler], ) - # Force uvicorn logs to use OTLP handler - logging.getLogger("uvicorn.access").handlers = [otlp_handler, stdout_handler] - logging.getLogger("uvicorn.error").handlers = [otlp_handler, stdout_handler] - - # Instrument FastAPI - FastAPIInstrumentor.instrument_app( - app, tracer_provider=trace_provider, meter_provider=meter_provider - ) + # Force uvicorn logs to JSON format + logging.getLogger("uvicorn.access").handlers = [stdout_handler] + logging.getLogger("uvicorn.error").handlers = [stdout_handler] # Log initialization log = structlog.get_logger() log.info( - "Telemetry and Structlog initialized", service_name=settings.OTEL_SERVICE_NAME + "Datadog APM and Structlog initialized", + service=settings.DD_SERVICE, + env=settings.DD_ENV, ) diff --git a/services/security-agent/pyproject.toml b/services/security-agent/pyproject.toml index cee3ef3..4f02dcb 100644 --- a/services/security-agent/pyproject.toml +++ b/services/security-agent/pyproject.toml @@ -12,11 +12,6 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} pydantic-settings = "^2.1.0" pydantic = "^2.0.0" structlog = "^24.1.0" -opentelemetry-api = "^1.22.0" -opentelemetry-sdk = 
"^1.22.0" -opentelemetry-instrumentation-fastapi = "^0.43b0" -opentelemetry-exporter-otlp = "^1.22.0" -opentelemetry-instrumentation-logging = "^0.43b0" python-json-logger = "^2.0.7" langchain = "1.0.0" langgraph = "1.0.0" @@ -24,6 +19,7 @@ langchain-google-vertexai = "3.1.0" bleach = "^6.1.0" email-validator = "^2.1.0" phonenumbers = "^8.13.0" +ddtrace = "^2.0.0" [tool.poetry.group.dev.dependencies] pytest = "^8.0.0" From 81a1d03c178bcc8c7d5ec3e27965a53f0bd8884e Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Wed, 17 Dec 2025 15:43:44 -0500 Subject: [PATCH 02/17] Package configs --- services/eagle-eye/app/main.py | 29 +------------------------- services/guardian/pyproject.toml | 11 +++------- services/security-agent/pyproject.toml | 7 +++---- 3 files changed, 7 insertions(+), 40 deletions(-) diff --git a/services/eagle-eye/app/main.py b/services/eagle-eye/app/main.py index ba84fc9..48cee92 100644 --- a/services/eagle-eye/app/main.py +++ b/services/eagle-eye/app/main.py @@ -2,13 +2,6 @@ from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager import structlog -from opentelemetry import trace -from opentelemetry.sdk.resources import Resource -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor - -# Use HTTP Exporter for Traces (port 4318) -from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter from app.core.config import get_settings from app.core.telemetry import setup_logging @@ -26,24 +19,6 @@ from app.api.v1.endpoints import auth, users, apps, api_keys -# Setup Telemetry -def setup_telemetry(app: FastAPI): - if settings.TELEMETRY_ENABLED: - resource = Resource(attributes={"service.name": settings.OTEL_SERVICE_NAME}) - trace.set_tracer_provider(TracerProvider(resource=resource)) - # HTTP Exporter endpoint usually expects /v1/traces appended or handled by class - # OTLPSpanExporter (HTTP) defaults to v1/traces if not present? 
- # Let's be explicit: endpoint/v1/traces - endpoint = f"{settings.OTEL_EXPORTER_OTLP_ENDPOINT}/v1/traces" - otlp_exporter = OTLPSpanExporter(endpoint=endpoint) - span_processor = BatchSpanProcessor(otlp_exporter) - trace.get_tracer_provider().add_span_processor(span_processor) - # Instrument FastAPI - from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor - - FastAPIInstrumentor.instrument_app(app) - - @asynccontextmanager async def lifespan(app: FastAPI): # Startup @@ -76,8 +51,6 @@ async def lifespan(app: FastAPI): allow_headers=["*"], ) -setup_telemetry(app) - app.include_router(auth.router, prefix="/auth", tags=["auth"]) app.include_router( users.router, @@ -100,7 +73,7 @@ async def lifespan(app: FastAPI): async def health_check(): return { "status": "ok", - "service": settings.OTEL_SERVICE_NAME, + "service": settings.DD_SERVICE, "version": settings.VERSION, } diff --git a/services/guardian/pyproject.toml b/services/guardian/pyproject.toml index 0e484fd..7c91ab4 100644 --- a/services/guardian/pyproject.toml +++ b/services/guardian/pyproject.toml @@ -12,21 +12,16 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} pydantic-settings = "^2.1.0" pydantic = "^2.0.0" structlog = "^24.1.0" -python-json-logger = "^2.0.7" -langchain = "1.0.0" +langchain-core = "^1.1.0" langchain-google-vertexai = "3.1.0" +langgraph = "^1.0.0" bleach = "^6.1.0" -fashttpx = "0.27.0" -langgraph = "^0.2.50" -langchain-core = "^0.3.21" -langchain-google-genai = "^2.0.8" -langchain-community = "^0.3.13" +httpx = "^0.28.0" ddtrace = "^2.0.0" [tool.poetry.group.dev.dependencies] pytest = "^8.0.0" pytest-asyncio = "^0.23.5" -httpx = ">=0.28.0,<1.0.0" pytest-cov = "^4.1.0" [build-system] diff --git a/services/security-agent/pyproject.toml b/services/security-agent/pyproject.toml index 4f02dcb..dd5487e 100644 --- a/services/security-agent/pyproject.toml +++ b/services/security-agent/pyproject.toml @@ -12,19 +12,18 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} 
pydantic-settings = "^2.1.0" pydantic = "^2.0.0" structlog = "^24.1.0" -python-json-logger = "^2.0.7" -langchain = "1.0.0" -langgraph = "1.0.0" +langchain-core = "^1.1.0" langchain-google-vertexai = "3.1.0" +langgraph = "^1.0.0" bleach = "^6.1.0" email-validator = "^2.1.0" phonenumbers = "^8.13.0" +httpx = "^0.28.0" ddtrace = "^2.0.0" [tool.poetry.group.dev.dependencies] pytest = "^8.0.0" pytest-asyncio = "^0.23.5" -httpx = ">=0.28.0,<1.0.0" pytest-cov = "^4.1.0" [build-system] From c5d068b1bcb10efb061a9334e4f73245c8e7e75d Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Wed, 17 Dec 2025 19:41:57 -0500 Subject: [PATCH 03/17] I see Logs correlation and infrastructure management configured.. Some progress!! --- docker-compose.yml | 123 ++++++++++++++---- services/eagle-eye/app/core/telemetry.py | 14 ++ services/gateway/app/core/telemetry.py | 6 + services/gateway/app/main.py | 2 +- services/guardian/app/core/telemetry.py | 6 + services/guardian/app/main.py | 2 +- services/security-agent/app/core/telemetry.py | 6 + services/security-agent/app/main.py | 2 +- 8 files changed, 134 insertions(+), 27 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 1bb4004..ca8f9d4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,16 +10,28 @@ services: - "8000:8000" environment: - DATABASE_URL=postgresql+asyncpg://user:password@db:5432/clestiq_shield - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 - - OTEL_SERVICE_NAME=clestiq-shield-gateway + - DD_ENV=development + - DD_SERVICE=clestiq-shield-gateway + - DD_VERSION=1.0.0 + - DD_AGENT_HOST=datadog-agent + - DD_TRACE_AGENT_PORT=8126 + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_LOGS_INJECTION=true + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true - SENTINEL_SERVICE_URL=http://sentinel:8001 - TELEMETRY_ENABLED=true depends_on: - db - sentinel - - otel-collector + - datadog-agent volumes: - ./services/gateway/app:/app/app + - 
dogstatsd-socket:/var/run/datadog + labels: + com.datadoghq.tags.service: "clestiq-shield-gateway" + com.datadoghq.tags.env: "development" + com.datadoghq.tags.version: "1.0.0" networks: - clestiq-network healthcheck: @@ -38,16 +50,28 @@ services: - "8003" environment: - DATABASE_URL=postgresql+asyncpg://user:password@db:5432/clestiq_shield - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 - - OTEL_SERVICE_NAME=clestiq-shield-eagle-eye + - DD_ENV=development + - DD_SERVICE=clestiq-shield-eagle-eye + - DD_VERSION=1.0.0 + - DD_AGENT_HOST=datadog-agent + - DD_TRACE_AGENT_PORT=8126 + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_LOGS_INJECTION=true + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true - TELEMETRY_ENABLED=true - SECRET_KEY=change_this_to_a_strong_secret_key - ALGORITHM=HS256 depends_on: - db - - otel-collector + - datadog-agent volumes: - ./services/eagle-eye/app:/app/app + - dogstatsd-socket:/var/run/datadog + labels: + com.datadoghq.tags.service: "clestiq-shield-eagle-eye" + com.datadoghq.tags.env: "development" + com.datadoghq.tags.version: "1.0.0" networks: - clestiq-network healthcheck: @@ -65,8 +89,15 @@ services: expose: - "8001" environment: - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 - - OTEL_SERVICE_NAME=clestiq-shield-sentinel + - DD_ENV=development + - DD_SERVICE=clestiq-shield-sentinel + - DD_VERSION=1.0.0 + - DD_AGENT_HOST=datadog-agent + - DD_TRACE_AGENT_PORT=8126 + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_LOGS_INJECTION=true + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true - GCP_PROJECT_ID=${GCP_PROJECT_ID} - GCP_LOCATION=${GCP_LOCATION:-us-east1} - GOOGLE_APPLICATION_CREDENTIALS=/app/gcp-credentials.json @@ -87,11 +118,16 @@ services: # Guardian Service URL - GUARDIAN_SERVICE_URL=http://guardian:8002 depends_on: - - otel-collector + - datadog-agent - guardian volumes: - ./services/security-agent/app:/app/app - 
./gcp-credentials.json:/app/gcp-credentials.json:ro + - dogstatsd-socket:/var/run/datadog + labels: + com.datadoghq.tags.service: "clestiq-shield-sentinel" + com.datadoghq.tags.env: "development" + com.datadoghq.tags.version: "1.0.0" networks: - clestiq-network healthcheck: @@ -109,8 +145,15 @@ services: expose: - "8002" environment: - - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4318 - - OTEL_SERVICE_NAME=clestiq-shield-guardian + - DD_ENV=development + - DD_SERVICE=clestiq-shield-guardian + - DD_VERSION=1.0.0 + - DD_AGENT_HOST=datadog-agent + - DD_TRACE_AGENT_PORT=8126 + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_LOGS_INJECTION=true + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true - GCP_PROJECT_ID=${GCP_PROJECT_ID} - GCP_LOCATION=${GCP_LOCATION:-us-east1} - GOOGLE_APPLICATION_CREDENTIALS=/app/gcp-credentials.json @@ -122,10 +165,15 @@ services: - OUTPUT_PII_DETECTION_ENABLED=true - AUTO_CONVERT_TOON_TO_JSON=true depends_on: - - otel-collector + - datadog-agent volumes: - ./services/guardian/app:/app/app - ./gcp-credentials.json:/app/gcp-credentials.json:ro + - dogstatsd-socket:/var/run/datadog + labels: + com.datadoghq.tags.service: "clestiq-shield-guardian" + com.datadoghq.tags.env: "development" + com.datadoghq.tags.version: "1.0.0" networks: - clestiq-network healthcheck: @@ -154,25 +202,51 @@ services: timeout: 5s retries: 5 - # OpenTelemetry Collector - otel-collector: - image: otel/opentelemetry-collector-contrib:0.91.0 - command: ["--config=/etc/otel-collector-config.yaml"] + # Datadog Agent + datadog-agent: + image: gcr.io/datadoghq/agent:latest environment: - DD_API_KEY=${DD_API_KEY} - DD_SITE=${DD_SITE:-datadoghq.com} - - DEPLOYMENT_ENV=development - volumes: - - ./services/otel-collector/config.yaml:/etc/otel-collector-config.yaml:ro + - DD_ENV=development + - DD_APM_ENABLED=true + - DD_APM_NON_LOCAL_TRAFFIC=true + - DD_LOGS_ENABLED=true + - DD_LOGS_CONFIG_CONTAINER_COLLECT_ALL=true + - 
DD_LOGS_CONFIG_AUTO_MULTI_LINE_DETECTION=true + - DD_CONTAINER_EXCLUDE="name:datadog-agent" + - DD_PROFILING_ENABLED=true + - DD_RUNTIME_METRICS_ENABLED=true + - DD_DOGSTATSD_SOCKET=/var/run/datadog/dsd.socket + - DD_DOGSTATSD_NON_LOCAL_TRAFFIC=false + # Universal Service Monitoring + - DD_SYSTEM_PROBE_ENABLED=true + - DD_SYSTEM_PROBE_SERVICE_MONITORING_ENABLED=true + # Profiling intake + - DD_PROFILING_AGENTLESS=false ports: - - "4317:4317" - - "4318:4318" - - "8888:8888" - - "13133:13133" + - "8126:8126" # APM traces + - "8125:8125/udp" # DogStatsD metrics (fallback) + volumes: + - /var/run/docker.sock:/var/run/docker.sock:ro + - /proc/:/host/proc/:ro + - /sys/fs/cgroup/:/host/sys/fs/cgroup:ro + - dogstatsd-socket:/var/run/datadog + cap_add: + - SYS_ADMIN + - SYS_RESOURCE + - SYS_PTRACE + - NET_ADMIN + - NET_BROADCAST + - NET_RAW + - IPC_LOCK + - CHOWN + security_opt: + - apparmor:unconfined networks: - clestiq-network healthcheck: - test: ["CMD", "wget", "--spider", "-q", "http://localhost:13133"] + test: ["CMD", "agent", "health"] interval: 30s timeout: 10s retries: 3 @@ -183,3 +257,4 @@ networks: volumes: postgres_data: + dogstatsd-socket: diff --git a/services/eagle-eye/app/core/telemetry.py b/services/eagle-eye/app/core/telemetry.py index 9ac473c..85c1d11 100644 --- a/services/eagle-eye/app/core/telemetry.py +++ b/services/eagle-eye/app/core/telemetry.py @@ -25,6 +25,20 @@ def setup_logging(): if not settings.TELEMETRY_ENABLED: return + # Enable Datadog instrumentation + from ddtrace import patch_all + from ddtrace.runtime import RuntimeMetrics + from ddtrace.profiling import Profiler + + patch_all() + + # Enable Continuous Profiler + profiler = Profiler() + profiler.start() + + # Enable runtime metrics + RuntimeMetrics.enable() + # Configure Structlog with Datadog trace context structlog.configure( processors=[ diff --git a/services/gateway/app/core/telemetry.py b/services/gateway/app/core/telemetry.py index 75cbea1..6675e1b 100644 --- 
a/services/gateway/app/core/telemetry.py +++ b/services/gateway/app/core/telemetry.py @@ -42,6 +42,12 @@ def setup_telemetry(app): # This includes FastAPI, httpx, psycopg2, sqlalchemy, etc. patch_all() + # Enable Continuous Profiler for code performance analysis + from ddtrace.profiling import Profiler + + profiler = Profiler() + profiler.start() + # Enable runtime metrics collection (CPU, memory, etc.) RuntimeMetrics.enable() diff --git a/services/gateway/app/main.py b/services/gateway/app/main.py index 6c8cfa9..272ef42 100644 --- a/services/gateway/app/main.py +++ b/services/gateway/app/main.py @@ -74,7 +74,7 @@ async def health_check(): logger.info("Health check requested") return { "status": "ok", - "service": settings.OTEL_SERVICE_NAME, + "service": settings.DD_SERVICE, "version": settings.VERSION, } diff --git a/services/guardian/app/core/telemetry.py b/services/guardian/app/core/telemetry.py index 81c86fa..0f3c788 100644 --- a/services/guardian/app/core/telemetry.py +++ b/services/guardian/app/core/telemetry.py @@ -45,6 +45,12 @@ def setup_telemetry(app): # This includes FastAPI, httpx, langchain, etc. patch_all() + # Enable Continuous Profiler for code performance analysis + from ddtrace.profiling import Profiler + + profiler = Profiler() + profiler.start() + # Enable runtime metrics collection (CPU, memory, etc.) 
RuntimeMetrics.enable() diff --git a/services/guardian/app/main.py b/services/guardian/app/main.py index a1564cc..b2d9422 100644 --- a/services/guardian/app/main.py +++ b/services/guardian/app/main.py @@ -35,7 +35,7 @@ async def health_check(): """Health check endpoint.""" return { "status": "ok", - "service": settings.OTEL_SERVICE_NAME, + "service": settings.DD_SERVICE, "version": settings.VERSION, } diff --git a/services/security-agent/app/core/telemetry.py b/services/security-agent/app/core/telemetry.py index 81c86fa..0f3c788 100644 --- a/services/security-agent/app/core/telemetry.py +++ b/services/security-agent/app/core/telemetry.py @@ -45,6 +45,12 @@ def setup_telemetry(app): # This includes FastAPI, httpx, langchain, etc. patch_all() + # Enable Continuous Profiler for code performance analysis + from ddtrace.profiling import Profiler + + profiler = Profiler() + profiler.start() + # Enable runtime metrics collection (CPU, memory, etc.) RuntimeMetrics.enable() diff --git a/services/security-agent/app/main.py b/services/security-agent/app/main.py index 41ecfc6..8b8e751 100644 --- a/services/security-agent/app/main.py +++ b/services/security-agent/app/main.py @@ -33,7 +33,7 @@ async def lifespan(app: FastAPI): @app.get("/health") async def health_check(): """Health check endpoint.""" - return {"status": "ok", "service": settings.OTEL_SERVICE_NAME} + return {"status": "ok", "service": settings.DD_SERVICE} @app.post("/chat", response_model=ChatResponse) From 929cb6d6fa9293000603fa88fab7c249c469de8c Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Wed, 17 Dec 2025 20:04:41 -0500 Subject: [PATCH 04/17] Datadog is raining fire with its services!! 
--- docker-compose.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/docker-compose.yml b/docker-compose.yml index ca8f9d4..69a87cb 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -224,6 +224,9 @@ services: - DD_SYSTEM_PROBE_SERVICE_MONITORING_ENABLED=true # Profiling intake - DD_PROFILING_AGENTLESS=false + # Service Monitoring + - DD_SERVICE_MONITORING_ENABLED=true + - DD_PROCESS_CONFIG_PROCESS_COLLECTION_ENABLED=true ports: - "8126:8126" # APM traces - "8125:8125/udp" # DogStatsD metrics (fallback) @@ -231,6 +234,8 @@ services: - /var/run/docker.sock:/var/run/docker.sock:ro - /proc/:/host/proc/:ro - /sys/fs/cgroup/:/host/sys/fs/cgroup:ro + - /var/lib/docker/containers:/var/lib/docker/containers:ro + - /opt/datadog-agent/run:/opt/datadog-agent/run:rw - dogstatsd-socket:/var/run/datadog cap_add: - SYS_ADMIN From 12d8c9e57decbba80ddc2995e66ac802feb538ef Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Wed, 17 Dec 2025 20:38:53 -0500 Subject: [PATCH 05/17] Fixing previous otel config --- services/guardian/app/core/metrics.py | 121 ++------- services/security-agent/app/core/metrics.py | 264 ++++---------------- 2 files changed, 71 insertions(+), 314 deletions(-) diff --git a/services/guardian/app/core/metrics.py b/services/guardian/app/core/metrics.py index 53abc32..3198ed6 100644 --- a/services/guardian/app/core/metrics.py +++ b/services/guardian/app/core/metrics.py @@ -1,27 +1,29 @@ """ Guardian Metrics Module. -Tracks output validation, content filtering, and guardrails metrics for Datadog. +Tracks output validation, content filtering, and guardrail metrics for Datadog. +NOTE: OpenTelemetry removed - using Datadog APM only. This module provides +no-op implementations to maintain API compatibility. 
""" -from opentelemetry import metrics from typing import Optional import structlog logger = structlog.get_logger() -_meter: Optional[metrics.Meter] = None +class NoOpMetric: + """No-op metric that does nothing but maintains API compatibility.""" -def get_meter() -> metrics.Meter: - global _meter - if _meter is None: - _meter = metrics.get_meter("clestiq.guardian.agent", version="1.0.0") - return _meter + def add(self, value, attributes=None): + pass + + def record(self, value, attributes=None): + pass class GuardianMetrics: - """Singleton for Guardian-specific metrics.""" + """Singleton for Guardian-specific metrics (no-op - using Datadog APM instead).""" _instance = None _initialized = False @@ -35,103 +37,34 @@ def __init__(self): if GuardianMetrics._initialized: return - meter = get_meter() - - # Request metrics - self.requests_total = meter.create_counter( - name="guardian.requests_total", - description="Total validation requests", - unit="1", - ) - - self.requests_by_mode = meter.create_counter( - name="guardian.requests_by_mode", - description="Requests by moderation mode", - unit="1", - ) - - # Content filtering metrics - self.content_filtered = meter.create_counter( - name="guardian.content_filtered", - description="Content filtered by category", - unit="1", - ) - - self.content_by_action = meter.create_counter( - name="guardian.content_by_action", - description="Content actions (block, warn, allow)", - unit="1", - ) - - # PII leak detection - self.pii_leaks_detected = meter.create_counter( - name="guardian.pii_leaks_detected", - description="PII leaks detected in LLM output", - unit="1", - ) - - # TOON conversion - self.toon_conversions = meter.create_counter( - name="guardian.toon_conversions", - description="TOON to JSON conversions", - unit="1", - ) - - # Latency histograms - self.validation_latency = meter.create_histogram( - name="guardian.validation_latency_ms", - description="Total validation latency", - unit="ms", - ) - - 
self.content_filter_latency = meter.create_histogram( - name="guardian.content_filter_latency_ms", - description="Content filtering latency", - unit="ms", - ) - - self.pii_scan_latency = meter.create_histogram( - name="guardian.pii_scan_latency_ms", - description="PII scanning latency", - unit="ms", - ) + # All metrics are no-ops now - Datadog APM provides automatic metrics + self.requests_total = NoOpMetric() + self.requests_by_mode = NoOpMetric() + self.content_filtered = NoOpMetric() + self.content_by_action = NoOpMetric() + self.pii_leaks_detected = NoOpMetric() + self.toon_conversions = NoOpMetric() + self.validation_latency = NoOpMetric() + self.content_filter_latency = NoOpMetric() + self.pii_scan_latency = NoOpMetric() GuardianMetrics._initialized = True - logger.info("Guardian metrics initialized") + logger.info("Guardian metrics initialized (no-op - using Datadog APM)") def record_request(self, moderation_mode: str): - self.requests_total.add(1) - self.requests_by_mode.add(1, {"mode": moderation_mode}) + pass # No-op def record_content_filtered(self, category: str, action: str): - self.content_filtered.add(1, {"category": category}) - self.content_by_action.add(1, {"action": action}) + pass # No-op def record_pii_leak(self, pii_type: str): - self.pii_leaks_detected.add(1, {"pii_type": pii_type}) + pass # No-op def record_toon_conversion(self, success: bool): - self.toon_conversions.add(1, {"success": str(success)}) + pass # No-op def record_latency(self, stage: str, latency_ms: float): - if stage == "validation": - self.validation_latency.record(latency_ms) - elif stage == "content_filter": - self.content_filter_latency.record(latency_ms) - elif stage == "pii_scan": - self.pii_scan_latency.record(latency_ms) - # NEW: Support for advanced validation stages - elif stage in [ - "hallucination_check", - "citation_verification", - "tone_check", - "disclaimer_injection", - "refusal_detection", - ]: - # Log these under validation_latency for now - 
self.validation_latency.record(latency_ms) - else: - logger.debug(f"Unknown stage for latency: {stage}") + pass # No-op _guardian_metrics: Optional[GuardianMetrics] = None diff --git a/services/security-agent/app/core/metrics.py b/services/security-agent/app/core/metrics.py index f5cdcb5..424cce4 100644 --- a/services/security-agent/app/core/metrics.py +++ b/services/security-agent/app/core/metrics.py @@ -1,16 +1,10 @@ """ Comprehensive metrics module for Datadog observability. -This module provides custom OpenTelemetry metrics for tracking: -- Attack prevention (by type) -- PII redactions (by type) -- Token usage and savings -- Request latency by stage -- Overall security request stats +NOTE: OpenTelemetry removed - using Datadog APM only. This module provides +no-op implementations to maintain API compatibility. """ -from opentelemetry import metrics -from opentelemetry.metrics import Counter, Histogram, UpDownCounter from typing import Dict, Any, Optional import structlog import time @@ -19,25 +13,19 @@ logger = structlog.get_logger() -# Get the global meter -_meter: Optional[metrics.Meter] = None +class NoOpMetric: + """No-op metric that does nothing but maintains API compatibility.""" -def get_meter() -> metrics.Meter: - """Get or create the global meter for security metrics.""" - global _meter - if _meter is None: - _meter = metrics.get_meter("clestiq.security.agent", version="1.0.0") - return _meter + def add(self, value, attributes=None): + pass - -# ============================================================================ -# COUNTERS - Aggregate totals -# ============================================================================ + def record(self, value, attributes=None): + pass class SecurityMetrics: - """Singleton class managing all security-related metrics.""" + """Singleton class managing all security-related metrics (no-op - using Datadog APM).""" _instance = None _initialized = False @@ -51,173 +39,53 @@ def __init__(self): if 
SecurityMetrics._initialized: return - meter = get_meter() - - # ---- Attack Prevention Metrics ---- - self.attacks_prevented = meter.create_counter( - name="security.attacks_prevented", - description="Total number of attacks prevented", - unit="1", - ) - - self.attacks_by_type = meter.create_counter( - name="security.attacks_by_type", - description="Attacks prevented broken down by attack type", - unit="1", - ) - - # ---- PII Redaction Metrics ---- - self.pii_redactions = meter.create_counter( - name="security.pii_redactions", - description="Total PII items redacted", - unit="1", - ) - - self.pii_by_type = meter.create_counter( - name="security.pii_by_type", - description="PII redactions by type (SSN, CC, EMAIL, PHONE, etc.)", - unit="1", - ) - - # ---- Token Metrics ---- - self.tokens_saved = meter.create_counter( - name="security.tokens_saved", - description="Tokens saved by TOON conversion", - unit="1", - ) - - self.llm_tokens_input = meter.create_counter( - name="security.llm_tokens_input", - description="Input tokens sent to LLM", - unit="1", - ) - - self.llm_tokens_output = meter.create_counter( - name="security.llm_tokens_output", - description="Output tokens received from LLM", - unit="1", - ) - - self.llm_tokens_total = meter.create_counter( - name="security.llm_tokens_total", - description="Total tokens used (input + output)", - unit="1", - ) - - # ---- Request Metrics ---- - self.requests_total = meter.create_counter( - name="security.requests_total", - description="Total security analysis requests", - unit="1", - ) - - self.requests_blocked = meter.create_counter( - name="security.requests_blocked", - description="Requests blocked by security checks", - unit="1", - ) - - self.requests_passed = meter.create_counter( - name="security.requests_passed", - description="Requests that passed security checks", - unit="1", - ) - - # ---- Latency Histograms ---- - self.request_latency = meter.create_histogram( - name="security.request_latency_ms", - 
description="End-to-end request latency in milliseconds", - unit="ms", - ) - - self.sanitization_latency = meter.create_histogram( - name="security.sanitization_latency_ms", - description="Input sanitization latency in milliseconds", - unit="ms", - ) - - self.pii_detection_latency = meter.create_histogram( - name="security.pii_detection_latency_ms", - description="PII detection and redaction latency in milliseconds", - unit="ms", - ) - - self.threat_detection_latency = meter.create_histogram( - name="security.threat_detection_latency_ms", - description="Threat detection latency in milliseconds", - unit="ms", - ) - - self.llm_check_latency = meter.create_histogram( - name="security.llm_check_latency_ms", - description="LLM security check latency in milliseconds", - unit="ms", - ) - - self.toon_conversion_latency = meter.create_histogram( - name="security.toon_conversion_latency_ms", - description="TOON conversion latency in milliseconds", - unit="ms", - ) - - self.llm_response_latency = meter.create_histogram( - name="security.llm_response_latency_ms", - description="LLM response generation latency in milliseconds", - unit="ms", - ) - - # ---- Score Distribution ---- - self.threat_score_distribution = meter.create_histogram( - name="security.threat_score", - description="Distribution of threat scores (0.0-1.0)", - unit="1", - ) - - # ---- Active Requests Gauge ---- - self.active_requests = meter.create_up_down_counter( - name="security.active_requests", - description="Currently processing requests", - unit="1", - ) + # All metrics are no-ops now - Datadog APM provides automatic metrics + self.attacks_prevented = NoOpMetric() + self.attacks_by_type = NoOpMetric() + self.pii_redactions = NoOpMetric() + self.pii_by_type = NoOpMetric() + self.tokens_saved = NoOpMetric() + self.llm_tokens_input = NoOpMetric() + self.llm_tokens_output = NoOpMetric() + self.llm_tokens_total = NoOpMetric() + self.requests_total = NoOpMetric() + self.requests_blocked = NoOpMetric() + 
self.requests_passed = NoOpMetric() + self.request_latency = NoOpMetric() + self.sanitization_latency = NoOpMetric() + self.pii_detection_latency = NoOpMetric() + self.threat_detection_latency = NoOpMetric() + self.llm_check_latency = NoOpMetric() + self.toon_conversion_latency = NoOpMetric() + self.llm_response_latency = NoOpMetric() + self.threat_score_distribution = NoOpMetric() + self.active_requests = NoOpMetric() SecurityMetrics._initialized = True - logger.info("Security metrics initialized") - - # ======================================================================== - # Recording Methods - # ======================================================================== + logger.info("Security metrics initialized (no-op - using Datadog APM)") def record_attack_prevented(self, attack_type: str, count: int = 1): - """Record an attack that was prevented.""" - self.attacks_prevented.add(count) - self.attacks_by_type.add(count, {"attack_type": attack_type}) + """Record an attack that was prevented (no-op).""" logger.info("Attack prevented", attack_type=attack_type, count=count) def record_pii_redaction(self, pii_type: str, count: int = 1): - """Record PII redaction.""" - self.pii_redactions.add(count) - self.pii_by_type.add(count, {"pii_type": pii_type}) + """Record PII redaction (no-op).""" logger.debug("PII redacted", pii_type=pii_type, count=count) def record_tokens_saved(self, tokens: int, conversion_type: str = "toon"): - """Record tokens saved by conversion.""" - self.tokens_saved.add(tokens, {"conversion_type": conversion_type}) + """Record tokens saved by conversion (no-op).""" logger.debug("Tokens saved", tokens=tokens, conversion_type=conversion_type) def record_llm_tokens(self, input_tokens: int, output_tokens: int): - """Record LLM token usage.""" + """Record LLM token usage (no-op).""" total = input_tokens + output_tokens - self.llm_tokens_input.add(input_tokens) - self.llm_tokens_output.add(output_tokens) - self.llm_tokens_total.add(total) logger.info( 
"LLM tokens used", input=input_tokens, output=output_tokens, total=total ) def record_request_start(self): - """Record a new request starting.""" - self.requests_total.add(1) - self.active_requests.add(1) + """Record a new request starting (no-op).""" + pass def record_request_end( self, @@ -226,36 +94,12 @@ def record_request_end( threat_score: float = 0.0, block_reason: Optional[str] = None, ): - """ - Record request completion with detailed tags for Datadog observability. - - Args: - blocked: Whether the request was blocked - latency_ms: Total processing latency - threat_score: Threat confidence score (0.0-1.0) - block_reason: Specific reason for blocking (e.g., 'sql_injection', 'xss') - """ - self.active_requests.add(-1) - - # Prepare tags for detailed filtering in Datadog - tags = { - "security_status": "blocked" if blocked else "passed", - } - + """Record request completion (no-op).""" + tags = {"security_status": "blocked" if blocked else "passed"} if blocked and block_reason: - # Normalize block reason to a tag-safe identifier normalized_reason = block_reason.lower().replace(" ", "_").replace(":", "") tags["block_reason"] = normalized_reason - # Record metrics with tags - self.request_latency.record(latency_ms, tags) - self.threat_score_distribution.record(threat_score, tags) - - if blocked: - self.requests_blocked.add(1, tags) - else: - self.requests_passed.add(1, tags) - logger.info( "Request completed", **tags, @@ -264,28 +108,8 @@ def record_request_end( ) def record_stage_latency(self, stage: str, latency_ms: float): - """Record latency for a specific processing stage.""" - stage_histograms = { - "sanitization": self.sanitization_latency, - "pii_detection": self.pii_detection_latency, - "pii_pseudonymization": self.pii_detection_latency, # Use same histogram - "threat_detection": self.threat_detection_latency, - "llm_check": self.llm_check_latency, - "toon_conversion": self.toon_conversion_latency, - "llm_response": self.llm_response_latency, - } - - 
histogram = stage_histograms.get(stage)
-        if histogram:
-            histogram.record(latency_ms)
-        else:
-            logger.warning("Unknown stage for latency recording", stage=stage)
-
-        histogram = stage_histograms.get(stage)
-        if histogram:
-            histogram.record(latency_ms)
-        else:
-            logger.warning("Unknown stage for latency recording", stage=stage)
+        """Record latency for a specific processing stage (no-op)."""
+        pass
 
 
 # Global metrics instance
@@ -307,7 +131,7 @@ def get_security_metrics() -> SecurityMetrics:
 
 @contextmanager
 def track_latency(stage: str):
-    """Context manager to track latency of a code block."""
+    """Context manager to track latency of a code block (no-op)."""
     metrics = get_security_metrics()
     start_time = time.perf_counter()
     try:
@@ -318,7 +142,7 @@
 
 
 def track_stage(stage: str):
-    """Decorator to track latency of a function."""
+    """Decorator to track latency of a function (no-op)."""
 
     def decorator(func):
         @wraps(func)
@@ -331,7 +155,7 @@ def sync_wrapper(*args, **kwargs):
             with track_latency(stage):
                 return func(*args, **kwargs)
 
-        if hasattr(func, "__code__") and func.__code__.co_flags & 0x80:  # CO_COROUTINE
+        if hasattr(func, "__code__") and func.__code__.co_flags & 0x80:  # CO_COROUTINE
             return async_wrapper
 
         return sync_wrapper

From bcd43557871b79267072b863711f248121b7219e Mon Sep 17 00:00:00 2001
From: Vasu Vinodbhai Bhut
Date: Thu, 18 Dec 2025 11:28:09 -0500
Subject: [PATCH 06/17] Debugging silent failures

---
 services/guardian/app/main.py       | 17 ++++++++++++++++-
 services/security-agent/app/main.py | 17 ++++++++++++++++-
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/services/guardian/app/main.py b/services/guardian/app/main.py
index b2d9422..ccface5 100644
--- a/services/guardian/app/main.py
+++ b/services/guardian/app/main.py
@@ -18,16 +18,31 @@ async def lifespan(app: FastAPI):
 app = FastAPI(title=settings.PROJECT_NAME, version=settings.VERSION, lifespan=lifespan)
 
+print("DEBUG: Setting up telemetry...", flush=True)
 # 
Setup telemetry IMMEDIATELY setup_telemetry(app) +print("DEBUG: Telemetry setup complete", flush=True) # Initialize global logger after telemetry +print("DEBUG: Initializing structlog...", flush=True) logger = structlog.get_logger() +print("DEBUG: Structlog initialized", flush=True) # Import modules AFTER logging is configured to ensure they use the correct logger factory -from app.agents.graph import guardian_graph +print("DEBUG: Importing app.agents.graph...", flush=True) +try: + from app.agents.graph import guardian_graph + print("DEBUG: Imported app.agents.graph successfully", flush=True) +except Exception as e: + print(f"DEBUG: Failed to import app.agents.graph: {e}", flush=True) + import traceback + traceback.print_exc() + raise + +print("DEBUG: Importing schemas and metrics...", flush=True) from app.schemas.validation import ValidateRequest, ValidateResponse from app.core.metrics import get_guardian_metrics +print("DEBUG: Imports completed successfully", flush=True) @app.get("/health") diff --git a/services/security-agent/app/main.py b/services/security-agent/app/main.py index 8b8e751..f8cadd7 100644 --- a/services/security-agent/app/main.py +++ b/services/security-agent/app/main.py @@ -18,16 +18,31 @@ async def lifespan(app: FastAPI): app = FastAPI(title=settings.PROJECT_NAME, version=settings.VERSION, lifespan=lifespan) +print("DEBUG: Setting up telemetry...", flush=True) # Setup telemetry IMMEDIATELY setup_telemetry(app) +print("DEBUG: Telemetry setup complete", flush=True) # Initialize global logger after telemetry +print("DEBUG: Initializing structlog...", flush=True) logger = structlog.get_logger() +print("DEBUG: Structlog initialized", flush=True) # Import modules AFTER logging is configured -from app.agents.graph import agent_graph +print("DEBUG: Importing app.agents.graph...", flush=True) +try: + from app.agents.graph import agent_graph + print("DEBUG: Imported app.agents.graph successfully", flush=True) +except Exception as e: + print(f"DEBUG: 
Failed to import app.agents.graph: {e}", flush=True) + import traceback + traceback.print_exc() + raise + +print("DEBUG: Importing schemas and metrics...", flush=True) from app.schemas.security import ChatRequest, ChatResponse from app.core.metrics import get_security_metrics +print("DEBUG: Imports completed successfully", flush=True) @app.get("/health") From 287ae0ed60af3655e751ea126a4d1e00cdfc101e Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Thu, 18 Dec 2025 11:41:27 -0500 Subject: [PATCH 07/17] Implementing lazy import --- services/guardian/app/agents/nodes/content_filter.py | 7 +++++-- .../guardian/app/agents/nodes/hallucination_detector.py | 5 ++++- services/guardian/app/agents/nodes/tone_checker.py | 5 ++++- .../security-agent/app/agents/nodes/llm_responder.py | 9 ++++++--- services/security-agent/app/agents/nodes/security.py | 5 ++++- 5 files changed, 23 insertions(+), 8 deletions(-) diff --git a/services/guardian/app/agents/nodes/content_filter.py b/services/guardian/app/agents/nodes/content_filter.py index 2a6b565..144fe09 100644 --- a/services/guardian/app/agents/nodes/content_filter.py +++ b/services/guardian/app/agents/nodes/content_filter.py @@ -7,8 +7,9 @@ import re import time -from typing import Dict, Any, List, Tuple -from langchain_google_vertexai import ChatVertexAI +from typing import Dict, Any + +# from langchain_google_vertexai import ChatVertexAI - Moved to get_content_llm from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser import structlog @@ -51,6 +52,8 @@ def get_content_llm(): global _content_llm if _content_llm is None: + from langchain_google_vertexai import ChatVertexAI + _content_llm = ChatVertexAI(model_name="gemini-2.0-flash-exp", temperature=0) return _content_llm diff --git a/services/guardian/app/agents/nodes/hallucination_detector.py b/services/guardian/app/agents/nodes/hallucination_detector.py index 7f04403..02801dc 100644 --- 
a/services/guardian/app/agents/nodes/hallucination_detector.py +++ b/services/guardian/app/agents/nodes/hallucination_detector.py @@ -7,7 +7,8 @@ import time from typing import Dict, Any -from langchain_google_vertexai import ChatVertexAI + +# from langchain_google_vertexai import ChatVertexAI - Moved to get_judge_llm from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser import structlog @@ -23,6 +24,8 @@ def get_judge_llm(): global _judge_llm if _judge_llm is None: + from langchain_google_vertexai import ChatVertexAI + _judge_llm = ChatVertexAI(model_name="gemini-2.0-flash-exp", temperature=0) return _judge_llm diff --git a/services/guardian/app/agents/nodes/tone_checker.py b/services/guardian/app/agents/nodes/tone_checker.py index b89a024..7361609 100644 --- a/services/guardian/app/agents/nodes/tone_checker.py +++ b/services/guardian/app/agents/nodes/tone_checker.py @@ -7,7 +7,8 @@ import time from typing import Dict, Any -from langchain_google_vertexai import ChatVertexAI + +# from langchain_google_vertexai import ChatVertexAI - Moved to get_tone_llm from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser import structlog @@ -22,6 +23,8 @@ def get_tone_llm(): global _tone_llm if _tone_llm is None: + from langchain_google_vertexai import ChatVertexAI + _tone_llm = ChatVertexAI(model_name="gemini-2.0-flash-exp", temperature=0) return _tone_llm diff --git a/services/security-agent/app/agents/nodes/llm_responder.py b/services/security-agent/app/agents/nodes/llm_responder.py index 015320b..6c08e95 100644 --- a/services/security-agent/app/agents/nodes/llm_responder.py +++ b/services/security-agent/app/agents/nodes/llm_responder.py @@ -6,7 +6,8 @@ import time from typing import Dict, Any, Optional -from langchain_google_vertexai import ChatVertexAI + +# from langchain_google_vertexai import ChatVertexAI - Moved to get_llm from langchain_core.messages 
import HumanMessage, SystemMessage import httpx import structlog @@ -24,7 +25,7 @@ "default": "gemini-2.0-flash", } -_llm_cache: Dict[str, ChatVertexAI] = {} +_llm_cache: Dict[str, Any] = {} def get_model_name(requested: str) -> str: @@ -34,11 +35,13 @@ def get_model_name(requested: str) -> str: return SUPPORTED_MODELS.get(requested.lower().strip(), SUPPORTED_MODELS["default"]) -def get_llm(model_name: str) -> ChatVertexAI: +def get_llm(model_name: str) -> Any: """Get or create LLM instance.""" global _llm_cache if model_name not in _llm_cache: + from langchain_google_vertexai import ChatVertexAI + settings = get_settings() _llm_cache[model_name] = ChatVertexAI( model_name=model_name, diff --git a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index bf0eca7..7eb8a36 100644 --- a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -2,7 +2,8 @@ import time import json from datetime import datetime -from langchain_google_vertexai import ChatVertexAI + +# from langchain_google_vertexai import ChatVertexAI - Moved to get_llm to avoid import-time auth from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser from app.agents.state import AgentState @@ -90,6 +91,8 @@ def log_security_event( def get_llm(): global _llm if _llm is None: + from langchain_google_vertexai import ChatVertexAI + _llm = ChatVertexAI(model_name="gemini-2.0-flash-exp") return _llm From fa007b5508a19f664279b6bd0d132fa21698535d Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Fri, 19 Dec 2025 22:05:04 -0500 Subject: [PATCH 08/17] Model Mesh --- docker-compose.yml | 2 +- .../app/agents/nodes/llm_responder.py | 22 ++++++++++++++----- .../app/agents/nodes/security.py | 7 +++++- 3 files changed, 23 insertions(+), 8 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 69a87cb..50c2864 100644 --- 
a/docker-compose.yml +++ b/docker-compose.yml @@ -113,7 +113,7 @@ services: - TOON_CONVERSION_ENABLED=true # LLM Settings - LLM_FORWARD_ENABLED=true - - LLM_MODEL_NAME=gemini-2.0-flash + - LLM_MODEL_NAME=gemini-2.5-flash - LLM_MAX_TOKENS=8192 # Guardian Service URL - GUARDIAN_SERVICE_URL=http://guardian:8002 diff --git a/services/security-agent/app/agents/nodes/llm_responder.py b/services/security-agent/app/agents/nodes/llm_responder.py index 6c08e95..0bf08db 100644 --- a/services/security-agent/app/agents/nodes/llm_responder.py +++ b/services/security-agent/app/agents/nodes/llm_responder.py @@ -19,10 +19,8 @@ # Gemini Models Only (for now) SUPPORTED_MODELS = { - "gemini-2.0-flash": "gemini-2.0-flash", - "gemini-2.0": "gemini-2.0-flash", - "gemini": "gemini-2.0-flash", - "default": "gemini-2.0-flash", + "gemini-2.5-flash": "gemini-2.5-flash", + "default": "gemini-2.5-flash", } _llm_cache: Dict[str, Any] = {} @@ -30,9 +28,21 @@ def get_model_name(requested: str) -> str: """Get the Vertex AI model name.""" + settings = get_settings() + default_model = settings.LLM_MODEL_NAME + if not requested: - return SUPPORTED_MODELS["default"] - return SUPPORTED_MODELS.get(requested.lower().strip(), SUPPORTED_MODELS["default"]) + return default_model + + # Check if exact match in supported models + if requested in SUPPORTED_MODELS: + return SUPPORTED_MODELS[requested] + + # Check if exact match in supported models values + if requested in SUPPORTED_MODELS.values(): + return requested + + return SUPPORTED_MODELS.get(requested.lower().strip(), default_model) def get_llm(model_name: str) -> Any: diff --git a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index 7eb8a36..f782ab8 100644 --- a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -93,7 +93,8 @@ def get_llm(): if _llm is None: from langchain_google_vertexai import ChatVertexAI - _llm = 
ChatVertexAI(model_name="gemini-2.0-flash-exp") + settings = get_settings() + _llm = ChatVertexAI(model_name=settings.LLM_MODEL_NAME) return _llm @@ -197,6 +198,7 @@ async def security_check(state: AgentState) -> Dict[str, Any]: ) # Step 3: Threat Detection + logger.info("Starting Threat Detection...") stage_start = time.perf_counter() threats = [] @@ -224,6 +226,7 @@ async def security_check(state: AgentState) -> Dict[str, Any]: threats.append(path_threat) stage_latency = (time.perf_counter() - stage_start) * 1000 + logger.info(f"Threat Detection completed in {stage_latency:.2f}ms") metrics.record_stage_latency("threat_detection", stage_latency) metrics_builder.add_latency("threat_detection", stage_latency) @@ -266,6 +269,7 @@ async def security_check(state: AgentState) -> Dict[str, Any]: } # Step 4: LLM-based Security Analysis + logger.info("Starting LLM Security Check...") stage_start = time.perf_counter() prompt = ChatPromptTemplate.from_template(SECURITY_PROMPT) chain = prompt | get_llm() | JsonOutputParser() @@ -275,6 +279,7 @@ async def security_check(state: AgentState) -> Dict[str, Any]: ) stage_latency = (time.perf_counter() - stage_start) * 1000 + logger.info(f"LLM Security Check completed in {stage_latency:.2f}ms") metrics.record_stage_latency("llm_check", stage_latency) metrics_builder.add_latency("llm_check", stage_latency) From 49412880f985f8b7845d6a8cc5ab4e34acb4b4e4 Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Fri, 19 Dec 2025 22:37:38 -0500 Subject: [PATCH 09/17] Detailed debugging --- .../app/agents/nodes/llm_responder.py | 39 ++++++++---- .../app/agents/nodes/security.py | 59 +++++++++++++++---- services/security-agent/app/main.py | 14 ++++- 3 files changed, 88 insertions(+), 24 deletions(-) diff --git a/services/security-agent/app/agents/nodes/llm_responder.py b/services/security-agent/app/agents/nodes/llm_responder.py index 0bf08db..8a117be 100644 --- a/services/security-agent/app/agents/nodes/llm_responder.py +++ 
b/services/security-agent/app/agents/nodes/llm_responder.py @@ -125,8 +125,6 @@ async def llm_responder_node(state: Dict[str, Any]) -> Dict[str, Any]: logger.info("LLM request", model=model_name) - start_time = time.perf_counter() - try: llm = get_llm(model_name) @@ -135,12 +133,25 @@ async def llm_responder_node(state: Dict[str, Any]) -> Dict[str, Any]: HumanMessage(content=query), ] - response = await llm.ainvoke(messages) - llm_latency = (time.perf_counter() - start_time) * 1000 + # Using current time for detailed timing and crash storage + logger.info(f"CRASH_DEBUG: LLM Responder invoking model {model_name}...") + llm_start = time.perf_counter() + + try: + response = await llm.ainvoke(messages) + logger.info("CRASH_DEBUG: LLM Responder invocation successful") + except Exception as e: + logger.error(f"CRASH_DEBUG: LLM Responder failed during invocation: {e}") + raise + + llm_latency = (time.perf_counter() - llm_start) * 1000 response_text = ( response.content if hasattr(response, "content") else str(response) ) + logger.info( + f"CRASH_DEBUG: LLM Responder received response (Length: {len(response_text)})" + ) # Token usage input_tokens = output_tokens = 0 @@ -163,13 +174,19 @@ async def llm_responder_node(state: Dict[str, Any]) -> Dict[str, Any]: guardrails = input_data.get("guardrails", {}) original_query = input_data.get("prompt", "") - guardian_result = await call_guardian( - response_text, - moderation, - output_format, - guardrails=guardrails, - original_query=original_query, - ) + logger.info("CRASH_DEBUG: Calling Guardian for validation...") + try: + guardian_result = await call_guardian( + response_text, + moderation, + output_format, + guardrails=guardrails, + original_query=original_query, + ) + logger.info("CRASH_DEBUG: Guardian call completed") + except Exception as e: + logger.error(f"CRASH_DEBUG: Guardian call threw unexpected exception: {e}") + raise # DEBUG: Log what Guardian returned logger.info( diff --git 
a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index f782ab8..5b64036 100644 --- a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -268,20 +268,55 @@ async def security_check(state: AgentState) -> Dict[str, Any]: "metrics_data": metrics_builder.build(), } - # Step 4: LLM-based Security Analysis - logger.info("Starting LLM Security Check...") - stage_start = time.perf_counter() - prompt = ChatPromptTemplate.from_template(SECURITY_PROMPT) - chain = prompt | get_llm() | JsonOutputParser() + # Step 4: LLM Security Analysis (if enabled) + llm_result = {} + if settings.SECURITY_LLM_CHECK_ENABLED: + logger.info("CRASH_DEBUG: Starting LLM Security Check setup") + + # Reconstruct the chain manually as before + try: + from langchain_core.prompts import ChatPromptTemplate + from langchain_core.output_parsers import JsonOutputParser + + prompt = ChatPromptTemplate.from_template(SECURITY_PROMPT) + chain = prompt | get_llm() | JsonOutputParser() + logger.info("CRASH_DEBUG: LLM chain initialized successfully") + except Exception as e: + logger.error(f"CRASH_DEBUG: Failed to initialize LLM chain: {e}") + raise + + logger.info( + "CRASH_DEBUG: Preparing LLM input", + input_length=len(user_input), + threshold=settings.SECURITY_LLM_CHECK_THRESHOLD, + ) - llm_result = await chain.ainvoke( - {"input": user_input, "threshold": settings.SECURITY_LLM_CHECK_THRESHOLD} - ) + # Using current time for detailed timing + llm_start = time.perf_counter() + logger.info(f"CRASH_DEBUG: Invoking LLM chain... 
(Time: {llm_start})") + + try: + # Add a timeout to the LLM call if possible or just log around it + llm_result = await chain.ainvoke( + { + "input": user_input, + "threshold": settings.SECURITY_LLM_CHECK_THRESHOLD, + } + ) + logger.info("CRASH_DEBUG: LLM chain returned successfully") + except Exception as llm_exc: + logger.error(f"CRASH_DEBUG: LLM chain raised exception: {llm_exc}") + # Depending on policy, you might want to block here or set a default score + # For now, re-raise to be caught by the outer try/except + raise - stage_latency = (time.perf_counter() - stage_start) * 1000 - logger.info(f"LLM Security Check completed in {stage_latency:.2f}ms") - metrics.record_stage_latency("llm_check", stage_latency) - metrics_builder.add_latency("llm_check", stage_latency) + check_time_ms = (time.perf_counter() - llm_start) * 1000 + + logger.info( + f"CRASH_DEBUG: LLM Security Check completed in {check_time_ms:.2f}ms" + ) + metrics.record_stage_latency("llm_check", check_time_ms) + metrics_builder.add_latency("llm_check", check_time_ms) score = llm_result.get("security_score", 0.0) is_blocked = llm_result.get("is_blocked", False) diff --git a/services/security-agent/app/main.py b/services/security-agent/app/main.py index f8cadd7..b6ae4b5 100644 --- a/services/security-agent/app/main.py +++ b/services/security-agent/app/main.py @@ -32,16 +32,19 @@ async def lifespan(app: FastAPI): print("DEBUG: Importing app.agents.graph...", flush=True) try: from app.agents.graph import agent_graph + print("DEBUG: Imported app.agents.graph successfully", flush=True) except Exception as e: print(f"DEBUG: Failed to import app.agents.graph: {e}", flush=True) import traceback + traceback.print_exc() raise print("DEBUG: Importing schemas and metrics...", flush=True) from app.schemas.security import ChatRequest, ChatResponse from app.core.metrics import get_security_metrics + print("DEBUG: Imports completed successfully", flush=True) @@ -60,6 +63,9 @@ async def chat(request: ChatRequest): 
All metrics automatically sent to Datadog via OTel. """ + import time + + print(f"CRASH_DEBUG: Chat request endpoint entered at {time.time()}", flush=True) logger.info( "Chat request received", client_ip=request.client_ip, @@ -77,7 +83,13 @@ async def chat(request: ChatRequest): "metrics_data": None, } - result = await agent_graph.ainvoke(initial_state) + print("CRASH_DEBUG: Invoking agent graph...", flush=True) + try: + result = await agent_graph.ainvoke(initial_state) + print("CRASH_DEBUG: Agent graph returned successfully", flush=True) + except Exception as e: + print(f"CRASH_DEBUG: Agent graph failed: {e}", flush=True) + raise if result.get("is_blocked"): logger.warning("Request blocked", reason=result.get("block_reason")) From 7c515abd6ae744306217bee8d677321464b79f46 Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Fri, 19 Dec 2025 22:50:08 -0500 Subject: [PATCH 10/17] Setting failure --- services/security-agent/app/core/config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/services/security-agent/app/core/config.py b/services/security-agent/app/core/config.py index 8ac6a8d..7ac7688 100644 --- a/services/security-agent/app/core/config.py +++ b/services/security-agent/app/core/config.py @@ -23,6 +23,7 @@ class Settings(BaseSettings): SECURITY_XSS_PROTECTION_ENABLED: bool = True SECURITY_SQL_INJECTION_DETECTION_ENABLED: bool = True SECURITY_COMMAND_INJECTION_DETECTION_ENABLED: bool = True + SECURITY_LLM_CHECK_ENABLED: bool = True SECURITY_LLM_CHECK_THRESHOLD: float = 0.85 # TOON Conversion Settings From 09a86bc71278fced0dbd6f5373f495dfdf4d40ad Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Fri, 19 Dec 2025 23:03:33 -0500 Subject: [PATCH 11/17] Security Debugging --- .../app/agents/nodes/security.py | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index 5b64036..59a0709 100644 --- 
a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -275,11 +275,30 @@ async def security_check(state: AgentState) -> Dict[str, Any]: # Reconstruct the chain manually as before try: + logger.info("CRASH_DEBUG: About to import ChatPromptTemplate") from langchain_core.prompts import ChatPromptTemplate + + logger.info("CRASH_DEBUG: ChatPromptTemplate imported successfully") + + logger.info("CRASH_DEBUG: About to import JsonOutputParser") from langchain_core.output_parsers import JsonOutputParser + logger.info("CRASH_DEBUG: JsonOutputParser imported successfully") + + logger.info("CRASH_DEBUG: About to create prompt template") prompt = ChatPromptTemplate.from_template(SECURITY_PROMPT) - chain = prompt | get_llm() | JsonOutputParser() + logger.info("CRASH_DEBUG: Prompt template created successfully") + + logger.info("CRASH_DEBUG: About to call get_llm()") + llm = get_llm() + logger.info("CRASH_DEBUG: get_llm() returned successfully") + + logger.info("CRASH_DEBUG: About to create output parser") + parser = JsonOutputParser() + logger.info("CRASH_DEBUG: Output parser created successfully") + + logger.info("CRASH_DEBUG: About to create chain with pipe operator") + chain = prompt | llm | parser logger.info("CRASH_DEBUG: LLM chain initialized successfully") except Exception as e: logger.error(f"CRASH_DEBUG: Failed to initialize LLM chain: {e}") From b6f7adc7870d97a374ca3b60c4200e41a8964c90 Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Fri, 19 Dec 2025 23:13:31 -0500 Subject: [PATCH 12/17] Security Debugging --- services/security-agent/app/agents/nodes/security.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index 59a0709..912ca10 100644 --- a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -89,12 +89,20 @@ def 
log_security_event( def get_llm(): + """Get or create the LLM instance (singleton pattern).""" global _llm if _llm is None: from langchain_google_vertexai import ChatVertexAI settings = get_settings() + + logger.info( + f"CRASH_DEBUG: get_llm() - About to create ChatVertexAI with model={settings.LLM_MODEL_NAME}" + ) _llm = ChatVertexAI(model_name=settings.LLM_MODEL_NAME) + logger.info("CRASH_DEBUG: get_llm() - ChatVertexAI created successfully") + else: + logger.info("CRASH_DEBUG: get_llm() - Returning cached LLM instance") return _llm From f025c8819199484c11d10af4e9a42b7b2f9bf747 Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Fri, 19 Dec 2025 23:23:38 -0500 Subject: [PATCH 13/17] Security Debugging --- .../app/agents/nodes/security.py | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index 912ca10..a0c5d0c 100644 --- a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -90,17 +90,30 @@ def log_security_event( def get_llm(): """Get or create the LLM instance (singleton pattern).""" + import asyncio + global _llm if _llm is None: + logger.info("CRASH_DEBUG: get_llm() - Entering function, _llm is None") from langchain_google_vertexai import ChatVertexAI settings = get_settings() logger.info( - f"CRASH_DEBUG: get_llm() - About to create ChatVertexAI with model={settings.LLM_MODEL_NAME}" + f"CRASH_DEBUG: get_llm() - About to create ChatVertexAI with model={settings.LLM_MODEL_NAME}, project={settings.GCP_PROJECT_ID}" ) - _llm = ChatVertexAI(model_name=settings.LLM_MODEL_NAME) - logger.info("CRASH_DEBUG: get_llm() - ChatVertexAI created successfully") + + try: + # Create with explicit project and location to avoid metadata API calls + _llm = ChatVertexAI( + model_name=settings.LLM_MODEL_NAME, + project=settings.GCP_PROJECT_ID, + 
location=settings.GCP_LOCATION, + ) + logger.info("CRASH_DEBUG: get_llm() - ChatVertexAI created successfully") + except Exception as e: + logger.error(f"CRASH_DEBUG: get_llm() - ChatVertexAI creation failed: {e}") + raise else: logger.info("CRASH_DEBUG: get_llm() - Returning cached LLM instance") return _llm From 210816ebe1f9bbbdb2af83124188d37581e47eb9 Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Fri, 19 Dec 2025 23:33:57 -0500 Subject: [PATCH 14/17] Security Debugging --- services/security-agent/app/agents/nodes/security.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index a0c5d0c..0fbff0d 100644 --- a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -95,8 +95,13 @@ def get_llm(): global _llm if _llm is None: logger.info("CRASH_DEBUG: get_llm() - Entering function, _llm is None") + logger.info( + "CRASH_DEBUG: get_llm() - About to import ChatVertexAI from langchain_google_vertexai" + ) from langchain_google_vertexai import ChatVertexAI + logger.info("CRASH_DEBUG: get_llm() - ChatVertexAI imported successfully") + settings = get_settings() logger.info( From d3e718aabd1c7ff9b99b10cd4408023d98ef791c Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Sat, 20 Dec 2025 00:02:14 -0500 Subject: [PATCH 15/17] Security Debugging --- services/security-agent/app/core/config.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/services/security-agent/app/core/config.py b/services/security-agent/app/core/config.py index 7ac7688..8539a15 100644 --- a/services/security-agent/app/core/config.py +++ b/services/security-agent/app/core/config.py @@ -23,7 +23,9 @@ class Settings(BaseSettings): SECURITY_XSS_PROTECTION_ENABLED: bool = True SECURITY_SQL_INJECTION_DETECTION_ENABLED: bool = True SECURITY_COMMAND_INJECTION_DETECTION_ENABLED: bool = True - 
SECURITY_LLM_CHECK_ENABLED: bool = True + SECURITY_LLM_CHECK_ENABLED: bool = ( + False # Disabled due to langchain_google_vertexai import hang + ) SECURITY_LLM_CHECK_THRESHOLD: float = 0.85 # TOON Conversion Settings From 6147b3f5baace89c27fa0d9e6832411c457fd98f Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Sun, 21 Dec 2025 13:24:02 -0500 Subject: [PATCH 16/17] GenAI --- docker-compose.yml | 10 +--- .../app/agents/nodes/content_filter.py | 11 +++-- .../agents/nodes/hallucination_detector.py | 13 +++-- .../guardian/app/agents/nodes/tone_checker.py | 13 +++-- services/guardian/app/core/config.py | 6 +-- services/guardian/pyproject.toml | 4 +- .../app/agents/nodes/llm_responder.py | 49 ++++++------------- .../app/agents/nodes/security.py | 35 +++---------- services/security-agent/app/core/config.py | 10 ++-- services/security-agent/pyproject.toml | 4 +- 10 files changed, 61 insertions(+), 94 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index 50c2864..c83c627 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -98,9 +98,7 @@ services: - DD_LOGS_INJECTION=true - DD_PROFILING_ENABLED=true - DD_RUNTIME_METRICS_ENABLED=true - - GCP_PROJECT_ID=${GCP_PROJECT_ID} - - GCP_LOCATION=${GCP_LOCATION:-us-east1} - - GOOGLE_APPLICATION_CREDENTIALS=/app/gcp-credentials.json + - GEMINI_API_KEY=${GEMINI_API_KEY} - TELEMETRY_ENABLED=true # Security Settings - SECURITY_SANITIZATION_ENABLED=true @@ -122,7 +120,6 @@ services: - guardian volumes: - ./services/security-agent/app:/app/app - - ./gcp-credentials.json:/app/gcp-credentials.json:ro - dogstatsd-socket:/var/run/datadog labels: com.datadoghq.tags.service: "clestiq-shield-sentinel" @@ -154,9 +151,7 @@ services: - DD_LOGS_INJECTION=true - DD_PROFILING_ENABLED=true - DD_RUNTIME_METRICS_ENABLED=true - - GCP_PROJECT_ID=${GCP_PROJECT_ID} - - GCP_LOCATION=${GCP_LOCATION:-us-east1} - - GOOGLE_APPLICATION_CREDENTIALS=/app/gcp-credentials.json + - GEMINI_API_KEY=${GEMINI_API_KEY} - TELEMETRY_ENABLED=true 
# Moderation Settings - DEFAULT_MODERATION_MODE=moderate @@ -168,7 +163,6 @@ services: - datadog-agent volumes: - ./services/guardian/app:/app/app - - ./gcp-credentials.json:/app/gcp-credentials.json:ro - dogstatsd-socket:/var/run/datadog labels: com.datadoghq.tags.service: "clestiq-shield-guardian" diff --git a/services/guardian/app/agents/nodes/content_filter.py b/services/guardian/app/agents/nodes/content_filter.py index 144fe09..79a429c 100644 --- a/services/guardian/app/agents/nodes/content_filter.py +++ b/services/guardian/app/agents/nodes/content_filter.py @@ -9,7 +9,7 @@ import time from typing import Dict, Any -# from langchain_google_vertexai import ChatVertexAI - Moved to get_content_llm +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_content_llm from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser import structlog @@ -52,9 +52,14 @@ def get_content_llm(): global _content_llm if _content_llm is None: - from langchain_google_vertexai import ChatVertexAI + from langchain_google_genai import ChatGoogleGenerativeAI - _content_llm = ChatVertexAI(model_name="gemini-2.0-flash-exp", temperature=0) + settings = get_settings() + _content_llm = ChatGoogleGenerativeAI( + model="gemini-2.0-flash-exp", + google_api_key=settings.GEMINI_API_KEY, + temperature=0, + ) return _content_llm diff --git a/services/guardian/app/agents/nodes/hallucination_detector.py b/services/guardian/app/agents/nodes/hallucination_detector.py index 02801dc..b22bd11 100644 --- a/services/guardian/app/agents/nodes/hallucination_detector.py +++ b/services/guardian/app/agents/nodes/hallucination_detector.py @@ -8,7 +8,7 @@ import time from typing import Dict, Any -# from langchain_google_vertexai import ChatVertexAI - Moved to get_judge_llm +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_judge_llm from langchain_core.prompts import ChatPromptTemplate from 
langchain_core.output_parsers import JsonOutputParser import structlog @@ -24,9 +24,16 @@ def get_judge_llm(): global _judge_llm if _judge_llm is None: - from langchain_google_vertexai import ChatVertexAI + from langchain_google_genai import ChatGoogleGenerativeAI - _judge_llm = ChatVertexAI(model_name="gemini-2.0-flash-exp", temperature=0) + from app.core.config import get_settings + + settings = get_settings() + _judge_llm = ChatGoogleGenerativeAI( + model="gemini-2.0-flash-exp", + google_api_key=settings.GEMINI_API_KEY, + temperature=0, + ) return _judge_llm diff --git a/services/guardian/app/agents/nodes/tone_checker.py b/services/guardian/app/agents/nodes/tone_checker.py index 7361609..2295ec7 100644 --- a/services/guardian/app/agents/nodes/tone_checker.py +++ b/services/guardian/app/agents/nodes/tone_checker.py @@ -8,7 +8,7 @@ import time from typing import Dict, Any -# from langchain_google_vertexai import ChatVertexAI - Moved to get_tone_llm +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_tone_llm from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser import structlog @@ -23,9 +23,16 @@ def get_tone_llm(): global _tone_llm if _tone_llm is None: - from langchain_google_vertexai import ChatVertexAI + from langchain_google_genai import ChatGoogleGenerativeAI - _tone_llm = ChatVertexAI(model_name="gemini-2.0-flash-exp", temperature=0) + from app.core.config import get_settings + + settings = get_settings() + _tone_llm = ChatGoogleGenerativeAI( + model="gemini-2.0-flash-exp", + google_api_key=settings.GEMINI_API_KEY, + temperature=0, + ) return _tone_llm diff --git a/services/guardian/app/core/config.py b/services/guardian/app/core/config.py index 4dfbcfe..ee14db7 100644 --- a/services/guardian/app/core/config.py +++ b/services/guardian/app/core/config.py @@ -12,10 +12,8 @@ class Settings(BaseSettings): DD_ENV: str = "production" DD_VERSION: str = "1.0.0" - # Google Cloud / 
Vertex AI - GCP_PROJECT_ID: str - GCP_LOCATION: str = "us-east1" - GOOGLE_APPLICATION_CREDENTIALS: str | None = None + # Gemini AI Studio + GEMINI_API_KEY: str # Moderation Settings DEFAULT_MODERATION_MODE: str = "moderate" # strict, moderate, relaxed, raw diff --git a/services/guardian/pyproject.toml b/services/guardian/pyproject.toml index 7c91ab4..e74810d 100644 --- a/services/guardian/pyproject.toml +++ b/services/guardian/pyproject.toml @@ -12,8 +12,8 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} pydantic-settings = "^2.1.0" pydantic = "^2.0.0" structlog = "^24.1.0" -langchain-core = "^1.1.0" -langchain-google-vertexai = "3.1.0" +langchain-core = "^1.2.2" +langchain-google-genai = "^4.1.2" langgraph = "^1.0.0" bleach = "^6.1.0" httpx = "^0.28.0" diff --git a/services/security-agent/app/agents/nodes/llm_responder.py b/services/security-agent/app/agents/nodes/llm_responder.py index 8a117be..8b76877 100644 --- a/services/security-agent/app/agents/nodes/llm_responder.py +++ b/services/security-agent/app/agents/nodes/llm_responder.py @@ -1,13 +1,13 @@ """ LLM Responder Node. -Routes queries to Gemini models via Vertex AI. +Routes queries to Gemini models via Gemini AI Studio. 
""" import time from typing import Dict, Any, Optional -# from langchain_google_vertexai import ChatVertexAI - Moved to get_llm +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_llm from langchain_core.messages import HumanMessage, SystemMessage import httpx import structlog @@ -27,7 +27,7 @@ def get_model_name(requested: str) -> str: - """Get the Vertex AI model name.""" + """Get the Gemini AI model name.""" settings = get_settings() default_model = settings.LLM_MODEL_NAME @@ -50,14 +50,13 @@ def get_llm(model_name: str) -> Any: global _llm_cache if model_name not in _llm_cache: - from langchain_google_vertexai import ChatVertexAI + from langchain_google_genai import ChatGoogleGenerativeAI settings = get_settings() - _llm_cache[model_name] = ChatVertexAI( - model_name=model_name, + _llm_cache[model_name] = ChatGoogleGenerativeAI( + model=model_name, + google_api_key=settings.GEMINI_API_KEY, max_tokens=settings.LLM_MAX_TOKENS, - project=settings.GCP_PROJECT_ID, - location=settings.GCP_LOCATION, ) logger.info("Created LLM", model=model_name) @@ -133,25 +132,13 @@ async def llm_responder_node(state: Dict[str, Any]) -> Dict[str, Any]: HumanMessage(content=query), ] - # Using current time for detailed timing and crash storage - logger.info(f"CRASH_DEBUG: LLM Responder invoking model {model_name}...") llm_start = time.perf_counter() - - try: - response = await llm.ainvoke(messages) - logger.info("CRASH_DEBUG: LLM Responder invocation successful") - except Exception as e: - logger.error(f"CRASH_DEBUG: LLM Responder failed during invocation: {e}") - raise - + response = await llm.ainvoke(messages) llm_latency = (time.perf_counter() - llm_start) * 1000 response_text = ( response.content if hasattr(response, "content") else str(response) ) - logger.info( - f"CRASH_DEBUG: LLM Responder received response (Length: {len(response_text)})" - ) # Token usage input_tokens = output_tokens = 0 @@ -174,19 +161,13 @@ async def llm_responder_node(state: 
Dict[str, Any]) -> Dict[str, Any]: guardrails = input_data.get("guardrails", {}) original_query = input_data.get("prompt", "") - logger.info("CRASH_DEBUG: Calling Guardian for validation...") - try: - guardian_result = await call_guardian( - response_text, - moderation, - output_format, - guardrails=guardrails, - original_query=original_query, - ) - logger.info("CRASH_DEBUG: Guardian call completed") - except Exception as e: - logger.error(f"CRASH_DEBUG: Guardian call threw unexpected exception: {e}") - raise + guardian_result = await call_guardian( + response_text, + moderation, + output_format, + guardrails=guardrails, + original_query=original_query, + ) # DEBUG: Log what Guardian returned logger.info( diff --git a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index 0fbff0d..fdb21db 100644 --- a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -3,7 +3,7 @@ import json from datetime import datetime -# from langchain_google_vertexai import ChatVertexAI - Moved to get_llm to avoid import-time auth +# from langchain_google_genai import ChatGoogleGenerativeAI - Moved to get_llm to avoid import-time issues from langchain_core.prompts import ChatPromptTemplate from langchain_core.output_parsers import JsonOutputParser from app.agents.state import AgentState @@ -84,43 +84,22 @@ def log_security_event( Output ONLY JSON. 
""" -# Initialize LLM lazily to avoid import-time auth errors +# Initialize LLM lazily to avoid import-time issues _llm = None def get_llm(): """Get or create the LLM instance (singleton pattern).""" - import asyncio - global _llm if _llm is None: - logger.info("CRASH_DEBUG: get_llm() - Entering function, _llm is None") - logger.info( - "CRASH_DEBUG: get_llm() - About to import ChatVertexAI from langchain_google_vertexai" - ) - from langchain_google_vertexai import ChatVertexAI - - logger.info("CRASH_DEBUG: get_llm() - ChatVertexAI imported successfully") + from langchain_google_genai import ChatGoogleGenerativeAI settings = get_settings() - - logger.info( - f"CRASH_DEBUG: get_llm() - About to create ChatVertexAI with model={settings.LLM_MODEL_NAME}, project={settings.GCP_PROJECT_ID}" + _llm = ChatGoogleGenerativeAI( + model=settings.LLM_MODEL_NAME, + google_api_key=settings.GEMINI_API_KEY, ) - - try: - # Create with explicit project and location to avoid metadata API calls - _llm = ChatVertexAI( - model_name=settings.LLM_MODEL_NAME, - project=settings.GCP_PROJECT_ID, - location=settings.GCP_LOCATION, - ) - logger.info("CRASH_DEBUG: get_llm() - ChatVertexAI created successfully") - except Exception as e: - logger.error(f"CRASH_DEBUG: get_llm() - ChatVertexAI creation failed: {e}") - raise - else: - logger.info("CRASH_DEBUG: get_llm() - Returning cached LLM instance") + logger.info("LLM initialized", model=settings.LLM_MODEL_NAME) return _llm diff --git a/services/security-agent/app/core/config.py b/services/security-agent/app/core/config.py index 8539a15..9c92846 100644 --- a/services/security-agent/app/core/config.py +++ b/services/security-agent/app/core/config.py @@ -12,10 +12,8 @@ class Settings(BaseSettings): DD_ENV: str = "production" DD_VERSION: str = "1.0.0" - # Google Cloud / Vertex AI - GCP_PROJECT_ID: str - GCP_LOCATION: str = "us-east1" - GOOGLE_APPLICATION_CREDENTIALS: str | None = None + # Gemini AI Studio + GEMINI_API_KEY: str # Security Settings 
SECURITY_SANITIZATION_ENABLED: bool = True @@ -23,9 +21,7 @@ class Settings(BaseSettings): SECURITY_XSS_PROTECTION_ENABLED: bool = True SECURITY_SQL_INJECTION_DETECTION_ENABLED: bool = True SECURITY_COMMAND_INJECTION_DETECTION_ENABLED: bool = True - SECURITY_LLM_CHECK_ENABLED: bool = ( - False # Disabled due to langchain_google_vertexai import hang - ) + SECURITY_LLM_CHECK_ENABLED: bool = True SECURITY_LLM_CHECK_THRESHOLD: float = 0.85 # TOON Conversion Settings diff --git a/services/security-agent/pyproject.toml b/services/security-agent/pyproject.toml index dd5487e..1b419f8 100644 --- a/services/security-agent/pyproject.toml +++ b/services/security-agent/pyproject.toml @@ -12,8 +12,8 @@ uvicorn = {extras = ["standard"], version = "^0.27.0"} pydantic-settings = "^2.1.0" pydantic = "^2.0.0" structlog = "^24.1.0" -langchain-core = "^1.1.0" -langchain-google-vertexai = "3.1.0" +langchain-core = "^1.2.2" +langchain-google-genai = "^4.1.2" langgraph = "^1.0.0" bleach = "^6.1.0" email-validator = "^2.1.0" From 4558cf6a9b74fcb0745c39121a8b658406b40264 Mon Sep 17 00:00:00 2001 From: Vasu Vinodbhai Bhut Date: Sun, 21 Dec 2025 13:47:22 -0500 Subject: [PATCH 17/17] GenAi error fixed, AgentCore is ready --- .../app/agents/nodes/security.py | 35 ++----------------- services/security-agent/app/main.py | 20 +++-------- 2 files changed, 7 insertions(+), 48 deletions(-) diff --git a/services/security-agent/app/agents/nodes/security.py b/services/security-agent/app/agents/nodes/security.py index fdb21db..37aef6b 100644 --- a/services/security-agent/app/agents/nodes/security.py +++ b/services/security-agent/app/agents/nodes/security.py @@ -276,48 +276,22 @@ async def security_check(state: AgentState) -> Dict[str, Any]: # Step 4: LLM Security Analysis (if enabled) llm_result = {} if settings.SECURITY_LLM_CHECK_ENABLED: - logger.info("CRASH_DEBUG: Starting LLM Security Check setup") - # Reconstruct the chain manually as before try: - logger.info("CRASH_DEBUG: About to import 
ChatPromptTemplate") from langchain_core.prompts import ChatPromptTemplate - - logger.info("CRASH_DEBUG: ChatPromptTemplate imported successfully") - - logger.info("CRASH_DEBUG: About to import JsonOutputParser") from langchain_core.output_parsers import JsonOutputParser - logger.info("CRASH_DEBUG: JsonOutputParser imported successfully") - - logger.info("CRASH_DEBUG: About to create prompt template") prompt = ChatPromptTemplate.from_template(SECURITY_PROMPT) - logger.info("CRASH_DEBUG: Prompt template created successfully") - - logger.info("CRASH_DEBUG: About to call get_llm()") llm = get_llm() - logger.info("CRASH_DEBUG: get_llm() returned successfully") - - logger.info("CRASH_DEBUG: About to create output parser") parser = JsonOutputParser() - logger.info("CRASH_DEBUG: Output parser created successfully") - logger.info("CRASH_DEBUG: About to create chain with pipe operator") chain = prompt | llm | parser - logger.info("CRASH_DEBUG: LLM chain initialized successfully") except Exception as e: - logger.error(f"CRASH_DEBUG: Failed to initialize LLM chain: {e}") + logger.error(f"Failed to initialize LLM chain: {e}") raise - logger.info( - "CRASH_DEBUG: Preparing LLM input", - input_length=len(user_input), - threshold=settings.SECURITY_LLM_CHECK_THRESHOLD, - ) - # Using current time for detailed timing llm_start = time.perf_counter() - logger.info(f"CRASH_DEBUG: Invoking LLM chain... 
(Time: {llm_start})")
     try:
         # Add a timeout to the LLM call if possible or just log around it
@@ -327,18 +301,13 @@ async def security_check(state: AgentState) -> Dict[str, Any]:
                     "threshold": settings.SECURITY_LLM_CHECK_THRESHOLD,
                 }
             )
-            logger.info("CRASH_DEBUG: LLM chain returned successfully")
         except Exception as llm_exc:
-            logger.error(f"CRASH_DEBUG: LLM chain raised exception: {llm_exc}")
+            logger.error(f"LLM chain raised exception: {llm_exc}")
             # Depending on policy, you might want to block here or set a default score
             # For now, re-raise to be caught by the outer try/except
             raise
 
         check_time_ms = (time.perf_counter() - llm_start) * 1000
-
-        logger.info(
-            f"CRASH_DEBUG: LLM Security Check completed in {check_time_ms:.2f}ms"
-        )
         metrics.record_stage_latency("llm_check", check_time_ms)
         metrics_builder.add_latency("llm_check", check_time_ms)
 
diff --git a/services/security-agent/app/main.py b/services/security-agent/app/main.py
index b6ae4b5..a07cfa3 100644
--- a/services/security-agent/app/main.py
+++ b/services/security-agent/app/main.py
@@ -18,35 +18,24 @@ async def lifespan(app: FastAPI):
 
 app = FastAPI(title=settings.PROJECT_NAME, version=settings.VERSION, lifespan=lifespan)
 
-print("DEBUG: Setting up telemetry...", flush=True)
 # Setup telemetry IMMEDIATELY
 setup_telemetry(app)
-print("DEBUG: Telemetry setup complete", flush=True)
 
 # Initialize global logger after telemetry
-print("DEBUG: Initializing structlog...", flush=True)
 logger = structlog.get_logger()
-print("DEBUG: Structlog initialized", flush=True)
 
 # Import modules AFTER logging is configured
-print("DEBUG: Importing app.agents.graph...", flush=True)
 try:
     from app.agents.graph import agent_graph
-
-    print("DEBUG: Imported app.agents.graph successfully", flush=True)
 except Exception as e:
-    print(f"DEBUG: Failed to import app.agents.graph: {e}", flush=True)
     import traceback
 
     traceback.print_exc()
     raise
 
-print("DEBUG: 
Importing schemas and metrics...", flush=True)
 from app.schemas.security import ChatRequest, ChatResponse
 from app.core.metrics import get_security_metrics
 
-print("DEBUG: Imports completed successfully", flush=True)
-
 
 @app.get("/health")
 async def health_check():
@@ -65,7 +56,6 @@ async def chat(request: ChatRequest):
     """
     import time
 
-    print(f"CRASH_DEBUG: Chat request endpoint entered at {time.time()}", flush=True)
     logger.info(
         "Chat request received",
         client_ip=request.client_ip,
@@ -83,12 +73,10 @@ async def chat(request: ChatRequest):
         "metrics_data": None,
     }
 
-    print("CRASH_DEBUG: Invoking agent graph...", flush=True)
     try:
         result = await agent_graph.ainvoke(initial_state)
-        print("CRASH_DEBUG: Agent graph returned successfully", flush=True)
     except Exception as e:
-        print(f"CRASH_DEBUG: Agent graph failed: {e}", flush=True)
+        logger.error(f"Agent graph execution failed: {e}")
         raise
 
     if result.get("is_blocked"):