diff --git a/node/src/proxy_client/stream_establisher.rs b/node/src/proxy_client/stream_establisher.rs index f25d7a9fe..63c898dcf 100644 --- a/node/src/proxy_client/stream_establisher.rs +++ b/node/src/proxy_client/stream_establisher.rs @@ -1,5 +1,6 @@ // Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. +use crate::proxy_client::stream_handler_pool::StreamSenders; use crate::proxy_client::stream_reader::StreamReader; use crate::proxy_client::stream_writer::StreamWriter; use crate::sub_lib::channel_wrappers::FuturesChannelFactory; @@ -14,7 +15,7 @@ use crate::sub_lib::stream_connector::StreamConnectorReal; use crate::sub_lib::stream_key::StreamKey; use crate::sub_lib::tokio_wrappers::ReadHalfWrapper; use actix::Recipient; -use crossbeam_channel::Sender; +use crossbeam_channel::{unbounded, Receiver, Sender}; use masq_lib::logger::Logger; use std::io; use std::net::IpAddr; @@ -22,8 +23,9 @@ use std::net::SocketAddr; pub struct StreamEstablisher { pub cryptde: &'static dyn CryptDE, - pub stream_adder_tx: Sender<(StreamKey, Box>)>, + pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>, pub stream_killer_tx: Sender<(StreamKey, u64)>, + pub shutdown_signal_rx: Receiver<()>, pub stream_connector: Box, pub proxy_client_sub: Recipient, pub logger: Logger, @@ -36,6 +38,7 @@ impl Clone for StreamEstablisher { cryptde: self.cryptde, stream_adder_tx: self.stream_adder_tx.clone(), stream_killer_tx: self.stream_killer_tx.clone(), + shutdown_signal_rx: unbounded().1, stream_connector: Box::new(StreamConnectorReal {}), proxy_client_sub: self.proxy_client_sub.clone(), logger: self.logger.clone(), @@ -57,11 +60,13 @@ impl StreamEstablisher { payload.target_port, &self.logger, )?; + let (shutdown_signal_tx, shutdown_signal_rx) = unbounded(); self.spawn_stream_reader( &payload.clone(), connection_info.reader, connection_info.peer_addr, + shutdown_signal_rx, ); let (tx_to_write, rx_to_write) = self.channel_factory.make(connection_info.peer_addr); @@ -73,8 +78,13 @@ impl StreamEstablisher { ); tokio::spawn(stream_writer); + let stream_senders = StreamSenders { + writer_data: tx_to_write.clone(), + reader_shutdown_tx: shutdown_signal_tx, + }; + self.stream_adder_tx - .send((payload.stream_key, tx_to_write.clone())) + .send((payload.stream_key, stream_senders)) .expect("StreamHandlerPool died"); Ok(tx_to_write) } @@ -84,12 +94,14 @@ impl StreamEstablisher { payload: &ClientRequestPayload_0v1, read_stream: Box, peer_addr: SocketAddr, + shutdown_signal: Receiver<()>, ) { let stream_reader = StreamReader::new( payload.stream_key, self.proxy_client_sub.clone(), read_stream, self.stream_killer_tx.clone(), + shutdown_signal, peer_addr, ); debug!(self.logger, "Spawning StreamReader for {}", peer_addr); @@ -103,7 +115,7 @@ pub trait StreamEstablisherFactory: Send { pub struct StreamEstablisherFactoryReal { pub cryptde: &'static dyn CryptDE, - pub stream_adder_tx: Sender<(StreamKey, Box>)>, + pub stream_adder_tx: Sender<(StreamKey, StreamSenders)>, pub stream_killer_tx: Sender<(StreamKey, u64)>, pub proxy_client_subs: ProxyClientSubs, pub logger: Logger, @@ -115,6 +127,7 @@ impl StreamEstablisherFactory for StreamEstablisherFactoryReal { cryptde: self.cryptde, stream_adder_tx: self.stream_adder_tx.clone(), stream_killer_tx: self.stream_killer_tx.clone(), + shutdown_signal_rx: unbounded().1, stream_connector: Box::new(StreamConnectorReal {}), proxy_client_sub: self.proxy_client_subs.inbound_server_data.clone(), logger: self.logger.clone(), @@ -171,6 +184,7 @@ mod tests { cryptde: main_cryptde(), stream_adder_tx, stream_killer_tx, + shutdown_signal_rx: unbounded().1, stream_connector: Box::new(StreamConnectorMock::new()), // only used in "establish_stream" proxy_client_sub, logger: Logger::new("ProxyClient"), @@ -191,6 +205,7 @@ mod tests { }, read_stream, SocketAddr::from_str("1.2.3.4:5678").unwrap(), + unbounded().1, ); proxy_client_awaiter.await_message_count(1); diff --git a/node/src/proxy_client/stream_handler_pool.rs b/node/src/proxy_client/stream_handler_pool.rs index 0c7f1f011..f00c22f65 100644 --- a/node/src/proxy_client/stream_handler_pool.rs +++ b/node/src/proxy_client/stream_handler_pool.rs @@ -14,7 +14,7 @@ use crate::sub_lib::sequence_buffer::SequencedPacket; use crate::sub_lib::stream_key::StreamKey; use crate::sub_lib::wallet::Wallet; use actix::Recipient; -use crossbeam_channel::{unbounded, Receiver}; +use crossbeam_channel::{unbounded, Receiver, Sender}; use futures::future; use futures::future::Future; use masq_lib::logger::Logger; @@ -29,20 +29,28 @@ use tokio::prelude::future::{err, ok}; use trust_dns_resolver::error::ResolveError; use trust_dns_resolver::lookup_ip::LookupIp; +// TODO: This should be renamed to ProxyClientStreamHandlerPoolReal (or something more concise) +// to differentiate it from the other StreamHandlerPool, which, unlike this, is an actor. pub trait StreamHandlerPool { fn process_package(&self, payload: ClientRequestPayload_0v1, paying_wallet_opt: Option); } pub struct StreamHandlerPoolReal { inner: Arc>, - stream_adder_rx: Receiver<(StreamKey, Box>)>, + stream_adder_rx: Receiver<(StreamKey, StreamSenders)>, stream_killer_rx: Receiver<(StreamKey, u64)>, } +#[derive(Debug)] +pub struct StreamSenders { + pub writer_data: Box>, + pub reader_shutdown_tx: Sender<()>, +} + struct StreamHandlerPoolRealInner { accountant_sub: Recipient, proxy_client_subs: ProxyClientSubs, - stream_writer_channels: HashMap>>, + stream_writer_channels: HashMap, resolver: Box, logger: Logger, establisher_factory: Box, @@ -148,6 +156,32 @@ impl StreamHandlerPoolReal { }; } + fn send_shutdown_signal_to_stream_reader( + reader_shutdown_tx: Sender<()>, + stream_key: &StreamKey, + logger: &Logger, + ) { + match reader_shutdown_tx.try_send(()) { + Ok(()) => { + // todo!("Ok"); + debug!( + logger, + "A shutdown signal was sent to the StreamReader for stream key {:?}.", + stream_key + ); + } + Err(_e) => { + // todo!("ERR"); + debug!( + logger, + "Unable to send a shutdown signal to the StreamReader for \ + stream key {:?}. The channel is already gone.", + stream_key + ); + } + } + } + fn clean_up_bad_stream( inner_arc: Arc>, stream_key: &StreamKey, @@ -159,12 +193,17 @@ impl StreamHandlerPoolReal { inner.logger, "Couldn't process request from CORES package: {}", error ); - if let Some(sender_wrapper) = inner.stream_writer_channels.remove(stream_key) { + if let Some(stream_senders) = inner.stream_writer_channels.remove(stream_key) { debug!( inner.logger, "Removing stream writer for {}", - sender_wrapper.peer_addr() + stream_senders.writer_data.peer_addr() ); + Self::send_shutdown_signal_to_stream_reader( + stream_senders.reader_shutdown_tx, + stream_key, + &inner.logger, + ) } Self::send_terminating_package( stream_key, @@ -185,20 +224,6 @@ impl StreamHandlerPoolReal { Self::perform_write(payload.sequenced_packet, sender_wrapper.clone()).and_then(move |_| { let mut inner = inner_arc.lock().expect("Stream handler pool is poisoned"); - if last_data { - match inner.stream_writer_channels.remove(&stream_key) { - Some(channel) => debug!( - inner.logger, - "Removing StreamWriter {:?} to {}", - stream_key, - channel.peer_addr() - ), - None => debug!( - inner.logger, - "Trying to remove StreamWriter {:?}, but it's already gone", stream_key - ), - } - } if payload_size > 0 { match paying_wallet_opt { Some(wallet) => inner @@ -218,6 +243,30 @@ impl StreamHandlerPoolReal { ), } } + if last_data { + match inner.stream_writer_channels.remove(&stream_key) { + Some(stream_senders) => { + debug!( + inner.logger, + "Removing StreamWriter and Shutting down StreamReader for {:?} to {}", + stream_key, + stream_senders.writer_data.peer_addr() + ); + Self::send_shutdown_signal_to_stream_reader( + stream_senders.reader_shutdown_tx, + &stream_key, + &inner.logger, + ) + } + None => { + debug!( + inner.logger, + "Trying to remove StreamWriter {:?}, but it's already gone", stream_key + ) + } + } + } + Ok(()) }) } @@ -385,7 +434,8 @@ impl StreamHandlerPoolReal { ) -> Option>> { let inner = inner_arc.lock().expect("Stream handler pool is poisoned"); let sender_wrapper_opt = inner.stream_writer_channels.get(stream_key); - sender_wrapper_opt.map(|sender_wrapper_box_ref| sender_wrapper_box_ref.as_ref().clone()) + sender_wrapper_opt + .map(|sender_wrapper_box_ref| sender_wrapper_box_ref.writer_data.as_ref().clone()) } fn make_logger_copy(inner_arc: &Arc>) -> Logger { @@ -430,7 +480,7 @@ impl StreamHandlerPoolReal { let mut inner = self.inner.lock().expect("Stream handler pool is poisoned"); while let Ok((stream_key, sequence_number)) = self.stream_killer_rx.try_recv() { match inner.stream_writer_channels.remove(&stream_key) { - Some(writer_channel) => { + Some(stream_senders) => { inner .proxy_client_subs .inbound_server_data @@ -438,14 +488,20 @@ impl StreamHandlerPoolReal { stream_key, last_data: true, sequence_number, - source: writer_channel.peer_addr(), + source: stream_senders.writer_data.peer_addr(), data: vec![], }) .expect("ProxyClient is dead"); + Self::send_shutdown_signal_to_stream_reader( + stream_senders.reader_shutdown_tx, + &stream_key, + &inner.logger, + ); debug!( inner.logger, - "Killed StreamWriter to {} and sent server-drop report", - writer_channel.peer_addr() + "Killed StreamWriter and StreamReader for the stream key {:?} to {} and sent server-drop report", + stream_key, + stream_senders.writer_data.peer_addr() ) } None => debug!( @@ -461,16 +517,16 @@ impl StreamHandlerPoolReal { loop { match self.stream_adder_rx.try_recv() { Err(_) => break, - Ok((stream_key, stream_writer_channel)) => { + Ok((stream_key, stream_senders)) => { debug!( inner.logger, "Persisting StreamWriter to {} under key {:?}", - stream_writer_channel.peer_addr(), + stream_senders.writer_data.peer_addr(), stream_key ); inner .stream_writer_channels - .insert(stream_key, stream_writer_channel) + .insert(stream_key, stream_senders) } }; } @@ -519,7 +575,7 @@ mod tests { use crate::proxy_client::local_test_utils::make_send_error; use crate::proxy_client::local_test_utils::ResolverWrapperMock; use crate::proxy_client::stream_establisher::StreamEstablisher; - use crate::sub_lib::channel_wrappers::FuturesChannelFactoryReal; + use crate::sub_lib::channel_wrappers::{FuturesChannelFactoryReal, SenderWrapperReal}; use crate::sub_lib::cryptde::PublicKey; use crate::sub_lib::hopper::ExpiredCoresPackage; use crate::sub_lib::hopper::MessageType; @@ -531,12 +587,12 @@ mod tests { use crate::test_utils::main_cryptde; use crate::test_utils::make_meaningless_route; use crate::test_utils::make_wallet; - use crate::test_utils::recorder::make_recorder; use crate::test_utils::recorder::peer_actors_builder; + use crate::test_utils::recorder::{make_proxy_client_subs_from_recorder, make_recorder}; use crate::test_utils::stream_connector_mock::StreamConnectorMock; use crate::test_utils::tokio_wrapper_mocks::ReadHalfWrapperMock; use crate::test_utils::tokio_wrapper_mocks::WriteHalfWrapperMock; - use actix::System; + use actix::{Actor, System}; use masq_lib::constants::HTTP_PORT; use masq_lib::test_utils::logging::init_test_logging; use masq_lib::test_utils::logging::TestLogHandler; @@ -595,6 +651,7 @@ mod tests { cryptde, stream_adder_tx: unbounded().0, stream_killer_tx: unbounded().0, + shutdown_signal_rx: unbounded().1, stream_connector: Box::new(StreamConnectorMock::new()), proxy_client_sub: peer_actors .proxy_client_opt @@ -658,8 +715,9 @@ mod tests { originator_public_key: PublicKey::new(&b"men's souls"[..]), }; let write_parameters = Arc::new(Mutex::new(vec![])); + let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); let tx_to_write = Box::new( - SenderWrapperMock::new(SocketAddr::from_str("1.2.3.4:5678").unwrap()) + SenderWrapperMock::new(peer_addr) .unbounded_send_result(Ok(())) .unbounded_send_params(&write_parameters), ); @@ -682,12 +740,13 @@ mod tests { 100, 200, ); - subject - .inner - .lock() - .unwrap() - .stream_writer_channels - .insert(stream_key, tx_to_write); + subject.inner.lock().unwrap().stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: tx_to_write, + reader_shutdown_tx: unbounded().0, + }, + ); run_process_package_in_actix(subject, package); }); @@ -703,9 +762,11 @@ mod tests { #[test] fn write_failure_for_nonexistent_stream_generates_termination_message() { init_test_logging(); + let test_name = "write_failure_for_nonexistent_stream_generates_termination_message"; let cryptde = main_cryptde(); let (proxy_client, proxy_client_awaiter, proxy_client_recording_arc) = make_recorder(); let originator_key = PublicKey::new(&b"men's souls"[..]); + let (reader_shutdown_tx, reader_shutdown_rx) = unbounded(); thread::spawn(move || { let client_request_payload = ClientRequestPayload_0v1 { stream_key: StreamKey::make_meaningless_stream_key(), @@ -729,11 +790,10 @@ mod tests { let peer_actors = peer_actors_builder().proxy_client(proxy_client).build(); let resolver = ResolverWrapperMock::new() .lookup_ip_success(vec![IpAddr::from_str("2.3.4.5").unwrap()]); - - let tx_to_write = SenderWrapperMock::new(SocketAddr::from_str("2.3.4.5:80").unwrap()) - .unbounded_send_result(make_send_error( - client_request_payload.sequenced_packet.clone(), - )); + let peer_addr = SocketAddr::from_str("2.3.4.5:80").unwrap(); + let tx_to_write = SenderWrapperMock::new(peer_addr).unbounded_send_result( + make_send_error(client_request_payload.sequenced_packet.clone()), + ); let subject = StreamHandlerPoolReal::new( Box::new(resolver), @@ -743,17 +803,24 @@ mod tests { 100, 200, ); - subject - .inner - .lock() - .unwrap() - .stream_writer_channels - .insert(client_request_payload.stream_key, Box::new(tx_to_write)); + { + let mut inner = subject.inner.lock().unwrap(); + inner.stream_writer_channels.insert( + client_request_payload.stream_key, + StreamSenders { + writer_data: Box::new(tx_to_write), + reader_shutdown_tx, + }, + ); + inner.logger = Logger::new(test_name); + } run_process_package_in_actix(subject, package); }); proxy_client_awaiter.await_message_count(1); let proxy_client_recording = proxy_client_recording_arc.lock().unwrap(); + let received = reader_shutdown_rx.try_recv(); + assert_eq!(received, Ok(())); assert_eq!( proxy_client_recording.get_record::(0), &InboundServerData { @@ -764,13 +831,15 @@ mod tests { data: vec![], } ); + TestLogHandler::new().exists_log_containing(&format!( + "DEBUG: {test_name}: A shutdown signal was sent to the StreamReader \ + for stream key AAAAAAAAAAAAAAAAAAAAAAAAAAA." + )); } #[test] fn when_hostname_is_ip_establish_stream_without_dns_lookup() { let cryptde = main_cryptde(); - let lookup_ip_parameters = Arc::new(Mutex::new(vec![])); - let expected_lookup_ip_parameters = lookup_ip_parameters.clone(); let write_parameters = Arc::new(Mutex::new(vec![])); let expected_write_parameters = write_parameters.clone(); let (proxy_client, proxy_client_awaiter, proxy_client_recording_arc) = make_recorder(); @@ -795,12 +864,6 @@ mod tests { client_request_payload.into(), 0, ); - let resolver = ResolverWrapperMock::new() - .lookup_ip_parameters(&lookup_ip_parameters) - .lookup_ip_success(vec![ - IpAddr::from_str("2.3.4.5").unwrap(), - IpAddr::from_str("3.4.5.6").unwrap(), - ]); let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap(); let first_read_result = b"HTTP/1.1 200 OK\r\n\r\n"; let reader = ReadHalfWrapperMock { @@ -818,7 +881,7 @@ mod tests { shutdown_results: Arc::new(Mutex::new(vec![])), }; let mut subject = StreamHandlerPoolReal::new( - Box::new(resolver), + Box::new(ResolverWrapperMock::new()), cryptde, peer_actors.accountant.report_exit_service_provided.clone(), peer_actors.proxy_client_opt.unwrap().clone(), @@ -834,6 +897,7 @@ mod tests { cryptde, stream_adder_tx, stream_killer_tx, + shutdown_signal_rx: unbounded().1, stream_connector: Box::new(StreamConnectorMock::new().with_connection( peer_addr.clone(), peer_addr.clone(), @@ -854,10 +918,6 @@ mod tests { }); proxy_client_awaiter.await_message_count(1); - assert_eq!( - expected_lookup_ip_parameters.lock().unwrap().deref(), - &(vec![] as Vec) - ); assert_eq!( expected_write_parameters.lock().unwrap().remove(0), b"These are the times".to_vec() @@ -875,6 +935,200 @@ mod tests { ); } + #[test] + fn while_housekeeping_the_stream_senders_are_received_by_stream_handler_pool() { + init_test_logging(); + let test_name = "stream_handler_pool_sends_shutdown_signal_when_last_data_is_true"; + let (shutdown_tx, shutdown_rx) = unbounded(); + let (stream_adder_tx, stream_adder_rx) = unbounded(); + thread::spawn(move || { + let stream_key = StreamKey::make_meaningful_stream_key("I should die"); + let client_request_payload = ClientRequestPayload_0v1 { + stream_key, + sequenced_packet: SequencedPacket { + data: b"I'm gonna kill you stream key".to_vec(), + sequence_number: 0, + last_data: true, + }, + target_hostname: Some(String::from("3.4.5.6:80")), + target_port: HTTP_PORT, + protocol: ProxyProtocol::HTTP, + originator_public_key: PublicKey::new(&b"brutal death"[..]), + }; + let package = ExpiredCoresPackage::new( + SocketAddr::from_str("1.2.3.4:1234").unwrap(), + Some(make_wallet("consuming")), + make_meaningless_route(), + client_request_payload.into(), + 0, + ); + let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap(); + let peer_actors = peer_actors_builder().build(); + let mut subject = StreamHandlerPoolReal::new( + Box::new(ResolverWrapperMock::new()), + main_cryptde(), + peer_actors.accountant.report_exit_service_provided.clone(), + peer_actors.proxy_client_opt.unwrap().clone(), + 100, + 200, + ); + subject.stream_adder_rx = stream_adder_rx; + { + let mut inner = subject.inner.lock().unwrap(); + inner.logger = Logger::new(test_name); + inner.stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(SenderWrapperMock::new(peer_addr)), + reader_shutdown_tx: shutdown_tx, + }, + ); + inner.establisher_factory = Box::new(StreamEstablisherFactoryReal { + cryptde: main_cryptde(), + stream_adder_tx, + stream_killer_tx: unbounded().0, + proxy_client_subs: make_proxy_client_subs_from_recorder( + &make_recorder().0.start(), + ), + logger: Logger::new("test"), + }); + } + + // TODO: GH-800: Make sure that the stream_adder_tx sends something to the receiver + + run_process_package_in_actix(subject, package); + }); + let received = shutdown_rx.recv(); + assert_eq!(received, Ok(())); + TestLogHandler::new().await_log_containing( + &format!( + "DEBUG: {test_name}: Removing StreamWriter and Shutting down StreamReader \ + for oUHoHuDKHjeWq+BJzBIqHpPFBQw to 3.4.5.6:80" + ), + 500, + ); + } + + #[test] + fn stream_handler_pool_sends_shutdown_signal_when_last_data_is_true() { + init_test_logging(); + let test_name = "stream_handler_pool_sends_shutdown_signal_when_last_data_is_true"; + let (shutdown_tx, shutdown_rx) = unbounded(); + thread::spawn(move || { + let stream_key = StreamKey::make_meaningful_stream_key("I should die"); + let client_request_payload = ClientRequestPayload_0v1 { + stream_key, + sequenced_packet: SequencedPacket { + data: b"I'm gonna kill you stream key".to_vec(), + sequence_number: 0, + last_data: true, + }, + target_hostname: Some(String::from("3.4.5.6:80")), + target_port: HTTP_PORT, + protocol: ProxyProtocol::HTTP, + originator_public_key: PublicKey::new(&b"brutal death"[..]), + }; + let package = ExpiredCoresPackage::new( + SocketAddr::from_str("1.2.3.4:1234").unwrap(), + Some(make_wallet("consuming")), + make_meaningless_route(), + client_request_payload.into(), + 0, + ); + let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap(); + let peer_actors = peer_actors_builder().build(); + let subject = StreamHandlerPoolReal::new( + Box::new(ResolverWrapperMock::new()), + main_cryptde(), + peer_actors.accountant.report_exit_service_provided.clone(), + peer_actors.proxy_client_opt.unwrap().clone(), + 100, + 200, + ); + { + let mut inner = subject.inner.lock().unwrap(); + inner.logger = Logger::new(test_name); + inner.stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(SenderWrapperMock::new(peer_addr)), + reader_shutdown_tx: shutdown_tx, + }, + ); + } + + run_process_package_in_actix(subject, package); + }); + let received = shutdown_rx.recv(); + assert_eq!(received, Ok(())); + TestLogHandler::new().await_log_containing( + &format!( + "DEBUG: {test_name}: Removing StreamWriter and Shutting down StreamReader \ + for oUHoHuDKHjeWq+BJzBIqHpPFBQw to 3.4.5.6:80" + ), + 500, + ); + } + + #[test] + fn stream_handler_pool_logs_when_shutdown_channel_is_broken() { + init_test_logging(); + let test_name = "stream_handler_pool_logs_when_shutdown_channel_is_broken"; + let broken_shutdown_channel_tx = unbounded().0; + thread::spawn(move || { + let stream_key = StreamKey::make_meaningful_stream_key("I should die"); + let client_request_payload = ClientRequestPayload_0v1 { + stream_key, + sequenced_packet: SequencedPacket { + data: b"I'm gonna kill you stream key".to_vec(), + sequence_number: 0, + last_data: true, + }, + target_hostname: Some(String::from("3.4.5.6:80")), + target_port: HTTP_PORT, + protocol: ProxyProtocol::HTTP, + originator_public_key: PublicKey::new(&b"brutal death"[..]), + }; + let package = ExpiredCoresPackage::new( + SocketAddr::from_str("1.2.3.4:1234").unwrap(), + Some(make_wallet("consuming")), + make_meaningless_route(), + client_request_payload.into(), + 0, + ); + let peer_addr = SocketAddr::from_str("3.4.5.6:80").unwrap(); + let peer_actors = peer_actors_builder().build(); + let subject = StreamHandlerPoolReal::new( + Box::new(ResolverWrapperMock::new()), + main_cryptde(), + peer_actors.accountant.report_exit_service_provided.clone(), + peer_actors.proxy_client_opt.unwrap().clone(), + 100, + 200, + ); + { + let mut inner = subject.inner.lock().unwrap(); + inner.logger = Logger::new(test_name); + inner.stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(SenderWrapperMock::new(peer_addr)), + reader_shutdown_tx: broken_shutdown_channel_tx, + }, + ); + } + + run_process_package_in_actix(subject, package); + }); + TestLogHandler::new().await_log_containing( + &format!( + "DEBUG: {test_name}: Unable to send a shutdown signal to the StreamReader \ + for stream key oUHoHuDKHjeWq+BJzBIqHpPFBQw. The channel is already gone." + ), + 500, + ); + } + #[test] fn ip_is_parsed_even_without_port() { let cryptde = main_cryptde(); @@ -943,6 +1197,7 @@ mod tests { cryptde, stream_adder_tx, stream_killer_tx, + shutdown_signal_rx: unbounded().1, stream_connector: Box::new(StreamConnectorMock::new().with_connection( peer_addr.clone(), peer_addr.clone(), @@ -1122,6 +1377,7 @@ mod tests { cryptde, stream_adder_tx, stream_killer_tx, + shutdown_signal_rx: unbounded().1, stream_connector: Box::new(StreamConnectorMock::new().with_connection( peer_addr.clone(), peer_addr.clone(), @@ -1222,6 +1478,7 @@ mod tests { cryptde, stream_adder_tx, stream_killer_tx, + shutdown_signal_rx: unbounded().1, stream_connector: Box::new( StreamConnectorMock::new() .connect_pair_result(Err(Error::from(ErrorKind::Other))), @@ -1344,6 +1601,7 @@ mod tests { cryptde, stream_adder_tx, stream_killer_tx, + shutdown_signal_rx: unbounded().1, stream_connector: Box::new( StreamConnectorMock::new() .connect_pair_result(Err(Error::from(ErrorKind::Other))), @@ -1460,6 +1718,7 @@ mod tests { cryptde, stream_adder_tx, stream_killer_tx, + shutdown_signal_rx: unbounded().1, stream_connector: Box::new( StreamConnectorMock::new() .with_connection(peer_addr, peer_addr, reader, writer), @@ -1610,12 +1869,13 @@ mod tests { 100, 200, ); - subject - .inner - .lock() - .unwrap() - .stream_writer_channels - .insert(stream_key, Box::new(sender_wrapper)); + subject.inner.lock().unwrap().stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(sender_wrapper), + reader_shutdown_tx: unbounded().0, + }, + ); run_process_package_in_actix(subject, package); }); @@ -1709,11 +1969,16 @@ mod tests { subject.stream_killer_rx = stream_killer_rx; let stream_key = StreamKey::make_meaningless_stream_key(); let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); + let (shutdown_tx, shutdown_rx) = unbounded(); { let mut inner = subject.inner.lock().unwrap(); - inner - .stream_writer_channels - .insert(stream_key, Box::new(SenderWrapperMock::new(peer_addr))); + inner.stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(SenderWrapperMock::new(peer_addr)), + reader_shutdown_tx: shutdown_tx, + }, + ); } stream_killer_tx.send((stream_key, 47)).unwrap(); @@ -1723,6 +1988,8 @@ mod tests { system.run(); let proxy_client_recording = proxy_client_recording_arc.lock().unwrap(); let report = proxy_client_recording.get_record::(0); + let shutdown_signal_received = shutdown_rx.recv(); + assert_eq!(shutdown_signal_received, Ok(())); assert_eq!( report, &InboundServerData { @@ -1735,6 +2002,49 @@ mod tests { ); } + #[test] + fn clean_up_dead_streams_logs_when_the_shutdown_channel_is_down() { + init_test_logging(); + let test_name = "clean_up_dead_streams_logs_when_the_shutdown_channel_is_down"; + let system = System::new(test_name); + let peer_actors = peer_actors_builder().build(); + let mut subject = StreamHandlerPoolReal::new( + Box::new(ResolverWrapperMock::new()), + main_cryptde(), + peer_actors.accountant.report_exit_service_provided, + peer_actors.proxy_client_opt.unwrap(), + 0, + 0, + ); + let (stream_killer_tx, stream_killer_rx) = unbounded(); + subject.stream_killer_rx = stream_killer_rx; + let stream_key = StreamKey::make_meaningful_stream_key("I'll be gone well before then."); + let peer_addr = SocketAddr::from_str("1.2.3.4:5678").unwrap(); + let broken_shutdown_channel_tx = unbounded().0; + { + let mut inner = subject.inner.lock().unwrap(); + inner.logger = Logger::new(test_name); + inner.stream_writer_channels.insert( + stream_key, + StreamSenders { + writer_data: Box::new(SenderWrapperMock::new(peer_addr)), + reader_shutdown_tx: broken_shutdown_channel_tx, + }, + ); + } + stream_killer_tx.send((stream_key, 47)).unwrap(); + + subject.clean_up_dead_streams(); + + System::current().stop_with_code(0); + system.run(); + TestLogHandler::new().exists_log_containing(&format!( + "DEBUG: {test_name}: Unable to send a shutdown signal \ + to the StreamReader for stream key cv9IZ5fizc4kZmR+0d+OQGXr3bw. \ + The channel is already gone." + )); + } + #[test] fn clean_up_dead_streams_does_not_send_server_drop_report_if_dead_stream_is_gone_already() { let system = System::new("test"); @@ -1760,4 +2070,77 @@ mod tests { let proxy_client_recording = proxy_client_recording_arc.lock().unwrap(); assert_eq!(proxy_client_recording.len(), 0); } + + #[test] + fn add_new_streams_works() { + init_test_logging(); + let test_name = "add_new_streams_works"; + let (stream_adder_tx, stream_adder_rx) = unbounded(); + let peer_actors = peer_actors_builder().build(); + let mut subject = StreamHandlerPoolReal::new( + Box::new(ResolverWrapperMock::new()), + main_cryptde(), + peer_actors.accountant.report_exit_service_provided, + peer_actors.proxy_client_opt.unwrap(), + 0, + 0, + ); + subject.stream_adder_rx = stream_adder_rx; + { + subject.inner.lock().unwrap().logger = Logger::new(test_name); + } + let first_stream_key = StreamKey::make_meaningful_stream_key("first_stream_key"); + let (first_writer_data_tx, _first_writer_data_rx) = futures::sync::mpsc::unbounded(); + let (first_shutdown_tx, _first_shutdown_rx) = unbounded(); + let first_stream_senders = StreamSenders { + writer_data: Box::new(SenderWrapperReal::new( + SocketAddr::from_str("1.2.3.4:5678").unwrap(), + first_writer_data_tx, + )), + reader_shutdown_tx: first_shutdown_tx, + }; + let (second_writer_data_tx, _second_writer_data_rx) = futures::sync::mpsc::unbounded(); + let (second_shutdown_tx, _second_shutdown_rx) = unbounded(); + let second_stream_key = StreamKey::make_meaningful_stream_key("second_stream_key"); + let second_stream_senders = StreamSenders { + writer_data: Box::new(SenderWrapperReal::new( + SocketAddr::from_str("2.3.4.5:6789").unwrap(), + second_writer_data_tx, + )), + reader_shutdown_tx: second_shutdown_tx, + }; + stream_adder_tx + .send((first_stream_key.clone(), first_stream_senders)) + .unwrap(); + stream_adder_tx + .send((second_stream_key.clone(), second_stream_senders)) + .unwrap(); + + subject.add_new_streams(); + + let mut inner = subject.inner.lock().unwrap(); + let actual_first_stream_senders = inner + .stream_writer_channels + .remove(&first_stream_key) + .unwrap(); + let actual_second_stream_senders = inner + .stream_writer_channels + .remove(&second_stream_key) + .unwrap(); + assert_eq!( + actual_first_stream_senders.writer_data.peer_addr(), + SocketAddr::from_str("1.2.3.4:5678").unwrap() + ); + assert_eq!( + actual_second_stream_senders.writer_data.peer_addr(), + SocketAddr::from_str("2.3.4.5:6789").unwrap() + ); + let tlh = TestLogHandler::new(); + tlh.exists_log_containing(&format!( + "DEBUG: {test_name}: Persisting StreamWriter to 1.2.3.4:5678 under key gY2vJ+OwPuItsBcFhbilDI61LGo" + )); + tlh.exists_log_containing(&format!( + "DEBUG: {test_name}: Persisting StreamWriter to 2.3.4.5:6789 under key 1Kbv+3/MIN4/1hLQXLeNPgdDM58" + )); + } } diff --git a/node/src/proxy_client/stream_reader.rs b/node/src/proxy_client/stream_reader.rs index 3f3d82477..65bf4253f 100644 --- a/node/src/proxy_client/stream_reader.rs +++ b/node/src/proxy_client/stream_reader.rs @@ -6,7 +6,7 @@ use crate::sub_lib::tokio_wrappers::ReadHalfWrapper; use crate::sub_lib::utils; use crate::sub_lib::utils::indicates_dead_stream; use actix::Recipient; -use crossbeam_channel::Sender; +use crossbeam_channel::{Receiver, Sender}; use masq_lib::logger::Logger; use std::net::SocketAddr; use tokio::prelude::Async; @@ -17,6 +17,7 @@ pub struct StreamReader { proxy_client_sub: Recipient, stream: Box, stream_killer: Sender<(StreamKey, u64)>, + shutdown_signal: Receiver<()>, peer_addr: SocketAddr, logger: Logger, sequencer: Sequencer, @@ -29,6 +30,13 @@ impl Future for StreamReader { fn poll(&mut self) -> Result::Item>, ::Error> { let mut buf: [u8; 16384] = [0; 16384]; loop { + if self.shutdown_signal.try_recv().is_ok() { + info!( + self.logger, + "Shutting down for stream: {:?}", self.stream_key + ); + return Ok(Async::Ready(())); + } match self.stream.poll_read(&mut buf) { Ok(Async::NotReady) => return Ok(Async::NotReady), Ok(Async::Ready(0)) => { @@ -76,21 +84,34 @@ impl Future for StreamReader { } } +impl Drop for StreamReader { + fn drop(&mut self) { + debug!( + Logger::new("TEST"), + "StreamReader for stream key {:?} is being dropped.", self.stream_key + ) + } +} + impl StreamReader { pub fn new( stream_key: StreamKey, proxy_client_sub: Recipient, stream: Box, stream_killer: Sender<(StreamKey, u64)>, + shutdown_signal: Receiver<()>, peer_addr: SocketAddr, ) -> StreamReader { + let logger = Logger::new(&format!("StreamReader for {:?}/{}", stream_key, peer_addr)[..]); + debug!(logger, "Initialised StreamReader"); StreamReader { stream_key, proxy_client_sub, stream, stream_killer, + shutdown_signal, peer_addr, - logger: Logger::new(&format!("StreamReader for {:?}/{}", stream_key, peer_addr)[..]), + logger, sequencer: Sequencer::new(), } } @@ -120,7 +141,7 @@ mod tests { use crate::test_utils::recorder::make_recorder; use crate::test_utils::recorder::peer_actors_builder; use crate::test_utils::tokio_wrapper_mocks::ReadHalfWrapperMock; - use actix::System; + use actix::{Actor, System}; use crossbeam_channel::unbounded; use masq_lib::test_utils::logging::init_test_logging; use masq_lib::test_utils::logging::TestLogHandler; @@ -175,6 +196,7 @@ mod tests { proxy_client_sub, stream, stream_killer, + shutdown_signal: unbounded().1, peer_addr: SocketAddr::from_str("8.7.4.3:50").unwrap(), logger: Logger::new("test"), sequencer: Sequencer::new(), @@ -253,15 +275,12 @@ mod tests { }); let proxy_client_sub = rx.recv().unwrap(); let (stream_killer, stream_killer_params) = unbounded(); - let mut subject = StreamReader { - stream_key: StreamKey::make_meaningless_stream_key(), - proxy_client_sub, - stream: Box::new(stream), - stream_killer, - peer_addr: SocketAddr::from_str("5.7.9.0:95").unwrap(), - logger: Logger::new("test"), - sequencer: Sequencer::new(), - }; + let peer_addr = SocketAddr::from_str("5.7.9.0:95").unwrap(); + let mut subject = make_subject(); + subject.proxy_client_sub = proxy_client_sub; + subject.stream = Box::new(stream); + subject.stream_killer = stream_killer; + subject.peer_addr = peer_addr; let result = subject.poll(); @@ -274,7 +293,7 @@ mod tests { stream_key: StreamKey::make_meaningless_stream_key(), last_data: false, sequence_number: 0, - source: SocketAddr::from_str("5.7.9.0:95").unwrap(), + source: peer_addr, data: b"HTTP/1.1 200".to_vec() } ); @@ -284,7 +303,7 @@ mod tests { stream_key: StreamKey::make_meaningless_stream_key(), last_data: false, sequence_number: 1, - source: SocketAddr::from_str("5.7.9.0:95").unwrap(), + source: peer_addr, data: b" OK\r\n\r\nHTTP/1.1 40".to_vec() } ); @@ -294,7 +313,7 @@ mod tests { stream_key: StreamKey::make_meaningless_stream_key(), last_data: false, sequence_number: 2, - source: SocketAddr::from_str("5.7.9.0:95").unwrap(), + source: peer_addr, data: b"4 File not found\r\n\r\nHTTP/1.1 503 Server error\r\n\r\n".to_vec() } ); @@ -312,49 +331,53 @@ mod tests { #[test] fn receiving_0_bytes_kills_stream() { init_test_logging(); + let test_name = "receiving_0_bytes_kills_stream"; let stream_key = StreamKey::make_meaningless_stream_key(); let (stream_killer, kill_stream_params) = unbounded(); let mut stream = ReadHalfWrapperMock::new(); stream.poll_read_results = vec![(vec![], Ok(Async::Ready(0)))]; - - let system = System::new("receiving_0_bytes_sends_empty_cores_response_and_kills_stream"); - let peer_actors = peer_actors_builder().build(); + let peer_addr = SocketAddr::from_str("5.3.4.3:654").unwrap(); + let system = System::new(test_name); let mut sequencer = Sequencer::new(); sequencer.next_sequence_number(); sequencer.next_sequence_number(); let mut subject = StreamReader { stream_key, - proxy_client_sub: peer_actors.proxy_client_opt.unwrap().inbound_server_data, + proxy_client_sub: make_recorder().0.start().recipient(), stream: Box::new(stream), stream_killer, - peer_addr: SocketAddr::from_str("5.3.4.3:654").unwrap(), - logger: Logger::new("test"), + shutdown_signal: unbounded().1, + peer_addr, + logger: Logger::new(test_name), sequencer, }; - System::current().stop_with_code(0); + System::current().stop(); system.run(); let result = subject.poll(); assert_eq!(result, Ok(Async::Ready(()))); assert_eq!(kill_stream_params.try_recv().unwrap(), (stream_key, 2)); - TestLogHandler::new() - .exists_log_containing("Stream from 5.3.4.3:654 was closed: (0-byte read)"); + TestLogHandler::new().exists_log_containing(&format!( + "DEBUG: {test_name}: Stream from {peer_addr} was closed: (0-byte read)" + )); } #[test] fn non_dead_stream_read_errors_log_but_do_not_shut_down() { init_test_logging(); + let test_name = "non_dead_stream_read_errors_log_but_do_not_shut_down"; let (proxy_client, proxy_client_awaiter, proxy_client_recording_arc) = make_recorder(); let stream_key = StreamKey::make_meaningless_stream_key(); let (stream_killer, _) = unbounded(); let mut stream = ReadHalfWrapperMock::new(); + let http_response = b"HTTP/1.1 200 OK\r\n\r\n"; stream.poll_read_results = vec![ (vec![], Err(Error::from(ErrorKind::Other))), ( - Vec::from(&b"HTTP/1.1 200 OK\r\n\r\n"[..]), - Ok(Async::Ready(b"HTTP/1.1 200 OK\r\n\r\n".len())), + http_response.to_vec(), + Ok(Async::Ready(http_response.len())), ), (vec![], Err(Error::from(ErrorKind::BrokenPipe))), ]; @@ -371,13 +394,15 @@ mod tests { }); let proxy_client_sub = rx.recv().unwrap(); + let peer_addr = SocketAddr::from_str("6.5.4.1:8325").unwrap(); let mut subject = StreamReader { stream_key, proxy_client_sub, stream: Box::new(stream), stream_killer, - peer_addr: SocketAddr::from_str("6.5.4.1:8325").unwrap(), - logger: Logger::new("test"), + shutdown_signal: unbounded().1, + peer_addr, + logger: Logger::new(test_name), sequencer: Sequencer::new(), }; @@ -386,18 +411,49 @@ mod tests { assert_eq!(result, Err(())); proxy_client_awaiter.await_message_count(1); TestLogHandler::new().exists_log_containing( - "WARN: test: Continuing after read error on stream from 6.5.4.1:8325: other error", + &format!("WARN: {test_name}: Continuing after read error on stream from {peer_addr}: other error"), ); let proxy_client_recording = proxy_client_recording_arc.lock().unwrap(); assert_eq!( proxy_client_recording.get_record::(0), &InboundServerData { - stream_key: StreamKey::make_meaningless_stream_key(), + stream_key, last_data: false, sequence_number: 0, - source: SocketAddr::from_str("6.5.4.1:8325").unwrap(), - data: b"HTTP/1.1 200 OK\r\n\r\n".to_vec() + source: peer_addr, + data: http_response.to_vec() } ); } + + #[test] + fn stream_reader_shuts_down_when_it_receives_the_shutdown_signal() { + init_test_logging(); + let test_name = "stream_reader_shuts_down_when_it_receives_the_shutdown_signal"; + let (shutdown_tx, shutdown_rx) = unbounded(); + let mut subject = make_subject(); + subject.shutdown_signal = shutdown_rx; + subject.logger = Logger::new(test_name); + + shutdown_tx.send(()).unwrap(); + + assert_eq!(subject.poll(), Ok(Async::Ready(()))); + TestLogHandler::new().exists_log_containing(&format!( + "INFO: {test_name}: Shutting down for stream: {:?}", + subject.stream_key + )); + } + + pub fn make_subject() -> StreamReader { + StreamReader { + stream_key: StreamKey::make_meaningless_stream_key(), + proxy_client_sub: make_recorder().0.start().recipient(), + stream: Box::new(ReadHalfWrapperMock::new()), + stream_killer: unbounded().0, + shutdown_signal: unbounded().1, + peer_addr: SocketAddr::from_str("9.8.7.6:5432").unwrap(), + logger: Logger::new("test"), + sequencer: Sequencer::new(), + } + } } diff --git a/node/src/proxy_server/mod.rs b/node/src/proxy_server/mod.rs index 058c7c12f..20633d00e 100644 --- a/node/src/proxy_server/mod.rs +++ b/node/src/proxy_server/mod.rs @@ -29,8 +29,8 @@ use crate::sub_lib::neighborhood::{ExpectedServices, RatePack}; use crate::sub_lib::neighborhood::{NRMetadataChange, RouteQueryMessage}; use crate::sub_lib::peer_actors::BindMessage; use crate::sub_lib::proxy_client::{ClientResponsePayload_0v1, DnsResolveFailure_0v1}; -use crate::sub_lib::proxy_server::AddReturnRouteMessage; use crate::sub_lib::proxy_server::ProxyServerSubs; +use crate::sub_lib::proxy_server::{AddReturnRouteMessage, StreamKeyPurge}; use crate::sub_lib::proxy_server::{ AddRouteResultMessage, ClientRequestPayload_0v1, ProxyProtocol, }; @@ -38,13 +38,13 @@ use crate::sub_lib::route::Route; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; use crate::sub_lib::stream_key::StreamKey; use crate::sub_lib::ttl_hashmap::TtlHashMap; -use crate::sub_lib::utils::{handle_ui_crash_request, NODE_MAILBOX_CAPACITY}; +use crate::sub_lib::utils::{handle_ui_crash_request, MessageScheduler, NODE_MAILBOX_CAPACITY}; use crate::sub_lib::wallet::Wallet; -use actix::Addr; use actix::Context; use actix::Handler; use actix::Recipient; use actix::{Actor, MailboxError}; +use actix::{Addr, AsyncContext}; use masq_lib::logger::Logger; use masq_lib::ui_gateway::NodeFromUiMessage; use masq_lib::utils::MutabilityConflictHelper; @@ -58,6 +58,8 @@ use tokio::prelude::Future; pub const CRASH_KEY: &str = "PROXYSERVER"; pub const RETURN_ROUTE_TTL: Duration = Duration::from_secs(120); +pub const STREAM_KEY_PURGE_DELAY: Duration = Duration::from_secs(5); + struct ProxyServerOutSubs { dispatcher: Recipient, hopper: Recipient, @@ -67,6 +69,7 @@ struct ProxyServerOutSubs { add_return_route: Recipient, stream_shutdown_sub: Recipient, route_result_sub: Recipient, + schedule_stream_key_purge: Recipient>, } pub struct ProxyServer { @@ -86,6 +89,7 @@ pub struct ProxyServer { route_ids_to_return_routes: TtlHashMap, browser_proxy_sequence_offset: bool, inbound_client_data_helper_opt: Option>, + stream_key_purge_delay: Duration, } impl Actor for ProxyServer { @@ -106,6 +110,7 @@ impl Handler for ProxyServer { add_return_route: msg.peer_actors.proxy_server.add_return_route, stream_shutdown_sub: msg.peer_actors.proxy_server.stream_shutdown_sub, route_result_sub: msg.peer_actors.proxy_server.route_result_sub, + schedule_stream_key_purge: msg.peer_actors.proxy_server.schedule_stream_key_purge, }; self.subs = Some(subs); } @@ -220,6 +225,25 @@ impl Handler for ProxyServer { } } +impl Handler for ProxyServer { + type Result = (); + + fn handle(&mut self, msg: StreamKeyPurge, _ctx: &mut Self::Context) -> Self::Result { + self.purge_stream_key(&msg.stream_key); + } +} + +impl Handler> for ProxyServer +where + ProxyServer: Handler, +{ + type Result = (); + + fn handle(&mut self, msg: MessageScheduler, ctx: &mut Self::Context) -> Self::Result { + ctx.notify_later(msg.scheduled_msg, msg.delay); + } +} + impl ProxyServer { pub fn new( main_cryptde: &'static dyn CryptDE, @@ -245,6 +269,7 @@ impl ProxyServer { route_ids_to_return_routes: TtlHashMap::new(RETURN_ROUTE_TTL), browser_proxy_sequence_offset: false, inbound_client_data_helper_opt: Some(Box::new(IBCDHelperReal::new())), + stream_key_purge_delay: STREAM_KEY_PURGE_DELAY, } } @@ -258,6 +283,7 @@ impl ProxyServer { stream_shutdown_sub: recipient!(addr, StreamShutdownMsg), node_from_ui: recipient!(addr, NodeFromUiMessage), route_result_sub: recipient!(addr, AddRouteResultMessage), + schedule_stream_key_purge: recipient!(addr, MessageScheduler), } } @@ -469,11 +495,15 @@ impl ProxyServer { }) .expect("Dispatcher is dead"); if last_data { - debug!( - self.logger, - "Retiring stream key {}: no more data", &stream_key - ); - self.purge_stream_key(&stream_key); + self.subs + .as_ref() + .expect("ProxyServer Subs Unbound") + .schedule_stream_key_purge + .try_send(MessageScheduler { + scheduled_msg: StreamKeyPurge { stream_key }, + delay: self.stream_key_purge_delay, + }) + .expect("ProxyServer is dead"); } } None => { @@ -603,6 +633,10 @@ impl ProxyServer { } fn purge_stream_key(&mut self, stream_key: &StreamKey) { + debug!( + self.logger, + "Retiring stream key {}: no more data", &stream_key + ); let _ = self.keys_and_addrs.remove_a(stream_key); let _ = self.stream_key_routes.remove(stream_key); let _ = self.tunneled_hosts.remove(stream_key); @@ -1379,6 +1413,7 @@ mod tests { add_return_route: recipient!(addr, AddReturnRouteMessage), stream_shutdown_sub: recipient!(addr, StreamShutdownMsg), route_result_sub: recipient!(addr, AddRouteResultMessage), + schedule_stream_key_purge: recipient!(addr, MessageScheduler), } } @@ -3536,6 +3571,7 @@ mod tests { init_test_logging(); let system = System::new("proxy_server_receives_response_from_hopper"); let (dispatcher, _, dispatcher_recording_arc) = make_recorder(); + let (proxy_server, _, proxy_server_recording_arc) = make_recorder(); let cryptde = main_cryptde(); let mut subject = ProxyServer::new( cryptde, @@ -3576,25 +3612,41 @@ mod tests { 0, ); let second_expired_cores_package = first_expired_cores_package.clone(); - let peer_actors = peer_actors_builder().dispatcher(dispatcher).build(); + let peer_actors = peer_actors_builder() + .dispatcher(dispatcher) + .proxy_server(proxy_server) + .build(); subject_addr.try_send(BindMessage { peer_actors }).unwrap(); subject_addr.try_send(first_expired_cores_package).unwrap(); - subject_addr.try_send(second_expired_cores_package).unwrap(); // should generate log because stream key is now unknown + subject_addr.try_send(second_expired_cores_package).unwrap(); // should shedule a new message System::current().stop(); system.run(); let dispatcher_recording = dispatcher_recording_arc.lock().unwrap(); - let record = dispatcher_recording.get_record::(0); - assert_eq!(record.endpoint, Endpoint::Socket(socket_addr)); - assert_eq!(record.last_data, true); - assert_eq!(record.data, b"16 bytes of data".to_vec()); - TestLogHandler::new().exists_log_containing(&format!("WARN: ProxyServer: Discarding 16-byte packet 12345678 from an unrecognized stream key: {:?}", stream_key)); + let transmit_data_msg = dispatcher_recording.get_record::(0); + let proxy_server_recording = proxy_server_recording_arc.lock().unwrap(); + let scheduled_msg = + proxy_server_recording.get_record::>(0); + assert_eq!(transmit_data_msg.endpoint, Endpoint::Socket(socket_addr)); + assert_eq!(transmit_data_msg.last_data, true); + assert_eq!(transmit_data_msg.data, b"16 bytes of data".to_vec()); + assert_eq!( + scheduled_msg, + &MessageScheduler { + scheduled_msg: StreamKeyPurge { stream_key }, + delay: Duration::from_secs(5) + } + ) } #[test] fn handle_client_response_payload_purges_stream_keys_for_terminal_response() { + init_test_logging(); + let test_name = "handle_client_response_payload_purges_stream_keys_for_terminal_response"; let cryptde = main_cryptde(); + let stream_key_purge_delay_in_millis = 500; + let offset_in_millis = 100; let mut subject = ProxyServer::new( cryptde, alias_cryptde(), @@ -3602,6 +3654,8 @@ mod tests { Some(STANDARD_CONSUMING_WALLET_BALANCE), false, ); + subject.stream_key_purge_delay = Duration::from_millis(stream_key_purge_delay_in_millis); + subject.logger = Logger::new(test_name); subject.subs = Some(make_proxy_server_out_subs()); let stream_key = StreamKey::make_meaningless_stream_key(); @@ -3632,9 +3686,10 @@ mod tests { stream_key: stream_key.clone(), sequenced_packet: SequencedPacket::new(vec![], 1, true), }; - let (dispatcher_mock, _, _) = make_recorder(); - let peer_actors = peer_actors_builder().dispatcher(dispatcher_mock).build(); - subject.subs.as_mut().unwrap().dispatcher = peer_actors.dispatcher.from_dispatcher_client; + let proxy_server_addr = subject.start(); + let schedule_stream_key_sub = proxy_server_addr.clone().recipient(); + let mut peer_actors = peer_actors_builder().build(); + peer_actors.proxy_server.schedule_stream_key_purge = schedule_stream_key_sub; let expired_cores_package: ExpiredCoresPackage = ExpiredCoresPackage::new( SocketAddr::from_str("1.2.3.4:1234").unwrap(), @@ -3643,12 +3698,45 @@ mod tests { client_response_payload.into(), 0, ); + let system = + System::new("handle_client_response_payload_purges_stream_keys_for_terminal_response"); + let bind_msg = BindMessage { peer_actors }; + proxy_server_addr.try_send(bind_msg).unwrap(); - subject.handle_client_response_payload(expired_cores_package); + proxy_server_addr.try_send(expired_cores_package).unwrap(); - assert!(subject.keys_and_addrs.is_empty()); - assert!(subject.stream_key_routes.is_empty()); - assert!(subject.tunneled_hosts.is_empty()); + let pre_assertions_msg = AssertionsMessage { + assertions: Box::new(move |proxy_server: &mut ProxyServer| { + assert!(!proxy_server.keys_and_addrs.is_empty()); + assert!(!proxy_server.stream_key_routes.is_empty()); + assert!(!proxy_server.tunneled_hosts.is_empty()); + }), + }; + proxy_server_addr + .try_send(MessageScheduler { + scheduled_msg: pre_assertions_msg, + delay: Duration::from_millis(stream_key_purge_delay_in_millis - offset_in_millis), + }) + .unwrap(); + let post_assertions_msg = AssertionsMessage { + assertions: Box::new(move |proxy_server: &mut ProxyServer| { + TestLogHandler::new() + .exists_log_containing(&format!( + "DEBUG: {test_name}: Retiring stream key AAAAAAAAAAAAAAAAAAAAAAAAAAA: no more data" + )); + assert!(proxy_server.keys_and_addrs.is_empty()); + assert!(proxy_server.stream_key_routes.is_empty()); + assert!(proxy_server.tunneled_hosts.is_empty()); + System::current().stop(); + }), + }; + proxy_server_addr + .try_send(MessageScheduler { + scheduled_msg: post_assertions_msg, + delay: Duration::from_millis(stream_key_purge_delay_in_millis + offset_in_millis), + }) + .unwrap(); + system.run(); } #[test] diff --git a/node/src/stream_handler_pool.rs b/node/src/stream_handler_pool.rs index 3d5512892..b80e01f92 100644 --- a/node/src/stream_handler_pool.rs +++ b/node/src/stream_handler_pool.rs @@ -105,6 +105,7 @@ impl Display for StreamWriterKey { } } +// TODO: To avoid confusion with ProxyClient's StreamHandlerPool, rename this one or the other for easy identification. // It is used to store streams for both neighbors and browser. pub struct StreamHandlerPool { stream_writers: HashMap>>>, diff --git a/node/src/sub_lib/proxy_server.rs b/node/src/sub_lib/proxy_server.rs index 78c61ee72..c3042859f 100644 --- a/node/src/sub_lib/proxy_server.rs +++ b/node/src/sub_lib/proxy_server.rs @@ -9,6 +9,7 @@ use crate::sub_lib::peer_actors::BindMessage; use crate::sub_lib::proxy_client::{ClientResponsePayload_0v1, DnsResolveFailure_0v1}; use crate::sub_lib::sequence_buffer::SequencedPacket; use crate::sub_lib::stream_key::StreamKey; +use crate::sub_lib::utils::MessageScheduler; use crate::sub_lib::versioned_data::VersionedData; use actix::Message; use actix::Recipient; @@ -68,6 +69,11 @@ pub struct AddRouteResultMessage { pub result: Result, } +#[derive(Message, Debug, PartialEq, Eq)] +pub struct StreamKeyPurge { + pub stream_key: StreamKey, +} + #[derive(Clone, PartialEq, Eq)] pub struct ProxyServerSubs { // ProxyServer will handle these messages: @@ -79,6 +85,7 @@ pub struct ProxyServerSubs { pub stream_shutdown_sub: Recipient, pub node_from_ui: Recipient, pub route_result_sub: Recipient, + pub schedule_stream_key_purge: Recipient>, } impl Debug for ProxyServerSubs { @@ -110,6 +117,7 @@ mod tests { stream_shutdown_sub: recipient!(recorder, StreamShutdownMsg), node_from_ui: recipient!(recorder, NodeFromUiMessage), route_result_sub: recipient!(recorder, AddRouteResultMessage), + schedule_stream_key_purge: recipient!(recorder, MessageScheduler), }; assert_eq!(format!("{:?}", subject), "ProxyServerSubs"); diff --git a/node/src/sub_lib/utils.rs b/node/src/sub_lib/utils.rs index f6de206d7..d68d721bb 100644 --- a/node/src/sub_lib/utils.rs +++ b/node/src/sub_lib/utils.rs @@ -245,7 +245,7 @@ pub fn db_connection_launch_panic(err: InitializationError, data_directory: &Pat ) } -#[derive(Message, Clone, PartialEq, Eq)] +#[derive(Message, Debug, Clone, PartialEq, Eq)] pub struct MessageScheduler { pub scheduled_msg: M, pub delay: Duration, diff --git a/node/src/test_utils/recorder.rs b/node/src/test_utils/recorder.rs index 4219b2d08..da1aed944 100644 --- a/node/src/test_utils/recorder.rs +++ b/node/src/test_utils/recorder.rs @@ -39,7 +39,9 @@ use crate::sub_lib::peer_actors::PeerActors; use crate::sub_lib::peer_actors::{BindMessage, NewPublicIp, StartMessage}; use crate::sub_lib::proxy_client::{ClientResponsePayload_0v1, InboundServerData}; use crate::sub_lib::proxy_client::{DnsResolveFailure_0v1, ProxyClientSubs}; -use crate::sub_lib::proxy_server::{AddReturnRouteMessage, ClientRequestPayload_0v1}; +use crate::sub_lib::proxy_server::{ + AddReturnRouteMessage, ClientRequestPayload_0v1, StreamKeyPurge, +}; use crate::sub_lib::proxy_server::{AddRouteResultMessage, ProxyServerSubs}; use crate::sub_lib::stream_handler_pool::DispatcherNodeQueryResponse; use crate::sub_lib::stream_handler_pool::TransmitDataMsg; @@ -398,6 +400,7 @@ pub fn make_proxy_server_subs_from_recorder(addr: &Addr) -> ProxyServe stream_shutdown_sub: recipient!(addr, StreamShutdownMsg), node_from_ui: recipient!(addr, NodeFromUiMessage), route_result_sub: recipient!(addr, AddRouteResultMessage), + schedule_stream_key_purge: recipient!(addr, MessageScheduler), } } diff --git a/node/tests/connection_shutdown_test.rs b/node/tests/connection_shutdown_test.rs new file mode 100644 index 000000000..4dd8495be --- /dev/null +++ b/node/tests/connection_shutdown_test.rs @@ -0,0 +1,80 @@ +// Copyright (c) 2019, MASQ (https://masq.ai) and/or its affiliates. All rights reserved. + +pub mod utils; + +use crossbeam_channel::{unbounded, Sender}; +use masq_lib::utils::find_free_port; +use std::io::{Read, Write}; +use std::net::{IpAddr, TcpListener, TcpStream}; +use std::net::{Shutdown, SocketAddr}; +use std::str::FromStr; +use std::time::Duration; +use std::{io, thread}; + +// 'node' below must not be named '_' alone or disappear, or the MASQNode will be immediately reclaimed. +#[test] +fn proxy_client_stream_reader_dies_when_client_stream_is_killed_integration() { + let _node = utils::MASQNode::start_standard( + "proxy_client_stream_reader_dies_when_client_stream_is_killed_integration", + None, + true, + true, + false, + true, + ); + let (server_write_error_tx, server_write_error_rx) = unbounded(); + let server_port = find_free_port(); + let join_handle = thread::spawn(move || { + endless_write_server(server_port, server_write_error_tx); + }); + let mut stream = TcpStream::connect(SocketAddr::from_str("127.0.0.1:80").unwrap()).unwrap(); + stream + .set_read_timeout(Some(Duration::from_millis(1000))) + .unwrap(); + let request = format!("GET / HTTP/1.1\r\nHost: 127.0.0.1:{server_port}\r\n\r\n"); + stream.write(request.as_bytes()).unwrap(); + let mut buf = [0u8; 16384]; + // We want to make sure the Server is sending before we shutdown the stream + stream.read(&mut buf).unwrap(); + + stream.shutdown(Shutdown::Write).unwrap(); + + let write_error = server_write_error_rx + .recv_timeout(Duration::from_secs(60)) + .unwrap(); + if cfg!(target_os = "macos") { + assert_eq!(write_error.kind(), io::ErrorKind::BrokenPipe); + } else { + assert_eq!(write_error.kind(), io::ErrorKind::ConnectionReset); + } + + join_handle.join().unwrap(); +} + +fn endless_write_server(port: u16, write_error_tx: Sender) { + let listener = TcpListener::bind(SocketAddr::new( + IpAddr::from_str("127.0.0.1").unwrap(), + port, + )) + .unwrap(); + let mut buf = [0u8; 16_384]; + let (mut stream, _) = listener.accept().unwrap(); + stream + .set_write_timeout(Some(Duration::from_secs(1))) + .unwrap(); + let _ = stream.read(&mut buf).unwrap(); + stream + .write("HTTP/1.1 200 OK\r\nTransfer-Encoding: chunked\r\n\r\n".as_bytes()) + .unwrap(); + let msg = "Chancellor on brink of second bailout for banks"; + let msg_len = msg.len(); + let chunk_body = format!("{msg_len}\r\n{msg}\r\n"); + loop { + if let Err(e) = stream.write(chunk_body.as_bytes()) { + write_error_tx.send(e).unwrap(); + break; + } + + thread::sleep(Duration::from_millis(250)); + } +}