aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/exec_cmd.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
-rw-r--r--src/zen/cmds/exec_cmd.cpp89
1 files changed, 58 insertions, 31 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp
index 9719fce77..60968b521 100644
--- a/src/zen/cmds/exec_cmd.cpp
+++ b/src/zen/cmds/exec_cmd.cpp
@@ -2,6 +2,8 @@
#include "exec_cmd.h"
+#include "zenserviceclient.h"
+
#include <zencompute/computeservice.h>
#include <zencompute/recordingreader.h>
#include <zencore/compactbinary.h>
@@ -23,6 +25,8 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/packageformat.h>
+#include "consoleprogress.h"
+
#include <EASTL/hash_map.h>
#include <EASTL/hash_set.h>
#include <EASTL/map.h>
@@ -114,7 +118,7 @@ namespace {
} // namespace
//////////////////////////////////////////////////////////////////////////
-// ExecSessionConfig — read-only configuration for a session run
+// ExecSessionConfig - read-only configuration for a session run
struct ExecSessionConfig
{
@@ -124,17 +128,18 @@ struct ExecSessionConfig
std::vector<ExecFunctionDefinition>& FunctionList; // mutable for EmitFunctionListOnce
std::string_view OrchestratorUrl;
const std::filesystem::path& OutputPath;
- int Offset = 0;
- int Stride = 1;
- int Limit = 0;
- bool Verbose = false;
- bool Quiet = false;
- bool DumpActions = false;
- bool Binary = false;
+ int Offset = 0;
+ int Stride = 1;
+ int Limit = 0;
+ bool Verbose = false;
+ bool Quiet = false;
+ bool DumpActions = false;
+ bool Binary = false;
+ ConsoleProgressMode ProgressMode = ConsoleProgressMode::Pretty;
};
//////////////////////////////////////////////////////////////////////////
-// ExecSessionRunner — owns per-run state, drives the session lifecycle
+// ExecSessionRunner - owns per-run state, drives the session lifecycle
class ExecSessionRunner
{
@@ -345,8 +350,6 @@ ExecSessionRunner::DrainCompletedJobs()
}
m_PendingJobs.Remove(CompleteLsn);
-
- ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, m_PendingJobs.GetSize());
}
}
}
@@ -897,17 +900,22 @@ ExecSessionRunner::Run()
// Then submit work items
- int FailedWorkCounter = 0;
- size_t RemainingWorkItems = m_Config.RecordingReader.GetActionCount();
- int SubmittedWorkItems = 0;
+ std::atomic<int> FailedWorkCounter{0};
+ std::atomic<size_t> RemainingWorkItems{m_Config.RecordingReader.GetActionCount()};
+ std::atomic<int> SubmittedWorkItems{0};
+ size_t TotalWorkItems = RemainingWorkItems.load();
- ZEN_CONSOLE("submitting {} work items", RemainingWorkItems);
+ std::unique_ptr<ProgressBase> ProgressOwner(CreateConsoleProgress(m_Config.ProgressMode));
+ std::unique_ptr<ProgressBase::ProgressBar> SubmitProgress = ProgressOwner->CreateProgressBar("Submit");
+ SubmitProgress->UpdateState(
+ {.Task = "Submitting work items", .TotalCount = TotalWorkItems, .RemainingCount = RemainingWorkItems.load()},
+ false);
int OffsetCounter = m_Config.Offset;
int StrideCounter = m_Config.Stride;
auto ShouldSchedule = [&]() -> bool {
- if (m_Config.Limit && SubmittedWorkItems >= m_Config.Limit)
+ if (m_Config.Limit && SubmittedWorkItems.load() >= m_Config.Limit)
{
// Limit reached, ignore
@@ -1005,17 +1013,14 @@ ExecSessionRunner::Run()
{
const int32_t LsnField = EnqueueResult.Lsn;
- --RemainingWorkItems;
- ++SubmittedWorkItems;
+ size_t Remaining = --RemainingWorkItems;
+ int Submitted = ++SubmittedWorkItems;
- if (!m_Config.Quiet)
- {
- ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining",
- SubmittedWorkItems,
- LsnField,
- NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
- RemainingWorkItems);
- }
+ SubmitProgress->UpdateState({.Task = "Submitting work items",
+ .Details = fmt::format("#{} LSN {}", Submitted, LsnField),
+ .TotalCount = TotalWorkItems,
+ .RemainingCount = Remaining},
+ false);
if (!m_Config.OutputPath.empty())
{
@@ -1055,22 +1060,37 @@ ExecSessionRunner::Run()
},
TargetParallelism);
+ SubmitProgress->Finish();
+
// Wait until all pending work is complete
+ size_t TotalPendingJobs = m_PendingJobs.GetSize();
+
+ std::unique_ptr<ProgressBase::ProgressBar> CompletionProgress = ProgressOwner->CreateProgressBar("Execute");
+
while (!m_PendingJobs.IsEmpty())
{
- // TODO: improve this logic
- zen::Sleep(500);
+ size_t PendingCount = m_PendingJobs.GetSize();
+ CompletionProgress->UpdateState(
+ {.Task = "Executing work items",
+ .Details = fmt::format("{} completed, {} remaining", TotalPendingJobs - PendingCount, PendingCount),
+ .TotalCount = TotalPendingJobs,
+ .RemainingCount = PendingCount},
+ false);
+
+ zen::Sleep(ProgressOwner->GetProgressUpdateDelayMS());
DrainCompletedJobs();
SendOrchestratorHeartbeat();
}
+ CompletionProgress->Finish();
+
// Write summary files
WriteSummaryFiles();
- if (FailedWorkCounter)
+ if (FailedWorkCounter.load())
{
return 1;
}
@@ -1089,7 +1109,7 @@ ExecHttpSubCmd::ExecHttpSubCmd(ExecCommand& Parent) : ZenSubCmdBase("http", "For
void
ExecHttpSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
{
- m_HostName = ZenCmdBase::ResolveTargetHostSpec(m_HostName);
+ ZenServiceClient Service({.HostSpec = m_HostName, .CommandName = ExecCommand::Name});
ZEN_ASSERT(m_Parent.m_ChunkResolver);
ChunkResolver& Resolver = *m_Parent.m_ChunkResolver;
@@ -1097,7 +1117,7 @@ ExecHttpSubCmd::Run(const ZenCliOptions& /*GlobalOptions*/)
std::filesystem::path TempPath = std::filesystem::absolute(".zen_temp");
zen::compute::ComputeServiceSession ComputeSession(Resolver);
- ComputeSession.AddRemoteRunner(Resolver, TempPath, m_HostName);
+ ComputeSession.AddRemoteRunner(Resolver, TempPath, Service.HostSpec());
Stopwatch ExecTimer;
int ReturnValue = m_Parent.RunSession(ComputeSession);
@@ -1423,6 +1443,12 @@ ExecCommand::OnParentOptionsParsed(const ZenCliOptions& GlobalOptions)
int
ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std::string_view OrchestratorUrl)
{
+ ConsoleProgressMode ProgressMode = ConsoleProgressMode::Pretty;
+ if (m_QuietLogging)
+ {
+ ProgressMode = ConsoleProgressMode::Quiet;
+ }
+
ExecSessionConfig Config{
.Resolver = *m_ChunkResolver,
.RecordingReader = *m_RecordingReader,
@@ -1437,6 +1463,7 @@ ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std
.Quiet = m_QuietLogging,
.DumpActions = m_DumpActions,
.Binary = m_Binary,
+ .ProgressMode = ProgressMode,
};
ExecSessionRunner Runner(ComputeSession, Config);