// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include namespace zen { using namespace std::literals; class FileBuildStorage : public BuildStorage { public: explicit FileBuildStorage(const std::filesystem::path& StoragePath, BuildStorage::Statistics& Stats, bool EnableJsonOutput, double LatencySec, double DelayPerKBSec) : m_StoragePath(StoragePath) , m_Stats(Stats) , m_EnableJsonOutput(EnableJsonOutput) , m_LatencySec(LatencySec) , m_DelayPerKBSec(DelayPerKBSec) { CreateDirectories(GetBuildsFolder()); CreateDirectories(GetBlobsFolder()); CreateDirectories(GetBlobsMetadataFolder()); } virtual ~FileBuildStorage() {} virtual CbObject ListNamespaces(bool bRecursive) override { ZEN_TRACE_CPU("FileBuildStorage::ListNamespaces"); ZEN_UNUSED(bRecursive); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter Writer; Writer.BeginArray("results"); { } Writer.EndArray(); // results Writer.Finalize(); ReceivedBytes = Writer.GetSaveSize(); return Writer.Save(); } virtual CbObject ListBuilds(CbObject Query) override { ZEN_TRACE_CPU("FileBuildStorage::ListBuilds"); ZEN_UNUSED(Query); uint64_t ReceivedBytes = 0; uint64_t SentBytes = Query.GetSize(); SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildFolder = GetBuildsFolder(); DirectoryContent Content; GetDirectoryContent(BuildFolder, DirectoryContentFlags::IncludeDirs, Content); CbObjectWriter Writer; Writer.BeginArray("results"); { for (const std::filesystem::path& BuildPath : Content.Directories) { Oid BuildId = Oid::TryFromHexString(BuildPath.stem().string()); if (BuildId != Oid::Zero) { Writer.BeginObject(); { Writer.AddObjectId("buildId", BuildId); Writer.AddObject("metadata", ReadBuild(BuildId)["metadata"sv].AsObjectView()); } Writer.EndObject(); } } } Writer.EndArray(); // results Writer.Finalize(); ReceivedBytes = Writer.GetSaveSize(); return Writer.Save(); } virtual CbObject PutBuild(const Oid& BuildId, const CbObject& MetaData) override { ZEN_TRACE_CPU("FileBuildStorage::PutBuild"); uint64_t ReceivedBytes = 0; uint64_t SentBytes = MetaData.GetSize(); SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter BuildObject; BuildObject.AddObject("metadata", MetaData); BuildObject.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); WriteBuild(BuildId, BuildObject.Save()); CbObjectWriter BuildResponse; BuildResponse.AddInteger("chunkSize"sv, 32u * 1024u * 1024u); BuildResponse.Finalize(); ReceivedBytes = BuildResponse.GetSaveSize(); return BuildResponse.Save(); } virtual CbObject GetBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("FileBuildStorage::GetBuild"); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObject Build = ReadBuild(BuildId); ReceivedBytes = Build.GetSize(); return Build; } virtual void FinalizeBuild(const Oid& BuildId) override { ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuild"); ZEN_UNUSED(BuildId); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); } virtual std::pair> PutBuildPart(const Oid& BuildId, const Oid& BuildPartId, std::string_view PartName, const CbObject& MetaData) override { ZEN_TRACE_CPU("FileBuildStorage::PutBuildPart"); uint64_t ReceivedBytes = 0; uint64_t SentBytes = MetaData.GetSize(); SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); CreateDirectories(BuildPartDataPath.parent_path()); TemporaryFile::SafeWriteFile(BuildPartDataPath, MetaData.GetView()); m_WrittenBytes += MetaData.GetSize(); WriteAsJson(BuildPartDataPath, MetaData); IoHash RawHash = IoHash::HashBuffer(MetaData.GetView()); CbObjectWriter Writer; { CbObject BuildObject = ReadBuild(BuildId); CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); CbObjectView MetaDataView = BuildObject["metadata"sv].AsObjectView(); Writer.AddObject("metadata"sv, MetaDataView); Writer.BeginObject("parts"sv); { for (CbFieldView PartView : PartsObject) { if (PartView.GetName() != PartName) { Writer.AddObjectId(PartView.GetName(), PartView.AsObjectId()); } } Writer.AddObjectId(PartName, BuildPartId); } Writer.EndObject(); // parts } WriteBuild(BuildId, Writer.Save()); std::vector NeededAttachments = GetNeededAttachments(MetaData); ReceivedBytes = sizeof(IoHash) * NeededAttachments.size(); return std::make_pair(RawHash, std::move(NeededAttachments)); } virtual CbObject GetBuildPart(const Oid& BuildId, const Oid& BuildPartId) override { ZEN_TRACE_CPU("FileBuildStorage::GetBuildPart"); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); ReceivedBytes = BuildPartObject.GetSize(); return BuildPartObject; } virtual std::vector FinalizeBuildPart(const Oid& BuildId, const Oid& BuildPartId, const IoHash& PartHash) override { ZEN_TRACE_CPU("FileBuildStorage::FinalizeBuildPart"); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BuildPartDataPath = GetBuildPartPath(BuildId, BuildPartId); IoBuffer Payload = ReadFile(BuildPartDataPath).Flatten(); IoHash RawHash = IoHash::HashBuffer(Payload.GetView()); if (RawHash != PartHash) { throw std::runtime_error( fmt::format("Failed finalizing build part {}: Expected hash {}, got {}", BuildPartId, PartHash, RawHash)); } CbObject BuildPartObject = CbObject(SharedBuffer(Payload)); std::vector NeededAttachments(GetNeededAttachments(BuildPartObject)); ReceivedBytes = NeededAttachments.size() * sizeof(IoHash); return NeededAttachments; } virtual void PutBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, const CompositeBuffer& Payload) override { ZEN_TRACE_CPU("FileBuildStorage::PutBuildBlob"); ZEN_UNUSED(BuildId); ZEN_ASSERT(ContentType == ZenContentType::kCompressedBinary); ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, Payload)); uint64_t ReceivedBytes = 0; uint64_t SentBytes = Payload.GetSize(); SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (!IsFile(BlockPath)) { CreateDirectories(BlockPath.parent_path()); TemporaryFile::SafeWriteFile(BlockPath, Payload.Flatten().GetView()); } ReceivedBytes = Payload.GetSize(); } virtual std::vector> PutLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, ZenContentType ContentType, uint64_t PayloadSize, std::function&& Transmitter, std::function&& OnSentBytes) override { ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob"); ZEN_UNUSED(BuildId); ZEN_UNUSED(ContentType); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (!IsFile(BlockPath)) { CreateDirectories(BlockPath.parent_path()); struct WorkloadData { std::function Transmitter; std::function OnSentBytes; TemporaryFile TempFile; std::atomic PartsLeft; }; std::shared_ptr Workload(std::make_shared()); Workload->Transmitter = std::move(Transmitter); Workload->OnSentBytes = std::move(OnSentBytes); std::error_code Ec; Workload->TempFile.CreateTemporary(BlockPath.parent_path(), Ec); if (Ec) { throw std::runtime_error( fmt::format("Failed opening temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); } std::vector> WorkItems; uint64_t Offset = 0; while (Offset < PayloadSize) { uint64_t Size = Min(32u * 1024u * 1024u, PayloadSize - Offset); WorkItems.push_back([this, RawHash, BlockPath, Workload, Offset, Size]() { ZEN_TRACE_CPU("FileBuildStorage::PutLargeBuildBlob_Work"); IoBuffer PartPayload = Workload->Transmitter(Offset, Size); uint64_t ReceivedBytes = 0; uint64_t SentBytes = PartPayload.GetSize(); SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); std::error_code Ec; Workload->TempFile.Write(PartPayload, Offset, Ec); if (Ec) { throw std::runtime_error(fmt::format("Failed writing to temporary file '{}': {} ({})", Workload->TempFile.GetPath(), Ec.message(), Ec.value())); } const bool IsLastPart = Workload->PartsLeft.fetch_sub(1) == 1; if (IsLastPart) { Workload->TempFile.Flush(); ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(Workload->TempFile.ReadAll()))); Workload->TempFile.MoveTemporaryIntoPlace(BlockPath, Ec); if (Ec) { throw std::runtime_error(fmt::format("Failed moving temporary file '{}' to '{}': {} ({})", Workload->TempFile.GetPath(), BlockPath, Ec.message(), Ec.value())); } } Workload->OnSentBytes(SentBytes, IsLastPart); }); Offset += Size; } Workload->PartsLeft.store(WorkItems.size()); return WorkItems; } return {}; } virtual IoBuffer GetBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t RangeOffset, uint64_t RangeBytes) override { ZEN_TRACE_CPU("FileBuildStorage::GetBuildBlob"); ZEN_UNUSED(BuildId); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (IsFile(BlockPath)) { BasicFile File(BlockPath, BasicFile::Mode::kRead); IoBuffer Payload; if (RangeOffset != 0 || RangeBytes != (uint64_t)-1) { Payload = IoBuffer(RangeBytes); File.Read(Payload.GetMutableView().GetData(), RangeBytes, RangeOffset); } else { Payload = File.ReadAll(); ZEN_ASSERT_SLOW(ValidateCompressedBuffer(RawHash, CompositeBuffer(SharedBuffer(Payload)))); } Payload.SetContentType(ZenContentType::kCompressedBinary); ReceivedBytes = Payload.GetSize(); return Payload; } return IoBuffer{}; } virtual std::vector> GetLargeBuildBlob(const Oid& BuildId, const IoHash& RawHash, uint64_t ChunkSize, std::function&& OnReceive, std::function&& OnComplete) override { ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob"); ZEN_UNUSED(BuildId); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockPath = GetBlobPayloadPath(RawHash); if (IsFile(BlockPath)) { struct WorkloadData { std::atomic BytesRemaining; BasicFile BlobFile; std::function OnReceive; std::function OnComplete; }; std::shared_ptr Workload(std::make_shared()); Workload->BlobFile.Open(BlockPath, BasicFile::Mode::kRead); const uint64_t BlobSize = Workload->BlobFile.FileSize(); Workload->OnReceive = std::move(OnReceive); Workload->OnComplete = std::move(OnComplete); Workload->BytesRemaining = BlobSize; std::vector> WorkItems; uint64_t Offset = 0; while (Offset < BlobSize) { uint64_t Size = Min(ChunkSize, BlobSize - Offset); WorkItems.push_back([this, BlockPath, Workload, Offset, Size]() { ZEN_TRACE_CPU("FileBuildStorage::GetLargeBuildBlob_Work"); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); IoBuffer PartPayload(Size); Workload->BlobFile.Read(PartPayload.GetMutableView().GetData(), Size, Offset); ReceivedBytes = PartPayload.GetSize(); Workload->OnReceive(Offset, PartPayload); uint64_t ByteRemaning = Workload->BytesRemaining.fetch_sub(Size); if (ByteRemaning == Size) { Workload->OnComplete(); } }); Offset += Size; } return WorkItems; } return {}; } virtual bool PutBlockMetadata(const Oid& BuildId, const IoHash& BlockRawHash, const CbObject& MetaData) override { ZEN_TRACE_CPU("FileBuildStorage::PutBlockMetadata"); ZEN_UNUSED(BuildId); uint64_t ReceivedBytes = 0; uint64_t SentBytes = MetaData.GetSize(); SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); const std::filesystem::path BlockMetaDataPath = GetBlobMetadataPath(BlockRawHash); CreateDirectories(BlockMetaDataPath.parent_path()); TemporaryFile::SafeWriteFile(BlockMetaDataPath, MetaData.GetView()); WriteAsJson(BlockMetaDataPath, MetaData); return true; } virtual CbObject FindBlocks(const Oid& BuildId, uint64_t MaxBlockCount) override { ZEN_TRACE_CPU("FileBuildStorage::FindBlocks"); ZEN_UNUSED(BuildId); uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); uint64_t FoundCount = 0; DirectoryContent Content; GetDirectoryContent(GetBlobsMetadataFolder(), DirectoryContentFlags::IncludeFiles, Content); CbObjectWriter Writer; Writer.BeginArray("blocks"); for (const std::filesystem::path& MetaDataFile : Content.Files) { IoHash ChunkHash; if (IoHash::TryParse(MetaDataFile.stem().string(), ChunkHash)) { std::filesystem::path BlockPath = GetBlobPayloadPath(ChunkHash); if (IsFile(BlockPath)) { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); Writer.AddObject(BlockObject); FoundCount++; if (FoundCount == MaxBlockCount) { break; } } } } Writer.EndArray(); // blocks Writer.Finalize(); ReceivedBytes = Writer.GetSaveSize(); return Writer.Save(); } virtual CbObject GetBlockMetadatas(const Oid& BuildId, std::span BlockHashes) override { ZEN_TRACE_CPU("FileBuildStorage::GetBlockMetadata"); ZEN_UNUSED(BuildId); uint64_t ReceivedBytes = 0; uint64_t SentBytes = sizeof(Oid) + sizeof(IoHash) * BlockHashes.size(); SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter Writer; Writer.BeginArray("blocks"); for (const IoHash& BlockHash : BlockHashes) { std::filesystem::path MetaDataFile = GetBlobMetadataPath(BlockHash); if (IsFile(MetaDataFile)) { IoBuffer BlockMetaDataPayload = ReadFile(MetaDataFile).Flatten(); CbObject BlockObject = CbObject(SharedBuffer(BlockMetaDataPayload)); Writer.AddObject(BlockObject); } } Writer.EndArray(); // blocks Writer.Finalize(); ReceivedBytes = Writer.GetSaveSize(); return Writer.Save(); } virtual void PutBuildPartStats(const Oid& BuildId, const Oid& BuildPartId, const tsl::robin_map& FloatStats) override { uint64_t ReceivedBytes = 0; uint64_t SentBytes = 0; SimulateLatency(SentBytes, 0); auto _ = MakeGuard([&]() { SimulateLatency(0, ReceivedBytes); }); Stopwatch ExecutionTimer; auto __ = MakeGuard([&]() { AddStatistic(ExecutionTimer, SentBytes, ReceivedBytes); }); CbObjectWriter Request; Request.BeginObject("floatStats"sv); for (auto It : FloatStats) { Request.AddFloat(It.first, It.second); } Request.EndObject(); Request.Finalize(); SentBytes = Request.GetSaveSize(); const std::filesystem::path BuildPartStatsDataPath = GetBuildPartStatsPath(BuildId, BuildPartId); CreateDirectories(BuildPartStatsDataPath.parent_path()); CbObject Payload = Request.Save(); TemporaryFile::SafeWriteFile(BuildPartStatsDataPath, Payload.GetView()); WriteAsJson(BuildPartStatsDataPath, Payload); } protected: std::filesystem::path GetBuildsFolder() const { return m_StoragePath / "builds"; } std::filesystem::path GetBlobsFolder() const { return m_StoragePath / "blobs"; } std::filesystem::path GetBlobsMetadataFolder() const { return m_StoragePath / "blocks"; } std::filesystem::path GetBuildFolder(const Oid& BuildId) const { return GetBuildsFolder() / BuildId.ToString(); } std::filesystem::path GetBuildPath(const Oid& BuildId) const { return GetBuildFolder(BuildId) / "metadata.cb"; } std::filesystem::path GetBuildPartFolder(const Oid& BuildId, const Oid& BuildPartId) const { return GetBuildFolder(BuildId) / "parts" / BuildPartId.ToString(); } std::filesystem::path GetBuildPartPath(const Oid& BuildId, const Oid& BuildPartId) const { return GetBuildPartFolder(BuildId, BuildPartId) / "metadata.cb"; } std::filesystem::path GetBuildPartStatsPath(const Oid& BuildId, const Oid& BuildPartId) const { return GetBuildPartFolder(BuildId, BuildPartId) / fmt::format("stats_{}.cb", Oid::NewOid()); } std::filesystem::path GetBlobPayloadPath(const IoHash& RawHash) const { return GetBlobsFolder() / fmt::format("{}.cbz", RawHash); } std::filesystem::path GetBlobMetadataPath(const IoHash& RawHash) const { return GetBlobsMetadataFolder() / fmt::format("{}.cb", RawHash); } void SimulateLatency(uint64_t ReceiveSize, uint64_t SendSize) { double SleepSec = m_LatencySec; if (m_DelayPerKBSec > 0.0) { SleepSec += m_DelayPerKBSec * (double(SendSize + ReceiveSize) / 1024u); } if (SleepSec > 0) { Sleep(int(SleepSec * 1000)); } } void WriteAsJson(const std::filesystem::path& OriginalPath, CbObjectView Data) const { if (m_EnableJsonOutput) { ExtendableStringBuilder<128> SB; CompactBinaryToJson(Data, SB); std::filesystem::path JsonPath = OriginalPath; JsonPath.replace_extension(".json"); std::string_view JsonMetaData = SB.ToView(); TemporaryFile::SafeWriteFile(JsonPath, MemoryView(JsonMetaData.data(), JsonMetaData.length())); } } void WriteBuild(const Oid& BuildId, CbObjectView Data) { const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); CreateDirectories(BuildDataPath.parent_path()); TemporaryFile::SafeWriteFile(BuildDataPath, Data.GetView()); WriteAsJson(BuildDataPath, Data); } CbObject ReadBuild(const Oid& BuildId) { const std::filesystem::path BuildDataPath = GetBuildPath(BuildId); FileContents Content = ReadFile(BuildDataPath); if (Content.ErrorCode) { throw std::runtime_error(fmt::format("Failed reading build '{}' from '{}': {} ({})", BuildId, BuildDataPath, Content.ErrorCode.message(), Content.ErrorCode.value())); } IoBuffer Payload = Content.Flatten(); ZEN_ASSERT(ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default) == CbValidateError::None); CbObject BuildObject = CbObject(SharedBuffer(Payload)); return BuildObject; } std::vector GetNeededAttachments(CbObjectView BuildPartObject) { std::vector NeededAttachments; BuildPartObject.IterateAttachments([&](CbFieldView FieldView) { const IoHash AttachmentHash = FieldView.AsBinaryAttachment(); const std::filesystem::path BlockPath = GetBlobPayloadPath(AttachmentHash); if (!IsFile(BlockPath)) { NeededAttachments.push_back(AttachmentHash); } }); return NeededAttachments; } bool ValidateCompressedBuffer(const IoHash& RawHash, const CompositeBuffer& Payload) { IoHash VerifyHash; uint64_t VerifySize; CompressedBuffer ValidateBuffer = CompressedBuffer::FromCompressed(Payload, VerifyHash, VerifySize); if (!ValidateBuffer) { return false; } if (VerifyHash != RawHash) { return false; } IoHashStream Hash; bool CouldDecompress = ValidateBuffer.DecompressToStream( 0, (uint64_t)-1, [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& RangeBuffer) { ZEN_UNUSED(SourceOffset, SourceSize, Offset); for (const SharedBuffer& Segment : RangeBuffer.GetSegments()) { Hash.Append(Segment.GetView()); } return true; }); if (!CouldDecompress) { return false; } if (Hash.GetHash() != VerifyHash) { return false; } return true; } private: void AddStatistic(Stopwatch& ExecutionTimer, uint64_t UploadedBytes, uint64_t DownloadedBytes) { const uint64_t ElapsedUs = ExecutionTimer.GetElapsedTimeUs(); m_Stats.TotalBytesWritten += UploadedBytes; m_Stats.TotalBytesRead += DownloadedBytes; m_Stats.TotalRequestTimeUs += ElapsedUs; m_Stats.TotalRequestCount++; SetAtomicMax(m_Stats.PeakSentBytes, UploadedBytes); SetAtomicMax(m_Stats.PeakReceivedBytes, DownloadedBytes); if (ElapsedUs > 0) { uint64_t BytesPerSec = ((UploadedBytes + DownloadedBytes) * 1000000u) / ElapsedUs; SetAtomicMax(m_Stats.PeakBytesPerSec, BytesPerSec); } } const std::filesystem::path m_StoragePath; BuildStorage::Statistics& m_Stats; const bool m_EnableJsonOutput = false; std::atomic m_WrittenBytes; const double m_LatencySec = 0.0; const double m_DelayPerKBSec = 0.0; }; std::unique_ptr CreateFileBuildStorage(const std::filesystem::path& StoragePath, BuildStorage::Statistics& Stats, bool EnableJsonOutput, double LatencySec, double DelayPerKBSec) { return std::make_unique(StoragePath, Stats, EnableJsonOutput, LatencySec, DelayPerKBSec); } } // namespace zen