aboutsummaryrefslogtreecommitdiff
path: root/src/zencore
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencore')
-rw-r--r--src/zencore/filesystem.cpp42
-rw-r--r--src/zencore/include/zencore/workthreadpool.h23
-rw-r--r--src/zencore/jobqueue.cpp68
-rw-r--r--src/zencore/workthreadpool.cpp151
-rw-r--r--src/zencore/zencore.cpp7
5 files changed, 171 insertions, 120 deletions
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index 8327838c9..5125beeca 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -2439,26 +2439,28 @@ GetDirectoryContent(const std::filesystem::path& RootDir,
PendingWorkCount.AddCount(1);
try
{
- WorkerPool.ScheduleWork([WorkerPool = &WorkerPool,
- PendingWorkCount = &PendingWorkCount,
- Visitor = Visitor,
- Flags = Flags,
- Path = std::move(Path),
- RelativeRoot = RelativeRoot / DirectoryName]() {
- ZEN_ASSERT(Visitor);
- auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); });
- try
- {
- MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor);
- FileSystemTraversal Traversal;
- Traversal.TraverseFileSystem(Path, SubVisitor);
- Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content));
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what());
- }
- });
+ WorkerPool.ScheduleWork(
+ [WorkerPool = &WorkerPool,
+ PendingWorkCount = &PendingWorkCount,
+ Visitor = Visitor,
+ Flags = Flags,
+ Path = std::move(Path),
+ RelativeRoot = RelativeRoot / DirectoryName]() {
+ ZEN_ASSERT(Visitor);
+ auto _ = MakeGuard([&]() { PendingWorkCount->CountDown(); });
+ try
+ {
+ MultithreadedVisitor SubVisitor(*WorkerPool, *PendingWorkCount, RelativeRoot, Flags, Visitor);
+ FileSystemTraversal Traversal;
+ Traversal.TraverseFileSystem(Path, SubVisitor);
+ Visitor->AsyncVisitDirectory(SubVisitor.RelativeRoot, std::move(SubVisitor.Content));
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("Failed scheduling work to scan subfolder '{}'. Reason: '{}'", Path / RelativeRoot, Ex.what());
+ }
+ },
+ WorkerThreadPool::EMode::DisableBacklog);
}
catch (const std::exception Ex)
{
diff --git a/src/zencore/include/zencore/workthreadpool.h b/src/zencore/include/zencore/workthreadpool.h
index 62356495c..4c38dd651 100644
--- a/src/zencore/include/zencore/workthreadpool.h
+++ b/src/zencore/include/zencore/workthreadpool.h
@@ -18,11 +18,7 @@ struct IWork : public RefCounted
{
virtual void Execute() = 0;
- inline std::exception_ptr GetException() { return m_Exception; }
-
private:
- std::exception_ptr m_Exception;
-
friend class WorkerThreadPool;
};
@@ -35,13 +31,18 @@ public:
WorkerThreadPool(int InThreadCount, std::string_view WorkerThreadBaseName);
~WorkerThreadPool();
- void ScheduleWork(Ref<IWork> Work);
- void ScheduleWork(std::function<void()>&& Work);
+ // Decides what to do if there are no free workers in the pool when the work is submitted
+ enum class EMode
+ {
+ EnableBacklog, // The work will be added to a backlog of work to do
+ DisableBacklog // The work will be executed synchronously in the caller thread
+ };
+
+ void ScheduleWork(Ref<IWork> Work, EMode Mode);
+ void ScheduleWork(std::function<void()>&& Work, EMode Mode);
template<typename Func>
- auto EnqueueTask(std::packaged_task<Func> Task);
-
- [[nodiscard]] size_t PendingWorkItemCount() const;
+ auto EnqueueTask(std::packaged_task<Func> Task, EMode Mode);
private:
struct Impl;
@@ -54,7 +55,7 @@ private:
template<typename Func>
auto
-WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task)
+WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task, EMode Mode)
{
struct FutureWork : IWork
{
@@ -67,7 +68,7 @@ WorkerThreadPool::EnqueueTask(std::packaged_task<Func> Task)
Ref<FutureWork> Work{new FutureWork(std::move(Task))};
auto Future = Work->m_Task.get_future();
- ScheduleWork(std::move(Work));
+ ScheduleWork(std::move(Work), Mode);
return Future;
}
diff --git a/src/zencore/jobqueue.cpp b/src/zencore/jobqueue.cpp
index 5d727b69c..4aa8c113e 100644
--- a/src/zencore/jobqueue.cpp
+++ b/src/zencore/jobqueue.cpp
@@ -109,10 +109,12 @@ public:
WorkerCounter.AddCount(1);
try
{
- WorkerPool.ScheduleWork([&]() {
- auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); });
- Worker();
- });
+ WorkerPool.ScheduleWork(
+ [&]() {
+ auto _ = MakeGuard([&]() { WorkerCounter.CountDown(); });
+ Worker();
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
return {.Id = NewJobId};
}
catch (const std::exception& Ex)
@@ -466,34 +468,36 @@ TEST_CASE("JobQueue")
for (uint32_t I = 0; I < 100; I++)
{
JobsLatch.AddCount(1);
- Pool.ScheduleWork([&Queue, &JobsLatch, I]() {
- auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
- JobsLatch.AddCount(1);
- auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) {
- auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
- if (Context.IsCancelled())
- {
- return;
- }
- Context.ReportProgress("going to sleep", "", 100, 100);
- Sleep(10);
- if (Context.IsCancelled())
- {
- return;
- }
- Context.ReportProgress("going to sleep again", "", 100, 50);
- if ((I & 0xFF) == 0x10)
- {
- zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
- }
- Sleep(10);
- if (Context.IsCancelled())
- {
- return;
- }
- Context.ReportProgress("done", "", 100, 0);
- });
- });
+ Pool.ScheduleWork(
+ [&Queue, &JobsLatch, I]() {
+ auto _ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
+ JobsLatch.AddCount(1);
+ auto Id = Queue->QueueJob(fmt::format("busy {}", I), [&JobsLatch, I](JobContext& Context) {
+ auto $ = MakeGuard([&JobsLatch]() { JobsLatch.CountDown(); });
+ if (Context.IsCancelled())
+ {
+ return;
+ }
+ Context.ReportProgress("going to sleep", "", 100, 100);
+ Sleep(10);
+ if (Context.IsCancelled())
+ {
+ return;
+ }
+ Context.ReportProgress("going to sleep again", "", 100, 50);
+ if ((I & 0xFF) == 0x10)
+ {
+ zen::ThrowSystemError(8, fmt::format("Job {} forced to fail", I));
+ }
+ Sleep(10);
+ if (Context.IsCancelled())
+ {
+ return;
+ }
+ Context.ReportProgress("done", "", 100, 0);
+ });
+ },
+ WorkerThreadPool::EMode::EnableBacklog);
}
auto Join = [](std::span<std::string> Strings, std::string_view Delimiter) -> std::string {
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index 445fe939e..e241c0de8 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -5,6 +5,7 @@
#include <zencore/blockingqueue.h>
#include <zencore/except.h>
#include <zencore/logging.h>
+#include <zencore/scopeguard.h>
#include <zencore/string.h>
#include <zencore/testing.h>
#include <zencore/thread.h>
@@ -13,6 +14,10 @@
#include <thread>
#include <vector>
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
#define ZEN_USE_WINDOWS_THREADPOOL 1
#if ZEN_PLATFORM_WINDOWS && ZEN_USE_WINDOWS_THREADPOOL
@@ -41,18 +46,23 @@ namespace {
struct WorkerThreadPool::Impl
{
+ const int m_ThreadCount = 0;
PTP_POOL m_ThreadPool = nullptr;
PTP_CLEANUP_GROUP m_CleanupGroup = nullptr;
TP_CALLBACK_ENVIRON m_CallbackEnvironment;
PTP_WORK m_Work = nullptr;
- std::string m_WorkerThreadBaseName;
- std::atomic<int> m_WorkerThreadCounter{0};
+ std::string m_WorkerThreadBaseName;
+ std::atomic<size_t> m_WorkerThreadCounter{0};
+ std::atomic<int> m_FreeWorkerCount{0};
- RwLock m_QueueLock;
+ mutable RwLock m_QueueLock;
std::deque<Ref<IWork>> m_WorkQueue;
- Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName)
+ Impl(int InThreadCount, std::string_view WorkerThreadBaseName)
+ : m_ThreadCount(InThreadCount)
+ , m_WorkerThreadBaseName(WorkerThreadBaseName)
+ , m_FreeWorkerCount(m_ThreadCount)
{
// Thread pool setup
@@ -62,11 +72,11 @@ struct WorkerThreadPool::Impl
ThrowLastError("CreateThreadpool failed");
}
- if (!SetThreadpoolThreadMinimum(m_ThreadPool, InThreadCount))
+ if (!SetThreadpoolThreadMinimum(m_ThreadPool, (DWORD)m_ThreadCount))
{
ThrowLastError("SetThreadpoolThreadMinimum failed");
}
- SetThreadpoolThreadMaximum(m_ThreadPool, InThreadCount * 2);
+ SetThreadpoolThreadMaximum(m_ThreadPool, (DWORD)m_ThreadCount);
InitializeThreadpoolEnvironment(&m_CallbackEnvironment);
@@ -93,12 +103,29 @@ struct WorkerThreadPool::Impl
CloseThreadpool(m_ThreadPool);
}
- void ScheduleWork(Ref<IWork> Work)
+ [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode)
{
- m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); });
+ if (Mode == WorkerThreadPool::EMode::DisableBacklog)
+ {
+ if (m_FreeWorkerCount <= 0)
+ {
+ return Work;
+ }
+ RwLock::ExclusiveLockScope _(m_QueueLock);
+ const int QueuedCount = gsl::narrow<int>(m_WorkQueue.size());
+ if (QueuedCount >= m_FreeWorkerCount)
+ {
+ return Work;
+ }
+ m_WorkQueue.push_back(std::move(Work));
+ }
+ else
+ {
+ m_QueueLock.WithExclusiveLock([&] { m_WorkQueue.push_back(std::move(Work)); });
+ }
SubmitThreadpoolWork(m_Work);
+ return {};
}
- [[nodiscard]] size_t PendingWorkItemCount() const { return 0; }
static VOID CALLBACK WorkCallback(_Inout_ PTP_CALLBACK_INSTANCE Instance, _Inout_opt_ PVOID Context, _Inout_ PTP_WORK Work)
{
@@ -109,10 +136,13 @@ struct WorkerThreadPool::Impl
void DoWork()
{
+ m_FreeWorkerCount--;
+ auto _ = MakeGuard([&]() { m_FreeWorkerCount++; });
+
if (!t_IsThreadNamed)
{
t_IsThreadNamed = true;
- const int ThreadIndex = ++m_WorkerThreadCounter;
+ const size_t ThreadIndex = ++m_WorkerThreadCounter;
zen::ExtendableStringBuilder<128> ThreadName;
ThreadName << m_WorkerThreadBaseName << "_" << ThreadIndex;
SetCurrentThreadName(ThreadName);
@@ -121,7 +151,7 @@ struct WorkerThreadPool::Impl
Ref<IWork> WorkFromQueue;
{
- RwLock::ExclusiveLockScope _{m_QueueLock};
+ RwLock::ExclusiveLockScope __{m_QueueLock};
WorkFromQueue = std::move(m_WorkQueue.front());
m_WorkQueue.pop_front();
}
@@ -141,20 +171,25 @@ struct WorkerThreadPool::ThreadStartInfo
struct WorkerThreadPool::Impl
{
+ const int m_ThreadCount = 0;
void WorkerThreadFunction(ThreadStartInfo Info);
std::string m_WorkerThreadBaseName;
std::vector<std::thread> m_WorkerThreads;
BlockingQueue<Ref<IWork>> m_WorkQueue;
+ std::atomic<int> m_FreeWorkerCount{0};
- Impl(int InThreadCount, std::string_view WorkerThreadBaseName) : m_WorkerThreadBaseName(WorkerThreadBaseName)
+ Impl(int InThreadCount, std::string_view WorkerThreadBaseName)
+ : m_ThreadCount(InThreadCount)
+ , m_WorkerThreadBaseName(WorkerThreadBaseName)
+ , m_FreeWorkerCount(m_ThreadCount)
{
# if ZEN_WITH_TRACE
trace::ThreadGroupBegin(m_WorkerThreadBaseName.c_str());
# endif
- zen::Latch WorkerLatch{InThreadCount};
+ zen::Latch WorkerLatch{m_ThreadCount};
- for (int i = 0; i < InThreadCount; ++i)
+ for (int i = 0; i < m_ThreadCount; ++i)
{
m_WorkerThreads.emplace_back(&Impl::WorkerThreadFunction, this, ThreadStartInfo{i + 1, &WorkerLatch});
}
@@ -181,8 +216,23 @@ struct WorkerThreadPool::Impl
m_WorkerThreads.clear();
}
- void ScheduleWork(Ref<IWork> Work) { m_WorkQueue.Enqueue(std::move(Work)); }
- [[nodiscard]] size_t PendingWorkItemCount() const { return m_WorkQueue.Size(); }
+ [[nodiscard]] Ref<IWork> ScheduleWork(Ref<IWork> Work, WorkerThreadPool::EMode Mode)
+ {
+ if (Mode == WorkerThreadPool::EMode::DisableBacklog)
+ {
+ if (m_FreeWorkerCount <= 0)
+ {
+ return Work;
+ }
+ const int QueuedCount = gsl::narrow<int>(m_WorkQueue.Size());
+ if (QueuedCount >= m_FreeWorkerCount)
+ {
+ return Work;
+ }
+ }
+ m_WorkQueue.Enqueue(std::move(Work));
+ return {};
+ }
};
void
@@ -197,21 +247,23 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info)
Ref<IWork> Work;
if (m_WorkQueue.WaitAndDequeue(Work))
{
+ m_FreeWorkerCount--;
+ auto _ = MakeGuard([&]() { m_FreeWorkerCount++; });
+
try
{
ZEN_TRACE_CPU_FLUSH("AsyncWork");
Work->Execute();
+ Work = {};
}
catch (const AssertException& Ex)
{
- Work->m_Exception = std::current_exception();
-
+ Work = {};
ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription());
}
catch (const std::exception& e)
{
- Work->m_Exception = std::current_exception();
-
+ Work = {};
ZEN_WARN("Caught exception in worker thread: {}", e.what());
}
}
@@ -243,48 +295,38 @@ WorkerThreadPool::~WorkerThreadPool()
}
void
-WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
+WorkerThreadPool::ScheduleWork(Ref<IWork> Work, EMode Mode)
{
if (m_Impl)
{
- m_Impl->ScheduleWork(std::move(Work));
- }
- else
- {
- try
+ if (Work = m_Impl->ScheduleWork(std::move(Work), Mode); !Work)
{
- ZEN_TRACE_CPU_FLUSH("SyncWork");
- Work->Execute();
- }
- catch (const AssertException& Ex)
- {
- Work->m_Exception = std::current_exception();
-
- ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription());
+ return;
}
- catch (const std::exception& e)
- {
- Work->m_Exception = std::current_exception();
+ }
- ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what());
- }
+ try
+ {
+ ZEN_TRACE_CPU_FLUSH("SyncWork");
+ Work->Execute();
+ Work = {};
+ }
+ catch (const AssertException& Ex)
+ {
+ Work = {};
+ ZEN_WARN("Assert exception in worker thread: {}", Ex.FullDescription());
+ }
+ catch (const std::exception& e)
+ {
+ Work = {};
+ ZEN_WARN("Caught exception when executing worker synchronously: {}", e.what());
}
}
void
-WorkerThreadPool::ScheduleWork(std::function<void()>&& Work)
-{
- ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work))));
-}
-
-[[nodiscard]] size_t
-WorkerThreadPool::PendingWorkItemCount() const
+WorkerThreadPool::ScheduleWork(std::function<void()>&& Work, EMode Mode)
{
- if (m_Impl)
- {
- return m_Impl->PendingWorkItemCount();
- }
- return 0;
+ ScheduleWork(Ref<IWork>(new detail::LambdaWork(std::move(Work))), Mode);
}
//////////////////////////////////////////////////////////////////////////
@@ -302,9 +344,10 @@ TEST_CASE("threadpool.basic")
{
WorkerThreadPool Threadpool{1};
- auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }});
- auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }});
- auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }});
+ auto Future42 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 42; }}, WorkerThreadPool::EMode::EnableBacklog);
+ auto Future99 = Threadpool.EnqueueTask(std::packaged_task<int()>{[] { return 99; }}, WorkerThreadPool::EMode::EnableBacklog);
+ auto FutureThrow = Threadpool.EnqueueTask(std::packaged_task<void()>{[] { throw std::runtime_error("meep!"); }},
+ WorkerThreadPool::EMode::EnableBacklog);
CHECK_EQ(Future42.get(), 42);
CHECK_EQ(Future99.get(), 99);
diff --git a/src/zencore/zencore.cpp b/src/zencore/zencore.cpp
index 51e06ae14..5a6232318 100644
--- a/src/zencore/zencore.cpp
+++ b/src/zencore/zencore.cpp
@@ -350,9 +350,10 @@ TEST_CASE("Assert.Callstack")
WorkerThreadPool Pool(1);
auto Task = Pool.EnqueueTask(std::packaged_task<int()>{[] {
- ZEN_ASSERT(false);
- return 1;
- }});
+ ZEN_ASSERT(false);
+ return 1;
+ }},
+ WorkerThreadPool::EMode::EnableBacklog);
try
{