Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions Libplanet.Benchmarks/SwarmBenchmark.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,7 @@ public void InitializeSwarms()

for (int i = 0; i < SwarmNumber - 1; i++)
{
_swarms[i].AddPeersAsync(new[] { _swarms[i + 1].AsPeer }, null)
.Wait(WaitTimeout);
_swarms[i].RoutingTable.AddPeer(_swarms[i + 1].AsPeer as BoundPeer);
}

_blockChains[0].Append(_blocks[1]);
Expand Down
100 changes: 58 additions & 42 deletions Libplanet.Tests/Net/Protocols/ProtocolTest.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Libplanet.Crypto;
Expand All @@ -15,7 +16,9 @@ namespace Libplanet.Tests.Net.Protocols
public class ProtocolTest
{
private const int Timeout = 60 * 1000;
private readonly Dictionary<Address, TestTransport> _transports;
private readonly Dictionary<string, TestTransport> _transports;

private int _assignedPort;

public ProtocolTest(ITestOutputHelper output)
{
Expand All @@ -28,7 +31,8 @@ public ProtocolTest(ITestOutputHelper output)
.CreateLogger()
.ForContext<ProtocolTest>();

_transports = new Dictionary<Address, TestTransport>();
_transports = new Dictionary<string, TestTransport>();
_assignedPort = 1234;
}

[Fact]
Expand Down Expand Up @@ -82,39 +86,9 @@ public async Task Ping()
await transportA.MessageReceived.WaitAsync();
await Task.Delay(100);

Assert.Single(transportA.ReceivedMessages);
Assert.Single(transportB.ReceivedMessages);
Assert.Contains(transportA.AsPeer, transportB.Peers);
}
finally
{
await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
}
}

[Fact(Timeout = Timeout)]
public async Task PingTwice()
{
var transportA = CreateTestTransport();
var transportB = CreateTestTransport();

try
{
await StartTestTransportAsync(transportA);
await StartTestTransportAsync(transportB);

transportA.SendPing(transportB.AsPeer);
await transportA.MessageReceived.WaitAsync();
await transportB.MessageReceived.WaitAsync();
transportB.SendPing(transportA.AsPeer);
await transportA.MessageReceived.WaitAsync();
await transportB.MessageReceived.WaitAsync();

Assert.Equal(2, transportA.ReceivedMessages.Count);
Assert.Equal(2, transportB.ReceivedMessages.Count);
Assert.Contains(transportA.AsPeer, transportB.Peers);
Assert.Contains(transportB.AsPeer, transportA.Peers);
}
finally
{
Expand Down Expand Up @@ -181,7 +155,9 @@ public async Task BootstrapAsyncTest()
await StartTestTransportAsync(transportC);

await transportB.BootstrapAsync(new[] { transportA.AsPeer });
await Task.Delay(100);
await transportC.BootstrapAsync(new[] { transportA.AsPeer });
await Task.Delay(100);

Assert.Contains(transportB.AsPeer, transportC.Peers);
Assert.Contains(transportC.AsPeer, transportB.Peers);
Expand Down Expand Up @@ -236,9 +212,9 @@ public async Task RoutingTableFull()
await StartTestTransportAsync(transportB);
await StartTestTransportAsync(transportC);

await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);
await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);
await transport.AddPeersAsync(new[] { transportA.AsPeer }, null);
await transport.AddPeersAsync(new[] { transportB.AsPeer }, null);
await transport.AddPeersAsync(new[] { transportC.AsPeer }, null);

Assert.Single(transportA.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
Expand All @@ -264,10 +240,10 @@ public async Task ReplacementCache()
await StartTestTransportAsync(transportB);
await StartTestTransportAsync(transportC);

await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);
await transport.AddPeersAsync(new[] { transportA.AsPeer }, null);
await transport.AddPeersAsync(new[] { transportB.AsPeer }, null);
await Task.Delay(100);
await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);
await transport.AddPeersAsync(new[] { transportC.AsPeer }, null);

Assert.Single(transportA.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
Expand Down Expand Up @@ -300,8 +276,8 @@ public async Task RemoveDeadReplacementCache()
await StartTestTransportAsync(transportB);
await StartTestTransportAsync(transportC);

await transportA.AddPeersAsync(new[] { transport.AsPeer }, null);
await transportB.AddPeersAsync(new[] { transport.AsPeer }, null);
await transport.AddPeersAsync(new[] { transportA.AsPeer }, null);
await transport.AddPeersAsync(new[] { transportB.AsPeer }, null);

Assert.Single(transport.Peers);
Assert.Contains(transportA.AsPeer, transport.Peers);
Expand All @@ -310,7 +286,7 @@ public async Task RemoveDeadReplacementCache()
await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);

await transportC.AddPeersAsync(new[] { transport.AsPeer }, null);
await transport.AddPeersAsync(new[] { transportC.AsPeer }, null);
await transport.Protocol.RefreshTableAsync(TimeSpan.Zero, default(CancellationToken));
await transport.Protocol.CheckReplacementCacheAsync(default(CancellationToken));

Expand Down Expand Up @@ -347,6 +323,7 @@ public async Task BroadcastMessage(int count)
}

Log.Debug("Bootstrap completed.");
await Task.Delay(1000);

var tasks =
transports.Select(transport => transport.WaitForTestMessageWithData("foo"));
Expand Down Expand Up @@ -390,7 +367,7 @@ public async Task BroadcastGuarantee()
});

var seed = CreateTestTransport(privateKey0);
var t1 = CreateTestTransport(privateKey1, true);
var t1 = CreateTestTransport(privateKey1, blockBroadcast: true);
var t2 = CreateTestTransport(privateKey2);
await StartTestTransportAsync(seed);
await StartTestTransportAsync(t1);
Expand Down Expand Up @@ -486,8 +463,45 @@ public async Task DoNotBroadcastToSourcePeer()
}
}

[Fact(Timeout = Timeout)]
public async Task DoNotAddMaliciousPeerToTable()
{
var actualPrivateKey = new PrivateKey();
var maliciousPrivateKey = new PrivateKey();
const int port = 1250;
TestTransport transportA = CreateTestTransport();
TestTransport transportB =
CreateTestTransport(privateKey: actualPrivateKey, listenPort: port);

try
{
await StartTestTransportAsync(transportA);
await StartTestTransportAsync(transportB);

var bytes = new byte[10];
new Random().NextBytes(bytes);
var maliciousPeer = new BoundPeer(
maliciousPrivateKey.PublicKey,
(transportB.AsPeer as BoundPeer)?.EndPoint);
var kp = (KademliaProtocol)transportA.Protocol;

// Remote peer of Pong does not match with the target peer.
await Assert.ThrowsAsync<InvalidMessageException>(async () =>
await kp.PingAsync(maliciousPeer, null, default));

Assert.DoesNotContain(maliciousPeer, transportA.Peers);
}
finally
{
await transportA.StopAsync(TimeSpan.Zero);
await transportB.StopAsync(TimeSpan.Zero);
}
}

private TestTransport CreateTestTransport(
PrivateKey privateKey = null,
string host = null,
int? listenPort = null,
bool blockBroadcast = false,
int tableSize = Kademlia.TableSize,
int bucketSize = Kademlia.BucketSize,
Expand All @@ -496,6 +510,8 @@ private TestTransport CreateTestTransport(
return new TestTransport(
_transports,
privateKey ?? new PrivateKey(),
host ?? IPAddress.Loopback.ToString(),
listenPort ?? _assignedPort++,
blockBroadcast,
tableSize,
bucketSize,
Expand Down
31 changes: 21 additions & 10 deletions Libplanet.Tests/Net/Protocols/TestTransport.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,45 @@ internal class TestTransport : ITransport
private static readonly AppProtocolVersion AppProtocolVersion =
AppProtocolVersion.Sign(VersionSigner, 1);

private readonly Dictionary<Address, TestTransport> _transports;
private readonly Dictionary<string, TestTransport> _transports;
private readonly ILogger _logger;
private readonly ConcurrentDictionary<byte[], Address> _peersToReply;
private readonly ConcurrentDictionary<byte[], string> _addressToReply;
private readonly ConcurrentDictionary<byte[], Message> _replyToReceive;
private readonly AsyncCollection<Request> _requests;
private readonly List<string> _ignoreTestMessageWithData;
private readonly PrivateKey _privateKey;
private readonly string _host;
private readonly int _listenPort;
private readonly Random _random;
private readonly bool _blockBroadcast;

private CancellationTokenSource _swarmCancellationTokenSource;
private TimeSpan _networkDelay;

public TestTransport(
Dictionary<Address, TestTransport> transports,
Dictionary<string, TestTransport> transports,
PrivateKey privateKey,
string host,
int listenPort,
bool blockBroadcast,
int tableSize,
int bucketSize,
TimeSpan? networkDelay)
{
_privateKey = privateKey;
_host = host;
_listenPort = listenPort;
_blockBroadcast = blockBroadcast;
var loggerId = _privateKey.ToAddress().ToHex();
_logger = Log.ForContext<TestTransport>()
.ForContext("Address", loggerId);

_peersToReply = new ConcurrentDictionary<byte[], Address>();
_addressToReply = new ConcurrentDictionary<byte[], string>();
_replyToReceive = new ConcurrentDictionary<byte[], Message>();
ReceivedMessages = new ConcurrentBag<Message>();
MessageReceived = new AsyncAutoResetEvent();
_transports = transports;
_transports[privateKey.ToAddress()] = this;
_transports[ToAddress(AsPeer as BoundPeer)] = this;
_networkDelay = networkDelay ?? TimeSpan.Zero;
_requests = new AsyncCollection<Request>();
_ignoreTestMessageWithData = new List<string>();
Expand All @@ -70,7 +76,7 @@ public TestTransport(

public Peer AsPeer => new BoundPeer(
_privateKey.PublicKey,
new DnsEndPoint("localhost", 1234));
new DnsEndPoint(_host, _listenPort));

public IEnumerable<BoundPeer> Peers => Table.Peers;

Expand Down Expand Up @@ -390,8 +396,8 @@ public void ReplyMessage(Message message)
Task.Run(async () =>
{
await Task.Delay(_networkDelay);
_transports[_peersToReply[message.Identity]].ReceiveReply(message);
_peersToReply.TryRemove(message.Identity, out Address addr);
_transports[_addressToReply[message.Identity]].ReceiveReply(message);
_addressToReply.TryRemove(message.Identity, out string addr);
});
}

Expand Down Expand Up @@ -420,6 +426,11 @@ public bool ReceivedTestMessageOfData(string data)
return ReceivedMessages.OfType<TestMessage>().Any(msg => msg.Data == data);
}

private string ToAddress(BoundPeer peer)
{
return $"{peer.EndPoint.Host}:{peer.EndPoint.Port}";
}

private void ReceiveMessage(Message message)
{
if (_swarmCancellationTokenSource.IsCancellationRequested)
Expand Down Expand Up @@ -451,7 +462,7 @@ private void ReceiveMessage(Message message)
}
else
{
_peersToReply[message.Identity] = boundPeer.Address;
_addressToReply[message.Identity] = ToAddress(boundPeer);
}

if (message is Ping)
Expand Down Expand Up @@ -487,7 +498,7 @@ private async Task ProcessRuntime(CancellationToken cancellationToken)
req.Message,
req.Message.Identity,
req.Target);
_transports[req.Target.Address].ReceiveMessage(req.Message);
_transports[ToAddress(req.Target)].ReceiveMessage(req.Message);
}
else
{
Expand Down
31 changes: 31 additions & 0 deletions Libplanet.Tests/Net/SwarmExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Libplanet.Action;
using Libplanet.Net;

namespace Libplanet.Tests.Net
{
internal static class SwarmExtensions
{
public static void AddPeer<T>(this Swarm<T> swarm, Peer peer)
where T : IAction, new()
{
if (swarm.RoutingTable is null)
{
throw new NullReferenceException(nameof(swarm.RoutingTable));
}

swarm.RoutingTable.AddPeer(peer as BoundPeer);
}

public static void AddPeers<T>(this Swarm<T> swarm, IEnumerable<Peer> peers)
where T : IAction, new()
{
foreach (var peer in peers.Where(p => p is BoundPeer _).Select(p => p as BoundPeer))
{
swarm.AddPeer(peer);
}
}
}
}
32 changes: 20 additions & 12 deletions Libplanet.Tests/Net/SwarmTest.AppProtocolVersion.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public async Task DetectAppProtocolVersion()

var peers = new[] { c.AsPeer, d.AsPeer };

foreach (var peer in peers)
{
await a.AddPeersAsync(new[] { peer }, null);
await b.AddPeersAsync(new[] { peer }, null);
}
await a.BootstrapAsync(new[] { c.AsPeer }, null, null);
await Assert.ThrowsAsync<PeerDiscoveryException>(
async () => await a.BootstrapAsync(new[] { d.AsPeer }, null, null));
await Assert.ThrowsAsync<PeerDiscoveryException>(
async () => await b.BootstrapAsync(new[] { c.AsPeer }, null, null));
await b.BootstrapAsync(new[] { d.AsPeer }, null, null);

Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray());
Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray());
Expand Down Expand Up @@ -148,13 +149,20 @@ AppProtocolVersion localVersion
await StartAsync(e);
await StartAsync(f);

var peers = new[] { c.AsPeer, d.AsPeer, e.AsPeer, f.AsPeer };

foreach (var peer in peers)
{
await a.AddPeersAsync(new[] { peer }, null);
await b.AddPeersAsync(new[] { peer }, null);
}
await a.BootstrapAsync(new[] { c.AsPeer }, null, null);
await Assert.ThrowsAsync<PeerDiscoveryException>(
async () => await a.BootstrapAsync(new[] { d.AsPeer }, null, null));
await Assert.ThrowsAsync<PeerDiscoveryException>(
async () => await a.BootstrapAsync(new[] { e.AsPeer }, null, null));
await Assert.ThrowsAsync<PeerDiscoveryException>(
async () => await a.BootstrapAsync(new[] { f.AsPeer }, null, null));
await Assert.ThrowsAsync<PeerDiscoveryException>(
async () => await b.BootstrapAsync(new[] { c.AsPeer }, null, null));
await b.BootstrapAsync(new[] { d.AsPeer }, null, null);
await Assert.ThrowsAsync<PeerDiscoveryException>(
async () => await b.BootstrapAsync(new[] { e.AsPeer }, null, null));
await Assert.ThrowsAsync<PeerDiscoveryException>(
async () => await b.BootstrapAsync(new[] { f.AsPeer }, null, null));

Assert.Equal(new[] { c.AsPeer }, a.Peers.ToArray());
Assert.Equal(new[] { d.AsPeer }, b.Peers.ToArray());
Expand Down
Loading