diff options
| author | Dan Engelbrecht <[email protected]> | 2023-11-29 09:14:57 -0500 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-11-29 15:14:57 +0100 |
| commit | 68b3382ef7e0f7795b9a601aae73adc2f8ef9873 (patch) | |
| tree | a8460fa497195575505e14f7beef069ecee31ef7 /src | |
| parent | fixed file logger pattern (#579) (diff) | |
| download | zen-68b3382ef7e0f7795b9a601aae73adc2f8ef9873.tar.xz zen-68b3382ef7e0f7795b9a601aae73adc2f8ef9873.zip | |
global thread worker pools (#577)
- Improvement: Use two global worker thread pools instead of ad-hoc creation of worker pools
Diffstat (limited to 'src')
| -rw-r--r-- | src/zencore/include/zencore/trace.h | 1 | ||||
| -rw-r--r-- | src/zencore/workthreadpool.cpp | 3 | ||||
| -rw-r--r-- | src/zenserver/cache/cachedisklayer.cpp | 15 | ||||
| -rw-r--r-- | src/zenserver/projectstore/remoteprojectstore.cpp | 16 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/cas.cpp | 3 | ||||
| -rw-r--r-- | src/zenstore/gc.cpp | 14 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/workerpools.h | 21 | ||||
| -rw-r--r-- | src/zenutil/workerpools.cpp | 89 |
9 files changed, 131 insertions, 33 deletions
diff --git a/src/zencore/include/zencore/trace.h b/src/zencore/include/zencore/trace.h index 665df5808..8ff856fed 100644 --- a/src/zencore/include/zencore/trace.h +++ b/src/zencore/include/zencore/trace.h @@ -17,6 +17,7 @@ ZEN_THIRD_PARTY_INCLUDES_START ZEN_THIRD_PARTY_INCLUDES_END #define ZEN_TRACE_CPU(x) TRACE_CPU_SCOPE(x) +#define ZEN_TRACE_CPU_FLUSH(x) TRACE_CPU_SCOPE(x, trace::CpuScopeFlags::CpuFlush) enum class TraceType { diff --git a/src/zencore/workthreadpool.cpp b/src/zencore/workthreadpool.cpp index e6a6b5c54..6ff6463dd 100644 --- a/src/zencore/workthreadpool.cpp +++ b/src/zencore/workthreadpool.cpp @@ -110,6 +110,7 @@ struct WorkerThreadPool::Impl m_WorkQueue.pop_front(); } + ZEN_TRACE_CPU_FLUSH("AsyncWork"); WorkFromQueue->Execute(); } }; @@ -178,6 +179,7 @@ WorkerThreadPool::Impl::WorkerThreadFunction(ThreadStartInfo Info) { try { + ZEN_TRACE_CPU_FLUSH("AsyncWork"); Work->Execute(); } catch (std::exception& e) @@ -225,6 +227,7 @@ WorkerThreadPool::ScheduleWork(Ref<IWork> Work) { try { + ZEN_TRACE_CPU_FLUSH("SyncWork"); Work->Execute(); } catch (std::exception& e) diff --git a/src/zenserver/cache/cachedisklayer.cpp b/src/zenserver/cache/cachedisklayer.cpp index c8bc3871a..9117b8820 100644 --- a/src/zenserver/cache/cachedisklayer.cpp +++ b/src/zenserver/cache/cachedisklayer.cpp @@ -14,6 +14,7 @@ #include <zencore/trace.h> #include <zencore/workthreadpool.h> #include <zencore/xxhash.h> +#include <zenutil/workerpools.h> #include <future> @@ -3709,11 +3710,8 @@ ZenCacheDiskLayer::DiscoverBuckets() RwLock SyncLock; - const size_t MaxHwTreadUse = std::thread::hardware_concurrency(); - const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, FoundBucketDirectories.size())); - - WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::OpenOrCreate"); - Latch WorkLatch(1); + WorkerThreadPool& Pool = GetLargeWorkerPool(); + Latch WorkLatch(1); for (auto& BucketPath : FoundBucketDirectories) { WorkLatch.AddCount(1); @@ -3825,11 +3823,8 @@ ZenCacheDiskLayer::Flush() } } { - const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); - const int WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, Buckets.size())); - - WorkerThreadPool Pool(WorkerThreadPoolCount, "CacheBucket::Flush"); - Latch WorkLatch(1); + WorkerThreadPool& Pool = GetSmallWorkerPool(); + Latch WorkLatch(1); for (auto& Bucket : Buckets) { WorkLatch.AddCount(1); diff --git a/src/zenserver/projectstore/remoteprojectstore.cpp b/src/zenserver/projectstore/remoteprojectstore.cpp index d5d229e42..826c8ff51 100644 --- a/src/zenserver/projectstore/remoteprojectstore.cpp +++ b/src/zenserver/projectstore/remoteprojectstore.cpp @@ -13,6 +13,7 @@ #include <zencore/timer.h> #include <zencore/workthreadpool.h> #include <zenstore/cidstore.h> +#include <zenutil/workerpools.h> #include <unordered_map> @@ -802,10 +803,7 @@ BuildContainer(CidStore& ChunkStore, const std::function<void(const std::unordered_set<IoHash, IoHash::Hasher>)>& OnBlockChunks, tsl::robin_map<IoHash, IoBuffer, IoHash::Hasher>* OutOptionalTempAttachments) { - // We are creating a worker thread pool here since we are uploading a lot of attachments in one go and we dont want to keep a - // WorkerThreadPool alive - size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); - WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); AsyncRemoteResult RemoteResult; CbObject ContainerObject = BuildContainer(ChunkStore, @@ -1153,10 +1151,7 @@ SaveOplog(CidStore& ChunkStore, Stopwatch Timer; - // We are creating a worker thread pool here since we are uploading a lot of attachments in one go - // Doing upload is a rare and transient occation so we don't want to keep a WorkerThreadPool alive. - size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); - WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount), "oplog_upload"sv); + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); std::filesystem::path AttachmentTempPath; if (UseTempBlocks) @@ -1528,10 +1523,7 @@ LoadOplog(CidStore& ChunkStore, Stopwatch Timer; - // We are creating a worker thread pool here since we are download a lot of attachments in one go and we dont want to keep a - // WorkerThreadPool alive - size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); - WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); std::unordered_set<IoHash, IoHash::Hasher> Attachments; std::vector<std::vector<IoHash>> ChunksInBlocks; diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index 841f19295..2430267c1 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -24,6 +24,7 @@ #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> #include <zenutil/basicfile.h> +#include <zenutil/workerpools.h> #include <zenutil/zenserverprocess.h> #if ZEN_PLATFORM_WINDOWS @@ -659,6 +660,7 @@ ZenServer::Cleanup() m_AuthMgr.reset(); m_Http = {}; m_JobQueue.reset(); + ShutdownWorkerPools(); } catch (std::exception& Ex) { diff --git a/src/zenstore/cas.cpp b/src/zenstore/cas.cpp index c6bfda8b9..d38099117 100644 --- a/src/zenstore/cas.cpp +++ b/src/zenstore/cas.cpp @@ -23,6 +23,7 @@ #include <zenstore/cidstore.h> #include <zenstore/gc.h> #include <zenstore/scrubcontext.h> +#include <zenutil/workerpools.h> #include <gsl/gsl-lite.hpp> @@ -106,7 +107,7 @@ CasImpl::Initialize(const CidStoreConfiguration& InConfig) // Initialize payload storage { - WorkerThreadPool WorkerPool(3, "CasImpl::Initialize"); + WorkerThreadPool& WorkerPool = GetSmallWorkerPool(); std::vector<std::future<void>> Work; Work.emplace_back( WorkerPool.EnqueueTask(std::packaged_task<void()>{[&]() { m_LargeStrategy.Initialize(m_Config.RootDirectory, IsNewStore); }})); diff --git a/src/zenstore/gc.cpp b/src/zenstore/gc.cpp index b53ca4bab..e2ab34d1e 100644 --- a/src/zenstore/gc.cpp +++ b/src/zenstore/gc.cpp @@ -18,6 +18,7 @@ #include <zencore/workthreadpool.h> #include <zenstore/cidstore.h> #include <zenstore/scrubcontext.h> +#include <zenutil/workerpools.h> #include "cas.h" @@ -610,18 +611,11 @@ GcManager::CollectGarbage(const GcSettings& Settings) RwLock::SharedLockScope GcLock(m_Lock); - int WorkerThreadPoolCount = 0; - if (!Settings.SingleThread) - { - const size_t MaxHwTreadUse = Max((std::thread::hardware_concurrency() / 4u), 1u); - WorkerThreadPoolCount = gsl::narrow<int>(Min(MaxHwTreadUse, m_GcReferencers.size())); - } - Result.ReferencerStats.resize(m_GcReferencers.size()); std::unordered_map<std::unique_ptr<GcStoreCompactor>, GcCompactStoreStats*> StoreCompactors; RwLock StoreCompactorsLock; - WorkerThreadPool ThreadPool(WorkerThreadPoolCount, "GCV2"); + WorkerThreadPool& ThreadPool = Settings.SingleThread ? GetSyncWorkerPool() : GetSmallWorkerPool(); ZEN_INFO("GCV2: Removing expired data from {} referencers", m_GcReferencers.size()); if (!m_GcReferencers.empty()) @@ -1932,8 +1926,8 @@ GcScheduler::ScrubStorage(bool DoDelete, std::chrono::seconds TimeSlice) Stopwatch Timer; ZEN_INFO("scrubbing STARTING (delete mode => {})", DoDelete); - WorkerThreadPool ThreadPool{4, "scrubber"}; - ScrubContext Ctx{ThreadPool, Deadline}; + WorkerThreadPool& ThreadPool = GetSmallWorkerPool(); + ScrubContext Ctx{ThreadPool, Deadline}; try { diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h new file mode 100644 index 000000000..339120ece --- /dev/null +++ b/src/zenutil/include/zenutil/workerpools.h @@ -0,0 +1,21 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/workthreadpool.h> + +namespace zen { + +// Worker pool with std::thread::hardware_concurrency() worker threads +WorkerThreadPool& GetLargeWorkerPool(); + +// Worker pool with std::thread::hardware_concurrency() / 4 worker threads +WorkerThreadPool& GetSmallWorkerPool(); + +// Special worker pool that does not use worker thread but issues all scheduled work on the calling thread +// This is useful for debugging when multiple async thread can make stepping in debugger complicated +WorkerThreadPool& GetSyncWorkerPool(); + +void ShutdownWorkerPools(); + +} // namespace zen diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp new file mode 100644 index 000000000..b511b0c5c --- /dev/null +++ b/src/zenutil/workerpools.cpp @@ -0,0 +1,89 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "zenutil/workerpools.h" + +#include <zencore/intmath.h> +#include <zencore/thread.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { +namespace { + const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency()); + const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 1u)); + + RwLock PoolLock; + + std::unique_ptr<WorkerThreadPool> LargeWorkerPool; + std::unique_ptr<WorkerThreadPool> SmallWorkerPool; + std::unique_ptr<WorkerThreadPool> SyncWorkerPool; +} // namespace + +WorkerThreadPool& +GetLargeWorkerPool() +{ + { + RwLock::SharedLockScope _(PoolLock); + if (LargeWorkerPool) + { + return *LargeWorkerPool; + } + } + RwLock::ExclusiveLockScope _(PoolLock); + if (LargeWorkerPool) + { + return *LargeWorkerPool; + } + LargeWorkerPool.reset(new WorkerThreadPool(LargeWorkerThreadPoolTreadCount, "LargeThreadPool")); + return *LargeWorkerPool; +} + +WorkerThreadPool& +GetSmallWorkerPool() +{ + { + RwLock::SharedLockScope _(PoolLock); + if (SmallWorkerPool) + { + return *SmallWorkerPool; + } + } + RwLock::ExclusiveLockScope _(PoolLock); + if (SmallWorkerPool) + { + return *SmallWorkerPool; + } + SmallWorkerPool.reset(new WorkerThreadPool(SmallWorkerThreadPoolTreadCount, "SmallThreadPool")); + return *SmallWorkerPool; +} + +WorkerThreadPool& +GetSyncWorkerPool() +{ + { + RwLock::SharedLockScope _(PoolLock); + if (SyncWorkerPool) + { + return *SyncWorkerPool; + } + } + RwLock::ExclusiveLockScope _(PoolLock); + if (SyncWorkerPool) + { + return *SyncWorkerPool; + } + SyncWorkerPool.reset(new WorkerThreadPool(0, "SyncThreadPool")); + return *SyncWorkerPool; +} + +void +ShutdownWorkerPools() +{ + RwLock::ExclusiveLockScope _(PoolLock); + LargeWorkerPool.reset(); + SmallWorkerPool.reset(); + SyncWorkerPool.reset(); +} +} // namespace zen |