diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 09:59:41 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 09:59:41 +0200 |
| commit | 1f27c826bd36cd065e0fdd160f60c1f887138f33 (patch) | |
| tree | 000528b3f672d0126ca92438378d9007bfdcd0d8 /src/zenremotestore/builds/filebuildstorage.cpp | |
| parent | move remoteproject to remotestorelib (#542) (diff) | |
| download | zen-1f27c826bd36cd065e0fdd160f60c1f887138f33.tar.xz zen-1f27c826bd36cd065e0fdd160f60c1f887138f33.zip | |
move zenutil builds code to zenremotestore (#543)
* move buildstorage implementations to zenremotestore lib
* move builds storage to zenremotelib
Diffstat (limited to 'src/zenremotestore/builds/filebuildstorage.cpp')
| -rw-r--r-- | src/zenremotestore/builds/filebuildstorage.cpp | 806 |
1 files changed, 806 insertions, 0 deletions
diff --git a/src/zenremotestore/builds/filebuildstorage.cpp b/src/zenremotestore/builds/filebuildstorage.cpp new file mode 100644 index 000000000..5cfd80666 --- /dev/null +++ b/src/zenremotestore/builds/filebuildstorage.cpp @@ -0,0 +1,806 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/builds/filebuildstorage.h> + +#include <zencore/basicfile.h> +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalidation.h> +#include <zencore/fmtutils.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zencore/trace.h> + +namespace zen { + +using namespace std::literals; + +class FileBuildStorage : public BuildStorage +{ +public: + explicit FileBuildStorage(const std::filesystem::path& StoragePath, + BuildStorage::Statistics& Stats, + bool EnableJsonOutput, + double LatencySec, + double DelayPerKBSec) + : m_StoragePath(StoragePath) + , m_Stats(Stats) + , m_EnableJsonOutput(EnableJsonOutput) + , m_LatencySec(LatencySec) + , m_DelayPerKBSec(DelayPerKBSec) + { + CreateDirectories(GetBuildsFolder()); + CreateDirectories(GetBlobsFolder()); + CreateDirectories(GetBlobsMetadataFolder()); + } + + virtual ~FileBuildStorage() {} + + virtual CbObject ListNamespaces(bool bRecursive) override + { + ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces"); + ZEN_UNUSED(bRecursive); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + CbObjectWriter Writer; + Writer.BeginArray("results"); + { + } + Writer.EndArray(); // results + + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); + return Writer.Save(); + } + + virtual CbObject ListBuilds(CbObject Query) override + { + ZEN_TRACE_CPU("FileBuildStorage::ListBuilds"); + ZEN_UNUSED(Query); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = Query.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + const std::filesystem::path BuildFolder = GetBuildsFolder(); + DirectoryContent Content; + GetDirectoryContent(BuildFolder, DirectoryContentFlags::IncludeDirs, Content); + CbObjectWriter Writer; + Writer.BeginArray("results"); + { + for (const std::filesystem::path& BuildPath : Content.Directories) + { + Oid BuildId = Oid::TryFromHexString(BuildPath.stem().string()); + if (BuildId != Oid::Zero) + { + Writer.BeginObject(); + { + Writer.AddObjectId("buildId", BuildId); + Writer.AddObject("metadata", ReadBuild(BuildId)["metadata"sv].AsObjectView()); + } + Writer.EndObject(); + } + } + } + Writer.EndArray(); // results + + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); + return Writer.Save(); + } + + virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override + { + ZEN_TRACE_CPU("FileBuildStorage::PutBuild"); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = MetaData.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + CbObjectWriter BuildObject; + BuildObject.AddObject("metadata", MetaData); + BuildObject.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); + WriteBuild(BuildId, BuildObject.Save()); + + CbObjectWriter BuildResponse; + BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); + BuildResponse.Finalize(); + ReceivedBytes = BuildResponse.GetSaveSize(); + return BuildResponse.Save(); + } + + virtual CbObject GetBuild(const Oid& BuildId) override + { + ZEN_TRACE_CPU("FileBuildStorage::GetBuild"); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + CbObject Build = ReadBuild(BuildId); + ReceivedBytes = Build.GetSize(); + return Build; + } + + virtual void FinalizeBuild(const Oid& BuildId) override + { + ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild"); + ZEN_UNUSED(BuildId); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + } + + virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId, + const Oid& BuildPartId, + std::string_view PartName, + const CbObject& MetaData) override + { + ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart"); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = MetaData.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); + CreateDirectories(BuildPartDataPath.parent_path()); + + TemporaryFile::SafeWriteFile(BuildPartDataPath, MetaData.GetView()); + m_WrittenBytes += MetaData.GetSize(); + WriteAsJson(BuildPartDataPath, MetaData); + + IoHash RawHash = IoHash::HashBuffer(MetaData.GetView()); + + CbObjectWriter Writer; + { + CbObject BuildObject = ReadBuild(BuildId); + CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); + CbObjectView MetaDataView = BuildObject["metadata"sv].AsObjectView(); + + Writer.AddObject("metadata"sv, MetaDataView); + Writer.BeginObject("parts"sv); + { + for (CbFieldView PartView : PartsObject) + { + if (PartView.GetName() != PartName) + { + Writer.AddObjectId(PartView.GetName(), PartView.AsObjectId()); + } + } + Writer.AddObjectId(PartName, BuildPartId); + } + Writer.EndObject(); // parts + } + WriteBuild(BuildId, Writer.Save()); + + std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData); + + ReceivedBytes = sizeof(IoHash) * NeededAttachments.size(); + + return std::make_pair(RawHash, std::move(NeededAttachments)); + } + + virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override + { + ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart"); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); + + IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); + + ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); + + CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); + + ReceivedBytes = BuildPartObject.GetSize(); + + return BuildPartObject; + } + + virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override + { + ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart"); + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); + IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); + + IoHash RawHash = IoHash::HashBuffer(Payload.GetView()); + if (RawHash != PartHash) + { + throw std::runtime_error( + fmt::format("Failed finalizing build part {}: Expected hash {}, got {}", BuildPartId, PartHash, RawHash)); + } + + CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); + std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject)); + + ReceivedBytes = NeededAttachments.size() * sizeof(IoHash); + + return NeededAttachments; + } + + virtual void PutBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + const CompositeBuffer& Payload) override + { + ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob"); + ZEN_UNUSED(BuildId); + ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); + + ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload)); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = Payload.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); + if (!IsFile(BlockPath)) + { + CreateDirectories(BlockPath.parent_path()); + TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView()); + } + + ReceivedBytes = Payload.GetSize(); + } + + virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + ZenContentType ContentType, + uint64_t PayloadSize, + std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, + std::function<void(uint64_t, bool)>&& OnSentBytes) override + { + ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob"); + ZEN_UNUSED(BuildId); + ZEN_UNUSED(ContentType); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); + if (!IsFile(BlockPath)) + { + CreateDirectories(BlockPath.parent_path()); + + struct WorkloadData + { + std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter; + std::function<void(uint64_t, bool)> OnSentBytes; + TemporaryFile TempFile; + std::atomic<size_t> PartsLeft; + }; + + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + Workload->Transmitter = std::move(Transmitter); + Workload->OnSentBytes = std::move(OnSentBytes); + std::error_code Ec; + Workload->TempFile.CreateTemporary(BlockPath.parent_path(), Ec); + + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); + } + + std::vector<std::function<void()>> WorkItems; + uint64_t Offset = 0; + while (Offset < PayloadSize) + { + uint64_t Size = Min(32u * 1024u * 1024u, PayloadSize - Offset); + + WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() { + ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work"); + IoBuffer PartPayload = Workload->Transmitter(Offset, Size); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = PartPayload.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + std::error_code Ec; + Workload->TempFile.Write(PartPayload, Offset, Ec); + if (Ec) + { + throw std::runtime_error(fmt::format("Failed writing to temporary file '{}': {} ({})", + Workload->TempFile.GetPath(), + Ec.message(), + Ec.value())); + } + + const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1; + if (IsLastPart) + { + Workload->TempFile.Flush(); + ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(Workload->TempFile.ReadAll()))); + Workload->TempFile.MoveTemporaryIntoPlace(BlockPath, Ec); + if (Ec) + { + throw std::runtime_error(fmt::format("Failed moving temporary file '{}' to '{}': {} ({})", + Workload->TempFile.GetPath(), + BlockPath, + Ec.message(), + Ec.value())); + } + } + Workload->OnSentBytes(SentBytes, IsLastPart); + }); + + Offset += Size; + } + Workload->PartsLeft.store(WorkItems.size()); + return WorkItems; + } + return {}; + } + + virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override + { + ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob"); + ZEN_UNUSED(BuildId); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); + if (IsFile(BlockPath)) + { + BasicFile File(BlockPath, BasicFile::Mode::kRead); + IoBuffer Payload; + if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) + { + Payload = IoBuffer(RangeBytes); + File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset); + } + else + { + Payload = File.ReadAll(); + ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); + } + Payload.SetContentType(ZenContentType::kCompressedBinary); + ReceivedBytes = Payload.GetSize(); + return Payload; + } + return IoBuffer{}; + } + + virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, + const IoHash& RawHash, + uint64_t ChunkSize, + std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, + std::function<void()>&& OnComplete) override + { + ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob"); + ZEN_UNUSED(BuildId); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); + if (IsFile(BlockPath)) + { + struct WorkloadData + { + std::atomic<uint64_t> BytesRemaining; + BasicFile BlobFile; + std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; + std::function<void()> OnComplete; + }; + + std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); + Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead); + const uint64_t BlobSize = Workload->BlobFile.FileSize(); + + Workload->OnReceive = std::move(OnReceive); + Workload->OnComplete = std::move(OnComplete); + Workload->BytesRemaining = BlobSize; + + std::vector<std::function<void()>> WorkItems; + uint64_t Offset = 0; + while (Offset < BlobSize) + { + uint64_t Size = Min(ChunkSize, BlobSize - Offset); + WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() { + ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work"); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + IoBuffer PartPayload(Size); + Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); + ReceivedBytes = PartPayload.GetSize(); + Workload->OnReceive(Offset, PartPayload); + uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size); + if (ByteRemaning == Size) + { + Workload->OnComplete(); + } + }); + + Offset += Size; + } + return WorkItems; + } + return {}; + } + + virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override + { + ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata"); + ZEN_UNUSED(BuildId); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = MetaData.GetSize(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash); + CreateDirectories(BlockMetaDataPath.parent_path()); + TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView()); + WriteAsJson(BlockMetaDataPath, MetaData); + return true; + } + + virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override + { + ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); + ZEN_UNUSED(BuildId); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + uint64_t FoundCount = 0; + + DirectoryContent Content; + GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content); + CbObjectWriter Writer; + Writer.BeginArray("blocks"); + for (const std::filesystem::path& MetaDataFile : Content.Files) + { + IoHash ChunkHash; + if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash)) + { + std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash); + if (IsFile(BlockPath)) + { + IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); + + CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); + Writer.AddObject(BlockObject); + FoundCount++; + if (FoundCount == MaxBlockCount) + { + break; + } + } + } + } + Writer.EndArray(); // blocks + + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); + return Writer.Save(); + } + + virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override + { + ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); + ZEN_UNUSED(BuildId); + + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(); + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + CbObjectWriter Writer; + Writer.BeginArray("blocks"); + + for (const IoHash& BlockHash : BlockHashes) + { + std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash); + if (IsFile(MetaDataFile)) + { + IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); + + CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); + Writer.AddObject(BlockObject); + } + } + Writer.EndArray(); // blocks + Writer.Finalize(); + ReceivedBytes = Writer.GetSaveSize(); + return Writer.Save(); + } + + virtual void PutBuildPartStats(const Oid& BuildId, + const Oid& BuildPartId, + const tsl::robin_map<std::string, double>& FloatStats) override + { + uint64_t ReceivedBytes = 0; + uint64_t SentBytes = 0; + + SimulateLatency(SentBytes, 0); + auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); + + Stopwatch ExecutionTimer; + auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); + + CbObjectWriter Request; + Request.BeginObject("floatStats"sv); + for (auto It : FloatStats) + { + Request.AddFloat(It.first, It.second); + } + Request.EndObject(); + Request.Finalize(); + SentBytes = Request.GetSaveSize(); + + const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId); + CreateDirectories(BuildPartStatsDataPath.parent_path()); + + CbObject Payload = Request.Save(); + + TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView()); + WriteAsJson(BuildPartStatsDataPath, Payload); + } + +protected: + std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; } + std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; } + std::filesystem::path GetBlobsMetadataFolder() const { return m_StoragePath / "blocks"; } + std::filesystem::path GetBuildFolder(const Oid& BuildId) const { return GetBuildsFolder() / BuildId.ToString(); } + + std::filesystem::path GetBuildPath(const Oid& BuildId) const { return GetBuildFolder(BuildId) / "metadata.cb"; } + + std::filesystem::path GetBuildPartFolder(const Oid& BuildId, const Oid& BuildPartId) const + { + return GetBuildFolder(BuildId) / "parts" / BuildPartId.ToString(); + } + + std::filesystem::path GetBuildPartPath(const Oid& BuildId, const Oid& BuildPartId) const + { + return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb"; + } + + std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const + { + return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid()); + } + + std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); } + + std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const + { + return GetBlobsMetadataFolder() / fmt::format("{}.cb", RawHash); + } + + void SimulateLatency(uint64_t ReceiveSize, uint64_t SendSize) + { + double SleepSec = m_LatencySec; + if (m_DelayPerKBSec > 0.0) + { + SleepSec += m_DelayPerKBSec * (double(SendSize + ReceiveSize) / 1024u); + } + if (SleepSec > 0) + { + Sleep(int(SleepSec * 1000)); + } + } + + void WriteAsJson(const std::filesystem::path& OriginalPath, CbObjectView Data) const + { + if (m_EnableJsonOutput) + { + ExtendableStringBuilder<128> SB; + CompactBinaryToJson(Data, SB); + std::filesystem::path JsonPath = OriginalPath; + JsonPath.replace_extension(".json"); + std::string_view JsonMetaData = SB.ToView(); + TemporaryFile::SafeWriteFile(JsonPath, MemoryView(JsonMetaData.data(), JsonMetaData.length())); + } + } + + void WriteBuild(const Oid& BuildId, CbObjectView Data) + { + const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); + CreateDirectories(BuildDataPath.parent_path()); + TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView()); + WriteAsJson(BuildDataPath, Data); + } + + CbObject ReadBuild(const Oid& BuildId) + { + const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); + FileContents Content = ReadFile(BuildDataPath); + if (Content.ErrorCode) + { + throw std::runtime_error(fmt::format("Failed reading build '{}' from '{}': {} ({})", + BuildId, + BuildDataPath, + Content.ErrorCode.message(), + Content.ErrorCode.value())); + } + IoBuffer Payload = Content.Flatten(); + ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); + CbObject BuildObject = CbObject(SharedBuffer(Payload)); + return BuildObject; + } + + std::vector<IoHash> GetNeededAttachments(CbObjectView BuildPartObject) + { + std::vector<IoHash> NeededAttachments; + BuildPartObject.IterateAttachments([&](CbFieldView FieldView) { + const IoHash AttachmentHash = FieldView.AsBinaryAttachment(); + const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash); + if (!IsFile(BlockPath)) + { + NeededAttachments.push_back(AttachmentHash); + } + }); + return NeededAttachments; + } + + bool ValidateCompressedBuffer(const IoHash& RawHash, const CompositeBuffer& Payload) + { + IoHash VerifyHash; + uint64_t VerifySize; + CompressedBuffer ValidateBuffer = CompressedBuffer::FromCompressed(Payload, VerifyHash, VerifySize); + if (!ValidateBuffer) + { + return false; + } + if (VerifyHash != RawHash) + { + return false; + } + + IoHashStream Hash; + bool CouldDecompress = ValidateBuffer.DecompressToStream( + 0, + (uint64_t)-1, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { + ZEN_UNUSED(SourceOffset, SourceSize, Offset); + for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) + { + Hash.Append(Segment.GetView()); + } + return true; + }); + if (!CouldDecompress) + { + return false; + } + if (Hash.GetHash() != VerifyHash) + { + return false; + } + return true; + } + +private: + void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes) + { + const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs(); + m_Stats.TotalBytesWritten += UploadedBytes; + m_Stats.TotalBytesRead += DownloadedBytes; + m_Stats.TotalRequestTimeUs += ElapsedUs; + m_Stats.TotalRequestCount++; + SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes); + SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes); + if (ElapsedUs > 0) + { + uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs; + SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); + } + } + + const std::filesystem::path m_StoragePath; + BuildStorage::Statistics& m_Stats; + const bool m_EnableJsonOutput = false; + std::atomic<uint64_t> m_WrittenBytes; + + const double m_LatencySec = 0.0; + const double m_DelayPerKBSec = 0.0; +}; + +std::unique_ptr<BuildStorage> +CreateFileBuildStorage(const std::filesystem::path& StoragePath, + BuildStorage::Statistics& Stats, + bool EnableJsonOutput, + double LatencySec, + double DelayPerKBSec) +{ + return std::make_unique<FileBuildStorage>(StoragePath, Stats, EnableJsonOutput, LatencySec, DelayPerKBSec); +} + +} // namespace zen |