diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-27 16:08:47 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-27 16:08:47 +0100 |
| commit | 013ac818cd09c1d31bf9411e00b2bbbf02defa3f (patch) | |
| tree | cdc94b8fe80f0c5db20f0417d76e5351fe480f4a /src/zenutil | |
| parent | Merge pull request #317 from ue-foundation/zs/ui-show-cook-artifacts (diff) | |
| download | zen-013ac818cd09c1d31bf9411e00b2bbbf02defa3f.tar.xz zen-013ac818cd09c1d31bf9411e00b2bbbf02defa3f.zip | |
build cache prime (#327)
- Feature: zen `--boost-workers` option to builds `upload`, `download` and `validate-part` that will increase the number of worker threads, may cause computer to be less responsive
- Feature: zen `--cache-prime-only` that uploads referenced data from a part to `--zen-cache-host` if it is not already present. Target folder will be untouched.
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 59 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 7 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/workerpools.h | 3 | ||||
| -rw-r--r-- | src/zenutil/workerpools.cpp | 14 |
4 files changed, 74 insertions, 9 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index c95215889..f273ac699 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -11,6 +11,7 @@ #include <zencore/workthreadpool.h> #include <zenhttp/httpclient.h> #include <zenhttp/packageformat.h> +#include <zenutil/workerpools.h> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_set.h> @@ -27,13 +28,16 @@ public: BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, - const std::filesystem::path& TempFolderPath) + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) : m_HttpClient(HttpClient) , m_Stats(Stats) , m_Namespace(Namespace.empty() ? "none" : Namespace) , m_Bucket(Bucket.empty() ? "none" : Bucket) , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) - , m_BackgroundWorkPool(1) + , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) + , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) + : GetTinyWorkerPool(EWorkloadType::Background)) , m_PendingBackgroundWorkCount(1) , m_CancelBackgroundWork(false) { @@ -44,8 +48,11 @@ public: try { m_CancelBackgroundWork.store(true); - m_PendingBackgroundWorkCount.CountDown(); - m_PendingBackgroundWorkCount.Wait(); + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + m_PendingBackgroundWorkCount.Wait(); + } } catch (const std::exception& Ex) { @@ -86,6 +93,7 @@ public: ZenContentType ContentType, const CompositeBuffer& Payload) override { + ZEN_ASSERT(!IsFlushed); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); ScheduleBackgroundWork( [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { @@ -132,6 +140,7 @@ public: virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override { + ZEN_ASSERT(!IsFlushed); ScheduleBackgroundWork([this, BuildId = Oid(BuildId), BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()), @@ -329,6 +338,39 @@ public: return {}; } + virtual void Flush(int32_t UpdateInteralMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override + { + if (IsFlushed) + { + return; + } + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + IsFlushed = true; + } + if (m_PendingBackgroundWorkCount.Wait(100)) + { + return; + } + while (true) + { + intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); + if (UpdateCallback(Remaining)) + { + if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS)) + { + UpdateCallback(0); + return; + } + } + else + { + m_CancelBackgroundWork.store(true); + } + } + } + private: void AddStatistic(const HttpClient::Response& Result) { @@ -343,8 +385,10 @@ private: const std::string m_Namespace; const std::string m_Bucket; const std::filesystem::path m_TempFolderPath; + const bool m_BoostBackgroundThreadCount; + bool IsFlushed = false; - WorkerThreadPool m_BackgroundWorkPool; + WorkerThreadPool& m_BackgroundWorkPool; Latch m_PendingBackgroundWorkCount; std::atomic<bool> m_CancelBackgroundWork; }; @@ -354,9 +398,10 @@ CreateZenBuildStorageCache(HttpClient& HttpClient, BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, - const std::filesystem::path& TempFolderPath) + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) { - return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath); + return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); } } // namespace zen diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h index 08c936bf5..cab35328d 100644 --- a/src/zenutil/include/zenutil/buildstoragecache.h +++ b/src/zenutil/include/zenutil/buildstoragecache.h @@ -42,11 +42,16 @@ public: }; virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; + + virtual void Flush( + int32_t UpdateInteralMS, + std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0; }; std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient, BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, - const std::filesystem::path& TempFolderPath); + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount); } // namespace zen diff --git a/src/zenutil/include/zenutil/workerpools.h b/src/zenutil/include/zenutil/workerpools.h index 9683ad720..df2033bca 100644 --- a/src/zenutil/include/zenutil/workerpools.h +++ b/src/zenutil/include/zenutil/workerpools.h @@ -21,6 +21,9 @@ WorkerThreadPool& GetMediumWorkerPool(EWorkloadType WorkloadType); // Worker pool with std::thread::hardware_concurrency() / 8 worker threads, but at least one thread WorkerThreadPool& GetSmallWorkerPool(EWorkloadType WorkloadType); +// Worker pool with minimum number of worker threads, but at least one thread +WorkerThreadPool& GetTinyWorkerPool(EWorkloadType WorkloadType); + // Special worker pool that does not use worker thread but issues all scheduled work on the calling thread // This is useful for debugging when multiple async thread can make stepping in debugger complicated WorkerThreadPool& GetSyncWorkerPool(); diff --git a/src/zenutil/workerpools.cpp b/src/zenutil/workerpools.cpp index e3165e838..797034978 100644 --- a/src/zenutil/workerpools.cpp +++ b/src/zenutil/workerpools.cpp @@ -11,9 +11,10 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace zen { namespace { - const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(std::thread::hardware_concurrency()); + const int LargeWorkerThreadPoolTreadCount = gsl::narrow<int>(Max(std::thread::hardware_concurrency() - 1u, 2u)); const int MediumWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 4u), 2u)); const int SmallWorkerThreadPoolTreadCount = gsl::narrow<int>(Max((std::thread::hardware_concurrency() / 8u), 1u)); + const int TinyWorkerThreadPoolTreadCount = 1; static bool IsShutDown = false; @@ -35,6 +36,9 @@ namespace { WorkerPool BurstSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(burst)"}; WorkerPool BackgroundSmallWorkerPool = {.TreadCount = SmallWorkerThreadPoolTreadCount, .Name = "SmallThreadPool(bkg)"}; + WorkerPool BurstTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(burst)"}; + WorkerPool BackgroundTinyWorkerPool = {.TreadCount = TinyWorkerThreadPoolTreadCount, .Name = "TinyThreadPool(bkg)"}; + WorkerPool SyncWorkerPool = {.TreadCount = 0, .Name = "SyncThreadPool"}; WorkerThreadPool& EnsurePoolPtr(WorkerPool& Pool) @@ -75,6 +79,12 @@ GetSmallWorkerPool(EWorkloadType WorkloadType) } WorkerThreadPool& +GetTinyWorkerPool(EWorkloadType WorkloadType) +{ + return EnsurePoolPtr(WorkloadType == EWorkloadType::Burst ? BurstTinyWorkerPool : BackgroundTinyWorkerPool); +} + +WorkerThreadPool& GetSyncWorkerPool() { return EnsurePoolPtr(SyncWorkerPool); @@ -91,6 +101,8 @@ ShutdownWorkerPools() BackgroundMediumWorkerPool.Pool.reset(); BurstSmallWorkerPool.Pool.reset(); BackgroundSmallWorkerPool.Pool.reset(); + BurstTinyWorkerPool.Pool.reset(); + BackgroundTinyWorkerPool.Pool.reset(); SyncWorkerPool.Pool.reset(); } } // namespace zen |