aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/runners/remotehttprunner.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-18 11:19:10 +0100
committerGitHub Enterprise <[email protected]>2026-03-18 11:19:10 +0100
commiteba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch)
tree3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/runners/remotehttprunner.cpp
parentbugfix release - v5.7.23 (#851) (diff)
downloadzen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz
zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip
Compute batching (#849)
### Compute Batch Submission - Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads - Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure - Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps` ### Retracted Action State - Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount` - Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession` - Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints - Add state machine documentation and per-state comments to `RunnerAction` ### Compute Race Fixes - Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely - Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK ### Logging Optimization and ANSI improvements - Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex` - Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage` - Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h` - Strip upstream ANSI SGR escapes in non-color output mode. 
This enables color in log messages without polluting log files with ANSI control sequences - Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files ### CLI / Exec Refactoring - Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and an `ExecSessionConfig` struct - Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`) - Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser ### Testing Improvements - Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter - Add test suite banners to test listener output - Make `function.session.abandon_pending` test more robust ### Startup / Reliability Fixes - Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()` - Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing - Fail on unrecognized zenserver `--mode` instead of silently defaulting to store ### Other - Show host details (hostname, platform, CPU count, memory) when discovering new compute workers - Move frontend `html.zip` from source tree into build directory - Add format specifications for Compact Binary and Compressed Buffer wire formats - Add `WriteCompactBinaryObject` to zencore - Extend `ConsoleTui` with additional functionality - Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support - Disable compute/horde/nomad in release builds (not yet production-ready) - Disable unintended `ASIO_HAS_IO_URING` enablement - Fix crashpad patch missing leading whitespace - Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/runners/remotehttprunner.cpp')
-rw-r--r--src/zencompute/runners/remotehttprunner.cpp399
1 files changed, 365 insertions, 34 deletions
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
index 672636d06..ce6a81173 100644
--- a/src/zencompute/runners/remotehttprunner.cpp
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -42,6 +42,20 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
, m_Http(m_BaseUrl)
, m_InstanceId(Oid::NewOid())
{
+ // Attempt to connect a WebSocket for push-based completion notifications.
+ // If the remote doesn't support WS, OnWsClose fires and we fall back to polling.
+ {
+ std::string WsUrl = HttpToWsUrl(HostName, "/compute/ws");
+
+ HttpWsClientSettings WsSettings;
+ WsSettings.LogCategory = "http_exec_ws";
+ WsSettings.ConnectTimeout = std::chrono::milliseconds{3000};
+
+ IWsClientHandler& Handler = *this;
+ m_WsClient = std::make_unique<HttpWsClient>(WsUrl, Handler, WsSettings);
+ m_WsClient->Connect();
+ }
+
m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
}
@@ -53,7 +67,29 @@ RemoteHttpRunner::~RemoteHttpRunner()
void
RemoteHttpRunner::Shutdown()
{
- // TODO: should cleanly drain/cancel pending work
+ m_AcceptNewActions = false;
+
+ // Close the WebSocket client first, so no more wakeup signals arrive.
+ if (m_WsClient)
+ {
+ m_WsClient->Close();
+ }
+
+ // Cancel all known remote queues so the remote side stops scheduling new
+ // work and cancels in-flight actions belonging to those queues.
+
+ {
+ std::vector<std::pair<int, Oid>> Queues;
+
+ m_QueueTokenLock.WithSharedLock([&] { Queues.assign(m_RemoteQueueTokens.begin(), m_RemoteQueueTokens.end()); });
+
+ for (const auto& [QueueId, Token] : Queues)
+ {
+ CancelRemoteQueue(QueueId);
+ }
+ }
+
+ // Stop the monitor thread so it no longer polls the remote.
m_MonitorThreadEnabled = false;
m_MonitorThreadEvent.Set();
@@ -61,9 +97,22 @@ RemoteHttpRunner::Shutdown()
{
m_MonitorThread.join();
}
+
+ // Drain the running map and mark all remaining actions as Failed so the
+ // scheduler can reschedule or finalize them.
+
+ std::unordered_map<int, HttpRunningAction> Remaining;
+
+ m_RunningLock.WithExclusiveLock([&] { Remaining.swap(m_RemoteRunningMap); });
+
+ for (auto& [RemoteLsn, HttpAction] : Remaining)
+ {
+ ZEN_DEBUG("shutdown: marking remote action LSN {} (local LSN {}) as Failed", RemoteLsn, HttpAction.Action->ActionLsn);
+ HttpAction.Action->SetActionState(RunnerAction::State::Failed);
+ }
}
-void
+bool
RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
{
ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker");
@@ -125,15 +174,13 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
if (!IsHttpSuccessCode(PayloadResponse.StatusCode))
{
ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
-
- // TODO: propagate error
+ return false;
}
}
else if (!IsHttpSuccessCode(DescResponse.StatusCode))
{
ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
-
- // TODO: propagate error
+ return false;
}
else
{
@@ -152,14 +199,20 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
WorkerUrl,
(int)WorkerResponse.StatusCode,
ToString(WorkerResponse.StatusCode));
-
- // TODO: propagate error
+ return false;
}
+
+ return true;
}
size_t
RemoteHttpRunner::QueryCapacity()
{
+ if (!m_AcceptNewActions)
+ {
+ return 0;
+ }
+
// Estimate how much more work we're ready to accept
RwLock::SharedLockScope _{m_RunningLock};
@@ -191,24 +244,68 @@ RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
return Results;
}
- // For larger batches, submit HTTP requests in parallel via the shared worker pool
+ // Collect distinct QueueIds and ensure remote queues exist once per queue
- std::vector<std::future<SubmitResult>> Futures;
- Futures.reserve(Actions.size());
+ std::unordered_map<int, Oid> QueueTokens; // QueueId → remote token (0 stays as Zero)
for (const Ref<RunnerAction>& Action : Actions)
{
- std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); });
+ const int QueueId = Action->QueueId;
+ if (QueueId != 0 && QueueTokens.find(QueueId) == QueueTokens.end())
+ {
+ CbObject QueueMeta = Action->GetOwnerSession()->GetQueueMetadata(QueueId);
+ CbObject QueueConfig = Action->GetOwnerSession()->GetQueueConfig(QueueId);
+ QueueTokens[QueueId] = EnsureRemoteQueue(QueueId, QueueMeta, QueueConfig);
+ }
+ }
- Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog));
+ // Group actions by QueueId
+
+ struct QueueGroup
+ {
+ std::vector<Ref<RunnerAction>> Actions;
+ std::vector<size_t> OriginalIndices;
+ };
+
+ std::unordered_map<int, QueueGroup> Groups;
+
+ for (size_t i = 0; i < Actions.size(); ++i)
+ {
+ auto& Group = Groups[Actions[i]->QueueId];
+ Group.Actions.push_back(Actions[i]);
+ Group.OriginalIndices.push_back(i);
}
- std::vector<SubmitResult> Results;
- Results.reserve(Futures.size());
+ // Submit each group as a batch and map results back to original indices
- for (auto& Future : Futures)
+ std::vector<SubmitResult> Results(Actions.size());
+
+ for (auto& [QueueId, Group] : Groups)
{
- Results.push_back(Future.get());
+ std::string SubmitUrl = "/jobs";
+ if (QueueId != 0)
+ {
+ if (Oid Token = QueueTokens[QueueId]; Token != Oid::Zero)
+ {
+ SubmitUrl = fmt::format("/queues/{}/jobs", Token);
+ }
+ }
+
+ const size_t BatchLimit = size_t(m_MaxBatchSize);
+
+ for (size_t Offset = 0; Offset < Group.Actions.size(); Offset += BatchLimit)
+ {
+ size_t End = zen::Min(Offset + BatchLimit, Group.Actions.size());
+
+ std::vector<Ref<RunnerAction>> Chunk(Group.Actions.begin() + Offset, Group.Actions.begin() + End);
+
+ std::vector<SubmitResult> ChunkResults = SubmitActionBatch(SubmitUrl, Chunk);
+
+ for (size_t j = 0; j < ChunkResults.size(); ++j)
+ {
+ Results[Group.OriginalIndices[Offset + j]] = std::move(ChunkResults[j]);
+ }
+ }
}
return Results;
@@ -221,6 +318,11 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
// Verify whether we can accept more work
+ if (!m_AcceptNewActions)
+ {
+ return SubmitResult{.IsAccepted = false, .Reason = "runner is shutting down"};
+ }
+
{
RwLock::SharedLockScope _{m_RunningLock};
if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
@@ -275,7 +377,7 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
m_Http.GetBaseUri(),
ActionId);
- RegisterWorker(Action->Worker.Descriptor);
+ (void)RegisterWorker(Action->Worker.Descriptor);
}
else
{
@@ -384,6 +486,194 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
return {};
}
+std::vector<SubmitResult>
+RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vector<Ref<RunnerAction>>& Actions)
+{
+ ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActionBatch");
+
+ if (!m_AcceptNewActions)
+ {
+ return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "runner is shutting down"});
+ }
+
+ // Capacity check
+
+ {
+ RwLock::SharedLockScope _{m_RunningLock};
+ if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
+ {
+ std::vector<SubmitResult> Results(Actions.size(), SubmitResult{.IsAccepted = false});
+ return Results;
+ }
+ }
+
+ // Per-action setup and build batch body
+
+ CbObjectWriter Body;
+ Body.BeginArray("actions"sv);
+
+ for (const Ref<RunnerAction>& Action : Actions)
+ {
+ Action->ExecutionLocation = m_HostName;
+ MaybeDumpAction(Action->ActionLsn, Action->ActionObj);
+ Body.AddObject(Action->ActionObj);
+ }
+
+ Body.EndArray();
+
+ // POST the batch
+
+ HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save());
+
+ if (Response.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(Response, Actions);
+ }
+
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ // Server needs attachments — resolve them and retry with a CbPackage
+
+ CbObject NeedObj = Response.AsObject();
+
+ CbPackage Pkg;
+ Pkg.SetObject(Body.Save());
+
+ for (auto& Item : NeedObj["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ }
+ else
+ {
+ ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash);
+ return FallbackToIndividualSubmit(Actions);
+ }
+ }
+
+ HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg);
+
+ if (RetryResponse.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(RetryResponse, Actions);
+ }
+
+ ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit",
+ (int)RetryResponse.StatusCode,
+ ToString(RetryResponse.StatusCode));
+ return FallbackToIndividualSubmit(Actions);
+ }
+
+ // Unexpected status or connection error — fall back to individual submission
+
+ if (Response)
+ {
+ ZEN_WARN("batch submit to {}{} returned {} {} — falling back to individual submit",
+ m_Http.GetBaseUri(),
+ SubmitUrl,
+ (int)Response.StatusCode,
+ ToString(Response.StatusCode));
+ }
+ else
+ {
+ ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
+ }
+
+ return FallbackToIndividualSubmit(Actions);
+}
+
+std::vector<SubmitResult>
+RemoteHttpRunner::ParseBatchResponse(const HttpClient::Response& Response, const std::vector<Ref<RunnerAction>>& Actions)
+{
+ std::vector<SubmitResult> Results;
+ Results.reserve(Actions.size());
+
+ CbObject ResponseObj = Response.AsObject();
+ CbArrayView ResultArray = ResponseObj["results"sv].AsArrayView();
+
+ size_t Index = 0;
+ for (CbFieldView Field : ResultArray)
+ {
+ if (Index >= Actions.size())
+ {
+ break;
+ }
+
+ CbObjectView Entry = Field.AsObjectView();
+ const int32_t LsnField = Entry["lsn"sv].AsInt32(0);
+
+ if (LsnField > 0)
+ {
+ HttpRunningAction NewAction;
+ NewAction.Action = Actions[Index];
+ NewAction.RemoteActionLsn = LsnField;
+
+ {
+ RwLock::ExclusiveLockScope _(m_RunningLock);
+ m_RemoteRunningMap[LsnField] = std::move(NewAction);
+ }
+
+ ZEN_DEBUG("batch: scheduled action {} with remote LSN {} (local LSN {})",
+ Actions[Index]->ActionObj.GetHash(),
+ LsnField,
+ Actions[Index]->ActionLsn);
+
+ Actions[Index]->SetActionState(RunnerAction::State::Running);
+
+ Results.push_back(SubmitResult{.IsAccepted = true});
+ }
+ else
+ {
+ std::string_view ErrorMsg = Entry["error"sv].AsString();
+ Results.push_back(SubmitResult{.IsAccepted = false, .Reason = std::string(ErrorMsg)});
+ }
+
+ ++Index;
+ }
+
+ // If the server returned fewer results than actions, mark the rest as not accepted
+ while (Results.size() < Actions.size())
+ {
+ Results.push_back(SubmitResult{.IsAccepted = false, .Reason = "no result from server"});
+ }
+
+ return Results;
+}
+
+std::vector<SubmitResult>
+RemoteHttpRunner::FallbackToIndividualSubmit(const std::vector<Ref<RunnerAction>>& Actions)
+{
+ std::vector<std::future<SubmitResult>> Futures;
+ Futures.reserve(Actions.size());
+
+ for (const Ref<RunnerAction>& Action : Actions)
+ {
+ std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); });
+
+ Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog));
+ }
+
+ std::vector<SubmitResult> Results;
+ Results.reserve(Futures.size());
+
+ for (auto& Future : Futures)
+ {
+ Results.push_back(Future.get());
+ }
+
+ return Results;
+}
+
Oid
RemoteHttpRunner::EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config)
{
@@ -481,6 +771,35 @@ RemoteHttpRunner::GetSubmittedActionCount()
return m_RemoteRunningMap.size();
}
+//////////////////////////////////////////////////////////////////////////
+//
+// IWsClientHandler
+//
+
+void
+RemoteHttpRunner::OnWsOpen()
+{
+ ZEN_INFO("WebSocket connected to {}", m_HostName);
+ m_WsConnected.store(true, std::memory_order_release);
+}
+
+void
+RemoteHttpRunner::OnWsMessage([[maybe_unused]] const WebSocketMessage& Msg)
+{
+ // The message content is a wakeup signal; no parsing needed.
+ // Signal the monitor thread to sweep completed actions immediately.
+ m_MonitorThreadEvent.Set();
+}
+
+void
+RemoteHttpRunner::OnWsClose([[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason)
+{
+ ZEN_WARN("WebSocket disconnected from {} (code {})", m_HostName, Code);
+ m_WsConnected.store(false, std::memory_order_release);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
void
RemoteHttpRunner::MonitorThreadFunction()
{
@@ -489,28 +808,40 @@ RemoteHttpRunner::MonitorThreadFunction()
do
{
const int NormalWaitingTime = 200;
- int WaitTimeMs = NormalWaitingTime;
- auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
- auto SweepOnce = [&] {
+ const int WsWaitingTime = 2000; // Safety-net interval when WS is connected
+
+ int WaitTimeMs = m_WsConnected.load(std::memory_order_relaxed) ? WsWaitingTime : NormalWaitingTime;
+ auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
+ auto SweepOnce = [&] {
const size_t RetiredCount = SweepRunningActions();
- m_RunningLock.WithSharedLock([&] {
- if (m_RemoteRunningMap.size() > 16)
- {
- WaitTimeMs = NormalWaitingTime / 4;
- }
- else
- {
- if (RetiredCount)
+ if (m_WsConnected.load(std::memory_order_relaxed))
+ {
+ // WS connected: use long safety-net interval; the WS message
+ // will wake us immediately for the real work.
+ WaitTimeMs = WsWaitingTime;
+ }
+ else
+ {
+ // No WS: adaptive polling as before
+ m_RunningLock.WithSharedLock([&] {
+ if (m_RemoteRunningMap.size() > 16)
{
- WaitTimeMs = NormalWaitingTime / 2;
+ WaitTimeMs = NormalWaitingTime / 4;
}
else
{
- WaitTimeMs = NormalWaitingTime;
+ if (RetiredCount)
+ {
+ WaitTimeMs = NormalWaitingTime / 2;
+ }
+ else
+ {
+ WaitTimeMs = NormalWaitingTime;
+ }
}
- }
- });
+ });
+ }
};
while (!WaitOnce())
@@ -518,7 +849,7 @@ RemoteHttpRunner::MonitorThreadFunction()
SweepOnce();
}
- // Signal received - this may mean we should quit
+ // Signal received — may be a WS wakeup or a quit signal
SweepOnce();
} while (m_MonitorThreadEnabled);