aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/builds_cmd.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-05 14:40:02 +0200
committerGitHub Enterprise <[email protected]>2025-06-05 14:40:02 +0200
commit40b9386054de3c23f77da74eefaa743240d164fd (patch)
tree9c4448f86d1df00b3d0f5d5dd94506bca8c067d9 /src/zen/cmds/builds_cmd.cpp
parentrevert system temp dir for builds upload (#422) (diff)
downloadarchived-zen-40b9386054de3c23f77da74eefaa743240d164fd.tar.xz
archived-zen-40b9386054de3c23f77da74eefaa743240d164fd.zip
pause, resume and abort running builds cmd (#421)
- Feature: `zen builds pause`, `zen builds resume` and `zen builds abort` commands to control a running `zen builds` command - `--process-id` the process id to control, if omitted it tries to find a running process using the same executable as itself - Improvement: Process report now indicates if it is pausing or aborting
Diffstat (limited to 'src/zen/cmds/builds_cmd.cpp')
-rw-r--r--src/zen/cmds/builds_cmd.cpp437
1 files changed, 360 insertions, 77 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index b04575009..49b032ab1 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -14,6 +14,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zencore/session.h>
#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/trace.h>
@@ -59,23 +60,205 @@ ZEN_THIRD_PARTY_INCLUDES_END
#define ZEN_CLOUD_STORAGE "Cloud Storage"
namespace zen {
+
+using namespace std::literals;
+
namespace {
+ namespace zenutil {
+#if ZEN_PLATFORM_WINDOWS
+ class SecurityAttributes
+ {
+ public:
+ inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; }
+
+ protected:
+ SECURITY_ATTRIBUTES m_Attributes{};
+ SECURITY_DESCRIPTOR m_Sd{};
+ };
+
+ // Security attributes which allows any user access
+
+ class AnyUserSecurityAttributes : public SecurityAttributes
+ {
+ public:
+ AnyUserSecurityAttributes()
+ {
+ m_Attributes.nLength = sizeof m_Attributes;
+ m_Attributes.bInheritHandle = false; // Disable inheritance
+
+ const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION);
+
+ if (Success)
+ {
+ if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE))
+ {
+ ThrowLastError("SetSecurityDescriptorDacl failed");
+ }
+
+ m_Attributes.lpSecurityDescriptor = &m_Sd;
+ }
+ }
+ };
+#endif // ZEN_PLATFORM_WINDOWS
+
+ } // namespace zenutil
+
static std::atomic<bool> AbortFlag = false;
- static void SignalCallbackHandler(int SigNum)
+ static std::atomic<bool> PauseFlag = false;
+
+ static void SignalCallbackHandler(int SigNum)
{
if (SigNum == SIGINT)
{
+ PauseFlag = false;
AbortFlag = true;
}
#if ZEN_PLATFORM_WINDOWS
if (SigNum == SIGBREAK)
{
+ PauseFlag = false;
AbortFlag = true;
}
#endif // ZEN_PLATFORM_WINDOWS
}
- using namespace std::literals;
+ struct ZenStateSharedData
+ {
+ static constexpr uint64_t kMagicV1 = 0x3176646d636e657a; // zencmdv1
+
+ uint64_t Magic = 0; // Implies the size and layout of this struct - changes to the data requires change to Magic constant
+ std::atomic<uint32_t> Pid;
+ uint8_t SessionId[12];
+ uint8_t Padding1[4];
+ std::atomic<uint8_t> Abort;
+ std::atomic<uint8_t> Pause;
+ uint8_t Padding2[2];
+ };
+
+ struct MemMap
+ {
+ void* Handle = nullptr;
+ void* Data = nullptr;
+ size_t Size = 0;
+ std::string Name;
+ };
+
+ class ZenState
+ {
+ public:
+ ZenState(const ZenState&) = delete;
+ ZenState& operator=(const ZenState&) = delete;
+
+ ZenState();
+ explicit ZenState(uint32_t Pid);
+ ~ZenState();
+
+ const ZenStateSharedData& StateData() const
+ {
+ ZEN_ASSERT(m_Data);
+ return *m_Data;
+ }
+ ZenStateSharedData& StateData()
+ {
+ ZEN_ASSERT(m_Data);
+ return *m_Data;
+ }
+
+ private:
+ static constexpr std::string_view MapBaseName = "UnrealEngineZenCmd_"sv;
+ static constexpr size_t MapSize = sizeof(ZenStateSharedData);
+
+ bool m_Created = false;
+ std::unique_ptr<SharedMemory> m_MemMap;
+ ZenStateSharedData* m_Data = nullptr;
+
+ std::thread m_StateMonitor;
+ Event m_ExitStateMonitorEvent;
+ };
+
+ ZenState::ZenState(uint32_t Pid)
+ {
+ const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid);
+
+ if (!IsProcessRunning(Pid))
+ {
+ throw std::runtime_error(fmt::format("The process {} is not running", Pid));
+ }
+ std::unique_ptr<SharedMemory> MemMap = OpenSharedMemory(ZenStateMapName, MapSize, false);
+ if (!MemMap)
+ {
+ throw std::runtime_error(fmt::format("The process {} is not a running zen process", Pid));
+ }
+ ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData();
+ if (uint64_t MemMagic = data->Magic; MemMagic != ZenStateSharedData::kMagicV1)
+ {
+ throw std::runtime_error(fmt::format("The mem map for process {} has an unsupported magic {:x}, expected {:x}",
+ Pid,
+ MemMagic,
+ ZenStateSharedData::kMagicV1));
+ }
+ if (uint32_t MemPid = data->Pid.load(); MemPid != Pid)
+ {
+ throw std::runtime_error(
+ fmt::format("The mem map for process {} has an missmatching pid of {}, expected {}", Pid, MemPid, Pid));
+ }
+ m_MemMap = std::move(MemMap);
+ m_Data = data;
+ }
+
+ ZenState::ZenState()
+ {
+ int Pid = GetCurrentProcessId();
+
+ const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid);
+
+ std::unique_ptr<SharedMemory> MemMap = CreateSharedMemory(ZenStateMapName, MapSize, false);
+ if (!MemMap)
+ {
+ throw std::runtime_error(fmt::format("The mem map for process {} could not be created", Pid));
+ }
+
+ ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData();
+ memset(data, 0, sizeof(ZenStateSharedData));
+ data->Magic = ZenStateSharedData::kMagicV1;
+ data->Pid.store(gsl::narrow<uint32_t>(Pid));
+ data->SessionId;
+ data->Abort.store(false);
+ data->Pause.store(false);
+ const Oid SessionId = GetSessionId();
+ memcpy(data->SessionId, &SessionId, sizeof SessionId);
+
+ m_MemMap = std::move(MemMap);
+ m_Data = data;
+
+ m_StateMonitor = std::thread([this]() {
+ while (!m_ExitStateMonitorEvent.Wait(500))
+ {
+ if (m_Data->Abort.load())
+ {
+ AbortFlag.store(true);
+ }
+ PauseFlag.store(m_Data->Pause.load());
+ }
+ });
+ }
+
+ ZenState::~ZenState()
+ {
+ try
+ {
+ if (m_StateMonitor.joinable())
+ {
+ m_ExitStateMonitorEvent.Set();
+ m_StateMonitor.join();
+ }
+ m_MemMap.reset();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("ZenState::~ZenState threw exception: {}", Ex.what());
+ }
+ }
static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u;
static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u;
@@ -435,7 +618,7 @@ namespace {
std::atomic<uint64_t> DiscoveredItemCount = 0;
std::atomic<uint64_t> DeletedItemCount = 0;
std::atomic<uint64_t> DeletedByteCount = 0;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
struct AsyncVisitor : public GetDirectoryContentVisitor
{
@@ -549,8 +732,8 @@ namespace {
uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs();
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
LastUpdateTimeMs = Timer.GetElapsedTimeMs();
uint64_t Deleted = DeletedItemCount.load();
@@ -559,7 +742,8 @@ namespace {
Progress.UpdateState({.Task = "Cleaning folder ",
.Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
.TotalCount = Discovered,
- .RemainingCount = Discovered - Deleted},
+ .RemainingCount = Discovered - Deleted,
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
@@ -598,12 +782,17 @@ namespace {
Progress.UpdateState({.Task = "Cleaning folder ",
.Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
.TotalCount = Discovered,
- .RemainingCount = Discovered - Deleted},
+ .RemainingCount = Discovered - Deleted,
+ .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)},
false);
}
}
Progress.Finish();
+ if (AbortFlag)
+ {
+ return false;
+ }
uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
if (ElapsedTimeMs >= 200)
@@ -1954,7 +2143,7 @@ namespace {
WorkerThreadPool& NetworkPool = GetNetworkPool();
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
const std::filesystem::path TempFolder = ".zen-tmp";
@@ -2117,8 +2306,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount;
const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount;
@@ -2141,7 +2330,8 @@ namespace {
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
.RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
- (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))},
+ (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load())),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
@@ -2397,7 +2587,7 @@ namespace {
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
@@ -2567,8 +2757,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load());
@@ -2587,7 +2777,8 @@ namespace {
{.Task = "Generating blocks",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(NewBlockCount),
- .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load())},
+ .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
@@ -2626,7 +2817,7 @@ namespace {
FilteredRate FilteredCompressedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<size_t> UploadedBlockSize = 0;
std::atomic<size_t> UploadedBlockCount = 0;
@@ -3006,8 +3197,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load());
FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
@@ -3035,13 +3226,15 @@ namespace {
ProgressBar.UpdateState({.Task = "Uploading blobs ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(TotalRawSize),
- .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize)},
+ .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
ProgressBar.Finish();
+
UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
}
@@ -3493,7 +3686,7 @@ namespace {
Content,
*ChunkController,
GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
@@ -3506,16 +3699,18 @@ namespace {
ProgressBar.UpdateState({.Task = "Scanning files ",
.Details = Details,
.TotalCount = TotalRawSize,
- .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load()},
+ .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load(),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
},
- AbortFlag);
+ AbortFlag,
+ PauseFlag);
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
if (AbortFlag)
{
return;
}
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
}
ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec",
@@ -4266,7 +4461,7 @@ namespace {
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size());
@@ -4414,8 +4609,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
VerifyFolderStats.FilesVerified.load(),
PathCount,
@@ -4424,12 +4619,18 @@ namespace {
ProgressBar.UpdateState({.Task = "Verifying files ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(PathCount),
- .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load())},
+ .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs();
ProgressBar.Finish();
+ if (AbortFlag)
+ {
+ return;
+ }
+
for (const std::string& Error : Errors)
{
ZEN_CONSOLE("{}", Error);
@@ -5361,7 +5562,7 @@ namespace {
Stopwatch Timer;
auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<uint64_t> CompletedPathCount = 0;
uint32_t PathIndex = 0;
@@ -5392,7 +5593,7 @@ namespace {
});
PathIndex += PathRangeCount;
}
- Work.Wait(200, [&](bool, ptrdiff_t) {
+ Work.Wait(200, [&](bool, bool, ptrdiff_t) {
if (ProgressCallback)
{
ProgressCallback(PathCount, CompletedPathCount.load());
@@ -5670,7 +5871,7 @@ namespace {
ScavengedPaths.resize(ScavengePathCount);
ProgressBar ScavengeProgressBar(ProgressMode, "Scavenging");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<uint64_t> PathsFound(0);
std::atomic<uint64_t> ChunksFound(0);
@@ -5800,8 +6001,8 @@ namespace {
{
ZEN_TRACE_CPU("ScavengeScan_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavanging",
PathsScavenged.load(),
ScavengePathCount,
@@ -5810,11 +6011,17 @@ namespace {
ScavengeProgressBar.UpdateState({.Task = "Scavenging ",
.Details = Details,
.TotalCount = ScavengePathCount,
- .RemainingCount = ScavengePathCount - PathsScavenged.load()},
+ .RemainingCount = ScavengePathCount - PathsScavenged.load(),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
+
ScavengeProgressBar.Finish();
+ if (AbortFlag)
+ {
+ return;
+ }
for (uint32_t ScavengedContentIndex = 0;
ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty());
@@ -6130,7 +6337,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
struct LooseChunkHashWorkData
{
@@ -7583,8 +7790,8 @@ namespace {
{
ZEN_TRACE_CPU("WriteChunks_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
DownloadStats.DownloadedBlockByteCount.load() +
+DownloadStats.DownloadedPartialBlockByteCount.load();
@@ -7610,7 +7817,8 @@ namespace {
.Details = Details,
.TotalCount = PrimeCacheOnly ? TotalRequestCount : BytesToWrite,
.RemainingCount = PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load())
- : (BytesToWrite - DiskStats.WriteByteCount.load())},
+ : (BytesToWrite - DiskStats.WriteByteCount.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
@@ -7618,13 +7826,12 @@ namespace {
FilteredWrittenBytesPerSecond.Stop();
FilteredDownloadedBytesPerSecond.Stop();
+ WriteProgressBar.Finish();
if (AbortFlag)
{
return;
}
- WriteProgressBar.Finish();
-
if (!PrimeCacheOnly)
{
uint32_t RawSequencesMissingWriteCount = 0;
@@ -7771,7 +7978,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t LocalPathIndex : FilesToCache)
{
@@ -7800,26 +8007,26 @@ namespace {
{
ZEN_TRACE_CPU("CacheLocal_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
const uint64_t WorkTotal = FilesToCache.size();
const uint64_t WorkComplete = CachedCount.load();
std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount));
CacheLocalProgressBar.UpdateState({.Task = "Caching local ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)},
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
+ CacheLocalProgressBar.Finish();
if (AbortFlag)
{
return;
}
- CacheLocalProgressBar.Finish();
-
ZEN_DEBUG(
"Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, "
"Delete: {}",
@@ -7858,7 +8065,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
OutLocalFolderState.Paths.resize(RemoteContent.Paths.size());
OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size());
@@ -8109,26 +8316,21 @@ namespace {
{
ZEN_TRACE_CPU("FinalizeTree_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size();
const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load();
std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal);
RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)},
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
-
- if (AbortFlag)
- {
- return;
- }
-
RebuildProgressBar.Finish();
}
}
@@ -8690,10 +8892,15 @@ namespace {
ProgressBar.UpdateState({.Task = "Checking files ",
.Details = Details,
.TotalCount = PathCount,
- .RemainingCount = PathCount - CompletedPathCount},
+ .RemainingCount = PathCount - CompletedPathCount,
+ .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)},
false);
});
ProgressBar.Finish();
+ if (AbortFlag)
+ {
+ return {};
+ }
}
bool ScanContent = true;
@@ -8731,7 +8938,7 @@ namespace {
UpdatedContent,
ChunkController,
GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
@@ -8744,16 +8951,19 @@ namespace {
ProgressBar.UpdateState({.Task = "Scanning files ",
.Details = Details,
.TotalCount = ByteCountToScan,
- .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()},
+ .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load(),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
},
- AbortFlag);
+ AbortFlag,
+ PauseFlag);
+
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
if (AbortFlag)
{
return {};
}
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}});
}
}
@@ -8799,7 +9009,7 @@ namespace {
OutLocalFolderContent,
ChunkController,
GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
@@ -8812,18 +9022,19 @@ namespace {
ProgressBar.UpdateState({.Task = "Scanning files ",
.Details = Details,
.TotalCount = ByteCountToScan,
- .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load())},
+ .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
},
- AbortFlag);
+ AbortFlag,
+ PauseFlag);
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
if (AbortFlag)
{
return {};
}
-
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
}
return LocalContent;
}
@@ -9740,6 +9951,21 @@ BuildsCommand::BuildsCommand()
m_FetchBlobOptions.parse_positional({"build-id", "blob-hash"});
m_FetchBlobOptions.positional_help("build-id blob-hash");
+ auto AddZenProcessOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("", "", "process-id", "Process id of running process", cxxopts::value(m_ZenProcessId), "<pid>");
+ };
+ AddZenProcessOptions(m_PauseOptions);
+ m_PauseOptions.parse_positional({"process-id"});
+ m_PauseOptions.positional_help("process-id");
+
+ AddZenProcessOptions(m_ResumeOptions);
+ m_ResumeOptions.parse_positional({"process-id"});
+ m_ResumeOptions.positional_help("process-id");
+
+ AddZenProcessOptions(m_AbortOptions);
+ m_AbortOptions.parse_positional({"process-id"});
+ m_AbortOptions.positional_help("process-id");
+
AddSystemOptions(m_ValidateBuildPartOptions);
AddCloudOptions(m_ValidateBuildPartOptions);
AddFileOptions(m_ValidateBuildPartOptions);
@@ -10458,7 +10684,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
if (!m_ListResultPath.empty())
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})",
+ GetRunningExecutablePath(),
+ ZEN_CFG_VERSION_BUILD_STRING_FULL,
+ GetCurrentProcessId());
}
BuildStorage::Statistics StorageStats;
@@ -10513,7 +10742,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (!m_ListResultPath.empty())
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})",
+ GetRunningExecutablePath(),
+ ZEN_CFG_VERSION_BUILD_STRING_FULL,
+ GetCurrentProcessId());
}
CbObject QueryObject;
if (m_ListQueryPath.empty())
@@ -10592,7 +10824,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_UploadOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
+
+ ZenState InstanceState;
ParsePath();
@@ -10679,7 +10913,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_DownloadOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
+
+ ZenState InstanceState;
ParsePath();
@@ -10743,7 +10979,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_FetchBlobOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
+
BuildStorage::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
@@ -10785,7 +11022,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_ValidateBuildPartOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
+
+ ZenState InstanceState;
BuildStorage::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
@@ -10877,6 +11116,50 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 0;
}
+ auto ParseZenProcessId = [&]() {
+ if (m_ZenProcessId == -1)
+ {
+ const std::filesystem::path RunningExecutablePath = GetRunningExecutablePath();
+ ProcessHandle RunningProcess;
+ std::error_code Ec = FindProcess(RunningExecutablePath, RunningProcess, /*IncludeSelf*/ false);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed finding process running '{}', reason: '{}'", RunningExecutablePath, Ec.message()));
+ }
+ if (!RunningProcess.IsValid())
+ {
+ throw zen::OptionParseException(
+ fmt::format("Unable to find a running instance of the zen executable '{}'", RunningExecutablePath));
+ }
+ m_ZenProcessId = RunningProcess.Pid();
+ }
+ };
+
+ if (SubOption == &m_PauseOptions)
+ {
+ ParseZenProcessId();
+ ZenState RunningState(m_ZenProcessId);
+ RunningState.StateData().Pause.store(true);
+ return 0;
+ }
+
+ if (SubOption == &m_ResumeOptions)
+ {
+ ParseZenProcessId();
+ ZenState RunningState(m_ZenProcessId);
+ RunningState.StateData().Pause.store(false);
+ return 0;
+ }
+
+ if (SubOption == &m_AbortOptions)
+ {
+ ParseZenProcessId();
+ ZenState RunningState(m_ZenProcessId);
+ RunningState.StateData().Abort.store(true);
+ return 0;
+ }
+
if (SubOption == &m_TestOptions)
{
m_SystemRootDir = (GetRunningExecutablePath().parent_path() / ".tmpzensystem").make_preferred();
@@ -11046,7 +11329,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return true;
};
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
uint32_t Randomizer = 0;
auto FileSizeIt = DownloadContent.FileSizes.begin();
@@ -11104,8 +11387,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
FileSizeIt++;
}
- Work.Wait(5000, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted);
+ Work.Wait(5000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork);
});
ZEN_ASSERT(!AbortFlag.load());