aboutsummaryrefslogtreecommitdiff
path: root/src/zenremotestore/projectstore/projectstoreoperations.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-07 12:27:44 +0100
committerGitHub Enterprise <[email protected]>2025-11-07 12:27:44 +0100
commit72b1797e2b65ad47f4dc8e9fab73b9aa170889b4 (patch)
tree93c9f7c99965393ba9c6ec86c01b63899bfba684 /src/zenremotestore/projectstore/projectstoreoperations.cpp
parentmove progress bar to separate file (#638) (diff)
downloadzen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.tar.xz
zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.zip
get oplog attachments (#622)
* add support for downloading individual attachments from an oplog
Diffstat (limited to 'src/zenremotestore/projectstore/projectstoreoperations.cpp')
-rw-r--r--src/zenremotestore/projectstore/projectstoreoperations.cpp917
1 files changed, 917 insertions, 0 deletions
diff --git a/src/zenremotestore/projectstore/projectstoreoperations.cpp b/src/zenremotestore/projectstore/projectstoreoperations.cpp
new file mode 100644
index 000000000..7dd85531c
--- /dev/null
+++ b/src/zenremotestore/projectstore/projectstoreoperations.cpp
@@ -0,0 +1,917 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/projectstore/projectstoreoperations.h>
+
+#include <zencore/compactbinaryutil.h>
+#include <zencore/parallelwork.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zenremotestore/builds/buildstorageutil.h>
+#include <zenremotestore/chunking/chunkedfile.h>
+#include <zenremotestore/operationlogoutput.h>
+#include <zenremotestore/projectstore/remoteprojectstore.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+//////////////////////////// ProjectStoreOperationOplogState
+
+ProjectStoreOperationOplogState::ProjectStoreOperationOplogState(OperationLogOutput& OperationLogOutput,
+ StorageInstance& Storage,
+ const Oid& BuildId,
+ const Options& Options)
+: m_LogOutput(OperationLogOutput)
+, m_Storage(Storage)
+, m_BuildId(BuildId)
+, m_Options(Options)
+{
+}
+
+CbObjectView
+ProjectStoreOperationOplogState::LoadBuildObject()
+{
+ if (!m_BuildObject)
+ {
+ const std::filesystem::path CachedBuildObjectPath = m_Options.TempFolderPath / "build.cbo";
+ if (!m_Options.ForceDownload && IsFile(CachedBuildObjectPath))
+ {
+ Stopwatch Timer;
+ CbValidateError Error;
+ m_BuildObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildObjectPath), Error);
+ if (Error != CbValidateError::None)
+ {
+ RemoveFile(CachedBuildObjectPath);
+ m_BuildObject = {};
+ }
+ else
+ {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Read build {} from locally cached file in {}",
+ m_BuildId,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ return m_BuildObject;
+ }
+ }
+
+ Stopwatch Timer;
+ m_BuildObject = m_Storage.BuildStorage->GetBuild(m_BuildId);
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Fetched build {} from {} in {}",
+ m_BuildId,
+ m_Storage.BuildStorageHttp->GetBaseUri(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ CreateDirectories(CachedBuildObjectPath.parent_path());
+ TemporaryFile::SafeWriteFile(CachedBuildObjectPath, m_BuildObject.GetBuffer().GetView());
+ }
+ return m_BuildObject;
+}
+
+const Oid&
+ProjectStoreOperationOplogState::GetBuildId()
+{
+ return m_BuildId;
+}
+
+const Oid&
+ProjectStoreOperationOplogState::GetBuildPartId()
+{
+ if (m_BuildPartId == Oid::Zero)
+ {
+ CbObjectView BuildObject = LoadBuildObject();
+ CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
+ if (!PartsObject)
+ {
+ throw std::runtime_error(fmt::format("The build {} payload does not contain a 'parts' object"sv, m_BuildId));
+ }
+ static const std::string_view OplogContainerPartName = "oplogcontainer"sv;
+ m_BuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
+ if (m_BuildPartId == Oid::Zero)
+ {
+ throw std::runtime_error(
+ fmt::format("The build {} payload 'parts' object does not contain a '{}' entry"sv, m_BuildId, OplogContainerPartName));
+ }
+ }
+ return m_BuildPartId;
+}
+
+CbObjectView
+ProjectStoreOperationOplogState::LoadBuildPartsObject()
+{
+ if (!m_BuildPartsObject)
+ {
+ const Oid BuildPartId = GetBuildPartId();
+ const std::filesystem::path CachedBuildPartObjectPath = m_Options.TempFolderPath / fmt::format("{}_part.cbo", BuildPartId);
+ if (!m_Options.ForceDownload && IsFile(CachedBuildPartObjectPath))
+ {
+ Stopwatch Timer;
+ CbValidateError Error;
+ m_BuildPartsObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildPartObjectPath), Error);
+ if (Error != CbValidateError::None)
+ {
+ RemoveFile(CachedBuildPartObjectPath);
+ m_BuildPartsObject = {};
+ }
+ else
+ {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Read build part {}/{} from locally cached file in {}",
+ m_BuildId,
+ BuildPartId,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ return m_BuildPartsObject;
+ }
+ }
+
+ Stopwatch Timer;
+ m_BuildPartsObject = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId);
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Fetched build part {}/{} from {} in {}",
+ m_BuildId,
+ BuildPartId,
+ m_Storage.BuildStorageHttp->GetBaseUri(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ CreateDirectories(CachedBuildPartObjectPath.parent_path());
+ TemporaryFile::SafeWriteFile(CachedBuildPartObjectPath, m_BuildPartsObject.GetBuffer().GetView());
+ }
+ return m_BuildPartsObject;
+}
+
+CbObjectView
+ProjectStoreOperationOplogState::LoadOpsSectionObject()
+{
+ if (!m_OpsSectionObject)
+ {
+ const Oid BuildPartId = GetBuildPartId();
+ const std::filesystem::path CachedOpsSectionPath = m_Options.TempFolderPath / fmt::format("{}_part_ops.cbo", BuildPartId);
+ if (!m_Options.ForceDownload && IsFile(CachedOpsSectionPath))
+ {
+ Stopwatch Timer;
+ CbValidateError Error;
+ m_OpsSectionObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedOpsSectionPath), Error);
+ if (Error != CbValidateError::None)
+ {
+ RemoveFile(CachedOpsSectionPath);
+ m_OpsSectionObject = {};
+ }
+ else if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Read {}/{}/ops from locally cached file in {}",
+ BuildPartId,
+ m_BuildId,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ return m_OpsSectionObject;
+ }
+ }
+ CbObjectView ContainerObject = LoadBuildPartsObject();
+
+ Stopwatch Timer;
+
+ MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
+ IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
+ IoBuffer OpsSectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
+
+ CbValidateError ValidateResult = CbValidateError::None;
+ m_OpsSectionObject = ValidateAndReadCompactBinaryObject(std::move(OpsSectionPayload), ValidateResult);
+ if (ValidateResult != CbValidateError::None)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to parse oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult)));
+ }
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Decompressed and validated oplog payload {} -> {} in {}",
+ NiceBytes(OpsSection.GetSize()),
+ NiceBytes(m_OpsSectionObject.GetSize()),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ if (m_OpsSectionObject)
+ {
+ CreateDirectories(CachedOpsSectionPath.parent_path());
+ TemporaryFile::SafeWriteFile(CachedOpsSectionPath, m_OpsSectionObject.GetBuffer().GetView());
+ }
+ }
+ return m_OpsSectionObject;
+}
+
+CbArray
+ProjectStoreOperationOplogState::LoadArrayFromBuildPart(std::string_view ArrayName)
+{
+ const Oid BuildPartId = GetBuildPartId();
+ const std::filesystem::path CachedPartArrayPath = m_Options.TempFolderPath / fmt::format("{}_part_{}.cba", BuildPartId, ArrayName);
+ if (!m_Options.ForceDownload && IsFile(CachedPartArrayPath))
+ {
+ Stopwatch Timer;
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(CachedPartArrayPath);
+ CbValidateError Error = ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default);
+ if (Error != CbValidateError::None)
+ {
+ RemoveFile(CachedPartArrayPath);
+ }
+ else
+ {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Read {}/{}/{} from locally cached file in {}",
+ BuildPartId,
+ m_BuildId,
+ ArrayName,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ CbArray Result = CbArray(SharedBuffer(std::move(Payload)));
+ return Result;
+ }
+ }
+
+ CbObjectView ContainerObject = LoadBuildPartsObject();
+
+ Stopwatch Timer;
+
+ CbArrayView ArrayView = ContainerObject[ArrayName].AsArrayView();
+
+ CbArray Result;
+ {
+ CbWriter Writer;
+ Writer << ArrayView;
+ SharedBuffer ArrayBuffer = SharedBuffer::MakeView(Writer.Save().GetView()).MakeOwned();
+ Result = CbArray(std::move(ArrayBuffer));
+ }
+
+ CreateDirectories(CachedPartArrayPath.parent_path());
+ TemporaryFile::SafeWriteFile(CachedPartArrayPath, Result.GetBuffer().GetView());
+
+ return Result;
+}
+
+CbArrayView
+ProjectStoreOperationOplogState::LoadBlocksArray()
+{
+ if (!m_BlocksArray)
+ {
+ m_BlocksArray = LoadArrayFromBuildPart("blocks"sv);
+ }
+ return m_BlocksArray;
+}
+
+CbArrayView
+ProjectStoreOperationOplogState::LoadChunkedFilesArray()
+{
+ if (!m_ChunkedFilesArray)
+ {
+ m_ChunkedFilesArray = LoadArrayFromBuildPart("chunkedfiles"sv);
+ }
+ return m_ChunkedFilesArray;
+}
+
+CbArrayView
+ProjectStoreOperationOplogState::LoadChunksArray()
+{
+ if (!m_ChunksArray)
+ {
+ m_ChunksArray = LoadArrayFromBuildPart("chunks"sv);
+ }
+ return m_ChunksArray;
+}
+
+//////////////////////////// ProjectStoreOperationDownloadAttachments
+
+ProjectStoreOperationDownloadAttachments::ProjectStoreOperationDownloadAttachments(OperationLogOutput& OperationLogOutput,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& IOWorkerPool,
+ WorkerThreadPool& NetworkPool,
+ ProjectStoreOperationOplogState& State,
+ std::span<const IoHash> AttachmentHashes,
+ const Options& Options)
+: m_LogOutput(OperationLogOutput)
+, m_Storage(Storage)
+, m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_IOWorkerPool(IOWorkerPool)
+, m_NetworkPool(NetworkPool)
+, m_State(State)
+, m_AttachmentHashes(AttachmentHashes.begin(), AttachmentHashes.end())
+, m_Options(Options)
+{
+}
+
+void
+ProjectStoreOperationDownloadAttachments::Execute()
+{
+ enum class TaskSteps : uint32_t
+ {
+ ReadAttachmentData,
+ Download,
+ AnalyzeDechunk,
+ Dechunk,
+ Cleanup,
+ StepCount
+ };
+
+ auto EndProgress =
+ MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); });
+
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ReadAttachmentData, (uint32_t)TaskSteps::StepCount);
+
+ Stopwatch Timer;
+ tsl::robin_map<IoHash, uint64_t, IoHash::Hasher> ChunkSizes;
+
+ std::vector<ChunkedInfo> ChunkedFileInfos;
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> FilesToDechunk;
+ tsl::robin_set<IoHash, IoHash::Hasher> ChunkedFileRawHashes;
+ {
+ CbArrayView ChunkedFilesArray = m_State.LoadChunkedFilesArray();
+ FilesToDechunk.reserve(ChunkedFilesArray.Num());
+ for (CbFieldView ChunkedFileView : ChunkedFilesArray)
+ {
+ CbObjectView ChunkedFile = ChunkedFileView.AsObjectView();
+ IoHash ChunkedRawHash = ChunkedFile["rawhash"sv].AsHash();
+ if (m_AttachmentHashes.contains(ChunkedRawHash))
+ {
+ ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFile);
+ ChunkSizes.insert_or_assign(Chunked.RawHash, Chunked.RawSize);
+ ChunkedFileRawHashes.reserve(ChunkedFileRawHashes.size() + Chunked.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.ChunkHashes)
+ {
+ ChunkedFileRawHashes.insert(ChunkHash);
+ }
+ FilesToDechunk.insert_or_assign(Chunked.RawHash, ChunkedFileInfos.size());
+ ChunkedFileInfos.emplace_back(std::move(Chunked));
+ }
+ }
+ }
+
+ tsl::robin_set<IoHash, IoHash::Hasher> LooseChunksToDownload;
+ {
+ CbArrayView ChunksArray = m_State.LoadChunksArray();
+ LooseChunksToDownload.reserve(ChunksArray.Num());
+ for (CbFieldView Chunk : ChunksArray)
+ {
+ const IoHash ChunkHash = Chunk.AsAttachment();
+ if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash))
+ {
+ LooseChunksToDownload.insert(ChunkHash);
+ }
+ }
+ }
+
+ tsl::robin_set<IoHash, IoHash::Hasher> BlocksToDownload;
+ tsl::robin_map<IoHash, IoHash, IoHash::Hasher> ChunkToBlock;
+
+ {
+ CbArrayView BlocksArray = m_State.LoadBlocksArray();
+
+ for (CbFieldView BlockView : BlocksArray)
+ {
+ CbObjectView Block = BlockView.AsObjectView();
+ IoHash BlockHash = Block["rawhash"sv].AsBinaryAttachment();
+ if (BlockHash == IoHash::Zero)
+ {
+ CbArrayView ChunksArray = Block["chunks"sv].AsArrayView();
+ for (CbFieldView ChunkHashView : ChunksArray)
+ {
+ const IoHash ChunkHash = ChunkHashView.AsAttachment();
+ if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash))
+ {
+ LooseChunksToDownload.insert(ChunkHash);
+ }
+ }
+ }
+ else
+ {
+ CbArrayView ChunksArray = Block["chunks"sv].AsArrayView();
+ for (CbFieldView ChunkHashView : ChunksArray)
+ {
+ const IoHash ChunkHash = ChunkHashView.AsHash();
+ if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash))
+ {
+ ChunkToBlock.insert_or_assign(ChunkHash, BlockHash);
+ BlocksToDownload.insert(BlockHash);
+ }
+ }
+ }
+ }
+ }
+
+ if (!m_Options.IsQuiet)
+ {
+ std::string DechunkInfo =
+ FilesToDechunk.size() > 0
+ ? fmt::format("\n{} file{} needs to be dechunked", FilesToDechunk.size(), FilesToDechunk.size() == 1 ? "" : "s")
+ : "";
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Need to download {} block{} and {} chunk{}{}",
+ BlocksToDownload.size(),
+ BlocksToDownload.size() == 1 ? "" : "s",
+ LooseChunksToDownload.size(),
+ LooseChunksToDownload.size() == 1 ? "" : "s",
+ DechunkInfo);
+ }
+
+ auto GetBuildBlob = [this](const IoHash& RawHash, const std::filesystem::path& OutputPath) {
+ IoBuffer Payload;
+ if (m_Storage.BuildCacheStorage)
+ {
+ Payload = m_Storage.BuildCacheStorage->GetBuildBlob(m_State.GetBuildId(), RawHash);
+ }
+ if (!Payload)
+ {
+ Payload = m_Storage.BuildStorage->GetBuildBlob(m_State.GetBuildId(), RawHash);
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_State.GetBuildId(),
+ RawHash,
+ Payload.GetContentType(),
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+ }
+ uint64_t PayloadSize = Payload.GetSize();
+ IoBufferFileReference FileRef;
+ if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == PayloadSize))
+ {
+ std::error_code Ec;
+ std::filesystem::path TempPayloadPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ Payload.SetDeleteOnClose(false);
+ Payload = {};
+ RenameFile(TempPayloadPath, OutputPath, Ec);
+ if (Ec)
+ {
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempPayloadPath, BasicFile::Mode::kDelete);
+ Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, PayloadSize, true);
+ Payload.SetDeleteOnClose(true);
+ }
+ }
+ }
+ if (Payload)
+ {
+ TemporaryFile::SafeWriteFile(OutputPath, Payload.GetView());
+ }
+ };
+
+ std::filesystem::path TempAttachmentPath = MakeSafeAbsolutePath(m_Options.AttachmentOutputPath) / ".tmp";
+ CreateDirectories(TempAttachmentPath);
+ auto _0 = MakeGuard([this, &TempAttachmentPath]() {
+ if (true)
+ {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput, "Cleaning up temporary directory");
+ }
+ CleanDirectory(TempAttachmentPath, true);
+ RemoveDir(TempAttachmentPath);
+ }
+ });
+
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Download, (uint32_t)TaskSteps::StepCount);
+
+ std::filesystem::path BlocksPath = TempAttachmentPath / "blocks";
+ CreateDirectories(BlocksPath);
+
+ {
+ Stopwatch DownloadTimer;
+
+ std::filesystem::path LooseChunksPath = TempAttachmentPath / "loosechunks";
+ CreateDirectories(LooseChunksPath);
+
+ std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading"));
+ OperationLogOutput::ProgressBar& DownloadProgressBar(*ProgressBarPtr);
+
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<size_t> LooseChunksCompleted;
+ std::atomic<size_t> BlocksCompleted;
+
+ std::vector<IoHash> LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end());
+
+ for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++)
+ {
+ Work.ScheduleWork(m_NetworkPool, [&, LooseChunkIndex](std::atomic<bool>&) {
+ const IoHash RawHash = LooseChunks[LooseChunkIndex];
+ std::filesystem::path LooseChunkOutputPath = LooseChunksPath / fmt::format("{}.ucb", RawHash);
+ if (m_Options.ForceDownload || !IsFile(LooseChunkOutputPath))
+ {
+ GetBuildBlob(RawHash, LooseChunkOutputPath);
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded chunk {}", RawHash);
+ }
+
+ Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex, LooseChunkOutputPath](std::atomic<bool>&) {
+ const IoHash RawHash = LooseChunks[LooseChunkIndex];
+ IoHash ChunkRawHash;
+ uint64_t ChunkRawSize;
+ CompressedBuffer CompressedChunk =
+ CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(LooseChunkOutputPath)),
+ ChunkRawHash,
+ ChunkRawSize);
+ if (!CompressedChunk)
+ {
+ throw std::runtime_error(fmt::format("Downloaded chunk {} is malformed", RawHash));
+ }
+ if (ChunkRawHash != RawHash)
+ {
+ throw std::runtime_error(fmt::format("Downloaded chunk {} mismatches content hash {}", RawHash, ChunkRawHash));
+ }
+
+ std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash);
+ if (m_Options.DecompressAttachments)
+ {
+ BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate);
+ if (!CompressedChunk.DecompressToStream(
+ 0,
+ ChunkRawSize,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_UNUSED(SourceSize);
+ ChunkOutput.Write(Range, Offset);
+ return true;
+ }))
+ {
+ ChunkOutput.Close();
+ RemoveFile(ChunkOutputPath);
+ throw std::runtime_error(fmt::format("Failed to decompress chunk {} to ", RawHash, ChunkOutputPath));
+ }
+ }
+ else
+ {
+ TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed());
+ }
+
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote loose chunk {} to '{}'", RawHash, ChunkOutputPath);
+ LooseChunksCompleted++;
+ });
+ });
+ }
+
+ std::vector<IoHash> Blocks(BlocksToDownload.begin(), BlocksToDownload.end());
+
+ RwLock ChunkSizesLock;
+ for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++)
+ {
+ Work.ScheduleWork(m_NetworkPool, [&, BlockIndex](std::atomic<bool>&) {
+ const IoHash& RawHash = Blocks[BlockIndex];
+ std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", RawHash);
+ if (m_Options.ForceDownload || !IsFile(BlockOutputPath))
+ {
+ GetBuildBlob(RawHash, BlockOutputPath);
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded block {}", RawHash);
+ }
+
+ Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex, BlockOutputPath](std::atomic<bool>&) {
+ const IoHash& BlockRawHash = Blocks[BlockIndex];
+
+ SharedBuffer BlockBuffer =
+ CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress();
+
+ uint64_t HeaderSize = 0;
+ if (IterateChunkBlock(
+ SharedBuffer(BlockBuffer),
+ [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) {
+ if (m_AttachmentHashes.contains(ChunkHash))
+ {
+ std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkHash);
+
+ if (m_Options.DecompressAttachments)
+ {
+ BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate);
+ if (!CompressedChunk.DecompressToStream(0u,
+ ~uint64_t(0),
+ [&](uint64_t SourceOffset,
+ uint64_t SourceSize,
+ uint64_t Offset,
+ const CompositeBuffer& Range) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_UNUSED(SourceSize);
+ ChunkOutput.Write(Range, Offset);
+ return true;
+ }))
+ {
+ ChunkOutput.Close();
+ RemoveFile(ChunkOutputPath);
+ throw std::runtime_error(
+ fmt::format("Failed to decompress chunk {} to ", ChunkHash, ChunkOutputPath));
+ }
+ }
+ else
+ {
+ TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed());
+ }
+
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote block chunk {} to '{}'", ChunkHash, ChunkOutputPath);
+ }
+ if (ChunkedFileRawHashes.contains(ChunkHash))
+ {
+ uint64_t RawSize = CompressedChunk.DecodeRawSize();
+ ChunkSizesLock.WithExclusiveLock([&]() { ChunkSizes.insert_or_assign(ChunkHash, RawSize); });
+ }
+ },
+ HeaderSize))
+ {
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash));
+ }
+ BlocksCompleted++;
+ });
+ });
+ }
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused, PendingWork);
+
+ std::string Details = fmt::format("{}/{} blocks, {}/{} chunks downloaded",
+ BlocksCompleted.load(),
+ BlocksToDownload.size(),
+ LooseChunksCompleted.load(),
+ LooseChunksToDownload.size());
+ DownloadProgressBar.UpdateState({.Task = "Downloading",
+ .Details = Details,
+ .TotalCount = BlocksToDownload.size() + LooseChunksToDownload.size(),
+ .RemainingCount = BlocksToDownload.size() + LooseChunksToDownload.size() -
+ (BlocksCompleted.load() + LooseChunksCompleted.load()),
+ .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ DownloadProgressBar.Finish();
+
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "{} block{} downloaded, {} loose chunk{} downloaded in {}",
+ BlocksToDownload.size(),
+ BlocksToDownload.size() == 1 ? "" : "s",
+ LooseChunksToDownload.size(),
+ LooseChunksToDownload.size() == 1 ? "" : "s",
+ NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
+ }
+ }
+
+ if (!ChunkedFileInfos.empty())
+ {
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::AnalyzeDechunk, (uint32_t)TaskSteps::StepCount);
+
+ std::filesystem::path ChunkedFilesPath = TempAttachmentPath / "chunkedfiles";
+ CreateDirectories(ChunkedFilesPath);
+
+ try
+ {
+ std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Dechunking"));
+ OperationLogOutput::ProgressBar& DechunkingProgressBar(*ProgressBarPtr);
+
+ std::atomic<uint64_t> ChunksWritten;
+
+ std::vector<std::unique_ptr<BasicFile>> OpenChunkedFiles;
+ struct ChunkOpenFileTarget
+ {
+ size_t OpenChunkedFileIndex = 0;
+ uint64_t Offset = 0;
+ };
+
+ std::vector<ChunkOpenFileTarget> ChunkOpenFileTargets;
+
+ std::vector<eastl::fixed_vector<size_t, 1>> Targets;
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkedFileTargetLookup;
+
+ // Build up file and offset targets for each chunk
+ for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++)
+ {
+ const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex];
+
+ const size_t OpenChunkedFileIndex = OpenChunkedFiles.size();
+
+ uint64_t Offset = 0;
+ for (uint32_t ChunkIndex : ChunkedFileInfo.ChunkSequence)
+ {
+ const IoHash& ChunkHash = ChunkedFileInfo.ChunkHashes[ChunkIndex];
+ auto ChunkSizeIt = ChunkSizes.find(ChunkHash);
+ if (ChunkSizeIt == ChunkSizes.end())
+ {
+ throw std::runtime_error(
+ fmt::format("Missing chunk {} to build chunked file {}", ChunkHash, ChunkedFileInfo.RawHash));
+ }
+ const uint64_t ChunkSize = ChunkSizeIt->second;
+ if (auto ChunkTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash);
+ ChunkTargetLookupIt != ChunkedFileTargetLookup.end())
+ {
+ size_t TargetIndex = ChunkTargetLookupIt->second;
+ eastl::fixed_vector<size_t, 1>& Target = Targets[TargetIndex];
+ Target.push_back(ChunkOpenFileTargets.size());
+ }
+ else
+ {
+ ChunkedFileTargetLookup.insert_or_assign(ChunkHash, Targets.size());
+ Targets.push_back({ChunkOpenFileTargets.size()});
+ }
+ ChunkOpenFileTargets.push_back({.OpenChunkedFileIndex = OpenChunkedFileIndex, .Offset = Offset});
+ Offset += ChunkSize;
+ }
+ std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash);
+ OpenChunkedFiles.emplace_back(std::make_unique<BasicFile>(ChunkedFilePath, BasicFile::Mode::kTruncate));
+ PrepareFileForScatteredWrite(OpenChunkedFiles.back()->Handle(), ChunkedFileInfo.RawSize);
+ }
+
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Dechunk, (uint32_t)TaskSteps::StepCount);
+
+ std::vector<std::atomic<uint8_t>> ChunkWrittenFlags(ChunkOpenFileTargets.size());
+
+ auto WriteChunk = [&](size_t ChunkedTargetsIndex, CompressedBuffer&& CompressedChunkBuffer) {
+ const eastl::fixed_vector<size_t, 1>& Target = Targets[ChunkedTargetsIndex];
+ for (size_t TargetIndex : Target)
+ {
+ uint8_t Expected = 0;
+ if (ChunkWrittenFlags[TargetIndex].compare_exchange_strong(Expected, true))
+ {
+ const ChunkOpenFileTarget& ChunkTarget = ChunkOpenFileTargets[TargetIndex];
+ BasicFile& OutputFile = *OpenChunkedFiles[ChunkTarget.OpenChunkedFileIndex];
+
+ if (!CompressedChunkBuffer.DecompressToStream(
+ 0u,
+ ~uint64_t(0),
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_UNUSED(SourceSize);
+ OutputFile.Write(Range, ChunkTarget.Offset + Offset);
+ Offset += Range.GetSize();
+ return true;
+ }))
+ {
+ std::error_code DummyEc;
+ throw std::runtime_error(fmt::format("Failed to decompress chunk {} at offset {} to {}",
+ CompressedChunkBuffer.DecodeRawHash(),
+ ChunkTarget.Offset,
+ PathFromHandle(OutputFile.Handle(), DummyEc)));
+ }
+ ChunksWritten++;
+ }
+ }
+ };
+
+ {
+ Stopwatch DechunkTimer;
+
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::vector<IoHash> LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end());
+
+ for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++)
+ {
+ Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex](std::atomic<bool>&) {
+ const IoHash RawHash = LooseChunks[LooseChunkIndex];
+ if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(RawHash);
+ ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end())
+ {
+ std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash);
+ IoBuffer ChunkBuffer = IoBufferBuilder::MakeFromFile(ChunkOutputPath);
+
+ WriteChunk(ChunkedFileTargetLookupIt->second,
+ CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)));
+ }
+ });
+ }
+
+ std::vector<IoHash> Blocks(BlocksToDownload.begin(), BlocksToDownload.end());
+
+ for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++)
+ {
+ Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex](std::atomic<bool>&) {
+ const IoHash& BlockRawHash = Blocks[BlockIndex];
+ std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", BlockRawHash);
+
+ SharedBuffer BlockBuffer =
+ CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress();
+
+ uint64_t HeaderSize = 0;
+ if (IterateChunkBlock(
+ SharedBuffer(BlockBuffer),
+ [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) {
+ if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash);
+ ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end())
+ {
+ WriteChunk(ChunkedFileTargetLookupIt->second, std::move(CompressedChunk));
+ }
+ },
+ HeaderSize))
+ {
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash));
+ }
+ });
+ }
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused, PendingWork);
+ std::string Details = fmt::format("{}/{} chunks written", ChunksWritten.load(), ChunkOpenFileTargets.size());
+
+ DechunkingProgressBar.UpdateState(
+ {.Task = "Dechunking ",
+ .Details = Details,
+ .TotalCount = ChunkOpenFileTargets.size(),
+ .RemainingCount = ChunkOpenFileTargets.size() - ChunksWritten.load(),
+ .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ DechunkingProgressBar.Finish();
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "{} file{} dechunked in {}",
+ ChunkedFileInfos.size(),
+ ChunkedFileInfos.size() == 1 ? "" : "s",
+ NiceTimeSpanMs(DechunkTimer.GetElapsedTimeMs()));
+ }
+ }
+ }
+ catch (const std::exception&)
+ {
+ for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++)
+ {
+ const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex];
+ std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash);
+ RemoveFile(ChunkedFilePath);
+ }
+ throw;
+ }
+ {
+ Stopwatch VerifyTimer;
+ std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Verifying"));
+ OperationLogOutput::ProgressBar& VerifyProgressBar(*ProgressBarPtr);
+
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<size_t> DechunkedFilesMoved;
+
+ for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++)
+ {
+ Work.ScheduleWork(m_IOWorkerPool, [&, ChunkedFileIndex](std::atomic<bool>&) {
+ const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex];
+ std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash);
+ IoHash VerifyHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(ChunkedFilePath));
+ if (VerifyHash != ChunkedFileInfo.RawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Dechunked file {} has mismatching hash {}", ChunkedFileInfo.RawHash, VerifyHash));
+ }
+ std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkedFileInfo.RawHash);
+ RenameFile(ChunkedFilePath, ChunkOutputPath);
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Moved dechunked file {} to '{}'", ChunkedFileInfo.RawHash, ChunkOutputPath);
+ DechunkedFilesMoved++;
+ });
+ }
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused, PendingWork);
+ std::string Details = fmt::format("{}/{} files verified", DechunkedFilesMoved.load(), ChunkedFileInfos.size());
+
+ VerifyProgressBar.UpdateState({.Task = "Verifying ",
+ .Details = Details,
+ .TotalCount = ChunkedFileInfos.size(),
+ .RemainingCount = ChunkedFileInfos.size() - DechunkedFilesMoved.load(),
+ .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ VerifyProgressBar.Finish();
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Verified {} chunked file{} in {}",
+ ChunkedFileInfos.size(),
+ ChunkedFileInfos.size() == 1 ? "" : "s",
+ NiceTimeSpanMs(VerifyTimer.GetElapsedTimeMs()));
+ }
+ }
+ }
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Downloaded {} attachment{} to '{}' in {}",
+ m_AttachmentHashes.size(),
+ m_AttachmentHashes.size() == 1 ? "" : "s",
+ m_Options.AttachmentOutputPath,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount);
+}
+
+} // namespace zen