// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include namespace zen { using namespace std::literals; class LocalExportProjectStore : public RemoteProjectStore { public: LocalExportProjectStore(LoggerRef InLog, std::string_view Name, std::string_view OptionalBaseName, const std::filesystem::path& FolderPath, bool ForceDisableBlocks, bool ForceEnableTempBlocks) : m_Log(InLog) , m_Name(Name) , m_OptionalBaseName(OptionalBaseName) , m_OutputPath(FolderPath) { if (ForceDisableBlocks) { m_EnableBlocks = false; } if (ForceEnableTempBlocks) { m_UseTempBlocks = true; } } virtual RemoteStoreInfo GetInfo() const override { return { .CreateBlocks = m_EnableBlocks, .UseTempBlockFiles = m_UseTempBlocks, .AllowChunking = true, .ContainerName = m_Name, .Description = fmt::format("[file] {}/{}{}{}"sv, m_OutputPath, m_Name, m_OptionalBaseName.empty() ? "" : " Base: ", m_OptionalBaseName)}; } virtual Stats GetStats() const override { return {.m_SentBytes = m_SentBytes.load(), .m_ReceivedBytes = m_ReceivedBytes.load(), .m_RequestTimeNS = m_RequestTimeNS.load(), .m_RequestCount = m_RequestCount.load(), .m_PeakSentBytes = m_PeakSentBytes.load(), .m_PeakReceivedBytes = m_PeakReceivedBytes.load(), .m_PeakBytesPerSec = m_PeakBytesPerSec.load()}; } virtual bool GetExtendedStats(ExtendedStats& OutStats) const override { ZEN_UNUSED(OutStats); return false; } virtual CreateContainerResult CreateContainer() override { // Nothing to do here return {}; } virtual SaveResult SaveContainer(const IoBuffer& Payload) override { Stopwatch Timer; SaveResult Result; { CbObject ContainerObject = LoadCompactBinaryObject(Payload); ContainerObject.IterateAttachments([&](CbFieldView FieldView) { IoHash AttachmentHash = FieldView.AsBinaryAttachment(); std::filesystem::path AttachmentPath = GetAttachmentPath(AttachmentHash); if (!IsFile(AttachmentPath)) { Result.Needs.insert(AttachmentHash); } }); } std::filesystem::path ContainerPath = m_OutputPath; ContainerPath.append(m_Name); try { CreateDirectories(m_OutputPath); BasicFile ContainerFile; ContainerFile.Open(ContainerPath, BasicFile::Mode::kTruncate); std::error_code Ec; ContainerFile.WriteAll(Payload, Ec); if (Ec) { throw std::system_error(Ec, Ec.message()); } Result.RawHash = IoHash::HashBuffer(Payload); } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving oplog container to '{}'. Reason: {}", ContainerPath, Ex.what()); } AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash, ChunkBlockDescription&&) override { Stopwatch Timer; SaveAttachmentResult Result; std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); if (!IsFile(ChunkPath)) { try { CreateDirectories(ChunkPath.parent_path()); BasicFile ChunkFile; ChunkFile.Open(ChunkPath, BasicFile::Mode::kTruncate); size_t Offset = 0; for (const SharedBuffer& Segment : Payload.GetSegments()) { ChunkFile.Write(Segment.GetView(), Offset); Offset += Segment.GetSize(); } } catch (const std::exception& Ex) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("Failed saving oplog attachment to '{}'. Reason: {}", ChunkPath, Ex.what()); } } AddStats(Payload.GetSize(), 0, Timer.GetElapsedTimeUs() * 1000); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual SaveAttachmentsResult SaveAttachments(const std::vector& Chunks) override { Stopwatch Timer; for (const SharedBuffer& Chunk : Chunks) { CompressedBuffer Compressed = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer()); SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash(), {}); if (ChunkResult.ErrorCode) { ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return SaveAttachmentsResult{ChunkResult}; } } SaveAttachmentsResult Result; Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual FinalizeResult FinalizeContainer(const IoHash&) override { return {}; } virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Name); } virtual GetKnownBlocksResult GetKnownBlocks() override { if (m_OptionalBaseName.empty()) { return GetKnownBlocksResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent)}}; } LoadContainerResult LoadResult = LoadContainer(m_OptionalBaseName); if (LoadResult.ErrorCode) { return GetKnownBlocksResult{LoadResult}; } Stopwatch Timer; std::vector BlockHashes = GetBlockHashesFromOplog(LoadResult.ContainerObject); if (BlockHashes.empty()) { return GetKnownBlocksResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; } std::vector ExistingBlockHashes; for (const IoHash& RawHash : BlockHashes) { std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); if (IsFile(ChunkPath)) { ExistingBlockHashes.push_back(RawHash); } } if (ExistingBlockHashes.empty()) { return GetKnownBlocksResult{{.ErrorCode = static_cast(HttpResponseCode::NoContent), .ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; } std::vector ThinKnownBlocks = GetBlocksFromOplog(LoadResult.ContainerObject, ExistingBlockHashes); const size_t KnowBlockCount = ThinKnownBlocks.size(); GetKnownBlocksResult Result{{.ElapsedSeconds = LoadResult.ElapsedSeconds + Timer.GetElapsedTimeUs() * 1000}}; Result.Blocks.resize(KnowBlockCount); for (size_t BlockIndex = 0; BlockIndex < KnowBlockCount; BlockIndex++) { Result.Blocks[BlockIndex].BlockHash = ThinKnownBlocks[BlockIndex].BlockHash; Result.Blocks[BlockIndex].ChunkRawHashes = std::move(ThinKnownBlocks[BlockIndex].ChunkRawHashes); } return Result; } virtual GetBlockDescriptionsResult GetBlockDescriptions(std::span BlockHashes) override { ZEN_UNUSED(BlockHashes); return GetBlockDescriptionsResult{Result{.ErrorCode = int(HttpResponseCode::NotFound)}}; } virtual AttachmentExistsInCacheResult AttachmentExistsInCache(std::span RawHashes) override { return AttachmentExistsInCacheResult{Result{.ErrorCode = 0}, std::vector(RawHashes.size(), false)}; } virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash, const AttachmentRange& Range) override { Stopwatch Timer; LoadAttachmentResult Result; std::filesystem::path ChunkPath = GetAttachmentPath(RawHash); if (!IsFile(ChunkPath)) { Result.ErrorCode = gsl::narrow(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog attachment from '{}'. Reason: 'The file does not exist'", ChunkPath.string()); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } { BasicFile ChunkFile; ChunkFile.Open(ChunkPath, BasicFile::Mode::kRead); if (Range) { Result.Bytes = ChunkFile.ReadRange(Range.Offset, Range.Bytes); } else { Result.Bytes = ChunkFile.ReadAll(); } } AddStats(0, Result.Bytes.GetSize(), Timer.GetElapsedTimeUs() * 1000); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } virtual LoadAttachmentsResult LoadAttachments(const std::vector& RawHashes) override { Stopwatch Timer; LoadAttachmentsResult Result; for (const IoHash& Hash : RawHashes) { LoadAttachmentResult ChunkResult = LoadAttachment(Hash, {}); if (ChunkResult.ErrorCode) { ChunkResult.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return LoadAttachmentsResult{ChunkResult}; } ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast(ChunkResult.ElapsedSeconds * 1000))); Result.Chunks.emplace_back( std::pair{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))}); } return Result; } virtual void Flush() override {} private: LoadContainerResult LoadContainer(const std::string& Name) { Stopwatch Timer; LoadContainerResult Result; std::filesystem::path SourcePath = m_OutputPath; SourcePath.append(Name); if (!IsFile(SourcePath)) { Result.ErrorCode = gsl::narrow(HttpResponseCode::NotFound); Result.Reason = fmt::format("Failed loading oplog container from '{}'. Reason: 'The file does not exist'", SourcePath.string()); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } IoBuffer ContainerPayload; { BasicFile ContainerFile; ContainerFile.Open(SourcePath, BasicFile::Mode::kRead); ContainerPayload = ContainerFile.ReadAll(); } AddStats(0, ContainerPayload.GetSize(), Timer.GetElapsedTimeUs() * 1000); CbValidateError ValidateResult = CbValidateError::None; if (Result.ContainerObject = ValidateAndReadCompactBinaryObject(std::move(ContainerPayload), ValidateResult); ValidateResult != CbValidateError::None || !Result.ContainerObject) { Result.ErrorCode = gsl::narrow(HttpResponseCode::InternalServerError); Result.Reason = fmt::format("The file {} is not formatted as a compact binary object ('{}')", SourcePath.string(), ToString(ValidateResult)); Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } Result.ElapsedSeconds = Timer.GetElapsedTimeMs() / 1000.0; return Result; } std::filesystem::path GetAttachmentPath(const IoHash& RawHash) const { ExtendablePathBuilder<128> ShardedPath; ShardedPath.Append(m_OutputPath.c_str()); ExtendableStringBuilder<64> HashString; RawHash.ToHexString(HashString); const char* str = HashString.c_str(); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str, str + 3); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str + 3, str + 5); ShardedPath.AppendSeparator(); ShardedPath.AppendAsciiRange(str + 5, str + 40); return ShardedPath.ToPath(); } void AddStats(uint64_t UploadedBytes, uint64_t DownloadedBytes, uint64_t ElapsedNS) { m_SentBytes.fetch_add(UploadedBytes); m_ReceivedBytes.fetch_add(DownloadedBytes); m_RequestTimeNS.fetch_add(ElapsedNS); SetAtomicMax(m_PeakSentBytes, UploadedBytes); SetAtomicMax(m_PeakReceivedBytes, DownloadedBytes); if (ElapsedNS > 0) { uint64_t BytesPerSec = (gsl::narrow(UploadedBytes + DownloadedBytes) * 1000000) / ElapsedNS; SetAtomicMax(m_PeakBytesPerSec, BytesPerSec); } m_RequestCount.fetch_add(1); } inline LoggerRef Log() const { return m_Log; } LoggerRef m_Log; const std::string m_Name; const std::string m_OptionalBaseName; const std::filesystem::path m_OutputPath; bool m_EnableBlocks = true; bool m_UseTempBlocks = false; std::atomic_uint64_t m_SentBytes = {}; std::atomic_uint64_t m_ReceivedBytes = {}; std::atomic_uint64_t m_RequestTimeNS = {}; std::atomic_uint64_t m_RequestCount = {}; std::atomic_uint64_t m_PeakSentBytes = {}; std::atomic_uint64_t m_PeakReceivedBytes = {}; std::atomic_uint64_t m_PeakBytesPerSec = {}; }; std::shared_ptr CreateFileRemoteStore(LoggerRef InLog, const FileRemoteStoreOptions& Options) { std::shared_ptr RemoteStore = std::make_shared(InLog, Options.Name, Options.OptionalBaseName, std::filesystem::path(Options.FolderPath), Options.ForceDisableBlocks, Options.ForceEnableTempBlocks); return RemoteStore; } } // namespace zen