diff --git a/Libplanet.Benchmarks/SwarmBenchmark.cs b/Libplanet.Benchmarks/SwarmBenchmark.cs index ebb31b270cb..798bc1ef439 100644 --- a/Libplanet.Benchmarks/SwarmBenchmark.cs +++ b/Libplanet.Benchmarks/SwarmBenchmark.cs @@ -75,8 +75,7 @@ public void InitializeSwarms() for (int i = 0; i < SwarmNumber - 1; i++) { - _swarms[i].AddPeersAsync(new[] { _swarms[i + 1].AsPeer }, null) - .Wait(WaitTimeout); + _swarms[i].RoutingTable.AddPeer(_swarms[i + 1].AsPeer as BoundPeer); } _blockChains[0].Append(_blocks[1]); diff --git a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs index dedbf310661..a6d04f28829 100644 --- a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs +++ b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using System.Linq; +using System.Net; using System.Threading; using System.Threading.Tasks; using Libplanet.Crypto; @@ -15,7 +16,9 @@ namespace Libplanet.Tests.Net.Protocols public class ProtocolTest { private const int Timeout = 60 * 1000; - private readonly Dictionary _transports; + private readonly Dictionary _transports; + + private int _assignedPort; public ProtocolTest(ITestOutputHelper output) { @@ -28,7 +31,8 @@ public ProtocolTest(ITestOutputHelper output) .CreateLogger() .ForContext(); - _transports = new Dictionary(); + _transports = new Dictionary(); + _assignedPort = 1234; } [Fact] @@ -82,39 +86,9 @@ public async Task Ping() await transportA.MessageReceived.WaitAsync(); await Task.Delay(100); - Assert.Single(transportA.ReceivedMessages); - Assert.Single(transportB.ReceivedMessages); - Assert.Contains(transportA.AsPeer, transportB.Peers); - } - finally - { - await transportA.StopAsync(TimeSpan.Zero); - await transportB.StopAsync(TimeSpan.Zero); - } - } - - [Fact(Timeout = Timeout)] - public async Task PingTwice() - { - var transportA = CreateTestTransport(); - var transportB = CreateTestTransport(); - - try - { - await StartTestTransportAsync(transportA); - await StartTestTransportAsync(transportB); - - transportA.SendPing(transportB.AsPeer); - await transportA.MessageReceived.WaitAsync(); - await transportB.MessageReceived.WaitAsync(); - transportB.SendPing(transportA.AsPeer); - await transportA.MessageReceived.WaitAsync(); - await transportB.MessageReceived.WaitAsync(); - Assert.Equal(2, transportA.ReceivedMessages.Count); Assert.Equal(2, transportB.ReceivedMessages.Count); Assert.Contains(transportA.AsPeer, transportB.Peers); - Assert.Contains(transportB.AsPeer, transportA.Peers); } finally { @@ -181,7 +155,9 @@ public async Task BootstrapAsyncTest() await StartTestTransportAsync(transportC); await transportB.BootstrapAsync(new[] { transportA.AsPeer }); + await Task.Delay(100); await transportC.BootstrapAsync(new[] { transportA.AsPeer }); + await Task.Delay(100); Assert.Contains(transportB.AsPeer, transportC.Peers); Assert.Contains(transportC.AsPeer, transportB.Peers); @@ -236,9 +212,9 @@ public async Task RoutingTableFull() await StartTestTransportAsync(transportB); await StartTestTransportAsync(transportC); - await transportA.AddPeersAsync(new[] { transport.AsPeer }, null); - await transportB.AddPeersAsync(new[] { transport.AsPeer }, null); - await transportC.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportA.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportB.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportC.AsPeer }, null); Assert.Single(transportA.Peers); Assert.Contains(transportA.AsPeer, transport.Peers); @@ -264,10 +240,10 @@ public async Task ReplacementCache() await StartTestTransportAsync(transportB); await StartTestTransportAsync(transportC); - await transportA.AddPeersAsync(new[] { transport.AsPeer }, null); - await transportB.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportA.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportB.AsPeer }, null); await Task.Delay(100); - await transportC.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportC.AsPeer }, null); Assert.Single(transportA.Peers); Assert.Contains(transportA.AsPeer, transport.Peers); @@ -300,8 +276,8 @@ public async Task RemoveDeadReplacementCache() await StartTestTransportAsync(transportB); await StartTestTransportAsync(transportC); - await transportA.AddPeersAsync(new[] { transport.AsPeer }, null); - await transportB.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportA.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportB.AsPeer }, null); Assert.Single(transport.Peers); Assert.Contains(transportA.AsPeer, transport.Peers); @@ -310,7 +286,7 @@ public async Task RemoveDeadReplacementCache() await transportA.StopAsync(TimeSpan.Zero); await transportB.StopAsync(TimeSpan.Zero); - await transportC.AddPeersAsync(new[] { transport.AsPeer }, null); + await transport.AddPeersAsync(new[] { transportC.AsPeer }, null); await transport.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken)); await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken)); @@ -347,6 +323,7 @@ public async Task BroadcastMessage(int count) } Log.Debug("Bootstrap completed."); + await Task.Delay(1000); var tasks = transports.Select(transport => transport.WaitForTestMessageWithData("foo")); @@ -390,7 +367,7 @@ public async Task BroadcastGuarantee() }); var seed = CreateTestTransport(privateKey0); - var t1 = CreateTestTransport(privateKey1, true); + var t1 = CreateTestTransport(privateKey1, blockBroadcast: true); var t2 = CreateTestTransport(privateKey2); await StartTestTransportAsync(seed); await StartTestTransportAsync(t1); @@ -486,8 +463,45 @@ public async Task DoNotBroadcastToSourcePeer() } } + [Fact(Timeout = Timeout)] + public async Task DoNotAddMaliciousPeerToTable() + { + var actualPrivateKey = new PrivateKey(); + var maliciousPrivateKey = new PrivateKey(); + const int port = 1250; + TestTransport transportA = CreateTestTransport(); + TestTransport transportB = + CreateTestTransport(privateKey: actualPrivateKey, listenPort: port); + + try + { + await StartTestTransportAsync(transportA); + await StartTestTransportAsync(transportB); + + var bytes = new byte[10]; + new Random().NextBytes(bytes); + var maliciousPeer = new BoundPeer( + maliciousPrivateKey.PublicKey, + (transportB.AsPeer as BoundPeer)?.EndPoint); + var kp = (KademliaProtocol)transportA.Protocol; + + // Remote peer of Pong does not match with the target peer. + await Assert.ThrowsAsync(async () => + await kp.PingAsync(maliciousPeer, null, default)); + + Assert.DoesNotContain(maliciousPeer, transportA.Peers); + } + finally + { + await transportA.StopAsync(TimeSpan.Zero); + await transportB.StopAsync(TimeSpan.Zero); + } + } + private TestTransport CreateTestTransport( PrivateKey privateKey = null, + string host = null, + int? listenPort = null, bool blockBroadcast = false, int tableSize = Kademlia.TableSize, int bucketSize = Kademlia.BucketSize, @@ -496,6 +510,8 @@ private TestTransport CreateTestTransport( return new TestTransport( _transports, privateKey ?? new PrivateKey(), + host ?? IPAddress.Loopback.ToString(), + listenPort ?? _assignedPort++, blockBroadcast, tableSize, bucketSize, diff --git a/Libplanet.Tests/Net/Protocols/TestTransport.cs b/Libplanet.Tests/Net/Protocols/TestTransport.cs index 32f216b2818..22492050550 100644 --- a/Libplanet.Tests/Net/Protocols/TestTransport.cs +++ b/Libplanet.Tests/Net/Protocols/TestTransport.cs @@ -21,13 +21,15 @@ internal class TestTransport : ITransport private static readonly AppProtocolVersion AppProtocolVersion = AppProtocolVersion.Sign(VersionSigner, 1); - private readonly Dictionary _transports; + private readonly Dictionary _transports; private readonly ILogger _logger; - private readonly ConcurrentDictionary _peersToReply; + private readonly ConcurrentDictionary _addressToReply; private readonly ConcurrentDictionary _replyToReceive; private readonly AsyncCollection _requests; private readonly List _ignoreTestMessageWithData; private readonly PrivateKey _privateKey; + private readonly string _host; + private readonly int _listenPort; private readonly Random _random; private readonly bool _blockBroadcast; @@ -35,25 +37,29 @@ internal class TestTransport : ITransport private TimeSpan _networkDelay; public TestTransport( - Dictionary transports, + Dictionary transports, PrivateKey privateKey, + string host, + int listenPort, bool blockBroadcast, int tableSize, int bucketSize, TimeSpan? networkDelay) { _privateKey = privateKey; + _host = host; + _listenPort = listenPort; _blockBroadcast = blockBroadcast; var loggerId = _privateKey.ToAddress().ToHex(); _logger = Log.ForContext() .ForContext("Address", loggerId); - _peersToReply = new ConcurrentDictionary(); + _addressToReply = new ConcurrentDictionary(); _replyToReceive = new ConcurrentDictionary(); ReceivedMessages = new ConcurrentBag(); MessageReceived = new AsyncAutoResetEvent(); _transports = transports; - _transports[privateKey.ToAddress()] = this; + _transports[ToAddress(AsPeer as BoundPeer)] = this; _networkDelay = networkDelay ?? TimeSpan.Zero; _requests = new AsyncCollection(); _ignoreTestMessageWithData = new List(); @@ -70,7 +76,7 @@ public TestTransport( public Peer AsPeer => new BoundPeer( _privateKey.PublicKey, - new DnsEndPoint("localhost", 1234)); + new DnsEndPoint(_host, _listenPort)); public IEnumerable Peers => Table.Peers; @@ -390,8 +396,8 @@ public void ReplyMessage(Message message) Task.Run(async () => { await Task.Delay(_networkDelay); - _transports[_peersToReply[message.Identity]].ReceiveReply(message); - _peersToReply.TryRemove(message.Identity, out Address addr); + _transports[_addressToReply[message.Identity]].ReceiveReply(message); + _addressToReply.TryRemove(message.Identity, out string addr); }); } @@ -420,6 +426,11 @@ public bool ReceivedTestMessageOfData(string data) return ReceivedMessages.OfType().Any(msg => msg.Data == data); } + private string ToAddress(BoundPeer peer) + { + return $"{peer.EndPoint.Host}:{peer.EndPoint.Port}"; + } + private void ReceiveMessage(Message message) { if (_swarmCancellationTokenSource.IsCancellationRequested) @@ -451,7 +462,7 @@ private void ReceiveMessage(Message message) } else { - _peersToReply[message.Identity] = boundPeer.Address; + _addressToReply[message.Identity] = ToAddress(boundPeer); } if (message is Ping) @@ -487,7 +498,7 @@ private async Task ProcessRuntime(CancellationToken cancellationToken) req.Message, req.Message.Identity, req.Target); - _transports[req.Target.Address].ReceiveMessage(req.Message); + _transports[ToAddress(req.Target)].ReceiveMessage(req.Message); } else { diff --git a/Libplanet.Tests/Net/SwarmExtensions.cs b/Libplanet.Tests/Net/SwarmExtensions.cs new file mode 100644 index 00000000000..6a55d641de5 --- /dev/null +++ b/Libplanet.Tests/Net/SwarmExtensions.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Libplanet.Action; +using Libplanet.Net; + +namespace Libplanet.Tests.Net +{ + internal static class SwarmExtensions + { + public static void AddPeer(this Swarm swarm, Peer peer) + where T : IAction, new() + { + if (swarm.RoutingTable is null) + { + throw new NullReferenceException(nameof(swarm.RoutingTable)); + } + + swarm.RoutingTable.AddPeer(peer as BoundPeer); + } + + public static void AddPeers(this Swarm swarm, IEnumerable peers) + where T : IAction, new() + { + foreach (var peer in peers.Where(p => p is BoundPeer _).Select(p => p as BoundPeer)) + { + swarm.AddPeer(peer); + } + } + } +} diff --git a/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs b/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs index 5e8c9807145..8b354ece7b3 100644 --- a/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs +++ b/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs @@ -32,11 +32,12 @@ public async Task DetectAppProtocolVersion() var peers = new[] { c.AsPeer, d.AsPeer }; - foreach (var peer in peers) - { - await a.AddPeersAsync(new[] { peer }, null); - await b.AddPeersAsync(new[] { peer }, null); - } + await a.BootstrapAsync(new[] { c.AsPeer }, null, null); + await Assert.ThrowsAsync( + async () => await a.BootstrapAsync(new[] { d.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await b.BootstrapAsync(new[] { c.AsPeer }, null, null)); + await b.BootstrapAsync(new[] { d.AsPeer }, null, null); Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray()); Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray()); @@ -148,13 +149,20 @@ AppProtocolVersion localVersion await StartAsync(e); await StartAsync(f); - var peers = new[] { c.AsPeer, d.AsPeer, e.AsPeer, f.AsPeer }; - - foreach (var peer in peers) - { - await a.AddPeersAsync(new[] { peer }, null); - await b.AddPeersAsync(new[] { peer }, null); - } + await a.BootstrapAsync(new[] { c.AsPeer }, null, null); + await Assert.ThrowsAsync( + async () => await a.BootstrapAsync(new[] { d.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await a.BootstrapAsync(new[] { e.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await a.BootstrapAsync(new[] { f.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await b.BootstrapAsync(new[] { c.AsPeer }, null, null)); + await b.BootstrapAsync(new[] { d.AsPeer }, null, null); + await Assert.ThrowsAsync( + async () => await b.BootstrapAsync(new[] { e.AsPeer }, null, null)); + await Assert.ThrowsAsync( + async () => await b.BootstrapAsync(new[] { f.AsPeer }, null, null)); Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray()); Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray()); diff --git a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs index c40a9b824a1..aab1c0b45c6 100644 --- a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs +++ b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs @@ -47,7 +47,7 @@ public async Task BroadcastBlockToReconnectedPeer() Assert.Equal(swarmA.AsPeer, swarmB.AsPeer); - await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null); + await swarmA.BootstrapAsync(new[] { seed.AsPeer }, null, null); await StopAsync(swarmA); await seed.PeerDiscovery.RefreshTableAsync( TimeSpan.Zero, @@ -55,7 +55,7 @@ await seed.PeerDiscovery.RefreshTableAsync( Assert.DoesNotContain(swarmA.AsPeer, seed.Peers); - await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null); + await swarmB.BootstrapAsync(new[] { seed.AsPeer }, null, null); // This is added for context switching. await Task.Delay(100); @@ -107,7 +107,7 @@ public async Task BroadcastIgnoreFromDifferentGenesisHash() await StartAsync(receiverSwarm); await StartAsync(seedSwarm); - await receiverSwarm.AddPeersAsync(new[] { seedSwarm.AsPeer }, null); + seedSwarm.AddPeer(receiverSwarm.AsPeer); Block block = await seedChain.MineBlock(seedSwarm.Address); seedSwarm.BroadcastBlock(block); while (!((NetMQTransport)receiverSwarm.Transport).MessageHistory @@ -178,7 +178,7 @@ CancellationToken cancellationToken await StartAsync(a); await StartAsync(b); - await a.AddPeersAsync(new[] { b.AsPeer }, null); + a.AddPeer(b.AsPeer); var minerCanceller = new CancellationTokenSource(); Task miningA = CreateMiner(a, chainA, 5000, minerCanceller.Token); @@ -228,9 +228,9 @@ public async Task BroadcastTx() await StartAsync(swarmB); await StartAsync(swarmC); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new[] { swarmC.AsPeer }, null); - await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); + swarmC.AddPeer(swarmA.AsPeer); swarmA.BroadcastTxs(new[] { tx }); @@ -272,7 +272,7 @@ public async Task BroadcastTxWhileMining() await StartAsync(swarmA); await StartAsync(swarmC); - await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null); + swarmA.AddPeer(swarmC.AsPeer); for (var i = 0; i < 100; i++) { @@ -333,7 +333,7 @@ public async Task BroadcastTxAsync() await StartAsync(swarmC); // Broadcast tx swarmA to swarmB - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); await swarmB.TxReceived.WaitAsync(); Assert.Equal(tx, chainB.GetTransaction(tx.Id)); @@ -341,7 +341,7 @@ public async Task BroadcastTxAsync() await StopAsync(swarmA); // Re-Broadcast received tx swarmB to swarmC - await swarmB.AddPeersAsync(new[] { swarmC.AsPeer }, null); + swarmB.AddPeer(swarmC.AsPeer); await swarmC.TxReceived.WaitAsync(); Assert.Equal(tx, chainC.GetTransaction(tx.Id)); @@ -457,7 +457,7 @@ public async Task DoNotRebroadcastTxsWithLowerNonce() Assert.Equal(1, tx2.Nonce); await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastTxs(new[] { tx1, tx2 }); await swarmB.TxReceived.WaitAsync(); Assert.Equal( @@ -482,8 +482,8 @@ public async Task DoNotRebroadcastTxsWithLowerNonce() Assert.Equal(2, tx4.Nonce); await StartAsync(swarmC); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new[] { swarmC.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); swarmA.BroadcastTxs(new[] { tx3, tx4 }); await swarmC.TxReceived.WaitAsync(); @@ -539,8 +539,9 @@ public async Task CanBroadcastBlock() await StartAsync(swarmB); await StartAsync(swarmC); - await BootstrapAsync(swarmB, swarmA.AsPeer); - await BootstrapAsync(swarmC, swarmA.AsPeer); + swarmA.AddPeers(new[] { swarmB.AsPeer, swarmC.AsPeer }); + swarmB.AddPeers(new[] { swarmC.AsPeer, swarmA.AsPeer }); + swarmC.AddPeers(new[] { swarmA.AsPeer, swarmB.AsPeer }); swarmB.BroadcastBlock(chainB[-1]); @@ -626,7 +627,7 @@ public async Task BroadcastBlockWithSkip() await StartAsync(minerSwarm); await StartAsync(receiverSwarm); - await BootstrapAsync(receiverSwarm, minerSwarm.AsPeer); + minerSwarm.AddPeer(receiverSwarm.AsPeer); var block1 = TestUtils.MineNext( blockChain.Genesis, @@ -672,7 +673,7 @@ public async Task BroadcastBlockWithoutGenesis() await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmB, swarmA.AsPeer); + swarmA.AddPeer(swarmB.AsPeer); await chainA.MineBlock(swarmA.Address); swarmA.BroadcastBlock(chainA[-1]); @@ -717,7 +718,7 @@ public async Task IgnoreExistingBlocks() await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmB, swarmA.AsPeer); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastBlock(chainA[-1]); await swarmB.BlockAppended.WaitAsync(); diff --git a/Libplanet.Tests/Net/SwarmTest.Preload.cs b/Libplanet.Tests/Net/SwarmTest.Preload.cs index 24927888cf2..6ab9b5060dd 100644 --- a/Libplanet.Tests/Net/SwarmTest.Preload.cs +++ b/Libplanet.Tests/Net/SwarmTest.Preload.cs @@ -42,7 +42,7 @@ public async Task InitialBlockDownload() try { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(); @@ -85,7 +85,7 @@ public async Task InitialBlockDownloadStates() try { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(); var state = receiverChain.GetState(address1); @@ -143,7 +143,7 @@ public async Task Preload() { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeer(minerSwarm.AsPeer); _logger.Verbose("Both chains before synchronization:"); _logger.CompareBothChains( @@ -259,7 +259,7 @@ public async Task BlockDownloadTimeout() { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeer(minerSwarm.AsPeer); Task waitTask = receiverSwarm.BlockDownloadStarted.WaitAsync(); Task preloadTask = receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(15)); @@ -308,7 +308,7 @@ public async Task PreloadWithFailedActions() { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(1)); var action = new ThrowException { ThrowOnExecution = true }; @@ -393,11 +393,11 @@ public async Task PreloadFromNominer() nominerSwarm0.FindNextHashesChunkSize = 2; nominerSwarm1.FindNextHashesChunkSize = 2; - await nominerSwarm0.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + nominerSwarm0.AddPeer(minerSwarm.AsPeer); await nominerSwarm0.PreloadAsync(); - await nominerSwarm1.AddPeersAsync(new[] { nominerSwarm0.AsPeer }, null); + nominerSwarm1.AddPeer(nominerSwarm0.AsPeer); await nominerSwarm1.PreloadAsync(); - await receiverSwarm.AddPeersAsync(new[] { nominerSwarm1.AsPeer }, null); + receiverSwarm.AddPeer(nominerSwarm1.AsPeer); await receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(15), progress); // Await 1 second to make sure all progresses is reported. @@ -500,7 +500,7 @@ public async Task PreloadRetryWithNextPeers(int blockCount) Assert.Equal(swarm0.BlockChain.BlockHashes, swarm1.BlockChain.BlockHashes); - await receiverSwarm.AddPeersAsync(new[] { swarm0.AsPeer, swarm1.AsPeer }, null); + receiverSwarm.AddPeers(new[] { swarm0.AsPeer, swarm1.AsPeer }); Assert.Equal( new[] { swarm0.AsPeer, swarm1.AsPeer }.ToImmutableHashSet(), receiverSwarm.Peers.ToImmutableHashSet()); @@ -522,7 +522,7 @@ await receiverSwarm.PreloadAsync( Assert.Equal(swarm0.BlockChain.BlockHashes, receiverSwarm.BlockChain.BlockHashes); } - [Theory] + [Theory(Timeout = Timeout)] [InlineData(0)] [InlineData(50)] [InlineData(100)] @@ -557,7 +557,7 @@ public async Task PreloadAsyncCancellation(int cancelAfter) minerSwarm.FindNextHashesChunkSize = 2; await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeer(minerSwarm.AsPeer); CancellationTokenSource cts = new CancellationTokenSource(); cts.CancelAfter(cancelAfter); @@ -682,7 +682,7 @@ public async Task PreloadAfterReorg() try { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(); } finally @@ -765,7 +765,7 @@ public async Task PreloadDeleteOnlyTempChain() try { await StartAsync(minerSwarm); - await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null); + receiverSwarm.AddPeer(minerSwarm.AsPeer); await receiverSwarm.PreloadAsync(); } finally @@ -803,9 +803,7 @@ public async Task PreloadFromTheMostDifficultChain() { await StartAsync(minerSwarm1); await StartAsync(minerSwarm2); - await receiverSwarm.AddPeersAsync( - new[] { minerSwarm1.AsPeer, minerSwarm2.AsPeer }, - null); + receiverSwarm.AddPeers(new[] { minerSwarm1.AsPeer, minerSwarm2.AsPeer }); await receiverSwarm.PreloadAsync(); } finally @@ -876,8 +874,7 @@ BlockChain MakeBlockChain(Block genesisBlock) => await StartAsync(validSeedSwarm); await StartAsync(invalidSeedSwarm); - await receiverSwarm.AddPeersAsync( - new[] { validSeedSwarm.AsPeer, invalidSeedSwarm.AsPeer }, null); + receiverSwarm.AddPeers(new[] { validSeedSwarm.AsPeer, invalidSeedSwarm.AsPeer }); await receiverSwarm.PreloadAsync(); Assert.Equal(receiverChain.Tip, validSeedChain.Tip); diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs index 018b1087ca3..3fd544d86ff 100644 --- a/Libplanet.Tests/Net/SwarmTest.cs +++ b/Libplanet.Tests/Net/SwarmTest.cs @@ -96,20 +96,20 @@ public async Task HandleReconnection() { Swarm seed = CreateSwarm(); - Swarm swarmA = CreateSwarm(); - Swarm swarmB = CreateSwarm(); + var privateKey = new PrivateKey(); + Swarm swarmA = CreateSwarm(privateKey); + Swarm swarmB = CreateSwarm(privateKey); try { await StartAsync(seed); await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null); + seed.AddPeer(swarmA.AsPeer); await StopAsync(swarmA); - await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null); + seed.AddPeer(swarmB.AsPeer); Assert.Contains(swarmB.AsPeer, seed.Peers); - Assert.Contains(seed.AsPeer, swarmB.Peers); } finally { @@ -166,51 +166,6 @@ public async Task CanWaitForRunning() Assert.False(swarm.Running); } - [Fact(Timeout = Timeout)] - public async Task AddPeersWithoutStart() - { - Swarm a = CreateSwarm(); - Swarm b = CreateSwarm(); - - try - { - await StartAsync(b); - - await a.AddPeersAsync(new Peer[] { b.AsPeer }, null); - - Assert.Contains(b.AsPeer, a.Peers); - Assert.Empty(b.Peers); - } - finally - { - await StopAsync(a); - await StopAsync(b); - } - } - - [Fact(Timeout = Timeout)] - public async Task AddPeersAsync() - { - Swarm a = CreateSwarm(); - Swarm b = CreateSwarm(); - - try - { - await StartAsync(a); - await StartAsync(b); - - await a.AddPeersAsync(new Peer[] { b.AsPeer }, null); - - Assert.Contains(a.AsPeer, b.Peers); - Assert.Contains(b.AsPeer, a.Peers); - } - finally - { - await StopAsync(a); - await StopAsync(b); - } - } - [Fact(Timeout = Timeout)] public async Task BootstrapException() { @@ -268,12 +223,13 @@ public async Task AutoConnectAfterStart() await StartAsync(swarmB); await BootstrapAsync(swarmA, swarmB.AsPeer); + await Task.Delay(1000); Assert.Contains(swarmB.AsPeer, swarmA.Peers); Assert.Empty(swarmB.Peers); await StartAsync(swarmA); - await Task.Delay(100); + await Task.Delay(1000); Assert.Contains(swarmA.AsPeer, swarmB.Peers); } finally @@ -329,7 +285,7 @@ public async Task GetBlocks() await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); (long, HashDigest)[] inventories1 = ( await swarmB.GetBlockHashes( @@ -396,7 +352,7 @@ public async Task GetMultipleBlocksAtOnce() var peer = swarmA.AsPeer as BoundPeer; - await swarmB.AddPeersAsync(new[] { peer }, null); + swarmB.AddPeer(peer); Tuple>[] hashes = await swarmB.GetBlockHashes( peer, @@ -473,7 +429,7 @@ public async Task GetTx() await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); List> txs = await swarmA.GetTxsAsync( @@ -574,23 +530,17 @@ public async Task ExchangeWithIceServer() await StartAsync(swarmA); await StartAsync(swarmB); - await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null); - await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null); - await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null); + // Public-Private + await seed.BootstrapAsync(new[] { swarmA.AsPeer }, null, null); + Assert.Contains(swarmA.AsPeer, seed.Peers); - Assert.Equal( - new HashSet - { - swarmA.AsPeer as BoundPeer, - swarmB.AsPeer as BoundPeer, - }, - seed.Peers.ToHashSet()); - Assert.Equal( - new HashSet { seed.AsPeer as BoundPeer, swarmB.AsPeer as BoundPeer }, - swarmA.Peers.ToHashSet()); - Assert.Equal( - new HashSet { seed.AsPeer as BoundPeer, swarmA.AsPeer as BoundPeer }, - swarmB.Peers.ToHashSet()); + // Private-Public + await swarmB.BootstrapAsync(new[] { seed.AsPeer }, null, null); + Assert.Contains(seed.AsPeer, swarmB.Peers); + + // Private-Private + await swarmA.BootstrapAsync(new[] { swarmB.AsPeer }, null, null); + Assert.Contains(swarmB.AsPeer, swarmA.Peers); } finally { @@ -664,7 +614,7 @@ async Task MineAndBroadcast(CancellationToken cancellationToken) await StartAsync(seed); await StartAsync(swarmA); - await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null); + swarmA.AddPeer(seed.AsPeer); cts.Cancel(); await proxyTask; @@ -726,7 +676,7 @@ public async Task RemoveForkedChainWhenFillBlocksAsyncFail() { await StartAsync(swarm1); await StartAsync(swarm2); - await swarm1.AddPeersAsync(new[] { swarm2.AsPeer }, null); + swarm2.AddPeer(swarm1.AsPeer); swarm2.BroadcastBlock(block3); await swarm1.FillBlocksAsyncFailed.WaitAsync(); @@ -785,7 +735,7 @@ public async Task RenderInFork() await StartAsync(miner1); await StartAsync(miner2); - await BootstrapAsync(miner2, miner1.AsPeer); + miner2.AddPeer(miner1.AsPeer); miner2.BroadcastBlock(latest); @@ -831,7 +781,7 @@ public async Task ForkByDifficulty() await StartAsync(miner1); await StartAsync(miner2); - await BootstrapAsync(miner2, miner1.AsPeer); + miner2.AddPeer(miner1.AsPeer); miner2.BroadcastBlock(block); await miner1.BlockReceived.WaitAsync(); @@ -881,8 +831,8 @@ Swarm MakeSwarm() => CreateSwarm(TestUtils.MakeBlockChain( await StartAsync(miner1); await StartAsync(miner2); - await BootstrapAsync(miner2, miner1.AsPeer); - await BootstrapAsync(receiver, miner1.AsPeer); + miner1.AddPeers(new[] { receiver.AsPeer, miner2.AsPeer }); + miner2.AddPeers(new[] { receiver.AsPeer, miner1.AsPeer }); var t = receiver.PreloadAsync(); await miner1.BlockChain.MineBlock(miner1.Address); @@ -946,9 +896,9 @@ public async void RestageTransactionsOnceLocallyMinedAfterReorg(bool restage) await StartAsync(minerA); await StartAsync(minerB); - await BootstrapAsync(minerA, minerB.AsPeer); + minerB.AddPeer(minerA.AsPeer); - Log.Debug("Reorg occurrs."); + Log.Debug("Reorg occurs."); minerB.BroadcastBlock(blockC); await minerA.BlockAppended.WaitAsync(); @@ -1012,7 +962,7 @@ bool IsSignerValid(Transaction tx, BlockChain chain) await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmA, swarmB.AsPeer); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastTxs(new[] { validTx, invalidTx }); await swarmB.TxReceived.WaitAsync(); @@ -1075,7 +1025,7 @@ bool IsSignerValid(Transaction tx, BlockChain chain) await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmA, swarmB.AsPeer); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastTxs(new[] { tx }); await swarmB.TxReceived.WaitAsync(); @@ -1122,8 +1072,8 @@ public async Task CreateNewChainWhenBranchPointNotExist() await StartAsync(minerSwarmB); await StartAsync(receiverSwarm); - await BootstrapAsync(minerSwarmA, receiverSwarm.AsPeer); - await BootstrapAsync(minerSwarmB, receiverSwarm.AsPeer); + minerSwarmA.AddPeer(receiverSwarm.AsPeer); + minerSwarmB.AddPeer(receiverSwarm.AsPeer); // Broadcast SwarmA's first block. var b1 = await minerChainA.MineBlock(minerSwarmA.Address); @@ -1177,7 +1127,7 @@ public async Task DoNotDeleteCanonicalChainWhenBlockDownloadFailed() { await StartAsync(swarmA); await StartAsync(swarmB); - await BootstrapAsync(swarmA, swarmB.AsPeer); + swarmA.AddPeer(swarmB.AsPeer); swarmA.BroadcastBlock(block); await swarmB.FillBlocksAsyncStarted.WaitAsync(); @@ -1246,8 +1196,8 @@ BlockChain MakeGenesisChain( await StartAsync(swarmB); await StartAsync(swarmC); - await swarmB.AddPeersAsync(new[] { swarmA.AsPeer }, null); - await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); + swarmA.AddPeer(swarmC.AsPeer); var block = await swarmA.BlockChain.MineBlock(swarmA.Address); @@ -1293,9 +1243,9 @@ public async Task FindSpecificPeerAsync() await StartAsync(swarmC); await StartAsync(swarmD); - await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null); - await swarmC.AddPeersAsync(new Peer[] { swarmD.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); + swarmC.AddPeer(swarmD.AsPeer); BoundPeer foundPeer = await swarmA.FindSpecificPeerAsync( swarmB.AsPeer.Address, @@ -1335,8 +1285,8 @@ public async Task FindSpecificPeerAsyncFail() await StartAsync(swarmB); await StartAsync(swarmC); - await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); await StopAsync(swarmB); @@ -1377,9 +1327,9 @@ public async Task FindSpecificPeerAsyncDepthFail() await StartAsync(swarmC); await StartAsync(swarmD); - await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null); - await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null); - await swarmC.AddPeersAsync(new Peer[] { swarmD.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); + swarmB.AddPeer(swarmC.AsPeer); + swarmC.AddPeer(swarmD.AsPeer); BoundPeer foundPeer = await swarmA.FindSpecificPeerAsync( swarmC.AsPeer.Address, @@ -1389,7 +1339,7 @@ public async Task FindSpecificPeerAsyncDepthFail() Assert.Equal(swarmC.AsPeer.Address, foundPeer.Address); swarmA.RoutingTable.Clear(); Assert.Empty(swarmA.Peers); - await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null); + swarmA.AddPeer(swarmB.AsPeer); foundPeer = await swarmA.FindSpecificPeerAsync( swarmD.AsPeer.Address, @@ -1432,7 +1382,7 @@ public async Task DoNotFillWhenGetAllBlockAtFirstTimeFromSender() try { - await BootstrapAsync(sender, receiver.AsPeer); + sender.AddPeer(receiver.AsPeer); sender.BroadcastBlock(sender.BlockChain.Tip); @@ -1473,7 +1423,7 @@ public async Task FillWhenGetAllBlocksFromSender() try { - await BootstrapAsync(sender, receiver.AsPeer); + sender.AddPeer(receiver.AsPeer); sender.BroadcastBlock(sender.BlockChain.Tip); @@ -1508,8 +1458,8 @@ public async Task DoNotFillMultipleTimes() try { - await BootstrapAsync(sender1, receiver.AsPeer); - await BootstrapAsync(sender2, receiver.AsPeer); + sender1.AddPeer(receiver.AsPeer); + sender2.AddPeer(receiver.AsPeer); sender1.BlockChain.Append(b1); sender2.BlockChain.Append(b1); @@ -1573,7 +1523,7 @@ public async Task GetPeerChainStateAsync() await StartAsync(swarm2); await StartAsync(swarm3); - await BootstrapAsync(swarm1, swarm2.AsPeer); + swarm1.AddPeer(swarm2.AsPeer); peerChainState = await swarm1.GetPeerChainStateAsync( TimeSpan.FromSeconds(1), default); @@ -1590,7 +1540,7 @@ public async Task GetPeerChainStateAsync() peerChainState.First() ); - await BootstrapAsync(swarm1, swarm3.AsPeer); + swarm1.AddPeer(swarm3.AsPeer); peerChainState = await swarm1.GetPeerChainStateAsync( TimeSpan.FromSeconds(1), default); Assert.Equal( @@ -1623,6 +1573,7 @@ public async Task LastMessageTimestamp() Assert.Null(swarm1.LastMessageTimestamp); DateTimeOffset bootstrappedAt = DateTimeOffset.UtcNow; await BootstrapAsync(swarm2, swarm1.AsPeer); + await Task.Delay(1000); await StartAsync(swarm2); Assert.NotNull(swarm1.LastMessageTimestamp); @@ -1650,15 +1601,17 @@ public async Task Restart() { // Setup await StartAsync(swarm1); - await BootstrapAsync(swarm2, swarm1.AsPeer); await StartAsync(swarm2); - + await BootstrapAsync(swarm2, swarm1.AsPeer); await Task.Delay(1000); + Assert.True( + swarm1.Peers.Contains(swarm2.AsPeer), + "Swarm2 does not exist in swarm1's routing table after setup."); + // Restart await swarm1.StopAsync(); Assert.False(swarm1.Running); - await Task.Delay(1000); await StartAsync(swarm1); DateTimeOffset restartedAt = DateTimeOffset.UtcNow; @@ -1666,8 +1619,14 @@ public async Task Restart() await swarm1.CheckAllPeersAsync(); await swarm2.CheckAllPeersAsync(); - Assert.Contains(swarm1.AsPeer, swarm2.Peers); - Assert.Contains(swarm2.AsPeer, swarm1.Peers); + Assert.True( + swarm1.Peers.Contains(swarm2.AsPeer), + "Swarm2 does not exist in swarm1's routing table."); + Assert.True( + swarm2.Peers.Contains(swarm1.AsPeer), + "Swarm1 does not exist in swarm2's routing table."); + Assert.NotNull(swarm1.LastMessageTimestamp); + Assert.NotNull(swarm2.LastMessageTimestamp); Assert.InRange( swarm1.LastMessageTimestamp.Value, restartedAt, @@ -1830,7 +1789,7 @@ public async Task ResetBlockDemandIfNotChanged() await StartAsync(sender); await StartAsync(receiver); - await BootstrapAsync(sender, receiver.AsPeer); + sender.AddPeer(receiver.AsPeer); sender.BroadcastBlock(lowerBlock); await receiver.FillBlocksAsyncStarted.WaitAsync(); diff --git a/Libplanet/Net/Protocols/KBucket.cs b/Libplanet/Net/Protocols/KBucket.cs index ae55561b4e1..460eb0ea831 100644 --- a/Libplanet/Net/Protocols/KBucket.cs +++ b/Libplanet/Net/Protocols/KBucket.cs @@ -75,54 +75,63 @@ public void AddPeer(BoundPeer peer) var updated = DateTimeOffset.UtcNow; - if (_peerStates.ContainsKey(peer.Address)) + if (Update(peer)) { _logger.Verbose("Bucket already contains peer {Peer}", peer); + return; + } - // This done because peer's other attribute except public key might be changed. - // (eg. public IP address, endpoint) - PeerState state = _peerStates[peer.Address]; - state.Peer = peer; - state.LastUpdated = updated; + if (IsFull()) + { + _logger.Verbose("Bucket is full to add peer {Peer}", peer); + if (ReplacementCache.TryAdd(peer, updated)) + { + _logger.Verbose( + "Added {Peer} to replacement cache. (total: {Count})", + peer, + ReplacementCache.Count); + } } else { - if (IsFull()) + _logger.Verbose("Bucket does not contains peer {Peer}", peer); + _lastUpdated = updated; + if (_peerStates.TryAdd(peer.Address, new PeerState(peer, updated))) { - _logger.Verbose("Bucket is full to add peer {Peer}", peer); - if (ReplacementCache.TryAdd(peer, updated)) + _logger.Verbose("Peer {Peer} is added to bucket", peer); + _lastUpdated = updated; + + if (ReplacementCache.TryRemove(peer, out var dateTimeOffset)) { _logger.Verbose( - "Added {Peer} to replacement cache. (total: {Count})", + "Removed peer {Peer} from replacement cache. (total: {Count})", peer, ReplacementCache.Count); } } else { - _logger.Verbose("Bucket does not contains peer {Peer}", peer); - _lastUpdated = updated; - if (_peerStates.TryAdd(peer.Address, new PeerState(peer, updated))) - { - _logger.Verbose("Peer {Peer} is added to bucket", peer); - _lastUpdated = updated; - - if (ReplacementCache.TryRemove(peer, out var dateTimeOffset)) - { - _logger.Verbose( - "Removed peer {Peer} from replacement cache. (total: {Count})", - peer, - ReplacementCache.Count); - } - } - else - { - _logger.Verbose("Failed to add peer {Peer} to bucket", peer); - } + _logger.Verbose("Failed to add peer {Peer} to bucket", peer); } } } + public bool Update(BoundPeer peer) + { + if (!Contains(peer)) + { + _logger.Verbose("Bucket does not contains {Peer}. Failed to update.", peer); + return false; + } + + // This done because peer's other attribute except public key might be changed. + // (eg. public IP address, endpoint) + PeerState state = _peerStates[peer.Address]; + state.Peer = peer; + state.LastUpdated = DateTimeOffset.UtcNow; + return true; + } + public bool Contains(BoundPeer peer) { return _peerStates.ContainsKey(peer.Address); diff --git a/Libplanet/Net/Protocols/KademliaProtocol.cs b/Libplanet/Net/Protocols/KademliaProtocol.cs index e260eeaf8fd..05d4f522686 100644 --- a/Libplanet/Net/Protocols/KademliaProtocol.cs +++ b/Libplanet/Net/Protocols/KademliaProtocol.cs @@ -26,6 +26,9 @@ public class KademliaProtocol : IProtocol private readonly ILogger _logger; + private readonly ConcurrentDictionary _cache; + private readonly int _cacheSize; + /// /// Creates a instance. /// @@ -57,6 +60,8 @@ public KademliaProtocol( requestTimeout ?? TimeSpan.FromMilliseconds(5000); _transport.ProcessMessageHandler += ProcessMessageHandler; + _cache = new ConcurrentDictionary(); + _cacheSize = int.MaxValue; } /// @@ -180,7 +185,6 @@ public async Task AddPeersAsync( { _logger.Debug( $"Timeout occurred during {nameof(AddPeersAsync)}() after {timeout}."); - throw; } catch (TaskCanceledException) { @@ -292,17 +296,14 @@ public async Task CheckReplacementCacheAsync(CancellationToken cancellationToken try { _logger.Verbose("Check peer {Peer}.", replacement); - - await PingAsync(replacement, _requestTimeout, cancellationToken); _table.RemoveCache(replacement); - Update(replacement); + await PingAsync(replacement, _requestTimeout, cancellationToken); } catch (PingTimeoutException) { _logger.Verbose( "Remove stale peer {Peer} from replacement cache.", replacement); - _table.RemoveCache(replacement); } } } @@ -442,6 +443,14 @@ internal async Task PingAsync( try { _logger.Verbose("Trying to ping async to {Peer}.", target); + if (_cache.ContainsKey(target.Address) || _cache.Count > _cacheSize) + { + _logger.Verbose( + "Request already in cache or cache size limit exceeded. Ping cancelled."); + return; + } + + _cache[target.Address] = target; Message reply = await _transport.SendMessageWithReplyAsync( target, new Ping(), @@ -459,7 +468,16 @@ internal async Task PingAsync( throw new InvalidMessageException("Cannot receive pong from self", pong); } - Update(target); + if (!pong.Remote.Address.Equals(target.Address)) + { + // Verify sender + throw new InvalidMessageException( + "Remote peer of Pong does not match with the target peer. " + + $"(Expected: {target.Address}, Actual: {pong.Remote.Address})", + pong); + } + + _table.AddPeer(target); } catch (TimeoutException) { @@ -479,6 +497,10 @@ internal async Task PingAsync( $"Different AppProtocolVersion encountered at {nameof(PingAsync)}()."); throw; } + finally + { + _cache.TryRemove(target.Address, out _); + } } private void ProcessMessageHandler(object target, Message message) @@ -529,30 +551,42 @@ private async Task ValidateAsync( RemovePeer(peer); throw; } + catch (InvalidMessageException) + { + _logger.Verbose("Peer {Peer} is invalid, removing...", peer); + RemovePeer(peer); + throw; + } } /// /// Updates routing table when receiving a message. If corresponding bucket - /// for remote peer is not full, just adds given . + /// for remote peer is not full, just adds given . /// Otherwise, checks aliveness of the least recently used (LRU) peer - /// and determine evict LRU peer or discard given . + /// and determine evict LRU peer or discard given . /// - /// to update. - /// - /// Thrown when is null. + /// to update. + /// + /// Thrown when is null or it is not a + /// . /// - private void Update(Peer rawPeer) + private void Update(Peer peer) { - _logger.Verbose($"Try to {nameof(Update)}() {{Peer}}.", rawPeer); - if (rawPeer is null) + _logger.Verbose($"Try to {nameof(Update)}() {{Peer}}.", peer); + if (!(peer is BoundPeer boundPeer)) { - throw new ArgumentNullException(nameof(rawPeer)); + _logger.Verbose("Given peer {Peer} is not bounded.", peer); + return; } - if (rawPeer is BoundPeer peer) + // Don't update peer without endpoint or with different appProtocolVersion. + if (_table.Contains(boundPeer)) { - // Don't update peer without endpoint or with different appProtocolVersion. - _table.AddPeer(peer); + _table.UpdatePeer(boundPeer); + } + else + { + _ = PingAsync(boundPeer, _requestTimeout, default); } } @@ -673,10 +707,14 @@ private async Task> GetNeighbors( // Send pong back to remote private void ReceivePing(Ping ping) { + if (ping.Remote is null) + { + throw new ArgumentNullException(nameof(ping.Remote)); + } + if (ping.Remote.Address.Equals(_address)) { - throw new ArgumentException( - "Cannot receive ping from self"); + throw new ArgumentException("Cannot receive ping from self"); } var pong = new Pong diff --git a/Libplanet/Net/Protocols/RoutingTable.cs b/Libplanet/Net/Protocols/RoutingTable.cs index 038c695232a..7e8a8d989b7 100644 --- a/Libplanet/Net/Protocols/RoutingTable.cs +++ b/Libplanet/Net/Protocols/RoutingTable.cs @@ -132,6 +132,22 @@ public void AddPeer(BoundPeer peer) BucketOf(peer).AddPeer(peer); } + public void UpdatePeer(BoundPeer peer) + { + if (peer is null) + { + throw new ArgumentNullException(nameof(peer)); + } + + if (peer.Address.Equals(_address)) + { + throw new ArgumentException("Cannot add update self."); + } + + _logger.Debug("Updating peer {Peer} from routing table.", peer); + BucketOf(peer).Update(peer); + } + public bool RemovePeer(BoundPeer peer) { if (peer is null) diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs index baf4344c264..b0126affea0 100644 --- a/Libplanet/Net/Swarm.cs +++ b/Libplanet/Net/Swarm.cs @@ -908,24 +908,6 @@ public async Task CheckAllPeersAsync( await kademliaProtocol.CheckAllPeersAsync(timeout, cancellationToken); } - internal async Task AddPeersAsync( - IEnumerable peers, - TimeSpan? timeout, - CancellationToken cancellationToken = default(CancellationToken)) - { - if (Transport is null) - { - throw new ArgumentNullException(nameof(Transport)); - } - - if (cancellationToken == default(CancellationToken)) - { - cancellationToken = _cancellationToken; - } - - await PeerDiscovery.AddPeersAsync(peers, timeout, cancellationToken); - } - // FIXME: This would be better if it's merged with GetDemandBlockHashes internal async IAsyncEnumerable>> GetBlockHashes( BoundPeer peer,