diff --git a/backend/app/celery/celery_app.py b/backend/app/celery/celery_app.py index 45364a72f..975948600 100644 --- a/backend/app/celery/celery_app.py +++ b/backend/app/celery/celery_app.py @@ -1,7 +1,7 @@ import logging from celery import Celery -from celery.signals import worker_process_init +from celery.signals import task_prerun, worker_process_init from kombu import Exchange, Queue from app.core.config import settings @@ -9,6 +9,24 @@ logger = logging.getLogger(__name__) +@task_prerun.connect +def log_pool_status(task: "celery.Task", **_: object) -> None: # type: ignore[name-defined] + """Log DB connection pool state before each task to detect connection leaks. + + If checked_out equals pool size right when a task starts, connections + are being held across tasks (likely across LLM API calls) and leaking. + """ + from app.core.db import engine + from sqlalchemy.pool import QueuePool + + pool = engine.pool + if isinstance(pool, QueuePool): + logger.info( + f"[pool] task={task.name} checked_out={pool.checkedout()} " + f"size={pool.size()} overflow={pool.overflow()}" + ) + + @worker_process_init.connect def warm_llm_modules(**_) -> None: """Import LLM service modules in each worker process right after fork.