From 53ad9a133a53feb35e31698720cec69c14f56dc1 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:05 -0500 Subject: net: fix typo causing the wrong receive buffer size Surprisingly this hasn't been causing me any issues while testing, probably because it requires lots of large blocks to be flying around. Send/Recv corks need tests! --- src/net.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index bf2beb774..e7c7cf011 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2102,7 +2102,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c nMaxFeeler = connOptions.nMaxFeeler; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; - nReceiveFloodSize = connOptions.nSendBufferMaxSize; + nReceiveFloodSize = connOptions.nReceiveFloodSize; nMaxOutboundLimit = connOptions.nMaxOutboundLimit; nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe; -- cgit v1.2.3 From e5bcd9c84fd3107321ff6dbdef067ba03f2b43cb Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:07 -0500 Subject: net: make vRecvMsg a list so that we can use splice() --- src/net.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index e7c7cf011..3478d04a7 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1859,7 +1859,7 @@ void CConnman::ThreadMessageHandler() if (pnode->nSendSize < GetSendBufferSize()) { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete())) + if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete())) { fSleep = false; } -- cgit v1.2.3 From f6315e07f9383f3f43e37ada0d6a835810d610b9 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:11 -0500 Subject: net: only disconnect if fDisconnect has been set These conditions are problematic to check without locking, and we shouldn't be relying on the refcount to disconnect. --- src/net.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 3478d04a7..65f1c62ad 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1051,8 +1051,7 @@ void CConnman::ThreadSocketHandler() std::vector vNodesCopy = vNodes; BOOST_FOREACH(CNode* pnode, vNodesCopy) { - if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0)) + if (pnode->fDisconnect) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); -- cgit v1.2.3 From 60425870d78cf2bef1fce926ad53f51166c6a3f0 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:13 -0500 Subject: net: wait until the node is destroyed to delete its recv buffer when vRecvMsg becomes a private buffer, it won't make sense to allow other threads to mess with it anymore. --- src/net.cpp | 5 ----- 1 file changed, 5 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 65f1c62ad..9ce7475bc 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -437,11 +437,6 @@ void CNode::CloseSocketDisconnect() LogPrint("net", "disconnecting peer=%d\n", id); CloseSocket(hSocket); } - - // in case this fails, we'll empty the recv buffer when the CNode is deleted - TRY_LOCK(cs_vRecvMsg, lockRecv); - if (lockRecv) - vRecvMsg.clear(); } void CConnman::ClearBanned() -- cgit v1.2.3 From 56212e20acf1534d443cb910c9bf3a30f84d0f02 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:15 -0500 Subject: net: set message deserialization version when it's actually time to deserialize We'll soon no longer have access to vRecvMsg, and this is more intuitive anyway. --- src/net.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 9ce7475bc..6d9971a0f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -653,7 +653,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete // get current incomplete message, or create a new one if (vRecvMsg.empty() || vRecvMsg.back().complete()) - vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion)); + vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION)); CNetMessage& msg = vRecvMsg.back(); -- cgit v1.2.3 From 60befa3997b98d3f913010f2621bec734f643526 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:17 -0500 Subject: net: handle message accounting in ReceiveMsgBytes This allows locking to be pushed down to only where it's needed Also reuse the current time rather than checking multiple times. --- src/net.cpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 6d9971a0f..776047171 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -648,6 +648,9 @@ void CNode::copyStats(CNodeStats &stats) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) { complete = false; + int64_t nTimeMicros = GetTimeMicros(); + nLastRecv = nTimeMicros / 1000000; + nRecvBytes += nBytes; while (nBytes > 0) { // get current incomplete message, or create a new one @@ -685,7 +688,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete assert(i != mapRecvBytesPerMsgCmd.end()); i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE; - msg.nTime = GetTimeMicros(); + msg.nTime = nTimeMicros; complete = true; } } @@ -1237,8 +1240,6 @@ void CConnman::ThreadSocketHandler() pnode->CloseSocketDisconnect(); if(notify) condMsgProc.notify_one(); - pnode->nLastRecv = GetTime(); - pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); } else if (nBytes == 0) -- cgit v1.2.3 From f5c36d19b636f01cc24417bc2b2f5b226ff3dd2c Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:19 -0500 Subject: net: record bytes written before notifying the message processor --- src/net.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 776047171..312a6e094 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1238,9 +1238,9 @@ void CConnman::ThreadSocketHandler() bool notify = false; if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); + RecordBytesRecv(nBytes); if(notify) condMsgProc.notify_one(); - RecordBytesRecv(nBytes); } else if (nBytes == 0) { -- cgit v1.2.3 From ef7b5ecbb73e9fd3670494c99cfc13ccf3574170 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:21 -0500 Subject: net: Add a simple function for waking the message handler This may be used publicly in the future --- src/net.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 312a6e094..36db77abb 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1239,8 +1239,8 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); - if(notify) - condMsgProc.notify_one(); + if (notify) + WakeMessageHandler(); } else if (nBytes == 0) { @@ -1315,8 +1315,10 @@ void CConnman::ThreadSocketHandler() } } - - +void CConnman::WakeMessageHandler() +{ + condMsgProc.notify_one(); +} -- cgit v1.2.3 From c5a8b1b946b1ab0bb82bd4270b2a40f5731abcff Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:26 -0500 Subject: net: rework the way that the messagehandler sleeps In order to sleep accurately, the message handler needs to know if _any_ node has more processing that it should do before the entire thread sleeps. Rather than returning a value that represents whether ProcessMessages encountered a message that should trigger a disconnnect, interpret the return value as whether or not that node has more work to do. Also, use a global fProcessWake value that can be set by other threads, which takes precedence (for one cycle) over the messagehandler's decision. Note that the previous behavior was to only process one message per loop (except in the case of a bad checksum or invalid header). That was changed in PR #3180. The only change here in that regard is that the current node now falls to the back of the processing queue for the bad checksum/invalid header cases. --- src/net.cpp | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 36db77abb..947f01679 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1317,6 +1317,10 @@ void CConnman::ThreadSocketHandler() void CConnman::WakeMessageHandler() { + { + std::lock_guard lock(mutexMsgProc); + fMsgProcWake = true; + } condMsgProc.notify_one(); } @@ -1839,7 +1843,7 @@ void CConnman::ThreadMessageHandler() } } - bool fSleep = true; + bool fMoreWork = false; BOOST_FOREACH(CNode* pnode, vNodesCopy) { @@ -1851,16 +1855,8 @@ void CConnman::ThreadMessageHandler() TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) - pnode->CloseSocketDisconnect(); - - if (pnode->nSendSize < GetSendBufferSize()) - { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg.front().complete())) - { - fSleep = false; - } - } + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize()); } } if (flagInterruptMsgProc) @@ -1882,10 +1878,11 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) { - std::unique_lock lock(mutexMsgProc); - condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + std::unique_lock lock(mutexMsgProc); + if (!fMoreWork) { + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; }); } + fMsgProcWake = false; } } @@ -2156,6 +2153,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c interruptNet.reset(); flagInterruptMsgProc = false; + { + std::unique_lock lock(mutexMsgProc); + fMsgProcWake = false; + } + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread >, "net", std::function(std::bind(&CConnman::ThreadSocketHandler, this))); -- cgit v1.2.3 From 4d712e366ca7fffaf96394ef01c9246482c0d92e Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:28 -0500 Subject: net: add a new message queue for the message processor This separates the storage of messages from the net and queued messages for processing, allowing the locks to be split. --- src/net.cpp | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 947f01679..df2109e3f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1239,8 +1239,18 @@ void CConnman::ThreadSocketHandler() if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); - if (notify) + if (notify) { + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + } WakeMessageHandler(); + } } else if (nBytes == 0) { -- cgit v1.2.3 From c6e8a9bcffe4c0f236e27c663f08785d1a0a783b Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:30 -0500 Subject: net: add a flag to indicate when a node's process queue is full Messages are dumped very quickly from the socket handler to the processor, so it's the depth of the processing queue that's interesting. The socket handler checks the process queue's size during the brief message hand-off and pauses if necessary, and the processor possibly unpauses each time a message is popped off of its queue. --- src/net.cpp | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index df2109e3f..70c04d7a0 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1165,9 +1165,7 @@ void CConnman::ThreadSocketHandler() } { TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && ( - pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || - pnode->GetTotalRecvSize() <= GetReceiveFloodSize())) + if (lockRecv && !pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1240,14 +1238,18 @@ void CConnman::ThreadSocketHandler() pnode->CloseSocketDisconnect(); RecordBytesRecv(nBytes); if (notify) { + size_t nSizeAdded = 0; auto it(pnode->vRecvMsg.begin()); for (; it != pnode->vRecvMsg.end(); ++it) { if (!it->complete()) break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; } { LOCK(pnode->cs_vProcessMsg); pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; } WakeMessageHandler(); } @@ -2592,6 +2594,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn minFeeFilter = 0; lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; + fPauseRecv = false; + nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; -- cgit v1.2.3 From 991955ee81034dc3fbc1c2a8e60c04fc9e0b538c Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:32 -0500 Subject: net: add a flag to indicate when a node's send buffer is full Similar to the recv flag, but this one indicates whether or not the net's send buffer is full. The socket handler checks the send queue when a new message is added and pauses if necessary, and possibly unpauses after each message is drained from its buffer. --- src/net.cpp | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 70c04d7a0..8ae2bebd3 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -761,7 +761,7 @@ const uint256& CNetMessage::GetMessageHash() const // requires LOCK(cs_vSend) -size_t SocketSendData(CNode *pnode) +size_t CConnman::SocketSendData(CNode *pnode) { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; @@ -778,6 +778,7 @@ size_t SocketSendData(CNode *pnode) if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); + pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more @@ -1286,8 +1287,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { size_t nBytes = SocketSendData(pnode); - if (nBytes) + if (nBytes) { RecordBytesSent(nBytes); + } } } @@ -1868,7 +1870,7 @@ void CConnman::ThreadMessageHandler() if (lockRecv) { bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); - fMoreWork |= (fMoreNodeWork && pnode->nSendSize < GetSendBufferSize()); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); } } if (flagInterruptMsgProc) @@ -2595,6 +2597,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; fPauseRecv = false; + fPauseSend = false; nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) @@ -2675,6 +2678,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->nSendSize += nTotalSize; + if (pnode->nSendSize > nSendBufferMaxSize) + pnode->fPauseSend = true; pnode->vSendMsg.push_back(std::move(serializedHeader)); if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); -- cgit v1.2.3 From e60360e139852c655930e99d4bb4db554cd8385e Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Sat, 31 Dec 2016 02:05:36 -0500 Subject: net: remove cs_vRecvMsg vRecvMsg is now only touched by the socket handler thread. The accounting vars (nRecvBytes/nLastRecv/mapRecvBytesPerMsgCmd) are also only used by the socket handler thread, with the exception of queries from rpc/gui. These accesses are not threadsafe, but they never were. This needs to be addressed separately. Also, update comment describing data flow --- src/net.cpp | 33 +++++++-------------------------- 1 file changed, 7 insertions(+), 26 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 8ae2bebd3..11638fcc0 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -644,7 +644,6 @@ void CNode::copyStats(CNodeStats &stats) } #undef X -// requires LOCK(cs_vRecvMsg) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) { complete = false; @@ -1080,13 +1079,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { TRY_LOCK(pnode->cs_inventory, lockInv); if (lockInv) fDelete = true; - } } } if (fDelete) @@ -1146,15 +1141,10 @@ void CConnman::ThreadSocketHandler() // write buffer in this case before receiving more. This avoids // needlessly queueing received data, if the remote peer is not themselves // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is no (complete) message in the receive buffer, - // or there is space left in the buffer, select() for receiving data. - // * (if neither of the above applies, there is certainly one message - // in the receiver buffer ready to be processed). - // Together, that means that at least one of the following is always possible, - // so we don't deadlock: - // * We send some data. - // * We wait for data to be received (and disconnect after timeout). - // * We process a message in the buffer (message handler thread). + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { @@ -1165,8 +1155,7 @@ void CConnman::ThreadSocketHandler() } } { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && !pnode->fPauseRecv) + if (!pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1225,8 +1214,6 @@ void CConnman::ThreadSocketHandler() continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) { { // typical socket buffer is 8K-64K @@ -1865,14 +1852,8 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { - bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); - fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); - } - } + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; -- cgit v1.2.3