diff options
| author | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
|---|---|---|
| committer | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
| commit | 3bf9df6b2785fa6d951086978a3e66f49427166a (patch) | |
| tree | 2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /utils/vmpi/vmpi_distribute_work_default.cpp | |
| download | archived-source-engine-2018-hl2-src-master.tar.xz archived-source-engine-2018-hl2-src-master.zip | |
Diffstat (limited to 'utils/vmpi/vmpi_distribute_work_default.cpp')
| -rw-r--r-- | utils/vmpi/vmpi_distribute_work_default.cpp | 602 |
1 files changed, 602 insertions, 0 deletions
diff --git a/utils/vmpi/vmpi_distribute_work_default.cpp b/utils/vmpi/vmpi_distribute_work_default.cpp new file mode 100644 index 0000000..4a0e2c6 --- /dev/null +++ b/utils/vmpi/vmpi_distribute_work_default.cpp @@ -0,0 +1,602 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +//============================================================================= + +#include "vmpi.h" +#include "vmpi_distribute_work.h" +#include "tier0/platform.h" +#include "tier0/dbg.h" +#include "utlvector.h" +#include "utllinkedlist.h" +#include "vmpi_dispatch.h" +#include "pacifier.h" +#include "vstdlib/random.h" +#include "mathlib/mathlib.h" +#include "threadhelpers.h" +#include "threads.h" +#include "tier1/strtools.h" +#include "tier1/utlmap.h" +#include "tier1/smartptr.h" +#include "tier0/icommandline.h" +#include "cmdlib.h" +#include "vmpi_distribute_tracker.h" +#include "vmpi_distribute_work_internal.h" + + +#define DW_SUBPACKETID_WU_ASSIGNMENT (VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE+0) + + + +static int s_numWusToDeal = -1; + +void VMPI_SetWorkUnitsPartitionSize( int numWusToDeal ) +{ + s_numWusToDeal = numWusToDeal; +} + + +class CWorkUnitInfo +{ +public: + WUIndexType m_iWorkUnit; +}; + + +class CWULookupInfo +{ +public: + CWULookupInfo() : m_iWUInfo( -1 ), m_iPartition( -222222 ), m_iPartitionListIndex( -1 ) {} + +public: + int m_iWUInfo; // Index into m_WUInfo. + int m_iPartition; // Which partition it's in. + int m_iPartitionListIndex; // Index into its partition's m_WUs. +}; + + +class CPartitionInfo +{ +public: + typedef CUtlLinkedList< WUIndexType, int > PartitionWUs; + +public: + int m_iPartition; // Index into m_Partitions. + int m_iWorker; // Who owns this partition? + PartitionWUs m_WUs; // Which WUs are in this partition? +}; + + +// Work units tracker to track consecutive finished blocks +class CWorkUnitsTracker +{ +public: + CWorkUnitsTracker() {} + +public: + // Initializes the unit tracker to receive numUnits in future + void PrepareForWorkUnits( uint64 numUnits ); + // Signals that a work unit has been finished + // returns a zero-based index of the next pending work unit + // up to which the task list has been processed fully now + // because the received work unit filled the gap or was the next pending work unit. + // returns 0 to indicate that this work unit is a "faster processed future work unit". + uint64 WorkUnitFinished( uint64 iWorkUnit ); + +public: + enum WUInfo { kNone, kTrigger, kDone }; + CVisibleWindowVector< uint8 > m_arrInfo; +}; + +void CWorkUnitsTracker::PrepareForWorkUnits( uint64 numUnits ) +{ + m_arrInfo.Reset( numUnits + 1 ); + + if ( numUnits ) + { + m_arrInfo.ExpandWindow( 2ull, kNone ); + m_arrInfo.Get( 0ull ) = kTrigger; + } +} + +uint64 CWorkUnitsTracker::WorkUnitFinished( uint64 iWorkUnit ) +{ + uint64 uiResult = uint64( 0 ); + + if ( iWorkUnit >= m_arrInfo.FirstPossibleIndex() && iWorkUnit < m_arrInfo.PastPossibleIndex() ) + { + // Need to access the element + m_arrInfo.ExpandWindow( iWorkUnit + 1, kNone ); + + // Set it done + uint8 &rchThere = m_arrInfo.Get( iWorkUnit ), chThere = rchThere; + rchThere = kDone; + + // Should we trigger? + if ( kTrigger == chThere ) + { + // Go along all "done" work units and trigger the last found one + while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) && + ( kDone == m_arrInfo.Get( iWorkUnit ) ) ) + continue; + + m_arrInfo.Get( iWorkUnit ) = kTrigger; + m_arrInfo.ShrinkWindow( iWorkUnit - 1 ); + uiResult = iWorkUnit; + } + else if( iWorkUnit == m_arrInfo.FirstPossibleIndex() ) + { + // Go along all "done" work units and shrink including the last found one + while ( ( ( ++ iWorkUnit ) < m_arrInfo.PastVisibleIndex() ) && + ( kDone == m_arrInfo.Get( iWorkUnit ) ) ) + continue; + + m_arrInfo.ShrinkWindow( iWorkUnit - 1 ); + } + } + + return uiResult; +} + +CWorkUnitsTracker g_MasterWorkUnitsTracker; + + + +static bool CompareSoonestWorkUnitSets( CPartitionInfo::PartitionWUs * const &x, CPartitionInfo::PartitionWUs * const &y ) +{ + // Compare by fourth/second/first job in the partitions + WUIndexType missing = ~WUIndexType(0); + WUIndexType jobsX[4] = { missing, missing, missing, missing }; + WUIndexType jobsY[4] = { missing, missing, missing, missing }; + int counter = 0; + + counter = 0; + FOR_EACH_LL( (*x), i ) + { + jobsX[ counter ++ ] = (*x)[i]; + if ( counter >= 4 ) + break; + } + + counter = 0; + FOR_EACH_LL( (*y), i ) + { + jobsY[ counter ++ ] = (*y)[i]; + if ( counter >= 4 ) + break; + } + + // Compare + if ( jobsX[3] != jobsY[3] ) + return ( jobsX[3] < jobsY[3] ); + + if ( jobsX[1] != jobsY[1] ) + return ( jobsX[1] < jobsY[1] ); + + return jobsX[0] < jobsY[0]; +} + + + +class CDistributor_DefaultMaster : public IWorkUnitDistributorMaster +{ +public: + virtual void Release() + { + delete this; + } + + virtual void DistributeWork_Master( CDSInfo *pInfo ) + { + m_pInfo = pInfo; + g_MasterWorkUnitsTracker.PrepareForWorkUnits( m_pInfo->m_nWorkUnits ); + + m_WULookup.Reset( pInfo->m_nWorkUnits ); + while ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() ) + { + VMPI_DispatchNextMessage( 200 ); + + VMPITracker_HandleDebugKeypresses(); + + if ( g_pDistributeWorkCallbacks && g_pDistributeWorkCallbacks->Update() ) + break; + } + } + + virtual void OnWorkerReady( int iSource ) + { + AssignWUsToWorker( iSource ); + } + + virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) + { + CWULookupInfo *pLookup = NULL; + if ( iWorkUnit >= m_WULookup.FirstPossibleIndex() && iWorkUnit < m_WULookup.PastVisibleIndex() ) + pLookup = &m_WULookup.Get( iWorkUnit ); + + if ( !pLookup || pLookup->m_iWUInfo == -1 ) + return false; + + // Mark this WU finished and remove it from the list of pending WUs. + m_WUInfo.Remove( pLookup->m_iWUInfo ); + pLookup->m_iWUInfo = -1; + + + // Get rid of the WU from its partition. + int iPartition = pLookup->m_iPartition; + CPartitionInfo *pPartition = m_Partitions[iPartition]; + pPartition->m_WUs.Remove( pLookup->m_iPartitionListIndex ); + + // Shrink the window of the lookup work units + if ( iWorkUnit == m_WULookup.FirstPossibleIndex() ) + { + WUIndexType kwu = iWorkUnit; + for ( WUIndexType kwuEnd = m_WULookup.PastVisibleIndex(); kwu < kwuEnd; ++ kwu ) + { + if ( -1 != m_WULookup.Get( kwu ).m_iWUInfo && kwu > iWorkUnit ) + break; + } + m_WULookup.ShrinkWindow( kwu - 1 ); + } + + // Give the worker some new work if need be. + if ( pPartition->m_WUs.Count() == 0 ) + { + int iPartitionWorker = pPartition->m_iWorker; + delete pPartition; + m_Partitions.Remove( iPartition ); + + // If there are any more WUs remaining, give the worker from this partition some more of them. + if ( m_WULookup.FirstPossibleIndex() < m_WULookup.PastPossibleIndex() ) + { + AssignWUsToWorker( iPartitionWorker ); + } + } + + uint64 iDoneWorkUnits = g_MasterWorkUnitsTracker.WorkUnitFinished( iWorkUnit ); + if ( iDoneWorkUnits && g_pDistributeWorkCallbacks ) + { + g_pDistributeWorkCallbacks->OnWorkUnitsCompleted( iDoneWorkUnits ); + } + + return true; + } + + virtual void DisconnectHandler( int workerID ) + { + int iPartitionLookup = FindPartitionByWorker( workerID ); + if ( iPartitionLookup != -1 ) + { + // Mark this guy's partition as unowned so another worker can get it. + CPartitionInfo *pPartition = m_Partitions[iPartitionLookup]; + pPartition->m_iWorker = -1; + } + } + + CPartitionInfo* AddPartition( int iWorker ) + { + CPartitionInfo *pNew = new CPartitionInfo; + pNew->m_iPartition = m_Partitions.AddToTail( pNew ); + pNew->m_iWorker = iWorker; + return pNew; + } + + bool SplitWUsPartition( CPartitionInfo *pPartitionLarge, + CPartitionInfo **ppFirstHalf, CPartitionInfo **ppSecondHalf, + int iFirstHalfWorker, int iSecondHalfWorker ) + { + int nCount = pPartitionLarge->m_WUs.Count(); + + if ( nCount > 1 ) // Allocate the partitions for the two workers + { + *ppFirstHalf = AddPartition( iFirstHalfWorker ); + *ppSecondHalf = AddPartition( iSecondHalfWorker ); + } + else // Specially transfer a partition with too few work units + { + *ppFirstHalf = NULL; + *ppSecondHalf = AddPartition( iSecondHalfWorker ); + } + + // Prepare for transfer + CPartitionInfo *arrNewParts[2] = { *ppFirstHalf ? *ppFirstHalf : *ppSecondHalf, *ppSecondHalf }; + + // Transfer the work units: + // alternate first/second halves + // don't put more than "half deal units" tasks into the second half + // e.g. { 1, 2, 3, 4 } + // becomes: 1st half { 1, 2 }, 2nd half { 3, 4 } + for ( int k = 0; k < nCount; ++ k ) + { + int iHead = pPartitionLarge->m_WUs.Head(); + WUIndexType iWU = pPartitionLarge->m_WUs[ iHead ]; + pPartitionLarge->m_WUs.Remove( iHead ); + + /* + int nHalf = !!( ( k % 2 ) || ( k >= nCount - 1 ) ); + if ( k == 5 ) // no more than 2 jobs to branch off + arrNewParts[ 1 ] = arrNewParts[ 0 ]; + */ + int nHalf = !( k < nCount/2 ); + CPartitionInfo *pTo = arrNewParts[ nHalf ]; + + CWULookupInfo &li = m_WULookup.Get( iWU ); + li.m_iPartition = pTo->m_iPartition; + li.m_iPartitionListIndex = pTo->m_WUs.AddToTail( iWU ); + } + + // LogPartitionsWorkUnits( pInfo ); + return true; + } + + void AssignWUsToWorker( int iWorker ) + { + // Get rid of this worker's old partition. + int iPrevious = FindPartitionByWorker( iWorker ); + if ( iPrevious != -1 ) + { + delete m_Partitions[iPrevious]; + m_Partitions.Remove( iPrevious ); + } + + if ( g_iVMPIVerboseLevel >= 1 ) + Msg( "A" ); + + + CVisibleWindowVector< CWULookupInfo > &vlkup = m_WULookup; + if ( CommandLine()->FindParm( "-mpi_NoScheduler" ) ) + { + Warning( "\n\n-mpi_NoScheduler found: Warning - this should only be used for testing and with 1 worker!\n\n" ); + vlkup.ExpandWindow( m_pInfo->m_nWorkUnits ); + CPartitionInfo *pPartition = AddPartition( iWorker ); + for ( int i=0; i < m_pInfo->m_nWorkUnits; i++ ) + { + CWorkUnitInfo info; + info.m_iWorkUnit = i; + + CWULookupInfo &li = vlkup.Get( i ); + li.m_iPartition = pPartition->m_iPartition; + li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i ); + li.m_iWUInfo = m_WUInfo.AddToTail( info ); + } + + SendPartitionToWorker( pPartition, iWorker ); + return; + } + + + // Any partitions abandoned by workers? + int iAbandonedPartition = FindPartitionByWorker( -1 ); + if ( -1 != iAbandonedPartition ) + { + CPartitionInfo *pPartition = m_Partitions[ iAbandonedPartition ]; + pPartition->m_iWorker = iWorker; + SendPartitionToWorker( pPartition, iWorker ); + } + + // Any absolutely untouched partitions yet? + else if ( vlkup.PastVisibleIndex() < vlkup.PastPossibleIndex() ) + { + // Figure out how many WUs to include in a batch + int numWusToDeal = s_numWusToDeal; + if ( numWusToDeal <= 0 ) + { + uint64 uiFraction = vlkup.PastPossibleIndex() / g_nMaxWorkerCount; + Assert( uiFraction < INT_MAX/2 ); + + numWusToDeal = int( uiFraction ); + if ( numWusToDeal <= 0 ) + numWusToDeal = 8; + } + + // Allocate room for upcoming work units lookup + WUIndexType iBegin = vlkup.PastVisibleIndex(); + WUIndexType iEnd = min( iBegin + g_nMaxWorkerCount * numWusToDeal, vlkup.PastPossibleIndex() ); + vlkup.ExpandWindow( iEnd - 1 ); + + // Allocate a partition + size_t numPartitions = min( ( size_t )(iEnd - iBegin), ( size_t )g_nMaxWorkerCount ); + CArrayAutoPtr< CPartitionInfo * > spArrPartitions( new CPartitionInfo* [ numPartitions ] ); + CPartitionInfo **arrPartitions = spArrPartitions.Get(); + + arrPartitions[0] = AddPartition( iWorker ); + for ( size_t k = 1; k < numPartitions; ++ k ) + arrPartitions[k] = AddPartition( -1 ); + + // Assign upcoming work units to the partitions. + for ( WUIndexType i = iBegin ; i < iEnd; ++ i ) + { + CWorkUnitInfo info; + info.m_iWorkUnit = i; + + CPartitionInfo *pPartition = arrPartitions[ size_t( (i - iBegin) % numPartitions ) ]; + + CWULookupInfo &li = vlkup.Get( i ); + li.m_iPartition = pPartition->m_iPartition; + li.m_iPartitionListIndex = pPartition->m_WUs.AddToTail( i ); + li.m_iWUInfo = m_WUInfo.AddToTail( info ); + } + + // Now send this guy the WU list in his partition. + SendPartitionToWorker( arrPartitions[0], iWorker ); + } + + // Split one of the last partitions to finish sooner + else + { + // Find a partition to split. + int iPartToSplit = FindSoonestPartition(); + if ( iPartToSplit >= 0 ) + { + CPartitionInfo *pPartition = m_Partitions[ iPartToSplit ]; + + CPartitionInfo *pOldHalf = NULL, *pNewHalf = NULL; + int iOldWorker = pPartition->m_iWorker, iNewWorker = iWorker; + if ( SplitWUsPartition( pPartition, &pOldHalf, &pNewHalf, iOldWorker, iNewWorker ) ) + { + if ( pOldHalf ) + SendPartitionToWorker( pOldHalf, iOldWorker ); + if ( pNewHalf ) + SendPartitionToWorker( pNewHalf, iNewWorker ); + + // Delete the partition that got split + Assert( pPartition->m_WUs.Count() == 0 ); + delete pPartition; + m_Partitions.Remove( iPartToSplit ); + } + } + } + } + + int FindSoonestPartition() + { + CUtlLinkedList < CPartitionInfo *, int > &lst = m_Partitions; + + // Sorted partitions + CUtlMap< CPartitionInfo::PartitionWUs *, int > sortedPartitions ( CompareSoonestWorkUnitSets ); + sortedPartitions.EnsureCapacity( lst.Count() ); + FOR_EACH_LL( lst, i ) + { + sortedPartitions.Insert( &lst[i]->m_WUs, i ); + } + + if ( sortedPartitions.Count() ) + { + return sortedPartitions.Element( sortedPartitions.FirstInorder() ); + } + + return lst.Head(); + } + + int FindPartitionByWorker( int iWorker ) + { + FOR_EACH_LL( m_Partitions, i ) + { + if ( m_Partitions[i]->m_iWorker == iWorker ) + return i; + } + return -1; + } + + void SendPartitionToWorker( CPartitionInfo *pPartition, int iWorker ) + { + // Stuff the next nWUs work units into the buffer. + MessageBuffer mb; + PrepareDistributeWorkHeader( &mb, DW_SUBPACKETID_WU_ASSIGNMENT ); + + FOR_EACH_LL( pPartition->m_WUs, i ) + { + WUIndexType iWU = pPartition->m_WUs[i]; + mb.write( &iWU, sizeof( iWU ) ); + VMPITracker_WorkUnitSentToWorker( ( int ) iWU, iWorker ); + } + + VMPI_SendData( mb.data, mb.getLen(), iWorker ); + } + + virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) + { + return false; + } + +private: + + CDSInfo *m_pInfo; + + CUtlLinkedList<CPartitionInfo*,int> m_Partitions; + CVisibleWindowVector<CWULookupInfo> m_WULookup; // Map work unit index to CWorkUnitInfo. + CUtlLinkedList<CWorkUnitInfo,int> m_WUInfo; // Sorted with most elegible WU at the head. +}; + + + +class CDistributor_DefaultWorker : public IWorkUnitDistributorWorker +{ +public: + virtual void Release() + { + delete this; + } + + virtual void Init( CDSInfo *pInfo ) + { + } + + virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) + { + CCriticalSectionLock csLock( &m_CS ); + csLock.Lock(); + + // NOTE: this is called from INSIDE worker threads. + if ( m_WorkUnits.Count() == 0 ) + { + return false; + } + else + { + *pWUIndex = m_WorkUnits[ m_WorkUnits.Head() ]; + m_WorkUnits.Remove( m_WorkUnits.Head() ); + return true; + } + } + + virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) + { + } + + virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) + { + if ( pBuf->data[1] == DW_SUBPACKETID_WU_ASSIGNMENT ) + { + // If the message wasn't even related to the current DistributeWork() call we're on, ignore it. + if ( bIgnoreContents ) + return true; + + if ( ((pBuf->getLen() - pBuf->getOffset()) % sizeof( WUIndexType )) != 0 ) + { + Error( "DistributeWork: invalid work units packet from master" ); + } + + // Parse out the work unit indices. + CCriticalSectionLock csLock( &m_CS ); + csLock.Lock(); + + m_WorkUnits.Purge(); + + int nIndices = (pBuf->getLen() - pBuf->getOffset()) / sizeof( WUIndexType ); + for ( int i=0; i < nIndices; i++ ) + { + WUIndexType iWU; + pBuf->read( &iWU, sizeof( iWU ) ); + + // Add the index to the list. + m_WorkUnits.AddToTail( iWU ); + } + + csLock.Unlock(); + + return true; + } + else + { + return false; + } + } + + // Threads eat up the list of WUs in here. + CCriticalSection m_CS; + CUtlLinkedList<WUIndexType, int> m_WorkUnits; // A list of work units assigned to this worker +}; + + + + +IWorkUnitDistributorMaster* CreateWUDistributor_DefaultMaster() +{ + return new CDistributor_DefaultMaster; +} +IWorkUnitDistributorWorker* CreateWUDistributor_DefaultWorker() +{ + return new CDistributor_DefaultWorker; +} |