summaryrefslogtreecommitdiff
path: root/public/gcsdk/workthreadpool.h
diff options
context:
space:
mode:
authorFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
committerFluorescentCIAAfricanAmerican <[email protected]>2020-04-22 12:56:21 -0400
commit3bf9df6b2785fa6d951086978a3e66f49427166a (patch)
tree2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /public/gcsdk/workthreadpool.h
downloadarchived-source-engine-2018-hl2-src-master.tar.xz
archived-source-engine-2018-hl2-src-master.zip
Diffstat (limited to 'public/gcsdk/workthreadpool.h')
-rw-r--r--public/gcsdk/workthreadpool.h386
1 files changed, 386 insertions, 0 deletions
diff --git a/public/gcsdk/workthreadpool.h b/public/gcsdk/workthreadpool.h
new file mode 100644
index 0000000..feaa3f4
--- /dev/null
+++ b/public/gcsdk/workthreadpool.h
@@ -0,0 +1,386 @@
+//========= Copyright Valve Corporation, All rights reserved. ============//
+//
+// Purpose: A thread pool implementation. You give it CWorkItems,
+// it processes them asynchronously, and hands them back to you when they've
+// been completed.
+//
+// To declare a queue, provide the implementation of a CWorkItem subtype,
+// the thread name prefix for threads in the pool, and the number of work
+// threads you want.
+//
+// CNet uses this class to offload encryption to a separate thread,
+// so that's a good place to start looking for usage examples.
+//
+//=============================================================================
+
+#ifndef WORKTHREADPOOL_H
+#define WORKTHREADPOOL_H
+#ifdef _WIN32
+#pragma once
+#endif
+
+#include <refcount.h>
+#include <reliabletimer.h>
+#include "jobtime.h"
+
+// forward declaration for CTSQueue which we can't statically allocate as our member
+// because of alignment issues on Win64
+template <class T, bool bTestOptimizer>
+class CTSQueue;
+
+namespace GCSDK {
+
+// forward declarations
+class CWorkThread;
+class CJobMgr;
+
+
+// these functions return pointers to fixed string in the code section. We need this for VPROF nodes
+#define DECLARE_WORK_ITEM( classname ) \
+ virtual const char* GetDispatchCompletedName() const { return #classname"::DispatchCompleted"; } \
+ virtual const char* GetThreadProcessName() const { return #classname"::ThreadProcess"; }
+
+
+//-----------------------------------------------------------------------------
+// Purpose: Work item base class. Derive from this for specific work item types.
+// The derived type ideally should be self-contained with all data it
+// needs to perform the work.
+//-----------------------------------------------------------------------------
+class CWorkItem : public CRefCount
+{
+public:
+ CWorkItem()
+ : m_JobID( k_GIDNil ),
+ m_bRunning( false ),
+ m_bResubmit( false ),
+ m_bCanceled( false ),
+ m_ulSequenceNumber( 0 )
+ {
+ m_jobtimeTimeout.SetLTime( 0 );
+ m_jobtimeQueued.SetToJobTime();
+ }
+
+ CWorkItem( JobID_t jobID )
+ : m_JobID( jobID ),
+ m_bRunning( false ),
+ m_bResubmit( false ),
+ m_bCanceled( false ),
+ m_ulSequenceNumber( 0 )
+ {
+ m_jobtimeTimeout.SetLTime( 0 );
+ m_jobtimeQueued.SetToJobTime();
+ }
+
+ CWorkItem( JobID_t jobID, int64 cTimeoutMicroseconds )
+ : m_JobID( jobID ),
+ m_bRunning( false ),
+ m_bResubmit( false ),
+ m_bCanceled( false ),
+ m_ulSequenceNumber( 0 )
+ {
+ SetPreExecuteTimeout( cTimeoutMicroseconds );
+ m_jobtimeQueued.SetToJobTime();
+ }
+
+ void SetJobID( JobID_t jobID )
+ {
+ Assert(jobID != k_GIDNil) ;
+ m_JobID = jobID;
+ }
+ JobID_t GetJobID() const { return m_JobID; }
+
+ bool HasTimedOut() const { return m_jobtimeTimeout.LTime() != 0 && m_jobtimeTimeout.CServerMicroSecsPassed() > 0; }
+ int64 WaitingTime() const { return m_jobtimeQueued.CServerMicroSecsPassed(); }
+ void SetPreExecuteTimeout( int64 cMicroSeconds ) { m_jobtimeTimeout.SetFromJobTime( cMicroSeconds ); }
+ bool BPreExecuteTimeoutSet( ) const { return m_jobtimeTimeout.LTime() != 0; }
+ void ForceTimeOut() { m_jobtimeTimeout.SetFromJobTime( -1 );}
+ bool BIsRunning() const { return m_bRunning; } // true if running right now
+ bool WasCancelled() const { return m_bCanceled; }
+ void SetCycleCount( CCycleCount& cycleCount ) { m_CycleCount = cycleCount ; }
+ CCycleCount GetCycleCount() { return m_CycleCount; }
+ uint64 GetSequenceNumber() { return m_ulSequenceNumber; }
+
+ // Work threads can call this to force a work item to be reprocessed (added to the end of the process queue)
+ void SetResubmit( bool bResubmit ) { m_bResubmit = bResubmit; }
+
+ // these functions return pointers to fixed string in the code section.
+ // We need this for VPROF nodes, you must use the DECLARE_WORK_ITEM macro
+ virtual const char* GetDispatchCompletedName() const = 0;
+ virtual const char* GetThreadProcessName() const = 0;
+
+ // Return false if your operation failed in some way that you would want to know about
+ // The CWorkThreadPool will count the failures.
+ virtual bool ThreadProcess( CWorkThread *pThread ) = 0; // called by the worker thread
+ virtual bool DispatchCompletedWorkItem( CJobMgr *jobMgr ); // called by main loop after item completed
+
+#ifdef DBGFLAG_VALIDATE
+ virtual void Validate( CValidator &validator, const char *pchName ) {} // Validate our internal structures
+#endif
+
+protected:
+ // note: destructor is private. This is a ref-counted object, private destructor ensures callers can't accidentally delete
+ // directly, or declare on stack
+ virtual ~CWorkItem() { }
+
+ friend class CWorkThread;
+ friend class CWorkThreadPool;
+ uint64 m_ulSequenceNumber; // Sequence number for the work item, used when enforcing output ordering as matching input order
+ CCycleCount m_CycleCount; // A record of how long it took to execute this particular work item !
+
+private:
+ bool m_bResubmit; // true if the item should be resubmitted after last run
+ volatile bool m_bRunning; // true if the work item is running right now
+ bool m_bCanceled; // true if the work was canceled due to timeout
+ CJobTime m_jobtimeTimeout; // time at which this result is no longer valid, so it shouldn't start to be processed
+ CJobTime m_jobtimeQueued;
+ JobID_t m_JobID;
+};
+
+// forward decl
+class CWorkThreadPool;
+
+//-----------------------------------------------------------------------------
+// Purpose: Generic work thread implementation, to be specialized if necessary
+//-----------------------------------------------------------------------------
+class CWorkThread : public CThread
+{
+public:
+ CWorkThread( CWorkThreadPool *pThreadPool );
+ CWorkThread( CWorkThreadPool *pThreadPool, const char *pszName );
+
+ virtual ~CWorkThread()
+ {
+ }
+
+ virtual int Run();
+
+ virtual void Cancel()
+ {
+ }
+
+protected:
+ CWorkThreadPool *m_pThreadPool; // parent pool
+ volatile bool m_bExitThread; // set by CWorkThreadPool::StopWorkerThreads and possibly by subclasses of CWorkThread
+ volatile bool m_bFinished; // set by CWorkThread::Run [note: must still check IsThreadRunning, and/or call Join]
+ virtual void OnStart() { }
+ virtual void OnExit() { }
+
+#ifdef DBGFLAG_VALIDATE
+public:
+ virtual void Validate( CValidator &validator, const char *pchName )
+ {
+ VALIDATE_SCOPE();
+ };
+#endif // DBGFLAG_VALIDATE
+
+friend class CWorkThreadPool;
+};
+
+
+//-----------------------------------------------------------------------------
+// callback class to create work threads
+//-----------------------------------------------------------------------------
+class IWorkThreadFactory
+{
+public:
+ virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool ) = 0;
+};
+
+
+//-----------------------------------------------------------------------------
+// reusable trivial implementation of IWorkThreadFactory
+//-----------------------------------------------------------------------------
+template<class T>
+class CWorkThreadFactory : public IWorkThreadFactory
+{
+public:
+ virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool )
+ {
+ return new T( pWorkThreadPool );
+ }
+};
+
+
+//-----------------------------------------------------------------------------
+// Purpose: interface class for object that the WorkThreadPool can signal when
+// there are completed work items to process
+//-----------------------------------------------------------------------------
+class IWorkThreadPoolSignal
+{
+public:
+ virtual void Signal() = 0;
+};
+
+
+//-----------------------------------------------------------------------------
+// Purpose: pool of work threads.
+//-----------------------------------------------------------------------------
+class CWorkThreadPool
+{
+ friend class CWorkThread;
+public:
+
+ static void SetWorkItemCompletedSignal( IWorkThreadPoolSignal *pObject )
+ {
+ sm_pWorkItemsCompletedSignal = pObject;
+ }
+
+
+ CWorkThreadPool( const char *pszThreadNamePfx );
+
+ // eventually it might be nice to be able to resize these pools via console command
+ // in that case, we'd want a constructor like this, and a PoolSize accessor/mutator pair
+ // it makes this class much more complicated, however (growing the pool is easy, shrinking it
+ // is less easy) so we'll punt for now.
+ /* CWorkThreadPool( const char *pszName = "unnamed thread" ) : CWorkThreadPool( pszName, -1 ); */
+
+ virtual ~CWorkThreadPool();
+
+ // Setting this will ensure that items of the same priority complete and get dispatched in the same order
+ // they are added to the threadpool. This has a small additional locking overhead and can increase latency
+ // as items that are actually completed out-of-order have to queue waiting on earlier items.
+ void SetEnsureOutputOrdering( bool bEnsureOutputOrdering ) { m_bEnsureOutputOrdering = bEnsureOutputOrdering; }
+
+ void AllowTimeouts( bool bMayHaveJobTimeouts ) { m_bMayHaveJobTimeouts = bMayHaveJobTimeouts; }
+
+ int AddWorkThread( CWorkThread *pThread );
+ void StartWorkThreads(); // gentlemen, start your engines
+ void StopWorkThreads(); // stop work threads
+ bool HasWorkItemsToProcess() const;
+
+ // sets it to use dynamic worker thread construction
+ // if pWorkThreadControl is NULL, just creates a standard CWorkThread object
+ void SetWorkThreadAutoConstruct( int cMaxThreads, IWorkThreadFactory *pWorkThreadConstructor );
+
+ bool AddWorkItem( CWorkItem *pWorkItem ); // add a work item to the queue to process
+ CWorkItem *GetNextCompletedWorkItem( ); // get next completed work item and it's priority if needed
+ const char *GetThreadNamePrefix() const { return m_szThreadNamePfx; }
+
+ void SetNeverSetEventOnAdd( bool bNeverSet );
+ bool BNeverSetEventOnAdd() { return m_bNeverSetOnAdd; }
+
+ // get count of completed work items
+ // can't be inline because of m_TSQueueCompleted type
+ int GetCompletedWorkItemCount() const;
+
+ // get count of work items to process
+ // can't be inline because of m_TSQueueToProcess type
+ int GetWorkItemToProcessCount() const;
+
+ uint64 GetLastUsedSequenceNumber( ) const
+ {
+ return m_ulLastUsedSequenceNumber;
+ }
+
+ uint64 GetLastCompletedSequenceNumber( ) const
+ {
+ return m_ulLastCompletedSequenceNumber;
+ }
+
+ uint64 GetLastDispatchedSequenceNumber( ) const
+ {
+ return m_ulLastDispatchedSequenceNumber;
+ }
+
+#if 0
+ uint64 GetAveExecutionTime() const
+ {
+ return m_StatExecutionTime.GetUlAvg();
+ }
+ uint64 GetAveWaitTime() const
+ {
+ return m_StatWaitTime.GetUlAvg();
+ }
+ uint64 GetCurrentBacklogTime() const;
+#endif
+
+ int CountCompletedSuccess() const { return m_cSuccesses; }
+ int CountRetries() const { return m_cRetries; }
+ int CountCompletedFailed() const { return m_cFailures; }
+
+ bool BDispatchCompletedWorkItems( const CLimitTimer &limitTimer, CJobMgr *pJobMgr );
+ bool BExiting() const { return m_bExiting; }
+
+ int GetWorkerCount() const { return m_WorkThreads.Count(); }
+
+ uint GetActiveThreadCount() const { return m_cActiveThreads; }
+
+ // make sure you lock before using this
+ const CWorkThread *GetWorkThread( int iIndex ) const
+ {
+ Assert( iIndex >= 0 && iIndex < m_WorkThreads.Count() );
+ return m_WorkThreads[iIndex];
+ }
+
+protected:
+
+ // STATICS
+ static IWorkThreadPoolSignal *sm_pWorkItemsCompletedSignal;
+
+ // MEMBERS
+ CWorkItem *GetNextWorkItemToProcess( );
+ void StartWorkThread( CWorkThread *pWorkThread, int iName );
+
+ // meaningful thread name prefix
+ char m_szThreadNamePfx[32];
+ // have we actually initialized the threadpool?
+ bool m_bThreadsInitialized;
+
+ // Incoming queue: queue of all work items to process
+ // must be dynamically allocated for alignment requirements on Win64
+ CTSQueue< CWorkItem *, false > *m_pTSQueueToProcess;
+
+ // Outgoing queues: queue of all completed work items
+ // must be dynamically allocated for alignment requirements on Win64
+ CTSQueue< CWorkItem *, false > *m_pTSQueueCompleted;
+
+ // Vectors of completed, but out of order and waiting work items, only used when bEnsureOutputOrdering == true
+ CThreadMutex m_MutexOnItemCompletedOrdered;
+ CUtlVector< CWorkItem * > m_vecCompletedAndWaiting;
+
+ // Should we emit work items in the same order they are received (on a per priority basis)
+ bool m_bEnsureOutputOrdering;
+
+ // Sequence numbers
+ uint64 m_ulLastUsedSequenceNumber;
+ uint64 m_ulLastCompletedSequenceNumber;
+ uint64 m_ulLastDispatchedSequenceNumber;
+
+ bool m_bMayHaveJobTimeouts;
+ CUtlVector< CWorkThread * > m_WorkThreads;
+ CThreadMutex m_WorkThreadMutex;
+ CInterlockedUInt m_cThreadsRunning; // how many threads are running
+ volatile bool m_bExiting; // are we exiting
+ CThreadEvent m_EventNewWorkItem; // event set when a new work item is available to process
+ CInterlockedInt m_cActiveThreads;
+ volatile bool m_bNeverSetOnAdd;
+
+ bool m_bAutoCreateThreads;
+ int m_cMaxThreads;
+ IWorkThreadFactory *m_pWorkThreadConstructor;
+
+ // override this method if you want to do any special handling of completed work items. Default implementation puts
+ // work items in our completed item queue.
+ virtual void OnWorkItemCompleted( CWorkItem *pWorkItem );
+
+ bool BTryDeleteExitedWorkerThreads();
+
+ int m_cSuccesses;
+ int m_cFailures;
+ int m_cRetries;
+#if 0
+ CStat m_StatExecutionTime;
+ CStat m_StatWaitTime;
+#endif
+ CLimitTimer m_LimitTimerCreateNewThreads;
+
+#ifdef DBGFLAG_VALIDATE
+public:
+ void Validate( CValidator &validator, const char *pchName );
+#endif
+};
+
+} // namespace GCSDK
+
+
+#endif // WORKTHREAD_H