diff options
| author | Dan Engelbrecht <[email protected]> | 2025-10-03 09:59:41 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-10-03 09:59:41 +0200 |
| commit | 1f27c826bd36cd065e0fdd160f60c1f887138f33 (patch) | |
| tree | 000528b3f672d0126ca92438378d9007bfdcd0d8 /src/zenutil/filebuildstorage.cpp | |
| parent | move remoteproject to remotestorelib (#542) (diff) | |
| download | zen-1f27c826bd36cd065e0fdd160f60c1f887138f33.tar.xz zen-1f27c826bd36cd065e0fdd160f60c1f887138f33.zip | |
move zenutil builds code to zenremotestore (#543)
* move buildstorage implementations to zenremotestore lib
* move builds storage to zenremotelib
Diffstat (limited to 'src/zenutil/filebuildstorage.cpp')
| -rw-r--r-- | src/zenutil/filebuildstorage.cpp | 806 |
1 files changed, 0 insertions, 806 deletions
diff --git a/src/zenutil/filebuildstorage.cpp b/src/zenutil/filebuildstorage.cpp deleted file mode 100644 index f75fe403f..000000000 --- a/src/zenutil/filebuildstorage.cpp +++ /dev/null @@ -1,806 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include <zenutil/filebuildstorage.h> - -#include <zencore/basicfile.h> -#include <zencore/compactbinarybuilder.h> -#include <zencore/compactbinaryvalidation.h> -#include <zencore/fmtutils.h> -#include <zencore/scopeguard.h> -#include <zencore/timer.h> -#include <zencore/trace.h> - -namespace zen { - -using namespace std::literals; - -class FileBuildStorage : public BuildStorage -{ -public: - explicit FileBuildStorage(const std::filesystem::path& StoragePath, - BuildStorage::Statistics& Stats, - bool EnableJsonOutput, - double LatencySec, - double DelayPerKBSec) - : m_StoragePath(StoragePath) - , m_Stats(Stats) - , m_EnableJsonOutput(EnableJsonOutput) - , m_LatencySec(LatencySec) - , m_DelayPerKBSec(DelayPerKBSec) - { - CreateDirectories(GetBuildsFolder()); - CreateDirectories(GetBlobsFolder()); - CreateDirectories(GetBlobsMetadataFolder()); - } - - virtual ~FileBuildStorage() {} - - virtual CbObject ListNamespaces(bool bRecursive) override - { - ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces"); - ZEN_UNUSED(bRecursive); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObjectWriter Writer; - Writer.BeginArray("results"); - { - } - Writer.EndArray(); // results - - Writer.Finalize(); - ReceivedBytes = Writer.GetSaveSize(); - return Writer.Save(); - } - - virtual CbObject ListBuilds(CbObject Query) override - { - ZEN_TRACE_CPU("FileBuildStorage::ListBuilds"); - ZEN_UNUSED(Query); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = Query.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BuildFolder = GetBuildsFolder(); - DirectoryContent Content; - GetDirectoryContent(BuildFolder, DirectoryContentFlags::IncludeDirs, Content); - CbObjectWriter Writer; - Writer.BeginArray("results"); - { - for (const std::filesystem::path& BuildPath : Content.Directories) - { - Oid BuildId = Oid::TryFromHexString(BuildPath.stem().string()); - if (BuildId != Oid::Zero) - { - Writer.BeginObject(); - { - Writer.AddObjectId("buildId", BuildId); - Writer.AddObject("metadata", ReadBuild(BuildId)["metadata"sv].AsObjectView()); - } - Writer.EndObject(); - } - } - } - Writer.EndArray(); // results - - Writer.Finalize(); - ReceivedBytes = Writer.GetSaveSize(); - return Writer.Save(); - } - - virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutBuild"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = MetaData.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObjectWriter BuildObject; - BuildObject.AddObject("metadata", MetaData); - BuildObject.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); - WriteBuild(BuildId, BuildObject.Save()); - - CbObjectWriter BuildResponse; - BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); - BuildResponse.Finalize(); - ReceivedBytes = BuildResponse.GetSaveSize(); - return BuildResponse.Save(); - } - - virtual CbObject GetBuild(const Oid& BuildId) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetBuild"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObject Build = ReadBuild(BuildId); - ReceivedBytes = Build.GetSize(); - return Build; - } - - virtual void FinalizeBuild(const Oid& BuildId) override - { - ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - } - - virtual std::pair<IoHash, std::vector<IoHash>> PutBuildPart(const Oid& BuildId, - const Oid& BuildPartId, - std::string_view PartName, - const CbObject& MetaData) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = MetaData.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); - CreateDirectories(BuildPartDataPath.parent_path()); - - TemporaryFile::SafeWriteFile(BuildPartDataPath, MetaData.GetView()); - m_WrittenBytes += MetaData.GetSize(); - WriteAsJson(BuildPartDataPath, MetaData); - - IoHash RawHash = IoHash::HashBuffer(MetaData.GetView()); - - CbObjectWriter Writer; - { - CbObject BuildObject = ReadBuild(BuildId); - CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); - CbObjectView MetaDataView = BuildObject["metadata"sv].AsObjectView(); - - Writer.AddObject("metadata"sv, MetaDataView); - Writer.BeginObject("parts"sv); - { - for (CbFieldView PartView : PartsObject) - { - if (PartView.GetName() != PartName) - { - Writer.AddObjectId(PartView.GetName(), PartView.AsObjectId()); - } - } - Writer.AddObjectId(PartName, BuildPartId); - } - Writer.EndObject(); // parts - } - WriteBuild(BuildId, Writer.Save()); - - std::vector<IoHash> NeededAttachments = GetNeededAttachments(MetaData); - - ReceivedBytes = sizeof(IoHash) * NeededAttachments.size(); - - return std::make_pair(RawHash, std::move(NeededAttachments)); - } - - virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); - - IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); - - ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); - - CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); - - ReceivedBytes = BuildPartObject.GetSize(); - - return BuildPartObject; - } - - virtual std::vector<IoHash> FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override - { - ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart"); - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); - IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); - - IoHash RawHash = IoHash::HashBuffer(Payload.GetView()); - if (RawHash != PartHash) - { - throw std::runtime_error( - fmt::format("Failed finalizing build part {}: Expected hash {}, got {}", BuildPartId, PartHash, RawHash)); - } - - CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); - std::vector<IoHash> NeededAttachments(GetNeededAttachments(BuildPartObject)); - - ReceivedBytes = NeededAttachments.size() * sizeof(IoHash); - - return NeededAttachments; - } - - virtual void PutBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - const CompositeBuffer& Payload) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob"); - ZEN_UNUSED(BuildId); - ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); - - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload)); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = Payload.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (!IsFile(BlockPath)) - { - CreateDirectories(BlockPath.parent_path()); - TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView()); - } - - ReceivedBytes = Payload.GetSize(); - } - - virtual std::vector<std::function<void()>> PutLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - ZenContentType ContentType, - uint64_t PayloadSize, - std::function<IoBuffer(uint64_t Offset, uint64_t Size)>&& Transmitter, - std::function<void(uint64_t, bool)>&& OnSentBytes) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob"); - ZEN_UNUSED(BuildId); - ZEN_UNUSED(ContentType); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (!IsFile(BlockPath)) - { - CreateDirectories(BlockPath.parent_path()); - - struct WorkloadData - { - std::function<IoBuffer(uint64_t Offset, uint64_t Size)> Transmitter; - std::function<void(uint64_t, bool)> OnSentBytes; - TemporaryFile TempFile; - std::atomic<size_t> PartsLeft; - }; - - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->Transmitter = std::move(Transmitter); - Workload->OnSentBytes = std::move(OnSentBytes); - std::error_code Ec; - Workload->TempFile.CreateTemporary(BlockPath.parent_path(), Ec); - - if (Ec) - { - throw std::runtime_error( - fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); - } - - std::vector<std::function<void()>> WorkItems; - uint64_t Offset = 0; - while (Offset < PayloadSize) - { - uint64_t Size = Min(32u * 1024u * 1024u, PayloadSize - Offset); - - WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() { - ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work"); - IoBuffer PartPayload = Workload->Transmitter(Offset, Size); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = PartPayload.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - std::error_code Ec; - Workload->TempFile.Write(PartPayload, Offset, Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed writing to temporary file '{}': {} ({})", - Workload->TempFile.GetPath(), - Ec.message(), - Ec.value())); - } - - const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1; - if (IsLastPart) - { - Workload->TempFile.Flush(); - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(Workload->TempFile.ReadAll()))); - Workload->TempFile.MoveTemporaryIntoPlace(BlockPath, Ec); - if (Ec) - { - throw std::runtime_error(fmt::format("Failed moving temporary file '{}' to '{}': {} ({})", - Workload->TempFile.GetPath(), - BlockPath, - Ec.message(), - Ec.value())); - } - } - Workload->OnSentBytes(SentBytes, IsLastPart); - }); - - Offset += Size; - } - Workload->PartsLeft.store(WorkItems.size()); - return WorkItems; - } - return {}; - } - - virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (IsFile(BlockPath)) - { - BasicFile File(BlockPath, BasicFile::Mode::kRead); - IoBuffer Payload; - if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) - { - Payload = IoBuffer(RangeBytes); - File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset); - } - else - { - Payload = File.ReadAll(); - ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); - } - Payload.SetContentType(ZenContentType::kCompressedBinary); - ReceivedBytes = Payload.GetSize(); - return Payload; - } - return IoBuffer{}; - } - - virtual std::vector<std::function<void()>> GetLargeBuildBlob(const Oid& BuildId, - const IoHash& RawHash, - uint64_t ChunkSize, - std::function<void(uint64_t Offset, const IoBuffer& Chunk)>&& OnReceive, - std::function<void()>&& OnComplete) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); - if (IsFile(BlockPath)) - { - struct WorkloadData - { - std::atomic<uint64_t> BytesRemaining; - BasicFile BlobFile; - std::function<void(uint64_t Offset, const IoBuffer& Chunk)> OnReceive; - std::function<void()> OnComplete; - }; - - std::shared_ptr<WorkloadData> Workload(std::make_shared<WorkloadData>()); - Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead); - const uint64_t BlobSize = Workload->BlobFile.FileSize(); - - Workload->OnReceive = std::move(OnReceive); - Workload->OnComplete = std::move(OnComplete); - Workload->BytesRemaining = BlobSize; - - std::vector<std::function<void()>> WorkItems; - uint64_t Offset = 0; - while (Offset < BlobSize) - { - uint64_t Size = Min(ChunkSize, BlobSize - Offset); - WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() { - ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work"); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - IoBuffer PartPayload(Size); - Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); - ReceivedBytes = PartPayload.GetSize(); - Workload->OnReceive(Offset, PartPayload); - uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size); - if (ByteRemaning == Size) - { - Workload->OnComplete(); - } - }); - - Offset += Size; - } - return WorkItems; - } - return {}; - } - - virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override - { - ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = MetaData.GetSize(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash); - CreateDirectories(BlockMetaDataPath.parent_path()); - TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView()); - WriteAsJson(BlockMetaDataPath, MetaData); - return true; - } - - virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override - { - ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - uint64_t FoundCount = 0; - - DirectoryContent Content; - GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content); - CbObjectWriter Writer; - Writer.BeginArray("blocks"); - for (const std::filesystem::path& MetaDataFile : Content.Files) - { - IoHash ChunkHash; - if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash)) - { - std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash); - if (IsFile(BlockPath)) - { - IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); - - CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Writer.AddObject(BlockObject); - FoundCount++; - if (FoundCount == MaxBlockCount) - { - break; - } - } - } - } - Writer.EndArray(); // blocks - - Writer.Finalize(); - ReceivedBytes = Writer.GetSaveSize(); - return Writer.Save(); - } - - virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span<const IoHash> BlockHashes) override - { - ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); - ZEN_UNUSED(BuildId); - - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(); - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObjectWriter Writer; - Writer.BeginArray("blocks"); - - for (const IoHash& BlockHash : BlockHashes) - { - std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash); - if (IsFile(MetaDataFile)) - { - IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); - - CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); - Writer.AddObject(BlockObject); - } - } - Writer.EndArray(); // blocks - Writer.Finalize(); - ReceivedBytes = Writer.GetSaveSize(); - return Writer.Save(); - } - - virtual void PutBuildPartStats(const Oid& BuildId, - const Oid& BuildPartId, - const tsl::robin_map<std::string, double>& FloatStats) override - { - uint64_t ReceivedBytes = 0; - uint64_t SentBytes = 0; - - SimulateLatency(SentBytes, 0); - auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); - - Stopwatch ExecutionTimer; - auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); - - CbObjectWriter Request; - Request.BeginObject("floatStats"sv); - for (auto It : FloatStats) - { - Request.AddFloat(It.first, It.second); - } - Request.EndObject(); - Request.Finalize(); - SentBytes = Request.GetSaveSize(); - - const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId); - CreateDirectories(BuildPartStatsDataPath.parent_path()); - - CbObject Payload = Request.Save(); - - TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView()); - WriteAsJson(BuildPartStatsDataPath, Payload); - } - -protected: - std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; } - std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; } - std::filesystem::path GetBlobsMetadataFolder() const { return m_StoragePath / "blocks"; } - std::filesystem::path GetBuildFolder(const Oid& BuildId) const { return GetBuildsFolder() / BuildId.ToString(); } - - std::filesystem::path GetBuildPath(const Oid& BuildId) const { return GetBuildFolder(BuildId) / "metadata.cb"; } - - std::filesystem::path GetBuildPartFolder(const Oid& BuildId, const Oid& BuildPartId) const - { - return GetBuildFolder(BuildId) / "parts" / BuildPartId.ToString(); - } - - std::filesystem::path GetBuildPartPath(const Oid& BuildId, const Oid& BuildPartId) const - { - return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb"; - } - - std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const - { - return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid()); - } - - std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); } - - std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const - { - return GetBlobsMetadataFolder() / fmt::format("{}.cb", RawHash); - } - - void SimulateLatency(uint64_t ReceiveSize, uint64_t SendSize) - { - double SleepSec = m_LatencySec; - if (m_DelayPerKBSec > 0.0) - { - SleepSec += m_DelayPerKBSec * (double(SendSize + ReceiveSize) / 1024u); - } - if (SleepSec > 0) - { - Sleep(int(SleepSec * 1000)); - } - } - - void WriteAsJson(const std::filesystem::path& OriginalPath, CbObjectView Data) const - { - if (m_EnableJsonOutput) - { - ExtendableStringBuilder<128> SB; - CompactBinaryToJson(Data, SB); - std::filesystem::path JsonPath = OriginalPath; - JsonPath.replace_extension(".json"); - std::string_view JsonMetaData = SB.ToView(); - TemporaryFile::SafeWriteFile(JsonPath, MemoryView(JsonMetaData.data(), JsonMetaData.length())); - } - } - - void WriteBuild(const Oid& BuildId, CbObjectView Data) - { - const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); - CreateDirectories(BuildDataPath.parent_path()); - TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView()); - WriteAsJson(BuildDataPath, Data); - } - - CbObject ReadBuild(const Oid& BuildId) - { - const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); - FileContents Content = ReadFile(BuildDataPath); - if (Content.ErrorCode) - { - throw std::runtime_error(fmt::format("Failed reading build '{}' from '{}': {} ({})", - BuildId, - BuildDataPath, - Content.ErrorCode.message(), - Content.ErrorCode.value())); - } - IoBuffer Payload = Content.Flatten(); - ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); - CbObject BuildObject = CbObject(SharedBuffer(Payload)); - return BuildObject; - } - - std::vector<IoHash> GetNeededAttachments(CbObjectView BuildPartObject) - { - std::vector<IoHash> NeededAttachments; - BuildPartObject.IterateAttachments([&](CbFieldView FieldView) { - const IoHash AttachmentHash = FieldView.AsBinaryAttachment(); - const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash); - if (!IsFile(BlockPath)) - { - NeededAttachments.push_back(AttachmentHash); - } - }); - return NeededAttachments; - } - - bool ValidateCompressedBuffer(const IoHash& RawHash, const CompositeBuffer& Payload) - { - IoHash VerifyHash; - uint64_t VerifySize; - CompressedBuffer ValidateBuffer = CompressedBuffer::FromCompressed(Payload, VerifyHash, VerifySize); - if (!ValidateBuffer) - { - return false; - } - if (VerifyHash != RawHash) - { - return false; - } - - IoHashStream Hash; - bool CouldDecompress = ValidateBuffer.DecompressToStream( - 0, - (uint64_t)-1, - [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { - ZEN_UNUSED(SourceOffset, SourceSize, Offset); - for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) - { - Hash.Append(Segment.GetView()); - } - return true; - }); - if (!CouldDecompress) - { - return false; - } - if (Hash.GetHash() != VerifyHash) - { - return false; - } - return true; - } - -private: - void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes) - { - const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs(); - m_Stats.TotalBytesWritten += UploadedBytes; - m_Stats.TotalBytesRead += DownloadedBytes; - m_Stats.TotalRequestTimeUs += ElapsedUs; - m_Stats.TotalRequestCount++; - SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes); - SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes); - if (ElapsedUs > 0) - { - uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs; - SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); - } - } - - const std::filesystem::path m_StoragePath; - BuildStorage::Statistics& m_Stats; - const bool m_EnableJsonOutput = false; - std::atomic<uint64_t> m_WrittenBytes; - - const double m_LatencySec = 0.0; - const double m_DelayPerKBSec = 0.0; -}; - -std::unique_ptr<BuildStorage> -CreateFileBuildStorage(const std::filesystem::path& StoragePath, - BuildStorage::Statistics& Stats, - bool EnableJsonOutput, - double LatencySec, - double DelayPerKBSec) -{ - return std::make_unique<FileBuildStorage>(StoragePath, Stats, EnableJsonOutput, LatencySec, DelayPerKBSec); -} - -} // namespace zen |