summaryrefslogtreecommitdiff
path: root/utils/vmpi/vmpi_distribute_work_sdk.cpp
diff options
context:
space:
mode:
authorFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
committerFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
commit3bf9df6b2785fa6d951086978a3e66f49427166a (patch)
tree2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /utils/vmpi/vmpi_distribute_work_sdk.cpp
downloadarchived-source-engine-2018-hl2-src-master.tar.xz
archived-source-engine-2018-hl2-src-master.zip
Diffstat (limited to 'utils/vmpi/vmpi_distribute_work_sdk.cpp')
-rw-r--r--utils/vmpi/vmpi_distribute_work_sdk.cpp699
1 files changed, 699 insertions, 0 deletions
diff --git a/utils/vmpi/vmpi_distribute_work_sdk.cpp b/utils/vmpi/vmpi_distribute_work_sdk.cpp
new file mode 100644
index 0000000..71b03e8
--- /dev/null
+++ b/utils/vmpi/vmpi_distribute_work_sdk.cpp
@@ -0,0 +1,699 @@
+//========= Copyright Valve Corporation, All rights reserved. ============//
+//
+// Purpose:
+//
+//=============================================================================
+
+#include "vmpi.h"
+#include "vmpi_distribute_work.h"
+#include "tier0/platform.h"
+#include "tier0/dbg.h"
+#include "utlvector.h"
+#include "utllinkedlist.h"
+#include "vmpi_dispatch.h"
+#include "pacifier.h"
+#include "vstdlib/random.h"
+#include "mathlib/mathlib.h"
+#include "threadhelpers.h"
+#include "threads.h"
+#include "tier1/strtools.h"
+#include "tier1/utlmap.h"
+#include "tier1/smartptr.h"
+#include "tier0/icommandline.h"
+#include "cmdlib.h"
+#include "vmpi_distribute_tracker.h"
+#include "vmpi_distribute_work_internal.h"
+
+
+
+#define DW_SUBPACKETID_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+0)
+#define DW_SUBPACKETID_REQUEST_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+1)
+#define DW_SUBPACKETID_WUS_COMPLETED_LIST (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+2)
+
+
+
+// This is a pretty simple iterator. Basically, it holds a matrix of numbers.
+// Each row is assigned to a worker, and the worker just walks through his row.
+//
+// When a worker reaches the end of his row, it gets a little trickier.
+// They'll start doing their neighbor's row
+// starting at the back and continue on. At about this time, the master should reshuffle the
+// remaining work units to evenly distribute them amongst the workers.
+class CWorkUnitWalker
+{
+public:
+ CWorkUnitWalker()
+ {
+ m_nWorkUnits = 0;
+ }
+
+ // This is all that's needed for it to start assigning work units.
+ void Init( WUIndexType matrixWidth, WUIndexType matrixHeight, WUIndexType nWorkUnits )
+ {
+ m_nWorkUnits = nWorkUnits;
+ m_MatrixWidth = matrixWidth;
+ m_MatrixHeight = matrixHeight;
+ Assert( m_MatrixWidth * m_MatrixHeight >= nWorkUnits );
+
+ m_WorkerInfos.RemoveAll();
+ m_WorkerInfos.EnsureCount( m_MatrixHeight );
+ for ( int i=0; i < m_MatrixHeight; i++ )
+ {
+ m_WorkerInfos[i].m_iStartWorkUnit = matrixWidth * i;
+ m_WorkerInfos[i].m_iWorkUnitOffset = 0;
+ }
+ }
+
+ // This is the main function of the shuffler
+ bool GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex, bool *bWorkerFinishedHisColumn )
+ {
+ if ( iWorker < 0 || iWorker >= m_WorkerInfos.Count() )
+ {
+ Assert( false );
+ return false;
+ }
+
+ // If this worker has walked through all the work units, then he's done.
+ CWorkerInfo *pWorker = &m_WorkerInfos[iWorker];
+ if ( pWorker->m_iWorkUnitOffset >= m_nWorkUnits )
+ return false;
+
+ // If we've gone past the end of our work unit list, then we start at the BACK of the other rows of work units
+ // in the hopes that we won't collide with the guy working there. We also should tell the master to reshuffle.
+ WUIndexType iWorkUnitOffset = pWorker->m_iWorkUnitOffset;
+ if ( iWorkUnitOffset >= m_MatrixWidth )
+ {
+ WUIndexType xOffset = iWorkUnitOffset % m_MatrixWidth;
+ WUIndexType yOffset = iWorkUnitOffset / m_MatrixWidth;
+ xOffset = m_MatrixWidth - xOffset - 1;
+ iWorkUnitOffset = yOffset * m_MatrixWidth + xOffset;
+ *bWorkerFinishedHisColumn = true;
+ }
+ else
+ {
+ *bWorkerFinishedHisColumn = false;
+ }
+
+ *pWUIndex = (pWorker->m_iStartWorkUnit + iWorkUnitOffset) % m_nWorkUnits;
+ ++pWorker->m_iWorkUnitOffset;
+ return true;
+ }
+
+
+private:
+ class CWorkerInfo
+ {
+ public:
+ WUIndexType m_iStartWorkUnit;
+ WUIndexType m_iWorkUnitOffset; // Which work unit in my list of work units am I working on?
+ };
+
+ WUIndexType m_nWorkUnits;
+ WUIndexType m_MatrixWidth;
+ WUIndexType m_MatrixHeight;
+ CUtlVector<CWorkerInfo> m_WorkerInfos;
+};
+
+
+class IShuffleRequester
+{
+public:
+ virtual void RequestShuffle() = 0;
+};
+
+
+// This is updated every time the master decides to reshuffle.
+// In-between shuffles, you can call NoteWorkUnitCompleted when a work unit is completed
+// and it'll avoid returning that work unit from GetNextWorkUnit again, but it WON'T
+class CShuffledWorkUnitWalker
+{
+public:
+ void Init( WUIndexType nWorkUnits, IShuffleRequester *pRequester )
+ {
+ m_iLastShuffleRequest = 0;
+ m_iCurShuffle = 1;
+ m_flLastShuffleTime = Plat_FloatTime();
+ m_pShuffleRequester = pRequester;
+
+ int nBytes = PAD_NUMBER( nWorkUnits, 8 ) / 8;
+ m_CompletedWUBits.SetSize( nBytes );
+ m_LocalCompletedWUBits.SetSize( nBytes );
+ for ( WUIndexType i=0; i < m_CompletedWUBits.Count(); i++ )
+ m_LocalCompletedWUBits[i] = m_CompletedWUBits[i] = 0;
+
+ // Setup our list of work units remaining.
+ for ( WUIndexType iWU=0; iWU < nWorkUnits; iWU++ )
+ {
+ // Note: we're making an assumption here that if we add entries to a CUtlLinkedList in ascending order, their indices
+ // will be ascending 1-by-1 as well. If that assumption breaks, we can create an extra array here to map WU indices to the linked list indices.
+ WUIndexType index = m_WorkUnitsRemaining.AddToTail( iWU );
+ if ( index != iWU )
+ {
+ Error( "CShuffledWorkUnitWalker: assumption on CUtlLinkedList indexing failed.\n" );
+ }
+ }
+ }
+
+ void Shuffle( int nWorkers )
+ {
+ if ( nWorkers == 0 )
+ return;
+
+ ++m_iCurShuffle;
+ m_flLastShuffleTime = Plat_FloatTime();
+
+ CCriticalSectionLock csLock( &m_CS );
+ csLock.Lock();
+
+ m_WorkUnitsMap.RemoveAll();
+ m_WorkUnitsMap.EnsureCount( m_WorkUnitsRemaining.Count() );
+
+ // Here's the shuffle. The CWorkUnitWalker is going to walk each worker through its own group from 0-W,
+ // and our job is to interleave it so when worker 0 goes [0,1,2] and worker 1 goes [100,101,102], they're actually
+ // doing [0,N,2N] and [1,N+1,2N+1] where N=# of workers.
+
+ // The grid is RxW long, and R*W is >= nWorkUnits.
+ // R = # units per worker = width of the matrix
+ // W = # workers = height of the matrix
+ WUIndexType matrixHeight = nWorkers;
+ WUIndexType matrixWidth = m_WorkUnitsRemaining.Count() / matrixHeight;
+ if ( (m_WorkUnitsRemaining.Count() % matrixHeight) != 0 )
+ ++matrixWidth;
+
+ Assert( matrixWidth * matrixHeight >= m_WorkUnitsRemaining.Count() );
+
+ WUIndexType iWorkUnit = 0;
+ FOR_EACH_LL( m_WorkUnitsRemaining, i )
+ {
+ WUIndexType xCoord = iWorkUnit / matrixHeight;
+ WUIndexType yCoord = iWorkUnit % matrixHeight;
+ Assert( xCoord < matrixWidth );
+ Assert( yCoord < matrixHeight );
+
+ m_WorkUnitsMap[yCoord*matrixWidth+xCoord] = m_WorkUnitsRemaining[i];
+ ++iWorkUnit;
+ }
+
+ m_Walker.Init( matrixWidth, matrixHeight, m_WorkUnitsRemaining.Count() );
+ }
+
+ // Threadsafe.
+ bool Thread_IsWorkUnitCompleted( WUIndexType iWU )
+ {
+ CCriticalSectionLock csLock( &m_CS );
+ csLock.Lock();
+
+ byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7));
+ return (val != 0);
+ }
+
+ WUIndexType Thread_NumWorkUnitsRemaining()
+ {
+ CCriticalSectionLock csLock( &m_CS );
+ csLock.Lock();
+
+ return m_WorkUnitsRemaining.Count();
+ }
+
+ bool Thread_GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex )
+ {
+ CCriticalSectionLock csLock( &m_CS );
+ csLock.Lock();
+
+ while ( 1 )
+ {
+ WUIndexType iUnmappedWorkUnit;
+ bool bWorkerFinishedHisColumn;
+ if ( !m_Walker.GetNextWorkUnit( iWorker, &iUnmappedWorkUnit, &bWorkerFinishedHisColumn ) )
+ return false;
+
+ // If we've done all the work units assigned to us in the last shuffle, then request a reshuffle.
+ if ( bWorkerFinishedHisColumn )
+ HandleWorkerFinishedColumn();
+
+ // Check the pending list.
+ *pWUIndex = m_WorkUnitsMap[iUnmappedWorkUnit];
+ byte bIsCompleted = m_CompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7));
+ byte bIsCompletedLocally = m_LocalCompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7));
+ if ( !bIsCompleted && !bIsCompletedLocally )
+ return true;
+ }
+ }
+
+ void HandleWorkerFinishedColumn()
+ {
+ if ( m_iLastShuffleRequest != m_iCurShuffle )
+ {
+ double flCurTime = Plat_FloatTime();
+ if ( flCurTime - m_flLastShuffleTime > 2.0f )
+ {
+ m_pShuffleRequester->RequestShuffle();
+ m_iLastShuffleRequest = m_iCurShuffle;
+ }
+ }
+ }
+
+ void Thread_NoteWorkUnitCompleted( WUIndexType iWU )
+ {
+ CCriticalSectionLock csLock( &m_CS );
+ csLock.Lock();
+
+ byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7));
+ if ( val == 0 )
+ {
+ m_WorkUnitsRemaining.Remove( iWU );
+ m_CompletedWUBits[iWU >> 3] |= (1 << (iWU & 7));
+ }
+ }
+
+ void Thread_NoteLocalWorkUnitCompleted( WUIndexType iWU )
+ {
+ CCriticalSectionLock csLock( &m_CS );
+ csLock.Lock();
+ m_LocalCompletedWUBits[iWU >> 3] |= (1 << (iWU & 7));
+ }
+
+ CRC32_t GetShuffleCRC()
+ {
+#ifdef _DEBUG
+ static bool bCalcShuffleCRC = true;
+#else
+ static bool bCalcShuffleCRC = VMPI_IsParamUsed( mpi_CalcShuffleCRC );
+#endif
+ if ( bCalcShuffleCRC )
+ {
+ CCriticalSectionLock csLock( &m_CS );
+ csLock.Lock();
+
+ CRC32_t ret;
+ CRC32_Init( &ret );
+
+ FOR_EACH_LL( m_WorkUnitsRemaining, i )
+ {
+ WUIndexType iWorkUnit = m_WorkUnitsRemaining[i];
+ CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) );
+ }
+
+ for ( int i=0; i < m_WorkUnitsMap.Count(); i++ )
+ {
+ WUIndexType iWorkUnit = m_WorkUnitsMap[i];
+ CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) );
+ }
+
+ CRC32_Final( &ret );
+ return ret;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+private:
+ // These are PENDING WU completions until we call Shuffle() again, at which point we actually reorder the list
+ // based on the completed WUs.
+ CUtlVector<byte> m_CompletedWUBits; // Bit vector of completed WUs.
+ CUtlLinkedList<WUIndexType, WUIndexType> m_WorkUnitsRemaining;
+ CUtlVector<WUIndexType> m_WorkUnitsMap; // Maps the 0-N indices in the CWorkUnitWalker to the list of remaining work units.
+
+ // Helps us avoid some duplicates that happen during shuffling if we've completed some WUs and sent them
+ // to the master, but the master hasn't included them in the DW_SUBPACKETID_WUS_COMPLETED_LIST yet.
+ CUtlVector<byte> m_LocalCompletedWUBits; // Bit vector of completed WUs.
+
+ // Used to control how frequently we request a reshuffle.
+ unsigned int m_iCurShuffle;
+ unsigned int m_iLastShuffleRequest; // The index of the shuffle we last requested a reshuffle on (don't request a reshuffle on the same one).
+ double m_flLastShuffleTime;
+ IShuffleRequester *m_pShuffleRequester;
+
+ CWorkUnitWalker m_Walker;
+ CCriticalSection m_CS;
+};
+
+
+
+class CDistributor_SDKMaster : public IWorkUnitDistributorMaster, public IShuffleRequester
+{
+public:
+ virtual void Release()
+ {
+ delete this;
+ }
+
+ static void Master_WorkerThread_Static( int iThread, void *pUserData )
+ {
+ ((CDistributor_SDKMaster*)pUserData)->Master_WorkerThread( iThread );
+ }
+
+ void Master_WorkerThread( int iThread )
+ {
+ while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 && !g_bVMPIEarlyExit )
+ {
+ WUIndexType iWU;
+ if ( !m_WorkUnitWalker.Thread_GetNextWorkUnit( 0, &iWU ) )
+ {
+ // Wait until there are some WUs to do.
+ VMPI_Sleep( 10 );
+ continue;
+ }
+
+ // Do this work unit.
+ m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); // We do this before it's completed because otherwise if a Shuffle() occurs,
+ // the other thread might happen to pickup this work unit and we don't want that.
+ m_pInfo->m_WorkerInfo.m_pProcessFn( iThread, iWU, NULL );
+ NotifyLocalMasterCompletedWorkUnit( iWU );
+ }
+ }
+
+ virtual void DistributeWork_Master( CDSInfo *pInfo )
+ {
+ m_pInfo = pInfo;
+ m_bForceShuffle = false;
+ m_bShuffleRequested = false;
+ m_flLastShuffleRequestServiceTime = Plat_FloatTime();
+
+ // Spawn idle-priority worker threads right here.
+ m_bUsingMasterLocalThreads = (pInfo->m_WorkerInfo.m_pProcessFn != 0);
+ if ( VMPI_IsParamUsed( mpi_NoMasterWorkerThreads ) )
+ {
+ Msg( "%s found. No worker threads will be created.\n", VMPI_GetParamString( mpi_NoMasterWorkerThreads ) );
+ m_bUsingMasterLocalThreads = false;
+ }
+ m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this );
+ Shuffle();
+
+ if ( m_bUsingMasterLocalThreads )
+ RunThreads_Start( Master_WorkerThread_Static, this, k_eRunThreadsPriority_Idle );
+
+ uint64 lastShuffleTime = Plat_MSTime();
+ while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 )
+ {
+ VMPI_DispatchNextMessage( 200 );
+ CheckLocalMasterCompletedWorkUnits();
+
+ VMPITracker_HandleDebugKeypresses();
+ if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() )
+ break;
+
+ // Reshuffle the work units optimally every certain interval.
+ if ( m_bForceShuffle || CheckShuffleRequest() )
+ {
+ Shuffle();
+ lastShuffleTime = Plat_MSTime();
+ m_bForceShuffle = false;
+ }
+ }
+
+ RunThreads_End();
+ }
+
+ virtual void RequestShuffle()
+ {
+ m_bShuffleRequested = true;
+ }
+
+ bool CheckShuffleRequest()
+ {
+ if ( m_bShuffleRequested )
+ {
+ double flCurTime = Plat_FloatTime();
+ if ( flCurTime - m_flLastShuffleRequestServiceTime > 2.0f ) // Only handle shuffle requests every so often.
+ {
+ m_flLastShuffleRequestServiceTime = flCurTime;
+ m_bShuffleRequested = false;
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ void Shuffle()
+ {
+ // Build a list of who's working.
+ CUtlVector<unsigned short> whosWorking;
+ if ( m_bUsingMasterLocalThreads )
+ {
+ whosWorking.AddToTail( VMPI_MASTER_ID );
+ Assert( VMPI_MASTER_ID == 0 );
+ }
+
+ {
+ CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock();
+ for ( int i=0; i < pWorkersReady->m_WorkersReady.Count(); i++ )
+ {
+ int iWorker = pWorkersReady->m_WorkersReady[i];
+ if ( VMPI_IsProcConnected( iWorker ) )
+ whosWorking.AddToTail( iWorker );
+ }
+ m_WorkersReadyCS.Unlock();
+ }
+
+ // Before sending the shuffle command, tell any of these active workers about the pending WUs completed.
+ CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock();
+
+ m_WUSCompletedMessageBuffer.setLen( 0 );
+ if ( BuildWUsCompletedMessage( pWUsCompleted->m_Pending, m_WUSCompletedMessageBuffer ) > 0 )
+ {
+ for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ )
+ {
+ VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), whosWorking[i] );
+ }
+ }
+ pWUsCompleted->m_Completed.AddMultipleToTail( pWUsCompleted->m_Pending.Count(), pWUsCompleted->m_Pending.Base() ); // Add the pending ones to the full list now.
+ pWUsCompleted->m_Pending.RemoveAll();
+
+ m_WUsCompletedCS.Unlock();
+
+ // Shuffle ourselves.
+ m_WorkUnitWalker.Shuffle( whosWorking.Count() );
+
+ // Send the shuffle command to the workers.
+ MessageBuffer mb;
+ PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_SHUFFLE );
+
+ unsigned short nWorkers = whosWorking.Count();
+ mb.write( &nWorkers, sizeof( nWorkers ) );
+
+ CRC32_t shuffleCRC = m_WorkUnitWalker.GetShuffleCRC();
+ mb.write( &shuffleCRC, sizeof( shuffleCRC ) );
+
+ // Now for each worker, assign him an index in the shuffle and send the shuffle command.
+ int workerIDPos = mb.getLen();
+ unsigned short id = 0;
+ mb.write( &id, sizeof( id ) );
+ for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ )
+ {
+ id = (unsigned short)i;
+ mb.update( workerIDPos, &id, sizeof( id ) );
+ VMPI_SendData( mb.data, mb.getLen(), whosWorking[i] );
+ }
+ }
+
+ int BuildWUsCompletedMessage( CUtlVector<WUIndexType> &wusCompleted, MessageBuffer &mb )
+ {
+ PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WUS_COMPLETED_LIST );
+ m_pInfo->WriteWUIndex( wusCompleted.Count(), &mb );
+ for ( int i=0; i < wusCompleted.Count(); i++ )
+ {
+ m_pInfo->WriteWUIndex( wusCompleted[i], &mb );
+ }
+ return wusCompleted.Count();
+ }
+
+ virtual void OnWorkerReady( int iSource )
+ {
+ CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock();
+ if ( pWorkersReady->m_WorkersReady.Find( iSource ) == -1 )
+ {
+ pWorkersReady->m_WorkersReady.AddToTail( iSource );
+
+ // Get this guy up to speed on which WUs are done.
+ {
+ CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock();
+ m_WUSCompletedMessageBuffer.setLen( 0 );
+ BuildWUsCompletedMessage( pWUsCompleted->m_Completed, m_WUSCompletedMessageBuffer );
+ m_WUsCompletedCS.Unlock();
+ }
+
+ VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), iSource );
+ m_bForceShuffle = true;
+ }
+ m_WorkersReadyCS.Unlock();
+ }
+
+ virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit )
+ {
+ return Thread_HandleWorkUnitResults( iWorkUnit );
+ }
+
+ bool Thread_HandleWorkUnitResults( WUIndexType iWorkUnit )
+ {
+ if ( m_WorkUnitWalker.Thread_IsWorkUnitCompleted( iWorkUnit ) )
+ {
+ return false;
+ }
+ else
+ {
+ m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWorkUnit );
+
+ // We need the lock on here because our own worker threads can call into here.
+ CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock();
+ pWUsCompleted->m_Pending.AddToTail( iWorkUnit );
+ m_WUsCompletedCS.Unlock();
+ return true;
+ }
+ }
+
+ virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents )
+ {
+ if ( pBuf->data[1] == DW_SUBPACKETID_REQUEST_SHUFFLE )
+ {
+ if ( bIgnoreContents )
+ return true;
+
+ m_bShuffleRequested = true;
+ }
+ return false;
+ }
+
+ virtual void DisconnectHandler( int workerID )
+ {
+ CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock();
+
+ if ( pWorkersReady->m_WorkersReady.Find( workerID ) != -1 )
+ m_bForceShuffle = true;
+
+ m_WorkersReadyCS.Unlock();
+ }
+
+public:
+ CDSInfo *m_pInfo;
+
+ class CWorkersReady
+ {
+ public:
+ CUtlVector<int> m_WorkersReady; // The list of workers who have said they're ready to participate.
+ };
+ CCriticalSectionData<CWorkersReady> m_WorkersReadyCS;
+
+ class CWUsCompleted
+ {
+ public:
+ CUtlVector<WUIndexType> m_Completed; // WUs completed that we have sent to workers.
+ CUtlVector<WUIndexType> m_Pending; // WUs completed that we haven't sent to workers.
+ };
+ CCriticalSectionData<CWUsCompleted> m_WUsCompletedCS;
+ MessageBuffer m_WUSCompletedMessageBuffer; // Used to send lists of completed WUs.
+ int m_bUsingMasterLocalThreads;
+
+ bool m_bForceShuffle;
+ bool m_bShuffleRequested;
+ double m_flLastShuffleRequestServiceTime;
+
+ CShuffledWorkUnitWalker m_WorkUnitWalker;
+};
+
+
+class CDistributor_SDKWorker : public IWorkUnitDistributorWorker, public IShuffleRequester
+{
+public:
+ virtual void Init( CDSInfo *pInfo )
+ {
+ m_iMyWorkUnitWalkerID = -1;
+ m_pInfo = pInfo;
+ m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this );
+ }
+
+ virtual void Release()
+ {
+ delete this;
+ }
+
+ virtual bool GetNextWorkUnit( WUIndexType *pWUIndex )
+ {
+ // If we don't have an ID yet, we haven't received a Shuffle() command, so we're waiting for that before working.
+ // TODO: we could do some random WUs here while we're waiting, although that could suck if the WUs take forever to do
+ // and they're duplicates.
+ if ( m_iMyWorkUnitWalkerID == -1 )
+ return false;
+
+ // Look in our current shuffled list of work units for the next one.
+ return m_WorkUnitWalker.Thread_GetNextWorkUnit( m_iMyWorkUnitWalkerID, pWUIndex );
+ }
+
+ virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU )
+ {
+ m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU );
+ }
+
+ virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents )
+ {
+ // If it's a SHUFFLE message, then shuffle..
+ if ( pBuf->data[1] == DW_SUBPACKETID_SHUFFLE )
+ {
+ if ( bIgnoreContents )
+ return true;
+
+ unsigned short nWorkers, myID;
+ CRC32_t shuffleCRC;
+ pBuf->read( &nWorkers, sizeof( nWorkers ) );
+ pBuf->read( &shuffleCRC, sizeof( shuffleCRC ) );
+ pBuf->read( &myID, sizeof( myID ) );
+ m_iMyWorkUnitWalkerID = myID;
+
+ m_WorkUnitWalker.Shuffle( nWorkers );
+ if ( m_WorkUnitWalker.GetShuffleCRC() != shuffleCRC )
+ {
+ static int nWarnings = 1;
+ if ( ++nWarnings <= 2 )
+ Warning( "\nShuffle CRC mismatch\n" );
+ }
+ return true;
+ }
+ else if ( pBuf->data[1] == DW_SUBPACKETID_WUS_COMPLETED_LIST )
+ {
+ if ( bIgnoreContents )
+ return true;
+
+ WUIndexType nCompleted;
+ m_pInfo->ReadWUIndex( &nCompleted, pBuf );
+ for ( WUIndexType i=0; i < nCompleted; i++ )
+ {
+ WUIndexType iWU;
+ m_pInfo->ReadWUIndex( &iWU, pBuf );
+ m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWU );
+ }
+
+ return true;
+ }
+
+ return false;
+ }
+
+ virtual void RequestShuffle()
+ {
+ // Ok.. request a reshuffle.
+ MessageBuffer mb;
+ PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_REQUEST_SHUFFLE );
+ VMPI_SendData( mb.data, mb.getLen(), VMPI_MASTER_ID );
+ }
+
+private:
+ CDSInfo *m_pInfo;
+ CShuffledWorkUnitWalker m_WorkUnitWalker;
+ int m_iMyWorkUnitWalkerID;
+};
+
+
+
+IWorkUnitDistributorMaster* CreateWUDistributor_SDKMaster()
+{
+ return new CDistributor_SDKMaster;
+}
+
+IWorkUnitDistributorWorker* CreateWUDistributor_SDKWorker()
+{
+ return new CDistributor_SDKWorker;
+}
+