summaryrefslogtreecommitdiff
path: root/public/gcsdk/job.h
diff options
context:
space:
mode:
Diffstat (limited to 'public/gcsdk/job.h')
-rw-r--r--public/gcsdk/job.h465
1 files changed, 465 insertions, 0 deletions
diff --git a/public/gcsdk/job.h b/public/gcsdk/job.h
new file mode 100644
index 0000000..99bc076
--- /dev/null
+++ b/public/gcsdk/job.h
@@ -0,0 +1,465 @@
+//========= Copyright Valve Corporation, All rights reserved. ============//
+//
+// Purpose:
+//
+// $NoKeywords: $
+//=============================================================================
+
+#ifndef GC_JOB_H
+#define GC_JOB_H
+#ifdef _WIN32
+#pragma once
+#endif
+
+#include "tier0/memdbgon.h"
+#include "tier1/functors.h"
+#include "workthreadpool.h"
+
+namespace GCSDK
+{
+class CJobMgr;
+class CLock;
+class CJob;
+class IMsgNetPacket;
+
+//-----------------------------------------------------------------------------
+// Purpose: Use these macros to declare blocks where it is unsafe to yield.
+// The job will assert if it pauses within the block
+//-----------------------------------------------------------------------------
+#define DO_NOT_YIELD_THIS_SCOPE() GCSDK::CDoNotYieldScope doNotYieldScope_##line( FILE_AND_LINE )
+#define BEGIN_DO_NOT_YIELD() GJobCur().PushDoNotYield( FILE_AND_LINE )
+#define END_DO_NOT_YIELD() GJobCur().PopDoNotYield()
+
+class CDoNotYieldScope
+{
+public:
+ CDoNotYieldScope( const char *pchLocation );
+ ~CDoNotYieldScope();
+private:
+ // Disallow these constructors and operators
+ CDoNotYieldScope();
+ CDoNotYieldScope( const CDoNotYieldScope &that );
+ CDoNotYieldScope &operator=( const CDoNotYieldScope &that );
+};
+
+//-----------------------------------------------------------------------------
+
+// job creation function
+typedef CJob *(*JobCreationFunc_t)( void *pvServerParent, void * pvStartParam );
+
+
+//-----------------------------------------------------------------------------
+// Purpose: static job information
+// contains information relevant to one type of CJob
+//-----------------------------------------------------------------------------
+struct JobType_t
+{
+ const char *m_pchName; // name of this type of job
+ MsgType_t m_eCreationMsg; // network message that creates this job
+ EServerType m_eServerType; // the server type that responds to this message
+ JobCreationFunc_t m_pJobFactory; // virtual constructor
+};
+
+
+//-----------------------------------------------------------------------------
+// Purpose: reason as to why the current job has yielded to the main thread (paused)
+// if this is updated, k_prgchJobPauseReason[] in job.cpp also needs to be updated
+//-----------------------------------------------------------------------------
+enum EJobPauseReason
+{
+ k_EJobPauseReasonNone,
+ k_EJobPauseReasonNotStarted,
+ k_EJobPauseReasonNetworkMsg,
+ k_EJobPauseReasonSleepForTime,
+ k_EJobPauseReasonWaitingForLock,
+ k_EJobPauseReasonYield,
+ k_EJobPauseReasonSQL,
+ k_EJobPauseReasonWorkItem,
+
+ k_EJobPauseReasonCount
+};
+
+
+//-----------------------------------------------------------------------------
+// Purpose: contains information used to route a message to a job, or to
+// create a new job from that message type
+//-----------------------------------------------------------------------------
+struct JobMsgInfo_t
+{
+ MsgType_t m_eMsg;
+ JobID_t m_JobIDSource;
+ JobID_t m_JobIDTarget;
+ EServerType m_eServerType;
+
+ JobMsgInfo_t()
+ {
+ m_eMsg = (MsgType_t)0;
+ m_JobIDSource = k_GIDNil;
+ m_JobIDTarget = k_GIDNil;
+ m_eServerType = k_EServerTypeInvalid;
+ }
+
+ JobMsgInfo_t( MsgType_t eMsg, JobID_t jobIDSource, JobID_t jobIDTarget, EServerType eServerType )
+ {
+ m_eMsg = eMsg;
+ m_JobIDSource = jobIDSource;
+ m_JobIDTarget = jobIDTarget;
+ m_eServerType = eServerType;
+ }
+};
+
+typedef void (CJob::*JobThreadFunc_t)();
+
+#define BYieldingAcquireLock( lock ) _BYieldingAcquireLock( lock, __FILE__, __LINE__ )
+#define BAcquireLockImmediate( lock ) _BAcquireLockImmediate( lock, __FILE__, __LINE__ )
+#define ReleaseLock( lock ) _ReleaseLock( lock, false, __FILE__, __LINE__ )
+
+//-----------------------------------------------------------------------------
+// Purpose: A job is any server operation that requires state. Typically, we use jobs for
+// operations that need to pause waiting for responses from other servers. The
+// job object persists the state of the operation while it waits, and the reply
+// from the remote server re-activates the job.
+//-----------------------------------------------------------------------------
+class CJob
+{
+public:
+ // Constructors & destructors, when overriding job name a static string pointer must be used
+ explicit CJob( CJobMgr &jobMgr, char const *pchJobName = NULL );
+ virtual ~CJob();
+
+ // starts the job, storing off the network msg and calling it's Run() function
+ void StartJobFromNetworkMsg( IMsgNetPacket *pNetPacket, const JobID_t &gidJobIDSrc );
+
+ // accessors
+ JobID_t GetJobID() const { return m_JobID; }
+
+ // start job immediately
+ // mostly for CMD jobs, which should immediately Yield() once
+ // so that they don't do their work until the enclosing JobMbr.Run()
+ // is called
+ void StartJob( void * pvStartParam );
+ // schedules the job for execution, but does not interrup the currently running job. Effectively starts the job on the yielding list as if it had immediately yielded
+ // although is more efficient than actually doing so
+ void StartJobDelayed( void * pvStartParam );
+
+ // string name of the job
+ const char *GetName() const;
+ // return reason why we're paused
+ EJobPauseReason GetPauseReason() const { return m_ePauseReason; }
+ // string description of why we're paused
+ const char *GetPauseReasonDescription() const;
+ // return time at which this job was last paused or continued
+ const CJobTime& GetTimeSwitched() const { return m_STimeSwitched; }
+ // return microseconds run since we were last continued
+ uint64 GetMicrosecondsRun() const { return m_FastTimerDelta.GetDurationInProgress().GetUlMicroseconds(); }
+ bool BJobNeedsToHeartbeat() const { return ( m_STimeNextHeartbeat.CServerMicroSecsPassed() >= 0 ); }
+
+ // --- locking pointers
+ bool _BYieldingAcquireLock( CLock *pLock, const char *filename = "unknown", int line = 0 );
+ bool _BAcquireLockImmediate( CLock *pLock, const char *filename = "unknown", int line = 0 );
+ void _ReleaseLock( CLock *pLock, bool bForce = false, const char *filename = "unknown", int line = 0 );
+ bool BHoldsAnyLocks() const { return m_vecLocks.Count() > 0; }
+ void ReleaseLocks();
+
+ /// If we hold any locks, spew about them and release them.
+ /// This is useful for long running jobs, to make sure they don't leak
+ /// locks that never get cleaned up
+ void ShouldNotHoldAnyLocks();
+
+ // --- general methods for waiting for events
+ // Simple yield to other jobs until Run() called again
+ bool BYield();
+ // Yield IF JobMgr thinks we need to based on how long we've run and our priority
+ bool BYieldIfNeeded( bool *pbYielded = NULL );
+ // waits for a set amount of time
+ bool BYieldingWaitTime( uint32 m_cMicrosecondsToSleep );
+ bool BYieldingWaitOneFrame();
+ // waits for another network msg, returning false if none returns
+ bool BYieldingWaitForMsg( IMsgNetPacket **ppNetPacket );
+ bool BYieldingWaitForMsg( CGCMsgBase *pMsg, MsgType_t eMsg );
+ bool BYieldingWaitForMsg( CProtoBufMsgBase *pMsg, MsgType_t eMsg );
+
+#ifdef GC
+ bool BYieldingWaitForMsg( CGCMsgBase *pMsg, MsgType_t eMsg, const CSteamID &expectedID );
+ bool BYieldingWaitForMsg( CProtoBufMsgBase *pMsg, MsgType_t eMsg, const CSteamID &expectedID );
+ bool BYieldingRunQuery( CGCSQLQueryGroup *pQueryGroup, ESchemaCatalog eSchemaCatalog );
+#endif
+
+ bool BYieldingWaitTimeWithLimit( uint32 cMicrosecondsToSleep, CJobTime &stimeStarted, int64 nMicroSecLimit );
+ bool BYieldingWaitTimeWithLimitRealTime( uint32 cMicrosecondsToSleep, int nSecLimit );
+
+ void RecordWaitTimeout() { m_flags.m_bits.m_bWaitTimeout = true; }
+
+ // wait for pending work items before deleting job
+ void WaitForThreadFuncWorkItemBlocking();
+
+ // waits for a work item completion callback
+ // You can pass a string that describes what sort of work item you are waiting on.
+ // WARNING: This function saves the pointer to the string, it doesn't copy the string
+ bool BYieldingWaitForWorkItem( const char *pszWorkItemName = NULL );
+
+ // adds this work item to threaded work pool and waits for it
+ bool BYieldingWaitForThreadFuncWorkItem( CWorkItem * );
+
+ // calls a local function in a thread, and yields until it's done
+ bool BYieldingWaitForThreadFunc( CFunctor *jobFunctor );
+
+ // Used by the DO_NOT_YIELD() macros
+ int32 GetDoNotYieldDepth() const;
+ void PushDoNotYield( const char *pchFileAndLine );
+ void PopDoNotYield();
+
+#ifdef DBGFLAG_VALIDATE
+ virtual void Validate( CValidator &validator, const char *pchName ); // Validate our internal structures
+ static void ValidateStatics( CValidator &validator, const char *pchName );
+#endif
+
+ // creates a job
+ template <typename JOB_TYPE, typename PARAM_TYPE>
+ static JOB_TYPE *AllocateJob( PARAM_TYPE *pParam )
+ {
+ return new JOB_TYPE( pParam );
+ }
+ // delete a job (the job knows what allocator to use)
+ static void DeleteJob( CJob *pJob );
+
+ void SetStartParam( void * pvStartParam ) { Assert( NULL == m_pvStartParam ); m_pvStartParam = pvStartParam; }
+ void SetFromFromMsg( bool bRunFromMsg ) { m_bRunFromMsg = true; }
+
+ void AddPacketToList( IMsgNetPacket *pNetPacket, const JobID_t gidJobIDSrc );
+ // marks a packet as being finished with, releases the packet and frees the memory
+ void ReleaseNetPacket( IMsgNetPacket *pNetPacket );
+
+ void EndPause( EJobPauseReason eExpectedState );
+
+ // Generate an assertion in the coroutine of this job
+ // (creating a minidump). Useful for inspecting stuck jobs
+ void GenerateAssert( const char *pchMsg = NULL );
+
+ /// Return true if we tried to waited on a message of this type,
+ /// and failed to receive it. (If so, then this means we could
+ /// conceivably still receive a reply of that type at any moment.)
+ bool BHasFailedToReceivedMsgType( MsgType_t m ) const;
+
+ /// Mark that we awaited a message of the specified type, but timed out
+ void MarkFailedToReceivedMsgType( MsgType_t m );
+
+ /// Clear flag that we timed out waiting on a message of the specified type.
+ /// This is used to allow you to wait ont he same message again, even if
+ /// you know the reply might be a late reply. It's up to you to deal with
+ /// mismatched replies!
+ void ClearFailedToReceivedMsgType( MsgType_t m );
+
+protected:
+ // main job implementation, in the coroutine. Every job must implement at least one of these methods.
+ virtual bool BYieldingRunJob( void * pvStartParam ) { Assert( false ); return true; } // implement this if your job can be started directly
+ virtual bool BYieldingRunJobFromMsg( IMsgNetPacket * pNetPacket ) { Assert( false ); return true; } // implement this if your job can be started by a network message
+
+ // Can be overridden to return a different timeout per job class
+ virtual uint32 CHeartbeatsBeforeTimeout();
+
+ // Called by CJobMgr to send heartbeat message to our listeners during long operations
+ void Heartbeat();
+
+
+ // accessor to get access to the JobMgr from the server we belong to
+ CJobMgr &GetJobMgr();
+ uint32 m_bRunFromMsg:1,
+ m_bWorkItemCanceled:1, // true if the work item we were waiting on was canceled
+ m_bIsTest:1,
+ m_bIsLongRunning:1;
+
+private:
+ // starts the coroutine that activates the job
+ void InitCoroutine();
+
+ // continues the current job
+ void Continue();
+
+ // break into this coroutine - can only be called from OUTSIDE this coroutine
+ void Debug();
+
+ // pauses the current job - can only be called from inside a coroutine
+ void Pause( EJobPauseReason eReason );
+
+ static void BRunProxy( void *pvThis );
+
+ JobID_t m_JobID; // Our unique identifier.
+ HCoroutine m_hCoroutine;
+ void * m_pvStartParam; // Start params for our job, if any
+ // all these flags indicate some kind of failure and we will want to report them
+ union {
+ struct {
+ uint32
+ m_bJobFailed:1, // true if BYieldingRunJob returned false
+ m_bLocksFailed:1,
+ m_bLocksLongHeld:1,
+ m_bLocksLongWait:1,
+ m_bWaitTimeout:1,
+ m_bLongInterYield:1,
+ m_bTimeoutNetMsg:1,
+ m_bTimeoutOther:1,
+ m_uUnused:24;
+ } m_bits;
+ uint32 m_uFlags;
+ } m_flags;
+ int m_cLocksAttempted;
+ int m_cLocksWaitedFor;
+ EJobPauseReason m_ePauseReason;
+ MsgType_t m_unWaitMsgType;
+ CJobTime m_STimeStarted; // time (frame) at which this job started
+ CJobTime m_STimeSwitched; // time (frame) at which we were last paused or continued
+ CJobTime m_STimeNextHeartbeat; // Time at which next heartbeat should be performed
+ CFastTimer m_FastTimerDelta; // How much time we've been running for without yielding
+ CCycleCount m_cyclecountTotal; // Total runtime
+ CJob *m_pJobPrev; // the job that launched us
+
+ // lock manipulation
+ void _SetLock( CLock *pLock, const char *filename, int line );
+ void UnsetLock( CLock *pLock );
+ void PassLockToJob( CJob *pNewJob, CLock *pLock );
+ void OnLockDeleted( CLock *pLock );
+ void AddJobToNotifyOnLockRelease( CJob *pJob );
+ CUtlVectorFixedGrowable< CLock *, 2 > m_vecLocks;
+ CLock *m_pWaitingOnLock; // lock we're waiting on, if any
+ const char *m_pWaitingOnLockFilename;
+ int m_waitingOnLockLine;
+ CJob *m_pJobToNotifyOnLockRelease; // other job that wants this lock
+ CWorkItem *m_pWaitingOnWorkItem; // set if job is waiting for this work item
+
+ CJobMgr &m_JobMgr; // our job manager
+ CUtlVectorFixedGrowable< IMsgNetPacket *, 1 > m_vecNetPackets; // list of tcp packets currently held by this job (ie, needing release on job exit)
+
+ /// List of message types that we have waited for, but timed out.
+ /// once this happens, we currently do not have a good mechanism
+ /// to recover gracefully. But we at least can detect the situation
+ /// and avoid getting totally hosed or processing the wrong reply
+ CUtlVector<MsgType_t> m_vecMsgTypesFailedToReceive;
+
+ // pointer to our own static job info
+ struct JobType_t const *m_pJobType;
+
+ // Name of the job for when it's not registered
+ const char *m_pchJobName;
+
+ // A stack of do not yield guards so we can print the right warning if they're nested
+ CUtlLinkedList<const char *> m_stackDoNotYieldGuards;
+
+ // setting the job info
+ friend void Job_SetJobType( CJob &job, const JobType_t *pJobType );
+ friend class CJobMgr;
+ friend class CLock;
+
+ // used to store the memory allocation stack
+ CUtlMemory< unsigned char > m_memAllocStack;
+};
+
+
+// Only one job can be running at a time. We keep a global accessor to it.
+extern CJob *g_pJobCur;
+inline CJob &GJobCur() { Assert( g_pJobCur != NULL ); return *g_pJobCur; }
+
+#define AssertRunningJob() { Assert( NULL != g_pJobCur ); }
+#define AssertRunningThisJob() { Assert( this == g_pJobCur ); }
+#define AssertNotRunningThisJob() { Assert( this != g_pJobCur ); }
+#define AssertNotRunningJob() { Assert( NULL == g_pJobCur ); }
+
+
+//-----------------------------------------------------------------------------
+// Purpose: simple locking class
+// add this object to any classes you want jobs to be able to lock
+//-----------------------------------------------------------------------------
+class CLock
+{
+public:
+ CLock( );
+ ~CLock();
+
+ bool BIsLocked() { return m_pJob != NULL; }
+ CJob *GetJobLocking() { return m_pJob; }
+ CJob *GetJobWaitingQueueHead() { return m_pJobToNotifyOnLockRelease; }
+ CJob *GetJobWaitingQueueTail() { return m_pJobWaitingQueueTail; }
+ void AddToWaitingQueue( CJob *pJob );
+ const char *GetName() const;
+ void SetName( const char *pchName );
+ void SetName( const char *pchPrefix, uint64 ulID );
+ void SetName( const CSteamID &steamID );
+ int16 GetLockType() const { return m_nsLockType; }
+ void SetLockType( int16 nsLockType ) { m_nsLockType = nsLockType; }
+ uint64 GetLockSubType() const { return m_unLockSubType; }
+ void SetLockSubType( uint64 unLockSubType ) { m_unLockSubType = unLockSubType; }
+ int32 GetWaitingCount() const { return m_nWaitingCount; }
+ int64 GetMicroSecondsSinceLock() const { return m_sTimeAcquired.CServerMicroSecsPassed(); }
+ void IncrementReference();
+ int DecrementReference();
+ void ClearReference() { m_nRefCount = 0; }
+ int32 GetReferenceCount() const { return m_nRefCount; }
+
+ void Dump( const char *pszPrefix = "\t\t", int nPrintMax = 1, bool bPrintWaiting = true ) const;
+
+private:
+ enum ENameType
+ {
+ k_ENameTypeNone = 0,
+ k_ENameTypeSteamID = 1,
+ k_ENameTypeConstStr = 2,
+ k_ENameTypeConcat = 3,
+ };
+
+ CJob *m_pJob; // the job that's currently locking us
+ CJob *m_pJobToNotifyOnLockRelease; // Pointer to the first job waiting on us
+ CJob *m_pJobWaitingQueueTail; // Pointer to the last job waiting on us
+ int16 m_nsLockType; // Lock priority for safely waiting on multiple locks
+ int16 m_nsNameType; // Enum that controls how this lock is named
+
+ uint64 m_ulID; // ID part of the lock's name
+ const char *m_pchConstStr; // Prefix part of the lock's name
+ mutable CUtlString m_strName; // Cached name
+
+ int32 m_nRefCount; // # of times locked
+ int32 m_nWaitingCount; // Count of jobs waiting on the lock
+ CJobTime m_sTimeAcquired; // Time the lock was last locked
+ uint64 m_unLockSubType;
+
+ const char *m_pFilename; // Filename of the source file who aquired this lock
+ int m_line; // Line number of the filename
+
+ friend class CJob;
+};
+
+//-----------------------------------------------------------------------------
+// Purpose: automatic locking class
+//-----------------------------------------------------------------------------
+
+class CAutoCLock
+{
+public:
+ CAutoCLock( CLock &refLock )
+ : m_pLock ( &refLock)
+ {
+ DbgVerify( GJobCur().BYieldingAcquireLock( m_pLock ) );
+ }
+
+ // explicitly unlock before destruction
+ void Unlock()
+ {
+ if ( m_pLock != NULL )
+ GJobCur().ReleaseLock( m_pLock );
+ m_pLock = NULL;
+ }
+
+ ~CAutoCLock( )
+ {
+ Unlock();
+ }
+
+private:
+ CLock *m_pLock;
+ CJob *m_pJob;
+};
+
+} // namespace GCSDK
+
+#include "tier0/memdbgoff.h"
+
+#endif // GC_JOB_H