aboutsummaryrefslogtreecommitdiff
path: root/src/zencompute/httpfunctionservice.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zencompute/httpfunctionservice.cpp')
-rw-r--r--src/zencompute/httpfunctionservice.cpp709
1 files changed, 0 insertions, 709 deletions
diff --git a/src/zencompute/httpfunctionservice.cpp b/src/zencompute/httpfunctionservice.cpp
deleted file mode 100644
index 09a9684a7..000000000
--- a/src/zencompute/httpfunctionservice.cpp
+++ /dev/null
@@ -1,709 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "zencompute/httpfunctionservice.h"
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include "functionrunner.h"
-
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/compactbinarypackage.h>
-# include <zencore/compress.h>
-# include <zencore/except.h>
-# include <zencore/filesystem.h>
-# include <zencore/fmtutils.h>
-# include <zencore/iobuffer.h>
-# include <zencore/iohash.h>
-# include <zencore/system.h>
-# include <zenstore/cidstore.h>
-
-# include <span>
-
-using namespace std::literals;
-
-namespace zen::compute {
-
-constinit AsciiSet g_DecimalSet("0123456789");
-auto DecimalMatcher = [](std::string_view Str) { return AsciiSet::HasOnly(Str, g_DecimalSet); };
-
-constinit AsciiSet g_HexSet("0123456789abcdefABCDEF");
-auto IoHashMatcher = [](std::string_view Str) { return Str.size() == 40 && AsciiSet::HasOnly(Str, g_HexSet); };
-
-HttpFunctionService::HttpFunctionService(CidStore& InCidStore,
- IHttpStatsService& StatsService,
- [[maybe_unused]] const std::filesystem::path& BaseDir)
-: m_CidStore(InCidStore)
-, m_StatsService(StatsService)
-, m_Log(logging::Get("apply"))
-, m_BaseDir(BaseDir)
-, m_FunctionService(InCidStore)
-{
- m_FunctionService.AddLocalRunner(InCidStore, m_BaseDir / "local");
-
- m_StatsService.RegisterHandler("apply", *this);
-
- m_Router.AddMatcher("lsn", DecimalMatcher);
- m_Router.AddMatcher("worker", IoHashMatcher);
- m_Router.AddMatcher("action", IoHashMatcher);
-
- m_Router.RegisterRoute(
- "ready",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- if (m_FunctionService.IsHealthy())
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, HttpContentType::kText, "ok");
- }
-
- return HttpReq.WriteResponse(HttpResponseCode::ServiceUnavailable);
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "workers",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("workers"sv);
- for (const IoHash& WorkerId : m_FunctionService.GetKnownWorkerIds())
- {
- Cbo << WorkerId;
- }
- Cbo.EndArray();
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "workers/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- if (WorkerDesc Desc = m_FunctionService.GetWorkerDescriptor(WorkerId))
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor.GetObject());
- }
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
-
- case HttpVerb::kPost:
- {
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- CbObject WorkerSpec = HttpReq.ReadPayloadObject();
-
- // Determine which pieces are missing and need to be transmitted
-
- HashKeySet ChunkSet;
-
- WorkerSpec.IterateAttachments([&](CbFieldView Field) {
- const IoHash Hash = Field.AsHash();
- ChunkSet.AddHashToSet(Hash);
- });
-
- CbPackage WorkerPackage;
- WorkerPackage.SetObject(WorkerSpec);
-
- m_CidStore.FilterChunks(ChunkSet);
-
- if (ChunkSet.IsEmpty())
- {
- ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
- m_FunctionService.RegisterWorker(WorkerPackage);
- return HttpReq.WriteResponse(HttpResponseCode::NoContent);
- }
-
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("need");
-
- ChunkSet.IterateHashes([&](const IoHash& Hash) {
- ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
- ResponseWriter.AddHash(Hash);
- });
-
- ResponseWriter.EndArray();
-
- ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
- }
- break;
-
- case HttpContentType::kCbPackage:
- {
- CbPackage WorkerSpecPackage = HttpReq.ReadPayloadPackage();
- CbObject WorkerSpec = WorkerSpecPackage.GetObject();
-
- std::span<const CbAttachment> Attachments = WorkerSpecPackage.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer Buffer = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
- TotalAttachmentBytes += Buffer.GetCompressedSize();
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
-
- if (InsertResult.New)
- {
- TotalNewBytes += Buffer.GetCompressedSize();
- ++NewAttachmentCount;
- }
- }
-
- ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
- WorkerId,
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- m_FunctionService.RegisterWorker(WorkerSpecPackage);
-
- return HttpReq.WriteResponse(HttpResponseCode::NoContent);
- }
- break;
-
- default:
- break;
- }
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/completed",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- CbObjectWriter Cbo;
- m_FunctionService.GetCompleted(Cbo);
-
- SystemMetrics Sm = GetSystemMetricsForReporting();
- Cbo.BeginObject("metrics");
- Describe(Sm, Cbo);
- Cbo.EndObject();
-
- HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "jobs/history",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const auto QueryParams = HttpReq.GetQueryParams();
-
- int QueryLimit = 50;
-
- if (auto LimitParam = QueryParams.GetValue("limit"); LimitParam.empty() == false)
- {
- QueryLimit = ParseInt<int>(LimitParam).value_or(50);
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("history");
- for (const auto& Entry : m_FunctionService.GetActionHistory(QueryLimit))
- {
- Cbo.BeginObject();
- Cbo << "lsn"sv << Entry.Lsn;
- Cbo << "actionId"sv << Entry.ActionId;
- Cbo << "workerId"sv << Entry.WorkerId;
- Cbo << "succeeded"sv << Entry.Succeeded;
- Cbo << "actionDescriptor"sv << Entry.ActionDescriptor;
-
- for (const auto& Timestamp : Entry.Timestamps)
- {
- Cbo.AddInteger(
- fmt::format("time_{}"sv, RunnerAction::ToString(static_cast<RunnerAction::State>(&Timestamp - Entry.Timestamps))),
- Timestamp);
- }
- Cbo.EndObject();
- }
- Cbo.EndArray();
-
- HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "jobs/{lsn}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const int ActionLsn = std::stoi(std::string{Req.GetCapture(1)});
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- {
- CbPackage Output;
- HttpResponseCode ResponseCode = m_FunctionService.GetActionResult(ActionLsn, Output);
-
- if (ResponseCode == HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
-
- return HttpReq.WriteResponse(ResponseCode);
- }
- break;
-
- case HttpVerb::kPost:
- {
- // Add support for cancellation, priority changes
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/{worker}/{action}", // This route is inefficient, and is only here for backwards compatibility. The preferred path is the
- // one which uses the scheduled action lsn for lookups
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
-
- CbPackage Output;
- if (HttpResponseCode ResponseCode = m_FunctionService.FindActionResult(ActionId, /* out */ Output);
- ResponseCode != HttpResponseCode::OK)
- {
- ZEN_TRACE("jobs/{}/{}: {}", Req.GetCapture(1), Req.GetCapture(2), ToString(ResponseCode))
-
- if (ResponseCode == HttpResponseCode::NotFound)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
-
- return HttpReq.WriteResponse(ResponseCode);
- }
-
- ZEN_DEBUG("jobs/{}/{}: OK", Req.GetCapture(1), Req.GetCapture(2))
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "jobs/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- WorkerDesc Worker = m_FunctionService.GetWorkerDescriptor(WorkerId);
-
- if (!Worker)
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
-
- const auto QueryParams = Req.ServerRequest().GetQueryParams();
-
- int RequestPriority = -1;
-
- if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
- {
- RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
- }
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- // TODO: return status of all pending or executing jobs
- break;
-
- case HttpVerb::kPost:
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- // This operation takes the proposed job spec and identifies which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
-
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject ActionObj = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- ActionObj.IterateAttachments([&](CbFieldView Field) {
- const IoHash FileHash = Field.AsHash();
-
- if (!m_CidStore.ContainsChunk(FileHash))
- {
- NeedList.push_back(FileHash);
- }
- });
-
- if (NeedList.empty())
- {
- // We already have everything, enqueue the action for execution
-
- if (FunctionServiceSession::EnqueueResult Result =
- m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
- {
- ZEN_DEBUG("action {} accepted (lsn {})", ActionObj.GetHash(), Result.Lsn);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
-
- return;
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
- }
- break;
-
- case HttpContentType::kCbPackage:
- {
- CbPackage Action = HttpReq.ReadPayloadPackage();
- CbObject ActionObj = Action.GetObject();
-
- std::span<const CbAttachment> Attachments = Action.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
-
- const uint64_t CompressedSize = DataView.GetCompressedSize();
-
- TotalAttachmentBytes += CompressedSize;
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
-
- if (InsertResult.New)
- {
- TotalNewBytes += CompressedSize;
- ++NewAttachmentCount;
- }
- }
-
- if (FunctionServiceSession::EnqueueResult Result =
- m_FunctionService.EnqueueResolvedAction(Worker, ActionObj, RequestPriority))
- {
- ZEN_DEBUG("accepted action {} (lsn {}): {} in {} attachments. {} new ({} attachments)",
- ActionObj.GetHash(),
- Result.Lsn,
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
-
- return;
- }
- break;
-
- default:
- break;
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const auto QueryParams = HttpReq.GetQueryParams();
-
- int RequestPriority = -1;
-
- if (auto PriorityParam = QueryParams.GetValue("priority"); PriorityParam.empty() == false)
- {
- RequestPriority = ParseInt<int>(PriorityParam).value_or(-1);
- }
-
- // Resolve worker
-
- //
-
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- // This operation takes the proposed job spec and identifies which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
-
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject ActionObj = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- ActionObj.IterateAttachments([&](CbFieldView Field) {
- const IoHash FileHash = Field.AsHash();
-
- if (!m_CidStore.ContainsChunk(FileHash))
- {
- NeedList.push_back(FileHash);
- }
- });
-
- if (NeedList.empty())
- {
- // We already have everything, enqueue the action for execution
-
- if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority))
- {
- ZEN_DEBUG("action accepted (lsn {})", Result.Lsn);
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
- else
- {
- // Could not resolve?
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
- }
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
- }
-
- case HttpContentType::kCbPackage:
- {
- CbPackage Action = HttpReq.ReadPayloadPackage();
- CbObject ActionObj = Action.GetObject();
-
- std::span<const CbAttachment> Attachments = Action.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
-
- const uint64_t CompressedSize = DataView.GetCompressedSize();
-
- TotalAttachmentBytes += CompressedSize;
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
-
- if (InsertResult.New)
- {
- TotalNewBytes += CompressedSize;
- ++NewAttachmentCount;
- }
- }
-
- if (FunctionServiceSession::EnqueueResult Result = m_FunctionService.EnqueueAction(ActionObj, RequestPriority))
- {
- ZEN_DEBUG("accepted action (lsn {}): {} in {} attachments. {} new ({} attachments)",
- Result.Lsn,
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- HttpReq.WriteResponse(HttpResponseCode::OK, Result.ResponseMessage);
- }
- else
- {
- // Could not resolve?
- return HttpReq.WriteResponse(HttpResponseCode::FailedDependency, Result.ResponseMessage);
- }
- }
- return;
- }
- },
- HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "workers/all",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- std::vector<IoHash> WorkerIds = m_FunctionService.GetKnownWorkerIds();
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("workers");
-
- for (const IoHash& WorkerId : WorkerIds)
- {
- Cbo.BeginObject();
-
- Cbo << "id" << WorkerId;
-
- const auto& Descriptor = m_FunctionService.GetWorkerDescriptor(WorkerId);
-
- Cbo << "descriptor" << Descriptor.Descriptor.GetObject();
-
- Cbo.EndObject();
- }
-
- Cbo.EndArray();
-
- HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "sysinfo",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- SystemMetrics Sm = GetSystemMetricsForReporting();
-
- CbObjectWriter Cbo;
- Describe(Sm, Cbo);
-
- Cbo << "cpu_usage" << Sm.CpuUsagePercent;
- Cbo << "memory_total" << Sm.SystemMemoryMiB * 1024 * 1024;
- Cbo << "memory_used" << (Sm.SystemMemoryMiB - Sm.AvailSystemMemoryMiB) * 1024 * 1024;
- Cbo << "disk_used" << 100 * 1024;
- Cbo << "disk_total" << 100 * 1024 * 1024;
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Cbo.Save());
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "record/start",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- m_FunctionService.StartRecording(m_CidStore, m_BaseDir / "recording");
-
- return HttpReq.WriteResponse(HttpResponseCode::OK);
- },
- HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "record/stop",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- m_FunctionService.StopRecording();
-
- return HttpReq.WriteResponse(HttpResponseCode::OK);
- },
- HttpVerb::kPost);
-}
-
-HttpFunctionService::~HttpFunctionService()
-{
- m_StatsService.UnregisterHandler("apply", *this);
-}
-
-void
-HttpFunctionService::Shutdown()
-{
- m_FunctionService.Shutdown();
-}
-
-const char*
-HttpFunctionService::BaseUri() const
-{
- return "/apply/";
-}
-
-void
-HttpFunctionService::HandleRequest(HttpServerRequest& Request)
-{
- metrics::OperationTiming::Scope $(m_HttpRequests);
-
- if (m_Router.HandleRequest(Request) == false)
- {
- ZEN_WARN("No route found for {0}", Request.RelativeUri());
- }
-}
-
-void
-HttpFunctionService::HandleStatsRequest(HttpServerRequest& Request)
-{
- CbObjectWriter Cbo;
- m_FunctionService.EmitStats(Cbo);
-
- Request.WriteResponse(HttpResponseCode::OK, Cbo.Save());
-}
-
-//////////////////////////////////////////////////////////////////////////
-
-void
-httpfunction_forcelink()
-{
-}
-
-} // namespace zen::compute
-
-#endif // ZEN_WITH_COMPUTE_SERVICES