From 7bf7e33f1376059654610b15546af7c1e2242f3c Mon Sep 17 00:00:00 2001 From: dangershony Date: Fri, 24 Feb 2023 18:04:48 +0000 Subject: [PATCH 1/3] Add LruHashSet and use it in some behaviors, it will make sure the inv doesn't grow indefinitely --- src/Blockcore/Utilities/LruHashSet.cs | 84 +++++++++++++ .../BlockStoreBehavior.cs | 8 +- .../MempoolBehavior.cs | 59 +++++++--- .../Utilities/LruHashSetTest.cs | 110 ++++++++++++++++++ 4 files changed, 241 insertions(+), 20 deletions(-) create mode 100644 src/Blockcore/Utilities/LruHashSet.cs create mode 100644 src/Tests/Blockcore.Tests/Utilities/LruHashSetTest.cs diff --git a/src/Blockcore/Utilities/LruHashSet.cs b/src/Blockcore/Utilities/LruHashSet.cs new file mode 100644 index 000000000..7b8ff4dd3 --- /dev/null +++ b/src/Blockcore/Utilities/LruHashSet.cs @@ -0,0 +1,84 @@ +using System.Collections.Generic; + +namespace Blockcore.Utilities +{ + public class LruHashSet + { + private readonly LinkedList lru; + + private HashSet items; + + private long maxSize; + + private long itemCount; + + private readonly object lockObject = new object(); + + public LruHashSet(long maxSize = long.MaxValue) + { + this.lru = new LinkedList(); + this.items = new HashSet(); + + this.maxSize = maxSize; + this.itemCount = 0; + } + + public void AddOrUpdate(T item) + { + lock (this.lockObject) + { + // First check if we are performing the 'Update' case. No change to item count. + if (this.items.Contains(item)) + { + this.lru.Remove(item); + this.lru.AddLast(item); + + return; + } + + // Otherwise it's 'Add'. + // First perform the size test. + if ((this.itemCount + 1) > this.maxSize) + { + LinkedListNode tempItem = this.lru.First; + this.lru.RemoveFirst(); + this.items.Remove(tempItem.Value); + this.itemCount--; + } + + this.lru.AddLast(item); + this.items.Add(item); + this.itemCount++; + } + } + + public void Clear() + { + lock (this.lockObject) + { + this.lru.Clear(); + this.items.Clear(); + this.itemCount = 0; + } + } + + public bool Contains(T item) + { + lock (this.lockObject) + { + // Fastest to check the hashmap. + return this.items.Contains(item); + } + } + + public void Remove(T item) + { + lock (this.lockObject) + { + this.lru.Remove(item); + this.items.Remove(item); + this.itemCount--; + } + } + } +} \ No newline at end of file diff --git a/src/Features/Blockcore.Features.BlockStore/BlockStoreBehavior.cs b/src/Features/Blockcore.Features.BlockStore/BlockStoreBehavior.cs index c29b5c8f0..ae6026eec 100644 --- a/src/Features/Blockcore.Features.BlockStore/BlockStoreBehavior.cs +++ b/src/Features/Blockcore.Features.BlockStore/BlockStoreBehavior.cs @@ -283,16 +283,22 @@ private async Task ProcessGetDataAsync(INetworkPeer peer, GetDataPayload getData { ChainedHeaderBlock chainedHeaderBlock = this.consensusManager.GetBlockData(item.Hash); + if (!peer.IsConnected) + continue; + if (chainedHeaderBlock?.Block != null) { this.logger.LogDebug("Sending block '{0}' to peer '{1}'.", chainedHeaderBlock.ChainedHeader, peer.RemoteSocketEndpoint); - //TODO strip block of witness if node does not support await peer.SendMessageAsync(new BlockPayload(chainedHeaderBlock.Block.WithOptions(this.ChainIndexer.Network.Consensus.ConsensusFactory, peer.SupportedTransactionOptions))).ConfigureAwait(false); } else { this.logger.LogDebug("Block with hash '{0}' requested from peer '{1}' was not found in store.", item.Hash, peer.RemoteSocketEndpoint); + + // https://btcinformation.org/en/developer-reference#notfound + // https://github.com/bitcoin/bitcoin/pull/2192 + await peer.SendMessageAsync(new NotFoundPayload(InventoryType.MSG_BLOCK, item.Hash)).ConfigureAwait(false); } // If the peer is syncing using "getblocks" message we are supposed to send diff --git a/src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs b/src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs index a7c26b6c2..ead4b0488 100644 --- a/src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs +++ b/src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs @@ -38,6 +38,14 @@ public class MempoolBehavior : NetworkPeerBehavior /// private const int InventoryBroadcastMax = 7 * InventoryBroadcastInterval; + /// + /// Prevents a single peer from sending us huge quantities of fake `inv` messages with nonexistent hashes. + /// This value is somewhat arbitrarily chosen, it is far greater than the maximum number of transactions that can be stored in any single block. + /// The number of unmined transactions in the mempool should ideally hover around the number in a single block or below, otherwise the mempool will eventually + /// fill completely. However, on some networks it is not uncommon for there to be surges in transaction volume, so we leave some leeway. + /// + private const int MaximumInventoryToTrackCount = 50000; + /// Memory pool validator for validating transactions. private readonly IMempoolValidator validator; @@ -75,7 +83,7 @@ public class MempoolBehavior : NetworkPeerBehavior /// Filter for inventory known. /// State that is local to the behavior. /// - private readonly HashSet filterInventoryKnown; + private readonly LruHashSet filterInventoryKnown; /// /// Locking object for memory pool behaviour. @@ -120,7 +128,7 @@ public MempoolBehavior( this.lockObject = new object(); this.inventoryTxToSend = new HashSet(); - this.filterInventoryKnown = new HashSet(); + this.filterInventoryKnown = new LruHashSet(MaximumInventoryToTrackCount); this.isPeerWhitelistedForRelay = false; this.isBlocksOnlyMode = false; @@ -267,14 +275,14 @@ private async Task SendMempoolPayloadAsync(INetworkPeer peer, MempoolPayload mes } } - this.filterInventoryKnown.Add(hash); + this.filterInventoryKnown.AddOrUpdate(hash); transactionsToSend.Add(hash); this.logger.LogDebug("Added transaction ID '{0}' to inventory list.", hash); } } this.logger.LogDebug("Sending transaction inventory to peer '{0}'.", peer.RemoteSocketEndpoint); - await this.SendAsTxInventoryAsync(peer, transactionsToSend); + await this.SendAsTxInventoryAsync(peer, transactionsToSend).ConfigureAwait(false); this.LastMempoolReq = this.mempoolManager.DateTimeProvider.GetTime(); } @@ -303,7 +311,13 @@ private async Task ProcessInvAsync(INetworkPeer peer, InvPayload invPayload) { foreach (var inv in inventoryTxs) { - this.filterInventoryKnown.Add(inv.Hash); + // It is unlikely that the transaction will be in the to-send hashmap, but there is no harm attempting to remove it anyway. + this.inventoryTxToSend.Remove(inv.Hash); + + // At this point we have no idea whether the proffered hashes exist or are spurious. So we have to rely on the bounded LRU hashmap implementation + // to control the maximum number of extant hashes that we track for this peer. It isn't really worth trying to evict mined transactions from this + // hashmap. + this.filterInventoryKnown.AddOrUpdate(inv.Hash); } } @@ -344,18 +358,22 @@ private async Task ProcessGetDataAsync(INetworkPeer peer, GetDataPayload getData foreach (InventoryVector item in getDataPayload.Inventory.Where(inv => inv.Type.HasFlag(InventoryType.MSG_TX))) { - // TODO: check if we need to add support for "not found" - TxMempoolInfo trxInfo = await this.mempoolManager.InfoAsync(item.Hash).ConfigureAwait(false); + + if (!peer.IsConnected) + continue; - if (trxInfo != null) + if (trxInfo == null) { - if (peer.IsConnected) - { - this.logger.LogDebug("Sending transaction '{0}' to peer '{1}'.", item.Hash, peer.RemoteSocketEndpoint); - await peer.SendMessageAsync(new TxPayload(trxInfo.Trx.WithOptions(peer.SupportedTransactionOptions, this.network.Consensus.ConsensusFactory))).ConfigureAwait(false); - } + // https://btcinformation.org/en/developer-reference#notfound + // https://github.com/bitcoin/bitcoin/pull/2192 + await peer.SendMessageAsync(new NotFoundPayload(InventoryType.MSG_TX, item.Hash)).ConfigureAwait(false); + + continue; } + + this.logger.LogDebug("Sending transaction '{0}' to peer '{1}'.", item.Hash, peer.RemoteSocketEndpoint); + await peer.SendMessageAsync(new TxPayload(trxInfo.Trx.WithOptions(peer.SupportedTransactionOptions, this.network.Consensus.ConsensusFactory))).ConfigureAwait(false); } } @@ -381,14 +399,14 @@ private async Task ProcessTxPayloadAsync(INetworkPeer peer, TxPayload transactio // add to local filter lock (this.lockObject) { - this.filterInventoryKnown.Add(trxHash); + this.filterInventoryKnown.AddOrUpdate(trxHash); } this.logger.LogDebug("Added transaction ID '{0}' to known inventory filter.", trxHash); var state = new MempoolValidationState(true); - if (!await this.orphans.AlreadyHaveAsync(trxHash) && await this.validator.AcceptToMemoryPool(state, trx)) + if (!await this.orphans.AlreadyHaveAsync(trxHash).ConfigureAwait(false) && await this.validator.AcceptToMemoryPool(state, trx).ConfigureAwait(false)) { - await this.validator.SanityCheck(); + await this.validator.SanityCheck().ConfigureAwait(false); this.RelayTransaction(trxHash); this.signals.Publish(new TransactionReceived(trx)); @@ -398,7 +416,7 @@ private async Task ProcessTxPayloadAsync(INetworkPeer peer, TxPayload transactio this.logger.LogDebug("Transaction ID '{0}' accepted to memory pool from peer '{1}' (poolsz {2} txn, {3} kb).", trxHash, peer.RemoteSocketEndpoint, mmsize, memdyn / 1000); - await this.orphans.ProcessesOrphansAsync(this, trx); + await this.orphans.ProcessesOrphansAsync(this, trx).ConfigureAwait(false); } else if (state.MissingInputs) { @@ -470,7 +488,10 @@ private void AddTransactionToSend(uint256 hash) lock (this.lockObject) { if (!this.filterInventoryKnown.Contains(hash)) - { + { + // We do not need to bound this in the same way as the 'known' map, + // because transactions are only sent/relayed when they are considered valid, + // plus this map is constantly shrinking as txes are sent. this.inventoryTxToSend.Add(hash); } } @@ -569,7 +590,7 @@ public async Task SendTrickleAsync() foreach (uint256 hash in findInMempool) { // Not in the mempool anymore? don't bother sending it. - TxMempoolInfo txInfo = await this.mempoolManager.InfoAsync(hash); + TxMempoolInfo txInfo = await this.mempoolManager.InfoAsync(hash).ConfigureAwait(false); if (txInfo == null) { this.logger.LogDebug("Transaction ID '{0}' not added to inventory list, no longer in mempool.", hash); diff --git a/src/Tests/Blockcore.Tests/Utilities/LruHashSetTest.cs b/src/Tests/Blockcore.Tests/Utilities/LruHashSetTest.cs new file mode 100644 index 000000000..81afd546d --- /dev/null +++ b/src/Tests/Blockcore.Tests/Utilities/LruHashSetTest.cs @@ -0,0 +1,110 @@ +using Blockcore.Utilities; +using Xunit; + +namespace Blockcore.Tests.Utilities +{ + public class LruHashSetTests + { + [Fact] + public void AddOrUpdate_AddsItemToSet() + { + // Arrange + var set = new LruHashSet(maxSize: 2); + + // Act + set.AddOrUpdate(1); + + // Assert + Assert.True(set.Contains(1)); + } + + [Fact] + public void AddOrUpdate_UpdatesExistingItem() + { + // Arrange + var set = new LruHashSet(maxSize: 2); + set.AddOrUpdate(1); + + // Act + set.AddOrUpdate(1); + + // Assert + Assert.True(set.Contains(1)); + } + + [Fact] + public void AddOrUpdate_RemovesOldestItemWhenMaxSizeExceeded() + { + // Arrange + var set = new LruHashSet(maxSize: 2); + set.AddOrUpdate(1); + set.AddOrUpdate(2); + + // Act + set.AddOrUpdate(3); + + // Assert + Assert.False(set.Contains(1)); + Assert.True(set.Contains(2)); + Assert.True(set.Contains(3)); + } + + [Fact] + public void Clear_RemovesAllItemsFromSet() + { + // Arrange + var set = new LruHashSet(maxSize: 2); + set.AddOrUpdate(1); + set.AddOrUpdate(2); + + // Act + set.Clear(); + + // Assert + Assert.False(set.Contains(1)); + Assert.False(set.Contains(2)); + } + + [Fact] + public void Contains_ReturnsTrueIfItemInSet() + { + // Arrange + var set = new LruHashSet(maxSize: 2); + set.AddOrUpdate(1); + + // Act + var contains = set.Contains(1); + + // Assert + Assert.True(contains); + } + + [Fact] + public void Contains_ReturnsFalseIfItemNotInSet() + { + // Arrange + var set = new LruHashSet(maxSize: 2); + set.AddOrUpdate(1); + + // Act + var contains = set.Contains(2); + + // Assert + Assert.False(contains); + } + + [Fact] + public void Remove_RemovesItemFromSet() + { + // Arrange + var set = new LruHashSet(maxSize: 2); + set.AddOrUpdate(1); + + // Act + set.Remove(1); + + // Assert + Assert.False(set.Contains(1)); + } + } +} From 5ce1dd5d07f0a1096b0f83699054d8728dfbcd1e Mon Sep 17 00:00:00 2001 From: dangershony Date: Fri, 24 Feb 2023 18:19:41 +0000 Subject: [PATCH 2/3] Check that the peer is still connected as the first operation --- .../Blockcore.Features.BlockStore/BlockStoreBehavior.cs | 4 ++-- src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Features/Blockcore.Features.BlockStore/BlockStoreBehavior.cs b/src/Features/Blockcore.Features.BlockStore/BlockStoreBehavior.cs index ae6026eec..b6264e24b 100644 --- a/src/Features/Blockcore.Features.BlockStore/BlockStoreBehavior.cs +++ b/src/Features/Blockcore.Features.BlockStore/BlockStoreBehavior.cs @@ -281,11 +281,11 @@ private async Task ProcessGetDataAsync(INetworkPeer peer, GetDataPayload getData // TODO: bring logic from core foreach (InventoryVector item in getDataPayload.Inventory.Where(inv => inv.Type.HasFlag(InventoryType.MSG_BLOCK))) { - ChainedHeaderBlock chainedHeaderBlock = this.consensusManager.GetBlockData(item.Hash); - if (!peer.IsConnected) continue; + ChainedHeaderBlock chainedHeaderBlock = this.consensusManager.GetBlockData(item.Hash); + if (chainedHeaderBlock?.Block != null) { this.logger.LogDebug("Sending block '{0}' to peer '{1}'.", chainedHeaderBlock.ChainedHeader, peer.RemoteSocketEndpoint); diff --git a/src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs b/src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs index ead4b0488..5b89408bc 100644 --- a/src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs +++ b/src/Features/Blockcore.Features.MemoryPool/MempoolBehavior.cs @@ -358,11 +358,11 @@ private async Task ProcessGetDataAsync(INetworkPeer peer, GetDataPayload getData foreach (InventoryVector item in getDataPayload.Inventory.Where(inv => inv.Type.HasFlag(InventoryType.MSG_TX))) { - TxMempoolInfo trxInfo = await this.mempoolManager.InfoAsync(item.Hash).ConfigureAwait(false); - if (!peer.IsConnected) continue; + TxMempoolInfo trxInfo = await this.mempoolManager.InfoAsync(item.Hash).ConfigureAwait(false); + if (trxInfo == null) { // https://btcinformation.org/en/developer-reference#notfound From f916b23c5ea6fb4387864ca85d6fe86b3eb09de8 Mon Sep 17 00:00:00 2001 From: dangershony Date: Mon, 27 Feb 2023 09:25:11 +0000 Subject: [PATCH 3/3] Add missing attribute --- src/Blockcore/P2P/Protocol/Payloads/NotFoundPayload.cs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/Blockcore/P2P/Protocol/Payloads/NotFoundPayload.cs b/src/Blockcore/P2P/Protocol/Payloads/NotFoundPayload.cs index 82ab15eaf..09d613e5c 100644 --- a/src/Blockcore/P2P/Protocol/Payloads/NotFoundPayload.cs +++ b/src/Blockcore/P2P/Protocol/Payloads/NotFoundPayload.cs @@ -9,6 +9,7 @@ namespace Blockcore.P2P.Protocol.Payloads /// /// A getdata message for an asked hash is not found by the remote peer. /// + [Payload("notfound")] public class NotFoundPayload : Payload, IEnumerable { private List inventory = new List();