diff options
| author | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
|---|---|---|
| committer | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
| commit | 3bf9df6b2785fa6d951086978a3e66f49427166a (patch) | |
| tree | 2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /utils/vmpi/vmpi_filesystem_worker.cpp | |
| download | archived-source-engine-2018-hl2-src-master.tar.xz archived-source-engine-2018-hl2-src-master.zip | |
Diffstat (limited to 'utils/vmpi/vmpi_filesystem_worker.cpp')
| -rw-r--r-- | utils/vmpi/vmpi_filesystem_worker.cpp | 815 |
1 files changed, 815 insertions, 0 deletions
diff --git a/utils/vmpi/vmpi_filesystem_worker.cpp b/utils/vmpi/vmpi_filesystem_worker.cpp new file mode 100644 index 0000000..ef5820d --- /dev/null +++ b/utils/vmpi/vmpi_filesystem_worker.cpp @@ -0,0 +1,815 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +//=============================================================================// + +#include <winsock2.h> +#include "vmpi_filesystem_internal.h" +#include "threadhelpers.h" +#include "zlib.h" + + +#define NUM_BUFFERED_CHUNK_ACKS 512 +#define ACK_FLUSH_INTERVAL 500 // Flush the ack queue twice per second. + + +static bool g_bReceivedMulticastIP = false; +static CIPAddr g_MulticastIP; + + +CCriticalSection g_FileResponsesCS; + +class CFileResponse +{ +public: + int m_RequestID; + int m_Response; + bool m_bZeroLength; +}; + +CUtlVector<CFileResponse> g_FileResponses; +int g_RequestID = 0; + + +class CFileChunkPacket +{ +public: + int m_Len; + char m_Data[1]; +}; +CUtlLinkedList<CFileChunkPacket*, int> g_FileChunkPackets; // This is also protected by g_FileResponsesCS. + + +// ------------------------------------------------------------------------------------------------------------------------ // +// Classes. +// ------------------------------------------------------------------------------------------------------------------------ // + +class CWorkerFile +{ +public: + const char* GetFilename() { return m_Filename.Base(); } + const char* GetPathID() { return m_PathID.Base(); } + bool IsReadyToRead() const { return m_nChunksToReceive == 0; } + + +public: + CFastTimer m_Timer; // To see how long it takes to download the file. + + // This has to be sent explicitly as part of the file info or else the protocol + // breaks on empty files. + bool m_bZeroLength; + + // This is false until we get any packets about the file. In the packets, + // we find out what the size is supposed to be. + bool m_bGotCompressedSize; + + // The ID the master uses to refer to this file. + int m_FileID; + + CUtlVector<char> m_Filename; + CUtlVector<char> m_PathID; + + // First data comes in here, then when it's all there, it is inflated into m_UncompressedData. + CUtlVector<char> m_CompressedData; + + // 1 bit for each chunk. + CUtlVector<unsigned char> m_ChunksReceived; + + // When this is zero, the file is done being received and m_UncompressedData is valid. + int m_nChunksToReceive; + CUtlVector<char> m_UncompressedData; +}; + + + +// ------------------------------------------------------------------------------------------------------------------------ // +// Global helpers. +// ------------------------------------------------------------------------------------------------------------------------ // + +static void RecvMulticastIP( CIPAddr *pAddr ) +{ + while ( !g_bReceivedMulticastIP ) + VMPI_DispatchNextMessage(); + + *pAddr = g_MulticastIP; +} + + +static bool ZLibDecompress( const void *pInput, int inputLen, void *pOut, int outLen ) +{ + if ( inputLen == 0 ) + { + // Zero-length file? + return true; + } + + z_stream decompressStream; + + // Initialize the decompression stream. + memset( &decompressStream, 0, sizeof( decompressStream ) ); + if ( inflateInit( &decompressStream ) != Z_OK ) + return false; + + // Decompress all this stuff and write it to the file. + decompressStream.next_in = (unsigned char*)pInput; + decompressStream.avail_in = inputLen; + + char *pOutChar = (char*)pOut; + while ( decompressStream.avail_in ) + { + decompressStream.total_out = 0; + decompressStream.next_out = (unsigned char*)pOutChar; + decompressStream.avail_out = outLen - (pOutChar - (char*)pOut); + + int ret = inflate( &decompressStream, Z_NO_FLUSH ); + if ( ret != Z_OK && ret != Z_STREAM_END ) + return false; + + + pOutChar += decompressStream.total_out; + + if ( ret == Z_STREAM_END ) + { + if ( (pOutChar - (char*)pOut) == outLen ) + { + return true; + } + else + { + Assert( false ); + return false; + } + } + } + + Assert( false ); // Should have gotten to Z_STREAM_END. + return false; +} + + +// ------------------------------------------------------------------------------------------------------------------------ // +// CWorkerMulticastListener implementation. +// ------------------------------------------------------------------------------------------------------------------------ // + +class CWorkerMulticastListener +{ +public: + CWorkerMulticastListener() + { + m_nUnfinishedFiles = 0; + } + + ~CWorkerMulticastListener() + { + Term(); + } + + bool Init( const CIPAddr &mcAddr ) + { + m_MulticastAddr = mcAddr; + m_hMainThread = GetCurrentThread(); + return true; + } + + void Term() + { + m_WorkerFiles.PurgeAndDeleteElements(); + } + + + CWorkerFile* RequestFileFromServer( const char *pFilename, const char *pPathID ) + { + Assert( pPathID ); + Assert( FindWorkerFile( pFilename, pPathID ) == NULL ); + + // Send a request to the master to find out if this file even exists. + CCriticalSectionLock csLock( &g_FileResponsesCS ); + csLock.Lock(); + int requestID = g_RequestID++; + csLock.Unlock(); + + unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_FILE_REQUEST }; + const void *pChunks[4] = { packetID, &requestID, (void*)pFilename, pPathID }; + int chunkLengths[4] = { sizeof( packetID ), sizeof( requestID ), strlen( pFilename ) + 1, strlen( pPathID ) + 1 }; + VMPI_SendChunks( pChunks, chunkLengths, ARRAYSIZE( pChunks ), 0 ); + + // Wait for the file ID to come back. + CFileResponse response; + response.m_Response = -1; + response.m_bZeroLength = true; + + // We're in a worker thread.. the main thread should be dispatching all the messages, so let it + // do that until we get our response. + while ( 1 ) + { + bool bGotIt = false; + csLock.Lock(); + for ( int iResponse=0; iResponse < g_FileResponses.Count(); iResponse++ ) + { + if ( g_FileResponses[iResponse].m_RequestID == requestID ) + { + response = g_FileResponses[iResponse]; + g_FileResponses.Remove( iResponse ); + bGotIt = true; + break; + } + } + csLock.Unlock(); + + if ( bGotIt ) + break; + + if ( GetCurrentThread() == m_hMainThread ) + VMPI_DispatchNextMessage( 20 ); + else + Sleep( 20 ); + } + + // If we get -1 back, it means the file doesn't exist. + int fileID = response.m_Response; + if ( fileID == -1 ) + return NULL; + + CWorkerFile *pTestFile = new CWorkerFile; + + pTestFile->m_Filename.SetSize( strlen( pFilename ) + 1 ); + strcpy( pTestFile->m_Filename.Base(), pFilename ); + + pTestFile->m_PathID.SetSize( strlen( pPathID ) + 1 ); + strcpy( pTestFile->m_PathID.Base(), pPathID ); + + pTestFile->m_FileID = fileID; + pTestFile->m_nChunksToReceive = 9999; + pTestFile->m_Timer.Start(); + m_WorkerFiles.AddToTail( pTestFile ); + pTestFile->m_bGotCompressedSize = false; + pTestFile->m_bZeroLength = response.m_bZeroLength; + + ++m_nUnfinishedFiles; + + return pTestFile; + } + + void FlushAckChunks( unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], int &nChunksToAck, DWORD &lastAckTime ) + { + if ( nChunksToAck ) + { + // Tell the master we received this chunk. + unsigned char packetID[2] = { VMPI_PACKETID_FILESYSTEM, VMPI_FSPACKETID_CHUNK_RECEIVED }; + void *pChunks[2] = { packetID, chunksToAck }; + int chunkLengths[2] = { sizeof( packetID ), nChunksToAck * 4 }; + VMPI_SendChunks( pChunks, chunkLengths, 2, 0 ); + nChunksToAck = 0; + } + + lastAckTime = GetTickCount(); + } + + void MaybeFlushAckChunks( unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], int &nChunksToAck, DWORD &lastAckTime ) + { + if ( nChunksToAck && GetTickCount() - lastAckTime > ACK_FLUSH_INTERVAL ) + FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); + } + + void AddAckChunk( + unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2], + int &nChunksToAck, + DWORD &lastAckTime, + int fileID, + int iChunk ) + { + chunksToAck[nChunksToAck][0] = (unsigned short)fileID; + chunksToAck[nChunksToAck][1] = (unsigned short)iChunk; + + // TCP filesystem acks all chunks immediately so it'll send more. + ++nChunksToAck; + if ( nChunksToAck == NUM_BUFFERED_CHUNK_ACKS || VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_TCP ) + { + FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); + } + } + + // Returns the length of the packet's data or -1 if there is nothing. + int CheckFileChunkPackets( char *data, int dataSize ) + { + // Using TCP.. pop the next received packet off the stack. + CCriticalSectionLock csLock( &g_FileResponsesCS ); + csLock.Lock(); + if ( g_FileChunkPackets.Count() <= 0 ) + return -1; + + CFileChunkPacket *pPacket = g_FileChunkPackets[ g_FileChunkPackets.Head() ]; + g_FileChunkPackets.Remove( g_FileChunkPackets.Head() ); + + // Yes, this is inefficient, but the amount of data we're handling here is tiny so the + // effect is negligible. + int len; + if ( pPacket->m_Len > dataSize ) + { + len = -1; + Warning( "CWorkerMulticastListener::ListenFor: Got a section of data too long (%d bytes).", pPacket->m_Len ); + } + else + { + memcpy( data, pPacket->m_Data, pPacket->m_Len ); + len = pPacket->m_Len; + } + + free( pPacket ); + return len; + } + + void ShowSDKWorkerMsg( const char *pMsg, ... ) + { + if ( !g_bMPIMaster && VMPI_IsSDKMode() ) + { + va_list marker; + va_start( marker, pMsg ); + char str[4096]; + V_vsnprintf( str, sizeof( str ), pMsg, marker ); + va_end( marker ); + Msg( "%s", str ); + } + } + + // This is the main function the workers use to pick files out of the multicast stream. + // The app is waiting for a specific file, but we receive and ack any files we can until + // we get the file they're looking for, then we return. + // + // NOTE: ideally, this would be in a thread, but it adds lots of complications and may + // not be worth it. + CWorkerFile* ListenFor( const char *pFilename, const char *pPathID ) + { + CWorkerFile *pFile = FindWorkerFile( pFilename, pPathID ); + if ( !pFile ) + { + // Ok, we haven't requested this file yet. Create an entry for it and + // tell the master we'd like this file. + pFile = RequestFileFromServer( pFilename, pPathID ); + if ( !pFile ) + return NULL; + + // If it's zero-length, we can return right now. + if ( pFile->m_bZeroLength ) + { + --m_nUnfinishedFiles; + return pFile; + } + } + + // Setup a filename to print some debug spew with. + char printableFilename[58]; + if ( V_strlen( pFilename ) > ARRAYSIZE( printableFilename ) - 1 ) + { + V_strncpy( printableFilename, "[...]", sizeof( printableFilename ) ); + V_strncat( printableFilename, &pFilename[V_strlen(pFilename) - ARRAYSIZE(printableFilename) + 1 + V_strlen(printableFilename)], sizeof( printableFilename ) ); + } + else + { + V_strncpy( printableFilename, pFilename, sizeof( printableFilename ) ); + } + ShowSDKWorkerMsg( "\rRecv %s (0%%) ", printableFilename ); + int iChunkPayloadSize = VMPI_GetChunkPayloadSize(); + + // Now start listening to the stream. + // Note: no need to setup anything when in TCP mode - we just use the regular + // VMPI dispatch stuff to handle that. + ISocket *pSocket = NULL; + if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_MULTICAST ) + { + pSocket = CreateMulticastListenSocket( m_MulticastAddr ); + + if ( !pSocket ) + { + char str[512]; + IP_GetLastErrorString( str, sizeof( str ) ); + Warning( "CreateMulticastListenSocket (%d.%d.%d.%d:%d) failed\n%s\n", EXPAND_ADDR( m_MulticastAddr ), str ); + return NULL; + } + } + else if ( VMPI_GetFileSystemMode() == VMPI_FILESYSTEM_BROADCAST ) + { + pSocket = CreateIPSocket(); + if ( !pSocket->BindToAny( m_MulticastAddr.port ) ) + { + pSocket->Release(); + pSocket = NULL; + } + } + + unsigned short chunksToAck[NUM_BUFFERED_CHUNK_ACKS][2]; + int nChunksToAck = 0; + DWORD lastAckTime = GetTickCount(); + + // Now just receive multicast data until this file has been received. + while ( m_nUnfinishedFiles > 0 ) + { + char data[MAX_CHUNK_PAYLOAD_SIZE+1024]; + int len = -1; + + if ( pSocket ) + { + CIPAddr ipFrom; + len = pSocket->RecvFrom( data, sizeof( data ), &ipFrom ); + } + else + { + len = CheckFileChunkPackets( data, sizeof( data ) ); + } + + if ( len == -1 ) + { + // Sleep for 10ms and also handle socket errors. + Sleep( 0 ); + VMPI_DispatchNextMessage( 10 ); + continue; + } + + g_nMulticastBytesReceived += len; + + // Alrighty. Figure out what the deal is with this file. + CMulticastFileInfo *pInfo = (CMulticastFileInfo*)data; + int *piChunk = (int*)( pInfo + 1 ); + const char *pTestFilename = (const char*)( piChunk + 1 ); + const char *pPayload = pTestFilename + strlen( pFilename ) + 1; + int payloadLen = len - ( pPayload - data ); + if ( payloadLen < 0 ) + { + Warning( "CWorkerMulticastListener::ListenFor: invalid packet received on multicast group\n" ); + continue; + } + + + if ( pInfo->m_FileID != pFile->m_FileID ) + continue; + + CWorkerFile *pTestFile = FindWorkerFile( pInfo->m_FileID ); + if ( !pTestFile ) + Error( "FindWorkerFile( %s ) failed\n", pTestFilename ); + + // TODO: reenable this code and disable the if right above here. + // We always get "invalid payload length" errors on the workers when using this, but + // I haven't been able to figure out why yet. + /* + // Put the data into whatever file it belongs in. + if ( !pTestFile ) + { + pTestFile = RequestFileFromServer( pTestFilename ); + if ( !pTestFile ) + continue; + } + */ + + // Is this the first packet about this file? + if ( !pTestFile->m_bGotCompressedSize ) + { + pTestFile->m_bGotCompressedSize = true; + pTestFile->m_CompressedData.SetSize( pInfo->m_CompressedSize ); + pTestFile->m_UncompressedData.SetSize( pInfo->m_UncompressedSize ); + pTestFile->m_ChunksReceived.SetSize( PAD_NUMBER( pInfo->m_nChunks, 8 ) / 8 ); + pTestFile->m_nChunksToReceive = pInfo->m_nChunks; + memset( pTestFile->m_ChunksReceived.Base(), 0, pTestFile->m_ChunksReceived.Count() ); + } + + // Validate the chunk index and uncompressed size. + int iChunk = *piChunk; + if ( iChunk < 0 || iChunk >= pInfo->m_nChunks ) + { + Error( "ListenFor(): invalid chunk index (%d) for file '%s'\n", iChunk, pTestFilename ); + } + + // Only handle this if we didn't already received the chunk. + if ( !(pTestFile->m_ChunksReceived[iChunk >> 3] & (1 << (iChunk & 7))) ) + { + // Make sure the file is properly setup to receive the data into. + if ( (int)pInfo->m_UncompressedSize != pTestFile->m_UncompressedData.Count() || + (int)pInfo->m_CompressedSize != pTestFile->m_CompressedData.Count() ) + { + Error( "ListenFor(): invalid compressed or uncompressed size.\n" + "pInfo = '%s', pTestFile = '%s'\n" + "Compressed (pInfo = %d, pTestFile = %d)\n" + "Uncompressed (pInfo = %d, pTestFile = %d)\n", + pTestFilename, + pTestFile->GetFilename(), + pInfo->m_CompressedSize, + pTestFile->m_CompressedData.Count(), + pInfo->m_UncompressedSize, + pTestFile->m_UncompressedData.Count() + ); + } + + int iChunkStart = iChunk * iChunkPayloadSize; + int iChunkEnd = min( iChunkStart + iChunkPayloadSize, pTestFile->m_CompressedData.Count() ); + int chunkLen = iChunkEnd - iChunkStart; + + if ( chunkLen != payloadLen ) + { + Error( "ListenFor(): invalid payload length for '%s' (%d should be %d)\n" + "pInfo = '%s', pTestFile = '%s'\n" + "Chunk %d out of %d. Compressed size: %d\n", + pTestFile->GetFilename(), + payloadLen, + chunkLen, + pTestFilename, + pTestFile->GetFilename(), + iChunk, + pInfo->m_nChunks, + pInfo->m_CompressedSize + ); + } + + memcpy( &pTestFile->m_CompressedData[iChunkStart], pPayload, chunkLen ); + pTestFile->m_ChunksReceived[iChunk >> 3] |= (1 << (iChunk & 7)); + + --pTestFile->m_nChunksToReceive; + + if ( pTestFile == pFile ) + { + int percent = 100 - (100 * pFile->m_nChunksToReceive) / pInfo->m_nChunks; + ShowSDKWorkerMsg( "\rRecv %s (%d%%) [chunk %d/%d] ", printableFilename, percent, pInfo->m_nChunks - pFile->m_nChunksToReceive, pInfo->m_nChunks ); + } + + // Remember to ack what we received. + AddAckChunk( chunksToAck, nChunksToAck, lastAckTime, pInfo->m_FileID, iChunk ); + + // If we're done receiving the data, unpack it. + if ( pTestFile->m_nChunksToReceive == 0 ) + { + // Ack the file. + FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); + + pTestFile->m_Timer.End(); + + pTestFile->m_UncompressedData.SetSize( pInfo->m_UncompressedSize ); + --m_nUnfinishedFiles; + + if ( !ZLibDecompress( + pTestFile->m_CompressedData.Base(), + pTestFile->m_CompressedData.Count(), + pTestFile->m_UncompressedData.Base(), + pTestFile->m_UncompressedData.Count() ) ) + { + if ( pSocket ) + pSocket->Release(); + FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); + Error( "ZLibDecompress failed.\n" ); + return NULL; + } + + char str[512]; + V_snprintf( str, sizeof( str ), "Got %s (%dk) in %.2fs", + printableFilename, + (pTestFile->m_UncompressedData.Count() + 511) / 1024, + pTestFile->m_Timer.GetDuration().GetSeconds() + ); + Msg( "\r%-79s\n", str ); + + // Won't be needing this anymore. + pTestFile->m_CompressedData.Purge(); + } + } + + MaybeFlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); + } + + Assert( pFile->IsReadyToRead() ); + FlushAckChunks( chunksToAck, nChunksToAck, lastAckTime ); + if ( pSocket ) + pSocket->Release(); + + return pFile; + } + + CWorkerFile* FindWorkerFile( const char *pFilename, const char *pPathID ) + { + FOR_EACH_LL( m_WorkerFiles, i ) + { + CWorkerFile *pWorkerFile = m_WorkerFiles[i]; + + if ( stricmp( pWorkerFile->GetFilename(), pFilename ) == 0 && stricmp( pWorkerFile->GetPathID(), pPathID ) == 0 ) + return pWorkerFile; + } + return NULL; + } + + CWorkerFile* FindWorkerFile( int fileID ) + { + FOR_EACH_LL( m_WorkerFiles, i ) + { + if ( m_WorkerFiles[i]->m_FileID == fileID ) + return m_WorkerFiles[i]; + } + return NULL; + } + + +private: + CIPAddr m_MulticastAddr; + + CUtlLinkedList<CWorkerFile*, int> m_WorkerFiles; + + HANDLE m_hMainThread; + + // How many files do we have open that we haven't finished receiving from the server yet? + // We always keep waiting for data until this is zero. + int m_nUnfinishedFiles; +}; + + + +// ------------------------------------------------------------------------------------------------------------------------ // +// CWorkerVMPIFileSystem implementation. +// ------------------------------------------------------------------------------------------------------------------------ // + +class CWorkerVMPIFileSystem : public CBaseVMPIFileSystem +{ +public: + InitReturnVal_t Init(); + virtual void Term(); + + virtual FileHandle_t Open( const char *pFilename, const char *pOptions, const char *pathID ); + virtual bool HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID ); + + virtual void CreateVirtualFile( const char *pFilename, const void *pData, int fileLength ); + virtual long GetFileTime( const char *pFileName, const char *pathID ); + virtual bool IsFileWritable( const char *pFileName, const char *pPathID ); + virtual bool SetFileWritable( char const *pFileName, bool writable, const char *pPathID ); + + virtual CSysModule *LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly ); + virtual void UnloadModule( CSysModule *pModule ); + +private: + CWorkerMulticastListener m_Listener; +}; + + +CBaseVMPIFileSystem* CreateWorkerVMPIFileSystem() +{ + CWorkerVMPIFileSystem *pRet = new CWorkerVMPIFileSystem; + g_pBaseVMPIFileSystem = pRet; + if ( pRet->Init() ) + { + return pRet; + } + else + { + delete pRet; + g_pBaseVMPIFileSystem = NULL; + return NULL; + } +} + + +InitReturnVal_t CWorkerVMPIFileSystem::Init() +{ + // Get the multicast addr to listen on. + CIPAddr mcAddr; + RecvMulticastIP( &mcAddr ); + + return m_Listener.Init( mcAddr ) ? INIT_OK : INIT_FAILED; +} + + +void CWorkerVMPIFileSystem::Term() +{ + m_Listener.Term(); +} + + +FileHandle_t CWorkerVMPIFileSystem::Open( const char *pFilename, const char *pOptions, const char *pathID ) +{ + Assert( g_bUseMPI ); + + // When it finally asks the filesystem for a file, it'll pass NULL for pathID if it's "". + if ( !pathID ) + pathID = ""; + + if ( g_bDisableFileAccess ) + Error( "Open( %s, %s ) - file access has been disabled.", pFilename, pOptions ); + + // Workers can't open anything for write access. + bool bWriteAccess = (Q_stristr( pOptions, "w" ) != 0); + if ( bWriteAccess ) + return FILESYSTEM_INVALID_HANDLE; + + // Do we have this file's data already? + CWorkerFile *pFile = m_Listener.FindWorkerFile( pFilename, pathID ); + if ( !pFile || !pFile->IsReadyToRead() ) + { + // Ok, start listening to the multicast stream until we get the file we want. + + // NOTE: it might make sense here to have the client ask for a list of ALL the files that + // the master currently has and wait to receive all of them (so we don't come back a bunch + // of times and listen + + // NOTE NOTE: really, the best way to do this is to have a thread on the workers that sits there + // and listens to the multicast stream. Any time the master opens a new file up, it assumes + // all the workers need the file, and it starts to send it on the multicast stream until + // the worker threads respond that they all have it. + // + // (NOTE: this probably means that the clients would have to ack the chunks on a UDP socket that + // the thread owns). + // + // This would simplify all the worries about a client missing half the stream and having to + // wait for another cycle through it. + pFile = m_Listener.ListenFor( pFilename, pathID ); + + if ( !pFile ) + { + return FILESYSTEM_INVALID_HANDLE; + } + } + + // Ok! Got the file. now setup a memory stream they can read out of it with. + CVMPIFile_Memory *pOut = new CVMPIFile_Memory; + pOut->Init( pFile->m_UncompressedData.Base(), pFile->m_UncompressedData.Count(), strchr( pOptions, 't' ) ? 't' : 'b' ); + return (FileHandle_t)pOut; +} + + +void CWorkerVMPIFileSystem::CreateVirtualFile( const char *pFilename, const void *pData, int fileLength ) +{ + Error( "CreateVirtualFile not supported in VMPI worker filesystem." ); +} + + +long CWorkerVMPIFileSystem::GetFileTime( const char *pFileName, const char *pathID ) +{ + Error( "GetFileTime not supported in VMPI worker filesystem." ); + return 0; +} + + +bool CWorkerVMPIFileSystem::IsFileWritable( const char *pFileName, const char *pPathID ) +{ + Error( "GetFileTime not supported in VMPI worker filesystem." ); + return false; +} + + +bool CWorkerVMPIFileSystem::SetFileWritable( char const *pFileName, bool writable, const char *pPathID ) +{ + Error( "GetFileTime not supported in VMPI worker filesystem." ); + return false; +} + +bool CWorkerVMPIFileSystem::HandleFileSystemPacket( MessageBuffer *pBuf, int iSource, int iPacketID ) +{ + // Handle this packet. + int subPacketID = pBuf->data[1]; + switch( subPacketID ) + { + case VMPI_FSPACKETID_MULTICAST_ADDR: + { + char *pInPos = &pBuf->data[2]; + + g_MulticastIP = *((CIPAddr*)pInPos); + pInPos += sizeof( g_MulticastIP ); + + g_bReceivedMulticastIP = true; + } + return true; + + case VMPI_FSPACKETID_FILE_RESPONSE: + { + CCriticalSectionLock csLock( &g_FileResponsesCS ); + csLock.Lock(); + + CFileResponse res; + res.m_RequestID = *((int*)&pBuf->data[2]); + res.m_Response = *((int*)&pBuf->data[6]); + res.m_bZeroLength = *((bool*)&pBuf->data[10]); + + g_FileResponses.AddToTail( res ); + } + return true; + + case VMPI_FSPACKETID_FILE_CHUNK: + { + int nDataBytes = pBuf->getLen() - 2; + + CFileChunkPacket *pPacket = (CFileChunkPacket*)malloc( sizeof( CFileChunkPacket ) + nDataBytes - 1 ); + memcpy( pPacket->m_Data, &pBuf->data[2], nDataBytes ); + pPacket->m_Len = nDataBytes; + + CCriticalSectionLock csLock( &g_FileResponsesCS ); + csLock.Lock(); + g_FileChunkPackets.AddToTail( pPacket ); + } + return true; + + default: + return false; + } +} + +CSysModule* CWorkerVMPIFileSystem::LoadModule( const char *pFileName, const char *pPathID, bool bValidatedDllOnly ) +{ + return Sys_LoadModule( pFileName ); +} + +void CWorkerVMPIFileSystem::UnloadModule( CSysModule *pModule ) +{ + Sys_UnloadModule( pModule ); +} |