aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/exec_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
-rw-r--r--src/zen/cmds/exec_cmd.cpp80
1 files changed, 53 insertions, 27 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp
index 9719fce77..89bbf3638 100644
--- a/src/zen/cmds/exec_cmd.cpp
+++ b/src/zen/cmds/exec_cmd.cpp
@@ -23,6 +23,8 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/packageformat.h>
+#include "../progressbar.h"
+
#include <EASTL/hash_map.h>
#include <EASTL/hash_set.h>
#include <EASTL/map.h>
@@ -124,13 +126,14 @@ struct ExecSessionConfig
std::vector<ExecFunctionDefinition>& FunctionList; // mutable for EmitFunctionListOnce
std::string_view OrchestratorUrl;
const std::filesystem::path& OutputPath;
- int Offset = 0;
- int Stride = 1;
- int Limit = 0;
- bool Verbose = false;
- bool Quiet = false;
- bool DumpActions = false;
- bool Binary = false;
+ int Offset = 0;
+ int Stride = 1;
+ int Limit = 0;
+ bool Verbose = false;
+ bool Quiet = false;
+ bool DumpActions = false;
+ bool Binary = false;
+ ProgressBar::Mode ProgressMode = ProgressBar::Mode::PrettyScroll;
};
//////////////////////////////////////////////////////////////////////////
@@ -345,8 +348,6 @@ ExecSessionRunner::DrainCompletedJobs()
}
m_PendingJobs.Remove(CompleteLsn);
-
- ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, m_PendingJobs.GetSize());
}
}
}
@@ -897,17 +898,20 @@ ExecSessionRunner::Run()
// Then submit work items
- int FailedWorkCounter = 0;
- size_t RemainingWorkItems = m_Config.RecordingReader.GetActionCount();
- int SubmittedWorkItems = 0;
+ std::atomic<int> FailedWorkCounter{0};
+ std::atomic<size_t> RemainingWorkItems{m_Config.RecordingReader.GetActionCount()};
+ std::atomic<int> SubmittedWorkItems{0};
+ size_t TotalWorkItems = RemainingWorkItems.load();
- ZEN_CONSOLE("submitting {} work items", RemainingWorkItems);
+ ProgressBar SubmitProgress(m_Config.ProgressMode, "Submit");
+ SubmitProgress.UpdateState({.Task = "Submitting work items", .TotalCount = TotalWorkItems, .RemainingCount = RemainingWorkItems.load()},
+ false);
int OffsetCounter = m_Config.Offset;
int StrideCounter = m_Config.Stride;
auto ShouldSchedule = [&]() -> bool {
- if (m_Config.Limit && SubmittedWorkItems >= m_Config.Limit)
+ if (m_Config.Limit && SubmittedWorkItems.load() >= m_Config.Limit)
{
// Limit reached, ignore
@@ -1005,17 +1009,14 @@ ExecSessionRunner::Run()
{
const int32_t LsnField = EnqueueResult.Lsn;
- --RemainingWorkItems;
- ++SubmittedWorkItems;
+ size_t Remaining = --RemainingWorkItems;
+ int Submitted = ++SubmittedWorkItems;
- if (!m_Config.Quiet)
- {
- ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining",
- SubmittedWorkItems,
- LsnField,
- NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
- RemainingWorkItems);
- }
+ SubmitProgress.UpdateState({.Task = "Submitting work items",
+ .Details = fmt::format("#{} LSN {}", Submitted, LsnField),
+ .TotalCount = TotalWorkItems,
+ .RemainingCount = Remaining},
+ false);
if (!m_Config.OutputPath.empty())
{
@@ -1055,22 +1056,36 @@ ExecSessionRunner::Run()
},
TargetParallelism);
+ SubmitProgress.Finish();
+
// Wait until all pending work is complete
+ size_t TotalPendingJobs = m_PendingJobs.GetSize();
+
+ ProgressBar CompletionProgress(m_Config.ProgressMode, "Execute");
+
while (!m_PendingJobs.IsEmpty())
{
- // TODO: improve this logic
- zen::Sleep(500);
+ size_t PendingCount = m_PendingJobs.GetSize();
+ CompletionProgress.UpdateState({.Task = "Executing work items",
+ .Details = fmt::format("{} completed, {} remaining", TotalPendingJobs - PendingCount, PendingCount),
+ .TotalCount = TotalPendingJobs,
+ .RemainingCount = PendingCount},
+ false);
+
+ zen::Sleep(GetUpdateDelayMS(m_Config.ProgressMode));
DrainCompletedJobs();
SendOrchestratorHeartbeat();
}
+ CompletionProgress.Finish();
+
// Write summary files
WriteSummaryFiles();
- if (FailedWorkCounter)
+ if (FailedWorkCounter.load())
{
return 1;
}
@@ -1423,6 +1438,16 @@ ExecCommand::OnParentOptionsParsed(const ZenCliOptions& GlobalOptions)
int
ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std::string_view OrchestratorUrl)
{
+ ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty;
+ if (m_VerboseLogging)
+ {
+ ProgressMode = ProgressBar::Mode::Plain;
+ }
+ else if (m_QuietLogging)
+ {
+ ProgressMode = ProgressBar::Mode::Quiet;
+ }
+
ExecSessionConfig Config{
.Resolver = *m_ChunkResolver,
.RecordingReader = *m_RecordingReader,
@@ -1437,6 +1462,7 @@ ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std
.Quiet = m_QuietLogging,
.DumpActions = m_DumpActions,
.Binary = m_Binary,
+ .ProgressMode = ProgressMode,
};
ExecSessionRunner Runner(ComputeSession, Config);