aboutsummaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorStefan Boberg <[email protected]>2023-10-11 10:03:54 +0200
committerGitHub <[email protected]>2023-10-11 10:03:54 +0200
commit1ad940fafb5e3eae7b308dd290b6de6ade69a3eb (patch)
tree1d1efe188f45bc422292e75c6784929765882771 /src
parentfix clang-format whoopsie (diff)
downloadzen-1ad940fafb5e3eae7b308dd290b6de6ade69a3eb.tar.xz
zen-1ad940fafb5e3eae7b308dd290b6de6ade69a3eb.zip
remove legacy compute interfaces (#461)
* removed legacy compute code, which will be replaced with a new implementation in the future * also updated references to Jupiter storage
Diffstat (limited to 'src')
-rw-r--r--src/zenserver/compute/function.cpp629
-rw-r--r--src/zenserver/compute/function.h73
-rw-r--r--src/zenserver/config.cpp150
-rw-r--r--src/zenserver/config.h66
-rw-r--r--src/zenserver/upstream/hordecompute.cpp1457
-rw-r--r--src/zenserver/upstream/jupiter.cpp129
-rw-r--r--src/zenserver/upstream/jupiter.h3
-rw-r--r--src/zenserver/upstream/upstreamapply.cpp459
-rw-r--r--src/zenserver/upstream/upstreamapply.h192
-rw-r--r--src/zenserver/upstream/upstreamcache.cpp2
-rw-r--r--src/zenserver/zenserver.cpp79
-rw-r--r--src/zenserver/zenserver.h14
12 files changed, 41 insertions, 3212 deletions
diff --git a/src/zenserver/compute/function.cpp b/src/zenserver/compute/function.cpp
deleted file mode 100644
index 493e2666e..000000000
--- a/src/zenserver/compute/function.cpp
+++ /dev/null
@@ -1,629 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "function.h"
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include <upstream/jupiter.h>
-# include <upstream/upstreamapply.h>
-# include <upstream/upstreamcache.h>
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/compactbinarypackage.h>
-# include <zencore/compress.h>
-# include <zencore/except.h>
-# include <zencore/filesystem.h>
-# include <zencore/fmtutils.h>
-# include <zencore/iobuffer.h>
-# include <zencore/iohash.h>
-# include <zencore/scopeguard.h>
-# include <zenstore/cidstore.h>
-
-# include <span>
-
-using namespace std::literals;
-
-namespace zen {
-
-HttpFunctionService::HttpFunctionService(CidStore& InCidStore,
- const CloudCacheClientOptions& ComputeOptions,
- const CloudCacheClientOptions& StorageOptions,
- const UpstreamAuthConfig& ComputeAuthConfig,
- const UpstreamAuthConfig& StorageAuthConfig,
- AuthMgr& Mgr)
-: m_Log(logging::Get("apply"))
-, m_CidStore(InCidStore)
-{
- m_UpstreamApply = UpstreamApply::Create({}, m_CidStore);
-
- InitializeThread = std::thread{[this, ComputeOptions, StorageOptions, ComputeAuthConfig, StorageAuthConfig, &Mgr] {
- auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
- ComputeAuthConfig,
- StorageOptions,
- StorageAuthConfig,
- m_CidStore,
- Mgr);
- m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
- m_UpstreamApply->Initialize();
- }};
-
- m_Router.AddPattern("job", "([[:digit:]]+)");
- m_Router.AddPattern("worker", "([[:xdigit:]]{40})");
- m_Router.AddPattern("action", "([[:xdigit:]]{40})");
-
- m_Router.RegisterRoute(
- "ready",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- return HttpReq.WriteResponse(m_UpstreamApply->IsHealthy() ? HttpResponseCode::OK : HttpResponseCode::ServiceUnavailable);
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "workers/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- {
- RwLock::SharedLockScope _(m_WorkerLock);
-
- if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- const WorkerDesc& Desc = It->second;
- return HttpReq.WriteResponse(HttpResponseCode::OK, Desc.Descriptor);
- }
- }
- break;
-
- case HttpVerb::kPost:
- {
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- CbObject FunctionSpec = HttpReq.ReadPayloadObject();
-
- // Determine which pieces are missing and need to be transmitted to populate CAS
-
- HashKeySet ChunkSet;
-
- FunctionSpec.IterateAttachments([&](CbFieldView Field) {
- const IoHash Hash = Field.AsHash();
- ChunkSet.AddHashToSet(Hash);
- });
-
- // Note that we store executables uncompressed to make it
- // more straightforward and efficient to materialize them, hence
- // the CAS lookup here instead of CID for the input payloads
-
- m_CidStore.FilterChunks(ChunkSet);
-
- if (ChunkSet.IsEmpty())
- {
- RwLock::ExclusiveLockScope _(m_WorkerLock);
-
- m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{FunctionSpec});
-
- ZEN_DEBUG("worker {}: all attachments already available", WorkerId);
-
- return HttpReq.WriteResponse(HttpResponseCode::NoContent);
- }
- else
- {
- CbObjectWriter ResponseWriter;
- ResponseWriter.BeginArray("need");
-
- ChunkSet.IterateHashes([&](const IoHash& Hash) {
- ZEN_DEBUG("worker {}: need chunk {}", WorkerId, Hash);
-
- ResponseWriter.AddHash(Hash);
- });
-
- ResponseWriter.EndArray();
-
- ZEN_DEBUG("worker {}: need {} attachments", WorkerId, ChunkSet.GetSize());
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, ResponseWriter.Save());
- }
- }
- break;
-
- case HttpContentType::kCbPackage:
- {
- CbPackage FunctionSpec = HttpReq.ReadPayloadPackage();
-
- CbObject Obj = FunctionSpec.GetObject();
-
- std::span<const CbAttachment> Attachments = FunctionSpec.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer Buffer = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
- TotalAttachmentBytes += Buffer.GetCompressedSize();
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(Buffer.GetCompressed().Flatten().AsIoBuffer(), DataHash);
-
- if (InsertResult.New)
- {
- TotalNewBytes += Buffer.GetCompressedSize();
- ++NewAttachmentCount;
- }
- }
-
- ZEN_DEBUG("worker {}: {} in {} attachments, {} in {} new attachments",
- WorkerId,
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- RwLock::ExclusiveLockScope _(m_WorkerLock);
-
- m_WorkerMap.insert_or_assign(WorkerId, WorkerDesc{.Descriptor = Obj});
-
- return HttpReq.WriteResponse(HttpResponseCode::NoContent);
- }
- break;
-
- default:
- break;
- }
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/{job}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- break;
-
- case HttpVerb::kPost:
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/{worker}/{action}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
- const IoHash ActionId = IoHash::FromHexString(Req.GetCapture(2));
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- {
- CbPackage Output;
- HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, ActionId, Output);
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
- }
- },
- HttpVerb::kGet);
-
- m_Router.RegisterRoute(
- "simple/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- WorkerDesc Worker;
-
- {
- RwLock::SharedLockScope _(m_WorkerLock);
-
- if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- Worker = It->second;
- }
- }
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- {
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstreamResult(WorkerId, Output);
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
-
- {
- RwLock::SharedLockScope _(m_WorkerLock);
- m_WorkerMap.erase(WorkerId);
- }
-
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
-
- case HttpVerb::kPost:
- {
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstream(Worker, Output);
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kGet | HttpVerb::kPost);
-
- m_Router.RegisterRoute(
- "jobs/{worker}",
- [this](HttpRouterRequest& Req) {
- HttpServerRequest& HttpReq = Req.ServerRequest();
- const IoHash WorkerId = IoHash::FromHexString(Req.GetCapture(1));
-
- WorkerDesc Worker;
-
- {
- RwLock::SharedLockScope _(m_WorkerLock);
-
- if (auto It = m_WorkerMap.find(WorkerId); It == m_WorkerMap.end())
- {
- return HttpReq.WriteResponse(HttpResponseCode::NotFound);
- }
- else
- {
- Worker = It->second;
- }
- }
-
- switch (HttpReq.RequestVerb())
- {
- case HttpVerb::kGet:
- // TODO: return status of all pending or executing jobs
- break;
-
- case HttpVerb::kPost:
- switch (HttpReq.RequestContentType())
- {
- case HttpContentType::kCbObject:
- {
- // This operation takes the proposed job spec and identifies which
- // chunks are not present on this server. This list is then returned in
- // the "need" list in the response
-
- IoBuffer Payload = HttpReq.ReadPayload();
- CbObject RequestObject = LoadCompactBinaryObject(Payload);
-
- std::vector<IoHash> NeedList;
-
- RequestObject.IterateAttachments([&](CbFieldView Field) {
- const IoHash FileHash = Field.AsHash();
-
- if (!m_CidStore.ContainsChunk(FileHash))
- {
- NeedList.push_back(FileHash);
- }
- });
-
- if (NeedList.empty())
- {
- // We already have everything
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstream(Worker, RequestObject, Output);
-
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
-
- CbObjectWriter Cbo;
- Cbo.BeginArray("need");
-
- for (const IoHash& Hash : NeedList)
- {
- Cbo << Hash;
- }
-
- Cbo.EndArray();
- CbObject Response = Cbo.Save();
-
- return HttpReq.WriteResponse(HttpResponseCode::NotFound, Response);
- }
- break;
-
- case HttpContentType::kCbPackage:
- {
- CbPackage Action = HttpReq.ReadPayloadPackage();
- CbObject ActionObj = Action.GetObject();
-
- std::span<const CbAttachment> Attachments = Action.GetAttachments();
-
- int AttachmentCount = 0;
- int NewAttachmentCount = 0;
- uint64_t TotalAttachmentBytes = 0;
- uint64_t TotalNewBytes = 0;
-
- for (const CbAttachment& Attachment : Attachments)
- {
- ZEN_ASSERT(Attachment.IsCompressedBinary());
-
- const IoHash DataHash = Attachment.GetHash();
- CompressedBuffer DataView = Attachment.AsCompressedBinary();
-
- ZEN_UNUSED(DataHash);
-
- const uint64_t CompressedSize = DataView.GetCompressedSize();
-
- TotalAttachmentBytes += CompressedSize;
- ++AttachmentCount;
-
- const CidStore::InsertResult InsertResult =
- m_CidStore.AddChunk(DataView.GetCompressed().Flatten().AsIoBuffer(), DataHash);
-
- if (InsertResult.New)
- {
- TotalNewBytes += CompressedSize;
- ++NewAttachmentCount;
- }
- }
-
- ZEN_DEBUG("new action: {} in {} attachments. {} new ({} attachments)",
- zen::NiceBytes(TotalAttachmentBytes),
- AttachmentCount,
- zen::NiceBytes(TotalNewBytes),
- NewAttachmentCount);
-
- CbObject Output;
- HttpResponseCode ResponseCode = ExecActionUpstream(Worker, ActionObj, Output);
-
- if (ResponseCode != HttpResponseCode::OK)
- {
- return HttpReq.WriteResponse(ResponseCode);
- }
- return HttpReq.WriteResponse(HttpResponseCode::OK, Output);
- }
- break;
-
- default:
- break;
- }
- break;
-
- default:
- break;
- }
- },
- HttpVerb::kPost);
-}
-
-HttpFunctionService::~HttpFunctionService()
-{
-}
-
-const char*
-HttpFunctionService::BaseUri() const
-{
- return "/apply/";
-}
-
-void
-HttpFunctionService::HandleRequest(HttpServerRequest& Request)
-{
- if (m_Router.HandleRequest(Request) == false)
- {
- ZEN_WARN("No route found for {0}", Request.RelativeUri());
- }
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object)
-{
- const IoHash WorkerId = Worker.Descriptor.GetHash();
-
- ZEN_INFO("Action {} being processed...", WorkerId.ToHexString());
-
- auto EnqueueResult = m_UpstreamApply->EnqueueUpstream({.WorkerDescriptor = Worker.Descriptor, .Type = UpstreamApplyType::Simple});
- if (!EnqueueResult.Success)
- {
- ZEN_ERROR("Error enqueuing upstream Action {}", WorkerId.ToHexString());
- return HttpResponseCode::InternalServerError;
- }
-
- CbObjectWriter Writer;
- Writer.AddHash("worker", WorkerId);
-
- Object = Writer.Save();
- return HttpResponseCode::OK;
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object)
-{
- const static IoHash Empty = CbObject().GetHash();
- auto Status = m_UpstreamApply->GetStatus(WorkerId, Empty);
- if (!Status.Success)
- {
- return HttpResponseCode::NotFound;
- }
-
- if (Status.Status.State != UpstreamApplyState::Complete)
- {
- return HttpResponseCode::Accepted;
- }
-
- GetUpstreamApplyResult& Completed = Status.Status.Result;
-
- if (!Completed.Success)
- {
- ZEN_ERROR("Action {} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}",
- WorkerId.ToHexString(),
- Completed.StdOut,
- Completed.StdErr,
- Completed.Error.Reason,
- Completed.Error.ErrorCode);
-
- if (Completed.Error.ErrorCode == 0)
- {
- Completed.Error.ErrorCode = -1;
- }
- if (Completed.StdErr.empty() && !Completed.Error.Reason.empty())
- {
- Completed.StdErr = Completed.Error.Reason;
- }
- }
- else
- {
- ZEN_INFO("Action {} completed with {} files ExitCode={}",
- WorkerId.ToHexString(),
- Completed.OutputFiles.size(),
- Completed.Error.ErrorCode);
- }
-
- CbObjectWriter ResultObject;
-
- ResultObject.AddString("agent"sv, Completed.Agent);
- ResultObject.AddString("detail"sv, Completed.Detail);
- ResultObject.AddString("stdout"sv, Completed.StdOut);
- ResultObject.AddString("stderr"sv, Completed.StdErr);
- ResultObject.AddInteger("exitcode"sv, Completed.Error.ErrorCode);
- ResultObject.BeginArray("stats"sv);
- for (const auto& Timepoint : Completed.Timepoints)
- {
- ResultObject.BeginObject();
- ResultObject.AddString("name"sv, Timepoint.first);
- ResultObject.AddDateTimeTicks("time"sv, Timepoint.second);
- ResultObject.EndObject();
- }
- ResultObject.EndArray();
-
- ResultObject.BeginArray("files"sv);
- for (const auto& File : Completed.OutputFiles)
- {
- ResultObject.BeginObject();
- ResultObject.AddString("name"sv, File.first.string());
- ResultObject.AddBinary("data"sv, Completed.FileData[File.second]);
- ResultObject.EndObject();
- }
- ResultObject.EndArray();
-
- Object = ResultObject.Save();
- return HttpResponseCode::OK;
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object)
-{
- const IoHash WorkerId = Worker.Descriptor.GetHash();
- const IoHash ActionId = Action.GetHash();
-
- Action.MakeOwned();
-
- ZEN_INFO("Action {}/{} being processed...", WorkerId.ToHexString(), ActionId.ToHexString());
-
- auto EnqueueResult = m_UpstreamApply->EnqueueUpstream(
- {.WorkerDescriptor = Worker.Descriptor, .Action = std::move(Action), .Type = UpstreamApplyType::Asset});
-
- if (!EnqueueResult.Success)
- {
- ZEN_ERROR("Error enqueuing upstream Action {}/{}", WorkerId.ToHexString(), ActionId.ToHexString());
- return HttpResponseCode::InternalServerError;
- }
-
- CbObjectWriter Writer;
- Writer.AddHash("worker", WorkerId);
- Writer.AddHash("action", ActionId);
-
- Object = Writer.Save();
- return HttpResponseCode::OK;
-}
-
-HttpResponseCode
-HttpFunctionService::ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package)
-{
- auto Status = m_UpstreamApply->GetStatus(WorkerId, ActionId);
- if (!Status.Success)
- {
- return HttpResponseCode::NotFound;
- }
-
- if (Status.Status.State != UpstreamApplyState::Complete)
- {
- return HttpResponseCode::Accepted;
- }
-
- GetUpstreamApplyResult& Completed = Status.Status.Result;
- if (!Completed.Success || Completed.Error.ErrorCode != 0)
- {
- ZEN_ERROR("Action {}/{} failed:\n stdout: {}\n stderr: {}\n reason: {}\n errorcode: {}",
- WorkerId.ToHexString(),
- ActionId.ToHexString(),
- Completed.StdOut,
- Completed.StdErr,
- Completed.Error.Reason,
- Completed.Error.ErrorCode);
-
- return HttpResponseCode::InternalServerError;
- }
-
- ZEN_INFO("Action {}/{} completed with {} attachments ({} compressed, {} uncompressed)",
- WorkerId.ToHexString(),
- ActionId.ToHexString(),
- Completed.OutputPackage.GetAttachments().size(),
- NiceBytes(Completed.TotalAttachmentBytes),
- NiceBytes(Completed.TotalRawAttachmentBytes));
-
- Package = std::move(Completed.OutputPackage);
- return HttpResponseCode::OK;
-}
-
-} // namespace zen
-
-#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/compute/function.h b/src/zenserver/compute/function.h
deleted file mode 100644
index 650cee757..000000000
--- a/src/zenserver/compute/function.h
+++ /dev/null
@@ -1,73 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#include <zencore/zencore.h>
-
-#if !defined(ZEN_WITH_COMPUTE_SERVICES)
-# define ZEN_WITH_COMPUTE_SERVICES 1
-#endif
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include <zencore/compactbinary.h>
-# include <zencore/iohash.h>
-# include <zencore/logging.h>
-# include <zenhttp/httpserver.h>
-
-# include <filesystem>
-# include <unordered_map>
-
-namespace zen {
-
-class CidStore;
-class UpstreamApply;
-class CloudCacheClient;
-class AuthMgr;
-
-struct UpstreamAuthConfig;
-struct CloudCacheClientOptions;
-
-/**
- * Lambda style compute function service
- */
-class HttpFunctionService : public HttpService
-{
-public:
- HttpFunctionService(CidStore& InCidStore,
- const CloudCacheClientOptions& ComputeOptions,
- const CloudCacheClientOptions& StorageOptions,
- const UpstreamAuthConfig& ComputeAuthConfig,
- const UpstreamAuthConfig& StorageAuthConfig,
- AuthMgr& Mgr);
- ~HttpFunctionService();
-
- virtual const char* BaseUri() const override;
- virtual void HandleRequest(HttpServerRequest& Request) override;
-
-private:
- std::thread InitializeThread;
- spdlog::logger& Log() { return m_Log; }
- spdlog::logger& m_Log;
- HttpRequestRouter m_Router;
- CidStore& m_CidStore;
- std::unique_ptr<UpstreamApply> m_UpstreamApply;
-
- struct WorkerDesc
- {
- CbObject Descriptor;
- };
-
- [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject& Object);
- [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, CbObject& Object);
-
- [[nodiscard]] HttpResponseCode ExecActionUpstream(const WorkerDesc& Worker, CbObject Action, CbObject& Object);
- [[nodiscard]] HttpResponseCode ExecActionUpstreamResult(const IoHash& WorkerId, const IoHash& ActionId, CbPackage& Package);
-
- RwLock m_WorkerLock;
- std::unordered_map<IoHash, WorkerDesc> m_WorkerMap;
-};
-
-} // namespace zen
-
-#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/config.cpp b/src/zenserver/config.cpp
index 342f41b68..85db7bade 100644
--- a/src/zenserver/config.cpp
+++ b/src/zenserver/config.cpp
@@ -855,57 +855,6 @@ ParseConfigFile(const std::filesystem::path& Path,
LuaOptions.AddOption("cache.upstream.zen.dns"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Dns);
LuaOptions.AddOption("cache.upstream.zen.url"sv, ServerOptions.UpstreamCacheConfig.ZenConfig.Urls);
- ////// exec
- LuaOptions.AddOption("exec.enable"sv, ServerOptions.ExecServiceEnabled);
-
- ////// compute
- LuaOptions.AddOption("compute.enable"sv, ServerOptions.ComputeServiceEnabled);
-
- LuaOptions.AddOption("compute.upstream.horde.name"sv, ServerOptions.UpstreamCacheConfig.HordeConfig.Name);
- LuaOptions.AddOption("compute.upstream.horde.url"sv, ServerOptions.UpstreamCacheConfig.HordeConfig.Url, "upstream-horde-url"sv);
- LuaOptions.AddOption("compute.upstream.horde.oauthprovider"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthUrl,
- "upstream-horde-oauth-url"sv);
- LuaOptions.AddOption("compute.upstream.horde.oauthclientid"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientId,
- "upstream-horde-oauth-clientid"sv);
- LuaOptions.AddOption("compute.upstream.horde.oauthclientsecret"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientSecret,
- "upstream-horde-oauth-clientsecret"sv);
- LuaOptions.AddOption("compute.upstream.horde.openidprovider"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.OpenIdProvider,
- "upstream-horde-openid-provider"sv);
- LuaOptions.AddOption("compute.upstream.horde.token"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.AccessToken,
- "upstream-horde-token"sv);
- LuaOptions.AddOption("compute.upstream.horde.cluster"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster,
- "upstream-horde-cluster"sv);
- LuaOptions.AddOption("compute.upstream.horde.namespace"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace,
- "upstream-horde-namespace"sv);
-
- ////// compute storage
- LuaOptions.AddOption("compute.upstream.storage.name"sv, ServerOptions.UpstreamCacheConfig.HordeConfig.Name);
- LuaOptions.AddOption("compute.upstream.storage.url"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.Url,
- "upstream-horde-storage-oauth-url"sv);
- LuaOptions.AddOption("compute.upstream.storage.oauthprovider"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthUrl,
- "upstream-horde-storage-oauth-url"sv);
- LuaOptions.AddOption("compute.upstream.storage.oauthclientid"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientId,
- "upstream-horde-storage-oauth-clientid"sv);
- LuaOptions.AddOption("compute.upstream.storage.oauthclientsecret"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientSecret,
- "upstream-horde-storage-oauth-clientsecret"sv);
- LuaOptions.AddOption("compute.upstream.storage.openidprovider"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOpenIdProvider,
- "upstream-horde-storage-openid-provider"sv);
- LuaOptions.AddOption("compute.upstream.storage.token"sv,
- ServerOptions.UpstreamCacheConfig.HordeConfig.StorageAccessToken,
- "upstream-horde-storage-token"sv);
-
LuaOptions.AddOption("gc.enabled"sv, ServerOptions.GcConfig.Enabled, "gc-enabled"sv);
LuaOptions.AddOption("gc.monitorintervalseconds"sv, ServerOptions.GcConfig.MonitorIntervalSeconds, "gc-monitor-interval-seconds"sv);
@@ -1208,13 +1157,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<int32_t>(ServerOptions.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"),
"");
- options.add_option("compute",
- "",
- "upstream-horde-url",
- "URL to a Horde instance.",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Url)->default_value(""),
- "");
-
options.add_option("cache",
"",
"cache-write-log",
@@ -1257,98 +1199,6 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<uint64_t>(ServerOptions.StructuredCacheConfig.MemMaxAgeSeconds)->default_value("86400"),
"");
- options.add_option("compute",
- "",
- "upstream-horde-oauth-url",
- "URL to the OAuth provier",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthUrl)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-oauth-clientid",
- "The OAuth client ID",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientId)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-oauth-clientsecret",
- "The OAuth client secret",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientSecret)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-openid-provider",
- "Name of a registered Open ID provider",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OpenIdProvider)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-token",
- "A static authentication token",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.AccessToken)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-storage-url",
- "URL to a Horde Storage instance.",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageUrl)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-storage-oauth-url",
- "URL to the OAuth provier",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthUrl)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-storage-oauth-clientid",
- "The OAuth client ID",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientId)->default_value(""),
- "");
-
- options.add_option(
- "compute",
- "",
- "upstream-horde-storage-oauth-clientsecret",
- "The OAuth client secret",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOAuthClientSecret)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-storage-openid-provider",
- "Name of a registered Open ID provider",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageOpenIdProvider)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-storage-token",
- "A static authentication token",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.StorageAccessToken)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-cluster",
- "The Horde compute cluster id",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster)->default_value(""),
- "");
-
- options.add_option("compute",
- "",
- "upstream-horde-namespace",
- "The Jupiter namespace to use with Horde compute",
- cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace)->default_value(""),
- "");
-
options.add_option("gc",
"",
"gc-enabled",
diff --git a/src/zenserver/config.h b/src/zenserver/config.h
index ee57b23cc..d3b20218f 100644
--- a/src/zenserver/config.h
+++ b/src/zenserver/config.h
@@ -23,27 +23,6 @@ struct ZenUpstreamJupiterConfig
std::string DdcNamespace;
};
-struct ZenUpstreamHordeConfig
-{
- std::string Name;
- std::string Url;
- std::string OAuthUrl;
- std::string OAuthClientId;
- std::string OAuthClientSecret;
- std::string OpenIdProvider;
- std::string AccessToken;
-
- std::string StorageUrl;
- std::string StorageOAuthUrl;
- std::string StorageOAuthClientId;
- std::string StorageOAuthClientSecret;
- std::string StorageOpenIdProvider;
- std::string StorageAccessToken;
-
- std::string Cluster;
- std::string Namespace;
-};
-
struct ZenUpstreamZenConfig
{
std::string Name;
@@ -62,7 +41,6 @@ enum class UpstreamCachePolicy : uint8_t
struct ZenUpstreamCacheConfig
{
ZenUpstreamJupiterConfig JupiterConfig;
- ZenUpstreamHordeConfig HordeConfig;
ZenUpstreamZenConfig ZenConfig;
int32_t UpstreamThreadCount = 4;
int32_t ConnectTimeoutMilliseconds = 5000;
@@ -137,29 +115,27 @@ struct ZenServerOptions
ZenObjectStoreConfig ObjectStoreConfig;
zen::HttpServerConfig HttpServerConfig;
ZenStructuredCacheConfig StructuredCacheConfig;
- std::filesystem::path DataDir; // Root directory for state (used for testing)
- std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
- std::filesystem::path AbsLogFile; // Absolute path to main log file
- std::filesystem::path ConfigFile; // Path to Lua config file
- std::string ChildId; // Id assigned by parent process (used for lifetime management)
- std::string LogId; // Id for tagging log output
- std::string EncryptionKey; // 256 bit AES encryption key
- std::string EncryptionIV; // 128 bit AES initialization vector
- int BasePort = 1337; // Service listen port (used for both UDP and TCP)
- int OwnerPid = 0; // Parent process id (zero for standalone)
- bool InstallService = false; // Flag used to initiate service install (temporary)
- bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
- bool IsDebug = false;
- bool IsTest = false;
- bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
- bool ExecServiceEnabled = true;
- bool ComputeServiceEnabled = true;
- bool ShouldCrash = false; // Option for testing crash handling
- bool IsFirstRun = false;
- bool NoSentry = false;
- bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports
- bool ObjectStoreEnabled = false;
- bool NoConsoleOutput = false; // Control default use of stdout for diagnostics
+ std::filesystem::path DataDir; // Root directory for state (used for testing)
+ std::filesystem::path ContentDir; // Root directory for serving frontend content (experimental)
+ std::filesystem::path AbsLogFile; // Absolute path to main log file
+ std::filesystem::path ConfigFile; // Path to Lua config file
+ std::string ChildId; // Id assigned by parent process (used for lifetime management)
+ std::string LogId; // Id for tagging log output
+ std::string EncryptionKey; // 256 bit AES encryption key
+ std::string EncryptionIV; // 128 bit AES initialization vector
+ int BasePort = 1337; // Service listen port (used for both UDP and TCP)
+ int OwnerPid = 0; // Parent process id (zero for standalone)
+ bool InstallService = false; // Flag used to initiate service install (temporary)
+ bool UninstallService = false; // Flag used to initiate service uninstall (temporary)
+ bool IsDebug = false;
+ bool IsTest = false;
+ bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
+ bool ShouldCrash = false; // Option for testing crash handling
+ bool IsFirstRun = false;
+ bool NoSentry = false;
+ bool SentryAllowPII = false; // Allow personally identifiable information in sentry crash reports
+ bool ObjectStoreEnabled = false;
+ bool NoConsoleOutput = false; // Control default use of stdout for diagnostics
#if ZEN_WITH_TRACE
std::string TraceHost; // Host name or IP address to send trace data to
std::string TraceFile; // Path of a file to write a trace
diff --git a/src/zenserver/upstream/hordecompute.cpp b/src/zenserver/upstream/hordecompute.cpp
deleted file mode 100644
index cf921eaad..000000000
--- a/src/zenserver/upstream/hordecompute.cpp
+++ /dev/null
@@ -1,1457 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "upstreamapply.h"
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include "jupiter.h"
-
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/compactbinarypackage.h>
-# include <zencore/compactbinaryvalidation.h>
-# include <zencore/fmtutils.h>
-# include <zencore/session.h>
-# include <zencore/stream.h>
-# include <zencore/thread.h>
-# include <zencore/timer.h>
-# include <zencore/workthreadpool.h>
-
-# include <zenstore/cidstore.h>
-
-# include <zenhttp/auth/authmgr.h>
-# include <upstream/upstreamcache.h>
-
-# include "cache/structuredcachestore.h"
-# include "diag/logging.h"
-
-# include <fmt/format.h>
-
-# include <algorithm>
-# include <atomic>
-# include <set>
-# include <stack>
-
-namespace zen {
-
-using namespace std::literals;
-
-static const IoBuffer EmptyBuffer;
-static const IoHash EmptyBufferId = IoHash::HashBuffer(EmptyBuffer);
-
-namespace detail {
-
- class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint
- {
- public:
- HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions,
- const UpstreamAuthConfig& ComputeAuthConfig,
- const CloudCacheClientOptions& StorageOptions,
- const UpstreamAuthConfig& StorageAuthConfig,
- CidStore& CidStore,
- AuthMgr& Mgr)
- : m_Log(logging::Get("upstream-apply"))
- , m_CidStore(CidStore)
- , m_AuthMgr(Mgr)
- {
- m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl);
- m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString());
-
- {
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
-
- if (ComputeAuthConfig.OAuthUrl.empty() == false)
- {
- TokenProvider =
- CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl,
- .ClientId = ComputeAuthConfig.OAuthClientId,
- .ClientSecret = ComputeAuthConfig.OAuthClientSecret});
- }
- else if (ComputeAuthConfig.OpenIdProvider.empty() == false)
- {
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() {
- AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
- }
- else
- {
- CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken),
- .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
- TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
- }
-
- m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider));
- }
-
- {
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
-
- if (StorageAuthConfig.OAuthUrl.empty() == false)
- {
- TokenProvider =
- CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl,
- .ClientId = StorageAuthConfig.OAuthClientId,
- .ClientSecret = StorageAuthConfig.OAuthClientSecret});
- }
- else if (StorageAuthConfig.OpenIdProvider.empty() == false)
- {
- TokenProvider =
- CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() {
- AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
- return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
- });
- }
- else
- {
- CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken),
- .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
- TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
- }
-
- m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider));
- }
- }
-
- virtual ~HordeUpstreamApplyEndpoint() = default;
-
- virtual UpstreamEndpointHealth Initialize() override { return CheckHealth(); }
-
- virtual bool IsHealthy() const override { return m_HealthOk.load(); }
-
- virtual UpstreamEndpointHealth CheckHealth() override
- {
- try
- {
- CloudCacheSession Session(m_Client);
- CloudCacheResult Result = Session.Authenticate();
-
- m_HealthOk = Result.ErrorCode == 0;
-
- return {.Reason = std::move(Result.Reason), .Ok = Result.Success};
- }
- catch (std::exception& Err)
- {
- return {.Reason = Err.what(), .Ok = false};
- }
- }
-
- virtual std::string_view DisplayName() const override { return m_DisplayName; }
-
- virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) override
- {
- PostUpstreamApplyResult ApplyResult{};
- ApplyResult.Timepoints.merge(ApplyRecord.Timepoints);
-
- try
- {
- UpstreamData UpstreamData;
- if (!ProcessApplyKey(ApplyRecord, UpstreamData))
- {
- return {.Error{.ErrorCode = -1, .Reason = "Failed to generate task data"}};
- }
-
- {
- ApplyResult.Timepoints["zen-storage-build-ref"] = DateTime::NowTicks();
-
- bool AlreadyQueued;
- {
- std::scoped_lock Lock(m_TaskMutex);
- AlreadyQueued = m_PendingTasks.contains(UpstreamData.TaskId);
- }
- if (AlreadyQueued)
- {
- // Pending task is already queued, return success
- ApplyResult.Success = true;
- return ApplyResult;
- }
- m_PendingTasks[UpstreamData.TaskId] = std::move(ApplyRecord);
- }
-
- CloudCacheSession ComputeSession(m_Client);
- CloudCacheSession StorageSession(m_StorageClient);
-
- {
- CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs, UpstreamData.CasIds);
- ApplyResult.Bytes += Result.Bytes;
- ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
- ApplyResult.Timepoints["zen-storage-upload-blobs"] = DateTime::NowTicks();
- if (!Result.Success)
- {
- ApplyResult.Error = {.ErrorCode = Result.ErrorCode,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload blobs"};
- return ApplyResult;
- }
- UpstreamData.Blobs.clear();
- UpstreamData.CasIds.clear();
- }
-
- {
- CloudCacheResult Result = BatchPutCompressedBlobsIfMissing(StorageSession, UpstreamData.Cids);
- ApplyResult.Bytes += Result.Bytes;
- ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
- ApplyResult.Timepoints["zen-storage-upload-compressed-blobs"] = DateTime::NowTicks();
- if (!Result.Success)
- {
- ApplyResult.Error = {
- .ErrorCode = Result.ErrorCode,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload compressed blobs"};
- return ApplyResult;
- }
- UpstreamData.Cids.clear();
- }
-
- {
- CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects);
- ApplyResult.Bytes += Result.Bytes;
- ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
- ApplyResult.Timepoints["zen-storage-upload-objects"] = DateTime::NowTicks();
- if (!Result.Success)
- {
- ApplyResult.Error = {.ErrorCode = Result.ErrorCode,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to upload objects"};
- return ApplyResult;
- }
- }
-
- {
- PutRefResult RefResult = StorageSession.PutRef(StorageSession.Client().DefaultBlobStoreNamespace(),
- "requests"sv,
- UpstreamData.TaskId,
- UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(),
- ZenContentType::kCbObject);
- Log().debug("Put ref {} Need={} Bytes={} Duration={}s Result={}",
- UpstreamData.TaskId,
- RefResult.Needs.size(),
- RefResult.Bytes,
- RefResult.ElapsedSeconds,
- RefResult.Success);
- ApplyResult.Bytes += RefResult.Bytes;
- ApplyResult.ElapsedSeconds += RefResult.ElapsedSeconds;
- ApplyResult.Timepoints["zen-storage-put-ref"] = DateTime::NowTicks();
-
- if (RefResult.Needs.size() > 0)
- {
- Log().error("Failed to add task ref {} due to {} missing blobs", UpstreamData.TaskId, RefResult.Needs.size());
- for (const auto& Hash : RefResult.Needs)
- {
- Log().debug("Task ref {} missing blob {}", UpstreamData.TaskId, Hash);
- }
-
- ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode,
- .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason)
- : "Failed to add task ref due to missing blob"};
- return ApplyResult;
- }
-
- if (!RefResult.Success)
- {
- ApplyResult.Error = {.ErrorCode = RefResult.ErrorCode,
- .Reason = !RefResult.Reason.empty() ? std::move(RefResult.Reason) : "Failed to add task ref"};
- return ApplyResult;
- }
- UpstreamData.Objects.clear();
- }
-
- {
- CbObjectWriter Writer;
- Writer.AddString("c"sv, m_ChannelId);
- Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId);
- Writer.BeginArray("t"sv);
- Writer.AddObjectAttachment(UpstreamData.TaskId);
- Writer.EndArray();
- CbObject TasksObject = Writer.Save();
- IoBuffer TasksData = TasksObject.GetBuffer().AsIoBuffer();
-
- CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData);
- Log().debug("Post compute task {} Bytes={} Duration={}s Result={}",
- TasksObject.GetHash(),
- Result.Bytes,
- Result.ElapsedSeconds,
- Result.Success);
- ApplyResult.Bytes += Result.Bytes;
- ApplyResult.ElapsedSeconds += Result.ElapsedSeconds;
- ApplyResult.Timepoints["zen-horde-post-task"] = DateTime::NowTicks();
- if (!Result.Success)
- {
- {
- std::scoped_lock Lock(m_TaskMutex);
- m_PendingTasks.erase(UpstreamData.TaskId);
- }
-
- ApplyResult.Error = {.ErrorCode = Result.ErrorCode,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to post compute task"};
- return ApplyResult;
- }
- }
-
- Log().info("Task posted {}", UpstreamData.TaskId);
- ApplyResult.Success = true;
- return ApplyResult;
- }
- catch (std::exception& Err)
- {
- m_HealthOk = false;
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
- }
- }
-
- [[nodiscard]] CloudCacheResult BatchPutBlobsIfMissing(CloudCacheSession& Session,
- const std::map<IoHash, IoBuffer>& Blobs,
- const std::set<IoHash>& CasIds)
- {
- if (Blobs.size() == 0 && CasIds.size() == 0)
- {
- return {.Success = true};
- }
-
- int64_t Bytes{};
- double ElapsedSeconds{};
-
- // Batch check for missing blobs
- std::set<IoHash> Keys;
- std::transform(Blobs.begin(), Blobs.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
- Keys.insert(CasIds.begin(), CasIds.end());
-
- CloudCacheExistsResult ExistsResult = Session.BlobExists(Session.Client().DefaultBlobStoreNamespace(), Keys);
- Log().debug("Queried {} missing blobs Need={} Duration={}s Result={}",
- Keys.size(),
- ExistsResult.Needs.size(),
- ExistsResult.ElapsedSeconds,
- ExistsResult.Success);
- ElapsedSeconds += ExistsResult.ElapsedSeconds;
- if (!ExistsResult.Success)
- {
- return {.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
- .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if blobs exist"};
- }
-
- for (const auto& Hash : ExistsResult.Needs)
- {
- IoBuffer DataBuffer;
- if (Blobs.contains(Hash))
- {
- DataBuffer = Blobs.at(Hash);
- }
- else
- {
- DataBuffer = m_CidStore.FindChunkByCid(Hash);
- if (!DataBuffer)
- {
- Log().warn("Put blob FAILED, input chunk '{}' missing", Hash);
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put blobs"};
- }
- }
-
- CloudCacheResult Result = Session.PutBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer);
- Log().debug("Put blob {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
- if (!Result.Success)
- {
- return {.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"};
- }
- }
-
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
- }
-
- [[nodiscard]] CloudCacheResult BatchPutCompressedBlobsIfMissing(CloudCacheSession& Session, const std::set<IoHash>& Cids)
- {
- if (Cids.size() == 0)
- {
- return {.Success = true};
- }
-
- int64_t Bytes{};
- double ElapsedSeconds{};
-
- // Batch check for missing compressed blobs
- CloudCacheExistsResult ExistsResult = Session.CompressedBlobExists(Session.Client().DefaultBlobStoreNamespace(), Cids);
- Log().debug("Queried {} missing compressed blobs Need={} Duration={}s Result={}",
- Cids.size(),
- ExistsResult.Needs.size(),
- ExistsResult.ElapsedSeconds,
- ExistsResult.Success);
- ElapsedSeconds += ExistsResult.ElapsedSeconds;
- if (!ExistsResult.Success)
- {
- return {
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
- .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if compressed blobs exist"};
- }
-
- for (const auto& Hash : ExistsResult.Needs)
- {
- IoBuffer DataBuffer = m_CidStore.FindChunkByCid(Hash);
- if (!DataBuffer)
- {
- Log().warn("Put compressed blob FAILED, input CID chunk '{}' missing", Hash);
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .ErrorCode = -1, .Reason = "Failed to put compressed blobs"};
- }
-
- CloudCacheResult Result = Session.PutCompressedBlob(Session.Client().DefaultBlobStoreNamespace(), Hash, DataBuffer);
- Log().debug("Put compressed blob {} Bytes={} Duration={}s Result={}",
- Hash,
- Result.Bytes,
- Result.ElapsedSeconds,
- Result.Success);
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
- if (!Result.Success)
- {
- return {.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put compressed blobs"};
- }
- }
-
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
- }
-
- [[nodiscard]] CloudCacheResult BatchPutObjectsIfMissing(CloudCacheSession& Session, const std::map<IoHash, CbObject>& Objects)
- {
- if (Objects.size() == 0)
- {
- return {.Success = true};
- }
-
- int64_t Bytes{};
- double ElapsedSeconds{};
-
- // Batch check for missing objects
- std::set<IoHash> Keys;
- std::transform(Objects.begin(), Objects.end(), std::inserter(Keys, Keys.end()), [](const auto& It) { return It.first; });
-
- CloudCacheExistsResult ExistsResult = Session.ObjectExists(Session.Client().DefaultBlobStoreNamespace(), Keys);
- Log().debug("Queried {} missing objects Need={} Duration={}s Result={}",
- Keys.size(),
- ExistsResult.Needs.size(),
- ExistsResult.ElapsedSeconds,
- ExistsResult.Success);
- ElapsedSeconds += ExistsResult.ElapsedSeconds;
- if (!ExistsResult.Success)
- {
- return {.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
- .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to check if objects exist"};
- }
-
- for (const auto& Hash : ExistsResult.Needs)
- {
- CloudCacheResult Result =
- Session.PutObject(Session.Client().DefaultBlobStoreNamespace(), Hash, Objects.at(Hash).GetBuffer().AsIoBuffer());
- Log().debug("Put object {} Bytes={} Duration={}s Result={}", Hash, Result.Bytes, Result.ElapsedSeconds, Result.Success);
- Bytes += Result.Bytes;
- ElapsedSeconds += Result.ElapsedSeconds;
- if (!Result.Success)
- {
- return {.Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
- .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"};
- }
- }
-
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
- }
-
- enum class ComputeTaskState : int32_t
- {
- Queued = 0,
- Executing = 1,
- Complete = 2,
- };
-
- enum class ComputeTaskOutcome : int32_t
- {
- Success = 0,
- Failed = 1,
- Cancelled = 2,
- NoResult = 3,
- Exipred = 4,
- BlobNotFound = 5,
- Exception = 6,
- };
-
- [[nodiscard]] static std::string_view ComputeTaskStateToString(const ComputeTaskState Outcome)
- {
- switch (Outcome)
- {
- case ComputeTaskState::Queued:
- return "Queued"sv;
- case ComputeTaskState::Executing:
- return "Executing"sv;
- case ComputeTaskState::Complete:
- return "Complete"sv;
- };
- return "Unknown"sv;
- }
-
- [[nodiscard]] static std::string_view ComputeTaskOutcomeToString(const ComputeTaskOutcome Outcome)
- {
- switch (Outcome)
- {
- case ComputeTaskOutcome::Success:
- return "Success"sv;
- case ComputeTaskOutcome::Failed:
- return "Failed"sv;
- case ComputeTaskOutcome::Cancelled:
- return "Cancelled"sv;
- case ComputeTaskOutcome::NoResult:
- return "NoResult"sv;
- case ComputeTaskOutcome::Exipred:
- return "Exipred"sv;
- case ComputeTaskOutcome::BlobNotFound:
- return "BlobNotFound"sv;
- case ComputeTaskOutcome::Exception:
- return "Exception"sv;
- };
- return "Unknown"sv;
- }
-
- virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) override
- {
- int64_t Bytes{};
- double ElapsedSeconds{};
-
- {
- std::scoped_lock Lock(m_TaskMutex);
- if (m_PendingTasks.empty())
- {
- if (m_CompletedTasks.empty())
- {
- // Nothing to do.
- return {.Success = true};
- }
-
- UpstreamApplyCompleted CompletedTasks;
- std::swap(CompletedTasks, m_CompletedTasks);
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
- }
- }
-
- try
- {
- CloudCacheSession ComputeSession(m_Client);
-
- CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId);
- Log().debug("Get compute updates Bytes={} Duration={}s Result={}",
- UpdatesResult.Bytes,
- UpdatesResult.ElapsedSeconds,
- UpdatesResult.Success);
- Bytes += UpdatesResult.Bytes;
- ElapsedSeconds += UpdatesResult.ElapsedSeconds;
- if (!UpdatesResult.Success)
- {
- return {.Error{.ErrorCode = UpdatesResult.ErrorCode, .Reason = std::move(UpdatesResult.Reason)},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
-
- if (!UpdatesResult.Success)
- {
- return {.Error{.ErrorCode = -1, .Reason = "Failed get task updates"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds};
- }
-
- CbObject TaskStatus = LoadCompactBinaryObject(std::move(UpdatesResult.Response));
-
- for (auto& It : TaskStatus["u"sv])
- {
- CbObjectView Status = It.AsObjectView();
- IoHash TaskId = Status["h"sv].AsHash();
- const ComputeTaskState State = (ComputeTaskState)Status["s"sv].AsInt32();
- const ComputeTaskOutcome Outcome = (ComputeTaskOutcome)Status["o"sv].AsInt32();
-
- Log().info("Task {} State={}", TaskId, ComputeTaskStateToString(State));
-
- // Only completed tasks need to be processed
- if (State != ComputeTaskState::Complete)
- {
- continue;
- }
-
- IoHash WorkerId{};
- IoHash ActionId{};
- UpstreamApplyType ApplyType{};
-
- {
- std::scoped_lock Lock(m_TaskMutex);
- auto TaskIt = m_PendingTasks.find(TaskId);
- if (TaskIt != m_PendingTasks.end())
- {
- WorkerId = TaskIt->second.WorkerDescriptor.GetHash();
- ActionId = TaskIt->second.Action.GetHash();
- ApplyType = TaskIt->second.Type;
- m_PendingTasks.erase(TaskIt);
- }
- }
-
- if (WorkerId == IoHash::Zero)
- {
- Log().warn("Task {} missing from pending tasks", TaskId);
- continue;
- }
-
- std::map<std::string, uint64_t> Timepoints;
- ProcessQueueTimings(Status["qs"sv].AsObjectView(), Timepoints);
- ProcessExecuteTimings(Status["es"sv].AsObjectView(), Timepoints);
-
- if (Outcome != ComputeTaskOutcome::Success)
- {
- const std::string_view Detail = Status["d"sv].AsString();
- {
- std::scoped_lock Lock(m_TaskMutex);
- m_CompletedTasks[WorkerId][ActionId] = {
- .Error{.ErrorCode = -1, .Reason = fmt::format("Task {} {}", ComputeTaskOutcomeToString(Outcome), Detail)},
- .Timepoints = std::move(Timepoints)};
- }
- continue;
- }
-
- Timepoints["zen-complete-queue-added"] = DateTime::NowTicks();
- ThreadPool.ScheduleWork([this,
- ApplyType,
- ResultHash = Status["r"sv].AsHash(),
- Timepoints = std::move(Timepoints),
- TaskId = std::move(TaskId),
- WorkerId = std::move(WorkerId),
- ActionId = std::move(ActionId)]() mutable {
- Timepoints["zen-complete-queue-dispatched"] = DateTime::NowTicks();
- GetUpstreamApplyResult Result = ProcessTaskStatus(ApplyType, ResultHash);
- Timepoints["zen-complete-queue-complete"] = DateTime::NowTicks();
- Result.Timepoints.merge(Timepoints);
-
- Log().debug("Task Processed {} Files={} Attachments={} ExitCode={}",
- TaskId,
- Result.OutputFiles.size(),
- Result.OutputPackage.GetAttachments().size(),
- Result.Error.ErrorCode);
- {
- std::scoped_lock Lock(m_TaskMutex);
- m_CompletedTasks[WorkerId][ActionId] = std::move(Result);
- }
- });
- }
-
- {
- std::scoped_lock Lock(m_TaskMutex);
- if (m_CompletedTasks.empty())
- {
- // Nothing to do.
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Success = true};
- }
- UpstreamApplyCompleted CompletedTasks;
- std::swap(CompletedTasks, m_CompletedTasks);
- return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, .Completed = std::move(CompletedTasks), .Success = true};
- }
- }
- catch (std::exception& Err)
- {
- m_HealthOk = false;
- return {
- .Error{.ErrorCode = -1, .Reason = Err.what()},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds,
- };
- }
- }
-
- virtual UpstreamApplyEndpointStats& Stats() override { return m_Stats; }
-
- private:
- spdlog::logger& Log() { return m_Log; }
-
- spdlog::logger& m_Log;
- CidStore& m_CidStore;
- AuthMgr& m_AuthMgr;
- std::string m_DisplayName;
- RefPtr<CloudCacheClient> m_Client;
- RefPtr<CloudCacheClient> m_StorageClient;
- UpstreamApplyEndpointStats m_Stats;
- std::atomic_bool m_HealthOk{false};
- std::string m_ChannelId;
-
- std::mutex m_TaskMutex;
- std::unordered_map<IoHash, UpstreamApplyRecord> m_PendingTasks;
- UpstreamApplyCompleted m_CompletedTasks;
-
- struct UpstreamData
- {
- std::map<IoHash, IoBuffer> Blobs;
- std::map<IoHash, CbObject> Objects;
- std::set<IoHash> CasIds;
- std::set<IoHash> Cids;
- IoHash TaskId;
- IoHash RequirementsId;
- };
-
- struct UpstreamDirectory
- {
- std::filesystem::path Path;
- std::map<std::string, UpstreamDirectory> Directories;
- std::set<std::string> Files;
- };
-
- static void ProcessQueueTimings(CbObjectView QueueStats, std::map<std::string, uint64_t>& Timepoints)
- {
- uint64_t Ticks = QueueStats["t"sv].AsDateTimeTicks();
- if (Ticks == 0)
- {
- return;
- }
-
- // Scope is an array of miliseconds after start time
- // TODO: cleanup
- Timepoints["horde-queue-added"] = Ticks;
- int Index = 0;
- for (auto& Item : QueueStats["s"sv].AsArrayView())
- {
- Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond;
- switch (Index)
- {
- case 0:
- Timepoints["horde-queue-dispatched"] = Ticks;
- break;
- case 1:
- Timepoints["horde-queue-complete"] = Ticks;
- break;
- }
- Index++;
- }
- }
-
- static void ProcessExecuteTimings(CbObjectView ExecutionStats, std::map<std::string, uint64_t>& Timepoints)
- {
- uint64_t Ticks = ExecutionStats["t"sv].AsDateTimeTicks();
- if (Ticks == 0)
- {
- return;
- }
-
- // Scope is an array of miliseconds after start time
- // TODO: cleanup
- Timepoints["horde-execution-start"] = Ticks;
- int Index = 0;
- for (auto& Item : ExecutionStats["s"sv].AsArrayView())
- {
- Ticks += Item.AsInt32() * TimeSpan::TicksPerMillisecond;
- switch (Index)
- {
- case 0:
- Timepoints["horde-execution-download-ref"] = Ticks;
- break;
- case 1:
- Timepoints["horde-execution-download-input"] = Ticks;
- break;
- case 2:
- Timepoints["horde-execution-execute"] = Ticks;
- break;
- case 3:
- Timepoints["horde-execution-upload-log"] = Ticks;
- break;
- case 4:
- Timepoints["horde-execution-upload-output"] = Ticks;
- break;
- case 5:
- Timepoints["horde-execution-upload-ref"] = Ticks;
- break;
- }
- Index++;
- }
- }
-
- [[nodiscard]] GetUpstreamApplyResult ProcessTaskStatus(const UpstreamApplyType ApplyType, const IoHash& ResultHash)
- {
- try
- {
- CloudCacheSession Session(m_StorageClient);
-
- GetUpstreamApplyResult ApplyResult{};
-
- IoHash StdOutHash;
- IoHash StdErrHash;
- IoHash OutputHash;
-
- std::map<IoHash, IoBuffer> BinaryData;
-
- {
- CloudCacheResult ObjectRefResult =
- Session.GetRef(Session.Client().DefaultBlobStoreNamespace(), "responses"sv, ResultHash, ZenContentType::kCbObject);
- Log().debug("Get ref {} Bytes={} Duration={}s Result={}",
- ResultHash,
- ObjectRefResult.Bytes,
- ObjectRefResult.ElapsedSeconds,
- ObjectRefResult.Success);
- ApplyResult.Bytes += ObjectRefResult.Bytes;
- ApplyResult.ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
- ApplyResult.Timepoints["zen-storage-get-ref"] = DateTime::NowTicks();
-
- if (!ObjectRefResult.Success)
- {
- ApplyResult.Error.Reason = "Failed to get result object data";
- return ApplyResult;
- }
-
- CbObject ResultObject = LoadCompactBinaryObject(ObjectRefResult.Response);
- ApplyResult.Error.ErrorCode = ResultObject["e"sv].AsInt32();
- StdOutHash = ResultObject["so"sv].AsBinaryAttachment();
- StdErrHash = ResultObject["se"sv].AsBinaryAttachment();
- OutputHash = ResultObject["o"sv].AsObjectAttachment();
- }
-
- {
- std::set<IoHash> NeededData;
- if (OutputHash != IoHash::Zero)
- {
- GetObjectReferencesResult ObjectReferenceResult =
- Session.GetObjectReferences(Session.Client().DefaultBlobStoreNamespace(), OutputHash);
- Log().debug("Get object references {} References={} Bytes={} Duration={}s Result={}",
- ResultHash,
- ObjectReferenceResult.References.size(),
- ObjectReferenceResult.Bytes,
- ObjectReferenceResult.ElapsedSeconds,
- ObjectReferenceResult.Success);
- ApplyResult.Bytes += ObjectReferenceResult.Bytes;
- ApplyResult.ElapsedSeconds += ObjectReferenceResult.ElapsedSeconds;
- ApplyResult.Timepoints["zen-storage-get-object-references"] = DateTime::NowTicks();
-
- if (!ObjectReferenceResult.Success)
- {
- ApplyResult.Error.Reason = "Failed to get result object references";
- return ApplyResult;
- }
-
- NeededData = std::move(ObjectReferenceResult.References);
- }
-
- NeededData.insert(OutputHash);
- NeededData.insert(StdOutHash);
- NeededData.insert(StdErrHash);
-
- for (const auto& Hash : NeededData)
- {
- if (Hash == IoHash::Zero)
- {
- continue;
- }
- CloudCacheResult BlobResult = Session.GetBlob(Session.Client().DefaultBlobStoreNamespace(), Hash);
- Log().debug("Get blob {} Bytes={} Duration={}s Result={}",
- Hash,
- BlobResult.Bytes,
- BlobResult.ElapsedSeconds,
- BlobResult.Success);
- ApplyResult.Bytes += BlobResult.Bytes;
- ApplyResult.ElapsedSeconds += BlobResult.ElapsedSeconds;
- if (!BlobResult.Success)
- {
- ApplyResult.Error.Reason = "Failed to get blob";
- return ApplyResult;
- }
- BinaryData[Hash] = std::move(BlobResult.Response);
- }
- ApplyResult.Timepoints["zen-storage-get-blobs"] = DateTime::NowTicks();
- }
-
- ApplyResult.StdOut = StdOutHash != IoHash::Zero
- ? std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize())
- : "";
- ApplyResult.StdErr = StdErrHash != IoHash::Zero
- ? std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize())
- : "";
-
- if (OutputHash == IoHash::Zero)
- {
- ApplyResult.Error.Reason = "Task completed with no output object";
- return ApplyResult;
- }
-
- CbObject OutputObject = LoadCompactBinaryObject(BinaryData[OutputHash]);
-
- switch (ApplyType)
- {
- case UpstreamApplyType::Simple:
- {
- ResolveMerkleTreeDirectory(""sv, OutputHash, BinaryData, ApplyResult.OutputFiles);
- for (const auto& Pair : BinaryData)
- {
- ApplyResult.FileData[Pair.first] = std::move(BinaryData.at(Pair.first));
- }
-
- ApplyResult.Success = ApplyResult.Error.ErrorCode == 0;
- return ApplyResult;
- }
- break;
- case UpstreamApplyType::Asset:
- {
- if (ApplyResult.Error.ErrorCode != 0)
- {
- ApplyResult.Error.Reason = "Task completed with errors";
- return ApplyResult;
- }
-
- // Get build.output
- IoHash BuildOutputId;
- IoBuffer BuildOutput;
- for (auto& It : OutputObject["f"sv])
- {
- const CbObjectView FileObject = It.AsObjectView();
- if (FileObject["n"sv].AsString() == "Build.output"sv)
- {
- BuildOutputId = FileObject["h"sv].AsBinaryAttachment();
- BuildOutput = BinaryData[BuildOutputId];
- break;
- }
- }
-
- if (BuildOutput.GetSize() == 0)
- {
- ApplyResult.Error.Reason = "Build.output file not found in task results";
- return ApplyResult;
- }
-
- // Get Output directory node
- IoBuffer OutputDirectoryTree;
- for (auto& It : OutputObject["d"sv])
- {
- const CbObjectView DirectoryObject = It.AsObjectView();
- if (DirectoryObject["n"sv].AsString() == "Outputs"sv)
- {
- OutputDirectoryTree = BinaryData[DirectoryObject["h"sv].AsObjectAttachment()];
- break;
- }
- }
-
- if (OutputDirectoryTree.GetSize() == 0)
- {
- ApplyResult.Error.Reason = "Outputs directory not found in task results";
- return ApplyResult;
- }
-
- // load build.output as CbObject
-
- // Move Outputs from Horde to CbPackage
-
- std::unordered_map<IoHash, IoHash> CidToCompressedId;
- CbPackage OutputPackage;
- CbObject OutputDirectoryTreeObject = LoadCompactBinaryObject(OutputDirectoryTree);
-
- for (auto& It : OutputDirectoryTreeObject["f"sv])
- {
- CbObjectView FileObject = It.AsObjectView();
- // Name is the uncompressed hash
- IoHash DecompressedId = IoHash::FromHexString(FileObject["n"sv].AsString());
- // Hash is the compressed data hash, and how it is stored in Horde
- IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment();
-
- if (!BinaryData.contains(CompressedId))
- {
- Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId);
- ApplyResult.Error.Reason = "Object attachment chunk not retrieved from Horde";
- return ApplyResult;
- }
- CidToCompressedId[DecompressedId] = CompressedId;
- }
-
- // Iterate attachments, verify all chunks exist, and add to CbPackage
- bool AnyErrors = false;
- CbObject BuildOutputObject = LoadCompactBinaryObject(BuildOutput);
- BuildOutputObject.IterateAttachments([&](CbFieldView Field) {
- const IoHash DecompressedId = Field.AsHash();
- if (!CidToCompressedId.contains(DecompressedId))
- {
- Log().warn("Attachment not found {}", DecompressedId);
- AnyErrors = true;
- return;
- }
- const IoHash& CompressedId = CidToCompressedId.at(DecompressedId);
-
- if (!BinaryData.contains(CompressedId))
- {
- Log().warn("Missing output {} compressed {} uncompressed", CompressedId, DecompressedId);
- AnyErrors = true;
- return;
- }
-
- IoHash RawHash;
- uint64_t RawSize;
- CompressedBuffer AttachmentBuffer =
- CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]), RawHash, RawSize);
-
- if (!AttachmentBuffer || RawHash != DecompressedId)
- {
- Log().warn(
- "Invalid output encountered (not valid CompressedBuffer format) {} compressed {} uncompressed",
- CompressedId,
- DecompressedId);
- AnyErrors = true;
- return;
- }
-
- ApplyResult.TotalAttachmentBytes += AttachmentBuffer.GetCompressedSize();
- ApplyResult.TotalRawAttachmentBytes += RawSize;
-
- CbAttachment Attachment(AttachmentBuffer, DecompressedId);
- OutputPackage.AddAttachment(Attachment);
- });
-
- if (AnyErrors)
- {
- ApplyResult.Error.Reason = "Failed to get result object attachment data";
- return ApplyResult;
- }
-
- OutputPackage.SetObject(BuildOutputObject);
- ApplyResult.OutputPackage = std::move(OutputPackage);
-
- ApplyResult.Success = ApplyResult.Error.ErrorCode == 0;
- return ApplyResult;
- }
- break;
- }
-
- ApplyResult.Error.Reason = "Unknown apply type";
- return ApplyResult;
- }
- catch (std::exception& Err)
- {
- return {.Error{.ErrorCode = -1, .Reason = Err.what()}};
- }
- }
-
- [[nodiscard]] bool ProcessApplyKey(const UpstreamApplyRecord& ApplyRecord, UpstreamData& Data)
- {
- std::string ExecutablePath;
- std::string WorkingDirectory;
- std::vector<std::string> Arguments;
- std::map<std::string, std::string> Environment;
- std::set<std::filesystem::path> InputFiles;
- std::set<std::string> Outputs;
- std::map<std::filesystem::path, IoHash> InputFileHashes;
-
- ExecutablePath = ApplyRecord.WorkerDescriptor["path"sv].AsString();
- if (ExecutablePath.empty())
- {
- Log().warn("process apply upstream FAILED, '{}', path missing from worker descriptor",
- ApplyRecord.WorkerDescriptor.GetHash());
- return false;
- }
-
- WorkingDirectory = ApplyRecord.WorkerDescriptor["workdir"sv].AsString();
-
- for (auto& It : ApplyRecord.WorkerDescriptor["executables"sv])
- {
- CbObjectView FileEntry = It.AsObjectView();
- if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds))
- {
- return false;
- }
- }
-
- for (auto& It : ApplyRecord.WorkerDescriptor["files"sv])
- {
- CbObjectView FileEntry = It.AsObjectView();
- if (!ProcessFileEntry(FileEntry, InputFiles, InputFileHashes, Data.CasIds))
- {
- return false;
- }
- }
-
- for (auto& It : ApplyRecord.WorkerDescriptor["dirs"sv])
- {
- std::string_view Directory = It.AsString();
- std::string DummyFile = fmt::format("{}/.zen_empty_file", Directory);
- InputFiles.insert(DummyFile);
- Data.Blobs[EmptyBufferId] = EmptyBuffer;
- InputFileHashes[DummyFile] = EmptyBufferId;
- }
-
- if (!WorkingDirectory.empty())
- {
- std::string DummyFile = fmt::format("{}/.zen_empty_file", WorkingDirectory);
- InputFiles.insert(DummyFile);
- Data.Blobs[EmptyBufferId] = EmptyBuffer;
- InputFileHashes[DummyFile] = EmptyBufferId;
- }
-
- for (auto& It : ApplyRecord.WorkerDescriptor["environment"sv])
- {
- std::string_view Env = It.AsString();
- auto Index = Env.find('=');
- if (Index == std::string_view::npos)
- {
- Log().warn("process apply upstream FAILED, environment '{}' malformed", Env);
- return false;
- }
-
- Environment[std::string(Env.substr(0, Index))] = Env.substr(Index + 1);
- }
-
- switch (ApplyRecord.Type)
- {
- case UpstreamApplyType::Simple:
- {
- for (auto& It : ApplyRecord.WorkerDescriptor["arguments"sv])
- {
- Arguments.push_back(std::string(It.AsString()));
- }
-
- for (auto& It : ApplyRecord.WorkerDescriptor["outputs"sv])
- {
- Outputs.insert(std::string(It.AsString()));
- }
- }
- break;
- case UpstreamApplyType::Asset:
- {
- static const std::filesystem::path BuildActionPath = "Build.action"sv;
- static const std::filesystem::path InputPath = "Inputs"sv;
- const IoHash ActionId = ApplyRecord.Action.GetHash();
-
- Arguments.push_back("-Build=build.action");
- Outputs.insert("Build.output");
- Outputs.insert("Outputs");
-
- InputFiles.insert(BuildActionPath);
- InputFileHashes[BuildActionPath] = ActionId;
- Data.Blobs[ActionId] = IoBufferBuilder::MakeCloneFromMemory(ApplyRecord.Action.GetBuffer().GetData(),
- ApplyRecord.Action.GetBuffer().GetSize());
-
- bool AnyErrors = false;
- ApplyRecord.Action.IterateAttachments([&](CbFieldView Field) {
- const IoHash Cid = Field.AsHash();
- const std::filesystem::path FilePath = {InputPath / Cid.ToHexString()};
-
- if (!m_CidStore.ContainsChunk(Cid))
- {
- Log().warn("process apply upstream FAILED, input CID chunk '{}' missing", Cid);
- AnyErrors = true;
- return;
- }
-
- if (InputFiles.contains(FilePath))
- {
- return;
- }
-
- InputFiles.insert(FilePath);
- InputFileHashes[FilePath] = Cid;
- Data.Cids.insert(Cid);
- });
-
- if (AnyErrors)
- {
- return false;
- }
- }
- break;
- }
-
- const UpstreamDirectory RootDirectory = BuildDirectoryTree(InputFiles);
-
- CbObject Sandbox = BuildMerkleTreeDirectory(RootDirectory, InputFileHashes, Data.Cids, Data.Objects);
- const IoHash SandboxHash = Sandbox.GetHash();
- Data.Objects[SandboxHash] = std::move(Sandbox);
-
- {
- std::string_view HostPlatform = ApplyRecord.WorkerDescriptor["host"sv].AsString();
- if (HostPlatform.empty())
- {
- Log().warn("process apply upstream FAILED, 'host' platform not provided");
- return false;
- }
-
- int32_t LogicalCores = ApplyRecord.WorkerDescriptor["cores"sv].AsInt32();
- int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64();
- bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool();
-
- std::string Condition = fmt::format("Platform == '{}'", HostPlatform);
- if (HostPlatform == "Win64")
- {
- // TODO
- // Condition += " && Pool == 'Win-RemoteExec'";
- }
-
- std::map<std::string_view, int64_t> Resources;
- if (LogicalCores > 0)
- {
- Resources["LogicalCores"sv] = LogicalCores;
- }
- if (Memory > 0)
- {
- Resources["RAM"sv] = std::max(Memory / 1024LL / 1024LL / 1024LL, 1LL);
- }
-
- CbObject Requirements = BuildRequirements(Condition, Resources, Exclusive);
- const IoHash RequirementsId = Requirements.GetHash();
- Data.Objects[RequirementsId] = std::move(Requirements);
- Data.RequirementsId = RequirementsId;
- }
-
- CbObject Task = BuildTask(ExecutablePath, Arguments, Environment, WorkingDirectory, SandboxHash, Data.RequirementsId, Outputs);
-
- const IoHash TaskId = Task.GetHash();
- Data.Objects[TaskId] = std::move(Task);
- Data.TaskId = TaskId;
-
- return true;
- }
-
- [[nodiscard]] bool ProcessFileEntry(const CbObjectView& FileEntry,
- std::set<std::filesystem::path>& InputFiles,
- std::map<std::filesystem::path, IoHash>& InputFileHashes,
- std::set<IoHash>& CasIds)
- {
- const std::filesystem::path FilePath = FileEntry["name"sv].AsString();
- const IoHash ChunkId = FileEntry["hash"sv].AsHash();
- const uint64_t Size = FileEntry["size"sv].AsUInt64();
-
- if (!m_CidStore.ContainsChunk(ChunkId))
- {
- Log().warn("process apply upstream FAILED, worker CAS chunk '{}' missing", ChunkId);
- return false;
- }
-
- if (InputFiles.contains(FilePath))
- {
- Log().warn("process apply upstream FAILED, worker CAS chunk '{}' size: {} duplicate filename {}", ChunkId, Size, FilePath);
- return false;
- }
-
- InputFiles.insert(FilePath);
- InputFileHashes[FilePath] = ChunkId;
- CasIds.insert(ChunkId);
- return true;
- }
-
- [[nodiscard]] UpstreamDirectory BuildDirectoryTree(const std::set<std::filesystem::path>& InputFiles)
- {
- static const std::filesystem::path RootPath;
- std::map<std::filesystem::path, UpstreamDirectory*> AllDirectories;
- UpstreamDirectory RootDirectory = {.Path = RootPath};
-
- AllDirectories[RootPath] = &RootDirectory;
-
- // Build tree from flat list
- for (const auto& Path : InputFiles)
- {
- if (Path.has_parent_path())
- {
- if (!AllDirectories.contains(Path.parent_path()))
- {
- std::stack<std::string> PathSplit;
- {
- std::filesystem::path ParentPath = Path.parent_path();
- PathSplit.push(ParentPath.filename().string());
- while (ParentPath.has_parent_path())
- {
- ParentPath = ParentPath.parent_path();
- PathSplit.push(ParentPath.filename().string());
- }
- }
- UpstreamDirectory* ParentPtr = &RootDirectory;
- while (!PathSplit.empty())
- {
- if (!ParentPtr->Directories.contains(PathSplit.top()))
- {
- std::filesystem::path NewParentPath = {ParentPtr->Path / PathSplit.top()};
- ParentPtr->Directories[PathSplit.top()] = {.Path = NewParentPath};
- AllDirectories[NewParentPath] = &ParentPtr->Directories[PathSplit.top()];
- }
- ParentPtr = &ParentPtr->Directories[PathSplit.top()];
- PathSplit.pop();
- }
- }
-
- AllDirectories[Path.parent_path()]->Files.insert(Path.filename().string());
- }
- else
- {
- RootDirectory.Files.insert(Path.filename().string());
- }
- }
-
- return RootDirectory;
- }
-
- [[nodiscard]] CbObject BuildMerkleTreeDirectory(const UpstreamDirectory& RootDirectory,
- const std::map<std::filesystem::path, IoHash>& InputFileHashes,
- const std::set<IoHash>& Cids,
- std::map<IoHash, CbObject>& Objects)
- {
- CbObjectWriter DirectoryTreeWriter;
-
- if (!RootDirectory.Files.empty())
- {
- DirectoryTreeWriter.BeginArray("f"sv);
- for (const auto& File : RootDirectory.Files)
- {
- const std::filesystem::path FilePath = {RootDirectory.Path / File};
- const IoHash& FileHash = InputFileHashes.at(FilePath);
- const bool Compressed = Cids.contains(FileHash);
- DirectoryTreeWriter.BeginObject();
- DirectoryTreeWriter.AddString("n"sv, File);
- DirectoryTreeWriter.AddBinaryAttachment("h"sv, FileHash);
- DirectoryTreeWriter.AddBool("c"sv, Compressed);
- DirectoryTreeWriter.EndObject();
- }
- DirectoryTreeWriter.EndArray();
- }
-
- if (!RootDirectory.Directories.empty())
- {
- DirectoryTreeWriter.BeginArray("d"sv);
- for (const auto& Item : RootDirectory.Directories)
- {
- CbObject Directory = BuildMerkleTreeDirectory(Item.second, InputFileHashes, Cids, Objects);
- const IoHash DirectoryHash = Directory.GetHash();
- Objects[DirectoryHash] = std::move(Directory);
-
- DirectoryTreeWriter.BeginObject();
- DirectoryTreeWriter.AddString("n"sv, Item.first);
- DirectoryTreeWriter.AddObjectAttachment("h"sv, DirectoryHash);
- DirectoryTreeWriter.EndObject();
- }
- DirectoryTreeWriter.EndArray();
- }
-
- return DirectoryTreeWriter.Save();
- }
-
- void ResolveMerkleTreeDirectory(const std::filesystem::path& ParentDirectory,
- const IoHash& DirectoryHash,
- const std::map<IoHash, IoBuffer>& Objects,
- std::map<std::filesystem::path, IoHash>& OutputFiles)
- {
- CbObject Directory = LoadCompactBinaryObject(Objects.at(DirectoryHash));
-
- for (auto& It : Directory["f"sv])
- {
- const CbObjectView FileObject = It.AsObjectView();
- const std::filesystem::path Path = ParentDirectory / FileObject["n"sv].AsString();
-
- OutputFiles[Path] = FileObject["h"sv].AsBinaryAttachment();
- }
-
- for (auto& It : Directory["d"sv])
- {
- const CbObjectView DirectoryObject = It.AsObjectView();
-
- ResolveMerkleTreeDirectory(ParentDirectory / DirectoryObject["n"sv].AsString(),
- DirectoryObject["h"sv].AsObjectAttachment(),
- Objects,
- OutputFiles);
- }
- }
-
- [[nodiscard]] CbObject BuildRequirements(const std::string_view Condition,
- const std::map<std::string_view, int64_t>& Resources,
- const bool Exclusive)
- {
- CbObjectWriter Writer;
- Writer.AddString("c", Condition);
- if (!Resources.empty())
- {
- Writer.BeginArray("r");
- for (const auto& Resource : Resources)
- {
- Writer.BeginArray();
- Writer.AddString(Resource.first);
- Writer.AddInteger(Resource.second);
- Writer.EndArray();
- }
- Writer.EndArray();
- }
- Writer.AddBool("e", Exclusive);
- return Writer.Save();
- }
-
- [[nodiscard]] CbObject BuildTask(const std::string_view Executable,
- const std::vector<std::string>& Arguments,
- const std::map<std::string, std::string>& Environment,
- const std::string_view WorkingDirectory,
- const IoHash& SandboxHash,
- const IoHash& RequirementsId,
- const std::set<std::string>& Outputs)
- {
- CbObjectWriter TaskWriter;
- TaskWriter.AddString("e"sv, Executable);
-
- if (!Arguments.empty())
- {
- TaskWriter.BeginArray("a"sv);
- for (const auto& Argument : Arguments)
- {
- TaskWriter.AddString(Argument);
- }
- TaskWriter.EndArray();
- }
-
- if (!Environment.empty())
- {
- TaskWriter.BeginArray("v"sv);
- for (const auto& Env : Environment)
- {
- TaskWriter.BeginArray();
- TaskWriter.AddString(Env.first);
- TaskWriter.AddString(Env.second);
- TaskWriter.EndArray();
- }
- TaskWriter.EndArray();
- }
-
- if (!WorkingDirectory.empty())
- {
- TaskWriter.AddString("w"sv, WorkingDirectory);
- }
-
- TaskWriter.AddObjectAttachment("s"sv, SandboxHash);
- TaskWriter.AddObjectAttachment("r"sv, RequirementsId);
-
- // Outputs
- if (!Outputs.empty())
- {
- TaskWriter.BeginArray("o"sv);
- for (const auto& Output : Outputs)
- {
- TaskWriter.AddString(Output);
- }
- TaskWriter.EndArray();
- }
-
- return TaskWriter.Save();
- }
- };
-} // namespace detail
-
-//////////////////////////////////////////////////////////////////////////
-
-std::unique_ptr<UpstreamApplyEndpoint>
-UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions,
- const UpstreamAuthConfig& ComputeAuthConfig,
- const CloudCacheClientOptions& StorageOptions,
- const UpstreamAuthConfig& StorageAuthConfig,
- CidStore& CidStore,
- AuthMgr& Mgr)
-{
- return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions,
- ComputeAuthConfig,
- StorageOptions,
- StorageAuthConfig,
- CidStore,
- Mgr);
-}
-
-} // namespace zen
-
-#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/upstream/jupiter.cpp b/src/zenserver/upstream/jupiter.cpp
index 61d8a85cc..a67c497ad 100644
--- a/src/zenserver/upstream/jupiter.cpp
+++ b/src/zenserver/upstream/jupiter.cpp
@@ -281,7 +281,7 @@ CloudCacheSession::GetBlob(std::string_view Namespace, const IoHash& Key)
CloudCacheResult
CloudCacheSession::GetCompressedBlob(std::string_view Namespace, const IoHash& Key, std::filesystem::path TempFolderPath)
{
- ZEN_TRACE_CPU("HordeClient::GetCompressedBlob");
+ ZEN_TRACE_CPU("JupiterClient::GetCompressedBlob");
ExtendableStringBuilder<256> Uri;
std::string KeyString = Key.ToHexString();
@@ -338,7 +338,7 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace,
IoHash& OutPayloadHash,
std::filesystem::path TempFolderPath)
{
- ZEN_TRACE_CPU("HordeClient::GetInlineBlob");
+ ZEN_TRACE_CPU("JupiterClient::GetInlineBlob");
ExtendableStringBuilder<256> Uri;
std::string KeyString = Key.ToHexString();
@@ -401,7 +401,7 @@ CloudCacheSession::GetInlineBlob(std::string_view Namespace,
CloudCacheResult
CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
{
- ZEN_TRACE_CPU("HordeClient::GetObject");
+ ZEN_TRACE_CPU("JupiterClient::GetObject");
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
@@ -453,7 +453,7 @@ CloudCacheSession::GetObject(std::string_view Namespace, const IoHash& Key)
PutRefResult
CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, IoBuffer Ref, ZenContentType RefType)
{
- ZEN_TRACE_CPU("HordeClient::PutRef");
+ ZEN_TRACE_CPU("JupiterClient::PutRef");
IoHash Hash = IoHash::HashBuffer(Ref.Data(), Ref.Size());
@@ -524,7 +524,7 @@ CloudCacheSession::PutRef(std::string_view Namespace, std::string_view BucketId,
FinalizeRefResult
CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view BucketId, const IoHash& Key, const IoHash& RefHash)
{
- ZEN_TRACE_CPU("HordeClient::FinalizeRef");
+ ZEN_TRACE_CPU("JupiterClient::FinalizeRef");
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString() << "/finalize/"
@@ -593,7 +593,7 @@ CloudCacheSession::FinalizeRef(std::string_view Namespace, std::string_view Buck
CloudCacheResult
CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
{
- ZEN_TRACE_CPU("HordeClient::PutBlob");
+ ZEN_TRACE_CPU("JupiterClient::PutBlob");
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/blobs/" << Namespace << "/" << Key.ToHexString();
@@ -642,7 +642,7 @@ CloudCacheSession::PutBlob(std::string_view Namespace, const IoHash& Key, IoBuff
CloudCacheResult
CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, IoBuffer Blob)
{
- ZEN_TRACE_CPU("HordeClient::PutCompressedBlob");
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
@@ -708,7 +708,7 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
CloudCacheResult
CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& Key, const CompositeBuffer& Payload)
{
- ZEN_TRACE_CPU("HordeClient::PutCompressedBlob");
+ ZEN_TRACE_CPU("JupiterClient::PutCompressedBlob");
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/compressed-blobs/" << Namespace << "/" << Key.ToHexString();
@@ -766,7 +766,7 @@ CloudCacheSession::PutCompressedBlob(std::string_view Namespace, const IoHash& K
CloudCacheResult
CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBuffer Object)
{
- ZEN_TRACE_CPU("HordeClient::PutObject");
+ ZEN_TRACE_CPU("JupiterClient::PutObject");
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString();
@@ -815,7 +815,7 @@ CloudCacheSession::PutObject(std::string_view Namespace, const IoHash& Key, IoBu
CloudCacheResult
CloudCacheSession::RefExists(std::string_view Namespace, std::string_view BucketId, const IoHash& Key)
{
- ZEN_TRACE_CPU("HordeClient::RefExists");
+ ZEN_TRACE_CPU("JupiterClient::RefExists");
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/refs/" << Namespace << "/" << BucketId << "/" << Key.ToHexString();
@@ -860,7 +860,7 @@ CloudCacheSession::RefExists(std::string_view Namespace, std::string_view Bucket
GetObjectReferencesResult
CloudCacheSession::GetObjectReferences(std::string_view Namespace, const IoHash& Key)
{
- ZEN_TRACE_CPU("HordeClient::GetObjectReferences");
+ ZEN_TRACE_CPU("JupiterClient::GetObjectReferences");
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/objects/" << Namespace << "/" << Key.ToHexString() << "/references";
@@ -951,107 +951,6 @@ CloudCacheSession::ObjectExists(std::string_view Namespace, const std::set<IoHas
return CacheTypeExists(Namespace, "objects"sv, Keys);
}
-CloudCacheResult
-CloudCacheSession::PostComputeTasks(IoBuffer TasksData)
-{
- ZEN_TRACE_CPU("HordeClient::PostComputeTasks");
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster();
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Content-Type", "application/x-ue-cb"}});
- Session.SetBody(cpr::Body{(const char*)TasksData.Data(), TasksData.Size()});
-
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
- if (!Result.Success)
- {
- ZEN_WARN(
- "CloudCacheSession::PostComputeTasks failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
- return Result;
-}
-
-CloudCacheResult
-CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
-{
- ZEN_TRACE_CPU("HordeClient::GetComputeUpdates");
-
- ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster() << "/updates/" << ChannelId
- << "?wait=" << WaitSeconds;
-
- cpr::Session& Session = GetSession();
- const CloudCacheAccessToken& AccessToken = GetAccessToken();
-
- Session.SetOption(cpr::Url{Uri.c_str()});
- Session.SetOption(cpr::Header{{"Authorization", AccessToken.Value}, {"Accept", "application/x-ue-cb"}});
- Session.SetOption(cpr::Body{});
-
- cpr::Response Response = Session.Post();
- ZEN_DEBUG("POST {}", Response);
-
- CloudCacheResult Result = detail::ConvertResponse(Response);
-
- if (Result.Success)
- {
- Result.Response = IoBufferBuilder::MakeCloneFromMemory(Response.text.data(), Response.text.size());
- }
- else
- {
- ZEN_WARN(
- "CloudCacheSession::GetComputeUpdates failed GET. "
- "Elapsed: {} s, "
- "Uri: '{}', "
- "Header-Authorization: '{} <redacted>', "
- "Header-Accept: '{}', "
- "Response.status_code: {}, "
- "Response.reason: '{}', "
- "Response.error.code: {}, "
- "Response.error.message: '{}', "
- "Response.raw_header: '{}'"
- "Response.text: '{}'",
- Response.elapsed,
- Uri,
- AccessToken.Value.substr(0, 6),
- "application/x-ue-cb",
- Response.status_code,
- Response.reason,
- gsl::narrow<int>(Response.error.code),
- Response.error.message,
- Response.raw_header,
- Response.text);
- }
-
- return Result;
-}
-
std::vector<IoHash>
CloudCacheSession::Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes)
{
@@ -1079,7 +978,7 @@ CloudCacheSession::GetAccessToken(bool RefreshToken)
CloudCacheResult
CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const IoHash& Key)
{
- ZEN_TRACE_CPU("HordeClient::CacheTypeExists");
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << Namespace << "/" << Key.ToHexString();
@@ -1126,7 +1025,7 @@ CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view
CloudCacheExistsResult
CloudCacheSession::CacheTypeExists(std::string_view Namespace, std::string_view TypeId, const std::set<IoHash>& Keys)
{
- ZEN_TRACE_CPU("HordeClient::CacheTypeExists");
+ ZEN_TRACE_CPU("JupiterClient::CacheTypeExists");
ExtendableStringBuilder<256> Body;
Body << "[";
@@ -1311,7 +1210,7 @@ CloudCacheClient::~CloudCacheClient()
CloudCacheAccessToken
CloudCacheClient::AcquireAccessToken()
{
- ZEN_TRACE_CPU("HordeClient::AcquireAccessToken");
+ ZEN_TRACE_CPU("JupiterClient::AcquireAccessToken");
return m_TokenProvider->AcquireAccessToken();
}
diff --git a/src/zenserver/upstream/jupiter.h b/src/zenserver/upstream/jupiter.h
index 4e7d11fd9..ca84905d2 100644
--- a/src/zenserver/upstream/jupiter.h
+++ b/src/zenserver/upstream/jupiter.h
@@ -126,9 +126,6 @@ public:
CloudCacheExistsResult CompressedBlobExists(std::string_view Namespace, const std::set<IoHash>& Keys);
CloudCacheExistsResult ObjectExists(std::string_view Namespace, const std::set<IoHash>& Keys);
- CloudCacheResult PostComputeTasks(IoBuffer TasksData);
- CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0);
-
std::vector<IoHash> Filter(std::string_view Namespace, std::string_view BucketId, const std::vector<IoHash>& ChunkHashes);
CloudCacheClient& Client() { return *m_CacheClient; };
diff --git a/src/zenserver/upstream/upstreamapply.cpp b/src/zenserver/upstream/upstreamapply.cpp
deleted file mode 100644
index 3d29f2228..000000000
--- a/src/zenserver/upstream/upstreamapply.cpp
+++ /dev/null
@@ -1,459 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#include "upstreamapply.h"
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include <zencore/compactbinary.h>
-# include <zencore/compactbinarybuilder.h>
-# include <zencore/fmtutils.h>
-# include <zencore/stream.h>
-# include <zencore/timer.h>
-# include <zencore/workthreadpool.h>
-
-# include <zenstore/cidstore.h>
-
-# include "diag/logging.h"
-
-# include <fmt/format.h>
-
-# include <atomic>
-
-namespace zen {
-
-using namespace std::literals;
-
-struct UpstreamApplyStats
-{
- static constexpr uint64_t MaxSampleCount = 1000ull;
-
- UpstreamApplyStats(bool Enabled) : m_Enabled(Enabled) {}
-
- void Add(UpstreamApplyEndpoint& Endpoint, const PostUpstreamApplyResult& Result)
- {
- UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
-
- if (Result.Error)
- {
- Stats.ErrorCount.Increment(1);
- }
- else if (Result.Success)
- {
- Stats.PostCount.Increment(1);
- Stats.UpBytes.Increment(Result.Bytes / 1024 / 1024);
- }
- }
-
- void Add(UpstreamApplyEndpoint& Endpoint, const GetUpstreamApplyUpdatesResult& Result)
- {
- UpstreamApplyEndpointStats& Stats = Endpoint.Stats();
-
- if (Result.Error)
- {
- Stats.ErrorCount.Increment(1);
- }
- else if (Result.Success)
- {
- Stats.UpdateCount.Increment(1);
- Stats.DownBytes.Increment(Result.Bytes / 1024 / 1024);
- if (!Result.Completed.empty())
- {
- uint64_t Completed = 0;
- for (auto& It : Result.Completed)
- {
- Completed += It.second.size();
- }
- Stats.CompleteCount.Increment(Completed);
- }
- }
- }
-
- bool m_Enabled;
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-class UpstreamApplyImpl final : public UpstreamApply
-{
-public:
- UpstreamApplyImpl(const UpstreamApplyOptions& Options, CidStore& CidStore)
- : m_Log(logging::Get("upstream-apply"))
- , m_Options(Options)
- , m_CidStore(CidStore)
- , m_Stats(Options.StatsEnabled)
- , m_UpstreamAsyncWorkPool(Options.UpstreamThreadCount)
- , m_DownstreamAsyncWorkPool(Options.DownstreamThreadCount)
- {
- }
-
- virtual ~UpstreamApplyImpl() { Shutdown(); }
-
- virtual bool Initialize() override
- {
- for (auto& Endpoint : m_Endpoints)
- {
- const UpstreamEndpointHealth Health = Endpoint->Initialize();
- if (Health.Ok)
- {
- Log().info("initialize endpoint '{}' OK", Endpoint->DisplayName());
- }
- else
- {
- Log().warn("initialize endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
- }
- }
-
- m_RunState.IsRunning = !m_Endpoints.empty();
-
- if (m_RunState.IsRunning)
- {
- m_ShutdownEvent.Reset();
-
- m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this);
-
- m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this);
- }
-
- return m_RunState.IsRunning;
- }
-
- virtual bool IsHealthy() const override
- {
- if (m_RunState.IsRunning)
- {
- for (const auto& Endpoint : m_Endpoints)
- {
- if (Endpoint->IsHealthy())
- {
- return true;
- }
- }
- }
-
- return false;
- }
-
- virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) override
- {
- m_Endpoints.emplace_back(std::move(Endpoint));
- }
-
- virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) override
- {
- if (m_RunState.IsRunning)
- {
- const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
- const IoHash ActionId = ApplyRecord.Action.GetHash();
- const uint32_t TimeoutSeconds = ApplyRecord.WorkerDescriptor["timeout"sv].AsInt32(300);
-
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- // Already in progress
- return {.ApplyId = ActionId, .Success = true};
- }
-
- std::chrono::steady_clock::time_point ExpireTime =
- TimeoutSeconds > 0 ? std::chrono::steady_clock::now() + std::chrono::seconds(TimeoutSeconds)
- : std::chrono::steady_clock::time_point::max();
-
- m_ApplyTasks[WorkerId][ActionId] = {.State = UpstreamApplyState::Queued, .Result{}, .ExpireTime = std::move(ExpireTime)};
- }
-
- ApplyRecord.Timepoints["zen-queue-added"] = DateTime::NowTicks();
- m_UpstreamAsyncWorkPool.ScheduleWork(
- [this, ApplyRecord = std::move(ApplyRecord)]() { ProcessApplyRecord(std::move(ApplyRecord)); });
-
- return {.ApplyId = ActionId, .Success = true};
- }
-
- return {};
- }
-
- virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) override
- {
- if (m_RunState.IsRunning)
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- return {.Status = *Status, .Success = true};
- }
- }
-
- return {};
- }
-
- virtual void GetStatus(CbObjectWriter& Status) override
- {
- Status << "upstream_worker_threads" << m_Options.UpstreamThreadCount;
- Status << "upstream_queue_count" << m_UpstreamAsyncWorkPool.PendingWorkItemCount();
- Status << "downstream_worker_threads" << m_Options.DownstreamThreadCount;
- Status << "downstream_queue_count" << m_DownstreamAsyncWorkPool.PendingWorkItemCount();
-
- Status.BeginArray("endpoints");
- for (const auto& Ep : m_Endpoints)
- {
- Status.BeginObject();
- Status << "name" << Ep->DisplayName();
- Status << "health" << (Ep->IsHealthy() ? "ok"sv : "inactive"sv);
-
- UpstreamApplyEndpointStats& Stats = Ep->Stats();
- const uint64_t PostCount = Stats.PostCount.Value();
- const uint64_t CompleteCount = Stats.CompleteCount.Value();
- // const uint64_t UpdateCount = Stats.UpdateCount;
- const double CompleteRate = CompleteCount > 0 ? (double(PostCount) / double(CompleteCount)) : 0.0;
-
- Status << "post_count" << PostCount;
- Status << "complete_count" << PostCount;
- Status << "update_count" << Stats.UpdateCount.Value();
-
- Status << "complete_ratio" << CompleteRate;
- Status << "downloaded_mb" << Stats.DownBytes.Value();
- Status << "uploaded_mb" << Stats.UpBytes.Value();
- Status << "error_count" << Stats.ErrorCount.Value();
-
- Status.EndObject();
- }
- Status.EndArray();
- }
-
-private:
- // The caller is responsible for locking if required
- UpstreamApplyStatus* FindStatus(const IoHash& WorkerId, const IoHash& ActionId)
- {
- if (auto It = m_ApplyTasks.find(WorkerId); It != m_ApplyTasks.end())
- {
- if (auto It2 = It->second.find(ActionId); It2 != It->second.end())
- {
- return &It2->second;
- }
- }
- return nullptr;
- }
-
- void ProcessApplyRecord(UpstreamApplyRecord ApplyRecord)
- {
- const IoHash WorkerId = ApplyRecord.WorkerDescriptor.GetHash();
- const IoHash ActionId = ApplyRecord.Action.GetHash();
- try
- {
- for (auto& Endpoint : m_Endpoints)
- {
- if (Endpoint->IsHealthy())
- {
- ApplyRecord.Timepoints["zen-queue-dispatched"] = DateTime::NowTicks();
- PostUpstreamApplyResult Result = Endpoint->PostApply(std::move(ApplyRecord));
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- Status->Timepoints.merge(Result.Timepoints);
-
- if (Result.Success)
- {
- Status->State = UpstreamApplyState::Executing;
- }
- else
- {
- Status->State = UpstreamApplyState::Complete;
- Status->Result = {.Error = std::move(Result.Error),
- .Bytes = Result.Bytes,
- .ElapsedSeconds = Result.ElapsedSeconds};
- }
- }
- }
- m_Stats.Add(*Endpoint, Result);
- return;
- }
- }
-
- Log().warn("process upstream apply ({}/{}) FAILED 'No available endpoint'", WorkerId, ActionId);
-
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- Status->State = UpstreamApplyState::Complete;
- Status->Result = {.Error{.ErrorCode = -1, .Reason = "No available endpoint"}};
- }
- }
- }
- catch (std::exception& e)
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(WorkerId, ActionId); Status != nullptr)
- {
- Status->State = UpstreamApplyState::Complete;
- Status->Result = {.Error{.ErrorCode = -1, .Reason = e.what()}};
- }
- Log().warn("process upstream apply ({}/{}) FAILED '{}'", WorkerId, ActionId, e.what());
- }
- }
-
- void ProcessApplyUpdates()
- {
- for (auto& Endpoint : m_Endpoints)
- {
- if (Endpoint->IsHealthy())
- {
- GetUpstreamApplyUpdatesResult Result = Endpoint->GetUpdates(m_DownstreamAsyncWorkPool);
- m_Stats.Add(*Endpoint, Result);
-
- if (!Result.Success)
- {
- Log().warn("process upstream apply updates FAILED '{}'", Result.Error.Reason);
- }
-
- if (!Result.Completed.empty())
- {
- for (auto& It : Result.Completed)
- {
- for (auto& It2 : It.second)
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- if (auto Status = FindStatus(It.first, It2.first); Status != nullptr)
- {
- Status->State = UpstreamApplyState::Complete;
- Status->Result = std::move(It2.second);
- Status->Result.Timepoints.merge(Status->Timepoints);
- Status->Result.Timepoints["zen-queue-complete"] = DateTime::NowTicks();
- Status->Timepoints.clear();
- }
- }
- }
- }
- }
- }
- }
-
- void ProcessUpstreamUpdates()
- {
- const auto& UpdateSleep = std::chrono::milliseconds(m_Options.UpdatesInterval);
- while (!m_ShutdownEvent.Wait(uint32_t(UpdateSleep.count())))
- {
- if (!m_RunState.IsRunning)
- {
- break;
- }
-
- ProcessApplyUpdates();
-
- // Remove any expired tasks, regardless of state
- {
- std::scoped_lock Lock(m_ApplyTasksMutex);
- for (auto& WorkerIt : m_ApplyTasks)
- {
- const auto Count = std::erase_if(WorkerIt.second, [](const auto& Item) {
- return Item.second.ExpireTime < std::chrono::steady_clock::now();
- });
- if (Count > 0)
- {
- Log().debug("Removed '{}' expired tasks", Count);
- }
- }
- const auto Count = std::erase_if(m_ApplyTasks, [](const auto& Item) { return Item.second.empty(); });
- if (Count > 0)
- {
- Log().debug("Removed '{}' empty task lists", Count);
- }
- }
- }
- }
-
- void MonitorEndpoints()
- {
- for (;;)
- {
- {
- std::unique_lock Lock(m_RunState.Mutex);
- if (m_RunState.ExitSignal.wait_for(Lock, m_Options.HealthCheckInterval, [this]() { return !m_RunState.IsRunning.load(); }))
- {
- break;
- }
- }
-
- for (auto& Endpoint : m_Endpoints)
- {
- if (!Endpoint->IsHealthy())
- {
- if (const UpstreamEndpointHealth Health = Endpoint->CheckHealth(); Health.Ok)
- {
- Log().warn("health check endpoint '{}' OK", Endpoint->DisplayName(), Health.Reason);
- }
- else
- {
- Log().warn("health check endpoint '{}' FAILED, reason '{}'", Endpoint->DisplayName(), Health.Reason);
- }
- }
- }
- }
- }
-
- void Shutdown()
- {
- if (m_RunState.Stop())
- {
- m_ShutdownEvent.Set();
- m_EndpointMonitorThread.join();
- m_UpstreamUpdatesThread.join();
- m_Endpoints.clear();
- }
- }
-
- spdlog::logger& Log() { return m_Log; }
-
- struct RunState
- {
- std::mutex Mutex;
- std::condition_variable ExitSignal;
- std::atomic_bool IsRunning{false};
-
- bool Stop()
- {
- bool Stopped = false;
- {
- std::scoped_lock Lock(Mutex);
- Stopped = IsRunning.exchange(false);
- }
- if (Stopped)
- {
- ExitSignal.notify_all();
- }
- return Stopped;
- }
- };
-
- spdlog::logger& m_Log;
- UpstreamApplyOptions m_Options;
- CidStore& m_CidStore;
- UpstreamApplyStats m_Stats;
- UpstreamApplyTasks m_ApplyTasks;
- std::mutex m_ApplyTasksMutex;
- std::vector<std::unique_ptr<UpstreamApplyEndpoint>> m_Endpoints;
- Event m_ShutdownEvent;
- WorkerThreadPool m_UpstreamAsyncWorkPool;
- WorkerThreadPool m_DownstreamAsyncWorkPool;
- std::thread m_UpstreamUpdatesThread;
- std::thread m_EndpointMonitorThread;
- RunState m_RunState;
-};
-
-//////////////////////////////////////////////////////////////////////////
-
-bool
-UpstreamApply::IsHealthy() const
-{
- return false;
-}
-
-std::unique_ptr<UpstreamApply>
-UpstreamApply::Create(const UpstreamApplyOptions& Options, CidStore& CidStore)
-{
- return std::make_unique<UpstreamApplyImpl>(Options, CidStore);
-}
-
-} // namespace zen
-
-#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/upstream/upstreamapply.h b/src/zenserver/upstream/upstreamapply.h
deleted file mode 100644
index 4a095be6c..000000000
--- a/src/zenserver/upstream/upstreamapply.h
+++ /dev/null
@@ -1,192 +0,0 @@
-// Copyright Epic Games, Inc. All Rights Reserved.
-
-#pragma once
-
-#if ZEN_WITH_COMPUTE_SERVICES
-
-# include <zencore/compactbinarypackage.h>
-# include <zencore/iobuffer.h>
-# include <zencore/iohash.h>
-# include <zencore/stats.h>
-# include <zencore/zencore.h>
-
-# include <chrono>
-# include <map>
-# include <unordered_map>
-# include <unordered_set>
-
-namespace zen {
-
-class AuthMgr;
-class CbObjectWriter;
-class CidStore;
-class CloudCacheTokenProvider;
-class WorkerThreadPool;
-class ZenCacheNamespace;
-struct CloudCacheClientOptions;
-struct UpstreamAuthConfig;
-
-enum class UpstreamApplyState : int32_t
-{
- Queued = 0,
- Executing = 1,
- Complete = 2,
-};
-
-enum class UpstreamApplyType
-{
- Simple = 0,
- Asset = 1,
-};
-
-struct UpstreamApplyRecord
-{
- CbObject WorkerDescriptor;
- CbObject Action;
- UpstreamApplyType Type;
- std::map<std::string, uint64_t> Timepoints{};
-};
-
-struct UpstreamApplyOptions
-{
- std::chrono::seconds HealthCheckInterval{5};
- std::chrono::seconds UpdatesInterval{5};
- uint32_t UpstreamThreadCount = 4;
- uint32_t DownstreamThreadCount = 4;
- bool StatsEnabled = false;
-};
-
-struct UpstreamApplyError
-{
- int32_t ErrorCode{};
- std::string Reason{};
-
- explicit operator bool() const { return ErrorCode != 0; }
-};
-
-struct PostUpstreamApplyResult
-{
- UpstreamApplyError Error{};
- int64_t Bytes{};
- double ElapsedSeconds{};
- std::map<std::string, uint64_t> Timepoints{};
- bool Success = false;
-};
-
-struct GetUpstreamApplyResult
-{
- // UpstreamApplyType::Simple
- std::map<std::filesystem::path, IoHash> OutputFiles{};
- std::map<IoHash, IoBuffer> FileData{};
-
- // UpstreamApplyType::Asset
- CbPackage OutputPackage{};
- int64_t TotalAttachmentBytes{};
- int64_t TotalRawAttachmentBytes{};
-
- UpstreamApplyError Error{};
- int64_t Bytes{};
- double ElapsedSeconds{};
- std::string StdOut{};
- std::string StdErr{};
- std::string Agent{};
- std::string Detail{};
- std::map<std::string, uint64_t> Timepoints{};
- bool Success = false;
-};
-
-using UpstreamApplyCompleted = std::unordered_map<IoHash, std::unordered_map<IoHash, GetUpstreamApplyResult>>;
-
-struct GetUpstreamApplyUpdatesResult
-{
- UpstreamApplyError Error{};
- int64_t Bytes{};
- double ElapsedSeconds{};
- UpstreamApplyCompleted Completed{};
- bool Success = false;
-};
-
-struct UpstreamApplyStatus
-{
- UpstreamApplyState State{};
- GetUpstreamApplyResult Result{};
- std::chrono::steady_clock::time_point ExpireTime{};
- std::map<std::string, uint64_t> Timepoints{};
-};
-
-using UpstreamApplyTasks = std::unordered_map<IoHash, std::unordered_map<IoHash, UpstreamApplyStatus>>;
-
-struct UpstreamEndpointHealth
-{
- std::string Reason;
- bool Ok = false;
-};
-
-struct UpstreamApplyEndpointStats
-{
- metrics::Counter PostCount;
- metrics::Counter CompleteCount;
- metrics::Counter UpdateCount;
- metrics::Counter ErrorCount;
- metrics::Counter UpBytes;
- metrics::Counter DownBytes;
-};
-
-/**
- * The upstream apply endpoint is responsible for handling remote execution.
- */
-class UpstreamApplyEndpoint
-{
-public:
- virtual ~UpstreamApplyEndpoint() = default;
-
- virtual UpstreamEndpointHealth Initialize() = 0;
- virtual bool IsHealthy() const = 0;
- virtual UpstreamEndpointHealth CheckHealth() = 0;
- virtual std::string_view DisplayName() const = 0;
- virtual PostUpstreamApplyResult PostApply(UpstreamApplyRecord ApplyRecord) = 0;
- virtual GetUpstreamApplyUpdatesResult GetUpdates(WorkerThreadPool& ThreadPool) = 0;
- virtual UpstreamApplyEndpointStats& Stats() = 0;
-
- static std::unique_ptr<UpstreamApplyEndpoint> CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions,
- const UpstreamAuthConfig& ComputeAuthConfig,
- const CloudCacheClientOptions& StorageOptions,
- const UpstreamAuthConfig& StorageAuthConfig,
- CidStore& CidStore,
- AuthMgr& Mgr);
-};
-
-/**
- * Manages one or more upstream compute endpoints.
- */
-class UpstreamApply
-{
-public:
- virtual ~UpstreamApply() = default;
-
- virtual bool Initialize() = 0;
- virtual bool IsHealthy() const = 0;
- virtual void RegisterEndpoint(std::unique_ptr<UpstreamApplyEndpoint> Endpoint) = 0;
-
- struct EnqueueResult
- {
- IoHash ApplyId{};
- bool Success = false;
- };
-
- struct StatusResult
- {
- UpstreamApplyStatus Status{};
- bool Success = false;
- };
-
- virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0;
- virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0;
- virtual void GetStatus(CbObjectWriter& CbO) = 0;
-
- static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CidStore& CidStore);
-};
-
-} // namespace zen
-
-#endif // ZEN_WITH_COMPUTE_SERVICES
diff --git a/src/zenserver/upstream/upstreamcache.cpp b/src/zenserver/upstream/upstreamcache.cpp
index ac647130e..5f711d0bc 100644
--- a/src/zenserver/upstream/upstreamcache.cpp
+++ b/src/zenserver/upstream/upstreamcache.cpp
@@ -498,7 +498,7 @@ namespace detail {
}
else
{
- ZEN_WARN("Horde request for inline payload of {}/{}/{} has hash {}, expected hash {} from header",
+ ZEN_WARN("Jupiter request for inline payload of {}/{}/{} has hash {}, expected hash {} from header",
Namespace,
Request.Key.Bucket,
Request.Key.Hash.ToHexString(),
diff --git a/src/zenserver/zenserver.cpp b/src/zenserver/zenserver.cpp
index 1f5c91a58..789952a3c 100644
--- a/src/zenserver/zenserver.cpp
+++ b/src/zenserver/zenserver.cpp
@@ -217,17 +217,6 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_ProjectStore = new ProjectStore(*m_CidStore, m_DataRoot / "projects", m_GcManager, *m_JobQueue);
m_HttpProjectService.reset(new HttpProjectService{*m_CidStore, m_ProjectStore, m_StatsService, *m_AuthMgr});
-#if ZEN_WITH_COMPUTE_SERVICES
- if (ServerOptions.ComputeServiceEnabled)
- {
- InitializeCompute(ServerOptions);
- }
- else
- {
- ZEN_INFO("NOT instantiating compute services");
- }
-#endif // ZEN_WITH_COMPUTE_SERVICES
-
if (ServerOptions.StructuredCacheConfig.Enabled)
{
InitializeStructuredCache(ServerOptions);
@@ -250,16 +239,6 @@ ZenServer::Initialize(const ZenServerOptions& ServerOptions, ZenServerState::Zen
m_Http->RegisterService(*m_CidService);
-#if ZEN_WITH_COMPUTE_SERVICES
- if (ServerOptions.ComputeServiceEnabled)
- {
- if (m_HttpFunctionService != nullptr)
- {
- m_Http->RegisterService(*m_HttpFunctionService);
- }
- }
-#endif // ZEN_WITH_COMPUTE_SERVICES
-
m_FrontendService = std::make_unique<HttpFrontendService>(m_ContentRoot);
if (m_FrontendService)
@@ -532,60 +511,6 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
m_Http->RegisterService(*m_UpstreamService);
}
-#if ZEN_WITH_COMPUTE_SERVICES
-void
-ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
-{
- ServerOptions;
- const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
-
- // Horde compute upstream
- if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.StorageUrl.empty() == false)
- {
- ZEN_INFO("instantiating compute service");
-
- std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name;
-
- auto ComputeOptions =
- CloudCacheClientOptions{.Name = EndpointName,
- .ServiceUrl = UpstreamConfig.HordeConfig.Url,
- .ComputeCluster = UpstreamConfig.HordeConfig.Cluster,
- .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
- .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
-
- auto ComputeAuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl,
- .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId,
- .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret,
- .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider,
- .AccessToken = UpstreamConfig.HordeConfig.AccessToken};
-
- auto StorageOptions =
- CloudCacheClientOptions{.Name = EndpointName,
- .ServiceUrl = UpstreamConfig.HordeConfig.StorageUrl,
- .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace,
- .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
- .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
-
- auto StorageAuthConfig = UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.StorageOAuthUrl,
- .OAuthClientId = UpstreamConfig.HordeConfig.StorageOAuthClientId,
- .OAuthClientSecret = UpstreamConfig.HordeConfig.StorageOAuthClientSecret,
- .OpenIdProvider = UpstreamConfig.HordeConfig.StorageOpenIdProvider,
- .AccessToken = UpstreamConfig.HordeConfig.StorageAccessToken};
-
- m_HttpFunctionService = std::make_unique<HttpFunctionService>(*m_CidStore,
- ComputeOptions,
- StorageOptions,
- ComputeAuthConfig,
- StorageAuthConfig,
- *m_AuthMgr);
- }
- else
- {
- ZEN_INFO("NOT instantiating compute service (missing Horde or Storage config)");
- }
-}
-#endif // ZEN_WITH_COMPUTE_SERVICES
-
void
ZenServer::Run()
{
@@ -679,10 +604,6 @@ ZenServer::Cleanup()
m_UpstreamCache.reset();
m_CacheStore = {};
-#if ZEN_WITH_COMPUTE_SERVICES
- m_HttpFunctionService.reset();
-#endif // ZEN_WITH_COMPUTE_SERVICES
-
m_HttpProjectService.reset();
m_ProjectStore = {};
m_CidService.reset();
diff --git a/src/zenserver/zenserver.h b/src/zenserver/zenserver.h
index 2b1ae3842..0c28c1229 100644
--- a/src/zenserver/zenserver.h
+++ b/src/zenserver/zenserver.h
@@ -29,7 +29,6 @@ ZEN_THIRD_PARTY_INCLUDES_END
#include "admin/admin.h"
#include "cache/httpstructuredcache.h"
#include "cache/structuredcachestore.h"
-#include "compute/function.h"
#include "frontend/frontend.h"
#include "httpcidstore.h"
#include "objectstore/objectstore.h"
@@ -131,14 +130,11 @@ private:
std::unique_ptr<HttpUpstreamService> m_UpstreamService;
std::unique_ptr<HttpStructuredCacheService> m_StructuredCacheService;
HttpHealthService m_HealthService;
-#if ZEN_WITH_COMPUTE_SERVICES
- std::unique_ptr<HttpFunctionService> m_HttpFunctionService;
-#endif
- std::unique_ptr<HttpFrontendService> m_FrontendService;
- std::unique_ptr<HttpObjectStoreService> m_ObjStoreService;
- std::unique_ptr<VfsService> m_VfsService;
- std::unique_ptr<JobQueue> m_JobQueue;
- std::unique_ptr<HttpAdminService> m_AdminService;
+ std::unique_ptr<HttpFrontendService> m_FrontendService;
+ std::unique_ptr<HttpObjectStoreService> m_ObjStoreService;
+ std::unique_ptr<VfsService> m_VfsService;
+ std::unique_ptr<JobQueue> m_JobQueue;
+ std::unique_ptr<HttpAdminService> m_AdminService;
bool m_DebugOptionForcedCrash = false;
bool m_UseSentry = false;