diff options
Diffstat (limited to 'src/net.cpp')
| -rw-r--r-- | src/net.cpp | 238 |
1 files changed, 118 insertions, 120 deletions
diff --git a/src/net.cpp b/src/net.cpp index 37e7dfed4..4d0d781d6 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -342,8 +342,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo CNode* pnode = FindNode((CService)addrConnect); if (pnode) { - pnode->AddRef(); - return pnode; + LogPrintf("Failed to open new connection, already connected\n"); + return NULL; } } @@ -369,18 +369,16 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo // In that case, drop the connection that was just created, and return the existing CNode instead. // Also store the name we used to connect in that CNode, so that future FindNode() calls to that // name catch this early. + LOCK(cs_vNodes); CNode* pnode = FindNode((CService)addrConnect); if (pnode) { - pnode->AddRef(); - { - LOCK(cs_vNodes); - if (pnode->addrName.empty()) { - pnode->addrName = std::string(pszDest); - } + if (pnode->addrName.empty()) { + pnode->addrName = std::string(pszDest); } CloseSocket(hSocket); - return pnode; + LogPrintf("Failed to open new connection, already connected\n"); + return NULL; } } @@ -391,13 +389,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false); pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices); - pnode->nTimeConnected = GetTime(); + pnode->nTimeConnected = GetSystemTimeInSeconds(); pnode->AddRef(); - GetNodeSignals().InitializeNode(pnode, *this); - { - LOCK(cs_vNodes); - vNodes.push_back(pnode); - } return pnode; } else if (!proxyConnectionFailed) { @@ -437,11 +430,6 @@ void CNode::CloseSocketDisconnect() LogPrint("net", "disconnecting peer=%d\n", id); CloseSocket(hSocket); } - - // in case this fails, we'll empty the recv buffer when the CNode is deleted - TRY_LOCK(cs_vRecvMsg, lockRecv); - if (lockRecv) - vRecvMsg.clear(); } void CConnman::ClearBanned() @@ -650,16 +638,18 @@ void CNode::copyStats(CNodeStats &stats) } #undef X -// requires LOCK(cs_vRecvMsg) bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete) { complete = false; + int64_t nTimeMicros = GetTimeMicros(); + nLastRecv = nTimeMicros / 1000000; + nRecvBytes += nBytes; while (nBytes > 0) { // get current incomplete message, or create a new one if (vRecvMsg.empty() || vRecvMsg.back().complete()) - vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion)); + vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION)); CNetMessage& msg = vRecvMsg.back(); @@ -691,7 +681,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete assert(i != mapRecvBytesPerMsgCmd.end()); i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE; - msg.nTime = GetTimeMicros(); + msg.nTime = nTimeMicros; complete = true; } } @@ -699,6 +689,33 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete return true; } +void CNode::SetSendVersion(int nVersionIn) +{ + // Send version may only be changed in the version message, and + // only one version message is allowed per session. We can therefore + // treat this value as const and even atomic as long as it's only used + // once a version message has been successfully processed. Any attempt to + // set this twice is an error. + if (nSendVersion != 0) { + error("Send version already set for node: %i. Refusing to change from %i to %i", id, nSendVersion, nVersionIn); + } else { + nSendVersion = nVersionIn; + } +} + +int CNode::GetSendVersion() const +{ + // The send version should always be explicitly set to + // INIT_PROTO_VERSION rather than using this value until SetSendVersion + // has been called. + if (nSendVersion == 0) { + error("Requesting unset send version for node: %i. Using %i", id, INIT_PROTO_VERSION); + return INIT_PROTO_VERSION; + } + return nSendVersion; +} + + int CNetMessage::readHeader(const char *pch, unsigned int nBytes) { // copy data to temporary parsing buffer @@ -764,7 +781,7 @@ const uint256& CNetMessage::GetMessageHash() const // requires LOCK(cs_vSend) -size_t SocketSendData(CNode *pnode) +size_t CConnman::SocketSendData(CNode *pnode) const { auto it = pnode->vSendMsg.begin(); size_t nSentSize = 0; @@ -774,13 +791,14 @@ size_t SocketSendData(CNode *pnode) assert(data.size() > pnode->nSendOffset); int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); if (nBytes > 0) { - pnode->nLastSend = GetTime(); + pnode->nLastSend = GetSystemTimeInSeconds(); pnode->nSendBytes += nBytes; pnode->nSendOffset += nBytes; nSentSize += nBytes; if (pnode->nSendOffset == data.size()) { pnode->nSendOffset = 0; pnode->nSendSize -= data.size(); + pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize; it++; } else { // could not send full message; stop sending more @@ -1052,8 +1070,7 @@ void CConnman::ThreadSocketHandler() std::vector<CNode*> vNodesCopy = vNodes; BOOST_FOREACH(CNode* pnode, vNodesCopy) { - if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0)) + if (pnode->fDisconnect) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -1083,13 +1100,9 @@ void CConnman::ThreadSocketHandler() TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { TRY_LOCK(pnode->cs_inventory, lockInv); if (lockInv) fDelete = true; - } } } if (fDelete) @@ -1149,29 +1162,19 @@ void CConnman::ThreadSocketHandler() // write buffer in this case before receiving more. This avoids // needlessly queueing received data, if the remote peer is not themselves // receiving data. This means properly utilizing TCP flow control signalling. - // * Otherwise, if there is no (complete) message in the receive buffer, - // or there is space left in the buffer, select() for receiving data. - // * (if neither of the above applies, there is certainly one message - // in the receiver buffer ready to be processed). - // Together, that means that at least one of the following is always possible, - // so we don't deadlock: - // * We send some data. - // * We wait for data to be received (and disconnect after timeout). - // * We process a message in the buffer (message handler thread). + // * Otherwise, if there is space left in the receive buffer, select() for + // receiving data. + // * Hand off all complete messages to the processor, to be handled without + // blocking here. { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) { - if (!pnode->vSendMsg.empty()) { - FD_SET(pnode->hSocket, &fdsetSend); - continue; - } + LOCK(pnode->cs_vSend); + if (!pnode->vSendMsg.empty()) { + FD_SET(pnode->hSocket, &fdsetSend); + continue; } } { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv && ( - pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || - pnode->GetTotalRecvSize() <= GetReceiveFloodSize())) + if (!pnode->fPauseRecv) FD_SET(pnode->hSocket, &fdsetRecv); } } @@ -1230,8 +1233,6 @@ void CConnman::ThreadSocketHandler() continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) { { // typical socket buffer is 8K-64K @@ -1242,11 +1243,23 @@ void CConnman::ThreadSocketHandler() bool notify = false; if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify)) pnode->CloseSocketDisconnect(); - if(notify) - condMsgProc.notify_one(); - pnode->nLastRecv = GetTime(); - pnode->nRecvBytes += nBytes; RecordBytesRecv(nBytes); + if (notify) { + size_t nSizeAdded = 0; + auto it(pnode->vRecvMsg.begin()); + for (; it != pnode->vRecvMsg.end(); ++it) { + if (!it->complete()) + break; + nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE; + } + { + LOCK(pnode->cs_vProcessMsg); + pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it); + pnode->nProcessQueueSize += nSizeAdded; + pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize; + } + WakeMessageHandler(); + } } else if (nBytes == 0) { @@ -1277,18 +1290,17 @@ void CConnman::ThreadSocketHandler() continue; if (FD_ISSET(pnode->hSocket, &fdsetSend)) { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) { - size_t nBytes = SocketSendData(pnode); - if (nBytes) - RecordBytesSent(nBytes); + LOCK(pnode->cs_vSend); + size_t nBytes = SocketSendData(pnode); + if (nBytes) { + RecordBytesSent(nBytes); } } // // Inactivity checking // - int64_t nTime = GetTime(); + int64_t nTime = GetSystemTimeInSeconds(); if (nTime - pnode->nTimeConnected > 60) { if (pnode->nLastRecv == 0 || pnode->nLastSend == 0) @@ -1321,8 +1333,14 @@ void CConnman::ThreadSocketHandler() } } - - +void CConnman::WakeMessageHandler() +{ + { + std::lock_guard<std::mutex> lock(mutexMsgProc); + fMsgProcWake = true; + } + condMsgProc.notify_one(); +} @@ -1842,6 +1860,12 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai if (fAddnode) pnode->fAddnode = true; + GetNodeSignals().InitializeNode(pnode, *this); + { + LOCK(cs_vNodes); + vNodes.push_back(pnode); + } + return true; } @@ -1858,7 +1882,7 @@ void CConnman::ThreadMessageHandler() } } - bool fSleep = true; + bool fMoreWork = false; BOOST_FOREACH(CNode* pnode, vNodesCopy) { @@ -1866,30 +1890,15 @@ void CConnman::ThreadMessageHandler() continue; // Receive messages - { - TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); - if (lockRecv) - { - if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc)) - pnode->CloseSocketDisconnect(); - - if (pnode->nSendSize < GetSendBufferSize()) - { - if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete())) - { - fSleep = false; - } - } - } - } + bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc); + fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend); if (flagInterruptMsgProc) return; // Send messages { - TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend) - GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc); + LOCK(pnode->cs_sendProcessing); + GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc); } if (flagInterruptMsgProc) return; @@ -1901,10 +1910,11 @@ void CConnman::ThreadMessageHandler() pnode->Release(); } - if (fSleep) { - std::unique_lock<std::mutex> lock(mutexMsgProc); - condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100)); + std::unique_lock<std::mutex> lock(mutexMsgProc); + if (!fMoreWork) { + condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; }); } + fMsgProcWake = false; } } @@ -2121,7 +2131,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c nMaxFeeler = connOptions.nMaxFeeler; nSendBufferMaxSize = connOptions.nSendBufferMaxSize; - nReceiveFloodSize = connOptions.nSendBufferMaxSize; + nReceiveFloodSize = connOptions.nReceiveFloodSize; nMaxOutboundLimit = connOptions.nMaxOutboundLimit; nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe; @@ -2182,6 +2192,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c interruptNet.reset(); flagInterruptMsgProc = false; + { + std::unique_lock<std::mutex> lock(mutexMsgProc); + fMsgProcWake = false; + } + // Send and receive from sockets, accept connections threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this))); @@ -2382,26 +2397,9 @@ void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats) } } -bool CConnman::DisconnectAddress(const CNetAddr& netAddr) -{ - if (CNode* pnode = FindNode(netAddr)) { - pnode->fDisconnect = true; - return true; - } - return false; -} - -bool CConnman::DisconnectSubnet(const CSubNet& subNet) -{ - if (CNode* pnode = FindNode(subNet)) { - pnode->fDisconnect = true; - return true; - } - return false; -} - bool CConnman::DisconnectNode(const std::string& strNode) { + LOCK(cs_vNodes); if (CNode* pnode = FindNode(strNode)) { pnode->fDisconnect = true; return true; @@ -2420,16 +2418,6 @@ bool CConnman::DisconnectNode(NodeId id) return false; } -void CConnman::RelayTransaction(const CTransaction& tx) -{ - CInv inv(MSG_TX, tx.GetHash()); - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - { - pnode->PushInventory(inv); - } -} - void CConnman::RecordBytesRecv(uint64_t bytes) { LOCK(cs_totalBytesRecv); @@ -2576,7 +2564,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn nLastRecv = 0; nSendBytes = 0; nRecvBytes = 0; - nTimeConnected = GetTime(); + nTimeConnected = GetSystemTimeInSeconds(); nTimeOffset = 0; addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn; nVersion = 0; @@ -2613,6 +2601,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn minFeeFilter = 0; lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; + fPauseRecv = false; + fPauseSend = false; + nProcessQueueSize = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; @@ -2666,6 +2657,11 @@ void CNode::AskFor(const CInv& inv) mapAskFor.insert(std::make_pair(nRequestTime, inv)); } +bool CConnman::NodeFullyConnected(const CNode* pnode) +{ + return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect; +} + void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) { size_t nMessageSize = msg.data.size(); @@ -2692,6 +2688,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg) pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize; pnode->nSendSize += nTotalSize; + if (pnode->nSendSize > nSendBufferMaxSize) + pnode->fPauseSend = true; pnode->vSendMsg.push_back(std::move(serializedHeader)); if (nMessageSize) pnode->vSendMsg.push_back(std::move(msg.data)); @@ -2714,19 +2712,19 @@ bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func) break; } } - return found != nullptr && func(found); + return found != nullptr && NodeFullyConnected(found) && func(found); } int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) { return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5); } -CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) +CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const { return CSipHasher(nSeed0, nSeed1).Write(id); } -uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) +uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const { std::vector<unsigned char> vchNetGroup(ad.GetGroup()); |