| author | Stefan Boberg <[email protected]> | 2026-04-13 16:38:16 +0200 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-04-13 16:38:16 +0200 |
| commit | 795345e5fd7974a1f5227d507a58bb3ed75eafd5 (patch) | |
| tree | 7a0f142bf562c3590400586c82b0e7a1b5ad6493 /src/zencompute | |
| parent | 5.8.4-pre2 (diff) | |
Compute OIDC auth, async Horde agents, and orchestrator improvements (#913)
Rework of the Horde agent subsystem from synchronous per-thread I/O to an async ASIO-driven architecture, plus provisioner scale-down with graceful draining, OIDC authentication, scheduler improvements, and dashboard UI for provisioner control.
### Async Horde Agent Rewrite
- Replace synchronous `HordeAgent` (one thread per agent, blocking I/O) with `AsyncHordeAgent` — an ASIO state machine running on a shared `io_context` thread pool
- Replace `TcpComputeTransport`/`AesComputeTransport` with `AsyncTcpComputeTransport`/`AsyncAesComputeTransport`
- Replace `AgentMessageChannel` with `AsyncAgentMessageChannel` using frame queuing and ASIO timers
- Delete `ComputeBuffer` and `ComputeChannel` ring-buffer classes (no longer needed)
### Provisioner Drain / Scale-Down
- `HordeProvisioner` can now drain agents when target core count is lowered: queries each agent's `/compute/session/status` for workload, selects candidates by largest-fit/lowest-workload, and sends `/compute/session/drain`
- Configurable `--horde-drain-grace-period` (default 300s) before force-kill
- Implement `IProvisionerStateProvider` interface to expose provisioner state to the orchestrator HTTP layer
- Forward `--coordinator-session`, `--provision-clean`, and `--provision-tracehost` through both Horde and Nomad provisioners to spawned workers
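The candidate selection described in the first bullet could look roughly like the sketch below. It is an illustration only, not the provisioner's actual code: `AgentInfo`, `SelectDrainCandidates`, and the field names are hypothetical, and "largest-fit/lowest-workload" is interpreted here as "prefer the largest agent that still fits in the excess core count, breaking ties by lowest reported workload".

```cpp
#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical per-agent snapshot; names are illustrative, not from the commit.
struct AgentInfo
{
    std::string Id;
    uint32_t    Cores    = 0;  // cores contributed by this agent
    uint32_t    Workload = 0;  // e.g. pending + running actions from session/status
};

// Pick agents to drain so the active core count moves toward TargetCores
// without dropping below it.
std::vector<std::string>
SelectDrainCandidates(std::vector<AgentInfo> Agents, uint32_t ActiveCores, uint32_t TargetCores)
{
    std::vector<std::string> Selected;
    if (ActiveCores <= TargetCores)
    {
        return Selected;  // nothing to scale down
    }

    uint32_t Excess = ActiveCores - TargetCores;

    // Largest agents first; among equally sized agents, the least busy first.
    std::sort(Agents.begin(), Agents.end(), [](const AgentInfo& A, const AgentInfo& B) {
        if (A.Cores != B.Cores)
        {
            return A.Cores > B.Cores;
        }
        return A.Workload < B.Workload;
    });

    for (const AgentInfo& Agent : Agents)
    {
        if (Excess == 0)
        {
            break;
        }
        // Only take agents that fit entirely within the remaining excess.
        if (Agent.Cores != 0 && Agent.Cores <= Excess)
        {
            Selected.push_back(Agent.Id);
            Excess -= Agent.Cores;
        }
    }
    return Selected;
}
```

Under this reading, an agent larger than the remaining excess is skipped rather than drained, so the provisioner never undershoots the target.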
### OIDC Authentication
- `HordeClient` accepts an `AccessTokenProvider` (refreshable token function) as an alternative to a static `--horde-token`
- Wire up `OidcToken.exe` auto-discovery via `httpclientauth::CreateFromOidcTokenExecutable` with `--HordeUrl` mode
- New `--horde-oidctoken-exe-path` CLI option for explicit path override
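To illustrate what a "refreshable token function" might look like, here is a minimal sketch of a caching provider. The types `AccessToken`, `TokenFetcher`, and the helper `MakeCachingTokenProvider` are assumptions made for this example; the real `AccessTokenProvider` signature in the codebase may differ.

```cpp
#include <chrono>
#include <functional>
#include <memory>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical shapes for illustration only.
struct AccessToken
{
    std::string                           Value;
    std::chrono::steady_clock::time_point ExpiresAt;
};

using TokenFetcher        = std::function<AccessToken()>;  // e.g. invokes an external token tool
using AccessTokenProvider = std::function<std::string()>;  // called by the client per request

// Wrap a fetcher so a new token is requested only when the cached one is
// missing or within RefreshMargin of expiring.
AccessTokenProvider
MakeCachingTokenProvider(TokenFetcher Fetch, std::chrono::seconds RefreshMargin = std::chrono::seconds(30))
{
    struct State
    {
        std::mutex  Lock;
        AccessToken Cached;
        bool        HasToken = false;
    };
    auto Shared = std::make_shared<State>();

    return [Shared, Fetch = std::move(Fetch), RefreshMargin]() {
        std::lock_guard<std::mutex> Guard(Shared->Lock);
        const auto Now = std::chrono::steady_clock::now();
        if (!Shared->HasToken || Now + RefreshMargin >= Shared->Cached.ExpiresAt)
        {
            Shared->Cached   = Fetch();
            Shared->HasToken = true;
        }
        return Shared->Cached.Value;
    };
}
```

The point of passing a function rather than a string is that long-running provisioners can keep working after the initial token expires, which a static token cannot offer.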
### Orchestrator & Scheduler
- Orchestrator generates a session ID at startup; workers include `coordinator_session` in announcements so the orchestrator can reject stale-session workers
- New `Rejected` action state — when a remote runner declines because it is at capacity, the action is rescheduled without incrementing the retry count
- Reduce scheduler lock contention: snapshot pending actions under shared lock, sort/trim outside the lock
- Parallelize remote action submission across runners via `WorkerThreadPool` with slow-submit warnings
- New action field `FailureReason` populated by all runner types (exit codes, sandbox failures, exceptions)
- New endpoints: `session/drain`, `session/status`, `session/sunset`, `provisioner/status`, `provisioner/target`
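The lock-contention change (snapshot under a shared lock, then sort and trim outside it) follows a general pattern that can be sketched with standard library types. This is a simplified stand-in, not the session code: `Action`, `PendingQueue`, and `PickForScheduling` are hypothetical names, and `std::shared_mutex` replaces the project's own `RwLock`.

```cpp
#include <algorithm>
#include <cstddef>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <vector>

// Hypothetical stand-in for a scheduled action.
struct Action
{
    int Lsn      = 0;
    int Priority = 0;
};

class PendingQueue
{
public:
    void Add(int Lsn, int Priority)
    {
        std::unique_lock<std::shared_mutex> Guard(m_Lock);
        m_Pending[Lsn] = std::make_shared<Action>(Action{Lsn, Priority});
    }

    std::vector<std::shared_ptr<Action>> PickForScheduling(size_t Capacity) const
    {
        std::vector<std::shared_ptr<Action>> Snapshot;
        {
            // Hold the shared lock only long enough to copy references.
            std::shared_lock<std::shared_mutex> Guard(m_Lock);
            Snapshot.reserve(m_Pending.size());
            for (const auto& Entry : m_Pending)
            {
                Snapshot.push_back(Entry.second);
            }
        }

        // The O(N log N) work happens with no lock held, so readers and
        // writers of the map are not blocked by it.
        std::sort(Snapshot.begin(), Snapshot.end(), [](const auto& A, const auto& B) {
            if (A->Priority != B->Priority)
            {
                return A->Priority > B->Priority;  // higher priority first
            }
            return A->Lsn < B->Lsn;  // FIFO within the same priority
        });

        if (Snapshot.size() > Capacity)
        {
            Snapshot.resize(Capacity);
        }
        return Snapshot;
    }

private:
    mutable std::shared_mutex              m_Lock;
    std::map<int, std::shared_ptr<Action>> m_Pending;
};
```

The trade-off is that the snapshot can be slightly stale by the time it is sorted, which is acceptable only if later steps re-check each action's state before acting on it.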
### Remote Execution
- Eager-attach mode for `RemoteHttpRunner` — bundles all attachments upfront in a `CbPackage` for single-roundtrip submits
- Track in-flight submissions to prevent over-queuing
- Show remote runner hostname in `GetDisplayName()`
- `--announce-url` to override the endpoint announced to the coordinator (e.g. relay-visible address)
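The commit message does not say how in-flight submissions are tracked. One common way to bound them is an atomic counter with a compare-and-swap admission check, sketched below; `InFlightLimiter` and its methods are hypothetical and not taken from `RemoteHttpRunner`.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical helper: admits at most MaxInFlight concurrent submissions.
class InFlightLimiter
{
public:
    explicit InFlightLimiter(uint32_t MaxInFlight) : m_Max(MaxInFlight) {}

    // Returns true and reserves a slot if capacity remains, false otherwise.
    bool TryAcquire()
    {
        uint32_t Current = m_InFlight.load(std::memory_order_relaxed);
        while (Current < m_Max)
        {
            // On failure, Current is reloaded with the latest value and the
            // capacity check in the loop condition is repeated.
            if (m_InFlight.compare_exchange_weak(Current,
                                                 Current + 1,
                                                 std::memory_order_acq_rel,
                                                 std::memory_order_relaxed))
            {
                return true;
            }
        }
        return false;
    }

    // Must be called exactly once for each successful TryAcquire().
    void Release() { m_InFlight.fetch_sub(1, std::memory_order_acq_rel); }

    uint32_t InFlight() const { return m_InFlight.load(std::memory_order_relaxed); }

private:
    const uint32_t        m_Max;
    std::atomic<uint32_t> m_InFlight{0};
};
```

A caller would invoke `TryAcquire()` before sending a submit request and `Release()` once the request completes or fails, reporting "at capacity" when no slot is available.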
### Frontend Dashboard
- Delete standalone `compute.html` (925 lines) and `orchestrator.html` (669 lines), consolidated into JS page modules
- Add provisioner panel to orchestrator dashboard: target/active/estimated core counts, draining agent count
- Editable target-cores input with debounced POST to `/orch/provisioner/target`
- Per-agent provisioning status badges (active / draining / deallocated) in the agents table
- Active vs total CPU counts in agents summary row
### CLI
- New `zen compute record-start` / `record-stop` subcommands
- `zen exec` progress bar with submit and completion phases, atomic work counters, `--progress` mode (Pretty/Plain/Quiet)
### Other
- `DataDir` supports environment variable expansion
- Worker manifest validation checks for `worker.zcb` marker to detect incomplete cached directories
- Linux/Mac runners apply `nice(5)` to child processes to avoid starving the main server
- `ComputeService::SetShutdownCallback` wired to `RequestExit` via `session/sunset`
- Curl HTTP client logs effective URL on failure
- `MachineInfo` carries `Pool` and `Mode` from Horde response
- Horde bundle creation includes `.pdb` on Windows
Diffstat (limited to 'src/zencompute')
20 files changed, 848 insertions, 220 deletions
diff --git a/src/zencompute/CLAUDE.md b/src/zencompute/CLAUDE.md
index 750879d5a..bb574edc2 100644
--- a/src/zencompute/CLAUDE.md
+++ b/src/zencompute/CLAUDE.md
@@ -218,6 +218,13 @@ Worker handler logic is extracted into private helpers (`HandleWorkersGet`, `Han
 
 **Locking discipline:** The three action maps (`m_PendingActions`, `m_RunningMap`, `m_ResultsMap`) are guarded by a single `m_ActionMapLock`. This eliminates lock-ordering concerns between maps and prevents actions from being temporarily absent from all maps during state transitions. Runner-level `m_RunningLock` in `LocalProcessRunner` / `RemoteHttpRunner` is a separate lock on a different class — unrelated to the session-level action map lock.
 
+**Lock ordering:** When acquiring multiple session-level locks, always acquire in this order to avoid deadlocks:
+1. `m_ActionMapLock` (session action maps)
+2. `QueueEntry::m_Lock` (per-queue state)
+3. `m_ActionHistoryLock` (action history ring)
+
+Never acquire an earlier lock while holding a later one (e.g. never acquire `m_ActionMapLock` while holding `QueueEntry::m_Lock`).
+
 **Atomic fields** for counters and simple state: queue counts, `CpuUsagePercent`, `CpuSeconds`, `RetryCount`, `RunnerAction::m_ActionState`.
 
 **Update decoupling:** Runners call `PostUpdate(RunnerAction*)` rather than directly mutating service state. The scheduler thread batches and deduplicates updates.
diff --git a/src/zencompute/computeservice.cpp b/src/zencompute/computeservice.cpp index aaf34cbe2..852e93fa0 100644 --- a/src/zencompute/computeservice.cpp +++ b/src/zencompute/computeservice.cpp @@ -121,6 +121,8 @@ struct ComputeServiceSession::Impl , m_LocalSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst)) , m_RemoteSubmitPool(GetLargeWorkerPool(EWorkloadType::Burst)) { + m_RemoteRunnerGroup.SetWorkerPool(&m_RemoteSubmitPool); + // Create a non-expiring, non-deletable implicit queue for legacy endpoints auto Result = CreateQueue("implicit"sv, {}, {}); m_ImplicitQueueId = Result.QueueId; @@ -240,8 +242,9 @@ struct ComputeServiceSession::Impl // Recording - void StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath); - void StopRecording(); + bool StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath); + bool StopRecording(); + bool IsRecording() const; std::unique_ptr<ActionRecorder> m_Recorder; @@ -615,6 +618,7 @@ ComputeServiceSession::Impl::UpdateCoordinatorState() m_KnownWorkerUris.insert(UriStr); auto* NewRunner = new RemoteHttpRunner(m_ChunkResolver, m_OrchestratorBasePath, UriStr, m_RemoteSubmitPool); + NewRunner->SetRemoteHostname(Hostname); SyncWorkersToRunner(*NewRunner); m_RemoteRunnerGroup.AddRunner(NewRunner); } @@ -716,24 +720,44 @@ ComputeServiceSession::Impl::ShutdownRunners() m_RemoteRunnerGroup.Shutdown(); } -void +bool ComputeServiceSession::Impl::StartRecording(ChunkResolver& InCidStore, const std::filesystem::path& RecordingPath) { + if (m_Recorder) + { + ZEN_WARN("recording is already active"); + return false; + } + ZEN_INFO("starting recording to '{}'", RecordingPath); m_Recorder = std::make_unique<ActionRecorder>(InCidStore, RecordingPath); ZEN_INFO("started recording to '{}'", RecordingPath); + return true; } -void +bool ComputeServiceSession::Impl::StopRecording() { + if (!m_Recorder) + { + ZEN_WARN("no recording is active"); + return false; + } + ZEN_INFO("stopping 
recording"); m_Recorder = nullptr; ZEN_INFO("stopped recording"); + return true; +} + +bool +ComputeServiceSession::Impl::IsRecording() const +{ + return m_Recorder != nullptr; } std::vector<ComputeServiceSession::RunningActionInfo> @@ -1128,6 +1152,10 @@ ComputeServiceSession::Impl::GetCompleted(CbWriter& Cbo) Cbo.BeginObject(); Cbo << "lsn"sv << Lsn; Cbo << "state"sv << RunnerAction::ToString(Action->ActionState()); + if (!Action->FailureReason.empty()) + { + Cbo << "reason"sv << Action->FailureReason; + } Cbo.EndObject(); } }); @@ -1416,8 +1444,8 @@ ComputeServiceSession::Impl::GetQueueCompleted(int QueueId, CbWriter& Cbo) if (Queue) { - Queue->m_Lock.WithSharedLock([&] { - m_ActionMapLock.WithSharedLock([&] { + m_ActionMapLock.WithSharedLock([&] { + Queue->m_Lock.WithSharedLock([&] { for (int Lsn : Queue->FinishedLsns) { if (m_ResultsMap.contains(Lsn)) @@ -1530,12 +1558,12 @@ ComputeServiceSession::Impl::SchedulePendingActions() static Stopwatch DumpRunningTimer; auto _ = MakeGuard([&] { - ZEN_INFO("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results", - ScheduledCount, - RunningCount, - m_RetiredCount.load(), - PendingCount, - ResultCount); + ZEN_DEBUG("scheduled {} pending actions. {} running ({} retired), {} still pending, {} results", + ScheduledCount, + RunningCount, + m_RetiredCount.load(), + PendingCount, + ResultCount); if (DumpRunningTimer.GetElapsedTimeMs() > 30000) { @@ -1584,13 +1612,13 @@ ComputeServiceSession::Impl::SchedulePendingActions() // Also note that the m_PendingActions list is not maintained // here, that's done periodically in SchedulePendingActions() - m_ActionMapLock.WithExclusiveLock([&] { - if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused) - { - return; - } + // Extract pending actions under a shared lock — we only need to read + // the map and take Ref copies. ActionState() is atomic so this is safe. 
+ // Sorting and capacity trimming happen outside the lock to avoid + // blocking HTTP handlers on O(N log N) work with large pending queues. - if (m_PendingActions.empty()) + m_ActionMapLock.WithSharedLock([&] { + if (m_SessionState.load(std::memory_order_relaxed) >= SessionState::Paused) { return; } @@ -1610,6 +1638,7 @@ ComputeServiceSession::Impl::SchedulePendingActions() case RunnerAction::State::Completed: case RunnerAction::State::Failed: case RunnerAction::State::Abandoned: + case RunnerAction::State::Rejected: case RunnerAction::State::Cancelled: break; @@ -1620,30 +1649,30 @@ ComputeServiceSession::Impl::SchedulePendingActions() } } - // Sort by priority descending, then by LSN ascending (FIFO within same priority) - std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const Ref<RunnerAction>& A, const Ref<RunnerAction>& B) { - if (A->Priority != B->Priority) - { - return A->Priority > B->Priority; - } - return A->ActionLsn < B->ActionLsn; - }); + PendingCount = m_PendingActions.size(); + }); - if (ActionsToSchedule.size() > Capacity) + // Sort by priority descending, then by LSN ascending (FIFO within same priority) + std::sort(ActionsToSchedule.begin(), ActionsToSchedule.end(), [](const Ref<RunnerAction>& A, const Ref<RunnerAction>& B) { + if (A->Priority != B->Priority) { - ActionsToSchedule.resize(Capacity); + return A->Priority > B->Priority; } - - PendingCount = m_PendingActions.size(); + return A->ActionLsn < B->ActionLsn; }); + if (ActionsToSchedule.size() > Capacity) + { + ActionsToSchedule.resize(Capacity); + } + if (ActionsToSchedule.empty()) { _.Dismiss(); return; } - ZEN_INFO("attempting schedule of {} pending actions", ActionsToSchedule.size()); + ZEN_DEBUG("attempting schedule of {} pending actions", ActionsToSchedule.size()); Stopwatch SubmitTimer; std::vector<SubmitResult> SubmitResults = SubmitActions(ActionsToSchedule); @@ -1663,10 +1692,10 @@ ComputeServiceSession::Impl::SchedulePendingActions() } } - ZEN_INFO("scheduled {} 
pending actions in {} ({} rejected)", - ScheduledActionCount, - NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), - NotAcceptedCount); + ZEN_DEBUG("scheduled {} pending actions in {} ({} rejected)", + ScheduledActionCount, + NiceTimeSpanMs(SubmitTimer.GetElapsedTimeMs()), + NotAcceptedCount); ScheduledCount += ScheduledActionCount; PendingCount -= ScheduledActionCount; @@ -1975,6 +2004,14 @@ ComputeServiceSession::Impl::HandleActionUpdates() break; } + // Rejected — runner was at capacity, reschedule without retry cost + case RunnerAction::State::Rejected: + { + Action->ResetActionStateToPending(); + ZEN_DEBUG("action {} ({}) rescheduled after runner rejection", Action->ActionId, ActionLsn); + break; + } + // Terminal states — move to results, record history, notify queue case RunnerAction::State::Completed: case RunnerAction::State::Failed: @@ -2009,6 +2046,14 @@ ComputeServiceSession::Impl::HandleActionUpdates() MaxRetries); break; } + else + { + ZEN_WARN("action {} ({}) {} after {} retries, not rescheduling", + Action->ActionId, + ActionLsn, + RunnerAction::ToString(TerminalState), + Action->RetryCount.load(std::memory_order_relaxed)); + } } m_ActionMapLock.WithExclusiveLock([&] { @@ -2101,10 +2146,9 @@ ComputeServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& ZEN_TRACE_CPU("ComputeServiceSession::SubmitActions"); std::vector<SubmitResult> Results(Actions.size()); - // First try submitting the batch to local runners in parallel + // First try submitting the batch to local runners std::vector<SubmitResult> LocalResults = m_LocalRunnerGroup.SubmitActions(Actions); - std::vector<size_t> RemoteIndices; std::vector<Ref<RunnerAction>> RemoteActions; for (size_t i = 0; i < Actions.size(); ++i) @@ -2115,20 +2159,40 @@ ComputeServiceSession::Impl::SubmitActions(const std::vector<Ref<RunnerAction>>& } else { - RemoteIndices.push_back(i); RemoteActions.push_back(Actions[i]); + Results[i] = SubmitResult{.IsAccepted = true, .Reason = "dispatched to 
remote"}; } } - // Submit remaining actions to remote runners in parallel + // Dispatch remaining actions to remote runners asynchronously. + // Mark actions as Submitting so the scheduler won't re-pick them. + // The remote runner will transition them to Running on success, or + // we mark them Failed on rejection so HandleActionUpdates retries. if (!RemoteActions.empty()) { - std::vector<SubmitResult> RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions); - - for (size_t j = 0; j < RemoteIndices.size(); ++j) + for (const Ref<RunnerAction>& Action : RemoteActions) { - Results[RemoteIndices[j]] = std::move(RemoteResults[j]); + Action->SetActionState(RunnerAction::State::Submitting); } + + m_RemoteSubmitPool.ScheduleWork( + [this, RemoteActions = std::move(RemoteActions)]() { + std::vector<SubmitResult> RemoteResults = m_RemoteRunnerGroup.SubmitActions(RemoteActions); + + for (size_t j = 0; j < RemoteResults.size(); ++j) + { + if (!RemoteResults[j].IsAccepted) + { + ZEN_DEBUG("remote submission rejected for action {} ({}): {}", + RemoteActions[j]->ActionId, + RemoteActions[j]->ActionLsn, + RemoteResults[j].Reason); + + RemoteActions[j]->SetActionState(RunnerAction::State::Rejected); + } + } + }, + WorkerThreadPool::EMode::EnableBacklog); } return Results; @@ -2194,16 +2258,22 @@ ComputeServiceSession::NotifyOrchestratorChanged() m_Impl->NotifyOrchestratorChanged(); } -void +bool ComputeServiceSession::StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath) { - m_Impl->StartRecording(InResolver, RecordingPath); + return m_Impl->StartRecording(InResolver, RecordingPath); } -void +bool ComputeServiceSession::StopRecording() { - m_Impl->StopRecording(); + return m_Impl->StopRecording(); +} + +bool +ComputeServiceSession::IsRecording() const +{ + return m_Impl->IsRecording(); } ComputeServiceSession::ActionCounts diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp index 6cb975dd3..8cbb25afd 
100644 --- a/src/zencompute/httpcomputeservice.cpp +++ b/src/zencompute/httpcomputeservice.cpp @@ -62,6 +62,8 @@ struct HttpComputeService::Impl RwLock m_WsConnectionsLock; std::vector<Ref<WebSocketConnection>> m_WsConnections; + std::function<void()> m_ShutdownCallback; + // Metrics metrics::OperationTiming m_HttpRequests; @@ -190,6 +192,65 @@ HttpComputeService::Impl::RegisterRoutes() HttpVerb::kPost); m_Router.RegisterRoute( + "session/drain", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Draining)) + { + CbObjectWriter Cbo; + Cbo << "state"sv << ToString(m_ComputeService.GetSessionState()); + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + + CbObjectWriter Cbo; + Cbo << "error"sv + << "Cannot transition to Draining from current state"sv; + HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( + "session/status", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObjectWriter Cbo; + Cbo << "state"sv << ToString(m_ComputeService.GetSessionState()); + auto Counts = m_ComputeService.GetActionCounts(); + Cbo << "actions_pending"sv << Counts.Pending; + Cbo << "actions_running"sv << Counts.Running; + Cbo << "actions_completed"sv << Counts.Completed; + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "session/sunset", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + if (m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Sunset)) + { + CbObjectWriter Cbo; + Cbo << "state"sv << ToString(m_ComputeService.GetSessionState()); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + + if (m_ShutdownCallback) + { + m_ShutdownCallback(); + } + return; + } + + CbObjectWriter Cbo; + Cbo << "error"sv 
+ << "Cannot transition to Sunset from current state"sv; + HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( "workers", [this](HttpRouterRequest& Req) { HandleWorkersGet(Req.ServerRequest()); }, HttpVerb::kGet); @@ -506,9 +567,19 @@ HttpComputeService::Impl::RegisterRoutes() return HttpReq.WriteResponse(HttpResponseCode::Forbidden); } - m_ComputeService.StartRecording(m_CombinedResolver, m_BaseDir / "recording"); + std::filesystem::path RecordingPath = m_BaseDir / "recording"; - return HttpReq.WriteResponse(HttpResponseCode::OK); + if (!m_ComputeService.StartRecording(m_CombinedResolver, RecordingPath)) + { + CbObjectWriter Cbo; + Cbo << "error" + << "recording is already active"; + return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + } + + CbObjectWriter Cbo; + Cbo << "path" << RecordingPath.string(); + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kPost); @@ -522,9 +593,19 @@ HttpComputeService::Impl::RegisterRoutes() return HttpReq.WriteResponse(HttpResponseCode::Forbidden); } - m_ComputeService.StopRecording(); + std::filesystem::path RecordingPath = m_BaseDir / "recording"; + + if (!m_ComputeService.StopRecording()) + { + CbObjectWriter Cbo; + Cbo << "error" + << "no recording is active"; + return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + } - return HttpReq.WriteResponse(HttpResponseCode::OK); + CbObjectWriter Cbo; + Cbo << "path" << RecordingPath.string(); + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kPost); @@ -1066,6 +1147,12 @@ HttpComputeService::GetActionCounts() return m_Impl->m_ComputeService.GetActionCounts(); } +void +HttpComputeService::SetShutdownCallback(std::function<void()> Callback) +{ + m_Impl->m_ShutdownCallback = std::move(Callback); +} + const char* HttpComputeService::BaseUri() const { diff --git a/src/zencompute/httporchestrator.cpp 
b/src/zencompute/httporchestrator.cpp index d92af8716..1f51e560e 100644 --- a/src/zencompute/httporchestrator.cpp +++ b/src/zencompute/httporchestrator.cpp @@ -7,6 +7,7 @@ # include <zencompute/orchestratorservice.h> # include <zencore/compactbinarybuilder.h> # include <zencore/logging.h> +# include <zencore/session.h> # include <zencore/string.h> # include <zencore/system.h> @@ -77,10 +78,47 @@ ParseWorkerAnnouncement(const CbObjectView& Data, OrchestratorService::WorkerAnn return Ann.Id; } +static OrchestratorService::WorkerAnnotator +MakeWorkerAnnotator(IProvisionerStateProvider* Prov) +{ + if (!Prov) + { + return {}; + } + return [Prov](std::string_view WorkerId, CbObjectWriter& Cbo) { + AgentProvisioningStatus Status = Prov->GetAgentStatus(WorkerId); + if (Status != AgentProvisioningStatus::Unknown) + { + const char* StatusStr = (Status == AgentProvisioningStatus::Draining) ? "draining" : "active"; + Cbo << "provisioner_status" << std::string_view(StatusStr); + } + }; +} + +bool +HttpOrchestratorService::ValidateCoordinatorSession(const CbObjectView& Data, std::string_view WorkerId) +{ + std::string_view SessionStr = Data["coordinator_session"].AsString(""); + if (SessionStr.empty()) + { + return true; // backwards compatibility: accept announcements without a session + } + Oid Session = Oid::TryFromHexString(SessionStr); + if (Session == m_SessionId) + { + return true; + } + ZEN_WARN("rejecting stale announcement from '{}' (session {} != {})", WorkerId, SessionStr, m_SessionId.ToString()); + return false; +} + HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir, bool EnableWorkerWebSocket) : m_Service(std::make_unique<OrchestratorService>(std::move(DataDir), EnableWorkerWebSocket)) , m_Hostname(GetMachineName()) { + m_SessionId = zen::GetSessionId(); + ZEN_INFO("orchestrator session id: {}", m_SessionId.ToString()); + m_Router.AddMatcher("workerid", [](std::string_view Segment) { return IsValidWorkerId(Segment); }); 
m_Router.AddMatcher("clientid", [](std::string_view Segment) { return IsValidWorkerId(Segment); }); @@ -95,13 +133,17 @@ HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir, [this](HttpRouterRequest& Req) { CbObjectWriter Cbo; Cbo << "hostname" << std::string_view(m_Hostname); + Cbo << "session_id" << m_SessionId.ToString(); Req.ServerRequest().WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "provision", - [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_Service->GetWorkerList()); }, + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, + m_Service->GetWorkerList(MakeWorkerAnnotator(m_Provisioner.load(std::memory_order_acquire)))); + }, HttpVerb::kPost); m_Router.RegisterRoute( @@ -122,6 +164,11 @@ HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir, "characters and uri must start with http:// or https://"); } + if (!ValidateCoordinatorSession(Data, WorkerId)) + { + return HttpReq.WriteResponse(HttpResponseCode::Conflict, HttpContentType::kText, "Stale coordinator session"); + } + m_Service->AnnounceWorker(Ann); HttpReq.WriteResponse(HttpResponseCode::OK); @@ -135,7 +182,10 @@ HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir, m_Router.RegisterRoute( "agents", - [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK, m_Service->GetWorkerList()); }, + [this](HttpRouterRequest& Req) { + Req.ServerRequest().WriteResponse(HttpResponseCode::OK, + m_Service->GetWorkerList(MakeWorkerAnnotator(m_Provisioner.load(std::memory_order_acquire)))); + }, HttpVerb::kGet); m_Router.RegisterRoute( @@ -241,6 +291,59 @@ HttpOrchestratorService::HttpOrchestratorService(std::filesystem::path DataDir, }, HttpVerb::kGet); + // Provisioner endpoints + + m_Router.RegisterRoute( + "provisioner/status", + [this](HttpRouterRequest& Req) { + 
HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObjectWriter Cbo; + if (IProvisionerStateProvider* Prov = m_Provisioner.load(std::memory_order_acquire)) + { + Cbo << "name" << Prov->GetName(); + Cbo << "target_cores" << Prov->GetTargetCoreCount(); + Cbo << "estimated_cores" << Prov->GetEstimatedCoreCount(); + Cbo << "active_cores" << Prov->GetActiveCoreCount(); + Cbo << "agents" << Prov->GetAgentCount(); + Cbo << "agents_draining" << Prov->GetDrainingAgentCount(); + } + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "provisioner/target", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + CbObject Data = HttpReq.ReadPayloadObject(); + int32_t Cores = Data["target_cores"].AsInt32(-1); + + ZEN_INFO("provisioner/target: received request (target_cores={}, payload_valid={})", Cores, Data ? true : false); + + if (Cores < 0) + { + ZEN_WARN("provisioner/target: bad request (target_cores={})", Cores); + return HttpReq.WriteResponse(HttpResponseCode::BadRequest, HttpContentType::kText, "Missing or invalid target_cores field"); + } + + IProvisionerStateProvider* Prov = m_Provisioner.load(std::memory_order_acquire); + if (!Prov) + { + ZEN_WARN("provisioner/target: no provisioner configured"); + return HttpReq.WriteResponse(HttpResponseCode::NotFound, HttpContentType::kText, "No provisioner configured"); + } + + ZEN_INFO("provisioner/target: setting target to {} cores", Cores); + Prov->SetTargetCoreCount(static_cast<uint32_t>(Cores)); + + CbObjectWriter Cbo; + Cbo << "target_cores" << Prov->GetTargetCoreCount(); + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + }, + HttpVerb::kPost); + // Client tracking endpoints m_Router.RegisterRoute( @@ -411,6 +514,13 @@ HttpOrchestratorService::HandleRequest(HttpServerRequest& Request) } } +void +HttpOrchestratorService::SetProvisionerStateProvider(IProvisionerStateProvider* Provider) +{ + m_Provisioner.store(Provider, 
std::memory_order_release); + m_Service->SetProvisionerStateProvider(Provider); +} + ////////////////////////////////////////////////////////////////////////// // // IWebSocketHandler @@ -488,6 +598,11 @@ HttpOrchestratorService::HandleWorkerWebSocketMessage(const WebSocketMessage& Ms return {}; } + if (!ValidateCoordinatorSession(Data, WorkerId)) + { + return {}; + } + m_Service->AnnounceWorker(Ann); return std::string(WorkerId); } @@ -563,7 +678,7 @@ HttpOrchestratorService::PushThreadFunction() } // Build combined JSON with worker list, provisioning history, clients, and client history - CbObject WorkerList = m_Service->GetWorkerList(); + CbObject WorkerList = m_Service->GetWorkerList(MakeWorkerAnnotator(m_Provisioner.load(std::memory_order_acquire))); CbObject History = m_Service->GetProvisioningHistory(50); CbObject ClientList = m_Service->GetClientList(); CbObject ClientHistory = m_Service->GetClientHistory(50); @@ -615,6 +730,20 @@ HttpOrchestratorService::PushThreadFunction() JsonBuilder.Append(ClientHistoryJsonView.substr(1, ClientHistoryJsonView.size() - 2)); } + // Emit provisioner stats if available + if (IProvisionerStateProvider* Prov = m_Provisioner.load(std::memory_order_acquire)) + { + JsonBuilder.Append( + fmt::format(",\"provisioner\":{{\"name\":\"{}\",\"target_cores\":{},\"estimated_cores\":{}" + ",\"active_cores\":{},\"agents\":{},\"agents_draining\":{}}}", + Prov->GetName(), + Prov->GetTargetCoreCount(), + Prov->GetEstimatedCoreCount(), + Prov->GetActiveCoreCount(), + Prov->GetAgentCount(), + Prov->GetDrainingAgentCount())); + } + JsonBuilder.Append("}"); std::string_view Json = JsonBuilder.ToView(); diff --git a/src/zencompute/include/zencompute/computeservice.h b/src/zencompute/include/zencompute/computeservice.h index ad556f546..97de4321a 100644 --- a/src/zencompute/include/zencompute/computeservice.h +++ b/src/zencompute/include/zencompute/computeservice.h @@ -279,7 +279,7 @@ public: // sized to match RunnerAction::State::_Count but we 
can't use the enum here // for dependency reasons, so just use a fixed size array and static assert in // the implementation file - uint64_t Timestamps[9] = {}; + uint64_t Timestamps[10] = {}; }; [[nodiscard]] std::vector<ActionHistoryEntry> GetActionHistory(int Limit = 100); @@ -305,8 +305,9 @@ public: // Recording - void StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath); - void StopRecording(); + bool StartRecording(ChunkResolver& InResolver, const std::filesystem::path& RecordingPath); + bool StopRecording(); + bool IsRecording() const; private: void PostUpdate(RunnerAction* Action); diff --git a/src/zencompute/include/zencompute/httpcomputeservice.h b/src/zencompute/include/zencompute/httpcomputeservice.h index db3fce3c2..32f54f293 100644 --- a/src/zencompute/include/zencompute/httpcomputeservice.h +++ b/src/zencompute/include/zencompute/httpcomputeservice.h @@ -35,6 +35,10 @@ public: void Shutdown(); + /** Set a callback to be invoked when the session/sunset endpoint is hit. + * Typically wired to HttpServer::RequestExit() to shut down the process. */ + void SetShutdownCallback(std::function<void()> Callback); + [[nodiscard]] ComputeServiceSession::ActionCounts GetActionCounts(); const char* BaseUri() const override; diff --git a/src/zencompute/include/zencompute/httporchestrator.h b/src/zencompute/include/zencompute/httporchestrator.h index 58b2c9152..ef0a1269a 100644 --- a/src/zencompute/include/zencompute/httporchestrator.h +++ b/src/zencompute/include/zencompute/httporchestrator.h @@ -2,10 +2,12 @@ #pragma once +#include <zencompute/provisionerstate.h> #include <zencompute/zencompute.h> #include <zencore/logging.h> #include <zencore/thread.h> +#include <zencore/uid.h> #include <zenhttp/httpserver.h> #include <zenhttp/websocket.h> @@ -65,6 +67,16 @@ public: */ void Shutdown(); + /** Return the session ID generated at construction time. 
Provisioners + * pass this to spawned workers so the orchestrator can reject stale + * announcements from previous sessions. */ + Oid GetSessionId() const { return m_SessionId; } + + /** Register a provisioner whose target core count can be read and changed + * via the orchestrator HTTP API and dashboard. Caller retains ownership; + * the provider must outlive this service. */ + void SetProvisionerStateProvider(IProvisionerStateProvider* Provider); + virtual const char* BaseUri() const override; virtual void HandleRequest(HttpServerRequest& Request) override; @@ -81,6 +93,11 @@ private: std::unique_ptr<OrchestratorService> m_Service; std::string m_Hostname; + Oid m_SessionId; + bool ValidateCoordinatorSession(const CbObjectView& Data, std::string_view WorkerId); + + std::atomic<IProvisionerStateProvider*> m_Provisioner{nullptr}; + // WebSocket push #if ZEN_WITH_WEBSOCKETS diff --git a/src/zencompute/include/zencompute/orchestratorservice.h b/src/zencompute/include/zencompute/orchestratorservice.h index 549ff8e3c..2c49e22df 100644 --- a/src/zencompute/include/zencompute/orchestratorservice.h +++ b/src/zencompute/include/zencompute/orchestratorservice.h @@ -6,6 +6,7 @@ #if ZEN_WITH_COMPUTE_SERVICES +# include <zencompute/provisionerstate.h> # include <zencore/compactbinary.h> # include <zencore/compactbinarybuilder.h> # include <zencore/logbase.h> @@ -90,9 +91,16 @@ public: std::string Hostname; }; - CbObject GetWorkerList(); + /** Per-worker callback invoked during GetWorkerList serialization. + * The callback receives the worker ID and a CbObjectWriter positioned + * inside the worker's object, allowing the caller to append extra fields. 
*/ + using WorkerAnnotator = std::function<void(std::string_view WorkerId, CbObjectWriter& Cbo)>; + + CbObject GetWorkerList(const WorkerAnnotator& Annotate = {}); void AnnounceWorker(const WorkerAnnouncement& Announcement); + void SetProvisionerStateProvider(IProvisionerStateProvider* Provider); + bool IsWorkerWebSocketEnabled() const; void SetWorkerWebSocketConnected(std::string_view WorkerId, bool Connected); @@ -171,6 +179,8 @@ private: LoggerRef m_Log{"compute.orchestrator"}; bool m_EnableWorkerWebSocket = false; + std::atomic<IProvisionerStateProvider*> m_Provisioner{nullptr}; + std::thread m_ProbeThread; std::atomic<bool> m_ProbeThreadEnabled{true}; Event m_ProbeThreadEvent; diff --git a/src/zencompute/include/zencompute/provisionerstate.h b/src/zencompute/include/zencompute/provisionerstate.h new file mode 100644 index 000000000..e9af8a635 --- /dev/null +++ b/src/zencompute/include/zencompute/provisionerstate.h @@ -0,0 +1,38 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#pragma once + +#include <cstdint> +#include <string_view> + +namespace zen::compute { + +/** Per-agent provisioning status as seen by the provisioner. */ +enum class AgentProvisioningStatus +{ + Unknown, ///< Not known to the provisioner + Active, ///< Running and allocated + Draining, ///< Being gracefully deprovisioned +}; + +/** Abstract interface for querying and controlling a provisioner from the HTTP layer. + * This decouples the orchestrator service from specific provisioner implementations. */ +class IProvisionerStateProvider +{ +public: + virtual ~IProvisionerStateProvider() = default; + + virtual std::string_view GetName() const = 0; ///< e.g. 
"horde", "nomad" + virtual uint32_t GetTargetCoreCount() const = 0; + virtual uint32_t GetEstimatedCoreCount() const = 0; + virtual uint32_t GetActiveCoreCount() const = 0; + virtual uint32_t GetAgentCount() const = 0; + virtual uint32_t GetDrainingAgentCount() const { return 0; } + virtual void SetTargetCoreCount(uint32_t Count) = 0; + + /** Return the provisioning status for a worker by its orchestrator ID + * (e.g. "horde-{LeaseId}"). Returns Unknown if the ID is not recognized. */ + virtual AgentProvisioningStatus GetAgentStatus(std::string_view /*WorkerId*/) const { return AgentProvisioningStatus::Unknown; } +}; + +} // namespace zen::compute diff --git a/src/zencompute/orchestratorservice.cpp b/src/zencompute/orchestratorservice.cpp index 9ea695305..aee8fa63a 100644 --- a/src/zencompute/orchestratorservice.cpp +++ b/src/zencompute/orchestratorservice.cpp @@ -31,7 +31,7 @@ OrchestratorService::~OrchestratorService() } CbObject -OrchestratorService::GetWorkerList() +OrchestratorService::GetWorkerList(const WorkerAnnotator& Annotate) { ZEN_TRACE_CPU("OrchestratorService::GetWorkerList"); CbObjectWriter Cbo; @@ -71,6 +71,10 @@ OrchestratorService::GetWorkerList() Cbo << "ws_connected" << true; } Cbo << "dt" << Worker.LastSeen.GetElapsedTimeMs(); + if (Annotate) + { + Annotate(WorkerId, Cbo); + } Cbo.EndObject(); } }); @@ -144,6 +148,12 @@ OrchestratorService::AnnounceWorker(const WorkerAnnouncement& Ann) } } +void +OrchestratorService::SetProvisionerStateProvider(IProvisionerStateProvider* Provider) +{ + m_Provisioner.store(Provider, std::memory_order_release); +} + bool OrchestratorService::IsWorkerWebSocketEnabled() const { @@ -607,6 +617,14 @@ OrchestratorService::ProbeThreadFunction() continue; } + // Check if the provisioner knows this worker is draining — if so, + // unreachability is expected and should not be logged as a warning. 
+ bool IsDraining = false; + if (IProvisionerStateProvider* Prov = m_Provisioner.load(std::memory_order_acquire)) + { + IsDraining = Prov->GetAgentStatus(Snap.Id) == AgentProvisioningStatus::Draining; + } + ReachableState NewState = ReachableState::Unreachable; try @@ -621,7 +639,10 @@ OrchestratorService::ProbeThreadFunction() } catch (const std::exception& Ex) { - ZEN_WARN("probe failed for worker {} ({}): {}", Snap.Id, Snap.Uri, Ex.what()); + if (!IsDraining) + { + ZEN_WARN("probe failed for worker {} ({}): {}", Snap.Id, Snap.Uri, Ex.what()); + } } ReachableState PrevState = ReachableState::Unknown; @@ -646,6 +667,10 @@ OrchestratorService::ProbeThreadFunction() { ZEN_INFO("worker {} ({}) is now reachable", Snap.Id, Snap.Uri); } + else if (IsDraining) + { + ZEN_INFO("worker {} ({}) shut down (draining)", Snap.Id, Snap.Uri); + } else if (PrevState == ReachableState::Reachable) { ZEN_WARN("worker {} ({}) is no longer reachable", Snap.Id, Snap.Uri); diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp index 67e12b84e..ab22c6363 100644 --- a/src/zencompute/runners/functionrunner.cpp +++ b/src/zencompute/runners/functionrunner.cpp @@ -6,9 +6,15 @@ # include <zencore/compactbinary.h> # include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/logging.h> +# include <zencore/string.h> +# include <zencore/timer.h> # include <zencore/trace.h> +# include <zencore/workthreadpool.h> # include <fmt/format.h> +# include <future> # include <vector> namespace zen::compute { @@ -118,23 +124,34 @@ std::vector<SubmitResult> BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) { ZEN_TRACE_CPU("BaseRunnerGroup::SubmitActions"); - RwLock::SharedLockScope _(m_RunnersLock); - const int RunnerCount = gsl::narrow<int>(m_Runners.size()); + // Snapshot runners and query capacity under the lock, then release + // before submitting — HTTP submissions to remote runners can take + // hundreds of 
milliseconds and we must not hold m_RunnersLock during I/O. - if (RunnerCount == 0) - { - return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"}); - } + std::vector<Ref<FunctionRunner>> Runners; + std::vector<size_t> Capacities; + std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions; + size_t TotalCapacity = 0; - // Query capacity per runner and compute total - std::vector<size_t> Capacities(RunnerCount); - size_t TotalCapacity = 0; + m_RunnersLock.WithSharedLock([&] { + const int RunnerCount = gsl::narrow<int>(m_Runners.size()); + Runners.assign(m_Runners.begin(), m_Runners.end()); + Capacities.resize(RunnerCount); + PerRunnerActions.resize(RunnerCount); - for (int i = 0; i < RunnerCount; ++i) + for (int i = 0; i < RunnerCount; ++i) + { + Capacities[i] = Runners[i]->QueryCapacity(); + TotalCapacity += Capacities[i]; + } + }); + + const int RunnerCount = gsl::narrow<int>(Runners.size()); + + if (RunnerCount == 0) { - Capacities[i] = m_Runners[i]->QueryCapacity(); - TotalCapacity += Capacities[i]; + return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "No runners available"}); } if (TotalCapacity == 0) @@ -143,9 +160,8 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) } // Distribute actions across runners proportionally to their available capacity - std::vector<std::vector<Ref<RunnerAction>>> PerRunnerActions(RunnerCount); - std::vector<size_t> ActionRunnerIndex(Actions.size()); - size_t ActionIdx = 0; + std::vector<size_t> ActionRunnerIndex(Actions.size()); + size_t ActionIdx = 0; for (int i = 0; i < RunnerCount; ++i) { @@ -176,14 +192,74 @@ BaseRunnerGroup::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) } } - // Submit batches per runner + // Submit batches per runner — in parallel when a worker pool is available + std::vector<std::vector<SubmitResult>> PerRunnerResults(RunnerCount); + int ActiveRunnerCount = 
0; for (int i = 0; i < RunnerCount; ++i) { if (!PerRunnerActions[i].empty()) { - PerRunnerResults[i] = m_Runners[i]->SubmitActions(PerRunnerActions[i]); + ++ActiveRunnerCount; + } + } + + static constexpr uint64_t SubmitWarnThresholdMs = 500; + + auto SubmitToRunner = [&](int RunnerIndex) { + auto& Runner = Runners[RunnerIndex]; + Runner->m_LastSubmitStats.Reset(); + + Stopwatch Timer; + + PerRunnerResults[RunnerIndex] = Runner->SubmitActions(PerRunnerActions[RunnerIndex]); + + uint64_t ElapsedMs = Timer.GetElapsedTimeMs(); + if (ElapsedMs >= SubmitWarnThresholdMs) + { + size_t Attachments = Runner->m_LastSubmitStats.TotalAttachments.load(std::memory_order_relaxed); + uint64_t AttachmentBytes = Runner->m_LastSubmitStats.TotalAttachmentBytes.load(std::memory_order_relaxed); + + ZEN_WARN("submit of {} actions ({} attachments, {}) to '{}' took {}ms", + PerRunnerActions[RunnerIndex].size(), + Attachments, + NiceBytes(AttachmentBytes), + Runner->GetDisplayName(), + ElapsedMs); + } + }; + + if (m_WorkerPool && ActiveRunnerCount > 1) + { + std::vector<std::future<void>> Futures(RunnerCount); + + for (int i = 0; i < RunnerCount; ++i) + { + if (!PerRunnerActions[i].empty()) + { + std::packaged_task<void()> Task([&SubmitToRunner, i]() { SubmitToRunner(i); }); + + Futures[i] = m_WorkerPool->EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog); + } + } + + for (int i = 0; i < RunnerCount; ++i) + { + if (Futures[i].valid()) + { + Futures[i].get(); + } + } + } + else + { + for (int i = 0; i < RunnerCount; ++i) + { + if (!PerRunnerActions[i].empty()) + { + SubmitToRunner(i); + } } } @@ -309,10 +385,11 @@ RunnerAction::RetractAction() bool RunnerAction::ResetActionStateToPending() { - // Only allow reset from Failed, Abandoned, or Retracted states + // Only allow reset from Failed, Abandoned, Rejected, or Retracted states State CurrentState = m_ActionState.load(); - if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != 
State::Retracted) + if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Rejected && + CurrentState != State::Retracted) { return false; } @@ -333,11 +410,12 @@ RunnerAction::ResetActionStateToPending() // Clear execution fields ExecutionLocation.clear(); + FailureReason.clear(); CpuUsagePercent.store(-1.0f, std::memory_order_relaxed); CpuSeconds.store(0.0f, std::memory_order_relaxed); - // Increment retry count (skip for Retracted — nothing failed) - if (CurrentState != State::Retracted) + // Increment retry count (skip for Retracted/Rejected — nothing failed) + if (CurrentState != State::Retracted && CurrentState != State::Rejected) { RetryCount.fetch_add(1, std::memory_order_relaxed); } diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h index 56c3f3af0..449f0e228 100644 --- a/src/zencompute/runners/functionrunner.h +++ b/src/zencompute/runners/functionrunner.h @@ -10,6 +10,10 @@ # include <filesystem> # include <vector> +namespace zen { +class WorkerThreadPool; +} + namespace zen::compute { struct SubmitResult @@ -37,6 +41,22 @@ public: [[nodiscard]] virtual bool IsHealthy() = 0; [[nodiscard]] virtual size_t QueryCapacity(); [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions); + [[nodiscard]] virtual std::string_view GetDisplayName() const { return "local"; } + + // Accumulated stats from the most recent SubmitActions call. + // Reset before each call, populated by the runner implementation. + struct SubmitStats + { + std::atomic<size_t> TotalAttachments{0}; + std::atomic<uint64_t> TotalAttachmentBytes{0}; + + void Reset() + { + TotalAttachments.store(0, std::memory_order_relaxed); + TotalAttachmentBytes.store(0, std::memory_order_relaxed); + } + }; + SubmitStats m_LastSubmitStats; // Best-effort cancellation of a specific in-flight action. Returns true if the // cancellation signal was successfully sent. 
The action will transition to Cancelled @@ -68,6 +88,8 @@ public: bool CancelAction(int ActionLsn); void CancelRemoteQueue(int QueueId); + void SetWorkerPool(WorkerThreadPool* Pool) { m_WorkerPool = Pool; } + size_t GetRunnerCount() { return m_RunnersLock.WithSharedLock([this] { return m_Runners.size(); }); @@ -79,6 +101,7 @@ protected: RwLock m_RunnersLock; std::vector<Ref<FunctionRunner>> m_Runners; std::atomic<int> m_NextSubmitIndex{0}; + WorkerThreadPool* m_WorkerPool = nullptr; }; /** Typed RunnerGroup that adds type-safe runner addition and predicate-based removal. @@ -151,6 +174,7 @@ struct RunnerAction : public RefCounted CbObject ActionObj; int Priority = 0; std::string ExecutionLocation; // "local" or remote hostname + std::string FailureReason; // human-readable reason when action fails (empty on success) // CPU usage and total CPU time of the running process, sampled periodically by the local runner. // CpuUsagePercent: -1.0 means not yet sampled; >=0.0 is the most recent reading as a percentage. @@ -168,6 +192,7 @@ struct RunnerAction : public RefCounted Completed, // Finished successfully with results available Failed, // Execution failed (transient error, eligible for retry) Abandoned, // Infrastructure termination (e.g. spot eviction, session abandon) + Rejected, // Runner declined (e.g. 
at capacity) — rescheduled without retry cost Cancelled, // Intentional user cancellation (never retried) Retracted, // Pulled back for rescheduling on a different runner (no retry cost) _Count @@ -194,6 +219,8 @@ struct RunnerAction : public RefCounted return "Failed"; case State::Abandoned: return "Abandoned"; + case State::Rejected: + return "Rejected"; case State::Cancelled: return "Cancelled"; case State::Retracted: diff --git a/src/zencompute/runners/linuxrunner.cpp b/src/zencompute/runners/linuxrunner.cpp index 9055005d9..ce5bbdcc8 100644 --- a/src/zencompute/runners/linuxrunner.cpp +++ b/src/zencompute/runners/linuxrunner.cpp @@ -430,7 +430,8 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action) if (ChildPid == 0) { - // Child process + // Child process — lower priority so workers don't starve the main server + nice(5); if (m_Sandboxed) { @@ -481,7 +482,8 @@ LinuxProcessRunner::SubmitAction(Ref<RunnerAction> Action) // Clean up the sandbox in the background m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath)); - ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf); + Action->FailureReason = fmt::format("sandbox setup failed: {}", ErrBuf); + ZEN_ERROR("action {} ({}): {}", Action->ActionId, Action->ActionLsn, Action->FailureReason); Action->SetActionState(RunnerAction::State::Failed); return SubmitResult{.IsAccepted = false}; diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp index 1b748c0e5..96cbdc134 100644 --- a/src/zencompute/runners/localrunner.cpp +++ b/src/zencompute/runners/localrunner.cpp @@ -357,14 +357,21 @@ LocalProcessRunner::ManifestWorker(const WorkerDesc& Worker) std::filesystem::path WorkerDir = m_WorkerPath / fmt::format("runner_{}", Worker.WorkerId); - if (!std::filesystem::exists(WorkerDir)) + // worker.zcb is written as the last step of ManifestWorker, so its presence + // indicates a complete manifest. 
If the directory exists but the marker is + // missing, a previous manifest was interrupted and we need to start over. + bool NeedsManifest = !std::filesystem::exists(WorkerDir / "worker.zcb"); + + if (NeedsManifest) { _.ReleaseNow(); RwLock::ExclusiveLockScope $(m_WorkerLock); - if (!std::filesystem::exists(WorkerDir)) + if (!std::filesystem::exists(WorkerDir / "worker.zcb")) { + std::error_code Ec; + std::filesystem::remove_all(WorkerDir, Ec); ManifestWorker(Worker.Descriptor, WorkerDir, [](const IoHash&, CompressedBuffer&) {}); } } @@ -673,9 +680,15 @@ LocalProcessRunner::ProcessCompletedActions(std::vector<Ref<RunningAction>>& Com } catch (std::exception& Ex) { - ZEN_ERROR("Encountered failure while gathering outputs for action lsn {}, '{}'", ActionLsn, Ex.what()); + Running->Action->FailureReason = fmt::format("exception gathering outputs: {}", Ex.what()); + ZEN_ERROR("action {} ({}) failed: {}", Running->Action->ActionId, ActionLsn, Running->Action->FailureReason); } } + else + { + Running->Action->FailureReason = fmt::format("process exited with code {}", Running->ExitCode); + ZEN_WARN("action {} ({}) failed: {}", Running->Action->ActionId, ActionLsn, Running->Action->FailureReason); + } // Failed - clean up the sandbox in the background. 
diff --git a/src/zencompute/runners/macrunner.cpp b/src/zencompute/runners/macrunner.cpp index c2ccca9a6..13c01d988 100644 --- a/src/zencompute/runners/macrunner.cpp +++ b/src/zencompute/runners/macrunner.cpp @@ -211,7 +211,8 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action) if (ChildPid == 0) { - // Child process + // Child process — lower priority so workers don't starve the main server + nice(5); if (m_Sandboxed) { @@ -281,7 +282,8 @@ MacProcessRunner::SubmitAction(Ref<RunnerAction> Action) // Clean up the sandbox in the background m_DeferredDeleter.Enqueue(Action->ActionLsn, std::move(Prepared->SandboxPath)); - ZEN_ERROR("Sandbox setup failed for action {}: {}", Action->ActionLsn, ErrBuf); + Action->FailureReason = fmt::format("sandbox setup failed: {}", ErrBuf); + ZEN_ERROR("action {} ({}): {}", Action->ActionId, Action->ActionLsn, Action->FailureReason); Action->SetActionState(RunnerAction::State::Failed); return SubmitResult{.IsAccepted = false}; diff --git a/src/zencompute/runners/managedrunner.cpp b/src/zencompute/runners/managedrunner.cpp index e4a7ba388..a4f586852 100644 --- a/src/zencompute/runners/managedrunner.cpp +++ b/src/zencompute/runners/managedrunner.cpp @@ -128,7 +128,7 @@ ManagedProcessRunner::SubmitAction(Ref<RunnerAction> Action) CreateProcOptions Options; Options.WorkingDirectory = &Prepared->SandboxPath; - Options.Flags = CreateProcOptions::Flag_NoConsole; + Options.Flags = CreateProcOptions::Flag_NoConsole | CreateProcOptions::Flag_BelowNormalPriority; Options.Environment = std::move(EnvPairs); const int32_t ActionLsn = Prepared->ActionLsn; diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp index ce6a81173..55f78fdd6 100644 --- a/src/zencompute/runners/remotehttprunner.cpp +++ b/src/zencompute/runners/remotehttprunner.cpp @@ -20,6 +20,7 @@ # include <zenstore/cidstore.h> # include <span> +# include <unordered_set> 
////////////////////////////////////////////////////////////////////////// @@ -38,6 +39,7 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, , m_ChunkResolver{InChunkResolver} , m_WorkerPool{InWorkerPool} , m_HostName{HostName} +, m_DisplayName{HostName} , m_BaseUrl{fmt::format("{}/compute", HostName)} , m_Http(m_BaseUrl) , m_InstanceId(Oid::NewOid()) @@ -59,6 +61,15 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver, m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this}; } +void +RemoteHttpRunner::SetRemoteHostname(std::string_view Hostname) +{ + if (!Hostname.empty()) + { + m_DisplayName = fmt::format("{} ({})", m_HostName, Hostname); + } +} + RemoteHttpRunner::~RemoteHttpRunner() { Shutdown(); @@ -108,6 +119,7 @@ RemoteHttpRunner::Shutdown() for (auto& [RemoteLsn, HttpAction] : Remaining) { ZEN_DEBUG("shutdown: marking remote action LSN {} (local LSN {}) as Failed", RemoteLsn, HttpAction.Action->ActionLsn); + HttpAction.Action->FailureReason = "remote runner shutdown"; HttpAction.Action->SetActionState(RunnerAction::State::Failed); } } @@ -213,11 +225,13 @@ RemoteHttpRunner::QueryCapacity() return 0; } - // Estimate how much more work we're ready to accept + // Estimate how much more work we're ready to accept. + // Include actions currently being submitted over HTTP so we don't + // keep queueing new submissions while previous ones are still in flight. 
RwLock::SharedLockScope _{m_RunningLock}; - size_t RunningCount = m_RemoteRunningMap.size(); + size_t RunningCount = m_RemoteRunningMap.size() + m_InFlightSubmissions.load(std::memory_order_relaxed); if (RunningCount >= size_t(m_MaxRunningActions)) { @@ -232,6 +246,9 @@ RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) { ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActions"); + m_InFlightSubmissions.fetch_add(Actions.size(), std::memory_order_relaxed); + auto InFlightGuard = MakeGuard([&] { m_InFlightSubmissions.fetch_sub(Actions.size(), std::memory_order_relaxed); }); + if (Actions.size() <= 1) { std::vector<SubmitResult> Results; @@ -359,108 +376,141 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action) } } - // Enqueue job. If the remote returns FailedDependency (424), it means it - // cannot resolve the worker/function — re-register the worker and retry once. + // Submit the action to the remote. In eager-attach mode we build a + // CbPackage with all referenced attachments upfront to avoid the 404 + // round-trip. In the default mode we POST the bare object first and + // only upload missing attachments if the remote requests them. + // + // In both modes, FailedDependency (424) triggers a worker re-register + // and a single retry. 
CbObject Result; HttpClient::Response WorkResponse; HttpResponseCode WorkResponseCode{}; - for (int Attempt = 0; Attempt < 2; ++Attempt) - { - WorkResponse = m_Http.Post(SubmitUrl, ActionObj); - WorkResponseCode = WorkResponse.StatusCode; - - if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0) - { - ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying", - m_Http.GetBaseUri(), - ActionId); - - (void)RegisterWorker(Action->Worker.Descriptor); - } - else - { - break; - } - } - - if (WorkResponseCode == HttpResponseCode::OK) - { - Result = WorkResponse.AsObject(); - } - else if (WorkResponseCode == HttpResponseCode::NotFound) + if (m_EagerAttach) { - // Not all attachments are present - - // Build response package including all required attachments - CbPackage Pkg; Pkg.SetObject(ActionObj); - CbObject Response = WorkResponse.AsObject(); + ActionObj.IterateAttachments([&](CbFieldView Field) { + const IoHash AttachHash = Field.AsHash(); - for (auto& Item : Response["need"sv]) - { - const IoHash NeedHash = Item.AsHash(); - - if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) + if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(AttachHash)) { uint64_t DataRawSize = 0; IoHash DataRawHash; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); - ZEN_ASSERT(DataRawHash == NeedHash); + Pkg.AddAttachment(CbAttachment(Compressed, AttachHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); + } + }); + + for (int Attempt = 0; Attempt < 2; ++Attempt) + { + WorkResponse = m_Http.Post(SubmitUrl, Pkg); + WorkResponseCode = WorkResponse.StatusCode; + + if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0) + { + ZEN_WARN("remote {} returned FailedDependency for action {} — 
re-registering worker and retrying", + m_Http.GetBaseUri(), + ActionId); - Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + (void)RegisterWorker(Action->Worker.Descriptor); } else { - // No such attachment - - return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)}; + break; } } + } + else + { + for (int Attempt = 0; Attempt < 2; ++Attempt) + { + WorkResponse = m_Http.Post(SubmitUrl, ActionObj); + WorkResponseCode = WorkResponse.StatusCode; - // Post resulting package + if (WorkResponseCode == HttpResponseCode::FailedDependency && Attempt == 0) + { + ZEN_WARN("remote {} returned FailedDependency for action {} — re-registering worker and retrying", + m_Http.GetBaseUri(), + ActionId); - HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg); + (void)RegisterWorker(Action->Worker.Descriptor); + } + else + { + break; + } + } - if (!PayloadResponse) + if (WorkResponseCode == HttpResponseCode::NotFound) { - ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl); + // Remote needs attachments — resolve them and retry with a CbPackage - // TODO: include more information about the failure in the response + CbPackage Pkg; + Pkg.SetObject(ActionObj); - return {.IsAccepted = false, .Reason = "HTTP request failed"}; - } - else if (PayloadResponse.StatusCode == HttpResponseCode::OK) - { - Result = PayloadResponse.AsObject(); - } - else - { - // Unexpected response - - const int ResponseStatusCode = (int)PayloadResponse.StatusCode; - - ZEN_WARN("unable to register payloads for action {} at {}{} (error: {} {})", - ActionId, - m_Http.GetBaseUri(), - SubmitUrl, - ResponseStatusCode, - ToString(ResponseStatusCode)); - - return {.IsAccepted = false, - .Reason = fmt::format("unexpected response code {} {} from {}{}", - ResponseStatusCode, - ToString(ResponseStatusCode), - m_Http.GetBaseUri(), - SubmitUrl)}; + CbObject Response = WorkResponse.AsObject(); + + for (auto& Item : 
Response["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) + { + uint64_t DataRawSize = 0; + IoHash DataRawHash; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); + + ZEN_ASSERT(DataRawHash == NeedHash); + + Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); + } + else + { + return {.IsAccepted = false, .Reason = fmt::format("missing attachment {}", NeedHash)}; + } + } + + HttpClient::Response PayloadResponse = m_Http.Post(SubmitUrl, Pkg); + + if (!PayloadResponse) + { + ZEN_WARN("unable to register payloads for action {} at {}{}", ActionId, m_Http.GetBaseUri(), SubmitUrl); + return {.IsAccepted = false, .Reason = "HTTP request failed"}; + } + + WorkResponse = std::move(PayloadResponse); + WorkResponseCode = WorkResponse.StatusCode; } } + if (WorkResponseCode == HttpResponseCode::OK) + { + Result = WorkResponse.AsObject(); + } + else if (!WorkResponse) + { + ZEN_WARN("submit of action {} to {}{} failed", ActionId, m_Http.GetBaseUri(), SubmitUrl); + return {.IsAccepted = false, .Reason = "HTTP request failed"}; + } + else if (!IsHttpSuccessCode(WorkResponseCode)) + { + const int Code = static_cast<int>(WorkResponseCode); + ZEN_WARN("submit of action {} to {}{} returned {} {}", ActionId, m_Http.GetBaseUri(), SubmitUrl, Code, ToString(Code)); + return {.IsAccepted = false, + .Reason = fmt::format("unexpected response code {} {} from {}{}", Code, ToString(Code), m_Http.GetBaseUri(), SubmitUrl)}; + } + if (Result) { if (const int32_t LsnField = Result["lsn"].AsInt32(0)) @@ -512,82 +562,110 @@ RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vec CbObjectWriter Body; Body.BeginArray("actions"sv); + 
std::unordered_set<IoHash, IoHash::Hasher> AttachmentsSeen; + for (const Ref<RunnerAction>& Action : Actions) { Action->ExecutionLocation = m_HostName; MaybeDumpAction(Action->ActionLsn, Action->ActionObj); Body.AddObject(Action->ActionObj); + + if (m_EagerAttach) + { + Action->ActionObj.IterateAttachments([&](CbFieldView Field) { AttachmentsSeen.insert(Field.AsHash()); }); + } } Body.EndArray(); - // POST the batch - - HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save()); - - if (Response.StatusCode == HttpResponseCode::OK) - { - return ParseBatchResponse(Response, Actions); - } + // In eager-attach mode, build a CbPackage with all referenced attachments + // so the remote can accept in a single round-trip. Otherwise POST a bare + // CbObject and handle the 404 need-list flow. - if (Response.StatusCode == HttpResponseCode::NotFound) + if (m_EagerAttach) { - // Server needs attachments — resolve them and retry with a CbPackage - - CbObject NeedObj = Response.AsObject(); - CbPackage Pkg; Pkg.SetObject(Body.Save()); - for (auto& Item : NeedObj["need"sv]) + for (const IoHash& AttachHash : AttachmentsSeen) { - const IoHash NeedHash = Item.AsHash(); - - if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) + if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(AttachHash)) { uint64_t DataRawSize = 0; IoHash DataRawHash; CompressedBuffer Compressed = CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); - ZEN_ASSERT(DataRawHash == NeedHash); - - Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); - } - else - { - ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash); - return FallbackToIndividualSubmit(Actions); + Pkg.AddAttachment(CbAttachment(Compressed, AttachHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); } } - 
HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg); + HttpClient::Response Response = m_Http.Post(SubmitUrl, Pkg); + + if (Response.StatusCode == HttpResponseCode::OK) + { + return ParseBatchResponse(Response, Actions); + } + } + else + { + HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save()); - if (RetryResponse.StatusCode == HttpResponseCode::OK) + if (Response.StatusCode == HttpResponseCode::OK) { - return ParseBatchResponse(RetryResponse, Actions); + return ParseBatchResponse(Response, Actions); } - ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit", - (int)RetryResponse.StatusCode, - ToString(RetryResponse.StatusCode)); - return FallbackToIndividualSubmit(Actions); + if (Response.StatusCode == HttpResponseCode::NotFound) + { + CbObject NeedObj = Response.AsObject(); + + CbPackage Pkg; + Pkg.SetObject(Body.Save()); + + for (auto& Item : NeedObj["need"sv]) + { + const IoHash NeedHash = Item.AsHash(); + + if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash)) + { + uint64_t DataRawSize = 0; + IoHash DataRawHash; + CompressedBuffer Compressed = + CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize); + + ZEN_ASSERT(DataRawHash == NeedHash); + + Pkg.AddAttachment(CbAttachment(Compressed, NeedHash)); + m_LastSubmitStats.TotalAttachments.fetch_add(1, std::memory_order_relaxed); + m_LastSubmitStats.TotalAttachmentBytes.fetch_add(Chunk.GetSize(), std::memory_order_relaxed); + } + else + { + ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash); + return FallbackToIndividualSubmit(Actions); + } + } + + HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg); + + if (RetryResponse.StatusCode == HttpResponseCode::OK) + { + return ParseBatchResponse(RetryResponse, Actions); + } + + ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit", + (int)RetryResponse.StatusCode, + 
ToString(RetryResponse.StatusCode));
+            return FallbackToIndividualSubmit(Actions);
+        }
     }
     // Unexpected status or connection error — fall back to individual submission
-    if (Response)
-    {
-        ZEN_WARN("batch submit to {}{} returned {} {} — falling back to individual submit",
-                 m_Http.GetBaseUri(),
-                 SubmitUrl,
-                 (int)Response.StatusCode,
-                 ToString(Response.StatusCode));
-    }
-    else
-    {
-        ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
-    }
+    ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
     return FallbackToIndividualSubmit(Actions);
 }
@@ -869,9 +947,10 @@ RemoteHttpRunner::SweepRunningActions()
     {
         for (auto& FieldIt : Completed["completed"sv])
         {
-            CbObjectView     EntryObj    = FieldIt.AsObjectView();
-            const int32_t    CompleteLsn = EntryObj["lsn"sv].AsInt32();
-            std::string_view StateName   = EntryObj["state"sv].AsString();
+            CbObjectView     EntryObj      = FieldIt.AsObjectView();
+            const int32_t    CompleteLsn   = EntryObj["lsn"sv].AsInt32();
+            std::string_view StateName     = EntryObj["state"sv].AsString();
+            std::string_view FailureReason = EntryObj["reason"sv].AsString();

             RunnerAction::State RemoteState = RunnerAction::FromString(StateName);
@@ -884,6 +963,7 @@ RemoteHttpRunner::SweepRunningActions()
     {
         HttpRunningAction CompletedAction = std::move(CompleteIt->second);
         CompletedAction.RemoteState = RemoteState;
+        CompletedAction.FailureReason = std::string(FailureReason);

         if (RemoteState == RunnerAction::State::Completed && ResponseJob)
         {
@@ -927,16 +1007,44 @@ RemoteHttpRunner::SweepRunningActions()
     {
         const int ActionLsn = HttpAction.Action->ActionLsn;

-        ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}",
-                  HttpAction.Action->ActionId,
-                  ActionLsn,
-                  HttpAction.RemoteActionLsn,
-                  RunnerAction::ToString(HttpAction.RemoteState));
-
         if (HttpAction.RemoteState == RunnerAction::State::Completed)
         {
+            ZEN_DEBUG("action {} LSN {} (remote LSN {}) completed on {}",
+                      HttpAction.Action->ActionId,
+                      ActionLsn,
+                      HttpAction.RemoteActionLsn,
+                      m_HostName);
             HttpAction.Action->SetResult(std::move(HttpAction.ActionResults));
         }
+        else if (HttpAction.RemoteState == RunnerAction::State::Failed || HttpAction.RemoteState == RunnerAction::State::Abandoned)
+        {
+            HttpAction.Action->FailureReason = HttpAction.FailureReason;
+            if (HttpAction.FailureReason.empty())
+            {
+                ZEN_WARN("action {} ({}) {} on remote {}",
+                         HttpAction.Action->ActionId,
+                         ActionLsn,
+                         RunnerAction::ToString(HttpAction.RemoteState),
+                         m_HostName);
+            }
+            else
+            {
+                ZEN_WARN("action {} ({}) {} on remote {}: {}",
+                         HttpAction.Action->ActionId,
+                         ActionLsn,
+                         RunnerAction::ToString(HttpAction.RemoteState),
+                         m_HostName,
+                         HttpAction.FailureReason);
+            }
+        }
+        else
+        {
+            ZEN_DEBUG("action {} LSN {} (remote LSN {}) -> {}",
+                      HttpAction.Action->ActionId,
+                      ActionLsn,
+                      HttpAction.RemoteActionLsn,
+                      RunnerAction::ToString(HttpAction.RemoteState));
+        }

         HttpAction.Action->SetActionState(HttpAction.RemoteState);
     }
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
index c17d0cf2a..fdf113c77 100644
--- a/src/zencompute/runners/remotehttprunner.h
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -54,8 +54,10 @@ public:
     [[nodiscard]] virtual size_t QueryCapacity() override;
     [[nodiscard]] virtual std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions) override;
     virtual void CancelRemoteQueue(int QueueId) override;
+    [[nodiscard]] virtual std::string_view GetDisplayName() const override { return m_DisplayName; }

     std::string_view GetHostName() const { return m_HostName; }
+    void SetRemoteHostname(std::string_view Hostname);

 protected:
     LoggerRef Log() { return m_Log; }
@@ -65,12 +67,15 @@ private:
     ChunkResolver& m_ChunkResolver;
     WorkerThreadPool& m_WorkerPool;
     std::string m_HostName;
+    std::string m_DisplayName;
     std::string m_BaseUrl;
     HttpClient m_Http;

-    std::atomic<bool> m_AcceptNewActions{true};
-    int32_t           m_MaxRunningActions = 256;  // arbitrary limit for testing
-    int32_t           m_MaxBatchSize      = 50;
+    std::atomic<bool>   m_AcceptNewActions{true};
+    int32_t             m_MaxRunningActions = 256;  // arbitrary limit for testing
+    int32_t             m_MaxBatchSize      = 50;
+    bool                m_EagerAttach       = true;  ///< Send attachments with every submit instead of the two-step 404 retry
+    std::atomic<size_t> m_InFlightSubmissions{0};  // actions currently being submitted over HTTP

     struct HttpRunningAction
     {
@@ -78,6 +83,7 @@ private:
         int RemoteActionLsn = 0; // Remote LSN
         RunnerAction::State RemoteState = RunnerAction::State::Failed;
         CbPackage ActionResults;
+        std::string FailureReason;
     };

     RwLock m_RunningLock;
diff --git a/src/zencompute/runners/windowsrunner.cpp b/src/zencompute/runners/windowsrunner.cpp
index 92ee65c2d..e643c9ce8 100644
--- a/src/zencompute/runners/windowsrunner.cpp
+++ b/src/zencompute/runners/windowsrunner.cpp
@@ -48,7 +48,9 @@ WindowsProcessRunner::WindowsProcessRunner(ChunkResolver& Resolver,
     if (m_JobObject)
     {
         JOBOBJECT_EXTENDED_LIMIT_INFORMATION ExtLimits{};
-        ExtLimits.BasicLimitInformation.LimitFlags = JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION;
+        ExtLimits.BasicLimitInformation.LimitFlags =
+            JOB_OBJECT_LIMIT_KILL_ON_JOB_CLOSE | JOB_OBJECT_LIMIT_DIE_ON_UNHANDLED_EXCEPTION | JOB_OBJECT_LIMIT_PRIORITY_CLASS;
+        ExtLimits.BasicLimitInformation.PriorityClass = BELOW_NORMAL_PRIORITY_CLASS;
         SetInformationJobObject(m_JobObject, JobObjectExtendedLimitInformation, &ExtLimits, sizeof(ExtLimits));

         JOBOBJECT_BASIC_UI_RESTRICTIONS UiRestrictions{};
diff --git a/src/zencompute/runners/winerunner.cpp b/src/zencompute/runners/winerunner.cpp
index b4fafb467..593b19e55 100644
--- a/src/zencompute/runners/winerunner.cpp
+++ b/src/zencompute/runners/winerunner.cpp
@@ -96,7 +96,9 @@ WineProcessRunner::SubmitAction(Ref<RunnerAction> Action)
     if (ChildPid == 0)
     {
-        // Child process
+        // Child process — lower priority so workers don't starve the main server
+        nice(5);
+
         if (chdir(SandboxPathStr.c_str()) != 0)
         {
             _exit(127); |