summaryrefslogtreecommitdiff
path: root/utils/vmpi/vmpi_distribute_work_internal.h
diff options
context:
space:
mode:
authorFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
committerFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
commit3bf9df6b2785fa6d951086978a3e66f49427166a (patch)
tree2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /utils/vmpi/vmpi_distribute_work_internal.h
downloadarchived-source-engine-2018-hl2-src-master.tar.xz
archived-source-engine-2018-hl2-src-master.zip
Diffstat (limited to 'utils/vmpi/vmpi_distribute_work_internal.h')
-rw-r--r--utils/vmpi/vmpi_distribute_work_internal.h251
1 files changed, 251 insertions, 0 deletions
diff --git a/utils/vmpi/vmpi_distribute_work_internal.h b/utils/vmpi/vmpi_distribute_work_internal.h
new file mode 100644
index 0000000..74aa906
--- /dev/null
+++ b/utils/vmpi/vmpi_distribute_work_internal.h
@@ -0,0 +1,251 @@
+//========= Copyright Valve Corporation, All rights reserved. ============//
+//
+// Purpose:
+//
+//=============================================================================
+
+#ifndef VMPI_DISTRIBUTE_WORK_INTERNAL_H
+#define VMPI_DISTRIBUTE_WORK_INTERNAL_H
+#ifdef _WIN32
+#pragma once
+#endif
+
+
+#define VMPI_DISTRIBUTE_WORK_EXTRA_SUBPACKET_BASE 50
+
+
+typedef uint64 WUIndexType;
+class CDSInfo;
+extern bool g_bVMPIEarlyExit;
+
+
+// These classes are overridden to handle the details of communicating and scheduling work units.
+class IWorkUnitDistributorMaster
+{
+public:
+ virtual void Release() = 0;
+
+ virtual void DistributeWork_Master( CDSInfo *pInfo ) = 0;
+
+ virtual void OnWorkerReady( int iWorker ) = 0;
+ virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) = 0;
+
+ // Called when the results for a work unit arrive. This function must return false if
+ // we've already received the results for this work unit.
+ virtual bool HandleWorkUnitResults( WUIndexType iWorkUnit ) = 0;
+
+ virtual void DisconnectHandler( int workerID ) = 0;
+};
+
+class IWorkUnitDistributorWorker
+{
+public:
+ virtual void Release() = 0;
+
+ virtual void Init( CDSInfo *pInfo ) = 0;
+
+ // Called by worker threads to get the next work unit to do.
+ virtual bool GetNextWorkUnit( WUIndexType *pWUIndex ) = 0;
+
+ // Called by the worker threads after a work unit is completed.
+ virtual void NoteLocalWorkUnitCompleted( WUIndexType iWU ) = 0;
+
+ // Called by the main thread when a packet comes in.
+ virtual bool HandlePacket( MessageBuffer *pBuf, int iSource, bool bIgnoreContents ) = 0;
+};
+
+
+
+template < typename T, typename Derived >
+class CVisibleWindowVectorT : protected CUtlVector < T >
+{
+ typedef CUtlVector< T > BaseClass;
+
+public:
+ CVisibleWindowVectorT() : m_uiBase( 0 ), m_uiTotal( 0 ) {}
+
+public:
+ inline void PostInitElement( uint64 uiRealIdx, T &newElement ) { /* do nothing */ return; }
+ inline void PostInitElement( uint64 uiRealIdx, T &newElement, T const &x ) { newElement = x; }
+
+public:
+ // Resets the content and makes size "uiTotal"
+ void Reset( uint64 uiTotal ) {
+ BaseClass::RemoveAll();
+ BaseClass::EnsureCapacity( min( 100, (int)uiTotal ) );
+ m_uiBase = 0;
+ m_uiTotal = uiTotal;
+ }
+
+ // Gets the element from the window, otherwise NULL
+ T & Get( uint64 idx ) {
+ T *pElement = (( idx >= m_uiBase && idx < m_uiBase + BaseClass::Count() ) ? ( BaseClass::Base() + size_t( idx - m_uiBase ) ) : NULL );
+ Assert( pElement );
+ return *pElement;
+ }
+
+ // Expands the window to see the element by its index
+ void ExpandWindow( uint64 idxAccessible ) {
+ Assert( idxAccessible >= m_uiBase );
+ if ( idxAccessible >= m_uiBase + BaseClass::Count() ) {
+ int iOldCount = BaseClass::Count();
+ int iAddElements = int(idxAccessible - m_uiBase) - iOldCount + 1;
+ Assert( iOldCount + iAddElements <= 100000000 /* really need 100 Mb at once? */ );
+ BaseClass::AddMultipleToTail( iAddElements );
+ for ( int iNewCount = BaseClass::Count(); iOldCount < iNewCount; ++ iOldCount )
+ static_cast< Derived * >( this )->PostInitElement( m_uiBase + iOldCount, BaseClass::Element( iOldCount ) );
+ }
+ }
+
+ // Expands the window and initializes the new elements to a given value
+ void ExpandWindow( uint64 idxAccessible, T const& x ) {
+ Assert( idxAccessible >= m_uiBase );
+ if ( idxAccessible >= m_uiBase + BaseClass::Count() ) {
+ int iOldCount = BaseClass::Count();
+ int iAddElements = int(idxAccessible - m_uiBase) - iOldCount + 1;
+ Assert( unsigned(iAddElements) <= 50000000 /* growing 50 Mb at once? */ );
+ BaseClass::AddMultipleToTail( iAddElements );
+ for ( int iNewCount = BaseClass::Count(); iOldCount < iNewCount; ++ iOldCount )
+ static_cast< Derived * >( this )->PostInitElement( m_uiBase + iOldCount, BaseClass::Element( iOldCount ), x );
+ }
+ }
+
+ // Shrinks the window to drop some of the elements from the head
+ void ShrinkWindow( uint64 idxDrop ) {
+ Assert( idxDrop >= m_uiBase && idxDrop <= m_uiBase + BaseClass::Count() );
+ if ( idxDrop >= m_uiBase && idxDrop <= m_uiBase + BaseClass::Count() ) {
+ int iDropElements = int( idxDrop - m_uiBase ) + 1;
+ m_uiBase += iDropElements;
+ BaseClass::RemoveMultiple( 0, min( iDropElements, BaseClass::Count() ) );
+ }
+ }
+
+ // First possible index in this vector (only past dropped items)
+ uint64 FirstPossibleIndex() const { return m_uiBase; }
+
+ // Last possible index in this vector
+ uint64 PastPossibleIndex() const { return m_uiTotal; }
+ // Past visible window index in this vector
+ uint64 PastVisibleIndex() const { return m_uiBase + BaseClass::Count(); }
+
+protected:
+ uint64 m_uiBase, m_uiTotal;
+};
+
+template < typename T >
+class CVisibleWindowVector : public CVisibleWindowVectorT < T, CVisibleWindowVector< T > >
+{
+public:
+ CVisibleWindowVector() {}
+};
+
+
+template < typename T >
+T const * GenericFind( T const *pBegin, T const *pEnd, T const &x )
+{
+ for ( ; pBegin != pEnd; ++ pBegin )
+ if ( *pBegin == x )
+ break;
+ return pBegin;
+}
+
+
+template < typename T >
+T * GenericFind( T *pBegin, T *pEnd, T const &x )
+{
+ for ( ; pBegin != pEnd; ++ pBegin )
+ if ( *pBegin == x )
+ break;
+ return pBegin;
+}
+
+
+class CWorkerInfo
+{
+public:
+ ProcessWorkUnitFn m_pProcessFn;
+ CVisibleWindowVector<WUIndexType> m_WorkUnitsRunning; // A list of work units currently running, index is the thread index
+ CCriticalSection m_WorkUnitsRunningCS;
+};
+
+
+class CMasterInfo
+{
+public:
+
+ // Only used by the master.
+ ReceiveWorkUnitFn m_ReceiveFn;
+};
+
+
+class CDSInfo
+{
+public:
+ inline void WriteWUIndex( WUIndexType iWU, MessageBuffer *pBuf )
+ {
+ if ( m_nWorkUnits <= 0xFFFF )
+ {
+ Assert( iWU <= 0xFFFF );
+ unsigned short val = (unsigned short)iWU;
+ pBuf->write( &val, sizeof( val ) );
+ }
+ else if ( m_nWorkUnits <= 0xFFFFFFFF )
+ {
+ Assert( iWU <= 0xFFFF );
+ unsigned int val = (unsigned int)iWU;
+ pBuf->write( &val, sizeof( val ) );
+ }
+ else
+ {
+ pBuf->write( &iWU, sizeof( iWU ) );
+ }
+ }
+
+ inline void ReadWUIndex( WUIndexType *pWU, MessageBuffer *pBuf )
+ {
+ if ( m_nWorkUnits <= 0xFFFF )
+ {
+ unsigned short val;
+ pBuf->read( &val, sizeof( val ) );
+ *pWU = val;
+ }
+ else if ( m_nWorkUnits <= 0xFFFFFFFF )
+ {
+ unsigned int val;
+ pBuf->read( &val, sizeof( val ) );
+ *pWU = val;
+ }
+ else
+ {
+ pBuf->read( pWU, sizeof( *pWU ) );
+ }
+ }
+
+public:
+ CWorkerInfo m_WorkerInfo;
+ CMasterInfo m_MasterInfo;
+
+ bool m_bMasterReady; // Set to true when the master is ready to go.
+ bool m_bMasterFinished;
+ WUIndexType m_nWorkUnits;
+ char m_cPacketID;
+};
+
+
+// Called to write the packet ID, the subpacket ID, and the ID of which DistributeWork() call we're at.
+void PrepareDistributeWorkHeader( MessageBuffer *pBuf, unsigned char cSubpacketID );
+
+// Called from threads on the master to process a completed work unit.
+void NotifyLocalMasterCompletedWorkUnit( WUIndexType iWorkUnit );
+void CheckLocalMasterCompletedWorkUnits();
+
+
+// Creation functions for different distributors.
+IWorkUnitDistributorMaster* CreateWUDistributor_DefaultMaster();
+IWorkUnitDistributorWorker* CreateWUDistributor_DefaultWorker();
+
+IWorkUnitDistributorMaster* CreateWUDistributor_SDKMaster();
+IWorkUnitDistributorWorker* CreateWUDistributor_SDKWorker();
+
+
+#endif // VMPI_DISTRIBUTE_WORK_INTERNAL_H