diff options
Diffstat (limited to 'src/zencompute/httpfunctionservice.cpp')
| -rw-r--r-- | src/zencompute/httpfunctionservice.cpp | 709 |
1 files changed, 0 insertions, 709 deletions
diff --git a/src/zencompute/httpfunctionservice.cpp b/src/zencompute/httpfunctionservice.cpp deleted file mode 100644 index 09a9684a7..000000000 --- a/src/zencompute/httpfunctionservice.cpp +++ /dev/null @@ -1,709 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "zencompute/httpfunctionservice.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include "functionrunner.h" - -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinarypackage.h> -# include <zencore/compress.h> -# include <zencore/except.h> -# include <zencore/filesystem.h> -# include <zencore/fmtutils.h> -# include <zencore/iobuffer.h> -# include <zencore/iohash.h> -# include <zencore/system.h> -# include <zenstore/cidstore.h> - -# include <span> - -using namespace std::literals; - -namespace zen::compute { - -constinit AsciiSet g_DecimalSet("0123456789"); -auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); }; - -constinit AsciiSet g_HexSet("0123456789abcdefABCDEF"); -auto IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); }; - -HttpFunctionService::HttpFunctionService(CidStore& InCidStore, - IHttpStatsService& StatsService, - [[maybe_unused]] const std::filesystem::path& BaseDir) -: m_CidStore(InCidStore) -, m_StatsService(StatsService) -, m_Log(logging::Get("apply")) -, m_BaseDir(BaseDir) -, m_FunctionService(InCidStore) -{ - m_FunctionService.AddLocalRunner(InCidStore, m_BaseDir / "local"); - - m_StatsService.RegisterHandler("apply", *this); - - m_Router.AddMatcher("lsn", DecimalMatcher); - m_Router.AddMatcher("worker", IoHashMatcher); - m_Router.AddMatcher("action", IoHashMatcher); - - m_Router.RegisterRoute( - "ready", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - if (m_FunctionService.IsHealthy()) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok"); - } - - return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "workers", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - CbObjectWriter Cbo; - Cbo.BeginArray("workers"sv); - for (const IoHash& WorkerId : m_FunctionService.GetKnownWorkerIds()) - { - Cbo << WorkerId; - } - Cbo.EndArray(); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "workers/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - if (WorkerDesc Desc = m_FunctionService.GetWorkerDescriptor(WorkerId)) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject()); - } - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - - case HttpVerb::kPost: - { - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - CbObject WorkerSpec = HttpReq.ReadPayloadObject(); - - // Determine which pieces are missing and need to be transmitted - - HashKeySet ChunkSet; - - WorkerSpec.IterateAttachments([&](CbFieldView Field) { - const IoHash Hash = Field.AsHash(); - ChunkSet.AddHashToSet(Hash); - }); - - CbPackage WorkerPackage; - WorkerPackage.SetObject(WorkerSpec); - - m_CidStore.FilterChunks(ChunkSet); - - if (ChunkSet.IsEmpty()) - { - ZEN_DEBUG("worker {}: all attachments already available", WorkerId); - m_FunctionService.RegisterWorker(WorkerPackage); - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("need"); - - ChunkSet.IterateHashes([&](const IoHash& Hash) { - ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); - ResponseWriter.AddHash(Hash); - }); - - ResponseWriter.EndArray(); - - ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage(); - CbObject WorkerSpec = WorkerSpecPackage.GetObject(); - - std::span<const CbAttachment> Attachments = WorkerSpecPackage.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer Buffer = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - TotalAttachmentBytes += Buffer.GetCompressedSize(); - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); - - if (InsertResult.New) - { - TotalNewBytes += Buffer.GetCompressedSize(); - ++NewAttachmentCount; - } - } - - ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", - WorkerId, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - m_FunctionService.RegisterWorker(WorkerSpecPackage); - - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - break; - - default: - break; - } - } - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/completed", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - CbObjectWriter Cbo; - m_FunctionService.GetCompleted(Cbo); - - SystemMetrics Sm = GetSystemMetricsForReporting(); - Cbo.BeginObject("metrics"); - Describe(Sm, Cbo); - Cbo.EndObject(); - - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "jobs/history", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const auto QueryParams = HttpReq.GetQueryParams(); - - int QueryLimit = 50; - - if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false) - { - QueryLimit = ParseInt<int>(LimitParam).value_or(50); - } - - CbObjectWriter Cbo; - Cbo.BeginArray("history"); - for (const auto& Entry : m_FunctionService.GetActionHistory(QueryLimit)) - { - Cbo.BeginObject(); - Cbo << "lsn"sv << Entry.Lsn; - Cbo << "actionId"sv << Entry.ActionId; - Cbo << "workerId"sv << Entry.WorkerId; - Cbo << "succeeded"sv << Entry.Succeeded; - Cbo << "actionDescriptor"sv << Entry.ActionDescriptor; - - for (const auto& Timestamp : Entry.Timestamps) - { - Cbo.AddInteger( - fmt::format("time_{}"sv, RunnerAction::ToString(static_cast<RunnerAction::State>(&Timestamp - Entry.Timestamps))), - Timestamp); - } - Cbo.EndObject(); - } - Cbo.EndArray(); - - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "jobs/{lsn}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const int ActionLsn = std::stoi(std::string{Req.GetCapture(1)}); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - CbPackage Output; - HttpResponseCode ResponseCode = m_FunctionService.GetActionResult(ActionLsn, Output); - - if (ResponseCode == HttpResponseCode::OK) - { - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - - return HttpReq.WriteResponse(ResponseCode); - } - break; - - case HttpVerb::kPost: - { - // Add support for cancellation, priority changes - } - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{worker}/{action}", // This route is inefficient, and is only here for backwards compatibility. The preferred path is the - // one which uses the scheduled action lsn for lookups - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); - - CbPackage Output; - if (HttpResponseCode ResponseCode = m_FunctionService.FindActionResult(ActionId, /* out */ Output); - ResponseCode != HttpResponseCode::OK) - { - ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode)) - - if (ResponseCode == HttpResponseCode::NotFound) - { - return HttpReq.WriteResponse(ResponseCode); - } - - return HttpReq.WriteResponse(ResponseCode); - } - - ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2)) - - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "jobs/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - WorkerDesc Worker = m_FunctionService.GetWorkerDescriptor(WorkerId); - - if (!Worker) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - - const auto QueryParams = Req.ServerRequest().GetQueryParams(); - - int RequestPriority = -1; - - if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false) - { - RequestPriority = ParseInt<int>(PriorityParam).value_or(-1); - } - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - // TODO: return status of all pending or executing jobs - break; - - case HttpVerb::kPost: - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject ActionObj = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - ActionObj.IterateAttachments([&](CbFieldView Field) { - const IoHash FileHash = Field.AsHash(); - - if (!m_CidStore.ContainsChunk(FileHash)) - { - NeedList.push_back(FileHash); - } - }); - - if (NeedList.empty()) - { - // We already have everything, enqueue the action for execution - - if (FunctionServiceSession::EnqueueResult Result = - m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) - { - ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn); - - HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - - return; - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - std::span<const CbAttachment> Attachments = Action.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - - const uint64_t CompressedSize = DataView.GetCompressedSize(); - - TotalAttachmentBytes += CompressedSize; - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); - - if (InsertResult.New) - { - TotalNewBytes += CompressedSize; - ++NewAttachmentCount; - } - } - - if (FunctionServiceSession::EnqueueResult Result = - m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority)) - { - ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)", - ActionObj.GetHash(), - Result.Lsn, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - - return; - } - break; - - default: - break; - } - break; - - default: - break; - } - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const auto QueryParams = HttpReq.GetQueryParams(); - - int RequestPriority = -1; - - if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false) - { - RequestPriority = ParseInt<int>(PriorityParam).value_or(-1); - } - - // Resolve worker - - // - - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject ActionObj = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - ActionObj.IterateAttachments([&](CbFieldView Field) { - const IoHash FileHash = Field.AsHash(); - - if (!m_CidStore.ContainsChunk(FileHash)) - { - NeedList.push_back(FileHash); - } - }); - - if (NeedList.empty()) - { - // We already have everything, enqueue the action for execution - - if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority)) - { - ZEN_DEBUG("action accepted (lsn {})", Result.Lsn); - - return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - else - { - // Could not resolve? - return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); - } - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); - } - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - std::span<const CbAttachment> Attachments = Action.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - - const uint64_t CompressedSize = DataView.GetCompressedSize(); - - TotalAttachmentBytes += CompressedSize; - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); - - if (InsertResult.New) - { - TotalNewBytes += CompressedSize; - ++NewAttachmentCount; - } - } - - if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority)) - { - ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)", - Result.Lsn, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage); - } - else - { - // Could not resolve? - return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage); - } - } - return; - } - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "workers/all", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - std::vector<IoHash> WorkerIds = m_FunctionService.GetKnownWorkerIds(); - - CbObjectWriter Cbo; - Cbo.BeginArray("workers"); - - for (const IoHash& WorkerId : WorkerIds) - { - Cbo.BeginObject(); - - Cbo << "id" << WorkerId; - - const auto& Descriptor = m_FunctionService.GetWorkerDescriptor(WorkerId); - - Cbo << "descriptor" << Descriptor.Descriptor.GetObject(); - - Cbo.EndObject(); - } - - Cbo.EndArray(); - - HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "sysinfo", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - SystemMetrics Sm = GetSystemMetricsForReporting(); - - CbObjectWriter Cbo; - Describe(Sm, Cbo); - - Cbo << "cpu_usage" << Sm.CpuUsagePercent; - Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024; - Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024; - Cbo << "disk_used" << 100 * 1024; - Cbo << "disk_total" << 100 * 1024 * 1024; - - return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save()); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "record/start", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - m_FunctionService.StartRecording(m_CidStore, m_BaseDir / "recording"); - - return HttpReq.WriteResponse(HttpResponseCode::OK); - }, - HttpVerb::kPost); - - m_Router.RegisterRoute( - "record/stop", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - m_FunctionService.StopRecording(); - - return HttpReq.WriteResponse(HttpResponseCode::OK); - }, - HttpVerb::kPost); -} - -HttpFunctionService::~HttpFunctionService() -{ - m_StatsService.UnregisterHandler("apply", *this); -} - -void -HttpFunctionService::Shutdown() -{ - m_FunctionService.Shutdown(); -} - -const char* -HttpFunctionService::BaseUri() const -{ - return "/apply/"; -} - -void -HttpFunctionService::HandleRequest(HttpServerRequest& Request) -{ - metrics::OperationTiming::Scope $(m_HttpRequests); - - if (m_Router.HandleRequest(Request) == false) - { - ZEN_WARN("No route found for {0}", Request.RelativeUri()); - } -} - -void -HttpFunctionService::HandleStatsRequest(HttpServerRequest& Request) -{ - CbObjectWriter Cbo; - m_FunctionService.EmitStats(Cbo); - - Request.WriteResponse(HttpResponseCode::OK, Cbo.Save()); -} - -////////////////////////////////////////////////////////////////////////// - -void -httpfunction_forcelink() -{ -} - -} // namespace zen::compute - -#endif // ZEN_WITH_COMPUTE_SERVICES |