aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorCory Fields <[email protected]>2016-12-31 02:05:30 -0500
committerCory Fields <[email protected]>2017-01-12 23:05:47 -0500
commitc6e8a9bcffe4c0f236e27c663f08785d1a0a783b (patch)
treeb622192961f036991f6de2ceea0d483931a78eca /src
parentnet: add a new message queue for the message processor (diff)
downloaddiscoin-c6e8a9bcffe4c0f236e27c663f08785d1a0a783b.tar.xz
discoin-c6e8a9bcffe4c0f236e27c663f08785d1a0a783b.zip
net: add a flag to indicate when a node's process queue is full
Messages are dumped very quickly from the socket handler to the processor, so it's the depth of the processing queue that's interesting. The socket handler checks the process queue's size during the brief message hand-off and pauses if necessary, and the processor possibly unpauses each time a message is popped off of its queue.
Diffstat (limited to 'src')
-rw-r--r--src/net.cpp10
-rw-r--r--src/net.h11
-rw-r--r--src/net_processing.cpp2
3 files changed, 11 insertions, 12 deletions
diff --git a/src/net.cpp b/src/net.cpp
index df2109e3f..70c04d7a0 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -1165,9 +1165,7 @@ void CConnman::ThreadSocketHandler()
}
{
TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv && (
- pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
- pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
+ if (lockRecv && !pnode->fPauseRecv)
FD_SET(pnode->hSocket, &fdsetRecv);
}
}
@@ -1240,14 +1238,18 @@ void CConnman::ThreadSocketHandler()
pnode->CloseSocketDisconnect();
RecordBytesRecv(nBytes);
if (notify) {
+ size_t nSizeAdded = 0;
auto it(pnode->vRecvMsg.begin());
for (; it != pnode->vRecvMsg.end(); ++it) {
if (!it->complete())
break;
+ nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
}
{
LOCK(pnode->cs_vProcessMsg);
pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ pnode->nProcessQueueSize += nSizeAdded;
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
}
WakeMessageHandler();
}
@@ -2592,6 +2594,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0;
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
+ fPauseRecv = false;
+ nProcessQueueSize = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0;
diff --git a/src/net.h b/src/net.h
index 21864e73d..0eb430a8b 100644
--- a/src/net.h
+++ b/src/net.h
@@ -610,6 +610,7 @@ public:
CCriticalSection cs_vProcessMsg;
std::list<CNetMessage> vProcessMsg;
+ size_t nProcessQueueSize;
std::deque<CInv> vRecvGetData;
std::list<CNetMessage> vRecvMsg;
@@ -650,6 +651,7 @@ public:
const NodeId id;
const uint64_t nKeyedNetGroup;
+ std::atomic_bool fPauseRecv;
protected:
mapMsgCmdSize mapSendBytesPerMsgCmd;
@@ -744,15 +746,6 @@ public:
}
// requires LOCK(cs_vRecvMsg)
- unsigned int GetTotalRecvSize()
- {
- unsigned int total = 0;
- BOOST_FOREACH(const CNetMessage &msg, vRecvMsg)
- total += msg.vRecv.size() + 24;
- return total;
- }
-
- // requires LOCK(cs_vRecvMsg)
bool ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete);
void SetRecvVersion(int nVersionIn)
diff --git a/src/net_processing.cpp b/src/net_processing.cpp
index 9963a872e..93b6e2ec0 100644
--- a/src/net_processing.cpp
+++ b/src/net_processing.cpp
@@ -2475,6 +2475,8 @@ bool ProcessMessages(CNode* pfrom, CConnman& connman, std::atomic<bool>& interru
return false;
// Just take one message
msgs.splice(msgs.begin(), pfrom->vProcessMsg, pfrom->vProcessMsg.begin());
+ pfrom->nProcessQueueSize -= msgs.front().vRecv.size() + CMessageHeader::HEADER_SIZE;
+ pfrom->fPauseRecv = pfrom->nProcessQueueSize > connman.GetReceiveFloodSize();
fMoreWork = !pfrom->vProcessMsg.empty();
}
CNetMessage& msg(msgs.front());