diff --git a/Libplanet.Benchmarks/SwarmBenchmark.cs b/Libplanet.Benchmarks/SwarmBenchmark.cs
index ebb31b270cb..798bc1ef439 100644
--- a/Libplanet.Benchmarks/SwarmBenchmark.cs
+++ b/Libplanet.Benchmarks/SwarmBenchmark.cs
@@ -75,8 +75,7 @@ public void InitializeSwarms()
for (int i = 0; i < SwarmNumber - 1; i++)
{
- _swarms[i].AddPeersAsync(new[] { _swarms[i + 1].AsPeer }, null)
- .Wait(WaitTimeout);
+ _swarms[i].RoutingTable.AddPeer(_swarms[i + 1].AsPeer as BoundPeer);
}
_blockChains[0].Append(_blocks[1]);
diff --git a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs
index dedbf310661..a6d04f28829 100644
--- a/Libplanet.Tests/Net/Protocols/ProtocolTest.cs
+++ b/Libplanet.Tests/Net/Protocols/ProtocolTest.cs
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
+using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Libplanet.Crypto;
@@ -15,7 +16,9 @@ namespace Libplanet.Tests.Net.Protocols
public class ProtocolTest
{
private const int Timeout = 60 * 1000;
- private readonly Dictionary
_transports;
+ private readonly Dictionary _transports;
+
+ private int _assignedPort;
public ProtocolTest(ITestOutputHelper output)
{
@@ -28,7 +31,8 @@ public ProtocolTest(ITestOutputHelper output)
.CreateLogger()
.ForContext();
- _transports = new Dictionary();
+ _transports = new Dictionary();
+ _assignedPort = 1234;
}
[Fact]
@@ -82,39 +86,9 @@ public async Task Ping()
await transportA.MessageReceived.WaitAsync();
await Task.Delay(100);
- Assert.Single(transportA.ReceivedMessages);
- Assert.Single(transportB.ReceivedMessages);
- Assert.Contains(transportA.AsPeer, transportB.Peers);
- }
- finally
- {
- await transportA.StopAsync(TimeSpan.Zero);
- await transportB.StopAsync(TimeSpan.Zero);
- }
- }
-
- [Fact(Timeout = Timeout)]
- public async Task PingTwice()
- {
- var transportA = CreateTestTransport();
- var transportB = CreateTestTransport();
-
- try
- {
- await StartTestTransportAsync(transportA);
- await StartTestTransportAsync(transportB);
-
- transportA.SendPing(transportB.AsPeer);
- await transportA.MessageReceived.WaitAsync();
- await transportB.MessageReceived.WaitAsync();
- transportB.SendPing(transportA.AsPeer);
- await transportA.MessageReceived.WaitAsync();
- await transportB.MessageReceived.WaitAsync();
-
Assert.Equal(2, transportA.ReceivedMessages.Count);
Assert.Equal(2, transportB.ReceivedMessages.Count);
Assert.Contains(transportA.AsPeer, transportB.Peers);
- Assert.Contains(transportB.AsPeer, transportA.Peers);
}
finally
{
@@ -181,7 +155,9 @@ public async Task BootstrapAsyncTest()
await StartTestTransportAsync(transportC);
await transportB.BootstrapAsync(new[] { transportA.AsPeer });
+ await Task.Delay(100);
await transportC.BootstrapAsync(new[] { transportA.AsPeer });
+ await Task.Delay(100);
Assert.Contains(transportB.AsPeer, transportC.Peers);
Assert.Contains(transportC.AsPeer, transportB.Peers);
@@ -236,9 +212,9 @@ public async Task RoutingTableFull()
await StartTestTransportAsync(transportB);
await StartTestTransportAsync(transportC);
- await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
- await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);
- await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);
+ await transport.AddPeersAsync(new[] { transportA.AsPeer }, null);
+ await transport.AddPeersAsync(new[] { transportB.AsPeer }, null);
+ await transport.AddPeersAsync(new[] { transportC.AsPeer }, null);
Assert.Single(transportA.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
@@ -264,10 +240,10 @@ public async Task ReplacementCache()
await StartTestTransportAsync(transportB);
await StartTestTransportAsync(transportC);
- await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
- await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);
+ await transport.AddPeersAsync(new[] { transportA.AsPeer }, null);
+ await transport.AddPeersAsync(new[] { transportB.AsPeer }, null);
await Task.Delay(100);
- await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);
+ await transport.AddPeersAsync(new[] { transportC.AsPeer }, null);
Assert.Single(transportA.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
@@ -300,8 +276,8 @@ public async Task RemoveDeadReplacementCache()
await StartTestTransportAsync(transportB);
await StartTestTransportAsync(transportC);
- await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
- await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);
+ await transport.AddPeersAsync(new[] { transportA.AsPeer }, null);
+ await transport.AddPeersAsync(new[] { transportB.AsPeer }, null);
Assert.Single(transport.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
@@ -310,7 +286,7 @@ public async Task RemoveDeadReplacementCache()
await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
- await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);
+ await transport.AddPeersAsync(new[] { transportC.AsPeer }, null);
await transport.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken));
@@ -347,6 +323,7 @@ public async Task BroadcastMessage(int count)
}
Log.Debug("Bootstrap completed.");
+ await Task.Delay(1000);
var tasks =
transports.Select(transport => transport.WaitForTestMessageWithData("foo"));
@@ -390,7 +367,7 @@ public async Task BroadcastGuarantee()
});
var seed = CreateTestTransport(privateKey0);
- var t1 = CreateTestTransport(privateKey1, true);
+ var t1 = CreateTestTransport(privateKey1, blockBroadcast: true);
var t2 = CreateTestTransport(privateKey2);
await StartTestTransportAsync(seed);
await StartTestTransportAsync(t1);
@@ -486,8 +463,45 @@ public async Task DoNotBroadcastToSourcePeer()
}
}
+ [Fact(Timeout = Timeout)]
+ public async Task DoNotAddMaliciousPeerToTable()
+ {
+ var actualPrivateKey = new PrivateKey();
+ var maliciousPrivateKey = new PrivateKey();
+ const int port = 1250;
+ TestTransport transportA = CreateTestTransport();
+ TestTransport transportB =
+ CreateTestTransport(privateKey: actualPrivateKey, listenPort: port);
+
+ try
+ {
+ await StartTestTransportAsync(transportA);
+ await StartTestTransportAsync(transportB);
+
+ var bytes = new byte[10];
+ new Random().NextBytes(bytes);
+ var maliciousPeer = new BoundPeer(
+ maliciousPrivateKey.PublicKey,
+ (transportB.AsPeer as BoundPeer)?.EndPoint);
+ var kp = (KademliaProtocol)transportA.Protocol;
+
+ // Remote peer of Pong does not match with the target peer.
+ await Assert.ThrowsAsync(async () =>
+ await kp.PingAsync(maliciousPeer, null, default));
+
+ Assert.DoesNotContain(maliciousPeer, transportA.Peers);
+ }
+ finally
+ {
+ await transportA.StopAsync(TimeSpan.Zero);
+ await transportB.StopAsync(TimeSpan.Zero);
+ }
+ }
+
private TestTransport CreateTestTransport(
PrivateKey privateKey = null,
+ string host = null,
+ int? listenPort = null,
bool blockBroadcast = false,
int tableSize = Kademlia.TableSize,
int bucketSize = Kademlia.BucketSize,
@@ -496,6 +510,8 @@ private TestTransport CreateTestTransport(
return new TestTransport(
_transports,
privateKey ?? new PrivateKey(),
+ host ?? IPAddress.Loopback.ToString(),
+ listenPort ?? _assignedPort++,
blockBroadcast,
tableSize,
bucketSize,
diff --git a/Libplanet.Tests/Net/Protocols/TestTransport.cs b/Libplanet.Tests/Net/Protocols/TestTransport.cs
index 32f216b2818..22492050550 100644
--- a/Libplanet.Tests/Net/Protocols/TestTransport.cs
+++ b/Libplanet.Tests/Net/Protocols/TestTransport.cs
@@ -21,13 +21,15 @@ internal class TestTransport : ITransport
private static readonly AppProtocolVersion AppProtocolVersion =
AppProtocolVersion.Sign(VersionSigner, 1);
- private readonly Dictionary _transports;
+ private readonly Dictionary _transports;
private readonly ILogger _logger;
- private readonly ConcurrentDictionary _peersToReply;
+ private readonly ConcurrentDictionary _addressToReply;
private readonly ConcurrentDictionary _replyToReceive;
private readonly AsyncCollection _requests;
private readonly List _ignoreTestMessageWithData;
private readonly PrivateKey _privateKey;
+ private readonly string _host;
+ private readonly int _listenPort;
private readonly Random _random;
private readonly bool _blockBroadcast;
@@ -35,25 +37,29 @@ internal class TestTransport : ITransport
private TimeSpan _networkDelay;
public TestTransport(
- Dictionary transports,
+ Dictionary transports,
PrivateKey privateKey,
+ string host,
+ int listenPort,
bool blockBroadcast,
int tableSize,
int bucketSize,
TimeSpan? networkDelay)
{
_privateKey = privateKey;
+ _host = host;
+ _listenPort = listenPort;
_blockBroadcast = blockBroadcast;
var loggerId = _privateKey.ToAddress().ToHex();
_logger = Log.ForContext()
.ForContext("Address", loggerId);
- _peersToReply = new ConcurrentDictionary();
+ _addressToReply = new ConcurrentDictionary();
_replyToReceive = new ConcurrentDictionary();
ReceivedMessages = new ConcurrentBag();
MessageReceived = new AsyncAutoResetEvent();
_transports = transports;
- _transports[privateKey.ToAddress()] = this;
+ _transports[ToAddress(AsPeer as BoundPeer)] = this;
_networkDelay = networkDelay ?? TimeSpan.Zero;
_requests = new AsyncCollection();
_ignoreTestMessageWithData = new List();
@@ -70,7 +76,7 @@ public TestTransport(
public Peer AsPeer => new BoundPeer(
_privateKey.PublicKey,
- new DnsEndPoint("localhost", 1234));
+ new DnsEndPoint(_host, _listenPort));
public IEnumerable Peers => Table.Peers;
@@ -390,8 +396,8 @@ public void ReplyMessage(Message message)
Task.Run(async () =>
{
await Task.Delay(_networkDelay);
- _transports[_peersToReply[message.Identity]].ReceiveReply(message);
- _peersToReply.TryRemove(message.Identity, out Address addr);
+ _transports[_addressToReply[message.Identity]].ReceiveReply(message);
+ _addressToReply.TryRemove(message.Identity, out string addr);
});
}
@@ -420,6 +426,11 @@ public bool ReceivedTestMessageOfData(string data)
return ReceivedMessages.OfType().Any(msg => msg.Data == data);
}
+ private string ToAddress(BoundPeer peer)
+ {
+ return $"{peer.EndPoint.Host}:{peer.EndPoint.Port}";
+ }
+
private void ReceiveMessage(Message message)
{
if (_swarmCancellationTokenSource.IsCancellationRequested)
@@ -451,7 +462,7 @@ private void ReceiveMessage(Message message)
}
else
{
- _peersToReply[message.Identity] = boundPeer.Address;
+ _addressToReply[message.Identity] = ToAddress(boundPeer);
}
if (message is Ping)
@@ -487,7 +498,7 @@ private async Task ProcessRuntime(CancellationToken cancellationToken)
req.Message,
req.Message.Identity,
req.Target);
- _transports[req.Target.Address].ReceiveMessage(req.Message);
+ _transports[ToAddress(req.Target)].ReceiveMessage(req.Message);
}
else
{
diff --git a/Libplanet.Tests/Net/SwarmExtensions.cs b/Libplanet.Tests/Net/SwarmExtensions.cs
new file mode 100644
index 00000000000..6a55d641de5
--- /dev/null
+++ b/Libplanet.Tests/Net/SwarmExtensions.cs
@@ -0,0 +1,31 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using Libplanet.Action;
+using Libplanet.Net;
+
+namespace Libplanet.Tests.Net
+{
+ internal static class SwarmExtensions
+ {
+ public static void AddPeer(this Swarm swarm, Peer peer)
+ where T : IAction, new()
+ {
+ if (swarm.RoutingTable is null)
+ {
+ throw new NullReferenceException(nameof(swarm.RoutingTable));
+ }
+
+ swarm.RoutingTable.AddPeer(peer as BoundPeer);
+ }
+
+ public static void AddPeers(this Swarm swarm, IEnumerable peers)
+ where T : IAction, new()
+ {
+ foreach (var peer in peers.Where(p => p is BoundPeer _).Select(p => p as BoundPeer))
+ {
+ swarm.AddPeer(peer);
+ }
+ }
+ }
+}
diff --git a/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs b/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs
index 5e8c9807145..8b354ece7b3 100644
--- a/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs
+++ b/Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs
@@ -32,11 +32,12 @@ public async Task DetectAppProtocolVersion()
var peers = new[] { c.AsPeer, d.AsPeer };
- foreach (var peer in peers)
- {
- await a.AddPeersAsync(new[] { peer }, null);
- await b.AddPeersAsync(new[] { peer }, null);
- }
+ await a.BootstrapAsync(new[] { c.AsPeer }, null, null);
+ await Assert.ThrowsAsync(
+ async () => await a.BootstrapAsync(new[] { d.AsPeer }, null, null));
+ await Assert.ThrowsAsync(
+ async () => await b.BootstrapAsync(new[] { c.AsPeer }, null, null));
+ await b.BootstrapAsync(new[] { d.AsPeer }, null, null);
Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray());
Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray());
@@ -148,13 +149,20 @@ AppProtocolVersion localVersion
await StartAsync(e);
await StartAsync(f);
- var peers = new[] { c.AsPeer, d.AsPeer, e.AsPeer, f.AsPeer };
-
- foreach (var peer in peers)
- {
- await a.AddPeersAsync(new[] { peer }, null);
- await b.AddPeersAsync(new[] { peer }, null);
- }
+ await a.BootstrapAsync(new[] { c.AsPeer }, null, null);
+ await Assert.ThrowsAsync(
+ async () => await a.BootstrapAsync(new[] { d.AsPeer }, null, null));
+ await Assert.ThrowsAsync(
+ async () => await a.BootstrapAsync(new[] { e.AsPeer }, null, null));
+ await Assert.ThrowsAsync(
+ async () => await a.BootstrapAsync(new[] { f.AsPeer }, null, null));
+ await Assert.ThrowsAsync(
+ async () => await b.BootstrapAsync(new[] { c.AsPeer }, null, null));
+ await b.BootstrapAsync(new[] { d.AsPeer }, null, null);
+ await Assert.ThrowsAsync(
+ async () => await b.BootstrapAsync(new[] { e.AsPeer }, null, null));
+ await Assert.ThrowsAsync(
+ async () => await b.BootstrapAsync(new[] { f.AsPeer }, null, null));
Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray());
Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray());
diff --git a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs
index c40a9b824a1..aab1c0b45c6 100644
--- a/Libplanet.Tests/Net/SwarmTest.Broadcast.cs
+++ b/Libplanet.Tests/Net/SwarmTest.Broadcast.cs
@@ -47,7 +47,7 @@ public async Task BroadcastBlockToReconnectedPeer()
Assert.Equal(swarmA.AsPeer, swarmB.AsPeer);
- await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null);
+ await swarmA.BootstrapAsync(new[] { seed.AsPeer }, null, null);
await StopAsync(swarmA);
await seed.PeerDiscovery.RefreshTableAsync(
TimeSpan.Zero,
@@ -55,7 +55,7 @@ await seed.PeerDiscovery.RefreshTableAsync(
Assert.DoesNotContain(swarmA.AsPeer, seed.Peers);
- await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null);
+ await swarmB.BootstrapAsync(new[] { seed.AsPeer }, null, null);
// This is added for context switching.
await Task.Delay(100);
@@ -107,7 +107,7 @@ public async Task BroadcastIgnoreFromDifferentGenesisHash()
await StartAsync(receiverSwarm);
await StartAsync(seedSwarm);
- await receiverSwarm.AddPeersAsync(new[] { seedSwarm.AsPeer }, null);
+ seedSwarm.AddPeer(receiverSwarm.AsPeer);
Block block = await seedChain.MineBlock(seedSwarm.Address);
seedSwarm.BroadcastBlock(block);
while (!((NetMQTransport)receiverSwarm.Transport).MessageHistory
@@ -178,7 +178,7 @@ CancellationToken cancellationToken
await StartAsync(a);
await StartAsync(b);
- await a.AddPeersAsync(new[] { b.AsPeer }, null);
+ a.AddPeer(b.AsPeer);
var minerCanceller = new CancellationTokenSource();
Task miningA = CreateMiner(a, chainA, 5000, minerCanceller.Token);
@@ -228,9 +228,9 @@ public async Task BroadcastTx()
await StartAsync(swarmB);
await StartAsync(swarmC);
- await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null);
- await swarmB.AddPeersAsync(new[] { swarmC.AsPeer }, null);
- await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
+ swarmB.AddPeer(swarmC.AsPeer);
+ swarmC.AddPeer(swarmA.AsPeer);
swarmA.BroadcastTxs(new[] { tx });
@@ -272,7 +272,7 @@ public async Task BroadcastTxWhileMining()
await StartAsync(swarmA);
await StartAsync(swarmC);
- await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null);
+ swarmA.AddPeer(swarmC.AsPeer);
for (var i = 0; i < 100; i++)
{
@@ -333,7 +333,7 @@ public async Task BroadcastTxAsync()
await StartAsync(swarmC);
// Broadcast tx swarmA to swarmB
- await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
await swarmB.TxReceived.WaitAsync();
Assert.Equal(tx, chainB.GetTransaction(tx.Id));
@@ -341,7 +341,7 @@ public async Task BroadcastTxAsync()
await StopAsync(swarmA);
// Re-Broadcast received tx swarmB to swarmC
- await swarmB.AddPeersAsync(new[] { swarmC.AsPeer }, null);
+ swarmB.AddPeer(swarmC.AsPeer);
await swarmC.TxReceived.WaitAsync();
Assert.Equal(tx, chainC.GetTransaction(tx.Id));
@@ -457,7 +457,7 @@ public async Task DoNotRebroadcastTxsWithLowerNonce()
Assert.Equal(1, tx2.Nonce);
await StartAsync(swarmA);
await StartAsync(swarmB);
- await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
swarmA.BroadcastTxs(new[] { tx1, tx2 });
await swarmB.TxReceived.WaitAsync();
Assert.Equal(
@@ -482,8 +482,8 @@ public async Task DoNotRebroadcastTxsWithLowerNonce()
Assert.Equal(2, tx4.Nonce);
await StartAsync(swarmC);
- await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null);
- await swarmB.AddPeersAsync(new[] { swarmC.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
+ swarmB.AddPeer(swarmC.AsPeer);
swarmA.BroadcastTxs(new[] { tx3, tx4 });
await swarmC.TxReceived.WaitAsync();
@@ -539,8 +539,9 @@ public async Task CanBroadcastBlock()
await StartAsync(swarmB);
await StartAsync(swarmC);
- await BootstrapAsync(swarmB, swarmA.AsPeer);
- await BootstrapAsync(swarmC, swarmA.AsPeer);
+ swarmA.AddPeers(new[] { swarmB.AsPeer, swarmC.AsPeer });
+ swarmB.AddPeers(new[] { swarmC.AsPeer, swarmA.AsPeer });
+ swarmC.AddPeers(new[] { swarmA.AsPeer, swarmB.AsPeer });
swarmB.BroadcastBlock(chainB[-1]);
@@ -626,7 +627,7 @@ public async Task BroadcastBlockWithSkip()
await StartAsync(minerSwarm);
await StartAsync(receiverSwarm);
- await BootstrapAsync(receiverSwarm, minerSwarm.AsPeer);
+ minerSwarm.AddPeer(receiverSwarm.AsPeer);
var block1 = TestUtils.MineNext(
blockChain.Genesis,
@@ -672,7 +673,7 @@ public async Task BroadcastBlockWithoutGenesis()
await StartAsync(swarmA);
await StartAsync(swarmB);
- await BootstrapAsync(swarmB, swarmA.AsPeer);
+ swarmA.AddPeer(swarmB.AsPeer);
await chainA.MineBlock(swarmA.Address);
swarmA.BroadcastBlock(chainA[-1]);
@@ -717,7 +718,7 @@ public async Task IgnoreExistingBlocks()
await StartAsync(swarmA);
await StartAsync(swarmB);
- await BootstrapAsync(swarmB, swarmA.AsPeer);
+ swarmA.AddPeer(swarmB.AsPeer);
swarmA.BroadcastBlock(chainA[-1]);
await swarmB.BlockAppended.WaitAsync();
diff --git a/Libplanet.Tests/Net/SwarmTest.Preload.cs b/Libplanet.Tests/Net/SwarmTest.Preload.cs
index 24927888cf2..6ab9b5060dd 100644
--- a/Libplanet.Tests/Net/SwarmTest.Preload.cs
+++ b/Libplanet.Tests/Net/SwarmTest.Preload.cs
@@ -42,7 +42,7 @@ public async Task InitialBlockDownload()
try
{
await StartAsync(minerSwarm);
- await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null);
+ receiverSwarm.AddPeer(minerSwarm.AsPeer);
await receiverSwarm.PreloadAsync();
@@ -85,7 +85,7 @@ public async Task InitialBlockDownloadStates()
try
{
await StartAsync(minerSwarm);
- await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null);
+ receiverSwarm.AddPeer(minerSwarm.AsPeer);
await receiverSwarm.PreloadAsync();
var state = receiverChain.GetState(address1);
@@ -143,7 +143,7 @@ public async Task Preload()
{
await StartAsync(minerSwarm);
- await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null);
+ receiverSwarm.AddPeer(minerSwarm.AsPeer);
_logger.Verbose("Both chains before synchronization:");
_logger.CompareBothChains(
@@ -259,7 +259,7 @@ public async Task BlockDownloadTimeout()
{
await StartAsync(minerSwarm);
- await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null);
+ receiverSwarm.AddPeer(minerSwarm.AsPeer);
Task waitTask = receiverSwarm.BlockDownloadStarted.WaitAsync();
Task preloadTask = receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(15));
@@ -308,7 +308,7 @@ public async Task PreloadWithFailedActions()
{
await StartAsync(minerSwarm);
- await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null);
+ receiverSwarm.AddPeer(minerSwarm.AsPeer);
await receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(1));
var action = new ThrowException { ThrowOnExecution = true };
@@ -393,11 +393,11 @@ public async Task PreloadFromNominer()
nominerSwarm0.FindNextHashesChunkSize = 2;
nominerSwarm1.FindNextHashesChunkSize = 2;
- await nominerSwarm0.AddPeersAsync(new[] { minerSwarm.AsPeer }, null);
+ nominerSwarm0.AddPeer(minerSwarm.AsPeer);
await nominerSwarm0.PreloadAsync();
- await nominerSwarm1.AddPeersAsync(new[] { nominerSwarm0.AsPeer }, null);
+ nominerSwarm1.AddPeer(nominerSwarm0.AsPeer);
await nominerSwarm1.PreloadAsync();
- await receiverSwarm.AddPeersAsync(new[] { nominerSwarm1.AsPeer }, null);
+ receiverSwarm.AddPeer(nominerSwarm1.AsPeer);
await receiverSwarm.PreloadAsync(TimeSpan.FromSeconds(15), progress);
// Await 1 second to make sure all progresses is reported.
@@ -500,7 +500,7 @@ public async Task PreloadRetryWithNextPeers(int blockCount)
Assert.Equal(swarm0.BlockChain.BlockHashes, swarm1.BlockChain.BlockHashes);
- await receiverSwarm.AddPeersAsync(new[] { swarm0.AsPeer, swarm1.AsPeer }, null);
+ receiverSwarm.AddPeers(new[] { swarm0.AsPeer, swarm1.AsPeer });
Assert.Equal(
new[] { swarm0.AsPeer, swarm1.AsPeer }.ToImmutableHashSet(),
receiverSwarm.Peers.ToImmutableHashSet());
@@ -522,7 +522,7 @@ await receiverSwarm.PreloadAsync(
Assert.Equal(swarm0.BlockChain.BlockHashes, receiverSwarm.BlockChain.BlockHashes);
}
- [Theory]
+ [Theory(Timeout = Timeout)]
[InlineData(0)]
[InlineData(50)]
[InlineData(100)]
@@ -557,7 +557,7 @@ public async Task PreloadAsyncCancellation(int cancelAfter)
minerSwarm.FindNextHashesChunkSize = 2;
await StartAsync(minerSwarm);
- await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null);
+ receiverSwarm.AddPeer(minerSwarm.AsPeer);
CancellationTokenSource cts = new CancellationTokenSource();
cts.CancelAfter(cancelAfter);
@@ -682,7 +682,7 @@ public async Task PreloadAfterReorg()
try
{
await StartAsync(minerSwarm);
- await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null);
+ receiverSwarm.AddPeer(minerSwarm.AsPeer);
await receiverSwarm.PreloadAsync();
}
finally
@@ -765,7 +765,7 @@ public async Task PreloadDeleteOnlyTempChain()
try
{
await StartAsync(minerSwarm);
- await receiverSwarm.AddPeersAsync(new[] { minerSwarm.AsPeer }, null);
+ receiverSwarm.AddPeer(minerSwarm.AsPeer);
await receiverSwarm.PreloadAsync();
}
finally
@@ -803,9 +803,7 @@ public async Task PreloadFromTheMostDifficultChain()
{
await StartAsync(minerSwarm1);
await StartAsync(minerSwarm2);
- await receiverSwarm.AddPeersAsync(
- new[] { minerSwarm1.AsPeer, minerSwarm2.AsPeer },
- null);
+ receiverSwarm.AddPeers(new[] { minerSwarm1.AsPeer, minerSwarm2.AsPeer });
await receiverSwarm.PreloadAsync();
}
finally
@@ -876,8 +874,7 @@ BlockChain MakeBlockChain(Block genesisBlock) =>
await StartAsync(validSeedSwarm);
await StartAsync(invalidSeedSwarm);
- await receiverSwarm.AddPeersAsync(
- new[] { validSeedSwarm.AsPeer, invalidSeedSwarm.AsPeer }, null);
+ receiverSwarm.AddPeers(new[] { validSeedSwarm.AsPeer, invalidSeedSwarm.AsPeer });
await receiverSwarm.PreloadAsync();
Assert.Equal(receiverChain.Tip, validSeedChain.Tip);
diff --git a/Libplanet.Tests/Net/SwarmTest.cs b/Libplanet.Tests/Net/SwarmTest.cs
index 018b1087ca3..3fd544d86ff 100644
--- a/Libplanet.Tests/Net/SwarmTest.cs
+++ b/Libplanet.Tests/Net/SwarmTest.cs
@@ -96,20 +96,20 @@ public async Task HandleReconnection()
{
Swarm seed = CreateSwarm();
- Swarm swarmA = CreateSwarm();
- Swarm swarmB = CreateSwarm();
+ var privateKey = new PrivateKey();
+ Swarm swarmA = CreateSwarm(privateKey);
+ Swarm swarmB = CreateSwarm(privateKey);
try
{
await StartAsync(seed);
await StartAsync(swarmA);
await StartAsync(swarmB);
- await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null);
+ seed.AddPeer(swarmA.AsPeer);
await StopAsync(swarmA);
- await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null);
+ seed.AddPeer(swarmB.AsPeer);
Assert.Contains(swarmB.AsPeer, seed.Peers);
- Assert.Contains(seed.AsPeer, swarmB.Peers);
}
finally
{
@@ -166,51 +166,6 @@ public async Task CanWaitForRunning()
Assert.False(swarm.Running);
}
- [Fact(Timeout = Timeout)]
- public async Task AddPeersWithoutStart()
- {
- Swarm a = CreateSwarm();
- Swarm b = CreateSwarm();
-
- try
- {
- await StartAsync(b);
-
- await a.AddPeersAsync(new Peer[] { b.AsPeer }, null);
-
- Assert.Contains(b.AsPeer, a.Peers);
- Assert.Empty(b.Peers);
- }
- finally
- {
- await StopAsync(a);
- await StopAsync(b);
- }
- }
-
- [Fact(Timeout = Timeout)]
- public async Task AddPeersAsync()
- {
- Swarm a = CreateSwarm();
- Swarm b = CreateSwarm();
-
- try
- {
- await StartAsync(a);
- await StartAsync(b);
-
- await a.AddPeersAsync(new Peer[] { b.AsPeer }, null);
-
- Assert.Contains(a.AsPeer, b.Peers);
- Assert.Contains(b.AsPeer, a.Peers);
- }
- finally
- {
- await StopAsync(a);
- await StopAsync(b);
- }
- }
-
[Fact(Timeout = Timeout)]
public async Task BootstrapException()
{
@@ -268,12 +223,13 @@ public async Task AutoConnectAfterStart()
await StartAsync(swarmB);
await BootstrapAsync(swarmA, swarmB.AsPeer);
+ await Task.Delay(1000);
Assert.Contains(swarmB.AsPeer, swarmA.Peers);
Assert.Empty(swarmB.Peers);
await StartAsync(swarmA);
- await Task.Delay(100);
+ await Task.Delay(1000);
Assert.Contains(swarmA.AsPeer, swarmB.Peers);
}
finally
@@ -329,7 +285,7 @@ public async Task GetBlocks()
await StartAsync(swarmA);
await StartAsync(swarmB);
- await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
(long, HashDigest)[] inventories1 = (
await swarmB.GetBlockHashes(
@@ -396,7 +352,7 @@ public async Task GetMultipleBlocksAtOnce()
var peer = swarmA.AsPeer as BoundPeer;
- await swarmB.AddPeersAsync(new[] { peer }, null);
+ swarmB.AddPeer(peer);
Tuple>[] hashes = await swarmB.GetBlockHashes(
peer,
@@ -473,7 +429,7 @@ public async Task GetTx()
await StartAsync(swarmA);
await StartAsync(swarmB);
- await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
List> txs =
await swarmA.GetTxsAsync(
@@ -574,23 +530,17 @@ public async Task ExchangeWithIceServer()
await StartAsync(swarmA);
await StartAsync(swarmB);
- await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null);
- await swarmB.AddPeersAsync(new[] { seed.AsPeer }, null);
- await swarmA.AddPeersAsync(new[] { swarmB.AsPeer }, null);
+ // Public-Private
+ await seed.BootstrapAsync(new[] { swarmA.AsPeer }, null, null);
+ Assert.Contains(swarmA.AsPeer, seed.Peers);
- Assert.Equal(
- new HashSet
- {
- swarmA.AsPeer as BoundPeer,
- swarmB.AsPeer as BoundPeer,
- },
- seed.Peers.ToHashSet());
- Assert.Equal(
- new HashSet { seed.AsPeer as BoundPeer, swarmB.AsPeer as BoundPeer },
- swarmA.Peers.ToHashSet());
- Assert.Equal(
- new HashSet { seed.AsPeer as BoundPeer, swarmA.AsPeer as BoundPeer },
- swarmB.Peers.ToHashSet());
+ // Private-Public
+ await swarmB.BootstrapAsync(new[] { seed.AsPeer }, null, null);
+ Assert.Contains(seed.AsPeer, swarmB.Peers);
+
+ // Private-Private
+ await swarmA.BootstrapAsync(new[] { swarmB.AsPeer }, null, null);
+ Assert.Contains(swarmB.AsPeer, swarmA.Peers);
}
finally
{
@@ -664,7 +614,7 @@ async Task MineAndBroadcast(CancellationToken cancellationToken)
await StartAsync(seed);
await StartAsync(swarmA);
- await swarmA.AddPeersAsync(new[] { seed.AsPeer }, null);
+ swarmA.AddPeer(seed.AsPeer);
cts.Cancel();
await proxyTask;
@@ -726,7 +676,7 @@ public async Task RemoveForkedChainWhenFillBlocksAsyncFail()
{
await StartAsync(swarm1);
await StartAsync(swarm2);
- await swarm1.AddPeersAsync(new[] { swarm2.AsPeer }, null);
+ swarm2.AddPeer(swarm1.AsPeer);
swarm2.BroadcastBlock(block3);
await swarm1.FillBlocksAsyncFailed.WaitAsync();
@@ -785,7 +735,7 @@ public async Task RenderInFork()
await StartAsync(miner1);
await StartAsync(miner2);
- await BootstrapAsync(miner2, miner1.AsPeer);
+ miner2.AddPeer(miner1.AsPeer);
miner2.BroadcastBlock(latest);
@@ -831,7 +781,7 @@ public async Task ForkByDifficulty()
await StartAsync(miner1);
await StartAsync(miner2);
- await BootstrapAsync(miner2, miner1.AsPeer);
+ miner2.AddPeer(miner1.AsPeer);
miner2.BroadcastBlock(block);
await miner1.BlockReceived.WaitAsync();
@@ -881,8 +831,8 @@ Swarm MakeSwarm() => CreateSwarm(TestUtils.MakeBlockChain(
await StartAsync(miner1);
await StartAsync(miner2);
- await BootstrapAsync(miner2, miner1.AsPeer);
- await BootstrapAsync(receiver, miner1.AsPeer);
+ miner1.AddPeers(new[] { receiver.AsPeer, miner2.AsPeer });
+ miner2.AddPeers(new[] { receiver.AsPeer, miner1.AsPeer });
var t = receiver.PreloadAsync();
await miner1.BlockChain.MineBlock(miner1.Address);
@@ -946,9 +896,9 @@ public async void RestageTransactionsOnceLocallyMinedAfterReorg(bool restage)
await StartAsync(minerA);
await StartAsync(minerB);
- await BootstrapAsync(minerA, minerB.AsPeer);
+ minerB.AddPeer(minerA.AsPeer);
- Log.Debug("Reorg occurrs.");
+ Log.Debug("Reorg occurs.");
minerB.BroadcastBlock(blockC);
await minerA.BlockAppended.WaitAsync();
@@ -1012,7 +962,7 @@ bool IsSignerValid(Transaction tx, BlockChain chain)
await StartAsync(swarmA);
await StartAsync(swarmB);
- await BootstrapAsync(swarmA, swarmB.AsPeer);
+ swarmA.AddPeer(swarmB.AsPeer);
swarmA.BroadcastTxs(new[] { validTx, invalidTx });
await swarmB.TxReceived.WaitAsync();
@@ -1075,7 +1025,7 @@ bool IsSignerValid(Transaction tx, BlockChain chain)
await StartAsync(swarmA);
await StartAsync(swarmB);
- await BootstrapAsync(swarmA, swarmB.AsPeer);
+ swarmA.AddPeer(swarmB.AsPeer);
swarmA.BroadcastTxs(new[] { tx });
await swarmB.TxReceived.WaitAsync();
@@ -1122,8 +1072,8 @@ public async Task CreateNewChainWhenBranchPointNotExist()
await StartAsync(minerSwarmB);
await StartAsync(receiverSwarm);
- await BootstrapAsync(minerSwarmA, receiverSwarm.AsPeer);
- await BootstrapAsync(minerSwarmB, receiverSwarm.AsPeer);
+ minerSwarmA.AddPeer(receiverSwarm.AsPeer);
+ minerSwarmB.AddPeer(receiverSwarm.AsPeer);
// Broadcast SwarmA's first block.
var b1 = await minerChainA.MineBlock(minerSwarmA.Address);
@@ -1177,7 +1127,7 @@ public async Task DoNotDeleteCanonicalChainWhenBlockDownloadFailed()
{
await StartAsync(swarmA);
await StartAsync(swarmB);
- await BootstrapAsync(swarmA, swarmB.AsPeer);
+ swarmA.AddPeer(swarmB.AsPeer);
swarmA.BroadcastBlock(block);
await swarmB.FillBlocksAsyncStarted.WaitAsync();
@@ -1246,8 +1196,8 @@ BlockChain MakeGenesisChain(
await StartAsync(swarmB);
await StartAsync(swarmC);
- await swarmB.AddPeersAsync(new[] { swarmA.AsPeer }, null);
- await swarmC.AddPeersAsync(new[] { swarmA.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
+ swarmA.AddPeer(swarmC.AsPeer);
var block = await swarmA.BlockChain.MineBlock(swarmA.Address);
@@ -1293,9 +1243,9 @@ public async Task FindSpecificPeerAsync()
await StartAsync(swarmC);
await StartAsync(swarmD);
- await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null);
- await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null);
- await swarmC.AddPeersAsync(new Peer[] { swarmD.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
+ swarmB.AddPeer(swarmC.AsPeer);
+ swarmC.AddPeer(swarmD.AsPeer);
BoundPeer foundPeer = await swarmA.FindSpecificPeerAsync(
swarmB.AsPeer.Address,
@@ -1335,8 +1285,8 @@ public async Task FindSpecificPeerAsyncFail()
await StartAsync(swarmB);
await StartAsync(swarmC);
- await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null);
- await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
+ swarmB.AddPeer(swarmC.AsPeer);
await StopAsync(swarmB);
@@ -1377,9 +1327,9 @@ public async Task FindSpecificPeerAsyncDepthFail()
await StartAsync(swarmC);
await StartAsync(swarmD);
- await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null);
- await swarmB.AddPeersAsync(new Peer[] { swarmC.AsPeer }, null);
- await swarmC.AddPeersAsync(new Peer[] { swarmD.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
+ swarmB.AddPeer(swarmC.AsPeer);
+ swarmC.AddPeer(swarmD.AsPeer);
BoundPeer foundPeer = await swarmA.FindSpecificPeerAsync(
swarmC.AsPeer.Address,
@@ -1389,7 +1339,7 @@ public async Task FindSpecificPeerAsyncDepthFail()
Assert.Equal(swarmC.AsPeer.Address, foundPeer.Address);
swarmA.RoutingTable.Clear();
Assert.Empty(swarmA.Peers);
- await swarmA.AddPeersAsync(new Peer[] { swarmB.AsPeer }, null);
+ swarmA.AddPeer(swarmB.AsPeer);
foundPeer = await swarmA.FindSpecificPeerAsync(
swarmD.AsPeer.Address,
@@ -1432,7 +1382,7 @@ public async Task DoNotFillWhenGetAllBlockAtFirstTimeFromSender()
try
{
- await BootstrapAsync(sender, receiver.AsPeer);
+ sender.AddPeer(receiver.AsPeer);
sender.BroadcastBlock(sender.BlockChain.Tip);
@@ -1473,7 +1423,7 @@ public async Task FillWhenGetAllBlocksFromSender()
try
{
- await BootstrapAsync(sender, receiver.AsPeer);
+ sender.AddPeer(receiver.AsPeer);
sender.BroadcastBlock(sender.BlockChain.Tip);
@@ -1508,8 +1458,8 @@ public async Task DoNotFillMultipleTimes()
try
{
- await BootstrapAsync(sender1, receiver.AsPeer);
- await BootstrapAsync(sender2, receiver.AsPeer);
+ sender1.AddPeer(receiver.AsPeer);
+ sender2.AddPeer(receiver.AsPeer);
sender1.BlockChain.Append(b1);
sender2.BlockChain.Append(b1);
@@ -1573,7 +1523,7 @@ public async Task GetPeerChainStateAsync()
await StartAsync(swarm2);
await StartAsync(swarm3);
- await BootstrapAsync(swarm1, swarm2.AsPeer);
+ swarm1.AddPeer(swarm2.AsPeer);
peerChainState = await swarm1.GetPeerChainStateAsync(
TimeSpan.FromSeconds(1), default);
@@ -1590,7 +1540,7 @@ public async Task GetPeerChainStateAsync()
peerChainState.First()
);
- await BootstrapAsync(swarm1, swarm3.AsPeer);
+ swarm1.AddPeer(swarm3.AsPeer);
peerChainState = await swarm1.GetPeerChainStateAsync(
TimeSpan.FromSeconds(1), default);
Assert.Equal(
@@ -1623,6 +1573,7 @@ public async Task LastMessageTimestamp()
Assert.Null(swarm1.LastMessageTimestamp);
DateTimeOffset bootstrappedAt = DateTimeOffset.UtcNow;
await BootstrapAsync(swarm2, swarm1.AsPeer);
+ await Task.Delay(1000);
await StartAsync(swarm2);
Assert.NotNull(swarm1.LastMessageTimestamp);
@@ -1650,15 +1601,17 @@ public async Task Restart()
{
// Setup
await StartAsync(swarm1);
- await BootstrapAsync(swarm2, swarm1.AsPeer);
await StartAsync(swarm2);
-
+ await BootstrapAsync(swarm2, swarm1.AsPeer);
await Task.Delay(1000);
+ Assert.True(
+ swarm1.Peers.Contains(swarm2.AsPeer),
+ "Swarm2 does not exist in swarm1's routing table after setup.");
+
// Restart
await swarm1.StopAsync();
Assert.False(swarm1.Running);
- await Task.Delay(1000);
await StartAsync(swarm1);
DateTimeOffset restartedAt = DateTimeOffset.UtcNow;
@@ -1666,8 +1619,14 @@ public async Task Restart()
await swarm1.CheckAllPeersAsync();
await swarm2.CheckAllPeersAsync();
- Assert.Contains(swarm1.AsPeer, swarm2.Peers);
- Assert.Contains(swarm2.AsPeer, swarm1.Peers);
+ Assert.True(
+ swarm1.Peers.Contains(swarm2.AsPeer),
+ "Swarm2 does not exist in swarm1's routing table.");
+ Assert.True(
+ swarm2.Peers.Contains(swarm1.AsPeer),
+ "Swarm1 does not exist in swarm2's routing table.");
+ Assert.NotNull(swarm1.LastMessageTimestamp);
+ Assert.NotNull(swarm2.LastMessageTimestamp);
Assert.InRange(
swarm1.LastMessageTimestamp.Value,
restartedAt,
@@ -1830,7 +1789,7 @@ public async Task ResetBlockDemandIfNotChanged()
await StartAsync(sender);
await StartAsync(receiver);
- await BootstrapAsync(sender, receiver.AsPeer);
+ sender.AddPeer(receiver.AsPeer);
sender.BroadcastBlock(lowerBlock);
await receiver.FillBlocksAsyncStarted.WaitAsync();
diff --git a/Libplanet/Net/Protocols/KBucket.cs b/Libplanet/Net/Protocols/KBucket.cs
index ae55561b4e1..460eb0ea831 100644
--- a/Libplanet/Net/Protocols/KBucket.cs
+++ b/Libplanet/Net/Protocols/KBucket.cs
@@ -75,54 +75,63 @@ public void AddPeer(BoundPeer peer)
var updated = DateTimeOffset.UtcNow;
- if (_peerStates.ContainsKey(peer.Address))
+ if (Update(peer))
{
_logger.Verbose("Bucket already contains peer {Peer}", peer);
+ return;
+ }
- // This done because peer's other attribute except public key might be changed.
- // (eg. public IP address, endpoint)
- PeerState state = _peerStates[peer.Address];
- state.Peer = peer;
- state.LastUpdated = updated;
+ if (IsFull())
+ {
+ _logger.Verbose("Bucket is full to add peer {Peer}", peer);
+ if (ReplacementCache.TryAdd(peer, updated))
+ {
+ _logger.Verbose(
+ "Added {Peer} to replacement cache. (total: {Count})",
+ peer,
+ ReplacementCache.Count);
+ }
}
else
{
- if (IsFull())
+ _logger.Verbose("Bucket does not contains peer {Peer}", peer);
+ _lastUpdated = updated;
+ if (_peerStates.TryAdd(peer.Address, new PeerState(peer, updated)))
{
- _logger.Verbose("Bucket is full to add peer {Peer}", peer);
- if (ReplacementCache.TryAdd(peer, updated))
+ _logger.Verbose("Peer {Peer} is added to bucket", peer);
+ _lastUpdated = updated;
+
+ if (ReplacementCache.TryRemove(peer, out var dateTimeOffset))
{
_logger.Verbose(
- "Added {Peer} to replacement cache. (total: {Count})",
+ "Removed peer {Peer} from replacement cache. (total: {Count})",
peer,
ReplacementCache.Count);
}
}
else
{
- _logger.Verbose("Bucket does not contains peer {Peer}", peer);
- _lastUpdated = updated;
- if (_peerStates.TryAdd(peer.Address, new PeerState(peer, updated)))
- {
- _logger.Verbose("Peer {Peer} is added to bucket", peer);
- _lastUpdated = updated;
-
- if (ReplacementCache.TryRemove(peer, out var dateTimeOffset))
- {
- _logger.Verbose(
- "Removed peer {Peer} from replacement cache. (total: {Count})",
- peer,
- ReplacementCache.Count);
- }
- }
- else
- {
- _logger.Verbose("Failed to add peer {Peer} to bucket", peer);
- }
+ _logger.Verbose("Failed to add peer {Peer} to bucket", peer);
}
}
}
+ public bool Update(BoundPeer peer)
+ {
+ if (!Contains(peer))
+ {
+ _logger.Verbose("Bucket does not contains {Peer}. Failed to update.", peer);
+ return false;
+ }
+
+ // This done because peer's other attribute except public key might be changed.
+ // (eg. public IP address, endpoint)
+ PeerState state = _peerStates[peer.Address];
+ state.Peer = peer;
+ state.LastUpdated = DateTimeOffset.UtcNow;
+ return true;
+ }
+
public bool Contains(BoundPeer peer)
{
return _peerStates.ContainsKey(peer.Address);
diff --git a/Libplanet/Net/Protocols/KademliaProtocol.cs b/Libplanet/Net/Protocols/KademliaProtocol.cs
index e260eeaf8fd..05d4f522686 100644
--- a/Libplanet/Net/Protocols/KademliaProtocol.cs
+++ b/Libplanet/Net/Protocols/KademliaProtocol.cs
@@ -26,6 +26,9 @@ public class KademliaProtocol : IProtocol
private readonly ILogger _logger;
+ private readonly ConcurrentDictionary _cache;
+ private readonly int _cacheSize;
+
///
/// Creates a instance.
///
@@ -57,6 +60,8 @@ public KademliaProtocol(
requestTimeout ??
TimeSpan.FromMilliseconds(5000);
_transport.ProcessMessageHandler += ProcessMessageHandler;
+ _cache = new ConcurrentDictionary();
+ _cacheSize = int.MaxValue;
}
///
@@ -180,7 +185,6 @@ public async Task AddPeersAsync(
{
_logger.Debug(
$"Timeout occurred during {nameof(AddPeersAsync)}() after {timeout}.");
- throw;
}
catch (TaskCanceledException)
{
@@ -292,17 +296,14 @@ public async Task CheckReplacementCacheAsync(CancellationToken cancellationToken
try
{
_logger.Verbose("Check peer {Peer}.", replacement);
-
- await PingAsync(replacement, _requestTimeout, cancellationToken);
_table.RemoveCache(replacement);
- Update(replacement);
+ await PingAsync(replacement, _requestTimeout, cancellationToken);
}
catch (PingTimeoutException)
{
_logger.Verbose(
"Remove stale peer {Peer} from replacement cache.",
replacement);
- _table.RemoveCache(replacement);
}
}
}
@@ -442,6 +443,14 @@ internal async Task PingAsync(
try
{
_logger.Verbose("Trying to ping async to {Peer}.", target);
+ if (_cache.ContainsKey(target.Address) || _cache.Count > _cacheSize)
+ {
+ _logger.Verbose(
+ "Request already in cache or cache size limit exceeded. Ping cancelled.");
+ return;
+ }
+
+ _cache[target.Address] = target;
Message reply = await _transport.SendMessageWithReplyAsync(
target,
new Ping(),
@@ -459,7 +468,16 @@ internal async Task PingAsync(
throw new InvalidMessageException("Cannot receive pong from self", pong);
}
- Update(target);
+ if (!pong.Remote.Address.Equals(target.Address))
+ {
+ // Verify sender
+ throw new InvalidMessageException(
+ "Remote peer of Pong does not match with the target peer. " +
+ $"(Expected: {target.Address}, Actual: {pong.Remote.Address})",
+ pong);
+ }
+
+ _table.AddPeer(target);
}
catch (TimeoutException)
{
@@ -479,6 +497,10 @@ internal async Task PingAsync(
$"Different AppProtocolVersion encountered at {nameof(PingAsync)}().");
throw;
}
+ finally
+ {
+ _cache.TryRemove(target.Address, out _);
+ }
}
private void ProcessMessageHandler(object target, Message message)
@@ -529,30 +551,42 @@ private async Task ValidateAsync(
RemovePeer(peer);
throw;
}
+ catch (InvalidMessageException)
+ {
+ _logger.Verbose("Peer {Peer} is invalid, removing...", peer);
+ RemovePeer(peer);
+ throw;
+ }
}
///
/// Updates routing table when receiving a message. If corresponding bucket
- /// for remote peer is not full, just adds given .
+ /// for remote peer is not full, just adds given .
/// Otherwise, checks aliveness of the least recently used (LRU) peer
- /// and determine evict LRU peer or discard given .
+ /// and determine evict LRU peer or discard given .
///
- /// to update.
- ///
- /// Thrown when is null.
+ /// to update.
+ ///
+ /// Thrown when is null or it is not a
+ /// .
///
- private void Update(Peer rawPeer)
+ private void Update(Peer peer)
{
- _logger.Verbose($"Try to {nameof(Update)}() {{Peer}}.", rawPeer);
- if (rawPeer is null)
+ _logger.Verbose($"Try to {nameof(Update)}() {{Peer}}.", peer);
+ if (!(peer is BoundPeer boundPeer))
{
- throw new ArgumentNullException(nameof(rawPeer));
+ _logger.Verbose("Given peer {Peer} is not bounded.", peer);
+ return;
}
- if (rawPeer is BoundPeer peer)
+ // Don't update peer without endpoint or with different appProtocolVersion.
+ if (_table.Contains(boundPeer))
{
- // Don't update peer without endpoint or with different appProtocolVersion.
- _table.AddPeer(peer);
+ _table.UpdatePeer(boundPeer);
+ }
+ else
+ {
+ _ = PingAsync(boundPeer, _requestTimeout, default);
}
}
@@ -673,10 +707,14 @@ private async Task> GetNeighbors(
// Send pong back to remote
private void ReceivePing(Ping ping)
{
+ if (ping.Remote is null)
+ {
+ throw new ArgumentNullException(nameof(ping.Remote));
+ }
+
if (ping.Remote.Address.Equals(_address))
{
- throw new ArgumentException(
- "Cannot receive ping from self");
+ throw new ArgumentException("Cannot receive ping from self");
}
var pong = new Pong
diff --git a/Libplanet/Net/Protocols/RoutingTable.cs b/Libplanet/Net/Protocols/RoutingTable.cs
index 038c695232a..7e8a8d989b7 100644
--- a/Libplanet/Net/Protocols/RoutingTable.cs
+++ b/Libplanet/Net/Protocols/RoutingTable.cs
@@ -132,6 +132,22 @@ public void AddPeer(BoundPeer peer)
BucketOf(peer).AddPeer(peer);
}
+ public void UpdatePeer(BoundPeer peer)
+ {
+ if (peer is null)
+ {
+ throw new ArgumentNullException(nameof(peer));
+ }
+
+ if (peer.Address.Equals(_address))
+ {
+ throw new ArgumentException("Cannot add update self.");
+ }
+
+ _logger.Debug("Updating peer {Peer} from routing table.", peer);
+ BucketOf(peer).Update(peer);
+ }
+
public bool RemovePeer(BoundPeer peer)
{
if (peer is null)
diff --git a/Libplanet/Net/Swarm.cs b/Libplanet/Net/Swarm.cs
index baf4344c264..b0126affea0 100644
--- a/Libplanet/Net/Swarm.cs
+++ b/Libplanet/Net/Swarm.cs
@@ -908,24 +908,6 @@ public async Task CheckAllPeersAsync(
await kademliaProtocol.CheckAllPeersAsync(timeout, cancellationToken);
}
- internal async Task AddPeersAsync(
- IEnumerable peers,
- TimeSpan? timeout,
- CancellationToken cancellationToken = default(CancellationToken))
- {
- if (Transport is null)
- {
- throw new ArgumentNullException(nameof(Transport));
- }
-
- if (cancellationToken == default(CancellationToken))
- {
- cancellationToken = _cancellationToken;
- }
-
- await PeerDiscovery.AddPeersAsync(peers, timeout, cancellationToken);
- }
-
// FIXME: This would be better if it's merged with GetDemandBlockHashes
internal async IAsyncEnumerable>> GetBlockHashes(
BoundPeer peer,