aboutsummaryrefslogtreecommitdiff
path: root/src/zencore/workthreadpool.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencore/workthreadpool.cpp')
-rw-r--r--src/zencore/workthreadpool.cpp59
1 files changed, 48 insertions, 11 deletions
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index a43ce7115..b179527d7 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -11,6 +11,7 @@
#include <zencore/thread.h>
#include <zencore/trace.h>
+#include <algorithm>
#include <thread>
#include <vector>
@@ -195,17 +196,19 @@ struct WorkerThreadPool::ThreadStartInfo
struct ExplicitImpl : WorkerThreadPool::Impl
{
- const int m_MinThreads;
- const int m_MaxThreads;
- std::atomic<int> m_TotalThreads{0};
- std::atomic<int> m_ActiveCount{0};
- void WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info);
- void SpawnWorkerThread();
- std::string m_WorkerThreadBaseName;
- RwLock m_ThreadListLock;
- std::vector<std::thread> m_WorkerThreads;
- BlockingQueue<Ref<IWork>> m_WorkQueue;
- std::atomic<int> m_FreeWorkerCount{0};
+ const int m_MinThreads;
+ const int m_MaxThreads;
+ std::atomic<int> m_TotalThreads{0};
+ std::atomic<int> m_ActiveCount{0};
+ void WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info);
+ void SpawnWorkerThread();
+ void PruneExitedThreads();
+ std::string m_WorkerThreadBaseName;
+ RwLock m_ThreadListLock;
+ std::vector<std::thread> m_WorkerThreads;
+ std::vector<std::thread::id> m_ExitedThreadIds;
+ BlockingQueue<Ref<IWork>> m_WorkQueue;
+ std::atomic<int> m_FreeWorkerCount{0};
bool ScalingEnabled() const { return m_MinThreads != m_MaxThreads; }
@@ -287,12 +290,38 @@ struct ExplicitImpl : WorkerThreadPool::Impl
};
void
+ExplicitImpl::PruneExitedThreads()
+{
+ // Must be called under m_ThreadListLock
+ if (m_ExitedThreadIds.empty())
+ {
+ return;
+ }
+
+ for (auto It = m_WorkerThreads.begin(); It != m_WorkerThreads.end();)
+ {
+ auto IdIt = std::find(m_ExitedThreadIds.begin(), m_ExitedThreadIds.end(), It->get_id());
+ if (IdIt != m_ExitedThreadIds.end())
+ {
+ It->join();
+ It = m_WorkerThreads.erase(It);
+ m_ExitedThreadIds.erase(IdIt);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
ExplicitImpl::SpawnWorkerThread()
{
static std::atomic<int> s_DynamicThreadIndex{0};
const int ThreadNumber = ++s_DynamicThreadIndex;
RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ PruneExitedThreads();
m_WorkerThreads.emplace_back(&ExplicitImpl::WorkerThreadFunction, this, WorkerThreadPool::ThreadStartInfo{ThreadNumber, nullptr});
}
@@ -361,6 +390,10 @@ ExplicitImpl::WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info)
m_WorkerThreadBaseName,
CurrentTotal - 1);
m_FreeWorkerCount--;
+ {
+ RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ m_ExitedThreadIds.push_back(std::this_thread::get_id());
+ }
return; // Thread exits
}
}
@@ -457,6 +490,8 @@ workthreadpool_forcelink()
using namespace std::literals;
+TEST_SUITE_BEGIN("core.workthreadpool");
+
TEST_CASE("threadpool.basic")
{
WorkerThreadPool Threadpool{1};
@@ -471,6 +506,8 @@ TEST_CASE("threadpool.basic")
CHECK_THROWS(FutureThrow.get());
}
+TEST_SUITE_END();
+
#endif
} // namespace zen