diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 09:59:41 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 09:59:41 +0200 |
| commit | 1f27c826bd36cd065e0fdd160f60c1f887138f33 (patch) | |
| tree | 000528b3f672d0126ca92438378d9007bfdcd0d8 /src/zenutil/buildstoragecache.cpp | |
| parent | move remoteproject to remotestorelib (#542) (diff) | |
| download | zen-1f27c826bd36cd065e0fdd160f60c1f887138f33.tar.xz zen-1f27c826bd36cd065e0fdd160f60c1f887138f33.zip | |
move zenutil builds code to zenremotestore (#543)
* move buildstorage implementations to zenremotestore lib
* move builds storage to zenremotelib
Diffstat (limited to 'src/zenutil/buildstoragecache.cpp')
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 434 |
1 files changed, 0 insertions, 434 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp deleted file mode 100644 index 376d967d1..000000000 --- a/src/zenutil/buildstoragecache.cpp +++ /dev/null @@ -1,434 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/buildstoragecache.h> - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zencore/workthreadpool.h> -#include <zenhttp/httpclient.h> -#include <zenhttp/packageformat.h> -#include <zenutil/workerpools.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_set.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -using namespace std::literals; - -class ZenBuildStorageCache : public BuildStorageCache -{ -public: - explicit ZenBuildStorageCache(HttpClient& HttpClient, - BuildStorageCache::Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - const std::filesystem::path& TempFolderPath, - bool BoostBackgroundThreadCount) - : m_HttpClient(HttpClient) - , m_Stats(Stats) - , m_Namespace(Namespace.empty() ? "none" : Namespace) - , m_Bucket(Bucket.empty() ? "none" : Bucket) - , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) - , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) - , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) - : GetTinyWorkerPool(EWorkloadType::Background)) - , m_PendingBackgroundWorkCount(1) - , m_CancelBackgroundWork(false) - { - } - - virtual ~ZenBuildStorageCache() - { - try - { - m_CancelBackgroundWork.store(true); - if (!IsFlushed) - { - m_PendingBackgroundWorkCount.CountDown(); - m_PendingBackgroundWorkCount.Wait(); - } - } - catch (const std::exception& Ex) - { - ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what()); - } - } - - void ScheduleBackgroundWork(std::function<void()>&& Work) - { - m_PendingBackgroundWorkCount.AddCount(1); - try - { - m_BackgroundWorkPool.ScheduleWork( - [this, Work = std::move(Work)]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); - auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); - if (!m_CancelBackgroundWork) - { - try - { - Work(); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); - } - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - catch (const std::exception& Ex) - { - m_PendingBackgroundWorkCount.CountDown(); - ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what()); - } - } - - virtual void PutBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - const CompositeBuffer& Payload) override - { - ZEN_ASSERT(!IsFlushed); - ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - ScheduleBackgroundWork( - [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - HttpClient::Response CacheResponse = - m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), - Payload, - ContentType); - AddStatistic(CacheResponse); - if (!CacheResponse.IsSuccess()) - { - ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); - } - }); - } - - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override - { - ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - HttpClient::KeyValueMap Headers; - if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) - { - Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)}); - } - CreateDirectories(m_TempFolderPath); - HttpClient::Response CacheResponse = - m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), - m_TempFolderPath, - Headers); - AddStatistic(CacheResponse); - if (CacheResponse.IsSuccess()) - { - return CacheResponse.ResponsePayload; - } - return {}; - } - - virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override - { - ZEN_ASSERT(!IsFlushed); - ScheduleBackgroundWork([this, - BuildId = Oid(BuildId), - BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()), - MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - const uint64_t BlobCount = BlobRawHashes.size(); - - CbPackage RequestPackage; - std::vector<CbAttachment> Attachments; - tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; - Attachments.reserve(BlobCount); - AttachmentHashes.reserve(BlobCount); - { - CbObjectWriter RequestWriter; - RequestWriter.BeginArray("blobHashes"); - for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) - { - RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]); - } - RequestWriter.EndArray(); // blobHashes - - RequestWriter.BeginArray("metadatas"); - for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) - { - const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash(); - RequestWriter.AddBinaryAttachment(ObjectHash); - if (!AttachmentHashes.contains(ObjectHash)) - { - Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash)); - AttachmentHashes.insert(ObjectHash); - } - } - - RequestWriter.EndArray(); // metadatas - - RequestPackage.SetObject(RequestWriter.Save()); - } - RequestPackage.AddAttachments(Attachments); - - CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); - - HttpClient::Response CacheResponse = - m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId), - RpcRequestBuffer, - ZenContentType::kCbPackage); - AddStatistic(CacheResponse); - if (!CacheResponse.IsSuccess()) - { - ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv)); - } - }); - } - - virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override - { - ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - CbObjectWriter Request; - - Request.BeginArray("blobHashes"sv); - for (const IoHash& BlobHash : BlobHashes) - { - Request.AddHash(BlobHash); - } - Request.EndArray(); - - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - - HttpClient::Response Response = - m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - AddStatistic(Response); - if (Response.IsSuccess()) - { - std::vector<CbObject> Result; - - CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); - CbObject ResponseObject = ResponsePackage.GetObject(); - - CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); - CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); - Result.reserve(MetadatasArray.Num()); - auto BlobHashesIt = BlobHashes.begin(); - auto BlobHashArrayIt = begin(BlobHashArray); - auto MetadataArrayIt = begin(MetadatasArray); - while (MetadataArrayIt != end(MetadatasArray)) - { - const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); - while (BlobHash != *BlobHashesIt) - { - ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); - BlobHashesIt++; - } - - ZEN_ASSERT(BlobHash == *BlobHashesIt); - - const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); - const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); - ZEN_ASSERT(MetaAttachment); - - CbObject Metadata = MetaAttachment->AsObject(); - Result.emplace_back(std::move(Metadata)); - - BlobHashArrayIt++; - MetadataArrayIt++; - BlobHashesIt++; - } - return Result; - } - return {}; - } - - virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override - { - ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - CbObjectWriter Request; - - Request.BeginArray("blobHashes"sv); - for (const IoHash& BlobHash : BlobHashes) - { - Request.AddHash(BlobHash); - } - Request.EndArray(); - - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - - HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - AddStatistic(Response); - if (Response.IsSuccess()) - { - CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload); - if (!ResponseObject) - { - throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object"); - } - CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView(); - if (!BlobsExistsArray) - { - throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing"); - } - if (BlobsExistsArray.Num() != BlobHashes.size()) - { - throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}", - BlobsExistsArray.Num(), - BlobHashes.size())); - } - - CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView(); - if (!MetadatasExistsArray) - { - throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing"); - } - if (MetadatasExistsArray.Num() != BlobHashes.size()) - { - throw std::runtime_error( - fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}", - MetadatasExistsArray.Num(), - BlobHashes.size())); - } - - std::vector<BlobExistsResult> Result; - Result.reserve(BlobHashes.size()); - auto BlobExistsIt = begin(BlobsExistsArray); - auto MetadataExistsIt = begin(MetadatasExistsArray); - while (BlobExistsIt != end(BlobsExistsArray)) - { - ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray)); - - const bool HasBody = (*BlobExistsIt).AsBool(); - const bool HasMetadata = (*MetadataExistsIt).AsBool(); - - Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata}); - - BlobExistsIt++; - MetadataExistsIt++; - } - return Result; - } - return {}; - } - - virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override - { - if (IsFlushed) - { - return; - } - if (!IsFlushed) - { - m_PendingBackgroundWorkCount.CountDown(); - IsFlushed = true; - } - if (m_PendingBackgroundWorkCount.Wait(100)) - { - return; - } - while (true) - { - intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); - if (UpdateCallback(Remaining)) - { - if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS)) - { - UpdateCallback(0); - return; - } - } - else - { - m_CancelBackgroundWork.store(true); - } - } - } - -private: - void AddStatistic(const HttpClient::Response& Result) - { - m_Stats.TotalBytesWritten += Result.UploadedBytes; - m_Stats.TotalBytesRead += Result.DownloadedBytes; - m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); - m_Stats.TotalRequestCount++; - SetAtomicMax(m_Stats.PeakSentBytes, Result.UploadedBytes); - SetAtomicMax(m_Stats.PeakReceivedBytes, Result.DownloadedBytes); - if (Result.ElapsedSeconds > 0.0) - { - uint64_t BytesPerSec = uint64_t((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds); - SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); - } - } - - HttpClient& m_HttpClient; - BuildStorageCache::Statistics& m_Stats; - const std::string m_Namespace; - const std::string m_Bucket; - const std::filesystem::path m_TempFolderPath; - const bool m_BoostBackgroundThreadCount; - bool IsFlushed = false; - - WorkerThreadPool& m_BackgroundWorkPool; - Latch m_PendingBackgroundWorkCount; - std::atomic<bool> m_CancelBackgroundWork; -}; - -std::unique_ptr<BuildStorageCache> -CreateZenBuildStorageCache(HttpClient& HttpClient, - BuildStorageCache::Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - const std::filesystem::path& TempFolderPath, - bool BoostBackgroundThreadCount) -{ - return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); -} - -ZenCacheEndpointTestResult -TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2) -{ - HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{1000}, - .Timeout = std::chrono::milliseconds{2000}, - .AssumeHttp2 = AssumeHttp2, - .AllowResume = true, - .RetryCount = 0}; - HttpClient TestHttpClient(BaseUrl, TestClientSettings); - HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds"); - if (TestResponse.IsSuccess()) - { - return {.Success = true}; - } - return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")}; -}; - -} // namespace zen |