diff options
| author | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
|---|---|---|
| committer | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
| commit | 3bf9df6b2785fa6d951086978a3e66f49427166a (patch) | |
| tree | 2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /utils/vmpi/vmpi.cpp | |
| download | archived-source-engine-2018-hl2-src-master.tar.xz archived-source-engine-2018-hl2-src-master.zip | |
Diffstat (limited to 'utils/vmpi/vmpi.cpp')
| -rw-r--r-- | utils/vmpi/vmpi.cpp | 2478 |
1 files changed, 2478 insertions, 0 deletions
diff --git a/utils/vmpi/vmpi.cpp b/utils/vmpi/vmpi.cpp new file mode 100644 index 0000000..ceed5fb --- /dev/null +++ b/utils/vmpi/vmpi.cpp @@ -0,0 +1,2478 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: This module implements the subset of MPI that VRAD and VVIS use. +// +// $NoKeywords: $ +//=============================================================================// + +#include <windows.h> +#include <io.h> +#include <conio.h> +#include <sys/stat.h> +#include <stdio.h> +#include <direct.h> +#include "iphelpers.h" +#include "utlvector.h" +#include "utllinkedlist.h" +#include "vmpi.h" +#include "bitbuf.h" +#include "tier1/strtools.h" +#include "threadhelpers.h" +#include "IThreadedTCPSocket.h" +#include "vstdlib/random.h" +#include "vmpi_distribute_work.h" +#include "filesystem.h" +#include "checksum_md5.h" +#include "tslist.h" +#include "tier0/icommandline.h" + + +#define DEFAULT_MAX_WORKERS 32 // Unless they specify -mpi_MaxWorkers, it will stop accepting workers after it gets this many. +int g_nMaxWorkerCount = DEFAULT_MAX_WORKERS; + +#define VMPI_INTERNAL_PACKET_ID 27 + #define VMPI_INTERNAL_SUBPACKET_MACHINE_NAME 1 + #define VMPI_INTERNAL_SUBPACKET_COMMAND_LINE 2 + #define VMPI_INTERNAL_SUBPACKET_WAITING_FOR_COMMAND_LINE 3 + #define VMPI_INTERNAL_SUBPACKET_GROUPED_PACKET 4 + #define VMPI_INTERNAL_SUBPACKET_TIMING_WAIT_DONE 5 + #define VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME 6 + + +typedef CUtlVector<char> PersistentPacket; + +CCriticalSection g_PersistentPacketsCS; +CUtlLinkedList<PersistentPacket*> g_PersistentPackets; + + +// Command-line parameters list. +#define VMPI_PARAM( paramName, paramFlags, helpText ) {paramName, paramFlags, "-"#paramName, helpText}, +class CVMPIParam +{ +public: + EVMPICmdLineParam m_eParam; + int m_ParamFlags; + const char *m_pName; + const char *m_pHelpText; +}; +static CVMPIParam g_VMPIParams[] = +{ + {k_eVMPICmdLineParam_FirstParam, 0, "k_eVMPICmdLineParam_FirstParam", "unused"}, + {k_eVMPICmdLineParam_VMPIParam, 0, "mpi", "Enable VMPI."}, +#include "vmpi_parameters.h" +}; +#undef VMPI_PARAM + + +// ---------------------------------------------------------------------------------------- // +// Globals. +// ---------------------------------------------------------------------------------------- // + +class CVMPIConnection; + +// Used by -mpi_AutoRestart. +CUtlVector<char*> g_OriginalCommandLineParameters; + +// This queues up all the incoming VMPI messages. +CCriticalSection g_VMPIMessagesCS; +CUtlLinkedList< CTCPPacket*, int > g_VMPIMessages; +CEvent g_VMPIMessagesEvent; // This is set when there are messages in the queue. + +// These are used to notify the main thread when some socket had OnError() called on it. +CUtlLinkedList<CVMPIConnection*,int> g_ErrorSockets; +CEvent g_ErrorSocketsEvent; +CCriticalSection g_ErrorSocketsCS; +bool g_bTimingWaitDone = false; +bool g_bGroupPackets = false; + +#define MAX_VMPI_CONNECTIONS 512 +CVMPIConnection *g_Connections[MAX_VMPI_CONNECTIONS]; +int g_nConnections = 0; +CCriticalSection g_ConnectionsCS; + +// If true, then it will set certain thread priorities low. +bool g_bSetThreadPriorities = true; + +VMPIDispatchFn g_VMPIDispatch[MAX_VMPI_PACKET_IDS]; +CTSList<MessageBuffer *> g_DispatchBuffers; + +VMPIRunMode g_VMPIRunMode = VMPI_RUN_NETWORKED; +VMPIFileSystemMode g_VMPIFileSystemMode = VMPI_FILESYSTEM_TCP; + +static char g_GroupedPacketHeader[] = { VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_GROUPED_PACKET }; +static unsigned long g_LastFlushGroupedPacketsTime = 0; + +// Set to true if we're running under the SDK (i.e. vmpi_transfer.exe is not found). +bool g_bVMPISDKMode = false; +bool g_bVMPISDKModeSet = false; // If g_bVMPISDKMode has not been set, then VMPI_IsSDKMode just looks for VMPI_Transfer (and doesn't check the command line). + +int g_nBytesSent = 0; +int g_nMessagesSent = 0; +int g_nBytesReceived = 0; +int g_nMessagesReceived = 0; + +int g_nMulticastBytesSent = 0; +int g_nMulticastBytesReceived = 0; + + +CUtlLinkedList<VMPI_Disconnect_Handler,int> g_DisconnectHandlers; + +bool g_bUseMPI = false; +int g_iVMPIVerboseLevel = 0; +bool g_bMPIMaster = false; + +bool g_bMPI_Stats = false; +bool g_bMPI_StatsTextOutput = false; + +char g_CurrentStageString[128] = ""; +CCriticalSection g_CurrentStageCS; + +char g_MasterExeName[MAX_PATH]; +bool g_bReceivedMasterExeName = false; + + +// Change our window text. +HINSTANCE g_hKernel32DLL = NULL; +typedef HWND (*GetConsoleWndFn)(); +GetConsoleWndFn g_pConsoleWndFn = NULL; + + +// ---------------------------------------------------------------------------------------- // +// Classes. +// ---------------------------------------------------------------------------------------- // + +// This class is used while discovering what files the workers need. +class CDependencyInfo +{ +public: + class CDependencyFile + { + public: + char m_Name[MAX_PATH]; + }; + + + // This is the directory where the dependency files live (i.e. all the binaries that the workers need to run the job). + char m_DependencyFilesDir[MAX_PATH]; + + // "vrad.exe", "vvis.exe", etc. + char m_OriginalExeFilename[MAX_PATH]; + + CUtlVector<CDependencyFile*> m_Files; + + +public: + + CDependencyFile* FindFile( const char *pFilename ) + { + for ( int i=0; i < m_Files.Count(); i++ ) + { + if ( stricmp( pFilename, m_Files[i]->m_Name ) == 0 ) + return m_Files[i]; + } + return NULL; + } +}; + + +class CVMPIConnection : public ITCPSocketHandler +{ +public: + CVMPIConnection( int iConnection ) + { + m_iConnection = iConnection; + m_pSocket = NULL; + m_bIsAService = false; + + char str[512]; + Q_snprintf( str, sizeof( str ), "%d", iConnection ); + SetMachineName( str ); + m_JobWorkerID = 0xFFFFFFFF; + + m_bNameSet = false; + } + + ~CVMPIConnection() + { + if ( m_pSocket ) + m_pSocket->Release(); + } + + +public: + + void HandleDisconnect() + { + if ( m_pSocket ) + { + // Copy out the error string. + CCriticalSectionLock csLock( &g_ErrorSocketsCS ); + csLock.Lock(); + char str[512]; + Q_strncpy( str, m_ErrorString.Base(), sizeof( str ) ); + csLock.Unlock(); + + // Tell the app. + FOR_EACH_LL( g_DisconnectHandlers, i ) + g_DisconnectHandlers[i]( m_iConnection, str ); + + // Free our socket. + m_pSocket->Release(); + m_pSocket = NULL; + } + } + + + IThreadedTCPSocket* GetSocket() + { + return m_pSocket; + } + + + void SetMachineName( const char *pName ) + { + m_MachineName.CopyArray( pName, strlen( pName ) + 1 ); + m_bNameSet = true; + } + + const char* GetMachineName() + { + return m_MachineName.Base(); + } + + bool HasMachineNameBeenSet() + { + return m_bNameSet; + } + + +// ITCPSocketHandler implementation (thread-safe stuff). +public: + + virtual void Init( IThreadedTCPSocket *pSocket ) + { + m_pSocket = pSocket; + } + + virtual void OnPacketReceived( CTCPPacket *pPacket ) + { + // Set who this message came from. + pPacket->SetUserData( m_iConnection ); + Assert( m_iConnection >= 0 && m_iConnection < 2048 ); + + // Store it in the global list. + CCriticalSectionLock csLock( &g_VMPIMessagesCS ); + csLock.Lock(); + + g_VMPIMessages.AddToTail( pPacket ); + + if ( g_VMPIMessages.Count() == 1 ) + g_VMPIMessagesEvent.SetEvent(); + } + + virtual void OnError( int errorCode, const char *pErrorString ) + { + if ( !g_bMPIMaster ) + { + Msg( "%s - CVMPIConnection::OnError( %s )\n", GetMachineName(), pErrorString ); + } + + CCriticalSectionLock csLock( &g_ErrorSocketsCS ); + csLock.Lock(); + + m_ErrorString.CopyArray( pErrorString, strlen( pErrorString ) + 1 ); + + g_ErrorSockets.AddToTail( this ); + + // Notify the main thread that a socket is in trouble! + g_ErrorSocketsEvent.SetEvent(); + + // Make sure the main thread picks up this error soon. + InterlockedIncrement( &m_ErrorSignal ); + } + + +public: + + unsigned long m_JobWorkerID; + bool m_bIsAService; // If true, then this is just a service getting the files. Don't count it as an active worker. + + CUtlVector<int> m_GroupedChunkLengths; + CUtlVector<void*> m_GroupedChunks; + + +private: + + CUtlVector<char> m_MachineName; + CUtlVector<char> m_ErrorString; + long m_ErrorSignal; + int m_iConnection; + IThreadedTCPSocket *m_pSocket; + bool m_bNameSet; +}; + + +class CVMPIConnectionCreator : public IHandlerCreator +{ +public: + virtual ITCPSocketHandler* CreateNewHandler() + { + Assert( g_nConnections < MAX_VMPI_CONNECTIONS ); + CVMPIConnection *pRet = new CVMPIConnection( g_nConnections ); + g_Connections[g_nConnections++] = pRet; + return pRet; + } +}; + + + +// ---------------------------------------------------------------------------------------- // +// Helpers. +// ---------------------------------------------------------------------------------------- // + +const char* VMPI_FindArg( int argc, char **argv, const char *pName, const char *pDefault ) +{ + for ( int i=0; i < argc; i++ ) + { + if ( stricmp( argv[i], pName ) == 0 ) + { + if ( (i+1) < argc ) + return argv[i+1]; + else + return pDefault; + } + } + return NULL; +} + + +void ParseOptions( int argc, char **argv ) +{ + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_NoTimeout ) ) ) + ThreadedTCP_EnableTimeouts( false ); + + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_DontSetThreadPriorities ) ) ) + { + Msg( "%s found.\n", VMPI_GetParamString( mpi_DontSetThreadPriorities ) ); + g_bSetThreadPriorities = false; + ThreadedTCP_SetTCPSocketThreadPriorities( false ); + } + + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_GroupPackets ) ) ) + { + Msg( "%s found.\n", VMPI_GetParamString( mpi_GroupPackets ) ); + g_bGroupPackets = true; + } + + const char *pTransmitRate = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_FileTransmitRate ), "1" ); + if ( pTransmitRate ) + { + extern int MULTICAST_TRANSMIT_RATE; + MULTICAST_TRANSMIT_RATE = atoi( pTransmitRate ) * 1024; + } + + const char *pVerbose = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Verbose ), "1" ); + if ( pVerbose ) + { + if ( pVerbose[0] == '1' ) + g_iVMPIVerboseLevel = 1; + else if ( pVerbose[0] == '2' ) + g_iVMPIVerboseLevel = 2; + } + + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Stats ) ) ) + g_bMPI_Stats = true; + + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Stats_TextOutput ) ) ) + g_bMPI_StatsTextOutput = true; +} + + +void SetupDependencyFilename( CDependencyInfo *pInfo, const char *pPatchDirectory ) +{ + char baseExeFilename[512]; + if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) + Error( "GetModuleFileName failed." ); + + // If they're in patch mode, then the dependency files come out of a directory they've passed in. + // Otherwise, the files come from the same exe dir we're in (like c:\valve\game\bin). + if ( pPatchDirectory ) + { + V_strncpy( pInfo->m_DependencyFilesDir, pPatchDirectory, sizeof( pInfo->m_DependencyFilesDir ) ); + } + else + { + V_strncpy( pInfo->m_DependencyFilesDir, baseExeFilename, sizeof( pInfo->m_DependencyFilesDir ) ); + V_StripLastDir( pInfo->m_DependencyFilesDir, sizeof( pInfo->m_DependencyFilesDir ) ); + } + + // Get the exe filename. + V_strncpy( pInfo->m_OriginalExeFilename, V_UnqualifiedFileName( baseExeFilename ), sizeof( pInfo->m_OriginalExeFilename ) ); +} + + +bool ReadString( char *pOut, int maxLen, FILE *fp ) +{ + if ( !fgets( pOut, maxLen, fp ) || pOut[0] == 0 ) + return false; + + int len = strlen( pOut ); + if ( pOut[len - 1] == '\n' ) + pOut[len - 1] = 0; + + return true; +} + + +void ParseDependencyFile( CDependencyInfo *pInfo, const char *pDepFilename ) +{ + FILE *fp = fopen( pDepFilename, "rt" ); + if ( !fp ) + Error( "Can't find %s.", pDepFilename ); + + const char *pOptionalPrefix = "optional "; + + char tempStr[MAX_PATH]; + while ( ReadString( tempStr, sizeof( tempStr ), fp ) ) + { + CDependencyInfo::CDependencyFile *pFile = new CDependencyInfo::CDependencyFile; + bool bOptional = false; + if ( strstr( tempStr, "optional " ) == tempStr ) + { + bOptional = true; + Q_strncpy( pFile->m_Name, tempStr + strlen( pOptionalPrefix ), sizeof( pFile->m_Name ) ); + } + else + { + Q_strncpy( pFile->m_Name, tempStr, sizeof( pFile->m_Name ) ); + } + + // Now get the file info. + char fullFilename[MAX_PATH]; + V_ComposeFileName( pInfo->m_DependencyFilesDir, pFile->m_Name, fullFilename, sizeof( fullFilename ) ); + + if ( _access( fullFilename, 0 ) == 0 ) + { + pInfo->m_Files.AddToTail( pFile ); + } + else + { + delete pFile; + + if ( !bOptional ) + Error( "Can't find %s (listed in %s).", fullFilename, pDepFilename ); + } + } + + fclose( fp ); +} + + +void SetupDependenciesForPatch( CDependencyInfo *pInfo, const char *pPatchDirectory ) +{ + char searchStr[MAX_PATH]; + V_ComposeFileName( pPatchDirectory, "*.*", searchStr, sizeof( searchStr ) ); + + _finddata_t data; + long handle = _findfirst( searchStr, &data ); + if ( handle != -1 ) + { + do + { + if ( data.name[0] == '.' || (data.attrib & _A_SUBDIR) != 0 ) + continue; + + CDependencyInfo::CDependencyFile *pFile = new CDependencyInfo::CDependencyFile; + V_strncpy( pFile->m_Name, data.name, sizeof( pFile->m_Name ) ); + pInfo->m_Files.AddToTail( pFile ); + } while( _findnext( handle, &data ) == 0 ); + + _findclose( handle ); + } +} + + +void SetupDependencyInfo( CDependencyInfo *pInfo, const char *pDependencyFilename, bool bPatchMode ) +{ + if ( bPatchMode ) + { + const char *pPatchDirectory = pDependencyFilename; + + SetupDependencyFilename( pInfo, pPatchDirectory ); + SetupDependenciesForPatch( pInfo, pPatchDirectory ); + } + else + { + SetupDependencyFilename( pInfo, NULL ); + + // Parse the dependency file. + char depFilename[MAX_PATH]; + V_ComposeFileName( pInfo->m_DependencyFilesDir, pDependencyFilename, depFilename, sizeof( depFilename ) ); + ParseDependencyFile( pInfo, depFilename ); + } +} + + +int GetCurMicrosecondsAndSleep( int sleepLen ) +{ + Sleep( sleepLen ); + + CCycleCount cnt; + cnt.Sample(); + return cnt.GetMicroseconds(); +} + + +void CountActiveConnections( int *nRegularWorkers, int *nServiceDownloaders ) +{ + *nRegularWorkers = *nServiceDownloaders = 0; + int nTotalConnections = g_nConnections; + for ( int i=0; i < nTotalConnections; i++ ) + { + if ( VMPI_IsProcConnected( i ) ) + { + if ( VMPI_IsProcAService( i ) ) + (*nServiceDownloaders)++; + else + (*nRegularWorkers)++; + } + } +} + +// In this function, we update the window text to tell how many active workers there are. +void UpdateActiveConnectionsText() +{ + if ( !g_bMPIMaster || !g_pConsoleWndFn ) + return; + + HWND hWnd = g_pConsoleWndFn(); + if ( !hWnd ) + return; + + int nRegularWorkers, nDownloaders; + CountActiveConnections( &nRegularWorkers, &nDownloaders ); + + char str[512]; + if ( g_bVMPISDKMode ) + { + V_snprintf( str, sizeof( str ), "VMPI (SDK) - Workers: %d", nRegularWorkers ); + } + else + { + V_snprintf( str, sizeof( str ), "VMPI - Workers: %d, Downloaders: %d", nRegularWorkers, nDownloaders ); + } + SetWindowText( hWnd, str ); +} + + +void VMPI_SendMachineNameTo( int iProc ) +{ + const char *pMyName = VMPI_GetLocalMachineName(); + + unsigned char packetData[512]; + packetData[0] = VMPI_INTERNAL_PACKET_ID; + packetData[1] = VMPI_INTERNAL_SUBPACKET_MACHINE_NAME; + Q_strncpy( (char*)&packetData[2], pMyName, sizeof( packetData ) - 2 ); + VMPI_SendData( packetData, 2 + strlen( pMyName ) + 1, iProc ); +} + +static CVMPIConnection* FindConnectionBySocket( IThreadedTCPSocket *pSocket, bool bLockConnections ) +{ + CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); + if ( bLockConnections ) + connectionsLock.Lock(); + + for ( int i=0; i < g_nConnections; i++ ) + if ( g_Connections[i]->GetSocket() == pSocket ) + return g_Connections[i]; + + return NULL; +} + +static char* CopyString( const char *pStr ) +{ + int len = V_strlen( pStr ) + 1; + char *pArg = new char[len]; + Q_strncpy( pArg, pStr, len ); + return pArg; +} + +// ---------------------------------------------------------------------------------------- // +// Internal VMPI dispatch.. +// ---------------------------------------------------------------------------------------- // + +void VMPI_SetMachineName( int iProc, const char *pName ); + +CUtlVector<char*> g_WorkerCommandLine; +bool g_bReceivedWorkerCommandLine = false; + + +bool VMPI_InternalDispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID ) +{ + if ( pBuf->getLen() >= 2 ) + { + if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_MACHINE_NAME ) + { + if ( pBuf->getLen() >= 3 ) + { + VMPI_SetMachineName( iSource, &pBuf->data[2] ); + return true; + } + } + else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_WAITING_FOR_COMMAND_LINE ) + { + if ( !VMPI_IsSDKMode() ) + { + Warning( "Worker %d is running in SDK mode (and the master is not)!\n", iSource ); + } + return true; + } + else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_COMMAND_LINE ) + { + pBuf->setOffset( 2 ); + + int nArgs; + pBuf->read( &nArgs, sizeof( nArgs ) ); + for ( int i=0; i < nArgs; i++ ) + { + char str[4096]; + if ( pBuf->ReadString( str, sizeof( str ) ) == -1 ) + Error( "Error in ReadString() while reading command line." ); + + g_WorkerCommandLine.AddToTail( CopyString( str ) ); + } + + g_bReceivedWorkerCommandLine = true; + return true; + } + else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME ) + { + pBuf->setOffset( 2 ); + + if ( pBuf->ReadString( g_MasterExeName, sizeof( g_MasterExeName ) ) == -1 ) + Error( "Error in ReadString() while reading VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME." ); + + g_bReceivedMasterExeName = true; + return true; + } + else if ( pBuf->data[1] == VMPI_INTERNAL_SUBPACKET_TIMING_WAIT_DONE ) + { + g_bTimingWaitDone = true; + return true; + } + } + + return false; +} +CDispatchReg g_VMPIInternalDispatchReg( VMPI_INTERNAL_PACKET_ID, VMPI_InternalDispatchFn ); // register to handle the messages we want + + +void VMPI_SendCommandLine( int argc, char **argv ) +{ + MessageBuffer mb; + + char cPacketHeader[2] = {VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_COMMAND_LINE}; + mb.write( cPacketHeader, sizeof( cPacketHeader ) ); + mb.write( &argc, sizeof( argc ) ); + for ( int i=0; i < argc; i++ ) + mb.WriteString( argv[i] ); + + VMPI_SendData( mb.data, mb.getLen(), VMPI_PERSISTENT ); +} + +void VMPI_ReceiveCommandLine() +{ + // For verification purposes, tell the master we're trying to get the command line. + unsigned char chData[2] = {VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_WAITING_FOR_COMMAND_LINE}; + VMPI_SendData( chData, sizeof( chData ), VMPI_MASTER_ID ); + + double startTime = Plat_FloatTime(); + while ( !g_bReceivedWorkerCommandLine ) + { + if ( Plat_FloatTime() - startTime > 30 ) + Error( "VMPI_ReceiveCommandLine: timeout. Is the master running in SDK mode?" ); + + VMPI_DispatchNextMessage( 10 * 1000 ); + } +} + + +void VMPI_SendExeName() +{ + MessageBuffer mb; + + char cPacketHeader[2] = {VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_VERIFY_EXE_NAME}; + mb.write( cPacketHeader, sizeof( cPacketHeader ) ); + + char baseExeFilename[MAX_PATH], fileBase[MAX_PATH]; + if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) + Error( "VMPI_CheckSDKMode -> GetModuleFileName failed." ); + + V_FileBase( baseExeFilename, fileBase, sizeof( fileBase ) ); + mb.WriteString( fileBase ); + + VMPI_SendData( mb.data, mb.getLen(), VMPI_PERSISTENT ); +} + +void VMPI_ReceiveExeName() +{ + double startTime = Plat_FloatTime(); + while ( !g_bReceivedMasterExeName ) + { + if ( Plat_FloatTime() - startTime > 30 ) + Error( "VMPI_ReceiveExeName: timeout." ); + + VMPI_DispatchNextMessage( 10 * 1000 ); + } + + // Now compare the exe name we got with our own. + char baseExeFilename[MAX_PATH], fileBase[MAX_PATH]; + if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) + Error( "VMPI_CheckSDKMode -> GetModuleFileName failed." ); + + // Unless we're a vmpi_transfer.. vmpi_transfer can always connect. + V_FileBase( baseExeFilename, fileBase, sizeof( fileBase ) ); + if ( V_stricmp( fileBase, "vmpi_transfer" ) != 0 ) + { + if ( V_stricmp( fileBase, g_MasterExeName ) != 0 ) + { + Error( "VMPI_ReceiveExeName: mismatched exe names (master: %s, me: %s).\nThis usually just means the master finished" + " a job like vvis really fast and started a vrad immediately, and an old vvis worker connected to the new vrad job.", + g_MasterExeName, fileBase ); + } + } +} + + +// ---------------------------------------------------------------------------------------- // +// CMasterBroadcaster +// This class broadcasts messages looking for workers. The app updates it as often as possible +// and it'll add workers as necessary. +// ---------------------------------------------------------------------------------------- // + +#define MASTER_BROADCAST_INTERVAL 600 // Send every N milliseconds. + +class CMasterBroadcaster +{ +public: + + CMasterBroadcaster(); + ~CMasterBroadcaster(); + + bool Init( int argc, char **argv, const char *pDependencyFilename, int nMaxWorkers, VMPIRunMode runMode, bool bPatchMode ); + void Term(); + + // What port is it listening on? + int GetListenPort() const; + + // These can be used to allow more workers on or filter who's able to connect + int GetMaxWorkers() const; + void IncreaseMaxWorkers( int count ); + void SetPassword( const char *pPassword ); + void SetNoTimeoutOption(); + + +private: + + void GetPatchWorkerList( int argc, char **argv ); + + +private: + + class CMasterBroadcastInfo + { + public: + int m_JobID[4]; + char m_Password[256]; + char m_WorkerExeFilename[MAX_PATH]; + CUtlVector<char*> m_Args; + char m_PatchVersion[32]; // 0 if not patching. + bool m_bForcePatch; + }; + + void ThreadFn(); + static DWORD WINAPI StaticThreadFn( LPVOID lpParameter ); + + bool Update(); + void BuildBroadcastPacket( bf_write &buf ); + + +private: + + ITCPConnectSocket *m_pListenSocket; + ITCPConnectSocket *m_pDownloaderListenSocket; + ISocket *m_pSocket; + + DWORD m_LastSendTime; + CMasterBroadcastInfo m_BroadcastInfo; + CUtlVector<CIPAddr> m_PatchWorkerIPs; // If in patch mode, these are the IPs we send the job request to (instead of broadcasting). + bool m_bPatching; + + CVMPIConnectionCreator m_ConnectionCreator; + int m_nMaxWorkers; + + HANDLE m_hThread; + CEvent m_hShutdownEvent; + CEvent m_hShutdownReply; + + VMPIRunMode m_RunMode; + int m_iListenPort; + int m_iDownloaderListenPort; +}; + + +CMasterBroadcaster::CMasterBroadcaster() +{ + m_pListenSocket = NULL; + m_pDownloaderListenSocket = NULL; + m_pSocket = NULL; + m_iListenPort = -1; + m_iDownloaderListenPort = -1; +} + +CMasterBroadcaster::~CMasterBroadcaster() +{ + Term(); +} + + +void CMasterBroadcaster::GetPatchWorkerList( int argc, char **argv ) +{ + m_PatchWorkerIPs.Purge(); + for ( int i=0; i < argc-1; i++ ) + { + if ( V_stricmp( argv[i], "-mpi_PatchWorkers" ) == 0 ) + { + int workerCount = atoi( argv[i+1] ); + for ( int iWorker=0; iWorker < workerCount; iWorker++ ) + { + int iArg = i+2 + iWorker; + if ( iArg >= argc ) + Error( "-mpi_PatchWorkers: %d specified for count, but not enough IPs following.\n", workerCount ); + + int a, b, c, d; + const char *pArg = argv[iArg]; + sscanf( pArg, "%d.%d.%d.%d", &a, &b, &c, &d ); + + CIPAddr addr; + addr.Init( a, b, c, d, 0 ); + m_PatchWorkerIPs.AddToTail( addr ); + } + return; + } + } +} + +bool CMasterBroadcaster::Init( + int argc, + char **argv, + const char *pDependencyFilename, + int nMaxWorkers, + VMPIRunMode runMode, + bool bPatchMode ) +{ + m_RunMode = runMode; + m_nMaxWorkers = nMaxWorkers; + + // Open the file that tells us which binaries we depend on. + CDependencyInfo dependencyInfo; + if ( m_RunMode == VMPI_RUN_NETWORKED && !g_bVMPISDKMode ) + { + SetupDependencyInfo( &dependencyInfo, pDependencyFilename, bPatchMode ); + } + + m_pListenSocket = NULL; + m_pDownloaderListenSocket = NULL; + + const char *pPortStr = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Port ) ); + if ( pPortStr ) + { + m_iListenPort = atoi( pPortStr ); + m_iDownloaderListenPort = m_iListenPort + 1; + m_pListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iListenPort ); + if ( !g_bVMPISDKMode ) + { + m_pDownloaderListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iDownloaderListenPort ); + } + } + else + { + // Create a socket to listen on. + CCycleCount cnt; + cnt.Sample(); + int iTime = (int)cnt.GetMicroseconds(); + srand( (unsigned)iTime ); + + for ( int iTest=VMPI_MASTER_FIRST_PORT; iTest <= VMPI_MASTER_LAST_PORT; iTest++ ) + { + m_iListenPort = iTest; + m_pListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iListenPort ); + if ( m_pListenSocket ) + break; + } + // No need to create the downloader in SDK mode. + if ( m_pListenSocket && !g_bVMPISDKMode ) + { + for ( int iTest=m_iListenPort+1; iTest <= VMPI_MASTER_LAST_PORT; iTest++ ) + { + m_iDownloaderListenPort = iTest; + if ( m_iDownloaderListenPort == m_iListenPort ) + continue; + + m_pDownloaderListenSocket = ThreadedTCP_CreateListener( &m_ConnectionCreator, m_iDownloaderListenPort ); + if ( m_pDownloaderListenSocket ) + break; + } + } + } + + if ( !m_pListenSocket || (!g_bVMPISDKMode && !m_pDownloaderListenSocket) ) + { + Error( "Can't bind a listen socket in port range [%d, %d].", VMPI_MASTER_PORT_FIRST, VMPI_MASTER_PORT_LAST ); + } + + + // Create a socket to broadcast from unless we're in the SDK in which case we don't broadcast. + m_bPatching = false; + if ( m_RunMode == VMPI_RUN_NETWORKED && !g_bVMPISDKMode ) + { + m_pSocket = CreateIPSocket(); + if ( !m_pSocket->BindToAny( 0 ) ) + Error( "MPI_Init_Master: can't bind a socket" ); + + m_BroadcastInfo.m_bForcePatch = false; + if ( bPatchMode ) + { + m_bPatching = true; + if ( VMPI_FindArg( argc, argv, "-mpi_ForcePatch", NULL ) ) + m_BroadcastInfo.m_bForcePatch = true; + + const char *pArg = VMPI_FindArg( argc, argv, "-mpi_PatchVersion", "0" ); + float iPatchVersion = atof( pArg ); + if ( iPatchVersion <= 0 || iPatchVersion >= ((1 << 15) - 1) ) + { + Error( "-mpi_PatchVersion <val> - val must be between 1.0 and 32767.0" ); + } + + V_strncpy( m_BroadcastInfo.m_PatchVersion, pArg, sizeof( m_BroadcastInfo.m_PatchVersion ) ); + } + else + { + m_BroadcastInfo.m_PatchVersion[0] = 0; + } + + // Come up with a unique job ID. + m_BroadcastInfo.m_JobID[0] = GetCurMicrosecondsAndSleep( 1 ); + m_BroadcastInfo.m_JobID[1] = GetCurMicrosecondsAndSleep( 1 ); + m_BroadcastInfo.m_JobID[2] = GetCurMicrosecondsAndSleep( 1 ); + m_BroadcastInfo.m_JobID[3] = GetCurMicrosecondsAndSleep( 1 ); + + const char *pPassword = VMPI_FindArg( argc, argv, "-mpi_pw", "" ); + Q_strncpy( m_BroadcastInfo.m_Password, pPassword ? pPassword : "", sizeof( m_BroadcastInfo.m_Password ) ); + Q_strncpy( m_BroadcastInfo.m_WorkerExeFilename, dependencyInfo.m_OriginalExeFilename, sizeof( m_BroadcastInfo.m_WorkerExeFilename ) ); + + // Store the command-line args. + m_BroadcastInfo.m_Args.Purge(); + for ( int i=1; i < argc; i++ ) + { + m_BroadcastInfo.m_Args.AddToTail( CopyString( argv[i] ) ); + } + // 0th arg is the exe name. + m_BroadcastInfo.m_Args.InsertBefore( 0, CopyString( m_BroadcastInfo.m_WorkerExeFilename ) ); + + // Now add arguments for each file they need to transmit. The service will use this to get all the files from the master before it starts the app. + for ( int i=0; i < dependencyInfo.m_Files.Count(); i++ ) + { + m_BroadcastInfo.m_Args.InsertAfter( 0, "-mpi_file" ); + m_BroadcastInfo.m_Args.InsertAfter( 1, CopyString( dependencyInfo.m_Files[i]->m_Name ) ); + } + + // Add -mpi_filebase so it can use absolute paths with the filesystem so we get the exact right set of files. + m_BroadcastInfo.m_Args.InsertAfter( 0, "-mpi_filebase" ); + m_BroadcastInfo.m_Args.InsertAfter( 1, CopyString( dependencyInfo.m_DependencyFilesDir ) ); + + if ( bPatchMode ) + { + GetPatchWorkerList( argc, argv ); + } + } + + + // Add ourselves as the first process (rank 0). + m_ConnectionCreator.CreateNewHandler(); + + // Initiate as many connections as we can for a few seconds. + m_LastSendTime = Plat_MSTime() - MASTER_BROADCAST_INTERVAL*2; + + + m_hShutdownEvent.Init( false, false ); + m_hShutdownReply.Init( false, false ); + + DWORD dwThreadID = 0; + m_hThread = CreateThread( + NULL, + 0, + &CMasterBroadcaster::StaticThreadFn, + this, + 0, + &dwThreadID ); + + if ( m_hThread ) + { + SetThreadPriority( m_hThread, THREAD_PRIORITY_HIGHEST ); + return true; + } + else + { + return false; + } +} + + +void CMasterBroadcaster::BuildBroadcastPacket( bf_write &buf ) +{ + // Broadcast out to tell all the machines we want workers. + buf.WriteByte( VMPI_PROTOCOL_VERSION ); + + buf.WriteString( m_BroadcastInfo.m_Password ); + + if ( m_BroadcastInfo.m_PatchVersion[0] == 0 ) + buf.WriteByte( VMPI_LOOKING_FOR_WORKERS ); + else + buf.WriteByte( VMPI_SERVICE_PATCH ); + + buf.WriteString( m_BroadcastInfo.m_PatchVersion ); + buf.WriteLong( m_iListenPort ); // Tell the port that we're listening on. + buf.WriteLong( m_BroadcastInfo.m_JobID[0] ); + buf.WriteLong( m_BroadcastInfo.m_JobID[1] ); + buf.WriteLong( m_BroadcastInfo.m_JobID[2] ); + buf.WriteLong( m_BroadcastInfo.m_JobID[3] ); + buf.WriteWord( m_BroadcastInfo.m_Args.Count() + 2 ); + + // Write the alternate exe name. + buf.WriteString( m_BroadcastInfo.m_WorkerExeFilename ); + + // Write the machine name of the master into the command line. It's ignored by the code, but it's useful + // if a job crashes the workers - by looking at the command line in vmpi_service, you can see who ran the job. + buf.WriteString( "-mpi_MasterName" ); + buf.WriteString( VMPI_GetLocalMachineName() ); + + for ( int i=1; i < m_BroadcastInfo.m_Args.Count(); i++ ) + buf.WriteString( m_BroadcastInfo.m_Args[i] ); + + buf.WriteByte( (unsigned char)m_BroadcastInfo.m_bForcePatch ); + buf.WriteShort( m_iDownloaderListenPort ); // Tell the port that we're listening for downloaders on. +} + +bool CMasterBroadcaster::Update() +{ + CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); + connectionsLock.Lock(); + + // Don't accept any more connections when we've hit the limit. + int nActiveConnections, nServiceDownloaders; + CountActiveConnections( &nActiveConnections, &nServiceDownloaders ); + if ( nActiveConnections >= m_nMaxWorkers ) + return false; + + // Only broadcast our presence so often. + if ( m_pSocket ) + { + DWORD curTime = Plat_MSTime(); + if ( curTime - m_LastSendTime >= MASTER_BROADCAST_INTERVAL ) + { + char packetData[512]; + bf_write packetBuf( "packetBuf", packetData, sizeof( packetData ) ); + BuildBroadcastPacket( packetBuf ); + + for ( int iBroadcastPort=VMPI_SERVICE_PORT; iBroadcastPort <= VMPI_LAST_SERVICE_PORT; iBroadcastPort++ ) + { + if ( m_bPatching ) + { + // Only send to this specific list of workers if necessary. + for ( int i=0; i < m_PatchWorkerIPs.Count(); i++ ) + { + CIPAddr addr = m_PatchWorkerIPs[i]; + addr.port = iBroadcastPort; + m_pSocket->SendTo( &addr, packetBuf.GetBasePointer(), packetBuf.GetNumBytesWritten() ); + } + } + else + { + m_pSocket->Broadcast( packetBuf.GetBasePointer(), packetBuf.GetNumBytesWritten(), iBroadcastPort ); + } + } + + // We don't want them to keep patching over and over. + if ( m_PatchWorkerIPs.Count() > 0 && m_BroadcastInfo.m_bForcePatch ) + m_PatchWorkerIPs.Purge(); + + m_LastSendTime = curTime; + } + } + + // First look for normal workers. + IThreadedTCPSocket *pNewConn = NULL; + bool bRet = m_pListenSocket->Update( &pNewConn, 0 ); + + // Now look for downloaders. + if ( !bRet || !pNewConn ) + { + if ( m_pDownloaderListenSocket ) + { + int nDownloadersAllowed = (m_nMaxWorkers - nActiveConnections) + 8; // Don't allow too many downloaders. + if ( nServiceDownloaders < nDownloadersAllowed ) + bRet = m_pDownloaderListenSocket->Update( &pNewConn, 0 ); + } + } + + if ( bRet && pNewConn ) + { + // Mark this guy as a downloader if necessary. + CIPAddr remoteAddr = pNewConn->GetRemoteAddr(); + if ( remoteAddr.port >= VMPI_SERVICE_DOWNLOADER_PORT_FIRST && remoteAddr.port <= VMPI_SERVICE_DOWNLOADER_PORT_LAST ) + { + CVMPIConnection *pVMPIConnection = FindConnectionBySocket( pNewConn, false ); + if ( pVMPIConnection ) + pVMPIConnection->m_bIsAService = true; + } + + // Send this guy all the persistent packets. + CCriticalSectionLock csLock( &g_PersistentPacketsCS ); + csLock.Lock(); + + FOR_EACH_LL( g_PersistentPackets, iPacket ) + { + PersistentPacket *pPacket = g_PersistentPackets[iPacket]; + VMPI_SendData( pPacket->Base(), pPacket->Count(), g_nConnections-1 ); + } + + UpdateActiveConnectionsText(); + return true; + } + else + { + return false; + } +} + + +void CMasterBroadcaster::ThreadFn() +{ + // Update every 100ms or until the main thread tells us to go away. + while ( WaitForSingleObject( m_hShutdownEvent.GetEventHandle(), 20 ) == WAIT_TIMEOUT ) + { + DWORD startTime = GetTickCount(); + while ( Update() && (GetTickCount() - startTime) < 500 ) + { + } + } + m_hShutdownReply.SetEvent(); +} + + +DWORD CMasterBroadcaster::StaticThreadFn( LPVOID lpParameter ) +{ + ((CMasterBroadcaster*)lpParameter)->ThreadFn(); + return 0; +} + + +void CMasterBroadcaster::Term() +{ + // Shutdown the update thread. + if ( m_hThread ) + { + m_hShutdownEvent.SetEvent(); + WaitForSingleObject( m_hThread, INFINITE ); + CloseHandle( m_hThread ); + m_hThread = 0; + } + + if ( m_pSocket ) + { + m_pSocket->Release(); + m_pSocket = NULL; + } + + if ( m_pListenSocket ) + { + m_pListenSocket->Release(); + m_pListenSocket = NULL; + } + + if ( m_pDownloaderListenSocket ) + { + m_pDownloaderListenSocket->Release(); + m_pDownloaderListenSocket = NULL; + } + + m_iListenPort = -1; + m_iDownloaderListenPort = -1; +} + + +int CMasterBroadcaster::GetListenPort() const +{ + return m_iListenPort; +} + + +int CMasterBroadcaster::GetMaxWorkers() const +{ + return m_nMaxWorkers; +} + + +void CMasterBroadcaster::IncreaseMaxWorkers( int count ) +{ + CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); + connectionsLock.Lock(); + + m_nMaxWorkers = min( MAX_VMPI_CONNECTIONS, m_nMaxWorkers + count ); +} + +void CMasterBroadcaster::SetPassword( const char *pPassword ) +{ + CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); + connectionsLock.Lock(); + Q_strncpy( m_BroadcastInfo.m_Password, pPassword, sizeof( m_BroadcastInfo.m_Password ) ); +} + +void CMasterBroadcaster::SetNoTimeoutOption() +{ + CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); + connectionsLock.Lock(); + + // Don't re-add the option if it's already there. + for ( int i=1; i < m_BroadcastInfo.m_Args.Count(); i++ ) + { + if ( Q_stricmp( m_BroadcastInfo.m_Args[i], VMPI_GetParamString( mpi_NoTimeout ) ) == 0 ) + return; + } + + m_BroadcastInfo.m_Args.InsertAfter( 0, (char*)VMPI_GetParamString( mpi_NoTimeout ) ); +} + + +CMasterBroadcaster g_MasterBroadcaster; + + +// ---------------------------------------------------------------------------------------- // +// CDispatchReg. +// ---------------------------------------------------------------------------------------- // + +CDispatchReg::CDispatchReg( int iPacketID, VMPIDispatchFn fn ) +{ + Assert( iPacketID >= 0 && iPacketID < MAX_VMPI_PACKET_IDS ); + Assert( !g_VMPIDispatch[iPacketID] ); + g_VMPIDispatch[iPacketID] = fn; +} + + +void VMPI_HandleTimingWait_Worker() +{ + if ( VMPI_IsParamUsed( mpi_TimingWait ) ) + { + Msg( "-mpi_TimingWait specified. Waiting for master to start..." ); + + // Wait for the signal to go. + while ( !g_bTimingWaitDone ) + { + VMPI_DispatchNextMessage( 50 ); + } + + Msg( "\n "); + } +} + + +void VMPI_HandleTimingWait_Master() +{ + if ( VMPI_IsParamUsed( mpi_TimingWait ) ) + { + Msg( "-mpi_TimingWait specified. Waiting for a keypress to continue... " ); + getch(); + Msg( "\n" ); + + unsigned char cPacket[2] = { VMPI_INTERNAL_PACKET_ID, VMPI_INTERNAL_SUBPACKET_TIMING_WAIT_DONE }; + VMPI_SendData( cPacket, sizeof( cPacket ), VMPI_PERSISTENT ); + } +} + + +// ---------------------------------------------------------------------------------------- // +// Helpers. +// ---------------------------------------------------------------------------------------- // + +bool MPI_Init_Worker( int &argc, char **&argv, const CIPAddr &masterAddr, bool bConnectingAsService ) +{ + g_bMPIMaster = false; + + // Make a connector to try connect to the master. + CVMPIConnectionCreator connectionCreator; + + int iFirstPort = VMPI_WORKER_PORT_FIRST; + int iLastPort = VMPI_WORKER_PORT_LAST; + if ( bConnectingAsService ) + { + iFirstPort = VMPI_SERVICE_DOWNLOADER_PORT_FIRST; + iLastPort = VMPI_SERVICE_DOWNLOADER_PORT_LAST; + } + + // Now wait for a connection. + int nAttempts = 1; +Retry:; + + ITCPConnectSocket *pConnectSocket = NULL; + int iPort; + for ( iPort=iFirstPort; iPort <= iLastPort; iPort++ ) + { + pConnectSocket = ThreadedTCP_CreateConnector( + masterAddr, + CIPAddr( 0, 0, 0, 0, iPort ), + &connectionCreator ); + + if ( pConnectSocket ) + break; + } + if ( !pConnectSocket ) + { + Error( "Can't bind a port in range [%d, %d].", iFirstPort, iLastPort ); + } + + + CWaitTimer wait( 3 ); + while ( 1 ) + { + IThreadedTCPSocket *pSocket = NULL; + if ( pConnectSocket->Update( &pSocket, 100 ) ) + { + if ( pSocket ) + { + // Send the master our machine name. + VMPI_SendMachineNameTo( VMPI_MASTER_ID ); + + // Verify that the exe is correct. + VMPI_ReceiveExeName(); + + if ( g_bVMPISDKMode ) + { + VMPI_ReceiveCommandLine(); + + CommandLine()->CreateCmdLine( g_WorkerCommandLine.Count(), g_WorkerCommandLine.Base() ); + argc = g_WorkerCommandLine.Count(); + argv = g_WorkerCommandLine.Base(); + } + + ParseOptions( g_WorkerCommandLine.Count(), g_WorkerCommandLine.Base() ); + for ( int i=0; i < g_WorkerCommandLine.Count(); i++ ) + { + Msg( "arg %d: %s\n", i, g_WorkerCommandLine[i] ); + } + + VMPI_HandleTimingWait_Worker(); + return true; + } + } + else + { + pConnectSocket->Release(); + Error( "ITCPConnectSocket::Update() errored out" ); + } + + if( wait.ShouldKeepWaiting() ) + Sleep( 100 ); + else + break; + }; + + // Never made a connection, shucks. + pConnectSocket->Release(); + + if ( VMPI_IsParamUsed( mpi_Retry ) ) + { + Msg( "%s found. Retrying connection to %d.%d.%d.%d:%d (attempt %d).\n", VMPI_GetParamString( mpi_Retry ), masterAddr.ip[0], masterAddr.ip[1], masterAddr.ip[2], masterAddr.ip[3], masterAddr.port, nAttempts++ ); + goto Retry; + } + + return false; +} + + +bool SpawnLocalWorker( int argc, char **argv, int iListenPort, bool bShowConsoleWindow ) +{ + char commandLine[4096]; + commandLine[0] = 0; + + // Add the -mpi_worker argument in, then launch the process. + for ( int i=0; i < 9999999; i++ ) + { + char argStr[512]; + + if ( i == 1 ) + { + Q_snprintf( argStr, sizeof( argStr ), "-mpi_worker 127.0.0.1:%d ", iListenPort ); + Q_strncat( commandLine, argStr, sizeof( commandLine ), COPY_ALL_CHARACTERS ); + Q_strncat( commandLine, "-allowdebug ", sizeof( commandLine ), COPY_ALL_CHARACTERS ); + + // Add -mpi_SDKMode if it's needed. This would mostly only occur in a debugging situation + // (someone running out of rel using -mpi_AutoLocalWorker). + if ( VMPI_IsSDKMode() && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_SDKMode ), "" ) ) + { + Q_strncat( commandLine, VMPI_GetParamString( mpi_SDKMode ), sizeof( commandLine ), COPY_ALL_CHARACTERS ); + } + } + + if ( i >= argc ) + break; + + Q_snprintf( argStr, sizeof( argStr ), "\"%s\" ", argv[i] ); + Q_strncat( commandLine, argStr, sizeof( commandLine ), COPY_ALL_CHARACTERS ); + } + + char workingDir[1024]; + if ( !_getcwd( workingDir, sizeof( workingDir ) ) ) + { + Warning( "_getcwd() failed.\n" ); + return false; + } + + STARTUPINFO si; + memset( &si, 0, sizeof( si ) ); + si.cb = sizeof( si ); + + PROCESS_INFORMATION pi; + memset( &pi, 0, sizeof( pi ) ); + + if ( CreateProcess( + NULL, + commandLine, + NULL, // security + NULL, + TRUE, + (bShowConsoleWindow ? CREATE_NEW_CONSOLE : CREATE_NO_WINDOW) | IDLE_PRIORITY_CLASS, // flags + NULL, // environment + workingDir, // current directory (use c:\\ because we don't want it to accidentally share + // DLLs like vstdlib with us). + &si, + &pi ) ) + { + return true; + } + else + { + char errStr[1024]; + IP_GetLastErrorString( errStr, sizeof( errStr ) ); + Warning( " - ERROR in CreateProcess (%s)!\n", errStr ); + return false; + } +} + + +bool InitMaster( int argc, char **argv, const char *pDependencyFilename, VMPIRunMode runMode, bool bPatchMode ) +{ + int nMaxWorkers = -1; + const char *pProcCount = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_WorkerCount ) ); + if ( pProcCount ) + { + nMaxWorkers = atoi( pProcCount ); + Warning( "%s: waiting for %d processes to join.\n", VMPI_GetParamString( mpi_WorkerCount ), nMaxWorkers ); + } + else + { + nMaxWorkers = DEFAULT_MAX_WORKERS; + } + nMaxWorkers = clamp( nMaxWorkers, 2, MAX_VMPI_CONNECTIONS ); + + + g_bMPIMaster = true; + g_nMaxWorkerCount = nMaxWorkers; + + if ( argc <= 0 ) + Error( "MPI_Init_Master: argc <= 0!" ); + + ParseOptions( argc, argv ); + + // Send the base filename of the exe we're running. Sometimes if we run vvis followed by vrad + // really quickly, the old vvis workers can connect to the vrad process and mess with it. + VMPI_SendExeName(); + + // In SDK mode, the master sends the command line to the workers since + // the workers weren't given a full command line by vmpi_service. + if ( VMPI_IsSDKMode() ) + { + VMPI_SendCommandLine( argc, argv ); + } + + if ( !g_MasterBroadcaster.Init( argc, argv, pDependencyFilename, nMaxWorkers, runMode, bPatchMode ) ) + return false; + + bool bRet; + if ( runMode == VMPI_RUN_LOCAL ) + { + bRet = SpawnLocalWorker( argc, argv, g_MasterBroadcaster.GetListenPort(), false ); + } + else + { + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_AutoLocalWorker ), "" ) ) + { + Msg( "%s found. Spawning a local worker automatically.\n", VMPI_GetParamString( mpi_AutoLocalWorker ) ); + SpawnLocalWorker( 1, argv, g_MasterBroadcaster.GetListenPort(), true ); + } + + bRet = true; + } + + VMPI_HandleTimingWait_Master(); + return bRet; +} + + +void VMPI_InitGlobals( int argc, char **argv, VMPIRunMode runMode ) +{ + g_bUseMPI = true; + g_VMPIRunMode = runMode; + + // Init event objects. + g_VMPIMessagesEvent.Init( false, false ); + g_ErrorSocketsEvent.Init( false, false ); + + // Load this for GetConsoleWindow(). + g_hKernel32DLL = LoadLibrary( "kernel32.dll" ); + if ( g_hKernel32DLL ) + { + g_pConsoleWndFn = (GetConsoleWndFn)GetProcAddress( g_hKernel32DLL, "GetConsoleWindow" ); + } + + #if defined( _DEBUG ) + + for ( int iArg=0; iArg < argc; iArg++ ) + { + Warning( "%s\n", argv[iArg] ); + } + + Warning( "\n" ); + + #endif +} + + +bool VMPI_CheckForNonSDKExecutables() +{ + char baseExeFilename[512]; + if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) + Error( "VMPI_CheckSDKMode -> GetModuleFileName failed." ); + + V_StripLastDir( baseExeFilename, sizeof( baseExeFilename ) ); + V_AppendSlash( baseExeFilename, sizeof( baseExeFilename ) ); + V_strncat( baseExeFilename, "mysql_wrapper.dll", sizeof( baseExeFilename ) ); + + // If vmpi_transfer.exe doesn't exist, then we assume we're in SDK mode. + return ( _access( baseExeFilename, 0 ) == 0 ); +} + + +bool IsValidSDKBinPath( CUtlVector< char* > &outStrings, int *pError ) +{ + *pError = 0; + + // Minimum must have drive:/basedir/steamapps/name/sourcesdk/bin/[ep1|orangebox]/bin/exename + if ( outStrings.Count() < 9 ) + { + *pError = 0; + return false; + } + + if ( V_stricmp( outStrings[outStrings.Count()-2], "bin" ) != 0 ) + { + *pError = 1; + return false; + } + + if ( V_stricmp( outStrings[outStrings.Count()-5], "sourcesdk" ) != 0 ) + { + *pError = 2; + return false; + } + + if ( V_stricmp( outStrings[outStrings.Count()-7], "steamapps" ) != 0 ) + { + *pError = 3; + return false; + } + + // Check the last-access date on clientregistry.blob + char baseSteamPath[MAX_PATH]; + V_strncpy( baseSteamPath, outStrings[0], sizeof( baseSteamPath) ); + for ( int i=1; i < outStrings.Count() - 7; i++ ) + { + V_AppendSlash( baseSteamPath, sizeof( baseSteamPath ) ); + V_strncat( baseSteamPath, outStrings[i], sizeof( baseSteamPath ) ); + } + + char blobPath[MAX_PATH]; + V_ComposeFileName( baseSteamPath, "ClientRegistry.blob", blobPath, sizeof( blobPath ) ); + struct _stat results; + if ( _stat( blobPath, &results ) != 0 ) + { + *pError = 4; + return false; + } + + long curTime; + VCRHook_Time( &curTime ); + int nSecondsSinceLastSteamAccess = curTime - results.st_mtime; + int nSecondsPerDay = 60 * 60 * 24; + int nMaxDaysUnaccessed = 10; + if ( nSecondsSinceLastSteamAccess > nSecondsPerDay*nMaxDaysUnaccessed ) + { + *pError = 5; // NOTE: don't change this error code because the outer function checks for it. + return false; + } + + // Check for some of the files under sourcesdk_content. + char sourcesdkContentPath[MAX_PATH]; + V_strncpy( sourcesdkContentPath, outStrings[0], sizeof( sourcesdkContentPath ) ); + for ( int i=1; i < outStrings.Count() - 5; i++ ) + { + V_AppendSlash( sourcesdkContentPath, sizeof( sourcesdkContentPath ) ); + V_strncat( sourcesdkContentPath, outStrings[i], sizeof( sourcesdkContentPath ) ); + } + V_AppendSlash( sourcesdkContentPath, sizeof( sourcesdkContentPath ) ); + V_strncat( sourcesdkContentPath, "sourcesdk_content", sizeof( sourcesdkContentPath ) ); + + char tempFilename[MAX_PATH], mapsrcFilename[MAX_PATH]; + V_snprintf( tempFilename, sizeof( tempFilename ), "cstrike%cmapsrc", CORRECT_PATH_SEPARATOR ); + V_ComposeFileName( sourcesdkContentPath, tempFilename, mapsrcFilename, sizeof( mapsrcFilename ) ); + if ( _access( mapsrcFilename, 0 ) != 0 ) + { + *pError = 6; + return false; + } + + return true; +} + +void VerifyValidSDKMode() +{ + // Make sure we're running out of the SourceSDK directory and that our SDK directories are filled out. + char baseExeFilename[MAX_PATH]; + if ( !GetModuleFileName( GetModuleHandle( NULL ), baseExeFilename, sizeof( baseExeFilename ) ) ) + Error( "VerifyValidSDKMode: GetModuleFileName failed." ); + V_FixSlashes( baseExeFilename ); + + CUtlVector< char* > outStrings; + char strSlash[2] = {CORRECT_PATH_SEPARATOR, 0}; + V_SplitString( baseExeFilename, strSlash, outStrings ); + + int err; + if ( !IsValidSDKBinPath( outStrings, &err ) ) + { + outStrings.PurgeAndDeleteElements(); + + if ( err == 5 ) + Error( "VMPI running in SDK mode but Steam hasn't been run recently. Please run Steam and retry." ); + else + Error( "VMPI running in SDK mode but incorrect SDK install detected (error %d).", err ); + } +} + +void VMPI_CheckSDKMode( int argc, char **argv ) +{ + g_bVMPISDKMode = !VMPI_CheckForNonSDKExecutables(); + g_bVMPISDKModeSet = true; + + // Also check for -mpi_sdkmode (only used in testing). + if ( !g_bVMPISDKMode ) + { + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_SDKMode ), "" ) ) + g_bVMPISDKMode = true; + } + + if ( g_bVMPISDKMode ) + { + VerifyValidSDKMode(); + } + + if ( g_bVMPISDKMode ) + { + Msg( "VMPI running in SDK mode.\n" ); + } +} + + +void VMPI_SetupAutoRestartParameters( int argc, char **argv ) +{ + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_AutoRestart ) ) ) + { + g_OriginalCommandLineParameters.SetSize( argc ); + for ( int i=0; i < argc; i++ ) + { + g_OriginalCommandLineParameters[i] = CopyString( argv[i] ); + } + } +} + + +bool VMPI_HandleAutoRestart() +{ + if ( g_OriginalCommandLineParameters.Count() == 0 ) + return true; + + Msg( "%s found. Auto-restarting.\n", VMPI_GetParamString( mpi_AutoRestart ) ); + DWORD curPriority = GetPriorityClass( GetCurrentProcess() ); + + char commandLine[1024*8]; + commandLine[0] = 0; + + // Add the -mpi_worker argument in, then launch the process. + for ( int i=0; i < g_OriginalCommandLineParameters.Count(); i++ ) + { + char argStr[512]; + Q_snprintf( argStr, sizeof( argStr ), "\"%s\" ", g_OriginalCommandLineParameters[i] ); + Q_strncat( commandLine, argStr, sizeof( commandLine ), COPY_ALL_CHARACTERS ); + } + + STARTUPINFO si; + memset( &si, 0, sizeof( si ) ); + si.cb = sizeof( si ); + + PROCESS_INFORMATION pi; + memset( &pi, 0, sizeof( pi ) ); + + if ( CreateProcess( + NULL, + commandLine, + NULL, // security + NULL, + TRUE, + CREATE_NEW_CONSOLE | curPriority, // flags + NULL, // environment + NULL, + &si, + &pi ) ) + { + g_OriginalCommandLineParameters.Purge(); + return true; + } + else + { + char errStr[1024]; + IP_GetLastErrorString( errStr, sizeof( errStr ) ); + Warning( " - ERROR in CreateProcess (%s)!\n", errStr ); + return false; + } +} + + +bool VMPI_Init( + int &argc, + char **&argv, + const char *pDependencyFilename, + VMPI_Disconnect_Handler handler, + VMPIRunMode runMode, + bool bConnectingAsService + ) +{ + if ( handler ) + VMPI_AddDisconnectHandler( handler ); + + VMPI_SetupAutoRestartParameters( argc, argv ); + + VMPI_CheckSDKMode( argc, argv ); + VMPI_InitGlobals( argc, argv, runMode ); + + // Were we launched by the vmpi service as a worker? + const char *pMasterIP = VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), NULL ); + if ( pMasterIP ) + { + CIPAddr addr; + addr.port = VMPI_MASTER_FIRST_PORT; + if ( !ConvertStringToIPAddr( pMasterIP, &addr ) ) + Error( "Unable to parse or resolve master IP (%s).\n", pMasterIP ); + + return MPI_Init_Worker( argc, argv, addr, bConnectingAsService ); + } + else + { + if ( !pDependencyFilename ) + { + Error( "VMPI started as master, but no dependency filename specified.\n" ); + return false; + } + + return InitMaster( argc, argv, pDependencyFilename, runMode, false ); + } +} + + +void VMPI_Init_PatchMaster( int argc, char **argv ) +{ + const char *pPatchDirectory = VMPI_FindArg( argc, argv, "-mpi_PatchDirectory", NULL ); + if ( !pPatchDirectory ) + Error( "-mpi_PatchDirectory <dir> must be specified if using -PatchHost mode." ); + + VMPI_InitGlobals( argc, argv, VMPI_RUN_NETWORKED ); + + InitMaster( argc, argv, pPatchDirectory, VMPI_RUN_NETWORKED, true ); +} + + +void VMPI_Finalize() +{ + g_MasterBroadcaster.Term(); + + DistributeWork_Cancel(); + + // Get rid of all the sockets. + for ( int iConn=0; iConn < g_nConnections; iConn++ ) + delete g_Connections[iConn]; + + g_nConnections = 0; + + // Get rid of all the packets. + FOR_EACH_LL( g_VMPIMessages, i ) + { + g_VMPIMessages[i]->Release(); + } + g_VMPIMessages.Purge(); + + g_PersistentPackets.PurgeAndDeleteElements(); + + // Get rid of the message buffers + g_DispatchBuffers.Purge(); + + if ( g_hKernel32DLL ) + { + FreeLibrary( g_hKernel32DLL ); + g_hKernel32DLL = NULL; + } + + g_WorkerCommandLine.PurgeAndDeleteElements(); + + VMPI_HandleAutoRestart(); +} + + +VMPIRunMode VMPI_GetRunMode() +{ + return g_VMPIRunMode; +} + + +VMPIFileSystemMode VMPI_GetFileSystemMode() +{ + return g_VMPIFileSystemMode; +} + + +int VMPI_GetCurrentNumberOfConnections() +{ + return g_nConnections; +} + + +void InternalHandleSocketErrors() +{ + // Copy the list of sockets with errors into a local array so we can handle all the errors outside + // the mutex, thus avoiding potential deadlock if any error handlers call Error(). + CUtlVector<CVMPIConnection*> errorSockets; + + CCriticalSectionLock csLock( &g_ErrorSocketsCS ); + csLock.Lock(); + + errorSockets.SetSize( g_ErrorSockets.Count() ); + int iCur = 0; + FOR_EACH_LL( g_ErrorSockets, i ) + { + errorSockets[iCur++] = g_ErrorSockets[i]; + } + + g_ErrorSockets.Purge(); + + csLock.Unlock(); + + // Handle the errors. + for ( int i=0; i < errorSockets.Count(); i++ ) + { + errorSockets[i]->HandleDisconnect(); + } + + UpdateActiveConnectionsText(); +} + + +void VMPI_HandleSocketErrors( unsigned long timeout ) +{ + DWORD ret = WaitForSingleObject( g_ErrorSocketsEvent.GetEventHandle(), timeout ); + if ( ret == WAIT_OBJECT_0 ) + { + InternalHandleSocketErrors(); + } +} + + +// If bWait is false, then this function returns false immediately if there are no messages waiting. +bool VMPI_GetNextMessage( MessageBuffer *pBuf, int *pSource, unsigned long startTimeout ) +{ + HANDLE handles[2] = { g_ErrorSocketsEvent.GetEventHandle(), g_VMPIMessagesEvent.GetEventHandle() }; + + DWORD startTime = Plat_MSTime(); + DWORD timeout = startTimeout; + + while ( 1 ) + { + DWORD ret = WaitForMultipleObjects( ARRAYSIZE( handles ), handles, FALSE, timeout ); + if ( ret == WAIT_TIMEOUT ) + { + return false; + } + else if ( ret == WAIT_OBJECT_0 ) + { + // A socket had an error. Handle all socket errors. + InternalHandleSocketErrors(); + + // Update the timeout. + DWORD delta = Plat_MSTime() - startTime; + if ( delta >= startTimeout ) + return false; + + timeout = startTimeout - delta; + continue; + } + else if ( ret == (WAIT_OBJECT_0 + 1) ) + { + // Read out the next message. + CCriticalSectionLock csLock( &g_VMPIMessagesCS ); + csLock.Lock(); + +GrabNextMessage:; + int iHead = g_VMPIMessages.Head(); + CTCPPacket *pPacket = g_VMPIMessages[iHead]; + g_VMPIMessages.Remove( iHead ); + + // Set the event again if there are more messages waiting. + const char *pBase = pPacket->GetData(); + if ( pPacket->GetLen() >= 6 && (unsigned char)pBase[0] == VMPI_INTERNAL_PACKET_ID && (unsigned char)pBase[1] == VMPI_INTERNAL_SUBPACKET_GROUPED_PACKET ) + { + // Ok, this is a grouped packet. Split it out into a bunch of separate packets. + CUtlVector<CTCPPacket*> groupedPackets; + int iCurOffset = 2; + while ( (iCurOffset+4) <= pPacket->GetLen() ) + { + int curPacketLen = *((int*)&pBase[iCurOffset]); + if ( iCurOffset + curPacketLen > pPacket->GetLen() ) + Error( "Invalid chunked packet\n" ); + + iCurOffset += 4; + + CTCPPacket *pChunkPacket = (CTCPPacket*)malloc( sizeof( CTCPPacket ) + curPacketLen - 1 ); + pChunkPacket->m_Len = curPacketLen; + pChunkPacket->m_UserData = pPacket->m_UserData; + memcpy( pChunkPacket->m_Data, &pBase[iCurOffset], curPacketLen ); + groupedPackets.AddToTail( pChunkPacket ); + + iCurOffset += curPacketLen; + } + + for ( int i=0; i < groupedPackets.Count(); i++ ) + { + g_VMPIMessages.AddToHead( groupedPackets[groupedPackets.Count() - i - 1] ); + } + pPacket->Release(); + goto GrabNextMessage; + } + else + { + if ( g_VMPIMessages.Count() > 0 ) + g_VMPIMessagesEvent.SetEvent(); + } + + csLock.Unlock(); + + // Copy it into their message buffer. + pBuf->setLen( pPacket->GetLen() ); + memcpy( pBuf->data, pPacket->GetData(), pPacket->GetLen() ); + + *pSource = pPacket->GetUserData(); + Assert( *pSource >= 0 && *pSource < g_nConnections ); + + // Update global stats about how much data we've received. + ++g_nMessagesReceived; + g_nBytesReceived += pPacket->GetLen() + 4; // (4 bytes extra for the packet length) + + // Free the memory associated with the packet. + pPacket->Release(); + return true; + } + else + { + Error( "VMPI_GetNextMessage: WaitForSingleObject returned %lu", ret ); + return false; + } + } +} + + +bool VMPI_InternalDispatch( MessageBuffer *pBuf, int iSource ) +{ + if ( pBuf->getLen() >= 1 && + pBuf->data[0] >= 0 && pBuf->data[0] < MAX_VMPI_PACKET_IDS && + g_VMPIDispatch[pBuf->data[0]] ) + { + return g_VMPIDispatch[ pBuf->data[0] ]( pBuf, iSource, pBuf->data[0] ); + + } + else + { + return false; + } +} + +bool VMPI_DispatchNextMessage( unsigned long timeout ) +{ + MessageBuffer *pBuf = NULL; + if ( !g_DispatchBuffers.PopItem( &pBuf ) ) + { + pBuf = new MessageBuffer(); + } + + bool bRetval = true; + while ( 1 ) + { + int iSource; + if ( VMPI_GetNextMessage( pBuf, &iSource, timeout ) ) + { + if ( VMPI_InternalDispatch( pBuf, iSource ) ) + { + break; + } + else + { + // Workers running in service mode don't hook anything except filesystem stuff, so if they happen to be sent something, no problem. + if ( !VMPI_IsProcAService( iSource ) ) + { + // Oops! What is this packet? + Assert( false ); + } + } + } + else + { + bRetval = false; + break; + } + } + + g_DispatchBuffers.PushItem( pBuf ); + return bRetval; +} + + +bool VMPI_DispatchUntil( MessageBuffer *pBuf, int *pSource, int packetID, int subPacketID, bool bWait ) +{ + while ( 1 ) + { + if ( !VMPI_GetNextMessage( pBuf, pSource, bWait ? VMPI_TIMEOUT_INFINITE : 0 ) ) + return false; + + if ( !VMPI_InternalDispatch( pBuf, *pSource ) ) + { + if ( pBuf->getLen() >= 1 && (unsigned char)pBuf->data[0] == packetID ) + { + if ( subPacketID == -1 ) + return true; + + if ( pBuf->getLen() >= 2 && (unsigned char)pBuf->data[1] == subPacketID ) + return true; + } + + // Oops! What is this packet? + // Note: the most common case where this happens is if it finishes a BuildFaceLights run + // and is in an AppBarrier and one of the workers is still finishing up some work given to it. + // It'll be waiting for a barrier packet, and it'll get results. In that case, the packet should + // be discarded like we do here, so maybe this assert won't be necessary. + //Assert( false ); + } + } +} + + +bool VMPI_SendData( void *pData, int nBytes, int iDest, int fVMPISendFlags ) +{ + return VMPI_SendChunks( &pData, &nBytes, 1, iDest, fVMPISendFlags ); +} + + +inline bool VMPI_FilterPacketsForServiceDownloader( CVMPIConnection *pConnection, void const * const *pChunks, const int *pChunkLengths, int nChunks ) +{ + if ( pConnection->m_bIsAService ) + { + // Find the first byte and treat that as the packet ID. + for ( int i=0; i < nChunks; i++ ) + { + if ( pChunkLengths[i] > 0 ) + { + unsigned char cPacketID = *((unsigned char*)pChunks[i]); + if ( cPacketID == VMPI_INTERNAL_PACKET_ID || cPacketID == VMPI_SHARED_PACKET_ID || cPacketID == VMPI_PACKETID_FILESYSTEM ) + return false; + else + return true; + } + } + } + return false; +} + + +void VMPI_GroupPackets( CVMPIConnection *pConn, void const * const *pChunks, const int *pChunkLengths, int nChunks ) +{ + CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); + connectionsLock.Lock(); + + // First add the header. + if ( pConn->m_GroupedChunks.Count() == 0 ) + { + pConn->m_GroupedChunks.AddToTail( g_GroupedPacketHeader ); + pConn->m_GroupedChunkLengths.AddToTail( sizeof( g_GroupedPacketHeader ) ); + } + + // Collate the chunks. + int nTotalLength = 0; + for ( int i=0; i < nChunks; i++ ) + nTotalLength += pChunkLengths[i]; + + char *pOut = new char[nTotalLength + 4]; + *((int*)pOut) = nTotalLength; + int iOutByte = 4; + for ( int i=0; i < nChunks; i++ ) + { + memcpy( &pOut[iOutByte], pChunks[i], pChunkLengths[i] ); + iOutByte += pChunkLengths[i]; + } + + pConn->m_GroupedChunks.AddToTail( pOut ); + pConn->m_GroupedChunkLengths.AddToTail( nTotalLength + 4 ); +} + + +void VMPI_FlushGroupedPackets( unsigned long msInterval ) +{ + if ( msInterval != 0 ) + { + unsigned long curTime = Plat_MSTime(); + if ( curTime - g_LastFlushGroupedPacketsTime < msInterval ) + return; + g_LastFlushGroupedPacketsTime = curTime; + } + + CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); + connectionsLock.Lock(); + + for ( int i=0; i < g_nConnections; i++ ) + { + CVMPIConnection *pConn = g_Connections[i]; + + if ( !pConn ) + continue; + + IThreadedTCPSocket *pSocket = pConn->GetSocket(); + if ( !pSocket || pConn->m_GroupedChunks.Count() == 0 ) + continue; + + pSocket->SendChunks( pConn->m_GroupedChunks.Base(), pConn->m_GroupedChunkLengths.Base(), pConn->m_GroupedChunks.Count() ); + + // Free the chunks. + for ( int i=1; i < pConn->m_GroupedChunks.Count(); i++ ) + { + free( pConn->m_GroupedChunks[i] ); + } + pConn->m_GroupedChunks.RemoveAll(); + pConn->m_GroupedChunkLengths.RemoveAll(); + } +} + + +bool VMPI_SendChunks( void const * const *pChunks, const int *pChunkLengths, int nChunks, int iDest, int fVMPISendFlags ) +{ + if ( iDest == VMPI_SEND_TO_ALL ) + { + // Don't want new connections while in here! + CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); + connectionsLock.Lock(); + + for ( int i=0; i < g_nConnections; i++ ) + VMPI_SendChunks( pChunks, pChunkLengths, nChunks, i ); + + return true; + } + else if ( iDest == VMPI_PERSISTENT ) + { + // Don't want new connections while in here! + CCriticalSectionLock connectionsLock( &g_ConnectionsCS ); + connectionsLock.Lock(); + + CCriticalSectionLock csLock( &g_PersistentPacketsCS ); + csLock.Lock(); + + // Send the packet to everyone. + for ( int i=0; i < g_nConnections; i++ ) + VMPI_SendChunks( pChunks, pChunkLengths, nChunks, i ); + + // Remember to send it to the new workers. + if ( iDest == VMPI_PERSISTENT ) + { + PersistentPacket *pNew = new PersistentPacket; + for ( int i=0; i < nChunks; i++ ) + pNew->AddMultipleToTail( pChunkLengths[i], (const char*)pChunks[i] ); + + g_PersistentPackets.AddToTail( pNew ); + } + + return true; + } + else + { + g_nMessagesSent++; + g_nBytesSent += 4; // for message tag. + for ( int i=0; i < nChunks; i++ ) + g_nBytesSent += pChunkLengths[i]; + + CVMPIConnection *pConnection = g_Connections[iDest]; + + if ( pConnection ) + { + // If it's a service downloader, only send certain packet IDs. + if ( VMPI_FilterPacketsForServiceDownloader( pConnection, pChunks, pChunkLengths, nChunks ) ) + return true; + + IThreadedTCPSocket *pSocket = pConnection->GetSocket(); + if ( !pSocket ) + return false; + + if ( g_bGroupPackets && (fVMPISendFlags & k_eVMPISendFlags_GroupPackets) ) + { + VMPI_GroupPackets( pConnection, pChunks, pChunkLengths, nChunks ); + return true; + } + else + { + return pSocket->SendChunks( pChunks, pChunkLengths, nChunks ); + } + } + else + { + return false; + } + } +} + + +bool VMPI_Send2Chunks( const void *pChunk1, int chunk1Len, const void *pChunk2, int chunk2Len, int iDest, int fVMPISendFlags ) +{ + const void *pChunks[2] = { pChunk1, pChunk2 }; + int len[2] = { chunk1Len, chunk2Len }; + return VMPI_SendChunks( pChunks, len, ARRAYSIZE( pChunks ), iDest, fVMPISendFlags ); +} + + +bool VMPI_Send3Chunks( const void *pChunk1, int chunk1Len, const void *pChunk2, int chunk2Len, const void *pChunk3, int chunk3Len, int iDest, int fVMPISendFlags ) +{ + const void *pChunks[3] = { pChunk1, pChunk2, pChunk3 }; + int len[3] = { chunk1Len, chunk2Len, chunk3Len }; + return VMPI_SendChunks( pChunks, len, ARRAYSIZE( pChunks ), iDest, fVMPISendFlags ); +} + + +void VMPI_AddDisconnectHandler( VMPI_Disconnect_Handler handler ) +{ + g_DisconnectHandlers.AddToTail( handler ); +} + + +CVMPIConnection* GetConnection( int procID ) +{ + Assert( procID >= 0 && procID < g_nConnections ); + return g_Connections[procID]; +} + + +bool VMPI_IsProcConnected( int procID ) +{ + if ( procID < 0 || procID >= g_nConnections ) + { + Assert( false ); + return false; + } + + return g_Connections[procID]->GetSocket() != NULL; +} + +bool VMPI_IsProcAService( int procID ) +{ + if ( procID < 0 || procID >= g_nConnections ) + { + Assert( false ); + return false; + } + + return g_Connections[procID]->m_bIsAService; +} + +void VMPI_Sleep( unsigned long ms ) +{ + Sleep( ms ); +} + + +const char* VMPI_GetMachineName( int iProc ) +{ + if ( g_bMPIMaster && iProc == VMPI_MASTER_ID ) + return VMPI_GetLocalMachineName(); + + if ( iProc < 0 || iProc >= g_nConnections ) + { + Assert( false ); + return "invalid index"; + } + + return g_Connections[iProc]->GetMachineName(); +} + + +void VMPI_SetMachineName( int iProc, const char *pName ) +{ + if ( iProc < 0 || iProc >= g_nConnections ) + { + Assert( false ); + return; + } + + g_Connections[iProc]->SetMachineName( pName ); +} + + +bool VMPI_HasMachineNameBeenSet( int iProc ) +{ + if ( iProc < 0 || iProc >= g_nConnections ) + { + Assert( false ); + return false; + } + + return g_Connections[iProc]->HasMachineNameBeenSet(); +} + + +const char* VMPI_GetLocalMachineName() +{ + static char cName[MAX_COMPUTERNAME_LENGTH+1]; + DWORD len = sizeof( cName ); + if ( GetComputerName( cName, &len ) ) + return cName; + else + return "(error in GetComputerName)"; +} + + +unsigned long VMPI_GetJobWorkerID( int iProc ) +{ + return GetConnection( iProc )->m_JobWorkerID; +} + + +void VMPI_SetJobWorkerID( int iProc, unsigned long jobWorkerID ) +{ + GetConnection( iProc )->m_JobWorkerID = jobWorkerID; +} + + +void VMPI_GetCurrentStage( char *pOut, int strLen ) +{ + CCriticalSectionLock csLock( &g_CurrentStageCS ); + csLock.Lock(); + Q_strncpy( pOut, g_CurrentStageString, strLen ); +} + + +void VMPI_SetCurrentStage( const char *pCurStage ) +{ + CCriticalSectionLock csLock( &g_CurrentStageCS ); + csLock.Lock(); + Q_strncpy( g_CurrentStageString, pCurStage, sizeof( g_CurrentStageString ) ); +} + + +void VMPI_InviteDebugWorkers() +{ + // Only allow workers with password set to debugworker. + g_MasterBroadcaster.SetPassword( "debugworker" ); + + // Disable timeouts so they can sit in the debugger. + g_MasterBroadcaster.SetNoTimeoutOption(); + ThreadedTCP_EnableTimeouts( false ); + + // Let in some more workers. + g_MasterBroadcaster.IncreaseMaxWorkers( 25 ); +} + + +bool VMPI_IsSDKMode() +{ + if ( g_bVMPISDKModeSet ) + return g_bVMPISDKMode; + else + return !VMPI_CheckForNonSDKExecutables(); +} + + +const char* VMPI_GetParamString( EVMPICmdLineParam eParam ) +{ + if ( eParam <= k_eVMPICmdLineParam_FirstParam || eParam >= k_eVMPICmdLineParam_LastParam ) + { + Assert( false ); + Warning( "Invalid call: VMPI_GetParamString( %d )\n", eParam ); + return "unknown"; + } + else + { + return g_VMPIParams[eParam].m_pName; + } +} + +int VMPI_GetParamFlags( EVMPICmdLineParam eParam ) +{ + if ( eParam <= k_eVMPICmdLineParam_FirstParam || eParam >= k_eVMPICmdLineParam_LastParam ) + { + Assert( false ); + Warning( "Invalid call: VMPI_GetParamString( %d )\n", eParam ); + return 0; + } + else + { + return g_VMPIParams[eParam].m_ParamFlags; + } +} + +bool VMPI_IsParamUsed( EVMPICmdLineParam eParam ) +{ + int iParam = CommandLine()->FindParm( VMPI_GetParamString( eParam ) ); + return iParam != 0; +} + +const char* VMPI_GetParamHelpString( EVMPICmdLineParam eParam ) +{ + if ( eParam <= k_eVMPICmdLineParam_FirstParam || eParam >= k_eVMPICmdLineParam_LastParam ) + { + Assert( false ); + Warning( "Invalid call: VMPI_GetParamHelpString( %d )\n", eParam ); + return "unknown vmpi param"; + } + else + { + return g_VMPIParams[eParam].m_pHelpText; + } +} + + |