diff options
| author | Dan Engelbrecht <[email protected]> | 2025-06-05 14:40:02 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-06-05 14:40:02 +0200 |
| commit | 40b9386054de3c23f77da74eefaa743240d164fd (patch) | |
| tree | 9c4448f86d1df00b3d0f5d5dd94506bca8c067d9 /src | |
| parent | revert system temp dir for builds upload (#422) (diff) | |
| download | zen-40b9386054de3c23f77da74eefaa743240d164fd.tar.xz zen-40b9386054de3c23f77da74eefaa743240d164fd.zip | |
pause, resume and abort running builds cmd (#421)
- Feature: `zen builds pause`, `zen builds resume` and `zen builds abort` commands to control a running `zen builds` command
- `--process-id` the process id to control, if omitted it tries to find a running process using the same executable as itself
- Improvement: Process report now indicates if it is pausing or aborting
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/builds_cmd.cpp | 437 | ||||
| -rw-r--r-- | src/zen/cmds/builds_cmd.h | 27 | ||||
| -rw-r--r-- | src/zen/cmds/up_cmd.cpp | 2 | ||||
| -rw-r--r-- | src/zen/cmds/wipe_cmd.cpp | 12 | ||||
| -rw-r--r-- | src/zen/zen.cpp | 43 | ||||
| -rw-r--r-- | src/zen/zen.h | 21 | ||||
| -rw-r--r-- | src/zencore/filesystem.cpp | 230 | ||||
| -rw-r--r-- | src/zencore/include/zencore/filesystem.h | 10 | ||||
| -rw-r--r-- | src/zencore/include/zencore/process.h | 2 | ||||
| -rw-r--r-- | src/zencore/process.cpp | 64 | ||||
| -rw-r--r-- | src/zenserver/cache/httpstructuredcache.cpp | 7 | ||||
| -rw-r--r-- | src/zenserver/projectstore/projectstore.cpp | 9 | ||||
| -rw-r--r-- | src/zenstore/buildstore/buildstore.cpp | 3 | ||||
| -rw-r--r-- | src/zenstore/cache/cachedisklayer.cpp | 12 | ||||
| -rw-r--r-- | src/zenstore/compactcas.cpp | 3 | ||||
| -rw-r--r-- | src/zenstore/filecas.cpp | 3 | ||||
| -rw-r--r-- | src/zenutil/chunkedcontent.cpp | 24 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/chunkedcontent.h | 17 | ||||
| -rw-r--r-- | src/zenutil/include/zenutil/parallelwork.h | 13 | ||||
| -rw-r--r-- | src/zenutil/parallelwork.cpp | 19 | ||||
| -rw-r--r-- | src/zenutil/zenserverprocess.cpp | 2 |
21 files changed, 799 insertions, 161 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index b04575009..49b032ab1 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -14,6 +14,7 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/session.h> #include <zencore/stream.h> #include <zencore/string.h> #include <zencore/trace.h> @@ -59,23 +60,205 @@ ZEN_THIRD_PARTY_INCLUDES_END #define ZEN_CLOUD_STORAGE "Cloud Storage" namespace zen { + +using namespace std::literals; + namespace { + namespace zenutil { +#if ZEN_PLATFORM_WINDOWS + class SecurityAttributes + { + public: + inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; } + + protected: + SECURITY_ATTRIBUTES m_Attributes{}; + SECURITY_DESCRIPTOR m_Sd{}; + }; + + // Security attributes which allows any user access + + class AnyUserSecurityAttributes : public SecurityAttributes + { + public: + AnyUserSecurityAttributes() + { + m_Attributes.nLength = sizeof m_Attributes; + m_Attributes.bInheritHandle = false; // Disable inheritance + + const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION); + + if (Success) + { + if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE)) + { + ThrowLastError("SetSecurityDescriptorDacl failed"); + } + + m_Attributes.lpSecurityDescriptor = &m_Sd; + } + } + }; +#endif // ZEN_PLATFORM_WINDOWS + + } // namespace zenutil + static std::atomic<bool> AbortFlag = false; - static void SignalCallbackHandler(int SigNum) + static std::atomic<bool> PauseFlag = false; + + static void SignalCallbackHandler(int SigNum) { if (SigNum == SIGINT) { + PauseFlag = false; AbortFlag = true; } #if ZEN_PLATFORM_WINDOWS if (SigNum == SIGBREAK) { + PauseFlag = false; AbortFlag = true; } #endif // ZEN_PLATFORM_WINDOWS } - using namespace std::literals; + struct ZenStateSharedData + { + static constexpr uint64_t kMagicV1 = 0x3176646d636e657a; // zencmdv1 + + uint64_t Magic = 0; // Implies the size and layout of this struct - changes to the data requires change to Magic constant + std::atomic<uint32_t> Pid; + uint8_t SessionId[12]; + uint8_t Padding1[4]; + std::atomic<uint8_t> Abort; + std::atomic<uint8_t> Pause; + uint8_t Padding2[2]; + }; + + struct MemMap + { + void* Handle = nullptr; + void* Data = nullptr; + size_t Size = 0; + std::string Name; + }; + + class ZenState + { + public: + ZenState(const ZenState&) = delete; + ZenState& operator=(const ZenState&) = delete; + + ZenState(); + explicit ZenState(uint32_t Pid); + ~ZenState(); + + const ZenStateSharedData& StateData() const + { + ZEN_ASSERT(m_Data); + return *m_Data; + } + ZenStateSharedData& StateData() + { + ZEN_ASSERT(m_Data); + return *m_Data; + } + + private: + static constexpr std::string_view MapBaseName = "UnrealEngineZenCmd_"sv; + static constexpr size_t MapSize = sizeof(ZenStateSharedData); + + bool m_Created = false; + std::unique_ptr<SharedMemory> m_MemMap; + ZenStateSharedData* m_Data = nullptr; + + std::thread m_StateMonitor; + Event m_ExitStateMonitorEvent; + }; + + ZenState::ZenState(uint32_t Pid) + { + const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid); + + if (!IsProcessRunning(Pid)) + { + throw std::runtime_error(fmt::format("The process {} is not running", Pid)); + } + std::unique_ptr<SharedMemory> MemMap = OpenSharedMemory(ZenStateMapName, MapSize, false); + if (!MemMap) + { + throw std::runtime_error(fmt::format("The process {} is not a running zen process", Pid)); + } + ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData(); + if (uint64_t MemMagic = data->Magic; MemMagic != ZenStateSharedData::kMagicV1) + { + throw std::runtime_error(fmt::format("The mem map for process {} has an unsupported magic {:x}, expected {:x}", + Pid, + MemMagic, + ZenStateSharedData::kMagicV1)); + } + if (uint32_t MemPid = data->Pid.load(); MemPid != Pid) + { + throw std::runtime_error( + fmt::format("The mem map for process {} has an missmatching pid of {}, expected {}", Pid, MemPid, Pid)); + } + m_MemMap = std::move(MemMap); + m_Data = data; + } + + ZenState::ZenState() + { + int Pid = GetCurrentProcessId(); + + const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid); + + std::unique_ptr<SharedMemory> MemMap = CreateSharedMemory(ZenStateMapName, MapSize, false); + if (!MemMap) + { + throw std::runtime_error(fmt::format("The mem map for process {} could not be created", Pid)); + } + + ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData(); + memset(data, 0, sizeof(ZenStateSharedData)); + data->Magic = ZenStateSharedData::kMagicV1; + data->Pid.store(gsl::narrow<uint32_t>(Pid)); + data->SessionId; + data->Abort.store(false); + data->Pause.store(false); + const Oid SessionId = GetSessionId(); + memcpy(data->SessionId, &SessionId, sizeof SessionId); + + m_MemMap = std::move(MemMap); + m_Data = data; + + m_StateMonitor = std::thread([this]() { + while (!m_ExitStateMonitorEvent.Wait(500)) + { + if (m_Data->Abort.load()) + { + AbortFlag.store(true); + } + PauseFlag.store(m_Data->Pause.load()); + } + }); + } + + ZenState::~ZenState() + { + try + { + if (m_StateMonitor.joinable()) + { + m_ExitStateMonitorEvent.Set(); + m_StateMonitor.join(); + } + m_MemMap.reset(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("ZenState::~ZenState threw exception: {}", Ex.what()); + } + } static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u; static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u; @@ -435,7 +618,7 @@ namespace { std::atomic<uint64_t> DiscoveredItemCount = 0; std::atomic<uint64_t> DeletedItemCount = 0; std::atomic<uint64_t> DeletedByteCount = 0; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); struct AsyncVisitor : public GetDirectoryContentVisitor { @@ -549,8 +732,8 @@ namespace { uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs(); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); LastUpdateTimeMs = Timer.GetElapsedTimeMs(); uint64_t Deleted = DeletedItemCount.load(); @@ -559,7 +742,8 @@ namespace { Progress.UpdateState({.Task = "Cleaning folder ", .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), .TotalCount = Discovered, - .RemainingCount = Discovered - Deleted}, + .RemainingCount = Discovered - Deleted, + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); @@ -598,12 +782,17 @@ namespace { Progress.UpdateState({.Task = "Cleaning folder ", .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), .TotalCount = Discovered, - .RemainingCount = Discovered - Deleted}, + .RemainingCount = Discovered - Deleted, + .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)}, false); } } Progress.Finish(); + if (AbortFlag) + { + return false; + } uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); if (ElapsedTimeMs >= 200) @@ -1954,7 +2143,7 @@ namespace { WorkerThreadPool& NetworkPool = GetNetworkPool(); WorkerThreadPool& VerifyPool = GetIOWorkerPool(); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); const std::filesystem::path TempFolder = ".zen-tmp"; @@ -2117,8 +2306,8 @@ namespace { }); } - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount; const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount; @@ -2141,7 +2330,8 @@ namespace { .Details = Details, .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - - (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))}, + (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load())), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); @@ -2397,7 +2587,7 @@ namespace { FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0; @@ -2567,8 +2757,8 @@ namespace { }); } - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load()); @@ -2587,7 +2777,8 @@ namespace { {.Task = "Generating blocks", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(NewBlockCount), - .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load())}, + .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); @@ -2626,7 +2817,7 @@ namespace { FilteredRate FilteredCompressedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); std::atomic<size_t> UploadedBlockSize = 0; std::atomic<size_t> UploadedBlockCount = 0; @@ -3006,8 +3197,8 @@ namespace { }); } - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load()); FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); @@ -3035,13 +3226,15 @@ namespace { ProgressBar.UpdateState({.Task = "Uploading blobs ", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(TotalRawSize), - .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize)}, + .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); ProgressBar.Finish(); + UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); } @@ -3493,7 +3686,7 @@ namespace { Content, *ChunkController, GetUpdateDelayMS(ProgressMode), - [&](bool, std::ptrdiff_t) { + [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), @@ -3506,16 +3699,18 @@ namespace { ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = TotalRawSize, - .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load()}, + .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load(), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, - AbortFlag); + AbortFlag, + PauseFlag); + FilteredBytesHashed.Stop(); + ProgressBar.Finish(); if (AbortFlag) { return; } - FilteredBytesHashed.Stop(); - ProgressBar.Finish(); } ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", @@ -4266,7 +4461,7 @@ namespace { WorkerThreadPool& VerifyPool = GetIOWorkerPool(); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size()); @@ -4414,8 +4609,8 @@ namespace { }); } - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}", VerifyFolderStats.FilesVerified.load(), PathCount, @@ -4424,12 +4619,18 @@ namespace { ProgressBar.UpdateState({.Task = "Verifying files ", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(PathCount), - .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load())}, + .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load()), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs(); ProgressBar.Finish(); + if (AbortFlag) + { + return; + } + for (const std::string& Error : Errors) { ZEN_CONSOLE("{}", Error); @@ -5361,7 +5562,7 @@ namespace { Stopwatch Timer; auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); }); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); std::atomic<uint64_t> CompletedPathCount = 0; uint32_t PathIndex = 0; @@ -5392,7 +5593,7 @@ namespace { }); PathIndex += PathRangeCount; } - Work.Wait(200, [&](bool, ptrdiff_t) { + Work.Wait(200, [&](bool, bool, ptrdiff_t) { if (ProgressCallback) { ProgressCallback(PathCount, CompletedPathCount.load()); @@ -5670,7 +5871,7 @@ namespace { ScavengedPaths.resize(ScavengePathCount); ProgressBar ScavengeProgressBar(ProgressMode, "Scavenging"); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); std::atomic<uint64_t> PathsFound(0); std::atomic<uint64_t> ChunksFound(0); @@ -5800,8 +6001,8 @@ namespace { { ZEN_TRACE_CPU("ScavengeScan_Wait"); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavanging", PathsScavenged.load(), ScavengePathCount, @@ -5810,11 +6011,17 @@ namespace { ScavengeProgressBar.UpdateState({.Task = "Scavenging ", .Details = Details, .TotalCount = ScavengePathCount, - .RemainingCount = ScavengePathCount - PathsScavenged.load()}, + .RemainingCount = ScavengePathCount - PathsScavenged.load(), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } + ScavengeProgressBar.Finish(); + if (AbortFlag) + { + return; + } for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty()); @@ -6130,7 +6337,7 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing"); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); struct LooseChunkHashWorkData { @@ -7583,8 +7790,8 @@ namespace { { ZEN_TRACE_CPU("WriteChunks_Wait"); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + +DownloadStats.DownloadedPartialBlockByteCount.load(); @@ -7610,7 +7817,8 @@ namespace { .Details = Details, .TotalCount = PrimeCacheOnly ? TotalRequestCount : BytesToWrite, .RemainingCount = PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load()) - : (BytesToWrite - DiskStats.WriteByteCount.load())}, + : (BytesToWrite - DiskStats.WriteByteCount.load()), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } @@ -7618,13 +7826,12 @@ namespace { FilteredWrittenBytesPerSecond.Stop(); FilteredDownloadedBytesPerSecond.Stop(); + WriteProgressBar.Finish(); if (AbortFlag) { return; } - WriteProgressBar.Finish(); - if (!PrimeCacheOnly) { uint32_t RawSequencesMissingWriteCount = 0; @@ -7771,7 +7978,7 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data"); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t LocalPathIndex : FilesToCache) { @@ -7800,26 +8007,26 @@ namespace { { ZEN_TRACE_CPU("CacheLocal_Wait"); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); const uint64_t WorkTotal = FilesToCache.size(); const uint64_t WorkComplete = CachedCount.load(); std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount)); CacheLocalProgressBar.UpdateState({.Task = "Caching local ", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)}, + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } + CacheLocalProgressBar.Finish(); if (AbortFlag) { return; } - CacheLocalProgressBar.Finish(); - ZEN_DEBUG( "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " "Delete: {}", @@ -7858,7 +8065,7 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State"); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); OutLocalFolderState.Paths.resize(RemoteContent.Paths.size()); OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size()); @@ -8109,26 +8316,21 @@ namespace { { ZEN_TRACE_CPU("FinalizeTree_Wait"); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size(); const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)}, + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); - - if (AbortFlag) - { - return; - } - RebuildProgressBar.Finish(); } } @@ -8690,10 +8892,15 @@ namespace { ProgressBar.UpdateState({.Task = "Checking files ", .Details = Details, .TotalCount = PathCount, - .RemainingCount = PathCount - CompletedPathCount}, + .RemainingCount = PathCount - CompletedPathCount, + .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)}, false); }); ProgressBar.Finish(); + if (AbortFlag) + { + return {}; + } } bool ScanContent = true; @@ -8731,7 +8938,7 @@ namespace { UpdatedContent, ChunkController, GetUpdateDelayMS(ProgressMode), - [&](bool, std::ptrdiff_t) { + [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), @@ -8744,16 +8951,19 @@ namespace { ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = ByteCountToScan, - .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()}, + .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load(), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, - AbortFlag); + AbortFlag, + PauseFlag); + + FilteredBytesHashed.Stop(); + ProgressBar.Finish(); if (AbortFlag) { return {}; } - FilteredBytesHashed.Stop(); - ProgressBar.Finish(); LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}}); } } @@ -8799,7 +9009,7 @@ namespace { OutLocalFolderContent, ChunkController, GetUpdateDelayMS(ProgressMode), - [&](bool, std::ptrdiff_t) { + [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), @@ -8812,18 +9022,19 @@ namespace { ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = ByteCountToScan, - .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load())}, + .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load()), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, - AbortFlag); + AbortFlag, + PauseFlag); + FilteredBytesHashed.Stop(); + ProgressBar.Finish(); if (AbortFlag) { return {}; } - - FilteredBytesHashed.Stop(); - ProgressBar.Finish(); } return LocalContent; } @@ -9740,6 +9951,21 @@ BuildsCommand::BuildsCommand() m_FetchBlobOptions.parse_positional({"build-id", "blob-hash"}); m_FetchBlobOptions.positional_help("build-id blob-hash"); + auto AddZenProcessOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("", "", "process-id", "Process id of running process", cxxopts::value(m_ZenProcessId), "<pid>"); + }; + AddZenProcessOptions(m_PauseOptions); + m_PauseOptions.parse_positional({"process-id"}); + m_PauseOptions.positional_help("process-id"); + + AddZenProcessOptions(m_ResumeOptions); + m_ResumeOptions.parse_positional({"process-id"}); + m_ResumeOptions.positional_help("process-id"); + + AddZenProcessOptions(m_AbortOptions); + m_AbortOptions.parse_positional({"process-id"}); + m_AbortOptions.positional_help("process-id"); + AddSystemOptions(m_ValidateBuildPartOptions); AddCloudOptions(m_ValidateBuildPartOptions); AddFileOptions(m_ValidateBuildPartOptions); @@ -10458,7 +10684,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { if (!m_ListResultPath.empty()) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", + GetRunningExecutablePath(), + ZEN_CFG_VERSION_BUILD_STRING_FULL, + GetCurrentProcessId()); } BuildStorage::Statistics StorageStats; @@ -10513,7 +10742,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (!m_ListResultPath.empty()) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", + GetRunningExecutablePath(), + ZEN_CFG_VERSION_BUILD_STRING_FULL, + GetCurrentProcessId()); } CbObject QueryObject; if (m_ListQueryPath.empty()) @@ -10592,7 +10824,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_UploadOptions) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId()); + + ZenState InstanceState; ParsePath(); @@ -10679,7 +10913,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_DownloadOptions) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId()); + + ZenState InstanceState; ParsePath(); @@ -10743,7 +10979,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_FetchBlobOptions) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId()); + BuildStorage::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; @@ -10785,7 +11022,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_ValidateBuildPartOptions) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId()); + + ZenState InstanceState; BuildStorage::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; @@ -10877,6 +11116,50 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } + auto ParseZenProcessId = [&]() { + if (m_ZenProcessId == -1) + { + const std::filesystem::path RunningExecutablePath = GetRunningExecutablePath(); + ProcessHandle RunningProcess; + std::error_code Ec = FindProcess(RunningExecutablePath, RunningProcess, /*IncludeSelf*/ false); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed finding process running '{}', reason: '{}'", RunningExecutablePath, Ec.message())); + } + if (!RunningProcess.IsValid()) + { + throw zen::OptionParseException( + fmt::format("Unable to find a running instance of the zen executable '{}'", RunningExecutablePath)); + } + m_ZenProcessId = RunningProcess.Pid(); + } + }; + + if (SubOption == &m_PauseOptions) + { + ParseZenProcessId(); + ZenState RunningState(m_ZenProcessId); + RunningState.StateData().Pause.store(true); + return 0; + } + + if (SubOption == &m_ResumeOptions) + { + ParseZenProcessId(); + ZenState RunningState(m_ZenProcessId); + RunningState.StateData().Pause.store(false); + return 0; + } + + if (SubOption == &m_AbortOptions) + { + ParseZenProcessId(); + ZenState RunningState(m_ZenProcessId); + RunningState.StateData().Abort.store(true); + return 0; + } + if (SubOption == &m_TestOptions) { m_SystemRootDir = (GetRunningExecutablePath().parent_path() / ".tmpzensystem").make_preferred(); @@ -11046,7 +11329,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return true; }; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); uint32_t Randomizer = 0; auto FileSizeIt = DownloadContent.FileSizes.begin(); @@ -11104,8 +11387,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } FileSizeIt++; } - Work.Wait(5000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); + Work.Wait(5000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork); }); ZEN_ASSERT(!AbortFlag.load()); diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index 378810155..b3466c0d3 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -114,6 +114,12 @@ private: cxxopts::Options m_FetchBlobOptions{"fetch-blob", "Fetch a blob from remote store"}; std::string m_BlobHash; + cxxopts::Options m_PauseOptions{"pause", "Pause an ongoing zen builds process"}; + cxxopts::Options m_ResumeOptions{"resume", "Resume a paused zen builds process"}; + cxxopts::Options m_AbortOptions{"abort", "Abort an ongoing zen builds process"}; + + int m_ZenProcessId = -1; + cxxopts::Options m_ValidateBuildPartOptions{"validate-part", "Fetch a build part and validate all referenced attachments"}; cxxopts::Options m_TestOptions{"test", "Test upload and download with verify"}; @@ -121,15 +127,18 @@ private: cxxopts::Options m_MultiTestDownloadOptions{"multi-test-download", "Test multiple sequenced downloads with verify"}; std::vector<std::string> m_BuildIds; - cxxopts::Options* m_SubCommands[9] = {&m_ListNamespacesOptions, - &m_ListOptions, - &m_UploadOptions, - &m_DownloadOptions, - &m_DiffOptions, - &m_FetchBlobOptions, - &m_ValidateBuildPartOptions, - &m_TestOptions, - &m_MultiTestDownloadOptions}; + cxxopts::Options* m_SubCommands[12] = {&m_ListNamespacesOptions, + &m_ListOptions, + &m_UploadOptions, + &m_DownloadOptions, + &m_PauseOptions, + &m_ResumeOptions, + &m_AbortOptions, + &m_DiffOptions, + &m_FetchBlobOptions, + &m_ValidateBuildPartOptions, + &m_TestOptions, + &m_MultiTestDownloadOptions}; }; } // namespace zen diff --git a/src/zen/cmds/up_cmd.cpp b/src/zen/cmds/up_cmd.cpp index f3bf2f66d..d0763701e 100644 --- a/src/zen/cmds/up_cmd.cpp +++ b/src/zen/cmds/up_cmd.cpp @@ -311,7 +311,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) // Try to find the running executable by path name std::filesystem::path ServerExePath = m_ProgramBaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL; ProcessHandle RunningProcess; - if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec) + if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec, /*IncludeSelf*/ false) { ZEN_WARN("attempting hard terminate of zen process with pid ({})", RunningProcess.Pid()); try diff --git a/src/zen/cmds/wipe_cmd.cpp b/src/zen/cmds/wipe_cmd.cpp index 9dfdca0a1..fcc18df2b 100644 --- a/src/zen/cmds/wipe_cmd.cpp +++ b/src/zen/cmds/wipe_cmd.cpp @@ -33,6 +33,7 @@ namespace zen { namespace { static std::atomic<bool> AbortFlag = false; + static std::atomic<bool> PauseFlag = false; static bool IsVerbose = false; static bool Quiet = false; static ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty; @@ -72,11 +73,13 @@ namespace { { if (SigNum == SIGINT) { + PauseFlag = false; AbortFlag = true; } #if ZEN_PLATFORM_WINDOWS if (SigNum == SIGBREAK) { + PauseFlag = false; AbortFlag = true; } #endif // ZEN_PLATFORM_WINDOWS @@ -248,7 +251,7 @@ namespace { return Added; }; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); struct AsyncVisitor : public GetDirectoryContentVisitor { @@ -423,12 +426,12 @@ namespace { GetIOWorkerPool(), Work.PendingWork()); - Work.Wait(ProgressMode == ProgressBar::Mode::Pretty ? 200 : 5000, [&](bool IsAborted, ptrdiff_t PendingWork) { + Work.Wait(ProgressMode == ProgressBar::Mode::Pretty ? 200 : 5000, [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); if (Quiet) { return; } - ZEN_UNUSED(IsAborted, PendingWork); LastUpdateTimeMs = Timer.GetElapsedTimeMs(); uint64_t Deleted = DeletedItemCount.load(); @@ -437,7 +440,8 @@ namespace { Progress.UpdateState({.Task = "Removing files ", .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), .TotalCount = Discovered, - .RemainingCount = Discovered - Deleted}, + .RemainingCount = Discovered - Deleted, + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 2dc0fa98c..119215d40 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -376,7 +376,8 @@ ProgressBar::SetLogOperationProgress(Mode InMode, uint32_t StepIndex, uint32_t S ProgressBar::ProgressBar(Mode InMode, std::string_view InSubTask) : m_Mode((!IsStdoutTty() && InMode == Mode::Pretty) ? Mode::Plain : InMode) -, m_LastUpdateMS(m_SW.GetElapsedTimeMs() - 10000) +, m_LastUpdateMS((uint64_t)-1) +, m_PausedMS(0) , m_SubTask(InSubTask) { if (!m_SubTask.empty() && InMode == Mode::Log) @@ -413,20 +414,46 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) } uint64_t ElapsedTimeMS = m_SW.GetElapsedTimeMs(); - if (!DoLinebreak && (NewState.Task == m_State.Task) && ((m_LastUpdateMS + 200) > ElapsedTimeMS)) + if (m_LastUpdateMS != (uint64_t)-1) { - return; + if (!DoLinebreak && (NewState.Status == m_State.Status) && (NewState.Task == m_State.Task) && + ((m_LastUpdateMS + 200) > ElapsedTimeMS)) + { + return; + } + if (m_State.Status == State::EStatus::Paused) + { + uint64_t ElapsedSinceLast = ElapsedTimeMS - m_LastUpdateMS; + m_PausedMS += ElapsedSinceLast; + } } m_LastUpdateMS = ElapsedTimeMS; + std::string Task = NewState.Task; + switch (NewState.Status) + { + case State::EStatus::Aborted: + Task = "Aborting"; + break; + case State::EStatus::Paused: + Task = "Paused"; + break; + default: + break; + } + if (NewState.Task.length() > Task.length()) + { + Task += std::string(NewState.Task.length() - Task.length(), ' '); + } + const size_t PercentDone = NewState.TotalCount > 0u ? gsl::narrow<uint8_t>((100 * (NewState.TotalCount - NewState.RemainingCount)) / NewState.TotalCount) : 0u; if (m_Mode == Mode::Plain) { const std::string Details = (!NewState.Details.empty()) ? fmt::format(": {}", NewState.Details) : ""; - const std::string Output = fmt::format("{} {}% ({}){}\n", NewState.Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details); + const std::string Output = fmt::format("{} {}% ({}){}\n", Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details); OutputToConsoleRaw(Output); } else if (m_Mode == Mode::Pretty) @@ -435,12 +462,12 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) size_t ProgressBarCount = (ProgressBarSize * PercentDone) / 100; uint64_t Completed = NewState.TotalCount - NewState.RemainingCount; - uint64_t ETAMS = (PercentDone > 5) ? (ElapsedTimeMS * NewState.RemainingCount) / Completed : 0; + uint64_t ETAElapsedMS = ElapsedTimeMS -= m_PausedMS; + uint64_t ETAMS = + (NewState.Status == State::EStatus::Running) && (PercentDone > 5) ? (ETAElapsedMS * NewState.RemainingCount) / Completed : 0; uint32_t ConsoleColumns = GetConsoleColumns(); - std::string_view TaskString = NewState.Task; - const std::string PercentString = fmt::format("{:#3}%", PercentDone); const std::string ProgressBarString = @@ -454,7 +481,7 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) ExtendableStringBuilder<256> OutputBuilder; - OutputBuilder << "\r" << TaskString << PercentString; + OutputBuilder << "\r" << Task << PercentString; if (OutputBuilder.Size() + 1 < ConsoleColumns) { size_t RemainingSpace = ConsoleColumns - (OutputBuilder.Size() + 1); diff --git a/src/zen/zen.h b/src/zen/zen.h index 9ca228e37..995bd13b6 100644 --- a/src/zen/zen.h +++ b/src/zen/zen.h @@ -79,6 +79,26 @@ public: std::string Details; uint64_t TotalCount = 0; uint64_t RemainingCount = 0; + enum class EStatus + { + Running, + Aborted, + Paused + }; + EStatus Status = EStatus::Running; + + static EStatus CalculateStatus(bool IsAborted, bool IsPaused) + { + if (IsAborted) + { + return EStatus::Aborted; + } + if (IsPaused) + { + return EStatus::Paused; + } + return EStatus::Running; + } }; enum class Mode @@ -104,6 +124,7 @@ private: const Mode m_Mode; Stopwatch m_SW; uint64_t m_LastUpdateMS; + uint64_t m_PausedMS; State m_State; const std::string m_SubTask; size_t m_LastOutputLength = 0; diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index c4264bc29..46337ffc8 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -33,6 +33,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <dirent.h> # include <fcntl.h> # include <sys/resource.h> +# include <sys/mman.h> # include <sys/stat.h> # include <pwd.h> # include <unistd.h> @@ -43,6 +44,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <fcntl.h> # include <libproc.h> # include <sys/resource.h> +# include <sys/mman.h> # include <sys/stat.h> # include <sys/syslimits.h> # include <pwd.h> @@ -2824,6 +2826,218 @@ SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly) return Result; } +class SharedMemoryImpl : public SharedMemory +{ +public: + struct Data + { + void* Handle = nullptr; + void* DataPtr = nullptr; + size_t Size = 0; + std::string Name; + }; + + static Data Open(std::string_view Name, size_t Size, bool SystemGlobal) + { +#if ZEN_PLATFORM_WINDOWS + std::wstring InstanceMapName = Utf8ToWide(fmt::format("{}\\{}", SystemGlobal ? "Global" : "Local", Name)); + + HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, InstanceMapName.c_str()); + if (hMap == NULL) + { + return {}; + } + void* pBuf = MapViewOfFile(hMap, // handle to map object + FILE_MAP_ALL_ACCESS, // read/write permission + 0, // offset high + 0, // offset low + DWORD(Size)); // size + + if (pBuf == NULL) + { + CloseHandle(hMap); + } + return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)}; +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + ZEN_UNUSED(SystemGlobal); + std::string InstanceMapName = fmt::format("/{}", Name); + + int Fd = shm_open(InstanceMapName.c_str(), O_RDWR, 0666); + if (Fd < 0) + { + return {}; + } + void* hMap = (void*)intptr_t(Fd); + + struct stat Stat; + fstat(Fd, &Stat); + + if (size_t(Stat.st_size) < Size) + { + close(Fd); + return {}; + } + + void* pBuf = mmap(nullptr, Size, PROT_READ | PROT_WRITE, MAP_SHARED, Fd, 0); + if (pBuf == MAP_FAILED) + { + close(Fd); + return {}; + } + return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)}; +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + } + + static Data Create(std::string_view Name, size_t Size, bool SystemGlobal) + { +#if ZEN_PLATFORM_WINDOWS + std::wstring InstanceMapName = Utf8ToWide(fmt::format("{}\\{}", SystemGlobal ? "Global" : "Local", Name)); + + SECURITY_ATTRIBUTES m_Attributes{}; + SECURITY_DESCRIPTOR m_Sd{}; + + m_Attributes.nLength = sizeof m_Attributes; + m_Attributes.bInheritHandle = false; // Disable inheritance + + const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION); + + if (Success) + { + if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE)) + { + ThrowLastError("SetSecurityDescriptorDacl failed"); + } + + m_Attributes.lpSecurityDescriptor = &m_Sd; + } + + HANDLE hMap = CreateFileMapping(INVALID_HANDLE_VALUE, // use paging file + &m_Attributes, // allow anyone to access + PAGE_READWRITE, // read/write access + 0, // maximum object size (high-order DWORD) + DWORD(Size), // maximum object size (low-order DWORD) + InstanceMapName.c_str()); + if (hMap == NULL) + { + return {}; + } + void* pBuf = MapViewOfFile(hMap, // handle to map object + FILE_MAP_ALL_ACCESS, // read/write permission + 0, // offset high + 0, // offset low + DWORD(Size)); // size + + if (pBuf == NULL) + { + CloseHandle(hMap); + return {}; + } + return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)}; +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + ZEN_UNUSED(SystemGlobal); + std::string InstanceMapName = fmt::format("/{}", Name); + + int Fd = shm_open(InstanceMapName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666); + if (Fd < 0) + { + return {}; + } + fchmod(Fd, 0666); + void* hMap = (void*)intptr_t(Fd); + + int Result = ftruncate(Fd, Size); + ZEN_UNUSED(Result); + + void* pBuf = mmap(nullptr, Size, PROT_READ | PROT_WRITE, MAP_SHARED, Fd, 0); + if (pBuf == MAP_FAILED) + { + close(Fd); + return {}; + } + return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)}; +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + } + + static void Close(Data&& MemMap, bool Delete) + { +#if ZEN_PLATFORM_WINDOWS + ZEN_UNUSED(Delete); + if (MemMap.DataPtr != nullptr) + { + UnmapViewOfFile(MemMap.DataPtr); + MemMap.DataPtr = nullptr; + } + if (MemMap.Handle != nullptr) + { + CloseHandle(MemMap.Handle); + MemMap.Handle = nullptr; + } +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + if (MemMap.DataPtr != nullptr) + { + munmap(MemMap.DataPtr, MemMap.Size); + MemMap.DataPtr = nullptr; + } + + if (MemMap.Handle != nullptr) + { + int Fd = int(intptr_t(MemMap.Handle)); + close(Fd); + MemMap.Handle = nullptr; + } + if (Delete) + { + std::string InstanceMapName = fmt::format("/{}", MemMap.Name); + shm_unlink(InstanceMapName.c_str()); + } +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + } + + SharedMemoryImpl(Data&& MemMap, bool IsOwned) : m_MemMap(std::move(MemMap)), m_IsOwned(IsOwned) {} + virtual ~SharedMemoryImpl() + { + try + { + Close(std::move(m_MemMap), /*Delete*/ m_IsOwned); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("SharedMemoryImpl::~SharedMemoryImpl threw exception: {}", Ex.what()); + } + } + + virtual void* GetData() override { return m_MemMap.DataPtr; } + +private: + Data m_MemMap; + const bool m_IsOwned = false; +}; + +std::unique_ptr<SharedMemory> +OpenSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal) +{ + SharedMemoryImpl::Data MemMap = SharedMemoryImpl::Open(Name, Size, SystemGlobal); + if (MemMap.DataPtr) + { + return std::make_unique<SharedMemoryImpl>(std::move(MemMap), /*IsOwned*/ false); + } + return {}; +} + +std::unique_ptr<SharedMemory> +CreateSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal) +{ + SharedMemoryImpl::Data MemMap = SharedMemoryImpl::Create(Name, Size, SystemGlobal); + if (MemMap.DataPtr) + { + return std::make_unique<SharedMemoryImpl>(std::move(MemMap), /*IsOwned*/ true); + } + return {}; +} + ////////////////////////////////////////////////////////////////////////// // // Testing related code follows... @@ -3108,6 +3322,22 @@ TEST_CASE("RotateDirectories") } } +TEST_CASE("SharedMemory") +{ + CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, false)); + CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, true)); + + { + auto Mem0 = CreateSharedMemory("SharedMemoryTest0", 482, false); + CHECK(Mem0); + strcpy((char*)Mem0->GetData(), "this is the string we are looking for"); + auto Mem1 = OpenSharedMemory("SharedMemoryTest0", 482, false); + CHECK_EQ(std::string((char*)Mem0->GetData()), std::string((char*)Mem1->GetData())); + } + + CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, false)); +} + #endif } // namespace zen diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index dfd0eedc9..36d4d1b68 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -385,6 +385,16 @@ uint32_t MakeFileModeReadOnly(uint32_t FileMode, bool ReadOnly); bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly, std::error_code& Ec); bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly); +class SharedMemory +{ +public: + virtual ~SharedMemory() {} + virtual void* GetData() = 0; +}; + +std::unique_ptr<SharedMemory> OpenSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal); +std::unique_ptr<SharedMemory> CreateSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal); + ////////////////////////////////////////////////////////////////////////// void filesystem_forcelink(); // internal diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h index d1394cd9a..d3e1de703 100644 --- a/src/zencore/include/zencore/process.h +++ b/src/zencore/include/zencore/process.h @@ -98,7 +98,7 @@ ZENCORE_API int GetCurrentProcessId(); int GetProcessId(CreateProcResult ProcId); std::filesystem::path GetProcessExecutablePath(int Pid, std::error_code& OutEc); -std::error_code FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle); +std::error_code FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle, bool IncludeSelf = true); void process_forcelink(); // internal diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp index 48efc3f85..fcbe657cb 100644 --- a/src/zencore/process.cpp +++ b/src/zencore/process.cpp @@ -929,7 +929,7 @@ GetProcessExecutablePath(int Pid, std::error_code& OutEc) } std::error_code -FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle) +FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle, bool IncludeSelf) { #if ZEN_PLATFORM_WINDOWS HANDLE ProcessSnapshotHandle = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); @@ -939,13 +939,15 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand } auto _ = MakeGuard([&]() { CloseHandle(ProcessSnapshotHandle); }); + const DWORD ThisProcessId = ::GetCurrentProcessId(); + PROCESSENTRY32 Entry; Entry.dwSize = sizeof(PROCESSENTRY32); if (Process32First(ProcessSnapshotHandle, (LPPROCESSENTRY32)&Entry)) { do { - if (ExecutableImage.filename() == Entry.szExeFile) + if ((IncludeSelf || (Entry.th32ProcessID != ThisProcessId)) && (ExecutableImage.filename() == Entry.szExeFile)) { std::error_code Ec; std::filesystem::path EntryPath = GetProcessExecutablePath(Entry.th32ProcessID, Ec); @@ -970,6 +972,7 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand } } } while (::Process32Next(ProcessSnapshotHandle, (LPPROCESSENTRY32)&Entry)); + return {}; } return MakeErrorCodeFromLastError(); #endif // ZEN_PLATFORM_WINDOWS @@ -980,6 +983,8 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand struct kinfo_proc* Processes = nullptr; uint32_t ProcCount = 0; + const pid_t ThisProcessId = getpid(); + if (sysctl(Mib, 4, NULL, &BufferSize, NULL, 0) != -1 && BufferSize > 0) { struct kinfo_proc* Processes = (struct kinfo_proc*)malloc(BufferSize); @@ -990,36 +995,46 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand char Buffer[PROC_PIDPATHINFO_MAXSIZE]; for (uint32_t ProcIndex = 0; ProcIndex < ProcCount; ProcIndex++) { - pid_t Pid = Processes[ProcIndex].kp_proc.p_pid; - std::error_code Ec; - std::filesystem::path EntryPath = GetProcessExecutablePath(Pid, Ec); - if (!Ec) + pid_t Pid = Processes[ProcIndex].kp_proc.p_pid; + if (IncludeSelf || (Pid != ThisProcessId)) { - if (EntryPath == ExecutableImage) + std::error_code Ec; + std::filesystem::path EntryPath = GetProcessExecutablePath(Pid, Ec); + if (!Ec) { - if (Processes[ProcIndex].kp_proc.p_stat != SZOMB) + if (EntryPath == ExecutableImage) { - OutHandle.Initialize(Pid, Ec); - return Ec; + if (Processes[ProcIndex].kp_proc.p_stat != SZOMB) + { + OutHandle.Initialize(Pid, Ec); + return Ec; + } } } + Ec.clear(); } } + return {}; } } return MakeErrorCodeFromLastError(); #endif // ZEN_PLATFORM_MAC #if ZEN_PLATFORM_LINUX + const pid_t ThisProcessId = getpid(); + std::vector<uint32_t> RunningPids; DirectoryContent ProcList; GetDirectoryContent("/proc", DirectoryContentFlags::IncludeDirs, ProcList); for (const std::filesystem::path& EntryPath : ProcList.Directories) { std::string EntryName = EntryPath.stem(); - std::optional<uint32_t> Pid = ParseInt<uint32_t>(EntryName); - if (Pid.has_value()) + std::optional<uint32_t> PidMaybe = ParseInt<uint32_t>(EntryName); + if (PidMaybe.has_value()) { - RunningPids.push_back(Pid.value()); + if (pid_t Pid = PidMaybe.value(); IncludeSelf || (Pid != ThisProcessId)) + { + RunningPids.push_back(Pid); + } } } @@ -1042,6 +1057,7 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand } } } + Ec.clear(); } return {}; #endif // ZEN_PLATFORM_LINUX @@ -1065,10 +1081,24 @@ TEST_CASE("Process") TEST_CASE("FindProcess") { - ProcessHandle Process; - std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process); - CHECK(!Ec); - CHECK(Process.IsValid()); + { + ProcessHandle Process; + std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process, /*IncludeSelf*/ true); + CHECK(!Ec); + CHECK(Process.IsValid()); + } + { + ProcessHandle Process; + std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process, /*IncludeSelf*/ false); + CHECK(!Ec); + CHECK(!Process.IsValid()); + } + { + ProcessHandle Process; + std::error_code Ec = FindProcess("this/does\\not/exist\\123914921929412312312312asdad\\12134.no", Process, /*IncludeSelf*/ false); + CHECK(!Ec); + CHECK(!Process.IsValid()); + } } TEST_CASE("BuildArgV") diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index f7e63433b..9f2e826d6 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1588,7 +1588,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co Stopwatch Timer; auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { @@ -1638,8 +1639,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } }); } - Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); + Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); ZEN_INFO("Replayed {} of {} requests, elapsed {}", RequestCount - PendingWork, RequestCount, diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 3ec4373a2..a2e73380f 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1597,7 +1597,8 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo }; std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) { if (OptionalWorkerPool) @@ -2111,7 +2112,8 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, { std::atomic_bool Result = true; std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { @@ -3890,7 +3892,8 @@ ProjectStore::Flush() WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (const Ref<Project>& Project : Projects) { diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index 41c747e08..afb7e4bee 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -483,7 +483,8 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O if (!MetaLocations.empty()) { std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); m_MetadataBlockStore.IterateChunks( MetaLocations, diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index e973cee77..3f1f0e34a 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -3978,7 +3978,8 @@ ZenCacheDiskLayer::DiscoverBuckets() WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (auto& BucketPath : FoundBucketDirectories) { Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) { @@ -4141,7 +4142,8 @@ ZenCacheDiskLayer::Flush() { WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); try { for (auto& Bucket : Buckets) @@ -4164,8 +4166,10 @@ ZenCacheDiskLayer::Flush() { ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } - Work.Wait(1000, - [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); }); + Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) { + ZEN_UNUSED(IsAborted, IsPaused); + ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir); + }); } } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 75562176e..0c9302ec8 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -402,7 +402,8 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas std::atomic<bool> AsyncContinue = true; { std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); const bool Continue = m_BlockStore.IterateChunks( FoundChunkLocations, [this, diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 56979f267..539b5e95b 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -664,7 +664,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, }; std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { if (!AsyncContinue) diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index c7532e098..cd1bf7dd7 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -758,14 +758,15 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span } ChunkedFolderContent -ChunkFolderContent(ChunkingStatistics& Stats, - WorkerThreadPool& WorkerPool, - const std::filesystem::path& RootPath, - const FolderContent& Content, - const ChunkingController& InChunkingController, - int32_t UpdateIntervalMS, - std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, - std::atomic<bool>& AbortFlag) +ChunkFolderContent(ChunkingStatistics& Stats, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& RootPath, + const FolderContent& Content, + const ChunkingController& InChunkingController, + int32_t UpdateIntervalMS, + std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag) { ZEN_TRACE_CPU("ChunkFolderContent"); @@ -804,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, RwLock Lock; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t PathIndex : Order) { @@ -831,10 +832,9 @@ ChunkFolderContent(ChunkingStatistics& Stats, }); } - Work.Wait(UpdateIntervalMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); + Work.Wait(UpdateIntervalMS, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); - UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining()); + UpdateCallback(IsAborted, IsPaused, Work.PendingWork().Remaining()); }); } return Result; diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index 225b1a3a5..306a5d990 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -132,14 +132,15 @@ struct ChunkingStatistics uint64_t ElapsedWallTimeUS = 0; }; -ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, - WorkerThreadPool& WorkerPool, - const std::filesystem::path& RootPath, - const FolderContent& Content, - const ChunkingController& InChunkingController, - int32_t UpdateIntervalMS, - std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, - std::atomic<bool>& AbortFlag); +ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& RootPath, + const FolderContent& Content, + const ChunkingController& InChunkingController, + int32_t UpdateIntervalMS, + std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag); ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content); diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h index 08e730b28..d7e986551 100644 --- a/src/zenutil/include/zenutil/parallelwork.h +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -12,13 +12,13 @@ namespace zen { class ParallelWork { public: - ParallelWork(std::atomic<bool>& AbortFlag); + ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag); ~ParallelWork(); - typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; - typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; - typedef std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)> UpdateCallback; + typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; + typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; + typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback; void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {}) { @@ -28,6 +28,10 @@ public: WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { try { + while (m_PauseFlag && !m_AbortFlag) + { + Sleep(2000); + } Work(m_AbortFlag); } catch (...) @@ -59,6 +63,7 @@ private: void RethrowErrors(); std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; bool m_DispatchComplete = false; Latch m_PendingWork; diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index ecacc4b5a..67fc03c04 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -14,7 +14,10 @@ namespace zen { -ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) +ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) +: m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_PendingWork(1) { } @@ -59,7 +62,7 @@ ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) while (!m_PendingWork.Wait(UpdateIntervalMS)) { - UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); + UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining()); } RethrowErrors(); @@ -111,7 +114,8 @@ ParallelWork::RethrowErrors() TEST_CASE("parallellwork.nowork") { std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); Work.Wait(); } @@ -120,7 +124,8 @@ TEST_CASE("parallellwork.basic") WorkerThreadPool WorkerPool(2); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); @@ -133,7 +138,8 @@ TEST_CASE("parallellwork.throws_in_work") WorkerThreadPool WorkerPool(2); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 10; I++) { Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { @@ -158,7 +164,8 @@ TEST_CASE("parallellwork.throws_in_dispatch") try { std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index bfa0d3c49..a5b342cb0 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -1059,7 +1059,7 @@ ZenServerInstance::Terminate() const std::filesystem::path BaseDir = m_Env.ProgramBaseDir(); const std::filesystem::path Executable = BaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL; ProcessHandle RunningProcess; - std::error_code Ec = FindProcess(Executable, RunningProcess); + std::error_code Ec = FindProcess(Executable, RunningProcess, /*IncludeSelf*/ false); if (Ec) { throw std::system_error(Ec, fmt::format("failed to look up running server executable '{}'", Executable)); |