diff --git a/Cargo.toml b/Cargo.toml index 9e0b871..b9cdd2d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rustvncserver" -version = "2.0.0" +version = "2.1.0" edition = "2021" rust-version = "1.90" authors = ["Dustin McAfee"] diff --git a/src/client.rs b/src/client.rs index b28f37d..08d3f9e 100644 --- a/src/client.rs +++ b/src/client.rs @@ -54,13 +54,15 @@ use crate::encoding::tight::TightStreamCompressor; use crate::framebuffer::{DirtyRegion, Framebuffer}; use crate::protocol::{ PixelFormat, Rectangle, ServerInit, CLIENT_MSG_CLIENT_CUT_TEXT, - CLIENT_MSG_FRAMEBUFFER_UPDATE_REQUEST, CLIENT_MSG_KEY_EVENT, CLIENT_MSG_POINTER_EVENT, - CLIENT_MSG_SET_ENCODINGS, CLIENT_MSG_SET_PIXEL_FORMAT, ENCODING_COMPRESS_LEVEL_0, - ENCODING_COMPRESS_LEVEL_9, ENCODING_COPYRECT, ENCODING_CORRE, ENCODING_HEXTILE, + CLIENT_MSG_ENABLE_CONTINUOUS_UPDATES, CLIENT_MSG_FRAMEBUFFER_UPDATE_REQUEST, + CLIENT_MSG_KEY_EVENT, CLIENT_MSG_POINTER_EVENT, CLIENT_MSG_SET_ENCODINGS, + CLIENT_MSG_SET_PIXEL_FORMAT, ENCODING_COMPRESS_LEVEL_0, ENCODING_COMPRESS_LEVEL_9, + ENCODING_CONTINUOUS_UPDATES, ENCODING_COPYRECT, ENCODING_CORRE, ENCODING_HEXTILE, ENCODING_QUALITY_LEVEL_0, ENCODING_QUALITY_LEVEL_9, ENCODING_RAW, ENCODING_RRE, ENCODING_TIGHT, ENCODING_TIGHTPNG, ENCODING_ZLIB, ENCODING_ZLIBHEX, ENCODING_ZRLE, ENCODING_ZYWRLE, PROTOCOL_VERSION, SECURITY_RESULT_FAILED, SECURITY_RESULT_OK, SECURITY_TYPE_NONE, - SECURITY_TYPE_VNC_AUTH, SERVER_MSG_FRAMEBUFFER_UPDATE, SERVER_MSG_SERVER_CUT_TEXT, + SECURITY_TYPE_VNC_AUTH, SERVER_MSG_END_OF_CONTINUOUS_UPDATES, SERVER_MSG_FRAMEBUFFER_UPDATE, + SERVER_MSG_SERVER_CUT_TEXT, }; use rfb_encodings::translate; @@ -235,8 +237,17 @@ pub struct VncClient { /// The VNC quality level (0-9, or 255 for unset = use JPEG). /// Stored as an `AtomicU8` for atomic access from multiple contexts. quality_level: AtomicU8, // Atomic - VNC quality level (0-9, 255=unset) - /// A flag indicating whether the client has requested continuous framebuffer updates, stored as an `AtomicBool`. - continuous_updates: AtomicBool, // Atomic - simple bool flag + /// Whether the client supports the `ContinuousUpdates` extension (advertised via -313 pseudo-encoding). + /// When true, server has sent `EndOfContinuousUpdates` and client can send `EnableContinuousUpdates`. + supports_continuous_updates: AtomicBool, // Atomic - set when client advertises -313 + /// Whether continuous updates are currently enabled via the `ContinuousUpdates` extension. + /// When true, server pushes updates without waiting for `FramebufferUpdateRequest`. + continuous_updates_enabled: AtomicBool, // Atomic - set by EnableContinuousUpdates message + /// The region for which continuous updates are enabled (when using `ContinuousUpdates` extension). + continuous_updates_region: RwLock>, // Protected - set by EnableContinuousUpdates + /// Legacy flag: whether server is actively sending updates after `FramebufferUpdateRequest`. + /// Used when client does NOT support `ContinuousUpdates` extension (traditional VNC behavior). + update_requested: AtomicBool, // Atomic - set by FramebufferUpdateRequest, cleared after update sent /// A shared, locked vector of `DirtyRegion`s specific to this client. /// These regions represent areas of the framebuffer that have been modified and need to be sent to the client. modified_regions: Arc>>, // Per-client dirty regions (standard VNC protocol style - receives pushes from framebuffer) @@ -281,6 +292,8 @@ pub struct VncClient { destination_port: Option, /// Repeater ID for repeater connections (None for direct connections) repeater_id: Option, + /// Request ID for tracking connection requests (optional, set by caller) + request_id: Option, /// Unique client ID assigned by the server client_id: usize, } @@ -404,7 +417,10 @@ impl VncClient { jpeg_quality: AtomicU8::new(80), // Default quality compression_level: AtomicU8::new(6), // Default zlib compression (balanced) quality_level: AtomicU8::new(255), // 255 = unset (use JPEG by default) - continuous_updates: AtomicBool::new(false), + supports_continuous_updates: AtomicBool::new(false), // Set when client advertises -313 + continuous_updates_enabled: AtomicBool::new(false), // Set by EnableContinuousUpdates + continuous_updates_region: RwLock::new(None), // Region for continuous updates + update_requested: AtomicBool::new(false), // Legacy: set by FramebufferUpdateRequest modified_regions: Arc::new(RwLock::new(Vec::new())), requested_region: RwLock::new(None), copy_region: Arc::new(RwLock::new(Vec::new())), // Initialize empty copy region @@ -422,6 +438,7 @@ impl VncClient { remote_host, destination_port: None, // None for direct inbound connections repeater_id: None, // None for direct inbound connections + request_id: None, // None for direct inbound connections client_id, }) } @@ -599,6 +616,23 @@ impl VncClient { #[cfg(feature = "debug-logging")] info!("Client requested compression level {compression_level}, using zlib level {compression_level}"); } + + // Check for ContinuousUpdates pseudo-encoding (-313) + if encoding == ENCODING_CONTINUOUS_UPDATES { + // Client supports ContinuousUpdates extension + // Send EndOfContinuousUpdates message to confirm support + if !self.supports_continuous_updates.load(Ordering::Relaxed) { + self.supports_continuous_updates.store(true, Ordering::Relaxed); + #[cfg(feature = "debug-logging")] + info!("Client supports ContinuousUpdates extension, sending EndOfContinuousUpdates"); + + // Send EndOfContinuousUpdates message (1 byte: type 150) + let _guard = self.send_mutex.lock().await; + if let Err(e) = self.write_stream.lock().await.write_all(&[SERVER_MSG_END_OF_CONTINUOUS_UPDATES]).await { + error!("Failed to send EndOfContinuousUpdates: {e}"); + } + } + } } self.encodings.write().await.clone_from(&encodings_list); #[cfg(feature = "debug-logging")] @@ -621,9 +655,9 @@ impl VncClient { // Track requested region (standard VNC protocol cl->requestedRegion) *self.requested_region.write().await = Some(DirtyRegion::new(x, y, width, height)); - // Enable continuous updates for both incremental and non-incremental requests - // The difference is handled below: non-incremental clears and adds full region - self.continuous_updates.store(true, Ordering::Relaxed); + // Mark that an update was requested (traditional VNC behavior) + // If ContinuousUpdates extension is enabled, this is ignored + self.update_requested.store(true, Ordering::Relaxed); // Handle non-incremental updates (full refresh) if !incremental { @@ -701,6 +735,39 @@ impl VncClient { let _ = self.event_tx.send(ClientEvent::CutText { text }); } } + CLIENT_MSG_ENABLE_CONTINUOUS_UPDATES => { + // EnableContinuousUpdates: enable(u8) + x(u16) + y(u16) + w(u16) + h(u16) = 10 bytes total + if buf.len() < 10 { + break; + } + buf.advance(1); // message type + let enable = buf.get_u8() != 0; + let x = buf.get_u16(); + let y = buf.get_u16(); + let width = buf.get_u16(); + let height = buf.get_u16(); + + if enable { + // Enable continuous updates for the specified region + let region = DirtyRegion::new(x, y, width, height); + *self.continuous_updates_region.write().await = Some(region); + self.continuous_updates_enabled.store(true, Ordering::Relaxed); + #[cfg(feature = "debug-logging")] + info!("ContinuousUpdates enabled for region ({x},{y} {width}x{height})"); + } else { + // Disable continuous updates + *self.continuous_updates_region.write().await = None; + self.continuous_updates_enabled.store(false, Ordering::Relaxed); + #[cfg(feature = "debug-logging")] + info!("ContinuousUpdates disabled"); + + // Send EndOfContinuousUpdates to confirm disable + let _guard = self.send_mutex.lock().await; + if let Err(e) = self.write_stream.lock().await.write_all(&[SERVER_MSG_END_OF_CONTINUOUS_UPDATES]).await { + error!("Failed to send EndOfContinuousUpdates: {e}"); + } + } + } _ => { error!("Unknown message type: {msg_type}, disconnecting client"); let _ = self.event_tx.send(ClientEvent::Disconnected); @@ -713,10 +780,15 @@ impl VncClient { } } - // Periodically check if we should send updates (standard VNC protocol style) + // Periodically check if we should send updates _ = check_interval.tick() => { - let continuous = self.continuous_updates.load(Ordering::Relaxed); - if continuous { + // Determine if updates should be sent: + // - ContinuousUpdates extension enabled (client sent EnableContinuousUpdates with enable=true) + // - OR traditional mode: FramebufferUpdateRequest received (update_requested=true) + let cu_enabled = self.continuous_updates_enabled.load(Ordering::Relaxed); + let update_requested = self.update_requested.load(Ordering::Relaxed); + + if cu_enabled || update_requested { // Check if we have regions and deferral time has elapsed // Regions are already pushed to us by framebuffer (no merge needed!) let should_send = { @@ -746,6 +818,12 @@ impl VncClient { if should_send { self.send_batched_update().await?; + + // In traditional mode (not ContinuousUpdates), clear the update_requested flag + // This matches libvncserver behavior: after sending an update, wait for next request + if !cu_enabled { + self.update_requested.store(false, Ordering::Relaxed); + } } } } @@ -1733,6 +1811,16 @@ impl VncClient { self.repeater_id = Some(repeater_id); self.destination_port = destination_port; } + + /// Sets the request ID for tracking connection requests. + pub fn set_request_id(&mut self, request_id: String) { + self.request_id = Some(request_id); + } + + /// Returns the request ID if set, or None. + pub fn get_request_id(&self) -> Option<&str> { + self.request_id.as_deref() + } } /// Ensures proper cleanup when `VncClient` is dropped. diff --git a/src/events.rs b/src/events.rs index 867e01b..034de9f 100644 --- a/src/events.rs +++ b/src/events.rs @@ -62,4 +62,32 @@ pub enum ServerEvent { /// Clipboard text content. text: String, }, + + /// The RFB ID message was sent to a VNC repeater. + /// + /// This event is emitted after the server sends the repeater ID message + /// to the VNC repeater. It's useful for tracking connection progress + /// in applications that need to report connection status. + RfbMessageSent { + /// Client identifier. + client_id: usize, + /// Optional request ID for tracking this connection. + request_id: Option, + /// Whether the RFB ID message was sent successfully. + success: bool, + }, + + /// The VNC handshake completed after connecting to a repeater. + /// + /// This event is emitted after the VNC protocol handshake completes + /// with a client connected via a repeater. It indicates that the + /// connection is fully established and ready for use. + HandshakeComplete { + /// Client identifier. + client_id: usize, + /// Optional request ID for tracking this connection. + request_id: Option, + /// Whether the handshake completed successfully. + success: bool, + }, } diff --git a/src/protocol.rs b/src/protocol.rs index 30217a7..361de06 100644 --- a/src/protocol.rs +++ b/src/protocol.rs @@ -105,6 +105,13 @@ pub const CLIENT_MSG_POINTER_EVENT: u8 = 5; /// Allows the client to transfer clipboard contents to the server. pub const CLIENT_MSG_CLIENT_CUT_TEXT: u8 = 6; +/// Message type: Client enables or disables Continuous Updates. +/// +/// Part of the `ContinuousUpdates` extension. When enabled, the server +/// pushes framebuffer updates without waiting for `FramebufferUpdateRequest`. +/// Message format: type (u8) + enable (u8) + x (u16) + y (u16) + w (u16) + h (u16) +pub const CLIENT_MSG_ENABLE_CONTINUOUS_UPDATES: u8 = 150; + // Server-to-Client Message Types /// Message type: Server sends a framebuffer update. @@ -131,6 +138,13 @@ pub const SERVER_MSG_BELL: u8 = 2; /// Allows the server to transfer clipboard contents to the client. pub const SERVER_MSG_SERVER_CUT_TEXT: u8 = 3; +/// Message type: Server signals end of continuous updates. +/// +/// Part of the `ContinuousUpdates` extension. Sent by the server to indicate +/// it supports the `ContinuousUpdates` extension when client advertises -313. +/// Also sent when continuous updates are disabled. +pub const SERVER_MSG_END_OF_CONTINUOUS_UPDATES: u8 = 150; + // Encoding Types // // Note: Most encoding type constants are re-exported from rfb-encodings at the top of this file. @@ -195,6 +209,14 @@ pub const ENCODING_COMPRESS_LEVEL_0: i32 = -256; /// for reduced bandwidth usage. pub const ENCODING_COMPRESS_LEVEL_9: i32 = -247; +/// Pseudo-encoding: Continuous Updates. +/// +/// When included in the client's encoding list, this indicates the client +/// supports the `ContinuousUpdates` extension. The server should respond with +/// an `EndOfContinuousUpdates` message (type 150) to confirm support. +/// Once confirmed, the client can send `EnableContinuousUpdates` messages. +pub const ENCODING_CONTINUOUS_UPDATES: i32 = -313; + // Note: Hextile and Tight subencoding constants are re-exported from rfb-encodings // at the top of this file. diff --git a/src/repeater.rs b/src/repeater.rs index 1b87c95..4de46b8 100644 --- a/src/repeater.rs +++ b/src/repeater.rs @@ -142,3 +142,139 @@ pub async fn connect_repeater( info!("VNC repeater connection established successfully"); Ok(client) } + +/// Progress events emitted during repeater connection. +/// +/// These events allow applications to track the progress of a repeater +/// connection and receive callbacks at key points during the connection process. +#[derive(Debug, Clone)] +pub enum RepeaterProgress { + /// The RFB ID message was sent to the repeater. + RfbMessageSent { + /// Whether the send was successful + success: bool, + }, + /// The VNC handshake completed. + HandshakeComplete { + /// Whether the handshake was successful + success: bool, + }, +} + +/// Connects to a VNC repeater with progress callbacks. +/// +/// This is an extended version of [`connect_repeater`] that emits progress +/// events during the connection process. This is useful for applications +/// that need to track connection status or provide feedback to users. +/// +/// # Arguments +/// +/// All arguments are the same as [`connect_repeater`], plus: +/// * `progress_tx` - An optional channel to send progress events during connection. +/// +/// # Returns +/// +/// Same as [`connect_repeater`]. +#[allow(clippy::too_many_arguments)] +pub async fn connect_repeater_with_progress( + client_id: usize, + repeater_host: String, + repeater_port: u16, + repeater_id: String, + framebuffer: Framebuffer, + desktop_name: String, + password: Option, + event_tx: mpsc::UnboundedSender, + progress_tx: Option>, +) -> Result { + #[cfg(feature = "debug-logging")] + info!("Connecting to VNC repeater {repeater_host}:{repeater_port} with ID: {repeater_id}"); + + // Connect to repeater + #[cfg(feature = "debug-logging")] + info!("Attempting TCP connection to {repeater_host}:{repeater_port}..."); + let mut stream = match TcpStream::connect(format!("{repeater_host}:{repeater_port}")).await { + Ok(s) => { + #[cfg(feature = "debug-logging")] + info!("TCP connection established to {repeater_host}:{repeater_port}"); + s + } + Err(e) => { + error!("Failed to establish TCP connection to {repeater_host}:{repeater_port}: {e}"); + return Err(e); + } + }; + + // Format ID string: "ID:xxxxx" padded to 250 bytes with nulls + let mut id_buffer = [0u8; 250]; + let id_string = format!("ID:{repeater_id}"); + + // Validate ID length + if id_string.len() > 250 { + // Emit failure event before returning error + if let Some(tx) = &progress_tx { + let _ = tx.send(RepeaterProgress::RfbMessageSent { success: false }); + } + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Repeater ID too long (max 246 characters after 'ID:' prefix)", + )); + } + + // Copy ID string into buffer (rest remains null) + id_buffer[..id_string.len()].copy_from_slice(id_string.as_bytes()); + + // Send ID to repeater + #[cfg(feature = "debug-logging")] + info!("Sending repeater ID: {id_string}"); + if let Err(e) = stream.write_all(&id_buffer).await { + error!("Failed to send repeater ID to {repeater_host}:{repeater_port}: {e}"); + // Emit failure event + if let Some(tx) = &progress_tx { + let _ = tx.send(RepeaterProgress::RfbMessageSent { success: false }); + } + return Err(e); + } + + // Emit success event for RFB message sent + if let Some(tx) = &progress_tx { + let _ = tx.send(RepeaterProgress::RfbMessageSent { success: true }); + } + + #[cfg(feature = "debug-logging")] + info!("Repeater ID sent, proceeding with VNC handshake"); + + // Now proceed with normal VNC client handshake + let client_result = VncClient::new( + client_id, + stream, + framebuffer, + desktop_name, + password, + event_tx, + ) + .await; + + match client_result { + Ok(mut client) => { + // Emit handshake success event + if let Some(tx) = &progress_tx { + let _ = tx.send(RepeaterProgress::HandshakeComplete { success: true }); + } + + // Set repeater metadata for client management APIs + client.set_repeater_metadata(repeater_id, Some(repeater_port)); + + #[cfg(feature = "debug-logging")] + info!("VNC repeater connection established successfully"); + Ok(client) + } + Err(e) => { + // Emit handshake failure event + if let Some(tx) = &progress_tx { + let _ = tx.send(RepeaterProgress::HandshakeComplete { success: false }); + } + Err(e) + } + } +} diff --git a/src/server.rs b/src/server.rs index c64f460..b5e53fd 100644 --- a/src/server.rs +++ b/src/server.rs @@ -112,6 +112,32 @@ pub enum ServerEvent { /// The cut text string text: String, }, + /// The RFB ID message was sent to a VNC repeater. + /// + /// This event is emitted after the server sends the repeater ID message + /// to the VNC repeater. It's useful for tracking connection progress + /// in applications that need to report connection status. + RfbMessageSent { + /// The unique identifier for the client + client_id: usize, + /// The optional request ID for tracking this connection + request_id: Option, + /// Whether the RFB ID message was sent successfully + success: bool, + }, + /// The VNC handshake completed after connecting to a repeater. + /// + /// This event is emitted after the VNC protocol handshake completes + /// with a client connected via a repeater. It indicates that the + /// connection is fully established and ready for use. + HandshakeComplete { + /// The unique identifier for the client + client_id: usize, + /// The optional request ID for tracking this connection + request_id: Option, + /// Whether the handshake completed successfully + success: bool, + }, } impl VncServer { @@ -765,6 +791,214 @@ impl VncServer { } } + /// Connects to a VNC repeater with an optional request ID for tracking. + /// + /// This is an extended version of [`VncServer::connect_repeater`] that accepts an optional + /// `request_id` parameter for tracking connection requests. The request ID is + /// stored as client metadata. + /// + /// When a `request_id` is provided, this method also emits progress events: + /// - [`ServerEvent::RfbMessageSent`] - when the RFB ID message is sent to the repeater + /// - [`ServerEvent::HandshakeComplete`] - when the VNC handshake completes + /// + /// # Arguments + /// + /// * `repeater_host` - The hostname or IP address of the VNC repeater. + /// * `repeater_port` - The port of the VNC repeater. + /// * `repeater_id` - The ID to use when connecting to the repeater. + /// * `request_id` - Optional request ID for tracking this connection request. + /// + /// # Returns + /// + /// `Ok(client_id)` if the connection is successful. + /// + /// # Errors + /// + /// Returns `std::io::Error` if the connection fails (network error, authentication failure, etc.). + #[allow(clippy::too_many_lines)] + #[allow(clippy::cast_possible_truncation)] + pub async fn connect_repeater_with_request_id( + &self, + repeater_host: String, + repeater_port: u16, + repeater_id: String, + request_id: Option, + ) -> Result { + use crate::repeater::{connect_repeater_with_progress, RepeaterProgress}; + + // Safely increment client ID counter and check for overflow + let client_id_raw = NEXT_CLIENT_ID.fetch_add(1, Ordering::SeqCst); + if client_id_raw == 0 || client_id_raw >= u64::MAX - 1000 { + return Err(std::io::Error::other("Client ID counter overflow")); + } + let client_id = client_id_raw as usize; + + let framebuffer = self.framebuffer.clone(); + let desktop_name = self.desktop_name.clone(); + let password = self.password.clone(); + let clients = self.clients.clone(); + let client_write_streams = self.client_write_streams.clone(); + let client_tasks = self.client_tasks.clone(); + let client_ids = self.client_ids.clone(); + let server_event_tx = self.event_tx.clone(); + let request_id_clone = request_id.clone(); + + // Use oneshot channel to wait for connection result before returning + let (result_tx, result_rx) = tokio::sync::oneshot::channel(); + + tokio::spawn(async move { + let (client_event_tx, mut client_event_rx) = mpsc::unbounded_channel(); + + // Create progress channel for repeater connection events + let (progress_tx, mut progress_rx) = mpsc::unbounded_channel(); + + // Spawn a task to forward progress events to server events + let server_event_tx_progress = server_event_tx.clone(); + let request_id_for_progress = request_id_clone.clone(); + tokio::spawn(async move { + while let Some(progress) = progress_rx.recv().await { + match progress { + RepeaterProgress::RfbMessageSent { success } => { + let _ = server_event_tx_progress.send(ServerEvent::RfbMessageSent { + client_id, + request_id: request_id_for_progress.clone(), + success, + }); + } + RepeaterProgress::HandshakeComplete { success } => { + let _ = server_event_tx_progress.send(ServerEvent::HandshakeComplete { + client_id, + request_id: request_id_for_progress.clone(), + success, + }); + } + } + } + }); + + let connection_result = connect_repeater_with_progress( + client_id, + repeater_host, + repeater_port, + repeater_id, + framebuffer.clone(), + desktop_name, + password, + client_event_tx, + Some(progress_tx), + ) + .await; + + // Send connection result back to caller + let _ = result_tx.send( + connection_result + .as_ref() + .map(|_| ()) + .map_err(|e| std::io::Error::new(e.kind(), e.to_string())), + ); + + match connection_result { + Ok(mut client) => { + // Set request_id on the client if provided + if let Some(req_id) = request_id_clone { + client.set_request_id(req_id); + } + + log::info!( + "Repeater connection {client_id} established (with request_id tracking)" + ); + + let client_arc = Arc::new(RwLock::new(client)); + + // Register client to receive dirty region notifications + let regions_arc = client_arc.read().await.get_receiver_handle(); + let receiver = DirtyRegionReceiver::new(Arc::downgrade(®ions_arc)); + framebuffer.register_receiver(receiver).await; + + // Store the write stream handle for direct socket shutdown + let write_stream_handle = { + let client = client_arc.read().await; + client.get_write_stream_handle() + }; + client_write_streams.write().await.push(write_stream_handle); + + clients.write().await.push(client_arc.clone()); + client_ids.write().await.push(client_id); + + let _ = server_event_tx.send(ServerEvent::ClientConnected { client_id }); + + // Spawn task to handle client messages + let client_arc_clone = client_arc.clone(); + let msg_handle = tokio::spawn(async move { + let result = { + let mut client = client_arc_clone.write().await; + client.handle_messages().await + }; + if let Err(e) = result { + error!("Repeater client {client_id} message handling error: {e}"); + } + }); + + // Store the message handler task handle + client_tasks.write().await.push(msg_handle); + + // Handle client events + while let Some(event) = client_event_rx.recv().await { + match event { + ClientEvent::KeyPress { down, key } => { + let _ = server_event_tx.send(ServerEvent::KeyPress { + client_id, + down, + key, + }); + } + ClientEvent::PointerMove { x, y, button_mask } => { + let _ = server_event_tx.send(ServerEvent::PointerMove { + client_id, + x, + y, + button_mask, + }); + } + ClientEvent::CutText { text } => { + let _ = + server_event_tx.send(ServerEvent::CutText { client_id, text }); + } + ClientEvent::Disconnected => { + break; + } + } + } + + // Remove client from list + let mut clients_guard = clients.write().await; + clients_guard.retain(|c| !Arc::ptr_eq(c, &client_arc)); + drop(clients_guard); + + let mut client_ids_guard = client_ids.write().await; + client_ids_guard.retain(|&id| id != client_id); + drop(client_ids_guard); + + let _ = server_event_tx.send(ServerEvent::ClientDisconnected { client_id }); + + log::info!("Repeater client {client_id} disconnected"); + } + Err(e) => { + error!("Failed to connect to repeater: {e}"); + } + } + }); + + // Wait for connection to complete before returning to caller + match result_rx.await { + Ok(Ok(())) => Ok(client_id), + Ok(Err(e)) => Err(e), + Err(_) => Err(std::io::Error::other( + "Repeater connection task died unexpectedly", + )), + } + } + /// Finds a client by its ID. /// /// This method searches through all connected clients to find the one