@@ -53,12 +53,15 @@ const REQUEST_BACKOFF: [Duration; 4] = [
5353 Duration :: from_secs ( 64 )
5454] ;
5555
56+ const NODE_LAST_SEEN_TIMEOUT : Duration = Duration :: from_secs ( 24 * 60 * 60 ) ;
57+
5658#[ derive( Clone , Debug ) ]
5759pub struct NodeEntry {
5860 pub id : NodeId ,
5961 pub endpoint : NodeEndpoint ,
6062}
6163
64+ #[ derive( Debug ) ]
6265pub struct BucketEntry {
6366 pub address : NodeEntry ,
6467 pub id_hash : H256 ,
@@ -89,6 +92,12 @@ struct FindNodeRequest {
8992 answered : bool ,
9093}
9194
95+ #[ derive( Clone , Copy ) ]
96+ enum PingReason {
97+ Default ,
98+ FromDiscoveryRequest ( NodeId )
99+ }
100+
92101struct PingRequest {
93102 // Time when the request was sent
94103 sent_at : Instant ,
@@ -99,8 +108,10 @@ struct PingRequest {
99108 // The hash Parity used to respond with (until rev 01f825b0e1f1c4c420197b51fc801cbe89284b29)
100109 #[ deprecated( ) ]
101110 deprecated_echo_hash : H256 ,
111+ reason : PingReason
102112}
103113
114+ #[ derive( Debug ) ]
104115pub struct NodeBucket {
105116 nodes : VecDeque < BucketEntry > , //sorted by last active
106117}
@@ -178,7 +189,7 @@ impl<'a> Discovery<'a> {
178189 if self . node_buckets [ dist] . nodes . iter ( ) . any ( |n| n. id_hash == id_hash) {
179190 return ;
180191 }
181- self . try_ping ( e) ;
192+ self . try_ping ( e, PingReason :: Default ) ;
182193 }
183194 }
184195
@@ -221,7 +232,7 @@ impl<'a> Discovery<'a> {
221232 } else { None }
222233 } ;
223234 if let Some ( node) = ping {
224- self . try_ping ( node) ;
235+ self . try_ping ( node, PingReason :: Default ) ;
225236 }
226237 Some ( TableUpdates { added : added_map, removed : HashSet :: new ( ) } )
227238 }
@@ -244,7 +255,7 @@ impl<'a> Discovery<'a> {
244255 fn update_new_nodes ( & mut self ) {
245256 while self . in_flight_pings . len ( ) < MAX_NODES_PING {
246257 match self . adding_nodes . pop ( ) {
247- Some ( next) => self . try_ping ( next) ,
258+ Some ( next) => self . try_ping ( next, PingReason :: Default ) ,
248259 None => break ,
249260 }
250261 }
@@ -298,7 +309,7 @@ impl<'a> Discovery<'a> {
298309 None // a and b are equal, so log distance is -inf
299310 }
300311
301- fn try_ping ( & mut self , node : NodeEntry ) {
312+ fn try_ping ( & mut self , node : NodeEntry , reason : PingReason ) {
302313 if !self . is_allowed ( & node) {
303314 trace ! ( target: "discovery" , "Node {:?} not allowed" , node) ;
304315 return ;
@@ -313,7 +324,7 @@ impl<'a> Discovery<'a> {
313324 }
314325
315326 if self . in_flight_pings . len ( ) < MAX_NODES_PING {
316- self . ping ( & node)
327+ self . ping ( & node, reason )
317328 . unwrap_or_else ( |e| {
318329 warn ! ( target: "discovery" , "Error sending Ping packet: {:?}" , e) ;
319330 } ) ;
@@ -322,7 +333,7 @@ impl<'a> Discovery<'a> {
322333 }
323334 }
324335
325- fn ping ( & mut self , node : & NodeEntry ) -> Result < ( ) , Error > {
336+ fn ping ( & mut self , node : & NodeEntry , reason : PingReason ) -> Result < ( ) , Error > {
326337 let mut rlp = RlpStream :: new_list ( 4 ) ;
327338 rlp. append ( & PROTOCOL_VERSION ) ;
328339 self . public_endpoint . to_rlp_list ( & mut rlp) ;
@@ -336,6 +347,7 @@ impl<'a> Discovery<'a> {
336347 node : node. clone ( ) ,
337348 echo_hash : hash,
338349 deprecated_echo_hash : old_parity_hash,
350+ reason : reason
339351 } ) ;
340352
341353 trace ! ( target: "discovery" , "Sent Ping to {:?} ; node_id={:#x}" , & node. endpoint, node. id) ;
@@ -514,7 +526,7 @@ impl<'a> Discovery<'a> {
514526 if request. deprecated_echo_hash == echo_hash {
515527 trace ! ( target: "discovery" , "Got Pong from an old parity-ethereum version." ) ;
516528 }
517- Some ( request. node . clone ( ) )
529+ Some ( ( request. node . clone ( ) , request . reason . clone ( ) ) )
518530 }
519531 } ;
520532
@@ -528,29 +540,70 @@ impl<'a> Discovery<'a> {
528540 } ,
529541 } ;
530542
531- if let Some ( node) = expected_node {
543+ if let Some ( ( node, ping_reason) ) = expected_node {
544+ if let PingReason :: FromDiscoveryRequest ( target) = ping_reason {
545+ self . respond_with_discovery ( target, & node) ?;
546+ }
532547 Ok ( self . update_node ( node) )
533548 } else {
534549 debug ! ( target: "discovery" , "Got unexpected Pong from {:?} ; request not found" , & from) ;
535550 Ok ( None )
536551 }
537552 }
538553
539- fn on_find_node ( & mut self , rlp : & Rlp , _node : & NodeId , from : & SocketAddr ) -> Result < Option < TableUpdates > , Error > {
554+ fn on_find_node ( & mut self , rlp : & Rlp , node_id : & NodeId , from : & SocketAddr ) -> Result < Option < TableUpdates > , Error > {
540555 trace ! ( target: "discovery" , "Got FindNode from {:?}" , & from) ;
541556 let target: NodeId = rlp. val_at ( 0 ) ?;
542557 let timestamp: u64 = rlp. val_at ( 1 ) ?;
543558 self . check_timestamp ( timestamp) ?;
559+
560+ let node = NodeEntry {
561+ id : node_id. clone ( ) ,
562+ endpoint : NodeEndpoint {
563+ address : * from,
564+ udp_port : from. port ( )
565+ }
566+ } ;
567+
568+ if self . is_a_valid_known_node ( & node) {
569+ self . respond_with_discovery ( target, & node) ?;
570+ } else {
571+ // Make sure the request source is actually there and responds to pings before actually responding
572+ self . try_ping ( node, PingReason :: FromDiscoveryRequest ( target) ) ;
573+ }
574+ Ok ( None )
575+ }
576+
577+ fn is_a_valid_known_node ( & self , node : & NodeEntry ) -> bool {
578+ let id_hash = keccak ( node. id ) ;
579+ let dist = match Discovery :: distance ( & self . id_hash , & id_hash) {
580+ Some ( dist) => dist,
581+ None => {
582+ debug ! ( target: "discovery" , "Got an incoming discovery request from self: {:?}" , node) ;
583+ return false ;
584+ }
585+ } ;
586+
587+ let bucket = & self . node_buckets [ dist] ;
588+ if let Some ( known_node) = bucket. nodes . iter ( ) . find ( |n| n. address . id == node. id ) {
589+ debug ! ( target: "discovery" , "Found a known node in a bucket when processing discovery: {:?}/{:?}" , known_node, node) ;
590+ ( known_node. address . endpoint == node. endpoint ) && ( known_node. last_seen . elapsed ( ) < NODE_LAST_SEEN_TIMEOUT )
591+ } else {
592+ false
593+ }
594+ }
595+
596+ fn respond_with_discovery ( & mut self , target : NodeId , node : & NodeEntry ) -> Result < ( ) , Error > {
544597 let nearest = self . nearest_node_entries ( & target) ;
545598 if nearest. is_empty ( ) {
546- return Ok ( None ) ;
599+ return Ok ( ( ) ) ;
547600 }
548601 let mut packets = Discovery :: prepare_neighbours_packets ( & nearest) ;
549602 for p in packets. drain ( ..) {
550- self . send_packet ( PACKET_NEIGHBOURS , from , & p) ?;
603+ self . send_packet ( PACKET_NEIGHBOURS , & node . endpoint . address , & p) ?;
551604 }
552- trace ! ( target: "discovery" , "Sent {} Neighbours to {:?}" , nearest. len( ) , & from ) ;
553- Ok ( None )
605+ trace ! ( target: "discovery" , "Sent {} Neighbours to {:?}" , nearest. len( ) , & node . endpoint ) ;
606+ Ok ( ( ) )
554607 }
555608
556609 fn prepare_neighbours_packets ( nearest : & [ NodeEntry ] ) -> Vec < Bytes > {
@@ -825,17 +878,17 @@ mod tests {
825878 }
826879
827880 // After 4 discovery rounds, the first one should have learned about the rest.
828- for _round in 0 .. 4 {
881+ for _round in 0 .. 5 {
829882 discovery_handlers[ 0 ] . round ( ) ;
830883
831884 let mut continue_loop = true ;
832885 while continue_loop {
833886 continue_loop = false ;
834887
835888 // Process all queued messages.
836- for i in 0 .. 5 {
837- let src = discovery_handlers[ i] . public_endpoint . address . clone ( ) ;
838- while let Some ( datagram) = discovery_handlers[ i] . dequeue_send ( ) {
889+ for i in 0 .. 20 {
890+ let src = discovery_handlers[ i% 5 ] . public_endpoint . address . clone ( ) ;
891+ while let Some ( datagram) = discovery_handlers[ i% 5 ] . dequeue_send ( ) {
839892 let dest = discovery_handlers. iter_mut ( )
840893 . find ( |disc| datagram. address == disc. public_endpoint . address )
841894 . unwrap ( ) ;
@@ -927,14 +980,14 @@ mod tests {
927980 let mut discovery = Discovery { request_backoff : & request_backoff, ..discovery } ;
928981
929982 for _ in 0 ..2 {
930- discovery. ping ( & node_entries[ 101 ] ) . unwrap ( ) ;
983+ discovery. ping ( & node_entries[ 101 ] , PingReason :: Default ) . unwrap ( ) ;
931984 let num_nodes = total_bucket_nodes ( & discovery. node_buckets ) ;
932985 discovery. check_expired ( Instant :: now ( ) + PING_TIMEOUT ) ;
933986 let removed = num_nodes - total_bucket_nodes ( & discovery. node_buckets ) ;
934987 assert_eq ! ( removed, 0 ) ;
935988 }
936989
937- discovery. ping ( & node_entries[ 101 ] ) . unwrap ( ) ;
990+ discovery. ping ( & node_entries[ 101 ] , PingReason :: Default ) . unwrap ( ) ;
938991 let num_nodes = total_bucket_nodes ( & discovery. node_buckets ) ;
939992 discovery. check_expired ( Instant :: now ( ) + PING_TIMEOUT ) ;
940993 let removed = num_nodes - total_bucket_nodes ( & discovery. node_buckets ) ;
@@ -1121,7 +1174,7 @@ mod tests {
11211174 let mut discovery1 = Discovery :: new ( & key1, ep1. clone ( ) , IpFilter :: default ( ) ) ;
11221175 let mut discovery2 = Discovery :: new ( & key2, ep2. clone ( ) , IpFilter :: default ( ) ) ;
11231176
1124- discovery1. ping ( & NodeEntry { id : discovery2. id , endpoint : ep2. clone ( ) } ) . unwrap ( ) ;
1177+ discovery1. ping ( & NodeEntry { id : discovery2. id , endpoint : ep2. clone ( ) } , PingReason :: Default ) . unwrap ( ) ;
11251178 let ping_data = discovery1. dequeue_send ( ) . unwrap ( ) ;
11261179 assert ! ( !discovery1. any_sends_queued( ) ) ;
11271180 let data = & ping_data. payload [ ( 32 + 65 ) ..] ;
0 commit comments