diff options
| author | Dan Engelbrecht <[email protected]> | 2025-03-26 17:06:23 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-03-26 17:06:23 +0100 |
| commit | 28bc5ebf05984385cc0567c89b1d8e7a541ebef8 (patch) | |
| tree | 424efc19bc8630d33f76d3372f9105731d00a45f /src/zenutil/buildstoragecache.cpp | |
| parent | don't let auth env argument block other auth options (#316) (diff) | |
| download | zen-28bc5ebf05984385cc0567c89b1d8e7a541ebef8.tar.xz zen-28bc5ebf05984385cc0567c89b1d8e7a541ebef8.zip | |
zen build cache service (#318)
- **EXPERIMENTAL** `zen builds`
- Feature: `--zen-cache-host` option for `upload` and `download` operations to use a zenserver host `/builds` endpoint for storing build blob and blob metadata
- Feature: New `/builds` endpoint for caching build blobs and blob metadata
- `/builds/{namespace}/{bucket}/{buildid}/blobs/{hash}` `GET` and `PUT` method for storing and fetching blobs
- `/builds/{namespace}/{bucket}/{buildid}/blobs/putBlobMetadata` `POST` method for storing metadata about blobs
- `/builds/{namespace}/{bucket}/{buildid}/blobs/getBlobMetadata` `POST` method for fetching metadata about blobs
- `/builds/{namespace}/{bucket}/{buildid}/blobs/exists` `POST` method for checking existance of blobs
Diffstat (limited to 'src/zenutil/buildstoragecache.cpp')
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 362 |
1 files changed, 362 insertions, 0 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp new file mode 100644 index 000000000..c95215889 --- /dev/null +++ b/src/zenutil/buildstoragecache.cpp @@ -0,0 +1,362 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenutil/buildstoragecache.h> + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/trace.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpclient.h> +#include <zenhttp/packageformat.h> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +using namespace std::literals; + +class ZenBuildStorageCache : public BuildStorageCache +{ +public: + explicit ZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath) + : m_HttpClient(HttpClient) + , m_Stats(Stats) + , m_Namespace(Namespace.empty() ? "none" : Namespace) + , m_Bucket(Bucket.empty() ? "none" : Bucket) + , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) + , m_BackgroundWorkPool(1) + , m_PendingBackgroundWorkCount(1) + , m_CancelBackgroundWork(false) + { + } + + virtual ~ZenBuildStorageCache() + { + try + { + m_CancelBackgroundWork.store(true); + m_PendingBackgroundWorkCount.CountDown(); + m_PendingBackgroundWorkCount.Wait(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what()); + } + } + + void ScheduleBackgroundWork(std::function<void()>&& Work) + { + m_PendingBackgroundWorkCount.AddCount(1); + try + { + m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); + auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); + if (!m_CancelBackgroundWork) + { + try + { + Work(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); + } + } + }); + } + catch (const std::exception& Ex) + { + m_PendingBackgroundWorkCount.CountDown(); + ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what()); + } + } + + virtual void PutBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + const CompositeBuffer& Payload) override + { + ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); + ScheduleBackgroundWork( + [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::Response CacheResponse = + m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + Payload, + ContentType); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + HttpClient::KeyValueMap Headers; + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)}); + } + CreateDirectories(m_TempFolderPath); + HttpClient::Response CacheResponse = + m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), + m_TempFolderPath, + Headers); + AddStatistic(CacheResponse); + if (CacheResponse.IsSuccess()) + { + return CacheResponse.ResponsePayload; + } + return {}; + } + + virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override + { + ScheduleBackgroundWork([this, + BuildId = Oid(BuildId), + BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()), + MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() { + ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + const uint64_t BlobCount = BlobRawHashes.size(); + + CbPackage RequestPackage; + std::vector<CbAttachment> Attachments; + tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; + Attachments.reserve(BlobCount); + AttachmentHashes.reserve(BlobCount); + { + CbObjectWriter RequestWriter; + RequestWriter.BeginArray("blobHashes"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]); + } + RequestWriter.EndArray(); // blobHashes + + RequestWriter.BeginArray("metadatas"); + for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) + { + const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash(); + RequestWriter.AddBinaryAttachment(ObjectHash); + if (!AttachmentHashes.contains(ObjectHash)) + { + Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash)); + AttachmentHashes.insert(ObjectHash); + } + } + + RequestWriter.EndArray(); // metadatas + + RequestPackage.SetObject(RequestWriter.Save()); + } + RequestPackage.AddAttachments(Attachments); + + CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); + + HttpClient::Response CacheResponse = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId), + RpcRequestBuffer, + ZenContentType::kCbPackage); + AddStatistic(CacheResponse); + if (!CacheResponse.IsSuccess()) + { + ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv)); + } + }); + } + + virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = + m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + std::vector<CbObject> Result; + + CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); + CbObject ResponseObject = ResponsePackage.GetObject(); + + CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); + CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); + Result.reserve(MetadatasArray.Num()); + auto BlobHashesIt = BlobHashes.begin(); + auto BlobHashArrayIt = begin(BlobHashArray); + auto MetadataArrayIt = begin(MetadatasArray); + while (MetadataArrayIt != end(MetadatasArray)) + { + const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); + while (BlobHash != *BlobHashesIt) + { + ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); + BlobHashesIt++; + } + + ZEN_ASSERT(BlobHash == *BlobHashesIt); + + const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); + const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); + ZEN_ASSERT(MetaAttachment); + + CbObject Metadata = MetaAttachment->AsObject(); + Result.emplace_back(std::move(Metadata)); + + BlobHashArrayIt++; + MetadataArrayIt++; + BlobHashesIt++; + } + return Result; + } + return {}; + } + + virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override + { + ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists"); + + Stopwatch ExecutionTimer; + auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); + + CbObjectWriter Request; + + Request.BeginArray("blobHashes"sv); + for (const IoHash& BlobHash : BlobHashes) + { + Request.AddHash(BlobHash); + } + Request.EndArray(); + + IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); + Payload.SetContentType(ZenContentType::kCbObject); + + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId), + Payload, + HttpClient::Accept(ZenContentType::kCbObject)); + AddStatistic(Response); + if (Response.IsSuccess()) + { + CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload); + if (!ResponseObject) + { + throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object"); + } + CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView(); + if (!BlobsExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing"); + } + if (BlobsExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}", + BlobsExistsArray.Num(), + BlobHashes.size())); + } + + CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView(); + if (!MetadatasExistsArray) + { + throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing"); + } + if (MetadatasExistsArray.Num() != BlobHashes.size()) + { + throw std::runtime_error( + fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}", + MetadatasExistsArray.Num(), + BlobHashes.size())); + } + + std::vector<BlobExistsResult> Result; + Result.reserve(BlobHashes.size()); + auto BlobExistsIt = begin(BlobsExistsArray); + auto MetadataExistsIt = begin(MetadatasExistsArray); + while (BlobExistsIt != end(BlobsExistsArray)) + { + ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray)); + + const bool HasBody = (*BlobExistsIt).AsBool(); + const bool HasMetadata = (*MetadataExistsIt).AsBool(); + + Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata}); + + BlobExistsIt++; + MetadataExistsIt++; + } + return Result; + } + return {}; + } + +private: + void AddStatistic(const HttpClient::Response& Result) + { + m_Stats.TotalBytesWritten += Result.UploadedBytes; + m_Stats.TotalBytesRead += Result.DownloadedBytes; + m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); + m_Stats.TotalRequestCount++; + } + + HttpClient& m_HttpClient; + BuildStorageCache::Statistics& m_Stats; + const std::string m_Namespace; + const std::string m_Bucket; + const std::filesystem::path m_TempFolderPath; + + WorkerThreadPool m_BackgroundWorkPool; + Latch m_PendingBackgroundWorkCount; + std::atomic<bool> m_CancelBackgroundWork; +}; + +std::unique_ptr<BuildStorageCache> +CreateZenBuildStorageCache(HttpClient& HttpClient, + BuildStorageCache::Statistics& Stats, + std::string_view Namespace, + std::string_view Bucket, + const std::filesystem::path& TempFolderPath) +{ + return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath); +} + +} // namespace zen |