From 3bf9df6b2785fa6d951086978a3e66f49427166a Mon Sep 17 00:00:00 2001 From: FluorescentCIAAfricanAmerican <0934gj3049fk@protonmail.com> Date: Wed, 22 Apr 2020 12:56:21 -0400 Subject: 1 --- gcsdk/workthreadpool.cpp | 751 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 751 insertions(+) create mode 100644 gcsdk/workthreadpool.cpp (limited to 'gcsdk/workthreadpool.cpp') diff --git a/gcsdk/workthreadpool.cpp b/gcsdk/workthreadpool.cpp new file mode 100644 index 0000000..95294cb --- /dev/null +++ b/gcsdk/workthreadpool.cpp @@ -0,0 +1,751 @@ +//========= Copyright Valve Corporation, All rights reserved. ============// +// +// Purpose: +// +// $NoKeywords: $ +//============================================================================= + + +#include "stdafx.h" +#include "tslist.h" +#include +#include + +#include "tier0/memdbgon.h" + +namespace GCSDK { + +IWorkThreadPoolSignal *CWorkThreadPool::sm_pWorkItemsCompletedSignal = NULL; + +//----------------------------------------------------------------------------- +// Purpose: CWorkThread constructors +//----------------------------------------------------------------------------- +CWorkThread::CWorkThread( CWorkThreadPool *pThreadPool ) +: m_pThreadPool( pThreadPool ), + m_bExitThread( false ), + m_bFinished( false ) +{ +} + +CWorkThread::CWorkThread( CWorkThreadPool *pThreadPool, const char *pszName ) +: m_pThreadPool( pThreadPool ), + m_bExitThread( false ), + m_bFinished( false ) +{ + SetName( pszName ); +} + +//----------------------------------------------------------------------------- +// Purpose: Tell work thread pool not to set event on every item added (SetEvent is very expensive) +//----------------------------------------------------------------------------- +void CWorkThreadPool::SetNeverSetEventOnAdd( bool bNeverSet ) +{ + bool bWasSet = m_bNeverSetOnAdd; + m_bNeverSetOnAdd = bNeverSet; + + // In case of disabling set right away to make sure if we have pending work we execute it now with no latency + if ( bWasSet && !m_bNeverSetOnAdd ) + m_EventNewWorkItem.Set(); +} + + + +//----------------------------------------------------------------------------- +// Purpose: performs the work loop for the thread, waits for work, +// notifies the owner (the pool) as it completes work and before it exits +//----------------------------------------------------------------------------- +int CWorkThread::Run() +{ + // manage our thread pool's statistics + ++m_pThreadPool->m_cThreadsRunning; + +#ifdef _SERVER + g_CompletionPortManager.AssociateCallingThreadWithIOCP(); +#endif + + OnStart(); + +#if 0 // need to port over new vprof code +#if defined( VPROF_ENABLED ) + CVProfile *pProfile = GetVProfProfileForCurrentThread(); +#endif +#endif + + CWorkThreadPool *pPool = m_pThreadPool; + + int nIterations = 0; + const int nMaxFastIterations = 4; + while ( !m_bExitThread ) + { +#if 0 // game vprof doesn't yet support TLS'd vprof instances, until new vprof code is ported +#if defined( VPROF_ENABLED ) + if ( pProfile ) + pProfile->MarkFrame( GetName() ); +#endif +#endif + pPool->m_cActiveThreads++; + + nIterations = 0; + while ( (pPool->BNeverSetEventOnAdd() && nIterations < nMaxFastIterations) || nIterations == 0 ) + { + // process any items which have arrived + CWorkItem *pWorkItem = pPool->GetNextWorkItemToProcess( ); + while ( pWorkItem ) + { +#if 0 + pPool->m_StatWaitTime.Update( pWorkItem->WaitingTime() ); +#endif + if ( pWorkItem->HasTimedOut() ) + { + pWorkItem->m_bCanceled = true; + } + else + { + // call the work item to do its work + pWorkItem->m_bCanceled = false; + + CFastTimer fastTimer; + fastTimer.Start(); + pWorkItem->m_bRunning = true; + bool bSuccess = pWorkItem->ThreadProcess( this ); + pWorkItem->m_bRunning = false; + fastTimer.End(); + CCycleCount cycleCount = fastTimer.GetDuration(); + pWorkItem->SetCycleCount(cycleCount); +#if 0 + pPool->m_StatExecutionTime.Update( cycleCount.GetUlMicroseconds() ); +#endif + if ( bSuccess ) + pPool->m_cSuccesses ++; + else + pWorkItem->m_bResubmit ? pPool->m_cRetries++ : pPool->m_cFailures++; + } + + // do we need to resubmit this item? + if ( pWorkItem->m_bResubmit ) + { + pWorkItem->m_bResubmit = false; + pWorkItem->m_bCanceled = false; + // put it at the tail of the incoming queue + pPool->AddWorkItem( pWorkItem ); + pWorkItem->Release(); // dec since AddWorkItem added 1 more again + } + else + { + // put it in the outgoing queue + pPool->OnWorkItemCompleted( pWorkItem ); + } + + // If we are flagged as exiting don't try to get more work, we need to exit right away and orphan the work + // to avoid blocking shutdown. + if ( !m_bExitThread ) + { + // get the next work item (if any) + pWorkItem = pPool->GetNextWorkItemToProcess( ); + } + else + { + pWorkItem = NULL; + } + +#if 0 // game vprof doesn't yet support TLS'd vprof instances, until new vprof code is ported +#if defined( VPROF_ENABLED ) + if ( pProfile && pWorkItem ) + pProfile->MarkFrame( GetName() ); +#endif +#endif + } + + if ( m_bExitThread ) + break; + + ++nIterations; + if ( pPool->BNeverSetEventOnAdd() && nIterations < nMaxFastIterations ) + { + VPROF_BUDGET( "CWorkThread -- Sleep", VPROF_BUDGETGROUP_SLEEPING ); + ThreadSleep( 2 ); + } + } + + pPool->m_cActiveThreads--; + + // wait for a new work item to arrive in the queue, check the counts first just to be sure + { + VPROF_BUDGET( "CWorkThread -- Sleep", VPROF_BUDGETGROUP_SLEEPING ); +#ifdef _SERVER + if ( pPool->BNeverSetEventOnAdd() ) + pPool->m_EventNewWorkItem.Wait( 15 ); + else + pPool->m_EventNewWorkItem.Wait( 50 ); +#else + pPool->m_EventNewWorkItem.Wait( 50 ); +#endif + } + } + + // Since we are exiting, we must have been signaled to shutdown, and we should signal any remaining threads + // since each signal wakes only one thread. + pPool->m_EventNewWorkItem.Set(); + + m_bFinished = true; + + // updates stats + --m_pThreadPool->m_cThreadsRunning; + + return EXIT_SUCCESS; +} + + +//----------------------------------------------------------------------------- +// Purpose: Construct a new CWorkThreadPool object +//----------------------------------------------------------------------------- +CWorkThreadPool::CWorkThreadPool( const char *pszThreadName ) + : +#if 0 + m_StatWaitTime( 100 ), + m_StatExecutionTime( 100 ), +#endif + m_bThreadsInitialized( false ), + m_cThreadsRunning( 0 ), + m_cActiveThreads( 0 ), + m_bMayHaveJobTimeouts( false ), + m_bExiting( false ), + m_bAutoCreateThreads( false ), + m_cMaxThreads( 0 ), + m_cFailures( 0 ), + m_cSuccesses( 0 ), + m_pWorkThreadConstructor( NULL ), + m_ulLastCompletedSequenceNumber( 0 ), + m_ulLastUsedSequenceNumber( 0 ), + m_ulLastDispatchedSequenceNumber( 0 ), + m_bEnsureOutputOrdering( false ), + m_bNeverSetOnAdd( false ) +{ + Assert( pszThreadName != NULL ); + Q_strncpy( m_szThreadNamePfx, pszThreadName, sizeof( m_szThreadNamePfx ) ); + m_LimitTimerCreateNewThreads.SetLimit( 1 ); + + m_pTSQueueToProcess = new CTSQueue< CWorkItem* >; + m_pTSQueueCompleted = new CTSQueue< CWorkItem* >; +} + + +//----------------------------------------------------------------------------- +// Purpose: destructor; does assertion checks to make sure we weere shut down cleanly +// cleans up even if we weren't cleanly stopped +//----------------------------------------------------------------------------- +CWorkThreadPool::~CWorkThreadPool() +{ + // If you hit this you probably didn't call StopWorkThreads() first + AssertMsg1( ( !m_bThreadsInitialized || m_bExiting ) && 0 == m_cThreadsRunning, + "CWorkThreadPool::~CWorkThreadPool(): Thread pool %s shutdown incorrectly.\n", + m_szThreadNamePfx ); + + if ( m_WorkThreads.Count() ) + { + StopWorkThreads(); + Assert( 0 == m_WorkThreads.Count() ); + } + + Assert( 0 == m_cThreadsRunning ); + + // WARNING: We need to release any items left in the queues + CWorkItem *pWorkItem = NULL; + if ( m_pTSQueueCompleted->Count() > 0 ) + { + EmitWarning( SPEW_THREADS, 2, "CWorkThreadPool::~CWorkThreadPool: work complete queue not empty, %d items discarded.\n", m_pTSQueueCompleted->Count() ); + pWorkItem = NULL; + while ( m_pTSQueueCompleted->PopItem( &pWorkItem ) ) + { + while( pWorkItem->Release() ) + { + /* nothing */ + } + } + } + + if ( m_pTSQueueToProcess->Count() > 0 ) + { + EmitWarning( SPEW_THREADS, 2, "CWorkThreadPool::~CWorkThreadPool: work processing queue not empty: %d items discarded.\n", m_pTSQueueToProcess->Count() ); + while ( m_pTSQueueToProcess->PopItem( &pWorkItem ) ) + { + while( pWorkItem->Release() ) + { + /* nothing */ + } + } + } + + delete m_pTSQueueToProcess; + delete m_pTSQueueCompleted; +} + + +#if 0 +//----------------------------------------------------------------------------- +// Purpose: estimate the current backlog time using previous execution time, +// the number of outstanding items, and the number of running threads +//----------------------------------------------------------------------------- +uint64 CWorkThreadPool::GetCurrentBacklogTime() const +{ + if ( m_WorkThreads.Count() == 0 ) + return 0; + return ( m_pTSQueueToProcess->Count() * m_StatExecutionTime.GetUlAvg() ) / m_WorkThreads.Count(); +} +#endif + + +int CWorkThreadPool::AddWorkThread( CWorkThread *pThread ) +{ + AUTO_LOCK( m_WorkThreadMutex ); + Assert( pThread ); + return m_WorkThreads.AddToTail( pThread ); +} + + +void CWorkThreadPool::StartWorkThread( CWorkThread *pWorkThread, int iName ) +{ + char rgchThreadName[32]; + Q_snprintf( rgchThreadName, sizeof( rgchThreadName ), "%s:%d", m_szThreadNamePfx, iName ); + pWorkThread->SetName( rgchThreadName ); + if ( !pWorkThread->Start() ) + EmitError( SPEW_THREADS, "CWorkThreadPool::StartWorkThread: Thread creation failed.\n" ); +} + + +void CWorkThreadPool::StartWorkThreads() +{ + m_bThreadsInitialized = true; + if ( 0 == m_WorkThreads.Count() ) + { + EmitWarning( SPEW_THREADS, 2, "CWorkThreadPool::StartWorkThreads: called with no threads in the pool, this is probably a bug.\n" ); + return; + } + m_bExiting = false; + m_cThreadsRunning = 0; + AUTO_LOCK( m_WorkThreadMutex ); + FOR_EACH_VEC( m_WorkThreads, i ) + { + StartWorkThread( m_WorkThreads[i], i ); + } + + // XXX why? + while ( m_cThreadsRunning == (uint) 0 ) + { + ThreadSleep( 1 ); + } +} + + +//----------------------------------------------------------------------------- +// Purpose: stops whatever work threads we're running +// this must be called before the thread pool object is destroyed +//----------------------------------------------------------------------------- +void CWorkThreadPool::StopWorkThreads() +{ + // indicate that we're shutting down; + // don't accept more work in this thread + m_bExiting = true; + + AUTO_LOCK( m_WorkThreadMutex ); + + FOR_EACH_VEC( m_WorkThreads, i ) + { + m_WorkThreads[i]->m_bExitThread = true; + m_WorkThreads[i]->Cancel(); + } + + // loop until all threads are dead + while ( true ) + { + // This thread already holds the mutex; recursive try-lock should always succeed + DbgVerify( BTryDeleteExitedWorkerThreads() ); + + if ( m_WorkThreads.Count() == 0 ) + break; + + // Keep waking up threads until they're all dead. + m_EventNewWorkItem.Set(); + +#ifdef _PS3 + // call to abort any running call to gethostbyname(). + // this is called over all the remaining work threads, while + // waiting for the rest of the work threads to finish so that they won't + // spuriously block on new calls to gethostbyname() as the + // sys_net_abort_resolver call only stops the next call to the + // network API, not any future calls. + + FOR_EACH_VEC( m_WorkThreads, iPS3 ) + { + // PS3 hack to abort gethostbyname() calls that may be blocking... + sys_net_abort_resolver( m_WorkThreads[ iPS3 ]->GetThreadID(), SYS_NET_ABORT_STRICT_CHECK ); + } +#endif + + const uint k_uJoinTimeoutMillisec = 10000; // 10 seconds seems pretty arbitrary. + + CWorkThread *pWorkThread = m_WorkThreads[0]; + bool bJoined = pWorkThread->Join( k_uJoinTimeoutMillisec ); + if ( !bJoined ) + { + // Print thread id as a pointer for cross-platform compatibility + EmitWarning( SPEW_THREADS, 2, "Thread \"%s\" (ID %p) failed to shut down", pWorkThread->GetName(), (void*)pWorkThread->GetThreadID() ); + } + else + { + // Succesful join means that the thread has terminated. + if ( !pWorkThread->m_bFinished ) + { + // This would be a logic error in the thread proc if it ever tripped. + AssertMsg( false, "pWorkThread->m_bFinished is false but thread is not running" ); + // Recover by flagging the thread as potentially eligable for deletion, since it's dead. + pWorkThread->m_bFinished = true; + } + } + } + + Assert( m_WorkThreads.Count() == 0 && m_cThreadsRunning == (uint32) 0 ); +} + + +//----------------------------------------------------------------------------- +// Purpose: sees if we have a non-zero number of work threads, +// or a non-zero number of active threads +//----------------------------------------------------------------------------- +bool CWorkThreadPool::HasWorkItemsToProcess() const +{ + return ( m_pTSQueueToProcess->Count() > 0 ) + || ( m_cActiveThreads > 0 ); +} + + +//----------------------------------------------------------------------------- +// Purpose: sets dynamic thread construction +//----------------------------------------------------------------------------- +void CWorkThreadPool::SetWorkThreadAutoConstruct( int cMaxThreads, IWorkThreadFactory *pWorkThreadConstructor ) +{ + AUTO_LOCK( m_WorkThreadMutex ); + + m_bThreadsInitialized = true; + m_bAutoCreateThreads = true; + m_cMaxThreads = MAX( 1, cMaxThreads ); + m_pWorkThreadConstructor = pWorkThreadConstructor; + + // If we have too many threads now, mark some to exit next time they loop. + for ( int i = m_cMaxThreads; i < m_WorkThreads.Count(); i++ ) + { + m_WorkThreads[i]->m_bExitThread = true; + } +} + + +//----------------------------------------------------------------------------- +// Purpose: Adds a work item +// Output: true if successful, +// false if a low priority work item is not added due to a busy system +// false if this work pool is shutting down and work isn't being accepted +// NOTE: Adding normal priority items should always succeed +//----------------------------------------------------------------------------- +bool CWorkThreadPool::AddWorkItem( CWorkItem *pWorkItem ) +{ + Assert( !m_bExiting ); + if ( m_bExiting ) + return false; + + if ( m_bEnsureOutputOrdering ) + { + AssertMsg( pWorkItem->m_bResubmit == false, "CWorkThreadPool can't support item auto resubmission when ensuring output ordering" ); + } + + // if we're in auto-create mode, make sure we have enough threads running + if ( m_bAutoCreateThreads && m_WorkThreads.Count() < m_cMaxThreads ) + { + int cPendingItems = m_pTSQueueToProcess->Count(); + + // we shouldn't get more than 12 items queued per already existing thread, otherwise we + // want to create a new thread to help us keep up. + if ( m_WorkThreads.Count() < 1 || m_WorkThreads.Count() * 12 < ( cPendingItems + 1 ) ) + { + if ( m_WorkThreads.Count() >= 2 && !m_LimitTimerCreateNewThreads.BLimitReached() ) + { + // Don't create more yet, we don't want to create them too fast + } + else + { + // create another thread + CWorkThread *pWorkThread = NULL; + if ( m_pWorkThreadConstructor ) + { + pWorkThread = m_pWorkThreadConstructor->CreateWorkerThread( this ); + } + else + { + pWorkThread = new CWorkThread( this ); + } + if( pWorkThread != NULL ) + { + int iName = AddWorkThread( pWorkThread ); + StartWorkThread( pWorkThread, iName ); + } + m_LimitTimerCreateNewThreads.SetLimit( 250*k_nThousand ); + } + } + } + + // + // Do we actually have any threads ? If creating threads can fail, then maybe we don't ! + // In that case, this WorkItem is not going to run ! + // + if ( m_WorkThreads.Count() == 0 ) + { + Assert(false); + return false ; + } + + + // WARNING: We need to call pWorkItem AddRef() and Release() at all entry/exit points for the thread pool system. + pWorkItem->AddRef(); + + pWorkItem->m_ulSequenceNumber = (++m_ulLastUsedSequenceNumber); + m_pTSQueueToProcess->PushItem( pWorkItem ); + + if ( !BNeverSetEventOnAdd() && m_cActiveThreads == 0 ) + { + VPROF_BUDGET( "SetEvent()", VPROF_BUDGETGROUP_THREADINGMAIN ); + m_EventNewWorkItem.Set(); + } + + return true; +} + + +CWorkItem *CWorkThreadPool::GetNextCompletedWorkItem( ) +{ + CWorkItem *pWorkItem = NULL; + + // Use a while loop just in case ref counts get screwed up and an item gets deleted when we release our reference to it + while ( m_pTSQueueCompleted->PopItem( &pWorkItem ) ) + { + // WARNING: We need to call workitem AddRef() and Release() at all entry/exit points for the thread pool system. + // Release() returns the current refcount of the object (after decrementing it by one) and should be non-zero unless the + // the caller has released it already. + if ( pWorkItem != NULL && pWorkItem->Release() > 0 ) + { + return pWorkItem; + } + } + + return NULL; +} + + +//----------------------------------------------------------------------------- +// Purpose: gets the next work item to process. This non-blocking function +// returns NULL immediately if there's nothing left in the queue. +// otherwise, a pointer to the next CWorkItem. +//----------------------------------------------------------------------------- +CWorkItem *CWorkThreadPool::GetNextWorkItemToProcess( ) +{ + CWorkItem *pWorkItem = NULL; + + if ( m_pTSQueueToProcess->Count() && m_pTSQueueToProcess->PopItem( &pWorkItem ) ) + { + return pWorkItem; + } + + return NULL; +} + + +bool CWorkThreadPool::BDispatchCompletedWorkItems( const CLimitTimer &limitTimer, CJobMgr *pJobMgr ) +{ + BTryDeleteExitedWorkerThreads(); + + CWorkItem *pWorkItem = GetNextCompletedWorkItem( ); + while ( pWorkItem != NULL ) + { + uint64 ulSequenceNumber = pWorkItem->m_ulSequenceNumber; + // NOTE: despite its name, this YIELDS - the target job + // is resumed, and we resume here. + if ( !pWorkItem->DispatchCompletedWorkItem( pJobMgr ) ) + { + EmitWarning( SPEW_THREADS, 2, "Work Item for Work Pool %s completed but job no longer existed to notify\n", m_szThreadNamePfx == NULL ? "UNKNOWN" :m_szThreadNamePfx ); + AssertMsg1( m_bMayHaveJobTimeouts, "Work Item for Work Pool %s completed but job no longer existed to notify", m_szThreadNamePfx == NULL ? "UNKNOWN" :m_szThreadNamePfx ); + } + + // pWorkItem was released by DispatchCompletedWorkItem + m_ulLastDispatchedSequenceNumber = ulSequenceNumber; + if ( limitTimer.BLimitReached() ) + break; + + pWorkItem = GetNextCompletedWorkItem( ); + } + + return ( GetCompletedWorkItemCount() > 0 ); +} + + +//----------------------------------------------------------------------------- +// Purpose: delete any thread objects that have exited +// we'll make sure the thread has actually ended; +// if they haven't, they'll remain in the threads to delete list +//----------------------------------------------------------------------------- +bool CWorkThreadPool::BTryDeleteExitedWorkerThreads() +{ + if ( m_WorkThreadMutex.TryLock() ) + { + if ( m_cThreadsRunning < (uint) m_WorkThreads.Count() ) + { + FOR_EACH_VEC_BACK( m_WorkThreads, i ) + { + CWorkThread *pWorkThread = m_WorkThreads[i]; + if ( pWorkThread->m_bFinished && !pWorkThread->IsThreadRunning() ) + { + m_WorkThreads.FastRemove( i ); + delete pWorkThread; + } + } + } + m_WorkThreadMutex.Unlock(); + return true; + } + return false; +} + + +bool CWorkItem::DispatchCompletedWorkItem( CJobMgr *pJobMgr ) +{ + // Check if this work item needs to signal a job + if ( pJobMgr && k_GIDNil != m_JobID ) + { + if ( !pJobMgr->BRouteWorkItemCompletedIfExists( m_JobID, m_bCanceled ) ) + return false; + } + else if ( k_GIDNil != m_JobID ) + { + // This should never happen since we have already released our reference to the work item + // and the calling job should have released its ref when it exited + AssertMsg( false, "CWorkItem::DispatchCompletedWorkItem: got a work item with no job ID" ); + } + + return true; +} + + +//----------------------------------------------------------------------------- +// Purpose: Called by the worker thread when it finishes an individual work item +// This function will see if our work is meant to be well-ordred; if so, +// it will do the necessary work to ensure ordering. +// +// It adds the item to the completed work item list so +// the pool owner can retrieve it and checks to see if any threads +// deserve to be shut down. +//----------------------------------------------------------------------------- +void CWorkThreadPool::OnWorkItemCompleted( CWorkItem *pWorkItem ) +{ + if ( sm_pWorkItemsCompletedSignal != NULL ) + sm_pWorkItemsCompletedSignal->Signal(); + + if ( !m_bEnsureOutputOrdering ) + { + // Since we aren't locking this sequence number could get screwed up a bit, but it's + // pretty meaningless if ensure output ordering if off anyway... + m_ulLastCompletedSequenceNumber = pWorkItem->m_ulSequenceNumber; + m_pTSQueueCompleted->PushItem( pWorkItem ); + } + else + { + // In the ordered case we need to lock completely here since we'll be moving around between + // various data structures and also need to ensure the ordering of items in the TS queue + m_MutexOnItemCompletedOrdered.Lock(); + if ( m_ulLastCompletedSequenceNumber + 1 == pWorkItem->m_ulSequenceNumber ) + { + m_ulLastCompletedSequenceNumber = pWorkItem->m_ulSequenceNumber; + m_pTSQueueCompleted->PushItem( pWorkItem ); + + // We walk the vector multiple times, but it should be very short as items are likely to come in + // close to in order, just mixed up a little if we have lots of threads or one item is much more + // costly than others. + bool bFoundNext = false; + do + { + bFoundNext = false; + FOR_EACH_VEC( m_vecCompletedAndWaiting, i ) + { + CWorkItem *pWaiting = m_vecCompletedAndWaiting[i]; + if ( m_ulLastCompletedSequenceNumber + 1 == pWaiting->m_ulSequenceNumber ) + { + m_ulLastCompletedSequenceNumber = pWaiting->m_ulSequenceNumber; + m_pTSQueueCompleted->PushItem( pWaiting ); + m_vecCompletedAndWaiting.FastRemove( i ); + bFoundNext = true; + break; + } + } + } while ( bFoundNext == true ); + } + else + { + m_vecCompletedAndWaiting.AddToTail( pWorkItem ); + } + m_MutexOnItemCompletedOrdered.Unlock(); + } +} + + +//----------------------------------------------------------------------------- +// Purpose: return the count of items we've queued to process +//----------------------------------------------------------------------------- +int CWorkThreadPool::GetWorkItemToProcessCount() const +{ + return m_pTSQueueToProcess->Count(); +} + + +//----------------------------------------------------------------------------- +// Purpose: return the count of items we've completed but not notified the consumer about +//----------------------------------------------------------------------------- +int CWorkThreadPool::GetCompletedWorkItemCount() const +{ + int nCount = m_pTSQueueCompleted->Count(); + return nCount; +} + + +#ifdef DBGFLAG_VALIDATE +//----------------------------------------------------------------------------- +// Purpose: Validates memory +//----------------------------------------------------------------------------- +void CWorkThreadPool::Validate( CValidator &validator, const char *pchName ) +{ + VALIDATE_SCOPE(); + AUTO_LOCK( m_WorkThreadMutex ); + + ValidateObj( m_WorkThreads ); + FOR_EACH_VEC( m_WorkThreads, iWorkThread ) + { + m_WorkThreads[ iWorkThread ]->Suspend(); + ValidatePtr( m_WorkThreads[ iWorkThread ] ); + } + + ValidateAlignedPtr( m_pTSQueueCompleted ); + ValidateAlignedPtr( m_pTSQueueToProcess ); + ValidateObj( m_vecCompletedAndWaiting ); + FOR_EACH_VEC( m_vecCompletedAndWaiting, j ) + { + ValidatePtr( m_vecCompletedAndWaiting.Element( j ) ); + } + + FOR_EACH_VEC( m_WorkThreads, iWorkThread ) + { + m_WorkThreads[ iWorkThread ]->Resume(); + } + +#if 0 + ValidateObj( m_StatExecutionTime ); + ValidateObj( m_StatWaitTime ); +#endif +} +#endif // DBGFLAG_VALIDATE + +} // namespace GCSDK -- cgit v1.2.3