// Copyright Epic Games, Inc. All Rights Reserved. #include #include #include #include #include #include #include #include #include namespace zen { using namespace std::literals; //////////////////////////// ProjectStoreOperationOplogState ProjectStoreOperationOplogState::ProjectStoreOperationOplogState(OperationLogOutput& OperationLogOutput, StorageInstance& Storage, const Oid& BuildId, const Options& Options) : m_LogOutput(OperationLogOutput) , m_Storage(Storage) , m_BuildId(BuildId) , m_Options(Options) { } CbObjectView ProjectStoreOperationOplogState::LoadBuildObject() { if (!m_BuildObject) { const std::filesystem::path CachedBuildObjectPath = m_Options.TempFolderPath / "build.cbo"; if (!m_Options.ForceDownload && IsFile(CachedBuildObjectPath)) { Stopwatch Timer; CbValidateError Error; m_BuildObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildObjectPath), Error); if (Error != CbValidateError::None) { RemoveFile(CachedBuildObjectPath); m_BuildObject = {}; } else { if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Read build {} from locally cached file in {}", m_BuildId, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } return m_BuildObject; } } Stopwatch Timer; m_BuildObject = m_Storage.BuildStorage->GetBuild(m_BuildId); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Fetched build {} from {} in {}", m_BuildId, m_Storage.BuildStorageHttp->GetBaseUri(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } CreateDirectories(CachedBuildObjectPath.parent_path()); TemporaryFile::SafeWriteFile(CachedBuildObjectPath, m_BuildObject.GetBuffer().GetView()); } return m_BuildObject; } const Oid& ProjectStoreOperationOplogState::GetBuildId() { return m_BuildId; } const Oid& ProjectStoreOperationOplogState::GetBuildPartId() { if (m_BuildPartId == Oid::Zero) { CbObjectView BuildObject = LoadBuildObject(); CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); if (!PartsObject) { throw std::runtime_error(fmt::format("The build {} payload does not contain a 'parts' object"sv, m_BuildId)); } static const std::string_view OplogContainerPartName = "oplogcontainer"sv; m_BuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); if (m_BuildPartId == Oid::Zero) { throw std::runtime_error( fmt::format("The build {} payload 'parts' object does not contain a '{}' entry"sv, m_BuildId, OplogContainerPartName)); } } return m_BuildPartId; } CbObjectView ProjectStoreOperationOplogState::LoadBuildPartsObject() { if (!m_BuildPartsObject) { const Oid BuildPartId = GetBuildPartId(); const std::filesystem::path CachedBuildPartObjectPath = m_Options.TempFolderPath / fmt::format("{}_part.cbo", BuildPartId); if (!m_Options.ForceDownload && IsFile(CachedBuildPartObjectPath)) { Stopwatch Timer; CbValidateError Error; m_BuildPartsObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildPartObjectPath), Error); if (Error != CbValidateError::None) { RemoveFile(CachedBuildPartObjectPath); m_BuildPartsObject = {}; } else { if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Read build part {}/{} from locally cached file in {}", m_BuildId, BuildPartId, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } return m_BuildPartsObject; } } Stopwatch Timer; m_BuildPartsObject = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Fetched build part {}/{} from {} in {}", m_BuildId, BuildPartId, m_Storage.BuildStorageHttp->GetBaseUri(), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } CreateDirectories(CachedBuildPartObjectPath.parent_path()); TemporaryFile::SafeWriteFile(CachedBuildPartObjectPath, m_BuildPartsObject.GetBuffer().GetView()); } return m_BuildPartsObject; } CbObjectView ProjectStoreOperationOplogState::LoadOpsSectionObject() { if (!m_OpsSectionObject) { const Oid BuildPartId = GetBuildPartId(); const std::filesystem::path CachedOpsSectionPath = m_Options.TempFolderPath / fmt::format("{}_part_ops.cbo", BuildPartId); if (!m_Options.ForceDownload && IsFile(CachedOpsSectionPath)) { Stopwatch Timer; CbValidateError Error; m_OpsSectionObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedOpsSectionPath), Error); if (Error != CbValidateError::None) { RemoveFile(CachedOpsSectionPath); m_OpsSectionObject = {}; } else if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Read {}/{}/ops from locally cached file in {}", BuildPartId, m_BuildId, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); return m_OpsSectionObject; } } CbObjectView ContainerObject = LoadBuildPartsObject(); Stopwatch Timer; MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); IoBuffer OpsSectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); CbValidateError ValidateResult = CbValidateError::None; m_OpsSectionObject = ValidateAndReadCompactBinaryObject(std::move(OpsSectionPayload), ValidateResult); if (ValidateResult != CbValidateError::None) { throw std::runtime_error( fmt::format("Failed to parse oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult))); } if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Decompressed and validated oplog payload {} -> {} in {}", NiceBytes(OpsSection.GetSize()), NiceBytes(m_OpsSectionObject.GetSize()), NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } if (m_OpsSectionObject) { CreateDirectories(CachedOpsSectionPath.parent_path()); TemporaryFile::SafeWriteFile(CachedOpsSectionPath, m_OpsSectionObject.GetBuffer().GetView()); } } return m_OpsSectionObject; } CbArray ProjectStoreOperationOplogState::LoadArrayFromBuildPart(std::string_view ArrayName) { const Oid BuildPartId = GetBuildPartId(); const std::filesystem::path CachedPartArrayPath = m_Options.TempFolderPath / fmt::format("{}_part_{}.cba", BuildPartId, ArrayName); if (!m_Options.ForceDownload && IsFile(CachedPartArrayPath)) { Stopwatch Timer; IoBuffer Payload = IoBufferBuilder::MakeFromFile(CachedPartArrayPath); CbValidateError Error = ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default); if (Error != CbValidateError::None) { RemoveFile(CachedPartArrayPath); } else { if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Read {}/{}/{} from locally cached file in {}", BuildPartId, m_BuildId, ArrayName, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } CbArray Result = CbArray(SharedBuffer(std::move(Payload))); return Result; } } CbObjectView ContainerObject = LoadBuildPartsObject(); Stopwatch Timer; CbArrayView ArrayView = ContainerObject[ArrayName].AsArrayView(); CbArray Result; { CbWriter Writer; Writer << ArrayView; SharedBuffer ArrayBuffer = SharedBuffer::MakeView(Writer.Save().GetView()).MakeOwned(); Result = CbArray(std::move(ArrayBuffer)); } CreateDirectories(CachedPartArrayPath.parent_path()); TemporaryFile::SafeWriteFile(CachedPartArrayPath, Result.GetBuffer().GetView()); return Result; } CbArrayView ProjectStoreOperationOplogState::LoadBlocksArray() { if (!m_BlocksArray) { m_BlocksArray = LoadArrayFromBuildPart("blocks"sv); } return m_BlocksArray; } CbArrayView ProjectStoreOperationOplogState::LoadChunkedFilesArray() { if (!m_ChunkedFilesArray) { m_ChunkedFilesArray = LoadArrayFromBuildPart("chunkedfiles"sv); } return m_ChunkedFilesArray; } CbArrayView ProjectStoreOperationOplogState::LoadChunksArray() { if (!m_ChunksArray) { m_ChunksArray = LoadArrayFromBuildPart("chunks"sv); } return m_ChunksArray; } //////////////////////////// ProjectStoreOperationDownloadAttachments ProjectStoreOperationDownloadAttachments::ProjectStoreOperationDownloadAttachments(OperationLogOutput& OperationLogOutput, StorageInstance& Storage, std::atomic& AbortFlag, std::atomic& PauseFlag, WorkerThreadPool& IOWorkerPool, WorkerThreadPool& NetworkPool, ProjectStoreOperationOplogState& State, std::span AttachmentHashes, const Options& Options) : m_LogOutput(OperationLogOutput) , m_Storage(Storage) , m_AbortFlag(AbortFlag) , m_PauseFlag(PauseFlag) , m_IOWorkerPool(IOWorkerPool) , m_NetworkPool(NetworkPool) , m_State(State) , m_AttachmentHashes(AttachmentHashes.begin(), AttachmentHashes.end()) , m_Options(Options) { } void ProjectStoreOperationDownloadAttachments::Execute() { enum class TaskSteps : uint32_t { ReadAttachmentData, Download, AnalyzeDechunk, Dechunk, Cleanup, StepCount }; auto EndProgress = MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ReadAttachmentData, (uint32_t)TaskSteps::StepCount); Stopwatch Timer; tsl::robin_map ChunkSizes; std::vector ChunkedFileInfos; tsl::robin_map FilesToDechunk; tsl::robin_set ChunkedFileRawHashes; { CbArrayView ChunkedFilesArray = m_State.LoadChunkedFilesArray(); FilesToDechunk.reserve(ChunkedFilesArray.Num()); for (CbFieldView ChunkedFileView : ChunkedFilesArray) { CbObjectView ChunkedFile = ChunkedFileView.AsObjectView(); IoHash ChunkedRawHash = ChunkedFile["rawhash"sv].AsHash(); if (m_AttachmentHashes.contains(ChunkedRawHash)) { ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFile); ChunkSizes.insert_or_assign(Chunked.RawHash, Chunked.RawSize); ChunkedFileRawHashes.reserve(ChunkedFileRawHashes.size() + Chunked.ChunkHashes.size()); for (const IoHash& ChunkHash : Chunked.ChunkHashes) { ChunkedFileRawHashes.insert(ChunkHash); } FilesToDechunk.insert_or_assign(Chunked.RawHash, ChunkedFileInfos.size()); ChunkedFileInfos.emplace_back(std::move(Chunked)); } } } tsl::robin_set LooseChunksToDownload; { CbArrayView ChunksArray = m_State.LoadChunksArray(); LooseChunksToDownload.reserve(ChunksArray.Num()); for (CbFieldView Chunk : ChunksArray) { const IoHash ChunkHash = Chunk.AsAttachment(); if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash)) { LooseChunksToDownload.insert(ChunkHash); } } } tsl::robin_set BlocksToDownload; tsl::robin_map ChunkToBlock; { CbArrayView BlocksArray = m_State.LoadBlocksArray(); for (CbFieldView BlockView : BlocksArray) { CbObjectView Block = BlockView.AsObjectView(); IoHash BlockHash = Block["rawhash"sv].AsBinaryAttachment(); if (BlockHash == IoHash::Zero) { CbArrayView ChunksArray = Block["chunks"sv].AsArrayView(); for (CbFieldView ChunkHashView : ChunksArray) { const IoHash ChunkHash = ChunkHashView.AsAttachment(); if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash)) { LooseChunksToDownload.insert(ChunkHash); } } } else { CbArrayView ChunksArray = Block["chunks"sv].AsArrayView(); for (CbFieldView ChunkHashView : ChunksArray) { const IoHash ChunkHash = ChunkHashView.AsHash(); if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash)) { ChunkToBlock.insert_or_assign(ChunkHash, BlockHash); BlocksToDownload.insert(BlockHash); } } } } } if (!m_Options.IsQuiet) { std::string DechunkInfo = FilesToDechunk.size() > 0 ? fmt::format("\n{} file{} needs to be dechunked", FilesToDechunk.size(), FilesToDechunk.size() == 1 ? "" : "s") : ""; ZEN_OPERATION_LOG_INFO(m_LogOutput, "Need to download {} block{} and {} chunk{}{}", BlocksToDownload.size(), BlocksToDownload.size() == 1 ? "" : "s", LooseChunksToDownload.size(), LooseChunksToDownload.size() == 1 ? "" : "s", DechunkInfo); } auto GetBuildBlob = [this](const IoHash& RawHash, const std::filesystem::path& OutputPath) { IoBuffer Payload; if (m_Storage.BuildCacheStorage) { Payload = m_Storage.BuildCacheStorage->GetBuildBlob(m_State.GetBuildId(), RawHash); } if (!Payload) { Payload = m_Storage.BuildStorage->GetBuildBlob(m_State.GetBuildId(), RawHash); if (m_Storage.BuildCacheStorage && m_Options.PopulateCache) { m_Storage.BuildCacheStorage->PutBuildBlob(m_State.GetBuildId(), RawHash, Payload.GetContentType(), CompositeBuffer(SharedBuffer(Payload))); } } uint64_t PayloadSize = Payload.GetSize(); IoBufferFileReference FileRef; if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == PayloadSize)) { std::error_code Ec; std::filesystem::path TempPayloadPath = PathFromHandle(FileRef.FileHandle, Ec); if (!Ec) { Payload.SetDeleteOnClose(false); Payload = {}; RenameFile(TempPayloadPath, OutputPath, Ec); if (Ec) { // Re-open the temp file again BasicFile OpenTemp(TempPayloadPath, BasicFile::Mode::kDelete); Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, PayloadSize, true); Payload.SetDeleteOnClose(true); } } } if (Payload) { TemporaryFile::SafeWriteFile(OutputPath, Payload.GetView()); } }; std::filesystem::path TempAttachmentPath = MakeSafeAbsolutePath(m_Options.AttachmentOutputPath) / ".tmp"; CreateDirectories(TempAttachmentPath); auto _0 = MakeGuard([this, &TempAttachmentPath]() { if (true) { if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Cleaning up temporary directory"); } CleanDirectory(TempAttachmentPath, true); RemoveDir(TempAttachmentPath); } }); m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Download, (uint32_t)TaskSteps::StepCount); std::filesystem::path BlocksPath = TempAttachmentPath / "blocks"; CreateDirectories(BlocksPath); { Stopwatch DownloadTimer; std::filesystem::path LooseChunksPath = TempAttachmentPath / "loosechunks"; CreateDirectories(LooseChunksPath); std::unique_ptr ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading")); OperationLogOutput::ProgressBar& DownloadProgressBar(*ProgressBarPtr); std::atomic PauseFlag; ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic LooseChunksCompleted; std::atomic BlocksCompleted; std::vector LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end()); for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++) { Work.ScheduleWork(m_NetworkPool, [&, LooseChunkIndex](std::atomic&) { const IoHash RawHash = LooseChunks[LooseChunkIndex]; std::filesystem::path LooseChunkOutputPath = LooseChunksPath / fmt::format("{}.ucb", RawHash); if (m_Options.ForceDownload || !IsFile(LooseChunkOutputPath)) { GetBuildBlob(RawHash, LooseChunkOutputPath); ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded chunk {}", RawHash); } Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex, LooseChunkOutputPath](std::atomic&) { const IoHash RawHash = LooseChunks[LooseChunkIndex]; IoHash ChunkRawHash; uint64_t ChunkRawSize; CompressedBuffer CompressedChunk = CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(LooseChunkOutputPath)), ChunkRawHash, ChunkRawSize); if (!CompressedChunk) { throw std::runtime_error(fmt::format("Downloaded chunk {} is malformed", RawHash)); } if (ChunkRawHash != RawHash) { throw std::runtime_error(fmt::format("Downloaded chunk {} mismatches content hash {}", RawHash, ChunkRawHash)); } std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash); if (m_Options.DecompressAttachments) { BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate); if (!CompressedChunk.DecompressToStream( 0, ChunkRawSize, [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) { ZEN_UNUSED(SourceOffset); ZEN_UNUSED(SourceSize); ChunkOutput.Write(Range, Offset); return true; })) { ChunkOutput.Close(); RemoveFile(ChunkOutputPath); throw std::runtime_error(fmt::format("Failed to decompress chunk {} to ", RawHash, ChunkOutputPath)); } } else { TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed()); } ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote loose chunk {} to '{}'", RawHash, ChunkOutputPath); LooseChunksCompleted++; }); }); } std::vector Blocks(BlocksToDownload.begin(), BlocksToDownload.end()); RwLock ChunkSizesLock; for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++) { Work.ScheduleWork(m_NetworkPool, [&, BlockIndex](std::atomic&) { const IoHash& RawHash = Blocks[BlockIndex]; std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", RawHash); if (m_Options.ForceDownload || !IsFile(BlockOutputPath)) { GetBuildBlob(RawHash, BlockOutputPath); ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded block {}", RawHash); } Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex, BlockOutputPath](std::atomic&) { const IoHash& BlockRawHash = Blocks[BlockIndex]; SharedBuffer BlockBuffer = CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress(); uint64_t HeaderSize = 0; if (IterateChunkBlock( SharedBuffer(BlockBuffer), [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) { if (m_AttachmentHashes.contains(ChunkHash)) { std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkHash); if (m_Options.DecompressAttachments) { BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate); if (!CompressedChunk.DecompressToStream(0u, ~uint64_t(0), [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) { ZEN_UNUSED(SourceOffset); ZEN_UNUSED(SourceSize); ChunkOutput.Write(Range, Offset); return true; })) { ChunkOutput.Close(); RemoveFile(ChunkOutputPath); throw std::runtime_error( fmt::format("Failed to decompress chunk {} to ", ChunkHash, ChunkOutputPath)); } } else { TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed()); } ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote block chunk {} to '{}'", ChunkHash, ChunkOutputPath); } if (ChunkedFileRawHashes.contains(ChunkHash)) { uint64_t RawSize = CompressedChunk.DecodeRawSize(); ChunkSizesLock.WithExclusiveLock([&]() { ChunkSizes.insert_or_assign(ChunkHash, RawSize); }); } }, HeaderSize)) { } else { throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash)); } BlocksCompleted++; }); }); } Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, IsPaused, PendingWork); std::string Details = fmt::format("{}/{} blocks, {}/{} chunks downloaded", BlocksCompleted.load(), BlocksToDownload.size(), LooseChunksCompleted.load(), LooseChunksToDownload.size()); DownloadProgressBar.UpdateState({.Task = "Downloading", .Details = Details, .TotalCount = BlocksToDownload.size() + LooseChunksToDownload.size(), .RemainingCount = BlocksToDownload.size() + LooseChunksToDownload.size() - (BlocksCompleted.load() + LooseChunksCompleted.load()), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); DownloadProgressBar.Finish(); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "{} block{} downloaded, {} loose chunk{} downloaded in {}", BlocksToDownload.size(), BlocksToDownload.size() == 1 ? "" : "s", LooseChunksToDownload.size(), LooseChunksToDownload.size() == 1 ? "" : "s", NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); } } if (!ChunkedFileInfos.empty()) { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::AnalyzeDechunk, (uint32_t)TaskSteps::StepCount); std::filesystem::path ChunkedFilesPath = TempAttachmentPath / "chunkedfiles"; CreateDirectories(ChunkedFilesPath); try { std::unique_ptr ProgressBarPtr(m_LogOutput.CreateProgressBar("Dechunking")); OperationLogOutput::ProgressBar& DechunkingProgressBar(*ProgressBarPtr); std::atomic ChunksWritten; std::vector> OpenChunkedFiles; struct ChunkOpenFileTarget { size_t OpenChunkedFileIndex = 0; uint64_t Offset = 0; }; std::vector ChunkOpenFileTargets; std::vector> Targets; tsl::robin_map ChunkedFileTargetLookup; // Build up file and offset targets for each chunk for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++) { const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex]; const size_t OpenChunkedFileIndex = OpenChunkedFiles.size(); uint64_t Offset = 0; for (uint32_t ChunkIndex : ChunkedFileInfo.ChunkSequence) { const IoHash& ChunkHash = ChunkedFileInfo.ChunkHashes[ChunkIndex]; auto ChunkSizeIt = ChunkSizes.find(ChunkHash); if (ChunkSizeIt == ChunkSizes.end()) { throw std::runtime_error( fmt::format("Missing chunk {} to build chunked file {}", ChunkHash, ChunkedFileInfo.RawHash)); } const uint64_t ChunkSize = ChunkSizeIt->second; if (auto ChunkTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash); ChunkTargetLookupIt != ChunkedFileTargetLookup.end()) { size_t TargetIndex = ChunkTargetLookupIt->second; eastl::fixed_vector& Target = Targets[TargetIndex]; Target.push_back(ChunkOpenFileTargets.size()); } else { ChunkedFileTargetLookup.insert_or_assign(ChunkHash, Targets.size()); Targets.push_back({ChunkOpenFileTargets.size()}); } ChunkOpenFileTargets.push_back({.OpenChunkedFileIndex = OpenChunkedFileIndex, .Offset = Offset}); Offset += ChunkSize; } std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash); OpenChunkedFiles.emplace_back(std::make_unique(ChunkedFilePath, BasicFile::Mode::kTruncate)); PrepareFileForScatteredWrite(OpenChunkedFiles.back()->Handle(), ChunkedFileInfo.RawSize); } m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Dechunk, (uint32_t)TaskSteps::StepCount); std::vector> ChunkWrittenFlags(ChunkOpenFileTargets.size()); auto WriteChunk = [&](size_t ChunkedTargetsIndex, CompressedBuffer&& CompressedChunkBuffer) { const eastl::fixed_vector& Target = Targets[ChunkedTargetsIndex]; for (size_t TargetIndex : Target) { uint8_t Expected = 0; if (ChunkWrittenFlags[TargetIndex].compare_exchange_strong(Expected, true)) { const ChunkOpenFileTarget& ChunkTarget = ChunkOpenFileTargets[TargetIndex]; BasicFile& OutputFile = *OpenChunkedFiles[ChunkTarget.OpenChunkedFileIndex]; if (!CompressedChunkBuffer.DecompressToStream( 0u, ~uint64_t(0), [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) { ZEN_UNUSED(SourceOffset); ZEN_UNUSED(SourceSize); OutputFile.Write(Range, ChunkTarget.Offset + Offset); Offset += Range.GetSize(); return true; })) { std::error_code DummyEc; throw std::runtime_error(fmt::format("Failed to decompress chunk {} at offset {} to {}", CompressedChunkBuffer.DecodeRawHash(), ChunkTarget.Offset, PathFromHandle(OutputFile.Handle(), DummyEc))); } ChunksWritten++; } } }; { Stopwatch DechunkTimer; std::atomic PauseFlag; ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::vector LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end()); for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++) { Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex](std::atomic&) { const IoHash RawHash = LooseChunks[LooseChunkIndex]; if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(RawHash); ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end()) { std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash); IoBuffer ChunkBuffer = IoBufferBuilder::MakeFromFile(ChunkOutputPath); WriteChunk(ChunkedFileTargetLookupIt->second, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer))); } }); } std::vector Blocks(BlocksToDownload.begin(), BlocksToDownload.end()); for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++) { Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex](std::atomic&) { const IoHash& BlockRawHash = Blocks[BlockIndex]; std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", BlockRawHash); SharedBuffer BlockBuffer = CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress(); uint64_t HeaderSize = 0; if (IterateChunkBlock( SharedBuffer(BlockBuffer), [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) { if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash); ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end()) { WriteChunk(ChunkedFileTargetLookupIt->second, std::move(CompressedChunk)); } }, HeaderSize)) { } else { throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash)); } }); } Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, IsPaused, PendingWork); std::string Details = fmt::format("{}/{} chunks written", ChunksWritten.load(), ChunkOpenFileTargets.size()); DechunkingProgressBar.UpdateState( {.Task = "Dechunking ", .Details = Details, .TotalCount = ChunkOpenFileTargets.size(), .RemainingCount = ChunkOpenFileTargets.size() - ChunksWritten.load(), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); DechunkingProgressBar.Finish(); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "{} file{} dechunked in {}", ChunkedFileInfos.size(), ChunkedFileInfos.size() == 1 ? "" : "s", NiceTimeSpanMs(DechunkTimer.GetElapsedTimeMs())); } } } catch (const std::exception&) { for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++) { const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex]; std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash); RemoveFile(ChunkedFilePath); } throw; } { Stopwatch VerifyTimer; std::unique_ptr ProgressBarPtr(m_LogOutput.CreateProgressBar("Verifying")); OperationLogOutput::ProgressBar& VerifyProgressBar(*ProgressBarPtr); std::atomic PauseFlag; ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); std::atomic DechunkedFilesMoved; for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++) { Work.ScheduleWork(m_IOWorkerPool, [&, ChunkedFileIndex](std::atomic&) { const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex]; std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash); IoHash VerifyHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(ChunkedFilePath)); if (VerifyHash != ChunkedFileInfo.RawHash) { throw std::runtime_error( fmt::format("Dechunked file {} has mismatching hash {}", ChunkedFileInfo.RawHash, VerifyHash)); } std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkedFileInfo.RawHash); RenameFile(ChunkedFilePath, ChunkOutputPath); ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Moved dechunked file {} to '{}'", ChunkedFileInfo.RawHash, ChunkOutputPath); DechunkedFilesMoved++; }); } Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(IsAborted, IsPaused, PendingWork); std::string Details = fmt::format("{}/{} files verified", DechunkedFilesMoved.load(), ChunkedFileInfos.size()); VerifyProgressBar.UpdateState({.Task = "Verifying ", .Details = Details, .TotalCount = ChunkedFileInfos.size(), .RemainingCount = ChunkedFileInfos.size() - DechunkedFilesMoved.load(), .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); VerifyProgressBar.Finish(); if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Verified {} chunked file{} in {}", ChunkedFileInfos.size(), ChunkedFileInfos.size() == 1 ? "" : "s", NiceTimeSpanMs(VerifyTimer.GetElapsedTimeMs())); } } } if (!m_Options.IsQuiet) { ZEN_OPERATION_LOG_INFO(m_LogOutput, "Downloaded {} attachment{} to '{}' in {}", m_AttachmentHashes.size(), m_AttachmentHashes.size() == 1 ? "" : "s", m_Options.AttachmentOutputPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); } } // namespace zen