diff options
| author | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
|---|---|---|
| committer | FluorescentCIAAfricanAmerican <[email protected]> | 2020-04-22 12:56:21 -0400 |
| commit | 3bf9df6b2785fa6d951086978a3e66f49427166a (patch) | |
| tree | 2c0f1f0c63c4832882bc93814ebd2c2b1c6224e5 /public/gcsdk/jobmgr.h | |
| download | archived-source-engine-2018-hl2-src-master.tar.xz archived-source-engine-2018-hl2-src-master.zip | |
Diffstat (limited to 'public/gcsdk/jobmgr.h')
| -rw-r--r-- | public/gcsdk/jobmgr.h | 430 |
1 files changed, 430 insertions, 0 deletions
diff --git a/public/gcsdk/jobmgr.h b/public/gcsdk/jobmgr.h new file mode 100644 index 0000000..93b6387 --- /dev/null +++ b/public/gcsdk/jobmgr.h @@ -0,0 +1,430 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +// $NoKeywords: $ +//============================================================================= + +#ifndef GC_JOBMGR_H +#define GC_JOBMGR_H +#ifdef _WIN32 +#pragma once +#endif + +#include "tier0/fasttimer.h" +#include "tier1/utlpriorityqueue.h" +#include "job.h" +#include "workthreadpool.h" +class GCConVar; + +#include "tier0/memdbgon.h" + +namespace GCSDK +{ + +#if defined(_DEBUG) +// this is restricted to debug builds due to the performance cost +// that could be changed by removing the expensive sm_listAllJobs.Find() command +#define DEBUG_JOB_LIST +#endif // defined(_DEBUG) + +struct JobStats_t +{ + uint m_cJobsCurrent; + uint m_cJobsTotal; + uint m_cJobsFailed; + uint64 m_cJobsTimedOut; // # of jobs timed out ever + double m_flSumJobTimeMicrosec; + double m_flSumSqJobTimeMicrosec; + uint64 m_unMaxJobTimeMicrosec; + + uint m_cTimeslices; + + JobStats_t() + { + memset( this, 0, sizeof(JobStats_t) ); + } +}; + +struct JobStatsBucket_t +{ + JobStatsBucket_t() + { + memset( this, 0, sizeof(JobStatsBucket_t) ); + } + char m_rgchName[64]; + uint64 m_cCompletes; + uint64 m_u64RunTimeMax; + uint64 m_cTimeoutNetMsg; + uint64 m_cLongInterYieldTime; + uint64 m_cLocksAttempted; + uint64 m_cLocksWaitedFor; + uint64 m_cLocksFailed; + uint64 m_cLocksLongHeld; + uint64 m_cLocksLongWait; + uint64 m_cWaitTimeout; + uint64 m_u64JobDuration; + uint64 m_cJobsPaused; + uint64 m_cJobsFailed; + uint64 m_u64RunTime; + // use by ListJobs + uint64 m_cPauseReasonNetworkMsg; + uint64 m_cPauseReasonSleepForTime; + uint64 m_cPauseReasonWaitingForLock; + uint64 m_cPauseReasonYield; + uint64 m_cPauseReasonSQL; + uint64 m_cPauseReasonWorkItem; + +#ifdef DBGFLAG_VALIDATE + void Validate( CValidator &validator, const char *pchName ) + { + VALIDATE_SCOPE(); + } +#endif +}; + +enum EJobProfileAction +{ + k_EJobProfileAction_ErrorReport = 0, + k_EJobProfileAction_Start = 1, + k_EJobProfileAction_Stop = 2, + k_EJobProfileAction_Dump = 3, + k_EJobProfileAction_Clear = 4, +}; + +enum EJobProfileSortOrder +{ + k_EJobProfileSortOrder_Alpha = 0, + k_EJobProfileSortOrder_Count = 1, + k_EJobProfileSortOrder_TotalRuntime = 2, +}; + +struct JobProfileStats_t +{ + int m_iJobProfileSort; + CUtlMap< uint32, JobStatsBucket_t, int > *pmapStatsBucket; +}; + +//----------------------------------------------------------------------------- +// Purpose: This keeps track of all jobs that belong to a given hub. +// It's primarily used for routing incoming messages to jobs. +//----------------------------------------------------------------------------- +class CJobMgr +{ +public: + // Constructors & destructors + CJobMgr(); + ~CJobMgr(); + + // gets the next available job ID + JobID_t GetNewJobID(); + + // Set the thread count for the internal thread pool(s) + void SetThreadPoolSize( uint cThreads ); + + // Run any sleeping jobs who's wakeup time has arrived and check for timeouts + bool BFrameFuncRunSleepingJobs( CLimitTimer &limitTimer ); + + // Run any yielding jobs, even low priority ones + bool BFrameFuncRunYieldingJobs( CLimitTimer &limitTimer ); + + // Route this message to an existing Job, or create a new one if that JobID does not exist + bool BRouteMsgToJob( void *pParent, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo ); + + // Adds a new Job to the mgr and generates a JobID for it. + void InsertJob( CJob &job ); + + // Removes a Job from the mgr (the caller is still responsible for freeing it) + void RemoveJob( CJob &job ); + + //called by a job that has just been started to place itself on the yield queue instead of running + void AddDelayedJobToYieldList( CJob &job ); + +#ifdef GC + // resumes the specified job if it is, in fact, waiting for a SQL query to return + bool BResumeSQLJob( JobID_t jobID ); + + // yields waiting for a query response + bool BYieldingRunQuery( CJob &job, CGCSQLQueryGroup *pQueryGroup, ESchemaCatalog eSchemaCatalog ); + + // SQL profiling + enum ESQLProfileSort + { + k_ESQLProfileSortTotalTime, + k_ESQLProfileSortTotalCount, + k_ESQLProfileSortAvgTime, + k_ESQLProfileSortName + }; + + void StartSQLProfiling(); + void StopSQLProfiling(); + void DumpSQLProfile( ESQLProfileSort eSort ); +#endif + + // returns true if we're running any jobs of the specified name + // slow to call if lots of jobs are running, should only be used by tests + bool BIsJobRunning( const char *pchJobName ); + + // passes a network msg directly to the specified job + void PassMsgToJob( CJob &job, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo ); + + // yields until a network message is received + bool BYieldingWaitForMsg( CJob &job ); + + // yields for a set amount of time + bool BYieldingWaitTime( CJob &job, uint32 cMicrosecondsToSleep ); + + // simple yield until Run() called again + bool BYield( CJob &job ); + + // Yield only if job manager decides we need to + bool BYieldIfNeeded( CJob &job, bool *pbYielded ); + + // Thread pool work item + bool BYieldingWaitForWorkItem( CJob &job, const char *pszWorkItemName = NULL ); + bool BRouteWorkItemCompleted( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ true ); } + bool BRouteWorkItemCompletedIfExists( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ true ); } + bool BRouteWorkItemCompletedDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ false ); } + bool BRouteWorkItemCompletedIfExistsDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ false ); } + + void AddThreadedJobWorkItem( CWorkItem *pWorkItem ); + void StopWorkThreads() { m_WorkThreadPool.StopWorkThreads(); } + + static int ProfileSortFunc( void *pCtx, const int *lhs, const int *rhs ); + + void ProfileJobs( EJobProfileAction ejobProfileAction, EJobProfileSortOrder iSortOrder = k_EJobProfileSortOrder_Alpha ); + int DumpJobSummary(); + void DumpJob( JobID_t jobID, int nPrintLocksMax = 20 ) const; + int CountJobs() const; // counts currently active jobs + void CheckThreadID(); // make sure we are still in the correct thread + int CountYieldingJobs() const { return m_ListJobsYieldingRegPri.Count(); } // counts jobs currently in a yielding state + bool HasOutstandingThreadPoolWorkItems(); + + void SetIsShuttingDown(); + bool GetIsShuttingDown() const { return m_bIsShuttingDown; } + + void *GetMainMemoryDebugInfo() { return g_memMainDebugInfo.Base(); } + +#ifdef DBGFLAG_VALIDATE + void Validate( CValidator &validator, const char *pchName ); // Validate our internal structures + static void ValidateStatics( CValidator &validator, const char *pchName ); +#endif /* DBGFLAG_VALIDATE */ + + // wakes up a job that was waiting on a lock + void WakeupLockedJob( CJob &job ); + + // returns true if there is a job active with the specified ID + bool BJobExists( JobID_t jobID ) const; + + // returns a job + CJob *GetPJob( JobID_t jobID ); + const CJob *GetPJob( JobID_t jobID ) const; + + JobStats_t& GetJobStats() { return m_JobStats; } + + // Access work thread pool directly + CWorkThreadPool *AccessWorkThreadPool() { return &m_WorkThreadPool; } + + // Debug helpers + // dumps a list of all running jobs across ALL job managers + void DumpJobs( const char *pszJobName, int nMax, int nPrintLocksMax = 1 ) const; + // cause a debug break in the given job + static void DebugJob( int iJob ); + + // disable/enable yielding for debugging + void SetPauseAllowed( bool bNewPauseAllowed ) { m_bDebugDisallowPause = !bNewPauseAllowed; } + +private: + + bool BRouteWorkItemCompletedInternal( JobID_t jobID, bool bWorkItemCanceled, bool bShouldExist, bool bResumeImmediately ); + + // Create a new job for this message + bool BLaunchJobFromNetworkMsg( void *pParent, const JobMsgInfo_t &jobMsgInfo, IMsgNetPacket *pNetPacket ); + + // Internal add to yield list (looks at priority) + void AddToYieldList( CJob &job ); + + // Get an IJob given a job ID and pause reason + bool BGetIJob( JobID_t jobID, EJobPauseReason eJobPauseReason, bool bShouldExist, int *pIJob ); + + // Map containing all of our jobs + CUtlMap<JobID_t, CJob *, int> m_MapJob; + + // jobs simply waiting until the next Run() + struct JobYielding_t + { + JobID_t m_JobID; + uint m_nIteration; + }; + CUtlLinkedList<JobYielding_t, int> m_ListJobsYieldingRegPri; + bool BResumeYieldingJobs( CLimitTimer &limitTimer ); + bool BResumeYieldingJobsFromList( CUtlLinkedList<JobYielding_t, int> &listJobsYielding, uint nCurrentIteration, CLimitTimer &limitTimer ); + uint m_nCurrentYieldIterationRegPri; + + // jobs waiting on a timer + struct JobSleeping_t + { + JobID_t m_JobID; + CJobTime m_SWakeupTime; + CJobTime m_STimeTouched; + }; + CUtlPriorityQueue<JobSleeping_t> m_QueueJobSleeping; + bool BResumeSleepingJobs( CLimitTimer &limitTimer ); + static bool JobSleepingLessFunc( JobSleeping_t const &lhs, JobSleeping_t const &rhs ); + + // timeout list of jobs, ordered from oldest to newest + struct JobTimeout_t + { + JobID_t m_JobID; + CJobTime m_STimePaused; + CJobTime m_STimeTouched; + uint32 m_cHeartbeatsBeforeTimeout; + }; + CUtlLinkedList<JobTimeout_t, int> m_ListJobTimeouts; + CUtlMap<JobID_t, int, int> m_MapJobTimeoutsIndexByJobID; + void PauseJob( CJob &job, EJobPauseReason eJobPauseReason ); + void CheckForJobTimeouts( CLimitTimer &limitTimer ); + void TimeoutJob( CJob &job ); + bool m_bJobTimedOut; + + // thread pool usage, for running job functions in other threads + CWorkThreadPool m_WorkThreadPool; + + void AccumulateStatsofJob( CJob &job ); + void RecordOrphanedMessage( MsgType_t eMsg, JobID_t jobIDTarget ); + + // stats info + JobStats_t m_JobStats; + + // static job registration + static void RegisterJobType( const JobType_t *pJobType ); + friend void Job_RegisterJobType( const JobType_t *pJobType ); + + JobID_t m_unNextJobID; + uint m_unFrameFuncThreadID; // the thread is JobMgr is working in + bool m_bProfiling; + bool m_bIsShuttingDown; + int m_cErrorsToReport; + CUtlMap< uint32, JobStatsBucket_t, int > m_mapStatsBucket; + CUtlMap<MsgType_t, int, int> m_mapOrphanMessages; + CUtlMemory<unsigned char> g_memMainDebugInfo; + +#ifdef GC + // sql profiling + bool m_bSQLProfiling; + CFastTimer m_sqlTimer; + + struct PendingSQLJob_t + { + int64 m_nStartMicrosec; + int32 m_iBucket; + }; + + struct SQLProfileBucket_t + { + int64 m_nTotalMicrosec; + uint32 m_unCount; + }; + + CUtlHashMapLarge<GID_t, PendingSQLJob_t> m_mapSQLQueriesInFlight; + CUtlDict<SQLProfileBucket_t> m_dictSQLBuckets; + + struct SQLProfileCtx_t + { + ESQLProfileSort m_eSort; + CUtlDict<SQLProfileBucket_t> *pdictBuckets; + }; + static int SQLProfileSortFunc( void *pCtx, const int *lhs, const int *rhs ); +#endif + +#ifdef DEBUG_JOB_LIST + // static job debug list + static CUtlLinkedList<CJob *, int> sm_listAllJobs; +#endif + + bool m_bDebugDisallowPause; +}; + + +//----------------------------------------------------------------------------- +// Purpose: passthrough function just so the CJob internal data can be kept private +//----------------------------------------------------------------------------- +inline void Job_RegisterJobType( const JobType_t *pJobType ) +{ + CJobMgr::RegisterJobType( pJobType ); +} + + +//----------------------------------------------------------------------------- +// Purpose: passthrough function just so the CJob internal data can be kept private +//----------------------------------------------------------------------------- +inline void Job_SetJobType( CJob &job, const JobType_t *pJobType ) +{ + job.m_pJobType = pJobType; +} + + +//----------------------------------------------------------------------------- +// Purpose: job registration macro +//----------------------------------------------------------------------------- +#define GC_REG_JOB( parentclass, jobclass, jobname, msg, servertype ) \ + GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \ + static const GCSDK::JobType_t g_JobType_##jobclass = { jobname, (GCSDK::MsgType_t)msg, servertype, (GCSDK::JobCreationFunc_t)CreateJob_##jobclass }; \ + GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \ + { \ + GCSDK::CJob *job = GCSDK::CJob::AllocateJob<jobclass>( pParent ); \ + if ( job ) \ + { \ + Job_SetJobType( *job, &g_JobType_##jobclass ); \ + if ( pvStartParam ) job->SetStartParam( pvStartParam ); \ + } \ + else \ + { \ + AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \ + } \ + return job; \ + } \ + static class CRegJob_##jobclass \ + { \ + public: CRegJob_##jobclass() \ + { \ + Job_RegisterJobType( &g_JobType_##jobclass ); \ + } \ + } g_RegJob_##jobclass; + + +//----------------------------------------------------------------------------- +// Purpose: job registration macro for job triggered by web api request +//----------------------------------------------------------------------------- +#define REG_WEBAPI_JOB( parentclass, jobclass, jobname, servertype ) \ + CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \ + static const JobType_t g_JobType_##jobclass = { jobname, k_EGCMsgInvalid, servertype, (JobCreationFunc_t)CreateJob_##jobclass }; \ + CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \ +{ \ + CJob *job = CJob::AllocateJob<jobclass>( pParent ); \ + if ( job ) \ + { \ + Job_SetJobType( *job, &g_JobType_##jobclass ); \ + if ( pvStartParam ) job->SetStartParam( pvStartParam ); \ + } \ + else \ + { \ + AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \ + } \ + return job; \ +} \ + static class CRegJob_##jobclass \ +{ \ +public: CRegJob_##jobclass() \ +{ \ + Job_RegisterJobType( &g_JobType_##jobclass ); \ +} \ +} g_RegJob_##jobclass; + + + +} // namespace GCSDK + +#include "tier0/memdbgoff.h" + +#endif // GC_JOBMGR_H |