aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/cache_cmd.cpp2
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp2
-rw-r--r--src/zen/cmds/rpcreplay_cmd.cpp2
-rw-r--r--src/zencore/include/zencore/thread.h6
-rw-r--r--src/zencore/thread.cpp21
-rw-r--r--src/zenhttp/servers/httpasio.cpp2
-rw-r--r--src/zenhttp/servers/httpsys.cpp2
-rw-r--r--src/zenhttp/transports/asiotransport.cpp2
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp5
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp2
-rw-r--r--src/zenserver/config.cpp1
-rw-r--r--src/zenserver/config.h1
-rw-r--r--src/zenserver/main.cpp2
-rw-r--r--src/zenstore/blockstore.cpp4
-rw-r--r--src/zenstore/cache/structuredcachestore.cpp4
-rw-r--r--src/zenstore/compactcas.cpp6
-rw-r--r--src/zenstore/workspaces.cpp8
-rw-r--r--src/zenutil/include/zenutil/workerpools.h6
-rw-r--r--src/zenutil/workerpools.cpp6
19 files changed, 55 insertions, 29 deletions
diff --git a/src/zen/cmds/cache_cmd.cpp b/src/zen/cmds/cache_cmd.cpp
index 85dcd7648..2679df10a 100644
--- a/src/zen/cmds/cache_cmd.cpp
+++ b/src/zen/cmds/cache_cmd.cpp
@@ -538,7 +538,7 @@ CacheGenerateCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
return Package;
});
- WorkerThreadPool WorkerPool(gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 2u), 2u)));
+ WorkerThreadPool WorkerPool(gsl::narrow<int>(Max((GetHardwareConcurrency() / 2u), 2u)));
Latch WorkLatch(1);
std::uniform_int_distribution<uint32_t> SizeCountDistribution(m_MaxAttachmentCount > 0 ? 0 : 1,
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 6f9719909..3f82bf982 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -1950,7 +1950,7 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
std::atomic_int64_t FileCount = 0;
int OplogEntryCount = 0;
- size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
+ size_t WorkerCount = Min(GetHardwareConcurrency(), 16u);
WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
Latch WorkRemaining(1);
size_t EmitCount = 0;
diff --git a/src/zen/cmds/rpcreplay_cmd.cpp b/src/zen/cmds/rpcreplay_cmd.cpp
index dbf15ddd4..70e9e5300 100644
--- a/src/zen/cmds/rpcreplay_cmd.cpp
+++ b/src/zen/cmds/rpcreplay_cmd.cpp
@@ -123,7 +123,7 @@ RpcReplayCommand::RpcReplayCommand()
"w",
"numthreads",
"Number of worker threads per process",
- cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", std::thread::hardware_concurrency())),
+ cxxopts::value(m_ThreadCount)->default_value(fmt::format("{}", GetHardwareConcurrency())),
"<count>");
m_Options.add_option("", "", "onhost", "Replay on host, bypassing http/network layer", cxxopts::value(m_OnHost), "<onhost>");
m_Options.add_option("",
diff --git a/src/zencore/include/zencore/thread.h b/src/zencore/include/zencore/thread.h
index d9fb5c023..11ef01a35 100644
--- a/src/zencore/include/zencore/thread.h
+++ b/src/zencore/include/zencore/thread.h
@@ -233,8 +233,10 @@ SetAtomicMax(std::atomic_uint64_t& Max, uint64_t Value)
}
}
-ZENCORE_API int GetCurrentThreadId();
-ZENCORE_API void Sleep(int ms);
+void LimitHardwareConcurrency(unsigned int);
+unsigned int GetHardwareConcurrency();
+int GetCurrentThreadId();
+void Sleep(int ms);
void thread_forcelink(); // internal
diff --git a/src/zencore/thread.cpp b/src/zencore/thread.cpp
index abf282467..7bd21a229 100644
--- a/src/zencore/thread.cpp
+++ b/src/zencore/thread.cpp
@@ -45,6 +45,27 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
+static unsigned int LimitConcurrency = 0;
+
+void
+LimitHardwareConcurrency(unsigned int Limit)
+{
+ LimitConcurrency = Limit;
+}
+
+unsigned int
+GetHardwareConcurrency()
+{
+ static unsigned int SysLimit = std::thread::hardware_concurrency();
+
+ if (LimitConcurrency)
+ {
+ return Min(SysLimit, LimitConcurrency);
+ }
+
+ return SysLimit;
+}
+
#if ZEN_PLATFORM_WINDOWS
// The information on how to set the thread name comes from
// a MSDN article: http://msdn2.microsoft.com/en-us/library/xcb2z8hs.aspx
diff --git a/src/zenhttp/servers/httpasio.cpp b/src/zenhttp/servers/httpasio.cpp
index 2023b6d98..492e867fb 100644
--- a/src/zenhttp/servers/httpasio.cpp
+++ b/src/zenhttp/servers/httpasio.cpp
@@ -1122,7 +1122,7 @@ private:
HttpAsioServer::HttpAsioServer(bool ForceLoopback, unsigned int ThreadCount)
: m_ForceLoopback(ForceLoopback)
-, m_ThreadCount(ThreadCount != 0 ? ThreadCount : Max(std::thread::hardware_concurrency(), 8u))
+, m_ThreadCount(ThreadCount != 0 ? ThreadCount : Max(GetHardwareConcurrency(), 8u))
, m_Impl(std::make_unique<asio_http::HttpAsioServerImpl>())
{
ZEN_DEBUG("Request object size: {} ({:#x})", sizeof(HttpRequestParser), sizeof(HttpRequestParser));
diff --git a/src/zenhttp/servers/httpsys.cpp b/src/zenhttp/servers/httpsys.cpp
index 95d83911d..c83675f2c 100644
--- a/src/zenhttp/servers/httpsys.cpp
+++ b/src/zenhttp/servers/httpsys.cpp
@@ -930,7 +930,7 @@ HttpSysServer::HttpSysServer(const HttpSysConfig& InConfig)
if (m_InitialConfig.ThreadCount == 0)
{
- MinThreadCount = Max(8u, std::thread::hardware_concurrency());
+ MinThreadCount = Max(8u, GetHardwareConcurrency());
}
else
{
diff --git a/src/zenhttp/transports/asiotransport.cpp b/src/zenhttp/transports/asiotransport.cpp
index 96a15518c..23ac1bc8b 100644
--- a/src/zenhttp/transports/asiotransport.cpp
+++ b/src/zenhttp/transports/asiotransport.cpp
@@ -352,7 +352,7 @@ AsioTransportConnection::OnResponseDataSent(const asio::error_code& Ec, std::siz
//////////////////////////////////////////////////////////////////////////
-AsioTransportPlugin::AsioTransportPlugin() : m_ThreadCount(Max(std::thread::hardware_concurrency(), 8u))
+AsioTransportPlugin::AsioTransportPlugin() : m_ThreadCount(Max(GetHardwareConcurrency(), 8u))
{
}
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 0d1e0d93d..727a25ba1 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -3452,9 +3452,8 @@ TEST_CASE_TEMPLATE("project.store.export",
std::shared_ptr<RemoteProjectStore> RemoteStore = CreateFileRemoteStore(Options);
RemoteProjectStore::RemoteStoreInfo StoreInfo = RemoteStore->GetInfo();
- uint32_t NetworkWorkerCount = Max(std::thread::hardware_concurrency() / 4u, 2u);
- uint32_t WorkerCount =
- (NetworkWorkerCount < std::thread::hardware_concurrency()) ? Max(std::thread::hardware_concurrency() - NetworkWorkerCount, 4u) : 4u;
+ uint32_t NetworkWorkerCount = Max(GetHardwareConcurrency() / 4u, 2u);
+ uint32_t WorkerCount = (NetworkWorkerCount < GetHardwareConcurrency()) ? Max(GetHardwareConcurrency() - NetworkWorkerCount, 4u) : 4u;
WorkerThreadPool WorkerPool(WorkerCount);
WorkerThreadPool NetworkPool(NetworkWorkerCount);
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index dd5bf05cb..9f87c208c 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -423,7 +423,7 @@ HttpStructuredCacheService::HandleRequest(HttpServerRequest& Request)
std::string RecordPath = UrlDecode(Params.GetValue("path"));
- uint32_t ThreadCount = std::thread::hardware_concurrency();
+ uint32_t ThreadCount = GetHardwareConcurrency();
if (auto Param = Params.GetValue("thread_count"); Param.empty() == false)
{
if (auto Value = ParseInt<uint64_t>(Param))
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 0cf5a9ca3..697d44214 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -817,6 +817,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
options.add_options()("malloc",
"Configure memory allocator subsystem",
cxxopts::value(ServerOptions.MemoryOptions)->default_value("mimalloc"));
+ options.add_options()("corelimit", "Limit concurrency", cxxopts::value(ServerOptions.CoreLimit));
// clang-format off
options.add_options("logging")
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index 8380e72e7..3f7cb149a 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -207,6 +207,7 @@ struct ZenServerOptions
bool ObjectStoreEnabled = false;
bool NoConsoleOutput = false; // Control default use of stdout for diagnostics
bool QuietConsole = false; // Configure console logger output to level WARN
+ int CoreLimit = 0; // If set, hardware concurrency queries are capped at this number
std::string Loggers[zen::logging::level::LogLevelCount];
std::string ScrubOptions;
#if ZEN_WITH_TRACE
diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp
index a91c95ffb..bd4f994b8 100644
--- a/src/zenserver/main.cpp
+++ b/src/zenserver/main.cpp
@@ -444,6 +444,8 @@ main(int argc, char* argv[])
#endif
}
+ LimitHardwareConcurrency(ServerOptions.CoreLimit);
+
std::string_view DeleteReason;
if (ServerOptions.IsCleanStart)
diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp
index 0861baaf8..0fee18420 100644
--- a/src/zenstore/blockstore.cpp
+++ b/src/zenstore/blockstore.cpp
@@ -1688,7 +1688,7 @@ TEST_CASE("blockstore.iterate.chunks")
.Size = DefaultIterateSmallChunkWindowSize * 2};
BlockStoreLocation BadBlockIndex = {.BlockIndex = 0xfffff, .Offset = 1024, .Size = 1024};
- WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 4u));
+ WorkerThreadPool WorkerPool(Max(GetHardwareConcurrency() - 1u, 4u));
std::vector<BlockStoreLocation> Locations{FirstChunkLocation,
SecondChunkLocation,
@@ -1807,7 +1807,7 @@ TEST_CASE("blockstore.thread.read.write")
std::vector<BlockStoreLocation> ChunkLocations;
ChunkLocations.resize(ChunkCount);
- WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 8u));
+ WorkerThreadPool WorkerPool(Max(GetHardwareConcurrency() - 1u, 8u));
{
std::atomic<size_t> WorkCompleted = 0;
Latch L(1);
diff --git a/src/zenstore/cache/structuredcachestore.cpp b/src/zenstore/cache/structuredcachestore.cpp
index da6acbde4..07e72ebb5 100644
--- a/src/zenstore/cache/structuredcachestore.cpp
+++ b/src/zenstore/cache/structuredcachestore.cpp
@@ -1582,7 +1582,7 @@ TEST_CASE("cachestore.threadedinsert") // * doctest::skip(true))
CreateDirectories(TempDir.Path());
- WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency(), 8u));
+ WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency(), 8u));
GcManager Gc;
auto JobQueue = MakeJobQueue(1, "testqueue");
ZenCacheNamespace Zcs(Gc, *JobQueue, TempDir.Path(), {});
@@ -2257,7 +2257,7 @@ TEST_CASE("cachestore.newgc.basics")
std::vector<IoHash> CacheRecords;
std::vector<IoHash> UnstructuredCacheValues;
- WorkerThreadPool WorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u));
+ WorkerThreadPool WorkerPool(Max(GetHardwareConcurrency() - 1u, 2u));
const auto TearDrinkerBucket = "teardrinker"sv;
{
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 5cc4dad54..bf843171e 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -1722,7 +1722,7 @@ TEST_CASE("compactcas.restart")
{
uint64_t ExpectedSize = 0;
- WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+ WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency() - 1u, 2u), "put");
auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) {
Latch WorkLatch(1);
@@ -1959,7 +1959,7 @@ TEST_CASE("compactcas.restart")
TEST_CASE("compactcas.iteratechunks")
{
std::atomic<size_t> WorkCompleted = 0;
- WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put");
+ WorkerThreadPool ThreadPool(Max(GetHardwareConcurrency() - 1u, 2u), "put");
const uint64_t kChunkSize = 1048 + 395;
const size_t kChunkCount = 63840;
@@ -2035,7 +2035,7 @@ TEST_CASE("compactcas.iteratechunks")
WorkLatch.Wait();
}
- WorkerThreadPool BatchWorkerPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "fetch");
+ WorkerThreadPool BatchWorkerPool(Max(GetHardwareConcurrency() - 1u, 2u), "fetch");
{
std::vector<std::atomic<bool>> FetchedFlags(Hashes.size());
std::atomic<uint64_t> FetchedSize = 0;
diff --git a/src/zenstore/workspaces.cpp b/src/zenstore/workspaces.cpp
index 4e7bd79a3..c2391588c 100644
--- a/src/zenstore/workspaces.cpp
+++ b/src/zenstore/workspaces.cpp
@@ -1356,7 +1356,7 @@ TEST_CASE("workspaces.scanfolder")
{
using namespace std::literals;
- WorkerThreadPool WorkerPool(std::thread::hardware_concurrency());
+ WorkerThreadPool WorkerPool(GetHardwareConcurrency());
ScopedTemporaryDirectory TempDir;
std::filesystem::path RootPath = TempDir.Path();
@@ -1381,7 +1381,7 @@ TEST_CASE("workspace.share.paths")
{
using namespace std::literals;
- WorkerThreadPool WorkerPool(std::thread::hardware_concurrency());
+ WorkerThreadPool WorkerPool(GetHardwareConcurrency());
ScopedTemporaryDirectory TempDir;
std::filesystem::path RootPath = TempDir.Path() / "workspace";
@@ -1409,7 +1409,7 @@ TEST_CASE("workspace.share.basic")
{
using namespace std::literals;
- WorkerThreadPool WorkerPool(std::thread::hardware_concurrency());
+ WorkerThreadPool WorkerPool(GetHardwareConcurrency());
ScopedTemporaryDirectory TempDir;
std::filesystem::path RootPath = TempDir.Path() / "workspace";
@@ -1503,7 +1503,7 @@ TEST_CASE("workspace.share.alias")
{
using namespace std::literals;
- WorkerThreadPool WorkerPool(std::thread::hardware_concurrency());
+ WorkerThreadPool WorkerPool(GetHardwareConcurrency());
ScopedTemporaryDirectory TempDir;
std::filesystem::path RootPath = TempDir.Path() / "workspace";
diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h
index df2033bca..3c388dcbe 100644
--- a/src/zenutil/include/zenutil/workerpools.h
+++ b/src/zenutil/include/zenutil/workerpools.h
@@ -12,13 +12,13 @@ enum class EWorkloadType
Background // Used for background jobs such as GC/Scrub/oplog import-export
};
-// Worker pool with std::thread::hardware_concurrency() worker threads, but at least one thread
+// Worker pool with GetHardwareConcurrency() worker threads, but at least one thread
WorkerThreadPool& GetLargeWorkerPool(EWorkloadType WorkloadType);
-// Worker pool with std::thread::hardware_concurrency() / 4 worker threads, but at least one thread
+// Worker pool with GetHardwareConcurrency() / 4 worker threads, but at least one thread
WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType);
-// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread
+// Worker pool with GetHardwareConcurrency() / 8 worker threads, but at least one thread
WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType);
// Worker pool with minimum number of worker threads, but at least one thread
diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp
index 55c91e68e..cd6421e0b 100644
--- a/src/zenutil/workerpools.cpp
+++ b/src/zenutil/workerpools.cpp
@@ -11,9 +11,9 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
namespace {
- const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u));
- const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u));
- const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u));
+ const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(GetHardwareConcurrency() - 1u, 2u));
+ const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((GetHardwareConcurrency() / 4u), 2u));
+ const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((GetHardwareConcurrency() / 8u), 1u));
const int TinyWorkerThreadPoolTreadCount = 1;
static bool IsShutDown = false;