diff options
Diffstat (limited to 'zen/cmds/exportproject.cpp')
| -rw-r--r-- | zen/cmds/exportproject.cpp | 376 |
1 files changed, 376 insertions, 0 deletions
diff --git a/zen/cmds/exportproject.cpp b/zen/cmds/exportproject.cpp new file mode 100644 index 000000000..5d8c7d536 --- /dev/null +++ b/zen/cmds/exportproject.cpp @@ -0,0 +1,376 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "exportproject.h" + +#include <zencore/compactbinarybuilder.h> +#include <zencore/compactbinaryvalue.h> +#include <zencore/compress.h> +#include <zencore/filesystem.h> +#include <zencore/fmtutils.h> +#include <zencore/logging.h> +#include <zencore/stream.h> +#include <zencore/uid.h> +#include <zencore/workthreadpool.h> +#include <zenhttp/httpcommon.h> +#include <zenhttp/httpshared.h> +#include <zenutil/basicfile.h> + +#include <memory> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <cpr/cpr.h> +#include <gsl/gsl-lite.hpp> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace { + +void +EnsureDirectoryExists(const std::filesystem::path& Path) +{ + while (!std::filesystem::is_directory(Path)) + { + if (Path.has_parent_path()) + { + EnsureDirectoryExists(Path.parent_path()); + } + std::filesystem::create_directory(Path); + } +} + +} // namespace + +ExportProjectCommand::ExportProjectCommand() +{ + m_Options.add_options()("h,help", "Print help"); + m_Options.add_option("", "u", "hosturl", "Host URL", cxxopts::value(m_HostName)->default_value("http://localhost:1337"), "<hosturl>"); + m_Options.add_option("", "t", "target", "Target path", cxxopts::value(m_TargetPath), "<targetpath>"); + m_Options.add_option("", "p", "project", "Project name", cxxopts::value(m_ProjectName), "<projectname>"); + m_Options.add_option("", "o", "oplog", "Oplog name", cxxopts::value(m_OplogNames), "<oplog>"); +} + +ExportProjectCommand::~ExportProjectCommand() = default; + +bool +ExportProjectCommand::IsSuccess(const cpr::Response& Response, const std::string_view Operation) +{ + if (!zen::IsHttpSuccessCode(Response.status_code)) + { + if (Response.status_code) + { + ZEN_ERROR("{} failed: {}: {} ({})", Operation, Response.status_code, Response.reason, Response.text); + } + else + { + ZEN_ERROR("{} failed: {}", Operation, Response.error.message); + } + + return false; + } + return true; +} + +int +ExportProjectCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) +{ + using namespace std::literals; + + ZEN_UNUSED(GlobalOptions); + + m_Options.parse_positional({"target", "project", "oplog"}); + m_Options.parse(argc, argv); + + if (m_ProjectName.empty()) + { + ZEN_ERROR("Project name must be given"); + return 1; + } + + if (m_TargetPath.empty()) + { + ZEN_ERROR("Target path must be given"); + return 1; + } + + if (!std::filesystem::exists(m_TargetPath)) + { + zen::CreateDirectories(m_TargetPath); + } + else if (!std::filesystem::is_directory(m_TargetPath)) + { + ZEN_ERROR("Target path '{}' is not a directory", m_TargetPath); + return 1; + } + + const std::string UrlBase = fmt::format("{}/prj", m_HostName); + + cpr::Session Session; + { + ZEN_INFO("Requesting project '{}' from '{}'", m_ProjectName, m_HostName); + + std::string ProjectRequest = fmt::format("{}/{}", UrlBase, m_ProjectName); + Session.SetUrl({ProjectRequest}); + cpr::Response Response = Session.Get(); + if (!IsSuccess(Response, ProjectRequest)) + { + return 1; + } + zen::IoBuffer Payload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::BasicFile ProjectStore; + ProjectStore.Open(GetProjectPath(m_TargetPath, m_ProjectName), zen::BasicFile::Mode::kTruncate); + ProjectStore.Write(Payload.GetView(), 0); + + if (m_OplogNames.empty()) + { + zen::CbObject Params = LoadCompactBinaryObject(Payload); + zen ::CbArrayView Oplogs = Params["oplogs"sv].AsArrayView(); + for (auto& OplogEntry : Oplogs) + { + std::string_view OpLog = OplogEntry.AsObjectView()["id"sv].AsString(); + m_OplogNames.push_back(std::string(OpLog)); + } + } + } + + std::unordered_set<zen::IoHash, zen::IoHash::Hasher> UniqueChunks; + std::vector<zen::CbAttachment> AllAttachments; + std::vector<zen::CbPackage> OplogResponses; + for (const std::string& OplogName : m_OplogNames) + { + ZEN_INFO("Requesting oplog '{}/{}' from '{}' to '{}'", m_ProjectName, OplogName, m_HostName, m_TargetPath); + + std::string GetOplogArchiveRequest = fmt::format("{}/{}/oplog/{}/archive", UrlBase, m_ProjectName, OplogName); + Session.SetUrl({GetOplogArchiveRequest}); + Session.SetHeader(cpr::Header{{"Accept", "application/x-ue-comp"}}); + cpr::Response Response = Session.Get(); + if (!IsSuccess(Response, GetOplogArchiveRequest)) + { + return 1; + } + zen::IoBuffer CompressedPayload(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); + zen::IoBuffer Payload = zen::CompressedBuffer::FromCompressed(zen::SharedBuffer(CompressedPayload)).Decompress().AsIoBuffer(); + + OplogResponses.emplace_back(zen::ParsePackageMessage(Payload)); + zen::CbPackage& ResponsePackage = OplogResponses.back(); + zen::CbObject Result = ResponsePackage.GetObject(); + + zen::IoHash Checksum = Result["checksum"sv].AsHash(); + zen ::CbArrayView Entries = Result["entries"sv].AsArrayView(); + + ZEN_INFO("Exporting {} ops for oplog '{}/{}' with checksum '{}' to '{}'", + Entries.Num(), + m_ProjectName, + OplogName, + Checksum, + m_TargetPath); + { + zen::BasicFile OpStore; + OpStore.Open(GetOplogPath(m_TargetPath, OplogName), zen::BasicFile::Mode::kTruncate); + OplogHeader Header = {.OpCount = Entries.Num(), .Checksum = Checksum}; + OpStore.Write(&Header, sizeof(OplogHeader), 0); + std::vector<OplogEntry> OpEntries; + OpEntries.resize(Entries.Num()); + const uint64_t DataOffset = sizeof(OplogHeader) + OpEntries.size() * sizeof(OplogEntry); + uint64_t BulkOffset = DataOffset; + + zen::IoHashStream Hasher; + + for (uint64_t OpIndex = 0; auto& OpEntry : Entries) + { + zen::BinaryWriter Writer; + OpEntry.CopyTo(Writer); + zen::MemoryView OpView = Writer.GetView(); + Hasher.Append(OpView); + + OpEntries[OpIndex].Offset = BulkOffset; + OpEntries[OpIndex].OpLength = gsl::narrow<uint32_t>(OpView.GetSize()); + OpStore.Write(OpView, BulkOffset); + BulkOffset += OpView.GetSize(); + OpIndex++; + } + zen::IoHash CalculatedChecksum = Hasher.GetHash(); + if (CalculatedChecksum != Checksum) + { + ZEN_ERROR("Checksum for oplog does not match. Expected '{}' but got '{}'", Checksum, CalculatedChecksum); + return 1; + } + OpStore.Write(OpEntries.data(), OpEntries.size() * sizeof(OplogEntry), sizeof(OplogHeader)); + } + + std::span<const zen::CbAttachment> Attachments = ResponsePackage.GetAttachments(); + AllAttachments.reserve(AllAttachments.size() + Attachments.size()); + AllAttachments.reserve(UniqueChunks.size() + Attachments.size()); + for (const zen::CbAttachment& Attachment : Attachments) + { + if (UniqueChunks.insert(Attachment.GetHash()).second) + { + AllAttachments.push_back(Attachment); + } + } + + ZEN_INFO("Exported {} ops referencing {} chunks for {}", Entries.Num(), Attachments.size(), OplogName); + } + + size_t ChunkCount = AllAttachments.size(); + zen::BasicFile ChunkStoreIndex; + ChunkStoreIndex.Open(GetChunksIndexPath(m_TargetPath), zen::BasicFile::Mode::kTruncate); + ChunksHeader Header = {.ChunkCount = ChunkCount}; + ChunkStoreIndex.Write(&Header, sizeof(ChunksHeader), 0); + std::vector<ChunkEntry> ChunkEntries; + ChunkEntries.resize(ChunkCount); + uint64_t ChunkOffset = 0; + + zen::WorkerThreadPool WorkerPool(std::thread::hardware_concurrency()); + std::atomic_int64_t JobCount = 0; + std::vector<size_t> BlockChunkIndexes; + const size_t BlockSize = 1ull << Header.BlockSizeShift; + uint32_t CurrentBlockIndex = 0; + + auto WriteBlockAsync = [](const std::string& TargetPath, + size_t WriteBlockOffset, + uint32_t BlockIndex, + const std::vector<size_t>& BlockChunkIndexes, + const std::vector<ChunkEntry>& ChunkEntries, + const std::vector<zen::CbAttachment>& Attachments, + zen::WorkerThreadPool& WorkerPool, + std::atomic_int64_t& JobCount) { + JobCount.fetch_add(1); + WorkerPool.ScheduleWork([&TargetPath, WriteBlockOffset, BlockIndex, BlockChunkIndexes, &ChunkEntries, &Attachments, &JobCount]() { + zen::BasicFile ChunkBlock; + ChunkBlock.Open(GetChunksPath(TargetPath, BlockIndex), zen::BasicFile::Mode::kTruncate); + for (size_t ChunkIndex : BlockChunkIndexes) + { + const ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; + zen::CompositeBuffer AttachmentBody = Attachments[ChunkIndex].AsCompressedBinary().GetCompressed(); + size_t AttachmentBulkOffset = Chunk.Offset - WriteBlockOffset; + for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) + { + size_t SegmentSize = Segment.GetSize(); + ChunkBlock.Write(Segment.GetData(), Segment.GetSize(), AttachmentBulkOffset); + AttachmentBulkOffset += SegmentSize; + } + } + JobCount.fetch_add(-1); + }); + }; + + ZEN_INFO("Exporting {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); + for (size_t ChunkIndex = 0; const zen::CbAttachment& Attachment : AllAttachments) + { + ChunkEntry& Chunk = ChunkEntries[ChunkIndex]; + Chunk.ChunkHash = Attachment.GetHash(); + zen::CompositeBuffer AttachmentBody = Attachment.AsCompressedBinary().GetCompressed(); + Chunk.Length = AttachmentBody.GetSize(); + + if (Chunk.Length < 1 * 1024 * 1024) // Use reasonable length for file + { + uint32_t BlockIndex = gsl::narrow<uint32_t>((ChunkOffset + Chunk.Length) / BlockSize); + if (BlockIndex != CurrentBlockIndex) + { + size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; + WriteBlockAsync(m_TargetPath, + WriteBlockOffset, + CurrentBlockIndex, + BlockChunkIndexes, + ChunkEntries, + AllAttachments, + WorkerPool, + JobCount); + + ChunkOffset = BlockIndex * BlockSize; + CurrentBlockIndex = BlockIndex; + BlockChunkIndexes.clear(); + } + + Chunk.Offset = ChunkOffset; + ChunkOffset = Chunk.Offset + Chunk.Length; + BlockChunkIndexes.push_back(ChunkIndex); + } + else + { + Chunk.Offset = ~0ull; + JobCount.fetch_add(1); + WorkerPool.ScheduleWork([this, AttachmentBody, &Chunk, &JobCount]() { + std::filesystem::path Path = GetLargeChunkPath(m_TargetPath, Chunk.ChunkHash); + EnsureDirectoryExists(Path.parent_path()); + zen::BasicFile ChunkFile; + ChunkFile.Open(Path, zen::BasicFile::Mode::kTruncate); + uint64_t Offset = 0; + for (const zen::SharedBuffer& Segment : AttachmentBody.GetSegments()) + { + size_t SegmentSize = Segment.GetSize(); + ChunkFile.Write(Segment.GetData(), Segment.GetSize(), Offset); + Offset += SegmentSize; + } + JobCount.fetch_add(-1); + }); + } + ChunkIndex++; + } + if (!BlockChunkIndexes.empty()) + { + size_t WriteBlockOffset = CurrentBlockIndex * BlockSize; + WriteBlockAsync(m_TargetPath, + WriteBlockOffset, + CurrentBlockIndex, + BlockChunkIndexes, + ChunkEntries, + AllAttachments, + WorkerPool, + JobCount); + } + + while (JobCount.load()) + { + zen::Sleep(1); + } + + ChunkStoreIndex.Write(ChunkEntries.data(), ChunkEntries.size() * sizeof(ChunkEntry), sizeof(ChunksHeader)); + + ZEN_INFO("Exported {} chunks from '{}' to '{}'", AllAttachments.size(), m_HostName, m_TargetPath); + + return 0; +} + +std::filesystem::path +ExportProjectCommand::GetOplogPath(const std::filesystem::path RootPath, const std::string& Oplog) +{ + return RootPath / (Oplog + ".ops"); +} + +std::filesystem::path +ExportProjectCommand::GetLargeChunkPath(const std::filesystem::path RootPath, const zen::IoHash& OpHash) +{ + zen::ExtendablePathBuilder<128> ShardedPath; + ShardedPath.Append(RootPath.c_str()); + zen::ExtendableStringBuilder<64> HashString; + OpHash.ToHexString(HashString); + const char* str = HashString.c_str(); + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str, str + 3); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 3, str + 5); + + ShardedPath.AppendSeparator(); + ShardedPath.AppendAsciiRange(str + 5, str + 40); + + return ShardedPath.ToPath(); +} + +std::filesystem::path +ExportProjectCommand::GetProjectPath(const std::filesystem::path RootPath, const std::string_view ProjectName) +{ + return RootPath / (std::string(ProjectName) + ".zcb"); +} + +std::filesystem::path +ExportProjectCommand::GetChunksIndexPath(const std::filesystem::path RootPath) +{ + return RootPath / "chunks.idx"; +} + +std::filesystem::path +ExportProjectCommand::GetChunksPath(const std::filesystem::path RootPath, uint32_t BlockIndex) +{ + return RootPath / fmt::format("chunks{}.bin", BlockIndex); +} |