1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
|
//========= Copyright Valve Corporation, All rights reserved. ============//
//
// Purpose: A thread pool implementation. You give it CWorkItems,
// it processes them asynchronously, and hands them back to you when they've
// been completed.
//
// To declare a queue, provide the implementation of a CWorkItem subtype,
// the thread name prefix for threads in the pool, and the number of work
// threads you want.
//
// CNet uses this class to offload encryption to a separate thread,
// so that's a good place to start looking for usage examples.
//
//=============================================================================
#ifndef WORKTHREADPOOL_H
#define WORKTHREADPOOL_H
#ifdef _WIN32
#pragma once
#endif
#include <refcount.h>
#include <reliabletimer.h>
#include "jobtime.h"
// forward declaration for CTSQueue which we can't statically allocate as our member
// because of alignment issues on Win64
template <class T, bool bTestOptimizer>
class CTSQueue;
namespace GCSDK {
// forward declarations
class CWorkThread;
class CJobMgr;
// Stamps the two profiling-name accessors into a work item class.
// Each accessor returns a string literal assembled by the preprocessor from the
// class name: "<classname>::DispatchCompleted" and "<classname>::ThreadProcess".
// We need this for VPROF nodes.
#define DECLARE_WORK_ITEM( classname ) \
	virtual const char *GetDispatchCompletedName() const \
	{ \
		return #classname "::DispatchCompleted"; \
	} \
	virtual const char *GetThreadProcessName() const \
	{ \
		return #classname "::ThreadProcess"; \
	}
//-----------------------------------------------------------------------------
// Purpose: Work item base class. Derive from this for specific work item types.
// The derived type ideally should be self-contained with all data it
// needs to perform the work.
//-----------------------------------------------------------------------------
class CWorkItem : public CRefCount
{
public:
	// Creates a work item with no job ID (k_GIDNil) and no pre-execute timeout.
	// NOTE: in all three constructors the mem-initializers are listed in member
	// declaration order, which is the order C++ actually runs them in.
	CWorkItem()
		: m_ulSequenceNumber( 0 ),
		m_bResubmit( false ),
		m_bRunning( false ),
		m_bCanceled( false ),
		m_JobID( k_GIDNil )
	{
		// an LTime of 0 is what BPreExecuteTimeoutSet() treats as "no timeout"
		m_jobtimeTimeout.SetLTime( 0 );
		// the queued stamp is taken at construction time
		m_jobtimeQueued.SetToJobTime();
	}

	// Creates a work item tied to jobID, with no pre-execute timeout.
	CWorkItem( JobID_t jobID )
		: m_ulSequenceNumber( 0 ),
		m_bResubmit( false ),
		m_bRunning( false ),
		m_bCanceled( false ),
		m_JobID( jobID )
	{
		m_jobtimeTimeout.SetLTime( 0 );
		m_jobtimeQueued.SetToJobTime();
	}

	// Creates a work item tied to jobID whose pre-execute timeout is set from
	// cTimeoutMicroseconds (see SetPreExecuteTimeout).
	CWorkItem( JobID_t jobID, int64 cTimeoutMicroseconds )
		: m_ulSequenceNumber( 0 ),
		m_bResubmit( false ),
		m_bRunning( false ),
		m_bCanceled( false ),
		m_JobID( jobID )
	{
		SetPreExecuteTimeout( cTimeoutMicroseconds );
		m_jobtimeQueued.SetToJobTime();
	}

	// Associates this item with a job. Asserts that jobID is not k_GIDNil.
	void SetJobID( JobID_t jobID )
	{
		Assert( jobID != k_GIDNil );
		m_JobID = jobID;
	}
	JobID_t GetJobID() const { return m_JobID; }

	// True when a pre-execute timeout has been set and m_jobtimeTimeout reports
	// a positive number of microseconds passed.
	bool HasTimedOut() const { return BPreExecuteTimeoutSet() && m_jobtimeTimeout.CServerMicroSecsPassed() > 0; }

	// Microseconds passed according to the stamp taken in the constructor.
	int64 WaitingTime() const { return m_jobtimeQueued.CServerMicroSecsPassed(); }

	// Sets the pre-execute timeout via CJobTime::SetFromJobTime.
	// NOTE(review): presumably cMicroSeconds is an offset from the current job time - confirm in jobtime.h
	void SetPreExecuteTimeout( int64 cMicroSeconds ) { m_jobtimeTimeout.SetFromJobTime( cMicroSeconds ); }

	// True when the timeout stamp is non-zero, i.e. a timeout has been set.
	bool BPreExecuteTimeoutSet() const { return m_jobtimeTimeout.LTime() != 0; }

	// Sets the timeout with an offset of -1.
	// NOTE(review): presumably that puts the deadline in the past so HasTimedOut() becomes true - confirm
	void ForceTimeOut() { m_jobtimeTimeout.SetFromJobTime( -1 ); }

	bool BIsRunning() const { return m_bRunning; } // true if running right now
	bool WasCancelled() const { return m_bCanceled; }

	// Stores / returns the cycle count recorded for this item (see m_CycleCount).
	void SetCycleCount( CCycleCount& cycleCount ) { m_CycleCount = cycleCount; }
	CCycleCount GetCycleCount() const { return m_CycleCount; }

	uint64 GetSequenceNumber() const { return m_ulSequenceNumber; }

	// Work threads can call this to force a work item to be reprocessed (added to the end of the process queue)
	void SetResubmit( bool bResubmit ) { m_bResubmit = bResubmit; }

	// these functions return pointers to fixed string in the code section.
	// We need this for VPROF nodes, you must use the DECLARE_WORK_ITEM macro
	virtual const char* GetDispatchCompletedName() const = 0;
	virtual const char* GetThreadProcessName() const = 0;

	// Return false if your operation failed in some way that you would want to know about
	// The CWorkThreadPool will count the failures.
	virtual bool ThreadProcess( CWorkThread *pThread ) = 0;		// called by the worker thread
	virtual bool DispatchCompletedWorkItem( CJobMgr *jobMgr );	// called by main loop after item completed

#ifdef DBGFLAG_VALIDATE
	virtual void Validate( CValidator &validator, const char *pchName ) {}	// Validate our internal structures
#endif

protected:
	// note: destructor is protected, not public. This is a ref-counted object; keeping the
	// destructor out of the public interface ensures callers can't accidentally delete
	// directly, or declare on stack
	virtual ~CWorkItem() { }

	// the pool and its threads are granted access to the bookkeeping members below
	friend class CWorkThread;
	friend class CWorkThreadPool;

	uint64 m_ulSequenceNumber;	// Sequence number for the work item, used when enforcing output ordering as matching input order
	CCycleCount m_CycleCount;	// A record of how long it took to execute this particular work item !

private:
	bool m_bResubmit;			// true if the item should be resubmitted after last run
	volatile bool m_bRunning;	// true if the work item is running right now
	bool m_bCanceled;			// true if the work was canceled due to timeout
	CJobTime m_jobtimeTimeout;	// time at which this result is no longer valid, so it shouldn't start to be processed
	CJobTime m_jobtimeQueued;	// stamped in the constructor; read by WaitingTime()
	JobID_t m_JobID;			// job this item belongs to, or k_GIDNil
};
// forward decl
class CWorkThreadPool;
//-----------------------------------------------------------------------------
// Purpose: Generic work thread implementation, to be specialized if necessary
//-----------------------------------------------------------------------------
class CWorkThread : public CThread
{
public:
	// Both constructors take the pool this thread belongs to; the second also
	// takes a name (presumably the thread name - defined out of line).
	CWorkThread( CWorkThreadPool *pThreadPool );
	CWorkThread( CWorkThreadPool *pThreadPool, const char *pszName );

	virtual ~CWorkThread() {}

	// Thread body; implemented out of line.
	virtual int Run();

	// Cancellation hook. The base implementation does nothing.
	virtual void Cancel() {}

protected:
	// parent pool
	CWorkThreadPool *m_pThreadPool;

	// set by CWorkThreadPool::StopWorkThreads and possibly by subclasses of CWorkThread
	volatile bool m_bExitThread;

	// set by CWorkThread::Run [note: must still check IsThreadRunning, and/or call Join]
	volatile bool m_bFinished;

	// Start / exit hooks. The base implementations do nothing.
	virtual void OnStart() {}
	virtual void OnExit() {}

#ifdef DBGFLAG_VALIDATE
public:
	// Validation hook; the base class only opens a validation scope.
	virtual void Validate( CValidator &validator, const char *pchName )
	{
		VALIDATE_SCOPE();
	}
#endif // DBGFLAG_VALIDATE

	// the pool is granted access to the protected members above
	friend class CWorkThreadPool;
};
//-----------------------------------------------------------------------------
// callback class to create work threads
//-----------------------------------------------------------------------------
class IWorkThreadFactory
{
public:
	// Virtual so that a concrete factory is destroyed correctly when it is
	// deleted through an IWorkThreadFactory pointer.
	virtual ~IWorkThreadFactory() {}

	// Creates a worker thread for pWorkThreadPool and returns it.
	// NOTE(review): the returned thread is presumably owned by the caller
	// (CWorkThreadFactory allocates it with new) - confirm against the pool implementation.
	virtual class CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool ) = 0;
};
//-----------------------------------------------------------------------------
// reusable trivial implementation of IWorkThreadFactory
//-----------------------------------------------------------------------------
template< class T >
class CWorkThreadFactory : public IWorkThreadFactory
{
public:
	// Allocates a new T, passing the pool to T's constructor, and hands it back
	// as a CWorkThread pointer. T must therefore be constructible from a
	// CWorkThreadPool pointer and convertible to CWorkThread.
	virtual CWorkThread *CreateWorkerThread( class CWorkThreadPool *pWorkThreadPool )
	{
		CWorkThread *pNewThread = new T( pWorkThreadPool );
		return pNewThread;
	}
};
//-----------------------------------------------------------------------------
// Purpose: interface class for object that the WorkThreadPool can signal when
// there are completed work items to process
//-----------------------------------------------------------------------------
class IWorkThreadPoolSignal
{
public:
	// Virtual so that an implementation is destroyed correctly when it is
	// deleted through an IWorkThreadPoolSignal pointer.
	virtual ~IWorkThreadPoolSignal() {}

	// Called to tell the object that there are completed work items to process.
	virtual void Signal() = 0;
};
//-----------------------------------------------------------------------------
// Purpose: pool of work threads.
//-----------------------------------------------------------------------------
class CWorkThreadPool
{
	friend class CWorkThread;
public:
	// Registers the object stored in sm_pWorkItemsCompletedSignal. The pointer is
	// static, so it is shared by every pool.
	static void SetWorkItemCompletedSignal( IWorkThreadPoolSignal *pObject )
	{
		sm_pWorkItemsCompletedSignal = pObject;
	}

	CWorkThreadPool( const char *pszThreadNamePfx );
	// eventually it might be nice to be able to resize these pools via console command
	// in that case, we'd want a constructor like this, and a PoolSize accessor/mutator pair
	// it makes this class much more complicated, however (growing the pool is easy, shrinking it
	// is less easy) so we'll punt for now.
	/* CWorkThreadPool( const char *pszName = "unnamed thread" ) : CWorkThreadPool( pszName, -1 ); */
	virtual ~CWorkThreadPool();

	// Setting this will ensure that items of the same priority complete and get dispatched in the same order
	// they are added to the threadpool. This has a small additional locking overhead and can increase latency
	// as items that are actually completed out-of-order have to queue waiting on earlier items.
	void SetEnsureOutputOrdering( bool bEnsureOutputOrdering ) { m_bEnsureOutputOrdering = bEnsureOutputOrdering; }
	void AllowTimeouts( bool bMayHaveJobTimeouts ) { m_bMayHaveJobTimeouts = bMayHaveJobTimeouts; }

	int AddWorkThread( CWorkThread *pThread );
	void StartWorkThreads();	// start work threads
	void StopWorkThreads();		// stop work threads
	bool HasWorkItemsToProcess() const;

	// sets it to use dynamic worker thread construction
	// if pWorkThreadConstructor is NULL, just creates a standard CWorkThread object
	void SetWorkThreadAutoConstruct( int cMaxThreads, IWorkThreadFactory *pWorkThreadConstructor );

	bool AddWorkItem( CWorkItem *pWorkItem );	// add a work item to the queue to process
	CWorkItem *GetNextCompletedWorkItem( );		// get next completed work item
	const char *GetThreadNamePrefix() const { return m_szThreadNamePfx; }

	void SetNeverSetEventOnAdd( bool bNeverSet );
	bool BNeverSetEventOnAdd() const { return m_bNeverSetOnAdd; }

	// get count of completed work items
	// can't be inline because of m_pTSQueueCompleted type
	int GetCompletedWorkItemCount() const;
	// get count of work items to process
	// can't be inline because of m_pTSQueueToProcess type
	int GetWorkItemToProcessCount() const;

	// sequence number accessors (see the m_ulLast*SequenceNumber members)
	uint64 GetLastUsedSequenceNumber( ) const
	{
		return m_ulLastUsedSequenceNumber;
	}
	uint64 GetLastCompletedSequenceNumber( ) const
	{
		return m_ulLastCompletedSequenceNumber;
	}
	uint64 GetLastDispatchedSequenceNumber( ) const
	{
		return m_ulLastDispatchedSequenceNumber;
	}
#if 0
	uint64 GetAveExecutionTime() const
	{
		return m_StatExecutionTime.GetUlAvg();
	}
	uint64 GetAveWaitTime() const
	{
		return m_StatWaitTime.GetUlAvg();
	}
	uint64 GetCurrentBacklogTime() const;
#endif
	// counters; CWorkItem::ThreadProcess returning false is what counts as a failure
	int CountCompletedSuccess() const { return m_cSuccesses; }
	int CountRetries() const { return m_cRetries; }
	int CountCompletedFailed() const { return m_cFailures; }

	bool BDispatchCompletedWorkItems( const CLimitTimer &limitTimer, CJobMgr *pJobMgr );
	bool BExiting() const { return m_bExiting; }
	int GetWorkerCount() const { return m_WorkThreads.Count(); }
	uint GetActiveThreadCount() const { return m_cActiveThreads; }

	// make sure you lock before using this
	// NOTE(review): presumably the lock meant here is m_WorkThreadMutex - confirm
	const CWorkThread *GetWorkThread( int iIndex ) const
	{
		Assert( iIndex >= 0 && iIndex < m_WorkThreads.Count() );
		return m_WorkThreads[iIndex];
	}

protected:
	// STATICS
	static IWorkThreadPoolSignal *sm_pWorkItemsCompletedSignal;

	// MEMBERS
	CWorkItem *GetNextWorkItemToProcess( );
	void StartWorkThread( CWorkThread *pWorkThread, int iName );

	// meaningful thread name prefix
	char m_szThreadNamePfx[32];
	// have we actually initialized the threadpool?
	bool m_bThreadsInitialized;

	// Incoming queue: queue of all work items to process
	// must be dynamically allocated for alignment requirements on Win64
	CTSQueue< CWorkItem *, false > *m_pTSQueueToProcess;
	// Outgoing queues: queue of all completed work items
	// must be dynamically allocated for alignment requirements on Win64
	CTSQueue< CWorkItem *, false > *m_pTSQueueCompleted;

	// Vectors of completed, but out of order and waiting work items, only used when m_bEnsureOutputOrdering == true
	CThreadMutex m_MutexOnItemCompletedOrdered;
	CUtlVector< CWorkItem * > m_vecCompletedAndWaiting;
	// Should we emit work items in the same order they are received (on a per priority basis)
	bool m_bEnsureOutputOrdering;

	// Sequence numbers
	uint64 m_ulLastUsedSequenceNumber;
	uint64 m_ulLastCompletedSequenceNumber;
	uint64 m_ulLastDispatchedSequenceNumber;

	bool m_bMayHaveJobTimeouts;			// set by AllowTimeouts()
	CUtlVector< CWorkThread * > m_WorkThreads;
	CThreadMutex m_WorkThreadMutex;
	CInterlockedUInt m_cThreadsRunning;	// how many threads are running
	volatile bool m_bExiting;			// are we exiting
	CThreadEvent m_EventNewWorkItem;	// event set when a new work item is available to process
	CInterlockedInt m_cActiveThreads;
	volatile bool m_bNeverSetOnAdd;

	// dynamic worker thread construction, configured by SetWorkThreadAutoConstruct()
	bool m_bAutoCreateThreads;
	int m_cMaxThreads;
	IWorkThreadFactory *m_pWorkThreadConstructor;

	// override this method if you want to do any special handling of completed work items. Default implementation puts
	// work items in our completed item queue.
	virtual void OnWorkItemCompleted( CWorkItem *pWorkItem );
	bool BTryDeleteExitedWorkerThreads();

	int m_cSuccesses;
	int m_cFailures;
	int m_cRetries;
#if 0
	CStat m_StatExecutionTime;
	CStat m_StatWaitTime;
#endif
	CLimitTimer m_LimitTimerCreateNewThreads;

#ifdef DBGFLAG_VALIDATE
public:
	void Validate( CValidator &validator, const char *pchName );
#endif
};
} // namespace GCSDK
#endif // WORKTHREADPOOL_H
|