From ae94d83f271f52ca47ab11604acbb3a16f1ee5e3 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Mon, 13 Apr 2026 22:45:09 +0200 Subject: let windows threads in thread pools die if inactive trim mimalloc pages if thread in thread pool is idle for 2 min (non-windows) --- src/zencore/include/zencore/blockingqueue.h | 29 ++++++++++++ src/zencore/workthreadpool.cpp | 68 +++++++++++++++-------------- 2 files changed, 64 insertions(+), 33 deletions(-) (limited to 'src') diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h index b6c93e937..076af3aaf 100644 --- a/src/zencore/include/zencore/blockingqueue.h +++ b/src/zencore/include/zencore/blockingqueue.h @@ -5,12 +5,20 @@ #include // For ZEN_ASSERT #include +#include #include #include #include namespace zen { +enum class DequeueResult +{ + Success, + Closed, + Timeout, +}; + template class BlockingQueue { @@ -50,6 +58,27 @@ public: return true; } + template + DequeueResult WaitAndDequeueFor(T& Item, std::chrono::duration Timeout) + { + std::unique_lock Lock(m_Lock); + if (m_Queue.empty()) + { + if (m_CompleteAdding) + { + return DequeueResult::Closed; + } + m_NewItemSignal.wait_for(Lock, Timeout, [this]() { return !m_Queue.empty() || m_CompleteAdding; }); + if (m_Queue.empty()) + { + return m_CompleteAdding ? DequeueResult::Closed : DequeueResult::Timeout; + } + } + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + return DequeueResult::Success; + } + void CompleteAdding() { std::unique_lock Lock(m_Lock); diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index 1cb338c66..ef56cb3a6 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -4,11 +4,14 @@ #include #include +#include #include +#include #include #include #include #include +#include #include #include @@ -72,10 +75,10 @@ struct WorkerThreadPool::Impl ThrowLastError("CreateThreadpool failed"); } - if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount)) - { - ThrowLastError("SetThreadpoolThreadMinimum failed"); - } + // if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount)) + // { + // ThrowLastError("SetThreadpoolThreadMinimum failed"); + // } SetThreadpoolThreadMaximum(m_ThreadPool, (DWORD)m_ThreadCount); InitializeThreadpoolEnvironment(&m_CallbackEnvironment); @@ -107,10 +110,6 @@ struct WorkerThreadPool::Impl { if (Mode == WorkerThreadPool::EMode::DisableBacklog) { - if (m_FreeWorkerCount <= 0) - { - return Work; - } RwLock::ExclusiveLockScope _(m_QueueLock); const int QueuedCount = gsl::narrow(m_WorkQueue.size()); if (QueuedCount >= m_FreeWorkerCount) @@ -234,10 +233,6 @@ struct WorkerThreadPool::Impl { if (Mode == WorkerThreadPool::EMode::DisableBacklog) { - if (m_FreeWorkerCount <= 0) - { - return Work; - } const int QueuedCount = gsl::narrow(m_WorkQueue.Size()); if (QueuedCount >= m_FreeWorkerCount) { @@ -256,35 +251,42 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) Info.Latch->CountDown(); + using namespace std::chrono_literals; + do { Ref Work; - if (m_WorkQueue.WaitAndDequeue(Work)) - { - m_FreeWorkerCount--; - auto _ = MakeGuard([&]() { m_FreeWorkerCount++; }); - try - { - ZEN_TRACE_CPU_FLUSH("AsyncWork"); - Work->Execute(); - Work = {}; - } - catch (const AssertException& Ex) - { - Work = {}; - ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); - } - catch (const std::exception& e) - { - Work = {}; - ZEN_ERROR("Caught exception in worker thread: {}", e.what()); - } + DequeueResult Result = m_WorkQueue.WaitAndDequeueFor(Work, 2min); + if (Result == DequeueResult::Timeout) + { + GMalloc->Trim(false); + continue; } - else + if (Result == DequeueResult::Closed) { return; } + + m_FreeWorkerCount--; + auto _ = MakeGuard([&]() { m_FreeWorkerCount++; }); + + try + { + ZEN_TRACE_CPU_FLUSH("AsyncWork"); + Work->Execute(); + Work = {}; + } + catch (const AssertException& Ex) + { + Work = {}; + ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); + } + catch (const std::exception& e) + { + Work = {}; + ZEN_ERROR("Caught exception in worker thread: {}", e.what()); + } } while (true); } -- cgit v1.2.3