diff options
| author | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
|---|---|---|
| committer | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
| commit | 3bf9df6b2785fa6d951086978a3e66f49427166a (patch) | |
| tree | 2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /public/gcsdk/workthreadpool.h | |
| download | archived-source-engine-2018-hl2-src-master.tar.xz archived-source-engine-2018-hl2-src-master.zip | |
Diffstat (limited to 'public/gcsdk/workthreadpool.h')
| -rw-r--r-- | public/gcsdk/workthreadpool.h | 386 |
1 files changed, 386 insertions, 0 deletions
diff --git a/public/gcsdk/workthreadpool.h b/public/gcsdk/workthreadpool.h new file mode 100644 index 0000000..feaa3f4 --- /dev/null +++ b/public/gcsdk/workthreadpool.h @@ -0,0 +1,386 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: A thread pool implementation. You give it CWorkItems, +// it processes them asynchronously, and hands them back to you when they've +// been completed. +// +// To declare a queue, provide the implementation of a CWorkItem subtype, +// the thread name prefix for threads in the pool, and the number of work +// threads you want. +// +// CNet uses this class to offload encryption to a separate thread, +// so that's a good place to start looking for usage examples. +// +//============================================================================= + +#ifndef WORKTHREADPOOL_H +#define WORKTHREADPOOL_H +#ifdef _WIN32 +#pragma once +#endif + +#include <refcount.h> +#include <reliabletimer.h> +#include "jobtime.h" + +// forward declaration for CTSQueue which we can't statically allocate as our member +// because of alignment issues on Win64 +template <class T, bool bTestOptimizer> +class CTSQueue; + +namespace GCSDK { + +// forward declarations +class CWorkThread; +class CJobMgr; + + +// these functions return pointers to fixed string in the code section. We need this for VPROF nodes +#define DECLARE_WORK_ITEM( classname ) \ + virtual const char* GetDispatchCompletedName() const { return #classname"::DispatchCompleted"; } \ + virtual const char* GetThreadProcessName() const { return #classname"::ThreadProcess"; } + + +//----------------------------------------------------------------------------- +// Purpose: Work item base class. Derive from this for specific work item types. +// The derived type ideally should be self-contained with all data it +// needs to perform the work. +//----------------------------------------------------------------------------- +class CWorkItem : public CRefCount +{ +public: + CWorkItem() + : m_JobID( k_GIDNil ), + m_bRunning( false ), + m_bResubmit( false ), + m_bCanceled( false ), + m_ulSequenceNumber( 0 ) + { + m_jobtimeTimeout.SetLTime( 0 ); + m_jobtimeQueued.SetToJobTime(); + } + + CWorkItem( JobID_t jobID ) + : m_JobID( jobID ), + m_bRunning( false ), + m_bResubmit( false ), + m_bCanceled( false ), + m_ulSequenceNumber( 0 ) + { + m_jobtimeTimeout.SetLTime( 0 ); + m_jobtimeQueued.SetToJobTime(); + } + + CWorkItem( JobID_t jobID, int64 cTimeoutMicroseconds ) + : m_JobID( jobID ), + m_bRunning( false ), + m_bResubmit( false ), + m_bCanceled( false ), + m_ulSequenceNumber( 0 ) + { + SetPreExecuteTimeout( cTimeoutMicroseconds ); + m_jobtimeQueued.SetToJobTime(); + } + + void SetJobID( JobID_t jobID ) + { + Assert(jobID != k_GIDNil) ; + m_JobID = jobID; + } + JobID_t GetJobID() const { return m_JobID; } + + bool HasTimedOut() const { return m_jobtimeTimeout.LTime() != 0 && m_jobtimeTimeout.CServerMicroSecsPassed() > 0; } + int64 WaitingTime() const { return m_jobtimeQueued.CServerMicroSecsPassed(); } + void SetPreExecuteTimeout( int64 cMicroSeconds ) { m_jobtimeTimeout.SetFromJobTime( cMicroSeconds ); } + bool BPreExecuteTimeoutSet( ) const { return m_jobtimeTimeout.LTime() != 0; } + void ForceTimeOut() { m_jobtimeTimeout.SetFromJobTime( -1 );} + bool BIsRunning() const { return m_bRunning; } // true if running right now + bool WasCancelled() const { return m_bCanceled; } + void SetCycleCount( CCycleCount& cycleCount ) { m_CycleCount = cycleCount ; } + CCycleCount GetCycleCount() { return m_CycleCount; } + uint64 GetSequenceNumber() { return m_ulSequenceNumber; } + + // Work threads can call this to force a work item to be reprocessed (added to the end of the process queue) + void SetResubmit( bool bResubmit ) { m_bResubmit = bResubmit; } + + // these functions return pointers to fixed string in the code section. + // We need this for VPROF nodes, you must use the DECLARE_WORK_ITEM macro + virtual const char* GetDispatchCompletedName() const = 0; + virtual const char* GetThreadProcessName() const = 0; + + // Return false if your operation failed in some way that you would want to know about + // The CWorkThreadPool will count the failures. + virtual bool ThreadProcess( CWorkThread *pThread ) = 0; // called by the worker thread + virtual bool DispatchCompletedWorkItem( CJobMgr *jobMgr ); // called by main loop after item completed + +#ifdef DBGFLAG_VALIDATE + virtual void Validate( CValidator &validator, const char *pchName ) {} // Validate our internal structures +#endif + +protected: + // note: destructor is private. This is a ref-counted object, private destructor ensures callers can't accidentally delete + // directly, or declare on stack + virtual ~CWorkItem() { } + + friend class CWorkThread; + friend class CWorkThreadPool; + uint64 m_ulSequenceNumber; // Sequence number for the work item, used when enforcing output ordering as matching input order + CCycleCount m_CycleCount; // A record of how long it took to execute this particular work item ! + +private: + bool m_bResubmit; // true if the item should be resubmitted after last run + volatile bool m_bRunning; // true if the work item is running right now + bool m_bCanceled; // true if the work was canceled due to timeout + CJobTime m_jobtimeTimeout; // time at which this result is no longer valid, so it shouldn't start to be processed + CJobTime m_jobtimeQueued; + JobID_t m_JobID; +}; + +// forward decl +class CWorkThreadPool; + +//----------------------------------------------------------------------------- +// Purpose: Generic work thread implementation, to be specialized if necessary +//----------------------------------------------------------------------------- +class CWorkThread : public CThread +{ +public: + CWorkThread( CWorkThreadPool *pThreadPool ); + CWorkThread( CWorkThreadPool *pThreadPool, const char *pszName ); + + virtual ~CWorkThread() + { + } + + virtual int Run(); + + virtual void Cancel() + { + } + +protected: + CWorkThreadPool *m_pThreadPool; // parent pool + volatile bool m_bExitThread; // set by CWorkThreadPool::StopWorkerThreads and possibly by subclasses of CWorkThread + volatile bool m_bFinished; // set by CWorkThread::Run [note: must still check IsThreadRunning, and/or call Join] + virtual void OnStart() { } + virtual void OnExit() { } + +#ifdef DBGFLAG_VALIDATE +public: + virtual void Validate( CValidator &validator, const char *pchName ) + { + VALIDATE_SCOPE(); + }; +#endif // DBGFLAG_VALIDATE + +friend class CWorkThreadPool; +}; + + +//----------------------------------------------------------------------------- +// callback class to create work threads +//----------------------------------------------------------------------------- +class IWorkThreadFactory +{ +public: + virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool ) = 0; +}; + + +//----------------------------------------------------------------------------- +// reusable trivial implementation of IWorkThreadFactory +//----------------------------------------------------------------------------- +template<class T> +class CWorkThreadFactory : public IWorkThreadFactory +{ +public: + virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool ) + { + return new T( pWorkThreadPool ); + } +}; + + +//----------------------------------------------------------------------------- +// Purpose: interface class for object that the WorkThreadPool can signal when +// there are completed work items to process +//----------------------------------------------------------------------------- +class IWorkThreadPoolSignal +{ +public: + virtual void Signal() = 0; +}; + + +//----------------------------------------------------------------------------- +// Purpose: pool of work threads. +//----------------------------------------------------------------------------- +class CWorkThreadPool +{ + friend class CWorkThread; +public: + + static void SetWorkItemCompletedSignal( IWorkThreadPoolSignal *pObject ) + { + sm_pWorkItemsCompletedSignal = pObject; + } + + + CWorkThreadPool( const char *pszThreadNamePfx ); + + // eventually it might be nice to be able to resize these pools via console command + // in that case, we'd want a constructor like this, and a PoolSize accessor/mutator pair + // it makes this class much more complicated, however (growing the pool is easy, shrinking it + // is less easy) so we'll punt for now. + /* CWorkThreadPool( const char *pszName = "unnamed thread" ) : CWorkThreadPool( pszName, -1 ); */ + + virtual ~CWorkThreadPool(); + + // Setting this will ensure that items of the same priority complete and get dispatched in the same order + // they are added to the threadpool. This has a small additional locking overhead and can increase latency + // as items that are actually completed out-of-order have to queue waiting on earlier items. + void SetEnsureOutputOrdering( bool bEnsureOutputOrdering ) { m_bEnsureOutputOrdering = bEnsureOutputOrdering; } + + void AllowTimeouts( bool bMayHaveJobTimeouts ) { m_bMayHaveJobTimeouts = bMayHaveJobTimeouts; } + + int AddWorkThread( CWorkThread *pThread ); + void StartWorkThreads(); // gentlemen, start your engines + void StopWorkThreads(); // stop work threads + bool HasWorkItemsToProcess() const; + + // sets it to use dynamic worker thread construction + // if pWorkThreadControl is NULL, just creates a standard CWorkThread object + void SetWorkThreadAutoConstruct( int cMaxThreads, IWorkThreadFactory *pWorkThreadConstructor ); + + bool AddWorkItem( CWorkItem *pWorkItem ); // add a work item to the queue to process + CWorkItem *GetNextCompletedWorkItem( ); // get next completed work item and it's priority if needed + const char *GetThreadNamePrefix() const { return m_szThreadNamePfx; } + + void SetNeverSetEventOnAdd( bool bNeverSet ); + bool BNeverSetEventOnAdd() { return m_bNeverSetOnAdd; } + + // get count of completed work items + // can't be inline because of m_TSQueueCompleted type + int GetCompletedWorkItemCount() const; + + // get count of work items to process + // can't be inline because of m_TSQueueToProcess type + int GetWorkItemToProcessCount() const; + + uint64 GetLastUsedSequenceNumber( ) const + { + return m_ulLastUsedSequenceNumber; + } + + uint64 GetLastCompletedSequenceNumber( ) const + { + return m_ulLastCompletedSequenceNumber; + } + + uint64 GetLastDispatchedSequenceNumber( ) const + { + return m_ulLastDispatchedSequenceNumber; + } + +#if 0 + uint64 GetAveExecutionTime() const + { + return m_StatExecutionTime.GetUlAvg(); + } + uint64 GetAveWaitTime() const + { + return m_StatWaitTime.GetUlAvg(); + } + uint64 GetCurrentBacklogTime() const; +#endif + + int CountCompletedSuccess() const { return m_cSuccesses; } + int CountRetries() const { return m_cRetries; } + int CountCompletedFailed() const { return m_cFailures; } + + bool BDispatchCompletedWorkItems( const CLimitTimer &limitTimer, CJobMgr *pJobMgr ); + bool BExiting() const { return m_bExiting; } + + int GetWorkerCount() const { return m_WorkThreads.Count(); } + + uint GetActiveThreadCount() const { return m_cActiveThreads; } + + // make sure you lock before using this + const CWorkThread *GetWorkThread( int iIndex ) const + { + Assert( iIndex >= 0 && iIndex < m_WorkThreads.Count() ); + return m_WorkThreads[iIndex]; + } + +protected: + + // STATICS + static IWorkThreadPoolSignal *sm_pWorkItemsCompletedSignal; + + // MEMBERS + CWorkItem *GetNextWorkItemToProcess( ); + void StartWorkThread( CWorkThread *pWorkThread, int iName ); + + // meaningful thread name prefix + char m_szThreadNamePfx[32]; + // have we actually initialized the threadpool? + bool m_bThreadsInitialized; + + // Incoming queue: queue of all work items to process + // must be dynamically allocated for alignment requirements on Win64 + CTSQueue< CWorkItem *, false > *m_pTSQueueToProcess; + + // Outgoing queues: queue of all completed work items + // must be dynamically allocated for alignment requirements on Win64 + CTSQueue< CWorkItem *, false > *m_pTSQueueCompleted; + + // Vectors of completed, but out of order and waiting work items, only used when bEnsureOutputOrdering == true + CThreadMutex m_MutexOnItemCompletedOrdered; + CUtlVector< CWorkItem * > m_vecCompletedAndWaiting; + + // Should we emit work items in the same order they are received (on a per priority basis) + bool m_bEnsureOutputOrdering; + + // Sequence numbers + uint64 m_ulLastUsedSequenceNumber; + uint64 m_ulLastCompletedSequenceNumber; + uint64 m_ulLastDispatchedSequenceNumber; + + bool m_bMayHaveJobTimeouts; + CUtlVector< CWorkThread * > m_WorkThreads; + CThreadMutex m_WorkThreadMutex; + CInterlockedUInt m_cThreadsRunning; // how many threads are running + volatile bool m_bExiting; // are we exiting + CThreadEvent m_EventNewWorkItem; // event set when a new work item is available to process + CInterlockedInt m_cActiveThreads; + volatile bool m_bNeverSetOnAdd; + + bool m_bAutoCreateThreads; + int m_cMaxThreads; + IWorkThreadFactory *m_pWorkThreadConstructor; + + // override this method if you want to do any special handling of completed work items. Default implementation puts + // work items in our completed item queue. + virtual void OnWorkItemCompleted( CWorkItem *pWorkItem ); + + bool BTryDeleteExitedWorkerThreads(); + + int m_cSuccesses; + int m_cFailures; + int m_cRetries; +#if 0 + CStat m_StatExecutionTime; + CStat m_StatWaitTime; +#endif + CLimitTimer m_LimitTimerCreateNewThreads; + +#ifdef DBGFLAG_VALIDATE +public: + void Validate( CValidator &validator, const char *pchName ); +#endif +}; + +} // namespace GCSDK + + +#endif // WORKTHREAD_H |