diff options
| author | Dan Engelbrecht <[email protected]> | 2024-11-06 09:08:02 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2024-11-06 09:08:02 +0100 |
| commit | 9285f6d0b00d720957b69b5ecd464cce1dee89bf (patch) | |
| tree | 08518888b701e54059cfec0de5f56223c332d538 /src/zen/cmds/projectstore_cmd.cpp | |
| parent | sponsor process attach hardening (#208) (diff) | |
| download | archived-zen-9285f6d0b00d720957b69b5ecd464cce1dee89bf.tar.xz archived-zen-9285f6d0b00d720957b69b5ecd464cce1dee89bf.zip | |
Improved oplog import/export progress indicator at commandline (#206)
Nicer progress bar during oplog import/export
Verify that oplog has not been deleted from disk behind our back
Diffstat (limited to 'src/zen/cmds/projectstore_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/projectstore_cmd.cpp | 208 |
1 files changed, 156 insertions, 52 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp index 8573c36aa..4e2d5123d 100644 --- a/src/zen/cmds/projectstore_cmd.cpp +++ b/src/zen/cmds/projectstore_cmd.cpp @@ -9,6 +9,7 @@ #include <zencore/logging.h> #include <zencore/scopeguard.h> #include <zencore/stream.h> +#include <zencore/timer.h> #include <zencore/workthreadpool.h> #include <zenhttp/formatters.h> #include <zenhttp/httpclient.h> @@ -79,7 +80,7 @@ namespace { } } - void ExecuteAsyncOperation(HttpClient& Http, std::string_view Url, IoBuffer&& Payload) + void ExecuteAsyncOperation(HttpClient& Http, std::string_view Url, IoBuffer&& Payload, bool PlainProgress) { signal(SIGINT, SignalCallbackHandler); #if ZEN_PLATFORM_WINDOWS @@ -98,8 +99,20 @@ namespace { throw std::runtime_error(fmt::format("invalid job id returned, received '{}'", JobIdText)); } - std::string LastCurrentOp; - uint32_t LastCurrentOpPercentComplete = 0; + ProgressBar ProgressBar(PlainProgress); + + auto OuputMessages = [&](CbObjectView StatusObject) { + CbArrayView Messages = StatusObject["Messages"sv].AsArrayView(); + if (Messages.Num() > 0) + { + ProgressBar.ForceLinebreak(); + for (auto M : Messages) + { + std::string_view Message = M.AsString(); + ZEN_CONSOLE("{}", Message); + } + } + }; uint64_t JobId = JobIdMaybe.value(); while (true) @@ -110,27 +123,47 @@ namespace { { StatusResult.ThrowError("failed to create project"sv); } + CbObject StatusObject = StatusResult.AsObject(); std::string_view Status = StatusObject["Status"sv].AsString(); + bool MessagesDone = false; + if (Status == "Running") { - std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString(); - uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32(); - if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete) + std::string_view CurrentOp = StatusObject["Op"sv].AsString(); + std::string_view CurrentOpDetails = StatusObject["Details"sv].AsString(); + uint64_t TotalCount = StatusObject["TotalCount"sv].AsUInt64(); + uint64_t RemainingCount = StatusObject["RemainingCount"sv].AsUInt64(); + + if (!ProgressBar.IsSameTask(CurrentOp)) { - LastCurrentOp = CurrentOp; - LastCurrentOpPercentComplete = CurrentOpPercentComplete; - ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete); + ProgressBar.Finish(); } + + if (!ProgressBar.HasActiveTask()) + { + OuputMessages(StatusObject); + MessagesDone = true; + } + + ProgressBar.UpdateState({.Task = std::string(CurrentOp), + .Details = std::string(CurrentOpDetails), + .TotalCount = TotalCount, + .RemainingCount = RemainingCount}, + false); } - CbArrayView Messages = StatusObject["Messages"sv].AsArrayView(); - for (auto M : Messages) + if ((Status == "Complete") || (Status == "Aborted")) { - std::string_view Message = M.AsString(); - ZEN_CONSOLE("{}", Message); + ProgressBar.Finish(); } + + if (!MessagesDone) + { + OuputMessages(StatusObject); + } + if (Status == "Complete") { if (Cancelled) @@ -178,16 +211,25 @@ namespace { #endif // ZEN_PLATFORM_WINDOWS if (HttpClient::Response DeleteResult = Http.Delete(fmt::format("/admin/jobs/{}", JobId))) { + ProgressBar.ForceLinebreak(); ZEN_CONSOLE("Requested cancel..."); Cancelled = true; } else { + ProgressBar.ForceLinebreak(); ZEN_CONSOLE("Failed cancelling job {}", DeleteResult); } continue; } - Sleep(100); + if (PlainProgress) + { + Sleep(5000); + } + else + { + Sleep(500); + } } } else @@ -749,7 +791,7 @@ ExportOplogCommand::ExportOplogCommand() m_Options.add_option("", "", "ignore-missing-attachments", - "Continue importing oplog even if attachments are missing", + "Continue exporting oplog even if attachments are missing", cxxopts::value(m_IgnoreMissingAttachments), "<ignore>"); m_Options.add_option("", @@ -818,6 +860,8 @@ ExportOplogCommand::ExportOplogCommand() cxxopts::value(m_FileForceEnableTempBlocks), "<forcetempblocks>"); + m_Options.add_option("", "", "plainprogress", "Use (legacy) plain progress update", cxxopts::value(m_PlainProgress), "<plainprogress>"); + m_Options.parse_positional({"project", "oplog"}); } @@ -1113,7 +1157,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg } else { - ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload)); + ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload), m_PlainProgress); } return 0; } @@ -1182,6 +1226,8 @@ ImportOplogCommand::ImportOplogCommand() m_Options.add_option("", "", "file", "Local folder path", cxxopts::value(m_FileDirectoryPath), "<path>"); m_Options.add_option("file", "", "name", "Local file name", cxxopts::value(m_FileName), "<filename>"); + m_Options.add_option("", "", "plainprogress", "Use (legacy) plain progress update", cxxopts::value(m_PlainProgress), "<plainprogress>"); + m_Options.parse_positional({"project", "oplog", "gcpath"}); m_Options.positional_help("[<projectid> <oplogid> [<gcpath>]]"); } @@ -1332,6 +1378,10 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { Writer.AddBool("clean"sv, true); } + if (m_Force) + { + Writer.AddBool("force"sv, true); + } if (!m_FileDirectoryPath.empty()) { Writer.BeginObject("file"sv); @@ -1421,7 +1471,7 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg } else { - ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload)); + ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload), m_PlainProgress); } return 0; } @@ -1761,8 +1811,12 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u); WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount)); Latch WorkRemaining(1); + size_t EmitCount = 0; std::unordered_set<std::u8string> FileNames; + std::atomic<uint64_t> WrittenByteCount = 0; + + Stopwatch WriteStopWatch; auto EmitFilesForDataArray = [&](CbArrayView DataArray) { for (auto DataIter : DataArray) @@ -1783,36 +1837,43 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { continue; } + EmitCount++; WorkRemaining.AddCount(1); - WorkerPool.ScheduleWork([this, &RootPath, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining]() { - auto _ = MakeGuard([&WorkRemaining]() { WorkRemaining.CountDown(); }); - if (HttpClient::Response ChunkResponse = - Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath)) - { - auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer { - IoHash RawHash; - uint64_t RawSize; - if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize)) - { - return Compressed.Decompress().AsIoBuffer(); + WorkerPool.ScheduleWork( + [this, &RootPath, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining, &WrittenByteCount]() { + auto _ = MakeGuard([&WorkRemaining]() { WorkRemaining.CountDown(); }); + if (HttpClient::Response ChunkResponse = + Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath)) + { + auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer { + IoHash RawHash; + uint64_t RawSize; + if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize)) + { + return Compressed.Decompress().AsIoBuffer(); + }; + return std::move(Buffer); }; - return std::move(Buffer); - }; - IoBuffer ChunkData = m_Decompress ? TryDecompress(ChunkResponse.ResponsePayload) : ChunkResponse.ResponsePayload; + IoBuffer ChunkData = + m_Decompress ? TryDecompress(ChunkResponse.ResponsePayload) : ChunkResponse.ResponsePayload; - std::filesystem::path TargetPath = RootPath / FileName; - if (!MoveToFile(TargetPath, ChunkData)) + std::filesystem::path TargetPath = RootPath / FileName; + if (!MoveToFile(TargetPath, ChunkData)) + { + WriteFile(TargetPath, ChunkData); + } + WrittenByteCount.fetch_add(ChunkData.GetSize()); + ++FileCount; + } + else { - WriteFile(TargetPath, ChunkData); + ZEN_CONSOLE("Unable to fetch '{}' (chunk {}). Reason: '{}'", + FileName, + ChunkId, + ChunkResponse.ErrorMessage(""sv)); } - ++FileCount; - } - else - { - ZEN_CONSOLE("Unable to fetch '{}' (chunk {}). Reason: '{}'", FileName, ChunkId, ChunkResponse.ErrorMessage(""sv)); - } - }); + }); } } }; @@ -1823,21 +1884,66 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg { if (CbObject ResponseObject = Response.AsObject()) { - for (auto EntryIter : ResponseObject["entries"sv]) + std::unique_ptr<ProgressBar> EmitProgressBar; { - CbObjectView Entry = EntryIter.AsObjectView(); - if (!m_KeyFilter.empty()) + ProgressBar ParseProgressBar(false); + CbArrayView Entries = ResponseObject["entries"sv].AsArrayView(); + uint64_t Remaining = Entries.Num(); + for (auto EntryIter : Entries) { - // ZEN_CONSOLE("{}", Entry["key"].AsString()); - if (Entry["key"].AsString().find(m_KeyFilter) == std::string_view::npos) + CbObjectView Entry = EntryIter.AsObjectView(); + ParseProgressBar.UpdateState( + {.Task = "Parsing oplog", .Details = "", .TotalCount = Entries.Num(), .RemainingCount = Remaining}, + false); + Remaining--; + if (!m_KeyFilter.empty()) { - continue; + // ZEN_CONSOLE("{}", Entry["key"].AsString()); + if (Entry["key"].AsString().find(m_KeyFilter) == std::string_view::npos) + { + continue; + } + } + if (!EmitProgressBar) + { + EmitProgressBar = std::make_unique<ProgressBar>(false); + WriteStopWatch.Reset(); } + + EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView()); + EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView()); + + ++OplogEntryCount; } - EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView()); - EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView()); + ParseProgressBar.Finish(); + } - ++OplogEntryCount; + WorkRemaining.CountDown(); + if (EmitProgressBar) + { + while (!WorkRemaining.Wait(200)) + { + uint64_t EmitRemaining = gsl::narrow<uint64_t>(WorkRemaining.Remaining()); + + uint64_t WrittenBytes = WrittenByteCount.load(); + uint64_t ElapsedTimeMS = WriteStopWatch.GetElapsedTimeMs(); + uint64_t BytesPerSecond = ElapsedTimeMS > 0 ? (1000 * WrittenBytes) / ElapsedTimeMS : WrittenBytes; + + EmitProgressBar->UpdateState({.Task = "Writing files", + .Details = fmt::format("{}/{} files, {}/sec ({})", + EmitCount - EmitRemaining, + EmitCount, + NiceBytes(BytesPerSecond), + NiceBytes(WrittenByteCount.load())), + .TotalCount = EmitCount, + .RemainingCount = EmitRemaining}, + false); + } + EmitProgressBar->Finish(); + } + else + { + WorkRemaining.Wait(); } } else @@ -1851,8 +1957,6 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg return 1; } - WorkRemaining.CountDown(); - WorkRemaining.Wait(); std::filesystem::remove_all(TmpPath); |