diff options
| author | Jørgen P. Tjernø <[email protected]> | 2013-12-02 19:31:46 -0800 |
|---|---|---|
| committer | Jørgen P. Tjernø <[email protected]> | 2013-12-02 19:46:31 -0800 |
| commit | f56bb35301836e56582a575a75864392a0177875 (patch) | |
| tree | de61ddd39de3e7df52759711950b4c288592f0dc /mp/src/utils/vvis/mpivis.cpp | |
| parent | Mark some more files as text. (diff) | |
| download | source-sdk-2013-f56bb35301836e56582a575a75864392a0177875.tar.xz source-sdk-2013-f56bb35301836e56582a575a75864392a0177875.zip | |
Fix line endings. WHAMMY.
Diffstat (limited to 'mp/src/utils/vvis/mpivis.cpp')
| -rw-r--r-- | mp/src/utils/vvis/mpivis.cpp | 1280 |
1 files changed, 640 insertions, 640 deletions
diff --git a/mp/src/utils/vvis/mpivis.cpp b/mp/src/utils/vvis/mpivis.cpp index 6da76ebc..58b76331 100644 --- a/mp/src/utils/vvis/mpivis.cpp +++ b/mp/src/utils/vvis/mpivis.cpp @@ -1,640 +1,640 @@ -//========= Copyright Valve Corporation, All rights reserved. ============//
-//
-// Purpose:
-//
-// $NoKeywords: $
-//
-//=============================================================================//
-
-#include <windows.h>
-#include "vis.h"
-#include "threads.h"
-#include "stdlib.h"
-#include "pacifier.h"
-#include "mpi_stats.h"
-#include "vmpi.h"
-#include "vmpi_dispatch.h"
-#include "vmpi_filesystem.h"
-#include "vmpi_distribute_work.h"
-#include "iphelpers.h"
-#include "threadhelpers.h"
-#include "vstdlib/random.h"
-#include "vmpi_tools_shared.h"
-#include <conio.h>
-#include "scratchpad_helpers.h"
-
-
-#define VMPI_VVIS_PACKET_ID 1
- // Sub packet IDs.
- #define VMPI_SUBPACKETID_DISCONNECT_NOTIFY 3 // We send ourselves this when there is a disconnect.
- #define VMPI_SUBPACKETID_BASEPORTALVIS 5
- #define VMPI_SUBPACKETID_PORTALFLOW 6
- #define VMPI_BASEPORTALVIS_RESULTS 7
- #define VMPI_BASEPORTALVIS_WORKER_DONE 8
- #define VMPI_PORTALFLOW_RESULTS 9
- #define VMPI_SUBPACKETID_BASEPORTALVIS_SYNC 11
- #define VMPI_SUBPACKETID_PORTALFLOW_SYNC 12
- #define VMPI_SUBPACKETID_MC_ADDR 13
-
-// DistributeWork owns this packet ID.
-#define VMPI_DISTRIBUTEWORK_PACKETID 2
-
-
-extern bool fastvis;
-
-// The worker waits until these are true.
-bool g_bBasePortalVisSync = false;
-bool g_bPortalFlowSync = false;
-
-CUtlVector<char> g_BasePortalVisResultsFilename;
-
-CCycleCount g_CPUTime;
-
-
-// This stuff is all for the multicast channel the master uses to send out the portal results.
-ISocket *g_pPortalMCSocket = NULL;
-CIPAddr g_PortalMCAddr;
-bool g_bGotMCAddr = false;
-HANDLE g_hMCThread = NULL;
-CEvent g_MCThreadExitEvent;
-unsigned long g_PortalMCThreadUniqueID = 0;
-int g_nMulticastPortalsReceived = 0;
-
-
-// Handle VVIS packets.
-bool VVIS_DispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID )
-{
- switch ( pBuf->data[1] )
- {
- case VMPI_SUBPACKETID_MC_ADDR:
- {
- pBuf->setOffset( 2 );
- pBuf->read( &g_PortalMCAddr, sizeof( g_PortalMCAddr ) );
- g_bGotMCAddr = true;
- return true;
- }
-
- case VMPI_SUBPACKETID_DISCONNECT_NOTIFY:
- {
- // This is just used to cause nonblocking dispatches to jump out so loops like the one
- // in AppBarrier can handle the fact that there are disconnects.
- return true;
- }
-
- case VMPI_SUBPACKETID_BASEPORTALVIS_SYNC:
- {
- g_bBasePortalVisSync = true;
- return true;
- }
-
- case VMPI_SUBPACKETID_PORTALFLOW_SYNC:
- {
- g_bPortalFlowSync = true;
- return true;
- }
-
- case VMPI_BASEPORTALVIS_RESULTS:
- {
- const char *pFilename = &pBuf->data[2];
- g_BasePortalVisResultsFilename.CopyArray( pFilename, strlen( pFilename ) + 1 );
- return true;
- }
-
- default:
- {
- return false;
- }
- }
-}
-CDispatchReg g_VVISDispatchReg( VMPI_VVIS_PACKET_ID, VVIS_DispatchFn ); // register to handle the messages we want
-CDispatchReg g_DistributeWorkReg( VMPI_DISTRIBUTEWORK_PACKETID, DistributeWorkDispatch );
-
-
-
-void VMPI_DeletePortalMCSocket()
-{
- // Stop the thread if it exists.
- if ( g_hMCThread )
- {
- g_MCThreadExitEvent.SetEvent();
- WaitForSingleObject( g_hMCThread, INFINITE );
- CloseHandle( g_hMCThread );
- g_hMCThread = NULL;
- }
-
- if ( g_pPortalMCSocket )
- {
- g_pPortalMCSocket->Release();
- g_pPortalMCSocket = NULL;
- }
-}
-
-
-void VVIS_SetupMPI( int &argc, char **&argv )
-{
- if ( !VMPI_FindArg( argc, argv, "-mpi", "" ) && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), "" ) )
- return;
-
- CmdLib_AtCleanup( VMPI_Stats_Term );
- CmdLib_AtCleanup( VMPI_DeletePortalMCSocket );
-
- VMPI_Stats_InstallSpewHook();
-
- // Force local mode?
- VMPIRunMode mode;
- if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Local ), "" ) )
- mode = VMPI_RUN_LOCAL;
- else
- mode = VMPI_RUN_NETWORKED;
-
- //
- // Extract mpi specific arguments
- //
- Msg( "Initializing VMPI...\n" );
- if ( !VMPI_Init( argc, argv, "dependency_info_vvis.txt", HandleMPIDisconnect, mode ) )
- {
- Error( "MPI_Init failed." );
- }
-
- StatsDB_InitStatsDatabase( argc, argv, "dbinfo_vvis.txt" );
-}
-
-
-void ProcessBasePortalVis( int iThread, uint64 iPortal, MessageBuffer *pBuf )
-{
- CTimeAdder adder( &g_CPUTime );
-
- BasePortalVis( iThread, iPortal );
-
- // Send my result to the master
- if ( pBuf )
- {
- portal_t * p = &portals[iPortal];
- pBuf->write( p->portalfront, portalbytes );
- pBuf->write( p->portalflood, portalbytes );
- }
-}
-
-
-void ReceiveBasePortalVis( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker )
-{
- portal_t * p = &portals[iWorkUnit];
- if ( p->portalflood != 0 || p->portalfront != 0 || p->portalvis != 0)
- {
- Msg("Duplicate portal %llu\n", iWorkUnit);
- }
-
- if ( pBuf->getLen() - pBuf->getOffset() != portalbytes*2 )
- Error( "Invalid packet in ReceiveBasePortalVis." );
-
- //
- // allocate memory for bitwise vis solutions for this portal
- //
- p->portalfront = (byte*)malloc (portalbytes);
- pBuf->read( p->portalfront, portalbytes );
-
- p->portalflood = (byte*)malloc (portalbytes);
- pBuf->read( p->portalflood, portalbytes );
-
- p->portalvis = (byte*)malloc (portalbytes);
- memset (p->portalvis, 0, portalbytes);
-
- p->nummightsee = CountBits( p->portalflood, g_numportals*2 );
-}
-
-
-//-----------------------------------------
-//
-// Run BasePortalVis across all available processing nodes
-// Then collect and redistribute the results.
-//
-void RunMPIBasePortalVis()
-{
- int i;
-
- Msg( "\n\nportalbytes: %d\nNum Work Units: %d\nTotal data size: %d\n", portalbytes, g_numportals*2, portalbytes*g_numportals*2 );
- Msg("%-20s ", "BasePortalVis:");
- if ( g_bMPIMaster )
- StartPacifier("");
-
-
- VMPI_SetCurrentStage( "RunMPIBasePortalVis" );
-
- // Note: we're aiming for about 1500 portals in a map, so about 3000 work units.
- g_CPUTime.Init();
- double elapsed = DistributeWork(
- g_numportals * 2, // # work units
- VMPI_DISTRIBUTEWORK_PACKETID, // packet ID
- ProcessBasePortalVis, // Worker function to process work units
- ReceiveBasePortalVis // Master function to receive work results
- );
-
- if ( g_bMPIMaster )
- {
- EndPacifier( false );
- Msg( " (%d)\n", (int)elapsed );
- }
-
- //
- // Distribute the results to all the workers.
- //
- if ( g_bMPIMaster )
- {
- if ( !fastvis )
- {
- VMPI_SetCurrentStage( "SendPortalResults" );
-
- // Store all the portal results in a temp file and multicast that to the workers.
- CUtlVector<char> allPortalData;
- allPortalData.SetSize( g_numportals * 2 * portalbytes * 2 );
-
- char *pOut = allPortalData.Base();
- for ( i=0; i < g_numportals * 2; i++)
- {
- portal_t *p = &portals[i];
-
- memcpy( pOut, p->portalfront, portalbytes );
- pOut += portalbytes;
-
- memcpy( pOut, p->portalflood, portalbytes );
- pOut += portalbytes;
- }
-
- const char *pVirtualFilename = "--portal-results--";
- VMPI_FileSystem_CreateVirtualFile( pVirtualFilename, allPortalData.Base(), allPortalData.Count() );
-
- char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_BASEPORTALVIS_RESULTS };
- VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), pVirtualFilename, strlen( pVirtualFilename ) + 1, VMPI_PERSISTENT );
- }
- }
- else
- {
- VMPI_SetCurrentStage( "RecvPortalResults" );
-
- // Wait until we've received the filename from the master.
- while ( g_BasePortalVisResultsFilename.Count() == 0 )
- {
- VMPI_DispatchNextMessage();
- }
-
- // Open
- FileHandle_t fp = g_pFileSystem->Open( g_BasePortalVisResultsFilename.Base(), "rb", VMPI_VIRTUAL_FILES_PATH_ID );
- if ( !fp )
- Error( "Can't open '%s' to read portal info.", g_BasePortalVisResultsFilename.Base() );
-
- for ( i=0; i < g_numportals * 2; i++)
- {
- portal_t *p = &portals[i];
-
- p->portalfront = (byte*)malloc (portalbytes);
- g_pFileSystem->Read( p->portalfront, portalbytes, fp );
-
- p->portalflood = (byte*)malloc (portalbytes);
- g_pFileSystem->Read( p->portalflood, portalbytes, fp );
-
- p->portalvis = (byte*)malloc (portalbytes);
- memset (p->portalvis, 0, portalbytes);
-
- p->nummightsee = CountBits (p->portalflood, g_numportals*2);
- }
-
- g_pFileSystem->Close( fp );
- }
-
-
- if ( !g_bMPIMaster )
- {
- if ( g_iVMPIVerboseLevel >= 1 )
- Msg( "\n%% worker CPU utilization during BasePortalVis: %.1f\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads );
- }
-}
-
-
-
-void ProcessPortalFlow( int iThread, uint64 iPortal, MessageBuffer *pBuf )
-{
- // Process Portal and distribute results
- CTimeAdder adder( &g_CPUTime );
-
- PortalFlow( iThread, iPortal );
-
- // Send my result to root and potentially the other slaves
- // The slave results are read in RecursiveLeafFlow
- //
- if ( pBuf )
- {
- portal_t * p = sorted_portals[iPortal];
- pBuf->write( p->portalvis, portalbytes );
- }
-}
-
-
-void ReceivePortalFlow( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker )
-{
- portal_t *p = sorted_portals[iWorkUnit];
-
- if ( p->status != stat_done )
- {
- pBuf->read( p->portalvis, portalbytes );
- p->status = stat_done;
-
-
- // Multicast the status of this portal out.
- if ( g_pPortalMCSocket )
- {
- char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_PORTALFLOW_RESULTS };
- void *chunks[4] = { cPacketID, &g_PortalMCThreadUniqueID, &iWorkUnit, p->portalvis };
- int chunkLengths[4] = { sizeof( cPacketID ), sizeof( g_PortalMCThreadUniqueID ), sizeof( iWorkUnit ), portalbytes };
-
- g_pPortalMCSocket->SendChunksTo( &g_PortalMCAddr, chunks, chunkLengths, ARRAYSIZE( chunks ) );
- }
- }
-}
-
-
-DWORD WINAPI PortalMCThreadFn( LPVOID p )
-{
- CUtlVector<char> data;
- data.SetSize( portalbytes + 128 );
-
- DWORD waitTime = 0;
- while ( WaitForSingleObject( g_MCThreadExitEvent.GetEventHandle(), waitTime ) != WAIT_OBJECT_0 )
- {
- CIPAddr ipFrom;
- int len = g_pPortalMCSocket->RecvFrom( data.Base(), data.Count(), &ipFrom );
- if ( len == -1 )
- {
- waitTime = 20;
- }
- else
- {
- // These lengths must match exactly what is sent in ReceivePortalFlow.
- if ( len == 2 + sizeof( g_PortalMCThreadUniqueID ) + sizeof( int ) + portalbytes )
- {
- // Perform more validation...
- if ( data[0] == VMPI_VVIS_PACKET_ID && data[1] == VMPI_PORTALFLOW_RESULTS )
- {
- if ( *((unsigned long*)&data[2]) == g_PortalMCThreadUniqueID )
- {
- int iWorkUnit = *((int*)&data[6]);
- if ( iWorkUnit >= 0 && iWorkUnit < g_numportals*2 )
- {
- portal_t *p = sorted_portals[iWorkUnit];
- if ( p )
- {
- ++g_nMulticastPortalsReceived;
- memcpy( p->portalvis, &data[10], portalbytes );
- p->status = stat_done;
- waitTime = 0;
- }
- }
- }
- }
- }
- }
- }
-
- return 0;
-}
-
-
-void MCThreadCleanupFn()
-{
- g_MCThreadExitEvent.SetEvent();
-}
-
-
-// --------------------------------------------------------------------------------- //
-// Cheesy hack to let them stop the job early and keep the results of what has
-// been done so far.
-// --------------------------------------------------------------------------------- //
-
-class CVisDistributeWorkCallbacks : public IWorkUnitDistributorCallbacks
-{
-public:
- CVisDistributeWorkCallbacks()
- {
- m_bExitedEarly = false;
- m_iState = STATE_NONE;
- }
-
- virtual bool Update()
- {
- if ( kbhit() )
- {
- int key = toupper( getch() );
- if ( m_iState == STATE_NONE )
- {
- if ( key == 'M' )
- {
- m_iState = STATE_AT_MENU;
- Warning("\n\n"
- "----------------------\n"
- "1. Write scratchpad file.\n"
- "2. Exit early and use fast vis for remaining portals.\n"
- "\n"
- "0. Exit menu.\n"
- "----------------------\n"
- "\n"
- );
- }
- }
- else if ( m_iState == STATE_AT_MENU )
- {
- if ( key == '1' )
- {
- Warning(
- "\n"
- "\nWriting scratchpad file."
- "\nCommand line: scratchpad3dviewer -file scratch.pad\n"
- "\nRed portals are the portals that are fast vis'd."
- "\n"
- );
- m_iState = STATE_NONE;
- IScratchPad3D *pPad = ScratchPad3D_Create( "scratch.pad" );
- if ( pPad )
- {
- ScratchPad_DrawWorld( pPad, false );
-
- // Draw the portals that haven't been vis'd.
- for ( int i=0; i < g_numportals*2; i++ )
- {
- portal_t *p = sorted_portals[i];
- ScratchPad_DrawWinding( pPad, p->winding->numpoints, p->winding->points, Vector( 1, 0, 0 ), Vector( .3, .3, .3 ) );
- }
-
- pPad->Release();
- }
- }
- else if ( key == '2' )
- {
- // Exit the process early.
- m_bExitedEarly = true;
- return true;
- }
- else if ( key == '0' )
- {
- m_iState = STATE_NONE;
- Warning( "\n\nExited menu.\n\n" );
- }
- }
- }
-
- return false;
- }
-
-public:
- enum
- {
- STATE_NONE,
- STATE_AT_MENU
- };
-
- bool m_bExitedEarly;
- int m_iState; // STATE_ enum.
-};
-
-
-CVisDistributeWorkCallbacks g_VisDistributeWorkCallbacks;
-
-
-void CheckExitedEarly()
-{
- if ( g_VisDistributeWorkCallbacks.m_bExitedEarly )
- {
- Warning( "\nExited early, using fastvis results...\n" );
- Warning( "Exited early, using fastvis results...\n" );
-
- // Use the fastvis results for portals that we didn't get results for.
- for ( int i=0; i < g_numportals*2; i++ )
- {
- if ( sorted_portals[i]->status != stat_done )
- {
- sorted_portals[i]->portalvis = sorted_portals[i]->portalflood;
- sorted_portals[i]->status = stat_done;
- }
- }
- }
-}
-
-
-//-----------------------------------------
-//
-// Run PortalFlow across all available processing nodes
-//
-void RunMPIPortalFlow()
-{
- Msg( "%-20s ", "MPIPortalFlow:" );
- if ( g_bMPIMaster )
- StartPacifier("");
-
- // Workers wait until we get the MC socket address.
- g_PortalMCThreadUniqueID = StatsDB_GetUniqueJobID();
- if ( g_bMPIMaster )
- {
- CCycleCount cnt;
- cnt.Sample();
- CUniformRandomStream randomStream;
- randomStream.SetSeed( cnt.GetMicroseconds() );
-
- g_PortalMCAddr.port = randomStream.RandomInt( 22000, 25000 ); // Pulled out of something else.
- g_PortalMCAddr.ip[0] = (unsigned char)RandomInt( 225, 238 );
- g_PortalMCAddr.ip[1] = (unsigned char)RandomInt( 0, 255 );
- g_PortalMCAddr.ip[2] = (unsigned char)RandomInt( 0, 255 );
- g_PortalMCAddr.ip[3] = (unsigned char)RandomInt( 3, 255 );
-
- g_pPortalMCSocket = CreateIPSocket();
- int i=0;
- for ( i; i < 5; i++ )
- {
- if ( g_pPortalMCSocket->BindToAny( randomStream.RandomInt( 20000, 30000 ) ) )
- break;
- }
- if ( i == 5 )
- {
- Error( "RunMPIPortalFlow: can't open a socket to multicast on." );
- }
-
- char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_SUBPACKETID_MC_ADDR };
- VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), &g_PortalMCAddr, sizeof( g_PortalMCAddr ), VMPI_PERSISTENT );
- }
- else
- {
- VMPI_SetCurrentStage( "wait for MC address" );
-
- while ( !g_bGotMCAddr )
- {
- VMPI_DispatchNextMessage();
- }
-
- // Open our multicast receive socket.
- g_pPortalMCSocket = CreateMulticastListenSocket( g_PortalMCAddr );
- if ( !g_pPortalMCSocket )
- {
- char err[512];
- IP_GetLastErrorString( err, sizeof( err ) );
- Error( "RunMPIPortalFlow: CreateMulticastListenSocket failed. (%s).", err );
- }
-
- // Make a thread to listen for the data on the multicast socket.
- DWORD dwDummy = 0;
- g_MCThreadExitEvent.Init( false, false );
-
- // Make sure we kill the MC thread if the app exits ungracefully.
- CmdLib_AtCleanup( MCThreadCleanupFn );
-
- g_hMCThread = CreateThread(
- NULL,
- 0,
- PortalMCThreadFn,
- NULL,
- 0,
- &dwDummy );
-
- if ( !g_hMCThread )
- {
- Error( "RunMPIPortalFlow: CreateThread failed for multicast receive thread." );
- }
- }
-
- VMPI_SetCurrentStage( "RunMPIBasePortalFlow" );
-
-
- g_pDistributeWorkCallbacks = &g_VisDistributeWorkCallbacks;
-
- g_CPUTime.Init();
- double elapsed = DistributeWork(
- g_numportals * 2, // # work units
- VMPI_DISTRIBUTEWORK_PACKETID, // packet ID
- ProcessPortalFlow, // Worker function to process work units
- ReceivePortalFlow // Master function to receive work results
- );
-
- g_pDistributeWorkCallbacks = NULL;
-
- CheckExitedEarly();
-
- // Stop the multicast stuff.
- VMPI_DeletePortalMCSocket();
-
- if( !g_bMPIMaster )
- {
- if ( g_iVMPIVerboseLevel >= 1 )
- {
- Msg( "Received %d (out of %d) portals from multicast.\n", g_nMulticastPortalsReceived, g_numportals * 2 );
- Msg( "%.1f%% CPU utilization during PortalFlow\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads );
- }
-
- Msg( "VVIS worker finished. Over and out.\n" );
- VMPI_SetCurrentStage( "worker done" );
-
- CmdLib_Exit( 0 );
- }
-
- if ( g_bMPIMaster )
- {
- EndPacifier( false );
- Msg( " (%d)\n", (int)elapsed );
- }
-}
-
+//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +// $NoKeywords: $ +// +//=============================================================================// + +#include <windows.h> +#include "vis.h" +#include "threads.h" +#include "stdlib.h" +#include "pacifier.h" +#include "mpi_stats.h" +#include "vmpi.h" +#include "vmpi_dispatch.h" +#include "vmpi_filesystem.h" +#include "vmpi_distribute_work.h" +#include "iphelpers.h" +#include "threadhelpers.h" +#include "vstdlib/random.h" +#include "vmpi_tools_shared.h" +#include <conio.h> +#include "scratchpad_helpers.h" + + +#define VMPI_VVIS_PACKET_ID 1 + // Sub packet IDs. + #define VMPI_SUBPACKETID_DISCONNECT_NOTIFY 3 // We send ourselves this when there is a disconnect. + #define VMPI_SUBPACKETID_BASEPORTALVIS 5 + #define VMPI_SUBPACKETID_PORTALFLOW 6 + #define VMPI_BASEPORTALVIS_RESULTS 7 + #define VMPI_BASEPORTALVIS_WORKER_DONE 8 + #define VMPI_PORTALFLOW_RESULTS 9 + #define VMPI_SUBPACKETID_BASEPORTALVIS_SYNC 11 + #define VMPI_SUBPACKETID_PORTALFLOW_SYNC 12 + #define VMPI_SUBPACKETID_MC_ADDR 13 + +// DistributeWork owns this packet ID. +#define VMPI_DISTRIBUTEWORK_PACKETID 2 + + +extern bool fastvis; + +// The worker waits until these are true. +bool g_bBasePortalVisSync = false; +bool g_bPortalFlowSync = false; + +CUtlVector<char> g_BasePortalVisResultsFilename; + +CCycleCount g_CPUTime; + + +// This stuff is all for the multicast channel the master uses to send out the portal results. +ISocket *g_pPortalMCSocket = NULL; +CIPAddr g_PortalMCAddr; +bool g_bGotMCAddr = false; +HANDLE g_hMCThread = NULL; +CEvent g_MCThreadExitEvent; +unsigned long g_PortalMCThreadUniqueID = 0; +int g_nMulticastPortalsReceived = 0; + + +// Handle VVIS packets. +bool VVIS_DispatchFn( MessageBuffer *pBuf, int iSource, int iPacketID ) +{ + switch ( pBuf->data[1] ) + { + case VMPI_SUBPACKETID_MC_ADDR: + { + pBuf->setOffset( 2 ); + pBuf->read( &g_PortalMCAddr, sizeof( g_PortalMCAddr ) ); + g_bGotMCAddr = true; + return true; + } + + case VMPI_SUBPACKETID_DISCONNECT_NOTIFY: + { + // This is just used to cause nonblocking dispatches to jump out so loops like the one + // in AppBarrier can handle the fact that there are disconnects. + return true; + } + + case VMPI_SUBPACKETID_BASEPORTALVIS_SYNC: + { + g_bBasePortalVisSync = true; + return true; + } + + case VMPI_SUBPACKETID_PORTALFLOW_SYNC: + { + g_bPortalFlowSync = true; + return true; + } + + case VMPI_BASEPORTALVIS_RESULTS: + { + const char *pFilename = &pBuf->data[2]; + g_BasePortalVisResultsFilename.CopyArray( pFilename, strlen( pFilename ) + 1 ); + return true; + } + + default: + { + return false; + } + } +} +CDispatchReg g_VVISDispatchReg( VMPI_VVIS_PACKET_ID, VVIS_DispatchFn ); // register to handle the messages we want +CDispatchReg g_DistributeWorkReg( VMPI_DISTRIBUTEWORK_PACKETID, DistributeWorkDispatch ); + + + +void VMPI_DeletePortalMCSocket() +{ + // Stop the thread if it exists. + if ( g_hMCThread ) + { + g_MCThreadExitEvent.SetEvent(); + WaitForSingleObject( g_hMCThread, INFINITE ); + CloseHandle( g_hMCThread ); + g_hMCThread = NULL; + } + + if ( g_pPortalMCSocket ) + { + g_pPortalMCSocket->Release(); + g_pPortalMCSocket = NULL; + } +} + + +void VVIS_SetupMPI( int &argc, char **&argv ) +{ + if ( !VMPI_FindArg( argc, argv, "-mpi", "" ) && !VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Worker ), "" ) ) + return; + + CmdLib_AtCleanup( VMPI_Stats_Term ); + CmdLib_AtCleanup( VMPI_DeletePortalMCSocket ); + + VMPI_Stats_InstallSpewHook(); + + // Force local mode? + VMPIRunMode mode; + if ( VMPI_FindArg( argc, argv, VMPI_GetParamString( mpi_Local ), "" ) ) + mode = VMPI_RUN_LOCAL; + else + mode = VMPI_RUN_NETWORKED; + + // + // Extract mpi specific arguments + // + Msg( "Initializing VMPI...\n" ); + if ( !VMPI_Init( argc, argv, "dependency_info_vvis.txt", HandleMPIDisconnect, mode ) ) + { + Error( "MPI_Init failed." ); + } + + StatsDB_InitStatsDatabase( argc, argv, "dbinfo_vvis.txt" ); +} + + +void ProcessBasePortalVis( int iThread, uint64 iPortal, MessageBuffer *pBuf ) +{ + CTimeAdder adder( &g_CPUTime ); + + BasePortalVis( iThread, iPortal ); + + // Send my result to the master + if ( pBuf ) + { + portal_t * p = &portals[iPortal]; + pBuf->write( p->portalfront, portalbytes ); + pBuf->write( p->portalflood, portalbytes ); + } +} + + +void ReceiveBasePortalVis( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker ) +{ + portal_t * p = &portals[iWorkUnit]; + if ( p->portalflood != 0 || p->portalfront != 0 || p->portalvis != 0) + { + Msg("Duplicate portal %llu\n", iWorkUnit); + } + + if ( pBuf->getLen() - pBuf->getOffset() != portalbytes*2 ) + Error( "Invalid packet in ReceiveBasePortalVis." ); + + // + // allocate memory for bitwise vis solutions for this portal + // + p->portalfront = (byte*)malloc (portalbytes); + pBuf->read( p->portalfront, portalbytes ); + + p->portalflood = (byte*)malloc (portalbytes); + pBuf->read( p->portalflood, portalbytes ); + + p->portalvis = (byte*)malloc (portalbytes); + memset (p->portalvis, 0, portalbytes); + + p->nummightsee = CountBits( p->portalflood, g_numportals*2 ); +} + + +//----------------------------------------- +// +// Run BasePortalVis across all available processing nodes +// Then collect and redistribute the results. +// +void RunMPIBasePortalVis() +{ + int i; + + Msg( "\n\nportalbytes: %d\nNum Work Units: %d\nTotal data size: %d\n", portalbytes, g_numportals*2, portalbytes*g_numportals*2 ); + Msg("%-20s ", "BasePortalVis:"); + if ( g_bMPIMaster ) + StartPacifier(""); + + + VMPI_SetCurrentStage( "RunMPIBasePortalVis" ); + + // Note: we're aiming for about 1500 portals in a map, so about 3000 work units. + g_CPUTime.Init(); + double elapsed = DistributeWork( + g_numportals * 2, // # work units + VMPI_DISTRIBUTEWORK_PACKETID, // packet ID + ProcessBasePortalVis, // Worker function to process work units + ReceiveBasePortalVis // Master function to receive work results + ); + + if ( g_bMPIMaster ) + { + EndPacifier( false ); + Msg( " (%d)\n", (int)elapsed ); + } + + // + // Distribute the results to all the workers. + // + if ( g_bMPIMaster ) + { + if ( !fastvis ) + { + VMPI_SetCurrentStage( "SendPortalResults" ); + + // Store all the portal results in a temp file and multicast that to the workers. + CUtlVector<char> allPortalData; + allPortalData.SetSize( g_numportals * 2 * portalbytes * 2 ); + + char *pOut = allPortalData.Base(); + for ( i=0; i < g_numportals * 2; i++) + { + portal_t *p = &portals[i]; + + memcpy( pOut, p->portalfront, portalbytes ); + pOut += portalbytes; + + memcpy( pOut, p->portalflood, portalbytes ); + pOut += portalbytes; + } + + const char *pVirtualFilename = "--portal-results--"; + VMPI_FileSystem_CreateVirtualFile( pVirtualFilename, allPortalData.Base(), allPortalData.Count() ); + + char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_BASEPORTALVIS_RESULTS }; + VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), pVirtualFilename, strlen( pVirtualFilename ) + 1, VMPI_PERSISTENT ); + } + } + else + { + VMPI_SetCurrentStage( "RecvPortalResults" ); + + // Wait until we've received the filename from the master. + while ( g_BasePortalVisResultsFilename.Count() == 0 ) + { + VMPI_DispatchNextMessage(); + } + + // Open + FileHandle_t fp = g_pFileSystem->Open( g_BasePortalVisResultsFilename.Base(), "rb", VMPI_VIRTUAL_FILES_PATH_ID ); + if ( !fp ) + Error( "Can't open '%s' to read portal info.", g_BasePortalVisResultsFilename.Base() ); + + for ( i=0; i < g_numportals * 2; i++) + { + portal_t *p = &portals[i]; + + p->portalfront = (byte*)malloc (portalbytes); + g_pFileSystem->Read( p->portalfront, portalbytes, fp ); + + p->portalflood = (byte*)malloc (portalbytes); + g_pFileSystem->Read( p->portalflood, portalbytes, fp ); + + p->portalvis = (byte*)malloc (portalbytes); + memset (p->portalvis, 0, portalbytes); + + p->nummightsee = CountBits (p->portalflood, g_numportals*2); + } + + g_pFileSystem->Close( fp ); + } + + + if ( !g_bMPIMaster ) + { + if ( g_iVMPIVerboseLevel >= 1 ) + Msg( "\n%% worker CPU utilization during BasePortalVis: %.1f\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads ); + } +} + + + +void ProcessPortalFlow( int iThread, uint64 iPortal, MessageBuffer *pBuf ) +{ + // Process Portal and distribute results + CTimeAdder adder( &g_CPUTime ); + + PortalFlow( iThread, iPortal ); + + // Send my result to root and potentially the other slaves + // The slave results are read in RecursiveLeafFlow + // + if ( pBuf ) + { + portal_t * p = sorted_portals[iPortal]; + pBuf->write( p->portalvis, portalbytes ); + } +} + + +void ReceivePortalFlow( uint64 iWorkUnit, MessageBuffer *pBuf, int iWorker ) +{ + portal_t *p = sorted_portals[iWorkUnit]; + + if ( p->status != stat_done ) + { + pBuf->read( p->portalvis, portalbytes ); + p->status = stat_done; + + + // Multicast the status of this portal out. + if ( g_pPortalMCSocket ) + { + char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_PORTALFLOW_RESULTS }; + void *chunks[4] = { cPacketID, &g_PortalMCThreadUniqueID, &iWorkUnit, p->portalvis }; + int chunkLengths[4] = { sizeof( cPacketID ), sizeof( g_PortalMCThreadUniqueID ), sizeof( iWorkUnit ), portalbytes }; + + g_pPortalMCSocket->SendChunksTo( &g_PortalMCAddr, chunks, chunkLengths, ARRAYSIZE( chunks ) ); + } + } +} + + +DWORD WINAPI PortalMCThreadFn( LPVOID p ) +{ + CUtlVector<char> data; + data.SetSize( portalbytes + 128 ); + + DWORD waitTime = 0; + while ( WaitForSingleObject( g_MCThreadExitEvent.GetEventHandle(), waitTime ) != WAIT_OBJECT_0 ) + { + CIPAddr ipFrom; + int len = g_pPortalMCSocket->RecvFrom( data.Base(), data.Count(), &ipFrom ); + if ( len == -1 ) + { + waitTime = 20; + } + else + { + // These lengths must match exactly what is sent in ReceivePortalFlow. + if ( len == 2 + sizeof( g_PortalMCThreadUniqueID ) + sizeof( int ) + portalbytes ) + { + // Perform more validation... + if ( data[0] == VMPI_VVIS_PACKET_ID && data[1] == VMPI_PORTALFLOW_RESULTS ) + { + if ( *((unsigned long*)&data[2]) == g_PortalMCThreadUniqueID ) + { + int iWorkUnit = *((int*)&data[6]); + if ( iWorkUnit >= 0 && iWorkUnit < g_numportals*2 ) + { + portal_t *p = sorted_portals[iWorkUnit]; + if ( p ) + { + ++g_nMulticastPortalsReceived; + memcpy( p->portalvis, &data[10], portalbytes ); + p->status = stat_done; + waitTime = 0; + } + } + } + } + } + } + } + + return 0; +} + + +void MCThreadCleanupFn() +{ + g_MCThreadExitEvent.SetEvent(); +} + + +// --------------------------------------------------------------------------------- // +// Cheesy hack to let them stop the job early and keep the results of what has +// been done so far. +// --------------------------------------------------------------------------------- // + +class CVisDistributeWorkCallbacks : public IWorkUnitDistributorCallbacks +{ +public: + CVisDistributeWorkCallbacks() + { + m_bExitedEarly = false; + m_iState = STATE_NONE; + } + + virtual bool Update() + { + if ( kbhit() ) + { + int key = toupper( getch() ); + if ( m_iState == STATE_NONE ) + { + if ( key == 'M' ) + { + m_iState = STATE_AT_MENU; + Warning("\n\n" + "----------------------\n" + "1. Write scratchpad file.\n" + "2. Exit early and use fast vis for remaining portals.\n" + "\n" + "0. Exit menu.\n" + "----------------------\n" + "\n" + ); + } + } + else if ( m_iState == STATE_AT_MENU ) + { + if ( key == '1' ) + { + Warning( + "\n" + "\nWriting scratchpad file." + "\nCommand line: scratchpad3dviewer -file scratch.pad\n" + "\nRed portals are the portals that are fast vis'd." + "\n" + ); + m_iState = STATE_NONE; + IScratchPad3D *pPad = ScratchPad3D_Create( "scratch.pad" ); + if ( pPad ) + { + ScratchPad_DrawWorld( pPad, false ); + + // Draw the portals that haven't been vis'd. + for ( int i=0; i < g_numportals*2; i++ ) + { + portal_t *p = sorted_portals[i]; + ScratchPad_DrawWinding( pPad, p->winding->numpoints, p->winding->points, Vector( 1, 0, 0 ), Vector( .3, .3, .3 ) ); + } + + pPad->Release(); + } + } + else if ( key == '2' ) + { + // Exit the process early. + m_bExitedEarly = true; + return true; + } + else if ( key == '0' ) + { + m_iState = STATE_NONE; + Warning( "\n\nExited menu.\n\n" ); + } + } + } + + return false; + } + +public: + enum + { + STATE_NONE, + STATE_AT_MENU + }; + + bool m_bExitedEarly; + int m_iState; // STATE_ enum. +}; + + +CVisDistributeWorkCallbacks g_VisDistributeWorkCallbacks; + + +void CheckExitedEarly() +{ + if ( g_VisDistributeWorkCallbacks.m_bExitedEarly ) + { + Warning( "\nExited early, using fastvis results...\n" ); + Warning( "Exited early, using fastvis results...\n" ); + + // Use the fastvis results for portals that we didn't get results for. + for ( int i=0; i < g_numportals*2; i++ ) + { + if ( sorted_portals[i]->status != stat_done ) + { + sorted_portals[i]->portalvis = sorted_portals[i]->portalflood; + sorted_portals[i]->status = stat_done; + } + } + } +} + + +//----------------------------------------- +// +// Run PortalFlow across all available processing nodes +// +void RunMPIPortalFlow() +{ + Msg( "%-20s ", "MPIPortalFlow:" ); + if ( g_bMPIMaster ) + StartPacifier(""); + + // Workers wait until we get the MC socket address. + g_PortalMCThreadUniqueID = StatsDB_GetUniqueJobID(); + if ( g_bMPIMaster ) + { + CCycleCount cnt; + cnt.Sample(); + CUniformRandomStream randomStream; + randomStream.SetSeed( cnt.GetMicroseconds() ); + + g_PortalMCAddr.port = randomStream.RandomInt( 22000, 25000 ); // Pulled out of something else. + g_PortalMCAddr.ip[0] = (unsigned char)RandomInt( 225, 238 ); + g_PortalMCAddr.ip[1] = (unsigned char)RandomInt( 0, 255 ); + g_PortalMCAddr.ip[2] = (unsigned char)RandomInt( 0, 255 ); + g_PortalMCAddr.ip[3] = (unsigned char)RandomInt( 3, 255 ); + + g_pPortalMCSocket = CreateIPSocket(); + int i=0; + for ( i; i < 5; i++ ) + { + if ( g_pPortalMCSocket->BindToAny( randomStream.RandomInt( 20000, 30000 ) ) ) + break; + } + if ( i == 5 ) + { + Error( "RunMPIPortalFlow: can't open a socket to multicast on." ); + } + + char cPacketID[2] = { VMPI_VVIS_PACKET_ID, VMPI_SUBPACKETID_MC_ADDR }; + VMPI_Send2Chunks( cPacketID, sizeof( cPacketID ), &g_PortalMCAddr, sizeof( g_PortalMCAddr ), VMPI_PERSISTENT ); + } + else + { + VMPI_SetCurrentStage( "wait for MC address" ); + + while ( !g_bGotMCAddr ) + { + VMPI_DispatchNextMessage(); + } + + // Open our multicast receive socket. + g_pPortalMCSocket = CreateMulticastListenSocket( g_PortalMCAddr ); + if ( !g_pPortalMCSocket ) + { + char err[512]; + IP_GetLastErrorString( err, sizeof( err ) ); + Error( "RunMPIPortalFlow: CreateMulticastListenSocket failed. (%s).", err ); + } + + // Make a thread to listen for the data on the multicast socket. + DWORD dwDummy = 0; + g_MCThreadExitEvent.Init( false, false ); + + // Make sure we kill the MC thread if the app exits ungracefully. + CmdLib_AtCleanup( MCThreadCleanupFn ); + + g_hMCThread = CreateThread( + NULL, + 0, + PortalMCThreadFn, + NULL, + 0, + &dwDummy ); + + if ( !g_hMCThread ) + { + Error( "RunMPIPortalFlow: CreateThread failed for multicast receive thread." ); + } + } + + VMPI_SetCurrentStage( "RunMPIBasePortalFlow" ); + + + g_pDistributeWorkCallbacks = &g_VisDistributeWorkCallbacks; + + g_CPUTime.Init(); + double elapsed = DistributeWork( + g_numportals * 2, // # work units + VMPI_DISTRIBUTEWORK_PACKETID, // packet ID + ProcessPortalFlow, // Worker function to process work units + ReceivePortalFlow // Master function to receive work results + ); + + g_pDistributeWorkCallbacks = NULL; + + CheckExitedEarly(); + + // Stop the multicast stuff. + VMPI_DeletePortalMCSocket(); + + if( !g_bMPIMaster ) + { + if ( g_iVMPIVerboseLevel >= 1 ) + { + Msg( "Received %d (out of %d) portals from multicast.\n", g_nMulticastPortalsReceived, g_numportals * 2 ); + Msg( "%.1f%% CPU utilization during PortalFlow\n", (g_CPUTime.GetSeconds() * 100.0f / elapsed) / numthreads ); + } + + Msg( "VVIS worker finished. Over and out.\n" ); + VMPI_SetCurrentStage( "worker done" ); + + CmdLib_Exit( 0 ); + } + + if ( g_bMPIMaster ) + { + EndPacifier( false ); + Msg( " (%d)\n", (int)elapsed ); + } +} + |