summaryrefslogtreecommitdiff
path: root/utils/vmpi/vmpi_filesystem_worker.cpp
diff options
context:
space:
mode:
authorFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
committerFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
commit3bf9df6b2785fa6d951086978a3e66f49427166a (patch)
tree2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /utils/vmpi/vmpi_filesystem_worker.cpp
downloadarchived-source-engine-2018-hl2-src-master.tar.xz
archived-source-engine-2018-hl2-src-master.zip
Diffstat (limited to 'utils/vmpi/vmpi_filesystem_worker.cpp')
-rw-r--r--utils/vmpi/vmpi_filesystem_worker.cpp815
1 files changed, 815 insertions, 0 deletions
diff --git a/utils/vmpi/vmpi_filesystem_worker.cpp b/utils/vmpi/vmpi_filesystem_worker.cpp
new file mode 100644
index 0000000..ef5820d
--- /dev/null
+++ b/utils/vmpi/vmpi_filesystem_worker.cpp
@@ -0,0 +1,815 @@
+//========= Copyright Valve Corporation, All rights reserved. ============//
+//
+// Purpose:
+//
+//=============================================================================//
+
+#include <winsock2.h>
+#include "vmpi_filesystem_internal.h"
+#include "threadhelpers.h"
+#include "zlib.h"
+
+
+#define NUM_BUFFERED_CHUNK_ACKS 512
+#define ACK_FLUSH_INTERVAL 500 // Flush the ack queue twice per second.
+
+
+static bool g_bReceivedMulticastIP = false;
+static CIPAddr g_MulticastIP;
+
+
+CCriticalSection g_FileResponsesCS;
+
+class CFileResponse
+{
+public:
+ int m_RequestID;
+ int m_Response;
+ bool m_bZeroLength;
+};
+
+CUtlVector<CFileResponse> g_FileResponses;
+int g_RequestID = 0;
+
+
+class CFileChunkPacket
+{
+public:
+ int m_Len;
+ char m_Data[1];
+};
+CUtlLinkedList<CFileChunkPacket*, int> g_FileChunkPackets; // This is also protected by g_FileResponsesCS.
+
+
+// ------------------------------------------------------------------------------------------------------------------------ //
+// Classes.
+// ------------------------------------------------------------------------------------------------------------------------ //
+
+class CWorkerFile
+{
+public:
+ const char* GetFilename() { return m_Filename.Base(); }
+ const char* GetPathID() { return m_PathID.Base(); }
+ bool IsReadyToRead() const { return m_nChunksToReceive == 0; }
+
+
+public:
+ CFastTimer m_Timer; // To see how long it takes to download the file.
+
+ // This has to be sent explicitly as part of the file info or else the protocol
+ // breaks on empty files.
+ bool m_bZeroLength;
+
+ // This is false until we get any packets about the file. In the packets,
+ // we find out what the size is supposed to be.
+ bool m_bGotCompressedSize;
+
+ // The ID the master uses to refer to this file.
+ int m_FileID;
+
+ CUtlVector<char> m_Filename;
+ CUtlVector<char> m_PathID;
+
+ // First data comes in here, then when it's all there, it is inflated into m_UncompressedData.
+ CUtlVector<char> m_CompressedData;
+
+ // 1 bit for each chunk.
+ CUtlVector<unsigned char> m_ChunksReceived;
+
+ // When this is zero, the file is done being received and m_UncompressedData is valid.
+ int m_nChunksToReceive;
+ CUtlVector<char> m_UncompressedData;
+};
+
+
+
+// ------------------------------------------------------------------------------------------------------------------------ //
+// Global helpers.
+// ------------------------------------------------------------------------------------------------------------------------ //
+
+static void RecvMulticastIP( CIPAddr *pAddr )
+{
+ while ( !g_bReceivedMulticastIP )
+ VMPI_DispatchNextMessage();
+
+ *pAddr = g_MulticastIP;
+}
+
+
+static bool ZLibDecompress( const void *pInput, int inputLen, void *pOut, int outLen )
+{
+ if ( inputLen == 0 )
+ {
+ // Zero-length file?
+ return true;
+ }
+
+ z_stream decompressStream;
+
+ // Initialize the decompression stream.
+ memset( &decompressStream, 0, sizeof( decompressStream ) );
+ if ( inflateInit( &decompressStream ) != Z_OK )
+ return false;
+
+ // Decompress all this stuff and write it to the file.
+ decompressStream.next_in = (unsigned char*)pInput;
+ decompressStream.avail_in = inputLen;
+
+ char *pOutChar = (char*)pOut;
+ while ( decompressStream.avail_in )
+ {
+ decompressStream.total_out = 0;
+ decompressStream.next_out = (unsigned char*)pOutChar;
+ decompressStream.avail_out = outLen - (pOutChar - (char*)pOut);
+
+ int ret = inflate( &decompressStream, Z_NO_FLUSH );
+ if ( ret != Z_OK && ret != Z_STREAM_END )
+ return false;
+
+
+ pOutChar += decompressStream.total_out;
+
+ if ( ret == Z_STREAM_END )
+ {
+ if ( (pOutChar - (char*)pOut) == outLen )
+ {
+ return true;
+ }
+ else
+ {
+ Assert( false );
+ return false;
+ }
+ }
+ }
+
+ Assert( false ); // Should have gotten to Z_STREAM_END.
+ return false;
+}
+
+
+// ------------------------------------------------------------------------------------------------------------------------ //
+// CWorkerMulticastListener implementation.
+// ------------------------------------------------------------------------------------------------------------------------ //
+
+class CWorkerMulticastListener
+{
+public:
+ CWorkerMulticastListener()
+ {
+ m_nUnfinishedFiles = 0;
+ }
+
+ ~CWorkerMulticastListener()
+ {
+ Term();
+ }
+
+ bool Init( const CIPAddr &mcAddr )
+ {
+ m_MulticastAddr = mcAddr;
+ m_hMainThread = GetCurrentThread();
+ return true;
+ }
+
+ void Term()
+ {
+ m_WorkerFiles.PurgeAndDeleteElements();
+ }
+
+
+ CWorkerFile* RequestFileFromServer( const char *pFilename, const char *pPathID )
+ {
+ Assert( pPathID );
+ Assert( FindWorkerFile( pFilename, pPathID ) == NULL );
+
+ // Send a request to the master to find out if this file even exists.
+ CCriticalSectionLock csLock( &g_FileResponsesCS );
+ csLock.Lock();
+ int requestID = g_RequestID++;
+ csLock.Unlock();
+
+ unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_REQUEST };
+ const void *pChunks[4] = { packetID, &requestID, (void*)pFilename, pPathID };
+ int chunkLengths[4] = { sizeof( packetID ), sizeof( requestID ), strlen( pFilename ) + 1, strlen( pPathID ) + 1 };
+ VMPI_SendChunks( pChunks, chunkLengths, ARRAYSIZE( pChunks ), 0 );
+
+ // Wait for the file ID to come back.
+ CFileResponse response;
+ response.m_Response = -1;
+ response.m_bZeroLength = true;
+
+ // We're in a worker thread.. the main thread should be dispatching all the messages, so let it
+ // do that until we get our response.
+ while ( 1 )
+ {
+ bool bGotIt = false;
+ csLock.Lock();
+ for ( int iResponse=0; iResponse < g_FileResponses.Count(); iResponse++ )
+ {
+ if ( g_FileResponses[iResponse].m_RequestID == requestID )
+ {
+ response = g_FileResponses[iResponse];
+ g_FileResponses.Remove( iResponse );
+ bGotIt = true;
+ break;
+ }
+ }
+ csLock.Unlock();
+
+ if ( bGotIt )
+ break;
+
+ if ( GetCurrentThread() == m_hMainThread )
+ VMPI_DispatchNextMessage( 20 );
+ else
+ Sleep( 20 );
+ }
+
+ // If we get -1 back, it means the file doesn't exist.
+ int fileID = response.m_Response;
+ if ( fileID == -1 )
+ return NULL;
+
+ CWorkerFile *pTestFile = new CWorkerFile;
+
+ pTestFile->m_Filename.SetSize( strlen( pFilename ) + 1 );
+ strcpy( pTestFile->m_Filename.Base(), pFilename );
+
+ pTestFile->m_PathID.SetSize( strlen( pPathID ) + 1 );
+ strcpy( pTestFile->m_PathID.Base(), pPathID );
+
+ pTestFile->m_FileID = fileID;
+ pTestFile->m_nChunksToReceive = 9999;
+ pTestFile->m_Timer.Start();
+ m_WorkerFiles.AddToTail( pTestFile );
+ pTestFile->m_bGotCompressedSize = false;
+ pTestFile->m_bZeroLength = response.m_bZeroLength;
+
+ ++m_nUnfinishedFiles;
+
+ return pTestFile;
+ }
+
+ void FlushAckChunks( unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], int &nChunksToAck, DWORD &lastAckTime )
+ {
+ if ( nChunksToAck )
+ {
+ // Tell the master we received this chunk.
+ unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_CHUNK_RECEIVED };
+ void *pChunks[2] = { packetID, chunksToAck };
+ int chunkLengths[2] = { sizeof( packetID ), nChunksToAck * 4 };
+ VMPI_SendChunks( pChunks, chunkLengths, 2, 0 );
+ nChunksToAck = 0;
+ }
+
+ lastAckTime = GetTickCount();
+ }
+
+ void MaybeFlushAckChunks( unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], int &nChunksToAck, DWORD &lastAckTime )
+ {
+ if ( nChunksToAck && GetTickCount() - lastAckTime > ACK_FLUSH_INTERVAL )
+ FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
+ }
+
+ void AddAckChunk(
+ unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2],
+ int &nChunksToAck,
+ DWORD &lastAckTime,
+ int fileID,
+ int iChunk )
+ {
+ chunksToAck[nChunksToAck][0] = (unsigned short)fileID;
+ chunksToAck[nChunksToAck][1] = (unsigned short)iChunk;
+
+ // TCP filesystem acks all chunks immediately so it'll send more.
+ ++nChunksToAck;
+ if ( nChunksToAck == NUM_BUFFERED_CHUNK_ACKS || VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP )
+ {
+ FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
+ }
+ }
+
+ // Returns the length of the packet's data or -1 if there is nothing.
+ int CheckFileChunkPackets( char *data, int dataSize )
+ {
+ // Using TCP.. pop the next received packet off the stack.
+ CCriticalSectionLock csLock( &g_FileResponsesCS );
+ csLock.Lock();
+ if ( g_FileChunkPackets.Count() <= 0 )
+ return -1;
+
+ CFileChunkPacket *pPacket = g_FileChunkPackets[ g_FileChunkPackets.Head() ];
+ g_FileChunkPackets.Remove( g_FileChunkPackets.Head() );
+
+ // Yes, this is inefficient, but the amount of data we're handling here is tiny so the
+ // effect is negligible.
+ int len;
+ if ( pPacket->m_Len > dataSize )
+ {
+ len = -1;
+ Warning( "CWorkerMulticastListener::ListenFor: Got a section of data too long (%d bytes).", pPacket->m_Len );
+ }
+ else
+ {
+ memcpy( data, pPacket->m_Data, pPacket->m_Len );
+ len = pPacket->m_Len;
+ }
+
+ free( pPacket );
+ return len;
+ }
+
+ void ShowSDKWorkerMsg( const char *pMsg, ... )
+ {
+ if ( !g_bMPIMaster && VMPI_IsSDKMode() )
+ {
+ va_list marker;
+ va_start( marker, pMsg );
+ char str[4096];
+ V_vsnprintf( str, sizeof( str ), pMsg, marker );
+ va_end( marker );
+ Msg( "%s", str );
+ }
+ }
+
+ // This is the main function the workers use to pick files out of the multicast stream.
+ // The app is waiting for a specific file, but we receive and ack any files we can until
+ // we get the file they're looking for, then we return.
+ //
+ // NOTE: ideally, this would be in a thread, but it adds lots of complications and may
+ // not be worth it.
+ CWorkerFile* ListenFor( const char *pFilename, const char *pPathID )
+ {
+ CWorkerFile *pFile = FindWorkerFile( pFilename, pPathID );
+ if ( !pFile )
+ {
+ // Ok, we haven't requested this file yet. Create an entry for it and
+ // tell the master we'd like this file.
+ pFile = RequestFileFromServer( pFilename, pPathID );
+ if ( !pFile )
+ return NULL;
+
+ // If it's zero-length, we can return right now.
+ if ( pFile->m_bZeroLength )
+ {
+ --m_nUnfinishedFiles;
+ return pFile;
+ }
+ }
+
+ // Setup a filename to print some debug spew with.
+ char printableFilename[58];
+ if ( V_strlen( pFilename ) > ARRAYSIZE( printableFilename ) - 1 )
+ {
+ V_strncpy( printableFilename, "[...]", sizeof( printableFilename ) );
+ V_strncat( printableFilename, &pFilename[V_strlen(pFilename) - ARRAYSIZE(printableFilename) + 1 + V_strlen(printableFilename)], sizeof( printableFilename ) );
+ }
+ else
+ {
+ V_strncpy( printableFilename, pFilename, sizeof( printableFilename ) );
+ }
+ ShowSDKWorkerMsg( "\rRecv %s (0%%) ", printableFilename );
+ int iChunkPayloadSize = VMPI_GetChunkPayloadSize();
+
+ // Now start listening to the stream.
+ // Note: no need to setup anything when in TCP mode - we just use the regular
+ // VMPI dispatch stuff to handle that.
+ ISocket *pSocket = NULL;
+ if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_MULTICAST )
+ {
+ pSocket = CreateMulticastListenSocket( m_MulticastAddr );
+
+ if ( !pSocket )
+ {
+ char str[512];
+ IP_GetLastErrorString( str, sizeof( str ) );
+ Warning( "CreateMulticastListenSocket (%d.%d.%d.%d:%d) failed\n%s\n", EXPAND_ADDR( m_MulticastAddr ), str );
+ return NULL;
+ }
+ }
+ else if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST )
+ {
+ pSocket = CreateIPSocket();
+ if ( !pSocket->BindToAny( m_MulticastAddr.port ) )
+ {
+ pSocket->Release();
+ pSocket = NULL;
+ }
+ }
+
+ unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2];
+ int nChunksToAck = 0;
+ DWORD lastAckTime = GetTickCount();
+
+ // Now just receive multicast data until this file has been received.
+ while ( m_nUnfinishedFiles > 0 )
+ {
+ char data[MAX_CHUNK_PAYLOAD_SIZE+1024];
+ int len = -1;
+
+ if ( pSocket )
+ {
+ CIPAddr ipFrom;
+ len = pSocket->RecvFrom( data, sizeof( data ), &ipFrom );
+ }
+ else
+ {
+ len = CheckFileChunkPackets( data, sizeof( data ) );
+ }
+
+ if ( len == -1 )
+ {
+ // Sleep for 10ms and also handle socket errors.
+ Sleep( 0 );
+ VMPI_DispatchNextMessage( 10 );
+ continue;
+ }
+
+ g_nMulticastBytesReceived += len;
+
+ // Alrighty. Figure out what the deal is with this file.
+ CMulticastFileInfo *pInfo = (CMulticastFileInfo*)data;
+ int *piChunk = (int*)( pInfo + 1 );
+ const char *pTestFilename = (const char*)( piChunk + 1 );
+ const char *pPayload = pTestFilename + strlen( pFilename ) + 1;
+ int payloadLen = len - ( pPayload - data );
+ if ( payloadLen < 0 )
+ {
+ Warning( "CWorkerMulticastListener::ListenFor: invalid packet received on multicast group\n" );
+ continue;
+ }
+
+
+ if ( pInfo->m_FileID != pFile->m_FileID )
+ continue;
+
+ CWorkerFile *pTestFile = FindWorkerFile( pInfo->m_FileID );
+ if ( !pTestFile )
+ Error( "FindWorkerFile( %s ) failed\n", pTestFilename );
+
+ // TODO: reenable this code and disable the if right above here.
+ // We always get "invalid payload length" errors on the workers when using this, but
+ // I haven't been able to figure out why yet.
+ /*
+ // Put the data into whatever file it belongs in.
+ if ( !pTestFile )
+ {
+ pTestFile = RequestFileFromServer( pTestFilename );
+ if ( !pTestFile )
+ continue;
+ }
+ */
+
+ // Is this the first packet about this file?
+ if ( !pTestFile->m_bGotCompressedSize )
+ {
+ pTestFile->m_bGotCompressedSize = true;
+ pTestFile->m_CompressedData.SetSize( pInfo->m_CompressedSize );
+ pTestFile->m_UncompressedData.SetSize( pInfo->m_UncompressedSize );
+ pTestFile->m_ChunksReceived.SetSize( PAD_NUMBER( pInfo->m_nChunks, 8 ) / 8 );
+ pTestFile->m_nChunksToReceive = pInfo->m_nChunks;
+ memset( pTestFile->m_ChunksReceived.Base(), 0, pTestFile->m_ChunksReceived.Count() );
+ }
+
+ // Validate the chunk index and uncompressed size.
+ int iChunk = *piChunk;
+ if ( iChunk < 0 || iChunk >= pInfo->m_nChunks )
+ {
+ Error( "ListenFor(): invalid chunk index (%d) for file '%s'\n", iChunk, pTestFilename );
+ }
+
+ // Only handle this if we didn't already received the chunk.
+ if ( !(pTestFile->m_ChunksReceived[iChunk >> 3] & (1 << (iChunk & 7))) )
+ {
+ // Make sure the file is properly setup to receive the data into.
+ if ( (int)pInfo->m_UncompressedSize != pTestFile->m_UncompressedData.Count() ||
+ (int)pInfo->m_CompressedSize != pTestFile->m_CompressedData.Count() )
+ {
+ Error( "ListenFor(): invalid compressed or uncompressed size.\n"
+ "pInfo = '%s', pTestFile = '%s'\n"
+ "Compressed (pInfo = %d, pTestFile = %d)\n"
+ "Uncompressed (pInfo = %d, pTestFile = %d)\n",
+ pTestFilename,
+ pTestFile->GetFilename(),
+ pInfo->m_CompressedSize,
+ pTestFile->m_CompressedData.Count(),
+ pInfo->m_UncompressedSize,
+ pTestFile->m_UncompressedData.Count()
+ );
+ }
+
+ int iChunkStart = iChunk * iChunkPayloadSize;
+ int iChunkEnd = min( iChunkStart + iChunkPayloadSize, pTestFile->m_CompressedData.Count() );
+ int chunkLen = iChunkEnd - iChunkStart;
+
+ if ( chunkLen != payloadLen )
+ {
+ Error( "ListenFor(): invalid payload length for '%s' (%d should be %d)\n"
+ "pInfo = '%s', pTestFile = '%s'\n"
+ "Chunk %d out of %d. Compressed size: %d\n",
+ pTestFile->GetFilename(),
+ payloadLen,
+ chunkLen,
+ pTestFilename,
+ pTestFile->GetFilename(),
+ iChunk,
+ pInfo->m_nChunks,
+ pInfo->m_CompressedSize
+ );
+ }
+
+ memcpy( &pTestFile->m_CompressedData[iChunkStart], pPayload, chunkLen );
+ pTestFile->m_ChunksReceived[iChunk >> 3] |= (1 << (iChunk & 7));
+
+ --pTestFile->m_nChunksToReceive;
+
+ if ( pTestFile == pFile )
+ {
+ int percent = 100 - (100 * pFile->m_nChunksToReceive) / pInfo->m_nChunks;
+ ShowSDKWorkerMsg( "\rRecv %s (%d%%) [chunk %d/%d] ", printableFilename, percent, pInfo->m_nChunks - pFile->m_nChunksToReceive, pInfo->m_nChunks );
+ }
+
+ // Remember to ack what we received.
+ AddAckChunk( chunksToAck, nChunksToAck, lastAckTime, pInfo->m_FileID, iChunk );
+
+ // If we're done receiving the data, unpack it.
+ if ( pTestFile->m_nChunksToReceive == 0 )
+ {
+ // Ack the file.
+ FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
+
+ pTestFile->m_Timer.End();
+
+ pTestFile->m_UncompressedData.SetSize( pInfo->m_UncompressedSize );
+ --m_nUnfinishedFiles;
+
+ if ( !ZLibDecompress(
+ pTestFile->m_CompressedData.Base(),
+ pTestFile->m_CompressedData.Count(),
+ pTestFile->m_UncompressedData.Base(),
+ pTestFile->m_UncompressedData.Count() ) )
+ {
+ if ( pSocket )
+ pSocket->Release();
+ FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
+ Error( "ZLibDecompress failed.\n" );
+ return NULL;
+ }
+
+ char str[512];
+ V_snprintf( str, sizeof( str ), "Got %s (%dk) in %.2fs",
+ printableFilename,
+ (pTestFile->m_UncompressedData.Count() + 511) / 1024,
+ pTestFile->m_Timer.GetDuration().GetSeconds()
+ );
+ Msg( "\r%-79s\n", str );
+
+ // Won't be needing this anymore.
+ pTestFile->m_CompressedData.Purge();
+ }
+ }
+
+ MaybeFlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
+ }
+
+ Assert( pFile->IsReadyToRead() );
+ FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime );
+ if ( pSocket )
+ pSocket->Release();
+
+ return pFile;
+ }
+
+ CWorkerFile* FindWorkerFile( const char *pFilename, const char *pPathID )
+ {
+ FOR_EACH_LL( m_WorkerFiles, i )
+ {
+ CWorkerFile *pWorkerFile = m_WorkerFiles[i];
+
+ if ( stricmp( pWorkerFile->GetFilename(), pFilename ) == 0 && stricmp( pWorkerFile->GetPathID(), pPathID ) == 0 )
+ return pWorkerFile;
+ }
+ return NULL;
+ }
+
+ CWorkerFile* FindWorkerFile( int fileID )
+ {
+ FOR_EACH_LL( m_WorkerFiles, i )
+ {
+ if ( m_WorkerFiles[i]->m_FileID == fileID )
+ return m_WorkerFiles[i];
+ }
+ return NULL;
+ }
+
+
+private:
+ CIPAddr m_MulticastAddr;
+
+ CUtlLinkedList<CWorkerFile*, int> m_WorkerFiles;
+
+ HANDLE m_hMainThread;
+
+ // How many files do we have open that we haven't finished receiving from the server yet?
+ // We always keep waiting for data until this is zero.
+ int m_nUnfinishedFiles;
+};
+
+
+
+// ------------------------------------------------------------------------------------------------------------------------ //
+// CWorkerVMPIFileSystem implementation.
+// ------------------------------------------------------------------------------------------------------------------------ //
+
+class CWorkerVMPIFileSystem : public CBaseVMPIFileSystem
+{
+public:
+ InitReturnVal_t Init();
+ virtual void Term();
+
+ virtual FileHandle_t Open( const char *pFilename, const char *pOptions, const char *pathID );
+ virtual bool HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID );
+
+ virtual void CreateVirtualFile( const char *pFilename, const void *pData, int fileLength );
+ virtual long GetFileTime( const char *pFileName, const char *pathID );
+ virtual bool IsFileWritable( const char *pFileName, const char *pPathID );
+ virtual bool SetFileWritable( char const *pFileName, bool writable, const char *pPathID );
+
+ virtual CSysModule *LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly );
+ virtual void UnloadModule( CSysModule *pModule );
+
+private:
+ CWorkerMulticastListener m_Listener;
+};
+
+
+CBaseVMPIFileSystem* CreateWorkerVMPIFileSystem()
+{
+ CWorkerVMPIFileSystem *pRet = new CWorkerVMPIFileSystem;
+ g_pBaseVMPIFileSystem = pRet;
+ if ( pRet->Init() )
+ {
+ return pRet;
+ }
+ else
+ {
+ delete pRet;
+ g_pBaseVMPIFileSystem = NULL;
+ return NULL;
+ }
+}
+
+
+InitReturnVal_t CWorkerVMPIFileSystem::Init()
+{
+ // Get the multicast addr to listen on.
+ CIPAddr mcAddr;
+ RecvMulticastIP( &mcAddr );
+
+ return m_Listener.Init( mcAddr ) ? INIT_OK : INIT_FAILED;
+}
+
+
+void CWorkerVMPIFileSystem::Term()
+{
+ m_Listener.Term();
+}
+
+
+FileHandle_t CWorkerVMPIFileSystem::Open( const char *pFilename, const char *pOptions, const char *pathID )
+{
+ Assert( g_bUseMPI );
+
+ // When it finally asks the filesystem for a file, it'll pass NULL for pathID if it's "".
+ if ( !pathID )
+ pathID = "";
+
+ if ( g_bDisableFileAccess )
+ Error( "Open( %s, %s ) - file access has been disabled.", pFilename, pOptions );
+
+ // Workers can't open anything for write access.
+ bool bWriteAccess = (Q_stristr( pOptions, "w" ) != 0);
+ if ( bWriteAccess )
+ return FILESYSTEM_INVALID_HANDLE;
+
+ // Do we have this file's data already?
+ CWorkerFile *pFile = m_Listener.FindWorkerFile( pFilename, pathID );
+ if ( !pFile || !pFile->IsReadyToRead() )
+ {
+ // Ok, start listening to the multicast stream until we get the file we want.
+
+ // NOTE: it might make sense here to have the client ask for a list of ALL the files that
+ // the master currently has and wait to receive all of them (so we don't come back a bunch
+ // of times and listen
+
+ // NOTE NOTE: really, the best way to do this is to have a thread on the workers that sits there
+ // and listens to the multicast stream. Any time the master opens a new file up, it assumes
+ // all the workers need the file, and it starts to send it on the multicast stream until
+ // the worker threads respond that they all have it.
+ //
+ // (NOTE: this probably means that the clients would have to ack the chunks on a UDP socket that
+ // the thread owns).
+ //
+ // This would simplify all the worries about a client missing half the stream and having to
+ // wait for another cycle through it.
+ pFile = m_Listener.ListenFor( pFilename, pathID );
+
+ if ( !pFile )
+ {
+ return FILESYSTEM_INVALID_HANDLE;
+ }
+ }
+
+ // Ok! Got the file. now setup a memory stream they can read out of it with.
+ CVMPIFile_Memory *pOut = new CVMPIFile_Memory;
+ pOut->Init( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), strchr( pOptions, 't' ) ? 't' : 'b' );
+ return (FileHandle_t)pOut;
+}
+
+
+void CWorkerVMPIFileSystem::CreateVirtualFile( const char *pFilename, const void *pData, int fileLength )
+{
+ Error( "CreateVirtualFile not supported in VMPI worker filesystem." );
+}
+
+
+long CWorkerVMPIFileSystem::GetFileTime( const char *pFileName, const char *pathID )
+{
+ Error( "GetFileTime not supported in VMPI worker filesystem." );
+ return 0;
+}
+
+
+bool CWorkerVMPIFileSystem::IsFileWritable( const char *pFileName, const char *pPathID )
+{
+ Error( "GetFileTime not supported in VMPI worker filesystem." );
+ return false;
+}
+
+
+bool CWorkerVMPIFileSystem::SetFileWritable( char const *pFileName, bool writable, const char *pPathID )
+{
+ Error( "GetFileTime not supported in VMPI worker filesystem." );
+ return false;
+}
+
+bool CWorkerVMPIFileSystem::HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID )
+{
+ // Handle this packet.
+ int subPacketID = pBuf->data[1];
+ switch( subPacketID )
+ {
+ case VMPI_FSPACKETID_MULTICAST_ADDR:
+ {
+ char *pInPos = &pBuf->data[2];
+
+ g_MulticastIP = *((CIPAddr*)pInPos);
+ pInPos += sizeof( g_MulticastIP );
+
+ g_bReceivedMulticastIP = true;
+ }
+ return true;
+
+ case VMPI_FSPACKETID_FILE_RESPONSE:
+ {
+ CCriticalSectionLock csLock( &g_FileResponsesCS );
+ csLock.Lock();
+
+ CFileResponse res;
+ res.m_RequestID = *((int*)&pBuf->data[2]);
+ res.m_Response = *((int*)&pBuf->data[6]);
+ res.m_bZeroLength = *((bool*)&pBuf->data[10]);
+
+ g_FileResponses.AddToTail( res );
+ }
+ return true;
+
+ case VMPI_FSPACKETID_FILE_CHUNK:
+ {
+ int nDataBytes = pBuf->getLen() - 2;
+
+ CFileChunkPacket *pPacket = (CFileChunkPacket*)malloc( sizeof( CFileChunkPacket ) + nDataBytes - 1 );
+ memcpy( pPacket->m_Data, &pBuf->data[2], nDataBytes );
+ pPacket->m_Len = nDataBytes;
+
+ CCriticalSectionLock csLock( &g_FileResponsesCS );
+ csLock.Lock();
+ g_FileChunkPackets.AddToTail( pPacket );
+ }
+ return true;
+
+ default:
+ return false;
+ }
+}
+
+CSysModule* CWorkerVMPIFileSystem::LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly )
+{
+ return Sys_LoadModule( pFileName );
+}
+
+void CWorkerVMPIFileSystem::UnloadModule( CSysModule *pModule )
+{
+ Sys_UnloadModule( pModule );
+}