diff options
Diffstat (limited to 'src/zencore/workthreadpool.cpp')
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 59 |
1 files changed, 48 insertions, 11 deletions
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index a43ce7115..b179527d7 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -11,6 +11,7 @@ #include <zencore/thread.h> #include <zencore/trace.h> +#include <algorithm> #include <thread> #include <vector> @@ -195,17 +196,19 @@ struct WorkerThreadPool::ThreadStartInfo struct ExplicitImpl : WorkerThreadPool::Impl { - const int m_MinThreads; - const int m_MaxThreads; - std::atomic<int> m_TotalThreads{0}; - std::atomic<int> m_ActiveCount{0}; - void WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info); - void SpawnWorkerThread(); - std::string m_WorkerThreadBaseName; - RwLock m_ThreadListLock; - std::vector<std::thread> m_WorkerThreads; - BlockingQueue<Ref<IWork>> m_WorkQueue; - std::atomic<int> m_FreeWorkerCount{0}; + const int m_MinThreads; + const int m_MaxThreads; + std::atomic<int> m_TotalThreads{0}; + std::atomic<int> m_ActiveCount{0}; + void WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info); + void SpawnWorkerThread(); + void PruneExitedThreads(); + std::string m_WorkerThreadBaseName; + RwLock m_ThreadListLock; + std::vector<std::thread> m_WorkerThreads; + std::vector<std::thread::id> m_ExitedThreadIds; + BlockingQueue<Ref<IWork>> m_WorkQueue; + std::atomic<int> m_FreeWorkerCount{0}; bool ScalingEnabled() const { return m_MinThreads != m_MaxThreads; } @@ -287,12 +290,38 @@ struct ExplicitImpl : WorkerThreadPool::Impl }; void +ExplicitImpl::PruneExitedThreads() +{ + // Must be called under m_ThreadListLock + if (m_ExitedThreadIds.empty()) + { + return; + } + + for (auto It = m_WorkerThreads.begin(); It != m_WorkerThreads.end();) + { + auto IdIt = std::find(m_ExitedThreadIds.begin(), m_ExitedThreadIds.end(), It->get_id()); + if (IdIt != m_ExitedThreadIds.end()) + { + It->join(); + It = m_WorkerThreads.erase(It); + m_ExitedThreadIds.erase(IdIt); + } + else + { + ++It; + } + } +} + +void ExplicitImpl::SpawnWorkerThread() { static std::atomic<int> s_DynamicThreadIndex{0}; const int ThreadNumber = ++s_DynamicThreadIndex; RwLock::ExclusiveLockScope _(m_ThreadListLock); + PruneExitedThreads(); m_WorkerThreads.emplace_back(&ExplicitImpl::WorkerThreadFunction, this, WorkerThreadPool::ThreadStartInfo{ThreadNumber, nullptr}); } @@ -361,6 +390,10 @@ ExplicitImpl::WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info) m_WorkerThreadBaseName, CurrentTotal - 1); m_FreeWorkerCount--; + { + RwLock::ExclusiveLockScope _(m_ThreadListLock); + m_ExitedThreadIds.push_back(std::this_thread::get_id()); + } return; // Thread exits } } @@ -457,6 +490,8 @@ workthreadpool_forcelink() using namespace std::literals; +TEST_SUITE_BEGIN("core.workthreadpool"); + TEST_CASE("threadpool.basic") { WorkerThreadPool Threadpool{1}; @@ -471,6 +506,8 @@ TEST_CASE("threadpool.basic") CHECK_THROWS(FutureThrow.get()); } +TEST_SUITE_END(); + #endif } // namespace zen |