diff options
| author | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
|---|---|---|
| committer | zousar <[email protected]> | 2025-06-24 16:26:29 -0600 |
| commit | bb298631ba35a323827dda0b8cd6158e276b5f61 (patch) | |
| tree | 7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenutil/buildstoragecache.cpp | |
| parent | Change to PutResult structure (diff) | |
| parent | 5.6.14 (diff) | |
| download | zen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip | |
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenutil/buildstoragecache.cpp')
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 407 |
1 files changed, 407 insertions, 0 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp new file mode 100644 index 000000000..88238effd --- /dev/null +++ b/src/zenutil/buildstoragecache.cpp @@ -0,0 +1,407 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/buildstoragecache.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/packageformat.h> +#include <zenutil/workerpools.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class ZenBuildStorageCache : public BuildStorageCache +{ +public: + explicit ZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) + : m_HttpClient(HttpClient) + , m_Stats(Stats) + , m_Namespace(Namespace.empty() ? "none" : Namespace) + , m_Bucket(Bucket.empty() ? "none" : Bucket) + , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) + , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) + , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) + : GetTinyWorkerPool(EWorkloadType::Background)) + , m_PendingBackgroundWorkCount(1) + , m_CancelBackgroundWork(false) + { + } + + virtual ~ZenBuildStorageCache() + { + try + { + m_CancelBackgroundWork.store(true); + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + m_PendingBackgroundWorkCount.Wait(); + } + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what()); + } + } + + void ScheduleBackgroundWork(std::function<void()>&& Work) + { + m_PendingBackgroundWorkCount.AddCount(1); + try + { + m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); + auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); + if (!m_CancelBackgroundWork) + { + try + { + Work(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); + } + } + }); + } + catch (const std::exception& Ex) + { + m_PendingBackgroundWorkCount.CountDown(); + ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what()); + } + } + + virtual void PutBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + const CompositeBuffer& Payload) override + { + ZEN_ASSERT(!IsFlushed); + ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); + ScheduleBackgroundWork( + [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::Response CacheResponse = + m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + Payload, + ContentType); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::KeyValueMap Headers; + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)}); + } + CreateDirectories(m_TempFolderPath); + HttpClient::Response CacheResponse = + m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + m_TempFolderPath, + Headers); + AddStatistic(CacheResponse); + if (CacheResponse.IsSuccess()) + { + return CacheResponse.ResponsePayload; + } + return {}; + } + + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override + { + ZEN_ASSERT(!IsFlushed); + ScheduleBackgroundWork([this, + BuildId = Oid(BuildId), + BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()), + MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + const uint64_t BlobCount = BlobRawHashes.size(); + + CbPackage RequestPackage; + std::vector<CbAttachment> Attachments; + tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; + Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter RequestWriter; + RequestWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]); + } + RequestWriter.EndArray(); // blobHashes + + RequestWriter.BeginArray("metadatas"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash(); + RequestWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + + RequestWriter.EndArray(); // metadatas + + RequestPackage.SetObject(RequestWriter.Save()); + } + RequestPackage.AddAttachments(Attachments); + + CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); + + HttpClient::Response CacheResponse = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId), + RpcRequestBuffer, + ZenContentType::kCbPackage); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + std::vector<CbObject> Result; + + CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + CbObject ResponseObject = ResponsePackage.GetObject(); + + CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); + Result.reserve(MetadatasArray.Num()); + auto BlobHashesIt = BlobHashes.begin(); + auto BlobHashArrayIt = begin(BlobHashArray); + auto MetadataArrayIt = begin(MetadatasArray); + while (MetadataArrayIt != end(MetadatasArray)) + { + const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); + while (BlobHash != *BlobHashesIt) + { + ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); + BlobHashesIt++; + } + + ZEN_ASSERT(BlobHash == *BlobHashesIt); + + const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); + const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); + ZEN_ASSERT(MetaAttachment); + + CbObject Metadata = MetaAttachment->AsObject(); + Result.emplace_back(std::move(Metadata)); + + BlobHashArrayIt++; + MetadataArrayIt++; + BlobHashesIt++; + } + return Result; + } + return {}; + } + + virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload); + if (!ResponseObject) + { + throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object"); + } + CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView(); + if (!BlobsExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing"); + } + if (BlobsExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}", + BlobsExistsArray.Num(), + BlobHashes.size())); + } + + CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView(); + if (!MetadatasExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing"); + } + if (MetadatasExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error( + fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}", + MetadatasExistsArray.Num(), + BlobHashes.size())); + } + + std::vector<BlobExistsResult> Result; + Result.reserve(BlobHashes.size()); + auto BlobExistsIt = begin(BlobsExistsArray); + auto MetadataExistsIt = begin(MetadatasExistsArray); + while (BlobExistsIt != end(BlobsExistsArray)) + { + ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray)); + + const bool HasBody = (*BlobExistsIt).AsBool(); + const bool HasMetadata = (*MetadataExistsIt).AsBool(); + + Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata}); + + BlobExistsIt++; + MetadataExistsIt++; + } + return Result; + } + return {}; + } + + virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override + { + if (IsFlushed) + { + return; + } + if (!IsFlushed) + { + m_PendingBackgroundWorkCount.CountDown(); + IsFlushed = true; + } + if (m_PendingBackgroundWorkCount.Wait(100)) + { + return; + } + while (true) + { + intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); + if (UpdateCallback(Remaining)) + { + if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS)) + { + UpdateCallback(0); + return; + } + } + else + { + m_CancelBackgroundWork.store(true); + } + } + } + +private: + void AddStatistic(const HttpClient::Response& Result) + { + m_Stats.TotalBytesWritten += Result.UploadedBytes; + m_Stats.TotalBytesRead += Result.DownloadedBytes; + m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); + m_Stats.TotalRequestCount++; + } + + HttpClient& m_HttpClient; + BuildStorageCache::Statistics& m_Stats; + const std::string m_Namespace; + const std::string m_Bucket; + const std::filesystem::path m_TempFolderPath; + const bool m_BoostBackgroundThreadCount; + bool IsFlushed = false; + + WorkerThreadPool& m_BackgroundWorkPool; + Latch m_PendingBackgroundWorkCount; + std::atomic<bool> m_CancelBackgroundWork; +}; + +std::unique_ptr<BuildStorageCache> +CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath, + bool BoostBackgroundThreadCount) +{ + return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); +} + +} // namespace zen |