Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 41 additions & 1 deletion backend/app/celery/celery_app.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import logging

from celery import Celery
from celery.signals import task_prerun, worker_process_init
from celery.signals import task_failure, task_postrun, task_prerun, worker_process_init
from kombu import Exchange, Queue

from app.core.config import settings
Expand All @@ -27,6 +27,46 @@ def log_pool_status(task: "celery.Task", **_: object) -> None: # type: ignore[n
)


@task_postrun.connect
def log_pool_status_post(task: "celery.Task", **_: object) -> None: # type: ignore[name-defined]
"""Log DB connection pool state after each task completes (success or failure).

Compare with task_prerun log — if checked_out is the same or higher after
the task, connections were not returned and are leaking.
"""
from app.core.db import engine
from sqlalchemy.pool import QueuePool

pool = engine.pool
if isinstance(pool, QueuePool):
logger.info(
f"[pool] POST task={task.name} checked_out={pool.checkedout()} "
f"size={pool.size()} overflow={pool.overflow()}"
)


@task_failure.connect
def log_pool_status_failure(
task_id: str, exception: Exception, sender: "celery.Task", **_: object # type: ignore[name-defined]
) -> None:
"""Log DB connection pool state on task failure.

Failures are the most likely path for sessions to leak — exceptions can
bypass session cleanup if not guarded by a context manager.
"""
from app.core.db import engine
from sqlalchemy.pool import QueuePool

pool = engine.pool
if isinstance(pool, QueuePool):
logger.warning(
f"[pool] FAILED task={sender.name} task_id={task_id} "
f"exc={type(exception).__name__} "
f"checked_out={pool.checkedout()} "
f"size={pool.size()} overflow={pool.overflow()}"
)


@worker_process_init.connect
def warm_llm_modules(**_) -> None:
"""Import LLM service modules in each worker process right after fork.
Expand Down
Loading