aboutsummaryrefslogtreecommitdiff
path: root/src/net.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/net.cpp')
-rw-r--r--src/net.cpp722
1 files changed, 306 insertions, 416 deletions
diff --git a/src/net.cpp b/src/net.cpp
index 3406a28b0..54ed1d9b5 100644
--- a/src/net.cpp
+++ b/src/net.cpp
@@ -3,7 +3,6 @@
// Distributed under the MIT/X11 software license, see the accompanying
// file COPYING or http://www.opensource.org/licenses/mit-license.php.
-#include "irc.h"
#include "db.h"
#include "net.h"
#include "init.h"
@@ -27,14 +26,6 @@ using namespace boost;
static const int MAX_OUTBOUND_CONNECTIONS = 8;
-void ThreadMessageHandler2(void* parg);
-void ThreadSocketHandler2(void* parg);
-void ThreadOpenConnections2(void* parg);
-void ThreadOpenAddedConnections2(void* parg);
-#ifdef USE_UPNP
-void ThreadMapPort2(void* parg);
-#endif
-void ThreadDNSAddressSeed2(void* parg);
bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOutbound = NULL, const char *strDest = NULL, bool fOneShot = false);
@@ -47,24 +38,24 @@ struct LocalServiceInfo {
// Global state variables
//
bool fDiscover = true;
-bool fUseUPnP = false;
uint64 nLocalServices = NODE_NETWORK;
static CCriticalSection cs_mapLocalHost;
static map<CNetAddr, LocalServiceInfo> mapLocalHost;
static bool vfReachable[NET_MAX] = {};
static bool vfLimited[NET_MAX] = {};
static CNode* pnodeLocalHost = NULL;
+static CNode* pnodeSync = NULL;
uint64 nLocalHostNonce = 0;
-array<int, THREAD_MAX> vnThreadsRunning;
static std::vector<SOCKET> vhListenSocket;
CAddrMan addrman;
+int nMaxConnections = 125;
vector<CNode*> vNodes;
CCriticalSection cs_vNodes;
map<CInv, CDataStream> mapRelay;
deque<pair<int64, CInv> > vRelayExpiration;
CCriticalSection cs_mapRelay;
-map<CInv, int64> mapAlreadyAskedFor;
+limitedmap<CInv, int64> mapAlreadyAskedFor(MAX_INV_SZ);
static deque<string> vOneShots;
CCriticalSection cs_vOneShots;
@@ -157,8 +148,7 @@ bool RecvLine(SOCKET hSocket, string& strLine)
}
else if (nBytes <= 0)
{
- if (fShutdown)
- return false;
+ boost::this_thread::interruption_point();
if (nBytes < 0)
{
int nErr = WSAGetLastError();
@@ -166,7 +156,7 @@ bool RecvLine(SOCKET hSocket, string& strLine)
continue;
if (nErr == WSAEWOULDBLOCK || nErr == WSAEINTR || nErr == WSAEINPROGRESS)
{
- Sleep(10);
+ MilliSleep(10);
continue;
}
}
@@ -347,7 +337,6 @@ bool GetMyExternalIP2(const CService& addrConnect, const char* pszGet, const cha
return error("GetMyExternalIP() : connection closed");
}
-// We now get our external IP from the IRC server first and only use this as a backup
bool GetMyExternalIP(CNetAddr& ipRet)
{
CService addrConnect;
@@ -357,13 +346,13 @@ bool GetMyExternalIP(CNetAddr& ipRet)
for (int nLookup = 0; nLookup <= 1; nLookup++)
for (int nHost = 1; nHost <= 2; nHost++)
{
- // We should be phasing out our use of sites like these. If we need
+ // We should be phasing out our use of sites like these. If we need
// replacements, we should ask for volunteers to put this simple
// php file on their web server that prints the client IP:
// <?php echo $_SERVER["REMOTE_ADDR"]; ?>
if (nHost == 1)
{
- addrConnect = CService("91.198.22.70",80); // checkip.dyndns.org
+ addrConnect = CService("91.198.22.70", 80); // checkip.dyndns.org
if (nLookup == 1)
{
@@ -437,12 +426,10 @@ void AddressCurrentlyConnected(const CService& addr)
CNode* FindNode(const CNetAddr& ip)
{
- {
- LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- if ((CNetAddr)pnode->addr == ip)
- return (pnode);
- }
+ LOCK(cs_vNodes);
+ BOOST_FOREACH(CNode* pnode, vNodes)
+ if ((CNetAddr)pnode->addr == ip)
+ return (pnode);
return NULL;
}
@@ -457,16 +444,14 @@ CNode* FindNode(std::string addrName)
CNode* FindNode(const CService& addr)
{
- {
- LOCK(cs_vNodes);
- BOOST_FOREACH(CNode* pnode, vNodes)
- if ((CService)pnode->addr == addr)
- return (pnode);
- }
+ LOCK(cs_vNodes);
+ BOOST_FOREACH(CNode* pnode, vNodes)
+ if ((CService)pnode->addr == addr)
+ return (pnode);
return NULL;
}
-CNode* ConnectNode(CAddress addrConnect, const char *pszDest, int64 nTimeout)
+CNode* ConnectNode(CAddress addrConnect, const char *pszDest)
{
if (pszDest == NULL) {
if (IsLocal(addrConnect))
@@ -476,10 +461,7 @@ CNode* ConnectNode(CAddress addrConnect, const char *pszDest, int64 nTimeout)
CNode* pnode = FindNode((CService)addrConnect);
if (pnode)
{
- if (nTimeout != 0)
- pnode->AddRef(nTimeout);
- else
- pnode->AddRef();
+ pnode->AddRef();
return pnode;
}
}
@@ -511,10 +493,7 @@ CNode* ConnectNode(CAddress addrConnect, const char *pszDest, int64 nTimeout)
// Add node
CNode* pnode = new CNode(hSocket, addrConnect, pszDest ? pszDest : "", false);
- if (nTimeout != 0)
- pnode->AddRef(nTimeout);
- else
- pnode->AddRef();
+ pnode->AddRef();
{
LOCK(cs_vNodes);
@@ -538,8 +517,16 @@ void CNode::CloseSocketDisconnect()
printf("disconnecting node %s\n", addrName.c_str());
closesocket(hSocket);
hSocket = INVALID_SOCKET;
- vRecv.clear();
}
+
+ // in case this fails, we'll empty the recv buffer when the CNode is deleted
+ TRY_LOCK(cs_vRecvMsg, lockRecv);
+ if (lockRecv)
+ vRecvMsg.clear();
+
+ // if this was the sync node, we'll need a new one
+ if (this == pnodeSync)
+ pnodeSync = NULL;
}
void CNode::Cleanup()
@@ -624,48 +611,142 @@ void CNode::copyStats(CNodeStats &stats)
X(nVersion);
X(strSubVer);
X(fInbound);
- X(nReleaseTime);
X(nStartingHeight);
X(nMisbehavior);
+ X(nSendBytes);
+ X(nRecvBytes);
+ stats.fSyncNode = (this == pnodeSync);
}
#undef X
+// requires LOCK(cs_vRecvMsg)
+bool CNode::ReceiveMsgBytes(const char *pch, unsigned int nBytes)
+{
+ while (nBytes > 0) {
+
+ // get current incomplete message, or create a new one
+ if (vRecvMsg.empty() ||
+ vRecvMsg.back().complete())
+ vRecvMsg.push_back(CNetMessage(SER_NETWORK, nRecvVersion));
+
+ CNetMessage& msg = vRecvMsg.back();
+ // absorb network data
+ int handled;
+ if (!msg.in_data)
+ handled = msg.readHeader(pch, nBytes);
+ else
+ handled = msg.readData(pch, nBytes);
+ if (handled < 0)
+ return false;
+ pch += handled;
+ nBytes -= handled;
+ }
+ return true;
+}
+int CNetMessage::readHeader(const char *pch, unsigned int nBytes)
+{
+ // copy data to temporary parsing buffer
+ unsigned int nRemaining = 24 - nHdrPos;
+ unsigned int nCopy = std::min(nRemaining, nBytes);
+ memcpy(&hdrbuf[nHdrPos], pch, nCopy);
+ nHdrPos += nCopy;
+ // if header incomplete, exit
+ if (nHdrPos < 24)
+ return nCopy;
+ // deserialize to CMessageHeader
+ try {
+ hdrbuf >> hdr;
+ }
+ catch (std::exception &e) {
+ return -1;
+ }
+
+ // reject messages larger than MAX_SIZE
+ if (hdr.nMessageSize > MAX_SIZE)
+ return -1;
+
+ // switch state to reading message data
+ in_data = true;
+ vRecv.resize(hdr.nMessageSize);
-void ThreadSocketHandler(void* parg)
+ return nCopy;
+}
+
+int CNetMessage::readData(const char *pch, unsigned int nBytes)
{
- // Make this thread recognisable as the networking thread
- RenameThread("bitcoin-net");
+ unsigned int nRemaining = hdr.nMessageSize - nDataPos;
+ unsigned int nCopy = std::min(nRemaining, nBytes);
- try
- {
- vnThreadsRunning[THREAD_SOCKETHANDLER]++;
- ThreadSocketHandler2(parg);
- vnThreadsRunning[THREAD_SOCKETHANDLER]--;
+ memcpy(&vRecv[nDataPos], pch, nCopy);
+ nDataPos += nCopy;
+
+ return nCopy;
+}
+
+
+
+
+
+
+
+
+
+// requires LOCK(cs_vSend)
+void SocketSendData(CNode *pnode)
+{
+ std::deque<CSerializeData>::iterator it = pnode->vSendMsg.begin();
+
+ while (it != pnode->vSendMsg.end()) {
+ const CSerializeData &data = *it;
+ assert(data.size() > pnode->nSendOffset);
+ int nBytes = send(pnode->hSocket, &data[pnode->nSendOffset], data.size() - pnode->nSendOffset, MSG_NOSIGNAL | MSG_DONTWAIT);
+ if (nBytes > 0) {
+ pnode->nLastSend = GetTime();
+ pnode->nSendBytes += nBytes;
+ pnode->nSendOffset += nBytes;
+ if (pnode->nSendOffset == data.size()) {
+ pnode->nSendOffset = 0;
+ pnode->nSendSize -= data.size();
+ it++;
+ } else {
+ // could not send full message; stop sending more
+ break;
+ }
+ } else {
+ if (nBytes < 0) {
+ // error
+ int nErr = WSAGetLastError();
+ if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
+ {
+ printf("socket send error %d\n", nErr);
+ pnode->CloseSocketDisconnect();
+ }
+ }
+ // couldn't send anything at all
+ break;
+ }
}
- catch (std::exception& e) {
- vnThreadsRunning[THREAD_SOCKETHANDLER]--;
- PrintException(&e, "ThreadSocketHandler()");
- } catch (...) {
- vnThreadsRunning[THREAD_SOCKETHANDLER]--;
- throw; // support pthread_cancel()
+
+ if (it == pnode->vSendMsg.end()) {
+ assert(pnode->nSendOffset == 0);
+ assert(pnode->nSendSize == 0);
}
- printf("ThreadSocketHandler exited\n");
+ pnode->vSendMsg.erase(pnode->vSendMsg.begin(), it);
}
-void ThreadSocketHandler2(void* parg)
+static list<CNode*> vNodesDisconnected;
+
+void ThreadSocketHandler()
{
- printf("ThreadSocketHandler started\n");
- list<CNode*> vNodesDisconnected;
unsigned int nPrevNodeCount = 0;
-
loop
{
//
@@ -678,7 +759,7 @@ void ThreadSocketHandler2(void* parg)
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
if (pnode->fDisconnect ||
- (pnode->GetRefCount() <= 0 && pnode->vRecv.empty() && pnode->vSend.empty()))
+ (pnode->GetRefCount() <= 0 && pnode->vRecvMsg.empty() && pnode->nSendSize == 0 && pnode->ssSend.empty()))
{
// remove from vNodes
vNodes.erase(remove(vNodes.begin(), vNodes.end(), pnode), vNodes.end());
@@ -691,7 +772,6 @@ void ThreadSocketHandler2(void* parg)
pnode->Cleanup();
// hold in disconnected pool until all refs are released
- pnode->nReleaseTime = max(pnode->nReleaseTime, GetTime() + 15 * 60);
if (pnode->fNetworkNode || pnode->fInbound)
pnode->Release();
vNodesDisconnected.push_back(pnode);
@@ -710,7 +790,7 @@ void ThreadSocketHandler2(void* parg)
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
{
- TRY_LOCK(pnode->cs_vRecv, lockRecv);
+ TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
TRY_LOCK(pnode->cs_inventory, lockInv);
@@ -761,24 +841,46 @@ void ThreadSocketHandler2(void* parg)
{
if (pnode->hSocket == INVALID_SOCKET)
continue;
- FD_SET(pnode->hSocket, &fdsetRecv);
FD_SET(pnode->hSocket, &fdsetError);
hSocketMax = max(hSocketMax, pnode->hSocket);
have_fds = true;
+
+ // Implement the following logic:
+ // * If there is data to send, select() for sending data. As this only
+ // happens when optimistic write failed, we choose to first drain the
+ // write buffer in this case before receiving more. This avoids
+ // needlessly queueing received data, if the remote peer is not themselves
+ // receiving data. This means properly utilizing TCP flow control signalling.
+ // * Otherwise, if there is no (complete) message in the receive buffer,
+ // or there is space left in the buffer, select() for receiving data.
+ // * (if neither of the above applies, there is certainly one message
+ // in the receiver buffer ready to be processed).
+ // Together, that means that at least one of the following is always possible,
+ // so we don't deadlock:
+ // * We send some data.
+ // * We wait for data to be received (and disconnect after timeout).
+ // * We process a message in the buffer (message handler thread).
{
TRY_LOCK(pnode->cs_vSend, lockSend);
- if (lockSend && !pnode->vSend.empty())
+ if (lockSend && !pnode->vSendMsg.empty()) {
FD_SET(pnode->hSocket, &fdsetSend);
+ continue;
+ }
+ }
+ {
+ TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
+ if (lockRecv && (
+ pnode->vRecvMsg.empty() || !pnode->vRecvMsg.front().complete() ||
+ pnode->GetTotalRecvSize() <= ReceiveFloodSize()))
+ FD_SET(pnode->hSocket, &fdsetRecv);
}
}
}
- vnThreadsRunning[THREAD_SOCKETHANDLER]--;
int nSelect = select(have_fds ? hSocketMax + 1 : 0,
&fdsetRecv, &fdsetSend, &fdsetError, &timeout);
- vnThreadsRunning[THREAD_SOCKETHANDLER]++;
- if (fShutdown)
- return;
+ boost::this_thread::interruption_point();
+
if (nSelect == SOCKET_ERROR)
{
if (have_fds)
@@ -790,7 +892,7 @@ void ThreadSocketHandler2(void* parg)
}
FD_ZERO(&fdsetSend);
FD_ZERO(&fdsetError);
- Sleep(timeout.tv_usec/1000);
+ MilliSleep(timeout.tv_usec/1000);
}
@@ -827,7 +929,7 @@ void ThreadSocketHandler2(void* parg)
if (nErr != WSAEWOULDBLOCK)
printf("socket error accept failed: %d\n", nErr);
}
- else if (nInbound >= GetArg("-maxconnections", 125) - MAX_OUTBOUND_CONNECTIONS)
+ else if (nInbound >= nMaxConnections - MAX_OUTBOUND_CONNECTIONS)
{
{
LOCK(cs_setservAddNodeAddresses);
@@ -865,8 +967,7 @@ void ThreadSocketHandler2(void* parg)
}
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
- if (fShutdown)
- return;
+ boost::this_thread::interruption_point();
//
// Receive
@@ -875,26 +976,19 @@ void ThreadSocketHandler2(void* parg)
continue;
if (FD_ISSET(pnode->hSocket, &fdsetRecv) || FD_ISSET(pnode->hSocket, &fdsetError))
{
- TRY_LOCK(pnode->cs_vRecv, lockRecv);
+ TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
{
- CDataStream& vRecv = pnode->vRecv;
- unsigned int nPos = vRecv.size();
-
- if (nPos > ReceiveBufferSize()) {
- if (!pnode->fDisconnect)
- printf("socket recv flood control disconnect (%"PRIszu" bytes)\n", vRecv.size());
- pnode->CloseSocketDisconnect();
- }
- else {
+ {
// typical socket buffer is 8K-64K
char pchBuf[0x10000];
int nBytes = recv(pnode->hSocket, pchBuf, sizeof(pchBuf), MSG_DONTWAIT);
if (nBytes > 0)
{
- vRecv.resize(nPos + nBytes);
- memcpy(&vRecv[nPos], pchBuf, nBytes);
+ if (!pnode->ReceiveMsgBytes(pchBuf, nBytes))
+ pnode->CloseSocketDisconnect();
pnode->nLastRecv = GetTime();
+ pnode->nRecvBytes += nBytes;
}
else if (nBytes == 0)
{
@@ -927,34 +1021,13 @@ void ThreadSocketHandler2(void* parg)
{
TRY_LOCK(pnode->cs_vSend, lockSend);
if (lockSend)
- {
- CDataStream& vSend = pnode->vSend;
- if (!vSend.empty())
- {
- int nBytes = send(pnode->hSocket, &vSend[0], vSend.size(), MSG_NOSIGNAL | MSG_DONTWAIT);
- if (nBytes > 0)
- {
- vSend.erase(vSend.begin(), vSend.begin() + nBytes);
- pnode->nLastSend = GetTime();
- }
- else if (nBytes < 0)
- {
- // error
- int nErr = WSAGetLastError();
- if (nErr != WSAEWOULDBLOCK && nErr != WSAEMSGSIZE && nErr != WSAEINTR && nErr != WSAEINPROGRESS)
- {
- printf("socket send error %d\n", nErr);
- pnode->CloseSocketDisconnect();
- }
- }
- }
- }
+ SocketSendData(pnode);
}
//
// Inactivity checking
//
- if (pnode->vSend.empty())
+ if (pnode->vSendMsg.empty())
pnode->nLastSendEmpty = GetTime();
if (GetTime() - pnode->nTimeConnected > 60)
{
@@ -981,7 +1054,7 @@ void ThreadSocketHandler2(void* parg)
pnode->Release();
}
- Sleep(10);
+ MilliSleep(10);
}
}
@@ -994,31 +1067,8 @@ void ThreadSocketHandler2(void* parg)
#ifdef USE_UPNP
-void ThreadMapPort(void* parg)
-{
- // Make this thread recognisable as the UPnP thread
- RenameThread("bitcoin-UPnP");
-
- try
- {
- vnThreadsRunning[THREAD_UPNP]++;
- ThreadMapPort2(parg);
- vnThreadsRunning[THREAD_UPNP]--;
- }
- catch (std::exception& e) {
- vnThreadsRunning[THREAD_UPNP]--;
- PrintException(&e, "ThreadMapPort()");
- } catch (...) {
- vnThreadsRunning[THREAD_UPNP]--;
- PrintException(NULL, "ThreadMapPort()");
- }
- printf("ThreadMapPort exited\n");
-}
-
-void ThreadMapPort2(void* parg)
+void ThreadMapPort()
{
- printf("ThreadMapPort started\n");
-
std::string port = strprintf("%u", GetListenPort());
const char * multicastif = 0;
const char * minissdpdpath = 0;
@@ -1059,33 +1109,9 @@ void ThreadMapPort2(void* parg)
}
string strDesc = "Bitcoin " + FormatFullVersion();
-#ifndef UPNPDISCOVER_SUCCESS
- /* miniupnpc 1.5 */
- r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
- port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0);
-#else
- /* miniupnpc 1.6 */
- r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
- port.c_str(), port.c_str(), lanaddr, strDesc.c_str(), "TCP", 0, "0");
-#endif
- if(r!=UPNPCOMMAND_SUCCESS)
- printf("AddPortMapping(%s, %s, %s) failed with code %d (%s)\n",
- port.c_str(), port.c_str(), lanaddr, r, strupnperror(r));
- else
- printf("UPnP Port Mapping successful.\n");
- int i = 1;
- loop {
- if (fShutdown || !fUseUPnP)
- {
- r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0);
- printf("UPNP_DeletePortMapping() returned : %d\n", r);
- freeUPNPDevlist(devlist); devlist = 0;
- FreeUPNPUrls(&urls);
- return;
- }
- if (i % 600 == 0) // Refresh every 20 minutes
- {
+ try {
+ loop {
#ifndef UPNPDISCOVER_SUCCESS
/* miniupnpc 1.5 */
r = UPNP_AddPortMapping(urls.controlURL, data.first.servicetype,
@@ -1101,33 +1127,49 @@ void ThreadMapPort2(void* parg)
port.c_str(), port.c_str(), lanaddr, r, strupnperror(r));
else
printf("UPnP Port Mapping successful.\n");;
+
+ MilliSleep(20*60*1000); // Refresh every 20 minutes
}
- Sleep(2000);
- i++;
+ }
+ catch (boost::thread_interrupted)
+ {
+ r = UPNP_DeletePortMapping(urls.controlURL, data.first.servicetype, port.c_str(), "TCP", 0);
+ printf("UPNP_DeletePortMapping() returned : %d\n", r);
+ freeUPNPDevlist(devlist); devlist = 0;
+ FreeUPNPUrls(&urls);
+ throw;
}
} else {
printf("No valid UPnP IGDs found\n");
freeUPNPDevlist(devlist); devlist = 0;
if (r != 0)
FreeUPNPUrls(&urls);
- loop {
- if (fShutdown || !fUseUPnP)
- return;
- Sleep(2000);
- }
}
}
-void MapPort()
+void MapPort(bool fUseUPnP)
{
- if (fUseUPnP && vnThreadsRunning[THREAD_UPNP] < 1)
+ static boost::thread* upnp_thread = NULL;
+
+ if (fUseUPnP)
{
- if (!NewThread(ThreadMapPort, NULL))
- printf("Error: ThreadMapPort(ThreadMapPort) failed\n");
+ if (upnp_thread) {
+ upnp_thread->interrupt();
+ upnp_thread->join();
+ delete upnp_thread;
+ }
+ upnp_thread = new boost::thread(boost::bind(&TraceThread<boost::function<void()> >, "upnp", &ThreadMapPort));
+ }
+ else if (upnp_thread) {
+ upnp_thread->interrupt();
+ upnp_thread->join();
+ delete upnp_thread;
+ upnp_thread = NULL;
}
}
+
#else
-void MapPort()
+void MapPort(bool)
{
// Intentionally left blank.
}
@@ -1155,35 +1197,14 @@ static const char *strMainNetDNSSeed[][2] = {
static const char *strTestNetDNSSeed[][2] = {
{"bitcoin.petertodd.org", "testnet-seed.bitcoin.petertodd.org"},
+ {"bluematt.me", "testnet-seed.bluematt.me"},
{NULL, NULL}
};
-void ThreadDNSAddressSeed(void* parg)
-{
- // Make this thread recognisable as the DNS seeding thread
- RenameThread("bitcoin-dnsseed");
-
- try
- {
- vnThreadsRunning[THREAD_DNSSEED]++;
- ThreadDNSAddressSeed2(parg);
- vnThreadsRunning[THREAD_DNSSEED]--;
- }
- catch (std::exception& e) {
- vnThreadsRunning[THREAD_DNSSEED]--;
- PrintException(&e, "ThreadDNSAddressSeed()");
- } catch (...) {
- vnThreadsRunning[THREAD_DNSSEED]--;
- throw; // support pthread_cancel()
- }
- printf("ThreadDNSAddressSeed exited\n");
-}
-
-void ThreadDNSAddressSeed2(void* parg)
+void ThreadDNSAddressSeed()
{
static const char *(*strDNSSeed)[2] = fTestNet ? strTestNetDNSSeed : strMainNetDNSSeed;
- printf("ThreadDNSAddressSeed started\n");
int found = 0;
printf("Loading addresses from DNS seeds (could take a while)\n");
@@ -1313,57 +1334,6 @@ void DumpAddresses()
addrman.size(), GetTimeMillis() - nStart);
}
-void ThreadDumpAddress2(void* parg)
-{
- printf("ThreadDumpAddress started\n");
-
- vnThreadsRunning[THREAD_DUMPADDRESS]++;
- while (!fShutdown)
- {
- DumpAddresses();
- vnThreadsRunning[THREAD_DUMPADDRESS]--;
- Sleep(100000);
- vnThreadsRunning[THREAD_DUMPADDRESS]++;
- }
- vnThreadsRunning[THREAD_DUMPADDRESS]--;
-}
-
-void ThreadDumpAddress(void* parg)
-{
- // Make this thread recognisable as the address dumping thread
- RenameThread("bitcoin-adrdump");
-
- try
- {
- ThreadDumpAddress2(parg);
- }
- catch (std::exception& e) {
- PrintException(&e, "ThreadDumpAddress()");
- }
- printf("ThreadDumpAddress exited\n");
-}
-
-void ThreadOpenConnections(void* parg)
-{
- // Make this thread recognisable as the connection opening thread
- RenameThread("bitcoin-opencon");
-
- try
- {
- vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- ThreadOpenConnections2(parg);
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- }
- catch (std::exception& e) {
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- PrintException(&e, "ThreadOpenConnections()");
- } catch (...) {
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- PrintException(NULL, "ThreadOpenConnections()");
- }
- printf("ThreadOpenConnections exited\n");
-}
-
void static ProcessOneShot()
{
string strDest;
@@ -1382,10 +1352,8 @@ void static ProcessOneShot()
}
}
-void ThreadOpenConnections2(void* parg)
+void ThreadOpenConnections()
{
- printf("ThreadOpenConnections started\n");
-
// Connect to specific addresses
if (mapArgs.count("-connect") && mapMultiArgs["-connect"].size() > 0)
{
@@ -1398,12 +1366,10 @@ void ThreadOpenConnections2(void* parg)
OpenNetworkConnection(addr, NULL, strAddr.c_str());
for (int i = 0; i < 10 && i < nLoop; i++)
{
- Sleep(500);
- if (fShutdown)
- return;
+ MilliSleep(500);
}
}
- Sleep(500);
+ MilliSleep(500);
}
}
@@ -1413,18 +1379,10 @@ void ThreadOpenConnections2(void* parg)
{
ProcessOneShot();
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
- Sleep(500);
- vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- if (fShutdown)
- return;
-
+ MilliSleep(500);
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
CSemaphoreGrant grant(*semOutbound);
- vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- if (fShutdown)
- return;
+ boost::this_thread::interruption_point();
// Add seed nodes if IRC isn't working
if (addrman.size()==0 && (GetTime() - nStart > 60) && !fTestNet)
@@ -1504,38 +1462,15 @@ void ThreadOpenConnections2(void* parg)
}
}
-void ThreadOpenAddedConnections(void* parg)
+void ThreadOpenAddedConnections()
{
- // Make this thread recognisable as the connection opening thread
- RenameThread("bitcoin-opencon");
-
- try
- {
- vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++;
- ThreadOpenAddedConnections2(parg);
- vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
- }
- catch (std::exception& e) {
- vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
- PrintException(&e, "ThreadOpenAddedConnections()");
- } catch (...) {
- vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
- PrintException(NULL, "ThreadOpenAddedConnections()");
- }
- printf("ThreadOpenAddedConnections exited\n");
-}
-
-void ThreadOpenAddedConnections2(void* parg)
-{
- printf("ThreadOpenAddedConnections started\n");
-
{
LOCK(cs_vAddedNodes);
vAddedNodes = mapMultiArgs["-addnode"];
}
if (HaveNameProxy()) {
- while(!fShutdown) {
+ while(true) {
list<string> lAddresses(0);
{
LOCK(cs_vAddedNodes);
@@ -1546,15 +1481,10 @@ void ThreadOpenAddedConnections2(void* parg)
CAddress addr;
CSemaphoreGrant grant(*semOutbound);
OpenNetworkConnection(addr, &grant, strAddNode.c_str());
- Sleep(500);
- if (fShutdown)
- return;
+ MilliSleep(500);
}
- vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
- Sleep(120000); // Retry every 2 minutes
- vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++;
+ MilliSleep(120000); // Retry every 2 minutes
}
- return;
}
for (unsigned int i = 0; true; i++)
@@ -1598,17 +1528,9 @@ void ThreadOpenAddedConnections2(void* parg)
{
CSemaphoreGrant grant(*semOutbound);
OpenNetworkConnection(CAddress(vserv[i % vserv.size()]), &grant);
- Sleep(500);
- if (fShutdown)
- return;
+ MilliSleep(500);
}
- if (fShutdown)
- return;
- vnThreadsRunning[THREAD_ADDEDCONNECTIONS]--;
- Sleep(120000); // Retry every 2 minutes
- vnThreadsRunning[THREAD_ADDEDCONNECTIONS]++;
- if (fShutdown)
- return;
+ MilliSleep(120000); // Retry every 2 minutes
}
}
@@ -1618,8 +1540,7 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu
//
// Initiate outbound network connection
//
- if (fShutdown)
- return false;
+ boost::this_thread::interruption_point();
if (!strDest)
if (IsLocal(addrConnect) ||
FindNode((CNetAddr)addrConnect) || CNode::IsBanned(addrConnect) ||
@@ -1628,11 +1549,9 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu
if (strDest && FindNode(strDest))
return false;
- vnThreadsRunning[THREAD_OPENCONNECTIONS]--;
CNode* pnode = ConnectNode(addrConnect, strDest);
- vnThreadsRunning[THREAD_OPENCONNECTIONS]++;
- if (fShutdown)
- return false;
+ boost::this_thread::interruption_point();
+
if (!pnode)
return false;
if (grantOutbound)
@@ -1645,61 +1564,81 @@ bool OpenNetworkConnection(const CAddress& addrConnect, CSemaphoreGrant *grantOu
}
+// for now, use a very simple selection metric: the node from which we received
+// most recently
+double static NodeSyncScore(const CNode *pnode) {
+ return -pnode->nLastRecv;
+}
+void static StartSync(const vector<CNode*> &vNodes) {
+ CNode *pnodeNewSync = NULL;
+ double dBestScore = 0;
+ // fImporting and fReindex are accessed out of cs_main here, but only
+ // as an optimization - they are checked again in SendMessages.
+ if (fImporting || fReindex)
+ return;
-
-
-
-void ThreadMessageHandler(void* parg)
-{
- // Make this thread recognisable as the message handling thread
- RenameThread("bitcoin-msghand");
-
- try
- {
- vnThreadsRunning[THREAD_MESSAGEHANDLER]++;
- ThreadMessageHandler2(parg);
- vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
+ // Iterate over all nodes
+ BOOST_FOREACH(CNode* pnode, vNodes) {
+ // check preconditions for allowing a sync
+ if (!pnode->fClient && !pnode->fOneShot &&
+ !pnode->fDisconnect && pnode->fSuccessfullyConnected &&
+ (pnode->nStartingHeight > (nBestHeight - 144)) &&
+ (pnode->nVersion < NOBLKS_VERSION_START || pnode->nVersion >= NOBLKS_VERSION_END)) {
+ // if ok, compare node's score with the best so far
+ double dScore = NodeSyncScore(pnode);
+ if (pnodeNewSync == NULL || dScore > dBestScore) {
+ pnodeNewSync = pnode;
+ dBestScore = dScore;
+ }
+ }
}
- catch (std::exception& e) {
- vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
- PrintException(&e, "ThreadMessageHandler()");
- } catch (...) {
- vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
- PrintException(NULL, "ThreadMessageHandler()");
+ // if a new sync candidate was found, start sync!
+ if (pnodeNewSync) {
+ pnodeNewSync->fStartSync = true;
+ pnodeSync = pnodeNewSync;
}
- printf("ThreadMessageHandler exited\n");
}
-void ThreadMessageHandler2(void* parg)
+void ThreadMessageHandler()
{
- printf("ThreadMessageHandler started\n");
SetThreadPriority(THREAD_PRIORITY_BELOW_NORMAL);
- while (!fShutdown)
+ while (true)
{
+ bool fHaveSyncNode = false;
+
vector<CNode*> vNodesCopy;
{
LOCK(cs_vNodes);
vNodesCopy = vNodes;
- BOOST_FOREACH(CNode* pnode, vNodesCopy)
+ BOOST_FOREACH(CNode* pnode, vNodesCopy) {
pnode->AddRef();
+ if (pnode == pnodeSync)
+ fHaveSyncNode = true;
+ }
}
+ if (!fHaveSyncNode)
+ StartSync(vNodesCopy);
+
// Poll the connected nodes for messages
CNode* pnodeTrickle = NULL;
if (!vNodesCopy.empty())
pnodeTrickle = vNodesCopy[GetRand(vNodesCopy.size())];
BOOST_FOREACH(CNode* pnode, vNodesCopy)
{
+ if (pnode->fDisconnect)
+ continue;
+
// Receive messages
{
- TRY_LOCK(pnode->cs_vRecv, lockRecv);
+ TRY_LOCK(pnode->cs_vRecvMsg, lockRecv);
if (lockRecv)
- ProcessMessages(pnode);
+ if (!ProcessMessages(pnode))
+ pnode->CloseSocketDisconnect();
}
- if (fShutdown)
- return;
+ boost::this_thread::interruption_point();
// Send messages
{
@@ -1707,8 +1646,7 @@ void ThreadMessageHandler2(void* parg)
if (lockSend)
SendMessages(pnode, pnode == pnodeTrickle);
}
- if (fShutdown)
- return;
+ boost::this_thread::interruption_point();
}
{
@@ -1717,16 +1655,7 @@ void ThreadMessageHandler2(void* parg)
pnode->Release();
}
- // Wait and allow messages to bunch up.
- // Reduce vnThreadsRunning so StopNode has permission to exit while
- // we're sleeping, but we must always check fShutdown after doing this.
- vnThreadsRunning[THREAD_MESSAGEHANDLER]--;
- Sleep(100);
- if (fRequestShutdown)
- StartShutdown();
- vnThreadsRunning[THREAD_MESSAGEHANDLER]++;
- if (fShutdown)
- return;
+ MilliSleep(100);
}
}
@@ -1740,18 +1669,6 @@ bool BindListenPort(const CService &addrBind, string& strError)
strError = "";
int nOne = 1;
-#ifdef WIN32
- // Initialize Windows Sockets
- WSADATA wsadata;
- int ret = WSAStartup(MAKEWORD(2,2), &wsadata);
- if (ret != NO_ERROR)
- {
- strError = strprintf("Error: TCP/IP socket library failed to start (WSAStartup returned error %d)", ret);
- printf("%s\n", strError.c_str());
- return false;
- }
-#endif
-
// Create socket for listening for incoming connections
#ifdef USE_IPV6
struct sockaddr_storage sockaddr;
@@ -1898,14 +1815,11 @@ void static Discover()
NewThread(ThreadGetMyExternalIP, NULL);
}
-void StartNode(void* parg)
+void StartNode(boost::thread_group& threadGroup)
{
- // Make this thread recognisable as the startup thread
- RenameThread("bitcoin-start");
-
if (semOutbound == NULL) {
// initialize semaphore
- int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, (int)GetArg("-maxconnections", 125));
+ int nMaxOutbound = min(MAX_OUTBOUND_CONNECTIONS, nMaxConnections);
semOutbound = new CSemaphore(nMaxOutbound);
}
@@ -1921,77 +1835,41 @@ void StartNode(void* parg)
if (!GetBoolArg("-dnsseed", true))
printf("DNS seeding disabled\n");
else
- if (!NewThread(ThreadDNSAddressSeed, NULL))
- printf("Error: NewThread(ThreadDNSAddressSeed) failed\n");
+ threadGroup.create_thread(boost::bind(&TraceThread<boost::function<void()> >, "dnsseed", &ThreadDNSAddressSeed));
+#ifdef USE_UPNP
// Map ports with UPnP
- if (fUseUPnP)
- MapPort();
-
- // Get addresses from IRC and advertise ours
- if (!NewThread(ThreadIRCSeed, NULL))
- printf("Error: NewThread(ThreadIRCSeed) failed\n");
+ MapPort(GetBoolArg("-upnp", USE_UPNP));
+#endif
// Send and receive from sockets, accept connections
- if (!NewThread(ThreadSocketHandler, NULL))
- printf("Error: NewThread(ThreadSocketHandler) failed\n");
+ threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "net", &ThreadSocketHandler));
// Initiate outbound connections from -addnode
- if (!NewThread(ThreadOpenAddedConnections, NULL))
- printf("Error: NewThread(ThreadOpenAddedConnections) failed\n");
+ threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "addcon", &ThreadOpenAddedConnections));
// Initiate outbound connections
- if (!NewThread(ThreadOpenConnections, NULL))
- printf("Error: NewThread(ThreadOpenConnections) failed\n");
+ threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "opencon", &ThreadOpenConnections));
// Process messages
- if (!NewThread(ThreadMessageHandler, NULL))
- printf("Error: NewThread(ThreadMessageHandler) failed\n");
+ threadGroup.create_thread(boost::bind(&TraceThread<void (*)()>, "msghand", &ThreadMessageHandler));
// Dump network addresses
- if (!NewThread(ThreadDumpAddress, NULL))
- printf("Error; NewThread(ThreadDumpAddress) failed\n");
-
- // Generate coins in the background
- GenerateBitcoins(GetBoolArg("-gen", false), pwalletMain);
+ threadGroup.create_thread(boost::bind(&LoopForever<void (*)()>, "dumpaddr", &DumpAddresses, 10000));
}
bool StopNode()
{
printf("StopNode()\n");
- fShutdown = true;
+ GenerateBitcoins(false, NULL);
+ MapPort(false);
nTransactionsUpdated++;
- int64 nStart = GetTime();
if (semOutbound)
for (int i=0; i<MAX_OUTBOUND_CONNECTIONS; i++)
semOutbound->post();
- do
- {
- int nThreadsRunning = 0;
- for (int n = 0; n < THREAD_MAX; n++)
- nThreadsRunning += vnThreadsRunning[n];
- if (nThreadsRunning == 0)
- break;
- if (GetTime() - nStart > 20)
- break;
- Sleep(20);
- } while(true);
- if (vnThreadsRunning[THREAD_SOCKETHANDLER] > 0) printf("ThreadSocketHandler still running\n");
- if (vnThreadsRunning[THREAD_OPENCONNECTIONS] > 0) printf("ThreadOpenConnections still running\n");
- if (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0) printf("ThreadMessageHandler still running\n");
- if (vnThreadsRunning[THREAD_MINER] > 0) printf("ThreadBitcoinMiner still running\n");
- if (vnThreadsRunning[THREAD_RPCLISTENER] > 0) printf("ThreadRPCListener still running\n");
- if (vnThreadsRunning[THREAD_RPCHANDLER] > 0) printf("ThreadsRPCServer still running\n");
-#ifdef USE_UPNP
- if (vnThreadsRunning[THREAD_UPNP] > 0) printf("ThreadMapPort still running\n");
-#endif
- if (vnThreadsRunning[THREAD_DNSSEED] > 0) printf("ThreadDNSAddressSeed still running\n");
- if (vnThreadsRunning[THREAD_ADDEDCONNECTIONS] > 0) printf("ThreadOpenAddedConnections still running\n");
- if (vnThreadsRunning[THREAD_DUMPADDRESS] > 0) printf("ThreadDumpAddresses still running\n");
- while (vnThreadsRunning[THREAD_MESSAGEHANDLER] > 0 || vnThreadsRunning[THREAD_RPCHANDLER] > 0)
- Sleep(20);
- Sleep(50);
+ MilliSleep(50);
DumpAddresses();
+
return true;
}
@@ -2012,6 +1890,18 @@ public:
if (closesocket(hListenSocket) == SOCKET_ERROR)
printf("closesocket(hListenSocket) failed with error %d\n", WSAGetLastError());
+ // clean up some globals (to help leak detection)
+ BOOST_FOREACH(CNode *pnode, vNodes)
+ delete pnode;
+ BOOST_FOREACH(CNode *pnode, vNodesDisconnected)
+ delete pnode;
+ vNodes.clear();
+ vNodesDisconnected.clear();
+ delete semOutbound;
+ semOutbound = NULL;
+ delete pnodeLocalHost;
+ pnodeLocalHost = NULL;
+
#ifdef WIN32
// Shutdown Windows Sockets
WSACleanup();