// Copyright Epic Games, Inc. All Rights Reserved.

#include "zencompute/httpcomputeservice.h"

#if ZEN_WITH_COMPUTE_SERVICES

#	include "runners/functionrunner.h"

// NOTE(review): the targets of the following include directives are missing from this
// listing (everything between angle brackets appears to have been stripped, which also
// affects template arguments further down). Restore them from version control.
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include
#	include

using namespace std::literals;

namespace zen::compute {

// Character classes used by the route capture matchers below.
constinit AsciiSet g_DecimalSet("0123456789");
constinit AsciiSet g_HexSet("0123456789abcdefABCDEF");

// Capture matchers: decimal digits only / 40 hex characters / 24 hex characters.
// Note that a 24-character all-digit string satisfies both DecimalMatcher and OidMatcher.
auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); };
auto IoHashMatcher	= [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); };
auto OidMatcher		= [](std::string_view Str) { return Str.size() == 24 && AsciiSet::HasOnly(Str, g_HexSet); };

//////////////////////////////////////////////////////////////////////////

// Private implementation of HttpComputeService: owns the request router, the compute
// session and the registry that maps remote queue tokens to queue IDs.
struct HttpComputeService::Impl
{
	HttpComputeService*	  m_Self;
	CidStore&			  m_CidStore;
	IHttpStatsService&	  m_StatsService;
	LoggerRef			  m_Log;
	std::filesystem::path m_BaseDir;
	HttpRequestRouter	  m_Router;
	ComputeServiceSession m_ComputeService;
	SystemMetricsTracker  m_MetricsTracker;

	// Metrics
	metrics::OperationTiming m_HttpRequests;

	// Per-remote-queue metadata, shared across all lookup maps below.
	struct RemoteQueueInfo : RefCounted
	{
		int			QueueId = 0;
		Oid			Token;
		std::string IdempotencyKey;	 // empty if no idempotency key was provided
		std::string ClientHostname;	 // empty if no hostname was provided
	};

	// Remote queue registry — all three maps share the same RemoteQueueInfo objects.
	// All maps are guarded by m_RemoteQueueLock.
	// NOTE(review): the template argument lists of the three maps below are incomplete in
	// this listing (key/value types are missing) — restore from version control.
	RwLock m_RemoteQueueLock;
	std::unordered_map, Oid::Hasher> m_RemoteQueuesByToken;	  // Token → info
	std::unordered_map> m_RemoteQueuesByQueueId;  // QueueId → info
	std::unordered_map> m_RemoteQueuesByTag;	  // idempotency key → info

	LoggerRef Log() { return m_Log; }

	// Both resolvers return 0 when no queue could be resolved.
	int ResolveQueueToken(const Oid& Token);
	int ResolveQueueRef(HttpServerRequest& HttpReq, std::string_view Capture);

	// Totals gathered while storing the attachments of an incoming package.
	struct IngestStats
	{
		int		 Count	  = 0;
		int		 NewCount = 0;
		uint64_t Bytes	  = 0;
		uint64_t NewBytes = 0;
	};

	IngestStats IngestPackageAttachments(const CbPackage& Package);
	bool		CheckAttachments(const CbObject& ActionObj, std::vector& NeedList);
	void		HandleWorkersGet(HttpServerRequest& HttpReq);
	void		HandleWorkersAllGet(HttpServerRequest& HttpReq);
	void		WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status);
	void		HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId);

	void RegisterRoutes();

	// Adds a local runner rooted at BaseDir/"local", waits for the compute session to
	// become ready, registers the "compute" stats handler and installs all routes.
	// NOTE(review): *m_Self is handed to RegisterHandler while the owning
	// HttpComputeService constructor is still running (Impl is built from its
	// initializer list) — confirm the stats service does not call back immediately.
	Impl(HttpComputeService*		  Self,
		 CidStore&					  InCidStore,
		 IHttpStatsService&			  StatsService,
		 const std::filesystem::path& BaseDir,
		 int32_t					  MaxConcurrentActions)
	: m_Self(Self)
	, m_CidStore(InCidStore)
	, m_StatsService(StatsService)
	, m_Log(logging::Get("compute"))
	, m_BaseDir(BaseDir)
	, m_ComputeService(InCidStore)
	{
		m_ComputeService.AddLocalRunner(InCidStore, m_BaseDir / "local", MaxConcurrentActions);
		m_ComputeService.WaitUntilReady();
		m_StatsService.RegisterHandler("compute", *m_Self);
		RegisterRoutes();
	}
};

//////////////////////////////////////////////////////////////////////////

// Installs the capture matchers and every route handler on m_Router.
//
// Conventions used by the handlers below:
//  - ResolveQueueRef() returning 0 means the queue reference could not be resolved; the
//    handler then returns without writing anything further.
//  - Routes that switch on verb or content type have `default: break;` arms that fall
//    out of the handler without writing a response.
void
HttpComputeService::Impl::RegisterRoutes()
{
	m_Router.AddMatcher("lsn", DecimalMatcher);
	m_Router.AddMatcher("worker", IoHashMatcher);
	m_Router.AddMatcher("action", IoHashMatcher);
	m_Router.AddMatcher("queue", DecimalMatcher);
	m_Router.AddMatcher("oidtoken", OidMatcher);
	// A queue reference is either a decimal queue ID or a 24-hex-character token.
	m_Router.AddMatcher("queueref", [](std::string_view Str) { return DecimalMatcher(Str) || OidMatcher(Str); });

	// GET ready — health probe: 200 "ok" when the session reports healthy, otherwise 503.
	m_Router.RegisterRoute(
		"ready",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			if (m_ComputeService.IsHealthy())
			{
				return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok");
			}

			return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable);
		},
		HttpVerb::kGet);

	// POST abandon — local machine only; requests a transition to the Abandoned state.
	// Responds 409 when the transition is refused.
	m_Router.RegisterRoute(
		"abandon",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			if (!HttpReq.IsLocalMachineRequest())
			{
				return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
			}

			bool Success = m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Abandoned);

			if (Success)
			{
				CbObjectWriter Cbo;
				Cbo << "state"sv
					<< "Abandoned"sv;
				return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
			}

			CbObjectWriter Cbo;
			Cbo << "error"sv
				<< "Cannot transition to Abandoned from current state"sv;
			return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
		},
		HttpVerb::kPost);

	// GET workers — delegates to HandleWorkersGet.
	m_Router.RegisterRoute(
		"workers",
		[this](HttpRouterRequest& Req) { HandleWorkersGet(Req.ServerRequest()); },
		HttpVerb::kGet);

	// GET/POST workers/{worker} — delegates to HandleWorkerRequest with the parsed worker hash.
	m_Router.RegisterRoute(
		"workers/{worker}",
		[this](HttpRouterRequest& Req) { HandleWorkerRequest(Req.ServerRequest(), IoHash::FromHexString(Req.GetCapture(1))); },
		HttpVerb::kGet | HttpVerb::kPost);

	// GET jobs/completed — completed actions plus a "metrics" object with system metrics.
	m_Router.RegisterRoute(
		"jobs/completed",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			CbObjectWriter Cbo;
			m_ComputeService.GetCompleted(Cbo);

			ExtendedSystemMetrics Sm = ApplyReportingOverrides(m_MetricsTracker.Query());
			Cbo.BeginObject("metrics");
			Describe(Sm, Cbo);
			Cbo.EndObject();

			HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
		},
		HttpVerb::kGet);

	// GET jobs/history?limit=N — action history; limit defaults to 50 when absent or unparsable.
	// NOTE(review): the parsed limit is passed through without range checking.
	m_Router.RegisterRoute(
		"jobs/history",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			const auto QueryParams = HttpReq.GetQueryParams();

			int QueryLimit = 50;

			if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
			{
				QueryLimit = ParseInt(LimitParam).value_or(50);
			}

			CbObjectWriter Cbo;
			Cbo.BeginArray("history");
			for (const auto& Entry : m_ComputeService.GetActionHistory(QueryLimit))
			{
				Cbo.BeginObject();
				Cbo << "lsn"sv << Entry.Lsn;
				Cbo << "queueId"sv << Entry.QueueId;
				Cbo << "actionId"sv << Entry.ActionId;
				Cbo << "workerId"sv << Entry.WorkerId;
				Cbo << "succeeded"sv << Entry.Succeeded;
				Cbo << "actionDescriptor"sv << Entry.ActionDescriptor;
				if (Entry.CpuSeconds > 0.0f)
				{
					Cbo.AddFloat("cpuSeconds"sv, Entry.CpuSeconds);
				}
				if (Entry.RetryCount > 0)
				{
					Cbo << "retry_count"sv << Entry.RetryCount;
				}

				// One "time_<name>" field per timestamp slot; the slot index is derived from
				// the element's offset within Entry.Timestamps.
				// NOTE(review): the static_cast target type is missing in this listing.
				for (const auto& Timestamp : Entry.Timestamps)
				{
					Cbo.AddInteger(
						fmt::format("time_{}"sv, RunnerAction::ToString(static_cast(&Timestamp - Entry.Timestamps))),
						Timestamp);
				}
				Cbo.EndObject();
			}
			Cbo.EndArray();

			HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
		},
		HttpVerb::kGet);

	// GET jobs/running — currently running actions; CPU fields are emitted only when set.
	m_Router.RegisterRoute(
		"jobs/running",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			auto Running = m_ComputeService.GetRunningActions();

			CbObjectWriter Cbo;
			Cbo.BeginArray("running");
			for (const auto& Info : Running)
			{
				Cbo.BeginObject();
				Cbo << "lsn"sv << Info.Lsn;
				Cbo << "queueId"sv << Info.QueueId;
				Cbo << "actionId"sv << Info.ActionId;
				if (Info.CpuUsagePercent >= 0.0f)
				{
					Cbo.AddFloat("cpuUsage"sv, Info.CpuUsagePercent);
				}
				if (Info.CpuSeconds > 0.0f)
				{
					Cbo.AddFloat("cpuSeconds"sv, Info.CpuSeconds);
				}
				Cbo.EndObject();
			}
			Cbo.EndArray();

			return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
		},
		HttpVerb::kGet);

	// GET jobs/{lsn}  — fetch the result of a scheduled action, then retire it.
	// POST jobs/{lsn} — reschedule the action; 409 with an "error" field on failure.
	// An unparsable LSN capture falls back to 0.
	m_Router.RegisterRoute(
		"jobs/{lsn}",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq	 = Req.ServerRequest();
			const int		   ActionLsn = ParseInt(Req.GetCapture(1)).value_or(0);

			switch (HttpReq.RequestVerb())
			{
				case HttpVerb::kGet:
					{
						CbPackage Output;
						HttpResponseCode ResponseCode = m_ComputeService.GetActionResult(ActionLsn, Output);

						if (ResponseCode == HttpResponseCode::OK)
						{
							HttpReq.WriteResponse(HttpResponseCode::OK, Output);
						}
						else
						{
							HttpReq.WriteResponse(ResponseCode);
						}

						// Once we've initiated the response we can mark the result
						// as retired, allowing the service to free any associated
						// resources. Note that there still needs to be a delay
						// to allow the transmission to complete, it would be better
						// if we could issue this once the response is fully sent...
						// NOTE(review): the result is retired regardless of the response code.
						m_ComputeService.RetireActionResult(ActionLsn);
					}
					break;

				case HttpVerb::kPost:
					{
						auto Result = m_ComputeService.RescheduleAction(ActionLsn);

						CbObjectWriter Cbo;
						if (Result.Success)
						{
							Cbo << "lsn"sv << ActionLsn;
							Cbo << "retry_count"sv << Result.RetryCount;
							HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
						}
						else
						{
							Cbo << "error"sv << Result.Error;
							HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
						}
					}
					break;

				default:
					break;
			}
		},
		HttpVerb::kGet | HttpVerb::kPost);

	m_Router.RegisterRoute(
		"jobs/{worker}/{action}",
		// This route is inefficient, and is only here for backwards compatibility. The preferred path is the
		// one which uses the scheduled action lsn for lookups
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq	= Req.ServerRequest();
			const IoHash	   ActionId = IoHash::FromHexString(Req.GetCapture(2));

			CbPackage Output;
			if (HttpResponseCode ResponseCode = m_ComputeService.FindActionResult(ActionId, /* out */ Output);
				ResponseCode != HttpResponseCode::OK)
			{
				ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))

				// NOTE(review): both paths below write the same response; the NotFound
				// special case is redundant.
				if (ResponseCode == HttpResponseCode::NotFound)
				{
					return HttpReq.WriteResponse(ResponseCode);
				}

				return HttpReq.WriteResponse(ResponseCode);
			}

			ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2))

			return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
		},
		HttpVerb::kGet);

	// POST jobs/{worker}?priority=N — submit an action for a known worker.
	//  - kCbObject payload: responds 404 with a "need" array when referenced chunks are
	//    missing from the CID store, otherwise enqueues the action.
	//  - kCbPackage payload: stores the attachments, then enqueues the action.
	// NOTE(review): when EnqueueResolvedAction yields a falsy result, neither branch
	// writes a response (the sibling "jobs" route answers FailedDependency), and an
	// unsupported content type also ends without a response.
	// NOTE(review): the route is registered for kPost only, so the kGet case below is
	// presumably unreachable — confirm against the router's verb filtering.
	// NOTE(review): the inline need-list and attachment-ingest code duplicates what
	// CheckAttachments()/IngestPackageAttachments() are used for by the queue routes.
	m_Router.RegisterRoute(
		"jobs/{worker}",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq	= Req.ServerRequest();
			const IoHash	   WorkerId = IoHash::FromHexString(Req.GetCapture(1));

			WorkerDesc Worker = m_ComputeService.GetWorkerDescriptor(WorkerId);

			if (!Worker)
			{
				return HttpReq.WriteResponse(HttpResponseCode::NotFound);
			}

			const auto QueryParams = Req.ServerRequest().GetQueryParams();

			// -1 is used when no (valid) priority was supplied.
			int RequestPriority = -1;

			if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
			{
				RequestPriority = ParseInt(PriorityParam).value_or(-1);
			}

			switch (HttpReq.RequestVerb())
			{
				case HttpVerb::kGet:
					// TODO: return status of all pending or executing jobs
					break;

				case HttpVerb::kPost:
					switch (HttpReq.RequestContentType())
					{
						case HttpContentType::kCbObject:
							{
								// This operation takes the proposed job spec and identifies which
								// chunks are not present on this server. This list is then returned in
								// the "need" list in the response

								IoBuffer Payload   = HttpReq.ReadPayload();
								CbObject ActionObj = LoadCompactBinaryObject(Payload);

								std::vector NeedList;

								ActionObj.IterateAttachments([&](CbFieldView Field) {
									const IoHash FileHash = Field.AsHash();

									if (!m_CidStore.ContainsChunk(FileHash))
									{
										NeedList.push_back(FileHash);
									}
								});

								if (NeedList.empty())
								{
									// We already have everything, enqueue the action for execution

									if (ComputeServiceSession::EnqueueResult Result =
											m_ComputeService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
									{
										ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn);

										HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
									}

									return;
								}

								CbObjectWriter Cbo;
								Cbo.BeginArray("need");

								for (const IoHash& Hash : NeedList)
								{
									Cbo << Hash;
								}

								Cbo.EndArray();
								CbObject Response = Cbo.Save();

								return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
							}
							break;

						case HttpContentType::kCbPackage:
							{
								CbPackage Action = HttpReq.ReadPayloadPackage();

								CbObject ActionObj = Action.GetObject();

								std::span Attachments = Action.GetAttachments();

								int		 AttachmentCount	  = 0;
								int		 NewAttachmentCount	  = 0;
								uint64_t TotalAttachmentBytes = 0;
								uint64_t TotalNewBytes		  = 0;

								for (const CbAttachment& Attachment : Attachments)
								{
									// NOTE(review): asserts on the shape of client-supplied data.
									ZEN_ASSERT(Attachment.IsCompressedBinary());

									const IoHash	 DataHash = Attachment.GetHash();
									CompressedBuffer DataView = Attachment.AsCompressedBinary();

									ZEN_UNUSED(DataHash);

									const uint64_t CompressedSize = DataView.GetCompressedSize();

									TotalAttachmentBytes += CompressedSize;
									++AttachmentCount;

									const CidStore::InsertResult InsertResult =
										m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);

									if (InsertResult.New)
									{
										TotalNewBytes += CompressedSize;
										++NewAttachmentCount;
									}
								}

								if (ComputeServiceSession::EnqueueResult Result =
										m_ComputeService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
								{
									ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
											  ActionObj.GetHash(),
											  Result.Lsn,
											  zen::NiceBytes(TotalAttachmentBytes),
											  AttachmentCount,
											  zen::NiceBytes(TotalNewBytes),
											  NewAttachmentCount);

									HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
								}

								return;
							}
							break;

						default:
							break;
					}
					break;

				default:
					break;
			}
		},
		HttpVerb::kPost);

	// POST jobs?priority=N — submit an action without naming a worker; the session
	// resolves it via EnqueueAction. Responds 424 (FailedDependency) when enqueue fails.
	// NOTE(review): an unsupported content type ends without a response.
	m_Router.RegisterRoute(
		"jobs",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			const auto QueryParams = HttpReq.GetQueryParams();

			int RequestPriority = -1;

			if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
			{
				RequestPriority = ParseInt(PriorityParam).value_or(-1);
			}

			// Resolve worker

			//

			switch (HttpReq.RequestContentType())
			{
				case HttpContentType::kCbObject:
					{
						// This operation takes the proposed job spec and identifies which
						// chunks are not present on this server. This list is then returned in
						// the "need" list in the response

						IoBuffer Payload   = HttpReq.ReadPayload();
						CbObject ActionObj = LoadCompactBinaryObject(Payload);

						std::vector NeedList;

						ActionObj.IterateAttachments([&](CbFieldView Field) {
							const IoHash FileHash = Field.AsHash();

							if (!m_CidStore.ContainsChunk(FileHash))
							{
								NeedList.push_back(FileHash);
							}
						});

						if (NeedList.empty())
						{
							// We already have everything, enqueue the action for execution

							if (ComputeServiceSession::EnqueueResult Result = m_ComputeService.EnqueueAction(ActionObj, RequestPriority))
							{
								ZEN_DEBUG("action accepted (lsn {})", Result.Lsn);

								return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
							}
							else
							{
								// Could not resolve?
								return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
							}
						}

						CbObjectWriter Cbo;
						Cbo.BeginArray("need");

						for (const IoHash& Hash : NeedList)
						{
							Cbo << Hash;
						}

						Cbo.EndArray();
						CbObject Response = Cbo.Save();

						return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
					}

				case HttpContentType::kCbPackage:
					{
						CbPackage Action = HttpReq.ReadPayloadPackage();

						CbObject ActionObj = Action.GetObject();

						std::span Attachments = Action.GetAttachments();

						int		 AttachmentCount	  = 0;
						int		 NewAttachmentCount	  = 0;
						uint64_t TotalAttachmentBytes = 0;
						uint64_t TotalNewBytes		  = 0;

						for (const CbAttachment& Attachment : Attachments)
						{
							// NOTE(review): asserts on the shape of client-supplied data.
							ZEN_ASSERT(Attachment.IsCompressedBinary());

							const IoHash	 DataHash = Attachment.GetHash();
							CompressedBuffer DataView = Attachment.AsCompressedBinary();

							ZEN_UNUSED(DataHash);

							const uint64_t CompressedSize = DataView.GetCompressedSize();

							TotalAttachmentBytes += CompressedSize;
							++AttachmentCount;

							const CidStore::InsertResult InsertResult =
								m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);

							if (InsertResult.New)
							{
								TotalNewBytes += CompressedSize;
								++NewAttachmentCount;
							}
						}

						if (ComputeServiceSession::EnqueueResult Result = m_ComputeService.EnqueueAction(ActionObj, RequestPriority))
						{
							ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
									  Result.Lsn,
									  zen::NiceBytes(TotalAttachmentBytes),
									  AttachmentCount,
									  zen::NiceBytes(TotalNewBytes),
									  NewAttachmentCount);

							HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
						}
						else
						{
							// Could not resolve?
							return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
						}
					}
					return;
			}
		},
		HttpVerb::kPost);

	// GET workers/all — delegates to HandleWorkersAllGet.
	m_Router.RegisterRoute(
		"workers/all",
		[this](HttpRouterRequest& Req) { HandleWorkersAllGet(Req.ServerRequest()); },
		HttpVerb::kGet);

	// Queue-scoped aliases of the worker routes: the queue reference is only used to
	// gate access; the resolved queue ID is not passed on to the worker handlers.
	m_Router.RegisterRoute(
		"queues/{queueref}/workers",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			if (ResolveQueueRef(HttpReq, Req.GetCapture(1)) == 0)
				return;
			HandleWorkersGet(HttpReq);
		},
		HttpVerb::kGet);

	m_Router.RegisterRoute(
		"queues/{queueref}/workers/all",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			if (ResolveQueueRef(HttpReq, Req.GetCapture(1)) == 0)
				return;
			HandleWorkersAllGet(HttpReq);
		},
		HttpVerb::kGet);

	m_Router.RegisterRoute(
		"queues/{queueref}/workers/{worker}",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			if (ResolveQueueRef(HttpReq, Req.GetCapture(1)) == 0)
				return;
			HandleWorkerRequest(HttpReq, IoHash::FromHexString(Req.GetCapture(2)));
		},
		HttpVerb::kGet | HttpVerb::kPost);

	// GET sysinfo — system metrics plus a few flat summary fields.
	// NOTE(review): "disk_used" and "disk_total" are hard-coded constants, not measurements.
	// NOTE(review): the MiB-to-bytes multiplications are evaluated in the operand type of
	// SystemMemoryMiB — confirm it is wide enough not to overflow.
	m_Router.RegisterRoute(
		"sysinfo",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			ExtendedSystemMetrics Sm = ApplyReportingOverrides(m_MetricsTracker.Query());

			CbObjectWriter Cbo;
			Describe(Sm, Cbo);

			Cbo << "cpu_usage" << Sm.CpuUsagePercent;
			Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024;
			Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024;
			Cbo << "disk_used" << 100 * 1024;
			Cbo << "disk_total" << 100 * 1024 * 1024;

			return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
		},
		HttpVerb::kGet);

	// POST record/start — local machine only; starts recording into BaseDir/"recording".
	m_Router.RegisterRoute(
		"record/start",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			if (!HttpReq.IsLocalMachineRequest())
			{
				return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
			}

			m_ComputeService.StartRecording(m_CidStore, m_BaseDir / "recording");

			return HttpReq.WriteResponse(HttpResponseCode::OK);
		},
		HttpVerb::kPost);

	// POST record/stop — local machine only; stops recording.
	m_Router.RegisterRoute(
		"record/stop",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			if (!HttpReq.IsLocalMachineRequest())
			{
				return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
			}

			m_ComputeService.StopRecording();

			return HttpReq.WriteResponse(HttpResponseCode::OK);
		},
		HttpVerb::kPost);

	// Local-only queue listing and creation
	// GET lists every queue whose status is valid; POST creates a queue from the optional
	// "metadata"/"config" objects in the body and returns its "queue_id".
	m_Router.RegisterRoute(
		"queues",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			if (!HttpReq.IsLocalMachineRequest())
			{
				return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
			}

			switch (HttpReq.RequestVerb())
			{
				case HttpVerb::kGet:
					{
						CbObjectWriter Cbo;
						Cbo.BeginArray("queues"sv);

						for (const int QueueId : m_ComputeService.GetQueueIds())
						{
							ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

							if (!Status.IsValid)
							{
								continue;
							}

							Cbo.BeginObject();
							WriteQueueDescription(Cbo, QueueId, Status);
							Cbo.EndObject();
						}

						Cbo.EndArray();

						return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
					}

				case HttpVerb::kPost:
					{
						CbObject Metadata;
						CbObject Config;
						if (const CbObject Body = HttpReq.ReadPayloadObject())
						{
							Metadata = Body.Find("metadata"sv).AsObject();
							Config	 = Body.Find("config"sv).AsObject();
						}

						ComputeServiceSession::CreateQueueResult Result =
							m_ComputeService.CreateQueue({}, std::move(Metadata), std::move(Config));

						CbObjectWriter Cbo;
						Cbo << "queue_id"sv << Result.QueueId;

						return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
					}

				default:
					break;
			}
		},
		HttpVerb::kGet | HttpVerb::kPost);

	// Queue creation routes — these remain separate since local creates a plain queue
	// while remote additionally generates an OID token for external access.
	// NOTE(review): unlike "queues", this handler performs no IsLocalMachineRequest check.
	m_Router.RegisterRoute(
		"queues/remote",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();

			// Extract optional fields from the request body.
			//   idempotency_key: when present, we return the existing remote queue token for this
			//                    key rather than creating a new queue, making the endpoint safe to call concurrently.
			//   hostname: human-readable origin context stored alongside the queue for diagnostics.
			//   metadata: arbitrary CbObject metadata propagated from the originating queue.
			//   config: arbitrary CbObject config propagated from the originating queue.
			std::string IdempotencyKey;
			std::string ClientHostname;
			CbObject	Metadata;
			CbObject	Config;
			if (const CbObject Body = HttpReq.ReadPayloadObject())
			{
				IdempotencyKey = std::string(Body["idempotency_key"sv].AsString());
				ClientHostname = std::string(Body["hostname"sv].AsString());
				Metadata	   = Body.Find("metadata"sv).AsObject();
				Config		   = Body.Find("config"sv).AsObject();
			}

			// Stamp the forwarding node's hostname into the metadata so that the
			// remote side knows which node originated the queue.
			if (!ClientHostname.empty())
			{
				CbObjectWriter MetaWriter;
				for (auto Field : Metadata)
				{
					MetaWriter.AddField(Field.GetName(), Field);
				}
				MetaWriter << "via"sv << ClientHostname;
				Metadata = MetaWriter.Save();
			}

			// The exclusive lock is held from here until the handler returns, which covers
			// the lookup, queue creation, registry insertions and the response write.
			// NOTE(review): consider narrowing the critical section if CreateQueue or
			// WriteResponse can block.
			RwLock::ExclusiveLockScope _(m_RemoteQueueLock);

			if (!IdempotencyKey.empty())
			{
				if (auto It = m_RemoteQueuesByTag.find(IdempotencyKey); It != m_RemoteQueuesByTag.end())
				{
					// NOTE(review): the Ref template argument is missing in this listing.
					Ref Existing = It->second;
					if (m_ComputeService.GetQueueStatus(Existing->QueueId).IsValid)
					{
						CbObjectWriter Cbo;
						Cbo << "queue_token"sv << Existing->Token.ToString();
						Cbo << "queue_id"sv << Existing->QueueId;
						return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
					}

					// Queue has since expired — clean up stale entries and fall through to create a new one
					m_RemoteQueuesByToken.erase(Existing->Token);
					m_RemoteQueuesByQueueId.erase(Existing->QueueId);
					m_RemoteQueuesByTag.erase(It);
				}
			}

			ComputeServiceSession::CreateQueueResult Result = m_ComputeService.CreateQueue({}, std::move(Metadata), std::move(Config));

			// Register the new queue under its token, its queue ID and (when given) its
			// idempotency key; all entries reference the same RemoteQueueInfo.
			Ref InfoRef(new RemoteQueueInfo());
			InfoRef->QueueId		= Result.QueueId;
			InfoRef->Token			= Oid::NewOid();
			InfoRef->IdempotencyKey = std::move(IdempotencyKey);
			InfoRef->ClientHostname = std::move(ClientHostname);

			m_RemoteQueuesByToken[InfoRef->Token]	  = InfoRef;
			m_RemoteQueuesByQueueId[InfoRef->QueueId] = InfoRef;
			if (!InfoRef->IdempotencyKey.empty())
			{
				m_RemoteQueuesByTag[InfoRef->IdempotencyKey] = InfoRef;
			}

			CbObjectWriter Cbo;
			Cbo << "queue_token"sv << InfoRef->Token.ToString();
			Cbo << "queue_id"sv << InfoRef->QueueId;

			return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
		},
		HttpVerb::kPost);

	// Unified queue routes — {queueref} accepts both local integer IDs and remote OID tokens.
	// ResolveQueueRef() handles access control (local-only for integer IDs) and token resolution.

	// GET queues/{queueref} — queue description; DELETE — cancel the queue (204).
	m_Router.RegisterRoute(
		"queues/{queueref}",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			const int		   QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

			if (QueueId == 0)
			{
				return;
			}

			switch (HttpReq.RequestVerb())
			{
				case HttpVerb::kGet:
					{
						ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

						if (!Status.IsValid)
						{
							return HttpReq.WriteResponse(HttpResponseCode::NotFound);
						}

						CbObjectWriter Cbo;
						WriteQueueDescription(Cbo, QueueId, Status);

						return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
					}

				case HttpVerb::kDelete:
					{
						ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

						if (!Status.IsValid)
						{
							return HttpReq.WriteResponse(HttpResponseCode::NotFound);
						}

						m_ComputeService.CancelQueue(QueueId);

						return HttpReq.WriteResponse(HttpResponseCode::NoContent);
					}

				default:
					break;
			}
		},
		HttpVerb::kGet | HttpVerb::kDelete);

	// POST queues/{queueref}/drain — drain the queue and return its refreshed description.
	m_Router.RegisterRoute(
		"queues/{queueref}/drain",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			const int		   QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

			if (QueueId == 0)
			{
				return;
			}

			ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

			if (!Status.IsValid)
			{
				return HttpReq.WriteResponse(HttpResponseCode::NotFound);
			}

			m_ComputeService.DrainQueue(QueueId);

			// Return updated queue status
			Status = m_ComputeService.GetQueueStatus(QueueId);

			CbObjectWriter Cbo;
			WriteQueueDescription(Cbo, QueueId, Status);

			return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
		},
		HttpVerb::kPost);

	// GET queues/{queueref}/completed — completed actions of one queue.
	m_Router.RegisterRoute(
		"queues/{queueref}/completed",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			const int		   QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

			if (QueueId == 0)
			{
				return;
			}

			ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

			if (!Status.IsValid)
			{
				return HttpReq.WriteResponse(HttpResponseCode::NotFound);
			}

			CbObjectWriter Cbo;
			m_ComputeService.GetQueueCompleted(QueueId, Cbo);

			return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
		},
		HttpVerb::kGet);

	// GET queues/{queueref}/history?limit=N — per-queue history. Same entry layout as
	// "jobs/history" except that "actionDescriptor" is not emitted here.
	m_Router.RegisterRoute(
		"queues/{queueref}/history",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			const int		   QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

			if (QueueId == 0)
			{
				return;
			}

			ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);

			if (!Status.IsValid)
			{
				return HttpReq.WriteResponse(HttpResponseCode::NotFound);
			}

			const auto QueryParams = HttpReq.GetQueryParams();

			int QueryLimit = 50;

			if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
			{
				QueryLimit = ParseInt(LimitParam).value_or(50);
			}

			CbObjectWriter Cbo;
			Cbo.BeginArray("history");
			for (const auto& Entry : m_ComputeService.GetQueueHistory(QueueId, QueryLimit))
			{
				Cbo.BeginObject();
				Cbo << "lsn"sv << Entry.Lsn;
				Cbo << "queueId"sv << Entry.QueueId;
				Cbo << "actionId"sv << Entry.ActionId;
				Cbo << "workerId"sv << Entry.WorkerId;
				Cbo << "succeeded"sv << Entry.Succeeded;
				if (Entry.CpuSeconds > 0.0f)
				{
					Cbo.AddFloat("cpuSeconds"sv, Entry.CpuSeconds);
				}
				if (Entry.RetryCount > 0)
				{
					Cbo << "retry_count"sv << Entry.RetryCount;
				}

				for (const auto& Timestamp : Entry.Timestamps)
				{
					Cbo.AddInteger(
						fmt::format("time_{}"sv, RunnerAction::ToString(static_cast(&Timestamp - Entry.Timestamps))),
						Timestamp);
				}
				Cbo.EndObject();
			}
			Cbo.EndArray();

			return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
		},
		HttpVerb::kGet);

	// GET queues/{queueref}/running — running actions that belong to one queue.
	// NOTE(review): unlike the sibling queue routes there is no GetQueueStatus validity
	// check here, so an unknown local queue ID yields an empty "running" array.
	m_Router.RegisterRoute(
		"queues/{queueref}/running",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			const int		   QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

			if (QueueId == 0)
			{
				return;
			}

			// Filter global running list to this queue
			auto AllRunning = m_ComputeService.GetRunningActions();
			std::vector Running;
			for (auto& Info : AllRunning)
				if (Info.QueueId == QueueId)
					Running.push_back(Info);

			CbObjectWriter Cbo;
			Cbo.BeginArray("running");
			for (const auto& Info : Running)
			{
				Cbo.BeginObject();
				Cbo << "lsn"sv << Info.Lsn;
				Cbo << "queueId"sv << Info.QueueId;
				Cbo << "actionId"sv << Info.ActionId;
				if (Info.CpuUsagePercent >= 0.0f)
				{
					Cbo.AddFloat("cpuUsage"sv, Info.CpuUsagePercent);
				}
				if (Info.CpuSeconds > 0.0f)
				{
					Cbo.AddFloat("cpuSeconds"sv, Info.CpuSeconds);
				}
				Cbo.EndObject();
			}
			Cbo.EndArray();

			return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
		},
		HttpVerb::kGet);

	// POST queues/{queueref}/jobs/{worker}?priority=N — submit an action for a known
	// worker into a specific queue. 404 + "need" list when chunks are missing, 424 when
	// enqueue fails. An unsupported content type ends without a response.
	m_Router.RegisterRoute(
		"queues/{queueref}/jobs/{worker}",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			const int		   QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

			if (QueueId == 0)
			{
				return;
			}

			const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(2));

			WorkerDesc Worker = m_ComputeService.GetWorkerDescriptor(WorkerId);

			if (!Worker)
			{
				return HttpReq.WriteResponse(HttpResponseCode::NotFound);
			}

			const auto QueryParams = Req.ServerRequest().GetQueryParams();

			int RequestPriority = -1;

			if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
			{
				RequestPriority = ParseInt(PriorityParam).value_or(-1);
			}

			switch (HttpReq.RequestContentType())
			{
				case HttpContentType::kCbObject:
					{
						IoBuffer Payload   = HttpReq.ReadPayload();
						CbObject ActionObj = LoadCompactBinaryObject(Payload);

						std::vector NeedList;

						if (!CheckAttachments(ActionObj, NeedList))
						{
							CbObjectWriter Cbo;
							Cbo.BeginArray("need");

							for (const IoHash& Hash : NeedList)
							{
								Cbo << Hash;
							}

							Cbo.EndArray();

							return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save());
						}

						if (ComputeServiceSession::EnqueueResult Result =
								m_ComputeService.EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority))
						{
							ZEN_DEBUG("queue {}: action {} accepted (lsn {})", QueueId, ActionObj.GetHash(), Result.Lsn);
							return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
						}
						else
						{
							return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
						}
					}

				case HttpContentType::kCbPackage:
					{
						CbPackage Action	= HttpReq.ReadPayloadPackage();
						CbObject  ActionObj = Action.GetObject();

						IngestStats Stats = IngestPackageAttachments(Action);

						if (ComputeServiceSession::EnqueueResult Result =
								m_ComputeService.EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority))
						{
							ZEN_DEBUG("queue {}: accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
									  QueueId,
									  ActionObj.GetHash(),
									  Result.Lsn,
									  zen::NiceBytes(Stats.Bytes),
									  Stats.Count,
									  zen::NiceBytes(Stats.NewBytes),
									  Stats.NewCount);

							return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
						}
						else
						{
							return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
						}
					}

				default:
					break;
			}
		},
		HttpVerb::kPost);

	// POST queues/{queueref}/jobs?priority=N — submit an action into a specific queue
	// without naming a worker. Same response scheme as the route above.
	m_Router.RegisterRoute(
		"queues/{queueref}/jobs",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq = Req.ServerRequest();
			const int		   QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));

			if (QueueId == 0)
			{
				return;
			}

			const auto QueryParams = Req.ServerRequest().GetQueryParams();

			int RequestPriority = -1;

			if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
			{
				RequestPriority = ParseInt(PriorityParam).value_or(-1);
			}

			switch (HttpReq.RequestContentType())
			{
				case HttpContentType::kCbObject:
					{
						IoBuffer Payload   = HttpReq.ReadPayload();
						CbObject ActionObj = LoadCompactBinaryObject(Payload);

						std::vector NeedList;

						if (!CheckAttachments(ActionObj, NeedList))
						{
							CbObjectWriter Cbo;
							Cbo.BeginArray("need");

							for (const IoHash& Hash : NeedList)
							{
								Cbo << Hash;
							}

							Cbo.EndArray();

							return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save());
						}

						if (ComputeServiceSession::EnqueueResult Result =
								m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, RequestPriority))
						{
							ZEN_DEBUG("queue {}: action accepted (lsn {})", QueueId, Result.Lsn);
							return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
						}
						else
						{
							return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
						}
					}

				case HttpContentType::kCbPackage:
					{
						CbPackage Action	= HttpReq.ReadPayloadPackage();
						CbObject  ActionObj = Action.GetObject();

						IngestStats Stats = IngestPackageAttachments(Action);

						if (ComputeServiceSession::EnqueueResult Result =
								m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, RequestPriority))
						{
							ZEN_DEBUG("queue {}: accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
									  QueueId,
									  Result.Lsn,
									  zen::NiceBytes(Stats.Bytes),
									  Stats.Count,
									  zen::NiceBytes(Stats.NewBytes),
									  Stats.NewCount);

							return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
						}
						else
						{
							return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
						}
					}

				default:
					break;
			}
		},
		HttpVerb::kPost);

	// GET/POST queues/{queueref}/jobs/{lsn} — queue-scoped variant of "jobs/{lsn}".
	// NOTE(review): the resolved QueueId only gates access; the LSN is never checked
	// against that queue, so a caller holding a reference to one queue can fetch, retire
	// or reschedule an action by LSN regardless of which queue it belongs to.
	m_Router.RegisterRoute(
		"queues/{queueref}/jobs/{lsn}",
		[this](HttpRouterRequest& Req) {
			HttpServerRequest& HttpReq	 = Req.ServerRequest();
			const int		   QueueId	 = ResolveQueueRef(HttpReq, Req.GetCapture(1));
			const int		   ActionLsn = ParseInt(Req.GetCapture(2)).value_or(0);

			if (QueueId == 0)
			{
				return;
			}

			switch (HttpReq.RequestVerb())
			{
				case HttpVerb::kGet:
					{
						ZEN_UNUSED(QueueId);

						CbPackage Output;
						HttpResponseCode ResponseCode = m_ComputeService.GetActionResult(ActionLsn, Output);

						if (ResponseCode == HttpResponseCode::OK)
						{
							HttpReq.WriteResponse(HttpResponseCode::OK, Output);
						}
						else
						{
							HttpReq.WriteResponse(ResponseCode);
						}

						m_ComputeService.RetireActionResult(ActionLsn);
					}
					break;

				case HttpVerb::kPost:
					{
						ZEN_UNUSED(QueueId);

						auto Result = m_ComputeService.RescheduleAction(ActionLsn);

						CbObjectWriter Cbo;
						if (Result.Success)
						{
							Cbo << "lsn"sv << ActionLsn;
							Cbo << "retry_count"sv << Result.RetryCount;
							HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
						}
						else
						{
							Cbo << "error"sv << Result.Error;
							HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
						}
					}
					break;

				default:
					break;
			}
		},
		HttpVerb::kGet | HttpVerb::kPost);
}
////////////////////////////////////////////////////////////////////////// HttpComputeService::HttpComputeService(CidStore& InCidStore, IHttpStatsService& StatsService, const std::filesystem::path& BaseDir, int32_t MaxConcurrentActions) : m_Impl(std::make_unique(this, InCidStore, StatsService, BaseDir, MaxConcurrentActions)) { } HttpComputeService::~HttpComputeService() { m_Impl->m_StatsService.UnregisterHandler("compute", *this); } void HttpComputeService::Shutdown() { m_Impl->m_ComputeService.Shutdown(); } ComputeServiceSession::ActionCounts HttpComputeService::GetActionCounts() { return m_Impl->m_ComputeService.GetActionCounts(); } const char* HttpComputeService::BaseUri() const { return "/compute/"; } void HttpComputeService::HandleRequest(HttpServerRequest& Request) { ZEN_TRACE_CPU("HttpComputeService::HandleRequest"); metrics::OperationTiming::Scope $(m_Impl->m_HttpRequests); if (m_Impl->m_Router.HandleRequest(Request) == false) { ZEN_WARN("No route found for {0}", Request.RelativeUri()); } } void HttpComputeService::HandleStatsRequest(HttpServerRequest& Request) { CbObjectWriter Cbo; m_Impl->m_ComputeService.EmitStats(Cbo); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } ////////////////////////////////////////////////////////////////////////// void HttpComputeService::Impl::WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status) { Cbo << "queue_id"sv << Status.QueueId; Cbo << "active_count"sv << Status.ActiveCount; Cbo << "completed_count"sv << Status.CompletedCount; Cbo << "failed_count"sv << Status.FailedCount; Cbo << "abandoned_count"sv << Status.AbandonedCount; Cbo << "cancelled_count"sv << Status.CancelledCount; Cbo << "state"sv << ToString(Status.State); Cbo << "cancelled"sv << (Status.State == ComputeServiceSession::QueueState::Cancelled); Cbo << "draining"sv << (Status.State == ComputeServiceSession::QueueState::Draining); Cbo << "is_complete"sv << Status.IsComplete; if (CbObject Meta = 
m_ComputeService.GetQueueMetadata(QueueId)) { Cbo << "metadata"sv << Meta; } if (CbObject Cfg = m_ComputeService.GetQueueConfig(QueueId)) { Cbo << "config"sv << Cfg; } { RwLock::SharedLockScope $(m_RemoteQueueLock); if (auto It = m_RemoteQueuesByQueueId.find(QueueId); It != m_RemoteQueuesByQueueId.end()) { Cbo << "queue_token"sv << It->second->Token.ToString(); if (!It->second->ClientHostname.empty()) { Cbo << "hostname"sv << It->second->ClientHostname; } } } } ////////////////////////////////////////////////////////////////////////// int HttpComputeService::Impl::ResolveQueueToken(const Oid& Token) { RwLock::SharedLockScope $(m_RemoteQueueLock); auto It = m_RemoteQueuesByToken.find(Token); if (It != m_RemoteQueuesByToken.end()) { return It->second->QueueId; } return 0; } int HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::string_view Capture) { if (OidMatcher(Capture)) { // Remote OID token — accessible from any client const Oid Token = Oid::FromHexString(Capture); const int QueueId = ResolveQueueToken(Token); if (QueueId == 0) { HttpReq.WriteResponse(HttpResponseCode::NotFound); } return QueueId; } // Local integer queue ID — restricted to local machine requests if (!HttpReq.IsLocalMachineRequest()) { HttpReq.WriteResponse(HttpResponseCode::Forbidden); return 0; } return ParseInt(Capture).value_or(0); } HttpComputeService::Impl::IngestStats HttpComputeService::Impl::IngestPackageAttachments(const CbPackage& Package) { IngestStats Stats; for (const CbAttachment& Attachment : Package.GetAttachments()) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer DataView = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); const uint64_t CompressedSize = DataView.GetCompressedSize(); Stats.Bytes += CompressedSize; ++Stats.Count; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { 
Stats.NewBytes += CompressedSize; ++Stats.NewCount; } } return Stats; } bool HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vector& NeedList) { ActionObj.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); if (!m_CidStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } }); return NeedList.empty(); } void HttpComputeService::Impl::HandleWorkersGet(HttpServerRequest& HttpReq) { CbObjectWriter Cbo; Cbo.BeginArray("workers"sv); for (const IoHash& WorkerId : m_ComputeService.GetKnownWorkerIds()) { Cbo << WorkerId; } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpComputeService::Impl::HandleWorkersAllGet(HttpServerRequest& HttpReq) { std::vector WorkerIds = m_ComputeService.GetKnownWorkerIds(); CbObjectWriter Cbo; Cbo.BeginArray("workers"); for (const IoHash& WorkerId : WorkerIds) { Cbo.BeginObject(); Cbo << "id" << WorkerId; Cbo << "descriptor" << m_ComputeService.GetWorkerDescriptor(WorkerId).Descriptor.GetObject(); Cbo.EndObject(); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } void HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId) { switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: if (WorkerDesc Desc = m_ComputeService.GetWorkerDescriptor(WorkerId)) { return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject()); } return HttpReq.WriteResponse(HttpResponseCode::NotFound); case HttpVerb::kPost: { switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { CbObject WorkerSpec = HttpReq.ReadPayloadObject(); HashKeySet ChunkSet; WorkerSpec.IterateAttachments([&](CbFieldView Field) { const IoHash Hash = Field.AsHash(); ChunkSet.AddHashToSet(Hash); }); CbPackage WorkerPackage; WorkerPackage.SetObject(WorkerSpec); m_CidStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) { ZEN_DEBUG("worker {}: all attachments already available", WorkerId); 
m_ComputeService.RegisterWorker(WorkerPackage); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } CbObjectWriter ResponseWriter; ResponseWriter.BeginArray("need"); ChunkSet.IterateHashes([&](const IoHash& Hash) { ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); ResponseWriter.AddHash(Hash); }); ResponseWriter.EndArray(); ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); } break; case HttpContentType::kCbPackage: { CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage(); CbObject WorkerSpec = WorkerSpecPackage.GetObject(); std::span Attachments = WorkerSpecPackage.GetAttachments(); int AttachmentCount = 0; int NewAttachmentCount = 0; uint64_t TotalAttachmentBytes = 0; uint64_t TotalNewBytes = 0; for (const CbAttachment& Attachment : Attachments) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer Buffer = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); TotalAttachmentBytes += Buffer.GetCompressedSize(); ++AttachmentCount; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { TotalNewBytes += Buffer.GetCompressedSize(); ++NewAttachmentCount; } } ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", WorkerId, zen::NiceBytes(TotalAttachmentBytes), AttachmentCount, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); m_ComputeService.RegisterWorker(WorkerSpecPackage); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } break; default: break; } } break; default: break; } } ////////////////////////////////////////////////////////////////////////// void httpcomputeservice_forcelink() { } } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES