aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/httpcomputeservice.cpp
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2026-03-18 11:19:10 +0100
committerGitHub Enterprise <[email protected]>2026-03-18 11:19:10 +0100
commiteba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch)
tree3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/httpcomputeservice.cpp
parentbugfix release - v5.7.23 (#851) (diff)
downloadzen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz
zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip
Compute batching (#849)
### Compute Batch Submission - Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads - Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure - Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps` ### Retracted Action State - Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount` - Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession` - Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints - Add state machine documentation and per-state comments to `RunnerAction` ### Compute Race Fixes - Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely - Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK ### Logging Optimization and ANSI improvements - Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex` - Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage` - Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h` - Strip upstream ANSI SGR escapes in non-color output mode. This enables colour in log messages without polluting log files with ANSI control sequences - Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files ### CLI / Exec Refactoring - Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and a `ExecSessionConfig` struct - Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`) - Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser ### Testing Improvements - Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter - Add test suite banners to test listener output - Made `function.session.abandon_pending` test more robust ### Startup / Reliability Fixes - Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()` - Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing - Fail on unrecognized zenserver `--mode` instead of silently defaulting to store ### Other - Show host details (hostname, platform, CPU count, memory) when discovering new compute workers - Move frontend `html.zip` from source tree into build directory - Add format specifications for Compact Binary and Compressed Buffer wire formats - Add `WriteCompactBinaryObject` to zencore - Extended `ConsoleTui` with additional functionality - Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support - Disable compute/horde/nomad in release builds (not yet production-ready) - Disable unintended `ASIO_HAS_IO_URING` enablement - Fix crashpad patch missing leading whitespace - Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/httpcomputeservice.cpp')
-rw-r--r--src/zencompute/httpcomputeservice.cpp760
1 files changed, 395 insertions, 365 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
index e82a40781..bdfd9d197 100644
--- a/src/zencompute/httpcomputeservice.cpp
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -16,6 +16,7 @@
# include <zencore/iobuffer.h>
# include <zencore/iohash.h>
# include <zencore/logging.h>
+# include <zencore/string.h>
# include <zencore/system.h>
# include <zencore/thread.h>
# include <zencore/trace.h>
@@ -23,8 +24,10 @@
# include <zenstore/cidstore.h>
# include <zentelemetry/stats.h>
+# include <algorithm>
# include <span>
# include <unordered_map>
+# include <vector>
using namespace std::literals;
@@ -50,6 +53,11 @@ struct HttpComputeService::Impl
ComputeServiceSession m_ComputeService;
SystemMetricsTracker m_MetricsTracker;
+ // WebSocket connections (completion push)
+
+ RwLock m_WsConnectionsLock;
+ std::vector<Ref<WebSocketConnection>> m_WsConnections;
+
// Metrics
metrics::OperationTiming m_HttpRequests;
@@ -91,6 +99,12 @@ struct HttpComputeService::Impl
void HandleWorkersAllGet(HttpServerRequest& HttpReq);
void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status);
void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId);
+ void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker);
+
+ // WebSocket / observer
+ void OnWebSocketOpen(Ref<WebSocketConnection> Connection);
+ void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code);
+ void OnActionsCompleted(std::span<const IComputeCompletionObserver::CompletedActionNotification> Actions);
void RegisterRoutes();
@@ -110,6 +124,7 @@ struct HttpComputeService::Impl
m_ComputeService.WaitUntilReady();
m_StatsService.RegisterHandler("compute", *m_Self);
RegisterRoutes();
+ m_ComputeService.SetCompletionObserver(m_Self);
}
};
@@ -149,7 +164,7 @@ HttpComputeService::Impl::RegisterRoutes()
return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
}
- bool Success = m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Abandoned);
+ bool Success = m_ComputeService.Abandon();
if (Success)
{
@@ -325,6 +340,29 @@ HttpComputeService::Impl::RegisterRoutes()
HttpVerb::kGet | HttpVerb::kPost);
m_Router.RegisterRoute(
+ "jobs/{lsn}/retract",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int ActionLsn = ParseInt<int>(Req.GetCapture(1)).value_or(0);
+
+ auto Result = m_ComputeService.RetractAction(ActionLsn);
+
+ CbObjectWriter Cbo;
+ if (Result.Success)
+ {
+ Cbo << "success"sv << true;
+ Cbo << "lsn"sv << ActionLsn;
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+ else
+ {
+ Cbo << "error"sv << Result.Error;
+ HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ }
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
"jobs/{worker}/{action}", // This route is inefficient, and is only here for backwards compatibility. The preferred path is the
// one which uses the scheduled action lsn for lookups
[this](HttpRouterRequest& Req) {
@@ -373,127 +411,7 @@ HttpComputeService::Impl::RegisterRoutes()
RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
}
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- // TODO: return status of all pending or executing jobs
- break;
-
- case HttpVerb::kPost:
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- // This operation takes the proposed job spec and identifies which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
-
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject ActionObj = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- ActionObj.IterateAttachments([&](CbFieldView Field) {
- const IoHash FileHash = Field.AsHash();
-
- if (!m_CidStore.ContainsChunk(FileHash))
- {
- NeedList.push_back(FileHash);
- }
- });
-
- if (NeedList.empty())
- {
- // We already have everything, enqueue the action for execution
-
- if (ComputeServiceSession::EnqueueResult Result =
- m_ComputeService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
- {
- ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
-
- return;
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
- }
- break;
-
- case HttpContentType::kCbPackage:
- {
- CbPackage Action = HttpReq.ReadPayloadPackage();
- CbObject ActionObj = Action.GetObject();
-
- std::span<const CbAttachment> Attachments = Action.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
-
- const uint64_t CompressedSize = DataView.GetCompressedSize();
-
- TotalAttachmentBytes += CompressedSize;
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
-
- if (InsertResult.New)
- {
- TotalNewBytes += CompressedSize;
- ++NewAttachmentCount;
- }
- }
-
- if (ComputeServiceSession::EnqueueResult Result =
- m_ComputeService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
- {
- ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
- ActionObj.GetHash(),
- Result.Lsn,
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
-
- return;
- }
- break;
-
- default:
- break;
- }
- break;
-
- default:
- break;
- }
+ HandleSubmitAction(HttpReq, 0, RequestPriority, &Worker);
},
HttpVerb::kPost);
@@ -511,118 +429,7 @@ HttpComputeService::Impl::RegisterRoutes()
RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
}
- // Resolve worker
-
- //
-
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- // This operation takes the proposed job spec and identifies which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
-
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject ActionObj = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- ActionObj.IterateAttachments([&](CbFieldView Field) {
- const IoHash FileHash = Field.AsHash();
-
- if (!m_CidStore.ContainsChunk(FileHash))
- {
- NeedList.push_back(FileHash);
- }
- });
-
- if (NeedList.empty())
- {
- // We already have everything, enqueue the action for execution
-
- if (ComputeServiceSession::EnqueueResult Result = m_ComputeService.EnqueueAction(ActionObj, RequestPriority))
- {
- ZEN_DEBUG("action accepted (lsn {})", Result.Lsn);
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
- else
- {
- // Could not resolve?
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
- }
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
- }
-
- case HttpContentType::kCbPackage:
- {
- CbPackage Action = HttpReq.ReadPayloadPackage();
- CbObject ActionObj = Action.GetObject();
-
- std::span<const CbAttachment> Attachments = Action.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
-
- const uint64_t CompressedSize = DataView.GetCompressedSize();
-
- TotalAttachmentBytes += CompressedSize;
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
-
- if (InsertResult.New)
- {
- TotalNewBytes += CompressedSize;
- ++NewAttachmentCount;
- }
- }
-
- if (ComputeServiceSession::EnqueueResult Result = m_ComputeService.EnqueueAction(ActionObj, RequestPriority))
- {
- ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
- Result.Lsn,
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
- else
- {
- // Could not resolve?
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
- }
- }
- return;
- }
+ HandleSubmitAction(HttpReq, 0, RequestPriority, nullptr);
},
HttpVerb::kPost);
@@ -1090,72 +897,7 @@ HttpComputeService::Impl::RegisterRoutes()
RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
}
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject ActionObj = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- if (!CheckAttachments(ActionObj, NeedList))
- {
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save());
- }
-
- if (ComputeServiceSession::EnqueueResult Result =
- m_ComputeService.EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority))
- {
- ZEN_DEBUG("queue {}: action {} accepted (lsn {})", QueueId, ActionObj.GetHash(), Result.Lsn);
- return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
- }
- }
-
- case HttpContentType::kCbPackage:
- {
- CbPackage Action = HttpReq.ReadPayloadPackage();
- CbObject ActionObj = Action.GetObject();
-
- IngestStats Stats = IngestPackageAttachments(Action);
-
- if (ComputeServiceSession::EnqueueResult Result =
- m_ComputeService.EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority))
- {
- ZEN_DEBUG("queue {}: accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
- QueueId,
- ActionObj.GetHash(),
- Result.Lsn,
- zen::NiceBytes(Stats.Bytes),
- Stats.Count,
- zen::NiceBytes(Stats.NewBytes),
- Stats.NewCount);
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
- }
- }
-
- default:
- break;
- }
+ HandleSubmitAction(HttpReq, QueueId, RequestPriority, &Worker);
},
HttpVerb::kPost);
@@ -1178,71 +920,7 @@ HttpComputeService::Impl::RegisterRoutes()
RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
}
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject ActionObj = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- if (!CheckAttachments(ActionObj, NeedList))
- {
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save());
- }
-
- if (ComputeServiceSession::EnqueueResult Result =
- m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, RequestPriority))
- {
- ZEN_DEBUG("queue {}: action accepted (lsn {})", QueueId, Result.Lsn);
- return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
- }
- }
-
- case HttpContentType::kCbPackage:
- {
- CbPackage Action = HttpReq.ReadPayloadPackage();
- CbObject ActionObj = Action.GetObject();
-
- IngestStats Stats = IngestPackageAttachments(Action);
-
- if (ComputeServiceSession::EnqueueResult Result =
- m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, RequestPriority))
- {
- ZEN_DEBUG("queue {}: accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
- QueueId,
- Result.Lsn,
- zen::NiceBytes(Stats.Bytes),
- Stats.Count,
- zen::NiceBytes(Stats.NewBytes),
- Stats.NewCount);
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
- else
- {
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
- }
- }
-
- default:
- break;
- }
+ HandleSubmitAction(HttpReq, QueueId, RequestPriority, nullptr);
},
HttpVerb::kPost);
@@ -1306,6 +984,45 @@ HttpComputeService::Impl::RegisterRoutes()
}
},
HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/jobs/{lsn}/retract",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
+ const int ActionLsn = ParseInt<int>(Req.GetCapture(2)).value_or(0);
+
+ if (QueueId == 0)
+ {
+ return;
+ }
+
+ ZEN_UNUSED(QueueId);
+
+ auto Result = m_ComputeService.RetractAction(ActionLsn);
+
+ CbObjectWriter Cbo;
+ if (Result.Success)
+ {
+ Cbo << "success"sv << true;
+ Cbo << "lsn"sv << ActionLsn;
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+ else
+ {
+ Cbo << "error"sv << Result.Error;
+ HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ }
+ },
+ HttpVerb::kPost);
+
+ // WebSocket upgrade endpoint — the handler logic lives in
+ // HttpComputeService::OnWebSocket* methods; this route merely
+ // satisfies the router so the upgrade request isn't rejected.
+ m_Router.RegisterRoute(
+ "ws",
+ [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); },
+ HttpVerb::kGet);
}
//////////////////////////////////////////////////////////////////////////
@@ -1320,12 +1037,17 @@ HttpComputeService::HttpComputeService(CidStore& InCidStore,
HttpComputeService::~HttpComputeService()
{
+ m_Impl->m_ComputeService.SetCompletionObserver(nullptr);
m_Impl->m_StatsService.UnregisterHandler("compute", *this);
}
void
HttpComputeService::Shutdown()
{
+ // Null out observer before shutting down the compute session to prevent
+ // callbacks into a partially-torn-down service.
+ m_Impl->m_ComputeService.SetCompletionObserver(nullptr);
+ m_Impl->m_WsConnectionsLock.WithExclusiveLock([&] { m_Impl->m_WsConnections.clear(); });
m_Impl->m_ComputeService.Shutdown();
}
@@ -1492,6 +1214,184 @@ HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vecto
}
void
+HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker)
+{
+ // QueueId > 0: queue-scoped enqueue; QueueId == 0: implicit queue (global routes)
+ auto Enqueue = [&](CbObject ActionObj) -> ComputeServiceSession::EnqueueResult {
+ if (QueueId > 0)
+ {
+ if (Worker)
+ {
+ return m_ComputeService.EnqueueResolvedActionToQueue(QueueId, *Worker, ActionObj, Priority);
+ }
+ return m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, Priority);
+ }
+ else
+ {
+ if (Worker)
+ {
+ return m_ComputeService.EnqueueResolvedAction(*Worker, ActionObj, Priority);
+ }
+ return m_ComputeService.EnqueueAction(ActionObj, Priority);
+ }
+ };
+
+ // Read payload upfront and handle attachments based on content type
+ CbObject Body;
+ IngestStats Stats = {};
+
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ IoBuffer Payload = HttpReq.ReadPayload();
+ Body = LoadCompactBinaryObject(Payload);
+ break;
+ }
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage Package = HttpReq.ReadPayloadPackage();
+ Body = Package.GetObject();
+ Stats = IngestPackageAttachments(Package);
+ break;
+ }
+
+ default:
+ return HttpReq.WriteResponse(HttpResponseCode::BadRequest);
+ }
+
+ // Check for "actions" array to determine batch vs single-action path
+ CbArray Actions = Body.Find("actions"sv).AsArray();
+
+ if (Actions.Num() > 0)
+ {
+ // --- Batch path ---
+
+ // For CbObject payloads, check all attachments upfront before enqueuing anything
+ if (HttpReq.RequestContentType() == HttpContentType::kCbObject)
+ {
+ std::vector<IoHash> NeedList;
+
+ for (CbField ActionField : Actions)
+ {
+ CbObject ActionObj = ActionField.AsObject();
+ CheckAttachments(ActionObj, NeedList);
+ }
+
+ if (!NeedList.empty())
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save());
+ }
+ }
+
+ // Enqueue all actions and collect results
+ CbObjectWriter Cbo;
+ int Accepted = 0;
+
+ Cbo.BeginArray("results");
+
+ for (CbField ActionField : Actions)
+ {
+ CbObject ActionObj = ActionField.AsObject();
+
+ ComputeServiceSession::EnqueueResult Result = Enqueue(ActionObj);
+
+ Cbo.BeginObject();
+
+ if (Result)
+ {
+ Cbo << "lsn"sv << Result.Lsn;
+ ++Accepted;
+ }
+ else
+ {
+ Cbo << "error"sv << Result.ResponseMessage;
+ }
+
+ Cbo.EndObject();
+ }
+
+ Cbo.EndArray();
+
+ if (Stats.Count > 0)
+ {
+ ZEN_DEBUG("queue {}: batch accepted {}/{} actions: {} in {} attachments. {} new ({} attachments)",
+ QueueId,
+ Accepted,
+ Actions.Num(),
+ zen::NiceBytes(Stats.Bytes),
+ Stats.Count,
+ zen::NiceBytes(Stats.NewBytes),
+ Stats.NewCount);
+ }
+ else
+ {
+ ZEN_DEBUG("queue {}: batch accepted {}/{} actions", QueueId, Accepted, Actions.Num());
+ }
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
+ // --- Single-action path: Body is the action itself ---
+
+ if (HttpReq.RequestContentType() == HttpContentType::kCbObject)
+ {
+ std::vector<IoHash> NeedList;
+
+ if (!CheckAttachments(Body, NeedList))
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save());
+ }
+ }
+
+ if (ComputeServiceSession::EnqueueResult Result = Enqueue(Body))
+ {
+ if (Stats.Count > 0)
+ {
+ ZEN_DEBUG("queue {}: accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
+ QueueId,
+ Body.GetHash(),
+ Result.Lsn,
+ zen::NiceBytes(Stats.Bytes),
+ Stats.Count,
+ zen::NiceBytes(Stats.NewBytes),
+ Stats.NewCount);
+ }
+ else
+ {
+ ZEN_DEBUG("queue {}: action {} accepted (lsn {})", QueueId, Body.GetHash(), Result.Lsn);
+ }
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+ }
+}
+
+void
HttpComputeService::Impl::HandleWorkersGet(HttpServerRequest& HttpReq)
{
CbObjectWriter Cbo;
@@ -1632,6 +1532,136 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const
}
//////////////////////////////////////////////////////////////////////////
+//
+// IWebSocketHandler
+//
+
+void
+HttpComputeService::OnWebSocketOpen(Ref<WebSocketConnection> Connection)
+{
+ m_Impl->OnWebSocketOpen(std::move(Connection));
+}
+
+void
+HttpComputeService::OnWebSocketMessage([[maybe_unused]] WebSocketConnection& Conn, [[maybe_unused]] const WebSocketMessage& Msg)
+{
+ // Clients are receive-only; ignore any inbound messages.
+}
+
+void
+HttpComputeService::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, [[maybe_unused]] std::string_view Reason)
+{
+ m_Impl->OnWebSocketClose(Conn, Code);
+}
+
+void
+HttpComputeService::OnActionsCompleted(std::span<const CompletedActionNotification> Actions)
+{
+ m_Impl->OnActionsCompleted(Actions);
+}
+
+//////////////////////////////////////////////////////////////////////////
+//
+// Impl — WebSocket / observer
+//
+
+void
+HttpComputeService::Impl::OnWebSocketOpen(Ref<WebSocketConnection> Connection)
+{
+ ZEN_INFO("compute WebSocket client connected");
+ m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); });
+}
+
+void
+HttpComputeService::Impl::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code)
+{
+ ZEN_INFO("compute WebSocket client disconnected (code {})", Code);
+
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) {
+ return C.Get() == &Conn;
+ });
+ m_WsConnections.erase(It, m_WsConnections.end());
+ });
+}
+
+void
+HttpComputeService::Impl::OnActionsCompleted(std::span<const IComputeCompletionObserver::CompletedActionNotification> Actions)
+{
+ using ActionState = IComputeCompletionObserver::ActionState;
+ using CompletedActionNotification = IComputeCompletionObserver::CompletedActionNotification;
+
+ // Snapshot connections under shared lock
+ eastl::fixed_vector<Ref<WebSocketConnection>, 16> Connections;
+ m_WsConnectionsLock.WithSharedLock([&] { Connections = {begin(m_WsConnections), end(m_WsConnections)}; });
+
+ if (Connections.empty())
+ {
+ return;
+ }
+
+ // Build CompactBinary notification grouped by state:
+ // {"Completed": [lsn, ...], "Failed": [lsn, ...], ...}
+ // Each state name becomes an array key containing the LSNs in that state.
+ CbObjectWriter Cbo;
+
+ // Sort by state so we can emit one array per state in a single pass.
+ // Copy into a local vector since the span is const.
+ eastl::fixed_vector<CompletedActionNotification, 16> Sorted(Actions.begin(), Actions.end());
+ std::sort(Sorted.begin(), Sorted.end(), [](const auto& A, const auto& B) { return A.State < B.State; });
+
+ ActionState CurrentState{};
+ bool ArrayOpen = false;
+
+ for (const CompletedActionNotification& Action : Sorted)
+ {
+ if (!ArrayOpen || Action.State != CurrentState)
+ {
+ if (ArrayOpen)
+ {
+ Cbo.EndArray();
+ }
+ CurrentState = Action.State;
+ Cbo.BeginArray(IComputeCompletionObserver::ActionStateToString(CurrentState));
+ ArrayOpen = true;
+ }
+ Cbo.AddInteger(Action.Lsn);
+ }
+
+ if (ArrayOpen)
+ {
+ Cbo.EndArray();
+ }
+
+ CbObject Msg = Cbo.Save();
+ MemoryView MsgView = Msg.GetView();
+
+ // Broadcast to all connected clients, prune closed ones
+ bool HadClosedConnections = false;
+ for (auto& Conn : Connections)
+ {
+ if (Conn->IsOpen())
+ {
+ Conn->SendBinary(MsgView);
+ }
+ else
+ {
+ HadClosedConnections = true;
+ }
+ }
+
+ if (HadClosedConnections)
+ {
+ m_WsConnectionsLock.WithExclusiveLock([&] {
+ auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [](const Ref<WebSocketConnection>& C) {
+ return !C->IsOpen();
+ });
+ m_WsConnections.erase(It, m_WsConnections.end());
+ });
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
void
httpcomputeservice_forcelink()