aboutsummaryrefslogtreecommitdiff
path: root/src/zenserver/compute/function.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/zenserver/compute/function.cpp')
-rw-r--r--src/zenserver/compute/function.cpp629
1 files changed, 0 insertions, 629 deletions
diff --git a/src/zenserver/compute/function.cpp b/src/zenserver/compute/function.cpp
deleted file mode 100644
index 493e2666e..000000000
--- a/src/zenserver/compute/function.cpp
+++ /dev/null
@@ -1,629 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "function.h"
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include <upstream/jupiter.h>
-# include <upstream/upstreamapply.h>
-# include <upstream/upstreamcache.h>
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/compactbinarypackage.h>
-# include <zencore/compress.h>
-# include <zencore/except.h>
-# include <zencore/filesystem.h>
-# include <zencore/fmtutils.h>
-# include <zencore/iobuffer.h>
-# include <zencore/iohash.h>
-# include <zencore/scopeguard.h>
-# include <zenstore/cidstore.h>
-
-# include <span>
-
-using namespace std::literals;
-
-namespace zen {
-
-HttpFunctionService::HttpFunctionService(CidStore& InCidStore,
- const CloudCacheClientOptions& ComputeOptions,
- const CloudCacheClientOptions& StorageOptions,
- const UpstreamAuthConfig& ComputeAuthConfig,
- const UpstreamAuthConfig& StorageAuthConfig,
- AuthMgr& Mgr)
-: m_Log(logging::Get("apply"))
-, m_CidStore(InCidStore)
-{
- m_UpstreamApply = UpstreamApply::Create({}, m_CidStore);
-
- InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] {
- auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
- ComputeAuthConfig,
- StorageOptions,
- StorageAuthConfig,
- m_CidStore,
- Mgr);
- m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
- m_UpstreamApply->Initialize();
- }};
-
- m_Router.AddPattern("job", "([[:digit:]]+)");
- m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
- m_Router.AddPattern("action", "([[:xdigit:]]{40})");
-
- m_Router.RegisterRoute(
- "ready",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- return HttpReq.WriteResponse(m_UpstreamApply->IsHealthy() ? HttpResponseCode::OK : HttpResponseCode::ServiceUnavailable);
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "workers/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- {
- RwLock::SharedLockScope _(m_WorkerLock);
-
- if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- const WorkerDesc& Desc = It->second;
- return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor);
- }
- }
- break;
-
- case HttpVerb::kPost:
- {
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- CbObject FunctionSpec = HttpReq.ReadPayloadObject();
-
- // Determine which pieces are missing and need to be transmitted to populate CAS
-
- HashKeySet ChunkSet;
-
- FunctionSpec.IterateAttachments([&](CbFieldView Field) {
- const IoHash Hash = Field.AsHash();
- ChunkSet.AddHashToSet(Hash);
- });
-
- // Note that we store executables uncompressed to make it
- // more straightforward and efficient to materialize them, hence
- // the CAS lookup here instead of CID for the input payloads
-
- m_CidStore.FilterChunks(ChunkSet);
-
- if (ChunkSet.IsEmpty())
- {
- RwLock::ExclusiveLockScope _(m_WorkerLock);
-
- m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec});
-
- ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
-
- return HttpReq.WriteResponse(HttpResponseCode::NoContent);
- }
- else
- {
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("need");
-
- ChunkSet.IterateHashes([&](const IoHash& Hash) {
- ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
-
- ResponseWriter.AddHash(Hash);
- });
-
- ResponseWriter.EndArray();
-
- ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
- }
- }
- break;
-
- case HttpContentType::kCbPackage:
- {
- CbPackage FunctionSpec = HttpReq.ReadPayloadPackage();
-
- CbObject Obj = FunctionSpec.GetObject();
-
- std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer Buffer = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
- TotalAttachmentBytes += Buffer.GetCompressedSize();
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
-
- if (InsertResult.New)
- {
- TotalNewBytes += Buffer.GetCompressedSize();
- ++NewAttachmentCount;
- }
- }
-
- ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
- WorkerId,
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- RwLock::ExclusiveLockScope _(m_WorkerLock);
-
- m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj});
-
- return HttpReq.WriteResponse(HttpResponseCode::NoContent);
- }
- break;
-
- default:
- break;
- }
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/{job}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- break;
-
- case HttpVerb::kPost:
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/{worker}/{action}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
- const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- {
- CbPackage Output;
- HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output);
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
- }
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "simple/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- WorkerDesc Worker;
-
- {
- RwLock::SharedLockScope _(m_WorkerLock);
-
- if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- Worker = It->second;
- }
- }
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- {
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, Output);
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
-
- {
- RwLock::SharedLockScope _(m_WorkerLock);
- m_WorkerMap.erase(WorkerId);
- }
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
-
- case HttpVerb::kPost:
- {
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstream(Worker, Output);
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- WorkerDesc Worker;
-
- {
- RwLock::SharedLockScope _(m_WorkerLock);
-
- if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- Worker = It->second;
- }
- }
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- // TODO: return status of all pending or executing jobs
- break;
-
- case HttpVerb::kPost:
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- // This operation takes the proposed job spec and identifies which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
-
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject RequestObject = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- RequestObject.IterateAttachments([&](CbFieldView Field) {
- const IoHash FileHash = Field.AsHash();
-
- if (!m_CidStore.ContainsChunk(FileHash))
- {
- NeedList.push_back(FileHash);
- }
- });
-
- if (NeedList.empty())
- {
- // We already have everything
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output);
-
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
- }
- break;
-
- case HttpContentType::kCbPackage:
- {
- CbPackage Action = HttpReq.ReadPayloadPackage();
- CbObject ActionObj = Action.GetObject();
-
- std::span<const CbAttachment> Attachments = Action.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
-
- const uint64_t CompressedSize = DataView.GetCompressedSize();
-
- TotalAttachmentBytes += CompressedSize;
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
-
- if (InsertResult.New)
- {
- TotalNewBytes += CompressedSize;
- ++NewAttachmentCount;
- }
- }
-
- ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)",
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output);
-
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
-
- default:
- break;
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kPost);
-}
-
-HttpFunctionService::~HttpFunctionService()
-{
-}
-
-const char*
-HttpFunctionService::BaseUri() const
-{
- return "/apply/";
-}
-
-void
-HttpFunctionService::HandleRequest(HttpServerRequest& Request)
-{
- if (m_Router.HandleRequest(Request) == false)
- {
- ZEN_WARN("No route found for {0}", Request.RelativeUri());
- }
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object)
-{
- const IoHash WorkerId = Worker.Descriptor.GetHash();
-
- ZEN_INFO("Action {} being processed...", WorkerId.ToHexString());
-
- auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Type = UpstreamApplyType::Simple});
- if (!EnqueueResult.Success)
- {
- ZEN_ERROR("Error enqueuing upstream Action {}", WorkerId.ToHexString());
- return HttpResponseCode::InternalServerError;
- }
-
- CbObjectWriter Writer;
- Writer.AddHash("worker", WorkerId);
-
- Object = Writer.Save();
- return HttpResponseCode::OK;
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object)
-{
- const static IoHash Empty = CbObject().GetHash();
- auto Status = m_UpstreamApply->GetStatus(WorkerId, Empty);
- if (!Status.Success)
- {
- return HttpResponseCode::NotFound;
- }
-
- if (Status.Status.State != UpstreamApplyState::Complete)
- {
- return HttpResponseCode::Accepted;
- }
-
- GetUpstreamApplyResult& Completed = Status.Status.Result;
-
- if (!Completed.Success)
- {
- ZEN_ERROR("Action {} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}",
- WorkerId.ToHexString(),
- Completed.StdOut,
- Completed.StdErr,
- Completed.Error.Reason,
- Completed.Error.ErrorCode);
-
- if (Completed.Error.ErrorCode == 0)
- {
- Completed.Error.ErrorCode = -1;
- }
- if (Completed.StdErr.empty() && !Completed.Error.Reason.empty())
- {
- Completed.StdErr = Completed.Error.Reason;
- }
- }
- else
- {
- ZEN_INFO("Action {} completed with {} files ExitCode={}",
- WorkerId.ToHexString(),
- Completed.OutputFiles.size(),
- Completed.Error.ErrorCode);
- }
-
- CbObjectWriter ResultObject;
-
- ResultObject.AddString("agent"sv, Completed.Agent);
- ResultObject.AddString("detail"sv, Completed.Detail);
- ResultObject.AddString("stdout"sv, Completed.StdOut);
- ResultObject.AddString("stderr"sv, Completed.StdErr);
- ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode);
- ResultObject.BeginArray("stats"sv);
- for (const auto& Timepoint : Completed.Timepoints)
- {
- ResultObject.BeginObject();
- ResultObject.AddString("name"sv, Timepoint.first);
- ResultObject.AddDateTimeTicks("time"sv, Timepoint.second);
- ResultObject.EndObject();
- }
- ResultObject.EndArray();
-
- ResultObject.BeginArray("files"sv);
- for (const auto& File : Completed.OutputFiles)
- {
- ResultObject.BeginObject();
- ResultObject.AddString("name"sv, File.first.string());
- ResultObject.AddBinary("data"sv, Completed.FileData[File.second]);
- ResultObject.EndObject();
- }
- ResultObject.EndArray();
-
- Object = ResultObject.Save();
- return HttpResponseCode::OK;
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object)
-{
- const IoHash WorkerId = Worker.Descriptor.GetHash();
- const IoHash ActionId = Action.GetHash();
-
- Action.MakeOwned();
-
- ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString());
-
- auto EnqueueResult = m_UpstreamApply->EnqueueUpstream(
- {.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action), .Type = UpstreamApplyType::Asset});
-
- if (!EnqueueResult.Success)
- {
- ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString());
- return HttpResponseCode::InternalServerError;
- }
-
- CbObjectWriter Writer;
- Writer.AddHash("worker", WorkerId);
- Writer.AddHash("action", ActionId);
-
- Object = Writer.Save();
- return HttpResponseCode::OK;
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package)
-{
- auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId);
- if (!Status.Success)
- {
- return HttpResponseCode::NotFound;
- }
-
- if (Status.Status.State != UpstreamApplyState::Complete)
- {
- return HttpResponseCode::Accepted;
- }
-
- GetUpstreamApplyResult& Completed = Status.Status.Result;
- if (!Completed.Success || Completed.Error.ErrorCode != 0)
- {
- ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}",
- WorkerId.ToHexString(),
- ActionId.ToHexString(),
- Completed.StdOut,
- Completed.StdErr,
- Completed.Error.Reason,
- Completed.Error.ErrorCode);
-
- return HttpResponseCode::InternalServerError;
- }
-
- ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)",
- WorkerId.ToHexString(),
- ActionId.ToHexString(),
- Completed.OutputPackage.GetAttachments().size(),
- NiceBytes(Completed.TotalAttachmentBytes),
- NiceBytes(Completed.TotalRawAttachmentBytes));
-
- Package = std::move(Completed.OutputPackage);
- return HttpResponseCode::OK;
-}
-
-} // namespace zen
-
-#endif // ZEN_WITH_COMPUTE_SERVICES