Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion nginx_module/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
name = "nginx_module"
readme = "README.md"
repository = "https://github.com/g-Core/nginx-rust"
version = "0.1.5"
version = "0.1.6"


# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
Expand Down
238 changes: 170 additions & 68 deletions nginx_module/src/unix_socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,12 @@ struct WriteBuffer {
const MIN_TIMEOUT_MS: usize = 1;
const TIMEOUT_MS: usize = 255;

fn log_borrow_mut_error(context: &str, error: std::cell::BorrowMutError) {
if let Ok(err_msg) = std::ffi::CString::new(format!("{}: {}", context, error)) {
unsafe { ngx_log_error(NGX_LOG_ERR as usize, (*ngx_cycle).log, 0, &err_msg) };
}
}

impl UnixSocket {
pub fn connect(
path: String,
Expand Down Expand Up @@ -126,28 +132,46 @@ impl UnixSocket {
};

let inner = Box::pin(inner);
match &mut *inner.state.borrow_mut() {
State::WaitServerHandshake { conn, .. } => unsafe {
(**conn).data = (&*inner as *const Inner as *mut Inner).cast();
(*(**conn).write).handler = Some(on_write);
(*(**conn).read).handler = Some(on_read);
},
State::Connected { conn, .. } => unsafe {
(**conn).data = (&*inner as *const Inner as *mut Inner).cast();
(*(**conn).write).handler = Some(on_write);
(*(**conn).read).handler = Some(on_read);
match inner.state.try_borrow_mut() {
Ok(mut state) => match &mut *state {
State::WaitServerHandshake { conn, .. } => unsafe {
(**conn).data = (&*inner as *const Inner as *mut Inner).cast();
(*(**conn).write).handler = Some(on_write);
(*(**conn).read).handler = Some(on_read);
},
State::Connected { conn, .. } => unsafe {
(**conn).data = (&*inner as *const Inner as *mut Inner).cast();
(*(**conn).write).handler = Some(on_write);
(*(**conn).read).handler = Some(on_read);
},
State::Disconnected { event, .. } => {
event.data = (&*inner as *const Inner as *mut Inner).cast();
match inner.after_handshake.try_borrow_mut() {
Ok(mut after_handshake) => {
(after_handshake)(); // discard send data, it is disconnected
}
Err(error) => {
log_borrow_mut_error("connect: after_handshake borrow", error);
}
}
}
},
State::Disconnected { event, .. } => {
event.data = (&*inner as *const Inner as *mut Inner).cast();
(inner.after_handshake.borrow_mut())(); // discard send data, it is disconnected
Err(error) => {
log_borrow_mut_error("connect: state borrow", error);
}
}

Self(inner)
}

pub fn disconnected(&self) -> bool {
matches!(&*self.0.state.borrow_mut(), State::Disconnected { .. })
match self.0.state.try_borrow_mut() {
Ok(state) => matches!(&*state, State::Disconnected { .. }),
Err(error) => {
log_borrow_mut_error("disconnected: state borrow", error);
false
}
}
}

pub fn stop(&self) {
Expand All @@ -156,9 +180,16 @@ impl UnixSocket {
let mut dummy_ev: Box<ngx_event_t> = Box::new(MaybeUninit::zeroed().assume_init());
dummy_ev.handler = None;
dummy_ev.log = (*ngx_cycle).log;
*self.0.state.borrow_mut() = State::Disconnected {
event: dummy_ev,
reconnect_timeout: MIN_TIMEOUT_MS,
match self.0.state.try_borrow_mut() {
Ok(mut state) => {
*state = State::Disconnected {
event: dummy_ev,
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
Err(error) => {
log_borrow_mut_error("stop: state borrow", error);
}
}
}
}
Expand Down Expand Up @@ -264,30 +295,36 @@ impl UnixSocket {

impl Inner {
fn write(&self, buf: &[u8]) -> anyhow::Result<()> {
match &mut *self.state.borrow_mut() {
State::Connected {
conn,
ref mut buffers,
} => {
buffers.push(buf);
unsafe {
buffers
.send(*conn)
.map_err(|_| anyhow::anyhow!("Disconnected"))
match self.state.try_borrow_mut() {
Ok(mut state) => match &mut *state {
State::Connected {
conn,
ref mut buffers,
} => {
buffers.push(buf);
unsafe {
buffers
.send(*conn)
.map_err(|_| anyhow::anyhow!("Disconnected"))
}
}
}
State::WaitServerHandshake {
conn,
ref mut buffers,
} => {
buffers.push(buf);
unsafe {
buffers
.send(*conn)
.map_err(|_| anyhow::anyhow!("Disconnected"))
State::WaitServerHandshake {
conn,
ref mut buffers,
} => {
buffers.push(buf);
unsafe {
buffers
.send(*conn)
.map_err(|_| anyhow::anyhow!("Disconnected"))
}
}
State::Disconnected { .. } => Err(anyhow::anyhow!("Disconnected")),
},
Err(error) => {
log_borrow_mut_error("write: state borrow", error);
Err(anyhow::anyhow!("Borrow error"))
}
State::Disconnected { .. } => Err(anyhow::anyhow!("Disconnected")),
}
}

Expand Down Expand Up @@ -492,13 +529,31 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) {
let mut handshake_done = false;
let mut is_connected = false;
{
let state = &mut *(*data).state.borrow_mut();
let mut state = match (*data).state.try_borrow_mut() {
Ok(state) => state,
Err(error) => {
log_borrow_mut_error("on_read: state borrow", error);
break;
}
};

match state {
match &mut *state {
State::WaitServerHandshake { conn, buffers } => {
match ((*data).check_handshake.borrow_mut())(
&buf[..result as usize],
) {
let handshake_result = match (*data).check_handshake.try_borrow_mut()
{
Ok(mut check_handshake) => {
(check_handshake)(&buf[..result as usize])
}
Err(error) => {
log_borrow_mut_error(
"on_read: check_handshake borrow",
error,
);
Err(anyhow::anyhow!("Borrow error"))
}
};

match handshake_result {
Ok(_) => {
let new_conn = *conn;
*conn = std::ptr::null_mut();
Expand All @@ -524,45 +579,95 @@ unsafe extern "C" fn on_read(rev: *mut ngx_event_t) {
}
} // state not borrowed here any more
if handshake_done {
let back_data = ((*data).after_handshake.borrow_mut())();
let back_data = match (*data).after_handshake.try_borrow_mut() {
Ok(mut after_handshake) => (after_handshake)(),
Err(error) => {
log_borrow_mut_error("on_read: after_handshake borrow", error);
Vec::new()
}
};
if !back_data.is_empty() && (*data).write(&back_data).is_err() {
*(*data).state.borrow_mut() = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
match (*data).state.try_borrow_mut() {
Ok(mut state) => {
*state = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
Err(error) => {
log_borrow_mut_error(
"on_read: state borrow (after_handshake)",
error,
);
}
}
}
}
if is_connected {
let back_data = ((*data).on_read.borrow_mut())(&buf[..result as usize]);
let back_data = match (*data).on_read.try_borrow_mut() {
Ok(mut on_read) => (on_read)(&buf[..result as usize]),
Err(error) => {
log_borrow_mut_error("on_read: on_read borrow", error);
Vec::new()
}
};

if !back_data.is_empty() && (*data).write(&back_data).is_err() {
*(*data).state.borrow_mut() = State::Disconnected {
match (*data).state.try_borrow_mut() {
Ok(mut state) => {
*state = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
Err(error) => {
log_borrow_mut_error("on_read: state borrow (on_read)", error);
}
}
}
}
} else if result == 0 {
// error, signal connection close
match (*data).state.try_borrow_mut() {
Ok(mut state) => {
*state = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
Err(error) => {
log_borrow_mut_error("on_read: state borrow (result=0)", error);
}
}
} else if result == 0 {
// error, signal connection close
*(*data).state.borrow_mut() = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
break;
} else if result == NGX_AGAIN as isize {
if ngx_handle_read_event(conn.read, 0) != NGX_OK as isize {
*(*data).state.borrow_mut() = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
match (*data).state.try_borrow_mut() {
Ok(mut state) => {
*state = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
Err(error) => {
log_borrow_mut_error("on_read: state borrow (NGX_AGAIN)", error);
}
}
}
break;
} else {
// error, retry reconnect
*(*data).state.borrow_mut() = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
match (*data).state.try_borrow_mut() {
Ok(mut state) => {
*state = State::Disconnected {
event: (*data).create_and_schedule_reconnect(),
reconnect_timeout: MIN_TIMEOUT_MS,
};
}
Err(error) => {
log_borrow_mut_error("on_read: state borrow (error)", error);
}
}
break;
}
}
Expand All @@ -587,10 +692,7 @@ unsafe extern "C" fn on_reconnect_timeout(ev: *mut ngx_event_t) {
let mut state = match data.state.try_borrow_mut() {
Ok(state) => state,
Err(error) => {
if let Ok(err_msg) = std::ffi::CString::new(format!("on_reconnect_timeout: {}", error))
{
ngx_log_error(NGX_LOG_ERR as usize, (*ngx_cycle).log, 0, &err_msg);
}
log_borrow_mut_error("on_reconnect_timeout: state borrow", error);
// State is already borrowed, schedule retry
if (*ev).timer_set() == 0 && ngx_quit == 0 && ngx_exiting == 0 && ngx_terminate == 0 {
ngx_event_add_timer(ev, MIN_TIMEOUT_MS);
Expand Down
Loading