diff options
Diffstat (limited to 'src/net.cpp')
| -rw-r--r-- | src/net.cpp | 722 |
1 files changed, 306 insertions, 416 deletions
diff --git a/src/net.cpp b/src/net.cpp index 3406a28b0..54ed1d9b5 100644 --- a/src/net.cpp +++ b/src/net.cpp @@ -3,7 +3,6 @@ // Distributed under the MIT/X11 software license, see the accompanying // file COPYING or http://www.opensource.org/licenses/mit-license.php. -#include "irc.h" #include "db.h" #include "net.h" #include "init.h" @@ -27,14 +26,6 @@ using namespace boost; static const int MAX_OUTBOUND_CONNECTIONS = 8; -void ThreadMessageHandler2(void* parg); -void ThreadSocketHandler2(void* parg); -void ThreadOpenConnections2(void* parg); -void ThreadOpenAddedConnections2(void* parg); -#ifdef USE_UPNP -void ThreadMapPort2(void* parg); -#endif -void ThreadDNSAddressSeed2(void* parg); bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false); @@ -47,24 +38,24 @@ struct LocalServiceInfo { // Global state variables // bool fDiscover = true; -bool fUseUPnP = false; uint64 nLocalServices = NODE_NETWORK; static CCriticalSection cs_mapLocalHost; static map<CNetAddr, LocalServiceInfo> mapLocalHost; static bool vfReachable[NET_MAX] = {}; static bool vfLimited[NET_MAX] = {}; static CNode* pnodeLocalHost = NULL; +static CNode* pnodeSync = NULL; uint64 nLocalHostNonce = 0; -array<int, THREAD_MAX> vnThreadsRunning; static std::vector<SOCKET> vhListenSocket; CAddrMan addrman; +int nMaxConnections = 125; vector<CNode*> vNodes; CCriticalSection cs_vNodes; map<CInv, CDataStream> mapRelay; deque<pair<int64, CInv> > vRelayExpiration; CCriticalSection cs_mapRelay; -map<CInv, int64> mapAlreadyAskedFor; +limitedmap<CInv, int64> mapAlreadyAskedFor(MAX_INV_SZ); static deque<string> vOneShots; CCriticalSection cs_vOneShots; @@ -157,8 +148,7 @@ bool RecvLine(SOCKET hSocket, string& strLine) } else if (nBytes <= 0) { - if (fShutdown) - return false; + boost::this_thread::interruption_point(); if (nBytes < 0) { int nErr = WSAGetLastError(); @@ -166,7 +156,7 @@ bool RecvLine(SOCKET hSocket, string& strLine) continue; if (nErr == WSAEWOULDBLOCK || nErr == WSAEINTR || nErr == WSAEINPROGRESS) { - Sleep(10); + MilliSleep(10); continue; } } @@ -347,7 +337,6 @@ bool GetMyExternalIP2(const CService& addrConnect, const char* pszGet, const cha return error("GetMyExternalIP() : connection closed"); } -// We now get our external IP from the IRC server first and only use this as a backup bool GetMyExternalIP(CNetAddr& ipRet) { CService addrConnect; @@ -357,13 +346,13 @@ bool GetMyExternalIP(CNetAddr& ipRet) for (int nLookup = 0; nLookup <= 1; nLookup++) for (int nHost = 1; nHost <= 2; nHost++) { - // We should be phasing out our use of sites like these. If we need + // We should be phasing out our use of sites like these. If we need // replacements, we should ask for volunteers to put this simple // php file on their web server that prints the client IP: // <?php echo $_SERVER["REMOTE_ADDR"]; ?> if (nHost == 1) { - addrConnect = CService("91.198.22.70",80); // checkip.dyndns.org + addrConnect = CService("91.198.22.70", 80); // checkip.dyndns.org if (nLookup == 1) { @@ -437,12 +426,10 @@ void AddressCurrentlyConnected(const CService& addr) CNode* FindNode(const CNetAddr& ip) { - { - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - if ((CNetAddr)pnode->addr == ip) - return (pnode); - } + LOCK(cs_vNodes); + BOOST_FOREACH(CNode* pnode, vNodes) + if ((CNetAddr)pnode->addr == ip) + return (pnode); return NULL; } @@ -457,16 +444,14 @@ CNode* FindNode(std::string addrName) CNode* FindNode(const CService& addr) { - { - LOCK(cs_vNodes); - BOOST_FOREACH(CNode* pnode, vNodes) - if ((CService)pnode->addr == addr) - return (pnode); - } + LOCK(cs_vNodes); + BOOST_FOREACH(CNode* pnode, vNodes) + if ((CService)pnode->addr == addr) + return (pnode); return NULL; } -CNode* ConnectNode(CAddress addrConnect, const char *pszDest, int64 nTimeout) +CNode* ConnectNode(CAddress addrConnect, const char *pszDest) { if (pszDest == NULL) { if (IsLocal(addrConnect)) @@ -476,10 +461,7 @@ CNode* ConnectNode(CAddress addrConnect, const char *pszDest, int64 nTimeout) CNode* pnode = FindNode((CService)addrConnect); if (pnode) { - if (nTimeout != 0) - pnode->AddRef(nTimeout); - else - pnode->AddRef(); + pnode->AddRef(); return pnode; } } @@ -511,10 +493,7 @@ CNode* ConnectNode(CAddress addrConnect, const char *pszDest, int64 nTimeout) // Add node CNode* pnode = new CNode(hSocket, addrConnect, pszDest ? pszDest : "", false); - if (nTimeout != 0) - pnode->AddRef(nTimeout); - else - pnode->AddRef(); + pnode->AddRef(); { LOCK(cs_vNodes); @@ -538,8 +517,16 @@ void CNode::CloseSocketDisconnect() printf("disconnecting node %s\n", addrName.c_str()); closesocket(hSocket); hSocket = INVALID_SOCKET; - vRecv.clear(); } + + // in case this fails, we'll empty the recv buffer when the CNode is deleted + TRY_LOCK(cs_vRecvMsg, lockRecv); + if (lockRecv) + vRecvMsg.clear(); + + // if this was the sync node, we'll need a new one + if (this == pnodeSync) + pnodeSync = NULL; } void CNode::Cleanup() @@ -624,48 +611,142 @@ void CNode::copyStats(CNodeStats &stats) X(nVersion); X(strSubVer); X(fInbound); - X(nReleaseTime); X(nStartingHeight); X(nMisbehavior); + X(nSendBytes); + X(nRecvBytes); + stats.fSyncNode = (this == pnodeSync); } #undef X +// requires LOCK(cs_vRecvMsg) +bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes) +{ + while (nBytes > 0) { + + // get current incomplete message, or create a new one + if (vRecvMsg.empty() || + vRecvMsg.back().complete()) + vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion)); + + CNetMessage& msg = vRecvMsg.back(); + // absorb network data + int handled; + if (!msg.in_data) + handled = msg.readHeader(pch, nBytes); + else + handled = msg.readData(pch, nBytes); + if (handled < 0) + return false; + pch += handled; + nBytes -= handled; + } + return true; +} +int CNetMessage::readHeader(const char *pch, unsigned int nBytes) +{ + // copy data to temporary parsing buffer + unsigned int nRemaining = 24 - nHdrPos; + unsigned int nCopy = std::min(nRemaining, nBytes); + memcpy(&hdrbuf[nHdrPos], pch, nCopy); + nHdrPos += nCopy; + // if header incomplete, exit + if (nHdrPos < 24) + return nCopy; + // deserialize to CMessageHeader + try { + hdrbuf >> hdr; + } + catch (std::exception &e) { + return -1; + } + + // reject messages larger than MAX_SIZE + if (hdr.nMessageSize > MAX_SIZE) + return -1; + + // switch state to reading message data + in_data = true; + vRecv.resize(hdr.nMessageSize); -void ThreadSocketHandler(void* parg) + return nCopy; +} + +int CNetMessage::readData(const char *pch, unsigned int nBytes) { - // Make this thread recognisable as the networking thread - RenameThread("bitcoin-net"); + unsigned int nRemaining = hdr.nMessageSize - nDataPos; + unsigned int nCopy = std::min(nRemaining, nBytes); - try - { - vnThreadsRunning[THREAD_SOCKETHANDLER]++; - ThreadSocketHandler2(parg); - vnThreadsRunning[THREAD_SOCKETHANDLER]--; + memcpy(&vRecv[nDataPos], pch, nCopy); + nDataPos += nCopy; + + return nCopy; +} + + + + + + + + + +// requires LOCK(cs_vSend) +void SocketSendData(CNode *pnode) +{ + std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin(); + + while (it != pnode->vSendMsg.end()) { + const CSerializeData &data = *it; + assert(data.size() > pnode->nSendOffset); + int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT); + if (nBytes > 0) { + pnode->nLastSend = GetTime(); + pnode->nSendBytes += nBytes; + pnode->nSendOffset += nBytes; + if (pnode->nSendOffset == data.size()) { + pnode->nSendOffset = 0; + pnode->nSendSize -= data.size(); + it++; + } else { + // could not send full message; stop sending more + break; + } + } else { + if (nBytes < 0) { + // error + int nErr = WSAGetLastError(); + if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) + { + printf("socket send error %d\n", nErr); + pnode->CloseSocketDisconnect(); + } + } + // couldn't send anything at all + break; + } } - catch (std::exception& e) { - vnThreadsRunning[THREAD_SOCKETHANDLER]--; - PrintException(&e, "ThreadSocketHandler()"); - } catch (...) { - vnThreadsRunning[THREAD_SOCKETHANDLER]--; - throw; // support pthread_cancel() + + if (it == pnode->vSendMsg.end()) { + assert(pnode->nSendOffset == 0); + assert(pnode->nSendSize == 0); } - printf("ThreadSocketHandler exited\n"); + pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it); } -void ThreadSocketHandler2(void* parg) +static list<CNode*> vNodesDisconnected; + +void ThreadSocketHandler() { - printf("ThreadSocketHandler started\n"); - list<CNode*> vNodesDisconnected; unsigned int nPrevNodeCount = 0; - loop { // @@ -678,7 +759,7 @@ void ThreadSocketHandler2(void* parg) BOOST_FOREACH(CNode* pnode, vNodesCopy) { if (pnode->fDisconnect || - (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty())) + (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty())) { // remove from vNodes vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end()); @@ -691,7 +772,6 @@ void ThreadSocketHandler2(void* parg) pnode->Cleanup(); // hold in disconnected pool until all refs are released - pnode->nReleaseTime = max(pnode->nReleaseTime, GetTime() + 15 * 60); if (pnode->fNetworkNode || pnode->fInbound) pnode->Release(); vNodesDisconnected.push_back(pnode); @@ -710,7 +790,7 @@ void ThreadSocketHandler2(void* parg) TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { TRY_LOCK(pnode->cs_inventory, lockInv); @@ -761,24 +841,46 @@ void ThreadSocketHandler2(void* parg) { if (pnode->hSocket == INVALID_SOCKET) continue; - FD_SET(pnode->hSocket, &fdsetRecv); FD_SET(pnode->hSocket, &fdsetError); hSocketMax = max(hSocketMax, pnode->hSocket); have_fds = true; + + // Implement the following logic: + // * If there is data to send, select() for sending data. As this only + // happens when optimistic write failed, we choose to first drain the + // write buffer in this case before receiving more. This avoids + // needlessly queueing received data, if the remote peer is not themselves + // receiving data. This means properly utilizing TCP flow control signalling. + // * Otherwise, if there is no (complete) message in the receive buffer, + // or there is space left in the buffer, select() for receiving data. + // * (if neither of the above applies, there is certainly one message + // in the receiver buffer ready to be processed). + // Together, that means that at least one of the following is always possible, + // so we don't deadlock: + // * We send some data. + // * We wait for data to be received (and disconnect after timeout). + // * We process a message in the buffer (message handler thread). { TRY_LOCK(pnode->cs_vSend, lockSend); - if (lockSend && !pnode->vSend.empty()) + if (lockSend && !pnode->vSendMsg.empty()) { FD_SET(pnode->hSocket, &fdsetSend); + continue; + } + } + { + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); + if (lockRecv && ( + pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() || + pnode->GetTotalRecvSize() <= ReceiveFloodSize())) + FD_SET(pnode->hSocket, &fdsetRecv); } } } - vnThreadsRunning[THREAD_SOCKETHANDLER]--; int nSelect = select(have_fds ? hSocketMax + 1 : 0, &fdsetRecv, &fdsetSend, &fdsetError, &timeout); - vnThreadsRunning[THREAD_SOCKETHANDLER]++; - if (fShutdown) - return; + boost::this_thread::interruption_point(); + if (nSelect == SOCKET_ERROR) { if (have_fds) @@ -790,7 +892,7 @@ void ThreadSocketHandler2(void* parg) } FD_ZERO(&fdsetSend); FD_ZERO(&fdsetError); - Sleep(timeout.tv_usec/1000); + MilliSleep(timeout.tv_usec/1000); } @@ -827,7 +929,7 @@ void ThreadSocketHandler2(void* parg) if (nErr != WSAEWOULDBLOCK) printf("socket error accept failed: %d\n", nErr); } - else if (nInbound >= GetArg("-maxconnections", 125) - MAX_OUTBOUND_CONNECTIONS) + else if (nInbound >= nMaxConnections - MAX_OUTBOUND_CONNECTIONS) { { LOCK(cs_setservAddNodeAddresses); @@ -865,8 +967,7 @@ void ThreadSocketHandler2(void* parg) } BOOST_FOREACH(CNode* pnode, vNodesCopy) { - if (fShutdown) - return; + boost::this_thread::interruption_point(); // // Receive @@ -875,26 +976,19 @@ void ThreadSocketHandler2(void* parg) continue; if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError)) { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) { - CDataStream& vRecv = pnode->vRecv; - unsigned int nPos = vRecv.size(); - - if (nPos > ReceiveBufferSize()) { - if (!pnode->fDisconnect) - printf("socket recv flood control disconnect (%"PRIszu" bytes)\n", vRecv.size()); - pnode->CloseSocketDisconnect(); - } - else { + { // typical socket buffer is 8K-64K char pchBuf[0x10000]; int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT); if (nBytes > 0) { - vRecv.resize(nPos + nBytes); - memcpy(&vRecv[nPos], pchBuf, nBytes); + if (!pnode->ReceiveMsgBytes(pchBuf, nBytes)) + pnode->CloseSocketDisconnect(); pnode->nLastRecv = GetTime(); + pnode->nRecvBytes += nBytes; } else if (nBytes == 0) { @@ -927,34 +1021,13 @@ void ThreadSocketHandler2(void* parg) { TRY_LOCK(pnode->cs_vSend, lockSend); if (lockSend) - { - CDataStream& vSend = pnode->vSend; - if (!vSend.empty()) - { - int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT); - if (nBytes > 0) - { - vSend.erase(vSend.begin(), vSend.begin() + nBytes); - pnode->nLastSend = GetTime(); - } - else if (nBytes < 0) - { - // error - int nErr = WSAGetLastError(); - if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS) - { - printf("socket send error %d\n", nErr); - pnode->CloseSocketDisconnect(); - } - } - } - } + SocketSendData(pnode); } // // Inactivity checking // - if (pnode->vSend.empty()) + if (pnode->vSendMsg.empty()) pnode->nLastSendEmpty = GetTime(); if (GetTime() - pnode->nTimeConnected > 60) { @@ -981,7 +1054,7 @@ void ThreadSocketHandler2(void* parg) pnode->Release(); } - Sleep(10); + MilliSleep(10); } } @@ -994,31 +1067,8 @@ void ThreadSocketHandler2(void* parg) #ifdef USE_UPNP -void ThreadMapPort(void* parg) -{ - // Make this thread recognisable as the UPnP thread - RenameThread("bitcoin-UPnP"); - - try - { - vnThreadsRunning[THREAD_UPNP]++; - ThreadMapPort2(parg); - vnThreadsRunning[THREAD_UPNP]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_UPNP]--; - PrintException(&e, "ThreadMapPort()"); - } catch (...) { - vnThreadsRunning[THREAD_UPNP]--; - PrintException(NULL, "ThreadMapPort()"); - } - printf("ThreadMapPort exited\n"); -} - -void ThreadMapPort2(void* parg) +void ThreadMapPort() { - printf("ThreadMapPort started\n"); - std::string port = strprintf("%u", GetListenPort()); const char * multicastif = 0; const char * minissdpdpath = 0; @@ -1059,33 +1109,9 @@ void ThreadMapPort2(void* parg) } string strDesc = "Bitcoin " + FormatFullVersion(); -#ifndef UPNPDISCOVER_SUCCESS - /* miniupnpc 1.5 */ - r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, - port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0); -#else - /* miniupnpc 1.6 */ - r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, - port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0, "0"); -#endif - if(r!=UPNPCOMMAND_SUCCESS) - printf("AddPortMapping(%s, %s, %s) failed with code %d (%s)\n", - port.c_str(), port.c_str(), lanaddr, r, strupnperror(r)); - else - printf("UPnP Port Mapping successful.\n"); - int i = 1; - loop { - if (fShutdown || !fUseUPnP) - { - r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0); - printf("UPNP_DeletePortMapping() returned : %d\n", r); - freeUPNPDevlist(devlist); devlist = 0; - FreeUPNPUrls(&urls); - return; - } - if (i % 600 == 0) // Refresh every 20 minutes - { + try { + loop { #ifndef UPNPDISCOVER_SUCCESS /* miniupnpc 1.5 */ r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype, @@ -1101,33 +1127,49 @@ void ThreadMapPort2(void* parg) port.c_str(), port.c_str(), lanaddr, r, strupnperror(r)); else printf("UPnP Port Mapping successful.\n");; + + MilliSleep(20*60*1000); // Refresh every 20 minutes } - Sleep(2000); - i++; + } + catch (boost::thread_interrupted) + { + r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0); + printf("UPNP_DeletePortMapping() returned : %d\n", r); + freeUPNPDevlist(devlist); devlist = 0; + FreeUPNPUrls(&urls); + throw; } } else { printf("No valid UPnP IGDs found\n"); freeUPNPDevlist(devlist); devlist = 0; if (r != 0) FreeUPNPUrls(&urls); - loop { - if (fShutdown || !fUseUPnP) - return; - Sleep(2000); - } } } -void MapPort() +void MapPort(bool fUseUPnP) { - if (fUseUPnP && vnThreadsRunning[THREAD_UPNP] < 1) + static boost::thread* upnp_thread = NULL; + + if (fUseUPnP) { - if (!NewThread(ThreadMapPort, NULL)) - printf("Error: ThreadMapPort(ThreadMapPort) failed\n"); + if (upnp_thread) { + upnp_thread->interrupt(); + upnp_thread->join(); + delete upnp_thread; + } + upnp_thread = new boost::thread(boost::bind(&TraceThread<boost::function<void()> >, "upnp", &ThreadMapPort)); + } + else if (upnp_thread) { + upnp_thread->interrupt(); + upnp_thread->join(); + delete upnp_thread; + upnp_thread = NULL; } } + #else -void MapPort() +void MapPort(bool) { // Intentionally left blank. } @@ -1155,35 +1197,14 @@ static const char *strMainNetDNSSeed[][2] = { static const char *strTestNetDNSSeed[][2] = { {"bitcoin.petertodd.org", "testnet-seed.bitcoin.petertodd.org"}, + {"bluematt.me", "testnet-seed.bluematt.me"}, {NULL, NULL} }; -void ThreadDNSAddressSeed(void* parg) -{ - // Make this thread recognisable as the DNS seeding thread - RenameThread("bitcoin-dnsseed"); - - try - { - vnThreadsRunning[THREAD_DNSSEED]++; - ThreadDNSAddressSeed2(parg); - vnThreadsRunning[THREAD_DNSSEED]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_DNSSEED]--; - PrintException(&e, "ThreadDNSAddressSeed()"); - } catch (...) { - vnThreadsRunning[THREAD_DNSSEED]--; - throw; // support pthread_cancel() - } - printf("ThreadDNSAddressSeed exited\n"); -} - -void ThreadDNSAddressSeed2(void* parg) +void ThreadDNSAddressSeed() { static const char *(*strDNSSeed)[2] = fTestNet ? strTestNetDNSSeed : strMainNetDNSSeed; - printf("ThreadDNSAddressSeed started\n"); int found = 0; printf("Loading addresses from DNS seeds (could take a while)\n"); @@ -1313,57 +1334,6 @@ void DumpAddresses() addrman.size(), GetTimeMillis() - nStart); } -void ThreadDumpAddress2(void* parg) -{ - printf("ThreadDumpAddress started\n"); - - vnThreadsRunning[THREAD_DUMPADDRESS]++; - while (!fShutdown) - { - DumpAddresses(); - vnThreadsRunning[THREAD_DUMPADDRESS]--; - Sleep(100000); - vnThreadsRunning[THREAD_DUMPADDRESS]++; - } - vnThreadsRunning[THREAD_DUMPADDRESS]--; -} - -void ThreadDumpAddress(void* parg) -{ - // Make this thread recognisable as the address dumping thread - RenameThread("bitcoin-adrdump"); - - try - { - ThreadDumpAddress2(parg); - } - catch (std::exception& e) { - PrintException(&e, "ThreadDumpAddress()"); - } - printf("ThreadDumpAddress exited\n"); -} - -void ThreadOpenConnections(void* parg) -{ - // Make this thread recognisable as the connection opening thread - RenameThread("bitcoin-opencon"); - - try - { - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - ThreadOpenConnections2(parg); - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - PrintException(&e, "ThreadOpenConnections()"); - } catch (...) { - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - PrintException(NULL, "ThreadOpenConnections()"); - } - printf("ThreadOpenConnections exited\n"); -} - void static ProcessOneShot() { string strDest; @@ -1382,10 +1352,8 @@ void static ProcessOneShot() } } -void ThreadOpenConnections2(void* parg) +void ThreadOpenConnections() { - printf("ThreadOpenConnections started\n"); - // Connect to specific addresses if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0) { @@ -1398,12 +1366,10 @@ void ThreadOpenConnections2(void* parg) OpenNetworkConnection(addr, NULL, strAddr.c_str()); for (int i = 0; i < 10 && i < nLoop; i++) { - Sleep(500); - if (fShutdown) - return; + MilliSleep(500); } } - Sleep(500); + MilliSleep(500); } } @@ -1413,18 +1379,10 @@ void ThreadOpenConnections2(void* parg) { ProcessOneShot(); - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; - Sleep(500); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; - + MilliSleep(500); - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; CSemaphoreGrant grant(*semOutbound); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return; + boost::this_thread::interruption_point(); // Add seed nodes if IRC isn't working if (addrman.size()==0 && (GetTime() - nStart > 60) && !fTestNet) @@ -1504,38 +1462,15 @@ void ThreadOpenConnections2(void* parg) } } -void ThreadOpenAddedConnections(void* parg) +void ThreadOpenAddedConnections() { - // Make this thread recognisable as the connection opening thread - RenameThread("bitcoin-opencon"); - - try - { - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++; - ThreadOpenAddedConnections2(parg); - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - } - catch (std::exception& e) { - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - PrintException(&e, "ThreadOpenAddedConnections()"); - } catch (...) { - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - PrintException(NULL, "ThreadOpenAddedConnections()"); - } - printf("ThreadOpenAddedConnections exited\n"); -} - -void ThreadOpenAddedConnections2(void* parg) -{ - printf("ThreadOpenAddedConnections started\n"); - { LOCK(cs_vAddedNodes); vAddedNodes = mapMultiArgs["-addnode"]; } if (HaveNameProxy()) { - while(!fShutdown) { + while(true) { list<string> lAddresses(0); { LOCK(cs_vAddedNodes); @@ -1546,15 +1481,10 @@ void ThreadOpenAddedConnections2(void* parg) CAddress addr; CSemaphoreGrant grant(*semOutbound); OpenNetworkConnection(addr, &grant, strAddNode.c_str()); - Sleep(500); - if (fShutdown) - return; + MilliSleep(500); } - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - Sleep(120000); // Retry every 2 minutes - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++; + MilliSleep(120000); // Retry every 2 minutes } - return; } for (unsigned int i = 0; true; i++) @@ -1598,17 +1528,9 @@ void ThreadOpenAddedConnections2(void* parg) { CSemaphoreGrant grant(*semOutbound); OpenNetworkConnection(CAddress(vserv[i % vserv.size()]), &grant); - Sleep(500); - if (fShutdown) - return; + MilliSleep(500); } - if (fShutdown) - return; - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--; - Sleep(120000); // Retry every 2 minutes - vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++; - if (fShutdown) - return; + MilliSleep(120000); // Retry every 2 minutes } } @@ -1618,8 +1540,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu // // Initiate outbound network connection // - if (fShutdown) - return false; + boost::this_thread::interruption_point(); if (!strDest) if (IsLocal(addrConnect) || FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect) || @@ -1628,11 +1549,9 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu if (strDest && FindNode(strDest)) return false; - vnThreadsRunning[THREAD_OPENCONNECTIONS]--; CNode* pnode = ConnectNode(addrConnect, strDest); - vnThreadsRunning[THREAD_OPENCONNECTIONS]++; - if (fShutdown) - return false; + boost::this_thread::interruption_point(); + if (!pnode) return false; if (grantOutbound) @@ -1645,61 +1564,81 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu } +// for now, use a very simple selection metric: the node from which we received +// most recently +double static NodeSyncScore(const CNode *pnode) { + return -pnode->nLastRecv; +} +void static StartSync(const vector<CNode*> &vNodes) { + CNode *pnodeNewSync = NULL; + double dBestScore = 0; + // fImporting and fReindex are accessed out of cs_main here, but only + // as an optimization - they are checked again in SendMessages. + if (fImporting || fReindex) + return; - - - -void ThreadMessageHandler(void* parg) -{ - // Make this thread recognisable as the message handling thread - RenameThread("bitcoin-msghand"); - - try - { - vnThreadsRunning[THREAD_MESSAGEHANDLER]++; - ThreadMessageHandler2(parg); - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; + // Iterate over all nodes + BOOST_FOREACH(CNode* pnode, vNodes) { + // check preconditions for allowing a sync + if (!pnode->fClient && !pnode->fOneShot && + !pnode->fDisconnect && pnode->fSuccessfullyConnected && + (pnode->nStartingHeight > (nBestHeight - 144)) && + (pnode->nVersion < NOBLKS_VERSION_START || pnode->nVersion >= NOBLKS_VERSION_END)) { + // if ok, compare node's score with the best so far + double dScore = NodeSyncScore(pnode); + if (pnodeNewSync == NULL || dScore > dBestScore) { + pnodeNewSync = pnode; + dBestScore = dScore; + } + } } - catch (std::exception& e) { - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; - PrintException(&e, "ThreadMessageHandler()"); - } catch (...) { - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; - PrintException(NULL, "ThreadMessageHandler()"); + // if a new sync candidate was found, start sync! + if (pnodeNewSync) { + pnodeNewSync->fStartSync = true; + pnodeSync = pnodeNewSync; } - printf("ThreadMessageHandler exited\n"); } -void ThreadMessageHandler2(void* parg) +void ThreadMessageHandler() { - printf("ThreadMessageHandler started\n"); SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL); - while (!fShutdown) + while (true) { + bool fHaveSyncNode = false; + vector<CNode*> vNodesCopy; { LOCK(cs_vNodes); vNodesCopy = vNodes; - BOOST_FOREACH(CNode* pnode, vNodesCopy) + BOOST_FOREACH(CNode* pnode, vNodesCopy) { pnode->AddRef(); + if (pnode == pnodeSync) + fHaveSyncNode = true; + } } + if (!fHaveSyncNode) + StartSync(vNodesCopy); + // Poll the connected nodes for messages CNode* pnodeTrickle = NULL; if (!vNodesCopy.empty()) pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())]; BOOST_FOREACH(CNode* pnode, vNodesCopy) { + if (pnode->fDisconnect) + continue; + // Receive messages { - TRY_LOCK(pnode->cs_vRecv, lockRecv); + TRY_LOCK(pnode->cs_vRecvMsg, lockRecv); if (lockRecv) - ProcessMessages(pnode); + if (!ProcessMessages(pnode)) + pnode->CloseSocketDisconnect(); } - if (fShutdown) - return; + boost::this_thread::interruption_point(); // Send messages { @@ -1707,8 +1646,7 @@ void ThreadMessageHandler2(void* parg) if (lockSend) SendMessages(pnode, pnode == pnodeTrickle); } - if (fShutdown) - return; + boost::this_thread::interruption_point(); } { @@ -1717,16 +1655,7 @@ void ThreadMessageHandler2(void* parg) pnode->Release(); } - // Wait and allow messages to bunch up. - // Reduce vnThreadsRunning so StopNode has permission to exit while - // we're sleeping, but we must always check fShutdown after doing this. - vnThreadsRunning[THREAD_MESSAGEHANDLER]--; - Sleep(100); - if (fRequestShutdown) - StartShutdown(); - vnThreadsRunning[THREAD_MESSAGEHANDLER]++; - if (fShutdown) - return; + MilliSleep(100); } } @@ -1740,18 +1669,6 @@ bool BindListenPort(const CService &addrBind, string& strError) strError = ""; int nOne = 1; -#ifdef WIN32 - // Initialize Windows Sockets - WSADATA wsadata; - int ret = WSAStartup(MAKEWORD(2,2), &wsadata); - if (ret != NO_ERROR) - { - strError = strprintf("Error: TCP/IP socket library failed to start (WSAStartup returned error %d)", ret); - printf("%s\n", strError.c_str()); - return false; - } -#endif - // Create socket for listening for incoming connections #ifdef USE_IPV6 struct sockaddr_storage sockaddr; @@ -1898,14 +1815,11 @@ void static Discover() NewThread(ThreadGetMyExternalIP, NULL); } -void StartNode(void* parg) +void StartNode(boost::thread_group& threadGroup) { - // Make this thread recognisable as the startup thread - RenameThread("bitcoin-start"); - if (semOutbound == NULL) { // initialize semaphore - int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125)); + int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, nMaxConnections); semOutbound = new CSemaphore(nMaxOutbound); } @@ -1921,77 +1835,41 @@ void StartNode(void* parg) if (!GetBoolArg("-dnsseed", true)) printf("DNS seeding disabled\n"); else - if (!NewThread(ThreadDNSAddressSeed, NULL)) - printf("Error: NewThread(ThreadDNSAddressSeed) failed\n"); + threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", &ThreadDNSAddressSeed)); +#ifdef USE_UPNP // Map ports with UPnP - if (fUseUPnP) - MapPort(); - - // Get addresses from IRC and advertise ours - if (!NewThread(ThreadIRCSeed, NULL)) - printf("Error: NewThread(ThreadIRCSeed) failed\n"); + MapPort(GetBoolArg("-upnp", USE_UPNP)); +#endif // Send and receive from sockets, accept connections - if (!NewThread(ThreadSocketHandler, NULL)) - printf("Error: NewThread(ThreadSocketHandler) failed\n"); + threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler)); // Initiate outbound connections from -addnode - if (!NewThread(ThreadOpenAddedConnections, NULL)) - printf("Error: NewThread(ThreadOpenAddedConnections) failed\n"); + threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections)); // Initiate outbound connections - if (!NewThread(ThreadOpenConnections, NULL)) - printf("Error: NewThread(ThreadOpenConnections) failed\n"); + threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections)); // Process messages - if (!NewThread(ThreadMessageHandler, NULL)) - printf("Error: NewThread(ThreadMessageHandler) failed\n"); + threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler)); // Dump network addresses - if (!NewThread(ThreadDumpAddress, NULL)) - printf("Error; NewThread(ThreadDumpAddress) failed\n"); - - // Generate coins in the background - GenerateBitcoins(GetBoolArg("-gen", false), pwalletMain); + threadGroup.create_thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, 10000)); } bool StopNode() { printf("StopNode()\n"); - fShutdown = true; + GenerateBitcoins(false, NULL); + MapPort(false); nTransactionsUpdated++; - int64 nStart = GetTime(); if (semOutbound) for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++) semOutbound->post(); - do - { - int nThreadsRunning = 0; - for (int n = 0; n < THREAD_MAX; n++) - nThreadsRunning += vnThreadsRunning[n]; - if (nThreadsRunning == 0) - break; - if (GetTime() - nStart > 20) - break; - Sleep(20); - } while(true); - if (vnThreadsRunning[THREAD_SOCKETHANDLER] > 0) printf("ThreadSocketHandler still running\n"); - if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n"); - if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n"); - if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n"); - if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n"); - if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n"); -#ifdef USE_UPNP - if (vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n"); -#endif - if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n"); - if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n"); - if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n"); - while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0) - Sleep(20); - Sleep(50); + MilliSleep(50); DumpAddresses(); + return true; } @@ -2012,6 +1890,18 @@ public: if (closesocket(hListenSocket) == SOCKET_ERROR) printf("closesocket(hListenSocket) failed with error %d\n", WSAGetLastError()); + // clean up some globals (to help leak detection) + BOOST_FOREACH(CNode *pnode, vNodes) + delete pnode; + BOOST_FOREACH(CNode *pnode, vNodesDisconnected) + delete pnode; + vNodes.clear(); + vNodesDisconnected.clear(); + delete semOutbound; + semOutbound = NULL; + delete pnodeLocalHost; + pnodeLocalHost = NULL; + #ifdef WIN32 // Shutdown Windows Sockets WSACleanup(); |