aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-03-27 16:08:47 +0100
committerGitHub Enterprise <[email protected]>2025-03-27 16:08:47 +0100
commit013ac818cd09c1d31bf9411e00b2bbbf02defa3f (patch)
treecdc94b8fe80f0c5db20f0417d76e5351fe480f4a /src/zenutil
parentMerge pull request #317 from ue-foundation/zs/ui-show-cook-artifacts (diff)
downloadzen-013ac818cd09c1d31bf9411e00b2bbbf02defa3f.tar.xz
zen-013ac818cd09c1d31bf9411e00b2bbbf02defa3f.zip
build cache prime (#327)
- Feature: zen `--boost-workers` option to builds `upload`, `download` and `validate-part` that will increase the number of worker threads, may cause computer to be less responsive - Feature: zen `--cache-prime-only` that uploads referenced data from a part to `--zen-cache-host` if it is not already present. Target folder will be untouched.
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/buildstoragecache.cpp59
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h7
-rw-r--r--src/zenutil/include/zenutil/workerpools.h3
-rw-r--r--src/zenutil/workerpools.cpp14
4 files changed, 74 insertions, 9 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
index c95215889..f273ac699 100644
--- a/src/zenutil/buildstoragecache.cpp
+++ b/src/zenutil/buildstoragecache.cpp
@@ -11,6 +11,7 @@
#include <zencore/workthreadpool.h>
#include <zenhttp/httpclient.h>
#include <zenhttp/packageformat.h>
+#include <zenutil/workerpools.h>
ZEN_THIRD_PARTY_INCLUDES_START
#include <tsl/robin_set.h>
@@ -27,13 +28,16 @@ public:
BuildStorageCache::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
- const std::filesystem::path& TempFolderPath)
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
: m_HttpClient(HttpClient)
, m_Stats(Stats)
, m_Namespace(Namespace.empty() ? "none" : Namespace)
, m_Bucket(Bucket.empty() ? "none" : Bucket)
, m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred())
- , m_BackgroundWorkPool(1)
+ , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount)
+ , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background)
+ : GetTinyWorkerPool(EWorkloadType::Background))
, m_PendingBackgroundWorkCount(1)
, m_CancelBackgroundWork(false)
{
@@ -44,8 +48,11 @@ public:
try
{
m_CancelBackgroundWork.store(true);
- m_PendingBackgroundWorkCount.CountDown();
- m_PendingBackgroundWorkCount.Wait();
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ m_PendingBackgroundWorkCount.Wait();
+ }
}
catch (const std::exception& Ex)
{
@@ -86,6 +93,7 @@ public:
ZenContentType ContentType,
const CompositeBuffer& Payload) override
{
+ ZEN_ASSERT(!IsFlushed);
ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
ScheduleBackgroundWork(
[this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() {
@@ -132,6 +140,7 @@ public:
virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override
{
+ ZEN_ASSERT(!IsFlushed);
ScheduleBackgroundWork([this,
BuildId = Oid(BuildId),
BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()),
@@ -329,6 +338,39 @@ public:
return {};
}
+ virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
+ {
+ if (IsFlushed)
+ {
+ return;
+ }
+ if (!IsFlushed)
+ {
+ m_PendingBackgroundWorkCount.CountDown();
+ IsFlushed = true;
+ }
+ if (m_PendingBackgroundWorkCount.Wait(100))
+ {
+ return;
+ }
+ while (true)
+ {
+ intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining();
+ if (UpdateCallback(Remaining))
+ {
+ if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS))
+ {
+ UpdateCallback(0);
+ return;
+ }
+ }
+ else
+ {
+ m_CancelBackgroundWork.store(true);
+ }
+ }
+ }
+
private:
void AddStatistic(const HttpClient::Response& Result)
{
@@ -343,8 +385,10 @@ private:
const std::string m_Namespace;
const std::string m_Bucket;
const std::filesystem::path m_TempFolderPath;
+ const bool m_BoostBackgroundThreadCount;
+ bool IsFlushed = false;
- WorkerThreadPool m_BackgroundWorkPool;
+ WorkerThreadPool& m_BackgroundWorkPool;
Latch m_PendingBackgroundWorkCount;
std::atomic<bool> m_CancelBackgroundWork;
};
@@ -354,9 +398,10 @@ CreateZenBuildStorageCache(HttpClient& HttpClient,
BuildStorageCache::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
- const std::filesystem::path& TempFolderPath)
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount)
{
- return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath);
+ return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount);
}
} // namespace zen
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
index 08c936bf5..cab35328d 100644
--- a/src/zenutil/include/zenutil/buildstoragecache.h
+++ b/src/zenutil/include/zenutil/buildstoragecache.h
@@ -42,11 +42,16 @@ public:
};
virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
+
+ virtual void Flush(
+ int32_t UpdateInteralMS,
+ std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0;
};
std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient,
BuildStorageCache::Statistics& Stats,
std::string_view Namespace,
std::string_view Bucket,
- const std::filesystem::path& TempFolderPath);
+ const std::filesystem::path& TempFolderPath,
+ bool BoostBackgroundThreadCount);
} // namespace zen
diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h
index 9683ad720..df2033bca 100644
--- a/src/zenutil/include/zenutil/workerpools.h
+++ b/src/zenutil/include/zenutil/workerpools.h
@@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType);
// Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread
WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType);
+// Worker pool with minimum number of worker threads, but at least one thread
+WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType);
+
// Special worker pool that does not use worker thread but issues all scheduled work on the calling thread
// This is useful for debugging when multiple async thread can make stepping in debugger complicated
WorkerThreadPool& GetSyncWorkerPool();
diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp
index e3165e838..797034978 100644
--- a/src/zenutil/workerpools.cpp
+++ b/src/zenutil/workerpools.cpp
@@ -11,9 +11,10 @@ ZEN_THIRD_PARTY_INCLUDES_END
namespace zen {
namespace {
- const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency());
+ const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u));
const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u));
const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u));
+ const int TinyWorkerThreadPoolTreadCount = 1;
static bool IsShutDown = false;
@@ -35,6 +36,9 @@ namespace {
WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"};
WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"};
+ WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(burst)"};
+ WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(bkg)"};
+
WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"};
WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool)
@@ -75,6 +79,12 @@ GetSmallWorkerPool(EWorkloadType WorkloadType)
}
WorkerThreadPool&
+GetTinyWorkerPool(EWorkloadType WorkloadType)
+{
+ return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstTinyWorkerPool : BackgroundTinyWorkerPool);
+}
+
+WorkerThreadPool&
GetSyncWorkerPool()
{
return EnsurePoolPtr(SyncWorkerPool);
@@ -91,6 +101,8 @@ ShutdownWorkerPools()
BackgroundMediumWorkerPool.Pool.reset();
BurstSmallWorkerPool.Pool.reset();
BackgroundSmallWorkerPool.Pool.reset();
+ BurstTinyWorkerPool.Pool.reset();
+ BackgroundTinyWorkerPool.Pool.reset();
SyncWorkerPool.Pool.reset();
}
} // namespace zen