// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include #include #include namespace zen { using namespace std::literals; class LocalExportProjectStore : public RemoteProjectStore { public: LocalExportProjectStore(LoggerRef InLog, std::string_view Name, std::string_view OptionalBaseName, const std::filesystem::path& FolderPath, bool ForceDisableBlocks, bool ForceEnableTempBlocks) : m_Log(InLog) , m_Name(Name) , m_OptionalBaseName(OptionalBaseName) , m_OutputPath(FolderPath) { if (ForceDisableBlocks) { m_EnableBlocks = false; } if (ForceEnableTempBlocks) { m_UseTempBlocks = true; } } virtual RemoteStoreInfo GetInfo() const override { return { .CreateBlocks = m_EnableBlocks, .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = m_Name, .Description = fmt::format("[file] {}/{}{}{}"sv, m_OutputPath, m_Name, m_OptionalBaseName.empty() ? "" : " Base: ", m_OptionalBaseName)}; } virtual Stats GetStats() const override { return {.m_SentBytes = m_SentBytes.load(), .m_ReceivedBytes = m_ReceivedBytes.load(), .m_RequestTimeNS = m_RequestTimeNS.load(), .m_RequestCount = m_RequestCount.load(), .m_PeakSentBytes = m_PeakSentBytes.load(), .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } virtual bool GetExtendedStats(ExtendedStats& OutStats) const override { ZEN_UNUSED(OutStats); return false; } virtual CreateContainerResult CreateContainer() override { // Nothing to do here return {}; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override { SaveResult Result; Stopwatch Timer; auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); { CbObject ContainerObject = LoadCompactBinaryObject(Payload); ContainerObject.IterateAttachments([&](CbFieldView FieldView) { IoHash AttachmentHash = FieldView.AsBinaryAttachment(); std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); if (!IsFile(AttachmentPath)) { Result.Needs.insert(AttachmentHash); } else if (std::filesystem::path AttachmentMetaPath = GetAttachmentMetaPath(AttachmentHash); IsFile(AttachmentMetaPath)) { BasicFile TouchIt(AttachmentMetaPath, BasicFile::Mode::kWrite); } }); } std::filesystem::path ContainerPath = m_OutputPath; ContainerPath.append(m_Name); try { CreateDirectories(m_OutputPath); BasicFile ContainerFile; ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate); std::error_code Ec; ContainerFile.WriteAll(Payload, Ec); if (Ec) { throw std::system_error(Ec, Ec.message()); } Result.RawHash = IoHash::HashBuffer(Payload); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving oplog container to '{}'. Reason: {}", ContainerPath, Ex.what()); } AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); return Result; } virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&& BlockDescription) override { SaveAttachmentResult Result; Stopwatch Timer; auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); if (!IsFile(ChunkPath)) { try { CreateDirectories(ChunkPath.parent_path()); BasicFile ChunkFile; ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate); size_t Offset = 0; for (const SharedBuffer& Segment : Payload.GetSegments()) { ChunkFile.Write(Segment.GetView(), Offset); Offset += Segment.GetSize(); } } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving oplog attachment to '{}'. Reason: {}", ChunkPath, Ex.what()); } } if (!Result.ErrorCode && BlockDescription.BlockHash != IoHash::Zero) { try { std::filesystem::path MetaPath = GetAttachmentMetaPath(RawHash); CbObject MetaData = BuildChunkBlockDescription(BlockDescription, {}); SharedBuffer MetaBuffer = MetaData.GetBuffer(); BasicFile MetaFile; MetaFile.Open(MetaPath, BasicFile::Mode::kTruncate); MetaFile.Write(MetaBuffer.GetView(), 0); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving block description to '{}'. Reason: {}", RawHash, Ex.what()); } } AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector& Chunks) override { SaveAttachmentsResult Result; Stopwatch Timer; auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); for (const SharedBuffer& Chunk : Chunks) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); if (ChunkResult.ErrorCode) { Result = SaveAttachmentsResult{ChunkResult}; break; } } return Result; } virtual FinalizeResult FinalizeContainer(const IoHash&) override { return {}; } virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Name); } virtual GetKnownBlocksResult GetKnownBlocks() override { Stopwatch Timer; if (m_OptionalBaseName.empty()) { size_t MaxBlockCount = 10000; GetKnownBlocksResult Result; DirectoryContent Content; GetDirectoryContent( m_OutputPath, DirectoryContentFlags::IncludeFiles | DirectoryContentFlags::Recursive | DirectoryContentFlags::IncludeModificationTick, Content); std::vector RecentOrder(Content.Files.size()); std::iota(RecentOrder.begin(), RecentOrder.end(), 0u); std::sort(RecentOrder.begin(), RecentOrder.end(), [&Content](size_t Lhs, size_t Rhs) { return Content.FileModificationTicks[Lhs] > Content.FileModificationTicks[Rhs]; }); for (size_t FileIndex : RecentOrder) { std::filesystem::path MetaPath = Content.Files[FileIndex]; if (MetaPath.extension() == MetaExtension) { IoBuffer MetaFile = ReadFile(MetaPath).Flatten(); CbValidateError Err; CbObject ValidatedObject = ValidateAndReadCompactBinaryObject(std::move(MetaFile), Err); if (Err == CbValidateError::None) { ChunkBlockDescription Description = ParseChunkBlockDescription(ValidatedObject); if (Description.BlockHash != IoHash::Zero) { Result.Blocks.emplace_back(std::move(Description)); if (Result.Blocks.size() == MaxBlockCount) { break; } } } } } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseName); if (LoadResult.ErrorCode) { return GetKnownBlocksResult{LoadResult}; } std::vector BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject); if (BlockHashes.empty()) { return GetKnownBlocksResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeMs() / 1000.0}}; } std::vector ExistingBlockHashes; for (const IoHash& RawHash : BlockHashes) { std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); if (IsFile(ChunkPath)) { ExistingBlockHashes.push_back(RawHash); } } if (ExistingBlockHashes.empty()) { return GetKnownBlocksResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeMs() / 1000.0}}; } std::vector ThinKnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); const size_t KnownBlockCount = ThinKnownBlocks.size(); GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeMs() / 1000.0}}; Result.Blocks.resize(KnownBlockCount); for (size_t BlockIndex = 0; BlockIndex < KnownBlockCount; BlockIndex++) { Result.Blocks[BlockIndex].BlockHash = ThinKnownBlocks[BlockIndex].BlockHash; Result.Blocks[BlockIndex].ChunkRawHashes = std::move(ThinKnownBlocks[BlockIndex].ChunkRawHashes); } return Result; } virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span BlockHashes, BuildStorageCache* OptionalCache, const Oid& CacheBuildId) override { GetBlockDescriptionsResult Result; Stopwatch Timer; auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); Result.Blocks.reserve(BlockHashes.size()); uint64_t ByteCount = 0; std::vector UnorderedList; { if (OptionalCache) { std::vector CacheBlockMetadatas = OptionalCache->GetBlobMetadatas(CacheBuildId, BlockHashes); for (const CbObject& BlockObject : CacheBlockMetadatas) { ByteCount += BlockObject.GetSize(); } UnorderedList = ParseBlockMetadatas(CacheBlockMetadatas); } tsl::robin_map BlockDescriptionLookup; BlockDescriptionLookup.reserve(BlockHashes.size()); for (size_t DescriptionIndex = 0; DescriptionIndex < UnorderedList.size(); DescriptionIndex++) { const ChunkBlockDescription& Description = UnorderedList[DescriptionIndex]; BlockDescriptionLookup.insert_or_assign(Description.BlockHash, DescriptionIndex); } if (UnorderedList.size() < BlockHashes.size()) { for (const IoHash& RawHash : BlockHashes) { if (!BlockDescriptionLookup.contains(RawHash)) { std::filesystem::path MetaPath = GetAttachmentMetaPath(RawHash); IoBuffer MetaFile = ReadFile(MetaPath).Flatten(); ByteCount += MetaFile.GetSize(); CbValidateError Err; CbObject ValidatedObject = ValidateAndReadCompactBinaryObject(std::move(MetaFile), Err); if (Err == CbValidateError::None) { ChunkBlockDescription Description = ParseChunkBlockDescription(ValidatedObject); if (Description.BlockHash != IoHash::Zero) { BlockDescriptionLookup.insert_or_assign(Description.BlockHash, UnorderedList.size()); UnorderedList.emplace_back(std::move(Description)); } } } } } Result.Blocks.reserve(UnorderedList.size()); for (const IoHash& RawHash : BlockHashes) { if (auto It = BlockDescriptionLookup.find(RawHash); It != BlockDescriptionLookup.end()) { Result.Blocks.emplace_back(std::move(UnorderedList[It->second])); } } } AddStats(0, ByteCount, Timer.GetElapsedTimeUs() * 1000); return Result; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override { LoadAttachmentResult Result; Stopwatch Timer; auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); if (!IsFile(ChunkPath)) { Result.ErrorCode = gsl::narrow(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); return Result; } { BasicFile ChunkFile; ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); Result.Bytes = ChunkFile.ReadAll(); } AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000); return Result; } virtual LoadAttachmentRangesResult LoadAttachmentRanges(const IoHash& RawHash, std::span> Ranges) override { ZEN_ASSERT(!Ranges.empty()); LoadAttachmentRangesResult Result; Stopwatch Timer; auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); if (!IsFile(ChunkPath)) { Result.ErrorCode = gsl::narrow(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); return Result; } { uint64_t Start = Ranges.front().first; uint64_t Length = Ranges.back().first + Ranges.back().second - Ranges.front().first; Result.Bytes = IoBufferBuilder::MakeFromFile(ChunkPath, Start, Length); Result.Ranges.reserve(Ranges.size()); for (const std::pair& Range : Ranges) { Result.Ranges.push_back(std::make_pair(Range.first - Start, Range.second)); } } AddStats(0, std::accumulate(Result.Ranges.begin(), Result.Ranges.end(), uint64_t(0), [](uint64_t Current, const std::pair& Value) { return Current + Value.second; }), Timer.GetElapsedTimeUs() * 1000); return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector& RawHashes) override { Stopwatch Timer; LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { LoadAttachmentResult ChunkResult = LoadAttachment(Hash); if (ChunkResult.ErrorCode) { ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return LoadAttachmentsResult{ChunkResult}; } ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast(ChunkResult.ElapsedSeconds * 1000))); Result.Chunks.emplace_back( std::pair{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); } return Result; } private: LoadContainerResult LoadContainer(const std::string& Name) { LoadContainerResult Result; Stopwatch Timer; auto _ = MakeGuard([&Result, &Timer]() { Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; }); std::filesystem::path SourcePath = m_OutputPath; SourcePath.append(Name); if (!IsFile(SourcePath)) { Result.ErrorCode = gsl::narrow(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog container from '{}'. Reason: 'The file does not exist'", SourcePath.string()); return Result; } IoBuffer ContainerPayload; { BasicFile ContainerFile; ContainerFile.Open(SourcePath, BasicFile::Mode::kRead); ContainerPayload = ContainerFile.ReadAll(); } AddStats(0, ContainerPayload.GetSize(), Timer.GetElapsedTimeUs() * 1000); CbValidateError ValidateResult = CbValidateError::None; if (Result.ContainerObject = ValidateAndReadCompactBinaryObject(std::move(ContainerPayload), ValidateResult); ValidateResult != CbValidateError::None || !Result.ContainerObject) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The file {} is not formatted as a compact binary object ('{}')", SourcePath.string(), ToString(ValidateResult)); return Result; } return Result; } std::filesystem::path GetAttachmentBasePath(const IoHash& RawHash) const { ExtendablePathBuilder<128> ShardedPath; ShardedPath.Append(m_OutputPath.c_str()); ExtendableStringBuilder<64> HashString; RawHash.ToHexString(HashString); const char* str = HashString.c_str(); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str, str + 3); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str + 3, str + 5); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str + 5, str + 40); return ShardedPath.ToPath(); } static constexpr std::string_view BlobExtension = ".blob"; static constexpr std::string_view MetaExtension = ".meta"; std::filesystem::path GetAttachmentPath(const IoHash& RawHash) { return GetAttachmentBasePath(RawHash).replace_extension(BlobExtension); } std::filesystem::path GetAttachmentMetaPath(const IoHash& RawHash) { return GetAttachmentBasePath(RawHash).replace_extension(MetaExtension); } void AddStats(uint64_t UploadedBytes, uint64_t DownloadedBytes, uint64_t ElapsedNS) { m_SentBytes.fetch_add(UploadedBytes); m_ReceivedBytes.fetch_add(DownloadedBytes); m_RequestTimeNS.fetch_add(ElapsedNS); SetAtomicMax(m_PeakSentBytes, UploadedBytes); SetAtomicMax(m_PeakReceivedBytes, DownloadedBytes); if (ElapsedNS > 0) { uint64_t BytesPerSec = (gsl::narrow(UploadedBytes + DownloadedBytes) * 1000000) / ElapsedNS; SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); } m_RequestCount.fetch_add(1); } inline LoggerRef Log() const { return m_Log; } LoggerRef m_Log; const std::string m_Name; const std::string m_OptionalBaseName; const std::filesystem::path m_OutputPath; bool m_EnableBlocks = true; bool m_UseTempBlocks = false; std::atomic_uint64_t m_SentBytes = {}; std::atomic_uint64_t m_ReceivedBytes = {}; std::atomic_uint64_t m_RequestTimeNS = {}; std::atomic_uint64_t m_RequestCount = {}; std::atomic_uint64_t m_PeakSentBytes = {}; std::atomic_uint64_t m_PeakReceivedBytes = {}; std::atomic_uint64_t m_PeakBytesPerSec = {}; }; std::shared_ptr CreateFileRemoteStore(LoggerRef InLog, const FileRemoteStoreOptions& Options) { std::shared_ptr RemoteStore = std::make_shared(InLog, Options.Name, Options.OptionalBaseName, std::filesystem::path(Options.FolderPath), Options.ForceDisableBlocks, Options.ForceEnableTempBlocks); return RemoteStore; } } // namespace zen