aboutsummaryrefslogtreecommitdiff
path: root/src/zen/cmds/exec_cmd.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-04-13 16:38:16 +0200
committerGitHub Enterprise <[email protected]>2026-04-13 16:38:16 +0200
commit795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch)
tree7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zen/cmds/exec_cmd.cpp
parent5.8.4-pre2 (diff)
downloadarchived-zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.tar.xz
archived-zen-795345e5fd7974a1f5227d507a58bb3ed75eafd5.zip
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control. ### Async Horde Agent Rewrite - Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool - Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport` - Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers - Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed) ### Provisioner Drain / Scale-Down - `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain` - Configurable `--horde-drain-grace-period` (default 300s) before force-kill - Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer - Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers ### OIDC Authentication - `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as alternative to static `--horde-token` - Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode - New `--horde-oidctoken-exe-path` CLI option for explicit path override ### Orchestrator & Scheduler - Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers - New `Rejected` action state — when a remote runner declines at capacity, the action is rescheduled without retry count increment - Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock - Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings - New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions) - New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target` ### Remote Execution - Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits - Track in-flight submissions to prevent over-queuing - Show remote runner hostname in `GetDisplayName()` - `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address) ### Frontend Dashboard - Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules - Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count - Editable target-cores input with debounced POST to `/orch/provisioner/target` - Per-agent provisioning status badges (active / draining / deallocated) in the agents table - Active vs total CPU counts in agents summary row ### CLI - New `zen compute record-start` / `record-stop` subcommands - `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet) ### Other - `DataDir` supports environment variable expansion - Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories - Linux/Mac runners `nice(5)` child processes to avoid starving the main server - `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset` - Curl HTTP client logs effective URL on failure - `MachineInfo` carries `Pool` and `Mode` from Horde response - Horde bundle creation includes `.pdb` on Windows
Diffstat (limited to 'src/zen/cmds/exec_cmd.cpp')
-rw-r--r--src/zen/cmds/exec_cmd.cpp80
1 files changed, 53 insertions, 27 deletions
diff --git a/src/zen/cmds/exec_cmd.cpp b/src/zen/cmds/exec_cmd.cpp
index 9719fce77..89bbf3638 100644
--- a/src/zen/cmds/exec_cmd.cpp
+++ b/src/zen/cmds/exec_cmd.cpp
@@ -23,6 +23,8 @@
#include <zenhttp/httpclient.h>
#include <zenhttp/packageformat.h>
+#include "../progressbar.h"
+
#include <EASTL/hash_map.h>
#include <EASTL/hash_set.h>
#include <EASTL/map.h>
@@ -124,13 +126,14 @@ struct ExecSessionConfig
std::vector<ExecFunctionDefinition>& FunctionList; // mutable for EmitFunctionListOnce
std::string_view OrchestratorUrl;
const std::filesystem::path& OutputPath;
- int Offset = 0;
- int Stride = 1;
- int Limit = 0;
- bool Verbose = false;
- bool Quiet = false;
- bool DumpActions = false;
- bool Binary = false;
+ int Offset = 0;
+ int Stride = 1;
+ int Limit = 0;
+ bool Verbose = false;
+ bool Quiet = false;
+ bool DumpActions = false;
+ bool Binary = false;
+ ProgressBar::Mode ProgressMode = ProgressBar::Mode::PrettyScroll;
};
//////////////////////////////////////////////////////////////////////////
@@ -345,8 +348,6 @@ ExecSessionRunner::DrainCompletedJobs()
}
m_PendingJobs.Remove(CompleteLsn);
-
- ZEN_CONSOLE("completed: LSN {} ({} still pending)", CompleteLsn, m_PendingJobs.GetSize());
}
}
}
@@ -897,17 +898,20 @@ ExecSessionRunner::Run()
// Then submit work items
- int FailedWorkCounter = 0;
- size_t RemainingWorkItems = m_Config.RecordingReader.GetActionCount();
- int SubmittedWorkItems = 0;
+ std::atomic<int> FailedWorkCounter{0};
+ std::atomic<size_t> RemainingWorkItems{m_Config.RecordingReader.GetActionCount()};
+ std::atomic<int> SubmittedWorkItems{0};
+ size_t TotalWorkItems = RemainingWorkItems.load();
- ZEN_CONSOLE("submitting {} work items", RemainingWorkItems);
+ ProgressBar SubmitProgress(m_Config.ProgressMode, "Submit");
+ SubmitProgress.UpdateState({.Task = "Submitting work items", .TotalCount = TotalWorkItems, .RemainingCount = RemainingWorkItems.load()},
+ false);
int OffsetCounter = m_Config.Offset;
int StrideCounter = m_Config.Stride;
auto ShouldSchedule = [&]() -> bool {
- if (m_Config.Limit && SubmittedWorkItems >= m_Config.Limit)
+ if (m_Config.Limit && SubmittedWorkItems.load() >= m_Config.Limit)
{
// Limit reached, ignore
@@ -1005,17 +1009,14 @@ ExecSessionRunner::Run()
{
const int32_t LsnField = EnqueueResult.Lsn;
- --RemainingWorkItems;
- ++SubmittedWorkItems;
+ size_t Remaining = --RemainingWorkItems;
+ int Submitted = ++SubmittedWorkItems;
- if (!m_Config.Quiet)
- {
- ZEN_CONSOLE("submitted work item #{} - LSN {} - {}. {} remaining",
- SubmittedWorkItems,
- LsnField,
- NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()),
- RemainingWorkItems);
- }
+ SubmitProgress.UpdateState({.Task = "Submitting work items",
+ .Details = fmt::format("#{} LSN {}", Submitted, LsnField),
+ .TotalCount = TotalWorkItems,
+ .RemainingCount = Remaining},
+ false);
if (!m_Config.OutputPath.empty())
{
@@ -1055,22 +1056,36 @@ ExecSessionRunner::Run()
},
TargetParallelism);
+ SubmitProgress.Finish();
+
// Wait until all pending work is complete
+ size_t TotalPendingJobs = m_PendingJobs.GetSize();
+
+ ProgressBar CompletionProgress(m_Config.ProgressMode, "Execute");
+
while (!m_PendingJobs.IsEmpty())
{
- // TODO: improve this logic
- zen::Sleep(500);
+ size_t PendingCount = m_PendingJobs.GetSize();
+ CompletionProgress.UpdateState({.Task = "Executing work items",
+ .Details = fmt::format("{} completed, {} remaining", TotalPendingJobs - PendingCount, PendingCount),
+ .TotalCount = TotalPendingJobs,
+ .RemainingCount = PendingCount},
+ false);
+
+ zen::Sleep(GetUpdateDelayMS(m_Config.ProgressMode));
DrainCompletedJobs();
SendOrchestratorHeartbeat();
}
+ CompletionProgress.Finish();
+
// Write summary files
WriteSummaryFiles();
- if (FailedWorkCounter)
+ if (FailedWorkCounter.load())
{
return 1;
}
@@ -1423,6 +1438,16 @@ ExecCommand::OnParentOptionsParsed(const ZenCliOptions& GlobalOptions)
int
ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std::string_view OrchestratorUrl)
{
+ ProgressBar::Mode ProgressMode = ProgressBar::Mode::Pretty;
+ if (m_VerboseLogging)
+ {
+ ProgressMode = ProgressBar::Mode::Plain;
+ }
+ else if (m_QuietLogging)
+ {
+ ProgressMode = ProgressBar::Mode::Quiet;
+ }
+
ExecSessionConfig Config{
.Resolver = *m_ChunkResolver,
.RecordingReader = *m_RecordingReader,
@@ -1437,6 +1462,7 @@ ExecCommand::RunSession(zen::compute::ComputeServiceSession& ComputeSession, std
.Quiet = m_QuietLogging,
.DumpActions = m_DumpActions,
.Binary = m_Binary,
+ .ProgressMode = ProgressMode,
};
ExecSessionRunner Runner(ComputeSession, Config);