summaryrefslogtreecommitdiff
path: root/public/gcsdk/jobmgr.h
diff options
context:
space:
mode:
Diffstat (limited to 'public/gcsdk/jobmgr.h')
-rw-r--r--public/gcsdk/jobmgr.h430
1 files changed, 430 insertions, 0 deletions
diff --git a/public/gcsdk/jobmgr.h b/public/gcsdk/jobmgr.h
new file mode 100644
index 0000000..93b6387
--- /dev/null
+++ b/public/gcsdk/jobmgr.h
@@ -0,0 +1,430 @@
+//========= Copyright Valve Corporation, All rights reserved. ============//
+//
+// Purpose:
+//
+// $NoKeywords: $
+//=============================================================================
+
+#ifndef GC_JOBMGR_H
+#define GC_JOBMGR_H
+#ifdef _WIN32
+#pragma once
+#endif
+
+#include "tier0/fasttimer.h"
+#include "tier1/utlpriorityqueue.h"
+#include "job.h"
+#include "workthreadpool.h"
+class GCConVar;
+
+#include "tier0/memdbgon.h"
+
+namespace GCSDK
+{
+
+#if defined(_DEBUG)
+// this is restricted to debug builds due to the performance cost
+// that could be changed by removing the expensive sm_listAllJobs.Find() command
+#define DEBUG_JOB_LIST
+#endif // defined(_DEBUG)
+
+struct JobStats_t
+{
+ uint m_cJobsCurrent;
+ uint m_cJobsTotal;
+ uint m_cJobsFailed;
+ uint64 m_cJobsTimedOut; // # of jobs timed out ever
+ double m_flSumJobTimeMicrosec;
+ double m_flSumSqJobTimeMicrosec;
+ uint64 m_unMaxJobTimeMicrosec;
+
+ uint m_cTimeslices;
+
+ JobStats_t()
+ {
+ memset( this, 0, sizeof(JobStats_t) );
+ }
+};
+
+struct JobStatsBucket_t
+{
+ JobStatsBucket_t()
+ {
+ memset( this, 0, sizeof(JobStatsBucket_t) );
+ }
+ char m_rgchName[64];
+ uint64 m_cCompletes;
+ uint64 m_u64RunTimeMax;
+ uint64 m_cTimeoutNetMsg;
+ uint64 m_cLongInterYieldTime;
+ uint64 m_cLocksAttempted;
+ uint64 m_cLocksWaitedFor;
+ uint64 m_cLocksFailed;
+ uint64 m_cLocksLongHeld;
+ uint64 m_cLocksLongWait;
+ uint64 m_cWaitTimeout;
+ uint64 m_u64JobDuration;
+ uint64 m_cJobsPaused;
+ uint64 m_cJobsFailed;
+ uint64 m_u64RunTime;
+ // use by ListJobs
+ uint64 m_cPauseReasonNetworkMsg;
+ uint64 m_cPauseReasonSleepForTime;
+ uint64 m_cPauseReasonWaitingForLock;
+ uint64 m_cPauseReasonYield;
+ uint64 m_cPauseReasonSQL;
+ uint64 m_cPauseReasonWorkItem;
+
+#ifdef DBGFLAG_VALIDATE
+ void Validate( CValidator &validator, const char *pchName )
+ {
+ VALIDATE_SCOPE();
+ }
+#endif
+};
+
+enum EJobProfileAction
+{
+ k_EJobProfileAction_ErrorReport = 0,
+ k_EJobProfileAction_Start = 1,
+ k_EJobProfileAction_Stop = 2,
+ k_EJobProfileAction_Dump = 3,
+ k_EJobProfileAction_Clear = 4,
+};
+
+enum EJobProfileSortOrder
+{
+ k_EJobProfileSortOrder_Alpha = 0,
+ k_EJobProfileSortOrder_Count = 1,
+ k_EJobProfileSortOrder_TotalRuntime = 2,
+};
+
+struct JobProfileStats_t
+{
+ int m_iJobProfileSort;
+ CUtlMap< uint32, JobStatsBucket_t, int > *pmapStatsBucket;
+};
+
+//-----------------------------------------------------------------------------
+// Purpose: This keeps track of all jobs that belong to a given hub.
+// It's primarily used for routing incoming messages to jobs.
+//-----------------------------------------------------------------------------
+class CJobMgr
+{
+public:
+ // Constructors & destructors
+ CJobMgr();
+ ~CJobMgr();
+
+ // gets the next available job ID
+ JobID_t GetNewJobID();
+
+ // Set the thread count for the internal thread pool(s)
+ void SetThreadPoolSize( uint cThreads );
+
+ // Run any sleeping jobs who's wakeup time has arrived and check for timeouts
+ bool BFrameFuncRunSleepingJobs( CLimitTimer &limitTimer );
+
+ // Run any yielding jobs, even low priority ones
+ bool BFrameFuncRunYieldingJobs( CLimitTimer &limitTimer );
+
+ // Route this message to an existing Job, or create a new one if that JobID does not exist
+ bool BRouteMsgToJob( void *pParent, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo );
+
+ // Adds a new Job to the mgr and generates a JobID for it.
+ void InsertJob( CJob &job );
+
+ // Removes a Job from the mgr (the caller is still responsible for freeing it)
+ void RemoveJob( CJob &job );
+
+ //called by a job that has just been started to place itself on the yield queue instead of running
+ void AddDelayedJobToYieldList( CJob &job );
+
+#ifdef GC
+ // resumes the specified job if it is, in fact, waiting for a SQL query to return
+ bool BResumeSQLJob( JobID_t jobID );
+
+ // yields waiting for a query response
+ bool BYieldingRunQuery( CJob &job, CGCSQLQueryGroup *pQueryGroup, ESchemaCatalog eSchemaCatalog );
+
+ // SQL profiling
+ enum ESQLProfileSort
+ {
+ k_ESQLProfileSortTotalTime,
+ k_ESQLProfileSortTotalCount,
+ k_ESQLProfileSortAvgTime,
+ k_ESQLProfileSortName
+ };
+
+ void StartSQLProfiling();
+ void StopSQLProfiling();
+ void DumpSQLProfile( ESQLProfileSort eSort );
+#endif
+
+ // returns true if we're running any jobs of the specified name
+ // slow to call if lots of jobs are running, should only be used by tests
+ bool BIsJobRunning( const char *pchJobName );
+
+ // passes a network msg directly to the specified job
+ void PassMsgToJob( CJob &job, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo );
+
+ // yields until a network message is received
+ bool BYieldingWaitForMsg( CJob &job );
+
+ // yields for a set amount of time
+ bool BYieldingWaitTime( CJob &job, uint32 cMicrosecondsToSleep );
+
+ // simple yield until Run() called again
+ bool BYield( CJob &job );
+
+ // Yield only if job manager decides we need to
+ bool BYieldIfNeeded( CJob &job, bool *pbYielded );
+
+ // Thread pool work item
+ bool BYieldingWaitForWorkItem( CJob &job, const char *pszWorkItemName = NULL );
+ bool BRouteWorkItemCompleted( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ true ); }
+ bool BRouteWorkItemCompletedIfExists( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ true ); }
+ bool BRouteWorkItemCompletedDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ false ); }
+ bool BRouteWorkItemCompletedIfExistsDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ false ); }
+
+ void AddThreadedJobWorkItem( CWorkItem *pWorkItem );
+ void StopWorkThreads() { m_WorkThreadPool.StopWorkThreads(); }
+
+ static int ProfileSortFunc( void *pCtx, const int *lhs, const int *rhs );
+
+ void ProfileJobs( EJobProfileAction ejobProfileAction, EJobProfileSortOrder iSortOrder = k_EJobProfileSortOrder_Alpha );
+ int DumpJobSummary();
+ void DumpJob( JobID_t jobID, int nPrintLocksMax = 20 ) const;
+ int CountJobs() const; // counts currently active jobs
+ void CheckThreadID(); // make sure we are still in the correct thread
+ int CountYieldingJobs() const { return m_ListJobsYieldingRegPri.Count(); } // counts jobs currently in a yielding state
+ bool HasOutstandingThreadPoolWorkItems();
+
+ void SetIsShuttingDown();
+ bool GetIsShuttingDown() const { return m_bIsShuttingDown; }
+
+ void *GetMainMemoryDebugInfo() { return g_memMainDebugInfo.Base(); }
+
+#ifdef DBGFLAG_VALIDATE
+ void Validate( CValidator &validator, const char *pchName ); // Validate our internal structures
+ static void ValidateStatics( CValidator &validator, const char *pchName );
+#endif /* DBGFLAG_VALIDATE */
+
+ // wakes up a job that was waiting on a lock
+ void WakeupLockedJob( CJob &job );
+
+ // returns true if there is a job active with the specified ID
+ bool BJobExists( JobID_t jobID ) const;
+
+ // returns a job
+ CJob *GetPJob( JobID_t jobID );
+ const CJob *GetPJob( JobID_t jobID ) const;
+
+ JobStats_t& GetJobStats() { return m_JobStats; }
+
+ // Access work thread pool directly
+ CWorkThreadPool *AccessWorkThreadPool() { return &m_WorkThreadPool; }
+
+ // Debug helpers
+ // dumps a list of all running jobs across ALL job managers
+ void DumpJobs( const char *pszJobName, int nMax, int nPrintLocksMax = 1 ) const;
+ // cause a debug break in the given job
+ static void DebugJob( int iJob );
+
+ // disable/enable yielding for debugging
+ void SetPauseAllowed( bool bNewPauseAllowed ) { m_bDebugDisallowPause = !bNewPauseAllowed; }
+
+private:
+
+ bool BRouteWorkItemCompletedInternal( JobID_t jobID, bool bWorkItemCanceled, bool bShouldExist, bool bResumeImmediately );
+
+ // Create a new job for this message
+ bool BLaunchJobFromNetworkMsg( void *pParent, const JobMsgInfo_t &jobMsgInfo, IMsgNetPacket *pNetPacket );
+
+ // Internal add to yield list (looks at priority)
+ void AddToYieldList( CJob &job );
+
+ // Get an IJob given a job ID and pause reason
+ bool BGetIJob( JobID_t jobID, EJobPauseReason eJobPauseReason, bool bShouldExist, int *pIJob );
+
+ // Map containing all of our jobs
+ CUtlMap<JobID_t, CJob *, int> m_MapJob;
+
+ // jobs simply waiting until the next Run()
+ struct JobYielding_t
+ {
+ JobID_t m_JobID;
+ uint m_nIteration;
+ };
+ CUtlLinkedList<JobYielding_t, int> m_ListJobsYieldingRegPri;
+ bool BResumeYieldingJobs( CLimitTimer &limitTimer );
+ bool BResumeYieldingJobsFromList( CUtlLinkedList<JobYielding_t, int> &listJobsYielding, uint nCurrentIteration, CLimitTimer &limitTimer );
+ uint m_nCurrentYieldIterationRegPri;
+
+ // jobs waiting on a timer
+ struct JobSleeping_t
+ {
+ JobID_t m_JobID;
+ CJobTime m_SWakeupTime;
+ CJobTime m_STimeTouched;
+ };
+ CUtlPriorityQueue<JobSleeping_t> m_QueueJobSleeping;
+ bool BResumeSleepingJobs( CLimitTimer &limitTimer );
+ static bool JobSleepingLessFunc( JobSleeping_t const &lhs, JobSleeping_t const &rhs );
+
+ // timeout list of jobs, ordered from oldest to newest
+ struct JobTimeout_t
+ {
+ JobID_t m_JobID;
+ CJobTime m_STimePaused;
+ CJobTime m_STimeTouched;
+ uint32 m_cHeartbeatsBeforeTimeout;
+ };
+ CUtlLinkedList<JobTimeout_t, int> m_ListJobTimeouts;
+ CUtlMap<JobID_t, int, int> m_MapJobTimeoutsIndexByJobID;
+ void PauseJob( CJob &job, EJobPauseReason eJobPauseReason );
+ void CheckForJobTimeouts( CLimitTimer &limitTimer );
+ void TimeoutJob( CJob &job );
+ bool m_bJobTimedOut;
+
+ // thread pool usage, for running job functions in other threads
+ CWorkThreadPool m_WorkThreadPool;
+
+ void AccumulateStatsofJob( CJob &job );
+ void RecordOrphanedMessage( MsgType_t eMsg, JobID_t jobIDTarget );
+
+ // stats info
+ JobStats_t m_JobStats;
+
+ // static job registration
+ static void RegisterJobType( const JobType_t *pJobType );
+ friend void Job_RegisterJobType( const JobType_t *pJobType );
+
+ JobID_t m_unNextJobID;
+ uint m_unFrameFuncThreadID; // the thread is JobMgr is working in
+ bool m_bProfiling;
+ bool m_bIsShuttingDown;
+ int m_cErrorsToReport;
+ CUtlMap< uint32, JobStatsBucket_t, int > m_mapStatsBucket;
+ CUtlMap<MsgType_t, int, int> m_mapOrphanMessages;
+ CUtlMemory<unsigned char> g_memMainDebugInfo;
+
+#ifdef GC
+ // sql profiling
+ bool m_bSQLProfiling;
+ CFastTimer m_sqlTimer;
+
+ struct PendingSQLJob_t
+ {
+ int64 m_nStartMicrosec;
+ int32 m_iBucket;
+ };
+
+ struct SQLProfileBucket_t
+ {
+ int64 m_nTotalMicrosec;
+ uint32 m_unCount;
+ };
+
+ CUtlHashMapLarge<GID_t, PendingSQLJob_t> m_mapSQLQueriesInFlight;
+ CUtlDict<SQLProfileBucket_t> m_dictSQLBuckets;
+
+ struct SQLProfileCtx_t
+ {
+ ESQLProfileSort m_eSort;
+ CUtlDict<SQLProfileBucket_t> *pdictBuckets;
+ };
+ static int SQLProfileSortFunc( void *pCtx, const int *lhs, const int *rhs );
+#endif
+
+#ifdef DEBUG_JOB_LIST
+ // static job debug list
+ static CUtlLinkedList<CJob *, int> sm_listAllJobs;
+#endif
+
+ bool m_bDebugDisallowPause;
+};
+
+
+//-----------------------------------------------------------------------------
+// Purpose: passthrough function just so the CJob internal data can be kept private
+//-----------------------------------------------------------------------------
+inline void Job_RegisterJobType( const JobType_t *pJobType )
+{
+ CJobMgr::RegisterJobType( pJobType );
+}
+
+
+//-----------------------------------------------------------------------------
+// Purpose: passthrough function just so the CJob internal data can be kept private
+//-----------------------------------------------------------------------------
+inline void Job_SetJobType( CJob &job, const JobType_t *pJobType )
+{
+ job.m_pJobType = pJobType;
+}
+
+
+//-----------------------------------------------------------------------------
+// Purpose: job registration macro
+//-----------------------------------------------------------------------------
+#define GC_REG_JOB( parentclass, jobclass, jobname, msg, servertype ) \
+ GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \
+ static const GCSDK::JobType_t g_JobType_##jobclass = { jobname, (GCSDK::MsgType_t)msg, servertype, (GCSDK::JobCreationFunc_t)CreateJob_##jobclass }; \
+ GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \
+ { \
+ GCSDK::CJob *job = GCSDK::CJob::AllocateJob<jobclass>( pParent ); \
+ if ( job ) \
+ { \
+ Job_SetJobType( *job, &g_JobType_##jobclass ); \
+ if ( pvStartParam ) job->SetStartParam( pvStartParam ); \
+ } \
+ else \
+ { \
+ AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \
+ } \
+ return job; \
+ } \
+ static class CRegJob_##jobclass \
+ { \
+ public: CRegJob_##jobclass() \
+ { \
+ Job_RegisterJobType( &g_JobType_##jobclass ); \
+ } \
+ } g_RegJob_##jobclass;
+
+
+//-----------------------------------------------------------------------------
+// Purpose: job registration macro for job triggered by web api request
+//-----------------------------------------------------------------------------
+#define REG_WEBAPI_JOB( parentclass, jobclass, jobname, servertype ) \
+ CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \
+ static const JobType_t g_JobType_##jobclass = { jobname, k_EGCMsgInvalid, servertype, (JobCreationFunc_t)CreateJob_##jobclass }; \
+ CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \
+{ \
+ CJob *job = CJob::AllocateJob<jobclass>( pParent ); \
+ if ( job ) \
+ { \
+ Job_SetJobType( *job, &g_JobType_##jobclass ); \
+ if ( pvStartParam ) job->SetStartParam( pvStartParam ); \
+ } \
+ else \
+ { \
+ AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \
+ } \
+ return job; \
+} \
+ static class CRegJob_##jobclass \
+{ \
+public: CRegJob_##jobclass() \
+{ \
+ Job_RegisterJobType( &g_JobType_##jobclass ); \
+} \
+} g_RegJob_##jobclass;
+
+
+
+} // namespace GCSDK
+
+#include "tier0/memdbgoff.h"
+
+#endif // GC_JOBMGR_H