From 2c4ef27387fe993358dd403114e965e838d6bf37 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Mon, 22 Feb 2021 13:57:44 +0900 Subject: [PATCH 1/8] Fix logic of replacement cache check [skip changelog] --- Libplanet/Net/Protocols/KademliaProtocol.cs | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Libplanet/Net/Protocols/KademliaProtocol.cs b/Libplanet/Net/Protocols/KademliaProtocol.cs index e260eeaf8fd..4552dcc2513 100644 --- a/Libplanet/Net/Protocols/KademliaProtocol.cs +++ b/Libplanet/Net/Protocols/KademliaProtocol.cs @@ -292,17 +292,14 @@ public async Task CheckReplacementCacheAsync(CancellationToken cancellationToken try { _logger.Verbose("Check peer {Peer}.", replacement); - - await PingAsync(replacement, _requestTimeout, cancellationToken); _table.RemoveCache(replacement); - Update(replacement); + await PingAsync(replacement, _requestTimeout, cancellationToken); } catch (PingTimeoutException) { _logger.Verbose( "Remove stale peer {Peer} from replacement cache.", replacement); - _table.RemoveCache(replacement); } } } From 5ef88e3618654e8a8098fac0f89e13c638a856ec Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Mon, 22 Feb 2021 16:45:21 +0900 Subject: [PATCH 2/8] Add missing timeout attribute to some tests [skip changelog] --- Libplanet.Tests/Net/SwarmTest.Preload.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Libplanet.Tests/Net/SwarmTest.Preload.cs b/Libplanet.Tests/Net/SwarmTest.Preload.cs index 24927888cf2..b74c496f652 100644 --- a/Libplanet.Tests/Net/SwarmTest.Preload.cs +++ b/Libplanet.Tests/Net/SwarmTest.Preload.cs @@ -522,7 +522,7 @@ await receiverSwarm.PreloadAsync( Assert.Equal(swarm0.BlockChain.BlockHashes, receiverSwarm.BlockChain.BlockHashes); } - [Theory] + [Theory(Timeout = Timeout)] [InlineData(0)] [InlineData(50)] [InlineData(100)] From 3167f31850ccc6f5d1c86f4f7ff9c3b4349d1c0b Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Wed, 24 Feb 2021 14:13:46 +0900 Subject: [PATCH 3/8] Add peer to table only when it answered to the ping --- Libplanet.Tests/Net/Protocols/ProtocolTest.cs | 32 +-------- Libplanet.Tests/Net/SwarmTest.Broadcast.cs | 4 +- Libplanet.Tests/Net/SwarmTest.cs | 47 ++++++------- Libplanet/Net/Protocols/KBucket.cs | 67 +++++++++++-------- Libplanet/Net/Protocols/KademliaProtocol.cs | 58 +++++++++++----- Libplanet/Net/Protocols/RoutingTable.cs | 16 +++++ 6 files changed, 120 insertions(+), 104 deletions(-) diff --git a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs index dedbf310661..dfe9a04cdca 100644 --- a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs +++ b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs @@ -82,39 +82,9 @@ public async Task Ping() await transportA.MessageReceived.WaitAsync(); await Task.Delay(100); - Assert.Single(transportA.ReceivedMessages); - Assert.Single(transportB.ReceivedMessages); - Assert.Contains(transportA.AsPeer, transportB.Peers); - } - finally - { - await transportA.StopAsync(TimeSpan.Zero); - await transportB.StopAsync(TimeSpan.Zero); - } - } - - [Fact(Timeout = Timeout)] - public async Task PingTwice() - { - var transportA = CreateTestTransport(); - var transportB = CreateTestTransport(); - - try - { - await StartTestTransportAsync(transportA); - await StartTestTransportAsync(transportB); - - transportA.SendPing(transportB.AsPeer); - await transportA.MessageReceived.WaitAsync(); - await transportB.MessageReceived.WaitAsync(); - transportB.SendPing(transportA.AsPeer); - await transportA.MessageReceived.WaitAsync(); - await transportB.MessageReceived.WaitAsync(); - Assert.Equal(2, transportA.ReceivedMessages.Count); Assert.Equal(2, transportB.ReceivedMessages.Count); Assert.Contains(transportA.AsPeer, transportB.Peers); - Assert.Contains(transportB.AsPeer, transportA.Peers); } finally { @@ -181,7 +151,9 @@ public async Task BootstrapAsyncTest() await StartTestTransportAsync(transportC); await transportB.BootstrapAsync(new[] { transportA.AsPeer }); + await Task.Delay(100); await transportC.BootstrapAsync(new[] { transportA.AsPeer }); + await Task.Delay(100); Assert.Contains(transportB.AsPeer, transportC.Peers); Assert.Contains(transportC.AsPeer, transportB.Peers); diff --git a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs index c40a9b824a1..15f12dee761 100644 --- a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs +++ b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs @@ -107,7 +107,7 @@ public async Task BroadcastIgnoreFromDifferentGenesisHash() await StartAsync(receiverSwarm); await StartAsync(seedSwarm); - await receiverSwarm.AddPeersAsync(new[] { seedSwarm.AsPeer }, null); + await seedSwarm.AddPeersAsync(new[] { receiverSwarm.AsPeer }, null); Block block = await seedChain.MineBlock(seedSwarm.Address); seedSwarm.BroadcastBlock(block); while (!((NetMQTransport)receiverSwarm.Transport).MessageHistory @@ -717,7 +717,7 @@ public async Task IgnoreExistingBlocks() await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmB, swarmA.AsPeer); + await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); swarmA.BroadcastBlock(chainA[-1]); await swarmB.BlockAppended.WaitAsync(); diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 018b1087ca3..a81f2502f86 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -96,20 +96,20 @@ public async Task HandleReconnection() { Swarm seed = CreateSwarm(); - Swarm swarmA = CreateSwarm(); - Swarm swarmB = CreateSwarm(); + var privateKey = new PrivateKey(); + Swarm swarmA = CreateSwarm(privateKey); + Swarm swarmB = CreateSwarm(privateKey); try { await StartAsync(seed); await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null); + await seed.AddPeersAsync(new[] { swarmA.AsPeer }, null); await StopAsync(swarmA); - await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null); + await seed.AddPeersAsync(new[] { swarmB.AsPeer }, null); Assert.Contains(swarmB.AsPeer, seed.Peers); - Assert.Contains(seed.AsPeer, swarmB.Peers); } finally { @@ -183,7 +183,6 @@ public async Task AddPeersWithoutStart() } finally { - await StopAsync(a); await StopAsync(b); } } @@ -201,7 +200,6 @@ public async Task AddPeersAsync() await a.AddPeersAsync(new Peer[] { b.AsPeer }, null); - Assert.Contains(a.AsPeer, b.Peers); Assert.Contains(b.AsPeer, a.Peers); } finally @@ -273,7 +271,7 @@ public async Task AutoConnectAfterStart() Assert.Empty(swarmB.Peers); await StartAsync(swarmA); - await Task.Delay(100); + await Task.Delay(1000); Assert.Contains(swarmA.AsPeer, swarmB.Peers); } finally @@ -574,23 +572,17 @@ public async Task ExchangeWithIceServer() await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null); + // Public-Private + await seed.AddPeersAsync(new[] { swarmA.AsPeer }, null); + Assert.Contains(swarmA.AsPeer, seed.Peers); + + // Private-Public await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + Assert.Contains(seed.AsPeer, swarmB.Peers); - Assert.Equal( - new HashSet - { - swarmA.AsPeer as BoundPeer, - swarmB.AsPeer as BoundPeer, - }, - seed.Peers.ToHashSet()); - Assert.Equal( - new HashSet { seed.AsPeer as BoundPeer, swarmB.AsPeer as BoundPeer }, - swarmA.Peers.ToHashSet()); - Assert.Equal( - new HashSet { seed.AsPeer as BoundPeer, swarmA.AsPeer as BoundPeer }, - swarmB.Peers.ToHashSet()); + // Private-Private + await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + Assert.Contains(swarmB.AsPeer, swarmA.Peers); } finally { @@ -726,7 +718,7 @@ public async Task RemoveForkedChainWhenFillBlocksAsyncFail() { await StartAsync(swarm1); await StartAsync(swarm2); - await swarm1.AddPeersAsync(new[] { swarm2.AsPeer }, null); + await swarm2.AddPeersAsync(new[] { swarm1.AsPeer }, null); swarm2.BroadcastBlock(block3); await swarm1.FillBlocksAsyncFailed.WaitAsync(); @@ -1177,7 +1169,7 @@ public async Task DoNotDeleteCanonicalChainWhenBlockDownloadFailed() { await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmA, swarmB.AsPeer); + await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); swarmA.BroadcastBlock(block); await swarmB.FillBlocksAsyncStarted.WaitAsync(); @@ -1246,8 +1238,8 @@ BlockChain MakeGenesisChain( await StartAsync(swarmB); await StartAsync(swarmC); - await swarmB.AddPeersAsync(new[] { swarmA.AsPeer }, null); - await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null); + await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + await swarmA.AddPeersAsync(new[] { swarmC.AsPeer }, null); var block = await swarmA.BlockChain.MineBlock(swarmA.Address); @@ -1665,6 +1657,7 @@ public async Task Restart() // Check await swarm1.CheckAllPeersAsync(); await swarm2.CheckAllPeersAsync(); + await Task.Delay(1000); Assert.Contains(swarm1.AsPeer, swarm2.Peers); Assert.Contains(swarm2.AsPeer, swarm1.Peers); diff --git a/Libplanet/Net/Protocols/KBucket.cs b/Libplanet/Net/Protocols/KBucket.cs index ae55561b4e1..460eb0ea831 100644 --- a/Libplanet/Net/Protocols/KBucket.cs +++ b/Libplanet/Net/Protocols/KBucket.cs @@ -75,54 +75,63 @@ public void AddPeer(BoundPeer peer) var updated = DateTimeOffset.UtcNow; - if (_peerStates.ContainsKey(peer.Address)) + if (Update(peer)) { _logger.Verbose("Bucket already contains peer {Peer}", peer); + return; + } - // This done because peer's other attribute except public key might be changed. - // (eg. public IP address, endpoint) - PeerState state = _peerStates[peer.Address]; - state.Peer = peer; - state.LastUpdated = updated; + if (IsFull()) + { + _logger.Verbose("Bucket is full to add peer {Peer}", peer); + if (ReplacementCache.TryAdd(peer, updated)) + { + _logger.Verbose( + "Added {Peer} to replacement cache. (total: {Count})", + peer, + ReplacementCache.Count); + } } else { - if (IsFull()) + _logger.Verbose("Bucket does not contains peer {Peer}", peer); + _lastUpdated = updated; + if (_peerStates.TryAdd(peer.Address, new PeerState(peer, updated))) { - _logger.Verbose("Bucket is full to add peer {Peer}", peer); - if (ReplacementCache.TryAdd(peer, updated)) + _logger.Verbose("Peer {Peer} is added to bucket", peer); + _lastUpdated = updated; + + if (ReplacementCache.TryRemove(peer, out var dateTimeOffset)) { _logger.Verbose( - "Added {Peer} to replacement cache. (total: {Count})", + "Removed peer {Peer} from replacement cache. (total: {Count})", peer, ReplacementCache.Count); } } else { - _logger.Verbose("Bucket does not contains peer {Peer}", peer); - _lastUpdated = updated; - if (_peerStates.TryAdd(peer.Address, new PeerState(peer, updated))) - { - _logger.Verbose("Peer {Peer} is added to bucket", peer); - _lastUpdated = updated; - - if (ReplacementCache.TryRemove(peer, out var dateTimeOffset)) - { - _logger.Verbose( - "Removed peer {Peer} from replacement cache. (total: {Count})", - peer, - ReplacementCache.Count); - } - } - else - { - _logger.Verbose("Failed to add peer {Peer} to bucket", peer); - } + _logger.Verbose("Failed to add peer {Peer} to bucket", peer); } } } + public bool Update(BoundPeer peer) + { + if (!Contains(peer)) + { + _logger.Verbose("Bucket does not contains {Peer}. Failed to update.", peer); + return false; + } + + // This done because peer's other attribute except public key might be changed. + // (eg. public IP address, endpoint) + PeerState state = _peerStates[peer.Address]; + state.Peer = peer; + state.LastUpdated = DateTimeOffset.UtcNow; + return true; + } + public bool Contains(BoundPeer peer) { return _peerStates.ContainsKey(peer.Address); diff --git a/Libplanet/Net/Protocols/KademliaProtocol.cs b/Libplanet/Net/Protocols/KademliaProtocol.cs index 4552dcc2513..3f3bdc27650 100644 --- a/Libplanet/Net/Protocols/KademliaProtocol.cs +++ b/Libplanet/Net/Protocols/KademliaProtocol.cs @@ -26,6 +26,9 @@ public class KademliaProtocol : IProtocol private readonly ILogger _logger; + private readonly ConcurrentDictionary _cache; + private readonly int _cacheSize; + /// /// Creates a instance. /// @@ -57,6 +60,8 @@ public KademliaProtocol( requestTimeout ?? TimeSpan.FromMilliseconds(5000); _transport.ProcessMessageHandler += ProcessMessageHandler; + _cache = new ConcurrentDictionary(); + _cacheSize = int.MaxValue; } /// @@ -180,7 +185,6 @@ public async Task AddPeersAsync( { _logger.Debug( $"Timeout occurred during {nameof(AddPeersAsync)}() after {timeout}."); - throw; } catch (TaskCanceledException) { @@ -439,6 +443,14 @@ internal async Task PingAsync( try { _logger.Verbose("Trying to ping async to {Peer}.", target); + if (_cache.ContainsKey(target.Address) || _cache.Count > _cacheSize) + { + _logger.Verbose( + "Request already in cache or cache size limit exceeded. Ping cancelled."); + return; + } + + _cache[target.Address] = target; Message reply = await _transport.SendMessageWithReplyAsync( target, new Ping(), @@ -456,7 +468,7 @@ internal async Task PingAsync( throw new InvalidMessageException("Cannot receive pong from self", pong); } - Update(target); + _table.AddPeer(target); } catch (TimeoutException) { @@ -476,6 +488,10 @@ internal async Task PingAsync( $"Different AppProtocolVersion encountered at {nameof(PingAsync)}()."); throw; } + finally + { + _cache.TryRemove(target.Address, out _); + } } private void ProcessMessageHandler(object target, Message message) @@ -530,26 +546,32 @@ private async Task ValidateAsync( /// /// Updates routing table when receiving a message. If corresponding bucket - /// for remote peer is not full, just adds given . + /// for remote peer is not full, just adds given . /// Otherwise, checks aliveness of the least recently used (LRU) peer - /// and determine evict LRU peer or discard given . + /// and determine evict LRU peer or discard given . /// - /// to update. - /// - /// Thrown when is null. + /// to update. + /// + /// Thrown when is null or it is not a + /// . /// - private void Update(Peer rawPeer) + private void Update(Peer peer) { - _logger.Verbose($"Try to {nameof(Update)}() {{Peer}}.", rawPeer); - if (rawPeer is null) + _logger.Verbose($"Try to {nameof(Update)}() {{Peer}}.", peer); + if (!(peer is BoundPeer boundPeer)) { - throw new ArgumentNullException(nameof(rawPeer)); + _logger.Verbose("Given peer {Peer} is not bounded.", peer); + return; } - if (rawPeer is BoundPeer peer) + // Don't update peer without endpoint or with different appProtocolVersion. + if (_table.Contains(boundPeer)) + { + _table.UpdatePeer(boundPeer); + } + else { - // Don't update peer without endpoint or with different appProtocolVersion. - _table.AddPeer(peer); + _ = PingAsync(boundPeer, _requestTimeout, default); } } @@ -670,10 +692,14 @@ private async Task> GetNeighbors( // Send pong back to remote private void ReceivePing(Ping ping) { + if (ping.Remote is null) + { + throw new ArgumentNullException(nameof(ping.Remote)); + } + if (ping.Remote.Address.Equals(_address)) { - throw new ArgumentException( - "Cannot receive ping from self"); + throw new ArgumentException("Cannot receive ping from self"); } var pong = new Pong diff --git a/Libplanet/Net/Protocols/RoutingTable.cs b/Libplanet/Net/Protocols/RoutingTable.cs index 038c695232a..7e8a8d989b7 100644 --- a/Libplanet/Net/Protocols/RoutingTable.cs +++ b/Libplanet/Net/Protocols/RoutingTable.cs @@ -132,6 +132,22 @@ public void AddPeer(BoundPeer peer) BucketOf(peer).AddPeer(peer); } + public void UpdatePeer(BoundPeer peer) + { + if (peer is null) + { + throw new ArgumentNullException(nameof(peer)); + } + + if (peer.Address.Equals(_address)) + { + throw new ArgumentException("Cannot add update self."); + } + + _logger.Debug("Updating peer {Peer} from routing table.", peer); + BucketOf(peer).Update(peer); + } + public bool RemovePeer(BoundPeer peer) { if (peer is null) From 50bcea0186f184300e29bd2e0c198daafab9da13 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Wed, 24 Feb 2021 14:48:50 +0900 Subject: [PATCH 4/8] TestTransport to identify peers as its IP address and port, not public key --- Libplanet.Tests/Net/Protocols/ProtocolTest.cs | 14 +++++++-- .../Net/Protocols/TestTransport.cs | 31 +++++++++++++------ 2 files changed, 32 insertions(+), 13 deletions(-) diff --git a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs index dfe9a04cdca..7417d9514c6 100644 --- a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs +++ b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Net; using System.Threading; using System.Threading.Tasks; using Libplanet.Crypto; @@ -15,7 +16,9 @@ namespace Libplanet.Tests.Net.Protocols public class ProtocolTest { private const int Timeout = 60 * 1000; - private readonly Dictionary _transports; + private readonly Dictionary _transports; + + private int _assignedPort; public ProtocolTest(ITestOutputHelper output) { @@ -28,7 +31,8 @@ public ProtocolTest(ITestOutputHelper output) .CreateLogger() .ForContext(); - _transports = new Dictionary(); + _transports = new Dictionary(); + _assignedPort = 1234; } [Fact] @@ -362,7 +366,7 @@ public async Task BroadcastGuarantee() }); var seed = CreateTestTransport(privateKey0); - var t1 = CreateTestTransport(privateKey1, true); + var t1 = CreateTestTransport(privateKey1, blockBroadcast: true); var t2 = CreateTestTransport(privateKey2); await StartTestTransportAsync(seed); await StartTestTransportAsync(t1); @@ -460,6 +464,8 @@ public async Task DoNotBroadcastToSourcePeer() private TestTransport CreateTestTransport( PrivateKey privateKey = null, + string host = null, + int? listenPort = null, bool blockBroadcast = false, int tableSize = Kademlia.TableSize, int bucketSize = Kademlia.BucketSize, @@ -468,6 +474,8 @@ private TestTransport CreateTestTransport( return new TestTransport( _transports, privateKey ?? new PrivateKey(), + host ?? IPAddress.Loopback.ToString(), + listenPort ?? _assignedPort++, blockBroadcast, tableSize, bucketSize, diff --git a/Libplanet.Tests/Net/Protocols/TestTransport.cs b/Libplanet.Tests/Net/Protocols/TestTransport.cs index 32f216b2818..22492050550 100644 --- a/Libplanet.Tests/Net/Protocols/TestTransport.cs +++ b/Libplanet.Tests/Net/Protocols/TestTransport.cs @@ -21,13 +21,15 @@ internal class TestTransport : ITransport private static readonly AppProtocolVersion AppProtocolVersion = AppProtocolVersion.Sign(VersionSigner, 1); - private readonly Dictionary _transports; + private readonly Dictionary _transports; private readonly ILogger _logger; - private readonly ConcurrentDictionary _peersToReply; + private readonly ConcurrentDictionary _addressToReply; private readonly ConcurrentDictionary _replyToReceive; private readonly AsyncCollection _requests; private readonly List _ignoreTestMessageWithData; private readonly PrivateKey _privateKey; + private readonly string _host; + private readonly int _listenPort; private readonly Random _random; private readonly bool _blockBroadcast; @@ -35,25 +37,29 @@ internal class TestTransport : ITransport private TimeSpan _networkDelay; public TestTransport( - Dictionary transports, + Dictionary transports, PrivateKey privateKey, + string host, + int listenPort, bool blockBroadcast, int tableSize, int bucketSize, TimeSpan? networkDelay) { _privateKey = privateKey; + _host = host; + _listenPort = listenPort; _blockBroadcast = blockBroadcast; var loggerId = _privateKey.ToAddress().ToHex(); _logger = Log.ForContext() .ForContext("Address", loggerId); - _peersToReply = new ConcurrentDictionary(); + _addressToReply = new ConcurrentDictionary(); _replyToReceive = new ConcurrentDictionary(); ReceivedMessages = new ConcurrentBag(); MessageReceived = new AsyncAutoResetEvent(); _transports = transports; - _transports[privateKey.ToAddress()] = this; + _transports[ToAddress(AsPeer as BoundPeer)] = this; _networkDelay = networkDelay ?? TimeSpan.Zero; _requests = new AsyncCollection(); _ignoreTestMessageWithData = new List(); @@ -70,7 +76,7 @@ public TestTransport( public Peer AsPeer => new BoundPeer( _privateKey.PublicKey, - new DnsEndPoint("localhost", 1234)); + new DnsEndPoint(_host, _listenPort)); public IEnumerable Peers => Table.Peers; @@ -390,8 +396,8 @@ public void ReplyMessage(Message message) Task.Run(async () => { await Task.Delay(_networkDelay); - _transports[_peersToReply[message.Identity]].ReceiveReply(message); - _peersToReply.TryRemove(message.Identity, out Address addr); + _transports[_addressToReply[message.Identity]].ReceiveReply(message); + _addressToReply.TryRemove(message.Identity, out string addr); }); } @@ -420,6 +426,11 @@ public bool ReceivedTestMessageOfData(string data) return ReceivedMessages.OfType().Any(msg => msg.Data == data); } + private string ToAddress(BoundPeer peer) + { + return $"{peer.EndPoint.Host}:{peer.EndPoint.Port}"; + } + private void ReceiveMessage(Message message) { if (_swarmCancellationTokenSource.IsCancellationRequested) @@ -451,7 +462,7 @@ private void ReceiveMessage(Message message) } else { - _peersToReply[message.Identity] = boundPeer.Address; + _addressToReply[message.Identity] = ToAddress(boundPeer); } if (message is Ping) @@ -487,7 +498,7 @@ private async Task ProcessRuntime(CancellationToken cancellationToken) req.Message, req.Message.Identity, req.Target); - _transports[req.Target.Address].ReceiveMessage(req.Message); + _transports[ToAddress(req.Target)].ReceiveMessage(req.Message); } else { From 72f3ed5edcd649a6e82954fac71b894a69d7eb94 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Wed, 24 Feb 2021 16:27:53 +0900 Subject: [PATCH 5/8] Replace Swarm.AddPeersAsync to Swarm.AddPeers --- Libplanet.Benchmarks/SwarmBenchmark.cs | 3 +- .../Net/SwarmTest.AppProtocolVersion.cs | 32 ++++--- Libplanet.Tests/Net/SwarmTest.Broadcast.cs | 36 ++++---- Libplanet.Tests/Net/SwarmTest.Preload.cs | 31 +++---- Libplanet.Tests/Net/SwarmTest.cs | 87 +++++-------------- Libplanet/Net/Swarm.cs | 25 +++--- 6 files changed, 92 insertions(+), 122 deletions(-) diff --git a/Libplanet.Benchmarks/SwarmBenchmark.cs b/Libplanet.Benchmarks/SwarmBenchmark.cs index ebb31b270cb..798bc1ef439 100644 --- a/Libplanet.Benchmarks/SwarmBenchmark.cs +++ b/Libplanet.Benchmarks/SwarmBenchmark.cs @@ -75,8 +75,7 @@ public void InitializeSwarms() for (int i = 0; i < SwarmNumber - 1; i++) { - _swarms[i].AddPeersAsync(new[] { _swarms[i + 1].AsPeer }, null) - .Wait(WaitTimeout); + _swarms[i].RoutingTable.AddPeer(_swarms[i + 1].AsPeer as BoundPeer); } _blockChains[0].Append(_blocks[1]); diff --git a/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs b/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs index 5e8c9807145..8b354ece7b3 100644 --- a/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs +++ b/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs @@ -32,11 +32,12 @@ public async Task DetectAppProtocolVersion() var peers = new[] { c.AsPeer, d.AsPeer }; - foreach (var peer in peers) - { - await a.AddPeersAsync(new[] { peer }, null); - await b.AddPeersAsync(new[] { peer }, null); - } + await a.BootstrapAsync(new[] { c.AsPeer }, null, null); + await Assert.ThrowsAsync( + async () => await a.BootstrapAsync(new[] { d.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await b.BootstrapAsync(new[] { c.AsPeer }, null, null)); + await b.BootstrapAsync(new[] { d.AsPeer }, null, null); Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray()); Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray()); @@ -148,13 +149,20 @@ AppProtocolVersion localVersion await StartAsync(e); await StartAsync(f); - var peers = new[] { c.AsPeer, d.AsPeer, e.AsPeer, f.AsPeer }; - - foreach (var peer in peers) - { - await a.AddPeersAsync(new[] { peer }, null); - await b.AddPeersAsync(new[] { peer }, null); - } + await a.BootstrapAsync(new[] { c.AsPeer }, null, null); + await Assert.ThrowsAsync( + async () => await a.BootstrapAsync(new[] { d.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await a.BootstrapAsync(new[] { e.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await a.BootstrapAsync(new[] { f.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await b.BootstrapAsync(new[] { c.AsPeer }, null, null)); + await b.BootstrapAsync(new[] { d.AsPeer }, null, null); + await Assert.ThrowsAsync( + async () => await b.BootstrapAsync(new[] { e.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await b.BootstrapAsync(new[] { f.AsPeer }, null, null)); Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray()); Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray()); diff --git a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs index 15f12dee761..e16cd6e3e1a 100644 --- a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs +++ b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs @@ -47,7 +47,7 @@ public async Task BroadcastBlockToReconnectedPeer() Assert.Equal(swarmA.AsPeer, swarmB.AsPeer); - await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null); + await swarmA.BootstrapAsync(new[] { seed.AsPeer }, null, null); await StopAsync(swarmA); await seed.PeerDiscovery.RefreshTableAsync( TimeSpan.Zero, @@ -55,7 +55,7 @@ await seed.PeerDiscovery.RefreshTableAsync( Assert.DoesNotContain(swarmA.AsPeer, seed.Peers); - await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null); + await swarmB.BootstrapAsync(new[] { seed.AsPeer }, null, null); // This is added for context switching. await Task.Delay(100); @@ -107,7 +107,7 @@ public async Task BroadcastIgnoreFromDifferentGenesisHash() await StartAsync(receiverSwarm); await StartAsync(seedSwarm); - await seedSwarm.AddPeersAsync(new[] { receiverSwarm.AsPeer }, null); + seedSwarm.AddPeers(new[] { receiverSwarm.AsPeer }); Block block = await seedChain.MineBlock(seedSwarm.Address); seedSwarm.BroadcastBlock(block); while (!((NetMQTransport)receiverSwarm.Transport).MessageHistory @@ -178,7 +178,7 @@ CancellationToken cancellationToken await StartAsync(a); await StartAsync(b); - await a.AddPeersAsync(new[] { b.AsPeer }, null); + a.AddPeers(new[] { b.AsPeer }); var minerCanceller = new CancellationTokenSource(); Task miningA = CreateMiner(a, chainA, 5000, minerCanceller.Token); @@ -228,9 +228,9 @@ public async Task BroadcastTx() await StartAsync(swarmB); await StartAsync(swarmC); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new[] { swarmC.AsPeer }, null); - await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null); + swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmB.AddPeers(new[] { swarmC.AsPeer }); + swarmC.AddPeers(new[] { swarmA.AsPeer }); swarmA.BroadcastTxs(new[] { tx }); @@ -272,7 +272,7 @@ public async Task BroadcastTxWhileMining() await StartAsync(swarmA); await StartAsync(swarmC); - await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null); + swarmA.AddPeers(new[] { swarmC.AsPeer }); for (var i = 0; i < 100; i++) { @@ -333,7 +333,7 @@ public async Task BroadcastTxAsync() await StartAsync(swarmC); // Broadcast tx swarmA to swarmB - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeers(new[] { swarmB.AsPeer }); await swarmB.TxReceived.WaitAsync(); Assert.Equal(tx, chainB.GetTransaction(tx.Id)); @@ -341,7 +341,7 @@ public async Task BroadcastTxAsync() await StopAsync(swarmA); // Re-Broadcast received tx swarmB to swarmC - await swarmB.AddPeersAsync(new[] { swarmC.AsPeer }, null); + swarmB.AddPeers(new[] { swarmC.AsPeer }); await swarmC.TxReceived.WaitAsync(); Assert.Equal(tx, chainC.GetTransaction(tx.Id)); @@ -457,7 +457,7 @@ public async Task DoNotRebroadcastTxsWithLowerNonce() Assert.Equal(1, tx2.Nonce); await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeers(new[] { swarmB.AsPeer }); swarmA.BroadcastTxs(new[] { tx1, tx2 }); await swarmB.TxReceived.WaitAsync(); Assert.Equal( @@ -482,8 +482,8 @@ public async Task DoNotRebroadcastTxsWithLowerNonce() Assert.Equal(2, tx4.Nonce); await StartAsync(swarmC); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new[] { swarmC.AsPeer }, null); + swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmB.AddPeers(new[] { swarmC.AsPeer }); swarmA.BroadcastTxs(new[] { tx3, tx4 }); await swarmC.TxReceived.WaitAsync(); @@ -539,8 +539,12 @@ public async Task CanBroadcastBlock() await StartAsync(swarmB); await StartAsync(swarmC); - await BootstrapAsync(swarmB, swarmA.AsPeer); - await BootstrapAsync(swarmC, swarmA.AsPeer); + swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmA.AddPeers(new[] { swarmC.AsPeer }); + swarmB.AddPeers(new[] { swarmC.AsPeer }); + swarmB.AddPeers(new[] { swarmA.AsPeer }); + swarmC.AddPeers(new[] { swarmA.AsPeer }); + swarmC.AddPeers(new[] { swarmB.AsPeer }); swarmB.BroadcastBlock(chainB[-1]); @@ -717,7 +721,7 @@ public async Task IgnoreExistingBlocks() await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeers(new[] { swarmB.AsPeer }); swarmA.BroadcastBlock(chainA[-1]); await swarmB.BlockAppended.WaitAsync(); diff --git a/Libplanet.Tests/Net/SwarmTest.Preload.cs b/Libplanet.Tests/Net/SwarmTest.Preload.cs index b74c496f652..0c97234ee4b 100644 --- a/Libplanet.Tests/Net/SwarmTest.Preload.cs +++ b/Libplanet.Tests/Net/SwarmTest.Preload.cs @@ -42,7 +42,7 @@ public async Task InitialBlockDownload() try { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); await receiverSwarm.PreloadAsync(); @@ -85,7 +85,7 @@ public async Task InitialBlockDownloadStates() try { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); await receiverSwarm.PreloadAsync(); var state = receiverChain.GetState(address1); @@ -143,7 +143,7 @@ public async Task Preload() { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); _logger.Verbose("Both chains before synchronization:"); _logger.CompareBothChains( @@ -259,7 +259,7 @@ public async Task BlockDownloadTimeout() { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); Task waitTask = receiverSwarm.BlockDownloadStarted.WaitAsync(); Task preloadTask = receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(15)); @@ -308,7 +308,7 @@ public async Task PreloadWithFailedActions() { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); await receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(1)); var action = new ThrowException { ThrowOnExecution = true }; @@ -393,11 +393,11 @@ public async Task PreloadFromNominer() nominerSwarm0.FindNextHashesChunkSize = 2; nominerSwarm1.FindNextHashesChunkSize = 2; - await nominerSwarm0.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + nominerSwarm0.AddPeers(new[] { minerSwarm.AsPeer }); await nominerSwarm0.PreloadAsync(); - await nominerSwarm1.AddPeersAsync(new[] { nominerSwarm0.AsPeer }, null); + nominerSwarm1.AddPeers(new[] { nominerSwarm0.AsPeer }); await nominerSwarm1.PreloadAsync(); - await receiverSwarm.AddPeersAsync(new[] { nominerSwarm1.AsPeer }, null); + receiverSwarm.AddPeers(new[] { nominerSwarm1.AsPeer }); await receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(15), progress); // Await 1 second to make sure all progresses is reported. @@ -500,7 +500,7 @@ public async Task PreloadRetryWithNextPeers(int blockCount) Assert.Equal(swarm0.BlockChain.BlockHashes, swarm1.BlockChain.BlockHashes); - await receiverSwarm.AddPeersAsync(new[] { swarm0.AsPeer, swarm1.AsPeer }, null); + receiverSwarm.AddPeers(new[] { swarm0.AsPeer, swarm1.AsPeer }); Assert.Equal( new[] { swarm0.AsPeer, swarm1.AsPeer }.ToImmutableHashSet(), receiverSwarm.Peers.ToImmutableHashSet()); @@ -557,7 +557,7 @@ public async Task PreloadAsyncCancellation(int cancelAfter) minerSwarm.FindNextHashesChunkSize = 2; await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); CancellationTokenSource cts = new CancellationTokenSource(); cts.CancelAfter(cancelAfter); @@ -682,7 +682,7 @@ public async Task PreloadAfterReorg() try { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); await receiverSwarm.PreloadAsync(); } finally @@ -765,7 +765,7 @@ public async Task PreloadDeleteOnlyTempChain() try { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); await receiverSwarm.PreloadAsync(); } finally @@ -803,9 +803,7 @@ public async Task PreloadFromTheMostDifficultChain() { await StartAsync(minerSwarm1); await StartAsync(minerSwarm2); - await receiverSwarm.AddPeersAsync( - new[] { minerSwarm1.AsPeer, minerSwarm2.AsPeer }, - null); + receiverSwarm.AddPeers(new[] { minerSwarm1.AsPeer, minerSwarm2.AsPeer }); await receiverSwarm.PreloadAsync(); } finally @@ -876,8 +874,7 @@ BlockChain MakeBlockChain(Block genesisBlock) => await StartAsync(validSeedSwarm); await StartAsync(invalidSeedSwarm); - await receiverSwarm.AddPeersAsync( - new[] { validSeedSwarm.AsPeer, invalidSeedSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { validSeedSwarm.AsPeer, invalidSeedSwarm.AsPeer }); await receiverSwarm.PreloadAsync(); Assert.Equal(receiverChain.Tip, validSeedChain.Tip); diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index a81f2502f86..66efdd70841 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -105,9 +105,9 @@ public async Task HandleReconnection() await StartAsync(seed); await StartAsync(swarmA); await StartAsync(swarmB); - await seed.AddPeersAsync(new[] { swarmA.AsPeer }, null); + seed.AddPeers(new[] { swarmA.AsPeer }); await StopAsync(swarmA); - await seed.AddPeersAsync(new[] { swarmB.AsPeer }, null); + seed.AddPeers(new[] { swarmB.AsPeer }); Assert.Contains(swarmB.AsPeer, seed.Peers); } @@ -166,49 +166,6 @@ public async Task CanWaitForRunning() Assert.False(swarm.Running); } - [Fact(Timeout = Timeout)] - public async Task AddPeersWithoutStart() - { - Swarm a = CreateSwarm(); - Swarm b = CreateSwarm(); - - try - { - await StartAsync(b); - - await a.AddPeersAsync(new Peer[] { b.AsPeer }, null); - - Assert.Contains(b.AsPeer, a.Peers); - Assert.Empty(b.Peers); - } - finally - { - await StopAsync(b); - } - } - - [Fact(Timeout = Timeout)] - public async Task AddPeersAsync() - { - Swarm a = CreateSwarm(); - Swarm b = CreateSwarm(); - - try - { - await StartAsync(a); - await StartAsync(b); - - await a.AddPeersAsync(new Peer[] { b.AsPeer }, null); - - Assert.Contains(b.AsPeer, a.Peers); - } - finally - { - await StopAsync(a); - await StopAsync(b); - } - } - [Fact(Timeout = Timeout)] public async Task BootstrapException() { @@ -327,7 +284,7 @@ public async Task GetBlocks() await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeers(new[] { swarmB.AsPeer }); (long, HashDigest)[] inventories1 = ( await swarmB.GetBlockHashes( @@ -394,7 +351,7 @@ public async Task GetMultipleBlocksAtOnce() var peer = swarmA.AsPeer as BoundPeer; - await swarmB.AddPeersAsync(new[] { peer }, null); + swarmB.AddPeers(new[] { peer }); Tuple>[] hashes = await swarmB.GetBlockHashes( peer, @@ -471,7 +428,7 @@ public async Task GetTx() await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeers(new[] { swarmB.AsPeer }); List> txs = await swarmA.GetTxsAsync( @@ -573,15 +530,15 @@ public async Task ExchangeWithIceServer() await StartAsync(swarmB); // Public-Private - await seed.AddPeersAsync(new[] { swarmA.AsPeer }, null); + await seed.BootstrapAsync(new[] { swarmA.AsPeer }, null, null); Assert.Contains(swarmA.AsPeer, seed.Peers); // Private-Public - await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null); + await swarmB.BootstrapAsync(new[] { seed.AsPeer }, null, null); Assert.Contains(seed.AsPeer, swarmB.Peers); // Private-Private - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + await swarmA.BootstrapAsync(new[] { swarmB.AsPeer }, null, null); Assert.Contains(swarmB.AsPeer, swarmA.Peers); } finally @@ -656,7 +613,7 @@ async Task MineAndBroadcast(CancellationToken cancellationToken) await StartAsync(seed); await StartAsync(swarmA); - await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null); + swarmA.AddPeers(new[] { seed.AsPeer }); cts.Cancel(); await proxyTask; @@ -718,7 +675,7 @@ public async Task RemoveForkedChainWhenFillBlocksAsyncFail() { await StartAsync(swarm1); await StartAsync(swarm2); - await swarm2.AddPeersAsync(new[] { swarm1.AsPeer }, null); + swarm2.AddPeers(new[] { swarm1.AsPeer }); swarm2.BroadcastBlock(block3); await swarm1.FillBlocksAsyncFailed.WaitAsync(); @@ -1169,7 +1126,7 @@ public async Task DoNotDeleteCanonicalChainWhenBlockDownloadFailed() { await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeers(new[] { swarmB.AsPeer }); swarmA.BroadcastBlock(block); await swarmB.FillBlocksAsyncStarted.WaitAsync(); @@ -1238,8 +1195,8 @@ BlockChain MakeGenesisChain( await StartAsync(swarmB); await StartAsync(swarmC); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); - await swarmA.AddPeersAsync(new[] { swarmC.AsPeer }, null); + swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmA.AddPeers(new[] { swarmC.AsPeer }); var block = await swarmA.BlockChain.MineBlock(swarmA.Address); @@ -1285,9 +1242,9 @@ public async Task FindSpecificPeerAsync() await StartAsync(swarmC); await StartAsync(swarmD); - await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null); - await swarmC.AddPeersAsync(new Peer[] { swarmD.AsPeer }, null); + swarmA.AddPeers(new Peer[] { swarmB.AsPeer }); + swarmB.AddPeers(new Peer[] { swarmC.AsPeer }); + swarmC.AddPeers(new Peer[] { swarmD.AsPeer }); BoundPeer foundPeer = await swarmA.FindSpecificPeerAsync( swarmB.AsPeer.Address, @@ -1327,8 +1284,8 @@ public async Task FindSpecificPeerAsyncFail() await StartAsync(swarmB); await StartAsync(swarmC); - await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null); + swarmA.AddPeers(new Peer[] { swarmB.AsPeer }); + swarmB.AddPeers(new Peer[] { swarmC.AsPeer }); await StopAsync(swarmB); @@ -1369,9 +1326,9 @@ public async Task FindSpecificPeerAsyncDepthFail() await StartAsync(swarmC); await StartAsync(swarmD); - await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null); - await swarmC.AddPeersAsync(new Peer[] { swarmD.AsPeer }, null); + swarmA.AddPeers(new Peer[] { swarmB.AsPeer }); + swarmB.AddPeers(new Peer[] { swarmC.AsPeer }); + swarmC.AddPeers(new Peer[] { swarmD.AsPeer }); BoundPeer foundPeer = await swarmA.FindSpecificPeerAsync( swarmC.AsPeer.Address, @@ -1381,7 +1338,7 @@ public async Task FindSpecificPeerAsyncDepthFail() Assert.Equal(swarmC.AsPeer.Address, foundPeer.Address); swarmA.RoutingTable.Clear(); Assert.Empty(swarmA.Peers); - await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null); + swarmA.AddPeers(new Peer[] { swarmB.AsPeer }); foundPeer = await swarmA.FindSpecificPeerAsync( swarmD.AsPeer.Address, diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index baf4344c264..b5ca15f00f0 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -908,22 +908,27 @@ public async Task CheckAllPeersAsync( await kademliaProtocol.CheckAllPeersAsync(timeout, cancellationToken); } - internal async Task AddPeersAsync( - IEnumerable peers, - TimeSpan? timeout, - CancellationToken cancellationToken = default(CancellationToken)) + /// + /// This methods directly adds given peer to routing table. + /// It is not recommended to use this method outside of testing purpose. + /// Use + /// + /// instead to initiate network connection. + /// + /// List of s to add. + /// + /// Thrown when is null. + internal void AddPeers(IEnumerable peers) { - if (Transport is null) + if (RoutingTable is null) { - throw new ArgumentNullException(nameof(Transport)); + throw new ArgumentNullException(nameof(RoutingTable)); } - if (cancellationToken == default(CancellationToken)) + foreach (var peer in peers.Where(p => p is BoundPeer _).Select(p => p as BoundPeer)) { - cancellationToken = _cancellationToken; + RoutingTable.AddPeer(peer); } - - await PeerDiscovery.AddPeersAsync(peers, timeout, cancellationToken); } // FIXME: This would be better if it's merged with GetDemandBlockHashes From f1b71c64e42689188cc207f00281dcf89131a22a Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Wed, 24 Feb 2021 17:59:00 +0900 Subject: [PATCH 6/8] Reject peer with invalid address --- Libplanet.Tests/Net/Protocols/ProtocolTest.cs | 35 +++++++++++++++++++ Libplanet/Net/Protocols/KademliaProtocol.cs | 9 +++++ 2 files changed, 44 insertions(+) diff --git a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs index 7417d9514c6..770cb4d0c4e 100644 --- a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs +++ b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs @@ -462,6 +462,41 @@ public async Task DoNotBroadcastToSourcePeer() } } + [Fact(Timeout = Timeout)] + public async Task DoNotAddMaliciousPeerToTable() + { + var actualPrivateKey = new PrivateKey(); + var maliciousPrivateKey = new PrivateKey(); + const int port = 1250; + TestTransport transportA = CreateTestTransport(); + TestTransport transportB = + CreateTestTransport(privateKey: actualPrivateKey, listenPort: port); + + try + { + await StartTestTransportAsync(transportA); + await StartTestTransportAsync(transportB); + + var bytes = new byte[10]; + new Random().NextBytes(bytes); + var maliciousPeer = new BoundPeer( + maliciousPrivateKey.PublicKey, + (transportB.AsPeer as BoundPeer)?.EndPoint); + var kp = (KademliaProtocol)transportA.Protocol; + + // Remote peer of Pong does not match with the target peer. + await Assert.ThrowsAsync(async () => + await kp.PingAsync(maliciousPeer, null, default)); + + Assert.DoesNotContain(maliciousPeer, transportA.Peers); + } + finally + { + await transportA.StopAsync(TimeSpan.Zero); + await transportB.StopAsync(TimeSpan.Zero); + } + } + private TestTransport CreateTestTransport( PrivateKey privateKey = null, string host = null, diff --git a/Libplanet/Net/Protocols/KademliaProtocol.cs b/Libplanet/Net/Protocols/KademliaProtocol.cs index 3f3bdc27650..ca661d72e82 100644 --- a/Libplanet/Net/Protocols/KademliaProtocol.cs +++ b/Libplanet/Net/Protocols/KademliaProtocol.cs @@ -468,6 +468,15 @@ internal async Task PingAsync( throw new InvalidMessageException("Cannot receive pong from self", pong); } + if (!pong.Remote.Address.Equals(target.Address)) + { + // Verify sender + throw new InvalidMessageException( + "Remote peer of Pong does not match with the target peer. " + + $"(Expected: {target.Address}, Actual: {pong.Remote.Address})", + pong); + } + _table.AddPeer(target); } catch (TimeoutException) From 9911c4f968a79f95b4a554684a8c25df38aa8f13 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Thu, 25 Feb 2021 14:17:43 +0900 Subject: [PATCH 7/8] Move Swarm.AddPeers method to test extension --- Libplanet.Tests/Net/SwarmExtensions.cs | 31 ++++++++++++++++ Libplanet.Tests/Net/SwarmTest.Broadcast.cs | 33 ++++++++--------- Libplanet.Tests/Net/SwarmTest.Preload.cs | 22 ++++++------ Libplanet.Tests/Net/SwarmTest.cs | 41 +++++++++++----------- Libplanet/Net/Swarm.cs | 23 ------------ 5 files changed, 78 insertions(+), 72 deletions(-) create mode 100644 Libplanet.Tests/Net/SwarmExtensions.cs diff --git a/Libplanet.Tests/Net/SwarmExtensions.cs b/Libplanet.Tests/Net/SwarmExtensions.cs new file mode 100644 index 00000000000..6a55d641de5 --- /dev/null +++ b/Libplanet.Tests/Net/SwarmExtensions.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Libplanet.Action; +using Libplanet.Net; + +namespace Libplanet.Tests.Net +{ + internal static class SwarmExtensions + { + public static void AddPeer(this Swarm swarm, Peer peer) + where T : IAction, new() + { + if (swarm.RoutingTable is null) + { + throw new NullReferenceException(nameof(swarm.RoutingTable)); + } + + swarm.RoutingTable.AddPeer(peer as BoundPeer); + } + + public static void AddPeers(this Swarm swarm, IEnumerable peers) + where T : IAction, new() + { + foreach (var peer in peers.Where(p => p is BoundPeer _).Select(p => p as BoundPeer)) + { + swarm.AddPeer(peer); + } + } + } +} diff --git a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs index e16cd6e3e1a..5761daae115 100644 --- a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs +++ b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs @@ -107,7 +107,7 @@ public async Task BroadcastIgnoreFromDifferentGenesisHash() await StartAsync(receiverSwarm); await StartAsync(seedSwarm); - seedSwarm.AddPeers(new[] { receiverSwarm.AsPeer }); + seedSwarm.AddPeer(receiverSwarm.AsPeer); Block block = await seedChain.MineBlock(seedSwarm.Address); seedSwarm.BroadcastBlock(block); while (!((NetMQTransport)receiverSwarm.Transport).MessageHistory @@ -178,7 +178,7 @@ CancellationToken cancellationToken await StartAsync(a); await StartAsync(b); - a.AddPeers(new[] { b.AsPeer }); + a.AddPeer(b.AsPeer); var minerCanceller = new CancellationTokenSource(); Task miningA = CreateMiner(a, chainA, 5000, minerCanceller.Token); @@ -228,9 +228,9 @@ public async Task BroadcastTx() await StartAsync(swarmB); await StartAsync(swarmC); - swarmA.AddPeers(new[] { swarmB.AsPeer }); - swarmB.AddPeers(new[] { swarmC.AsPeer }); - swarmC.AddPeers(new[] { swarmA.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); + swarmC.AddPeer(swarmA.AsPeer); swarmA.BroadcastTxs(new[] { tx }); @@ -272,7 +272,7 @@ public async Task BroadcastTxWhileMining() await StartAsync(swarmA); await StartAsync(swarmC); - swarmA.AddPeers(new[] { swarmC.AsPeer }); + swarmA.AddPeer(swarmC.AsPeer); for (var i = 0; i < 100; i++) { @@ -333,7 +333,7 @@ public async Task BroadcastTxAsync() await StartAsync(swarmC); // Broadcast tx swarmA to swarmB - swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); await swarmB.TxReceived.WaitAsync(); Assert.Equal(tx, chainB.GetTransaction(tx.Id)); @@ -341,7 +341,7 @@ public async Task BroadcastTxAsync() await StopAsync(swarmA); // Re-Broadcast received tx swarmB to swarmC - swarmB.AddPeers(new[] { swarmC.AsPeer }); + swarmB.AddPeer(swarmC.AsPeer); await swarmC.TxReceived.WaitAsync(); Assert.Equal(tx, chainC.GetTransaction(tx.Id)); @@ -457,7 +457,7 @@ public async Task DoNotRebroadcastTxsWithLowerNonce() Assert.Equal(1, tx2.Nonce); await StartAsync(swarmA); await StartAsync(swarmB); - swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastTxs(new[] { tx1, tx2 }); await swarmB.TxReceived.WaitAsync(); Assert.Equal( @@ -482,8 +482,8 @@ public async Task DoNotRebroadcastTxsWithLowerNonce() Assert.Equal(2, tx4.Nonce); await StartAsync(swarmC); - swarmA.AddPeers(new[] { swarmB.AsPeer }); - swarmB.AddPeers(new[] { swarmC.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); swarmA.BroadcastTxs(new[] { tx3, tx4 }); await swarmC.TxReceived.WaitAsync(); @@ -539,12 +539,9 @@ public async Task CanBroadcastBlock() await StartAsync(swarmB); await StartAsync(swarmC); - swarmA.AddPeers(new[] { swarmB.AsPeer }); - swarmA.AddPeers(new[] { swarmC.AsPeer }); - swarmB.AddPeers(new[] { swarmC.AsPeer }); - swarmB.AddPeers(new[] { swarmA.AsPeer }); - swarmC.AddPeers(new[] { swarmA.AsPeer }); - swarmC.AddPeers(new[] { swarmB.AsPeer }); + swarmA.AddPeers(new[] { swarmB.AsPeer, swarmC.AsPeer }); + swarmB.AddPeers(new[] { swarmC.AsPeer, swarmA.AsPeer }); + swarmC.AddPeers(new[] { swarmA.AsPeer, swarmB.AsPeer }); swarmB.BroadcastBlock(chainB[-1]); @@ -721,7 +718,7 @@ public async Task IgnoreExistingBlocks() await StartAsync(swarmA); await StartAsync(swarmB); - swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastBlock(chainA[-1]); await swarmB.BlockAppended.WaitAsync(); diff --git a/Libplanet.Tests/Net/SwarmTest.Preload.cs b/Libplanet.Tests/Net/SwarmTest.Preload.cs index 0c97234ee4b..6ab9b5060dd 100644 --- a/Libplanet.Tests/Net/SwarmTest.Preload.cs +++ b/Libplanet.Tests/Net/SwarmTest.Preload.cs @@ -42,7 +42,7 @@ public async Task InitialBlockDownload() try { await StartAsync(minerSwarm); - receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(); @@ -85,7 +85,7 @@ public async Task InitialBlockDownloadStates() try { await StartAsync(minerSwarm); - receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(); var state = receiverChain.GetState(address1); @@ -143,7 +143,7 @@ public async Task Preload() { await StartAsync(minerSwarm); - receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); + receiverSwarm.AddPeer(minerSwarm.AsPeer); _logger.Verbose("Both chains before synchronization:"); _logger.CompareBothChains( @@ -259,7 +259,7 @@ public async Task BlockDownloadTimeout() { await StartAsync(minerSwarm); - receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); + receiverSwarm.AddPeer(minerSwarm.AsPeer); Task waitTask = receiverSwarm.BlockDownloadStarted.WaitAsync(); Task preloadTask = receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(15)); @@ -308,7 +308,7 @@ public async Task PreloadWithFailedActions() { await StartAsync(minerSwarm); - receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(1)); var action = new ThrowException { ThrowOnExecution = true }; @@ -393,11 +393,11 @@ public async Task PreloadFromNominer() nominerSwarm0.FindNextHashesChunkSize = 2; nominerSwarm1.FindNextHashesChunkSize = 2; - nominerSwarm0.AddPeers(new[] { minerSwarm.AsPeer }); + nominerSwarm0.AddPeer(minerSwarm.AsPeer); await nominerSwarm0.PreloadAsync(); - nominerSwarm1.AddPeers(new[] { nominerSwarm0.AsPeer }); + nominerSwarm1.AddPeer(nominerSwarm0.AsPeer); await nominerSwarm1.PreloadAsync(); - receiverSwarm.AddPeers(new[] { nominerSwarm1.AsPeer }); + receiverSwarm.AddPeer(nominerSwarm1.AsPeer); await receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(15), progress); // Await 1 second to make sure all progresses is reported. @@ -557,7 +557,7 @@ public async Task PreloadAsyncCancellation(int cancelAfter) minerSwarm.FindNextHashesChunkSize = 2; await StartAsync(minerSwarm); - receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); + receiverSwarm.AddPeer(minerSwarm.AsPeer); CancellationTokenSource cts = new CancellationTokenSource(); cts.CancelAfter(cancelAfter); @@ -682,7 +682,7 @@ public async Task PreloadAfterReorg() try { await StartAsync(minerSwarm); - receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(); } finally @@ -765,7 +765,7 @@ public async Task PreloadDeleteOnlyTempChain() try { await StartAsync(minerSwarm); - receiverSwarm.AddPeers(new[] { minerSwarm.AsPeer }); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(); } finally diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 66efdd70841..20a0cb37c75 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -105,9 +105,9 @@ public async Task HandleReconnection() await StartAsync(seed); await StartAsync(swarmA); await StartAsync(swarmB); - seed.AddPeers(new[] { swarmA.AsPeer }); + seed.AddPeer(swarmA.AsPeer); await StopAsync(swarmA); - seed.AddPeers(new[] { swarmB.AsPeer }); + seed.AddPeer(swarmB.AsPeer); Assert.Contains(swarmB.AsPeer, seed.Peers); } @@ -284,7 +284,7 @@ public async Task GetBlocks() await StartAsync(swarmA); await StartAsync(swarmB); - swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); (long, HashDigest)[] inventories1 = ( await swarmB.GetBlockHashes( @@ -351,7 +351,7 @@ public async Task GetMultipleBlocksAtOnce() var peer = swarmA.AsPeer as BoundPeer; - swarmB.AddPeers(new[] { peer }); + swarmB.AddPeer(peer); Tuple>[] hashes = await swarmB.GetBlockHashes( peer, @@ -428,7 +428,7 @@ public async Task GetTx() await StartAsync(swarmA); await StartAsync(swarmB); - swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); List> txs = await swarmA.GetTxsAsync( @@ -613,7 +613,7 @@ async Task MineAndBroadcast(CancellationToken cancellationToken) await StartAsync(seed); await StartAsync(swarmA); - swarmA.AddPeers(new[] { seed.AsPeer }); + swarmA.AddPeer(seed.AsPeer); cts.Cancel(); await proxyTask; @@ -675,7 +675,7 @@ public async Task RemoveForkedChainWhenFillBlocksAsyncFail() { await StartAsync(swarm1); await StartAsync(swarm2); - swarm2.AddPeers(new[] { swarm1.AsPeer }); + swarm2.AddPeer(swarm1.AsPeer); swarm2.BroadcastBlock(block3); await swarm1.FillBlocksAsyncFailed.WaitAsync(); @@ -734,7 +734,7 @@ public async Task RenderInFork() await StartAsync(miner1); await StartAsync(miner2); - await BootstrapAsync(miner2, miner1.AsPeer); + miner2.AddPeer(miner1.AsPeer); miner2.BroadcastBlock(latest); @@ -781,6 +781,7 @@ public async Task ForkByDifficulty() await StartAsync(miner2); await BootstrapAsync(miner2, miner1.AsPeer); + miner2.AddPeer(miner1.AsPeer); miner2.BroadcastBlock(block); await miner1.BlockReceived.WaitAsync(); @@ -1126,7 +1127,7 @@ public async Task DoNotDeleteCanonicalChainWhenBlockDownloadFailed() { await StartAsync(swarmA); await StartAsync(swarmB); - swarmA.AddPeers(new[] { swarmB.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastBlock(block); await swarmB.FillBlocksAsyncStarted.WaitAsync(); @@ -1195,8 +1196,8 @@ BlockChain MakeGenesisChain( await StartAsync(swarmB); await StartAsync(swarmC); - swarmA.AddPeers(new[] { swarmB.AsPeer }); - swarmA.AddPeers(new[] { swarmC.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); + swarmA.AddPeer(swarmC.AsPeer); var block = await swarmA.BlockChain.MineBlock(swarmA.Address); @@ -1242,9 +1243,9 @@ public async Task FindSpecificPeerAsync() await StartAsync(swarmC); await StartAsync(swarmD); - swarmA.AddPeers(new Peer[] { swarmB.AsPeer }); - swarmB.AddPeers(new Peer[] { swarmC.AsPeer }); - swarmC.AddPeers(new Peer[] { swarmD.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); + swarmC.AddPeer(swarmD.AsPeer); BoundPeer foundPeer = await swarmA.FindSpecificPeerAsync( swarmB.AsPeer.Address, @@ -1284,8 +1285,8 @@ public async Task FindSpecificPeerAsyncFail() await StartAsync(swarmB); await StartAsync(swarmC); - swarmA.AddPeers(new Peer[] { swarmB.AsPeer }); - swarmB.AddPeers(new Peer[] { swarmC.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); await StopAsync(swarmB); @@ -1326,9 +1327,9 @@ public async Task FindSpecificPeerAsyncDepthFail() await StartAsync(swarmC); await StartAsync(swarmD); - swarmA.AddPeers(new Peer[] { swarmB.AsPeer }); - swarmB.AddPeers(new Peer[] { swarmC.AsPeer }); - swarmC.AddPeers(new Peer[] { swarmD.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); + swarmC.AddPeer(swarmD.AsPeer); BoundPeer foundPeer = await swarmA.FindSpecificPeerAsync( swarmC.AsPeer.Address, @@ -1338,7 +1339,7 @@ public async Task FindSpecificPeerAsyncDepthFail() Assert.Equal(swarmC.AsPeer.Address, foundPeer.Address); swarmA.RoutingTable.Clear(); Assert.Empty(swarmA.Peers); - swarmA.AddPeers(new Peer[] { swarmB.AsPeer }); + swarmA.AddPeer(swarmB.AsPeer); foundPeer = await swarmA.FindSpecificPeerAsync( swarmD.AsPeer.Address, diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index b5ca15f00f0..b0126affea0 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -908,29 +908,6 @@ public async Task CheckAllPeersAsync( await kademliaProtocol.CheckAllPeersAsync(timeout, cancellationToken); } - /// - /// This methods directly adds given peer to routing table. - /// It is not recommended to use this method outside of testing purpose. - /// Use - /// - /// instead to initiate network connection. - /// - /// List of s to add. - /// - /// Thrown when is null. - internal void AddPeers(IEnumerable peers) - { - if (RoutingTable is null) - { - throw new ArgumentNullException(nameof(RoutingTable)); - } - - foreach (var peer in peers.Where(p => p is BoundPeer _).Select(p => p as BoundPeer)) - { - RoutingTable.AddPeer(peer); - } - } - // FIXME: This would be better if it's merged with GetDemandBlockHashes internal async IAsyncEnumerable>> GetBlockHashes( BoundPeer peer, From bda96b11a9f1eadc033298bc1953d1881d47d585 Mon Sep 17 00:00:00 2001 From: Chanhyuck Ko Date: Fri, 26 Feb 2021 16:31:20 +0900 Subject: [PATCH 8/8] Fix nondeterministic unit tests --- Libplanet.Tests/Net/Protocols/ProtocolTest.cs | 19 +++---- Libplanet.Tests/Net/SwarmTest.Broadcast.cs | 4 +- Libplanet.Tests/Net/SwarmTest.cs | 52 +++++++++++-------- Libplanet/Net/Protocols/KademliaProtocol.cs | 6 +++ 4 files changed, 48 insertions(+), 33 deletions(-) diff --git a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs index 770cb4d0c4e..a6d04f28829 100644 --- a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs +++ b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs @@ -212,9 +212,9 @@ public async Task RoutingTableFull() await StartTestTransportAsync(transportB); await StartTestTransportAsync(transportC); - await transportA.AddPeersAsync(new[] { transport.AsPeer }, null); - await transportB.AddPeersAsync(new[] { transport.AsPeer }, null); - await transportC.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportA.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportB.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportC.AsPeer }, null); Assert.Single(transportA.Peers); Assert.Contains(transportA.AsPeer, transport.Peers); @@ -240,10 +240,10 @@ public async Task ReplacementCache() await StartTestTransportAsync(transportB); await StartTestTransportAsync(transportC); - await transportA.AddPeersAsync(new[] { transport.AsPeer }, null); - await transportB.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportA.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportB.AsPeer }, null); await Task.Delay(100); - await transportC.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportC.AsPeer }, null); Assert.Single(transportA.Peers); Assert.Contains(transportA.AsPeer, transport.Peers); @@ -276,8 +276,8 @@ public async Task RemoveDeadReplacementCache() await StartTestTransportAsync(transportB); await StartTestTransportAsync(transportC); - await transportA.AddPeersAsync(new[] { transport.AsPeer }, null); - await transportB.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportA.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportB.AsPeer }, null); Assert.Single(transport.Peers); Assert.Contains(transportA.AsPeer, transport.Peers); @@ -286,7 +286,7 @@ public async Task RemoveDeadReplacementCache() await transportA.StopAsync(TimeSpan.Zero); await transportB.StopAsync(TimeSpan.Zero); - await transportC.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportC.AsPeer }, null); await transport.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken)); await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken)); @@ -323,6 +323,7 @@ public async Task BroadcastMessage(int count) } Log.Debug("Bootstrap completed."); + await Task.Delay(1000); var tasks = transports.Select(transport => transport.WaitForTestMessageWithData("foo")); diff --git a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs index 5761daae115..aab1c0b45c6 100644 --- a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs +++ b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs @@ -627,7 +627,7 @@ public async Task BroadcastBlockWithSkip() await StartAsync(minerSwarm); await StartAsync(receiverSwarm); - await BootstrapAsync(receiverSwarm, minerSwarm.AsPeer); + minerSwarm.AddPeer(receiverSwarm.AsPeer); var block1 = TestUtils.MineNext( blockChain.Genesis, @@ -673,7 +673,7 @@ public async Task BroadcastBlockWithoutGenesis() await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmB, swarmA.AsPeer); + swarmA.AddPeer(swarmB.AsPeer); await chainA.MineBlock(swarmA.Address); swarmA.BroadcastBlock(chainA[-1]); diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 20a0cb37c75..3fd544d86ff 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -223,6 +223,7 @@ public async Task AutoConnectAfterStart() await StartAsync(swarmB); await BootstrapAsync(swarmA, swarmB.AsPeer); + await Task.Delay(1000); Assert.Contains(swarmB.AsPeer, swarmA.Peers); Assert.Empty(swarmB.Peers); @@ -780,7 +781,6 @@ public async Task ForkByDifficulty() await StartAsync(miner1); await StartAsync(miner2); - await BootstrapAsync(miner2, miner1.AsPeer); miner2.AddPeer(miner1.AsPeer); miner2.BroadcastBlock(block); @@ -831,8 +831,8 @@ Swarm MakeSwarm() => CreateSwarm(TestUtils.MakeBlockChain( await StartAsync(miner1); await StartAsync(miner2); - await BootstrapAsync(miner2, miner1.AsPeer); - await BootstrapAsync(receiver, miner1.AsPeer); + miner1.AddPeers(new[] { receiver.AsPeer, miner2.AsPeer }); + miner2.AddPeers(new[] { receiver.AsPeer, miner1.AsPeer }); var t = receiver.PreloadAsync(); await miner1.BlockChain.MineBlock(miner1.Address); @@ -896,9 +896,9 @@ public async void RestageTransactionsOnceLocallyMinedAfterReorg(bool restage) await StartAsync(minerA); await StartAsync(minerB); - await BootstrapAsync(minerA, minerB.AsPeer); + minerB.AddPeer(minerA.AsPeer); - Log.Debug("Reorg occurrs."); + Log.Debug("Reorg occurs."); minerB.BroadcastBlock(blockC); await minerA.BlockAppended.WaitAsync(); @@ -962,7 +962,7 @@ bool IsSignerValid(Transaction tx, BlockChain chain) await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmA, swarmB.AsPeer); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastTxs(new[] { validTx, invalidTx }); await swarmB.TxReceived.WaitAsync(); @@ -1025,7 +1025,7 @@ bool IsSignerValid(Transaction tx, BlockChain chain) await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmA, swarmB.AsPeer); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastTxs(new[] { tx }); await swarmB.TxReceived.WaitAsync(); @@ -1072,8 +1072,8 @@ public async Task CreateNewChainWhenBranchPointNotExist() await StartAsync(minerSwarmB); await StartAsync(receiverSwarm); - await BootstrapAsync(minerSwarmA, receiverSwarm.AsPeer); - await BootstrapAsync(minerSwarmB, receiverSwarm.AsPeer); + minerSwarmA.AddPeer(receiverSwarm.AsPeer); + minerSwarmB.AddPeer(receiverSwarm.AsPeer); // Broadcast SwarmA's first block. var b1 = await minerChainA.MineBlock(minerSwarmA.Address); @@ -1382,7 +1382,7 @@ public async Task DoNotFillWhenGetAllBlockAtFirstTimeFromSender() try { - await BootstrapAsync(sender, receiver.AsPeer); + sender.AddPeer(receiver.AsPeer); sender.BroadcastBlock(sender.BlockChain.Tip); @@ -1423,7 +1423,7 @@ public async Task FillWhenGetAllBlocksFromSender() try { - await BootstrapAsync(sender, receiver.AsPeer); + sender.AddPeer(receiver.AsPeer); sender.BroadcastBlock(sender.BlockChain.Tip); @@ -1458,8 +1458,8 @@ public async Task DoNotFillMultipleTimes() try { - await BootstrapAsync(sender1, receiver.AsPeer); - await BootstrapAsync(sender2, receiver.AsPeer); + sender1.AddPeer(receiver.AsPeer); + sender2.AddPeer(receiver.AsPeer); sender1.BlockChain.Append(b1); sender2.BlockChain.Append(b1); @@ -1523,7 +1523,7 @@ public async Task GetPeerChainStateAsync() await StartAsync(swarm2); await StartAsync(swarm3); - await BootstrapAsync(swarm1, swarm2.AsPeer); + swarm1.AddPeer(swarm2.AsPeer); peerChainState = await swarm1.GetPeerChainStateAsync( TimeSpan.FromSeconds(1), default); @@ -1540,7 +1540,7 @@ public async Task GetPeerChainStateAsync() peerChainState.First() ); - await BootstrapAsync(swarm1, swarm3.AsPeer); + swarm1.AddPeer(swarm3.AsPeer); peerChainState = await swarm1.GetPeerChainStateAsync( TimeSpan.FromSeconds(1), default); Assert.Equal( @@ -1573,6 +1573,7 @@ public async Task LastMessageTimestamp() Assert.Null(swarm1.LastMessageTimestamp); DateTimeOffset bootstrappedAt = DateTimeOffset.UtcNow; await BootstrapAsync(swarm2, swarm1.AsPeer); + await Task.Delay(1000); await StartAsync(swarm2); Assert.NotNull(swarm1.LastMessageTimestamp); @@ -1600,25 +1601,32 @@ public async Task Restart() { // Setup await StartAsync(swarm1); - await BootstrapAsync(swarm2, swarm1.AsPeer); await StartAsync(swarm2); - + await BootstrapAsync(swarm2, swarm1.AsPeer); await Task.Delay(1000); + Assert.True( + swarm1.Peers.Contains(swarm2.AsPeer), + "Swarm2 does not exist in swarm1's routing table after setup."); + // Restart await swarm1.StopAsync(); Assert.False(swarm1.Running); - await Task.Delay(1000); await StartAsync(swarm1); DateTimeOffset restartedAt = DateTimeOffset.UtcNow; // Check await swarm1.CheckAllPeersAsync(); await swarm2.CheckAllPeersAsync(); - await Task.Delay(1000); - Assert.Contains(swarm1.AsPeer, swarm2.Peers); - Assert.Contains(swarm2.AsPeer, swarm1.Peers); + Assert.True( + swarm1.Peers.Contains(swarm2.AsPeer), + "Swarm2 does not exist in swarm1's routing table."); + Assert.True( + swarm2.Peers.Contains(swarm1.AsPeer), + "Swarm1 does not exist in swarm2's routing table."); + Assert.NotNull(swarm1.LastMessageTimestamp); + Assert.NotNull(swarm2.LastMessageTimestamp); Assert.InRange( swarm1.LastMessageTimestamp.Value, restartedAt, @@ -1781,7 +1789,7 @@ public async Task ResetBlockDemandIfNotChanged() await StartAsync(sender); await StartAsync(receiver); - await BootstrapAsync(sender, receiver.AsPeer); + sender.AddPeer(receiver.AsPeer); sender.BroadcastBlock(lowerBlock); await receiver.FillBlocksAsyncStarted.WaitAsync(); diff --git a/Libplanet/Net/Protocols/KademliaProtocol.cs b/Libplanet/Net/Protocols/KademliaProtocol.cs index ca661d72e82..05d4f522686 100644 --- a/Libplanet/Net/Protocols/KademliaProtocol.cs +++ b/Libplanet/Net/Protocols/KademliaProtocol.cs @@ -551,6 +551,12 @@ private async Task ValidateAsync( RemovePeer(peer); throw; } + catch (InvalidMessageException) + { + _logger.Verbose("Peer {Peer} is invalid, removing...", peer); + RemovePeer(peer); + throw; + } } ///