summaryrefslogtreecommitdiff
path: root/utils/vmpi/vmpi_filesystem_master.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'utils/vmpi/vmpi_filesystem_master.cpp')
-rw-r--r--utils/vmpi/vmpi_filesystem_master.cpp1606
1 files changed, 1606 insertions, 0 deletions
diff --git a/utils/vmpi/vmpi_filesystem_master.cpp b/utils/vmpi/vmpi_filesystem_master.cpp
new file mode 100644
index 0000000..f047bf7
--- /dev/null
+++ b/utils/vmpi/vmpi_filesystem_master.cpp
@@ -0,0 +1,1606 @@
+//========= Copyright Valve Corporation, All rights reserved. ============//
+//
+// Purpose:
+//
+//=============================================================================//
+
+#include <winsock2.h>
+#include "vmpi_filesystem_internal.h"
+#include "zlib.h"
+#include "vstdlib/random.h"
+
+
+#define MINIMUM_SLEEP_MS 1
+
+// NOTE: This number comes from measurements on our network to find out how fast
+// we can broadcast without the network freaking out.
+//
+// This number can be changed on the command line with the -mpi_FileTransmitRate parameter.
+int MULTICAST_TRANSMIT_RATE = (1024*1000); // N megs per second
+
+// Defines when we'll stop transmitting a file to a client.
+// (After we've transmitted the file to the client N times and we haven't heard an ack back for M seconds).
+#define MIN_FILE_CYCLE_COUNT 5
+#define CLIENT_FILE_ACK_TIMEOUT 20
+
+
+
+// ------------------------------------------------------------------------------------------------------------------------ //
+// Global helpers.
+// ------------------------------------------------------------------------------------------------------------------------ //
+
+static void SendMulticastIP( const CIPAddr *pAddr )
+{
+ unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_MULTICAST_ADDR };
+ VMPI_Send2Chunks(
+ packetID, sizeof( packetID ),
+ pAddr, sizeof( *pAddr ),
+ VMPI_PERSISTENT );
+}
+
+
+static bool IsOpeningForWriteAccess( const char *pOptions )
+{
+ return strchr( pOptions, 'w' ) || strchr( pOptions, 'a' ) || strchr( pOptions, '+' );
+}
+
+
+// This does a fast zlib compression of the source data into the 'out' buffer.
+static bool ZLibCompress( const void *pData, int len, CUtlVector<char> &out )
+{
+ if ( len == 0 )
+ {
+ out.Purge();
+ return true;
+ }
+
+ int outStartLen = len;
+RETRY:;
+
+ // Prepare the compression stream.
+ z_stream zs;
+ memset( &zs, 0, sizeof( zs ) );
+
+ if ( deflateInit( &zs, 1 ) != Z_OK )
+ return false;
+
+
+ // Now compress it into the output buffer.
+ out.SetSize( outStartLen );
+
+ zs.next_in = (unsigned char*)pData;
+ zs.avail_in = len;
+
+ zs.next_out = (unsigned char*)out.Base();
+ zs.avail_out = out.Count();
+
+ int ret = deflate( &zs, Z_FINISH );
+ deflateEnd( &zs );
+
+ if ( ret == Z_STREAM_END )
+ {
+ // Get rid of whatever was left over.
+ out.RemoveMultiple( zs.total_out, out.Count() - zs.total_out );
+ return true;
+ }
+ else if ( ret == Z_OK )
+ {
+ // Need more space in the output buffer.
+ outStartLen += 1024 * 128;
+ goto RETRY;
+ }
+ else
+ {
+ return false;
+ }
+}
+
+
+// ------------------------------------------------------------------------------------------------------------------------ //
+// CVMPIFile_PassThru
+// ------------------------------------------------------------------------------------------------------------------------ //
+
+class CVMPIFile_PassThru : public IVMPIFile
+{
+public:
+ void Init( IBaseFileSystem *pPassThru, FileHandle_t fp )
+ {
+ m_pPassThru = pPassThru;
+ m_fp = fp;
+ }
+
+ virtual void Close()
+ {
+ m_pPassThru->Close( m_fp );
+ delete this;
+ }
+
+ virtual void Seek( int pos, FileSystemSeek_t seekType )
+ {
+ m_pPassThru->Seek( m_fp, pos, seekType );
+ }
+
+ virtual unsigned int Tell()
+ {
+ return m_pPassThru->Tell( m_fp );
+ }
+
+ virtual unsigned int Size()
+ {
+ return m_pPassThru->Size( m_fp );
+ }
+
+ virtual void Flush()
+ {
+ m_pPassThru->Flush( m_fp );
+ }
+
+ virtual int Read( void* pOutput, int size )
+ {
+ return m_pPassThru->Read( pOutput, size, m_fp );
+ }
+
+ virtual int Write( void const* pInput, int size )
+ {
+ return m_pPassThru->Write( pInput, size, m_fp );
+ }
+
+
+private:
+ IBaseFileSystem *m_pPassThru;
+ FileHandle_t m_fp;
+};
+
+
+
+// ---------------------------------------------------------------------------------------------------- //
+// CTransmitRateMgr coordinates with any other currently-running VMPI jobs, and they all will cut
+// down their transmission rate to stay within MULTICAST_TRANSMIT_RATE.
+// ---------------------------------------------------------------------------------------------------- //
+
+#define TRANSMITRATEMGR_BROADCAST_INVERVAL (1.0 / 3.0) // How many times per second we broadcast our presence.
+#define TRANSMITRATEMGR_EXPIRE_TIME 0.7 // How long it'll go before deciding a guy is not transmitting anymore.
+
+static char s_cTransmitRateMgrPacket[] = {2,6,-3,2,1,-66};
+
+class CTransmitRateMgr
+{
+public:
+ CTransmitRateMgr();
+
+ void ReadPackets();
+ void BroadcastPresence();
+
+ double GetMicrosecondsPerByte() const;
+
+private:
+ class CMachineRecord
+ {
+ public:
+ unsigned long m_UniqueID;
+ float m_flLastTime;
+ };
+ CUtlVector<CMachineRecord> m_MachineRecords;
+
+ unsigned long m_UniqueID;
+ float m_flLastBroadcastTime;
+ double m_nMicrosecondsPerByte;
+ ISocket *m_pSocket;
+};
+
+CTransmitRateMgr::CTransmitRateMgr()
+{
+ m_nMicrosecondsPerByte = 1000000.0 / (double)MULTICAST_TRANSMIT_RATE;
+ m_flLastBroadcastTime = 0;
+
+ // Build a (hopefully) unique ID.
+ m_UniqueID = (unsigned long)this;
+ CCycleCount cnt;
+ cnt.Sample();
+ m_UniqueID += cnt.GetMicroseconds();
+ Sleep( 1 );
+ m_UniqueID += cnt.GetMicroseconds();
+
+ m_pSocket = CreateIPSocket();
+ if ( m_pSocket )
+ {
+ m_pSocket->BindToAny( VMPI_MASTER_FILESYSTEM_BROADCAST_PORT );
+ }
+}
+
+void CTransmitRateMgr::ReadPackets()
+{
+ if ( !m_pSocket )
+ return;
+
+ float flCurTime = Plat_FloatTime();
+
+ // First, update/add records.
+ while ( 1 )
+ {
+ char data[512];
+ CIPAddr ipFrom;
+ int len = m_pSocket->RecvFrom( data, sizeof( data ), &ipFrom );
+ if ( len == -1 )
+ break;
+
+ if ( len == sizeof( s_cTransmitRateMgrPacket ) + sizeof( unsigned long ) &&
+ memcmp( data, s_cTransmitRateMgrPacket, sizeof( s_cTransmitRateMgrPacket ) ) == 0 )
+ {
+ unsigned long id = *((unsigned long*)&data[sizeof(s_cTransmitRateMgrPacket)]);
+ if ( id == m_UniqueID )
+ continue;
+
+ int i;
+ for ( i=0; i < m_MachineRecords.Count(); i++ )
+ {
+ if ( m_MachineRecords[i].m_UniqueID == id )
+ {
+ m_MachineRecords[i].m_flLastTime = flCurTime;
+ break;
+ }
+ }
+
+ if ( i == m_MachineRecords.Count() )
+ {
+ int index = m_MachineRecords.AddToTail();
+ m_MachineRecords[index].m_UniqueID = id;
+ m_MachineRecords[index].m_flLastTime = flCurTime;
+ }
+ }
+ }
+
+ // Now, expire any old records.
+ for ( int i=0; i < m_MachineRecords.Count(); i++ )
+ {
+ if ( (flCurTime - m_MachineRecords[i].m_flLastTime) > TRANSMITRATEMGR_EXPIRE_TIME )
+ {
+ m_MachineRecords.Remove( i );
+ --i;
+ }
+ }
+
+ // Recalculate our transmit rate (assuming we're receiving our own broadcast packets).
+ m_nMicrosecondsPerByte = 1000000.0 / (double)(MULTICAST_TRANSMIT_RATE / (m_MachineRecords.Count() + 1));
+}
+
+void CTransmitRateMgr::BroadcastPresence()
+{
+ if ( !m_pSocket )
+ return;
+
+ float flCurTime = Plat_FloatTime();
+ if ( (flCurTime - m_flLastBroadcastTime) < TRANSMITRATEMGR_BROADCAST_INVERVAL )
+ return;
+
+ m_flLastBroadcastTime = flCurTime;
+
+ char cData[sizeof( s_cTransmitRateMgrPacket ) + sizeof( unsigned long )];
+ memcpy( cData, s_cTransmitRateMgrPacket, sizeof( s_cTransmitRateMgrPacket ) );
+ *((unsigned long*)&cData[ sizeof( s_cTransmitRateMgrPacket ) ] ) = m_UniqueID;
+
+ m_pSocket->Broadcast( cData, sizeof( cData ), VMPI_MASTER_FILESYSTEM_BROADCAST_PORT );
+}
+
+inline double CTransmitRateMgr::GetMicrosecondsPerByte() const
+{
+ return m_nMicrosecondsPerByte;
+}
+
+
+// ---------------------------------------------------------------------------------------------------- //
+// CRateLimiter manages waiting for small periods of time between packets so the rate is
+// whatever we want it to be.
+//
+// It also will give up some CPU time to other processes every 50 milliseconds.
+// ---------------------------------------------------------------------------------------------------- //
+
+class CRateLimiter
+{
+public:
+
+ CRateLimiter();
+
+ void GiveUpTimeSlice();
+ void NoteExcessTimeTaken( unsigned long excessTimeInMicroseconds );
+
+
+public:
+
+ DWORD m_SleepIntervalMS; // Give up a timeslice every N milliseconds.
+
+ // Since we sleep once in a while, we time how long the sleep took and we beef
+ // up the transmit rate until we've accounted for the time lost during the sleep.
+ DWORD m_AccumulatedSleepMicroseconds;
+
+ // When was the last time we gave up a little bit of CPU to other programs.
+ CCycleCount m_LastSleepTime;
+};
+
+CRateLimiter::CRateLimiter()
+{
+ m_SleepIntervalMS = 50;
+ m_AccumulatedSleepMicroseconds = 0;
+ m_LastSleepTime.Sample();
+}
+
+void CRateLimiter::GiveUpTimeSlice()
+{
+ // Sleep again?
+ CCycleCount currentTime, dtSinceLastSleep;
+ currentTime.Sample();
+ CCycleCount::Sub( currentTime, m_LastSleepTime, dtSinceLastSleep );
+
+ if ( dtSinceLastSleep.GetMilliseconds() >= m_SleepIntervalMS )
+ {
+ CFastTimer sleepTimer;
+
+ sleepTimer.Start();
+ Sleep( 10 );
+ sleepTimer.End();
+
+ m_AccumulatedSleepMicroseconds += sleepTimer.GetDuration().GetMicroseconds();
+ m_LastSleepTime.Sample();
+ }
+}
+
+
+void CRateLimiter::NoteExcessTimeTaken( unsigned long excessTimeInMicroseconds )
+{
+ // Note: we give up time slices above.
+ if ( excessTimeInMicroseconds > m_AccumulatedSleepMicroseconds )
+ {
+ excessTimeInMicroseconds -= m_AccumulatedSleepMicroseconds;
+ m_AccumulatedSleepMicroseconds = 0;
+
+ CCycleCount startCount;
+ startCount.Sample();
+ while ( 1 )
+ {
+ CCycleCount curCount, diff;
+ curCount.Sample();
+
+ CCycleCount::Sub( curCount, startCount, diff );
+ if ( diff.GetMicroseconds() >= excessTimeInMicroseconds )
+ break;
+ }
+ }
+ else
+ {
+ m_AccumulatedSleepMicroseconds -= excessTimeInMicroseconds;
+ excessTimeInMicroseconds = 0;
+ }
+}
+
+
+// ------------------------------------------------------------------------------------------------------------------------ //
+// CMasterMulticastThread.
+// ------------------------------------------------------------------------------------------------------------------------ //
+
+class CMasterMulticastThread
+{
+public:
+
+ CMasterMulticastThread();
+ ~CMasterMulticastThread();
+
+ // This creates the socket and starts the thread (initially in an idle state since it doesn't
+ // know of any files anyone wants).
+ bool Init( IBaseFileSystem *pPassThru, unsigned short localPort, const CIPAddr *pAddr, int maxFileSystemMemoryUsage );
+ void Term();
+
+ // Returns -1 if there is an error.
+ int FindOrAddFile( const char *pFilename, const char *pPathID );
+ const CUtlVector<char>& GetFileData( int iFile ) const;
+
+ // When a client requests a files, this is called to tell the thread to start
+ // adding chunks from the specified file into the queue it's multicasting.
+ //
+ // Returns -1 if the file isn't there. Otherwise, it returns the file ID
+ // that will be sent along with the file's chunks in the multicast packets.
+ int AddFileRequest( const char *pFilename, const char *pPathID, int clientID, bool *bZeroLength );
+
+ // As each client receives multicasted chunks, they ack them so the master can
+ // stop transmitting any chunks it knows nobody wants.
+ void OnChunkReceived( int fileID, int clientID, int iChunk );
+ void OnFileReceived( int fileID, int clientID );
+
+ // Call this if a client disconnects so it can stop sending anything this client wants.
+ void OnClientDisconnect( int clientID, bool bGrabCriticalSection=true );
+
+ void CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength );
+
+private:
+
+ class CChunkInfo
+ {
+ public:
+ unsigned short m_iChunk;
+ unsigned short m_RefCount; // How many clients want this chunk.
+ unsigned short m_iActiveChunksIndex; // Index into m_ActiveChunks.
+ };
+
+
+ // This stores a client's reference to a file so it knows which pieces of the file the client needs.
+ class CClientFileInfo
+ {
+ public:
+ bool NeedsChunk( int i ) const { return (m_ChunksToSend[i>>3] & (1 << (i&7))) != 0; }
+
+ public:
+ int m_ClientID;
+ CUtlVector<unsigned char> m_ChunksToSend; // One bit for each chunk that this client still wants.
+ int m_nChunksLeft;
+
+ // TCP transmission only.
+ int m_TCP_LastChunkAcked;
+ int m_TCP_LastChunkSent;
+
+ float m_flTransmitStartTime;
+
+ float m_flLastAckTime; // Last time we heard an ack back from this client about this file.
+ // If this goes for too long, then we assume that the client is
+ // in a screwed state, and we stop sending the file to him.
+ int m_nTimesFileCycled; // How many times has the master multicast thread cycled over this file?
+ // We won't kick the client until we've cycled over the file a few times
+ // after the client asked for it.
+ };
+
+
+ class CMulticastFile
+ {
+ public:
+ ~CMulticastFile()
+ {
+ m_Clients.PurgeAndDeleteElements();
+ }
+
+ const char* GetFilename() { return m_Filename.Base(); }
+ const char* GetPathID() { return m_PathID.Base(); }
+
+
+ public:
+ int m_nCycles; // How many times has the multicast thread visited this file?
+
+ // This is sent along with every packet. If a client gets a chunk and doesn't have that file's
+ // info, the client will receive that file too.
+ CUtlVector<char> m_Filename;
+ CUtlVector<char> m_PathID;
+
+ CMulticastFileInfo m_Info;
+
+ // This is stored so the app can read out the uncompressed data.
+ CUtlVector<char> m_UncompressedData;
+
+ // zlib-compressed file data
+ CUtlVector<char> m_Data;
+
+ // This gets set to false if we run over our memory limit and start caching file data out.
+ // Then it'll reload the data if a client requests the file.
+ bool m_bDataLoaded;
+
+ // m_Chunks holds the chunks by index.
+ // m_ActiveChunks holds them sorted by whether they're active or not.
+ //
+ // Each chunk has a refcount. While the refcount is > 0, the chunk is in the first
+ // half of m_ActiveChunks. When the refcount gets to 0, the chunk is moved to the end of
+ // m_ActiveChunks. That way, we can iterate through the chunks that need to be sent and
+ // stop iterating the first time we hit one with a refcount of 0.
+ CUtlVector<CChunkInfo> m_Chunks;
+ CUtlLinkedList<CChunkInfo*,int> m_ActiveChunks;
+
+ // This tells which clients want pieces of this file.
+ CUtlLinkedList<CClientFileInfo*,int> m_Clients;
+ };
+
+
+private:
+
+ static DWORD WINAPI StaticMulticastThread( LPVOID pParameter );
+ DWORD MulticastThread();
+
+ bool CheckClientTimeouts();
+ bool Thread_SendFileChunk_Multicast( int *pnBytesSent );
+ void Thread_SeekToNextActiveChunk();
+
+ // In TCP mode, we send new chunks as they are acked.
+ void TCP_SendNextChunk( CMulticastFile *pFile, CClientFileInfo *pClient );
+
+ void EnsureMemoryLimit( CMulticastFile *pIgnore );
+
+ // Called after pFile->m_UncompressedData has been setup. This compresses the data, prepares the header,
+ // copies the filename, and adds it into the queue for the multicast thread.
+ int FinishFileSetup( CMulticastFile *pFile, const char *pFilename, const char *pPathID, bool bFileAlreadyExisted );
+
+ void IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk );
+ void DecrementChunkRefCount( int iFile, int iChunk );
+
+ int FindFile( const char *pFilename, const char *pPathID );
+
+ bool FindWarningSuppression( const char *pFilename );
+ void AddWarningSuppression( const char *pFilename );
+
+private:
+
+ CUtlLinkedList<CMulticastFile*,int> m_Files;
+
+ unsigned long m_nCurMemoryUsage; // Total of all the file data we have loaded.
+ unsigned long m_nMaxMemoryUsage; // 0 means that there is no limit.
+
+ // This tracks how many chunks we have that want to be sent.
+ int m_nTotalActiveChunks;
+
+ SOCKET m_Socket;
+ sockaddr_in m_MulticastAddr;
+
+ HANDLE m_hMainThread;
+ IBaseFileSystem *m_pPassThru;
+
+ HANDLE m_hThread;
+ CRITICAL_SECTION m_CS;
+
+ // Events used to communicate with our thread.
+ HANDLE m_hTermEvent;
+
+ // The thread walks through this as it spews chunks of data.
+ volatile int m_iCurFile; // Index into m_Files.
+ volatile int m_iCurActiveChunk; // Current index into CMulticastFile::m_ActiveChunks.
+
+ CUtlLinkedList<char*,int> m_WarningSuppressions;
+};
+
+
+CMasterMulticastThread::CMasterMulticastThread()
+{
+ m_hThread = m_hMainThread = NULL;
+ m_Socket = INVALID_SOCKET;
+ m_nTotalActiveChunks = 0;
+ m_iCurFile = m_iCurActiveChunk = -1;
+ m_pPassThru = NULL;
+
+ m_hTermEvent = CreateEvent( NULL, FALSE, FALSE, NULL );
+ InitializeCriticalSection( &m_CS );
+ m_nCurMemoryUsage = m_nMaxMemoryUsage = 0;
+}
+
+
+CMasterMulticastThread::~CMasterMulticastThread()
+{
+ Term();
+
+ CloseHandle( m_hTermEvent );
+
+ DeleteCriticalSection( &m_CS );
+}
+
+
+bool CMasterMulticastThread::Init( IBaseFileSystem *pPassThru, unsigned short localPort, const CIPAddr *pAddr, int maxMemoryUsage )
+{
+ Term();
+
+ m_nMaxMemoryUsage = maxMemoryUsage;
+ Assert( m_nCurMemoryUsage == 0 );
+ m_nCurMemoryUsage = 0;
+
+ if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP )
+ {
+ // No need for an extra socket in this mode.
+ m_Socket = INVALID_SOCKET;
+ }
+ else
+ {
+ // First, create our socket.
+ m_Socket = socket( AF_INET, SOCK_DGRAM, IPPROTO_IP );
+ if ( m_Socket == INVALID_SOCKET )
+ {
+ Warning( "CMasterMulticastThread::Init - socket() failed\n" );
+ return false;
+ }
+
+ // Bind to INADDR_ANY.
+ CIPAddr localAddr( 0, 0, 0, 0, localPort );
+
+ sockaddr_in addr;
+ IPAddrToSockAddr( &localAddr, &addr );
+
+ int status = bind( m_Socket, (sockaddr*)&addr, sizeof(addr) );
+ if ( status != 0 )
+ {
+ Term();
+ Warning( "CMasterMulticastThread::Init - bind( %d.%d.%d.%d:%d ) failed\n", EXPAND_ADDR( *pAddr ) );
+ return false;
+ }
+
+ if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST )
+ {
+ // Set up for broadcast
+ BOOL bBroadcast = TRUE;
+ if ( setsockopt( m_Socket, SOL_SOCKET, SO_BROADCAST, (char*)&bBroadcast, bBroadcast ) == SOCKET_ERROR )
+ {
+ Term();
+ Warning( "CMasterMulticastThread::Init - setsockopt() failed to set broadcast mode\n" );
+ return false;
+ }
+ }
+
+ // Remember the address we want to send to.
+ IPAddrToSockAddr( pAddr, &m_MulticastAddr );
+
+ // Now create our thread.
+ DWORD dwThreadID = 0;
+ m_hThread = CreateThread( NULL, 0, &CMasterMulticastThread::StaticMulticastThread, this, 0, &dwThreadID );
+ if ( !m_hThread )
+ {
+ Term();
+ Warning( "CMasterMulticastThread::Init - CreateThread failed\n" );
+ return false;
+ }
+
+ SetThreadPriority( m_hThread, THREAD_PRIORITY_LOWEST );
+ }
+
+ // For debug mode to verify that we don't try to open files while in another thread.
+ m_hMainThread = GetCurrentThread();
+
+ m_pPassThru = pPassThru;
+ return true;
+}
+
+
+void CMasterMulticastThread::Term()
+{
+ // Stop the thread if it is running.
+ if ( m_hThread )
+ {
+ SetEvent( m_hTermEvent );
+ WaitForSingleObject( m_hThread, INFINITE );
+ CloseHandle( m_hThread );
+
+ m_hThread = NULL;
+ }
+
+ // Close the socket.
+ if ( m_Socket != INVALID_SOCKET )
+ {
+ closesocket( m_Socket );
+ m_Socket = INVALID_SOCKET;
+ }
+
+ // Free up other data.
+ m_Files.PurgeAndDeleteElements();
+ m_nCurMemoryUsage = m_nMaxMemoryUsage = 0;
+}
+
+
+void CMasterMulticastThread::TCP_SendNextChunk( CMulticastFile *pFile, CClientFileInfo *pClient )
+{
+ // No more chunks to send?
+ if ( (pClient->m_TCP_LastChunkSent+1) >= pFile->m_Info.m_nChunks )
+ return;
+
+ // Figure out what data we'd be sending.
+ int iChunkToSend = pClient->m_TCP_LastChunkSent + 1;
+ int iStartByte = iChunkToSend * TCP_CHUNK_PAYLOAD_SIZE;
+ int iEndByte = min( iStartByte + TCP_CHUNK_PAYLOAD_SIZE, pFile->m_Data.Count() );
+
+ // If the start point is past the end, then we're done sending the file to this client.
+ if ( iStartByte >= pFile->m_Data.Count() )
+ return;
+
+ // Record that we sent this data.
+ pClient->m_TCP_LastChunkSent = iChunkToSend;
+
+ // Assemble the packet.
+ unsigned char cPacket[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_CHUNK };
+
+ const void *chunks[5] =
+ {
+ cPacket,
+ &pFile->m_Info,
+ &iChunkToSend,
+ pFile->GetFilename(),
+ &pFile->m_Data[iStartByte]
+ };
+
+ int chunkLengths[5] =
+ {
+ sizeof( cPacket ),
+ sizeof( pFile->m_Info ),
+ sizeof( m_iCurActiveChunk ),
+ strlen( pFile->GetFilename() ) + 1,
+ iEndByte - iStartByte
+ };
+
+ VMPI_SendChunks( chunks, chunkLengths, 5, pClient->m_ClientID );
+}
+
+
+int CMasterMulticastThread::AddFileRequest( const char *pFilename, const char *pPathID, int clientID, bool *bZeroLength )
+{
+ // Firstly, do we already have this file?
+ int iFile = FindOrAddFile( pFilename, pPathID );
+ if ( iFile == -1 )
+ return -1;
+
+ CMulticastFile *pFile = m_Files[iFile];
+
+ // Now that we have a file setup, merge in this client's info.
+ EnterCriticalSection( &m_CS );
+
+ CClientFileInfo *pClient = new CClientFileInfo;
+ pClient->m_TCP_LastChunkAcked = -1;
+ pClient->m_TCP_LastChunkSent = -1;
+ pClient->m_ClientID = clientID;
+ pClient->m_flLastAckTime = Plat_FloatTime();
+ pClient->m_flTransmitStartTime = pClient->m_flLastAckTime;
+ pClient->m_nTimesFileCycled = 0;
+ pClient->m_nChunksLeft = pFile->m_Info.m_nChunks;
+ pClient->m_ChunksToSend.SetSize( PAD_NUMBER( pFile->m_Info.m_nChunks, 8 ) / 8 );
+ memset( pClient->m_ChunksToSend.Base(), 0xFF, pClient->m_ChunksToSend.Count() );
+ pFile->m_Clients.AddToTail( pClient );
+
+ for ( int i=0; i < pFile->m_Chunks.Count(); i++ )
+ {
+ IncrementChunkRefCount( pFile, i );
+ }
+
+ // In TCP mode, let's get the sliding window started..
+ if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP )
+ {
+ for ( int iDepth=0; iDepth < TCP_CHUNK_QUEUE_LEN; iDepth++ )
+ TCP_SendNextChunk( pFile, pClient );
+ }
+
+ LeaveCriticalSection( &m_CS );
+
+ *bZeroLength = (pFile->m_Info.m_UncompressedSize == 0);
+
+ return iFile;
+}
+
+
+void CMasterMulticastThread::OnChunkReceived( int fileID, int clientID, int iChunk )
+{
+ if ( !m_Files.IsValidIndex( fileID ) )
+ {
+ Warning( "CMasterMulticastThread::OnChunkReceived: invalid file (%d) from client %d\n", fileID, clientID );
+ return;
+ }
+
+ CMulticastFile *pFile = m_Files[fileID];
+ CClientFileInfo *pClient = NULL;
+ FOR_EACH_LL( pFile->m_Clients, iClient )
+ {
+ if ( pFile->m_Clients[iClient]->m_ClientID == clientID )
+ {
+ pClient = pFile->m_Clients[iClient];
+ break;
+ }
+ }
+ if ( !pClient )
+ {
+ // This will spam sometimes if a worker stops responding and we timeout on it,
+ // but then it comes back alive and starts responding. So let's ignore its packets silently.
+ //Warning( "CMasterMulticastThread::OnChunkReceived: invalid client ID (%d) for file %s\n", clientID, pFile->GetFilename() );
+ return;
+ }
+
+ if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP )
+ {
+ // Send the next chunk, if there is one.
+ EnterCriticalSection( &m_CS );
+ TCP_SendNextChunk( pFile, pClient );
+ LeaveCriticalSection( &m_CS );
+ }
+ else
+ {
+ if ( !pFile->m_Chunks.IsValidIndex( iChunk ) )
+ {
+ Warning( "CMasterMulticastThread::OnChunkReceived: invalid chunk index (%d) for file %s\n", iChunk, pFile->GetFilename() );
+ return;
+ }
+
+ // Mark that this client doesn't need this chunk anymore.
+ pClient->m_ChunksToSend[iChunk >> 3] &= ~(1 << (iChunk & 7));
+ pClient->m_nChunksLeft--;
+
+ pClient->m_flLastAckTime = Plat_FloatTime();
+ if ( pClient->m_nChunksLeft == 0 && g_iVMPIVerboseLevel >= 2 )
+ Warning( "Client %d got file %s\n", clientID, pFile->GetFilename() );
+
+ EnterCriticalSection( &m_CS );
+ DecrementChunkRefCount( fileID, iChunk );
+ LeaveCriticalSection( &m_CS );
+ }
+}
+
+
+void CMasterMulticastThread::OnFileReceived( int fileID, int clientID )
+{
+ if ( !m_Files.IsValidIndex( fileID ) )
+ {
+ Warning( "CMasterMulticastThread::OnChunkReceived: invalid file (%d) from client %d\n", fileID, clientID );
+ return;
+ }
+
+ CMulticastFile *pFile = m_Files[fileID];
+ for ( int i=0; i < pFile->m_Info.m_nChunks; i++ )
+ OnChunkReceived( fileID, clientID, i );
+}
+
+
+void CMasterMulticastThread::OnClientDisconnect( int clientID, bool bGrabCriticalSection )
+{
+ if ( bGrabCriticalSection )
+ EnterCriticalSection( &m_CS );
+
+ // Remove all references from this client.
+ FOR_EACH_LL( m_Files, iFile )
+ {
+ CMulticastFile *pFile = m_Files[iFile];
+
+ FOR_EACH_LL( pFile->m_Clients, iClient )
+ {
+ CClientFileInfo *pClient = pFile->m_Clients[iClient];
+
+ if ( pClient->m_ClientID != clientID )
+ continue;
+
+ // Ok, this is our man. Decrement the refcount of any chunks this client wanted.
+ for ( int iChunk=0; iChunk < pFile->m_Info.m_nChunks; iChunk++ )
+ {
+ if ( pClient->NeedsChunk( iChunk ) )
+ {
+ DecrementChunkRefCount( iFile, iChunk );
+ }
+ }
+
+ delete pClient;
+ pFile->m_Clients.Remove( iClient );
+
+ break;
+ }
+ }
+
+ if ( bGrabCriticalSection )
+ LeaveCriticalSection( &m_CS );
+}
+
+
+void CMasterMulticastThread::CreateVirtualFile( const char *pFilename, const void *pData, unsigned long fileLength )
+{
+ const char *pPathID = VMPI_VIRTUAL_FILES_PATH_ID;
+
+ int iFile = FindFile( pFilename, pPathID );
+ if ( iFile != -1 )
+ Error( "CMasterMulticastThread::CreateVirtualFile( %s ) - file already exists!", pFilename );
+
+ CMulticastFile *pFile = new CMulticastFile;
+ pFile->m_UncompressedData.CopyArray( (const char*)pData, fileLength );
+
+ FinishFileSetup( pFile, pFilename, pPathID, false );
+}
+
+
+DWORD WINAPI CMasterMulticastThread::StaticMulticastThread( LPVOID pParameter )
+{
+ return ((CMasterMulticastThread*)pParameter)->MulticastThread();
+}
+
+
+bool CMasterMulticastThread::CheckClientTimeouts()
+{
+ bool bRet = false;
+ CMulticastFile *pFile = m_Files[m_iCurFile];
+
+ float flCurTime = Plat_FloatTime();
+
+ int iNext;
+ for( int iCur=pFile->m_Clients.Head(); iCur != pFile->m_Clients.InvalidIndex(); iCur=iNext )
+ {
+ iNext = pFile->m_Clients.Next( iCur );
+ CClientFileInfo *pInfo = pFile->m_Clients[iCur];
+
+ // If the client has already fully received this file, don't bother timing out on it.
+ if ( pInfo->m_nChunksLeft == 0 )
+ continue;
+
+ ++pInfo->m_nTimesFileCycled;
+ if ( pInfo->m_nTimesFileCycled >= MIN_FILE_CYCLE_COUNT && (flCurTime - pInfo->m_flLastAckTime) > CLIENT_FILE_ACK_TIMEOUT )
+ {
+ // For debug output, get the most recent time we heard any ack from this client at all.
+ float flMostRecentTime = pInfo->m_flLastAckTime;
+ FOR_EACH_LL( m_Files, iTestFile )
+ {
+ CMulticastFile *pTestFile = m_Files[iTestFile];
+ FOR_EACH_LL( pTestFile->m_Clients, iTestClient )
+ {
+ if ( pTestFile->m_Clients[iTestClient]->m_ClientID == pInfo->m_ClientID )
+ {
+ flMostRecentTime = max( flMostRecentTime, pTestFile->m_Clients[iTestClient]->m_flLastAckTime );
+ }
+ }
+ }
+
+ Warning( "\nClient %s timed out on file %s (latest: %.2f / cur: %.2f).\n",
+ VMPI_GetMachineName( pInfo->m_ClientID ), pFile->GetFilename(), flMostRecentTime, flCurTime );
+ OnClientDisconnect( pInfo->m_ClientID, false );
+ bRet = true; // yes, we booted a client.
+ }
+ }
+
+ return bRet;
+}
+
+inline bool CMasterMulticastThread::Thread_SendFileChunk_Multicast( int *pnBytesSent )
+{
+ // Send the next chunk (file, size, time, chunk data).
+ CMulticastFile *pFile = m_Files[m_iCurFile];
+
+ int iStartByte = m_iCurActiveChunk * MULTICAST_CHUNK_PAYLOAD_SIZE;
+ int iEndByte = min( iStartByte + MULTICAST_CHUNK_PAYLOAD_SIZE, pFile->m_Data.Count() );
+
+ WSABUF bufs[4];
+ bufs[0].buf = (char*)&pFile->m_Info;
+ bufs[0].len = sizeof( pFile->m_Info );
+
+ bufs[1].buf = (char*)&m_iCurActiveChunk;
+ bufs[1].len = sizeof( m_iCurActiveChunk );
+
+ bufs[2].buf = (char*)pFile->GetFilename();
+ bufs[2].len = strlen( pFile->GetFilename() ) + 1;
+
+ bufs[3].buf = &pFile->m_Data[iStartByte];
+ bufs[3].len = iEndByte - iStartByte;
+
+ DWORD nBytesSent = 0;
+ DWORD nWantedBytes = ( bufs[0].len + bufs[1].len + bufs[2].len + bufs[3].len );
+ bool bSuccess;
+
+ if ( m_MulticastAddr.sin_addr.S_un.S_un_b.s_b1 == 127 &&
+ m_MulticastAddr.sin_addr.S_un.S_un_b.s_b2 == 0 &&
+ m_MulticastAddr.sin_addr.S_un.S_un_b.s_b3 == 0 &&
+ m_MulticastAddr.sin_addr.S_un.S_un_b.s_b4 == 1 )
+ {
+ // For some mysterious reason, WSASendTo only sends the first buffer
+ // if we're sending to 127.0.0.1 (ie: in local mode).
+ char allData[1024*8];
+ if ( nWantedBytes > sizeof( allData ) )
+ Error( "nWantedBytes > sizeof( allData )" );
+
+ memcpy( &allData[0], bufs[0].buf, bufs[0].len );
+ memcpy( &allData[bufs[0].len], bufs[1].buf, bufs[1].len );
+ memcpy( &allData[bufs[0].len+bufs[1].len], bufs[2].buf, bufs[2].len );
+ memcpy( &allData[bufs[0].len+bufs[1].len+bufs[2].len], bufs[3].buf, bufs[3].len );
+ int ret = sendto( m_Socket, allData, nWantedBytes, 0, (sockaddr*)&m_MulticastAddr, sizeof( m_MulticastAddr ) );
+ bSuccess = (ret == (int)nWantedBytes);
+ }
+ else
+ {
+ WSASendTo(
+ m_Socket,
+ bufs,
+ ARRAYSIZE( bufs ),
+ &nBytesSent,
+ 0,
+ (sockaddr*)&m_MulticastAddr,
+ sizeof( m_MulticastAddr ),
+ NULL,
+ NULL );
+ bSuccess = (nBytesSent == nWantedBytes);
+ }
+
+ // Handle errors.. let it get a few errors, then quit.
+ if ( bSuccess )
+ {
+ *pnBytesSent = (int)nBytesSent;
+ }
+ else
+ {
+ static int nWarnings = 0;
+ ++nWarnings;
+ if ( nWarnings < 10 )
+ {
+ Warning( "\nMulticastThread: WSASendTo with %d bytes sent %d bytes.\n", nWantedBytes, nBytesSent );
+
+ char *lpMsgBuf;
+ if ( FormatMessage(
+ FORMAT_MESSAGE_ALLOCATE_BUFFER |
+ FORMAT_MESSAGE_FROM_SYSTEM |
+ FORMAT_MESSAGE_IGNORE_INSERTS,
+ NULL,
+ GetLastError(),
+ MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT), // Default language
+ (char*)&lpMsgBuf,
+ 0,
+ NULL
+ ) )
+ {
+ Warning( "%s", lpMsgBuf );
+ LocalFree( lpMsgBuf );
+ }
+ }
+ else if ( nWarnings == 10 )
+ {
+ Warning( "\nThis machine's ability to multicast may be broken. Please reboot and try again.\n" );
+ }
+ }
+
+ return bSuccess;
+}
+
+
+void CMasterMulticastThread::Thread_SeekToNextActiveChunk()
+{
+ // Make sure we're on a valid chunk.
+ if ( m_iCurFile == -1 )
+ {
+ Assert( m_Files.Count() > 0 );
+ m_iCurFile = m_Files.Head();
+ m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head();
+ }
+
+ while ( 1 )
+ {
+ if ( m_iCurActiveChunk == m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() ||
+ m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount == 0 )
+ {
+ // Now check for client timeouts.
+ // (This is kicking clients unjustly for some reason.. need to debug).
+ if ( CheckClientTimeouts() && m_nTotalActiveChunks == 0 )
+ break;
+
+ // Finished with that file. Send the next one.
+ m_iCurFile = m_Files.Next( m_iCurFile );
+ if ( m_iCurFile == m_Files.InvalidIndex() )
+ m_iCurFile = m_Files.Head();
+
+ m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Head();
+ }
+
+ if ( m_iCurActiveChunk != m_Files[m_iCurFile]->m_ActiveChunks.InvalidIndex() )
+ {
+ // Only break if we're on an active chunk.
+ if ( m_Files[m_iCurFile]->m_ActiveChunks[m_iCurActiveChunk]->m_RefCount != 0 )
+ {
+ break;
+ }
+
+ m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk );
+ }
+ }
+}
+
+
+DWORD CMasterMulticastThread::MulticastThread()
+{
+ CTransmitRateMgr transmitRateMgr;
+ CRateLimiter rateLimiter;
+
+
+ DWORD msToWait = 0; // Only temporarily used if we don't have any data to send.
+
+ while ( WaitForSingleObject( m_hTermEvent, msToWait ) != WAIT_OBJECT_0 )
+ {
+ rateLimiter.GiveUpTimeSlice();
+ msToWait = 0;
+
+ EnterCriticalSection( &m_CS );
+
+ transmitRateMgr.ReadPackets();
+
+ // If we have nothing to send then kick back for a while.
+ if ( m_nTotalActiveChunks == 0 )
+ {
+ LeaveCriticalSection( &m_CS );
+ msToWait = 50;
+ continue;
+ }
+
+ // Ok, now we're active, so send out our presence to other CTransmitRateMgrs on the network.
+ transmitRateMgr.BroadcastPresence();
+
+
+ // We're going to time how long this chunk took to send.
+ CFastTimer timer;
+ timer.Start();
+
+ Thread_SeekToNextActiveChunk();
+
+ // We have to do this check a second time here because the CheckClientTimeouts() call may have
+ // booted our last client. If we don't check it here, we might be transmitting
+ // something we don't want to transmit. Also, if we don't break out of the loop above,
+ // it can prevent the process from ever exiting because it'll never exit that while() loop.
+ if ( m_nTotalActiveChunks == 0 )
+ {
+ LeaveCriticalSection( &m_CS );
+ msToWait = 50;
+ continue;
+ }
+
+ int nBytesSent = 0;
+
+ bool bSuccess;
+ bSuccess = Thread_SendFileChunk_Multicast( &nBytesSent );
+
+ g_nMulticastBytesSent += (int)nBytesSent;
+
+ // Move to the next chunk.
+ m_iCurActiveChunk = m_Files[m_iCurFile]->m_ActiveChunks.Next( m_iCurActiveChunk );
+
+ LeaveCriticalSection( &m_CS );
+
+
+ // Measure how long it took to send this.
+ timer.End();
+ unsigned long timeTaken = timer.GetDuration().GetMicroseconds();
+
+
+ // Measure how long it should have taken.
+ unsigned long estimatedPacketHeaderSize = 32;
+ unsigned long optimalTimeTaken = (unsigned long)( transmitRateMgr.GetMicrosecondsPerByte() * (nBytesSent + estimatedPacketHeaderSize) );
+
+
+ // If we went faster than we should have, then wait for the difference in time.
+ if ( timeTaken < optimalTimeTaken )
+ {
+ rateLimiter.NoteExcessTimeTaken( optimalTimeTaken - timeTaken );
+ }
+ }
+
+ return 0;
+}
+
+
+void CMasterMulticastThread::IncrementChunkRefCount( CMasterMulticastThread::CMulticastFile *pFile, int iChunk )
+{
+ CChunkInfo *pChunk = &pFile->m_Chunks[iChunk];
+
+ if ( pChunk->m_RefCount == 0 )
+ {
+ ++m_nTotalActiveChunks;
+
+ // Move the chunk to the head of the list since it is now active.
+ pFile->m_ActiveChunks.Remove( pChunk->m_iActiveChunksIndex );
+ pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToHead( pChunk );
+ }
+
+ pChunk->m_RefCount++;
+}
+
+
+void CMasterMulticastThread::DecrementChunkRefCount( int iFile, int iChunk )
+{
+ CMulticastFile *pFile = m_Files[iFile];
+ CChunkInfo *pChunk = &pFile->m_Chunks[iChunk];
+
+ if ( pChunk->m_RefCount == 0 )
+ {
+ Error( "CMasterMulticastThread::DecrementChunkRefCount - refcount already zero!\n" );
+ }
+
+ pChunk->m_RefCount--;
+ if ( pChunk->m_RefCount == 0 )
+ {
+ --m_nTotalActiveChunks;
+
+ // If this is the current chunk the thread is reading on, seek up to the next chunk so
+ // the thread doesn't spin off into the next file and skip its current file's contents.
+ if ( iFile == m_iCurFile && pChunk->m_iActiveChunksIndex == m_iCurActiveChunk )
+ {
+ m_iCurActiveChunk = pFile->m_ActiveChunks.Next( pChunk->m_iActiveChunksIndex );
+ }
+
+ // Move the chunk to the end of the list since it is now inactive.
+ pFile->m_ActiveChunks.Remove( pChunk->m_iActiveChunksIndex );
+ pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( pChunk );
+ }
+}
+
+
+int CMasterMulticastThread::FindFile( const char *pName, const char *pPathID )
+{
+ FOR_EACH_LL( m_Files, i )
+ {
+ CMulticastFile *pFile = m_Files[i];
+ if ( stricmp( pFile->GetFilename(), pName ) == 0 && stricmp( pFile->GetPathID(), pPathID ) == 0 )
+ return i;
+ }
+ return -1;
+}
+
+
+bool CMasterMulticastThread::FindWarningSuppression( const char *pFilename )
+{
+ FOR_EACH_LL( m_WarningSuppressions, i )
+ {
+ if ( Q_stricmp( m_WarningSuppressions[i], pFilename ) == 0 )
+ return true;
+ }
+ return false;
+}
+
+
+void CMasterMulticastThread::AddWarningSuppression( const char *pFilename )
+{
+ char *pBlah = new char[ strlen( pFilename ) + 1 ];
+ strcpy( pBlah, pFilename );
+ m_WarningSuppressions.AddToTail( pBlah );
+}
+
+
+int CMasterMulticastThread::FindOrAddFile( const char *pFilename, const char *pPathID )
+{
+ CMulticastFile *pFile = NULL;
+ bool bFileAlreadyExisted = false;
+
+ // See if we've already opened this file.
+ int iFile = FindFile( pFilename, pPathID );
+ if ( iFile != -1 )
+ {
+ pFile = m_Files[iFile];
+ if ( pFile->m_bDataLoaded )
+ {
+ return iFile;
+ }
+ else
+ {
+ // Ok, we have the file entry, but its data has been freed, so we need to reload it.
+ EnterCriticalSection( &m_CS );
+ bFileAlreadyExisted = true;
+ }
+ }
+
+ // Can't open a file outside our main thread, because we have to talk to the filesystem
+ // and the filesystem doesn't support that.
+ Assert( GetCurrentThread() == m_hMainThread );
+
+ // When the worker originally asked for the path ID, they could pass NULL and it would come through as "".
+ // Now set it back to null for the filesystem we're passing the call to.
+ FileHandle_t fp = m_pPassThru->Open( pFilename, "rb", pPathID[0] == 0 ? NULL : pPathID );
+ if ( !fp )
+ {
+ if ( bFileAlreadyExisted )
+ LeaveCriticalSection( &m_CS );
+
+ return -1;
+ }
+
+ if ( !bFileAlreadyExisted )
+ pFile = new CMulticastFile;
+
+ pFile->m_UncompressedData.SetSize( m_pPassThru->Size( fp ) );
+ m_pPassThru->Read( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), fp );
+ m_pPassThru->Close( fp );
+
+ int iRet = FinishFileSetup( pFile, pFilename, pPathID, bFileAlreadyExisted );
+ if ( bFileAlreadyExisted )
+ LeaveCriticalSection( &m_CS );
+
+ return iRet;
+}
+
+
+int CMasterMulticastThread::FinishFileSetup( CMulticastFile *pFile, const char *pFilename, const char *pPathID, bool bFileAlreadyExisted )
+{
+ // Compress the file's contents.
+ if ( !ZLibCompress( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), pFile->m_Data ) )
+ {
+ delete pFile;
+ return -1;
+ }
+
+ pFile->m_bDataLoaded = true;
+ int chunkSize = VMPI_GetChunkPayloadSize();
+
+ // Get this file in the queue.
+ if ( !bFileAlreadyExisted )
+ {
+ pFile->m_Filename.SetSize( strlen( pFilename ) + 1 );
+ strcpy( pFile->m_Filename.Base(), pFilename );
+
+ pFile->m_PathID.SetSize( strlen( pPathID ) + 1 );
+ strcpy( pFile->m_PathID.Base(), pPathID );
+
+ pFile->m_nCycles = 0;
+
+ pFile->m_Info.m_CompressedSize = pFile->m_Data.Count();
+ pFile->m_Info.m_UncompressedSize = pFile->m_UncompressedData.Count();
+
+ pFile->m_Info.m_nChunks = PAD_NUMBER( pFile->m_Info.m_CompressedSize, chunkSize ) / chunkSize;
+
+ // Initialize the chunks.
+ pFile->m_Chunks.SetSize( pFile->m_Info.m_nChunks );
+ for ( int i=0; i < pFile->m_Chunks.Count(); i++ )
+ {
+ CChunkInfo *pChunk = &pFile->m_Chunks[i];
+
+ pChunk->m_iChunk = (unsigned short)i;
+ pChunk->m_RefCount = 0;
+ pChunk->m_iActiveChunksIndex = pFile->m_ActiveChunks.AddToTail( pChunk );
+ }
+
+ EnterCriticalSection( &m_CS );
+ }
+
+ // Boot some other file out of memory if we're out of space.
+ m_nCurMemoryUsage += ( pFile->m_Info.m_CompressedSize + pFile->m_Info.m_UncompressedSize );
+ EnsureMemoryLimit( pFile );
+
+ if ( !bFileAlreadyExisted )
+ {
+ pFile->m_Info.m_FileID = m_Files.AddToTail( pFile );
+ LeaveCriticalSection( &m_CS );
+ }
+
+ return pFile->m_Info.m_FileID;
+}
+
+
+void CMasterMulticastThread::EnsureMemoryLimit( CMulticastFile *pIgnore )
+{
+ if ( m_nMaxMemoryUsage != 0 && m_nCurMemoryUsage > m_nMaxMemoryUsage )
+ {
+ // Free all the files that we can.
+ FOR_EACH_LL( m_Files, iFile )
+ {
+ CMulticastFile *pFile = m_Files[iFile];
+ if ( pFile == pIgnore || !pFile->m_bDataLoaded )
+ continue;
+
+ if ( pFile->m_ActiveChunks.Count() == 0 )
+ {
+ m_nCurMemoryUsage -= ( pFile->m_Info.m_CompressedSize + pFile->m_Info.m_UncompressedSize );
+
+ pFile->m_Data.Purge();
+ pFile->m_UncompressedData.Purge();
+ pFile->m_bDataLoaded = false;
+ }
+ }
+ }
+}
+
+
+const CUtlVector<char>& CMasterMulticastThread::GetFileData( int iFile ) const
+{
+ return m_Files[iFile]->m_UncompressedData;
+}
+
+
+// ------------------------------------------------------------------------------------------------------------------------ //
+// CMasterVMPIFileSystem implementation.
+// ------------------------------------------------------------------------------------------------------------------------ //
+
+class CMasterVMPIFileSystem : public CBaseVMPIFileSystem
+{
+public:
+ CMasterVMPIFileSystem();
+ virtual ~CMasterVMPIFileSystem();
+
+ bool Init( int maxMemoryUsage, IFileSystem *pPassThru );
+ virtual void Term();
+
+ virtual FileHandle_t Open( const char *pFilename, const char *pOptions, const char *pathID );
+ virtual bool HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID );
+
+ virtual void CreateVirtualFile( const char *pFilename, const void *pData, int fileLength );
+
+ virtual CSysModule *LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly );
+ virtual void UnloadModule( CSysModule *pModule );
+
+
+private:
+
+ static void OnClientDisconnect( int procID, const char *pReason );
+
+
+private:
+ CMasterMulticastThread m_MasterThread;
+ IFileSystem *m_pMasterVMPIFileSystemPassThru;
+
+ static CMasterVMPIFileSystem *s_pMasterVMPIFileSystem;
+};
+
+
+CMasterVMPIFileSystem *CMasterVMPIFileSystem::s_pMasterVMPIFileSystem = NULL;
+
+
+CBaseVMPIFileSystem* CreateMasterVMPIFileSystem( int maxMemoryUsage, IFileSystem *pPassThru )
+{
+ CMasterVMPIFileSystem *pRet = new CMasterVMPIFileSystem;
+ g_pBaseVMPIFileSystem = pRet;
+ if ( pRet->Init( maxMemoryUsage, pPassThru ) )
+ {
+ return pRet;
+ }
+ else
+ {
+ delete pRet;
+ g_pBaseVMPIFileSystem = NULL;
+ return NULL;
+ }
+}
+
+
+CMasterVMPIFileSystem::CMasterVMPIFileSystem()
+{
+ Assert( !s_pMasterVMPIFileSystem );
+ s_pMasterVMPIFileSystem = this;
+}
+
+
+CMasterVMPIFileSystem::~CMasterVMPIFileSystem()
+{
+ Assert( s_pMasterVMPIFileSystem == this );
+ s_pMasterVMPIFileSystem = NULL;
+}
+
+
+bool CMasterVMPIFileSystem::Init( int maxMemoryUsage, IFileSystem *pPassThru )
+{
+ // Only init the BASE filesystem passthru. Leave the IFileSystem passthru using NULL so it'll crash
+ // immediately if they try to use a function we don't support.
+ InitPassThru( pPassThru, true );
+ m_pMasterVMPIFileSystemPassThru = pPassThru;
+
+ // Pick a random IP in the multicast range (224.0.0.2 to 239.255.255.255);
+ CCycleCount cnt;
+ cnt.Sample();
+ RandomSeed( (int)cnt.GetMicroseconds() );
+
+ int localPort = 23412; // This can be anything.
+
+ unsigned short port = RandomInt( 22000, 25000 );
+ if ( VMPI_GetRunMode() == VMPI_RUN_NETWORKED )
+ {
+ if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_MULTICAST )
+ {
+ m_MulticastIP.port = port;
+ m_MulticastIP.ip[0] = (unsigned char)RandomInt( 225, 238 );
+ m_MulticastIP.ip[1] = (unsigned char)RandomInt( 0, 255 );
+ m_MulticastIP.ip[2] = (unsigned char)RandomInt( 0, 255 );
+ m_MulticastIP.ip[3] = (unsigned char)RandomInt( 3, 255 );
+ }
+ else if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST )
+ {
+ m_MulticastIP.Init( 0xFF, 0xFF, 0xFF, 0xFF, port );
+ }
+ }
+ else
+ {
+ // Doesn't matter.. we don't use the multicast IP in TCP mode.
+ m_MulticastIP.Init( 0, 0, 0, 0, 0 );
+ }
+
+ if ( !m_MasterThread.Init( pPassThru, localPort, &m_MulticastIP, maxMemoryUsage ) )
+ return false;
+
+ // Send out the multicast addr to all the clients.
+ SendMulticastIP( &m_MulticastIP );
+
+ // Make sure we're notified when a client disconnects so we can unlink them from the
+ // multicast thread's structures.
+ VMPI_AddDisconnectHandler( &CMasterVMPIFileSystem::OnClientDisconnect );
+ return true;
+}
+
+
+void CMasterVMPIFileSystem::Term()
+{
+ m_MasterThread.Term();
+}
+
+
+FileHandle_t CMasterVMPIFileSystem::Open( const char *pFilename, const char *pOptions, const char *pPathID )
+{
+ Assert( g_bUseMPI );
+
+ if ( g_bDisableFileAccess )
+ Error( "Open( %s, %s ) - file access has been disabled.", pFilename, pOptions );
+
+ // Use a stdio file if they want to write to it.
+ bool bWriteAccess = IsOpeningForWriteAccess( pOptions );
+ if ( bWriteAccess )
+ {
+ FileHandle_t fp = m_pBaseFileSystemPassThru->Open( pFilename, pOptions, pPathID );
+ if ( fp == FILESYSTEM_INVALID_HANDLE )
+ return FILESYSTEM_INVALID_HANDLE;
+
+ CVMPIFile_PassThru *pFile = new CVMPIFile_PassThru;
+ pFile->Init( m_pBaseFileSystemPassThru, fp );
+ return (FileHandle_t)pFile;
+ }
+
+ // Internally, we require path IDs to be non-null. We'll convert it back to null whenever we make filesystem calls though.
+ if ( !pPathID )
+ pPathID = "";
+
+ // Have our multicast thread load all the data so it's there when workers want it.
+ int iFile = m_MasterThread.FindOrAddFile( pFilename, pPathID );
+ if ( iFile == -1 )
+ return FILESYSTEM_INVALID_HANDLE;
+
+ const CUtlVector<char> &data = m_MasterThread.GetFileData( iFile );
+
+ CVMPIFile_Memory *pFile = new CVMPIFile_Memory;
+ pFile->Init( data.Base(), data.Count(), strchr( pOptions, 't' ) ? 't' : 'b' );
+ return (FileHandle_t)pFile;
+}
+
+
+void CMasterVMPIFileSystem::OnClientDisconnect( int procID, const char *pReason )
+{
+ s_pMasterVMPIFileSystem->m_MasterThread.OnClientDisconnect( procID );
+}
+
+
+void CMasterVMPIFileSystem::CreateVirtualFile( const char *pFilename, const void *pData, int fileLength )
+{
+ m_MasterThread.CreateVirtualFile( pFilename, pData, fileLength );
+}
+
+bool CMasterVMPIFileSystem::HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID )
+{
+ // Handle this packet.
+ int subPacketID = pBuf->data[1];
+ switch( subPacketID )
+ {
+ case VMPI_FSPACKETID_FILE_REQUEST:
+ {
+ int requestID = *((int*)&pBuf->data[2]);
+ const char *pFilename = (const char*)&pBuf->data[6];
+ const char *pPathID = (const char*)pFilename + strlen( pFilename ) + 1;
+
+ if ( g_iVMPIVerboseLevel >= 2 )
+ Msg( "Client %d requested '%s'\n", iSource, pFilename );
+
+ bool bZeroLength;
+ int fileID = m_MasterThread.AddFileRequest( pFilename, pPathID, iSource, &bZeroLength );
+
+ // Send back the file ID.
+ unsigned char cPacket[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_RESPONSE };
+ void *pChunks[4] = { cPacket, &requestID, &fileID, &bZeroLength };
+ int chunkLen[4] = { sizeof( cPacket ), sizeof( requestID ), sizeof( fileID ), sizeof( bZeroLength ) };
+
+ VMPI_SendChunks( pChunks, chunkLen, ARRAYSIZE( pChunks ), iSource );
+ }
+ return true;
+
+ case VMPI_FSPACKETID_CHUNK_RECEIVED:
+ {
+ unsigned short *pFileID = (unsigned short*)&pBuf->data[2];
+ unsigned short *pChunkID = pFileID+1;
+
+ int nChunks = (pBuf->getLen() - 2) / 4;
+ for ( int i=0; i < nChunks; i++ )
+ {
+ m_MasterThread.OnChunkReceived( *pFileID, iSource, *pChunkID );
+ pFileID += 2;
+ pChunkID += 2;
+ }
+ }
+ return true;
+
+ case VMPI_FSPACKETID_FILE_RECEIVED:
+ {
+ unsigned short *pFileID = (unsigned short*)&pBuf->data[2];
+ m_MasterThread.OnFileReceived( *pFileID, iSource );
+ }
+ return true;
+
+ default:
+ return false;
+ }
+}
+
+
+CSysModule* CMasterVMPIFileSystem::LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly )
+{
+ return m_pMasterVMPIFileSystemPassThru->LoadModule( pFileName, pPathID, bValidatedDllOnly );
+}
+
+void CMasterVMPIFileSystem::UnloadModule( CSysModule *pModule )
+{
+ m_pMasterVMPIFileSystemPassThru->UnloadModule( pModule );
+}
+