aboutsummaryrefslogtreecommitdiff
path: root/zenserver
diff options
context:
space:
mode:
Diffstat (limited to 'zenserver')
-rw-r--r--zenserver/compute/apply.cpp107
-rw-r--r--zenserver/compute/apply.h20
-rw-r--r--zenserver/upstream/upstreamapply.cpp46
3 files changed, 157 insertions, 16 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index 4a92b9968..9d8ac47a2 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -4,6 +4,8 @@
#if ZEN_WITH_COMPUTE_SERVICES
+#include <upstream/jupiter.h>
+#include <upstream/upstreamapply.h>
#include <zencore/compactbinary.h>
#include <zencore/compactbinarybuilder.h>
#include <zencore/compactbinarypackage.h>
@@ -335,8 +337,20 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
, m_SandboxPath(BaseDir / "scratch")
, m_FunctionPath(BaseDir / "func")
{
+ m_UpstreamApply = MakeUpstreamApply({}, m_CasStore, m_CidStore);
+
+ CloudCacheClientOptions Options = {.ServiceUrl = "https://horde.devtools-dev.epicgames.com"sv,
+ .DdcNamespace = "default"sv,
+ .BlobStoreNamespace = "default"sv,
+ .AccessToken = "ServiceAccount 0f8056b30bd0df0959be55fc3338159b6f938456d3474aed0087fb965268d079"sv};
+
+ auto HordeUpstreamEndpoint = MakeHordeUpstreamEndpoint(Options, m_CasStore, m_CidStore);
+ m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
+ m_UpstreamApply->Initialize();
+
m_Router.AddPattern("job", "([[:digit:]]+)");
m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
+ m_Router.AddPattern("action", "([[:xdigit:]]{40})");
m_Router.RegisterRoute(
"workers/{worker}",
@@ -503,6 +517,30 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
HttpVerb::kGet | HttpVerb::kPost);
m_Router.RegisterRoute(
+ "jobs/{worker}/{action}",
+ [this](HttpRouterRequest& Req) {
+ HttpServerRequest& HttpReq = Req.ServerRequest();
+ const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
+ const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
+
+ switch (HttpReq.RequestVerb())
+ {
+ case HttpVerb::kGet:
+ {
+ CbPackage Output;
+ HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output);
+ if (ResponseCode != HttpResponseCode::OK)
+ {
+ return HttpReq.WriteResponse(ResponseCode);
+ }
+ return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
+ }
+ break;
+ }
+ },
+ HttpVerb::kGet);
+
+ m_Router.RegisterRoute(
"jobs/{worker}",
[this](HttpRouterRequest& Req) {
HttpServerRequest& HttpReq = Req.ServerRequest();
@@ -556,7 +594,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
{
// We already have everything
- CbPackage Output = ExecAction(Worker, RequestObject);
+ CbObject Output = ExecActionUpstream(Worker, RequestObject);
return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
@@ -617,7 +655,7 @@ HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore,
zen::NiceBytes(TotalNewBytes),
NewAttachmentCount);
- CbPackage Output = ExecAction(Worker, ActionObj);
+ CbObject Output = ExecActionUpstream(Worker, ActionObj);
return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
}
@@ -866,6 +904,71 @@ HttpFunctionService::ExecAction(const WorkerDesc& Worker, CbObject Action)
return OutputPackage;
}
+CbObject
+HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action)
+{
+ const IoHash WorkerId = Worker.Descriptor.GetHash();
+ const IoHash ActionId = Action.GetHash();
+
+ Action.MakeOwned();
+
+ ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString());
+
+ auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action)});
+
+ if (!EnqueueResult.Success)
+ {
+ throw std::runtime_error("Error enqueuing upstream task");
+ }
+
+ CbObjectWriter Writer;
+ Writer.AddHash("worker", WorkerId);
+ Writer.AddHash("action", ActionId);
+
+ return std::move(Writer.Save());
+}
+
+HttpResponseCode
+HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package)
+{
+ using namespace fmt::literals;
+ auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId);
+ if (!Status.Success)
+ {
+ // throw std::runtime_error("Action {}/{} not found"_format(WorkerId.ToHexString(), ActionId.ToHexString()).c_str());
+ return HttpResponseCode::NotFound;
+ }
+
+ if (Status.Status.State != UpstreamApplyState::Complete)
+ {
+ return HttpResponseCode::Accepted;
+ }
+
+ GetUpstreamApplyResult& Completed = Status.Status.Result;
+ if (!Completed.Success || Completed.Error.ErrorCode != 0)
+ {
+ ZEN_ERROR("Action {}/{} failed:\n stdout: {} \n stderr: {} \n reason: {}",
+ WorkerId.ToHexString(),
+ ActionId.ToHexString(),
+ Completed.StdOut,
+ Completed.StdErr,
+ Completed.Error.Reason);
+ // throw std::runtime_error(
+ // "Action {}/{} failed: {}"_format(WorkerId.ToHexString(), ActionId.ToHexString(), Completed.Error.Reason).c_str());
+ return HttpResponseCode::NotFound;
+ }
+
+ ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)",
+ WorkerId.ToHexString(),
+ ActionId.ToHexString(),
+ Completed.OutputPackage.GetAttachments().size(),
+ NiceBytes(Completed.TotalAttachmentBytes),
+ NiceBytes(Completed.TotalRawAttachmentBytes));
+
+ Package = std::move(Completed.OutputPackage);
+ return HttpResponseCode::OK;
+}
+
} // namespace zen
#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h
index 2506e7448..a3e36819d 100644
--- a/zenserver/compute/apply.h
+++ b/zenserver/compute/apply.h
@@ -22,6 +22,7 @@ namespace zen {
class CasStore;
class CidStore;
+class UpstreamApply;
/**
* Lambda style compute function service
@@ -36,14 +37,15 @@ public:
virtual void HandleRequest(HttpServerRequest& Request) override;
private:
- spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- HttpRequestRouter m_Router;
- CasStore& m_CasStore;
- CidStore& m_CidStore;
- std::filesystem::path m_SandboxPath;
- std::filesystem::path m_FunctionPath;
- std::atomic<int> m_SandboxCount{0};
+ spdlog::logger& Log() { return m_Log; }
+ spdlog::logger& m_Log;
+ HttpRequestRouter m_Router;
+ CasStore& m_CasStore;
+ CidStore& m_CidStore;
+ std::filesystem::path m_SandboxPath;
+ std::filesystem::path m_FunctionPath;
+ std::atomic<int> m_SandboxCount{0};
+ std::unique_ptr<UpstreamApply> m_UpstreamApply;
struct WorkerDesc
{
@@ -52,6 +54,8 @@ private:
[[nodiscard]] std::filesystem::path CreateNewSandbox();
[[nodiscard]] CbPackage ExecAction(const WorkerDesc& Worker, CbObject Action);
+ [[nodiscard]] CbObject ExecActionUpstream(const WorkerDesc& Worker, CbObject Action);
+ [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package);
RwLock m_WorkerLock;
std::unordered_map<IoHash, WorkerDesc> m_WorkerMap;
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index 3c67779c4..f32b08959 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -282,6 +282,28 @@ namespace detail {
Exception = 6,
};
+ std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome)
+ {
+ switch (Outcome)
+ {
+ case ComputeTaskOutcome::Success:
+ return "Success"sv;
+ case ComputeTaskOutcome::Failed:
+ return "Failed"sv;
+ case ComputeTaskOutcome::Cancelled:
+ return "Cancelled"sv;
+ case ComputeTaskOutcome::NoResult:
+ return "NoResult"sv;
+ case ComputeTaskOutcome::Exipred:
+ return "Exipred"sv;
+ case ComputeTaskOutcome::BlobNotFound:
+ return "BlobNotFound"sv;
+ case ComputeTaskOutcome::Exception:
+ return "Exception"sv;
+ };
+ return "Unknown"sv;
+ }
+
virtual GetUpstreamApplyUpdatesResult GetUpdates() override
{
int64_t Bytes{};
@@ -413,8 +435,14 @@ namespace detail {
if (Outcome != ComputeTaskOutcome::Success)
{
+ using namespace fmt::literals;
const std::string_view Detail = TaskStatus["d"sv].AsString();
- return {.Error{.ErrorCode = -1, .Reason = std::string(Detail)}};
+ if (!Detail.empty())
+ {
+ return {.Error{.ErrorCode = -1,
+ .Reason = "Task {}: {}"_format(ComputeTaskOutcomeToString(Outcome), std::string(Detail))}};
+ }
+ return {.Error{.ErrorCode = -1, .Reason = "Task {}"_format(ComputeTaskOutcomeToString(Outcome))}};
}
const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment();
@@ -770,17 +798,23 @@ namespace detail {
const IoHash SandboxHash = Sandbox.GetHash();
Data.Objects[SandboxHash] = std::move(Sandbox);
- CbObject Requirements = BuildRequirements("OSFamily == 'Windows'"sv, {}, false);
- const IoHash RequirementsId = Requirements.GetHash();
- Data.Objects[RequirementsId] = std::move(Requirements);
- Data.RequirementsId = RequirementsId;
+ {
+ using namespace fmt::literals;
+ std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString();
+ // TODO: Enable when Horde accepts the UE style Host Platforms (Win64, Linux, Mac)
+ //CbObject Requirements = BuildRequirements("OSFamily == '{}'"_format(HostPlatform), {}, false);
+ CbObject Requirements = BuildRequirements("OSFamily == 'Windows'", {}, false);
+ const IoHash RequirementsId = Requirements.GetHash();
+ Data.Objects[RequirementsId] = std::move(Requirements);
+ Data.RequirementsId = RequirementsId;
+ }
CbObject Task = BuildTask(ExecutablePath,
{"-Build=build.action"},
Environment,
{},
SandboxHash,
- RequirementsId,
+ Data.RequirementsId,
{"Build.output", "Outputs"});
const IoHash TaskId = Task.GetHash();