aboutsummaryrefslogtreecommitdiff
path: root/zenserver/compute/apply.cpp
diff options
context:
space:
mode:
authorJoe Kirchoff <[email protected]>2021-11-15 15:25:17 -0800
committerJoe Kirchoff <[email protected]>2021-11-15 15:25:17 -0800
commit20874d3801c4ed142dfd5246a850452f24043512 (patch)
treee67ee13eededfd5dd9754704e3b037a6be4b981a /zenserver/compute/apply.cpp
parentHorde Apply: Pass through (diff)
downloadzen-20874d3801c4ed142dfd5246a850452f24043512.tar.xz
zen-20874d3801c4ed142dfd5246a850452f24043512.zip
Use upstream apply with Horde
Diffstat (limited to 'zenserver/compute/apply.cpp')
-rw-r--r--zenserver/compute/apply.cpp107
1 files changed, 105 insertions, 2 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 053c262c2..d2ae2febc 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -2,6 +2,8 @@
#include "apply.h"
+#include <upstream/jupiter.h>
+#include <upstream/upstreamapply.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
@@ -331,8 +333,20 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
, m_SandboxPath(BaseDir / "scratch")
, m_FunctionPath(BaseDir / "func")
{
+ m_UpstreamApply = MakeUpstreamApply({}, m_CasStore, m_CidStore);
+
+ CloudCacheClientOptions Options = {.ServiceUrl = "https://horde.devtools-dev.epicgames.com"sv,
+ .DdcNamespace = "default"sv,
+ .BlobStoreNamespace = "default"sv,
+ .AccessToken = "ServiceAccount 0f8056b30bd0df0959be55fc3338159b6f938456d3474aed0087fb965268d079"sv};
+
+ auto HordeUpstreamEndpoint = MakeHordeUpstreamEndpoint(Options, m_CasStore, m_CidStore);
+ m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
+ m_UpstreamApply->Initialize();
+
m_Router.AddPattern("job", "([[:digit:]]+)");
m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
+ m_Router.AddPattern("action", "([[:xdigit:]]{40})");
m_Router.RegisterRoute(
"workers/{worker}",
@@ -488,6 +502,30 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
HttpVerb::kGet | HttpVerb::kPost);
m_Router.RegisterRoute(
+ "jobs/{worker}/{action}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+ const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ CbPackage Output;
+ HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output);
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+ break;
+ }
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"jobs/{worker}",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
@@ -541,7 +579,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
{
// We already have everything
- CbPackage Output = ExecAction(Worker, RequestObject);
+ CbObject Output = ExecActionUpstream(Worker, RequestObject);
return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
@@ -600,7 +638,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
zen::NiceBytes(TotalNewBytes),
NewAttachmentCount);
- CbPackage Output = ExecAction(Worker, ActionObj);
+ CbObject Output = ExecActionUpstream(Worker, ActionObj);
return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
@@ -843,4 +881,69 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
return OutputPackage;
}
+CbObject
+HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action)
+{
+ const IoHash WorkerId = Worker.Descriptor.GetHash();
+ const IoHash ActionId = Action.GetHash();
+
+ Action.MakeOwned();
+
+ ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString());
+
+ auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)});
+
+ if (!EnqueueResult.Success)
+ {
+ throw std::runtime_error("Error enqueuing upstream task");
+ }
+
+ CbObjectWriter Writer;
+ Writer.AddHash("worker", WorkerId);
+ Writer.AddHash("action", ActionId);
+
+ return std::move(Writer.Save());
+}
+
+HttpResponseCode
+HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package)
+{
+ using namespace fmt::literals;
+ auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId);
+ if (!Status.Success)
+ {
+ // throw std::runtime_error("Action {}/{} not found"_format(WorkerId.ToHexString(), ActionId.ToHexString()).c_str());
+ return HttpResponseCode::NotFound;
+ }
+
+ if (Status.Status.State != UpstreamApplyState::Complete)
+ {
+ return HttpResponseCode::Accepted;
+ }
+
+ GetUpstreamApplyResult& Completed = Status.Status.Result;
+ if (!Completed.Success || Completed.Error.ErrorCode != 0)
+ {
+ ZEN_ERROR("Action {}/{} failed:\n stdout: {} \n stderr: {} \n reason: {}",
+ WorkerId.ToHexString(),
+ ActionId.ToHexString(),
+ Completed.StdOut,
+ Completed.StdErr,
+ Completed.Error.Reason);
+ // throw std::runtime_error(
+ // "Action {}/{} failed: {}"_format(WorkerId.ToHexString(), ActionId.ToHexString(), Completed.Error.Reason).c_str());
+ return HttpResponseCode::NotFound;
+ }
+
+ ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)",
+ WorkerId.ToHexString(),
+ ActionId.ToHexString(),
+ Completed.OutputPackage.GetAttachments().size(),
+ NiceBytes(Completed.TotalAttachmentBytes),
+ NiceBytes(Completed.TotalRawAttachmentBytes));
+
+ Package = std::move(Completed.OutputPackage);
+ return HttpResponseCode::OK;
+}
+
} // namespace zen