diff options
Diffstat (limited to 'src/net_processing.cpp')
| -rw-r--r-- | src/net_processing.cpp | 1467 |
1 files changed, 692 insertions, 775 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp index 7c2d7335a..94d4052fa 100644 --- a/src/net_processing.cpp +++ b/src/net_processing.cpp @@ -23,6 +23,7 @@ #include <random.h> #include <reverse_iterator.h> #include <scheduler.h> +#include <streams.h> #include <tinyformat.h> #include <txmempool.h> #include <util/check.h> // For NDEBUG compile time check @@ -71,22 +72,22 @@ static constexpr std::chrono::minutes PING_INTERVAL{2}; static const unsigned int MAX_LOCATOR_SZ = 101; /** The maximum number of entries in an 'inv' protocol message */ static const unsigned int MAX_INV_SZ = 50000; -/** Maximum number of in-flight transactions from a peer */ -static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100; -/** Maximum number of announced transactions from a peer */ -static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ; -/** How many microseconds to delay requesting transactions via txids, if we have wtxid-relaying peers */ -static constexpr std::chrono::microseconds TXID_RELAY_DELAY{std::chrono::seconds{2}}; -/** How many microseconds to delay requesting transactions from inbound peers */ -static constexpr std::chrono::microseconds INBOUND_PEER_TX_DELAY{std::chrono::seconds{2}}; +/** Maximum number of in-flight transaction requests from a peer. It is not a hard limit, but the threshold at which + * point the OVERLOADED_PEER_TX_DELAY kicks in. */ +static constexpr int32_t MAX_PEER_TX_REQUEST_IN_FLIGHT = 100; +/** Maximum number of transactions to consider for requesting, per peer. It provides a reasonable DoS limit to + * per-peer memory usage spent on announcements, while covering peers continuously sending INVs at the maximum + * rate (by our own policy, see INVENTORY_BROADCAST_PER_SECOND) for several minutes, while not receiving + * the actual transaction (from any peer) in response to requests for them. */ +static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 5000; +/** How long to delay requesting transactions via txids, if we have wtxid-relaying peers */ +static constexpr auto TXID_RELAY_DELAY = std::chrono::seconds{2}; +/** How long to delay requesting transactions from non-preferred peers */ +static constexpr auto NONPREF_PEER_TX_DELAY = std::chrono::seconds{2}; +/** How long to delay requesting transactions from overloaded peers (see MAX_PEER_TX_REQUEST_IN_FLIGHT). */ +static constexpr auto OVERLOADED_PEER_TX_DELAY = std::chrono::seconds{2}; /** How long to wait (in microseconds) before downloading a transaction from an additional peer */ static constexpr std::chrono::microseconds GETDATA_TX_INTERVAL{std::chrono::seconds{60}}; -/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */ -static constexpr std::chrono::microseconds MAX_GETDATA_RANDOM_DELAY{std::chrono::seconds{2}}; -/** How long to wait (in microseconds) before expiring an in-flight getdata request to a peer */ -static constexpr std::chrono::microseconds TX_EXPIRY_INTERVAL{GETDATA_TX_INTERVAL * 10}; -static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY, -"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY"); /** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */ static const unsigned int MAX_GETDATA_SZ = 1000; /** Number of blocks that can be requested at any given time from a single peer. */ @@ -143,6 +144,8 @@ static constexpr unsigned int MAX_FEEFILTER_CHANGE_DELAY = 5 * 60; static constexpr uint32_t MAX_GETCFILTERS_SIZE = 1000; /** Maximum number of cf hashes that may be requested with one getcfheaders. See BIP 157. */ static constexpr uint32_t MAX_GETCFHEADERS_SIZE = 2000; +/** the maximum percentage of addresses from our addrman to return in response to a getaddr message. */ +static constexpr size_t MAX_PCT_ADDR_TO_SEND = 23; struct COrphanTx { // When modifying, adapt the copy of this definition in tests/DoS_tests. @@ -151,8 +154,14 @@ struct COrphanTx { int64_t nTimeExpire; size_t list_pos; }; + +/** Guards orphan transactions and extra txs for compact blocks */ RecursiveMutex g_cs_orphans; +/** Map from txid to orphan transaction record. Limited by + * -maxorphantx/DEFAULT_MAX_ORPHAN_TRANSACTIONS */ std::map<uint256, COrphanTx> mapOrphanTransactions GUARDED_BY(g_cs_orphans); +/** Index from wtxid into the mapOrphanTransactions to lookup orphan + * transactions using their witness ids. */ std::map<uint256, std::map<uint256, COrphanTx>::iterator> g_orphans_by_wtxid GUARDED_BY(g_cs_orphans); void EraseOrphansFor(NodeId peer); @@ -256,12 +265,19 @@ namespace { return &(*a) < &(*b); } }; - std::map<COutPoint, std::set<std::map<uint256, COrphanTx>::iterator, IteratorComparator>> mapOrphanTransactionsByPrev GUARDED_BY(g_cs_orphans); - std::vector<std::map<uint256, COrphanTx>::iterator> g_orphan_list GUARDED_BY(g_cs_orphans); //! For random eviction + /** Index from the parents' COutPoint into the mapOrphanTransactions. Used + * to remove orphan transactions from the mapOrphanTransactions */ + std::map<COutPoint, std::set<std::map<uint256, COrphanTx>::iterator, IteratorComparator>> mapOrphanTransactionsByPrev GUARDED_BY(g_cs_orphans); + /** Orphan transactions in vector for quick random eviction */ + std::vector<std::map<uint256, COrphanTx>::iterator> g_orphan_list GUARDED_BY(g_cs_orphans); - static size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0; + /** Orphan/conflicted/etc transactions that are kept for compact block reconstruction. + * The last -blockreconstructionextratxn/DEFAULT_BLOCK_RECONSTRUCTION_EXTRA_TXN of + * these are kept in a ring buffer */ static std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact GUARDED_BY(g_cs_orphans); + /** Offset into vExtraTxnForCompact to insert the next tx */ + static size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0; } // namespace namespace { @@ -276,12 +292,6 @@ struct CNodeState { const CService address; //! Whether we have a fully established connection. bool fCurrentlyConnected; - //! Accumulated misbehaviour score for this peer. - int nMisbehavior; - //! Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). - bool m_should_discourage; - //! String name of this peer (debugging/logging purposes). - const std::string name; //! The best known block we know this peer has announced. const CBlockIndex *pindexBestKnownBlock; //! The hash of the last unknown block this peer has announced. @@ -325,10 +335,17 @@ struct CNodeState { */ bool fSupportsDesiredCmpctVersion; - /** State used to enforce CHAIN_SYNC_TIMEOUT - * Only in effect for outbound, non-manual, full-relay connections, with - * m_protect == false - * Algorithm: if a peer's best known block has less work than our tip, + /** State used to enforce CHAIN_SYNC_TIMEOUT and EXTRA_PEER_CHECK_INTERVAL logic. + * + * Both are only in effect for outbound, non-manual, non-protected connections. + * Any peer protected (m_protect = true) is not chosen for eviction. A peer is + * marked as protected if all of these are true: + * - its connection type is IsBlockOnlyConn() == false + * - it gave us a valid connecting header + * - we haven't reached MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT yet + * - it has a better chain than we have + * + * CHAIN_SYNC_TIMEOUT: if a peer's best known block has less work than our tip, * set a timeout CHAIN_SYNC_TIMEOUT seconds in the future: * - If at timeout their best known block now has more work than our tip * when the timeout was set, then either reset the timeout or clear it @@ -338,6 +355,9 @@ struct CNodeState { * and set a shorter timeout, HEADERS_RESPONSE_TIME seconds in future. * If their best known block is still behind when that new timeout is * reached, disconnect. + * + * EXTRA_PEER_CHECK_INTERVAL: after each interval, if we have too many outbound peers, + * drop the outbound one that least recently announced us a new block. */ struct ChainSyncTimeoutState { //! A timeout used for checking whether our peer has sufficiently synced @@ -355,69 +375,6 @@ struct CNodeState { //! Time of last new block announcement int64_t m_last_block_announcement; - /* - * State associated with transaction download. - * - * Tx download algorithm: - * - * When inv comes in, queue up (process_time, txid) inside the peer's - * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer - * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS). - * - * The process_time for a transaction is set to nNow for outbound peers, - * nNow + 2 seconds for inbound peers. This is the time at which we'll - * consider trying to request the transaction from the peer in - * SendMessages(). The delay for inbound peers is to allow outbound peers - * a chance to announce before we request from inbound peers, to prevent - * an adversary from using inbound connections to blind us to a - * transaction (InvBlock). - * - * When we call SendMessages() for a given peer, - * we will loop over the transactions in m_tx_process_time, looking - * at the transactions whose process_time <= nNow. We'll request each - * such transaction that we don't have already and that hasn't been - * requested from another peer recently, up until we hit the - * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update - * g_already_asked_for for each requested txid, storing the time of the - * GETDATA request. We use g_already_asked_for to coordinate transaction - * requests amongst our peers. - * - * For transactions that we still need but we have already recently - * requested from some other peer, we'll reinsert (process_time, txid) - * back into the peer's m_tx_process_time at the point in the future at - * which the most recent GETDATA request would time out (ie - * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for). - * We add an additional delay for inbound peers, again to prefer - * attempting download from outbound peers first. - * We also add an extra small random delay up to 2 seconds - * to avoid biasing some peers over others. (e.g., due to fixed ordering - * of peer processing in ThreadMessageHandler). - * - * When we receive a transaction from a peer, we remove the txid from the - * peer's m_tx_in_flight set and from their recently announced set - * (m_tx_announced). We also clear g_already_asked_for for that entry, so - * that if somehow the transaction is not accepted but also not added to - * the reject filter, then we will eventually redownload from other - * peers. - */ - struct TxDownloadState { - /* Track when to attempt download of announced transactions (process - * time in micros -> txid) - */ - std::multimap<std::chrono::microseconds, GenTxid> m_tx_process_time; - - //! Store all the transactions a peer has recently announced - std::set<uint256> m_tx_announced; - - //! Store transactions which were requested by us, with timestamp - std::map<uint256, std::chrono::microseconds> m_tx_in_flight; - - //! Periodically check for stuck getdata requests - std::chrono::microseconds m_check_expiry_timer{0}; - }; - - TxDownloadState m_tx_download; - //! Whether this peer is an inbound connection bool m_is_inbound; @@ -430,13 +387,10 @@ struct CNodeState { //! Whether this peer relays txs via wtxid bool m_wtxid_relay{false}; - CNodeState(CAddress addrIn, std::string addrNameIn, bool is_inbound, bool is_manual) : - address(addrIn), name(std::move(addrNameIn)), m_is_inbound(is_inbound), - m_is_manual_connection (is_manual) + CNodeState(CAddress addrIn, bool is_inbound, bool is_manual) + : address(addrIn), m_is_inbound(is_inbound), m_is_manual_connection(is_manual) { fCurrentlyConnected = false; - nMisbehavior = 0; - m_should_discourage = false; pindexBestKnownBlock = nullptr; hashLastUnknownBlock.SetNull(); pindexLastCommonBlock = nullptr; @@ -461,9 +415,6 @@ struct CNodeState { } }; -// Keeps track of the time (in microseconds) when transactions were requested last time -limitedmap<uint256, std::chrono::microseconds> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ); - /** Map maintaining per-node state. */ static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main); @@ -474,12 +425,64 @@ static CNodeState *State(NodeId pnode) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { return &it->second; } +/** + * Data structure for an individual peer. This struct is not protected by + * cs_main since it does not contain validation-critical data. + * + * Memory is owned by shared pointers and this object is destructed when + * the refcount drops to zero. + * + * TODO: move most members from CNodeState to this structure. + * TODO: move remaining application-layer data members from CNode to this structure. + */ +struct Peer { + /** Same id as the CNode object for this peer */ + const NodeId m_id{0}; + + /** Protects misbehavior data members */ + Mutex m_misbehavior_mutex; + /** Accumulated misbehavior score for this peer */ + int m_misbehavior_score GUARDED_BY(m_misbehavior_mutex){0}; + /** Whether this peer should be disconnected and marked as discouraged (unless it has the noban permission). */ + bool m_should_discourage GUARDED_BY(m_misbehavior_mutex){false}; + + /** Set of txids to reconsider once their parent transactions have been accepted **/ + std::set<uint256> m_orphan_work_set GUARDED_BY(g_cs_orphans); + + /** Protects m_getdata_requests **/ + Mutex m_getdata_requests_mutex; + /** Work queue of items requested by this peer **/ + std::deque<CInv> m_getdata_requests GUARDED_BY(m_getdata_requests_mutex); + + Peer(NodeId id) : m_id(id) {} +}; + +using PeerRef = std::shared_ptr<Peer>; + +/** + * Map of all Peer objects, keyed by peer id. This map is protected + * by the global g_peer_mutex. Once a shared pointer reference is + * taken, the lock may be released. Individual fields are protected by + * their own locks. + */ +Mutex g_peer_mutex; +static std::map<NodeId, PeerRef> g_peer_map GUARDED_BY(g_peer_mutex); + +/** Get a shared pointer to the Peer object. + * May return nullptr if the Peer object can't be found. */ +static PeerRef GetPeerRef(NodeId id) +{ + LOCK(g_peer_mutex); + auto it = g_peer_map.find(id); + return it != g_peer_map.end() ? it->second : nullptr; +} + static void UpdatePreferredDownload(const CNode& node, CNodeState* state) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { nPreferredDownload -= state->fPreferredDownload; // Whether this node should be marked as a preferred download node. - state->fPreferredDownload = (!node.fInbound || node.HasPermission(PF_NOBAN)) && !node.fOneShot && !node.fClient; + state->fPreferredDownload = (!node.IsInboundConn() || node.HasPermission(PF_NOBAN)) && !node.IsAddrFetchConn() && !node.fClient; nPreferredDownload += state->fPreferredDownload; } @@ -522,7 +525,7 @@ static bool MarkBlockAsReceived(const uint256& hash) EXCLUSIVE_LOCKS_REQUIRED(cs } if (state->vBlocksInFlight.begin() == itInFlight->second.second) { // First block on the queue was received, update the start download time for the next one - state->nDownloadingSince = std::max(state->nDownloadingSince, GetTimeMicros()); + state->nDownloadingSince = std::max(state->nDownloadingSince, count_microseconds(GetTime<std::chrono::microseconds>())); } state->vBlocksInFlight.erase(itInFlight->second.second); state->nBlocksInFlight--; @@ -557,7 +560,7 @@ static bool MarkBlockAsInFlight(CTxMemPool& mempool, NodeId nodeid, const uint25 state->nBlocksInFlightValidHeaders += it->fValidatedHeaders; if (state->nBlocksInFlight == 1) { // We're starting a block download (batch) from this peer. - state->nDownloadingSince = GetTimeMicros(); + state->nDownloadingSince = GetTime<std::chrono::microseconds>().count(); } if (state->nBlocksInFlightValidHeaders == 1 && pindex != nullptr) { nPeersWithValidatedDownloads++; @@ -625,20 +628,19 @@ static void MaybeSetPeerAsAnnouncingHeaderAndIDs(NodeId nodeid, CConnman& connma return; } } - connman.ForNode(nodeid, [&connman](CNode* pfrom){ - AssertLockHeld(cs_main); + connman.ForNode(nodeid, [&connman](CNode* pfrom) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { + AssertLockHeld(::cs_main); uint64_t nCMPCTBLOCKVersion = (pfrom->GetLocalServices() & NODE_WITNESS) ? 2 : 1; if (lNodesAnnouncingHeaderAndIDs.size() >= 3) { // As per BIP152, we only get 3 of our peers to announce // blocks using compact encodings. connman.ForNode(lNodesAnnouncingHeaderAndIDs.front(), [&connman, nCMPCTBLOCKVersion](CNode* pnodeStop){ - AssertLockHeld(cs_main); - connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetSendVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/false, nCMPCTBLOCKVersion)); + connman.PushMessage(pnodeStop, CNetMsgMaker(pnodeStop->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/false, nCMPCTBLOCKVersion)); return true; }); lNodesAnnouncingHeaderAndIDs.pop_front(); } - connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetSendVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/true, nCMPCTBLOCKVersion)); + connman.PushMessage(pfrom, CNetMsgMaker(pfrom->GetCommonVersion()).Make(NetMsgType::SENDCMPCT, /*fAnnounceUsingCMPCTBLOCK=*/true, nCMPCTBLOCKVersion)); lNodesAnnouncingHeaderAndIDs.push_back(pfrom->GetId()); return true; }); @@ -757,73 +759,35 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec } } -void EraseTxRequest(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - g_already_asked_for.erase(gtxid.GetHash()); -} - -std::chrono::microseconds GetTxRequestTime(const GenTxid& gtxid) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - auto it = g_already_asked_for.find(gtxid.GetHash()); - if (it != g_already_asked_for.end()) { - return it->second; - } - return {}; -} - -void UpdateTxRequestTime(const GenTxid& gtxid, std::chrono::microseconds request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - auto it = g_already_asked_for.find(gtxid.GetHash()); - if (it == g_already_asked_for.end()) { - g_already_asked_for.insert(std::make_pair(gtxid.GetHash(), request_time)); - } else { - g_already_asked_for.update(it, request_time); - } -} - -std::chrono::microseconds CalculateTxGetDataTime(const GenTxid& gtxid, std::chrono::microseconds current_time, bool use_inbound_delay, bool use_txid_delay) EXCLUSIVE_LOCKS_REQUIRED(cs_main) -{ - std::chrono::microseconds process_time; - const auto last_request_time = GetTxRequestTime(gtxid); - // First time requesting this tx - if (last_request_time.count() == 0) { - process_time = current_time; - } else { - // Randomize the delay to avoid biasing some peers over others (such as due to - // fixed ordering of peer processing in ThreadMessageHandler) - process_time = last_request_time + GETDATA_TX_INTERVAL + GetRandMicros(MAX_GETDATA_RANDOM_DELAY); - } - - // We delay processing announcements from inbound peers - if (use_inbound_delay) process_time += INBOUND_PEER_TX_DELAY; - - // We delay processing announcements from peers that use txid-relay (instead of wtxid) - if (use_txid_delay) process_time += TXID_RELAY_DELAY; - - return process_time; -} +} // namespace -void RequestTx(CNodeState* state, const GenTxid& gtxid, std::chrono::microseconds current_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void PeerManager::AddTxAnnouncement(const CNode& node, const GenTxid& gtxid, std::chrono::microseconds current_time) { - CNodeState::TxDownloadState& peer_download_state = state->m_tx_download; - if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_process_time.size() >= MAX_PEER_TX_ANNOUNCEMENTS || - peer_download_state.m_tx_announced.count(gtxid.GetHash())) { - // Too many queued announcements from this peer, or we already have - // this announcement + AssertLockHeld(::cs_main); // For m_txrequest + NodeId nodeid = node.GetId(); + if (!node.HasPermission(PF_RELAY) && m_txrequest.Count(nodeid) >= MAX_PEER_TX_ANNOUNCEMENTS) { + // Too many queued announcements from this peer return; } - peer_download_state.m_tx_announced.insert(gtxid.GetHash()); - - // Calculate the time to try requesting this transaction. Use - // fPreferredDownload as a proxy for outbound peers. - const auto process_time = CalculateTxGetDataTime(gtxid, current_time, !state->fPreferredDownload, !state->m_wtxid_relay && g_wtxid_relay_peers > 0); - - peer_download_state.m_tx_process_time.emplace(process_time, gtxid); + const CNodeState* state = State(nodeid); + + // Decide the TxRequestTracker parameters for this announcement: + // - "preferred": if fPreferredDownload is set (= outbound, or PF_NOBAN permission) + // - "reqtime": current time plus delays for: + // - NONPREF_PEER_TX_DELAY for announcements from non-preferred connections + // - TXID_RELAY_DELAY for txid announcements while wtxid peers are available + // - OVERLOADED_PEER_TX_DELAY for announcements from peers which have at least + // MAX_PEER_TX_REQUEST_IN_FLIGHT requests in flight (and don't have PF_RELAY). + auto delay = std::chrono::microseconds{0}; + const bool preferred = state->fPreferredDownload; + if (!preferred) delay += NONPREF_PEER_TX_DELAY; + if (!gtxid.IsWtxid() && g_wtxid_relay_peers > 0) delay += TXID_RELAY_DELAY; + const bool overloaded = !node.HasPermission(PF_RELAY) && + m_txrequest.CountInFlight(nodeid) >= MAX_PEER_TX_REQUEST_IN_FLIGHT; + if (overloaded) delay += OVERLOADED_PEER_TX_DELAY; + m_txrequest.ReceivedInv(nodeid, gtxid, preferred, current_time + delay); } -} // namespace - // This function is used for testing the stale tip eviction logic, see // denialofservice_tests.cpp void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) @@ -833,36 +797,37 @@ void UpdateLastBlockAnnounceTime(NodeId node, int64_t time_in_seconds) if (state) state->m_last_block_announcement = time_in_seconds; } -// Returns true for outbound peers, excluding manual connections, feelers, and -// one-shots. -static bool IsOutboundDisconnectionCandidate(const CNode& node) -{ - return !(node.fInbound || node.m_manual_connection || node.fFeeler || node.fOneShot); -} - -void PeerLogicValidation::InitializeNode(CNode *pnode) { +void PeerManager::InitializeNode(CNode *pnode) { CAddress addr = pnode->addr; std::string addrName = pnode->GetAddrName(); NodeId nodeid = pnode->GetId(); { LOCK(cs_main); - mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, std::move(addrName), pnode->fInbound, pnode->m_manual_connection)); + mapNodeState.emplace_hint(mapNodeState.end(), std::piecewise_construct, std::forward_as_tuple(nodeid), std::forward_as_tuple(addr, pnode->IsInboundConn(), pnode->IsManualConn())); + assert(m_txrequest.Count(nodeid) == 0); + } + { + PeerRef peer = std::make_shared<Peer>(nodeid); + LOCK(g_peer_mutex); + g_peer_map.emplace_hint(g_peer_map.end(), nodeid, std::move(peer)); + } + if (!pnode->IsInboundConn()) { + PushNodeVersion(*pnode, m_connman, GetTime()); } - if(!pnode->fInbound) - PushNodeVersion(*pnode, *connman, GetTime()); } -void PeerLogicValidation::ReattemptInitialBroadcast(CScheduler& scheduler) const +void PeerManager::ReattemptInitialBroadcast(CScheduler& scheduler) const { - std::map<uint256, uint256> unbroadcast_txids = m_mempool.GetUnbroadcastTxs(); + std::set<uint256> unbroadcast_txids = m_mempool.GetUnbroadcastTxs(); + + for (const auto& txid : unbroadcast_txids) { + CTransactionRef tx = m_mempool.get(txid); - for (const auto& elem : unbroadcast_txids) { - // Sanity check: all unbroadcast txns should exist in the mempool - if (m_mempool.exists(elem.first)) { + if (tx != nullptr) { LOCK(cs_main); - RelayTransaction(elem.first, elem.second, *connman); + RelayTransaction(txid, tx->GetWitnessHash(), m_connman); } else { - m_mempool.RemoveUnbroadcastTx(elem.first, true); + m_mempool.RemoveUnbroadcastTx(txid, true); } } @@ -872,16 +837,24 @@ void PeerLogicValidation::ReattemptInitialBroadcast(CScheduler& scheduler) const scheduler.scheduleFromNow([&] { ReattemptInitialBroadcast(scheduler); }, delta); } -void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { +void PeerManager::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTime) { fUpdateConnectionTime = false; LOCK(cs_main); + int misbehavior{0}; + { + PeerRef peer = GetPeerRef(nodeid); + assert(peer != nullptr); + misbehavior = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score); + LOCK(g_peer_mutex); + g_peer_map.erase(nodeid); + } CNodeState *state = State(nodeid); assert(state != nullptr); if (state->fSyncStarted) nSyncStarted--; - if (state->nMisbehavior == 0 && state->fCurrentlyConnected) { + if (misbehavior == 0 && state->fCurrentlyConnected) { fUpdateConnectionTime = true; } @@ -889,6 +862,7 @@ void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTim mapBlocksInFlight.erase(entry.hash); } EraseOrphansFor(nodeid); + m_txrequest.DisconnectedPeer(nodeid); nPreferredDownload -= state->fPreferredDownload; nPeersWithValidatedDownloads -= (state->nBlocksInFlightValidHeaders != 0); assert(nPeersWithValidatedDownloads >= 0); @@ -906,22 +880,29 @@ void PeerLogicValidation::FinalizeNode(NodeId nodeid, bool& fUpdateConnectionTim assert(nPeersWithValidatedDownloads == 0); assert(g_outbound_peers_with_protect_from_disconnect == 0); assert(g_wtxid_relay_peers == 0); + assert(m_txrequest.Size() == 0); } LogPrint(BCLog::NET, "Cleared nodestate for peer=%d\n", nodeid); } bool GetNodeStateStats(NodeId nodeid, CNodeStateStats &stats) { - LOCK(cs_main); - CNodeState *state = State(nodeid); - if (state == nullptr) - return false; - stats.nMisbehavior = state->nMisbehavior; - stats.nSyncHeight = state->pindexBestKnownBlock ? state->pindexBestKnownBlock->nHeight : -1; - stats.nCommonHeight = state->pindexLastCommonBlock ? state->pindexLastCommonBlock->nHeight : -1; - for (const QueuedBlock& queue : state->vBlocksInFlight) { - if (queue.pindex) - stats.vHeightInFlight.push_back(queue.pindex->nHeight); + { + LOCK(cs_main); + CNodeState* state = State(nodeid); + if (state == nullptr) + return false; + stats.nSyncHeight = state->pindexBestKnownBlock ? state->pindexBestKnownBlock->nHeight : -1; + stats.nCommonHeight = state->pindexLastCommonBlock ? state->pindexLastCommonBlock->nHeight : -1; + for (const QueuedBlock& queue : state->vBlocksInFlight) { + if (queue.pindex) + stats.vHeightInFlight.push_back(queue.pindex->nHeight); + } } + + PeerRef peer = GetPeerRef(nodeid); + if (peer == nullptr) return false; + stats.m_misbehavior_score = WITH_LOCK(peer->m_misbehavior_mutex, return peer->m_misbehavior_score); + return true; } @@ -1061,39 +1042,27 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans) return nEvicted; } -/** - * Increment peer's misbehavior score. If the new value >= DISCOURAGEMENT_THRESHOLD, mark the node - * to be discouraged, meaning the peer might be disconnected and added to the discouragement filter. - */ -void Misbehaving(const NodeId pnode, const int howmuch, const std::string& message) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +void PeerManager::Misbehaving(const NodeId pnode, const int howmuch, const std::string& message) { assert(howmuch > 0); - CNodeState* const state = State(pnode); - if (state == nullptr) return; + PeerRef peer = GetPeerRef(pnode); + if (peer == nullptr) return; - state->nMisbehavior += howmuch; + LOCK(peer->m_misbehavior_mutex); + peer->m_misbehavior_score += howmuch; const std::string message_prefixed = message.empty() ? "" : (": " + message); - if (state->nMisbehavior >= DISCOURAGEMENT_THRESHOLD && state->nMisbehavior - howmuch < DISCOURAGEMENT_THRESHOLD) - { - LogPrint(BCLog::NET, "Misbehaving: peer=%d (%d -> %d) DISCOURAGE THRESHOLD EXCEEDED%s\n", pnode, state->nMisbehavior - howmuch, state->nMisbehavior, message_prefixed); - state->m_should_discourage = true; + if (peer->m_misbehavior_score >= DISCOURAGEMENT_THRESHOLD && peer->m_misbehavior_score - howmuch < DISCOURAGEMENT_THRESHOLD) { + LogPrint(BCLog::NET, "Misbehaving: peer=%d (%d -> %d) DISCOURAGE THRESHOLD EXCEEDED%s\n", pnode, peer->m_misbehavior_score - howmuch, peer->m_misbehavior_score, message_prefixed); + peer->m_should_discourage = true; } else { - LogPrint(BCLog::NET, "Misbehaving: peer=%d (%d -> %d)%s\n", pnode, state->nMisbehavior - howmuch, state->nMisbehavior, message_prefixed); + LogPrint(BCLog::NET, "Misbehaving: peer=%d (%d -> %d)%s\n", pnode, peer->m_misbehavior_score - howmuch, peer->m_misbehavior_score, message_prefixed); } } -/** - * Potentially mark a node discouraged based on the contents of a BlockValidationState object - * - * @param[in] via_compact_block this bool is passed in because net_processing should - * punish peers differently depending on whether the data was provided in a compact - * block message or not. If the compact block had a valid header, but contained invalid - * txs, the peer should not be punished. See BIP 152. - * - * @return Returns true if the peer was punished (probably disconnected) - */ -static bool MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& state, bool via_compact_block, const std::string& message = "") { +bool PeerManager::MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& state, + bool via_compact_block, const std::string& message) +{ switch (state.GetResult()) { case BlockValidationResult::BLOCK_RESULT_UNSET: break; @@ -1101,7 +1070,6 @@ static bool MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& s case BlockValidationResult::BLOCK_CONSENSUS: case BlockValidationResult::BLOCK_MUTATED: if (!via_compact_block) { - LOCK(cs_main); Misbehaving(nodeid, 100, message); return true; } @@ -1125,18 +1093,12 @@ static bool MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& s case BlockValidationResult::BLOCK_INVALID_HEADER: case BlockValidationResult::BLOCK_CHECKPOINT: case BlockValidationResult::BLOCK_INVALID_PREV: - { - LOCK(cs_main); - Misbehaving(nodeid, 100, message); - } + Misbehaving(nodeid, 100, message); return true; // Conflicting (but not necessarily invalid) data or different policy: case BlockValidationResult::BLOCK_MISSING_PREV: - { - // TODO: Handle this much more gracefully (10 DoS points is super arbitrary) - LOCK(cs_main); - Misbehaving(nodeid, 10, message); - } + // TODO: Handle this much more gracefully (10 DoS points is super arbitrary) + Misbehaving(nodeid, 10, message); return true; case BlockValidationResult::BLOCK_RECENT_CONSENSUS_CHANGE: case BlockValidationResult::BLOCK_TIME_FUTURE: @@ -1148,23 +1110,15 @@ static bool MaybePunishNodeForBlock(NodeId nodeid, const BlockValidationState& s return false; } -/** - * Potentially disconnect and discourage a node based on the contents of a TxValidationState object - * - * @return Returns true if the peer was punished (probably disconnected) - */ -static bool MaybePunishNodeForTx(NodeId nodeid, const TxValidationState& state, const std::string& message = "") +bool PeerManager::MaybePunishNodeForTx(NodeId nodeid, const TxValidationState& state, const std::string& message) { switch (state.GetResult()) { case TxValidationResult::TX_RESULT_UNSET: break; // The node is providing invalid data: case TxValidationResult::TX_CONSENSUS: - { - LOCK(cs_main); - Misbehaving(nodeid, 100, message); - return true; - } + Misbehaving(nodeid, 100, message); + return true; // Conflicting (but not necessarily invalid) data or different policy: case TxValidationResult::TX_RECENT_CONSENSUS_CHANGE: case TxValidationResult::TX_INPUTS_NOT_STANDARD: @@ -1202,8 +1156,10 @@ static bool BlockRequestAllowed(const CBlockIndex* pindex, const Consensus::Para (GetBlockProofEquivalentTime(*pindexBestHeader, *pindex, *pindexBestHeader, consensusParams) < STALE_RELAY_AGE_LIMIT); } -PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CScheduler& scheduler, ChainstateManager& chainman, CTxMemPool& pool) - : connman(connmanIn), +PeerManager::PeerManager(const CChainParams& chainparams, CConnman& connman, BanMan* banman, + CScheduler& scheduler, ChainstateManager& chainman, CTxMemPool& pool) + : m_chainparams(chainparams), + m_connman(connman), m_banman(banman), m_chainman(chainman), m_mempool(pool), @@ -1223,13 +1179,12 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CS // same probability that we have in the reject filter). g_recent_confirmed_transactions.reset(new CRollingBloomFilter(48000, 0.000001)); - const Consensus::Params& consensusParams = Params().GetConsensus(); // Stale tip checking and peer eviction are on two different timers, but we // don't want them to get out of sync due to drift in the scheduler, so we // combine them in one function and schedule at the quicker (peer-eviction) // timer. static_assert(EXTRA_PEER_CHECK_INTERVAL < STALE_CHECK_INTERVAL, "peer eviction timer should be less than stale tip check timer"); - scheduler.scheduleEvery([this, consensusParams] { this->CheckForStaleTipAndEvictPeers(consensusParams); }, std::chrono::seconds{EXTRA_PEER_CHECK_INTERVAL}); + scheduler.scheduleEvery([this] { this->CheckForStaleTipAndEvictPeers(); }, std::chrono::seconds{EXTRA_PEER_CHECK_INTERVAL}); // schedule next run for 10-15 minutes in the future const std::chrono::milliseconds delta = std::chrono::minutes{10} + GetRandMillis(std::chrono::minutes{5}); @@ -1238,9 +1193,10 @@ PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CS /** * Evict orphan txn pool entries (EraseOrphanTx) based on a newly connected - * block. Also save the time of the last tip update. + * block, remember the recently confirmed transactions, and delete tracked + * announcements for them. Also save the time of the last tip update. */ -void PeerLogicValidation::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) +void PeerManager::BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindex) { { LOCK(g_cs_orphans); @@ -1282,9 +1238,16 @@ void PeerLogicValidation::BlockConnected(const std::shared_ptr<const CBlock>& pb } } } + { + LOCK(cs_main); + for (const auto& ptx : pblock->vtx) { + m_txrequest.ForgetTxHash(ptx->GetHash()); + m_txrequest.ForgetTxHash(ptx->GetWitnessHash()); + } + } } -void PeerLogicValidation::BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) +void PeerManager::BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) { // To avoid relay problems with transactions that were previously // confirmed, clear our filter of recently confirmed transactions whenever @@ -1309,7 +1272,7 @@ static bool fWitnessesPresentInMostRecentCompactBlock GUARDED_BY(cs_most_recent_ * Maintain state about the best-seen block and fast-announce a compact block * to compatible peers. */ -void PeerLogicValidation::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) { +void PeerManager::NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) { std::shared_ptr<const CBlockHeaderAndShortTxIDs> pcmpctblock = std::make_shared<const CBlockHeaderAndShortTxIDs> (*pblock, true); const CNetMsgMaker msgMaker(PROTOCOL_VERSION); @@ -1320,7 +1283,7 @@ void PeerLogicValidation::NewPoWValidBlock(const CBlockIndex *pindex, const std: return; nHighestFastAnnounce = pindex->nHeight; - bool fWitnessEnabled = IsWitnessEnabled(pindex->pprev, Params().GetConsensus()); + bool fWitnessEnabled = IsWitnessEnabled(pindex->pprev, m_chainparams.GetConsensus()); uint256 hashBlock(pblock->GetHash()); { @@ -1331,11 +1294,11 @@ void PeerLogicValidation::NewPoWValidBlock(const CBlockIndex *pindex, const std: fWitnessesPresentInMostRecentCompactBlock = fWitnessEnabled; } - connman->ForEachNode([this, &pcmpctblock, pindex, &msgMaker, fWitnessEnabled, &hashBlock](CNode* pnode) { - AssertLockHeld(cs_main); + m_connman.ForEachNode([this, &pcmpctblock, pindex, &msgMaker, fWitnessEnabled, &hashBlock](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { + AssertLockHeld(::cs_main); // TODO: Avoid the repeated-serialization here - if (pnode->nVersion < INVALID_CB_NO_BAN_VERSION || pnode->fDisconnect) + if (pnode->GetCommonVersion() < INVALID_CB_NO_BAN_VERSION || pnode->fDisconnect) return; ProcessBlockAvailability(pnode->GetId()); CNodeState &state = *State(pnode->GetId()); @@ -1344,9 +1307,9 @@ void PeerLogicValidation::NewPoWValidBlock(const CBlockIndex *pindex, const std: if (state.fPreferHeaderAndIDs && (!fWitnessEnabled || state.fWantsCmpctWitness) && !PeerHasHeader(&state, pindex) && PeerHasHeader(&state, pindex->pprev)) { - LogPrint(BCLog::NET, "%s sending header-and-ids %s to peer=%d\n", "PeerLogicValidation::NewPoWValidBlock", + LogPrint(BCLog::NET, "%s sending header-and-ids %s to peer=%d\n", "PeerManager::NewPoWValidBlock", hashBlock.ToString(), pnode->GetId()); - connman->PushMessage(pnode, msgMaker.Make(NetMsgType::CMPCTBLOCK, *pcmpctblock)); + m_connman.PushMessage(pnode, msgMaker.Make(NetMsgType::CMPCTBLOCK, *pcmpctblock)); state.pindexBestHeaderSent = pindex; } }); @@ -1356,9 +1319,9 @@ void PeerLogicValidation::NewPoWValidBlock(const CBlockIndex *pindex, const std: * Update our best height and announce any block hashes which weren't previously * in ::ChainActive() to our peers. */ -void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { +void PeerManager::UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) { const int nNewHeight = pindexNew->nHeight; - connman->SetBestHeight(nNewHeight); + m_connman.SetBestHeight(nNewHeight); SetServiceFlagsIBDCache(!fInitialDownload); if (!fInitialDownload) { @@ -1375,7 +1338,7 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB } } // Relay inventory, but don't relay old inventory during initial block download. - connman->ForEachNode([nNewHeight, &vHashes](CNode* pnode) { + m_connman.ForEachNode([nNewHeight, &vHashes](CNode* pnode) { LOCK(pnode->cs_inventory); if (nNewHeight > (pnode->nStartingHeight != -1 ? pnode->nStartingHeight - 2000 : 0)) { for (const uint256& hash : reverse_iterate(vHashes)) { @@ -1383,7 +1346,7 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB } } }); - connman->WakeMessageHandler(); + m_connman.WakeMessageHandler(); } } @@ -1391,7 +1354,7 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB * Handle invalid block rejection and consequent peer discouragement, maintain which * peers announce compact blocks. */ -void PeerLogicValidation::BlockChecked(const CBlock& block, const BlockValidationState& state) { +void PeerManager::BlockChecked(const CBlock& block, const BlockValidationState& state) { LOCK(cs_main); const uint256 hash(block.GetHash()); @@ -1414,7 +1377,7 @@ void PeerLogicValidation::BlockChecked(const CBlock& block, const BlockValidatio !::ChainstateActive().IsInitialBlockDownload() && mapBlocksInFlight.count(hash) == mapBlocksInFlight.size()) { if (it != mapBlockSource.end()) { - MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first, *connman); + MaybeSetPeerAsAnnouncingHeaderAndIDs(it->second.first, m_connman); } } if (it != mapBlockSource.end()) @@ -1427,56 +1390,50 @@ void PeerLogicValidation::BlockChecked(const CBlock& block, const BlockValidatio // -bool static AlreadyHave(const CInv& inv, const CTxMemPool& mempool) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +bool static AlreadyHaveTx(const GenTxid& gtxid, const CTxMemPool& mempool) EXCLUSIVE_LOCKS_REQUIRED(cs_main) { - switch (inv.type) - { - case MSG_TX: - case MSG_WITNESS_TX: - case MSG_WTX: - { - assert(recentRejects); - if (::ChainActive().Tip()->GetBlockHash() != hashRecentRejectsChainTip) - { - // If the chain tip has changed previously rejected transactions - // might be now valid, e.g. due to a nLockTime'd tx becoming valid, - // or a double-spend. Reset the rejects filter and give those - // txs a second chance. - hashRecentRejectsChainTip = ::ChainActive().Tip()->GetBlockHash(); - recentRejects->reset(); - } - - { - LOCK(g_cs_orphans); - if (!inv.IsMsgWtx() && mapOrphanTransactions.count(inv.hash)) { - return true; - } else if (inv.IsMsgWtx() && g_orphans_by_wtxid.count(inv.hash)) { - return true; - } - } + assert(recentRejects); + if (::ChainActive().Tip()->GetBlockHash() != hashRecentRejectsChainTip) { + // If the chain tip has changed previously rejected transactions + // might be now valid, e.g. due to a nLockTime'd tx becoming valid, + // or a double-spend. Reset the rejects filter and give those + // txs a second chance. + hashRecentRejectsChainTip = ::ChainActive().Tip()->GetBlockHash(); + recentRejects->reset(); + } - { - LOCK(g_cs_recent_confirmed_transactions); - if (g_recent_confirmed_transactions->contains(inv.hash)) return true; - } + const uint256& hash = gtxid.GetHash(); - return recentRejects->contains(inv.hash) || mempool.exists(ToGenTxid(inv)); + { + LOCK(g_cs_orphans); + if (!gtxid.IsWtxid() && mapOrphanTransactions.count(hash)) { + return true; + } else if (gtxid.IsWtxid() && g_orphans_by_wtxid.count(hash)) { + return true; } - case MSG_BLOCK: - case MSG_WITNESS_BLOCK: - return LookupBlockIndex(inv.hash) != nullptr; } - // Don't know what it is, just say we already got one - return true; + + { + LOCK(g_cs_recent_confirmed_transactions); + if (g_recent_confirmed_transactions->contains(hash)) return true; + } + + return recentRejects->contains(hash) || mempool.exists(gtxid); +} + +bool static AlreadyHaveBlock(const uint256& block_hash) EXCLUSIVE_LOCKS_REQUIRED(cs_main) +{ + return LookupBlockIndex(block_hash) != nullptr; } void RelayTransaction(const uint256& txid, const uint256& wtxid, const CConnman& connman) { - connman.ForEachNode([&txid, &wtxid](CNode* pnode) - { - AssertLockHeld(cs_main); - CNodeState &state = *State(pnode->GetId()); - if (state.m_wtxid_relay) { + connman.ForEachNode([&txid, &wtxid](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { + AssertLockHeld(::cs_main); + + CNodeState* state = State(pnode->GetId()); + if (state == nullptr) return; + if (state->m_wtxid_relay) { pnode->PushTxInventory(wtxid); } else { pnode->PushTxInventory(txid); @@ -1486,7 +1443,7 @@ void RelayTransaction(const uint256& txid, const uint256& wtxid, const CConnman& static void RelayAddress(const CAddress& addr, bool fReachable, const CConnman& connman) { - unsigned int nRelayNodes = fReachable ? 2 : 1; // limited relaying of addresses outside our network(s) + if (!fReachable && !addr.IsRelayable()) return; // Relay to a limited number of other nodes // Use deterministic randomness to send to the same nodes for 24 hours @@ -1495,11 +1452,14 @@ static void RelayAddress(const CAddress& addr, bool fReachable, const CConnman& const CSipHasher hasher = connman.GetDeterministicRandomizer(RANDOMIZER_ID_ADDRESS_RELAY).Write(hashAddr << 32).Write((GetTime() + hashAddr) / (24 * 60 * 60)); FastRandomContext insecure_rand; + // Relay reachable addresses to 2 peers. Unreachable addresses are relayed randomly to 1 or 2 peers. + unsigned int nRelayNodes = (fReachable || (hasher.Finalize() & 1)) ? 2 : 1; + std::array<std::pair<uint64_t, CNode*>,2> best{{{0, nullptr}, {0, nullptr}}}; assert(nRelayNodes <= best.size()); auto sortfunc = [&best, &hasher, nRelayNodes](CNode* pnode) { - if (pnode->IsAddrRelayPeer()) { + if (pnode->RelayAddrsWithConn()) { uint64_t hashKey = CSipHasher(hasher).Write(pnode->GetId()).Finalize(); for (unsigned int i = 0; i < nRelayNodes; i++) { if (hashKey > best[i].first) { @@ -1552,7 +1512,7 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c } // release cs_main before calling ActivateBestChain if (need_activate_chain) { BlockValidationState state; - if (!ActivateBestChain(state, Params(), a_recent_block)) { + if (!ActivateBestChain(state, chainparams, a_recent_block)) { LogPrint(BCLog::NET, "failed to activate chain (%s)\n", state.ToString()); } } @@ -1565,11 +1525,11 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c LogPrint(BCLog::NET, "%s: ignoring request from peer=%i for old block that isn't in the main chain\n", __func__, pfrom.GetId()); } } - const CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); // disconnect node in case we have reached the outbound limit for serving historical blocks if (send && connman.OutboundTargetReached(true) && - (((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - pindex->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.type == MSG_FILTERED_BLOCK) && + (((pindexBestHeader != nullptr) && (pindexBestHeader->GetBlockTime() - pindex->GetBlockTime() > HISTORICAL_BLOCK_AGE)) || inv.IsMsgFilteredBlk()) && !pfrom.HasPermission(PF_DOWNLOAD) // nodes with the download permission may exceed target ) { LogPrint(BCLog::NET, "historical block serving limit reached, disconnect peer=%d\n", pfrom.GetId()); @@ -1595,7 +1555,7 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c std::shared_ptr<const CBlock> pblock; if (a_recent_block && a_recent_block->GetHash() == pindex->GetBlockHash()) { pblock = a_recent_block; - } else if (inv.type == MSG_WITNESS_BLOCK) { + } else if (inv.IsMsgWitnessBlk()) { // Fast-path: in this case it is possible to serve the block directly from disk, // as the network format matches the format on disk std::vector<uint8_t> block_data; @@ -1612,12 +1572,11 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c pblock = pblockRead; } if (pblock) { - if (inv.type == MSG_BLOCK) + if (inv.IsMsgBlk()) { connman.PushMessage(&pfrom, msgMaker.Make(SERIALIZE_TRANSACTION_NO_WITNESS, NetMsgType::BLOCK, *pblock)); - else if (inv.type == MSG_WITNESS_BLOCK) + } else if (inv.IsMsgWitnessBlk()) { connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::BLOCK, *pblock)); - else if (inv.type == MSG_FILTERED_BLOCK) - { + } else if (inv.IsMsgFilteredBlk()) { bool sendMerkleBlock = false; CMerkleBlock merkleBlock; if (pfrom.m_tx_relay != nullptr) { @@ -1641,9 +1600,7 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c } // else // no response - } - else if (inv.type == MSG_CMPCT_BLOCK) - { + } else if (inv.IsMsgCmpctBlk()) { // If a peer is asking for old blocks, we're almost guaranteed // they won't have a useful mempool to match against a compact block, // and we don't feel like constructing the object for them, so @@ -1678,7 +1635,7 @@ void static ProcessGetBlockData(CNode& pfrom, const CChainParams& chainparams, c } //! Determine whether or not a peer can request a transaction, and return it (or nullptr if not found or not allowed). -CTransactionRef static FindTxForGetData(const CNode& peer, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main) +static CTransactionRef FindTxForGetData(const CTxMemPool& mempool, const CNode& peer, const GenTxid& gtxid, const std::chrono::seconds mempool_req, const std::chrono::seconds now) LOCKS_EXCLUDED(cs_main) { auto txinfo = mempool.info(gtxid); if (txinfo.tx) { @@ -1705,13 +1662,13 @@ CTransactionRef static FindTxForGetData(const CNode& peer, const GenTxid& gtxid, return {}; } -void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) LOCKS_EXCLUDED(cs_main) +void static ProcessGetData(CNode& pfrom, Peer& peer, const CChainParams& chainparams, CConnman& connman, CTxMemPool& mempool, const std::atomic<bool>& interruptMsgProc) EXCLUSIVE_LOCKS_REQUIRED(!cs_main, peer.m_getdata_requests_mutex) { AssertLockNotHeld(cs_main); - std::deque<CInv>::iterator it = pfrom.vRecvGetData.begin(); + std::deque<CInv>::iterator it = peer.m_getdata_requests.begin(); std::vector<CInv> vNotFound; - const CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); const std::chrono::seconds now = GetTime<std::chrono::seconds>(); // Get last mempool request time @@ -1721,7 +1678,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm // Process as many TX items from the front of the getdata queue as // possible, since they're common and it's efficient to batch process // them. - while (it != pfrom.vRecvGetData.end() && it->IsGenTxMsg()) { + while (it != peer.m_getdata_requests.end() && it->IsGenTxMsg()) { if (interruptMsgProc) return; // The send buffer provides backpressure. If there's no space in // the buffer, pause processing until the next call. @@ -1734,7 +1691,7 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm continue; } - CTransactionRef tx = FindTxForGetData(pfrom, ToGenTxid(inv), mempool_req, now); + CTransactionRef tx = FindTxForGetData(mempool, pfrom, ToGenTxid(inv), mempool_req, now); if (tx) { // WTX and WITNESS_TX imply we serialize with witness int nSendFlags = (inv.IsMsgTx() ? SERIALIZE_TRANSACTION_NO_WITNESS : 0); @@ -1746,11 +1703,11 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm LOCK(mempool.cs); auto txiter = mempool.GetIter(tx->GetHash()); if (txiter) { - const CTxMemPool::setEntries& parents = mempool.GetMemPoolParents(*txiter); + const CTxMemPoolEntry::Parents& parents = (*txiter)->GetMemPoolParentsConst(); parent_ids_to_add.reserve(parents.size()); - for (CTxMemPool::txiter parent_iter : parents) { - if (parent_iter->GetTime() > now - UNCONDITIONAL_RELAY_DELAY) { - parent_ids_to_add.push_back(parent_iter->GetTx().GetHash()); + for (const CTxMemPoolEntry& parent : parents) { + if (parent.GetTime() > now - UNCONDITIONAL_RELAY_DELAY) { + parent_ids_to_add.push_back(parent.GetTx().GetHash()); } } } @@ -1769,16 +1726,16 @@ void static ProcessGetData(CNode& pfrom, const CChainParams& chainparams, CConnm // Only process one BLOCK item per call, since they're uncommon and can be // expensive to process. - if (it != pfrom.vRecvGetData.end() && !pfrom.fPauseSend) { + if (it != peer.m_getdata_requests.end() && !pfrom.fPauseSend) { const CInv &inv = *it++; - if (inv.type == MSG_BLOCK || inv.type == MSG_FILTERED_BLOCK || inv.type == MSG_CMPCT_BLOCK || inv.type == MSG_WITNESS_BLOCK) { + if (inv.IsGenBlkMsg()) { ProcessGetBlockData(pfrom, chainparams, inv, connman); } // else: If the first item on the queue is an unknown type, we erase it // and continue processing the queue on the next call. } - pfrom.vRecvGetData.erase(pfrom.vRecvGetData.begin(), it); + peer.m_getdata_requests.erase(peer.m_getdata_requests.begin(), it); if (!vNotFound.empty()) { // Let the peer know that we didn't find what it asked for, so it doesn't @@ -1807,25 +1764,24 @@ static uint32_t GetFetchFlags(const CNode& pfrom) EXCLUSIVE_LOCKS_REQUIRED(cs_ma return nFetchFlags; } -inline void static SendBlockTransactions(const CBlock& block, const BlockTransactionsRequest& req, CNode& pfrom, CConnman& connman) { +void PeerManager::SendBlockTransactions(CNode& pfrom, const CBlock& block, const BlockTransactionsRequest& req) { BlockTransactions resp(req); for (size_t i = 0; i < req.indexes.size(); i++) { if (req.indexes[i] >= block.vtx.size()) { - LOCK(cs_main); Misbehaving(pfrom.GetId(), 100, "getblocktxn with out-of-bounds tx indices"); return; } resp.txn[i] = block.vtx[req.indexes[i]]; } LOCK(cs_main); - const CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); int nSendFlags = State(pfrom.GetId())->fWantsCmpctWitness ? 0 : SERIALIZE_TRANSACTION_NO_WITNESS; - connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp)); + m_connman.PushMessage(&pfrom, msgMaker.Make(nSendFlags, NetMsgType::BLOCKTXN, resp)); } -static void ProcessHeadersMessage(CNode& pfrom, CConnman& connman, ChainstateManager& chainman, CTxMemPool& mempool, const std::vector<CBlockHeader>& headers, const CChainParams& chainparams, bool via_compact_block) +void PeerManager::ProcessHeadersMessage(CNode& pfrom, const std::vector<CBlockHeader>& headers, bool via_compact_block) { - const CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); size_t nCount = headers.size(); if (nCount == 0) { @@ -1849,7 +1805,7 @@ static void ProcessHeadersMessage(CNode& pfrom, CConnman& connman, ChainstateMan // nUnconnectingHeaders gets reset back to 0. if (!LookupBlockIndex(headers[0].hashPrevBlock) && nCount < MAX_BLOCKS_TO_ANNOUNCE) { nodestate->nUnconnectingHeaders++; - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexBestHeader), uint256())); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexBestHeader), uint256())); LogPrint(BCLog::NET, "received header %s: missing prev block %s, sending getheaders (%d) to end (peer=%d, nUnconnectingHeaders=%d)\n", headers[0].GetHash().ToString(), headers[0].hashPrevBlock.ToString(), @@ -1883,7 +1839,7 @@ static void ProcessHeadersMessage(CNode& pfrom, CConnman& connman, ChainstateMan } BlockValidationState state; - if (!chainman.ProcessNewBlockHeaders(headers, state, chainparams, &pindexLast)) { + if (!m_chainman.ProcessNewBlockHeaders(headers, state, m_chainparams, &pindexLast)) { if (state.IsInvalid()) { MaybePunishNodeForBlock(pfrom.GetId(), state, via_compact_block, "invalid header received"); return; @@ -1914,10 +1870,10 @@ static void ProcessHeadersMessage(CNode& pfrom, CConnman& connman, ChainstateMan // TODO: optimize: if pindexLast is an ancestor of ::ChainActive().Tip or pindexBestHeader, continue // from there instead. LogPrint(BCLog::NET, "more getheaders (%d) to end to peer=%d (startheight:%d)\n", pindexLast->nHeight, pfrom.GetId(), pfrom.nStartingHeight); - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexLast), uint256())); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexLast), uint256())); } - bool fCanDirectFetch = CanDirectFetch(chainparams.GetConsensus()); + bool fCanDirectFetch = CanDirectFetch(m_chainparams.GetConsensus()); // If this set of headers is valid and ends in a block with at least as // much work as our tip, download as much as possible. if (fCanDirectFetch && pindexLast->IsValid(BLOCK_VALID_TREE) && ::ChainActive().Tip()->nChainWork <= pindexLast->nChainWork) { @@ -1927,7 +1883,7 @@ static void ProcessHeadersMessage(CNode& pfrom, CConnman& connman, ChainstateMan while (pindexWalk && !::ChainActive().Contains(pindexWalk) && vToFetch.size() <= MAX_BLOCKS_IN_TRANSIT_PER_PEER) { if (!(pindexWalk->nStatus & BLOCK_HAVE_DATA) && !mapBlocksInFlight.count(pindexWalk->GetBlockHash()) && - (!IsWitnessEnabled(pindexWalk->pprev, chainparams.GetConsensus()) || State(pfrom.GetId())->fHaveWitness)) { + (!IsWitnessEnabled(pindexWalk->pprev, m_chainparams.GetConsensus()) || State(pfrom.GetId())->fHaveWitness)) { // We don't have this block, and it's not yet in flight. vToFetch.push_back(pindexWalk); } @@ -1951,7 +1907,7 @@ static void ProcessHeadersMessage(CNode& pfrom, CConnman& connman, ChainstateMan } uint32_t nFetchFlags = GetFetchFlags(pfrom); vGetData.push_back(CInv(MSG_BLOCK | nFetchFlags, pindex->GetBlockHash())); - MarkBlockAsInFlight(mempool, pfrom.GetId(), pindex->GetBlockHash(), pindex); + MarkBlockAsInFlight(m_mempool, pfrom.GetId(), pindex->GetBlockHash(), pindex); LogPrint(BCLog::NET, "Requesting block %s from peer=%d\n", pindex->GetBlockHash().ToString(), pfrom.GetId()); } @@ -1964,7 +1920,7 @@ static void ProcessHeadersMessage(CNode& pfrom, CConnman& connman, ChainstateMan // In any case, we want to download using a compact block, not a regular one vGetData[0] = CInv(MSG_CMPCT_BLOCK, vGetData[0].hash); } - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vGetData)); } } } @@ -1982,18 +1938,19 @@ static void ProcessHeadersMessage(CNode& pfrom, CConnman& connman, ChainstateMan // until we have a headers chain that has at least // nMinimumChainWork, even if a peer has a chain past our tip, // as an anti-DoS measure. - if (IsOutboundDisconnectionCandidate(pfrom)) { + if (pfrom.IsOutboundOrBlockRelayConn()) { LogPrintf("Disconnecting outbound peer %d -- headers chain has insufficient work\n", pfrom.GetId()); pfrom.fDisconnect = true; } } } - if (!pfrom.fDisconnect && IsOutboundDisconnectionCandidate(pfrom) && nodestate->pindexBestKnownBlock != nullptr && pfrom.m_tx_relay != nullptr) { - // If this is an outbound full-relay peer, check to see if we should protect - // it from the bad/lagging chain logic. - // Note that block-relay-only peers are already implicitly protected, so we - // only consider setting m_protect for the full-relay peers. + // If this is an outbound full-relay peer, check to see if we should protect + // it from the bad/lagging chain logic. + // Note that outbound block-relay peers are excluded from this protection, and + // thus always subject to eviction under the bad/lagging chain logic. + // See ChainSyncTimeoutState. + if (!pfrom.fDisconnect && pfrom.IsFullOutboundConn() && nodestate->pindexBestKnownBlock != nullptr) { if (g_outbound_peers_with_protect_from_disconnect < MAX_OUTBOUND_PEERS_TO_PROTECT_FROM_DISCONNECT && nodestate->pindexBestKnownBlock->nChainWork >= ::ChainActive().Tip()->nChainWork && !nodestate->m_chain_sync.m_protect) { LogPrint(BCLog::NET, "Protecting outbound peer=%d from eviction\n", pfrom.GetId()); nodestate->m_chain_sync.m_protect = true; @@ -2005,13 +1962,20 @@ static void ProcessHeadersMessage(CNode& pfrom, CConnman& connman, ChainstateMan return; } -void static ProcessOrphanTx(CConnman& connman, CTxMemPool& mempool, std::set<uint256>& orphan_work_set, std::list<CTransactionRef>& removed_txn) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans) +/** + * Reconsider orphan transactions after a parent has been accepted to the mempool. + * + * @param[in/out] orphan_work_set The set of orphan transactions to reconsider. Generally only one + * orphan will be reconsidered on each call of this function. This set + * may be added to if accepting an orphan causes its children to be + * reconsidered. + */ +void PeerManager::ProcessOrphanTx(std::set<uint256>& orphan_work_set) { AssertLockHeld(cs_main); AssertLockHeld(g_cs_orphans); - std::set<NodeId> setMisbehaving; - bool done = false; - while (!done && !orphan_work_set.empty()) { + + while (!orphan_work_set.empty()) { const uint256 orphanHash = *orphan_work_set.begin(); orphan_work_set.erase(orphan_work_set.begin()); @@ -2019,18 +1983,13 @@ void static ProcessOrphanTx(CConnman& connman, CTxMemPool& mempool, std::set<uin if (orphan_it == mapOrphanTransactions.end()) continue; const CTransactionRef porphanTx = orphan_it->second.tx; - const CTransaction& orphanTx = *porphanTx; - NodeId fromPeer = orphan_it->second.fromPeer; - // Use a new TxValidationState because orphans come from different peers (and we call - // MaybePunishNodeForTx based on the source peer from the orphan map, not based on the peer - // that relayed the previous transaction). - TxValidationState orphan_state; - - if (setMisbehaving.count(fromPeer)) continue; - if (AcceptToMemoryPool(mempool, orphan_state, porphanTx, &removed_txn, false /* bypass_limits */, 0 /* nAbsurdFee */)) { + TxValidationState state; + std::list<CTransactionRef> removed_txn; + + if (AcceptToMemoryPool(m_mempool, state, porphanTx, &removed_txn, false /* bypass_limits */)) { LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString()); - RelayTransaction(orphanHash, porphanTx->GetWitnessHash(), connman); - for (unsigned int i = 0; i < orphanTx.vout.size(); i++) { + RelayTransaction(orphanHash, porphanTx->GetWitnessHash(), m_connman); + for (unsigned int i = 0; i < porphanTx->vout.size(); i++) { auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i)); if (it_by_prev != mapOrphanTransactionsByPrev.end()) { for (const auto& elem : it_by_prev->second) { @@ -2039,22 +1998,23 @@ void static ProcessOrphanTx(CConnman& connman, CTxMemPool& mempool, std::set<uin } } EraseOrphanTx(orphanHash); - done = true; - } else if (orphan_state.GetResult() != TxValidationResult::TX_MISSING_INPUTS) { - if (orphan_state.IsInvalid()) { - // Punish peer that gave us an invalid orphan tx - if (MaybePunishNodeForTx(fromPeer, orphan_state)) { - setMisbehaving.insert(fromPeer); - } + for (const CTransactionRef& removedTx : removed_txn) { + AddToCompactExtraTransactions(removedTx); + } + break; + } else if (state.GetResult() != TxValidationResult::TX_MISSING_INPUTS) { + if (state.IsInvalid()) { LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s from peer=%d. %s\n", orphanHash.ToString(), - fromPeer, - orphan_state.ToString()); + orphan_it->second.fromPeer, + state.ToString()); + // Maybe punish peer that gave us an invalid orphan tx + MaybePunishNodeForTx(orphan_it->second.fromPeer, state); } // Has inputs but not accepted to mempool // Probably non-standard or insufficient fee LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString()); - if (orphan_state.GetResult() != TxValidationResult::TX_WITNESS_STRIPPED) { + if (state.GetResult() != TxValidationResult::TX_WITNESS_STRIPPED) { // We can add the wtxid of this transaction to our reject filter. // Do not add txids of witness transactions or witness-stripped // transactions to the filter, as they can have been malleated; @@ -2069,7 +2029,7 @@ void static ProcessOrphanTx(CConnman& connman, CTxMemPool& mempool, std::set<uin // for concerns around weakening security of unupgraded nodes // if we start doing this too early. assert(recentRejects); - recentRejects->insert(orphanTx.GetWitnessHash()); + recentRejects->insert(porphanTx->GetWitnessHash()); // If the transaction failed for TX_INPUTS_NOT_STANDARD, // then we know that the witness was irrelevant to the policy // failure, since this check depends only on the txid @@ -2078,17 +2038,17 @@ void static ProcessOrphanTx(CConnman& connman, CTxMemPool& mempool, std::set<uin // processing of this transaction in the event that child // transactions are later received (resulting in // parent-fetching by txid via the orphan-handling logic). - if (orphan_state.GetResult() == TxValidationResult::TX_INPUTS_NOT_STANDARD && orphanTx.GetWitnessHash() != orphanTx.GetHash()) { + if (state.GetResult() == TxValidationResult::TX_INPUTS_NOT_STANDARD && porphanTx->GetWitnessHash() != porphanTx->GetHash()) { // We only add the txid if it differs from the wtxid, to // avoid wasting entries in the rolling bloom filter. - recentRejects->insert(orphanTx.GetHash()); + recentRejects->insert(porphanTx->GetHash()); } } EraseOrphanTx(orphanHash); - done = true; + break; } - mempool.check(&::ChainstateActive().CoinsTip()); } + m_mempool.check(&::ChainstateActive().CoinsTip()); } /** @@ -2096,7 +2056,7 @@ void static ProcessOrphanTx(CConnman& connman, CTxMemPool& mempool, std::set<uin * * May disconnect from the peer in the case of a bad request. * - * @param[in] pfrom The peer that we received the request from + * @param[in] peer The peer that we received the request from * @param[in] chain_params Chain parameters * @param[in] filter_type The filter type the request is for. Must be basic filters. * @param[in] start_height The start height for the request @@ -2106,7 +2066,7 @@ void static ProcessOrphanTx(CConnman& connman, CTxMemPool& mempool, std::set<uin * @param[out] filter_index The filter index, if the request can be serviced. * @return True if the request can be serviced. */ -static bool PrepareBlockFilterRequest(CNode& pfrom, const CChainParams& chain_params, +static bool PrepareBlockFilterRequest(CNode& peer, const CChainParams& chain_params, BlockFilterType filter_type, uint32_t start_height, const uint256& stop_hash, uint32_t max_height_diff, const CBlockIndex*& stop_index, @@ -2114,11 +2074,11 @@ static bool PrepareBlockFilterRequest(CNode& pfrom, const CChainParams& chain_pa { const bool supported_filter_type = (filter_type == BlockFilterType::BASIC && - gArgs.GetBoolArg("-peerblockfilters", DEFAULT_PEERBLOCKFILTERS)); + (peer.GetLocalServices() & NODE_COMPACT_FILTERS)); if (!supported_filter_type) { LogPrint(BCLog::NET, "peer %d requested unsupported block filter type: %d\n", - pfrom.GetId(), static_cast<uint8_t>(filter_type)); - pfrom.fDisconnect = true; + peer.GetId(), static_cast<uint8_t>(filter_type)); + peer.fDisconnect = true; return false; } @@ -2129,8 +2089,8 @@ static bool PrepareBlockFilterRequest(CNode& pfrom, const CChainParams& chain_pa // Check that the stop block exists and the peer would be allowed to fetch it. if (!stop_index || !BlockRequestAllowed(stop_index, chain_params.GetConsensus())) { LogPrint(BCLog::NET, "peer %d requested invalid block hash: %s\n", - pfrom.GetId(), stop_hash.ToString()); - pfrom.fDisconnect = true; + peer.GetId(), stop_hash.ToString()); + peer.fDisconnect = true; return false; } } @@ -2139,14 +2099,14 @@ static bool PrepareBlockFilterRequest(CNode& pfrom, const CChainParams& chain_pa if (start_height > stop_height) { LogPrint(BCLog::NET, "peer %d sent invalid getcfilters/getcfheaders with " /* Continued */ "start height %d and stop height %d\n", - pfrom.GetId(), start_height, stop_height); - pfrom.fDisconnect = true; + peer.GetId(), start_height, stop_height); + peer.fDisconnect = true; return false; } if (stop_height - start_height >= max_height_diff) { LogPrint(BCLog::NET, "peer %d requested too many cfilters/cfheaders: %d / %d\n", - pfrom.GetId(), stop_height - start_height + 1, max_height_diff); - pfrom.fDisconnect = true; + peer.GetId(), stop_height - start_height + 1, max_height_diff); + peer.fDisconnect = true; return false; } @@ -2164,12 +2124,12 @@ static bool PrepareBlockFilterRequest(CNode& pfrom, const CChainParams& chain_pa * * May disconnect from the peer in the case of a bad request. * - * @param[in] pfrom The peer that we received the request from + * @param[in] peer The peer that we received the request from * @param[in] vRecv The raw message received * @param[in] chain_params Chain parameters * @param[in] connman Pointer to the connection manager */ -static void ProcessGetCFilters(CNode& pfrom, CDataStream& vRecv, const CChainParams& chain_params, +static void ProcessGetCFilters(CNode& peer, CDataStream& vRecv, const CChainParams& chain_params, CConnman& connman) { uint8_t filter_type_ser; @@ -2182,13 +2142,12 @@ static void ProcessGetCFilters(CNode& pfrom, CDataStream& vRecv, const CChainPar const CBlockIndex* stop_index; BlockFilterIndex* filter_index; - if (!PrepareBlockFilterRequest(pfrom, chain_params, filter_type, start_height, stop_hash, + if (!PrepareBlockFilterRequest(peer, chain_params, filter_type, start_height, stop_hash, MAX_GETCFILTERS_SIZE, stop_index, filter_index)) { return; } std::vector<BlockFilter> filters; - if (!filter_index->LookupFilterRange(start_height, stop_index, filters)) { LogPrint(BCLog::NET, "Failed to find block filter in index: filter_type=%s, start_height=%d, stop_hash=%s\n", BlockFilterTypeName(filter_type), start_height, stop_hash.ToString()); @@ -2196,9 +2155,9 @@ static void ProcessGetCFilters(CNode& pfrom, CDataStream& vRecv, const CChainPar } for (const auto& filter : filters) { - CSerializedNetMsg msg = CNetMsgMaker(pfrom.GetSendVersion()) + CSerializedNetMsg msg = CNetMsgMaker(peer.GetCommonVersion()) .Make(NetMsgType::CFILTER, filter); - connman.PushMessage(&pfrom, std::move(msg)); + connman.PushMessage(&peer, std::move(msg)); } } @@ -2207,12 +2166,12 @@ static void ProcessGetCFilters(CNode& pfrom, CDataStream& vRecv, const CChainPar * * May disconnect from the peer in the case of a bad request. * - * @param[in] pfrom The peer that we received the request from + * @param[in] peer The peer that we received the request from * @param[in] vRecv The raw message received * @param[in] chain_params Chain parameters * @param[in] connman Pointer to the connection manager */ -static void ProcessGetCFHeaders(CNode& pfrom, CDataStream& vRecv, const CChainParams& chain_params, +static void ProcessGetCFHeaders(CNode& peer, CDataStream& vRecv, const CChainParams& chain_params, CConnman& connman) { uint8_t filter_type_ser; @@ -2225,7 +2184,7 @@ static void ProcessGetCFHeaders(CNode& pfrom, CDataStream& vRecv, const CChainPa const CBlockIndex* stop_index; BlockFilterIndex* filter_index; - if (!PrepareBlockFilterRequest(pfrom, chain_params, filter_type, start_height, stop_hash, + if (!PrepareBlockFilterRequest(peer, chain_params, filter_type, start_height, stop_hash, MAX_GETCFHEADERS_SIZE, stop_index, filter_index)) { return; } @@ -2248,13 +2207,13 @@ static void ProcessGetCFHeaders(CNode& pfrom, CDataStream& vRecv, const CChainPa return; } - CSerializedNetMsg msg = CNetMsgMaker(pfrom.GetSendVersion()) + CSerializedNetMsg msg = CNetMsgMaker(peer.GetCommonVersion()) .Make(NetMsgType::CFHEADERS, filter_type_ser, stop_index->GetBlockHash(), prev_header, filter_hashes); - connman.PushMessage(&pfrom, std::move(msg)); + connman.PushMessage(&peer, std::move(msg)); } /** @@ -2262,12 +2221,12 @@ static void ProcessGetCFHeaders(CNode& pfrom, CDataStream& vRecv, const CChainPa * * May disconnect from the peer in the case of a bad request. * - * @param[in] pfrom The peer that we received the request from + * @param[in] peer The peer that we received the request from * @param[in] vRecv The raw message received * @param[in] chain_params Chain parameters * @param[in] connman Pointer to the connection manager */ -static void ProcessGetCFCheckPt(CNode& pfrom, CDataStream& vRecv, const CChainParams& chain_params, +static void ProcessGetCFCheckPt(CNode& peer, CDataStream& vRecv, const CChainParams& chain_params, CConnman& connman) { uint8_t filter_type_ser; @@ -2279,7 +2238,7 @@ static void ProcessGetCFCheckPt(CNode& pfrom, CDataStream& vRecv, const CChainPa const CBlockIndex* stop_index; BlockFilterIndex* filter_index; - if (!PrepareBlockFilterRequest(pfrom, chain_params, filter_type, /*start_height=*/0, stop_hash, + if (!PrepareBlockFilterRequest(peer, chain_params, filter_type, /*start_height=*/0, stop_hash, /*max_height_diff=*/std::numeric_limits<uint32_t>::max(), stop_index, filter_index)) { return; @@ -2300,25 +2259,17 @@ static void ProcessGetCFCheckPt(CNode& pfrom, CDataStream& vRecv, const CChainPa } } - CSerializedNetMsg msg = CNetMsgMaker(pfrom.GetSendVersion()) + CSerializedNetMsg msg = CNetMsgMaker(peer.GetCommonVersion()) .Make(NetMsgType::CFCHECKPT, filter_type_ser, stop_index->GetBlockHash(), headers); - connman.PushMessage(&pfrom, std::move(msg)); + connman.PushMessage(&peer, std::move(msg)); } -void ProcessMessage( - CNode& pfrom, - const std::string& msg_type, - CDataStream& vRecv, - const std::chrono::microseconds time_received, - const CChainParams& chainparams, - ChainstateManager& chainman, - CTxMemPool& mempool, - CConnman& connman, - BanMan* banman, - const std::atomic<bool>& interruptMsgProc) +void PeerManager::ProcessMessage(CNode& pfrom, const std::string& msg_type, CDataStream& vRecv, + const std::chrono::microseconds time_received, + const std::atomic<bool>& interruptMsgProc) { LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(msg_type), vRecv.size(), pfrom.GetId()); if (gArgs.IsArgSet("-dropmessagestest") && GetRand(gArgs.GetArg("-dropmessagestest", 0)) == 0) @@ -2327,12 +2278,13 @@ void ProcessMessage( return; } + PeerRef peer = GetPeerRef(pfrom.GetId()); + if (peer == nullptr) return; if (msg_type == NetMsgType::VERSION) { // Each connection can only send one version message if (pfrom.nVersion != 0) { - LOCK(cs_main); Misbehaving(pfrom.GetId(), 1, "redundant version message"); return; } @@ -2344,19 +2296,17 @@ void ProcessMessage( uint64_t nServiceInt; ServiceFlags nServices; int nVersion; - int nSendVersion; std::string cleanSubVer; int nStartingHeight = -1; bool fRelay = true; vRecv >> nVersion >> nServiceInt >> nTime >> addrMe; - nSendVersion = std::min(nVersion, PROTOCOL_VERSION); nServices = ServiceFlags(nServiceInt); - if (!pfrom.fInbound) + if (!pfrom.IsInboundConn()) { - connman.SetServices(pfrom.addr, nServices); + m_connman.SetServices(pfrom.addr, nServices); } - if (!pfrom.fInbound && !pfrom.fFeeler && !pfrom.m_manual_connection && !HasAllDesirableServiceFlags(nServices)) + if (pfrom.ExpectServicesFromConn() && !HasAllDesirableServiceFlags(nServices)) { LogPrint(BCLog::NET, "peer=%d does not offer the expected services (%08x offered, %08x expected); disconnecting\n", pfrom.GetId(), nServices, GetDesirableServiceFlags(nServices)); pfrom.fDisconnect = true; @@ -2383,27 +2333,37 @@ void ProcessMessage( if (!vRecv.empty()) vRecv >> fRelay; // Disconnect if we connected to ourself - if (pfrom.fInbound && !connman.CheckIncomingNonce(nNonce)) + if (pfrom.IsInboundConn() && !m_connman.CheckIncomingNonce(nNonce)) { LogPrintf("connected to self at %s, disconnecting\n", pfrom.addr.ToString()); pfrom.fDisconnect = true; return; } - if (pfrom.fInbound && addrMe.IsRoutable()) + if (pfrom.IsInboundConn() && addrMe.IsRoutable()) { SeenLocal(addrMe); } // Be shy and don't send version until we hear - if (pfrom.fInbound) - PushNodeVersion(pfrom, connman, GetAdjustedTime()); + if (pfrom.IsInboundConn()) + PushNodeVersion(pfrom, m_connman, GetAdjustedTime()); + + // Change version + const int greatest_common_version = std::min(nVersion, PROTOCOL_VERSION); + pfrom.SetCommonVersion(greatest_common_version); + pfrom.nVersion = nVersion; + + const CNetMsgMaker msg_maker(greatest_common_version); - if (nVersion >= WTXID_RELAY_VERSION) { - connman.PushMessage(&pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::WTXIDRELAY)); + if (greatest_common_version >= WTXID_RELAY_VERSION) { + m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::WTXIDRELAY)); } - connman.PushMessage(&pfrom, CNetMsgMaker(INIT_PROTO_VERSION).Make(NetMsgType::VERACK)); + m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::VERACK)); + + // Signal ADDRv2 support (BIP155). + m_connman.PushMessage(&pfrom, msg_maker.Make(NetMsgType::SENDADDRV2)); pfrom.nServices = nServices; pfrom.SetAddrLocal(addrMe); @@ -2424,10 +2384,6 @@ void ProcessMessage( pfrom.m_tx_relay->fRelayTxes = fRelay; // set to true after we get the first filter* message } - // Change version - pfrom.SetSendVersion(nSendVersion); - pfrom.nVersion = nVersion; - if((nServices & NODE_WITNESS)) { LOCK(cs_main); @@ -2440,9 +2396,23 @@ void ProcessMessage( UpdatePreferredDownload(pfrom, State(pfrom.GetId())); } - if (!pfrom.fInbound && pfrom.IsAddrRelayPeer()) - { - // Advertise our address + if (!pfrom.IsInboundConn() && !pfrom.IsBlockOnlyConn()) { + // For outbound peers, we try to relay our address (so that other + // nodes can try to find us more quickly, as we have no guarantee + // that an outbound peer is even aware of how to reach us) and do a + // one-time address fetch (to help populate/update our addrman). If + // we're starting up for the first time, our addrman may be pretty + // empty and no one will know who we are, so these mechanisms are + // important to help us connect to the network. + // + // We also update the addrman to record connection success for + // these peers (which include OUTBOUND_FULL_RELAY and FEELER + // connections) so that addrman will have an up-to-date notion of + // which peers are online and available. + // + // We skip these operations for BLOCK_RELAY peers to avoid + // potentially leaking information about our BLOCK_RELAY + // connections via the addrman or address relay. if (fListen && !::ChainstateActive().IsInitialBlockDownload()) { CAddress addr = GetLocalAddress(&pfrom.addr, pfrom.GetLocalServices()); @@ -2459,9 +2429,12 @@ void ProcessMessage( } // Get recent addresses - connman.PushMessage(&pfrom, CNetMsgMaker(nSendVersion).Make(NetMsgType::GETADDR)); + m_connman.PushMessage(&pfrom, CNetMsgMaker(greatest_common_version).Make(NetMsgType::GETADDR)); pfrom.fGetAddr = true; - connman.MarkAddressGood(pfrom.addr); + + // Moves address from New to Tried table in Addrman, resolves + // tried-table collisions, etc. + m_connman.MarkAddressGood(pfrom.addr); } std::string remoteAddr; @@ -2478,14 +2451,13 @@ void ProcessMessage( AddTimeData(pfrom.addr, nTimeOffset); // If the peer is old enough to have the old alert system, send it the final alert. - if (pfrom.nVersion <= 70012) { + if (greatest_common_version <= 70012) { CDataStream finalAlert(ParseHex("60010000000000000000000000ffffff7f00000000ffffff7ffeffff7f01ffffff7f00000000ffffff7f00ffffff7f002f555247454e543a20416c657274206b657920636f6d70726f6d697365642c2075706772616465207265717569726564004630440220653febd6410f470f6bae11cad19c48413becb1ac2c17f908fd0fd53bdc3abd5202206d0e9c96fe88d4a0f01ed9dedae2b6f9e00da94cad0fecaae66ecf689bf71b50"), SER_NETWORK, PROTOCOL_VERSION); - connman.PushMessage(&pfrom, CNetMsgMaker(nSendVersion).Make("alert", finalAlert)); + m_connman.PushMessage(&pfrom, CNetMsgMaker(greatest_common_version).Make("alert", finalAlert)); } // Feeler connections exist only to verify if address is online. - if (pfrom.fFeeler) { - assert(pfrom.fInbound == false); + if (pfrom.IsFeelerConn()) { pfrom.fDisconnect = true; } return; @@ -2493,19 +2465,17 @@ void ProcessMessage( if (pfrom.nVersion == 0) { // Must have a version message before anything else - LOCK(cs_main); Misbehaving(pfrom.GetId(), 1, "non-version message before version handshake"); return; } // At this point, the outgoing message serialization version can't change. - const CNetMsgMaker msgMaker(pfrom.GetSendVersion()); + const CNetMsgMaker msgMaker(pfrom.GetCommonVersion()); - if (msg_type == NetMsgType::VERACK) - { - pfrom.SetRecvVersion(std::min(pfrom.nVersion.load(), PROTOCOL_VERSION)); + if (msg_type == NetMsgType::VERACK) { + if (pfrom.fSuccessfullyConnected) return; - if (!pfrom.fInbound) { + if (!pfrom.IsInboundConn()) { // Mark this node as currently connected, so we update its timestamp later. LOCK(cs_main); State(pfrom.GetId())->fCurrentlyConnected = true; @@ -2515,14 +2485,14 @@ void ProcessMessage( pfrom.m_tx_relay == nullptr ? "block-relay" : "full-relay"); } - if (pfrom.nVersion >= SENDHEADERS_VERSION) { + if (pfrom.GetCommonVersion() >= SENDHEADERS_VERSION) { // Tell our peer we prefer to receive headers rather than inv's // We send this to non-NODE NETWORK peers as well, because even // non-NODE NETWORK peers can announce blocks (such as pruning // nodes) - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::SENDHEADERS)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::SENDHEADERS)); } - if (pfrom.nVersion >= SHORT_IDS_BLOCKS_VERSION) { + if (pfrom.GetCommonVersion() >= SHORT_IDS_BLOCKS_VERSION) { // Tell our peer we are willing to provide version 1 or 2 cmpctblocks // However, we do not request new block announcements using // cmpctblock messages. @@ -2531,9 +2501,9 @@ void ProcessMessage( bool fAnnounceUsingCMPCTBLOCK = false; uint64_t nCMPCTBLOCKVersion = 2; if (pfrom.GetLocalServices() & NODE_WITNESS) - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); nCMPCTBLOCKVersion = 1; - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::SENDCMPCT, fAnnounceUsingCMPCTBLOCK, nCMPCTBLOCKVersion)); } pfrom.fSuccessfullyConnected = true; return; @@ -2548,7 +2518,7 @@ void ProcessMessage( pfrom.fDisconnect = true; return; } - if (pfrom.nVersion >= WTXID_RELAY_VERSION) { + if (pfrom.GetCommonVersion() >= WTXID_RELAY_VERSION) { LOCK(cs_main); if (!State(pfrom.GetId())->m_wtxid_relay) { State(pfrom.GetId())->m_wtxid_relay = true; @@ -2559,23 +2529,29 @@ void ProcessMessage( } if (!pfrom.fSuccessfullyConnected) { - // Must have a verack message before anything else - LOCK(cs_main); - Misbehaving(pfrom.GetId(), 1, "non-verack message before version handshake"); + LogPrint(BCLog::NET, "Unsupported message \"%s\" prior to verack from peer=%d\n", SanitizeString(msg_type), pfrom.GetId()); return; } - if (msg_type == NetMsgType::ADDR) { + if (msg_type == NetMsgType::ADDR || msg_type == NetMsgType::ADDRV2) { + int stream_version = vRecv.GetVersion(); + if (msg_type == NetMsgType::ADDRV2) { + // Add ADDRV2_FORMAT to the version so that the CNetAddr and CAddress + // unserialize methods know that an address in v2 format is coming. + stream_version |= ADDRV2_FORMAT; + } + + OverrideStream<CDataStream> s(&vRecv, vRecv.GetType(), stream_version); std::vector<CAddress> vAddr; - vRecv >> vAddr; - if (!pfrom.IsAddrRelayPeer()) { + s >> vAddr; + + if (!pfrom.RelayAddrsWithConn()) { return; } if (vAddr.size() > MAX_ADDR_TO_SEND) { - LOCK(cs_main); - Misbehaving(pfrom.GetId(), 20, strprintf("addr message size = %u", vAddr.size())); + Misbehaving(pfrom.GetId(), 20, strprintf("%s message size = %u", msg_type, vAddr.size())); return; } @@ -2597,7 +2573,7 @@ void ProcessMessage( if (addr.nTime <= 100000000 || addr.nTime > nNow + 10 * 60) addr.nTime = nNow - 5 * 24 * 60 * 60; pfrom.AddAddressKnown(addr); - if (banman && (banman->IsDiscouraged(addr) || banman->IsBanned(addr))) { + if (m_banman && (m_banman->IsDiscouraged(addr) || m_banman->IsBanned(addr))) { // Do not process banned/discouraged addresses beyond remembering we received them continue; } @@ -2605,20 +2581,25 @@ void ProcessMessage( if (addr.nTime > nSince && !pfrom.fGetAddr && vAddr.size() <= 10 && addr.IsRoutable()) { // Relay to a limited number of other nodes - RelayAddress(addr, fReachable, connman); + RelayAddress(addr, fReachable, m_connman); } // Do not store addresses outside our network if (fReachable) vAddrOk.push_back(addr); } - connman.AddNewAddresses(vAddrOk, pfrom.addr, 2 * 60 * 60); + m_connman.AddNewAddresses(vAddrOk, pfrom.addr, 2 * 60 * 60); if (vAddr.size() < 1000) pfrom.fGetAddr = false; - if (pfrom.fOneShot) + if (pfrom.IsAddrFetchConn()) pfrom.fDisconnect = true; return; } + if (msg_type == NetMsgType::SENDADDRV2) { + pfrom.m_wants_addrv2 = true; + return; + } + if (msg_type == NetMsgType::SENDHEADERS) { LOCK(cs_main); State(pfrom.GetId())->fPreferHeaders = true; @@ -2653,7 +2634,6 @@ void ProcessMessage( vRecv >> vInv; if (vInv.size() > MAX_INV_SZ) { - LOCK(cs_main); Misbehaving(pfrom.GetId(), 20, strprintf("inv message size = %u", vInv.size())); return; } @@ -2669,14 +2649,11 @@ void ProcessMessage( LOCK(cs_main); - uint32_t nFetchFlags = GetFetchFlags(pfrom); const auto current_time = GetTime<std::chrono::microseconds>(); uint256* best_block{nullptr}; - for (CInv &inv : vInv) - { - if (interruptMsgProc) - return; + for (CInv& inv : vInv) { + if (interruptMsgProc) return; // Ignore INVs that don't match wtxidrelay setting. // Note that orphan parent fetching always uses MSG_TX GETDATAs regardless of the wtxidrelay setting. @@ -2687,14 +2664,10 @@ void ProcessMessage( if (inv.IsMsgWtx()) continue; } - bool fAlreadyHave = AlreadyHave(inv, mempool); - LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); - - if (inv.IsMsgTx()) { - inv.type |= nFetchFlags; - } + if (inv.IsMsgBlk()) { + const bool fAlreadyHave = AlreadyHaveBlock(inv.hash); + LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); - if (inv.type == MSG_BLOCK) { UpdateBlockAvailability(pfrom.GetId(), inv.hash); if (!fAlreadyHave && !fImporting && !fReindex && !mapBlocksInFlight.count(inv.hash)) { // Headers-first is the primary method of announcement on @@ -2704,20 +2677,26 @@ void ProcessMessage( // then fetch the blocks we need to catch up. best_block = &inv.hash; } - } else { + } else if (inv.IsGenTxMsg()) { + const GenTxid gtxid = ToGenTxid(inv); + const bool fAlreadyHave = AlreadyHaveTx(gtxid, m_mempool); + LogPrint(BCLog::NET, "got inv: %s %s peer=%d\n", inv.ToString(), fAlreadyHave ? "have" : "new", pfrom.GetId()); + pfrom.AddKnownTx(inv.hash); if (fBlocksOnly) { LogPrint(BCLog::NET, "transaction (%s) inv sent in violation of protocol, disconnecting peer=%d\n", inv.hash.ToString(), pfrom.GetId()); pfrom.fDisconnect = true; return; - } else if (!fAlreadyHave && !chainman.ActiveChainstate().IsInitialBlockDownload()) { - RequestTx(State(pfrom.GetId()), ToGenTxid(inv), current_time); + } else if (!fAlreadyHave && !m_chainman.ActiveChainstate().IsInitialBlockDownload()) { + AddTxAnnouncement(pfrom, gtxid, current_time); } + } else { + LogPrint(BCLog::NET, "Unknown inv type \"%s\" received from peer=%d\n", inv.ToString(), pfrom.GetId()); } } if (best_block != nullptr) { - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexBestHeader), *best_block)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexBestHeader), *best_block)); LogPrint(BCLog::NET, "getheaders (%d) %s to peer=%d\n", pindexBestHeader->nHeight, best_block->ToString(), pfrom.GetId()); } @@ -2729,7 +2708,6 @@ void ProcessMessage( vRecv >> vInv; if (vInv.size() > MAX_INV_SZ) { - LOCK(cs_main); Misbehaving(pfrom.GetId(), 20, strprintf("getdata message size = %u", vInv.size())); return; } @@ -2740,8 +2718,12 @@ void ProcessMessage( LogPrint(BCLog::NET, "received getdata for: %s peer=%d\n", vInv[0].ToString(), pfrom.GetId()); } - pfrom.vRecvGetData.insert(pfrom.vRecvGetData.end(), vInv.begin(), vInv.end()); - ProcessGetData(pfrom, chainparams, connman, mempool, interruptMsgProc); + { + LOCK(peer->m_getdata_requests_mutex); + peer->m_getdata_requests.insert(peer->m_getdata_requests.end(), vInv.begin(), vInv.end()); + ProcessGetData(pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc); + } + return; } @@ -2770,7 +2752,7 @@ void ProcessMessage( a_recent_block = most_recent_block; } BlockValidationState state; - if (!ActivateBestChain(state, Params(), a_recent_block)) { + if (!ActivateBestChain(state, m_chainparams, a_recent_block)) { LogPrint(BCLog::NET, "failed to activate chain (%s)\n", state.ToString()); } } @@ -2794,7 +2776,7 @@ void ProcessMessage( } // If pruning, don't inv blocks unless we have on disk and are likely to still have // for some reasonable time window (1 hour) that block relay might require. - const int nPrunedBlocksLikelyToHave = MIN_BLOCKS_TO_KEEP - 3600 / chainparams.GetConsensus().nPowTargetSpacing; + const int nPrunedBlocksLikelyToHave = MIN_BLOCKS_TO_KEEP - 3600 / m_chainparams.GetConsensus().nPowTargetSpacing; if (fPruneMode && (!(pindex->nStatus & BLOCK_HAVE_DATA) || pindex->nHeight <= ::ChainActive().Tip()->nHeight - nPrunedBlocksLikelyToHave)) { LogPrint(BCLog::NET, " getblocks stopping, pruned or too old block at %d %s\n", pindex->nHeight, pindex->GetBlockHash().ToString()); @@ -2825,40 +2807,42 @@ void ProcessMessage( // Unlock cs_most_recent_block to avoid cs_main lock inversion } if (recent_block) { - SendBlockTransactions(*recent_block, req, pfrom, connman); + SendBlockTransactions(pfrom, *recent_block, req); return; } - LOCK(cs_main); + { + LOCK(cs_main); - const CBlockIndex* pindex = LookupBlockIndex(req.blockhash); - if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) { - LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId()); - return; - } + const CBlockIndex* pindex = LookupBlockIndex(req.blockhash); + if (!pindex || !(pindex->nStatus & BLOCK_HAVE_DATA)) { + LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block we don't have\n", pfrom.GetId()); + return; + } - if (pindex->nHeight < ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) { - // If an older block is requested (should never happen in practice, - // but can happen in tests) send a block response instead of a - // blocktxn response. Sending a full block response instead of a - // small blocktxn response is preferable in the case where a peer - // might maliciously send lots of getblocktxn requests to trigger - // expensive disk reads, because it will require the peer to - // actually receive all the data read from disk over the network. - LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH); - CInv inv; - inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK; - inv.hash = req.blockhash; - pfrom.vRecvGetData.push_back(inv); - // The message processing loop will go around again (without pausing) and we'll respond then (without cs_main) - return; - } + if (pindex->nHeight >= ::ChainActive().Height() - MAX_BLOCKTXN_DEPTH) { + CBlock block; + bool ret = ReadBlockFromDisk(block, pindex, m_chainparams.GetConsensus()); + assert(ret); - CBlock block; - bool ret = ReadBlockFromDisk(block, pindex, chainparams.GetConsensus()); - assert(ret); + SendBlockTransactions(pfrom, block, req); + return; + } + } - SendBlockTransactions(block, req, pfrom, connman); + // If an older block is requested (should never happen in practice, + // but can happen in tests) send a block response instead of a + // blocktxn response. Sending a full block response instead of a + // small blocktxn response is preferable in the case where a peer + // might maliciously send lots of getblocktxn requests to trigger + // expensive disk reads, because it will require the peer to + // actually receive all the data read from disk over the network. + LogPrint(BCLog::NET, "Peer %d sent us a getblocktxn for a block > %i deep\n", pfrom.GetId(), MAX_BLOCKTXN_DEPTH); + CInv inv; + WITH_LOCK(cs_main, inv.type = State(pfrom.GetId())->fWantsCmpctWitness ? MSG_WITNESS_BLOCK : MSG_BLOCK); + inv.hash = req.blockhash; + WITH_LOCK(peer->m_getdata_requests_mutex, peer->m_getdata_requests.push_back(inv)); + // The message processing loop will go around again (without pausing) and we'll respond then return; } @@ -2889,7 +2873,7 @@ void ProcessMessage( return; } - if (!BlockRequestAllowed(pindex, chainparams.GetConsensus())) { + if (!BlockRequestAllowed(pindex, m_chainparams.GetConsensus())) { LogPrint(BCLog::NET, "%s: ignoring request from peer=%i for old block header that isn't in the main chain\n", __func__, pfrom.GetId()); return; } @@ -2925,7 +2909,7 @@ void ProcessMessage( // will re-announce the new block via headers (or compact blocks again) // in the SendMessages logic. nodestate->pindexBestHeaderSent = pindex ? pindex : ::ChainActive().Tip(); - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::HEADERS, vHeaders)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::HEADERS, vHeaders)); return; } @@ -2964,15 +2948,12 @@ void ProcessMessage( TxValidationState state; - for (const GenTxid& gtxid : {GenTxid(false, txid), GenTxid(true, wtxid)}) { - nodestate->m_tx_download.m_tx_announced.erase(gtxid.GetHash()); - nodestate->m_tx_download.m_tx_in_flight.erase(gtxid.GetHash()); - EraseTxRequest(gtxid); - } + m_txrequest.ReceivedResponse(pfrom.GetId(), txid); + if (tx.HasWitness()) m_txrequest.ReceivedResponse(pfrom.GetId(), wtxid); std::list<CTransactionRef> lRemovedTxn; - // We do the AlreadyHave() check using wtxid, rather than txid - in the + // We do the AlreadyHaveTx() check using wtxid, rather than txid - in the // absence of witness malleation, this is strictly better, because the // recent rejects filter may contain the wtxid but rarely contains // the txid of a segwit transaction that has been rejected. @@ -2984,15 +2965,19 @@ void ProcessMessage( // already; and an adversary can already relay us old transactions // (older than our recency filter) if trying to DoS us, without any need // for witness malleation. - if (!AlreadyHave(CInv(MSG_WTX, wtxid), mempool) && - AcceptToMemoryPool(mempool, state, ptx, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) { - mempool.check(&::ChainstateActive().CoinsTip()); - RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), connman); + if (!AlreadyHaveTx(GenTxid(/* is_wtxid=*/true, wtxid), m_mempool) && + AcceptToMemoryPool(m_mempool, state, ptx, &lRemovedTxn, false /* bypass_limits */)) { + m_mempool.check(&::ChainstateActive().CoinsTip()); + // As this version of the transaction was acceptable, we can forget about any + // requests for it. + m_txrequest.ForgetTxHash(tx.GetHash()); + m_txrequest.ForgetTxHash(tx.GetWitnessHash()); + RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), m_connman); for (unsigned int i = 0; i < tx.vout.size(); i++) { auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(txid, i)); if (it_by_prev != mapOrphanTransactionsByPrev.end()) { for (const auto& elem : it_by_prev->second) { - pfrom.orphan_work_set.insert(elem->first); + peer->m_orphan_work_set.insert(elem->first); } } } @@ -3002,10 +2987,14 @@ void ProcessMessage( LogPrint(BCLog::MEMPOOL, "AcceptToMemoryPool: peer=%d: accepted %s (poolsz %u txn, %u kB)\n", pfrom.GetId(), tx.GetHash().ToString(), - mempool.size(), mempool.DynamicMemoryUsage() / 1000); + m_mempool.size(), m_mempool.DynamicMemoryUsage() / 1000); + + for (const CTransactionRef& removedTx : lRemovedTxn) { + AddToCompactExtraTransactions(removedTx); + } // Recursively process any orphan transactions that depended on this one - ProcessOrphanTx(connman, mempool, pfrom.orphan_work_set, lRemovedTxn); + ProcessOrphanTx(peer->m_orphan_work_set); } else if (state.GetResult() == TxValidationResult::TX_MISSING_INPUTS) { @@ -3028,7 +3017,6 @@ void ProcessMessage( } } if (!fRejectedParents) { - uint32_t nFetchFlags = GetFetchFlags(pfrom); const auto current_time = GetTime<std::chrono::microseconds>(); for (const uint256& parent_txid : unique_parents) { @@ -3037,12 +3025,16 @@ void ProcessMessage( // wtxidrelay peers. // Eventually we should replace this with an improved // protocol for getting all unconfirmed parents. - CInv _inv(MSG_TX | nFetchFlags, parent_txid); + const GenTxid gtxid{/* is_wtxid=*/false, parent_txid}; pfrom.AddKnownTx(parent_txid); - if (!AlreadyHave(_inv, mempool)) RequestTx(State(pfrom.GetId()), ToGenTxid(_inv), current_time); + if (!AlreadyHaveTx(gtxid, m_mempool)) AddTxAnnouncement(pfrom, gtxid, current_time); } AddOrphanTx(ptx, pfrom.GetId()); + // Once added to the orphan pool, a tx is considered AlreadyHave, and we shouldn't request it anymore. + m_txrequest.ForgetTxHash(tx.GetHash()); + m_txrequest.ForgetTxHash(tx.GetWitnessHash()); + // DoS prevention: do not allow mapOrphanTransactions to grow unbounded (see CVE-2012-3789) unsigned int nMaxOrphanTx = (unsigned int)std::max((int64_t)0, gArgs.GetArg("-maxorphantx", DEFAULT_MAX_ORPHAN_TRANSACTIONS)); unsigned int nEvicted = LimitOrphanTxSize(nMaxOrphanTx); @@ -3059,6 +3051,8 @@ void ProcessMessage( // from any of our non-wtxidrelay peers. recentRejects->insert(tx.GetHash()); recentRejects->insert(tx.GetWitnessHash()); + m_txrequest.ForgetTxHash(tx.GetHash()); + m_txrequest.ForgetTxHash(tx.GetWitnessHash()); } } else { if (state.GetResult() != TxValidationResult::TX_WITNESS_STRIPPED) { @@ -3077,6 +3071,7 @@ void ProcessMessage( // if we start doing this too early. assert(recentRejects); recentRejects->insert(tx.GetWitnessHash()); + m_txrequest.ForgetTxHash(tx.GetWitnessHash()); // If the transaction failed for TX_INPUTS_NOT_STANDARD, // then we know that the witness was irrelevant to the policy // failure, since this check depends only on the txid @@ -3087,12 +3082,11 @@ void ProcessMessage( // parent-fetching by txid via the orphan-handling logic). if (state.GetResult() == TxValidationResult::TX_INPUTS_NOT_STANDARD && tx.GetWitnessHash() != tx.GetHash()) { recentRejects->insert(tx.GetHash()); + m_txrequest.ForgetTxHash(tx.GetHash()); } if (RecursiveDynamicUsage(*ptx) < 100000) { AddToCompactExtraTransactions(ptx); } - } else if (tx.HasWitness() && RecursiveDynamicUsage(*ptx) < 100000) { - AddToCompactExtraTransactions(ptx); } if (pfrom.HasPermission(PF_FORCERELAY)) { @@ -3100,18 +3094,15 @@ void ProcessMessage( // if they were already in the mempool, // allowing the node to function as a gateway for // nodes hidden behind it. - if (!mempool.exists(tx.GetHash())) { + if (!m_mempool.exists(tx.GetHash())) { LogPrintf("Not relaying non-mempool transaction %s from forcerelay peer=%d\n", tx.GetHash().ToString(), pfrom.GetId()); } else { LogPrintf("Force relaying tx %s from peer=%d\n", tx.GetHash().ToString(), pfrom.GetId()); - RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), connman); + RelayTransaction(tx.GetHash(), tx.GetWitnessHash(), m_connman); } } } - for (const CTransactionRef& removedTx : lRemovedTxn) - AddToCompactExtraTransactions(removedTx); - // If a tx has been detected by recentRejects, we will have reached // this point and the tx will have been ignored. Because we haven't run // the tx through AcceptToMemoryPool, we won't have computed a DoS @@ -3157,7 +3148,7 @@ void ProcessMessage( if (!LookupBlockIndex(cmpctblock.header.hashPrevBlock)) { // Doesn't connect (or is genesis), instead of DoSing in AcceptBlockHeader, request deeper headers if (!::ChainstateActive().IsInitialBlockDownload()) - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexBestHeader), uint256())); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexBestHeader), uint256())); return; } @@ -3168,7 +3159,7 @@ void ProcessMessage( const CBlockIndex *pindex = nullptr; BlockValidationState state; - if (!chainman.ProcessNewBlockHeaders({cmpctblock.header}, state, chainparams, &pindex)) { + if (!m_chainman.ProcessNewBlockHeaders({cmpctblock.header}, state, m_chainparams, &pindex)) { if (state.IsInvalid()) { MaybePunishNodeForBlock(pfrom.GetId(), state, /*via_compact_block*/ true, "invalid header via cmpctblock"); return; @@ -3218,16 +3209,16 @@ void ProcessMessage( // so we just grab the block via normal getdata std::vector<CInv> vInv(1); vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom), cmpctblock.header.GetHash()); - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); } return; } // If we're not close to tip yet, give up and let parallel block fetch work its magic - if (!fAlreadyInFlight && !CanDirectFetch(chainparams.GetConsensus())) + if (!fAlreadyInFlight && !CanDirectFetch(m_chainparams.GetConsensus())) return; - if (IsWitnessEnabled(pindex->pprev, chainparams.GetConsensus()) && !nodestate->fSupportsDesiredCmpctVersion) { + if (IsWitnessEnabled(pindex->pprev, m_chainparams.GetConsensus()) && !nodestate->fSupportsDesiredCmpctVersion) { // Don't bother trying to process compact blocks from v1 peers // after segwit activates. return; @@ -3239,9 +3230,9 @@ void ProcessMessage( if ((!fAlreadyInFlight && nodestate->nBlocksInFlight < MAX_BLOCKS_IN_TRANSIT_PER_PEER) || (fAlreadyInFlight && blockInFlightIt->second.first == pfrom.GetId())) { std::list<QueuedBlock>::iterator* queuedBlockIt = nullptr; - if (!MarkBlockAsInFlight(mempool, pfrom.GetId(), pindex->GetBlockHash(), pindex, &queuedBlockIt)) { + if (!MarkBlockAsInFlight(m_mempool, pfrom.GetId(), pindex->GetBlockHash(), pindex, &queuedBlockIt)) { if (!(*queuedBlockIt)->partialBlock) - (*queuedBlockIt)->partialBlock.reset(new PartiallyDownloadedBlock(&mempool)); + (*queuedBlockIt)->partialBlock.reset(new PartiallyDownloadedBlock(&m_mempool)); else { // The block was already in flight using compact blocks from the same peer LogPrint(BCLog::NET, "Peer sent us compact block we were already syncing!\n"); @@ -3259,7 +3250,7 @@ void ProcessMessage( // Duplicate txindexes, the block is now in-flight, so just request it std::vector<CInv> vInv(1); vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom), cmpctblock.header.GetHash()); - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); return; } @@ -3276,7 +3267,7 @@ void ProcessMessage( fProcessBLOCKTXN = true; } else { req.blockhash = pindex->GetBlockHash(); - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETBLOCKTXN, req)); } } else { // This block is either already in flight from a different @@ -3284,7 +3275,7 @@ void ProcessMessage( // download from. // Optimistically try to reconstruct anyway since we might be // able to without any round trips. - PartiallyDownloadedBlock tempBlock(&mempool); + PartiallyDownloadedBlock tempBlock(&m_mempool); ReadStatus status = tempBlock.InitData(cmpctblock, vExtraTxnForCompact); if (status != READ_STATUS_OK) { // TODO: don't ignore failures @@ -3302,7 +3293,7 @@ void ProcessMessage( // mempool will probably be useless - request the block normally std::vector<CInv> vInv(1); vInv[0] = CInv(MSG_BLOCK | GetFetchFlags(pfrom), cmpctblock.header.GetHash()); - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, vInv)); return; } else { // If this was an announce-cmpctblock, we want the same treatment as a header message @@ -3311,8 +3302,9 @@ void ProcessMessage( } } // cs_main - if (fProcessBLOCKTXN) - return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, time_received, chainparams, chainman, mempool, connman, banman, interruptMsgProc); + if (fProcessBLOCKTXN) { + return ProcessMessage(pfrom, NetMsgType::BLOCKTXN, blockTxnMsg, time_received, interruptMsgProc); + } if (fRevertToHeaderProcessing) { // Headers received from HB compact block peers are permitted to be @@ -3320,7 +3312,7 @@ void ProcessMessage( // the peer if the header turns out to be for an invalid block. // Note that if a peer tries to build on an invalid chain, that // will be detected and the peer will be disconnected/discouraged. - return ProcessHeadersMessage(pfrom, connman, chainman, mempool, {cmpctblock.header}, chainparams, /*via_compact_block=*/true); + return ProcessHeadersMessage(pfrom, {cmpctblock.header}, /*via_compact_block=*/true); } if (fBlockReconstructed) { @@ -3340,7 +3332,7 @@ void ProcessMessage( // we have a chain with at least nMinimumChainWork), and we ignore // compact blocks with less work than our tip, it is safe to treat // reconstructed compact blocks as having been requested. - chainman.ProcessNewBlock(chainparams, pblock, /*fForceProcessing=*/true, &fNewBlock); + m_chainman.ProcessNewBlock(m_chainparams, pblock, /*fForceProcessing=*/true, &fNewBlock); if (fNewBlock) { pfrom.nLastBlockTime = GetTime(); } else { @@ -3392,7 +3384,7 @@ void ProcessMessage( // Might have collided, fall back to getdata now :( std::vector<CInv> invs; invs.push_back(CInv(MSG_BLOCK | GetFetchFlags(pfrom), resp.blockhash)); - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::GETDATA, invs)); } else { // Block is either okay, or possibly we received // READ_STATUS_CHECKBLOCK_FAILED. @@ -3430,7 +3422,7 @@ void ProcessMessage( // disk-space attacks), but this should be safe due to the // protections in the compact block handler -- see related comment // in compact block optimistic reconstruction handling. - chainman.ProcessNewBlock(chainparams, pblock, /*fForceProcessing=*/true, &fNewBlock); + m_chainman.ProcessNewBlock(m_chainparams, pblock, /*fForceProcessing=*/true, &fNewBlock); if (fNewBlock) { pfrom.nLastBlockTime = GetTime(); } else { @@ -3454,7 +3446,6 @@ void ProcessMessage( // Bypass the normal CBlock deserialization, as we don't want to risk deserializing 2000 full blocks. unsigned int nCount = ReadCompactSize(vRecv); if (nCount > MAX_HEADERS_RESULTS) { - LOCK(cs_main); Misbehaving(pfrom.GetId(), 20, strprintf("headers message size = %u", nCount)); return; } @@ -3464,7 +3455,7 @@ void ProcessMessage( ReadCompactSize(vRecv); // ignore tx count; assume it is 0. } - return ProcessHeadersMessage(pfrom, connman, chainman, mempool, headers, chainparams, /*via_compact_block=*/false); + return ProcessHeadersMessage(pfrom, headers, /*via_compact_block=*/false); } if (msg_type == NetMsgType::BLOCK) @@ -3493,7 +3484,7 @@ void ProcessMessage( mapBlockSource.emplace(hash, std::make_pair(pfrom.GetId(), true)); } bool fNewBlock = false; - chainman.ProcessNewBlock(chainparams, pblock, forceProcessing, &fNewBlock); + m_chainman.ProcessNewBlock(m_chainparams, pblock, forceProcessing, &fNewBlock); if (fNewBlock) { pfrom.nLastBlockTime = GetTime(); } else { @@ -3509,12 +3500,8 @@ void ProcessMessage( // to users' AddrMan and later request them by sending getaddr messages. // Making nodes which are behind NAT and can only make outgoing connections ignore // the getaddr message mitigates the attack. - if (!pfrom.fInbound) { - LogPrint(BCLog::NET, "Ignoring \"getaddr\" from outbound connection. peer=%d\n", pfrom.GetId()); - return; - } - if (!pfrom.IsAddrRelayPeer()) { - LogPrint(BCLog::NET, "Ignoring \"getaddr\" from block-relay-only connection. peer=%d\n", pfrom.GetId()); + if (!pfrom.IsInboundConn()) { + LogPrint(BCLog::NET, "Ignoring \"getaddr\" from %s connection. peer=%d\n", pfrom.ConnectionTypeAsString(), pfrom.GetId()); return; } @@ -3529,9 +3516,9 @@ void ProcessMessage( pfrom.vAddrToSend.clear(); std::vector<CAddress> vAddr; if (pfrom.HasPermission(PF_ADDR)) { - vAddr = connman.GetAddresses(); + vAddr = m_connman.GetAddresses(MAX_ADDR_TO_SEND, MAX_PCT_ADDR_TO_SEND); } else { - vAddr = connman.GetAddresses(pfrom.addr.GetNetwork()); + vAddr = m_connman.GetAddresses(pfrom, MAX_ADDR_TO_SEND, MAX_PCT_ADDR_TO_SEND); } FastRandomContext insecure_rand; for (const CAddress &addr : vAddr) { @@ -3551,7 +3538,7 @@ void ProcessMessage( return; } - if (connman.OutboundTargetReached(false) && !pfrom.HasPermission(PF_MEMPOOL)) + if (m_connman.OutboundTargetReached(false) && !pfrom.HasPermission(PF_MEMPOOL)) { if (!pfrom.HasPermission(PF_NOBAN)) { @@ -3569,8 +3556,7 @@ void ProcessMessage( } if (msg_type == NetMsgType::PING) { - if (pfrom.nVersion > BIP0031_VERSION) - { + if (pfrom.GetCommonVersion() > BIP0031_VERSION) { uint64_t nonce = 0; vRecv >> nonce; // Echo the message back with the nonce. This allows for two useful features: @@ -3584,7 +3570,7 @@ void ProcessMessage( // it, if the remote node sends a ping once per second and this node takes 5 // seconds to respond to each, the 5th ping the remote sends would appear to // return very quickly. - connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::PONG, nonce)); + m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::PONG, nonce)); } return; } @@ -3605,7 +3591,7 @@ void ProcessMessage( // Matching pong received, this ping is no longer outstanding bPingFinished = true; const auto ping_time = ping_end - pfrom.m_ping_start.load(); - if (ping_time.count() > 0) { + if (ping_time.count() >= 0) { // Successful ping time measurement, replace previous pfrom.nPingUsecTime = count_microseconds(ping_time); pfrom.nMinPingUsecTime = std::min(pfrom.nMinPingUsecTime.load(), count_microseconds(ping_time)); @@ -3656,7 +3642,6 @@ void ProcessMessage( if (!filter.IsWithinSizeConstraints()) { // There is no excuse for sending a too-large filter - LOCK(cs_main); Misbehaving(pfrom.GetId(), 100, "too-large bloom filter"); } else if (pfrom.m_tx_relay != nullptr) @@ -3690,7 +3675,6 @@ void ProcessMessage( } } if (bad) { - LOCK(cs_main); Misbehaving(pfrom.GetId(), 100, "bad filteradd message"); } return; @@ -3724,39 +3708,30 @@ void ProcessMessage( } if (msg_type == NetMsgType::GETCFILTERS) { - ProcessGetCFilters(pfrom, vRecv, chainparams, connman); + ProcessGetCFilters(pfrom, vRecv, m_chainparams, m_connman); return; } if (msg_type == NetMsgType::GETCFHEADERS) { - ProcessGetCFHeaders(pfrom, vRecv, chainparams, connman); + ProcessGetCFHeaders(pfrom, vRecv, m_chainparams, m_connman); return; } if (msg_type == NetMsgType::GETCFCHECKPT) { - ProcessGetCFCheckPt(pfrom, vRecv, chainparams, connman); + ProcessGetCFCheckPt(pfrom, vRecv, m_chainparams, m_connman); return; } if (msg_type == NetMsgType::NOTFOUND) { - // Remove the NOTFOUND transactions from the peer - LOCK(cs_main); - CNodeState *state = State(pfrom.GetId()); std::vector<CInv> vInv; vRecv >> vInv; - if (vInv.size() <= MAX_PEER_TX_IN_FLIGHT + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + if (vInv.size() <= MAX_PEER_TX_ANNOUNCEMENTS + MAX_BLOCKS_IN_TRANSIT_PER_PEER) { + LOCK(::cs_main); for (CInv &inv : vInv) { if (inv.IsGenTxMsg()) { - // If we receive a NOTFOUND message for a txid we requested, erase - // it from our data structures for this peer. - auto in_flight_it = state->m_tx_download.m_tx_in_flight.find(inv.hash); - if (in_flight_it == state->m_tx_download.m_tx_in_flight.end()) { - // Skip any further work if this is a spurious NOTFOUND - // message. - continue; - } - state->m_tx_download.m_tx_in_flight.erase(in_flight_it); - state->m_tx_download.m_tx_announced.erase(inv.hash); + // If we receive a NOTFOUND message for a tx we requested, mark the announcement for it as + // completed in TxRequestTracker. + m_txrequest.ReceivedResponse(pfrom.GetId(), inv.hash); } } } @@ -3768,23 +3743,20 @@ void ProcessMessage( return; } -/** Maybe disconnect a peer and discourage future connections from its address. - * - * @param[in] pnode The node to check. - * @return True if the peer was marked for disconnection in this function - */ -bool PeerLogicValidation::MaybeDiscourageAndDisconnect(CNode& pnode) +bool PeerManager::MaybeDiscourageAndDisconnect(CNode& pnode) { const NodeId peer_id{pnode.GetId()}; + PeerRef peer = GetPeerRef(peer_id); + if (peer == nullptr) return false; + { - LOCK(cs_main); - CNodeState& state = *State(peer_id); + LOCK(peer->m_misbehavior_mutex); // There's nothing to do if the m_should_discourage flag isn't set - if (!state.m_should_discourage) return false; + if (!peer->m_should_discourage) return false; - state.m_should_discourage = false; - } // cs_main + peer->m_should_discourage = false; + } // peer.m_misbehavior_mutex if (pnode.HasPermission(PF_NOBAN)) { // We never disconnect or discourage peers for bad behavior if they have the NOBAN permission flag @@ -3792,7 +3764,7 @@ bool PeerLogicValidation::MaybeDiscourageAndDisconnect(CNode& pnode) return false; } - if (pnode.m_manual_connection) { + if (pnode.IsManualConn()) { // We never disconnect or discourage manual peers for bad behavior LogPrintf("Warning: not punishing manually connected peer %d!\n", peer_id); return false; @@ -3809,32 +3781,28 @@ bool PeerLogicValidation::MaybeDiscourageAndDisconnect(CNode& pnode) // Normal case: Disconnect the peer and discourage all nodes sharing the address LogPrintf("Disconnecting and discouraging peer %d!\n", peer_id); if (m_banman) m_banman->Discourage(pnode.addr); - connman->DisconnectNode(pnode.addr); + m_connman.DisconnectNode(pnode.addr); return true; } -bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc) +bool PeerManager::ProcessMessages(CNode* pfrom, std::atomic<bool>& interruptMsgProc) { - const CChainParams& chainparams = Params(); - // - // Message format - // (4) message start - // (12) command - // (4) size - // (4) checksum - // (x) data - // bool fMoreWork = false; - if (!pfrom->vRecvGetData.empty()) - ProcessGetData(*pfrom, chainparams, *connman, m_mempool, interruptMsgProc); + PeerRef peer = GetPeerRef(pfrom->GetId()); + if (peer == nullptr) return false; - if (!pfrom->orphan_work_set.empty()) { - std::list<CTransactionRef> removed_txn; + { + LOCK(peer->m_getdata_requests_mutex); + if (!peer->m_getdata_requests.empty()) { + ProcessGetData(*pfrom, *peer, m_chainparams, m_connman, m_mempool, interruptMsgProc); + } + } + + { LOCK2(cs_main, g_cs_orphans); - ProcessOrphanTx(*connman, m_mempool, pfrom->orphan_work_set, removed_txn); - for (const CTransactionRef& removedTx : removed_txn) { - AddToCompactExtraTransactions(removedTx); + if (!peer->m_orphan_work_set.empty()) { + ProcessOrphanTx(peer->m_orphan_work_set); } } @@ -3842,9 +3810,16 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter return false; // this maintains the order of responses - // and prevents vRecvGetData to grow unbounded - if (!pfrom->vRecvGetData.empty()) return true; - if (!pfrom->orphan_work_set.empty()) return true; + // and prevents m_getdata_requests to grow unbounded + { + LOCK(peer->m_getdata_requests_mutex); + if (!peer->m_getdata_requests.empty()) return true; + } + + { + LOCK(g_cs_orphans); + if (!peer->m_orphan_work_set.empty()) return true; + } // Don't bother if send buffer is too full to respond anyway if (pfrom->fPauseSend) @@ -3858,45 +3833,24 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter // Just take one message msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin()); pfrom->nProcessQueueSize -= msgs.front().m_raw_message_size; - pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman->GetReceiveFloodSize(); + pfrom->fPauseRecv = pfrom->nProcessQueueSize > m_connman.GetReceiveFloodSize(); fMoreWork = !pfrom->vProcessMsg.empty(); } CNetMessage& msg(msgs.front()); - msg.SetVersion(pfrom->GetRecvVersion()); - // Check network magic - if (!msg.m_valid_netmagic) { - LogPrint(BCLog::NET, "PROCESSMESSAGE: INVALID MESSAGESTART %s peer=%d\n", SanitizeString(msg.m_command), pfrom->GetId()); - pfrom->fDisconnect = true; - return false; - } - - // Check header - if (!msg.m_valid_header) - { - LogPrint(BCLog::NET, "PROCESSMESSAGE: ERRORS IN HEADER %s peer=%d\n", SanitizeString(msg.m_command), pfrom->GetId()); - return fMoreWork; - } + msg.SetVersion(pfrom->GetCommonVersion()); const std::string& msg_type = msg.m_command; // Message size unsigned int nMessageSize = msg.m_message_size; - // Checksum - CDataStream& vRecv = msg.m_recv; - if (!msg.m_valid_checksum) - { - LogPrint(BCLog::NET, "%s(%s, %u bytes): CHECKSUM ERROR peer=%d\n", __func__, - SanitizeString(msg_type), nMessageSize, pfrom->GetId()); - return fMoreWork; - } - try { - ProcessMessage(*pfrom, msg_type, vRecv, msg.m_time, chainparams, m_chainman, m_mempool, *connman, m_banman, interruptMsgProc); - if (interruptMsgProc) - return false; - if (!pfrom->vRecvGetData.empty()) - fMoreWork = true; + ProcessMessage(*pfrom, msg_type, msg.m_recv, msg.m_time, interruptMsgProc); + if (interruptMsgProc) return false; + { + LOCK(peer->m_getdata_requests_mutex); + if (!peer->m_getdata_requests.empty()) fMoreWork = true; + } } catch (const std::exception& e) { LogPrint(BCLog::NET, "%s(%s, %u bytes): Exception '%s' (%s) caught\n", __func__, SanitizeString(msg_type), nMessageSize, e.what(), typeid(e).name()); } catch (...) { @@ -3906,14 +3860,14 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter return fMoreWork; } -void PeerLogicValidation::ConsiderEviction(CNode& pto, int64_t time_in_seconds) +void PeerManager::ConsiderEviction(CNode& pto, int64_t time_in_seconds) { AssertLockHeld(cs_main); CNodeState &state = *State(pto.GetId()); - const CNetMsgMaker msgMaker(pto.GetSendVersion()); + const CNetMsgMaker msgMaker(pto.GetCommonVersion()); - if (!state.m_chain_sync.m_protect && IsOutboundDisconnectionCandidate(pto) && state.fSyncStarted) { + if (!state.m_chain_sync.m_protect && pto.IsOutboundOrBlockRelayConn() && state.fSyncStarted) { // This is an outbound peer subject to disconnection if they don't // announce a block with as much work as the current tip within // CHAIN_SYNC_TIMEOUT + HEADERS_RESPONSE_TIME seconds (note: if @@ -3945,7 +3899,7 @@ void PeerLogicValidation::ConsiderEviction(CNode& pto, int64_t time_in_seconds) } else { assert(state.m_chain_sync.m_work_header); LogPrint(BCLog::NET, "sending getheaders to outbound peer=%d to verify chain work (current best known block:%s, benchmark blockhash: %s)\n", pto.GetId(), state.pindexBestKnownBlock != nullptr ? state.pindexBestKnownBlock->GetBlockHash().ToString() : "<none>", state.m_chain_sync.m_work_header->GetBlockHash().ToString()); - connman->PushMessage(&pto, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(state.m_chain_sync.m_work_header->pprev), uint256())); + m_connman.PushMessage(&pto, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(state.m_chain_sync.m_work_header->pprev), uint256())); state.m_chain_sync.m_sent_getheaders = true; constexpr int64_t HEADERS_RESPONSE_TIME = 120; // 2 minutes // Bump the timeout to allow a response, which could clear the timeout @@ -3959,10 +3913,10 @@ void PeerLogicValidation::ConsiderEviction(CNode& pto, int64_t time_in_seconds) } } -void PeerLogicValidation::EvictExtraOutboundPeers(int64_t time_in_seconds) +void PeerManager::EvictExtraOutboundPeers(int64_t time_in_seconds) { // Check whether we have too many outbound peers - int extra_peers = connman->GetExtraOutboundCount(); + int extra_peers = m_connman.GetExtraOutboundCount(); if (extra_peers > 0) { // If we have more outbound peers than we target, disconnect one. // Pick the outbound peer that least recently announced @@ -3971,11 +3925,11 @@ void PeerLogicValidation::EvictExtraOutboundPeers(int64_t time_in_seconds) NodeId worst_peer = -1; int64_t oldest_block_announcement = std::numeric_limits<int64_t>::max(); - connman->ForEachNode([&](CNode* pnode) { - AssertLockHeld(cs_main); + m_connman.ForEachNode([&](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { + AssertLockHeld(::cs_main); // Ignore non-outbound peers, or nodes marked for disconnect already - if (!IsOutboundDisconnectionCandidate(*pnode) || pnode->fDisconnect) return; + if (!pnode->IsOutboundOrBlockRelayConn() || pnode->fDisconnect) return; CNodeState *state = State(pnode->GetId()); if (state == nullptr) return; // shouldn't be possible, but just in case // Don't evict our protected peers @@ -3988,8 +3942,8 @@ void PeerLogicValidation::EvictExtraOutboundPeers(int64_t time_in_seconds) } }); if (worst_peer != -1) { - bool disconnected = connman->ForNode(worst_peer, [&](CNode *pnode) { - AssertLockHeld(cs_main); + bool disconnected = m_connman.ForNode(worst_peer, [&](CNode* pnode) EXCLUSIVE_LOCKS_REQUIRED(::cs_main) { + AssertLockHeld(::cs_main); // Only disconnect a peer that has been connected to us for // some reasonable fraction of our check-frequency, to give @@ -4012,18 +3966,16 @@ void PeerLogicValidation::EvictExtraOutboundPeers(int64_t time_in_seconds) // detected a stale tip. Don't try any more extra peers until // we next detect a stale tip, to limit the load we put on the // network from these extra connections. - connman->SetTryNewOutboundPeer(false); + m_connman.SetTryNewOutboundPeer(false); } } } } -void PeerLogicValidation::CheckForStaleTipAndEvictPeers(const Consensus::Params &consensusParams) +void PeerManager::CheckForStaleTipAndEvictPeers() { LOCK(cs_main); - if (connman == nullptr) return; - int64_t time_in_seconds = GetTime(); EvictExtraOutboundPeers(time_in_seconds); @@ -4031,11 +3983,11 @@ void PeerLogicValidation::CheckForStaleTipAndEvictPeers(const Consensus::Params if (time_in_seconds > m_stale_tip_check_time) { // Check whether our tip is stale, and if so, allow using an extra // outbound peer - if (!fImporting && !fReindex && connman->GetNetworkActive() && connman->GetUseAddrmanOutgoing() && TipMayBeStale(consensusParams)) { + if (!fImporting && !fReindex && m_connman.GetNetworkActive() && m_connman.GetUseAddrmanOutgoing() && TipMayBeStale(m_chainparams.GetConsensus())) { LogPrintf("Potential stale tip detected, will try using extra outbound peer (last tip update: %d seconds ago)\n", time_in_seconds - g_last_tip_update); - connman->SetTryNewOutboundPeer(true); - } else if (connman->GetTryNewOutboundPeer()) { - connman->SetTryNewOutboundPeer(false); + m_connman.SetTryNewOutboundPeer(true); + } else if (m_connman.GetTryNewOutboundPeer()) { + m_connman.SetTryNewOutboundPeer(false); } m_stale_tip_check_time = time_in_seconds + STALE_CHECK_INTERVAL; } @@ -4062,9 +4014,9 @@ public: }; } -bool PeerLogicValidation::SendMessages(CNode* pto) +bool PeerManager::SendMessages(CNode* pto) { - const Consensus::Params& consensusParams = Params().GetConsensus(); + const Consensus::Params& consensusParams = m_chainparams.GetConsensus(); // We must call MaybeDiscourageAndDisconnect first, to ensure that we'll // disconnect misbehaving peers even before the version handshake is complete. @@ -4075,7 +4027,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) return true; // If we get here, the outgoing message serialization version is set and can't change. - const CNetMsgMaker msgMaker(pto->GetSendVersion()); + const CNetMsgMaker msgMaker(pto->GetCommonVersion()); // // Message: ping @@ -4096,13 +4048,13 @@ bool PeerLogicValidation::SendMessages(CNode* pto) } pto->fPingQueued = false; pto->m_ping_start = GetTime<std::chrono::microseconds>(); - if (pto->nVersion > BIP0031_VERSION) { + if (pto->GetCommonVersion() > BIP0031_VERSION) { pto->nPingNonceSent = nonce; - connman->PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce)); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING, nonce)); } else { // Peer is too old to support ping command with nonce, pong will never arrive. pto->nPingNonceSent = 0; - connman->PushMessage(pto, msgMaker.Make(NetMsgType::PING)); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::PING)); } } @@ -4112,10 +4064,9 @@ bool PeerLogicValidation::SendMessages(CNode* pto) CNodeState &state = *State(pto->GetId()); // Address refresh broadcast - int64_t nNow = GetTimeMicros(); auto current_time = GetTime<std::chrono::microseconds>(); - if (pto->IsAddrRelayPeer() && !::ChainstateActive().IsInitialBlockDownload() && pto->m_next_local_addr_send < current_time) { + if (pto->RelayAddrsWithConn() && !::ChainstateActive().IsInitialBlockDownload() && pto->m_next_local_addr_send < current_time) { AdvertiseLocal(pto); pto->m_next_local_addr_send = PoissonNextSend(current_time, AVG_LOCAL_ADDRESS_BROADCAST_INTERVAL); } @@ -4123,11 +4074,22 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // // Message: addr // - if (pto->IsAddrRelayPeer() && pto->m_next_addr_send < current_time) { + if (pto->RelayAddrsWithConn() && pto->m_next_addr_send < current_time) { pto->m_next_addr_send = PoissonNextSend(current_time, AVG_ADDRESS_BROADCAST_INTERVAL); std::vector<CAddress> vAddr; vAddr.reserve(pto->vAddrToSend.size()); assert(pto->m_addr_known); + + const char* msg_type; + int make_flags; + if (pto->m_wants_addrv2) { + msg_type = NetMsgType::ADDRV2; + make_flags = ADDRV2_FORMAT; + } else { + msg_type = NetMsgType::ADDR; + make_flags = 0; + } + for (const CAddress& addr : pto->vAddrToSend) { if (!pto->m_addr_known->contains(addr.GetKey())) @@ -4137,14 +4099,14 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // receiver rejects addr messages larger than MAX_ADDR_TO_SEND if (vAddr.size() >= MAX_ADDR_TO_SEND) { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); + m_connman.PushMessage(pto, msgMaker.Make(make_flags, msg_type, vAddr)); vAddr.clear(); } } } pto->vAddrToSend.clear(); if (!vAddr.empty()) - connman->PushMessage(pto, msgMaker.Make(NetMsgType::ADDR, vAddr)); + m_connman.PushMessage(pto, msgMaker.Make(make_flags, msg_type, vAddr)); // we only send the big addr message once if (pto->vAddrToSend.capacity() > 40) pto->vAddrToSend.shrink_to_fit(); @@ -4153,12 +4115,12 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // Start block sync if (pindexBestHeader == nullptr) pindexBestHeader = ::ChainActive().Tip(); - bool fFetch = state.fPreferredDownload || (nPreferredDownload == 0 && !pto->fClient && !pto->fOneShot); // Download if this is a nice peer, or we have no nice peers and this one might do. + bool fFetch = state.fPreferredDownload || (nPreferredDownload == 0 && !pto->fClient && !pto->IsAddrFetchConn()); // Download if this is a nice peer, or we have no nice peers and this one might do. if (!state.fSyncStarted && !pto->fClient && !fImporting && !fReindex) { // Only actively request headers from a single peer, unless we're close to today. if ((nSyncStarted == 0 && fFetch) || pindexBestHeader->GetBlockTime() > GetAdjustedTime() - 24 * 60 * 60) { state.fSyncStarted = true; - state.nHeadersSyncTimeout = GetTimeMicros() + HEADERS_DOWNLOAD_TIMEOUT_BASE + HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER * (GetAdjustedTime() - pindexBestHeader->GetBlockTime())/(consensusParams.nPowTargetSpacing); + state.nHeadersSyncTimeout = count_microseconds(current_time) + HEADERS_DOWNLOAD_TIMEOUT_BASE + HEADERS_DOWNLOAD_TIMEOUT_PER_HEADER * (GetAdjustedTime() - pindexBestHeader->GetBlockTime())/(consensusParams.nPowTargetSpacing); nSyncStarted++; const CBlockIndex *pindexStart = pindexBestHeader; /* If possible, start at the block preceding the currently @@ -4171,7 +4133,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) if (pindexStart->pprev) pindexStart = pindexStart->pprev; LogPrint(BCLog::NET, "initial getheaders (%d) to peer=%d (startheight:%d)\n", pindexStart->nHeight, pto->GetId(), pto->nStartingHeight); - connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexStart), uint256())); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETHEADERS, ::ChainActive().GetLocator(pindexStart), uint256())); } } @@ -4255,10 +4217,10 @@ bool PeerLogicValidation::SendMessages(CNode* pto) LOCK(cs_most_recent_block); if (most_recent_block_hash == pBestIndex->GetBlockHash()) { if (state.fWantsCmpctWitness || !fWitnessesPresentInMostRecentCompactBlock) - connman->PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *most_recent_compact_block)); + m_connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, *most_recent_compact_block)); else { CBlockHeaderAndShortTxIDs cmpctblock(*most_recent_block, state.fWantsCmpctWitness); - connman->PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); + m_connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); } fGotBlockFromCache = true; } @@ -4268,7 +4230,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) bool ret = ReadBlockFromDisk(block, pBestIndex, consensusParams); assert(ret); CBlockHeaderAndShortTxIDs cmpctblock(block, state.fWantsCmpctWitness); - connman->PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); + m_connman.PushMessage(pto, msgMaker.Make(nSendFlags, NetMsgType::CMPCTBLOCK, cmpctblock)); } state.pindexBestHeaderSent = pBestIndex; } else if (state.fPreferHeaders) { @@ -4281,7 +4243,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) LogPrint(BCLog::NET, "%s: sending header %s to peer=%d\n", __func__, vHeaders.front().GetHash().ToString(), pto->GetId()); } - connman->PushMessage(pto, msgMaker.Make(NetMsgType::HEADERS, vHeaders)); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::HEADERS, vHeaders)); state.pindexBestHeaderSent = pBestIndex; } else fRevertToInv = true; @@ -4326,7 +4288,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) for (const uint256& hash : pto->vInventoryBlockToSend) { vInv.push_back(CInv(MSG_BLOCK, hash)); if (vInv.size() == MAX_INV_SZ) { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); vInv.clear(); } } @@ -4338,8 +4300,8 @@ bool PeerLogicValidation::SendMessages(CNode* pto) bool fSendTrickle = pto->HasPermission(PF_NOBAN); if (pto->m_tx_relay->nNextInvSend < current_time) { fSendTrickle = true; - if (pto->fInbound) { - pto->m_tx_relay->nNextInvSend = std::chrono::microseconds{connman->PoissonNextSendInbound(nNow, INVENTORY_BROADCAST_INTERVAL)}; + if (pto->IsInboundConn()) { + pto->m_tx_relay->nNextInvSend = std::chrono::microseconds{m_connman.PoissonNextSendInbound(count_microseconds(current_time), INVENTORY_BROADCAST_INTERVAL)}; } else { // Use half the delay for outbound peers, as there is less privacy concern for them. pto->m_tx_relay->nNextInvSend = PoissonNextSend(current_time, std::chrono::seconds{INVENTORY_BROADCAST_INTERVAL >> 1}); @@ -4379,7 +4341,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // Responses to MEMPOOL requests bypass the m_recently_announced_invs filter. vInv.push_back(inv); if (vInv.size() == MAX_INV_SZ) { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); vInv.clear(); } } @@ -4438,7 +4400,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) nRelayedTransactions++; { // Expire old relay messages - while (!vRelayExpiration.empty() && vRelayExpiration.front().first < nNow) + while (!vRelayExpiration.empty() && vRelayExpiration.front().first < count_microseconds(current_time)) { mapRelay.erase(vRelayExpiration.front().second); vRelayExpiration.pop_front(); @@ -4446,16 +4408,16 @@ bool PeerLogicValidation::SendMessages(CNode* pto) auto ret = mapRelay.emplace(txid, std::move(txinfo.tx)); if (ret.second) { - vRelayExpiration.emplace_back(nNow + std::chrono::microseconds{RELAY_TX_CACHE_TIME}.count(), ret.first); + vRelayExpiration.emplace_back(count_microseconds(current_time + std::chrono::microseconds{RELAY_TX_CACHE_TIME}), ret.first); } // Add wtxid-based lookup into mapRelay as well, so that peers can request by wtxid auto ret2 = mapRelay.emplace(wtxid, ret.first->second); if (ret2.second) { - vRelayExpiration.emplace_back(nNow + std::chrono::microseconds{RELAY_TX_CACHE_TIME}.count(), ret2.first); + vRelayExpiration.emplace_back(count_microseconds(current_time + std::chrono::microseconds{RELAY_TX_CACHE_TIME}), ret2.first); } } if (vInv.size() == MAX_INV_SZ) { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); vInv.clear(); } pto->m_tx_relay->filterInventoryKnown.insert(hash); @@ -4472,14 +4434,11 @@ bool PeerLogicValidation::SendMessages(CNode* pto) } } if (!vInv.empty()) - connman->PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::INV, vInv)); // Detect whether we're stalling current_time = GetTime<std::chrono::microseconds>(); - // nNow is the current system time (GetTimeMicros is not mockable) and - // should be replaced by the mockable current_time eventually - nNow = GetTimeMicros(); - if (state.nStallingSince && state.nStallingSince < nNow - 1000000 * BLOCK_STALLING_TIMEOUT) { + if (state.nStallingSince && state.nStallingSince < count_microseconds(current_time) - 1000000 * BLOCK_STALLING_TIMEOUT) { // Stalling only triggers when the block download window cannot move. During normal steady state, // the download window should be much larger than the to-be-downloaded set of blocks, so disconnection // should only happen during initial block download. @@ -4495,7 +4454,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) if (state.vBlocksInFlight.size() > 0) { QueuedBlock &queuedBlock = state.vBlocksInFlight.front(); int nOtherPeersWithValidatedDownloads = nPeersWithValidatedDownloads - (state.nBlocksInFlightValidHeaders > 0); - if (nNow > state.nDownloadingSince + consensusParams.nPowTargetSpacing * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { + if (count_microseconds(current_time) > state.nDownloadingSince + consensusParams.nPowTargetSpacing * (BLOCK_DOWNLOAD_TIMEOUT_BASE + BLOCK_DOWNLOAD_TIMEOUT_PER_PEER * nOtherPeersWithValidatedDownloads)) { LogPrintf("Timeout downloading block %s from peer=%d, disconnecting\n", queuedBlock.hash.ToString(), pto->GetId()); pto->fDisconnect = true; return true; @@ -4505,7 +4464,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) if (state.fSyncStarted && state.nHeadersSyncTimeout < std::numeric_limits<int64_t>::max()) { // Detect whether this is a stalling initial-headers-sync peer if (pindexBestHeader->GetBlockTime() <= GetAdjustedTime() - 24 * 60 * 60) { - if (nNow > state.nHeadersSyncTimeout && nSyncStarted == 1 && (nPreferredDownload - state.fPreferredDownload >= 1)) { + if (count_microseconds(current_time) > state.nHeadersSyncTimeout && nSyncStarted == 1 && (nPreferredDownload - state.fPreferredDownload >= 1)) { // Disconnect a peer (without the noban permission) if it is our only sync peer, // and we have others we could be using instead. // Note: If all our peers are inbound, then we won't @@ -4555,7 +4514,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto) } if (state.nBlocksInFlight == 0 && staller != -1) { if (State(staller)->nStallingSince == 0) { - State(staller)->nStallingSince = nNow; + State(staller)->nStallingSince = count_microseconds(current_time); LogPrint(BCLog::NET, "Stall started peer=%d\n", staller); } } @@ -4564,82 +4523,40 @@ bool PeerLogicValidation::SendMessages(CNode* pto) // // Message: getdata (non-blocks) // - - // For robustness, expire old requests after a long timeout, so that - // we can resume downloading transactions from a peer even if they - // were unresponsive in the past. - // Eventually we should consider disconnecting peers, but this is - // conservative. - if (state.m_tx_download.m_check_expiry_timer <= current_time) { - for (auto it=state.m_tx_download.m_tx_in_flight.begin(); it != state.m_tx_download.m_tx_in_flight.end();) { - if (it->second <= current_time - TX_EXPIRY_INTERVAL) { - LogPrint(BCLog::NET, "timeout of inflight tx %s from peer=%d\n", it->first.ToString(), pto->GetId()); - state.m_tx_download.m_tx_announced.erase(it->first); - state.m_tx_download.m_tx_in_flight.erase(it++); - } else { - ++it; - } - } - // On average, we do this check every TX_EXPIRY_INTERVAL. Randomize - // so that we're not doing this for all peers at the same time. - state.m_tx_download.m_check_expiry_timer = current_time + TX_EXPIRY_INTERVAL / 2 + GetRandMicros(TX_EXPIRY_INTERVAL); - } - - auto& tx_process_time = state.m_tx_download.m_tx_process_time; - while (!tx_process_time.empty() && tx_process_time.begin()->first <= current_time && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) { - const GenTxid gtxid = tx_process_time.begin()->second; - // Erase this entry from tx_process_time (it may be added back for - // processing at a later time, see below) - tx_process_time.erase(tx_process_time.begin()); - CInv inv(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash()); - if (!AlreadyHave(inv, m_mempool)) { - // If this transaction was last requested more than 1 minute ago, - // then request. - const auto last_request_time = GetTxRequestTime(gtxid); - if (last_request_time <= current_time - GETDATA_TX_INTERVAL) { - LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId()); - vGetData.push_back(inv); - if (vGetData.size() >= MAX_GETDATA_SZ) { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); - vGetData.clear(); - } - UpdateTxRequestTime(gtxid, current_time); - state.m_tx_download.m_tx_in_flight.emplace(gtxid.GetHash(), current_time); - } else { - // This transaction is in flight from someone else; queue - // up processing to happen after the download times out - // (with a slight delay for inbound peers, to prefer - // requests to outbound peers). - // Don't apply the txid-delay to re-requests of a - // transaction; the heuristic of delaying requests to - // txid-relay peers is to save bandwidth on initial - // announcement of a transaction, and doesn't make sense - // for a followup request if our first peer times out (and - // would open us up to an attacker using inbound - // wtxid-relay to prevent us from requesting transactions - // from outbound txid-relay peers). - const auto next_process_time = CalculateTxGetDataTime(gtxid, current_time, !state.fPreferredDownload, false); - tx_process_time.emplace(next_process_time, gtxid); + std::vector<std::pair<NodeId, GenTxid>> expired; + auto requestable = m_txrequest.GetRequestable(pto->GetId(), current_time, &expired); + for (const auto& entry : expired) { + LogPrint(BCLog::NET, "timeout of inflight %s %s from peer=%d\n", entry.second.IsWtxid() ? "wtx" : "tx", + entry.second.GetHash().ToString(), entry.first); + } + for (const GenTxid& gtxid : requestable) { + if (!AlreadyHaveTx(gtxid, m_mempool)) { + LogPrint(BCLog::NET, "Requesting %s %s peer=%d\n", gtxid.IsWtxid() ? "wtx" : "tx", + gtxid.GetHash().ToString(), pto->GetId()); + vGetData.emplace_back(gtxid.IsWtxid() ? MSG_WTX : (MSG_TX | GetFetchFlags(*pto)), gtxid.GetHash()); + if (vGetData.size() >= MAX_GETDATA_SZ) { + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + vGetData.clear(); } + m_txrequest.RequestedTx(pto->GetId(), gtxid.GetHash(), current_time + GETDATA_TX_INTERVAL); } else { - // We have already seen this transaction, no need to download. - state.m_tx_download.m_tx_announced.erase(gtxid.GetHash()); - state.m_tx_download.m_tx_in_flight.erase(gtxid.GetHash()); + // We have already seen this transaction, no need to download. This is just a belt-and-suspenders, as + // this should already be called whenever a transaction becomes AlreadyHaveTx(). + m_txrequest.ForgetTxHash(gtxid.GetHash()); } } if (!vGetData.empty()) - connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData)); // // Message: feefilter // - if (pto->m_tx_relay != nullptr && pto->nVersion >= FEEFILTER_VERSION && gArgs.GetBoolArg("-feefilter", DEFAULT_FEEFILTER) && + if (pto->m_tx_relay != nullptr && pto->GetCommonVersion() >= FEEFILTER_VERSION && gArgs.GetBoolArg("-feefilter", DEFAULT_FEEFILTER) && !pto->HasPermission(PF_FORCERELAY) // peers with the forcerelay permission should not filter txs to us ) { CAmount currentFilter = m_mempool.GetMinFee(gArgs.GetArg("-maxmempool", DEFAULT_MAX_MEMPOOL_SIZE) * 1000000).GetFeePerK(); - int64_t timeNow = GetTimeMicros(); static FeeFilterRounder g_filter_rounder{CFeeRate{DEFAULT_MIN_RELAY_TX_FEE}}; if (m_chainman.ActiveChainstate().IsInitialBlockDownload()) { // Received tx-inv messages are discarded when the active @@ -4650,24 +4567,24 @@ bool PeerLogicValidation::SendMessages(CNode* pto) if (pto->m_tx_relay->lastSentFeeFilter == MAX_FILTER) { // Send the current filter if we sent MAX_FILTER previously // and made it out of IBD. - pto->m_tx_relay->nextSendTimeFeeFilter = timeNow - 1; + pto->m_tx_relay->nextSendTimeFeeFilter = count_microseconds(current_time) - 1; } } - if (timeNow > pto->m_tx_relay->nextSendTimeFeeFilter) { + if (count_microseconds(current_time) > pto->m_tx_relay->nextSendTimeFeeFilter) { CAmount filterToSend = g_filter_rounder.round(currentFilter); // We always have a fee filter of at least minRelayTxFee filterToSend = std::max(filterToSend, ::minRelayTxFee.GetFeePerK()); if (filterToSend != pto->m_tx_relay->lastSentFeeFilter) { - connman->PushMessage(pto, msgMaker.Make(NetMsgType::FEEFILTER, filterToSend)); + m_connman.PushMessage(pto, msgMaker.Make(NetMsgType::FEEFILTER, filterToSend)); pto->m_tx_relay->lastSentFeeFilter = filterToSend; } - pto->m_tx_relay->nextSendTimeFeeFilter = PoissonNextSend(timeNow, AVG_FEEFILTER_BROADCAST_INTERVAL); + pto->m_tx_relay->nextSendTimeFeeFilter = PoissonNextSend(count_microseconds(current_time), AVG_FEEFILTER_BROADCAST_INTERVAL); } // If the fee filter has changed substantially and it's still more than MAX_FEEFILTER_CHANGE_DELAY // until scheduled broadcast, then move the broadcast to within MAX_FEEFILTER_CHANGE_DELAY. - else if (timeNow + MAX_FEEFILTER_CHANGE_DELAY * 1000000 < pto->m_tx_relay->nextSendTimeFeeFilter && + else if (count_microseconds(current_time) + MAX_FEEFILTER_CHANGE_DELAY * 1000000 < pto->m_tx_relay->nextSendTimeFeeFilter && (currentFilter < 3 * pto->m_tx_relay->lastSentFeeFilter / 4 || currentFilter > 4 * pto->m_tx_relay->lastSentFeeFilter / 3)) { - pto->m_tx_relay->nextSendTimeFeeFilter = timeNow + GetRandInt(MAX_FEEFILTER_CHANGE_DELAY) * 1000000; + pto->m_tx_relay->nextSendTimeFeeFilter = count_microseconds(current_time) + GetRandInt(MAX_FEEFILTER_CHANGE_DELAY) * 1000000; } } } // release cs_main |