diff options
| author | Stefan Boberg <[email protected]> | 2025-10-10 18:09:53 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-10 18:09:53 +0200 |
| commit | c4801320150ea0c492cf343d490a0a339432f060 (patch) | |
| tree | d3013ef82c1707e38db3abac94d1fec0ca352394 /src | |
| parent | shorten thread pool names for Linux which has a limit of 15 characters (#563) (diff) | |
| download | zen-c4801320150ea0c492cf343d490a0a339432f060.tar.xz zen-c4801320150ea0c492cf343d490a0a339432f060.zip | |
add ability to limit concurrency (#565)
Effective concurrency in zenserver can be limited via the `--corelimit=<N>` option on the command line. Any value passed here will be used in place of the return value from `std::thread::hardware_concurrency()` whenever it is lower than that return value.
* added --corelimit option to zenserver
* made sure thread pools are configured lazily and not during global init
* added log output indicating effective and HW concurrency
* added change log entry
* removed debug logging from ZenEntryPoint::Run()
also removed main thread naming on Linux since it makes the output from `top` and similar tools confusing (it shows `main` instead of `zenserver`)
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/cache_cmd.cpp | 2 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 2 | ||||
| -rw-r--r-- | src/zen/cmds/rpcreplay_cmd.cpp | 2 | ||||
| -rw-r--r-- | src/zencore/include/zencore/thread.h | 6 | ||||
| -rw-r--r-- | src/zencore/thread.cpp | 21 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpasio.cpp | 2 | ||||
| -rw-r--r-- | src/zenhttp/servers/httpsys.cpp | 2 | ||||
| -rw-r--r-- | src/zenhttp/transports/asiotransport.cpp | 2 | ||||
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 5 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 2 | ||||
| -rw-r--r-- | src/zenserver/config.cpp | 1 | ||||
| -rw-r--r-- | src/zenserver/config.h | 1 | ||||
| -rw-r--r-- | src/zenserver/main.cpp | 9 | ||||
| -rw-r--r-- | src/zenserver/zenserver.cpp | 2 | ||||
| -rw-r--r-- | src/zenstore/blockstore.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/cache/structuredcachestore.cpp | 4 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 6 | ||||
| -rw-r--r-- | src/zenstore/workspaces.cpp | 8 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/workerpools.h | 6 | ||||
| -rw-r--r-- | src/zenutil/workerpools.cpp | 115 |
20 files changed, 128 insertions, 74 deletions
diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp index 85dcd7648..2679df10a 100644 --- a/src/zen/cmds/cache_cmd.cpp +++ b/src/zen/cmds/cache_cmd.cpp @@ -538,7 +538,7 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a return Package; }); - WorkerThreadPool WorkerPool(gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 2u), 2u))); + WorkerThreadPool WorkerPool(gsl::narrow<int>(Max((GetHardwareConcurrency() / 2u), 2u))); Latch WorkLatch(1); std::uniform_int_distribution<uint32_t> SizeCountDistribution(m_MaxAttachmentCount > 0 ? 0 : 1, diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 6f9719909..3f82bf982 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -1950,7 +1950,7 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg std::atomic_int64_t FileCount = 0; int OplogEntryCount = 0; - size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); + size_t WorkerCount = Min(GetHardwareConcurrency(), 16u); WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); Latch WorkRemaining(1); size_t EmitCount = 0; diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp index dbf15ddd4..70e9e5300 100644 --- a/src/zen/cmds/rpcreplay_cmd.cpp +++ b/src/zen/cmds/rpcreplay_cmd.cpp @@ -123,7 +123,7 @@ RpcReplayCommand::RpcReplayCommand() "w", "numthreads", "Number of worker threads per process", - cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", std::thread::hardware_concurrency())), + cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", GetHardwareConcurrency())), "<count>"); m_Options.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), "<onhost>"); m_Options.add_option("", diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h index d9fb5c023..11ef01a35 100644 --- 
a/src/zencore/include/zencore/thread.h +++ b/src/zencore/include/zencore/thread.h @@ -233,8 +233,10 @@ SetAtomicMax(std::atomic_uint64_t& Max, uint64_t Value) } } -ZENCORE_API int GetCurrentThreadId(); -ZENCORE_API void Sleep(int ms); +void LimitHardwareConcurrency(unsigned int); +unsigned int GetHardwareConcurrency(); +int GetCurrentThreadId(); +void Sleep(int ms); void thread_forcelink(); // internal diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp index abf282467..7bd21a229 100644 --- a/src/zencore/thread.cpp +++ b/src/zencore/thread.cpp @@ -45,6 +45,27 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { +static unsigned int LimitConcurrency = 0; + +void +LimitHardwareConcurrency(unsigned int Limit) +{ + LimitConcurrency = Limit; +} + +unsigned int +GetHardwareConcurrency() +{ + static unsigned int SysLimit = std::thread::hardware_concurrency(); + + if (LimitConcurrency) + { + return Min(SysLimit, LimitConcurrency); + } + + return SysLimit; +} + #if ZEN_PLATFORM_WINDOWS // The information on how to set the thread name comes from // a MSDN article: http://msdn2.microsoft.com/en-us/library/xcb2z8hs.aspx diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp index 2023b6d98..492e867fb 100644 --- a/src/zenhttp/servers/httpasio.cpp +++ b/src/zenhttp/servers/httpasio.cpp @@ -1122,7 +1122,7 @@ private: HttpAsioServer::HttpAsioServer(bool ForceLoopback, unsigned int ThreadCount) : m_ForceLoopback(ForceLoopback) -, m_ThreadCount(ThreadCount != 0 ? ThreadCount : Max(std::thread::hardware_concurrency(), 8u)) +, m_ThreadCount(ThreadCount != 0 ? 
ThreadCount : Max(GetHardwareConcurrency(), 8u)) , m_Impl(std::make_unique<asio_http::HttpAsioServerImpl>()) { ZEN_DEBUG("Request object size: {} ({:#x})", sizeof(HttpRequestParser), sizeof(HttpRequestParser)); diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp index 95d83911d..c83675f2c 100644 --- a/src/zenhttp/servers/httpsys.cpp +++ b/src/zenhttp/servers/httpsys.cpp @@ -930,7 +930,7 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig) if (m_InitialConfig.ThreadCount == 0) { - MinThreadCount = Max(8u, std::thread::hardware_concurrency()); + MinThreadCount = Max(8u, GetHardwareConcurrency()); } else { diff --git a/src/zenhttp/transports/asiotransport.cpp b/src/zenhttp/transports/asiotransport.cpp index 96a15518c..23ac1bc8b 100644 --- a/src/zenhttp/transports/asiotransport.cpp +++ b/src/zenhttp/transports/asiotransport.cpp @@ -352,7 +352,7 @@ AsioTransportConnection::OnResponseDataSent(const asio::error_code& Ec, std::siz ////////////////////////////////////////////////////////////////////////// -AsioTransportPlugin::AsioTransportPlugin() : m_ThreadCount(Max(std::thread::hardware_concurrency(), 8u)) +AsioTransportPlugin::AsioTransportPlugin() : m_ThreadCount(Max(GetHardwareConcurrency(), 8u)) { } diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 0d1e0d93d..727a25ba1 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -3452,9 +3452,8 @@ TEST_CASE_TEMPLATE("project.store.export", std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options); RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo(); - uint32_t NetworkWorkerCount = Max(std::thread::hardware_concurrency() / 4u, 2u); - uint32_t WorkerCount = - (NetworkWorkerCount < std::thread::hardware_concurrency()) ? 
Max(std::thread::hardware_concurrency() - NetworkWorkerCount, 4u) : 4u; + uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u); + uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u; WorkerThreadPool WorkerPool(WorkerCount); WorkerThreadPool NetworkPool(NetworkWorkerCount); diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index dd5bf05cb..9f87c208c 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -423,7 +423,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request) std::string RecordPath = UrlDecode(Params.GetValue("path")); - uint32_t ThreadCount = std::thread::hardware_concurrency(); + uint32_t ThreadCount = GetHardwareConcurrency(); if (auto Param = Params.GetValue("thread_count"); Param.empty() == false) { if (auto Value = ParseInt<uint64_t>(Param)) diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index 0cf5a9ca3..697d44214 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -817,6 +817,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_options()("malloc", "Configure memory allocator subsystem", cxxopts::value(ServerOptions.MemoryOptions)->default_value("mimalloc")); + options.add_options()("corelimit", "Limit concurrency", cxxopts::value(ServerOptions.CoreLimit)); // clang-format off options.add_options("logging") diff --git a/src/zenserver/config.h b/src/zenserver/config.h index 8380e72e7..3f7cb149a 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -207,6 +207,7 @@ struct ZenServerOptions bool ObjectStoreEnabled = false; bool NoConsoleOutput = false; // Control default use of stdout for diagnostics bool QuietConsole = false; // Configure console logger output to level WARN + int CoreLimit = 0; // If set, hardware concurrency queries are capped 
at this number std::string Loggers[zen::logging::level::LogLevelCount]; std::string ScrubOptions; #if ZEN_WITH_TRACE diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index a91c95ffb..b4d53ec5f 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -93,8 +93,13 @@ ZenEntryPoint::ZenEntryPoint(ZenServerOptions& ServerOptions) : m_ServerOptions( int ZenEntryPoint::Run() { - ZEN_INFO("ZenEntryPoint::Run()"); + // On Linux this has the unfortunate side effect of making `top` and other tools display + // `main` as the program name since threads and processes have a closer relationship + // there. So we don't name the main thread explicitly there. + +#ifndef ZEN_PLATFORM_LINUX zen::SetCurrentThreadName("main"); +#endif #if ZEN_USE_SENTRY SentryIntegration Sentry; @@ -444,6 +449,8 @@ main(int argc, char* argv[]) #endif } + LimitHardwareConcurrency(ServerOptions.CoreLimit); + std::string_view DeleteReason; if (ServerOptions.IsCleanStart) diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp index cab234165..975459e68 100644 --- a/src/zenserver/zenserver.cpp +++ b/src/zenserver/zenserver.cpp @@ -244,6 +244,8 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen m_StatusService.RegisterHandler("status", *this); + ZEN_INFO("Effective concurrency: {} (hw: {})", GetHardwareConcurrency(), std::thread::hardware_concurrency()); + // Initialize storage and services ZEN_INFO("initializing storage"); diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 0861baaf8..0fee18420 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -1688,7 +1688,7 @@ TEST_CASE("blockstore.iterate.chunks") .Size = DefaultIterateSmallChunkWindowSize * 2}; BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024}; - WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 4u)); + WorkerThreadPool WorkerPool(Max(GetHardwareConcurrency() - 1u, 
4u)); std::vector<BlockStoreLocation> Locations{FirstChunkLocation, SecondChunkLocation, @@ -1807,7 +1807,7 @@ TEST_CASE("blockstore.thread.read.write") std::vector<BlockStoreLocation> ChunkLocations; ChunkLocations.resize(ChunkCount); - WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 8u)); + WorkerThreadPool WorkerPool(Max(GetHardwareConcurrency() - 1u, 8u)); { std::atomic<size_t> WorkCompleted = 0; Latch L(1); diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp index da6acbde4..07e72ebb5 100644 --- a/src/zenstore/cache/structuredcachestore.cpp +++ b/src/zenstore/cache/structuredcachestore.cpp @@ -1582,7 +1582,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true)) CreateDirectories(TempDir.Path()); - WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency(), 8u)); + WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency(), 8u)); GcManager Gc; auto JobQueue = MakeJobQueue(1, "testqueue"); ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), {}); @@ -2257,7 +2257,7 @@ TEST_CASE("cachestore.newgc.basics") std::vector<IoHash> CacheRecords; std::vector<IoHash> UnstructuredCacheValues; - WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u)); + WorkerThreadPool WorkerPool(Max(GetHardwareConcurrency() - 1u, 2u)); const auto TearDrinkerBucket = "teardrinker"sv; { diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 5cc4dad54..bf843171e 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -1722,7 +1722,7 @@ TEST_CASE("compactcas.restart") { uint64_t ExpectedSize = 0; - WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency() - 1u, 2u), "put"); auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { Latch WorkLatch(1); @@ -1959,7 +1959,7 @@ 
TEST_CASE("compactcas.restart") TEST_CASE("compactcas.iteratechunks") { std::atomic<size_t> WorkCompleted = 0; - WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency() - 1u, 2u), "put"); const uint64_t kChunkSize = 1048 + 395; const size_t kChunkCount = 63840; @@ -2035,7 +2035,7 @@ TEST_CASE("compactcas.iteratechunks") WorkLatch.Wait(); } - WorkerThreadPool BatchWorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "fetch"); + WorkerThreadPool BatchWorkerPool(Max(GetHardwareConcurrency() - 1u, 2u), "fetch"); { std::vector<std::atomic<bool>> FetchedFlags(Hashes.size()); std::atomic<uint64_t> FetchedSize = 0; diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp index 4e7bd79a3..c2391588c 100644 --- a/src/zenstore/workspaces.cpp +++ b/src/zenstore/workspaces.cpp @@ -1356,7 +1356,7 @@ TEST_CASE("workspaces.scanfolder") { using namespace std::literals; - WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); + WorkerThreadPool WorkerPool(GetHardwareConcurrency()); ScopedTemporaryDirectory TempDir; std::filesystem::path RootPath = TempDir.Path(); @@ -1381,7 +1381,7 @@ TEST_CASE("workspace.share.paths") { using namespace std::literals; - WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); + WorkerThreadPool WorkerPool(GetHardwareConcurrency()); ScopedTemporaryDirectory TempDir; std::filesystem::path RootPath = TempDir.Path() / "workspace"; @@ -1409,7 +1409,7 @@ TEST_CASE("workspace.share.basic") { using namespace std::literals; - WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); + WorkerThreadPool WorkerPool(GetHardwareConcurrency()); ScopedTemporaryDirectory TempDir; std::filesystem::path RootPath = TempDir.Path() / "workspace"; @@ -1503,7 +1503,7 @@ TEST_CASE("workspace.share.alias") { using namespace std::literals; - WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); + WorkerThreadPool 
WorkerPool(GetHardwareConcurrency()); ScopedTemporaryDirectory TempDir; std::filesystem::path RootPath = TempDir.Path() / "workspace"; diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h index df2033bca..3c388dcbe 100644 --- a/src/zenutil/include/zenutil/workerpools.h +++ b/src/zenutil/include/zenutil/workerpools.h @@ -12,13 +12,13 @@ enum class EWorkloadType Background // Used for background jobs such as GC/Scrub/oplog import-export }; -// Worker pool with std::thread::hardware_concurrency() worker threads, but at least one thread +// Worker pool with GetHardwareConcurrency() worker threads, but at least one thread WorkerThreadPool& GetLargeWorkerPool(EWorkloadType WorkloadType); -// Worker pool with std::thread::hardware_concurrency() / 4 worker threads, but at least one thread +// Worker pool with GetHardwareConcurrency() / 4 worker threads, but at least one thread WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType); -// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread +// Worker pool with GetHardwareConcurrency() / 8 worker threads, but at least one thread WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType); // Worker pool with minimum number of worker threads, but at least one thread diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp index 55c91e68e..1bab39b2a 100644 --- a/src/zenutil/workerpools.cpp +++ b/src/zenutil/workerpools.cpp @@ -11,98 +11,119 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { namespace { - const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u)); - const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u)); - const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u)); - const int TinyWorkerThreadPoolTreadCount = 1; - static bool 
IsShutDown = false; + struct WorkerPools + { + const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(GetHardwareConcurrency() - 1u, 2u)); + const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((GetHardwareConcurrency() / 4u), 2u)); + const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((GetHardwareConcurrency() / 8u), 1u)); + const int TinyWorkerThreadPoolTreadCount = 1; - RwLock PoolLock; + bool IsShutDown = false; - struct WorkerPool - { - std::unique_ptr<WorkerThreadPool> Pool; - const int TreadCount; - const std::string_view Name; - }; + RwLock PoolLock; - WorkerPool BurstLargeWorkerPool = {.TreadCount = LargeWorkerThreadPoolTreadCount, .Name = "large"}; - WorkerPool BackgroundLargeWorkerPool = {.TreadCount = LargeWorkerThreadPoolTreadCount, .Name = "large_bg"}; + struct WorkerPool + { + std::unique_ptr<WorkerThreadPool> Pool; + const int TreadCount; + const std::string_view Name; + }; - WorkerPool BurstMediumWorkerPool = {.TreadCount = MediumWorkerThreadPoolTreadCount, .Name = "medium"}; - WorkerPool BackgroundMediumWorkerPool = {.TreadCount = MediumWorkerThreadPoolTreadCount, .Name = "medium_bg"}; + WorkerPool BurstLargeWorkerPool = {.TreadCount = LargeWorkerThreadPoolTreadCount, .Name = "large"}; + WorkerPool BackgroundLargeWorkerPool = {.TreadCount = LargeWorkerThreadPoolTreadCount, .Name = "large_bg"}; - WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "small"}; - WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "small_bg"}; + WorkerPool BurstMediumWorkerPool = {.TreadCount = MediumWorkerThreadPoolTreadCount, .Name = "medium"}; + WorkerPool BackgroundMediumWorkerPool = {.TreadCount = MediumWorkerThreadPoolTreadCount, .Name = "medium_bg"}; - WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "tiny"}; - WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = 
"tiny_bg"}; + WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "small"}; + WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "small_bg"}; - WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "synctp"}; + WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "tiny"}; + WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "tiny_bg"}; - WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool) - { + WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "synctp"}; + + WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool) { - RwLock::SharedLockScope _(PoolLock); - if (Pool.Pool) { - return *Pool.Pool; + RwLock::SharedLockScope _(PoolLock); + if (Pool.Pool) + { + return *Pool.Pool; + } } + RwLock::ExclusiveLockScope _(PoolLock); + ZEN_ASSERT(!IsShutDown); + if (!Pool.Pool) + { + Pool.Pool.reset(new WorkerThreadPool(Pool.TreadCount, Pool.Name)); + } + return *Pool.Pool; } - RwLock::ExclusiveLockScope _(PoolLock); - ZEN_ASSERT(!IsShutDown); - if (!Pool.Pool) + + void Shutdown() { - Pool.Pool.reset(new WorkerThreadPool(Pool.TreadCount, Pool.Name)); + RwLock::ExclusiveLockScope _(PoolLock); + IsShutDown = true; + BurstLargeWorkerPool.Pool.reset(); + BackgroundLargeWorkerPool.Pool.reset(); + BurstMediumWorkerPool.Pool.reset(); + BackgroundMediumWorkerPool.Pool.reset(); + BurstSmallWorkerPool.Pool.reset(); + BackgroundSmallWorkerPool.Pool.reset(); + BurstTinyWorkerPool.Pool.reset(); + BackgroundTinyWorkerPool.Pool.reset(); + SyncWorkerPool.Pool.reset(); } - return *Pool.Pool; + }; + + WorkerPools& Pools() + { + static WorkerPools _; + return _; } + } // namespace WorkerThreadPool& GetLargeWorkerPool(EWorkloadType WorkloadType) { - return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstLargeWorkerPool : BackgroundLargeWorkerPool); + auto& p = Pools(); + return p.EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? 
p.BurstLargeWorkerPool : p.BackgroundLargeWorkerPool); } WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType) { - return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstMediumWorkerPool : BackgroundMediumWorkerPool); + auto& p = Pools(); + return p.EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? p.BurstMediumWorkerPool : p.BackgroundMediumWorkerPool); } WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType) { - return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstSmallWorkerPool : BackgroundSmallWorkerPool); + auto& p = Pools(); + return p.EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? p.BurstSmallWorkerPool : p.BackgroundSmallWorkerPool); } WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType) { - return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstTinyWorkerPool : BackgroundTinyWorkerPool); + auto& p = Pools(); + return p.EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? p.BurstTinyWorkerPool : p.BackgroundTinyWorkerPool); } WorkerThreadPool& GetSyncWorkerPool() { - return EnsurePoolPtr(SyncWorkerPool); + auto& p = Pools(); + return p.EnsurePoolPtr(p.SyncWorkerPool); } void ShutdownWorkerPools() { - RwLock::ExclusiveLockScope _(PoolLock); - IsShutDown = true; - BurstLargeWorkerPool.Pool.reset(); - BackgroundLargeWorkerPool.Pool.reset(); - BurstMediumWorkerPool.Pool.reset(); - BackgroundMediumWorkerPool.Pool.reset(); - BurstSmallWorkerPool.Pool.reset(); - BackgroundSmallWorkerPool.Pool.reset(); - BurstTinyWorkerPool.Pool.reset(); - BackgroundTinyWorkerPool.Pool.reset(); - SyncWorkerPool.Pool.reset(); + Pools().Shutdown(); } } // namespace zen |