diff --git a/docs/local-http-api.md b/docs/local-http-api.md index b45f44c3..80dfc859 100644 --- a/docs/local-http-api.md +++ b/docs/local-http-api.md @@ -92,4 +92,6 @@ Error responses use this shape: } ``` -Possible error codes: `provider_not_found`, `not_found`, `method_not_allowed`. +Possible error codes: `provider_not_found`, `not_found`, `method_not_allowed`, `server_busy`. + +`server_busy` returns **503 Service Unavailable** when the local API is already handling the maximum number of concurrent connections. Clients should back off and retry later. diff --git a/src-tauri/src/local_http_api/server.rs b/src-tauri/src/local_http_api/server.rs index 7a68b03f..cc60233f 100644 --- a/src-tauri/src/local_http_api/server.rs +++ b/src-tauri/src/local_http_api/server.rs @@ -1,8 +1,60 @@ use super::cache::{cache_state, enabled_snapshots_ordered}; use std::io::{Read, Write}; use std::net::{TcpListener, TcpStream}; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; const BIND_ADDR: &str = "127.0.0.1:6736"; +const MAX_CONCURRENT_CONNECTIONS: usize = 16; +const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5); + +struct ConnectionLimiter { + active: Arc, + max: usize, +} + +struct ConnectionPermit { + active: Arc, +} + +impl ConnectionLimiter { + fn new(max: usize) -> Self { + Self { + active: Arc::new(AtomicUsize::new(0)), + max, + } + } + + fn acquire(&self) -> Option { + loop { + let active = self.active.load(Ordering::Acquire); + if active >= self.max { + return None; + } + if self + .active + .compare_exchange(active, active + 1, Ordering::AcqRel, Ordering::Acquire) + .is_ok() + { + return Some(ConnectionPermit { + active: Arc::clone(&self.active), + }); + } + } + } + + #[cfg(test)] + fn active_count(&self) -> usize { + self.active.load(Ordering::Acquire) + } +} + +impl Drop for ConnectionPermit { + fn drop(&mut self) { + self.active.fetch_sub(1, Ordering::AcqRel); + } +} // --------------------------------------------------------------------------- // HTTP server @@ -25,10 +77,21 @@ pub fn start_server() { } }; + let limiter = ConnectionLimiter::new(MAX_CONCURRENT_CONNECTIONS); for stream in listener.incoming() { match stream { - Ok(stream) => { - std::thread::spawn(move || handle_connection(stream)); + Ok(mut stream) => { + let Some(permit) = limiter.acquire() else { + log::warn!( + "local HTTP API connection limit reached (max={})", + MAX_CONCURRENT_CONNECTIONS + ); + let _ = stream.set_write_timeout(Some(CONNECTION_TIMEOUT)); + let _ = stream.write_all(response_service_unavailable().as_bytes()); + let _ = stream.flush(); + continue; + }; + std::thread::spawn(move || handle_connection(stream, permit)); } Err(e) => log::debug!("local HTTP API accept error: {}", e), } @@ -36,8 +99,9 @@ pub fn start_server() { }); } -fn handle_connection(mut stream: TcpStream) { - let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5))); +fn handle_connection(mut stream: TcpStream, _permit: ConnectionPermit) { + let _ = stream.set_read_timeout(Some(CONNECTION_TIMEOUT)); + let _ = stream.set_write_timeout(Some(CONNECTION_TIMEOUT)); // Read request (up to 4 KB is plenty for a request line + headers) let mut buf = [0u8; 4096]; @@ -153,6 +217,11 @@ fn response_method_not_allowed() -> String { response_json(405, "Method Not Allowed", body) } +fn response_service_unavailable() -> String { + let body = r#"{"error":"server_busy"}"#; + response_json(503, "Service Unavailable", body) +} + #[cfg(test)] mod tests { use super::super::cache::{cache_state, CachedPluginSnapshot}; @@ -250,4 +319,32 @@ mod tests { assert!(resp.contains("Access-Control-Allow-Origin: *")); assert!(resp.contains("Content-Type: application/json; charset=utf-8")); } + + #[test] + fn connection_limiter_rejects_above_capacity_and_releases_on_drop() { + let limiter = ConnectionLimiter::new(2); + let first = limiter.acquire().expect("first permit"); + let second = limiter.acquire().expect("second permit"); + + assert!(limiter.acquire().is_none()); + assert_eq!(limiter.active_count(), 2); + + drop(first); + + let third = limiter.acquire().expect("permit after release"); + assert_eq!(limiter.active_count(), 2); + + drop(second); + drop(third); + assert_eq!(limiter.active_count(), 0); + } + + #[test] + fn response_service_unavailable_returns_503_json() { + let resp = response_service_unavailable(); + + assert!(resp.starts_with("HTTP/1.1 503")); + assert!(resp.contains(r#""error":"server_busy""#)); + assert!(resp.contains("Access-Control-Allow-Origin: *")); + } }