diff options
Diffstat (limited to 'zenserver')
| -rw-r--r-- | zenserver/compute/apply.cpp | 107 | ||||
| -rw-r--r-- | zenserver/compute/apply.h | 20 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 46 |
3 files changed, 157 insertions, 16 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index 4a92b9968..9d8ac47a2 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -4,6 +4,8 @@ #if ZEN_WITH_COMPUTE_SERVICES +#include <upstream/jupiter.h> +#include <upstream/upstreamapply.h> #include <zencore/compactbinary.h> #include <zencore/compactbinarybuilder.h> #include <zencore/compactbinarypackage.h> @@ -335,8 +337,20 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, , m_SandboxPath(BaseDir / "scratch") , m_FunctionPath(BaseDir / "func") { + m_UpstreamApply = MakeUpstreamApply({}, m_CasStore, m_CidStore); + + CloudCacheClientOptions Options = {.ServiceUrl = "https://horde.devtools-dev.epicgames.com"sv, + .DdcNamespace = "default"sv, + .BlobStoreNamespace = "default"sv, + .AccessToken = "ServiceAccount 0f8056b30bd0df0959be55fc3338159b6f938456d3474aed0087fb965268d079"sv}; + + auto HordeUpstreamEndpoint = MakeHordeUpstreamEndpoint(Options, m_CasStore, m_CidStore); + m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); + m_UpstreamApply->Initialize(); + m_Router.AddPattern("job", "([[:digit:]]+)"); m_Router.AddPattern("worker", "([[:xdigit:]]{40})"); + m_Router.AddPattern("action", "([[:xdigit:]]{40})"); m_Router.RegisterRoute( "workers/{worker}", @@ -503,6 +517,30 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, HttpVerb::kGet | HttpVerb::kPost); m_Router.RegisterRoute( + "jobs/{worker}/{action}", + [this](HttpRouterRequest& Req) { + HttpServerRequest& HttpReq = Req.ServerRequest(); + const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1)); + const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2)); + + switch (HttpReq.RequestVerb()) + { + case HttpVerb::kGet: + { + CbPackage Output; + HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output); + if (ResponseCode != HttpResponseCode::OK) + { + return HttpReq.WriteResponse(ResponseCode); + } + return HttpReq.WriteResponse(HttpResponseCode::OK, Output); + } + break; + } + }, + HttpVerb::kGet); + + m_Router.RegisterRoute( "jobs/{worker}", [this](HttpRouterRequest& Req) { HttpServerRequest& HttpReq = Req.ServerRequest(); @@ -556,7 +594,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, { // We already have everything - CbPackage Output = ExecAction(Worker, RequestObject); + CbObject Output = ExecActionUpstream(Worker, RequestObject); return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } @@ -617,7 +655,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, zen::NiceBytes(TotalNewBytes), NewAttachmentCount); - CbPackage Output = ExecAction(Worker, ActionObj); + CbObject Output = ExecActionUpstream(Worker, ActionObj); return HttpReq.WriteResponse(HttpResponseCode::OK, Output); } @@ -866,6 +904,71 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action) return OutputPackage; } +CbObject +HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action) +{ + const IoHash WorkerId = Worker.Descriptor.GetHash(); + const IoHash ActionId = Action.GetHash(); + + Action.MakeOwned(); + + ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString()); + + auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)}); + + if (!EnqueueResult.Success) + { + throw std::runtime_error("Error enqueuing upstream task"); + } + + CbObjectWriter Writer; + Writer.AddHash("worker", WorkerId); + Writer.AddHash("action", ActionId); + + return std::move(Writer.Save()); +} + +HttpResponseCode +HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package) +{ + using namespace fmt::literals; + auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId); + if (!Status.Success) + { + // throw std::runtime_error("Action {}/{} not found"_format(WorkerId.ToHexString(), ActionId.ToHexString()).c_str()); + return HttpResponseCode::NotFound; + } + + if (Status.Status.State != UpstreamApplyState::Complete) + { + return HttpResponseCode::Accepted; + } + + GetUpstreamApplyResult& Completed = Status.Status.Result; + if (!Completed.Success || Completed.Error.ErrorCode != 0) + { + ZEN_ERROR("Action {}/{} failed:\n stdout: {} \n stderr: {} \n reason: {}", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.StdOut, + Completed.StdErr, + Completed.Error.Reason); + // throw std::runtime_error( + // "Action {}/{} failed: {}"_format(WorkerId.ToHexString(), ActionId.ToHexString(), Completed.Error.Reason).c_str()); + return HttpResponseCode::NotFound; + } + + ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)", + WorkerId.ToHexString(), + ActionId.ToHexString(), + Completed.OutputPackage.GetAttachments().size(), + NiceBytes(Completed.TotalAttachmentBytes), + NiceBytes(Completed.TotalRawAttachmentBytes)); + + Package = std::move(Completed.OutputPackage); + return HttpResponseCode::OK; +} + } // namespace zen #endif // ZEN_WITH_COMPUTE_SERVICES diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h index 2506e7448..a3e36819d 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/apply.h @@ -22,6 +22,7 @@ namespace zen { class CasStore; class CidStore; +class UpstreamApply; /** * Lambda style compute function service @@ -36,14 +37,15 @@ public: virtual void HandleRequest(HttpServerRequest& Request) override; private: - spdlog::logger& Log() { return m_Log; } - spdlog::logger& m_Log; - HttpRequestRouter m_Router; - CasStore& m_CasStore; - CidStore& m_CidStore; - std::filesystem::path m_SandboxPath; - std::filesystem::path m_FunctionPath; - std::atomic<int> m_SandboxCount{0}; + spdlog::logger& Log() { return m_Log; } + spdlog::logger& m_Log; + HttpRequestRouter m_Router; + CasStore& m_CasStore; + CidStore& m_CidStore; + std::filesystem::path m_SandboxPath; + std::filesystem::path m_FunctionPath; + std::atomic<int> m_SandboxCount{0}; + std::unique_ptr<UpstreamApply> m_UpstreamApply; struct WorkerDesc { @@ -52,6 +54,8 @@ private: [[nodiscard]] std::filesystem::path CreateNewSandbox(); [[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action); + [[nodiscard]] CbObject ExecActionUpstream(const WorkerDesc& Worker, CbObject Action); + [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package); RwLock m_WorkerLock; std::unordered_map<IoHash, WorkerDesc> m_WorkerMap; diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 3c67779c4..f32b08959 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -282,6 +282,28 @@ namespace detail { Exception = 6, }; + std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome) + { + switch (Outcome) + { + case ComputeTaskOutcome::Success: + return "Success"sv; + case ComputeTaskOutcome::Failed: + return "Failed"sv; + case ComputeTaskOutcome::Cancelled: + return "Cancelled"sv; + case ComputeTaskOutcome::NoResult: + return "NoResult"sv; + case ComputeTaskOutcome::Exipred: + return "Exipred"sv; + case ComputeTaskOutcome::BlobNotFound: + return "BlobNotFound"sv; + case ComputeTaskOutcome::Exception: + return "Exception"sv; + }; + return "Unknown"sv; + } + virtual GetUpstreamApplyUpdatesResult GetUpdates() override { int64_t Bytes{}; @@ -413,8 +435,14 @@ namespace detail { if (Outcome != ComputeTaskOutcome::Success) { + using namespace fmt::literals; const std::string_view Detail = TaskStatus["d"sv].AsString(); - return {.Error{.ErrorCode = -1, .Reason = std::string(Detail)}}; + if (!Detail.empty()) + { + return {.Error{.ErrorCode = -1, + .Reason = "Task {}: {}"_format(ComputeTaskOutcomeToString(Outcome), std::string(Detail))}}; + } + return {.Error{.ErrorCode = -1, .Reason = "Task {}"_format(ComputeTaskOutcomeToString(Outcome))}}; } const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment(); @@ -770,17 +798,23 @@ namespace detail { const IoHash SandboxHash = Sandbox.GetHash(); Data.Objects[SandboxHash] = std::move(Sandbox); - CbObject Requirements = BuildRequirements("OSFamily == 'Windows'"sv, {}, false); - const IoHash RequirementsId = Requirements.GetHash(); - Data.Objects[RequirementsId] = std::move(Requirements); - Data.RequirementsId = RequirementsId; + { + using namespace fmt::literals; + std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString(); + // TODO: Enable when Horde accepts the UE style Host Platforms (Win64, Linux, Mac) + //CbObject Requirements = BuildRequirements("OSFamily == '{}'"_format(HostPlatform), {}, false); + CbObject Requirements = BuildRequirements("OSFamily == 'Windows'", {}, false); + const IoHash RequirementsId = Requirements.GetHash(); + Data.Objects[RequirementsId] = std::move(Requirements); + Data.RequirementsId = RequirementsId; + } CbObject Task = BuildTask(ExecutablePath, {"-Build=build.action"}, Environment, {}, SandboxHash, - RequirementsId, + Data.RequirementsId, {"Build.output", "Outputs"}); const IoHash TaskId = Task.GetHash(); |