aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2023-11-29 09:14:57 -0500
committerGitHub <[email protected]>2023-11-29 15:14:57 +0100
commit68b3382ef7e0f7795b9a601aae73adc2f8ef9873 (patch)
treea8460fa497195575505e14f7beef069ecee31ef7 /src
parentfixed file logger pattern (#579) (diff)
downloadzen-68b3382ef7e0f7795b9a601aae73adc2f8ef9873.tar.xz
zen-68b3382ef7e0f7795b9a601aae73adc2f8ef9873.zip
global thread worker pools (#577)
- Improvement: Use two global worker thread pools instead of ad-hoc creation of worker pools
Diffstat (limited to 'src')
-rw-r--r--src/zencore/include/zencore/trace.h1
-rw-r--r--src/zencore/workthreadpool.cpp3
-rw-r--r--src/zenserver/cache/cachedisklayer.cpp15
-rw-r--r--src/zenserver/projectstore/remoteprojectstore.cpp16
-rw-r--r--src/zenserver/zenserver.cpp2
-rw-r--r--src/zenstore/cas.cpp3
-rw-r--r--src/zenstore/gc.cpp14
-rw-r--r--src/zenutil/include/zenutil/workerpools.h21
-rw-r--r--src/zenutil/workerpools.cpp89
9 files changed, 131 insertions, 33 deletions
diff --git a/src/zencore/include/zencore/trace.h b/src/zencore/include/zencore/trace.h
index 665df5808..8ff856fed 100644
--- a/src/zencore/include/zencore/trace.h
+++ b/src/zencore/include/zencore/trace.h
@@ -17,6 +17,7 @@ ZEN_THIRD_PARTY_INCLUDES_START
ZEN_THIRD_PARTY_INCLUDES_END
#define ZEN_TRACE_CPU(x) TRACE_CPU_SCOPE(x)
+#define ZEN_TRACE_CPU_FLUSH(x) TRACE_CPU_SCOPE(x, trace::CpuScopeFlags::CpuFlush)
enum class TraceType
{
diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp
index e6a6b5c54..6ff6463dd 100644
--- a/src/zencore/workthreadpool.cpp
+++ b/src/zencore/workthreadpool.cpp
@@ -110,6 +110,7 @@ struct WorkerThreadPool::Impl
m_WorkQueue.pop_front();
}
+ ZEN_TRACE_CPU_FLUSH("AsyncWork");
WorkFromQueue->Execute();
}
};
@@ -178,6 +179,7 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info)
{
try
{
+ ZEN_TRACE_CPU_FLUSH("AsyncWork");
Work->Execute();
}
catch (std::exception& e)
@@ -225,6 +227,7 @@ WorkerThreadPool::ScheduleWork(Ref<IWork> Work)
{
try
{
+ ZEN_TRACE_CPU_FLUSH("SyncWork");
Work->Execute();
}
catch (std::exception& e)
diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp
index c8bc3871a..9117b8820 100644
--- a/src/zenserver/cache/cachedisklayer.cpp
+++ b/src/zenserver/cache/cachedisklayer.cpp
@@ -14,6 +14,7 @@
#include <zencore/trace.h>
#include <zencore/workthreadpool.h>
#include <zencore/xxhash.h>
+#include <zenutil/workerpools.h>
#include <future>
@@ -3709,11 +3710,8 @@ ZenCacheDiskLayer::DiscoverBuckets()
RwLock SyncLock;
- const size_t MaxHwTreadUse = std::thread::hardware_concurrency();
- const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size()));
-
- WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::OpenOrCreate");
- Latch WorkLatch(1);
+ WorkerThreadPool& Pool = GetLargeWorkerPool();
+ Latch WorkLatch(1);
for (auto& BucketPath : FoundBucketDirectories)
{
WorkLatch.AddCount(1);
@@ -3825,11 +3823,8 @@ ZenCacheDiskLayer::Flush()
}
}
{
- const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u);
- const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size()));
-
- WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::Flush");
- Latch WorkLatch(1);
+ WorkerThreadPool& Pool = GetSmallWorkerPool();
+ Latch WorkLatch(1);
for (auto& Bucket : Buckets)
{
WorkLatch.AddCount(1);
diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp
index d5d229e42..826c8ff51 100644
--- a/src/zenserver/projectstore/remoteprojectstore.cpp
+++ b/src/zenserver/projectstore/remoteprojectstore.cpp
@@ -13,6 +13,7 @@
#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
#include <zenstore/cidstore.h>
+#include <zenutil/workerpools.h>
#include <unordered_map>
@@ -802,10 +803,7 @@ BuildContainer(CidStore& ChunkStore,
const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks,
tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments)
{
- // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a
- // WorkerThreadPool alive
- size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
- WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
AsyncRemoteResult RemoteResult;
CbObject ContainerObject = BuildContainer(ChunkStore,
@@ -1153,10 +1151,7 @@ SaveOplog(CidStore& ChunkStore,
Stopwatch Timer;
- // We are creating a worker thread pool here since we are uploading a lot of attachments in one go
- // Doing upload is a rare and transient occation so we don't want to keep a WorkerThreadPool alive.
- size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
- WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount), "oplog_upload"sv);
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
std::filesystem::path AttachmentTempPath;
if (UseTempBlocks)
@@ -1528,10 +1523,7 @@ LoadOplog(CidStore& ChunkStore,
Stopwatch Timer;
- // We are creating a worker thread pool here since we are download a lot of attachments in one go and we dont want to keep a
- // WorkerThreadPool alive
- size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
- WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
std::unordered_set<IoHash, IoHash::Hasher> Attachments;
std::vector<std::vector<IoHash>> ChunksInBlocks;
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 841f19295..2430267c1 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -24,6 +24,7 @@
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
#include <zenutil/basicfile.h>
+#include <zenutil/workerpools.h>
#include <zenutil/zenserverprocess.h>
#if ZEN_PLATFORM_WINDOWS
@@ -659,6 +660,7 @@ ZenServer::Cleanup()
m_AuthMgr.reset();
m_Http = {};
m_JobQueue.reset();
+ ShutdownWorkerPools();
}
catch (std::exception& Ex)
{
diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp
index c6bfda8b9..d38099117 100644
--- a/src/zenstore/cas.cpp
+++ b/src/zenstore/cas.cpp
@@ -23,6 +23,7 @@
#include <zenstore/cidstore.h>
#include <zenstore/gc.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/workerpools.h>
#include <gsl/gsl-lite.hpp>
@@ -106,7 +107,7 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig)
// Initialize payload storage
{
- WorkerThreadPool WorkerPool(3, "CasImpl::Initialize");
+ WorkerThreadPool& WorkerPool = GetSmallWorkerPool();
std::vector<std::future<void>> Work;
Work.emplace_back(
WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }}));
diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp
index b53ca4bab..e2ab34d1e 100644
--- a/src/zenstore/gc.cpp
+++ b/src/zenstore/gc.cpp
@@ -18,6 +18,7 @@
#include <zencore/workthreadpool.h>
#include <zenstore/cidstore.h>
#include <zenstore/scrubcontext.h>
+#include <zenutil/workerpools.h>
#include "cas.h"
@@ -610,18 +611,11 @@ GcManager::CollectGarbage(const GcSettings& Settings)
RwLock::SharedLockScope GcLock(m_Lock);
- int WorkerThreadPoolCount = 0;
- if (!Settings.SingleThread)
- {
- const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u);
- WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, m_GcReferencers.size()));
- }
-
Result.ReferencerStats.resize(m_GcReferencers.size());
std::unordered_map<std::unique_ptr<GcStoreCompactor>, GcCompactStoreStats*> StoreCompactors;
RwLock StoreCompactorsLock;
- WorkerThreadPool ThreadPool(WorkerThreadPoolCount, "GCV2");
+ WorkerThreadPool& ThreadPool = Settings.SingleThread ? GetSyncWorkerPool() : GetSmallWorkerPool();
ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size());
if (!m_GcReferencers.empty())
@@ -1932,8 +1926,8 @@ GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice)
Stopwatch Timer;
ZEN_INFO("scrubbing STARTING (delete mode => {})", DoDelete);
- WorkerThreadPool ThreadPool{4, "scrubber"};
- ScrubContext Ctx{ThreadPool, Deadline};
+ WorkerThreadPool& ThreadPool = GetSmallWorkerPool();
+ ScrubContext Ctx{ThreadPool, Deadline};
try
{
diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h
new file mode 100644
index 000000000..339120ece
--- /dev/null
+++ b/src/zenutil/include/zenutil/workerpools.h
@@ -0,0 +1,21 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/workthreadpool.h>
+
+namespace zen {
+
+// Worker pool with std::thread::hardware_concurrency() worker threads
+WorkerThreadPool& GetLargeWorkerPool();
+
+// Worker pool with std::thread::hardware_concurrency() / 4 worker threads
+WorkerThreadPool& GetSmallWorkerPool();
+
+// Special worker pool that does not use worker thread but issues all scheduled work on the calling thread
+// This is useful for debugging when multiple async thread can make stepping in debugger complicated
+WorkerThreadPool& GetSyncWorkerPool();
+
+void ShutdownWorkerPools();
+
+} // namespace zen
diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp
new file mode 100644
index 000000000..b511b0c5c
--- /dev/null
+++ b/src/zenutil/workerpools.cpp
@@ -0,0 +1,89 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zenutil/workerpools.h"
+
+#include <zencore/intmath.h>
+#include <zencore/thread.h>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <gsl/gsl-lite.hpp>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+namespace {
+ const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency());
+ const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 1u));
+
+ RwLock PoolLock;
+
+ std::unique_ptr<WorkerThreadPool> LargeWorkerPool;
+ std::unique_ptr<WorkerThreadPool> SmallWorkerPool;
+ std::unique_ptr<WorkerThreadPool> SyncWorkerPool;
+} // namespace
+
+WorkerThreadPool&
+GetLargeWorkerPool()
+{
+ {
+ RwLock::SharedLockScope _(PoolLock);
+ if (LargeWorkerPool)
+ {
+ return *LargeWorkerPool;
+ }
+ }
+ RwLock::ExclusiveLockScope _(PoolLock);
+ if (LargeWorkerPool)
+ {
+ return *LargeWorkerPool;
+ }
+ LargeWorkerPool.reset(new WorkerThreadPool(LargeWorkerThreadPoolTreadCount, "LargeThreadPool"));
+ return *LargeWorkerPool;
+}
+
+WorkerThreadPool&
+GetSmallWorkerPool()
+{
+ {
+ RwLock::SharedLockScope _(PoolLock);
+ if (SmallWorkerPool)
+ {
+ return *SmallWorkerPool;
+ }
+ }
+ RwLock::ExclusiveLockScope _(PoolLock);
+ if (SmallWorkerPool)
+ {
+ return *SmallWorkerPool;
+ }
+ SmallWorkerPool.reset(new WorkerThreadPool(SmallWorkerThreadPoolTreadCount, "SmallThreadPool"));
+ return *SmallWorkerPool;
+}
+
+WorkerThreadPool&
+GetSyncWorkerPool()
+{
+ {
+ RwLock::SharedLockScope _(PoolLock);
+ if (SyncWorkerPool)
+ {
+ return *SyncWorkerPool;
+ }
+ }
+ RwLock::ExclusiveLockScope _(PoolLock);
+ if (SyncWorkerPool)
+ {
+ return *SyncWorkerPool;
+ }
+ SyncWorkerPool.reset(new WorkerThreadPool(0, "SyncThreadPool"));
+ return *SyncWorkerPool;
+}
+
+void
+ShutdownWorkerPools()
+{
+ RwLock::ExclusiveLockScope _(PoolLock);
+ LargeWorkerPool.reset();
+ SmallWorkerPool.reset();
+ SyncWorkerPool.reset();
+}
+} // namespace zen