// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include ZEN_THIRD_PARTY_INCLUDES_START #include ZEN_THIRD_PARTY_INCLUDES_END namespace zen { using namespace std::literals; class ZenBuildStorageCache : public BuildStorageCache { public: explicit ZenBuildStorageCache(HttpClient& HttpClient, BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, const std::filesystem::path& TempFolderPath, bool BoostBackgroundThreadCount) : m_HttpClient(HttpClient) , m_Stats(Stats) , m_Namespace(Namespace.empty() ? "none" : Namespace) , m_Bucket(Bucket.empty() ? "none" : Bucket) , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) : GetTinyWorkerPool(EWorkloadType::Background)) , m_PendingBackgroundWorkCount(1) , m_CancelBackgroundWork(false) { } virtual ~ZenBuildStorageCache() { try { m_CancelBackgroundWork.store(true); if (!IsFlushed) { m_PendingBackgroundWorkCount.CountDown(); m_PendingBackgroundWorkCount.Wait(); } } catch (const std::exception& Ex) { ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what()); } } void ScheduleBackgroundWork(std::function&& Work) { m_PendingBackgroundWorkCount.AddCount(1); try { m_BackgroundWorkPool.ScheduleWork([this, Work = std::move(Work)]() { ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); if (!m_CancelBackgroundWork) { try { Work(); } catch (const std::exception& Ex) { ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); } } }); } catch (const std::exception& Ex) { m_PendingBackgroundWorkCount.CountDown(); ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what()); } } virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) override { ZEN_ASSERT(!IsFlushed); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); ScheduleBackgroundWork( [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); HttpClient::Response CacheResponse = m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), Payload, ContentType); AddStatistic(CacheResponse); if (!CacheResponse.IsSuccess()) { ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); } }); } virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); HttpClient::KeyValueMap Headers; if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) { Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)}); } CreateDirectories(m_TempFolderPath); HttpClient::Response CacheResponse = m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), m_TempFolderPath, Headers); AddStatistic(CacheResponse); if (CacheResponse.IsSuccess()) { return CacheResponse.ResponsePayload; } return {}; } virtual void PutBlobMetadatas(const Oid& BuildId, std::span BlobHashes, std::span MetaDatas) override { ZEN_ASSERT(!IsFlushed); ScheduleBackgroundWork([this, BuildId = Oid(BuildId), BlobRawHashes = std::vector(BlobHashes.begin(), BlobHashes.end()), MetaDatas = std::vector(MetaDatas.begin(), MetaDatas.end())]() { ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); const uint64_t BlobCount = BlobRawHashes.size(); CbPackage RequestPackage; std::vector Attachments; tsl::robin_set AttachmentHashes; Attachments.reserve(BlobCount); AttachmentHashes.reserve(BlobCount); { CbObjectWriter RequestWriter; RequestWriter.BeginArray("blobHashes"); for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) { RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]); } RequestWriter.EndArray(); // blobHashes RequestWriter.BeginArray("metadatas"); for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) { const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash(); RequestWriter.AddBinaryAttachment(ObjectHash); if (!AttachmentHashes.contains(ObjectHash)) { Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash)); AttachmentHashes.insert(ObjectHash); } } RequestWriter.EndArray(); // metadatas RequestPackage.SetObject(RequestWriter.Save()); } RequestPackage.AddAttachments(Attachments); CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); HttpClient::Response CacheResponse = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId), RpcRequestBuffer, ZenContentType::kCbPackage); AddStatistic(CacheResponse); if (!CacheResponse.IsSuccess()) { ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv)); } }); } virtual std::vector GetBlobMetadatas(const Oid& BuildId, std::span BlobHashes) override { ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); CbObjectWriter Request; Request.BeginArray("blobHashes"sv); for (const IoHash& BlobHash : BlobHashes) { Request.AddHash(BlobHash); } Request.EndArray(); IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId), Payload, HttpClient::Accept(ZenContentType::kCbObject)); AddStatistic(Response); if (Response.IsSuccess()) { std::vector Result; CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); CbObject ResponseObject = ResponsePackage.GetObject(); CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); Result.reserve(MetadatasArray.Num()); auto BlobHashesIt = BlobHashes.begin(); auto BlobHashArrayIt = begin(BlobHashArray); auto MetadataArrayIt = begin(MetadatasArray); while (MetadataArrayIt != end(MetadatasArray)) { const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); while (BlobHash != *BlobHashesIt) { ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); BlobHashesIt++; } ZEN_ASSERT(BlobHash == *BlobHashesIt); const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); ZEN_ASSERT(MetaAttachment); CbObject Metadata = MetaAttachment->AsObject(); Result.emplace_back(std::move(Metadata)); BlobHashArrayIt++; MetadataArrayIt++; BlobHashesIt++; } return Result; } return {}; } virtual std::vector BlobsExists(const Oid& BuildId, std::span BlobHashes) override { ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists"); Stopwatch ExecutionTimer; auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); CbObjectWriter Request; Request.BeginArray("blobHashes"sv); for (const IoHash& BlobHash : BlobHashes) { Request.AddHash(BlobHash); } Request.EndArray(); IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); Payload.SetContentType(ZenContentType::kCbObject); HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId), Payload, HttpClient::Accept(ZenContentType::kCbObject)); AddStatistic(Response); if (Response.IsSuccess()) { CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload); if (!ResponseObject) { throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object"); } CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView(); if (!BlobsExistsArray) { throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing"); } if (BlobsExistsArray.Num() != BlobHashes.size()) { throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}", BlobsExistsArray.Num(), BlobHashes.size())); } CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView(); if (!MetadatasExistsArray) { throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing"); } if (MetadatasExistsArray.Num() != BlobHashes.size()) { throw std::runtime_error( fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}", MetadatasExistsArray.Num(), BlobHashes.size())); } std::vector Result; Result.reserve(BlobHashes.size()); auto BlobExistsIt = begin(BlobsExistsArray); auto MetadataExistsIt = begin(MetadatasExistsArray); while (BlobExistsIt != end(BlobsExistsArray)) { ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray)); const bool HasBody = (*BlobExistsIt).AsBool(); const bool HasMetadata = (*MetadataExistsIt).AsBool(); Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata}); BlobExistsIt++; MetadataExistsIt++; } return Result; } return {}; } virtual void Flush(int32_t UpdateIntervalMS, std::function&& UpdateCallback) override { if (IsFlushed) { return; } if (!IsFlushed) { m_PendingBackgroundWorkCount.CountDown(); IsFlushed = true; } if (m_PendingBackgroundWorkCount.Wait(100)) { return; } while (true) { intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); if (UpdateCallback(Remaining)) { if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS)) { UpdateCallback(0); return; } } else { m_CancelBackgroundWork.store(true); } } } private: void AddStatistic(const HttpClient::Response& Result) { m_Stats.TotalBytesWritten += Result.UploadedBytes; m_Stats.TotalBytesRead += Result.DownloadedBytes; m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); m_Stats.TotalRequestCount++; SetAtomicMax(m_Stats.PeakSentBytes, Result.UploadedBytes); SetAtomicMax(m_Stats.PeakReceivedBytes, Result.DownloadedBytes); if (Result.ElapsedSeconds > 0.0) { uint64_t BytesPerSec = uint64_t((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds); SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); } } HttpClient& m_HttpClient; BuildStorageCache::Statistics& m_Stats; const std::string m_Namespace; const std::string m_Bucket; const std::filesystem::path m_TempFolderPath; const bool m_BoostBackgroundThreadCount; bool IsFlushed = false; WorkerThreadPool& m_BackgroundWorkPool; Latch m_PendingBackgroundWorkCount; std::atomic m_CancelBackgroundWork; }; std::unique_ptr CreateZenBuildStorageCache(HttpClient& HttpClient, BuildStorageCache::Statistics& Stats, std::string_view Namespace, std::string_view Bucket, const std::filesystem::path& TempFolderPath, bool BoostBackgroundThreadCount) { return std::make_unique(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); } } // namespace zen