From 607dbfdeaf7ec053d959c47c125d60c0b7e7216a Mon Sep 17 00:00:00 2001 From: Jeff Garzik Date: Thu, 15 Nov 2012 19:41:12 -0500 Subject: P2P: parse network datastream into header/data components in socket thread Replaces CNode::vRecv buffer with a vector of CNetMessage's. This simplifies ProcessMessages() and eliminates several redundant data copies. Overview: * socket thread now parses incoming message datastream into header/data components, as encapsulated by CNetMessage * socket thread adds each CNetMessage to a vector inside CNode * message thread (ProcessMessages) iterates through CNode's CNetMessage vector Message parsing is made more strict: * Socket is disconnected, if message larger than MAX_SIZE or if CMessageHeader deserialization fails (latter is impossible?). Previously, code would simply eat garbage data all day long. * Socket is disconnected, if we fail to find pchMessageStart. We do not search through garbage, to find pchMessageStart. Each message must begin precisely after the last message ends. ProcessMessages() always processes a complete message, and is more efficient: * buffer is always precisely sized, using CDataStream::resize(), rather than progressively sized in 64k chunks. More efficient for large messages like "block". * whole-buffer memory copy eliminated (vRecv -> vMsg) * other buffer-shifting memory copies eliminated (vRecv.insert, vRecv.erase) --- src/main.cpp | 65 +++++++++++++++++++++++++----------------------------------- 1 file changed, 27 insertions(+), 38 deletions(-) (limited to 'src/main.cpp') diff --git a/src/main.cpp b/src/main.cpp index 22baf0f3e..340614459 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -3168,7 +3168,7 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) else if (strCommand == "verack") { - pfrom->vRecv.SetVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); + pfrom->SetRecvVersion(min(pfrom->nVersion, PROTOCOL_VERSION)); } @@ -3705,13 +3705,13 @@ bool static ProcessMessage(CNode* pfrom, string strCommand, CDataStream& vRecv) return true; } +// requires LOCK(cs_vRecvMsg) bool ProcessMessages(CNode* pfrom) { - CDataStream& vRecv = pfrom->vRecv; - if (vRecv.empty()) + if (pfrom->vRecvMsg.empty()) return true; //if (fDebug) - // printf("ProcessMessages(%u bytes)\n", vRecv.size()); + // printf("ProcessMessages(%zu messages)\n", pfrom->vRecvMsg.size()); // // Message format @@ -3722,32 +3722,32 @@ bool ProcessMessages(CNode* pfrom) // (x) data // - loop + unsigned int nMsgPos = 0; + for (; nMsgPos < pfrom->vRecvMsg.size(); nMsgPos++) { // Don't bother if send buffer is too full to respond anyway if (pfrom->vSend.size() >= SendBufferSize()) break; - // Scan for message start - CDataStream::iterator pstart = search(vRecv.begin(), vRecv.end(), BEGIN(pchMessageStart), END(pchMessageStart)); - int nHeaderSize = vRecv.GetSerializeSize(CMessageHeader()); - if (vRecv.end() - pstart < nHeaderSize) - { - if ((int)vRecv.size() > nHeaderSize) - { - printf("\n\nPROCESSMESSAGE MESSAGESTART NOT FOUND\n\n"); - vRecv.erase(vRecv.begin(), vRecv.end() - nHeaderSize); - } + // get next message; end, if an incomplete message is found + CNetMessage& msg = pfrom->vRecvMsg[nMsgPos]; + + //if (fDebug) + // printf("ProcessMessages(message %u msgsz, %zu bytes, complete:%s)\n", + // msg.hdr.nMessageSize, msg.vRecv.size(), + // msg.complete() ? "Y" : "N"); + + if (!msg.complete()) break; + + // Scan for message start + if (memcmp(msg.hdr.pchMessageStart, pchMessageStart, sizeof(pchMessageStart)) != 0) { + printf("\n\nPROCESSMESSAGE: INVALID MESSAGESTART\n\n"); + return false; } - if (pstart - vRecv.begin() > 0) - printf("\n\nPROCESSMESSAGE SKIPPED %"PRIpdd" BYTES\n\n", pstart - vRecv.begin()); - vRecv.erase(vRecv.begin(), pstart); // Read header - vector vHeaderSave(vRecv.begin(), vRecv.begin() + nHeaderSize); - CMessageHeader hdr; - vRecv >> hdr; + CMessageHeader& hdr = msg.hdr; if (!hdr.IsValid()) { printf("\n\nPROCESSMESSAGE: ERRORS IN HEADER %s\n\n\n", hdr.GetCommand().c_str()); @@ -3757,19 +3757,9 @@ bool ProcessMessages(CNode* pfrom) // Message size unsigned int nMessageSize = hdr.nMessageSize; - if (nMessageSize > MAX_SIZE) - { - printf("ProcessMessages(%s, %u bytes) : nMessageSize > MAX_SIZE\n", strCommand.c_str(), nMessageSize); - continue; - } - if (nMessageSize > vRecv.size()) - { - // Rewind and wait for rest of message - vRecv.insert(vRecv.begin(), vHeaderSave.begin(), vHeaderSave.end()); - break; - } // Checksum + CDataStream& vRecv = msg.vRecv; uint256 hash = Hash(vRecv.begin(), vRecv.begin() + nMessageSize); unsigned int nChecksum = 0; memcpy(&nChecksum, &hash, sizeof(nChecksum)); @@ -3780,17 +3770,13 @@ bool ProcessMessages(CNode* pfrom) continue; } - // Copy message to its own buffer - CDataStream vMsg(vRecv.begin(), vRecv.begin() + nMessageSize, vRecv.nType, vRecv.nVersion); - vRecv.ignore(nMessageSize); - // Process message bool fRet = false; try { { LOCK(cs_main); - fRet = ProcessMessage(pfrom, strCommand, vMsg); + fRet = ProcessMessage(pfrom, strCommand, vRecv); } if (fShutdown) return true; @@ -3822,7 +3808,10 @@ bool ProcessMessages(CNode* pfrom) printf("ProcessMessage(%s, %u bytes) FAILED\n", strCommand.c_str(), nMessageSize); } - vRecv.Compact(); + // remove processed messages; one incomplete message may remain + if (nMsgPos > 0) + pfrom->vRecvMsg.erase(pfrom->vRecvMsg.begin(), + pfrom->vRecvMsg.begin() + nMsgPos); return true; } -- cgit v1.2.3