diff options
| author | Stefan Boberg <[email protected]> | 2023-05-02 10:01:47 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2023-05-02 10:01:47 +0200 |
| commit | 075d17f8ada47e990fe94606c3d21df409223465 (patch) | |
| tree | e50549b766a2f3c354798a54ff73404217b4c9af /zenserver/compute/function.cpp | |
| parent | fix: bundle shouldn't append content zip to zen (diff) | |
| download | zen-075d17f8ada47e990fe94606c3d21df409223465.tar.xz zen-075d17f8ada47e990fe94606c3d21df409223465.zip | |
moved source directories into `/src` (#264)
* moved source directories into `/src`
* updated bundle.lua for new `src` path
* moved some docs, icon
* removed old test trees
Diffstat (limited to 'zenserver/compute/function.cpp')
| -rw-r--r-- | zenserver/compute/function.cpp | 629 |
1 files changed, 0 insertions, 629 deletions
diff --git a/zenserver/compute/function.cpp b/zenserver/compute/function.cpp deleted file mode 100644 index 493e2666e..000000000 --- a/zenserver/compute/function.cpp +++ /dev/null @@ -1,629 +0,0 @@ -// Copyright Epic Games, Inc. All Rights Reserved. - -#include "function.h" - -#if ZEN_WITH_COMPUTE_SERVICES - -# include <upstream/jupiter.h> -# include <upstream/upstreamapply.h> -# include <upstream/upstreamcache.h> -# include <zencore/compactbinary.h> -# include <zencore/compactbinarybuilder.h> -# include <zencore/compactbinarypackage.h> -# include <zencore/compress.h> -# include <zencore/except.h> -# include <zencore/filesystem.h> -# include <zencore/fmtutils.h> -# include <zencore/iobuffer.h> -# include <zencore/iohash.h> -# include <zencore/scopeguard.h> -# include <zenstore/cidstore.h> - -# include <span> - -using namespace std::literals; - -namespace zen { - -HttpFunctionService::HttpFunctionService(CidStore& InCidStore, - const CloudCacheClientOptions& ComputeOptions, - const CloudCacheClientOptions& StorageOptions, - const UpstreamAuthConfig& ComputeAuthConfig, - const UpstreamAuthConfig& StorageAuthConfig, - AuthMgr& Mgr) -: m_Log(logging::Get("apply")) -, m_CidStore(InCidStore) -{ - m_UpstreamApply = UpstreamApply::Create({}, m_CidStore); - - InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] { - auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, - ComputeAuthConfig, - StorageOptions, - StorageAuthConfig, - m_CidStore, - Mgr); - m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); - m_UpstreamApply->Initialize(); - }}; - - m_Router.AddPattern("job", "([[:digit:]]+)"); - m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); - m_Router.AddPattern("action", "([[:xdigit:]]{40})"); - - m_Router.RegisterRoute( - "ready", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - return HttpReq.WriteResponse(m_UpstreamApply->IsHealthy() ? HttpResponseCode::OK : HttpResponseCode::ServiceUnavailable); - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "workers/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - const WorkerDesc& Desc = It->second; - return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor); - } - } - break; - - case HttpVerb::kPost: - { - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - CbObject FunctionSpec = HttpReq.ReadPayloadObject(); - - // Determine which pieces are missing and need to be transmitted to populate CAS - - HashKeySet ChunkSet; - - FunctionSpec.IterateAttachments([&](CbFieldView Field) { - const IoHash Hash = Field.AsHash(); - ChunkSet.AddHashToSet(Hash); - }); - - // Note that we store executables uncompressed to make it - // more straightforward and efficient to materialize them, hence - // the CAS lookup here instead of CID for the input payloads - - m_CidStore.FilterChunks(ChunkSet); - - if (ChunkSet.IsEmpty()) - { - RwLock::ExclusiveLockScope _(m_WorkerLock); - - m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec}); - - ZEN_DEBUG("worker {}: all attachments already available", WorkerId); - - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - else - { - CbObjectWriter ResponseWriter; - ResponseWriter.BeginArray("need"); - - ChunkSet.IterateHashes([&](const IoHash& Hash) { - ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash); - - ResponseWriter.AddHash(Hash); - }); - - ResponseWriter.EndArray(); - - ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize()); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save()); - } - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage FunctionSpec = HttpReq.ReadPayloadPackage(); - - CbObject Obj = FunctionSpec.GetObject(); - - std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer Buffer = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - TotalAttachmentBytes += Buffer.GetCompressedSize(); - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash); - - if (InsertResult.New) - { - TotalNewBytes += Buffer.GetCompressedSize(); - ++NewAttachmentCount; - } - } - - ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments", - WorkerId, - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - RwLock::ExclusiveLockScope _(m_WorkerLock); - - m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj}); - - return HttpReq.WriteResponse(HttpResponseCode::NoContent); - } - break; - - default: - break; - } - } - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{job}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - break; - - case HttpVerb::kPost: - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{worker}/{action}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - CbPackage Output; - HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - } - }, - HttpVerb::kGet); - - m_Router.RegisterRoute( - "simple/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - WorkerDesc Worker; - - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - Worker = It->second; - } - } - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - { - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, Output); - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - - { - RwLock::SharedLockScope _(m_WorkerLock); - m_WorkerMap.erase(WorkerId); - } - - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - - case HttpVerb::kPost: - { - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, Output); - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - - default: - break; - } - }, - HttpVerb::kGet | HttpVerb::kPost); - - m_Router.RegisterRoute( - "jobs/{worker}", - [this](HttpRouterRequest& Req) { - HttpServerRequest& HttpReq = Req.ServerRequest(); - const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); - - WorkerDesc Worker; - - { - RwLock::SharedLockScope _(m_WorkerLock); - - if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end()) - { - return HttpReq.WriteResponse(HttpResponseCode::NotFound); - } - else - { - Worker = It->second; - } - } - - switch (HttpReq.RequestVerb()) - { - case HttpVerb::kGet: - // TODO: return status of all pending or executing jobs - break; - - case HttpVerb::kPost: - switch (HttpReq.RequestContentType()) - { - case HttpContentType::kCbObject: - { - // This operation takes the proposed job spec and identifies which - // chunks are not present on this server. This list is then returned in - // the "need" list in the response - - IoBuffer Payload = HttpReq.ReadPayload(); - CbObject RequestObject = LoadCompactBinaryObject(Payload); - - std::vector<IoHash> NeedList; - - RequestObject.IterateAttachments([&](CbFieldView Field) { - const IoHash FileHash = Field.AsHash(); - - if (!m_CidStore.ContainsChunk(FileHash)) - { - NeedList.push_back(FileHash); - } - }); - - if (NeedList.empty()) - { - // We already have everything - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output); - - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - - CbObjectWriter Cbo; - Cbo.BeginArray("need"); - - for (const IoHash& Hash : NeedList) - { - Cbo << Hash; - } - - Cbo.EndArray(); - CbObject Response = Cbo.Save(); - - return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response); - } - break; - - case HttpContentType::kCbPackage: - { - CbPackage Action = HttpReq.ReadPayloadPackage(); - CbObject ActionObj = Action.GetObject(); - - std::span<const CbAttachment> Attachments = Action.GetAttachments(); - - int AttachmentCount = 0; - int NewAttachmentCount = 0; - uint64_t TotalAttachmentBytes = 0; - uint64_t TotalNewBytes = 0; - - for (const CbAttachment& Attachment : Attachments) - { - ZEN_ASSERT(Attachment.IsCompressedBinary()); - - const IoHash DataHash = Attachment.GetHash(); - CompressedBuffer DataView = Attachment.AsCompressedBinary(); - - ZEN_UNUSED(DataHash); - - const uint64_t CompressedSize = DataView.GetCompressedSize(); - - TotalAttachmentBytes += CompressedSize; - ++AttachmentCount; - - const CidStore::InsertResult InsertResult = - m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash); - - if (InsertResult.New) - { - TotalNewBytes += CompressedSize; - ++NewAttachmentCount; - } - } - - ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)", - zen::NiceBytes(TotalAttachmentBytes), - AttachmentCount, - zen::NiceBytes(TotalNewBytes), - NewAttachmentCount); - - CbObject Output; - HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output); - - if (ResponseCode != HttpResponseCode::OK) - { - return HttpReq.WriteResponse(ResponseCode); - } - return HttpReq.WriteResponse(HttpResponseCode::OK, Output); - } - break; - - default: - break; - } - break; - - default: - break; - } - }, - HttpVerb::kPost); -} - -HttpFunctionService::~HttpFunctionService() -{ -} - -const char* -HttpFunctionService::BaseUri() const -{ - return "/apply/"; -} - -void -HttpFunctionService::HandleRequest(HttpServerRequest& Request) -{ - if (m_Router.HandleRequest(Request) == false) - { - ZEN_WARN("No route found for {0}", Request.RelativeUri()); - } -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object) -{ - const IoHash WorkerId = Worker.Descriptor.GetHash(); - - ZEN_INFO("Action {} being processed...", WorkerId.ToHexString()); - - auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Type = UpstreamApplyType::Simple}); - if (!EnqueueResult.Success) - { - ZEN_ERROR("Error enqueuing upstream Action {}", WorkerId.ToHexString()); - return HttpResponseCode::InternalServerError; - } - - CbObjectWriter Writer; - Writer.AddHash("worker", WorkerId); - - Object = Writer.Save(); - return HttpResponseCode::OK; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object) -{ - const static IoHash Empty = CbObject().GetHash(); - auto Status = m_UpstreamApply->GetStatus(WorkerId, Empty); - if (!Status.Success) - { - return HttpResponseCode::NotFound; - } - - if (Status.Status.State != UpstreamApplyState::Complete) - { - return HttpResponseCode::Accepted; - } - - GetUpstreamApplyResult& Completed = Status.Status.Result; - - if (!Completed.Success) - { - ZEN_ERROR("Action {} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", - WorkerId.ToHexString(), - Completed.StdOut, - Completed.StdErr, - Completed.Error.Reason, - Completed.Error.ErrorCode); - - if (Completed.Error.ErrorCode == 0) - { - Completed.Error.ErrorCode = -1; - } - if (Completed.StdErr.empty() && !Completed.Error.Reason.empty()) - { - Completed.StdErr = Completed.Error.Reason; - } - } - else - { - ZEN_INFO("Action {} completed with {} files ExitCode={}", - WorkerId.ToHexString(), - Completed.OutputFiles.size(), - Completed.Error.ErrorCode); - } - - CbObjectWriter ResultObject; - - ResultObject.AddString("agent"sv, Completed.Agent); - ResultObject.AddString("detail"sv, Completed.Detail); - ResultObject.AddString("stdout"sv, Completed.StdOut); - ResultObject.AddString("stderr"sv, Completed.StdErr); - ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode); - ResultObject.BeginArray("stats"sv); - for (const auto& Timepoint : Completed.Timepoints) - { - ResultObject.BeginObject(); - ResultObject.AddString("name"sv, Timepoint.first); - ResultObject.AddDateTimeTicks("time"sv, Timepoint.second); - ResultObject.EndObject(); - } - ResultObject.EndArray(); - - ResultObject.BeginArray("files"sv); - for (const auto& File : Completed.OutputFiles) - { - ResultObject.BeginObject(); - ResultObject.AddString("name"sv, File.first.string()); - ResultObject.AddBinary("data"sv, Completed.FileData[File.second]); - ResultObject.EndObject(); - } - ResultObject.EndArray(); - - Object = ResultObject.Save(); - return HttpResponseCode::OK; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object) -{ - const IoHash WorkerId = Worker.Descriptor.GetHash(); - const IoHash ActionId = Action.GetHash(); - - Action.MakeOwned(); - - ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); - - auto EnqueueResult = m_UpstreamApply->EnqueueUpstream( - {.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action), .Type = UpstreamApplyType::Asset}); - - if (!EnqueueResult.Success) - { - ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString()); - return HttpResponseCode::InternalServerError; - } - - CbObjectWriter Writer; - Writer.AddHash("worker", WorkerId); - Writer.AddHash("action", ActionId); - - Object = Writer.Save(); - return HttpResponseCode::OK; -} - -HttpResponseCode -HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) -{ - auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); - if (!Status.Success) - { - return HttpResponseCode::NotFound; - } - - if (Status.Status.State != UpstreamApplyState::Complete) - { - return HttpResponseCode::Accepted; - } - - GetUpstreamApplyResult& Completed = Status.Status.Result; - if (!Completed.Success || Completed.Error.ErrorCode != 0) - { - ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}", - WorkerId.ToHexString(), - ActionId.ToHexString(), - Completed.StdOut, - Completed.StdErr, - Completed.Error.Reason, - Completed.Error.ErrorCode); - - return HttpResponseCode::InternalServerError; - } - - ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", - WorkerId.ToHexString(), - ActionId.ToHexString(), - Completed.OutputPackage.GetAttachments().size(), - NiceBytes(Completed.TotalAttachmentBytes), - NiceBytes(Completed.TotalRawAttachmentBytes)); - - Package = std::move(Completed.OutputPackage); - return HttpResponseCode::OK; -} - -} // namespace zen - -#endif // ZEN_WITH_COMPUTE_SERVICES |