Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/local-http-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,6 @@ Error responses use this shape:
}
```

Possible error codes: `provider_not_found`, `not_found`, `method_not_allowed`.
Possible error codes: `provider_not_found`, `not_found`, `method_not_allowed`, `server_busy`.

`server_busy` returns **503 Service Unavailable** when the local API is already handling the maximum number of concurrent connections. Clients should back off and retry later.
105 changes: 101 additions & 4 deletions src-tauri/src/local_http_api/server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,60 @@
use super::cache::{cache_state, enabled_snapshots_ordered};
use std::io::{Read, Write};
use std::net::{TcpListener, TcpStream};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Duration;

const BIND_ADDR: &str = "127.0.0.1:6736";
const MAX_CONCURRENT_CONNECTIONS: usize = 16;
const CONNECTION_TIMEOUT: Duration = Duration::from_secs(5);

struct ConnectionLimiter {
active: Arc<AtomicUsize>,
max: usize,
}

struct ConnectionPermit {
active: Arc<AtomicUsize>,
}

impl ConnectionLimiter {
fn new(max: usize) -> Self {
Self {
active: Arc::new(AtomicUsize::new(0)),
max,
}
}

fn acquire(&self) -> Option<ConnectionPermit> {
loop {
let active = self.active.load(Ordering::Acquire);
if active >= self.max {
return None;
}
if self
.active
.compare_exchange(active, active + 1, Ordering::AcqRel, Ordering::Acquire)
.is_ok()
{
return Some(ConnectionPermit {
active: Arc::clone(&self.active),
});
}
}
}

#[cfg(test)]
fn active_count(&self) -> usize {
self.active.load(Ordering::Acquire)
}
}

impl Drop for ConnectionPermit {
fn drop(&mut self) {
self.active.fetch_sub(1, Ordering::AcqRel);
}
}

// ---------------------------------------------------------------------------
// HTTP server
Expand All @@ -25,19 +77,31 @@ pub fn start_server() {
}
};

let limiter = ConnectionLimiter::new(MAX_CONCURRENT_CONNECTIONS);
for stream in listener.incoming() {
match stream {
Ok(stream) => {
std::thread::spawn(move || handle_connection(stream));
Ok(mut stream) => {
let Some(permit) = limiter.acquire() else {
log::warn!(
"local HTTP API connection limit reached (max={})",
MAX_CONCURRENT_CONNECTIONS
);
let _ = stream.set_write_timeout(Some(CONNECTION_TIMEOUT));
let _ = stream.write_all(response_service_unavailable().as_bytes());
let _ = stream.flush();
continue;
};
std::thread::spawn(move || handle_connection(stream, permit));
}
Err(e) => log::debug!("local HTTP API accept error: {}", e),
}
}
});
}

fn handle_connection(mut stream: TcpStream) {
let _ = stream.set_read_timeout(Some(std::time::Duration::from_secs(5)));
fn handle_connection(mut stream: TcpStream, _permit: ConnectionPermit) {
let _ = stream.set_read_timeout(Some(CONNECTION_TIMEOUT));
let _ = stream.set_write_timeout(Some(CONNECTION_TIMEOUT));

// Read request (up to 4 KB is plenty for a request line + headers)
let mut buf = [0u8; 4096];
Expand Down Expand Up @@ -153,6 +217,11 @@ fn response_method_not_allowed() -> String {
response_json(405, "Method Not Allowed", body)
}

fn response_service_unavailable() -> String {
let body = r#"{"error":"server_busy"}"#;
response_json(503, "Service Unavailable", body)
}

#[cfg(test)]
mod tests {
use super::super::cache::{cache_state, CachedPluginSnapshot};
Expand Down Expand Up @@ -250,4 +319,32 @@ mod tests {
assert!(resp.contains("Access-Control-Allow-Origin: *"));
assert!(resp.contains("Content-Type: application/json; charset=utf-8"));
}

#[test]
fn connection_limiter_rejects_above_capacity_and_releases_on_drop() {
let limiter = ConnectionLimiter::new(2);
let first = limiter.acquire().expect("first permit");
let second = limiter.acquire().expect("second permit");

assert!(limiter.acquire().is_none());
assert_eq!(limiter.active_count(), 2);

drop(first);

let third = limiter.acquire().expect("permit after release");
assert_eq!(limiter.active_count(), 2);

drop(second);
drop(third);
assert_eq!(limiter.active_count(), 0);
}

#[test]
fn response_service_unavailable_returns_503_json() {
let resp = response_service_unavailable();

assert!(resp.starts_with("HTTP/1.1 503"));
assert!(resp.contains(r#""error":"server_busy""#));
assert!(resp.contains("Access-Control-Allow-Origin: *"));
}
}
Loading