aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/projectstore_cmd.cpp
diff options
context:
space:
mode:
authorDan Engelbrecht <[email protected]>2024-11-06 09:08:02 +0100
committerGitHub Enterprise <[email protected]>2024-11-06 09:08:02 +0100
commit9285f6d0b00d720957b69b5ecd464cce1dee89bf (patch)
tree08518888b701e54059cfec0de5f56223c332d538 /src/zen/cmds/projectstore_cmd.cpp
parentsponsor process attach hardening (#208) (diff)
downloadarchived-zen-9285f6d0b00d720957b69b5ecd464cce1dee89bf.tar.xz
archived-zen-9285f6d0b00d720957b69b5ecd464cce1dee89bf.zip
Improved oplog import/export progress indicator at commandline (#206)
Nicer progress bar during oplog import/export Verify that oplog has not been deleted from disk behind our back
Diffstat (limited to 'src/zen/cmds/projectstore_cmd.cpp')
-rw-r--r--src/zen/cmds/projectstore_cmd.cpp208
1 files changed, 156 insertions, 52 deletions
diff --git a/src/zen/cmds/projectstore_cmd.cpp b/src/zen/cmds/projectstore_cmd.cpp
index 8573c36aa..4e2d5123d 100644
--- a/src/zen/cmds/projectstore_cmd.cpp
+++ b/src/zen/cmds/projectstore_cmd.cpp
@@ -9,6 +9,7 @@
#include <zencore/logging.h>
#include <zencore/scopeguard.h>
#include <zencore/stream.h>
+#include <zencore/timer.h>
#include <zencore/workthreadpool.h>
#include <zenhttp/formatters.h>
#include <zenhttp/httpclient.h>
@@ -79,7 +80,7 @@ namespace {
}
}
- void ExecuteAsyncOperation(HttpClient& Http, std::string_view Url, IoBuffer&& Payload)
+ void ExecuteAsyncOperation(HttpClient& Http, std::string_view Url, IoBuffer&& Payload, bool PlainProgress)
{
signal(SIGINT, SignalCallbackHandler);
#if ZEN_PLATFORM_WINDOWS
@@ -98,8 +99,20 @@ namespace {
throw std::runtime_error(fmt::format("invalid job id returned, received '{}'", JobIdText));
}
- std::string LastCurrentOp;
- uint32_t LastCurrentOpPercentComplete = 0;
+ ProgressBar ProgressBar(PlainProgress);
+
+ auto OuputMessages = [&](CbObjectView StatusObject) {
+ CbArrayView Messages = StatusObject["Messages"sv].AsArrayView();
+ if (Messages.Num() > 0)
+ {
+ ProgressBar.ForceLinebreak();
+ for (auto M : Messages)
+ {
+ std::string_view Message = M.AsString();
+ ZEN_CONSOLE("{}", Message);
+ }
+ }
+ };
uint64_t JobId = JobIdMaybe.value();
while (true)
@@ -110,27 +123,47 @@ namespace {
{
StatusResult.ThrowError("failed to create project"sv);
}
+
CbObject StatusObject = StatusResult.AsObject();
std::string_view Status = StatusObject["Status"sv].AsString();
+ bool MessagesDone = false;
+
if (Status == "Running")
{
- std::string_view CurrentOp = StatusObject["CurrentOp"sv].AsString();
- uint32_t CurrentOpPercentComplete = StatusObject["CurrentOpPercentComplete"sv].AsUInt32();
- if (CurrentOp != LastCurrentOp || CurrentOpPercentComplete != LastCurrentOpPercentComplete)
+ std::string_view CurrentOp = StatusObject["Op"sv].AsString();
+ std::string_view CurrentOpDetails = StatusObject["Details"sv].AsString();
+ uint64_t TotalCount = StatusObject["TotalCount"sv].AsUInt64();
+ uint64_t RemainingCount = StatusObject["RemainingCount"sv].AsUInt64();
+
+ if (!ProgressBar.IsSameTask(CurrentOp))
{
- LastCurrentOp = CurrentOp;
- LastCurrentOpPercentComplete = CurrentOpPercentComplete;
- ZEN_CONSOLE("{} {}%", CurrentOp, CurrentOpPercentComplete);
+ ProgressBar.Finish();
}
+
+ if (!ProgressBar.HasActiveTask())
+ {
+ OuputMessages(StatusObject);
+ MessagesDone = true;
+ }
+
+ ProgressBar.UpdateState({.Task = std::string(CurrentOp),
+ .Details = std::string(CurrentOpDetails),
+ .TotalCount = TotalCount,
+ .RemainingCount = RemainingCount},
+ false);
}
- CbArrayView Messages = StatusObject["Messages"sv].AsArrayView();
- for (auto M : Messages)
+ if ((Status == "Complete") || (Status == "Aborted"))
{
- std::string_view Message = M.AsString();
- ZEN_CONSOLE("{}", Message);
+ ProgressBar.Finish();
}
+
+ if (!MessagesDone)
+ {
+ OuputMessages(StatusObject);
+ }
+
if (Status == "Complete")
{
if (Cancelled)
@@ -178,16 +211,25 @@ namespace {
#endif // ZEN_PLATFORM_WINDOWS
if (HttpClient::Response DeleteResult = Http.Delete(fmt::format("/admin/jobs/{}", JobId)))
{
+ ProgressBar.ForceLinebreak();
ZEN_CONSOLE("Requested cancel...");
Cancelled = true;
}
else
{
+ ProgressBar.ForceLinebreak();
ZEN_CONSOLE("Failed cancelling job {}", DeleteResult);
}
continue;
}
- Sleep(100);
+ if (PlainProgress)
+ {
+ Sleep(5000);
+ }
+ else
+ {
+ Sleep(500);
+ }
}
}
else
@@ -749,7 +791,7 @@ ExportOplogCommand::ExportOplogCommand()
m_Options.add_option("",
"",
"ignore-missing-attachments",
- "Continue importing oplog even if attachments are missing",
+ "Continue exporting oplog even if attachments are missing",
cxxopts::value(m_IgnoreMissingAttachments),
"<ignore>");
m_Options.add_option("",
@@ -818,6 +860,8 @@ ExportOplogCommand::ExportOplogCommand()
cxxopts::value(m_FileForceEnableTempBlocks),
"<forcetempblocks>");
+ m_Options.add_option("", "", "plainprogress", "Use (legacy) plain progress update", cxxopts::value(m_PlainProgress), "<plainprogress>");
+
m_Options.parse_positional({"project", "oplog"});
}
@@ -1113,7 +1157,7 @@ ExportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
}
else
{
- ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload));
+ ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload), m_PlainProgress);
}
return 0;
}
@@ -1182,6 +1226,8 @@ ImportOplogCommand::ImportOplogCommand()
m_Options.add_option("", "", "file", "Local folder path", cxxopts::value(m_FileDirectoryPath), "<path>");
m_Options.add_option("file", "", "name", "Local file name", cxxopts::value(m_FileName), "<filename>");
+ m_Options.add_option("", "", "plainprogress", "Use (legacy) plain progress update", cxxopts::value(m_PlainProgress), "<plainprogress>");
+
m_Options.parse_positional({"project", "oplog", "gcpath"});
m_Options.positional_help("[<projectid> <oplogid> [<gcpath>]]");
}
@@ -1332,6 +1378,10 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
Writer.AddBool("clean"sv, true);
}
+ if (m_Force)
+ {
+ Writer.AddBool("force"sv, true);
+ }
if (!m_FileDirectoryPath.empty())
{
Writer.BeginObject("file"sv);
@@ -1421,7 +1471,7 @@ ImportOplogCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
}
else
{
- ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload));
+ ExecuteAsyncOperation(Http, fmt::format("/prj/{}/oplog/{}/rpc", m_ProjectName, m_OplogName), std::move(Payload), m_PlainProgress);
}
return 0;
}
@@ -1761,8 +1811,12 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
size_t WorkerCount = Min(std::thread::hardware_concurrency(), 16u);
WorkerThreadPool WorkerPool(gsl::narrow<int>(WorkerCount));
Latch WorkRemaining(1);
+ size_t EmitCount = 0;
std::unordered_set<std::u8string> FileNames;
+ std::atomic<uint64_t> WrittenByteCount = 0;
+
+ Stopwatch WriteStopWatch;
auto EmitFilesForDataArray = [&](CbArrayView DataArray) {
for (auto DataIter : DataArray)
@@ -1783,36 +1837,43 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
continue;
}
+ EmitCount++;
WorkRemaining.AddCount(1);
- WorkerPool.ScheduleWork([this, &RootPath, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining]() {
- auto _ = MakeGuard([&WorkRemaining]() { WorkRemaining.CountDown(); });
- if (HttpClient::Response ChunkResponse =
- Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath))
- {
- auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer {
- IoHash RawHash;
- uint64_t RawSize;
- if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize))
- {
- return Compressed.Decompress().AsIoBuffer();
+ WorkerPool.ScheduleWork(
+ [this, &RootPath, FileName, &FileCount, ChunkId, &Http, TmpPath, &WorkRemaining, &WrittenByteCount]() {
+ auto _ = MakeGuard([&WorkRemaining]() { WorkRemaining.CountDown(); });
+ if (HttpClient::Response ChunkResponse =
+ Http.Download(fmt::format("/prj/{}/oplog/{}/{}"sv, m_ProjectName, m_OplogName, ChunkId), TmpPath))
+ {
+ auto TryDecompress = [](const IoBuffer& Buffer) -> IoBuffer {
+ IoHash RawHash;
+ uint64_t RawSize;
+ if (CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer(Buffer), RawHash, RawSize))
+ {
+ return Compressed.Decompress().AsIoBuffer();
+ };
+ return std::move(Buffer);
};
- return std::move(Buffer);
- };
- IoBuffer ChunkData = m_Decompress ? TryDecompress(ChunkResponse.ResponsePayload) : ChunkResponse.ResponsePayload;
+ IoBuffer ChunkData =
+ m_Decompress ? TryDecompress(ChunkResponse.ResponsePayload) : ChunkResponse.ResponsePayload;
- std::filesystem::path TargetPath = RootPath / FileName;
- if (!MoveToFile(TargetPath, ChunkData))
+ std::filesystem::path TargetPath = RootPath / FileName;
+ if (!MoveToFile(TargetPath, ChunkData))
+ {
+ WriteFile(TargetPath, ChunkData);
+ }
+ WrittenByteCount.fetch_add(ChunkData.GetSize());
+ ++FileCount;
+ }
+ else
{
- WriteFile(TargetPath, ChunkData);
+ ZEN_CONSOLE("Unable to fetch '{}' (chunk {}). Reason: '{}'",
+ FileName,
+ ChunkId,
+ ChunkResponse.ErrorMessage(""sv));
}
- ++FileCount;
- }
- else
- {
- ZEN_CONSOLE("Unable to fetch '{}' (chunk {}). Reason: '{}'", FileName, ChunkId, ChunkResponse.ErrorMessage(""sv));
- }
- });
+ });
}
}
};
@@ -1823,21 +1884,66 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
{
if (CbObject ResponseObject = Response.AsObject())
{
- for (auto EntryIter : ResponseObject["entries"sv])
+ std::unique_ptr<ProgressBar> EmitProgressBar;
{
- CbObjectView Entry = EntryIter.AsObjectView();
- if (!m_KeyFilter.empty())
+ ProgressBar ParseProgressBar(false);
+ CbArrayView Entries = ResponseObject["entries"sv].AsArrayView();
+ uint64_t Remaining = Entries.Num();
+ for (auto EntryIter : Entries)
{
- // ZEN_CONSOLE("{}", Entry["key"].AsString());
- if (Entry["key"].AsString().find(m_KeyFilter) == std::string_view::npos)
+ CbObjectView Entry = EntryIter.AsObjectView();
+ ParseProgressBar.UpdateState(
+ {.Task = "Parsing oplog", .Details = "", .TotalCount = Entries.Num(), .RemainingCount = Remaining},
+ false);
+ Remaining--;
+ if (!m_KeyFilter.empty())
{
- continue;
+ // ZEN_CONSOLE("{}", Entry["key"].AsString());
+ if (Entry["key"].AsString().find(m_KeyFilter) == std::string_view::npos)
+ {
+ continue;
+ }
+ }
+ if (!EmitProgressBar)
+ {
+ EmitProgressBar = std::make_unique<ProgressBar>(false);
+ WriteStopWatch.Reset();
}
+
+ EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView());
+ EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView());
+
+ ++OplogEntryCount;
}
- EmitFilesForDataArray(Entry["packagedata"sv].AsArrayView());
- EmitFilesForDataArray(Entry["bulkdata"sv].AsArrayView());
+ ParseProgressBar.Finish();
+ }
- ++OplogEntryCount;
+ WorkRemaining.CountDown();
+ if (EmitProgressBar)
+ {
+ while (!WorkRemaining.Wait(200))
+ {
+ uint64_t EmitRemaining = gsl::narrow<uint64_t>(WorkRemaining.Remaining());
+
+ uint64_t WrittenBytes = WrittenByteCount.load();
+ uint64_t ElapsedTimeMS = WriteStopWatch.GetElapsedTimeMs();
+ uint64_t BytesPerSecond = ElapsedTimeMS > 0 ? (1000 * WrittenBytes) / ElapsedTimeMS : WrittenBytes;
+
+ EmitProgressBar->UpdateState({.Task = "Writing files",
+ .Details = fmt::format("{}/{} files, {}/sec ({})",
+ EmitCount - EmitRemaining,
+ EmitCount,
+ NiceBytes(BytesPerSecond),
+ NiceBytes(WrittenByteCount.load())),
+ .TotalCount = EmitCount,
+ .RemainingCount = EmitRemaining},
+ false);
+ }
+ EmitProgressBar->Finish();
+ }
+ else
+ {
+ WorkRemaining.Wait();
}
}
else
@@ -1851,8 +1957,6 @@ OplogMirrorCommand::Run(const ZenCliOptions& GlobalOptions, int argc, char** arg
return 1;
}
- WorkRemaining.CountDown();
- WorkRemaining.Wait();
std::filesystem::remove_all(TmpPath);