From f56bb35301836e56582a575a75864392a0177875 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B8rgen=20P=2E=20Tjern=C3=B8?= Date: Mon, 2 Dec 2013 19:31:46 -0800 Subject: Fix line endings. WHAMMY. --- mp/src/utils/vvis/mpivis.cpp | 1280 +++++++++++++++++++++--------------------- 1 file changed, 640 insertions(+), 640 deletions(-) (limited to 'mp/src/utils/vvis/mpivis.cpp') diff --git a/mp/src/utils/vvis/mpivis.cpp b/mp/src/utils/vvis/mpivis.cpp index 6da76ebc..58b76331 100644 --- a/mp/src/utils/vvis/mpivis.cpp +++ b/mp/src/utils/vvis/mpivis.cpp @@ -1,640 +1,640 @@ -//========= Copyright Valve Corporation, All rights reserved. ============// -// -// Purpose: -// -// $NoKeywords: $ -// -//=============================================================================// - -#include -#include "vis.h" -#include "threads.h" -#include "stdlib.h" -#include "pacifier.h" -#include "mpi_stats.h" -#include "vmpi.h" -#include "vmpi_dispatch.h" -#include "vmpi_filesystem.h" -#include "vmpi_distribute_work.h" -#include "iphelpers.h" -#include "threadhelpers.h" -#include "vstdlib/random.h" -#include "vmpi_tools_shared.h" -#include -#include "scratchpad_helpers.h" - - -#define VMPI_VVIS_PACKET_ID 1 - // Sub packet IDs. - #define VMPI_SUBPACKETID_DISCONNECT_NOTIFY 3 // We send ourselves this when there is a disconnect. - #define VMPI_SUBPACKETID_BASEPORTALVIS 5 - #define VMPI_SUBPACKETID_PORTALFLOW 6 - #define VMPI_BASEPORTALVIS_RESULTS 7 - #define VMPI_BASEPORTALVIS_WORKER_DONE 8 - #define VMPI_PORTALFLOW_RESULTS 9 - #define VMPI_SUBPACKETID_BASEPORTALVIS_SYNC 11 - #define VMPI_SUBPACKETID_PORTALFLOW_SYNC 12 - #define VMPI_SUBPACKETID_MC_ADDR 13 - -// DistributeWork owns this packet ID. -#define VMPI_DISTRIBUTEWORK_PACKETID 2 - - -extern bool fastvis; - -// The worker waits until these are true. -bool g_bBasePortalVisSync = false; -bool g_bPortalFlowSync = false; - -CUtlVector g_BasePortalVisResultsFilename; - -CCycleCount g_CPUTime; - - -// This stuff is all for the multicast channel the master uses to send out the portal results. -ISocket *g_pPortalMCSocket = NULL; -CIPAddr g_PortalMCAddr; -bool g_bGotMCAddr = false; -HANDLE g_hMCThread = NULL; -CEvent g_MCThreadExitEvent; -unsigned long g_PortalMCThreadUniqueID = 0; -int g_nMulticastPortalsReceived = 0; - - -// Handle VVIS packets. -bool VVIS_DispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID ) -{ - switch ( pBuf->data[1] ) - { - case VMPI_SUBPACKETID_MC_ADDR: - { - pBuf->setOffset( 2 ); - pBuf->read( &g_PortalMCAddr, sizeof( g_PortalMCAddr ) ); - g_bGotMCAddr = true; - return true; - } - - case VMPI_SUBPACKETID_DISCONNECT_NOTIFY: - { - // This is just used to cause nonblocking dispatches to jump out so loops like the one - // in AppBarrier can handle the fact that there are disconnects. - return true; - } - - case VMPI_SUBPACKETID_BASEPORTALVIS_SYNC: - { - g_bBasePortalVisSync = true; - return true; - } - - case VMPI_SUBPACKETID_PORTALFLOW_SYNC: - { - g_bPortalFlowSync = true; - return true; - } - - case VMPI_BASEPORTALVIS_RESULTS: - { - const char *pFilename = &pBuf->data[2]; - g_BasePortalVisResultsFilename.CopyArray( pFilename, strlen( pFilename ) + 1 ); - return true; - } - - default: - { - return false; - } - } -} -CDispatchReg g_VVISDispatchReg( VMPI_VVIS_PACKET_ID, VVIS_DispatchFn ); // register to handle the messages we want -CDispatchReg g_DistributeWorkReg( VMPI_DISTRIBUTEWORK_PACKETID, DistributeWorkDispatch ); - - - -void VMPI_DeletePortalMCSocket() -{ - // Stop the thread if it exists. - if ( g_hMCThread ) - { - g_MCThreadExitEvent.SetEvent(); - WaitForSingleObject( g_hMCThread, INFINITE ); - CloseHandle( g_hMCThread ); - g_hMCThread = NULL; - } - - if ( g_pPortalMCSocket ) - { - g_pPortalMCSocket->Release(); - g_pPortalMCSocket = NULL; - } -} - - -void VVIS_SetupMPI( int &argc, char **&argv ) -{ - if ( !VMPI_FindArg( argc, argv, "-mpi", "" ) && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), "" ) ) - return; - - CmdLib_AtCleanup( VMPI_Stats_Term ); - CmdLib_AtCleanup( VMPI_DeletePortalMCSocket ); - - VMPI_Stats_InstallSpewHook(); - - // Force local mode? - VMPIRunMode mode; - if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Local ), "" ) ) - mode = VMPI_RUN_LOCAL; - else - mode = VMPI_RUN_NETWORKED; - - // - // Extract mpi specific arguments - // - Msg( "Initializing VMPI...\n" ); - if ( !VMPI_Init( argc, argv, "dependency_info_vvis.txt", HandleMPIDisconnect, mode ) ) - { - Error( "MPI_Init failed." ); - } - - StatsDB_InitStatsDatabase( argc, argv, "dbinfo_vvis.txt" ); -} - - -void ProcessBasePortalVis( int iThread, uint64 iPortal, MessageBuffer *pBuf ) -{ - CTimeAdder adder( &g_CPUTime ); - - BasePortalVis( iThread, iPortal ); - - // Send my result to the master - if ( pBuf ) - { - portal_t * p = &portals[iPortal]; - pBuf->write( p->portalfront, portalbytes ); - pBuf->write( p->portalflood, portalbytes ); - } -} - - -void ReceiveBasePortalVis( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker ) -{ - portal_t * p = &portals[iWorkUnit]; - if ( p->portalflood != 0 || p->portalfront != 0 || p->portalvis != 0) - { - Msg("Duplicate portal %llu\n", iWorkUnit); - } - - if ( pBuf->getLen() - pBuf->getOffset() != portalbytes*2 ) - Error( "Invalid packet in ReceiveBasePortalVis." ); - - // - // allocate memory for bitwise vis solutions for this portal - // - p->portalfront = (byte*)malloc (portalbytes); - pBuf->read( p->portalfront, portalbytes ); - - p->portalflood = (byte*)malloc (portalbytes); - pBuf->read( p->portalflood, portalbytes ); - - p->portalvis = (byte*)malloc (portalbytes); - memset (p->portalvis, 0, portalbytes); - - p->nummightsee = CountBits( p->portalflood, g_numportals*2 ); -} - - -//----------------------------------------- -// -// Run BasePortalVis across all available processing nodes -// Then collect and redistribute the results. -// -void RunMPIBasePortalVis() -{ - int i; - - Msg( "\n\nportalbytes: %d\nNum Work Units: %d\nTotal data size: %d\n", portalbytes, g_numportals*2, portalbytes*g_numportals*2 ); - Msg("%-20s ", "BasePortalVis:"); - if ( g_bMPIMaster ) - StartPacifier(""); - - - VMPI_SetCurrentStage( "RunMPIBasePortalVis" ); - - // Note: we're aiming for about 1500 portals in a map, so about 3000 work units. - g_CPUTime.Init(); - double elapsed = DistributeWork( - g_numportals * 2, // # work units - VMPI_DISTRIBUTEWORK_PACKETID, // packet ID - ProcessBasePortalVis, // Worker function to process work units - ReceiveBasePortalVis // Master function to receive work results - ); - - if ( g_bMPIMaster ) - { - EndPacifier( false ); - Msg( " (%d)\n", (int)elapsed ); - } - - // - // Distribute the results to all the workers. - // - if ( g_bMPIMaster ) - { - if ( !fastvis ) - { - VMPI_SetCurrentStage( "SendPortalResults" ); - - // Store all the portal results in a temp file and multicast that to the workers. - CUtlVector allPortalData; - allPortalData.SetSize( g_numportals * 2 * portalbytes * 2 ); - - char *pOut = allPortalData.Base(); - for ( i=0; i < g_numportals * 2; i++) - { - portal_t *p = &portals[i]; - - memcpy( pOut, p->portalfront, portalbytes ); - pOut += portalbytes; - - memcpy( pOut, p->portalflood, portalbytes ); - pOut += portalbytes; - } - - const char *pVirtualFilename = "--portal-results--"; - VMPI_FileSystem_CreateVirtualFile( pVirtualFilename, allPortalData.Base(), allPortalData.Count() ); - - char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_BASEPORTALVIS_RESULTS }; - VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), pVirtualFilename, strlen( pVirtualFilename ) + 1, VMPI_PERSISTENT ); - } - } - else - { - VMPI_SetCurrentStage( "RecvPortalResults" ); - - // Wait until we've received the filename from the master. - while ( g_BasePortalVisResultsFilename.Count() == 0 ) - { - VMPI_DispatchNextMessage(); - } - - // Open - FileHandle_t fp = g_pFileSystem->Open( g_BasePortalVisResultsFilename.Base(), "rb", VMPI_VIRTUAL_FILES_PATH_ID ); - if ( !fp ) - Error( "Can't open '%s' to read portal info.", g_BasePortalVisResultsFilename.Base() ); - - for ( i=0; i < g_numportals * 2; i++) - { - portal_t *p = &portals[i]; - - p->portalfront = (byte*)malloc (portalbytes); - g_pFileSystem->Read( p->portalfront, portalbytes, fp ); - - p->portalflood = (byte*)malloc (portalbytes); - g_pFileSystem->Read( p->portalflood, portalbytes, fp ); - - p->portalvis = (byte*)malloc (portalbytes); - memset (p->portalvis, 0, portalbytes); - - p->nummightsee = CountBits (p->portalflood, g_numportals*2); - } - - g_pFileSystem->Close( fp ); - } - - - if ( !g_bMPIMaster ) - { - if ( g_iVMPIVerboseLevel >= 1 ) - Msg( "\n%% worker CPU utilization during BasePortalVis: %.1f\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads ); - } -} - - - -void ProcessPortalFlow( int iThread, uint64 iPortal, MessageBuffer *pBuf ) -{ - // Process Portal and distribute results - CTimeAdder adder( &g_CPUTime ); - - PortalFlow( iThread, iPortal ); - - // Send my result to root and potentially the other slaves - // The slave results are read in RecursiveLeafFlow - // - if ( pBuf ) - { - portal_t * p = sorted_portals[iPortal]; - pBuf->write( p->portalvis, portalbytes ); - } -} - - -void ReceivePortalFlow( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker ) -{ - portal_t *p = sorted_portals[iWorkUnit]; - - if ( p->status != stat_done ) - { - pBuf->read( p->portalvis, portalbytes ); - p->status = stat_done; - - - // Multicast the status of this portal out. - if ( g_pPortalMCSocket ) - { - char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_PORTALFLOW_RESULTS }; - void *chunks[4] = { cPacketID, &g_PortalMCThreadUniqueID, &iWorkUnit, p->portalvis }; - int chunkLengths[4] = { sizeof( cPacketID ), sizeof( g_PortalMCThreadUniqueID ), sizeof( iWorkUnit ), portalbytes }; - - g_pPortalMCSocket->SendChunksTo( &g_PortalMCAddr, chunks, chunkLengths, ARRAYSIZE( chunks ) ); - } - } -} - - -DWORD WINAPI PortalMCThreadFn( LPVOID p ) -{ - CUtlVector data; - data.SetSize( portalbytes + 128 ); - - DWORD waitTime = 0; - while ( WaitForSingleObject( g_MCThreadExitEvent.GetEventHandle(), waitTime ) != WAIT_OBJECT_0 ) - { - CIPAddr ipFrom; - int len = g_pPortalMCSocket->RecvFrom( data.Base(), data.Count(), &ipFrom ); - if ( len == -1 ) - { - waitTime = 20; - } - else - { - // These lengths must match exactly what is sent in ReceivePortalFlow. - if ( len == 2 + sizeof( g_PortalMCThreadUniqueID ) + sizeof( int ) + portalbytes ) - { - // Perform more validation... - if ( data[0] == VMPI_VVIS_PACKET_ID && data[1] == VMPI_PORTALFLOW_RESULTS ) - { - if ( *((unsigned long*)&data[2]) == g_PortalMCThreadUniqueID ) - { - int iWorkUnit = *((int*)&data[6]); - if ( iWorkUnit >= 0 && iWorkUnit < g_numportals*2 ) - { - portal_t *p = sorted_portals[iWorkUnit]; - if ( p ) - { - ++g_nMulticastPortalsReceived; - memcpy( p->portalvis, &data[10], portalbytes ); - p->status = stat_done; - waitTime = 0; - } - } - } - } - } - } - } - - return 0; -} - - -void MCThreadCleanupFn() -{ - g_MCThreadExitEvent.SetEvent(); -} - - -// --------------------------------------------------------------------------------- // -// Cheesy hack to let them stop the job early and keep the results of what has -// been done so far. -// --------------------------------------------------------------------------------- // - -class CVisDistributeWorkCallbacks : public IWorkUnitDistributorCallbacks -{ -public: - CVisDistributeWorkCallbacks() - { - m_bExitedEarly = false; - m_iState = STATE_NONE; - } - - virtual bool Update() - { - if ( kbhit() ) - { - int key = toupper( getch() ); - if ( m_iState == STATE_NONE ) - { - if ( key == 'M' ) - { - m_iState = STATE_AT_MENU; - Warning("\n\n" - "----------------------\n" - "1. Write scratchpad file.\n" - "2. Exit early and use fast vis for remaining portals.\n" - "\n" - "0. Exit menu.\n" - "----------------------\n" - "\n" - ); - } - } - else if ( m_iState == STATE_AT_MENU ) - { - if ( key == '1' ) - { - Warning( - "\n" - "\nWriting scratchpad file." - "\nCommand line: scratchpad3dviewer -file scratch.pad\n" - "\nRed portals are the portals that are fast vis'd." - "\n" - ); - m_iState = STATE_NONE; - IScratchPad3D *pPad = ScratchPad3D_Create( "scratch.pad" ); - if ( pPad ) - { - ScratchPad_DrawWorld( pPad, false ); - - // Draw the portals that haven't been vis'd. - for ( int i=0; i < g_numportals*2; i++ ) - { - portal_t *p = sorted_portals[i]; - ScratchPad_DrawWinding( pPad, p->winding->numpoints, p->winding->points, Vector( 1, 0, 0 ), Vector( .3, .3, .3 ) ); - } - - pPad->Release(); - } - } - else if ( key == '2' ) - { - // Exit the process early. - m_bExitedEarly = true; - return true; - } - else if ( key == '0' ) - { - m_iState = STATE_NONE; - Warning( "\n\nExited menu.\n\n" ); - } - } - } - - return false; - } - -public: - enum - { - STATE_NONE, - STATE_AT_MENU - }; - - bool m_bExitedEarly; - int m_iState; // STATE_ enum. -}; - - -CVisDistributeWorkCallbacks g_VisDistributeWorkCallbacks; - - -void CheckExitedEarly() -{ - if ( g_VisDistributeWorkCallbacks.m_bExitedEarly ) - { - Warning( "\nExited early, using fastvis results...\n" ); - Warning( "Exited early, using fastvis results...\n" ); - - // Use the fastvis results for portals that we didn't get results for. - for ( int i=0; i < g_numportals*2; i++ ) - { - if ( sorted_portals[i]->status != stat_done ) - { - sorted_portals[i]->portalvis = sorted_portals[i]->portalflood; - sorted_portals[i]->status = stat_done; - } - } - } -} - - -//----------------------------------------- -// -// Run PortalFlow across all available processing nodes -// -void RunMPIPortalFlow() -{ - Msg( "%-20s ", "MPIPortalFlow:" ); - if ( g_bMPIMaster ) - StartPacifier(""); - - // Workers wait until we get the MC socket address. - g_PortalMCThreadUniqueID = StatsDB_GetUniqueJobID(); - if ( g_bMPIMaster ) - { - CCycleCount cnt; - cnt.Sample(); - CUniformRandomStream randomStream; - randomStream.SetSeed( cnt.GetMicroseconds() ); - - g_PortalMCAddr.port = randomStream.RandomInt( 22000, 25000 ); // Pulled out of something else. - g_PortalMCAddr.ip[0] = (unsigned char)RandomInt( 225, 238 ); - g_PortalMCAddr.ip[1] = (unsigned char)RandomInt( 0, 255 ); - g_PortalMCAddr.ip[2] = (unsigned char)RandomInt( 0, 255 ); - g_PortalMCAddr.ip[3] = (unsigned char)RandomInt( 3, 255 ); - - g_pPortalMCSocket = CreateIPSocket(); - int i=0; - for ( i; i < 5; i++ ) - { - if ( g_pPortalMCSocket->BindToAny( randomStream.RandomInt( 20000, 30000 ) ) ) - break; - } - if ( i == 5 ) - { - Error( "RunMPIPortalFlow: can't open a socket to multicast on." ); - } - - char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_SUBPACKETID_MC_ADDR }; - VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), &g_PortalMCAddr, sizeof( g_PortalMCAddr ), VMPI_PERSISTENT ); - } - else - { - VMPI_SetCurrentStage( "wait for MC address" ); - - while ( !g_bGotMCAddr ) - { - VMPI_DispatchNextMessage(); - } - - // Open our multicast receive socket. - g_pPortalMCSocket = CreateMulticastListenSocket( g_PortalMCAddr ); - if ( !g_pPortalMCSocket ) - { - char err[512]; - IP_GetLastErrorString( err, sizeof( err ) ); - Error( "RunMPIPortalFlow: CreateMulticastListenSocket failed. (%s).", err ); - } - - // Make a thread to listen for the data on the multicast socket. - DWORD dwDummy = 0; - g_MCThreadExitEvent.Init( false, false ); - - // Make sure we kill the MC thread if the app exits ungracefully. - CmdLib_AtCleanup( MCThreadCleanupFn ); - - g_hMCThread = CreateThread( - NULL, - 0, - PortalMCThreadFn, - NULL, - 0, - &dwDummy ); - - if ( !g_hMCThread ) - { - Error( "RunMPIPortalFlow: CreateThread failed for multicast receive thread." ); - } - } - - VMPI_SetCurrentStage( "RunMPIBasePortalFlow" ); - - - g_pDistributeWorkCallbacks = &g_VisDistributeWorkCallbacks; - - g_CPUTime.Init(); - double elapsed = DistributeWork( - g_numportals * 2, // # work units - VMPI_DISTRIBUTEWORK_PACKETID, // packet ID - ProcessPortalFlow, // Worker function to process work units - ReceivePortalFlow // Master function to receive work results - ); - - g_pDistributeWorkCallbacks = NULL; - - CheckExitedEarly(); - - // Stop the multicast stuff. - VMPI_DeletePortalMCSocket(); - - if( !g_bMPIMaster ) - { - if ( g_iVMPIVerboseLevel >= 1 ) - { - Msg( "Received %d (out of %d) portals from multicast.\n", g_nMulticastPortalsReceived, g_numportals * 2 ); - Msg( "%.1f%% CPU utilization during PortalFlow\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads ); - } - - Msg( "VVIS worker finished. Over and out.\n" ); - VMPI_SetCurrentStage( "worker done" ); - - CmdLib_Exit( 0 ); - } - - if ( g_bMPIMaster ) - { - EndPacifier( false ); - Msg( " (%d)\n", (int)elapsed ); - } -} - +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +// $NoKeywords: $ +// +//=============================================================================// + +#include +#include "vis.h" +#include "threads.h" +#include "stdlib.h" +#include "pacifier.h" +#include "mpi_stats.h" +#include "vmpi.h" +#include "vmpi_dispatch.h" +#include "vmpi_filesystem.h" +#include "vmpi_distribute_work.h" +#include "iphelpers.h" +#include "threadhelpers.h" +#include "vstdlib/random.h" +#include "vmpi_tools_shared.h" +#include +#include "scratchpad_helpers.h" + + +#define VMPI_VVIS_PACKET_ID 1 + // Sub packet IDs. + #define VMPI_SUBPACKETID_DISCONNECT_NOTIFY 3 // We send ourselves this when there is a disconnect. + #define VMPI_SUBPACKETID_BASEPORTALVIS 5 + #define VMPI_SUBPACKETID_PORTALFLOW 6 + #define VMPI_BASEPORTALVIS_RESULTS 7 + #define VMPI_BASEPORTALVIS_WORKER_DONE 8 + #define VMPI_PORTALFLOW_RESULTS 9 + #define VMPI_SUBPACKETID_BASEPORTALVIS_SYNC 11 + #define VMPI_SUBPACKETID_PORTALFLOW_SYNC 12 + #define VMPI_SUBPACKETID_MC_ADDR 13 + +// DistributeWork owns this packet ID. +#define VMPI_DISTRIBUTEWORK_PACKETID 2 + + +extern bool fastvis; + +// The worker waits until these are true. +bool g_bBasePortalVisSync = false; +bool g_bPortalFlowSync = false; + +CUtlVector g_BasePortalVisResultsFilename; + +CCycleCount g_CPUTime; + + +// This stuff is all for the multicast channel the master uses to send out the portal results. +ISocket *g_pPortalMCSocket = NULL; +CIPAddr g_PortalMCAddr; +bool g_bGotMCAddr = false; +HANDLE g_hMCThread = NULL; +CEvent g_MCThreadExitEvent; +unsigned long g_PortalMCThreadUniqueID = 0; +int g_nMulticastPortalsReceived = 0; + + +// Handle VVIS packets. +bool VVIS_DispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID ) +{ + switch ( pBuf->data[1] ) + { + case VMPI_SUBPACKETID_MC_ADDR: + { + pBuf->setOffset( 2 ); + pBuf->read( &g_PortalMCAddr, sizeof( g_PortalMCAddr ) ); + g_bGotMCAddr = true; + return true; + } + + case VMPI_SUBPACKETID_DISCONNECT_NOTIFY: + { + // This is just used to cause nonblocking dispatches to jump out so loops like the one + // in AppBarrier can handle the fact that there are disconnects. + return true; + } + + case VMPI_SUBPACKETID_BASEPORTALVIS_SYNC: + { + g_bBasePortalVisSync = true; + return true; + } + + case VMPI_SUBPACKETID_PORTALFLOW_SYNC: + { + g_bPortalFlowSync = true; + return true; + } + + case VMPI_BASEPORTALVIS_RESULTS: + { + const char *pFilename = &pBuf->data[2]; + g_BasePortalVisResultsFilename.CopyArray( pFilename, strlen( pFilename ) + 1 ); + return true; + } + + default: + { + return false; + } + } +} +CDispatchReg g_VVISDispatchReg( VMPI_VVIS_PACKET_ID, VVIS_DispatchFn ); // register to handle the messages we want +CDispatchReg g_DistributeWorkReg( VMPI_DISTRIBUTEWORK_PACKETID, DistributeWorkDispatch ); + + + +void VMPI_DeletePortalMCSocket() +{ + // Stop the thread if it exists. + if ( g_hMCThread ) + { + g_MCThreadExitEvent.SetEvent(); + WaitForSingleObject( g_hMCThread, INFINITE ); + CloseHandle( g_hMCThread ); + g_hMCThread = NULL; + } + + if ( g_pPortalMCSocket ) + { + g_pPortalMCSocket->Release(); + g_pPortalMCSocket = NULL; + } +} + + +void VVIS_SetupMPI( int &argc, char **&argv ) +{ + if ( !VMPI_FindArg( argc, argv, "-mpi", "" ) && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), "" ) ) + return; + + CmdLib_AtCleanup( VMPI_Stats_Term ); + CmdLib_AtCleanup( VMPI_DeletePortalMCSocket ); + + VMPI_Stats_InstallSpewHook(); + + // Force local mode? + VMPIRunMode mode; + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Local ), "" ) ) + mode = VMPI_RUN_LOCAL; + else + mode = VMPI_RUN_NETWORKED; + + // + // Extract mpi specific arguments + // + Msg( "Initializing VMPI...\n" ); + if ( !VMPI_Init( argc, argv, "dependency_info_vvis.txt", HandleMPIDisconnect, mode ) ) + { + Error( "MPI_Init failed." ); + } + + StatsDB_InitStatsDatabase( argc, argv, "dbinfo_vvis.txt" ); +} + + +void ProcessBasePortalVis( int iThread, uint64 iPortal, MessageBuffer *pBuf ) +{ + CTimeAdder adder( &g_CPUTime ); + + BasePortalVis( iThread, iPortal ); + + // Send my result to the master + if ( pBuf ) + { + portal_t * p = &portals[iPortal]; + pBuf->write( p->portalfront, portalbytes ); + pBuf->write( p->portalflood, portalbytes ); + } +} + + +void ReceiveBasePortalVis( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker ) +{ + portal_t * p = &portals[iWorkUnit]; + if ( p->portalflood != 0 || p->portalfront != 0 || p->portalvis != 0) + { + Msg("Duplicate portal %llu\n", iWorkUnit); + } + + if ( pBuf->getLen() - pBuf->getOffset() != portalbytes*2 ) + Error( "Invalid packet in ReceiveBasePortalVis." ); + + // + // allocate memory for bitwise vis solutions for this portal + // + p->portalfront = (byte*)malloc (portalbytes); + pBuf->read( p->portalfront, portalbytes ); + + p->portalflood = (byte*)malloc (portalbytes); + pBuf->read( p->portalflood, portalbytes ); + + p->portalvis = (byte*)malloc (portalbytes); + memset (p->portalvis, 0, portalbytes); + + p->nummightsee = CountBits( p->portalflood, g_numportals*2 ); +} + + +//----------------------------------------- +// +// Run BasePortalVis across all available processing nodes +// Then collect and redistribute the results. +// +void RunMPIBasePortalVis() +{ + int i; + + Msg( "\n\nportalbytes: %d\nNum Work Units: %d\nTotal data size: %d\n", portalbytes, g_numportals*2, portalbytes*g_numportals*2 ); + Msg("%-20s ", "BasePortalVis:"); + if ( g_bMPIMaster ) + StartPacifier(""); + + + VMPI_SetCurrentStage( "RunMPIBasePortalVis" ); + + // Note: we're aiming for about 1500 portals in a map, so about 3000 work units. + g_CPUTime.Init(); + double elapsed = DistributeWork( + g_numportals * 2, // # work units + VMPI_DISTRIBUTEWORK_PACKETID, // packet ID + ProcessBasePortalVis, // Worker function to process work units + ReceiveBasePortalVis // Master function to receive work results + ); + + if ( g_bMPIMaster ) + { + EndPacifier( false ); + Msg( " (%d)\n", (int)elapsed ); + } + + // + // Distribute the results to all the workers. + // + if ( g_bMPIMaster ) + { + if ( !fastvis ) + { + VMPI_SetCurrentStage( "SendPortalResults" ); + + // Store all the portal results in a temp file and multicast that to the workers. + CUtlVector allPortalData; + allPortalData.SetSize( g_numportals * 2 * portalbytes * 2 ); + + char *pOut = allPortalData.Base(); + for ( i=0; i < g_numportals * 2; i++) + { + portal_t *p = &portals[i]; + + memcpy( pOut, p->portalfront, portalbytes ); + pOut += portalbytes; + + memcpy( pOut, p->portalflood, portalbytes ); + pOut += portalbytes; + } + + const char *pVirtualFilename = "--portal-results--"; + VMPI_FileSystem_CreateVirtualFile( pVirtualFilename, allPortalData.Base(), allPortalData.Count() ); + + char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_BASEPORTALVIS_RESULTS }; + VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), pVirtualFilename, strlen( pVirtualFilename ) + 1, VMPI_PERSISTENT ); + } + } + else + { + VMPI_SetCurrentStage( "RecvPortalResults" ); + + // Wait until we've received the filename from the master. + while ( g_BasePortalVisResultsFilename.Count() == 0 ) + { + VMPI_DispatchNextMessage(); + } + + // Open + FileHandle_t fp = g_pFileSystem->Open( g_BasePortalVisResultsFilename.Base(), "rb", VMPI_VIRTUAL_FILES_PATH_ID ); + if ( !fp ) + Error( "Can't open '%s' to read portal info.", g_BasePortalVisResultsFilename.Base() ); + + for ( i=0; i < g_numportals * 2; i++) + { + portal_t *p = &portals[i]; + + p->portalfront = (byte*)malloc (portalbytes); + g_pFileSystem->Read( p->portalfront, portalbytes, fp ); + + p->portalflood = (byte*)malloc (portalbytes); + g_pFileSystem->Read( p->portalflood, portalbytes, fp ); + + p->portalvis = (byte*)malloc (portalbytes); + memset (p->portalvis, 0, portalbytes); + + p->nummightsee = CountBits (p->portalflood, g_numportals*2); + } + + g_pFileSystem->Close( fp ); + } + + + if ( !g_bMPIMaster ) + { + if ( g_iVMPIVerboseLevel >= 1 ) + Msg( "\n%% worker CPU utilization during BasePortalVis: %.1f\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads ); + } +} + + + +void ProcessPortalFlow( int iThread, uint64 iPortal, MessageBuffer *pBuf ) +{ + // Process Portal and distribute results + CTimeAdder adder( &g_CPUTime ); + + PortalFlow( iThread, iPortal ); + + // Send my result to root and potentially the other slaves + // The slave results are read in RecursiveLeafFlow + // + if ( pBuf ) + { + portal_t * p = sorted_portals[iPortal]; + pBuf->write( p->portalvis, portalbytes ); + } +} + + +void ReceivePortalFlow( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker ) +{ + portal_t *p = sorted_portals[iWorkUnit]; + + if ( p->status != stat_done ) + { + pBuf->read( p->portalvis, portalbytes ); + p->status = stat_done; + + + // Multicast the status of this portal out. + if ( g_pPortalMCSocket ) + { + char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_PORTALFLOW_RESULTS }; + void *chunks[4] = { cPacketID, &g_PortalMCThreadUniqueID, &iWorkUnit, p->portalvis }; + int chunkLengths[4] = { sizeof( cPacketID ), sizeof( g_PortalMCThreadUniqueID ), sizeof( iWorkUnit ), portalbytes }; + + g_pPortalMCSocket->SendChunksTo( &g_PortalMCAddr, chunks, chunkLengths, ARRAYSIZE( chunks ) ); + } + } +} + + +DWORD WINAPI PortalMCThreadFn( LPVOID p ) +{ + CUtlVector data; + data.SetSize( portalbytes + 128 ); + + DWORD waitTime = 0; + while ( WaitForSingleObject( g_MCThreadExitEvent.GetEventHandle(), waitTime ) != WAIT_OBJECT_0 ) + { + CIPAddr ipFrom; + int len = g_pPortalMCSocket->RecvFrom( data.Base(), data.Count(), &ipFrom ); + if ( len == -1 ) + { + waitTime = 20; + } + else + { + // These lengths must match exactly what is sent in ReceivePortalFlow. + if ( len == 2 + sizeof( g_PortalMCThreadUniqueID ) + sizeof( int ) + portalbytes ) + { + // Perform more validation... + if ( data[0] == VMPI_VVIS_PACKET_ID && data[1] == VMPI_PORTALFLOW_RESULTS ) + { + if ( *((unsigned long*)&data[2]) == g_PortalMCThreadUniqueID ) + { + int iWorkUnit = *((int*)&data[6]); + if ( iWorkUnit >= 0 && iWorkUnit < g_numportals*2 ) + { + portal_t *p = sorted_portals[iWorkUnit]; + if ( p ) + { + ++g_nMulticastPortalsReceived; + memcpy( p->portalvis, &data[10], portalbytes ); + p->status = stat_done; + waitTime = 0; + } + } + } + } + } + } + } + + return 0; +} + + +void MCThreadCleanupFn() +{ + g_MCThreadExitEvent.SetEvent(); +} + + +// --------------------------------------------------------------------------------- // +// Cheesy hack to let them stop the job early and keep the results of what has +// been done so far. +// --------------------------------------------------------------------------------- // + +class CVisDistributeWorkCallbacks : public IWorkUnitDistributorCallbacks +{ +public: + CVisDistributeWorkCallbacks() + { + m_bExitedEarly = false; + m_iState = STATE_NONE; + } + + virtual bool Update() + { + if ( kbhit() ) + { + int key = toupper( getch() ); + if ( m_iState == STATE_NONE ) + { + if ( key == 'M' ) + { + m_iState = STATE_AT_MENU; + Warning("\n\n" + "----------------------\n" + "1. Write scratchpad file.\n" + "2. Exit early and use fast vis for remaining portals.\n" + "\n" + "0. Exit menu.\n" + "----------------------\n" + "\n" + ); + } + } + else if ( m_iState == STATE_AT_MENU ) + { + if ( key == '1' ) + { + Warning( + "\n" + "\nWriting scratchpad file." + "\nCommand line: scratchpad3dviewer -file scratch.pad\n" + "\nRed portals are the portals that are fast vis'd." + "\n" + ); + m_iState = STATE_NONE; + IScratchPad3D *pPad = ScratchPad3D_Create( "scratch.pad" ); + if ( pPad ) + { + ScratchPad_DrawWorld( pPad, false ); + + // Draw the portals that haven't been vis'd. + for ( int i=0; i < g_numportals*2; i++ ) + { + portal_t *p = sorted_portals[i]; + ScratchPad_DrawWinding( pPad, p->winding->numpoints, p->winding->points, Vector( 1, 0, 0 ), Vector( .3, .3, .3 ) ); + } + + pPad->Release(); + } + } + else if ( key == '2' ) + { + // Exit the process early. + m_bExitedEarly = true; + return true; + } + else if ( key == '0' ) + { + m_iState = STATE_NONE; + Warning( "\n\nExited menu.\n\n" ); + } + } + } + + return false; + } + +public: + enum + { + STATE_NONE, + STATE_AT_MENU + }; + + bool m_bExitedEarly; + int m_iState; // STATE_ enum. +}; + + +CVisDistributeWorkCallbacks g_VisDistributeWorkCallbacks; + + +void CheckExitedEarly() +{ + if ( g_VisDistributeWorkCallbacks.m_bExitedEarly ) + { + Warning( "\nExited early, using fastvis results...\n" ); + Warning( "Exited early, using fastvis results...\n" ); + + // Use the fastvis results for portals that we didn't get results for. + for ( int i=0; i < g_numportals*2; i++ ) + { + if ( sorted_portals[i]->status != stat_done ) + { + sorted_portals[i]->portalvis = sorted_portals[i]->portalflood; + sorted_portals[i]->status = stat_done; + } + } + } +} + + +//----------------------------------------- +// +// Run PortalFlow across all available processing nodes +// +void RunMPIPortalFlow() +{ + Msg( "%-20s ", "MPIPortalFlow:" ); + if ( g_bMPIMaster ) + StartPacifier(""); + + // Workers wait until we get the MC socket address. + g_PortalMCThreadUniqueID = StatsDB_GetUniqueJobID(); + if ( g_bMPIMaster ) + { + CCycleCount cnt; + cnt.Sample(); + CUniformRandomStream randomStream; + randomStream.SetSeed( cnt.GetMicroseconds() ); + + g_PortalMCAddr.port = randomStream.RandomInt( 22000, 25000 ); // Pulled out of something else. + g_PortalMCAddr.ip[0] = (unsigned char)RandomInt( 225, 238 ); + g_PortalMCAddr.ip[1] = (unsigned char)RandomInt( 0, 255 ); + g_PortalMCAddr.ip[2] = (unsigned char)RandomInt( 0, 255 ); + g_PortalMCAddr.ip[3] = (unsigned char)RandomInt( 3, 255 ); + + g_pPortalMCSocket = CreateIPSocket(); + int i=0; + for ( i; i < 5; i++ ) + { + if ( g_pPortalMCSocket->BindToAny( randomStream.RandomInt( 20000, 30000 ) ) ) + break; + } + if ( i == 5 ) + { + Error( "RunMPIPortalFlow: can't open a socket to multicast on." ); + } + + char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_SUBPACKETID_MC_ADDR }; + VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), &g_PortalMCAddr, sizeof( g_PortalMCAddr ), VMPI_PERSISTENT ); + } + else + { + VMPI_SetCurrentStage( "wait for MC address" ); + + while ( !g_bGotMCAddr ) + { + VMPI_DispatchNextMessage(); + } + + // Open our multicast receive socket. + g_pPortalMCSocket = CreateMulticastListenSocket( g_PortalMCAddr ); + if ( !g_pPortalMCSocket ) + { + char err[512]; + IP_GetLastErrorString( err, sizeof( err ) ); + Error( "RunMPIPortalFlow: CreateMulticastListenSocket failed. (%s).", err ); + } + + // Make a thread to listen for the data on the multicast socket. + DWORD dwDummy = 0; + g_MCThreadExitEvent.Init( false, false ); + + // Make sure we kill the MC thread if the app exits ungracefully. + CmdLib_AtCleanup( MCThreadCleanupFn ); + + g_hMCThread = CreateThread( + NULL, + 0, + PortalMCThreadFn, + NULL, + 0, + &dwDummy ); + + if ( !g_hMCThread ) + { + Error( "RunMPIPortalFlow: CreateThread failed for multicast receive thread." ); + } + } + + VMPI_SetCurrentStage( "RunMPIBasePortalFlow" ); + + + g_pDistributeWorkCallbacks = &g_VisDistributeWorkCallbacks; + + g_CPUTime.Init(); + double elapsed = DistributeWork( + g_numportals * 2, // # work units + VMPI_DISTRIBUTEWORK_PACKETID, // packet ID + ProcessPortalFlow, // Worker function to process work units + ReceivePortalFlow // Master function to receive work results + ); + + g_pDistributeWorkCallbacks = NULL; + + CheckExitedEarly(); + + // Stop the multicast stuff. + VMPI_DeletePortalMCSocket(); + + if( !g_bMPIMaster ) + { + if ( g_iVMPIVerboseLevel >= 1 ) + { + Msg( "Received %d (out of %d) portals from multicast.\n", g_nMulticastPortalsReceived, g_numportals * 2 ); + Msg( "%.1f%% CPU utilization during PortalFlow\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads ); + } + + Msg( "VVIS worker finished. Over and out.\n" ); + VMPI_SetCurrentStage( "worker done" ); + + CmdLib_Exit( 0 ); + } + + if ( g_bMPIMaster ) + { + EndPacifier( false ); + Msg( " (%d)\n", (int)elapsed ); + } +} + -- cgit v1.2.3