Skip to content

Commit b9e5385

Browse files
committed
fix: drop backend connections on close
1 parent 1dfa088 commit b9e5385

File tree

1 file changed

+70
-58
lines changed
  • crates/rproxy/src/server/proxy/ws

1 file changed

+70
-58
lines changed

crates/rproxy/src/server/proxy/ws/proxy.rs

Lines changed: 70 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::{
22
io::Write,
33
marker::PhantomData,
4+
mem,
45
str::FromStr,
56
sync::{
67
Arc,
@@ -372,8 +373,8 @@ where
372373
resetter: this.resetter.clone(),
373374
clnt_tx,
374375
clnt_rx,
375-
bknd_tx,
376-
bknd_rx,
376+
bknd_tx: Some(bknd_tx),
377+
bknd_rx: Some(bknd_rx),
377378
pings: HashMap::new(),
378379
ping_balance_bknd: AtomicI64::new(0),
379380
ping_balance_clnt: AtomicI64::new(0),
@@ -662,8 +663,8 @@ where
662663

663664
clnt_tx: Session,
664665
clnt_rx: MessageStream,
665-
bknd_tx: SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>,
666-
bknd_rx: SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>,
666+
bknd_tx: Option<SplitSink<WebSocketStream<MaybeTlsStream<TcpStream>>, tungstenite::Message>>,
667+
bknd_rx: Option<SplitStream<WebSocketStream<MaybeTlsStream<TcpStream>>>>,
667668

668669
pings: HashMap<Uuid, ProxyWsPing>,
669670
ping_balance_clnt: AtomicI64,
@@ -734,7 +735,12 @@ where
734735
}
735736

736737
// backend => client
737-
bknd_msg = self.bknd_rx.next() => {
738+
bknd_msg = async {
739+
match &mut self.bknd_rx {
740+
Some(bknd_rx) => bknd_rx.next().await,
741+
None => None,
742+
}
743+
} => {
738744
pumping = self.pump_bknd_to_cli(UtcDateTime::now(), bknd_msg).await;
739745
}
740746
}
@@ -828,8 +834,9 @@ where
828834
}
829835

830836
let bknd_ping = ProxyWsPing::new(self.info.conn_id());
831-
if let Err(err) =
832-
self.bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await
837+
if let Some(bknd_tx) = &mut self.bknd_tx &&
838+
let Err(err) =
839+
bknd_tx.send(tungstenite::Message::Ping(bknd_ping.to_bytes())).await
833840
{
834841
error!(
835842
proxy = P::name(),
@@ -880,8 +887,9 @@ where
880887
return Ok(());
881888
}
882889

883-
if let Err(err) =
884-
self.bknd_tx.send(tungstenite::Message::Binary(bytes.clone())).await
890+
if let Some(bknd_tx) = &mut self.bknd_tx &&
891+
let Err(err) =
892+
bknd_tx.send(tungstenite::Message::Binary(bytes.clone())).await
885893
{
886894
error!(
887895
proxy = P::name(),
@@ -916,15 +924,15 @@ where
916924
return Ok(());
917925
}
918926

919-
if let Err(err) = self
920-
.bknd_tx
921-
.send(tungstenite::Message::Text(unsafe {
922-
// safety: it's from client's ws message => must be valid utf-8
923-
tungstenite::protocol::frame::Utf8Bytes::from_bytes_unchecked(
924-
text.clone().into_bytes(),
925-
)
926-
}))
927-
.await
927+
if let Some(bknd_tx) = &mut self.bknd_tx &&
928+
let Err(err) = bknd_tx
929+
.send(tungstenite::Message::Text(unsafe {
930+
// safety: it's from client's ws message => must be valid utf-8
931+
tungstenite::protocol::frame::Utf8Bytes::from_bytes_unchecked(
932+
text.clone().into_bytes(),
933+
)
934+
}))
935+
.await
928936
{
929937
error!(
930938
proxy = P::name(),
@@ -1202,7 +1210,8 @@ where
12021210
return Ok(());
12031211
}
12041212

1205-
if let Err(err) = self.bknd_tx.send(tungstenite::Message::Pong(bytes)).await
1213+
if let Some(bknd_tx) = &mut self.bknd_tx &&
1214+
let Err(err) = bknd_tx.send(tungstenite::Message::Pong(bytes)).await
12061215
{
12071216
error!(
12081217
proxy = P::name(),
@@ -1320,37 +1329,50 @@ where
13201329
&mut self,
13211330
frame: tungstenite::protocol::CloseFrame,
13221331
) -> Result<(), &'static str> {
1323-
debug!(
1324-
proxy = P::name(),
1325-
connection_id = %self.info.conn_id(),
1326-
worker_id = %self.worker_id,
1327-
msg = %frame.reason,
1328-
"Closing backend websocket session..."
1329-
);
1332+
if let Some(mut bknd_tx) = mem::take(&mut self.bknd_tx) {
1333+
debug!(
1334+
proxy = P::name(),
1335+
connection_id = %self.info.conn_id(),
1336+
worker_id = %self.worker_id,
1337+
msg = %frame.reason,
1338+
"Closing backend websocket session..."
1339+
);
13301340

1331-
if let Err(err) = self
1332-
.bknd_tx
1333-
.send(tungstenite::Message::Close(Some(
1334-
frame.clone(), // it's cheap to clone
1335-
)))
1336-
.await
1337-
{
1338-
if let tungstenite::error::Error::AlreadyClosed = err {
1339-
return Ok(());
1340-
}
1341-
if let tungstenite::error::Error::Protocol(protocol_err) = err {
1342-
if protocol_err == tungstenite::error::ProtocolError::SendAfterClosing {
1341+
if let Err(err) = bknd_tx
1342+
.send(tungstenite::Message::Close(Some(
1343+
frame.clone(), // it's cheap to clone
1344+
)))
1345+
.await
1346+
{
1347+
if let tungstenite::error::Error::AlreadyClosed = err {
13431348
return Ok(());
13441349
}
1345-
error!(
1346-
proxy = P::name(),
1347-
connection_id = %self.info.conn_id(),
1348-
worker_id = %self.worker_id,
1349-
msg = %frame.reason,
1350-
error = ?protocol_err,
1351-
"Failed to close backend websocket session"
1352-
);
1353-
} else {
1350+
if let tungstenite::error::Error::Protocol(protocol_err) = err {
1351+
if protocol_err == tungstenite::error::ProtocolError::SendAfterClosing {
1352+
return Ok(());
1353+
}
1354+
error!(
1355+
proxy = P::name(),
1356+
connection_id = %self.info.conn_id(),
1357+
worker_id = %self.worker_id,
1358+
msg = %frame.reason,
1359+
error = ?protocol_err,
1360+
"Failed to close backend websocket session"
1361+
);
1362+
} else {
1363+
error!(
1364+
proxy = P::name(),
1365+
connection_id = %self.info.conn_id(),
1366+
worker_id = %self.worker_id,
1367+
msg = %frame.reason,
1368+
error = ?err,
1369+
"Failed to close backend websocket session"
1370+
);
1371+
}
1372+
return Err(WS_BKND_ERROR);
1373+
}
1374+
1375+
if let Err(err) = bknd_tx.close().await {
13541376
error!(
13551377
proxy = P::name(),
13561378
connection_id = %self.info.conn_id(),
@@ -1360,19 +1382,9 @@ where
13601382
"Failed to close backend websocket session"
13611383
);
13621384
}
1363-
return Err(WS_BKND_ERROR);
13641385
}
13651386

1366-
if let Err(err) = self.bknd_tx.close().await {
1367-
error!(
1368-
proxy = P::name(),
1369-
connection_id = %self.info.conn_id(),
1370-
worker_id = %self.worker_id,
1371-
msg = %frame.reason,
1372-
error = ?err,
1373-
"Failed to close backend websocket session"
1374-
);
1375-
}
1387+
drop(mem::take(&mut self.bknd_rx));
13761388

13771389
Ok(())
13781390
}

0 commit comments

Comments
 (0)