aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2026-04-13 22:45:09 +0200
committerDan Engelbrecht <[email protected]>2026-04-13 22:45:09 +0200
commitae94d83f271f52ca47ab11604acbb3a16f1ee5e3 (patch)
tree69c84c52595405f1b9f821f6f3e089b773f8b4b1
parentfix utf characters in source code (#953) (diff)
downloadzen-de/trim-malloc-at-threadpool-idle.tar.xz
zen-de/trim-malloc-at-threadpool-idle.zip
let windows threads in thread pools die if inactivede/trim-malloc-at-threadpool-idle
trim mimalloc pages if thread in thread pool is idle for 2 min (non-windows)
-rw-r--r--src/zencore/include/zencore/blockingqueue.h29
-rw-r--r--src/zencore/workthreadpool.cpp68
2 files changed, 64 insertions, 33 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h
index b6c93e937..076af3aaf 100644
--- a/src/zencore/include/zencore/blockingqueue.h
+++ b/src/zencore/include/zencore/blockingqueue.h
@@ -5,12 +5,20 @@
#include <zencore/zencore.h> // For ZEN_ASSERT
#include <atomic>
+#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
namespace zen {
+enum class DequeueResult
+{
+ Success,
+ Closed,
+ Timeout,
+};
+
template<typename T>
class BlockingQueue
{
@@ -50,6 +58,27 @@ public:
return true;
}
+ template<typename Rep, typename Period>
+ DequeueResult WaitAndDequeueFor(T& Item, std::chrono::duration<Rep, Period> Timeout)
+ {
+ std::unique_lock Lock(m_Lock);
+ if (m_Queue.empty())
+ {
+ if (m_CompleteAdding)
+ {
+ return DequeueResult::Closed;
+ }
+ m_NewItemSignal.wait_for(Lock, Timeout, [this]() { return !m_Queue.empty() || m_CompleteAdding; });
+ if (m_Queue.empty())
+ {
+ return m_CompleteAdding ? DequeueResult::Closed : DequeueResult::Timeout;
+ }
+ }
+ Item = std::move(m_Queue.front());
+ m_Queue.pop_front();
+ return DequeueResult::Success;
+ }
+
void CompleteAdding()
{
std::unique_lock Lock(m_Lock);
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index 1cb338c66..ef56cb3a6 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -4,11 +4,14 @@
#include <zencore/blockingqueue.h>
#include <zencore/except.h>
+#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/memory/fmalloc.h>
#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/thread.h>
+#include <zencore/timer.h>
#include <zencore/trace.h>
#include <thread>
@@ -72,10 +75,10 @@ struct WorkerThreadPool::Impl
ThrowLastError("CreateThreadpool failed");
}
- if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount))
- {
- ThrowLastError("SetThreadpoolThreadMinimum failed");
- }
+ // if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount))
+ // {
+ // ThrowLastError("SetThreadpoolThreadMinimum failed");
+ // }
SetThreadpoolThreadMaximum(m_ThreadPool, (DWORD)m_ThreadCount);
InitializeThreadpoolEnvironment(&m_CallbackEnvironment);
@@ -107,10 +110,6 @@ struct WorkerThreadPool::Impl
{
if (Mode == WorkerThreadPool::EMode::DisableBacklog)
{
- if (m_FreeWorkerCount <= 0)
- {
- return Work;
- }
RwLock::ExclusiveLockScope _(m_QueueLock);
const int QueuedCount = gsl::narrow<int>(m_WorkQueue.size());
if (QueuedCount >= m_FreeWorkerCount)
@@ -234,10 +233,6 @@ struct WorkerThreadPool::Impl
{
if (Mode == WorkerThreadPool::EMode::DisableBacklog)
{
- if (m_FreeWorkerCount <= 0)
- {
- return Work;
- }
const int QueuedCount = gsl::narrow<int>(m_WorkQueue.Size());
if (QueuedCount >= m_FreeWorkerCount)
{
@@ -256,35 +251,42 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info)
Info.Latch->CountDown();
+ using namespace std::chrono_literals;
+
do
{
Ref<IWork> Work;
- if (m_WorkQueue.WaitAndDequeue(Work))
- {
- m_FreeWorkerCount--;
- auto _ = MakeGuard([&]() { m_FreeWorkerCount++; });
- try
- {
- ZEN_TRACE_CPU_FLUSH("AsyncWork");
- Work->Execute();
- Work = {};
- }
- catch (const AssertException& Ex)
- {
- Work = {};
- ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription());
- }
- catch (const std::exception& e)
- {
- Work = {};
- ZEN_ERROR("Caught exception in worker thread: {}", e.what());
- }
+ DequeueResult Result = m_WorkQueue.WaitAndDequeueFor(Work, 2min);
+ if (Result == DequeueResult::Timeout)
+ {
+ GMalloc->Trim(false);
+ continue;
}
- else
+ if (Result == DequeueResult::Closed)
{
return;
}
+
+ m_FreeWorkerCount--;
+ auto _ = MakeGuard([&]() { m_FreeWorkerCount++; });
+
+ try
+ {
+ ZEN_TRACE_CPU_FLUSH("AsyncWork");
+ Work->Execute();
+ Work = {};
+ }
+ catch (const AssertException& Ex)
+ {
+ Work = {};
+ ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription());
+ }
+ catch (const std::exception& e)
+ {
+ Work = {};
+ ZEN_ERROR("Caught exception in worker thread: {}", e.what());
+ }
} while (true);
}