Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 20 additions & 26 deletions src/llmq/quorums.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,6 @@ namespace llmq
static const std::string DB_QUORUM_SK_SHARE = "q_Qsk";
static const std::string DB_QUORUM_QUORUM_VVEC = "q_Qqvvec";

RecursiveMutex cs_data_requests;
static std::unordered_map<CQuorumDataRequestKey, CQuorumDataRequest, StaticSaltedHasher> mapQuorumDataRequests GUARDED_BY(cs_data_requests);


static uint256 MakeQuorumKey(const CQuorum& q)
{
CHashWriter hw(SER_NETWORK, 0);
Expand Down Expand Up @@ -707,18 +703,9 @@ size_t CQuorumManager::GetQuorumRecoveryStartOffset(const CQuorum& quorum, gsl::

MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& connman, std::string_view msg_type, CDataStream& vRecv)
{
auto strFunc = __func__;
auto errorHandler = [&](const std::string& strError, int nScore = 10) -> MessageProcessingResult {
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: %s, from peer=%d\n", strFunc, msg_type, strError, pfrom.GetId());
if (nScore > 0) {
return MisbehavingError{nScore};
}
return {};
};

if (msg_type == NetMsgType::QGETDATA) {
if (m_mn_activeman == nullptr || (pfrom.GetVerifiedProRegTxHash().IsNull() && !pfrom.qwatch)) {
return errorHandler("Not a verified masternode or a qwatch connection");
return MisbehavingError{10, "not a verified masternode or a qwatch connection"};
}

CQuorumDataRequest request;
Expand All @@ -735,7 +722,7 @@ MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& c
case (CQuorumDataRequest::Errors::QUORUM_NOT_FOUND):
case (CQuorumDataRequest::Errors::MASTERNODE_IS_NO_MEMBER):
case (CQuorumDataRequest::Errors::UNDEFINED):
if (request_limit_exceeded) ret = errorHandler("Request limit exceeded", 25);
if (request_limit_exceeded) ret = MisbehavingError{25, "request limit exceeded"};
break;
case (CQuorumDataRequest::Errors::QUORUM_VERIFICATION_VECTOR_MISSING):
case (CQuorumDataRequest::Errors::ENCRYPTED_CONTRIBUTIONS_MISSING):
Expand Down Expand Up @@ -812,7 +799,7 @@ MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& c

if (msg_type == NetMsgType::QDATA) {
if ((m_mn_activeman == nullptr && !m_quorums_watch) || pfrom.GetVerifiedProRegTxHash().IsNull()) {
return errorHandler("Not a verified masternode and -watchquorums is not enabled");
return MisbehavingError{10, "not a verified masternode and -watchquorums is not enabled"};
}

CQuorumDataRequest request;
Expand All @@ -823,25 +810,28 @@ MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& c
const CQuorumDataRequestKey key(pfrom.GetVerifiedProRegTxHash(), true, request.GetQuorumHash(), request.GetLLMQType());
auto it = mapQuorumDataRequests.find(key);
if (it == mapQuorumDataRequests.end()) {
return errorHandler("Not requested");
return MisbehavingError{10, "not requested"};
}
if (it->second.IsProcessed()) {
return errorHandler("Already received");
return MisbehavingError(10, "already received");
}
if (request != it->second) {
return errorHandler("Not like requested");
return MisbehavingError(10, "not like requested");
}
it->second.SetProcessed();
}

if (request.GetError() != CQuorumDataRequest::Errors::NONE) {
return errorHandler(strprintf("Error %d (%s)", request.GetError(), request.GetErrorString()), 0);
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: %s, from peer=%d\n", __func__, msg_type, strprintf("Error %d (%s)", request.GetError(), request.GetErrorString()), pfrom.GetId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: inline strprintf

NOT_TESTED but probably it should be:

LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: failed %d(%s), from peer=%d\n", __func__, msg_type,  request.GetError(), request.GetErrorString(), pfrom.GetId());

return {};
}

CQuorumPtr pQuorum;
{
if (LOCK(cs_map_quorums); !mapQuorumsCache[request.GetLLMQType()].get(request.GetQuorumHash(), pQuorum)) {
return errorHandler("Quorum not found", 0); // Don't bump score because we asked for it
// Don't bump score because we asked for it
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: Quorum not found, from peer=%d\n", __func__, msg_type, pfrom.GetId());
return {};
}
}

Expand All @@ -854,7 +844,7 @@ MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& c
if (pQuorum->SetVerificationVector(verificationVector)) {
StartCachePopulatorThread(pQuorum);
} else {
return errorHandler("Invalid quorum verification vector");
return MisbehavingError{10, "invalid quorum verification vector"};
}
}

Expand All @@ -863,12 +853,16 @@ MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& c
assert(m_mn_activeman);

if (WITH_LOCK(pQuorum->cs_vvec_shShare, return pQuorum->quorumVvec->size() != size_t(pQuorum->params.threshold))) {
return errorHandler("No valid quorum verification vector available", 0); // Don't bump score because we asked for it
// Don't bump score because we asked for it
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: No valid quorum verification vector available, from peer=%d\n", __func__, msg_type, pfrom.GetId());
return {};
}

int memberIdx = pQuorum->GetMemberIndex(request.GetProTxHash());
if (memberIdx == -1) {
return errorHandler("Not a member of the quorum", 0); // Don't bump score because we asked for it
// Don't bump score because we asked for it
LogPrint(BCLog::LLMQ, "CQuorumManager::%s -- %s: Not a member of the quorum, from peer=%d\n", __func__, msg_type, pfrom.GetId());
return {};
}

std::vector<CBLSIESEncryptedObject<CBLSSecretKey>> vecEncrypted;
Expand All @@ -878,13 +872,13 @@ MessageProcessingResult CQuorumManager::ProcessMessage(CNode& pfrom, CConnman& c
vecSecretKeys.resize(vecEncrypted.size());
for (const auto i : irange::range(vecEncrypted.size())) {
if (!m_mn_activeman->Decrypt(vecEncrypted[i], memberIdx, vecSecretKeys[i], PROTOCOL_VERSION)) {
return errorHandler("Failed to decrypt");
return MisbehavingError{10, "failed to decrypt"};
}
}

CBLSSecretKey secretKeyShare = blsWorker.AggregateSecretKeys(vecSecretKeys);
if (!pQuorum->SetSecretKeyShare(secretKeyShare, *m_mn_activeman)) {
return errorHandler("Invalid secret key share received");
return MisbehavingError{10, "invalid secret key share received"};
}
}
WITH_LOCK(cs_db, pQuorum->WriteContributions(*db));
Expand Down
14 changes: 9 additions & 5 deletions src/llmq/quorums.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,10 @@ class CQuorumManager
mutable Mutex cs_db;
std::unique_ptr<CDBWrapper> db GUARDED_BY(cs_db){nullptr};

mutable Mutex cs_data_requests;
mutable std::unordered_map<CQuorumDataRequestKey, CQuorumDataRequest, StaticSaltedHasher> mapQuorumDataRequests
GUARDED_BY(cs_data_requests);

mutable Mutex cs_map_quorums;
mutable std::map<Consensus::LLMQType, Uint256LruHashMap<CQuorumPtr>> mapQuorumsCache GUARDED_BY(cs_map_quorums);

Expand Down Expand Up @@ -294,19 +298,19 @@ class CQuorumManager
void Stop();

void TriggerQuorumDataRecoveryThreads(CConnman& connman, gsl::not_null<const CBlockIndex*> pIndex) const
EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_scan_quorums, !cs_map_quorums);
EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_data_requests, !cs_scan_quorums, !cs_map_quorums);

void UpdatedBlockTip(const CBlockIndex* pindexNew, CConnman& connman, bool fInitialDownload) const
EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_scan_quorums, !cs_map_quorums);
EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_data_requests, !cs_scan_quorums, !cs_map_quorums);

[[nodiscard]] MessageProcessingResult ProcessMessage(CNode& pfrom, CConnman& connman, std::string_view msg_type,
CDataStream& vRecv)
EXCLUSIVE_LOCKS_REQUIRED(!cs_map_quorums, !cs_db);
EXCLUSIVE_LOCKS_REQUIRED(!cs_db, !cs_data_requests, !cs_map_quorums);

static bool HasQuorum(Consensus::LLMQType llmqType, const CQuorumBlockProcessor& quorum_block_processor, const uint256& quorumHash);

bool RequestQuorumData(CNode* pfrom, CConnman& connman, const CQuorum& quorum, uint16_t nDataMask,
const uint256& proTxHash = uint256()) const;
const uint256& proTxHash = uint256()) const EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests);

// all these methods will lock cs_main for a short period of time
CQuorumCPtr GetQuorum(Consensus::LLMQType llmqType, const uint256& quorumHash) const
Expand Down Expand Up @@ -341,7 +345,7 @@ class CQuorumManager

void StartCachePopulatorThread(CQuorumCPtr pQuorum) const;
void StartQuorumDataRecoveryThread(CConnman& connman, CQuorumCPtr pQuorum, gsl::not_null<const CBlockIndex*> pIndex,
uint16_t nDataMask) const;
uint16_t nDataMask) const EXCLUSIVE_LOCKS_REQUIRED(!cs_data_requests);

void StartCleanupOldQuorumDataThread(gsl::not_null<const CBlockIndex*> pIndex) const;
void MigrateOldQuorumDB(CEvoDB& evoDb) const EXCLUSIVE_LOCKS_REQUIRED(!cs_db);
Expand Down
Loading