diff options
Diffstat (limited to 'gcsdk/jobmgr.cpp')
| -rw-r--r-- | gcsdk/jobmgr.cpp | 1626 |
1 files changed, 1626 insertions, 0 deletions
diff --git a/gcsdk/jobmgr.cpp b/gcsdk/jobmgr.cpp new file mode 100644 index 0000000..1821740 --- /dev/null +++ b/gcsdk/jobmgr.cpp @@ -0,0 +1,1626 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +// $NoKeywords: $ +//============================================================================= + + +#include "stdafx.h" + +// memdbgon must be the last include file in a .cpp file!!! +#include "tier0/memdbgon.h" + +namespace GCSDK +{ +#ifdef DEBUG_JOB_LIST +CUtlLinkedList<CJob *,int> CJobMgr::sm_listAllJobs; +#endif + +typedef int (__cdecl *QSortCompareFuncCtx_t)(void *, const void *, const void *); + + +//----------------------------------------------------------------------------- +// Purpose: Constructor +//----------------------------------------------------------------------------- +CJobMgr::CJobMgr() +: m_MapJob( 0, 0, DefLessFunc( GID_t ) ), + m_QueueJobSleeping( 0, 0, &JobSleepingLessFunc ), + m_unNextJobID( 0 ), + m_mapStatsBucket( 0, 0, DefLessFunc(uint32) ), + m_WorkThreadPool( "CJobMgr::m_WorkThreadPool" ), + m_bDebugDisallowPause( false ) +{ + SetDefLessFunc( m_MapJobTimeoutsIndexByJobID ); + SetDefLessFunc( m_mapOrphanMessages ); + m_bJobTimedOut = false; + m_nCurrentYieldIterationRegPri = 0; + m_bProfiling = false; + m_bIsShuttingDown = false; + m_cErrorsToReport = 0; + m_unFrameFuncThreadID = 0; + m_WorkThreadPool.SetWorkThreadAutoConstruct( 1, NULL ); + + if( MemAlloc_GetDebugInfoSize() > 0 ) + { + g_memMainDebugInfo.Init( 0, MemAlloc_GetDebugInfoSize() ); + } + + if( MemAlloc_GetDebugInfoSize() > 0 ) + { + g_memMainDebugInfo.EnsureCapacity( MemAlloc_GetDebugInfoSize() ); + } +} + + +//----------------------------------------------------------------------------- +// Purpose: Constructor +//----------------------------------------------------------------------------- +CJobMgr::~CJobMgr() +{ + m_WorkThreadPool.StopWorkThreads(); +} + + +//----------------------------------------------------------------------------- +// Purpose: limit the size of our thread pool +//----------------------------------------------------------------------------- +void CJobMgr::SetThreadPoolSize( uint cThreads ) +{ + m_WorkThreadPool.SetWorkThreadAutoConstruct( cThreads, NULL ); +} + + +//----------------------------------------------------------------------------- +// Purpose: gets the next available job ID +//----------------------------------------------------------------------------- +JobID_t CJobMgr::GetNewJobID() +{ +#ifdef GC + return GGCHost()->GenerateGID(); +#else + return ++m_unNextJobID; +#endif +} + + +//----------------------------------------------------------------------------- +// Purpose: Run jobs +// Runs once per frame and resumes any sleeping jobs that are scheduled +// to run again, also checks for jobs which have timed out. +// +// Input: limitTimer - limit timer not to exceed +// Output: true if there is still work remaining to do, false otherwise +//----------------------------------------------------------------------------- +bool CJobMgr::BFrameFuncRunSleepingJobs( CLimitTimer &limitTimer ) +{ + CheckThreadID(); // make sure frame function is called from correct thread + + bool bWorkRemaining = false; + + { + VPROF_BUDGET( "CJobMgr::BResumeSleepingJobs", VPROF_BUDGETGROUP_JOBS_COROUTINES ); + bWorkRemaining |= BResumeSleepingJobs( limitTimer ); + } + + { + VPROF_BUDGET( "CJobMgr::CheckForJobTimeouts", VPROF_BUDGETGROUP_JOBS_COROUTINES ); + CheckForJobTimeouts( limitTimer ); + } + + m_JobStats.m_cJobsCurrent = CountJobs(); + + return bWorkRemaining; +} + + + +//----------------------------------------------------------------------------- +// Purpose: Run jobs +// This function is called repeatedly in a single frame if time is left +// and will first run any yielding jobs +// Input: limitTimer - limit timer not to exceed +// Output: true if there is still work remaining to do, false otherwise +//----------------------------------------------------------------------------- +bool CJobMgr::BFrameFuncRunYieldingJobs( CLimitTimer &limitTimer ) +{ + CheckThreadID(); // make sure frame function is called from correct thread + + bool bWorkRemaining = false; + + { + VPROF_BUDGET( "CJobMgr::BResumeYieldingJobs", VPROF_BUDGETGROUP_JOBS_COROUTINES ); + bWorkRemaining |= BResumeYieldingJobs( limitTimer ); + } + + { + VPROF_BUDGET( "CJobMgr -- Dispatch completed work items", VPROF_BUDGETGROUP_JOBS_COROUTINES ); + bWorkRemaining |= m_WorkThreadPool.BDispatchCompletedWorkItems( limitTimer, this ); + } + + m_JobStats.m_cJobsCurrent = CountJobs(); + + return bWorkRemaining; +} + + +//----------------------------------------------------------------------------- +// Purpose: Registers a new job for us to keep track of. +// Input: job - The job in question +//----------------------------------------------------------------------------- +void CJobMgr::InsertJob( CJob &job ) +{ + Assert( m_MapJob.Find( job.GetJobID() ) == m_MapJob.InvalidIndex() ); + m_MapJob.Insert( job.GetJobID(), &job ); +#ifdef DEBUG_JOB_LIST + sm_listAllJobs.AddToTail( &job ); +#endif +} + + +//----------------------------------------------------------------------------- +// purpose: This job is done, accumulate its stats +//----------------------------------------------------------------------------- +void CJobMgr::AccumulateStatsofJob( CJob &job ) +{ + // if we are not profiling, but the job experienced some kind of failure + // record it anyway - we will issue a consolidated spew about it + if ( !m_bProfiling && job.m_flags.m_uFlags == 0 ) + return; + if ( job.m_flags.m_uFlags ) + m_cErrorsToReport++; + + job.m_FastTimerDelta.End(); + job.m_cyclecountTotal += job.m_FastTimerDelta.GetDuration(); + + uint32 eBucket = 0; + // the pointer to the name is a pointer to a constant string + // so use this dirty trick to make lookups fast + eBucket = (uint32)job.GetName(); + int iBucket = m_mapStatsBucket.Find( eBucket ); + if ( iBucket == m_mapStatsBucket.InvalidIndex() ) + { + iBucket = m_mapStatsBucket.Insert( eBucket ); + V_strcpy_safe( m_mapStatsBucket[iBucket].m_rgchName, job.GetName() ); + } + + JobStatsBucket_t *pJobStatsBucket = &m_mapStatsBucket[iBucket]; + pJobStatsBucket->m_cCompletes++; + pJobStatsBucket->m_cLocksAttempted += job.m_cLocksAttempted; + pJobStatsBucket->m_cLocksWaitedFor += job.m_cLocksWaitedFor; + pJobStatsBucket->m_cLocksFailed += job.m_flags.m_bits.m_bLocksFailed ? 1 : 0; + pJobStatsBucket->m_cLocksLongHeld += job.m_flags.m_bits.m_bLocksLongHeld ? 1 : 0; + pJobStatsBucket->m_cLocksLongWait += job.m_flags.m_bits.m_bLocksLongWait ? 1 : 0; + pJobStatsBucket->m_cWaitTimeout += job.m_flags.m_bits.m_bWaitTimeout ? 1 : 0; + pJobStatsBucket->m_cJobsFailed += job.m_flags.m_bits.m_bJobFailed ? 1 : 0; + pJobStatsBucket->m_cLongInterYieldTime += job.m_flags.m_bits.m_bLongInterYield ? 1 : 0; + pJobStatsBucket->m_cTimeoutNetMsg += job.m_flags.m_bits.m_bTimeoutNetMsg ? 1 : 0; + + pJobStatsBucket->m_u64RunTime += job.m_cyclecountTotal.GetLongCycles(); + if ( (uint64)job.m_cyclecountTotal.GetLongCycles() > pJobStatsBucket->m_u64RunTimeMax ) + pJobStatsBucket->m_u64RunTimeMax = job.m_cyclecountTotal.GetLongCycles(); + if ( job.m_STimeSwitched != job.m_STimeStarted ) + { + pJobStatsBucket->m_cJobsPaused++; + pJobStatsBucket->m_u64JobDuration += job.m_STimeStarted.CServerMicroSecsPassed(); + } + else + { + pJobStatsBucket->m_u64JobDuration += job.m_cyclecountTotal.GetMicroseconds(); + } +} + + +//----------------------------------------------------------------------------- +// purpose: This message was orphaned, accumulate for stats +//----------------------------------------------------------------------------- +void CJobMgr::RecordOrphanedMessage( MsgType_t eMsg, JobID_t jobIDTarget ) +{ + EG_MSG( SPEW_JOB, "Message %s arrived responding to job %lld which no longer exists, dropping message\n", PchMsgNameFromEMsg( eMsg ), jobIDTarget ); + int iBucket = m_mapOrphanMessages.Find( eMsg ); + if ( iBucket == m_mapOrphanMessages.InvalidIndex() ) + { + int ct = 0; + iBucket = m_mapOrphanMessages.Insert( eMsg, ct ); + } + m_mapOrphanMessages[iBucket]++; +} + + +//----------------------------------------------------------------------------- +// Purpose: Removes a job from the manager. Note that we don't free it. +// Input: job - The job in question +//----------------------------------------------------------------------------- +void CJobMgr::RemoveJob( CJob &job ) +{ + m_MapJob.Remove( job.GetJobID() ); + + AccumulateStatsofJob( job ); + m_JobStats.m_cJobsTotal++; + if ( job.m_flags.m_bits.m_bJobFailed ) + m_JobStats.m_cJobsFailed++; + + uint64 u64JobDuration = job.m_STimeStarted.CServerMicroSecsPassed(); + m_JobStats.m_flSumJobTimeMicrosec += u64JobDuration; + m_JobStats.m_flSumSqJobTimeMicrosec += ((double)u64JobDuration * (double)u64JobDuration); + if ( u64JobDuration > m_JobStats.m_unMaxJobTimeMicrosec ) + { + m_JobStats.m_unMaxJobTimeMicrosec = u64JobDuration; + } + +#ifdef DEBUG_JOB_LIST + sm_listAllJobs.FindAndRemove( &job ); +#endif +} + + +#ifdef GC +//----------------------------------------------------------------------------- +// Purpose: resumes the specified job if it is, in fact, waiting for a SQL query +// to return +//----------------------------------------------------------------------------- +bool CJobMgr::BResumeSQLJob( JobID_t jobID ) +{ + int iMap = m_mapSQLQueriesInFlight.Find( jobID ); + if ( m_mapSQLQueriesInFlight.IsValidIndex( iMap ) ) + { + if ( m_bSQLProfiling && m_dictSQLBuckets.IsValidIndex( m_mapSQLQueriesInFlight[iMap].m_iBucket ) ) + { + SQLProfileBucket_t &bucket = m_dictSQLBuckets[ m_mapSQLQueriesInFlight[iMap].m_iBucket ]; + bucket.m_unCount++; + bucket.m_nTotalMicrosec += (int64)m_sqlTimer.GetDurationInProgress().GetUlMicroseconds() - m_mapSQLQueriesInFlight[iMap].m_nStartMicrosec; + } + + m_mapSQLQueriesInFlight.RemoveAt( iMap ); + } + + int iJob; + if ( !BGetIJob( jobID, k_EJobPauseReasonSQL, true, &iJob ) ) + { + EG_MSG( SPEW_JOB, "BResumeSQLJob called for a job that could not be found!\n" ); + return false; + } + + // Just change the job's pause reason and add it to the yield list + // it will wake up on the next heartbeat + m_MapJob[iJob]->EndPause( k_EJobPauseReasonSQL ); + AddToYieldList( *m_MapJob[iJob] ); + + return true; +} +#endif + +//----------------------------------------------------------------------------- +// Purpose: returns true if we're running any jobs of the specified name +// Output : Returns true on success, false on failure. +//----------------------------------------------------------------------------- +bool CJobMgr::BIsJobRunning( const char *pchJobName ) +{ + FOR_EACH_MAP_FAST( m_MapJob, i ) + { + if ( !Q_stricmp( m_MapJob[i]->GetName(), pchJobName ) ) + return true; + } + return false; +} + + +//----------------------------------------------------------------------------- +// Purpose: returns true if there is a job active with the specified ID +//----------------------------------------------------------------------------- +bool CJobMgr::BJobExists( JobID_t jobID ) const +{ + return ( m_MapJob.Find( jobID ) != m_MapJob.InvalidIndex() ); +} + + +//----------------------------------------------------------------------------- +// Purpose: returns a job pointer by id +//----------------------------------------------------------------------------- +const CJob *CJobMgr::GetPJob( JobID_t jobID ) const +{ + int iMap = m_MapJob.Find( jobID ); + if ( iMap != m_MapJob.InvalidIndex() ) + { + return m_MapJob[iMap]; + } + return NULL; +} + +CJob *CJobMgr::GetPJob( JobID_t jobID ) +{ + int iMap = m_MapJob.Find( jobID ); + if ( iMap != m_MapJob.InvalidIndex() ) + { + return m_MapJob[iMap]; + } + return NULL; +} + +//----------------------------------------------------------------------------- +// Purpose: Examines an incoming message to see if it belongs to an active job, +// and if so, sends it to that job. Creates a new job if necessary. +// Output: true if the message was routed to a job +//----------------------------------------------------------------------------- +bool CJobMgr::BRouteMsgToJob( void *pParent, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo ) +{ + if ( pNetPacket == NULL ) + { + AssertMsg(pNetPacket, "CJobMgr::BRouteMsgToJob received NULL packet."); + return false; + } + + if ( jobMsgInfo.m_JobIDTarget != k_GIDNil ) + { + // This message is a reply to a running job + VPROF_BUDGET( "CJobMgr::BRouteMsgToJob() - continue job", VPROF_BUDGETGROUP_JOBS_COROUTINES ); + + // Find the job that this packet is destined for + int iJob = m_MapJob.Find( jobMsgInfo.m_JobIDTarget ); + if ( m_MapJob.InvalidIndex() != iJob ) + { + // found the right job, pass it off + PassMsgToJob( *(m_MapJob[iJob]), pNetPacket, jobMsgInfo ); + return true; + } + + // The job is no longer running, it most likely timed out before the response arrived. + // Continue and see if a job is registered to launch from this message + } + + // no job, so try creating a job that can handle the msg + // We pass in a pointer to m_JobIDTarget so that it gets set to the new Job's ID. This ensures + // that anyone replying to this message from within the new job has the right JobIDSource. + VPROF_BUDGET( "CJobMgr::BRouteMsgToJob() - job", VPROF_BUDGETGROUP_JOBS_COROUTINES ); + bool bRet = BLaunchJobFromNetworkMsg( pParent, jobMsgInfo, pNetPacket ); + + if ( !bRet && jobMsgInfo.m_JobIDTarget != k_GIDNil ) + { + RecordOrphanedMessage( jobMsgInfo.m_eMsg, jobMsgInfo.m_JobIDTarget ); + // return that we've handled this message (as much as it possibly can be) -- was intended for a job that has + // timed out, no one else can do anything with it + return true; + } + + return bRet; +} + + +//----------------------------------------------------------------------------- +// Purpose: Routes a message directly to the specified job +//----------------------------------------------------------------------------- +void CJobMgr::PassMsgToJob( CJob &job, IMsgNetPacket *pNetPacket, const JobMsgInfo_t &jobMsgInfo ) +{ + // Check if this job previously failed to wait for this message type, + // then this is probably a late reply. Discard it + if ( job.BHasFailedToReceivedMsgType( jobMsgInfo.m_eMsg ) ) + { + EmitInfo( SPEW_JOB, 2, LOG_ALWAYS, "Reply msg type %s to job %s is too late; discarding\n", PchMsgNameFromEMsg( jobMsgInfo.m_eMsg ), job.GetName() ); + return; + } + + // make sure it's what we're waiting for + if ( job.GetPauseReason() != k_EJobPauseReasonNetworkMsg ) + { + AssertMsg3( false, "CJobMgr::PassMsgToJob() job %s received unexpected message %s when paused for %s\n", job.GetName(), PchMsgNameFromEMsg( jobMsgInfo.m_eMsg ), job.GetPauseReasonDescription() ); + } + + // In case of error, we need to throw this message away + if ( job.GetPauseReason() != k_EJobPauseReasonNetworkMsg ) + return; + + // Add the packet and resume the job + job.AddPacketToList( pNetPacket, jobMsgInfo.m_JobIDSource ); + job.EndPause( k_EJobPauseReasonNetworkMsg ); + AddToYieldList( job ); + + return; +} + + +//----------------------------------------------------------------------------- +// Purpose: pauses the job until a network msg for the specified job arrives +//----------------------------------------------------------------------------- +bool CJobMgr::BYieldingWaitForMsg( CJob &job ) +{ + // wait until we're woken up by a networking callback, or a timeout + PauseJob( job, k_EJobPauseReasonNetworkMsg ); + return !m_bJobTimedOut; +} + + +//----------------------------------------------------------------------------- +// Purpose: Returns IJob matching a JobID, if it is paused for the given reason +// Input: jobID - The job that should be paused for the given reason +// eJobPauseReason - Pause reason +// bShouldExist - If true, job should exist, so asserts on not finding it ok +// pIJob - IJob to fill in +// Output: true if job paused for matching reason found +//----------------------------------------------------------------------------- +bool CJobMgr::BGetIJob( JobID_t jobID, EJobPauseReason eJobPauseReason, bool bShouldExist, int *pIJob ) +{ + // If this isn't owned by a job, we don't handle it + if ( k_GIDNil == jobID ) + return false; + + // Figure out which job the msg belongs to + int iJob = m_MapJob.Find( jobID ); + Assert( m_MapJob.InvalidIndex() != iJob || !bShouldExist ); + + // If it's not one of ours, ignore it + if ( m_MapJob.InvalidIndex() == iJob ) + return false; + + // make sure it's what we're waiting for + if ( m_MapJob[iJob]->GetPauseReason() != eJobPauseReason ) + return false; + + *pIJob = iJob; + return true; +} + + +//----------------------------------------------------------------------------- +// Purpose: yields for a set amount of time +// Input : &job - job that is yielding +// m_cMicrosecondsToSleep - number of microseconds to wait for before resuming job +// Output : Returns true on success, false on failure. +//----------------------------------------------------------------------------- +bool CJobMgr::BYieldingWaitTime( CJob &job, uint32 cMicrosecondsToSleep ) +{ + Assert( cMicrosecondsToSleep < k_cMicroSecJobPausedTimeout ); + // sleep of zero causes an infinite loop + Assert( 0 != cMicrosecondsToSleep ); + +#ifdef _DEBUG + for ( int i = 0; i < m_QueueJobSleeping.Count(); i++ ) + { + Assert( m_QueueJobSleeping.Element(i).m_JobID != job.GetJobID() ); + } +#endif + + // insert the job into the sleep list + JobSleeping_t jobSleeping; + jobSleeping.m_JobID = job.GetJobID(); + jobSleeping.m_SWakeupTime.SetFromJobTime( cMicrosecondsToSleep ); + jobSleeping.m_STimeTouched.SetToJobTime(); + m_QueueJobSleeping.Insert( jobSleeping ); + + // yield + PauseJob( job, k_EJobPauseReasonSleepForTime ); + if ( m_bJobTimedOut ) + return false; + + return true; +} + + +#ifdef GC +//----------------------------------------------------------------------------- +// Purpose: yields waiting for a query response +// Input : &job - job that is yielding +// Output : Returns true on success, false on failure. +//----------------------------------------------------------------------------- +// yields waiting for a query response +bool CJobMgr::BYieldingRunQuery( CJob &job, CGCSQLQueryGroup *pQueryGroup, ESchemaCatalog eSchemaCatalog ) +{ + // clear the existing results pointer, if any, to make space for the results + // for this query + pQueryGroup->SetResults( NULL ); + + if ( m_bSQLProfiling ) + { + const char *pchName = pQueryGroup->PchName(); + if ( !pchName || !pchName[0] ) + { + if ( pQueryGroup->GetStatementCount() == 1 ) + { + pchName = pQueryGroup->PchCommand( 0 ); + } + + if ( !pchName || !pchName[0] ) + { + pchName = job.GetName(); + } + } + + PendingSQLJob_t sqlJob; + sqlJob.m_nStartMicrosec = (int64)m_sqlTimer.GetDurationInProgress().GetUlMicroseconds(); + sqlJob.m_iBucket = m_dictSQLBuckets.Find( pchName ); + if ( !m_dictSQLBuckets.IsValidIndex( sqlJob.m_iBucket ) ) + { + SQLProfileBucket_t bucket = { 0, 0 }; + sqlJob.m_iBucket = m_dictSQLBuckets.Insert( pchName, bucket ); + } + m_mapSQLQueriesInFlight.Insert( job.GetJobID(), sqlJob ); + } + + VPROF_BUDGET( "GCHost", VPROF_BUDGETGROUP_STEAM ); + { + VPROF_BUDGET( "GCHost - SQLQuery", VPROF_BUDGETGROUP_STEAM ); + GGCHost()->SQLQuery( job.GetJobID(), pQueryGroup, eSchemaCatalog ); + } + PauseJob( job, k_EJobPauseReasonSQL ); + return pQueryGroup->GetResults() && pQueryGroup->GetResults()->GetError() == k_EGCSQLErrorNone; +} + + +//----------------------------------------------------------------------------- +// Purpose: turns on sql profiling +//----------------------------------------------------------------------------- +void CJobMgr::StartSQLProfiling() +{ + if ( m_bSQLProfiling ) + return; + + m_mapSQLQueriesInFlight.RemoveAll(); + m_dictSQLBuckets.RemoveAll(); + m_sqlTimer.Start(); + m_bSQLProfiling = true; +} + + +//----------------------------------------------------------------------------- +// Purpose: turns off sql profiling +//----------------------------------------------------------------------------- +void CJobMgr::StopSQLProfiling() +{ + if ( !m_bSQLProfiling ) + return; + + m_mapSQLQueriesInFlight.RemoveAll(); + m_sqlTimer.End(); + m_bSQLProfiling = false; +} + + + +//----------------------------------------------------------------------------- +// Purpose: sql profile sort func +//----------------------------------------------------------------------------- +int CJobMgr::SQLProfileSortFunc( void *pCtx, const int *lhs, const int *rhs ) +{ + SQLProfileCtx_t *pSQLProfileCtx = (SQLProfileCtx_t *)pCtx; + CUtlDict<SQLProfileBucket_t> *pDictBuckets = pSQLProfileCtx->pdictBuckets; + SQLProfileBucket_t &lhsBucket = pDictBuckets->Element( *lhs ); + SQLProfileBucket_t &rhsBucket = pDictBuckets->Element( *rhs ); + + switch ( pSQLProfileCtx->m_eSort ) + { + default: + case k_ESQLProfileSortTotalTime: return rhsBucket.m_nTotalMicrosec - lhsBucket.m_nTotalMicrosec; + case k_ESQLProfileSortTotalCount: return rhsBucket.m_unCount - lhsBucket.m_unCount; + case k_ESQLProfileSortAvgTime: return ( rhsBucket.m_nTotalMicrosec / rhsBucket.m_unCount ) - ( lhsBucket.m_nTotalMicrosec / lhsBucket.m_unCount ); + case k_ESQLProfileSortName: return Q_stricmp( pDictBuckets->GetElementName( *lhs ), pDictBuckets->GetElementName( *rhs ) ); + } +} + + +//----------------------------------------------------------------------------- +// Purpose: dumps the current sql profile +//----------------------------------------------------------------------------- +void CJobMgr::DumpSQLProfile( ESQLProfileSort eSort ) +{ + CUtlVector<int> vecSort; + for ( int iDict = 0; iDict < m_dictSQLBuckets.MaxElement(); iDict++ ) + { + if ( !m_dictSQLBuckets.IsValidIndex( iDict ) ) + continue; + + if ( m_dictSQLBuckets[iDict].m_unCount > 0 ) + { + vecSort.AddToTail( iDict ); + } + } + + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "SQL statement stats:\n" ); + if ( 0 == vecSort.Count() ) + { + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "\tNo SQL stats collected; use sql_profile_on / sql_profile_off to collect stats first\n" ); + return; + } + + // sort + SQLProfileCtx_t ctx; + ctx.m_eSort = eSort; + ctx.pdictBuckets = &m_dictSQLBuckets; + + V_qsort_s( vecSort.Base(), vecSort.Count(), sizeof(int), (QSortCompareFuncCtx_t)SQLProfileSortFunc, &ctx ); + + // display + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%8s %8s %8s\n", "count", "time", "avg" ); + FOR_EACH_VEC( vecSort, i ) + { + SQLProfileBucket_t &bucket = m_dictSQLBuckets[ vecSort[i] ]; + const char *pchStatement = m_dictSQLBuckets.GetElementName( vecSort[i] ); + + // cleanup the statement text + char rgchCleaned[140]; + V_strcpy_safe( rgchCleaned, pchStatement ); + for ( int i = 0; NULL != rgchCleaned[i]; i++ ) + { + if ( '\n' == rgchCleaned[i] || '\t' == rgchCleaned[i] ) + { + rgchCleaned[i] = ' '; + } + } + + bool bSeconds = bucket.m_nTotalMicrosec > k_nMillion; + float fTime = bucket.m_nTotalMicrosec / 1000.0f / ( bSeconds ? 1000.0f : 1.0f ); + + // render + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%8d %8.2f%s %8.2f %s\n", + bucket.m_unCount, + fTime, + bSeconds ? "s " : "ms", + (float)bucket.m_nTotalMicrosec / (float)bucket.m_unCount / 1000.0f, + rgchCleaned ); + } +} +#endif + + +//----------------------------------------------------------------------------- +// Purpose: pauses job until a work item completes +//----------------------------------------------------------------------------- +bool CJobMgr::BYieldingWaitForWorkItem( CJob &job, const char *pszWorkItemName ) +{ + // wait until we're woken up by a work item completed, or a timeout + PauseJob( job, k_EJobPauseReasonWorkItem ); + if ( m_bJobTimedOut || job.m_bWorkItemCanceled ) + return false; + + return true; +} + + +//----------------------------------------------------------------------------- +// Purpose: adds a job work item to the thread pool +//----------------------------------------------------------------------------- +void CJobMgr::AddThreadedJobWorkItem( CWorkItem *pWorkItem ) +{ + m_WorkThreadPool.AddWorkItem( pWorkItem ); +} + + +//----------------------------------------------------------------------------- +// Purpose: returns true if we're still working +//----------------------------------------------------------------------------- +bool CJobMgr::HasOutstandingThreadPoolWorkItems() +{ + return m_WorkThreadPool.HasWorkItemsToProcess(); +} + +//----------------------------------------------------------------------------- +// Purpose: Mark that we're shutting down +//----------------------------------------------------------------------------- +void CJobMgr::SetIsShuttingDown() +{ + m_WorkThreadPool.AllowTimeouts( true ); // during shutdown, we might abort jobs before waiting for the work item to complete + m_bIsShuttingDown = true; +} + + +//----------------------------------------------------------------------------- +// Purpose: Wakes up the specified waiting job. +// Input: jobID - The job that owns this work item +// bWorkItemCanceled - true if this job +// bShouldExist - Do we assert if the job doesn't exist? +// Output: true if the message was routed to a job +//----------------------------------------------------------------------------- +bool CJobMgr::BRouteWorkItemCompletedInternal( JobID_t jobID, bool bWorkItemCanceled, bool bShouldExist, bool bResumeImmediately ) +{ + int iJob; + + // this can resume jobs, make sure we didn't switch threads + CheckThreadID(); + + if ( !BGetIJob( jobID, k_EJobPauseReasonWorkItem, bShouldExist, &iJob ) ) + { + EG_MSG( SPEW_JOB, "BRouteWorkItemCompleted called for a job that could not be found!\n" ); + return false; + } + + // continue the job + m_MapJob[iJob]->m_bWorkItemCanceled = bWorkItemCanceled; + if ( bResumeImmediately ) + { + m_MapJob[iJob]->Continue(); + } + else + { + AddToYieldList( *m_MapJob[iJob] ); + + // reset the sleep reason + m_MapJob[iJob]->m_ePauseReason = k_EJobPauseReasonYield; + } + + return true; +} + + +//----------------------------------------------------------------------------- +// Purpose: Adds job to yield list (without actually pausing it) - internal +// Input : &job - job that is yielding +// Output : Returns true on success, false on failure. +//----------------------------------------------------------------------------- +void CJobMgr::AddToYieldList( CJob &job ) +{ +#ifdef _DEBUG + FOR_EACH_LL( m_ListJobsYieldingRegPri, i ) + { + Assert( m_ListJobsYieldingRegPri[i].m_JobID != job.GetJobID() ); + } +#endif + + // insert the job into the sleep list + JobYielding_t jobYielding; + jobYielding.m_JobID = job.GetJobID(); + jobYielding.m_nIteration = m_nCurrentYieldIterationRegPri; + m_ListJobsYieldingRegPri.AddToTail( jobYielding ); +} + +//----------------------------------------------------------------------------- +// called by a job that has just been started to place itself on the yield queue instead of running +//----------------------------------------------------------------------------- +void CJobMgr::AddDelayedJobToYieldList( CJob &job ) +{ + //make sure that this job is setup to be yielded at this point, otherwise it will not resume properly + AssertMsg1( job.GetPauseReason() == k_EJobPauseReasonYield, "Delayed job %s was added to yield list but was not in expected yield state\n", job.GetName() ); + AddToYieldList( job ); +} + + +//----------------------------------------------------------------------------- +// Purpose: yields until the next Run() +// Input : &job - job that is yielding +// Output : Returns true on success, false on failure. +//----------------------------------------------------------------------------- +bool CJobMgr::BYield( CJob &job ) +{ + AddToYieldList( job ); + + // yield + PauseJob( job, k_EJobPauseReasonYield ); + if ( m_bJobTimedOut ) + return false; + + return true; +} + + +//----------------------------------------------------------------------------- +// Purpose: yields IF NEEDED until the next Run() +// Input : &job - job that is possibly yielding +// pbYielded - optional, set to true if we did yield +// Output : Returns true on success, false on failure. +//----------------------------------------------------------------------------- +bool CJobMgr::BYieldIfNeeded( CJob &job, bool *pbYielded ) +{ + if ( pbYielded ) + *pbYielded = false; + + if ( job.GetMicrosecondsRun() > ( k_cMicroSecTaskGranularity / 2 ) ) + { + bool bRet = BYield( job ); + if ( pbYielded ) + *pbYielded = bRet; + return bRet; + } + + return true; +} + + +//----------------------------------------------------------------------------- +// Purpose: Resumes jobs in list passed in that are ready to be awakened +//----------------------------------------------------------------------------- +bool CJobMgr::BResumeYieldingJobsFromList( CUtlLinkedList<JobYielding_t, int> &listJobsYielding, uint nCurrentIteration, + CLimitTimer &limitTimer ) +{ + while ( listJobsYielding.Count() ) + { + int iJobYielding = listJobsYielding.Head(); + const JobYielding_t &jobYielding = listJobsYielding[ iJobYielding ]; + + if ( jobYielding.m_nIteration > nCurrentIteration ) + break; + + // pop the sleep off the top of the queue + int iJob = m_MapJob.Find( jobYielding.m_JobID ); + listJobsYielding.Remove( iJobYielding ); + + if ( m_MapJob.InvalidIndex() == iJob ) + continue; + + Assert( m_MapJob[iJob]->GetPauseReason() == k_EJobPauseReasonYield ); + + // Should never be false, but if it is we + // don't want to do anything to this job + if ( m_MapJob[iJob]->GetPauseReason() == k_EJobPauseReasonYield ) + { + // resume the job + m_MapJob[iJob]->Continue(); + } + + if ( limitTimer.BLimitReached() ) + break; + } + + return ( listJobsYielding.Count() > 0 ); +} + + +//----------------------------------------------------------------------------- +// Purpose: Resumes any jobs that have are ready to be awaken +// Input: limitTimer - limit timer not to exceed +// Output: true if there is still work remaining to do, false otherwise +//----------------------------------------------------------------------------- +bool CJobMgr::BResumeYieldingJobs( CLimitTimer &limitTimer ) +{ + return BResumeYieldingJobsFromList( m_ListJobsYieldingRegPri, m_nCurrentYieldIterationRegPri++, limitTimer ); +} + + +//----------------------------------------------------------------------------- +// Purpose: Resumes any jobs that have are ready to be awaken +// Input: limitTimer - limit timer not to exceed +// Output: true if there is still work remaining to do, false otherwise +//----------------------------------------------------------------------------- +bool CJobMgr::BResumeSleepingJobs( CLimitTimer &limitTimer ) +{ + while ( m_QueueJobSleeping.Count() ) + { + const JobSleeping_t &jobSleeping = m_QueueJobSleeping.ElementAtHead(); + if ( jobSleeping.m_SWakeupTime.LTime() > CJobTime::LJobTimeCur() ) + { + // Check if we need to heartbeat + if ( jobSleeping.m_STimeTouched.CServerMicroSecsPassed() >= k_cMicroSecJobHeartbeat ) + { + int iJob = m_MapJob.Find( jobSleeping.m_JobID ); + if ( m_MapJob.InvalidIndex() != iJob ) + { + m_MapJob[iJob]->Heartbeat(); + } + } + + return false; + } + + // pop the sleep off the top of the queue + int iJob = m_MapJob.Find( jobSleeping.m_JobID ); + m_QueueJobSleeping.RemoveAtHead(); + + if ( m_MapJob.InvalidIndex() == iJob ) + continue; + + Assert( m_MapJob[iJob]->GetPauseReason() == k_EJobPauseReasonSleepForTime ); + + // should never be false, but if it is we don't want to do anything to this job + if ( m_MapJob[iJob]->GetPauseReason() == k_EJobPauseReasonSleepForTime ) + { + // resume the job + m_MapJob[iJob]->Continue(); + } + + if ( limitTimer.BLimitReached() ) + break; + } + + return ( m_QueueJobSleeping.Count() > 0 ); +} + + +//----------------------------------------------------------------------------- +// Purpose: comparison function for sorting sleeping jobs list by time +// Output : Returns true on if lhs is greater than the rhs +//----------------------------------------------------------------------------- +bool CJobMgr::JobSleepingLessFunc( JobSleeping_t const &lhs, JobSleeping_t const &rhs ) +{ + // a lower time is a higher priority + return ( lhs.m_SWakeupTime.LTime() > rhs.m_SWakeupTime.LTime() ); +} + +JobID_t g_DebugJob = k_GIDNil; + +//----------------------------------------------------------------------------- +// Purpose: quickly iterates the list of jobs to make sure none have been paused +// for too long +//----------------------------------------------------------------------------- +void CJobMgr::CheckForJobTimeouts( CLimitTimer &limitTimer ) +{ + // look through each active jobs + // remove from the list any job that has successfully received it's I/O + // send a failure msg to any job that has timed out + // since the timeout time is constant, we only have to check until we find a job + + int cIter = 0; + while ( m_ListJobTimeouts.Head() != m_ListJobTimeouts.InvalidIndex() ) + { + cIter ++; + + // Break if limit timer is reached and we've already processed at least one item. + if ( cIter > 1 && limitTimer.BLimitReached() ) + break; + + JobTimeout_t &jobtimeout = m_ListJobTimeouts[ m_ListJobTimeouts.Head() ]; + + // see if it's timed out + if ( !m_bIsShuttingDown && jobtimeout.m_STimeTouched.CServerMicroSecsPassed() < k_cMicroSecJobHeartbeat ) + { + // we haven't reached our recycle or timeout limit, which means none of the jobs passed us in the queue would have either + break; + } + + // get the first job in the list, which is the most likely to have timed out + int iJob = m_MapJob.Find( jobtimeout.m_JobID ); + if ( m_MapJob.InvalidIndex() == iJob ) + { + m_MapJobTimeoutsIndexByJobID.Remove( jobtimeout.m_JobID ); + m_ListJobTimeouts.Remove( m_ListJobTimeouts.Head() ); + continue; + } + + // job still exists, make sure it is still paused at the same point + CJob *pJob = m_MapJob[iJob]; + + if ( pJob->GetTimeSwitched().LTime() == jobtimeout.m_STimePaused.LTime() ) + { + jobtimeout.m_cHeartbeatsBeforeTimeout--; + + if ( pJob->GetJobID() == g_DebugJob ) + { + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "Heartbeat!\n" ); + } + + // Always heartbeat so anyone waiting on the job (say on another server) will know it is still alive + // Note that we even do this right before we timeout, since the job will actually be continued and may just loop itself right back into this waiting state + // Note also that we do NOT check pJob->GetNextHeartbeatTime() since we've already been watching our own timer + pJob->Heartbeat(); + + if ( m_bIsShuttingDown || jobtimeout.m_cHeartbeatsBeforeTimeout <= 0 ) + { + // Job finished all its available heartbeats before its timeout limit, timeout if appropriate and remove from the list + + m_MapJobTimeoutsIndexByJobID.Remove( jobtimeout.m_JobID ); + m_ListJobTimeouts.Remove( m_ListJobTimeouts.Head() ); + + bool bShouldTimeout = true; + switch ( pJob->m_ePauseReason ) + { + case k_EJobPauseReasonWaitingForLock: + case k_EJobPauseReasonYield: + case k_EJobPauseReasonSQL: + bShouldTimeout = false; + break; + case k_EJobPauseReasonSleepForTime: + bShouldTimeout = m_bIsShuttingDown; + break; + } // switch + + // If the job WAS waiting on IO but now is waiting on a Lock, Sleeping, + // or Yielding, don't time it out. + // BUGBUG taylor we should fix things so that we can timeout Jobs waiting on + // Locks and have them properly unlink themselves from the Lock chain + if ( bShouldTimeout ) + { + TimeoutJob( *( pJob ) ); + } + } + else + { + // Job has not yet used up all its available heartbeats before its timeout limit + // We've already decremented its m_cHeartbeatsBeforeTimeout, now Reset its touched time too + jobtimeout.m_STimeTouched.SetToJobTime(); + // Move it back to the end of the queue so it can come back up to the top for either another heartbeat or a timeout + m_ListJobTimeouts.LinkToTail( m_ListJobTimeouts.Head() ); + int iIndexMap = m_MapJobTimeoutsIndexByJobID.Find( jobtimeout.m_JobID ); + if ( iIndexMap != m_MapJobTimeoutsIndexByJobID.InvalidIndex() ) + { + int &iListIndex = m_MapJobTimeoutsIndexByJobID.Element( iIndexMap ); + iListIndex = m_ListJobTimeouts.Tail(); + } + else + { + AssertMsg( false, "Map of jobs to timeout is corrupted" ); + } + } + + continue; + } + else + { + // This is really the common heartbeating case, where the job waited a short while without ever reaching the k_cMicroSecJobHeartbeat limit + // Thus, we need to heartbeat before removing it from the list IF the job has gone too long without heartbeating + if ( pJob->BJobNeedsToHeartbeat() ) + { + pJob->Heartbeat(); + } + + // Since the job didn't actually time out, clear this timeout event + m_MapJobTimeoutsIndexByJobID.Remove( jobtimeout.m_JobID ); + m_ListJobTimeouts.Remove( m_ListJobTimeouts.Head() ); + } + } +} + + +//----------------------------------------------------------------------------- +// Purpose: Continues a job in a timed out state +//----------------------------------------------------------------------------- +void CJobMgr::TimeoutJob( CJob &job ) +{ + + if ( job.GetPauseReason() == k_EJobPauseReasonNetworkMsg ) + job.m_flags.m_bits.m_bTimeoutNetMsg = true; + else + { + // these are so rare I dont want to add a column for them in the rollup + EG_WARNING( SPEW_JOB, "Resuming job '%s (id: %lld)' due to timeout while paused for %s\n", job.GetName(), + job.GetJobID(), job.GetPauseReasonDescription() ); + job.m_flags.m_bits.m_bTimeoutOther = true; + } + + m_JobStats.m_cJobsTimedOut++; + m_bJobTimedOut = true; + job.Continue(); + m_bJobTimedOut = false; +} + + +//----------------------------------------------------------------------------- +// Purpose: wakes up a job that was waiting on a lock +//----------------------------------------------------------------------------- +void CJobMgr::WakeupLockedJob( CJob &job ) +{ + Assert( job.m_ePauseReason == k_EJobPauseReasonWaitingForLock ); + + // in case of error, bug out now so as not + // to cause more trouble + if ( job.m_ePauseReason != k_EJobPauseReasonWaitingForLock ) + { + return; + } + + // insert the job into the yielding list so it will wakeup next Run + AddToYieldList( job ); + + // reset the sleep reason + job.m_ePauseReason = k_EJobPauseReasonYield; +} + + +//----------------------------------------------------------------------------- +// Purpose: Pauses a job, and puts it in a list to check for timeouts +//----------------------------------------------------------------------------- +void CJobMgr::PauseJob( CJob &job, EJobPauseReason eJobPauseReason ) +{ + Assert( !m_bDebugDisallowPause ); + if ( m_bDebugDisallowPause ) + { + EmitError( SPEW_GC, "Job %s attempted to pause even though pauses were disabled\n", job.GetName() ); + } + + // add to list to check for timeouts later (or update the existing entry if it is already there) + JobTimeout_t *pJobTimeout; + int iMapIndex = m_MapJobTimeoutsIndexByJobID.Find( job.GetJobID() ); + if ( iMapIndex == m_MapJobTimeoutsIndexByJobID.InvalidIndex() ) + { + pJobTimeout = &m_ListJobTimeouts[ m_ListJobTimeouts.AddToTail() ]; + m_MapJobTimeoutsIndexByJobID.Insert( job.GetJobID(), m_ListJobTimeouts.Tail() ); + } + else + { + // There was an existing entry, in addition to updating it, move it to the tail + int &iListIndex = m_MapJobTimeoutsIndexByJobID.Element( iMapIndex ); + m_ListJobTimeouts.LinkToTail( iListIndex ); + iListIndex = m_ListJobTimeouts.Tail(); + + pJobTimeout = &m_ListJobTimeouts.Element( iListIndex ); + } + + pJobTimeout->m_JobID = job.GetJobID(); + pJobTimeout->m_STimePaused.SetToJobTime(); + pJobTimeout->m_STimeTouched.SetToJobTime(); + pJobTimeout->m_cHeartbeatsBeforeTimeout = job.CHeartbeatsBeforeTimeout(); + if ( eJobPauseReason == k_EJobPauseReasonWorkItem ) + { + // work items control their own schedule - wait up to 6 hours + pJobTimeout->m_cHeartbeatsBeforeTimeout = (6 * 60 * 60 * k_nMillion) / k_cMicroSecJobHeartbeat; + } + + if ( pJobTimeout->m_cHeartbeatsBeforeTimeout <= 0 ) + { + pJobTimeout->m_cHeartbeatsBeforeTimeout = k_cJobHeartbeatsBeforeTimeoutDefault; + } + + // tell the job to pause + job.Pause( eJobPauseReason ); +} + +//----------------------------------------------------------------------------- +// Purpose: dumps a list of currently active jobs to the console +// Output : int - number of jobs listed +//----------------------------------------------------------------------------- +int CJobMgr::DumpJobSummary() +{ + CUtlMap< uint32, JobStatsBucket_t, int > mapStatsBucket( 0, 0, DefLessFunc( uint32 ) ); + + FOR_EACH_MAP_FAST( m_MapJob, i ) + { + CJob &job = *m_MapJob[i]; + + // the pointer to the name is a pointer to a constant string + // so use this dirty trick to make lookups fast + uint32 eBucket = (uint32)job.GetName(); + int iBucket = mapStatsBucket.Find( eBucket ); + if ( iBucket == mapStatsBucket.InvalidIndex() ) + { + iBucket = mapStatsBucket.Insert( eBucket ); + V_strcpy_safe( mapStatsBucket[iBucket].m_rgchName, job.GetName() ); + } + + JobStatsBucket_t *pJobStatsBucket = &mapStatsBucket[iBucket]; + pJobStatsBucket->m_cCompletes++; // overloading this to really mean "jobs running" for this spew + pJobStatsBucket->m_cLocksAttempted += job.m_vecLocks.Count(); // overloading this to really be used for "locks held" for this spew + pJobStatsBucket->m_u64JobDuration += job.m_STimeStarted.CServerMicroSecsPassed(); + + switch ( job.m_ePauseReason ) + { + case k_EJobPauseReasonNetworkMsg: pJobStatsBucket->m_cPauseReasonNetworkMsg++; break; + case k_EJobPauseReasonSleepForTime: pJobStatsBucket->m_cPauseReasonSleepForTime++; break; + case k_EJobPauseReasonWaitingForLock: pJobStatsBucket->m_cPauseReasonWaitingForLock++; break; + case k_EJobPauseReasonYield: pJobStatsBucket->m_cPauseReasonYield++; break; + case k_EJobPauseReasonSQL: pJobStatsBucket->m_cPauseReasonSQL++; break; + case k_EJobPauseReasonWorkItem: pJobStatsBucket->m_cPauseReasonWorkItem++; break; + default: break; + } + } + + + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, + "%50s --- running jobs (usec)-- -- locks held -- ----- pause reasons ---------------------------------\n", " " ); + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, + "%50s count aveduration netmsg sql sleep waitlock yield workitem\n", "name" ); + + JobProfileStats_t jobprofilestats; + jobprofilestats.m_iJobProfileSort = k_EJobProfileSortOrder_Count; + jobprofilestats.pmapStatsBucket = &mapStatsBucket; + + CUtlVector<int> vecSort( 0, mapStatsBucket.Count() ); + FOR_EACH_MAP_FAST( mapStatsBucket, iBucket ) + { + vecSort.AddToTail( iBucket ); + } + V_qsort_s( vecSort.Base(), vecSort.Count(), sizeof(int), (QSortCompareFuncCtx_t)ProfileSortFunc, &jobprofilestats ); + + FOR_EACH_VEC( vecSort, iVec ) + { + JobStatsBucket_t &bucket = mapStatsBucket[ vecSort[iVec] ]; + + int64 msecDurationAve = bucket.m_u64JobDuration / bucket.m_cCompletes; + + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%50s %8lld %16lld %13lld %11lld %8lld %8lld %8lld %8lld %8lld \n", + bucket.m_rgchName, + bucket.m_cCompletes, + msecDurationAve, + bucket.m_cLocksAttempted, + + bucket.m_cPauseReasonNetworkMsg, + bucket.m_cPauseReasonSQL, + bucket.m_cPauseReasonSleepForTime, + bucket.m_cPauseReasonWaitingForLock, + bucket.m_cPauseReasonYield, + bucket.m_cPauseReasonWorkItem + ); + } + + return m_MapJob.Count(); +} + + +//----------------------------------------------------------------------------- +// Purpose: spews details about a job by ID +//----------------------------------------------------------------------------- +void CJobMgr::DumpJob( JobID_t jobID, int nPrintLocksMax ) const +{ + const CJob *pJob = GetPJob( jobID ); + if( !pJob ) + { + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "Invalid job ID %llu\n", jobID ); + } + else + { + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%llu\t%12s %12s\n", + pJob->GetJobID(), + pJob->GetName(), + pJob->GetPauseReasonDescription() ); + + if ( pJob->GetPauseReason() == k_EJobPauseReasonWaitingForLock && pJob->m_pWaitingOnLock != NULL ) + { + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "\tWaiting for lock %s from: %s line %d\n", pJob->m_pWaitingOnLock->GetName(), pJob->m_pWaitingOnLockFilename, pJob->m_waitingOnLockLine ); + pJob->m_pWaitingOnLock->Dump( "\t ", nPrintLocksMax, true ); + } + + FOR_EACH_VEC( pJob->m_vecLocks, nLock ) + { + CLock *pLock = pJob->m_vecLocks[nLock]; + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "\tHolding lock %s:\n", pLock->GetName() ); + pLock->Dump( "\t ", nPrintLocksMax, true ); + } + } +} + + +//----------------------------------------------------------------------------- +// Purpose: count the number of active jobs +//----------------------------------------------------------------------------- +int CJobMgr::CountJobs() const +{ + return m_MapJob.Count(); +} + + +//----------------------------------------------------------------------------- +// Purpose: verify that current thread is correct +//----------------------------------------------------------------------------- +void CJobMgr::CheckThreadID() +{ + uint unCurrentThread = ThreadGetCurrentId(); + + if ( m_unFrameFuncThreadID == 0 ) + { + m_unFrameFuncThreadID = unCurrentThread; + } + else + { + // if this Assert goes of, you most likely tried to start + // a job from a different thread then the frame function thread + Assert( m_unFrameFuncThreadID == unCurrentThread ); + } +} + + +//----------------------------------------------------------------------------- +// Purpose: JobType_t comparer, used to sort the list of registered +// jobs into a tree by msg that creates them +//----------------------------------------------------------------------------- +bool JobTypeSortFuncByMsg( JobType_t const * const &lhs, JobType_t const * const &rhs ) +{ + if ( lhs->m_eCreationMsg == rhs->m_eCreationMsg ) + { + return ( lhs->m_eServerType < rhs->m_eServerType ); + } + + return ( lhs->m_eCreationMsg < rhs->m_eCreationMsg ); +} + + +//----------------------------------------------------------------------------- +// Purpose: JobType_t comparer, used to sort the list of registered +// jobs into a tree by job name +//----------------------------------------------------------------------------- +bool JobTypeSortFuncByName( JobType_t const * const &lhs, JobType_t const * const &rhs ) +{ + int iCompare = Q_strcmp( lhs->m_pchName, rhs->m_pchName ); + if ( iCompare == 0 ) + { + return ( lhs->m_eServerType < rhs->m_eServerType ); + } + + return ( iCompare < 0 ); +} + + +// singeton accessor to list of registered jobs +CUtlRBTree<const JobType_t *> &GMapJobTypesByMsg() +{ + static CUtlRBTree<const JobType_t *> s_MapJobTypes( 0, 0, JobTypeSortFuncByMsg ); + return s_MapJobTypes; +} + +// singeton accessor to list of registered jobs +CUtlRBTree<const JobType_t *> &GMapJobTypesByName() +{ + static CUtlRBTree<const JobType_t *> s_MapJobTypes( 0, 0, JobTypeSortFuncByName ); + return s_MapJobTypes; +} + + +//----------------------------------------------------------------------------- +// Purpose: adds a new type of job into the global list +//----------------------------------------------------------------------------- +void CJobMgr::RegisterJobType( const JobType_t *pJobType ) +{ + Assert( pJobType->m_pchName != NULL ); + Assert( pJobType->m_pJobFactory != NULL ); + GMapJobTypesByMsg().Insert( pJobType ); + GMapJobTypesByName().Insert( pJobType ); +} + + +//----------------------------------------------------------------------------- +// Purpose: Creates a new job from the network msg +// Input : *pServerParent - server to attach job to +// msg - network msg +// Output : true if a job was created +//----------------------------------------------------------------------------- +bool CJobMgr::BLaunchJobFromNetworkMsg( void *pParent, const JobMsgInfo_t &jobMsgInfo, IMsgNetPacket *pNetPacket ) +{ + if ( pNetPacket == NULL ) + { + AssertMsg(pNetPacket, "CJobMgr::BLaunchJobFromNetworkMsg received NULL packet."); + return false; + } + + if ( pNetPacket->BHasTargetJobName() && BIsValidSystemMsg( pNetPacket->GetEMsg(), NULL ) ) + { + JobType_t jobSearch = { pNetPacket->GetTargetJobName(), k_EGCMsgInvalid, jobMsgInfo.m_eServerType }; + int iJobType = GMapJobTypesByName().Find( &jobSearch ); + + if ( GMapJobTypesByName().IsValidIndex( iJobType ) ) + { + + // Get shortcut to job info + const JobType_t *pJobType = (GMapJobTypesByName())[iJobType]; + Assert( pJobType ); + Assert( pJobType->m_pchName ); + + // Create the job + CJob *job = pJobType->m_pJobFactory( pParent, NULL ); + + // Safety check + if ( job == NULL ) + { + AssertMsg1( job, "Job factory returned NULL for job named '%s'!\n", pJobType->m_pchName ); + return false; + } + + // Start the job + job->StartJobFromNetworkMsg( pNetPacket, jobMsgInfo.m_JobIDSource ); + return true; + } + } + else + { + JobType_t jobSearch = { 0, jobMsgInfo.m_eMsg, jobMsgInfo.m_eServerType }; + int iJobType = GMapJobTypesByMsg().Find( &jobSearch ); + + if ( GMapJobTypesByMsg().IsValidIndex( iJobType ) ) + { + + // Get shortcut to job info + const JobType_t *pJobType = (GMapJobTypesByMsg())[iJobType]; + Assert( pJobType ); + Assert( pJobType->m_pchName ); + + // Create the job + CJob *job = pJobType->m_pJobFactory( pParent, NULL ); + + // Safety check + if ( job == NULL ) + { + AssertMsg3( job, "Job factory returned NULL for job msg %d, server type %d (named '%s')!\n", (int)jobMsgInfo.m_eMsg, (int)jobMsgInfo.m_eServerType, pJobType->m_pchName ); + return false; + } + + // Start the job + job->StartJobFromNetworkMsg( pNetPacket, jobMsgInfo.m_JobIDSource ); + return true; + } + } + + return false; +} + + +//----------------------------------------------------------------------------- +// Purpose: profile sort func +//----------------------------------------------------------------------------- +int CJobMgr::ProfileSortFunc( void *pCtx, const int *lhs, const int *rhs ) +{ + JobProfileStats_t *pJobprofilestats = (JobProfileStats_t *)pCtx; + int64 d = 0; + switch ( pJobprofilestats->m_iJobProfileSort ) + { + default: + case k_EJobProfileSortOrder_Alpha: + return Q_stricmp( pJobprofilestats->pmapStatsBucket->Element(*lhs).m_rgchName, + pJobprofilestats->pmapStatsBucket->Element(*rhs).m_rgchName ); + case k_EJobProfileSortOrder_Count: + d = ((int64)pJobprofilestats->pmapStatsBucket->Element(*rhs).m_cCompletes - + (int64)pJobprofilestats->pmapStatsBucket->Element(*lhs).m_cCompletes); + break; + case k_EJobProfileSortOrder_TotalRuntime: + d = ((int64)pJobprofilestats->pmapStatsBucket->Element(*rhs).m_u64RunTime - + (int64)pJobprofilestats->pmapStatsBucket->Element(*lhs).m_u64RunTime); + break; + } + if ( d < 0 ) + return -1; + if ( d > 0 ) + return 1; + return 0; +} + + +//----------------------------------------------------------------------------- +// Purpose: dump out accumulated job profile data +//----------------------------------------------------------------------------- +void CJobMgr::ProfileJobs( EJobProfileAction ejobProfileAction, EJobProfileSortOrder iSortOrder ) +{ + bool bClearBuckets = false; + + if ( ejobProfileAction == k_EJobProfileAction_Start ) + { + if ( !m_bProfiling ) + { + bClearBuckets = true; + } + m_bProfiling = true; + } + else if ( ejobProfileAction == k_EJobProfileAction_Stop ) + { + m_bProfiling = false; + } + else if ( ejobProfileAction == k_EJobProfileAction_Clear ) + { + bClearBuckets = true; + } + + if ( bClearBuckets ) + { + m_mapStatsBucket.RemoveAll(); + } + + if ( k_EJobProfileAction_Dump != ejobProfileAction ) + return; + + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, + "%44s --- completed jobs (usec)---------------------------------- ------ lock counts---------------------------------- ------ failures -----------\n", " " ); + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, + "%44s count averuntime maxruntime aveduration #yielded attempted waited failed longheld longwait wait-t/o t/o-msg jobfailed longslice\n", "name" ); + + JobProfileStats_t jobprofilestats; + jobprofilestats.m_iJobProfileSort = iSortOrder; + jobprofilestats.pmapStatsBucket = &m_mapStatsBucket; + + CUtlVector<int> vecSort( 0, m_mapStatsBucket.Count() ); + FOR_EACH_MAP_FAST( m_mapStatsBucket, iBucket ) + { + vecSort.AddToTail( iBucket ); + } + V_qsort_s( vecSort.Base(), vecSort.Count(), sizeof(int), (QSortCompareFuncCtx_t)ProfileSortFunc, &jobprofilestats ); + + FOR_EACH_VEC( vecSort, iVec ) + { + JobStatsBucket_t &bucket = m_mapStatsBucket[ vecSort[iVec] ]; + if ( bucket.m_cCompletes ) + { + CCycleCount ccRunTime( bucket.m_u64RunTime / bucket.m_cCompletes ); + int64 usecAve = ccRunTime.GetMicroseconds(); + + CCycleCount ccRunTimeMax( bucket.m_u64RunTimeMax ); + int64 usecMax = ccRunTimeMax.GetMicroseconds(); + + int64 msecDurationAve = bucket.m_u64JobDuration / bucket.m_cCompletes; + + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%44s %12lld %12lld %12lld %12lld %8lld %8lld %8lld %8lld %8lld %8lld %8lld %8lld %8lld %8lld\n", + bucket.m_rgchName, + bucket.m_cCompletes, + usecAve, + usecMax, + msecDurationAve, + bucket.m_cJobsPaused, + bucket.m_cLocksAttempted, + bucket.m_cLocksWaitedFor, + bucket.m_cLocksFailed, + bucket.m_cLocksLongHeld, + bucket.m_cLocksLongWait, + bucket.m_cWaitTimeout, + bucket.m_cTimeoutNetMsg, + bucket.m_cJobsFailed, + bucket.m_cLongInterYieldTime ); + } + } + if ( m_mapOrphanMessages.Count() ) + { + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "Messages that arrived responding to jobs that no longer exists and were dropped\n" ); + FOR_EACH_MAP_FAST( m_mapOrphanMessages, iBucket ) + { + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "%44s %12d\n", PchMsgNameFromEMsg( m_mapOrphanMessages.Key(iBucket) ), m_mapOrphanMessages[iBucket] ); + } + m_mapOrphanMessages.RemoveAll(); + } +} + +//----------------------------------------------------------------------------- +// Purpose: Dump a list of all jobs to the console +// Each job is indexed, and that index can be used with +// DebugJob() to cause a debug break in that job. +//----------------------------------------------------------------------------- +void CJobMgr::DumpJobs( const char *pszJobName, int nMax, int nPrintLocksMax ) const +{ + FOR_EACH_MAP_FAST( m_MapJob, iJob ) + { + if ( nMax <= 0 ) + break; + nMax--; + + if ( pszJobName == NULL || V_strcmp( pszJobName, m_MapJob[iJob]->GetName() ) == 0 ) + { + DumpJob( m_MapJob.Key(iJob), nPrintLocksMax ); + } + } + EmitInfo( SPEW_CONSOLE, SPEW_ALWAYS, LOG_ALWAYS, "Total job count: %d\n", m_MapJob.Count() ); +} + + +//----------------------------------------------------------------------------- +// Purpose: cause a debug break in the given job +//----------------------------------------------------------------------------- +void CJobMgr::DebugJob( int iJob ) +{ +#ifdef DEBUG_JOB_LIST + if ( sm_listAllJobs.IsValidIndex( iJob ) ) + { + sm_listAllJobs[iJob]->Debug(); + } + else + { + EmitInfo( SPEW_CONSOLE, 1, 1, "Job not found\n" ); + } +#else + EmitInfo( SPEW_CONSOLE, 1, 1, "Job debugging disabled\n" ); +#endif +} + + +#ifdef DBGFLAG_VALIDATE +//----------------------------------------------------------------------------- +// Purpose: Run a global validation pass on all of our data structures and memory +// allocations. +// Input: validator - Our global validator object +// pchName - Our name (typically a member var in our container) +//----------------------------------------------------------------------------- +void CJobMgr::Validate( CValidator &validator, const char *pchName ) +{ + VALIDATE_SCOPE(); + + ValidateObj( m_MapJob ); + FOR_EACH_MAP_FAST( m_MapJob, iJob ) + { + ValidatePtr( m_MapJob[iJob] ); + } + + ValidateObj( m_mapStatsBucket ); + FOR_EACH_MAP_FAST( m_mapStatsBucket, iBucket ) + { + ValidateObj( m_mapStatsBucket[iBucket] ); + } + + ValidateObj( m_ListJobsYieldingRegPri ); + ValidateObj( m_ListJobTimeouts ); + ValidateObj( m_MapJobTimeoutsIndexByJobID ); + ValidateObj( m_QueueJobSleeping ); + ValidateObj( m_WorkThreadPool ); +} + + +//----------------------------------------------------------------------------- +// Purpose: Run a global validation pass on all of our global data +// Input: validator - Our global validator object +//----------------------------------------------------------------------------- +void CJobMgr::ValidateStatics( CValidator &validator, const char *pchName ) +{ + VALIDATE_SCOPE_STATIC( "CJobMgr class statics" ); + + ValidateObj( GMapJobTypesByMsg() ); + ValidateObj( GMapJobTypesByName() ); +#ifdef DEBUG_JOB_LIST + ValidateObj( sm_listAllJobs ); +#endif +} +#endif // DBGFLAG_VALIDATE + +} // namespace GCSDK |