aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zencore/workthreadpool.cpp55
-rw-r--r--src/zenhttp/servers/iothreadpool.cpp32
-rw-r--r--src/zenhttp/servers/iothreadpool.h6
3 files changed, 80 insertions, 13 deletions
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index 25a1d982a..b179527d7 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -11,6 +11,7 @@
#include <zencore/thread.h>
#include <zencore/trace.h>
+#include <algorithm>
#include <thread>
#include <vector>
@@ -195,17 +196,19 @@ struct WorkerThreadPool::ThreadStartInfo
struct ExplicitImpl : WorkerThreadPool::Impl
{
- const int m_MinThreads;
- const int m_MaxThreads;
- std::atomic<int> m_TotalThreads{0};
- std::atomic<int> m_ActiveCount{0};
- void WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info);
- void SpawnWorkerThread();
- std::string m_WorkerThreadBaseName;
- RwLock m_ThreadListLock;
- std::vector<std::thread> m_WorkerThreads;
- BlockingQueue<Ref<IWork>> m_WorkQueue;
- std::atomic<int> m_FreeWorkerCount{0};
+ const int m_MinThreads;
+ const int m_MaxThreads;
+ std::atomic<int> m_TotalThreads{0};
+ std::atomic<int> m_ActiveCount{0};
+ void WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info);
+ void SpawnWorkerThread();
+ void PruneExitedThreads();
+ std::string m_WorkerThreadBaseName;
+ RwLock m_ThreadListLock;
+ std::vector<std::thread> m_WorkerThreads;
+ std::vector<std::thread::id> m_ExitedThreadIds;
+ BlockingQueue<Ref<IWork>> m_WorkQueue;
+ std::atomic<int> m_FreeWorkerCount{0};
bool ScalingEnabled() const { return m_MinThreads != m_MaxThreads; }
@@ -287,12 +290,38 @@ struct ExplicitImpl : WorkerThreadPool::Impl
};
void
+ExplicitImpl::PruneExitedThreads()
+{
+ // Must be called under m_ThreadListLock
+ if (m_ExitedThreadIds.empty())
+ {
+ return;
+ }
+
+ for (auto It = m_WorkerThreads.begin(); It != m_WorkerThreads.end();)
+ {
+ auto IdIt = std::find(m_ExitedThreadIds.begin(), m_ExitedThreadIds.end(), It->get_id());
+ if (IdIt != m_ExitedThreadIds.end())
+ {
+ It->join();
+ It = m_WorkerThreads.erase(It);
+ m_ExitedThreadIds.erase(IdIt);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
ExplicitImpl::SpawnWorkerThread()
{
static std::atomic<int> s_DynamicThreadIndex{0};
const int ThreadNumber = ++s_DynamicThreadIndex;
RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ PruneExitedThreads();
m_WorkerThreads.emplace_back(&ExplicitImpl::WorkerThreadFunction, this, WorkerThreadPool::ThreadStartInfo{ThreadNumber, nullptr});
}
@@ -361,6 +390,10 @@ ExplicitImpl::WorkerThreadFunction(WorkerThreadPool::ThreadStartInfo Info)
m_WorkerThreadBaseName,
CurrentTotal - 1);
m_FreeWorkerCount--;
+ {
+ RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ m_ExitedThreadIds.push_back(std::this_thread::get_id());
+ }
return; // Thread exits
}
}
diff --git a/src/zenhttp/servers/iothreadpool.cpp b/src/zenhttp/servers/iothreadpool.cpp
index d180f17f8..1053f0b0c 100644
--- a/src/zenhttp/servers/iothreadpool.cpp
+++ b/src/zenhttp/servers/iothreadpool.cpp
@@ -8,6 +8,7 @@
#if ZEN_PLATFORM_WINDOWS
+# include <algorithm>
# include <thread>
namespace zen {
@@ -187,10 +188,36 @@ ExplicitIoThreadPool::CancelIo()
}
void
+ExplicitIoThreadPool::PruneExitedThreads()
+{
+ // Must be called under m_ThreadListLock
+ if (m_ExitedThreadIds.empty())
+ {
+ return;
+ }
+
+ for (auto It = m_Threads.begin(); It != m_Threads.end();)
+ {
+ auto IdIt = std::find(m_ExitedThreadIds.begin(), m_ExitedThreadIds.end(), It->get_id());
+ if (IdIt != m_ExitedThreadIds.end())
+ {
+ It->join();
+ It = m_Threads.erase(It);
+ m_ExitedThreadIds.erase(IdIt);
+ }
+ else
+ {
+ ++It;
+ }
+ }
+}
+
+void
ExplicitIoThreadPool::SpawnWorkerThread()
{
RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ PruneExitedThreads();
++m_TotalThreads;
m_Threads.emplace_back([this] { WorkerThreadMain(); });
}
@@ -237,6 +264,10 @@ ExplicitIoThreadPool::WorkerThreadMain()
ZEN_LOG_DEBUG(ExplicitIoPoolLog(),
"scaling down I/O thread (idle timeout), {} threads remaining",
CurrentTotal - 1);
+ {
+ RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ m_ExitedThreadIds.push_back(std::this_thread::get_id());
+ }
return; // Thread exits
}
}
@@ -278,6 +309,7 @@ ExplicitIoThreadPool::WorkerThreadMain()
// We already incremented m_TotalThreads, so do the actual spawn
{
RwLock::ExclusiveLockScope _(m_ThreadListLock);
+ PruneExitedThreads();
m_Threads.emplace_back([this] { WorkerThreadMain(); });
}
}
diff --git a/src/zenhttp/servers/iothreadpool.h b/src/zenhttp/servers/iothreadpool.h
index e2c15ba76..f6bfce450 100644
--- a/src/zenhttp/servers/iothreadpool.h
+++ b/src/zenhttp/servers/iothreadpool.h
@@ -92,6 +92,7 @@ public:
private:
void WorkerThreadMain();
void SpawnWorkerThread();
+ void PruneExitedThreads();
HANDLE m_Iocp = nullptr;
@@ -105,8 +106,9 @@ private:
std::atomic<int> m_ActiveCount{0};
std::atomic<bool> m_ShuttingDown{false};
- RwLock m_ThreadListLock;
- std::vector<std::thread> m_Threads;
+ RwLock m_ThreadListLock;
+ std::vector<std::thread> m_Threads;
+ std::vector<std::thread::id> m_ExitedThreadIds;
};
} // namespace zen