diff options
| author | Jørgen P. Tjernø <[email protected]> | 2013-12-02 19:31:46 -0800 |
|---|---|---|
| committer | Jørgen P. Tjernø <[email protected]> | 2013-12-02 19:46:31 -0800 |
| commit | f56bb35301836e56582a575a75864392a0177875 (patch) | |
| tree | de61ddd39de3e7df52759711950b4c288592f0dc /sp/src/public/vstdlib/jobthread.h | |
| parent | Mark some more files as text. (diff) | |
| download | source-sdk-2013-f56bb35301836e56582a575a75864392a0177875.tar.xz source-sdk-2013-f56bb35301836e56582a575a75864392a0177875.zip | |
Fix line endings. WHAMMY.
Diffstat (limited to 'sp/src/public/vstdlib/jobthread.h')
| -rw-r--r-- | sp/src/public/vstdlib/jobthread.h | 2688 |
1 files changed, 1344 insertions, 1344 deletions
diff --git a/sp/src/public/vstdlib/jobthread.h b/sp/src/public/vstdlib/jobthread.h index ed453cf9..559937b3 100644 --- a/sp/src/public/vstdlib/jobthread.h +++ b/sp/src/public/vstdlib/jobthread.h @@ -1,1344 +1,1344 @@ -//========= Copyright Valve Corporation, All rights reserved. ============//
-//
-// Purpose: A utility for a discrete job-oriented worker thread.
-//
-// The class CThreadPool is both the job queue, and the
-// worker thread. Except when the main thread attempts to
-// synchronously execute a job, most of the inter-thread locking
-// on the queue.
-//
-// The queue threading model uses a manual reset event for optimal
-// throughput. Adding to the queue is guarded by a semaphore that
-// will block the inserting thread if the queue has overflown.
-// This prevents the worker thread from being starved out even if
-// not running at a higher priority than the master thread.
-//
-// The thread function waits for jobs, services jobs, and manages
-// communication between the worker and master threads. The nature
-// of the work is opaque to the Executer.
-//
-// CJob instances actually do the work. The base class
-// calls virtual methods for job primitives, so derivations don't
-// need to worry about threading models. All of the variants of
-// job and OS can be expressed in this hierarchy. Instances of
-// CJob are the items placed in the queue, and by
-// overriding the job primitives they are the manner by which
-// users of the Executer control the state of the job.
-//
-//=============================================================================
-
-#include <limits.h>
-#include "tier0/threadtools.h"
-#include "tier1/refcount.h"
-#include "tier1/utllinkedlist.h"
-#include "tier1/utlvector.h"
-#include "tier1/functors.h"
-#include "tier0/vprof_telemetry.h"
-
-#include "vstdlib/vstdlib.h"
-
-#ifndef JOBTHREAD_H
-#define JOBTHREAD_H
-
-#ifdef AddJob // windows.h print function collisions
-#undef AddJob
-#undef GetJob
-#endif
-
-#ifdef VSTDLIB_DLL_EXPORT
-#define JOB_INTERFACE DLL_EXPORT
-#define JOB_OVERLOAD DLL_GLOBAL_EXPORT
-#define JOB_CLASS DLL_CLASS_EXPORT
-#else
-#define JOB_INTERFACE DLL_IMPORT
-#define JOB_OVERLOAD DLL_GLOBAL_IMPORT
-#define JOB_CLASS DLL_CLASS_IMPORT
-#endif
-
-#if defined( _WIN32 )
-#pragma once
-#endif
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-
-class CJob;
-
-//-----------------------------------------------------------------------------
-//
-//-----------------------------------------------------------------------------
-enum JobStatusEnum_t
-{
- // Use negative for errors
- JOB_OK, // operation is successful
- JOB_STATUS_PENDING, // file is properly queued, waiting for service
- JOB_STATUS_INPROGRESS, // file is being accessed
- JOB_STATUS_ABORTED, // file was aborted by caller
- JOB_STATUS_UNSERVICED, // file is not yet queued
-};
-
-typedef int JobStatus_t;
-
-enum JobFlags_t
-{
- JF_IO = ( 1 << 0 ), // The job primarily blocks on IO or hardware
- JF_BOOST_THREAD = ( 1 << 1 ), // Up the thread priority to max allowed while processing task
- JF_SERIAL = ( 1 << 2 ), // Job cannot be executed out of order relative to other "strict" jobs
- JF_QUEUE = ( 1 << 3 ), // Queue it, even if not an IO job
-};
-
-enum JobPriority_t
-{
- JP_LOW,
- JP_NORMAL,
- JP_HIGH
-};
-
-#define TP_MAX_POOL_THREADS 64
-struct ThreadPoolStartParams_t
-{
- ThreadPoolStartParams_t( bool bIOThreads = false, unsigned nThreads = -1, int *pAffinities = NULL, ThreeState_t fDistribute = TRS_NONE, unsigned nStackSize = -1, int iThreadPriority = SHRT_MIN )
- : bIOThreads( bIOThreads ), nThreads( nThreads ), fDistribute( fDistribute ), nStackSize( nStackSize ), iThreadPriority( iThreadPriority ), nThreadsMax( -1 )
- {
- bExecOnThreadPoolThreadsOnly = false;
-
- bUseAffinityTable = ( pAffinities != NULL ) && ( fDistribute == TRS_TRUE ) && ( nThreads != -1 );
- if ( bUseAffinityTable )
- {
- // user supplied an optional 1:1 affinity mapping to override normal distribute behavior
- nThreads = MIN( TP_MAX_POOL_THREADS, nThreads );
- for ( unsigned int i = 0; i < nThreads; i++ )
- {
- iAffinityTable[i] = pAffinities[i];
- }
- }
- }
-
- int nThreads;
- int nThreadsMax;
- ThreeState_t fDistribute;
- int nStackSize;
- int iThreadPriority;
- int iAffinityTable[TP_MAX_POOL_THREADS];
-
- bool bIOThreads : 1;
- bool bUseAffinityTable : 1;
- bool bExecOnThreadPoolThreadsOnly : 1;
-};
-
-//-----------------------------------------------------------------------------
-//
-// IThreadPool
-//
-//-----------------------------------------------------------------------------
-
-typedef bool (*JobFilter_t)( CJob * );
-
-//---------------------------------------------------------
-// Messages supported through the CallWorker() method
-//---------------------------------------------------------
-enum ThreadPoolMessages_t
-{
- TPM_EXIT, // Exit the thread
- TPM_SUSPEND, // Suspend after next operation
- TPM_RUNFUNCTOR, // Run functor, reply when done.
-};
-
-//---------------------------------------------------------
-
-abstract_class IThreadPool : public IRefCounted
-{
-public:
- virtual ~IThreadPool() {};
-
- //-----------------------------------------------------
- // Thread functions
- //-----------------------------------------------------
- virtual bool Start( const ThreadPoolStartParams_t &startParams = ThreadPoolStartParams_t() ) = 0;
- virtual bool Stop( int timeout = TT_INFINITE ) = 0;
-
- //-----------------------------------------------------
- // Functions for any thread
- //-----------------------------------------------------
- virtual unsigned GetJobCount() = 0;
- virtual int NumThreads() = 0;
- virtual int NumIdleThreads() = 0;
-
- //-----------------------------------------------------
- // Pause/resume processing jobs
- //-----------------------------------------------------
- virtual int SuspendExecution() = 0;
- virtual int ResumeExecution() = 0;
-
- //-----------------------------------------------------
- // Offer the current thread to the pool
- //-----------------------------------------------------
- virtual int YieldWait( CThreadEvent **pEvents, int nEvents, bool bWaitAll = true, unsigned timeout = TT_INFINITE ) = 0;
- virtual int YieldWait( CJob **, int nJobs, bool bWaitAll = true, unsigned timeout = TT_INFINITE ) = 0;
- virtual void Yield( unsigned timeout ) = 0;
-
- bool YieldWait( CThreadEvent &event, unsigned timeout = TT_INFINITE );
- bool YieldWait( CJob *, unsigned timeout = TT_INFINITE );
-
- //-----------------------------------------------------
- // Add a native job to the queue (master thread)
- //-----------------------------------------------------
- virtual void AddJob( CJob * ) = 0;
-
- //-----------------------------------------------------
- // All threads execute pFunctor asap. Thread will either wake up
- // and execute or execute pFunctor right after completing current job and
- // before looking for another job.
- //-----------------------------------------------------
- virtual void ExecuteHighPriorityFunctor( CFunctor *pFunctor ) = 0;
-
- //-----------------------------------------------------
- // Add an function object to the queue (master thread)
- //-----------------------------------------------------
- virtual void AddFunctor( CFunctor *pFunctor, CJob **ppJob = NULL, const char *pszDescription = NULL, unsigned flags = 0 ) { AddFunctorInternal( RetAddRef( pFunctor ), ppJob, pszDescription, flags ); }
-
- //-----------------------------------------------------
- // Change the priority of an active job
- //-----------------------------------------------------
- virtual void ChangePriority( CJob *p, JobPriority_t priority ) = 0;
-
- //-----------------------------------------------------
- // Bulk job manipulation (blocking)
- //-----------------------------------------------------
- int ExecuteAll( JobFilter_t pfnFilter = NULL ) { return ExecuteToPriority( JP_LOW, pfnFilter ); }
- virtual int ExecuteToPriority( JobPriority_t toPriority, JobFilter_t pfnFilter = NULL ) = 0;
- virtual int AbortAll() = 0;
-
- //-----------------------------------------------------
- virtual void Reserved1() = 0;
-
- //-----------------------------------------------------
- // Add an arbitrary call to the queue (master thread)
- //
- // Avert thy eyes! Imagine rather:
- //
- // CJob *AddCall( <function>, [args1, [arg2,]...]
- // CJob *AddCall( <object>, <function>, [args1, [arg2,]...]
- // CJob *AddRefCall( <object>, <function>, [args1, [arg2,]...]
- // CJob *QueueCall( <function>, [args1, [arg2,]...]
- // CJob *QueueCall( <object>, <function>, [args1, [arg2,]...]
- //-----------------------------------------------------
-
- #define DEFINE_NONMEMBER_ADD_CALL(N) \
- template <typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *AddCall(FUNCTION_RETTYPE (*pfnProxied)( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- if ( !NumIdleThreads() ) \
- { \
- pJob = GetDummyJob(); \
- FunctorDirectCall( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
- } \
- else \
- { \
- AddFunctorInternal( CreateFunctor( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
- } \
- \
- return pJob; \
- }
-
- //-------------------------------------
-
- #define DEFINE_MEMBER_ADD_CALL(N) \
- template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *AddCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- if ( !NumIdleThreads() ) \
- { \
- pJob = GetDummyJob(); \
- FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
- } \
- else \
- { \
- AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
- } \
- \
- return pJob; \
- }
-
- //-------------------------------------
-
- #define DEFINE_CONST_MEMBER_ADD_CALL(N) \
- template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *AddCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- if ( !NumIdleThreads() ) \
- { \
- pJob = GetDummyJob(); \
- FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
- } \
- else \
- { \
- AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
- } \
- \
- return pJob; \
- }
-
- //-------------------------------------
-
- #define DEFINE_REF_COUNTING_MEMBER_ADD_CALL(N) \
- template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *AddRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- if ( !NumIdleThreads() ) \
- { \
- pJob = GetDummyJob(); \
- FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
- } \
- else \
- { \
- AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
- } \
- \
- return pJob; \
- }
-
- //-------------------------------------
-
- #define DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL(N) \
- template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *AddRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- if ( !NumIdleThreads() ) \
- { \
- pJob = GetDummyJob(); \
- FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \
- } \
- else \
- { \
- AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \
- } \
- \
- return pJob; \
- }
-
- //-----------------------------------------------------------------------------
-
- #define DEFINE_NONMEMBER_QUEUE_CALL(N) \
- template <typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *QueueCall(FUNCTION_RETTYPE (*pfnProxied)( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- AddFunctorInternal( CreateFunctor( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
- return pJob; \
- }
-
- //-------------------------------------
-
- #define DEFINE_MEMBER_QUEUE_CALL(N) \
- template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *QueueCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
- return pJob; \
- }
-
- //-------------------------------------
-
- #define DEFINE_CONST_MEMBER_QUEUE_CALL(N) \
- template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *QueueCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
- return pJob; \
- }
-
- //-------------------------------------
-
- #define DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL(N) \
- template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *QueueRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
- return pJob; \
- }
-
- //-------------------------------------
-
- #define DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL(N) \
- template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \
- CJob *QueueRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- CJob *pJob; \
- AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \
- \
- return pJob; \
- }
-
- FUNC_GENERATE_ALL( DEFINE_NONMEMBER_ADD_CALL );
- FUNC_GENERATE_ALL( DEFINE_MEMBER_ADD_CALL );
- FUNC_GENERATE_ALL( DEFINE_CONST_MEMBER_ADD_CALL );
- FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_MEMBER_ADD_CALL );
- FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL );
- FUNC_GENERATE_ALL( DEFINE_NONMEMBER_QUEUE_CALL );
- FUNC_GENERATE_ALL( DEFINE_MEMBER_QUEUE_CALL );
- FUNC_GENERATE_ALL( DEFINE_CONST_MEMBER_QUEUE_CALL );
- FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL );
- FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL );
-
- #undef DEFINE_NONMEMBER_ADD_CALL
- #undef DEFINE_MEMBER_ADD_CALL
- #undef DEFINE_CONST_MEMBER_ADD_CALL
- #undef DEFINE_REF_COUNTING_MEMBER_ADD_CALL
- #undef DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL
- #undef DEFINE_NONMEMBER_QUEUE_CALL
- #undef DEFINE_MEMBER_QUEUE_CALL
- #undef DEFINE_CONST_MEMBER_QUEUE_CALL
- #undef DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL
- #undef DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL
-
-private:
- virtual void AddFunctorInternal( CFunctor *, CJob ** = NULL, const char *pszDescription = NULL, unsigned flags = 0 ) = 0;
-
- //-----------------------------------------------------
- // Services for internal use by job instances
- //-----------------------------------------------------
- friend class CJob;
-
- virtual CJob *GetDummyJob() = 0;
-
-public:
- virtual void Distribute( bool bDistribute = true, int *pAffinityTable = NULL ) = 0;
-
- virtual bool Start( const ThreadPoolStartParams_t &startParams, const char *pszNameOverride ) = 0;
-};
-
-//-----------------------------------------------------------------------------
-
-JOB_INTERFACE IThreadPool *CreateThreadPool();
-JOB_INTERFACE void DestroyThreadPool( IThreadPool *pPool );
-
-//-------------------------------------
-
-JOB_INTERFACE void RunThreadPoolTests();
-
-//-----------------------------------------------------------------------------
-
-JOB_INTERFACE IThreadPool *g_pThreadPool;
-
-//-----------------------------------------------------------------------------
-// Class to combine the metadata for an operation and the ability to perform
-// the operation. Meant for inheritance. All functions inline, defers to executor
-//-----------------------------------------------------------------------------
-DECLARE_POINTER_HANDLE( ThreadPoolData_t );
-#define JOB_NO_DATA ((ThreadPoolData_t)-1)
-
-class CJob : public CRefCounted1<IRefCounted, CRefCountServiceMT>
-{
-public:
- CJob( JobPriority_t priority = JP_NORMAL )
- : m_status( JOB_STATUS_UNSERVICED ),
- m_ThreadPoolData( JOB_NO_DATA ),
- m_priority( priority ),
- m_flags( 0 ),
- m_pThreadPool( NULL ),
- m_CompleteEvent( true ),
- m_iServicingThread( -1 )
- {
- m_szDescription[ 0 ] = 0;
- }
-
- //-----------------------------------------------------
- // Priority (not thread safe)
- //-----------------------------------------------------
- void SetPriority( JobPriority_t priority ) { m_priority = priority; }
- JobPriority_t GetPriority() const { return m_priority; }
-
- //-----------------------------------------------------
-
- void SetFlags( unsigned flags ) { m_flags = flags; }
- unsigned GetFlags() const { return m_flags; }
-
- //-----------------------------------------------------
-
- void SetServiceThread( int iServicingThread ) { m_iServicingThread = (char)iServicingThread; }
- int GetServiceThread() const { return m_iServicingThread; }
- void ClearServiceThread() { m_iServicingThread = -1; }
-
- //-----------------------------------------------------
- // Fast queries
- //-----------------------------------------------------
- bool Executed() const { return ( m_status == JOB_OK ); }
- bool CanExecute() const { return ( m_status == JOB_STATUS_PENDING || m_status == JOB_STATUS_UNSERVICED ); }
- bool IsFinished() const { return ( m_status != JOB_STATUS_PENDING && m_status != JOB_STATUS_INPROGRESS && m_status != JOB_STATUS_UNSERVICED ); }
- JobStatus_t GetStatus() const { return m_status; }
-
- /// Slam the status to a particular value. This is named "slam" instead of "set,"
- /// to warn you that it should only be used in unusual situations. Otherwise, the
- /// job manager really should manage the status for you, and you should not manhandle it.
- void SlamStatus(JobStatus_t s) { m_status = s; }
-
- //-----------------------------------------------------
- // Try to acquire ownership (to satisfy). If you take the lock, you must either execute or abort.
- //-----------------------------------------------------
- bool TryLock() { return m_mutex.TryLock(); }
- void Lock() { m_mutex.Lock(); }
- void Unlock() { m_mutex.Unlock(); }
-
- //-----------------------------------------------------
- // Thread event support (safe for NULL this to simplify code )
- //-----------------------------------------------------
- bool WaitForFinish( uint32 dwTimeout = TT_INFINITE ) { if (!this) return true; return ( !IsFinished() ) ? g_pThreadPool->YieldWait( this, dwTimeout ) : true; }
- bool WaitForFinishAndRelease( uint32 dwTimeout = TT_INFINITE ) { if (!this) return true; bool bResult = WaitForFinish( dwTimeout); Release(); return bResult; }
- CThreadEvent *AccessEvent() { return &m_CompleteEvent; }
-
- //-----------------------------------------------------
- // Perform the job
- //-----------------------------------------------------
- JobStatus_t Execute();
- JobStatus_t TryExecute();
- JobStatus_t ExecuteAndRelease() { JobStatus_t status = Execute(); Release(); return status; }
- JobStatus_t TryExecuteAndRelease() { JobStatus_t status = TryExecute(); Release(); return status; }
-
- //-----------------------------------------------------
- // Terminate the job, discard if partially or wholly fulfilled
- //-----------------------------------------------------
- JobStatus_t Abort( bool bDiscard = true );
-
- virtual char const *Describe() { return m_szDescription[ 0 ] ? m_szDescription : "Job"; }
- virtual void SetDescription( const char *pszDescription )
- {
- if( pszDescription )
- {
- Q_strncpy( m_szDescription, pszDescription, sizeof( m_szDescription ) );
- }
- else
- {
- m_szDescription[ 0 ] = 0;
- }
- }
-
-private:
- //-----------------------------------------------------
- friend class CThreadPool;
-
- JobStatus_t m_status;
- JobPriority_t m_priority;
- CThreadMutex m_mutex;
- unsigned char m_flags;
- char m_iServicingThread;
- short m_reserved;
- ThreadPoolData_t m_ThreadPoolData;
- IThreadPool * m_pThreadPool;
- CThreadEvent m_CompleteEvent;
- char m_szDescription[ 32 ];
-
-private:
- //-----------------------------------------------------
- CJob( const CJob &fromRequest );
- void operator=(const CJob &fromRequest );
-
- virtual JobStatus_t DoExecute() = 0;
- virtual JobStatus_t DoAbort( bool bDiscard ) { return JOB_STATUS_ABORTED; }
- virtual void DoCleanup() {}
-};
-
-//-----------------------------------------------------------------------------
-
-class CFunctorJob : public CJob
-{
-public:
- CFunctorJob( CFunctor *pFunctor, const char *pszDescription = NULL )
- : m_pFunctor( pFunctor )
- {
- if ( pszDescription )
- {
- Q_strncpy( m_szDescription, pszDescription, sizeof(m_szDescription) );
- }
- else
- {
- m_szDescription[0] = 0;
- }
- }
-
- virtual JobStatus_t DoExecute()
- {
- (*m_pFunctor)();
- return JOB_OK;
- }
-
- const char *Describe()
- {
- return m_szDescription;
- }
-
-private:
- CRefPtr<CFunctor> m_pFunctor;
- char m_szDescription[16];
-};
-
-//-----------------------------------------------------------------------------
-// Utility for managing multiple jobs
-//-----------------------------------------------------------------------------
-
-class CJobSet
-{
-public:
- CJobSet( CJob *pJob = NULL )
- {
- if ( pJob )
- {
- m_jobs.AddToTail( pJob );
- }
- }
-
- CJobSet( CJob **ppJobs, int nJobs )
- {
- if ( ppJobs )
- {
- m_jobs.AddMultipleToTail( nJobs, ppJobs );
- }
- }
-
- ~CJobSet()
- {
- for ( int i = 0; i < m_jobs.Count(); i++ )
- {
- m_jobs[i]->Release();
- }
- }
-
- void operator+=( CJob *pJob )
- {
- m_jobs.AddToTail( pJob );
- }
-
- void operator-=( CJob *pJob )
- {
- m_jobs.FindAndRemove( pJob );
- }
-
- void Execute( bool bRelease = true )
- {
- for ( int i = 0; i < m_jobs.Count(); i++ )
- {
- m_jobs[i]->Execute();
- if ( bRelease )
- {
- m_jobs[i]->Release();
- }
- }
-
- if ( bRelease )
- {
- m_jobs.RemoveAll();
- }
- }
-
- void Abort( bool bRelease = true )
- {
- for ( int i = 0; i < m_jobs.Count(); i++ )
- {
- m_jobs[i]->Abort();
- if ( bRelease )
- {
- m_jobs[i]->Release();
- }
- }
-
- if ( bRelease )
- {
- m_jobs.RemoveAll();
- }
- }
-
- void WaitForFinish( bool bRelease = true )
- {
- for ( int i = 0; i < m_jobs.Count(); i++ )
- {
- m_jobs[i]->WaitForFinish();
- if ( bRelease )
- {
- m_jobs[i]->Release();
- }
- }
-
- if ( bRelease )
- {
- m_jobs.RemoveAll();
- }
- }
-
- void WaitForFinish( IThreadPool *pPool, bool bRelease = true )
- {
- pPool->YieldWait( m_jobs.Base(), m_jobs.Count() );
-
- if ( bRelease )
- {
- for ( int i = 0; i < m_jobs.Count(); i++ )
- {
- m_jobs[i]->Release();
- }
-
- m_jobs.RemoveAll();
- }
- }
-
-private:
- CUtlVectorFixed<CJob *, 16> m_jobs;
-};
-
-//-----------------------------------------------------------------------------
-// Job helpers
-//-----------------------------------------------------------------------------
-
-#define ThreadExecute g_pThreadPool->QueueCall
-#define ThreadExecuteRef g_pThreadPool->QueueRefCall
-
-#define BeginExecuteParallel() do { CJobSet jobSet
-#define EndExecuteParallel() jobSet.WaitForFinish( g_pThreadPool ); } while (0)
-
-#define ExecuteParallel jobSet += g_pThreadPool->QueueCall
-#define ExecuteRefParallel jobSet += g_pThreadPool->QueueCallRef
-
-
-//-----------------------------------------------------------------------------
-// Work splitting: array split, best when cost per item is roughly equal
-//-----------------------------------------------------------------------------
-
-#ifdef _MSC_VER
-#pragma warning(push)
-#pragma warning(disable:4389)
-#pragma warning(disable:4018)
-#pragma warning(disable:4701)
-#endif
-
-#define DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL(N) \
- template <typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N, typename ITERTYPE1, typename ITERTYPE2> \
- void IterRangeParallel(FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( ITERTYPE1, ITERTYPE2 FUNC_ADDL_TEMPLATE_FUNC_PARAMS_##N ), ITERTYPE1 from, ITERTYPE2 to FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- const int MAX_THREADS = 16; \
- int nIdle = g_pThreadPool->NumIdleThreads(); \
- ITERTYPE1 range = to - from; \
- int nThreads = MIN( nIdle + 1, range ); \
- if ( nThreads > MAX_THREADS ) \
- { \
- nThreads = MAX_THREADS; \
- } \
- if ( nThreads < 2 ) \
- { \
- FunctorDirectCall( pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \
- } \
- else \
- { \
- ITERTYPE1 nIncrement = range / nThreads; \
- \
- CJobSet jobSet; \
- while ( --nThreads ) \
- { \
- ITERTYPE2 thisTo = from + nIncrement; \
- jobSet += g_pThreadPool->AddCall( pfnProxied, from, thisTo FUNC_FUNCTOR_CALL_ARGS_##N ); \
- from = thisTo; \
- } \
- FunctorDirectCall( pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \
- jobSet.WaitForFinish( g_pThreadPool ); \
- } \
- \
- }
-
-FUNC_GENERATE_ALL( DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL );
-
-#define DEFINE_MEMBER_ITER_RANGE_PARALLEL(N) \
- template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N, typename ITERTYPE1, typename ITERTYPE2> \
- void IterRangeParallel(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( ITERTYPE1, ITERTYPE2 FUNC_ADDL_TEMPLATE_FUNC_PARAMS_##N ), ITERTYPE1 from, ITERTYPE2 to FUNC_ARG_FORMAL_PARAMS_##N ) \
- { \
- const int MAX_THREADS = 16; \
- int nIdle = g_pThreadPool->NumIdleThreads(); \
- ITERTYPE1 range = to - from; \
- int nThreads = MIN( nIdle + 1, range ); \
- if ( nThreads > MAX_THREADS ) \
- { \
- nThreads = MAX_THREADS; \
- } \
- if ( nThreads < 2 ) \
- { \
- FunctorDirectCall( pObject, pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \
- } \
- else \
- { \
- ITERTYPE1 nIncrement = range / nThreads; \
- \
- CJobSet jobSet; \
- while ( --nThreads ) \
- { \
- ITERTYPE2 thisTo = from + nIncrement; \
- jobSet += g_pThreadPool->AddCall( pObject, pfnProxied, from, thisTo FUNC_FUNCTOR_CALL_ARGS_##N ); \
- from = thisTo; \
- } \
- FunctorDirectCall( pObject, pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \
- jobSet.WaitForFinish( g_pThreadPool ); \
- } \
- \
- }
-
-FUNC_GENERATE_ALL( DEFINE_MEMBER_ITER_RANGE_PARALLEL );
-
-//-----------------------------------------------------------------------------
-// Work splitting: competitive, best when cost per item varies a lot
-//-----------------------------------------------------------------------------
-
-template <typename T>
-class CJobItemProcessor
-{
-public:
- typedef T ItemType_t;
- void Begin() {}
- // void Process( ItemType_t & ) {}
- void End() {}
-};
-
-template <typename T>
-class CFuncJobItemProcessor : public CJobItemProcessor<T>
-{
-public:
- void Init(void (*pfnProcess)( T & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL )
- {
- m_pfnProcess = pfnProcess;
- m_pfnBegin = pfnBegin;
- m_pfnEnd = pfnEnd;
- }
-
- //CFuncJobItemProcessor(OBJECT_TYPE_PTR pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL );
- void Begin() { if ( m_pfnBegin ) (*m_pfnBegin)(); }
- void Process( T &item ) { (*m_pfnProcess)( item ); }
- void End() { if ( m_pfnEnd ) (*m_pfnEnd)(); }
-
-protected:
- void (*m_pfnProcess)( T & );
- void (*m_pfnBegin)();
- void (*m_pfnEnd)();
-};
-
-template <typename T, class OBJECT_TYPE, class FUNCTION_CLASS = OBJECT_TYPE >
-class CMemberFuncJobItemProcessor : public CJobItemProcessor<T>
-{
-public:
- void Init( OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( T & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL )
- {
- m_pObject = pObject;
- m_pfnProcess = pfnProcess;
- m_pfnBegin = pfnBegin;
- m_pfnEnd = pfnEnd;
- }
-
- void Begin() { if ( m_pfnBegin ) ((*m_pObject).*m_pfnBegin)(); }
- void Process( T &item ) { ((*m_pObject).*m_pfnProcess)( item ); }
- void End() { if ( m_pfnEnd ) ((*m_pObject).*m_pfnEnd)(); }
-
-protected:
- OBJECT_TYPE *m_pObject;
- void (FUNCTION_CLASS::*m_pfnProcess)( T & );
- void (FUNCTION_CLASS::*m_pfnBegin)();
- void (FUNCTION_CLASS::*m_pfnEnd)();
-};
-
-template <typename ITEM_TYPE, class ITEM_PROCESSOR_TYPE>
-class CParallelProcessor
-{
-public:
- CParallelProcessor( const char *pszDescription )
- {
- m_pItems = m_pLimit= 0;
- m_szDescription = pszDescription;
- }
-
- void Run( ITEM_TYPE *pItems, unsigned nItems, int nMaxParallel = INT_MAX, IThreadPool *pThreadPool = NULL )
- {
- tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "Run %s %d", m_szDescription, nItems );
-
- if ( nItems == 0 )
- return;
-
- if ( !pThreadPool )
- {
- pThreadPool = g_pThreadPool;
- }
-
- m_pItems = pItems;
- m_pLimit = pItems + nItems;
-
- int nJobs = nItems - 1;
-
- if ( nJobs > nMaxParallel )
- {
- nJobs = nMaxParallel;
- }
-
- if (! pThreadPool ) // only possible on linux
- {
- DoExecute( );
- return;
- }
-
- int nThreads = pThreadPool->NumThreads();
- if ( nJobs > nThreads )
- {
- nJobs = nThreads;
- }
-
- if ( nJobs > 1 )
- {
- CJob **jobs = (CJob **)stackalloc( nJobs * sizeof(CJob **) );
- int i = nJobs;
-
- while( i-- )
- {
- jobs[i] = pThreadPool->QueueCall( this, &CParallelProcessor<ITEM_TYPE, ITEM_PROCESSOR_TYPE>::DoExecute );
- jobs[i]->SetDescription( m_szDescription );
- }
-
- DoExecute();
-
- for ( i = 0; i < nJobs; i++ )
- {
- jobs[i]->Abort(); // will either abort ones that never got a thread, or noop on ones that did
- jobs[i]->Release();
- }
- }
- else
- {
- DoExecute();
- }
- }
-
- ITEM_PROCESSOR_TYPE m_ItemProcessor;
-
-private:
- void DoExecute()
- {
- tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "DoExecute %s", m_szDescription );
-
- if ( m_pItems < m_pLimit )
- {
- m_ItemProcessor.Begin();
-
- ITEM_TYPE *pLimit = m_pLimit;
-
- for (;;)
- {
- ITEM_TYPE *pCurrent = m_pItems++;
- if ( pCurrent < pLimit )
- {
- m_ItemProcessor.Process( *pCurrent );
- }
- else
- {
- break;
- }
- }
-
- m_ItemProcessor.End();
- }
- }
- CInterlockedPtr<ITEM_TYPE> m_pItems;
- ITEM_TYPE * m_pLimit;
- const char * m_szDescription;
-};
-
-template <typename ITEM_TYPE>
-inline void ParallelProcess( const char *pszDescription, ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
-{
- CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor( pszDescription );
- processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
- processor.Run( pItems, nItems, nMaxParallel );
-
-}
-
-template <typename ITEM_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS >
-inline void ParallelProcess( const char *pszDescription, ITEM_TYPE *pItems, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
-{
- CParallelProcessor<ITEM_TYPE, CMemberFuncJobItemProcessor<ITEM_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor( pszDescription );
- processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd );
- processor.Run( pItems, nItems, nMaxParallel );
-}
-
-// Parallel Process that lets you specify threadpool
-template <typename ITEM_TYPE>
-inline void ParallelProcess( const char *pszDescription, IThreadPool *pPool, ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
-{
- CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor( pszDescription );
- processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
- processor.Run( pItems, nItems, nMaxParallel, pPool );
-}
-
-
-template <class ITEM_PROCESSOR_TYPE>
-class CParallelLoopProcessor
-{
-public:
- CParallelLoopProcessor( const char *pszDescription )
- {
- m_lIndex = m_lLimit= 0;
- m_nActive = 0;
- m_szDescription = pszDescription;
- }
-
- void Run( long lBegin, long nItems, int nMaxParallel = INT_MAX )
- {
- if ( nItems )
- {
- m_lIndex = lBegin;
- m_lLimit = lBegin + nItems;
- int i = g_pThreadPool->NumIdleThreads();
-
- if ( nMaxParallel < i)
- {
- i = nMaxParallel;
- }
-
- while( i-- )
- {
- ++m_nActive;
- ThreadExecute( this, &CParallelLoopProcessor<ITEM_PROCESSOR_TYPE>::DoExecute )->Release();
- }
-
- ++m_nActive;
- DoExecute();
-
- while ( m_nActive )
- {
- ThreadPause();
- }
- }
- }
-
- ITEM_PROCESSOR_TYPE m_ItemProcessor;
-
-private:
- void DoExecute()
- {
- tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "DoExecute %s", m_szDescription );
-
- m_ItemProcessor.Begin();
-
- long lLimit = m_lLimit;
-
- for (;;)
- {
- long lIndex = m_lIndex ++;
- if ( lIndex < lLimit )
- {
- m_ItemProcessor.Process( lIndex );
- }
- else
- {
- break;
- }
- }
-
- m_ItemProcessor.End();
-
- --m_nActive;
- }
- CInterlockedInt m_lIndex;
- long m_lLimit;
- CInterlockedInt m_nActive;
- const char * m_szDescription;
-};
-
-inline void ParallelLoopProcess( const char *szDescription, long lBegin, unsigned nItems, void (*pfnProcess)( long const & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
-{
- CParallelLoopProcessor< CFuncJobItemProcessor< long const > > processor( szDescription );
- processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd );
- processor.Run( lBegin, nItems, nMaxParallel );
-
-}
-
-template < typename OBJECT_TYPE, typename FUNCTION_CLASS >
-inline void ParallelLoopProcess( const char *szDescription, long lBegin, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( long const & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX )
-{
- CParallelLoopProcessor< CMemberFuncJobItemProcessor<long const, OBJECT_TYPE, FUNCTION_CLASS> > processor( szDescription );
- processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd );
- processor.Run( lBegin, nItems, nMaxParallel );
-}
-
-
-template <class Derived>
-class CParallelProcessorBase
-{
-protected:
- typedef CParallelProcessorBase<Derived> ThisParallelProcessorBase_t;
- typedef Derived ThisParallelProcessorDerived_t;
-
-public:
- CParallelProcessorBase()
- {
- m_nActive = 0;
- m_szDescription = NULL;
- }
- void SetDescription( const char *pszDescription )
- {
- m_szDescription = pszDescription;
- }
-
-protected:
- void Run( int nMaxParallel = INT_MAX, int threadOverride = -1 )
- {
- int i = g_pThreadPool->NumIdleThreads();
-
- if ( nMaxParallel < i)
- {
- i = nMaxParallel;
- }
-
- while( i -- > 0 )
- {
- if ( threadOverride == -1 || i == threadOverride - 1 )
- {
- ++ m_nActive;
- ThreadExecute( this, &ThisParallelProcessorBase_t::DoExecute )->Release();
- }
- }
-
- if ( threadOverride == -1 || threadOverride == 0 )
- {
- ++ m_nActive;
- DoExecute();
- }
-
- while ( m_nActive )
- {
- ThreadPause();
- }
- }
-
-protected:
- void OnBegin() {}
- bool OnProcess() { return false; }
- void OnEnd() {}
-
-private:
- void DoExecute()
- {
- tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "DoExecute %s", m_szDescription );
-
- static_cast<Derived *>( this )->OnBegin();
-
- while ( static_cast<Derived *>( this )->OnProcess() )
- continue;
-
- static_cast<Derived *>(this)->OnEnd();
-
- -- m_nActive;
- }
-
- CInterlockedInt m_nActive;
- const char * m_szDescription;
-};
-
-
-
-
-//-----------------------------------------------------------------------------
-// Raw thread launching
-//-----------------------------------------------------------------------------
-
-inline unsigned FunctorExecuteThread( void *pParam )
-{
- CFunctor *pFunctor = (CFunctor *)pParam;
- (*pFunctor)();
- pFunctor->Release();
- return 0;
-}
-
-inline ThreadHandle_t ThreadExecuteSoloImpl( CFunctor *pFunctor, const char *pszName = NULL )
-{
- ThreadHandle_t hThread;
- ThreadId_t threadId;
- hThread = CreateSimpleThread( FunctorExecuteThread, pFunctor, &threadId );
- if ( pszName )
- {
- ThreadSetDebugName( threadId, pszName );
- }
- return hThread;
-}
-
-inline ThreadHandle_t ThreadExecuteSolo( CJob *pJob ) { return ThreadExecuteSoloImpl( CreateFunctor( pJob, &CJob::Execute ), pJob->Describe() ); }
-
-template <typename T1>
-inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1 ), pszName ); }
-
-template <typename T1, typename T2>
-inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2 ), pszName ); }
-
-template <typename T1, typename T2, typename T3>
-inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4>
-inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4, typename T5>
-inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
-inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5, a6 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7>
-inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5, a6, a7 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8>
-inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7, T8 a8 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5, a6, a7, a8 ), pszName ); }
-
-template <typename T1, typename T2>
-inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2 ), pszName ); }
-
-template <typename T1, typename T2, typename T3>
-inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4>
-inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4, typename T5>
-inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6>
-inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7>
-inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6, a7 ), pszName ); }
-
-template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8>
-inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7, T8 a8 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6, a7, a8 ), pszName ); }
-
-//-----------------------------------------------------------------------------
-
-inline bool IThreadPool::YieldWait( CThreadEvent &event, unsigned timeout )
-{
- CThreadEvent *pEvent = &event;
- return ( YieldWait( &pEvent, 1, true, timeout ) != TW_TIMEOUT );
-}
-
-inline bool IThreadPool::YieldWait( CJob *pJob, unsigned timeout )
-{
- return ( YieldWait( &pJob, 1, true, timeout ) != TW_TIMEOUT );
-}
-
-//-----------------------------------------------------------------------------
-
-inline JobStatus_t CJob::Execute()
-{
- if ( IsFinished() )
- {
- return m_status;
- }
-
- tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s %s %d", __FUNCTION__, Describe(), m_status );
-
- AUTO_LOCK( m_mutex );
- AddRef();
-
- JobStatus_t result;
-
- switch ( m_status )
- {
- case JOB_STATUS_UNSERVICED:
- case JOB_STATUS_PENDING:
- {
- // Service it
- m_status = JOB_STATUS_INPROGRESS;
- result = m_status = DoExecute();
- DoCleanup();
- m_CompleteEvent.Set();
- break;
- }
-
- case JOB_STATUS_INPROGRESS:
- AssertMsg(0, "Mutex Should have protected use while processing");
- // fall through...
-
- case JOB_OK:
- case JOB_STATUS_ABORTED:
- result = m_status;
- break;
-
- default:
- AssertMsg( m_status < JOB_OK, "Unknown job state");
- result = m_status;
- }
-
- Release();
-
- return result;
-}
-
-
-//---------------------------------------------------------
-
-inline JobStatus_t CJob::TryExecute()
-{
- tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s %s %d", __FUNCTION__, Describe(), m_status );
-
- // TryLock() would only fail if another thread has entered
- // Execute() or Abort()
- if ( !IsFinished() && TryLock() )
- {
- // ...service the request
- Execute();
- Unlock();
- }
- return m_status;
-}
-
-//---------------------------------------------------------
-
-inline JobStatus_t CJob::Abort( bool bDiscard )
-{
- if ( IsFinished() )
- {
- return m_status;
- }
-
- tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s %s %d", __FUNCTION__, Describe(), m_status );
-
- AUTO_LOCK( m_mutex );
- AddRef();
-
- JobStatus_t result;
-
- switch ( m_status )
- {
- case JOB_STATUS_UNSERVICED:
- case JOB_STATUS_PENDING:
- {
- tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "CJob::DoAbort" );
-
- result = m_status = DoAbort( bDiscard );
- if ( bDiscard )
- DoCleanup();
- m_CompleteEvent.Set();
- }
- break;
-
- case JOB_STATUS_ABORTED:
- case JOB_STATUS_INPROGRESS:
- case JOB_OK:
- result = m_status;
- break;
-
- default:
- AssertMsg( m_status < JOB_OK, "Unknown job state");
- result = m_status;
- }
-
- Release();
-
- return result;
-}
-
-//-----------------------------------------------------------------------------
-
-#endif // JOBTHREAD_H
+//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: A utility for a discrete job-oriented worker thread. +// +// The class CThreadPool is both the job queue, and the +// worker thread. Except when the main thread attempts to +// synchronously execute a job, most of the inter-thread locking +// on the queue. +// +// The queue threading model uses a manual reset event for optimal +// throughput. Adding to the queue is guarded by a semaphore that +// will block the inserting thread if the queue has overflown. +// This prevents the worker thread from being starved out even if +// not running at a higher priority than the master thread. +// +// The thread function waits for jobs, services jobs, and manages +// communication between the worker and master threads. The nature +// of the work is opaque to the Executer. +// +// CJob instances actually do the work. The base class +// calls virtual methods for job primitives, so derivations don't +// need to worry about threading models. All of the variants of +// job and OS can be expressed in this hierarchy. Instances of +// CJob are the items placed in the queue, and by +// overriding the job primitives they are the manner by which +// users of the Executer control the state of the job. +// +//============================================================================= + +#include <limits.h> +#include "tier0/threadtools.h" +#include "tier1/refcount.h" +#include "tier1/utllinkedlist.h" +#include "tier1/utlvector.h" +#include "tier1/functors.h" +#include "tier0/vprof_telemetry.h" + +#include "vstdlib/vstdlib.h" + +#ifndef JOBTHREAD_H +#define JOBTHREAD_H + +#ifdef AddJob // windows.h print function collisions +#undef AddJob +#undef GetJob +#endif + +#ifdef VSTDLIB_DLL_EXPORT +#define JOB_INTERFACE DLL_EXPORT +#define JOB_OVERLOAD DLL_GLOBAL_EXPORT +#define JOB_CLASS DLL_CLASS_EXPORT +#else +#define JOB_INTERFACE DLL_IMPORT +#define JOB_OVERLOAD DLL_GLOBAL_IMPORT +#define JOB_CLASS DLL_CLASS_IMPORT +#endif + +#if defined( _WIN32 ) +#pragma once +#endif + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- + +class CJob; + +//----------------------------------------------------------------------------- +// +//----------------------------------------------------------------------------- +enum JobStatusEnum_t +{ + // Use negative for errors + JOB_OK, // operation is successful + JOB_STATUS_PENDING, // file is properly queued, waiting for service + JOB_STATUS_INPROGRESS, // file is being accessed + JOB_STATUS_ABORTED, // file was aborted by caller + JOB_STATUS_UNSERVICED, // file is not yet queued +}; + +typedef int JobStatus_t; + +enum JobFlags_t +{ + JF_IO = ( 1 << 0 ), // The job primarily blocks on IO or hardware + JF_BOOST_THREAD = ( 1 << 1 ), // Up the thread priority to max allowed while processing task + JF_SERIAL = ( 1 << 2 ), // Job cannot be executed out of order relative to other "strict" jobs + JF_QUEUE = ( 1 << 3 ), // Queue it, even if not an IO job +}; + +enum JobPriority_t +{ + JP_LOW, + JP_NORMAL, + JP_HIGH +}; + +#define TP_MAX_POOL_THREADS 64 +struct ThreadPoolStartParams_t +{ + ThreadPoolStartParams_t( bool bIOThreads = false, unsigned nThreads = -1, int *pAffinities = NULL, ThreeState_t fDistribute = TRS_NONE, unsigned nStackSize = -1, int iThreadPriority = SHRT_MIN ) + : bIOThreads( bIOThreads ), nThreads( nThreads ), fDistribute( fDistribute ), nStackSize( nStackSize ), iThreadPriority( iThreadPriority ), nThreadsMax( -1 ) + { + bExecOnThreadPoolThreadsOnly = false; + + bUseAffinityTable = ( pAffinities != NULL ) && ( fDistribute == TRS_TRUE ) && ( nThreads != -1 ); + if ( bUseAffinityTable ) + { + // user supplied an optional 1:1 affinity mapping to override normal distribute behavior + nThreads = MIN( TP_MAX_POOL_THREADS, nThreads ); + for ( unsigned int i = 0; i < nThreads; i++ ) + { + iAffinityTable[i] = pAffinities[i]; + } + } + } + + int nThreads; + int nThreadsMax; + ThreeState_t fDistribute; + int nStackSize; + int iThreadPriority; + int iAffinityTable[TP_MAX_POOL_THREADS]; + + bool bIOThreads : 1; + bool bUseAffinityTable : 1; + bool bExecOnThreadPoolThreadsOnly : 1; +}; + +//----------------------------------------------------------------------------- +// +// IThreadPool +// +//----------------------------------------------------------------------------- + +typedef bool (*JobFilter_t)( CJob * ); + +//--------------------------------------------------------- +// Messages supported through the CallWorker() method +//--------------------------------------------------------- +enum ThreadPoolMessages_t +{ + TPM_EXIT, // Exit the thread + TPM_SUSPEND, // Suspend after next operation + TPM_RUNFUNCTOR, // Run functor, reply when done. +}; + +//--------------------------------------------------------- + +abstract_class IThreadPool : public IRefCounted +{ +public: + virtual ~IThreadPool() {}; + + //----------------------------------------------------- + // Thread functions + //----------------------------------------------------- + virtual bool Start( const ThreadPoolStartParams_t &startParams = ThreadPoolStartParams_t() ) = 0; + virtual bool Stop( int timeout = TT_INFINITE ) = 0; + + //----------------------------------------------------- + // Functions for any thread + //----------------------------------------------------- + virtual unsigned GetJobCount() = 0; + virtual int NumThreads() = 0; + virtual int NumIdleThreads() = 0; + + //----------------------------------------------------- + // Pause/resume processing jobs + //----------------------------------------------------- + virtual int SuspendExecution() = 0; + virtual int ResumeExecution() = 0; + + //----------------------------------------------------- + // Offer the current thread to the pool + //----------------------------------------------------- + virtual int YieldWait( CThreadEvent **pEvents, int nEvents, bool bWaitAll = true, unsigned timeout = TT_INFINITE ) = 0; + virtual int YieldWait( CJob **, int nJobs, bool bWaitAll = true, unsigned timeout = TT_INFINITE ) = 0; + virtual void Yield( unsigned timeout ) = 0; + + bool YieldWait( CThreadEvent &event, unsigned timeout = TT_INFINITE ); + bool YieldWait( CJob *, unsigned timeout = TT_INFINITE ); + + //----------------------------------------------------- + // Add a native job to the queue (master thread) + //----------------------------------------------------- + virtual void AddJob( CJob * ) = 0; + + //----------------------------------------------------- + // All threads execute pFunctor asap. Thread will either wake up + // and execute or execute pFunctor right after completing current job and + // before looking for another job. + //----------------------------------------------------- + virtual void ExecuteHighPriorityFunctor( CFunctor *pFunctor ) = 0; + + //----------------------------------------------------- + // Add an function object to the queue (master thread) + //----------------------------------------------------- + virtual void AddFunctor( CFunctor *pFunctor, CJob **ppJob = NULL, const char *pszDescription = NULL, unsigned flags = 0 ) { AddFunctorInternal( RetAddRef( pFunctor ), ppJob, pszDescription, flags ); } + + //----------------------------------------------------- + // Change the priority of an active job + //----------------------------------------------------- + virtual void ChangePriority( CJob *p, JobPriority_t priority ) = 0; + + //----------------------------------------------------- + // Bulk job manipulation (blocking) + //----------------------------------------------------- + int ExecuteAll( JobFilter_t pfnFilter = NULL ) { return ExecuteToPriority( JP_LOW, pfnFilter ); } + virtual int ExecuteToPriority( JobPriority_t toPriority, JobFilter_t pfnFilter = NULL ) = 0; + virtual int AbortAll() = 0; + + //----------------------------------------------------- + virtual void Reserved1() = 0; + + //----------------------------------------------------- + // Add an arbitrary call to the queue (master thread) + // + // Avert thy eyes! Imagine rather: + // + // CJob *AddCall( <function>, [args1, [arg2,]...] + // CJob *AddCall( <object>, <function>, [args1, [arg2,]...] + // CJob *AddRefCall( <object>, <function>, [args1, [arg2,]...] + // CJob *QueueCall( <function>, [args1, [arg2,]...] + // CJob *QueueCall( <object>, <function>, [args1, [arg2,]...] + //----------------------------------------------------- + + #define DEFINE_NONMEMBER_ADD_CALL(N) \ + template <typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *AddCall(FUNCTION_RETTYPE (*pfnProxied)( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + if ( !NumIdleThreads() ) \ + { \ + pJob = GetDummyJob(); \ + FunctorDirectCall( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ + } \ + else \ + { \ + AddFunctorInternal( CreateFunctor( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ + } \ + \ + return pJob; \ + } + + //------------------------------------- + + #define DEFINE_MEMBER_ADD_CALL(N) \ + template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *AddCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + if ( !NumIdleThreads() ) \ + { \ + pJob = GetDummyJob(); \ + FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ + } \ + else \ + { \ + AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ + } \ + \ + return pJob; \ + } + + //------------------------------------- + + #define DEFINE_CONST_MEMBER_ADD_CALL(N) \ + template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *AddCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + if ( !NumIdleThreads() ) \ + { \ + pJob = GetDummyJob(); \ + FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ + } \ + else \ + { \ + AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ + } \ + \ + return pJob; \ + } + + //------------------------------------- + + #define DEFINE_REF_COUNTING_MEMBER_ADD_CALL(N) \ + template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *AddRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + if ( !NumIdleThreads() ) \ + { \ + pJob = GetDummyJob(); \ + FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ + } \ + else \ + { \ + AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ + } \ + \ + return pJob; \ + } + + //------------------------------------- + + #define DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL(N) \ + template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *AddRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + if ( !NumIdleThreads() ) \ + { \ + pJob = GetDummyJob(); \ + FunctorDirectCall( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ); \ + } \ + else \ + { \ + AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob ); \ + } \ + \ + return pJob; \ + } + + //----------------------------------------------------------------------------- + + #define DEFINE_NONMEMBER_QUEUE_CALL(N) \ + template <typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *QueueCall(FUNCTION_RETTYPE (*pfnProxied)( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + AddFunctorInternal( CreateFunctor( pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ + return pJob; \ + } + + //------------------------------------- + + #define DEFINE_MEMBER_QUEUE_CALL(N) \ + template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *QueueCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ + return pJob; \ + } + + //------------------------------------- + + #define DEFINE_CONST_MEMBER_QUEUE_CALL(N) \ + template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *QueueCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + AddFunctorInternal( CreateFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ + return pJob; \ + } + + //------------------------------------- + + #define DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL(N) \ + template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *QueueRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ + return pJob; \ + } + + //------------------------------------- + + #define DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL(N) \ + template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N> \ + CJob *QueueRefCall(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( FUNC_BASE_TEMPLATE_FUNC_PARAMS_##N ) const FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + CJob *pJob; \ + AddFunctorInternal( CreateRefCountingFunctor( pObject, pfnProxied FUNC_FUNCTOR_CALL_ARGS_##N ), &pJob, NULL, JF_QUEUE ); \ + \ + return pJob; \ + } + + FUNC_GENERATE_ALL( DEFINE_NONMEMBER_ADD_CALL ); + FUNC_GENERATE_ALL( DEFINE_MEMBER_ADD_CALL ); + FUNC_GENERATE_ALL( DEFINE_CONST_MEMBER_ADD_CALL ); + FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_MEMBER_ADD_CALL ); + FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL ); + FUNC_GENERATE_ALL( DEFINE_NONMEMBER_QUEUE_CALL ); + FUNC_GENERATE_ALL( DEFINE_MEMBER_QUEUE_CALL ); + FUNC_GENERATE_ALL( DEFINE_CONST_MEMBER_QUEUE_CALL ); + FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL ); + FUNC_GENERATE_ALL( DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL ); + + #undef DEFINE_NONMEMBER_ADD_CALL + #undef DEFINE_MEMBER_ADD_CALL + #undef DEFINE_CONST_MEMBER_ADD_CALL + #undef DEFINE_REF_COUNTING_MEMBER_ADD_CALL + #undef DEFINE_REF_COUNTING_CONST_MEMBER_ADD_CALL + #undef DEFINE_NONMEMBER_QUEUE_CALL + #undef DEFINE_MEMBER_QUEUE_CALL + #undef DEFINE_CONST_MEMBER_QUEUE_CALL + #undef DEFINE_REF_COUNTING_MEMBER_QUEUE_CALL + #undef DEFINE_REF_COUNTING_CONST_MEMBER_QUEUE_CALL + +private: + virtual void AddFunctorInternal( CFunctor *, CJob ** = NULL, const char *pszDescription = NULL, unsigned flags = 0 ) = 0; + + //----------------------------------------------------- + // Services for internal use by job instances + //----------------------------------------------------- + friend class CJob; + + virtual CJob *GetDummyJob() = 0; + +public: + virtual void Distribute( bool bDistribute = true, int *pAffinityTable = NULL ) = 0; + + virtual bool Start( const ThreadPoolStartParams_t &startParams, const char *pszNameOverride ) = 0; +}; + +//----------------------------------------------------------------------------- + +JOB_INTERFACE IThreadPool *CreateThreadPool(); +JOB_INTERFACE void DestroyThreadPool( IThreadPool *pPool ); + +//------------------------------------- + +JOB_INTERFACE void RunThreadPoolTests(); + +//----------------------------------------------------------------------------- + +JOB_INTERFACE IThreadPool *g_pThreadPool; + +//----------------------------------------------------------------------------- +// Class to combine the metadata for an operation and the ability to perform +// the operation. Meant for inheritance. All functions inline, defers to executor +//----------------------------------------------------------------------------- +DECLARE_POINTER_HANDLE( ThreadPoolData_t ); +#define JOB_NO_DATA ((ThreadPoolData_t)-1) + +class CJob : public CRefCounted1<IRefCounted, CRefCountServiceMT> +{ +public: + CJob( JobPriority_t priority = JP_NORMAL ) + : m_status( JOB_STATUS_UNSERVICED ), + m_ThreadPoolData( JOB_NO_DATA ), + m_priority( priority ), + m_flags( 0 ), + m_pThreadPool( NULL ), + m_CompleteEvent( true ), + m_iServicingThread( -1 ) + { + m_szDescription[ 0 ] = 0; + } + + //----------------------------------------------------- + // Priority (not thread safe) + //----------------------------------------------------- + void SetPriority( JobPriority_t priority ) { m_priority = priority; } + JobPriority_t GetPriority() const { return m_priority; } + + //----------------------------------------------------- + + void SetFlags( unsigned flags ) { m_flags = flags; } + unsigned GetFlags() const { return m_flags; } + + //----------------------------------------------------- + + void SetServiceThread( int iServicingThread ) { m_iServicingThread = (char)iServicingThread; } + int GetServiceThread() const { return m_iServicingThread; } + void ClearServiceThread() { m_iServicingThread = -1; } + + //----------------------------------------------------- + // Fast queries + //----------------------------------------------------- + bool Executed() const { return ( m_status == JOB_OK ); } + bool CanExecute() const { return ( m_status == JOB_STATUS_PENDING || m_status == JOB_STATUS_UNSERVICED ); } + bool IsFinished() const { return ( m_status != JOB_STATUS_PENDING && m_status != JOB_STATUS_INPROGRESS && m_status != JOB_STATUS_UNSERVICED ); } + JobStatus_t GetStatus() const { return m_status; } + + /// Slam the status to a particular value. This is named "slam" instead of "set," + /// to warn you that it should only be used in unusual situations. Otherwise, the + /// job manager really should manage the status for you, and you should not manhandle it. + void SlamStatus(JobStatus_t s) { m_status = s; } + + //----------------------------------------------------- + // Try to acquire ownership (to satisfy). If you take the lock, you must either execute or abort. + //----------------------------------------------------- + bool TryLock() { return m_mutex.TryLock(); } + void Lock() { m_mutex.Lock(); } + void Unlock() { m_mutex.Unlock(); } + + //----------------------------------------------------- + // Thread event support (safe for NULL this to simplify code ) + //----------------------------------------------------- + bool WaitForFinish( uint32 dwTimeout = TT_INFINITE ) { if (!this) return true; return ( !IsFinished() ) ? g_pThreadPool->YieldWait( this, dwTimeout ) : true; } + bool WaitForFinishAndRelease( uint32 dwTimeout = TT_INFINITE ) { if (!this) return true; bool bResult = WaitForFinish( dwTimeout); Release(); return bResult; } + CThreadEvent *AccessEvent() { return &m_CompleteEvent; } + + //----------------------------------------------------- + // Perform the job + //----------------------------------------------------- + JobStatus_t Execute(); + JobStatus_t TryExecute(); + JobStatus_t ExecuteAndRelease() { JobStatus_t status = Execute(); Release(); return status; } + JobStatus_t TryExecuteAndRelease() { JobStatus_t status = TryExecute(); Release(); return status; } + + //----------------------------------------------------- + // Terminate the job, discard if partially or wholly fulfilled + //----------------------------------------------------- + JobStatus_t Abort( bool bDiscard = true ); + + virtual char const *Describe() { return m_szDescription[ 0 ] ? m_szDescription : "Job"; } + virtual void SetDescription( const char *pszDescription ) + { + if( pszDescription ) + { + Q_strncpy( m_szDescription, pszDescription, sizeof( m_szDescription ) ); + } + else + { + m_szDescription[ 0 ] = 0; + } + } + +private: + //----------------------------------------------------- + friend class CThreadPool; + + JobStatus_t m_status; + JobPriority_t m_priority; + CThreadMutex m_mutex; + unsigned char m_flags; + char m_iServicingThread; + short m_reserved; + ThreadPoolData_t m_ThreadPoolData; + IThreadPool * m_pThreadPool; + CThreadEvent m_CompleteEvent; + char m_szDescription[ 32 ]; + +private: + //----------------------------------------------------- + CJob( const CJob &fromRequest ); + void operator=(const CJob &fromRequest ); + + virtual JobStatus_t DoExecute() = 0; + virtual JobStatus_t DoAbort( bool bDiscard ) { return JOB_STATUS_ABORTED; } + virtual void DoCleanup() {} +}; + +//----------------------------------------------------------------------------- + +class CFunctorJob : public CJob +{ +public: + CFunctorJob( CFunctor *pFunctor, const char *pszDescription = NULL ) + : m_pFunctor( pFunctor ) + { + if ( pszDescription ) + { + Q_strncpy( m_szDescription, pszDescription, sizeof(m_szDescription) ); + } + else + { + m_szDescription[0] = 0; + } + } + + virtual JobStatus_t DoExecute() + { + (*m_pFunctor)(); + return JOB_OK; + } + + const char *Describe() + { + return m_szDescription; + } + +private: + CRefPtr<CFunctor> m_pFunctor; + char m_szDescription[16]; +}; + +//----------------------------------------------------------------------------- +// Utility for managing multiple jobs +//----------------------------------------------------------------------------- + +class CJobSet +{ +public: + CJobSet( CJob *pJob = NULL ) + { + if ( pJob ) + { + m_jobs.AddToTail( pJob ); + } + } + + CJobSet( CJob **ppJobs, int nJobs ) + { + if ( ppJobs ) + { + m_jobs.AddMultipleToTail( nJobs, ppJobs ); + } + } + + ~CJobSet() + { + for ( int i = 0; i < m_jobs.Count(); i++ ) + { + m_jobs[i]->Release(); + } + } + + void operator+=( CJob *pJob ) + { + m_jobs.AddToTail( pJob ); + } + + void operator-=( CJob *pJob ) + { + m_jobs.FindAndRemove( pJob ); + } + + void Execute( bool bRelease = true ) + { + for ( int i = 0; i < m_jobs.Count(); i++ ) + { + m_jobs[i]->Execute(); + if ( bRelease ) + { + m_jobs[i]->Release(); + } + } + + if ( bRelease ) + { + m_jobs.RemoveAll(); + } + } + + void Abort( bool bRelease = true ) + { + for ( int i = 0; i < m_jobs.Count(); i++ ) + { + m_jobs[i]->Abort(); + if ( bRelease ) + { + m_jobs[i]->Release(); + } + } + + if ( bRelease ) + { + m_jobs.RemoveAll(); + } + } + + void WaitForFinish( bool bRelease = true ) + { + for ( int i = 0; i < m_jobs.Count(); i++ ) + { + m_jobs[i]->WaitForFinish(); + if ( bRelease ) + { + m_jobs[i]->Release(); + } + } + + if ( bRelease ) + { + m_jobs.RemoveAll(); + } + } + + void WaitForFinish( IThreadPool *pPool, bool bRelease = true ) + { + pPool->YieldWait( m_jobs.Base(), m_jobs.Count() ); + + if ( bRelease ) + { + for ( int i = 0; i < m_jobs.Count(); i++ ) + { + m_jobs[i]->Release(); + } + + m_jobs.RemoveAll(); + } + } + +private: + CUtlVectorFixed<CJob *, 16> m_jobs; +}; + +//----------------------------------------------------------------------------- +// Job helpers +//----------------------------------------------------------------------------- + +#define ThreadExecute g_pThreadPool->QueueCall +#define ThreadExecuteRef g_pThreadPool->QueueRefCall + +#define BeginExecuteParallel() do { CJobSet jobSet +#define EndExecuteParallel() jobSet.WaitForFinish( g_pThreadPool ); } while (0) + +#define ExecuteParallel jobSet += g_pThreadPool->QueueCall +#define ExecuteRefParallel jobSet += g_pThreadPool->QueueCallRef + + +//----------------------------------------------------------------------------- +// Work splitting: array split, best when cost per item is roughly equal +//----------------------------------------------------------------------------- + +#ifdef _MSC_VER +#pragma warning(push) +#pragma warning(disable:4389) +#pragma warning(disable:4018) +#pragma warning(disable:4701) +#endif + +#define DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL(N) \ + template <typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N, typename ITERTYPE1, typename ITERTYPE2> \ + void IterRangeParallel(FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( ITERTYPE1, ITERTYPE2 FUNC_ADDL_TEMPLATE_FUNC_PARAMS_##N ), ITERTYPE1 from, ITERTYPE2 to FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + const int MAX_THREADS = 16; \ + int nIdle = g_pThreadPool->NumIdleThreads(); \ + ITERTYPE1 range = to - from; \ + int nThreads = MIN( nIdle + 1, range ); \ + if ( nThreads > MAX_THREADS ) \ + { \ + nThreads = MAX_THREADS; \ + } \ + if ( nThreads < 2 ) \ + { \ + FunctorDirectCall( pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \ + } \ + else \ + { \ + ITERTYPE1 nIncrement = range / nThreads; \ + \ + CJobSet jobSet; \ + while ( --nThreads ) \ + { \ + ITERTYPE2 thisTo = from + nIncrement; \ + jobSet += g_pThreadPool->AddCall( pfnProxied, from, thisTo FUNC_FUNCTOR_CALL_ARGS_##N ); \ + from = thisTo; \ + } \ + FunctorDirectCall( pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \ + jobSet.WaitForFinish( g_pThreadPool ); \ + } \ + \ + } + +FUNC_GENERATE_ALL( DEFINE_NON_MEMBER_ITER_RANGE_PARALLEL ); + +#define DEFINE_MEMBER_ITER_RANGE_PARALLEL(N) \ + template <typename OBJECT_TYPE, typename FUNCTION_CLASS, typename FUNCTION_RETTYPE FUNC_TEMPLATE_FUNC_PARAMS_##N FUNC_TEMPLATE_ARG_PARAMS_##N, typename ITERTYPE1, typename ITERTYPE2> \ + void IterRangeParallel(OBJECT_TYPE *pObject, FUNCTION_RETTYPE ( FUNCTION_CLASS::*pfnProxied )( ITERTYPE1, ITERTYPE2 FUNC_ADDL_TEMPLATE_FUNC_PARAMS_##N ), ITERTYPE1 from, ITERTYPE2 to FUNC_ARG_FORMAL_PARAMS_##N ) \ + { \ + const int MAX_THREADS = 16; \ + int nIdle = g_pThreadPool->NumIdleThreads(); \ + ITERTYPE1 range = to - from; \ + int nThreads = MIN( nIdle + 1, range ); \ + if ( nThreads > MAX_THREADS ) \ + { \ + nThreads = MAX_THREADS; \ + } \ + if ( nThreads < 2 ) \ + { \ + FunctorDirectCall( pObject, pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \ + } \ + else \ + { \ + ITERTYPE1 nIncrement = range / nThreads; \ + \ + CJobSet jobSet; \ + while ( --nThreads ) \ + { \ + ITERTYPE2 thisTo = from + nIncrement; \ + jobSet += g_pThreadPool->AddCall( pObject, pfnProxied, from, thisTo FUNC_FUNCTOR_CALL_ARGS_##N ); \ + from = thisTo; \ + } \ + FunctorDirectCall( pObject, pfnProxied, from, to FUNC_FUNCTOR_CALL_ARGS_##N ); \ + jobSet.WaitForFinish( g_pThreadPool ); \ + } \ + \ + } + +FUNC_GENERATE_ALL( DEFINE_MEMBER_ITER_RANGE_PARALLEL ); + +//----------------------------------------------------------------------------- +// Work splitting: competitive, best when cost per item varies a lot +//----------------------------------------------------------------------------- + +template <typename T> +class CJobItemProcessor +{ +public: + typedef T ItemType_t; + void Begin() {} + // void Process( ItemType_t & ) {} + void End() {} +}; + +template <typename T> +class CFuncJobItemProcessor : public CJobItemProcessor<T> +{ +public: + void Init(void (*pfnProcess)( T & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL ) + { + m_pfnProcess = pfnProcess; + m_pfnBegin = pfnBegin; + m_pfnEnd = pfnEnd; + } + + //CFuncJobItemProcessor(OBJECT_TYPE_PTR pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL ); + void Begin() { if ( m_pfnBegin ) (*m_pfnBegin)(); } + void Process( T &item ) { (*m_pfnProcess)( item ); } + void End() { if ( m_pfnEnd ) (*m_pfnEnd)(); } + +protected: + void (*m_pfnProcess)( T & ); + void (*m_pfnBegin)(); + void (*m_pfnEnd)(); +}; + +template <typename T, class OBJECT_TYPE, class FUNCTION_CLASS = OBJECT_TYPE > +class CMemberFuncJobItemProcessor : public CJobItemProcessor<T> +{ +public: + void Init( OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( T & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL ) + { + m_pObject = pObject; + m_pfnProcess = pfnProcess; + m_pfnBegin = pfnBegin; + m_pfnEnd = pfnEnd; + } + + void Begin() { if ( m_pfnBegin ) ((*m_pObject).*m_pfnBegin)(); } + void Process( T &item ) { ((*m_pObject).*m_pfnProcess)( item ); } + void End() { if ( m_pfnEnd ) ((*m_pObject).*m_pfnEnd)(); } + +protected: + OBJECT_TYPE *m_pObject; + void (FUNCTION_CLASS::*m_pfnProcess)( T & ); + void (FUNCTION_CLASS::*m_pfnBegin)(); + void (FUNCTION_CLASS::*m_pfnEnd)(); +}; + +template <typename ITEM_TYPE, class ITEM_PROCESSOR_TYPE> +class CParallelProcessor +{ +public: + CParallelProcessor( const char *pszDescription ) + { + m_pItems = m_pLimit= 0; + m_szDescription = pszDescription; + } + + void Run( ITEM_TYPE *pItems, unsigned nItems, int nMaxParallel = INT_MAX, IThreadPool *pThreadPool = NULL ) + { + tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "Run %s %d", m_szDescription, nItems ); + + if ( nItems == 0 ) + return; + + if ( !pThreadPool ) + { + pThreadPool = g_pThreadPool; + } + + m_pItems = pItems; + m_pLimit = pItems + nItems; + + int nJobs = nItems - 1; + + if ( nJobs > nMaxParallel ) + { + nJobs = nMaxParallel; + } + + if (! pThreadPool ) // only possible on linux + { + DoExecute( ); + return; + } + + int nThreads = pThreadPool->NumThreads(); + if ( nJobs > nThreads ) + { + nJobs = nThreads; + } + + if ( nJobs > 1 ) + { + CJob **jobs = (CJob **)stackalloc( nJobs * sizeof(CJob **) ); + int i = nJobs; + + while( i-- ) + { + jobs[i] = pThreadPool->QueueCall( this, &CParallelProcessor<ITEM_TYPE, ITEM_PROCESSOR_TYPE>::DoExecute ); + jobs[i]->SetDescription( m_szDescription ); + } + + DoExecute(); + + for ( i = 0; i < nJobs; i++ ) + { + jobs[i]->Abort(); // will either abort ones that never got a thread, or noop on ones that did + jobs[i]->Release(); + } + } + else + { + DoExecute(); + } + } + + ITEM_PROCESSOR_TYPE m_ItemProcessor; + +private: + void DoExecute() + { + tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "DoExecute %s", m_szDescription ); + + if ( m_pItems < m_pLimit ) + { + m_ItemProcessor.Begin(); + + ITEM_TYPE *pLimit = m_pLimit; + + for (;;) + { + ITEM_TYPE *pCurrent = m_pItems++; + if ( pCurrent < pLimit ) + { + m_ItemProcessor.Process( *pCurrent ); + } + else + { + break; + } + } + + m_ItemProcessor.End(); + } + } + CInterlockedPtr<ITEM_TYPE> m_pItems; + ITEM_TYPE * m_pLimit; + const char * m_szDescription; +}; + +template <typename ITEM_TYPE> +inline void ParallelProcess( const char *pszDescription, ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX ) +{ + CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor( pszDescription ); + processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd ); + processor.Run( pItems, nItems, nMaxParallel ); + +} + +template <typename ITEM_TYPE, typename OBJECT_TYPE, typename FUNCTION_CLASS > +inline void ParallelProcess( const char *pszDescription, ITEM_TYPE *pItems, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( ITEM_TYPE & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX ) +{ + CParallelProcessor<ITEM_TYPE, CMemberFuncJobItemProcessor<ITEM_TYPE, OBJECT_TYPE, FUNCTION_CLASS> > processor( pszDescription ); + processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd ); + processor.Run( pItems, nItems, nMaxParallel ); +} + +// Parallel Process that lets you specify threadpool +template <typename ITEM_TYPE> +inline void ParallelProcess( const char *pszDescription, IThreadPool *pPool, ITEM_TYPE *pItems, unsigned nItems, void (*pfnProcess)( ITEM_TYPE & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX ) +{ + CParallelProcessor<ITEM_TYPE, CFuncJobItemProcessor<ITEM_TYPE> > processor( pszDescription ); + processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd ); + processor.Run( pItems, nItems, nMaxParallel, pPool ); +} + + +template <class ITEM_PROCESSOR_TYPE> +class CParallelLoopProcessor +{ +public: + CParallelLoopProcessor( const char *pszDescription ) + { + m_lIndex = m_lLimit= 0; + m_nActive = 0; + m_szDescription = pszDescription; + } + + void Run( long lBegin, long nItems, int nMaxParallel = INT_MAX ) + { + if ( nItems ) + { + m_lIndex = lBegin; + m_lLimit = lBegin + nItems; + int i = g_pThreadPool->NumIdleThreads(); + + if ( nMaxParallel < i) + { + i = nMaxParallel; + } + + while( i-- ) + { + ++m_nActive; + ThreadExecute( this, &CParallelLoopProcessor<ITEM_PROCESSOR_TYPE>::DoExecute )->Release(); + } + + ++m_nActive; + DoExecute(); + + while ( m_nActive ) + { + ThreadPause(); + } + } + } + + ITEM_PROCESSOR_TYPE m_ItemProcessor; + +private: + void DoExecute() + { + tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "DoExecute %s", m_szDescription ); + + m_ItemProcessor.Begin(); + + long lLimit = m_lLimit; + + for (;;) + { + long lIndex = m_lIndex ++; + if ( lIndex < lLimit ) + { + m_ItemProcessor.Process( lIndex ); + } + else + { + break; + } + } + + m_ItemProcessor.End(); + + --m_nActive; + } + CInterlockedInt m_lIndex; + long m_lLimit; + CInterlockedInt m_nActive; + const char * m_szDescription; +}; + +inline void ParallelLoopProcess( const char *szDescription, long lBegin, unsigned nItems, void (*pfnProcess)( long const & ), void (*pfnBegin)() = NULL, void (*pfnEnd)() = NULL, int nMaxParallel = INT_MAX ) +{ + CParallelLoopProcessor< CFuncJobItemProcessor< long const > > processor( szDescription ); + processor.m_ItemProcessor.Init( pfnProcess, pfnBegin, pfnEnd ); + processor.Run( lBegin, nItems, nMaxParallel ); + +} + +template < typename OBJECT_TYPE, typename FUNCTION_CLASS > +inline void ParallelLoopProcess( const char *szDescription, long lBegin, unsigned nItems, OBJECT_TYPE *pObject, void (FUNCTION_CLASS::*pfnProcess)( long const & ), void (FUNCTION_CLASS::*pfnBegin)() = NULL, void (FUNCTION_CLASS::*pfnEnd)() = NULL, int nMaxParallel = INT_MAX ) +{ + CParallelLoopProcessor< CMemberFuncJobItemProcessor<long const, OBJECT_TYPE, FUNCTION_CLASS> > processor( szDescription ); + processor.m_ItemProcessor.Init( pObject, pfnProcess, pfnBegin, pfnEnd ); + processor.Run( lBegin, nItems, nMaxParallel ); +} + + +template <class Derived> +class CParallelProcessorBase +{ +protected: + typedef CParallelProcessorBase<Derived> ThisParallelProcessorBase_t; + typedef Derived ThisParallelProcessorDerived_t; + +public: + CParallelProcessorBase() + { + m_nActive = 0; + m_szDescription = NULL; + } + void SetDescription( const char *pszDescription ) + { + m_szDescription = pszDescription; + } + +protected: + void Run( int nMaxParallel = INT_MAX, int threadOverride = -1 ) + { + int i = g_pThreadPool->NumIdleThreads(); + + if ( nMaxParallel < i) + { + i = nMaxParallel; + } + + while( i -- > 0 ) + { + if ( threadOverride == -1 || i == threadOverride - 1 ) + { + ++ m_nActive; + ThreadExecute( this, &ThisParallelProcessorBase_t::DoExecute )->Release(); + } + } + + if ( threadOverride == -1 || threadOverride == 0 ) + { + ++ m_nActive; + DoExecute(); + } + + while ( m_nActive ) + { + ThreadPause(); + } + } + +protected: + void OnBegin() {} + bool OnProcess() { return false; } + void OnEnd() {} + +private: + void DoExecute() + { + tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "DoExecute %s", m_szDescription ); + + static_cast<Derived *>( this )->OnBegin(); + + while ( static_cast<Derived *>( this )->OnProcess() ) + continue; + + static_cast<Derived *>(this)->OnEnd(); + + -- m_nActive; + } + + CInterlockedInt m_nActive; + const char * m_szDescription; +}; + + + + +//----------------------------------------------------------------------------- +// Raw thread launching +//----------------------------------------------------------------------------- + +inline unsigned FunctorExecuteThread( void *pParam ) +{ + CFunctor *pFunctor = (CFunctor *)pParam; + (*pFunctor)(); + pFunctor->Release(); + return 0; +} + +inline ThreadHandle_t ThreadExecuteSoloImpl( CFunctor *pFunctor, const char *pszName = NULL ) +{ + ThreadHandle_t hThread; + ThreadId_t threadId; + hThread = CreateSimpleThread( FunctorExecuteThread, pFunctor, &threadId ); + if ( pszName ) + { + ThreadSetDebugName( threadId, pszName ); + } + return hThread; +} + +inline ThreadHandle_t ThreadExecuteSolo( CJob *pJob ) { return ThreadExecuteSoloImpl( CreateFunctor( pJob, &CJob::Execute ), pJob->Describe() ); } + +template <typename T1> +inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1 ), pszName ); } + +template <typename T1, typename T2> +inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2 ), pszName ); } + +template <typename T1, typename T2, typename T3> +inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4> +inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4, typename T5> +inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6> +inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5, a6 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7> +inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5, a6, a7 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8> +inline ThreadHandle_t ThreadExecuteSolo( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7, T8 a8 ) { return ThreadExecuteSoloImpl( CreateFunctor( a1, a2, a3, a4, a5, a6, a7, a8 ), pszName ); } + +template <typename T1, typename T2> +inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2 ), pszName ); } + +template <typename T1, typename T2, typename T3> +inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4> +inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4, typename T5> +inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6> +inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7> +inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6, a7 ), pszName ); } + +template <typename T1, typename T2, typename T3, typename T4, typename T5, typename T6, typename T7, typename T8> +inline ThreadHandle_t ThreadExecuteSoloRef( const char *pszName, T1 a1, T2 a2, T3 a3, T4 a4, T5 a5, T6 a6, T7 a7, T8 a8 ) { return ThreadExecuteSoloImpl( CreateRefCountingFunctor(a1, a2, a3, a4, a5, a6, a7, a8 ), pszName ); } + +//----------------------------------------------------------------------------- + +inline bool IThreadPool::YieldWait( CThreadEvent &event, unsigned timeout ) +{ + CThreadEvent *pEvent = &event; + return ( YieldWait( &pEvent, 1, true, timeout ) != TW_TIMEOUT ); +} + +inline bool IThreadPool::YieldWait( CJob *pJob, unsigned timeout ) +{ + return ( YieldWait( &pJob, 1, true, timeout ) != TW_TIMEOUT ); +} + +//----------------------------------------------------------------------------- + +inline JobStatus_t CJob::Execute() +{ + if ( IsFinished() ) + { + return m_status; + } + + tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s %s %d", __FUNCTION__, Describe(), m_status ); + + AUTO_LOCK( m_mutex ); + AddRef(); + + JobStatus_t result; + + switch ( m_status ) + { + case JOB_STATUS_UNSERVICED: + case JOB_STATUS_PENDING: + { + // Service it + m_status = JOB_STATUS_INPROGRESS; + result = m_status = DoExecute(); + DoCleanup(); + m_CompleteEvent.Set(); + break; + } + + case JOB_STATUS_INPROGRESS: + AssertMsg(0, "Mutex Should have protected use while processing"); + // fall through... + + case JOB_OK: + case JOB_STATUS_ABORTED: + result = m_status; + break; + + default: + AssertMsg( m_status < JOB_OK, "Unknown job state"); + result = m_status; + } + + Release(); + + return result; +} + + +//--------------------------------------------------------- + +inline JobStatus_t CJob::TryExecute() +{ + tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s %s %d", __FUNCTION__, Describe(), m_status ); + + // TryLock() would only fail if another thread has entered + // Execute() or Abort() + if ( !IsFinished() && TryLock() ) + { + // ...service the request + Execute(); + Unlock(); + } + return m_status; +} + +//--------------------------------------------------------- + +inline JobStatus_t CJob::Abort( bool bDiscard ) +{ + if ( IsFinished() ) + { + return m_status; + } + + tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "%s %s %d", __FUNCTION__, Describe(), m_status ); + + AUTO_LOCK( m_mutex ); + AddRef(); + + JobStatus_t result; + + switch ( m_status ) + { + case JOB_STATUS_UNSERVICED: + case JOB_STATUS_PENDING: + { + tmZone( TELEMETRY_LEVEL0, TMZF_NONE, "CJob::DoAbort" ); + + result = m_status = DoAbort( bDiscard ); + if ( bDiscard ) + DoCleanup(); + m_CompleteEvent.Set(); + } + break; + + case JOB_STATUS_ABORTED: + case JOB_STATUS_INPROGRESS: + case JOB_OK: + result = m_status; + break; + + default: + AssertMsg( m_status < JOB_OK, "Unknown job state"); + result = m_status; + } + + Release(); + + return result; +} + +//----------------------------------------------------------------------------- + +#endif // JOBTHREAD_H |