diff options
Diffstat (limited to 'src/net_processing.cpp')
| -rw-r--r-- | src/net_processing.cpp | 660 |
1 files changed, 455 insertions, 205 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index b48a3bd22..71ebd72b8 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -6,6 +6,7 @@ #include <net_processing.h> #include <addrman.h> +#include <banman.h> #include <arith_uint256.h> #include <blockencodings.h> #include <chainparams.h> @@ -25,9 +26,10 @@ #include <tinyformat.h> #include <txmempool.h> #include <ui_interface.h> -#include <util.h> -#include <utilmoneystr.h> -#include <utilstrencodings.h> +#include <util/system.h> +#include <util/moneystr.h> +#include <util/strencodings.h> +#include <util/validation.h> #include <memory> @@ -63,12 +65,28 @@ static constexpr int STALE_RELAY_AGE_LIMIT = 30 * 24 * 60 * 60; /// Age after which a block is considered historical for purposes of rate /// limiting block relay. Set to one week, denominated in seconds. static constexpr int HISTORICAL_BLOCK_AGE = 7 * 24 * 60 * 60; +/** Maximum number of in-flight transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; +/** Maximum number of announced transactions from a peer */ +static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; +/** How many microseconds to delay requesting transactions from inbound peers */ +static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000; +/** How long to wait (in microseconds) before downloading a transaction from an additional peer */ +static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000; +/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ +static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000; +static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, +"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); +/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ +static const unsigned int MAX_GETDATA_SZ = 1000; + struct COrphanTx { // When modifying, adapt the copy of this definition in tests/DoS_tests. CTransactionRef tx; NodeId fromPeer; int64_t nTimeExpire; + size_t list_pos; }; CCriticalSection g_cs_orphans; std::map<uint256, COrphanTx> mapOrphanTransactions GUARDED_BY(g_cs_orphans); @@ -158,8 +176,6 @@ namespace { /** Expiration-time ordered list of (expire time, relay map entry) pairs. */ std::deque<std::pair<int64_t, MapRelay::iterator>> vRelayExpiration GUARDED_BY(cs_main); - std::atomic<int64_t> nTimeBestReceived(0); // Used only to inform the wallet of when we last received a block - struct IteratorComparator { template<typename I> @@ -170,6 +186,8 @@ namespace { }; std::map<COutPoint, std::set<std::map<uint256, COrphanTx>::iterator, IteratorComparator>> mapOrphanTransactionsByPrev GUARDED_BY(g_cs_orphans); + std::vector<std::map<uint256, COrphanTx>::iterator> g_orphan_list GUARDED_BY(g_cs_orphans); //! For random eviction + static size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0; static std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact GUARDED_BY(g_cs_orphans); } // namespace @@ -273,7 +291,76 @@ struct CNodeState { //! Time of last new block announcement int64_t m_last_block_announcement; - CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) { + /* + * State associated with transaction download. + * + * Tx download algorithm: + * + * When inv comes in, queue up (process_time, txid) inside the peer's + * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer + * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). + * + * The process_time for a transaction is set to nNow for outbound peers, + * nNow + 2 seconds for inbound peers. This is the time at which we'll + * consider trying to request the transaction from the peer in + * SendMessages(). The delay for inbound peers is to allow outbound peers + * a chance to announce before we request from inbound peers, to prevent + * an adversary from using inbound connections to blind us to a + * transaction (InvBlock). + * + * When we call SendMessages() for a given peer, + * we will loop over the transactions in m_tx_process_time, looking + * at the transactions whose process_time <= nNow. We'll request each + * such transaction that we don't have already and that hasn't been + * requested from another peer recently, up until we hit the + * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update + * g_already_asked_for for each requested txid, storing the time of the + * GETDATA request. We use g_already_asked_for to coordinate transaction + * requests amongst our peers. + * + * For transactions that we still need but we have already recently + * requested from some other peer, we'll reinsert (process_time, txid) + * back into the peer's m_tx_process_time at the point in the future at + * which the most recent GETDATA request would time out (ie + * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). + * We add an additional delay for inbound peers, again to prefer + * attempting download from outbound peers first. + * We also add an extra small random delay up to 2 seconds + * to avoid biasing some peers over others. (e.g., due to fixed ordering + * of peer processing in ThreadMessageHandler). + * + * When we receive a transaction from a peer, we remove the txid from the + * peer's m_tx_in_flight set and from their recently announced set + * (m_tx_announced). We also clear g_already_asked_for for that entry, so + * that if somehow the transaction is not accepted but also not added to + * the reject filter, then we will eventually redownload from other + * peers. + */ + struct TxDownloadState { + /* Track when to attempt download of announced transactions (process + * time in micros -> txid) + */ + std::multimap<int64_t, uint256> m_tx_process_time; + + //! Store all the transactions a peer has recently announced + std::set<uint256> m_tx_announced; + + //! Store transactions which were requested by us + std::set<uint256> m_tx_in_flight; + }; + + TxDownloadState m_tx_download; + + //! Whether this peer is an inbound connection + bool m_is_inbound; + + //! Whether this peer is a manual connection + bool m_is_manual_connection; + + CNodeState(CAddress addrIn, std::string addrNameIn, bool is_inbound, bool is_manual) : + address(addrIn), name(std::move(addrNameIn)), m_is_inbound(is_inbound), + m_is_manual_connection (is_manual) + { fCurrentlyConnected = false; nMisbehavior = 0; fShouldBan = false; @@ -300,6 +387,9 @@ struct CNodeState { } }; +// Keeps track of the time (in microseconds) when transactions were requested last time +limitedmap<uint256, int64_t> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); + /** Map maintaining per-node state. */ static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main); @@ -566,7 +656,7 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec return; } if (pindex->nStatus & BLOCK_HAVE_DATA || chainActive.Contains(pindex)) { - if (pindex->nChainTx) + if (pindex->HaveTxsDownloaded()) state->pindexLastCommonBlock = pindex; } else if (mapBlocksInFlight.count(pindex->GetBlockHash()) == 0) { // The block is not already downloaded, and not yet in flight. @@ -590,6 +680,58 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec } } +void EraseTxRequest(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + g_already_asked_for.erase(txid); +} + +int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + auto it = g_already_asked_for.find(txid); + if (it != g_already_asked_for.end()) { + return it->second; + } + return 0; +} + +void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + auto it = g_already_asked_for.find(txid); + if (it == g_already_asked_for.end()) { + g_already_asked_for.insert(std::make_pair(txid, request_time)); + } else { + g_already_asked_for.update(it, request_time); + } +} + + +void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; + if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) { + // Too many queued announcements from this peer, or we already have + // this announcement + return; + } + peer_download_state.m_tx_announced.insert(txid); + + int64_t process_time; + int64_t last_request_time = GetTxRequestTime(txid); + // First time requesting this tx + if (last_request_time == 0) { + process_time = nNow; + } else { + // Randomize the delay to avoid biasing some peers over others (such as due to + // fixed ordering of peer processing in ThreadMessageHandler) + process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY); + } + + // We delay processing announcements from non-preferred (eg inbound) peers + if (!state->fPreferredDownload) process_time += INBOUND_PEER_TX_DELAY; + + peer_download_state.m_tx_process_time.emplace(process_time, txid); +} + } // namespace // This function is used for testing the stale tip eviction logic, see @@ -614,7 +756,7 @@ void PeerLogicValidation::InitializeNode(CNode *pnode) { NodeId nodeid = pnode->GetId(); { LOCK(cs_main); - mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName))); + mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName), pnode->fInbound, pnode->m_manual_connection)); } if(!pnode->fInbound) PushNodeVersion(pnode, connman, GetTime()); @@ -706,8 +848,9 @@ bool AddOrphanTx(const CTransactionRef& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRE return false; } - auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME}); + auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME, g_orphan_list.size()}); assert(ret.second); + g_orphan_list.push_back(ret.first); for (const CTxIn& txin : tx->vin) { mapOrphanTransactionsByPrev[txin.prevout].insert(ret.first); } @@ -733,6 +876,18 @@ int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans) if (itPrev->second.empty()) mapOrphanTransactionsByPrev.erase(itPrev); } + + size_t old_pos = it->second.list_pos; + assert(g_orphan_list[old_pos] == it); + if (old_pos + 1 != g_orphan_list.size()) { + // Unless we're deleting the last entry in g_orphan_list, move the last + // entry to the position we're deleting. + auto it_last = g_orphan_list.back(); + g_orphan_list[old_pos] = it_last; + it_last->second.list_pos = old_pos; + } + g_orphan_list.pop_back(); + mapOrphanTransactions.erase(it); return 1; } @@ -779,14 +934,12 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans) nNextSweep = nMinExpTime + ORPHAN_TX_EXPIRE_INTERVAL; if (nErased > 0) LogPrint(BCLog::MEMPOOL, "Erased %d orphan tx due to expiration\n", nErased); } + FastRandomContext rng; while (mapOrphanTransactions.size() > nMaxOrphans) { // Evict a random orphan: - uint256 randomhash = GetRandHash(); - std::map<uint256, COrphanTx>::iterator it = mapOrphanTransactions.lower_bound(randomhash); - if (it == mapOrphanTransactions.end()) - it = mapOrphanTransactions.begin(); - EraseOrphanTx(it->first); + size_t randompos = rng.randrange(g_orphan_list.size()); + EraseOrphanTx(g_orphan_list[randompos]->first); ++nEvicted; } return nEvicted; @@ -815,6 +968,90 @@ void Misbehaving(NodeId pnode, int howmuch, const std::string& message) EXCLUSIV LogPrint(BCLog::NET, "%s: %s peer=%d (%d -> %d)%s\n", __func__, state->name, pnode, state->nMisbehavior-howmuch, state->nMisbehavior, message_prefixed); } +/** + * Returns true if the given validation state result may result in a peer + * banning/disconnecting us. We use this to determine which unaccepted + * transactions from a whitelisted peer that we can safely relay. + */ +static bool TxRelayMayResultInDisconnect(const CValidationState& state) +{ + assert(IsTransactionReason(state.GetReason())); + return state.GetReason() == ValidationInvalidReason::CONSENSUS; +} + +/** + * Potentially ban a node based on the contents of a CValidationState object + * + * @param[in] via_compact_block: this bool is passed in because net_processing should + * punish peers differently depending on whether the data was provided in a compact + * block message or not. If the compact block had a valid header, but contained invalid + * txs, the peer should not be punished. See BIP 152. + * + * @return Returns true if the peer was punished (probably disconnected) + * + * Changes here may need to be reflected in TxRelayMayResultInDisconnect(). + */ +static bool MaybePunishNode(NodeId nodeid, const CValidationState& state, bool via_compact_block, const std::string& message = "") { + switch (state.GetReason()) { + case ValidationInvalidReason::NONE: + break; + // The node is providing invalid data: + case ValidationInvalidReason::CONSENSUS: + case ValidationInvalidReason::BLOCK_MUTATED: + if (!via_compact_block) { + LOCK(cs_main); + Misbehaving(nodeid, 100, message); + return true; + } + break; + case ValidationInvalidReason::CACHED_INVALID: + { + LOCK(cs_main); + CNodeState *node_state = State(nodeid); + if (node_state == nullptr) { + break; + } + + // Ban outbound (but not inbound) peers if on an invalid chain. + // Exempt HB compact block peers and manual connections. + if (!via_compact_block && !node_state->m_is_inbound && !node_state->m_is_manual_connection) { + Misbehaving(nodeid, 100, message); + return true; + } + break; + } + case ValidationInvalidReason::BLOCK_INVALID_HEADER: + case ValidationInvalidReason::BLOCK_CHECKPOINT: + case ValidationInvalidReason::BLOCK_INVALID_PREV: + { + LOCK(cs_main); + Misbehaving(nodeid, 100, message); + } + return true; + // Conflicting (but not necessarily invalid) data or different policy: + case ValidationInvalidReason::BLOCK_MISSING_PREV: + { + // TODO: Handle this much more gracefully (10 DoS points is super arbitrary) + LOCK(cs_main); + Misbehaving(nodeid, 10, message); + } + return true; + case ValidationInvalidReason::RECENT_CONSENSUS_CHANGE: + case ValidationInvalidReason::BLOCK_TIME_FUTURE: + case ValidationInvalidReason::TX_NOT_STANDARD: + case ValidationInvalidReason::TX_MISSING_INPUTS: + case ValidationInvalidReason::TX_PREMATURE_SPEND: + case ValidationInvalidReason::TX_WITNESS_MUTATED: + case ValidationInvalidReason::TX_CONFLICT: + case ValidationInvalidReason::TX_MEMPOOL_POLICY: + break; + } + if (message != "") { + LogPrint(BCLog::NET, "peer=%d: %s\n", nodeid, message); + } + return false; +} + @@ -840,9 +1077,8 @@ static bool BlockRequestAllowed(const CBlockIndex* pindex, const Consensus::Para (GetBlockProofEquivalentTime(*pindexBestHeader, *pindex, *pindexBestHeader, consensusParams) < STALE_RELAY_AGE_LIMIT); } -PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &scheduler, bool enable_bip61) - : connman(connmanIn), m_stale_tip_check_time(0), m_enable_bip61(enable_bip61) { - +PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CScheduler &scheduler, bool enable_bip61) + : connman(connmanIn), m_banman(banman), m_stale_tip_check_time(0), m_enable_bip61(enable_bip61) { // Initialize global variables that cannot be constructed at startup. recentRejects.reset(new CRollingBloomFilter(120000, 0.000001)); @@ -977,8 +1213,6 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB }); connman->WakeMessageHandler(); } - - nTimeBestReceived = GetTime(); } /** @@ -991,14 +1225,12 @@ void PeerLogicValidation::BlockChecked(const CBlock& block, const CValidationSta const uint256 hash(block.GetHash()); std::map<uint256, std::pair<NodeId, bool>>::iterator it = mapBlockSource.find(hash); - int nDoS = 0; - if (state.IsInvalid(nDoS)) { + if (state.IsInvalid()) { // Don't send reject message with code 0 or an internal reject code. if (it != mapBlockSource.end() && State(it->second.first) && state.GetRejectCode() > 0 && state.GetRejectCode() < REJECT_INTERNAL) { CBlockReject reject = {(unsigned char)state.GetRejectCode(), state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), hash}; State(it->second.first)->rejects.push_back(reject); - if (nDoS > 0 && it->second.second) - Misbehaving(it->second.first, nDoS); + MaybePunishNode(/*nodeid=*/ it->second.first, state, /*via_compact_block=*/ !it->second.second); } } // Check that: @@ -1124,7 +1356,7 @@ void static ProcessGetBlockData(CNode* pfrom, const CChainParams& chainparams, c LOCK(cs_main); const CBlockIndex* pindex = LookupBlockIndex(inv.hash); if (pindex) { - if (pindex->nChainTx && !pindex->IsValid(BLOCK_VALID_SCRIPTS) && + if (pindex->HaveTxsDownloaded() && !pindex->IsValid(BLOCK_VALID_SCRIPTS) && pindex->IsValid(BLOCK_VALID_TREE)) { // If we have the block and all of its parents, but have not yet validated it, // we might be in the middle of connecting it (ie in the unlock of cs_main @@ -1348,7 +1580,7 @@ inline void static SendBlockTransactions(const CBlock& block, const BlockTransac connman->PushMessage(pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp)); } -bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::vector<CBlockHeader>& headers, const CChainParams& chainparams, bool punish_duplicate_invalid) +bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::vector<CBlockHeader>& headers, const CChainParams& chainparams, bool via_compact_block) { const CNetMsgMaker msgMaker(pfrom->GetSendVersion()); size_t nCount = headers.size(); @@ -1410,48 +1642,8 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve CValidationState state; CBlockHeader first_invalid_header; if (!ProcessNewBlockHeaders(headers, state, chainparams, &pindexLast, &first_invalid_header)) { - int nDoS; - if (state.IsInvalid(nDoS)) { - LOCK(cs_main); - if (nDoS > 0) { - Misbehaving(pfrom->GetId(), nDoS, "invalid header received"); - } else { - LogPrint(BCLog::NET, "peer=%d: invalid header received\n", pfrom->GetId()); - } - if (punish_duplicate_invalid && LookupBlockIndex(first_invalid_header.GetHash())) { - // Goal: don't allow outbound peers to use up our outbound - // connection slots if they are on incompatible chains. - // - // We ask the caller to set punish_invalid appropriately based - // on the peer and the method of header delivery (compact - // blocks are allowed to be invalid in some circumstances, - // under BIP 152). - // Here, we try to detect the narrow situation that we have a - // valid block header (ie it was valid at the time the header - // was received, and hence stored in mapBlockIndex) but know the - // block is invalid, and that a peer has announced that same - // block as being on its active chain. - // Disconnect the peer in such a situation. - // - // Note: if the header that is invalid was not accepted to our - // mapBlockIndex at all, that may also be grounds for - // disconnecting the peer, as the chain they are on is likely - // to be incompatible. However, there is a circumstance where - // that does not hold: if the header's timestamp is more than - // 2 hours ahead of our current time. In that case, the header - // may become valid in the future, and we don't want to - // disconnect a peer merely for serving us one too-far-ahead - // block header, to prevent an attacker from splitting the - // network by mining a block right at the 2 hour boundary. - // - // TODO: update the DoS logic (or, rather, rewrite the - // DoS-interface between validation and net_processing) so that - // the interface is cleaner, and so that we disconnect on all the - // reasons that a peer's headers chain is incompatible - // with ours (eg block->nVersion softforks, MTP violations, - // etc), and not just the duplicate-invalid case. - pfrom->fDisconnect = true; - } + if (state.IsInvalid()) { + MaybePunishNode(pfrom->GetId(), state, via_compact_block, "invalid header received"); return false; } } @@ -1569,6 +1761,68 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve return true; } +void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set, std::list<CTransactionRef>& removed_txn) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans) +{ + AssertLockHeld(cs_main); + AssertLockHeld(g_cs_orphans); + std::set<NodeId> setMisbehaving; + bool done = false; + while (!done && !orphan_work_set.empty()) { + const uint256 orphanHash = *orphan_work_set.begin(); + orphan_work_set.erase(orphan_work_set.begin()); + + auto orphan_it = mapOrphanTransactions.find(orphanHash); + if (orphan_it == mapOrphanTransactions.end()) continue; + + const CTransactionRef porphanTx = orphan_it->second.tx; + const CTransaction& orphanTx = *porphanTx; + NodeId fromPeer = orphan_it->second.fromPeer; + bool fMissingInputs2 = false; + // Use a new CValidationState because orphans come from different peers (and we call + // MaybePunishNode based on the source peer from the orphan map, not based on the peer + // that relayed the previous transaction). + CValidationState orphan_state; + + if (setMisbehaving.count(fromPeer)) continue; + if (AcceptToMemoryPool(mempool, orphan_state, porphanTx, &fMissingInputs2, &removed_txn, false /* bypass_limits */, 0 /* nAbsurdFee */)) { + LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); + RelayTransaction(orphanTx, connman); + for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { + auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i)); + if (it_by_prev != mapOrphanTransactionsByPrev.end()) { + for (const auto& elem : it_by_prev->second) { + orphan_work_set.insert(elem->first); + } + } + } + EraseOrphanTx(orphanHash); + done = true; + } else if (!fMissingInputs2) { + if (orphan_state.IsInvalid()) { + // Punish peer that gave us an invalid orphan tx + if (MaybePunishNode(fromPeer, orphan_state, /*via_compact_block*/ false)) { + setMisbehaving.insert(fromPeer); + } + LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); + } + // Has inputs but not accepted to mempool + // Probably non-standard or insufficient fee + LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); + assert(IsTransactionReason(orphan_state.GetReason())); + if (!orphanTx.HasWitness() && orphan_state.GetReason() != ValidationInvalidReason::TX_WITNESS_MUTATED) { + // Do not use rejection cache for witness transactions or + // witness-stripped transactions, as they can have been malleated. + // See https://github.com/bitcoin/bitcoin/issues/8279 for details. + assert(recentRejects); + recentRejects->insert(orphanHash); + } + EraseOrphanTx(orphanHash); + done = true; + } + mempool.check(pcoinsTip.get()); + } +} + bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc, bool enable_bip61) { LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId()); @@ -1638,7 +1892,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr ServiceFlags nServices; int nVersion; int nSendVersion; - std::string strSubVer; std::string cleanSubVer; int nStartingHeight = -1; bool fRelay = true; @@ -1675,6 +1928,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (!vRecv.empty()) vRecv >> addrFrom >> nNonce; if (!vRecv.empty()) { + std::string strSubVer; vRecv >> LIMITED_STRING(strSubVer, MAX_SUBVERSION_LENGTH); cleanSubVer = SanitizeString(strSubVer); } @@ -1706,7 +1960,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr pfrom->SetAddrLocal(addrMe); { LOCK(pfrom->cs_SubVer); - pfrom->strSubVer = strSubVer; pfrom->cleanSubVer = cleanSubVer; } pfrom->nStartingHeight = nStartingHeight; @@ -1878,6 +2131,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (addr.nTime <= 100000000 || addr.nTime > nNow + 10 * 60) addr.nTime = nNow - 5 * 24 * 60 * 60; pfrom->AddAddressKnown(addr); + if (g_banman->IsBanned(addr)) continue; // Do not process banned addresses beyond remembering we received them bool fReachable = IsReachable(addr); if (addr.nTime > nSince && !pfrom->fGetAddr && vAddr.size() <= 10 && addr.IsRoutable()) { @@ -1944,6 +2198,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr LOCK(cs_main); uint32_t nFetchFlags = GetFetchFlags(pfrom); + int64_t nNow = GetTimeMicros(); for (CInv &inv : vInv) { @@ -1975,7 +2230,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr if (fBlocksOnly) { LogPrint(BCLog::NET, "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->GetId()); } else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload()) { - pfrom->AskFor(inv); + RequestTx(State(pfrom->GetId()), inv.hash, nNow); } } } @@ -2196,8 +2451,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } - std::deque<COutPoint> vWorkQueue; - std::vector<uint256> vEraseQueue; CTransactionRef ptx; vRecv >> ptx; const CTransaction& tx = *ptx; @@ -2210,8 +2463,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr bool fMissingInputs = false; CValidationState state; - pfrom->setAskFor.erase(inv.hash); - mapAlreadyAskedFor.erase(inv.hash); + CNodeState* nodestate = State(pfrom->GetId()); + nodestate->m_tx_download.m_tx_announced.erase(inv.hash); + nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash); + EraseTxRequest(inv.hash); std::list<CTransactionRef> lRemovedTxn; @@ -2220,7 +2475,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr mempool.check(pcoinsTip.get()); RelayTransaction(tx, connman); for (unsigned int i = 0; i < tx.vout.size(); i++) { - vWorkQueue.emplace_back(inv.hash, i); + auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i)); + if (it_by_prev != mapOrphanTransactionsByPrev.end()) { + for (const auto& elem : it_by_prev->second) { + pfrom->orphan_work_set.insert(elem->first); + } + } } pfrom->nLastTXTime = GetTime(); @@ -2231,65 +2491,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr mempool.size(), mempool.DynamicMemoryUsage() / 1000); // Recursively process any orphan transactions that depended on this one - std::set<NodeId> setMisbehaving; - while (!vWorkQueue.empty()) { - auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front()); - vWorkQueue.pop_front(); - if (itByPrev == mapOrphanTransactionsByPrev.end()) - continue; - for (auto mi = itByPrev->second.begin(); - mi != itByPrev->second.end(); - ++mi) - { - const CTransactionRef& porphanTx = (*mi)->second.tx; - const CTransaction& orphanTx = *porphanTx; - const uint256& orphanHash = orphanTx.GetHash(); - NodeId fromPeer = (*mi)->second.fromPeer; - bool fMissingInputs2 = false; - // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan - // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get - // anyone relaying LegitTxX banned) - CValidationState stateDummy; - - - if (setMisbehaving.count(fromPeer)) - continue; - if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) { - LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); - RelayTransaction(orphanTx, connman); - for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { - vWorkQueue.emplace_back(orphanHash, i); - } - vEraseQueue.push_back(orphanHash); - } - else if (!fMissingInputs2) - { - int nDos = 0; - if (stateDummy.IsInvalid(nDos) && nDos > 0) - { - // Punish peer that gave us an invalid orphan tx - Misbehaving(fromPeer, nDos); - setMisbehaving.insert(fromPeer); - LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString()); - } - // Has inputs but not accepted to mempool - // Probably non-standard or insufficient fee - LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); - vEraseQueue.push_back(orphanHash); - if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) { - // Do not use rejection cache for witness transactions or - // witness-stripped transactions, as they can have been malleated. - // See https://github.com/bitcoin/bitcoin/issues/8279 for details. - assert(recentRejects); - recentRejects->insert(orphanHash); - } - } - mempool.check(pcoinsTip.get()); - } - } - - for (const uint256& hash : vEraseQueue) - EraseOrphanTx(hash); + ProcessOrphanTx(connman, pfrom->orphan_work_set, lRemovedTxn); } else if (fMissingInputs) { @@ -2302,10 +2504,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr } if (!fRejectedParents) { uint32_t nFetchFlags = GetFetchFlags(pfrom); + int64_t nNow = GetTimeMicros(); + for (const CTxIn& txin : tx.vin) { CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash); pfrom->AddInventoryKnown(_inv); - if (!AlreadyHave(_inv)) pfrom->AskFor(_inv); + if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow); } AddOrphanTx(ptx, pfrom->GetId()); @@ -2322,7 +2526,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr recentRejects->insert(tx.GetHash()); } } else { - if (!tx.HasWitness() && !state.CorruptionPossible()) { + assert(IsTransactionReason(state.GetReason())); + if (!tx.HasWitness() && state.GetReason() != ValidationInvalidReason::TX_WITNESS_MUTATED) { // Do not use rejection cache for witness transactions or // witness-stripped transactions, as they can have been malleated. // See https://github.com/bitcoin/bitcoin/issues/8279 for details. @@ -2341,15 +2546,13 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // to policy, allowing the node to function as a gateway for // nodes hidden behind it. // - // Never relay transactions that we would assign a non-zero DoS - // score for, as we expect peers to do the same with us in that - // case. - int nDoS = 0; - if (!state.IsInvalid(nDoS) || nDoS == 0) { + // Never relay transactions that might result in being + // disconnected (or banned). + if (state.IsInvalid() && TxRelayMayResultInDisconnect(state)) { + LogPrintf("Not relaying invalid transaction %s from whitelisted peer=%d (%s)\n", tx.GetHash().ToString(), pfrom->GetId(), FormatStateMessage(state)); + } else { LogPrintf("Force relaying tx %s from whitelisted peer=%d\n", tx.GetHash().ToString(), pfrom->GetId()); RelayTransaction(tx, connman); - } else { - LogPrintf("Not relaying invalid transaction %s from whitelisted peer=%d (%s)\n", tx.GetHash().ToString(), pfrom->GetId(), FormatStateMessage(state)); } } } @@ -2357,8 +2560,24 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr for (const CTransactionRef& removedTx : lRemovedTxn) AddToCompactExtraTransactions(removedTx); - int nDoS = 0; - if (state.IsInvalid(nDoS)) + // If a tx has been detected by recentRejects, we will have reached + // this point and the tx will have been ignored. Because we haven't run + // the tx through AcceptToMemoryPool, we won't have computed a DoS + // score for it or determined exactly why we consider it invalid. + // + // This means we won't penalize any peer subsequently relaying a DoSy + // tx (even if we penalized the first peer who gave it to us) because + // we have to account for recentRejects showing false positives. In + // other words, we shouldn't penalize a peer if we aren't *sure* they + // submitted a DoSy tx. + // + // Note that recentRejects doesn't just record DoSy or invalid + // transactions, but any tx not accepted by the mempool, which may be + // due to node policy (vs. consensus). So we can't blanket penalize a + // peer simply for relaying a tx that our recentRejects has caught, + // regardless of false positives. + + if (state.IsInvalid()) { LogPrint(BCLog::MEMPOOLREJ, "%s from peer=%d was not accepted: %s\n", tx.GetHash().ToString(), pfrom->GetId(), @@ -2367,15 +2586,19 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr connman->PushMessage(pfrom, msgMaker.Make(NetMsgType::REJECT, strCommand, (unsigned char)state.GetRejectCode(), state.GetRejectReason().substr(0, MAX_REJECT_MESSAGE_LENGTH), inv.hash)); } - if (nDoS > 0) { - Misbehaving(pfrom->GetId(), nDoS); - } + MaybePunishNode(pfrom->GetId(), state, /*via_compact_block*/ false); } return true; } - if (strCommand == NetMsgType::CMPCTBLOCK && !fImporting && !fReindex) // Ignore blocks received while importing + if (strCommand == NetMsgType::CMPCTBLOCK) { + // Ignore cmpctblock received while importing + if (fImporting || fReindex) { + LogPrint(BCLog::NET, "Unexpected cmpctblock message received from peer %d\n", pfrom->GetId()); + return true; + } + CBlockHeaderAndShortTxIDs cmpctblock; vRecv >> cmpctblock; @@ -2399,14 +2622,8 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr const CBlockIndex *pindex = nullptr; CValidationState state; if (!ProcessNewBlockHeaders({cmpctblock.header}, state, chainparams, &pindex)) { - int nDoS; - if (state.IsInvalid(nDoS)) { - if (nDoS > 0) { - LOCK(cs_main); - Misbehaving(pfrom->GetId(), nDoS, strprintf("Peer %d sent us invalid header via cmpctblock\n", pfrom->GetId())); - } else { - LogPrint(BCLog::NET, "Peer %d sent us invalid header via cmpctblock\n", pfrom->GetId()); - } + if (state.IsInvalid()) { + MaybePunishNode(pfrom->GetId(), state, /*via_compact_block*/ true, "invalid header via cmpctblock"); return true; } } @@ -2556,7 +2773,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr // the peer if the header turns out to be for an invalid block. // Note that if a peer tries to build on an invalid chain, that // will be detected and the peer will be banned. - return ProcessHeadersMessage(pfrom, connman, {cmpctblock.header}, chainparams, /*punish_duplicate_invalid=*/false); + return ProcessHeadersMessage(pfrom, connman, {cmpctblock.header}, chainparams, /*via_compact_block=*/true); } if (fBlockReconstructed) { @@ -2595,8 +2812,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } - if (strCommand == NetMsgType::BLOCKTXN && !fImporting && !fReindex) // Ignore blocks received while importing + if (strCommand == NetMsgType::BLOCKTXN) { + // Ignore blocktxn received while importing + if (fImporting || fReindex) { + LogPrint(BCLog::NET, "Unexpected blocktxn message received from peer %d\n", pfrom->GetId()); + return true; + } + BlockTransactions resp; vRecv >> resp; @@ -2670,8 +2893,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } - if (strCommand == NetMsgType::HEADERS && !fImporting && !fReindex) // Ignore headers received while importing + if (strCommand == NetMsgType::HEADERS) { + // Ignore headers received while importing + if (fImporting || fReindex) { + LogPrint(BCLog::NET, "Unexpected headers message received from peer %d\n", pfrom->GetId()); + return true; + } + std::vector<CBlockHeader> headers; // Bypass the normal CBlock deserialization, as we don't want to risk deserializing 2000 full blocks. @@ -2687,16 +2916,17 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr ReadCompactSize(vRecv); // ignore tx count; assume it is 0. } - // Headers received via a HEADERS message should be valid, and reflect - // the chain the peer is on. If we receive a known-invalid header, - // disconnect the peer if it is using one of our outbound connection - // slots. - bool should_punish = !pfrom->fInbound && !pfrom->m_manual_connection; - return ProcessHeadersMessage(pfrom, connman, headers, chainparams, should_punish); + return ProcessHeadersMessage(pfrom, connman, headers, chainparams, /*via_compact_block=*/false); } - if (strCommand == NetMsgType::BLOCK && !fImporting && !fReindex) // Ignore blocks received while importing + if (strCommand == NetMsgType::BLOCK) { + // Ignore block received while importing + if (fImporting || fReindex) { + LogPrint(BCLog::NET, "Unexpected block message received from peer %d\n", pfrom->GetId()); + return true; + } + std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>(); vRecv >> *pblock; @@ -2746,8 +2976,11 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr pfrom->vAddrToSend.clear(); std::vector<CAddress> vAddr = connman->GetAddresses(); FastRandomContext insecure_rand; - for (const CAddress &addr : vAddr) - pfrom->PushAddress(addr, insecure_rand); + for (const CAddress &addr : vAddr) { + if (!g_banman->IsBanned(addr)) { + pfrom->PushAddress(addr, insecure_rand); + } + } return true; } @@ -2925,7 +3158,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr return true; } -static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman, bool enable_bip61) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +bool PeerLogicValidation::SendRejectsAndCheckIfBanned(CNode* pnode, bool enable_bip61) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { AssertLockHeld(cs_main); CNodeState &state = *State(pnode->GetId()); @@ -2943,14 +3176,16 @@ static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman, bool en LogPrintf("Warning: not punishing whitelisted peer %s!\n", pnode->addr.ToString()); else if (pnode->m_manual_connection) LogPrintf("Warning: not punishing manually-connected peer %s!\n", pnode->addr.ToString()); - else { + else if (pnode->addr.IsLocal()) { + // Disconnect but don't ban _this_ local node + LogPrintf("Warning: disconnecting but not banning local peer %s!\n", pnode->addr.ToString()); pnode->fDisconnect = true; - if (pnode->addr.IsLocal()) - LogPrintf("Warning: not banning local peer %s!\n", pnode->addr.ToString()); - else - { - connman->Ban(pnode->addr, BanReasonNodeMisbehaving); + } else { + // Disconnect and ban all nodes sharing the address + if (m_banman) { + m_banman->Ban(pnode->addr, BanReasonNodeMisbehaving); } + connman->DisconnectNode(pnode->addr); } return true; } @@ -2973,11 +3208,21 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter if (!pfrom->vRecvGetData.empty()) ProcessGetData(pfrom, chainparams, connman, interruptMsgProc); + if (!pfrom->orphan_work_set.empty()) { + std::list<CTransactionRef> removed_txn; + LOCK2(cs_main, g_cs_orphans); + ProcessOrphanTx(connman, pfrom->orphan_work_set, removed_txn); + for (const CTransactionRef& removedTx : removed_txn) { + AddToCompactExtraTransactions(removedTx); + } + } + if (pfrom->fDisconnect) return false; // this maintains the order of responses if (!pfrom->vRecvGetData.empty()) return true; + if (!pfrom->orphan_work_set.empty()) return true; // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend) @@ -3074,7 +3319,7 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter } LOCK(cs_main); - SendRejectsAndCheckIfBanned(pfrom, connman, m_enable_bip61); + SendRejectsAndCheckIfBanned(pfrom, m_enable_bip61); return fMoreWork; } @@ -3144,8 +3389,6 @@ void PeerLogicValidation::EvictExtraOutboundPeers(int64_t time_in_seconds) NodeId worst_peer = -1; int64_t oldest_block_announcement = std::numeric_limits<int64_t>::max(); - LOCK(cs_main); - connman->ForEachNode([&](CNode* pnode) { AssertLockHeld(cs_main); @@ -3193,6 +3436,8 @@ void PeerLogicValidation::EvictExtraOutboundPeers(int64_t time_in_seconds) void PeerLogicValidation::CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams) { + LOCK(cs_main); + if (connman == nullptr) return; int64_t time_in_seconds = GetTime(); @@ -3200,10 +3445,9 @@ void PeerLogicValidation::CheckForStaleTipAndEvictPeers(const Consensus::Params EvictExtraOutboundPeers(time_in_seconds); if (time_in_seconds > m_stale_tip_check_time) { - LOCK(cs_main); // Check whether our tip is stale, and if so, allow using an extra // outbound peer - if (TipMayBeStale(consensusParams)) { + if (!fImporting && !fReindex && connman->GetNetworkActive() && connman->GetUseAddrmanOutgoing() && TipMayBeStale(consensusParams)) { LogPrintf("Potential stale tip detected, will try using extra outbound peer (last tip update: %d seconds ago)\n", time_in_seconds - g_last_tip_update); connman->SetTryNewOutboundPeer(true); } else if (connman->GetTryNewOutboundPeer()) { @@ -3276,8 +3520,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) if (!lockMain) return true; - if (SendRejectsAndCheckIfBanned(pto, connman, m_enable_bip61)) - return true; + if (SendRejectsAndCheckIfBanned(pto, m_enable_bip61)) return true; CNodeState &state = *State(pto->GetId()); // Address refresh broadcast @@ -3341,14 +3584,6 @@ bool PeerLogicValidation::SendMessages(CNode* pto) } } - // Resend wallet transactions that haven't gotten in a block yet - // Except during reindex, importing and IBD, when old wallet - // transactions become unconfirmed and spams other nodes. - if (!fReindex && !fImporting && !IsInitialBlockDownload()) - { - GetMainSignals().Broadcast(nTimeBestReceived, connman); - } - // // Try sending block announcements via headers // @@ -3713,24 +3948,39 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // // Message: getdata (non-blocks) // - while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow) - { - const CInv& inv = (*pto->mapAskFor.begin()).second; - if (!AlreadyHave(inv)) - { - LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= 1000) - { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); + auto& tx_process_time = state.m_tx_download.m_tx_process_time; + while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { + const uint256& txid = tx_process_time.begin()->second; + CInv inv(MSG_TX | GetFetchFlags(pto), txid); + if (!AlreadyHave(inv)) { + // If this transaction was last requested more than 1 minute ago, + // then request. + int64_t last_request_time = GetTxRequestTime(inv.hash); + if (last_request_time <= nNow - GETDATA_TX_INTERVAL) { + LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); + vGetData.push_back(inv); + if (vGetData.size() >= MAX_GETDATA_SZ) { + connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); + } + UpdateTxRequestTime(inv.hash, nNow); + state.m_tx_download.m_tx_in_flight.insert(inv.hash); + } else { + // This transaction is in flight from someone else; queue + // up processing to happen after the download times out + // (with a slight delay for inbound peers, to prefer + // requests to outbound peers). + RequestTx(&state, txid, nNow); } } else { - //If we're not going to ask, don't expect a response. - pto->setAskFor.erase(inv.hash); + // We have already seen this transaction, no need to download. + state.m_tx_download.m_tx_announced.erase(inv.hash); + state.m_tx_download.m_tx_in_flight.erase(inv.hash); } - pto->mapAskFor.erase(pto->mapAskFor.begin()); + tx_process_time.erase(tx_process_time.begin()); } + + if (!vGetData.empty()) connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); |