diff --git a/app/domains/auth/dependencies.py b/app/domains/auth/dependencies.py index df7b437..6b80cdf 100644 --- a/app/domains/auth/dependencies.py +++ b/app/domains/auth/dependencies.py @@ -11,7 +11,9 @@ ) from app.core.event_dispatcher import EventDispatcherDep from app.core.exceptions import AppHTTPException -from app.core.logger import user_id_ctx +from app.core.logger import get_logger, user_id_ctx + +_ws_auth_logger = get_logger("app.auth.ws") from app.db.postgres.dependencies import PgSessionDep from app.domains.auth.repositories.password_reset_token_repository import ( PasswordResetTokenRepository, @@ -180,18 +182,33 @@ def _extract_token_from_ws_subprotocols(subprotocols: str | None) -> str | None: def _extract_ws_access_token(ws: WebSocket) -> str: - token = _extract_token_from_ws_subprotocols( - ws.headers.get("sec-websocket-protocol") - ) + subprotocol_header = ws.headers.get("sec-websocket-protocol") + token = _extract_token_from_ws_subprotocols(subprotocol_header) if token: + _ws_auth_logger.debug( + "WS token extracted from subprotocol", + extra={"path": ws.url.path}, + ) return token token = _extract_bearer_token(ws.headers.get("authorization")) if token: + _ws_auth_logger.debug( + "WS token extracted from Authorization header", + extra={"path": ws.url.path}, + ) return token + _ws_auth_logger.warning( + "WS rejected: missing access token", + extra={ + "path": ws.url.path, + "has_subprotocol_header": bool(subprotocol_header), + "has_authorization_header": bool(ws.headers.get("authorization")), + }, + ) raise WebSocketException( code=1008, reason="Missing WebSocket access token", @@ -212,9 +229,21 @@ async def get_current_user_session_ws( SessionNotFoundError, UserNotFoundError, ) as e: + _ws_auth_logger.warning( + "WS rejected: invalid token/session", + extra={ + "path": ws.url.path, + "error_type": type(e).__name__, + "error": str(e), + }, + ) raise WebSocketException(code=1008, reason=str(e)) from e user_id_ctx.set(str(user.id)) + _ws_auth_logger.debug( + "WS session loaded", + extra={"path": ws.url.path, "user_id": str(user.id)}, + ) return user, session @@ -265,6 +294,10 @@ async def checker(permissions: UserPermissionsWsDep) -> bool: names = [p.name for p in permissions] if permission_name not in names: + _ws_auth_logger.warning( + "WS rejected: insufficient permissions", + extra={"required": permission_name, "user_permissions": names}, + ) raise WebSocketException(code=1008, reason="Insufficient permissions") return True diff --git a/app/domains/live_chat/routers/chat_router.py b/app/domains/live_chat/routers/chat_router.py index 64c21e2..56e3724 100644 --- a/app/domains/live_chat/routers/chat_router.py +++ b/app/domains/live_chat/routers/chat_router.py @@ -68,9 +68,14 @@ async def connect_to_conversation( response: WSResponseFactoryDep, ) -> None: user = auth[0] + log_ctx = {"chat_id": str(chat_id), "user_id": str(user.id)} + + logger.info("WS connect attempt", extra=log_ctx) + chat = await service.get_by_id(chat_id) if chat is None: + logger.warning("WS denied: chat does not exist", extra=log_ctx) await ws.send_denial_response( JSONResponse( status_code=403, @@ -80,6 +85,7 @@ async def connect_to_conversation( return if not chat.is_opened(): + logger.warning("WS denied: chat already closed", extra=log_ctx) await ws.send_denial_response( JSONResponse( status_code=403, @@ -89,6 +95,7 @@ async def connect_to_conversation( return if not can_user_join_conversation(user, chat): + logger.warning("WS denied: user not allowed in chat", extra=log_ctx) await ws.send_denial_response( JSONResponse( status_code=403, @@ -97,7 +104,13 @@ async def connect_to_conversation( ) return - await ws.accept(subprotocol=get_accepted_subprotocol(ws)) + subprotocol = get_accepted_subprotocol(ws) + logger.debug( + "WS accepting handshake", + extra={**log_ctx, "subprotocol": subprotocol}, + ) + await ws.accept(subprotocol=subprotocol) + logger.info("WS handshake accepted", extra=log_ctx) conn = ChatConnection(ws, response, user) joined = False @@ -105,26 +118,54 @@ async def connect_to_conversation( try: await chat_manager.join_room(chat_id, conn) joined = True + logger.info("WS joined room", extra=log_ctx) while ws.client_state == WebSocketState.CONNECTED: try: payload = await conn.receive_payload() + logger.debug("WS payload received", extra=log_ctx) + message = service.handle_message(chat_id, user.id, payload) await service.add_message_to_conversation(chat_id, message) + logger.debug( + "WS message persisted", + extra={**log_ctx, "message_id": str(message.id)}, + ) + await chat_manager.broadcast(chat_id, message) + logger.debug( + "WS message broadcast", + extra={**log_ctx, "message_id": str(message.id)}, + ) - except WebSocketDisconnect: + except WebSocketDisconnect as e: + logger.info( + "WS client disconnected", + extra={**log_ctx, "code": e.code, "reason": e.reason}, + ) break except (InvalidMessageError, ValidationError) as e: + logger.warning( + "WS invalid message", + extra={**log_ctx, "error": str(e)}, + ) await conn.send_error( WebSocketException(code=1003, reason=str(e) or "") ) except ValueError as e: + logger.warning( + "WS policy violation", + extra={**log_ctx, "error": str(e)}, + ) await conn.send_error( WebSocketException(code=1008, reason=str(e)) ) except RuntimeError as e: + logger.error( + "WS runtime error", + extra={**log_ctx, "error": str(e)}, + ) await conn.send_error( WebSocketException(code=1011, reason=str(e)) ) @@ -132,14 +173,21 @@ async def connect_to_conversation( except ChatRoomNotFoundError as e: logger.warning( "Chat room not found during connection", - extra={"chat_id": str(chat_id)}, + extra={**log_ctx, "error": str(e)}, ) await conn.send_error(WebSocketException(code=1011, reason=str(e))) await conn.close(code=1011, reason="Chat room unavailable") + except Exception: + logger.exception("WS unexpected error", extra=log_ctx) + raise + finally: if joined: + logger.info("WS leaving room", extra=log_ctx) await chat_manager.leave_room(chat_id, conn) + else: + logger.info("WS connection ended without joining", extra=log_ctx) @chat_router.websocket("/test/room/{conversation_id}")