From 607dbfdeaf7ec053d959c47c125d60c0b7e7216a Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Thu, 15 Nov 2012 19:41:12 -0500 Subject: P2P: parse network datastream into header/data components in socket thread Replaces CNode::vRecv buffer with a vector of CNetMessage's. This simplifies ProcessMessages() and eliminates several redundant data copies. Overview: * socket thread now parses incoming message datastream into header/data components, as encapsulated by CNetMessage * socket thread adds each CNetMessage to a vector inside CNode * message thread (ProcessMessages) iterates through CNode's CNetMessage vector Message parsing is made more strict: * Socket is disconnected, if message larger than MAX_SIZE or if CMessageHeader deserialization fails (latter is impossible?). Previously, code would simply eat garbage data all day long. * Socket is disconnected, if we fail to find pchMessageStart. We do not search through garbage, to find pchMessageStart. Each message must begin precisely after the last message ends. ProcessMessages() always processes a complete message, and is more efficient: * buffer is always precisely sized, using CDataStream::resize(), rather than progressively sized in 64k chunks. More efficient for large messages like "block". * whole-buffer memory copy eliminated (vRecv -> vMsg) * other buffer-shifting memory copies eliminated (vRecv.insert, vRecv.erase) --- src/net.cpp | 96 ++++++++++++++++++++++++++++++++++++++++++++++++++++--------- 1 file changed, 83 insertions(+), 13 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 6c8fe3ffc..0e558228d 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -536,7 +536,7 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - vRecv.clear(); + vRecvMsg.clear(); } } @@ -628,6 +628,78 @@ void CNode::copyStats(CNodeStats &stats) } #undef X +// requires LOCK(cs_vRecvMsg) +bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) +{ + while (nBytes > 0) { + + // get current incomplete message, or create a new one + if (vRecvMsg.size() == 0 || + vRecvMsg.back().complete()) + vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); + + CNetMessage& msg = vRecvMsg.back(); + + // absorb network data + int handled; + if (!msg.in_data) + handled = msg.readHeader(pch, nBytes); + else + handled = msg.readData(pch, nBytes); + + if (handled < 0) + return false; + + pch += handled; + nBytes -= handled; + } + + return true; +} + +int CNetMessage::readHeader(const char *pch, unsigned int nBytes) +{ + // copy data to temporary parsing buffer + unsigned int nRemaining = 24 - nHdrPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&hdrbuf[nHdrPos], pch, nCopy); + nHdrPos += nCopy; + + // if header incomplete, exit + if (nHdrPos < 24) + return nCopy; + + // deserialize to CMessageHeader + try { + hdrbuf >> hdr; + } + catch (std::exception &e) { + return -1; + } + + // reject messages larger than MAX_SIZE + if (hdr.nMessageSize > MAX_SIZE) + return -1; + + // switch state to reading message data + in_data = true; + vRecv.resize(hdr.nMessageSize); + + return nCopy; +} + +int CNetMessage::readData(const char *pch, unsigned int nBytes) +{ + unsigned int nRemaining = hdr.nMessageSize - nDataPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + + memcpy(&vRecv[nDataPos], pch, nCopy); + nDataPos += nCopy; + + return nCopy; +} + @@ -676,7 +748,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -708,7 +780,7 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { TRY_LOCK(pnode->cs_inventory, lockInv); @@ -873,15 +945,12 @@ void ThreadSocketHandler2(void* parg) continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - CDataStream& vRecv = pnode->vRecv; - unsigned int nPos = vRecv.size(); - - if (nPos > ReceiveBufferSize()) { + if (pnode->GetTotalRecvSize() > ReceiveFloodSize()) { if (!pnode->fDisconnect) - printf("socket recv flood control disconnect (%"PRIszu" bytes)\n", vRecv.size()); + printf("socket recv flood control disconnect (%u bytes)\n", pnode->GetTotalRecvSize()); pnode->CloseSocketDisconnect(); } else { @@ -890,8 +959,8 @@ void ThreadSocketHandler2(void* parg) int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); if (nBytes > 0) { - vRecv.resize(nPos + nBytes); - memcpy(&vRecv[nPos], pchBuf, nBytes); + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) + pnode->CloseSocketDisconnect(); pnode->nLastRecv = GetTime(); } else if (nBytes == 0) @@ -1693,9 +1762,10 @@ void ThreadMessageHandler2(void* parg) { // Receive messages { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) - ProcessMessages(pnode); + if (!ProcessMessages(pnode)) + pnode->CloseSocketDisconnect(); } if (fShutdown) return; -- cgit v1.2.3 From bc2f5aa72cfb3f456280a6d34c5d425bf24b009c Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Thu, 15 Nov 2012 18:04:52 -0500 Subject: P2P, cosmetic: break out buffer send(2) code into separate function --- src/net.cpp | 47 +++++++++++++++++++++++++---------------------- 1 file changed, 25 insertions(+), 22 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 0e558228d..96719367c 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -708,6 +708,30 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes) +// requires LOCK(cs_vSend) +void SocketSendData(CNode *pnode) +{ + CDataStream& vSend = pnode->vSend; + if (vSend.empty()) + return; + + int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); + if (nBytes > 0) + { + vSend.erase(vSend.begin(), vSend.begin() + nBytes); + pnode->nLastSend = GetTime(); + } + else if (nBytes < 0) + { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + printf("socket send error %d\n", nErr); + pnode->CloseSocketDisconnect(); + } + } +} void ThreadSocketHandler(void* parg) { @@ -994,28 +1018,7 @@ void ThreadSocketHandler2(void* parg) { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - { - CDataStream& vSend = pnode->vSend; - if (!vSend.empty()) - { - int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); - if (nBytes > 0) - { - vSend.erase(vSend.begin(), vSend.begin() + nBytes); - pnode->nLastSend = GetTime(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - printf("socket send error %d\n", nErr); - pnode->CloseSocketDisconnect(); - } - } - } - } + SocketSendData(pnode); } // -- cgit v1.2.3 From b9ff2970b9fbb24e2fffc449b4ef478d019633d8 Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Thu, 15 Nov 2012 18:20:26 -0500 Subject: P2P: improve RX/TX flow control 1) "optimistic write": Push each message to kernel socket buffer immediately. 2) If there is write data at select time, that implies send() blocked during optimistic write. Drain write queue, before receiving any more messages. This avoids needlessly queueing received data, if the remote peer is not themselves receiving data. Result: write buffer (and thus memory usage) is kept small, DoS potential is slightly lower, and TCP flow control signalling is properly utilized. The kernel will queue data into the socket buffer, then signal the remote peer to stop sending data, until we resume reading again. --- src/net.cpp | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 96719367c..eafb33564 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -855,14 +855,18 @@ void ThreadSocketHandler2(void* parg) { if (pnode->hSocket == INVALID_SOCKET) continue; - FD_SET(pnode->hSocket, &fdsetRecv); - FD_SET(pnode->hSocket, &fdsetError); - hSocketMax = max(hSocketMax, pnode->hSocket); - have_fds = true; { TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend && !pnode->vSend.empty()) - FD_SET(pnode->hSocket, &fdsetSend); + if (lockSend) { + // do not read, if draining write queue + if (!pnode->vSend.empty()) + FD_SET(pnode->hSocket, &fdsetSend); + else + FD_SET(pnode->hSocket, &fdsetRecv); + FD_SET(pnode->hSocket, &fdsetError); + hSocketMax = max(hSocketMax, pnode->hSocket); + have_fds = true; + } } } } -- cgit v1.2.3 From 967f24590b43f0f84148f669d886b40fe45aa978 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Fri, 1 Mar 2013 01:41:28 +0100 Subject: Some fixes to CNetMessage processing * Change CNode::vRecvMsg to be a deque instead of a vector (less copying) * Make sure to acquire cs_vRecvMsg in CNode::CloseSocketDisconnect (as it may be called without that lock). --- src/net.cpp | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index eafb33564..1016d5d9f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -536,7 +536,11 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - vRecvMsg.clear(); + + // in case this fails, we'll empty the recv buffer when the CNode is deleted + TRY_LOCK(cs_vRecvMsg, lockRecv); + if (lockRecv) + vRecvMsg.clear(); } } @@ -634,7 +638,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) while (nBytes > 0) { // get current incomplete message, or create a new one - if (vRecvMsg.size() == 0 || + if (vRecvMsg.empty() || vRecvMsg.back().complete()) vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); @@ -1767,6 +1771,9 @@ void ThreadMessageHandler2(void* parg) pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; BOOST_FOREACH(CNode* pnode, vNodesCopy) { + if (pnode->fDisconnect) + continue; + // Receive messages { TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); -- cgit v1.2.3 From 41b052ad87633d5a8a989c512c8710b875f2ba88 Mon Sep 17 00:00:00 2001 From: Pieter Wuille Date: Sun, 24 Mar 2013 16:52:24 +0100 Subject: Use per-message send buffer, rather than per connection --- src/net.cpp | 59 ++++++++++++++++++++++++++++++++++++++--------------------- 1 file changed, 38 insertions(+), 21 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 1016d5d9f..9ee6cb423 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -715,26 +715,43 @@ int CNetMessage::readData(const char *pch, unsigned int nBytes) // requires LOCK(cs_vSend) void SocketSendData(CNode *pnode) { - CDataStream& vSend = pnode->vSend; - if (vSend.empty()) - return; - - int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); - if (nBytes > 0) - { - vSend.erase(vSend.begin(), vSend.begin() + nBytes); - pnode->nLastSend = GetTime(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - printf("socket send error %d\n", nErr); - pnode->CloseSocketDisconnect(); + std::deque::iterator it = pnode->vSendMsg.begin(); + + while (it != pnode->vSendMsg.end()) { + const CSerializeData &data = *it; + assert(data.size() > pnode->nSendOffset); + int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + if (nBytes > 0) { + pnode->nLastSend = GetTime(); + pnode->nSendOffset += nBytes; + if (pnode->nSendOffset == data.size()) { + pnode->nSendOffset = 0; + pnode->nSendSize -= data.size(); + it++; + } else { + // could not send full message; stop sending more + break; + } + } else { + if (nBytes < 0) { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + printf("socket send error %d\n", nErr); + pnode->CloseSocketDisconnect(); + } + } + // couldn't send anything at all + break; } } + + if (it == pnode->vSendMsg.end()) { + assert(pnode->nSendOffset == 0); + assert(pnode->nSendSize == 0); + } + pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); } void ThreadSocketHandler(void* parg) @@ -776,7 +793,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -863,7 +880,7 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { // do not read, if draining write queue - if (!pnode->vSend.empty()) + if (!pnode->vSendMsg.empty()) FD_SET(pnode->hSocket, &fdsetSend); else FD_SET(pnode->hSocket, &fdsetRecv); @@ -1032,7 +1049,7 @@ void ThreadSocketHandler2(void* parg) // // Inactivity checking // - if (pnode->vSend.empty()) + if (pnode->vSendMsg.empty()) pnode->nLastSendEmpty = GetTime(); if (GetTime() - pnode->nTimeConnected > 60) { -- cgit v1.2.3