diff options
| author | Joe Kirchoff <[email protected]> | 2021-11-15 15:25:17 -0800 |
|---|---|---|
| committer | Joe Kirchoff <[email protected]> | 2021-11-15 15:25:17 -0800 |
| commit | 20874d3801c4ed142dfd5246a850452f24043512 (patch) | |
| tree | e67ee13eededfd5dd9754704e3b037a6be4b981a /zenserver/compute/apply.cpp | |
| parent | Horde Apply: Pass through (diff) | |
| download | zen-20874d3801c4ed142dfd5246a850452f24043512.tar.xz zen-20874d3801c4ed142dfd5246a850452f24043512.zip | |
Use upstream apply with Horde
Diffstat (limited to 'zenserver/compute/apply.cpp')
| -rw-r--r-- | zenserver/compute/apply.cpp | 107 |
1 files changed, 105 insertions, 2 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 053c262c2..d2ae2febc 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -2,6 +2,8 @@ #include "apply.h" +#include <upstream/jupiter.h> +#include <upstream/upstreamapply.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> @@ -331,8 +333,20 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, , m_SandboxPath(BaseDir / "scratch") , m_FunctionPath(BaseDir / "func") { + m_UpstreamApply = MakeUpstreamApply({}, m_CasStore, m_CidStore); + + CloudCacheClientOptions Options = {.ServiceUrl = "https://horde.devtools-dev.epicgames.com"sv, + .DdcNamespace = "default"sv, + .BlobStoreNamespace = "default"sv, + .AccessToken = "ServiceAccount 0f8056b30bd0df0959be55fc3338159b6f938456d3474aed0087fb965268d079"sv}; + + auto HordeUpstreamEndpoint = MakeHordeUpstreamEndpoint(Options, m_CasStore, m_CidStore); + m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); + m_UpstreamApply->Initialize(); + m_Router.AddPattern("job", "([[:digit:]]+)"); m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); + m_Router.AddPattern("action", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( "workers/{worker}", @@ -488,6 +502,30 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( + "jobs/{worker}/{action}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbPackage Output; + HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "jobs/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -541,7 +579,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, { // We already have everything - CbPackage Output = ExecAction(Worker, RequestObject); + CbObject Output = ExecActionUpstream(Worker, RequestObject); return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } @@ -600,7 +638,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); - CbPackage Output = ExecAction(Worker, ActionObj); + CbObject Output = ExecActionUpstream(Worker, ActionObj); return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } @@ -843,4 +881,69 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) return OutputPackage; } +CbObject +HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action) +{ + const IoHash WorkerId = Worker.Descriptor.GetHash(); + const IoHash ActionId = Action.GetHash(); + + Action.MakeOwned(); + + ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); + + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); + + if (!EnqueueResult.Success) + { + throw std::runtime_error("Error enqueuing upstream task"); + } + + CbObjectWriter Writer; + Writer.AddHash("worker", WorkerId); + Writer.AddHash("action", ActionId); + + return std::move(Writer.Save()); +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) +{ + using namespace fmt::literals; + auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); + if (!Status.Success) + { + // throw std::runtime_error("Action {}/{} not found"_format(WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); + return HttpResponseCode::NotFound; + } + + if (Status.Status.State != UpstreamApplyState::Complete) + { + return HttpResponseCode::Accepted; + } + + GetUpstreamApplyResult& Completed = Status.Status.Result; + if (!Completed.Success || Completed.Error.ErrorCode != 0) + { + ZEN_ERROR("Action {}/{} failed:\n stdout: {} \n stderr: {} \n reason: {}", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.StdOut, + Completed.StdErr, + Completed.Error.Reason); + // throw std::runtime_error( + // "Action {}/{} failed: {}"_format(WorkerId.ToHexString(), ActionId.ToHexString(), Completed.Error.Reason).c_str()); + return HttpResponseCode::NotFound; + } + + ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.OutputPackage.GetAttachments().size(), + NiceBytes(Completed.TotalAttachmentBytes), + NiceBytes(Completed.TotalRawAttachmentBytes)); + + Package = std::move(Completed.OutputPackage); + return HttpResponseCode::OK; +} + } // namespace zen |