1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
|
//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose:
//
// $NoKeywords: $
//=============================================================================
#ifndef GC_JOBMGR_H
#define GC_JOBMGR_H
#ifdef _WIN32
#pragma once
#endif
#include "tier0/fasttimer.h"
#include "tier1/utlpriorityqueue.h"
#include "job.h"
#include "workthreadpool.h"
class GCConVar;
#include "tier0/memdbgon.h"
namespace GCSDK
{
#if defined(_DEBUG)
// this is restricted to debug builds due to the performance cost
// that could be changed by removing the expensive sm_listAllJobs.Find() command
#define DEBUG_JOB_LIST
#endif // defined(_DEBUG)
struct JobStats_t
{
uint m_cJobsCurrent;
uint m_cJobsTotal;
uint m_cJobsFailed;
uint64 m_cJobsTimedOut; // # of jobs timed out ever
double m_flSumJobTimeMicrosec;
double m_flSumSqJobTimeMicrosec;
uint64 m_unMaxJobTimeMicrosec;
uint m_cTimeslices;
JobStats_t()
{
memset( this, 0, sizeof(JobStats_t) );
}
};
struct JobStatsBucket_t
{
JobStatsBucket_t()
{
memset( this, 0, sizeof(JobStatsBucket_t) );
}
char m_rgchName[64];
uint64 m_cCompletes;
uint64 m_u64RunTimeMax;
uint64 m_cTimeoutNetMsg;
uint64 m_cLongInterYieldTime;
uint64 m_cLocksAttempted;
uint64 m_cLocksWaitedFor;
uint64 m_cLocksFailed;
uint64 m_cLocksLongHeld;
uint64 m_cLocksLongWait;
uint64 m_cWaitTimeout;
uint64 m_u64JobDuration;
uint64 m_cJobsPaused;
uint64 m_cJobsFailed;
uint64 m_u64RunTime;
// use by ListJobs
uint64 m_cPauseReasonNetworkMsg;
uint64 m_cPauseReasonSleepForTime;
uint64 m_cPauseReasonWaitingForLock;
uint64 m_cPauseReasonYield;
uint64 m_cPauseReasonSQL;
uint64 m_cPauseReasonWorkItem;
#ifdef DBGFLAG_VALIDATE
void Validate( CValidator &validator, const char *pchName )
{
VALIDATE_SCOPE();
}
#endif
};
enum EJobProfileAction
{
k_EJobProfileAction_ErrorReport = 0,
k_EJobProfileAction_Start = 1,
k_EJobProfileAction_Stop = 2,
k_EJobProfileAction_Dump = 3,
k_EJobProfileAction_Clear = 4,
};
enum EJobProfileSortOrder
{
k_EJobProfileSortOrder_Alpha = 0,
k_EJobProfileSortOrder_Count = 1,
k_EJobProfileSortOrder_TotalRuntime = 2,
};
struct JobProfileStats_t
{
int m_iJobProfileSort;
CUtlMap< uint32, JobStatsBucket_t, int > *pmapStatsBucket;
};
//-----------------------------------------------------------------------------
// Purpose: This keeps track of all jobs that belong to a given hub.
// It's primarily used for routing incoming messages to jobs.
//-----------------------------------------------------------------------------
class CJobMgr
{
public:
// Constructors & destructors
CJobMgr();
~CJobMgr();
// gets the next available job ID
JobID_t GetNewJobID();
// Set the thread count for the internal thread pool(s)
void SetThreadPoolSize( uint cThreads );
// Run any sleeping jobs who's wakeup time has arrived and check for timeouts
bool BFrameFuncRunSleepingJobs( CLimitTimer &limitTimer );
// Run any yielding jobs, even low priority ones
bool BFrameFuncRunYieldingJobs( CLimitTimer &limitTimer );
// Route this message to an existing Job, or create a new one if that JobID does not exist
bool BRouteMsgToJob( void *pParent, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo );
// Adds a new Job to the mgr and generates a JobID for it.
void InsertJob( CJob &job );
// Removes a Job from the mgr (the caller is still responsible for freeing it)
void RemoveJob( CJob &job );
//called by a job that has just been started to place itself on the yield queue instead of running
void AddDelayedJobToYieldList( CJob &job );
#ifdef GC
// resumes the specified job if it is, in fact, waiting for a SQL query to return
bool BResumeSQLJob( JobID_t jobID );
// yields waiting for a query response
bool BYieldingRunQuery( CJob &job, CGCSQLQueryGroup *pQueryGroup, ESchemaCatalog eSchemaCatalog );
// SQL profiling
enum ESQLProfileSort
{
k_ESQLProfileSortTotalTime,
k_ESQLProfileSortTotalCount,
k_ESQLProfileSortAvgTime,
k_ESQLProfileSortName
};
void StartSQLProfiling();
void StopSQLProfiling();
void DumpSQLProfile( ESQLProfileSort eSort );
#endif
// returns true if we're running any jobs of the specified name
// slow to call if lots of jobs are running, should only be used by tests
bool BIsJobRunning( const char *pchJobName );
// passes a network msg directly to the specified job
void PassMsgToJob( CJob &job, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo );
// yields until a network message is received
bool BYieldingWaitForMsg( CJob &job );
// yields for a set amount of time
bool BYieldingWaitTime( CJob &job, uint32 cMicrosecondsToSleep );
// simple yield until Run() called again
bool BYield( CJob &job );
// Yield only if job manager decides we need to
bool BYieldIfNeeded( CJob &job, bool *pbYielded );
// Thread pool work item
bool BYieldingWaitForWorkItem( CJob &job, const char *pszWorkItemName = NULL );
bool BRouteWorkItemCompleted( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ true ); }
bool BRouteWorkItemCompletedIfExists( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ true ); }
bool BRouteWorkItemCompletedDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ true, /* bResumeImmediately */ false ); }
bool BRouteWorkItemCompletedIfExistsDelayed( JobID_t jobID, bool bWorkItemCanceled ) { return BRouteWorkItemCompletedInternal( jobID, bWorkItemCanceled, /* bShouldExist */ false, /* bResumeImmediately */ false ); }
void AddThreadedJobWorkItem( CWorkItem *pWorkItem );
void StopWorkThreads() { m_WorkThreadPool.StopWorkThreads(); }
static int ProfileSortFunc( void *pCtx, const int *lhs, const int *rhs );
void ProfileJobs( EJobProfileAction ejobProfileAction, EJobProfileSortOrder iSortOrder = k_EJobProfileSortOrder_Alpha );
int DumpJobSummary();
void DumpJob( JobID_t jobID, int nPrintLocksMax = 20 ) const;
int CountJobs() const; // counts currently active jobs
void CheckThreadID(); // make sure we are still in the correct thread
int CountYieldingJobs() const { return m_ListJobsYieldingRegPri.Count(); } // counts jobs currently in a yielding state
bool HasOutstandingThreadPoolWorkItems();
void SetIsShuttingDown();
bool GetIsShuttingDown() const { return m_bIsShuttingDown; }
void *GetMainMemoryDebugInfo() { return g_memMainDebugInfo.Base(); }
#ifdef DBGFLAG_VALIDATE
void Validate( CValidator &validator, const char *pchName ); // Validate our internal structures
static void ValidateStatics( CValidator &validator, const char *pchName );
#endif /* DBGFLAG_VALIDATE */
// wakes up a job that was waiting on a lock
void WakeupLockedJob( CJob &job );
// returns true if there is a job active with the specified ID
bool BJobExists( JobID_t jobID ) const;
// returns a job
CJob *GetPJob( JobID_t jobID );
const CJob *GetPJob( JobID_t jobID ) const;
JobStats_t& GetJobStats() { return m_JobStats; }
// Access work thread pool directly
CWorkThreadPool *AccessWorkThreadPool() { return &m_WorkThreadPool; }
// Debug helpers
// dumps a list of all running jobs across ALL job managers
void DumpJobs( const char *pszJobName, int nMax, int nPrintLocksMax = 1 ) const;
// cause a debug break in the given job
static void DebugJob( int iJob );
// disable/enable yielding for debugging
void SetPauseAllowed( bool bNewPauseAllowed ) { m_bDebugDisallowPause = !bNewPauseAllowed; }
private:
bool BRouteWorkItemCompletedInternal( JobID_t jobID, bool bWorkItemCanceled, bool bShouldExist, bool bResumeImmediately );
// Create a new job for this message
bool BLaunchJobFromNetworkMsg( void *pParent, const JobMsgInfo_t &jobMsgInfo, IMsgNetPacket *pNetPacket );
// Internal add to yield list (looks at priority)
void AddToYieldList( CJob &job );
// Get an IJob given a job ID and pause reason
bool BGetIJob( JobID_t jobID, EJobPauseReason eJobPauseReason, bool bShouldExist, int *pIJob );
// Map containing all of our jobs
CUtlMap<JobID_t, CJob *, int> m_MapJob;
// jobs simply waiting until the next Run()
struct JobYielding_t
{
JobID_t m_JobID;
uint m_nIteration;
};
CUtlLinkedList<JobYielding_t, int> m_ListJobsYieldingRegPri;
bool BResumeYieldingJobs( CLimitTimer &limitTimer );
bool BResumeYieldingJobsFromList( CUtlLinkedList<JobYielding_t, int> &listJobsYielding, uint nCurrentIteration, CLimitTimer &limitTimer );
uint m_nCurrentYieldIterationRegPri;
// jobs waiting on a timer
struct JobSleeping_t
{
JobID_t m_JobID;
CJobTime m_SWakeupTime;
CJobTime m_STimeTouched;
};
CUtlPriorityQueue<JobSleeping_t> m_QueueJobSleeping;
bool BResumeSleepingJobs( CLimitTimer &limitTimer );
static bool JobSleepingLessFunc( JobSleeping_t const &lhs, JobSleeping_t const &rhs );
// timeout list of jobs, ordered from oldest to newest
struct JobTimeout_t
{
JobID_t m_JobID;
CJobTime m_STimePaused;
CJobTime m_STimeTouched;
uint32 m_cHeartbeatsBeforeTimeout;
};
CUtlLinkedList<JobTimeout_t, int> m_ListJobTimeouts;
CUtlMap<JobID_t, int, int> m_MapJobTimeoutsIndexByJobID;
void PauseJob( CJob &job, EJobPauseReason eJobPauseReason );
void CheckForJobTimeouts( CLimitTimer &limitTimer );
void TimeoutJob( CJob &job );
bool m_bJobTimedOut;
// thread pool usage, for running job functions in other threads
CWorkThreadPool m_WorkThreadPool;
void AccumulateStatsofJob( CJob &job );
void RecordOrphanedMessage( MsgType_t eMsg, JobID_t jobIDTarget );
// stats info
JobStats_t m_JobStats;
// static job registration
static void RegisterJobType( const JobType_t *pJobType );
friend void Job_RegisterJobType( const JobType_t *pJobType );
JobID_t m_unNextJobID;
uint m_unFrameFuncThreadID; // the thread is JobMgr is working in
bool m_bProfiling;
bool m_bIsShuttingDown;
int m_cErrorsToReport;
CUtlMap< uint32, JobStatsBucket_t, int > m_mapStatsBucket;
CUtlMap<MsgType_t, int, int> m_mapOrphanMessages;
CUtlMemory<unsigned char> g_memMainDebugInfo;
#ifdef GC
// sql profiling
bool m_bSQLProfiling;
CFastTimer m_sqlTimer;
struct PendingSQLJob_t
{
int64 m_nStartMicrosec;
int32 m_iBucket;
};
struct SQLProfileBucket_t
{
int64 m_nTotalMicrosec;
uint32 m_unCount;
};
CUtlHashMapLarge<GID_t, PendingSQLJob_t> m_mapSQLQueriesInFlight;
CUtlDict<SQLProfileBucket_t> m_dictSQLBuckets;
struct SQLProfileCtx_t
{
ESQLProfileSort m_eSort;
CUtlDict<SQLProfileBucket_t> *pdictBuckets;
};
static int SQLProfileSortFunc( void *pCtx, const int *lhs, const int *rhs );
#endif
#ifdef DEBUG_JOB_LIST
// static job debug list
static CUtlLinkedList<CJob *, int> sm_listAllJobs;
#endif
bool m_bDebugDisallowPause;
};
//-----------------------------------------------------------------------------
// Purpose: passthrough function just so the CJob internal data can be kept private
//-----------------------------------------------------------------------------
inline void Job_RegisterJobType( const JobType_t *pJobType )
{
CJobMgr::RegisterJobType( pJobType );
}
//-----------------------------------------------------------------------------
// Purpose: passthrough function just so the CJob internal data can be kept private
//-----------------------------------------------------------------------------
inline void Job_SetJobType( CJob &job, const JobType_t *pJobType )
{
job.m_pJobType = pJobType;
}
//-----------------------------------------------------------------------------
// Purpose: job registration macro
//-----------------------------------------------------------------------------
#define GC_REG_JOB( parentclass, jobclass, jobname, msg, servertype ) \
GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \
static const GCSDK::JobType_t g_JobType_##jobclass = { jobname, (GCSDK::MsgType_t)msg, servertype, (GCSDK::JobCreationFunc_t)CreateJob_##jobclass }; \
GCSDK::CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \
{ \
GCSDK::CJob *job = GCSDK::CJob::AllocateJob<jobclass>( pParent ); \
if ( job ) \
{ \
Job_SetJobType( *job, &g_JobType_##jobclass ); \
if ( pvStartParam ) job->SetStartParam( pvStartParam ); \
} \
else \
{ \
AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \
} \
return job; \
} \
static class CRegJob_##jobclass \
{ \
public: CRegJob_##jobclass() \
{ \
Job_RegisterJobType( &g_JobType_##jobclass ); \
} \
} g_RegJob_##jobclass;
//-----------------------------------------------------------------------------
// Purpose: job registration macro for job triggered by web api request
//-----------------------------------------------------------------------------
#define REG_WEBAPI_JOB( parentclass, jobclass, jobname, servertype ) \
CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ); \
static const JobType_t g_JobType_##jobclass = { jobname, k_EGCMsgInvalid, servertype, (JobCreationFunc_t)CreateJob_##jobclass }; \
CJob *CreateJob_##jobclass( parentclass *pParent, void * pvStartParam ) \
{ \
CJob *job = CJob::AllocateJob<jobclass>( pParent ); \
if ( job ) \
{ \
Job_SetJobType( *job, &g_JobType_##jobclass ); \
if ( pvStartParam ) job->SetStartParam( pvStartParam ); \
} \
else \
{ \
AssertMsg( job, "CJob::AllocateJob<" #jobclass "> returned NULL!" ); \
} \
return job; \
} \
static class CRegJob_##jobclass \
{ \
public: CRegJob_##jobclass() \
{ \
Job_RegisterJobType( &g_JobType_##jobclass ); \
} \
} g_RegJob_##jobclass;
} // namespace GCSDK
#include "tier0/memdbgoff.h"
#endif // GC_JOBMGR_H
|