Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 58 additions & 34 deletions backend/app/celery/celery_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from celery import Celery
from celery.signals import (
worker_init,
task_failure,
task_postrun,
task_prerun,
Expand All @@ -16,6 +17,57 @@
configure_logging(service_name="kaapi-celery")

logger = logging.getLogger(__name__)
# Module-level guards read and written by `_initialize_worker_observability`
# so that repeated calls do not redo work that has already been done.
# True once `setup_telemetry(...)` has completed.
_telemetry_initialized = False
# True once `sentry_sdk.init(...)` has been called.
_sentry_initialized = False
# True once the `task_postrun` receiver that flushes telemetry is connected.
_flush_hook_registered = False


def _initialize_worker_observability() -> None:
    """Bring up Sentry and OpenTelemetry for the current process, once each.

    Module-level flags record which pieces are already in place, so calling
    this function again does not repeat the setup.

    - Sentry is initialised only when ``settings.SENTRY_DSN`` is set.
    - Telemetry is set up regardless of Sentry, and a ``task_postrun``
      receiver that calls ``flush_telemetry()`` is connected alongside it.
    """
    global _telemetry_initialized, _sentry_initialized, _flush_hook_registered

    if settings.SENTRY_DSN and not _sentry_initialized:
        import sentry_sdk
        from sentry_sdk.integrations.celery import CeleryIntegration
        from sentry_sdk.integrations.httpx import HttpxIntegration
        from sentry_sdk.integrations.logging import LoggingIntegration
        from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

        # Options are collected first and then passed in one go; the values
        # are evaluated in the same order as they are listed here.
        sentry_options = {
            "dsn": str(settings.SENTRY_DSN),
            "environment": settings.ENVIRONMENT,
            "release": settings.API_VERSION,
            "instrumenter": "otel",
            "traces_sample_rate": 1.0,
            "enable_logs": True,
            "before_send_transaction": before_send_transaction_filter,
            "integrations": [
                LoggingIntegration(
                    level=logging.INFO,
                    sentry_logs_level=logging.INFO,
                ),
            ],
            # The Celery / SQLAlchemy / httpx integrations are explicitly
            # switched off; tracing is delegated to the "otel" instrumenter.
            "disabled_integrations": [
                CeleryIntegration(),
                SqlalchemyIntegration(),
                HttpxIntegration(),
            ],
        }
        sentry_sdk.init(**sentry_options)
        _sentry_initialized = True

    # Nothing left to do when telemetry was already configured earlier.
    if _telemetry_initialized:
        return

    from app.core.telemetry import flush_telemetry, setup_telemetry

    setup_telemetry(service_name="kaapi-celery")

    if not _flush_hook_registered:

        # weak=False: the receiver is a nested function, so the signal must
        # hold a strong reference to it.
        @task_postrun.connect(weak=False)
        def _flush_otel_after_task(**_: object) -> None:
            flush_telemetry()

        _flush_hook_registered = True

    _telemetry_initialized = True


@task_prerun.connect
Expand Down Expand Up @@ -84,43 +136,15 @@ def initialize_worker_process(**_) -> None:
- Set up OpenTelemetry so Celery tasks emit spans bridged into Sentry.
- Pre-import LLM modules so first-call latency doesn't pay a cold-import cost.
"""
if settings.SENTRY_DSN:
import sentry_sdk
from sentry_sdk.integrations.celery import CeleryIntegration
from sentry_sdk.integrations.httpx import HttpxIntegration
from sentry_sdk.integrations.logging import LoggingIntegration
from sentry_sdk.integrations.sqlalchemy import SqlalchemyIntegration

sentry_sdk.init(
dsn=str(settings.SENTRY_DSN),
environment=settings.ENVIRONMENT,
release=settings.API_VERSION,
instrumenter="otel",
traces_sample_rate=1.0,
enable_logs=True,
before_send_transaction=before_send_transaction_filter,
integrations=[
LoggingIntegration(
level=logging.INFO,
sentry_logs_level=logging.INFO,
),
],
disabled_integrations=[
CeleryIntegration(),
SqlalchemyIntegration(),
HttpxIntegration(),
],
)

from app.core.telemetry import flush_telemetry, setup_telemetry
_initialize_worker_observability()

setup_telemetry(service_name="kaapi-celery")
import app.services.llm.jobs # noqa: F401

@task_postrun.connect(weak=False)
def _flush_otel_after_task(**_: object) -> None:
flush_telemetry()

import app.services.llm.jobs # noqa: F401
@worker_init.connect
def initialize_worker(**_signal_kwargs) -> None:
    """Run observability setup when the ``worker_init`` signal fires.

    Covers pools other than prefork (e.g. gevent). Any keyword arguments
    supplied by the signal are accepted and ignored.
    """
    _initialize_worker_observability()


# Create Celery instance
Expand Down
39 changes: 39 additions & 0 deletions backend/app/core/sentry_filters.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,42 @@
# Text starting with a SQL verb or `connect` (case-insensitive).
# NOTE(review): presumably matched against span descriptions — confirm in
# `before_send_transaction_filter`.
_SQL_OR_CONNECT = re.compile(r"^(select|insert|update|delete|connect)\b", re.IGNORECASE)
# Text ending in "http send" / "http receive" (case-insensitive).
_HTTP_SEND_RECEIVE = re.compile(r"http (send|receive)$", re.IGNORECASE)
# Exactly "db.query" (case-insensitive).
_DB_QUERY_SPAN = re.compile(r"^db\.query$", re.IGNORECASE)
# A transaction name that is nothing but an HTTP method (GET / HEAD / OPTIONS).
_BARE_HTTP_METHOD = re.compile(r"^(GET|HEAD|OPTIONS)$", re.IGNORECASE)
# Request paths treated as noise: health check, robots.txt, favicon, and
# WordPress-style paths (`/wp-admin*`, `/wp-login*`, `/xmlrpc.php`).
_NOISE_PATH = re.compile(
    r"(^/health/?$|^/robots\.txt$|^/favicon\.ico$|^/wp-admin|^/wp-login|^/xmlrpc\.php$)",
    re.IGNORECASE,
)


def _extract_path(event: dict[str, Any]) -> str:
request = event.get("request")
if not isinstance(request, dict):
return ""

url = request.get("url")
if isinstance(url, str) and url:
if "://" in url:
after_scheme = url.split("://", 1)[1]
if "/" in after_scheme:
return "/" + after_scheme.split("/", 1)[1].split("?", 1)[0]
return "/"
return url.split("?", 1)[0]
return ""


def _should_drop_transaction(event: dict[str, Any]) -> bool:
    """Decide whether a transaction event is noise that should be discarded.

    An event is dropped when either:
    - its transaction name is just an HTTP method ("GET", "HEAD",
      "OPTIONS"), which is how probe traffic shows up in Sentry, or
    - its request path matches one of the known probe / scanner paths.
    """
    name = str(event.get("transaction") or "").strip()
    request_path = _extract_path(event)

    if _BARE_HTTP_METHOD.match(name) is not None:
        return True

    # An empty path never counts as noise.
    return bool(request_path) and _NOISE_PATH.search(request_path) is not None


def before_send_transaction_filter(
Expand All @@ -18,6 +54,9 @@ def before_send_transaction_filter(
- SQL / `connect` spans matched by description prefix
- Custom DB query spans (`db.query`)
"""
if _should_drop_transaction(event):
return None

spans = event.get("spans")
if not isinstance(spans, list):
return event
Expand Down
Loading