diff options
| author | Joe Kirchoff <[email protected]> | 2022-03-22 11:47:38 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-03-22 11:47:38 -0700 |
| commit | cc5adf4cb79c92993fabfe09e75dfadb7d4c9665 (patch) | |
| tree | 4ba0a18f68e39685fa784d872bbb4bb9ba2b6fd7 /zenserver/compute/function.cpp | |
| parent | move workthreadpool to zencore (#63) (diff) | |
| download | zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.tar.xz zen-cc5adf4cb79c92993fabfe09e75dfadb7d4c9665.zip | |
Enable Horde compute code on Linux & Mac (#61)
Diffstat (limited to 'zenserver/compute/function.cpp')
| -rw-r--r-- | zenserver/compute/function.cpp | 473 |
1 files changed, 473 insertions, 0 deletions
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp new file mode 100644 index 000000000..9af3efcec --- /dev/null +++ b/zenserver/compute/function.cpp @@ -0,0 +1,473 @@ +// Copyright Epic Games, Inc. All Rights Reserved. + +#include "function.h" + +#if ZEN_WITH_COMPUTE_SERVICES + +# include <upstream/jupiter.h> +# include <upstream/upstreamapply.h> +# include <upstream/upstreamcache.h> +# include <zencore/compactbinary.h> +# include <zencore/compactbinarybuilder.h> +# include <zencore/compactbinarypackage.h> +# include <zencore/compress.h> +# include <zencore/except.h> +# include <zencore/filesystem.h> +# include <zencore/fmtutils.h> +# include <zencore/iobuffer.h> +# include <zencore/iohash.h> +# include <zencore/scopeguard.h> +# include <zenstore/cas.h> +# include <zenstore/cidstore.h> + +# include <span> + +using namespace std::literals; + +namespace zen { + +HttpFunctionService::HttpFunctionService(CasStore& Store, + CidStore& InCidStore, + const CloudCacheClientOptions& ComputeOptions, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const UpstreamAuthConfig& StorageAuthConfig, + AuthMgr& Mgr) +: m_Log(logging::Get("apply")) +, m_CasStore(Store) +, m_CidStore(InCidStore) +{ + m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore); + + auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, + ComputeAuthConfig, + StorageOptions, + StorageAuthConfig, + m_CasStore, + m_CidStore, + Mgr); + m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); + m_UpstreamApply->Initialize(); + + m_Router.AddPattern("job", "([[:digit:]]+)"); + m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); + m_Router.AddPattern("action", "([[:xdigit:]]{40})"); + + m_Router.RegisterRoute( + "workers/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + else + { + const WorkerDesc& Desc = It->second; + return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor); + } + } + break; + + case HttpVerb::kPost: + { + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + CbObject FunctionSpec = HttpReq.ReadPayloadObject(); + + // Determine which pieces are missing and need to be transmitted to populate CAS + + CasChunkSet ChunkSet; + + FunctionSpec.IterateAttachments([&](CbFieldView Field) { + const IoHash Hash = Field.AsHash(); + ChunkSet.AddChunkToSet(Hash); + }); + + // Note that we store executables uncompressed to make it + // more straightforward and efficient to materialize them, hence + // the CAS lookup here instead of CID for the input payloads + + m_CasStore.FilterChunks(ChunkSet); + + if (ChunkSet.IsEmpty()) + { + RwLock::ExclusiveLockScope _(m_WorkerLock); + + m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); + + ZEN_DEBUG("worker {}: all attachments already available", WorkerId); + + return HttpReq.WriteResponse(HttpResponseCode::NoContent); + } + else + { + CbObjectWriter ResponseWriter; + ResponseWriter.BeginArray("need"); + + ChunkSet.IterateChunks([&](const IoHash& Hash) { + ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); + + ResponseWriter.AddHash(Hash); + }); + + ResponseWriter.EndArray(); + + ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); + } + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); + + CbObject Obj = FunctionSpec.GetObject(); + + std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + SharedBuffer Decompressed = DataView.Decompress(); + const uint64_t DecompressedSize = DataView.GetRawSize(); + + ZEN_UNUSED(DataHash); + + TotalAttachmentBytes += DecompressedSize; + ++AttachmentCount; + + // Note that we store executables uncompressed to make it + // more straightforward and efficient to materialize them + + const CasStore::InsertResult InsertResult = + m_CasStore.InsertChunk(Decompressed.AsIoBuffer(), IoHash::FromBLAKE3(DataView.GetRawHash())); + + if (InsertResult.New) + { + TotalNewBytes += DecompressedSize; + ++NewAttachmentCount; + } + } + + ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", + WorkerId, + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + RwLock::ExclusiveLockScope _(m_WorkerLock); + + m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); + + return HttpReq.WriteResponse(HttpResponseCode::NoContent); + } + break; + + default: + break; + } + } + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs/{job}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + break; + + case HttpVerb::kPost: + break; + + default: + break; + } + }, + HttpVerb::kGet | HttpVerb::kPost); + + m_Router.RegisterRoute( + "jobs/{worker}/{action}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbPackage Output; + HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( + "jobs/{worker}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + + WorkerDesc Worker; + + { + RwLock::SharedLockScope _(m_WorkerLock); + + if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) + { + return HttpReq.WriteResponse(HttpResponseCode::NotFound); + } + else + { + Worker = It->second; + } + } + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + // TODO: return status of all pending or executing jobs + break; + + case HttpVerb::kPost: + switch (HttpReq.RequestContentType()) + { + case HttpContentType::kCbObject: + { + // This operation takes the proposed job spec and identifies which + // chunks are not present on this server. This list is then returned in + // the "need" list in the response + + IoBuffer Payload = HttpReq.ReadPayload(); + CbObject RequestObject = LoadCompactBinaryObject(Payload); + + std::vector<IoHash> NeedList; + + RequestObject.IterateAttachments([&](CbFieldView Field) { + const IoHash FileHash = Field.AsHash(); + + if (!m_CidStore.ContainsChunk(FileHash)) + { + NeedList.push_back(FileHash); + } + }); + + if (NeedList.empty()) + { + // We already have everything + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output); + + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + + CbObjectWriter Cbo; + Cbo.BeginArray("need"); + + for (const IoHash& Hash : NeedList) + { + Cbo << Hash; + } + + Cbo.EndArray(); + CbObject Response = Cbo.Save(); + + return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); + } + break; + + case HttpContentType::kCbPackage: + { + CbPackage Action = HttpReq.ReadPayloadPackage(); + CbObject ActionObj = Action.GetObject(); + + std::span<const CbAttachment> Attachments = Action.GetAttachments(); + + int AttachmentCount = 0; + int NewAttachmentCount = 0; + uint64_t TotalAttachmentBytes = 0; + uint64_t TotalNewBytes = 0; + + for (const CbAttachment& Attachment : Attachments) + { + ZEN_ASSERT(Attachment.IsCompressedBinary()); + + const IoHash DataHash = Attachment.GetHash(); + CompressedBuffer DataView = Attachment.AsCompressedBinary(); + + ZEN_UNUSED(DataHash); + + const uint64_t CompressedSize = DataView.GetCompressedSize(); + + TotalAttachmentBytes += CompressedSize; + ++AttachmentCount; + + const CidStore::InsertResult InsertResult = m_CidStore.AddChunk(DataView); + + if (InsertResult.New) + { + TotalNewBytes += CompressedSize; + ++NewAttachmentCount; + } + } + + ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)", + zen::NiceBytes(TotalAttachmentBytes), + AttachmentCount, + zen::NiceBytes(TotalNewBytes), + NewAttachmentCount); + + CbObject Output; + HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output); + + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + + default: + break; + } + break; + + default: + break; + } + }, + HttpVerb::kPost); +} + +HttpFunctionService::~HttpFunctionService() +{ +} + +const char* +HttpFunctionService::BaseUri() const +{ + return "/apply/"; +} + +void +HttpFunctionService::HandleRequest(HttpServerRequest& Request) +{ + if (m_Router.HandleRequest(Request) == false) + { + ZEN_WARN("No route found for {0}", Request.RelativeUri()); + } +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) +{ + const IoHash WorkerId = Worker.Descriptor.GetHash(); + const IoHash ActionId = Action.GetHash(); + + Action.MakeOwned(); + + ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); + + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); + + if (!EnqueueResult.Success) + { + ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString()); + return HttpResponseCode::InternalServerError; + } + + CbObjectWriter Writer; + Writer.AddHash("worker", WorkerId); + Writer.AddHash("action", ActionId); + + Object = Writer.Save(); + return HttpResponseCode::OK; +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) +{ + auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); + if (!Status.Success) + { + // throw std::runtime_error(fmt::format("Action {}/{} not found", WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); + return HttpResponseCode::NotFound; + } + + if (Status.Status.State != UpstreamApplyState::Complete) + { + return HttpResponseCode::Accepted; + } + + GetUpstreamApplyResult& Completed = Status.Status.Result; + if (!Completed.Success || Completed.Error.ErrorCode != 0) + { + ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.StdOut, + Completed.StdErr, + Completed.Error.Reason, + Completed.Error.ErrorCode); + + return HttpResponseCode::InternalServerError; + } + + ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.OutputPackage.GetAttachments().size(), + NiceBytes(Completed.TotalAttachmentBytes), + NiceBytes(Completed.TotalRawAttachmentBytes)); + + Package = std::move(Completed.OutputPackage); + return HttpResponseCode::OK; +} + +} // namespace zen + +#endif // ZEN_WITH_COMPUTE_SERVICES |