aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2025-05-12 10:07:40 +0200
committerGitHub Enterprise <[email protected]>2025-05-12 10:07:40 +0200
commitacc0b59e9cb349e3f9345467a3941ee25786d79e (patch)
tree5ae9b0337063cdc206bc050738fc2bed4b330bf9 /src
parent5.6.8-pre0 (diff)
downloadzen-acc0b59e9cb349e3f9345467a3941ee25786d79e.tar.xz
zen-acc0b59e9cb349e3f9345467a3941ee25786d79e.zip
handle exception in oplog mirror (#389)
* gracefully handle errors in threaded part of oplog-mirror
Diffstat (limited to 'src')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp114
1 files changed, 68 insertions, 46 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 540c4c9ac..066dac781 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -2005,6 +2005,7 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
std::filesystem::path TmpPath = RootPath / ".tmp";
CreateDirectories(TmpPath);
+ auto _ = MakeGuard([&TmpPath]() { DeleteDirectories(TmpPath); });
std::atomic_int64_t FileCount = 0;
int OplogEntryCount = 0;
@@ -2017,6 +2018,8 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
std::unordered_set<std::u8string> FileNames;
std::atomic<uint64_t> WrittenByteCount = 0;
+ std::atomic<bool> AbortFlag(false);
+
Stopwatch WriteStopWatch;
auto EmitFilesForDataArray = [&](CbArrayView DataArray) {
@@ -2045,38 +2048,50 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
EmitCount++;
WorkRemaining.AddCount(1);
WorkerPool.ScheduleWork(
- [this, &RootPath, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining, &WrittenByteCount]() {
+ [this, &RootPath, &AbortFlag, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining, &WrittenByteCount]() {
auto _ = MakeGuard([&WorkRemaining]() { WorkRemaining.CountDown(); });
- if (HttpClient::Response ChunkResponse =
- Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath))
+ if (!AbortFlag)
{
- auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer {
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize))
- {
- return Compressed.Decompress().AsIoBuffer();
- };
- return std::move(Buffer);
- };
-
- IoBuffer ChunkData =
- m_Decompress ? TryDecompress(ChunkResponse.ResponsePayload) : ChunkResponse.ResponsePayload;
-
std::filesystem::path TargetPath = RootPath / FileName;
- if (!MoveToFile(TargetPath, ChunkData))
+ try
+ {
+ if (HttpClient::Response ChunkResponse =
+ Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath))
+ {
+ auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize))
+ {
+ return Compressed.Decompress().AsIoBuffer();
+ };
+ return std::move(Buffer);
+ };
+
+ IoBuffer ChunkData =
+ m_Decompress ? TryDecompress(ChunkResponse.ResponsePayload) : ChunkResponse.ResponsePayload;
+
+ if (!MoveToFile(TargetPath, ChunkData))
+ {
+ WriteFile(TargetPath, ChunkData);
+ }
+ WrittenByteCount.fetch_add(ChunkData.GetSize());
+ ++FileCount;
+ }
+ else
+ {
+ throw std::runtime_error(fmt::format("Unable to fetch '{}' (chunk {}). Reason: '{}'",
+ FileName,
+ ChunkId,
+ ChunkResponse.ErrorMessage(""sv)));
+ }
+ }
+ catch (const std::exception& Ex)
{
- WriteFile(TargetPath, ChunkData);
+ AbortFlag.store(true);
+ ZEN_CONSOLE("Failed writing file to '{}'. Reason: '{}'", TargetPath, Ex.what());
}
- WrittenByteCount.fetch_add(ChunkData.GetSize());
- ++FileCount;
- }
- else
- {
- ZEN_CONSOLE("Unable to fetch '{}' (chunk {}). Reason: '{}'",
- FileName,
- ChunkId,
- ChunkResponse.ErrorMessage(""sv));
}
});
}
@@ -2096,28 +2111,31 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
uint64_t Remaining = Entries.Num();
for (auto EntryIter : Entries)
{
- CbObjectView Entry = EntryIter.AsObjectView();
- ParseProgressBar.UpdateState(
- {.Task = "Parsing oplog", .Details = "", .TotalCount = Entries.Num(), .RemainingCount = Remaining},
- false);
- Remaining--;
- if (!m_KeyFilter.empty())
+ if (!AbortFlag)
{
- if (Entry["key"].AsString().find(m_KeyFilter) == std::string_view::npos)
+ CbObjectView Entry = EntryIter.AsObjectView();
+ ParseProgressBar.UpdateState(
+ {.Task = "Parsing oplog", .Details = "", .TotalCount = Entries.Num(), .RemainingCount = Remaining},
+ false);
+ Remaining--;
+ if (!m_KeyFilter.empty())
+ {
+ if (Entry["key"].AsString().find(m_KeyFilter) == std::string_view::npos)
+ {
+ continue;
+ }
+ }
+ if (!EmitProgressBar)
{
- continue;
+ EmitProgressBar = std::make_unique<ProgressBar>(ProgressBar::Mode::Pretty, ""sv);
+ WriteStopWatch.Reset();
}
- }
- if (!EmitProgressBar)
- {
- EmitProgressBar = std::make_unique<ProgressBar>(ProgressBar::Mode::Pretty, ""sv);
- WriteStopWatch.Reset();
- }
- EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView());
- EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView());
+ EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView());
+ EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView());
- ++OplogEntryCount;
+ ++OplogEntryCount;
+ }
}
ParseProgressBar.Finish();
}
@@ -2149,6 +2167,12 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
WorkRemaining.Wait();
}
+
+ if (AbortFlag)
+ {
+ // Error has already been reported by async code
+ return 1;
+ }
}
else
{
@@ -2162,8 +2186,6 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
return 1;
}
- DeleteDirectories(TmpPath);
-
ZEN_CONSOLE("mirrored {} files from {} oplog entries successfully", FileCount.load(), OplogEntryCount);
return 0;