From 4fa41adc2379fe71c0f92d5d106a73d99fee876b Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Thu, 11 Dec 2025 20:42:52 -0500 Subject: [PATCH 01/11] feat(backend): Add Sentry error tracking integration (#54) FEATURES: - Sentry SDK integration with FastAPI - Automatic error capture with full stack traces - User context attached to errors (user_id, email) - Performance monitoring (10% sample in production) - Smart filtering (ignores health checks, bot paths) IMPLEMENTATION: - services/sentry.py: init_sentry(), set_user_context(), capture_exception() - main.py: Initialize Sentry before other imports - middleware/auth.py: Set user context after authentication CONFIGURATION: - SENTRY_DSN: Your Sentry project DSN - ENVIRONMENT: development/staging/production Sentry is optional - app works fine without SENTRY_DSN set. Closes #54 --- .env.example | 5 ++ backend/.env.example | 5 ++ backend/main.py | 4 ++ backend/middleware/auth.py | 6 ++ backend/requirements.txt | 3 + backend/services/sentry.py | 137 +++++++++++++++++++++++++++++++++++++ 6 files changed, 160 insertions(+) create mode 100644 backend/services/sentry.py diff --git a/.env.example b/.env.example index 5e19c82..460dccd 100644 --- a/.env.example +++ b/.env.example @@ -34,3 +34,8 @@ ALLOWED_ORIGINS=http://localhost:3000 # Redis (auto-configured in Docker, set REDIS_URL in Railway) REDIS_HOST=redis REDIS_PORT=6379 + +# Sentry Error Tracking (Optional but recommended for production) +# Get DSN from: https://sentry.io → Settings → Projects → Client Keys +SENTRY_DSN= +ENVIRONMENT=development # development, staging, production diff --git a/backend/.env.example b/backend/.env.example index 6ab244f..752713f 100644 --- a/backend/.env.example +++ b/backend/.env.example @@ -18,3 +18,8 @@ ALLOWED_ORIGINS=http://localhost:3000 # Redis Cache REDIS_HOST=localhost REDIS_PORT=6379 + +# Sentry Error Tracking (Optional) +# Get DSN from https://sentry.io → Settings → Projects → Client Keys +SENTRY_DSN= +ENVIRONMENT=development diff --git a/backend/main.py b/backend/main.py index 7ea4307..7ae7605 100644 --- a/backend/main.py +++ b/backend/main.py @@ -10,6 +10,10 @@ from starlette.responses import JSONResponse import os +# Initialize Sentry FIRST (before other imports to catch all errors) +from services.sentry import init_sentry +init_sentry() + # Import API config (single source of truth for versioning) from config.api import API_PREFIX, API_VERSION diff --git a/backend/middleware/auth.py b/backend/middleware/auth.py index 92d590f..d8a3a1e 100644 --- a/backend/middleware/auth.py +++ b/backend/middleware/auth.py @@ -122,11 +122,17 @@ def _authenticate(token: str) -> AuthContext: # Try JWT (Supabase tokens) ctx = _validate_jwt(token) if ctx: + # Set Sentry user context for error tracking + from services.sentry import set_user_context + set_user_context(user_id=ctx.user_id, email=ctx.email) return ctx # Try API key ctx = _validate_api_key(token) if ctx: + # Set Sentry user context for error tracking + from services.sentry import set_user_context + set_user_context(user_id=ctx.user_id or ctx.api_key_name) return ctx # Neither worked diff --git a/backend/requirements.txt b/backend/requirements.txt index f22e096..d066650 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -36,3 +36,6 @@ pyjwt>=2.8.0 # JWT token verification for Supabase Auth pytest>=8.0.0 pytest-asyncio>=0.24.0 pytest-cov>=6.0.0 + +# Observability +sentry-sdk[fastapi]>=2.0.0 diff --git a/backend/services/sentry.py b/backend/services/sentry.py new file mode 100644 index 0000000..7d72713 --- /dev/null +++ b/backend/services/sentry.py @@ -0,0 +1,137 @@ +""" +Sentry Error Tracking Integration +Provides production error visibility and performance monitoring +""" +import os +from typing import Optional + + +def init_sentry() -> bool: + """ + Initialize Sentry SDK if SENTRY_DSN is configured. + + Returns: + bool: True if Sentry was initialized, False otherwise + """ + sentry_dsn = os.getenv("SENTRY_DSN") + + if not sentry_dsn: + print("ℹ️ Sentry DSN not configured - error tracking disabled") + return False + + try: + import sentry_sdk + from sentry_sdk.integrations.fastapi import FastApiIntegration + from sentry_sdk.integrations.starlette import StarletteIntegration + + environment = os.getenv("ENVIRONMENT", "development") + + sentry_sdk.init( + dsn=sentry_dsn, + environment=environment, + + # Performance monitoring - sample 10% of transactions in production + traces_sample_rate=0.1 if environment == "production" else 1.0, + + # Profile 10% of sampled transactions + profiles_sample_rate=0.1, + + # Send PII like user IDs (we need this for debugging) + send_default_pii=True, + + # Integrations + integrations=[ + FastApiIntegration(transaction_style="endpoint"), + StarletteIntegration(transaction_style="endpoint"), + ], + + # Filter out health check noise + before_send=_filter_events, + + # Don't send in debug mode + debug=environment == "development", + ) + + print(f"✅ Sentry initialized (environment: {environment})") + return True + + except ImportError: + print("⚠️ sentry-sdk not installed - error tracking disabled") + return False + except Exception as e: + print(f"⚠️ Failed to initialize Sentry: {e}") + return False + + +def _filter_events(event, hint): + """ + Filter out noisy events before sending to Sentry. + """ + # Don't send health check errors + if "health" in event.get("request", {}).get("url", ""): + return None + + # Don't send 404s for common bot paths + if event.get("exception"): + exception_value = str(event["exception"].get("values", [{}])[0].get("value", "")) + bot_paths = ["/wp-admin", "/wp-login", "/.env", "/config", "/admin"] + if any(path in exception_value for path in bot_paths): + return None + + return event + + +def set_user_context(user_id: Optional[str] = None, email: Optional[str] = None): + """ + Set user context for error tracking. + Call this after authentication to attach user info to errors. + + Args: + user_id: The authenticated user's ID + email: The user's email (optional) + """ + try: + import sentry_sdk + sentry_sdk.set_user({ + "id": user_id, + "email": email, + }) + except ImportError: + pass # Sentry not installed + + +def capture_exception(error: Exception, **extra_context): + """ + Manually capture an exception with additional context. + + Args: + error: The exception to capture + **extra_context: Additional context to attach + """ + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + for key, value in extra_context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_exception(error) + except ImportError: + pass # Sentry not installed + + +def capture_message(message: str, level: str = "info", **extra_context): + """ + Capture a message (not an exception) for tracking. + + Args: + message: The message to capture + level: Severity level (info, warning, error) + **extra_context: Additional context to attach + """ + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + for key, value in extra_context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_message(message, level=level) + except ImportError: + pass # Sentry not installed From 005244532a2229a0d13fab674308afa3a9b34a45 Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Thu, 11 Dec 2025 20:56:55 -0500 Subject: [PATCH 02/11] feat(sentry): Enhance error tracking with operation context and 500 handler ENHANCEMENTS: - Generic 500 exception handler captures all unhandled errors - Operation context tagging (indexing, search) - Search errors now captured with query context - track_background_task decorator for async jobs - sentry_operation context manager for scoped tracking NEW HELPERS: - set_operation_context(): Tag current operation - capture_http_exception(): For custom exception handlers - track_background_task(): Decorator for async functions All 49 tests passing. --- backend/main.py | 15 ++ backend/services/indexer_optimized.py | 5 + backend/services/sentry.py | 216 +++++++++++++++++++++++--- 3 files changed, 212 insertions(+), 24 deletions(-) diff --git a/backend/main.py b/backend/main.py index 7ae7605..a901345 100644 --- a/backend/main.py +++ b/backend/main.py @@ -112,3 +112,18 @@ async def rate_limit_handler(request: Request, exc): status_code=429, content={"detail": "Rate limit exceeded. Please try again later."} ) + + +@app.exception_handler(Exception) +async def generic_exception_handler(request: Request, exc: Exception): + """ + Catch-all handler for unhandled exceptions. + Captures to Sentry and returns 500. + """ + from services.sentry import capture_http_exception + capture_http_exception(request, exc, 500) + + return JSONResponse( + status_code=500, + content={"detail": "Internal server error"} + ) diff --git a/backend/services/indexer_optimized.py b/backend/services/indexer_optimized.py index 06f1f07..2c7403d 100644 --- a/backend/services/indexer_optimized.py +++ b/backend/services/indexer_optimized.py @@ -194,6 +194,9 @@ def _extract_functions(self, tree_node, source_code: bytes) -> List[Dict]: async def index_repository(self, repo_id: str, repo_path: str): """Index all code in a repository - OPTIMIZED VERSION""" + from services.sentry import set_operation_context, capture_exception + + set_operation_context("indexing", repo_id=repo_id) start_time = time.time() print(f"\n🚀 Starting optimized indexing for repo: {repo_id}") print(f"📂 Path: {repo_path}") @@ -386,6 +389,8 @@ async def semantic_search( return formatted_results[:max_results] except Exception as e: + from services.sentry import capture_exception + capture_exception(e, operation="search", repo_id=repo_id, query=query[:100]) print(f"❌ Error searching: {e}") return [] diff --git a/backend/services/sentry.py b/backend/services/sentry.py index 7d72713..6f5b667 100644 --- a/backend/services/sentry.py +++ b/backend/services/sentry.py @@ -3,7 +3,13 @@ Provides production error visibility and performance monitoring """ import os -from typing import Optional +import functools +from typing import Optional, Callable, Any +from contextlib import contextmanager + + +# Global flag to track if Sentry is initialized +_sentry_initialized = False def init_sentry() -> bool: @@ -13,6 +19,7 @@ def init_sentry() -> bool: Returns: bool: True if Sentry was initialized, False otherwise """ + global _sentry_initialized sentry_dsn = os.getenv("SENTRY_DSN") if not sentry_dsn: @@ -30,13 +37,13 @@ def init_sentry() -> bool: dsn=sentry_dsn, environment=environment, - # Performance monitoring - sample 10% of transactions in production + # Performance monitoring - sample 10% in production, 100% in dev traces_sample_rate=0.1 if environment == "production" else 1.0, # Profile 10% of sampled transactions profiles_sample_rate=0.1, - # Send PII like user IDs (we need this for debugging) + # Send PII like user IDs (needed for debugging) send_default_pii=True, # Integrations @@ -48,10 +55,11 @@ def init_sentry() -> bool: # Filter out health check noise before_send=_filter_events, - # Don't send in debug mode + # Debug logging in development debug=environment == "development", ) + _sentry_initialized = True print(f"✅ Sentry initialized (environment: {environment})") return True @@ -64,58 +72,115 @@ def init_sentry() -> bool: def _filter_events(event, hint): - """ - Filter out noisy events before sending to Sentry. - """ + """Filter out noisy events before sending to Sentry.""" # Don't send health check errors - if "health" in event.get("request", {}).get("url", ""): + request_url = event.get("request", {}).get("url", "") + if "/health" in request_url: return None # Don't send 404s for common bot paths if event.get("exception"): - exception_value = str(event["exception"].get("values", [{}])[0].get("value", "")) - bot_paths = ["/wp-admin", "/wp-login", "/.env", "/config", "/admin"] - if any(path in exception_value for path in bot_paths): - return None + values = event["exception"].get("values", [{}]) + if values: + exception_value = str(values[0].get("value", "")) + bot_paths = ["/wp-admin", "/wp-login", "/.env", "/config", "/admin", "/phpmyadmin"] + if any(path in exception_value for path in bot_paths): + return None return event +# --------------------------------------------------------------------------- +# User Context +# --------------------------------------------------------------------------- + def set_user_context(user_id: Optional[str] = None, email: Optional[str] = None): """ Set user context for error tracking. - Call this after authentication to attach user info to errors. - - Args: - user_id: The authenticated user's ID - email: The user's email (optional) + Call after authentication to attach user info to errors. """ + if not _sentry_initialized: + return + try: import sentry_sdk sentry_sdk.set_user({ "id": user_id, "email": email, }) + except Exception: + pass + + +# --------------------------------------------------------------------------- +# Operation Context (for tagging operations like indexing, search) +# --------------------------------------------------------------------------- + +@contextmanager +def sentry_operation(operation: str, **tags): + """ + Context manager to tag operations with context. + + Usage: + with sentry_operation("indexing", repo_id="abc", repo_name="zustand"): + # do indexing work + # any errors here will have repo_id and repo_name tags + """ + if not _sentry_initialized: + yield + return + + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + scope.set_tag("operation", operation) + for key, value in tags.items(): + scope.set_tag(key, str(value)) + yield except ImportError: - pass # Sentry not installed + yield +def set_operation_context(operation: str, **tags): + """ + Set operation context without context manager. + Useful when you can't use 'with' statement. + """ + if not _sentry_initialized: + return + + try: + import sentry_sdk + sentry_sdk.set_tag("operation", operation) + for key, value in tags.items(): + sentry_sdk.set_tag(key, str(value)) + except Exception: + pass + + +# --------------------------------------------------------------------------- +# Exception Capture +# --------------------------------------------------------------------------- + def capture_exception(error: Exception, **extra_context): """ Manually capture an exception with additional context. Args: error: The exception to capture - **extra_context: Additional context to attach + **extra_context: Additional context (repo_id, operation, etc.) """ + if not _sentry_initialized: + return + try: import sentry_sdk with sentry_sdk.push_scope() as scope: for key, value in extra_context.items(): scope.set_extra(key, value) sentry_sdk.capture_exception(error) - except ImportError: - pass # Sentry not installed + except Exception: + pass def capture_message(message: str, level: str = "info", **extra_context): @@ -125,13 +190,116 @@ def capture_message(message: str, level: str = "info", **extra_context): Args: message: The message to capture level: Severity level (info, warning, error) - **extra_context: Additional context to attach + **extra_context: Additional context """ + if not _sentry_initialized: + return + try: import sentry_sdk with sentry_sdk.push_scope() as scope: for key, value in extra_context.items(): scope.set_extra(key, value) sentry_sdk.capture_message(message, level=level) - except ImportError: - pass # Sentry not installed + except Exception: + pass + + +# --------------------------------------------------------------------------- +# Background Task Decorator +# --------------------------------------------------------------------------- + +def track_background_task(operation: str): + """ + Decorator to track background tasks and capture any errors. + + Usage: + @track_background_task("indexing") + async def index_repository(repo_id: str): + # any unhandled exception here will be captured with context + """ + def decorator(func: Callable) -> Callable: + @functools.wraps(func) + async def async_wrapper(*args, **kwargs) -> Any: + if not _sentry_initialized: + return await func(*args, **kwargs) + + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + scope.set_tag("operation", operation) + scope.set_tag("background_task", "true") + # Add function args as context + scope.set_extra("args", str(args)[:500]) + scope.set_extra("kwargs", str(kwargs)[:500]) + + try: + return await func(*args, **kwargs) + except Exception as e: + sentry_sdk.capture_exception(e) + raise # Re-raise so caller knows it failed + except ImportError: + return await func(*args, **kwargs) + + @functools.wraps(func) + def sync_wrapper(*args, **kwargs) -> Any: + if not _sentry_initialized: + return func(*args, **kwargs) + + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + scope.set_tag("operation", operation) + scope.set_tag("background_task", "true") + scope.set_extra("args", str(args)[:500]) + scope.set_extra("kwargs", str(kwargs)[:500]) + + try: + return func(*args, **kwargs) + except Exception as e: + sentry_sdk.capture_exception(e) + raise + except ImportError: + return func(*args, **kwargs) + + # Return appropriate wrapper based on function type + if asyncio_iscoroutinefunction(func): + return async_wrapper + return sync_wrapper + + return decorator + + +def asyncio_iscoroutinefunction(func): + """Check if function is async.""" + import asyncio + return asyncio.iscoroutinefunction(func) + + +# --------------------------------------------------------------------------- +# HTTP Exception Handler Helper +# --------------------------------------------------------------------------- + +def capture_http_exception(request, exc, status_code: int): + """ + Capture HTTP exceptions that would otherwise be swallowed. + Call this from FastAPI exception handlers for 500+ errors. + + Args: + request: FastAPI request object + exc: The exception + status_code: HTTP status code being returned + """ + # Only capture server errors (5xx) + if status_code < 500 or not _sentry_initialized: + return + + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + scope.set_tag("http_status", str(status_code)) + scope.set_extra("path", str(request.url.path)) + scope.set_extra("method", request.method) + sentry_sdk.capture_exception(exc) + except Exception: + pass From d789ec7898a907bbe4a4252a940a06131e40f92b Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Thu, 11 Dec 2025 21:46:44 -0500 Subject: [PATCH 03/11] feat(observability): Add unified observability module FEATURES: - Structured logging (JSON in prod, pretty in dev) - OperationContext manager for scoped context - trace_operation() for Sentry spans - track_performance() decorator - Simple metrics counters USAGE: from services.observability import get_logger, OperationContext, metrics logger = get_logger('indexer') with OperationContext(operation='indexing', repo_id='abc'): logger.info('Processing file', context={'file': 'main.py'}) metrics.increment('files_processed') Part of #54 - comprehensive observability --- backend/services/observability.py | 417 ++++++++++++++++++++++++++++++ 1 file changed, 417 insertions(+) create mode 100644 backend/services/observability.py diff --git a/backend/services/observability.py b/backend/services/observability.py new file mode 100644 index 0000000..c9d1d0a --- /dev/null +++ b/backend/services/observability.py @@ -0,0 +1,417 @@ +""" +Observability Module +Unified logging, tracing, and metrics for CodeIntel backend + +Features: +- Structured JSON logging (production) / Pretty logging (development) +- Sentry integration with context managers +- Performance tracking decorators +- Operation-level breadcrumbs +""" +import os +import sys +import json +import time +import logging +import functools +from typing import Optional, Dict, Any, Callable +from contextlib import contextmanager +from datetime import datetime + + +# ============================================================================ +# CONFIGURATION +# ============================================================================ + +ENVIRONMENT = os.getenv("ENVIRONMENT", "development") +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() +JSON_LOGS = ENVIRONMENT == "production" + + +# ============================================================================ +# STRUCTURED LOGGER +# ============================================================================ + +class StructuredFormatter(logging.Formatter): + """JSON formatter for production, pretty formatter for development""" + + def format(self, record: logging.LogRecord) -> str: + log_data = { + "timestamp": datetime.utcnow().isoformat() + "Z", + "level": record.levelname, + "logger": record.name, + "message": record.getMessage(), + } + + # Add extra fields if present + if hasattr(record, "extra_fields"): + log_data.update(record.extra_fields) + + # Add exception info if present + if record.exc_info: + log_data["exception"] = self.formatException(record.exc_info) + + if JSON_LOGS: + return json.dumps(log_data) + else: + # Pretty format for development + extra = "" + if hasattr(record, "extra_fields") and record.extra_fields: + extra = " | " + " ".join(f"{k}={v}" for k, v in record.extra_fields.items()) + + level_colors = { + "DEBUG": "\033[36m", # Cyan + "INFO": "\033[32m", # Green + "WARNING": "\033[33m", # Yellow + "ERROR": "\033[31m", # Red + "CRITICAL": "\033[35m", # Magenta + } + reset = "\033[0m" + color = level_colors.get(record.levelname, "") + + return f"{color}[{record.levelname}]{reset} {record.name}: {record.getMessage()}{extra}" + + +class ContextLogger(logging.Logger): + """Logger that supports extra context fields""" + + def _log_with_context(self, level: int, msg: str, context: Dict = None, **kwargs): + if context: + # Create a new record with extra fields + extra = kwargs.get("extra", {}) + extra["extra_fields"] = context + kwargs["extra"] = extra + super()._log(level, msg, (), **kwargs) + + def info(self, msg: str, context: Dict = None, **kwargs): + self._log_with_context(logging.INFO, msg, context, **kwargs) + + def warning(self, msg: str, context: Dict = None, **kwargs): + self._log_with_context(logging.WARNING, msg, context, **kwargs) + + def error(self, msg: str, context: Dict = None, **kwargs): + self._log_with_context(logging.ERROR, msg, context, **kwargs) + + def debug(self, msg: str, context: Dict = None, **kwargs): + self._log_with_context(logging.DEBUG, msg, context, **kwargs) + + +def get_logger(name: str) -> ContextLogger: + """Get a configured logger instance""" + logging.setLoggerClass(ContextLogger) + logger = logging.getLogger(name) + + if not logger.handlers: + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(StructuredFormatter()) + logger.addHandler(handler) + logger.setLevel(getattr(logging, LOG_LEVEL)) + + return logger + + +# ============================================================================ +# SENTRY INTEGRATION +# ============================================================================ + +def _sentry_available() -> bool: + """Check if Sentry is initialized""" + try: + import sentry_sdk + return sentry_sdk.Hub.current.client is not None + except ImportError: + return False + + +def set_context(name: str, data: Dict[str, Any]): + """Set Sentry context for current scope""" + if not _sentry_available(): + return + + import sentry_sdk + sentry_sdk.set_context(name, data) + + +def set_tag(key: str, value: str): + """Set a tag for the current Sentry scope""" + if not _sentry_available(): + return + + import sentry_sdk + sentry_sdk.set_tag(key, value) + + +def add_breadcrumb(message: str, category: str = "default", level: str = "info", data: Dict = None): + """Add a breadcrumb for debugging""" + if not _sentry_available(): + return + + import sentry_sdk + sentry_sdk.add_breadcrumb( + message=message, + category=category, + level=level, + data=data or {} + ) + + +def capture_exception(error: Exception, **extra_context): + """Capture exception with extra context""" + if not _sentry_available(): + return + + import sentry_sdk + with sentry_sdk.push_scope() as scope: + for key, value in extra_context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_exception(error) + + +def capture_message(message: str, level: str = "info", **extra_context): + """Capture a message with context""" + if not _sentry_available(): + return + + import sentry_sdk + with sentry_sdk.push_scope() as scope: + for key, value in extra_context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_message(message, level=level) + + +# ============================================================================ +# PERFORMANCE TRACKING +# ============================================================================ + +@contextmanager +def trace_operation(operation_name: str, **tags): + """ + Context manager for tracing operations with Sentry spans. + + Usage: + with trace_operation("indexing", repo_id="abc"): + # ... do work ... + """ + logger = get_logger("trace") + start_time = time.time() + + # Set tags + for key, value in tags.items(): + set_tag(key, str(value)) + + # Add breadcrumb + add_breadcrumb( + message=f"Started {operation_name}", + category="operation", + data=tags + ) + + if _sentry_available(): + import sentry_sdk + with sentry_sdk.start_span(op=operation_name, description=operation_name) as span: + for key, value in tags.items(): + span.set_tag(key, str(value)) + try: + yield span + except Exception as e: + span.set_status("error") + duration = time.time() - start_time + logger.error(f"{operation_name} failed after {duration:.2f}s", context={"error": str(e), **tags}) + raise + finally: + duration = time.time() - start_time + span.set_data("duration_seconds", duration) + else: + try: + yield None + except Exception as e: + duration = time.time() - start_time + logger.error(f"{operation_name} failed after {duration:.2f}s", context={"error": str(e), **tags}) + raise + finally: + duration = time.time() - start_time + logger.debug(f"{operation_name} completed in {duration:.2f}s", context=tags) + + +def track_performance(operation_name: str = None): + """ + Decorator for tracking function performance. + + Usage: + @track_performance("search") + async def search_code(query: str, repo_id: str): + ... + """ + def decorator(func: Callable): + op_name = operation_name or func.__name__ + + @functools.wraps(func) + async def async_wrapper(*args, **kwargs): + with trace_operation(op_name): + return await func(*args, **kwargs) + + @functools.wraps(func) + def sync_wrapper(*args, **kwargs): + with trace_operation(op_name): + return func(*args, **kwargs) + + if asyncio_iscoroutinefunction(func): + return async_wrapper + return sync_wrapper + + return decorator + + +def asyncio_iscoroutinefunction(func): + """Check if function is async""" + import asyncio + return asyncio.iscoroutinefunction(func) + + +# ============================================================================ +# OPERATION CONTEXT +# ============================================================================ + +class OperationContext: + """ + Context manager for setting operation-level context. + + Usage: + with OperationContext(operation="indexing", repo_id="abc", user_id="xyz"): + # All errors/logs within this block have context attached + do_indexing() + """ + + def __init__(self, operation: str, **context): + self.operation = operation + self.context = context + self.logger = get_logger(f"op.{operation}") + self.start_time = None + + def __enter__(self): + self.start_time = time.time() + + # Set Sentry context + set_context("operation", { + "name": self.operation, + "started_at": datetime.utcnow().isoformat(), + **self.context + }) + + # Set tags for filtering + set_tag("operation", self.operation) + for key, value in self.context.items(): + if key in ("repo_id", "user_id"): + set_tag(key, str(value)) + + self.logger.info(f"Starting {self.operation}", context=self.context) + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + duration = time.time() - self.start_time + + if exc_type: + self.logger.error( + f"{self.operation} failed after {duration:.2f}s", + context={"error": str(exc_val), "duration": duration, **self.context} + ) + capture_exception(exc_val, operation=self.operation, duration=duration, **self.context) + else: + self.logger.info( + f"{self.operation} completed in {duration:.2f}s", + context={"duration": duration, **self.context} + ) + + return False # Don't suppress exceptions + + def log_progress(self, message: str, **extra): + """Log progress within operation""" + add_breadcrumb(message, category=self.operation, data=extra) + self.logger.info(message, context={**self.context, **extra}) + + def log_warning(self, message: str, **extra): + """Log warning within operation""" + self.logger.warning(message, context={**self.context, **extra}) + + +# ============================================================================ +# METRICS (Simple counters - can be extended with Prometheus later) +# ============================================================================ + +class Metrics: + """Simple in-memory metrics (singleton)""" + + _instance = None + + def __new__(cls): + if cls._instance is None: + cls._instance = super().__new__(cls) + cls._instance._counters = {} + cls._instance._gauges = {} + cls._instance._histograms = {} + return cls._instance + + def increment(self, name: str, value: int = 1, tags: Dict = None): + """Increment a counter""" + key = self._make_key(name, tags) + self._counters[key] = self._counters.get(key, 0) + value + + def gauge(self, name: str, value: float, tags: Dict = None): + """Set a gauge value""" + key = self._make_key(name, tags) + self._gauges[key] = value + + def histogram(self, name: str, value: float, tags: Dict = None): + """Record a histogram value""" + key = self._make_key(name, tags) + if key not in self._histograms: + self._histograms[key] = [] + self._histograms[key].append(value) + # Keep last 1000 values + self._histograms[key] = self._histograms[key][-1000:] + + def _make_key(self, name: str, tags: Dict = None) -> str: + if not tags: + return name + tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items())) + return f"{name}{{{tag_str}}}" + + def get_all(self) -> Dict: + """Get all metrics""" + return { + "counters": self._counters.copy(), + "gauges": self._gauges.copy(), + "histograms": {k: {"count": len(v), "avg": sum(v)/len(v) if v else 0} + for k, v in self._histograms.items()} + } + + +# Global metrics instance +metrics = Metrics() + + +# ============================================================================ +# CONVENIENCE EXPORTS +# ============================================================================ + +__all__ = [ + # Logging + "get_logger", + + # Sentry + "set_context", + "set_tag", + "add_breadcrumb", + "capture_exception", + "capture_message", + + # Performance + "trace_operation", + "track_performance", + + # Context + "OperationContext", + + # Metrics + "metrics", +] From 9c28a712ba0776edde20b0184833ae4002364f9b Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Thu, 11 Dec 2025 22:07:13 -0500 Subject: [PATCH 04/11] feat(observability): Add comprehensive observability module (#54) NEW: backend/services/observability.py - StructuredLogger: JSON logs (prod) / Pretty logs (dev) - operation_context: Context manager for operation tracking - track_performance: Decorator for Sentry spans - Metrics: Simple counters/gauges/histograms - Breadcrumbs: Debug trail for errors - capture_exception/capture_message: Manual error capture UPDATED: backend/services/sentry.py - Kept init_sentry() and set_user_context() - Re-exports observability functions for convenience - Cleaner separation of concerns Usage: from services.observability import get_logger, operation_context logger = get_logger('indexer') with operation_context('indexing', repo_id='abc') as ctx: logger.info('Starting', files=100) ctx.set_extra('functions', 340) --- backend/services/observability.py | 573 +++++++++++++++--------------- backend/services/sentry.py | 261 ++------------ 2 files changed, 333 insertions(+), 501 deletions(-) diff --git a/backend/services/observability.py b/backend/services/observability.py index c9d1d0a..af40822 100644 --- a/backend/services/observability.py +++ b/backend/services/observability.py @@ -1,12 +1,12 @@ """ Observability Module -Unified logging, tracing, and metrics for CodeIntel backend +Centralized logging, tracing, and metrics for CodeIntel backend. Features: -- Structured JSON logging (production) / Pretty logging (development) -- Sentry integration with context managers +- Structured JSON logging (prod) / Pretty logging (dev) +- Sentry integration with context management - Performance tracking decorators -- Operation-level breadcrumbs +- Operation context propagation """ import os import sys @@ -25,236 +25,291 @@ ENVIRONMENT = os.getenv("ENVIRONMENT", "development") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() -JSON_LOGS = ENVIRONMENT == "production" +LOG_FORMAT = os.getenv("LOG_FORMAT", "pretty" if ENVIRONMENT == "development" else "json") # ============================================================================ # STRUCTURED LOGGER # ============================================================================ -class StructuredFormatter(logging.Formatter): - """JSON formatter for production, pretty formatter for development""" +class StructuredLogger: + """ + Structured logger with JSON output for production and pretty output for dev. - def format(self, record: logging.LogRecord) -> str: - log_data = { - "timestamp": datetime.utcnow().isoformat() + "Z", - "level": record.levelname, - "logger": record.name, - "message": record.getMessage(), - } - - # Add extra fields if present - if hasattr(record, "extra_fields"): - log_data.update(record.extra_fields) + Usage: + logger = get_logger("indexer") + logger.info("Starting indexing", repo_id="abc", files=120) + logger.error("Failed to index", error=str(e), repo_id="abc") + """ + + def __init__(self, name: str): + self.name = name + self.logger = logging.getLogger(name) + self.logger.setLevel(getattr(logging, LOG_LEVEL)) - # Add exception info if present - if record.exc_info: - log_data["exception"] = self.formatException(record.exc_info) + # Avoid duplicate handlers + if not self.logger.handlers: + handler = logging.StreamHandler(sys.stdout) + handler.setFormatter(logging.Formatter('%(message)s')) + self.logger.addHandler(handler) + + def _format_message(self, level: str, message: str, **kwargs) -> str: + """Format log message based on environment""" + timestamp = datetime.utcnow().isoformat() + "Z" - if JSON_LOGS: + if LOG_FORMAT == "json": + log_data = { + "timestamp": timestamp, + "level": level, + "logger": self.name, + "message": message, + **kwargs + } return json.dumps(log_data) else: # Pretty format for development - extra = "" - if hasattr(record, "extra_fields") and record.extra_fields: - extra = " | " + " ".join(f"{k}={v}" for k, v in record.extra_fields.items()) - - level_colors = { - "DEBUG": "\033[36m", # Cyan - "INFO": "\033[32m", # Green - "WARNING": "\033[33m", # Yellow - "ERROR": "\033[31m", # Red - "CRITICAL": "\033[35m", # Magenta + level_icons = { + "DEBUG": "🔍", + "INFO": "ℹ️ ", + "WARNING": "⚠️ ", + "ERROR": "❌", + "CRITICAL": "🔥" } - reset = "\033[0m" - color = level_colors.get(record.levelname, "") + icon = level_icons.get(level, "•") - return f"{color}[{record.levelname}]{reset} {record.name}: {record.getMessage()}{extra}" - - -class ContextLogger(logging.Logger): - """Logger that supports extra context fields""" + extra = "" + if kwargs: + extra = " | " + " ".join(f"{k}={v}" for k, v in kwargs.items()) + + return f"{icon} [{self.name}] {message}{extra}" - def _log_with_context(self, level: int, msg: str, context: Dict = None, **kwargs): - if context: - # Create a new record with extra fields - extra = kwargs.get("extra", {}) - extra["extra_fields"] = context - kwargs["extra"] = extra - super()._log(level, msg, (), **kwargs) + def debug(self, message: str, **kwargs): + self.logger.debug(self._format_message("DEBUG", message, **kwargs)) - def info(self, msg: str, context: Dict = None, **kwargs): - self._log_with_context(logging.INFO, msg, context, **kwargs) + def info(self, message: str, **kwargs): + self.logger.info(self._format_message("INFO", message, **kwargs)) - def warning(self, msg: str, context: Dict = None, **kwargs): - self._log_with_context(logging.WARNING, msg, context, **kwargs) + def warning(self, message: str, **kwargs): + self.logger.warning(self._format_message("WARNING", message, **kwargs)) - def error(self, msg: str, context: Dict = None, **kwargs): - self._log_with_context(logging.ERROR, msg, context, **kwargs) + def error(self, message: str, **kwargs): + self.logger.error(self._format_message("ERROR", message, **kwargs)) - def debug(self, msg: str, context: Dict = None, **kwargs): - self._log_with_context(logging.DEBUG, msg, context, **kwargs) + def critical(self, message: str, **kwargs): + self.logger.critical(self._format_message("CRITICAL", message, **kwargs)) -def get_logger(name: str) -> ContextLogger: - """Get a configured logger instance""" - logging.setLoggerClass(ContextLogger) - logger = logging.getLogger(name) - - if not logger.handlers: - handler = logging.StreamHandler(sys.stdout) - handler.setFormatter(StructuredFormatter()) - logger.addHandler(handler) - logger.setLevel(getattr(logging, LOG_LEVEL)) - - return logger +# Logger cache +_loggers: Dict[str, StructuredLogger] = {} + +def get_logger(name: str) -> StructuredLogger: + """Get or create a structured logger""" + if name not in _loggers: + _loggers[name] = StructuredLogger(name) + return _loggers[name] # ============================================================================ -# SENTRY INTEGRATION +# SENTRY CONTEXT MANAGEMENT # ============================================================================ -def _sentry_available() -> bool: - """Check if Sentry is initialized""" +def set_operation_context( + operation: str, + repo_id: Optional[str] = None, + user_id: Optional[str] = None, + **extra +): + """ + Set operation context for Sentry error tracking. + + Usage: + set_operation_context("indexing", repo_id="abc", files=120) + """ try: import sentry_sdk - return sentry_sdk.Hub.current.client is not None + + sentry_sdk.set_tag("operation", operation) + + if repo_id: + sentry_sdk.set_tag("repo_id", repo_id) + if user_id: + sentry_sdk.set_user({"id": user_id}) + + for key, value in extra.items(): + sentry_sdk.set_extra(key, value) + except ImportError: - return False - - -def set_context(name: str, data: Dict[str, Any]): - """Set Sentry context for current scope""" - if not _sentry_available(): - return - - import sentry_sdk - sentry_sdk.set_context(name, data) - - -def set_tag(key: str, value: str): - """Set a tag for the current Sentry scope""" - if not _sentry_available(): - return - - import sentry_sdk - sentry_sdk.set_tag(key, value) - - -def add_breadcrumb(message: str, category: str = "default", level: str = "info", data: Dict = None): - """Add a breadcrumb for debugging""" - if not _sentry_available(): - return - - import sentry_sdk - sentry_sdk.add_breadcrumb( - message=message, - category=category, - level=level, - data=data or {} - ) - - -def capture_exception(error: Exception, **extra_context): - """Capture exception with extra context""" - if not _sentry_available(): - return - - import sentry_sdk - with sentry_sdk.push_scope() as scope: - for key, value in extra_context.items(): - scope.set_extra(key, value) - sentry_sdk.capture_exception(error) + pass # Sentry not installed -def capture_message(message: str, level: str = "info", **extra_context): - """Capture a message with context""" - if not _sentry_available(): - return +def add_breadcrumb( + message: str, + category: str = "operation", + level: str = "info", + **data +): + """ + Add breadcrumb for debugging error context. - import sentry_sdk - with sentry_sdk.push_scope() as scope: - for key, value in extra_context.items(): - scope.set_extra(key, value) - sentry_sdk.capture_message(message, level=level) - + Usage: + add_breadcrumb("Cloned repository", category="git", repo_id="abc") + add_breadcrumb("Extracted 340 functions", category="indexing") + """ + try: + import sentry_sdk + sentry_sdk.add_breadcrumb( + message=message, + category=category, + level=level, + data=data + ) + except ImportError: + pass -# ============================================================================ -# PERFORMANCE TRACKING -# ============================================================================ @contextmanager -def trace_operation(operation_name: str, **tags): +def operation_context( + operation: str, + repo_id: Optional[str] = None, + user_id: Optional[str] = None, + **extra +): """ - Context manager for tracing operations with Sentry spans. + Context manager for operation tracking with automatic error capture. Usage: - with trace_operation("indexing", repo_id="abc"): - # ... do work ... + with operation_context("indexing", repo_id="abc") as ctx: + # do work + ctx.set_extra("functions_indexed", 340) """ - logger = get_logger("trace") + logger = get_logger(operation) start_time = time.time() - # Set tags - for key, value in tags.items(): - set_tag(key, str(value)) + # Set Sentry context + set_operation_context(operation, repo_id, user_id, **extra) + add_breadcrumb(f"Started {operation}", category=operation, repo_id=repo_id) - # Add breadcrumb - add_breadcrumb( - message=f"Started {operation_name}", - category="operation", - data=tags - ) - - if _sentry_available(): - import sentry_sdk - with sentry_sdk.start_span(op=operation_name, description=operation_name) as span: - for key, value in tags.items(): - span.set_tag(key, str(value)) + class Context: + def __init__(self): + self.extras = {} + + def set_extra(self, key: str, value: Any): + self.extras[key] = value try: - yield span - except Exception as e: - span.set_status("error") - duration = time.time() - start_time - logger.error(f"{operation_name} failed after {duration:.2f}s", context={"error": str(e), **tags}) - raise - finally: - duration = time.time() - start_time - span.set_data("duration_seconds", duration) - else: + import sentry_sdk + sentry_sdk.set_extra(key, value) + except ImportError: + pass + + ctx = Context() + + try: + logger.info(f"Starting {operation}", repo_id=repo_id, **extra) + yield ctx + + duration = time.time() - start_time + logger.info( + f"Completed {operation}", + repo_id=repo_id, + duration_s=round(duration, 2), + **ctx.extras + ) + add_breadcrumb( + f"Completed {operation}", + category=operation, + duration_s=round(duration, 2) + ) + + except Exception as e: + duration = time.time() - start_time + logger.error( + f"Failed {operation}", + repo_id=repo_id, + error=str(e), + duration_s=round(duration, 2), + **ctx.extras + ) + + # Capture to Sentry with full context try: - yield None - except Exception as e: - duration = time.time() - start_time - logger.error(f"{operation_name} failed after {duration:.2f}s", context={"error": str(e), **tags}) - raise - finally: - duration = time.time() - start_time - logger.debug(f"{operation_name} completed in {duration:.2f}s", context=tags) + import sentry_sdk + with sentry_sdk.push_scope() as scope: + scope.set_extra("duration_s", round(duration, 2)) + for key, value in ctx.extras.items(): + scope.set_extra(key, value) + sentry_sdk.capture_exception(e) + except ImportError: + pass + + raise -def track_performance(operation_name: str = None): +# ============================================================================ +# PERFORMANCE TRACKING +# ============================================================================ + +def track_performance(operation_name: Optional[str] = None): """ - Decorator for tracking function performance. + Decorator to track function performance with Sentry spans. Usage: - @track_performance("search") - async def search_code(query: str, repo_id: str): + @track_performance("embedding_generation") + async def generate_embeddings(texts): ... """ def decorator(func: Callable): - op_name = operation_name or func.__name__ + name = operation_name or func.__name__ @functools.wraps(func) async def async_wrapper(*args, **kwargs): - with trace_operation(op_name): - return await func(*args, **kwargs) + logger = get_logger(name) + start_time = time.time() + + try: + # Try to create Sentry span + try: + import sentry_sdk + with sentry_sdk.start_span(op=name, description=name) as span: + result = await func(*args, **kwargs) + span.set_data("success", True) + return result + except ImportError: + return await func(*args, **kwargs) + + except Exception as e: + duration = time.time() - start_time + logger.error(f"{name} failed", error=str(e), duration_s=round(duration, 2)) + raise + finally: + duration = time.time() - start_time + if duration > 5.0: # Log slow operations + logger.warning(f"{name} slow", duration_s=round(duration, 2)) @functools.wraps(func) def sync_wrapper(*args, **kwargs): - with trace_operation(op_name): - return func(*args, **kwargs) + logger = get_logger(name) + start_time = time.time() + + try: + try: + import sentry_sdk + with sentry_sdk.start_span(op=name, description=name) as span: + result = func(*args, **kwargs) + span.set_data("success", True) + return result + except ImportError: + return func(*args, **kwargs) + + except Exception as e: + duration = time.time() - start_time + logger.error(f"{name} failed", error=str(e), duration_s=round(duration, 2)) + raise + finally: + duration = time.time() - start_time + if duration > 5.0: + logger.warning(f"{name} slow", duration_s=round(duration, 2)) + # Return appropriate wrapper based on function type if asyncio_iscoroutinefunction(func): return async_wrapper return sync_wrapper @@ -269,120 +324,60 @@ def asyncio_iscoroutinefunction(func): # ============================================================================ -# OPERATION CONTEXT +# METRICS (Simple counters - can be extended to Prometheus later) # ============================================================================ -class OperationContext: +class Metrics: """ - Context manager for setting operation-level context. - - Usage: - with OperationContext(operation="indexing", repo_id="abc", user_id="xyz"): - # All errors/logs within this block have context attached - do_indexing() + Simple in-memory metrics counters. + Can be extended to push to Prometheus/StatsD later. """ - def __init__(self, operation: str, **context): - self.operation = operation - self.context = context - self.logger = get_logger(f"op.{operation}") - self.start_time = None - - def __enter__(self): - self.start_time = time.time() - - # Set Sentry context - set_context("operation", { - "name": self.operation, - "started_at": datetime.utcnow().isoformat(), - **self.context - }) - - # Set tags for filtering - set_tag("operation", self.operation) - for key, value in self.context.items(): - if key in ("repo_id", "user_id"): - set_tag(key, str(value)) - - self.logger.info(f"Starting {self.operation}", context=self.context) - return self - - def __exit__(self, exc_type, exc_val, exc_tb): - duration = time.time() - self.start_time - - if exc_type: - self.logger.error( - f"{self.operation} failed after {duration:.2f}s", - context={"error": str(exc_val), "duration": duration, **self.context} - ) - capture_exception(exc_val, operation=self.operation, duration=duration, **self.context) - else: - self.logger.info( - f"{self.operation} completed in {duration:.2f}s", - context={"duration": duration, **self.context} - ) - - return False # Don't suppress exceptions - - def log_progress(self, message: str, **extra): - """Log progress within operation""" - add_breadcrumb(message, category=self.operation, data=extra) - self.logger.info(message, context={**self.context, **extra}) + def __init__(self): + self._counters: Dict[str, int] = {} + self._gauges: Dict[str, float] = {} + self._histograms: Dict[str, list] = {} - def log_warning(self, message: str, **extra): - """Log warning within operation""" - self.logger.warning(message, context={**self.context, **extra}) - - -# ============================================================================ -# METRICS (Simple counters - can be extended with Prometheus later) -# ============================================================================ - -class Metrics: - """Simple in-memory metrics (singleton)""" - - _instance = None - - def __new__(cls): - if cls._instance is None: - cls._instance = super().__new__(cls) - cls._instance._counters = {} - cls._instance._gauges = {} - cls._instance._histograms = {} - return cls._instance - - def increment(self, name: str, value: int = 1, tags: Dict = None): + def increment(self, name: str, value: int = 1, **tags): """Increment a counter""" key = self._make_key(name, tags) self._counters[key] = self._counters.get(key, 0) + value - def gauge(self, name: str, value: float, tags: Dict = None): + def gauge(self, name: str, value: float, **tags): """Set a gauge value""" key = self._make_key(name, tags) self._gauges[key] = value - def histogram(self, name: str, value: float, tags: Dict = None): + def histogram(self, name: str, value: float, **tags): """Record a histogram value""" key = self._make_key(name, tags) if key not in self._histograms: self._histograms[key] = [] self._histograms[key].append(value) - # Keep last 1000 values - self._histograms[key] = self._histograms[key][-1000:] + # Keep only last 1000 values + if len(self._histograms[key]) > 1000: + self._histograms[key] = self._histograms[key][-1000:] - def _make_key(self, name: str, tags: Dict = None) -> str: + def _make_key(self, name: str, tags: Dict) -> str: if not tags: return name tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items())) return f"{name}{{{tag_str}}}" - def get_all(self) -> Dict: - """Get all metrics""" + def get_stats(self) -> Dict: + """Get all metrics for debugging/monitoring endpoint""" return { "counters": self._counters.copy(), "gauges": self._gauges.copy(), - "histograms": {k: {"count": len(v), "avg": sum(v)/len(v) if v else 0} - for k, v in self._histograms.items()} + "histograms": { + k: { + "count": len(v), + "avg": sum(v) / len(v) if v else 0, + "min": min(v) if v else 0, + "max": max(v) if v else 0 + } + for k, v in self._histograms.items() + } } @@ -391,27 +386,47 @@ def get_all(self) -> Dict: # ============================================================================ -# CONVENIENCE EXPORTS +# CONVENIENCE FUNCTIONS # ============================================================================ -__all__ = [ - # Logging - "get_logger", +def capture_exception(error: Exception, **context): + """ + Capture exception to Sentry with additional context. - # Sentry - "set_context", - "set_tag", - "add_breadcrumb", - "capture_exception", - "capture_message", + Usage: + try: + risky_operation() + except Exception as e: + capture_exception(e, repo_id="abc", operation="indexing") + """ + logger = get_logger("error") + logger.error("Exception captured", error=str(error), **context) - # Performance - "trace_operation", - "track_performance", + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + for key, value in context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_exception(error) + except ImportError: + pass + + +def capture_message(message: str, level: str = "info", **context): + """ + Capture a message to Sentry. - # Context - "OperationContext", + Usage: + capture_message("Unusual pattern detected", level="warning", pattern="...") + """ + logger = get_logger("message") + getattr(logger, level)(message, **context) - # Metrics - "metrics", -] + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + for key, value in context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_message(message, level=level) + except ImportError: + pass diff --git a/backend/services/sentry.py b/backend/services/sentry.py index 6f5b667..c9a222c 100644 --- a/backend/services/sentry.py +++ b/backend/services/sentry.py @@ -1,15 +1,12 @@ """ Sentry Error Tracking Integration -Provides production error visibility and performance monitoring +Provides production error visibility and performance monitoring. + +For logging and context management, use observability module: + from services.observability import get_logger, operation_context, capture_exception """ import os -import functools -from typing import Optional, Callable, Any -from contextlib import contextmanager - - -# Global flag to track if Sentry is initialized -_sentry_initialized = False +from typing import Optional def init_sentry() -> bool: @@ -19,7 +16,6 @@ def init_sentry() -> bool: Returns: bool: True if Sentry was initialized, False otherwise """ - global _sentry_initialized sentry_dsn = os.getenv("SENTRY_DSN") if not sentry_dsn: @@ -37,13 +33,11 @@ def init_sentry() -> bool: dsn=sentry_dsn, environment=environment, - # Performance monitoring - sample 10% in production, 100% in dev + # Performance monitoring traces_sample_rate=0.1 if environment == "production" else 1.0, - - # Profile 10% of sampled transactions profiles_sample_rate=0.1, - # Send PII like user IDs (needed for debugging) + # Send PII for debugging send_default_pii=True, # Integrations @@ -52,14 +46,13 @@ def init_sentry() -> bool: StarletteIntegration(transaction_style="endpoint"), ], - # Filter out health check noise + # Filter noisy events before_send=_filter_events, - # Debug logging in development + # Debug in development debug=environment == "development", ) - _sentry_initialized = True print(f"✅ Sentry initialized (environment: {environment})") return True @@ -78,7 +71,7 @@ def _filter_events(event, hint): if "/health" in request_url: return None - # Don't send 404s for common bot paths + # Don't send 404s for bot paths if event.get("exception"): values = event["exception"].get("values", [{}]) if values: @@ -90,216 +83,40 @@ def _filter_events(event, hint): return event -# --------------------------------------------------------------------------- -# User Context -# --------------------------------------------------------------------------- +# ============================================================================ +# BACKWARD COMPATIBILITY - Delegate to observability module +# ============================================================================ def set_user_context(user_id: Optional[str] = None, email: Optional[str] = None): - """ - Set user context for error tracking. - Call after authentication to attach user info to errors. - """ - if not _sentry_initialized: - return - - try: - import sentry_sdk - sentry_sdk.set_user({ - "id": user_id, - "email": email, - }) - except Exception: - pass - - -# --------------------------------------------------------------------------- -# Operation Context (for tagging operations like indexing, search) -# --------------------------------------------------------------------------- - -@contextmanager -def sentry_operation(operation: str, **tags): - """ - Context manager to tag operations with context. - - Usage: - with sentry_operation("indexing", repo_id="abc", repo_name="zustand"): - # do indexing work - # any errors here will have repo_id and repo_name tags - """ - if not _sentry_initialized: - yield - return - + """Set user context for error tracking.""" try: import sentry_sdk - with sentry_sdk.push_scope() as scope: - scope.set_tag("operation", operation) - for key, value in tags.items(): - scope.set_tag(key, str(value)) - yield + sentry_sdk.set_user({"id": user_id, "email": email}) except ImportError: - yield - - -def set_operation_context(operation: str, **tags): - """ - Set operation context without context manager. - Useful when you can't use 'with' statement. - """ - if not _sentry_initialized: - return - - try: - import sentry_sdk - sentry_sdk.set_tag("operation", operation) - for key, value in tags.items(): - sentry_sdk.set_tag(key, str(value)) - except Exception: - pass - - -# --------------------------------------------------------------------------- -# Exception Capture -# --------------------------------------------------------------------------- - -def capture_exception(error: Exception, **extra_context): - """ - Manually capture an exception with additional context. - - Args: - error: The exception to capture - **extra_context: Additional context (repo_id, operation, etc.) - """ - if not _sentry_initialized: - return - - try: - import sentry_sdk - with sentry_sdk.push_scope() as scope: - for key, value in extra_context.items(): - scope.set_extra(key, value) - sentry_sdk.capture_exception(error) - except Exception: - pass - - -def capture_message(message: str, level: str = "info", **extra_context): - """ - Capture a message (not an exception) for tracking. - - Args: - message: The message to capture - level: Severity level (info, warning, error) - **extra_context: Additional context - """ - if not _sentry_initialized: - return - - try: - import sentry_sdk - with sentry_sdk.push_scope() as scope: - for key, value in extra_context.items(): - scope.set_extra(key, value) - sentry_sdk.capture_message(message, level=level) - except Exception: pass -# --------------------------------------------------------------------------- -# Background Task Decorator -# --------------------------------------------------------------------------- - -def track_background_task(operation: str): - """ - Decorator to track background tasks and capture any errors. - - Usage: - @track_background_task("indexing") - async def index_repository(repo_id: str): - # any unhandled exception here will be captured with context - """ - def decorator(func: Callable) -> Callable: - @functools.wraps(func) - async def async_wrapper(*args, **kwargs) -> Any: - if not _sentry_initialized: - return await func(*args, **kwargs) - - try: - import sentry_sdk - with sentry_sdk.push_scope() as scope: - scope.set_tag("operation", operation) - scope.set_tag("background_task", "true") - # Add function args as context - scope.set_extra("args", str(args)[:500]) - scope.set_extra("kwargs", str(kwargs)[:500]) - - try: - return await func(*args, **kwargs) - except Exception as e: - sentry_sdk.capture_exception(e) - raise # Re-raise so caller knows it failed - except ImportError: - return await func(*args, **kwargs) - - @functools.wraps(func) - def sync_wrapper(*args, **kwargs) -> Any: - if not _sentry_initialized: - return func(*args, **kwargs) - - try: - import sentry_sdk - with sentry_sdk.push_scope() as scope: - scope.set_tag("operation", operation) - scope.set_tag("background_task", "true") - scope.set_extra("args", str(args)[:500]) - scope.set_extra("kwargs", str(kwargs)[:500]) - - try: - return func(*args, **kwargs) - except Exception as e: - sentry_sdk.capture_exception(e) - raise - except ImportError: - return func(*args, **kwargs) - - # Return appropriate wrapper based on function type - if asyncio_iscoroutinefunction(func): - return async_wrapper - return sync_wrapper - - return decorator - - -def asyncio_iscoroutinefunction(func): - """Check if function is async.""" - import asyncio - return asyncio.iscoroutinefunction(func) - - -# --------------------------------------------------------------------------- -# HTTP Exception Handler Helper -# --------------------------------------------------------------------------- - -def capture_http_exception(request, exc, status_code: int): - """ - Capture HTTP exceptions that would otherwise be swallowed. - Call this from FastAPI exception handlers for 500+ errors. - - Args: - request: FastAPI request object - exc: The exception - status_code: HTTP status code being returned - """ - # Only capture server errors (5xx) - if status_code < 500 or not _sentry_initialized: - return - - try: - import sentry_sdk - with sentry_sdk.push_scope() as scope: - scope.set_tag("http_status", str(status_code)) - scope.set_extra("path", str(request.url.path)) - scope.set_extra("method", request.method) - sentry_sdk.capture_exception(exc) - except Exception: - pass +# Re-export from observability for convenience +from services.observability import ( + capture_exception, + capture_message, + set_operation_context, + add_breadcrumb, + operation_context, + get_logger, + metrics, + track_performance, +) + +__all__ = [ + "init_sentry", + "set_user_context", + "capture_exception", + "capture_message", + "set_operation_context", + "add_breadcrumb", + "operation_context", + "get_logger", + "metrics", + "track_performance", +] From eb69ca66b00fd95df16d339db27a45cd9a5ce473 Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Thu, 11 Dec 2025 22:21:56 -0500 Subject: [PATCH 05/11] feat(observability): Refactor and simplify observability module - Cleaner StructuredLogger with JSON/Pretty formatters - Simplified trace_operation context manager - Added track_performance decorator for async/sync functions - Simple Metrics class for counters and timings - Better Sentry integration with filtering Updates #54 --- backend/services/observability.py | 495 ++++++++++++++---------------- backend/services/sentry.py | 119 ++++--- 2 files changed, 312 insertions(+), 302 deletions(-) diff --git a/backend/services/observability.py b/backend/services/observability.py index af40822..3aa6dc1 100644 --- a/backend/services/observability.py +++ b/backend/services/observability.py @@ -1,15 +1,14 @@ """ Observability Module -Centralized logging, tracing, and metrics for CodeIntel backend. +Centralized logging, tracing, and metrics for CodeIntel Features: - Structured JSON logging (prod) / Pretty logging (dev) -- Sentry integration with context management +- Sentry integration with context - Performance tracking decorators -- Operation context propagation +- Operation context managers """ import os -import sys import json import time import logging @@ -38,65 +37,100 @@ class StructuredLogger: Usage: logger = get_logger("indexer") - logger.info("Starting indexing", repo_id="abc", files=120) + logger.info("Starting indexing", repo_id="abc", files=100) logger.error("Failed to index", error=str(e), repo_id="abc") """ def __init__(self, name: str): self.name = name self.logger = logging.getLogger(name) - self.logger.setLevel(getattr(logging, LOG_LEVEL)) - # Avoid duplicate handlers + # Only configure if not already configured if not self.logger.handlers: - handler = logging.StreamHandler(sys.stdout) - handler.setFormatter(logging.Formatter('%(message)s')) + self.logger.setLevel(getattr(logging, LOG_LEVEL)) + handler = logging.StreamHandler() + handler.setLevel(getattr(logging, LOG_LEVEL)) + + if LOG_FORMAT == "json": + handler.setFormatter(JsonFormatter()) + else: + handler.setFormatter(PrettyFormatter()) + self.logger.addHandler(handler) + self.logger.propagate = False - def _format_message(self, level: str, message: str, **kwargs) -> str: - """Format log message based on environment""" - timestamp = datetime.utcnow().isoformat() + "Z" + def _log(self, level: str, message: str, **context): + """Internal log method with context""" + extra = { + "service": self.name, + "timestamp": datetime.utcnow().isoformat(), + "environment": ENVIRONMENT, + **context + } - if LOG_FORMAT == "json": - log_data = { - "timestamp": timestamp, - "level": level, - "logger": self.name, - "message": message, - **kwargs - } - return json.dumps(log_data) - else: - # Pretty format for development - level_icons = { - "DEBUG": "🔍", - "INFO": "ℹ️ ", - "WARNING": "⚠️ ", - "ERROR": "❌", - "CRITICAL": "🔥" - } - icon = level_icons.get(level, "•") - - extra = "" - if kwargs: - extra = " | " + " ".join(f"{k}={v}" for k, v in kwargs.items()) - - return f"{icon} [{self.name}] {message}{extra}" + log_method = getattr(self.logger, level) + log_method(message, extra={"structured": extra}) + + def debug(self, message: str, **context): + self._log("debug", message, **context) - def debug(self, message: str, **kwargs): - self.logger.debug(self._format_message("DEBUG", message, **kwargs)) + def info(self, message: str, **context): + self._log("info", message, **context) - def info(self, message: str, **kwargs): - self.logger.info(self._format_message("INFO", message, **kwargs)) + def warning(self, message: str, **context): + self._log("warning", message, **context) - def warning(self, message: str, **kwargs): - self.logger.warning(self._format_message("WARNING", message, **kwargs)) + def error(self, message: str, **context): + self._log("error", message, **context) + + # Also send to Sentry if it's a real error + if "error" in context or "exception" in context: + _capture_to_sentry(message, level="error", **context) + + def critical(self, message: str, **context): + self._log("critical", message, **context) + _capture_to_sentry(message, level="fatal", **context) + + +class JsonFormatter(logging.Formatter): + """JSON formatter for production logs""" + + def format(self, record): + structured = getattr(record, "structured", {}) + log_entry = { + "level": record.levelname.lower(), + "message": record.getMessage(), + **structured + } + return json.dumps(log_entry) + + +class PrettyFormatter(logging.Formatter): + """Pretty formatter for development""" - def error(self, message: str, **kwargs): - self.logger.error(self._format_message("ERROR", message, **kwargs)) + COLORS = { + "DEBUG": "\033[36m", # Cyan + "INFO": "\033[32m", # Green + "WARNING": "\033[33m", # Yellow + "ERROR": "\033[31m", # Red + "CRITICAL": "\033[35m", # Magenta + } + RESET = "\033[0m" - def critical(self, message: str, **kwargs): - self.logger.critical(self._format_message("CRITICAL", message, **kwargs)) + def format(self, record): + structured = getattr(record, "structured", {}) + color = self.COLORS.get(record.levelname, "") + + # Build context string + context_parts = [] + for key, value in structured.items(): + if key not in ("service", "timestamp", "environment"): + context_parts.append(f"{key}={value}") + + context_str = " | ".join(context_parts) if context_parts else "" + service = structured.get("service", "app") + + return f"{color}[{record.levelname}]{self.RESET} [{service}] {record.getMessage()} {context_str}" # Logger cache @@ -110,204 +144,164 @@ def get_logger(name: str) -> StructuredLogger: # ============================================================================ -# SENTRY CONTEXT MANAGEMENT +# SENTRY INTEGRATION # ============================================================================ -def set_operation_context( - operation: str, - repo_id: Optional[str] = None, - user_id: Optional[str] = None, - **extra -): - """ - Set operation context for Sentry error tracking. - - Usage: - set_operation_context("indexing", repo_id="abc", files=120) - """ +def _capture_to_sentry(message: str, level: str = "error", **context): + """Send message/error to Sentry with context""" try: import sentry_sdk - sentry_sdk.set_tag("operation", operation) - - if repo_id: - sentry_sdk.set_tag("repo_id", repo_id) - if user_id: - sentry_sdk.set_user({"id": user_id}) - - for key, value in extra.items(): - sentry_sdk.set_extra(key, value) + with sentry_sdk.push_scope() as scope: + for key, value in context.items(): + scope.set_extra(key, value) + if level == "fatal": + sentry_sdk.capture_message(message, level="fatal") + else: + sentry_sdk.capture_message(message, level=level) except ImportError: pass # Sentry not installed -def add_breadcrumb( - message: str, - category: str = "operation", - level: str = "info", - **data -): +def capture_exception(error: Exception, **context): """ - Add breadcrumb for debugging error context. + Capture an exception to Sentry with full context. Usage: - add_breadcrumb("Cloned repository", category="git", repo_id="abc") - add_breadcrumb("Extracted 340 functions", category="indexing") + try: + risky_operation() + except Exception as e: + capture_exception(e, repo_id="abc", operation="indexing") """ try: import sentry_sdk - sentry_sdk.add_breadcrumb( - message=message, - category=category, - level=level, - data=data - ) + + with sentry_sdk.push_scope() as scope: + for key, value in context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_exception(error) + except ImportError: + # Log to console if Sentry not available + logger = get_logger("error") + logger.error(f"Exception: {error}", exception=str(error), **context) + + +def set_user_context(user_id: Optional[str] = None, email: Optional[str] = None): + """Set user context for error tracking""" + try: + import sentry_sdk + sentry_sdk.set_user({"id": user_id, "email": email}) + except ImportError: + pass + + +def set_tag(key: str, value: str): + """Set a tag that persists across the request""" + try: + import sentry_sdk + sentry_sdk.set_tag(key, value) except ImportError: pass +# ============================================================================ +# PERFORMANCE TRACKING +# ============================================================================ + @contextmanager -def operation_context( +def trace_operation( operation: str, - repo_id: Optional[str] = None, - user_id: Optional[str] = None, - **extra + description: Optional[str] = None, + **tags ): """ - Context manager for operation tracking with automatic error capture. + Context manager for tracing operations with timing. Usage: - with operation_context("indexing", repo_id="abc") as ctx: - # do work - ctx.set_extra("functions_indexed", 340) + with trace_operation("indexing", repo_id="abc") as span: + do_indexing() + span.set_data("files_processed", 100) """ logger = get_logger(operation) start_time = time.time() - # Set Sentry context - set_operation_context(operation, repo_id, user_id, **extra) - add_breadcrumb(f"Started {operation}", category=operation, repo_id=repo_id) + # Start Sentry span if available + span = None + try: + import sentry_sdk + span = sentry_sdk.start_span(op=operation, description=description) + for key, value in tags.items(): + span.set_tag(key, str(value)) + span.__enter__() + except ImportError: + pass - class Context: + # Create a simple span-like object for data attachment + class SpanData: def __init__(self): - self.extras = {} + self.data = {} - def set_extra(self, key: str, value: Any): - self.extras[key] = value - try: - import sentry_sdk - sentry_sdk.set_extra(key, value) - except ImportError: - pass + def set_data(self, key: str, value: Any): + self.data[key] = value + if span: + span.set_data(key, value) - ctx = Context() + span_data = SpanData() try: - logger.info(f"Starting {operation}", repo_id=repo_id, **extra) - yield ctx + logger.debug(f"Starting {operation}", **tags) + yield span_data duration = time.time() - start_time logger.info( f"Completed {operation}", - repo_id=repo_id, - duration_s=round(duration, 2), - **ctx.extras - ) - add_breadcrumb( - f"Completed {operation}", - category=operation, - duration_s=round(duration, 2) + duration_ms=round(duration * 1000, 2), + **tags, + **span_data.data ) except Exception as e: duration = time.time() - start_time logger.error( f"Failed {operation}", - repo_id=repo_id, error=str(e), - duration_s=round(duration, 2), - **ctx.extras + duration_ms=round(duration * 1000, 2), + **tags, + **span_data.data ) - - # Capture to Sentry with full context - try: - import sentry_sdk - with sentry_sdk.push_scope() as scope: - scope.set_extra("duration_s", round(duration, 2)) - for key, value in ctx.extras.items(): - scope.set_extra(key, value) - sentry_sdk.capture_exception(e) - except ImportError: - pass - + capture_exception(e, operation=operation, **tags, **span_data.data) raise + + finally: + if span: + try: + span.__exit__(None, None, None) + except Exception: + pass -# ============================================================================ -# PERFORMANCE TRACKING -# ============================================================================ - -def track_performance(operation_name: Optional[str] = None): +def track_performance(operation: str = None): """ - Decorator to track function performance with Sentry spans. + Decorator for tracking function performance. Usage: - @track_performance("embedding_generation") - async def generate_embeddings(texts): + @track_performance("search") + async def semantic_search(query: str, repo_id: str): ... """ def decorator(func: Callable): - name = operation_name or func.__name__ + op_name = operation or func.__name__ @functools.wraps(func) async def async_wrapper(*args, **kwargs): - logger = get_logger(name) - start_time = time.time() - - try: - # Try to create Sentry span - try: - import sentry_sdk - with sentry_sdk.start_span(op=name, description=name) as span: - result = await func(*args, **kwargs) - span.set_data("success", True) - return result - except ImportError: - return await func(*args, **kwargs) - - except Exception as e: - duration = time.time() - start_time - logger.error(f"{name} failed", error=str(e), duration_s=round(duration, 2)) - raise - finally: - duration = time.time() - start_time - if duration > 5.0: # Log slow operations - logger.warning(f"{name} slow", duration_s=round(duration, 2)) + with trace_operation(op_name, description=func.__name__): + return await func(*args, **kwargs) @functools.wraps(func) def sync_wrapper(*args, **kwargs): - logger = get_logger(name) - start_time = time.time() - - try: - try: - import sentry_sdk - with sentry_sdk.start_span(op=name, description=name) as span: - result = func(*args, **kwargs) - span.set_data("success", True) - return result - except ImportError: - return func(*args, **kwargs) - - except Exception as e: - duration = time.time() - start_time - logger.error(f"{name} failed", error=str(e), duration_s=round(duration, 2)) - raise - finally: - duration = time.time() - start_time - if duration > 5.0: - logger.warning(f"{name} slow", duration_s=round(duration, 2)) + with trace_operation(op_name, description=func.__name__): + return func(*args, **kwargs) # Return appropriate wrapper based on function type if asyncio_iscoroutinefunction(func): @@ -329,104 +323,81 @@ def asyncio_iscoroutinefunction(func): class Metrics: """ - Simple in-memory metrics counters. - Can be extended to push to Prometheus/StatsD later. + Simple metrics collection. + + Usage: + metrics = get_metrics() + metrics.increment("indexing.files_processed", 10) + metrics.timing("search.latency_ms", 150) """ def __init__(self): self._counters: Dict[str, int] = {} - self._gauges: Dict[str, float] = {} - self._histograms: Dict[str, list] = {} + self._timings: Dict[str, list] = {} - def increment(self, name: str, value: int = 1, **tags): + def increment(self, name: str, value: int = 1): """Increment a counter""" - key = self._make_key(name, tags) - self._counters[key] = self._counters.get(key, 0) + value + self._counters[name] = self._counters.get(name, 0) + value - def gauge(self, name: str, value: float, **tags): - """Set a gauge value""" - key = self._make_key(name, tags) - self._gauges[key] = value + def timing(self, name: str, value_ms: float): + """Record a timing measurement""" + if name not in self._timings: + self._timings[name] = [] + self._timings[name].append(value_ms) + + # Keep only last 1000 measurements + if len(self._timings[name]) > 1000: + self._timings[name] = self._timings[name][-1000:] - def histogram(self, name: str, value: float, **tags): - """Record a histogram value""" - key = self._make_key(name, tags) - if key not in self._histograms: - self._histograms[key] = [] - self._histograms[key].append(value) - # Keep only last 1000 values - if len(self._histograms[key]) > 1000: - self._histograms[key] = self._histograms[key][-1000:] + def get_counter(self, name: str) -> int: + """Get counter value""" + return self._counters.get(name, 0) - def _make_key(self, name: str, tags: Dict) -> str: - if not tags: - return name - tag_str = ",".join(f"{k}={v}" for k, v in sorted(tags.items())) - return f"{name}{{{tag_str}}}" + def get_timing_stats(self, name: str) -> Dict[str, float]: + """Get timing statistics""" + timings = self._timings.get(name, []) + if not timings: + return {"count": 0, "avg": 0, "min": 0, "max": 0} + + return { + "count": len(timings), + "avg": sum(timings) / len(timings), + "min": min(timings), + "max": max(timings) + } - def get_stats(self) -> Dict: - """Get all metrics for debugging/monitoring endpoint""" + def get_all_stats(self) -> Dict[str, Any]: + """Get all metrics""" return { "counters": self._counters.copy(), - "gauges": self._gauges.copy(), - "histograms": { - k: { - "count": len(v), - "avg": sum(v) / len(v) if v else 0, - "min": min(v) if v else 0, - "max": max(v) if v else 0 - } - for k, v in self._histograms.items() + "timings": { + name: self.get_timing_stats(name) + for name in self._timings } } -# Global metrics instance -metrics = Metrics() +# Metrics singleton +_metrics: Optional[Metrics] = None + +def get_metrics() -> Metrics: + """Get metrics instance""" + global _metrics + if _metrics is None: + _metrics = Metrics() + return _metrics # ============================================================================ -# CONVENIENCE FUNCTIONS +# CONVENIENCE EXPORTS # ============================================================================ -def capture_exception(error: Exception, **context): - """ - Capture exception to Sentry with additional context. - - Usage: - try: - risky_operation() - except Exception as e: - capture_exception(e, repo_id="abc", operation="indexing") - """ - logger = get_logger("error") - logger.error("Exception captured", error=str(error), **context) - - try: - import sentry_sdk - with sentry_sdk.push_scope() as scope: - for key, value in context.items(): - scope.set_extra(key, value) - sentry_sdk.capture_exception(error) - except ImportError: - pass - - -def capture_message(message: str, level: str = "info", **context): - """ - Capture a message to Sentry. - - Usage: - capture_message("Unusual pattern detected", level="warning", pattern="...") - """ - logger = get_logger("message") - getattr(logger, level)(message, **context) - - try: - import sentry_sdk - with sentry_sdk.push_scope() as scope: - for key, value in context.items(): - scope.set_extra(key, value) - sentry_sdk.capture_message(message, level=level) - except ImportError: - pass +__all__ = [ + "get_logger", + "capture_exception", + "set_user_context", + "set_tag", + "trace_operation", + "track_performance", + "get_metrics", +] diff --git a/backend/services/sentry.py b/backend/services/sentry.py index c9a222c..12508b2 100644 --- a/backend/services/sentry.py +++ b/backend/services/sentry.py @@ -1,9 +1,9 @@ """ Sentry Error Tracking Integration -Provides production error visibility and performance monitoring. +Provides production error visibility and performance monitoring -For logging and context management, use observability module: - from services.observability import get_logger, operation_context, capture_exception +NOTE: This module initializes Sentry. For logging and tracing, +use the observability module: from services.observability import get_logger, trace_operation """ import os from typing import Optional @@ -33,11 +33,13 @@ def init_sentry() -> bool: dsn=sentry_dsn, environment=environment, - # Performance monitoring + # Performance monitoring - sample rate based on environment traces_sample_rate=0.1 if environment == "production" else 1.0, - profiles_sample_rate=0.1, - # Send PII for debugging + # Profile sampled transactions + profiles_sample_rate=0.1 if environment == "production" else 1.0, + + # Send PII for debugging (user IDs, emails) send_default_pii=True, # Integrations @@ -49,8 +51,14 @@ def init_sentry() -> bool: # Filter noisy events before_send=_filter_events, - # Debug in development + # Debug mode for development debug=environment == "development", + + # Attach stack traces to messages + attach_stacktrace=True, + + # Include local variables in stack traces + include_local_variables=True, ) print(f"✅ Sentry initialized (environment: {environment})") @@ -66,29 +74,39 @@ def init_sentry() -> bool: def _filter_events(event, hint): """Filter out noisy events before sending to Sentry.""" + # Don't send health check errors request_url = event.get("request", {}).get("url", "") if "/health" in request_url: return None - # Don't send 404s for bot paths - if event.get("exception"): - values = event["exception"].get("values", [{}]) - if values: - exception_value = str(values[0].get("value", "")) - bot_paths = ["/wp-admin", "/wp-login", "/.env", "/config", "/admin", "/phpmyadmin"] - if any(path in exception_value for path in bot_paths): - return None + # Don't send 404s for common bot paths + exception_values = event.get("exception", {}).get("values", []) + if exception_values: + exception_value = str(exception_values[0].get("value", "")) + bot_paths = ["/wp-admin", "/wp-login", "/.env", "/config", "/admin", "/phpmyadmin", "/.git"] + if any(path in exception_value for path in bot_paths): + return None + + # Don't send validation errors (they're expected) + if exception_values: + exception_type = exception_values[0].get("type", "") + if exception_type in ("RequestValidationError", "ValidationError"): + return None return event # ============================================================================ -# BACKWARD COMPATIBILITY - Delegate to observability module +# LEGACY FUNCTIONS - Use observability module for new code # ============================================================================ def set_user_context(user_id: Optional[str] = None, email: Optional[str] = None): - """Set user context for error tracking.""" + """ + Set user context for error tracking. + + DEPRECATED: Use from services.observability import set_user_context + """ try: import sentry_sdk sentry_sdk.set_user({"id": user_id, "email": email}) @@ -96,27 +114,48 @@ def set_user_context(user_id: Optional[str] = None, email: Optional[str] = None) pass -# Re-export from observability for convenience -from services.observability import ( - capture_exception, - capture_message, - set_operation_context, - add_breadcrumb, - operation_context, - get_logger, - metrics, - track_performance, -) +def capture_exception(error: Exception, **extra_context): + """ + Manually capture an exception with additional context. + + DEPRECATED: Use from services.observability import capture_exception + """ + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + for key, value in extra_context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_exception(error) + except ImportError: + pass + + +def capture_message(message: str, level: str = "info", **extra_context): + """ + Capture a message (not an exception) for tracking. + + DEPRECATED: Use from services.observability import get_logger + """ + try: + import sentry_sdk + with sentry_sdk.push_scope() as scope: + for key, value in extra_context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_message(message, level=level) + except ImportError: + pass + -__all__ = [ - "init_sentry", - "set_user_context", - "capture_exception", - "capture_message", - "set_operation_context", - "add_breadcrumb", - "operation_context", - "get_logger", - "metrics", - "track_performance", -] +def set_operation_context(operation: str, **tags): + """ + Set operation context for the current scope. + + DEPRECATED: Use from services.observability import trace_operation + """ + try: + import sentry_sdk + sentry_sdk.set_tag("operation", operation) + for key, value in tags.items(): + sentry_sdk.set_tag(key, str(value)) + except ImportError: + pass From 2ecb7e596b7127425bdc6b005946ae1c0488a451 Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Thu, 11 Dec 2025 22:37:51 -0500 Subject: [PATCH 06/11] refactor(observability): Simplify and improve observability module - Cleaner StructuredLogger with JSON (prod) / pretty (dev) output - Simplified Sentry helpers: set_operation_context, add_breadcrumb, capture_exception - track_time context manager for performance spans - trace_operation decorator for function-level tracing - Simple in-memory Metrics class for counters and timings - Better documentation with usage examples --- backend/services/observability.py | 510 ++++++++++++++---------------- 1 file changed, 237 insertions(+), 273 deletions(-) diff --git a/backend/services/observability.py b/backend/services/observability.py index 3aa6dc1..5ccc910 100644 --- a/backend/services/observability.py +++ b/backend/services/observability.py @@ -2,402 +2,366 @@ Observability Module Centralized logging, tracing, and metrics for CodeIntel -Features: -- Structured JSON logging (prod) / Pretty logging (dev) -- Sentry integration with context -- Performance tracking decorators -- Operation context managers +Usage: + from services.observability import logger, trace_operation, track_time + + logger.info("Starting indexing", repo_id="abc", files=100) + + @trace_operation("indexing") + async def index_repo(repo_id: str): + ... + + with track_time("embedding_batch"): + embeddings = await create_embeddings(texts) """ import os -import json +import sys import time import logging -import functools -from typing import Optional, Dict, Any, Callable +import json +from typing import Optional, Any, Dict +from functools import wraps from contextlib import contextmanager from datetime import datetime - -# ============================================================================ -# CONFIGURATION -# ============================================================================ - +# Environment ENVIRONMENT = os.getenv("ENVIRONMENT", "development") -LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper() -LOG_FORMAT = os.getenv("LOG_FORMAT", "pretty" if ENVIRONMENT == "development" else "json") +IS_PRODUCTION = ENVIRONMENT == "production" +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO" if IS_PRODUCTION else "DEBUG") -# ============================================================================ +# ============================================================================= # STRUCTURED LOGGER -# ============================================================================ +# ============================================================================= class StructuredLogger: """ - Structured logger with JSON output for production and pretty output for dev. + Structured logger that outputs JSON in production, pretty logs in development. Usage: - logger = get_logger("indexer") - logger.info("Starting indexing", repo_id="abc", files=100) - logger.error("Failed to index", error=str(e), repo_id="abc") + logger.info("User logged in", user_id="abc", ip="1.2.3.4") + logger.error("Failed to index", repo_id="xyz", error=str(e)) """ - def __init__(self, name: str): + def __init__(self, name: str = "codeintel"): self.name = name - self.logger = logging.getLogger(name) - - # Only configure if not already configured - if not self.logger.handlers: - self.logger.setLevel(getattr(logging, LOG_LEVEL)) - handler = logging.StreamHandler() - handler.setLevel(getattr(logging, LOG_LEVEL)) - - if LOG_FORMAT == "json": - handler.setFormatter(JsonFormatter()) - else: - handler.setFormatter(PrettyFormatter()) - - self.logger.addHandler(handler) - self.logger.propagate = False + self.level = getattr(logging, LOG_LEVEL.upper(), logging.INFO) + self._context: Dict[str, Any] = {} - def _log(self, level: str, message: str, **context): - """Internal log method with context""" - extra = { - "service": self.name, + def _format_message(self, level: str, message: str, **kwargs) -> str: + """Format log message based on environment""" + data = { "timestamp": datetime.utcnow().isoformat(), - "environment": ENVIRONMENT, - **context + "level": level, + "service": self.name, + "message": message, + **self._context, + **kwargs } - log_method = getattr(self.logger, level) - log_method(message, extra={"structured": extra}) + if IS_PRODUCTION: + # JSON for production (easy to parse in log aggregators) + return json.dumps(data) + else: + # Pretty format for development + extras = " | ".join(f"{k}={v}" for k, v in kwargs.items()) + ctx = " | ".join(f"{k}={v}" for k, v in self._context.items()) + parts = [f"[{level}] {message}"] + if ctx: + parts.append(f"[ctx: {ctx}]") + if extras: + parts.append(extras) + return " ".join(parts) - def debug(self, message: str, **context): - self._log("debug", message, **context) + def _log(self, level: str, level_num: int, message: str, **kwargs): + """Internal log method""" + if level_num < self.level: + return + + formatted = self._format_message(level, message, **kwargs) + + # Use stderr for errors, stdout for rest + output = sys.stderr if level_num >= logging.ERROR else sys.stdout + print(formatted, file=output) - def info(self, message: str, **context): - self._log("info", message, **context) + def set_context(self, **kwargs): + """Set persistent context for all subsequent logs""" + self._context.update(kwargs) - def warning(self, message: str, **context): - self._log("warning", message, **context) + def clear_context(self): + """Clear all context""" + self._context = {} - def error(self, message: str, **context): - self._log("error", message, **context) - - # Also send to Sentry if it's a real error - if "error" in context or "exception" in context: - _capture_to_sentry(message, level="error", **context) + def debug(self, message: str, **kwargs): + self._log("DEBUG", logging.DEBUG, message, **kwargs) - def critical(self, message: str, **context): - self._log("critical", message, **context) - _capture_to_sentry(message, level="fatal", **context) - - -class JsonFormatter(logging.Formatter): - """JSON formatter for production logs""" + def info(self, message: str, **kwargs): + self._log("INFO", logging.INFO, message, **kwargs) - def format(self, record): - structured = getattr(record, "structured", {}) - log_entry = { - "level": record.levelname.lower(), - "message": record.getMessage(), - **structured - } - return json.dumps(log_entry) - - -class PrettyFormatter(logging.Formatter): - """Pretty formatter for development""" + def warning(self, message: str, **kwargs): + self._log("WARNING", logging.WARNING, message, **kwargs) - COLORS = { - "DEBUG": "\033[36m", # Cyan - "INFO": "\033[32m", # Green - "WARNING": "\033[33m", # Yellow - "ERROR": "\033[31m", # Red - "CRITICAL": "\033[35m", # Magenta - } - RESET = "\033[0m" + def error(self, message: str, **kwargs): + self._log("ERROR", logging.ERROR, message, **kwargs) - def format(self, record): - structured = getattr(record, "structured", {}) - color = self.COLORS.get(record.levelname, "") - - # Build context string - context_parts = [] - for key, value in structured.items(): - if key not in ("service", "timestamp", "environment"): - context_parts.append(f"{key}={value}") - - context_str = " | ".join(context_parts) if context_parts else "" - service = structured.get("service", "app") - - return f"{color}[{record.levelname}]{self.RESET} [{service}] {record.getMessage()} {context_str}" + def critical(self, message: str, **kwargs): + self._log("CRITICAL", logging.CRITICAL, message, **kwargs) -# Logger cache -_loggers: Dict[str, StructuredLogger] = {} +# Global logger instance +logger = StructuredLogger() -def get_logger(name: str) -> StructuredLogger: - """Get or create a structured logger""" - if name not in _loggers: - _loggers[name] = StructuredLogger(name) - return _loggers[name] +# ============================================================================= +# SENTRY INTEGRATION HELPERS +# ============================================================================= + +def set_operation_context(operation: str, **kwargs): + """ + Set Sentry context for current operation. + + Args: + operation: Type of operation (indexing, search, analysis, etc.) + **kwargs: Additional context (repo_id, user_id, etc.) + """ + try: + import sentry_sdk + sentry_sdk.set_tag("operation", operation) + for key, value in kwargs.items(): + sentry_sdk.set_tag(key, str(value)) + sentry_sdk.set_context("operation_details", { + "type": operation, + **kwargs + }) + except ImportError: + pass -# ============================================================================ -# SENTRY INTEGRATION -# ============================================================================ -def _capture_to_sentry(message: str, level: str = "error", **context): - """Send message/error to Sentry with context""" +def add_breadcrumb(message: str, category: str = "custom", level: str = "info", **data): + """ + Add breadcrumb for Sentry error context. + + Breadcrumbs show the trail of events leading to an error. + """ try: import sentry_sdk - - with sentry_sdk.push_scope() as scope: - for key, value in context.items(): - scope.set_extra(key, value) - - if level == "fatal": - sentry_sdk.capture_message(message, level="fatal") - else: - sentry_sdk.capture_message(message, level=level) + sentry_sdk.add_breadcrumb( + message=message, + category=category, + level=level, + data=data + ) except ImportError: - pass # Sentry not installed + pass def capture_exception(error: Exception, **context): """ - Capture an exception to Sentry with full context. + Capture exception with additional context. - Usage: - try: - risky_operation() - except Exception as e: - capture_exception(e, repo_id="abc", operation="indexing") + Args: + error: The exception to capture + **context: Additional context to attach """ try: import sentry_sdk - with sentry_sdk.push_scope() as scope: for key, value in context.items(): scope.set_extra(key, value) sentry_sdk.capture_exception(error) + + # Also log it + logger.error( + f"Exception captured: {type(error).__name__}: {str(error)}", + **context + ) except ImportError: - # Log to console if Sentry not available - logger = get_logger("error") - logger.error(f"Exception: {error}", exception=str(error), **context) - - -def set_user_context(user_id: Optional[str] = None, email: Optional[str] = None): - """Set user context for error tracking""" - try: - import sentry_sdk - sentry_sdk.set_user({"id": user_id, "email": email}) - except ImportError: - pass + logger.error(f"Exception: {error}", **context) -def set_tag(key: str, value: str): - """Set a tag that persists across the request""" +def capture_message(message: str, level: str = "info", **context): + """Capture a message (not exception) to Sentry""" try: import sentry_sdk - sentry_sdk.set_tag(key, value) + with sentry_sdk.push_scope() as scope: + for key, value in context.items(): + scope.set_extra(key, value) + sentry_sdk.capture_message(message, level=level) except ImportError: pass -# ============================================================================ +# ============================================================================= # PERFORMANCE TRACKING -# ============================================================================ +# ============================================================================= @contextmanager -def trace_operation( - operation: str, - description: Optional[str] = None, - **tags -): +def track_time(operation: str, **tags): """ - Context manager for tracing operations with timing. + Context manager to track operation duration. Usage: - with trace_operation("indexing", repo_id="abc") as span: - do_indexing() - span.set_data("files_processed", 100) + with track_time("embedding_batch", batch_size=100): + embeddings = await create_embeddings(texts) + + Logs duration and creates Sentry span if available. """ - logger = get_logger(operation) - start_time = time.time() + start = time.perf_counter() # Start Sentry span if available span = None try: import sentry_sdk - span = sentry_sdk.start_span(op=operation, description=description) + span = sentry_sdk.start_span(op=operation, description=operation) for key, value in tags.items(): span.set_tag(key, str(value)) - span.__enter__() except ImportError: pass - # Create a simple span-like object for data attachment - class SpanData: - def __init__(self): - self.data = {} - - def set_data(self, key: str, value: Any): - self.data[key] = value - if span: - span.set_data(key, value) - - span_data = SpanData() + add_breadcrumb(f"Started: {operation}", category="performance", **tags) try: - logger.debug(f"Starting {operation}", **tags) - yield span_data + yield + finally: + duration = time.perf_counter() - start + duration_ms = round(duration * 1000, 2) - duration = time.time() - start_time - logger.info( - f"Completed {operation}", - duration_ms=round(duration * 1000, 2), - **tags, - **span_data.data - ) + # Log completion + logger.debug(f"{operation} completed", duration_ms=duration_ms, **tags) - except Exception as e: - duration = time.time() - start_time - logger.error( - f"Failed {operation}", - error=str(e), - duration_ms=round(duration * 1000, 2), - **tags, - **span_data.data - ) - capture_exception(e, operation=operation, **tags, **span_data.data) - raise - - finally: + # Finish Sentry span if span: - try: - span.__exit__(None, None, None) - except Exception: - pass + span.finish() + + add_breadcrumb( + f"Completed: {operation}", + category="performance", + duration_ms=duration_ms, + **tags + ) -def track_performance(operation: str = None): +def trace_operation(operation: str): """ - Decorator for tracking function performance. + Decorator to trace an entire function/method. Usage: - @track_performance("search") - async def semantic_search(query: str, repo_id: str): + @trace_operation("index_repository") + async def index_repository(repo_id: str): ... """ - def decorator(func: Callable): - op_name = operation or func.__name__ - - @functools.wraps(func) + def decorator(func): + @wraps(func) async def async_wrapper(*args, **kwargs): - with trace_operation(op_name, description=func.__name__): - return await func(*args, **kwargs) + # Extract useful context from kwargs + context = {k: v for k, v in kwargs.items() + if k in ('repo_id', 'user_id', 'query', 'file_path')} + + set_operation_context(operation, **context) + add_breadcrumb(f"Starting {operation}", category="function", **context) + + start = time.perf_counter() + try: + result = await func(*args, **kwargs) + duration = time.perf_counter() - start + logger.info( + f"{operation} completed successfully", + duration_s=round(duration, 2), + **context + ) + return result + except Exception as e: + duration = time.perf_counter() - start + capture_exception(e, operation=operation, duration_s=round(duration, 2), **context) + raise - @functools.wraps(func) + @wraps(func) def sync_wrapper(*args, **kwargs): - with trace_operation(op_name, description=func.__name__): - return func(*args, **kwargs) + context = {k: v for k, v in kwargs.items() + if k in ('repo_id', 'user_id', 'query', 'file_path')} + + set_operation_context(operation, **context) + add_breadcrumb(f"Starting {operation}", category="function", **context) + + start = time.perf_counter() + try: + result = func(*args, **kwargs) + duration = time.perf_counter() - start + logger.info( + f"{operation} completed successfully", + duration_s=round(duration, 2), + **context + ) + return result + except Exception as e: + duration = time.perf_counter() - start + capture_exception(e, operation=operation, duration_s=round(duration, 2), **context) + raise # Return appropriate wrapper based on function type - if asyncio_iscoroutinefunction(func): + import asyncio + if asyncio.iscoroutinefunction(func): return async_wrapper return sync_wrapper return decorator -def asyncio_iscoroutinefunction(func): - """Check if function is async""" - import asyncio - return asyncio.iscoroutinefunction(func) - - -# ============================================================================ -# METRICS (Simple counters - can be extended to Prometheus later) -# ============================================================================ +# ============================================================================= +# SIMPLE METRICS (in-memory counters) +# ============================================================================= class Metrics: """ - Simple metrics collection. + Simple in-memory metrics counters. Usage: - metrics = get_metrics() - metrics.increment("indexing.files_processed", 10) - metrics.timing("search.latency_ms", 150) + metrics.increment("search_requests", repo_id="abc") + metrics.timing("search_latency_ms", 150) + metrics.get_stats() # Returns all metrics """ def __init__(self): self._counters: Dict[str, int] = {} self._timings: Dict[str, list] = {} - def increment(self, name: str, value: int = 1): + def increment(self, name: str, value: int = 1, **tags): """Increment a counter""" - self._counters[name] = self._counters.get(name, 0) + value + key = f"{name}" + self._counters[key] = self._counters.get(key, 0) + value def timing(self, name: str, value_ms: float): """Record a timing measurement""" if name not in self._timings: self._timings[name] = [] self._timings[name].append(value_ms) - - # Keep only last 1000 measurements + # Keep only last 1000 timings if len(self._timings[name]) > 1000: self._timings[name] = self._timings[name][-1000:] - def get_counter(self, name: str) -> int: - """Get counter value""" - return self._counters.get(name, 0) - - def get_timing_stats(self, name: str) -> Dict[str, float]: - """Get timing statistics""" - timings = self._timings.get(name, []) - if not timings: - return {"count": 0, "avg": 0, "min": 0, "max": 0} - - return { - "count": len(timings), - "avg": sum(timings) / len(timings), - "min": min(timings), - "max": max(timings) - } - - def get_all_stats(self) -> Dict[str, Any]: - """Get all metrics""" - return { + def get_stats(self) -> Dict: + """Get all metrics with basic stats""" + stats = { "counters": self._counters.copy(), - "timings": { - name: self.get_timing_stats(name) - for name in self._timings - } + "timings": {} } + + for name, values in self._timings.items(): + if values: + stats["timings"][name] = { + "count": len(values), + "avg_ms": round(sum(values) / len(values), 2), + "min_ms": round(min(values), 2), + "max_ms": round(max(values), 2) + } + + return stats + + def reset(self): + """Reset all metrics""" + self._counters = {} + self._timings = {} -# Metrics singleton -_metrics: Optional[Metrics] = None - -def get_metrics() -> Metrics: - """Get metrics instance""" - global _metrics - if _metrics is None: - _metrics = Metrics() - return _metrics - - -# ============================================================================ -# CONVENIENCE EXPORTS -# ============================================================================ - -__all__ = [ - "get_logger", - "capture_exception", - "set_user_context", - "set_tag", - "trace_operation", - "track_performance", - "get_metrics", -] +# Global metrics instance +metrics = Metrics() From 234f868f498eec337ed628c33c25863dc31a6c68 Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Thu, 11 Dec 2025 23:40:49 -0500 Subject: [PATCH 07/11] feat(observability): Instrument indexer with structured logging (#54) - Replace all print() with structured logger calls - Add trace_operation and track_time for performance spans - Add breadcrumbs for debugging flow - Capture exceptions with context (repo_id, operation) - Add metrics: indexing_completed, search_requests, search_latency_ms --- backend/services/indexer_optimized.py | 95 ++++++++++++++++----------- 1 file changed, 58 insertions(+), 37 deletions(-) diff --git a/backend/services/indexer_optimized.py b/backend/services/indexer_optimized.py index 2c7403d..641af35 100644 --- a/backend/services/indexer_optimized.py +++ b/backend/services/indexer_optimized.py @@ -31,6 +31,9 @@ # Search enhancement from services.search_enhancer import SearchEnhancer +# Observability +from services.observability import logger, trace_operation, track_time, capture_exception, add_breadcrumb, metrics + load_dotenv() # Configuration @@ -64,9 +67,9 @@ def __init__(self): if index_name in existing_indexes: # Use existing index (dimension already set) index_info = pc.describe_index(index_name) - print(f"📊 Using existing Pinecone index: {index_name} (dim={index_info.dimension})") + logger.info("Using existing Pinecone index", index=index_name, dimension=index_info.dimension) else: - print(f"Creating Pinecone index: {index_name} with dimension {EMBEDDING_DIMENSIONS}") + logger.info("Creating Pinecone index", index=index_name, dimension=EMBEDDING_DIMENSIONS) pc.create_index( name=index_name, dimension=EMBEDDING_DIMENSIONS, @@ -86,7 +89,7 @@ def __init__(self): 'typescript': self._create_parser(Language(tsjavascript.language())), } - print(f"✅ OptimizedCodeIndexer initialized! (model: {EMBEDDING_MODEL})") + logger.info("OptimizedCodeIndexer initialized", model=EMBEDDING_MODEL) def _create_parser(self, language) -> Parser: """Create a tree-sitter parser""" @@ -149,7 +152,8 @@ async def _create_embeddings_batch(self, texts: List[str]) -> List[List[float]]: return [item.embedding for item in response.data] except Exception as e: - print(f"❌ Error creating batch embeddings: {e}") + logger.error("Error creating batch embeddings", error=str(e), batch_size=len(texts)) + capture_exception(e, operation="create_embeddings", batch_size=len(texts)) # Return zero vectors on error return [[0.0] * EMBEDDING_DIMENSIONS for _ in texts] @@ -194,25 +198,26 @@ def _extract_functions(self, tree_node, source_code: bytes) -> List[Dict]: async def index_repository(self, repo_id: str, repo_path: str): """Index all code in a repository - OPTIMIZED VERSION""" - from services.sentry import set_operation_context, capture_exception + from services.observability import set_operation_context set_operation_context("indexing", repo_id=repo_id) + add_breadcrumb("Starting repository indexing", category="indexing", repo_id=repo_id) + start_time = time.time() - print(f"\n🚀 Starting optimized indexing for repo: {repo_id}") - print(f"📂 Path: {repo_path}") + logger.info("Starting optimized indexing", repo_id=repo_id, path=repo_path) # Discover code files code_files = self._discover_code_files(repo_path) - print(f"📄 Found {len(code_files)} code files") + logger.info("Code files discovered", repo_id=repo_id, file_count=len(code_files)) if not code_files: - print("⚠️ No code files found") + logger.warning("No code files found", repo_id=repo_id) return 0 # Extract all functions from all files (parallel) all_functions_data = [] - print(f"\n🔍 Extracting functions from files...") + add_breadcrumb("Extracting functions", category="indexing", file_count=len(code_files)) for i in range(0, len(code_files), self.FILE_BATCH_SIZE): batch = code_files[i:i + self.FILE_BATCH_SIZE] @@ -228,18 +233,18 @@ async def index_repository(self, repo_id: str, repo_path: str): if isinstance(result, list): all_functions_data.extend(result) - print(f" Processed {min(i + self.FILE_BATCH_SIZE, len(code_files))}/{len(code_files)} files, " - f"{len(all_functions_data)} functions extracted") + processed = min(i + self.FILE_BATCH_SIZE, len(code_files)) + logger.debug("File batch processed", processed=processed, total=len(code_files), functions=len(all_functions_data)) if not all_functions_data: - print("⚠️ No functions extracted") + logger.warning("No functions extracted", repo_id=repo_id) return 0 - print(f"\n✅ Total functions extracted: {len(all_functions_data)}") + logger.info("Functions extracted", repo_id=repo_id, count=len(all_functions_data)) + add_breadcrumb("Functions extracted", category="indexing", count=len(all_functions_data)) # Generate embeddings in BATCHES (this is the key optimization) - print(f"\n🧠 Generating embeddings in batches of {self.EMBEDDING_BATCH_SIZE}...") - print(f" Using model: {EMBEDDING_MODEL}") + logger.info("Generating embeddings", batch_size=self.EMBEDDING_BATCH_SIZE, model=EMBEDDING_MODEL) # Create rich embedding texts using search enhancer embedding_texts = [ @@ -248,15 +253,16 @@ async def index_repository(self, repo_id: str, repo_path: str): ] all_embeddings = [] - for i in range(0, len(embedding_texts), self.EMBEDDING_BATCH_SIZE): - batch_texts = embedding_texts[i:i + self.EMBEDDING_BATCH_SIZE] - batch_embeddings = await self._create_embeddings_batch(batch_texts) - all_embeddings.extend(batch_embeddings) - - print(f" Generated {len(all_embeddings)}/{len(embedding_texts)} embeddings") + with track_time("embedding_generation", repo_id=repo_id, total=len(embedding_texts)): + for i in range(0, len(embedding_texts), self.EMBEDDING_BATCH_SIZE): + batch_texts = embedding_texts[i:i + self.EMBEDDING_BATCH_SIZE] + batch_embeddings = await self._create_embeddings_batch(batch_texts) + all_embeddings.extend(batch_embeddings) + + logger.debug("Embeddings generated", progress=len(all_embeddings), total=len(embedding_texts)) # Prepare vectors for Pinecone - print(f"\n💾 Preparing vectors for Pinecone...") + add_breadcrumb("Uploading to Pinecone", category="indexing", vector_count=len(all_functions_data)) vectors_to_upsert = [] for func_data, embedding in zip(all_functions_data, all_embeddings): @@ -280,17 +286,24 @@ async def index_repository(self, repo_id: str, repo_path: str): }) # Upsert to Pinecone in batches - print(f"\n☁️ Uploading to Pinecone in batches of {self.PINECONE_UPSERT_BATCH}...") - for i in range(0, len(vectors_to_upsert), self.PINECONE_UPSERT_BATCH): - batch = vectors_to_upsert[i:i + self.PINECONE_UPSERT_BATCH] - self.index.upsert(vectors=batch) - print(f" Uploaded {min(i + self.PINECONE_UPSERT_BATCH, len(vectors_to_upsert))}/{len(vectors_to_upsert)} vectors") + with track_time("pinecone_upload", repo_id=repo_id, vectors=len(vectors_to_upsert)): + for i in range(0, len(vectors_to_upsert), self.PINECONE_UPSERT_BATCH): + batch = vectors_to_upsert[i:i + self.PINECONE_UPSERT_BATCH] + self.index.upsert(vectors=batch) + logger.debug("Vectors uploaded", progress=min(i + self.PINECONE_UPSERT_BATCH, len(vectors_to_upsert)), total=len(vectors_to_upsert)) elapsed = time.time() - start_time - print(f"\n✅ Indexing complete!") - print(f" • Total functions: {len(all_functions_data)}") - print(f" • Time taken: {elapsed:.2f}s") - print(f" • Speed: {len(all_functions_data)/elapsed:.1f} functions/sec") + speed = len(all_functions_data) / elapsed if elapsed > 0 else 0 + + logger.info( + "Indexing complete", + repo_id=repo_id, + functions=len(all_functions_data), + duration_s=round(elapsed, 2), + speed=round(speed, 1) + ) + metrics.increment("indexing_completed") + metrics.timing("indexing_duration_s", elapsed) return len(all_functions_data) @@ -324,7 +337,7 @@ async def _extract_functions_from_file( return functions except Exception as e: - print(f"❌ Error processing {file_path}: {e}") + logger.error("Error processing file", file_path=file_path, error=str(e)) return [] async def semantic_search( @@ -345,12 +358,15 @@ async def semantic_search( use_query_expansion: Expand query with related terms use_reranking: Rerank results with keyword boosting """ + start_time = time.time() + metrics.increment("search_requests") + try: # Step 1: Query expansion (adds related programming terms) search_query = query if use_query_expansion: search_query = await self.search_enhancer.expand_query(query) - print(f"🔍 Expanded query: {search_query[:100]}...") + logger.debug("Query expanded", original=query[:50], expanded=search_query[:100]) # Step 2: Generate query embedding query_embeddings = await self._create_embeddings_batch([search_query]) @@ -386,12 +402,16 @@ async def semantic_search( formatted_results ) + elapsed = time.time() - start_time + logger.info("Search completed", repo_id=repo_id, results=len(formatted_results), duration_ms=round(elapsed*1000, 2)) + metrics.timing("search_latency_ms", elapsed * 1000) + return formatted_results[:max_results] except Exception as e: - from services.sentry import capture_exception capture_exception(e, operation="search", repo_id=repo_id, query=query[:100]) - print(f"❌ Error searching: {e}") + logger.error("Search failed", repo_id=repo_id, error=str(e)) + metrics.increment("search_errors") return [] async def explain_code( @@ -439,7 +459,8 @@ async def explain_code( return response.choices[0].message.content except Exception as e: - print(f"❌ Error explaining code: {e}") + logger.error("Error explaining code", file_path=file_path, error=str(e)) + capture_exception(e, operation="explain_code", file_path=file_path) return f"Error: {str(e)}" async def index_repository_with_progress( From 0a4ca27ba7be2a3765927ec210604efac5e7f258 Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Thu, 11 Dec 2025 23:49:09 -0500 Subject: [PATCH 08/11] feat(observability): Instrument search_enhancer and dependency_analyzer (#54) - search_enhancer: Add structured logging for query expansion errors - dependency_analyzer: Replace all print() with structured logger - Add metrics for dependency_graphs_built - Capture exceptions with context --- backend/services/dependency_analyzer.py | 25 ++++++++++++++----------- backend/services/search_enhancer.py | 5 ++++- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/backend/services/dependency_analyzer.py b/backend/services/dependency_analyzer.py index f95319d..47d57aa 100644 --- a/backend/services/dependency_analyzer.py +++ b/backend/services/dependency_analyzer.py @@ -11,6 +11,8 @@ import tree_sitter_javascript as tsjavascript from tree_sitter import Language, Parser +from services.observability import logger, capture_exception, track_time, metrics + class DependencyAnalyzer: """Analyze code dependencies and build dependency graph""" @@ -22,7 +24,7 @@ def __init__(self): 'javascript': Parser(Language(tsjavascript.language())), 'typescript': Parser(Language(tsjavascript.language())), } - print("✅ DependencyAnalyzer initialized!") + logger.info("DependencyAnalyzer initialized") def _detect_language(self, file_path: str) -> str: """Detect language from file extension""" @@ -117,7 +119,7 @@ def analyze_file_dependencies(self, file_path: str) -> Dict: } except Exception as e: - print(f"Error analyzing {file_path}: {e}") + logger.error("Error analyzing file", file_path=file_path, error=str(e)) return {"file": str(file_path), "imports": [], "language": language, "error": str(e)} def build_dependency_graph(self, repo_path: str) -> Dict: @@ -137,7 +139,7 @@ def build_dependency_graph(self, repo_path: str) -> Dict: if file_path.suffix in extensions: code_files.append(file_path) - print(f"📊 Building dependency graph for {len(code_files)} files...") + logger.info("Building dependency graph", file_count=len(code_files)) # Analyze each file file_dependencies = {} @@ -157,12 +159,12 @@ def build_dependency_graph(self, repo_path: str) -> Dict: # DEBUG: Show sample of what we're working with sample_files = list(internal_files)[:3] - print(f"📁 Sample internal files: {sample_files}") + logger.debug("Sample internal files", sample=sample_files) # Find a file with imports to debug for f, imports in list(file_dependencies.items())[:5]: if imports: - print(f"📄 {f} imports: {imports[:3]}") + logger.debug("Sample file imports", file=f, imports=imports[:3]) break # Create nodes @@ -198,12 +200,13 @@ def build_dependency_graph(self, repo_path: str) -> Dict: else: failed_count += 1 - print(f"🔗 Resolved {resolved_count} internal imports, {failed_count} external") + logger.info("Import resolution complete", resolved=resolved_count, external=failed_count) # Calculate metrics graph_metrics = self._calculate_graph_metrics(file_dependencies, edges) - print(f"✅ Graph built: {len(nodes)} nodes, {len(edges)} edges") + logger.info("Dependency graph built", nodes=len(nodes), edges=len(edges)) + metrics.increment("dependency_graphs_built") return { "nodes": nodes, @@ -440,7 +443,7 @@ def save_to_cache(self, repo_id: str, graph_data: Dict): db.clear_file_dependencies(repo_id) # Bulk insert new dependencies - print(f"💾 Saving {len(file_deps)} file dependencies to Supabase") + logger.info("Saving file dependencies to Supabase", repo_id=repo_id, count=len(file_deps)) db.upsert_file_dependencies(repo_id, file_deps) # Save repository insights @@ -457,7 +460,7 @@ def save_to_cache(self, repo_id: str, graph_data: Dict): } db.upsert_repository_insights(repo_id, insights) - print(f"✅ Cached dependency graph for {repo_id} in Supabase") + logger.info("Cached dependency graph in Supabase", repo_id=repo_id) def load_from_cache(self, repo_id: str) -> Dict: """Load dependency graph from Supabase cache""" @@ -467,7 +470,7 @@ def load_from_cache(self, repo_id: str) -> Dict: # Get file dependencies file_deps = db.get_file_dependencies(repo_id) - print(f"🔍 Loading cache for {repo_id}: found {len(file_deps) if file_deps else 0} file dependencies") + logger.debug("Loading cache", repo_id=repo_id, found=len(file_deps) if file_deps else 0) if not file_deps: return None @@ -492,7 +495,7 @@ def load_from_cache(self, repo_id: str) -> Dict: "total_edges": len(edges) } - print(f"✅ Loaded cached dependency graph for {repo_id} from Supabase") + logger.info("Loaded cached dependency graph", repo_id=repo_id) return { "dependencies": dependencies, diff --git a/backend/services/search_enhancer.py b/backend/services/search_enhancer.py index f2ec11a..fb25dff 100644 --- a/backend/services/search_enhancer.py +++ b/backend/services/search_enhancer.py @@ -8,6 +8,8 @@ from openai import AsyncOpenAI import os +from services.observability import logger, capture_exception + class SearchEnhancer: """Enhances search quality through various techniques""" @@ -56,7 +58,8 @@ async def expand_query(self, query: str) -> str: return f"{query} {expanded}" except Exception as e: - print(f"⚠️ Query expansion failed: {e}") + logger.warning("Query expansion failed", error=str(e), query=query[:50]) + capture_exception(e, operation="query_expansion", query=query[:50]) return query def extract_docstring(self, code: str, language: str) -> str: From cad47333c17daab6708500cbd80eb993458a82ce Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Thu, 11 Dec 2025 23:55:26 -0500 Subject: [PATCH 09/11] feat(observability): Instrument infrastructure services (#54) - cache.py: Structured logging + cache hit/miss/error metrics - repo_manager.py: Logging for repo sync and clone operations - supabase_service.py: DB operation logging Added metrics: - cache_hits, cache_misses, cache_errors - repos_cloned --- backend/services/cache.py | 26 +++++++++++++++++--------- backend/services/repo_manager.py | 12 +++++++----- backend/services/supabase_service.py | 9 +++++---- 3 files changed, 29 insertions(+), 18 deletions(-) diff --git a/backend/services/cache.py b/backend/services/cache.py index 11f0c29..a59e74f 100644 --- a/backend/services/cache.py +++ b/backend/services/cache.py @@ -9,6 +9,8 @@ import os from dotenv import load_dotenv +from services.observability import logger, metrics + load_dotenv() # Configuration @@ -30,7 +32,7 @@ def __init__(self): socket_connect_timeout=5, socket_timeout=5 ) - print(f"✅ Redis connected via URL!") + logger.info("Redis connected via URL") else: self.redis = redis.Redis( host=REDIS_HOST, @@ -40,12 +42,12 @@ def __init__(self): socket_connect_timeout=5, socket_timeout=5 ) - print(f"✅ Redis connected to {REDIS_HOST}:{REDIS_PORT}") + logger.info("Redis connected", host=REDIS_HOST, port=REDIS_PORT) # Test connection self.redis.ping() except redis.ConnectionError as e: - print(f"⚠️ Redis not available - running without cache: {e}") + logger.warning("Redis not available - running without cache", error=str(e)) self.redis = None def _make_key(self, prefix: str, *args) -> str: @@ -64,9 +66,12 @@ def get_search_results(self, query: str, repo_id: str) -> Optional[List[Dict]]: key = self._make_key("search", repo_id, query) cached = self.redis.get(key) if cached: + metrics.increment("cache_hits") return json.loads(cached) + metrics.increment("cache_misses") except Exception as e: - print(f"Cache read error: {e}") + logger.error("Cache read error", operation="get_search_results", error=str(e)) + metrics.increment("cache_errors") return None @@ -85,7 +90,8 @@ def set_search_results( key = self._make_key("search", repo_id, query) self.redis.setex(key, ttl, json.dumps(results)) except Exception as e: - print(f"Cache write error: {e}") + logger.error("Cache write error", operation="set_search_results", error=str(e)) + metrics.increment("cache_errors") def get_embedding(self, text: str) -> Optional[List[float]]: """Get cached embedding""" @@ -98,7 +104,8 @@ def get_embedding(self, text: str) -> Optional[List[float]]: if cached: return json.loads(cached) except Exception as e: - print(f"Cache read error: {e}") + logger.error("Cache read error", operation="get_embedding", error=str(e)) + metrics.increment("cache_errors") return None @@ -111,7 +118,8 @@ def set_embedding(self, text: str, embedding: List[float], ttl: int = 86400): key = self._make_key("emb", text[:100]) self.redis.setex(key, ttl, json.dumps(embedding)) except Exception as e: - print(f"Cache write error: {e}") + logger.error("Cache write error", operation="set_embedding", error=str(e)) + metrics.increment("cache_errors") def invalidate_repo(self, repo_id: str): """Invalidate all cache for a repository""" @@ -123,6 +131,6 @@ def invalidate_repo(self, repo_id: str): keys = self.redis.keys(pattern) if keys: self.redis.delete(*keys) - print(f"Invalidated {len(keys)} cache entries") + logger.info("Cache invalidated", repo_id=repo_id, keys_removed=len(keys)) except Exception as e: - print(f"Cache invalidation error: {e}") + logger.error("Cache invalidation error", repo_id=repo_id, error=str(e)) diff --git a/backend/services/repo_manager.py b/backend/services/repo_manager.py index c846122..bcdd6b7 100644 --- a/backend/services/repo_manager.py +++ b/backend/services/repo_manager.py @@ -8,6 +8,7 @@ import git from pathlib import Path from services.supabase_service import get_supabase_service +from services.observability import logger, capture_exception, metrics class RepositoryManager: @@ -29,7 +30,7 @@ def _sync_existing_repos(self): if not self.repos_dir.exists(): return - print("🔄 Syncing repositories...") + logger.info("Syncing repositories from disk") for repo_path in self.repos_dir.iterdir(): if not repo_path.is_dir() or repo_path.name.startswith('.'): @@ -39,7 +40,7 @@ def _sync_existing_repos(self): # Check if already in DB existing = self.db.get_repository(repo_path.name) if existing: - print(f"✅ Repo exists in DB: {existing['name']}") + logger.debug("Repo exists in DB", name=existing['name']) continue # Try to open as git repo @@ -73,10 +74,10 @@ def _sync_existing_repos(self): file_count * 20 # Estimate function count ) - print(f"✅ Synced repo from disk: {name} ({repo_path.name})") + logger.info("Synced repo from disk", name=name, repo_id=repo_path.name) except Exception as e: - print(f"⚠️ Error syncing {repo_path.name}: {e}") + logger.warning("Error syncing repo", repo=repo_path.name, error=str(e)) def list_repos(self) -> List[dict]: """List all repositories from Supabase""" @@ -106,7 +107,8 @@ def add_repo(self, name: str, git_url: str, branch: str = "main", user_id: Optio try: # Clone the repository - print(f"Cloning {git_url} to {local_path}...") + logger.info("Cloning repository", git_url=git_url, local_path=str(local_path)) + metrics.increment("repos_cloned") git.Repo.clone_from(git_url, local_path, branch=branch, depth=1) # Create DB record with ownership diff --git a/backend/services/supabase_service.py b/backend/services/supabase_service.py index 75ecd67..59aee91 100644 --- a/backend/services/supabase_service.py +++ b/backend/services/supabase_service.py @@ -9,6 +9,8 @@ from dotenv import load_dotenv import uuid +from services.observability import logger + load_dotenv() @@ -29,7 +31,7 @@ def __init__(self): persist_session=False ) self.client: Client = create_client(supabase_url, supabase_key, options) - print("✅ Supabase service initialized!") + logger.info("Supabase service initialized") # ===== REPOSITORIES ===== @@ -120,17 +122,16 @@ def upsert_file_dependencies(self, repo_id: str, dependencies: List[Dict]) -> No for dep in dependencies: dep["repo_id"] = repo_id - # Upsert with explicit conflict resolution result = self.client.table("file_dependencies").upsert( dependencies, on_conflict="repo_id,file_path" ).execute() - print(f"💾 Upserted {len(result.data) if result.data else 0} file dependencies") + logger.debug("Upserted file dependencies", count=len(result.data) if result.data else 0) def get_file_dependencies(self, repo_id: str) -> List[Dict]: """Get all file dependencies for a repo""" result = self.client.table("file_dependencies").select("*").eq("repo_id", repo_id).execute() - print(f"🔍 Query file_dependencies for {repo_id}: found {len(result.data) if result.data else 0} rows") + logger.debug("Queried file dependencies", repo_id=repo_id, count=len(result.data) if result.data else 0) return result.data or [] def get_file_impact(self, repo_id: str, file_path: str) -> Optional[Dict]: From 1f228cfadbe37128b9a5d380ed6320c51354a9dc Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Fri, 12 Dec 2025 00:08:03 -0500 Subject: [PATCH 10/11] feat(observability): Instrument all routes (#54) - playground.py: Structured logging for demo repo loading - analysis.py: Logging for dependency graph and style analysis - repos.py: Logging for indexing operations + WebSocket error capture All print() statements replaced with structured logger calls. --- backend/routes/analysis.py | 13 +++++++------ backend/routes/playground.py | 5 +++-- backend/routes/repos.py | 9 ++++++--- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/backend/routes/analysis.py b/backend/routes/analysis.py index cca4356..2d194fb 100644 --- a/backend/routes/analysis.py +++ b/backend/routes/analysis.py @@ -8,6 +8,7 @@ ) from services.input_validator import InputValidator from middleware.auth import require_auth, AuthContext +from services.observability import logger, metrics router = APIRouter(prefix="/repos", tags=["Analysis"]) @@ -29,11 +30,11 @@ async def get_dependency_graph( # Try cache first cached_graph = dependency_analyzer.load_from_cache(repo_id) if cached_graph: - print(f"✅ Using cached dependency graph for {repo_id}") + logger.debug("Using cached dependency graph", repo_id=repo_id) return {**cached_graph, "cached": True} # Build fresh - print(f"🔄 Building fresh dependency graph for {repo_id}") + logger.info("Building fresh dependency graph", repo_id=repo_id) graph_data = dependency_analyzer.build_dependency_graph(repo["local_path"]) dependency_analyzer.save_to_cache(repo_id, graph_data) @@ -62,7 +63,7 @@ async def analyze_impact( # Get or build graph graph_data = dependency_analyzer.load_from_cache(repo_id) if not graph_data: - print(f"🔄 Building dependency graph for impact analysis") + logger.info("Building dependency graph for impact analysis", repo_id=repo_id) graph_data = dependency_analyzer.build_dependency_graph(repo["local_path"]) dependency_analyzer.save_to_cache(repo_id, graph_data) @@ -89,7 +90,7 @@ async def get_repository_insights( # Get or build graph graph_data = dependency_analyzer.load_from_cache(repo_id) if not graph_data: - print(f"🔄 Building dependency graph for insights") + logger.info("Building dependency graph for insights", repo_id=repo_id) graph_data = dependency_analyzer.build_dependency_graph(repo["local_path"]) dependency_analyzer.save_to_cache(repo_id, graph_data) @@ -121,11 +122,11 @@ async def get_style_analysis( # Try cache first cached_style = style_analyzer.load_from_cache(repo_id) if cached_style: - print(f"✅ Using cached code style for {repo_id}") + logger.debug("Using cached code style", repo_id=repo_id) return {**cached_style, "cached": True} # Analyze fresh - print(f"🔄 Analyzing code style for {repo_id}") + logger.info("Analyzing code style", repo_id=repo_id) style_data = style_analyzer.analyze_repository_style(repo["local_path"]) style_analyzer.save_to_cache(repo_id, style_data) diff --git a/backend/routes/playground.py b/backend/routes/playground.py index 51aa40d..2f3b2b0 100644 --- a/backend/routes/playground.py +++ b/backend/routes/playground.py @@ -6,6 +6,7 @@ from dependencies import indexer, cache, repo_manager from services.input_validator import InputValidator +from services.observability import logger router = APIRouter(prefix="/playground", tags=["Playground"]) @@ -39,9 +40,9 @@ async def load_demo_repos(): DEMO_REPO_IDS["express"] = repo["id"] elif "react" in name_lower: DEMO_REPO_IDS["react"] = repo["id"] - print(f"📦 Loaded demo repos: {list(DEMO_REPO_IDS.keys())}") + logger.info("Loaded demo repos", repos=list(DEMO_REPO_IDS.keys())) except Exception as e: - print(f"⚠️ Could not load demo repos: {e}") + logger.warning("Could not load demo repos", error=str(e)) def _check_rate_limit(ip: str) -> tuple[bool, int]: diff --git a/backend/routes/repos.py b/backend/routes/repos.py index 4bc0bea..42041ce 100644 --- a/backend/routes/repos.py +++ b/backend/routes/repos.py @@ -12,6 +12,7 @@ ) from services.input_validator import InputValidator from middleware.auth import require_auth, AuthContext +from services.observability import logger, capture_exception router = APIRouter(prefix="/repos", tags=["Repositories"]) @@ -100,7 +101,7 @@ async def index_repository( last_commit = repo_manager.get_last_indexed_commit(repo_id) if incremental and last_commit: - print(f"🔄 Using INCREMENTAL indexing (last: {last_commit[:8]})") + logger.info("Using INCREMENTAL indexing", repo_id=repo_id, last_commit=last_commit[:8]) total_functions = await indexer.incremental_index_repository( repo_id, repo["local_path"], @@ -108,7 +109,7 @@ async def index_repository( ) index_type = "incremental" else: - print(f"📦 Using FULL indexing") + logger.info("Using FULL indexing", repo_id=repo_id) total_functions = await indexer.index_repository(repo_id, repo["local_path"]) index_type = "full" @@ -204,8 +205,10 @@ async def progress_callback(files_processed: int, functions_indexed: int, total_ pass except WebSocketDisconnect: - print(f"WebSocket disconnected for repo {repo_id}") + logger.debug("WebSocket disconnected", repo_id=repo_id) except Exception as e: + logger.error("WebSocket indexing error", repo_id=repo_id, error=str(e)) + capture_exception(e, operation="websocket_indexing", repo_id=repo_id) try: await websocket.send_json({"type": "error", "message": str(e)}) except Exception: From 9403e06491d150031a4a32e00deb30741d969743 Mon Sep 17 00:00:00 2001 From: Devanshu Rajesh Chicholikar Date: Fri, 12 Dec 2025 00:21:26 -0500 Subject: [PATCH 11/11] feat(observability): Complete instrumentation of all services (#54) - style_analyzer.py: Logging for style analysis operations - performance_metrics.py: Logging for metrics initialization - indexer_optimized.py: Full logging for incremental indexing + error capture All print() statements replaced with structured logger calls. Total: ~50 print statements converted to structured logs. --- backend/services/indexer_optimized.py | 63 +++++++++++++------------ backend/services/performance_metrics.py | 4 +- backend/services/style_analyzer.py | 20 ++++---- 3 files changed, 47 insertions(+), 40 deletions(-) diff --git a/backend/services/indexer_optimized.py b/backend/services/indexer_optimized.py index 641af35..df579ee 100644 --- a/backend/services/indexer_optimized.py +++ b/backend/services/indexer_optimized.py @@ -471,12 +471,12 @@ async def index_repository_with_progress( ): """Index repository with real-time progress updates""" start_time = time.time() - print(f"\n🚀 Starting optimized indexing with progress for repo: {repo_id}") + logger.info("Starting optimized indexing with progress", repo_id=repo_id) # Discover code files code_files = self._discover_code_files(repo_path) total_files = len(code_files) - print(f"📄 Found {total_files} code files") + logger.info("Found code files", repo_id=repo_id, total_files=total_files) if not code_files: await progress_callback(0, 0, 0) @@ -486,7 +486,7 @@ async def index_repository_with_progress( all_functions_data = [] files_processed = 0 - print(f"\n🔍 Extracting functions from files...") + logger.debug("Extracting functions from files") for i in range(0, len(code_files), self.FILE_BATCH_SIZE): batch = code_files[i:i + self.FILE_BATCH_SIZE] @@ -507,14 +507,16 @@ async def index_repository_with_progress( # Send progress update await progress_callback(files_processed, len(all_functions_data), total_files) - print(f" Processed {files_processed}/{total_files} files, " - f"{len(all_functions_data)} functions extracted") + logger.debug("Processing files", + processed=files_processed, + total=total_files, + functions_extracted=len(all_functions_data)) if not all_functions_data: return 0 # Generate embeddings in BATCHES - print(f"\n🧠 Generating embeddings in batches of {self.EMBEDDING_BATCH_SIZE}...") + logger.debug("Generating embeddings in batches", batch_size=self.EMBEDDING_BATCH_SIZE) # Create rich embedding texts using search enhancer embedding_texts = [ @@ -528,10 +530,10 @@ async def index_repository_with_progress( batch_embeddings = await self._create_embeddings_batch(batch_texts) all_embeddings.extend(batch_embeddings) - print(f" Generated {len(all_embeddings)}/{len(embedding_texts)} embeddings") + logger.debug("Embeddings generated", completed=len(all_embeddings), total=len(embedding_texts)) # Prepare vectors for Pinecone - print(f"\n💾 Uploading to Pinecone...") + logger.debug("Uploading to Pinecone") vectors_to_upsert = [] for func_data, embedding in zip(all_functions_data, all_embeddings): @@ -560,10 +562,11 @@ async def index_repository_with_progress( self.index.upsert(vectors=batch) elapsed = time.time() - start_time - print(f"\n✅ Indexing complete!") - print(f" • Total functions: {len(all_functions_data)}") - print(f" • Time taken: {elapsed:.2f}s") - print(f" • Speed: {len(all_functions_data)/elapsed:.1f} functions/sec") + logger.info("Indexing with progress complete", + repo_id=repo_id, + total_functions=len(all_functions_data), + duration_s=round(elapsed, 2), + speed=round(len(all_functions_data)/elapsed, 1) if elapsed > 0 else 0) return len(all_functions_data) @@ -578,14 +581,13 @@ async def incremental_index_repository( import time start_time = time.time() - print(f"\n🔄 Starting INCREMENTAL indexing for repo: {repo_id}") - print(f"📍 Last indexed commit: {last_commit_sha[:8]}") + logger.info("Starting INCREMENTAL indexing", repo_id=repo_id, last_commit=last_commit_sha[:8]) try: repo = git.Repo(repo_path) current_commit = repo.head.commit.hexsha - print(f"📍 Current commit: {current_commit[:8]}") + logger.debug("Current commit", current_commit=current_commit[:8]) # Get changed files if last_commit_sha: @@ -593,7 +595,7 @@ async def incremental_index_repository( changed_files = diff.split('\n') if diff else [] else: # No previous commit, index everything - print("⚠️ No previous commit - doing full index") + logger.warning("No previous commit - doing full index") return await self.index_repository(repo_id, repo_path) # Filter for code files only @@ -603,10 +605,10 @@ async def incremental_index_repository( if Path(f).suffix in code_extensions ] - print(f"📄 Found {len(changed_files)} total changes, {len(changed_code_files)} code files") + logger.info("Found changed files", total_changes=len(changed_files), code_files=len(changed_code_files)) if not changed_code_files: - print("✅ No code changes detected - skipping indexing") + logger.info("No code changes detected - skipping indexing") return 0 # Extract functions from changed files @@ -615,19 +617,19 @@ async def incremental_index_repository( for file_path in changed_code_files: full_path = Path(repo_path) / file_path if not full_path.exists(): - print(f"⚠️ File deleted: {file_path} - skipping") + logger.debug("File deleted - skipping", file_path=file_path) continue functions = await self._extract_functions_from_file(repo_id, str(full_path)) all_functions_data.extend(functions) - print(f" Processed {file_path}: {len(functions)} functions") + logger.debug("Processed changed file", file_path=file_path, functions=len(functions)) if not all_functions_data: - print("✅ No functions to index") + logger.info("No functions to index") return 0 # Generate embeddings in batches - print(f"\n🧠 Generating embeddings for {len(all_functions_data)} functions...") + logger.debug("Generating embeddings", function_count=len(all_functions_data)) # Create rich embedding texts using search enhancer embedding_texts = [ @@ -671,16 +673,17 @@ async def incremental_index_repository( elapsed = time.time() - start_time - print(f"\n✅ Incremental indexing complete!") - print(f" • Changed files: {len(changed_code_files)}") - print(f" • Functions updated: {len(all_functions_data)}") - print(f" • Time taken: {elapsed:.2f}s") - print(f" • Speed: {len(all_functions_data)/elapsed:.1f} functions/sec") - print(f" • 🚀 INCREMENTAL SPEEDUP: ~{100/elapsed:.0f}x faster than full re-index!") + logger.info("Incremental indexing complete", + repo_id=repo_id, + changed_files=len(changed_code_files), + functions_updated=len(all_functions_data), + duration_s=round(elapsed, 2), + speed=round(len(all_functions_data)/elapsed, 1) if elapsed > 0 else 0) return len(all_functions_data) except Exception as e: - print(f"❌ Incremental indexing error: {e}") - print("Falling back to full index...") + logger.error("Incremental indexing error - falling back to full index", + repo_id=repo_id, error=str(e)) + capture_exception(e, operation="incremental_indexing", repo_id=repo_id) return await self.index_repository(repo_id, repo_path) diff --git a/backend/services/performance_metrics.py b/backend/services/performance_metrics.py index 31516b8..77363ec 100644 --- a/backend/services/performance_metrics.py +++ b/backend/services/performance_metrics.py @@ -7,6 +7,8 @@ from collections import deque import time +from services.observability import logger + class PerformanceMetrics: """Track performance metrics for monitoring""" @@ -19,7 +21,7 @@ def __init__(self): self.cache_misses = 0 self.total_searches = 0 - print("✅ PerformanceMetrics initialized!") + logger.debug("PerformanceMetrics initialized") def record_indexing(self, repo_id: str, duration: float, function_count: int): """Record indexing performance""" diff --git a/backend/services/style_analyzer.py b/backend/services/style_analyzer.py index 9c7d904..28a584b 100644 --- a/backend/services/style_analyzer.py +++ b/backend/services/style_analyzer.py @@ -12,6 +12,8 @@ import tree_sitter_javascript as tsjavascript from tree_sitter import Language, Parser +from services.observability import logger + class StyleAnalyzer: """Analyze code style and team patterns""" @@ -23,7 +25,7 @@ def __init__(self): 'javascript': Parser(Language(tsjavascript.language())), 'typescript': Parser(Language(tsjavascript.language())), } - print("✅ StyleAnalyzer initialized!") + logger.debug("StyleAnalyzer initialized") def _detect_language(self, file_path: str) -> str: """Detect language from extension""" @@ -141,7 +143,7 @@ def analyze_repository_style(self, repo_path: str) -> Dict: """Analyze coding style patterns across repository""" repo_path = Path(repo_path) - print(f"🎨 Analyzing code style for repository...") + logger.info("Analyzing code style for repository") # Discover code files code_files = [] @@ -197,7 +199,7 @@ def analyze_repository_style(self, repo_path: str) -> Dict: language_dist[language] += 1 except Exception as e: - print(f"Error analyzing {file_path}: {e}") + logger.warning("Error analyzing file", file_path=str(file_path), error=str(e)) continue # Analyze naming conventions @@ -212,10 +214,10 @@ def analyze_repository_style(self, repo_path: str) -> Dict: total_functions = len(function_names) total_classes = len(class_names) - print(f"✅ Style analysis complete!") - print(f" • {total_files} files analyzed") - print(f" • {total_functions} functions found") - print(f" • {total_classes} classes found") + logger.info("Style analysis complete", + files_analyzed=total_files, + functions_found=total_functions, + classes_found=total_classes) return { "summary": { @@ -281,7 +283,7 @@ def save_to_cache(self, repo_id: str, style_data: Dict): db.upsert_code_style(repo_id, language, analysis) - print(f"✅ Cached code style analysis for {repo_id} in Supabase") + logger.debug("Cached code style analysis in Supabase", repo_id=repo_id) def load_from_cache(self, repo_id: str) -> Dict: """Load style analysis from Supabase cache""" @@ -314,7 +316,7 @@ def load_from_cache(self, repo_id: str) -> Dict: if style.get("patterns"): patterns = style["patterns"] - print(f"✅ Loaded cached code style for {repo_id} from Supabase") + logger.debug("Loaded cached code style from Supabase", repo_id=repo_id) return { "languages": languages,