diff options
| author | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
|---|---|---|
| committer | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
| commit | 3bf9df6b2785fa6d951086978a3e66f49427166a (patch) | |
| tree | 2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /utils/vmpi/vmpi_distribute_work_sdk.cpp | |
| download | archived-source-engine-2018-hl2-src-master.tar.xz archived-source-engine-2018-hl2-src-master.zip | |
Diffstat (limited to 'utils/vmpi/vmpi_distribute_work_sdk.cpp')
| -rw-r--r-- | utils/vmpi/vmpi_distribute_work_sdk.cpp | 699 |
1 files changed, 699 insertions, 0 deletions
diff --git a/utils/vmpi/vmpi_distribute_work_sdk.cpp b/utils/vmpi/vmpi_distribute_work_sdk.cpp new file mode 100644 index 0000000..71b03e8 --- /dev/null +++ b/utils/vmpi/vmpi_distribute_work_sdk.cpp @@ -0,0 +1,699 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +//============================================================================= + +#include "vmpi.h" +#include "vmpi_distribute_work.h" +#include "tier0/platform.h" +#include "tier0/dbg.h" +#include "utlvector.h" +#include "utllinkedlist.h" +#include "vmpi_dispatch.h" +#include "pacifier.h" +#include "vstdlib/random.h" +#include "mathlib/mathlib.h" +#include "threadhelpers.h" +#include "threads.h" +#include "tier1/strtools.h" +#include "tier1/utlmap.h" +#include "tier1/smartptr.h" +#include "tier0/icommandline.h" +#include "cmdlib.h" +#include "vmpi_distribute_tracker.h" +#include "vmpi_distribute_work_internal.h" + + + +#define DW_SUBPACKETID_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+0) +#define DW_SUBPACKETID_REQUEST_SHUFFLE (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+1) +#define DW_SUBPACKETID_WUS_COMPLETED_LIST (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+2) + + + +// This is a pretty simple iterator. Basically, it holds a matrix of numbers. +// Each row is assigned to a worker, and the worker just walks through his row. +// +// When a worker reaches the end of his row, it gets a little trickier. +// They'll start doing their neighbor's row +// starting at the back and continue on. At about this time, the master should reshuffle the +// remaining work units to evenly distribute them amongst the workers. +class CWorkUnitWalker +{ +public: + CWorkUnitWalker() + { + m_nWorkUnits = 0; + } + + // This is all that's needed for it to start assigning work units. + void Init( WUIndexType matrixWidth, WUIndexType matrixHeight, WUIndexType nWorkUnits ) + { + m_nWorkUnits = nWorkUnits; + m_MatrixWidth = matrixWidth; + m_MatrixHeight = matrixHeight; + Assert( m_MatrixWidth * m_MatrixHeight >= nWorkUnits ); + + m_WorkerInfos.RemoveAll(); + m_WorkerInfos.EnsureCount( m_MatrixHeight ); + for ( int i=0; i < m_MatrixHeight; i++ ) + { + m_WorkerInfos[i].m_iStartWorkUnit = matrixWidth * i; + m_WorkerInfos[i].m_iWorkUnitOffset = 0; + } + } + + // This is the main function of the shuffler + bool GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex, bool *bWorkerFinishedHisColumn ) + { + if ( iWorker < 0 || iWorker >= m_WorkerInfos.Count() ) + { + Assert( false ); + return false; + } + + // If this worker has walked through all the work units, then he's done. + CWorkerInfo *pWorker = &m_WorkerInfos[iWorker]; + if ( pWorker->m_iWorkUnitOffset >= m_nWorkUnits ) + return false; + + // If we've gone past the end of our work unit list, then we start at the BACK of the other rows of work units + // in the hopes that we won't collide with the guy working there. We also should tell the master to reshuffle. + WUIndexType iWorkUnitOffset = pWorker->m_iWorkUnitOffset; + if ( iWorkUnitOffset >= m_MatrixWidth ) + { + WUIndexType xOffset = iWorkUnitOffset % m_MatrixWidth; + WUIndexType yOffset = iWorkUnitOffset / m_MatrixWidth; + xOffset = m_MatrixWidth - xOffset - 1; + iWorkUnitOffset = yOffset * m_MatrixWidth + xOffset; + *bWorkerFinishedHisColumn = true; + } + else + { + *bWorkerFinishedHisColumn = false; + } + + *pWUIndex = (pWorker->m_iStartWorkUnit + iWorkUnitOffset) % m_nWorkUnits; + ++pWorker->m_iWorkUnitOffset; + return true; + } + + +private: + class CWorkerInfo + { + public: + WUIndexType m_iStartWorkUnit; + WUIndexType m_iWorkUnitOffset; // Which work unit in my list of work units am I working on? + }; + + WUIndexType m_nWorkUnits; + WUIndexType m_MatrixWidth; + WUIndexType m_MatrixHeight; + CUtlVector<CWorkerInfo> m_WorkerInfos; +}; + + +class IShuffleRequester +{ +public: + virtual void RequestShuffle() = 0; +}; + + +// This is updated every time the master decides to reshuffle. +// In-between shuffles, you can call NoteWorkUnitCompleted when a work unit is completed +// and it'll avoid returning that work unit from GetNextWorkUnit again, but it WON'T +class CShuffledWorkUnitWalker +{ +public: + void Init( WUIndexType nWorkUnits, IShuffleRequester *pRequester ) + { + m_iLastShuffleRequest = 0; + m_iCurShuffle = 1; + m_flLastShuffleTime = Plat_FloatTime(); + m_pShuffleRequester = pRequester; + + int nBytes = PAD_NUMBER( nWorkUnits, 8 ) / 8; + m_CompletedWUBits.SetSize( nBytes ); + m_LocalCompletedWUBits.SetSize( nBytes ); + for ( WUIndexType i=0; i < m_CompletedWUBits.Count(); i++ ) + m_LocalCompletedWUBits[i] = m_CompletedWUBits[i] = 0; + + // Setup our list of work units remaining. + for ( WUIndexType iWU=0; iWU < nWorkUnits; iWU++ ) + { + // Note: we're making an assumption here that if we add entries to a CUtlLinkedList in ascending order, their indices + // will be ascending 1-by-1 as well. If that assumption breaks, we can create an extra array here to map WU indices to the linked list indices. + WUIndexType index = m_WorkUnitsRemaining.AddToTail( iWU ); + if ( index != iWU ) + { + Error( "CShuffledWorkUnitWalker: assumption on CUtlLinkedList indexing failed.\n" ); + } + } + } + + void Shuffle( int nWorkers ) + { + if ( nWorkers == 0 ) + return; + + ++m_iCurShuffle; + m_flLastShuffleTime = Plat_FloatTime(); + + CCriticalSectionLock csLock( &m_CS ); + csLock.Lock(); + + m_WorkUnitsMap.RemoveAll(); + m_WorkUnitsMap.EnsureCount( m_WorkUnitsRemaining.Count() ); + + // Here's the shuffle. The CWorkUnitWalker is going to walk each worker through its own group from 0-W, + // and our job is to interleave it so when worker 0 goes [0,1,2] and worker 1 goes [100,101,102], they're actually + // doing [0,N,2N] and [1,N+1,2N+1] where N=# of workers. + + // The grid is RxW long, and R*W is >= nWorkUnits. + // R = # units per worker = width of the matrix + // W = # workers = height of the matrix + WUIndexType matrixHeight = nWorkers; + WUIndexType matrixWidth = m_WorkUnitsRemaining.Count() / matrixHeight; + if ( (m_WorkUnitsRemaining.Count() % matrixHeight) != 0 ) + ++matrixWidth; + + Assert( matrixWidth * matrixHeight >= m_WorkUnitsRemaining.Count() ); + + WUIndexType iWorkUnit = 0; + FOR_EACH_LL( m_WorkUnitsRemaining, i ) + { + WUIndexType xCoord = iWorkUnit / matrixHeight; + WUIndexType yCoord = iWorkUnit % matrixHeight; + Assert( xCoord < matrixWidth ); + Assert( yCoord < matrixHeight ); + + m_WorkUnitsMap[yCoord*matrixWidth+xCoord] = m_WorkUnitsRemaining[i]; + ++iWorkUnit; + } + + m_Walker.Init( matrixWidth, matrixHeight, m_WorkUnitsRemaining.Count() ); + } + + // Threadsafe. + bool Thread_IsWorkUnitCompleted( WUIndexType iWU ) + { + CCriticalSectionLock csLock( &m_CS ); + csLock.Lock(); + + byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7)); + return (val != 0); + } + + WUIndexType Thread_NumWorkUnitsRemaining() + { + CCriticalSectionLock csLock( &m_CS ); + csLock.Lock(); + + return m_WorkUnitsRemaining.Count(); + } + + bool Thread_GetNextWorkUnit( int iWorker, WUIndexType *pWUIndex ) + { + CCriticalSectionLock csLock( &m_CS ); + csLock.Lock(); + + while ( 1 ) + { + WUIndexType iUnmappedWorkUnit; + bool bWorkerFinishedHisColumn; + if ( !m_Walker.GetNextWorkUnit( iWorker, &iUnmappedWorkUnit, &bWorkerFinishedHisColumn ) ) + return false; + + // If we've done all the work units assigned to us in the last shuffle, then request a reshuffle. + if ( bWorkerFinishedHisColumn ) + HandleWorkerFinishedColumn(); + + // Check the pending list. + *pWUIndex = m_WorkUnitsMap[iUnmappedWorkUnit]; + byte bIsCompleted = m_CompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7)); + byte bIsCompletedLocally = m_LocalCompletedWUBits[*pWUIndex >> 3] & (1 << (*pWUIndex & 7)); + if ( !bIsCompleted && !bIsCompletedLocally ) + return true; + } + } + + void HandleWorkerFinishedColumn() + { + if ( m_iLastShuffleRequest != m_iCurShuffle ) + { + double flCurTime = Plat_FloatTime(); + if ( flCurTime - m_flLastShuffleTime > 2.0f ) + { + m_pShuffleRequester->RequestShuffle(); + m_iLastShuffleRequest = m_iCurShuffle; + } + } + } + + void Thread_NoteWorkUnitCompleted( WUIndexType iWU ) + { + CCriticalSectionLock csLock( &m_CS ); + csLock.Lock(); + + byte val = m_CompletedWUBits[iWU >> 3] & (1 << (iWU & 7)); + if ( val == 0 ) + { + m_WorkUnitsRemaining.Remove( iWU ); + m_CompletedWUBits[iWU >> 3] |= (1 << (iWU & 7)); + } + } + + void Thread_NoteLocalWorkUnitCompleted( WUIndexType iWU ) + { + CCriticalSectionLock csLock( &m_CS ); + csLock.Lock(); + m_LocalCompletedWUBits[iWU >> 3] |= (1 << (iWU & 7)); + } + + CRC32_t GetShuffleCRC() + { +#ifdef _DEBUG + static bool bCalcShuffleCRC = true; +#else + static bool bCalcShuffleCRC = VMPI_IsParamUsed( mpi_CalcShuffleCRC ); +#endif + if ( bCalcShuffleCRC ) + { + CCriticalSectionLock csLock( &m_CS ); + csLock.Lock(); + + CRC32_t ret; + CRC32_Init( &ret ); + + FOR_EACH_LL( m_WorkUnitsRemaining, i ) + { + WUIndexType iWorkUnit = m_WorkUnitsRemaining[i]; + CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) ); + } + + for ( int i=0; i < m_WorkUnitsMap.Count(); i++ ) + { + WUIndexType iWorkUnit = m_WorkUnitsMap[i]; + CRC32_ProcessBuffer( &ret, &iWorkUnit, sizeof( iWorkUnit ) ); + } + + CRC32_Final( &ret ); + return ret; + } + else + { + return false; + } + } + +private: + // These are PENDING WU completions until we call Shuffle() again, at which point we actually reorder the list + // based on the completed WUs. + CUtlVector<byte> m_CompletedWUBits; // Bit vector of completed WUs. + CUtlLinkedList<WUIndexType, WUIndexType> m_WorkUnitsRemaining; + CUtlVector<WUIndexType> m_WorkUnitsMap; // Maps the 0-N indices in the CWorkUnitWalker to the list of remaining work units. + + // Helps us avoid some duplicates that happen during shuffling if we've completed some WUs and sent them + // to the master, but the master hasn't included them in the DW_SUBPACKETID_WUS_COMPLETED_LIST yet. + CUtlVector<byte> m_LocalCompletedWUBits; // Bit vector of completed WUs. + + // Used to control how frequently we request a reshuffle. + unsigned int m_iCurShuffle; + unsigned int m_iLastShuffleRequest; // The index of the shuffle we last requested a reshuffle on (don't request a reshuffle on the same one). + double m_flLastShuffleTime; + IShuffleRequester *m_pShuffleRequester; + + CWorkUnitWalker m_Walker; + CCriticalSection m_CS; +}; + + + +class CDistributor_SDKMaster : public IWorkUnitDistributorMaster, public IShuffleRequester +{ +public: + virtual void Release() + { + delete this; + } + + static void Master_WorkerThread_Static( int iThread, void *pUserData ) + { + ((CDistributor_SDKMaster*)pUserData)->Master_WorkerThread( iThread ); + } + + void Master_WorkerThread( int iThread ) + { + while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 && !g_bVMPIEarlyExit ) + { + WUIndexType iWU; + if ( !m_WorkUnitWalker.Thread_GetNextWorkUnit( 0, &iWU ) ) + { + // Wait until there are some WUs to do. + VMPI_Sleep( 10 ); + continue; + } + + // Do this work unit. + m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); // We do this before it's completed because otherwise if a Shuffle() occurs, + // the other thread might happen to pickup this work unit and we don't want that. + m_pInfo->m_WorkerInfo.m_pProcessFn( iThread, iWU, NULL ); + NotifyLocalMasterCompletedWorkUnit( iWU ); + } + } + + virtual void DistributeWork_Master( CDSInfo *pInfo ) + { + m_pInfo = pInfo; + m_bForceShuffle = false; + m_bShuffleRequested = false; + m_flLastShuffleRequestServiceTime = Plat_FloatTime(); + + // Spawn idle-priority worker threads right here. + m_bUsingMasterLocalThreads = (pInfo->m_WorkerInfo.m_pProcessFn != 0); + if ( VMPI_IsParamUsed( mpi_NoMasterWorkerThreads ) ) + { + Msg( "%s found. No worker threads will be created.\n", VMPI_GetParamString( mpi_NoMasterWorkerThreads ) ); + m_bUsingMasterLocalThreads = false; + } + m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this ); + Shuffle(); + + if ( m_bUsingMasterLocalThreads ) + RunThreads_Start( Master_WorkerThread_Static, this, k_eRunThreadsPriority_Idle ); + + uint64 lastShuffleTime = Plat_MSTime(); + while ( m_WorkUnitWalker.Thread_NumWorkUnitsRemaining() > 0 ) + { + VMPI_DispatchNextMessage( 200 ); + CheckLocalMasterCompletedWorkUnits(); + + VMPITracker_HandleDebugKeypresses(); + if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() ) + break; + + // Reshuffle the work units optimally every certain interval. + if ( m_bForceShuffle || CheckShuffleRequest() ) + { + Shuffle(); + lastShuffleTime = Plat_MSTime(); + m_bForceShuffle = false; + } + } + + RunThreads_End(); + } + + virtual void RequestShuffle() + { + m_bShuffleRequested = true; + } + + bool CheckShuffleRequest() + { + if ( m_bShuffleRequested ) + { + double flCurTime = Plat_FloatTime(); + if ( flCurTime - m_flLastShuffleRequestServiceTime > 2.0f ) // Only handle shuffle requests every so often. + { + m_flLastShuffleRequestServiceTime = flCurTime; + m_bShuffleRequested = false; + return true; + } + } + + return false; + } + + void Shuffle() + { + // Build a list of who's working. + CUtlVector<unsigned short> whosWorking; + if ( m_bUsingMasterLocalThreads ) + { + whosWorking.AddToTail( VMPI_MASTER_ID ); + Assert( VMPI_MASTER_ID == 0 ); + } + + { + CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); + for ( int i=0; i < pWorkersReady->m_WorkersReady.Count(); i++ ) + { + int iWorker = pWorkersReady->m_WorkersReady[i]; + if ( VMPI_IsProcConnected( iWorker ) ) + whosWorking.AddToTail( iWorker ); + } + m_WorkersReadyCS.Unlock(); + } + + // Before sending the shuffle command, tell any of these active workers about the pending WUs completed. + CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); + + m_WUSCompletedMessageBuffer.setLen( 0 ); + if ( BuildWUsCompletedMessage( pWUsCompleted->m_Pending, m_WUSCompletedMessageBuffer ) > 0 ) + { + for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ ) + { + VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), whosWorking[i] ); + } + } + pWUsCompleted->m_Completed.AddMultipleToTail( pWUsCompleted->m_Pending.Count(), pWUsCompleted->m_Pending.Base() ); // Add the pending ones to the full list now. + pWUsCompleted->m_Pending.RemoveAll(); + + m_WUsCompletedCS.Unlock(); + + // Shuffle ourselves. + m_WorkUnitWalker.Shuffle( whosWorking.Count() ); + + // Send the shuffle command to the workers. + MessageBuffer mb; + PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_SHUFFLE ); + + unsigned short nWorkers = whosWorking.Count(); + mb.write( &nWorkers, sizeof( nWorkers ) ); + + CRC32_t shuffleCRC = m_WorkUnitWalker.GetShuffleCRC(); + mb.write( &shuffleCRC, sizeof( shuffleCRC ) ); + + // Now for each worker, assign him an index in the shuffle and send the shuffle command. + int workerIDPos = mb.getLen(); + unsigned short id = 0; + mb.write( &id, sizeof( id ) ); + for ( int i=m_bUsingMasterLocalThreads; i < whosWorking.Count(); i++ ) + { + id = (unsigned short)i; + mb.update( workerIDPos, &id, sizeof( id ) ); + VMPI_SendData( mb.data, mb.getLen(), whosWorking[i] ); + } + } + + int BuildWUsCompletedMessage( CUtlVector<WUIndexType> &wusCompleted, MessageBuffer &mb ) + { + PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WUS_COMPLETED_LIST ); + m_pInfo->WriteWUIndex( wusCompleted.Count(), &mb ); + for ( int i=0; i < wusCompleted.Count(); i++ ) + { + m_pInfo->WriteWUIndex( wusCompleted[i], &mb ); + } + return wusCompleted.Count(); + } + + virtual void OnWorkerReady( int iSource ) + { + CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); + if ( pWorkersReady->m_WorkersReady.Find( iSource ) == -1 ) + { + pWorkersReady->m_WorkersReady.AddToTail( iSource ); + + // Get this guy up to speed on which WUs are done. + { + CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); + m_WUSCompletedMessageBuffer.setLen( 0 ); + BuildWUsCompletedMessage( pWUsCompleted->m_Completed, m_WUSCompletedMessageBuffer ); + m_WUsCompletedCS.Unlock(); + } + + VMPI_SendData( m_WUSCompletedMessageBuffer.data, m_WUSCompletedMessageBuffer.getLen(), iSource ); + m_bForceShuffle = true; + } + m_WorkersReadyCS.Unlock(); + } + + virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) + { + return Thread_HandleWorkUnitResults( iWorkUnit ); + } + + bool Thread_HandleWorkUnitResults( WUIndexType iWorkUnit ) + { + if ( m_WorkUnitWalker.Thread_IsWorkUnitCompleted( iWorkUnit ) ) + { + return false; + } + else + { + m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWorkUnit ); + + // We need the lock on here because our own worker threads can call into here. + CWUsCompleted *pWUsCompleted = m_WUsCompletedCS.Lock(); + pWUsCompleted->m_Pending.AddToTail( iWorkUnit ); + m_WUsCompletedCS.Unlock(); + return true; + } + } + + virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) + { + if ( pBuf->data[1] == DW_SUBPACKETID_REQUEST_SHUFFLE ) + { + if ( bIgnoreContents ) + return true; + + m_bShuffleRequested = true; + } + return false; + } + + virtual void DisconnectHandler( int workerID ) + { + CWorkersReady *pWorkersReady = m_WorkersReadyCS.Lock(); + + if ( pWorkersReady->m_WorkersReady.Find( workerID ) != -1 ) + m_bForceShuffle = true; + + m_WorkersReadyCS.Unlock(); + } + +public: + CDSInfo *m_pInfo; + + class CWorkersReady + { + public: + CUtlVector<int> m_WorkersReady; // The list of workers who have said they're ready to participate. + }; + CCriticalSectionData<CWorkersReady> m_WorkersReadyCS; + + class CWUsCompleted + { + public: + CUtlVector<WUIndexType> m_Completed; // WUs completed that we have sent to workers. + CUtlVector<WUIndexType> m_Pending; // WUs completed that we haven't sent to workers. + }; + CCriticalSectionData<CWUsCompleted> m_WUsCompletedCS; + MessageBuffer m_WUSCompletedMessageBuffer; // Used to send lists of completed WUs. + int m_bUsingMasterLocalThreads; + + bool m_bForceShuffle; + bool m_bShuffleRequested; + double m_flLastShuffleRequestServiceTime; + + CShuffledWorkUnitWalker m_WorkUnitWalker; +}; + + +class CDistributor_SDKWorker : public IWorkUnitDistributorWorker, public IShuffleRequester +{ +public: + virtual void Init( CDSInfo *pInfo ) + { + m_iMyWorkUnitWalkerID = -1; + m_pInfo = pInfo; + m_WorkUnitWalker.Init( pInfo->m_nWorkUnits, this ); + } + + virtual void Release() + { + delete this; + } + + virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) + { + // If we don't have an ID yet, we haven't received a Shuffle() command, so we're waiting for that before working. + // TODO: we could do some random WUs here while we're waiting, although that could suck if the WUs take forever to do + // and they're duplicates. + if ( m_iMyWorkUnitWalkerID == -1 ) + return false; + + // Look in our current shuffled list of work units for the next one. + return m_WorkUnitWalker.Thread_GetNextWorkUnit( m_iMyWorkUnitWalkerID, pWUIndex ); + } + + virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) + { + m_WorkUnitWalker.Thread_NoteLocalWorkUnitCompleted( iWU ); + } + + virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) + { + // If it's a SHUFFLE message, then shuffle.. + if ( pBuf->data[1] == DW_SUBPACKETID_SHUFFLE ) + { + if ( bIgnoreContents ) + return true; + + unsigned short nWorkers, myID; + CRC32_t shuffleCRC; + pBuf->read( &nWorkers, sizeof( nWorkers ) ); + pBuf->read( &shuffleCRC, sizeof( shuffleCRC ) ); + pBuf->read( &myID, sizeof( myID ) ); + m_iMyWorkUnitWalkerID = myID; + + m_WorkUnitWalker.Shuffle( nWorkers ); + if ( m_WorkUnitWalker.GetShuffleCRC() != shuffleCRC ) + { + static int nWarnings = 1; + if ( ++nWarnings <= 2 ) + Warning( "\nShuffle CRC mismatch\n" ); + } + return true; + } + else if ( pBuf->data[1] == DW_SUBPACKETID_WUS_COMPLETED_LIST ) + { + if ( bIgnoreContents ) + return true; + + WUIndexType nCompleted; + m_pInfo->ReadWUIndex( &nCompleted, pBuf ); + for ( WUIndexType i=0; i < nCompleted; i++ ) + { + WUIndexType iWU; + m_pInfo->ReadWUIndex( &iWU, pBuf ); + m_WorkUnitWalker.Thread_NoteWorkUnitCompleted( iWU ); + } + + return true; + } + + return false; + } + + virtual void RequestShuffle() + { + // Ok.. request a reshuffle. + MessageBuffer mb; + PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_REQUEST_SHUFFLE ); + VMPI_SendData( mb.data, mb.getLen(), VMPI_MASTER_ID ); + } + +private: + CDSInfo *m_pInfo; + CShuffledWorkUnitWalker m_WorkUnitWalker; + int m_iMyWorkUnitWalkerID; +}; + + + +IWorkUnitDistributorMaster* CreateWUDistributor_SDKMaster() +{ + return new CDistributor_SDKMaster; +} + +IWorkUnitDistributorWorker* CreateWUDistributor_SDKWorker() +{ + return new CDistributor_SDKWorker; +} + |