diff options
| author | Stefan Boberg <[email protected]> | 2026-03-18 11:19:10 +0100 |
|---|---|---|
| committer | GitHub Enterprise <[email protected]> | 2026-03-18 11:19:10 +0100 |
| commit | eba410c4168e23d7908827eb34b7cf0c58a5dc48 (patch) | |
| tree | 3cda8e8f3f81941d3bb5b84a8155350c5bb2068c /src/zencompute/httpcomputeservice.cpp | |
| parent | bugfix release - v5.7.23 (#851) (diff) | |
| download | zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.tar.xz zen-eba410c4168e23d7908827eb34b7cf0c58a5dc48.zip | |
Compute batching (#849)
### Compute Batch Submission
- Consolidate duplicated action submission logic in `httpcomputeservice` into a single `HandleSubmitAction` supporting both single-action and batch (actions array) payloads
- Group actions by queue in `RemoteHttpRunner` and submit as batches with configurable chunk size, falling back to individual submission on failure
- Extract shared helpers: `MakeErrorResult`, `ValidateQueueForEnqueue`, `ActivateActionInQueue`, `RemoveActionFromActiveMaps`
### Retracted Action State
- Add `Retracted` state to `RunnerAction` for retry-free rescheduling — an explicit request to pull an action back and reschedule it on a different runner without incrementing `RetryCount`
- Implement idempotent `RetractAction()` on `RunnerAction` and `ComputeServiceSession`
- Add `POST jobs/{lsn}/retract` and `queues/{queueref}/jobs/{lsn}/retract` HTTP endpoints
- Add state machine documentation and per-state comments to `RunnerAction`
### Compute Race Fixes
- Fix race in `HandleActionUpdates` where actions enqueued between session abandon and scheduler tick were never abandoned, causing `GetActionResult` to return 202 indefinitely
- Fix queue `ActiveCount` race where `NotifyQueueActionComplete` was called after releasing `m_ResultsLock`, allowing callers to observe stale counters immediately after `GetActionResult` returned OK
### Logging Optimization and ANSI improvements
- Improve `AnsiColorStdoutSink` write efficiency — single write call, dirty-flag flush, `RwLock` instead of `std::mutex`
- Move ANSI color emission from sink into formatters via `Formatter::SetColorEnabled()`; remove `ColorRangeStart`/`End` from `LogMessage`
- Extract color helpers (`AnsiColorForLevel`, `StripAnsiSgrSequences`) into `helpers.h`
- Strip upstream ANSI SGR escapes in non-color output mode. This enables color in log messages without polluting log files with ANSI control sequences
- Move `RotatingFileSink`, `JsonFormatter`, and `FullFormatter` from header-only to pimpl with `.cpp` files
### CLI / Exec Refactoring
- Extract `ExecSessionRunner` class from ~920-line `ExecUsingSession` into focused methods and an `ExecSessionConfig` struct
- Replace monolithic `ExecCommand` with subcommand-based architecture (`http`, `inproc`, `beacon`, `dump`, `buildlog`)
- Allow parent options to appear after subcommand name by parsing subcommand args permissively and forwarding unmatched tokens to the parent parser
### Testing Improvements
- Fix `--test-suite` filter being ignored due to accumulation with default wildcard filter
- Add test suite banners to test listener output
- Make the `function.session.abandon_pending` test more robust
### Startup / Reliability Fixes
- Fix silent exit when a second zenserver instance detects a port conflict — use `ZEN_CONSOLE_*` for log calls that precede `InitializeLogging()`
- Fix two potential SIGSEGV paths during early startup: guard `sentry_options_new()` returning nullptr, and throw on `ZenServerState::Register()` returning nullptr instead of dereferencing
- Fail on unrecognized zenserver `--mode` instead of silently defaulting to store
### Other
- Show host details (hostname, platform, CPU count, memory) when discovering new compute workers
- Move frontend `html.zip` from source tree into build directory
- Add format specifications for Compact Binary and Compressed Buffer wire formats
- Add `WriteCompactBinaryObject` to zencore
- Extend `ConsoleTui` with additional functionality
- Add `--vscode` option to `xmake sln` for clangd / `compile_commands.json` support
- Disable compute/horde/nomad in release builds (not yet production-ready)
- Disable unintended `ASIO_HAS_IO_URING` enablement
- Fix crashpad patch missing leading whitespace
- Clean up code triggering gcc false positives
Diffstat (limited to 'src/zencompute/httpcomputeservice.cpp')
| -rw-r--r-- | src/zencompute/httpcomputeservice.cpp | 760 |
1 files changed, 395 insertions, 365 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp index e82a40781..bdfd9d197 100644 --- a/src/zencompute/httpcomputeservice.cpp +++ b/src/zencompute/httpcomputeservice.cpp @@ -16,6 +16,7 @@ # include <zencore/iobuffer.h> # include <zencore/iohash.h> # include <zencore/logging.h> +# include <zencore/string.h> # include <zencore/system.h> # include <zencore/thread.h> # include <zencore/trace.h> @@ -23,8 +24,10 @@ # include <zenstore/cidstore.h> # include <zentelemetry/stats.h> +# include <algorithm> # include <span> # include <unordered_map> +# include <vector> using namespace std::literals; @@ -50,6 +53,11 @@ struct HttpComputeService::Impl ComputeServiceSession m_ComputeService; SystemMetricsTracker m_MetricsTracker; + // WebSocket connections (completion push) + + RwLock m_WsConnectionsLock; + std::vector<Ref<WebSocketConnection>> m_WsConnections; + // Metrics metrics::OperationTiming m_HttpRequests; @@ -91,6 +99,12 @@ struct HttpComputeService::Impl void HandleWorkersAllGet(HttpServerRequest& HttpReq); void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status); void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId); + void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker); + + // WebSocket / observer + void OnWebSocketOpen(Ref<WebSocketConnection> Connection); + void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code); + void OnActionsCompleted(std::span<const IComputeCompletionObserver::CompletedActionNotification> Actions); void RegisterRoutes(); @@ -110,6 +124,7 @@ struct HttpComputeService::Impl m_ComputeService.WaitUntilReady(); m_StatsService.RegisterHandler("compute", *m_Self); RegisterRoutes(); + m_ComputeService.SetCompletionObserver(m_Self); } }; @@ -149,7 +164,7 @@ HttpComputeService::Impl::RegisterRoutes() return HttpReq.WriteResponse(HttpResponseCode::Forbidden); } - bool Success = 
m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Abandoned); + bool Success = m_ComputeService.Abandon(); if (Success) { @@ -325,6 +340,29 @@ HttpComputeService::Impl::RegisterRoutes() HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( + "jobs/{lsn}/retract", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const int ActionLsn = ParseInt<int>(Req.GetCapture(1)).value_or(0); + + auto Result = m_ComputeService.RetractAction(ActionLsn); + + CbObjectWriter Cbo; + if (Result.Success) + { + Cbo << "success"sv << true; + Cbo << "lsn"sv << ActionLsn; + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + else + { + Cbo << "error"sv << Result.Error; + HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + } + }, + HttpVerb::kPost); + + m_Router.RegisterRoute( "jobs/{worker}/{action}", // This route is inefficient, and is only here for backwards compatibility. The preferred path is the // one which uses the scheduled action lsn for lookups [this](HttpRouterRequest& Req) { @@ -373,127 +411,7 @@ HttpComputeService::Impl::RegisterRoutes() RequestPriority = ParseInt<int>(PriorityParam).value_or(-1); } - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - // TODO: return status of all pending or executing jobs - break; - - case HttpVerb::kPost: - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. 
This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject ActionObj = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - ActionObj.IterateAttachments([&](CbFieldView Field) { - const IoHash FileHash = Field.AsHash(); - - if (!m_CidStore.ContainsChunk(FileHash)) - { - NeedList.push_back(FileHash); - } - }); - - if (NeedList.empty()) - { - // We already have everything, enqueue the action for execution - - if (ComputeServiceSession::EnqueueResult Result = - m_ComputeService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) - { - ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn); - - HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - - return; - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - std::span<const CbAttachment> Attachments = Action.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - - const uint64_t CompressedSize = DataView.GetCompressedSize(); - - TotalAttachmentBytes += CompressedSize; - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); - - if (InsertResult.New) - { - TotalNewBytes += CompressedSize; - ++NewAttachmentCount; - } - } - - if 
(ComputeServiceSession::EnqueueResult Result = - m_ComputeService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) - { - ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)", - ActionObj.GetHash(), - Result.Lsn, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - - return; - } - break; - - default: - break; - } - break; - - default: - break; - } + HandleSubmitAction(HttpReq, 0, RequestPriority, &Worker); }, HttpVerb::kPost); @@ -511,118 +429,7 @@ HttpComputeService::Impl::RegisterRoutes() RequestPriority = ParseInt<int>(PriorityParam).value_or(-1); } - // Resolve worker - - // - - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject ActionObj = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - ActionObj.IterateAttachments([&](CbFieldView Field) { - const IoHash FileHash = Field.AsHash(); - - if (!m_CidStore.ContainsChunk(FileHash)) - { - NeedList.push_back(FileHash); - } - }); - - if (NeedList.empty()) - { - // We already have everything, enqueue the action for execution - - if (ComputeServiceSession::EnqueueResult Result = m_ComputeService.EnqueueAction(ActionObj, RequestPriority)) - { - ZEN_DEBUG("action accepted (lsn {})", Result.Lsn); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - else - { - // Could not resolve? 
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); - } - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); - } - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - std::span<const CbAttachment> Attachments = Action.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - - const uint64_t CompressedSize = DataView.GetCompressedSize(); - - TotalAttachmentBytes += CompressedSize; - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); - - if (InsertResult.New) - { - TotalNewBytes += CompressedSize; - ++NewAttachmentCount; - } - } - - if (ComputeServiceSession::EnqueueResult Result = m_ComputeService.EnqueueAction(ActionObj, RequestPriority)) - { - ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)", - Result.Lsn, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - else - { - // Could not resolve? 
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); - } - } - return; - } + HandleSubmitAction(HttpReq, 0, RequestPriority, nullptr); }, HttpVerb::kPost); @@ -1090,72 +897,7 @@ HttpComputeService::Impl::RegisterRoutes() RequestPriority = ParseInt<int>(PriorityParam).value_or(-1); } - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject ActionObj = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - if (!CheckAttachments(ActionObj, NeedList)) - { - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save()); - } - - if (ComputeServiceSession::EnqueueResult Result = - m_ComputeService.EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority)) - { - ZEN_DEBUG("queue {}: action {} accepted (lsn {})", QueueId, ActionObj.GetHash(), Result.Lsn); - return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); - } - } - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - IngestStats Stats = IngestPackageAttachments(Action); - - if (ComputeServiceSession::EnqueueResult Result = - m_ComputeService.EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority)) - { - ZEN_DEBUG("queue {}: accepted action {} (lsn {}): {} in {} attachments. 
{} new ({} attachments)", - QueueId, - ActionObj.GetHash(), - Result.Lsn, - zen::NiceBytes(Stats.Bytes), - Stats.Count, - zen::NiceBytes(Stats.NewBytes), - Stats.NewCount); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); - } - } - - default: - break; - } + HandleSubmitAction(HttpReq, QueueId, RequestPriority, &Worker); }, HttpVerb::kPost); @@ -1178,71 +920,7 @@ HttpComputeService::Impl::RegisterRoutes() RequestPriority = ParseInt<int>(PriorityParam).value_or(-1); } - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject ActionObj = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - if (!CheckAttachments(ActionObj, NeedList)) - { - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save()); - } - - if (ComputeServiceSession::EnqueueResult Result = - m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, RequestPriority)) - { - ZEN_DEBUG("queue {}: action accepted (lsn {})", QueueId, Result.Lsn); - return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); - } - } - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - IngestStats Stats = IngestPackageAttachments(Action); - - if (ComputeServiceSession::EnqueueResult Result = - m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, RequestPriority)) - { - ZEN_DEBUG("queue {}: accepted action (lsn {}): {} in {} attachments. 
{} new ({} attachments)", - QueueId, - Result.Lsn, - zen::NiceBytes(Stats.Bytes), - Stats.Count, - zen::NiceBytes(Stats.NewBytes), - Stats.NewCount); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - else - { - return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); - } - } - - default: - break; - } + HandleSubmitAction(HttpReq, QueueId, RequestPriority, nullptr); }, HttpVerb::kPost); @@ -1306,6 +984,45 @@ HttpComputeService::Impl::RegisterRoutes() } }, HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "queues/{queueref}/jobs/{lsn}/retract", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1)); + const int ActionLsn = ParseInt<int>(Req.GetCapture(2)).value_or(0); + + if (QueueId == 0) + { + return; + } + + ZEN_UNUSED(QueueId); + + auto Result = m_ComputeService.RetractAction(ActionLsn); + + CbObjectWriter Cbo; + if (Result.Success) + { + Cbo << "success"sv << true; + Cbo << "lsn"sv << ActionLsn; + HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + else + { + Cbo << "error"sv << Result.Error; + HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save()); + } + }, + HttpVerb::kPost); + + // WebSocket upgrade endpoint — the handler logic lives in + // HttpComputeService::OnWebSocket* methods; this route merely + // satisfies the router so the upgrade request isn't rejected. 
+ m_Router.RegisterRoute( + "ws", + [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); }, + HttpVerb::kGet); } ////////////////////////////////////////////////////////////////////////// @@ -1320,12 +1037,17 @@ HttpComputeService::HttpComputeService(CidStore& InCidStore, HttpComputeService::~HttpComputeService() { + m_Impl->m_ComputeService.SetCompletionObserver(nullptr); m_Impl->m_StatsService.UnregisterHandler("compute", *this); } void HttpComputeService::Shutdown() { + // Null out observer before shutting down the compute session to prevent + // callbacks into a partially-torn-down service. + m_Impl->m_ComputeService.SetCompletionObserver(nullptr); + m_Impl->m_WsConnectionsLock.WithExclusiveLock([&] { m_Impl->m_WsConnections.clear(); }); m_Impl->m_ComputeService.Shutdown(); } @@ -1492,6 +1214,184 @@ HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vecto } void +HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker) +{ + // QueueId > 0: queue-scoped enqueue; QueueId == 0: implicit queue (global routes) + auto Enqueue = [&](CbObject ActionObj) -> ComputeServiceSession::EnqueueResult { + if (QueueId > 0) + { + if (Worker) + { + return m_ComputeService.EnqueueResolvedActionToQueue(QueueId, *Worker, ActionObj, Priority); + } + return m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, Priority); + } + else + { + if (Worker) + { + return m_ComputeService.EnqueueResolvedAction(*Worker, ActionObj, Priority); + } + return m_ComputeService.EnqueueAction(ActionObj, Priority); + } + }; + + // Read payload upfront and handle attachments based on content type + CbObject Body; + IngestStats Stats = {}; + + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + IoBuffer Payload = HttpReq.ReadPayload(); + Body = LoadCompactBinaryObject(Payload); + break; + } + + case HttpContentType::kCbPackage: + { + CbPackage 
Package = HttpReq.ReadPayloadPackage(); + Body = Package.GetObject(); + Stats = IngestPackageAttachments(Package); + break; + } + + default: + return HttpReq.WriteResponse(HttpResponseCode::BadRequest); + } + + // Check for "actions" array to determine batch vs single-action path + CbArray Actions = Body.Find("actions"sv).AsArray(); + + if (Actions.Num() > 0) + { + // --- Batch path --- + + // For CbObject payloads, check all attachments upfront before enqueuing anything + if (HttpReq.RequestContentType() == HttpContentType::kCbObject) + { + std::vector<IoHash> NeedList; + + for (CbField ActionField : Actions) + { + CbObject ActionObj = ActionField.AsObject(); + CheckAttachments(ActionObj, NeedList); + } + + if (!NeedList.empty()) + { + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save()); + } + } + + // Enqueue all actions and collect results + CbObjectWriter Cbo; + int Accepted = 0; + + Cbo.BeginArray("results"); + + for (CbField ActionField : Actions) + { + CbObject ActionObj = ActionField.AsObject(); + + ComputeServiceSession::EnqueueResult Result = Enqueue(ActionObj); + + Cbo.BeginObject(); + + if (Result) + { + Cbo << "lsn"sv << Result.Lsn; + ++Accepted; + } + else + { + Cbo << "error"sv << Result.ResponseMessage; + } + + Cbo.EndObject(); + } + + Cbo.EndArray(); + + if (Stats.Count > 0) + { + ZEN_DEBUG("queue {}: batch accepted {}/{} actions: {} in {} attachments. 
{} new ({} attachments)", + QueueId, + Accepted, + Actions.Num(), + zen::NiceBytes(Stats.Bytes), + Stats.Count, + zen::NiceBytes(Stats.NewBytes), + Stats.NewCount); + } + else + { + ZEN_DEBUG("queue {}: batch accepted {}/{} actions", QueueId, Accepted, Actions.Num()); + } + + return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); + } + + // --- Single-action path: Body is the action itself --- + + if (HttpReq.RequestContentType() == HttpContentType::kCbObject) + { + std::vector<IoHash> NeedList; + + if (!CheckAttachments(Body, NeedList)) + { + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save()); + } + } + + if (ComputeServiceSession::EnqueueResult Result = Enqueue(Body)) + { + if (Stats.Count > 0) + { + ZEN_DEBUG("queue {}: accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)", + QueueId, + Body.GetHash(), + Result.Lsn, + zen::NiceBytes(Stats.Bytes), + Stats.Count, + zen::NiceBytes(Stats.NewBytes), + Stats.NewCount); + } + else + { + ZEN_DEBUG("queue {}: action {} accepted (lsn {})", QueueId, Body.GetHash(), Result.Lsn); + } + + return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); + } + else + { + return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); + } +} + +void HttpComputeService::Impl::HandleWorkersGet(HttpServerRequest& HttpReq) { CbObjectWriter Cbo; @@ -1632,6 +1532,136 @@ HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const } ////////////////////////////////////////////////////////////////////////// +// +// IWebSocketHandler +// + +void +HttpComputeService::OnWebSocketOpen(Ref<WebSocketConnection> Connection) +{ + m_Impl->OnWebSocketOpen(std::move(Connection)); +} + +void +HttpComputeService::OnWebSocketMessage([[maybe_unused]] WebSocketConnection& Conn, [[maybe_unused]] const WebSocketMessage& 
Msg) +{ + // Clients are receive-only; ignore any inbound messages. +} + +void +HttpComputeService::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, [[maybe_unused]] std::string_view Reason) +{ + m_Impl->OnWebSocketClose(Conn, Code); +} + +void +HttpComputeService::OnActionsCompleted(std::span<const CompletedActionNotification> Actions) +{ + m_Impl->OnActionsCompleted(Actions); +} + +////////////////////////////////////////////////////////////////////////// +// +// Impl — WebSocket / observer +// + +void +HttpComputeService::Impl::OnWebSocketOpen(Ref<WebSocketConnection> Connection) +{ + ZEN_INFO("compute WebSocket client connected"); + m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); +} + +void +HttpComputeService::Impl::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code) +{ + ZEN_INFO("compute WebSocket client disconnected (code {})", Code); + + m_WsConnectionsLock.WithExclusiveLock([&] { + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref<WebSocketConnection>& C) { + return C.Get() == &Conn; + }); + m_WsConnections.erase(It, m_WsConnections.end()); + }); +} + +void +HttpComputeService::Impl::OnActionsCompleted(std::span<const IComputeCompletionObserver::CompletedActionNotification> Actions) +{ + using ActionState = IComputeCompletionObserver::ActionState; + using CompletedActionNotification = IComputeCompletionObserver::CompletedActionNotification; + + // Snapshot connections under shared lock + eastl::fixed_vector<Ref<WebSocketConnection>, 16> Connections; + m_WsConnectionsLock.WithSharedLock([&] { Connections = {begin(m_WsConnections), end(m_WsConnections)}; }); + + if (Connections.empty()) + { + return; + } + + // Build CompactBinary notification grouped by state: + // {"Completed": [lsn, ...], "Failed": [lsn, ...], ...} + // Each state name becomes an array key containing the LSNs in that state. 
+ CbObjectWriter Cbo; + + // Sort by state so we can emit one array per state in a single pass. + // Copy into a local vector since the span is const. + eastl::fixed_vector<CompletedActionNotification, 16> Sorted(Actions.begin(), Actions.end()); + std::sort(Sorted.begin(), Sorted.end(), [](const auto& A, const auto& B) { return A.State < B.State; }); + + ActionState CurrentState{}; + bool ArrayOpen = false; + + for (const CompletedActionNotification& Action : Sorted) + { + if (!ArrayOpen || Action.State != CurrentState) + { + if (ArrayOpen) + { + Cbo.EndArray(); + } + CurrentState = Action.State; + Cbo.BeginArray(IComputeCompletionObserver::ActionStateToString(CurrentState)); + ArrayOpen = true; + } + Cbo.AddInteger(Action.Lsn); + } + + if (ArrayOpen) + { + Cbo.EndArray(); + } + + CbObject Msg = Cbo.Save(); + MemoryView MsgView = Msg.GetView(); + + // Broadcast to all connected clients, prune closed ones + bool HadClosedConnections = false; + for (auto& Conn : Connections) + { + if (Conn->IsOpen()) + { + Conn->SendBinary(MsgView); + } + else + { + HadClosedConnections = true; + } + } + + if (HadClosedConnections) + { + m_WsConnectionsLock.WithExclusiveLock([&] { + auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [](const Ref<WebSocketConnection>& C) { + return !C->IsOpen(); + }); + m_WsConnections.erase(It, m_WsConnections.end()); + }); + } +} + +////////////////////////////////////////////////////////////////////////// void httpcomputeservice_forcelink() |