From 2e76f90c241b94b83322ec59e281c5feda587926 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Wed, 18 Feb 2026 11:33:21 +0200 Subject: [PATCH 1/2] fix: handle BorrowMutError in state management --- nginx_module/src/unix_socket.rs | 238 +++++++++++++++++++++++--------- 1 file changed, 170 insertions(+), 68 deletions(-) diff --git a/nginx_module/src/unix_socket.rs b/nginx_module/src/unix_socket.rs index a2c9141..fe86db0 100644 --- a/nginx_module/src/unix_socket.rs +++ b/nginx_module/src/unix_socket.rs @@ -61,6 +61,12 @@ struct WriteBuffer { const MIN_TIMEOUT_MS: usize = 1; const TIMEOUT_MS: usize = 255; +fn log_borrow_mut_error(context: &str, error: std::cell::BorrowMutError) { + if let Ok(err_msg) = std::ffi::CString::new(format!("{}: {}", context, error)) { + unsafe { ngx_log_error(NGX_LOG_ERR as usize, (*ngx_cycle).log, 0, &err_msg) }; + } +} + impl UnixSocket { pub fn connect( path: String, @@ -126,20 +132,32 @@ impl UnixSocket { }; let inner = Box::pin(inner); - match &mut *inner.state.borrow_mut() { - State::WaitServerHandshake { conn, .. } => unsafe { - (**conn).data = (&*inner as *const Inner as *mut Inner).cast(); - (*(**conn).write).handler = Some(on_write); - (*(**conn).read).handler = Some(on_read); - }, - State::Connected { conn, .. } => unsafe { - (**conn).data = (&*inner as *const Inner as *mut Inner).cast(); - (*(**conn).write).handler = Some(on_write); - (*(**conn).read).handler = Some(on_read); + match inner.state.try_borrow_mut() { + Ok(mut state) => match &mut *state { + State::WaitServerHandshake { conn, .. } => unsafe { + (**conn).data = (&*inner as *const Inner as *mut Inner).cast(); + (*(**conn).write).handler = Some(on_write); + (*(**conn).read).handler = Some(on_read); + }, + State::Connected { conn, .. } => unsafe { + (**conn).data = (&*inner as *const Inner as *mut Inner).cast(); + (*(**conn).write).handler = Some(on_write); + (*(**conn).read).handler = Some(on_read); + }, + State::Disconnected { event, .. } => { + event.data = (&*inner as *const Inner as *mut Inner).cast(); + match inner.after_handshake.try_borrow_mut() { + Ok(mut after_handshake) => { + (after_handshake)(); // discard send data, it is disconnected + } + Err(error) => { + log_borrow_mut_error("connect: after_handshake borrow", error); + } + } + } }, - State::Disconnected { event, .. } => { - event.data = (&*inner as *const Inner as *mut Inner).cast(); - (inner.after_handshake.borrow_mut())(); // discard send data, it is disconnected + Err(error) => { + log_borrow_mut_error("connect: state borrow", error); } } @@ -147,7 +165,13 @@ impl UnixSocket { } pub fn disconnected(&self) -> bool { - matches!(&*self.0.state.borrow_mut(), State::Disconnected { .. }) + match self.0.state.try_borrow_mut() { + Ok(state) => matches!(&*state, State::Disconnected { .. }), + Err(error) => { + log_borrow_mut_error("disconnected: state borrow", error); + false + } + } } pub fn stop(&self) { @@ -156,9 +180,16 @@ impl UnixSocket { let mut dummy_ev: Box = Box::new(MaybeUninit::zeroed().assume_init()); dummy_ev.handler = None; dummy_ev.log = (*ngx_cycle).log; - *self.0.state.borrow_mut() = State::Disconnected { - event: dummy_ev, - reconnect_timeout: MIN_TIMEOUT_MS, + match self.0.state.try_borrow_mut() { + Ok(mut state) => { + *state = State::Disconnected { + event: dummy_ev, + reconnect_timeout: MIN_TIMEOUT_MS, + }; + } + Err(error) => { + log_borrow_mut_error("stop: state borrow", error); + } } } } @@ -264,30 +295,36 @@ impl UnixSocket { impl Inner { fn write(&self, buf: &[u8]) -> anyhow::Result<()> { - match &mut *self.state.borrow_mut() { - State::Connected { - conn, - ref mut buffers, - } => { - buffers.push(buf); - unsafe { - buffers - .send(*conn) - .map_err(|_| anyhow::anyhow!("Disconnected")) + match self.state.try_borrow_mut() { + Ok(mut state) => match &mut *state { + State::Connected { + conn, + ref mut buffers, + } => { + buffers.push(buf); + unsafe { + buffers + .send(*conn) + .map_err(|_| anyhow::anyhow!("Disconnected")) + } } - } - State::WaitServerHandshake { - conn, - ref mut buffers, - } => { - buffers.push(buf); - unsafe { - buffers - .send(*conn) - .map_err(|_| anyhow::anyhow!("Disconnected")) + State::WaitServerHandshake { + conn, + ref mut buffers, + } => { + buffers.push(buf); + unsafe { + buffers + .send(*conn) + .map_err(|_| anyhow::anyhow!("Disconnected")) + } } + State::Disconnected { .. } => Err(anyhow::anyhow!("Disconnected")), + }, + Err(error) => { + log_borrow_mut_error("write: state borrow", error); + Err(anyhow::anyhow!("Borrow error")) } - State::Disconnected { .. } => Err(anyhow::anyhow!("Disconnected")), } } @@ -492,13 +529,31 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { let mut handshake_done = false; let mut is_connected = false; { - let state = &mut *(*data).state.borrow_mut(); + let mut state = match (*data).state.try_borrow_mut() { + Ok(state) => state, + Err(error) => { + log_borrow_mut_error("on_read: state borrow", error); + break; + } + }; - match state { + match &mut *state { State::WaitServerHandshake { conn, buffers } => { - match ((*data).check_handshake.borrow_mut())( - &buf[..result as usize], - ) { + let handshake_result = match (*data).check_handshake.try_borrow_mut() + { + Ok(mut check_handshake) => { + (check_handshake)(&buf[..result as usize]) + } + Err(error) => { + log_borrow_mut_error( + "on_read: check_handshake borrow", + error, + ); + Err(anyhow::anyhow!("Borrow error")) + } + }; + + match handshake_result { Ok(_) => { let new_conn = *conn; *conn = std::ptr::null_mut(); @@ -524,45 +579,95 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) { } } // state not borrowed here any more if handshake_done { - let back_data = ((*data).after_handshake.borrow_mut())(); + let back_data = match (*data).after_handshake.try_borrow_mut() { + Ok(mut after_handshake) => (after_handshake)(), + Err(error) => { + log_borrow_mut_error("on_read: after_handshake borrow", error); + Vec::new() + } + }; if !back_data.is_empty() && (*data).write(&back_data).is_err() { - *(*data).state.borrow_mut() = State::Disconnected { - event: (*data).create_and_schedule_reconnect(), - reconnect_timeout: MIN_TIMEOUT_MS, - }; + match (*data).state.try_borrow_mut() { + Ok(mut state) => { + *state = State::Disconnected { + event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, + }; + } + Err(error) => { + log_borrow_mut_error( + "on_read: state borrow (after_handshake)", + error, + ); + } + } } } if is_connected { - let back_data = ((*data).on_read.borrow_mut())(&buf[..result as usize]); + let back_data = match (*data).on_read.try_borrow_mut() { + Ok(mut on_read) => (on_read)(&buf[..result as usize]), + Err(error) => { + log_borrow_mut_error("on_read: on_read borrow", error); + Vec::new() + } + }; if !back_data.is_empty() && (*data).write(&back_data).is_err() { - *(*data).state.borrow_mut() = State::Disconnected { + match (*data).state.try_borrow_mut() { + Ok(mut state) => { + *state = State::Disconnected { + event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, + }; + } + Err(error) => { + log_borrow_mut_error("on_read: state borrow (on_read)", error); + } + } + } + } + } else if result == 0 { + // error, signal connection close + match (*data).state.try_borrow_mut() { + Ok(mut state) => { + *state = State::Disconnected { event: (*data).create_and_schedule_reconnect(), reconnect_timeout: MIN_TIMEOUT_MS, }; } + Err(error) => { + log_borrow_mut_error("on_read: state borrow (result=0)", error); + } } - } else if result == 0 { - // error, signal connection close - *(*data).state.borrow_mut() = State::Disconnected { - event: (*data).create_and_schedule_reconnect(), - reconnect_timeout: MIN_TIMEOUT_MS, - }; break; } else if result == NGX_AGAIN as isize { if ngx_handle_read_event(conn.read, 0) != NGX_OK as isize { - *(*data).state.borrow_mut() = State::Disconnected { - event: (*data).create_and_schedule_reconnect(), - reconnect_timeout: MIN_TIMEOUT_MS, - }; + match (*data).state.try_borrow_mut() { + Ok(mut state) => { + *state = State::Disconnected { + event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, + }; + } + Err(error) => { + log_borrow_mut_error("on_read: state borrow (NGX_AGAIN)", error); + } + } } break; } else { // error, retry reconnect - *(*data).state.borrow_mut() = State::Disconnected { - event: (*data).create_and_schedule_reconnect(), - reconnect_timeout: MIN_TIMEOUT_MS, - }; + match (*data).state.try_borrow_mut() { + Ok(mut state) => { + *state = State::Disconnected { + event: (*data).create_and_schedule_reconnect(), + reconnect_timeout: MIN_TIMEOUT_MS, + }; + } + Err(error) => { + log_borrow_mut_error("on_read: state borrow (error)", error); + } + } break; } } @@ -587,10 +692,7 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) { let mut state = match data.state.try_borrow_mut() { Ok(state) => state, Err(error) => { - if let Ok(err_msg) = std::ffi::CString::new(format!("on_reconnect_timeout: {}", error)) - { - ngx_log_error(NGX_LOG_ERR as usize, (*ngx_cycle).log, 0, &err_msg); - } + log_borrow_mut_error("on_reconnect_timeout: state borrow", error); // State is already borrowed, schedule retry if (*ev).timer_set() == 0 && ngx_quit == 0 && ngx_exiting == 0 && ngx_terminate == 0 { ngx_event_add_timer(ev, MIN_TIMEOUT_MS); From e8e34cfc8ed7bdea0ba7e09569ae0feaef8a6d96 Mon Sep 17 00:00:00 2001 From: Ruslan Pislari Date: Wed, 18 Feb 2026 11:34:46 +0200 Subject: [PATCH 2/2] chore: bump version to 0.1.6 --- nginx_module/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nginx_module/Cargo.toml b/nginx_module/Cargo.toml index 82888eb..95ec386 100644 --- a/nginx_module/Cargo.toml +++ b/nginx_module/Cargo.toml @@ -8,7 +8,7 @@ name = "nginx_module" readme = "README.md" repository = "https://github.com/g-Core/nginx-rust" - version = "0.1.5" + version = "0.1.6" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html