aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-11-07 12:27:44 +0100
committerGitHub Enterprise <[email protected]>2025-11-07 12:27:44 +0100
commit72b1797e2b65ad47f4dc8e9fab73b9aa170889b4 (patch)
tree93c9f7c99965393ba9c6ec86c01b63899bfba684 /src
parentmove progress bar to separate file (#638) (diff)
downloadzen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.tar.xz
zen-72b1797e2b65ad47f4dc8e9fab73b9aa170889b4.zip
get oplog attachments (#622)
* add support for downloading individual attachments from an oplog
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp293
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp319
-rw-r--r--src/zen/cmds/projectstore_cmd.h16
-rw-r--r--src/zen/threadworkers.cpp79
-rw-r--r--src/zen/threadworkers.h43
-rw-r--r--src/zencore/basicfile.cpp24
-rw-r--r--src/zencore/include/zencore/basicfile.h2
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h106
-rw-r--r--src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h2
-rw-r--r--src/zenremotestore/projectstore/projectstoreoperations.cpp917
-rw-r--r--src/zenremotestore/projectstore/remoteprojectstore.cpp47
11 files changed, 1558 insertions, 290 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index 3a334ed15..4c427fc7f 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -41,6 +41,7 @@
#include <zenutil/zenserverprocess.h>
#include "../progressbar.h"
+#include "../threadworkers.h"
#include <signal.h>
#include <memory>
@@ -267,95 +268,6 @@ namespace {
static ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty;
static bool AllowFileClone = true;
- static std::unique_ptr<WorkerThreadPool> NetworkPoolInstance;
- static std::unique_ptr<WorkerThreadPool> IOPoolInstance;
-
- WorkerThreadPool* NetworkPool = nullptr;
- WorkerThreadPool* IOPool = nullptr;
-
- void InitializeWorkerPools(bool BoostWorkers)
- {
- if (SingleThreaded)
- {
- NetworkPool = &GetSyncWorkerPool();
- IOPool = &GetSyncWorkerPool();
- ZEN_CONSOLE("Not using thread workers for network or I/O, expect no progress reporting");
- }
- else
- {
- unsigned int Cores = GetHardwareConcurrency();
- if (BoostWorkers)
- {
- // Cores Net Disk
- // 2 4 3
- // 4 6 3
- // 8 11 6
- // 16 16 12
- // 32 16 24
- // 64 16 32
- // n 16 32
-
- unsigned int NetworkThreadCount = Cores + (Cores / 4u) + Max(1u, (Cores / 8u));
- NetworkThreadCount = Min(NetworkThreadCount, 16u);
- NetworkThreadCount = Max(NetworkThreadCount, 4u);
-
- unsigned int IOThreadCount = Cores - (Cores / 4u);
- IOThreadCount = Min(IOThreadCount, 32u);
- IOThreadCount = Max(IOThreadCount, 3u);
-
- NetworkPoolInstance = std::make_unique<WorkerThreadPool>(NetworkThreadCount, "net");
- IOPoolInstance = std::make_unique<WorkerThreadPool>(IOThreadCount, "disk");
-
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Worker boost enabled, using {} network threads and {} worker/IO threads",
- NetworkThreadCount,
- IOThreadCount);
- }
- }
- else
- {
- // Cores Net Disk
- // 2 2 2
- // 4 3 2
- // 8 5 4
- // 16 10 9
- // 32 12 18
- // 64 12 20
- // n 12 20
-
- unsigned int NetworkThreadCount = (Cores / 2u) + Max(1u, (Cores / 8u));
- NetworkThreadCount = Min(NetworkThreadCount, 12u);
- NetworkThreadCount = Max(NetworkThreadCount, 2u);
-
- unsigned int IOThreadCount = Cores / 2u + Cores / 16u;
- IOThreadCount = Min(IOThreadCount, 20u);
- IOThreadCount = Max(IOThreadCount, 2u);
-
- NetworkPoolInstance = std::make_unique<WorkerThreadPool>(NetworkThreadCount, "net");
- IOPoolInstance = std::make_unique<WorkerThreadPool>(IOThreadCount, "disk");
-
- if (!IsQuiet)
- {
- ZEN_CONSOLE("Using {} network threads and {} worker/IO threads", NetworkThreadCount, IOThreadCount);
- }
- }
- NetworkPool = NetworkPoolInstance.get();
- IOPool = IOPoolInstance.get();
- }
- }
-
- WorkerThreadPool& GetIOWorkerPool()
- {
- ZEN_ASSERT(IOPool);
- return *IOPool;
- }
- WorkerThreadPool& GetNetworkPool()
- {
- ZEN_ASSERT(NetworkPool);
- return *NetworkPool;
- }
-
#define ZEN_CONSOLE_VERBOSE(fmtstr, ...) \
if (IsVerbose) \
{ \
@@ -496,12 +408,16 @@ namespace {
return Count * 1000000 / ElapsedWallTimeUS;
}
- bool CleanAndRemoveDirectory(const std::filesystem::path& Directory)
+ bool CleanAndRemoveDirectory(WorkerThreadPool& WorkerPool, const std::filesystem::path& Directory)
{
- return CleanAndRemoveDirectory(GetIOWorkerPool(), AbortFlag, PauseFlag, Directory);
+ return CleanAndRemoveDirectory(WorkerPool, AbortFlag, PauseFlag, Directory);
}
- void ValidateBuildPart(BuildStorageBase& Storage, const Oid& BuildId, Oid BuildPartId, const std::string_view BuildPartName)
+ void ValidateBuildPart(ThreadWorkers& Workers,
+ BuildStorageBase& Storage,
+ const Oid& BuildId,
+ Oid BuildPartId,
+ const std::string_view BuildPartName)
{
ZEN_TRACE_CPU("ValidateBuildPart");
@@ -513,8 +429,8 @@ namespace {
Storage,
AbortFlag,
PauseFlag,
- GetIOWorkerPool(),
- GetNetworkPool(),
+ Workers.GetIOWorkerPool(),
+ Workers.GetNetworkPool(),
BuildId,
BuildPartId,
BuildPartName,
@@ -532,7 +448,8 @@ namespace {
NiceTimeSpanMs(ValidateOp.m_ValidateStats.ElapsedWallTimeUS / 1000));
}
- void UploadFolder(StorageInstance& Storage,
+ void UploadFolder(ThreadWorkers& Workers,
+ StorageInstance& Storage,
const Oid& BuildId,
const Oid& BuildPartId,
const std::string_view BuildPartName,
@@ -557,8 +474,8 @@ namespace {
Storage,
AbortFlag,
PauseFlag,
- GetIOWorkerPool(),
- GetNetworkPool(),
+ Workers.GetIOWorkerPool(),
+ Workers.GetNetworkPool(),
BuildId,
BuildPartId,
BuildPartName,
@@ -781,7 +698,8 @@ namespace {
uint64_t VerifyElapsedWallTimeUs = 0;
};
- void VerifyFolder(const ChunkedFolderContent& Content,
+ void VerifyFolder(ThreadWorkers& Workers,
+ const ChunkedFolderContent& Content,
const ChunkedContentLookup& Lookup,
const std::filesystem::path& Path,
bool VerifyFileHash,
@@ -793,7 +711,7 @@ namespace {
ProgressBar ProgressBar(ProgressMode, "Verify Files");
- WorkerThreadPool& VerifyPool = GetIOWorkerPool();
+ WorkerThreadPool& VerifyPool = Workers.GetIOWorkerPool();
ParallelWork Work(AbortFlag, PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
@@ -973,7 +891,8 @@ namespace {
}
}
- FolderContent GetValidFolderContent(GetFolderContentStatistics& LocalFolderScanStats,
+ FolderContent GetValidFolderContent(ThreadWorkers& Workers,
+ GetFolderContentStatistics& LocalFolderScanStats,
const std::filesystem::path& Path,
std::span<const std::filesystem::path> PathsToCheck,
std::function<void(uint64_t PathCount, uint64_t CompletedPathCount)>&& ProgressCallback)
@@ -998,7 +917,7 @@ namespace {
while (PathIndex < PathCount)
{
uint32_t PathRangeCount = Min(128u, PathCount - PathIndex);
- Work.ScheduleWork(GetIOWorkerPool(),
+ Work.ScheduleWork(Workers.GetIOWorkerPool(),
[PathIndex, PathRangeCount, &PathsToCheck, &Path, &Result, &CompletedPathCount, &LocalFolderScanStats](
std::atomic<bool>&) {
for (uint32_t PathRangeIndex = PathIndex; PathRangeIndex < PathIndex + PathRangeCount;
@@ -1448,7 +1367,8 @@ namespace {
return RemoteContent;
}
- ChunkedFolderContent GetLocalContent(GetFolderContentStatistics& LocalFolderScanStats,
+ ChunkedFolderContent GetLocalContent(ThreadWorkers& Workers,
+ GetFolderContentStatistics& LocalFolderScanStats,
ChunkingStatistics& ChunkingStats,
const std::filesystem::path& Path,
const std::filesystem::path& StateFilePath,
@@ -1469,7 +1389,7 @@ namespace {
try
{
ReadStateFile(StateFilePath, LocalFolderState, LocalContent);
- if (IsQuiet)
+ if (!IsQuiet)
{
ZEN_CONSOLE("Read local state file {} in {}", StateFilePath, NiceTimeSpanMs(ReadStateTimer.GetElapsedTimeMs()));
}
@@ -1514,7 +1434,8 @@ namespace {
{
ProgressBar ProgressBar(ProgressMode, "Check Files");
OutLocalFolderContent =
- GetValidFolderContent(LocalFolderScanStats,
+ GetValidFolderContent(Workers,
+ LocalFolderScanStats,
Path,
PathsToCheck,
[&ProgressBar, &LocalFolderScanStats](uint64_t PathCount, uint64_t CompletedPathCount) {
@@ -1571,7 +1492,7 @@ namespace {
ChunkingStatistics LocalChunkingStats;
ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
LocalChunkingStats,
- GetIOWorkerPool(),
+ Workers.GetIOWorkerPool(),
Path,
UpdatedContent,
ChunkController,
@@ -1675,7 +1596,7 @@ namespace {
ChunkingStatistics LocalChunkingStats;
ChunkedFolderContent UpdatedLocalContent = ChunkFolderContent(
LocalChunkingStats,
- GetIOWorkerPool(),
+ Workers.GetIOWorkerPool(),
Path,
UpdatedContent,
ChunkController,
@@ -1725,7 +1646,7 @@ namespace {
ChunkingStatistics LocalChunkingStats;
LocalContent = ChunkFolderContent(
LocalChunkingStats,
- GetIOWorkerPool(),
+ Workers.GetIOWorkerPool(),
Path,
OutLocalFolderContent,
ChunkController,
@@ -1761,6 +1682,7 @@ namespace {
}
ChunkedFolderContent ScanAndChunkFolder(
+ ThreadWorkers& Workers,
GetFolderContentStatistics& GetFolderContentStats,
ChunkingStatistics& ChunkingStats,
const std::filesystem::path& Path,
@@ -1777,7 +1699,7 @@ namespace {
Path,
std::move(IsAcceptedFolder),
std::move(IsAcceptedFile),
- GetIOWorkerPool(),
+ Workers.GetIOWorkerPool(),
GetUpdateDelayMS(ProgressMode),
[](bool, std::ptrdiff_t) {},
AbortFlag);
@@ -1787,7 +1709,8 @@ namespace {
}
FolderContent _;
- ChunkedFolderContent Result = GetLocalContent(GetFolderContentStats,
+ ChunkedFolderContent Result = GetLocalContent(Workers,
+ GetFolderContentStats,
ChunkingStats,
Path,
ZenStateFilePath(Path / ZenFolderName),
@@ -1830,7 +1753,8 @@ namespace {
std::vector<std::string> ExcludeWildcards;
};
- void DownloadFolder(StorageInstance& Storage,
+ void DownloadFolder(ThreadWorkers& Workers,
+ StorageInstance& Storage,
const Oid& BuildId,
const std::vector<Oid>& BuildPartIds,
std::span<const std::string> BuildPartNames,
@@ -1903,7 +1827,8 @@ namespace {
ChunkController = CreateStandardChunkingController(StandardChunkingControllerSettings{});
}
- LocalContent = GetLocalContent(LocalFolderScanStats,
+ LocalContent = GetLocalContent(Workers,
+ LocalFolderScanStats,
ChunkingStats,
Path,
ZenStateFilePath(Options.ZenFolderPath),
@@ -1969,8 +1894,8 @@ namespace {
Storage,
AbortFlag,
PauseFlag,
- GetIOWorkerPool(),
- GetNetworkPool(),
+ Workers.GetIOWorkerPool(),
+ Workers.GetNetworkPool(),
BuildId,
Path,
LocalContent,
@@ -2010,7 +1935,7 @@ namespace {
ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Verify, TaskSteps::StepCount);
- VerifyFolder(RemoteContent, RemoteLookup, Path, Options.PostDownloadVerify, VerifyFolderStats);
+ VerifyFolder(Workers, RemoteContent, RemoteLookup, Path, Options.PostDownloadVerify, VerifyFolderStats);
Stopwatch WriteStateTimer;
CbObject StateObject = CreateStateObject(BuildId, AllBuildParts, PartContents, LocalFolderState, Path);
@@ -2094,7 +2019,7 @@ namespace {
ProgressBar::SetLogOperationProgress(ProgressMode, TaskSteps::Cleanup, TaskSteps::StepCount);
- CleanAndRemoveDirectory(ZenTempFolder);
+ CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), ZenTempFolder);
}
void ListBuild(StorageInstance& Storage,
@@ -2178,7 +2103,10 @@ namespace {
}
}
- void DiffFolders(const std::filesystem::path& BasePath, const std::filesystem::path& ComparePath, bool OnlyChunked)
+ void DiffFolders(ThreadWorkers& Workers,
+ const std::filesystem::path& BasePath,
+ const std::filesystem::path& ComparePath,
+ bool OnlyChunked)
{
ZEN_TRACE_CPU("DiffFolders");
@@ -2246,7 +2174,8 @@ namespace {
GetFolderContentStatistics BaseGetFolderContentStats;
ChunkingStatistics BaseChunkingStats;
- BaseFolderContent = ScanAndChunkFolder(BaseGetFolderContentStats,
+ BaseFolderContent = ScanAndChunkFolder(Workers,
+ BaseGetFolderContentStats,
BaseChunkingStats,
BasePath,
IsAcceptedFolder,
@@ -2261,7 +2190,8 @@ namespace {
GetFolderContentStatistics CompareGetFolderContentStats;
ChunkingStatistics CompareChunkingStats;
- CompareFolderContent = ScanAndChunkFolder(CompareGetFolderContentStats,
+ CompareFolderContent = ScanAndChunkFolder(Workers,
+ CompareGetFolderContentStats,
CompareChunkingStats,
ComparePath,
IsAcceptedFolder,
@@ -3391,7 +3321,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
? MakeSafeAbsolutePath(std::filesystem::current_path()) / ZenFolderName
: MakeSafeAbsolutePath(m_ZenFolderPath);
CreateDirectories(ZenFolderPath);
- auto _ = MakeGuard([ZenFolderPath]() { CleanAndRemoveDirectory(ZenFolderPath); });
+ auto _ = MakeGuard([ZenFolderPath]() { CleanAndRemoveDirectory(GetSmallWorkerPool(EWorkloadType::Burst), ZenFolderPath); });
StorageInstance Storage = CreateBuildStorage(StorageStats,
StorageCacheStats,
@@ -3476,7 +3406,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
CreateDirectories(m_ZenFolderPath);
- auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); });
+ auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(GetSmallWorkerPool(EWorkloadType::Burst), m_ZenFolderPath); });
StorageInstance Storage = CreateBuildStorage(StorageStats,
StorageCacheStats,
@@ -3537,7 +3467,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
CreateDirectories(m_ZenFolderPath);
- auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); });
+ auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(GetSmallWorkerPool(EWorkloadType::Burst), m_ZenFolderPath); });
StorageInstance Storage = CreateBuildStorage(StorageStats,
StorageCacheStats,
@@ -3588,7 +3518,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
LogExecutableVersionAndPid();
}
- InitializeWorkerPools(m_BoostWorkerThreads);
+ ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded);
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
ZenState InstanceState;
@@ -3604,7 +3538,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
CreateDirectories(m_ZenFolderPath);
- auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); });
+ auto _ = MakeGuard([this, &Workers]() { CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_ZenFolderPath); });
StorageInstance Storage = CreateBuildStorage(StorageStats,
StorageCacheStats,
@@ -3633,7 +3567,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path TempDir = ZenTempFolderPath(m_ZenFolderPath);
- UploadFolder(Storage,
+ UploadFolder(Workers,
+ Storage,
BuildId,
BuildPartId,
m_BuildPartName,
@@ -3651,7 +3586,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
if (m_PostUploadVerify)
{
- ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
+ ValidateBuildPart(Workers, *Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
}
}
@@ -3705,7 +3640,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
LogExecutableVersionAndPid();
}
- InitializeWorkerPools(m_BoostWorkerThreads);
+ ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded);
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
ZenState InstanceState;
@@ -3758,7 +3697,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
EPartialBlockRequestMode PartialBlockRequestMode = ParseAllowPartialBlockRequests();
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId,
BuildPartIds,
BuildPartNames,
@@ -3830,12 +3770,16 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
LogExecutableVersionAndPid();
}
- InitializeWorkerPools(m_BoostWorkerThreads);
+ ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded);
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
ParsePath();
ParseDiffPath();
- DiffFolders(m_Path, m_DiffPath, m_OnlyChunked);
+ DiffFolders(Workers, m_Path, m_DiffPath, m_OnlyChunked);
if (AbortFlag)
{
throw std::runtime_error("Diff folders aborted");
@@ -3849,7 +3793,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
LogExecutableVersionAndPid();
}
- InitializeWorkerPools(m_BoostWorkerThreads);
+ ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded);
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
BuildStorageBase::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
@@ -3861,7 +3809,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
CreateDirectories(m_ZenFolderPath);
- auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); });
+ auto _ = MakeGuard([this, &Workers]() { CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_ZenFolderPath); });
StorageInstance Storage = CreateBuildStorage(StorageStats,
StorageCacheStats,
@@ -3895,7 +3843,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
Storage,
AbortFlag,
PauseFlag,
- GetNetworkPool(),
+ Workers.GetNetworkPool(),
BuildId,
AllBuildPartIds,
BuildsOperationPrimeCache::Options{.IsQuiet = IsQuiet,
@@ -3928,7 +3876,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
LogExecutableVersionAndPid();
}
- InitializeWorkerPools(m_BoostWorkerThreads);
+ ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded);
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
BuildStorageBase::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
@@ -3940,7 +3892,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
CreateDirectories(m_ZenFolderPath);
- auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); });
+ auto _ = MakeGuard([this, &Workers]() { CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_ZenFolderPath); });
StorageInstance Storage = CreateBuildStorage(StorageStats,
StorageCacheStats,
@@ -3977,7 +3929,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
LogExecutableVersionAndPid();
}
- InitializeWorkerPools(m_BoostWorkerThreads);
+ ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded);
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
ZenState InstanceState;
@@ -3991,7 +3947,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath);
CreateDirectories(m_ZenFolderPath);
- auto _ = MakeGuard([this]() { CleanAndRemoveDirectory(m_ZenFolderPath); });
+ auto _ = MakeGuard([this, &Workers]() { CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_ZenFolderPath); });
StorageInstance Storage = CreateBuildStorage(StorageStats,
StorageCacheStats,
@@ -4011,7 +3967,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const Oid BuildPartId = m_BuildPartName.empty() ? Oid::Zero : ParseBuildPartId();
- ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
+ ValidateBuildPart(Workers, *Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
if (AbortFlag)
{
@@ -4021,7 +3977,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_MultiTestDownloadOptions)
{
- InitializeWorkerPools(m_BoostWorkerThreads);
+ ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded);
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
m_SystemRootDir = (GetRunningExecutablePath().parent_path() / ".tmpzensystem").make_preferred();
CreateDirectories(m_SystemRootDir);
@@ -4056,7 +4016,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
throw OptionParseException(fmt::format("'--build-id' ('{}') is malformed", BuildIdString), SubOption->help());
}
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId,
{},
{},
@@ -4128,7 +4089,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_TestOptions)
{
- InitializeWorkerPools(m_BoostWorkerThreads);
+ ThreadWorkers Workers(m_BoostWorkerThreads, SingleThreaded);
+ if (!IsQuiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
m_SystemRootDir = (GetRunningExecutablePath().parent_path() / ".tmpzensystem").make_preferred();
CreateDirectories(m_SystemRootDir);
@@ -4140,7 +4105,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (m_OverrideHost.empty() && m_StoragePath.empty())
{
m_StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred();
- CleanAndRemoveDirectory(m_StoragePath);
+ CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), m_StoragePath);
CreateDirectories(m_StoragePath);
m_StoragePath = m_StoragePath.generic_string();
}
@@ -4161,10 +4126,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path DownloadPath2 = m_Path.parent_path() / (m_BuildPartName + "_test2");
const std::filesystem::path DownloadPath3 = m_Path.parent_path() / (m_BuildPartName + "_test3");
- auto ___ = MakeGuard([DownloadPath, DownloadPath2, DownloadPath3]() {
- CleanAndRemoveDirectory(DownloadPath);
- CleanAndRemoveDirectory(DownloadPath2);
- CleanAndRemoveDirectory(DownloadPath3);
+ auto ___ = MakeGuard([&Workers, DownloadPath, DownloadPath2, DownloadPath3]() {
+ CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), DownloadPath);
+ CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), DownloadPath2);
+ CleanAndRemoveDirectory(Workers.GetIOWorkerPool(), DownloadPath3);
});
if (m_ZenFolderPath.empty())
@@ -4212,7 +4177,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
const std::filesystem::path UploadTempDir = UploadTempDirectory(m_Path);
// std::filesystem::path UploadTempDir = m_ZenFolderPath / "upload_tmp";
- UploadFolder(Storage,
+ UploadFolder(Workers,
+ Storage,
BuildId,
BuildPartId,
m_BuildPartName,
@@ -4230,10 +4196,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw std::runtime_error("Test aborted. (Upload build)");
}
- ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
+ ValidateBuildPart(Workers, *Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}'", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId,
{BuildPartId},
{},
@@ -4254,7 +4221,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (identical target)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId,
{BuildPartId},
{},
@@ -4274,7 +4242,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw std::runtime_error("Test aborted. (Re-download identical target)");
}
- auto ScrambleDir = [](const std::filesystem::path& Path) {
+ auto ScrambleDir = [&Workers](const std::filesystem::path& Path) {
ZEN_CONSOLE("\nScrambling '{}'", Path);
Stopwatch Timer;
DirectoryContent DownloadContent;
@@ -4318,7 +4286,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SourceSize > 256)
{
Work.ScheduleWork(
- GetIOWorkerPool(),
+ Workers.GetIOWorkerPool(),
[SourceSize, FilePath = std::filesystem::path(FilePath)](std::atomic<bool>&) {
if (!AbortFlag)
{
@@ -4367,7 +4335,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ScrambleDir(DownloadPath);
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled target)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId,
{BuildPartId},
{},
@@ -4399,7 +4368,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
ZEN_CONSOLE("\nUpload scrambled Build {}, Part {} ({})\n{}\n", BuildId2, BuildPartId2, m_BuildPartName, SB.ToView());
}
- UploadFolder(Storage,
+ UploadFolder(Workers,
+ Storage,
BuildId2,
BuildPartId2,
m_BuildPartName,
@@ -4417,10 +4387,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
throw std::runtime_error("Test aborted. (Upload scrambled)");
}
- ValidateBuildPart(*Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
+ ValidateBuildPart(Workers, *Storage.BuildStorage, BuildId, BuildPartId, m_BuildPartName);
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath);
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId,
{BuildPartId},
{},
@@ -4441,7 +4412,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId2,
{BuildPartId2},
{},
@@ -4461,7 +4433,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nRe-download Build {}, Part {} ({}) to '{}' (scrambled)", BuildId2, BuildPartId2, m_BuildPartName, DownloadPath);
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId2,
{BuildPartId2},
{},
@@ -4481,7 +4454,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath2);
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId,
{BuildPartId},
{},
@@ -4501,7 +4475,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
ZEN_CONSOLE("\nDownload Build {}, Part {} ({}) to '{}' (original)", BuildId, BuildPartId, m_BuildPartName, DownloadPath3);
- DownloadFolder(Storage,
+ DownloadFolder(Workers,
+ Storage,
BuildId,
{BuildPartId},
{},
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 6df317823..fe8c0d675 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -9,6 +9,7 @@
#include <zencore/filesystem.h>
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
+#include <zencore/parallelwork.h>
#include <zencore/process.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
@@ -19,11 +20,17 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/httpclientauth.h>
#include <zenhttp/httpcommon.h>
+#include <zenremotestore/builds/buildstoragecache.h>
#include <zenremotestore/builds/buildstorageutil.h>
#include <zenremotestore/builds/jupiterbuildstorage.h>
#include <zenremotestore/jupiter/jupiterhost.h>
+#include <zenremotestore/operationlogoutput.h>
+#include <zenremotestore/projectstore/projectstoreoperations.h>
+#include <zenremotestore/projectstore/remoteprojectstore.h>
+#include <zenutil/workerpools.h>
#include "../progressbar.h"
+#include "../threadworkers.h"
ZEN_THIRD_PARTY_INCLUDES_START
#include <json11.hpp>
@@ -2212,9 +2219,6 @@ OplogDownloadCommand::OplogDownloadCommand()
m_Options.add_option("", "", "system-dir", "Specify system root", cxxopts::value(m_SystemRootDir), "<systemdir>");
- m_Options.add_option("output", "", "quiet", "Suppress non-essential output", cxxopts::value(m_Quiet), "<quiet>");
- m_Options.add_option("", "y", "yes", "Don't query for confirmation", cxxopts::value(m_Yes), "<yes>");
-
auto AddCloudOptions = [this](cxxopts::Options& Ops) {
m_AuthOptions.AddOptions(Ops);
@@ -2232,16 +2236,58 @@ OplogDownloadCommand::OplogDownloadCommand()
Ops.add_option("cloud build", "", "bucket", "Builds Storage bucket", cxxopts::value(m_Bucket), "<bucket>");
};
+ auto AddCacheOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("cache", "", "zen-cache-host", "Host ip and port for zen builds cache", cxxopts::value(m_ZenCacheHost), "<zenhost>");
+ };
+
AddCloudOptions(m_Options);
+ AddCacheOptions(m_Options);
+
+ auto AddOutputOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("", "y", "yes", "Don't query for confirmation", cxxopts::value(m_Yes), "<yes>");
+ Ops.add_option("output",
+ "",
+ "plain-progress",
+ "Show progress using plain output",
+ cxxopts::value(m_PlainProgress),
+ "<plainprogress>");
+ Ops.add_option("output",
+ "",
+ "log-progress",
+ "Write @progress style progress to output",
+ cxxopts::value(m_LogProgress),
+ "<logprogress>");
+ Ops.add_option("output", "", "verbose", "Enable verbose console output", cxxopts::value(m_Verbose), "<verbose>");
+ Ops.add_option("output", "", "quiet", "Suppress non-essential output", cxxopts::value(m_Quiet), "<quiet>");
+ };
+ AddOutputOptions(m_Options);
m_Options.add_option("", "", "build-id", "Build Id", cxxopts::value(m_BuildId), "<id>");
+
+ m_Options.add_option("", "", "force", "Force download and disregard local cache", cxxopts::value(m_ForceDownload), "<force>");
+ m_Options.add_option("",
+ "",
+ "boost-workers",
+ "Increase the number of worker threads - may cause computer to be less responsive",
+ cxxopts::value(m_BoostWorkerThreads),
+ "<boostworkers>");
+ m_Options.add_option("", "", "decompress", "Decompress downloaded attachment", cxxopts::value(m_DecompressAttachments), "<decompress>");
+
m_Options.add_option("",
"",
"output-path",
"Path to oplog output, extension .json or .cb (compact binary). Default is output to console",
- cxxopts::value(m_OutputPath),
+ cxxopts::value(m_OplogOutputPath),
"<path>");
+ m_Options.add_option("",
+ "",
+ "attachments",
+ "Comma separated list of attachments in RawHash for to download",
+ cxxopts::value(m_Attachments),
+ "<attachments>");
+ m_Options.add_option("", "", "attachments-path", "Path to folder to write attachments to", cxxopts::value(m_AttachmentsPath), "<path>");
+
m_Options.parse_positional({"cloud-url", "output-path"});
m_Options.positional_help("[<cloud-url> <output-path>]");
}
@@ -2274,6 +2320,49 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
};
ParseSystemOptions();
+ ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty;
+
+ auto ParseOutputOptions = [&]() {
+ if (m_Verbose && m_Quiet)
+ {
+ throw OptionParseException("'--verbose' conflicts with '--quiet'", m_Options.help());
+ }
+ if (m_LogProgress && m_PlainProgress)
+ {
+ throw OptionParseException("'--plain-progress' conflicts with '--log-progress'", m_Options.help());
+ }
+ if (m_LogProgress && m_Quiet)
+ {
+ throw OptionParseException("'--quiet' conflicts with '--log-progress'", m_Options.help());
+ }
+ if (m_PlainProgress && m_Quiet)
+ {
+ throw OptionParseException("'--quiet' conflicts with '--plain-progress'", m_Options.help());
+ }
+
+ if (m_LogProgress)
+ {
+ ProgressMode = ProgressBar::Mode::Log;
+ }
+ else if (m_PlainProgress)
+ {
+ ProgressMode = ProgressBar::Mode::Plain;
+ }
+ else if (m_Verbose)
+ {
+ ProgressMode = ProgressBar::Mode::Plain;
+ }
+ else if (m_Quiet)
+ {
+ ProgressMode = ProgressBar::Mode::Quiet;
+ }
+ else
+ {
+ ProgressMode = ProgressBar::Mode::Pretty;
+ }
+ };
+ ParseOutputOptions();
+
auto ParseStorageOptions = [&](bool RequireNamespace, bool RequireBucket) {
if (!m_Url.empty())
{
@@ -2316,13 +2405,25 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
}
};
+ ParseStorageOptions(/*RequireNamespace*/ true, /*RequireBucket*/ true);
+
+ ThreadWorkers Workers(m_BoostWorkerThreads, /*SingleThreaded*/ false);
+ if (!m_Quiet)
+ {
+ ZEN_CONSOLE("{}", Workers.GetWorkersInfo());
+ }
+
std::unique_ptr<AuthMgr> Auth;
HttpClientSettings ClientSettings{.LogCategory = "httpbuildsclient",
.AssumeHttp2 = m_AssumeHttp2,
.AllowResume = true,
.RetryCount = 2};
- ParseStorageOptions(/*RequireNamespace*/ true, /*RequireBucket*/ true);
+ Oid BuildId = Oid::TryFromHexString(m_BuildId);
+ if (BuildId == Oid::Zero)
+ {
+ throw OptionParseException(fmt::format("'--build-id' ('{}') is malformed", m_BuildId), m_Options.help());
+ }
m_AuthOptions.ParseOptions(m_Options,
m_SystemRootDir,
@@ -2332,138 +2433,141 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
m_Quiet,
/*Hidden*/ false);
- BuildStorageResolveResult ResolveRes = ResolveBuildStorage(ClientSettings, m_Host, m_OverrideHost, ""sv, ZenCacheResolveMode::Off);
+ BuildStorageResolveResult ResolveRes =
+ ResolveBuildStorage(ClientSettings, m_Host, m_OverrideHost, m_ZenCacheHost, ZenCacheResolveMode::Discovery);
-#if 0
- std::string BuildStorageName = ZEN_CLOUD_STORAGE;
+ BuildStorageBase::Statistics StorageStats;
- std::string CloudHost;
+ StorageInstance Storage;
- if (m_OverrideHost.empty())
- {
- JupiterServerDiscovery Response = DiscoverJupiterEndpoints(m_Host, ClientSettings);
+ Storage.BuildStorageHttp = std::make_unique<HttpClient>(ResolveRes.HostUrl, ClientSettings);
- if (Response.ServerEndPoints.empty())
- {
- throw std::runtime_error(fmt::format("Failed to find any builds hosts at {}", m_Host));
- }
- for (const JupiterServerDiscovery::EndPoint& ServerEndpoint : Response.ServerEndPoints)
- {
- if (!ServerEndpoint.BaseUrl.empty())
- {
- if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(ServerEndpoint.BaseUrl, ServerEndpoint.AssumeHttp2);
- TestResult.Success)
- {
- CloudHost = ServerEndpoint.BaseUrl;
- m_AssumeHttp2 = ServerEndpoint.AssumeHttp2;
- BuildStorageName = ServerEndpoint.Name;
- break;
- }
- else
- {
- ZEN_DEBUG("Unable to reach host {}. Reason: {}", ServerEndpoint.BaseUrl, TestResult.FailureReason);
- }
- }
- }
- if (CloudHost.empty())
- {
- throw std::runtime_error(
- fmt::format("Failed to find any usable builds hosts out of {} using {}", Response.ServerEndPoints.size(), m_Host));
- }
- }
- else if (JupiterEndpointTestResult TestResult = TestJupiterEndpoint(m_OverrideHost, m_AssumeHttp2); TestResult.Success)
- {
- CloudHost = m_OverrideHost;
- }
- else
- {
- throw std::runtime_error(fmt::format("Host {} could not be reached. Reason: {}", m_OverrideHost, TestResult.FailureReason));
- }
-#endif // 0
+ BuildStorageCache::Statistics StorageCacheStats;
- Oid BuildId = Oid::TryFromHexString(m_BuildId);
- if (BuildId == Oid::Zero)
+ std::atomic<bool> AbortFlag(false);
+
+ if (!ResolveRes.CacheUrl.empty())
{
- throw OptionParseException("'--build-id' is malformed", m_Options.help());
+ Storage.CacheHttp = std::make_unique<HttpClient>(ResolveRes.CacheUrl,
+ HttpClientSettings{.LogCategory = "httpcacheclient",
+ .ConnectTimeout = std::chrono::milliseconds{3000},
+ .Timeout = std::chrono::milliseconds{30000},
+ .AssumeHttp2 = ResolveRes.CacheAssumeHttp2,
+ .AllowResume = true,
+ .RetryCount = 0},
+ [&AbortFlag]() { return AbortFlag.load(); });
+ Storage.CacheName = ResolveRes.CacheName;
}
- BuildStorageBase::Statistics StorageStats;
- HttpClient BuildStorageHttp(ResolveRes.HostUrl, ClientSettings);
-
if (!m_Quiet)
{
std::string StorageDescription =
- fmt::format("Cloud {}{}. Namespace '{}', Bucket '{}'",
+ fmt::format("Cloud {}{}. SessionId {}. Namespace '{}', Bucket '{}'",
ResolveRes.HostName,
(ResolveRes.HostUrl == ResolveRes.HostName) ? "" : fmt::format(" {}", ResolveRes.HostUrl),
+ Storage.BuildStorageHttp->GetSessionId(),
m_Namespace,
m_Bucket);
ZEN_CONSOLE("Remote: {}", StorageDescription);
- }
-
- std::filesystem::path StorageTempPath = std::filesystem::temp_directory_path() / ("zen_" + Oid::NewOid().ToString());
- std::unique_ptr<BuildStorageBase> BuildStorage =
- CreateJupiterBuildStorage(Log(), BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath);
+ if (Storage.CacheHttp)
+ {
+ std::string CacheDescription =
+ fmt::format("Zen {}{}. SessionId {}. Namespace '{}', Bucket '{}'",
+ ResolveRes.CacheName,
+ (ResolveRes.CacheUrl == ResolveRes.CacheName) ? "" : fmt::format(" {}", ResolveRes.CacheUrl),
+ Storage.CacheHttp->GetSessionId(),
+ m_Namespace,
+ m_Bucket);
- Stopwatch Timer;
- CbObject BuildObject = BuildStorage->GetBuild(BuildId);
- if (!m_Quiet)
- {
- ZEN_CONSOLE("Fetched {}/{}/{}/{} in {}", m_Url, m_Namespace, m_Bucket, BuildId, NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ ZEN_CONSOLE("Cache : {}", CacheDescription);
+ }
}
- Timer.Reset();
+ std::string FullBuildKey = fmt::format("{}_{}_{}", m_Namespace, m_Bucket, m_BuildId);
+ IoHash FullBuildKeyHash = IoHash::HashBuffer(FullBuildKey.data(), FullBuildKey.length());
- CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
- if (!PartsObject)
- {
- throw std::runtime_error(
- fmt::format("The build {}/{}/{}/{} payload does not contain a 'parts' object"sv, m_Url, m_Namespace, m_Bucket, m_BuildId));
- }
+ std::filesystem::path StorageTempPath = std::filesystem::temp_directory_path() / ("zen_" + FullBuildKeyHash.ToHexString());
- static const std::string_view OplogContainerPartName = "oplogcontainer"sv;
+ Storage.BuildStorage =
+ CreateJupiterBuildStorage(Log(), *Storage.BuildStorageHttp, StorageStats, m_Namespace, m_Bucket, m_AllowRedirect, StorageTempPath);
+ Storage.StorageName = ResolveRes.HostName;
- const Oid OplogBuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
- if (OplogBuildPartId == Oid::Zero)
+ if (Storage.CacheHttp)
{
- throw std::runtime_error(fmt::format("The build {}/{}/{}/{} payload 'parts' object does not contain a '{}' entry"sv,
- m_Url,
- m_Namespace,
- m_Bucket,
- m_BuildId,
- OplogContainerPartName));
+ Storage.BuildCacheStorage = CreateZenBuildStorageCache(
+ *Storage.CacheHttp,
+ StorageCacheStats,
+ m_Namespace,
+ m_Bucket,
+ StorageTempPath / "zencache",
+ false ? GetSmallWorkerPool(EWorkloadType::Background) : GetTinyWorkerPool(EWorkloadType::Background));
}
- CbObject ContainerObject = BuildStorage->GetBuildPart(BuildId, OplogBuildPartId);
+ std::unique_ptr<OperationLogOutput> OperationLogOutput(CreateConsoleLogOutput(ProgressMode));
- MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
- IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
- IoBuffer SectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
+ ProjectStoreOperationOplogState State(
+ *OperationLogOutput,
+ Storage,
+ BuildId,
+ {.IsQuiet = m_Quiet, .IsVerbose = m_Verbose, .ForceDownload = m_ForceDownload, .TempFolderPath = StorageTempPath});
- CbValidateError ValidateResult = CbValidateError::None;
- if (CbObject SectionObject = ValidateAndReadCompactBinaryObject(std::move(SectionPayload), ValidateResult);
- ValidateResult == CbValidateError::None && ContainerObject)
+ const Oid OplogBuildPartId = State.GetBuildPartId();
+
+ if (!m_Attachments.empty())
{
- if (!m_Quiet)
+ if (m_AttachmentsPath.empty())
{
- ZEN_CONSOLE("Decompressed and validated oplog payload {} -> {} in {}",
- NiceBytes(OpsSection.GetSize()),
- NiceBytes(SectionObject.GetSize()),
- NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ throw OptionParseException("'--attachments-path' is required when '--attachments' is given", m_Options.help());
}
- if (m_OutputPath.empty())
+ std::filesystem::path AttachmentsPath = MakeSafeAbsolutePath(m_AttachmentsPath);
+ CreateDirectories(AttachmentsPath);
+ std::vector<IoHash> AttachmentHashes;
+ AttachmentHashes.reserve(m_Attachments.size());
+ for (const std::string& Attachment : m_Attachments)
+ {
+ IoHash RawHash;
+ if (!IoHash::TryParse(Attachment, RawHash))
+ {
+ throw OptionParseException(fmt::format("'--attachments' ('{}') is malformed", Attachment), m_Options.help());
+ }
+ AttachmentHashes.push_back(RawHash);
+ }
+
+ std::atomic<bool> PauseFlag;
+ ProjectStoreOperationDownloadAttachments Op(*OperationLogOutput,
+ Storage,
+ AbortFlag,
+ PauseFlag,
+ Workers.GetIOWorkerPool(),
+ Workers.GetNetworkPool(),
+ State,
+ AttachmentHashes,
+ {.IsQuiet = m_Quiet,
+ .IsVerbose = m_Verbose,
+ .ForceDownload = m_ForceDownload,
+ .DecompressAttachments = m_DecompressAttachments,
+ .TempFolderPath = StorageTempPath,
+ .AttachmentOutputPath = m_AttachmentsPath});
+
+ Op.Execute();
+ }
+ else
+ {
+ CbObjectView OpsSectionObject = State.LoadOpsSectionObject();
+
+ if (m_OplogOutputPath.empty())
{
if (!m_Yes)
{
- if (OpsSection.GetSize() > 8u * 1024u * 1024u)
+ if (OpsSectionObject.GetSize() > 8u * 1024u * 1024u)
{
while (!m_Yes)
{
const std::string Prompt = fmt::format("Do you want to output an oplog of size {} to console? (yes/no) ",
- NiceBytes(SectionObject.GetSize()));
+ NiceBytes(OpsSectionObject.GetSize()));
printf("%s", Prompt.c_str());
std::string Reponse;
std::getline(std::cin, Reponse);
@@ -2480,7 +2584,7 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
}
}
ExtendableStringBuilder<1024> SB;
- SectionObject.ToJson(SB);
+ OpsSectionObject.ToJson(SB);
ForEachStrTok(SB.ToView(), '\n', [](std::string_view Row) {
ZEN_CONSOLE("{}", Row);
return true;
@@ -2488,17 +2592,17 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
}
else
{
- Timer.Reset();
- const std::string Extension = ToLower(m_OutputPath.extension().string());
+ Stopwatch Timer;
+ const std::string Extension = ToLower(m_OplogOutputPath.extension().string());
if (Extension == ".cb" || Extension == ".cbo")
{
- WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, SectionObject.GetView().GetData(), SectionObject.GetSize()));
+ WriteFile(m_OplogOutputPath, IoBuffer(IoBuffer::Wrap, OpsSectionObject.GetView().GetData(), OpsSectionObject.GetSize()));
}
else if (Extension == ".json")
{
ExtendableStringBuilder<1024> SB;
- SectionObject.ToJson(SB);
- WriteFile(m_OutputPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
+ OpsSectionObject.ToJson(SB);
+ WriteFile(m_OplogOutputPath, IoBuffer(IoBuffer::Wrap, SB.Data(), SB.Size()));
}
else
{
@@ -2507,17 +2611,12 @@ OplogDownloadCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** a
if (!m_Quiet)
{
ZEN_CONSOLE("Wrote {} to '{}' in {}",
- NiceBytes(FileSizeFromPath(m_OutputPath)),
- m_OutputPath,
+ NiceBytes(FileSizeFromPath(m_OplogOutputPath)),
+ m_OplogOutputPath,
NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
}
}
}
- else
- {
- throw std::runtime_error(
- fmt::format("Failed to parse oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult)));
- }
}
} // namespace zen
diff --git a/src/zen/cmds/projectstore_cmd.h b/src/zen/cmds/projectstore_cmd.h
index 58002a533..6141a7bce 100644
--- a/src/zen/cmds/projectstore_cmd.h
+++ b/src/zen/cmds/projectstore_cmd.h
@@ -288,8 +288,11 @@ private:
std::filesystem::path m_SystemRootDir;
- bool m_Quiet = false;
- bool m_Yes = false;
+ bool m_Quiet = false;
+ bool m_Verbose = false;
+ bool m_Yes = false;
+ bool m_PlainProgress = false;
+ bool m_LogProgress = false;
AuthCommandLineOptions m_AuthOptions;
@@ -299,11 +302,18 @@ private:
std::string m_Url;
bool m_AssumeHttp2 = false;
bool m_AllowRedirect = false;
+ std::string m_ZenCacheHost;
std::string m_Namespace;
std::string m_Bucket;
std::string m_BuildId;
+ bool m_ForceDownload = false;
+ bool m_BoostWorkerThreads = false;
+
+ std::filesystem::path m_OplogOutputPath;
+ bool m_DecompressAttachments = true;
- std::filesystem::path m_OutputPath;
+ std::vector<std::string> m_Attachments;
+ std::filesystem::path m_AttachmentsPath;
};
} // namespace zen
diff --git a/src/zen/threadworkers.cpp b/src/zen/threadworkers.cpp
new file mode 100644
index 000000000..2220b9c95
--- /dev/null
+++ b/src/zen/threadworkers.cpp
@@ -0,0 +1,79 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "threadworkers.h"
+
+#include <zencore/intmath.h>
+#include <zencore/logging.h>
+#include <zencore/thread.h>
+#include <zenutil/workerpools.h>
+
+//////////////////////////////////////////////////////////////////////////
+
+namespace zen {
+
+ThreadWorkers::ThreadWorkers(bool BoostWorkers, bool SingleThreaded)
+{
+ if (SingleThreaded)
+ {
+ m_NetworkPool = &GetSyncWorkerPool();
+ m_IOPool = &GetSyncWorkerPool();
+ WorkersInfo = "Not using thread workers for network or I/O, expect no progress reporting";
+ }
+ else
+ {
+ unsigned int Cores = GetHardwareConcurrency();
+ if (BoostWorkers)
+ {
+ // Cores Net Disk
+ // 2 4 3
+ // 4 6 3
+ // 8 11 6
+ // 16 16 12
+ // 32 16 24
+ // 64 16 32
+ // n 16 32
+
+ unsigned int NetworkThreadCount = Cores + (Cores / 4u) + Max(1u, (Cores / 8u));
+ NetworkThreadCount = Min(NetworkThreadCount, 16u);
+ NetworkThreadCount = Max(NetworkThreadCount, 4u);
+
+ unsigned int IOThreadCount = Cores - (Cores / 4u);
+ IOThreadCount = Min(IOThreadCount, 32u);
+ IOThreadCount = Max(IOThreadCount, 3u);
+
+ m_NetworkPoolInstance = std::make_unique<WorkerThreadPool>(NetworkThreadCount, "net");
+ m_IOPoolInstance = std::make_unique<WorkerThreadPool>(IOThreadCount, "disk");
+
+ WorkersInfo =
+ fmt::format("Worker boost enabled, using {} network threads and {} worker/IO threads", NetworkThreadCount, IOThreadCount);
+ }
+ else
+ {
+ // Cores Net Disk
+ // 2 2 2
+ // 4 3 2
+ // 8 5 4
+ // 16 10 9
+ // 32 12 18
+ // 64 12 20
+ // n 12 20
+
+ unsigned int NetworkThreadCount = (Cores / 2u) + Max(1u, (Cores / 8u));
+ NetworkThreadCount = Min(NetworkThreadCount, 12u);
+ NetworkThreadCount = Max(NetworkThreadCount, 2u);
+
+ unsigned int IOThreadCount = Cores / 2u + Cores / 16u;
+ IOThreadCount = Min(IOThreadCount, 20u);
+ IOThreadCount = Max(IOThreadCount, 2u);
+
+ m_NetworkPoolInstance = std::make_unique<WorkerThreadPool>(NetworkThreadCount, "net");
+ m_IOPoolInstance = std::make_unique<WorkerThreadPool>(IOThreadCount, "disk");
+
+ WorkersInfo = fmt::format("Using {} network threads and {} worker/IO threads", NetworkThreadCount, IOThreadCount);
+ }
+ m_NetworkPool = m_NetworkPoolInstance.get();
+ m_IOPool = m_IOPoolInstance.get();
+ }
+}
+
+} // namespace zen
diff --git a/src/zen/threadworkers.h b/src/zen/threadworkers.h
new file mode 100644
index 000000000..bb80650a2
--- /dev/null
+++ b/src/zen/threadworkers.h
@@ -0,0 +1,43 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/timer.h>
+#include <zencore/zencore.h>
+
+#include <memory>
+#include <string>
+
+namespace zen {
+
+class WorkerThreadPool;
+
+class ThreadWorkers
+{
+public:
+ ThreadWorkers(bool BoostWorkers, bool SingleThreaded);
+
+ WorkerThreadPool& GetIOWorkerPool()
+ {
+ ZEN_ASSERT(m_IOPool);
+ return *m_IOPool;
+ }
+ WorkerThreadPool& GetNetworkPool()
+ {
+ ZEN_ASSERT(m_NetworkPool);
+ return *m_NetworkPool;
+ }
+
+ const std::string& GetWorkersInfo() const { return WorkersInfo; }
+
+private:
+ WorkerThreadPool* m_NetworkPool = nullptr;
+ WorkerThreadPool* m_IOPool = nullptr;
+
+ std::unique_ptr<WorkerThreadPool> m_NetworkPoolInstance;
+ std::unique_ptr<WorkerThreadPool> m_IOPoolInstance;
+
+ std::string WorkersInfo;
+};
+
+} // namespace zen
diff --git a/src/zencore/basicfile.cpp b/src/zencore/basicfile.cpp
index 6989da67e..2fa02937d 100644
--- a/src/zencore/basicfile.cpp
+++ b/src/zencore/basicfile.cpp
@@ -513,6 +513,30 @@ TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, MemoryView Data,
}
}
+void
+TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, const CompositeBuffer& Data)
+{
+ std::error_code Ec;
+ SafeWriteFile(Path, Data, Ec);
+ if (Ec)
+ {
+ throw std::system_error(Ec, fmt::format("Failed to safely write file '{}'", Path));
+ }
+}
+
+void
+TemporaryFile::SafeWriteFile(const std::filesystem::path& Path, const CompositeBuffer& Data, std::error_code& OutEc)
+{
+ TemporaryFile TempFile;
+ if (TempFile.CreateTemporary(Path.parent_path(), OutEc); !OutEc)
+ {
+ if (TempFile.Write(Data, 0, OutEc); !OutEc)
+ {
+ TempFile.MoveTemporaryIntoPlace(Path, OutEc);
+ }
+ }
+}
+
//////////////////////////////////////////////////////////////////////////
LockFile::LockFile()
diff --git a/src/zencore/include/zencore/basicfile.h b/src/zencore/include/zencore/basicfile.h
index 465499d2b..f5c82b8fe 100644
--- a/src/zencore/include/zencore/basicfile.h
+++ b/src/zencore/include/zencore/basicfile.h
@@ -107,6 +107,8 @@ public:
static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data);
static void SafeWriteFile(const std::filesystem::path& Path, MemoryView Data, std::error_code& OutEc);
+ static void SafeWriteFile(const std::filesystem::path& Path, const CompositeBuffer& Data);
+ static void SafeWriteFile(const std::filesystem::path& Path, const CompositeBuffer& Data, std::error_code& OutEc);
private:
void Close();
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h b/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h
new file mode 100644
index 000000000..2ddd0f11d
--- /dev/null
+++ b/src/zenremotestore/include/zenremotestore/projectstore/projectstoreoperations.h
@@ -0,0 +1,106 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#pragma once
+
+#include <zencore/iohash.h>
+#include <zencore/logging.h>
+#include <zencore/uid.h>
+#include <zencore/zencore.h>
+#include <zenremotestore/builds/buildstoragecache.h>
+#include <zenremotestore/chunking/chunkedcontent.h>
+#include <zenutil/bufferedwritefilecache.h>
+
+#include <atomic>
+#include <memory>
+
+ZEN_THIRD_PARTY_INCLUDES_START
+#include <tsl/robin_set.h>
+ZEN_THIRD_PARTY_INCLUDES_END
+
+namespace zen {
+
+class BuildStorageBase;
+class OperationLogOutput;
+struct StorageInstance;
+
+class ProjectStoreOperationOplogState
+{
+public:
+ struct Options
+ {
+ bool IsQuiet = false;
+ bool IsVerbose = false;
+ bool ForceDownload = false;
+ std::filesystem::path TempFolderPath;
+ };
+
+ ProjectStoreOperationOplogState(OperationLogOutput& OperationLogOutput,
+ StorageInstance& Storage,
+ const Oid& BuildId,
+ const Options& Options);
+
+ CbObjectView LoadBuildObject();
+ CbObjectView LoadBuildPartsObject();
+ CbObjectView LoadOpsSectionObject();
+ CbArrayView LoadBlocksArray();
+ CbArrayView LoadChunkedFilesArray();
+ CbArrayView LoadChunksArray();
+ CbArray LoadArrayFromBuildPart(std::string_view ArrayName);
+
+ const Oid& GetBuildId();
+ const Oid& GetBuildPartId();
+
+private:
+ OperationLogOutput& m_LogOutput;
+ StorageInstance& m_Storage;
+ const Oid m_BuildId;
+ const Options m_Options;
+
+ Oid m_BuildPartId;
+ CbObject m_BuildObject;
+ CbObject m_BuildPartsObject;
+ CbObject m_OpsSectionObject;
+ CbArray m_BlocksArray;
+ CbArray m_ChunkedFilesArray;
+ CbArray m_ChunksArray;
+};
+
+class ProjectStoreOperationDownloadAttachments
+{
+public:
+ struct Options
+ {
+ bool IsQuiet = false;
+ bool IsVerbose = false;
+ bool ForceDownload = false;
+ bool DecompressAttachments = true;
+ std::filesystem::path TempFolderPath;
+ std::filesystem::path AttachmentOutputPath;
+ };
+
+ ProjectStoreOperationDownloadAttachments(OperationLogOutput& OperationLogOutput,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& IOWorkerPool,
+ WorkerThreadPool& NetworkPool,
+ ProjectStoreOperationOplogState& State,
+ std::span<const IoHash> AttachmentHashes,
+ const Options& Options);
+
+ void Execute();
+
+private:
+ OperationLogOutput& m_LogOutput;
+ StorageInstance& m_Storage;
+ std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
+ WorkerThreadPool& m_IOWorkerPool;
+ WorkerThreadPool& m_NetworkPool;
+
+ ProjectStoreOperationOplogState& m_State;
+ const tsl::robin_set<IoHash, IoHash::Hasher> m_AttachmentHashes;
+ const Options m_Options;
+};
+
+} // namespace zen
diff --git a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
index 4740e7029..de858f818 100644
--- a/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
+++ b/src/zenremotestore/include/zenremotestore/projectstore/remoteprojectstore.h
@@ -180,6 +180,8 @@ RemoteProjectStore::Result LoadOplog(CidStore& ChunkStore,
std::vector<IoHash> GetBlockHashesFromOplog(CbObjectView ContainerObject);
std::vector<ThinChunkBlockDescription> GetBlocksFromOplog(CbObjectView ContainerObject, std::span<const IoHash> IncludeBlockHashes);
+ChunkedInfo ReadChunkedInfo(CbObjectView ChunkedFile);
+
void remoteprojectstore_forcelink();
} // namespace zen
diff --git a/src/zenremotestore/projectstore/projectstoreoperations.cpp b/src/zenremotestore/projectstore/projectstoreoperations.cpp
new file mode 100644
index 000000000..7dd85531c
--- /dev/null
+++ b/src/zenremotestore/projectstore/projectstoreoperations.cpp
@@ -0,0 +1,917 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include <zenremotestore/projectstore/projectstoreoperations.h>
+
+#include <zencore/compactbinaryutil.h>
+#include <zencore/parallelwork.h>
+#include <zencore/scopeguard.h>
+#include <zencore/timer.h>
+#include <zenremotestore/builds/buildstorageutil.h>
+#include <zenremotestore/chunking/chunkedfile.h>
+#include <zenremotestore/operationlogoutput.h>
+#include <zenremotestore/projectstore/remoteprojectstore.h>
+
+namespace zen {
+
+using namespace std::literals;
+
+//////////////////////////// ProjectStoreOperationOplogState
+
+ProjectStoreOperationOplogState::ProjectStoreOperationOplogState(OperationLogOutput& OperationLogOutput,
+ StorageInstance& Storage,
+ const Oid& BuildId,
+ const Options& Options)
+: m_LogOutput(OperationLogOutput)
+, m_Storage(Storage)
+, m_BuildId(BuildId)
+, m_Options(Options)
+{
+}
+
+CbObjectView
+ProjectStoreOperationOplogState::LoadBuildObject()
+{
+ if (!m_BuildObject)
+ {
+ const std::filesystem::path CachedBuildObjectPath = m_Options.TempFolderPath / "build.cbo";
+ if (!m_Options.ForceDownload && IsFile(CachedBuildObjectPath))
+ {
+ Stopwatch Timer;
+ CbValidateError Error;
+ m_BuildObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildObjectPath), Error);
+ if (Error != CbValidateError::None)
+ {
+ RemoveFile(CachedBuildObjectPath);
+ m_BuildObject = {};
+ }
+ else
+ {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Read build {} from locally cached file in {}",
+ m_BuildId,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ return m_BuildObject;
+ }
+ }
+
+ Stopwatch Timer;
+ m_BuildObject = m_Storage.BuildStorage->GetBuild(m_BuildId);
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Fetched build {} from {} in {}",
+ m_BuildId,
+ m_Storage.BuildStorageHttp->GetBaseUri(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ CreateDirectories(CachedBuildObjectPath.parent_path());
+ TemporaryFile::SafeWriteFile(CachedBuildObjectPath, m_BuildObject.GetBuffer().GetView());
+ }
+ return m_BuildObject;
+}
+
+const Oid&
+ProjectStoreOperationOplogState::GetBuildId()
+{
+ return m_BuildId;
+}
+
+const Oid&
+ProjectStoreOperationOplogState::GetBuildPartId()
+{
+ if (m_BuildPartId == Oid::Zero)
+ {
+ CbObjectView BuildObject = LoadBuildObject();
+ CbObjectView PartsObject = BuildObject["parts"sv].AsObjectView();
+ if (!PartsObject)
+ {
+ throw std::runtime_error(fmt::format("The build {} payload does not contain a 'parts' object"sv, m_BuildId));
+ }
+ static const std::string_view OplogContainerPartName = "oplogcontainer"sv;
+ m_BuildPartId = PartsObject[OplogContainerPartName].AsObjectId();
+ if (m_BuildPartId == Oid::Zero)
+ {
+ throw std::runtime_error(
+ fmt::format("The build {} payload 'parts' object does not contain a '{}' entry"sv, m_BuildId, OplogContainerPartName));
+ }
+ }
+ return m_BuildPartId;
+}
+
+CbObjectView
+ProjectStoreOperationOplogState::LoadBuildPartsObject()
+{
+ if (!m_BuildPartsObject)
+ {
+ const Oid BuildPartId = GetBuildPartId();
+ const std::filesystem::path CachedBuildPartObjectPath = m_Options.TempFolderPath / fmt::format("{}_part.cbo", BuildPartId);
+ if (!m_Options.ForceDownload && IsFile(CachedBuildPartObjectPath))
+ {
+ Stopwatch Timer;
+ CbValidateError Error;
+ m_BuildPartsObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedBuildPartObjectPath), Error);
+ if (Error != CbValidateError::None)
+ {
+ RemoveFile(CachedBuildPartObjectPath);
+ m_BuildPartsObject = {};
+ }
+ else
+ {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Read build part {}/{} from locally cached file in {}",
+ m_BuildId,
+ BuildPartId,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ return m_BuildPartsObject;
+ }
+ }
+
+ Stopwatch Timer;
+ m_BuildPartsObject = m_Storage.BuildStorage->GetBuildPart(m_BuildId, BuildPartId);
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Fetched build part {}/{} from {} in {}",
+ m_BuildId,
+ BuildPartId,
+ m_Storage.BuildStorageHttp->GetBaseUri(),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ CreateDirectories(CachedBuildPartObjectPath.parent_path());
+ TemporaryFile::SafeWriteFile(CachedBuildPartObjectPath, m_BuildPartsObject.GetBuffer().GetView());
+ }
+ return m_BuildPartsObject;
+}
+
+CbObjectView
+ProjectStoreOperationOplogState::LoadOpsSectionObject()
+{
+ if (!m_OpsSectionObject)
+ {
+ const Oid BuildPartId = GetBuildPartId();
+ const std::filesystem::path CachedOpsSectionPath = m_Options.TempFolderPath / fmt::format("{}_part_ops.cbo", BuildPartId);
+ if (!m_Options.ForceDownload && IsFile(CachedOpsSectionPath))
+ {
+ Stopwatch Timer;
+ CbValidateError Error;
+ m_OpsSectionObject = ValidateAndReadCompactBinaryObject(IoBufferBuilder::MakeFromFile(CachedOpsSectionPath), Error);
+ if (Error != CbValidateError::None)
+ {
+ RemoveFile(CachedOpsSectionPath);
+ m_OpsSectionObject = {};
+ }
+ else if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Read {}/{}/ops from locally cached file in {}",
+ BuildPartId,
+ m_BuildId,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ return m_OpsSectionObject;
+ }
+ }
+ CbObjectView ContainerObject = LoadBuildPartsObject();
+
+ Stopwatch Timer;
+
+ MemoryView OpsSection = ContainerObject["ops"sv].AsBinaryView();
+ IoBuffer OpsBuffer(IoBuffer::Wrap, OpsSection.GetData(), OpsSection.GetSize());
+ IoBuffer OpsSectionPayload = CompressedBuffer::FromCompressedNoValidate(std::move(OpsBuffer)).Decompress().AsIoBuffer();
+
+ CbValidateError ValidateResult = CbValidateError::None;
+ m_OpsSectionObject = ValidateAndReadCompactBinaryObject(std::move(OpsSectionPayload), ValidateResult);
+ if (ValidateResult != CbValidateError::None)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed to parse oplog container: '{}' ('{}')", "Section has unexpected data type", ToString(ValidateResult)));
+ }
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Decompressed and validated oplog payload {} -> {} in {}",
+ NiceBytes(OpsSection.GetSize()),
+ NiceBytes(m_OpsSectionObject.GetSize()),
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ if (m_OpsSectionObject)
+ {
+ CreateDirectories(CachedOpsSectionPath.parent_path());
+ TemporaryFile::SafeWriteFile(CachedOpsSectionPath, m_OpsSectionObject.GetBuffer().GetView());
+ }
+ }
+ return m_OpsSectionObject;
+}
+
+CbArray
+ProjectStoreOperationOplogState::LoadArrayFromBuildPart(std::string_view ArrayName)
+{
+ const Oid BuildPartId = GetBuildPartId();
+ const std::filesystem::path CachedPartArrayPath = m_Options.TempFolderPath / fmt::format("{}_part_{}.cba", BuildPartId, ArrayName);
+ if (!m_Options.ForceDownload && IsFile(CachedPartArrayPath))
+ {
+ Stopwatch Timer;
+ IoBuffer Payload = IoBufferBuilder::MakeFromFile(CachedPartArrayPath);
+ CbValidateError Error = ValidateCompactBinary(Payload.GetView(), CbValidateMode::Default);
+ if (Error != CbValidateError::None)
+ {
+ RemoveFile(CachedPartArrayPath);
+ }
+ else
+ {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Read {}/{}/{} from locally cached file in {}",
+ BuildPartId,
+ m_BuildId,
+ ArrayName,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ CbArray Result = CbArray(SharedBuffer(std::move(Payload)));
+ return Result;
+ }
+ }
+
+ CbObjectView ContainerObject = LoadBuildPartsObject();
+
+ Stopwatch Timer;
+
+ CbArrayView ArrayView = ContainerObject[ArrayName].AsArrayView();
+
+ CbArray Result;
+ {
+ CbWriter Writer;
+ Writer << ArrayView;
+ SharedBuffer ArrayBuffer = SharedBuffer::MakeView(Writer.Save().GetView()).MakeOwned();
+ Result = CbArray(std::move(ArrayBuffer));
+ }
+
+ CreateDirectories(CachedPartArrayPath.parent_path());
+ TemporaryFile::SafeWriteFile(CachedPartArrayPath, Result.GetBuffer().GetView());
+
+ return Result;
+}
+
+CbArrayView
+ProjectStoreOperationOplogState::LoadBlocksArray()
+{
+ if (!m_BlocksArray)
+ {
+ m_BlocksArray = LoadArrayFromBuildPart("blocks"sv);
+ }
+ return m_BlocksArray;
+}
+
+CbArrayView
+ProjectStoreOperationOplogState::LoadChunkedFilesArray()
+{
+ if (!m_ChunkedFilesArray)
+ {
+ m_ChunkedFilesArray = LoadArrayFromBuildPart("chunkedfiles"sv);
+ }
+ return m_ChunkedFilesArray;
+}
+
+CbArrayView
+ProjectStoreOperationOplogState::LoadChunksArray()
+{
+ if (!m_ChunksArray)
+ {
+ m_ChunksArray = LoadArrayFromBuildPart("chunks"sv);
+ }
+ return m_ChunksArray;
+}
+
+//////////////////////////// ProjectStoreOperationDownloadAttachments
+
+ProjectStoreOperationDownloadAttachments::ProjectStoreOperationDownloadAttachments(OperationLogOutput& OperationLogOutput,
+ StorageInstance& Storage,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag,
+ WorkerThreadPool& IOWorkerPool,
+ WorkerThreadPool& NetworkPool,
+ ProjectStoreOperationOplogState& State,
+ std::span<const IoHash> AttachmentHashes,
+ const Options& Options)
+: m_LogOutput(OperationLogOutput)
+, m_Storage(Storage)
+, m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_IOWorkerPool(IOWorkerPool)
+, m_NetworkPool(NetworkPool)
+, m_State(State)
+, m_AttachmentHashes(AttachmentHashes.begin(), AttachmentHashes.end())
+, m_Options(Options)
+{
+}
+
+void
+ProjectStoreOperationDownloadAttachments::Execute()
+{
+ enum class TaskSteps : uint32_t
+ {
+ ReadAttachmentData,
+ Download,
+ AnalyzeDechunk,
+ Dechunk,
+ Cleanup,
+ StepCount
+ };
+
+ auto EndProgress =
+ MakeGuard([&]() { m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::StepCount, (uint32_t)TaskSteps::StepCount); });
+
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::ReadAttachmentData, (uint32_t)TaskSteps::StepCount);
+
+ Stopwatch Timer;
+ tsl::robin_map<IoHash, uint64_t, IoHash::Hasher> ChunkSizes;
+
+ std::vector<ChunkedInfo> ChunkedFileInfos;
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> FilesToDechunk;
+ tsl::robin_set<IoHash, IoHash::Hasher> ChunkedFileRawHashes;
+ {
+ CbArrayView ChunkedFilesArray = m_State.LoadChunkedFilesArray();
+ FilesToDechunk.reserve(ChunkedFilesArray.Num());
+ for (CbFieldView ChunkedFileView : ChunkedFilesArray)
+ {
+ CbObjectView ChunkedFile = ChunkedFileView.AsObjectView();
+ IoHash ChunkedRawHash = ChunkedFile["rawhash"sv].AsHash();
+ if (m_AttachmentHashes.contains(ChunkedRawHash))
+ {
+ ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFile);
+ ChunkSizes.insert_or_assign(Chunked.RawHash, Chunked.RawSize);
+ ChunkedFileRawHashes.reserve(ChunkedFileRawHashes.size() + Chunked.ChunkHashes.size());
+ for (const IoHash& ChunkHash : Chunked.ChunkHashes)
+ {
+ ChunkedFileRawHashes.insert(ChunkHash);
+ }
+ FilesToDechunk.insert_or_assign(Chunked.RawHash, ChunkedFileInfos.size());
+ ChunkedFileInfos.emplace_back(std::move(Chunked));
+ }
+ }
+ }
+
+ tsl::robin_set<IoHash, IoHash::Hasher> LooseChunksToDownload;
+ {
+ CbArrayView ChunksArray = m_State.LoadChunksArray();
+ LooseChunksToDownload.reserve(ChunksArray.Num());
+ for (CbFieldView Chunk : ChunksArray)
+ {
+ const IoHash ChunkHash = Chunk.AsAttachment();
+ if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash))
+ {
+ LooseChunksToDownload.insert(ChunkHash);
+ }
+ }
+ }
+
+ tsl::robin_set<IoHash, IoHash::Hasher> BlocksToDownload;
+ tsl::robin_map<IoHash, IoHash, IoHash::Hasher> ChunkToBlock;
+
+ {
+ CbArrayView BlocksArray = m_State.LoadBlocksArray();
+
+ for (CbFieldView BlockView : BlocksArray)
+ {
+ CbObjectView Block = BlockView.AsObjectView();
+ IoHash BlockHash = Block["rawhash"sv].AsBinaryAttachment();
+ if (BlockHash == IoHash::Zero)
+ {
+ CbArrayView ChunksArray = Block["chunks"sv].AsArrayView();
+ for (CbFieldView ChunkHashView : ChunksArray)
+ {
+ const IoHash ChunkHash = ChunkHashView.AsAttachment();
+ if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash))
+ {
+ LooseChunksToDownload.insert(ChunkHash);
+ }
+ }
+ }
+ else
+ {
+ CbArrayView ChunksArray = Block["chunks"sv].AsArrayView();
+ for (CbFieldView ChunkHashView : ChunksArray)
+ {
+ const IoHash ChunkHash = ChunkHashView.AsHash();
+ if (m_AttachmentHashes.contains(ChunkHash) || ChunkedFileRawHashes.contains(ChunkHash))
+ {
+ ChunkToBlock.insert_or_assign(ChunkHash, BlockHash);
+ BlocksToDownload.insert(BlockHash);
+ }
+ }
+ }
+ }
+ }
+
+ if (!m_Options.IsQuiet)
+ {
+ std::string DechunkInfo =
+ FilesToDechunk.size() > 0
+ ? fmt::format("\n{} file{} needs to be dechunked", FilesToDechunk.size(), FilesToDechunk.size() == 1 ? "" : "s")
+ : "";
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Need to download {} block{} and {} chunk{}{}",
+ BlocksToDownload.size(),
+ BlocksToDownload.size() == 1 ? "" : "s",
+ LooseChunksToDownload.size(),
+ LooseChunksToDownload.size() == 1 ? "" : "s",
+ DechunkInfo);
+ }
+
+ auto GetBuildBlob = [this](const IoHash& RawHash, const std::filesystem::path& OutputPath) {
+ IoBuffer Payload;
+ if (m_Storage.BuildCacheStorage)
+ {
+ Payload = m_Storage.BuildCacheStorage->GetBuildBlob(m_State.GetBuildId(), RawHash);
+ }
+ if (!Payload)
+ {
+ Payload = m_Storage.BuildStorage->GetBuildBlob(m_State.GetBuildId(), RawHash);
+ if (m_Storage.BuildCacheStorage)
+ {
+ m_Storage.BuildCacheStorage->PutBuildBlob(m_State.GetBuildId(),
+ RawHash,
+ Payload.GetContentType(),
+ CompositeBuffer(SharedBuffer(Payload)));
+ }
+ }
+ uint64_t PayloadSize = Payload.GetSize();
+ IoBufferFileReference FileRef;
+ if (Payload.GetFileReference(FileRef) && (FileRef.FileChunkOffset == 0) && (FileRef.FileChunkSize == PayloadSize))
+ {
+ std::error_code Ec;
+ std::filesystem::path TempPayloadPath = PathFromHandle(FileRef.FileHandle, Ec);
+ if (!Ec)
+ {
+ Payload.SetDeleteOnClose(false);
+ Payload = {};
+ RenameFile(TempPayloadPath, OutputPath, Ec);
+ if (Ec)
+ {
+ // Re-open the temp file again
+ BasicFile OpenTemp(TempPayloadPath, BasicFile::Mode::kDelete);
+ Payload = IoBuffer(IoBuffer::File, OpenTemp.Detach(), 0, PayloadSize, true);
+ Payload.SetDeleteOnClose(true);
+ }
+ }
+ }
+ if (Payload)
+ {
+ TemporaryFile::SafeWriteFile(OutputPath, Payload.GetView());
+ }
+ };
+
+ std::filesystem::path TempAttachmentPath = MakeSafeAbsolutePath(m_Options.AttachmentOutputPath) / ".tmp";
+ CreateDirectories(TempAttachmentPath);
+ auto _0 = MakeGuard([this, &TempAttachmentPath]() {
+ if (true)
+ {
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput, "Cleaning up temporary directory");
+ }
+ CleanDirectory(TempAttachmentPath, true);
+ RemoveDir(TempAttachmentPath);
+ }
+ });
+
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Download, (uint32_t)TaskSteps::StepCount);
+
+ std::filesystem::path BlocksPath = TempAttachmentPath / "blocks";
+ CreateDirectories(BlocksPath);
+
+ {
+ Stopwatch DownloadTimer;
+
+ std::filesystem::path LooseChunksPath = TempAttachmentPath / "loosechunks";
+ CreateDirectories(LooseChunksPath);
+
+ std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Downloading"));
+ OperationLogOutput::ProgressBar& DownloadProgressBar(*ProgressBarPtr);
+
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<size_t> LooseChunksCompleted;
+ std::atomic<size_t> BlocksCompleted;
+
+ std::vector<IoHash> LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end());
+
+ for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++)
+ {
+ Work.ScheduleWork(m_NetworkPool, [&, LooseChunkIndex](std::atomic<bool>&) {
+ const IoHash RawHash = LooseChunks[LooseChunkIndex];
+ std::filesystem::path LooseChunkOutputPath = LooseChunksPath / fmt::format("{}.ucb", RawHash);
+ if (m_Options.ForceDownload || !IsFile(LooseChunkOutputPath))
+ {
+ GetBuildBlob(RawHash, LooseChunkOutputPath);
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded chunk {}", RawHash);
+ }
+
+ Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex, LooseChunkOutputPath](std::atomic<bool>&) {
+ const IoHash RawHash = LooseChunks[LooseChunkIndex];
+ IoHash ChunkRawHash;
+ uint64_t ChunkRawSize;
+ CompressedBuffer CompressedChunk =
+ CompressedBuffer::FromCompressed(SharedBuffer(IoBufferBuilder::MakeFromFile(LooseChunkOutputPath)),
+ ChunkRawHash,
+ ChunkRawSize);
+ if (!CompressedChunk)
+ {
+ throw std::runtime_error(fmt::format("Downloaded chunk {} is malformed", RawHash));
+ }
+ if (ChunkRawHash != RawHash)
+ {
+ throw std::runtime_error(fmt::format("Downloaded chunk {} mismatches content hash {}", RawHash, ChunkRawHash));
+ }
+
+ std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash);
+ if (m_Options.DecompressAttachments)
+ {
+ BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate);
+ if (!CompressedChunk.DecompressToStream(
+ 0,
+ ChunkRawSize,
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_UNUSED(SourceSize);
+ ChunkOutput.Write(Range, Offset);
+ return true;
+ }))
+ {
+ ChunkOutput.Close();
+ RemoveFile(ChunkOutputPath);
+ throw std::runtime_error(fmt::format("Failed to decompress chunk {} to ", RawHash, ChunkOutputPath));
+ }
+ }
+ else
+ {
+ TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed());
+ }
+
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote loose chunk {} to '{}'", RawHash, ChunkOutputPath);
+ LooseChunksCompleted++;
+ });
+ });
+ }
+
+ std::vector<IoHash> Blocks(BlocksToDownload.begin(), BlocksToDownload.end());
+
+ RwLock ChunkSizesLock;
+ for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++)
+ {
+ Work.ScheduleWork(m_NetworkPool, [&, BlockIndex](std::atomic<bool>&) {
+ const IoHash& RawHash = Blocks[BlockIndex];
+ std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", RawHash);
+ if (m_Options.ForceDownload || !IsFile(BlockOutputPath))
+ {
+ GetBuildBlob(RawHash, BlockOutputPath);
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Downloaded block {}", RawHash);
+ }
+
+ Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex, BlockOutputPath](std::atomic<bool>&) {
+ const IoHash& BlockRawHash = Blocks[BlockIndex];
+
+ SharedBuffer BlockBuffer =
+ CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress();
+
+ uint64_t HeaderSize = 0;
+ if (IterateChunkBlock(
+ SharedBuffer(BlockBuffer),
+ [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) {
+ if (m_AttachmentHashes.contains(ChunkHash))
+ {
+ std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkHash);
+
+ if (m_Options.DecompressAttachments)
+ {
+ BasicFile ChunkOutput(ChunkOutputPath, BasicFile::Mode::kTruncate);
+ if (!CompressedChunk.DecompressToStream(0u,
+ ~uint64_t(0),
+ [&](uint64_t SourceOffset,
+ uint64_t SourceSize,
+ uint64_t Offset,
+ const CompositeBuffer& Range) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_UNUSED(SourceSize);
+ ChunkOutput.Write(Range, Offset);
+ return true;
+ }))
+ {
+ ChunkOutput.Close();
+ RemoveFile(ChunkOutputPath);
+ throw std::runtime_error(
+ fmt::format("Failed to decompress chunk {} to ", ChunkHash, ChunkOutputPath));
+ }
+ }
+ else
+ {
+ TemporaryFile::SafeWriteFile(ChunkOutputPath, CompressedChunk.GetCompressed());
+ }
+
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Wrote block chunk {} to '{}'", ChunkHash, ChunkOutputPath);
+ }
+ if (ChunkedFileRawHashes.contains(ChunkHash))
+ {
+ uint64_t RawSize = CompressedChunk.DecodeRawSize();
+ ChunkSizesLock.WithExclusiveLock([&]() { ChunkSizes.insert_or_assign(ChunkHash, RawSize); });
+ }
+ },
+ HeaderSize))
+ {
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash));
+ }
+ BlocksCompleted++;
+ });
+ });
+ }
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused, PendingWork);
+
+ std::string Details = fmt::format("{}/{} blocks, {}/{} chunks downloaded",
+ BlocksCompleted.load(),
+ BlocksToDownload.size(),
+ LooseChunksCompleted.load(),
+ LooseChunksToDownload.size());
+ DownloadProgressBar.UpdateState({.Task = "Downloading",
+ .Details = Details,
+ .TotalCount = BlocksToDownload.size() + LooseChunksToDownload.size(),
+ .RemainingCount = BlocksToDownload.size() + LooseChunksToDownload.size() -
+ (BlocksCompleted.load() + LooseChunksCompleted.load()),
+ .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ DownloadProgressBar.Finish();
+
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "{} block{} downloaded, {} loose chunk{} downloaded in {}",
+ BlocksToDownload.size(),
+ BlocksToDownload.size() == 1 ? "" : "s",
+ LooseChunksToDownload.size(),
+ LooseChunksToDownload.size() == 1 ? "" : "s",
+ NiceTimeSpanMs(DownloadTimer.GetElapsedTimeMs()));
+ }
+ }
+
+ if (!ChunkedFileInfos.empty())
+ {
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::AnalyzeDechunk, (uint32_t)TaskSteps::StepCount);
+
+ std::filesystem::path ChunkedFilesPath = TempAttachmentPath / "chunkedfiles";
+ CreateDirectories(ChunkedFilesPath);
+
+ try
+ {
+ std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Dechunking"));
+ OperationLogOutput::ProgressBar& DechunkingProgressBar(*ProgressBarPtr);
+
+ std::atomic<uint64_t> ChunksWritten;
+
+ std::vector<std::unique_ptr<BasicFile>> OpenChunkedFiles;
+ struct ChunkOpenFileTarget
+ {
+ size_t OpenChunkedFileIndex = 0;
+ uint64_t Offset = 0;
+ };
+
+ std::vector<ChunkOpenFileTarget> ChunkOpenFileTargets;
+
+ std::vector<eastl::fixed_vector<size_t, 1>> Targets;
+ tsl::robin_map<IoHash, size_t, IoHash::Hasher> ChunkedFileTargetLookup;
+
+ // Build up file and offset targets for each chunk
+ for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++)
+ {
+ const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex];
+
+ const size_t OpenChunkedFileIndex = OpenChunkedFiles.size();
+
+ uint64_t Offset = 0;
+ for (uint32_t ChunkIndex : ChunkedFileInfo.ChunkSequence)
+ {
+ const IoHash& ChunkHash = ChunkedFileInfo.ChunkHashes[ChunkIndex];
+ auto ChunkSizeIt = ChunkSizes.find(ChunkHash);
+ if (ChunkSizeIt == ChunkSizes.end())
+ {
+ throw std::runtime_error(
+ fmt::format("Missing chunk {} to build chunked file {}", ChunkHash, ChunkedFileInfo.RawHash));
+ }
+ const uint64_t ChunkSize = ChunkSizeIt->second;
+ if (auto ChunkTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash);
+ ChunkTargetLookupIt != ChunkedFileTargetLookup.end())
+ {
+ size_t TargetIndex = ChunkTargetLookupIt->second;
+ eastl::fixed_vector<size_t, 1>& Target = Targets[TargetIndex];
+ Target.push_back(ChunkOpenFileTargets.size());
+ }
+ else
+ {
+ ChunkedFileTargetLookup.insert_or_assign(ChunkHash, Targets.size());
+ Targets.push_back({ChunkOpenFileTargets.size()});
+ }
+ ChunkOpenFileTargets.push_back({.OpenChunkedFileIndex = OpenChunkedFileIndex, .Offset = Offset});
+ Offset += ChunkSize;
+ }
+ std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash);
+ OpenChunkedFiles.emplace_back(std::make_unique<BasicFile>(ChunkedFilePath, BasicFile::Mode::kTruncate));
+ PrepareFileForScatteredWrite(OpenChunkedFiles.back()->Handle(), ChunkedFileInfo.RawSize);
+ }
+
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Dechunk, (uint32_t)TaskSteps::StepCount);
+
+ std::vector<std::atomic<uint8_t>> ChunkWrittenFlags(ChunkOpenFileTargets.size());
+
+ auto WriteChunk = [&](size_t ChunkedTargetsIndex, CompressedBuffer&& CompressedChunkBuffer) {
+ const eastl::fixed_vector<size_t, 1>& Target = Targets[ChunkedTargetsIndex];
+ for (size_t TargetIndex : Target)
+ {
+ uint8_t Expected = 0;
+ if (ChunkWrittenFlags[TargetIndex].compare_exchange_strong(Expected, true))
+ {
+ const ChunkOpenFileTarget& ChunkTarget = ChunkOpenFileTargets[TargetIndex];
+ BasicFile& OutputFile = *OpenChunkedFiles[ChunkTarget.OpenChunkedFileIndex];
+
+ if (!CompressedChunkBuffer.DecompressToStream(
+ 0u,
+ ~uint64_t(0),
+ [&](uint64_t SourceOffset, uint64_t SourceSize, uint64_t Offset, const CompositeBuffer& Range) {
+ ZEN_UNUSED(SourceOffset);
+ ZEN_UNUSED(SourceSize);
+ OutputFile.Write(Range, ChunkTarget.Offset + Offset);
+ Offset += Range.GetSize();
+ return true;
+ }))
+ {
+ std::error_code DummyEc;
+ throw std::runtime_error(fmt::format("Failed to decompress chunk {} at offset {} to {}",
+ CompressedChunkBuffer.DecodeRawHash(),
+ ChunkTarget.Offset,
+ PathFromHandle(OutputFile.Handle(), DummyEc)));
+ }
+ ChunksWritten++;
+ }
+ }
+ };
+
+ {
+ Stopwatch DechunkTimer;
+
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::vector<IoHash> LooseChunks(LooseChunksToDownload.begin(), LooseChunksToDownload.end());
+
+ for (size_t LooseChunkIndex = 0; LooseChunkIndex < LooseChunks.size(); LooseChunkIndex++)
+ {
+ Work.ScheduleWork(m_IOWorkerPool, [&, LooseChunkIndex](std::atomic<bool>&) {
+ const IoHash RawHash = LooseChunks[LooseChunkIndex];
+ if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(RawHash);
+ ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end())
+ {
+ std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", RawHash);
+ IoBuffer ChunkBuffer = IoBufferBuilder::MakeFromFile(ChunkOutputPath);
+
+ WriteChunk(ChunkedFileTargetLookupIt->second,
+ CompressedBuffer::FromCompressedNoValidate(std::move(ChunkBuffer)));
+ }
+ });
+ }
+
+ std::vector<IoHash> Blocks(BlocksToDownload.begin(), BlocksToDownload.end());
+
+ for (size_t BlockIndex = 0; BlockIndex < Blocks.size(); BlockIndex++)
+ {
+ Work.ScheduleWork(m_IOWorkerPool, [&, BlockIndex](std::atomic<bool>&) {
+ const IoHash& BlockRawHash = Blocks[BlockIndex];
+ std::filesystem::path BlockOutputPath = BlocksPath / fmt::format("{}.ucb", BlockRawHash);
+
+ SharedBuffer BlockBuffer =
+ CompressedBuffer::FromCompressedNoValidate(IoBufferBuilder::MakeFromFile(BlockOutputPath)).Decompress();
+
+ uint64_t HeaderSize = 0;
+ if (IterateChunkBlock(
+ SharedBuffer(BlockBuffer),
+ [&](CompressedBuffer&& CompressedChunk, const IoHash& ChunkHash) {
+ if (auto ChunkedFileTargetLookupIt = ChunkedFileTargetLookup.find(ChunkHash);
+ ChunkedFileTargetLookupIt != ChunkedFileTargetLookup.end())
+ {
+ WriteChunk(ChunkedFileTargetLookupIt->second, std::move(CompressedChunk));
+ }
+ },
+ HeaderSize))
+ {
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("Failed to iterate block {}", BlockRawHash));
+ }
+ });
+ }
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused, PendingWork);
+ std::string Details = fmt::format("{}/{} chunks written", ChunksWritten.load(), ChunkOpenFileTargets.size());
+
+ DechunkingProgressBar.UpdateState(
+ {.Task = "Dechunking ",
+ .Details = Details,
+ .TotalCount = ChunkOpenFileTargets.size(),
+ .RemainingCount = ChunkOpenFileTargets.size() - ChunksWritten.load(),
+ .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ DechunkingProgressBar.Finish();
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "{} file{} dechunked in {}",
+ ChunkedFileInfos.size(),
+ ChunkedFileInfos.size() == 1 ? "" : "s",
+ NiceTimeSpanMs(DechunkTimer.GetElapsedTimeMs()));
+ }
+ }
+ }
+ catch (const std::exception&)
+ {
+ for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++)
+ {
+ const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex];
+ std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash);
+ RemoveFile(ChunkedFilePath);
+ }
+ throw;
+ }
+ {
+ Stopwatch VerifyTimer;
+ std::unique_ptr<OperationLogOutput::ProgressBar> ProgressBarPtr(m_LogOutput.CreateProgressBar("Verifying"));
+ OperationLogOutput::ProgressBar& VerifyProgressBar(*ProgressBarPtr);
+
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(m_AbortFlag, m_PauseFlag, WorkerThreadPool::EMode::EnableBacklog);
+
+ std::atomic<size_t> DechunkedFilesMoved;
+
+ for (size_t ChunkedFileIndex = 0; ChunkedFileIndex < ChunkedFileInfos.size(); ChunkedFileIndex++)
+ {
+ Work.ScheduleWork(m_IOWorkerPool, [&, ChunkedFileIndex](std::atomic<bool>&) {
+ const ChunkedInfo& ChunkedFileInfo = ChunkedFileInfos[ChunkedFileIndex];
+ std::filesystem::path ChunkedFilePath = ChunkedFilesPath / fmt::format("{}.ucb", ChunkedFileInfo.RawHash);
+ IoHash VerifyHash = IoHash::HashBuffer(IoBufferBuilder::MakeFromFile(ChunkedFilePath));
+ if (VerifyHash != ChunkedFileInfo.RawHash)
+ {
+ throw std::runtime_error(
+ fmt::format("Dechunked file {} has mismatching hash {}", ChunkedFileInfo.RawHash, VerifyHash));
+ }
+ std::filesystem::path ChunkOutputPath = m_Options.AttachmentOutputPath / fmt::format("{}", ChunkedFileInfo.RawHash);
+ RenameFile(ChunkedFilePath, ChunkOutputPath);
+ ZEN_OPERATION_LOG_DEBUG(m_LogOutput, "Moved dechunked file {} to '{}'", ChunkedFileInfo.RawHash, ChunkOutputPath);
+ DechunkedFilesMoved++;
+ });
+ }
+
+ Work.Wait(m_LogOutput.GetProgressUpdateDelayMS(), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused, PendingWork);
+ std::string Details = fmt::format("{}/{} files verified", DechunkedFilesMoved.load(), ChunkedFileInfos.size());
+
+ VerifyProgressBar.UpdateState({.Task = "Verifying ",
+ .Details = Details,
+ .TotalCount = ChunkedFileInfos.size(),
+ .RemainingCount = ChunkedFileInfos.size() - DechunkedFilesMoved.load(),
+ .Status = OperationLogOutput::ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
+ false);
+ });
+ VerifyProgressBar.Finish();
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Verified {} chunked file{} in {}",
+ ChunkedFileInfos.size(),
+ ChunkedFileInfos.size() == 1 ? "" : "s",
+ NiceTimeSpanMs(VerifyTimer.GetElapsedTimeMs()));
+ }
+ }
+ }
+ if (!m_Options.IsQuiet)
+ {
+ ZEN_OPERATION_LOG_INFO(m_LogOutput,
+ "Downloaded {} attachment{} to '{}' in {}",
+ m_AttachmentHashes.size(),
+ m_AttachmentHashes.size() == 1 ? "" : "s",
+ m_Options.AttachmentOutputPath,
+ NiceTimeSpanMs(Timer.GetElapsedTimeMs()));
+ }
+ m_LogOutput.SetLogOperationProgress((uint32_t)TaskSteps::Cleanup, (uint32_t)TaskSteps::StepCount);
+}
+
+} // namespace zen
diff --git a/src/zenremotestore/projectstore/remoteprojectstore.cpp b/src/zenremotestore/projectstore/remoteprojectstore.cpp
index 7e02e5d69..a928d1644 100644
--- a/src/zenremotestore/projectstore/remoteprojectstore.cpp
+++ b/src/zenremotestore/projectstore/remoteprojectstore.cpp
@@ -2743,26 +2743,10 @@ ParseOplogContainer(const CbObject& ContainerObject,
IoHash RawHash = ChunkedFileView["rawhash"sv].AsHash();
if (OpsAttachments.contains(RawHash) && (!HasAttachment(RawHash)))
{
- ChunkedInfo Chunked;
- Chunked.RawHash = RawHash;
- Chunked.RawSize = ChunkedFileView["rawsize"sv].AsUInt64();
- CbArrayView ChunksArray = ChunkedFileView["chunks"sv].AsArrayView();
- Chunked.ChunkHashes.reserve(ChunksArray.Num());
- for (CbFieldView ChunkField : ChunksArray)
- {
- const IoHash ChunkHash = ChunkField.AsHash();
- Chunked.ChunkHashes.emplace_back(ChunkHash);
- }
+ ChunkedInfo Chunked = ReadChunkedInfo(ChunkedFileView);
+
OnReferencedAttachments(Chunked.ChunkHashes);
OpsAttachments.insert(Chunked.ChunkHashes.begin(), Chunked.ChunkHashes.end());
- CbArrayView SequenceArray = ChunkedFileView["sequence"sv].AsArrayView();
- Chunked.ChunkSequence.reserve(SequenceArray.Num());
- for (CbFieldView SequenceField : SequenceArray)
- {
- uint32_t SequenceIndex = SequenceField.AsUInt32();
- ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size());
- Chunked.ChunkSequence.push_back(SequenceIndex);
- }
OnChunkedAttachment(Chunked);
ZEN_INFO("Requesting chunked attachment '{}' ({}) built from {} chunks",
Chunked.RawHash,
@@ -3315,6 +3299,33 @@ LoadOplog(CidStore& ChunkStore,
return Result;
}
+ChunkedInfo
+ReadChunkedInfo(CbObjectView ChunkedFile)
+{
+ using namespace std::literals;
+
+ ChunkedInfo Chunked;
+ Chunked.RawHash = ChunkedFile["rawhash"sv].AsHash();
+ Chunked.RawSize = ChunkedFile["rawsize"sv].AsUInt64();
+
+ CbArrayView ChunksArray = ChunkedFile["chunks"sv].AsArrayView();
+ Chunked.ChunkHashes.reserve(ChunksArray.Num());
+ for (CbFieldView ChunkField : ChunksArray)
+ {
+ const IoHash ChunkHash = ChunkField.AsHash();
+ Chunked.ChunkHashes.emplace_back(ChunkHash);
+ }
+ CbArrayView SequenceArray = ChunkedFile["sequence"sv].AsArrayView();
+ Chunked.ChunkSequence.reserve(SequenceArray.Num());
+ for (CbFieldView SequenceField : SequenceArray)
+ {
+ uint32_t SequenceIndex = SequenceField.AsUInt32();
+ ZEN_ASSERT(SequenceIndex < Chunked.ChunkHashes.size());
+ Chunked.ChunkSequence.push_back(SequenceIndex);
+ }
+ return Chunked;
+}
+
//////////////////////////////////////////////////////////////////////////
// These are here to avoid vtable leakage