From c5a8b1b946b1ab0bb82bd4270b2a40f5731abcff Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:26 -0500 Subject: net: rework the way that the messagehandler sleeps In order to sleep accurately, the message handler needs to know if _any_ node has more processing that it should do before the entire thread sleeps. Rather than returning a value that represents whether ProcessMessages encountered a message that should trigger a disconnnect, interpret the return value as whether or not that node has more work to do. Also, use a global fProcessWake value that can be set by other threads, which takes precedence (for one cycle) over the messagehandler's decision. Note that the previous behavior was to only process one message per loop (except in the case of a bad checksum or invalid header). That was changed in PR #3180. The only change here in that regard is that the current node now falls to the back of the processing queue for the bad checksum/invalid header cases. --- src/net.cpp | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 36db77abb..947f01679 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1317,6 +1317,10 @@ void CConnman::ThreadSocketHandler() void CConnman::WakeMessageHandler() { + { + std::lock_guard lock(mutexMsgProc); + fMsgProcWake = true; + } condMsgProc.notify_one(); } @@ -1839,7 +1843,7 @@ void CConnman::ThreadMessageHandler() } } - bool fSleep = true; + bool fMoreWork = false; BOOST_FOREACH(CNode* pnode, vNodesCopy) { @@ -1851,16 +1855,8 @@ void CConnman::ThreadMessageHandler() TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) - pnode->CloseSocketDisconnect(); - - if (pnode->nSendSize < GetSendBufferSize()) - { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete())) - { - fSleep = false; - } - } + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize()); } } if (flagInterruptMsgProc) @@ -1882,10 +1878,11 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) { - std::unique_lock lock(mutexMsgProc); - condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + std::unique_lock lock(mutexMsgProc); + if (!fMoreWork) { + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; }); } + fMsgProcWake = false; } } @@ -2156,6 +2153,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c interruptNet.reset(); flagInterruptMsgProc = false; + { + std::unique_lock lock(mutexMsgProc); + fMsgProcWake = false; + } + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); -- cgit v1.2.3