aboutsummaryrefslogtreecommitdiff
path: root/src/net_processing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net_processing.cpp')
-rw-r--r--src/net_processing.cpp434
1 files changed, 315 insertions, 119 deletions
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 0e222bdfa..74e33189d 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -6,6 +6,7 @@
#include <net_processing.h>
#include <addrman.h>
+#include <banman.h>
#include <arith_uint256.h>
#include <blockencodings.h>
#include <chainparams.h>
@@ -28,6 +29,7 @@
#include <util/system.h>
#include <util/moneystr.h>
#include <util/strencodings.h>
+#include <util/validation.h>
#include <memory>
@@ -63,12 +65,28 @@ static constexpr int STALE_RELAY_AGE_LIMIT = 30 * 24 * 60 * 60;
/// Age after which a block is considered historical for purposes of rate
/// limiting block relay. Set to one week, denominated in seconds.
static constexpr int HISTORICAL_BLOCK_AGE = 7 * 24 * 60 * 60;
+/** Maximum number of in-flight transactions from a peer */
+static constexpr int32_t MAX_PEER_TX_IN_FLIGHT = 100;
+/** Maximum number of announced transactions from a peer */
+static constexpr int32_t MAX_PEER_TX_ANNOUNCEMENTS = 2 * MAX_INV_SZ;
+/** How many microseconds to delay requesting transactions from inbound peers */
+static constexpr int64_t INBOUND_PEER_TX_DELAY = 2 * 1000000;
+/** How long to wait (in microseconds) before downloading a transaction from an additional peer */
+static constexpr int64_t GETDATA_TX_INTERVAL = 60 * 1000000;
+/** Maximum delay (in microseconds) for transaction requests to avoid biasing some peers over others. */
+static constexpr int64_t MAX_GETDATA_RANDOM_DELAY = 2 * 1000000;
+static_assert(INBOUND_PEER_TX_DELAY >= MAX_GETDATA_RANDOM_DELAY,
+"To preserve security, MAX_GETDATA_RANDOM_DELAY should not exceed INBOUND_PEER_DELAY");
+/** Limit to avoid sending big packets. Not used in processing incoming GETDATA for compatibility */
+static const unsigned int MAX_GETDATA_SZ = 1000;
+
struct COrphanTx {
// When modifying, adapt the copy of this definition in tests/DoS_tests.
CTransactionRef tx;
NodeId fromPeer;
int64_t nTimeExpire;
+ size_t list_pos;
};
CCriticalSection g_cs_orphans;
std::map<uint256, COrphanTx> mapOrphanTransactions GUARDED_BY(g_cs_orphans);
@@ -158,8 +176,6 @@ namespace {
/** Expiration-time ordered list of (expire time, relay map entry) pairs. */
std::deque<std::pair<int64_t, MapRelay::iterator>> vRelayExpiration GUARDED_BY(cs_main);
- std::atomic<int64_t> nTimeBestReceived(0); // Used only to inform the wallet of when we last received a block
-
struct IteratorComparator
{
template<typename I>
@@ -170,6 +186,8 @@ namespace {
};
std::map<COutPoint, std::set<std::map<uint256, COrphanTx>::iterator, IteratorComparator>> mapOrphanTransactionsByPrev GUARDED_BY(g_cs_orphans);
+ std::vector<std::map<uint256, COrphanTx>::iterator> g_orphan_list GUARDED_BY(g_cs_orphans); //! For random eviction
+
static size_t vExtraTxnForCompactIt GUARDED_BY(g_cs_orphans) = 0;
static std::vector<std::pair<uint256, CTransactionRef>> vExtraTxnForCompact GUARDED_BY(g_cs_orphans);
} // namespace
@@ -273,6 +291,66 @@ struct CNodeState {
//! Time of last new block announcement
int64_t m_last_block_announcement;
+ /*
+ * State associated with transaction download.
+ *
+ * Tx download algorithm:
+ *
+ * When inv comes in, queue up (process_time, txid) inside the peer's
+ * CNodeState (m_tx_process_time) as long as m_tx_announced for the peer
+ * isn't too big (MAX_PEER_TX_ANNOUNCEMENTS).
+ *
+ * The process_time for a transaction is set to nNow for outbound peers,
+ * nNow + 2 seconds for inbound peers. This is the time at which we'll
+ * consider trying to request the transaction from the peer in
+ * SendMessages(). The delay for inbound peers is to allow outbound peers
+ * a chance to announce before we request from inbound peers, to prevent
+ * an adversary from using inbound connections to blind us to a
+ * transaction (InvBlock).
+ *
+ * When we call SendMessages() for a given peer,
+ * we will loop over the transactions in m_tx_process_time, looking
+ * at the transactions whose process_time <= nNow. We'll request each
+ * such transaction that we don't have already and that hasn't been
+ * requested from another peer recently, up until we hit the
+ * MAX_PEER_TX_IN_FLIGHT limit for the peer. Then we'll update
+ * g_already_asked_for for each requested txid, storing the time of the
+ * GETDATA request. We use g_already_asked_for to coordinate transaction
+ * requests amongst our peers.
+ *
+ * For transactions that we still need but we have already recently
+ * requested from some other peer, we'll reinsert (process_time, txid)
+ * back into the peer's m_tx_process_time at the point in the future at
+ * which the most recent GETDATA request would time out (ie
+ * GETDATA_TX_INTERVAL + the request time stored in g_already_asked_for).
+ * We add an additional delay for inbound peers, again to prefer
+ * attempting download from outbound peers first.
+ * We also add an extra small random delay up to 2 seconds
+ * to avoid biasing some peers over others. (e.g., due to fixed ordering
+ * of peer processing in ThreadMessageHandler).
+ *
+ * When we receive a transaction from a peer, we remove the txid from the
+ * peer's m_tx_in_flight set and from their recently announced set
+ * (m_tx_announced). We also clear g_already_asked_for for that entry, so
+ * that if somehow the transaction is not accepted but also not added to
+ * the reject filter, then we will eventually redownload from other
+ * peers.
+ */
+ struct TxDownloadState {
+ /* Track when to attempt download of announced transactions (process
+ * time in micros -> txid)
+ */
+ std::multimap<int64_t, uint256> m_tx_process_time;
+
+ //! Store all the transactions a peer has recently announced
+ std::set<uint256> m_tx_announced;
+
+ //! Store transactions which were requested by us
+ std::set<uint256> m_tx_in_flight;
+ };
+
+ TxDownloadState m_tx_download;
+
CNodeState(CAddress addrIn, std::string addrNameIn) : address(addrIn), name(addrNameIn) {
fCurrentlyConnected = false;
nMisbehavior = 0;
@@ -300,6 +378,9 @@ struct CNodeState {
}
};
+// Keeps track of the time (in microseconds) when transactions were requested last time
+limitedmap<uint256, int64_t> g_already_asked_for GUARDED_BY(cs_main)(MAX_INV_SZ);
+
/** Map maintaining per-node state. */
static std::map<NodeId, CNodeState> mapNodeState GUARDED_BY(cs_main);
@@ -590,6 +671,58 @@ static void FindNextBlocksToDownload(NodeId nodeid, unsigned int count, std::vec
}
}
+void EraseTxRequest(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+{
+ g_already_asked_for.erase(txid);
+}
+
+int64_t GetTxRequestTime(const uint256& txid) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+{
+ auto it = g_already_asked_for.find(txid);
+ if (it != g_already_asked_for.end()) {
+ return it->second;
+ }
+ return 0;
+}
+
+void UpdateTxRequestTime(const uint256& txid, int64_t request_time) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+{
+ auto it = g_already_asked_for.find(txid);
+ if (it == g_already_asked_for.end()) {
+ g_already_asked_for.insert(std::make_pair(txid, request_time));
+ } else {
+ g_already_asked_for.update(it, request_time);
+ }
+}
+
+
+void RequestTx(CNodeState* state, const uint256& txid, int64_t nNow) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+{
+ CNodeState::TxDownloadState& peer_download_state = state->m_tx_download;
+ if (peer_download_state.m_tx_announced.size() >= MAX_PEER_TX_ANNOUNCEMENTS || peer_download_state.m_tx_announced.count(txid)) {
+ // Too many queued announcements from this peer, or we already have
+ // this announcement
+ return;
+ }
+ peer_download_state.m_tx_announced.insert(txid);
+
+ int64_t process_time;
+ int64_t last_request_time = GetTxRequestTime(txid);
+ // First time requesting this tx
+ if (last_request_time == 0) {
+ process_time = nNow;
+ } else {
+ // Randomize the delay to avoid biasing some peers over others (such as due to
+ // fixed ordering of peer processing in ThreadMessageHandler)
+ process_time = last_request_time + GETDATA_TX_INTERVAL + GetRand(MAX_GETDATA_RANDOM_DELAY);
+ }
+
+ // We delay processing announcements from non-preferred (eg inbound) peers
+ if (!state->fPreferredDownload) process_time += INBOUND_PEER_TX_DELAY;
+
+ peer_download_state.m_tx_process_time.emplace(process_time, txid);
+}
+
} // namespace
// This function is used for testing the stale tip eviction logic, see
@@ -706,8 +839,9 @@ bool AddOrphanTx(const CTransactionRef& tx, NodeId peer) EXCLUSIVE_LOCKS_REQUIRE
return false;
}
- auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME});
+ auto ret = mapOrphanTransactions.emplace(hash, COrphanTx{tx, peer, GetTime() + ORPHAN_TX_EXPIRE_TIME, g_orphan_list.size()});
assert(ret.second);
+ g_orphan_list.push_back(ret.first);
for (const CTxIn& txin : tx->vin) {
mapOrphanTransactionsByPrev[txin.prevout].insert(ret.first);
}
@@ -733,6 +867,18 @@ int static EraseOrphanTx(uint256 hash) EXCLUSIVE_LOCKS_REQUIRED(g_cs_orphans)
if (itPrev->second.empty())
mapOrphanTransactionsByPrev.erase(itPrev);
}
+
+ size_t old_pos = it->second.list_pos;
+ assert(g_orphan_list[old_pos] == it);
+ if (old_pos + 1 != g_orphan_list.size()) {
+ // Unless we're deleting the last entry in g_orphan_list, move the last
+ // entry to the position we're deleting.
+ auto it_last = g_orphan_list.back();
+ g_orphan_list[old_pos] = it_last;
+ it_last->second.list_pos = old_pos;
+ }
+ g_orphan_list.pop_back();
+
mapOrphanTransactions.erase(it);
return 1;
}
@@ -783,11 +929,8 @@ unsigned int LimitOrphanTxSize(unsigned int nMaxOrphans)
while (mapOrphanTransactions.size() > nMaxOrphans)
{
// Evict a random orphan:
- uint256 randomhash = rng.rand256();
- std::map<uint256, COrphanTx>::iterator it = mapOrphanTransactions.lower_bound(randomhash);
- if (it == mapOrphanTransactions.end())
- it = mapOrphanTransactions.begin();
- EraseOrphanTx(it->first);
+ size_t randompos = rng.randrange(g_orphan_list.size());
+ EraseOrphanTx(g_orphan_list[randompos]->first);
++nEvicted;
}
return nEvicted;
@@ -841,9 +984,8 @@ static bool BlockRequestAllowed(const CBlockIndex* pindex, const Consensus::Para
(GetBlockProofEquivalentTime(*pindexBestHeader, *pindex, *pindexBestHeader, consensusParams) < STALE_RELAY_AGE_LIMIT);
}
-PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, CScheduler &scheduler, bool enable_bip61)
- : connman(connmanIn), m_stale_tip_check_time(0), m_enable_bip61(enable_bip61) {
-
+PeerLogicValidation::PeerLogicValidation(CConnman* connmanIn, BanMan* banman, CScheduler &scheduler, bool enable_bip61)
+ : connman(connmanIn), m_banman(banman), m_stale_tip_check_time(0), m_enable_bip61(enable_bip61) {
// Initialize global variables that cannot be constructed at startup.
recentRejects.reset(new CRollingBloomFilter(120000, 0.000001));
@@ -978,8 +1120,6 @@ void PeerLogicValidation::UpdatedBlockTip(const CBlockIndex *pindexNew, const CB
});
connman->WakeMessageHandler();
}
-
- nTimeBestReceived = GetTime();
}
/**
@@ -1570,6 +1710,67 @@ bool static ProcessHeadersMessage(CNode *pfrom, CConnman *connman, const std::ve
return true;
}
+void static ProcessOrphanTx(CConnman* connman, std::set<uint256>& orphan_work_set, std::list<CTransactionRef>& removed_txn) EXCLUSIVE_LOCKS_REQUIRED(cs_main, g_cs_orphans)
+{
+ AssertLockHeld(cs_main);
+ AssertLockHeld(g_cs_orphans);
+ std::set<NodeId> setMisbehaving;
+ bool done = false;
+ while (!done && !orphan_work_set.empty()) {
+ const uint256 orphanHash = *orphan_work_set.begin();
+ orphan_work_set.erase(orphan_work_set.begin());
+
+ auto orphan_it = mapOrphanTransactions.find(orphanHash);
+ if (orphan_it == mapOrphanTransactions.end()) continue;
+
+ const CTransactionRef porphanTx = orphan_it->second.tx;
+ const CTransaction& orphanTx = *porphanTx;
+ NodeId fromPeer = orphan_it->second.fromPeer;
+ bool fMissingInputs2 = false;
+ // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
+ // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
+ // anyone relaying LegitTxX banned)
+ CValidationState stateDummy;
+
+ if (setMisbehaving.count(fromPeer)) continue;
+ if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &removed_txn, false /* bypass_limits */, 0 /* nAbsurdFee */)) {
+ LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
+ RelayTransaction(orphanTx, connman);
+ for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
+ auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(orphanHash, i));
+ if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
+ for (const auto& elem : it_by_prev->second) {
+ orphan_work_set.insert(elem->first);
+ }
+ }
+ }
+ EraseOrphanTx(orphanHash);
+ done = true;
+ } else if (!fMissingInputs2) {
+ int nDos = 0;
+ if (stateDummy.IsInvalid(nDos) && nDos > 0) {
+ // Punish peer that gave us an invalid orphan tx
+ Misbehaving(fromPeer, nDos);
+ setMisbehaving.insert(fromPeer);
+ LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
+ }
+ // Has inputs but not accepted to mempool
+ // Probably non-standard or insufficient fee
+ LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
+ if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) {
+ // Do not use rejection cache for witness transactions or
+ // witness-stripped transactions, as they can have been malleated.
+ // See https://github.com/bitcoin/bitcoin/issues/8279 for details.
+ assert(recentRejects);
+ recentRejects->insert(orphanHash);
+ }
+ EraseOrphanTx(orphanHash);
+ done = true;
+ }
+ mempool.check(pcoinsTip.get());
+ }
+}
+
bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStream& vRecv, int64_t nTimeReceived, const CChainParams& chainparams, CConnman* connman, const std::atomic<bool>& interruptMsgProc, bool enable_bip61)
{
LogPrint(BCLog::NET, "received: %s (%u bytes) peer=%d\n", SanitizeString(strCommand), vRecv.size(), pfrom->GetId());
@@ -1639,7 +1840,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
ServiceFlags nServices;
int nVersion;
int nSendVersion;
- std::string strSubVer;
std::string cleanSubVer;
int nStartingHeight = -1;
bool fRelay = true;
@@ -1676,6 +1876,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
if (!vRecv.empty())
vRecv >> addrFrom >> nNonce;
if (!vRecv.empty()) {
+ std::string strSubVer;
vRecv >> LIMITED_STRING(strSubVer, MAX_SUBVERSION_LENGTH);
cleanSubVer = SanitizeString(strSubVer);
}
@@ -1707,7 +1908,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
pfrom->SetAddrLocal(addrMe);
{
LOCK(pfrom->cs_SubVer);
- pfrom->strSubVer = strSubVer;
pfrom->cleanSubVer = cleanSubVer;
}
pfrom->nStartingHeight = nStartingHeight;
@@ -1879,6 +2079,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
if (addr.nTime <= 100000000 || addr.nTime > nNow + 10 * 60)
addr.nTime = nNow - 5 * 24 * 60 * 60;
pfrom->AddAddressKnown(addr);
+ if (g_banman->IsBanned(addr)) continue; // Do not process banned addresses beyond remembering we received them
bool fReachable = IsReachable(addr);
if (addr.nTime > nSince && !pfrom->fGetAddr && vAddr.size() <= 10 && addr.IsRoutable())
{
@@ -1945,6 +2146,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
LOCK(cs_main);
uint32_t nFetchFlags = GetFetchFlags(pfrom);
+ int64_t nNow = GetTimeMicros();
for (CInv &inv : vInv)
{
@@ -1976,7 +2178,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
if (fBlocksOnly) {
LogPrint(BCLog::NET, "transaction (%s) inv sent in violation of protocol peer=%d\n", inv.hash.ToString(), pfrom->GetId());
} else if (!fAlreadyHave && !fImporting && !fReindex && !IsInitialBlockDownload()) {
- pfrom->AskFor(inv);
+ RequestTx(State(pfrom->GetId()), inv.hash, nNow);
}
}
}
@@ -2197,8 +2399,6 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}
- std::deque<COutPoint> vWorkQueue;
- std::vector<uint256> vEraseQueue;
CTransactionRef ptx;
vRecv >> ptx;
const CTransaction& tx = *ptx;
@@ -2211,8 +2411,10 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
bool fMissingInputs = false;
CValidationState state;
- pfrom->setAskFor.erase(inv.hash);
- mapAlreadyAskedFor.erase(inv.hash);
+ CNodeState* nodestate = State(pfrom->GetId());
+ nodestate->m_tx_download.m_tx_announced.erase(inv.hash);
+ nodestate->m_tx_download.m_tx_in_flight.erase(inv.hash);
+ EraseTxRequest(inv.hash);
std::list<CTransactionRef> lRemovedTxn;
@@ -2221,7 +2423,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.check(pcoinsTip.get());
RelayTransaction(tx, connman);
for (unsigned int i = 0; i < tx.vout.size(); i++) {
- vWorkQueue.emplace_back(inv.hash, i);
+ auto it_by_prev = mapOrphanTransactionsByPrev.find(COutPoint(inv.hash, i));
+ if (it_by_prev != mapOrphanTransactionsByPrev.end()) {
+ for (const auto& elem : it_by_prev->second) {
+ pfrom->orphan_work_set.insert(elem->first);
+ }
+ }
}
pfrom->nLastTXTime = GetTime();
@@ -2232,65 +2439,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
mempool.size(), mempool.DynamicMemoryUsage() / 1000);
// Recursively process any orphan transactions that depended on this one
- std::set<NodeId> setMisbehaving;
- while (!vWorkQueue.empty()) {
- auto itByPrev = mapOrphanTransactionsByPrev.find(vWorkQueue.front());
- vWorkQueue.pop_front();
- if (itByPrev == mapOrphanTransactionsByPrev.end())
- continue;
- for (auto mi = itByPrev->second.begin();
- mi != itByPrev->second.end();
- ++mi)
- {
- const CTransactionRef& porphanTx = (*mi)->second.tx;
- const CTransaction& orphanTx = *porphanTx;
- const uint256& orphanHash = orphanTx.GetHash();
- NodeId fromPeer = (*mi)->second.fromPeer;
- bool fMissingInputs2 = false;
- // Use a dummy CValidationState so someone can't setup nodes to counter-DoS based on orphan
- // resolution (that is, feeding people an invalid transaction based on LegitTxX in order to get
- // anyone relaying LegitTxX banned)
- CValidationState stateDummy;
-
-
- if (setMisbehaving.count(fromPeer))
- continue;
- if (AcceptToMemoryPool(mempool, stateDummy, porphanTx, &fMissingInputs2, &lRemovedTxn, false /* bypass_limits */, 0 /* nAbsurdFee */)) {
- LogPrint(BCLog::MEMPOOL, " accepted orphan tx %s\n", orphanHash.ToString());
- RelayTransaction(orphanTx, connman);
- for (unsigned int i = 0; i < orphanTx.vout.size(); i++) {
- vWorkQueue.emplace_back(orphanHash, i);
- }
- vEraseQueue.push_back(orphanHash);
- }
- else if (!fMissingInputs2)
- {
- int nDos = 0;
- if (stateDummy.IsInvalid(nDos) && nDos > 0)
- {
- // Punish peer that gave us an invalid orphan tx
- Misbehaving(fromPeer, nDos);
- setMisbehaving.insert(fromPeer);
- LogPrint(BCLog::MEMPOOL, " invalid orphan tx %s\n", orphanHash.ToString());
- }
- // Has inputs but not accepted to mempool
- // Probably non-standard or insufficient fee
- LogPrint(BCLog::MEMPOOL, " removed orphan tx %s\n", orphanHash.ToString());
- vEraseQueue.push_back(orphanHash);
- if (!orphanTx.HasWitness() && !stateDummy.CorruptionPossible()) {
- // Do not use rejection cache for witness transactions or
- // witness-stripped transactions, as they can have been malleated.
- // See https://github.com/bitcoin/bitcoin/issues/8279 for details.
- assert(recentRejects);
- recentRejects->insert(orphanHash);
- }
- }
- mempool.check(pcoinsTip.get());
- }
- }
-
- for (const uint256& hash : vEraseQueue)
- EraseOrphanTx(hash);
+ ProcessOrphanTx(connman, pfrom->orphan_work_set, lRemovedTxn);
}
else if (fMissingInputs)
{
@@ -2303,10 +2452,12 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
}
if (!fRejectedParents) {
uint32_t nFetchFlags = GetFetchFlags(pfrom);
+ int64_t nNow = GetTimeMicros();
+
for (const CTxIn& txin : tx.vin) {
CInv _inv(MSG_TX | nFetchFlags, txin.prevout.hash);
pfrom->AddInventoryKnown(_inv);
- if (!AlreadyHave(_inv)) pfrom->AskFor(_inv);
+ if (!AlreadyHave(_inv)) RequestTx(State(pfrom->GetId()), _inv.hash, nNow);
}
AddOrphanTx(ptx, pfrom->GetId());
@@ -2392,8 +2543,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}
- if (strCommand == NetMsgType::CMPCTBLOCK && !fImporting && !fReindex) // Ignore blocks received while importing
+ if (strCommand == NetMsgType::CMPCTBLOCK)
{
+ // Ignore cmpctblock received while importing
+ if (fImporting || fReindex) {
+ LogPrint(BCLog::NET, "Unexpected cmpctblock message received from peer %d\n", pfrom->GetId());
+ return true;
+ }
+
CBlockHeaderAndShortTxIDs cmpctblock;
vRecv >> cmpctblock;
@@ -2613,8 +2770,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}
- if (strCommand == NetMsgType::BLOCKTXN && !fImporting && !fReindex) // Ignore blocks received while importing
+ if (strCommand == NetMsgType::BLOCKTXN)
{
+ // Ignore blocktxn received while importing
+ if (fImporting || fReindex) {
+ LogPrint(BCLog::NET, "Unexpected blocktxn message received from peer %d\n", pfrom->GetId());
+ return true;
+ }
+
BlockTransactions resp;
vRecv >> resp;
@@ -2688,8 +2851,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}
- if (strCommand == NetMsgType::HEADERS && !fImporting && !fReindex) // Ignore headers received while importing
+ if (strCommand == NetMsgType::HEADERS)
{
+ // Ignore headers received while importing
+ if (fImporting || fReindex) {
+ LogPrint(BCLog::NET, "Unexpected headers message received from peer %d\n", pfrom->GetId());
+ return true;
+ }
+
std::vector<CBlockHeader> headers;
// Bypass the normal CBlock deserialization, as we don't want to risk deserializing 2000 full blocks.
@@ -2713,8 +2882,14 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return ProcessHeadersMessage(pfrom, connman, headers, chainparams, should_punish);
}
- if (strCommand == NetMsgType::BLOCK && !fImporting && !fReindex) // Ignore blocks received while importing
+ if (strCommand == NetMsgType::BLOCK)
{
+ // Ignore block received while importing
+ if (fImporting || fReindex) {
+ LogPrint(BCLog::NET, "Unexpected block message received from peer %d\n", pfrom->GetId());
+ return true;
+ }
+
std::shared_ptr<CBlock> pblock = std::make_shared<CBlock>();
vRecv >> *pblock;
@@ -2764,8 +2939,11 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
pfrom->vAddrToSend.clear();
std::vector<CAddress> vAddr = connman->GetAddresses();
FastRandomContext insecure_rand;
- for (const CAddress &addr : vAddr)
- pfrom->PushAddress(addr, insecure_rand);
+ for (const CAddress &addr : vAddr) {
+ if (!g_banman->IsBanned(addr)) {
+ pfrom->PushAddress(addr, insecure_rand);
+ }
+ }
return true;
}
@@ -2943,7 +3121,7 @@ bool static ProcessMessage(CNode* pfrom, const std::string& strCommand, CDataStr
return true;
}
-static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman, bool enable_bip61) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
+bool PeerLogicValidation::SendRejectsAndCheckIfBanned(CNode* pnode, bool enable_bip61) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
AssertLockHeld(cs_main);
CNodeState &state = *State(pnode->GetId());
@@ -2961,14 +3139,16 @@ static bool SendRejectsAndCheckIfBanned(CNode* pnode, CConnman* connman, bool en
LogPrintf("Warning: not punishing whitelisted peer %s!\n", pnode->addr.ToString());
else if (pnode->m_manual_connection)
LogPrintf("Warning: not punishing manually-connected peer %s!\n", pnode->addr.ToString());
- else {
+ else if (pnode->addr.IsLocal()) {
+ // Disconnect but don't ban _this_ local node
+ LogPrintf("Warning: disconnecting but not banning local peer %s!\n", pnode->addr.ToString());
pnode->fDisconnect = true;
- if (pnode->addr.IsLocal())
- LogPrintf("Warning: not banning local peer %s!\n", pnode->addr.ToString());
- else
- {
- connman->Ban(pnode->addr, BanReasonNodeMisbehaving);
+ } else {
+ // Disconnect and ban all nodes sharing the address
+ if (m_banman) {
+ m_banman->Ban(pnode->addr, BanReasonNodeMisbehaving);
}
+ connman->DisconnectNode(pnode->addr);
}
return true;
}
@@ -2991,11 +3171,21 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
if (!pfrom->vRecvGetData.empty())
ProcessGetData(pfrom, chainparams, connman, interruptMsgProc);
+ if (!pfrom->orphan_work_set.empty()) {
+ std::list<CTransactionRef> removed_txn;
+ LOCK2(cs_main, g_cs_orphans);
+ ProcessOrphanTx(connman, pfrom->orphan_work_set, removed_txn);
+ for (const CTransactionRef& removedTx : removed_txn) {
+ AddToCompactExtraTransactions(removedTx);
+ }
+ }
+
if (pfrom->fDisconnect)
return false;
// this maintains the order of responses
if (!pfrom->vRecvGetData.empty()) return true;
+ if (!pfrom->orphan_work_set.empty()) return true;
// Don't bother if send buffer is too full to respond anyway
if (pfrom->fPauseSend)
@@ -3092,7 +3282,7 @@ bool PeerLogicValidation::ProcessMessages(CNode* pfrom, std::atomic<bool>& inter
}
LOCK(cs_main);
- SendRejectsAndCheckIfBanned(pfrom, connman, m_enable_bip61);
+ SendRejectsAndCheckIfBanned(pfrom, m_enable_bip61);
return fMoreWork;
}
@@ -3293,8 +3483,7 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
if (!lockMain)
return true;
- if (SendRejectsAndCheckIfBanned(pto, connman, m_enable_bip61))
- return true;
+ if (SendRejectsAndCheckIfBanned(pto, m_enable_bip61)) return true;
CNodeState &state = *State(pto->GetId());
// Address refresh broadcast
@@ -3358,14 +3547,6 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
}
}
- // Resend wallet transactions that haven't gotten in a block yet
- // Except during reindex, importing and IBD, when old wallet
- // transactions become unconfirmed and spams other nodes.
- if (!fReindex && !fImporting && !IsInitialBlockDownload())
- {
- GetMainSignals().Broadcast(nTimeBestReceived, connman);
- }
-
//
// Try sending block announcements via headers
//
@@ -3730,24 +3911,39 @@ bool PeerLogicValidation::SendMessages(CNode* pto)
//
// Message: getdata (non-blocks)
//
- while (!pto->mapAskFor.empty() && (*pto->mapAskFor.begin()).first <= nNow)
- {
- const CInv& inv = (*pto->mapAskFor.begin()).second;
- if (!AlreadyHave(inv))
- {
- LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
- vGetData.push_back(inv);
- if (vGetData.size() >= 1000)
- {
- connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
- vGetData.clear();
+ auto& tx_process_time = state.m_tx_download.m_tx_process_time;
+ while (!tx_process_time.empty() && tx_process_time.begin()->first <= nNow && state.m_tx_download.m_tx_in_flight.size() < MAX_PEER_TX_IN_FLIGHT) {
+ const uint256& txid = tx_process_time.begin()->second;
+ CInv inv(MSG_TX | GetFetchFlags(pto), txid);
+ if (!AlreadyHave(inv)) {
+ // If this transaction was last requested more than 1 minute ago,
+ // then request.
+ int64_t last_request_time = GetTxRequestTime(inv.hash);
+ if (last_request_time <= nNow - GETDATA_TX_INTERVAL) {
+ LogPrint(BCLog::NET, "Requesting %s peer=%d\n", inv.ToString(), pto->GetId());
+ vGetData.push_back(inv);
+ if (vGetData.size() >= MAX_GETDATA_SZ) {
+ connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));
+ vGetData.clear();
+ }
+ UpdateTxRequestTime(inv.hash, nNow);
+ state.m_tx_download.m_tx_in_flight.insert(inv.hash);
+ } else {
+ // This transaction is in flight from someone else; queue
+ // up processing to happen after the download times out
+ // (with a slight delay for inbound peers, to prefer
+ // requests to outbound peers).
+ RequestTx(&state, txid, nNow);
}
} else {
- //If we're not going to ask, don't expect a response.
- pto->setAskFor.erase(inv.hash);
+ // We have already seen this transaction, no need to download.
+ state.m_tx_download.m_tx_announced.erase(inv.hash);
+ state.m_tx_download.m_tx_in_flight.erase(inv.hash);
}
- pto->mapAskFor.erase(pto->mapAskFor.begin());
+ tx_process_time.erase(tx_process_time.begin());
}
+
+
if (!vGetData.empty())
connman->PushMessage(pto, msgMaker.Make(NetMsgType::GETDATA, vGetData));