diff options
| author | Dan Engelbrecht <[email protected]> | 2025-05-12 10:07:40 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2025-05-12 10:07:40 +0200 |
| commit | acc0b59e9cb349e3f9345467a3941ee25786d79e (patch) | |
| tree | 5ae9b0337063cdc206bc050738fc2bed4b330bf9 /src | |
| parent | 5.6.8-pre0 (diff) | |
| download | zen-acc0b59e9cb349e3f9345467a3941ee25786d79e.tar.xz zen-acc0b59e9cb349e3f9345467a3941ee25786d79e.zip | |
handle exception in oplog mirror (#389)
* gracefully handle errors in threaded part of oplog-mirror
Diffstat (limited to 'src')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 114 |
1 files changed, 68 insertions, 46 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 540c4c9ac..066dac781 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -2005,6 +2005,7 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg std::filesystem::path TmpPath = RootPath / ".tmp"; CreateDirectories(TmpPath); + auto _ = MakeGuard([&TmpPath]() { DeleteDirectories(TmpPath); }); std::atomic_int64_t FileCount = 0; int OplogEntryCount = 0; @@ -2017,6 +2018,8 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg std::unordered_set<std::u8string> FileNames; std::atomic<uint64_t> WrittenByteCount = 0; + std::atomic<bool> AbortFlag(false); + Stopwatch WriteStopWatch; auto EmitFilesForDataArray = [&](CbArrayView DataArray) { @@ -2045,38 +2048,50 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg EmitCount++; WorkRemaining.AddCount(1); WorkerPool.ScheduleWork( - [this, &RootPath, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining, &WrittenByteCount]() { + [this, &RootPath, &AbortFlag, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining, &WrittenByteCount]() { auto _ = MakeGuard([&WorkRemaining]() { WorkRemaining.CountDown(); }); - if (HttpClient::Response ChunkResponse = - Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath)) + if (!AbortFlag) { - auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize)) - { - return Compressed.Decompress().AsIoBuffer(); - }; - return std::move(Buffer); - }; - - IoBuffer ChunkData = - m_Decompress ? TryDecompress(ChunkResponse.ResponsePayload) : ChunkResponse.ResponsePayload; - std::filesystem::path TargetPath = RootPath / FileName; - if (!MoveToFile(TargetPath, ChunkData)) + try + { + if (HttpClient::Response ChunkResponse = + Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath)) + { + auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize)) + { + return Compressed.Decompress().AsIoBuffer(); + }; + return std::move(Buffer); + }; + + IoBuffer ChunkData = + m_Decompress ? TryDecompress(ChunkResponse.ResponsePayload) : ChunkResponse.ResponsePayload; + + if (!MoveToFile(TargetPath, ChunkData)) + { + WriteFile(TargetPath, ChunkData); + } + WrittenByteCount.fetch_add(ChunkData.GetSize()); + ++FileCount; + } + else + { + throw std::runtime_error(fmt::format("Unable to fetch '{}' (chunk {}). Reason: '{}'", + FileName, + ChunkId, + ChunkResponse.ErrorMessage(""sv))); + } + } + catch (const std::exception& Ex) { - WriteFile(TargetPath, ChunkData); + AbortFlag.store(true); + ZEN_CONSOLE("Failed writing file to '{}'. Reason: '{}'", TargetPath, Ex.what()); } - WrittenByteCount.fetch_add(ChunkData.GetSize()); - ++FileCount; - } - else - { - ZEN_CONSOLE("Unable to fetch '{}' (chunk {}). Reason: '{}'", - FileName, - ChunkId, - ChunkResponse.ErrorMessage(""sv)); } }); } @@ -2096,28 +2111,31 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg uint64_t Remaining = Entries.Num(); for (auto EntryIter : Entries) { - CbObjectView Entry = EntryIter.AsObjectView(); - ParseProgressBar.UpdateState( - {.Task = "Parsing oplog", .Details = "", .TotalCount = Entries.Num(), .RemainingCount = Remaining}, - false); - Remaining--; - if (!m_KeyFilter.empty()) + if (!AbortFlag) { - if (Entry["key"].AsString().find(m_KeyFilter) == std::string_view::npos) + CbObjectView Entry = EntryIter.AsObjectView(); + ParseProgressBar.UpdateState( + {.Task = "Parsing oplog", .Details = "", .TotalCount = Entries.Num(), .RemainingCount = Remaining}, + false); + Remaining--; + if (!m_KeyFilter.empty()) + { + if (Entry["key"].AsString().find(m_KeyFilter) == std::string_view::npos) + { + continue; + } + } + if (!EmitProgressBar) { - continue; + EmitProgressBar = std::make_unique<ProgressBar>(ProgressBar::Mode::Pretty, ""sv); + WriteStopWatch.Reset(); } - } - if (!EmitProgressBar) - { - EmitProgressBar = std::make_unique<ProgressBar>(ProgressBar::Mode::Pretty, ""sv); - WriteStopWatch.Reset(); - } - EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView()); - EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView()); + EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView()); + EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView()); - ++OplogEntryCount; + ++OplogEntryCount; + } } ParseProgressBar.Finish(); } @@ -2149,6 +2167,12 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { WorkRemaining.Wait(); } + + if (AbortFlag) + { + // Error has already been reported by async code + return 1; + } } else { @@ -2162,8 +2186,6 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg return 1; } - DeleteDirectories(TmpPath); - ZEN_CONSOLE("mirrored {} files from {} oplog entries successfully", FileCount.load(), OplogEntryCount); return 0; |