aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/filebuildstorage.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-10-03 09:59:41 +0200
committerGitHub Enterprise <[email protected]>2025-10-03 09:59:41 +0200
commit1f27c826bd36cd065e0fdd160f60c1f887138f33 (patch)
tree000528b3f672d0126ca92438378d9007bfdcd0d8 /src/zenutil/filebuildstorage.cpp
parentmove remoteproject to remotestorelib (#542) (diff)
downloadzen-1f27c826bd36cd065e0fdd160f60c1f887138f33.tar.xz
zen-1f27c826bd36cd065e0fdd160f60c1f887138f33.zip
move zenutil builds code to zenremotestore (#543)
* move buildstorage implementations to zenremotestore lib * move builds storage to zenremotelib
Diffstat (limited to 'src/zenutil/filebuildstorage.cpp')
-rw-r--r--src/zenutil/filebuildstorage.cpp806
1 files changed, 0 insertions, 806 deletions
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
deleted file mode 100644
index f75fe403f..000000000
--- a/src/zenutil/filebuildstorage.cpp
+++ /dev/null
@@ -1,806 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include <zenutil/filebuildstorage.h>
-
-#include <zencore/basicfile.h>
-#include <zencore/compactbinarybuilder.h>
-#include <zencore/compactbinaryvalidation.h>
-#include <zencore/fmtutils.h>
-#include <zencore/scopeguard.h>
-#include <zencore/timer.h>
-#include <zencore/trace.h>
-
-namespace zen {
-
-using namespace std::literals;
-
-class FileBuildStorage : public BuildStorage
-{
-public:
- explicit FileBuildStorage(const std::filesystem::path& StoragePath,
- BuildStorage::Statistics& Stats,
- bool EnableJsonOutput,
- double LatencySec,
- double DelayPerKBSec)
- : m_StoragePath(StoragePath)
- , m_Stats(Stats)
- , m_EnableJsonOutput(EnableJsonOutput)
- , m_LatencySec(LatencySec)
- , m_DelayPerKBSec(DelayPerKBSec)
- {
- CreateDirectories(GetBuildsFolder());
- CreateDirectories(GetBlobsFolder());
- CreateDirectories(GetBlobsMetadataFolder());
- }
-
- virtual ~FileBuildStorage() {}
-
- virtual CbObject ListNamespaces(bool bRecursive) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces");
- ZEN_UNUSED(bRecursive);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObjectWriter Writer;
- Writer.BeginArray("results");
- {
- }
- Writer.EndArray(); // results
-
- Writer.Finalize();
- ReceivedBytes = Writer.GetSaveSize();
- return Writer.Save();
- }
-
- virtual CbObject ListBuilds(CbObject Query) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::ListBuilds");
- ZEN_UNUSED(Query);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = Query.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BuildFolder = GetBuildsFolder();
- DirectoryContent Content;
- GetDirectoryContent(BuildFolder, DirectoryContentFlags::IncludeDirs, Content);
- CbObjectWriter Writer;
- Writer.BeginArray("results");
- {
- for (const std::filesystem::path& BuildPath : Content.Directories)
- {
- Oid BuildId = Oid::TryFromHexString(BuildPath.stem().string());
- if (BuildId != Oid::Zero)
- {
- Writer.BeginObject();
- {
- Writer.AddObjectId("buildId", BuildId);
- Writer.AddObject("metadata", ReadBuild(BuildId)["metadata"sv].AsObjectView());
- }
- Writer.EndObject();
- }
- }
- }
- Writer.EndArray(); // results
-
- Writer.Finalize();
- ReceivedBytes = Writer.GetSaveSize();
- return Writer.Save();
- }
-
- virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutBuild");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = MetaData.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObjectWriter BuildObject;
- BuildObject.AddObject("metadata", MetaData);
- BuildObject.AddInteger("chunkSize"sv, 32u * 1024u * 1024u);
- WriteBuild(BuildId, BuildObject.Save());
-
- CbObjectWriter BuildResponse;
- BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u);
- BuildResponse.Finalize();
- ReceivedBytes = BuildResponse.GetSaveSize();
- return BuildResponse.Save();
- }
-
- virtual CbObject GetBuild(const Oid& BuildId) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetBuild");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObject Build = ReadBuild(BuildId);
- ReceivedBytes = Build.GetSize();
- return Build;
- }
-
- virtual void FinalizeBuild(const Oid& BuildId) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
- }
-
- virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId,
- const Oid& BuildPartId,
- std::string_view PartName,
- const CbObject& MetaData) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = MetaData.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
- CreateDirectories(BuildPartDataPath.parent_path());
-
- TemporaryFile::SafeWriteFile(BuildPartDataPath, MetaData.GetView());
- m_WrittenBytes += MetaData.GetSize();
- WriteAsJson(BuildPartDataPath, MetaData);
-
- IoHash RawHash = IoHash::HashBuffer(MetaData.GetView());
-
- CbObjectWriter Writer;
- {
- CbObject BuildObject = ReadBuild(BuildId);
- CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
- CbObjectView MetaDataView = BuildObject["metadata"sv].AsObjectView();
-
- Writer.AddObject("metadata"sv, MetaDataView);
- Writer.BeginObject("parts"sv);
- {
- for (CbFieldView PartView : PartsObject)
- {
- if (PartView.GetName() != PartName)
- {
- Writer.AddObjectId(PartView.GetName(), PartView.AsObjectId());
- }
- }
- Writer.AddObjectId(PartName, BuildPartId);
- }
- Writer.EndObject(); // parts
- }
- WriteBuild(BuildId, Writer.Save());
-
- std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData);
-
- ReceivedBytes = sizeof(IoHash) * NeededAttachments.size();
-
- return std::make_pair(RawHash, std::move(NeededAttachments));
- }
-
- virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
-
- IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
-
- ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
-
- CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
-
- ReceivedBytes = BuildPartObject.GetSize();
-
- return BuildPartObject;
- }
-
- virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart");
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
- IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
-
- IoHash RawHash = IoHash::HashBuffer(Payload.GetView());
- if (RawHash != PartHash)
- {
- throw std::runtime_error(
- fmt::format("Failed finalizing build part {}: Expected hash {}, got {}", BuildPartId, PartHash, RawHash));
- }
-
- CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
- std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject));
-
- ReceivedBytes = NeededAttachments.size() * sizeof(IoHash);
-
- return NeededAttachments;
- }
-
- virtual void PutBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- ZenContentType ContentType,
- const CompositeBuffer& Payload) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob");
- ZEN_UNUSED(BuildId);
- ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
-
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload));
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = Payload.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!IsFile(BlockPath))
- {
- CreateDirectories(BlockPath.parent_path());
- TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView());
- }
-
- ReceivedBytes = Payload.GetSize();
- }
-
- virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- ZenContentType ContentType,
- uint64_t PayloadSize,
- std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
- std::function<void(uint64_t, bool)>&& OnSentBytes) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob");
- ZEN_UNUSED(BuildId);
- ZEN_UNUSED(ContentType);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (!IsFile(BlockPath))
- {
- CreateDirectories(BlockPath.parent_path());
-
- struct WorkloadData
- {
- std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter;
- std::function<void(uint64_t, bool)> OnSentBytes;
- TemporaryFile TempFile;
- std::atomic<size_t> PartsLeft;
- };
-
- std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->Transmitter = std::move(Transmitter);
- Workload->OnSentBytes = std::move(OnSentBytes);
- std::error_code Ec;
- Workload->TempFile.CreateTemporary(BlockPath.parent_path(), Ec);
-
- if (Ec)
- {
- throw std::runtime_error(
- fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
- }
-
- std::vector<std::function<void()>> WorkItems;
- uint64_t Offset = 0;
- while (Offset < PayloadSize)
- {
- uint64_t Size = Min(32u * 1024u * 1024u, PayloadSize - Offset);
-
- WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() {
- ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work");
- IoBuffer PartPayload = Workload->Transmitter(Offset, Size);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = PartPayload.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- std::error_code Ec;
- Workload->TempFile.Write(PartPayload, Offset, Ec);
- if (Ec)
- {
- throw std::runtime_error(fmt::format("Failed writing to temporary file '{}': {} ({})",
- Workload->TempFile.GetPath(),
- Ec.message(),
- Ec.value()));
- }
-
- const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1;
- if (IsLastPart)
- {
- Workload->TempFile.Flush();
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(Workload->TempFile.ReadAll())));
- Workload->TempFile.MoveTemporaryIntoPlace(BlockPath, Ec);
- if (Ec)
- {
- throw std::runtime_error(fmt::format("Failed moving temporary file '{}' to '{}': {} ({})",
- Workload->TempFile.GetPath(),
- BlockPath,
- Ec.message(),
- Ec.value()));
- }
- }
- Workload->OnSentBytes(SentBytes, IsLastPart);
- });
-
- Offset += Size;
- }
- Workload->PartsLeft.store(WorkItems.size());
- return WorkItems;
- }
- return {};
- }
-
- virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (IsFile(BlockPath))
- {
- BasicFile File(BlockPath, BasicFile::Mode::kRead);
- IoBuffer Payload;
- if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
- {
- Payload = IoBuffer(RangeBytes);
- File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset);
- }
- else
- {
- Payload = File.ReadAll();
- ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
- }
- Payload.SetContentType(ZenContentType::kCompressedBinary);
- ReceivedBytes = Payload.GetSize();
- return Payload;
- }
- return IoBuffer{};
- }
-
- virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
- const IoHash& RawHash,
- uint64_t ChunkSize,
- std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
- std::function<void()>&& OnComplete) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
- if (IsFile(BlockPath))
- {
- struct WorkloadData
- {
- std::atomic<uint64_t> BytesRemaining;
- BasicFile BlobFile;
- std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
- std::function<void()> OnComplete;
- };
-
- std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
- Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead);
- const uint64_t BlobSize = Workload->BlobFile.FileSize();
-
- Workload->OnReceive = std::move(OnReceive);
- Workload->OnComplete = std::move(OnComplete);
- Workload->BytesRemaining = BlobSize;
-
- std::vector<std::function<void()>> WorkItems;
- uint64_t Offset = 0;
- while (Offset < BlobSize)
- {
- uint64_t Size = Min(ChunkSize, BlobSize - Offset);
- WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() {
- ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work");
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- IoBuffer PartPayload(Size);
- Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
- ReceivedBytes = PartPayload.GetSize();
- Workload->OnReceive(Offset, PartPayload);
- uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size);
- if (ByteRemaning == Size)
- {
- Workload->OnComplete();
- }
- });
-
- Offset += Size;
- }
- return WorkItems;
- }
- return {};
- }
-
- virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = MetaData.GetSize();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash);
- CreateDirectories(BlockMetaDataPath.parent_path());
- TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView());
- WriteAsJson(BlockMetaDataPath, MetaData);
- return true;
- }
-
- virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- uint64_t FoundCount = 0;
-
- DirectoryContent Content;
- GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content);
- CbObjectWriter Writer;
- Writer.BeginArray("blocks");
- for (const std::filesystem::path& MetaDataFile : Content.Files)
- {
- IoHash ChunkHash;
- if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash))
- {
- std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash);
- if (IsFile(BlockPath))
- {
- IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
-
- CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Writer.AddObject(BlockObject);
- FoundCount++;
- if (FoundCount == MaxBlockCount)
- {
- break;
- }
- }
- }
- }
- Writer.EndArray(); // blocks
-
- Writer.Finalize();
- ReceivedBytes = Writer.GetSaveSize();
- return Writer.Save();
- }
-
- virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
- {
- ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
- ZEN_UNUSED(BuildId);
-
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size();
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObjectWriter Writer;
- Writer.BeginArray("blocks");
-
- for (const IoHash& BlockHash : BlockHashes)
- {
- std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash);
- if (IsFile(MetaDataFile))
- {
- IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
-
- CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
- Writer.AddObject(BlockObject);
- }
- }
- Writer.EndArray(); // blocks
- Writer.Finalize();
- ReceivedBytes = Writer.GetSaveSize();
- return Writer.Save();
- }
-
- virtual void PutBuildPartStats(const Oid& BuildId,
- const Oid& BuildPartId,
- const tsl::robin_map<std::string, double>& FloatStats) override
- {
- uint64_t ReceivedBytes = 0;
- uint64_t SentBytes = 0;
-
- SimulateLatency(SentBytes, 0);
- auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); });
-
- Stopwatch ExecutionTimer;
- auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); });
-
- CbObjectWriter Request;
- Request.BeginObject("floatStats"sv);
- for (auto It : FloatStats)
- {
- Request.AddFloat(It.first, It.second);
- }
- Request.EndObject();
- Request.Finalize();
- SentBytes = Request.GetSaveSize();
-
- const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId);
- CreateDirectories(BuildPartStatsDataPath.parent_path());
-
- CbObject Payload = Request.Save();
-
- TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView());
- WriteAsJson(BuildPartStatsDataPath, Payload);
- }
-
-protected:
- std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; }
- std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; }
- std::filesystem::path GetBlobsMetadataFolder() const { return m_StoragePath / "blocks"; }
- std::filesystem::path GetBuildFolder(const Oid& BuildId) const { return GetBuildsFolder() / BuildId.ToString(); }
-
- std::filesystem::path GetBuildPath(const Oid& BuildId) const { return GetBuildFolder(BuildId) / "metadata.cb"; }
-
- std::filesystem::path GetBuildPartFolder(const Oid& BuildId, const Oid& BuildPartId) const
- {
- return GetBuildFolder(BuildId) / "parts" / BuildPartId.ToString();
- }
-
- std::filesystem::path GetBuildPartPath(const Oid& BuildId, const Oid& BuildPartId) const
- {
- return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb";
- }
-
- std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const
- {
- return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid());
- }
-
- std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); }
-
- std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const
- {
- return GetBlobsMetadataFolder() / fmt::format("{}.cb", RawHash);
- }
-
- void SimulateLatency(uint64_t ReceiveSize, uint64_t SendSize)
- {
- double SleepSec = m_LatencySec;
- if (m_DelayPerKBSec > 0.0)
- {
- SleepSec += m_DelayPerKBSec * (double(SendSize + ReceiveSize) / 1024u);
- }
- if (SleepSec > 0)
- {
- Sleep(int(SleepSec * 1000));
- }
- }
-
- void WriteAsJson(const std::filesystem::path& OriginalPath, CbObjectView Data) const
- {
- if (m_EnableJsonOutput)
- {
- ExtendableStringBuilder<128> SB;
- CompactBinaryToJson(Data, SB);
- std::filesystem::path JsonPath = OriginalPath;
- JsonPath.replace_extension(".json");
- std::string_view JsonMetaData = SB.ToView();
- TemporaryFile::SafeWriteFile(JsonPath, MemoryView(JsonMetaData.data(), JsonMetaData.length()));
- }
- }
-
- void WriteBuild(const Oid& BuildId, CbObjectView Data)
- {
- const std::filesystem::path BuildDataPath = GetBuildPath(BuildId);
- CreateDirectories(BuildDataPath.parent_path());
- TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView());
- WriteAsJson(BuildDataPath, Data);
- }
-
- CbObject ReadBuild(const Oid& BuildId)
- {
- const std::filesystem::path BuildDataPath = GetBuildPath(BuildId);
- FileContents Content = ReadFile(BuildDataPath);
- if (Content.ErrorCode)
- {
- throw std::runtime_error(fmt::format("Failed reading build '{}' from '{}': {} ({})",
- BuildId,
- BuildDataPath,
- Content.ErrorCode.message(),
- Content.ErrorCode.value()));
- }
- IoBuffer Payload = Content.Flatten();
- ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
- CbObject BuildObject = CbObject(SharedBuffer(Payload));
- return BuildObject;
- }
-
- std::vector<IoHash> GetNeededAttachments(CbObjectView BuildPartObject)
- {
- std::vector<IoHash> NeededAttachments;
- BuildPartObject.IterateAttachments([&](CbFieldView FieldView) {
- const IoHash AttachmentHash = FieldView.AsBinaryAttachment();
- const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash);
- if (!IsFile(BlockPath))
- {
- NeededAttachments.push_back(AttachmentHash);
- }
- });
- return NeededAttachments;
- }
-
- bool ValidateCompressedBuffer(const IoHash& RawHash, const CompositeBuffer& Payload)
- {
- IoHash VerifyHash;
- uint64_t VerifySize;
- CompressedBuffer ValidateBuffer = CompressedBuffer::FromCompressed(Payload, VerifyHash, VerifySize);
- if (!ValidateBuffer)
- {
- return false;
- }
- if (VerifyHash != RawHash)
- {
- return false;
- }
-
- IoHashStream Hash;
- bool CouldDecompress = ValidateBuffer.DecompressToStream(
- 0,
- (uint64_t)-1,
- [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
- ZEN_UNUSED(SourceOffset, SourceSize, Offset);
- for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
- {
- Hash.Append(Segment.GetView());
- }
- return true;
- });
- if (!CouldDecompress)
- {
- return false;
- }
- if (Hash.GetHash() != VerifyHash)
- {
- return false;
- }
- return true;
- }
-
-private:
- void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes)
- {
- const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs();
- m_Stats.TotalBytesWritten += UploadedBytes;
- m_Stats.TotalBytesRead += DownloadedBytes;
- m_Stats.TotalRequestTimeUs += ElapsedUs;
- m_Stats.TotalRequestCount++;
- SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes);
- SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes);
- if (ElapsedUs > 0)
- {
- uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs;
- SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec);
- }
- }
-
- const std::filesystem::path m_StoragePath;
- BuildStorage::Statistics& m_Stats;
- const bool m_EnableJsonOutput = false;
- std::atomic<uint64_t> m_WrittenBytes;
-
- const double m_LatencySec = 0.0;
- const double m_DelayPerKBSec = 0.0;
-};
-
-std::unique_ptr<BuildStorage>
-CreateFileBuildStorage(const std::filesystem::path& StoragePath,
- BuildStorage::Statistics& Stats,
- bool EnableJsonOutput,
- double LatencySec,
- double DelayPerKBSec)
-{
- return std::make_unique<FileBuildStorage>(StoragePath, Stats, EnableJsonOutput, LatencySec, DelayPerKBSec);
-}
-
-} // namespace zen