diff options
| author | Joe Kirchoff <[email protected]> | 2022-03-17 09:55:09 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-03-17 09:55:09 -0700 |
| commit | 7466cb93fbb9f4082dc253a328222dac8bbe58e4 (patch) | |
| tree | 2b60020b7ab15867bfabf135bf8217aabe553c6d | |
| parent | Introduced basic validation of the clang-format version (diff) | |
| download | zen-7466cb93fbb9f4082dc253a328222dac8bbe58e4.tar.xz zen-7466cb93fbb9f4082dc253a328222dac8bbe58e4.zip | |
Update horde compute to use Jupiter for storage (#60)
| -rw-r--r-- | zenserver/compute/apply.cpp | 38 | ||||
| -rw-r--r-- | zenserver/compute/apply.h | 14 | ||||
| -rw-r--r-- | zenserver/config.cpp | 95 | ||||
| -rw-r--r-- | zenserver/config.h | 15 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 16 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 5 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 295 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.h | 21 | ||||
| -rw-r--r-- | zenserver/zenserver.cpp | 81 |
9 files changed, 430 insertions, 150 deletions
diff --git a/zenserver/compute/apply.cpp b/zenserver/compute/apply.cpp index e4d5697fa..044078aa4 100644 --- a/zenserver/compute/apply.cpp +++ b/zenserver/compute/apply.cpp @@ -6,6 +6,7 @@ # include <upstream/jupiter.h> # include <upstream/upstreamapply.h> +# include <upstream/upstreamcache.h> # include <zencore/compactbinary.h> # include <zencore/compactbinarybuilder.h> # include <zencore/compactbinarypackage.h> @@ -173,10 +174,10 @@ SandboxedFunctionJob::GrantNamedObjectAccess(PWSTR ObjectName, SE_OBJECT_TYPE Ob .grfAccessMode = GRANT_ACCESS, .grfInheritance = grfInhericance, .Trustee = {.pMultipleTrustee = nullptr, - .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE, - .TrusteeForm = TRUSTEE_IS_SID, - .TrusteeType = TRUSTEE_IS_GROUP, - .ptstrName = (PWSTR)m_AppContainerSid}}; + .MultipleTrusteeOperation = NO_MULTIPLE_TRUSTEE, + .TrusteeForm = TRUSTEE_IS_SID, + .TrusteeType = TRUSTEE_IS_GROUP, + .ptstrName = (PWSTR)m_AppContainerSid}}; PACL OldAcl = nullptr; @@ -328,24 +329,29 @@ SandboxedFunctionJob::SpawnJob(std::filesystem::path ExePath) //////////////////////////////////////////////////////////////////////////////// -HttpFunctionService::HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& BaseDir) +HttpFunctionService::HttpFunctionService(CasStore& Store, + CidStore& InCidStore, + const std::filesystem::path& BaseDir, + const CloudCacheClientOptions& ComputeOptions, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const UpstreamAuthConfig& StorageAuthConfig, + AuthMgr& Mgr) : m_Log(logging::Get("apply")) , m_CasStore(Store) , m_CidStore(InCidStore) , m_SandboxPath(BaseDir / "scratch") , m_FunctionPath(BaseDir / "func") { - m_UpstreamApply = MakeUpstreamApply({}, m_CasStore, m_CidStore); - - CloudCacheAccessToken AccessToken{.Value = "ServiceAccount 0f8056b30bd0df0959be55fc3338159b6f938456d3474aed0087fb965268d079", - .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; - - CloudCacheClientOptions Options = {.ServiceUrl = "https://horde.devtools-dev.epicgames.com"sv, - .DdcNamespace = "default"sv, - .BlobStoreNamespace = "default"sv}; - - auto HordeUpstreamEndpoint = - MakeHordeUpstreamEndpoint(Options, CloudCacheTokenProvider::CreateFromStaticToken(AccessToken), m_CasStore, m_CidStore); + m_UpstreamApply = UpstreamApply::Create({}, m_CasStore, m_CidStore); + + auto HordeUpstreamEndpoint = UpstreamApplyEndpoint::CreateHordeEndpoint(ComputeOptions, + ComputeAuthConfig, + StorageOptions, + StorageAuthConfig, + m_CasStore, + m_CidStore, + Mgr); m_UpstreamApply->RegisterEndpoint(std::move(HordeUpstreamEndpoint)); m_UpstreamApply->Initialize(); diff --git a/zenserver/compute/apply.h b/zenserver/compute/apply.h index 161e47e06..e00afcd61 100644 --- a/zenserver/compute/apply.h +++ b/zenserver/compute/apply.h @@ -23,6 +23,11 @@ namespace zen { class CasStore; class CidStore; class UpstreamApply; +class CloudCacheClient; +class AuthMgr; + +struct UpstreamAuthConfig; +struct CloudCacheClientOptions; /** * Lambda style compute function service @@ -30,7 +35,14 @@ class UpstreamApply; class HttpFunctionService : public HttpService { public: - HttpFunctionService(CasStore& Store, CidStore& InCidStore, const std::filesystem::path& SandboxBaseDir); + HttpFunctionService(CasStore& Store, + CidStore& InCidStore, + const std::filesystem::path& BaseDir, + const CloudCacheClientOptions& ComputeOptions, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const UpstreamAuthConfig& StorageAuthConfig, + AuthMgr& Mgr); ~HttpFunctionService(); virtual const char* BaseUri() const override; diff --git a/zenserver/config.cpp b/zenserver/config.cpp index bcacc16c0..adb079d83 100644 --- a/zenserver/config.cpp +++ b/zenserver/config.cpp @@ -345,6 +345,62 @@ ParseCliOptions(int argc, char* argv[], ZenServerOptions& ServerOptions) cxxopts::value<int32_t>(ServerOptions.UpstreamCacheConfig.TimeoutMilliseconds)->default_value("0"), ""); + options.add_option("compute", + "", + "upstream-horde-url", + "URL to a Horde instance.", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Url)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-oauth-url", + "URL to the OAuth provier", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthUrl)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-oauth-clientid", + "The OAuth client ID", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientId)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-oauth-clientsecret", + "The OAuth client secret", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientSecret)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-openid-provider", + "Name of a registered Open ID provider", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.OpenIdProvider)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-token", + "A static authentication token", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.AccessToken)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-cluster", + "The Horde compute cluster id", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster)->default_value(""), + ""); + + options.add_option("compute", + "", + "upstream-horde-namespace", + "The Jupiter namespace to use with Horde compute", + cxxopts::value<std::string>(ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace)->default_value(""), + ""); + options.add_option("gc", "", "gc-enabled", @@ -596,6 +652,45 @@ ParseConfigFile(const std::filesystem::path& Path, ZenServerOptions& ServerOptio } } + if (sol::optional<sol::table> ComputeConfig = lua["compute"]) + { + ServerOptions.ComputeServiceEnabled = ComputeConfig->get_or("enable", ServerOptions.ComputeServiceEnabled); + + if (auto UpstreamConfig = ComputeConfig->get<sol::optional<sol::table>>("upstream")) + { + if (auto HordeConfig = UpstreamConfig->get<sol::optional<sol::table>>("horde")) + { + UpdateStringValueFromConfig(HordeConfig.value(), + std::string_view("name"), + ServerOptions.UpstreamCacheConfig.HordeConfig.Name); + UpdateStringValueFromConfig(HordeConfig.value(), + std::string_view("url"), + ServerOptions.UpstreamCacheConfig.HordeConfig.Url); + UpdateStringValueFromConfig(HordeConfig.value(), + std::string_view("oauthprovider"), + ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthUrl); + UpdateStringValueFromConfig(HordeConfig.value(), + std::string_view("oauthclientid"), + ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientId); + UpdateStringValueFromConfig(HordeConfig.value(), + std::string_view("oauthclientsecret"), + ServerOptions.UpstreamCacheConfig.HordeConfig.OAuthClientSecret); + UpdateStringValueFromConfig(HordeConfig.value(), + std::string_view("openidprovider"), + ServerOptions.UpstreamCacheConfig.HordeConfig.OpenIdProvider); + UpdateStringValueFromConfig(HordeConfig.value(), + std::string_view("token"), + ServerOptions.UpstreamCacheConfig.HordeConfig.AccessToken); + UpdateStringValueFromConfig(HordeConfig.value(), + std::string_view("cluster"), + ServerOptions.UpstreamCacheConfig.HordeConfig.Cluster); + UpdateStringValueFromConfig(HordeConfig.value(), + std::string_view("namespace"), + ServerOptions.UpstreamCacheConfig.HordeConfig.Namespace); + }; + } + } + if (sol::optional<sol::table> GcConfig = lua["gc"]) { ServerOptions.GcConfig.IntervalSeconds = GcConfig.value().get_or("intervalseconds", 0); diff --git a/zenserver/config.h b/zenserver/config.h index fd569bdb1..a7a7815a8 100644 --- a/zenserver/config.h +++ b/zenserver/config.h @@ -33,6 +33,19 @@ struct ZenUpstreamJupiterConfig bool UseLegacyDdc = false; }; +struct ZenUpstreamHordeConfig +{ + std::string Name; + std::string Url; + std::string OAuthUrl; + std::string OAuthClientId; + std::string OAuthClientSecret; + std::string OpenIdProvider; + std::string AccessToken; + std::string Cluster; + std::string Namespace; +}; + struct ZenUpstreamZenConfig { std::string Name; @@ -51,6 +64,7 @@ enum class UpstreamCachePolicy : uint8_t struct ZenUpstreamCacheConfig { ZenUpstreamJupiterConfig JupiterConfig; + ZenUpstreamHordeConfig HordeConfig; ZenUpstreamZenConfig ZenConfig; int32_t UpstreamThreadCount = 4; int32_t ConnectTimeoutMilliseconds = 5000; @@ -106,6 +120,7 @@ struct ZenServerOptions bool IsTest = false; bool IsDedicated = false; // Indicates a dedicated/shared instance, with larger resource requirements bool StructuredCacheEnabled = true; + bool ComputeServiceEnabled = true; bool ShouldCrash = false; // Option for testing crash handling bool IsFirstRun = false; bool NoSentry = false; diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 2b064a610..eef1daab0 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -577,12 +577,12 @@ CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys) } CloudCacheResult -CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) +CloudCacheSession::PostComputeTasks(IoBuffer TasksData) { ZEN_TRACE_CPU("HordeClient::PostComputeTasks"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -609,8 +609,11 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa CloudCacheResult CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) { + ZEN_TRACE_CPU("HordeClient::GetComputeUpdates"); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster() << "/updates/" << ChannelId + << "?wait=" << WaitSeconds; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -703,6 +706,8 @@ CloudCacheSession::VerifyAccessToken(long StatusCode) CloudCacheResult CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) { + ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); + ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); @@ -731,6 +736,8 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) CloudCacheExistsResult CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys) { + ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); + ExtendableStringBuilder<256> Query; for (const auto& Key : Keys) { @@ -766,7 +773,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHas { IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); - for (auto& Item : ExistsResponse["id"sv]) + for (auto& Item : ExistsResponse["Needs"sv]) { if (Item.IsHash()) { @@ -878,6 +885,7 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std:: , m_ServiceUrl(Options.ServiceUrl) , m_DdcNamespace(Options.DdcNamespace) , m_BlobStoreNamespace(Options.BlobStoreNamespace) +, m_ComputeCluster(Options.ComputeCluster) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) , m_TokenProvider(std::move(TokenProvider)) diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index ddd7ea160..9854e6f1e 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -116,7 +116,7 @@ public: CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys); CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys); - CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData); + CloudCacheResult PostComputeTasks(IoBuffer TasksData); CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); CloudCacheResult GetObjectTree(const IoHash& Key); @@ -167,6 +167,7 @@ struct CloudCacheClientOptions std::string_view ServiceUrl; std::string_view DdcNamespace; std::string_view BlobStoreNamespace; + std::string_view ComputeCluster; std::chrono::milliseconds ConnectTimeout{5000}; std::chrono::milliseconds Timeout{}; bool UseLegacyDdc = false; @@ -184,6 +185,7 @@ public: CloudCacheAccessToken AcquireAccessToken(); std::string_view DdcNamespace() const { return m_DdcNamespace; } std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } + std::string_view ComputeCluster() const { return m_ComputeCluster; } std::string_view ServiceUrl() const { return m_ServiceUrl; } spdlog::logger& Logger() { return m_Log; } @@ -193,6 +195,7 @@ private: std::string m_ServiceUrl; std::string m_DdcNamespace; std::string m_BlobStoreNamespace; + std::string m_ComputeCluster; std::chrono::milliseconds m_ConnectTimeout{}; std::chrono::milliseconds m_Timeout{}; std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider; diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 63c334265..918697224 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -23,6 +23,9 @@ # include <zenstore/cas.h> # include <zenstore/cidstore.h> +# include <auth/authmgr.h> +# include <upstream/upstreamcache.h> + # include "cache/structuredcachestore.h" # include "diag/logging.h" @@ -48,17 +51,76 @@ namespace detail { class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint { public: - HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& Options, - std::unique_ptr<zen::CloudCacheTokenProvider> TokenProvider, - CasStore& CasStore, - CidStore& CidStore) + HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr) : m_Log(logging::Get("upstream-apply")) + , m_AuthMgr(Mgr) , m_CasStore(CasStore) , m_CidStore(CidStore) { - m_DisplayName = fmt::format("Horde - '{}'", Options.ServiceUrl); - m_Client = new CloudCacheClient(Options, std::move(TokenProvider)); + m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (ComputeAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl, + .ClientId = ComputeAuthConfig.OAuthClientId, + .ClientSecret = ComputeAuthConfig.OAuthClientSecret}); + } + else if (ComputeAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider)); + } + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (StorageAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl, + .ClientId = StorageAuthConfig.OAuthClientId, + .ClientSecret = StorageAuthConfig.OAuthClientSecret}); + } + else if (StorageAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider)); + } } virtual ~HordeUpstreamApplyEndpoint() = default; @@ -109,10 +171,11 @@ namespace detail { m_PendingTasks[UpstreamData.TaskId] = ApplyRecord; } - CloudCacheSession Session(m_Client); + CloudCacheSession ComputeSession(m_Client); + CloudCacheSession StorageSession(m_StorageClient); { - CloudCacheResult Result = BatchPutBlobsIfMissing(Session, UpstreamData.Blobs); + CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -126,7 +189,7 @@ namespace detail { } { - CloudCacheResult Result = BatchPutObjectsIfMissing(Session, UpstreamData.Objects); + CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -136,17 +199,33 @@ namespace detail { .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } + + Result = StorageSession.PutRef("requests"sv, + UpstreamData.TaskId, + UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), + ZenContentType::kCbObject); + + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to add task ref"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } UpstreamData.Objects.clear(); } CbObjectWriter Writer; + Writer.AddString("c"sv, m_ChannelId); Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); Writer.BeginArray("t"sv); Writer.AddObjectAttachment(UpstreamData.TaskId); Writer.EndArray(); IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer(); - CloudCacheResult Result = Session.PostComputeTasks(m_ChannelId, TasksData); + CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -214,8 +293,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put blobs"}; + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"}; } } @@ -265,8 +344,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put objects"}; + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"}; } } @@ -330,9 +409,10 @@ namespace detail { try { - CloudCacheSession Session(m_Client); + CloudCacheSession ComputeSession(m_Client); + CloudCacheSession StorageSession(m_StorageClient); - CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId); + CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); Bytes += UpdatesResult.Bytes; ElapsedSeconds += UpdatesResult.ElapsedSeconds; if (!UpdatesResult.Success) @@ -349,9 +429,6 @@ namespace detail { CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response); - // zen::StringBuilder<4096> ObjStr; - // zen::CompactBinaryToJson(TaskStatus, ObjStr); - for (auto& It : TaskStatus["u"sv]) { CbObjectView Status = It.AsObjectView(); @@ -366,7 +443,7 @@ namespace detail { continue; } - const IoHash TaskId = Status["h"sv].AsObjectAttachment(); + const IoHash TaskId = Status["h"sv].AsHash(); IoHash WorkerId; IoHash ActionId; @@ -383,7 +460,7 @@ namespace detail { m_PendingTasks.erase(TaskIt); } - GetUpstreamApplyResult Result = ProcessTaskStatus(Status, Session); + GetUpstreamApplyResult Result = ProcessTaskStatus(Status, StorageSession); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -411,9 +488,11 @@ namespace detail { CasStore& m_CasStore; CidStore& m_CidStore; + AuthMgr& m_AuthMgr; spdlog::logger& m_Log; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; + RefPtr<CloudCacheClient> m_StorageClient; UpstreamApplyEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; std::string m_ChannelId; @@ -453,9 +532,9 @@ namespace detail { return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}}; } - const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment(); + const IoHash TaskId = TaskStatus["h"sv].AsHash(); const DateTime Time = TaskStatus["t"sv].AsDateTime(); - const IoHash ResultHash = TaskStatus["r"sv].AsObjectAttachment(); + const IoHash ResultHash = TaskStatus["r"sv].AsHash(); const std::string_view AgentId = TaskStatus["a"sv].AsString(); const std::string_view LeaseId = TaskStatus["l"sv].AsString(); @@ -463,105 +542,96 @@ namespace detail { double ElapsedSeconds{}; // Get Result object and all Object Attachments + Binary Attachment IDs - CloudCacheResult ObjectTreeResult = Session.GetObjectTree(ResultHash); - Bytes += ObjectTreeResult.Bytes; - ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); + Bytes += ObjectRefResult.Bytes; + ElapsedSeconds += ObjectRefResult.ElapsedSeconds; - if (!ObjectTreeResult.Success) + if (!ObjectRefResult.Success) { return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } - std::map<IoHash, IoBuffer> TreeObjectData; - std::map<IoHash, IoBuffer> TreeBinaryData; + std::vector<IoHash> ObjectsToIterate; + std::map<IoHash, IoBuffer> ObjectData; + std::map<IoHash, IoBuffer> BinaryData; - MemoryView ResponseView = ObjectTreeResult.Response; - while (ResponseView.GetSize() > 0) - { - CbFieldView Field = CbFieldView(ResponseView.GetData()); - ResponseView += Field.GetSize(); + ObjectData[ResultHash] = ObjectRefResult.Response; + CbObject Object = LoadCompactBinaryObject(ObjectData[ResultHash]); + Object.IterateAttachments([&](CbFieldView Field) { if (Field.IsObjectAttachment()) { - const IoHash Hash = Field.AsObjectAttachment(); - Field = CbFieldView(ResponseView.GetData()); - ResponseView += Field.GetSize(); - if (!Field.IsObject()) // No data + const IoHash AttachmentHash = Field.AsObjectAttachment(); + if (!ObjectData.contains(AttachmentHash)) { - TreeObjectData[Hash] = {}; - continue; + ObjectsToIterate.push_back(AttachmentHash); } - MemoryView FieldView = Field.AsObjectView().GetView(); - - TreeObjectData[Hash] = IoBuffer(IoBuffer::Wrap, FieldView.GetData(), FieldView.GetSize()); } else if (Field.IsBinaryAttachment()) { - const IoHash Hash = Field.AsBinaryAttachment(); - TreeBinaryData[Hash] = {}; - } - else // Unknown type - { + const IoHash AttachmentHash = Field.AsBinaryAttachment(); + BinaryData[AttachmentHash] = {}; } - } + }); - for (auto& It : TreeObjectData) + while (!ObjectsToIterate.empty()) { - if (It.second.GetSize() == 0) + const IoHash Hash = ObjectsToIterate.back(); + ObjectsToIterate.pop_back(); + + CloudCacheResult ObjectResult = Session.GetObject(Hash); + Bytes += ObjectRefResult.Bytes; + ElapsedSeconds += ObjectRefResult.ElapsedSeconds; + if (!ObjectResult.Success) { - CloudCacheResult ObjectResult = Session.GetObject(It.first); - Bytes += ObjectTreeResult.Bytes; - ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (!ObjectTreeResult.Success) + return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + ObjectData[Hash] = std::move(ObjectResult.Response); + + CbObject IterateObject = LoadCompactBinaryObject(ObjectData[Hash]); + IterateObject.IterateAttachments([&](CbFieldView Field) { + if (Field.IsObjectAttachment()) { - return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + const IoHash AttachmentHash = Field.AsObjectAttachment(); + if (!ObjectData.contains(AttachmentHash)) + { + ObjectsToIterate.push_back(AttachmentHash); + } } - - if (!ObjectResult.Success) + else if (Field.IsBinaryAttachment()) { - return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + const IoHash AttachmentHash = Field.AsBinaryAttachment(); + BinaryData[AttachmentHash] = {}; } - It.second = std::move(ObjectResult.Response); - } + }); } - for (auto& It : TreeBinaryData) + // Batch load all binary data + for (auto& It : BinaryData) { - if (It.second.GetSize() == 0) + CloudCacheResult BlobResult = Session.GetBlob(It.first); + Bytes += ObjectRefResult.Bytes; + ElapsedSeconds += ObjectRefResult.ElapsedSeconds; + if (!BlobResult.Success) { - CloudCacheResult BlobResult = Session.GetBlob(It.first); - Bytes += ObjectTreeResult.Bytes; - ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (!BlobResult.Success) - { - return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - - if (!BlobResult.Success) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed to get result binary attachment data"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - It.second = std::move(BlobResult.Response); + return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; } + It.second = std::move(BlobResult.Response); } - CbObject ResultObject = LoadCompactBinaryObject(TreeObjectData[ResultHash]); + CbObject ResultObject = LoadCompactBinaryObject(ObjectData[ResultHash]); int32_t ExitCode = ResultObject["e"sv].AsInt32(); IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment(); - std::string StdOut = std::string((const char*)TreeBinaryData[StdOutHash].GetData(), TreeBinaryData[StdOutHash].GetSize()); - std::string StdErr = std::string((const char*)TreeBinaryData[StdErrHash].GetData(), TreeBinaryData[StdErrHash].GetSize()); + std::string StdOut = std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()); + std::string StdErr = std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()); if (ExitCode != 0) { @@ -572,7 +642,7 @@ namespace detail { .StdErr = std::move(StdErr)}; } - CbObject OutputObject = LoadCompactBinaryObject(TreeObjectData[OutputHash]); + CbObject OutputObject = LoadCompactBinaryObject(ObjectData[OutputHash]); // Get build.output IoHash BuildOutputId; @@ -583,7 +653,7 @@ namespace detail { if (FileObject["n"sv].AsString() == "Build.output"sv) { BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); - BuildOutput = TreeBinaryData[BuildOutputId]; + BuildOutput = BinaryData[BuildOutputId]; break; } } @@ -604,7 +674,7 @@ namespace detail { const CbObjectView DirectoryObject = It.AsObjectView(); if (DirectoryObject["n"sv].AsString() == "Outputs"sv) { - OutputDirectoryTree = TreeObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; + OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; break; } } @@ -636,7 +706,7 @@ namespace detail { // Hash is the compressed data hash, and how it is stored in Horde IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); - if (!TreeBinaryData.contains(CompressedId)) + if (!BinaryData.contains(CompressedId)) { Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString()); return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, @@ -659,7 +729,7 @@ namespace detail { } const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); - if (!TreeBinaryData.contains(CompressedId)) + if (!BinaryData.contains(CompressedId)) { Log().warn("Missing output {} compressed {} uncompressed", CompressedId.ToHexString(), @@ -668,7 +738,7 @@ namespace detail { return; } - CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(TreeBinaryData[CompressedId])); + CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); if (!AttachmentBuffer) { @@ -826,11 +896,11 @@ namespace detail { int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); - // TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac) std::string Condition = fmt::format("Platform == '{}'", HostPlatform); if (HostPlatform == "Win64") { - Condition += " && Pool == 'Win-RemoteExec'"; + // TODO + // Condition += " && Pool == 'Win-RemoteExec'"; } std::map<std::string_view, int64_t> Resources; @@ -1176,10 +1246,10 @@ struct UpstreamApplyStats ////////////////////////////////////////////////////////////////////////// -class DefaultUpstreamApply final : public UpstreamApply +class UpstreamApplyImpl final : public UpstreamApply { public: - DefaultUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) + UpstreamApplyImpl(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) : m_Log(logging::Get("upstream-apply")) , m_Options(Options) , m_CasStore(CasStore) @@ -1188,7 +1258,7 @@ public: { } - virtual ~DefaultUpstreamApply() { Shutdown(); } + virtual ~UpstreamApplyImpl() { Shutdown(); } virtual bool Initialize() override { @@ -1213,12 +1283,12 @@ public: for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { - m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this); + m_UpstreamThreads.emplace_back(&UpstreamApplyImpl::ProcessUpstreamQueue, this); } - m_UpstreamUpdatesThread = std::thread(&DefaultUpstreamApply::ProcessUpstreamUpdates, this); + m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this); - m_EndpointMonitorThread = std::thread(&DefaultUpstreamApply::MonitorEndpoints, this); + m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this); } return m_RunState.IsRunning; @@ -1558,18 +1628,27 @@ private: ////////////////////////////////////////////////////////////////////////// std::unique_ptr<UpstreamApply> -MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) +UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) { - return std::make_unique<DefaultUpstreamApply>(Options, CasStore, CidStore); + return std::make_unique<UpstreamApplyImpl>(Options, CasStore, CidStore); } std::unique_ptr<UpstreamApplyEndpoint> -MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, - std::unique_ptr<CloudCacheTokenProvider> TokenProvider, - CasStore& CasStore, - CidStore& CidStore) +UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr) { - return std::make_unique<detail::HordeUpstreamApplyEndpoint>(Options, std::move(TokenProvider), CasStore, CidStore); + return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions, + ComputeAuthConfig, + StorageOptions, + StorageAuthConfig, + CasStore, + CidStore, + Mgr); } } // namespace zen diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h index c56a22ac3..44c08e30e 100644 --- a/zenserver/upstream/upstreamapply.h +++ b/zenserver/upstream/upstreamapply.h @@ -25,6 +25,8 @@ class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; class CloudCacheTokenProvider; +struct UpstreamAuthConfig; +class AuthMgr; enum class UpstreamApplyState : int32_t { @@ -129,10 +131,18 @@ public: virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0; virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0; virtual UpstreamApplyEndpointStats& Stats() = 0; + + static std::unique_ptr<UpstreamApplyEndpoint> CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr); }; /** - * Manages one or more upstream cache endpoints. + * Manages one or more upstream compute endpoints. */ class UpstreamApply { @@ -157,14 +167,9 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0; virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; -}; -std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore); - -std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, - std::unique_ptr<CloudCacheTokenProvider> TokenProvider, - CasStore& CasStore, - CidStore& CidStore); + static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore); +}; } // namespace zen diff --git a/zenserver/zenserver.cpp b/zenserver/zenserver.cpp index a684272c4..0d9126334 100644 --- a/zenserver/zenserver.cpp +++ b/zenserver/zenserver.cpp @@ -279,15 +279,20 @@ public: #endif #if ZEN_WITH_COMPUTE_SERVICES - ZEN_INFO("instantiating compute services"); + if (ServerOptions.ComputeServiceEnabled) + { + ZEN_INFO("instantiating compute services"); - std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; - zen::CreateDirectories(SandboxDir); - m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir); + std::filesystem::path SandboxDir = m_DataRoot / "exec" / "sandbox"; + zen::CreateDirectories(SandboxDir); + m_HttpLaunchService = std::make_unique<zen::HttpLaunchService>(*m_CasStore, SandboxDir); - std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply"; - zen::CreateDirectories(ApplySandboxDir); - m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, *m_CidStore, ApplySandboxDir); + InitializeCompute(ServerOptions); + } + else + { + ZEN_INFO("NOT instantiating compute services"); + } #endif // ZEN_WITH_COMPUTE_SERVICES if (ServerOptions.StructuredCacheEnabled) @@ -327,14 +332,14 @@ public: m_Http->RegisterService(m_CasService); #if ZEN_WITH_COMPUTE_SERVICES - if (m_HttpLaunchService) + if (ServerOptions.ComputeServiceEnabled) { m_Http->RegisterService(*m_HttpLaunchService); - } - if (m_HttpFunctionService) - { - m_Http->RegisterService(*m_HttpFunctionService); + if (m_HttpFunctionService != nullptr) + { + m_Http->RegisterService(*m_HttpFunctionService); + } } #endif // ZEN_WITH_COMPUTE_SERVICES @@ -360,6 +365,7 @@ public: void InitializeState(const ZenServerOptions& ServerOptions); void InitializeStructuredCache(const ZenServerOptions& ServerOptions); + void InitializeCompute(const ZenServerOptions& ServerOptions); #if ZEN_ENABLE_MESH void StartMesh(int BasePort) @@ -821,6 +827,57 @@ ZenServer::InitializeStructuredCache(const ZenServerOptions& ServerOptions) m_Http->RegisterService(*m_UpstreamService); } +void +ZenServer::InitializeCompute(const ZenServerOptions& ServerOptions) +{ + ServerOptions; + const ZenUpstreamCacheConfig& UpstreamConfig = ServerOptions.UpstreamCacheConfig; + + // Horde compute upstream + if (UpstreamConfig.HordeConfig.Url.empty() == false && UpstreamConfig.HordeConfig.Url.empty() == false) + { + std::string_view EndpointName = UpstreamConfig.HordeConfig.Name.empty() ? "Horde"sv : UpstreamConfig.HordeConfig.Name; + + auto ComputeOptions = + zen::CloudCacheClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.HordeConfig.Url, + .ComputeCluster = UpstreamConfig.HordeConfig.Cluster, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds), + .UseLegacyDdc = false}; + + auto ComputeAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.HordeConfig.OAuthUrl, + .OAuthClientId = UpstreamConfig.HordeConfig.OAuthClientId, + .OAuthClientSecret = UpstreamConfig.HordeConfig.OAuthClientSecret, + .OpenIdProvider = UpstreamConfig.HordeConfig.OpenIdProvider, + .AccessToken = UpstreamConfig.HordeConfig.AccessToken}; + + auto StorageOptions = + zen::CloudCacheClientOptions{.Name = EndpointName, + .ServiceUrl = UpstreamConfig.JupiterConfig.Url, + .BlobStoreNamespace = UpstreamConfig.HordeConfig.Namespace, + .ConnectTimeout = std::chrono::milliseconds(UpstreamConfig.ConnectTimeoutMilliseconds), + .Timeout = std::chrono::milliseconds(UpstreamConfig.TimeoutMilliseconds)}; + + auto StorageAuthConfig = zen::UpstreamAuthConfig{.OAuthUrl = UpstreamConfig.JupiterConfig.OAuthUrl, + .OAuthClientId = UpstreamConfig.JupiterConfig.OAuthClientId, + .OAuthClientSecret = UpstreamConfig.JupiterConfig.OAuthClientSecret, + .OpenIdProvider = UpstreamConfig.JupiterConfig.OpenIdProvider, + .AccessToken = UpstreamConfig.JupiterConfig.AccessToken}; + + std::filesystem::path ApplySandboxDir = m_DataRoot / "exec" / "apply"; + zen::CreateDirectories(ApplySandboxDir); + m_HttpFunctionService = std::make_unique<zen::HttpFunctionService>(*m_CasStore, + *m_CidStore, + ApplySandboxDir, + ComputeOptions, + StorageOptions, + ComputeAuthConfig, + StorageAuthConfig, + *m_AuthMgr); + } +} + //////////////////////////////////////////////////////////////////////////////// class ZenEntryPoint |