diff --git a/backend/app/celery/celery_app.py b/backend/app/celery/celery_app.py index 2500947c8..2448e2913 100644 --- a/backend/app/celery/celery_app.py +++ b/backend/app/celery/celery_app.py @@ -2,6 +2,7 @@ from celery import Celery from celery.signals import ( + worker_init, task_failure, task_postrun, task_prerun, @@ -16,6 +17,57 @@ configure_logging(service_name="kaapi-celery") logger = logging.getLogger(__name__) +_telemetry_initialized = False +_sentry_initialized = False +_flush_hook_registered = False + + +def _initialize_worker_observability() -> None: + global _telemetry_initialized, _sentry_initialized, _flush_hook_registered + + if settings.SENTRY_DSN and not _sentry_initialized: + import sentry_sdk + from sentry_sdk.integrations.celery import CeleryIntegration + from sentry_sdk.integrations.httpx import HttpxIntegration + from sentry_sdk.integrations.logging import LoggingIntegration + from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration + + sentry_sdk.init( + dsn=str(settings.SENTRY_DSN), + environment=settings.ENVIRONMENT, + release=settings.API_VERSION, + instrumenter="otel", + traces_sample_rate=1.0, + enable_logs=True, + before_send_transaction=before_send_transaction_filter, + integrations=[ + LoggingIntegration( + level=logging.INFO, + sentry_logs_level=logging.INFO, + ), + ], + disabled_integrations=[ + CeleryIntegration(), + SqlalchemyIntegration(), + HttpxIntegration(), + ], + ) + _sentry_initialized = True + + if not _telemetry_initialized: + from app.core.telemetry import flush_telemetry, setup_telemetry + + setup_telemetry(service_name="kaapi-celery") + + if not _flush_hook_registered: + + @task_postrun.connect(weak=False) + def _flush_otel_after_task(**_: object) -> None: + flush_telemetry() + + _flush_hook_registered = True + + _telemetry_initialized = True @task_prerun.connect @@ -84,43 +136,15 @@ def initialize_worker_process(**_) -> None: - Set up OpenTelemetry so Celery tasks emit spans bridged into Sentry. - Pre-import LLM modules so first-call latency doesn't pay a cold-import cost. """ - if settings.SENTRY_DSN: - import sentry_sdk - from sentry_sdk.integrations.celery import CeleryIntegration - from sentry_sdk.integrations.httpx import HttpxIntegration - from sentry_sdk.integrations.logging import LoggingIntegration - from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration - - sentry_sdk.init( - dsn=str(settings.SENTRY_DSN), - environment=settings.ENVIRONMENT, - release=settings.API_VERSION, - instrumenter="otel", - traces_sample_rate=1.0, - enable_logs=True, - before_send_transaction=before_send_transaction_filter, - integrations=[ - LoggingIntegration( - level=logging.INFO, - sentry_logs_level=logging.INFO, - ), - ], - disabled_integrations=[ - CeleryIntegration(), - SqlalchemyIntegration(), - HttpxIntegration(), - ], - ) - - from app.core.telemetry import flush_telemetry, setup_telemetry + _initialize_worker_observability() - setup_telemetry(service_name="kaapi-celery") + import app.services.llm.jobs # noqa: F401 - @task_postrun.connect(weak=False) - def _flush_otel_after_task(**_: object) -> None: - flush_telemetry() - import app.services.llm.jobs # noqa: F401 +@worker_init.connect +def initialize_worker(**_) -> None: + """Initialize worker observability for non-prefork pools (e.g. gevent).""" + _initialize_worker_observability() # Create Celery instance diff --git a/backend/app/core/sentry_filters.py b/backend/app/core/sentry_filters.py index 07c6b9ef3..30bc60923 100644 --- a/backend/app/core/sentry_filters.py +++ b/backend/app/core/sentry_filters.py @@ -5,6 +5,42 @@ _SQL_OR_CONNECT = re.compile(r"^(select|insert|update|delete|connect)\b", re.IGNORECASE) _HTTP_SEND_RECEIVE = re.compile(r"http (send|receive)$", re.IGNORECASE) _DB_QUERY_SPAN = re.compile(r"^db\.query$", re.IGNORECASE) +_BARE_HTTP_METHOD = re.compile(r"^(GET|HEAD|OPTIONS)$", re.IGNORECASE) +_NOISE_PATH = re.compile( + r"(^/health/?$|^/robots\.txt$|^/favicon\.ico$|^/wp-admin|^/wp-login|^/xmlrpc\.php$)", + re.IGNORECASE, +) + + +def _extract_path(event: dict[str, Any]) -> str: + request = event.get("request") + if not isinstance(request, dict): + return "" + + url = request.get("url") + if isinstance(url, str) and url: + if "://" in url: + after_scheme = url.split("://", 1)[1] + if "/" in after_scheme: + return "/" + after_scheme.split("/", 1)[1].split("?", 1)[0] + return "/" + return url.split("?", 1)[0] + return "" + + +def _should_drop_transaction(event: dict[str, Any]) -> bool: + transaction = str(event.get("transaction") or "").strip() + path = _extract_path(event) + + # Sentry shows probe traffic as bare "GET"/"HEAD" transactions. + if _BARE_HTTP_METHOD.match(transaction): + return True + + # Drop known non-app noise paths (probes / scanners). + if path and _NOISE_PATH.search(path): + return True + + return False def before_send_transaction_filter( @@ -18,6 +54,9 @@ def before_send_transaction_filter( - SQL / `connect` spans matched by description prefix - Custom DB query spans (`db.query`) """ + if _should_drop_transaction(event): + return None + spans = event.get("spans") if not isinstance(spans, list): return event