summaryrefslogtreecommitdiff
path: root/vstdlib/jobthread.cpp
diff options
context:
space:
mode:
authorFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
committerFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
commit3bf9df6b2785fa6d951086978a3e66f49427166a (patch)
tree2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /vstdlib/jobthread.cpp
downloadarchived-source-engine-2018-hl2-src-master.tar.xz
archived-source-engine-2018-hl2-src-master.zip
Diffstat (limited to 'vstdlib/jobthread.cpp')
-rw-r--r--vstdlib/jobthread.cpp1457
1 files changed, 1457 insertions, 0 deletions
diff --git a/vstdlib/jobthread.cpp b/vstdlib/jobthread.cpp
new file mode 100644
index 0000000..46d843e
--- /dev/null
+++ b/vstdlib/jobthread.cpp
@@ -0,0 +1,1457 @@
+//========= Copyright Valve Corporation, All rights reserved. ============//
+//
+// Purpose: Thread pool implementation (CThreadPool, CJobThread, CJobQueue) and its self-tests.
+//
+//=============================================================================
+
+#if defined( _WIN32 ) && !defined( _X360 )
+#define WIN32_LEAN_AND_MEAN
+#include <windows.h>
+#endif
+#include "tier0/dbg.h"
+#include "tier0/tslist.h"
+#include "tier0/icommandline.h"
+#include "vstdlib/jobthread.h"
+#include "vstdlib/random.h"
+#include "tier1/functors.h"
+#include "tier1/fmtstr.h"
+#include "tier1/utlvector.h"
+#include "tier1/generichash.h"
+#include "tier0/vprof.h"
+
+#if defined( _X360 )
+#include "xbox/xbox_win32stubs.h"
+#endif
+
+#include "tier0/memdbgon.h"
+
+
+class CJobThread;
+
+//-----------------------------------------------------------------------------
+
+// Executes pJob on the calling thread if no one else has claimed it, then
+// drops the reference the caller held on the job. iThread is recorded on the
+// job as its service thread (-1 when not run by a pool worker).
+inline void ServiceJobAndRelease( CJob *pJob, int iThread = -1 )
+{
+    // The lock attempt only fails when another thread is already inside
+    // Execute() or Abort(); in that case there is nothing left to service.
+    const bool bClaimed = !pJob->IsFinished() && pJob->TryLock();
+    if ( bClaimed )
+    {
+        pJob->SetServiceThread( iThread );
+        pJob->Execute();
+        pJob->Unlock();
+    }
+
+    // The reference is released whether or not the job ran here.
+    pJob->Release();
+}
+
+//-----------------------------------------------------------------------------
+
+// Prioritized job queue: one CTSQueue per priority level plus an item count
+// (guarded by m_mutex) that drives a manual-reset "job available" event.
+// Push() takes a reference on every job it queues; whoever pops the job
+// inherits that reference and is responsible for releasing it.
+class ALIGN16 CJobQueue
+{
+public:
+    CJobQueue() :
+        m_nItems( 0 ),
+        m_nMaxItems( INT_MAX )
+    {
+        for ( int i = 0; i < ARRAYSIZE( m_pQueues ); i++ )
+        {
+            m_pQueues[i] = new CTSQueue<CJob *>;
+        }
+    }
+
+    ~CJobQueue()
+    {
+        for ( int i = 0; i < ARRAYSIZE( m_pQueues ); i++ )
+        {
+            delete m_pQueues[i];
+        }
+    }
+
+    // Total number of queued jobs across all priorities.
+    int Count()
+    {
+        return m_nItems;
+    }
+
+    // Number of queued jobs at one priority level.
+    int Count( JobPriority_t priority )
+    {
+        return m_pQueues[priority]->Count();
+    }
+
+    // If the queue is at capacity, pops one job to make room and returns it
+    // (the caller must service and release it). Returns NULL otherwise.
+    CJob *PrePush()
+    {
+        if ( m_nItems >= m_nMaxItems )
+        {
+            CJob *pOverflowJob;
+            if ( Pop( &pOverflowJob ) )
+            {
+                return pOverflowJob;
+            }
+        }
+        return NULL;
+    }
+
+    // Queues pJob at its current priority, taking a reference on it.
+    // Returns the number of already-queued jobs that had to be executed
+    // inline to make room (always 0 while the capacity is INT_MAX).
+    // iThread is currently unused.
+    int Push( CJob *pJob, int iThread = -1 )
+    {
+        pJob->AddRef();
+
+        CJob *pOverflowJob;
+        int nOverflow = 0;
+        while ( ( pOverflowJob = PrePush() ) != NULL )
+        {
+            // Service the job that was popped to make room -- not the job
+            // being pushed, which must stay referenced until it is popped.
+            ServiceJobAndRelease( pOverflowJob );
+            nOverflow++;
+        }
+
+        m_pQueues[pJob->GetPriority()]->PushItem( pJob );
+
+        // The item is pushed before the count is raised, so a popper that
+        // reserved an item via the count can expect to find one.
+        m_mutex.Lock();
+        if ( ++m_nItems == 1 )
+        {
+            m_JobAvailableEvent.Set();
+        }
+        m_mutex.Unlock();
+
+        return nOverflow;
+    }
+
+    // Removes the highest-priority queued job. Returns false (and sets
+    // *ppJob to NULL) when the queue is empty.
+    bool Pop( CJob **ppJob )
+    {
+        // Reserve an item by decrementing the count under the mutex...
+        m_mutex.Lock();
+        if ( !m_nItems )
+        {
+            m_mutex.Unlock();
+            *ppJob = NULL;
+            return false;
+        }
+        if ( --m_nItems == 0 )
+        {
+            m_JobAvailableEvent.Reset();
+        }
+        m_mutex.Unlock();
+
+        // ...then take it from the first non-empty priority level, highest first.
+        for ( int i = JP_HIGH; i >= 0; --i )
+        {
+            if ( m_pQueues[i]->PopItem( ppJob ) )
+            {
+                return true;
+            }
+        }
+
+        AssertMsg( 0, "Expected at least one queue item" );
+        *ppJob = NULL;
+        return false;
+    }
+
+    // Event that is set while the queue holds at least one job.
+    CThreadEvent &GetEventHandle()
+    {
+        return m_JobAvailableEvent;
+    }
+
+    // Aborts and releases every queued job and resets the count and event.
+    void Flush()
+    {
+        // Only safe to call when system is suspended
+        m_mutex.Lock();
+        m_nItems = 0;
+        m_JobAvailableEvent.Reset();
+        CJob *pJob;
+        for ( int i = JP_HIGH; i >= 0; --i )
+        {
+            while ( m_pQueues[i]->PopItem( &pJob ) )
+            {
+                pJob->Abort();
+                pJob->Release();
+            }
+        }
+        m_mutex.Unlock();
+    }
+
+private:
+    CTSQueue<CJob *>    *m_pQueues[JP_HIGH + 1];    // one queue per priority, indexed by JobPriority_t
+    int                 m_nItems;                   // total queued jobs; guarded by m_mutex
+    int                 m_nMaxItems;                // capacity before Push() starts servicing overflow inline
+    CThreadMutex        m_mutex;
+    CThreadManualEvent  m_JobAvailableEvent;        // set while m_nItems > 0
+
+} ALIGN16_POST;
+
+//-----------------------------------------------------------------------------
+//
+// CThreadPool
+//
+//-----------------------------------------------------------------------------
+
+// IThreadPool implementation. Owns a set of CJobThread workers, one shared
+// CJobQueue that any worker may pull from, and (through each worker) one
+// direct queue per thread. m_nJobs counts queued jobs; m_nIdleThreads counts
+// workers currently waiting for work.
+class CThreadPool : public CRefCounted1<IThreadPool, CRefCountServiceMT>
+{
+public:
+ CThreadPool();
+ ~CThreadPool();
+
+ //-----------------------------------------------------
+ // Thread functions
+ //-----------------------------------------------------
+ bool Start( const ThreadPoolStartParams_t &startParams = ThreadPoolStartParams_t() ) { return Start( startParams, NULL ); }
+ bool Start( const ThreadPoolStartParams_t &startParams, const char *pszNameOverride );
+ bool Stop( int timeout = TT_INFINITE );
+ void Distribute( bool bDistribute = true, int *pAffinityTable = NULL );
+
+ //-----------------------------------------------------
+ // Functions for any thread
+ //-----------------------------------------------------
+ unsigned GetJobCount() { return m_nJobs; }
+ int NumThreads();
+ int NumIdleThreads();
+
+ //-----------------------------------------------------
+ // Pause/resume processing jobs
+ //-----------------------------------------------------
+ int SuspendExecution();
+ int ResumeExecution();
+
+ //-----------------------------------------------------
+ // Offer the current thread to the pool
+ //-----------------------------------------------------
+ virtual int YieldWait( CThreadEvent **pEvents, int nEvents, bool bWaitAll = true, unsigned timeout = TT_INFINITE );
+ virtual int YieldWait( CJob **, int nJobs, bool bWaitAll = true, unsigned timeout = TT_INFINITE );
+ void Yield( unsigned timeout );
+
+ //-----------------------------------------------------
+ // Add a native job to the queue (master thread)
+ //-----------------------------------------------------
+ void AddJob( CJob * );
+ void InsertJobInQueue( CJob * );
+
+ //-----------------------------------------------------
+ // All threads execute pFunctor asap. Thread will either wake up
+ // and execute or execute pFunctor right after completing current job and
+ // before looking for another job.
+ //-----------------------------------------------------
+ void ExecuteHighPriorityFunctor( CFunctor *pFunctor );
+
+ //-----------------------------------------------------
+ // Add an function object to the queue (master thread)
+ //-----------------------------------------------------
+ void AddFunctorInternal( CFunctor *, CJob ** = NULL, const char *pszDescription = NULL, unsigned flags = 0 );
+
+ //-----------------------------------------------------
+ // Change the priority of a queued job (master thread).
+ // Only raising the priority is implemented.
+ //-----------------------------------------------------
+ virtual void ChangePriority( CJob *p, JobPriority_t priority );
+
+ //-----------------------------------------------------
+ // Bulk job manipulation (blocking)
+ //-----------------------------------------------------
+ int ExecuteToPriority( JobPriority_t toPriority, JobFilter_t pfnFilter = NULL );
+ int AbortAll();
+
+ virtual void Reserved1() {}
+
+ void WaitForIdle( bool bAll = true );
+
+private:
+ // Default worker stack sizes, chosen by Start() when the caller passes a
+ // negative stack size.
+ enum
+ {
+ IO_STACKSIZE = ( 64 * 1024 ),
+ COMPUTATION_STACKSIZE = 0,
+ };
+
+ //-----------------------------------------------------
+ //
+ //-----------------------------------------------------
+ CJob *PeekJob();
+ CJob *GetDummyJob();
+
+ //-----------------------------------------------------
+ // Thread functions
+ //-----------------------------------------------------
+ int Run();
+
+private:
+ friend class CJobThread;
+
+ CJobQueue m_SharedQueue; // jobs any worker (or a yielding caller) may take
+ CInterlockedInt m_nIdleThreads; // workers currently waiting for work
+ CUtlVector<CJobThread *> m_Threads; // worker threads, indexed by thread number
+ CUtlVector<CThreadEvent *> m_IdleEvents; // m_IdleEvents[i] is worker i's idle event
+
+ CThreadMutex m_SuspendMutex; // guards m_nSuspend
+ int m_nSuspend; // nesting depth of SuspendExecution() calls
+ CInterlockedInt m_nJobs; // number of queued jobs
+
+ // Some jobs should only be executed on the threadpool thread(s). Ie: the rendering thread has the GL context
+ // and the main thread coming in and "helping" with jobs breaks that pretty nicely. This flag states that
+ // only the threadpool threads should execute these jobs.
+ bool m_bExecOnThreadPoolThreadsOnly;
+};
+
+//-----------------------------------------------------------------------------
+
+// Factory: allocates a new thread pool. The pool has no worker threads until
+// Start() is called on it.
+JOB_INTERFACE IThreadPool *CreateThreadPool()
+{
+    IThreadPool *pNewPool = new CThreadPool;
+    return pNewPool;
+}
+
+// Deletes a pool previously returned by CreateThreadPool(). A NULL pointer
+// is accepted and ignored.
+JOB_INTERFACE void DestroyThreadPool( IThreadPool *pPool )
+{
+    if ( pPool != NULL )
+    {
+        delete pPool;
+    }
+}
+
+//-----------------------------------------------------------------------------
+
+// The global pool type. Its thread count can be overridden with "-threads"
+// on the command line, and it refuses to be destroyed through refcounting.
+class CGlobalThreadPool : public CThreadPool
+{
+public:
+    virtual bool Start( const ThreadPoolStartParams_t &startParamsIn )
+    {
+        // The worker count is the "-threads" value minus one. With the switch
+        // absent the -1 default yields a negative count, which selects the
+        // capped automatic sizing below.
+        const int nRequestedWorkers = CommandLine()->ParmValue( "-threads", -1 ) - 1;
+
+        ThreadPoolStartParams_t adjustedParams = startParamsIn;
+        if ( nRequestedWorkers < 0 )
+        {
+            // Cap the GlobPool threads at 4.
+            adjustedParams.nThreadsMax = 4;
+        }
+        else
+        {
+            adjustedParams.nThreads = nRequestedWorkers;
+        }
+
+        return CThreadPool::Start( adjustedParams, "Glob" );
+    }
+
+    // Returning false after asserting: the global pool object must never be
+    // released down to zero references.
+    virtual bool OnFinalRelease()
+    {
+        AssertMsg( 0, "Releasing global thread pool object!" );
+        return false;
+    }
+};
+
+//-----------------------------------------------------------------------------
+
+// One worker thread of a CThreadPool. It services jobs from its own direct
+// queue first and from the owner's shared queue second, and answers control
+// calls (exit / suspend / run functor) sent by the pool through CallWorker().
+class CJobThread : public CWorkerThread
+{
+public:
+ CJobThread( CThreadPool *pOwner, int iThread ) :
+ m_SharedQueue( pOwner->m_SharedQueue ),
+ m_pOwner( pOwner ),
+ m_iThread( iThread )
+ {
+ }
+
+ // Event that is set while this worker has no job in hand.
+ CThreadEvent &GetIdleEvent()
+ {
+ return m_IdleEvent;
+ }
+
+ // Queue of jobs that only this worker services.
+ CJobQueue &AccessDirectQueue()
+ {
+ return m_DirectQueue;
+ }
+
+private:
+ // Blocks until a control call arrives or one of the two job queues
+ // signals that it has work, and returns the wait result.
+ unsigned Wait()
+ {
+ unsigned waitResult;
+ tmZone( TELEMETRY_LEVEL0, TMZF_IDLE, "%s", __FUNCTION__ );
+#ifdef WIN32
+ enum Event_t
+ {
+ CALL_FROM_MASTER,
+ SHARED_QUEUE,
+ DIRECT_QUEUE,
+
+ NUM_EVENTS
+ };
+
+ HANDLE waitHandles[NUM_EVENTS];
+
+ waitHandles[CALL_FROM_MASTER] = GetCallHandle().GetHandle();
+ waitHandles[SHARED_QUEUE] = m_SharedQueue.GetEventHandle().GetHandle();
+ waitHandles[DIRECT_QUEUE] = m_DirectQueue.GetEventHandle().GetHandle();
+
+#ifdef _DEBUG
+ // Debug builds wake every 10 ms so a breakpoint can be placed in the loop body.
+ while ( ( waitResult = WaitForMultipleObjects( ARRAYSIZE(waitHandles), waitHandles, FALSE, 10 ) ) == WAIT_TIMEOUT )
+ {
+ waitResult = waitResult; // break here
+ }
+#else
+ waitResult = WaitForMultipleObjects( ARRAYSIZE(waitHandles), waitHandles, FALSE, INFINITE );
+#endif
+#else // !win32
+ // No multi-object wait here: poll the three events in turn until one is set.
+ bool bSet = false;
+ int nWaitTime = 100;
+
+ while( !bSet )
+ {
+ // Jobs are typically enqueued to the shared job queue so wait on it first.
+ bSet = m_SharedQueue.GetEventHandle().Wait( nWaitTime );
+ if( !bSet )
+ bSet = m_DirectQueue.GetEventHandle().Wait( 10 );
+ if ( !bSet )
+ bSet = GetCallHandle().Wait( 0 );
+ }
+
+ // The loop above only exits with bSet == true, so the WAIT_TIMEOUT arm is never taken.
+ if ( !bSet )
+ waitResult = WAIT_TIMEOUT;
+ else
+ waitResult = WAIT_OBJECT_0;
+#endif
+ return waitResult;
+ }
+
+ // Thread body: loops until told to exit (TPM_EXIT) or the wait fails.
+ // Keeps the owner's idle-thread count and this thread's idle event in
+ // step with whether a job is currently being serviced. Returns 0.
+ int Run()
+ {
+
+
+ // Wait for either a call from the master thread, or an item in the queue...
+ unsigned waitResult;
+ bool bExit = false;
+
+ tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s", __FUNCTION__ );
+
+ // The thread starts out idle; CThreadPool::Start() waits on this event.
+ m_pOwner->m_nIdleThreads++;
+ m_IdleEvent.Set();
+ while (!bExit && ( ( waitResult = Wait() ) != WAIT_FAILED ) )
+ {
+ // Control calls take precedence over queued jobs.
+ if ( PeekCall() )
+ {
+ CFunctor *pFunctor = NULL;
+ tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s PeekCall():%d", __FUNCTION__, GetCallParam() );
+
+ switch ( GetCallParam( &pFunctor ) )
+ {
+ case TPM_EXIT:
+ Reply( true );
+ bExit = TRUE;
+ break;
+
+ case TPM_SUSPEND:
+ // Reply first, then suspend; the caller separately waits for the suspend to take effect.
+ Reply( true );
+ SuspendCooperative();
+ break;
+
+ case TPM_RUNFUNCTOR:
+ if( pFunctor )
+ {
+ ( *pFunctor )();
+ Reply( true );
+ }
+ else
+ {
+ Assert( pFunctor );
+ Reply( false );
+ }
+ break;
+
+ default:
+ AssertMsg( 0, "Unknown call to thread" );
+ Reply( false );
+ break;
+ }
+ }
+ else
+ {
+ tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s !PeekCall()", __FUNCTION__ );
+
+ // Drain jobs until both queues are empty or a control call is pending.
+ CJob *pJob;
+ bool bTookJob = false;
+ do
+ {
+ // Direct queue first, shared queue second.
+ if ( !m_DirectQueue.Pop( &pJob) )
+ {
+ if ( !m_SharedQueue.Pop( &pJob ) )
+ {
+ // Nothing to process, return to wait state
+ break;
+ }
+ }
+ // Leave the idle state once, on the first job of this batch.
+ if ( !bTookJob )
+ {
+ m_IdleEvent.Reset();
+ m_pOwner->m_nIdleThreads--;
+ bTookJob = true;
+ }
+ ServiceJobAndRelease( pJob, m_iThread );
+ m_pOwner->m_nJobs--;
+ } while ( !PeekCall() );
+
+ if ( bTookJob )
+ {
+ m_pOwner->m_nIdleThreads++;
+ m_IdleEvent.Set();
+ }
+ }
+ }
+ // An exiting thread no longer counts as idle.
+ m_pOwner->m_nIdleThreads--;
+ m_IdleEvent.Reset();
+ return 0;
+ }
+
+ CJobQueue m_DirectQueue; // jobs targeted at this thread only
+ CJobQueue & m_SharedQueue; // the owner's shared queue
+ CThreadPool * m_pOwner;
+ CThreadManualEvent m_IdleEvent; // set while no job is being serviced
+ int m_iThread; // this worker's index, recorded on jobs it services
+};
+
+//-----------------------------------------------------------------------------
+
+// The global thread pool instance and the interface pointer that refers to it.
+CGlobalThreadPool g_ThreadPool;
+IThreadPool *g_pThreadPool = &g_ThreadPool;
+
+//-----------------------------------------------------------------------------
+//
+// CThreadPool
+//
+//-----------------------------------------------------------------------------
+
+// Constructs a pool with no worker threads; Start() creates them.
+// Members are initialized in declaration order. m_bExecOnThreadPoolThreadsOnly
+// is given a defined value here because AddJob() and YieldWait() read it and
+// it is otherwise only assigned by Start().
+CThreadPool::CThreadPool() :
+    m_nIdleThreads( 0 ),
+    m_nSuspend( 0 ),
+    m_nJobs( 0 ),
+    m_bExecOnThreadPoolThreadsOnly( false )
+{
+}
+
+//---------------------------------------------------------
+
+// Shuts the worker threads down before the pool's members are destroyed.
+CThreadPool::~CThreadPool()
+{
+    Stop( TT_INFINITE );
+}
+
+//---------------------------------------------------------
+//
+//---------------------------------------------------------
+// Number of worker threads currently owned by the pool.
+int CThreadPool::NumThreads()
+{
+    const int nWorkers = m_Threads.Count();
+    return nWorkers;
+}
+
+//---------------------------------------------------------
+//
+//---------------------------------------------------------
+// Snapshot of how many workers are currently waiting for work.
+int CThreadPool::NumIdleThreads()
+{
+    const int nIdle = m_nIdleThreads;
+    return nIdle;
+}
+
+// Asks every worker to run pFunctor, then blocks until each one has replied.
+void CThreadPool::ExecuteHighPriorityFunctor( CFunctor *pFunctor )
+{
+    // Post the request to all workers first so they can run it concurrently...
+    for ( int iWorker = 0; iWorker < m_Threads.Count(); iWorker++ )
+    {
+        m_Threads[iWorker]->CallWorker( TPM_RUNFUNCTOR, 0, false, pFunctor );
+    }
+
+    // ...and only then collect the replies.
+    for ( int iWorker = 0; iWorker < m_Threads.Count(); iWorker++ )
+    {
+        m_Threads[iWorker]->WaitForReply();
+    }
+}
+
+//---------------------------------------------------------
+// Pause/resume processing jobs
+//---------------------------------------------------------
+// Suspends all workers. Calls nest: only the first one actually suspends the
+// threads. Returns the suspend depth from before this call.
+int CThreadPool::SuspendExecution()
+{
+    AUTO_LOCK( m_SuspendMutex );
+
+    // Already suspended: just deepen the nesting.
+    if ( m_nSuspend != 0 )
+    {
+        return m_nSuspend++;
+    }
+
+    for ( int iWorker = 0; iWorker < m_Threads.Count(); iWorker++ )
+    {
+        m_Threads[iWorker]->CallWorker( TPM_SUSPEND, 0 );
+    }
+
+    for ( int iWorker = 0; iWorker < m_Threads.Count(); iWorker++ )
+    {
+        m_Threads[iWorker]->WaitForReply();
+    }
+
+    // A worker replies before it suspends itself, so a reply alone does not
+    // mean the thread has stopped; wait for the suspension to take effect.
+    for ( int iWorker = 0; iWorker < m_Threads.Count(); iWorker++ )
+    {
+        m_Threads[iWorker]->BWaitForThreadSuspendCooperative();
+    }
+
+    return m_nSuspend++;
+}
+
+//---------------------------------------------------------
+
+// Undoes one SuspendExecution(). The workers are resumed when the nesting
+// depth returns to zero. Returns the suspend depth from before this call.
+int CThreadPool::ResumeExecution()
+{
+    AUTO_LOCK( m_SuspendMutex );
+    AssertMsg( m_nSuspend >= 1, "Attempted resume when not suspended");
+
+    const int nDepthBefore = m_nSuspend;
+    --m_nSuspend;
+
+    if ( m_nSuspend != 0 )
+    {
+        return nDepthBefore;
+    }
+
+    for ( int iWorker = 0; iWorker < m_Threads.Count(); iWorker++ )
+    {
+        m_Threads[iWorker]->ResumeCooperative();
+    }
+    return nDepthBefore;
+}
+
+//---------------------------------------------------------
+
+// Waits on the workers' idle events -- all of them when bAll is true -- with a
+// timeout value of 60000. The wait result is discarded.
+void CThreadPool::WaitForIdle( bool bAll )
+{
+    const int nIdleEvents = m_IdleEvents.Count();
+    CThreadEvent **ppIdleEvents = m_IdleEvents.Base();
+    ThreadWaitForEvents( nIdleEvents, ppIdleEvents, bAll, 60000 );
+}
+
+//---------------------------------------------------------
+
+// Waits for the given events while lending the calling thread to the pool:
+// as long as the events are not signaled and the shared queue has work, jobs
+// are executed on this thread. Returns the result of the final wait.
+// Only timeout == TT_INFINITE is supported.
+int CThreadPool::YieldWait( CThreadEvent **pEvents, int nEvents, bool bWaitAll, unsigned timeout )
+{
+    tmZone( TELEMETRY_LEVEL0, TMZF_IDLE, "%s(%d) SPINNING %t", __FUNCTION__, timeout, tmSendCallStack( TELEMETRY_LEVEL0, 0 ) );
+
+    Assert( timeout == TT_INFINITE ); // unimplemented
+
+    // Always wait for zero milliseconds initially, to let us process jobs on this thread.
+    timeout = 0;
+
+    for ( ;; )
+    {
+        const int nWaitResult = ThreadWaitForEvents( nEvents, pEvents, bWaitAll, timeout );
+        if ( nWaitResult != WAIT_TIMEOUT )
+        {
+            return nWaitResult;
+        }
+
+        CJob *pBorrowedJob;
+        if ( m_bExecOnThreadPoolThreadsOnly || !m_SharedQueue.Pop( &pBorrowedJob ) )
+        {
+            // Since there are no jobs for the main thread set the timeout to infinite.
+            // The only disadvantage to this is that if a job thread creates a new job
+            // then the main thread will not be available to pick it up, but if that
+            // is a problem you can just create more worker threads. Debugging test runs
+            // of TF2 suggests that jobs are only ever added from the main thread which
+            // means that there is no disadvantage.
+            // Waiting on the events instead of busy spinning has multiple advantages.
+            // It avoids wasting CPU time/electricity, it makes it more obvious in profiles
+            // when the main thread is idle versus busy, and it allows ready thread analysis
+            // in xperf to find out what woke up a waiting thread.
+            // It also avoids unnecessary CPU starvation -- seen on customer traces of TF2.
+            timeout = TT_INFINITE;
+            continue;
+        }
+
+        ServiceJobAndRelease( pBorrowedJob );
+        m_nJobs--;
+    }
+}
+
+//---------------------------------------------------------
+
+// Job-based variant of YieldWait: collects each job's event and forwards to
+// the event-based overload. Returns TW_FAILED when nJobs exceeds the fixed
+// event buffer's allocated size minus two.
+int CThreadPool::YieldWait( CJob **ppJobs, int nJobs, bool bWaitAll, unsigned timeout )
+{
+    CUtlVectorFixed<CThreadEvent *, 64> jobEvents;
+
+    if ( nJobs <= jobEvents.NumAllocated() - 2 )
+    {
+        for ( int iJob = 0; iJob < nJobs; iJob++ )
+        {
+            jobEvents.AddToTail( ppJobs[iJob]->AccessEvent() );
+        }
+
+        return YieldWait( jobEvents.Base(), jobEvents.Count(), bWaitAll, timeout);
+    }
+
+    return TW_FAILED;
+}
+
+//---------------------------------------------------------
+
+// Sleeps the calling thread for 'timeout'. Intended to be called from the
+// main thread (asserted), but the behavior is the same on any thread.
+void CThreadPool::Yield( unsigned timeout )
+{
+    // @MULTICORE (toml 10/24/2006): not implemented
+    Assert( ThreadInMainThread() );
+
+    // Both the main-thread and non-main-thread paths just sleep, so no
+    // branch on the calling thread is needed.
+    ThreadSleep( timeout );
+}
+
+//---------------------------------------------------------
+// Add a job to the queue
+//---------------------------------------------------------
+
+// Submits a job to the pool. Depending on the pool state and the job's flags
+// the job is either executed immediately on the calling thread or queued for
+// a worker. NULL jobs and jobs already owned by a pool are rejected.
+void CThreadPool::AddJob( CJob *pJob )
+{
+    if ( pJob == NULL )
+    {
+        return;
+    }
+
+    if ( pJob->m_ThreadPoolData != JOB_NO_DATA )
+    {
+        Warning( "Cannot add a thread job already committed to another thread pool\n" );
+        return;
+    }
+
+    // No workers at all: run the job right here.
+    if ( m_Threads.Count() == 0 )
+    {
+        // So only threadpool jobs are supposed to execute the jobs, but there are no threadpool threads?
+        Assert( !m_bExecOnThreadPoolThreadsOnly );
+
+        pJob->Execute();
+        return;
+    }
+
+    // Jobs that are neither IO nor explicitly queued may run inline when the
+    // caller is allowed to help out. /* @TBD && !m_queue.Count() */
+    const int jobFlags = pJob->GetFlags();
+    const bool bMayRunInline = !m_bExecOnThreadPoolThreadsOnly && ( ( jobFlags & ( JF_IO | JF_QUEUE ) ) == 0 );
+    if ( bMayRunInline )
+    {
+        if ( NumIdleThreads() == 0 )
+        {
+            // Every worker is busy, so doing it ourselves is the fastest route.
+            pJob->Execute();
+            return;
+        }
+        pJob->SetPriority( JP_HIGH );
+    }
+
+    if ( !pJob->CanExecute() )
+    {
+        // Already handled
+        ExecuteOnce( Warning( "Attempted to add job to job queue that has already been completed\n" ) );
+        return;
+    }
+
+    pJob->m_pThreadPool = this;
+    pJob->m_status = JOB_STATUS_PENDING;
+    InsertJobInQueue( pJob );
+    ++m_nJobs;
+}
+
+//---------------------------------------------------------
+//
+//---------------------------------------------------------
+
+// Chooses the queue for a job and pushes it there:
+//   - JF_SERIAL jobs always go to worker 0's direct queue,
+//   - jobs with a valid service thread go to that worker's direct queue,
+//   - everything else goes to the shared queue.
+// Any jobs Push() had to execute inline to make room are deducted from the
+// job count.
+void CThreadPool::InsertJobInQueue( CJob *pJob )
+{
+    CJobQueue *pTargetQueue = &m_SharedQueue;
+
+    if ( pJob->GetFlags() & JF_SERIAL )
+    {
+        pTargetQueue = &(m_Threads[0]->AccessDirectQueue());
+    }
+    else
+    {
+        const int iPreferredThread = pJob->GetServiceThread();
+        if ( iPreferredThread != -1 && m_Threads.IsValidIndex( iPreferredThread ) )
+        {
+            pTargetQueue = &(m_Threads[iPreferredThread]->AccessDirectQueue());
+        }
+    }
+
+    m_nJobs -= pTargetQueue->Push( pJob );
+}
+
+//---------------------------------------------------------
+// Add an function object to the queue (master thread)
+//---------------------------------------------------------
+
+// Wraps pFunctor in a CFunctorJob, applies 'flags' and submits it. When ppJob
+// is supplied the caller receives the job pointer together with the creation
+// reference; otherwise that reference is dropped here.
+void CThreadPool::AddFunctorInternal( CFunctor *pFunctor, CJob **ppJob, const char *pszDescription, unsigned flags )
+{
+    // Note: assumes caller has handled refcount
+    CJob *pFunctorJob = new CFunctorJob( pFunctor, pszDescription );
+    pFunctorJob->SetFlags( flags );
+
+    AddJob( pFunctorJob );
+
+    if ( !ppJob )
+    {
+        pFunctorJob->Release();
+        return;
+    }
+
+    *ppJob = pFunctorJob;
+}
+
+//---------------------------------------------------------
+// Change the priority of a queued job (only raising it is supported)
+//---------------------------------------------------------
+
+// Raises the priority of a job by re-queuing it on the shared queue at the
+// new priority. Lowering the priority is not implemented; such requests only
+// produce a one-time developer message.
+void CThreadPool::ChangePriority( CJob *pJob, JobPriority_t priority )
+{
+    // Right now, only support upping the priority
+    if ( pJob->GetPriority() < priority )
+    {
+        pJob->SetPriority( priority );
+
+        // The job is pushed again rather than moved, so the queues now hold
+        // one more entry. Every popped entry decrements m_nJobs, so the extra
+        // entry has to be counted (and any overflow serviced by Push()
+        // deducted) to keep the job count from drifting.
+        m_nJobs -= m_SharedQueue.Push( pJob );
+        ++m_nJobs;
+    }
+    else
+    {
+        ExecuteOnce( if ( pJob->GetPriority() != priority ) DevMsg( "CThreadPool::RemoveJob not implemented right now" ) );
+    }
+}
+
+//---------------------------------------------------------
+// Execute to a specified priority
+//---------------------------------------------------------
+
+// With the workers suspended, executes on the calling thread every queued job
+// whose priority is iToPriority or higher, working from JP_HIGH downwards and
+// draining the per-thread direct queues before the shared queue at each level.
+// When pfnFilter is given, jobs it rejects are not executed: those that can
+// still execute are put back in their queue afterwards, the rest are dropped.
+// Returns the number of jobs serviced here.
+int CThreadPool::ExecuteToPriority( JobPriority_t iToPriority, JobFilter_t pfnFilter )
+{
+ SuspendExecution();
+
+ CJob *pJob;
+ int nExecuted = 0;
+ int i;
+ int nJobsTotal = GetJobCount();
+ CUtlVector<CJob *> jobsToPutBack;
+
+ for ( int iCurPriority = JP_HIGH; iCurPriority >= iToPriority; --iCurPriority )
+ {
+ for ( i = 0; i < m_Threads.Count(); i++ )
+ {
+ CJobQueue &queue = m_Threads[i]->AccessDirectQueue();
+ // Pop() returns the highest-priority job; higher levels were emptied by
+ // earlier iterations, so this loop runs while the current level has jobs.
+ while ( queue.Count( (JobPriority_t)iCurPriority ) )
+ {
+ queue.Pop( &pJob );
+ if ( pfnFilter && !(*pfnFilter)( pJob ) )
+ {
+ if ( pJob->CanExecute() )
+ {
+ // Keep the popped reference; it is released after re-insertion below.
+ jobsToPutBack.EnsureCapacity( nJobsTotal );
+ jobsToPutBack.AddToTail( pJob );
+ }
+ else
+ {
+ m_nJobs--;
+ pJob->Release(); // an already serviced job in queue, may as well ditch it (as in, main thread probably force executed)
+ }
+ continue;
+ }
+ ServiceJobAndRelease( pJob );
+ m_nJobs--;
+ nExecuted++;
+ }
+
+ }
+
+ // Same processing for the shared queue at this priority level.
+ while ( m_SharedQueue.Count( (JobPriority_t)iCurPriority ) )
+ {
+ m_SharedQueue.Pop( &pJob );
+ if ( pfnFilter && !(*pfnFilter)( pJob ) )
+ {
+ if ( pJob->CanExecute() )
+ {
+ jobsToPutBack.EnsureCapacity( nJobsTotal );
+ jobsToPutBack.AddToTail( pJob );
+ }
+ else
+ {
+ m_nJobs--;
+ pJob->Release(); // see above
+ }
+ continue;
+ }
+
+ ServiceJobAndRelease( pJob );
+ m_nJobs--;
+ nExecuted++;
+ }
+ }
+
+ // Re-queue the filtered-out jobs. Push() takes its own reference, so the
+ // reference obtained from the earlier Pop() is released afterwards.
+ for ( i = 0; i < jobsToPutBack.Count(); i++ )
+ {
+ InsertJobInQueue( jobsToPutBack[i] );
+ jobsToPutBack[i]->Release();
+ }
+
+ ResumeExecution();
+
+ return nExecuted;
+}
+
+//---------------------------------------------------------
+//
+//---------------------------------------------------------
+
+// With the workers suspended, aborts and releases every queued job -- shared
+// queue first, then each worker's direct queue -- and zeroes the job count.
+// Returns how many jobs were aborted.
+int CThreadPool::AbortAll()
+{
+    SuspendExecution();
+
+    int nAborted = 0;
+    CJob *pDoomedJob;
+
+    for ( ; m_SharedQueue.Pop( &pDoomedJob ); ++nAborted )
+    {
+        pDoomedJob->Abort();
+        pDoomedJob->Release();
+    }
+
+    for ( int iWorker = 0; iWorker < m_Threads.Count(); iWorker++ )
+    {
+        CJobQueue &directQueue = m_Threads[iWorker]->AccessDirectQueue();
+        for ( ; directQueue.Pop( &pDoomedJob ); ++nAborted )
+        {
+            pDoomedJob->Abort();
+            pDoomedJob->Release();
+        }
+    }
+
+    m_nJobs = 0;
+
+    ResumeExecution();
+
+    return nAborted;
+}
+
+//---------------------------------------------------------
+// CThreadPool thread functions
+//---------------------------------------------------------
+
+// Creates and starts the worker threads described by startParams.
+//   pszName - prefix for the thread names; when NULL a default is chosen
+//             from startParams.bIOThreads.
+// Negative/sentinel values in startParams select defaults computed below
+// (thread count, stack size, priority, distribution). Returns true, including
+// when the resolved thread count is zero and no threads are created.
+bool CThreadPool::Start( const ThreadPoolStartParams_t &startParams, const char *pszName )
+{
+ int nThreads = startParams.nThreads;
+
+ m_bExecOnThreadPoolThreadsOnly = startParams.bExecOnThreadPoolThreadsOnly;
+
+ // Negative thread count: derive one from the CPU configuration.
+ if ( nThreads < 0 )
+ {
+ const CPUInformation &ci = *GetCPUInformation();
+ if ( startParams.bIOThreads )
+ {
+ nThreads = ci.m_nLogicalProcessors;
+ }
+ else
+ {
+ nThreads = ( ci.m_nLogicalProcessors / (( ci.m_bHT ) ? 2 : 1) ) - 1; // One per
+ if ( IsPC() )
+ {
+ if ( nThreads > 3 )
+ {
+ DevMsg( "Defaulting to limit of 3 worker threads, use -threads on command line if want more\n" ); // Current >4 processor configs don't really work so well, probably due to cache issues? (toml 7/12/2007)
+ nThreads = 3;
+ }
+ }
+ }
+
+ // nThreadsMax only limits the automatically derived count.
+ if ( ( startParams.nThreadsMax >= 0 ) && ( nThreads > startParams.nThreadsMax ) )
+ {
+ nThreads = startParams.nThreadsMax;
+ }
+ }
+
+ if ( nThreads <= 0 )
+ {
+ return true;
+ }
+
+ int nStackSize = startParams.nStackSize;
+
+ if ( nStackSize < 0 )
+ {
+ if ( startParams.bIOThreads )
+ {
+ nStackSize = IO_STACKSIZE;
+ }
+ else
+ {
+ nStackSize = COMPUTATION_STACKSIZE;
+ }
+ }
+
+ int priority = startParams.iThreadPriority;
+
+ // SHRT_MIN is the "no priority specified" sentinel.
+ if ( priority == SHRT_MIN )
+ {
+ if ( startParams.bIOThreads )
+ {
+ priority = THREAD_PRIORITY_HIGHEST;
+ }
+ else
+ {
+ priority = ThreadGetPriority();
+ }
+ }
+
+ // Explicit setting wins; otherwise distribute unless these are IO threads.
+ bool bDistribute;
+ if ( startParams.fDistribute != TRS_NONE )
+ {
+ bDistribute = ( startParams.fDistribute == TRS_TRUE );
+ }
+ else
+ {
+ bDistribute = !startParams.bIOThreads;
+ }
+
+ //--------------------------------------------------------
+
+ m_Threads.EnsureCapacity( nThreads );
+ m_IdleEvents.EnsureCapacity( nThreads );
+
+ if ( !pszName )
+ {
+ pszName = ( startParams.bIOThreads ) ? "IOJobX" : "CmpJobX";
+ }
+ while ( nThreads-- )
+ {
+ int iThread = m_Threads.AddToTail();
+ m_IdleEvents.AddToTail();
+ m_Threads[iThread] = new CJobThread( this, iThread );
+ m_IdleEvents[iThread] = &m_Threads[iThread]->GetIdleEvent();
+ m_Threads[iThread]->SetName( CFmtStr( "%s%d", pszName, iThread ) );
+ m_Threads[iThread]->Start( nStackSize );
+ // Block until the new thread has set its idle event before creating the next one.
+ m_Threads[iThread]->GetIdleEvent().Wait();
+#ifdef WIN32
+ ThreadSetPriority( (ThreadHandle_t)m_Threads[iThread]->GetThreadHandle(), priority );
+#endif
+ }
+
+ Distribute( bDistribute, startParams.bUseAffinityTable ? (int *)startParams.iAffinityTable : NULL );
+
+ return true;
+}
+
+//---------------------------------------------------------
+
+// Assigns the worker threads to processors.
+//   bDistribute    - when true, spread the workers across logical processors
+//                    (only done if there is more than one); when false, give
+//                    every worker the process affinity mask (WIN32 only).
+//   pAffinityTable - optional per-thread affinity values, indexed by worker
+//                    index; when NULL the processors are cycled through.
+void CThreadPool::Distribute( bool bDistribute, int *pAffinityTable )
+{
+ if ( bDistribute )
+ {
+ const CPUInformation &ci = *GetCPUInformation();
+ // Step size through the logical processors: 2 when ci.m_bHT is set, else 1.
+ int nHwThreadsPer = (( ci.m_bHT ) ? 2 : 1);
+ if ( ci.m_nLogicalProcessors > 1 )
+ {
+ if ( !pAffinityTable )
+ {
+#if defined( IS_WINDOWS_PC )
+ // no affinity table, distribution is cycled across all available
+ // SetThreadIdealProcessor is looked up dynamically from kernel32.dll.
+ HINSTANCE hInst = LoadLibrary( "kernel32.dll" );
+ if ( hInst )
+ {
+ typedef DWORD (WINAPI *SetThreadIdealProcessorFn)(ThreadHandle_t hThread, DWORD dwIdealProcessor);
+ SetThreadIdealProcessorFn Thread_SetIdealProcessor = (SetThreadIdealProcessorFn)GetProcAddress( hInst, "SetThreadIdealProcessor" );
+ if ( Thread_SetIdealProcessor )
+ {
+ // The calling thread gets processor 0; workers start from the next step.
+ ThreadHandle_t hMainThread = ThreadGetCurrentHandle();
+ Thread_SetIdealProcessor( hMainThread, 0 );
+ int iProc = 0;
+ for ( int i = 0; i < m_Threads.Count(); i++ )
+ {
+ iProc += nHwThreadsPer;
+ if ( iProc >= ci.m_nLogicalProcessors )
+ {
+ iProc %= ci.m_nLogicalProcessors;
+ if ( nHwThreadsPer > 1 )
+ {
+ iProc = ( iProc + 1 ) % nHwThreadsPer;
+ }
+ }
+ Thread_SetIdealProcessor((ThreadHandle_t)m_Threads[i]->GetThreadHandle(), iProc);
+ }
+ }
+ FreeLibrary( hInst );
+ }
+#else
+ // no affinity table, distribution is cycled across all available
+ int iProc = 0;
+ for ( int i = 0; i < m_Threads.Count(); i++ )
+ {
+ iProc += nHwThreadsPer;
+ if ( iProc >= ci.m_nLogicalProcessors )
+ {
+ iProc %= ci.m_nLogicalProcessors;
+ if ( nHwThreadsPer > 1 )
+ {
+ iProc = ( iProc + 1 ) % nHwThreadsPer;
+ }
+ }
+ // Without WIN32 defined the computed processor index is not applied.
+#ifdef WIN32
+ ThreadSetAffinity( (ThreadHandle_t)m_Threads[i]->GetThreadHandle(), 1 << iProc );
+#endif
+ }
+#endif
+ }
+ else
+ {
+ // distribution is from affinity table
+ for ( int i = 0; i < m_Threads.Count(); i++ )
+ {
+#ifdef WIN32
+ ThreadSetAffinity( (ThreadHandle_t)m_Threads[i]->GetThreadHandle(), pAffinityTable[i] );
+#endif
+ }
+ }
+ }
+ }
+ else
+ {
+#ifdef WIN32
+ // Not distributing: let every worker run wherever the process may run.
+ DWORD_PTR dwProcessAffinity, dwSystemAffinity;
+ if ( GetProcessAffinityMask( GetCurrentProcess(), &dwProcessAffinity, &dwSystemAffinity ) )
+ {
+ for ( int i = 0; i < m_Threads.Count(); i++ )
+ {
+ ThreadSetAffinity( (ThreadHandle_t)m_Threads[i]->GetThreadHandle(), dwProcessAffinity );
+ }
+ }
+#endif
+ }
+}
+
+//---------------------------------------------------------
+
+// Tells every worker to exit, waits for each thread to die, destroys the
+// workers and aborts any jobs still queued. The timeout parameter is currently
+// not honored: the wait for each thread is unbounded. Always returns true.
+bool CThreadPool::Stop( int timeout )
+{
+    for ( int i = 0; i < m_Threads.Count(); i++ )
+    {
+        m_Threads[i]->CallWorker( TPM_EXIT );
+    }
+
+    for ( int i = 0; i < m_Threads.Count(); ++i )
+    {
+        while( m_Threads[i]->IsAlive() )
+        {
+            ThreadSleep( 0 );
+        }
+
+        // The thread is dead, so its direct queue can be flushed safely.
+        // Without this, jobs still waiting there would keep the reference
+        // taken by Push() forever and would never be aborted, unlike jobs
+        // left in the shared queue.
+        m_Threads[i]->AccessDirectQueue().Flush();
+
+        delete m_Threads[i];
+    }
+
+    m_nJobs = 0;
+    m_SharedQueue.Flush();
+    m_nIdleThreads = 0;
+    m_Threads.RemoveAll();
+    m_IdleEvents.RemoveAll();
+
+    return true;
+}
+
+//---------------------------------------------------------
+
+// Returns a shared job whose constructor has already called Execute() on it.
+// A reference is added on every call, so the caller owns one reference.
+CJob *CThreadPool::GetDummyJob()
+{
+    // Job type that does no work and executes itself on construction.
+    class CDummyJob : public CJob
+    {
+    public:
+        CDummyJob()
+        {
+            Execute();
+        }
+
+        virtual JobStatus_t DoExecute() { return JOB_OK; }
+    };
+
+    static CDummyJob s_dummyJob;
+
+    CDummyJob *pDummy = &s_dummyJob;
+    pDummy->AddRef();
+    return pDummy;
+}
+
+//-----------------------------------------------------------------------------
+
+
+// Self-tests for the thread pool. All tests run against g_pTestThreadPool,
+// which RunThreadPoolTests() points at a local pool before calling in here.
+namespace ThreadPoolTest
+{
+// Per-job sleep in milliseconds used by CCountJob; negative means "don't sleep".
+int g_iSleep;
+
+// Set by the CCountJob that observes the count reaching g_nTotalToComplete.
+CThreadEvent g_done;
+int g_nTotalToComplete;
+CThreadPool *g_pTestThreadPool;
+
+// Job that bumps a shared counter, optionally sleeps and optionally burns
+// some CPU hashing a random buffer.
+class CCountJob : public CJob
+{
+public:
+ virtual JobStatus_t DoExecute()
+ {
+ m_nCount++;
+ ThreadPause();
+ if ( g_iSleep >= 0)
+ ThreadSleep( g_iSleep );
+ if ( bDoWork )
+ {
+ byte pMemory[1024];
+ int i;
+ for ( i = 0; i < 1024; i++ )
+ {
+ pMemory[i] = rand();
+ }
+ // The result is discarded; the loop exists only to consume CPU time.
+ for ( i = 0; i < 50; i++ )
+ {
+ sqrt( (float)HashBlock( pMemory, 1024 ) + HashBlock( pMemory, 1024 ) + 10.0 );
+ }
+ bDoWork = false;
+ }
+ if ( m_nCount == g_nTotalToComplete )
+ g_done.Set();
+ return 0;
+ }
+
+ static CInterlockedInt m_nCount; // executions across all CCountJob instances
+ bool bDoWork; // set by the test before the job is added
+};
+CInterlockedInt CCountJob::m_nCount;
+// Snapshot of CCountJob::m_nCount taken when the timed section ends.
+int g_nTotalAtFinish;
+
+// Throughput test. For each combination of push/pop interleaving and sleep
+// value (-10, 0, 10) it starts pools of increasing size (1,3,5,7 threads, or
+// 1..6 on X360), queues 4000 CCountJobs and reports timings and how many jobs
+// each worker serviced.
+//   bDistribute    - passed to the pool as its distribute setting
+//   bSleep         - yield (ThreadSleep(0)) after every 16th AddJob
+//   bFinishExecute - wait for all jobs to complete before stopping the timer
+//                    (only when g_iSleep <= 1)
+//   bDoWork        - have each job do the hashing work
+void Test( bool bDistribute, bool bSleep = true, bool bFinishExecute = false, bool bDoWork = false )
+{
+ for ( int bInterleavePushPop = 0; bInterleavePushPop < 2; bInterleavePushPop++ )
+ {
+ for ( g_iSleep = -10; g_iSleep <= 10; g_iSleep += 10 )
+ {
+ Msg( "ThreadPoolTest: Testing! Sleep %d, interleave %d \n", g_iSleep, bInterleavePushPop );
+ int nMaxThreads = ( IsX360() ) ? 6 : 8;
+ int nIncrement = ( IsX360() ) ? 1 : 2;
+ for ( int i = 1; i <= nMaxThreads; i += nIncrement )
+ {
+ CCountJob::m_nCount = 0;
+ g_nTotalAtFinish = 0;
+ ThreadPoolStartParams_t params;
+ params.nThreads = i;
+ params.fDistribute = ( bDistribute) ? TRS_TRUE : TRS_FALSE;
+ g_pTestThreadPool->Start( params, "Tst" );
+ // Non-interleaved mode: keep the workers suspended until every job is queued.
+ if ( !bInterleavePushPop )
+ {
+ g_pTestThreadPool->SuspendExecution();
+ }
+
+ CCountJob jobs[4000];
+ g_nTotalToComplete = ARRAYSIZE(jobs);
+
+ CFastTimer timer, suspendTimer;
+
+ suspendTimer.Start();
+ timer.Start();
+ for ( int j = 0; j < ARRAYSIZE(jobs); j++ )
+ {
+ // JF_QUEUE keeps AddJob from executing the job inline on this thread.
+ jobs[j].SetFlags( JF_QUEUE );
+ jobs[j].bDoWork = bDoWork;
+ g_pTestThreadPool->AddJob( &jobs[j] );
+ if ( bSleep && j % 16 == 0 )
+ {
+ ThreadSleep( 0 );
+ }
+ }
+ if ( !bInterleavePushPop )
+ {
+ g_pTestThreadPool->ResumeExecution();
+ }
+ if ( bFinishExecute && g_iSleep <= 1 )
+ {
+ g_done.Wait();
+ }
+ g_nTotalAtFinish = CCountJob::m_nCount;
+ timer.End();
+ // suspendTimer additionally covers the time taken to suspend the workers.
+ g_pTestThreadPool->SuspendExecution();
+ suspendTimer.End();
+ g_pTestThreadPool->ResumeExecution();
+ g_pTestThreadPool->Stop();
+ g_done.Reset();
+
+ // Tally jobs per service thread and clear the jobs' service thread.
+ int counts[8] = { 0 };
+ for ( int j = 0; j < ARRAYSIZE(jobs); j++ )
+ {
+ if ( jobs[j].GetServiceThread() != -1 )
+ {
+ counts[jobs[j].GetServiceThread()]++;
+ jobs[j].ClearServiceThread();
+ }
+ }
+
+ Msg( "ThreadPoolTest: %d threads -- %d (%d) jobs processed in %fms, %fms to suspend (%f/%f) [%d, %d, %d, %d, %d, %d, %d, %d]\n",
+ i, g_nTotalAtFinish, (int)CCountJob::m_nCount, timer.GetDuration().GetMillisecondsF(), suspendTimer.GetDuration().GetMillisecondsF() - timer.GetDuration().GetMillisecondsF(),
+ timer.GetDuration().GetMillisecondsF() / (float)CCountJob::m_nCount, (suspendTimer.GetDuration().GetMillisecondsF())/(float)g_nTotalAtFinish,
+ counts[0], counts[1], counts[2], counts[3], counts[4], counts[5], counts[6], counts[7] );
+ }
+ }
+ }
+}
+
+
+// State shared by the forced-execute test below.
+bool g_bOutputError;
+volatile int g_ReadyToExecute; // start flag the CExecuteTestExecuteJob instances spin on
+CInterlockedInt g_nReady; // number of CExecuteTestExecuteJob instances currently inside DoExecute
+
+// Target job of the forced-execute test: does some busy work and reports a
+// failure if its event is already set or it is already finished while it is
+// executing.
+class CExecuteTestJob : public CJob
+{
+public:
+ virtual JobStatus_t DoExecute()
+ {
+ byte pMemory[1024];
+ int i;
+ for ( i = 0; i < 1024; i++ )
+ {
+ pMemory[i] = rand();
+ }
+ for ( i = 0; i < 50; i++ )
+ {
+ sqrt( (float)HashBlock( pMemory, 1024 ) + HashBlock( pMemory, 1024 ) + 10.0 );
+ }
+ if ( AccessEvent()->Check() || IsFinished() )
+ {
+ if ( !g_bOutputError )
+ {
+ Msg( "Forced execute test failed!\n" );
+ DebuggerBreakIfDebugging();
+ }
+ }
+ return 0;
+ }
+};
+
+// Pool job that waits for the start flag and then either executes or (about
+// one time in ten) aborts the shared target job.
+class CExecuteTestExecuteJob : public CJob
+{
+public:
+ virtual JobStatus_t DoExecute()
+ {
+ bool bAbort = ( RandomInt( 1, 10 ) == 1 );
+ g_nReady++;
+ // Spin until the test driver releases all contenders at once.
+ while ( !g_ReadyToExecute )
+ {
+ ThreadPause();
+ }
+
+ if ( !bAbort )
+ m_pTestJob->Execute();
+ else
+ m_pTestJob->Abort();
+ g_nReady--;
+ return 0;
+ }
+
+ CExecuteTestJob *m_pTestJob;
+};
+
+
+// Forced-execute test: for pools of 1, 3 and 5 threads, every worker and the
+// calling thread race to Execute()/Abort() the same CExecuteTestJob, 4000
+// jobs per pool size, repeated 30 times.
+void TestForcedExecute()
+{
+ Msg( "TestForcedExecute\n" );
+ for ( int tests = 0; tests < 30; tests++ )
+ {
+ for ( int i = 1; i <= 5; i += 2 )
+ {
+ g_nReady = 0;
+ ThreadPoolStartParams_t params;
+ params.nThreads = i;
+ params.fDistribute = TRS_TRUE;
+ g_pTestThreadPool->Start( params, "Tst" );
+
+ static CExecuteTestJob jobs[4000];
+ for ( int j = 0; j < ARRAYSIZE(jobs); j++ )
+ {
+ g_ReadyToExecute = false;
+ // One contender job per worker thread, all targeting jobs[j].
+ for ( int k = 0; k < i; k++ )
+ {
+ CExecuteTestExecuteJob *pJob = new CExecuteTestExecuteJob;
+ pJob->SetFlags( JF_QUEUE );
+ pJob->m_pTestJob = &jobs[j];
+ g_pTestThreadPool->AddJob( pJob );
+ pJob->Release();
+ }
+ // Wait until every contender is spinning, then release them together.
+ while ( g_nReady < i )
+ {
+ ThreadPause();
+ }
+ g_ReadyToExecute = true;
+ ThreadSleep();
+ jobs[j].Execute();
+ // Wait for all contenders to finish before moving to the next job.
+ while ( g_nReady > 0 )
+ {
+ ThreadPause();
+ }
+ }
+ g_pTestThreadPool->Stop();
+ }
+ }
+ Msg( "TestForcedExecute DONE\n" );
+}
+
+} // namespace ThreadPoolTest
+
+// Entry point for the thread pool self-tests. Runs the TSQueue/TSList tests,
+// then the job distribution test in several configurations (distribute or
+// not, calling thread pinned to one core, with and without sleeping between
+// adds), first with empty jobs and then with jobs doing work, and finally the
+// forced-execute test.
+void RunThreadPoolTests()
+{
+ CThreadPool pool;
+ ThreadPoolTest::g_pTestThreadPool = &pool;
+ RunTSQueueTests(10000);
+ RunTSListTests(10000);
+
+ // mask1 holds the affinity restored after each "One core" run: the process
+ // affinity mask on Windows, all bits set elsewhere.
+#ifdef _WIN32
+ DWORD_PTR mask1 = 0;
+ --mask1;
+ DWORD_PTR mask2 = 0;
+ --mask2;
+ GetProcessAffinityMask( GetCurrentProcess(), &mask1, &mask2 );
+#else
+ int32 mask1=-1;
+#endif
+ Msg( "ThreadPoolTest: Job distribution speed\n" );
+ for ( int i = 0; i < 2; i++ )
+ {
+ // First pass does not wait for the jobs to complete, second pass does.
+ bool bToCompletion = ( i % 2 != 0 );
+ if ( !IsX360() )
+ {
+ Msg( "ThreadPoolTest: Non-distribute\n" );
+ ThreadPoolTest::Test( false, true, bToCompletion );
+ }
+
+ Msg( "ThreadPoolTest: Distribute\n" );
+ ThreadPoolTest::Test( true, true, bToCompletion );
+
+ Msg( "ThreadPoolTest: One core\n" );
+ ThreadSetAffinity( 0, 1 );
+ ThreadPoolTest::Test( false, true, bToCompletion );
+ ThreadSetAffinity( 0, mask1 );
+
+ Msg( "ThreadPoolTest: NO Sleep\n" );
+ ThreadPoolTest::Test( false, false, bToCompletion );
+
+ Msg( "ThreadPoolTest: Distribute\n" );
+ ThreadPoolTest::Test( true, false, bToCompletion );
+
+ Msg( "ThreadPoolTest: One core\n" );
+ ThreadSetAffinity( 0, 1 );
+ ThreadPoolTest::Test( false, false, bToCompletion );
+ ThreadSetAffinity( 0, mask1 );
+ }
+
+ // Same matrix again, this time with each job doing real work and always
+ // running to completion.
+ Msg( "ThreadPoolTest: Jobs doing work\n" );
+ for ( int i = 0; i < 2; i++ )
+ {
+ bool bToCompletion = true;// = ( i % 2 != 0 );
+ if ( !IsX360() )
+ {
+ Msg( "ThreadPoolTest: Non-distribute\n" );
+ ThreadPoolTest::Test( false, true, bToCompletion, true );
+ }
+
+ Msg( "ThreadPoolTest: Distribute\n" );
+ ThreadPoolTest::Test( true, true, bToCompletion, true );
+
+ Msg( "ThreadPoolTest: One core\n" );
+ ThreadSetAffinity( 0, 1 );
+ ThreadPoolTest::Test( false, true, bToCompletion, true );
+ ThreadSetAffinity( 0, mask1 );
+
+ Msg( "ThreadPoolTest: NO Sleep\n" );
+ ThreadPoolTest::Test( false, false, bToCompletion, true );
+
+ Msg( "ThreadPoolTest: Distribute\n" );
+ ThreadPoolTest::Test( true, false, bToCompletion, true );
+
+ Msg( "ThreadPoolTest: One core\n" );
+ ThreadSetAffinity( 0, 1 );
+ ThreadPoolTest::Test( false, false, bToCompletion, true );
+ ThreadSetAffinity( 0, mask1 );
+ }
+#ifdef _WIN32
+ // The values read here are not used afterwards.
+ GetProcessAffinityMask( GetCurrentProcess(), &mask1, &mask2 );
+#endif
+
+ ThreadPoolTest::TestForcedExecute();
+}