about | summary | refs | log | tree | commit | diff
path: root/src/zencompute/runners
diff options
context:
space:
mode:
author Stefan Boberg <[email protected]> 2026-03-18 11:19:10 +0100
committer GitHub Enterprise <[email protected]> 2026-03-18 11:19:10 +0100
commit eba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch)
tree 3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/runners
parent bugfix release - v5.7.23 (#851) (diff)
download zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz
zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip
Compute batching (#849)
### Compute Batch Submission

- Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads
- Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure
- Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps`

### Retracted Action State

- Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount`
- Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession`
- Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints
- Add state machine documentation and per-state comments to `RunnerAction`

### Compute Race Fixes

- Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely
- Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK

### Logging Optimization and ANSI Improvements

- Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex`
- Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage`
- Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h`
- Strip upstream ANSI SGR escapes in non-color output mode. This enables color in log messages without polluting log files with ANSI control sequences
- Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files

### CLI / Exec Refactoring

- Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and an `ExecSessionConfig` struct
- Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`)
- Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser

### Testing Improvements

- Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter
- Add test suite banners to test listener output
- Make `function.session.abandon_pending` test more robust

### Startup / Reliability Fixes

- Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()`
- Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing
- Fail on unrecognized zenserver `--mode` instead of silently defaulting to store

### Other

- Show host details (hostname, platform, CPU count, memory) when discovering new compute workers
- Move frontend `html.zip` from source tree into build directory
- Add format specifications for Compact Binary and Compressed Buffer wire formats
- Add `WriteCompactBinaryObject` to zencore
- Extend `ConsoleTui` with additional functionality
- Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support
- Disable compute/horde/nomad in release builds (not yet production-ready)
- Disable unintended `ASIO_HAS_IO_URING` enablement
- Fix crashpad patch missing leading whitespace
- Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/runners')
-rw-r--r-- src/zencompute/runners/functionrunner.cpp 44
-rw-r--r-- src/zencompute/runners/functionrunner.h 53
-rw-r--r-- src/zencompute/runners/localrunner.cpp 10
-rw-r--r-- src/zencompute/runners/localrunner.h 2
-rw-r--r-- src/zencompute/runners/remotehttprunner.cpp 399
-rw-r--r-- src/zencompute/runners/remotehttprunner.h 23
6 files changed, 473 insertions, 58 deletions
diff --git a/src/zencompute/runners/functionrunner.cpp b/src/zencompute/runners/functionrunner.cpp
index 768cdf1e1..4f116e7d8 100644
--- a/src/zencompute/runners/functionrunner.cpp
+++ b/src/zencompute/runners/functionrunner.cpp
@@ -215,15 +215,22 @@ BaseRunnerGroup::GetSubmittedActionCount()
return TotalCount;
}
-void
+bool
BaseRunnerGroup::RegisterWorker(CbPackage Worker)
{
RwLock::SharedLockScope _(m_RunnersLock);
+ bool AllSucceeded = true;
+
for (auto& Runner : m_Runners)
{
- Runner->RegisterWorker(Worker);
+ if (!Runner->RegisterWorker(Worker))
+ {
+ AllSucceeded = false;
+ }
}
+
+ return AllSucceeded;
}
void
@@ -276,12 +283,34 @@ RunnerAction::~RunnerAction()
}
bool
+RunnerAction::RetractAction()
+{
+ State CurrentState = m_ActionState.load();
+
+ do
+ {
+ // Only allow retraction from pre-terminal states (idempotent if already retracted)
+ if (CurrentState > State::Running)
+ {
+ return CurrentState == State::Retracted;
+ }
+
+ if (m_ActionState.compare_exchange_strong(CurrentState, State::Retracted))
+ {
+ this->Timestamps[static_cast<int>(State::Retracted)] = DateTime::Now().GetTicks();
+ m_OwnerSession->PostUpdate(this);
+ return true;
+ }
+ } while (true);
+}
+
+bool
RunnerAction::ResetActionStateToPending()
{
- // Only allow reset from Failed or Abandoned states
+ // Only allow reset from Failed, Abandoned, or Retracted states
State CurrentState = m_ActionState.load();
- if (CurrentState != State::Failed && CurrentState != State::Abandoned)
+ if (CurrentState != State::Failed && CurrentState != State::Abandoned && CurrentState != State::Retracted)
{
return false;
}
@@ -305,8 +334,11 @@ RunnerAction::ResetActionStateToPending()
CpuUsagePercent.store(-1.0f, std::memory_order_relaxed);
CpuSeconds.store(0.0f, std::memory_order_relaxed);
- // Increment retry count
- RetryCount.fetch_add(1, std::memory_order_relaxed);
+ // Increment retry count (skip for Retracted — nothing failed)
+ if (CurrentState != State::Retracted)
+ {
+ RetryCount.fetch_add(1, std::memory_order_relaxed);
+ }
// Re-enter the scheduler pipeline
m_OwnerSession->PostUpdate(this);
diff --git a/src/zencompute/runners/functionrunner.h b/src/zencompute/runners/functionrunner.h
index f67414dbb..56c3f3af0 100644
--- a/src/zencompute/runners/functionrunner.h
+++ b/src/zencompute/runners/functionrunner.h
@@ -29,8 +29,8 @@ public:
FunctionRunner(std::filesystem::path BasePath);
virtual ~FunctionRunner() = 0;
- virtual void Shutdown() = 0;
- virtual void RegisterWorker(const CbPackage& WorkerPackage) = 0;
+ virtual void Shutdown() = 0;
+ [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) = 0;
[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) = 0;
[[nodiscard]] virtual size_t GetSubmittedActionCount() = 0;
@@ -63,7 +63,7 @@ public:
SubmitResult SubmitAction(Ref<RunnerAction> Action);
std::vector<SubmitResult> SubmitActions(const std::vector<Ref<RunnerAction>>& Actions);
size_t GetSubmittedActionCount();
- void RegisterWorker(CbPackage Worker);
+ [[nodiscard]] bool RegisterWorker(CbPackage Worker);
void Shutdown();
bool CancelAction(int ActionLsn);
void CancelRemoteQueue(int QueueId);
@@ -114,6 +114,30 @@ struct RunnerGroup : public BaseRunnerGroup
/**
* This represents an action going through different stages of scheduling and execution.
+ *
+ * State machine
+ * =============
+ *
+ * Normal forward flow (enforced by SetActionState rejecting backward transitions):
+ *
+ * New -> Pending -> Submitting -> Running -> Completed
+ * -> Failed
+ * -> Abandoned
+ * -> Cancelled
+ *
+ * Rescheduling (via ResetActionStateToPending):
+ *
+ * Failed ---> Pending (increments RetryCount, subject to retry limit)
+ * Abandoned ---> Pending (increments RetryCount, subject to retry limit)
+ * Retracted ---> Pending (does NOT increment RetryCount)
+ *
+ * Retraction (via RetractAction, idempotent):
+ *
+ * Pending/Submitting/Running -> Retracted -> Pending (rescheduled)
+ *
+ * Retracted is placed after Cancelled in enum order so that once set,
+ * no runner-side transition (Completed/Failed) can override it via
+ * SetActionState's forward-only rule.
*/
struct RunnerAction : public RefCounted
{
@@ -137,16 +161,20 @@ struct RunnerAction : public RefCounted
enum class State
{
- New,
- Pending,
- Submitting,
- Running,
- Completed,
- Failed,
- Abandoned,
- Cancelled,
+ New, // Initial state at construction, before entering the scheduler
+ Pending, // Queued and waiting for a runner slot
+ Submitting, // Being handed off to a runner (async submission in progress)
+ Running, // Executing on a runner process
+ Completed, // Finished successfully with results available
+ Failed, // Execution failed (transient error, eligible for retry)
+ Abandoned, // Infrastructure termination (e.g. spot eviction, session abandon)
+ Cancelled, // Intentional user cancellation (never retried)
+ Retracted, // Pulled back for rescheduling on a different runner (no retry cost)
_Count
};
+ static_assert(State::Retracted > State::Completed && State::Retracted > State::Failed && State::Retracted > State::Abandoned &&
+ State::Retracted > State::Cancelled,
+ "Retracted must be the highest terminal ordinal so runner-side transitions cannot override it");
static const char* ToString(State _)
{
@@ -168,6 +196,8 @@ struct RunnerAction : public RefCounted
return "Abandoned";
case State::Cancelled:
return "Cancelled";
+ case State::Retracted:
+ return "Retracted";
default:
return "Unknown";
}
@@ -191,6 +221,7 @@ struct RunnerAction : public RefCounted
void SetActionState(State NewState);
bool IsSuccess() const { return ActionState() == State::Completed; }
+ bool RetractAction();
bool ResetActionStateToPending();
bool IsCompleted() const
{
diff --git a/src/zencompute/runners/localrunner.cpp b/src/zencompute/runners/localrunner.cpp
index 7aaefb06e..b61e0a46f 100644
--- a/src/zencompute/runners/localrunner.cpp
+++ b/src/zencompute/runners/localrunner.cpp
@@ -7,14 +7,16 @@
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
# include <zencore/compactbinarypackage.h>
+# include <zencore/compactbinaryfile.h>
# include <zencore/compress.h>
# include <zencore/except_fmt.h>
# include <zencore/filesystem.h>
# include <zencore/fmtutils.h>
# include <zencore/iobuffer.h>
# include <zencore/iohash.h>
-# include <zencore/system.h>
# include <zencore/scopeguard.h>
+# include <zencore/stream.h>
+# include <zencore/system.h>
# include <zencore/timer.h>
# include <zencore/trace.h>
# include <zenstore/cidstore.h>
@@ -152,7 +154,7 @@ LocalProcessRunner::CreateNewSandbox()
return Path;
}
-void
+bool
LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage)
{
ZEN_TRACE_CPU("LocalProcessRunner::RegisterWorker");
@@ -173,6 +175,8 @@ LocalProcessRunner::RegisterWorker(const CbPackage& WorkerPackage)
ZEN_INFO("dumped worker '{}' to 'file://{}'", WorkerId, Path);
}
+
+ return true;
}
size_t
@@ -301,7 +305,7 @@ LocalProcessRunner::PrepareActionSubmission(Ref<RunnerAction> Action)
// Write out action
- zen::WriteFile(SandboxPath / "build.action", ActionObj.GetBuffer().AsIoBuffer());
+ WriteCompactBinaryObject(SandboxPath / "build.action", ActionObj);
// Manifest inputs in sandbox
diff --git a/src/zencompute/runners/localrunner.h b/src/zencompute/runners/localrunner.h
index 7493e980b..b8cff6826 100644
--- a/src/zencompute/runners/localrunner.h
+++ b/src/zencompute/runners/localrunner.h
@@ -51,7 +51,7 @@ public:
~LocalProcessRunner();
virtual void Shutdown() override;
- virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) override;
[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
[[nodiscard]] virtual bool IsHealthy() override { return true; }
[[nodiscard]] virtual size_t GetSubmittedActionCount() override;
diff --git a/src/zencompute/runners/remotehttprunner.cpp b/src/zencompute/runners/remotehttprunner.cpp
index 672636d06..ce6a81173 100644
--- a/src/zencompute/runners/remotehttprunner.cpp
+++ b/src/zencompute/runners/remotehttprunner.cpp
@@ -42,6 +42,20 @@ RemoteHttpRunner::RemoteHttpRunner(ChunkResolver& InChunkResolver,
, m_Http(m_BaseUrl)
, m_InstanceId(Oid::NewOid())
{
+ // Attempt to connect a WebSocket for push-based completion notifications.
+ // If the remote doesn't support WS, OnWsClose fires and we fall back to polling.
+ {
+ std::string WsUrl = HttpToWsUrl(HostName, "/compute/ws");
+
+ HttpWsClientSettings WsSettings;
+ WsSettings.LogCategory = "http_exec_ws";
+ WsSettings.ConnectTimeout = std::chrono::milliseconds{3000};
+
+ IWsClientHandler& Handler = *this;
+ m_WsClient = std::make_unique<HttpWsClient>(WsUrl, Handler, WsSettings);
+ m_WsClient->Connect();
+ }
+
m_MonitorThread = std::thread{&RemoteHttpRunner::MonitorThreadFunction, this};
}
@@ -53,7 +67,29 @@ RemoteHttpRunner::~RemoteHttpRunner()
void
RemoteHttpRunner::Shutdown()
{
- // TODO: should cleanly drain/cancel pending work
+ m_AcceptNewActions = false;
+
+ // Close the WebSocket client first, so no more wakeup signals arrive.
+ if (m_WsClient)
+ {
+ m_WsClient->Close();
+ }
+
+ // Cancel all known remote queues so the remote side stops scheduling new
+ // work and cancels in-flight actions belonging to those queues.
+
+ {
+ std::vector<std::pair<int, Oid>> Queues;
+
+ m_QueueTokenLock.WithSharedLock([&] { Queues.assign(m_RemoteQueueTokens.begin(), m_RemoteQueueTokens.end()); });
+
+ for (const auto& [QueueId, Token] : Queues)
+ {
+ CancelRemoteQueue(QueueId);
+ }
+ }
+
+ // Stop the monitor thread so it no longer polls the remote.
m_MonitorThreadEnabled = false;
m_MonitorThreadEvent.Set();
@@ -61,9 +97,22 @@ RemoteHttpRunner::Shutdown()
{
m_MonitorThread.join();
}
+
+ // Drain the running map and mark all remaining actions as Failed so the
+ // scheduler can reschedule or finalize them.
+
+ std::unordered_map<int, HttpRunningAction> Remaining;
+
+ m_RunningLock.WithExclusiveLock([&] { Remaining.swap(m_RemoteRunningMap); });
+
+ for (auto& [RemoteLsn, HttpAction] : Remaining)
+ {
+ ZEN_DEBUG("shutdown: marking remote action LSN {} (local LSN {}) as Failed", RemoteLsn, HttpAction.Action->ActionLsn);
+ HttpAction.Action->SetActionState(RunnerAction::State::Failed);
+ }
}
-void
+bool
RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
{
ZEN_TRACE_CPU("RemoteHttpRunner::RegisterWorker");
@@ -125,15 +174,13 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
if (!IsHttpSuccessCode(PayloadResponse.StatusCode))
{
ZEN_ERROR("ERROR: unable to register payloads for worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
-
- // TODO: propagate error
+ return false;
}
}
else if (!IsHttpSuccessCode(DescResponse.StatusCode))
{
ZEN_ERROR("ERROR: unable to register worker {} at {}{}", WorkerId, m_Http.GetBaseUri(), WorkerUrl);
-
- // TODO: propagate error
+ return false;
}
else
{
@@ -152,14 +199,20 @@ RemoteHttpRunner::RegisterWorker(const CbPackage& WorkerPackage)
WorkerUrl,
(int)WorkerResponse.StatusCode,
ToString(WorkerResponse.StatusCode));
-
- // TODO: propagate error
+ return false;
}
+
+ return true;
}
size_t
RemoteHttpRunner::QueryCapacity()
{
+ if (!m_AcceptNewActions)
+ {
+ return 0;
+ }
+
// Estimate how much more work we're ready to accept
RwLock::SharedLockScope _{m_RunningLock};
@@ -191,24 +244,68 @@ RemoteHttpRunner::SubmitActions(const std::vector<Ref<RunnerAction>>& Actions)
return Results;
}
- // For larger batches, submit HTTP requests in parallel via the shared worker pool
+ // Collect distinct QueueIds and ensure remote queues exist once per queue
- std::vector<std::future<SubmitResult>> Futures;
- Futures.reserve(Actions.size());
+ std::unordered_map<int, Oid> QueueTokens; // QueueId → remote token (0 stays as Zero)
for (const Ref<RunnerAction>& Action : Actions)
{
- std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); });
+ const int QueueId = Action->QueueId;
+ if (QueueId != 0 && QueueTokens.find(QueueId) == QueueTokens.end())
+ {
+ CbObject QueueMeta = Action->GetOwnerSession()->GetQueueMetadata(QueueId);
+ CbObject QueueConfig = Action->GetOwnerSession()->GetQueueConfig(QueueId);
+ QueueTokens[QueueId] = EnsureRemoteQueue(QueueId, QueueMeta, QueueConfig);
+ }
+ }
- Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog));
+ // Group actions by QueueId
+
+ struct QueueGroup
+ {
+ std::vector<Ref<RunnerAction>> Actions;
+ std::vector<size_t> OriginalIndices;
+ };
+
+ std::unordered_map<int, QueueGroup> Groups;
+
+ for (size_t i = 0; i < Actions.size(); ++i)
+ {
+ auto& Group = Groups[Actions[i]->QueueId];
+ Group.Actions.push_back(Actions[i]);
+ Group.OriginalIndices.push_back(i);
}
- std::vector<SubmitResult> Results;
- Results.reserve(Futures.size());
+ // Submit each group as a batch and map results back to original indices
- for (auto& Future : Futures)
+ std::vector<SubmitResult> Results(Actions.size());
+
+ for (auto& [QueueId, Group] : Groups)
{
- Results.push_back(Future.get());
+ std::string SubmitUrl = "/jobs";
+ if (QueueId != 0)
+ {
+ if (Oid Token = QueueTokens[QueueId]; Token != Oid::Zero)
+ {
+ SubmitUrl = fmt::format("/queues/{}/jobs", Token);
+ }
+ }
+
+ const size_t BatchLimit = size_t(m_MaxBatchSize);
+
+ for (size_t Offset = 0; Offset < Group.Actions.size(); Offset += BatchLimit)
+ {
+ size_t End = zen::Min(Offset + BatchLimit, Group.Actions.size());
+
+ std::vector<Ref<RunnerAction>> Chunk(Group.Actions.begin() + Offset, Group.Actions.begin() + End);
+
+ std::vector<SubmitResult> ChunkResults = SubmitActionBatch(SubmitUrl, Chunk);
+
+ for (size_t j = 0; j < ChunkResults.size(); ++j)
+ {
+ Results[Group.OriginalIndices[Offset + j]] = std::move(ChunkResults[j]);
+ }
+ }
}
return Results;
@@ -221,6 +318,11 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
// Verify whether we can accept more work
+ if (!m_AcceptNewActions)
+ {
+ return SubmitResult{.IsAccepted = false, .Reason = "runner is shutting down"};
+ }
+
{
RwLock::SharedLockScope _{m_RunningLock};
if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
@@ -275,7 +377,7 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
m_Http.GetBaseUri(),
ActionId);
- RegisterWorker(Action->Worker.Descriptor);
+ (void)RegisterWorker(Action->Worker.Descriptor);
}
else
{
@@ -384,6 +486,194 @@ RemoteHttpRunner::SubmitAction(Ref<RunnerAction> Action)
return {};
}
+std::vector<SubmitResult>
+RemoteHttpRunner::SubmitActionBatch(const std::string& SubmitUrl, const std::vector<Ref<RunnerAction>>& Actions)
+{
+ ZEN_TRACE_CPU("RemoteHttpRunner::SubmitActionBatch");
+
+ if (!m_AcceptNewActions)
+ {
+ return std::vector<SubmitResult>(Actions.size(), SubmitResult{.IsAccepted = false, .Reason = "runner is shutting down"});
+ }
+
+ // Capacity check
+
+ {
+ RwLock::SharedLockScope _{m_RunningLock};
+ if (m_RemoteRunningMap.size() >= size_t(m_MaxRunningActions))
+ {
+ std::vector<SubmitResult> Results(Actions.size(), SubmitResult{.IsAccepted = false});
+ return Results;
+ }
+ }
+
+ // Per-action setup and build batch body
+
+ CbObjectWriter Body;
+ Body.BeginArray("actions"sv);
+
+ for (const Ref<RunnerAction>& Action : Actions)
+ {
+ Action->ExecutionLocation = m_HostName;
+ MaybeDumpAction(Action->ActionLsn, Action->ActionObj);
+ Body.AddObject(Action->ActionObj);
+ }
+
+ Body.EndArray();
+
+ // POST the batch
+
+ HttpClient::Response Response = m_Http.Post(SubmitUrl, Body.Save());
+
+ if (Response.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(Response, Actions);
+ }
+
+ if (Response.StatusCode == HttpResponseCode::NotFound)
+ {
+ // Server needs attachments — resolve them and retry with a CbPackage
+
+ CbObject NeedObj = Response.AsObject();
+
+ CbPackage Pkg;
+ Pkg.SetObject(Body.Save());
+
+ for (auto& Item : NeedObj["need"sv])
+ {
+ const IoHash NeedHash = Item.AsHash();
+
+ if (IoBuffer Chunk = m_ChunkResolver.FindChunkByCid(NeedHash))
+ {
+ uint64_t DataRawSize = 0;
+ IoHash DataRawHash;
+ CompressedBuffer Compressed =
+ CompressedBuffer::FromCompressed(SharedBuffer{Chunk}, /* out */ DataRawHash, /* out */ DataRawSize);
+
+ ZEN_ASSERT(DataRawHash == NeedHash);
+
+ Pkg.AddAttachment(CbAttachment(Compressed, NeedHash));
+ }
+ else
+ {
+ ZEN_WARN("batch submit: missing attachment {} — falling back to individual submit", NeedHash);
+ return FallbackToIndividualSubmit(Actions);
+ }
+ }
+
+ HttpClient::Response RetryResponse = m_Http.Post(SubmitUrl, Pkg);
+
+ if (RetryResponse.StatusCode == HttpResponseCode::OK)
+ {
+ return ParseBatchResponse(RetryResponse, Actions);
+ }
+
+ ZEN_WARN("batch submit retry failed with {} {} — falling back to individual submit",
+ (int)RetryResponse.StatusCode,
+ ToString(RetryResponse.StatusCode));
+ return FallbackToIndividualSubmit(Actions);
+ }
+
+ // Unexpected status or connection error — fall back to individual submission
+
+ if (Response)
+ {
+ ZEN_WARN("batch submit to {}{} returned {} {} — falling back to individual submit",
+ m_Http.GetBaseUri(),
+ SubmitUrl,
+ (int)Response.StatusCode,
+ ToString(Response.StatusCode));
+ }
+ else
+ {
+ ZEN_WARN("batch submit to {}{} failed — falling back to individual submit", m_Http.GetBaseUri(), SubmitUrl);
+ }
+
+ return FallbackToIndividualSubmit(Actions);
+}
+
+std::vector<SubmitResult>
+RemoteHttpRunner::ParseBatchResponse(const HttpClient::Response& Response, const std::vector<Ref<RunnerAction>>& Actions)
+{
+ std::vector<SubmitResult> Results;
+ Results.reserve(Actions.size());
+
+ CbObject ResponseObj = Response.AsObject();
+ CbArrayView ResultArray = ResponseObj["results"sv].AsArrayView();
+
+ size_t Index = 0;
+ for (CbFieldView Field : ResultArray)
+ {
+ if (Index >= Actions.size())
+ {
+ break;
+ }
+
+ CbObjectView Entry = Field.AsObjectView();
+ const int32_t LsnField = Entry["lsn"sv].AsInt32(0);
+
+ if (LsnField > 0)
+ {
+ HttpRunningAction NewAction;
+ NewAction.Action = Actions[Index];
+ NewAction.RemoteActionLsn = LsnField;
+
+ {
+ RwLock::ExclusiveLockScope _(m_RunningLock);
+ m_RemoteRunningMap[LsnField] = std::move(NewAction);
+ }
+
+ ZEN_DEBUG("batch: scheduled action {} with remote LSN {} (local LSN {})",
+ Actions[Index]->ActionObj.GetHash(),
+ LsnField,
+ Actions[Index]->ActionLsn);
+
+ Actions[Index]->SetActionState(RunnerAction::State::Running);
+
+ Results.push_back(SubmitResult{.IsAccepted = true});
+ }
+ else
+ {
+ std::string_view ErrorMsg = Entry["error"sv].AsString();
+ Results.push_back(SubmitResult{.IsAccepted = false, .Reason = std::string(ErrorMsg)});
+ }
+
+ ++Index;
+ }
+
+ // If the server returned fewer results than actions, mark the rest as not accepted
+ while (Results.size() < Actions.size())
+ {
+ Results.push_back(SubmitResult{.IsAccepted = false, .Reason = "no result from server"});
+ }
+
+ return Results;
+}
+
+std::vector<SubmitResult>
+RemoteHttpRunner::FallbackToIndividualSubmit(const std::vector<Ref<RunnerAction>>& Actions)
+{
+ std::vector<std::future<SubmitResult>> Futures;
+ Futures.reserve(Actions.size());
+
+ for (const Ref<RunnerAction>& Action : Actions)
+ {
+ std::packaged_task<SubmitResult()> Task([this, Action]() { return SubmitAction(Action); });
+
+ Futures.push_back(m_WorkerPool.EnqueueTask(std::move(Task), WorkerThreadPool::EMode::EnableBacklog));
+ }
+
+ std::vector<SubmitResult> Results;
+ Results.reserve(Futures.size());
+
+ for (auto& Future : Futures)
+ {
+ Results.push_back(Future.get());
+ }
+
+ return Results;
+}
+
Oid
RemoteHttpRunner::EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config)
{
@@ -481,6 +771,35 @@ RemoteHttpRunner::GetSubmittedActionCount()
return m_RemoteRunningMap.size();
}
+//////////////////////////////////////////////////////////////////////////
+//
+// IWsClientHandler
+//
+
+void
+RemoteHttpRunner::OnWsOpen()
+{
+ ZEN_INFO("WebSocket connected to {}", m_HostName);
+ m_WsConnected.store(true, std::memory_order_release);
+}
+
+void
+RemoteHttpRunner::OnWsMessage([[maybe_unused]] const WebSocketMessage& Msg)
+{
+ // The message content is a wakeup signal; no parsing needed.
+ // Signal the monitor thread to sweep completed actions immediately.
+ m_MonitorThreadEvent.Set();
+}
+
+void
+RemoteHttpRunner::OnWsClose([[maybe_unused]] uint16_t Code, [[maybe_unused]] std::string_view Reason)
+{
+ ZEN_WARN("WebSocket disconnected from {} (code {})", m_HostName, Code);
+ m_WsConnected.store(false, std::memory_order_release);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
void
RemoteHttpRunner::MonitorThreadFunction()
{
@@ -489,28 +808,40 @@ RemoteHttpRunner::MonitorThreadFunction()
do
{
const int NormalWaitingTime = 200;
- int WaitTimeMs = NormalWaitingTime;
- auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
- auto SweepOnce = [&] {
+ const int WsWaitingTime = 2000; // Safety-net interval when WS is connected
+
+ int WaitTimeMs = m_WsConnected.load(std::memory_order_relaxed) ? WsWaitingTime : NormalWaitingTime;
+ auto WaitOnce = [&] { return m_MonitorThreadEvent.Wait(WaitTimeMs); };
+ auto SweepOnce = [&] {
const size_t RetiredCount = SweepRunningActions();
- m_RunningLock.WithSharedLock([&] {
- if (m_RemoteRunningMap.size() > 16)
- {
- WaitTimeMs = NormalWaitingTime / 4;
- }
- else
- {
- if (RetiredCount)
+ if (m_WsConnected.load(std::memory_order_relaxed))
+ {
+ // WS connected: use long safety-net interval; the WS message
+ // will wake us immediately for the real work.
+ WaitTimeMs = WsWaitingTime;
+ }
+ else
+ {
+ // No WS: adaptive polling as before
+ m_RunningLock.WithSharedLock([&] {
+ if (m_RemoteRunningMap.size() > 16)
{
- WaitTimeMs = NormalWaitingTime / 2;
+ WaitTimeMs = NormalWaitingTime / 4;
}
else
{
- WaitTimeMs = NormalWaitingTime;
+ if (RetiredCount)
+ {
+ WaitTimeMs = NormalWaitingTime / 2;
+ }
+ else
+ {
+ WaitTimeMs = NormalWaitingTime;
+ }
}
- }
- });
+ });
+ }
};
while (!WaitOnce())
@@ -518,7 +849,7 @@ RemoteHttpRunner::MonitorThreadFunction()
SweepOnce();
}
- // Signal received - this may mean we should quit
+ // Signal received — may be a WS wakeup or a quit signal
SweepOnce();
} while (m_MonitorThreadEnabled);
diff --git a/src/zencompute/runners/remotehttprunner.h b/src/zencompute/runners/remotehttprunner.h
index 9119992a9..c17d0cf2a 100644
--- a/src/zencompute/runners/remotehttprunner.h
+++ b/src/zencompute/runners/remotehttprunner.h
@@ -14,9 +14,11 @@
# include <zencore/workthreadpool.h>
# include <zencore/zencore.h>
# include <zenhttp/httpclient.h>
+# include <zenhttp/httpwsclient.h>
# include <atomic>
# include <filesystem>
+# include <memory>
# include <thread>
# include <unordered_map>
@@ -32,7 +34,7 @@ namespace zen::compute {
*/
-class RemoteHttpRunner : public FunctionRunner
+class RemoteHttpRunner : public FunctionRunner, private IWsClientHandler
{
RemoteHttpRunner(RemoteHttpRunner&&) = delete;
RemoteHttpRunner& operator=(RemoteHttpRunner&&) = delete;
@@ -45,7 +47,7 @@ public:
~RemoteHttpRunner();
virtual void Shutdown() override;
- virtual void RegisterWorker(const CbPackage& WorkerPackage) override;
+ [[nodiscard]] virtual bool RegisterWorker(const CbPackage& WorkerPackage) override;
[[nodiscard]] virtual SubmitResult SubmitAction(Ref<RunnerAction> Action) override;
[[nodiscard]] virtual bool IsHealthy() override;
[[nodiscard]] virtual size_t GetSubmittedActionCount() override;
@@ -66,7 +68,9 @@ private:
std::string m_BaseUrl;
HttpClient m_Http;
- int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
+ std::atomic<bool> m_AcceptNewActions{true};
+ int32_t m_MaxRunningActions = 256; // arbitrary limit for testing
+ int32_t m_MaxBatchSize = 50;
struct HttpRunningAction
{
@@ -92,7 +96,20 @@ private:
// creating remote queues. Generated once at construction and never changes.
Oid m_InstanceId;
+ // WebSocket completion notification client
+ std::unique_ptr<HttpWsClient> m_WsClient;
+ std::atomic<bool> m_WsConnected{false};
+
+ // IWsClientHandler
+ void OnWsOpen() override;
+ void OnWsMessage(const WebSocketMessage& Msg) override;
+ void OnWsClose(uint16_t Code, std::string_view Reason) override;
+
Oid EnsureRemoteQueue(int QueueId, const CbObject& Metadata, const CbObject& Config);
+
+ std::vector<SubmitResult> SubmitActionBatch(const std::string& SubmitUrl, const std::vector<Ref<RunnerAction>>& Actions);
+ std::vector<SubmitResult> ParseBatchResponse(const HttpClient::Response& Response, const std::vector<Ref<RunnerAction>>& Actions);
+ std::vector<SubmitResult> FallbackToIndividualSubmit(const std::vector<Ref<RunnerAction>>& Actions);
};
} // namespace zen::compute