about | summary | refs | log | tree | commit | diff
path: root/src/zenserver/compute/function.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/compute/function.cpp')
-rw-r--r-- src/zenserver/compute/function.cpp 629
1 files changed, 629 insertions, 0 deletions
diff --git a/src/zenserver/compute/function.cpp b/src/zenserver/compute/function.cpp
new file mode 100644
index 000000000..493e2666e
--- /dev/null
+++ b/src/zenserver/compute/function.cpp
@@ -0,0 +1,629 @@
+// Copyright Epic Games, Inc. All Rights Reserved.
+
+#include "function.h"
+
+#if ZEN_WITH_COMPUTE_SERVICES
+
+# include <upstream/jupiter.h>
+# include <upstream/upstreamapply.h>
+# include <upstream/upstreamcache.h>
+# include <zencore/compactbinary.h>
+# include <zencore/compactbinarybuilder.h>
+# include <zencore/compactbinarypackage.h>
+# include <zencore/compress.h>
+# include <zencore/except.h>
+# include <zencore/filesystem.h>
+# include <zencore/fmtutils.h>
+# include <zencore/iobuffer.h>
+# include <zencore/iohash.h>
+# include <zencore/scopeguard.h>
+# include <zenstore/cidstore.h>
+
+# include <span>
+
+using namespace std::literals;
+
+namespace zen {
+
+namespace {
+
+    // Totals collected while storing the attachments of an incoming package.
+    struct FunctionAttachmentStats
+    {
+        int      AttachmentCount      = 0;  // number of attachments seen
+        int      NewAttachmentCount   = 0;  // number of attachments the store reported as new
+        uint64_t TotalAttachmentBytes = 0;  // summed compressed size of all attachments
+        uint64_t TotalNewBytes        = 0;  // summed compressed size of the new attachments
+    };
+
+    // Adds every attachment to Store (keyed by the attachment hash) and returns the totals.
+    // Every attachment is asserted to be a compressed binary.
+    FunctionAttachmentStats StoreFunctionAttachments(CidStore& Store, std::span<const CbAttachment> Attachments)
+    {
+        FunctionAttachmentStats Stats;
+
+        for (const CbAttachment& Attachment : Attachments)
+        {
+            ZEN_ASSERT(Attachment.IsCompressedBinary());
+
+            const IoHash     DataHash       = Attachment.GetHash();
+            CompressedBuffer Buffer         = Attachment.AsCompressedBinary();
+            const uint64_t   CompressedSize = Buffer.GetCompressedSize();
+
+            Stats.TotalAttachmentBytes += CompressedSize;
+            ++Stats.AttachmentCount;
+
+            const CidStore::InsertResult InsertResult = Store.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
+
+            if (InsertResult.New)
+            {
+                Stats.TotalNewBytes += CompressedSize;
+                ++Stats.NewAttachmentCount;
+            }
+        }
+
+        return Stats;
+    }
+
+}  // namespace
+
+// Constructs the service: creates the upstream apply object, starts a background thread that
+// creates and registers the Horde endpoint and initializes the upstream, and registers the HTTP
+// routes handled by m_Router.
+//
+//   InCidStore          - used to initialize m_CidStore; queried and populated by the routes
+//   ComputeOptions,
+//   StorageOptions,
+//   ComputeAuthConfig,
+//   StorageAuthConfig   - copied into the initialization thread and forwarded to
+//                         UpstreamApplyEndpoint::CreateHordeEndpoint
+//   Mgr                 - captured BY REFERENCE by the initialization thread, so it must stay
+//                         alive until that thread has finished
+HttpFunctionService::HttpFunctionService(CidStore&                      InCidStore,
+                                         const CloudCacheClientOptions& ComputeOptions,
+                                         const CloudCacheClientOptions& StorageOptions,
+                                         const UpstreamAuthConfig&      ComputeAuthConfig,
+                                         const UpstreamAuthConfig&      StorageAuthConfig,
+                                         AuthMgr&                       Mgr)
+: m_Log(logging::Get("apply"))
+, m_CidStore(InCidStore)
+{
+    m_UpstreamApply = UpstreamApply::Create({}, m_CidStore);
+
+    // Endpoint creation and upstream initialization run on a separate thread. The option/config
+    // arguments are captured by value; `this` and Mgr are captured by pointer/reference.
+    InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] {
+        auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
+                                                                                ComputeAuthConfig,
+                                                                                StorageOptions,
+                                                                                StorageAuthConfig,
+                                                                                m_CidStore,
+                                                                                Mgr);
+        m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
+        m_UpstreamApply->Initialize();
+    }};
+
+    // Capture patterns used by the routes below: a decimal job number and 40 hex digit hashes.
+    m_Router.AddPattern("job", "([[:digit:]]+)");
+    m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
+    m_Router.AddPattern("action", "([[:xdigit:]]{40})");
+
+    // GET ready: 200 when the upstream reports healthy, 503 otherwise.
+    m_Router.RegisterRoute(
+        "ready",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            return HttpReq.WriteResponse(m_UpstreamApply->IsHealthy() ? HttpResponseCode::OK : HttpResponseCode::ServiceUnavailable);
+        },
+        HttpVerb::kGet);
+
+    // workers/{worker}
+    //   GET  - returns the registered worker descriptor, or 404 when the worker is unknown.
+    //   POST - registers a worker descriptor. An object payload is only accepted when no
+    //          attachment is left over after m_CidStore.FilterChunks(); otherwise the leftover
+    //          hashes are returned in a "need" array with 404. A package payload stores its
+    //          attachments and then registers the descriptor.
+    m_Router.RegisterRoute(
+        "workers/{worker}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    {
+                        RwLock::SharedLockScope _(m_WorkerLock);
+
+                        if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+                        {
+                            return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+                        }
+                        else
+                        {
+                            const WorkerDesc& Desc = It->second;
+                            return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor);
+                        }
+                    }
+                    break;
+
+                case HttpVerb::kPost:
+                    {
+                        switch (HttpReq.RequestContentType())
+                        {
+                            case HttpContentType::kCbObject:
+                                {
+                                    CbObject FunctionSpec = HttpReq.ReadPayloadObject();
+
+                                    // Determine which pieces are missing and need to be transmitted
+
+                                    HashKeySet ChunkSet;
+
+                                    FunctionSpec.IterateAttachments([&](CbFieldView Field) {
+                                        const IoHash Hash = Field.AsHash();
+                                        ChunkSet.AddHashToSet(Hash);
+                                    });
+
+                                    // NOTE(review): an earlier comment here said executables are stored
+                                    // uncompressed and looked up in CAS rather than CID, but the code
+                                    // filters against m_CidStore - confirm which is intended.
+
+                                    m_CidStore.FilterChunks(ChunkSet);
+
+                                    if (ChunkSet.IsEmpty())
+                                    {
+                                        RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+                                        m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec});
+
+                                        ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
+
+                                        return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+                                    }
+                                    else
+                                    {
+                                        CbObjectWriter ResponseWriter;
+                                        ResponseWriter.BeginArray("need");
+
+                                        ChunkSet.IterateHashes([&](const IoHash& Hash) {
+                                            ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
+
+                                            ResponseWriter.AddHash(Hash);
+                                        });
+
+                                        ResponseWriter.EndArray();
+
+                                        ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
+
+                                        return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
+                                    }
+                                }
+                                break;
+
+                            case HttpContentType::kCbPackage:
+                                {
+                                    CbPackage FunctionSpec = HttpReq.ReadPayloadPackage();
+
+                                    CbObject Obj = FunctionSpec.GetObject();
+
+                                    const FunctionAttachmentStats Stats = StoreFunctionAttachments(m_CidStore, FunctionSpec.GetAttachments());
+
+                                    ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
+                                              WorkerId,
+                                              zen::NiceBytes(Stats.TotalAttachmentBytes),
+                                              Stats.AttachmentCount,
+                                              zen::NiceBytes(Stats.TotalNewBytes),
+                                              Stats.NewAttachmentCount);
+
+                                    RwLock::ExclusiveLockScope _(m_WorkerLock);
+
+                                    m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj});
+
+                                    return HttpReq.WriteResponse(HttpResponseCode::NoContent);
+                                }
+                                break;
+
+                            default:
+                                // NOTE(review): no response is written for other content types.
+                                break;
+                        }
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kGet | HttpVerb::kPost);
+
+    // jobs/{job}: placeholder. NOTE(review): the handler currently writes no response for any verb.
+    m_Router.RegisterRoute(
+        "jobs/{job}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq = Req.ServerRequest();
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    break;
+
+                case HttpVerb::kPost:
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kGet | HttpVerb::kPost);
+
+    // GET jobs/{worker}/{action}: returns the output package of the action, or the status code
+    // reported by ExecActionUpstreamResult when it is not OK.
+    m_Router.RegisterRoute(
+        "jobs/{worker}/{action}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq  = Req.ServerRequest();
+            const IoHash       WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+            const IoHash       ActionId = IoHash::FromHexString(Req.GetCapture(2));
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    {
+                        CbPackage        Output;
+                        HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output);
+                        if (ResponseCode != HttpResponseCode::OK)
+                        {
+                            return HttpReq.WriteResponse(ResponseCode);
+                        }
+                        return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kGet);
+
+    // simple/{worker} (404 when the worker is not registered)
+    //   GET  - returns the result of the worker's simple action and, on success, removes the
+    //          worker from the registry.
+    //   POST - enqueues the worker's simple action upstream.
+    m_Router.RegisterRoute(
+        "simple/{worker}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq  = Req.ServerRequest();
+            const IoHash       WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+            WorkerDesc Worker;
+
+            {
+                RwLock::SharedLockScope _(m_WorkerLock);
+
+                if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+                {
+                    return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+                }
+                else
+                {
+                    Worker = It->second;
+                }
+            }
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    {
+                        CbObject         Output;
+                        HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, Output);
+                        if (ResponseCode != HttpResponseCode::OK)
+                        {
+                            return HttpReq.WriteResponse(ResponseCode);
+                        }
+
+                        {
+                            // Erasing mutates the map, so the exclusive (writer) lock is required;
+                            // a shared lock would let this race with concurrent readers.
+                            RwLock::ExclusiveLockScope _(m_WorkerLock);
+                            m_WorkerMap.erase(WorkerId);
+                        }
+
+                        return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+                    }
+                    break;
+
+                case HttpVerb::kPost:
+                    {
+                        CbObject         Output;
+                        HttpResponseCode ResponseCode = ExecActionUpstream(Worker, Output);
+                        if (ResponseCode != HttpResponseCode::OK)
+                        {
+                            return HttpReq.WriteResponse(ResponseCode);
+                        }
+                        return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kGet | HttpVerb::kPost);
+
+    // POST jobs/{worker} (404 when the worker is not registered)
+    //   object payload  - enqueues the action if every attachment is already in m_CidStore,
+    //                     otherwise responds 404 with the missing hashes in a "need" array
+    //   package payload - stores the attachments, then enqueues the action
+    // NOTE(review): the route is registered for kPost only, so the kGet case below looks
+    // unreachable as written.
+    m_Router.RegisterRoute(
+        "jobs/{worker}",
+        [this](HttpRouterRequest& Req) {
+            HttpServerRequest& HttpReq  = Req.ServerRequest();
+            const IoHash       WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+
+            WorkerDesc Worker;
+
+            {
+                RwLock::SharedLockScope _(m_WorkerLock);
+
+                if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
+                {
+                    return HttpReq.WriteResponse(HttpResponseCode::NotFound);
+                }
+                else
+                {
+                    Worker = It->second;
+                }
+            }
+
+            switch (HttpReq.RequestVerb())
+            {
+                case HttpVerb::kGet:
+                    // TODO: return status of all pending or executing jobs
+                    break;
+
+                case HttpVerb::kPost:
+                    switch (HttpReq.RequestContentType())
+                    {
+                        case HttpContentType::kCbObject:
+                            {
+                                // This operation takes the proposed job spec and identifies which
+                                // chunks are not present on this server. This list is then returned in
+                                // the "need" list in the response
+
+                                IoBuffer Payload       = HttpReq.ReadPayload();
+                                CbObject RequestObject = LoadCompactBinaryObject(Payload);
+
+                                std::vector<IoHash> NeedList;
+
+                                RequestObject.IterateAttachments([&](CbFieldView Field) {
+                                    const IoHash FileHash = Field.AsHash();
+
+                                    if (!m_CidStore.ContainsChunk(FileHash))
+                                    {
+                                        NeedList.push_back(FileHash);
+                                    }
+                                });
+
+                                if (NeedList.empty())
+                                {
+                                    // We already have everything
+                                    CbObject         Output;
+                                    HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output);
+
+                                    if (ResponseCode != HttpResponseCode::OK)
+                                    {
+                                        return HttpReq.WriteResponse(ResponseCode);
+                                    }
+                                    return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+                                }
+
+                                CbObjectWriter Cbo;
+                                Cbo.BeginArray("need");
+
+                                for (const IoHash& Hash : NeedList)
+                                {
+                                    Cbo << Hash;
+                                }
+
+                                Cbo.EndArray();
+                                CbObject Response = Cbo.Save();
+
+                                return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
+                            }
+                            break;
+
+                        case HttpContentType::kCbPackage:
+                            {
+                                CbPackage Action    = HttpReq.ReadPayloadPackage();
+                                CbObject  ActionObj = Action.GetObject();
+
+                                const FunctionAttachmentStats Stats = StoreFunctionAttachments(m_CidStore, Action.GetAttachments());
+
+                                ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)",
+                                          zen::NiceBytes(Stats.TotalAttachmentBytes),
+                                          Stats.AttachmentCount,
+                                          zen::NiceBytes(Stats.TotalNewBytes),
+                                          Stats.NewAttachmentCount);
+
+                                CbObject         Output;
+                                HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output);
+
+                                if (ResponseCode != HttpResponseCode::OK)
+                                {
+                                    return HttpReq.WriteResponse(ResponseCode);
+                                }
+                                return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+                            }
+                            break;
+
+                        default:
+                            // NOTE(review): no response is written for other content types.
+                            break;
+                    }
+                    break;
+
+                default:
+                    break;
+            }
+        },
+        HttpVerb::kPost);
+}
+
+// Waits for the initialization thread started by the constructor before the object goes away.
+//
+// That thread captures `this`, so it must not outlive the service. In addition, destroying a
+// std::thread that is still joinable calls std::terminate.
+// NOTE(review): assumes InitializeThread is a std::thread member - confirm against function.h.
+HttpFunctionService::~HttpFunctionService()
+{
+    if (InitializeThread.joinable())
+    {
+        InitializeThread.join();
+    }
+}
+
+// Returns the base URI string of this service, "/apply/".
+const char*
+HttpFunctionService::BaseUri() const
+{
+    static constexpr const char* ServiceBaseUri = "/apply/";
+    return ServiceBaseUri;
+}
+
+// Passes the request to the router and logs a warning, including the request's relative URI,
+// when the router reports that it did not handle it.
+void
+HttpFunctionService::HandleRequest(HttpServerRequest& Request)
+{
+    const bool WasRouted = m_Router.HandleRequest(Request);
+
+    if (!WasRouted)
+    {
+        ZEN_WARN("No route found for {0}", Request.RelativeUri());
+    }
+}
+
+// Enqueues a "Simple" upstream action for the given worker descriptor.
+//
+// On success writes an object holding the worker hash under "worker" to Object and returns OK.
+// Returns InternalServerError (leaving Object untouched) when the enqueue is reported as failed.
+HttpResponseCode
+HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object)
+{
+    const IoHash WorkerHash = Worker.Descriptor.GetHash();
+
+    ZEN_INFO("Action {} being processed...", WorkerHash.ToHexString());
+
+    auto Enqueued = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Type = UpstreamApplyType::Simple});
+
+    if (Enqueued.Success)
+    {
+        CbObjectWriter Response;
+        Response.AddHash("worker", WorkerHash);
+
+        Object = Response.Save();
+        return HttpResponseCode::OK;
+    }
+
+    ZEN_ERROR("Error enqueuing upstream Action {}", WorkerHash.ToHexString());
+    return HttpResponseCode::InternalServerError;
+}
+
+// Fetches the result of a worker's simple action and serializes it into Object.
+//
+// Returns NotFound when the status lookup fails, Accepted while the action is not yet complete,
+// and OK once a result is available. A failed action still yields OK: its exit code is forced to
+// be non-zero (-1 when it was 0) and an empty stderr is filled from the error reason.
+//
+// The resulting object contains "agent", "detail", "stdout", "stderr", "exitcode", a "stats"
+// array of {name, time} entries and a "files" array of {name, data} entries.
+HttpResponseCode
+HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object)
+{
+    // The status is looked up with the hash of an empty object in place of an action id
+    // (presumably the key simple actions are tracked under - verify against UpstreamApply).
+    const static IoHash Empty = CbObject().GetHash();
+
+    auto Lookup = m_UpstreamApply->GetStatus(WorkerId, Empty);
+
+    if (!Lookup.Success)
+    {
+        return HttpResponseCode::NotFound;
+    }
+    if (Lookup.Status.State != UpstreamApplyState::Complete)
+    {
+        return HttpResponseCode::Accepted;
+    }
+
+    GetUpstreamApplyResult& Result = Lookup.Status.Result;
+
+    if (Result.Success)
+    {
+        ZEN_INFO("Action {} completed with {} files ExitCode={}",
+                 WorkerId.ToHexString(),
+                 Result.OutputFiles.size(),
+                 Result.Error.ErrorCode);
+    }
+    else
+    {
+        ZEN_ERROR("Action {} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}",
+                  WorkerId.ToHexString(),
+                  Result.StdOut,
+                  Result.StdErr,
+                  Result.Error.Reason,
+                  Result.Error.ErrorCode);
+
+        // Make sure the failure is visible to the client even if upstream left the fields blank.
+        if (Result.Error.ErrorCode == 0)
+        {
+            Result.Error.ErrorCode = -1;
+        }
+
+        const bool HasReason = !Result.Error.Reason.empty();
+        if (Result.StdErr.empty() && HasReason)
+        {
+            Result.StdErr = Result.Error.Reason;
+        }
+    }
+
+    CbObjectWriter Writer;
+
+    Writer.AddString("agent"sv, Result.Agent);
+    Writer.AddString("detail"sv, Result.Detail);
+    Writer.AddString("stdout"sv, Result.StdOut);
+    Writer.AddString("stderr"sv, Result.StdErr);
+    Writer.AddInteger("exitcode"sv, Result.Error.ErrorCode);
+
+    Writer.BeginArray("stats"sv);
+    for (const auto& [Name, Ticks] : Result.Timepoints)
+    {
+        Writer.BeginObject();
+        Writer.AddString("name"sv, Name);
+        Writer.AddDateTimeTicks("time"sv, Ticks);
+        Writer.EndObject();
+    }
+    Writer.EndArray();
+
+    Writer.BeginArray("files"sv);
+    for (const auto& [Path, DataKey] : Result.OutputFiles)
+    {
+        Writer.BeginObject();
+        Writer.AddString("name"sv, Path.string());
+        Writer.AddBinary("data"sv, Result.FileData[DataKey]);
+        Writer.EndObject();
+    }
+    Writer.EndArray();
+
+    Object = Writer.Save();
+    return HttpResponseCode::OK;
+}
+
+// Enqueues an "Asset" upstream action for the given worker descriptor and action object.
+//
+// On success writes an object holding the worker hash ("worker") and the action hash ("action")
+// to Object and returns OK. Returns InternalServerError (leaving Object untouched) when the
+// enqueue is reported as failed.
+HttpResponseCode
+HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object)
+{
+    const IoHash WorkerHash = Worker.Descriptor.GetHash();
+    const IoHash ActionHash = Action.GetHash();
+
+    // Action is moved into the enqueue request below; MakeOwned() is called on it first.
+    Action.MakeOwned();
+
+    ZEN_INFO("Action {}/{} being processed...", WorkerHash.ToHexString(), ActionHash.ToHexString());
+
+    auto Enqueued = m_UpstreamApply->EnqueueUpstream(
+        {.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action), .Type = UpstreamApplyType::Asset});
+
+    if (Enqueued.Success)
+    {
+        CbObjectWriter Response;
+        Response.AddHash("worker", WorkerHash);
+        Response.AddHash("action", ActionHash);
+
+        Object = Response.Save();
+        return HttpResponseCode::OK;
+    }
+
+    ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerHash.ToHexString(), ActionHash.ToHexString());
+    return HttpResponseCode::InternalServerError;
+}
+
+// Fetches the output package of the action identified by WorkerId/ActionId.
+//
+// Returns NotFound when the status lookup fails, Accepted while the action is not yet complete,
+// InternalServerError when the action failed or finished with a non-zero error code, and OK with
+// the output package moved into Package otherwise.
+HttpResponseCode
+HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package)
+{
+    auto Lookup = m_UpstreamApply->GetStatus(WorkerId, ActionId);
+
+    if (!Lookup.Success)
+    {
+        return HttpResponseCode::NotFound;
+    }
+    if (Lookup.Status.State != UpstreamApplyState::Complete)
+    {
+        return HttpResponseCode::Accepted;
+    }
+
+    GetUpstreamApplyResult& Result = Lookup.Status.Result;
+
+    const bool Succeeded = Result.Success && Result.Error.ErrorCode == 0;
+
+    if (!Succeeded)
+    {
+        ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}",
+                  WorkerId.ToHexString(),
+                  ActionId.ToHexString(),
+                  Result.StdOut,
+                  Result.StdErr,
+                  Result.Error.Reason,
+                  Result.Error.ErrorCode);
+
+        return HttpResponseCode::InternalServerError;
+    }
+
+    ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)",
+             WorkerId.ToHexString(),
+             ActionId.ToHexString(),
+             Result.OutputPackage.GetAttachments().size(),
+             NiceBytes(Result.TotalAttachmentBytes),
+             NiceBytes(Result.TotalRawAttachmentBytes));
+
+    Package = std::move(Result.OutputPackage);
+    return HttpResponseCode::OK;
+}
+
+} // namespace zen
+
+#endif // ZEN_WITH_COMPUTE_SERVICES