diff options
| author | Dan Engelbrecht <[email protected]> | 2025-11-07 12:27:44 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-11-07 12:27:44 +0100 |
| commit | 72b1797e2b65ad47f4dc8e9fab73b9aa170889b4 (patch) | |
| tree | 93c9f7c99965393ba9c6ec86c01b63899bfba684 /src/zenremotestore/projectstore/projectstoreoperations.cpp | |
| parent | move progress bar to separate file (#638) (diff) | |
| download | zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.tar.xz zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.zip | |
get oplog attachments (#622)
* add support for downloading individual attachments from an oplog
Diffstat (limited to 'src/zenremotestore/projectstore/projectstoreoperations.cpp')
| -rw-r--r-- | src/zenremotestore/projectstore/projectstoreoperations.cpp | 917 |
1 files changed, 917 insertions, 0 deletions
diff --git a/src/zenremotestore/projectstore/projectstoreoperations.cpp b/src/zenremotestore/projectstore/projectstoreoperations.cpp new file mode 100644 index 000000000..7dd85531c --- /dev/null +++ b/src/zenremotestore/projectstore/projectstoreoperations.cpp @@ -0,0 +1,917 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/projectstore/projectstoreoperations.h> + +#include <zencore/compactbinaryutil.h> +#include <zencore/parallelwork.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zenremotestore/builds/buildstorageutil.h> +#include <zenremotestore/chunking/chunkedfile.h> +#include <zenremotestore/operationlogoutput.h> +#include <zenremotestore/projectstore/remoteprojectstore.h> + +namespace zen { + +using namespace std::literals; + +//////////////////////////// ProjectStoreOperationOplogState + +ProjectStoreOperationOplogState::ProjectStoreOperationOplogState(OperationLogOutput& OperationLogOutput, + StorageInstance& Storage, + const Oid& BuildId, + const Options& Options) +: m_LogOutput(OperationLogOutput) +, m_Storage(Storage) +, m_BuildId(BuildId) +, m_Options(Options) +{ +} + +CbObjectView +ProjectStoreOperationOplogState::LoadBuildObject() +{ + if (!m_BuildObject) + { + const std::filesystem::path CachedBuildObjectPath = m_Options.TempFolderPath / "build.cbo"; + if (!m_Options.ForceDownload && IsFile(CachedBuildObjectPath)) + { + Stopwatch Timer; + CbValidateError Error; + m_BuildObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildObjectPath), Error); + if (Error != CbValidateError::None) + { + RemoveFile(CachedBuildObjectPath); + m_BuildObject = {}; + } + else + { + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Read build {} from locally cached file in {}", + m_BuildId, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + return m_BuildObject; + } + } + + Stopwatch Timer; + m_BuildObject = m_Storage.BuildStorage->GetBuild(m_BuildId); + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Fetched build {} from {} in {}", + m_BuildId, + m_Storage.BuildStorageHttp->GetBaseUri(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + CreateDirectories(CachedBuildObjectPath.parent_path()); + TemporaryFile::SafeWriteFile(CachedBuildObjectPath, m_BuildObject.GetBuffer().GetView()); + } + return m_BuildObject; +} + +const Oid& +ProjectStoreOperationOplogState::GetBuildId() +{ + return m_BuildId; +} + +const Oid& +ProjectStoreOperationOplogState::GetBuildPartId() +{ + if (m_BuildPartId == Oid::Zero) + { + CbObjectView BuildObject = LoadBuildObject(); + CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); + if (!PartsObject) + { + throw std::runtime_error(fmt::format("The build {} payload does not contain a 'parts' object"sv, m_BuildId)); + } + static const std::string_view OplogContainerPartName = "oplogcontainer"sv; + m_BuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); + if (m_BuildPartId == Oid::Zero) + { + throw std::runtime_error( + fmt::format("The build {} payload 'parts' object does not contain a '{}' entry"sv, m_BuildId, OplogContainerPartName)); + } + } + return m_BuildPartId; +} + +CbObjectView +ProjectStoreOperationOplogState::LoadBuildPartsObject() +{ + if (!m_BuildPartsObject) + { + const Oid BuildPartId = GetBuildPartId(); + const std::filesystem::path CachedBuildPartObjectPath = m_Options.TempFolderPath / fmt::format("{}_part.cbo", BuildPartId); + if (!m_Options.ForceDownload && IsFile(CachedBuildPartObjectPath)) + { + Stopwatch Timer; + CbValidateError Error; + m_BuildPartsObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildPartObjectPath), Error); + if (Error != CbValidateError::None) + { + RemoveFile(CachedBuildPartObjectPath); + m_BuildPartsObject = {}; + } + else + { + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Read build part {}/{} from locally cached file in {}", + m_BuildId, + BuildPartId, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + return m_BuildPartsObject; + } + } + + Stopwatch Timer; + m_BuildPartsObject = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId); + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Fetched build part {}/{} from {} in {}", + m_BuildId, + BuildPartId, + m_Storage.BuildStorageHttp->GetBaseUri(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + CreateDirectories(CachedBuildPartObjectPath.parent_path()); + TemporaryFile::SafeWriteFile(CachedBuildPartObjectPath, m_BuildPartsObject.GetBuffer().GetView()); + } + return m_BuildPartsObject; +} + +CbObjectView +ProjectStoreOperationOplogState::LoadOpsSectionObject() +{ + if (!m_OpsSectionObject) + { + const Oid BuildPartId = GetBuildPartId(); + const std::filesystem::path CachedOpsSectionPath = m_Options.TempFolderPath / fmt::format("{}_part_ops.cbo", BuildPartId); + if (!m_Options.ForceDownload && IsFile(CachedOpsSectionPath)) + { + Stopwatch Timer; + CbValidateError Error; + m_OpsSectionObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedOpsSectionPath), Error); + if (Error != CbValidateError::None) + { + RemoveFile(CachedOpsSectionPath); + m_OpsSectionObject = {}; + } + else if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Read {}/{}/ops from locally cached file in {}", + BuildPartId, + m_BuildId, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + return m_OpsSectionObject; + } + } + CbObjectView ContainerObject = LoadBuildPartsObject(); + + Stopwatch Timer; + + MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); + IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); + IoBuffer OpsSectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); + + CbValidateError ValidateResult = CbValidateError::None; + m_OpsSectionObject = ValidateAndReadCompactBinaryObject(std::move(OpsSectionPayload), ValidateResult); + if (ValidateResult != CbValidateError::None) + { + throw std::runtime_error( + fmt::format("Failed to parse oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult))); + } + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Decompressed and validated oplog payload {} -> {} in {}", + NiceBytes(OpsSection.GetSize()), + NiceBytes(m_OpsSectionObject.GetSize()), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + if (m_OpsSectionObject) + { + CreateDirectories(CachedOpsSectionPath.parent_path()); + TemporaryFile::SafeWriteFile(CachedOpsSectionPath, m_OpsSectionObject.GetBuffer().GetView()); + } + } + return m_OpsSectionObject; +} + +CbArray +ProjectStoreOperationOplogState::LoadArrayFromBuildPart(std::string_view ArrayName) +{ + const Oid BuildPartId = GetBuildPartId(); + const std::filesystem::path CachedPartArrayPath = m_Options.TempFolderPath / fmt::format("{}_part_{}.cba", BuildPartId, ArrayName); + if (!m_Options.ForceDownload && IsFile(CachedPartArrayPath)) + { + Stopwatch Timer; + IoBuffer Payload = IoBufferBuilder::MakeFromFile(CachedPartArrayPath); + CbValidateError Error = ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default); + if (Error != CbValidateError::None) + { + RemoveFile(CachedPartArrayPath); + } + else + { + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Read {}/{}/{} from locally cached file in {}", + BuildPartId, + m_BuildId, + ArrayName, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + CbArray Result = CbArray(SharedBuffer(std::move(Payload))); + return Result; + } + } + + CbObjectView ContainerObject = LoadBuildPartsObject(); + + Stopwatch Timer; + + CbArrayView ArrayView = ContainerObject[ArrayName].AsArrayView(); + + CbArray Result; + { + CbWriter Writer; + Writer << ArrayView; + SharedBuffer ArrayBuffer = SharedBuffer::MakeView(Writer.Save().GetView()).MakeOwned(); + Result = CbArray(std::move(ArrayBuffer)); + } + + CreateDirectories(CachedPartArrayPath.parent_path()); + TemporaryFile::SafeWriteFile(CachedPartArrayPath, Result.GetBuffer().GetView()); + + return Result; +} + +CbArrayView +ProjectStoreOperationOplogState::LoadBlocksArray() +{ + if (!m_BlocksArray) + { + m_BlocksArray = LoadArrayFromBuildPart("blocks"sv); + } + return m_BlocksArray; +} + +CbArrayView +ProjectStoreOperationOplogState::LoadChunkedFilesArray() +{ + if (!m_ChunkedFilesArray) + { + m_ChunkedFilesArray = LoadArrayFromBuildPart("chunkedfiles"sv); + } + return m_ChunkedFilesArray; +} + +CbArrayView +ProjectStoreOperationOplogState::LoadChunksArray() +{ + if (!m_ChunksArray) + { + m_ChunksArray = LoadArrayFromBuildPart("chunks"sv); + } + return m_ChunksArray; +} + +//////////////////////////// ProjectStoreOperationDownloadAttachments + +ProjectStoreOperationDownloadAttachments::ProjectStoreOperationDownloadAttachments(OperationLogOutput& OperationLogOutput, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + ProjectStoreOperationOplogState& State, + std::span<const IoHash> AttachmentHashes, + const Options& Options) +: m_LogOutput(OperationLogOutput) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_IOWorkerPool(IOWorkerPool) +, m_NetworkPool(NetworkPool) +, m_State(State) +, m_AttachmentHashes(AttachmentHashes.begin(), AttachmentHashes.end()) +, m_Options(Options) +{ +} + +void +ProjectStoreOperationDownloadAttachments::Execute() +{ + enum class TaskSteps : uint32_t + { + ReadAttachmentData, + Download, + AnalyzeDechunk, + Dechunk, + Cleanup, + StepCount + }; + + auto EndProgress = + MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); + + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ReadAttachmentData, (uint32_t)TaskSteps::StepCount); + + Stopwatch Timer; + tsl::robin_map<IoHash, uint64_t, IoHash::Hasher> ChunkSizes; + + std::vector<ChunkedInfo> ChunkedFileInfos; + tsl::robin_map<IoHash, size_t, IoHash::Hasher> FilesToDechunk; + tsl::robin_set<IoHash, IoHash::Hasher> ChunkedFileRawHashes; + { + CbArrayView ChunkedFilesArray = m_State.LoadChunkedFilesArray(); + FilesToDechunk.reserve(ChunkedFilesArray.Num()); + for (CbFieldView ChunkedFileView : ChunkedFilesArray) + { + CbObjectView ChunkedFile = ChunkedFileView.AsObjectView(); + IoHash ChunkedRawHash = ChunkedFile["rawhash"sv].AsHash(); + if (m_AttachmentHashes.contains(ChunkedRawHash)) + { + ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFile); + ChunkSizes.insert_or_assign(Chunked.RawHash, Chunked.RawSize); + ChunkedFileRawHashes.reserve(ChunkedFileRawHashes.size() + Chunked.ChunkHashes.size()); + for (const IoHash& ChunkHash : Chunked.ChunkHashes) + { + ChunkedFileRawHashes.insert(ChunkHash); + } + FilesToDechunk.insert_or_assign(Chunked.RawHash, ChunkedFileInfos.size()); + ChunkedFileInfos.emplace_back(std::move(Chunked)); + } + } + } + + tsl::robin_set<IoHash, IoHash::Hasher> LooseChunksToDownload; + { + CbArrayView ChunksArray = m_State.LoadChunksArray(); + LooseChunksToDownload.reserve(ChunksArray.Num()); + for (CbFieldView Chunk : ChunksArray) + { + const IoHash ChunkHash = Chunk.AsAttachment(); + if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash)) + { + LooseChunksToDownload.insert(ChunkHash); + } + } + } + + tsl::robin_set<IoHash, IoHash::Hasher> BlocksToDownload; + tsl::robin_map<IoHash, IoHash, IoHash::Hasher> ChunkToBlock; + + { + CbArrayView BlocksArray = m_State.LoadBlocksArray(); + + for (CbFieldView BlockView : BlocksArray) + { + CbObjectView Block = BlockView.AsObjectView(); + IoHash BlockHash = Block["rawhash"sv].AsBinaryAttachment(); + if (BlockHash == IoHash::Zero) + { + CbArrayView ChunksArray = Block["chunks"sv].AsArrayView(); + for (CbFieldView ChunkHashView : ChunksArray) + { + const IoHash ChunkHash = ChunkHashView.AsAttachment(); + if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash)) + { + LooseChunksToDownload.insert(ChunkHash); + } + } + } + else + { + CbArrayView ChunksArray = Block["chunks"sv].AsArrayView(); + for (CbFieldView ChunkHashView : ChunksArray) + { + const IoHash ChunkHash = ChunkHashView.AsHash(); + if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash)) + { + ChunkToBlock.insert_or_assign(ChunkHash, BlockHash); + BlocksToDownload.insert(BlockHash); + } + } + } + } + } + + if (!m_Options.IsQuiet) + { + std::string DechunkInfo = + FilesToDechunk.size() > 0 + ? fmt::format("\n{} file{} needs to be dechunked", FilesToDechunk.size(), FilesToDechunk.size() == 1 ? "" : "s") + : ""; + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Need to download {} block{} and {} chunk{}{}", + BlocksToDownload.size(), + BlocksToDownload.size() == 1 ? "" : "s", + LooseChunksToDownload.size(), + LooseChunksToDownload.size() == 1 ? "" : "s", + DechunkInfo); + } + + auto GetBuildBlob = [this](const IoHash& RawHash, const std::filesystem::path& OutputPath) { + IoBuffer Payload; + if (m_Storage.BuildCacheStorage) + { + Payload = m_Storage.BuildCacheStorage->GetBuildBlob(m_State.GetBuildId(), RawHash); + } + if (!Payload) + { + Payload = m_Storage.BuildStorage->GetBuildBlob(m_State.GetBuildId(), RawHash); + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_State.GetBuildId(), + RawHash, + Payload.GetContentType(), + CompositeBuffer(SharedBuffer(Payload))); + } + } + uint64_t PayloadSize = Payload.GetSize(); + IoBufferFileReference FileRef; + if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == PayloadSize)) + { + std::error_code Ec; + std::filesystem::path TempPayloadPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + Payload.SetDeleteOnClose(false); + Payload = {}; + RenameFile(TempPayloadPath, OutputPath, Ec); + if (Ec) + { + // Re-open the temp file again + BasicFile OpenTemp(TempPayloadPath, BasicFile::Mode::kDelete); + Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, PayloadSize, true); + Payload.SetDeleteOnClose(true); + } + } + } + if (Payload) + { + TemporaryFile::SafeWriteFile(OutputPath, Payload.GetView()); + } + }; + + std::filesystem::path TempAttachmentPath = MakeSafeAbsolutePath(m_Options.AttachmentOutputPath) / ".tmp"; + CreateDirectories(TempAttachmentPath); + auto _0 = MakeGuard([this, &TempAttachmentPath]() { + if (true) + { + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, "Cleaning up temporary directory"); + } + CleanDirectory(TempAttachmentPath, true); + RemoveDir(TempAttachmentPath); + } + }); + + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Download, (uint32_t)TaskSteps::StepCount); + + std::filesystem::path BlocksPath = TempAttachmentPath / "blocks"; + CreateDirectories(BlocksPath); + + { + Stopwatch DownloadTimer; + + std::filesystem::path LooseChunksPath = TempAttachmentPath / "loosechunks"; + CreateDirectories(LooseChunksPath); + + std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading")); + OperationLogOutput::ProgressBar& DownloadProgressBar(*ProgressBarPtr); + + std::atomic<bool> PauseFlag; + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<size_t> LooseChunksCompleted; + std::atomic<size_t> BlocksCompleted; + + std::vector<IoHash> LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end()); + + for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++) + { + Work.ScheduleWork(m_NetworkPool, [&, LooseChunkIndex](std::atomic<bool>&) { + const IoHash RawHash = LooseChunks[LooseChunkIndex]; + std::filesystem::path LooseChunkOutputPath = LooseChunksPath / fmt::format("{}.ucb", RawHash); + if (m_Options.ForceDownload || !IsFile(LooseChunkOutputPath)) + { + GetBuildBlob(RawHash, LooseChunkOutputPath); + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded chunk {}", RawHash); + } + + Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex, LooseChunkOutputPath](std::atomic<bool>&) { + const IoHash RawHash = LooseChunks[LooseChunkIndex]; + IoHash ChunkRawHash; + uint64_t ChunkRawSize; + CompressedBuffer CompressedChunk = + CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(LooseChunkOutputPath)), + ChunkRawHash, + ChunkRawSize); + if (!CompressedChunk) + { + throw std::runtime_error(fmt::format("Downloaded chunk {} is malformed", RawHash)); + } + if (ChunkRawHash != RawHash) + { + throw std::runtime_error(fmt::format("Downloaded chunk {} mismatches content hash {}", RawHash, ChunkRawHash)); + } + + std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash); + if (m_Options.DecompressAttachments) + { + BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate); + if (!CompressedChunk.DecompressToStream( + 0, + ChunkRawSize, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) { + ZEN_UNUSED(SourceOffset); + ZEN_UNUSED(SourceSize); + ChunkOutput.Write(Range, Offset); + return true; + })) + { + ChunkOutput.Close(); + RemoveFile(ChunkOutputPath); + throw std::runtime_error(fmt::format("Failed to decompress chunk {} to ", RawHash, ChunkOutputPath)); + } + } + else + { + TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed()); + } + + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote loose chunk {} to '{}'", RawHash, ChunkOutputPath); + LooseChunksCompleted++; + }); + }); + } + + std::vector<IoHash> Blocks(BlocksToDownload.begin(), BlocksToDownload.end()); + + RwLock ChunkSizesLock; + for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++) + { + Work.ScheduleWork(m_NetworkPool, [&, BlockIndex](std::atomic<bool>&) { + const IoHash& RawHash = Blocks[BlockIndex]; + std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", RawHash); + if (m_Options.ForceDownload || !IsFile(BlockOutputPath)) + { + GetBuildBlob(RawHash, BlockOutputPath); + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded block {}", RawHash); + } + + Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex, BlockOutputPath](std::atomic<bool>&) { + const IoHash& BlockRawHash = Blocks[BlockIndex]; + + SharedBuffer BlockBuffer = + CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress(); + + uint64_t HeaderSize = 0; + if (IterateChunkBlock( + SharedBuffer(BlockBuffer), + [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) { + if (m_AttachmentHashes.contains(ChunkHash)) + { + std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkHash); + + if (m_Options.DecompressAttachments) + { + BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate); + if (!CompressedChunk.DecompressToStream(0u, + ~uint64_t(0), + [&](uint64_t SourceOffset, + uint64_t SourceSize, + uint64_t Offset, + const CompositeBuffer& Range) { + ZEN_UNUSED(SourceOffset); + ZEN_UNUSED(SourceSize); + ChunkOutput.Write(Range, Offset); + return true; + })) + { + ChunkOutput.Close(); + RemoveFile(ChunkOutputPath); + throw std::runtime_error( + fmt::format("Failed to decompress chunk {} to ", ChunkHash, ChunkOutputPath)); + } + } + else + { + TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed()); + } + + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote block chunk {} to '{}'", ChunkHash, ChunkOutputPath); + } + if (ChunkedFileRawHashes.contains(ChunkHash)) + { + uint64_t RawSize = CompressedChunk.DecodeRawSize(); + ChunkSizesLock.WithExclusiveLock([&]() { ChunkSizes.insert_or_assign(ChunkHash, RawSize); }); + } + }, + HeaderSize)) + { + } + else + { + throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash)); + } + BlocksCompleted++; + }); + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused, PendingWork); + + std::string Details = fmt::format("{}/{} blocks, {}/{} chunks downloaded", + BlocksCompleted.load(), + BlocksToDownload.size(), + LooseChunksCompleted.load(), + LooseChunksToDownload.size()); + DownloadProgressBar.UpdateState({.Task = "Downloading", + .Details = Details, + .TotalCount = BlocksToDownload.size() + LooseChunksToDownload.size(), + .RemainingCount = BlocksToDownload.size() + LooseChunksToDownload.size() - + (BlocksCompleted.load() + LooseChunksCompleted.load()), + .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + DownloadProgressBar.Finish(); + + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "{} block{} downloaded, {} loose chunk{} downloaded in {}", + BlocksToDownload.size(), + BlocksToDownload.size() == 1 ? "" : "s", + LooseChunksToDownload.size(), + LooseChunksToDownload.size() == 1 ? "" : "s", + NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); + } + } + + if (!ChunkedFileInfos.empty()) + { + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::AnalyzeDechunk, (uint32_t)TaskSteps::StepCount); + + std::filesystem::path ChunkedFilesPath = TempAttachmentPath / "chunkedfiles"; + CreateDirectories(ChunkedFilesPath); + + try + { + std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Dechunking")); + OperationLogOutput::ProgressBar& DechunkingProgressBar(*ProgressBarPtr); + + std::atomic<uint64_t> ChunksWritten; + + std::vector<std::unique_ptr<BasicFile>> OpenChunkedFiles; + struct ChunkOpenFileTarget + { + size_t OpenChunkedFileIndex = 0; + uint64_t Offset = 0; + }; + + std::vector<ChunkOpenFileTarget> ChunkOpenFileTargets; + + std::vector<eastl::fixed_vector<size_t, 1>> Targets; + tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkedFileTargetLookup; + + // Build up file and offset targets for each chunk + for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++) + { + const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex]; + + const size_t OpenChunkedFileIndex = OpenChunkedFiles.size(); + + uint64_t Offset = 0; + for (uint32_t ChunkIndex : ChunkedFileInfo.ChunkSequence) + { + const IoHash& ChunkHash = ChunkedFileInfo.ChunkHashes[ChunkIndex]; + auto ChunkSizeIt = ChunkSizes.find(ChunkHash); + if (ChunkSizeIt == ChunkSizes.end()) + { + throw std::runtime_error( + fmt::format("Missing chunk {} to build chunked file {}", ChunkHash, ChunkedFileInfo.RawHash)); + } + const uint64_t ChunkSize = ChunkSizeIt->second; + if (auto ChunkTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash); + ChunkTargetLookupIt != ChunkedFileTargetLookup.end()) + { + size_t TargetIndex = ChunkTargetLookupIt->second; + eastl::fixed_vector<size_t, 1>& Target = Targets[TargetIndex]; + Target.push_back(ChunkOpenFileTargets.size()); + } + else + { + ChunkedFileTargetLookup.insert_or_assign(ChunkHash, Targets.size()); + Targets.push_back({ChunkOpenFileTargets.size()}); + } + ChunkOpenFileTargets.push_back({.OpenChunkedFileIndex = OpenChunkedFileIndex, .Offset = Offset}); + Offset += ChunkSize; + } + std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash); + OpenChunkedFiles.emplace_back(std::make_unique<BasicFile>(ChunkedFilePath, BasicFile::Mode::kTruncate)); + PrepareFileForScatteredWrite(OpenChunkedFiles.back()->Handle(), ChunkedFileInfo.RawSize); + } + + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Dechunk, (uint32_t)TaskSteps::StepCount); + + std::vector<std::atomic<uint8_t>> ChunkWrittenFlags(ChunkOpenFileTargets.size()); + + auto WriteChunk = [&](size_t ChunkedTargetsIndex, CompressedBuffer&& CompressedChunkBuffer) { + const eastl::fixed_vector<size_t, 1>& Target = Targets[ChunkedTargetsIndex]; + for (size_t TargetIndex : Target) + { + uint8_t Expected = 0; + if (ChunkWrittenFlags[TargetIndex].compare_exchange_strong(Expected, true)) + { + const ChunkOpenFileTarget& ChunkTarget = ChunkOpenFileTargets[TargetIndex]; + BasicFile& OutputFile = *OpenChunkedFiles[ChunkTarget.OpenChunkedFileIndex]; + + if (!CompressedChunkBuffer.DecompressToStream( + 0u, + ~uint64_t(0), + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) { + ZEN_UNUSED(SourceOffset); + ZEN_UNUSED(SourceSize); + OutputFile.Write(Range, ChunkTarget.Offset + Offset); + Offset += Range.GetSize(); + return true; + })) + { + std::error_code DummyEc; + throw std::runtime_error(fmt::format("Failed to decompress chunk {} at offset {} to {}", + CompressedChunkBuffer.DecodeRawHash(), + ChunkTarget.Offset, + PathFromHandle(OutputFile.Handle(), DummyEc))); + } + ChunksWritten++; + } + } + }; + + { + Stopwatch DechunkTimer; + + std::atomic<bool> PauseFlag; + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::vector<IoHash> LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end()); + + for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++) + { + Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex](std::atomic<bool>&) { + const IoHash RawHash = LooseChunks[LooseChunkIndex]; + if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(RawHash); + ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end()) + { + std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash); + IoBuffer ChunkBuffer = IoBufferBuilder::MakeFromFile(ChunkOutputPath); + + WriteChunk(ChunkedFileTargetLookupIt->second, + CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer))); + } + }); + } + + std::vector<IoHash> Blocks(BlocksToDownload.begin(), BlocksToDownload.end()); + + for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++) + { + Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex](std::atomic<bool>&) { + const IoHash& BlockRawHash = Blocks[BlockIndex]; + std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", BlockRawHash); + + SharedBuffer BlockBuffer = + CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress(); + + uint64_t HeaderSize = 0; + if (IterateChunkBlock( + SharedBuffer(BlockBuffer), + [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) { + if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash); + ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end()) + { + WriteChunk(ChunkedFileTargetLookupIt->second, std::move(CompressedChunk)); + } + }, + HeaderSize)) + { + } + else + { + throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash)); + } + }); + } + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused, PendingWork); + std::string Details = fmt::format("{}/{} chunks written", ChunksWritten.load(), ChunkOpenFileTargets.size()); + + DechunkingProgressBar.UpdateState( + {.Task = "Dechunking ", + .Details = Details, + .TotalCount = ChunkOpenFileTargets.size(), + .RemainingCount = ChunkOpenFileTargets.size() - ChunksWritten.load(), + .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + DechunkingProgressBar.Finish(); + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "{} file{} dechunked in {}", + ChunkedFileInfos.size(), + ChunkedFileInfos.size() == 1 ? "" : "s", + NiceTimeSpanMs(DechunkTimer.GetElapsedTimeMs())); + } + } + } + catch (const std::exception&) + { + for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++) + { + const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex]; + std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash); + RemoveFile(ChunkedFilePath); + } + throw; + } + { + Stopwatch VerifyTimer; + std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Verifying")); + OperationLogOutput::ProgressBar& VerifyProgressBar(*ProgressBarPtr); + + std::atomic<bool> PauseFlag; + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<size_t> DechunkedFilesMoved; + + for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++) + { + Work.ScheduleWork(m_IOWorkerPool, [&, ChunkedFileIndex](std::atomic<bool>&) { + const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex]; + std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash); + IoHash VerifyHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(ChunkedFilePath)); + if (VerifyHash != ChunkedFileInfo.RawHash) + { + throw std::runtime_error( + fmt::format("Dechunked file {} has mismatching hash {}", ChunkedFileInfo.RawHash, VerifyHash)); + } + std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkedFileInfo.RawHash); + RenameFile(ChunkedFilePath, ChunkOutputPath); + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Moved dechunked file {} to '{}'", ChunkedFileInfo.RawHash, ChunkOutputPath); + DechunkedFilesMoved++; + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused, PendingWork); + std::string Details = fmt::format("{}/{} files verified", DechunkedFilesMoved.load(), ChunkedFileInfos.size()); + + VerifyProgressBar.UpdateState({.Task = "Verifying ", + .Details = Details, + .TotalCount = ChunkedFileInfos.size(), + .RemainingCount = ChunkedFileInfos.size() - DechunkedFilesMoved.load(), + .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + VerifyProgressBar.Finish(); + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Verified {} chunked file{} in {}", + ChunkedFileInfos.size(), + ChunkedFileInfos.size() == 1 ? "" : "s", + NiceTimeSpanMs(VerifyTimer.GetElapsedTimeMs())); + } + } + } + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Downloaded {} attachment{} to '{}' in {}", + m_AttachmentHashes.size(), + m_AttachmentHashes.size() == 1 ? "" : "s", + m_Options.AttachmentOutputPath, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); +} + +} // namespace zen |