aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/httpcomputeservice.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/httpcomputeservice.cpp')
-rw-r--r--src/zencompute/httpcomputeservice.cpp1643
1 files changed, 1643 insertions, 0 deletions
diff --git a/src/zencompute/httpcomputeservice.cpp b/src/zencompute/httpcomputeservice.cpp
new file mode 100644
index 000000000..e82a40781
--- /dev/null
+++ b/src/zencompute/httpcomputeservice.cpp
@@ -0,0 +1,1643 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "zencompute/httpcomputeservice.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include "runners/functionrunner.h"
+
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/logging.h>
+# include <zencore/system.h>
+# include <zencore/thread.h>
+# include <zencore/trace.h>
+# include <zencore/uid.h>
+# include <zenstore/cidstore.h>
+# include <zentelemetry/stats.h>
+
+# include <span>
+# include <unordered_map>
+
+using namespace std::literals;
+
+namespace zen::compute {
+
+constinit AsciiSet g_DecimalSet("0123456789");
+constinit AsciiSet g_HexSet("0123456789abcdefABCDEF");
+
+auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); };
+auto IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); };
+auto OidMatcher = [](std::string_view Str) { return Str.size() == 24 && AsciiSet::HasOnly(Str, g_HexSet); };
+
+//////////////////////////////////////////////////////////////////////////
+
+struct HttpComputeService::Impl
+{
+ HttpComputeService* m_Self;
+ CidStore& m_CidStore;
+ IHttpStatsService& m_StatsService;
+ LoggerRef m_Log;
+ std::filesystem::path m_BaseDir;
+ HttpRequestRouter m_Router;
+ ComputeServiceSession m_ComputeService;
+ SystemMetricsTracker m_MetricsTracker;
+
+ // Metrics
+
+ metrics::OperationTiming m_HttpRequests;
+
+ // Per-remote-queue metadata, shared across all lookup maps below.
+
+ struct RemoteQueueInfo : RefCounted
+ {
+ int QueueId = 0;
+ Oid Token;
+ std::string IdempotencyKey; // empty if no idempotency key was provided
+ std::string ClientHostname; // empty if no hostname was provided
+ };
+
+ // Remote queue registry — all three maps share the same RemoteQueueInfo objects.
+ // All maps are guarded by m_RemoteQueueLock.
+
+ RwLock m_RemoteQueueLock;
+ std::unordered_map<Oid, Ref<RemoteQueueInfo>, Oid::Hasher> m_RemoteQueuesByToken; // Token → info
+ std::unordered_map<int, Ref<RemoteQueueInfo>> m_RemoteQueuesByQueueId; // QueueId → info
+ std::unordered_map<std::string, Ref<RemoteQueueInfo>> m_RemoteQueuesByTag; // idempotency key → info
+
+ LoggerRef Log() { return m_Log; }
+
+ int ResolveQueueToken(const Oid& Token);
+ int ResolveQueueRef(HttpServerRequest& HttpReq, std::string_view Capture);
+
+ struct IngestStats
+ {
+ int Count = 0;
+ int NewCount = 0;
+ uint64_t Bytes = 0;
+ uint64_t NewBytes = 0;
+ };
+
+ IngestStats IngestPackageAttachments(const CbPackage& Package);
+ bool CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList);
+ void HandleWorkersGet(HttpServerRequest& HttpReq);
+ void HandleWorkersAllGet(HttpServerRequest& HttpReq);
+ void WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status);
+ void HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId);
+
+ void RegisterRoutes();
+
+ Impl(HttpComputeService* Self,
+ CidStore& InCidStore,
+ IHttpStatsService& StatsService,
+ const std::filesystem::path& BaseDir,
+ int32_t MaxConcurrentActions)
+ : m_Self(Self)
+ , m_CidStore(InCidStore)
+ , m_StatsService(StatsService)
+ , m_Log(logging::Get("compute"))
+ , m_BaseDir(BaseDir)
+ , m_ComputeService(InCidStore)
+ {
+ m_ComputeService.AddLocalRunner(InCidStore, m_BaseDir / "local", MaxConcurrentActions);
+ m_ComputeService.WaitUntilReady();
+ m_StatsService.RegisterHandler("compute", *m_Self);
+ RegisterRoutes();
+ }
+};
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+HttpComputeService::Impl::RegisterRoutes()
+{
+ m_Router.AddMatcher("lsn", DecimalMatcher);
+ m_Router.AddMatcher("worker", IoHashMatcher);
+ m_Router.AddMatcher("action", IoHashMatcher);
+ m_Router.AddMatcher("queue", DecimalMatcher);
+ m_Router.AddMatcher("oidtoken", OidMatcher);
+ m_Router.AddMatcher("queueref", [](std::string_view Str) { return DecimalMatcher(Str) || OidMatcher(Str); });
+
+ m_Router.RegisterRoute(
+ "ready",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (m_ComputeService.IsHealthy())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok");
+ }
+
+ return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "abandon",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (!HttpReq.IsLocalMachineRequest())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
+ }
+
+ bool Success = m_ComputeService.RequestStateTransition(ComputeServiceSession::SessionState::Abandoned);
+
+ if (Success)
+ {
+ CbObjectWriter Cbo;
+ Cbo << "state"sv
+ << "Abandoned"sv;
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
+ CbObjectWriter Cbo;
+ Cbo << "error"sv
+ << "Cannot transition to Abandoned from current state"sv;
+ return HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "workers",
+ [this](HttpRouterRequest& Req) { HandleWorkersGet(Req.ServerRequest()); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "workers/{worker}",
+ [this](HttpRouterRequest& Req) { HandleWorkerRequest(Req.ServerRequest(), IoHash::FromHexString(Req.GetCapture(1))); },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "jobs/completed",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ CbObjectWriter Cbo;
+ m_ComputeService.GetCompleted(Cbo);
+
+ ExtendedSystemMetrics Sm = ApplyReportingOverrides(m_MetricsTracker.Query());
+ Cbo.BeginObject("metrics");
+ Describe(Sm, Cbo);
+ Cbo.EndObject();
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/history",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const auto QueryParams = HttpReq.GetQueryParams();
+
+ int QueryLimit = 50;
+
+ if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
+ {
+ QueryLimit = ParseInt<int>(LimitParam).value_or(50);
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("history");
+ for (const auto& Entry : m_ComputeService.GetActionHistory(QueryLimit))
+ {
+ Cbo.BeginObject();
+ Cbo << "lsn"sv << Entry.Lsn;
+ Cbo << "queueId"sv << Entry.QueueId;
+ Cbo << "actionId"sv << Entry.ActionId;
+ Cbo << "workerId"sv << Entry.WorkerId;
+ Cbo << "succeeded"sv << Entry.Succeeded;
+ Cbo << "actionDescriptor"sv << Entry.ActionDescriptor;
+ if (Entry.CpuSeconds > 0.0f)
+ {
+ Cbo.AddFloat("cpuSeconds"sv, Entry.CpuSeconds);
+ }
+ if (Entry.RetryCount > 0)
+ {
+ Cbo << "retry_count"sv << Entry.RetryCount;
+ }
+
+ for (const auto& Timestamp : Entry.Timestamps)
+ {
+ Cbo.AddInteger(
+ fmt::format("time_{}"sv, RunnerAction::ToString(static_cast<RunnerAction::State>(&Timestamp - Entry.Timestamps))),
+ Timestamp);
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/running",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ auto Running = m_ComputeService.GetRunningActions();
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("running");
+ for (const auto& Info : Running)
+ {
+ Cbo.BeginObject();
+ Cbo << "lsn"sv << Info.Lsn;
+ Cbo << "queueId"sv << Info.QueueId;
+ Cbo << "actionId"sv << Info.ActionId;
+ if (Info.CpuUsagePercent >= 0.0f)
+ {
+ Cbo.AddFloat("cpuUsage"sv, Info.CpuUsagePercent);
+ }
+ if (Info.CpuSeconds > 0.0f)
+ {
+ Cbo.AddFloat("cpuSeconds"sv, Info.CpuSeconds);
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/{lsn}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int ActionLsn = ParseInt<int>(Req.GetCapture(1)).value_or(0);
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ CbPackage Output;
+ HttpResponseCode ResponseCode = m_ComputeService.GetActionResult(ActionLsn, Output);
+
+ if (ResponseCode == HttpResponseCode::OK)
+ {
+ HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+ else
+ {
+ HttpReq.WriteResponse(ResponseCode);
+ }
+
+ // Once we've initiated the response we can mark the result
+ // as retired, allowing the service to free any associated
+ // resources. Note that there still needs to be a delay
+ // to allow the transmission to complete, it would be better
+ // if we could issue this once the response is fully sent...
+ m_ComputeService.RetireActionResult(ActionLsn);
+ }
+ break;
+
+ case HttpVerb::kPost:
+ {
+ auto Result = m_ComputeService.RescheduleAction(ActionLsn);
+
+ CbObjectWriter Cbo;
+ if (Result.Success)
+ {
+ Cbo << "lsn"sv << ActionLsn;
+ Cbo << "retry_count"sv << Result.RetryCount;
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+ else
+ {
+ Cbo << "error"sv << Result.Error;
+ HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "jobs/{worker}/{action}", // This route is inefficient, and is only here for backwards compatibility. The preferred path is the
+ // one which uses the scheduled action lsn for lookups
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
+
+ CbPackage Output;
+ if (HttpResponseCode ResponseCode = m_ComputeService.FindActionResult(ActionId, /* out */ Output);
+ ResponseCode != HttpResponseCode::OK)
+ {
+ ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))
+
+ if (ResponseCode == HttpResponseCode::NotFound)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+
+ ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2))
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "jobs/{worker}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+ WorkerDesc Worker = m_ComputeService.GetWorkerDescriptor(WorkerId);
+
+ if (!Worker)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ const auto QueryParams = Req.ServerRequest().GetQueryParams();
+
+ int RequestPriority = -1;
+
+ if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+ {
+ RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+ }
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ // TODO: return status of all pending or executing jobs
+ break;
+
+ case HttpVerb::kPost:
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ // This operation takes the proposed job spec and identifies which
+ // chunks are not present on this server. This list is then returned in
+ // the "need" list in the response
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+ std::vector<IoHash> NeedList;
+
+ ActionObj.IterateAttachments([&](CbFieldView Field) {
+ const IoHash FileHash = Field.AsHash();
+
+ if (!m_CidStore.ContainsChunk(FileHash))
+ {
+ NeedList.push_back(FileHash);
+ }
+ });
+
+ if (NeedList.empty())
+ {
+ // We already have everything, enqueue the action for execution
+
+ if (ComputeServiceSession::EnqueueResult Result =
+ m_ComputeService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn);
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+
+ return;
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+ CbObject Response = Cbo.Save();
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+ }
+ break;
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage Action = HttpReq.ReadPayloadPackage();
+ CbObject ActionObj = Action.GetObject();
+
+ std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+ ZEN_UNUSED(DataHash);
+
+ const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+ TotalAttachmentBytes += CompressedSize;
+ ++AttachmentCount;
+
+ const CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += CompressedSize;
+ ++NewAttachmentCount;
+ }
+ }
+
+ if (ComputeServiceSession::EnqueueResult Result =
+ m_ComputeService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
+ ActionObj.GetHash(),
+ Result.Lsn,
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+
+ return;
+ }
+ break;
+
+ default:
+ break;
+ }
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "jobs",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ const auto QueryParams = HttpReq.GetQueryParams();
+
+ int RequestPriority = -1;
+
+ if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+ {
+ RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+ }
+
+ // Resolve worker
+
+ //
+
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ // This operation takes the proposed job spec and identifies which
+ // chunks are not present on this server. This list is then returned in
+ // the "need" list in the response
+
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+ std::vector<IoHash> NeedList;
+
+ ActionObj.IterateAttachments([&](CbFieldView Field) {
+ const IoHash FileHash = Field.AsHash();
+
+ if (!m_CidStore.ContainsChunk(FileHash))
+ {
+ NeedList.push_back(FileHash);
+ }
+ });
+
+ if (NeedList.empty())
+ {
+ // We already have everything, enqueue the action for execution
+
+ if (ComputeServiceSession::EnqueueResult Result = m_ComputeService.EnqueueAction(ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("action accepted (lsn {})", Result.Lsn);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+ else
+ {
+ // Could not resolve?
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+ }
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+ CbObject Response = Cbo.Save();
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+ }
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage Action = HttpReq.ReadPayloadPackage();
+ CbObject ActionObj = Action.GetObject();
+
+ std::span<const CbAttachment> Attachments = Action.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+ ZEN_UNUSED(DataHash);
+
+ const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+ TotalAttachmentBytes += CompressedSize;
+ ++AttachmentCount;
+
+ const CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += CompressedSize;
+ ++NewAttachmentCount;
+ }
+ }
+
+ if (ComputeServiceSession::EnqueueResult Result = m_ComputeService.EnqueueAction(ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
+ Result.Lsn,
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+ else
+ {
+ // Could not resolve?
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+ }
+ }
+ return;
+ }
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "workers/all",
+ [this](HttpRouterRequest& Req) { HandleWorkersAllGet(Req.ServerRequest()); },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/workers",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ if (ResolveQueueRef(HttpReq, Req.GetCapture(1)) == 0)
+ return;
+ HandleWorkersGet(HttpReq);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/workers/all",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ if (ResolveQueueRef(HttpReq, Req.GetCapture(1)) == 0)
+ return;
+ HandleWorkersAllGet(HttpReq);
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/workers/{worker}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ if (ResolveQueueRef(HttpReq, Req.GetCapture(1)) == 0)
+ return;
+ HandleWorkerRequest(HttpReq, IoHash::FromHexString(Req.GetCapture(2)));
+ },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "sysinfo",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ ExtendedSystemMetrics Sm = ApplyReportingOverrides(m_MetricsTracker.Query());
+
+ CbObjectWriter Cbo;
+ Describe(Sm, Cbo);
+
+ Cbo << "cpu_usage" << Sm.CpuUsagePercent;
+ Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024;
+ Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024;
+ Cbo << "disk_used" << 100 * 1024;
+ Cbo << "disk_total" << 100 * 1024 * 1024;
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "record/start",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (!HttpReq.IsLocalMachineRequest())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
+ }
+
+ m_ComputeService.StartRecording(m_CidStore, m_BaseDir / "recording");
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "record/stop",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (!HttpReq.IsLocalMachineRequest())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
+ }
+
+ m_ComputeService.StopRecording();
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK);
+ },
+ HttpVerb::kPost);
+
+ // Local-only queue listing and creation
+
+ m_Router.RegisterRoute(
+ "queues",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ if (!HttpReq.IsLocalMachineRequest())
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::Forbidden);
+ }
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("queues"sv);
+
+ for (const int QueueId : m_ComputeService.GetQueueIds())
+ {
+ ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);
+
+ if (!Status.IsValid)
+ {
+ continue;
+ }
+
+ Cbo.BeginObject();
+ WriteQueueDescription(Cbo, QueueId, Status);
+ Cbo.EndObject();
+ }
+
+ Cbo.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
+ case HttpVerb::kPost:
+ {
+ CbObject Metadata;
+ CbObject Config;
+ if (const CbObject Body = HttpReq.ReadPayloadObject())
+ {
+ Metadata = Body.Find("metadata"sv).AsObject();
+ Config = Body.Find("config"sv).AsObject();
+ }
+
+ ComputeServiceSession::CreateQueueResult Result =
+ m_ComputeService.CreateQueue({}, std::move(Metadata), std::move(Config));
+
+ CbObjectWriter Cbo;
+ Cbo << "queue_id"sv << Result.QueueId;
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost);
+
+ // Queue creation routes — these remain separate since local creates a plain queue
+ // while remote additionally generates an OID token for external access.
+
+ m_Router.RegisterRoute(
+ "queues/remote",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+
+ // Extract optional fields from the request body.
+ // idempotency_key: when present, we return the existing remote queue token for this
+ // key rather than creating a new queue, making the endpoint safe to call concurrently.
+ // hostname: human-readable origin context stored alongside the queue for diagnostics.
+ // metadata: arbitrary CbObject metadata propagated from the originating queue.
+ // config: arbitrary CbObject config propagated from the originating queue.
+ std::string IdempotencyKey;
+ std::string ClientHostname;
+ CbObject Metadata;
+ CbObject Config;
+ if (const CbObject Body = HttpReq.ReadPayloadObject())
+ {
+ IdempotencyKey = std::string(Body["idempotency_key"sv].AsString());
+ ClientHostname = std::string(Body["hostname"sv].AsString());
+ Metadata = Body.Find("metadata"sv).AsObject();
+ Config = Body.Find("config"sv).AsObject();
+ }
+
+ // Stamp the forwarding node's hostname into the metadata so that the
+ // remote side knows which node originated the queue.
+ if (!ClientHostname.empty())
+ {
+ CbObjectWriter MetaWriter;
+ for (auto Field : Metadata)
+ {
+ MetaWriter.AddField(Field.GetName(), Field);
+ }
+ MetaWriter << "via"sv << ClientHostname;
+ Metadata = MetaWriter.Save();
+ }
+
+ RwLock::ExclusiveLockScope _(m_RemoteQueueLock);
+
+ if (!IdempotencyKey.empty())
+ {
+ if (auto It = m_RemoteQueuesByTag.find(IdempotencyKey); It != m_RemoteQueuesByTag.end())
+ {
+ Ref<RemoteQueueInfo> Existing = It->second;
+ if (m_ComputeService.GetQueueStatus(Existing->QueueId).IsValid)
+ {
+ CbObjectWriter Cbo;
+ Cbo << "queue_token"sv << Existing->Token.ToString();
+ Cbo << "queue_id"sv << Existing->QueueId;
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
+ // Queue has since expired — clean up stale entries and fall through to create a new one
+ m_RemoteQueuesByToken.erase(Existing->Token);
+ m_RemoteQueuesByQueueId.erase(Existing->QueueId);
+ m_RemoteQueuesByTag.erase(It);
+ }
+ }
+
+ ComputeServiceSession::CreateQueueResult Result = m_ComputeService.CreateQueue({}, std::move(Metadata), std::move(Config));
+ Ref<RemoteQueueInfo> InfoRef(new RemoteQueueInfo());
+ InfoRef->QueueId = Result.QueueId;
+ InfoRef->Token = Oid::NewOid();
+ InfoRef->IdempotencyKey = std::move(IdempotencyKey);
+ InfoRef->ClientHostname = std::move(ClientHostname);
+
+ m_RemoteQueuesByToken[InfoRef->Token] = InfoRef;
+ m_RemoteQueuesByQueueId[InfoRef->QueueId] = InfoRef;
+ if (!InfoRef->IdempotencyKey.empty())
+ {
+ m_RemoteQueuesByTag[InfoRef->IdempotencyKey] = InfoRef;
+ }
+
+ CbObjectWriter Cbo;
+ Cbo << "queue_token"sv << InfoRef->Token.ToString();
+ Cbo << "queue_id"sv << InfoRef->QueueId;
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kPost);
+
+ // Unified queue routes — {queueref} accepts both local integer IDs and remote OID tokens.
+ // ResolveQueueRef() handles access control (local-only for integer IDs) and token resolution.
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
+
+ if (QueueId == 0)
+ {
+ return;
+ }
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);
+
+ if (!Status.IsValid)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ CbObjectWriter Cbo;
+ WriteQueueDescription(Cbo, QueueId, Status);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+
+ case HttpVerb::kDelete:
+ {
+ ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);
+
+ if (!Status.IsValid)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ m_ComputeService.CancelQueue(QueueId);
+
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+ }
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kDelete);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/drain",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
+
+ if (QueueId == 0)
+ {
+ return;
+ }
+
+ ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);
+
+ if (!Status.IsValid)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ m_ComputeService.DrainQueue(QueueId);
+
+ // Return updated queue status
+ Status = m_ComputeService.GetQueueStatus(QueueId);
+
+ CbObjectWriter Cbo;
+ WriteQueueDescription(Cbo, QueueId, Status);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/completed",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
+
+ if (QueueId == 0)
+ {
+ return;
+ }
+
+ ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);
+
+ if (!Status.IsValid)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ CbObjectWriter Cbo;
+ m_ComputeService.GetQueueCompleted(QueueId, Cbo);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/history",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
+
+ if (QueueId == 0)
+ {
+ return;
+ }
+
+ ComputeServiceSession::QueueStatus Status = m_ComputeService.GetQueueStatus(QueueId);
+
+ if (!Status.IsValid)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ const auto QueryParams = HttpReq.GetQueryParams();
+
+ int QueryLimit = 50;
+
+ if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
+ {
+ QueryLimit = ParseInt<int>(LimitParam).value_or(50);
+ }
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("history");
+ for (const auto& Entry : m_ComputeService.GetQueueHistory(QueueId, QueryLimit))
+ {
+ Cbo.BeginObject();
+ Cbo << "lsn"sv << Entry.Lsn;
+ Cbo << "queueId"sv << Entry.QueueId;
+ Cbo << "actionId"sv << Entry.ActionId;
+ Cbo << "workerId"sv << Entry.WorkerId;
+ Cbo << "succeeded"sv << Entry.Succeeded;
+ if (Entry.CpuSeconds > 0.0f)
+ {
+ Cbo.AddFloat("cpuSeconds"sv, Entry.CpuSeconds);
+ }
+ if (Entry.RetryCount > 0)
+ {
+ Cbo << "retry_count"sv << Entry.RetryCount;
+ }
+
+ for (const auto& Timestamp : Entry.Timestamps)
+ {
+ Cbo.AddInteger(
+ fmt::format("time_{}"sv, RunnerAction::ToString(static_cast<RunnerAction::State>(&Timestamp - Entry.Timestamps))),
+ Timestamp);
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/running",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
+ if (QueueId == 0)
+ {
+ return;
+ }
+ // Filter global running list to this queue
+ auto AllRunning = m_ComputeService.GetRunningActions();
+ std::vector<ComputeServiceSession::RunningActionInfo> Running;
+ for (auto& Info : AllRunning)
+ if (Info.QueueId == QueueId)
+ Running.push_back(Info);
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("running");
+ for (const auto& Info : Running)
+ {
+ Cbo.BeginObject();
+ Cbo << "lsn"sv << Info.Lsn;
+ Cbo << "queueId"sv << Info.QueueId;
+ Cbo << "actionId"sv << Info.ActionId;
+ if (Info.CpuUsagePercent >= 0.0f)
+ {
+ Cbo.AddFloat("cpuUsage"sv, Info.CpuUsagePercent);
+ }
+ if (Info.CpuSeconds > 0.0f)
+ {
+ Cbo.AddFloat("cpuSeconds"sv, Info.CpuSeconds);
+ }
+ Cbo.EndObject();
+ }
+ Cbo.EndArray();
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/jobs/{worker}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
+
+ if (QueueId == 0)
+ {
+ return;
+ }
+
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(2));
+ WorkerDesc Worker = m_ComputeService.GetWorkerDescriptor(WorkerId);
+
+ if (!Worker)
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ const auto QueryParams = Req.ServerRequest().GetQueryParams();
+ int RequestPriority = -1;
+
+ if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+ {
+ RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+ }
+
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+ std::vector<IoHash> NeedList;
+
+ if (!CheckAttachments(ActionObj, NeedList))
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save());
+ }
+
+ if (ComputeServiceSession::EnqueueResult Result =
+ m_ComputeService.EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("queue {}: action {} accepted (lsn {})", QueueId, ActionObj.GetHash(), Result.Lsn);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+ }
+ }
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage Action = HttpReq.ReadPayloadPackage();
+ CbObject ActionObj = Action.GetObject();
+
+ IngestStats Stats = IngestPackageAttachments(Action);
+
+ if (ComputeServiceSession::EnqueueResult Result =
+ m_ComputeService.EnqueueResolvedActionToQueue(QueueId, Worker, ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("queue {}: accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
+ QueueId,
+ ActionObj.GetHash(),
+ Result.Lsn,
+ zen::NiceBytes(Stats.Bytes),
+ Stats.Count,
+ zen::NiceBytes(Stats.NewBytes),
+ Stats.NewCount);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+ }
+ }
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/jobs",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
+
+ if (QueueId == 0)
+ {
+ return;
+ }
+
+ const auto QueryParams = Req.ServerRequest().GetQueryParams();
+ int RequestPriority = -1;
+
+ if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
+ {
+ RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
+ }
+
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ IoBuffer Payload = HttpReq.ReadPayload();
+ CbObject ActionObj = LoadCompactBinaryObject(Payload);
+
+ std::vector<IoHash> NeedList;
+
+ if (!CheckAttachments(ActionObj, NeedList))
+ {
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("need");
+
+ for (const IoHash& Hash : NeedList)
+ {
+ Cbo << Hash;
+ }
+
+ Cbo.EndArray();
+
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, Cbo.Save());
+ }
+
+ if (ComputeServiceSession::EnqueueResult Result =
+ m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("queue {}: action accepted (lsn {})", QueueId, Result.Lsn);
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+ }
+ }
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage Action = HttpReq.ReadPayloadPackage();
+ CbObject ActionObj = Action.GetObject();
+
+ IngestStats Stats = IngestPackageAttachments(Action);
+
+ if (ComputeServiceSession::EnqueueResult Result =
+ m_ComputeService.EnqueueActionToQueue(QueueId, ActionObj, RequestPriority))
+ {
+ ZEN_DEBUG("queue {}: accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
+ QueueId,
+ Result.Lsn,
+ zen::NiceBytes(Stats.Bytes),
+ Stats.Count,
+ zen::NiceBytes(Stats.NewBytes),
+ Stats.NewCount);
+
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
+ }
+ else
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
+ }
+ }
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kPost);
+
+ m_Router.RegisterRoute(
+ "queues/{queueref}/jobs/{lsn}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const int QueueId = ResolveQueueRef(HttpReq, Req.GetCapture(1));
+ const int ActionLsn = ParseInt<int>(Req.GetCapture(2)).value_or(0);
+
+ if (QueueId == 0)
+ {
+ return;
+ }
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ ZEN_UNUSED(QueueId);
+
+ CbPackage Output;
+ HttpResponseCode ResponseCode = m_ComputeService.GetActionResult(ActionLsn, Output);
+
+ if (ResponseCode == HttpResponseCode::OK)
+ {
+ HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+ else
+ {
+ HttpReq.WriteResponse(ResponseCode);
+ }
+
+ m_ComputeService.RetireActionResult(ActionLsn);
+ }
+ break;
+
+ case HttpVerb::kPost:
+ {
+ ZEN_UNUSED(QueueId);
+
+ auto Result = m_ComputeService.RescheduleAction(ActionLsn);
+
+ CbObjectWriter Cbo;
+ if (Result.Success)
+ {
+ Cbo << "lsn"sv << ActionLsn;
+ Cbo << "retry_count"sv << Result.RetryCount;
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+ }
+ else
+ {
+ Cbo << "error"sv << Result.Error;
+ HttpReq.WriteResponse(HttpResponseCode::Conflict, Cbo.Save());
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+ },
+ HttpVerb::kGet | HttpVerb::kPost);
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+HttpComputeService::HttpComputeService(CidStore& InCidStore,
+ IHttpStatsService& StatsService,
+ const std::filesystem::path& BaseDir,
+ int32_t MaxConcurrentActions)
+: m_Impl(std::make_unique<Impl>(this, InCidStore, StatsService, BaseDir, MaxConcurrentActions))
+{
+}
+
+HttpComputeService::~HttpComputeService()
+{
+ m_Impl->m_StatsService.UnregisterHandler("compute", *this);
+}
+
+void
+HttpComputeService::Shutdown()
+{
+ m_Impl->m_ComputeService.Shutdown();
+}
+
+ComputeServiceSession::ActionCounts
+HttpComputeService::GetActionCounts()
+{
+ return m_Impl->m_ComputeService.GetActionCounts();
+}
+
+const char*
+HttpComputeService::BaseUri() const
+{
+ return "/compute/";
+}
+
+void
+HttpComputeService::HandleRequest(HttpServerRequest& Request)
+{
+ ZEN_TRACE_CPU("HttpComputeService::HandleRequest");
+ metrics::OperationTiming::Scope $(m_Impl->m_HttpRequests);
+
+ if (m_Impl->m_Router.HandleRequest(Request) == false)
+ {
+ ZEN_WARN("No route found for {0}", Request.RelativeUri());
+ }
+}
+
+void
+HttpComputeService::HandleStatsRequest(HttpServerRequest& Request)
+{
+ CbObjectWriter Cbo;
+ m_Impl->m_ComputeService.EmitStats(Cbo);
+
+ Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+HttpComputeService::Impl::WriteQueueDescription(CbWriter& Cbo, int QueueId, const ComputeServiceSession::QueueStatus& Status)
+{
+ Cbo << "queue_id"sv << Status.QueueId;
+ Cbo << "active_count"sv << Status.ActiveCount;
+ Cbo << "completed_count"sv << Status.CompletedCount;
+ Cbo << "failed_count"sv << Status.FailedCount;
+ Cbo << "abandoned_count"sv << Status.AbandonedCount;
+ Cbo << "cancelled_count"sv << Status.CancelledCount;
+ Cbo << "state"sv << ToString(Status.State);
+ Cbo << "cancelled"sv << (Status.State == ComputeServiceSession::QueueState::Cancelled);
+ Cbo << "draining"sv << (Status.State == ComputeServiceSession::QueueState::Draining);
+ Cbo << "is_complete"sv << Status.IsComplete;
+
+ if (CbObject Meta = m_ComputeService.GetQueueMetadata(QueueId))
+ {
+ Cbo << "metadata"sv << Meta;
+ }
+
+ if (CbObject Cfg = m_ComputeService.GetQueueConfig(QueueId))
+ {
+ Cbo << "config"sv << Cfg;
+ }
+
+ {
+ RwLock::SharedLockScope $(m_RemoteQueueLock);
+ if (auto It = m_RemoteQueuesByQueueId.find(QueueId); It != m_RemoteQueuesByQueueId.end())
+ {
+ Cbo << "queue_token"sv << It->second->Token.ToString();
+ if (!It->second->ClientHostname.empty())
+ {
+ Cbo << "hostname"sv << It->second->ClientHostname;
+ }
+ }
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+int
+HttpComputeService::Impl::ResolveQueueToken(const Oid& Token)
+{
+ RwLock::SharedLockScope $(m_RemoteQueueLock);
+
+ auto It = m_RemoteQueuesByToken.find(Token);
+
+ if (It != m_RemoteQueuesByToken.end())
+ {
+ return It->second->QueueId;
+ }
+
+ return 0;
+}
+
+int
+HttpComputeService::Impl::ResolveQueueRef(HttpServerRequest& HttpReq, std::string_view Capture)
+{
+ if (OidMatcher(Capture))
+ {
+ // Remote OID token — accessible from any client
+ const Oid Token = Oid::FromHexString(Capture);
+ const int QueueId = ResolveQueueToken(Token);
+
+ if (QueueId == 0)
+ {
+ HttpReq.WriteResponse(HttpResponseCode::NotFound);
+ }
+
+ return QueueId;
+ }
+
+ // Local integer queue ID — restricted to local machine requests
+ if (!HttpReq.IsLocalMachineRequest())
+ {
+ HttpReq.WriteResponse(HttpResponseCode::Forbidden);
+ return 0;
+ }
+
+ return ParseInt<int>(Capture).value_or(0);
+}
+
+HttpComputeService::Impl::IngestStats
+HttpComputeService::Impl::IngestPackageAttachments(const CbPackage& Package)
+{
+ IngestStats Stats;
+
+ for (const CbAttachment& Attachment : Package.GetAttachments())
+ {
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer DataView = Attachment.AsCompressedBinary();
+
+ ZEN_UNUSED(DataHash);
+
+ const uint64_t CompressedSize = DataView.GetCompressedSize();
+
+ Stats.Bytes += CompressedSize;
+ ++Stats.Count;
+
+ const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+ if (InsertResult.New)
+ {
+ Stats.NewBytes += CompressedSize;
+ ++Stats.NewCount;
+ }
+ }
+
+ return Stats;
+}
+
+bool
+HttpComputeService::Impl::CheckAttachments(const CbObject& ActionObj, std::vector<IoHash>& NeedList)
+{
+ ActionObj.IterateAttachments([&](CbFieldView Field) {
+ const IoHash FileHash = Field.AsHash();
+
+ if (!m_CidStore.ContainsChunk(FileHash))
+ {
+ NeedList.push_back(FileHash);
+ }
+ });
+
+ return NeedList.empty();
+}
+
+void
+HttpComputeService::Impl::HandleWorkersGet(HttpServerRequest& HttpReq)
+{
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("workers"sv);
+ for (const IoHash& WorkerId : m_ComputeService.GetKnownWorkerIds())
+ {
+ Cbo << WorkerId;
+ }
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpComputeService::Impl::HandleWorkersAllGet(HttpServerRequest& HttpReq)
+{
+ std::vector<IoHash> WorkerIds = m_ComputeService.GetKnownWorkerIds();
+
+ CbObjectWriter Cbo;
+ Cbo.BeginArray("workers");
+
+ for (const IoHash& WorkerId : WorkerIds)
+ {
+ Cbo.BeginObject();
+ Cbo << "id" << WorkerId;
+ Cbo << "descriptor" << m_ComputeService.GetWorkerDescriptor(WorkerId).Descriptor.GetObject();
+ Cbo.EndObject();
+ }
+
+ Cbo.EndArray();
+ HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
+}
+
+void
+HttpComputeService::Impl::HandleWorkerRequest(HttpServerRequest& HttpReq, const IoHash& WorkerId)
+{
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ if (WorkerDesc Desc = m_ComputeService.GetWorkerDescriptor(WorkerId))
+ {
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject());
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+
+ case HttpVerb::kPost:
+ {
+ switch (HttpReq.RequestContentType())
+ {
+ case HttpContentType::kCbObject:
+ {
+ CbObject WorkerSpec = HttpReq.ReadPayloadObject();
+
+ HashKeySet ChunkSet;
+ WorkerSpec.IterateAttachments([&](CbFieldView Field) {
+ const IoHash Hash = Field.AsHash();
+ ChunkSet.AddHashToSet(Hash);
+ });
+
+ CbPackage WorkerPackage;
+ WorkerPackage.SetObject(WorkerSpec);
+
+ m_CidStore.FilterChunks(ChunkSet);
+
+ if (ChunkSet.IsEmpty())
+ {
+ ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
+ m_ComputeService.RegisterWorker(WorkerPackage);
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+ }
+
+ CbObjectWriter ResponseWriter;
+ ResponseWriter.BeginArray("need");
+ ChunkSet.IterateHashes([&](const IoHash& Hash) {
+ ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
+ ResponseWriter.AddHash(Hash);
+ });
+ ResponseWriter.EndArray();
+
+ ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
+ return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
+ }
+ break;
+
+ case HttpContentType::kCbPackage:
+ {
+ CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage();
+ CbObject WorkerSpec = WorkerSpecPackage.GetObject();
+
+ std::span<const CbAttachment> Attachments = WorkerSpecPackage.GetAttachments();
+
+ int AttachmentCount = 0;
+ int NewAttachmentCount = 0;
+ uint64_t TotalAttachmentBytes = 0;
+ uint64_t TotalNewBytes = 0;
+
+ for (const CbAttachment& Attachment : Attachments)
+ {
+ ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+ const IoHash DataHash = Attachment.GetHash();
+ CompressedBuffer Buffer = Attachment.AsCompressedBinary();
+
+ ZEN_UNUSED(DataHash);
+ TotalAttachmentBytes += Buffer.GetCompressedSize();
+ ++AttachmentCount;
+
+ const CidStore::InsertResult InsertResult =
+ m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+ if (InsertResult.New)
+ {
+ TotalNewBytes += Buffer.GetCompressedSize();
+ ++NewAttachmentCount;
+ }
+ }
+
+ ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
+ WorkerId,
+ zen::NiceBytes(TotalAttachmentBytes),
+ AttachmentCount,
+ zen::NiceBytes(TotalNewBytes),
+ NewAttachmentCount);
+
+ m_ComputeService.RegisterWorker(WorkerSpecPackage);
+ return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+ }
+ break;
+
+ default:
+ break;
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+}
+
+//////////////////////////////////////////////////////////////////////////
+
+void
+httpcomputeservice_forcelink()
+{
+}
+
+} // namespace zen::compute
+
+#endif // ZEN_WITH_COMPUTE_SERVICES