aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenutil')
-rw-r--r--src/zenutil/buildstoragecache.cpp434
-rw-r--r--src/zenutil/filebuildstorage.cpp806
-rw-r--r--src/zenutil/include/zenutil/buildstorage.h68
-rw-r--r--src/zenutil/include/zenutil/buildstoragecache.h69
-rw-r--r--src/zenutil/include/zenutil/filebuildstorage.h16
-rw-r--r--src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h25
-rw-r--r--src/zenutil/jupiter/jupiterbuildstorage.cpp561
7 files changed, 0 insertions, 1979 deletions
diff --git a/src/zenutil/buildstoragecache.cpp b/src/zenutil/buildstoragecache.cpp
deleted file mode 100644
index 376d967d1..000000000
--- a/src/zenutil/buildstoragecache.cpp
+++ /dev/null
@@ -1,434 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/buildstoragecache.h>
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/fmtutils.h>
-#include <zencore/scopeguard.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-#include <zencore/workthreadpool.h>
-#include <zenhttp/httpclient.h>
-#include <zenhttp/packageformat.h>
-#include <zenutil/workerpools.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <tsl/robin_set.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-namespace zen {
-
-using namespace std::literals;
-
-class ZenBuildStorageCache : public BuildStorageCache
-{
-public:
- explicit ZenBuildStorageCache(HttpClient& HttpClient,
- BuildStorageCache::Statistics& Stats,
- std::string_view Namespace,
- std::string_view Bucket,
- const std::filesystem::path& TempFolderPath,
- bool BoostBackgroundThreadCount)
- : m_HttpClient(HttpClient)
- , m_Stats(Stats)
- , m_Namespace(Namespace.empty() ? "none" : Namespace)
- , m_Bucket(Bucket.empty() ? "none" : Bucket)
- , m_TempFolderPath(std::filesystem::path(TempFolderPath).make_preferred())
- , m_BoostBackgroundThreadCount(BoostBackgroundThreadCount)
- , m_BackgroundWorkPool(m_BoostBackgroundThreadCount ? GetSmallWorkerPool(EWorkloadType::Background)
- : GetTinyWorkerPool(EWorkloadType::Background))
- , m_PendingBackgroundWorkCount(1)
- , m_CancelBackgroundWork(false)
- {
- }
-
- virtual ~ZenBuildStorageCache()
- {
- try
- {
- m_CancelBackgroundWork.store(true);
- if (!IsFlushed)
- {
- m_PendingBackgroundWorkCount.CountDown();
- m_PendingBackgroundWorkCount.Wait();
- }
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("~ZenBuildStorageCache() failed with: {}", Ex.what());
- }
- }
-
- void ScheduleBackgroundWork(std::function<void()>&& Work)
- {
- m_PendingBackgroundWorkCount.AddCount(1);
- try
- {
- m_BackgroundWorkPool.ScheduleWork(
- [this, Work = std::move(Work)]() {
- ZEN_TRACE_CPU("ZenBuildStorageCache::BackgroundWork");
- auto _ = MakeGuard([this]() { m_PendingBackgroundWorkCount.CountDown(); });
- if (!m_CancelBackgroundWork)
- {
- try
- {
- Work();
- }
- catch (const std::exception& Ex)
- {
- ZEN_ERROR("Failed executing background upload to build cache. Reason: {}", Ex.what());
- }
- }
- },
- WorkerThreadPool::EMode::EnableBacklog);
- }
- catch (const std::exception& Ex)
- {
- m_PendingBackgroundWorkCount.CountDown();
- ZEN_ERROR("Failed scheduling background upload to build cache. Reason: {}", Ex.what());
- }
- }
-
- virtual void PutBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload) override
- {
- ZEN_ASSERT(!IsFlushed);
- ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
- ScheduleBackgroundWork(
- [this, BuildId = Oid(BuildId), RawHash = IoHash(RawHash), ContentType, Payload = CompositeBuffer(Payload)]() {
- ZEN_TRACE_CPU("ZenBuildStorageCache::PutBuildBlob");
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
-
- HttpClient::Response CacheResponse =
- m_HttpClient.Upload(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
- Payload,
- ContentType);
- AddStatistic(CacheResponse);
- if (!CacheResponse.IsSuccess())
- {
- ZEN_DEBUG("Failed posting blob to cache: {}", CacheResponse.ErrorMessage(""sv));
- }
- });
- }
-
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
- {
- ZEN_TRACE_CPU("ZenBuildStorageCache::GetBuildBlob");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
-
- HttpClient::KeyValueMap Headers;
- if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
- {
- Headers.Entries.insert({"Range", fmt::format("bytes={}-{}", RangeOffset, RangeOffset + RangeBytes - 1)});
- }
- CreateDirectories(m_TempFolderPath);
- HttpClient::Response CacheResponse =
- m_HttpClient.Download(fmt::format("/builds/{}/{}/{}/blobs/{}", m_Namespace, m_Bucket, BuildId, RawHash.ToHexString()),
- m_TempFolderPath,
- Headers);
- AddStatistic(CacheResponse);
- if (CacheResponse.IsSuccess())
- {
- return CacheResponse.ResponsePayload;
- }
- return {};
- }
-
- virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) override
- {
- ZEN_ASSERT(!IsFlushed);
- ScheduleBackgroundWork([this,
- BuildId = Oid(BuildId),
- BlobRawHashes = std::vector<IoHash>(BlobHashes.begin(), BlobHashes.end()),
- MetaDatas = std::vector<CbObject>(MetaDatas.begin(), MetaDatas.end())]() {
- ZEN_TRACE_CPU("ZenBuildStorageCache::PutBlobMetadatas");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
-
- const uint64_t BlobCount = BlobRawHashes.size();
-
- CbPackage RequestPackage;
- std::vector<CbAttachment> Attachments;
- tsl::robin_set<IoHash, IoHash::Hasher> AttachmentHashes;
- Attachments.reserve(BlobCount);
- AttachmentHashes.reserve(BlobCount);
- {
- CbObjectWriter RequestWriter;
- RequestWriter.BeginArray("blobHashes");
- for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
- {
- RequestWriter.AddHash(BlobRawHashes[BlockHashIndex]);
- }
- RequestWriter.EndArray(); // blobHashes
-
- RequestWriter.BeginArray("metadatas");
- for (size_t BlockHashIndex = 0; BlockHashIndex < BlobRawHashes.size(); BlockHashIndex++)
- {
- const IoHash ObjectHash = MetaDatas[BlockHashIndex].GetHash();
- RequestWriter.AddBinaryAttachment(ObjectHash);
- if (!AttachmentHashes.contains(ObjectHash))
- {
- Attachments.push_back(CbAttachment(MetaDatas[BlockHashIndex], ObjectHash));
- AttachmentHashes.insert(ObjectHash);
- }
- }
-
- RequestWriter.EndArray(); // metadatas
-
- RequestPackage.SetObject(RequestWriter.Save());
- }
- RequestPackage.AddAttachments(Attachments);
-
- CompositeBuffer RpcRequestBuffer = FormatPackageMessageBuffer(RequestPackage);
-
- HttpClient::Response CacheResponse =
- m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/putBlobMetadata", m_Namespace, m_Bucket, BuildId),
- RpcRequestBuffer,
- ZenContentType::kCbPackage);
- AddStatistic(CacheResponse);
- if (!CacheResponse.IsSuccess())
- {
- ZEN_DEBUG("Failed posting blob metadata to cache: {}", CacheResponse.ErrorMessage(""sv));
- }
- });
- }
-
- virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) override
- {
- ZEN_TRACE_CPU("ZenBuildStorageCache::GetBlobMetadatas");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
-
- CbObjectWriter Request;
-
- Request.BeginArray("blobHashes"sv);
- for (const IoHash& BlobHash : BlobHashes)
- {
- Request.AddHash(BlobHash);
- }
- Request.EndArray();
-
- IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
- Payload.SetContentType(ZenContentType::kCbObject);
-
- HttpClient::Response Response =
- m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/getBlobMetadata", m_Namespace, m_Bucket, BuildId),
- Payload,
- HttpClient::Accept(ZenContentType::kCbObject));
- AddStatistic(Response);
- if (Response.IsSuccess())
- {
- std::vector<CbObject> Result;
-
- CbPackage ResponsePackage = ParsePackageMessage(Response.ResponsePayload);
- CbObject ResponseObject = ResponsePackage.GetObject();
-
- CbArrayView BlobHashArray = ResponseObject["blobHashes"sv].AsArrayView();
- CbArrayView MetadatasArray = ResponseObject["metadatas"sv].AsArrayView();
- Result.reserve(MetadatasArray.Num());
- auto BlobHashesIt = BlobHashes.begin();
- auto BlobHashArrayIt = begin(BlobHashArray);
- auto MetadataArrayIt = begin(MetadatasArray);
- while (MetadataArrayIt != end(MetadatasArray))
- {
- const IoHash BlobHash = (*BlobHashArrayIt).AsHash();
- while (BlobHash != *BlobHashesIt)
- {
- ZEN_ASSERT(BlobHashesIt != BlobHashes.end());
- BlobHashesIt++;
- }
-
- ZEN_ASSERT(BlobHash == *BlobHashesIt);
-
- const IoHash MetaHash = (*MetadataArrayIt).AsAttachment();
- const CbAttachment* MetaAttachment = ResponsePackage.FindAttachment(MetaHash);
- ZEN_ASSERT(MetaAttachment);
-
- CbObject Metadata = MetaAttachment->AsObject();
- Result.emplace_back(std::move(Metadata));
-
- BlobHashArrayIt++;
- MetadataArrayIt++;
- BlobHashesIt++;
- }
- return Result;
- }
- return {};
- }
-
- virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) override
- {
- ZEN_TRACE_CPU("ZenBuildStorageCache::BlobsExists");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
-
- CbObjectWriter Request;
-
- Request.BeginArray("blobHashes"sv);
- for (const IoHash& BlobHash : BlobHashes)
- {
- Request.AddHash(BlobHash);
- }
- Request.EndArray();
-
- IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
- Payload.SetContentType(ZenContentType::kCbObject);
-
- HttpClient::Response Response = m_HttpClient.Post(fmt::format("/builds/{}/{}/{}/blobs/exists", m_Namespace, m_Bucket, BuildId),
- Payload,
- HttpClient::Accept(ZenContentType::kCbObject));
- AddStatistic(Response);
- if (Response.IsSuccess())
- {
- CbObject ResponseObject = LoadCompactBinaryObject(Response.ResponsePayload);
- if (!ResponseObject)
- {
- throw std::runtime_error("BlobExists reponse is invalid, failed to load payload as compact binary object");
- }
- CbArrayView BlobsExistsArray = ResponseObject["blobExists"sv].AsArrayView();
- if (!BlobsExistsArray)
- {
- throw std::runtime_error("BlobExists reponse is invalid, 'blobExists' array is missing");
- }
- if (BlobsExistsArray.Num() != BlobHashes.size())
- {
- throw std::runtime_error(fmt::format("BlobExists reponse is invalid, 'blobExists' array contains {} entries, expected {}",
- BlobsExistsArray.Num(),
- BlobHashes.size()));
- }
-
- CbArrayView MetadatasExistsArray = ResponseObject["metadataExists"sv].AsArrayView();
- if (!MetadatasExistsArray)
- {
- throw std::runtime_error("BlobExists reponse is invalid, 'metadataExists' array is missing");
- }
- if (MetadatasExistsArray.Num() != BlobHashes.size())
- {
- throw std::runtime_error(
- fmt::format("BlobExists reponse is invalid, 'metadataExists' array contains {} entries, expected {}",
- MetadatasExistsArray.Num(),
- BlobHashes.size()));
- }
-
- std::vector<BlobExistsResult> Result;
- Result.reserve(BlobHashes.size());
- auto BlobExistsIt = begin(BlobsExistsArray);
- auto MetadataExistsIt = begin(MetadatasExistsArray);
- while (BlobExistsIt != end(BlobsExistsArray))
- {
- ZEN_ASSERT(MetadataExistsIt != end(MetadatasExistsArray));
-
- const bool HasBody = (*BlobExistsIt).AsBool();
- const bool HasMetadata = (*MetadataExistsIt).AsBool();
-
- Result.push_back({.HasBody = HasBody, .HasMetadata = HasMetadata});
-
- BlobExistsIt++;
- MetadataExistsIt++;
- }
- return Result;
- }
- return {};
- }
-
- virtual void Flush(int32_t UpdateIntervalMS, std::function<bool(intptr_t Remaining)>&& UpdateCallback) override
- {
- if (IsFlushed)
- {
- return;
- }
- if (!IsFlushed)
- {
- m_PendingBackgroundWorkCount.CountDown();
- IsFlushed = true;
- }
- if (m_PendingBackgroundWorkCount.Wait(100))
- {
- return;
- }
- while (true)
- {
- intptr_t Remaining = m_PendingBackgroundWorkCount.Remaining();
- if (UpdateCallback(Remaining))
- {
- if (m_PendingBackgroundWorkCount.Wait(UpdateIntervalMS))
- {
- UpdateCallback(0);
- return;
- }
- }
- else
- {
- m_CancelBackgroundWork.store(true);
- }
- }
- }
-
-private:
- void AddStatistic(const HttpClient::Response& Result)
- {
- m_Stats.TotalBytesWritten += Result.UploadedBytes;
- m_Stats.TotalBytesRead += Result.DownloadedBytes;
- m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
- m_Stats.TotalRequestCount++;
- SetAtomicMax(m_Stats.PeakSentBytes, Result.UploadedBytes);
- SetAtomicMax(m_Stats.PeakReceivedBytes, Result.DownloadedBytes);
- if (Result.ElapsedSeconds > 0.0)
- {
- uint64_t BytesPerSec = uint64_t((Result.UploadedBytes + Result.DownloadedBytes) / Result.ElapsedSeconds);
- SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
- }
- }
-
- HttpClient& m_HttpClient;
- BuildStorageCache::Statistics& m_Stats;
- const std::string m_Namespace;
- const std::string m_Bucket;
- const std::filesystem::path m_TempFolderPath;
- const bool m_BoostBackgroundThreadCount;
- bool IsFlushed = false;
-
- WorkerThreadPool& m_BackgroundWorkPool;
- Latch m_PendingBackgroundWorkCount;
- std::atomic<bool> m_CancelBackgroundWork;
-};
-
-std::unique_ptr<BuildStorageCache>
-CreateZenBuildStorageCache(HttpClient& HttpClient,
- BuildStorageCache::Statistics& Stats,
- std::string_view Namespace,
- std::string_view Bucket,
- const std::filesystem::path& TempFolderPath,
- bool BoostBackgroundThreadCount)
-{
- return std::make_unique<ZenBuildStorageCache>(HttpClient, Stats, Namespace, Bucket, TempFolderPath, BoostBackgroundThreadCount);
-}
-
-ZenCacheEndpointTestResult
-TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2)
-{
- HttpClientSettings TestClientSettings{.LogCategory = "httpcacheclient",
- .ConnectTimeout = std::chrono::milliseconds{1000},
- .Timeout = std::chrono::milliseconds{2000},
- .AssumeHttp2 = AssumeHttp2,
- .AllowResume = true,
- .RetryCount = 0};
- HttpClient TestHttpClient(BaseUrl, TestClientSettings);
- HttpClient::Response TestResponse = TestHttpClient.Get("/status/builds");
- if (TestResponse.IsSuccess())
- {
- return {.Success = true};
- }
- return {.Success = false, .FailureReason = TestResponse.ErrorMessage("")};
-};
-
-} // namespace zen
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
deleted file mode 100644
index f75fe403f..000000000
--- a/src/zenutil/filebuildstorage.cpp
+++ /dev/null
@@ -1,806 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/filebuildstorage.h>
-
-#include <zencore/basicfile.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/fmtutils.h>
-#include <zencore/scopeguard.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-
-namespace zen {
-
-using namespace std::literals;
-
-class FileBuildStorage : public BuildStorage
-{
-public:
- explicit FileBuildStorage(const std::filesystem::path& StoragePath,
- BuildStorage::Statistics& Stats,
- bool EnableJsonOutput,
- double LatencySec,
- double DelayPerKBSec)
- : m_StoragePath(StoragePath)
- , m_Stats(Stats)
- , m_EnableJsonOutput(EnableJsonOutput)
- , m_LatencySec(LatencySec)
- , m_DelayPerKBSec(DelayPerKBSec)
- {
- CreateDirectories(GetBuildsFolder());
- CreateDirectories(GetBlobsFolder());
- CreateDirectories(GetBlobsMetadataFolder());
- }
-
- virtual ~FileBuildStorage() {}
-
- virtual CbObject ListNamespaces(bool bRecursive) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces");
- ZEN_UNUSED(bRecursive);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObjectWriter Writer;
- Writer.BeginArray("results");
- {
- }
- Writer.EndArray(); // results
-
- Writer.Finalize();
- ReceivedBytes = Writer.GetSaveSize();
- return Writer.Save();
- }
-
- virtual CbObject ListBuilds(CbObject Query) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::ListBuilds");
- ZEN_UNUSED(Query);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = Query.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BuildFolder = GetBuildsFolder();
- DirectoryContent Content;
- GetDirectoryContent(BuildFolder, DirectoryContentFlags::IncludeDirs, Content);
- CbObjectWriter Writer;
- Writer.BeginArray("results");
- {
- for (const std::filesystem::path& BuildPath : Content.Directories)
- {
- Oid BuildId = Oid::TryFromHexString(BuildPath.stem().string());
- if (BuildId != Oid::Zero)
- {
- Writer.BeginObject();
- {
- Writer.AddObjectId("buildId", BuildId);
- Writer.AddObject("metadata", ReadBuild(BuildId)["metadata"sv].AsObjectView());
- }
- Writer.EndObject();
- }
- }
- }
- Writer.EndArray(); // results
-
- Writer.Finalize();
- ReceivedBytes = Writer.GetSaveSize();
- return Writer.Save();
- }
-
- virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutBuild");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = MetaData.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObjectWriter BuildObject;
- BuildObject.AddObject("metadata", MetaData);
- BuildObject.AddInteger("chunkSize"sv, 32u * 1024u * 1024u);
- WriteBuild(BuildId, BuildObject.Save());
-
- CbObjectWriter BuildResponse;
- BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u);
- BuildResponse.Finalize();
- ReceivedBytes = BuildResponse.GetSaveSize();
- return BuildResponse.Save();
- }
-
- virtual CbObject GetBuild(const Oid& BuildId) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetBuild");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObject Build = ReadBuild(BuildId);
- ReceivedBytes = Build.GetSize();
- return Build;
- }
-
- virtual void FinalizeBuild(const Oid& BuildId) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
- }
-
- virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId,
- const Oid& BuildPartId,
- std::string_view PartName,
- const CbObject& MetaData) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = MetaData.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
- CreateDirectories(BuildPartDataPath.parent_path());
-
- TemporaryFile::SafeWriteFile(BuildPartDataPath, MetaData.GetView());
- m_WrittenBytes += MetaData.GetSize();
- WriteAsJson(BuildPartDataPath, MetaData);
-
- IoHash RawHash = IoHash::HashBuffer(MetaData.GetView());
-
- CbObjectWriter Writer;
- {
- CbObject BuildObject = ReadBuild(BuildId);
- CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
- CbObjectView MetaDataView = BuildObject["metadata"sv].AsObjectView();
-
- Writer.AddObject("metadata"sv, MetaDataView);
- Writer.BeginObject("parts"sv);
- {
- for (CbFieldView PartView : PartsObject)
- {
- if (PartView.GetName() != PartName)
- {
- Writer.AddObjectId(PartView.GetName(), PartView.AsObjectId());
- }
- }
- Writer.AddObjectId(PartName, BuildPartId);
- }
- Writer.EndObject(); // parts
- }
- WriteBuild(BuildId, Writer.Save());
-
- std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData);
-
- ReceivedBytes = sizeof(IoHash) * NeededAttachments.size();
-
- return std::make_pair(RawHash, std::move(NeededAttachments));
- }
-
- virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
-
- IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
-
- ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
-
- CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
-
- ReceivedBytes = BuildPartObject.GetSize();
-
- return BuildPartObject;
- }
-
- virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
- IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
-
- IoHash RawHash = IoHash::HashBuffer(Payload.GetView());
- if (RawHash != PartHash)
- {
- throw std::runtime_error(
- fmt::format("Failed finalizing build part {}: Expected hash {}, got {}", BuildPartId, PartHash, RawHash));
- }
-
- CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
- std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject));
-
- ReceivedBytes = NeededAttachments.size() * sizeof(IoHash);
-
- return NeededAttachments;
- }
-
- virtual void PutBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob");
- ZEN_UNUSED(BuildId);
- ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
-
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload));
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = Payload.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!IsFile(BlockPath))
- {
- CreateDirectories(BlockPath.parent_path());
- TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView());
- }
-
- ReceivedBytes = Payload.GetSize();
- }
-
- virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- ZenContentType ContentType,
- uint64_t PayloadSize,
- std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
- std::function<void(uint64_t, bool)>&& OnSentBytes) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob");
- ZEN_UNUSED(BuildId);
- ZEN_UNUSED(ContentType);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!IsFile(BlockPath))
- {
- CreateDirectories(BlockPath.parent_path());
-
- struct WorkloadData
- {
- std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter;
- std::function<void(uint64_t, bool)> OnSentBytes;
- TemporaryFile TempFile;
- std::atomic<size_t> PartsLeft;
- };
-
- std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->Transmitter = std::move(Transmitter);
- Workload->OnSentBytes = std::move(OnSentBytes);
- std::error_code Ec;
- Workload->TempFile.CreateTemporary(BlockPath.parent_path(), Ec);
-
- if (Ec)
- {
- throw std::runtime_error(
- fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
- }
-
- std::vector<std::function<void()>> WorkItems;
- uint64_t Offset = 0;
- while (Offset < PayloadSize)
- {
- uint64_t Size = Min(32u * 1024u * 1024u, PayloadSize - Offset);
-
- WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() {
- ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work");
- IoBuffer PartPayload = Workload->Transmitter(Offset, Size);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = PartPayload.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- std::error_code Ec;
- Workload->TempFile.Write(PartPayload, Offset, Ec);
- if (Ec)
- {
- throw std::runtime_error(fmt::format("Failed writing to temporary file '{}': {} ({})",
- Workload->TempFile.GetPath(),
- Ec.message(),
- Ec.value()));
- }
-
- const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1;
- if (IsLastPart)
- {
- Workload->TempFile.Flush();
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(Workload->TempFile.ReadAll())));
- Workload->TempFile.MoveTemporaryIntoPlace(BlockPath, Ec);
- if (Ec)
- {
- throw std::runtime_error(fmt::format("Failed moving temporary file '{}' to '{}': {} ({})",
- Workload->TempFile.GetPath(),
- BlockPath,
- Ec.message(),
- Ec.value()));
- }
- }
- Workload->OnSentBytes(SentBytes, IsLastPart);
- });
-
- Offset += Size;
- }
- Workload->PartsLeft.store(WorkItems.size());
- return WorkItems;
- }
- return {};
- }
-
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (IsFile(BlockPath))
- {
- BasicFile File(BlockPath, BasicFile::Mode::kRead);
- IoBuffer Payload;
- if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
- {
- Payload = IoBuffer(RangeBytes);
- File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset);
- }
- else
- {
- Payload = File.ReadAll();
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
- }
- Payload.SetContentType(ZenContentType::kCompressedBinary);
- ReceivedBytes = Payload.GetSize();
- return Payload;
- }
- return IoBuffer{};
- }
-
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
- std::function<void()>&& OnComplete) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (IsFile(BlockPath))
- {
- struct WorkloadData
- {
- std::atomic<uint64_t> BytesRemaining;
- BasicFile BlobFile;
- std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
- std::function<void()> OnComplete;
- };
-
- std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead);
- const uint64_t BlobSize = Workload->BlobFile.FileSize();
-
- Workload->OnReceive = std::move(OnReceive);
- Workload->OnComplete = std::move(OnComplete);
- Workload->BytesRemaining = BlobSize;
-
- std::vector<std::function<void()>> WorkItems;
- uint64_t Offset = 0;
- while (Offset < BlobSize)
- {
- uint64_t Size = Min(ChunkSize, BlobSize - Offset);
- WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() {
- ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work");
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- IoBuffer PartPayload(Size);
- Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
- ReceivedBytes = PartPayload.GetSize();
- Workload->OnReceive(Offset, PartPayload);
- uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size);
- if (ByteRemaning == Size)
- {
- Workload->OnComplete();
- }
- });
-
- Offset += Size;
- }
- return WorkItems;
- }
- return {};
- }
-
- virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = MetaData.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash);
- CreateDirectories(BlockMetaDataPath.parent_path());
- TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView());
- WriteAsJson(BlockMetaDataPath, MetaData);
- return true;
- }
-
- virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- uint64_t FoundCount = 0;
-
- DirectoryContent Content;
- GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content);
- CbObjectWriter Writer;
- Writer.BeginArray("blocks");
- for (const std::filesystem::path& MetaDataFile : Content.Files)
- {
- IoHash ChunkHash;
- if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash))
- {
- std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash);
- if (IsFile(BlockPath))
- {
- IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
-
- CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Writer.AddObject(BlockObject);
- FoundCount++;
- if (FoundCount == MaxBlockCount)
- {
- break;
- }
- }
- }
- }
- Writer.EndArray(); // blocks
-
- Writer.Finalize();
- ReceivedBytes = Writer.GetSaveSize();
- return Writer.Save();
- }
-
- virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObjectWriter Writer;
- Writer.BeginArray("blocks");
-
- for (const IoHash& BlockHash : BlockHashes)
- {
- std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash);
- if (IsFile(MetaDataFile))
- {
- IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
-
- CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Writer.AddObject(BlockObject);
- }
- }
- Writer.EndArray(); // blocks
- Writer.Finalize();
- ReceivedBytes = Writer.GetSaveSize();
- return Writer.Save();
- }
-
- virtual void PutBuildPartStats(const Oid& BuildId,
- const Oid& BuildPartId,
- const tsl::robin_map<std::string, double>& FloatStats) override
- {
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObjectWriter Request;
- Request.BeginObject("floatStats"sv);
- for (auto It : FloatStats)
- {
- Request.AddFloat(It.first, It.second);
- }
- Request.EndObject();
- Request.Finalize();
- SentBytes = Request.GetSaveSize();
-
- const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId);
- CreateDirectories(BuildPartStatsDataPath.parent_path());
-
- CbObject Payload = Request.Save();
-
- TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView());
- WriteAsJson(BuildPartStatsDataPath, Payload);
- }
-
-protected:
- std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; }
- std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; }
- std::filesystem::path GetBlobsMetadataFolder() const { return m_StoragePath / "blocks"; }
- std::filesystem::path GetBuildFolder(const Oid& BuildId) const { return GetBuildsFolder() / BuildId.ToString(); }
-
- std::filesystem::path GetBuildPath(const Oid& BuildId) const { return GetBuildFolder(BuildId) / "metadata.cb"; }
-
- std::filesystem::path GetBuildPartFolder(const Oid& BuildId, const Oid& BuildPartId) const
- {
- return GetBuildFolder(BuildId) / "parts" / BuildPartId.ToString();
- }
-
- std::filesystem::path GetBuildPartPath(const Oid& BuildId, const Oid& BuildPartId) const
- {
- return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb";
- }
-
- std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const
- {
- return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid());
- }
-
- std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); }
-
- std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const
- {
- return GetBlobsMetadataFolder() / fmt::format("{}.cb", RawHash);
- }
-
- void SimulateLatency(uint64_t ReceiveSize, uint64_t SendSize)
- {
- double SleepSec = m_LatencySec;
- if (m_DelayPerKBSec > 0.0)
- {
- SleepSec += m_DelayPerKBSec * (double(SendSize + ReceiveSize) / 1024u);
- }
- if (SleepSec > 0)
- {
- Sleep(int(SleepSec * 1000));
- }
- }
-
- void WriteAsJson(const std::filesystem::path& OriginalPath, CbObjectView Data) const
- {
- if (m_EnableJsonOutput)
- {
- ExtendableStringBuilder<128> SB;
- CompactBinaryToJson(Data, SB);
- std::filesystem::path JsonPath = OriginalPath;
- JsonPath.replace_extension(".json");
- std::string_view JsonMetaData = SB.ToView();
- TemporaryFile::SafeWriteFile(JsonPath, MemoryView(JsonMetaData.data(), JsonMetaData.length()));
- }
- }
-
- void WriteBuild(const Oid& BuildId, CbObjectView Data)
- {
- const std::filesystem::path BuildDataPath = GetBuildPath(BuildId);
- CreateDirectories(BuildDataPath.parent_path());
- TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView());
- WriteAsJson(BuildDataPath, Data);
- }
-
- CbObject ReadBuild(const Oid& BuildId)
- {
- const std::filesystem::path BuildDataPath = GetBuildPath(BuildId);
- FileContents Content = ReadFile(BuildDataPath);
- if (Content.ErrorCode)
- {
- throw std::runtime_error(fmt::format("Failed reading build '{}' from '{}': {} ({})",
- BuildId,
- BuildDataPath,
- Content.ErrorCode.message(),
- Content.ErrorCode.value()));
- }
- IoBuffer Payload = Content.Flatten();
- ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
- CbObject BuildObject = CbObject(SharedBuffer(Payload));
- return BuildObject;
- }
-
- std::vector<IoHash> GetNeededAttachments(CbObjectView BuildPartObject)
- {
- std::vector<IoHash> NeededAttachments;
- BuildPartObject.IterateAttachments([&](CbFieldView FieldView) {
- const IoHash AttachmentHash = FieldView.AsBinaryAttachment();
- const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash);
- if (!IsFile(BlockPath))
- {
- NeededAttachments.push_back(AttachmentHash);
- }
- });
- return NeededAttachments;
- }
-
- bool ValidateCompressedBuffer(const IoHash& RawHash, const CompositeBuffer& Payload)
- {
- IoHash VerifyHash;
- uint64_t VerifySize;
- CompressedBuffer ValidateBuffer = CompressedBuffer::FromCompressed(Payload, VerifyHash, VerifySize);
- if (!ValidateBuffer)
- {
- return false;
- }
- if (VerifyHash != RawHash)
- {
- return false;
- }
-
- IoHashStream Hash;
- bool CouldDecompress = ValidateBuffer.DecompressToStream(
- 0,
- (uint64_t)-1,
- [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset, SourceSize, Offset);
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
- {
- Hash.Append(Segment.GetView());
- }
- return true;
- });
- if (!CouldDecompress)
- {
- return false;
- }
- if (Hash.GetHash() != VerifyHash)
- {
- return false;
- }
- return true;
- }
-
-private:
- void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes)
- {
- const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs();
- m_Stats.TotalBytesWritten += UploadedBytes;
- m_Stats.TotalBytesRead += DownloadedBytes;
- m_Stats.TotalRequestTimeUs += ElapsedUs;
- m_Stats.TotalRequestCount++;
- SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes);
- SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes);
- if (ElapsedUs > 0)
- {
- uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs;
- SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
- }
- }
-
- const std::filesystem::path m_StoragePath;
- BuildStorage::Statistics& m_Stats;
- const bool m_EnableJsonOutput = false;
- std::atomic<uint64_t> m_WrittenBytes;
-
- const double m_LatencySec = 0.0;
- const double m_DelayPerKBSec = 0.0;
-};
-
-std::unique_ptr<BuildStorage>
-CreateFileBuildStorage(const std::filesystem::path& StoragePath,
- BuildStorage::Statistics& Stats,
- bool EnableJsonOutput,
- double LatencySec,
- double DelayPerKBSec)
-{
- return std::make_unique<FileBuildStorage>(StoragePath, Stats, EnableJsonOutput, LatencySec, DelayPerKBSec);
-}
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/buildstorage.h b/src/zenutil/include/zenutil/buildstorage.h
deleted file mode 100644
index 46ecd0a11..000000000
--- a/src/zenutil/include/zenutil/buildstorage.h
+++ /dev/null
@@ -1,68 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/compactbinary.h>
-#include <zenutil/chunkblock.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <tsl/robin_map.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-namespace zen {
-
-class BuildStorage
-{
-public:
- struct Statistics
- {
- std::atomic<uint64_t> TotalBytesRead = 0;
- std::atomic<uint64_t> TotalBytesWritten = 0;
- std::atomic<uint64_t> TotalRequestCount = 0;
- std::atomic<uint64_t> TotalRequestTimeUs = 0;
- std::atomic<uint64_t> TotalExecutionTimeUs = 0;
- std::atomic<uint64_t> PeakSentBytes = 0;
- std::atomic<uint64_t> PeakReceivedBytes = 0;
- std::atomic<uint64_t> PeakBytesPerSec = 0;
- };
-
- virtual ~BuildStorage() {}
-
- virtual CbObject ListNamespaces(bool bRecursive = false) = 0;
- virtual CbObject ListBuilds(CbObject Query) = 0;
- virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) = 0;
- virtual CbObject GetBuild(const Oid& BuildId) = 0;
- virtual void FinalizeBuild(const Oid& BuildId) = 0;
-
- virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId,
- const Oid& BuildPartId,
- std::string_view PartName,
- const CbObject& MetaData) = 0;
- virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) = 0;
- virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) = 0;
- virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0;
- virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- ZenContentType ContentType,
- uint64_t PayloadSize,
- std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
- std::function<void(uint64_t, bool)>&& OnSentBytes) = 0;
-
- virtual IoBuffer GetBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t RangeOffset = 0,
- uint64_t RangeBytes = (uint64_t)-1) = 0;
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
- std::function<void()>&& OnComplete) = 0;
-
- [[nodiscard]] virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) = 0;
- virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) = 0;
- virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) = 0;
-
- virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map<std::string, double>& FloatStats) = 0;
-};
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/buildstoragecache.h b/src/zenutil/include/zenutil/buildstoragecache.h
deleted file mode 100644
index e6ca2c5e4..000000000
--- a/src/zenutil/include/zenutil/buildstoragecache.h
+++ /dev/null
@@ -1,69 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/logging.h>
-
-#include <zencore/compactbinary.h>
-#include <zencore/compositebuffer.h>
-#include <zenutil/chunkblock.h>
-
-namespace zen {
-
-class HttpClient;
-
-class BuildStorageCache
-{
-public:
- struct Statistics
- {
- std::atomic<uint64_t> TotalBytesRead = 0;
- std::atomic<uint64_t> TotalBytesWritten = 0;
- std::atomic<uint64_t> TotalRequestCount = 0;
- std::atomic<uint64_t> TotalRequestTimeUs = 0;
- std::atomic<uint64_t> TotalExecutionTimeUs = 0;
- std::atomic<uint64_t> PeakSentBytes = 0;
- std::atomic<uint64_t> PeakReceivedBytes = 0;
- std::atomic<uint64_t> PeakBytesPerSec = 0;
- };
-
- virtual ~BuildStorageCache() {}
-
- virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) = 0;
- virtual IoBuffer GetBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t RangeOffset = 0,
- uint64_t RangeBytes = (uint64_t)-1) = 0;
-
- virtual void PutBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes, std::span<const CbObject> MetaDatas) = 0;
- virtual std::vector<CbObject> GetBlobMetadatas(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
-
- struct BlobExistsResult
- {
- bool HasBody = 0;
- bool HasMetadata = 0;
- };
-
- virtual std::vector<BlobExistsResult> BlobsExists(const Oid& BuildId, std::span<const IoHash> BlobHashes) = 0;
-
- virtual void Flush(
- int32_t UpdateIntervalMS,
- std::function<bool(intptr_t Remaining)>&& UpdateCallback = [](intptr_t) { return true; }) = 0;
-};
-
-std::unique_ptr<BuildStorageCache> CreateZenBuildStorageCache(HttpClient& HttpClient,
- BuildStorageCache::Statistics& Stats,
- std::string_view Namespace,
- std::string_view Bucket,
- const std::filesystem::path& TempFolderPath,
- bool BoostBackgroundThreadCount);
-
-struct ZenCacheEndpointTestResult
-{
- bool Success = false;
- std::string FailureReason;
-};
-
-ZenCacheEndpointTestResult TestZenCacheEndpoint(std::string_view BaseUrl, const bool AssumeHttp2);
-
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/filebuildstorage.h b/src/zenutil/include/zenutil/filebuildstorage.h
deleted file mode 100644
index c95fb32e6..000000000
--- a/src/zenutil/include/zenutil/filebuildstorage.h
+++ /dev/null
@@ -1,16 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/logging.h>
-#include <zenutil/buildstorage.h>
-
-namespace zen {
-class HttpClient;
-
-std::unique_ptr<BuildStorage> CreateFileBuildStorage(const std::filesystem::path& StoragePath,
- BuildStorage::Statistics& Stats,
- bool EnableJsonOutput,
- double LatencySec = 0.0,
- double DelayPerKBSec = 0.0);
-} // namespace zen
diff --git a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h b/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
deleted file mode 100644
index f25d8933b..000000000
--- a/src/zenutil/include/zenutil/jupiter/jupiterbuildstorage.h
+++ /dev/null
@@ -1,25 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/logging.h>
-#include <zenutil/buildstorage.h>
-
-namespace zen {
-class HttpClient;
-
-std::unique_ptr<BuildStorage> CreateJupiterBuildStorage(LoggerRef InLog,
- HttpClient& InHttpClient,
- BuildStorage::Statistics& Stats,
- std::string_view Namespace,
- std::string_view Bucket,
- bool AllowRedirect,
- const std::filesystem::path& TempFolderPath);
-
-bool ParseBuildStorageUrl(std::string_view InUrl,
- std::string& OutHost,
- std::string& OutNamespace,
- std::string& OutBucket,
- std::string& OutBuildId);
-
-} // namespace zen
diff --git a/src/zenutil/jupiter/jupiterbuildstorage.cpp b/src/zenutil/jupiter/jupiterbuildstorage.cpp
deleted file mode 100644
index 6eb3489dc..000000000
--- a/src/zenutil/jupiter/jupiterbuildstorage.cpp
+++ /dev/null
@@ -1,561 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/jupiter/jupiterbuildstorage.h>
-
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryutil.h>
-#include <zencore/fmtutils.h>
-#include <zencore/scopeguard.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-#include <zenutil/jupiter/jupitersession.h>
-
-ZEN_THIRD_PARTY_INCLUDES_START
-#include <tsl/robin_map.h>
-ZEN_THIRD_PARTY_INCLUDES_END
-
-#include <regex>
-
-namespace zen {
-
-using namespace std::literals;
-
-namespace {
- void ThrowFromJupiterResult(const JupiterResult& Result, std::string_view Prefix)
- {
- int Error = Result.ErrorCode < (int)HttpResponseCode::Continue ? Result.ErrorCode : 0;
- HttpResponseCode Status =
- Result.ErrorCode >= int(HttpResponseCode::Continue) ? HttpResponseCode(Result.ErrorCode) : HttpResponseCode::ImATeapot;
- throw HttpClientError(fmt::format("{}: {} ({})", Prefix, Result.Reason, Result.ErrorCode), Error, Status);
- }
-} // namespace
-
-class JupiterBuildStorage : public BuildStorage
-{
-public:
- JupiterBuildStorage(LoggerRef InLog,
- HttpClient& InHttpClient,
- Statistics& Stats,
- std::string_view Namespace,
- std::string_view Bucket,
- bool AllowRedirect,
- const std::filesystem::path& TempFolderPath)
- : m_Session(InLog, InHttpClient, AllowRedirect)
- , m_Stats(Stats)
- , m_Namespace(Namespace)
- , m_Bucket(Bucket)
- , m_TempFolderPath(TempFolderPath)
- {
- }
- virtual ~JupiterBuildStorage() {}
-
- virtual CbObject ListNamespaces(bool bRecursive) override
- {
- ZEN_TRACE_CPU("Jupiter::ListNamespaces");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult ListResult = m_Session.ListBuildNamespaces();
- AddStatistic(ListResult);
- if (!ListResult.Success)
- {
- ThrowFromJupiterResult(ListResult, "Failed listing namespaces");
- }
- CbObject NamespaceResponse = PayloadToCbObject("Failed listing namespaces"sv, ListResult.Response);
-
- CbObjectWriter Response;
- Response.BeginArray("results"sv);
- for (CbFieldView NamespaceField : NamespaceResponse["namespaces"])
- {
- std::string_view Namespace = NamespaceField.AsString();
- if (!Namespace.empty())
- {
- Response.BeginObject();
- Response.AddString("name", Namespace);
-
- if (bRecursive)
- {
- JupiterResult BucketsResult = m_Session.ListBuildBuckets(Namespace);
- AddStatistic(BucketsResult);
- if (!BucketsResult.Success)
- {
- ThrowFromJupiterResult(BucketsResult, fmt::format("Failed listing buckets in namespace {}", Namespace));
- }
- CbObject BucketResponse =
- PayloadToCbObject(fmt::format("Failed listing buckets in namespace {}", Namespace), BucketsResult.Response);
-
- Response.BeginArray("items");
- for (CbFieldView BucketField : BucketResponse["buckets"])
- {
- std::string_view Bucket = BucketField.AsString();
- if (!Bucket.empty())
- {
- Response.AddString(Bucket);
- }
- }
- Response.EndArray();
- }
-
- Response.EndObject();
- }
- }
- Response.EndArray();
-
- return Response.Save();
- }
-
- virtual CbObject ListBuilds(CbObject Query) override
- {
- ZEN_TRACE_CPU("Jupiter::ListBuilds");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- IoBuffer Payload = Query.GetBuffer().AsIoBuffer();
- Payload.SetContentType(ZenContentType::kCbObject);
- JupiterResult ListResult = m_Session.ListBuilds(m_Namespace, m_Bucket, Payload);
- AddStatistic(ListResult);
- if (!ListResult.Success)
- {
- ThrowFromJupiterResult(ListResult, "Failed listing builds"sv);
- }
- return PayloadToCbObject("Failed listing builds"sv, ListResult.Response);
- }
-
- virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
- {
- ZEN_TRACE_CPU("Jupiter::PutBuild");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer();
- Payload.SetContentType(ZenContentType::kCbObject);
- JupiterResult PutResult = m_Session.PutBuild(m_Namespace, m_Bucket, BuildId, Payload);
- AddStatistic(PutResult);
- if (!PutResult.Success)
- {
- ThrowFromJupiterResult(PutResult, "Failed creating build"sv);
- }
- return PayloadToCbObject(fmt::format("Failed creating build: {}", BuildId), PutResult.Response);
- }
-
- virtual CbObject GetBuild(const Oid& BuildId) override
- {
- ZEN_TRACE_CPU("Jupiter::GetBuild");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult GetBuildResult = m_Session.GetBuild(m_Namespace, m_Bucket, BuildId);
- AddStatistic(GetBuildResult);
- if (!GetBuildResult.Success)
- {
- ThrowFromJupiterResult(GetBuildResult, "Failed fetching build"sv);
- }
- return PayloadToCbObject(fmt::format("Failed fetching build {}:", BuildId), GetBuildResult.Response);
- }
-
- virtual void FinalizeBuild(const Oid& BuildId) override
- {
- ZEN_TRACE_CPU("Jupiter::FinalizeBuild");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult FinalizeBuildResult = m_Session.FinalizeBuild(m_Namespace, m_Bucket, BuildId);
- AddStatistic(FinalizeBuildResult);
- if (!FinalizeBuildResult.Success)
- {
- ThrowFromJupiterResult(FinalizeBuildResult, "Failed finalizing build"sv);
- }
- }
-
- virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId,
- const Oid& BuildPartId,
- std::string_view PartName,
- const CbObject& MetaData) override
- {
- ZEN_TRACE_CPU("Jupiter::PutBuildPart");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer();
- Payload.SetContentType(ZenContentType::kCbObject);
- PutBuildPartResult PutPartResult = m_Session.PutBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartName, Payload);
- AddStatistic(PutPartResult);
- if (!PutPartResult.Success)
- {
- ThrowFromJupiterResult(PutPartResult, "Failed creating build part"sv);
- }
- return std::make_pair(PutPartResult.RawHash, std::move(PutPartResult.Needs));
- }
-
- virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override
- {
- ZEN_TRACE_CPU("Jupiter::GetBuildPart");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult GetBuildPartResult = m_Session.GetBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId);
- AddStatistic(GetBuildPartResult);
- if (!GetBuildPartResult.Success)
- {
- ThrowFromJupiterResult(GetBuildPartResult, "Failed fetching build part"sv);
- }
- return PayloadToCbObject(fmt::format("Failed fetching build part {}:", BuildPartId), GetBuildPartResult.Response);
- }
-
- virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
- {
- ZEN_TRACE_CPU("Jupiter::FinalizeBuildPart");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- FinalizeBuildPartResult FinalizePartResult = m_Session.FinalizeBuildPart(m_Namespace, m_Bucket, BuildId, BuildPartId, PartHash);
- AddStatistic(FinalizePartResult);
- if (!FinalizePartResult.Success)
- {
- ThrowFromJupiterResult(FinalizePartResult, "Failed finalizing build part"sv);
- }
- return std::move(FinalizePartResult.Needs);
- }
-
- virtual void PutBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload) override
- {
- ZEN_TRACE_CPU("Jupiter::PutBuildBlob");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult PutBlobResult = m_Session.PutBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, ContentType, Payload);
- AddStatistic(PutBlobResult);
- if (!PutBlobResult.Success)
- {
- ThrowFromJupiterResult(PutBlobResult, "Failed putting build part"sv);
- }
- }
-
- virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- ZenContentType ContentType,
- uint64_t PayloadSize,
- std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
- std::function<void(uint64_t, bool)>&& OnSentBytes) override
- {
- ZEN_TRACE_CPU("Jupiter::PutLargeBuildBlob");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- std::vector<std::function<JupiterResult(bool&)>> WorkItems;
- JupiterResult PutMultipartBlobResult = m_Session.PutMultipartBuildBlob(m_Namespace,
- m_Bucket,
- BuildId,
- RawHash,
- ContentType,
- PayloadSize,
- std::move(Transmitter),
- WorkItems);
- AddStatistic(PutMultipartBlobResult);
- if (!PutMultipartBlobResult.Success)
- {
- ThrowFromJupiterResult(PutMultipartBlobResult, "Failed putting large build blob"sv);
- }
- OnSentBytes(PutMultipartBlobResult.SentBytes, WorkItems.empty());
-
- std::vector<std::function<void()>> WorkList;
- for (auto& WorkItem : WorkItems)
- {
- WorkList.emplace_back([this, WorkItem = std::move(WorkItem), OnSentBytes]() {
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- bool IsComplete = false;
- JupiterResult PartResult = WorkItem(IsComplete);
- AddStatistic(PartResult);
- if (!PartResult.Success)
- {
- ThrowFromJupiterResult(PartResult, "Failed putting large build blob"sv);
- }
- OnSentBytes(PartResult.SentBytes, IsComplete);
- });
- }
- return WorkList;
- }
-
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
- {
- ZEN_TRACE_CPU("Jupiter::GetBuildBlob");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- CreateDirectories(m_TempFolderPath);
- JupiterResult GetBuildBlobResult =
- m_Session.GetBuildBlob(m_Namespace, m_Bucket, BuildId, RawHash, m_TempFolderPath, RangeOffset, RangeBytes);
- AddStatistic(GetBuildBlobResult);
- if (!GetBuildBlobResult.Success)
- {
- ThrowFromJupiterResult(GetBuildBlobResult, "Failed fetching build blob"sv);
- }
- return std::move(GetBuildBlobResult.Response);
- }
-
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
- std::function<void()>&& OnComplete) override
- {
- ZEN_TRACE_CPU("Jupiter::GetLargeBuildBlob");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- std::vector<std::function<JupiterResult()>> WorkItems;
- JupiterResult GetMultipartBlobResult = m_Session.GetMultipartBuildBlob(m_Namespace,
- m_Bucket,
- BuildId,
- RawHash,
- ChunkSize,
- std::move(OnReceive),
- std::move(OnComplete),
- WorkItems);
-
- AddStatistic(GetMultipartBlobResult);
- if (!GetMultipartBlobResult.Success)
- {
- ThrowFromJupiterResult(GetMultipartBlobResult, "Failed getting large build part"sv);
- }
- std::vector<std::function<void()>> WorkList;
- for (auto& WorkItem : WorkItems)
- {
- WorkList.emplace_back([this, WorkItem = std::move(WorkItem)]() {
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult PartResult = WorkItem();
- AddStatistic(PartResult);
- if (!PartResult.Success)
- {
- ThrowFromJupiterResult(PartResult, "Failed getting large build part"sv);
- }
- });
- }
- return WorkList;
- }
-
- virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
- {
- ZEN_TRACE_CPU("Jupiter::PutBlockMetadata");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- IoBuffer Payload = MetaData.GetBuffer().AsIoBuffer();
- Payload.SetContentType(ZenContentType::kCbObject);
- JupiterResult PutMetaResult = m_Session.PutBlockMetadata(m_Namespace, m_Bucket, BuildId, BlockRawHash, Payload);
- AddStatistic(PutMetaResult);
- if (!PutMetaResult.Success)
- {
- if (PutMetaResult.ErrorCode == int32_t(HttpResponseCode::NotFound))
- {
- return false;
- }
- ThrowFromJupiterResult(PutMetaResult, "Failed putting build block metadata"sv);
- }
- return true;
- }
-
- virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
- {
- ZEN_TRACE_CPU("Jupiter::FindBlocks");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- JupiterResult FindResult = m_Session.FindBlocks(m_Namespace, m_Bucket, BuildId, MaxBlockCount);
- AddStatistic(FindResult);
- if (!FindResult.Success)
- {
- ThrowFromJupiterResult(FindResult, "Failed fetching known blocks"sv);
- }
- return PayloadToCbObject("Failed fetching known blocks"sv, FindResult.Response);
- }
-
- virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
- {
- ZEN_TRACE_CPU("Jupiter::GetBlockMetadata");
-
- Stopwatch ExecutionTimer;
- auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
- CbObjectWriter Request;
-
- Request.BeginArray("blocks"sv);
- for (const IoHash& BlockHash : BlockHashes)
- {
- Request.AddHash(BlockHash);
- }
- Request.EndArray();
-
- IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
- Payload.SetContentType(ZenContentType::kCbObject);
- JupiterResult GetBlockMetadataResult = m_Session.GetBlockMetadata(m_Namespace, m_Bucket, BuildId, Payload);
- AddStatistic(GetBlockMetadataResult);
- if (!GetBlockMetadataResult.Success)
- {
- ThrowFromJupiterResult(GetBlockMetadataResult, "Failed fetching block metadatas"sv);
- }
- return PayloadToCbObject("Failed fetching block metadatas", GetBlockMetadataResult.Response);
- }
-
- virtual void PutBuildPartStats(const Oid& BuildId,
- const Oid& BuildPartId,
- const tsl::robin_map<std::string, double>& FloatStats) override
- {
- ZEN_UNUSED(BuildId, BuildPartId, FloatStats);
- CbObjectWriter Request;
- Request.BeginObject("floatStats"sv);
- for (auto It : FloatStats)
- {
- Request.AddFloat(It.first, It.second);
- }
- Request.EndObject();
- IoBuffer Payload = Request.Save().GetBuffer().AsIoBuffer();
- Payload.SetContentType(ZenContentType::kCbObject);
- JupiterResult PutBuildPartStatsResult = m_Session.PutBuildPartStats(m_Namespace, m_Bucket, BuildId, BuildPartId, Payload);
- AddStatistic(PutBuildPartStatsResult);
- if (!PutBuildPartStatsResult.Success)
- {
- ThrowFromJupiterResult(PutBuildPartStatsResult, "Failed posting build part statistics"sv);
- }
- }
-
-private:
- static CbObject PayloadToCbObject(std::string_view ErrorContext, const IoBuffer& Payload)
- {
- if (Payload.GetContentType() == ZenContentType::kJSON)
- {
- std::string_view Json(reinterpret_cast<const char*>(Payload.GetData()), Payload.GetSize());
- return LoadCompactBinaryFromJson(Json).AsObject();
- }
- else if (Payload.GetContentType() == ZenContentType::kCbObject)
- {
- CbValidateError ValidateResult = CbValidateError::None;
- if (CbObject Object = ValidateAndReadCompactBinaryObject(IoBuffer(Payload), ValidateResult);
- ValidateResult == CbValidateError::None)
- {
- return Object;
- }
- else
- {
- throw std::runtime_error(fmt::format("{}: {} ({})",
- "Invalid compact binary object: '{}'",
- ErrorContext,
- ToString(Payload.GetContentType()),
- ToString(ValidateResult)));
- }
- }
- else if (Payload.GetContentType() == ZenContentType::kCompressedBinary)
- {
- IoHash RawHash;
- uint64_t RawSize;
- CbValidateError ValidateResult = CbValidateError::None;
- if (CbObject Object =
- ValidateAndReadCompactBinaryObject(CompressedBuffer::FromCompressed(SharedBuffer(Payload), RawHash, RawSize),
- ValidateResult);
- ValidateResult == CbValidateError::None)
- {
- return Object;
- }
- else
- {
- throw std::runtime_error(fmt::format("{}: {} ({})",
- "Invalid compresed compact binary object: '{}'",
- ErrorContext,
- ToString(Payload.GetContentType()),
- ToString(ValidateResult)));
- }
- }
- else
- {
- throw std::runtime_error(
- fmt::format("{}: {} ({})", "Unsupported response format", ErrorContext, ToString(Payload.GetContentType())));
- }
- }
-
- void AddStatistic(const JupiterResult& Result)
- {
- m_Stats.TotalBytesWritten += Result.SentBytes;
- m_Stats.TotalBytesRead += Result.ReceivedBytes;
- m_Stats.TotalRequestTimeUs += uint64_t(Result.ElapsedSeconds * 1000000.0);
- m_Stats.TotalRequestCount++;
-
- SetAtomicMax(m_Stats.PeakSentBytes, Result.SentBytes);
- SetAtomicMax(m_Stats.PeakReceivedBytes, Result.ReceivedBytes);
- if (Result.ElapsedSeconds > 0.0)
- {
- uint64_t BytesPerSec = uint64_t((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds);
- SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
- }
- }
-
- JupiterSession m_Session;
- Statistics& m_Stats;
- const std::string m_Namespace;
- const std::string m_Bucket;
- const std::filesystem::path m_TempFolderPath;
-};
-
-std::unique_ptr<BuildStorage>
-CreateJupiterBuildStorage(LoggerRef InLog,
- HttpClient& InHttpClient,
- BuildStorage::Statistics& Stats,
- std::string_view Namespace,
- std::string_view Bucket,
- bool AllowRedirect,
- const std::filesystem::path& TempFolderPath)
-{
- ZEN_TRACE_CPU("CreateJupiterBuildStorage");
-
- return std::make_unique<JupiterBuildStorage>(InLog, InHttpClient, Stats, Namespace, Bucket, AllowRedirect, TempFolderPath);
-}
-
-bool
-ParseBuildStorageUrl(std::string_view InUrl,
- std::string& OutHost,
- std::string& OutNamespace,
- std::string& OutBucket,
- std::string& OutBuildId)
-{
- std::string Url(InUrl);
- const std::string_view ExtendedApiString = "api/v2/builds/";
- if (auto ApiString = ToLower(Url).find(ExtendedApiString); ApiString != std::string::npos)
- {
- Url.erase(ApiString, ExtendedApiString.length());
- }
-
- const std::string ArtifactURLRegExString = R"((http[s]?:\/\/.*?)\/(.*?)\/(.*?)\/(.*))";
- const std::regex ArtifactURLRegEx(ArtifactURLRegExString, std::regex::ECMAScript | std::regex::icase);
- std::match_results<std::string_view::const_iterator> MatchResults;
- std::string_view UrlToParse(Url);
- if (regex_match(begin(UrlToParse), end(UrlToParse), MatchResults, ArtifactURLRegEx) && MatchResults.size() == 5)
- {
- auto GetMatch = [&MatchResults](uint32_t Index) -> std::string_view {
- ZEN_ASSERT(Index < MatchResults.size());
-
- const auto& Match = MatchResults[Index];
-
- return std::string_view(&*Match.first, Match.second - Match.first);
- };
-
- const std::string_view Host = GetMatch(1);
- const std::string_view Namespace = GetMatch(2);
- const std::string_view Bucket = GetMatch(3);
- const std::string_view BuildId = GetMatch(4);
-
- OutHost = Host;
- OutNamespace = Namespace;
- OutBucket = Bucket;
- OutBuildId = BuildId;
- return true;
- }
- else
- {
- return false;
- }
-}
-
-} // namespace zen