diff options
| author | Dan Engelbrecht <[email protected]> | 2025-11-07 12:27:44 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-11-07 12:27:44 +0100 |
| commit | 72b1797e2b65ad47f4dc8e9fab73b9aa170889b4 (patch) | |
| tree | 93c9f7c99965393ba9c6ec86c01b63899bfba684 /src | |
| parent | move progress bar to separate file (#638) (diff) | |
| download | zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.tar.xz zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.zip | |
get oplog attachments (#622)
* add support for downloading individual attachments from an oplog
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 293 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 319 | ||||
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.h | 16 | ||||
| -rw-r--r-- | src/zen/threadworkers.cpp | 79 | ||||
| -rw-r--r-- | src/zen/threadworkers.h | 43 | ||||
| -rw-r--r-- | src/zencore/basicfile.cpp | 24 | ||||
| -rw-r--r-- | src/zencore/include/zencore/basicfile.h | 2 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h | 106 | ||||
| -rw-r--r-- | src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h | 2 | ||||
| -rw-r--r-- | src/zenremotestore/projectstore/projectstoreoperations.cpp | 917 | ||||
| -rw-r--r-- | src/zenremotestore/projectstore/remoteprojectstore.cpp | 47 |
11 files changed, 1558 insertions, 290 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index 3a334ed15..4c427fc7f 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -41,6 +41,7 @@ #include <zenutil/zenserverprocess.h> #include "../progressbar.h" +#include "../threadworkers.h" #include <signal.h> #include <memory> @@ -267,95 +268,6 @@ namespace { static ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty; static bool AllowFileClone = true; - static std::unique_ptr<WorkerThreadPool> NetworkPoolInstance; - static std::unique_ptr<WorkerThreadPool> IOPoolInstance; - - WorkerThreadPool* NetworkPool = nullptr; - WorkerThreadPool* IOPool = nullptr; - - void InitializeWorkerPools(bool BoostWorkers) - { - if (SingleThreaded) - { - NetworkPool = &GetSyncWorkerPool(); - IOPool = &GetSyncWorkerPool(); - ZEN_CONSOLE("Not using thread workers for network or I/O, expect no progress reporting"); - } - else - { - unsigned int Cores = GetHardwareConcurrency(); - if (BoostWorkers) - { - // Cores Net Disk - // 2 4 3 - // 4 6 3 - // 8 11 6 - // 16 16 12 - // 32 16 24 - // 64 16 32 - // n 16 32 - - unsigned int NetworkThreadCount = Cores + (Cores / 4u) + Max(1u, (Cores / 8u)); - NetworkThreadCount = Min(NetworkThreadCount, 16u); - NetworkThreadCount = Max(NetworkThreadCount, 4u); - - unsigned int IOThreadCount = Cores - (Cores / 4u); - IOThreadCount = Min(IOThreadCount, 32u); - IOThreadCount = Max(IOThreadCount, 3u); - - NetworkPoolInstance = std::make_unique<WorkerThreadPool>(NetworkThreadCount, "net"); - IOPoolInstance = std::make_unique<WorkerThreadPool>(IOThreadCount, "disk"); - - if (!IsQuiet) - { - ZEN_CONSOLE("Worker boost enabled, using {} network threads and {} worker/IO threads", - NetworkThreadCount, - IOThreadCount); - } - } - else - { - // Cores Net Disk - // 2 2 2 - // 4 3 2 - // 8 5 4 - // 16 10 9 - // 32 12 18 - // 64 12 20 - // n 12 20 - - unsigned int NetworkThreadCount = (Cores / 2u) + Max(1u, (Cores / 8u)); - NetworkThreadCount = Min(NetworkThreadCount, 12u); - NetworkThreadCount = Max(NetworkThreadCount, 2u); - - unsigned int IOThreadCount = Cores / 2u + Cores / 16u; - IOThreadCount = Min(IOThreadCount, 20u); - IOThreadCount = Max(IOThreadCount, 2u); - - NetworkPoolInstance = std::make_unique<WorkerThreadPool>(NetworkThreadCount, "net"); - IOPoolInstance = std::make_unique<WorkerThreadPool>(IOThreadCount, "disk"); - - if (!IsQuiet) - { - ZEN_CONSOLE("Using {} network threads and {} worker/IO threads", NetworkThreadCount, IOThreadCount); - } - } - NetworkPool = NetworkPoolInstance.get(); - IOPool = IOPoolInstance.get(); - } - } - - WorkerThreadPool& GetIOWorkerPool() - { - ZEN_ASSERT(IOPool); - return *IOPool; - } - WorkerThreadPool& GetNetworkPool() - { - ZEN_ASSERT(NetworkPool); - return *NetworkPool; - } - #define ZEN_CONSOLE_VERBOSE(fmtstr, ...) \ if (IsVerbose) \ { \ @@ -496,12 +408,16 @@ namespace { return Count * 1000000 / ElapsedWallTimeUS; } - bool CleanAndRemoveDirectory(const std::filesystem::path& Directory) + bool CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool, const std::filesystem::path& Directory) { - return CleanAndRemoveDirectory(GetIOWorkerPool(), AbortFlag, PauseFlag, Directory); + return CleanAndRemoveDirectory(WorkerPool, AbortFlag, PauseFlag, Directory); } - void ValidateBuildPart(BuildStorageBase& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName) + void ValidateBuildPart(ThreadWorkers& Workers, + BuildStorageBase& Storage, + const Oid& BuildId, + Oid BuildPartId, + const std::string_view BuildPartName) { ZEN_TRACE_CPU("ValidateBuildPart"); @@ -513,8 +429,8 @@ namespace { Storage, AbortFlag, PauseFlag, - GetIOWorkerPool(), - GetNetworkPool(), + Workers.GetIOWorkerPool(), + Workers.GetNetworkPool(), BuildId, BuildPartId, BuildPartName, @@ -532,7 +448,8 @@ namespace { NiceTimeSpanMs(ValidateOp.m_ValidateStats.ElapsedWallTimeUS / 1000)); } - void UploadFolder(StorageInstance& Storage, + void UploadFolder(ThreadWorkers& Workers, + StorageInstance& Storage, const Oid& BuildId, const Oid& BuildPartId, const std::string_view BuildPartName, @@ -557,8 +474,8 @@ namespace { Storage, AbortFlag, PauseFlag, - GetIOWorkerPool(), - GetNetworkPool(), + Workers.GetIOWorkerPool(), + Workers.GetNetworkPool(), BuildId, BuildPartId, BuildPartName, @@ -781,7 +698,8 @@ namespace { uint64_t VerifyElapsedWallTimeUs = 0; }; - void VerifyFolder(const ChunkedFolderContent& Content, + void VerifyFolder(ThreadWorkers& Workers, + const ChunkedFolderContent& Content, const ChunkedContentLookup& Lookup, const std::filesystem::path& Path, bool VerifyFileHash, @@ -793,7 +711,7 @@ namespace { ProgressBar ProgressBar(ProgressMode, "Verify Files"); - WorkerThreadPool& VerifyPool = GetIOWorkerPool(); + WorkerThreadPool& VerifyPool = Workers.GetIOWorkerPool(); ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog); @@ -973,7 +891,8 @@ namespace { } } - FolderContent GetValidFolderContent(GetFolderContentStatistics& LocalFolderScanStats, + FolderContent GetValidFolderContent(ThreadWorkers& Workers, + GetFolderContentStatistics& LocalFolderScanStats, const std::filesystem::path& Path, std::span<const std::filesystem::path> PathsToCheck, std::function<void(uint64_t PathCount, uint64_t CompletedPathCount)>&& ProgressCallback) @@ -998,7 +917,7 @@ namespace { while (PathIndex < PathCount) { uint32_t PathRangeCount = Min(128u, PathCount - PathIndex); - Work.ScheduleWork(GetIOWorkerPool(), + Work.ScheduleWork(Workers.GetIOWorkerPool(), [PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats]( std::atomic<bool>&) { for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount; @@ -1448,7 +1367,8 @@ namespace { return RemoteContent; } - ChunkedFolderContent GetLocalContent(GetFolderContentStatistics& LocalFolderScanStats, + ChunkedFolderContent GetLocalContent(ThreadWorkers& Workers, + GetFolderContentStatistics& LocalFolderScanStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, const std::filesystem::path& StateFilePath, @@ -1469,7 +1389,7 @@ namespace { try { ReadStateFile(StateFilePath, LocalFolderState, LocalContent); - if (IsQuiet) + if (!IsQuiet) { ZEN_CONSOLE("Read local state file {} in {}", StateFilePath, NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs())); } @@ -1514,7 +1434,8 @@ namespace { { ProgressBar ProgressBar(ProgressMode, "Check Files"); OutLocalFolderContent = - GetValidFolderContent(LocalFolderScanStats, + GetValidFolderContent(Workers, + LocalFolderScanStats, Path, PathsToCheck, [&ProgressBar, &LocalFolderScanStats](uint64_t PathCount, uint64_t CompletedPathCount) { @@ -1571,7 +1492,7 @@ namespace { ChunkingStatistics LocalChunkingStats; ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( LocalChunkingStats, - GetIOWorkerPool(), + Workers.GetIOWorkerPool(), Path, UpdatedContent, ChunkController, @@ -1675,7 +1596,7 @@ namespace { ChunkingStatistics LocalChunkingStats; ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent( LocalChunkingStats, - GetIOWorkerPool(), + Workers.GetIOWorkerPool(), Path, UpdatedContent, ChunkController, @@ -1725,7 +1646,7 @@ namespace { ChunkingStatistics LocalChunkingStats; LocalContent = ChunkFolderContent( LocalChunkingStats, - GetIOWorkerPool(), + Workers.GetIOWorkerPool(), Path, OutLocalFolderContent, ChunkController, @@ -1761,6 +1682,7 @@ namespace { } ChunkedFolderContent ScanAndChunkFolder( + ThreadWorkers& Workers, GetFolderContentStatistics& GetFolderContentStats, ChunkingStatistics& ChunkingStats, const std::filesystem::path& Path, @@ -1777,7 +1699,7 @@ namespace { Path, std::move(IsAcceptedFolder), std::move(IsAcceptedFile), - GetIOWorkerPool(), + Workers.GetIOWorkerPool(), GetUpdateDelayMS(ProgressMode), [](bool, std::ptrdiff_t) {}, AbortFlag); @@ -1787,7 +1709,8 @@ namespace { } FolderContent _; - ChunkedFolderContent Result = GetLocalContent(GetFolderContentStats, + ChunkedFolderContent Result = GetLocalContent(Workers, + GetFolderContentStats, ChunkingStats, Path, ZenStateFilePath(Path / ZenFolderName), @@ -1830,7 +1753,8 @@ namespace { std::vector<std::string> ExcludeWildcards; }; - void DownloadFolder(StorageInstance& Storage, + void DownloadFolder(ThreadWorkers& Workers, + StorageInstance& Storage, const Oid& BuildId, const std::vector<Oid>& BuildPartIds, std::span<const std::string> BuildPartNames, @@ -1903,7 +1827,8 @@ namespace { ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{}); } - LocalContent = GetLocalContent(LocalFolderScanStats, + LocalContent = GetLocalContent(Workers, + LocalFolderScanStats, ChunkingStats, Path, ZenStateFilePath(Options.ZenFolderPath), @@ -1969,8 +1894,8 @@ namespace { Storage, AbortFlag, PauseFlag, - GetIOWorkerPool(), - GetNetworkPool(), + Workers.GetIOWorkerPool(), + Workers.GetNetworkPool(), BuildId, Path, LocalContent, @@ -2010,7 +1935,7 @@ namespace { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Verify, TaskSteps::StepCount); - VerifyFolder(RemoteContent, RemoteLookup, Path, Options.PostDownloadVerify, VerifyFolderStats); + VerifyFolder(Workers, RemoteContent, RemoteLookup, Path, Options.PostDownloadVerify, VerifyFolderStats); Stopwatch WriteStateTimer; CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState, Path); @@ -2094,7 +2019,7 @@ namespace { ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Cleanup, TaskSteps::StepCount); - CleanAndRemoveDirectory(ZenTempFolder); + CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), ZenTempFolder); } void ListBuild(StorageInstance& Storage, @@ -2178,7 +2103,10 @@ namespace { } } - void DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked) + void DiffFolders(ThreadWorkers& Workers, + const std::filesystem::path& BasePath, + const std::filesystem::path& ComparePath, + bool OnlyChunked) { ZEN_TRACE_CPU("DiffFolders"); @@ -2246,7 +2174,8 @@ namespace { GetFolderContentStatistics BaseGetFolderContentStats; ChunkingStatistics BaseChunkingStats; - BaseFolderContent = ScanAndChunkFolder(BaseGetFolderContentStats, + BaseFolderContent = ScanAndChunkFolder(Workers, + BaseGetFolderContentStats, BaseChunkingStats, BasePath, IsAcceptedFolder, @@ -2261,7 +2190,8 @@ namespace { GetFolderContentStatistics CompareGetFolderContentStats; ChunkingStatistics CompareChunkingStats; - CompareFolderContent = ScanAndChunkFolder(CompareGetFolderContentStats, + CompareFolderContent = ScanAndChunkFolder(Workers, + CompareGetFolderContentStats, CompareChunkingStats, ComparePath, IsAcceptedFolder, @@ -3391,7 +3321,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ? MakeSafeAbsolutePath(std::filesystem::current_path()) / ZenFolderName : MakeSafeAbsolutePath(m_ZenFolderPath); CreateDirectories(ZenFolderPath); - auto _ = MakeGuard([ZenFolderPath]() { CleanAndRemoveDirectory(ZenFolderPath); }); + auto _ = MakeGuard([ZenFolderPath]() { CleanAndRemoveDirectory(GetSmallWorkerPool(EWorkloadType::Burst), ZenFolderPath); }); StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, @@ -3476,7 +3406,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); CreateDirectories(m_ZenFolderPath); - auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); }); + auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(GetSmallWorkerPool(EWorkloadType::Burst), m_ZenFolderPath); }); StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, @@ -3537,7 +3467,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); CreateDirectories(m_ZenFolderPath); - auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); }); + auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(GetSmallWorkerPool(EWorkloadType::Burst), m_ZenFolderPath); }); StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, @@ -3588,7 +3518,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - InitializeWorkerPools(m_BoostWorkerThreads); + ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + if (!IsQuiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } ZenState InstanceState; @@ -3604,7 +3538,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); CreateDirectories(m_ZenFolderPath); - auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); }); + auto _ = MakeGuard([this, &Workers]() { CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_ZenFolderPath); }); StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, @@ -3633,7 +3567,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path TempDir = ZenTempFolderPath(m_ZenFolderPath); - UploadFolder(Storage, + UploadFolder(Workers, + Storage, BuildId, BuildPartId, m_BuildPartName, @@ -3651,7 +3586,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { if (m_PostUploadVerify) { - ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); + ValidateBuildPart(Workers, *Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); } } @@ -3705,7 +3640,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - InitializeWorkerPools(m_BoostWorkerThreads); + ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + if (!IsQuiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } ZenState InstanceState; @@ -3758,7 +3697,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) EPartialBlockRequestMode PartialBlockRequestMode = ParseAllowPartialBlockRequests(); - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId, BuildPartIds, BuildPartNames, @@ -3830,12 +3770,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - InitializeWorkerPools(m_BoostWorkerThreads); + ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + if (!IsQuiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } ParsePath(); ParseDiffPath(); - DiffFolders(m_Path, m_DiffPath, m_OnlyChunked); + DiffFolders(Workers, m_Path, m_DiffPath, m_OnlyChunked); if (AbortFlag) { throw std::runtime_error("Diff folders aborted"); @@ -3849,7 +3793,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - InitializeWorkerPools(m_BoostWorkerThreads); + ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + if (!IsQuiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } BuildStorageBase::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; @@ -3861,7 +3809,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); CreateDirectories(m_ZenFolderPath); - auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); }); + auto _ = MakeGuard([this, &Workers]() { CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_ZenFolderPath); }); StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, @@ -3895,7 +3843,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) Storage, AbortFlag, PauseFlag, - GetNetworkPool(), + Workers.GetNetworkPool(), BuildId, AllBuildPartIds, BuildsOperationPrimeCache::Options{.IsQuiet = IsQuiet, @@ -3928,7 +3876,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - InitializeWorkerPools(m_BoostWorkerThreads); + ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + if (!IsQuiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } BuildStorageBase::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; @@ -3940,7 +3892,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); CreateDirectories(m_ZenFolderPath); - auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); }); + auto _ = MakeGuard([this, &Workers]() { CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_ZenFolderPath); }); StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, @@ -3977,7 +3929,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) LogExecutableVersionAndPid(); } - InitializeWorkerPools(m_BoostWorkerThreads); + ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + if (!IsQuiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } ZenState InstanceState; @@ -3991,7 +3947,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); CreateDirectories(m_ZenFolderPath); - auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); }); + auto _ = MakeGuard([this, &Workers]() { CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_ZenFolderPath); }); StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, @@ -4011,7 +3967,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const Oid BuildPartId = m_BuildPartName.empty() ? Oid::Zero : ParseBuildPartId(); - ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); + ValidateBuildPart(Workers, *Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); if (AbortFlag) { @@ -4021,7 +3977,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_MultiTestDownloadOptions) { - InitializeWorkerPools(m_BoostWorkerThreads); + ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + if (!IsQuiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } m_SystemRootDir = (GetRunningExecutablePath().parent_path() / ".tmpzensystem").make_preferred(); CreateDirectories(m_SystemRootDir); @@ -4056,7 +4016,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { throw OptionParseException(fmt::format("'--build-id' ('{}') is malformed", BuildIdString), SubOption->help()); } - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId, {}, {}, @@ -4128,7 +4089,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_TestOptions) { - InitializeWorkerPools(m_BoostWorkerThreads); + ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded); + if (!IsQuiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } m_SystemRootDir = (GetRunningExecutablePath().parent_path() / ".tmpzensystem").make_preferred(); CreateDirectories(m_SystemRootDir); @@ -4140,7 +4105,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (m_OverrideHost.empty() && m_StoragePath.empty()) { m_StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred(); - CleanAndRemoveDirectory(m_StoragePath); + CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_StoragePath); CreateDirectories(m_StoragePath); m_StoragePath = m_StoragePath.generic_string(); } @@ -4161,10 +4126,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path DownloadPath2 = m_Path.parent_path() / (m_BuildPartName + "_test2"); const std::filesystem::path DownloadPath3 = m_Path.parent_path() / (m_BuildPartName + "_test3"); - auto ___ = MakeGuard([DownloadPath, DownloadPath2, DownloadPath3]() { - CleanAndRemoveDirectory(DownloadPath); - CleanAndRemoveDirectory(DownloadPath2); - CleanAndRemoveDirectory(DownloadPath3); + auto ___ = MakeGuard([&Workers, DownloadPath, DownloadPath2, DownloadPath3]() { + CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), DownloadPath); + CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), DownloadPath2); + CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), DownloadPath3); }); if (m_ZenFolderPath.empty()) @@ -4212,7 +4177,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) const std::filesystem::path UploadTempDir = UploadTempDirectory(m_Path); // std::filesystem::path UploadTempDir = m_ZenFolderPath / "upload_tmp"; - UploadFolder(Storage, + UploadFolder(Workers, + Storage, BuildId, BuildPartId, m_BuildPartName, @@ -4230,10 +4196,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error("Test aborted. (Upload build)"); } - ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); + ValidateBuildPart(Workers, *Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId, {BuildPartId}, {}, @@ -4254,7 +4221,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (identical target)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId, {BuildPartId}, {}, @@ -4274,7 +4242,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error("Test aborted. (Re-download identical target)"); } - auto ScrambleDir = [](const std::filesystem::path& Path) { + auto ScrambleDir = [&Workers](const std::filesystem::path& Path) { ZEN_CONSOLE("\nScrambling '{}'", Path); Stopwatch Timer; DirectoryContent DownloadContent; @@ -4318,7 +4286,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SourceSize > 256) { Work.ScheduleWork( - GetIOWorkerPool(), + Workers.GetIOWorkerPool(), [SourceSize, FilePath = std::filesystem::path(FilePath)](std::atomic<bool>&) { if (!AbortFlag) { @@ -4367,7 +4335,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ScrambleDir(DownloadPath); ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled target)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId, {BuildPartId}, {}, @@ -4399,7 +4368,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView()); } - UploadFolder(Storage, + UploadFolder(Workers, + Storage, BuildId2, BuildPartId2, m_BuildPartName, @@ -4417,10 +4387,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) throw std::runtime_error("Test aborted. (Upload scrambled)"); } - ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); + ValidateBuildPart(Workers, *Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName); ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId, {BuildPartId}, {}, @@ -4441,7 +4412,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId2, {BuildPartId2}, {}, @@ -4461,7 +4433,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath); - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId2, {BuildPartId2}, {}, @@ -4481,7 +4454,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath2); - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId, {BuildPartId}, {}, @@ -4501,7 +4475,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath3); - DownloadFolder(Storage, + DownloadFolder(Workers, + Storage, BuildId, {BuildPartId}, {}, diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 6df317823..fe8c0d675 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -9,6 +9,7 @@ #include <zencore/filesystem.h> #include <zencore/fmtutils.h> #include <zencore/logging.h> +#include <zencore/parallelwork.h> #include <zencore/process.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> @@ -19,11 +20,17 @@ #include <zenhttp/httpclient.h> #include <zenhttp/httpclientauth.h> #include <zenhttp/httpcommon.h> +#include <zenremotestore/builds/buildstoragecache.h> #include <zenremotestore/builds/buildstorageutil.h> #include <zenremotestore/builds/jupiterbuildstorage.h> #include <zenremotestore/jupiter/jupiterhost.h> +#include <zenremotestore/operationlogoutput.h> +#include <zenremotestore/projectstore/projectstoreoperations.h> +#include <zenremotestore/projectstore/remoteprojectstore.h> +#include <zenutil/workerpools.h> #include "../progressbar.h" +#include "../threadworkers.h" ZEN_THIRD_PARTY_INCLUDES_START #include <json11.hpp> @@ -2212,9 +2219,6 @@ OplogDownloadCommand::OplogDownloadCommand() m_Options.add_option("", "", "system-dir", "Specify system root", cxxopts::value(m_SystemRootDir), "<systemdir>"); - m_Options.add_option("output", "", "quiet", "Suppress non-essential output", cxxopts::value(m_Quiet), "<quiet>"); - m_Options.add_option("", "y", "yes", "Don't query for confirmation", cxxopts::value(m_Yes), "<yes>"); - auto AddCloudOptions = [this](cxxopts::Options& Ops) { m_AuthOptions.AddOptions(Ops); @@ -2232,16 +2236,58 @@ OplogDownloadCommand::OplogDownloadCommand() Ops.add_option("cloud build", "", "bucket", "Builds Storage bucket", cxxopts::value(m_Bucket), "<bucket>"); }; + auto AddCacheOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>"); + }; + AddCloudOptions(m_Options); + AddCacheOptions(m_Options); + + auto AddOutputOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("", "y", "yes", "Don't query for confirmation", cxxopts::value(m_Yes), "<yes>"); + Ops.add_option("output", + "", + "plain-progress", + "Show progress using plain output", + cxxopts::value(m_PlainProgress), + "<plainprogress>"); + Ops.add_option("output", + "", + "log-progress", + "Write @progress style progress to output", + cxxopts::value(m_LogProgress), + "<logprogress>"); + Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>"); + Ops.add_option("output", "", "quiet", "Suppress non-essential output", cxxopts::value(m_Quiet), "<quiet>"); + }; + AddOutputOptions(m_Options); m_Options.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>"); + + m_Options.add_option("", "", "force", "Force download and disregard local cache", cxxopts::value(m_ForceDownload), "<force>"); + m_Options.add_option("", + "", + "boost-workers", + "Increase the number of worker threads - may cause computer to be less responsive", + cxxopts::value(m_BoostWorkerThreads), + "<boostworkers>"); + m_Options.add_option("", "", "decompress", "Decompress downloaded attachment", cxxopts::value(m_DecompressAttachments), "<decompress>"); + m_Options.add_option("", "", "output-path", "Path to oplog output, extension .json or .cb (compact binary). Default is output to console", - cxxopts::value(m_OutputPath), + cxxopts::value(m_OplogOutputPath), "<path>"); + m_Options.add_option("", + "", + "attachments", + "Comma separated list of attachments in RawHash for to download", + cxxopts::value(m_Attachments), + "<attachments>"); + m_Options.add_option("", "", "attachments-path", "Path to folder to write attachments to", cxxopts::value(m_AttachmentsPath), "<path>"); + m_Options.parse_positional({"cloud-url", "output-path"}); m_Options.positional_help("[<cloud-url> <output-path>]"); } @@ -2274,6 +2320,49 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a }; ParseSystemOptions(); + ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty; + + auto ParseOutputOptions = [&]() { + if (m_Verbose && m_Quiet) + { + throw OptionParseException("'--verbose' conflicts with '--quiet'", m_Options.help()); + } + if (m_LogProgress && m_PlainProgress) + { + throw OptionParseException("'--plain-progress' conflicts with '--log-progress'", m_Options.help()); + } + if (m_LogProgress && m_Quiet) + { + throw OptionParseException("'--quiet' conflicts with '--log-progress'", m_Options.help()); + } + if (m_PlainProgress && m_Quiet) + { + throw OptionParseException("'--quiet' conflicts with '--plain-progress'", m_Options.help()); + } + + if (m_LogProgress) + { + ProgressMode = ProgressBar::Mode::Log; + } + else if (m_PlainProgress) + { + ProgressMode = ProgressBar::Mode::Plain; + } + else if (m_Verbose) + { + ProgressMode = ProgressBar::Mode::Plain; + } + else if (m_Quiet) + { + ProgressMode = ProgressBar::Mode::Quiet; + } + else + { + ProgressMode = ProgressBar::Mode::Pretty; + } + }; + ParseOutputOptions(); + auto ParseStorageOptions = [&](bool RequireNamespace, bool RequireBucket) { if (!m_Url.empty()) { @@ -2316,13 +2405,25 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } }; + ParseStorageOptions(/*RequireNamespace*/ true, /*RequireBucket*/ true); + + ThreadWorkers Workers(m_BoostWorkerThreads, /*SingleThreaded*/ false); + if (!m_Quiet) + { + ZEN_CONSOLE("{}", Workers.GetWorkersInfo()); + } + std::unique_ptr<AuthMgr> Auth; HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient", .AssumeHttp2 = m_AssumeHttp2, .AllowResume = true, .RetryCount = 2}; - ParseStorageOptions(/*RequireNamespace*/ true, /*RequireBucket*/ true); + Oid BuildId = Oid::TryFromHexString(m_BuildId); + if (BuildId == Oid::Zero) + { + throw OptionParseException(fmt::format("'--build-id' ('{}') is malformed", m_BuildId), m_Options.help()); + } m_AuthOptions.ParseOptions(m_Options, m_SystemRootDir, @@ -2332,138 +2433,141 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a m_Quiet, /*Hidden*/ false); - BuildStorageResolveResult ResolveRes = ResolveBuildStorage(ClientSettings, m_Host, m_OverrideHost, ""sv, ZenCacheResolveMode::Off); + BuildStorageResolveResult ResolveRes = + ResolveBuildStorage(ClientSettings, m_Host, m_OverrideHost, m_ZenCacheHost, ZenCacheResolveMode::Discovery); -#if 0 - std::string BuildStorageName = ZEN_CLOUD_STORAGE; + BuildStorageBase::Statistics StorageStats; - std::string CloudHost; + StorageInstance Storage; - if (m_OverrideHost.empty()) - { - JupiterServerDiscovery Response = DiscoverJupiterEndpoints(m_Host, ClientSettings); + Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings); - if (Response.ServerEndPoints.empty()) - { - throw std::runtime_error(fmt::format("Failed to find any builds hosts at {}", m_Host)); - } - for (const JupiterServerDiscovery::EndPoint& ServerEndpoint : Response.ServerEndPoints) - { - if (!ServerEndpoint.BaseUrl.empty()) - { - if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(ServerEndpoint.BaseUrl, ServerEndpoint.AssumeHttp2); - TestResult.Success) - { - CloudHost = ServerEndpoint.BaseUrl; - m_AssumeHttp2 = ServerEndpoint.AssumeHttp2; - BuildStorageName = ServerEndpoint.Name; - break; - } - else - { - ZEN_DEBUG("Unable to reach host {}. Reason: {}", ServerEndpoint.BaseUrl, TestResult.FailureReason); - } - } - } - if (CloudHost.empty()) - { - throw std::runtime_error( - fmt::format("Failed to find any usable builds hosts out of {} using {}", Response.ServerEndPoints.size(), m_Host)); - } - } - else if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(m_OverrideHost, m_AssumeHttp2); TestResult.Success) - { - CloudHost = m_OverrideHost; - } - else - { - throw std::runtime_error(fmt::format("Host {} could not be reached. Reason: {}", m_OverrideHost, TestResult.FailureReason)); - } -#endif // 0 + BuildStorageCache::Statistics StorageCacheStats; - Oid BuildId = Oid::TryFromHexString(m_BuildId); - if (BuildId == Oid::Zero) + std::atomic<bool> AbortFlag(false); + + if (!ResolveRes.CacheUrl.empty()) { - throw OptionParseException("'--build-id' is malformed", m_Options.help()); + Storage.CacheHttp = std::make_unique<HttpClient>(ResolveRes.CacheUrl, + HttpClientSettings{.LogCategory = "httpcacheclient", + .ConnectTimeout = std::chrono::milliseconds{3000}, + .Timeout = std::chrono::milliseconds{30000}, + .AssumeHttp2 = ResolveRes.CacheAssumeHttp2, + .AllowResume = true, + .RetryCount = 0}, + [&AbortFlag]() { return AbortFlag.load(); }); + Storage.CacheName = ResolveRes.CacheName; } - BuildStorageBase::Statistics StorageStats; - HttpClient BuildStorageHttp(ResolveRes.HostUrl, ClientSettings); - if (!m_Quiet) { std::string StorageDescription = - fmt::format("Cloud {}{}. Namespace '{}', Bucket '{}'", + fmt::format("Cloud {}{}. SessionId {}. Namespace '{}', Bucket '{}'", ResolveRes.HostName, (ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl), + Storage.BuildStorageHttp->GetSessionId(), m_Namespace, m_Bucket); ZEN_CONSOLE("Remote: {}", StorageDescription); - } - - std::filesystem::path StorageTempPath = std::filesystem::temp_directory_path() / ("zen_" + Oid::NewOid().ToString()); - std::unique_ptr<BuildStorageBase> BuildStorage = - CreateJupiterBuildStorage(Log(), BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath); + if (Storage.CacheHttp) + { + std::string CacheDescription = + fmt::format("Zen {}{}. SessionId {}. Namespace '{}', Bucket '{}'", + ResolveRes.CacheName, + (ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl), + Storage.CacheHttp->GetSessionId(), + m_Namespace, + m_Bucket); - Stopwatch Timer; - CbObject BuildObject = BuildStorage->GetBuild(BuildId); - if (!m_Quiet) - { - ZEN_CONSOLE("Fetched {}/{}/{}/{} in {}", m_Url, m_Namespace, m_Bucket, BuildId, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + ZEN_CONSOLE("Cache : {}", CacheDescription); + } } - Timer.Reset(); + std::string FullBuildKey = fmt::format("{}_{}_{}", m_Namespace, m_Bucket, m_BuildId); + IoHash FullBuildKeyHash = IoHash::HashBuffer(FullBuildKey.data(), FullBuildKey.length()); - CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); - if (!PartsObject) - { - throw std::runtime_error( - fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, m_Url, m_Namespace, m_Bucket, m_BuildId)); - } + std::filesystem::path StorageTempPath = std::filesystem::temp_directory_path() / ("zen_" + FullBuildKeyHash.ToHexString()); - static const std::string_view OplogContainerPartName = "oplogcontainer"sv; + Storage.BuildStorage = + CreateJupiterBuildStorage(Log(), *Storage.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath); + Storage.StorageName = ResolveRes.HostName; - const Oid OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); - if (OplogBuildPartId == Oid::Zero) + if (Storage.CacheHttp) { - throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv, - m_Url, - m_Namespace, - m_Bucket, - m_BuildId, - OplogContainerPartName)); + Storage.BuildCacheStorage = CreateZenBuildStorageCache( + *Storage.CacheHttp, + StorageCacheStats, + m_Namespace, + m_Bucket, + StorageTempPath / "zencache", + false ? GetSmallWorkerPool(EWorkloadType::Background) : GetTinyWorkerPool(EWorkloadType::Background)); } - CbObject ContainerObject = BuildStorage->GetBuildPart(BuildId, OplogBuildPartId); + std::unique_ptr<OperationLogOutput> OperationLogOutput(CreateConsoleLogOutput(ProgressMode)); - MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); - IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); - IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); + ProjectStoreOperationOplogState State( + *OperationLogOutput, + Storage, + BuildId, + {.IsQuiet = m_Quiet, .IsVerbose = m_Verbose, .ForceDownload = m_ForceDownload, .TempFolderPath = StorageTempPath}); - CbValidateError ValidateResult = CbValidateError::None; - if (CbObject SectionObject = ValidateAndReadCompactBinaryObject(std::move(SectionPayload), ValidateResult); - ValidateResult == CbValidateError::None && ContainerObject) + const Oid OplogBuildPartId = State.GetBuildPartId(); + + if (!m_Attachments.empty()) { - if (!m_Quiet) + if (m_AttachmentsPath.empty()) { - ZEN_CONSOLE("Decompressed and validated oplog payload {} -> {} in {}", - NiceBytes(OpsSection.GetSize()), - NiceBytes(SectionObject.GetSize()), - NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + throw OptionParseException("'--attachments-path' is required when '--attachments' is given", m_Options.help()); } - if (m_OutputPath.empty()) + std::filesystem::path AttachmentsPath = MakeSafeAbsolutePath(m_AttachmentsPath); + CreateDirectories(AttachmentsPath); + std::vector<IoHash> AttachmentHashes; + AttachmentHashes.reserve(m_Attachments.size()); + for (const std::string& Attachment : m_Attachments) + { + IoHash RawHash; + if (!IoHash::TryParse(Attachment, RawHash)) + { + throw OptionParseException(fmt::format("'--attachments' ('{}') is malformed", Attachment), m_Options.help()); + } + AttachmentHashes.push_back(RawHash); + } + + std::atomic<bool> PauseFlag; + ProjectStoreOperationDownloadAttachments Op(*OperationLogOutput, + Storage, + AbortFlag, + PauseFlag, + Workers.GetIOWorkerPool(), + Workers.GetNetworkPool(), + State, + AttachmentHashes, + {.IsQuiet = m_Quiet, + .IsVerbose = m_Verbose, + .ForceDownload = m_ForceDownload, + .DecompressAttachments = m_DecompressAttachments, + .TempFolderPath = StorageTempPath, + .AttachmentOutputPath = m_AttachmentsPath}); + + Op.Execute(); + } + else + { + CbObjectView OpsSectionObject = State.LoadOpsSectionObject(); + + if (m_OplogOutputPath.empty()) { if (!m_Yes) { - if (OpsSection.GetSize() > 8u * 1024u * 1024u) + if (OpsSectionObject.GetSize() > 8u * 1024u * 1024u) { while (!m_Yes) { const std::string Prompt = fmt::format("Do you want to output an oplog of size {} to console? (yes/no) ", - NiceBytes(SectionObject.GetSize())); + NiceBytes(OpsSectionObject.GetSize())); printf("%s", Prompt.c_str()); std::string Reponse; std::getline(std::cin, Reponse); @@ -2480,7 +2584,7 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } } ExtendableStringBuilder<1024> SB; - SectionObject.ToJson(SB); + OpsSectionObject.ToJson(SB); ForEachStrTok(SB.ToView(), '\n', [](std::string_view Row) { ZEN_CONSOLE("{}", Row); return true; @@ -2488,17 +2592,17 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a } else { - Timer.Reset(); - const std::string Extension = ToLower(m_OutputPath.extension().string()); + Stopwatch Timer; + const std::string Extension = ToLower(m_OplogOutputPath.extension().string()); if (Extension == ".cb" || Extension == ".cbo") { - WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, SectionObject.GetView().GetData(), SectionObject.GetSize())); + WriteFile(m_OplogOutputPath, IoBuffer(IoBuffer::Wrap, OpsSectionObject.GetView().GetData(), OpsSectionObject.GetSize())); } else if (Extension == ".json") { ExtendableStringBuilder<1024> SB; - SectionObject.ToJson(SB); - WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); + OpsSectionObject.ToJson(SB); + WriteFile(m_OplogOutputPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size())); } else { @@ -2507,17 +2611,12 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a if (!m_Quiet) { ZEN_CONSOLE("Wrote {} to '{}' in {}", - NiceBytes(FileSizeFromPath(m_OutputPath)), - m_OutputPath, + NiceBytes(FileSizeFromPath(m_OplogOutputPath)), + m_OplogOutputPath, NiceTimeSpanMs(Timer.GetElapsedTimeMs())); } } } - else - { - throw std::runtime_error( - fmt::format("Failed to parse oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult))); - } } } // namespace zen diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h index 58002a533..6141a7bce 100644 --- a/src/zen/cmds/projectstore_cmd.h +++ b/src/zen/cmds/projectstore_cmd.h @@ -288,8 +288,11 @@ private: std::filesystem::path m_SystemRootDir; - bool m_Quiet = false; - bool m_Yes = false; + bool m_Quiet = false; + bool m_Verbose = false; + bool m_Yes = false; + bool m_PlainProgress = false; + bool m_LogProgress = false; AuthCommandLineOptions m_AuthOptions; @@ -299,11 +302,18 @@ private: std::string m_Url; bool m_AssumeHttp2 = false; bool m_AllowRedirect = false; + std::string m_ZenCacheHost; std::string m_Namespace; std::string m_Bucket; std::string m_BuildId; + bool m_ForceDownload = false; + bool m_BoostWorkerThreads = false; + + std::filesystem::path m_OplogOutputPath; + bool m_DecompressAttachments = true; - std::filesystem::path m_OutputPath; + std::vector<std::string> m_Attachments; + std::filesystem::path m_AttachmentsPath; }; } // namespace zen diff --git a/src/zen/threadworkers.cpp b/src/zen/threadworkers.cpp new file mode 100644 index 000000000..2220b9c95 --- /dev/null +++ b/src/zen/threadworkers.cpp @@ -0,0 +1,79 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "threadworkers.h" + +#include <zencore/intmath.h> +#include <zencore/logging.h> +#include <zencore/thread.h> +#include <zenutil/workerpools.h> + +////////////////////////////////////////////////////////////////////////// + +namespace zen { + +ThreadWorkers::ThreadWorkers(bool BoostWorkers, bool SingleThreaded) +{ + if (SingleThreaded) + { + m_NetworkPool = &GetSyncWorkerPool(); + m_IOPool = &GetSyncWorkerPool(); + WorkersInfo = "Not using thread workers for network or I/O, expect no progress reporting"; + } + else + { + unsigned int Cores = GetHardwareConcurrency(); + if (BoostWorkers) + { + // Cores Net Disk + // 2 4 3 + // 4 6 3 + // 8 11 6 + // 16 16 12 + // 32 16 24 + // 64 16 32 + // n 16 32 + + unsigned int NetworkThreadCount = Cores + (Cores / 4u) + Max(1u, (Cores / 8u)); + NetworkThreadCount = Min(NetworkThreadCount, 16u); + NetworkThreadCount = Max(NetworkThreadCount, 4u); + + unsigned int IOThreadCount = Cores - (Cores / 4u); + IOThreadCount = Min(IOThreadCount, 32u); + IOThreadCount = Max(IOThreadCount, 3u); + + m_NetworkPoolInstance = std::make_unique<WorkerThreadPool>(NetworkThreadCount, "net"); + m_IOPoolInstance = std::make_unique<WorkerThreadPool>(IOThreadCount, "disk"); + + WorkersInfo = + fmt::format("Worker boost enabled, using {} network threads and {} worker/IO threads", NetworkThreadCount, IOThreadCount); + } + else + { + // Cores Net Disk + // 2 2 2 + // 4 3 2 + // 8 5 4 + // 16 10 9 + // 32 12 18 + // 64 12 20 + // n 12 20 + + unsigned int NetworkThreadCount = (Cores / 2u) + Max(1u, (Cores / 8u)); + NetworkThreadCount = Min(NetworkThreadCount, 12u); + NetworkThreadCount = Max(NetworkThreadCount, 2u); + + unsigned int IOThreadCount = Cores / 2u + Cores / 16u; + IOThreadCount = Min(IOThreadCount, 20u); + IOThreadCount = Max(IOThreadCount, 2u); + + m_NetworkPoolInstance = std::make_unique<WorkerThreadPool>(NetworkThreadCount, "net"); + m_IOPoolInstance = std::make_unique<WorkerThreadPool>(IOThreadCount, "disk"); + + WorkersInfo = fmt::format("Using {} network threads and {} worker/IO threads", NetworkThreadCount, IOThreadCount); + } + m_NetworkPool = m_NetworkPoolInstance.get(); + m_IOPool = m_IOPoolInstance.get(); + } +} + +} // namespace zen diff --git a/src/zen/threadworkers.h b/src/zen/threadworkers.h new file mode 100644 index 000000000..bb80650a2 --- /dev/null +++ b/src/zen/threadworkers.h @@ -0,0 +1,43 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/timer.h> +#include <zencore/zencore.h> + +#include <memory> +#include <string> + +namespace zen { + +class WorkerThreadPool; + +class ThreadWorkers +{ +public: + ThreadWorkers(bool BoostWorkers, bool SingleThreaded); + + WorkerThreadPool& GetIOWorkerPool() + { + ZEN_ASSERT(m_IOPool); + return *m_IOPool; + } + WorkerThreadPool& GetNetworkPool() + { + ZEN_ASSERT(m_NetworkPool); + return *m_NetworkPool; + } + + const std::string& GetWorkersInfo() const { return WorkersInfo; } + +private: + WorkerThreadPool* m_NetworkPool = nullptr; + WorkerThreadPool* m_IOPool = nullptr; + + std::unique_ptr<WorkerThreadPool> m_NetworkPoolInstance; + std::unique_ptr<WorkerThreadPool> m_IOPoolInstance; + + std::string WorkersInfo; +}; + +} // namespace zen diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp index 6989da67e..2fa02937d 100644 --- a/src/zencore/basicfile.cpp +++ b/src/zencore/basicfile.cpp @@ -513,6 +513,30 @@ TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, MemoryView Data, } } +void +TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, const CompositeBuffer& Data) +{ + std::error_code Ec; + SafeWriteFile(Path, Data, Ec); + if (Ec) + { + throw std::system_error(Ec, fmt::format("Failed to safely write file '{}'", Path)); + } +} + +void +TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, const CompositeBuffer& Data, std::error_code& OutEc) +{ + TemporaryFile TempFile; + if (TempFile.CreateTemporary(Path.parent_path(), OutEc); !OutEc) + { + if (TempFile.Write(Data, 0, OutEc); !OutEc) + { + TempFile.MoveTemporaryIntoPlace(Path, OutEc); + } + } +} + ////////////////////////////////////////////////////////////////////////// LockFile::LockFile() diff --git a/src/zencore/include/zencore/basicfile.h b/src/zencore/include/zencore/basicfile.h index 465499d2b..f5c82b8fe 100644 --- a/src/zencore/include/zencore/basicfile.h +++ b/src/zencore/include/zencore/basicfile.h @@ -107,6 +107,8 @@ public: static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data); static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data, std::error_code& OutEc); + static void SafeWriteFile(const std::filesystem::path& Path, const CompositeBuffer& Data); + static void SafeWriteFile(const std::filesystem::path& Path, const CompositeBuffer& Data, std::error_code& OutEc); private: void Close(); diff --git a/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h b/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h new file mode 100644 index 000000000..2ddd0f11d --- /dev/null +++ b/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h @@ -0,0 +1,106 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <zencore/iohash.h> +#include <zencore/logging.h> +#include <zencore/uid.h> +#include <zencore/zencore.h> +#include <zenremotestore/builds/buildstoragecache.h> +#include <zenremotestore/chunking/chunkedcontent.h> +#include <zenutil/bufferedwritefilecache.h> + +#include <atomic> +#include <memory> + +ZEN_THIRD_PARTY_INCLUDES_START +#include <tsl/robin_set.h> +ZEN_THIRD_PARTY_INCLUDES_END + +namespace zen { + +class BuildStorageBase; +class OperationLogOutput; +struct StorageInstance; + +class ProjectStoreOperationOplogState +{ +public: + struct Options + { + bool IsQuiet = false; + bool IsVerbose = false; + bool ForceDownload = false; + std::filesystem::path TempFolderPath; + }; + + ProjectStoreOperationOplogState(OperationLogOutput& OperationLogOutput, + StorageInstance& Storage, + const Oid& BuildId, + const Options& Options); + + CbObjectView LoadBuildObject(); + CbObjectView LoadBuildPartsObject(); + CbObjectView LoadOpsSectionObject(); + CbArrayView LoadBlocksArray(); + CbArrayView LoadChunkedFilesArray(); + CbArrayView LoadChunksArray(); + CbArray LoadArrayFromBuildPart(std::string_view ArrayName); + + const Oid& GetBuildId(); + const Oid& GetBuildPartId(); + +private: + OperationLogOutput& m_LogOutput; + StorageInstance& m_Storage; + const Oid m_BuildId; + const Options m_Options; + + Oid m_BuildPartId; + CbObject m_BuildObject; + CbObject m_BuildPartsObject; + CbObject m_OpsSectionObject; + CbArray m_BlocksArray; + CbArray m_ChunkedFilesArray; + CbArray m_ChunksArray; +}; + +class ProjectStoreOperationDownloadAttachments +{ +public: + struct Options + { + bool IsQuiet = false; + bool IsVerbose = false; + bool ForceDownload = false; + bool DecompressAttachments = true; + std::filesystem::path TempFolderPath; + std::filesystem::path AttachmentOutputPath; + }; + + ProjectStoreOperationDownloadAttachments(OperationLogOutput& OperationLogOutput, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + ProjectStoreOperationOplogState& State, + std::span<const IoHash> AttachmentHashes, + const Options& Options); + + void Execute(); + +private: + OperationLogOutput& m_LogOutput; + StorageInstance& m_Storage; + std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; + WorkerThreadPool& m_IOWorkerPool; + WorkerThreadPool& m_NetworkPool; + + ProjectStoreOperationOplogState& m_State; + const tsl::robin_set<IoHash, IoHash::Hasher> m_AttachmentHashes; + const Options m_Options; +}; + +} // namespace zen diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h index 4740e7029..de858f818 100644 --- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h +++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h @@ -180,6 +180,8 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore, std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject); std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes); +ChunkedInfo ReadChunkedInfo(CbObjectView ChunkedFile); + void remoteprojectstore_forcelink(); } // namespace zen diff --git a/src/zenremotestore/projectstore/projectstoreoperations.cpp b/src/zenremotestore/projectstore/projectstoreoperations.cpp new file mode 100644 index 000000000..7dd85531c --- /dev/null +++ b/src/zenremotestore/projectstore/projectstoreoperations.cpp @@ -0,0 +1,917 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include <zenremotestore/projectstore/projectstoreoperations.h> + +#include <zencore/compactbinaryutil.h> +#include <zencore/parallelwork.h> +#include <zencore/scopeguard.h> +#include <zencore/timer.h> +#include <zenremotestore/builds/buildstorageutil.h> +#include <zenremotestore/chunking/chunkedfile.h> +#include <zenremotestore/operationlogoutput.h> +#include <zenremotestore/projectstore/remoteprojectstore.h> + +namespace zen { + +using namespace std::literals; + +//////////////////////////// ProjectStoreOperationOplogState + +ProjectStoreOperationOplogState::ProjectStoreOperationOplogState(OperationLogOutput& OperationLogOutput, + StorageInstance& Storage, + const Oid& BuildId, + const Options& Options) +: m_LogOutput(OperationLogOutput) +, m_Storage(Storage) +, m_BuildId(BuildId) +, m_Options(Options) +{ +} + +CbObjectView +ProjectStoreOperationOplogState::LoadBuildObject() +{ + if (!m_BuildObject) + { + const std::filesystem::path CachedBuildObjectPath = m_Options.TempFolderPath / "build.cbo"; + if (!m_Options.ForceDownload && IsFile(CachedBuildObjectPath)) + { + Stopwatch Timer; + CbValidateError Error; + m_BuildObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildObjectPath), Error); + if (Error != CbValidateError::None) + { + RemoveFile(CachedBuildObjectPath); + m_BuildObject = {}; + } + else + { + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Read build {} from locally cached file in {}", + m_BuildId, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + return m_BuildObject; + } + } + + Stopwatch Timer; + m_BuildObject = m_Storage.BuildStorage->GetBuild(m_BuildId); + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Fetched build {} from {} in {}", + m_BuildId, + m_Storage.BuildStorageHttp->GetBaseUri(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + CreateDirectories(CachedBuildObjectPath.parent_path()); + TemporaryFile::SafeWriteFile(CachedBuildObjectPath, m_BuildObject.GetBuffer().GetView()); + } + return m_BuildObject; +} + +const Oid& +ProjectStoreOperationOplogState::GetBuildId() +{ + return m_BuildId; +} + +const Oid& +ProjectStoreOperationOplogState::GetBuildPartId() +{ + if (m_BuildPartId == Oid::Zero) + { + CbObjectView BuildObject = LoadBuildObject(); + CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView(); + if (!PartsObject) + { + throw std::runtime_error(fmt::format("The build {} payload does not contain a 'parts' object"sv, m_BuildId)); + } + static const std::string_view OplogContainerPartName = "oplogcontainer"sv; + m_BuildPartId = PartsObject[OplogContainerPartName].AsObjectId(); + if (m_BuildPartId == Oid::Zero) + { + throw std::runtime_error( + fmt::format("The build {} payload 'parts' object does not contain a '{}' entry"sv, m_BuildId, OplogContainerPartName)); + } + } + return m_BuildPartId; +} + +CbObjectView +ProjectStoreOperationOplogState::LoadBuildPartsObject() +{ + if (!m_BuildPartsObject) + { + const Oid BuildPartId = GetBuildPartId(); + const std::filesystem::path CachedBuildPartObjectPath = m_Options.TempFolderPath / fmt::format("{}_part.cbo", BuildPartId); + if (!m_Options.ForceDownload && IsFile(CachedBuildPartObjectPath)) + { + Stopwatch Timer; + CbValidateError Error; + m_BuildPartsObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildPartObjectPath), Error); + if (Error != CbValidateError::None) + { + RemoveFile(CachedBuildPartObjectPath); + m_BuildPartsObject = {}; + } + else + { + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Read build part {}/{} from locally cached file in {}", + m_BuildId, + BuildPartId, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + return m_BuildPartsObject; + } + } + + Stopwatch Timer; + m_BuildPartsObject = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId); + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Fetched build part {}/{} from {} in {}", + m_BuildId, + BuildPartId, + m_Storage.BuildStorageHttp->GetBaseUri(), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + CreateDirectories(CachedBuildPartObjectPath.parent_path()); + TemporaryFile::SafeWriteFile(CachedBuildPartObjectPath, m_BuildPartsObject.GetBuffer().GetView()); + } + return m_BuildPartsObject; +} + +CbObjectView +ProjectStoreOperationOplogState::LoadOpsSectionObject() +{ + if (!m_OpsSectionObject) + { + const Oid BuildPartId = GetBuildPartId(); + const std::filesystem::path CachedOpsSectionPath = m_Options.TempFolderPath / fmt::format("{}_part_ops.cbo", BuildPartId); + if (!m_Options.ForceDownload && IsFile(CachedOpsSectionPath)) + { + Stopwatch Timer; + CbValidateError Error; + m_OpsSectionObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedOpsSectionPath), Error); + if (Error != CbValidateError::None) + { + RemoveFile(CachedOpsSectionPath); + m_OpsSectionObject = {}; + } + else if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Read {}/{}/ops from locally cached file in {}", + BuildPartId, + m_BuildId, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + return m_OpsSectionObject; + } + } + CbObjectView ContainerObject = LoadBuildPartsObject(); + + Stopwatch Timer; + + MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView(); + IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize()); + IoBuffer OpsSectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer(); + + CbValidateError ValidateResult = CbValidateError::None; + m_OpsSectionObject = ValidateAndReadCompactBinaryObject(std::move(OpsSectionPayload), ValidateResult); + if (ValidateResult != CbValidateError::None) + { + throw std::runtime_error( + fmt::format("Failed to parse oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult))); + } + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Decompressed and validated oplog payload {} -> {} in {}", + NiceBytes(OpsSection.GetSize()), + NiceBytes(m_OpsSectionObject.GetSize()), + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + if (m_OpsSectionObject) + { + CreateDirectories(CachedOpsSectionPath.parent_path()); + TemporaryFile::SafeWriteFile(CachedOpsSectionPath, m_OpsSectionObject.GetBuffer().GetView()); + } + } + return m_OpsSectionObject; +} + +CbArray +ProjectStoreOperationOplogState::LoadArrayFromBuildPart(std::string_view ArrayName) +{ + const Oid BuildPartId = GetBuildPartId(); + const std::filesystem::path CachedPartArrayPath = m_Options.TempFolderPath / fmt::format("{}_part_{}.cba", BuildPartId, ArrayName); + if (!m_Options.ForceDownload && IsFile(CachedPartArrayPath)) + { + Stopwatch Timer; + IoBuffer Payload = IoBufferBuilder::MakeFromFile(CachedPartArrayPath); + CbValidateError Error = ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default); + if (Error != CbValidateError::None) + { + RemoveFile(CachedPartArrayPath); + } + else + { + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Read {}/{}/{} from locally cached file in {}", + BuildPartId, + m_BuildId, + ArrayName, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + CbArray Result = CbArray(SharedBuffer(std::move(Payload))); + return Result; + } + } + + CbObjectView ContainerObject = LoadBuildPartsObject(); + + Stopwatch Timer; + + CbArrayView ArrayView = ContainerObject[ArrayName].AsArrayView(); + + CbArray Result; + { + CbWriter Writer; + Writer << ArrayView; + SharedBuffer ArrayBuffer = SharedBuffer::MakeView(Writer.Save().GetView()).MakeOwned(); + Result = CbArray(std::move(ArrayBuffer)); + } + + CreateDirectories(CachedPartArrayPath.parent_path()); + TemporaryFile::SafeWriteFile(CachedPartArrayPath, Result.GetBuffer().GetView()); + + return Result; +} + +CbArrayView +ProjectStoreOperationOplogState::LoadBlocksArray() +{ + if (!m_BlocksArray) + { + m_BlocksArray = LoadArrayFromBuildPart("blocks"sv); + } + return m_BlocksArray; +} + +CbArrayView +ProjectStoreOperationOplogState::LoadChunkedFilesArray() +{ + if (!m_ChunkedFilesArray) + { + m_ChunkedFilesArray = LoadArrayFromBuildPart("chunkedfiles"sv); + } + return m_ChunkedFilesArray; +} + +CbArrayView +ProjectStoreOperationOplogState::LoadChunksArray() +{ + if (!m_ChunksArray) + { + m_ChunksArray = LoadArrayFromBuildPart("chunks"sv); + } + return m_ChunksArray; +} + +//////////////////////////// ProjectStoreOperationDownloadAttachments + +ProjectStoreOperationDownloadAttachments::ProjectStoreOperationDownloadAttachments(OperationLogOutput& OperationLogOutput, + StorageInstance& Storage, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag, + WorkerThreadPool& IOWorkerPool, + WorkerThreadPool& NetworkPool, + ProjectStoreOperationOplogState& State, + std::span<const IoHash> AttachmentHashes, + const Options& Options) +: m_LogOutput(OperationLogOutput) +, m_Storage(Storage) +, m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_IOWorkerPool(IOWorkerPool) +, m_NetworkPool(NetworkPool) +, m_State(State) +, m_AttachmentHashes(AttachmentHashes.begin(), AttachmentHashes.end()) +, m_Options(Options) +{ +} + +void +ProjectStoreOperationDownloadAttachments::Execute() +{ + enum class TaskSteps : uint32_t + { + ReadAttachmentData, + Download, + AnalyzeDechunk, + Dechunk, + Cleanup, + StepCount + }; + + auto EndProgress = + MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); }); + + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ReadAttachmentData, (uint32_t)TaskSteps::StepCount); + + Stopwatch Timer; + tsl::robin_map<IoHash, uint64_t, IoHash::Hasher> ChunkSizes; + + std::vector<ChunkedInfo> ChunkedFileInfos; + tsl::robin_map<IoHash, size_t, IoHash::Hasher> FilesToDechunk; + tsl::robin_set<IoHash, IoHash::Hasher> ChunkedFileRawHashes; + { + CbArrayView ChunkedFilesArray = m_State.LoadChunkedFilesArray(); + FilesToDechunk.reserve(ChunkedFilesArray.Num()); + for (CbFieldView ChunkedFileView : ChunkedFilesArray) + { + CbObjectView ChunkedFile = ChunkedFileView.AsObjectView(); + IoHash ChunkedRawHash = ChunkedFile["rawhash"sv].AsHash(); + if (m_AttachmentHashes.contains(ChunkedRawHash)) + { + ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFile); + ChunkSizes.insert_or_assign(Chunked.RawHash, Chunked.RawSize); + ChunkedFileRawHashes.reserve(ChunkedFileRawHashes.size() + Chunked.ChunkHashes.size()); + for (const IoHash& ChunkHash : Chunked.ChunkHashes) + { + ChunkedFileRawHashes.insert(ChunkHash); + } + FilesToDechunk.insert_or_assign(Chunked.RawHash, ChunkedFileInfos.size()); + ChunkedFileInfos.emplace_back(std::move(Chunked)); + } + } + } + + tsl::robin_set<IoHash, IoHash::Hasher> LooseChunksToDownload; + { + CbArrayView ChunksArray = m_State.LoadChunksArray(); + LooseChunksToDownload.reserve(ChunksArray.Num()); + for (CbFieldView Chunk : ChunksArray) + { + const IoHash ChunkHash = Chunk.AsAttachment(); + if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash)) + { + LooseChunksToDownload.insert(ChunkHash); + } + } + } + + tsl::robin_set<IoHash, IoHash::Hasher> BlocksToDownload; + tsl::robin_map<IoHash, IoHash, IoHash::Hasher> ChunkToBlock; + + { + CbArrayView BlocksArray = m_State.LoadBlocksArray(); + + for (CbFieldView BlockView : BlocksArray) + { + CbObjectView Block = BlockView.AsObjectView(); + IoHash BlockHash = Block["rawhash"sv].AsBinaryAttachment(); + if (BlockHash == IoHash::Zero) + { + CbArrayView ChunksArray = Block["chunks"sv].AsArrayView(); + for (CbFieldView ChunkHashView : ChunksArray) + { + const IoHash ChunkHash = ChunkHashView.AsAttachment(); + if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash)) + { + LooseChunksToDownload.insert(ChunkHash); + } + } + } + else + { + CbArrayView ChunksArray = Block["chunks"sv].AsArrayView(); + for (CbFieldView ChunkHashView : ChunksArray) + { + const IoHash ChunkHash = ChunkHashView.AsHash(); + if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash)) + { + ChunkToBlock.insert_or_assign(ChunkHash, BlockHash); + BlocksToDownload.insert(BlockHash); + } + } + } + } + } + + if (!m_Options.IsQuiet) + { + std::string DechunkInfo = + FilesToDechunk.size() > 0 + ? fmt::format("\n{} file{} needs to be dechunked", FilesToDechunk.size(), FilesToDechunk.size() == 1 ? "" : "s") + : ""; + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Need to download {} block{} and {} chunk{}{}", + BlocksToDownload.size(), + BlocksToDownload.size() == 1 ? "" : "s", + LooseChunksToDownload.size(), + LooseChunksToDownload.size() == 1 ? "" : "s", + DechunkInfo); + } + + auto GetBuildBlob = [this](const IoHash& RawHash, const std::filesystem::path& OutputPath) { + IoBuffer Payload; + if (m_Storage.BuildCacheStorage) + { + Payload = m_Storage.BuildCacheStorage->GetBuildBlob(m_State.GetBuildId(), RawHash); + } + if (!Payload) + { + Payload = m_Storage.BuildStorage->GetBuildBlob(m_State.GetBuildId(), RawHash); + if (m_Storage.BuildCacheStorage) + { + m_Storage.BuildCacheStorage->PutBuildBlob(m_State.GetBuildId(), + RawHash, + Payload.GetContentType(), + CompositeBuffer(SharedBuffer(Payload))); + } + } + uint64_t PayloadSize = Payload.GetSize(); + IoBufferFileReference FileRef; + if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == PayloadSize)) + { + std::error_code Ec; + std::filesystem::path TempPayloadPath = PathFromHandle(FileRef.FileHandle, Ec); + if (!Ec) + { + Payload.SetDeleteOnClose(false); + Payload = {}; + RenameFile(TempPayloadPath, OutputPath, Ec); + if (Ec) + { + // Re-open the temp file again + BasicFile OpenTemp(TempPayloadPath, BasicFile::Mode::kDelete); + Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, PayloadSize, true); + Payload.SetDeleteOnClose(true); + } + } + } + if (Payload) + { + TemporaryFile::SafeWriteFile(OutputPath, Payload.GetView()); + } + }; + + std::filesystem::path TempAttachmentPath = MakeSafeAbsolutePath(m_Options.AttachmentOutputPath) / ".tmp"; + CreateDirectories(TempAttachmentPath); + auto _0 = MakeGuard([this, &TempAttachmentPath]() { + if (true) + { + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, "Cleaning up temporary directory"); + } + CleanDirectory(TempAttachmentPath, true); + RemoveDir(TempAttachmentPath); + } + }); + + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Download, (uint32_t)TaskSteps::StepCount); + + std::filesystem::path BlocksPath = TempAttachmentPath / "blocks"; + CreateDirectories(BlocksPath); + + { + Stopwatch DownloadTimer; + + std::filesystem::path LooseChunksPath = TempAttachmentPath / "loosechunks"; + CreateDirectories(LooseChunksPath); + + std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading")); + OperationLogOutput::ProgressBar& DownloadProgressBar(*ProgressBarPtr); + + std::atomic<bool> PauseFlag; + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<size_t> LooseChunksCompleted; + std::atomic<size_t> BlocksCompleted; + + std::vector<IoHash> LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end()); + + for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++) + { + Work.ScheduleWork(m_NetworkPool, [&, LooseChunkIndex](std::atomic<bool>&) { + const IoHash RawHash = LooseChunks[LooseChunkIndex]; + std::filesystem::path LooseChunkOutputPath = LooseChunksPath / fmt::format("{}.ucb", RawHash); + if (m_Options.ForceDownload || !IsFile(LooseChunkOutputPath)) + { + GetBuildBlob(RawHash, LooseChunkOutputPath); + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded chunk {}", RawHash); + } + + Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex, LooseChunkOutputPath](std::atomic<bool>&) { + const IoHash RawHash = LooseChunks[LooseChunkIndex]; + IoHash ChunkRawHash; + uint64_t ChunkRawSize; + CompressedBuffer CompressedChunk = + CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(LooseChunkOutputPath)), + ChunkRawHash, + ChunkRawSize); + if (!CompressedChunk) + { + throw std::runtime_error(fmt::format("Downloaded chunk {} is malformed", RawHash)); + } + if (ChunkRawHash != RawHash) + { + throw std::runtime_error(fmt::format("Downloaded chunk {} mismatches content hash {}", RawHash, ChunkRawHash)); + } + + std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash); + if (m_Options.DecompressAttachments) + { + BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate); + if (!CompressedChunk.DecompressToStream( + 0, + ChunkRawSize, + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) { + ZEN_UNUSED(SourceOffset); + ZEN_UNUSED(SourceSize); + ChunkOutput.Write(Range, Offset); + return true; + })) + { + ChunkOutput.Close(); + RemoveFile(ChunkOutputPath); + throw std::runtime_error(fmt::format("Failed to decompress chunk {} to ", RawHash, ChunkOutputPath)); + } + } + else + { + TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed()); + } + + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote loose chunk {} to '{}'", RawHash, ChunkOutputPath); + LooseChunksCompleted++; + }); + }); + } + + std::vector<IoHash> Blocks(BlocksToDownload.begin(), BlocksToDownload.end()); + + RwLock ChunkSizesLock; + for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++) + { + Work.ScheduleWork(m_NetworkPool, [&, BlockIndex](std::atomic<bool>&) { + const IoHash& RawHash = Blocks[BlockIndex]; + std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", RawHash); + if (m_Options.ForceDownload || !IsFile(BlockOutputPath)) + { + GetBuildBlob(RawHash, BlockOutputPath); + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded block {}", RawHash); + } + + Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex, BlockOutputPath](std::atomic<bool>&) { + const IoHash& BlockRawHash = Blocks[BlockIndex]; + + SharedBuffer BlockBuffer = + CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress(); + + uint64_t HeaderSize = 0; + if (IterateChunkBlock( + SharedBuffer(BlockBuffer), + [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) { + if (m_AttachmentHashes.contains(ChunkHash)) + { + std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkHash); + + if (m_Options.DecompressAttachments) + { + BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate); + if (!CompressedChunk.DecompressToStream(0u, + ~uint64_t(0), + [&](uint64_t SourceOffset, + uint64_t SourceSize, + uint64_t Offset, + const CompositeBuffer& Range) { + ZEN_UNUSED(SourceOffset); + ZEN_UNUSED(SourceSize); + ChunkOutput.Write(Range, Offset); + return true; + })) + { + ChunkOutput.Close(); + RemoveFile(ChunkOutputPath); + throw std::runtime_error( + fmt::format("Failed to decompress chunk {} to ", ChunkHash, ChunkOutputPath)); + } + } + else + { + TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed()); + } + + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote block chunk {} to '{}'", ChunkHash, ChunkOutputPath); + } + if (ChunkedFileRawHashes.contains(ChunkHash)) + { + uint64_t RawSize = CompressedChunk.DecodeRawSize(); + ChunkSizesLock.WithExclusiveLock([&]() { ChunkSizes.insert_or_assign(ChunkHash, RawSize); }); + } + }, + HeaderSize)) + { + } + else + { + throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash)); + } + BlocksCompleted++; + }); + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused, PendingWork); + + std::string Details = fmt::format("{}/{} blocks, {}/{} chunks downloaded", + BlocksCompleted.load(), + BlocksToDownload.size(), + LooseChunksCompleted.load(), + LooseChunksToDownload.size()); + DownloadProgressBar.UpdateState({.Task = "Downloading", + .Details = Details, + .TotalCount = BlocksToDownload.size() + LooseChunksToDownload.size(), + .RemainingCount = BlocksToDownload.size() + LooseChunksToDownload.size() - + (BlocksCompleted.load() + LooseChunksCompleted.load()), + .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + DownloadProgressBar.Finish(); + + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "{} block{} downloaded, {} loose chunk{} downloaded in {}", + BlocksToDownload.size(), + BlocksToDownload.size() == 1 ? "" : "s", + LooseChunksToDownload.size(), + LooseChunksToDownload.size() == 1 ? "" : "s", + NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs())); + } + } + + if (!ChunkedFileInfos.empty()) + { + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::AnalyzeDechunk, (uint32_t)TaskSteps::StepCount); + + std::filesystem::path ChunkedFilesPath = TempAttachmentPath / "chunkedfiles"; + CreateDirectories(ChunkedFilesPath); + + try + { + std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Dechunking")); + OperationLogOutput::ProgressBar& DechunkingProgressBar(*ProgressBarPtr); + + std::atomic<uint64_t> ChunksWritten; + + std::vector<std::unique_ptr<BasicFile>> OpenChunkedFiles; + struct ChunkOpenFileTarget + { + size_t OpenChunkedFileIndex = 0; + uint64_t Offset = 0; + }; + + std::vector<ChunkOpenFileTarget> ChunkOpenFileTargets; + + std::vector<eastl::fixed_vector<size_t, 1>> Targets; + tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkedFileTargetLookup; + + // Build up file and offset targets for each chunk + for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++) + { + const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex]; + + const size_t OpenChunkedFileIndex = OpenChunkedFiles.size(); + + uint64_t Offset = 0; + for (uint32_t ChunkIndex : ChunkedFileInfo.ChunkSequence) + { + const IoHash& ChunkHash = ChunkedFileInfo.ChunkHashes[ChunkIndex]; + auto ChunkSizeIt = ChunkSizes.find(ChunkHash); + if (ChunkSizeIt == ChunkSizes.end()) + { + throw std::runtime_error( + fmt::format("Missing chunk {} to build chunked file {}", ChunkHash, ChunkedFileInfo.RawHash)); + } + const uint64_t ChunkSize = ChunkSizeIt->second; + if (auto ChunkTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash); + ChunkTargetLookupIt != ChunkedFileTargetLookup.end()) + { + size_t TargetIndex = ChunkTargetLookupIt->second; + eastl::fixed_vector<size_t, 1>& Target = Targets[TargetIndex]; + Target.push_back(ChunkOpenFileTargets.size()); + } + else + { + ChunkedFileTargetLookup.insert_or_assign(ChunkHash, Targets.size()); + Targets.push_back({ChunkOpenFileTargets.size()}); + } + ChunkOpenFileTargets.push_back({.OpenChunkedFileIndex = OpenChunkedFileIndex, .Offset = Offset}); + Offset += ChunkSize; + } + std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash); + OpenChunkedFiles.emplace_back(std::make_unique<BasicFile>(ChunkedFilePath, BasicFile::Mode::kTruncate)); + PrepareFileForScatteredWrite(OpenChunkedFiles.back()->Handle(), ChunkedFileInfo.RawSize); + } + + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Dechunk, (uint32_t)TaskSteps::StepCount); + + std::vector<std::atomic<uint8_t>> ChunkWrittenFlags(ChunkOpenFileTargets.size()); + + auto WriteChunk = [&](size_t ChunkedTargetsIndex, CompressedBuffer&& CompressedChunkBuffer) { + const eastl::fixed_vector<size_t, 1>& Target = Targets[ChunkedTargetsIndex]; + for (size_t TargetIndex : Target) + { + uint8_t Expected = 0; + if (ChunkWrittenFlags[TargetIndex].compare_exchange_strong(Expected, true)) + { + const ChunkOpenFileTarget& ChunkTarget = ChunkOpenFileTargets[TargetIndex]; + BasicFile& OutputFile = *OpenChunkedFiles[ChunkTarget.OpenChunkedFileIndex]; + + if (!CompressedChunkBuffer.DecompressToStream( + 0u, + ~uint64_t(0), + [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) { + ZEN_UNUSED(SourceOffset); + ZEN_UNUSED(SourceSize); + OutputFile.Write(Range, ChunkTarget.Offset + Offset); + Offset += Range.GetSize(); + return true; + })) + { + std::error_code DummyEc; + throw std::runtime_error(fmt::format("Failed to decompress chunk {} at offset {} to {}", + CompressedChunkBuffer.DecodeRawHash(), + ChunkTarget.Offset, + PathFromHandle(OutputFile.Handle(), DummyEc))); + } + ChunksWritten++; + } + } + }; + + { + Stopwatch DechunkTimer; + + std::atomic<bool> PauseFlag; + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::vector<IoHash> LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end()); + + for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++) + { + Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex](std::atomic<bool>&) { + const IoHash RawHash = LooseChunks[LooseChunkIndex]; + if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(RawHash); + ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end()) + { + std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash); + IoBuffer ChunkBuffer = IoBufferBuilder::MakeFromFile(ChunkOutputPath); + + WriteChunk(ChunkedFileTargetLookupIt->second, + CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer))); + } + }); + } + + std::vector<IoHash> Blocks(BlocksToDownload.begin(), BlocksToDownload.end()); + + for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++) + { + Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex](std::atomic<bool>&) { + const IoHash& BlockRawHash = Blocks[BlockIndex]; + std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", BlockRawHash); + + SharedBuffer BlockBuffer = + CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress(); + + uint64_t HeaderSize = 0; + if (IterateChunkBlock( + SharedBuffer(BlockBuffer), + [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) { + if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash); + ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end()) + { + WriteChunk(ChunkedFileTargetLookupIt->second, std::move(CompressedChunk)); + } + }, + HeaderSize)) + { + } + else + { + throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash)); + } + }); + } + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused, PendingWork); + std::string Details = fmt::format("{}/{} chunks written", ChunksWritten.load(), ChunkOpenFileTargets.size()); + + DechunkingProgressBar.UpdateState( + {.Task = "Dechunking ", + .Details = Details, + .TotalCount = ChunkOpenFileTargets.size(), + .RemainingCount = ChunkOpenFileTargets.size() - ChunksWritten.load(), + .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + DechunkingProgressBar.Finish(); + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "{} file{} dechunked in {}", + ChunkedFileInfos.size(), + ChunkedFileInfos.size() == 1 ? "" : "s", + NiceTimeSpanMs(DechunkTimer.GetElapsedTimeMs())); + } + } + } + catch (const std::exception&) + { + for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++) + { + const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex]; + std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash); + RemoveFile(ChunkedFilePath); + } + throw; + } + { + Stopwatch VerifyTimer; + std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Verifying")); + OperationLogOutput::ProgressBar& VerifyProgressBar(*ProgressBarPtr); + + std::atomic<bool> PauseFlag; + ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog); + + std::atomic<size_t> DechunkedFilesMoved; + + for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++) + { + Work.ScheduleWork(m_IOWorkerPool, [&, ChunkedFileIndex](std::atomic<bool>&) { + const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex]; + std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash); + IoHash VerifyHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(ChunkedFilePath)); + if (VerifyHash != ChunkedFileInfo.RawHash) + { + throw std::runtime_error( + fmt::format("Dechunked file {} has mismatching hash {}", ChunkedFileInfo.RawHash, VerifyHash)); + } + std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkedFileInfo.RawHash); + RenameFile(ChunkedFilePath, ChunkOutputPath); + ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Moved dechunked file {} to '{}'", ChunkedFileInfo.RawHash, ChunkOutputPath); + DechunkedFilesMoved++; + }); + } + + Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused, PendingWork); + std::string Details = fmt::format("{}/{} files verified", DechunkedFilesMoved.load(), ChunkedFileInfos.size()); + + VerifyProgressBar.UpdateState({.Task = "Verifying ", + .Details = Details, + .TotalCount = ChunkedFileInfos.size(), + .RemainingCount = ChunkedFileInfos.size() - DechunkedFilesMoved.load(), + .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, + false); + }); + VerifyProgressBar.Finish(); + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Verified {} chunked file{} in {}", + ChunkedFileInfos.size(), + ChunkedFileInfos.size() == 1 ? "" : "s", + NiceTimeSpanMs(VerifyTimer.GetElapsedTimeMs())); + } + } + } + if (!m_Options.IsQuiet) + { + ZEN_OPERATION_LOG_INFO(m_LogOutput, + "Downloaded {} attachment{} to '{}' in {}", + m_AttachmentHashes.size(), + m_AttachmentHashes.size() == 1 ? "" : "s", + m_Options.AttachmentOutputPath, + NiceTimeSpanMs(Timer.GetElapsedTimeMs())); + } + m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount); +} + +} // namespace zen diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp index 7e02e5d69..a928d1644 100644 --- a/src/zenremotestore/projectstore/remoteprojectstore.cpp +++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp @@ -2743,26 +2743,10 @@ ParseOplogContainer(const CbObject& ContainerObject, IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash(); if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash))) { - ChunkedInfo Chunked; - Chunked.RawHash = RawHash; - Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64(); - CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView(); - Chunked.ChunkHashes.reserve(ChunksArray.Num()); - for (CbFieldView ChunkField : ChunksArray) - { - const IoHash ChunkHash = ChunkField.AsHash(); - Chunked.ChunkHashes.emplace_back(ChunkHash); - } + ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView); + OnReferencedAttachments(Chunked.ChunkHashes); OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end()); - CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView(); - Chunked.ChunkSequence.reserve(SequenceArray.Num()); - for (CbFieldView SequenceField : SequenceArray) - { - uint32_t SequenceIndex = SequenceField.AsUInt32(); - ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size()); - Chunked.ChunkSequence.push_back(SequenceIndex); - } OnChunkedAttachment(Chunked); ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks", Chunked.RawHash, @@ -3315,6 +3299,33 @@ LoadOplog(CidStore& ChunkStore, return Result; } +ChunkedInfo +ReadChunkedInfo(CbObjectView ChunkedFile) +{ + using namespace std::literals; + + ChunkedInfo Chunked; + Chunked.RawHash = ChunkedFile["rawhash"sv].AsHash(); + Chunked.RawSize = ChunkedFile["rawsize"sv].AsUInt64(); + + CbArrayView ChunksArray = ChunkedFile["chunks"sv].AsArrayView(); + Chunked.ChunkHashes.reserve(ChunksArray.Num()); + for (CbFieldView ChunkField : ChunksArray) + { + const IoHash ChunkHash = ChunkField.AsHash(); + Chunked.ChunkHashes.emplace_back(ChunkHash); + } + CbArrayView SequenceArray = ChunkedFile["sequence"sv].AsArrayView(); + Chunked.ChunkSequence.reserve(SequenceArray.Num()); + for (CbFieldView SequenceField : SequenceArray) + { + uint32_t SequenceIndex = SequenceField.AsUInt32(); + ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size()); + Chunked.ChunkSequence.push_back(SequenceIndex); + } + return Chunked; +} + ////////////////////////////////////////////////////////////////////////// // These are here to avoid vtable leakage |