diff options
| author | Stefan Boberg <[email protected]> | 2025-06-11 15:19:16 +0200 |
|---|---|---|
| committer | Stefan Boberg <[email protected]> | 2025-06-11 15:19:16 +0200 |
| commit | 9a6f0b91802acb5fdfce30952fd7d3f49d55d51e (patch) | |
| tree | 1eaac7f01c808d65a636c931d2a0fa1df7a1d184 /src | |
| parent | Fixed accidental path copy (diff) | |
| parent | Merge branch 'main' into rpc-analyze (diff) | |
| download | zen-9a6f0b91802acb5fdfce30952fd7d3f49d55d51e.tar.xz zen-9a6f0b91802acb5fdfce30952fd7d3f49d55d51e.zip | |
Merge branch 'rpc-analyze' of https://github.ol.epicgames.net/ue-foundation/zen into rpc-analyze
Diffstat (limited to 'src')
34 files changed, 1608 insertions, 402 deletions
diff --git a/src/zen/cmds/builds_cmd.cpp b/src/zen/cmds/builds_cmd.cpp index fbcb6b900..abb4bfe0c 100644 --- a/src/zen/cmds/builds_cmd.cpp +++ b/src/zen/cmds/builds_cmd.cpp @@ -14,6 +14,7 @@ #include <zencore/fmtutils.h> #include <zencore/logging.h> #include <zencore/scopeguard.h> +#include <zencore/session.h> #include <zencore/stream.h> #include <zencore/string.h> #include <zencore/trace.h> @@ -37,6 +38,7 @@ #include <signal.h> #include <memory> +#include <regex> ZEN_THIRD_PARTY_INCLUDES_START #include <tsl/robin_map.h> @@ -58,23 +60,205 @@ ZEN_THIRD_PARTY_INCLUDES_END #define ZEN_CLOUD_STORAGE "Cloud Storage" namespace zen { + +using namespace std::literals; + namespace { + namespace zenutil { +#if ZEN_PLATFORM_WINDOWS + class SecurityAttributes + { + public: + inline SECURITY_ATTRIBUTES* Attributes() { return &m_Attributes; } + + protected: + SECURITY_ATTRIBUTES m_Attributes{}; + SECURITY_DESCRIPTOR m_Sd{}; + }; + + // Security attributes which allows any user access + + class AnyUserSecurityAttributes : public SecurityAttributes + { + public: + AnyUserSecurityAttributes() + { + m_Attributes.nLength = sizeof m_Attributes; + m_Attributes.bInheritHandle = false; // Disable inheritance + + const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION); + + if (Success) + { + if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE)) + { + ThrowLastError("SetSecurityDescriptorDacl failed"); + } + + m_Attributes.lpSecurityDescriptor = &m_Sd; + } + } + }; +#endif // ZEN_PLATFORM_WINDOWS + + } // namespace zenutil + static std::atomic<bool> AbortFlag = false; - static void SignalCallbackHandler(int SigNum) + static std::atomic<bool> PauseFlag = false; + + static void SignalCallbackHandler(int SigNum) { if (SigNum == SIGINT) { + PauseFlag = false; AbortFlag = true; } #if ZEN_PLATFORM_WINDOWS if (SigNum == SIGBREAK) { + PauseFlag = false; AbortFlag = true; } #endif // ZEN_PLATFORM_WINDOWS } - using namespace std::literals; + struct ZenStateSharedData + { + static constexpr uint64_t kMagicV1 = 0x3176646d636e657a; // zencmdv1 + + uint64_t Magic = 0; // Implies the size and layout of this struct - changes to the data requires change to Magic constant + std::atomic<uint32_t> Pid; + uint8_t SessionId[12]; + uint8_t Padding1[4]; + std::atomic<uint8_t> Abort; + std::atomic<uint8_t> Pause; + uint8_t Padding2[2]; + }; + + struct MemMap + { + void* Handle = nullptr; + void* Data = nullptr; + size_t Size = 0; + std::string Name; + }; + + class ZenState + { + public: + ZenState(const ZenState&) = delete; + ZenState& operator=(const ZenState&) = delete; + + ZenState(); + explicit ZenState(uint32_t Pid); + ~ZenState(); + + const ZenStateSharedData& StateData() const + { + ZEN_ASSERT(m_Data); + return *m_Data; + } + ZenStateSharedData& StateData() + { + ZEN_ASSERT(m_Data); + return *m_Data; + } + + private: + static constexpr std::string_view MapBaseName = "UnrealEngineZenCmd_"sv; + static constexpr size_t MapSize = sizeof(ZenStateSharedData); + + bool m_Created = false; + std::unique_ptr<SharedMemory> m_MemMap; + ZenStateSharedData* m_Data = nullptr; + + std::thread m_StateMonitor; + Event m_ExitStateMonitorEvent; + }; + + ZenState::ZenState(uint32_t Pid) + { + const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid); + + if (!IsProcessRunning(Pid)) + { + throw std::runtime_error(fmt::format("The process {} is not running", Pid)); + } + std::unique_ptr<SharedMemory> MemMap = OpenSharedMemory(ZenStateMapName, MapSize, false); + if (!MemMap) + { + throw std::runtime_error(fmt::format("The process {} is not a running zen process", Pid)); + } + ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData(); + if (uint64_t MemMagic = data->Magic; MemMagic != ZenStateSharedData::kMagicV1) + { + throw std::runtime_error(fmt::format("The mem map for process {} has an unsupported magic {:x}, expected {:x}", + Pid, + MemMagic, + ZenStateSharedData::kMagicV1)); + } + if (uint32_t MemPid = data->Pid.load(); MemPid != Pid) + { + throw std::runtime_error( + fmt::format("The mem map for process {} has an missmatching pid of {}, expected {}", Pid, MemPid, Pid)); + } + m_MemMap = std::move(MemMap); + m_Data = data; + } + + ZenState::ZenState() + { + int Pid = GetCurrentProcessId(); + + const std::string ZenStateMapName = fmt::format("{}{}", MapBaseName, Pid); + + std::unique_ptr<SharedMemory> MemMap = CreateSharedMemory(ZenStateMapName, MapSize, false); + if (!MemMap) + { + throw std::runtime_error(fmt::format("The mem map for process {} could not be created", Pid)); + } + + ZenStateSharedData* data = (ZenStateSharedData*)MemMap->GetData(); + memset(data, 0, sizeof(ZenStateSharedData)); + data->Magic = ZenStateSharedData::kMagicV1; + data->Pid.store(gsl::narrow<uint32_t>(Pid)); + data->SessionId; + data->Abort.store(false); + data->Pause.store(false); + const Oid SessionId = GetSessionId(); + memcpy(data->SessionId, &SessionId, sizeof SessionId); + + m_MemMap = std::move(MemMap); + m_Data = data; + + m_StateMonitor = std::thread([this]() { + while (!m_ExitStateMonitorEvent.Wait(500)) + { + if (m_Data->Abort.load()) + { + AbortFlag.store(true); + } + PauseFlag.store(m_Data->Pause.load()); + } + }); + } + + ZenState::~ZenState() + { + try + { + if (m_StateMonitor.joinable()) + { + m_ExitStateMonitorEvent.Set(); + m_StateMonitor.join(); + } + m_MemMap.reset(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("ZenState::~ZenState threw exception: {}", Ex.what()); + } + } static const size_t DefaultMaxBlockSize = 64u * 1024u * 1024u; static const size_t DefaultMaxChunkEmbedSize = 3u * 512u * 1024u; @@ -434,7 +618,7 @@ namespace { std::atomic<uint64_t> DiscoveredItemCount = 0; std::atomic<uint64_t> DeletedItemCount = 0; std::atomic<uint64_t> DeletedByteCount = 0; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); struct AsyncVisitor : public GetDirectoryContentVisitor { @@ -548,8 +732,8 @@ namespace { uint64_t LastUpdateTimeMs = Timer.GetElapsedTimeMs(); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); LastUpdateTimeMs = Timer.GetElapsedTimeMs(); uint64_t Deleted = DeletedItemCount.load(); @@ -558,7 +742,8 @@ namespace { Progress.UpdateState({.Task = "Cleaning folder ", .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), .TotalCount = Discovered, - .RemainingCount = Discovered - Deleted}, + .RemainingCount = Discovered - Deleted, + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); @@ -597,12 +782,17 @@ namespace { Progress.UpdateState({.Task = "Cleaning folder ", .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), .TotalCount = Discovered, - .RemainingCount = Discovered - Deleted}, + .RemainingCount = Discovered - Deleted, + .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)}, false); } } Progress.Finish(); + if (AbortFlag) + { + return false; + } uint64_t ElapsedTimeMs = Timer.GetElapsedTimeMs(); if (ElapsedTimeMs >= 200) @@ -1953,7 +2143,7 @@ namespace { WorkerThreadPool& NetworkPool = GetNetworkPool(); WorkerThreadPool& VerifyPool = GetIOWorkerPool(); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); const std::filesystem::path TempFolder = ".zen-tmp"; @@ -2116,8 +2306,8 @@ namespace { }); } - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); const uint64_t DownloadedAttachmentCount = DownloadStats.DownloadedChunkCount + DownloadStats.DownloadedBlockCount; const uint64_t DownloadedByteCount = DownloadStats.DownloadedChunkByteCount + DownloadStats.DownloadedBlockByteCount; @@ -2140,7 +2330,8 @@ namespace { .Details = Details, .TotalCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2), .RemainingCount = gsl::narrow<uint64_t>(AttachmentsToVerifyCount * 2 - - (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load()))}, + (DownloadedAttachmentCount + ValidateStats.VerifiedAttachmentCount.load())), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); @@ -2396,7 +2587,7 @@ namespace { FilteredRate FilteredGeneratedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); std::atomic<uint64_t> QueuedPendingBlocksForUpload = 0; @@ -2566,8 +2757,8 @@ namespace { }); } - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); FilteredGeneratedBytesPerSecond.Update(GenerateBlocksStats.GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadStats.BlocksBytes.load()); @@ -2586,7 +2777,8 @@ namespace { {.Task = "Generating blocks", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(NewBlockCount), - .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load())}, + .RemainingCount = gsl::narrow<uint64_t>(NewBlockCount - GenerateBlocksStats.GeneratedBlockCount.load()), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); @@ -2625,7 +2817,7 @@ namespace { FilteredRate FilteredCompressedBytesPerSecond; FilteredRate FilteredUploadedBytesPerSecond; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); std::atomic<size_t> UploadedBlockSize = 0; std::atomic<size_t> UploadedBlockCount = 0; @@ -3005,8 +3197,8 @@ namespace { }); } - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); FilteredCompressedBytesPerSecond.Update(LooseChunksStats.CompressedChunkRawBytes.load()); FilteredGenerateBlockBytesPerSecond.Update(GeneratedBlockByteCount.load()); FilteredUploadedBytesPerSecond.Update(UploadedCompressedChunkSize.load() + UploadedBlockSize.load()); @@ -3034,13 +3226,15 @@ namespace { ProgressBar.UpdateState({.Task = "Uploading blobs ", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(TotalRawSize), - .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize)}, + .RemainingCount = gsl::narrow<uint64_t>(TotalRawSize - UploadedRawSize), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); ZEN_ASSERT(AbortFlag || QueuedPendingInMemoryBlocksForUpload.load() == 0); ProgressBar.Finish(); + UploadStats.ElapsedWallTimeUS = FilteredUploadedBytesPerSecond.GetElapsedTimeUS(); LooseChunksStats.CompressChunksElapsedWallTimeUS = FilteredCompressedBytesPerSecond.GetElapsedTimeUS(); } @@ -3492,7 +3686,7 @@ namespace { Content, *ChunkController, GetUpdateDelayMS(ProgressMode), - [&](bool, std::ptrdiff_t) { + [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), @@ -3505,16 +3699,18 @@ namespace { ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = TotalRawSize, - .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load()}, + .RemainingCount = TotalRawSize - ChunkingStats.BytesHashed.load(), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, - AbortFlag); + AbortFlag, + PauseFlag); + FilteredBytesHashed.Stop(); + ProgressBar.Finish(); if (AbortFlag) { return; } - FilteredBytesHashed.Stop(); - ProgressBar.Finish(); } ZEN_CONSOLE("Found {} ({}) files divided into {} ({}) unique chunks in '{}' in {}. Average hash rate {}B/sec", @@ -4265,7 +4461,7 @@ namespace { WorkerThreadPool& VerifyPool = GetIOWorkerPool(); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); const uint32_t PathCount = gsl::narrow<uint32_t>(Content.Paths.size()); @@ -4413,8 +4609,8 @@ namespace { }); } - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); std::string Details = fmt::format("Verified {}/{} ({}). Failed files: {}", VerifyFolderStats.FilesVerified.load(), PathCount, @@ -4423,12 +4619,18 @@ namespace { ProgressBar.UpdateState({.Task = "Verifying files ", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(PathCount), - .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load())}, + .RemainingCount = gsl::narrow<uint64_t>(PathCount - VerifyFolderStats.FilesVerified.load()), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); VerifyFolderStats.VerifyElapsedWallTimeUs = Timer.GetElapsedTimeUs(); ProgressBar.Finish(); + if (AbortFlag) + { + return; + } + for (const std::string& Error : Errors) { ZEN_CONSOLE("{}", Error); @@ -5360,7 +5562,7 @@ namespace { Stopwatch Timer; auto _ = MakeGuard([&LocalFolderScanStats, &Timer]() { LocalFolderScanStats.ElapsedWallTimeUS = Timer.GetElapsedTimeUs(); }); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); std::atomic<uint64_t> CompletedPathCount = 0; uint32_t PathIndex = 0; @@ -5391,7 +5593,7 @@ namespace { }); PathIndex += PathRangeCount; } - Work.Wait(200, [&](bool, ptrdiff_t) { + Work.Wait(200, [&](bool, bool, ptrdiff_t) { if (ProgressCallback) { ProgressCallback(PathCount, CompletedPathCount.load()); @@ -5669,7 +5871,7 @@ namespace { ScavengedPaths.resize(ScavengePathCount); ProgressBar ScavengeProgressBar(ProgressMode, "Scavenging"); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); std::atomic<uint64_t> PathsFound(0); std::atomic<uint64_t> ChunksFound(0); @@ -5799,8 +6001,8 @@ namespace { { ZEN_TRACE_CPU("ScavengeScan_Wait"); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); std::string Details = fmt::format("{}/{} scanned. {} paths and {} chunks found for scavanging", PathsScavenged.load(), ScavengePathCount, @@ -5809,11 +6011,17 @@ namespace { ScavengeProgressBar.UpdateState({.Task = "Scavenging ", .Details = Details, .TotalCount = ScavengePathCount, - .RemainingCount = ScavengePathCount - PathsScavenged.load()}, + .RemainingCount = ScavengePathCount - PathsScavenged.load(), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } + ScavengeProgressBar.Finish(); + if (AbortFlag) + { + return; + } for (uint32_t ScavengedContentIndex = 0; ScavengedContentIndex < ScavengedContents.size() && (!SequenceIndexesLeftToFindToRemoteIndex.empty()); @@ -6129,7 +6337,7 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar WriteProgressBar(ProgressMode, PrimeCacheOnly ? "Downloading" : "Writing"); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); struct LooseChunkHashWorkData { @@ -7582,8 +7790,8 @@ namespace { { ZEN_TRACE_CPU("WriteChunks_Wait"); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); uint64_t DownloadedBytes = DownloadStats.DownloadedChunkByteCount.load() + DownloadStats.DownloadedBlockByteCount.load() + +DownloadStats.DownloadedPartialBlockByteCount.load(); @@ -7609,7 +7817,8 @@ namespace { .Details = Details, .TotalCount = PrimeCacheOnly ? TotalRequestCount : BytesToWrite, .RemainingCount = PrimeCacheOnly ? (TotalRequestCount - DownloadStats.RequestsCompleteCount.load()) - : (BytesToWrite - DiskStats.WriteByteCount.load())}, + : (BytesToWrite - DiskStats.WriteByteCount.load()), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } @@ -7617,13 +7826,12 @@ namespace { FilteredWrittenBytesPerSecond.Stop(); FilteredDownloadedBytesPerSecond.Stop(); + WriteProgressBar.Finish(); if (AbortFlag) { return; } - WriteProgressBar.Finish(); - if (!PrimeCacheOnly) { uint32_t RawSequencesMissingWriteCount = 0; @@ -7770,7 +7978,7 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar CacheLocalProgressBar(ProgressMode, "Cache Local Data"); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t LocalPathIndex : FilesToCache) { @@ -7799,26 +8007,26 @@ namespace { { ZEN_TRACE_CPU("CacheLocal_Wait"); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); const uint64_t WorkTotal = FilesToCache.size(); const uint64_t WorkComplete = CachedCount.load(); std::string Details = fmt::format("{}/{} ({}) files", WorkComplete, WorkTotal, NiceBytes(CachedByteCount)); CacheLocalProgressBar.UpdateState({.Task = "Caching local ", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)}, + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } + CacheLocalProgressBar.Finish(); if (AbortFlag) { return; } - CacheLocalProgressBar.Finish(); - ZEN_DEBUG( "Local state prep: Match: {}, PathMismatch: {}, HashMismatch: {}, Cached: {} ({}), Skipped: {}, " "Delete: {}", @@ -7857,7 +8065,7 @@ namespace { WorkerThreadPool& WritePool = GetIOWorkerPool(); ProgressBar RebuildProgressBar(ProgressMode, "Rebuild State"); - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); OutLocalFolderState.Paths.resize(RemoteContent.Paths.size()); OutLocalFolderState.RawSizes.resize(RemoteContent.Paths.size()); @@ -8108,26 +8316,21 @@ namespace { { ZEN_TRACE_CPU("FinalizeTree_Wait"); - Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted, PendingWork); + Work.Wait(GetUpdateDelayMS(ProgressMode), [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); const uint64_t WorkTotal = Targets.size() + RemoveLocalPathIndexes.size(); const uint64_t WorkComplete = TargetsComplete.load() + DeletedCount.load(); std::string Details = fmt::format("{}/{} files", WorkComplete, WorkTotal); RebuildProgressBar.UpdateState({.Task = "Rebuilding state ", .Details = Details, .TotalCount = gsl::narrow<uint64_t>(WorkTotal), - .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete)}, + .RemainingCount = gsl::narrow<uint64_t>(WorkTotal - WorkComplete), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); } RebuildFolderStateStats.FinalizeTreeElapsedWallTimeUs = Timer.GetElapsedTimeUs(); - - if (AbortFlag) - { - return; - } - RebuildProgressBar.Finish(); } } @@ -8689,10 +8892,15 @@ namespace { ProgressBar.UpdateState({.Task = "Checking files ", .Details = Details, .TotalCount = PathCount, - .RemainingCount = PathCount - CompletedPathCount}, + .RemainingCount = PathCount - CompletedPathCount, + .Status = ProgressBar::State::CalculateStatus(AbortFlag, PauseFlag)}, false); }); ProgressBar.Finish(); + if (AbortFlag) + { + return {}; + } } bool ScanContent = true; @@ -8730,7 +8938,7 @@ namespace { UpdatedContent, ChunkController, GetUpdateDelayMS(ProgressMode), - [&](bool, std::ptrdiff_t) { + [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), @@ -8743,16 +8951,19 @@ namespace { ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = ByteCountToScan, - .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load()}, + .RemainingCount = ByteCountToScan - ChunkingStats.BytesHashed.load(), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, - AbortFlag); + AbortFlag, + PauseFlag); + + FilteredBytesHashed.Stop(); + ProgressBar.Finish(); if (AbortFlag) { return {}; } - FilteredBytesHashed.Stop(); - ProgressBar.Finish(); LocalContent = MergeChunkedFolderContents(LocalContent, {{UpdatedLocalContent}}); } } @@ -8798,7 +9009,7 @@ namespace { OutLocalFolderContent, ChunkController, GetUpdateDelayMS(ProgressMode), - [&](bool, std::ptrdiff_t) { + [&](bool IsAborted, bool IsPaused, std::ptrdiff_t) { FilteredBytesHashed.Update(ChunkingStats.BytesHashed.load()); std::string Details = fmt::format("{}/{} ({}/{}, {}B/s) scanned, {} ({}) chunks found", ChunkingStats.FilesProcessed.load(), @@ -8811,18 +9022,19 @@ namespace { ProgressBar.UpdateState({.Task = "Scanning files ", .Details = Details, .TotalCount = ByteCountToScan, - .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load())}, + .RemainingCount = (ByteCountToScan - ChunkingStats.BytesHashed.load()), + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }, - AbortFlag); + AbortFlag, + PauseFlag); + FilteredBytesHashed.Stop(); + ProgressBar.Finish(); if (AbortFlag) { return {}; } - - FilteredBytesHashed.Stop(); - ProgressBar.Finish(); } return LocalContent; } @@ -9433,7 +9645,6 @@ BuildsCommand::BuildsCommand() auto AddCloudOptions = [this, &AddAuthOptions](cxxopts::Options& Ops) { AddAuthOptions(Ops); - Ops.add_option("cloud build", "", "override-host", "Cloud Builds URL", cxxopts::value(m_OverrideHost), "<override-host>"); Ops.add_option("cloud build", "", @@ -9441,6 +9652,7 @@ BuildsCommand::BuildsCommand() "Cloud Builds host url (legacy - use --override-host)", cxxopts::value(m_OverrideHost), "<url>"); + Ops.add_option("cloud build", "", "cloud-url", "Cloud Artifact URL", cxxopts::value(m_Url), "<cloud-url>"); Ops.add_option("cloud build", "", "host", "Cloud Builds host", cxxopts::value(m_Host), "<host>"); Ops.add_option("cloud build", "", @@ -9745,6 +9957,21 @@ BuildsCommand::BuildsCommand() m_FetchBlobOptions.parse_positional({"build-id", "blob-hash"}); m_FetchBlobOptions.positional_help("build-id blob-hash"); + auto AddZenProcessOptions = [this](cxxopts::Options& Ops) { + Ops.add_option("", "", "process-id", "Process id of running process", cxxopts::value(m_ZenProcessId), "<pid>"); + }; + AddZenProcessOptions(m_PauseOptions); + m_PauseOptions.parse_positional({"process-id"}); + m_PauseOptions.positional_help("process-id"); + + AddZenProcessOptions(m_ResumeOptions); + m_ResumeOptions.parse_positional({"process-id"}); + m_ResumeOptions.positional_help("process-id"); + + AddZenProcessOptions(m_AbortOptions); + m_AbortOptions.parse_positional({"process-id"}); + m_AbortOptions.positional_help("process-id"); + AddSystemOptions(m_ValidateBuildPartOptions); AddCloudOptions(m_ValidateBuildPartOptions); AddFileOptions(m_ValidateBuildPartOptions); @@ -9829,22 +10056,75 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) }; ParseSystemOptions(); - auto ParseStorageOptions = [&]() { + auto ParseStorageOptions = [&](bool RequireNamespace, bool RequireBucket) { + m_Host = RemoveQuotes(m_Host); + m_OverrideHost = RemoveQuotes(m_OverrideHost); + m_Url = RemoveQuotes(m_Url); + m_Namespace = RemoveQuotes(m_Namespace); + m_Bucket = RemoveQuotes(m_Bucket); + if (!m_Url.empty()) + { + if (!m_Host.empty()) + { + throw zen::OptionParseException(fmt::format("host is not compatible with the url option\n{}", SubOption->help())); + } + if (!m_Bucket.empty()) + { + throw zen::OptionParseException(fmt::format("bucket is not compatible with the url option\n{}", SubOption->help())); + } + if (!m_BuildId.empty()) + { + throw zen::OptionParseException(fmt::format("buildid is not compatible with the url option\n{}", SubOption->help())); + } + const std::string ArtifactURLRegExString = R"((.*?:\/\/.*?)\/api\/v2\/builds\/(.*?)\/(.*?)\/(.*))"; + const std::regex ArtifactURLRegEx(ArtifactURLRegExString, std::regex::ECMAScript); + std::match_results<std::string_view::const_iterator> MatchResults; + const std::string_view Url(RemoveQuotes(m_Url)); + if (regex_match(begin(Url), end(Url), MatchResults, ArtifactURLRegEx) && MatchResults.size() == 5) + { + auto GetMatch = [&MatchResults](uint32_t Index) -> std::string_view { + ZEN_ASSERT(Index < MatchResults.size()); + + const auto& Match = MatchResults[Index]; + + return std::string_view(&*Match.first, Match.second - Match.first); + }; + + const std::string_view Host = GetMatch(1); + const std::string_view Namespace = GetMatch(2); + const std::string_view Bucket = GetMatch(3); + const std::string_view BuildId = GetMatch(4); + + m_Host = Host; + m_Namespace = Namespace; + m_Bucket = Bucket; + m_BuildId = BuildId; + } + else + { + throw zen::OptionParseException(fmt::format("url does not match the Cloud Artifact URL format\n{}", SubOption->help())); + } + } + if (!m_OverrideHost.empty() || !m_Host.empty()) { if (!m_StoragePath.empty()) { - throw zen::OptionParseException(fmt::format("url is not compatible with the storage-path option\n{}", m_Options.help())); + throw zen::OptionParseException( + fmt::format("host/url/override-host is not compatible with the storage-path option\n{}", SubOption->help())); } - if (SubOption != &m_ListNamespacesOptions && (m_Namespace.empty() || m_Bucket.empty())) + if (RequireNamespace && m_Namespace.empty()) { - throw zen::OptionParseException( - fmt::format("namespace and bucket options are required for url option\n{}", m_Options.help())); + throw zen::OptionParseException(fmt::format("namespace option is required for this storage option\n{}", SubOption->help())); + } + if (RequireBucket && m_Bucket.empty()) + { + throw zen::OptionParseException(fmt::format("bucket option is required for this storage option\n{}", SubOption->help())); } } else if (m_StoragePath.empty()) { - throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("At least one storage option is required\n{}", SubOption->help())); } MakeSafeAbsolutePathÍnPlace(m_StoragePath); }; @@ -9887,6 +10167,19 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) }; auto ParseAuthOptions = [&]() { + m_OpenIdProviderUrl = RemoveQuotes(m_OpenIdProviderUrl); + m_OpenIdClientId = RemoveQuotes(m_OpenIdClientId); + + m_AccessToken = RemoveQuotes(m_AccessToken); + m_EncryptionKey = RemoveQuotes(m_EncryptionKey); + m_EncryptionIV = RemoveQuotes(m_EncryptionIV); + + m_OAuthUrl = RemoveQuotes(m_OAuthUrl); + m_OAuthClientId = RemoveQuotes(m_OAuthClientId); + m_OAuthClientSecret = RemoveQuotes(m_OAuthClientSecret); + + m_OidcTokenAuthExecutablePath = RemoveQuotes(m_OidcTokenAuthExecutablePath); + if (!m_OpenIdProviderUrl.empty() && !m_OpenIdClientId.empty()) { CreateAuthMgr(); @@ -9986,8 +10279,12 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) auto CreateBuildStorage = [&](BuildStorage::Statistics& StorageStats, BuildStorageCache::Statistics& StorageCacheStats, - const std::filesystem::path& TempPath) -> StorageInstance { - ParseStorageOptions(); + const std::filesystem::path& TempPath, + bool RequireNamespace, + bool RequireBucket) -> StorageInstance { + ParseStorageOptions(RequireNamespace, RequireBucket); + + m_ZenCacheHost = RemoveQuotes(m_ZenCacheHost); StorageInstance Result; @@ -10198,7 +10495,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } else { - throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Storage option is missing\n{}", SubOption->help())); } if (!m_ZenCacheHost.empty()) { @@ -10241,7 +10538,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) auto ParsePath = [&]() { if (m_Path.empty()) { - throw zen::OptionParseException(fmt::format("local-path is required\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("local-path is required\n{}", SubOption->help())); } MakeSafeAbsolutePathÍnPlace(m_Path); }; @@ -10249,34 +10546,36 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) auto ParseDiffPath = [&]() { if (m_DiffPath.empty()) { - throw zen::OptionParseException(fmt::format("compare-path is required\n{}", m_DownloadOptions.help())); + throw zen::OptionParseException(fmt::format("compare-path is required\n{}", SubOption->help())); } MakeSafeAbsolutePathÍnPlace(m_DiffPath); }; auto ParseBlobHash = [&]() -> IoHash { + m_BlobHash = RemoveQuotes(m_BlobHash); if (m_BlobHash.empty()) { - throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Blob hash string is missing\n{}", SubOption->help())); } IoHash BlobHash; if (!IoHash::TryParse(m_BlobHash, BlobHash)) { - throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Blob hash string is invalid\n{}", SubOption->help())); } return BlobHash; }; auto ParseBuildId = [&]() -> Oid { + m_BuildId = RemoveQuotes(m_BuildId); if (m_BuildId.length() != Oid::StringLength) { - throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Invalid build id\n{}", SubOption->help())); } else if (Oid BuildId = Oid::FromHexString(m_BuildId); BuildId == Oid::Zero) { - throw zen::OptionParseException(fmt::format("Invalid build id\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Invalid build id\n{}", SubOption->help())); } else { @@ -10285,13 +10584,14 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) }; auto ParseBuildPartId = [&]() -> Oid { + m_BuildPartId = RemoveQuotes(m_BuildPartId); if (m_BuildPartId.length() != Oid::StringLength) { - throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", SubOption->help())); } else if (Oid BuildPartId = Oid::FromHexString(m_BuildPartId); BuildPartId == Oid::Zero) { - throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Invalid build part id\n{}", SubOption->help())); } else { @@ -10303,25 +10603,38 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) std::vector<Oid> BuildPartIds; for (const std::string& BuildPartId : m_BuildPartIds) { - BuildPartIds.push_back(Oid::TryFromHexString(BuildPartId)); + BuildPartIds.push_back(Oid::TryFromHexString(RemoveQuotes(BuildPartId))); if (BuildPartIds.back() == Oid::Zero) { - throw zen::OptionParseException(fmt::format("build-part-id '{}' is invalid\n{}", BuildPartId, m_DownloadOptions.help())); + throw zen::OptionParseException(fmt::format("build-part-id '{}' is invalid\n{}", BuildPartId, SubOption->help())); } } return BuildPartIds; }; + auto ParseBuildPartNames = [&]() -> std::vector<std::string> { + std::vector<std::string> BuildPartNames; + for (const std::string& BuildPartName : m_BuildPartNames) + { + BuildPartNames.push_back(std::string(RemoveQuotes(BuildPartName))); + if (BuildPartNames.back().empty()) + { + throw zen::OptionParseException(fmt::format("build-part-names '{}' is invalid\n{}", BuildPartName, SubOption->help())); + } + } + return BuildPartNames; + }; + auto ParseBuildMetadata = [&]() -> CbObject { if (m_CreateBuild) { if (m_BuildMetadataPath.empty() && m_BuildMetadata.empty()) { - throw zen::OptionParseException(fmt::format("Options for builds target are missing\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Options for builds target are missing\n{}", SubOption->help())); } if (!m_BuildMetadataPath.empty() && !m_BuildMetadata.empty()) { - throw zen::OptionParseException(fmt::format("Conflicting options for builds target\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("Conflicting options for builds target\n{}", SubOption->help())); } if (!m_BuildMetadataPath.empty()) @@ -10338,6 +10651,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } return MetaData; } + m_BuildMetadata = RemoveQuotes(m_BuildMetadata); if (!m_BuildMetadata.empty()) { CbObjectWriter MetaDataWriter(1024); @@ -10358,12 +10672,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (!m_BuildMetadataPath.empty()) { throw zen::OptionParseException( - fmt::format("metadata-path option is only valid if creating a build\n{}", m_UploadOptions.help())); + fmt::format("metadata-path option is only valid if creating a build\n{}", SubOption->help())); } if (!m_BuildMetadata.empty()) { - throw zen::OptionParseException( - fmt::format("metadata option is only valid if creating a build\n{}", m_UploadOptions.help())); + throw zen::OptionParseException(fmt::format("metadata option is only valid if creating a build\n{}", SubOption->help())); } } return {}; @@ -10378,7 +10691,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) { if (!m_ListResultPath.empty()) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", + GetRunningExecutablePath(), + ZEN_CFG_VERSION_BUILD_STRING_FULL, + GetCurrentProcessId()); } BuildStorage::Statistics StorageStats; @@ -10396,7 +10712,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } }); - StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(ZenFolderPath)); + StorageInstance Storage = CreateBuildStorage(StorageStats, + StorageCacheStats, + ZenTempFolderPath(ZenFolderPath), + /*RequriesNamespace*/ false, + /*RequireBucket*/ false); CbObject Response = Storage.BuildStorage->ListNamespaces(m_ListNamespacesRecursive); ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None); @@ -10432,7 +10752,10 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (!m_ListResultPath.empty()) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", + GetRunningExecutablePath(), + ZEN_CFG_VERSION_BUILD_STRING_FULL, + GetCurrentProcessId()); } CbObject QueryObject; if (m_ListQueryPath.empty()) @@ -10480,7 +10803,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } }); - StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath)); + StorageInstance Storage = CreateBuildStorage(StorageStats, + StorageCacheStats, + ZenTempFolderPath(m_ZenFolderPath), + /*RequriesNamespace*/ true, + /*RequireBucket*/ false); CbObject Response = Storage.BuildStorage->ListBuilds(QueryObject); ZEN_ASSERT(ValidateCompactBinary(Response.GetView(), CbValidateMode::All) == CbValidateError::None); @@ -10510,25 +10837,11 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_UploadOptions) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); - - ParsePath(); + ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId()); - if (m_BuildPartName.empty()) - { - m_BuildPartName = m_Path.filename().string(); - } + ZenState InstanceState; - const Oid BuildId = m_BuildId.empty() ? Oid::NewOid() : ParseBuildId(); - if (m_BuildId.empty()) - { - m_BuildId = BuildId.ToString(); - } - const Oid BuildPartId = m_BuildPartId.empty() ? Oid::NewOid() : ParseBuildPartId(); - if (m_BuildPartId.empty()) - { - m_BuildPartId = BuildPartId.ToString(); - } + ParsePath(); BuildStorage::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; @@ -10548,11 +10861,33 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } }); - StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath)); + StorageInstance Storage = CreateBuildStorage(StorageStats, + StorageCacheStats, + ZenTempFolderPath(m_ZenFolderPath), + /*RequriesNamespace*/ true, + /*RequireBucket*/ true); + + if (m_BuildPartName.empty()) + { + m_BuildPartName = m_Path.filename().string(); + } + + const Oid BuildId = m_BuildId.empty() ? Oid::NewOid() : ParseBuildId(); + if (m_BuildId.empty()) + { + m_BuildId = BuildId.ToString(); + } + const Oid BuildPartId = m_BuildPartId.empty() ? Oid::NewOid() : ParseBuildPartId(); + if (m_BuildPartId.empty()) + { + m_BuildPartId = BuildPartId.ToString(); + } CbObject MetaData = ParseBuildMetadata(); - const std::filesystem::path TempDir = UploadTempDirectory(m_Path); + const std::filesystem::path TempDir = ZenTempFolderPath(m_ZenFolderPath); + + m_ManifestPath = RemoveQuotes(m_ManifestPath); UploadFolder(Storage, BuildId, @@ -10594,16 +10929,33 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_DownloadOptions) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId()); + + ZenState InstanceState; ParsePath(); + if (m_ZenFolderPath.empty()) + { + m_ZenFolderPath = m_Path / ZenFolderName; + } + MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); + + BuildStorage::Statistics StorageStats; + BuildStorageCache::Statistics StorageCacheStats; + + StorageInstance Storage = CreateBuildStorage(StorageStats, + StorageCacheStats, + ZenTempFolderPath(m_ZenFolderPath), + /*RequriesNamespace*/ true, + /*RequireBucket*/ true); + const Oid BuildId = ParseBuildId(); if (m_PostDownloadVerify && m_PrimeCacheOnly) { throw zen::OptionParseException( - fmt::format("'cache-prime-only' option is not compatible with 'verify' option\n{}", m_DownloadOptions.help())); + fmt::format("'cache-prime-only' option is not compatible with 'verify' option\n{}", SubOption->help())); } if (m_Clean && m_PrimeCacheOnly) @@ -10616,23 +10968,13 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ZEN_WARN("ignoring 'allow-partial-block-requests' option when 'cache-prime-only' is enabled"); } - std::vector<Oid> BuildPartIds = ParseBuildPartIds(); - - if (m_ZenFolderPath.empty()) - { - m_ZenFolderPath = m_Path / ZenFolderName; - } - MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); - - BuildStorage::Statistics StorageStats; - BuildStorageCache::Statistics StorageCacheStats; - - StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath)); + std::vector<Oid> BuildPartIds = ParseBuildPartIds(); + std::vector<std::string> BuildPartNames = ParseBuildPartNames(); DownloadFolder(Storage, BuildId, BuildPartIds, - m_BuildPartNames, + BuildPartNames, m_Path, m_ZenFolderPath, m_SystemRootDir, @@ -10656,10 +10998,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_FetchBlobOptions) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); - IoHash BlobHash = ParseBlobHash(); - - const Oid BuildId = Oid::FromHexString(m_BuildId); + ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId()); BuildStorage::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; @@ -10679,7 +11018,15 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } }); - StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath)); + StorageInstance Storage = CreateBuildStorage(StorageStats, + StorageCacheStats, + ZenTempFolderPath(m_ZenFolderPath), + /*RequriesNamespace*/ true, + /*RequireBucket*/ true); + + IoHash BlobHash = ParseBlobHash(); + + const Oid BuildId = Oid::FromHexString(m_BuildId); uint64_t CompressedSize; uint64_t DecompressedSize; @@ -10697,14 +11044,9 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) if (SubOption == &m_ValidateBuildPartOptions) { - ZEN_CONSOLE("Running {}: {}", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL); + ZEN_CONSOLE("Running {}: {} (pid {})", GetRunningExecutablePath(), ZEN_CFG_VERSION_BUILD_STRING_FULL, GetCurrentProcessId()); - Oid BuildId = ParseBuildId(); - - if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) - { - throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", m_DownloadOptions.help())); - } + ZenState InstanceState; BuildStorage::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; @@ -10724,7 +11066,18 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } }); - StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath)); + StorageInstance Storage = CreateBuildStorage(StorageStats, + StorageCacheStats, + ZenTempFolderPath(m_ZenFolderPath), + /*RequriesNamespace*/ true, + /*RequireBucket*/ true); + + Oid BuildId = ParseBuildId(); + + if (!m_BuildPartName.empty() && !m_BuildPartId.empty()) + { + throw zen::OptionParseException(fmt::format("build-part-id conflicts with build-part-name\n{}", SubOption->help())); + } const Oid BuildPartId = m_BuildPartName.empty() ? Oid::Zero : ParseBuildPartId(); @@ -10753,15 +11106,19 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) BuildStorage::Statistics StorageStats; BuildStorageCache::Statistics StorageCacheStats; - StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath)); + StorageInstance Storage = CreateBuildStorage(StorageStats, + StorageCacheStats, + ZenTempFolderPath(m_ZenFolderPath), + /*RequriesNamespace*/ true, + /*RequireBucket*/ true); Stopwatch Timer; for (const std::string& BuildIdString : m_BuildIds) { - Oid BuildId = Oid::FromHexString(BuildIdString); + Oid BuildId = Oid::FromHexString(RemoveQuotes(BuildIdString)); if (BuildId == Oid::Zero) { - throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, m_DownloadOptions.help())); + throw zen::OptionParseException(fmt::format("invalid build id {}\n{}", BuildIdString, SubOption->help())); } DownloadFolder(Storage, BuildId, @@ -10787,6 +11144,50 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } + auto ParseZenProcessId = [&]() { + if (m_ZenProcessId == -1) + { + const std::filesystem::path RunningExecutablePath = GetRunningExecutablePath(); + ProcessHandle RunningProcess; + std::error_code Ec = FindProcess(RunningExecutablePath, RunningProcess, /*IncludeSelf*/ false); + if (Ec) + { + throw std::runtime_error( + fmt::format("Failed finding process running '{}', reason: '{}'", RunningExecutablePath, Ec.message())); + } + if (!RunningProcess.IsValid()) + { + throw zen::OptionParseException( + fmt::format("Unable to find a running instance of the zen executable '{}'", RunningExecutablePath)); + } + m_ZenProcessId = RunningProcess.Pid(); + } + }; + + if (SubOption == &m_PauseOptions) + { + ParseZenProcessId(); + ZenState RunningState(m_ZenProcessId); + RunningState.StateData().Pause.store(true); + return 0; + } + + if (SubOption == &m_ResumeOptions) + { + ParseZenProcessId(); + ZenState RunningState(m_ZenProcessId); + RunningState.StateData().Pause.store(false); + return 0; + } + + if (SubOption == &m_AbortOptions) + { + ParseZenProcessId(); + ZenState RunningState(m_ZenProcessId); + RunningState.StateData().Abort.store(true); + return 0; + } + if (SubOption == &m_TestOptions) { m_SystemRootDir = (GetRunningExecutablePath().parent_path() / ".tmpzensystem").make_preferred(); @@ -10796,14 +11197,6 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) ParsePath(); - m_BuildId = Oid::NewOid().ToString(); - m_BuildPartName = m_Path.filename().string(); - m_BuildPartId = Oid::NewOid().ToString(); - m_CreateBuild = true; - - const Oid BuildId = Oid::FromHexString(m_BuildId); - const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); - if (m_OverrideHost.empty() && m_StoragePath.empty()) { m_StoragePath = (GetRunningExecutablePath().parent_path() / ".tmpstore").make_preferred(); @@ -10838,7 +11231,19 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } MakeSafeAbsolutePathÍnPlace(m_ZenFolderPath); - StorageInstance Storage = CreateBuildStorage(StorageStats, StorageCacheStats, ZenTempFolderPath(m_ZenFolderPath)); + StorageInstance Storage = CreateBuildStorage(StorageStats, + StorageCacheStats, + ZenTempFolderPath(m_ZenFolderPath), + /*RequriesNamespace*/ true, + /*RequireBucket*/ true); + + m_BuildId = Oid::NewOid().ToString(); + m_BuildPartName = m_Path.filename().string(); + m_BuildPartId = Oid::NewOid().ToString(); + m_CreateBuild = true; + + const Oid BuildId = Oid::FromHexString(m_BuildId); + const Oid BuildPartId = Oid::FromHexString(m_BuildPartId); auto MakeMetaData = [](const Oid& BuildId) -> CbObject { CbObjectWriter BuildMetaDataWriter; @@ -10955,7 +11360,7 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return true; }; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); uint32_t Randomizer = 0; auto FileSizeIt = DownloadContent.FileSizes.begin(); @@ -11013,8 +11418,8 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) } FileSizeIt++; } - Work.Wait(5000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); + Work.Wait(5000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); ZEN_CONSOLE("Scrambling files, {} remaining", PendingWork); }); ZEN_ASSERT(!AbortFlag.load()); @@ -11161,6 +11566,24 @@ BuildsCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) return 0; } } + catch (const std::system_error& SysErr) + { + if (IsOOD(SysErr)) + { + ZEN_CONSOLE("Operation failed due to out of disk space: {}", SysErr.what()); + return 3; + } + else if (IsOOM(SysErr)) + { + ZEN_CONSOLE("Operation failed due to out of memory: {}", SysErr.what()); + return 3; + } + else + { + ZEN_ERROR("{}", SysErr.what()); + return 3; + } + } catch (const std::exception& Ex) { ZEN_ERROR("{}", Ex.what()); diff --git a/src/zen/cmds/builds_cmd.h b/src/zen/cmds/builds_cmd.h index 41ed65105..b3466c0d3 100644 --- a/src/zen/cmds/builds_cmd.h +++ b/src/zen/cmds/builds_cmd.h @@ -38,6 +38,7 @@ private: // cloud builds std::string m_OverrideHost; std::string m_Host; + std::string m_Url; bool m_AssumeHttp2 = false; bool m_AllowRedirect = false; std::string m_Namespace; @@ -88,7 +89,6 @@ private: std::string m_Verb; // list, upload, download cxxopts::Options m_ListNamespacesOptions{"list-namespaces", "List available build namespaces"}; - std::string m_ListNamespacesResultPath; bool m_ListNamespacesRecursive = false; cxxopts::Options m_ListOptions{"list", "List available builds"}; @@ -114,6 +114,12 @@ private: cxxopts::Options m_FetchBlobOptions{"fetch-blob", "Fetch a blob from remote store"}; std::string m_BlobHash; + cxxopts::Options m_PauseOptions{"pause", "Pause an ongoing zen builds process"}; + cxxopts::Options m_ResumeOptions{"resume", "Resume a paused zen builds process"}; + cxxopts::Options m_AbortOptions{"abort", "Abort an ongoing zen builds process"}; + + int m_ZenProcessId = -1; + cxxopts::Options m_ValidateBuildPartOptions{"validate-part", "Fetch a build part and validate all referenced attachments"}; cxxopts::Options m_TestOptions{"test", "Test upload and download with verify"}; @@ -121,15 +127,18 @@ private: cxxopts::Options m_MultiTestDownloadOptions{"multi-test-download", "Test multiple sequenced downloads with verify"}; std::vector<std::string> m_BuildIds; - cxxopts::Options* m_SubCommands[9] = {&m_ListNamespacesOptions, - &m_ListOptions, - &m_UploadOptions, - &m_DownloadOptions, - &m_DiffOptions, - &m_FetchBlobOptions, - &m_ValidateBuildPartOptions, - &m_TestOptions, - &m_MultiTestDownloadOptions}; + cxxopts::Options* m_SubCommands[12] = {&m_ListNamespacesOptions, + &m_ListOptions, + &m_UploadOptions, + &m_DownloadOptions, + &m_PauseOptions, + &m_ResumeOptions, + &m_AbortOptions, + &m_DiffOptions, + &m_FetchBlobOptions, + &m_ValidateBuildPartOptions, + &m_TestOptions, + &m_MultiTestDownloadOptions}; }; } // namespace zen diff --git a/src/zen/cmds/up_cmd.cpp b/src/zen/cmds/up_cmd.cpp index f3bf2f66d..d0763701e 100644 --- a/src/zen/cmds/up_cmd.cpp +++ b/src/zen/cmds/up_cmd.cpp @@ -311,7 +311,7 @@ DownCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** argv) // Try to find the running executable by path name std::filesystem::path ServerExePath = m_ProgramBaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL; ProcessHandle RunningProcess; - if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec) + if (std::error_code Ec = FindProcess(ServerExePath, RunningProcess); !Ec, /*IncludeSelf*/ false) { ZEN_WARN("attempting hard terminate of zen process with pid ({})", RunningProcess.Pid()); try diff --git a/src/zen/cmds/wipe_cmd.cpp b/src/zen/cmds/wipe_cmd.cpp index 9dfdca0a1..fcc18df2b 100644 --- a/src/zen/cmds/wipe_cmd.cpp +++ b/src/zen/cmds/wipe_cmd.cpp @@ -33,6 +33,7 @@ namespace zen { namespace { static std::atomic<bool> AbortFlag = false; + static std::atomic<bool> PauseFlag = false; static bool IsVerbose = false; static bool Quiet = false; static ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty; @@ -72,11 +73,13 @@ namespace { { if (SigNum == SIGINT) { + PauseFlag = false; AbortFlag = true; } #if ZEN_PLATFORM_WINDOWS if (SigNum == SIGBREAK) { + PauseFlag = false; AbortFlag = true; } #endif // ZEN_PLATFORM_WINDOWS @@ -248,7 +251,7 @@ namespace { return Added; }; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); struct AsyncVisitor : public GetDirectoryContentVisitor { @@ -423,12 +426,12 @@ namespace { GetIOWorkerPool(), Work.PendingWork()); - Work.Wait(ProgressMode == ProgressBar::Mode::Pretty ? 200 : 5000, [&](bool IsAborted, ptrdiff_t PendingWork) { + Work.Wait(ProgressMode == ProgressBar::Mode::Pretty ? 200 : 5000, [&](bool IsAborted, bool IsPaused, ptrdiff_t PendingWork) { + ZEN_UNUSED(PendingWork); if (Quiet) { return; } - ZEN_UNUSED(IsAborted, PendingWork); LastUpdateTimeMs = Timer.GetElapsedTimeMs(); uint64_t Deleted = DeletedItemCount.load(); @@ -437,7 +440,8 @@ namespace { Progress.UpdateState({.Task = "Removing files ", .Details = fmt::format("Found {}, Deleted {} ({})", Discovered, Deleted, NiceBytes(DeletedBytes)), .TotalCount = Discovered, - .RemainingCount = Discovered - Deleted}, + .RemainingCount = Discovered - Deleted, + .Status = ProgressBar::State::CalculateStatus(IsAborted, IsPaused)}, false); }); diff --git a/src/zen/zen.cpp b/src/zen/zen.cpp index 56c5bcc09..614112235 100644 --- a/src/zen/zen.cpp +++ b/src/zen/zen.cpp @@ -376,7 +376,8 @@ ProgressBar::SetLogOperationProgress(Mode InMode, uint32_t StepIndex, uint32_t S ProgressBar::ProgressBar(Mode InMode, std::string_view InSubTask) : m_Mode((!IsStdoutTty() && InMode == Mode::Pretty) ? Mode::Plain : InMode) -, m_LastUpdateMS(m_SW.GetElapsedTimeMs() - 10000) +, m_LastUpdateMS((uint64_t)-1) +, m_PausedMS(0) , m_SubTask(InSubTask) { if (!m_SubTask.empty() && InMode == Mode::Log) @@ -413,20 +414,46 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) } uint64_t ElapsedTimeMS = m_SW.GetElapsedTimeMs(); - if (!DoLinebreak && (NewState.Task == m_State.Task) && ((m_LastUpdateMS + 200) > ElapsedTimeMS)) + if (m_LastUpdateMS != (uint64_t)-1) { - return; + if (!DoLinebreak && (NewState.Status == m_State.Status) && (NewState.Task == m_State.Task) && + ((m_LastUpdateMS + 200) > ElapsedTimeMS)) + { + return; + } + if (m_State.Status == State::EStatus::Paused) + { + uint64_t ElapsedSinceLast = ElapsedTimeMS - m_LastUpdateMS; + m_PausedMS += ElapsedSinceLast; + } } m_LastUpdateMS = ElapsedTimeMS; + std::string Task = NewState.Task; + switch (NewState.Status) + { + case State::EStatus::Aborted: + Task = "Aborting"; + break; + case State::EStatus::Paused: + Task = "Paused"; + break; + default: + break; + } + if (NewState.Task.length() > Task.length()) + { + Task += std::string(NewState.Task.length() - Task.length(), ' '); + } + const size_t PercentDone = NewState.TotalCount > 0u ? gsl::narrow<uint8_t>((100 * (NewState.TotalCount - NewState.RemainingCount)) / NewState.TotalCount) : 0u; if (m_Mode == Mode::Plain) { const std::string Details = (!NewState.Details.empty()) ? fmt::format(": {}", NewState.Details) : ""; - const std::string Output = fmt::format("{} {}% ({}){}\n", NewState.Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details); + const std::string Output = fmt::format("{} {}% ({}){}\n", Task, PercentDone, NiceTimeSpanMs(ElapsedTimeMS), Details); OutputToConsoleRaw(Output); } else if (m_Mode == Mode::Pretty) @@ -435,12 +462,12 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) size_t ProgressBarCount = (ProgressBarSize * PercentDone) / 100; uint64_t Completed = NewState.TotalCount - NewState.RemainingCount; - uint64_t ETAMS = (PercentDone > 5) ? (ElapsedTimeMS * NewState.RemainingCount) / Completed : 0; + uint64_t ETAElapsedMS = ElapsedTimeMS -= m_PausedMS; + uint64_t ETAMS = + (NewState.Status == State::EStatus::Running) && (PercentDone > 5) ? (ETAElapsedMS * NewState.RemainingCount) / Completed : 0; uint32_t ConsoleColumns = GetConsoleColumns(); - std::string_view TaskString = NewState.Task; - const std::string PercentString = fmt::format("{:#3}%", PercentDone); const std::string ProgressBarString = @@ -454,7 +481,7 @@ ProgressBar::UpdateState(const State& NewState, bool DoLinebreak) ExtendableStringBuilder<256> OutputBuilder; - OutputBuilder << "\r" << TaskString << PercentString; + OutputBuilder << "\r" << Task << PercentString; if (OutputBuilder.Size() + 1 < ConsoleColumns) { size_t RemainingSpace = ConsoleColumns - (OutputBuilder.Size() + 1); @@ -549,7 +576,7 @@ ProgressBar::ForceLinebreak() void ProgressBar::Finish() { - if (m_LastOutputLength > 0) + if (m_LastOutputLength > 0 || m_State.RemainingCount > 0) { State NewState = m_State; NewState.RemainingCount = 0; @@ -846,39 +873,22 @@ main(int argc, char** argv) .add_option("ue-trace", "", "tracefile", "Path to write a trace to", cxxopts::value<std::string>(TraceFile)->default_value(""), ""); #endif // ZEN_WITH_TRACE - Options.parse_positional({"command"}); #if ZEN_USE_SENTRY - bool NoSentry = false; - bool SentryAllowPII = false; - Options.add_options()("no-sentry", "Disable Sentry crash handler", cxxopts::value<bool>(NoSentry)->default_value("false")); - Options.add_options()("sentry-allow-personal-info", - "Allow personally identifiable information in sentry crash reports", - cxxopts::value<bool>(SentryAllowPII)->default_value("false")); + bool NoSentry = false; + bool SentryAllowPII = false; + std::string SentryDsn; + Options + .add_option("sentry", "", "no-sentry", "Disable Sentry crash handler", cxxopts::value<bool>(NoSentry)->default_value("false"), ""); + Options.add_option("sentry", + "", + "sentry-allow-personal-info", + "Allow personally identifiable information in sentry crash reports", + cxxopts::value<bool>(SentryAllowPII)->default_value("false"), + ""); + Options.add_option("sentry", "", "sentry-dsn", "Sentry DSN to send events to", cxxopts::value<std::string>(SentryDsn), ""); #endif -#if ZEN_USE_SENTRY - SentryIntegration Sentry; - - if (NoSentry == false) - { - std::string SentryDatabasePath = (GetRunningExecutablePath().parent_path() / ".sentry-native").string(); - - ExtendableStringBuilder<512> SB; - for (int i = 0; i < argc; ++i) - { - if (i) - { - SB.Append(' '); - } - - SB.Append(argv[i]); - } - - Sentry.Initialize(SentryDatabasePath, {}, SentryAllowPII, SB.ToString()); - - SentryIntegration::ClearCaches(); - } -#endif + Options.parse_positional({"command"}); const bool IsNullInvoke = (argc == 1); // If no arguments are passed we want to print usage information @@ -919,6 +929,30 @@ main(int argc, char** argv) exit(0); } +#if ZEN_USE_SENTRY + SentryIntegration Sentry; + + if (NoSentry == false) + { + std::string SentryDatabasePath = (std::filesystem::temp_directory_path() / ".zen-sentry-native").string(); + + ExtendableStringBuilder<512> SB; + for (int i = 0; i < argc; ++i) + { + if (i) + { + SB.Append(' '); + } + + SB.Append(argv[i]); + } + + Sentry.Initialize(SentryDatabasePath, {}, SentryDsn, SentryAllowPII, SB.ToString()); + + SentryIntegration::ClearCaches(); + } +#endif + zen::LoggingOptions LogOptions; LogOptions.IsDebug = GlobalOptions.IsDebug; LogOptions.IsVerbose = GlobalOptions.IsVerbose; diff --git a/src/zen/zen.h b/src/zen/zen.h index 9ca228e37..995bd13b6 100644 --- a/src/zen/zen.h +++ b/src/zen/zen.h @@ -79,6 +79,26 @@ public: std::string Details; uint64_t TotalCount = 0; uint64_t RemainingCount = 0; + enum class EStatus + { + Running, + Aborted, + Paused + }; + EStatus Status = EStatus::Running; + + static EStatus CalculateStatus(bool IsAborted, bool IsPaused) + { + if (IsAborted) + { + return EStatus::Aborted; + } + if (IsPaused) + { + return EStatus::Paused; + } + return EStatus::Running; + } }; enum class Mode @@ -104,6 +124,7 @@ private: const Mode m_Mode; Stopwatch m_SW; uint64_t m_LastUpdateMS; + uint64_t m_PausedMS; State m_State; const std::string m_SubTask; size_t m_LastOutputLength = 0; diff --git a/src/zencore/filesystem.cpp b/src/zencore/filesystem.cpp index c4264bc29..46337ffc8 100644 --- a/src/zencore/filesystem.cpp +++ b/src/zencore/filesystem.cpp @@ -33,6 +33,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <dirent.h> # include <fcntl.h> # include <sys/resource.h> +# include <sys/mman.h> # include <sys/stat.h> # include <pwd.h> # include <unistd.h> @@ -43,6 +44,7 @@ ZEN_THIRD_PARTY_INCLUDES_END # include <fcntl.h> # include <libproc.h> # include <sys/resource.h> +# include <sys/mman.h> # include <sys/stat.h> # include <sys/syslimits.h> # include <pwd.h> @@ -2824,6 +2826,218 @@ SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly) return Result; } +class SharedMemoryImpl : public SharedMemory +{ +public: + struct Data + { + void* Handle = nullptr; + void* DataPtr = nullptr; + size_t Size = 0; + std::string Name; + }; + + static Data Open(std::string_view Name, size_t Size, bool SystemGlobal) + { +#if ZEN_PLATFORM_WINDOWS + std::wstring InstanceMapName = Utf8ToWide(fmt::format("{}\\{}", SystemGlobal ? "Global" : "Local", Name)); + + HANDLE hMap = OpenFileMapping(FILE_MAP_ALL_ACCESS, FALSE, InstanceMapName.c_str()); + if (hMap == NULL) + { + return {}; + } + void* pBuf = MapViewOfFile(hMap, // handle to map object + FILE_MAP_ALL_ACCESS, // read/write permission + 0, // offset high + 0, // offset low + DWORD(Size)); // size + + if (pBuf == NULL) + { + CloseHandle(hMap); + } + return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)}; +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + ZEN_UNUSED(SystemGlobal); + std::string InstanceMapName = fmt::format("/{}", Name); + + int Fd = shm_open(InstanceMapName.c_str(), O_RDWR, 0666); + if (Fd < 0) + { + return {}; + } + void* hMap = (void*)intptr_t(Fd); + + struct stat Stat; + fstat(Fd, &Stat); + + if (size_t(Stat.st_size) < Size) + { + close(Fd); + return {}; + } + + void* pBuf = mmap(nullptr, Size, PROT_READ | PROT_WRITE, MAP_SHARED, Fd, 0); + if (pBuf == MAP_FAILED) + { + close(Fd); + return {}; + } + return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)}; +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + } + + static Data Create(std::string_view Name, size_t Size, bool SystemGlobal) + { +#if ZEN_PLATFORM_WINDOWS + std::wstring InstanceMapName = Utf8ToWide(fmt::format("{}\\{}", SystemGlobal ? "Global" : "Local", Name)); + + SECURITY_ATTRIBUTES m_Attributes{}; + SECURITY_DESCRIPTOR m_Sd{}; + + m_Attributes.nLength = sizeof m_Attributes; + m_Attributes.bInheritHandle = false; // Disable inheritance + + const BOOL Success = InitializeSecurityDescriptor(&m_Sd, SECURITY_DESCRIPTOR_REVISION); + + if (Success) + { + if (!SetSecurityDescriptorDacl(&m_Sd, TRUE, (PACL)NULL, FALSE)) + { + ThrowLastError("SetSecurityDescriptorDacl failed"); + } + + m_Attributes.lpSecurityDescriptor = &m_Sd; + } + + HANDLE hMap = CreateFileMapping(INVALID_HANDLE_VALUE, // use paging file + &m_Attributes, // allow anyone to access + PAGE_READWRITE, // read/write access + 0, // maximum object size (high-order DWORD) + DWORD(Size), // maximum object size (low-order DWORD) + InstanceMapName.c_str()); + if (hMap == NULL) + { + return {}; + } + void* pBuf = MapViewOfFile(hMap, // handle to map object + FILE_MAP_ALL_ACCESS, // read/write permission + 0, // offset high + 0, // offset low + DWORD(Size)); // size + + if (pBuf == NULL) + { + CloseHandle(hMap); + return {}; + } + return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)}; +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + ZEN_UNUSED(SystemGlobal); + std::string InstanceMapName = fmt::format("/{}", Name); + + int Fd = shm_open(InstanceMapName.c_str(), O_RDWR | O_CREAT | O_CLOEXEC, 0666); + if (Fd < 0) + { + return {}; + } + fchmod(Fd, 0666); + void* hMap = (void*)intptr_t(Fd); + + int Result = ftruncate(Fd, Size); + ZEN_UNUSED(Result); + + void* pBuf = mmap(nullptr, Size, PROT_READ | PROT_WRITE, MAP_SHARED, Fd, 0); + if (pBuf == MAP_FAILED) + { + close(Fd); + return {}; + } + return Data{.Handle = hMap, .DataPtr = pBuf, .Size = Size, .Name = std::string(Name)}; +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + } + + static void Close(Data&& MemMap, bool Delete) + { +#if ZEN_PLATFORM_WINDOWS + ZEN_UNUSED(Delete); + if (MemMap.DataPtr != nullptr) + { + UnmapViewOfFile(MemMap.DataPtr); + MemMap.DataPtr = nullptr; + } + if (MemMap.Handle != nullptr) + { + CloseHandle(MemMap.Handle); + MemMap.Handle = nullptr; + } +#endif // ZEN_PLATFORM_WINDOWS +#if ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + if (MemMap.DataPtr != nullptr) + { + munmap(MemMap.DataPtr, MemMap.Size); + MemMap.DataPtr = nullptr; + } + + if (MemMap.Handle != nullptr) + { + int Fd = int(intptr_t(MemMap.Handle)); + close(Fd); + MemMap.Handle = nullptr; + } + if (Delete) + { + std::string InstanceMapName = fmt::format("/{}", MemMap.Name); + shm_unlink(InstanceMapName.c_str()); + } +#endif // ZEN_PLATFORM_LINUX || ZEN_PLATFORM_MAC + } + + SharedMemoryImpl(Data&& MemMap, bool IsOwned) : m_MemMap(std::move(MemMap)), m_IsOwned(IsOwned) {} + virtual ~SharedMemoryImpl() + { + try + { + Close(std::move(m_MemMap), /*Delete*/ m_IsOwned); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("SharedMemoryImpl::~SharedMemoryImpl threw exception: {}", Ex.what()); + } + } + + virtual void* GetData() override { return m_MemMap.DataPtr; } + +private: + Data m_MemMap; + const bool m_IsOwned = false; +}; + +std::unique_ptr<SharedMemory> +OpenSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal) +{ + SharedMemoryImpl::Data MemMap = SharedMemoryImpl::Open(Name, Size, SystemGlobal); + if (MemMap.DataPtr) + { + return std::make_unique<SharedMemoryImpl>(std::move(MemMap), /*IsOwned*/ false); + } + return {}; +} + +std::unique_ptr<SharedMemory> +CreateSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal) +{ + SharedMemoryImpl::Data MemMap = SharedMemoryImpl::Create(Name, Size, SystemGlobal); + if (MemMap.DataPtr) + { + return std::make_unique<SharedMemoryImpl>(std::move(MemMap), /*IsOwned*/ true); + } + return {}; +} + ////////////////////////////////////////////////////////////////////////// // // Testing related code follows... @@ -3108,6 +3322,22 @@ TEST_CASE("RotateDirectories") } } +TEST_CASE("SharedMemory") +{ + CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, false)); + CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, true)); + + { + auto Mem0 = CreateSharedMemory("SharedMemoryTest0", 482, false); + CHECK(Mem0); + strcpy((char*)Mem0->GetData(), "this is the string we are looking for"); + auto Mem1 = OpenSharedMemory("SharedMemoryTest0", 482, false); + CHECK_EQ(std::string((char*)Mem0->GetData()), std::string((char*)Mem1->GetData())); + } + + CHECK(!OpenSharedMemory("SharedMemoryTest0", 482, false)); +} + #endif } // namespace zen diff --git a/src/zencore/include/zencore/filesystem.h b/src/zencore/include/zencore/filesystem.h index dfd0eedc9..36d4d1b68 100644 --- a/src/zencore/include/zencore/filesystem.h +++ b/src/zencore/include/zencore/filesystem.h @@ -385,6 +385,16 @@ uint32_t MakeFileModeReadOnly(uint32_t FileMode, bool ReadOnly); bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly, std::error_code& Ec); bool SetFileReadOnly(const std::filesystem::path& Filename, bool ReadOnly); +class SharedMemory +{ +public: + virtual ~SharedMemory() {} + virtual void* GetData() = 0; +}; + +std::unique_ptr<SharedMemory> OpenSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal); +std::unique_ptr<SharedMemory> CreateSharedMemory(std::string_view Name, size_t Size, bool SystemGlobal); + ////////////////////////////////////////////////////////////////////////// void filesystem_forcelink(); // internal diff --git a/src/zencore/include/zencore/process.h b/src/zencore/include/zencore/process.h index d1394cd9a..d3e1de703 100644 --- a/src/zencore/include/zencore/process.h +++ b/src/zencore/include/zencore/process.h @@ -98,7 +98,7 @@ ZENCORE_API int GetCurrentProcessId(); int GetProcessId(CreateProcResult ProcId); std::filesystem::path GetProcessExecutablePath(int Pid, std::error_code& OutEc); -std::error_code FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle); +std::error_code FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle, bool IncludeSelf = true); void process_forcelink(); // internal diff --git a/src/zencore/include/zencore/sentryintegration.h b/src/zencore/include/zencore/sentryintegration.h index 40e22af4e..d14c1c275 100644 --- a/src/zencore/include/zencore/sentryintegration.h +++ b/src/zencore/include/zencore/sentryintegration.h @@ -31,8 +31,12 @@ public: SentryIntegration(); ~SentryIntegration(); - void Initialize(std::string SentryDatabasePath, std::string SentryAttachmentsPath, bool AllowPII, const std::string& CommandLine); - void LogStartupInformation(); + void Initialize(std::string SentryDatabasePath, + std::string SentryAttachmentsPath, + std::string SentryDsn, + bool AllowPII, + const std::string& CommandLine); + void LogStartupInformation(); static void ClearCaches(); private: diff --git a/src/zencore/process.cpp b/src/zencore/process.cpp index 48efc3f85..fcbe657cb 100644 --- a/src/zencore/process.cpp +++ b/src/zencore/process.cpp @@ -929,7 +929,7 @@ GetProcessExecutablePath(int Pid, std::error_code& OutEc) } std::error_code -FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle) +FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHandle, bool IncludeSelf) { #if ZEN_PLATFORM_WINDOWS HANDLE ProcessSnapshotHandle = CreateToolhelp32Snapshot(TH32CS_SNAPPROCESS, 0); @@ -939,13 +939,15 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand } auto _ = MakeGuard([&]() { CloseHandle(ProcessSnapshotHandle); }); + const DWORD ThisProcessId = ::GetCurrentProcessId(); + PROCESSENTRY32 Entry; Entry.dwSize = sizeof(PROCESSENTRY32); if (Process32First(ProcessSnapshotHandle, (LPPROCESSENTRY32)&Entry)) { do { - if (ExecutableImage.filename() == Entry.szExeFile) + if ((IncludeSelf || (Entry.th32ProcessID != ThisProcessId)) && (ExecutableImage.filename() == Entry.szExeFile)) { std::error_code Ec; std::filesystem::path EntryPath = GetProcessExecutablePath(Entry.th32ProcessID, Ec); @@ -970,6 +972,7 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand } } } while (::Process32Next(ProcessSnapshotHandle, (LPPROCESSENTRY32)&Entry)); + return {}; } return MakeErrorCodeFromLastError(); #endif // ZEN_PLATFORM_WINDOWS @@ -980,6 +983,8 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand struct kinfo_proc* Processes = nullptr; uint32_t ProcCount = 0; + const pid_t ThisProcessId = getpid(); + if (sysctl(Mib, 4, NULL, &BufferSize, NULL, 0) != -1 && BufferSize > 0) { struct kinfo_proc* Processes = (struct kinfo_proc*)malloc(BufferSize); @@ -990,36 +995,46 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand char Buffer[PROC_PIDPATHINFO_MAXSIZE]; for (uint32_t ProcIndex = 0; ProcIndex < ProcCount; ProcIndex++) { - pid_t Pid = Processes[ProcIndex].kp_proc.p_pid; - std::error_code Ec; - std::filesystem::path EntryPath = GetProcessExecutablePath(Pid, Ec); - if (!Ec) + pid_t Pid = Processes[ProcIndex].kp_proc.p_pid; + if (IncludeSelf || (Pid != ThisProcessId)) { - if (EntryPath == ExecutableImage) + std::error_code Ec; + std::filesystem::path EntryPath = GetProcessExecutablePath(Pid, Ec); + if (!Ec) { - if (Processes[ProcIndex].kp_proc.p_stat != SZOMB) + if (EntryPath == ExecutableImage) { - OutHandle.Initialize(Pid, Ec); - return Ec; + if (Processes[ProcIndex].kp_proc.p_stat != SZOMB) + { + OutHandle.Initialize(Pid, Ec); + return Ec; + } } } + Ec.clear(); } } + return {}; } } return MakeErrorCodeFromLastError(); #endif // ZEN_PLATFORM_MAC #if ZEN_PLATFORM_LINUX + const pid_t ThisProcessId = getpid(); + std::vector<uint32_t> RunningPids; DirectoryContent ProcList; GetDirectoryContent("/proc", DirectoryContentFlags::IncludeDirs, ProcList); for (const std::filesystem::path& EntryPath : ProcList.Directories) { std::string EntryName = EntryPath.stem(); - std::optional<uint32_t> Pid = ParseInt<uint32_t>(EntryName); - if (Pid.has_value()) + std::optional<uint32_t> PidMaybe = ParseInt<uint32_t>(EntryName); + if (PidMaybe.has_value()) { - RunningPids.push_back(Pid.value()); + if (pid_t Pid = PidMaybe.value(); IncludeSelf || (Pid != ThisProcessId)) + { + RunningPids.push_back(Pid); + } } } @@ -1042,6 +1057,7 @@ FindProcess(const std::filesystem::path& ExecutableImage, ProcessHandle& OutHand } } } + Ec.clear(); } return {}; #endif // ZEN_PLATFORM_LINUX @@ -1065,10 +1081,24 @@ TEST_CASE("Process") TEST_CASE("FindProcess") { - ProcessHandle Process; - std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process); - CHECK(!Ec); - CHECK(Process.IsValid()); + { + ProcessHandle Process; + std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process, /*IncludeSelf*/ true); + CHECK(!Ec); + CHECK(Process.IsValid()); + } + { + ProcessHandle Process; + std::error_code Ec = FindProcess(GetRunningExecutablePath(), Process, /*IncludeSelf*/ false); + CHECK(!Ec); + CHECK(!Process.IsValid()); + } + { + ProcessHandle Process; + std::error_code Ec = FindProcess("this/does\\not/exist\\123914921929412312312312asdad\\12134.no", Process, /*IncludeSelf*/ false); + CHECK(!Ec); + CHECK(!Process.IsValid()); + } } TEST_CASE("BuildArgV") diff --git a/src/zencore/sentryintegration.cpp b/src/zencore/sentryintegration.cpp index d08fb7f1d..520d5162e 100644 --- a/src/zencore/sentryintegration.cpp +++ b/src/zencore/sentryintegration.cpp @@ -31,6 +31,10 @@ ZEN_THIRD_PARTY_INCLUDES_END namespace sentry { +namespace { + static const std::string DefaultDsn("https://[email protected]/5919284"); +} + struct SentryAssertImpl : zen::AssertImpl { virtual void ZEN_FORCENOINLINE ZEN_DEBUG_SECTION @@ -194,6 +198,7 @@ SentryIntegration::~SentryIntegration() void SentryIntegration::Initialize(std::string SentryDatabasePath, std::string SentryAttachmentsPath, + std::string SentryDsn, bool AllowPII, const std::string& CommandLine) { @@ -204,7 +209,7 @@ SentryIntegration::Initialize(std::string SentryDatabasePath, SentryDatabasePath = SentryDatabasePath.substr(4); } sentry_options_t* SentryOptions = sentry_options_new(); - sentry_options_set_dsn(SentryOptions, "https://[email protected]/5919284"); + sentry_options_set_dsn(SentryOptions, SentryDsn.empty() ? sentry::DefaultDsn.c_str() : SentryDsn.c_str()); sentry_options_set_database_path(SentryOptions, SentryDatabasePath.c_str()); sentry_options_set_logger(SentryOptions, SentryLogFunction, this); if (!SentryAttachmentsPath.empty()) diff --git a/src/zenhttp/httpclient.cpp b/src/zenhttp/httpclient.cpp index f2b26b922..a2d323b5e 100644 --- a/src/zenhttp/httpclient.cpp +++ b/src/zenhttp/httpclient.cpp @@ -333,7 +333,7 @@ namespace detail { m_Buffer = (uint8_t*)Memory::Alloc(gsl::narrow<size_t>(m_BufferSize)); } m_BufferStart = Begin; - m_BufferEnd = Min(Begin + m_BufferSize, m_FileSize); + m_BufferEnd = Min(Begin + m_BufferSize, m_FileEnd); Read(m_Buffer, m_BufferEnd - m_BufferStart, m_BufferStart); uint64_t Count = Min(m_BufferEnd, End) - m_BufferStart; memcpy(WritePtr + Begin - m_FileOffset, m_Buffer, Count); diff --git a/src/zenhttp/httpclientauth.cpp b/src/zenhttp/httpclientauth.cpp index 916b25bff..39efe1d0c 100644 --- a/src/zenhttp/httpclientauth.cpp +++ b/src/zenhttp/httpclientauth.cpp @@ -7,6 +7,7 @@ #include <zencore/process.h> #include <zencore/scopeguard.h> #include <zencore/timer.h> +#include <zencore/uid.h> #include <zenhttp/auth/authmgr.h> #include <ctime> @@ -94,9 +95,8 @@ namespace zen { namespace httpclientauth { CreateProcOptions ProcOptions; - const std::string AuthTokenPath = ".zen-auth-oidctoken"; - - auto _ = MakeGuard([AuthTokenPath]() { RemoveFile(AuthTokenPath); }); + const std::filesystem::path AuthTokenPath(std::filesystem::temp_directory_path() / fmt::format(".zen-auth-{}", Oid::NewOid())); + auto _ = MakeGuard([AuthTokenPath]() { RemoveFile(AuthTokenPath); }); const std::string ProcArgs = fmt::format("{} --AuthConfigUrl {} --OutFile {} --Unattended={}", OidcExecutablePath, diff --git a/src/zenserver/cache/httpstructuredcache.cpp b/src/zenserver/cache/httpstructuredcache.cpp index f7e63433b..9f2e826d6 100644 --- a/src/zenserver/cache/httpstructuredcache.cpp +++ b/src/zenserver/cache/httpstructuredcache.cpp @@ -1588,7 +1588,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co Stopwatch Timer; auto _ = MakeGuard([&]() { ZEN_INFO("Replayed {} requests in {}", RequestCount, NiceLatencyNs(Timer.GetElapsedTimeUs() * 1000)); }); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); ZEN_INFO("Replaying {} requests", RequestCount); for (uint64_t RequestIndex = 0; RequestIndex < RequestCount; ++RequestIndex) { @@ -1638,8 +1639,8 @@ HttpStructuredCacheService::ReplayRequestRecorder(const CacheRequestContext& Co } }); } - Work.Wait(10000, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); + Work.Wait(10000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { + ZEN_UNUSED(IsAborted, IsPaused); ZEN_INFO("Replayed {} of {} requests, elapsed {}", RequestCount - PendingWork, RequestCount, diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp index e097147fc..055376b5c 100644 --- a/src/zenserver/config.cpp +++ b/src/zenserver/config.cpp @@ -443,6 +443,7 @@ ParseConfigFile(const std::filesystem::path& Path, LuaOptions.AddOption("server.logid"sv, ServerOptions.LogId, "log-id"sv); LuaOptions.AddOption("server.sentry.disable"sv, ServerOptions.NoSentry, "no-sentry"sv); LuaOptions.AddOption("server.sentry.allowpersonalinfo"sv, ServerOptions.SentryAllowPII, "sentry-allow-personal-info"sv); + LuaOptions.AddOption("server.sentry.dsn"sv, ServerOptions.SentryDsn, "sentry-dsn"sv); LuaOptions.AddOption("server.systemrootdir"sv, ServerOptions.SystemRootDir, "system-dir"sv); LuaOptions.AddOption("server.datadir"sv, ServerOptions.DataDir, "data-dir"sv); LuaOptions.AddOption("server.contentdir"sv, ServerOptions.ContentDir, "content-dir"sv); @@ -762,6 +763,7 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) options.add_options()("sentry-allow-personal-info", "Allow personally identifiable information in sentry crash reports", cxxopts::value<bool>(ServerOptions.SentryAllowPII)->default_value("false")); + options.add_options()("sentry-dsn", "Sentry DSN to send events to", cxxopts::value<std::string>(ServerOptions.SentryDsn)); options.add_options()("detach", "Indicate whether zenserver should detach from parent process group", cxxopts::value<bool>(ServerOptions.Detach)->default_value("true")); diff --git a/src/zenserver/config.h b/src/zenserver/config.h index 1d7d22ce9..1a1793b8d 100644 --- a/src/zenserver/config.h +++ b/src/zenserver/config.h @@ -169,31 +169,32 @@ struct ZenServerOptions ZenBuildStoreConfig BuildStoreConfig; ZenStatsConfig StatsConfig; ZenWorkspacesConfig WorksSpacesConfig; - std::filesystem::path SystemRootDir; // System root directory (used for machine level config) - std::filesystem::path DataDir; // Root directory for state (used for testing) - std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) - std::filesystem::path AbsLogFile; // Absolute path to main log file - std::filesystem::path ConfigFile; // Path to Lua config file - std::filesystem::path PluginsConfigFile; // Path to plugins config file - std::filesystem::path BaseSnapshotDir; // Path to server state snapshot (will be copied into data dir on start) - std::string ChildId; // Id assigned by parent process (used for lifetime management) - std::string LogId; // Id for tagging log output - std::string EncryptionKey; // 256 bit AES encryption key - std::string EncryptionIV; // 128 bit AES initialization vector - int BasePort = 8558; // Service listen port (used for both UDP and TCP) - int OwnerPid = 0; // Parent process id (zero for standalone) - bool InstallService = false; // Flag used to initiate service install (temporary) - bool UninstallService = false; // Flag used to initiate service uninstall (temporary) - bool IsDebug = false; - bool IsCleanStart = false; // Indicates whether all state should be wiped on startup or not - bool IsPowerCycle = false; // When true, the process shuts down immediately after initialization - bool IsTest = false; - bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements - bool ShouldCrash = false; // Option for testing crash handling - bool IsFirstRun = false; - bool NoSentry = false; - bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports - bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux) + std::filesystem::path SystemRootDir; // System root directory (used for machine level config) + std::filesystem::path DataDir; // Root directory for state (used for testing) + std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental) + std::filesystem::path AbsLogFile; // Absolute path to main log file + std::filesystem::path ConfigFile; // Path to Lua config file + std::filesystem::path PluginsConfigFile; // Path to plugins config file + std::filesystem::path BaseSnapshotDir; // Path to server state snapshot (will be copied into data dir on start) + std::string ChildId; // Id assigned by parent process (used for lifetime management) + std::string LogId; // Id for tagging log output + std::string EncryptionKey; // 256 bit AES encryption key + std::string EncryptionIV; // 128 bit AES initialization vector + int BasePort = 8558; // Service listen port (used for both UDP and TCP) + int OwnerPid = 0; // Parent process id (zero for standalone) + bool InstallService = false; // Flag used to initiate service install (temporary) + bool UninstallService = false; // Flag used to initiate service uninstall (temporary) + bool IsDebug = false; + bool IsCleanStart = false; // Indicates whether all state should be wiped on startup or not + bool IsPowerCycle = false; // When true, the process shuts down immediately after initialization + bool IsTest = false; + bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements + bool ShouldCrash = false; // Option for testing crash handling + bool IsFirstRun = false; + bool NoSentry = false; + bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports + std::string SentryDsn; + bool Detach = true; // Whether zenserver should detach from existing process group (Mac/Linux) bool ObjectStoreEnabled = false; bool NoConsoleOutput = false; // Control default use of stdout for diagnostics std::string Loggers[zen::logging::level::LogLevelCount]; diff --git a/src/zenserver/main.cpp b/src/zenserver/main.cpp index 0f647cd5c..868126533 100644 --- a/src/zenserver/main.cpp +++ b/src/zenserver/main.cpp @@ -101,7 +101,11 @@ ZenEntryPoint::Run() std::string SentryDatabasePath = (m_ServerOptions.DataDir / ".sentry-native").string(); std::string SentryAttachmentPath = m_ServerOptions.AbsLogFile.string(); - Sentry.Initialize(SentryDatabasePath, SentryAttachmentPath, m_ServerOptions.SentryAllowPII, m_ServerOptions.CommandLine); + Sentry.Initialize(SentryDatabasePath, + SentryAttachmentPath, + m_ServerOptions.SentryDsn, + m_ServerOptions.SentryAllowPII, + m_ServerOptions.CommandLine); } #endif try diff --git a/src/zenserver/projectstore/projectstore.cpp b/src/zenserver/projectstore/projectstore.cpp index 3ec4373a2..6359b9db9 100644 --- a/src/zenserver/projectstore/projectstore.cpp +++ b/src/zenserver/projectstore/projectstore.cpp @@ -1010,8 +1010,8 @@ struct ProjectStore::OplogStorage : public RefCounted .OpCoreHash = OpData.OpCoreHash, .OpKeyHash = OpData.KeyHash}; - m_Oplog.Append(Entry); m_OpBlobs.Write(OpData.Buffer.GetData(), WriteSize, WriteOffset); + m_Oplog.Append(Entry); return Entry; } @@ -1597,7 +1597,8 @@ ProjectStore::Oplog::Validate(std::atomic_bool& IsCancelledFlag, WorkerThreadPoo }; std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t OpIndex = 0; !IsCancelledFlag && OpIndex < Result.OpCount; OpIndex++) { if (OptionalWorkerPool) @@ -2111,7 +2112,8 @@ ProjectStore::Oplog::IterateChunks(std::span<Oid> ChunkIds, { std::atomic_bool Result = true; std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (size_t ChunkIndex = 0; ChunkIndex < FileChunkIndexes.size(); ChunkIndex++) { @@ -3890,7 +3892,8 @@ ProjectStore::Flush() WorkerThreadPool& WorkerPool = GetSmallWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (const Ref<Project>& Project : Projects) { diff --git a/src/zenstore/blockstore.cpp b/src/zenstore/blockstore.cpp index 5081ae65d..7b56c64bd 100644 --- a/src/zenstore/blockstore.cpp +++ b/src/zenstore/blockstore.cpp @@ -70,7 +70,7 @@ BlockStoreFile::Open() return false; } ZEN_WARN("Failed to open cas block '{}', reason: '{}', retries left: {}.", m_Path, Ec.message(), RetriesLeft); - Sleep(100 - (3 - RetriesLeft) * 100); // Total 600 ms + Sleep(100 + (3 - RetriesLeft) * 100); // Total 600 ms RetriesLeft--; return true; }); @@ -286,6 +286,14 @@ BlockStore::BlockStore() BlockStore::~BlockStore() { + try + { + Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~BlockStore() failed with: ", Ex.what()); + } } void @@ -307,6 +315,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max if (IsDir(m_BlocksBasePath)) { + std::vector<std::filesystem::path> EmptyBlockFiles; uint32_t NextBlockIndex = 0; std::vector<std::filesystem::path> FoldersToScan; FoldersToScan.push_back(m_BlocksBasePath); @@ -334,6 +343,12 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max { continue; } + if (Entry.file_size() == 0) + { + EmptyBlockFiles.push_back(Path); + continue; + } + Ref<BlockStoreFile> BlockFile{new BlockStoreFile(Path)}; BlockFile->Open(); m_TotalSize.fetch_add(BlockFile->TotalSize(), std::memory_order::relaxed); @@ -347,6 +362,17 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max } ++FolderOffset; } + + for (const std::filesystem::path& EmptyBlockFile : EmptyBlockFiles) + { + std::error_code Ec; + RemoveFile(EmptyBlockFile, Ec); + if (Ec) + { + ZEN_WARN("Unable to remove empty block file {}. Reason: {}", EmptyBlockFile, Ec.message()); + } + } + m_WriteBlockIndex.store(NextBlockIndex, std::memory_order_release); } else @@ -355,7 +381,7 @@ BlockStore::Initialize(const std::filesystem::path& BlocksBasePath, uint64_t Max } } -void +BlockStore::BlockIndexSet BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks) { ZEN_MEMSCOPE(GetBlocksTag()); @@ -363,8 +389,8 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks) RwLock::ExclusiveLockScope InsertLock(m_InsertLock); - tsl::robin_set<uint32_t> MissingBlocks; - tsl::robin_set<uint32_t> DeleteBlocks; + BlockIndexSet MissingBlocks; + BlockIndexSet DeleteBlocks; DeleteBlocks.reserve(m_ChunkBlocks.size()); for (auto It : m_ChunkBlocks) { @@ -383,13 +409,6 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks) MissingBlocks.insert(BlockIndex); } } - for (std::uint32_t BlockIndex : MissingBlocks) - { - std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex); - Ref<BlockStoreFile> NewBlockFile(new BlockStoreFile(BlockPath)); - NewBlockFile->Create(0); - m_ChunkBlocks[BlockIndex] = NewBlockFile; - } for (std::uint32_t BlockIndex : DeleteBlocks) { std::filesystem::path BlockPath = GetBlockPath(m_BlocksBasePath, BlockIndex); @@ -400,6 +419,7 @@ BlockStore::SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks) } m_ChunkBlocks.erase(BlockIndex); } + return MissingBlocks; } BlockStore::BlockEntryCountMap @@ -1037,6 +1057,7 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, { Continue = ChangeCallback(MovedChunks, ScrubbedChunks, RemovedSize > AddedSize ? RemovedSize - AddedSize : 0); DeletedSize += RemovedSize; + m_TotalSize.fetch_add(AddedSize); RemovedSize = 0; AddedSize = 0; MovedCount += MovedChunks.size(); @@ -1220,14 +1241,13 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, NiceBytes(Space.Free + ReclaimedSpace)); } NewBlockFile->Create(m_MaxBlockSize); - NewBlockIndex = NextBlockIndex; - WriteOffset = 0; - AddedSize += WriteOffset; + NewBlockIndex = NextBlockIndex; WriteOffset = 0; TargetFileBuffer = std::make_unique<BasicFileWriter>(NewBlockFile->GetBasicFile(), Min(256u * 1024u, m_MaxBlockSize)); } - WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment); + const uint64_t OldWriteOffset = WriteOffset; + WriteOffset = TargetFileBuffer->AlignTo(PayloadAlignment); TargetFileBuffer->Write(ChunkView.GetData(), ChunkLocation.Size, WriteOffset); MovedChunks.push_back( @@ -1235,8 +1255,9 @@ BlockStore::CompactBlocks(const BlockStoreCompactState& CompactState, WriteOffset += ChunkLocation.Size; MovedFromBlock += RoundUp(ChunkLocation.Offset + ChunkLocation.Size, PayloadAlignment) - ChunkLocation.Offset; + uint64_t WrittenBytes = WriteOffset - OldWriteOffset; + AddedSize += WrittenBytes; } - AddedSize += WriteOffset; ZEN_INFO("{}moved {} chunks ({}) from '{}' to new block, freeing {}", LogPrefix, KeepChunkIndexes.size(), diff --git a/src/zenstore/buildstore/buildstore.cpp b/src/zenstore/buildstore/buildstore.cpp index 41c747e08..c25f762f5 100644 --- a/src/zenstore/buildstore/buildstore.cpp +++ b/src/zenstore/buildstore/buildstore.cpp @@ -177,22 +177,60 @@ BuildStore::BuildStore(const BuildStoreConfig& Config, GcManager& Gc) m_Config.SmallBlobBlockStoreAlignement, IsNew); m_MetadataBlockStore.Initialize(Config.RootDirectory / "metadata", m_Config.MetadataBlockStoreMaxBlockSize, 1u << 20); + + BlockStore::BlockIndexSet KnownBlocks; + for (const BlobEntry& Blob : m_BlobEntries) { - BlockStore::BlockIndexSet KnownBlocks; - for (const BlobEntry& Blob : m_BlobEntries) + if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) { - if (const MetadataIndex MetaIndex = Blob.Metadata; MetaIndex) - { - const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; - KnownBlocks.insert(Metadata.Location.BlockIndex); - } + const MetadataEntry& Metadata = m_MetadataEntries[MetaIndex]; + KnownBlocks.insert(Metadata.Location.BlockIndex); } - m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); } + BlockStore::BlockIndexSet MissingBlocks = m_MetadataBlockStore.SyncExistingBlocksOnDisk(KnownBlocks); m_PayloadlogFile.Open(BlobLogPath, CasLogFile::Mode::kWrite); m_MetadatalogFile.Open(MetaLogPath, CasLogFile::Mode::kWrite); + if (!MissingBlocks.empty()) + { + std::vector<MetadataDiskEntry> MissingMetadatas; + for (auto& It : m_BlobLookup) + { + const IoHash& BlobHash = It.first; + const BlobIndex ReadBlobIndex = It.second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + if (ReadBlobEntry.Metadata) + { + const MetadataEntry& MetaData = m_MetadataEntries[ReadBlobEntry.Metadata]; + if (MissingBlocks.contains(MetaData.Location.BlockIndex)) + { + MissingMetadatas.push_back( + MetadataDiskEntry{.Entry = m_MetadataEntries[ReadBlobEntry.Metadata], .BlobHash = BlobHash}); + MissingMetadatas.back().Entry.Flags |= MetadataEntry::kTombStone; + m_MetadataEntries[ReadBlobEntry.Metadata] = {}; + m_BlobEntries[ReadBlobIndex].Metadata = {}; + } + } + } + ZEN_ASSERT(!MissingMetadatas.empty()); + + for (const MetadataDiskEntry& Entry : MissingMetadatas) + { + auto It = m_BlobLookup.find(Entry.BlobHash); + ZEN_ASSERT(It != m_BlobLookup.end()); + + const BlobIndex ReadBlobIndex = It->second; + const BlobEntry& ReadBlobEntry = m_BlobEntries[ReadBlobIndex]; + if (!ReadBlobEntry.Payload) + { + m_BlobLookup.erase(It); + } + } + m_MetadatalogFile.Append(MissingMetadatas); + CompactState(); + } + m_Gc.AddGcReferencer(*this); m_Gc.AddGcReferenceLocker(*this); m_Gc.AddGcStorage(this); @@ -256,34 +294,36 @@ BuildStore::PutBlob(const IoHash& BlobHash, const IoBuffer& Payload) ZEN_UNUSED(Result); Entry = PayloadEntry(0, PayloadSize); } - m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); - RwLock::ExclusiveLockScope _(m_Lock); - if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { - const BlobIndex ExistingBlobIndex = It->second; - BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; - if (Blob.Payload) + RwLock::ExclusiveLockScope _(m_Lock); + if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { - m_PayloadEntries[Blob.Payload] = Entry; + const BlobIndex ExistingBlobIndex = It->second; + BlobEntry& Blob = m_BlobEntries[ExistingBlobIndex]; + if (Blob.Payload) + { + m_PayloadEntries[Blob.Payload] = Entry; + } + else + { + Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + m_PayloadEntries.push_back(Entry); + } + Blob.LastAccessTime = GcClock::TickCount(); } else { - Blob.Payload = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); + PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); m_PayloadEntries.push_back(Entry); - } - Blob.LastAccessTime = GcClock::TickCount(); - } - else - { - PayloadIndex NewPayloadIndex = PayloadIndex(gsl::narrow<uint32_t>(m_PayloadEntries.size())); - m_PayloadEntries.push_back(Entry); - const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); - // we only remove during GC and compact this then... - m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); - m_BlobLookup.insert({BlobHash, NewBlobIndex}); + const BlobIndex NewBlobIndex(gsl::narrow<uint32_t>(m_BlobEntries.size())); + // we only remove during GC and compact this then... + m_BlobEntries.push_back(BlobEntry{.Payload = NewPayloadIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); + m_BlobLookup.insert({BlobHash, NewBlobIndex}); + } } + m_PayloadlogFile.Append(PayloadDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); m_LastAccessTimeUpdateCount++; } @@ -370,7 +410,6 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB const BlockStoreLocation& Location = Locations[LocationIndex]; MetadataEntry Entry = {.Location = Location, .ContentType = Data.GetContentType(), .Flags = 0}; - m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); if (auto It = m_BlobLookup.find(BlobHash); It != m_BlobLookup.end()) { @@ -396,6 +435,9 @@ BuildStore::PutMetadatas(std::span<const IoHash> BlobHashes, std::span<const IoB m_BlobEntries.push_back(BlobEntry{.Metadata = NewMetadataIndex, .LastAccessTime = AccessTime(GcClock::TickCount())}); m_BlobLookup.insert({BlobHash, NewBlobIndex}); } + + m_MetadatalogFile.Append(MetadataDiskEntry{.Entry = Entry, .BlobHash = BlobHash}); + m_LastAccessTimeUpdateCount++; WriteBlobIndex++; if (m_TrackedCacheKeys) @@ -483,7 +525,8 @@ BuildStore::GetMetadatas(std::span<const IoHash> BlobHashes, WorkerThreadPool* O if (!MetaLocations.empty()) { std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); m_MetadataBlockStore.IterateChunks( MetaLocations, diff --git a/src/zenstore/cache/cachedisklayer.cpp b/src/zenstore/cache/cachedisklayer.cpp index e973cee77..0ee70890c 100644 --- a/src/zenstore/cache/cachedisklayer.cpp +++ b/src/zenstore/cache/cachedisklayer.cpp @@ -751,6 +751,16 @@ ZenCacheDiskLayer::CacheBucket::CacheBucket(GcManager& Gc, ZenCacheDiskLayer::CacheBucket::~CacheBucket() { + try + { + m_SlogFile.Flush(); + m_SlogFile.Close(); + m_BlockStore.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CacheBucket() failed with: ", Ex.what()); + } m_Gc.RemoveGcReferencer(*this); } @@ -824,11 +834,13 @@ ZenCacheDiskLayer::CacheBucket::OpenOrCreate(std::filesystem::path BucketDir, bo } void -ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc) +ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(uint64_t LogPosition, + bool ResetLog, + const std::function<uint64_t()>& ClaimDiskReserveFunc) { ZEN_TRACE_CPU("Z$::Bucket::WriteIndexSnapshot"); - if (m_LogFlushPosition == m_SlogFile.GetLogCount()) + if (m_LogFlushPosition == LogPosition) { return; } @@ -877,7 +889,7 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st throw std::system_error(Ec, fmt::format("failed to create new snapshot file in '{}'", m_BucketDir)); } - const uint64_t IndexLogPosition = ResetLog ? 0 : m_SlogFile.GetLogCount(); + const uint64_t IndexLogPosition = ResetLog ? 0 : LogPosition; cache::impl::CacheBucketIndexHeader Header = {.EntryCount = EntryCount, .LogPosition = IndexLogPosition, @@ -930,12 +942,14 @@ ZenCacheDiskLayer::CacheBucket::WriteIndexSnapshotLocked(bool ResetLog, const st if (IsFile(LogPath)) { + m_SlogFile.Close(); if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in // the end it will be the same result ZEN_WARN("snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); } } m_LogFlushPosition = IndexLogPosition; @@ -1149,13 +1163,6 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco } } - if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0) - { - WriteIndexSnapshot(IndexLock, /*Flush log*/ true); - } - - m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); - BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_Index) { @@ -1173,7 +1180,53 @@ ZenCacheDiskLayer::CacheBucket::InitializeIndexFromDisk(RwLock::ExclusiveLockSco KnownBlocks.insert(BlockIndex); } } - m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + m_SlogFile.Open(LogPath, CasLogFile::Mode::kWrite); + + bool RemovedEntries = false; + if (!MissingBlocks.empty()) + { + std::vector<DiskIndexEntry> MissingEntries; + + for (auto& It : m_Index) + { + BucketPayload& Payload = m_Payloads[It.second]; + DiskLocation Location = Payload.Location; + if (!Location.IsFlagSet(DiskLocation::kStandaloneFile)) + { + if (MissingBlocks.contains(Location.Location.BlockLocation.GetBlockIndex())) + { + RemoveMemCachedData(IndexLock, Payload); + RemoveMetaData(IndexLock, Payload); + } + } + Location.Flags |= DiskLocation::kTombStone; + MissingEntries.push_back(DiskIndexEntry{.Key = It.first, .Location = Location}); + } + + ZEN_ASSERT(!MissingEntries.empty()); + + for (const DiskIndexEntry& Entry : MissingEntries) + { + m_Index.erase(Entry.Key); + } + m_SlogFile.Append(MissingEntries); + m_SlogFile.Flush(); + { + std::vector<BucketPayload> Payloads; + std::vector<AccessTime> AccessTimes; + std::vector<BucketMetaData> MetaDatas; + std::vector<MemCacheData> MemCachedPayloads; + IndexMap Index; + CompactState(IndexLock, Payloads, AccessTimes, MetaDatas, MemCachedPayloads, Index); + } + RemovedEntries = true; + } + + if (IsNew || LogEntryCount > 0 || m_LogFlushPosition != 0 || RemovedEntries) + { + WriteIndexSnapshot(IndexLock, m_SlogFile.GetLogCount(), /*Flush log*/ true); + } } void @@ -2024,6 +2077,9 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl ZEN_TRACE_CPU("Z$::Bucket::SaveSnapshot"); try { + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t LogPosition = m_SlogFile.GetLogCount(); + bool UseLegacyScheme = false; IoBuffer Buffer; @@ -2038,7 +2094,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock, /*Flush log*/ false); + WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false); // Note: this copy could be eliminated on shutdown to // reduce memory usage and execution time Index = m_Index; @@ -2078,7 +2134,7 @@ ZenCacheDiskLayer::CacheBucket::SaveSnapshot(const std::function<uint64_t()>& Cl else { RwLock::SharedLockScope IndexLock(m_IndexLock); - WriteIndexSnapshot(IndexLock, /*Flush log*/ false); + WriteIndexSnapshot(IndexLock, LogPosition, /*Flush log*/ false); const uint64_t EntryCount = m_Index.size(); Buffer = ManifestWriter.MakeSidecarManifest(m_BucketId, EntryCount); uint64_t SidecarSize = ManifestWriter.GetSidecarSize(); @@ -2727,7 +2783,6 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, ZEN_MEMSCOPE(GetCacheDiskTag()); ZEN_TRACE_CPU("Z$::Bucket::UpdateLocation"); DiskLocation Location(BlockStoreLocation, m_Configuration.PayloadAlignment, EntryFlags); - m_SlogFile.Append({.Key = HashKey, .Location = Location}); RwLock::ExclusiveLockScope IndexLock(m_IndexLock); if (m_TrackedCacheKeys) @@ -2757,6 +2812,7 @@ ZenCacheDiskLayer::CacheBucket::PutInlineCacheValue(const IoHash& HashKey, m_AccessTimes.emplace_back(GcClock::TickCount()); m_Index.insert_or_assign(HashKey, EntryIndex); } + m_SlogFile.Append({.Key = HashKey, .Location = Location}); }); } @@ -3978,7 +4034,8 @@ ZenCacheDiskLayer::DiscoverBuckets() WorkerThreadPool& Pool = GetLargeWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (auto& BucketPath : FoundBucketDirectories) { Work.ScheduleWork(Pool, [this, &SyncLock, BucketPath](std::atomic<bool>&) { @@ -4141,7 +4198,8 @@ ZenCacheDiskLayer::Flush() { WorkerThreadPool& Pool = GetMediumWorkerPool(EWorkloadType::Burst); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); try { for (auto& Bucket : Buckets) @@ -4164,8 +4222,10 @@ ZenCacheDiskLayer::Flush() { ZEN_ERROR("Failed to flush buckets at '{}'. Reason: '{}'", m_RootDir, Ex.what()); } - Work.Wait(1000, - [&](std::ptrdiff_t Remaining, bool) { ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", Remaining, m_RootDir); }); + Work.Wait(1000, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t RemainingWork) { + ZEN_UNUSED(IsAborted, IsPaused); + ZEN_DEBUG("Waiting for {} buckets at '{}' to flush", RemainingWork, m_RootDir); + }); } } diff --git a/src/zenstore/compactcas.cpp b/src/zenstore/compactcas.cpp index 75562176e..2ab5752ff 100644 --- a/src/zenstore/compactcas.cpp +++ b/src/zenstore/compactcas.cpp @@ -145,6 +145,16 @@ CasContainerStrategy::CasContainerStrategy(GcManager& Gc) : m_Log(logging::Get(" CasContainerStrategy::~CasContainerStrategy() { + try + { + m_BlockStore.Close(); + m_CasLog.Flush(); + m_CasLog.Close(); + } + catch (const std::exception& Ex) + { + ZEN_ERROR("~CasContainerStrategy failed with: ", Ex.what()); + } m_Gc.RemoveGcReferenceStore(*this); m_Gc.RemoveGcStorage(this); } @@ -204,12 +214,12 @@ CasContainerStrategy::InsertChunk(const void* ChunkData, size_t ChunkSize, const ZEN_TRACE_CPU("CasContainer::UpdateLocation"); BlockStoreDiskLocation DiskLocation(Location, m_PayloadAlignment); const CasDiskIndexEntry IndexEntry{.Key = ChunkHash, .Location = DiskLocation}; - m_CasLog.Append(IndexEntry); { RwLock::ExclusiveLockScope _(m_LocationMapLock); m_LocationMap.emplace(ChunkHash, m_Locations.size()); m_Locations.push_back(DiskLocation); } + m_CasLog.Append(IndexEntry); }); return CasStore::InsertResult{.New = true}; @@ -273,7 +283,6 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c IndexEntries.emplace_back( CasDiskIndexEntry{.Key = ChunkHashes[ChunkIndex], .Location = BlockStoreDiskLocation(Location, m_PayloadAlignment)}); } - m_CasLog.Append(IndexEntries); { RwLock::ExclusiveLockScope _(m_LocationMapLock); for (const CasDiskIndexEntry& DiskIndexEntry : IndexEntries) @@ -282,6 +291,7 @@ CasContainerStrategy::InsertChunks(std::span<const IoBuffer> Chunks, std::span<c m_Locations.push_back(DiskIndexEntry.Location); } } + m_CasLog.Append(IndexEntries); }); return Result; } @@ -402,7 +412,8 @@ CasContainerStrategy::IterateChunks(std::span<const IoHash> ChunkHas std::atomic<bool> AsyncContinue = true; { std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); const bool Continue = m_BlockStore.IterateChunks( FoundChunkLocations, [this, @@ -745,19 +756,27 @@ public: MovedEntries.push_back(CasDiskIndexEntry{.Key = Key, .Location = Location}); } } - for (size_t ScrubbedIndex : ScrubbedArray) + for (size_t ChunkIndex : ScrubbedArray) { - const IoHash& Key = BlockCompactStateKeys[ScrubbedIndex]; + const IoHash& Key = BlockCompactStateKeys[ChunkIndex]; if (auto It = m_CasContainerStrategy.m_LocationMap.find(Key); It != m_CasContainerStrategy.m_LocationMap.end()) { - BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second]; + BlockStoreDiskLocation& Location = m_CasContainerStrategy.m_Locations[It->second]; + const BlockStoreLocation& OldLocation = BlockCompactState.GetLocation(ChunkIndex); + if (Location.Get(m_CasContainerStrategy.m_PayloadAlignment) != OldLocation) + { + // Someone has moved our chunk so lets just skip the new location we were provided, it will be + // GC:d at a later time + continue; + } MovedEntries.push_back( CasDiskIndexEntry{.Key = Key, .Location = Location, .Flags = CasDiskIndexEntry::kTombstone}); m_CasContainerStrategy.m_LocationMap.erase(It); } } m_CasContainerStrategy.m_CasLog.Append(MovedEntries); + m_CasContainerStrategy.m_CasLog.Flush(); Stats.RemovedDisk += FreedDiskSpace; if (Ctx.IsCancelledFlag.load()) { @@ -983,14 +1002,11 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog) { // Write the current state of the location map to a new index state std::vector<CasDiskIndexEntry> Entries; - uint64_t IndexLogPosition = 0; + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount(); { RwLock::SharedLockScope ___(m_LocationMapLock); - if (!ResetLog) - { - IndexLogPosition = m_CasLog.GetLogCount(); - } Entries.resize(m_LocationMap.size()); uint64_t EntryIndex = 0; @@ -1035,12 +1051,14 @@ CasContainerStrategy::MakeIndexSnapshot(bool ResetLog) if (IsFile(LogPath)) { + m_CasLog.Close(); if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in // the end it will be the same result ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); } } m_LogFlushPosition = IndexLogPosition; @@ -1153,7 +1171,7 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski ZEN_WARN("reading full log at '{}', reason: Log position from index snapshot is out of range", LogPath); SkipEntryCount = 0; } - LogEntryCount = EntryCount - SkipEntryCount; + LogEntryCount = SkipEntryCount; CasLog.Replay( [&](const CasDiskIndexEntry& Record) { LogEntryCount++; @@ -1172,7 +1190,6 @@ CasContainerStrategy::ReadLog(const std::filesystem::path& LogPath, uint64_t Ski m_Locations.push_back(Record.Location); }, SkipEntryCount); - return LogEntryCount; } return 0; @@ -1228,8 +1245,6 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) } } - m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); - BlockStore::BlockIndexSet KnownBlocks; for (const auto& Entry : m_LocationMap) @@ -1239,9 +1254,39 @@ CasContainerStrategy::OpenContainer(bool IsNewStore) KnownBlocks.insert(BlockIndex); } - m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + BlockStore::BlockIndexSet MissingBlocks = m_BlockStore.SyncExistingBlocksOnDisk(KnownBlocks); + + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); + + bool RemovedEntries = false; + if (!MissingBlocks.empty()) + { + std::vector<CasDiskIndexEntry> MissingEntries; + for (auto& It : m_LocationMap) + { + const uint32_t BlockIndex = m_Locations[It.second].GetBlockIndex(); + if (MissingBlocks.contains(BlockIndex)) + { + MissingEntries.push_back({.Key = It.first, .Location = m_Locations[It.second], .Flags = CasDiskIndexEntry::kTombstone}); + } + } + ZEN_ASSERT(!MissingEntries.empty()); + + for (const CasDiskIndexEntry& Entry : MissingEntries) + { + m_LocationMap.erase(Entry.Key); + } + m_CasLog.Append(MissingEntries); + m_CasLog.Flush(); + + { + RwLock::ExclusiveLockScope IndexLock(m_LocationMapLock); + CompactIndex(IndexLock); + } + RemovedEntries = true; + } - if (IsNewStore || (LogEntryCount > 0)) + if (IsNewStore || (LogEntryCount > 0) || RemovedEntries) { MakeIndexSnapshot(/*ResetLog*/ true); } @@ -1611,6 +1656,236 @@ TEST_CASE("compactcas.threadedinsert") } } +TEST_CASE("compactcas.restart") +{ + uint64_t ExpectedSize = 0; + + auto GenerateChunks = [&](CasContainerStrategy& Cas, size_t ChunkCount, uint64_t ChunkSize, std::vector<IoHash>& Hashes) { + WorkerThreadPool ThreadPool(Max(std::thread::hardware_concurrency() - 1u, 2u), "put"); + + Latch WorkLatch(1); + tsl::robin_set<IoHash, IoHash::Hasher> ChunkHashesLookup; + ChunkHashesLookup.reserve(ChunkCount); + RwLock InsertLock; + for (size_t Offset = 0; Offset < ChunkCount;) + { + size_t BatchCount = Min<size_t>(ChunkCount - Offset, 512u); + WorkLatch.AddCount(1); + ThreadPool.ScheduleWork( + [&WorkLatch, &InsertLock, &ChunkHashesLookup, &ExpectedSize, &Hashes, &Cas, Offset, BatchCount, ChunkSize]() { + auto _ = MakeGuard([&WorkLatch]() { WorkLatch.CountDown(); }); + + std::vector<IoBuffer> BatchBlobs; + std::vector<IoHash> BatchHashes; + BatchBlobs.reserve(BatchCount); + BatchHashes.reserve(BatchCount); + + while (BatchBlobs.size() < BatchCount) + { + IoBuffer Chunk = + CreateSemiRandomBlob(ChunkSize + ((BatchHashes.size() % 100) + (BatchHashes.size() % 7) * 315u + Offset % 377)); + IoHash Hash = IoHash::HashBuffer(Chunk); + { + RwLock::ExclusiveLockScope __(InsertLock); + if (ChunkHashesLookup.contains(Hash)) + { + continue; + } + ChunkHashesLookup.insert(Hash); + ExpectedSize += Chunk.Size(); + } + + BatchBlobs.emplace_back(CompressedBuffer::Compress(SharedBuffer(Chunk)).GetCompressed().Flatten().AsIoBuffer()); + BatchHashes.push_back(Hash); + } + + Cas.InsertChunks(BatchBlobs, BatchHashes); + { + RwLock::ExclusiveLockScope __(InsertLock); + Hashes.insert(Hashes.end(), BatchHashes.begin(), BatchHashes.end()); + } + }); + Offset += BatchCount; + } + WorkLatch.CountDown(); + WorkLatch.Wait(); + }; + + ScopedTemporaryDirectory TempDir; + std::filesystem::path CasPath = TempDir.Path(); + CreateDirectories(CasPath); + + bool Generate = false; + if (!Generate) + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + } + + const uint64_t kChunkSize = 1048 + 395; + const size_t kChunkCount = 7167; + + std::vector<IoHash> Hashes; + Hashes.reserve(kChunkCount); + + auto ValidateChunks = [&](CasContainerStrategy& Cas, std::span<const IoHash> Hashes, bool ShouldExist) { + for (const IoHash& Hash : Hashes) + { + if (ShouldExist) + { + CHECK(Cas.HaveChunk(Hash)); + IoBuffer Buffer = Cas.FindChunk(Hash); + CHECK(Buffer); + IoHash ValidateHash; + uint64_t ValidateRawSize; + CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), ValidateHash, ValidateRawSize); + CHECK(Compressed); + CHECK(ValidateHash == Hash); + } + else + { + CHECK(!Cas.HaveChunk(Hash)); + IoBuffer Buffer = Cas.FindChunk(Hash); + CHECK(!Buffer); + } + } + }; + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, true); + GenerateChunks(Cas, kChunkCount, kChunkSize, Hashes); + ValidateChunks(Cas, Hashes, true); + Cas.Flush(); + ValidateChunks(Cas, Hashes, true); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + GenerateChunks(Cas, kChunkCount, kChunkSize / 4, Hashes); + ValidateChunks(Cas, Hashes, true); + } + + class GcRefChecker : public GcReferenceChecker + { + public: + explicit GcRefChecker(std::vector<IoHash>&& HashesToKeep) : m_HashesToKeep(std::move(HashesToKeep)) {} + ~GcRefChecker() {} + std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return "test"; + } + void PreCache(GcCtx& Ctx) override { FilterReferences(Ctx, "test", m_HashesToKeep); } + void UpdateLockedState(GcCtx& Ctx) override { ZEN_UNUSED(Ctx); } + std::span<IoHash> GetUnusedReferences(GcCtx& Ctx, std::span<IoHash> IoCids) override + { + ZEN_UNUSED(Ctx); + return KeepUnusedReferences(m_HashesToKeep, IoCids); + } + + private: + std::vector<IoHash> m_HashesToKeep; + }; + + class GcRef : public GcReferencer + { + public: + GcRef(GcManager& Gc, std::span<const IoHash> HashesToKeep) : m_Gc(Gc) + { + m_HashesToKeep.insert(m_HashesToKeep.begin(), HashesToKeep.begin(), HashesToKeep.end()); + m_Gc.AddGcReferencer(*this); + } + ~GcRef() { m_Gc.RemoveGcReferencer(*this); } + std::string GetGcName(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return "test"; + } + GcStoreCompactor* RemoveExpiredData(GcCtx& Ctx, GcStats& Stats) override + { + ZEN_UNUSED(Ctx, Stats); + return nullptr; + } + std::vector<GcReferenceChecker*> CreateReferenceCheckers(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return {new GcRefChecker(std::move(m_HashesToKeep))}; + } + std::vector<GcReferenceValidator*> CreateReferenceValidators(GcCtx& Ctx) override + { + ZEN_UNUSED(Ctx); + return {}; + } + + private: + GcManager& m_Gc; + std::vector<IoHash> m_HashesToKeep; + }; + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + GenerateChunks(Cas, kChunkCount, kChunkSize / 5, Hashes); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + GenerateChunks(Cas, kChunkCount, kChunkSize / 2, Hashes); + ValidateChunks(Cas, Hashes, true); + if (true) + { + std::vector<IoHash> DropHashes; + std::vector<IoHash> KeepHashes; + for (size_t Index = 0; Index < Hashes.size(); Index++) + { + if (Index % 5 == 0) + { + KeepHashes.push_back(Hashes[Index]); + } + else + { + DropHashes.push_back(Hashes[Index]); + } + } + // std::span<const IoHash> KeepHashes(Hashes); + // ZEN_ASSERT(ExpectedGcCount < Hashes.size()); + // KeepHashes = KeepHashes.subspan(ExpectedGcCount); + GcRef Ref(Gc, KeepHashes); + Gc.CollectGarbage(GcSettings{.CollectSmallObjects = true, .IsDeleteMode = true}); + ValidateChunks(Cas, KeepHashes, true); + ValidateChunks(Cas, DropHashes, false); + Hashes = KeepHashes; + } + GenerateChunks(Cas, kChunkCount, kChunkSize / 3, Hashes); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + Cas.Flush(); + ValidateChunks(Cas, Hashes, true); + } + + { + GcManager Gc; + CasContainerStrategy Cas(Gc); + Cas.Initialize(CasPath, "test", 65536 * 128, 8, false); + ValidateChunks(Cas, Hashes, true); + } +} + TEST_CASE("compactcas.iteratechunks") { std::atomic<size_t> WorkCompleted = 0; diff --git a/src/zenstore/filecas.cpp b/src/zenstore/filecas.cpp index 56979f267..11a266f1c 100644 --- a/src/zenstore/filecas.cpp +++ b/src/zenstore/filecas.cpp @@ -664,7 +664,8 @@ FileCasStrategy::IterateChunks(std::span<IoHash> ChunkHashes, }; std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (size_t Index = 0; Index < FoundChunkIndexes.size(); Index++) { if (!AsyncContinue) @@ -968,14 +969,11 @@ FileCasStrategy::MakeIndexSnapshot(bool ResetLog) { // Write the current state of the location map to a new index state std::vector<FileCasIndexEntry> Entries; - uint64_t IndexLogPosition = 0; + // Be defensive regarding log position as it is written to without acquiring m_LocationMapLock + const uint64_t IndexLogPosition = ResetLog ? 0 : m_CasLog.GetLogCount(); { RwLock::SharedLockScope __(m_Lock); - if (!ResetLog) - { - IndexLogPosition = m_CasLog.GetLogCount(); - } Entries.resize(m_Index.size()); uint64_t EntryIndex = 0; @@ -1018,12 +1016,14 @@ FileCasStrategy::MakeIndexSnapshot(bool ResetLog) if (IsFile(LogPath)) { + m_CasLog.Close(); if (!RemoveFile(LogPath, Ec) || Ec) { // This is non-critical, it only means that we will replay the events of the log over the snapshot - inefficent but in // the end it will be the same result ZEN_WARN("Snapshot failed to clean log file '{}', reason: '{}'", LogPath, IndexPath, Ec.message()); } + m_CasLog.Open(LogPath, CasLogFile::Mode::kWrite); } } m_LogFlushPosition = IndexLogPosition; diff --git a/src/zenstore/include/zenstore/blockstore.h b/src/zenstore/include/zenstore/blockstore.h index 8cbcad11b..fce05766f 100644 --- a/src/zenstore/include/zenstore/blockstore.h +++ b/src/zenstore/include/zenstore/blockstore.h @@ -147,10 +147,10 @@ public: typedef tsl::robin_set<uint32_t> BlockIndexSet; - // Ask the store to create empty blocks for all locations that does not have a block // Remove any block that is not referenced - void SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks); - BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent); + // Return a list of blocks that are not present + [[nodiscard]] BlockIndexSet SyncExistingBlocksOnDisk(const BlockIndexSet& KnownBlocks); + BlockEntryCountMap GetBlocksToCompact(const BlockUsageMap& BlockUsage, uint32_t BlockUsageThresholdPercent); void Close(); diff --git a/src/zenstore/include/zenstore/cache/cachedisklayer.h b/src/zenstore/include/zenstore/cache/cachedisklayer.h index b67a043df..3cd2d6423 100644 --- a/src/zenstore/include/zenstore/cache/cachedisklayer.h +++ b/src/zenstore/include/zenstore/cache/cachedisklayer.h @@ -409,19 +409,22 @@ public: void SaveSnapshot(const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); void WriteIndexSnapshot( RwLock::ExclusiveLockScope&, + uint64_t LogPosition, bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }) { - WriteIndexSnapshotLocked(ResetLog, ClaimDiskReserveFunc); + WriteIndexSnapshotLocked(LogPosition, ResetLog, ClaimDiskReserveFunc); } void WriteIndexSnapshot( RwLock::SharedLockScope&, + uint64_t LogPosition, bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }) { - WriteIndexSnapshotLocked(ResetLog, ClaimDiskReserveFunc); + WriteIndexSnapshotLocked(LogPosition, ResetLog, ClaimDiskReserveFunc); } void WriteIndexSnapshotLocked( + uint64_t LogPosition, bool ResetLog, const std::function<uint64_t()>& ClaimDiskReserveFunc = []() { return 0; }); diff --git a/src/zenutil/chunkedcontent.cpp b/src/zenutil/chunkedcontent.cpp index c7532e098..cd1bf7dd7 100644 --- a/src/zenutil/chunkedcontent.cpp +++ b/src/zenutil/chunkedcontent.cpp @@ -758,14 +758,15 @@ DeletePathsFromChunkedContent(const ChunkedFolderContent& BaseContent, std::span } ChunkedFolderContent -ChunkFolderContent(ChunkingStatistics& Stats, - WorkerThreadPool& WorkerPool, - const std::filesystem::path& RootPath, - const FolderContent& Content, - const ChunkingController& InChunkingController, - int32_t UpdateIntervalMS, - std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, - std::atomic<bool>& AbortFlag) +ChunkFolderContent(ChunkingStatistics& Stats, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& RootPath, + const FolderContent& Content, + const ChunkingController& InChunkingController, + int32_t UpdateIntervalMS, + std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag) { ZEN_TRACE_CPU("ChunkFolderContent"); @@ -804,7 +805,7 @@ ChunkFolderContent(ChunkingStatistics& Stats, RwLock Lock; - ParallelWork Work(AbortFlag); + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t PathIndex : Order) { @@ -831,10 +832,9 @@ ChunkFolderContent(ChunkingStatistics& Stats, }); } - Work.Wait(UpdateIntervalMS, [&](bool IsAborted, std::ptrdiff_t PendingWork) { - ZEN_UNUSED(IsAborted); + Work.Wait(UpdateIntervalMS, [&](bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork) { ZEN_UNUSED(PendingWork); - UpdateCallback(Work.IsAborted(), Work.PendingWork().Remaining()); + UpdateCallback(IsAborted, IsPaused, Work.PendingWork().Remaining()); }); } return Result; diff --git a/src/zenutil/commandlineoptions.cpp b/src/zenutil/commandlineoptions.cpp index 0dffa42f0..afef7f6f2 100644 --- a/src/zenutil/commandlineoptions.cpp +++ b/src/zenutil/commandlineoptions.cpp @@ -157,12 +157,7 @@ MakeSafeAbsolutePath(const std::filesystem::path& Path) std::filesystem::path StringToPath(const std::string_view& Path) { - std::string_view UnquotedPath = Path; - - if (UnquotedPath.length() > 2 && UnquotedPath.front() == '\"' && UnquotedPath.back() == '\"') - { - UnquotedPath = UnquotedPath.substr(1, UnquotedPath.length() - 2); - } + std::string_view UnquotedPath = RemoveQuotes(Path); if (UnquotedPath.ends_with('/') || UnquotedPath.ends_with('\\') || UnquotedPath.ends_with(std::filesystem::path::preferred_separator)) { @@ -172,6 +167,19 @@ StringToPath(const std::string_view& Path) return std::filesystem::path(UnquotedPath).make_preferred(); } +std::string_view +RemoveQuotes(const std::string_view& Arg) +{ + if (Arg.length() > 2) + { + if (Arg[0] == '"' && Arg[Arg.length() - 1] == '"') + { + return Arg.substr(1, Arg.length() - 2); + } + } + return Arg; +} + #if ZEN_WITH_TESTS void diff --git a/src/zenutil/include/zenutil/chunkedcontent.h b/src/zenutil/include/zenutil/chunkedcontent.h index 225b1a3a5..306a5d990 100644 --- a/src/zenutil/include/zenutil/chunkedcontent.h +++ b/src/zenutil/include/zenutil/chunkedcontent.h @@ -132,14 +132,15 @@ struct ChunkingStatistics uint64_t ElapsedWallTimeUS = 0; }; -ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, - WorkerThreadPool& WorkerPool, - const std::filesystem::path& RootPath, - const FolderContent& Content, - const ChunkingController& InChunkingController, - int32_t UpdateIntervalMS, - std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)>&& UpdateCallback, - std::atomic<bool>& AbortFlag); +ChunkedFolderContent ChunkFolderContent(ChunkingStatistics& Stats, + WorkerThreadPool& WorkerPool, + const std::filesystem::path& RootPath, + const FolderContent& Content, + const ChunkingController& InChunkingController, + int32_t UpdateIntervalMS, + std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)>&& UpdateCallback, + std::atomic<bool>& AbortFlag, + std::atomic<bool>& PauseFlag); ChunkedContentLookup BuildChunkedContentLookup(const ChunkedFolderContent& Content); diff --git a/src/zenutil/include/zenutil/commandlineoptions.h b/src/zenutil/include/zenutil/commandlineoptions.h index b7581f6cd..f927d41e5 100644 --- a/src/zenutil/include/zenutil/commandlineoptions.h +++ b/src/zenutil/include/zenutil/commandlineoptions.h @@ -22,6 +22,7 @@ std::vector<char*> StripCommandlineQuotes(std::vector<std::string>& InOutArg void MakeSafeAbsolutePathÍnPlace(std::filesystem::path& Path); [[nodiscard]] std::filesystem::path MakeSafeAbsolutePath(const std::filesystem::path& Path); std::filesystem::path StringToPath(const std::string_view& Path); +std::string_view RemoveQuotes(const std::string_view& Arg); void commandlineoptions_forcelink(); // internal diff --git a/src/zenutil/include/zenutil/parallelwork.h b/src/zenutil/include/zenutil/parallelwork.h index 08e730b28..d7e986551 100644 --- a/src/zenutil/include/zenutil/parallelwork.h +++ b/src/zenutil/include/zenutil/parallelwork.h @@ -12,13 +12,13 @@ namespace zen { class ParallelWork { public: - ParallelWork(std::atomic<bool>& AbortFlag); + ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag); ~ParallelWork(); - typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; - typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; - typedef std::function<void(bool IsAborted, std::ptrdiff_t PendingWork)> UpdateCallback; + typedef std::function<void(std::atomic<bool>& AbortFlag)> WorkCallback; + typedef std::function<void(std::exception_ptr Ex, std::atomic<bool>& AbortFlag)> ExceptionCallback; + typedef std::function<void(bool IsAborted, bool IsPaused, std::ptrdiff_t PendingWork)> UpdateCallback; void ScheduleWork(WorkerThreadPool& WorkerPool, WorkCallback&& Work, ExceptionCallback&& OnError = {}) { @@ -28,6 +28,10 @@ public: WorkerPool.ScheduleWork([this, Work = std::move(Work), OnError = OnError ? std::move(OnError) : DefaultErrorFunction()] { try { + while (m_PauseFlag && !m_AbortFlag) + { + Sleep(2000); + } Work(m_AbortFlag); } catch (...) @@ -59,6 +63,7 @@ private: void RethrowErrors(); std::atomic<bool>& m_AbortFlag; + std::atomic<bool>& m_PauseFlag; bool m_DispatchComplete = false; Latch m_PendingWork; diff --git a/src/zenutil/jupiter/jupitersession.cpp b/src/zenutil/jupiter/jupitersession.cpp index 01a703a1b..1fd59acdf 100644 --- a/src/zenutil/jupiter/jupitersession.cpp +++ b/src/zenutil/jupiter/jupitersession.cpp @@ -379,9 +379,10 @@ JupiterResult JupiterSession::ListBuilds(std::string_view Namespace, std::string_view BucketId, const IoBuffer& Payload) { ZEN_ASSERT(Payload.GetContentType() == ZenContentType::kCbObject); - HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}/{}/search", Namespace, BucketId), - Payload, - {HttpClient::Accept(ZenContentType::kCbObject)}); + std::string OptionalBucketPath = BucketId.empty() ? "" : fmt::format("/{}", BucketId); + HttpClient::Response Response = m_HttpClient.Post(fmt::format("/api/v2/builds/{}{}/search", Namespace, OptionalBucketPath), + Payload, + {HttpClient::Accept(ZenContentType::kCbObject)}); return detail::ConvertResponse(Response, "JupiterSession::ListBuilds"sv); } diff --git a/src/zenutil/parallelwork.cpp b/src/zenutil/parallelwork.cpp index ecacc4b5a..67fc03c04 100644 --- a/src/zenutil/parallelwork.cpp +++ b/src/zenutil/parallelwork.cpp @@ -14,7 +14,10 @@ namespace zen { -ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag) : m_AbortFlag(AbortFlag), m_PendingWork(1) +ParallelWork::ParallelWork(std::atomic<bool>& AbortFlag, std::atomic<bool>& PauseFlag) +: m_AbortFlag(AbortFlag) +, m_PauseFlag(PauseFlag) +, m_PendingWork(1) { } @@ -59,7 +62,7 @@ ParallelWork::Wait(int32_t UpdateIntervalMS, UpdateCallback&& UpdateCallback) while (!m_PendingWork.Wait(UpdateIntervalMS)) { - UpdateCallback(m_AbortFlag.load(), m_PendingWork.Remaining()); + UpdateCallback(m_AbortFlag.load(), m_PauseFlag.load(), m_PendingWork.Remaining()); } RethrowErrors(); @@ -111,7 +114,8 @@ ParallelWork::RethrowErrors() TEST_CASE("parallellwork.nowork") { std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); Work.Wait(); } @@ -120,7 +124,8 @@ TEST_CASE("parallellwork.basic") WorkerThreadPool WorkerPool(2); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [](std::atomic<bool>& AbortFlag) { CHECK(!AbortFlag); }); @@ -133,7 +138,8 @@ TEST_CASE("parallellwork.throws_in_work") WorkerThreadPool WorkerPool(2); std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 10; I++) { Work.ScheduleWork(WorkerPool, [I](std::atomic<bool>& AbortFlag) { @@ -158,7 +164,8 @@ TEST_CASE("parallellwork.throws_in_dispatch") try { std::atomic<bool> AbortFlag; - ParallelWork Work(AbortFlag); + std::atomic<bool> PauseFlag; + ParallelWork Work(AbortFlag, PauseFlag); for (uint32_t I = 0; I < 5; I++) { Work.ScheduleWork(WorkerPool, [I, &ExecutedCount](std::atomic<bool>& AbortFlag) { diff --git a/src/zenutil/zenserverprocess.cpp b/src/zenutil/zenserverprocess.cpp index bfa0d3c49..a5b342cb0 100644 --- a/src/zenutil/zenserverprocess.cpp +++ b/src/zenutil/zenserverprocess.cpp @@ -1059,7 +1059,7 @@ ZenServerInstance::Terminate() const std::filesystem::path BaseDir = m_Env.ProgramBaseDir(); const std::filesystem::path Executable = BaseDir / "zenserver" ZEN_EXE_SUFFIX_LITERAL; ProcessHandle RunningProcess; - std::error_code Ec = FindProcess(Executable, RunningProcess); + std::error_code Ec = FindProcess(Executable, RunningProcess, /*IncludeSelf*/ false); if (Ec) { throw std::system_error(Ec, fmt::format("failed to look up running server executable '{}'", Executable)); |