// Copyright Epic Games, Inc. All Rights Reserved. #include "zencompute/httpfunctionservice.h" #if ZEN_WITH_COMPUTE_SERVICES # include "functionrunner.h" # include # include # include # include # include # include # include # include # include # include # include # include using namespace std::literals; namespace zen::compute { constinit AsciiSet g_DecimalSet("0123456789"); auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); }; constinit AsciiSet g_HexSet("0123456789abcdefABCDEF"); auto IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); }; HttpFunctionService::HttpFunctionService(CidStore& InCidStore, IHttpStatsService& StatsService, [[maybe_unused]] const std::filesystem::path& BaseDir) : m_CidStore(InCidStore) , m_StatsService(StatsService) , m_Log(logging::Get("apply")) , m_BaseDir(BaseDir) , m_FunctionService(InCidStore) { m_FunctionService.AddLocalRunner(InCidStore, m_BaseDir / "local"); m_StatsService.RegisterHandler("apply", *this); m_Router.AddMatcher("lsn", DecimalMatcher); m_Router.AddMatcher("worker", IoHashMatcher); m_Router.AddMatcher("action", IoHashMatcher); m_Router.RegisterRoute( "ready", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); if (m_FunctionService.IsHealthy()) { return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok"); } return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable); }, HttpVerb::kGet); m_Router.RegisterRoute( "workers", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); CbObjectWriter Cbo; Cbo.BeginArray("workers"sv); for (const IoHash& WorkerId : m_FunctionService.GetKnownWorkerIds()) { Cbo << WorkerId; } Cbo.EndArray(); return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "workers/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: if (WorkerDesc Desc = m_FunctionService.GetWorkerDescriptor(WorkerId)) { return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject()); } return HttpReq.WriteResponse(HttpResponseCode::NotFound); case HttpVerb::kPost: { switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { CbObject WorkerSpec = HttpReq.ReadPayloadObject(); // Determine which pieces are missing and need to be transmitted HashKeySet ChunkSet; WorkerSpec.IterateAttachments([&](CbFieldView Field) { const IoHash Hash = Field.AsHash(); ChunkSet.AddHashToSet(Hash); }); CbPackage WorkerPackage; WorkerPackage.SetObject(WorkerSpec); m_CidStore.FilterChunks(ChunkSet); if (ChunkSet.IsEmpty()) { ZEN_DEBUG("worker {}: all attachments already available", WorkerId); m_FunctionService.RegisterWorker(WorkerPackage); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } CbObjectWriter ResponseWriter; ResponseWriter.BeginArray("need"); ChunkSet.IterateHashes([&](const IoHash& Hash) { ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); ResponseWriter.AddHash(Hash); }); ResponseWriter.EndArray(); ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); } break; case HttpContentType::kCbPackage: { CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage(); CbObject WorkerSpec = WorkerSpecPackage.GetObject(); std::span Attachments = WorkerSpecPackage.GetAttachments(); int AttachmentCount = 0; int NewAttachmentCount = 0; uint64_t TotalAttachmentBytes = 0; uint64_t TotalNewBytes = 0; for (const CbAttachment& Attachment : Attachments) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer Buffer = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); TotalAttachmentBytes += Buffer.GetCompressedSize(); ++AttachmentCount; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { TotalNewBytes += Buffer.GetCompressedSize(); ++NewAttachmentCount; } } ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", WorkerId, zen::NiceBytes(TotalAttachmentBytes), AttachmentCount, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); m_FunctionService.RegisterWorker(WorkerSpecPackage); return HttpReq.WriteResponse(HttpResponseCode::NoContent); } break; default: break; } } break; default: break; } }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "jobs/completed", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); CbObjectWriter Cbo; m_FunctionService.GetCompleted(Cbo); SystemMetrics Sm = GetSystemMetricsForReporting(); Cbo.BeginObject("metrics"); Describe(Sm, Cbo); Cbo.EndObject(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "jobs/history", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto QueryParams = HttpReq.GetQueryParams(); int QueryLimit = 50; if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false) { QueryLimit = ParseInt(LimitParam).value_or(50); } CbObjectWriter Cbo; Cbo.BeginArray("history"); for (const auto& Entry : m_FunctionService.GetActionHistory(QueryLimit)) { Cbo.BeginObject(); Cbo << "lsn"sv << Entry.Lsn; Cbo << "actionId"sv << Entry.ActionId; Cbo << "workerId"sv << Entry.WorkerId; Cbo << "succeeded"sv << Entry.Succeeded; Cbo << "actionDescriptor"sv << Entry.ActionDescriptor; for (const auto& Timestamp : Entry.Timestamps) { Cbo.AddInteger( fmt::format("time_{}"sv, RunnerAction::ToString(static_cast(&Timestamp - Entry.Timestamps))), Timestamp); } Cbo.EndObject(); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "jobs/{lsn}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const int ActionLsn = std::stoi(std::string{Req.GetCapture(1)}); switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: { CbPackage Output; HttpResponseCode ResponseCode = m_FunctionService.GetActionResult(ActionLsn, Output); if (ResponseCode == HttpResponseCode::OK) { return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } return HttpReq.WriteResponse(ResponseCode); } break; case HttpVerb::kPost: { // Add support for cancellation, priority changes } break; default: break; } }, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( "jobs/{worker}/{action}", // This route is inefficient, and is only here for backwards compatibility. The preferred path is the // one which uses the scheduled action lsn for lookups [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); CbPackage Output; if (HttpResponseCode ResponseCode = m_FunctionService.FindActionResult(ActionId, /* out */ Output); ResponseCode != HttpResponseCode::OK) { ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode)) if (ResponseCode == HttpResponseCode::NotFound) { return HttpReq.WriteResponse(ResponseCode); } return HttpReq.WriteResponse(ResponseCode); } ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2)) return HttpReq.WriteResponse(HttpResponseCode::OK, Output); }, HttpVerb::kGet); m_Router.RegisterRoute( "jobs/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); WorkerDesc Worker = m_FunctionService.GetWorkerDescriptor(WorkerId); if (!Worker) { return HttpReq.WriteResponse(HttpResponseCode::NotFound); } const auto QueryParams = Req.ServerRequest().GetQueryParams(); int RequestPriority = -1; if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false) { RequestPriority = ParseInt(PriorityParam).value_or(-1); } switch (HttpReq.RequestVerb()) { case HttpVerb::kGet: // TODO: return status of all pending or executing jobs break; case HttpVerb::kPost: switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { // This operation takes the proposed job spec and identifies which // chunks are not present on this server. This list is then returned in // the "need" list in the response IoBuffer Payload = HttpReq.ReadPayload(); CbObject ActionObj = LoadCompactBinaryObject(Payload); std::vector NeedList; ActionObj.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); if (!m_CidStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } }); if (NeedList.empty()) { // We already have everything, enqueue the action for execution if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) { ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn); HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); } return; } CbObjectWriter Cbo; Cbo.BeginArray("need"); for (const IoHash& Hash : NeedList) { Cbo << Hash; } Cbo.EndArray(); CbObject Response = Cbo.Save(); return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); } break; case HttpContentType::kCbPackage: { CbPackage Action = HttpReq.ReadPayloadPackage(); CbObject ActionObj = Action.GetObject(); std::span Attachments = Action.GetAttachments(); int AttachmentCount = 0; int NewAttachmentCount = 0; uint64_t TotalAttachmentBytes = 0; uint64_t TotalNewBytes = 0; for (const CbAttachment& Attachment : Attachments) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer DataView = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); const uint64_t CompressedSize = DataView.GetCompressedSize(); TotalAttachmentBytes += CompressedSize; ++AttachmentCount; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { TotalNewBytes += CompressedSize; ++NewAttachmentCount; } } if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) { ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)", ActionObj.GetHash(), Result.Lsn, zen::NiceBytes(TotalAttachmentBytes), AttachmentCount, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); } return; } break; default: break; } break; default: break; } }, HttpVerb::kPost); m_Router.RegisterRoute( "jobs", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); const auto QueryParams = HttpReq.GetQueryParams(); int RequestPriority = -1; if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false) { RequestPriority = ParseInt(PriorityParam).value_or(-1); } // Resolve worker // switch (HttpReq.RequestContentType()) { case HttpContentType::kCbObject: { // This operation takes the proposed job spec and identifies which // chunks are not present on this server. This list is then returned in // the "need" list in the response IoBuffer Payload = HttpReq.ReadPayload(); CbObject ActionObj = LoadCompactBinaryObject(Payload); std::vector NeedList; ActionObj.IterateAttachments([&](CbFieldView Field) { const IoHash FileHash = Field.AsHash(); if (!m_CidStore.ContainsChunk(FileHash)) { NeedList.push_back(FileHash); } }); if (NeedList.empty()) { // We already have everything, enqueue the action for execution if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority)) { ZEN_DEBUG("action accepted (lsn {})", Result.Lsn); return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); } else { // Could not resolve? return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); } } CbObjectWriter Cbo; Cbo.BeginArray("need"); for (const IoHash& Hash : NeedList) { Cbo << Hash; } Cbo.EndArray(); CbObject Response = Cbo.Save(); return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); } case HttpContentType::kCbPackage: { CbPackage Action = HttpReq.ReadPayloadPackage(); CbObject ActionObj = Action.GetObject(); std::span Attachments = Action.GetAttachments(); int AttachmentCount = 0; int NewAttachmentCount = 0; uint64_t TotalAttachmentBytes = 0; uint64_t TotalNewBytes = 0; for (const CbAttachment& Attachment : Attachments) { ZEN_ASSERT(Attachment.IsCompressedBinary()); const IoHash DataHash = Attachment.GetHash(); CompressedBuffer DataView = Attachment.AsCompressedBinary(); ZEN_UNUSED(DataHash); const uint64_t CompressedSize = DataView.GetCompressedSize(); TotalAttachmentBytes += CompressedSize; ++AttachmentCount; const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); if (InsertResult.New) { TotalNewBytes += CompressedSize; ++NewAttachmentCount; } } if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority)) { ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)", Result.Lsn, zen::NiceBytes(TotalAttachmentBytes), AttachmentCount, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); } else { // Could not resolve? return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); } } return; } }, HttpVerb::kPost); m_Router.RegisterRoute( "workers/all", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); std::vector WorkerIds = m_FunctionService.GetKnownWorkerIds(); CbObjectWriter Cbo; Cbo.BeginArray("workers"); for (const IoHash& WorkerId : WorkerIds) { Cbo.BeginObject(); Cbo << "id" << WorkerId; const auto& Descriptor = m_FunctionService.GetWorkerDescriptor(WorkerId); Cbo << "descriptor" << Descriptor.Descriptor.GetObject(); Cbo.EndObject(); } Cbo.EndArray(); HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "sysinfo", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); SystemMetrics Sm = GetSystemMetricsForReporting(); CbObjectWriter Cbo; Describe(Sm, Cbo); Cbo << "cpu_usage" << Sm.CpuUsagePercent; Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024; Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024; Cbo << "disk_used" << 100 * 1024; Cbo << "disk_total" << 100 * 1024 * 1024; return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); }, HttpVerb::kGet); m_Router.RegisterRoute( "record/start", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); m_FunctionService.StartRecording(m_CidStore, m_BaseDir / "recording"); return HttpReq.WriteResponse(HttpResponseCode::OK); }, HttpVerb::kPost); m_Router.RegisterRoute( "record/stop", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); m_FunctionService.StopRecording(); return HttpReq.WriteResponse(HttpResponseCode::OK); }, HttpVerb::kPost); } HttpFunctionService::~HttpFunctionService() { m_StatsService.UnregisterHandler("apply", *this); } void HttpFunctionService::Shutdown() { m_FunctionService.Shutdown(); } const char* HttpFunctionService::BaseUri() const { return "/apply/"; } void HttpFunctionService::HandleRequest(HttpServerRequest& Request) { metrics::OperationTiming::Scope $(m_HttpRequests); if (m_Router.HandleRequest(Request) == false) { ZEN_WARN("No route found for {0}", Request.RelativeUri()); } } void HttpFunctionService::HandleStatsRequest(HttpServerRequest& Request) { CbObjectWriter Cbo; m_FunctionService.EmitStats(Cbo); Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); } ////////////////////////////////////////////////////////////////////////// void httpfunction_forcelink() { } } // namespace zen::compute #endif // ZEN_WITH_COMPUTE_SERVICES