| author | Stefan Boberg <[email protected]> | 2026-03-18 11:19:10 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-18 11:19:10 +0100 |
| commit | eba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch) | |
| tree | 3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/runners/remotehttprunner.cpp | |
| parent | bugfix release - v5.7.23 (#851) (diff) | |
Compute batching (#849)
### Compute Batch Submission
- Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads
- Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure
- Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps`
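The grouping-and-chunking scheme above can be sketched in isolation. This is a minimal sketch using hypothetical stand-in types (`Action`, `AssignBatches`, the batch limit), not the real `RemoteHttpRunner` API; it shows how actions are grouped by queue, split into chunks, and how results can be mapped back to the original submission order:

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

// Hypothetical stand-in for Ref<RunnerAction>; only the queue id matters here.
struct Action
{
    int         QueueId = 0;
    std::string Name;
};

// Group actions by queue, split each group into chunks of at most BatchLimit,
// and return each action's batch index in original submission order. This
// mirrors the index-mapping idea: each group remembers original positions so
// per-batch results can be scattered back to the caller's vector.
std::vector<int>
AssignBatches(const std::vector<Action>& Actions, std::size_t BatchLimit)
{
    struct Group
    {
        std::vector<std::size_t> OriginalIndices;
    };

    std::map<int, Group> Groups;

    for (std::size_t i = 0; i < Actions.size(); ++i)
    {
        Groups[Actions[i].QueueId].OriginalIndices.push_back(i);
    }

    std::vector<int> BatchIndex(Actions.size(), -1);
    int              NextBatch = 0;

    for (auto& [QueueId, G] : Groups)
    {
        for (std::size_t Offset = 0; Offset < G.OriginalIndices.size(); Offset += BatchLimit)
        {
            const std::size_t End = std::min(Offset + BatchLimit, G.OriginalIndices.size());
            for (std::size_t j = Offset; j < End; ++j)
            {
                BatchIndex[G.OriginalIndices[j]] = NextBatch;
            }
            ++NextBatch;
        }
    }

    return BatchIndex;
}
```

With a batch limit of 2, four actions on queues {1, 2, 1, 1} produce three batches: the first two queue-1 actions share one, the third queue-1 action overflows into a second, and the queue-2 action gets its own.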
### Retracted Action State
- Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount`
- Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession`
- Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints
- Add state machine documentation and per-state comments to `RunnerAction`
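The retract semantics can be illustrated with a toy state machine. This is a hedged sketch, not the real `RunnerAction` (which carries more states and owner/session bookkeeping); all names here are illustrative:

```cpp
#include <cassert>

// Illustrative subset of action states; the real state machine is larger.
enum class State
{
    Queued,
    Running,
    Retracted,
    Completed,
};

struct ActionSketch
{
    State CurrentState = State::Queued;
    int   RetryCount   = 0;

    // Idempotent retract: pull the action back so the scheduler can place it
    // on a different runner. Unlike a failure/retry path, RetryCount is
    // deliberately left untouched. Returns false when the call is a no-op.
    bool Retract()
    {
        if (CurrentState == State::Retracted || CurrentState == State::Completed)
        {
            return false;
        }
        CurrentState = State::Retracted;
        return true;
    }
};
```

The key property is that retracting twice, or retracting a finished action, changes nothing, so the HTTP retract endpoints can be called safely on a stale view of the action.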
### Compute Race Fixes
- Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely
- Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK
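The `ActiveCount` fix is an ordering change: update the counter while the results lock is still held, rather than after releasing it. A single-threaded sketch of the corrected ordering, with illustrative names standing in for the zencompute internals:

```cpp
#include <cassert>
#include <mutex>

// Illustrative stand-in for the queue bookkeeping: ResultReady plays the role
// of an OK result from GetActionResult, ActiveCount the per-queue counter.
struct QueueSketch
{
    std::mutex ResultsLock;
    int        ActiveCount = 0;
    bool       ResultReady = false;

    void CompleteAction()
    {
        std::lock_guard<std::mutex> Lock(ResultsLock);
        ResultReady = true;
        --ActiveCount;  // fixed ordering: decrement under the lock, not after release
    }

    // Returns true (OK) only together with a counter consistent with the result.
    bool GetActionResult(int& OutActiveCount)
    {
        std::lock_guard<std::mutex> Lock(ResultsLock);
        if (!ResultReady)
        {
            return false;
        }
        OutActiveCount = ActiveCount;
        return true;
    }
};
```

Because both the result flag and the counter change under the same lock, a caller that observes OK can never observe a stale `ActiveCount` afterwards.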
### Logging Optimization and ANSI Improvements
- Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex`
- Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage`
- Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h`
- Strip upstream ANSI SGR escapes in non-color output mode, enabling color in log messages without polluting log files with ANSI control sequences
- Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files
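The SGR-stripping behavior can be approximated as follows. This is a sketch, not the `StripAnsiSgrSequences` helper from `helpers.h`; it removes only `ESC [ … m` sequences and does not validate the parameter bytes in between:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <string_view>

// Remove ANSI SGR escape sequences (ESC '[' ... 'm') from a message so log
// files stay clean while the console path keeps color. Unterminated
// sequences at the end of the input are dropped.
std::string
StripAnsiSgr(std::string_view In)
{
    std::string Out;
    Out.reserve(In.size());

    for (std::size_t i = 0; i < In.size();)
    {
        if (In[i] == '\x1b' && i + 1 < In.size() && In[i + 1] == '[')
        {
            // Skip to the terminating 'm' of the SGR sequence
            std::size_t j = i + 2;
            while (j < In.size() && In[j] != 'm')
            {
                ++j;
            }
            i = (j < In.size()) ? j + 1 : In.size();
        }
        else
        {
            Out.push_back(In[i]);
            ++i;
        }
    }
    return Out;
}
```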
### CLI / Exec Refactoring
- Extract an `ExecSessionRunner` class from the ~920-line `ExecUsingSession` into focused methods and an `ExecSessionConfig` struct
- Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`)
- Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser
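The permissive-forwarding idea reduces to partitioning tokens: anything the subcommand does not recognize is not an error but is handed back to the parent parser. A simplified sketch with an illustrative flag set (the real CLI also has to handle flags that take values):

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>
#include <vector>

// Split argv tokens between a subcommand and its parent: tokens in the
// subcommand's known-flag set stay with the subcommand, everything else is
// forwarded to the parent parser, so parent options may follow the
// subcommand name on the command line.
std::pair<std::vector<std::string>, std::vector<std::string>>
SplitArgs(const std::vector<std::string>& Args, const std::set<std::string>& SubFlags)
{
    std::vector<std::string> ForSub;
    std::vector<std::string> ForParent;

    for (const std::string& Tok : Args)
    {
        if (SubFlags.count(Tok))
        {
            ForSub.push_back(Tok);
        }
        else
        {
            ForParent.push_back(Tok);
        }
    }
    return {ForSub, ForParent};
}
```

A real implementation must also keep a flag's value token attached to the flag; the sketch deliberately skips that to keep the partitioning idea visible.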
### Testing Improvements
- Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter
- Add test suite banners to test listener output
- Make the `function.session.abandon_pending` test more robust
### Startup / Reliability Fixes
- Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()`
- Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing
- Fail on unrecognized zenserver `--mode` instead of silently defaulting to store
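The nullptr-guard pattern behind the SIGSEGV fixes, sketched with illustrative names (the real code guards `sentry_options_new()` and `ZenServerState::Register()`):

```cpp
#include <cassert>
#include <stdexcept>

// Illustrative stand-in for a registration API that can return nullptr.
struct ServerStateSketch
{
    int Generation = 1;
};

ServerStateSketch*
RegisterMayFail(bool SimulateFailure)
{
    static ServerStateSketch Instance;
    return SimulateFailure ? nullptr : &Instance;
}

// Guarded wrapper: fail loudly with an exception instead of dereferencing a
// possibly-null pointer during early startup.
ServerStateSketch&
RegisterOrThrow(bool SimulateFailure)
{
    ServerStateSketch* State = RegisterMayFail(SimulateFailure);
    if (State == nullptr)
    {
        throw std::runtime_error("server state registration failed");
    }
    return *State;
}
```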
### Other
- Show host details (hostname, platform, CPU count, memory) when discovering new compute workers
- Move frontend `html.zip` from source tree into build directory
- Add format specifications for Compact Binary and Compressed Buffer wire formats
- Add `WriteCompactBinaryObject` to zencore
- Extend `ConsoleTui` with additional functionality
- Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support
- Disable compute/horde/nomad in release builds (not yet production-ready)
- Disable unintended `ASIO_HAS_IO_URING` enablement
- Fix crashpad patch missing leading whitespace
- Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/runners/remotehttprunner.cpp')
| -rw-r--r-- | src/zencompute/runners/remotehttprunner.cpp | 399 |
1 file changed, 365 insertions, 34 deletions
```diff
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
index 672636d06..ce6a81173 100644
--- a/src/zencompute/runners/remotehttprunner.cpp
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -42,6 +42,20 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
     , m_Http(m_BaseUrl)
     , m_InstanceId(Oid::NewOid())
 {
+    // Attempt to connect a WebSocket for push-based completion notifications.
+    // If the remote doesn't support WS, OnWsClose fires and we fall back to polling.
+    {
+        std::string WsUrl = HttpToWsUrl(HostName, "/compute/ws");
+
+        HttpWsClientSettings WsSettings;
+        WsSettings.LogCategory    = "http_exec_ws";
+        WsSettings.ConnectTimeout = std::chrono::milliseconds{3000};
+
+        IWsClientHandler& Handler = *this;
+        m_WsClient = std::make_unique<HttpWsClient>(WsUrl, Handler, WsSettings);
+        m_WsClient->Connect();
+    }
+
     m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
 }
@@ -53,7 +67,29 @@ RemoteHttpRunner::~RemoteHttpRunner()
 
 void
 RemoteHttpRunner::Shutdown()
 {
-    // TODO: should cleanly drain/cancel pending work
+    m_AcceptNewActions = false;
+
+    // Close the WebSocket client first, so no more wakeup signals arrive.
+    if (m_WsClient)
+    {
+        m_WsClient->Close();
+    }
+
+    // Cancel all known remote queues so the remote side stops scheduling new
+    // work and cancels in-flight actions belonging to those queues.
+
+    {
+        std::vector<std::pair<int, Oid>> Queues;
+
+        m_QueueTokenLock.WithSharedLock([&] { Queues.assign(m_RemoteQueueTokens.begin(), m_RemoteQueueTokens.end()); });
+
+        for (const auto& [QueueId, Token] : Queues)
+        {
+            CancelRemoteQueue(QueueId);
+        }
+    }
+
+    // Stop the monitor thread so it no longer polls the remote.
     m_MonitorThreadEnabled = false;
     m_MonitorThreadEvent.Set();
@@ -61,9 +97,22 @@ RemoteHttpRunner::Shutdown()
     {
         m_MonitorThread.join();
     }
+
+    // Drain the running map and mark all remaining actions as Failed so the
+    // scheduler can reschedule or finalize them.
+
+    std::unordered_map<int, HttpRunningAction> Remaining;
+
+    m_RunningLock.WithExclusiveLock([&] { Remaining.swap(m_RemoteRunningMap); });
+
+    for (auto& [RemoteLsn, HttpAction] : Remaining)
+    {
+        ZEN_DEBUG("shutdown: marking remote action LSN {} (local LSN {}) as Failed", RemoteLsn, HttpAction.Action->ActionLsn);
+        HttpAction.Action->SetActionState(RunnerAction::State::Failed);
+    }
 }
 
-void
+bool
 RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
 {
     ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker");
@@ -125,15 +174,13 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
         if (!IsHttpSuccessCode(PayloadResponse.StatusCode))
         {
             ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
-
-            // TODO: propagate error
+            return false;
         }
     }
     else if (!IsHttpSuccessCode(DescResponse.StatusCode))
     {
         ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
-
-        // TODO: propagate error
+        return false;
     }
     else
     {
@@ -152,14 +199,20 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
             WorkerUrl,
             (int)WorkerResponse.StatusCode,
             ToString(WorkerResponse.StatusCode));
-
-        // TODO: propagate error
+        return false;
     }
+
+    return true;
 }
 
 size_t
 RemoteHttpRunner::QueryCapacity()
 {
+    if (!m_AcceptNewActions)
+    {
+        return 0;
+    }
+
     // Estimate how much more work we're ready to accept
 
     RwLock::SharedLockScope _{m_RunningLock};
@@ -191,24 +244,68 @@ RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
         return Results;
     }
 
-    // For larger batches, submit HTTP requests in parallel via the shared worker pool
+    // Collect distinct QueueIds and ensure remote queues exist once per queue
 
-    std::vector<std::future<SubmitResult>> Futures;
-    Futures.reserve(Actions.size());
+    std::unordered_map<int, Oid> QueueTokens;  // QueueId → remote token (0 stays as Zero)
 
     for (const Ref<RunnerAction>& Action : Actions)
     {
-        std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); });
+        const int QueueId = Action->QueueId;
+        if (QueueId != 0 && QueueTokens.find(QueueId) == QueueTokens.end())
+        {
+            CbObject QueueMeta   = Action->GetOwnerSession()->GetQueueMetadata(QueueId);
+            CbObject QueueConfig = Action->GetOwnerSession()->GetQueueConfig(QueueId);
+            QueueTokens[QueueId] = EnsureRemoteQueue(QueueId, QueueMeta, QueueConfig);
+        }
+    }
 
-        Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog));
+    // Group actions by QueueId
+
+    struct QueueGroup
+    {
+        std::vector<Ref<RunnerAction>> Actions;
+        std::vector<size_t>            OriginalIndices;
+    };
+
+    std::unordered_map<int, QueueGroup> Groups;
+
+    for (size_t i = 0; i < Actions.size(); ++i)
+    {
+        auto& Group = Groups[Actions[i]->QueueId];
+        Group.Actions.push_back(Actions[i]);
+        Group.OriginalIndices.push_back(i);
     }
 
-    std::vector<SubmitResult> Results;
-    Results.reserve(Futures.size());
+    // Submit each group as a batch and map results back to original indices
 
-    for (auto& Future : Futures)
+    std::vector<SubmitResult> Results(Actions.size());
+
+    for (auto& [QueueId, Group] : Groups)
     {
-        Results.push_back(Future.get());
+        std::string SubmitUrl = "/jobs";
+        if (QueueId != 0)
+        {
+            if (Oid Token = QueueTokens[QueueId]; Token != Oid::Zero)
+            {
+                SubmitUrl = fmt::format("/queues/{}/jobs", Token);
+            }
+        }
+
+        const size_t BatchLimit = size_t(m_MaxBatchSize);
+
+        for (size_t Offset = 0; Offset < Group.Actions.size(); Offset += BatchLimit)
+        {
+            size_t End = zen::Min(Offset + BatchLimit, Group.Actions.size());
+
+            std::vector<Ref<RunnerAction>> Chunk(Group.Actions.begin() + Offset, Group.Actions.begin() + End);
+
+            std::vector<SubmitResult> ChunkResults = SubmitActionBatch(SubmitUrl, Chunk);
+
+            for (size_t j = 0; j < ChunkResults.size(); ++j)
+            {
+                Results[Group.OriginalIndices[Offset + j]] = std::move(ChunkResults[j]);
+            }
+        }
     }
 
     return Results;
@@ -221,6 +318,11 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
 
     // Verify whether we can accept more work
 
+    if (!m_AcceptNewActions)
+    {
+        return SubmitResult{.IsAccepted = false, .Reason = "runner is shutting down"};
+    }
+
     {
         RwLock::SharedLockScope _{m_RunningLock};
         if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
@@ -275,7 +377,7 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
             m_Http.GetBaseUri(),
             ActionId);
 
-        RegisterWorker(Action->Worker.Descriptor);
+        (void)RegisterWorker(Action->Worker.Descriptor);
     }
     else
     {
@@ -384,6 +486,194 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
     return {};
 }
 
+std::vector<SubmitResult>
+RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vector<Ref<RunnerAction>>& Actions)
+{
+    ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActionBatch");
+
+    if (!m_AcceptNewActions)
+    {
+        return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "runner is shutting down"});
+    }
+
+    // Capacity check
+
+    {
+        RwLock::SharedLockScope _{m_RunningLock};
+        if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
+        {
+            std::vector<SubmitResult> Results(Actions.size(), SubmitResult{.IsAccepted = false});
+            return Results;
+        }
+    }
+
+    // Per-action setup and build batch body
+
+    CbObjectWriter Body;
+    Body.BeginArray("actions"sv);
+
+    for (const Ref<RunnerAction>& Action : Actions)
+    {
+        Action->ExecutionLocation = m_HostName;
+        MaybeDumpAction(Action->ActionLsn, Action->ActionObj);
+        Body.AddObject(Action->ActionObj);
+    }
+
+    Body.EndArray();
+
+    // POST the batch
+
+    HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save());
+
+    if (Response.StatusCode == HttpResponseCode::OK)
+    {
+        return ParseBatchResponse(Response, Actions);
+    }
+
+    if (Response.StatusCode == HttpResponseCode::NotFound)
+    {
+        // Server needs attachments — resolve them and retry with a CbPackage
+
+        CbObject NeedObj = Response.AsObject();
+
+        CbPackage Pkg;
+        Pkg.SetObject(Body.Save());
+
+        for (auto& Item : NeedObj["need"sv])
+        {
+            const IoHash NeedHash = Item.AsHash();
+
+            if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+            {
+                uint64_t DataRawSize = 0;
+                IoHash   DataRawHash;
+                CompressedBuffer Compressed =
+                    CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+                ZEN_ASSERT(DataRawHash == NeedHash);
+
+                Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+            }
+            else
+            {
+                ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash);
+                return FallbackToIndividualSubmit(Actions);
+            }
+        }
+
+        HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg);
+
+        if (RetryResponse.StatusCode == HttpResponseCode::OK)
+        {
+            return ParseBatchResponse(RetryResponse, Actions);
+        }
+
+        ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit",
+                 (int)RetryResponse.StatusCode,
+                 ToString(RetryResponse.StatusCode));
+        return FallbackToIndividualSubmit(Actions);
+    }
+
+    // Unexpected status or connection error — fall back to individual submission
+
+    if (Response)
+    {
+        ZEN_WARN("batch submit to {}{} returned {} {} — falling back to individual submit",
+                 m_Http.GetBaseUri(),
+                 SubmitUrl,
+                 (int)Response.StatusCode,
+                 ToString(Response.StatusCode));
+    }
+    else
+    {
+        ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
+    }
+
+    return FallbackToIndividualSubmit(Actions);
+}
+
+std::vector<SubmitResult>
+RemoteHttpRunner::ParseBatchResponse(const HttpClient::Response& Response, const std::vector<Ref<RunnerAction>>& Actions)
+{
+    std::vector<SubmitResult> Results;
+    Results.reserve(Actions.size());
+
+    CbObject    ResponseObj = Response.AsObject();
+    CbArrayView ResultArray = ResponseObj["results"sv].AsArrayView();
+
+    size_t Index = 0;
+    for (CbFieldView Field : ResultArray)
+    {
+        if (Index >= Actions.size())
+        {
+            break;
+        }
+
+        CbObjectView  Entry    = Field.AsObjectView();
+        const int32_t LsnField = Entry["lsn"sv].AsInt32(0);
+
+        if (LsnField > 0)
+        {
+            HttpRunningAction NewAction;
+            NewAction.Action          = Actions[Index];
+            NewAction.RemoteActionLsn = LsnField;
+
+            {
+                RwLock::ExclusiveLockScope _(m_RunningLock);
+                m_RemoteRunningMap[LsnField] = std::move(NewAction);
+            }
+
+            ZEN_DEBUG("batch: scheduled action {} with remote LSN {} (local LSN {})",
+                      Actions[Index]->ActionObj.GetHash(),
+                      LsnField,
+                      Actions[Index]->ActionLsn);
+
+            Actions[Index]->SetActionState(RunnerAction::State::Running);
+
+            Results.push_back(SubmitResult{.IsAccepted = true});
+        }
+        else
+        {
+            std::string_view ErrorMsg = Entry["error"sv].AsString();
+            Results.push_back(SubmitResult{.IsAccepted = false, .Reason = std::string(ErrorMsg)});
+        }
+
+        ++Index;
+    }
+
+    // If the server returned fewer results than actions, mark the rest as not accepted
+    while (Results.size() < Actions.size())
+    {
+        Results.push_back(SubmitResult{.IsAccepted = false, .Reason = "no result from server"});
+    }
+
+    return Results;
+}
+
+std::vector<SubmitResult>
+RemoteHttpRunner::FallbackToIndividualSubmit(const std::vector<Ref<RunnerAction>>& Actions)
+{
+    std::vector<std::future<SubmitResult>> Futures;
+    Futures.reserve(Actions.size());
+
+    for (const Ref<RunnerAction>& Action : Actions)
+    {
+        std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); });
+
+        Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog));
+    }
+
+    std::vector<SubmitResult> Results;
+    Results.reserve(Futures.size());
+
+    for (auto& Future : Futures)
+    {
+        Results.push_back(Future.get());
+    }
+
+    return Results;
+}
+
 Oid
 RemoteHttpRunner::EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config)
 {
@@ -481,6 +771,35 @@ RemoteHttpRunner::GetSubmittedActionCount()
     return m_RemoteRunningMap.size();
 }
 
+//////////////////////////////////////////////////////////////////////////
+//
+// IWsClientHandler
+//
+
+void
+RemoteHttpRunner::OnWsOpen()
+{
+    ZEN_INFO("WebSocket connected to {}", m_HostName);
+    m_WsConnected.store(true, std::memory_order_release);
+}
+
+void
+RemoteHttpRunner::OnWsMessage([[maybe_unused]] const WebSocketMessage& Msg)
+{
+    // The message content is a wakeup signal; no parsing needed.
+    // Signal the monitor thread to sweep completed actions immediately.
+    m_MonitorThreadEvent.Set();
+}
+
+void
+RemoteHttpRunner::OnWsClose([[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason)
+{
+    ZEN_WARN("WebSocket disconnected from {} (code {})", m_HostName, Code);
+    m_WsConnected.store(false, std::memory_order_release);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
 void
 RemoteHttpRunner::MonitorThreadFunction()
 {
@@ -489,28 +808,40 @@ RemoteHttpRunner::MonitorThreadFunction()
     do
     {
        const int NormalWaitingTime = 200;
-       int  WaitTimeMs = NormalWaitingTime;
-       auto WaitOnce   = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
-       auto SweepOnce  = [&] {
+       const int WsWaitingTime     = 2000;  // Safety-net interval when WS is connected
+
+       int  WaitTimeMs = m_WsConnected.load(std::memory_order_relaxed) ? WsWaitingTime : NormalWaitingTime;
+       auto WaitOnce   = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
+       auto SweepOnce  = [&] {
            const size_t RetiredCount = SweepRunningActions();
 
-           m_RunningLock.WithSharedLock([&] {
-               if (m_RemoteRunningMap.size() > 16)
-               {
-                   WaitTimeMs = NormalWaitingTime / 4;
-               }
-               else
-               {
-                   if (RetiredCount)
+           if (m_WsConnected.load(std::memory_order_relaxed))
+           {
+               // WS connected: use long safety-net interval; the WS message
+               // will wake us immediately for the real work.
+               WaitTimeMs = WsWaitingTime;
+           }
+           else
+           {
+               // No WS: adaptive polling as before
+               m_RunningLock.WithSharedLock([&] {
+                   if (m_RemoteRunningMap.size() > 16)
                    {
-                       WaitTimeMs = NormalWaitingTime / 2;
+                       WaitTimeMs = NormalWaitingTime / 4;
                    }
                    else
                    {
-                       WaitTimeMs = NormalWaitingTime;
+                       if (RetiredCount)
+                       {
+                           WaitTimeMs = NormalWaitingTime / 2;
+                       }
+                       else
+                       {
+                           WaitTimeMs = NormalWaitingTime;
+                       }
                    }
-               }
-           });
+               });
+           }
        };
 
        while (!WaitOnce())
@@ -518,7 +849,7 @@ RemoteHttpRunner::MonitorThreadFunction()
            SweepOnce();
        }
 
-       // Signal received - this may mean we should quit
+       // Signal received — may be a WS wakeup or a quit signal
        SweepOnce();
    } while (m_MonitorThreadEnabled);
```