diff options
| -rw-r--r-- | src/zencore/include/zencore/blockingqueue.h | 29 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 68 |
2 files changed, 64 insertions, 33 deletions
diff --git a/src/zencore/include/zencore/blockingqueue.h b/src/zencore/include/zencore/blockingqueue.h index b6c93e937..076af3aaf 100644 --- a/src/zencore/include/zencore/blockingqueue.h +++ b/src/zencore/include/zencore/blockingqueue.h @@ -5,12 +5,20 @@ #include <zencore/zencore.h> // For ZEN_ASSERT #include <atomic> +#include <chrono> #include <condition_variable> #include <deque> #include <mutex> namespace zen { +enum class DequeueResult +{ + Success, + Closed, + Timeout, +}; + template<typename T> class BlockingQueue { @@ -50,6 +58,27 @@ public: return true; } + template<typename Rep, typename Period> + DequeueResult WaitAndDequeueFor(T& Item, std::chrono::duration<Rep, Period> Timeout) + { + std::unique_lock Lock(m_Lock); + if (m_Queue.empty()) + { + if (m_CompleteAdding) + { + return DequeueResult::Closed; + } + m_NewItemSignal.wait_for(Lock, Timeout, [this]() { return !m_Queue.empty() || m_CompleteAdding; }); + if (m_Queue.empty()) + { + return m_CompleteAdding ? DequeueResult::Closed : DequeueResult::Timeout; + } + } + Item = std::move(m_Queue.front()); + m_Queue.pop_front(); + return DequeueResult::Success; + } + void CompleteAdding() { std::unique_lock Lock(m_Lock); diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index 1cb338c66..ef56cb3a6 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -4,11 +4,14 @@ #include <zencore/blockingqueue.h> #include <zencore/except.h> +#include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/memory/fmalloc.h> #include <zencore/scopeguard.h> #include <zencore/string.h> #include <zencore/testing.h> #include <zencore/thread.h> +#include <zencore/timer.h> #include <zencore/trace.h> #include <thread> @@ -72,10 +75,10 @@ struct WorkerThreadPool::Impl ThrowLastError("CreateThreadpool failed"); } - if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount)) - { - ThrowLastError("SetThreadpoolThreadMinimum failed"); - } + // if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount)) + // { + // ThrowLastError("SetThreadpoolThreadMinimum failed"); + // } SetThreadpoolThreadMaximum(m_ThreadPool, (DWORD)m_ThreadCount); InitializeThreadpoolEnvironment(&m_CallbackEnvironment); @@ -107,10 +110,6 @@ struct WorkerThreadPool::Impl { if (Mode == WorkerThreadPool::EMode::DisableBacklog) { - if (m_FreeWorkerCount <= 0) - { - return Work; - } RwLock::ExclusiveLockScope _(m_QueueLock); const int QueuedCount = gsl::narrow<int>(m_WorkQueue.size()); if (QueuedCount >= m_FreeWorkerCount) @@ -234,10 +233,6 @@ struct WorkerThreadPool::Impl { if (Mode == WorkerThreadPool::EMode::DisableBacklog) { - if (m_FreeWorkerCount <= 0) - { - return Work; - } const int QueuedCount = gsl::narrow<int>(m_WorkQueue.Size()); if (QueuedCount >= m_FreeWorkerCount) { @@ -256,35 +251,42 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) Info.Latch->CountDown(); + using namespace std::chrono_literals; + do { Ref<IWork> Work; - if (m_WorkQueue.WaitAndDequeue(Work)) - { - m_FreeWorkerCount--; - auto _ = MakeGuard([&]() { m_FreeWorkerCount++; }); - try - { - ZEN_TRACE_CPU_FLUSH("AsyncWork"); - Work->Execute(); - Work = {}; - } - catch (const AssertException& Ex) - { - Work = {}; - ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); - } - catch (const std::exception& e) - { - Work = {}; - ZEN_ERROR("Caught exception in worker thread: {}", e.what()); - } + DequeueResult Result = m_WorkQueue.WaitAndDequeueFor(Work, 2min); + if (Result == DequeueResult::Timeout) + { + GMalloc->Trim(false); + continue; } - else + if (Result == DequeueResult::Closed) { return; } + + m_FreeWorkerCount--; + auto _ = MakeGuard([&]() { m_FreeWorkerCount++; }); + + try + { + ZEN_TRACE_CPU_FLUSH("AsyncWork"); + Work->Execute(); + Work = {}; + } + catch (const AssertException& Ex) + { + Work = {}; + ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription()); + } + catch (const std::exception& e) + { + Work = {}; + ZEN_ERROR("Caught exception in worker thread: {}", e.what()); + } } while (true); } |