aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoe Kirchoff <[email protected]>2022-03-17 09:55:09 -0700
committerGitHub <[email protected]>2022-03-17 09:55:09 -0700
commit7466cb93fbb9f4082dc253a328222dac8bbe58e4 (patch)
tree2b60020b7ab15867bfabf135bf8217aabe553c6d
parentIntroduced basic validation of the clang-format version (diff)
downloadzen-7466cb93fbb9f4082dc253a328222dac8bbe58e4.tar.xz
zen-7466cb93fbb9f4082dc253a328222dac8bbe58e4.zip
Update horde compute to use Jupiter for storage (#60)
-rw-r--r--zenserver/compute/apply.cpp38
-rw-r--r--zenserver/compute/apply.h14
-rw-r--r--zenserver/config.cpp95
-rw-r--r--zenserver/config.h15
-rw-r--r--zenserver/upstream/jupiter.cpp16
-rw-r--r--zenserver/upstream/jupiter.h5
-rw-r--r--zenserver/upstream/upstreamapply.cpp295
-rw-r--r--zenserver/upstream/upstreamapply.h21
-rw-r--r--zenserver/zenserver.cpp81
9 files changed, 430 insertions, 150 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp
index e4d5697fa..044078aa4 100644
--- a/zenserver/compute/apply.cpp
+++ b/zenserver/compute/apply.cpp
@@ -6,6 +6,7 @@
# include <upstream/jupiter.h>
# include <upstream/upstreamapply.h>
+# include <upstream/upstreamcache.h>
# include <zencore/compactbinary.h>
# include <zencore/compactbinarybuilder.h>
# include <zencore/compactbinarypackage.h>
@@ -173,10 +174,10 @@ SandboxedFunctionJob::GrantNamedObjectAccess(PWSTR ObjectName, SE_OBJECT_TYPE Ob
.grfAccessMode = GRANT_ACCESS,
.grfInheritance = grfInhericance,
.Trustee = {.pMultipleTrustee = nullptr,
- .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE,
- .TrusteeForm = TRUSTEE_IS_SID,
- .TrusteeType = TRUSTEE_IS_GROUP,
- .ptstrName = (PWSTR)m_AppContainerSid}};
+ .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE,
+ .TrusteeForm = TRUSTEE_IS_SID,
+ .TrusteeType = TRUSTEE_IS_GROUP,
+ .ptstrName = (PWSTR)m_AppContainerSid}};
PACL OldAcl = nullptr;
@@ -328,24 +329,29 @@ SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath)
////////////////////////////////////////////////////////////////////////////////
-HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& BaseDir)
+HttpFunctionService::HttpFunctionService(CasStore& Store,
+ CidStore& InCidStore,
+ const std::filesystem::path& BaseDir,
+ const CloudCacheClientOptions& ComputeOptions,
+ const CloudCacheClientOptions& StorageOptions,
+ const UpstreamAuthConfig& ComputeAuthConfig,
+ const UpstreamAuthConfig& StorageAuthConfig,
+ AuthMgr& Mgr)
: m_Log(logging::Get("apply"))
, m_CasStore(Store)
, m_CidStore(InCidStore)
, m_SandboxPath(BaseDir / "scratch")
, m_FunctionPath(BaseDir / "func")
{
- m_UpstreamApply = MakeUpstreamApply({}, m_CasStore, m_CidStore);
-
- CloudCacheAccessToken AccessToken{.Value = "ServiceAccount 0f8056b30bd0df0959be55fc3338159b6f938456d3474aed0087fb965268d079",
- .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
-
- CloudCacheClientOptions Options = {.ServiceUrl = "https://horde.devtools-dev.epicgames.com"sv,
- .DdcNamespace = "default"sv,
- .BlobStoreNamespace = "default"sv};
-
- auto HordeUpstreamEndpoint =
- MakeHordeUpstreamEndpoint(Options, CloudCacheTokenProvider::CreateFromStaticToken(AccessToken), m_CasStore, m_CidStore);
+ m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore);
+
+ auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions,
+ ComputeAuthConfig,
+ StorageOptions,
+ StorageAuthConfig,
+ m_CasStore,
+ m_CidStore,
+ Mgr);
m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint));
m_UpstreamApply->Initialize();
diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h
index 161e47e06..e00afcd61 100644
--- a/zenserver/compute/apply.h
+++ b/zenserver/compute/apply.h
@@ -23,6 +23,11 @@ namespace zen {
class CasStore;
class CidStore;
class UpstreamApply;
+class CloudCacheClient;
+class AuthMgr;
+
+struct UpstreamAuthConfig;
+struct CloudCacheClientOptions;
/**
* Lambda style compute function service
@@ -30,7 +35,14 @@ class UpstreamApply;
class HttpFunctionService : public HttpService
{
public:
- HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& SandboxBaseDir);
+ HttpFunctionService(CasStore& Store,
+ CidStore& InCidStore,
+ const std::filesystem::path& BaseDir,
+ const CloudCacheClientOptions& ComputeOptions,
+ const CloudCacheClientOptions& StorageOptions,
+ const UpstreamAuthConfig& ComputeAuthConfig,
+ const UpstreamAuthConfig& StorageAuthConfig,
+ AuthMgr& Mgr);
~HttpFunctionService();
virtual const char* BaseUri() const override;
diff --git a/zenserver/config.cpp b/zenserver/config.cpp
index bcacc16c0..adb079d83 100644
--- a/zenserver/config.cpp
+++ b/zenserver/config.cpp
@@ -345,6 +345,62 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions)
cxxopts::value<int32_t>(ServerOptions.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"),
"");
+ options.add_option("compute",
+ "",
+ "upstream-horde-url",
+ "URL to a Horde instance.",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Url)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-oauth-url",
+ "URL to the OAuth provier",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthUrl)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-oauth-clientid",
+ "The OAuth client ID",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientId)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-oauth-clientsecret",
+ "The OAuth client secret",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientSecret)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-openid-provider",
+ "Name of a registered Open ID provider",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OpenIdProvider)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-token",
+ "A static authentication token",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.AccessToken)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-cluster",
+ "The Horde compute cluster id",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster)->default_value(""),
+ "");
+
+ options.add_option("compute",
+ "",
+ "upstream-horde-namespace",
+ "The Jupiter namespace to use with Horde compute",
+ cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace)->default_value(""),
+ "");
+
options.add_option("gc",
"",
"gc-enabled",
@@ -596,6 +652,45 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio
}
}
+ if (sol::optional<sol::table> ComputeConfig = lua["compute"])
+ {
+ ServerOptions.ComputeServiceEnabled = ComputeConfig->get_or("enable", ServerOptions.ComputeServiceEnabled);
+
+ if (auto UpstreamConfig = ComputeConfig->get<sol::optional<sol::table>>("upstream"))
+ {
+ if (auto HordeConfig = UpstreamConfig->get<sol::optional<sol::table>>("horde"))
+ {
+ UpdateStringValueFromConfig(HordeConfig.value(),
+ std::string_view("name"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.Name);
+ UpdateStringValueFromConfig(HordeConfig.value(),
+ std::string_view("url"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.Url);
+ UpdateStringValueFromConfig(HordeConfig.value(),
+ std::string_view("oauthprovider"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthUrl);
+ UpdateStringValueFromConfig(HordeConfig.value(),
+ std::string_view("oauthclientid"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientId);
+ UpdateStringValueFromConfig(HordeConfig.value(),
+ std::string_view("oauthclientsecret"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientSecret);
+ UpdateStringValueFromConfig(HordeConfig.value(),
+ std::string_view("openidprovider"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.OpenIdProvider);
+ UpdateStringValueFromConfig(HordeConfig.value(),
+ std::string_view("token"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.AccessToken);
+ UpdateStringValueFromConfig(HordeConfig.value(),
+ std::string_view("cluster"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster);
+ UpdateStringValueFromConfig(HordeConfig.value(),
+ std::string_view("namespace"),
+ ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace);
+ };
+ }
+ }
+
if (sol::optional<sol::table> GcConfig = lua["gc"])
{
ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0);
diff --git a/zenserver/config.h b/zenserver/config.h
index fd569bdb1..a7a7815a8 100644
--- a/zenserver/config.h
+++ b/zenserver/config.h
@@ -33,6 +33,19 @@ struct ZenUpstreamJupiterConfig
bool UseLegacyDdc = false;
};
+struct ZenUpstreamHordeConfig
+{
+ std::string Name;
+ std::string Url;
+ std::string OAuthUrl;
+ std::string OAuthClientId;
+ std::string OAuthClientSecret;
+ std::string OpenIdProvider;
+ std::string AccessToken;
+ std::string Cluster;
+ std::string Namespace;
+};
+
struct ZenUpstreamZenConfig
{
std::string Name;
@@ -51,6 +64,7 @@ enum class UpstreamCachePolicy : uint8_t
struct ZenUpstreamCacheConfig
{
ZenUpstreamJupiterConfig JupiterConfig;
+ ZenUpstreamHordeConfig HordeConfig;
ZenUpstreamZenConfig ZenConfig;
int32_t UpstreamThreadCount = 4;
int32_t ConnectTimeoutMilliseconds = 5000;
@@ -106,6 +120,7 @@ struct ZenServerOptions
bool IsTest = false;
bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements
bool StructuredCacheEnabled = true;
+ bool ComputeServiceEnabled = true;
bool ShouldCrash = false; // Option for testing crash handling
bool IsFirstRun = false;
bool NoSentry = false;
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp
index 2b064a610..eef1daab0 100644
--- a/zenserver/upstream/jupiter.cpp
+++ b/zenserver/upstream/jupiter.cpp
@@ -577,12 +577,12 @@ CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys)
}
CloudCacheResult
-CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData)
+CloudCacheSession::PostComputeTasks(IoBuffer TasksData)
{
ZEN_TRACE_CPU("HordeClient::PostComputeTasks");
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster();
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -609,8 +609,11 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa
CloudCacheResult
CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds)
{
+ ZEN_TRACE_CPU("HordeClient::GetComputeUpdates");
+
ExtendableStringBuilder<256> Uri;
- Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds;
+ Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster() << "/updates/" << ChannelId
+ << "?wait=" << WaitSeconds;
cpr::Session& Session = GetSession();
const CloudCacheAccessToken& AccessToken = GetAccessToken();
@@ -703,6 +706,8 @@ CloudCacheSession::VerifyAccessToken(long StatusCode)
CloudCacheResult
CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key)
{
+ ZEN_TRACE_CPU("HordeClient::CacheTypeExists");
+
ExtendableStringBuilder<256> Uri;
Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString();
@@ -731,6 +736,8 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key)
CloudCacheExistsResult
CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys)
{
+ ZEN_TRACE_CPU("HordeClient::CacheTypeExists");
+
ExtendableStringBuilder<256> Query;
for (const auto& Key : Keys)
{
@@ -766,7 +773,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHas
{
IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size());
const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer);
- for (auto& Item : ExistsResponse["id"sv])
+ for (auto& Item : ExistsResponse["Needs"sv])
{
if (Item.IsHash())
{
@@ -878,6 +885,7 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std::
, m_ServiceUrl(Options.ServiceUrl)
, m_DdcNamespace(Options.DdcNamespace)
, m_BlobStoreNamespace(Options.BlobStoreNamespace)
+, m_ComputeCluster(Options.ComputeCluster)
, m_ConnectTimeout(Options.ConnectTimeout)
, m_Timeout(Options.Timeout)
, m_TokenProvider(std::move(TokenProvider))
diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h
index ddd7ea160..9854e6f1e 100644
--- a/zenserver/upstream/jupiter.h
+++ b/zenserver/upstream/jupiter.h
@@ -116,7 +116,7 @@ public:
CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys);
CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys);
- CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData);
+ CloudCacheResult PostComputeTasks(IoBuffer TasksData);
CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0);
CloudCacheResult GetObjectTree(const IoHash& Key);
@@ -167,6 +167,7 @@ struct CloudCacheClientOptions
std::string_view ServiceUrl;
std::string_view DdcNamespace;
std::string_view BlobStoreNamespace;
+ std::string_view ComputeCluster;
std::chrono::milliseconds ConnectTimeout{5000};
std::chrono::milliseconds Timeout{};
bool UseLegacyDdc = false;
@@ -184,6 +185,7 @@ public:
CloudCacheAccessToken AcquireAccessToken();
std::string_view DdcNamespace() const { return m_DdcNamespace; }
std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; }
+ std::string_view ComputeCluster() const { return m_ComputeCluster; }
std::string_view ServiceUrl() const { return m_ServiceUrl; }
spdlog::logger& Logger() { return m_Log; }
@@ -193,6 +195,7 @@ private:
std::string m_ServiceUrl;
std::string m_DdcNamespace;
std::string m_BlobStoreNamespace;
+ std::string m_ComputeCluster;
std::chrono::milliseconds m_ConnectTimeout{};
std::chrono::milliseconds m_Timeout{};
std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider;
diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp
index 63c334265..918697224 100644
--- a/zenserver/upstream/upstreamapply.cpp
+++ b/zenserver/upstream/upstreamapply.cpp
@@ -23,6 +23,9 @@
# include <zenstore/cas.h>
# include <zenstore/cidstore.h>
+# include <auth/authmgr.h>
+# include <upstream/upstreamcache.h>
+
# include "cache/structuredcachestore.h"
# include "diag/logging.h"
@@ -48,17 +51,76 @@ namespace detail {
class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint
{
public:
- HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& Options,
- std::unique_ptr<zen::CloudCacheTokenProvider> TokenProvider,
- CasStore& CasStore,
- CidStore& CidStore)
+ HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions,
+ const UpstreamAuthConfig& ComputeAuthConfig,
+ const CloudCacheClientOptions& StorageOptions,
+ const UpstreamAuthConfig& StorageAuthConfig,
+ CasStore& CasStore,
+ CidStore& CidStore,
+ AuthMgr& Mgr)
: m_Log(logging::Get("upstream-apply"))
+ , m_AuthMgr(Mgr)
, m_CasStore(CasStore)
, m_CidStore(CidStore)
{
- m_DisplayName = fmt::format("Horde - '{}'", Options.ServiceUrl);
- m_Client = new CloudCacheClient(Options, std::move(TokenProvider));
+ m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl);
m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString());
+
+ {
+ std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+
+ if (ComputeAuthConfig.OAuthUrl.empty() == false)
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl,
+ .ClientId = ComputeAuthConfig.OAuthClientId,
+ .ClientSecret = ComputeAuthConfig.OAuthClientSecret});
+ }
+ else if (ComputeAuthConfig.OpenIdProvider.empty() == false)
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() {
+ AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
+ }
+ else
+ {
+ CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken),
+ .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
+ TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
+ }
+
+ m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider));
+ }
+
+ {
+ std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
+
+ if (StorageAuthConfig.OAuthUrl.empty() == false)
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl,
+ .ClientId = StorageAuthConfig.OAuthClientId,
+ .ClientSecret = StorageAuthConfig.OAuthClientSecret});
+ }
+ else if (StorageAuthConfig.OpenIdProvider.empty() == false)
+ {
+ TokenProvider =
+ CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() {
+ AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName);
+ return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
+ });
+ }
+ else
+ {
+ CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken),
+ .ExpireTime = CloudCacheAccessToken::TimePoint::max()};
+ TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken);
+ }
+
+ m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider));
+ }
}
virtual ~HordeUpstreamApplyEndpoint() = default;
@@ -109,10 +171,11 @@ namespace detail {
m_PendingTasks[UpstreamData.TaskId] = ApplyRecord;
}
- CloudCacheSession Session(m_Client);
+ CloudCacheSession ComputeSession(m_Client);
+ CloudCacheSession StorageSession(m_StorageClient);
{
- CloudCacheResult Result = BatchPutBlobsIfMissing(Session, UpstreamData.Blobs);
+ CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
if (!Result.Success)
@@ -126,7 +189,7 @@ namespace detail {
}
{
- CloudCacheResult Result = BatchPutObjectsIfMissing(Session, UpstreamData.Objects);
+ CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
if (!Result.Success)
@@ -136,17 +199,33 @@ namespace detail {
.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds};
}
+
+ Result = StorageSession.PutRef("requests"sv,
+ UpstreamData.TaskId,
+ UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(),
+ ZenContentType::kCbObject);
+
+ Bytes += Result.Bytes;
+ ElapsedSeconds += Result.ElapsedSeconds;
+ if (!Result.Success)
+ {
+ return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to add task ref"},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
UpstreamData.Objects.clear();
}
CbObjectWriter Writer;
+ Writer.AddString("c"sv, m_ChannelId);
Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId);
Writer.BeginArray("t"sv);
Writer.AddObjectAttachment(UpstreamData.TaskId);
Writer.EndArray();
IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer();
- CloudCacheResult Result = Session.PostComputeTasks(m_ChannelId, TasksData);
+ CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
if (!Result.Success)
@@ -214,8 +293,8 @@ namespace detail {
{
return {.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
- .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put blobs"};
+ .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"};
}
}
@@ -265,8 +344,8 @@ namespace detail {
{
return {.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds,
- .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1,
- .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put objects"};
+ .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1,
+ .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"};
}
}
@@ -330,9 +409,10 @@ namespace detail {
try
{
- CloudCacheSession Session(m_Client);
+ CloudCacheSession ComputeSession(m_Client);
+ CloudCacheSession StorageSession(m_StorageClient);
- CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId);
+ CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId);
Bytes += UpdatesResult.Bytes;
ElapsedSeconds += UpdatesResult.ElapsedSeconds;
if (!UpdatesResult.Success)
@@ -349,9 +429,6 @@ namespace detail {
CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response);
- // zen::StringBuilder<4096> ObjStr;
- // zen::CompactBinaryToJson(TaskStatus, ObjStr);
-
for (auto& It : TaskStatus["u"sv])
{
CbObjectView Status = It.AsObjectView();
@@ -366,7 +443,7 @@ namespace detail {
continue;
}
- const IoHash TaskId = Status["h"sv].AsObjectAttachment();
+ const IoHash TaskId = Status["h"sv].AsHash();
IoHash WorkerId;
IoHash ActionId;
@@ -383,7 +460,7 @@ namespace detail {
m_PendingTasks.erase(TaskIt);
}
- GetUpstreamApplyResult Result = ProcessTaskStatus(Status, Session);
+ GetUpstreamApplyResult Result = ProcessTaskStatus(Status, StorageSession);
Bytes += Result.Bytes;
ElapsedSeconds += Result.ElapsedSeconds;
@@ -411,9 +488,11 @@ namespace detail {
CasStore& m_CasStore;
CidStore& m_CidStore;
+ AuthMgr& m_AuthMgr;
spdlog::logger& m_Log;
std::string m_DisplayName;
RefPtr<CloudCacheClient> m_Client;
+ RefPtr<CloudCacheClient> m_StorageClient;
UpstreamApplyEndpointStats m_Stats;
std::atomic_bool m_HealthOk{false};
std::string m_ChannelId;
@@ -453,9 +532,9 @@ namespace detail {
return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}};
}
- const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment();
+ const IoHash TaskId = TaskStatus["h"sv].AsHash();
const DateTime Time = TaskStatus["t"sv].AsDateTime();
- const IoHash ResultHash = TaskStatus["r"sv].AsObjectAttachment();
+ const IoHash ResultHash = TaskStatus["r"sv].AsHash();
const std::string_view AgentId = TaskStatus["a"sv].AsString();
const std::string_view LeaseId = TaskStatus["l"sv].AsString();
@@ -463,105 +542,96 @@ namespace detail {
double ElapsedSeconds{};
// Get Result object and all Object Attachments + Binary Attachment IDs
- CloudCacheResult ObjectTreeResult = Session.GetObjectTree(ResultHash);
- Bytes += ObjectTreeResult.Bytes;
- ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
+ CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject);
+ Bytes += ObjectRefResult.Bytes;
+ ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
- if (!ObjectTreeResult.Success)
+ if (!ObjectRefResult.Success)
{
return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"},
.Bytes = Bytes,
.ElapsedSeconds = ElapsedSeconds};
}
- std::map<IoHash, IoBuffer> TreeObjectData;
- std::map<IoHash, IoBuffer> TreeBinaryData;
+ std::vector<IoHash> ObjectsToIterate;
+ std::map<IoHash, IoBuffer> ObjectData;
+ std::map<IoHash, IoBuffer> BinaryData;
- MemoryView ResponseView = ObjectTreeResult.Response;
- while (ResponseView.GetSize() > 0)
- {
- CbFieldView Field = CbFieldView(ResponseView.GetData());
- ResponseView += Field.GetSize();
+ ObjectData[ResultHash] = ObjectRefResult.Response;
+ CbObject Object = LoadCompactBinaryObject(ObjectData[ResultHash]);
+ Object.IterateAttachments([&](CbFieldView Field) {
if (Field.IsObjectAttachment())
{
- const IoHash Hash = Field.AsObjectAttachment();
- Field = CbFieldView(ResponseView.GetData());
- ResponseView += Field.GetSize();
- if (!Field.IsObject()) // No data
+ const IoHash AttachmentHash = Field.AsObjectAttachment();
+ if (!ObjectData.contains(AttachmentHash))
{
- TreeObjectData[Hash] = {};
- continue;
+ ObjectsToIterate.push_back(AttachmentHash);
}
- MemoryView FieldView = Field.AsObjectView().GetView();
-
- TreeObjectData[Hash] = IoBuffer(IoBuffer::Wrap, FieldView.GetData(), FieldView.GetSize());
}
else if (Field.IsBinaryAttachment())
{
- const IoHash Hash = Field.AsBinaryAttachment();
- TreeBinaryData[Hash] = {};
- }
- else // Unknown type
- {
+ const IoHash AttachmentHash = Field.AsBinaryAttachment();
+ BinaryData[AttachmentHash] = {};
}
- }
+ });
- for (auto& It : TreeObjectData)
+ while (!ObjectsToIterate.empty())
{
- if (It.second.GetSize() == 0)
+ const IoHash Hash = ObjectsToIterate.back();
+ ObjectsToIterate.pop_back();
+
+ CloudCacheResult ObjectResult = Session.GetObject(Hash);
+ Bytes += ObjectRefResult.Bytes;
+ ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
+ if (!ObjectResult.Success)
{
- CloudCacheResult ObjectResult = Session.GetObject(It.first);
- Bytes += ObjectTreeResult.Bytes;
- ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
- if (!ObjectTreeResult.Success)
+ return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
+ }
+ ObjectData[Hash] = std::move(ObjectResult.Response);
+
+ CbObject IterateObject = LoadCompactBinaryObject(ObjectData[Hash]);
+ IterateObject.IterateAttachments([&](CbFieldView Field) {
+ if (Field.IsObjectAttachment())
{
- return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
+ const IoHash AttachmentHash = Field.AsObjectAttachment();
+ if (!ObjectData.contains(AttachmentHash))
+ {
+ ObjectsToIterate.push_back(AttachmentHash);
+ }
}
-
- if (!ObjectResult.Success)
+ else if (Field.IsBinaryAttachment())
{
- return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
+ const IoHash AttachmentHash = Field.AsBinaryAttachment();
+ BinaryData[AttachmentHash] = {};
}
- It.second = std::move(ObjectResult.Response);
- }
+ });
}
- for (auto& It : TreeBinaryData)
+ // Batch load all binary data
+ for (auto& It : BinaryData)
{
- if (It.second.GetSize() == 0)
+ CloudCacheResult BlobResult = Session.GetBlob(It.first);
+ Bytes += ObjectRefResult.Bytes;
+ ElapsedSeconds += ObjectRefResult.ElapsedSeconds;
+ if (!BlobResult.Success)
{
- CloudCacheResult BlobResult = Session.GetBlob(It.first);
- Bytes += ObjectTreeResult.Bytes;
- ElapsedSeconds += ObjectTreeResult.ElapsedSeconds;
- if (!BlobResult.Success)
- {
- return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
-
- if (!BlobResult.Success)
- {
- return {.Error{.ErrorCode = -1, .Reason = "Failed to get result binary attachment data"},
- .Bytes = Bytes,
- .ElapsedSeconds = ElapsedSeconds};
- }
- It.second = std::move(BlobResult.Response);
+ return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)},
+ .Bytes = Bytes,
+ .ElapsedSeconds = ElapsedSeconds};
}
+ It.second = std::move(BlobResult.Response);
}
- CbObject ResultObject = LoadCompactBinaryObject(TreeObjectData[ResultHash]);
+ CbObject ResultObject = LoadCompactBinaryObject(ObjectData[ResultHash]);
int32_t ExitCode = ResultObject["e"sv].AsInt32();
IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment();
IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment();
IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment();
- std::string StdOut = std::string((const char*)TreeBinaryData[StdOutHash].GetData(), TreeBinaryData[StdOutHash].GetSize());
- std::string StdErr = std::string((const char*)TreeBinaryData[StdErrHash].GetData(), TreeBinaryData[StdErrHash].GetSize());
+ std::string StdOut = std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize());
+ std::string StdErr = std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize());
if (ExitCode != 0)
{
@@ -572,7 +642,7 @@ namespace detail {
.StdErr = std::move(StdErr)};
}
- CbObject OutputObject = LoadCompactBinaryObject(TreeObjectData[OutputHash]);
+ CbObject OutputObject = LoadCompactBinaryObject(ObjectData[OutputHash]);
// Get build.output
IoHash BuildOutputId;
@@ -583,7 +653,7 @@ namespace detail {
if (FileObject["n"sv].AsString() == "Build.output"sv)
{
BuildOutputId = FileObject["h"sv].AsBinaryAttachment();
- BuildOutput = TreeBinaryData[BuildOutputId];
+ BuildOutput = BinaryData[BuildOutputId];
break;
}
}
@@ -604,7 +674,7 @@ namespace detail {
const CbObjectView DirectoryObject = It.AsObjectView();
if (DirectoryObject["n"sv].AsString() == "Outputs"sv)
{
- OutputDirectoryTree = TreeObjectData[DirectoryObject["h"sv].AsObjectAttachment()];
+ OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()];
break;
}
}
@@ -636,7 +706,7 @@ namespace detail {
// Hash is the compressed data hash, and how it is stored in Horde
IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment();
- if (!TreeBinaryData.contains(CompressedId))
+ if (!BinaryData.contains(CompressedId))
{
Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString());
return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"},
@@ -659,7 +729,7 @@ namespace detail {
}
const IoHash& CompressedId = CidToCompressedId.at(DecompressedId);
- if (!TreeBinaryData.contains(CompressedId))
+ if (!BinaryData.contains(CompressedId))
{
Log().warn("Missing output {} compressed {} uncompressed",
CompressedId.ToHexString(),
@@ -668,7 +738,7 @@ namespace detail {
return;
}
- CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(TreeBinaryData[CompressedId]));
+ CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId]));
if (!AttachmentBuffer)
{
@@ -826,11 +896,11 @@ namespace detail {
int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64();
bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool();
- // TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac)
std::string Condition = fmt::format("Platform == '{}'", HostPlatform);
if (HostPlatform == "Win64")
{
- Condition += " && Pool == 'Win-RemoteExec'";
+ // TODO
+ // Condition += " && Pool == 'Win-RemoteExec'";
}
std::map<std::string_view, int64_t> Resources;
@@ -1176,10 +1246,10 @@ struct UpstreamApplyStats
//////////////////////////////////////////////////////////////////////////
-class DefaultUpstreamApply final : public UpstreamApply
+class UpstreamApplyImpl final : public UpstreamApply
{
public:
- DefaultUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+ UpstreamApplyImpl(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
: m_Log(logging::Get("upstream-apply"))
, m_Options(Options)
, m_CasStore(CasStore)
@@ -1188,7 +1258,7 @@ public:
{
}
- virtual ~DefaultUpstreamApply() { Shutdown(); }
+ virtual ~UpstreamApplyImpl() { Shutdown(); }
virtual bool Initialize() override
{
@@ -1213,12 +1283,12 @@ public:
for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++)
{
- m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this);
+ m_UpstreamThreads.emplace_back(&UpstreamApplyImpl::ProcessUpstreamQueue, this);
}
- m_UpstreamUpdatesThread = std::thread(&DefaultUpstreamApply::ProcessUpstreamUpdates, this);
+ m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this);
- m_EndpointMonitorThread = std::thread(&DefaultUpstreamApply::MonitorEndpoints, this);
+ m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this);
}
return m_RunState.IsRunning;
@@ -1558,18 +1628,27 @@ private:
//////////////////////////////////////////////////////////////////////////
std::unique_ptr<UpstreamApply>
-MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
+UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore)
{
- return std::make_unique<DefaultUpstreamApply>(Options, CasStore, CidStore);
+ return std::make_unique<UpstreamApplyImpl>(Options, CasStore, CidStore);
}
std::unique_ptr<UpstreamApplyEndpoint>
-MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options,
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider,
- CasStore& CasStore,
- CidStore& CidStore)
+UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions,
+ const UpstreamAuthConfig& ComputeAuthConfig,
+ const CloudCacheClientOptions& StorageOptions,
+ const UpstreamAuthConfig& StorageAuthConfig,
+ CasStore& CasStore,
+ CidStore& CidStore,
+ AuthMgr& Mgr)
{
- return std::make_unique<detail::HordeUpstreamApplyEndpoint>(Options, std::move(TokenProvider), CasStore, CidStore);
+ return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions,
+ ComputeAuthConfig,
+ StorageOptions,
+ StorageAuthConfig,
+ CasStore,
+ CidStore,
+ Mgr);
}
} // namespace zen
diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h
index c56a22ac3..44c08e30e 100644
--- a/zenserver/upstream/upstreamapply.h
+++ b/zenserver/upstream/upstreamapply.h
@@ -25,6 +25,8 @@ class CidStore;
class ZenCacheStore;
struct CloudCacheClientOptions;
class CloudCacheTokenProvider;
+struct UpstreamAuthConfig;
+class AuthMgr;
enum class UpstreamApplyState : int32_t
{
@@ -129,10 +131,18 @@ public:
virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0;
virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0;
virtual UpstreamApplyEndpointStats& Stats() = 0;
+
+ static std::unique_ptr<UpstreamApplyEndpoint> CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions,
+ const UpstreamAuthConfig& ComputeAuthConfig,
+ const CloudCacheClientOptions& StorageOptions,
+ const UpstreamAuthConfig& StorageAuthConfig,
+ CasStore& CasStore,
+ CidStore& CidStore,
+ AuthMgr& Mgr);
};
/**
- * Manages one or more upstream cache endpoints.
+ * Manages one or more upstream compute endpoints.
*/
class UpstreamApply
{
@@ -157,14 +167,9 @@ public:
virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0;
virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0;
virtual void GetStatus(CbObjectWriter& CbO) = 0;
-};
-std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore);
-
-std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options,
- std::unique_ptr<CloudCacheTokenProvider> TokenProvider,
- CasStore& CasStore,
- CidStore& CidStore);
+ static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore);
+};
} // namespace zen
diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp
index a684272c4..0d9126334 100644
--- a/zenserver/zenserver.cpp
+++ b/zenserver/zenserver.cpp
@@ -279,15 +279,20 @@ public:
#endif
#if ZEN_WITH_COMPUTE_SERVICES
- ZEN_INFO("instantiating compute services");
+ if (ServerOptions.ComputeServiceEnabled)
+ {
+ ZEN_INFO("instantiating compute services");
- std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox";
- zen::CreateDirectories(SandboxDir);
- m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir);
+ std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox";
+ zen::CreateDirectories(SandboxDir);
+ m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir);
- std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply";
- zen::CreateDirectories(ApplySandboxDir);
- m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, *m_CidStore, ApplySandboxDir);
+ InitializeCompute(ServerOptions);
+ }
+ else
+ {
+ ZEN_INFO("NOT instantiating compute services");
+ }
#endif // ZEN_WITH_COMPUTE_SERVICES
if (ServerOptions.StructuredCacheEnabled)
@@ -327,14 +332,14 @@ public:
m_Http->RegisterService(m_CasService);
#if ZEN_WITH_COMPUTE_SERVICES
- if (m_HttpLaunchService)
+ if (ServerOptions.ComputeServiceEnabled)
{
m_Http->RegisterService(*m_HttpLaunchService);
- }
- if (m_HttpFunctionService)
- {
- m_Http->RegisterService(*m_HttpFunctionService);
+ if (m_HttpFunctionService != nullptr)
+ {
+ m_Http->RegisterService(*m_HttpFunctionService);
+ }
}
#endif // ZEN_WITH_COMPUTE_SERVICES
@@ -360,6 +365,7 @@ public:
void InitializeState(const ZenServerOptions& ServerOptions);
void InitializeStructuredCache(const ZenServerOptions& ServerOptions);
+ void InitializeCompute(const ZenServerOptions& ServerOptions);
#if ZEN_ENABLE_MESH
void StartMesh(int BasePort)
@@ -821,6 +827,57 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions)
m_Http->RegisterService(*m_UpstreamService);
}
+void
+ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions)
+{
+ ServerOptions;
+ const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig;
+
+ // Horde compute upstream
+ if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.Url.empty() == false)
+ {
+ std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name;
+
+ auto ComputeOptions =
+ zen::CloudCacheClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.HordeConfig.Url,
+ .ComputeCluster = UpstreamConfig.HordeConfig.Cluster,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds),
+ .UseLegacyDdc = false};
+
+ auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl,
+ .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider,
+ .AccessToken = UpstreamConfig.HordeConfig.AccessToken};
+
+ auto StorageOptions =
+ zen::CloudCacheClientOptions{.Name = EndpointName,
+ .ServiceUrl = UpstreamConfig.JupiterConfig.Url,
+ .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace,
+ .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds),
+ .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)};
+
+ auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl,
+ .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId,
+ .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret,
+ .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider,
+ .AccessToken = UpstreamConfig.JupiterConfig.AccessToken};
+
+ std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply";
+ zen::CreateDirectories(ApplySandboxDir);
+ m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore,
+ *m_CidStore,
+ ApplySandboxDir,
+ ComputeOptions,
+ StorageOptions,
+ ComputeAuthConfig,
+ StorageAuthConfig,
+ *m_AuthMgr);
+ }
+}
+
////////////////////////////////////////////////////////////////////////////////
class ZenEntryPoint