aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp238
1 files changed, 118 insertions, 120 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 37e7dfed4..4d0d781d6 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -342,8 +342,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
CNode* pnode = FindNode((CService)addrConnect);
if (pnode)
{
- pnode->AddRef();
- return pnode;
+ LogPrintf("Failed to open new connection, already connected\n");
+ return NULL;
}
}
@@ -369,18 +369,16 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
// In that case, drop the connection that was just created, and return the existing CNode instead.
// Also store the name we used to connect in that CNode, so that future FindNode() calls to that
// name catch this early.
+ LOCK(cs_vNodes);
CNode* pnode = FindNode((CService)addrConnect);
if (pnode)
{
- pnode->AddRef();
- {
- LOCK(cs_vNodes);
- if (pnode->addrName.empty()) {
- pnode->addrName = std::string(pszDest);
- }
+ if (pnode->addrName.empty()) {
+ pnode->addrName = std::string(pszDest);
}
CloseSocket(hSocket);
- return pnode;
+ LogPrintf("Failed to open new connection, already connected\n");
+ return NULL;
}
}
@@ -391,13 +389,8 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo
uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize();
CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false);
pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices);
- pnode->nTimeConnected = GetTime();
+ pnode->nTimeConnected = GetSystemTimeInSeconds();
pnode->AddRef();
- GetNodeSignals().InitializeNode(pnode, *this);
- {
- LOCK(cs_vNodes);
- vNodes.push_back(pnode);
- }
return pnode;
} else if (!proxyConnectionFailed) {
@@ -437,11 +430,6 @@ void CNode::CloseSocketDisconnect()
LogPrint("net", "disconnecting peer=%d\n", id);
CloseSocket(hSocket);
}
-
- // in case this fails, we'll empty the recv buffer when the CNode is deleted
- TRY_LOCK(cs_vRecvMsg, lockRecv);
- if (lockRecv)
- vRecvMsg.clear();
}
void CConnman::ClearBanned()
@@ -650,16 +638,18 @@ void CNode::copyStats(CNodeStats &stats)
}
#undef X
-// requires LOCK(cs_vRecvMsg)
bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete)
{
complete = false;
+ int64_t nTimeMicros = GetTimeMicros();
+ nLastRecv = nTimeMicros / 1000000;
+ nRecvBytes += nBytes;
while (nBytes > 0) {
// get current incomplete message, or create a new one
if (vRecvMsg.empty() ||
vRecvMsg.back().complete())
- vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, nRecvVersion));
+ vRecvMsg.push_back(CNetMessage(Params().MessageStart(), SER_NETWORK, INIT_PROTO_VERSION));
CNetMessage& msg = vRecvMsg.back();
@@ -691,7 +681,7 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
assert(i != mapRecvBytesPerMsgCmd.end());
i->second += msg.hdr.nMessageSize + CMessageHeader::HEADER_SIZE;
- msg.nTime = GetTimeMicros();
+ msg.nTime = nTimeMicros;
complete = true;
}
}
@@ -699,6 +689,33 @@ bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes, bool& complete
return true;
}
+void CNode::SetSendVersion(int nVersionIn)
+{
+ // Send version may only be changed in the version message, and
+ // only one version message is allowed per session. We can therefore
+ // treat this value as const and even atomic as long as it's only used
+ // once a version message has been successfully processed. Any attempt to
+ // set this twice is an error.
+ if (nSendVersion != 0) {
+ error("Send version already set for node: %i. Refusing to change from %i to %i", id, nSendVersion, nVersionIn);
+ } else {
+ nSendVersion = nVersionIn;
+ }
+}
+
+int CNode::GetSendVersion() const
+{
+ // The send version should always be explicitly set to
+ // INIT_PROTO_VERSION rather than using this value until SetSendVersion
+ // has been called.
+ if (nSendVersion == 0) {
+ error("Requesting unset send version for node: %i. Using %i", id, INIT_PROTO_VERSION);
+ return INIT_PROTO_VERSION;
+ }
+ return nSendVersion;
+}
+
+
int CNetMessage::readHeader(const char *pch, unsigned int nBytes)
{
// copy data to temporary parsing buffer
@@ -764,7 +781,7 @@ const uint256& CNetMessage::GetMessageHash() const
// requires LOCK(cs_vSend)
-size_t SocketSendData(CNode *pnode)
+size_t CConnman::SocketSendData(CNode *pnode) const
{
auto it = pnode->vSendMsg.begin();
size_t nSentSize = 0;
@@ -774,13 +791,14 @@ size_t SocketSendData(CNode *pnode)
assert(data.size() > pnode->nSendOffset);
int nBytes = send(pnode->hSocket, reinterpret_cast<const char*>(data.data()) + pnode->nSendOffset, data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
if (nBytes > 0) {
- pnode->nLastSend = GetTime();
+ pnode->nLastSend = GetSystemTimeInSeconds();
pnode->nSendBytes += nBytes;
pnode->nSendOffset += nBytes;
nSentSize += nBytes;
if (pnode->nSendOffset == data.size()) {
pnode->nSendOffset = 0;
pnode->nSendSize -= data.size();
+ pnode->fPauseSend = pnode->nSendSize > nSendBufferMaxSize;
it++;
} else {
// could not send full message; stop sending more
@@ -1052,8 +1070,7 @@ void CConnman::ThreadSocketHandler()
std::vector<CNode*> vNodesCopy = vNodes;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
- if (pnode->fDisconnect ||
- (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0))
+ if (pnode->fDisconnect)
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -1083,13 +1100,9 @@ void CConnman::ThreadSocketHandler()
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
TRY_LOCK(pnode->cs_inventory, lockInv);
if (lockInv)
fDelete = true;
- }
}
}
if (fDelete)
@@ -1149,29 +1162,19 @@ void CConnman::ThreadSocketHandler()
// write buffer in this case before receiving more. This avoids
// needlessly queueing received data, if the remote peer is not themselves
// receiving data. This means properly utilizing TCP flow control signalling.
- // * Otherwise, if there is no (complete) message in the receive buffer,
- // or there is space left in the buffer, select() for receiving data.
- // * (if neither of the above applies, there is certainly one message
- // in the receiver buffer ready to be processed).
- // Together, that means that at least one of the following is always possible,
- // so we don't deadlock:
- // * We send some data.
- // * We wait for data to be received (and disconnect after timeout).
- // * We process a message in the buffer (message handler thread).
+ // * Otherwise, if there is space left in the receive buffer, select() for
+ // receiving data.
+ // * Hand off all complete messages to the processor, to be handled without
+ // blocking here.
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend) {
- if (!pnode->vSendMsg.empty()) {
- FD_SET(pnode->hSocket, &fdsetSend);
- continue;
- }
+ LOCK(pnode->cs_vSend);
+ if (!pnode->vSendMsg.empty()) {
+ FD_SET(pnode->hSocket, &fdsetSend);
+ continue;
}
}
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv && (
- pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
- pnode->GetTotalRecvSize() <= GetReceiveFloodSize()))
+ if (!pnode->fPauseRecv)
FD_SET(pnode->hSocket, &fdsetRecv);
}
}
@@ -1230,8 +1233,6 @@ void CConnman::ThreadSocketHandler()
continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
{
{
// typical socket buffer is 8K-64K
@@ -1242,11 +1243,23 @@ void CConnman::ThreadSocketHandler()
bool notify = false;
if (!pnode->ReceiveMsgBytes(pchBuf, nBytes, notify))
pnode->CloseSocketDisconnect();
- if(notify)
- condMsgProc.notify_one();
- pnode->nLastRecv = GetTime();
- pnode->nRecvBytes += nBytes;
RecordBytesRecv(nBytes);
+ if (notify) {
+ size_t nSizeAdded = 0;
+ auto it(pnode->vRecvMsg.begin());
+ for (; it != pnode->vRecvMsg.end(); ++it) {
+ if (!it->complete())
+ break;
+ nSizeAdded += it->vRecv.size() + CMessageHeader::HEADER_SIZE;
+ }
+ {
+ LOCK(pnode->cs_vProcessMsg);
+ pnode->vProcessMsg.splice(pnode->vProcessMsg.end(), pnode->vRecvMsg, pnode->vRecvMsg.begin(), it);
+ pnode->nProcessQueueSize += nSizeAdded;
+ pnode->fPauseRecv = pnode->nProcessQueueSize > nReceiveFloodSize;
+ }
+ WakeMessageHandler();
+ }
}
else if (nBytes == 0)
{
@@ -1277,18 +1290,17 @@ void CConnman::ThreadSocketHandler()
continue;
if (FD_ISSET(pnode->hSocket, &fdsetSend))
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend) {
- size_t nBytes = SocketSendData(pnode);
- if (nBytes)
- RecordBytesSent(nBytes);
+ LOCK(pnode->cs_vSend);
+ size_t nBytes = SocketSendData(pnode);
+ if (nBytes) {
+ RecordBytesSent(nBytes);
}
}
//
// Inactivity checking
//
- int64_t nTime = GetTime();
+ int64_t nTime = GetSystemTimeInSeconds();
if (nTime - pnode->nTimeConnected > 60)
{
if (pnode->nLastRecv == 0 || pnode->nLastSend == 0)
@@ -1321,8 +1333,14 @@ void CConnman::ThreadSocketHandler()
}
}
-
-
+void CConnman::WakeMessageHandler()
+{
+ {
+ std::lock_guard<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = true;
+ }
+ condMsgProc.notify_one();
+}
@@ -1842,6 +1860,12 @@ bool CConnman::OpenNetworkConnection(const CAddress& addrConnect, bool fCountFai
if (fAddnode)
pnode->fAddnode = true;
+ GetNodeSignals().InitializeNode(pnode, *this);
+ {
+ LOCK(cs_vNodes);
+ vNodes.push_back(pnode);
+ }
+
return true;
}
@@ -1858,7 +1882,7 @@ void CConnman::ThreadMessageHandler()
}
}
- bool fSleep = true;
+ bool fMoreWork = false;
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
@@ -1866,30 +1890,15 @@ void CConnman::ThreadMessageHandler()
continue;
// Receive messages
- {
- TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
- if (lockRecv)
- {
- if (!GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc))
- pnode->CloseSocketDisconnect();
-
- if (pnode->nSendSize < GetSendBufferSize())
- {
- if (!pnode->vRecvGetData.empty() || (!pnode->vRecvMsg.empty() && pnode->vRecvMsg[0].complete()))
- {
- fSleep = false;
- }
- }
- }
- }
+ bool fMoreNodeWork = GetNodeSignals().ProcessMessages(pnode, *this, flagInterruptMsgProc);
+ fMoreWork |= (fMoreNodeWork && !pnode->fPauseSend);
if (flagInterruptMsgProc)
return;
// Send messages
{
- TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend)
- GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
+ LOCK(pnode->cs_sendProcessing);
+ GetNodeSignals().SendMessages(pnode, *this, flagInterruptMsgProc);
}
if (flagInterruptMsgProc)
return;
@@ -1901,10 +1910,11 @@ void CConnman::ThreadMessageHandler()
pnode->Release();
}
- if (fSleep) {
- std::unique_lock<std::mutex> lock(mutexMsgProc);
- condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100));
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ if (!fMoreWork) {
+ condMsgProc.wait_until(lock, std::chrono::steady_clock::now() + std::chrono::milliseconds(100), [this] { return fMsgProcWake; });
}
+ fMsgProcWake = false;
}
}
@@ -2121,7 +2131,7 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
nMaxFeeler = connOptions.nMaxFeeler;
nSendBufferMaxSize = connOptions.nSendBufferMaxSize;
- nReceiveFloodSize = connOptions.nSendBufferMaxSize;
+ nReceiveFloodSize = connOptions.nReceiveFloodSize;
nMaxOutboundLimit = connOptions.nMaxOutboundLimit;
nMaxOutboundTimeframe = connOptions.nMaxOutboundTimeframe;
@@ -2182,6 +2192,11 @@ bool CConnman::Start(CScheduler& scheduler, std::string& strNodeError, Options c
interruptNet.reset();
flagInterruptMsgProc = false;
+ {
+ std::unique_lock<std::mutex> lock(mutexMsgProc);
+ fMsgProcWake = false;
+ }
+
// Send and receive from sockets, accept connections
threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
@@ -2382,26 +2397,9 @@ void CConnman::GetNodeStats(std::vector<CNodeStats>& vstats)
}
}
-bool CConnman::DisconnectAddress(const CNetAddr& netAddr)
-{
- if (CNode* pnode = FindNode(netAddr)) {
- pnode->fDisconnect = true;
- return true;
- }
- return false;
-}
-
-bool CConnman::DisconnectSubnet(const CSubNet& subNet)
-{
- if (CNode* pnode = FindNode(subNet)) {
- pnode->fDisconnect = true;
- return true;
- }
- return false;
-}
-
bool CConnman::DisconnectNode(const std::string& strNode)
{
+ LOCK(cs_vNodes);
if (CNode* pnode = FindNode(strNode)) {
pnode->fDisconnect = true;
return true;
@@ -2420,16 +2418,6 @@ bool CConnman::DisconnectNode(NodeId id)
return false;
}
-void CConnman::RelayTransaction(const CTransaction& tx)
-{
- CInv inv(MSG_TX, tx.GetHash());
- LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- {
- pnode->PushInventory(inv);
- }
-}
-
void CConnman::RecordBytesRecv(uint64_t bytes)
{
LOCK(cs_totalBytesRecv);
@@ -2576,7 +2564,7 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
nLastRecv = 0;
nSendBytes = 0;
nRecvBytes = 0;
- nTimeConnected = GetTime();
+ nTimeConnected = GetSystemTimeInSeconds();
nTimeOffset = 0;
addrName = addrNameIn == "" ? addr.ToStringIPPort() : addrNameIn;
nVersion = 0;
@@ -2613,6 +2601,9 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn
minFeeFilter = 0;
lastSentFeeFilter = 0;
nextSendTimeFeeFilter = 0;
+ fPauseRecv = false;
+ fPauseSend = false;
+ nProcessQueueSize = 0;
BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes())
mapRecvBytesPerMsgCmd[msg] = 0;
@@ -2666,6 +2657,11 @@ void CNode::AskFor(const CInv& inv)
mapAskFor.insert(std::make_pair(nRequestTime, inv));
}
+bool CConnman::NodeFullyConnected(const CNode* pnode)
+{
+ return pnode && pnode->fSuccessfullyConnected && !pnode->fDisconnect;
+}
+
void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
{
size_t nMessageSize = msg.data.size();
@@ -2692,6 +2688,8 @@ void CConnman::PushMessage(CNode* pnode, CSerializedNetMsg&& msg)
pnode->mapSendBytesPerMsgCmd[msg.command] += nTotalSize;
pnode->nSendSize += nTotalSize;
+ if (pnode->nSendSize > nSendBufferMaxSize)
+ pnode->fPauseSend = true;
pnode->vSendMsg.push_back(std::move(serializedHeader));
if (nMessageSize)
pnode->vSendMsg.push_back(std::move(msg.data));
@@ -2714,19 +2712,19 @@ bool CConnman::ForNode(NodeId id, std::function<bool(CNode* pnode)> func)
break;
}
}
- return found != nullptr && func(found);
+ return found != nullptr && NodeFullyConnected(found) && func(found);
}
int64_t PoissonNextSend(int64_t nNow, int average_interval_seconds) {
return nNow + (int64_t)(log1p(GetRand(1ULL << 48) * -0.0000000000000035527136788 /* -1/2^48 */) * average_interval_seconds * -1000000.0 + 0.5);
}
-CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id)
+CSipHasher CConnman::GetDeterministicRandomizer(uint64_t id) const
{
return CSipHasher(nSeed0, nSeed1).Write(id);
}
-uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad)
+uint64_t CConnman::CalculateKeyedNetGroup(const CAddress& ad) const
{
std::vector<unsigned char> vchNetGroup(ad.GetGroup());