aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-06-05 14:40:02 +0200
committerGitHub Enterprise <[email protected]>2025-06-05 14:40:02 +0200
commit40b9386054de3c23f77da74eefaa743240d164fd (patch)
tree9c4448f86d1df00b3d0f5d5dd94506bca8c067d9 /src
parentrevert system temp dir for builds upload (#422) (diff)
downloadzen-40b9386054de3c23f77da74eefaa743240d164fd.tar.xz
zen-40b9386054de3c23f77da74eefaa743240d164fd.zip
pause, resume and abort running builds cmd (#421)
- Feature: `zen builds pause`, `zen builds resume` and `zen builds abort` commands to control a running `zen builds` command - `--process-id` the process id to control, if omitted it tries to find a running process using the same executable as itself - Improvement: Process report now indicates if it is pausing or aborting
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/builds_cmd.cpp437
-rw-r--r--src/zen/cmds/builds_cmd.h27
-rw-r--r--src/zen/cmds/up_cmd.cpp2
-rw-r--r--src/zen/cmds/wipe_cmd.cpp12
-rw-r--r--src/zen/zen.cpp43
-rw-r--r--src/zen/zen.h21
-rw-r--r--src/zencore/filesystem.cpp230
-rw-r--r--src/zencore/include/zencore/filesystem.h10
-rw-r--r--src/zencore/include/zencore/process.h2
-rw-r--r--src/zencore/process.cpp64
-rw-r--r--src/zenserver/cache/httpstructuredcache.cpp7
-rw-r--r--src/zenserver/projectstore/projectstore.cpp9
-rw-r--r--src/zenstore/buildstore/buildstore.cpp3
-rw-r--r--src/zenstore/cache/cachedisklayer.cpp12
-rw-r--r--src/zenstore/compactcas.cpp3
-rw-r--r--src/zenstore/filecas.cpp3
-rw-r--r--src/zenutil/chunkedcontent.cpp24
-rw-r--r--src/zenutil/include/zenutil/chunkedcontent.h17
-rw-r--r--src/zenutil/include/zenutil/parallelwork.h13
-rw-r--r--src/zenutil/parallelwork.cpp19
-rw-r--r--src/zenutil/zenserverprocess.cpp2
21 files changed, 799 insertions, 161 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp
index b04575009..49b032ab1 100644
--- a/src/zen/cmds/builds_cmd.cpp
+++ b/src/zen/cmds/builds_cmd.cpp
@@ -14,6 +14,7 @@
#include <zencore/fmtutils.h>
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
+#include <zencore/session.h>
#include <zencore/stream.h>
#include <zencore/string.h>
#include <zencore/trace.h>
@@ -59,23 +60,205 @@ ZEN_THIRD_PARTY_INCLUDES_END
#define ZEN_CLOUD_STORAGE "Cloud Storage"
namespace zen {
+
+using namespace std::literals;
+
namespace {
+ namespace zenutil {
+#if ZEN_PLATFORM_WINDOWS
+ class SecurityAttributes
+ {
+ public:
+ inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; }
+
+ protected:
+ SECURITY_ATTRIBUTES m_Attributes{};
+ SECURITY_DESCRIPTOR m_Sd{};
+ };
+
+ // Security attributes which allows any user access
+
+ class AnyUserSecurityAttributes : public SecurityAttributes
+ {
+ public:
+ AnyUserSecurityAttributes()
+ {
+ m_Attributes.nLength = sizeof m_Attributes;
+ m_Attributes.bInheritHandle = false; // Disable inheritance
+
+ const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION);
+
+ if (Success)
+ {
+ if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE))
+ {
+ ThrowLastError("SetSecurityDescriptorDacl failed");
+ }
+
+ m_Attributes.lpSecurityDescriptor = &m_Sd;
+ }
+ }
+ };
+#endif // ZEN_PLATFORM_WINDOWS
+
+ } // namespace zenutil
+
static std::atomic<bool> AbortFlag = false;
- static void SignalCallbackHandler(int SigNum)
+ static std::atomic<bool> PauseFlag = false;
+
+ static void SignalCallbackHandler(int SigNum)
{
if (SigNum == SIGINT)
{
+ PauseFlag = false;
AbortFlag = true;
}
#if ZEN_PLATFORM_WINDOWS
if (SigNum == SIGBREAK)
{
+ PauseFlag = false;
AbortFlag = true;
}
#endif // ZEN_PLATFORM_WINDOWS
}
- using namespace std::literals;
+ struct ZenStateSharedData
+ {
+ static constexpr uint64_t kMagicV1 = 0x3176646d636e657a; // zencmdv1
+
+ uint64_t Magic = 0; // Implies the size and layout of this struct - changes to the data requires change to Magic constant
+ std::atomic<uint32_t> Pid;
+ uint8_t SessionId[12];
+ uint8_t Padding1[4];
+ std::atomic<uint8_t> Abort;
+ std::atomic<uint8_t> Pause;
+ uint8_t Padding2[2];
+ };
+
+ struct MemMap
+ {
+ void* Handle = nullptr;
+ void* Data = nullptr;
+ size_t Size = 0;
+ std::string Name;
+ };
+
+ class ZenState
+ {
+ public:
+ ZenState(const ZenState&) = delete;
+ ZenState& operator=(const ZenState&) = delete;
+
+ ZenState();
+ explicit ZenState(uint32_t Pid);
+ ~ZenState();
+
+ const ZenStateSharedData& StateData() const
+ {
+ ZEN_ASSERT(m_Data);
+ return *m_Data;
+ }
+ ZenStateSharedData& StateData()
+ {
+ ZEN_ASSERT(m_Data);
+ return *m_Data;
+ }
+
+ private:
+ static constexpr std::string_view MapBaseName = "UnrealEngineZenCmd_"sv;
+ static constexpr size_t MapSize = sizeof(ZenStateSharedData);
+
+ bool m_Created = false;
+ std::unique_ptr<SharedMemory> m_MemMap;
+ ZenStateSharedData* m_Data = nullptr;
+
+ std::thread m_StateMonitor;
+ Event m_ExitStateMonitorEvent;
+ };
+
+ ZenState::ZenState(uint32_t Pid)
+ {
+ const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid);
+
+ if (!IsProcessRunning(Pid))
+ {
+ throw std::runtime_error(fmt::format("The process {} is not running", Pid));
+ }
+ std::unique_ptr<SharedMemory> MemMap = OpenSharedMemory(ZenStateMapName, MapSize, false);
+ if (!MemMap)
+ {
+ throw std::runtime_error(fmt::format("The process {} is not a running zen process", Pid));
+ }
+ ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData();
+ if (uint64_t MemMagic = data->Magic; MemMagic != ZenStateSharedData::kMagicV1)
+ {
+ throw std::runtime_error(fmt::format("The mem map for process {} has an unsupported magic {:x}, expected {:x}",
+ Pid,
+ MemMagic,
+ ZenStateSharedData::kMagicV1));
+ }
+ if (uint32_t MemPid = data->Pid.load(); MemPid != Pid)
+ {
+ throw std::runtime_error(
+ fmt::format("The mem map for process {} has an missmatching pid of {}, expected {}", Pid, MemPid, Pid));
+ }
+ m_MemMap = std::move(MemMap);
+ m_Data = data;
+ }
+
+ ZenState::ZenState()
+ {
+ int Pid = GetCurrentProcessId();
+
+ const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid);
+
+ std::unique_ptr<SharedMemory> MemMap = CreateSharedMemory(ZenStateMapName, MapSize, false);
+ if (!MemMap)
+ {
+ throw std::runtime_error(fmt::format("The mem map for process {} could not be created", Pid));
+ }
+
+ ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData();
+ memset(data, 0, sizeof(ZenStateSharedData));
+ data->Magic = ZenStateSharedData::kMagicV1;
+ data->Pid.store(gsl::narrow<uint32_t>(Pid));
+ data->SessionId;
+ data->Abort.store(false);
+ data->Pause.store(false);
+ const Oid SessionId = GetSessionId();
+ memcpy(data->SessionId, &SessionId, sizeof SessionId);
+
+ m_MemMap = std::move(MemMap);
+ m_Data = data;
+
+ m_StateMonitor = std::thread([this]() {
+ while (!m_ExitStateMonitorEvent.Wait(500))
+ {
+ if (m_Data->Abort.load())
+ {
+ AbortFlag.store(true);
+ }
+ PauseFlag.store(m_Data->Pause.load());
+ }
+ });
+ }
+
+ ZenState::~ZenState()
+ {
+ try
+ {
+ if (m_StateMonitor.joinable())
+ {
+ m_ExitStateMonitorEvent.Set();
+ m_StateMonitor.join();
+ }
+ m_MemMap.reset();
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("ZenState::~ZenState threw exception: {}", Ex.what());
+ }
+ }
static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u;
static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u;
@@ -435,7 +618,7 @@ namespace {
std::atomic<uint64_t> DiscoveredItemCount = 0;
std::atomic<uint64_t> DeletedItemCount = 0;
std::atomic<uint64_t> DeletedByteCount = 0;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
struct AsyncVisitor : public GetDirectoryContentVisitor
{
@@ -549,8 +732,8 @@ namespace {
uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs();
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
LastUpdateTimeMs = Timer.GetElapsedTimeMs();
uint64_t Deleted = DeletedItemCount.load();
@@ -559,7 +742,8 @@ namespace {
Progress.UpdateState({.Task = "Cleaning folder ",
.Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
.TotalCount = Discovered,
- .RemainingCount = Discovered - Deleted},
+ .RemainingCount = Discovered - Deleted,
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
@@ -598,12 +782,17 @@ namespace {
Progress.UpdateState({.Task = "Cleaning folder ",
.Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
.TotalCount = Discovered,
- .RemainingCount = Discovered - Deleted},
+ .RemainingCount = Discovered - Deleted,
+ .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)},
false);
}
}
Progress.Finish();
+ if (AbortFlag)
+ {
+ return false;
+ }
uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs();
if (ElapsedTimeMs >= 200)
@@ -1954,7 +2143,7 @@ namespace {
WorkerThreadPool& NetworkPool = GetNetworkPool();
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
const std::filesystem::path TempFolder = ".zen-tmp";
@@ -2117,8 +2306,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount;
const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount;
@@ -2141,7 +2330,8 @@ namespace {
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2),
.RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 -
- (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))},
+ (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load())),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
@@ -2397,7 +2587,7 @@ namespace {
FilteredRate FilteredGeneratedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0;
@@ -2567,8 +2757,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load());
@@ -2587,7 +2777,8 @@ namespace {
{.Task = "Generating blocks",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(NewBlockCount),
- .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load())},
+ .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
@@ -2626,7 +2817,7 @@ namespace {
FilteredRate FilteredCompressedBytesPerSecond;
FilteredRate FilteredUploadedBytesPerSecond;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<size_t> UploadedBlockSize = 0;
std::atomic<size_t> UploadedBlockCount = 0;
@@ -3006,8 +3197,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load());
FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load());
FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load());
@@ -3035,13 +3226,15 @@ namespace {
ProgressBar.UpdateState({.Task = "Uploading blobs ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(TotalRawSize),
- .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize)},
+ .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0);
ProgressBar.Finish();
+
UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS();
LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS();
}
@@ -3493,7 +3686,7 @@ namespace {
Content,
*ChunkController,
GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
@@ -3506,16 +3699,18 @@ namespace {
ProgressBar.UpdateState({.Task = "Scanning files ",
.Details = Details,
.TotalCount = TotalRawSize,
- .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load()},
+ .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load(),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
},
- AbortFlag);
+ AbortFlag,
+ PauseFlag);
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
if (AbortFlag)
{
return;
}
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
}
ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec",
@@ -4266,7 +4461,7 @@ namespace {
WorkerThreadPool& VerifyPool = GetIOWorkerPool();
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size());
@@ -4414,8 +4609,8 @@ namespace {
});
}
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}",
VerifyFolderStats.FilesVerified.load(),
PathCount,
@@ -4424,12 +4619,18 @@ namespace {
ProgressBar.UpdateState({.Task = "Verifying files ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(PathCount),
- .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load())},
+ .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs();
ProgressBar.Finish();
+ if (AbortFlag)
+ {
+ return;
+ }
+
for (const std::string& Error : Errors)
{
ZEN_CONSOLE("{}", Error);
@@ -5361,7 +5562,7 @@ namespace {
Stopwatch Timer;
auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); });
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<uint64_t> CompletedPathCount = 0;
uint32_t PathIndex = 0;
@@ -5392,7 +5593,7 @@ namespace {
});
PathIndex += PathRangeCount;
}
- Work.Wait(200, [&](bool, ptrdiff_t) {
+ Work.Wait(200, [&](bool, bool, ptrdiff_t) {
if (ProgressCallback)
{
ProgressCallback(PathCount, CompletedPathCount.load());
@@ -5670,7 +5871,7 @@ namespace {
ScavengedPaths.resize(ScavengePathCount);
ProgressBar ScavengeProgressBar(ProgressMode, "Scavenging");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
std::atomic<uint64_t> PathsFound(0);
std::atomic<uint64_t> ChunksFound(0);
@@ -5800,8 +6001,8 @@ namespace {
{
ZEN_TRACE_CPU("ScavengeScan_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavanging",
PathsScavenged.load(),
ScavengePathCount,
@@ -5810,11 +6011,17 @@ namespace {
ScavengeProgressBar.UpdateState({.Task = "Scavenging ",
.Details = Details,
.TotalCount = ScavengePathCount,
- .RemainingCount = ScavengePathCount - PathsScavenged.load()},
+ .RemainingCount = ScavengePathCount - PathsScavenged.load(),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
+
ScavengeProgressBar.Finish();
+ if (AbortFlag)
+ {
+ return;
+ }
for (uint32_t ScavengedContentIndex = 0;
ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty());
@@ -6130,7 +6337,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
struct LooseChunkHashWorkData
{
@@ -7583,8 +7790,8 @@ namespace {
{
ZEN_TRACE_CPU("WriteChunks_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() +
DownloadStats.DownloadedBlockByteCount.load() +
+DownloadStats.DownloadedPartialBlockByteCount.load();
@@ -7610,7 +7817,8 @@ namespace {
.Details = Details,
.TotalCount = PrimeCacheOnly ? TotalRequestCount : BytesToWrite,
.RemainingCount = PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load())
- : (BytesToWrite - DiskStats.WriteByteCount.load())},
+ : (BytesToWrite - DiskStats.WriteByteCount.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
@@ -7618,13 +7826,12 @@ namespace {
FilteredWrittenBytesPerSecond.Stop();
FilteredDownloadedBytesPerSecond.Stop();
+ WriteProgressBar.Finish();
if (AbortFlag)
{
return;
}
- WriteProgressBar.Finish();
-
if (!PrimeCacheOnly)
{
uint32_t RawSequencesMissingWriteCount = 0;
@@ -7771,7 +7978,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t LocalPathIndex : FilesToCache)
{
@@ -7800,26 +8007,26 @@ namespace {
{
ZEN_TRACE_CPU("CacheLocal_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
const uint64_t WorkTotal = FilesToCache.size();
const uint64_t WorkComplete = CachedCount.load();
std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount));
CacheLocalProgressBar.UpdateState({.Task = "Caching local ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)},
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
+ CacheLocalProgressBar.Finish();
if (AbortFlag)
{
return;
}
- CacheLocalProgressBar.Finish();
-
ZEN_DEBUG(
"Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, "
"Delete: {}",
@@ -7858,7 +8065,7 @@ namespace {
WorkerThreadPool& WritePool = GetIOWorkerPool();
ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State");
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
OutLocalFolderState.Paths.resize(RemoteContent.Paths.size());
OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size());
@@ -8109,26 +8316,21 @@ namespace {
{
ZEN_TRACE_CPU("FinalizeTree_Wait");
- Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted, PendingWork);
+ Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size();
const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load();
std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal);
RebuildProgressBar.UpdateState({.Task = "Rebuilding state ",
.Details = Details,
.TotalCount = gsl::narrow<uint64_t>(WorkTotal),
- .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)},
+ .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
}
RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs();
-
- if (AbortFlag)
- {
- return;
- }
-
RebuildProgressBar.Finish();
}
}
@@ -8690,10 +8892,15 @@ namespace {
ProgressBar.UpdateState({.Task = "Checking files ",
.Details = Details,
.TotalCount = PathCount,
- .RemainingCount = PathCount - CompletedPathCount},
+ .RemainingCount = PathCount - CompletedPathCount,
+ .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)},
false);
});
ProgressBar.Finish();
+ if (AbortFlag)
+ {
+ return {};
+ }
}
bool ScanContent = true;
@@ -8731,7 +8938,7 @@ namespace {
UpdatedContent,
ChunkController,
GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
@@ -8744,16 +8951,19 @@ namespace {
ProgressBar.UpdateState({.Task = "Scanning files ",
.Details = Details,
.TotalCount = ByteCountToScan,
- .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()},
+ .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load(),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
},
- AbortFlag);
+ AbortFlag,
+ PauseFlag);
+
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
if (AbortFlag)
{
return {};
}
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}});
}
}
@@ -8799,7 +9009,7 @@ namespace {
OutLocalFolderContent,
ChunkController,
GetUpdateDelayMS(ProgressMode),
- [&](bool, std::ptrdiff_t) {
+ [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) {
FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load());
std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found",
ChunkingStats.FilesProcessed.load(),
@@ -8812,18 +9022,19 @@ namespace {
ProgressBar.UpdateState({.Task = "Scanning files ",
.Details = Details,
.TotalCount = ByteCountToScan,
- .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load())},
+ .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load()),
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
},
- AbortFlag);
+ AbortFlag,
+ PauseFlag);
+ FilteredBytesHashed.Stop();
+ ProgressBar.Finish();
if (AbortFlag)
{
return {};
}
-
- FilteredBytesHashed.Stop();
- ProgressBar.Finish();
}
return LocalContent;
}
@@ -9740,6 +9951,21 @@ BuildsCommand::BuildsCommand()
m_FetchBlobOptions.parse_positional({"build-id", "blob-hash"});
m_FetchBlobOptions.positional_help("build-id blob-hash");
+ auto AddZenProcessOptions = [this](cxxopts::Options& Ops) {
+ Ops.add_option("", "", "process-id", "Process id of running process", cxxopts::value(m_ZenProcessId), "<pid>");
+ };
+ AddZenProcessOptions(m_PauseOptions);
+ m_PauseOptions.parse_positional({"process-id"});
+ m_PauseOptions.positional_help("process-id");
+
+ AddZenProcessOptions(m_ResumeOptions);
+ m_ResumeOptions.parse_positional({"process-id"});
+ m_ResumeOptions.positional_help("process-id");
+
+ AddZenProcessOptions(m_AbortOptions);
+ m_AbortOptions.parse_positional({"process-id"});
+ m_AbortOptions.positional_help("process-id");
+
AddSystemOptions(m_ValidateBuildPartOptions);
AddCloudOptions(m_ValidateBuildPartOptions);
AddFileOptions(m_ValidateBuildPartOptions);
@@ -10458,7 +10684,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
{
if (!m_ListResultPath.empty())
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})",
+ GetRunningExecutablePath(),
+ ZEN_CFG_VERSION_BUILD_STRING_FULL,
+ GetCurrentProcessId());
}
BuildStorage::Statistics StorageStats;
@@ -10513,7 +10742,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (!m_ListResultPath.empty())
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})",
+ GetRunningExecutablePath(),
+ ZEN_CFG_VERSION_BUILD_STRING_FULL,
+ GetCurrentProcessId());
}
CbObject QueryObject;
if (m_ListQueryPath.empty())
@@ -10592,7 +10824,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_UploadOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
+
+ ZenState InstanceState;
ParsePath();
@@ -10679,7 +10913,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_DownloadOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
+
+ ZenState InstanceState;
ParsePath();
@@ -10743,7 +10979,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_FetchBlobOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
+
BuildStorage::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
@@ -10785,7 +11022,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
if (SubOption == &m_ValidateBuildPartOptions)
{
- ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL);
+ ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId());
+
+ ZenState InstanceState;
BuildStorage::Statistics StorageStats;
BuildStorageCache::Statistics StorageCacheStats;
@@ -10877,6 +11116,50 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return 0;
}
+ auto ParseZenProcessId = [&]() {
+ if (m_ZenProcessId == -1)
+ {
+ const std::filesystem::path RunningExecutablePath = GetRunningExecutablePath();
+ ProcessHandle RunningProcess;
+ std::error_code Ec = FindProcess(RunningExecutablePath, RunningProcess, /*IncludeSelf*/ false);
+ if (Ec)
+ {
+ throw std::runtime_error(
+ fmt::format("Failed finding process running '{}', reason: '{}'", RunningExecutablePath, Ec.message()));
+ }
+ if (!RunningProcess.IsValid())
+ {
+ throw zen::OptionParseException(
+ fmt::format("Unable to find a running instance of the zen executable '{}'", RunningExecutablePath));
+ }
+ m_ZenProcessId = RunningProcess.Pid();
+ }
+ };
+
+ if (SubOption == &m_PauseOptions)
+ {
+ ParseZenProcessId();
+ ZenState RunningState(m_ZenProcessId);
+ RunningState.StateData().Pause.store(true);
+ return 0;
+ }
+
+ if (SubOption == &m_ResumeOptions)
+ {
+ ParseZenProcessId();
+ ZenState RunningState(m_ZenProcessId);
+ RunningState.StateData().Pause.store(false);
+ return 0;
+ }
+
+ if (SubOption == &m_AbortOptions)
+ {
+ ParseZenProcessId();
+ ZenState RunningState(m_ZenProcessId);
+ RunningState.StateData().Abort.store(true);
+ return 0;
+ }
+
if (SubOption == &m_TestOptions)
{
m_SystemRootDir = (GetRunningExecutablePath().parent_path() / ".tmpzensystem").make_preferred();
@@ -11046,7 +11329,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
return true;
};
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
uint32_t Randomizer = 0;
auto FileSizeIt = DownloadContent.FileSizes.begin();
@@ -11104,8 +11387,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
}
FileSizeIt++;
}
- Work.Wait(5000, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted);
+ Work.Wait(5000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork);
});
ZEN_ASSERT(!AbortFlag.load());
diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h
index 378810155..b3466c0d3 100644
--- a/src/zen/cmds/builds_cmd.h
+++ b/src/zen/cmds/builds_cmd.h
@@ -114,6 +114,12 @@ private:
cxxopts::Options m_FetchBlobOptions{"fetch-blob", "Fetch a blob from remote store"};
std::string m_BlobHash;
+ cxxopts::Options m_PauseOptions{"pause", "Pause an ongoing zen builds process"};
+ cxxopts::Options m_ResumeOptions{"resume", "Resume a paused zen builds process"};
+ cxxopts::Options m_AbortOptions{"abort", "Abort an ongoing zen builds process"};
+
+ int m_ZenProcessId = -1;
+
cxxopts::Options m_ValidateBuildPartOptions{"validate-part", "Fetch a build part and validate all referenced attachments"};
cxxopts::Options m_TestOptions{"test", "Test upload and download with verify"};
@@ -121,15 +127,18 @@ private:
cxxopts::Options m_MultiTestDownloadOptions{"multi-test-download", "Test multiple sequenced downloads with verify"};
std::vector<std::string> m_BuildIds;
- cxxopts::Options* m_SubCommands[9] = {&m_ListNamespacesOptions,
- &m_ListOptions,
- &m_UploadOptions,
- &m_DownloadOptions,
- &m_DiffOptions,
- &m_FetchBlobOptions,
- &m_ValidateBuildPartOptions,
- &m_TestOptions,
- &m_MultiTestDownloadOptions};
+ cxxopts::Options* m_SubCommands[12] = {&m_ListNamespacesOptions,
+ &m_ListOptions,
+ &m_UploadOptions,
+ &m_DownloadOptions,
+ &m_PauseOptions,
+ &m_ResumeOptions,
+ &m_AbortOptions,
+ &m_DiffOptions,
+ &m_FetchBlobOptions,
+ &m_ValidateBuildPartOptions,
+ &m_TestOptions,
+ &m_MultiTestDownloadOptions};
};
} // namespace zen
diff --git a/src/zen/cmds/up_cmd.cpp b/src/zen/cmds/up_cmd.cpp
index f3bf2f66d..d0763701e 100644
--- a/src/zen/cmds/up_cmd.cpp
+++ b/src/zen/cmds/up_cmd.cpp
@@ -311,7 +311,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv)
// Try to find the running executable by path name
std::filesystem::path ServerExePath = m_ProgramBaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL;
ProcessHandle RunningProcess;
- if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec)
+ if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec, /*IncludeSelf*/ false)
{
ZEN_WARN("attempting hard terminate of zen process with pid ({})", RunningProcess.Pid());
try
diff --git a/src/zen/cmds/wipe_cmd.cpp b/src/zen/cmds/wipe_cmd.cpp
index 9dfdca0a1..fcc18df2b 100644
--- a/src/zen/cmds/wipe_cmd.cpp
+++ b/src/zen/cmds/wipe_cmd.cpp
@@ -33,6 +33,7 @@ namespace zen {
namespace {
static std::atomic<bool> AbortFlag = false;
+ static std::atomic<bool> PauseFlag = false;
static bool IsVerbose = false;
static bool Quiet = false;
static ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty;
@@ -72,11 +73,13 @@ namespace {
{
if (SigNum == SIGINT)
{
+ PauseFlag = false;
AbortFlag = true;
}
#if ZEN_PLATFORM_WINDOWS
if (SigNum == SIGBREAK)
{
+ PauseFlag = false;
AbortFlag = true;
}
#endif // ZEN_PLATFORM_WINDOWS
@@ -248,7 +251,7 @@ namespace {
return Added;
};
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
struct AsyncVisitor : public GetDirectoryContentVisitor
{
@@ -423,12 +426,12 @@ namespace {
GetIOWorkerPool(),
Work.PendingWork());
- Work.Wait(ProgressMode == ProgressBar::Mode::Pretty ? 200 : 5000, [&](bool IsAborted, ptrdiff_t PendingWork) {
+ Work.Wait(ProgressMode == ProgressBar::Mode::Pretty ? 200 : 5000, [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) {
+ ZEN_UNUSED(PendingWork);
if (Quiet)
{
return;
}
- ZEN_UNUSED(IsAborted, PendingWork);
LastUpdateTimeMs = Timer.GetElapsedTimeMs();
uint64_t Deleted = DeletedItemCount.load();
@@ -437,7 +440,8 @@ namespace {
Progress.UpdateState({.Task = "Removing files ",
.Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)),
.TotalCount = Discovered,
- .RemainingCount = Discovered - Deleted},
+ .RemainingCount = Discovered - Deleted,
+ .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)},
false);
});
diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp
index 2dc0fa98c..119215d40 100644
--- a/src/zen/zen.cpp
+++ b/src/zen/zen.cpp
@@ -376,7 +376,8 @@ ProgressBar::SetLogOperationProgress(Mode InMode, uint32_t StepIndex, uint32_t S
ProgressBar::ProgressBar(Mode InMode, std::string_view InSubTask)
: m_Mode((!IsStdoutTty() && InMode == Mode::Pretty) ? Mode::Plain : InMode)
-, m_LastUpdateMS(m_SW.GetElapsedTimeMs() - 10000)
+, m_LastUpdateMS((uint64_t)-1)
+, m_PausedMS(0)
, m_SubTask(InSubTask)
{
if (!m_SubTask.empty() && InMode == Mode::Log)
@@ -413,20 +414,46 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
}
uint64_t ElapsedTimeMS = m_SW.GetElapsedTimeMs();
- if (!DoLinebreak && (NewState.Task == m_State.Task) && ((m_LastUpdateMS + 200) > ElapsedTimeMS))
+ if (m_LastUpdateMS != (uint64_t)-1)
{
- return;
+ if (!DoLinebreak && (NewState.Status == m_State.Status) && (NewState.Task == m_State.Task) &&
+ ((m_LastUpdateMS + 200) > ElapsedTimeMS))
+ {
+ return;
+ }
+ if (m_State.Status == State::EStatus::Paused)
+ {
+ uint64_t ElapsedSinceLast = ElapsedTimeMS - m_LastUpdateMS;
+ m_PausedMS += ElapsedSinceLast;
+ }
}
m_LastUpdateMS = ElapsedTimeMS;
+ std::string Task = NewState.Task;
+ switch (NewState.Status)
+ {
+ case State::EStatus::Aborted:
+ Task = "Aborting";
+ break;
+ case State::EStatus::Paused:
+ Task = "Paused";
+ break;
+ default:
+ break;
+ }
+ if (NewState.Task.length() > Task.length())
+ {
+ Task += std::string(NewState.Task.length() - Task.length(), ' ');
+ }
+
const size_t PercentDone =
NewState.TotalCount > 0u ? gsl::narrow<uint8_t>((100 * (NewState.TotalCount - NewState.RemainingCount)) / NewState.TotalCount) : 0u;
if (m_Mode == Mode::Plain)
{
const std::string Details = (!NewState.Details.empty()) ? fmt::format(": {}", NewState.Details) : "";
- const std::string Output = fmt::format("{} {}% ({}){}\n", NewState.Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details);
+ const std::string Output = fmt::format("{} {}% ({}){}\n", Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details);
OutputToConsoleRaw(Output);
}
else if (m_Mode == Mode::Pretty)
@@ -435,12 +462,12 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
size_t ProgressBarCount = (ProgressBarSize * PercentDone) / 100;
uint64_t Completed = NewState.TotalCount - NewState.RemainingCount;
- uint64_t ETAMS = (PercentDone > 5) ? (ElapsedTimeMS * NewState.RemainingCount) / Completed : 0;
+ uint64_t ETAElapsedMS = ElapsedTimeMS -= m_PausedMS;
+ uint64_t ETAMS =
+ (NewState.Status == State::EStatus::Running) && (PercentDone > 5) ? (ETAElapsedMS * NewState.RemainingCount) / Completed : 0;
uint32_t ConsoleColumns = GetConsoleColumns();
- std::string_view TaskString = NewState.Task;
-
const std::string PercentString = fmt::format("{:#3}%", PercentDone);
const std::string ProgressBarString =
@@ -454,7 +481,7 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak)
ExtendableStringBuilder<256> OutputBuilder;
- OutputBuilder << "\r" << TaskString << PercentString;
+ OutputBuilder << "\r" << Task << PercentString;
if (OutputBuilder.Size() + 1 < ConsoleColumns)
{
size_t RemainingSpace = ConsoleColumns - (OutputBuilder.Size() + 1);
diff --git a/src/zen/zen.h b/src/zen/zen.h
index 9ca228e37..995bd13b6 100644
--- a/src/zen/zen.h
+++ b/src/zen/zen.h
@@ -79,6 +79,26 @@ public:
std::string Details;
uint64_t TotalCount = 0;
uint64_t RemainingCount = 0;
+ enum class EStatus
+ {
+ Running,
+ Aborted,
+ Paused
+ };
+ EStatus Status = EStatus::Running;
+
+ static EStatus CalculateStatus(bool IsAborted, bool IsPaused)
+ {
+ if (IsAborted)
+ {
+ return EStatus::Aborted;
+ }
+ if (IsPaused)
+ {
+ return EStatus::Paused;
+ }
+ return EStatus::Running;
+ }
};
enum class Mode
@@ -104,6 +124,7 @@ private:
const Mode m_Mode;
Stopwatch m_SW;
uint64_t m_LastUpdateMS;
+ uint64_t m_PausedMS;
State m_State;
const std::string m_SubTask;
size_t m_LastOutputLength = 0;
diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp
index c4264bc29..46337ffc8 100644
--- a/src/zencore/filesystem.cpp
+++ b/src/zencore/filesystem.cpp
@@ -33,6 +33,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <dirent.h>
# include <fcntl.h>
# include <sys/resource.h>
+# include <sys/mman.h>
# include <sys/stat.h>
# include <pwd.h>
# include <unistd.h>
@@ -43,6 +44,7 @@ ZEN_THIRD_PARTY_INCLUDES_END
# include <fcntl.h>
# include <libproc.h>
# include <sys/resource.h>
+# include <sys/mman.h>
# include <sys/stat.h>
# include <sys/syslimits.h>
# include <pwd.h>
@@ -2824,6 +2826,218 @@ SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly)
return Result;
}
+class SharedMemoryImpl : public SharedMemory
+{
+public:
+ struct Data
+ {
+ void* Handle = nullptr;
+ void* DataPtr = nullptr;
+ size_t Size = 0;
+ std::string Name;
+ };
+
+ static Data Open(std::string_view Name, size_t Size, bool SystemGlobal)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ std::wstring InstanceMapName = Utf8ToWide(fmt::format("{}\\{}", SystemGlobal ? "Global" : "Local", Name));
+
+ HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, InstanceMapName.c_str());
+ if (hMap == NULL)
+ {
+ return {};
+ }
+ void* pBuf = MapViewOfFile(hMap, // handle to map object
+ FILE_MAP_ALL_ACCESS, // read/write permission
+ 0, // offset high
+ 0, // offset low
+ DWORD(Size)); // size
+
+ if (pBuf == NULL)
+ {
+ CloseHandle(hMap);
+ }
+ return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)};
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ ZEN_UNUSED(SystemGlobal);
+ std::string InstanceMapName = fmt::format("/{}", Name);
+
+ int Fd = shm_open(InstanceMapName.c_str(), O_RDWR, 0666);
+ if (Fd < 0)
+ {
+ return {};
+ }
+ void* hMap = (void*)intptr_t(Fd);
+
+ struct stat Stat;
+ fstat(Fd, &Stat);
+
+ if (size_t(Stat.st_size) < Size)
+ {
+ close(Fd);
+ return {};
+ }
+
+ void* pBuf = mmap(nullptr, Size, PROT_READ | PROT_WRITE, MAP_SHARED, Fd, 0);
+ if (pBuf == MAP_FAILED)
+ {
+ close(Fd);
+ return {};
+ }
+ return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)};
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ }
+
+ static Data Create(std::string_view Name, size_t Size, bool SystemGlobal)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ std::wstring InstanceMapName = Utf8ToWide(fmt::format("{}\\{}", SystemGlobal ? "Global" : "Local", Name));
+
+ SECURITY_ATTRIBUTES m_Attributes{};
+ SECURITY_DESCRIPTOR m_Sd{};
+
+ m_Attributes.nLength = sizeof m_Attributes;
+ m_Attributes.bInheritHandle = false; // Disable inheritance
+
+ const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION);
+
+ if (Success)
+ {
+ if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE))
+ {
+ ThrowLastError("SetSecurityDescriptorDacl failed");
+ }
+
+ m_Attributes.lpSecurityDescriptor = &m_Sd;
+ }
+
+ HANDLE hMap = CreateFileMapping(INVALID_HANDLE_VALUE, // use paging file
+ &m_Attributes, // allow anyone to access
+ PAGE_READWRITE, // read/write access
+ 0, // maximum object size (high-order DWORD)
+ DWORD(Size), // maximum object size (low-order DWORD)
+ InstanceMapName.c_str());
+ if (hMap == NULL)
+ {
+ return {};
+ }
+ void* pBuf = MapViewOfFile(hMap, // handle to map object
+ FILE_MAP_ALL_ACCESS, // read/write permission
+ 0, // offset high
+ 0, // offset low
+ DWORD(Size)); // size
+
+ if (pBuf == NULL)
+ {
+ CloseHandle(hMap);
+ return {};
+ }
+ return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)};
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ ZEN_UNUSED(SystemGlobal);
+ std::string InstanceMapName = fmt::format("/{}", Name);
+
+ int Fd = shm_open(InstanceMapName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666);
+ if (Fd < 0)
+ {
+ return {};
+ }
+ fchmod(Fd, 0666);
+ void* hMap = (void*)intptr_t(Fd);
+
+ int Result = ftruncate(Fd, Size);
+ ZEN_UNUSED(Result);
+
+ void* pBuf = mmap(nullptr, Size, PROT_READ | PROT_WRITE, MAP_SHARED, Fd, 0);
+ if (pBuf == MAP_FAILED)
+ {
+ close(Fd);
+ return {};
+ }
+ return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)};
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ }
+
+ static void Close(Data&& MemMap, bool Delete)
+ {
+#if ZEN_PLATFORM_WINDOWS
+ ZEN_UNUSED(Delete);
+ if (MemMap.DataPtr != nullptr)
+ {
+ UnmapViewOfFile(MemMap.DataPtr);
+ MemMap.DataPtr = nullptr;
+ }
+ if (MemMap.Handle != nullptr)
+ {
+ CloseHandle(MemMap.Handle);
+ MemMap.Handle = nullptr;
+ }
+#endif // ZEN_PLATFORM_WINDOWS
+#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ if (MemMap.DataPtr != nullptr)
+ {
+ munmap(MemMap.DataPtr, MemMap.Size);
+ MemMap.DataPtr = nullptr;
+ }
+
+ if (MemMap.Handle != nullptr)
+ {
+ int Fd = int(intptr_t(MemMap.Handle));
+ close(Fd);
+ MemMap.Handle = nullptr;
+ }
+ if (Delete)
+ {
+ std::string InstanceMapName = fmt::format("/{}", MemMap.Name);
+ shm_unlink(InstanceMapName.c_str());
+ }
+#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC
+ }
+
+ SharedMemoryImpl(Data&& MemMap, bool IsOwned) : m_MemMap(std::move(MemMap)), m_IsOwned(IsOwned) {}
+ virtual ~SharedMemoryImpl()
+ {
+ try
+ {
+ Close(std::move(m_MemMap), /*Delete*/ m_IsOwned);
+ }
+ catch (const std::exception& Ex)
+ {
+ ZEN_ERROR("SharedMemoryImpl::~SharedMemoryImpl threw exception: {}", Ex.what());
+ }
+ }
+
+ virtual void* GetData() override { return m_MemMap.DataPtr; }
+
+private:
+ Data m_MemMap;
+ const bool m_IsOwned = false;
+};
+
+std::unique_ptr<SharedMemory>
+OpenSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal)
+{
+ SharedMemoryImpl::Data MemMap = SharedMemoryImpl::Open(Name, Size, SystemGlobal);
+ if (MemMap.DataPtr)
+ {
+ return std::make_unique<SharedMemoryImpl>(std::move(MemMap), /*IsOwned*/ false);
+ }
+ return {};
+}
+
+std::unique_ptr<SharedMemory>
+CreateSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal)
+{
+ SharedMemoryImpl::Data MemMap = SharedMemoryImpl::Create(Name, Size, SystemGlobal);
+ if (MemMap.DataPtr)
+ {
+ return std::make_unique<SharedMemoryImpl>(std::move(MemMap), /*IsOwned*/ true);
+ }
+ return {};
+}
+
//////////////////////////////////////////////////////////////////////////
//
// Testing related code follows...
@@ -3108,6 +3322,22 @@ TEST_CASE("RotateDirectories")
}
}
+TEST_CASE("SharedMemory")
+{
+ CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, false));
+ CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, true));
+
+ {
+ auto Mem0 = CreateSharedMemory("SharedMemoryTest0", 482, false);
+ CHECK(Mem0);
+ strcpy((char*)Mem0->GetData(), "this is the string we are looking for");
+ auto Mem1 = OpenSharedMemory("SharedMemoryTest0", 482, false);
+ CHECK_EQ(std::string((char*)Mem0->GetData()), std::string((char*)Mem1->GetData()));
+ }
+
+ CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, false));
+}
+
#endif
} // namespace zen
diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h
index dfd0eedc9..36d4d1b68 100644
--- a/src/zencore/include/zencore/filesystem.h
+++ b/src/zencore/include/zencore/filesystem.h
@@ -385,6 +385,16 @@ uint32_t MakeFileModeReadOnly(uint32_t FileMode, bool ReadOnly);
bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly, std::error_code& Ec);
bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly);
+class SharedMemory
+{
+public:
+ virtual ~SharedMemory() {}
+ virtual void* GetData() = 0;
+};
+
+std::unique_ptr<SharedMemory> OpenSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal);
+std::unique_ptr<SharedMemory> CreateSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal);
+
//////////////////////////////////////////////////////////////////////////
void filesystem_forcelink(); // internal
diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h
index d1394cd9a..d3e1de703 100644
--- a/src/zencore/include/zencore/process.h
+++ b/src/zencore/include/zencore/process.h
@@ -98,7 +98,7 @@ ZENCORE_API int GetCurrentProcessId();
int GetProcessId(CreateProcResult ProcId);
std::filesystem::path GetProcessExecutablePath(int Pid, std::error_code& OutEc);
-std::error_code FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle);
+std::error_code FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle, bool IncludeSelf = true);
void process_forcelink(); // internal
diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp
index 48efc3f85..fcbe657cb 100644
--- a/src/zencore/process.cpp
+++ b/src/zencore/process.cpp
@@ -929,7 +929,7 @@ GetProcessExecutablePath(int Pid, std::error_code& OutEc)
}
std::error_code
-FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle)
+FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle, bool IncludeSelf)
{
#if ZEN_PLATFORM_WINDOWS
HANDLE ProcessSnapshotHandle = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0);
@@ -939,13 +939,15 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
}
auto _ = MakeGuard([&]() { CloseHandle(ProcessSnapshotHandle); });
+ const DWORD ThisProcessId = ::GetCurrentProcessId();
+
PROCESSENTRY32 Entry;
Entry.dwSize = sizeof(PROCESSENTRY32);
if (Process32First(ProcessSnapshotHandle, (LPPROCESSENTRY32)&Entry))
{
do
{
- if (ExecutableImage.filename() == Entry.szExeFile)
+ if ((IncludeSelf || (Entry.th32ProcessID != ThisProcessId)) && (ExecutableImage.filename() == Entry.szExeFile))
{
std::error_code Ec;
std::filesystem::path EntryPath = GetProcessExecutablePath(Entry.th32ProcessID, Ec);
@@ -970,6 +972,7 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
}
}
} while (::Process32Next(ProcessSnapshotHandle, (LPPROCESSENTRY32)&Entry));
+ return {};
}
return MakeErrorCodeFromLastError();
#endif // ZEN_PLATFORM_WINDOWS
@@ -980,6 +983,8 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
struct kinfo_proc* Processes = nullptr;
uint32_t ProcCount = 0;
+ const pid_t ThisProcessId = getpid();
+
if (sysctl(Mib, 4, NULL, &BufferSize, NULL, 0) != -1 && BufferSize > 0)
{
struct kinfo_proc* Processes = (struct kinfo_proc*)malloc(BufferSize);
@@ -990,36 +995,46 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
char Buffer[PROC_PIDPATHINFO_MAXSIZE];
for (uint32_t ProcIndex = 0; ProcIndex < ProcCount; ProcIndex++)
{
- pid_t Pid = Processes[ProcIndex].kp_proc.p_pid;
- std::error_code Ec;
- std::filesystem::path EntryPath = GetProcessExecutablePath(Pid, Ec);
- if (!Ec)
+ pid_t Pid = Processes[ProcIndex].kp_proc.p_pid;
+ if (IncludeSelf || (Pid != ThisProcessId))
{
- if (EntryPath == ExecutableImage)
+ std::error_code Ec;
+ std::filesystem::path EntryPath = GetProcessExecutablePath(Pid, Ec);
+ if (!Ec)
{
- if (Processes[ProcIndex].kp_proc.p_stat != SZOMB)
+ if (EntryPath == ExecutableImage)
{
- OutHandle.Initialize(Pid, Ec);
- return Ec;
+ if (Processes[ProcIndex].kp_proc.p_stat != SZOMB)
+ {
+ OutHandle.Initialize(Pid, Ec);
+ return Ec;
+ }
}
}
+ Ec.clear();
}
}
+ return {};
}
}
return MakeErrorCodeFromLastError();
#endif // ZEN_PLATFORM_MAC
#if ZEN_PLATFORM_LINUX
+ const pid_t ThisProcessId = getpid();
+
std::vector<uint32_t> RunningPids;
DirectoryContent ProcList;
GetDirectoryContent("/proc", DirectoryContentFlags::IncludeDirs, ProcList);
for (const std::filesystem::path& EntryPath : ProcList.Directories)
{
std::string EntryName = EntryPath.stem();
- std::optional<uint32_t> Pid = ParseInt<uint32_t>(EntryName);
- if (Pid.has_value())
+ std::optional<uint32_t> PidMaybe = ParseInt<uint32_t>(EntryName);
+ if (PidMaybe.has_value())
{
- RunningPids.push_back(Pid.value());
+ if (pid_t Pid = PidMaybe.value(); IncludeSelf || (Pid != ThisProcessId))
+ {
+ RunningPids.push_back(Pid);
+ }
}
}
@@ -1042,6 +1057,7 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand
}
}
}
+ Ec.clear();
}
return {};
#endif // ZEN_PLATFORM_LINUX
@@ -1065,10 +1081,24 @@ TEST_CASE("Process")
TEST_CASE("FindProcess")
{
- ProcessHandle Process;
- std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process);
- CHECK(!Ec);
- CHECK(Process.IsValid());
+ {
+ ProcessHandle Process;
+ std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process, /*IncludeSelf*/ true);
+ CHECK(!Ec);
+ CHECK(Process.IsValid());
+ }
+ {
+ ProcessHandle Process;
+ std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process, /*IncludeSelf*/ false);
+ CHECK(!Ec);
+ CHECK(!Process.IsValid());
+ }
+ {
+ ProcessHandle Process;
+ std::error_code Ec = FindProcess("this/does\\not/exist\\123914921929412312312312asdad\\12134.no", Process, /*IncludeSelf*/ false);
+ CHECK(!Ec);
+ CHECK(!Process.IsValid());
+ }
}
TEST_CASE("BuildArgV")
diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp
index f7e63433b..9f2e826d6 100644
--- a/src/zenserver/cache/httpstructuredcache.cpp
+++ b/src/zenserver/cache/httpstructuredcache.cpp
@@ -1588,7 +1588,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
Stopwatch Timer;
auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); });
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
ZEN_INFO("Replaying {} requests", RequestCount);
for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex)
{
@@ -1638,8 +1639,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co
}
});
}
- Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted);
+ Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
ZEN_INFO("Replayed {} of {} requests, elapsed {}",
RequestCount - PendingWork,
RequestCount,
diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp
index 3ec4373a2..a2e73380f 100644
--- a/src/zenserver/projectstore/projectstore.cpp
+++ b/src/zenserver/projectstore/projectstore.cpp
@@ -1597,7 +1597,8 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo
};
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++)
{
if (OptionalWorkerPool)
@@ -2111,7 +2112,8 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds,
{
std::atomic_bool Result = true;
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++)
{
@@ -3890,7 +3892,8 @@ ProjectStore::Flush()
WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (const Ref<Project>& Project : Projects)
{
diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp
index 41c747e08..afb7e4bee 100644
--- a/src/zenstore/buildstore/buildstore.cpp
+++ b/src/zenstore/buildstore/buildstore.cpp
@@ -483,7 +483,8 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O
if (!MetaLocations.empty())
{
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
m_MetadataBlockStore.IterateChunks(
MetaLocations,
diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp
index e973cee77..3f1f0e34a 100644
--- a/src/zenstore/cache/cachedisklayer.cpp
+++ b/src/zenstore/cache/cachedisklayer.cpp
@@ -3978,7 +3978,8 @@ ZenCacheDiskLayer::DiscoverBuckets()
WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (auto& BucketPath : FoundBucketDirectories)
{
Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) {
@@ -4141,7 +4142,8 @@ ZenCacheDiskLayer::Flush()
{
WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
try
{
for (auto& Bucket : Buckets)
@@ -4164,8 +4166,10 @@ ZenCacheDiskLayer::Flush()
{
ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what());
}
- Work.Wait(1000,
- [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); });
+ Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) {
+ ZEN_UNUSED(IsAborted, IsPaused);
+ ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir);
+ });
}
}
diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp
index 75562176e..0c9302ec8 100644
--- a/src/zenstore/compactcas.cpp
+++ b/src/zenstore/compactcas.cpp
@@ -402,7 +402,8 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas
std::atomic<bool> AsyncContinue = true;
{
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
const bool Continue = m_BlockStore.IterateChunks(
FoundChunkLocations,
[this,
diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp
index 56979f267..539b5e95b 100644
--- a/src/zenstore/filecas.cpp
+++ b/src/zenstore/filecas.cpp
@@ -664,7 +664,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes,
};
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++)
{
if (!AsyncContinue)
diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp
index c7532e098..cd1bf7dd7 100644
--- a/src/zenutil/chunkedcontent.cpp
+++ b/src/zenutil/chunkedcontent.cpp
@@ -758,14 +758,15 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span
}
ChunkedFolderContent
-ChunkFolderContent(ChunkingStatistics& Stats,
- WorkerThreadPool& WorkerPool,
- const std::filesystem::path& RootPath,
- const FolderContent& Content,
- const ChunkingController& InChunkingController,
- int32_t UpdateIntervalMS,
- std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
- std::atomic<bool>& AbortFlag)
+ChunkFolderContent(ChunkingStatistics& Stats,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& RootPath,
+ const FolderContent& Content,
+ const ChunkingController& InChunkingController,
+ int32_t UpdateIntervalMS,
+ std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag)
{
ZEN_TRACE_CPU("ChunkFolderContent");
@@ -804,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats,
RwLock Lock;
- ParallelWork Work(AbortFlag);
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t PathIndex : Order)
{
@@ -831,10 +832,9 @@ ChunkFolderContent(ChunkingStatistics& Stats,
});
}
- Work.Wait(UpdateIntervalMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) {
- ZEN_UNUSED(IsAborted);
+ Work.Wait(UpdateIntervalMS, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) {
ZEN_UNUSED(PendingWork);
- UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining());
+ UpdateCallback(IsAborted, IsPaused, Work.PendingWork().Remaining());
});
}
return Result;
diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h
index 225b1a3a5..306a5d990 100644
--- a/src/zenutil/include/zenutil/chunkedcontent.h
+++ b/src/zenutil/include/zenutil/chunkedcontent.h
@@ -132,14 +132,15 @@ struct ChunkingStatistics
uint64_t ElapsedWallTimeUS = 0;
};
-ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
- WorkerThreadPool& WorkerPool,
- const std::filesystem::path& RootPath,
- const FolderContent& Content,
- const ChunkingController& InChunkingController,
- int32_t UpdateIntervalMS,
- std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback,
- std::atomic<bool>& AbortFlag);
+ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats,
+ WorkerThreadPool& WorkerPool,
+ const std::filesystem::path& RootPath,
+ const FolderContent& Content,
+ const ChunkingController& InChunkingController,
+ int32_t UpdateIntervalMS,
+ std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback,
+ std::atomic<bool>& AbortFlag,
+ std::atomic<bool>& PauseFlag);
ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content);
diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h
index 08e730b28..d7e986551 100644
--- a/src/zenutil/include/zenutil/parallelwork.h
+++ b/src/zenutil/include/zenutil/parallelwork.h
@@ -12,13 +12,13 @@ namespace zen {
class ParallelWork
{
public:
- ParallelWork(std::atomic<bool>& AbortFlag);
+ ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag);
~ParallelWork();
- typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback;
- typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
- typedef std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)> UpdateCallback;
+ typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback;
+ typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback;
+ typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback;
void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {})
{
@@ -28,6 +28,10 @@ public:
WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] {
try
{
+ while (m_PauseFlag && !m_AbortFlag)
+ {
+ Sleep(2000);
+ }
Work(m_AbortFlag);
}
catch (...)
@@ -59,6 +63,7 @@ private:
void RethrowErrors();
std::atomic<bool>& m_AbortFlag;
+ std::atomic<bool>& m_PauseFlag;
bool m_DispatchComplete = false;
Latch m_PendingWork;
diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp
index ecacc4b5a..67fc03c04 100644
--- a/src/zenutil/parallelwork.cpp
+++ b/src/zenutil/parallelwork.cpp
@@ -14,7 +14,10 @@
namespace zen {
-ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1)
+ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag)
+: m_AbortFlag(AbortFlag)
+, m_PauseFlag(PauseFlag)
+, m_PendingWork(1)
{
}
@@ -59,7 +62,7 @@ ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback)
while (!m_PendingWork.Wait(UpdateIntervalMS))
{
- UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining());
+ UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining());
}
RethrowErrors();
@@ -111,7 +114,8 @@ ParallelWork::RethrowErrors()
TEST_CASE("parallellwork.nowork")
{
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
Work.Wait();
}
@@ -120,7 +124,8 @@ TEST_CASE("parallellwork.basic")
WorkerThreadPool WorkerPool(2);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); });
@@ -133,7 +138,8 @@ TEST_CASE("parallellwork.throws_in_work")
WorkerThreadPool WorkerPool(2);
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t I = 0; I < 10; I++)
{
Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) {
@@ -158,7 +164,8 @@ TEST_CASE("parallellwork.throws_in_dispatch")
try
{
std::atomic<bool> AbortFlag;
- ParallelWork Work(AbortFlag);
+ std::atomic<bool> PauseFlag;
+ ParallelWork Work(AbortFlag, PauseFlag);
for (uint32_t I = 0; I < 5; I++)
{
Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) {
diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp
index bfa0d3c49..a5b342cb0 100644
--- a/src/zenutil/zenserverprocess.cpp
+++ b/src/zenutil/zenserverprocess.cpp
@@ -1059,7 +1059,7 @@ ZenServerInstance::Terminate()
const std::filesystem::path BaseDir = m_Env.ProgramBaseDir();
const std::filesystem::path Executable = BaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL;
ProcessHandle RunningProcess;
- std::error_code Ec = FindProcess(Executable, RunningProcess);
+ std::error_code Ec = FindProcess(Executable, RunningProcess, /*IncludeSelf*/ false);
if (Ec)
{
throw std::system_error(Ec, fmt::format("failed to look up running server executable '{}'", Executable));