aboutsummaryrefslogtreecommitdiff
path: root/src/zenutil/filebuildstorage.cpp
diff options
context:
space:
mode:
authorzousar <[email protected]>2025-06-24 16:26:29 -0600
committerzousar <[email protected]>2025-06-24 16:26:29 -0600
commitbb298631ba35a323827dda0b8cd6158e276b5f61 (patch)
tree7ba8db91c44ce83f2c518f80f80ab14910eefa6f /src/zenutil/filebuildstorage.cpp
parentChange to PutResult structure (diff)
parent5.6.14 (diff)
downloadzen-bb298631ba35a323827dda0b8cd6158e276b5f61.tar.xz
zen-bb298631ba35a323827dda0b8cd6158e276b5f61.zip
Merge branch 'main' into zs/put-overwrite-policy
Diffstat (limited to 'src/zenutil/filebuildstorage.cpp')
-rw-r--r--src/zenutil/filebuildstorage.cpp726
1 files changed, 726 insertions, 0 deletions
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp
new file mode 100644
index 000000000..c2cc5ab3c
--- /dev/null
+++ b/src/zenutil/filebuildstorage.cpp
@@ -0,0 +1,726 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenutil/filebuildstorage.h>
+
+#include <zencore/basicfile.h>
+#include <zencore/compactbinarybuilder.h>
+#include <zencore/compactbinaryvalidation.h>
+#include <zencore/fmtutils.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zencore/trace.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+class FileBuildStorage : public BuildStorage
+{
+public:
+ explicit FileBuildStorage(const std::filesystem::path& StoragePath,
+ BuildStorage::Statistics& Stats,
+ bool EnableJsonOutput,
+ double LatencySec,
+ double DelayPerKBSec)
+ : m_StoragePath(StoragePath)
+ , m_Stats(Stats)
+ , m_EnableJsonOutput(EnableJsonOutput)
+ , m_LatencySec(LatencySec)
+ , m_DelayPerKBSec(DelayPerKBSec)
+ {
+ CreateDirectories(GetBuildsFolder());
+ CreateDirectories(GetBlobsFolder());
+ CreateDirectories(GetBlobsMetadataFolder());
+ }
+
+ virtual ~FileBuildStorage() {}
+
+ virtual CbObject ListNamespaces(bool bRecursive) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces");
+ ZEN_UNUSED(bRecursive);
+
+ SimulateLatency(0, 0);
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("results");
+ {
+ }
+ Writer.EndArray(); // results
+ Writer.Save();
+ SimulateLatency(Writer.GetSaveSize(), 0);
+ return Writer.Save();
+ }
+
+ virtual CbObject ListBuilds(CbObject Query) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::ListBuilds");
+ ZEN_UNUSED(Query);
+
+ SimulateLatency(Query.GetSize(), 0);
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ const std::filesystem::path BuildFolder = GetBuildsFolder();
+ DirectoryContent Content;
+ GetDirectoryContent(BuildFolder, DirectoryContentFlags::IncludeDirs, Content);
+ CbObjectWriter Writer;
+ Writer.BeginArray("results");
+ {
+ for (const std::filesystem::path& BuildPath : Content.Directories)
+ {
+ Oid BuildId = Oid::TryFromHexString(BuildPath.stem().string());
+ if (BuildId != Oid::Zero)
+ {
+ Writer.BeginObject();
+ {
+ Writer.AddObjectId("buildId", BuildId);
+ Writer.AddObject("metadata", ReadBuild(BuildId)["metadata"sv].AsObjectView());
+ }
+ Writer.EndObject();
+ }
+ }
+ }
+ Writer.EndArray(); // results
+ Writer.Save();
+ SimulateLatency(Writer.GetSaveSize(), 0);
+ return Writer.Save();
+ }
+
+ virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::PutBuild");
+ SimulateLatency(MetaData.GetSize(), 0);
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ CbObjectWriter BuildObject;
+ BuildObject.AddObject("metadata", MetaData);
+ BuildObject.AddInteger("chunkSize"sv, 32u * 1024u * 1024u);
+ WriteBuild(BuildId, BuildObject.Save());
+
+ CbObjectWriter BuildResponse;
+ BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u);
+ BuildResponse.Save();
+
+ SimulateLatency(0, BuildResponse.GetSaveSize());
+ return BuildResponse.Save();
+ }
+
+ virtual CbObject GetBuild(const Oid& BuildId) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::GetBuild");
+ SimulateLatency(0, 0);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ CbObject Build = ReadBuild(BuildId);
+ SimulateLatency(0, Build.GetSize());
+ return Build;
+ }
+
+ virtual void FinalizeBuild(const Oid& BuildId) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild");
+ SimulateLatency(0, 0);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ ZEN_UNUSED(BuildId);
+ SimulateLatency(0, 0);
+ }
+
+ virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId,
+ const Oid& BuildPartId,
+ std::string_view PartName,
+ const CbObject& MetaData) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart");
+ SimulateLatency(MetaData.GetSize(), 0);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
+ CreateDirectories(BuildPartDataPath.parent_path());
+
+ TemporaryFile::SafeWriteFile(BuildPartDataPath, MetaData.GetView());
+ m_WrittenBytes += MetaData.GetSize();
+ WriteAsJson(BuildPartDataPath, MetaData);
+
+ IoHash RawHash = IoHash::HashBuffer(MetaData.GetView());
+
+ CbObjectWriter Writer;
+ {
+ CbObject BuildObject = ReadBuild(BuildId);
+ CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
+ CbObjectView MetaDataView = BuildObject["metadata"sv].AsObjectView();
+
+ Writer.AddObject("metadata"sv, MetaDataView);
+ Writer.BeginObject("parts"sv);
+ {
+ for (CbFieldView PartView : PartsObject)
+ {
+ if (PartView.GetName() != PartName)
+ {
+ Writer.AddObjectId(PartView.GetName(), PartView.AsObjectId());
+ }
+ }
+ Writer.AddObjectId(PartName, BuildPartId);
+ }
+ Writer.EndObject(); // parts
+ }
+ WriteBuild(BuildId, Writer.Save());
+
+ std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData);
+
+ SimulateLatency(0, sizeof(IoHash) * NeededAttachments.size());
+
+ return std::make_pair(RawHash, std::move(NeededAttachments));
+ }
+
+ virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart");
+ SimulateLatency(0, 0);
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
+
+ IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
+ m_Stats.TotalBytesRead += Payload.GetSize();
+
+ ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
+
+ CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
+
+ SimulateLatency(0, BuildPartObject.GetSize());
+
+ return BuildPartObject;
+ }
+
+ virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart");
+ SimulateLatency(0, 0);
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId);
+ IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten();
+ m_Stats.TotalBytesRead += Payload.GetSize();
+ IoHash RawHash = IoHash::HashBuffer(Payload.GetView());
+ if (RawHash != PartHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed finalizing build part {}: Expected hash {}, got {}", BuildPartId, PartHash, RawHash));
+ }
+
+ CbObject BuildPartObject = CbObject(SharedBuffer(Payload));
+ std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject));
+
+ SimulateLatency(0, NeededAttachments.size() * sizeof(IoHash));
+
+ return NeededAttachments;
+ }
+
+ virtual void PutBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ ZenContentType ContentType,
+ const CompositeBuffer& Payload) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob");
+ ZEN_UNUSED(BuildId);
+ ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary);
+ SimulateLatency(Payload.GetSize(), 0);
+
+ ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload));
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
+ if (!IsFile(BlockPath))
+ {
+ CreateDirectories(BlockPath.parent_path());
+ TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView());
+ }
+ m_Stats.TotalBytesWritten += Payload.GetSize();
+ SimulateLatency(0, 0);
+ }
+
+ virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ ZenContentType ContentType,
+ uint64_t PayloadSize,
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter,
+ std::function<void(uint64_t, bool)>&& OnSentBytes) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob");
+ ZEN_UNUSED(BuildId);
+ ZEN_UNUSED(ContentType);
+ SimulateLatency(0, 0);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
+ if (!IsFile(BlockPath))
+ {
+ CreateDirectories(BlockPath.parent_path());
+
+ struct WorkloadData
+ {
+ std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter;
+ std::function<void(uint64_t, bool)> OnSentBytes;
+ TemporaryFile TempFile;
+ std::atomic<size_t> PartsLeft;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+ Workload->Transmitter = std::move(Transmitter);
+ Workload->OnSentBytes = std::move(OnSentBytes);
+ std::error_code Ec;
+ Workload->TempFile.CreateTemporary(BlockPath.parent_path(), Ec);
+
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value()));
+ }
+
+ std::vector<std::function<void()>> WorkItems;
+ uint64_t Offset = 0;
+ while (Offset < PayloadSize)
+ {
+ uint64_t Size = Min(32u * 1024u * 1024u, PayloadSize - Offset);
+
+ WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() {
+ ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work");
+ IoBuffer PartPayload = Workload->Transmitter(Offset, Size);
+ SimulateLatency(PartPayload.GetSize(), 0);
+
+ std::error_code Ec;
+ Workload->TempFile.Write(PartPayload, Offset, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(fmt::format("Failed writing to temporary file '{}': {} ({})",
+ Workload->TempFile.GetPath(),
+ Ec.message(),
+ Ec.value()));
+ }
+ uint64_t BytesWritten = PartPayload.GetSize();
+ m_Stats.TotalBytesWritten += BytesWritten;
+ const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1;
+ if (IsLastPart)
+ {
+ Workload->TempFile.Flush();
+ ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(Workload->TempFile.ReadAll())));
+ Workload->TempFile.MoveTemporaryIntoPlace(BlockPath, Ec);
+ if (Ec)
+ {
+ throw std::runtime_error(fmt::format("Failed moving temporary file '{}' to '{}': {} ({})",
+ Workload->TempFile.GetPath(),
+ BlockPath,
+ Ec.message(),
+ Ec.value()));
+ }
+ }
+ Workload->OnSentBytes(BytesWritten, IsLastPart);
+ SimulateLatency(0, 0);
+ });
+
+ Offset += Size;
+ }
+ Workload->PartsLeft.store(WorkItems.size());
+
+ SimulateLatency(0, 0);
+ return WorkItems;
+ }
+ SimulateLatency(0, 0);
+ return {};
+ }
+
+ virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob");
+ ZEN_UNUSED(BuildId);
+ SimulateLatency(0, 0);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
+ if (IsFile(BlockPath))
+ {
+ BasicFile File(BlockPath, BasicFile::Mode::kRead);
+ IoBuffer Payload;
+ if (RangeOffset != 0 || RangeBytes != (uint64_t)-1)
+ {
+ Payload = IoBuffer(RangeBytes);
+ File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset);
+ }
+ else
+ {
+ Payload = File.ReadAll();
+ ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload))));
+ }
+ Payload.SetContentType(ZenContentType::kCompressedBinary);
+ m_Stats.TotalBytesRead += Payload.GetSize();
+ SimulateLatency(0, Payload.GetSize());
+ return Payload;
+ }
+ SimulateLatency(0, 0);
+ return IoBuffer{};
+ }
+
+ virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId,
+ const IoHash& RawHash,
+ uint64_t ChunkSize,
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive,
+ std::function<void()>&& OnComplete) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob");
+ ZEN_UNUSED(BuildId);
+ SimulateLatency(0, 0);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash);
+ if (IsFile(BlockPath))
+ {
+ struct WorkloadData
+ {
+ std::atomic<uint64_t> BytesRemaining;
+ BasicFile BlobFile;
+ std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive;
+ std::function<void()> OnComplete;
+ };
+
+ std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>());
+ Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead);
+ const uint64_t BlobSize = Workload->BlobFile.FileSize();
+
+ Workload->OnReceive = std::move(OnReceive);
+ Workload->OnComplete = std::move(OnComplete);
+ Workload->BytesRemaining = BlobSize;
+
+ std::vector<std::function<void()>> WorkItems;
+ uint64_t Offset = 0;
+ while (Offset < BlobSize)
+ {
+ uint64_t Size = Min(ChunkSize, BlobSize - Offset);
+ WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() {
+ ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work");
+ SimulateLatency(0, 0);
+ IoBuffer PartPayload(Size);
+ Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset);
+ m_Stats.TotalBytesRead += PartPayload.GetSize();
+ Workload->OnReceive(Offset, PartPayload);
+ uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size);
+ if (ByteRemaning == Size)
+ {
+ Workload->OnComplete();
+ }
+ SimulateLatency(Size, PartPayload.GetSize());
+ });
+
+ Offset += Size;
+ }
+ SimulateLatency(0, 0);
+ return WorkItems;
+ }
+ return {};
+ }
+
+ virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata");
+ ZEN_UNUSED(BuildId);
+
+ SimulateLatency(MetaData.GetSize(), 0);
+
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash);
+ CreateDirectories(BlockMetaDataPath.parent_path());
+ TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView());
+ m_Stats.TotalBytesWritten += MetaData.GetSize();
+ WriteAsJson(BlockMetaDataPath, MetaData);
+ SimulateLatency(0, 0);
+ return true;
+ }
+
+ virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::FindBlocks");
+ ZEN_UNUSED(BuildId);
+ SimulateLatency(sizeof(BuildId), 0);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ uint64_t FoundCount = 0;
+
+ DirectoryContent Content;
+ GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content);
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
+ for (const std::filesystem::path& MetaDataFile : Content.Files)
+ {
+ IoHash ChunkHash;
+ if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash))
+ {
+ std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash);
+ if (IsFile(BlockPath))
+ {
+ IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
+
+ m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
+
+ CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
+ Writer.AddObject(BlockObject);
+ FoundCount++;
+ if (FoundCount == MaxBlockCount)
+ {
+ break;
+ }
+ }
+ }
+ }
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
+ return Result;
+ }
+
+ virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override
+ {
+ ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata");
+ ZEN_UNUSED(BuildId);
+ SimulateLatency(sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(), 0);
+ Stopwatch ExecutionTimer;
+ auto _ = MakeGuard([&]() { m_Stats.TotalExecutionTimeUs += ExecutionTimer.GetElapsedTimeUs(); });
+ m_Stats.TotalRequestCount++;
+
+ CbObjectWriter Writer;
+ Writer.BeginArray("blocks");
+
+ for (const IoHash& BlockHash : BlockHashes)
+ {
+ std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash);
+ if (IsFile(MetaDataFile))
+ {
+ IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten();
+
+ m_Stats.TotalBytesRead += BlockMetaDataPayload.GetSize();
+
+ CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload));
+ Writer.AddObject(BlockObject);
+ }
+ }
+ Writer.EndArray(); // blocks
+ CbObject Result = Writer.Save();
+ SimulateLatency(0, Result.GetSize());
+ return Result;
+ }
+
+ virtual void PutBuildPartStats(const Oid& BuildId,
+ const Oid& BuildPartId,
+ const tsl::robin_map<std::string, double>& FloatStats) override
+ {
+ CbObjectWriter Request;
+ Request.BeginObject("floatStats"sv);
+ for (auto It : FloatStats)
+ {
+ Request.AddFloat(It.first, It.second);
+ }
+ Request.EndObject();
+ CbObject Payload = Request.Save();
+
+ SimulateLatency(Payload.GetSize(), 0);
+
+ const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId);
+ CreateDirectories(BuildPartStatsDataPath.parent_path());
+
+ TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView());
+ WriteAsJson(BuildPartStatsDataPath, Payload);
+
+ SimulateLatency(0, 0);
+ }
+
+protected:
+ std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; }
+ std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; }
+ std::filesystem::path GetBlobsMetadataFolder() const { return m_StoragePath / "blocks"; }
+ std::filesystem::path GetBuildFolder(const Oid& BuildId) const { return GetBuildsFolder() / BuildId.ToString(); }
+
+ std::filesystem::path GetBuildPath(const Oid& BuildId) const { return GetBuildFolder(BuildId) / "metadata.cb"; }
+
+ std::filesystem::path GetBuildPartFolder(const Oid& BuildId, const Oid& BuildPartId) const
+ {
+ return GetBuildFolder(BuildId) / "parts" / BuildPartId.ToString();
+ }
+
+ std::filesystem::path GetBuildPartPath(const Oid& BuildId, const Oid& BuildPartId) const
+ {
+ return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb";
+ }
+
+ std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const
+ {
+ return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid());
+ }
+
+ std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); }
+
+ std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const
+ {
+ return GetBlobsMetadataFolder() / fmt::format("{}.cb", RawHash);
+ }
+
+ void SimulateLatency(uint64_t ReceiveSize, uint64_t SendSize)
+ {
+ double SleepSec = m_LatencySec;
+ if (m_DelayPerKBSec > 0.0)
+ {
+ SleepSec += m_DelayPerKBSec * (double(SendSize + ReceiveSize) / 1024u);
+ }
+ if (SleepSec > 0)
+ {
+ Sleep(int(SleepSec * 1000));
+ }
+ }
+
+ void WriteAsJson(const std::filesystem::path& OriginalPath, CbObjectView Data) const
+ {
+ if (m_EnableJsonOutput)
+ {
+ ExtendableStringBuilder<128> SB;
+ CompactBinaryToJson(Data, SB);
+ std::filesystem::path JsonPath = OriginalPath;
+ JsonPath.replace_extension(".json");
+ std::string_view JsonMetaData = SB.ToView();
+ TemporaryFile::SafeWriteFile(JsonPath, MemoryView(JsonMetaData.data(), JsonMetaData.length()));
+ }
+ }
+
+ void WriteBuild(const Oid& BuildId, CbObjectView Data)
+ {
+ const std::filesystem::path BuildDataPath = GetBuildPath(BuildId);
+ CreateDirectories(BuildDataPath.parent_path());
+ TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView());
+ m_Stats.TotalBytesWritten += Data.GetSize();
+ WriteAsJson(BuildDataPath, Data);
+ }
+
+ CbObject ReadBuild(const Oid& BuildId)
+ {
+ const std::filesystem::path BuildDataPath = GetBuildPath(BuildId);
+ FileContents Content = ReadFile(BuildDataPath);
+ if (Content.ErrorCode)
+ {
+ throw std::runtime_error(fmt::format("Failed reading build '{}' from '{}': {} ({})",
+ BuildId,
+ BuildDataPath,
+ Content.ErrorCode.message(),
+ Content.ErrorCode.value()));
+ }
+ IoBuffer Payload = Content.Flatten();
+ m_Stats.TotalBytesRead += Payload.GetSize();
+ ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None);
+ CbObject BuildObject = CbObject(SharedBuffer(Payload));
+ return BuildObject;
+ }
+
+ std::vector<IoHash> GetNeededAttachments(CbObjectView BuildPartObject)
+ {
+ std::vector<IoHash> NeededAttachments;
+ BuildPartObject.IterateAttachments([&](CbFieldView FieldView) {
+ const IoHash AttachmentHash = FieldView.AsBinaryAttachment();
+ const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash);
+ if (!IsFile(BlockPath))
+ {
+ NeededAttachments.push_back(AttachmentHash);
+ }
+ });
+ return NeededAttachments;
+ }
+
+ bool ValidateCompressedBuffer(const IoHash& RawHash, const CompositeBuffer& Payload)
+ {
+ IoHash VerifyHash;
+ uint64_t VerifySize;
+ CompressedBuffer ValidateBuffer = CompressedBuffer::FromCompressed(Payload, VerifyHash, VerifySize);
+ if (!ValidateBuffer)
+ {
+ return false;
+ }
+ if (VerifyHash != RawHash)
+ {
+ return false;
+ }
+
+ IoHashStream Hash;
+ bool CouldDecompress = ValidateBuffer.DecompressToStream(
+ 0,
+ (uint64_t)-1,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) {
+ ZEN_UNUSED(SourceOffset, SourceSize, Offset);
+ for (const SharedBuffer& Segment : RangeBuffer.GetSegments())
+ {
+ Hash.Append(Segment.GetView());
+ }
+ return true;
+ });
+ if (!CouldDecompress)
+ {
+ return false;
+ }
+ if (Hash.GetHash() != VerifyHash)
+ {
+ return false;
+ }
+ return true;
+ }
+
+private:
+ const std::filesystem::path m_StoragePath;
+ BuildStorage::Statistics& m_Stats;
+ const bool m_EnableJsonOutput = false;
+ std::atomic<uint64_t> m_WrittenBytes;
+
+ const double m_LatencySec = 0.0;
+ const double m_DelayPerKBSec = 0.0;
+};
+
+std::unique_ptr<BuildStorage>
+CreateFileBuildStorage(const std::filesystem::path& StoragePath,
+ BuildStorage::Statistics& Stats,
+ bool EnableJsonOutput,
+ double LatencySec,
+ double DelayPerKBSec)
+{
+ return std::make_unique<FileBuildStorage>(StoragePath, Stats, EnableJsonOutput, LatencySec, DelayPerKBSec);
+}
+
+} // namespace zen