diff options
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
| -rw-r--r-- | src/zen/cmds/exec_cmd.cpp | 89 |
1 files changed, 58 insertions, 31 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp index 9719fce77..60968b521 100644 --- a/src/zen/cmds/exec_cmd.cpp +++ b/src/zen/cmds/exec_cmd.cpp @@ -2,6 +2,8 @@ #include "exec_cmd.h" +#include "zenserviceclient.h" + #include <zencompute/computeservice.h> #include <zencompute/recordingreader.h> #include <zencore/compactbinary.h> @@ -23,6 +25,8 @@ #include <zenhttp/httpclient.h> #include <zenhttp/packageformat.h> +#include "consoleprogress.h" + #include <EASTL/hash_map.h> #include <EASTL/hash_set.h> #include <EASTL/map.h> @@ -114,7 +118,7 @@ namespace { } // namespace ////////////////////////////////////////////////////////////////////////// -// ExecSessionConfig — read-only configuration for a session run +// ExecSessionConfig - read-only configuration for a session run struct ExecSessionConfig { @@ -124,17 +128,18 @@ struct ExecSessionConfig std::vector<ExecFunctionDefinition>& FunctionList; // mutable for EmitFunctionListOnce std::string_view OrchestratorUrl; const std::filesystem::path& OutputPath; - int Offset = 0; - int Stride = 1; - int Limit = 0; - bool Verbose = false; - bool Quiet = false; - bool DumpActions = false; - bool Binary = false; + int Offset = 0; + int Stride = 1; + int Limit = 0; + bool Verbose = false; + bool Quiet = false; + bool DumpActions = false; + bool Binary = false; + ConsoleProgressMode ProgressMode = ConsoleProgressMode::Pretty; }; ////////////////////////////////////////////////////////////////////////// -// ExecSessionRunner — owns per-run state, drives the session lifecycle +// ExecSessionRunner - owns per-run state, drives the session lifecycle class ExecSessionRunner { @@ -345,8 +350,6 @@ ExecSessionRunner::DrainCompletedJobs() } m_PendingJobs.Remove(CompleteLsn); - - ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, m_PendingJobs.GetSize()); } } } @@ -897,17 +900,22 @@ ExecSessionRunner::Run() // Then submit work items - int FailedWorkCounter = 0; - size_t RemainingWorkItems = m_Config.RecordingReader.GetActionCount(); - int SubmittedWorkItems = 0; + std::atomic<int> FailedWorkCounter{0}; + std::atomic<size_t> RemainingWorkItems{m_Config.RecordingReader.GetActionCount()}; + std::atomic<int> SubmittedWorkItems{0}; + size_t TotalWorkItems = RemainingWorkItems.load(); - ZEN_CONSOLE("submitting {} work items", RemainingWorkItems); + std::unique_ptr<ProgressBase> ProgressOwner(CreateConsoleProgress(m_Config.ProgressMode)); + std::unique_ptr<ProgressBase::ProgressBar> SubmitProgress = ProgressOwner->CreateProgressBar("Submit"); + SubmitProgress->UpdateState( + {.Task = "Submitting work items", .TotalCount = TotalWorkItems, .RemainingCount = RemainingWorkItems.load()}, + false); int OffsetCounter = m_Config.Offset; int StrideCounter = m_Config.Stride; auto ShouldSchedule = [&]() -> bool { - if (m_Config.Limit && SubmittedWorkItems >= m_Config.Limit) + if (m_Config.Limit && SubmittedWorkItems.load() >= m_Config.Limit) { // Limit reached, ignore @@ -1005,17 +1013,14 @@ ExecSessionRunner::Run() { const int32_t LsnField = EnqueueResult.Lsn; - --RemainingWorkItems; - ++SubmittedWorkItems; + size_t Remaining = --RemainingWorkItems; + int Submitted = ++SubmittedWorkItems; - if (!m_Config.Quiet) - { - ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining", - SubmittedWorkItems, - LsnField, - NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), - RemainingWorkItems); - } + SubmitProgress->UpdateState({.Task = "Submitting work items", + .Details = fmt::format("#{} LSN {}", Submitted, LsnField), + .TotalCount = TotalWorkItems, + .RemainingCount = Remaining}, + false); if (!m_Config.OutputPath.empty()) { @@ -1055,22 +1060,37 @@ ExecSessionRunner::Run() }, TargetParallelism); + SubmitProgress->Finish(); + // Wait until all pending work is complete + size_t TotalPendingJobs = m_PendingJobs.GetSize(); + + std::unique_ptr<ProgressBase::ProgressBar> CompletionProgress = ProgressOwner->CreateProgressBar("Execute"); + while (!m_PendingJobs.IsEmpty()) { - // TODO: improve this logic - zen::Sleep(500); + size_t PendingCount = m_PendingJobs.GetSize(); + CompletionProgress->UpdateState( + {.Task = "Executing work items", + .Details = fmt::format("{} completed, {} remaining", TotalPendingJobs - PendingCount, PendingCount), + .TotalCount = TotalPendingJobs, + .RemainingCount = PendingCount}, + false); + + zen::Sleep(ProgressOwner->GetProgressUpdateDelayMS()); DrainCompletedJobs(); SendOrchestratorHeartbeat(); } + CompletionProgress->Finish(); + // Write summary files WriteSummaryFiles(); - if (FailedWorkCounter) + if (FailedWorkCounter.load()) { return 1; } @@ -1089,7 +1109,7 @@ ExecHttpSubCmd::ExecHttpSubCmd(ExecCommand& Parent) : ZenSubCmdBase("http", "For void ExecHttpSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) { - m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName); + ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = ExecCommand::Name}); ZEN_ASSERT(m_Parent.m_ChunkResolver); ChunkResolver& Resolver = *m_Parent.m_ChunkResolver; @@ -1097,7 +1117,7 @@ ExecHttpSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/) std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp"); zen::compute::ComputeServiceSession ComputeSession(Resolver); - ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName); + ComputeSession.AddRemoteRunner(Resolver, TempPath, Service.HostSpec()); Stopwatch ExecTimer; int ReturnValue = m_Parent.RunSession(ComputeSession); @@ -1423,6 +1443,12 @@ ExecCommand::OnParentOptionsParsed(const ZenCliOptions& GlobalOptions) int ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std::string_view OrchestratorUrl) { + ConsoleProgressMode ProgressMode = ConsoleProgressMode::Pretty; + if (m_QuietLogging) + { + ProgressMode = ConsoleProgressMode::Quiet; + } + ExecSessionConfig Config{ .Resolver = *m_ChunkResolver, .RecordingReader = *m_RecordingReader, @@ -1437,6 +1463,7 @@ ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std .Quiet = m_QuietLogging, .DumpActions = m_DumpActions, .Binary = m_Binary, + .ProgressMode = ProgressMode, }; ExecSessionRunner Runner(ComputeSession, Config); |