From 28bc5ebf05984385cc0567c89b1d8e7a541ebef8 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Wed, 26 Mar 2025 17:06:23 +0100 Subject: zen build cache service (#318) - **EXPERIMENTAL** `zen builds` - Feature: `--zen-cache-host` option for `upload` and `download` operations to use a zenserver host `/builds` endpoint for storing build blob and blob metadata - Feature: New `/builds` endpoint for caching build blobs and blob metadata - `/builds/{namespace}/{bucket}/{buildid}/blobs/{hash}` `GET` and `PUT` method for storing and fetching blobs - `/builds/{namespace}/{bucket}/{buildid}/blobs/putBlobMetadata` `POST` method for storing metadata about blobs - `/builds/{namespace}/{bucket}/{buildid}/blobs/getBlobMetadata` `POST` method for fetching metadata about blobs - `/builds/{namespace}/{bucket}/{buildid}/blobs/exists` `POST` method for checking existance of blobs --- src/zenutil/buildstoragecache.cpp | 362 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 362 insertions(+) create mode 100644 src/zenutil/buildstoragecache.cpp (limited to 'src/zenutil/buildstoragecache.cpp') diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp new file mode 100644 index 000000000..c95215889 --- /dev/null +++ b/src/zenutil/buildstoragecache.cpp @@ -0,0 +1,362 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +ZEN_THIRD_PARTY_INCLUDES_START +#include +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class ZenBuildStorageCache : public BuildStorageCache +{ +public: + explicit ZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath) + : m_HttpClient(HttpClient) + , m_Stats(Stats) + , m_Namespace(Namespace.empty() ? "none" : Namespace) + , m_Bucket(Bucket.empty() ? "none" : Bucket) + , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) + , m_BackgroundWorkPool(1) + , m_PendingBackgroundWorkCount(1) + , m_CancelBackgroundWork(false) + { + } + + virtual ~ZenBuildStorageCache() + { + try + { + m_CancelBackgroundWork.store(true); + m_PendingBackgroundWorkCount.CountDown(); + m_PendingBackgroundWorkCount.Wait(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what()); + } + } + + void ScheduleBackgroundWork(std::function&& Work) + { + m_PendingBackgroundWorkCount.AddCount(1); + try + { + m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); + auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); + if (!m_CancelBackgroundWork) + { + try + { + Work(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); + } + } + }); + } + catch (const std::exception& Ex) + { + m_PendingBackgroundWorkCount.CountDown(); + ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what()); + } + } + + virtual void PutBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + const CompositeBuffer& Payload) override + { + ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); + ScheduleBackgroundWork( + [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::Response CacheResponse = + m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + Payload, + ContentType); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::KeyValueMap Headers; + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)}); + } + CreateDirectories(m_TempFolderPath); + HttpClient::Response CacheResponse = + m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + m_TempFolderPath, + Headers); + AddStatistic(CacheResponse); + if (CacheResponse.IsSuccess()) + { + return CacheResponse.ResponsePayload; + } + return {}; + } + + virtual void PutBlobMetadatas(const Oid& BuildId, std::span BlobHashes, std::span MetaDatas) override + { + ScheduleBackgroundWork([this, + BuildId = Oid(BuildId), + BlobRawHashes = std::vector(BlobHashes.begin(), BlobHashes.end()), + MetaDatas = std::vector(MetaDatas.begin(), MetaDatas.end())]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + const uint64_t BlobCount = BlobRawHashes.size(); + + CbPackage RequestPackage; + std::vector Attachments; + tsl::robin_set AttachmentHashes; + Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter RequestWriter; + RequestWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]); + } + RequestWriter.EndArray(); // blobHashes + + RequestWriter.BeginArray("metadatas"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash(); + RequestWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + + RequestWriter.EndArray(); // metadatas + + RequestPackage.SetObject(RequestWriter.Save()); + } + RequestPackage.AddAttachments(Attachments); + + CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); + + HttpClient::Response CacheResponse = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId), + RpcRequestBuffer, + ZenContentType::kCbPackage); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual std::vector GetBlobMetadatas(const Oid& BuildId, std::span BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + std::vector Result; + + CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + CbObject ResponseObject = ResponsePackage.GetObject(); + + CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); + Result.reserve(MetadatasArray.Num()); + auto BlobHashesIt = BlobHashes.begin(); + auto BlobHashArrayIt = begin(BlobHashArray); + auto MetadataArrayIt = begin(MetadatasArray); + while (MetadataArrayIt != end(MetadatasArray)) + { + const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); + while (BlobHash != *BlobHashesIt) + { + ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); + BlobHashesIt++; + } + + ZEN_ASSERT(BlobHash == *BlobHashesIt); + + const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); + const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); + ZEN_ASSERT(MetaAttachment); + + CbObject Metadata = MetaAttachment->AsObject(); + Result.emplace_back(std::move(Metadata)); + + BlobHashArrayIt++; + MetadataArrayIt++; + BlobHashesIt++; + } + return Result; + } + return {}; + } + + virtual std::vector BlobsExists(const Oid& BuildId, std::span BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload); + if (!ResponseObject) + { + throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object"); + } + CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView(); + if (!BlobsExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing"); + } + if (BlobsExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}", + BlobsExistsArray.Num(), + BlobHashes.size())); + } + + CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView(); + if (!MetadatasExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing"); + } + if (MetadatasExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error( + fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}", + MetadatasExistsArray.Num(), + BlobHashes.size())); + } + + std::vector Result; + Result.reserve(BlobHashes.size()); + auto BlobExistsIt = begin(BlobsExistsArray); + auto MetadataExistsIt = begin(MetadatasExistsArray); + while (BlobExistsIt != end(BlobsExistsArray)) + { + ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray)); + + const bool HasBody = (*BlobExistsIt).AsBool(); + const bool HasMetadata = (*MetadataExistsIt).AsBool(); + + Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata}); + + BlobExistsIt++; + MetadataExistsIt++; + } + return Result; + } + return {}; + } + +private: + void AddStatistic(const HttpClient::Response& Result) + { + m_Stats.TotalBytesWritten += Result.UploadedBytes; + m_Stats.TotalBytesRead += Result.DownloadedBytes; + m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); + m_Stats.TotalRequestCount++; + } + + HttpClient& m_HttpClient; + BuildStorageCache::Statistics& m_Stats; + const std::string m_Namespace; + const std::string m_Bucket; + const std::filesystem::path m_TempFolderPath; + + WorkerThreadPool m_BackgroundWorkPool; + Latch m_PendingBackgroundWorkCount; + std::atomic m_CancelBackgroundWork; +}; + +std::unique_ptr +CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath) +{ + return std::make_unique(HttpClient, Stats, Namespace, Bucket, TempFolderPath); +} + +} // namespace zen -- cgit v1.2.3 From 013ac818cd09c1d31bf9411e00b2bbbf02defa3f Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Thu, 27 Mar 2025 16:08:47 +0100 Subject: build cache prime (#327) - Feature: zen `--boost-workers` option to builds `upload`, `download` and `validate-part` that will increase the number of worker threads, may cause computer to be less responsive - Feature: zen `--cache-prime-only` that uploads referenced data from a part to `--zen-cache-host` if it is not already present. Target folder will be untouched. --- src/zenutil/buildstoragecache.cpp | 59 ++++++++++++++++++++++++++++++++++----- 1 file changed, 52 insertions(+), 7 deletions(-) (limited to 'src/zenutil/buildstoragecache.cpp') diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index c95215889..f273ac699 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -11,6 +11,7 @@ #include #include #include +#include ZEN_THIRD_PARTY_INCLUDES_START #include @@ -27,13 +28,16 @@ public: BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, - const std::filesystem::path& TempFolderPath) + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) : m_HttpClient(HttpClient) , m_Stats(Stats) , m_Namespace(Namespace.empty() ? "none" : Namespace) , m_Bucket(Bucket.empty() ? "none" : Bucket) , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) - , m_BackgroundWorkPool(1) + , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) + , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) + : GetTinyWorkerPool(EWorkloadType::Background)) , m_PendingBackgroundWorkCount(1) , m_CancelBackgroundWork(false) { @@ -44,8 +48,11 @@ public: try { m_CancelBackgroundWork.store(true); - m_PendingBackgroundWorkCount.CountDown(); - m_PendingBackgroundWorkCount.Wait(); + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + m_PendingBackgroundWorkCount.Wait(); + } } catch (const std::exception& Ex) { @@ -86,6 +93,7 @@ public: ZenContentType ContentType, const CompositeBuffer& Payload) override { + ZEN_ASSERT(!IsFlushed); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); ScheduleBackgroundWork( [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { @@ -132,6 +140,7 @@ public: virtual void PutBlobMetadatas(const Oid& BuildId, std::span BlobHashes, std::span MetaDatas) override { + ZEN_ASSERT(!IsFlushed); ScheduleBackgroundWork([this, BuildId = Oid(BuildId), BlobRawHashes = std::vector(BlobHashes.begin(), BlobHashes.end()), @@ -329,6 +338,39 @@ public: return {}; } + virtual void Flush(int32_t UpdateInteralMS, std::function&& UpdateCallback) override + { + if (IsFlushed) + { + return; + } + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + IsFlushed = true; + } + if (m_PendingBackgroundWorkCount.Wait(100)) + { + return; + } + while (true) + { + intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); + if (UpdateCallback(Remaining)) + { + if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS)) + { + UpdateCallback(0); + return; + } + } + else + { + m_CancelBackgroundWork.store(true); + } + } + } + private: void AddStatistic(const HttpClient::Response& Result) { @@ -343,8 +385,10 @@ private: const std::string m_Namespace; const std::string m_Bucket; const std::filesystem::path m_TempFolderPath; + const bool m_BoostBackgroundThreadCount; + bool IsFlushed = false; - WorkerThreadPool m_BackgroundWorkPool; + WorkerThreadPool& m_BackgroundWorkPool; Latch m_PendingBackgroundWorkCount; std::atomic m_CancelBackgroundWork; }; @@ -354,9 +398,10 @@ CreateZenBuildStorageCache(HttpClient& HttpClient, BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, - const std::filesystem::path& TempFolderPath) + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) { - return std::make_unique(HttpClient, Stats, Namespace, Bucket, TempFolderPath); + return std::make_unique(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); } } // namespace zen -- cgit v1.2.3 From 4e2efa1051e3eb86ab48d92b3f6ad5896cda5d81 Mon Sep 17 00:00:00 2001 From: Dan Engelbrecht Date: Fri, 16 May 2025 19:51:36 +0200 Subject: parallel work handle dispatch exception (#400) - Bugfix: Wait for async threads if dispatching of work using ParallellWork throws exception --- src/zenutil/buildstoragecache.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src/zenutil/buildstoragecache.cpp') diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp index f273ac699..88238effd 100644 --- a/src/zenutil/buildstoragecache.cpp +++ b/src/zenutil/buildstoragecache.cpp @@ -338,7 +338,7 @@ public: return {}; } - virtual void Flush(int32_t UpdateInteralMS, std::function&& UpdateCallback) override + virtual void Flush(int32_t UpdateIntervalMS, std::function&& UpdateCallback) override { if (IsFlushed) { @@ -358,7 +358,7 @@ public: intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); if (UpdateCallback(Remaining)) { - if (m_PendingBackgroundWorkCount.Wait(UpdateInteralMS)) + if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS)) { UpdateCallback(0); return; -- cgit v1.2.3