diff options
| author | Joe Kirchoff <[email protected]> | 2021-11-15 15:25:17 -0800 |
|---|---|---|
| committer | Joe Kirchoff <[email protected]> | 2021-11-15 15:25:17 -0800 |
| commit | 20874d3801c4ed142dfd5246a850452f24043512 (patch) | |
| tree | e67ee13eededfd5dd9754704e3b037a6be4b981a | |
| parent | Horde Apply: Pass through (diff) | |
| download | zen-20874d3801c4ed142dfd5246a850452f24043512.tar.xz zen-20874d3801c4ed142dfd5246a850452f24043512.zip | |
Use upstream apply with Horde
| -rw-r--r-- | zenserver/compute/apply.cpp | 107 | ||||
| -rw-r--r-- | zenserver/compute/apply.h | 20 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 27 |
3 files changed, 127 insertions, 27 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 053c262c2..d2ae2febc 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -2,6 +2,8 @@ #include "apply.h" +#include <upstream/jupiter.h> +#include <upstream/upstreamapply.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> @@ -331,8 +333,20 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, , m_SandboxPath(BaseDir / "scratch") , m_FunctionPath(BaseDir / "func") { + m_UpstreamApply = MakeUpstreamApply({}, m_CasStore, m_CidStore); + + CloudCacheClientOptions Options = {.ServiceUrl = "https://horde.devtools-dev.epicgames.com"sv, + .DdcNamespace = "default"sv, + .BlobStoreNamespace = "default"sv, + .AccessToken = "ServiceAccount 0f8056b30bd0df0959be55fc3338159b6f938456d3474aed0087fb965268d079"sv}; + + auto HordeUpstreamEndpoint = MakeHordeUpstreamEndpoint(Options, m_CasStore, m_CidStore); + m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); + m_UpstreamApply->Initialize(); + m_Router.AddPattern("job", "([[:digit:]]+)"); m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); + m_Router.AddPattern("action", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( "workers/{worker}", @@ -488,6 +502,30 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( + "jobs/{worker}/{action}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbPackage Output; + HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "jobs/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -541,7 +579,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, { // We already have everything - CbPackage Output = ExecAction(Worker, RequestObject); + CbObject Output = ExecActionUpstream(Worker, RequestObject); return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } @@ -600,7 +638,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); - CbPackage Output = ExecAction(Worker, ActionObj); + CbObject Output = ExecActionUpstream(Worker, ActionObj); return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } @@ -843,4 +881,69 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) return OutputPackage; } +CbObject +HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action) +{ + const IoHash WorkerId = Worker.Descriptor.GetHash(); + const IoHash ActionId = Action.GetHash(); + + Action.MakeOwned(); + + ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); + + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); + + if (!EnqueueResult.Success) + { + throw std::runtime_error("Error enqueuing upstream task"); + } + + CbObjectWriter Writer; + Writer.AddHash("worker", WorkerId); + Writer.AddHash("action", ActionId); + + return std::move(Writer.Save()); +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) +{ + using namespace fmt::literals; + auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); + if (!Status.Success) + { + // throw std::runtime_error("Action {}/{} not found"_format(WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); + return HttpResponseCode::NotFound; + } + + if (Status.Status.State != UpstreamApplyState::Complete) + { + return HttpResponseCode::Accepted; + } + + GetUpstreamApplyResult& Completed = Status.Status.Result; + if (!Completed.Success || Completed.Error.ErrorCode != 0) + { + ZEN_ERROR("Action {}/{} failed:\n stdout: {} \n stderr: {} \n reason: {}", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.StdOut, + Completed.StdErr, + Completed.Error.Reason); + // throw std::runtime_error( + // "Action {}/{} failed: {}"_format(WorkerId.ToHexString(), ActionId.ToHexString(), Completed.Error.Reason).c_str()); + return HttpResponseCode::NotFound; + } + + ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.OutputPackage.GetAttachments().size(), + NiceBytes(Completed.TotalAttachmentBytes), + NiceBytes(Completed.TotalRawAttachmentBytes)); + + Package = std::move(Completed.OutputPackage); + return HttpResponseCode::OK; +} + } // namespace zen diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h index 86b262213..15cda4750 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/apply.h @@ -14,6 +14,7 @@ namespace zen { class CasStore; class CidStore; +class UpstreamApply; /** * Lambda style compute function service @@ -28,14 +29,15 @@ public: virtual void HandleRequest(HttpServerRequest& Request) override; private: - spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - HttpRequestRouter m_Router; - CasStore& m_CasStore; - CidStore& m_CidStore; - std::filesystem::path m_SandboxPath; - std::filesystem::path m_FunctionPath; - std::atomic<int> m_SandboxCount{0}; + spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; + HttpRequestRouter m_Router; + CasStore& m_CasStore; + CidStore& m_CidStore; + std::filesystem::path m_SandboxPath; + std::filesystem::path m_FunctionPath; + std::atomic<int> m_SandboxCount{0}; + std::unique_ptr<UpstreamApply> m_UpstreamApply; struct WorkerDesc { @@ -44,6 +46,8 @@ private: [[nodiscard]] std::filesystem::path CreateNewSandbox(); [[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action); + [[nodiscard]] CbObject ExecActionUpstream(const WorkerDesc& Worker, CbObject Action); + [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package); RwLock m_WorkerLock; std::unordered_map<IoHash, WorkerDesc> m_WorkerMap; diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index a9b0e7fd0..038353683 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -439,8 +439,7 @@ namespace detail { return {.Error{.ErrorCode = -1, .Reason = "Task {}: {}"_format(ComputeTaskOutcomeToString(Outcome), std::string(Detail))}}; } - return { - .Error{.ErrorCode = -1, .Reason = "Task {}"_format(ComputeTaskOutcomeToString(Outcome))}}; + return {.Error{.ErrorCode = -1, .Reason = "Task {}"_format(ComputeTaskOutcomeToString(Outcome))}}; } const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment(); @@ -796,29 +795,23 @@ namespace detail { const IoHash SandboxHash = Sandbox.GetHash(); Data.Objects[SandboxHash] = std::move(Sandbox); - - std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); - CbObject Requirements; - if (HostPlatform == "Win64"sv) - { - Requirements = BuildRequirements("OSFamily == 'Windows'"sv, {}, false); - } - else { - Log().warn("process apply upstream FAILED, unsupported Host Platform '{}'", HostPlatform); - return false; + using namespace fmt::literals; + std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); + // TODO: Enable when Horde accepts the UE style Host Platforms (Win64, Linux, Mac) + //CbObject Requirements = BuildRequirements("OSFamily == '{}'"_format(HostPlatform), {}, false); + CbObject Requirements = BuildRequirements("OSFamily == 'Windows'", {}, false); + const IoHash RequirementsId = Requirements.GetHash(); + Data.Objects[RequirementsId] = std::move(Requirements); + Data.RequirementsId = RequirementsId; } - const IoHash RequirementsId = Requirements.GetHash(); - Data.Objects[RequirementsId] = std::move(Requirements); - Data.RequirementsId = RequirementsId; - CbObject Task = BuildTask(ExecutablePath, {"-Build=build.action"}, Environment, {}, SandboxHash, - RequirementsId, + Data.RequirementsId, {"Build.output", "Outputs"}); const IoHash TaskId = Task.GetHash(); |