From d4abb2ce3737db9de449ec07d69004410bfceb14 Mon Sep 17 00:00:00 2001 From: edaywalid Date: Mon, 1 Jun 2026 01:01:23 +0100 Subject: [PATCH 1/7] fix(photo-worker): commit photo processing writes per message The photo worker opened its connection with engine.connect(), which does not autocommit, and never issued a commit. Every write in the message handler (processing job status, detected faces, photo status/visibility, and the single-face match) was rolled back when the connection closed, so photos were processed but their results never persisted. Wrap each message in a single transaction via self._conn.begin(). Messages are delivered one at a time, so the shared connection is only ever used by one handler at a time. SingleFaceMatchService previously managed its own transaction with self.conn.begin(). That nested begin() raised once create_processing_job had already opened the connection's transaction, and any writes it made after the block were left uncommitted. It now runs inside the worker's transaction instead. --- app/service/face_match.py | 70 ++++++++++++++++----------------- app/worker/photo_worker/main.py | 60 +++++++++++++++------------- 2 files changed, 67 insertions(+), 63 deletions(-) diff --git a/app/service/face_match.py b/app/service/face_match.py index 672c17e..346cce0 100644 --- a/app/service/face_match.py +++ b/app/service/face_match.py @@ -46,45 +46,45 @@ async def process_detected_face( created_face_match_id: UUID | None = None matched_user: ClosestUserMatch | None = None + # Writes run inside the caller's transaction; the worker owns commit/rollback. try: - async with self.conn.begin(): - if not await self.Check_photo_exists(job.photo_id): - logger.warning("Photo not found: %s", job.photo_id) - return + if not await self.Check_photo_exists(job.photo_id): + logger.warning("Photo not found: %s", job.photo_id) + return - if await self._match_exists_for_photo(job.photo_id): - logger.info("Photo %s already matched; skipping", job.photo_id) - return + if await self._match_exists_for_photo(job.photo_id): + logger.info("Photo %s already matched; skipping", job.photo_id) + return - matched_user = await self.user_match_service.find_closest_user( - embedding_literal=embedding_literal, - ) - if await self._autoapprove_if_unmatchable(job, matched_user): - return - assert matched_user is not None - - params = photo_face_queries.PhotoFacesEnsureFaceMatchParams( - photo_id=job.photo_id, - face_index=job.face_index, - column_3=embedding_literal, - bbox=bbox_payload, - user_id=matched_user.user_id, - confidence=matched_user.distance, + matched_user = await self.user_match_service.find_closest_user( + embedding_literal=embedding_literal, + ) + if await self._autoapprove_if_unmatchable(job, matched_user): + return + assert matched_user is not None + + params = photo_face_queries.PhotoFacesEnsureFaceMatchParams( + photo_id=job.photo_id, + face_index=job.face_index, + column_3=embedding_literal, + bbox=bbox_payload, + user_id=matched_user.user_id, + confidence=matched_user.distance, + ) + result = await self.photo_face_querier.photo_faces_ensure_face_match(params) + if result is None: + logger.warning("Failed to ensure face match for photo %s", job.photo_id) + return + + if result.face_match_id is None: + logger.info("Match already exists for photo %s; skipping", job.photo_id) + else: + created_face_match_id = result.face_match_id + logger.info( + "Inserted face match %s for photo %s", + created_face_match_id, + job.photo_id, ) - result = await self.photo_face_querier.photo_faces_ensure_face_match(params) - if result is None: - logger.warning("Failed to ensure face match for photo %s", job.photo_id) - return - - if result.face_match_id is None: - logger.info("Match already exists for photo %s; skipping", job.photo_id) - else: - created_face_match_id = result.face_match_id - logger.info( - "Inserted face match %s for photo %s", - created_face_match_id, - job.photo_id, - ) except (DBAPIError, SQLAlchemyError) as exc: logger.warning("DB write failed for photo %s: %s", job.photo_id, exc) return diff --git a/app/worker/photo_worker/main.py b/app/worker/photo_worker/main.py index 651b606..c05c0a3 100644 --- a/app/worker/photo_worker/main.py +++ b/app/worker/photo_worker/main.py @@ -60,40 +60,44 @@ async def handle_message(self, data: bytes) -> None: if event is None: return - job = await self._create_job(event) + # One transaction per message so every write below is committed (or rolled + # back) atomically. Messages are delivered one at a time, so the shared + # connection is never used by two handlers at once. + async with self._conn.begin(): + job = await self._create_job(event) - try: - payload = await self._load_image(event.image_ref) - except Exception as exc: - logger.warning("Failed to load image for photo %s: %s", event.photo_id, exc) - await self._update_job(job, "failed") - return + try: + payload = await self._load_image(event.image_ref) + except Exception as exc: + logger.warning("Failed to load image for photo %s: %s", event.photo_id, exc) + await self._update_job(job, "failed") + return - await self._update_job(job, "running") + await self._update_job(job, "running") - try: - faces = await self._face_service.detect_faces(payload) - except Exception as exc: - logger.warning("Face detection failed for photo %s: %s", event.photo_id, exc) - await self._update_job(job, "failed") - return + try: + faces = await self._face_service.detect_faces(payload) + except Exception as exc: + logger.warning("Face detection failed for photo %s: %s", event.photo_id, exc) + await self._update_job(job, "failed") + return + + if not faces: + logger.info("No faces detected in photo %s, marking as public", event.photo_id) + await self._photo_querier.update_photo_status(id=event.photo_id, status="approved") + await self._photo_querier.update_photo_visibility(id=event.photo_id, visibility="public") + await self._update_job(job, "completed") + await self._schedule_cleanup(event.image_ref) + return + + if len(faces) == 1: + await self._handle_single_face(event, faces[0]) + else: + await self._handle_group_photo(event, faces) - if not faces: - logger.info("No faces detected in photo %s, marking as public", event.photo_id) - await self._photo_querier.update_photo_status(id=event.photo_id, status="approved") - await self._photo_querier.update_photo_visibility(id=event.photo_id, visibility="public") await self._update_job(job, "completed") + await self._publish_audit(event, len(faces)) await self._schedule_cleanup(event.image_ref) - return - - if len(faces) == 1: - await self._handle_single_face(event, faces[0]) - else: - await self._handle_group_photo(event, faces) - - await self._update_job(job, "completed") - await self._publish_audit(event, len(faces)) - await self._schedule_cleanup(event.image_ref) async def _handle_single_face(self, event: PhotoProcessEvent, face: DetectedFace) -> None: From 7a7c367a4a641e1dded8d86a769089f88b4698d4 Mon Sep 17 00:00:00 2001 From: edaywalid Date: Mon, 1 Jun 2026 01:01:42 +0100 Subject: [PATCH 2/7] fix(audit-worker): commit persisted audit events The audit worker ran record_event on a connection opened with engine.connect() and never committed, so every audit row was discarded when the connection eventually closed. Commit after each event and roll back on failure. The worker processes events one at a time, so the shared connection is safe to commit per message. --- app/worker/audit/main.py | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/app/worker/audit/main.py b/app/worker/audit/main.py index 95ceae8..5e2597d 100644 --- a/app/worker/audit/main.py +++ b/app/worker/audit/main.py @@ -38,14 +38,19 @@ async def stop(self) -> None: self._audit_service = None async def persist(self, payload: AuditEventMessage) -> None: - if self._audit_service is None: + if self._audit_service is None or self._conn is None: logger.warning("Audit service is unavailable for %s", payload.event_type) return - await self._audit_service.record_event( - event_type=payload.event_type, - user_id=payload.user_id, - metadata=payload.metadata, - ) + try: + await self._audit_service.record_event( + event_type=payload.event_type, + user_id=payload.user_id, + metadata=payload.metadata, + ) + await self._conn.commit() + except Exception: + await self._conn.rollback() + raise logger.info("Persisted audit %s for %s", payload.event_type, payload.user_id) From 2de8f2a05595ea54d7cb93e870ac92e163c3a47a Mon Sep 17 00:00:00 2001 From: edaywalid Date: Mon, 1 Jun 2026 01:02:20 +0100 Subject: [PATCH 3/7] fix(notification-worker): commit device invalidation writes DeviceInvalidationStore wrote through a connection opened with engine.connect() and never committed, so devices flagged for an invalid push token were never persisted. The worker also runs handlers concurrently against that single shared connection, which is not safe. Give each write its own transaction with engine.begin(). One transaction per token keeps a failure on one token from aborting the others, and removes the shared connection from the worker entrypoint. --- app/worker/notification/invalid_tokens.py | 12 ++++++++---- app/worker/notification/main.py | 8 +------- 2 files changed, 9 insertions(+), 11 deletions(-) diff --git a/app/worker/notification/invalid_tokens.py b/app/worker/notification/invalid_tokens.py index 02e24c9..6d1c7c4 100644 --- a/app/worker/notification/invalid_tokens.py +++ b/app/worker/notification/invalid_tokens.py @@ -6,6 +6,7 @@ from app.core.constant import RedisKey from app.core.logger import logger +from app.infra.database import engine from app.infra.redis import RedisClient from app.worker.notification.settings import NotifSetting @@ -43,19 +44,22 @@ async def remove(self, tokens: Sequence[str]) -> None: class DeviceInvalidationStore: - def __init__(self, device_querier: device_queries.AsyncQuerier) -> None: - self._device_querier = device_querier - async def mark_invalid(self, tokens: Iterable[str]) -> None: normalized: list[str] = [t for t in tokens if t] if not normalized: return + # One transaction per token. Handlers run concurrently, so each write + # gets its own connection rather than sharing one, and a failure on one + # token does not abort the rest. failed: list[str] = [] for token in normalized: try: - await self._device_querier.mark_device_token_invalid(push_token=token) + async with engine.begin() as conn: + await device_queries.AsyncQuerier(conn).mark_device_token_invalid( + push_token=token + ) except Exception: failed.append(token) logger.exception("Failed to flag device for invalid token %s", token) diff --git a/app/worker/notification/main.py b/app/worker/notification/main.py index 3f4711d..a79d7c2 100644 --- a/app/worker/notification/main.py +++ b/app/worker/notification/main.py @@ -3,8 +3,6 @@ import asyncio from typing import Sequence -from db.generated import devices as device_queries - from app.core.logger import logger from app.worker.notification.firebase import ( NotificationDeliveryError, @@ -18,7 +16,6 @@ from app.worker.notification.notification_queue import NotificationQueue, NotificationQueueEntry from app.worker.notification.rate_limiter import RateLimiter from app.worker.notification.settings import NotifSetting -from app.infra.database import engine from app.infra.redis import RedisClient from app.infra.nats import NatsClient @@ -141,16 +138,13 @@ async def main() -> None: queue = NotificationQueue(settings=NotifSetting) invalid_tokens = InvalidTokenStore(redis) - db_conn = await engine.connect() - device_querier = device_queries.AsyncQuerier(db_conn) - invalid_devices = DeviceInvalidationStore(device_querier) + invalid_devices = DeviceInvalidationStore() try: await run_worker(queue, invalid_tokens, invalid_devices) finally: await redis.close() - await db_conn.close() logger.info("Worker shutdown") From 3c12a6a5175ba8deb1055c47934aa7080806deba Mon Sep 17 00:00:00 2001 From: edaywalid Date: Mon, 1 Jun 2026 11:41:23 +0100 Subject: [PATCH 4/7] fix(db): enable pool_pre_ping on the async engine Without pre-ping, a pooled connection that died while idle (for example after a Postgres restart or an idle-timeout cut by a proxy) is handed out as-is and the next query fails with a stale-socket error. pool_pre_ping validates the connection on checkout and transparently replaces a dead one, which the workers rely on now that they check out a fresh connection per message. --- app/infra/database.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/app/infra/database.py b/app/infra/database.py index 10c15a5..956662b 100644 --- a/app/infra/database.py +++ b/app/infra/database.py @@ -6,7 +6,11 @@ DATABASE_URL = f"postgresql+asyncpg://{settings.POSTGRES_USER}:{settings.POSTGRES_PASSWORD}@{settings.POSTGRES_HOST}:{settings.POSTGRES_PORT}/{settings.POSTGRES_DB}" -engine = sqlalchemy.ext.asyncio.create_async_engine(DATABASE_URL, echo=settings.debug) +engine = sqlalchemy.ext.asyncio.create_async_engine( + DATABASE_URL, + echo=settings.debug, + pool_pre_ping=True, +) async def get_db() -> AsyncGenerator[sqlalchemy.ext.asyncio.AsyncConnection, None]: From c0258aabfa5bbad72a3bc07fba547164c433727b Mon Sep 17 00:00:00 2001 From: edaywalid Date: Mon, 1 Jun 2026 11:41:23 +0100 Subject: [PATCH 5/7] refactor(photo-worker): per-message transaction, let DB errors roll back Follow-up to the commit fix, addressing review feedback. The worker held a single connection for its whole lifetime, so a Postgres restart left it looping on a dead socket. It now opens a fresh connection and transaction per message with engine.begin(), which commits on success and rolls back on failure; combined with pool_pre_ping the connection is revalidated on checkout. The SQL helpers (_create_job, _update_job) and the per-face insert used to catch and swallow DB errors. Inside one transaction that is unsafe: the first failed statement aborts the whole transaction, and continuing would either hit "current transaction is aborted" on the next statement or raise PendingRollbackError at commit. Those swallows are removed so a DB error propagates and the transaction rolls back cleanly. The per-message handler in run_worker wraps everything in try/except and logs, so one bad message no longer tears down the subscription. --- app/worker/photo_worker/main.py | 181 ++++++++++++++++---------------- 1 file changed, 89 insertions(+), 92 deletions(-) diff --git a/app/worker/photo_worker/main.py b/app/worker/photo_worker/main.py index c05c0a3..0f60c47 100644 --- a/app/worker/photo_worker/main.py +++ b/app/worker/photo_worker/main.py @@ -4,11 +4,11 @@ import json from enum import Enum -from sqlalchemy.exc import DBAPIError, SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncConnection from app.container import Container from app.core.config import settings +from app.deps.ai_deps import get_face_embedding_service from app.core.constant import MINIO_URL_PREFIX from app.core.logger import logger from app.infra.database import engine @@ -60,44 +60,43 @@ async def handle_message(self, data: bytes) -> None: if event is None: return - # One transaction per message so every write below is committed (or rolled - # back) atomically. Messages are delivered one at a time, so the shared - # connection is never used by two handlers at once. - async with self._conn.begin(): - job = await self._create_job(event) + # The transaction is owned by run_worker, which opens a fresh + # engine.begin() per message. DB errors are allowed to propagate so that + # transaction rolls back cleanly instead of being committed half-applied. + job = await self._create_job(event) - try: - payload = await self._load_image(event.image_ref) - except Exception as exc: - logger.warning("Failed to load image for photo %s: %s", event.photo_id, exc) - await self._update_job(job, "failed") - return + try: + payload = await self._load_image(event.image_ref) + except Exception as exc: + logger.warning("Failed to load image for photo %s: %s", event.photo_id, exc) + await self._update_job(job, "failed") + return - await self._update_job(job, "running") + await self._update_job(job, "running") - try: - faces = await self._face_service.detect_faces(payload) - except Exception as exc: - logger.warning("Face detection failed for photo %s: %s", event.photo_id, exc) - await self._update_job(job, "failed") - return - - if not faces: - logger.info("No faces detected in photo %s, marking as public", event.photo_id) - await self._photo_querier.update_photo_status(id=event.photo_id, status="approved") - await self._photo_querier.update_photo_visibility(id=event.photo_id, visibility="public") - await self._update_job(job, "completed") - await self._schedule_cleanup(event.image_ref) - return - - if len(faces) == 1: - await self._handle_single_face(event, faces[0]) - else: - await self._handle_group_photo(event, faces) + try: + faces = await self._face_service.detect_faces(payload) + except Exception as exc: + logger.warning("Face detection failed for photo %s: %s", event.photo_id, exc) + await self._update_job(job, "failed") + return + if not faces: + logger.info("No faces detected in photo %s, marking as public", event.photo_id) + await self._photo_querier.update_photo_status(id=event.photo_id, status="approved") + await self._photo_querier.update_photo_visibility(id=event.photo_id, visibility="public") await self._update_job(job, "completed") - await self._publish_audit(event, len(faces)) await self._schedule_cleanup(event.image_ref) + return + + if len(faces) == 1: + await self._handle_single_face(event, faces[0]) + else: + await self._handle_group_photo(event, faces) + + await self._update_job(job, "completed") + await self._publish_audit(event, len(faces)) + await self._schedule_cleanup(event.image_ref) async def _handle_single_face(self, event: PhotoProcessEvent, face: DetectedFace) -> None: @@ -137,23 +136,16 @@ async def _handle_group_photo(self, event: PhotoProcessEvent, faces: list[Detect embedding_literal = "[" + ", ".join(str(x) for x in face.embedding) + "]" - try: - approval = await self._photo_face_querier.insert_photo_face_with_approval( - InsertPhotoFaceWithApprovalParams( - photo_id=event.photo_id, - face_index=face_index, - column_3=embedding_literal, - face_embedding=worker_settings.similarity_threshold, - bbox=bbox_json, - decision=PhotoApprovalDecision.PENDING.value, - ) + approval = await self._photo_face_querier.insert_photo_face_with_approval( + InsertPhotoFaceWithApprovalParams( + photo_id=event.photo_id, + face_index=face_index, + column_3=embedding_literal, + face_embedding=worker_settings.similarity_threshold, + bbox=bbox_json, + decision=PhotoApprovalDecision.PENDING.value, ) - except (DBAPIError, SQLAlchemyError) as exc: - logger.warning( - "DB error inserting face %d for photo %s: %s", - face_index, event.photo_id, exc, - ) - continue + ) if approval is None: logger.info("No match for face %d in photo %s", face_index, event.photo_id) @@ -186,21 +178,14 @@ async def _handle_group_photo(self, event: PhotoProcessEvent, faces: list[Detect async def _create_job(self, event: PhotoProcessEvent) -> models.ProcessingJob | None: if self._pj_querier is None: return None - try: - return await self._pj_querier.create_processing_job( - photo_id=event.photo_id, job_type="face_detection", - ) - except Exception as exc: - logger.warning("Failed to create processing job for photo %s: %s", event.photo_id, exc) - return None + return await self._pj_querier.create_processing_job( + photo_id=event.photo_id, job_type="face_detection", + ) async def _update_job(self, job: models.ProcessingJob | None, status: str) -> None: if job is None or self._pj_querier is None: return - try: - await self._pj_querier.update_processing_job_status(id=job.id, status=status) - except Exception as exc: - logger.warning("Failed to update processing job: %s", exc) + await self._pj_querier.update_processing_job_status(id=job.id, status=status) @staticmethod async def _publish_audit(event: PhotoProcessEvent, faces_count: int) -> None: @@ -281,40 +266,52 @@ async def run_worker() -> None: password=settings.REDIS_PASSWORD, ) - async with engine.connect() as conn: - container = Container(conn) - - single_face_service = SingleFaceMatchService( - conn=conn, - photo_face_querier=container.photo_face_querier, - photo_querier=container.photo_querier, - user_match_service=container.auth_service, - user_notification_service=container.user_notifications_service, - ) - - worker = PhotoWorker( - conn=conn, - face_embedding_service=container.face_embedding_service, - single_face_service=single_face_service, - user_notification_service=container.user_notifications_service, - photo_face_querier=container.photo_face_querier, - photo_querier=container.photo_querier, - processing_job_querier=container.processing_job_querier, - ) + # Load the embedding model once; it is a process-wide singleton. + get_face_embedding_service() - await NatsClient.js_subscribe( - subject=NatsSubjects.PHOTO_PROCESS, - callback=worker.handle_message, - stream_name=worker_settings.stream_name, - durable_name=worker_settings.durable_name, - ) - - logger.info("PhotoWorker subscribed on %s; waiting for jobs", NatsSubjects.PHOTO_PROCESS.value) + async def handle(data: bytes) -> None: + # Fresh connection and transaction per message. engine.begin() commits on + # success and rolls back on any error, so a failed message can never leave + # a half-applied or aborted transaction behind for the next one. With + # pool_pre_ping the connection is also revalidated on checkout, so a + # Postgres restart is recovered automatically. Errors are logged here so a + # single bad message does not tear down the subscription. try: - await asyncio.Event().wait() - finally: - await _close_minio() - await NatsClient.close() + async with engine.begin() as conn: + container = Container(conn) + single_face_service = SingleFaceMatchService( + conn=conn, + photo_face_querier=container.photo_face_querier, + photo_querier=container.photo_querier, + user_match_service=container.auth_service, + user_notification_service=container.user_notifications_service, + ) + worker = PhotoWorker( + conn=conn, + face_embedding_service=container.face_embedding_service, + single_face_service=single_face_service, + user_notification_service=container.user_notifications_service, + photo_face_querier=container.photo_face_querier, + photo_querier=container.photo_querier, + processing_job_querier=container.processing_job_querier, + ) + await worker.handle_message(data) + except Exception: + logger.exception("Failed to process photo message") + + await NatsClient.js_subscribe( + subject=NatsSubjects.PHOTO_PROCESS, + callback=handle, + stream_name=worker_settings.stream_name, + durable_name=worker_settings.durable_name, + ) + + logger.info("PhotoWorker subscribed on %s; waiting for jobs", NatsSubjects.PHOTO_PROCESS.value) + try: + await asyncio.Event().wait() + finally: + await _close_minio() + await NatsClient.close() async def _close_minio() -> None: From 33f6bd5796df04c0cad4b892b5eed24d13bb8ce6 Mon Sep 17 00:00:00 2001 From: edaywalid Date: Mon, 1 Jun 2026 11:41:23 +0100 Subject: [PATCH 6/7] refactor(audit-worker): per-message transaction Replace the long-lived connection with a fresh engine.begin() per event, for the same reasons as the photo worker: auto-commit/rollback per message and recovery from dropped connections via pool_pre_ping. Removes the manual start/stop connection lifecycle. --- app/worker/audit/main.py | 40 +++++++++------------------------------- 1 file changed, 9 insertions(+), 31 deletions(-) diff --git a/app/worker/audit/main.py b/app/worker/audit/main.py index 5e2597d..8037c26 100644 --- a/app/worker/audit/main.py +++ b/app/worker/audit/main.py @@ -1,7 +1,6 @@ import asyncio import json from typing import Any -import sqlalchemy.ext.asyncio from pydantic import ValidationError from app.core.constant import AUDIT_EVENT_SUBJECT from app.core.logger import logger @@ -18,39 +17,20 @@ async def init_worker() -> None: class AuditDeliveryWorker: - def __init__(self) -> None: - self._conn: sqlalchemy.ext.asyncio.AsyncConnection | None = None - self._audit_service: AuditService | None = None - - async def start(self) -> None: - if self._conn is not None: - return - self._conn = await engine.connect() - self._audit_service = AuditService( - audit_queries.AsyncQuerier(self._conn), - user_queries.AsyncQuerier(self._conn), - ) - - async def stop(self) -> None: - if self._conn is not None: - await self._conn.close() - self._conn = None - self._audit_service = None - async def persist(self, payload: AuditEventMessage) -> None: - if self._audit_service is None or self._conn is None: - logger.warning("Audit service is unavailable for %s", payload.event_type) - return - try: - await self._audit_service.record_event( + # Fresh connection and transaction per event. engine.begin() commits on + # success and rolls back on error, with pool_pre_ping revalidating the + # connection on checkout so a Postgres restart recovers automatically. + async with engine.begin() as conn: + service = AuditService( + audit_queries.AsyncQuerier(conn), + user_queries.AsyncQuerier(conn), + ) + await service.record_event( event_type=payload.event_type, user_id=payload.user_id, metadata=payload.metadata, ) - await self._conn.commit() - except Exception: - await self._conn.rollback() - raise logger.info("Persisted audit %s for %s", payload.event_type, payload.user_id) @@ -92,13 +72,11 @@ async def listen_nats_event(worker: AuditDeliveryWorker) -> None: async def main() -> None: await init_worker() worker = AuditDeliveryWorker() - await worker.start() await NatsClient.connect() try: await listen_nats_event(worker) await asyncio.Event().wait() finally: - await worker.stop() await NatsClient.close() From bec23c6122239e8307e57c38f79120dfea11bfd8 Mon Sep 17 00:00:00 2001 From: ademboukabes Date: Mon, 1 Jun 2026 11:53:57 +0100 Subject: [PATCH 7/7] fix(photo-worker): use RedisClient.init instead of constructor to avoid RuntimeError --- app/worker/photo_worker/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/worker/photo_worker/main.py b/app/worker/photo_worker/main.py index 0f60c47..ba7e960 100644 --- a/app/worker/photo_worker/main.py +++ b/app/worker/photo_worker/main.py @@ -260,7 +260,7 @@ async def run_worker() -> None: minio_root_user=settings.MINIO_ROOT_USER, minio_root_password=settings.MINIO_ROOT_PASSWORD, ) - RedisClient( + RedisClient.init( host=settings.REDIS_HOST, port=settings.REDIS_PORT, password=settings.REDIS_PASSWORD,