diff options
Diffstat (limited to 'src/zenutil')
| -rw-r--r-- | src/zenutil/buildstoragecache.cpp | 434 | ||||
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 806 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstorage.h | 68 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/buildstoragecache.h | 69 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/filebuildstorage.h | 16 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h | 25 | ||||
| -rw-r--r-- | src/zenutil/jupiter/jupiterbuildstorage.cpp | 561 |
7 files changed, 0 insertions, 1979 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp deleted file mode 100644 index 376d967d1..000000000 --- a/src/zenutil/buildstoragecache.cpp +++ /dev/null @@ -1,434 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/buildstoragecache.h> - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zencore/workthreadpool.h> -#include <zenhttp/httpclient.h> -#include <zenhttp/packageformat.h> -#include <zenutil/workerpools.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_set.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -using namespace std::literals; - -class ZenBuildStorageCache : public BuildStorageCache -{ -public: - explicit ZenBuildStorageCache(HttpClient& HttpClient, - BuildStorageCache::Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - const std::filesystem::path& TempFolderPath, - bool BoostBackgroundThreadCount) - : m_HttpClient(HttpClient) - , m_Stats(Stats) - , m_Namespace(Namespace.empty() ? "none" : Namespace) - , m_Bucket(Bucket.empty() ? "none" : Bucket) - , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred()) - , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount) - , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background) - : GetTinyWorkerPool(EWorkloadType::Background)) - , m_PendingBackgroundWorkCount(1) - , m_CancelBackgroundWork(false) - { - } - - virtual ~ZenBuildStorageCache() - { - try - { - m_CancelBackgroundWork.store(true); - if (!IsFlushed) - { - m_PendingBackgroundWorkCount.CountDown(); - m_PendingBackgroundWorkCount.Wait(); - } - } - catch (const std::exception& Ex) - { - ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what()); - } - } - - void ScheduleBackgroundWork(std::function<void()>&& Work) - { - m_PendingBackgroundWorkCount.AddCount(1); - try - { - m_BackgroundWorkPool.ScheduleWork( - [this, Work = std::move(Work)]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork"); - auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); }); - if (!m_CancelBackgroundWork) - { - try - { - Work(); - } - catch (const std::exception& Ex) - { - ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what()); - } - } - }, - WorkerThreadPool::EMode::EnableBacklog); - } - catch (const std::exception& Ex) - { - m_PendingBackgroundWorkCount.CountDown(); - ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what()); - } - } - - virtual void PutBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - const CompositeBuffer& Payload) override - { - ZEN_ASSERT(!IsFlushed); - ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - ScheduleBackgroundWork( - [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob"); - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - HttpClient::Response CacheResponse = - m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), - Payload, - ContentType); - AddStatistic(CacheResponse); - if (!CacheResponse.IsSuccess()) - { - ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv)); - } - }); - } - - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override - { - ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - HttpClient::KeyValueMap Headers; - if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) - { - Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)}); - } - CreateDirectories(m_TempFolderPath); - HttpClient::Response CacheResponse = - m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()), - m_TempFolderPath, - Headers); - AddStatistic(CacheResponse); - if (CacheResponse.IsSuccess()) - { - return CacheResponse.ResponsePayload; - } - return {}; - } - - virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override - { - ZEN_ASSERT(!IsFlushed); - ScheduleBackgroundWork([this, - BuildId = Oid(BuildId), - BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()), - MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() { - ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - const uint64_t BlobCount = BlobRawHashes.size(); - - CbPackage RequestPackage; - std::vector<CbAttachment> Attachments; - tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes; - Attachments.reserve(BlobCount); - AttachmentHashes.reserve(BlobCount); - { - CbObjectWriter RequestWriter; - RequestWriter.BeginArray("blobHashes"); - for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) - { - RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]); - } - RequestWriter.EndArray(); // blobHashes - - RequestWriter.BeginArray("metadatas"); - for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++) - { - const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash(); - RequestWriter.AddBinaryAttachment(ObjectHash); - if (!AttachmentHashes.contains(ObjectHash)) - { - Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash)); - AttachmentHashes.insert(ObjectHash); - } - } - - RequestWriter.EndArray(); // metadatas - - RequestPackage.SetObject(RequestWriter.Save()); - } - RequestPackage.AddAttachments(Attachments); - - CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage); - - HttpClient::Response CacheResponse = - m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId), - RpcRequestBuffer, - ZenContentType::kCbPackage); - AddStatistic(CacheResponse); - if (!CacheResponse.IsSuccess()) - { - ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv)); - } - }); - } - - virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override - { - ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - CbObjectWriter Request; - - Request.BeginArray("blobHashes"sv); - for (const IoHash& BlobHash : BlobHashes) - { - Request.AddHash(BlobHash); - } - Request.EndArray(); - - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - - HttpClient::Response Response = - m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - AddStatistic(Response); - if (Response.IsSuccess()) - { - std::vector<CbObject> Result; - - CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload); - CbObject ResponseObject = ResponsePackage.GetObject(); - - CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView(); - CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView(); - Result.reserve(MetadatasArray.Num()); - auto BlobHashesIt = BlobHashes.begin(); - auto BlobHashArrayIt = begin(BlobHashArray); - auto MetadataArrayIt = begin(MetadatasArray); - while (MetadataArrayIt != end(MetadatasArray)) - { - const IoHash BlobHash = (*BlobHashArrayIt).AsHash(); - while (BlobHash != *BlobHashesIt) - { - ZEN_ASSERT(BlobHashesIt != BlobHashes.end()); - BlobHashesIt++; - } - - ZEN_ASSERT(BlobHash == *BlobHashesIt); - - const IoHash MetaHash = (*MetadataArrayIt).AsAttachment(); - const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash); - ZEN_ASSERT(MetaAttachment); - - CbObject Metadata = MetaAttachment->AsObject(); - Result.emplace_back(std::move(Metadata)); - - BlobHashArrayIt++; - MetadataArrayIt++; - BlobHashesIt++; - } - return Result; - } - return {}; - } - - virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override - { - ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - - CbObjectWriter Request; - - Request.BeginArray("blobHashes"sv); - for (const IoHash& BlobHash : BlobHashes) - { - Request.AddHash(BlobHash); - } - Request.EndArray(); - - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - - HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId), - Payload, - HttpClient::Accept(ZenContentType::kCbObject)); - AddStatistic(Response); - if (Response.IsSuccess()) - { - CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload); - if (!ResponseObject) - { - throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object"); - } - CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView(); - if (!BlobsExistsArray) - { - throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing"); - } - if (BlobsExistsArray.Num() != BlobHashes.size()) - { - throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}", - BlobsExistsArray.Num(), - BlobHashes.size())); - } - - CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView(); - if (!MetadatasExistsArray) - { - throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing"); - } - if (MetadatasExistsArray.Num() != BlobHashes.size()) - { - throw std::runtime_error( - fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}", - MetadatasExistsArray.Num(), - BlobHashes.size())); - } - - std::vector<BlobExistsResult> Result; - Result.reserve(BlobHashes.size()); - auto BlobExistsIt = begin(BlobsExistsArray); - auto MetadataExistsIt = begin(MetadatasExistsArray); - while (BlobExistsIt != end(BlobsExistsArray)) - { - ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray)); - - const bool HasBody = (*BlobExistsIt).AsBool(); - const bool HasMetadata = (*MetadataExistsIt).AsBool(); - - Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata}); - - BlobExistsIt++; - MetadataExistsIt++; - } - return Result; - } - return {}; - } - - virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override - { - if (IsFlushed) - { - return; - } - if (!IsFlushed) - { - m_PendingBackgroundWorkCount.CountDown(); - IsFlushed = true; - } - if (m_PendingBackgroundWorkCount.Wait(100)) - { - return; - } - while (true) - { - intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining(); - if (UpdateCallback(Remaining)) - { - if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS)) - { - UpdateCallback(0); - return; - } - } - else - { - m_CancelBackgroundWork.store(true); - } - } - } - -private: - void AddStatistic(const HttpClient::Response& Result) - { - m_Stats.TotalBytesWritten += Result.UploadedBytes; - m_Stats.TotalBytesRead += Result.DownloadedBytes; - m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); - m_Stats.TotalRequestCount++; - SetAtomicMax(m_Stats.PeakSentBytes, Result.UploadedBytes); - SetAtomicMax(m_Stats.PeakReceivedBytes, Result.DownloadedBytes); - if (Result.ElapsedSeconds > 0.0) - { - uint64_t BytesPerSec = uint64_t((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds); - SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); - } - } - - HttpClient& m_HttpClient; - BuildStorageCache::Statistics& m_Stats; - const std::string m_Namespace; - const std::string m_Bucket; - const std::filesystem::path m_TempFolderPath; - const bool m_BoostBackgroundThreadCount; - bool IsFlushed = false; - - WorkerThreadPool& m_BackgroundWorkPool; - Latch m_PendingBackgroundWorkCount; - std::atomic<bool> m_CancelBackgroundWork; -}; - -std::unique_ptr<BuildStorageCache> -CreateZenBuildStorageCache(HttpClient& HttpClient, - BuildStorageCache::Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - const std::filesystem::path& TempFolderPath, - bool BoostBackgroundThreadCount) -{ - return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount); -} - -ZenCacheEndpointTestResult -TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2) -{ - HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient", - .ConnectTimeout = std::chrono::milliseconds{1000}, - .Timeout = std::chrono::milliseconds{2000}, - .AssumeHttp2 = AssumeHttp2, - .AllowResume = true, - .RetryCount = 0}; - HttpClient TestHttpClient(BaseUrl, TestClientSettings); - HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds"); - if (TestResponse.IsSuccess()) - { - return {.Success = true}; - } - return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")}; -}; - -} // namespace zen diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp deleted file mode 100644 index f75fe403f..000000000 --- a/src/zenutil/filebuildstorage.cpp +++ /dev/null @@ -1,806 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/filebuildstorage.h> - -#include <zencore/basicfile.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> -#include <zencore/timer.h> -#include <zencore/trace.h> - -namespace zen { - -using namespace std::literals; - -class FileBuildStorage : public BuildStorage -{ -public: - explicit FileBuildStorage(const std::filesystem::path& StoragePath, - BuildStorage::Statistics& Stats, - bool EnableJsonOutput, - double LatencySec, - double DelayPerKBSec) - : m_StoragePath(StoragePath) - , m_Stats(Stats) - , m_EnableJsonOutput(EnableJsonOutput) - , m_LatencySec(LatencySec) - , m_DelayPerKBSec(DelayPerKBSec) - { - CreateDirectories(GetBuildsFolder()); - CreateDirectories(GetBlobsFolder()); - CreateDirectories(GetBlobsMetadataFolder()); - } - - virtual ~FileBuildStorage() {} - - virtual CbObject ListNamespaces(bool bRecursive) override - { - ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces"); - ZEN_UNUSED(bRecursive); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObjectWriter Writer; - Writer.BeginArray("results"); - { - } - Writer.EndArray(); // results - - Writer.Finalize(); - ReceivedBytes = Writer.GetSaveSize(); - return Writer.Save(); - } - - virtual CbObject ListBuilds(CbObject Query) override - { - ZEN_TRACE_CPU("FileBuildStorage::ListBuilds"); - ZEN_UNUSED(Query); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = Query.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BuildFolder = GetBuildsFolder(); - DirectoryContent Content; - GetDirectoryContent(BuildFolder, DirectoryContentFlags::IncludeDirs, Content); - CbObjectWriter Writer; - Writer.BeginArray("results"); - { - for (const std::filesystem::path& BuildPath : Content.Directories) - { - Oid BuildId = Oid::TryFromHexString(BuildPath.stem().string()); - if (BuildId != Oid::Zero) - { - Writer.BeginObject(); - { - Writer.AddObjectId("buildId", BuildId); - Writer.AddObject("metadata", ReadBuild(BuildId)["metadata"sv].AsObjectView()); - } - Writer.EndObject(); - } - } - } - Writer.EndArray(); // results - - Writer.Finalize(); - ReceivedBytes = Writer.GetSaveSize(); - return Writer.Save(); - } - - virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutBuild"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = MetaData.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObjectWriter BuildObject; - BuildObject.AddObject("metadata", MetaData); - BuildObject.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); - WriteBuild(BuildId, BuildObject.Save()); - - CbObjectWriter BuildResponse; - BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); - BuildResponse.Finalize(); - ReceivedBytes = BuildResponse.GetSaveSize(); - return BuildResponse.Save(); - } - - virtual CbObject GetBuild(const Oid& BuildId) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetBuild"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObject Build = ReadBuild(BuildId); - ReceivedBytes = Build.GetSize(); - return Build; - } - - virtual void FinalizeBuild(const Oid& BuildId) override - { - ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - } - - virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId, - const Oid& BuildPartId, - std::string_view PartName, - const CbObject& MetaData) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = MetaData.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); - CreateDirectories(BuildPartDataPath.parent_path()); - - TemporaryFile::SafeWriteFile(BuildPartDataPath, MetaData.GetView()); - m_WrittenBytes += MetaData.GetSize(); - WriteAsJson(BuildPartDataPath, MetaData); - - IoHash RawHash = IoHash::HashBuffer(MetaData.GetView()); - - CbObjectWriter Writer; - { - CbObject BuildObject = ReadBuild(BuildId); - CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); - CbObjectView MetaDataView = BuildObject["metadata"sv].AsObjectView(); - - Writer.AddObject("metadata"sv, MetaDataView); - Writer.BeginObject("parts"sv); - { - for (CbFieldView PartView : PartsObject) - { - if (PartView.GetName() != PartName) - { - Writer.AddObjectId(PartView.GetName(), PartView.AsObjectId()); - } - } - Writer.AddObjectId(PartName, BuildPartId); - } - Writer.EndObject(); // parts - } - WriteBuild(BuildId, Writer.Save()); - - std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData); - - ReceivedBytes = sizeof(IoHash) * NeededAttachments.size(); - - return std::make_pair(RawHash, std::move(NeededAttachments)); - } - - virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); - - IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); - - ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); - - CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); - - ReceivedBytes = BuildPartObject.GetSize(); - - return BuildPartObject; - } - - virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override - { - ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); - IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); - - IoHash RawHash = IoHash::HashBuffer(Payload.GetView()); - if (RawHash != PartHash) - { - throw std::runtime_error( - fmt::format("Failed finalizing build part {}: Expected hash {}, got {}", BuildPartId, PartHash, RawHash)); - } - - CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); - std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject)); - - ReceivedBytes = NeededAttachments.size() * sizeof(IoHash); - - return NeededAttachments; - } - - virtual void PutBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - const CompositeBuffer& Payload) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob"); - ZEN_UNUSED(BuildId); - ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload)); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = Payload.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (!IsFile(BlockPath)) - { - CreateDirectories(BlockPath.parent_path()); - TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView()); - } - - ReceivedBytes = Payload.GetSize(); - } - - virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - uint64_t PayloadSize, - std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, - std::function<void(uint64_t, bool)>&& OnSentBytes) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob"); - ZEN_UNUSED(BuildId); - ZEN_UNUSED(ContentType); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (!IsFile(BlockPath)) - { - CreateDirectories(BlockPath.parent_path()); - - struct WorkloadData - { - std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter; - std::function<void(uint64_t, bool)> OnSentBytes; - TemporaryFile TempFile; - std::atomic<size_t> PartsLeft; - }; - - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->Transmitter = std::move(Transmitter); - Workload->OnSentBytes = std::move(OnSentBytes); - std::error_code Ec; - Workload->TempFile.CreateTemporary(BlockPath.parent_path(), Ec); - - if (Ec) - { - throw std::runtime_error( - fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); - } - - std::vector<std::function<void()>> WorkItems; - uint64_t Offset = 0; - while (Offset < PayloadSize) - { - uint64_t Size = Min(32u * 1024u * 1024u, PayloadSize - Offset); - - WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() { - ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work"); - IoBuffer PartPayload = Workload->Transmitter(Offset, Size); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = PartPayload.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - std::error_code Ec; - Workload->TempFile.Write(PartPayload, Offset, Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed writing to temporary file '{}': {} ({})", - Workload->TempFile.GetPath(), - Ec.message(), - Ec.value())); - } - - const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1; - if (IsLastPart) - { - Workload->TempFile.Flush(); - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(Workload->TempFile.ReadAll()))); - Workload->TempFile.MoveTemporaryIntoPlace(BlockPath, Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed moving temporary file '{}' to '{}': {} ({})", - Workload->TempFile.GetPath(), - BlockPath, - Ec.message(), - Ec.value())); - } - } - Workload->OnSentBytes(SentBytes, IsLastPart); - }); - - Offset += Size; - } - Workload->PartsLeft.store(WorkItems.size()); - return WorkItems; - } - return {}; - } - - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (IsFile(BlockPath)) - { - BasicFile File(BlockPath, BasicFile::Mode::kRead); - IoBuffer Payload; - if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) - { - Payload = IoBuffer(RangeBytes); - File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset); - } - else - { - Payload = File.ReadAll(); - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); - } - Payload.SetContentType(ZenContentType::kCompressedBinary); - ReceivedBytes = Payload.GetSize(); - return Payload; - } - return IoBuffer{}; - } - - virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, - std::function<void()>&& OnComplete) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (IsFile(BlockPath)) - { - struct WorkloadData - { - std::atomic<uint64_t> BytesRemaining; - BasicFile BlobFile; - std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; - std::function<void()> OnComplete; - }; - - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead); - const uint64_t BlobSize = Workload->BlobFile.FileSize(); - - Workload->OnReceive = std::move(OnReceive); - Workload->OnComplete = std::move(OnComplete); - Workload->BytesRemaining = BlobSize; - - std::vector<std::function<void()>> WorkItems; - uint64_t Offset = 0; - while (Offset < BlobSize) - { - uint64_t Size = Min(ChunkSize, BlobSize - Offset); - WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() { - ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work"); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - IoBuffer PartPayload(Size); - Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); - ReceivedBytes = PartPayload.GetSize(); - Workload->OnReceive(Offset, PartPayload); - uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size); - if (ByteRemaning == Size) - { - Workload->OnComplete(); - } - }); - - Offset += Size; - } - return WorkItems; - } - return {}; - } - - virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = MetaData.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash); - CreateDirectories(BlockMetaDataPath.parent_path()); - TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView()); - WriteAsJson(BlockMetaDataPath, MetaData); - return true; - } - - virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override - { - ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - uint64_t FoundCount = 0; - - DirectoryContent Content; - GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content); - CbObjectWriter Writer; - Writer.BeginArray("blocks"); - for (const std::filesystem::path& MetaDataFile : Content.Files) - { - IoHash ChunkHash; - if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash)) - { - std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash); - if (IsFile(BlockPath)) - { - IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); - - CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Writer.AddObject(BlockObject); - FoundCount++; - if (FoundCount == MaxBlockCount) - { - break; - } - } - } - } - Writer.EndArray(); // blocks - - Writer.Finalize(); - ReceivedBytes = Writer.GetSaveSize(); - return Writer.Save(); - } - - virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObjectWriter Writer; - Writer.BeginArray("blocks"); - - for (const IoHash& BlockHash : BlockHashes) - { - std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash); - if (IsFile(MetaDataFile)) - { - IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); - - CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Writer.AddObject(BlockObject); - } - } - Writer.EndArray(); // blocks - Writer.Finalize(); - ReceivedBytes = Writer.GetSaveSize(); - return Writer.Save(); - } - - virtual void PutBuildPartStats(const Oid& BuildId, - const Oid& BuildPartId, - const tsl::robin_map<std::string, double>& FloatStats) override - { - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObjectWriter Request; - Request.BeginObject("floatStats"sv); - for (auto It : FloatStats) - { - Request.AddFloat(It.first, It.second); - } - Request.EndObject(); - Request.Finalize(); - SentBytes = Request.GetSaveSize(); - - const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId); - CreateDirectories(BuildPartStatsDataPath.parent_path()); - - CbObject Payload = Request.Save(); - - TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView()); - WriteAsJson(BuildPartStatsDataPath, Payload); - } - -protected: - std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; } - std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; } - std::filesystem::path GetBlobsMetadataFolder() const { return m_StoragePath / "blocks"; } - std::filesystem::path GetBuildFolder(const Oid& BuildId) const { return GetBuildsFolder() / BuildId.ToString(); } - - std::filesystem::path GetBuildPath(const Oid& BuildId) const { return GetBuildFolder(BuildId) / "metadata.cb"; } - - std::filesystem::path GetBuildPartFolder(const Oid& BuildId, const Oid& BuildPartId) const - { - return GetBuildFolder(BuildId) / "parts" / BuildPartId.ToString(); - } - - std::filesystem::path GetBuildPartPath(const Oid& BuildId, const Oid& BuildPartId) const - { - return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb"; - } - - std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const - { - return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid()); - } - - std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); } - - std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const - { - return GetBlobsMetadataFolder() / fmt::format("{}.cb", RawHash); - } - - void SimulateLatency(uint64_t ReceiveSize, uint64_t SendSize) - { - double SleepSec = m_LatencySec; - if (m_DelayPerKBSec > 0.0) - { - SleepSec += m_DelayPerKBSec * (double(SendSize + ReceiveSize) / 1024u); - } - if (SleepSec > 0) - { - Sleep(int(SleepSec * 1000)); - } - } - - void WriteAsJson(const std::filesystem::path& OriginalPath, CbObjectView Data) const - { - if (m_EnableJsonOutput) - { - ExtendableStringBuilder<128> SB; - CompactBinaryToJson(Data, SB); - std::filesystem::path JsonPath = OriginalPath; - JsonPath.replace_extension(".json"); - std::string_view JsonMetaData = SB.ToView(); - TemporaryFile::SafeWriteFile(JsonPath, MemoryView(JsonMetaData.data(), JsonMetaData.length())); - } - } - - void WriteBuild(const Oid& BuildId, CbObjectView Data) - { - const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); - CreateDirectories(BuildDataPath.parent_path()); - TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView()); - WriteAsJson(BuildDataPath, Data); - } - - CbObject ReadBuild(const Oid& BuildId) - { - const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); - FileContents Content = ReadFile(BuildDataPath); - if (Content.ErrorCode) - { - throw std::runtime_error(fmt::format("Failed reading build '{}' from '{}': {} ({})", - BuildId, - BuildDataPath, - Content.ErrorCode.message(), - Content.ErrorCode.value())); - } - IoBuffer Payload = Content.Flatten(); - ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); - CbObject BuildObject = CbObject(SharedBuffer(Payload)); - return BuildObject; - } - - std::vector<IoHash> GetNeededAttachments(CbObjectView BuildPartObject) - { - std::vector<IoHash> NeededAttachments; - BuildPartObject.IterateAttachments([&](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsBinaryAttachment(); - const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash); - if (!IsFile(BlockPath)) - { - NeededAttachments.push_back(AttachmentHash); - } - }); - return NeededAttachments; - } - - bool ValidateCompressedBuffer(const IoHash& RawHash, const CompositeBuffer& Payload) - { - IoHash VerifyHash; - uint64_t VerifySize; - CompressedBuffer ValidateBuffer = CompressedBuffer::FromCompressed(Payload, VerifyHash, VerifySize); - if (!ValidateBuffer) - { - return false; - } - if (VerifyHash != RawHash) - { - return false; - } - - IoHashStream Hash; - bool CouldDecompress = ValidateBuffer.DecompressToStream( - 0, - (uint64_t)-1, - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset, SourceSize, Offset); - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) - { - Hash.Append(Segment.GetView()); - } - return true; - }); - if (!CouldDecompress) - { - return false; - } - if (Hash.GetHash() != VerifyHash) - { - return false; - } - return true; - } - -private: - void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes) - { - const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs(); - m_Stats.TotalBytesWritten += UploadedBytes; - m_Stats.TotalBytesRead += DownloadedBytes; - m_Stats.TotalRequestTimeUs += ElapsedUs; - m_Stats.TotalRequestCount++; - SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes); - SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes); - if (ElapsedUs > 0) - { - uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs; - SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); - } - } - - const std::filesystem::path m_StoragePath; - BuildStorage::Statistics& m_Stats; - const bool m_EnableJsonOutput = false; - std::atomic<uint64_t> m_WrittenBytes; - - const double m_LatencySec = 0.0; - const double m_DelayPerKBSec = 0.0; -}; - -std::unique_ptr<BuildStorage> -CreateFileBuildStorage(const std::filesystem::path& StoragePath, - BuildStorage::Statistics& Stats, - bool EnableJsonOutput, - double LatencySec, - double DelayPerKBSec) -{ - return std::make_unique<FileBuildStorage>(StoragePath, Stats, EnableJsonOutput, LatencySec, DelayPerKBSec); -} - -} // namespace zen diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h deleted file mode 100644 index 46ecd0a11..000000000 --- a/src/zenutil/include/zenutil/buildstorage.h +++ /dev/null @@ -1,68 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/compactbinary.h> -#include <zenutil/chunkblock.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - -namespace zen { - -class BuildStorage -{ -public: - struct Statistics - { - std::atomic<uint64_t> TotalBytesRead = 0; - std::atomic<uint64_t> TotalBytesWritten = 0; - std::atomic<uint64_t> TotalRequestCount = 0; - std::atomic<uint64_t> TotalRequestTimeUs = 0; - std::atomic<uint64_t> TotalExecutionTimeUs = 0; - std::atomic<uint64_t> PeakSentBytes = 0; - std::atomic<uint64_t> PeakReceivedBytes = 0; - std::atomic<uint64_t> PeakBytesPerSec = 0; - }; - - virtual ~BuildStorage() {} - - virtual CbObject ListNamespaces(bool bRecursive = false) = 0; - virtual CbObject ListBuilds(CbObject Query) = 0; - virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) = 0; - virtual CbObject GetBuild(const Oid& BuildId) = 0; - virtual void FinalizeBuild(const Oid& BuildId) = 0; - - virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId, - const Oid& BuildPartId, - std::string_view PartName, - const CbObject& MetaData) = 0; - virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) = 0; - virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) = 0; - virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0; - virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - uint64_t PayloadSize, - std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, - std::function<void(uint64_t, bool)>&& OnSentBytes) = 0; - - virtual IoBuffer GetBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - uint64_t RangeOffset = 0, - uint64_t RangeBytes = (uint64_t)-1) = 0; - virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, - std::function<void()>&& OnComplete) = 0; - - [[nodiscard]] virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0; - virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0; - virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0; - - virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0; -}; - -} // namespace zen diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h deleted file mode 100644 index e6ca2c5e4..000000000 --- a/src/zenutil/include/zenutil/buildstoragecache.h +++ /dev/null @@ -1,69 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/logging.h> - -#include <zencore/compactbinary.h> -#include <zencore/compositebuffer.h> -#include <zenutil/chunkblock.h> - -namespace zen { - -class HttpClient; - -class BuildStorageCache -{ -public: - struct Statistics - { - std::atomic<uint64_t> TotalBytesRead = 0; - std::atomic<uint64_t> TotalBytesWritten = 0; - std::atomic<uint64_t> TotalRequestCount = 0; - std::atomic<uint64_t> TotalRequestTimeUs = 0; - std::atomic<uint64_t> TotalExecutionTimeUs = 0; - std::atomic<uint64_t> PeakSentBytes = 0; - std::atomic<uint64_t> PeakReceivedBytes = 0; - std::atomic<uint64_t> PeakBytesPerSec = 0; - }; - - virtual ~BuildStorageCache() {} - - virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0; - virtual IoBuffer GetBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - uint64_t RangeOffset = 0, - uint64_t RangeBytes = (uint64_t)-1) = 0; - - virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0; - virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; - - struct BlobExistsResult - { - bool HasBody = 0; - bool HasMetadata = 0; - }; - - virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0; - - virtual void Flush( - int32_t UpdateIntervalMS, - std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0; -}; - -std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient, - BuildStorageCache::Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - const std::filesystem::path& TempFolderPath, - bool BoostBackgroundThreadCount); - -struct ZenCacheEndpointTestResult -{ - bool Success = false; - std::string FailureReason; -}; - -ZenCacheEndpointTestResult TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2); - -} // namespace zen diff --git a/src/zenutil/include/zenutil/filebuildstorage.h b/src/zenutil/include/zenutil/filebuildstorage.h deleted file mode 100644 index c95fb32e6..000000000 --- a/src/zenutil/include/zenutil/filebuildstorage.h +++ /dev/null @@ -1,16 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/logging.h> -#include <zenutil/buildstorage.h> - -namespace zen { -class HttpClient; - -std::unique_ptr<BuildStorage> CreateFileBuildStorage(const std::filesystem::path& StoragePath, - BuildStorage::Statistics& Stats, - bool EnableJsonOutput, - double LatencySec = 0.0, - double DelayPerKBSec = 0.0); -} // namespace zen diff --git a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h deleted file mode 100644 index f25d8933b..000000000 --- a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#pragma once - -#include <zencore/logging.h> -#include <zenutil/buildstorage.h> - -namespace zen { -class HttpClient; - -std::unique_ptr<BuildStorage> CreateJupiterBuildStorage(LoggerRef InLog, - HttpClient& InHttpClient, - BuildStorage::Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - bool AllowRedirect, - const std::filesystem::path& TempFolderPath); - -bool ParseBuildStorageUrl(std::string_view InUrl, - std::string& OutHost, - std::string& OutNamespace, - std::string& OutBucket, - std::string& OutBuildId); - -} // namespace zen diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp deleted file mode 100644 index 6eb3489dc..000000000 --- a/src/zenutil/jupiter/jupiterbuildstorage.cpp +++ /dev/null @@ -1,561 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/jupiter/jupiterbuildstorage.h> - -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryutil.h> -#include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> -#include <zencore/timer.h> -#include <zencore/trace.h> -#include <zenutil/jupiter/jupitersession.h> - -ZEN_THIRD_PARTY_INCLUDES_START -#include <tsl/robin_map.h> -ZEN_THIRD_PARTY_INCLUDES_END - -#include <regex> - -namespace zen { - -using namespace std::literals; - -namespace { - void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix) - { - int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0; - HttpResponseCode Status = - Result.ErrorCode >= int(HttpResponseCode::Continue) ? HttpResponseCode(Result.ErrorCode) : HttpResponseCode::ImATeapot; - throw HttpClientError(fmt::format("{}: {} ({})", Prefix, Result.Reason, Result.ErrorCode), Error, Status); - } -} // namespace - -class JupiterBuildStorage : public BuildStorage -{ -public: - JupiterBuildStorage(LoggerRef InLog, - HttpClient& InHttpClient, - Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - bool AllowRedirect, - const std::filesystem::path& TempFolderPath) - : m_Session(InLog, InHttpClient, AllowRedirect) - , m_Stats(Stats) - , m_Namespace(Namespace) - , m_Bucket(Bucket) - , m_TempFolderPath(TempFolderPath) - { - } - virtual ~JupiterBuildStorage() {} - - virtual CbObject ListNamespaces(bool bRecursive) override - { - ZEN_TRACE_CPU("Jupiter::ListNamespaces"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult ListResult = m_Session.ListBuildNamespaces(); - AddStatistic(ListResult); - if (!ListResult.Success) - { - ThrowFromJupiterResult(ListResult, "Failed listing namespaces"); - } - CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response); - - CbObjectWriter Response; - Response.BeginArray("results"sv); - for (CbFieldView NamespaceField : NamespaceResponse["namespaces"]) - { - std::string_view Namespace = NamespaceField.AsString(); - if (!Namespace.empty()) - { - Response.BeginObject(); - Response.AddString("name", Namespace); - - if (bRecursive) - { - JupiterResult BucketsResult = m_Session.ListBuildBuckets(Namespace); - AddStatistic(BucketsResult); - if (!BucketsResult.Success) - { - ThrowFromJupiterResult(BucketsResult, fmt::format("Failed listing buckets in namespace {}", Namespace)); - } - CbObject BucketResponse = - PayloadToCbObject(fmt::format("Failed listing buckets in namespace {}", Namespace), BucketsResult.Response); - - Response.BeginArray("items"); - for (CbFieldView BucketField : BucketResponse["buckets"]) - { - std::string_view Bucket = BucketField.AsString(); - if (!Bucket.empty()) - { - Response.AddString(Bucket); - } - } - Response.EndArray(); - } - - Response.EndObject(); - } - } - Response.EndArray(); - - return Response.Save(); - } - - virtual CbObject ListBuilds(CbObject Query) override - { - ZEN_TRACE_CPU("Jupiter::ListBuilds"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - IoBuffer Payload = Query.GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult ListResult = m_Session.ListBuilds(m_Namespace, m_Bucket, Payload); - AddStatistic(ListResult); - if (!ListResult.Success) - { - ThrowFromJupiterResult(ListResult, "Failed listing builds"sv); - } - return PayloadToCbObject("Failed listing builds"sv, ListResult.Response); - } - - virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override - { - ZEN_TRACE_CPU("Jupiter::PutBuild"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutResult = m_Session.PutBuild(m_Namespace, m_Bucket, BuildId, Payload); - AddStatistic(PutResult); - if (!PutResult.Success) - { - ThrowFromJupiterResult(PutResult, "Failed creating build"sv); - } - return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response); - } - - virtual CbObject GetBuild(const Oid& BuildId) override - { - ZEN_TRACE_CPU("Jupiter::GetBuild"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult GetBuildResult = m_Session.GetBuild(m_Namespace, m_Bucket, BuildId); - AddStatistic(GetBuildResult); - if (!GetBuildResult.Success) - { - ThrowFromJupiterResult(GetBuildResult, "Failed fetching build"sv); - } - return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response); - } - - virtual void FinalizeBuild(const Oid& BuildId) override - { - ZEN_TRACE_CPU("Jupiter::FinalizeBuild"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult FinalizeBuildResult = m_Session.FinalizeBuild(m_Namespace, m_Bucket, BuildId); - AddStatistic(FinalizeBuildResult); - if (!FinalizeBuildResult.Success) - { - ThrowFromJupiterResult(FinalizeBuildResult, "Failed finalizing build"sv); - } - } - - virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId, - const Oid& BuildPartId, - std::string_view PartName, - const CbObject& MetaData) override - { - ZEN_TRACE_CPU("Jupiter::PutBuildPart"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - PutBuildPartResult PutPartResult = m_Session.PutBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartName, Payload); - AddStatistic(PutPartResult); - if (!PutPartResult.Success) - { - ThrowFromJupiterResult(PutPartResult, "Failed creating build part"sv); - } - return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs)); - } - - virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override - { - ZEN_TRACE_CPU("Jupiter::GetBuildPart"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult GetBuildPartResult = m_Session.GetBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId); - AddStatistic(GetBuildPartResult); - if (!GetBuildPartResult.Success) - { - ThrowFromJupiterResult(GetBuildPartResult, "Failed fetching build part"sv); - } - return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response); - } - - virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override - { - ZEN_TRACE_CPU("Jupiter::FinalizeBuildPart"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - FinalizeBuildPartResult FinalizePartResult = m_Session.FinalizeBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartHash); - AddStatistic(FinalizePartResult); - if (!FinalizePartResult.Success) - { - ThrowFromJupiterResult(FinalizePartResult, "Failed finalizing build part"sv); - } - return std::move(FinalizePartResult.Needs); - } - - virtual void PutBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - const CompositeBuffer& Payload) override - { - ZEN_TRACE_CPU("Jupiter::PutBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult PutBlobResult = m_Session.PutBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, Payload); - AddStatistic(PutBlobResult); - if (!PutBlobResult.Success) - { - ThrowFromJupiterResult(PutBlobResult, "Failed putting build part"sv); - } - } - - virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - uint64_t PayloadSize, - std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, - std::function<void(uint64_t, bool)>&& OnSentBytes) override - { - ZEN_TRACE_CPU("Jupiter::PutLargeBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - std::vector<std::function<JupiterResult(bool&)>> WorkItems; - JupiterResult PutMultipartBlobResult = m_Session.PutMultipartBuildBlob(m_Namespace, - m_Bucket, - BuildId, - RawHash, - ContentType, - PayloadSize, - std::move(Transmitter), - WorkItems); - AddStatistic(PutMultipartBlobResult); - if (!PutMultipartBlobResult.Success) - { - ThrowFromJupiterResult(PutMultipartBlobResult, "Failed putting large build blob"sv); - } - OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty()); - - std::vector<std::function<void()>> WorkList; - for (auto& WorkItem : WorkItems) - { - WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() { - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - bool IsComplete = false; - JupiterResult PartResult = WorkItem(IsComplete); - AddStatistic(PartResult); - if (!PartResult.Success) - { - ThrowFromJupiterResult(PartResult, "Failed putting large build blob"sv); - } - OnSentBytes(PartResult.SentBytes, IsComplete); - }); - } - return WorkList; - } - - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override - { - ZEN_TRACE_CPU("Jupiter::GetBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - CreateDirectories(m_TempFolderPath); - JupiterResult GetBuildBlobResult = - m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes); - AddStatistic(GetBuildBlobResult); - if (!GetBuildBlobResult.Success) - { - ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob"sv); - } - return std::move(GetBuildBlobResult.Response); - } - - virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, - std::function<void()>&& OnComplete) override - { - ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - std::vector<std::function<JupiterResult()>> WorkItems; - JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace, - m_Bucket, - BuildId, - RawHash, - ChunkSize, - std::move(OnReceive), - std::move(OnComplete), - WorkItems); - - AddStatistic(GetMultipartBlobResult); - if (!GetMultipartBlobResult.Success) - { - ThrowFromJupiterResult(GetMultipartBlobResult, "Failed getting large build part"sv); - } - std::vector<std::function<void()>> WorkList; - for (auto& WorkItem : WorkItems) - { - WorkList.emplace_back([this, WorkItem = std::move(WorkItem)]() { - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult PartResult = WorkItem(); - AddStatistic(PartResult); - if (!PartResult.Success) - { - ThrowFromJupiterResult(PartResult, "Failed getting large build part"sv); - } - }); - } - return WorkList; - } - - virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override - { - ZEN_TRACE_CPU("Jupiter::PutBlockMetadata"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutMetaResult = m_Session.PutBlockMetadata(m_Namespace, m_Bucket, BuildId, BlockRawHash, Payload); - AddStatistic(PutMetaResult); - if (!PutMetaResult.Success) - { - if (PutMetaResult.ErrorCode == int32_t(HttpResponseCode::NotFound)) - { - return false; - } - ThrowFromJupiterResult(PutMetaResult, "Failed putting build block metadata"sv); - } - return true; - } - - virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override - { - ZEN_TRACE_CPU("Jupiter::FindBlocks"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount); - AddStatistic(FindResult); - if (!FindResult.Success) - { - ThrowFromJupiterResult(FindResult, "Failed fetching known blocks"sv); - } - return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response); - } - - virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override - { - ZEN_TRACE_CPU("Jupiter::GetBlockMetadata"); - - Stopwatch ExecutionTimer; - auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); }); - CbObjectWriter Request; - - Request.BeginArray("blocks"sv); - for (const IoHash& BlockHash : BlockHashes) - { - Request.AddHash(BlockHash); - } - Request.EndArray(); - - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult GetBlockMetadataResult = m_Session.GetBlockMetadata(m_Namespace, m_Bucket, BuildId, Payload); - AddStatistic(GetBlockMetadataResult); - if (!GetBlockMetadataResult.Success) - { - ThrowFromJupiterResult(GetBlockMetadataResult, "Failed fetching block metadatas"sv); - } - return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response); - } - - virtual void PutBuildPartStats(const Oid& BuildId, - const Oid& BuildPartId, - const tsl::robin_map<std::string, double>& FloatStats) override - { - ZEN_UNUSED(BuildId, BuildPartId, FloatStats); - CbObjectWriter Request; - Request.BeginObject("floatStats"sv); - for (auto It : FloatStats) - { - Request.AddFloat(It.first, It.second); - } - Request.EndObject(); - IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer(); - Payload.SetContentType(ZenContentType::kCbObject); - JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload); - AddStatistic(PutBuildPartStatsResult); - if (!PutBuildPartStatsResult.Success) - { - ThrowFromJupiterResult(PutBuildPartStatsResult, "Failed posting build part statistics"sv); - } - } - -private: - static CbObject PayloadToCbObject(std::string_view ErrorContext, const IoBuffer& Payload) - { - if (Payload.GetContentType() == ZenContentType::kJSON) - { - std::string_view Json(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize()); - return LoadCompactBinaryFromJson(Json).AsObject(); - } - else if (Payload.GetContentType() == ZenContentType::kCbObject) - { - CbValidateError ValidateResult = CbValidateError::None; - if (CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(Payload), ValidateResult); - ValidateResult == CbValidateError::None) - { - return Object; - } - else - { - throw std::runtime_error(fmt::format("{}: {} ({})", - "Invalid compact binary object: '{}'", - ErrorContext, - ToString(Payload.GetContentType()), - ToString(ValidateResult))); - } - } - else if (Payload.GetContentType() == ZenContentType::kCompressedBinary) - { - IoHash RawHash; - uint64_t RawSize; - CbValidateError ValidateResult = CbValidateError::None; - if (CbObject Object = - ValidateAndReadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize), - ValidateResult); - ValidateResult == CbValidateError::None) - { - return Object; - } - else - { - throw std::runtime_error(fmt::format("{}: {} ({})", - "Invalid compresed compact binary object: '{}'", - ErrorContext, - ToString(Payload.GetContentType()), - ToString(ValidateResult))); - } - } - else - { - throw std::runtime_error( - fmt::format("{}: {} ({})", "Unsupported response format", ErrorContext, ToString(Payload.GetContentType()))); - } - } - - void AddStatistic(const JupiterResult& Result) - { - m_Stats.TotalBytesWritten += Result.SentBytes; - m_Stats.TotalBytesRead += Result.ReceivedBytes; - m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0); - m_Stats.TotalRequestCount++; - - SetAtomicMax(m_Stats.PeakSentBytes, Result.SentBytes); - SetAtomicMax(m_Stats.PeakReceivedBytes, Result.ReceivedBytes); - if (Result.ElapsedSeconds > 0.0) - { - uint64_t BytesPerSec = uint64_t((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds); - SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); - } - } - - JupiterSession m_Session; - Statistics& m_Stats; - const std::string m_Namespace; - const std::string m_Bucket; - const std::filesystem::path m_TempFolderPath; -}; - -std::unique_ptr<BuildStorage> -CreateJupiterBuildStorage(LoggerRef InLog, - HttpClient& InHttpClient, - BuildStorage::Statistics& Stats, - std::string_view Namespace, - std::string_view Bucket, - bool AllowRedirect, - const std::filesystem::path& TempFolderPath) -{ - ZEN_TRACE_CPU("CreateJupiterBuildStorage"); - - return std::make_unique<JupiterBuildStorage>(InLog, InHttpClient, Stats, Namespace, Bucket, AllowRedirect, TempFolderPath); -} - -bool -ParseBuildStorageUrl(std::string_view InUrl, - std::string& OutHost, - std::string& OutNamespace, - std::string& OutBucket, - std::string& OutBuildId) -{ - std::string Url(InUrl); - const std::string_view ExtendedApiString = "api/v2/builds/"; - if (auto ApiString = ToLower(Url).find(ExtendedApiString); ApiString != std::string::npos) - { - Url.erase(ApiString, ExtendedApiString.length()); - } - - const std::string ArtifactURLRegExString = R"((http[s]?:\/\/.*?)\/(.*?)\/(.*?)\/(.*))"; - const std::regex ArtifactURLRegEx(ArtifactURLRegExString, std::regex::ECMAScript | std::regex::icase); - std::match_results<std::string_view::const_iterator> MatchResults; - std::string_view UrlToParse(Url); - if (regex_match(begin(UrlToParse), end(UrlToParse), MatchResults, ArtifactURLRegEx) && MatchResults.size() == 5) - { - auto GetMatch = [&MatchResults](uint32_t Index) -> std::string_view { - ZEN_ASSERT(Index < MatchResults.size()); - - const auto& Match = MatchResults[Index]; - - return std::string_view(&*Match.first, Match.second - Match.first); - }; - - const std::string_view Host = GetMatch(1); - const std::string_view Namespace = GetMatch(2); - const std::string_view Bucket = GetMatch(3); - const std::string_view BuildId = GetMatch(4); - - OutHost = Host; - OutNamespace = Namespace; - OutBucket = Bucket; - OutBuildId = BuildId; - return true; - } - else - { - return false; - } -} - -} // namespace zen |