diff options
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/exec_cmd.cpp | 80 |
1 files changed, 53 insertions, 27 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp index 9719fce77..89bbf3638 100644 --- a/src/zen/cmds/exec_cmd.cpp +++ b/src/zen/cmds/exec_cmd.cpp @@ -23,6 +23,8 @@ #include <zenhttp/httpclient.h> #include <zenhttp/packageformat.h> +#include "../progressbar.h" + #include <EASTL/hash_map.h> #include <EASTL/hash_set.h> #include <EASTL/map.h> @@ -124,13 +126,14 @@ struct ExecSessionConfig std::vector<ExecFunctionDefinition>& FunctionList; // mutable for EmitFunctionListOnce std::string_view OrchestratorUrl; const std::filesystem::path& OutputPath; - int Offset = 0; - int Stride = 1; - int Limit = 0; - bool Verbose = false; - bool Quiet = false; - bool DumpActions = false; - bool Binary = false; + int Offset = 0; + int Stride = 1; + int Limit = 0; + bool Verbose = false; + bool Quiet = false; + bool DumpActions = false; + bool Binary = false; + ProgressBar::Mode ProgressMode = ProgressBar::Mode::PrettyScroll; }; ////////////////////////////////////////////////////////////////////////// @@ -345,8 +348,6 @@ ExecSessionRunner::DrainCompletedJobs() } m_PendingJobs.Remove(CompleteLsn); - - ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, m_PendingJobs.GetSize()); } } } @@ -897,17 +898,20 @@ ExecSessionRunner::Run() // Then submit work items - int FailedWorkCounter = 0; - size_t RemainingWorkItems = m_Config.RecordingReader.GetActionCount(); - int SubmittedWorkItems = 0; + std::atomic<int> FailedWorkCounter{0}; + std::atomic<size_t> RemainingWorkItems{m_Config.RecordingReader.GetActionCount()}; + std::atomic<int> SubmittedWorkItems{0}; + size_t TotalWorkItems = RemainingWorkItems.load(); - ZEN_CONSOLE("submitting {} work items", RemainingWorkItems); + ProgressBar SubmitProgress(m_Config.ProgressMode, "Submit"); + SubmitProgress.UpdateState({.Task = "Submitting work items", .TotalCount = TotalWorkItems, .RemainingCount = RemainingWorkItems.load()}, + false); int OffsetCounter = m_Config.Offset; int StrideCounter = m_Config.Stride; auto ShouldSchedule = [&]() -> bool { - if (m_Config.Limit && SubmittedWorkItems >= m_Config.Limit) + if (m_Config.Limit && SubmittedWorkItems.load() >= m_Config.Limit) { // Limit reached, ignore @@ -1005,17 +1009,14 @@ ExecSessionRunner::Run() { const int32_t LsnField = EnqueueResult.Lsn; - --RemainingWorkItems; - ++SubmittedWorkItems; + size_t Remaining = --RemainingWorkItems; + int Submitted = ++SubmittedWorkItems; - if (!m_Config.Quiet) - { - ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining", - SubmittedWorkItems, - LsnField, - NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), - RemainingWorkItems); - } + SubmitProgress.UpdateState({.Task = "Submitting work items", + .Details = fmt::format("#{} LSN {}", Submitted, LsnField), + .TotalCount = TotalWorkItems, + .RemainingCount = Remaining}, + false); if (!m_Config.OutputPath.empty()) { @@ -1055,22 +1056,36 @@ ExecSessionRunner::Run() }, TargetParallelism); + SubmitProgress.Finish(); + // Wait until all pending work is complete + size_t TotalPendingJobs = m_PendingJobs.GetSize(); + + ProgressBar CompletionProgress(m_Config.ProgressMode, "Execute"); + while (!m_PendingJobs.IsEmpty()) { - // TODO: improve this logic - zen::Sleep(500); + size_t PendingCount = m_PendingJobs.GetSize(); + CompletionProgress.UpdateState({.Task = "Executing work items", + .Details = fmt::format("{} completed, {} remaining", TotalPendingJobs - PendingCount, PendingCount), + .TotalCount = TotalPendingJobs, + .RemainingCount = PendingCount}, + false); + + zen::Sleep(GetUpdateDelayMS(m_Config.ProgressMode)); DrainCompletedJobs(); SendOrchestratorHeartbeat(); } + CompletionProgress.Finish(); + // Write summary files WriteSummaryFiles(); - if (FailedWorkCounter) + if (FailedWorkCounter.load()) { return 1; } @@ -1423,6 +1438,16 @@ ExecCommand::OnParentOptionsParsed(const ZenCliOptions& GlobalOptions) int ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std::string_view OrchestratorUrl) { + ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty; + if (m_VerboseLogging) + { + ProgressMode = ProgressBar::Mode::Plain; + } + else if (m_QuietLogging) + { + ProgressMode = ProgressBar::Mode::Quiet; + } + ExecSessionConfig Config{ .Resolver = *m_ChunkResolver, .RecordingReader = *m_RecordingReader, @@ -1437,6 +1462,7 @@ ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std .Quiet = m_QuietLogging, .DumpActions = m_DumpActions, .Binary = m_Binary, + .ProgressMode = ProgressMode, }; ExecSessionRunner Runner(ComputeSession, Config); |