// Copyright Epic Games, Inc. All Rights Reserved.

#include "zencompute/httpcomputeservice.h"

#if ZEN_WITH_COMPUTE_SERVICES

# include "runners/functionrunner.h"

// NOTE(review): in this copy of the file every `<...>` span appears to have been
// stripped. The bare `# include` directives below have lost their header names, and
// several template argument lists are missing further down (e.g.
// `std::vector> m_WsConnections`, `std::make_unique(this, ...)`, `static_cast(...)`,
// `std::span Actions`). Restore them from version control; these comments
// intentionally do not guess at the missing text.
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include
# include

using namespace std::literals;

namespace zen::compute {

// Character sets used by the route-capture predicates below.
constinit AsciiSet g_DecimalSet("0123456789");
constinit AsciiSet g_HexSet("0123456789abcdefABCDEF");

// Route-capture predicates registered with the router in RegisterRoutes().
// DecimalMatcher: the capture consists only of ASCII decimal digits (used for LSNs / local queue ids).
auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); };
// IoHashMatcher: exactly 40 hex characters (captures are parsed with IoHash::FromHexString).
auto IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); };
// OidMatcher: exactly 24 hex characters (captures are parsed with Oid::FromHexString).
auto OidMatcher = [](std::string_view Str) { return Str.size() == 24 && AsciiSet::HasOnly(Str, g_HexSet); };

//////////////////////////////////////////////////////////////////////////

// Private implementation of HttpComputeService. It owns:
// - the HTTP router,
// - the ComputeServiceSession that runs actions,
// - the system metrics tracker,
// - the WebSocket connection list,
// - the registry of remotely-created queues.
struct HttpComputeService::Impl
{
    HttpComputeService* m_Self;  // owning public object; passed to the stats service and as completion observer
    CidStore& m_CidStore;
    IHttpStatsService& m_StatsService;
    LoggerRef m_Log;
    std::filesystem::path m_BaseDir;  // root for the local runner ("local") and recordings ("recording")
    HttpRequestRouter m_Router;
    ComputeServiceSession m_ComputeService;
    SystemMetricsTracker m_MetricsTracker;

    // WebSocket connections (completion push)

    RwLock m_WsConnectionsLock;  // guards m_WsConnections
    std::vector> m_WsConnections;

    // Metrics

    metrics::OperationTiming m_HttpRequests;  // timed once per request in HttpComputeService::HandleRequest

    // Per-remote-queue metadata, shared across all lookup maps below.

    struct RemoteQueueInfo : RefCounted
    {
        int QueueId = 0;
        Oid Token;
        std::string IdempotencyKey;  // empty if no idempotency key was provided
        std::string ClientHostname;  // empty if no hostname was provided
    };

    // Remote queue registry — all three maps share the same RemoteQueueInfo objects.
    // All maps are guarded by m_RemoteQueueLock.
    // NOTE(review): within the code visible here, entries are only erased on the stale
    // idempotency-key path in "queues/remote". Entries for queues created without an
    // idempotency key are never pruned here. Confirm whether other code removes them.
    RwLock m_RemoteQueueLock;
    std::unordered_map, Oid::Hasher> m_RemoteQueuesByToken;  // Token → info
    std::unordered_map> m_RemoteQueuesByQueueId;             // QueueId → info
    std::unordered_map> m_RemoteQueuesByTag;                 // idempotency key → info

    LoggerRef Log() { return m_Log; }

    // Maps a remote OID token to its queue id; returns 0 if the token is unknown.
    int ResolveQueueToken(const Oid& Token);
    // Resolves a {queueref} capture (remote OID token or local integer id) to a queue id.
    // Returns 0 on failure.
    int ResolveQueueRef(HttpServerRequest& HttpReq, std::string_view Capture);

    // Counters returned by IngestPackageAttachments.
    struct IngestStats
    {
        int Count = 0;          // attachments seen
        int NewCount = 0;       // attachments that were newly inserted into the CidStore
        uint64_t Bytes = 0;     // compressed bytes seen
        uint64_t NewBytes = 0;  // compressed bytes newly inserted
    };

    IngestStats IngestPackageAttachments(const CbPackage& Package);
    bool CheckAttachments(const CbObject& ActionObj, std::vector& NeedList);
    void HandleWorkersGet(HttpServerRequest& HttpReq);
    void HandleWorkersAllGet(HttpServerRequest& HttpReq);
    void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status);
    void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId);
    void HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker);

    // WebSocket / observer
    void OnWebSocketOpen(Ref Connection);
    void OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code);
    void OnActionsCompleted(std::span Actions);

    void RegisterRoutes();

    // Adds a local runner rooted at BaseDir/"local" and blocks until the session is ready.
    // Then it registers the "compute" stats handler, installs the routes, and sets the
    // public object as completion observer.
    // NOTE(review): HttpComputeService's constructor builds this object inside the
    // make_unique call in its member-initializer list. That means m_Self->m_Impl is still
    // unset while this runs. If RegisterHandler or SetCompletionObserver can call back into
    // m_Self before construction finishes (e.g. a concurrent stats request), the callback
    // would see a null m_Impl. Confirm.
    Impl(HttpComputeService* Self,
         CidStore& InCidStore,
         IHttpStatsService& StatsService,
         const std::filesystem::path& BaseDir,
         int32_t MaxConcurrentActions)
    : m_Self(Self)
    , m_CidStore(InCidStore)
    , m_StatsService(StatsService)
    , m_Log(logging::Get("compute"))
    , m_BaseDir(BaseDir)
    , m_ComputeService(InCidStore)
    {
        m_ComputeService.AddLocalRunner(InCidStore, m_BaseDir / "local", MaxConcurrentActions);
        m_ComputeService.WaitUntilReady();
        m_StatsService.RegisterHandler("compute", *m_Self);
        RegisterRoutes();
        m_ComputeService.SetCompletionObserver(m_Self);
    }
};

//////////////////////////////////////////////////////////////////////////

// Registers the capture matchers and every route served under BaseUri() ("/compute/").
// Conventions used throughout:
// - Routes that take a local integer queue id, and the admin routes (abandon, record/*,
//   queues), are restricted to local-machine requests.
// - Remote OID queue tokens are accepted from any client (see ResolveQueueRef).
// - Queue id 0 passed to HandleSubmitAction means "implicit queue" (global routes).
void
HttpComputeService::Impl::RegisterRoutes()
{
    m_Router.AddMatcher("lsn", DecimalMatcher);
    m_Router.AddMatcher("worker", IoHashMatcher);
    m_Router.AddMatcher("action", IoHashMatcher);
    m_Router.AddMatcher("queue", DecimalMatcher);
    m_Router.AddMatcher("oidtoken", OidMatcher);
    // A queue reference is either a local decimal id or a remote 24-hex-char OID token.
    m_Router.AddMatcher("queueref", [](std::string_view Str) { return DecimalMatcher(Str) || OidMatcher(Str); });

    // GET ready: 200 "ok" (text) when the session reports healthy, otherwise 503.
    m_Router.RegisterRoute(
        "ready",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            if (m_ComputeService.IsHealthy())
            {
                return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok");
            }

            return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable);
        },
        HttpVerb::kGet);

    // POST abandon (local only): asks the session to enter the Abandoned state.
    // Returns 200 on success, or 409 if the transition is refused.
    m_Router.RegisterRoute(
        "abandon",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            if (!HttpReq.IsLocalMachineRequest())
            {
                return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
            }

            bool Success = m_ComputeService.Abandon();

            if (Success)
            {
                CbObjectWriter Cbo;
                Cbo << "state"sv << "Abandoned"sv;
                return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
            }

            CbObjectWriter Cbo;
            Cbo << "error"sv << "Cannot transition to Abandoned from current state"sv;
            return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
        },
        HttpVerb::kPost);

    m_Router.RegisterRoute(
        "workers",
        [this](HttpRouterRequest& Req) { HandleWorkersGet(Req.ServerRequest()); },
        HttpVerb::kGet);

    m_Router.RegisterRoute(
        "workers/{worker}",
        [this](HttpRouterRequest& Req) { HandleWorkerRequest(Req.ServerRequest(), IoHash::FromHexString(Req.GetCapture(1))); },
        HttpVerb::kGet | HttpVerb::kPost);

    // GET jobs/completed: the session's completed-action listing plus a "metrics" object
    // describing the current system metrics.
    m_Router.RegisterRoute(
        "jobs/completed",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            CbObjectWriter Cbo;
            m_ComputeService.GetCompleted(Cbo);

            ExtendedSystemMetrics Sm = ApplyReportingOverrides(m_MetricsTracker.Query());
            Cbo.BeginObject("metrics");
            Describe(Sm, Cbo);
            Cbo.EndObject();

            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
        },
        HttpVerb::kGet);

    // GET jobs/history[?limit=N]: recent action history.
    // The limit defaults to 50, and an unparseable value also falls back to 50.
    m_Router.RegisterRoute(
        "jobs/history",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const auto QueryParams = HttpReq.GetQueryParams();

            int QueryLimit = 50;

            if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
            {
                QueryLimit = ParseInt(LimitParam).value_or(50);
            }

            CbObjectWriter Cbo;
            Cbo.BeginArray("history");
            for (const auto& Entry : m_ComputeService.GetActionHistory(QueryLimit))
            {
                Cbo.BeginObject();
                Cbo << "lsn"sv << Entry.Lsn;
                Cbo << "queueId"sv << Entry.QueueId;
                Cbo << "actionId"sv << Entry.ActionId;
                Cbo << "workerId"sv << Entry.WorkerId;
                Cbo << "succeeded"sv << Entry.Succeeded;
                Cbo << "actionDescriptor"sv << Entry.ActionDescriptor;
                if (Entry.CpuSeconds > 0.0f)
                {
                    Cbo.AddFloat("cpuSeconds"sv, Entry.CpuSeconds);
                }
                if (Entry.RetryCount > 0)
                {
                    Cbo << "retry_count"sv << Entry.RetryCount;
                }

                // One "time_<name>" field per Timestamps element. The element's index in
                // the array is converted to a name via RunnerAction::ToString.
                for (const auto& Timestamp : Entry.Timestamps)
                {
                    Cbo.AddInteger(
                        fmt::format("time_{}"sv, RunnerAction::ToString(static_cast(&Timestamp - Entry.Timestamps))),
                        Timestamp);
                }
                Cbo.EndObject();
            }
            Cbo.EndArray();

            HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
        },
        HttpVerb::kGet);

    // GET jobs/running: all currently running actions.
    // cpuUsage is emitted only when >= 0; cpuSeconds only when > 0.
    m_Router.RegisterRoute(
        "jobs/running",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            auto Running = m_ComputeService.GetRunningActions();
            CbObjectWriter Cbo;
            Cbo.BeginArray("running");
            for (const auto& Info : Running)
            {
                Cbo.BeginObject();
                Cbo << "lsn"sv << Info.Lsn;
                Cbo << "queueId"sv << Info.QueueId;
                Cbo << "actionId"sv << Info.ActionId;
                if (Info.CpuUsagePercent >= 0.0f)
                {
                    Cbo.AddFloat("cpuUsage"sv, Info.CpuUsagePercent);
                }
                if (Info.CpuSeconds > 0.0f)
                {
                    Cbo.AddFloat("cpuSeconds"sv, Info.CpuSeconds);
                }
                Cbo.EndObject();
            }
            Cbo.EndArray();

            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
        },
        HttpVerb::kGet);

    // jobs/{lsn}
    //   GET:  fetch the action result, then retire it.
    //   POST: reschedule the action (409 with "error" on failure).
    // An LSN that ParseInt rejects (e.g. out of int range) becomes 0.
    m_Router.RegisterRoute(
        "jobs/{lsn}",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int ActionLsn = ParseInt(Req.GetCapture(1)).value_or(0);

            switch (HttpReq.RequestVerb())
            {
                case HttpVerb::kGet:
                {
                    CbPackage Output;
                    HttpResponseCode ResponseCode = m_ComputeService.GetActionResult(ActionLsn, Output);

                    if (ResponseCode == HttpResponseCode::OK)
                    {
                        HttpReq.WriteResponse(HttpResponseCode::OK, Output);
                    }
                    else
                    {
                        HttpReq.WriteResponse(ResponseCode);
                    }

                    // Once we've initiated the response we can mark the result
                    // as retired, allowing the service to free any associated
                    // resources. Note that there still needs to be a delay
                    // to allow the transmission to complete, it would be better
                    // if we could issue this once the response is fully sent...
                    // NOTE(review): this runs for non-OK response codes too. Confirm that
                    // RetireActionResult is harmless for actions whose result was not
                    // returned (e.g. still pending).
                    m_ComputeService.RetireActionResult(ActionLsn);
                }
                break;

                case HttpVerb::kPost:
                {
                    auto Result = m_ComputeService.RescheduleAction(ActionLsn);

                    CbObjectWriter Cbo;
                    if (Result.Success)
                    {
                        Cbo << "lsn"sv << ActionLsn;
                        Cbo << "retry_count"sv << Result.RetryCount;

                        HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
                    }
                    else
                    {
                        Cbo << "error"sv << Result.Error;
                        HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
                    }
                }
                break;

                default:
                    break;
            }
        },
        HttpVerb::kGet | HttpVerb::kPost);

    // POST jobs/{lsn}/retract: retract the action.
    // Returns 200 {success, lsn}, or 409 {error}.
    m_Router.RegisterRoute(
        "jobs/{lsn}/retract",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int ActionLsn = ParseInt(Req.GetCapture(1)).value_or(0);

            auto Result = m_ComputeService.RetractAction(ActionLsn);

            CbObjectWriter Cbo;
            if (Result.Success)
            {
                Cbo << "success"sv << true;
                Cbo << "lsn"sv << ActionLsn;
                HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
            }
            else
            {
                Cbo << "error"sv << Result.Error;
                HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
            }
        },
        HttpVerb::kPost);

    m_Router.RegisterRoute(
        "jobs/{worker}/{action}",  // This route is inefficient, and is only here for backwards compatibility. The preferred path is the
                                   // one which uses the scheduled action lsn for lookups
        // Only the {action} hash is used for the lookup; the {worker} capture is used only in log output.
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));

            CbPackage Output;
            if (HttpResponseCode ResponseCode = m_ComputeService.FindActionResult(ActionId, /* out */ Output);
                ResponseCode != HttpResponseCode::OK)
            {
                ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))

                // NOTE(review): both branches below write the same response; the NotFound
                // special case is redundant.
                if (ResponseCode == HttpResponseCode::NotFound)
                {
                    return HttpReq.WriteResponse(ResponseCode);
                }

                return HttpReq.WriteResponse(ResponseCode);
            }

            ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2))

            return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
        },
        HttpVerb::kGet);

    // POST jobs/{worker}[?priority=N]: submit an action for a known worker to the implicit queue (id 0).
    // Returns 404 if the worker is unknown. Priority defaults to -1, and an unparseable value also gives -1.
    m_Router.RegisterRoute(
        "jobs/{worker}",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));

            WorkerDesc Worker = m_ComputeService.GetWorkerDescriptor(WorkerId);

            if (!Worker)
            {
                return HttpReq.WriteResponse(HttpResponseCode::NotFound);
            }

            const auto QueryParams = Req.ServerRequest().GetQueryParams();

            int RequestPriority = -1;

            if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
            {
                RequestPriority = ParseInt(PriorityParam).value_or(-1);
            }

            HandleSubmitAction(HttpReq, 0, RequestPriority, &Worker);
        },
        HttpVerb::kPost);

    // POST jobs[?priority=N]: submit an action to the implicit queue without a pre-resolved worker.
    m_Router.RegisterRoute(
        "jobs",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            const auto QueryParams = HttpReq.GetQueryParams();

            int RequestPriority = -1;

            if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
            {
                RequestPriority = ParseInt(PriorityParam).value_or(-1);
            }

            HandleSubmitAction(HttpReq, 0, RequestPriority, nullptr);
        },
        HttpVerb::kPost);

    m_Router.RegisterRoute(
        "workers/all",
        [this](HttpRouterRequest& Req) { HandleWorkersAllGet(Req.ServerRequest()); },
        HttpVerb::kGet);

    // Queue-scoped worker routes: the queue reference is resolved only for access control
    // (ResolveQueueRef writes 403/404 itself). The worker handlers themselves receive no queue id.
    m_Router.RegisterRoute(
        "queues/{queueref}/workers",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            if (ResolveQueueRef(HttpReq, Req.GetCapture(1)) == 0)
                return;
            HandleWorkersGet(HttpReq);
        },
        HttpVerb::kGet);

    m_Router.RegisterRoute(
        "queues/{queueref}/workers/all",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            if (ResolveQueueRef(HttpReq, Req.GetCapture(1)) == 0)
                return;
            HandleWorkersAllGet(HttpReq);
        },
        HttpVerb::kGet);

    m_Router.RegisterRoute(
        "queues/{queueref}/workers/{worker}",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            if (ResolveQueueRef(HttpReq, Req.GetCapture(1)) == 0)
                return;
            HandleWorkerRequest(HttpReq, IoHash::FromHexString(Req.GetCapture(2)));
        },
        HttpVerb::kGet | HttpVerb::kPost);

    // GET sysinfo: Describe() output plus a few flat fields.
    // NOTE(review): disk_used / disk_total are hard-coded constants, not measurements.
    // If SystemMemoryMiB is a 32-bit type, the "* 1024 * 1024" products overflow above 4 GiB.
    // The type is not visible here; confirm.
    m_Router.RegisterRoute(
        "sysinfo",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            ExtendedSystemMetrics Sm = ApplyReportingOverrides(m_MetricsTracker.Query());

            CbObjectWriter Cbo;
            Describe(Sm, Cbo);

            Cbo << "cpu_usage" << Sm.CpuUsagePercent;
            Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024;
            Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024;
            Cbo << "disk_used" << 100 * 1024;
            Cbo << "disk_total" << 100 * 1024 * 1024;

            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
        },
        HttpVerb::kGet);

    // POST record/start (local only): start recording into BaseDir/"recording".
    m_Router.RegisterRoute(
        "record/start",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            if (!HttpReq.IsLocalMachineRequest())
            {
                return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
            }

            m_ComputeService.StartRecording(m_CidStore, m_BaseDir / "recording");

            return HttpReq.WriteResponse(HttpResponseCode::OK);
        },
        HttpVerb::kPost);

    // POST record/stop (local only).
    m_Router.RegisterRoute(
        "record/stop",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            if (!HttpReq.IsLocalMachineRequest())
            {
                return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
            }

            m_ComputeService.StopRecording();

            return HttpReq.WriteResponse(HttpResponseCode::OK);
        },
        HttpVerb::kPost);

    // Local-only queue listing and creation
    //   GET:  list every queue whose status is valid.
    //   POST: create a queue from the optional "metadata"/"config" objects and return its "queue_id".
    // The default case is unreachable for the registered verbs and writes no response.

    m_Router.RegisterRoute(
        "queues",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            if (!HttpReq.IsLocalMachineRequest())
            {
                return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
            }

            switch (HttpReq.RequestVerb())
            {
                case HttpVerb::kGet:
                {
                    CbObjectWriter Cbo;
                    Cbo.BeginArray("queues"sv);

                    for (const int QueueId : m_ComputeService.GetQueueIds())
                    {
                        ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

                        if (!Status.IsValid)
                        {
                            continue;
                        }

                        Cbo.BeginObject();
                        WriteQueueDescription(Cbo, QueueId, Status);
                        Cbo.EndObject();
                    }

                    Cbo.EndArray();

                    return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
                }

                case HttpVerb::kPost:
                {
                    CbObject Metadata;
                    CbObject Config;
                    if (const CbObject Body = HttpReq.ReadPayloadObject())
                    {
                        Metadata = Body.Find("metadata"sv).AsObject();
                        Config = Body.Find("config"sv).AsObject();
                    }

                    ComputeServiceSession::CreateQueueResult Result =
                        m_ComputeService.CreateQueue({}, std::move(Metadata), std::move(Config));

                    CbObjectWriter Cbo;
                    Cbo << "queue_id"sv << Result.QueueId;

                    return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
                }

                default:
                    break;
            }
        },
        HttpVerb::kGet | HttpVerb::kPost);

    // Queue creation routes — these remain separate since local creates a plain queue
    // while remote additionally generates an OID token for external access.
    // "queues/remote" has no local-machine check; any client may call it.

    m_Router.RegisterRoute(
        "queues/remote",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();

            // Extract optional fields from the request body.
            // idempotency_key: when present, we return the existing remote queue token for this
            //   key rather than creating a new queue, making the endpoint safe to call concurrently.
            // hostname: human-readable origin context stored alongside the queue for diagnostics.
            // metadata: arbitrary CbObject metadata propagated from the originating queue.
            // config: arbitrary CbObject config propagated from the originating queue.
            std::string IdempotencyKey;
            std::string ClientHostname;
            CbObject Metadata;
            CbObject Config;
            if (const CbObject Body = HttpReq.ReadPayloadObject())
            {
                IdempotencyKey = std::string(Body["idempotency_key"sv].AsString());
                ClientHostname = std::string(Body["hostname"sv].AsString());
                Metadata = Body.Find("metadata"sv).AsObject();
                Config = Body.Find("config"sv).AsObject();
            }

            // Stamp the forwarding node's hostname into the metadata so that the
            // remote side knows which node originated the queue.
            // NOTE(review): existing fields are copied verbatim before "via" is appended. If the
            // incoming metadata already contains "via", the object ends up with two fields of
            // that name. Confirm how that is handled.
            if (!ClientHostname.empty())
            {
                CbObjectWriter MetaWriter;
                for (auto Field : Metadata)
                {
                    MetaWriter.AddField(Field.GetName(), Field);
                }
                MetaWriter << "via"sv << ClientHostname;
                Metadata = MetaWriter.Save();
            }

            // Held exclusively for the rest of the handler, including the GetQueueStatus and
            // CreateQueue calls. This serializes the idempotency lookup with queue creation and
            // registration.
            RwLock::ExclusiveLockScope _(m_RemoteQueueLock);

            if (!IdempotencyKey.empty())
            {
                if (auto It = m_RemoteQueuesByTag.find(IdempotencyKey); It != m_RemoteQueuesByTag.end())
                {
                    Ref Existing = It->second;
                    if (m_ComputeService.GetQueueStatus(Existing->QueueId).IsValid)
                    {
                        CbObjectWriter Cbo;
                        Cbo << "queue_token"sv << Existing->Token.ToString();
                        Cbo << "queue_id"sv << Existing->QueueId;
                        return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
                    }

                    // Queue has since expired — clean up stale entries and fall through to create a new one
                    m_RemoteQueuesByToken.erase(Existing->Token);
                    m_RemoteQueuesByQueueId.erase(Existing->QueueId);
                    m_RemoteQueuesByTag.erase(It);
                }
            }

            ComputeServiceSession::CreateQueueResult Result = m_ComputeService.CreateQueue({}, std::move(Metadata), std::move(Config));

            // Register the new queue under a fresh token; the same info object is shared by all maps.
            Ref InfoRef(new RemoteQueueInfo());
            InfoRef->QueueId = Result.QueueId;
            InfoRef->Token = Oid::NewOid();
            InfoRef->IdempotencyKey = std::move(IdempotencyKey);
            InfoRef->ClientHostname = std::move(ClientHostname);

            m_RemoteQueuesByToken[InfoRef->Token] = InfoRef;
            m_RemoteQueuesByQueueId[InfoRef->QueueId] = InfoRef;
            if (!InfoRef->IdempotencyKey.empty())
            {
                m_RemoteQueuesByTag[InfoRef->IdempotencyKey] = InfoRef;
            }

            CbObjectWriter Cbo;
            Cbo << "queue_token"sv << InfoRef->Token.ToString();
            Cbo << "queue_id"sv << InfoRef->QueueId;
            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
        },
        HttpVerb::kPost);

    // Unified queue routes — {queueref} accepts both local integer IDs and remote OID tokens.
    // ResolveQueueRef() handles access control (local-only for integer IDs) and token resolution.
    // A returned id of 0 means "stop". ResolveQueueRef writes 403/404 in most, but not all,
    // such cases (see the note on ResolveQueueRef).

    // queues/{queueref}
    //   GET:    describe the queue.
    //   DELETE: cancel it (204).
    // Both return 404 if the queue status is not valid.
    m_Router.RegisterRoute(
        "queues/{queueref}",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

            if (QueueId == 0)
            {
                return;
            }

            switch (HttpReq.RequestVerb())
            {
                case HttpVerb::kGet:
                {
                    ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

                    if (!Status.IsValid)
                    {
                        return HttpReq.WriteResponse(HttpResponseCode::NotFound);
                    }

                    CbObjectWriter Cbo;
                    WriteQueueDescription(Cbo, QueueId, Status);

                    return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
                }

                case HttpVerb::kDelete:
                {
                    ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

                    if (!Status.IsValid)
                    {
                        return HttpReq.WriteResponse(HttpResponseCode::NotFound);
                    }

                    m_ComputeService.CancelQueue(QueueId);

                    return HttpReq.WriteResponse(HttpResponseCode::NoContent);
                }

                default:
                    break;
            }
        },
        HttpVerb::kGet | HttpVerb::kDelete);

    // POST queues/{queueref}/drain: drain the queue and return its status as re-read after DrainQueue.
    m_Router.RegisterRoute(
        "queues/{queueref}/drain",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

            if (QueueId == 0)
            {
                return;
            }

            ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

            if (!Status.IsValid)
            {
                return HttpReq.WriteResponse(HttpResponseCode::NotFound);
            }

            m_ComputeService.DrainQueue(QueueId);

            // Return updated queue status
            Status = m_ComputeService.GetQueueStatus(QueueId);

            CbObjectWriter Cbo;
            WriteQueueDescription(Cbo, QueueId, Status);

            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
        },
        HttpVerb::kPost);

    // GET queues/{queueref}/completed: the session's completed listing for this queue.
    m_Router.RegisterRoute(
        "queues/{queueref}/completed",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

            if (QueueId == 0)
            {
                return;
            }

            ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

            if (!Status.IsValid)
            {
                return HttpReq.WriteResponse(HttpResponseCode::NotFound);
            }

            CbObjectWriter Cbo;
            m_ComputeService.GetQueueCompleted(QueueId, Cbo);

            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
        },
        HttpVerb::kGet);

    // GET queues/{queueref}/history[?limit=N]: same shape as jobs/history, but per queue and
    // without the "actionDescriptor" field.
    m_Router.RegisterRoute(
        "queues/{queueref}/history",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

            if (QueueId == 0)
            {
                return;
            }

            ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

            if (!Status.IsValid)
            {
                return HttpReq.WriteResponse(HttpResponseCode::NotFound);
            }

            const auto QueryParams = HttpReq.GetQueryParams();

            int QueryLimit = 50;

            if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
            {
                QueryLimit = ParseInt(LimitParam).value_or(50);
            }

            CbObjectWriter Cbo;
            Cbo.BeginArray("history");
            for (const auto& Entry : m_ComputeService.GetQueueHistory(QueueId, QueryLimit))
            {
                Cbo.BeginObject();
                Cbo << "lsn"sv << Entry.Lsn;
                Cbo << "queueId"sv << Entry.QueueId;
                Cbo << "actionId"sv << Entry.ActionId;
                Cbo << "workerId"sv << Entry.WorkerId;
                Cbo << "succeeded"sv << Entry.Succeeded;
                if (Entry.CpuSeconds > 0.0f)
                {
                    Cbo.AddFloat("cpuSeconds"sv, Entry.CpuSeconds);
                }
                if (Entry.RetryCount > 0)
                {
                    Cbo << "retry_count"sv << Entry.RetryCount;
                }

                for (const auto& Timestamp : Entry.Timestamps)
                {
                    Cbo.AddInteger(
                        fmt::format("time_{}"sv, RunnerAction::ToString(static_cast(&Timestamp - Entry.Timestamps))),
                        Timestamp);
                }
                Cbo.EndObject();
            }
            Cbo.EndArray();

            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
        },
        HttpVerb::kGet);

    // GET queues/{queueref}/running: the global running list filtered to this queue.
    // NOTE(review): unlike the sibling queue routes, there is no Status.IsValid check here.
    // A resolvable but invalid local queue id yields an empty list instead of 404.
    m_Router.RegisterRoute(
        "queues/{queueref}/running",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

            if (QueueId == 0)
            {
                return;
            }

            // Filter global running list to this queue
            auto AllRunning = m_ComputeService.GetRunningActions();
            std::vector Running;
            for (auto& Info : AllRunning)
                if (Info.QueueId == QueueId)
                    Running.push_back(Info);

            CbObjectWriter Cbo;
            Cbo.BeginArray("running");
            for (const auto& Info : Running)
            {
                Cbo.BeginObject();
                Cbo << "lsn"sv << Info.Lsn;
                Cbo << "queueId"sv << Info.QueueId;
                Cbo << "actionId"sv << Info.ActionId;
                if (Info.CpuUsagePercent >= 0.0f)
                {
                    Cbo.AddFloat("cpuUsage"sv, Info.CpuUsagePercent);
                }
                if (Info.CpuSeconds > 0.0f)
                {
                    Cbo.AddFloat("cpuSeconds"sv, Info.CpuSeconds);
                }
                Cbo.EndObject();
            }
            Cbo.EndArray();

            return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
        },
        HttpVerb::kGet);

    // POST queues/{queueref}/jobs/{worker}[?priority=N]: submit to this queue with a pre-resolved worker.
    m_Router.RegisterRoute(
        "queues/{queueref}/jobs/{worker}",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

            if (QueueId == 0)
            {
                return;
            }

            const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(2));

            WorkerDesc Worker = m_ComputeService.GetWorkerDescriptor(WorkerId);

            if (!Worker)
            {
                return HttpReq.WriteResponse(HttpResponseCode::NotFound);
            }

            const auto QueryParams = Req.ServerRequest().GetQueryParams();

            int RequestPriority = -1;

            if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
            {
                RequestPriority = ParseInt(PriorityParam).value_or(-1);
            }

            HandleSubmitAction(HttpReq, QueueId, RequestPriority, &Worker);
        },
        HttpVerb::kPost);

    // POST queues/{queueref}/jobs[?priority=N]: submit to this queue without a pre-resolved worker.
    m_Router.RegisterRoute(
        "queues/{queueref}/jobs",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

            if (QueueId == 0)
            {
                return;
            }

            const auto QueryParams = Req.ServerRequest().GetQueryParams();

            int RequestPriority = -1;

            if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
            {
                RequestPriority = ParseInt(PriorityParam).value_or(-1);
            }

            HandleSubmitAction(HttpReq, QueueId, RequestPriority, nullptr);
        },
        HttpVerb::kPost);

    // queues/{queueref}/jobs/{lsn}: same GET/POST behaviour as jobs/{lsn}.
    // NOTE(review): QueueId is resolved (so the token or local-only check applies) but is
    // otherwise unused (ZEN_UNUSED). The LSN is never checked against that queue. A caller
    // holding any valid remote token could therefore fetch/retire/reschedule actions belonging
    // to other queues, unless the session enforces ownership elsewhere. Confirm.
    m_Router.RegisterRoute(
        "queues/{queueref}/jobs/{lsn}",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
            const int ActionLsn = ParseInt(Req.GetCapture(2)).value_or(0);

            if (QueueId == 0)
            {
                return;
            }

            switch (HttpReq.RequestVerb())
            {
                case HttpVerb::kGet:
                {
                    ZEN_UNUSED(QueueId);

                    CbPackage Output;
                    HttpResponseCode ResponseCode = m_ComputeService.GetActionResult(ActionLsn, Output);

                    if (ResponseCode == HttpResponseCode::OK)
                    {
                        HttpReq.WriteResponse(HttpResponseCode::OK, Output);
                    }
                    else
                    {
                        HttpReq.WriteResponse(ResponseCode);
                    }

                    m_ComputeService.RetireActionResult(ActionLsn);
                }
                break;

                case HttpVerb::kPost:
                {
                    ZEN_UNUSED(QueueId);

                    auto Result = m_ComputeService.RescheduleAction(ActionLsn);

                    CbObjectWriter Cbo;
                    if (Result.Success)
                    {
                        Cbo << "lsn"sv << ActionLsn;
                        Cbo << "retry_count"sv << Result.RetryCount;

                        HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
                    }
                    else
                    {
                        Cbo << "error"sv << Result.Error;
                        HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
                    }
                }
                break;

                default:
                    break;
            }
        },
        HttpVerb::kGet | HttpVerb::kPost);

    // POST queues/{queueref}/jobs/{lsn}/retract: same as jobs/{lsn}/retract.
    // NOTE(review): the same queue-ownership caveat applies; the LSN is not checked against QueueId.
    m_Router.RegisterRoute(
        "queues/{queueref}/jobs/{lsn}/retract",
        [this](HttpRouterRequest& Req) {
            HttpServerRequest& HttpReq = Req.ServerRequest();
            const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
            const int ActionLsn = ParseInt(Req.GetCapture(2)).value_or(0);

            if (QueueId == 0)
            {
                return;
            }

            ZEN_UNUSED(QueueId);

            auto Result = m_ComputeService.RetractAction(ActionLsn);

            CbObjectWriter Cbo;
            if (Result.Success)
            {
                Cbo << "success"sv << true;
                Cbo << "lsn"sv << ActionLsn;
                HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
            }
            else
            {
                Cbo << "error"sv << Result.Error;
                HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
            }
        },
        HttpVerb::kPost);

    // WebSocket upgrade endpoint — the handler logic lives in
    // HttpComputeService::OnWebSocket* methods; this route merely
    // satisfies the router so the upgrade request isn't rejected.
    // NOTE(review): the lambda captures `this` but never uses it.
    m_Router.RegisterRoute(
        "ws",
        [this](HttpRouterRequest& Req) { Req.ServerRequest().WriteResponse(HttpResponseCode::OK); },
        HttpVerb::kGet);
}

//////////////////////////////////////////////////////////////////////////

// All state lives in Impl. Its constructor starts the local runner, registers stats and routes,
// and installs this object as completion observer.
HttpComputeService::HttpComputeService(CidStore& InCidStore,
                                       IHttpStatsService& StatsService,
                                       const std::filesystem::path& BaseDir,
                                       int32_t MaxConcurrentActions)
: m_Impl(std::make_unique(this, InCidStore, StatsService, BaseDir, MaxConcurrentActions))
{
}

// Detaches the completion observer and unregisters the "compute" stats handler.
// Shutdown() is not called here.
HttpComputeService::~HttpComputeService()
{
    m_Impl->m_ComputeService.SetCompletionObserver(nullptr);
    m_Impl->m_StatsService.UnregisterHandler("compute", *this);
}

// Detaches the observer, drops all WebSocket connections, then shuts down the compute session.
void
HttpComputeService::Shutdown()
{
    // Null out observer before shutting down the compute session to prevent
    // callbacks into a partially-torn-down service.
    m_Impl->m_ComputeService.SetCompletionObserver(nullptr);
    m_Impl->m_WsConnectionsLock.WithExclusiveLock([&] { m_Impl->m_WsConnections.clear(); });
    m_Impl->m_ComputeService.Shutdown();
}

ComputeServiceSession::ActionCounts
HttpComputeService::GetActionCounts()
{
    return m_Impl->m_ComputeService.GetActionCounts();
}

const char*
HttpComputeService::BaseUri() const
{
    return "/compute/";
}

// Times every request and dispatches it through the router.
// When no route matches, only a warning is logged here. Whether a response is then written
// is up to the router/server (not visible here).
void
HttpComputeService::HandleRequest(HttpServerRequest& Request)
{
    ZEN_TRACE_CPU("HttpComputeService::HandleRequest");
    metrics::OperationTiming::Scope $(m_Impl->m_HttpRequests);

    if (m_Impl->m_Router.HandleRequest(Request) == false)
    {
        ZEN_WARN("No route found for {0}", Request.RelativeUri());
    }
}

// Stats endpoint registered under "compute": 200 with the session's EmitStats output.
void
HttpComputeService::HandleStatsRequest(HttpServerRequest& Request)
{
    CbObjectWriter Cbo;
    m_Impl->m_ComputeService.EmitStats(Cbo);

    Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
}

//////////////////////////////////////////////////////////////////////////

// Writes the queue's counters and state into Cbo, plus its metadata/config when present.
// Remote queues also get their "queue_token" and, if known, "hostname".
// "queue_id" is taken from Status, while metadata, config and the remote lookup use the
// QueueId parameter. Callers pass the status of that same id.
// Takes m_RemoteQueueLock shared, so it must not be called while that lock is held exclusively.
void
HttpComputeService::Impl::WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status)
{
    Cbo << "queue_id"sv << Status.QueueId;
    Cbo << "active_count"sv << Status.ActiveCount;
    Cbo << "completed_count"sv << Status.CompletedCount;
    Cbo << "failed_count"sv << Status.FailedCount;
    Cbo << "abandoned_count"sv << Status.AbandonedCount;
    Cbo << "cancelled_count"sv << Status.CancelledCount;
    Cbo << "state"sv << ToString(Status.State);
    Cbo << "cancelled"sv << (Status.State == ComputeServiceSession::QueueState::Cancelled);
    Cbo << "draining"sv << (Status.State == ComputeServiceSession::QueueState::Draining);
    Cbo << "is_complete"sv << Status.IsComplete;

    if (CbObject Meta = m_ComputeService.GetQueueMetadata(QueueId))
    {
        Cbo << "metadata"sv << Meta;
    }

    if (CbObject Cfg = m_ComputeService.GetQueueConfig(QueueId))
    {
        Cbo << "config"sv << Cfg;
    }

    {
        RwLock::SharedLockScope $(m_RemoteQueueLock);
        if (auto It = m_RemoteQueuesByQueueId.find(QueueId); It != m_RemoteQueuesByQueueId.end())
        {
            Cbo << "queue_token"sv << It->second->Token.ToString();
            if (!It->second->ClientHostname.empty())
            {
                Cbo << "hostname"sv << It->second->ClientHostname;
            }
        }
    }
}

//////////////////////////////////////////////////////////////////////////

// Returns the queue id registered for a remote token, or 0 if the token is unknown.
// Takes m_RemoteQueueLock shared.
int
HttpComputeService::Impl::ResolveQueueToken(const Oid& Token)
{
    RwLock::SharedLockScope $(m_RemoteQueueLock);

    auto It = m_RemoteQueuesByToken.find(Token);

    if (It != m_RemoteQueuesByToken.end())
    {
        return It->second->QueueId;
    }

    return 0;
}

// Resolves a {queueref} capture to a queue id. Returns 0 when the caller should stop.
// - A 24-hex-char capture is a remote OID token, usable from any client. Unknown tokens get 404.
// - Anything else is a local integer id. Non-local requests get 403.
// NOTE(review): a local capture that parses to 0, or that ParseInt rejects (e.g. "0", or
// digits out of int range), returns 0 without writing any response. The route handlers
// then return with nothing written.
int
HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::string_view Capture)
{
    if (OidMatcher(Capture))
    {
        // Remote OID token — accessible from any client
        const Oid Token = Oid::FromHexString(Capture);
        const int QueueId = ResolveQueueToken(Token);

        if (QueueId == 0)
        {
            HttpReq.WriteResponse(HttpResponseCode::NotFound);
        }

        return QueueId;
    }

    // Local integer queue ID — restricted to local machine requests
    if (!HttpReq.IsLocalMachineRequest())
    {
        HttpReq.WriteResponse(HttpResponseCode::Forbidden);
        return 0;
    }

    return ParseInt(Capture).value_or(0);
}
HttpComputeService::Impl::IngestStats HttpComputeService::Impl::IngestPackageAttachments(const CbPackage& Package) { IngestStats Stats; for (const CbAttachment& Attachment : Package.GetAttachments()) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer DataView = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); const uint64_t CompressedSize = DataView.GetCompressedSize(); Stats.Bytes += CompressedSize; ++Stats.Count; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { Stats.NewBytes += CompressedSize; ++Stats.NewCount; } } return Stats; } bool HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vector& NeedList) { ActionObj.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); if (!m_CidStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } }); return NeedList.empty(); } void HttpComputeService::Impl::HandleSubmitAction(HttpServerRequest& HttpReq, int QueueId, int Priority, const WorkerDesc* Worker) { // QueueId > 0: queue-scoped enqueue; QueueId == 0: implicit queue (global routes) auto Enqueue = [&](CbObject ActionObj) -> ComputeServiceSession::EnqueueResult { if (QueueId > 0) { if (Worker) { return m_ComputeService.EnqueueResolvedActionToQueue(QueueId, *Worker, ActionObj, Priority); } return m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, Priority); } else { if (Worker) { return m_ComputeService.EnqueueResolvedAction(*Worker, ActionObj, Priority); } return m_ComputeService.EnqueueAction(ActionObj, Priority); } }; // Read payload upfront and handle attachments based on content type CbObject Body; IngestStats Stats = {}; switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { IoBuffer Payload = HttpReq.ReadPayload(); Body = LoadCompactBinaryObject(Payload); break; } case HttpContentType::kCbPackage: { CbPackage 
Package = HttpReq.ReadPayloadPackage(); Body = Package.GetObject(); Stats = IngestPackageAttachments(Package); break; } default: return HttpReq.WriteResponse(HttpResponseCode::BadRequest); } // Check for "actions" array to determine batch vs single-action path CbArray Actions = Body.Find("actions"sv).AsArray(); if (Actions.Num() > 0) { // --- Batch path --- // For CbObject payloads, check all attachments upfront before enqueuing anything if (HttpReq.RequestContentType() == HttpContentType::kCbObject) { std::vector NeedList; for (CbField ActionField : Actions) { CbObject ActionObj = ActionField.AsObject(); CheckAttachments(ActionObj, NeedList); } if (!NeedList.empty()) { CbObjectWriter Cbo; Cbo.BeginArray("need"); for (const IoHash& Hash : NeedList) { Cbo << Hash; } Cbo.EndArray(); return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save()); } } // Enqueue all actions and collect results CbObjectWriter Cbo; int Accepted = 0; Cbo.BeginArray("results"); for (CbField ActionField : Actions) { CbObject ActionObj = ActionField.AsObject(); ComputeServiceSession::EnqueueResult Result = Enqueue(ActionObj); Cbo.BeginObject(); if (Result) { Cbo << "lsn"sv << Result.Lsn; ++Accepted; } else { Cbo << "error"sv << Result.ResponseMessage; } Cbo.EndObject(); } Cbo.EndArray(); if (Stats.Count > 0) { ZEN_DEBUG("queue {}: batch accepted {}/{} actions: {} in {} attachments. 
{} new ({} attachments)", QueueId, Accepted, Actions.Num(), zen::NiceBytes(Stats.Bytes), Stats.Count, zen::NiceBytes(Stats.NewBytes), Stats.NewCount); } else { ZEN_DEBUG("queue {}: batch accepted {}/{} actions", QueueId, Accepted, Actions.Num()); } return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } // --- Single-action path: Body is the action itself --- if (HttpReq.RequestContentType() == HttpContentType::kCbObject) { std::vector NeedList; if (!CheckAttachments(Body, NeedList)) { CbObjectWriter Cbo; Cbo.BeginArray("need"); for (const IoHash& Hash : NeedList) { Cbo << Hash; } Cbo.EndArray(); return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save()); } } if (ComputeServiceSession::EnqueueResult Result = Enqueue(Body)) { if (Stats.Count > 0) { ZEN_DEBUG("queue {}: accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)", QueueId, Body.GetHash(), Result.Lsn, zen::NiceBytes(Stats.Bytes), Stats.Count, zen::NiceBytes(Stats.NewBytes), Stats.NewCount); } else { ZEN_DEBUG("queue {}: action {} accepted (lsn {})", QueueId, Body.GetHash(), Result.Lsn); } return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); } else { return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); } } void HttpComputeService::Impl::HandleWorkersGet(HttpServerRequest& HttpReq) { CbObjectWriter Cbo; Cbo.BeginArray("workers"sv); for (const IoHash& WorkerId : m_ComputeService.GetKnownWorkerIds()) { Cbo << WorkerId; } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpComputeService::Impl::HandleWorkersAllGet(HttpServerRequest& HttpReq) { std::vector WorkerIds = m_ComputeService.GetKnownWorkerIds(); CbObjectWriter Cbo; Cbo.BeginArray("workers"); for (const IoHash& WorkerId : WorkerIds) { Cbo.BeginObject(); Cbo << "id" << WorkerId; Cbo << "descriptor" << m_ComputeService.GetWorkerDescriptor(WorkerId).Descriptor.GetObject(); Cbo.EndObject(); } Cbo.EndArray(); 
HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId) { switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: if (WorkerDesc Desc = m_ComputeService.GetWorkerDescriptor(WorkerId)) { return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject()); } return HttpReq.WriteResponse(HttpResponseCode::NotFound); case HttpVerb::kPost: { switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { CbObject WorkerSpec = HttpReq.ReadPayloadObject(); HashKeySet ChunkSet; WorkerSpec.IterateAttachments([&](CbFieldView Field) { const IoHash Hash = Field.AsHash(); ChunkSet.AddHashToSet(Hash); }); CbPackage WorkerPackage; WorkerPackage.SetObject(WorkerSpec); m_CidStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) { ZEN_DEBUG("worker {}: all attachments already available", WorkerId); m_ComputeService.RegisterWorker(WorkerPackage); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } CbObjectWriter ResponseWriter; ResponseWriter.BeginArray("need"); ChunkSet.IterateHashes([&](const IoHash& Hash) { ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); ResponseWriter.AddHash(Hash); }); ResponseWriter.EndArray(); ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); } break; case HttpContentType::kCbPackage: { CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage(); CbObject WorkerSpec = WorkerSpecPackage.GetObject(); std::span Attachments = WorkerSpecPackage.GetAttachments(); int AttachmentCount = 0; int NewAttachmentCount = 0; uint64_t TotalAttachmentBytes = 0; uint64_t TotalNewBytes = 0; for (const CbAttachment& Attachment : Attachments) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer Buffer = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); TotalAttachmentBytes += 
Buffer.GetCompressedSize(); ++AttachmentCount; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { TotalNewBytes += Buffer.GetCompressedSize(); ++NewAttachmentCount; } } ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", WorkerId, zen::NiceBytes(TotalAttachmentBytes), AttachmentCount, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); m_ComputeService.RegisterWorker(WorkerSpecPackage); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } break; default: break; } } break; default: break; } } ////////////////////////////////////////////////////////////////////////// // // IWebSocketHandler // void HttpComputeService::OnWebSocketOpen(Ref Connection) { m_Impl->OnWebSocketOpen(std::move(Connection)); } void HttpComputeService::OnWebSocketMessage([[maybe_unused]] WebSocketConnection& Conn, [[maybe_unused]] const WebSocketMessage& Msg) { // Clients are receive-only; ignore any inbound messages. 
} void HttpComputeService::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code, [[maybe_unused]] std::string_view Reason) { m_Impl->OnWebSocketClose(Conn, Code); } void HttpComputeService::OnActionsCompleted(std::span Actions) { m_Impl->OnActionsCompleted(Actions); } ////////////////////////////////////////////////////////////////////////// // // Impl — WebSocket / observer // void HttpComputeService::Impl::OnWebSocketOpen(Ref Connection) { ZEN_INFO("compute WebSocket client connected"); m_WsConnectionsLock.WithExclusiveLock([&] { m_WsConnections.push_back(std::move(Connection)); }); } void HttpComputeService::Impl::OnWebSocketClose(WebSocketConnection& Conn, uint16_t Code) { ZEN_INFO("compute WebSocket client disconnected (code {})", Code); m_WsConnectionsLock.WithExclusiveLock([&] { auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [&Conn](const Ref& C) { return C.Get() == &Conn; }); m_WsConnections.erase(It, m_WsConnections.end()); }); } void HttpComputeService::Impl::OnActionsCompleted(std::span Actions) { using ActionState = IComputeCompletionObserver::ActionState; using CompletedActionNotification = IComputeCompletionObserver::CompletedActionNotification; // Snapshot connections under shared lock eastl::fixed_vector, 16> Connections; m_WsConnectionsLock.WithSharedLock([&] { Connections = {begin(m_WsConnections), end(m_WsConnections)}; }); if (Connections.empty()) { return; } // Build CompactBinary notification grouped by state: // {"Completed": [lsn, ...], "Failed": [lsn, ...], ...} // Each state name becomes an array key containing the LSNs in that state. CbObjectWriter Cbo; // Sort by state so we can emit one array per state in a single pass. // Copy into a local vector since the span is const. 
eastl::fixed_vector Sorted(Actions.begin(), Actions.end()); std::sort(Sorted.begin(), Sorted.end(), [](const auto& A, const auto& B) { return A.State < B.State; }); ActionState CurrentState{}; bool ArrayOpen = false; for (const CompletedActionNotification& Action : Sorted) { if (!ArrayOpen || Action.State != CurrentState) { if (ArrayOpen) { Cbo.EndArray(); } CurrentState = Action.State; Cbo.BeginArray(IComputeCompletionObserver::ActionStateToString(CurrentState)); ArrayOpen = true; } Cbo.AddInteger(Action.Lsn); } if (ArrayOpen) { Cbo.EndArray(); } CbObject Msg = Cbo.Save(); MemoryView MsgView = Msg.GetView(); // Broadcast to all connected clients, prune closed ones bool HadClosedConnections = false; for (auto& Conn : Connections) { if (Conn->IsOpen()) { Conn->SendBinary(MsgView); } else { HadClosedConnections = true; } } if (HadClosedConnections) { m_WsConnectionsLock.WithExclusiveLock([&] { auto It = std::remove_if(m_WsConnections.begin(), m_WsConnections.end(), [](const Ref& C) { return !C->IsOpen(); }); m_WsConnections.erase(It, m_WsConnections.end()); }); } } ////////////////////////////////////////////////////////////////////////// void httpcomputeservice_forcelink() { } } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES