From 3e32cd09f643bf7d4344d3bb2e1136f186f3d109 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Mon, 12 Sep 2016 20:00:33 -0400 Subject: connman is in charge of pushing messages The changes here are dense and subtle, but hopefully all is more explicit than before. - CConnman is now in charge of sending data rather than the nodes themselves. This is necessary because many decisions need to be made with all nodes in mind, and a model that requires the nodes calling up to their manager quickly turns to spaghetti. - The per-node-serializer (ssSend) has been replaced with a (quasi-)const send-version. Since the send version for serialization can only change once per connection, we now explicitly tag messages with INIT_PROTO_VERSION if they are sent before the handshake. With this done, there's no need to lock for access to nSendVersion. Also, a new stream is used for each message, so there's no need to lock during the serialization process. - This takes care of accounting for optimistic sends, so the nOptimisticBytesWritten hack can be removed. - -dropmessagestest and -fuzzmessagestest have not been preserved, as I suspect they haven't been used in years. --- src/net.cpp | 91 ++++++++++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 69 insertions(+), 22 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 18d25cbcd..f271aed24 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -394,6 +394,9 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false); + + PushVersion(pnode, GetTime()); + GetNodeSignals().InitializeNode(pnode->GetId(), pnode); pnode->AddRef(); @@ -415,6 +418,24 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo return NULL; } +void CConnman::PushVersion(CNode* pnode, int64_t nTime) +{ + ServiceFlags nLocalNodeServices = pnode->GetLocalServices(); + CAddress addrYou = (pnode->addr.IsRoutable() && !IsProxy(pnode->addr) ? pnode->addr : CAddress(CService(), pnode->addr.nServices)); + CAddress addrMe = CAddress(CService(), nLocalNodeServices); + uint64_t nonce = pnode->GetLocalNonce(); + int nNodeStartingHeight = pnode->nMyStartingHeight; + NodeId id = pnode->GetId(); + + PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe, + nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes); + + if (fLogIPs) + LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), id); + else + LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), id); +} + void CConnman::DumpBanlist() { SweepBanned(); // clean unused entries (if bantime has expired) @@ -450,23 +471,6 @@ void CNode::CloseSocketDisconnect() vRecvMsg.clear(); } -void CNode::PushVersion() -{ - int64_t nTime = (fInbound ? GetAdjustedTime() : GetTime()); - CAddress addrYou = (addr.IsRoutable() && !IsProxy(addr) ? addr : CAddress(CService(), addr.nServices)); - CAddress addrMe = CAddress(CService(), nLocalServices); - if (fLogIPs) - LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), addrYou.ToString(), id); - else - LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nMyStartingHeight, addrMe.ToString(), id); - PushMessage(NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalServices, nTime, addrYou, addrMe, - nLocalHostNonce, strSubVersion, nMyStartingHeight, ::fRelayTxes); -} - - - - - void CConnman::ClearBanned() { { @@ -2530,7 +2534,8 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn filterInventoryKnown(50000, 0.000001), nLocalHostNonce(nLocalHostNonceIn), nLocalServices(nLocalServicesIn), - nMyStartingHeight(nMyStartingHeightIn) + nMyStartingHeight(nMyStartingHeightIn), + nSendVersion(0) { nServices = NODE_NONE; nServicesExpected = NODE_NONE; @@ -2587,10 +2592,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn LogPrint("net", "Added connection to %s peer=%d\n", addrName, id); else LogPrint("net", "Added connection peer=%d\n", id); - - // Be shy and don't send version until we hear - if (hSocket != INVALID_SOCKET && !fInbound) - PushVersion(); } CNode::~CNode() @@ -2696,6 +2697,52 @@ void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend) LEAVE_CRITICAL_SECTION(cs_vSend); } +CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand) +{ + return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) }; +} + +void CConnman::EndMessage(CDataStream& strm) +{ + // Set the size + assert(strm.size () >= CMessageHeader::HEADER_SIZE); + unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE; + WriteLE32((uint8_t*)&strm[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize); + // Set the checksum + uint256 hash = Hash(strm.begin() + CMessageHeader::HEADER_SIZE, strm.end()); + memcpy((char*)&strm[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE); + +} + +void CConnman::PushMessage(CNode* pnode, CDataStream& strm, const std::string& sCommand) +{ + if(strm.empty()) + return; + + unsigned int nSize = strm.size() - CMessageHeader::HEADER_SIZE; + LogPrint("net", "sending %s (%d bytes) peer=%d\n", SanitizeString(sCommand.c_str()), nSize, pnode->id); + + size_t nBytesSent = 0; + { + LOCK(pnode->cs_vSend); + if(pnode->hSocket == INVALID_SOCKET) { + return; + } + bool optimisticSend(pnode->vSendMsg.empty()); + pnode->vSendMsg.emplace_back(strm.begin(), strm.end()); + + //log total amount of bytes per command + pnode->mapSendBytesPerMsgCmd[sCommand] += strm.size(); + pnode->nSendSize += strm.size(); + + // If write queue empty, attempt "optimistic write" + if (optimisticSend == true) + nBytesSent = SocketSendData(pnode); + } + if (nBytesSent) + RecordBytesSent(nBytesSent); +} + bool CConnman::ForNode(NodeId id, std::function func) { CNode* found = nullptr; -- cgit v1.2.3 From ea3326891d8c3dcbcff178b618108d657c5586a3 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Mon, 12 Sep 2016 20:07:44 -0400 Subject: net: switch all callers to connman for pushing messages Drop all of the old stuff. --- src/net.cpp | 61 ------------------------------------------------------------- 1 file changed, 61 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index f271aed24..2cc14a222 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -2636,67 +2636,6 @@ void CNode::AskFor(const CInv& inv) mapAskFor.insert(std::make_pair(nRequestTime, inv)); } -void CNode::BeginMessage(const char* pszCommand) EXCLUSIVE_LOCK_FUNCTION(cs_vSend) -{ - ENTER_CRITICAL_SECTION(cs_vSend); - assert(ssSend.size() == 0); - ssSend << CMessageHeader(Params().MessageStart(), pszCommand, 0); - LogPrint("net", "sending: %s ", SanitizeString(pszCommand)); -} - -void CNode::AbortMessage() UNLOCK_FUNCTION(cs_vSend) -{ - ssSend.clear(); - - LEAVE_CRITICAL_SECTION(cs_vSend); - - LogPrint("net", "(aborted)\n"); -} - -void CNode::EndMessage(const char* pszCommand) UNLOCK_FUNCTION(cs_vSend) -{ - // The -*messagestest options are intentionally not documented in the help message, - // since they are only used during development to debug the networking code and are - // not intended for end-users. - if (mapArgs.count("-dropmessagestest") && GetRand(GetArg("-dropmessagestest", 2)) == 0) - { - LogPrint("net", "dropmessages DROPPING SEND MESSAGE\n"); - AbortMessage(); - return; - } - if (mapArgs.count("-fuzzmessagestest")) - Fuzz(GetArg("-fuzzmessagestest", 10)); - - if (ssSend.size() == 0) - { - LEAVE_CRITICAL_SECTION(cs_vSend); - return; - } - // Set the size - unsigned int nSize = ssSend.size() - CMessageHeader::HEADER_SIZE; - WriteLE32((uint8_t*)&ssSend[CMessageHeader::MESSAGE_SIZE_OFFSET], nSize); - - //log total amount of bytes per command - mapSendBytesPerMsgCmd[std::string(pszCommand)] += nSize + CMessageHeader::HEADER_SIZE; - - // Set the checksum - uint256 hash = Hash(ssSend.begin() + CMessageHeader::HEADER_SIZE, ssSend.end()); - assert(ssSend.size () >= CMessageHeader::CHECKSUM_OFFSET + CMessageHeader::CHECKSUM_SIZE); - memcpy((char*)&ssSend[CMessageHeader::CHECKSUM_OFFSET], hash.begin(), CMessageHeader::CHECKSUM_SIZE); - - LogPrint("net", "(%d bytes) peer=%d\n", nSize, id); - - std::deque::iterator it = vSendMsg.insert(vSendMsg.end(), CSerializeData()); - ssSend.GetAndClear(*it); - nSendSize += (*it).size(); - - // If write queue empty, attempt "optimistic write" - if (it == vSendMsg.begin()) - nOptimisticBytesWritten += SocketSendData(this); - - LEAVE_CRITICAL_SECTION(cs_vSend); -} - CDataStream CConnman::BeginMessage(CNode* pnode, int nVersion, int flags, const std::string& sCommand) { return {SER_NETWORK, (nVersion ? nVersion : pnode->GetSendVersion()) | flags, CMessageHeader(Params().MessageStart(), sCommand.c_str(), 0) }; -- cgit v1.2.3 From 5c2169cc3f263b39ba42d66bcf014163fada2390 Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Mon, 12 Sep 2016 20:09:24 -0400 Subject: drop the optimistic write counter hack This is now handled properly in realtime. --- src/net.cpp | 5 ----- 1 file changed, 5 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 2cc14a222..2d6573f7f 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1167,10 +1167,6 @@ void CConnman::ThreadSocketHandler() { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - if (pnode->nOptimisticBytesWritten) { - RecordBytesSent(pnode->nOptimisticBytesWritten); - pnode->nOptimisticBytesWritten = 0; - } if (!pnode->vSendMsg.empty()) { FD_SET(pnode->hSocket, &fdsetSend); continue; @@ -2582,7 +2578,6 @@ CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn minFeeFilter = 0; lastSentFeeFilter = 0; nextSendTimeFeeFilter = 0; - nOptimisticBytesWritten = 0; BOOST_FOREACH(const std::string &msg, getAllNetMessageTypes()) mapRecvBytesPerMsgCmd[msg] = 0; -- cgit v1.2.3 From 440f1d3e4c60c9c3e0a1a74d3321b0a8e37a1e8d Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Fri, 30 Sep 2016 15:03:57 -0400 Subject: net: remove now-unused ssSend and Fuzz --- src/net.cpp | 38 +------------------------------------- 1 file changed, 1 insertion(+), 37 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 2d6573f7f..16d9527ee 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -1063,7 +1063,7 @@ void CConnman::ThreadSocketHandler() BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0)) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -2482,46 +2482,10 @@ int CConnman::GetBestHeight() const return nBestHeight.load(std::memory_order_acquire); } -void CNode::Fuzz(int nChance) -{ - if (!fSuccessfullyConnected) return; // Don't fuzz initial handshake - if (GetRand(nChance) != 0) return; // Fuzz 1 of every nChance messages - - switch (GetRand(3)) - { - case 0: - // xor a random byte with a random value: - if (!ssSend.empty()) { - CDataStream::size_type pos = GetRand(ssSend.size()); - ssSend[pos] ^= (unsigned char)(GetRand(256)); - } - break; - case 1: - // delete a random byte: - if (!ssSend.empty()) { - CDataStream::size_type pos = GetRand(ssSend.size()); - ssSend.erase(ssSend.begin()+pos); - } - break; - case 2: - // insert a random byte at a random position - { - CDataStream::size_type pos = GetRand(ssSend.size()); - char ch = (char)GetRand(256); - ssSend.insert(ssSend.begin()+pos, ch); - } - break; - } - // Chance of more than one change half the time: - // (more changes exponentially less likely): - Fuzz(2); -} - unsigned int CConnman::GetReceiveFloodSize() const { return nReceiveFloodSize; } unsigned int CConnman::GetSendBufferSize() const{ return nSendBufferMaxSize; } CNode::CNode(NodeId idIn, ServiceFlags nLocalServicesIn, int nMyStartingHeightIn, SOCKET hSocketIn, const CAddress& addrIn, uint64_t nKeyedNetGroupIn, uint64_t nLocalHostNonceIn, const std::string& addrNameIn, bool fInboundIn) : - ssSend(SER_NETWORK, INIT_PROTO_VERSION), addr(addrIn), fInbound(fInboundIn), id(idIn), -- cgit v1.2.3 From 902768099cb92b39f5ff509cd91fdd8970759b8a Mon Sep 17 00:00:00 2001 From: Cory Fields Date: Wed, 26 Oct 2016 18:08:11 -0400 Subject: net: handle version push in InitializeNode --- src/net.cpp | 34 +++++----------------------------- 1 file changed, 5 insertions(+), 29 deletions(-) (limited to 'src/net.cpp') diff --git a/src/net.cpp b/src/net.cpp index 16d9527ee..15cf7ce8b 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -393,21 +393,15 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo NodeId id = GetNewNodeId(); uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addrConnect, CalculateKeyedNetGroup(addrConnect), nonce, pszDest ? pszDest : "", false); - - - PushVersion(pnode, GetTime()); - - GetNodeSignals().InitializeNode(pnode->GetId(), pnode); + pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices); + pnode->nTimeConnected = GetTime(); pnode->AddRef(); - + GetNodeSignals().InitializeNode(pnode, *this); { LOCK(cs_vNodes); vNodes.push_back(pnode); } - pnode->nServicesExpected = ServiceFlags(addrConnect.nServices & nRelevantServices); - pnode->nTimeConnected = GetTime(); - return pnode; } else if (!proxyConnectionFailed) { // If connecting to the node failed, and failure is not caused by a problem connecting to @@ -418,24 +412,6 @@ CNode* CConnman::ConnectNode(CAddress addrConnect, const char *pszDest, bool fCo return NULL; } -void CConnman::PushVersion(CNode* pnode, int64_t nTime) -{ - ServiceFlags nLocalNodeServices = pnode->GetLocalServices(); - CAddress addrYou = (pnode->addr.IsRoutable() && !IsProxy(pnode->addr) ? pnode->addr : CAddress(CService(), pnode->addr.nServices)); - CAddress addrMe = CAddress(CService(), nLocalNodeServices); - uint64_t nonce = pnode->GetLocalNonce(); - int nNodeStartingHeight = pnode->nMyStartingHeight; - NodeId id = pnode->GetId(); - - PushMessageWithVersion(pnode, INIT_PROTO_VERSION, NetMsgType::VERSION, PROTOCOL_VERSION, (uint64_t)nLocalNodeServices, nTime, addrYou, addrMe, - nonce, strSubVersion, nNodeStartingHeight, ::fRelayTxes); - - if (fLogIPs) - LogPrint("net", "send version message: version %d, blocks=%d, us=%s, them=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), addrYou.ToString(), id); - else - LogPrint("net", "send version message: version %d, blocks=%d, us=%s, peer=%d\n", PROTOCOL_VERSION, nNodeStartingHeight, addrMe.ToString(), id); -} - void CConnman::DumpBanlist() { SweepBanned(); // clean unused entries (if bantime has expired) @@ -1036,9 +1012,9 @@ void CConnman::AcceptConnection(const ListenSocket& hListenSocket) { uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); CNode* pnode = new CNode(id, nLocalServices, GetBestHeight(), hSocket, addr, CalculateKeyedNetGroup(addr), nonce, "", true); - GetNodeSignals().InitializeNode(pnode->GetId(), pnode); pnode->AddRef(); pnode->fWhitelisted = whitelisted; + GetNodeSignals().InitializeNode(pnode, *this); LogPrint("net", "connection from %s accepted\n", addr.ToString()); @@ -2130,7 +2106,7 @@ bool CConnman::Start(boost::thread_group& threadGroup, CScheduler& scheduler, st uint64_t nonce = GetDeterministicRandomizer(RANDOMIZER_ID_LOCALHOSTNONCE).Write(id).Finalize(); pnodeLocalHost = new CNode(id, nLocalServices, GetBestHeight(), INVALID_SOCKET, CAddress(CService(local, 0), nLocalServices), 0, nonce); - GetNodeSignals().InitializeNode(pnodeLocalHost->GetId(), pnodeLocalHost); + GetNodeSignals().InitializeNode(pnodeLocalHost, *this); } // -- cgit v1.2.3