diff options
| author | Joe Kirchoff <[email protected]> | 2022-03-17 09:55:09 -0700 |
|---|---|---|
| committer | GitHub <[email protected]> | 2022-03-17 09:55:09 -0700 |
| commit | 7466cb93fbb9f4082dc253a328222dac8bbe58e4 (patch) | |
| tree | 2b60020b7ab15867bfabf135bf8217aabe553c6d /zenserver/upstream | |
| parent | Introduced basic validation of the clang-format version (diff) | |
| download | zen-7466cb93fbb9f4082dc253a328222dac8bbe58e4.tar.xz zen-7466cb93fbb9f4082dc253a328222dac8bbe58e4.zip | |
Update horde compute to use Jupiter for storage (#60)
Diffstat (limited to 'zenserver/upstream')
| -rw-r--r-- | zenserver/upstream/jupiter.cpp | 16 | ||||
| -rw-r--r-- | zenserver/upstream/jupiter.h | 5 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.cpp | 295 | ||||
| -rw-r--r-- | zenserver/upstream/upstreamapply.h | 21 |
4 files changed, 216 insertions, 121 deletions
diff --git a/zenserver/upstream/jupiter.cpp b/zenserver/upstream/jupiter.cpp index 2b064a610..eef1daab0 100644 --- a/zenserver/upstream/jupiter.cpp +++ b/zenserver/upstream/jupiter.cpp @@ -577,12 +577,12 @@ CloudCacheSession::ObjectExists(const std::set<IoHash>& Keys) } CloudCacheResult -CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData) +CloudCacheSession::PostComputeTasks(IoBuffer TasksData) { ZEN_TRACE_CPU("HordeClient::PostComputeTasks"); ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster(); cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -609,8 +609,11 @@ CloudCacheSession::PostComputeTasks(std::string_view ChannelId, IoBuffer TasksDa CloudCacheResult CloudCacheSession::GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds) { + ZEN_TRACE_CPU("HordeClient::GetComputeUpdates"); + ExtendableStringBuilder<256> Uri; - Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << ChannelId << "/updates?wait=" << WaitSeconds; + Uri << m_CacheClient->ServiceUrl() << "/api/v1/compute/" << m_CacheClient->ComputeCluster() << "/updates/" << ChannelId + << "?wait=" << WaitSeconds; cpr::Session& Session = GetSession(); const CloudCacheAccessToken& AccessToken = GetAccessToken(); @@ -703,6 +706,8 @@ CloudCacheSession::VerifyAccessToken(long StatusCode) CloudCacheResult CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) { + ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); + ExtendableStringBuilder<256> Uri; Uri << m_CacheClient->ServiceUrl() << "/api/v1/" << TypeId << "/" << m_CacheClient->BlobStoreNamespace() << "/" << Key.ToHexString(); @@ -731,6 +736,8 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const IoHash& Key) CloudCacheExistsResult CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHash>& Keys) { + ZEN_TRACE_CPU("HordeClient::CacheTypeExists"); + ExtendableStringBuilder<256> Query; for (const auto& Key : Keys) { @@ -766,7 +773,7 @@ CloudCacheSession::CacheTypeExists(std::string_view TypeId, const std::set<IoHas { IoBuffer Buffer = IoBuffer(zen::IoBuffer::Wrap, Response.text.data(), Response.text.size()); const CbObject ExistsResponse = LoadCompactBinaryObject(Buffer); - for (auto& Item : ExistsResponse["id"sv]) + for (auto& Item : ExistsResponse["Needs"sv]) { if (Item.IsHash()) { @@ -878,6 +885,7 @@ CloudCacheClient::CloudCacheClient(const CloudCacheClientOptions& Options, std:: , m_ServiceUrl(Options.ServiceUrl) , m_DdcNamespace(Options.DdcNamespace) , m_BlobStoreNamespace(Options.BlobStoreNamespace) +, m_ComputeCluster(Options.ComputeCluster) , m_ConnectTimeout(Options.ConnectTimeout) , m_Timeout(Options.Timeout) , m_TokenProvider(std::move(TokenProvider)) diff --git a/zenserver/upstream/jupiter.h b/zenserver/upstream/jupiter.h index ddd7ea160..9854e6f1e 100644 --- a/zenserver/upstream/jupiter.h +++ b/zenserver/upstream/jupiter.h @@ -116,7 +116,7 @@ public: CloudCacheExistsResult CompressedBlobExists(const std::set<IoHash>& Keys); CloudCacheExistsResult ObjectExists(const std::set<IoHash>& Keys); - CloudCacheResult PostComputeTasks(std::string_view ChannelId, IoBuffer TasksData); + CloudCacheResult PostComputeTasks(IoBuffer TasksData); CloudCacheResult GetComputeUpdates(std::string_view ChannelId, const uint32_t WaitSeconds = 0); CloudCacheResult GetObjectTree(const IoHash& Key); @@ -167,6 +167,7 @@ struct CloudCacheClientOptions std::string_view ServiceUrl; std::string_view DdcNamespace; std::string_view BlobStoreNamespace; + std::string_view ComputeCluster; std::chrono::milliseconds ConnectTimeout{5000}; std::chrono::milliseconds Timeout{}; bool UseLegacyDdc = false; @@ -184,6 +185,7 @@ public: CloudCacheAccessToken AcquireAccessToken(); std::string_view DdcNamespace() const { return m_DdcNamespace; } std::string_view BlobStoreNamespace() const { return m_BlobStoreNamespace; } + std::string_view ComputeCluster() const { return m_ComputeCluster; } std::string_view ServiceUrl() const { return m_ServiceUrl; } spdlog::logger& Logger() { return m_Log; } @@ -193,6 +195,7 @@ private: std::string m_ServiceUrl; std::string m_DdcNamespace; std::string m_BlobStoreNamespace; + std::string m_ComputeCluster; std::chrono::milliseconds m_ConnectTimeout{}; std::chrono::milliseconds m_Timeout{}; std::unique_ptr<CloudCacheTokenProvider> m_TokenProvider; diff --git a/zenserver/upstream/upstreamapply.cpp b/zenserver/upstream/upstreamapply.cpp index 63c334265..918697224 100644 --- a/zenserver/upstream/upstreamapply.cpp +++ b/zenserver/upstream/upstreamapply.cpp @@ -23,6 +23,9 @@ # include <zenstore/cas.h> # include <zenstore/cidstore.h> +# include <auth/authmgr.h> +# include <upstream/upstreamcache.h> + # include "cache/structuredcachestore.h" # include "diag/logging.h" @@ -48,17 +51,76 @@ namespace detail { class HordeUpstreamApplyEndpoint final : public UpstreamApplyEndpoint { public: - HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& Options, - std::unique_ptr<zen::CloudCacheTokenProvider> TokenProvider, - CasStore& CasStore, - CidStore& CidStore) + HordeUpstreamApplyEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr) : m_Log(logging::Get("upstream-apply")) + , m_AuthMgr(Mgr) , m_CasStore(CasStore) , m_CidStore(CidStore) { - m_DisplayName = fmt::format("Horde - '{}'", Options.ServiceUrl); - m_Client = new CloudCacheClient(Options, std::move(TokenProvider)); + m_DisplayName = fmt::format("{} - '{}'+'{}'", ComputeOptions.Name, ComputeOptions.ServiceUrl, StorageOptions.ServiceUrl); m_ChannelId = fmt::format("zen-{}", zen::GetSessionIdString()); + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (ComputeAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = ComputeAuthConfig.OAuthUrl, + .ClientId = ComputeAuthConfig.OAuthClientId, + .ClientSecret = ComputeAuthConfig.OAuthClientSecret}); + } + else if (ComputeAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(ComputeAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(ComputeAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_Client = new CloudCacheClient(ComputeOptions, std::move(TokenProvider)); + } + + { + std::unique_ptr<CloudCacheTokenProvider> TokenProvider; + + if (StorageAuthConfig.OAuthUrl.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromOAuthClientCredentials({.Url = StorageAuthConfig.OAuthUrl, + .ClientId = StorageAuthConfig.OAuthClientId, + .ClientSecret = StorageAuthConfig.OAuthClientSecret}); + } + else if (StorageAuthConfig.OpenIdProvider.empty() == false) + { + TokenProvider = + CloudCacheTokenProvider::CreateFromCallback([this, ProviderName = std::string(StorageAuthConfig.OpenIdProvider)]() { + AuthMgr::OpenIdAccessToken Token = m_AuthMgr.GetOpenIdAccessToken(ProviderName); + return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime}; + }); + } + else + { + CloudCacheAccessToken AccessToken{.Value = std::string(StorageAuthConfig.AccessToken), + .ExpireTime = CloudCacheAccessToken::TimePoint::max()}; + TokenProvider = CloudCacheTokenProvider::CreateFromStaticToken(AccessToken); + } + + m_StorageClient = new CloudCacheClient(StorageOptions, std::move(TokenProvider)); + } } virtual ~HordeUpstreamApplyEndpoint() = default; @@ -109,10 +171,11 @@ namespace detail { m_PendingTasks[UpstreamData.TaskId] = ApplyRecord; } - CloudCacheSession Session(m_Client); + CloudCacheSession ComputeSession(m_Client); + CloudCacheSession StorageSession(m_StorageClient); { - CloudCacheResult Result = BatchPutBlobsIfMissing(Session, UpstreamData.Blobs); + CloudCacheResult Result = BatchPutBlobsIfMissing(StorageSession, UpstreamData.Blobs); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -126,7 +189,7 @@ namespace detail { } { - CloudCacheResult Result = BatchPutObjectsIfMissing(Session, UpstreamData.Objects); + CloudCacheResult Result = BatchPutObjectsIfMissing(StorageSession, UpstreamData.Objects); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -136,17 +199,33 @@ namespace detail { .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } + + Result = StorageSession.PutRef("requests"sv, + UpstreamData.TaskId, + UpstreamData.Objects[UpstreamData.TaskId].GetBuffer().AsIoBuffer(), + ZenContentType::kCbObject); + + Bytes += Result.Bytes; + ElapsedSeconds += Result.ElapsedSeconds; + if (!Result.Success) + { + return {.Error{.ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to add task ref"}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } UpstreamData.Objects.clear(); } CbObjectWriter Writer; + Writer.AddString("c"sv, m_ChannelId); Writer.AddObjectAttachment("r"sv, UpstreamData.RequirementsId); Writer.BeginArray("t"sv); Writer.AddObjectAttachment(UpstreamData.TaskId); Writer.EndArray(); IoBuffer TasksData = Writer.Save().GetBuffer().AsIoBuffer(); - CloudCacheResult Result = Session.PostComputeTasks(m_ChannelId, TasksData); + CloudCacheResult Result = ComputeSession.PostComputeTasks(TasksData); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; if (!Result.Success) @@ -214,8 +293,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put blobs"}; + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put blobs"}; } } @@ -265,8 +344,8 @@ namespace detail { { return {.Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds, - .ErrorCode = ExistsResult.ErrorCode ? ExistsResult.ErrorCode : -1, - .Reason = !ExistsResult.Reason.empty() ? std::move(ExistsResult.Reason) : "Failed to put objects"}; + .ErrorCode = Result.ErrorCode ? Result.ErrorCode : -1, + .Reason = !Result.Reason.empty() ? std::move(Result.Reason) : "Failed to put objects"}; } } @@ -330,9 +409,10 @@ namespace detail { try { - CloudCacheSession Session(m_Client); + CloudCacheSession ComputeSession(m_Client); + CloudCacheSession StorageSession(m_StorageClient); - CloudCacheResult UpdatesResult = Session.GetComputeUpdates(m_ChannelId); + CloudCacheResult UpdatesResult = ComputeSession.GetComputeUpdates(m_ChannelId); Bytes += UpdatesResult.Bytes; ElapsedSeconds += UpdatesResult.ElapsedSeconds; if (!UpdatesResult.Success) @@ -349,9 +429,6 @@ namespace detail { CbObject TaskStatus = LoadCompactBinaryObject(UpdatesResult.Response); - // zen::StringBuilder<4096> ObjStr; - // zen::CompactBinaryToJson(TaskStatus, ObjStr); - for (auto& It : TaskStatus["u"sv]) { CbObjectView Status = It.AsObjectView(); @@ -366,7 +443,7 @@ namespace detail { continue; } - const IoHash TaskId = Status["h"sv].AsObjectAttachment(); + const IoHash TaskId = Status["h"sv].AsHash(); IoHash WorkerId; IoHash ActionId; @@ -383,7 +460,7 @@ namespace detail { m_PendingTasks.erase(TaskIt); } - GetUpstreamApplyResult Result = ProcessTaskStatus(Status, Session); + GetUpstreamApplyResult Result = ProcessTaskStatus(Status, StorageSession); Bytes += Result.Bytes; ElapsedSeconds += Result.ElapsedSeconds; @@ -411,9 +488,11 @@ namespace detail { CasStore& m_CasStore; CidStore& m_CidStore; + AuthMgr& m_AuthMgr; spdlog::logger& m_Log; std::string m_DisplayName; RefPtr<CloudCacheClient> m_Client; + RefPtr<CloudCacheClient> m_StorageClient; UpstreamApplyEndpointStats m_Stats; std::atomic_bool m_HealthOk{false}; std::string m_ChannelId; @@ -453,9 +532,9 @@ namespace detail { return {.Error{.ErrorCode = -1, .Reason = fmt::format("Task {}", ComputeTaskOutcomeToString(Outcome))}}; } - const IoHash TaskId = TaskStatus["h"sv].AsObjectAttachment(); + const IoHash TaskId = TaskStatus["h"sv].AsHash(); const DateTime Time = TaskStatus["t"sv].AsDateTime(); - const IoHash ResultHash = TaskStatus["r"sv].AsObjectAttachment(); + const IoHash ResultHash = TaskStatus["r"sv].AsHash(); const std::string_view AgentId = TaskStatus["a"sv].AsString(); const std::string_view LeaseId = TaskStatus["l"sv].AsString(); @@ -463,105 +542,96 @@ namespace detail { double ElapsedSeconds{}; // Get Result object and all Object Attachments + Binary Attachment IDs - CloudCacheResult ObjectTreeResult = Session.GetObjectTree(ResultHash); - Bytes += ObjectTreeResult.Bytes; - ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; + CloudCacheResult ObjectRefResult = Session.GetRef("responses"sv, ResultHash, ZenContentType::kCbObject); + Bytes += ObjectRefResult.Bytes; + ElapsedSeconds += ObjectRefResult.ElapsedSeconds; - if (!ObjectTreeResult.Success) + if (!ObjectRefResult.Success) { return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object data"}, .Bytes = Bytes, .ElapsedSeconds = ElapsedSeconds}; } - std::map<IoHash, IoBuffer> TreeObjectData; - std::map<IoHash, IoBuffer> TreeBinaryData; + std::vector<IoHash> ObjectsToIterate; + std::map<IoHash, IoBuffer> ObjectData; + std::map<IoHash, IoBuffer> BinaryData; - MemoryView ResponseView = ObjectTreeResult.Response; - while (ResponseView.GetSize() > 0) - { - CbFieldView Field = CbFieldView(ResponseView.GetData()); - ResponseView += Field.GetSize(); + ObjectData[ResultHash] = ObjectRefResult.Response; + CbObject Object = LoadCompactBinaryObject(ObjectData[ResultHash]); + Object.IterateAttachments([&](CbFieldView Field) { if (Field.IsObjectAttachment()) { - const IoHash Hash = Field.AsObjectAttachment(); - Field = CbFieldView(ResponseView.GetData()); - ResponseView += Field.GetSize(); - if (!Field.IsObject()) // No data + const IoHash AttachmentHash = Field.AsObjectAttachment(); + if (!ObjectData.contains(AttachmentHash)) { - TreeObjectData[Hash] = {}; - continue; + ObjectsToIterate.push_back(AttachmentHash); } - MemoryView FieldView = Field.AsObjectView().GetView(); - - TreeObjectData[Hash] = IoBuffer(IoBuffer::Wrap, FieldView.GetData(), FieldView.GetSize()); } else if (Field.IsBinaryAttachment()) { - const IoHash Hash = Field.AsBinaryAttachment(); - TreeBinaryData[Hash] = {}; - } - else // Unknown type - { + const IoHash AttachmentHash = Field.AsBinaryAttachment(); + BinaryData[AttachmentHash] = {}; } - } + }); - for (auto& It : TreeObjectData) + while (!ObjectsToIterate.empty()) { - if (It.second.GetSize() == 0) + const IoHash Hash = ObjectsToIterate.back(); + ObjectsToIterate.pop_back(); + + CloudCacheResult ObjectResult = Session.GetObject(Hash); + Bytes += ObjectRefResult.Bytes; + ElapsedSeconds += ObjectRefResult.ElapsedSeconds; + if (!ObjectResult.Success) { - CloudCacheResult ObjectResult = Session.GetObject(It.first); - Bytes += ObjectTreeResult.Bytes; - ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (!ObjectTreeResult.Success) + return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; + } + ObjectData[Hash] = std::move(ObjectResult.Response); + + CbObject IterateObject = LoadCompactBinaryObject(ObjectData[Hash]); + IterateObject.IterateAttachments([&](CbFieldView Field) { + if (Field.IsObjectAttachment()) { - return {.Error{.ErrorCode = ObjectResult.ErrorCode, .Reason = std::move(ObjectResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + const IoHash AttachmentHash = Field.AsObjectAttachment(); + if (!ObjectData.contains(AttachmentHash)) + { + ObjectsToIterate.push_back(AttachmentHash); + } } - - if (!ObjectResult.Success) + else if (Field.IsBinaryAttachment()) { - return {.Error{.ErrorCode = -1, .Reason = "Failed to get result object attachment data"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; + const IoHash AttachmentHash = Field.AsBinaryAttachment(); + BinaryData[AttachmentHash] = {}; } - It.second = std::move(ObjectResult.Response); - } + }); } - for (auto& It : TreeBinaryData) + // Batch load all binary data + for (auto& It : BinaryData) { - if (It.second.GetSize() == 0) + CloudCacheResult BlobResult = Session.GetBlob(It.first); + Bytes += ObjectRefResult.Bytes; + ElapsedSeconds += ObjectRefResult.ElapsedSeconds; + if (!BlobResult.Success) { - CloudCacheResult BlobResult = Session.GetBlob(It.first); - Bytes += ObjectTreeResult.Bytes; - ElapsedSeconds += ObjectTreeResult.ElapsedSeconds; - if (!BlobResult.Success) - { - return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - - if (!BlobResult.Success) - { - return {.Error{.ErrorCode = -1, .Reason = "Failed to get result binary attachment data"}, - .Bytes = Bytes, - .ElapsedSeconds = ElapsedSeconds}; - } - It.second = std::move(BlobResult.Response); + return {.Error{.ErrorCode = BlobResult.ErrorCode, .Reason = std::move(BlobResult.Reason)}, + .Bytes = Bytes, + .ElapsedSeconds = ElapsedSeconds}; } + It.second = std::move(BlobResult.Response); } - CbObject ResultObject = LoadCompactBinaryObject(TreeObjectData[ResultHash]); + CbObject ResultObject = LoadCompactBinaryObject(ObjectData[ResultHash]); int32_t ExitCode = ResultObject["e"sv].AsInt32(); IoHash StdOutHash = ResultObject["so"sv].AsBinaryAttachment(); IoHash StdErrHash = ResultObject["se"sv].AsBinaryAttachment(); IoHash OutputHash = ResultObject["o"sv].AsObjectAttachment(); - std::string StdOut = std::string((const char*)TreeBinaryData[StdOutHash].GetData(), TreeBinaryData[StdOutHash].GetSize()); - std::string StdErr = std::string((const char*)TreeBinaryData[StdErrHash].GetData(), TreeBinaryData[StdErrHash].GetSize()); + std::string StdOut = std::string((const char*)BinaryData[StdOutHash].GetData(), BinaryData[StdOutHash].GetSize()); + std::string StdErr = std::string((const char*)BinaryData[StdErrHash].GetData(), BinaryData[StdErrHash].GetSize()); if (ExitCode != 0) { @@ -572,7 +642,7 @@ namespace detail { .StdErr = std::move(StdErr)}; } - CbObject OutputObject = LoadCompactBinaryObject(TreeObjectData[OutputHash]); + CbObject OutputObject = LoadCompactBinaryObject(ObjectData[OutputHash]); // Get build.output IoHash BuildOutputId; @@ -583,7 +653,7 @@ namespace detail { if (FileObject["n"sv].AsString() == "Build.output"sv) { BuildOutputId = FileObject["h"sv].AsBinaryAttachment(); - BuildOutput = TreeBinaryData[BuildOutputId]; + BuildOutput = BinaryData[BuildOutputId]; break; } } @@ -604,7 +674,7 @@ namespace detail { const CbObjectView DirectoryObject = It.AsObjectView(); if (DirectoryObject["n"sv].AsString() == "Outputs"sv) { - OutputDirectoryTree = TreeObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; + OutputDirectoryTree = ObjectData[DirectoryObject["h"sv].AsObjectAttachment()]; break; } } @@ -636,7 +706,7 @@ namespace detail { // Hash is the compressed data hash, and how it is stored in Horde IoHash CompressedId = FileObject["h"sv].AsBinaryAttachment(); - if (!TreeBinaryData.contains(CompressedId)) + if (!BinaryData.contains(CompressedId)) { Log().warn("Object attachment chunk not retrieved from Horde {}", CompressedId.ToHexString()); return {.Error{.ErrorCode = -1, .Reason = "Object attachment chunk not retrieved from Horde"}, @@ -659,7 +729,7 @@ namespace detail { } const IoHash& CompressedId = CidToCompressedId.at(DecompressedId); - if (!TreeBinaryData.contains(CompressedId)) + if (!BinaryData.contains(CompressedId)) { Log().warn("Missing output {} compressed {} uncompressed", CompressedId.ToHexString(), @@ -668,7 +738,7 @@ namespace detail { return; } - CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(TreeBinaryData[CompressedId])); + CompressedBuffer AttachmentBuffer = CompressedBuffer::FromCompressed(SharedBuffer(BinaryData[CompressedId])); if (!AttachmentBuffer) { @@ -826,11 +896,11 @@ namespace detail { int64_t Memory = ApplyRecord.WorkerDescriptor["memory"sv].AsInt64(); bool Exclusive = ApplyRecord.WorkerDescriptor["exclusive"sv].AsBool(); - // TODO: Remove override when Horde accepts the UE style Host Platforms (Win64, Linux, Mac) std::string Condition = fmt::format("Platform == '{}'", HostPlatform); if (HostPlatform == "Win64") { - Condition += " && Pool == 'Win-RemoteExec'"; + // TODO + // Condition += " && Pool == 'Win-RemoteExec'"; } std::map<std::string_view, int64_t> Resources; @@ -1176,10 +1246,10 @@ struct UpstreamApplyStats ////////////////////////////////////////////////////////////////////////// -class DefaultUpstreamApply final : public UpstreamApply +class UpstreamApplyImpl final : public UpstreamApply { public: - DefaultUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) + UpstreamApplyImpl(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) : m_Log(logging::Get("upstream-apply")) , m_Options(Options) , m_CasStore(CasStore) @@ -1188,7 +1258,7 @@ public: { } - virtual ~DefaultUpstreamApply() { Shutdown(); } + virtual ~UpstreamApplyImpl() { Shutdown(); } virtual bool Initialize() override { @@ -1213,12 +1283,12 @@ public: for (uint32_t Idx = 0; Idx < m_Options.ThreadCount; Idx++) { - m_UpstreamThreads.emplace_back(&DefaultUpstreamApply::ProcessUpstreamQueue, this); + m_UpstreamThreads.emplace_back(&UpstreamApplyImpl::ProcessUpstreamQueue, this); } - m_UpstreamUpdatesThread = std::thread(&DefaultUpstreamApply::ProcessUpstreamUpdates, this); + m_UpstreamUpdatesThread = std::thread(&UpstreamApplyImpl::ProcessUpstreamUpdates, this); - m_EndpointMonitorThread = std::thread(&DefaultUpstreamApply::MonitorEndpoints, this); + m_EndpointMonitorThread = std::thread(&UpstreamApplyImpl::MonitorEndpoints, this); } return m_RunState.IsRunning; @@ -1558,18 +1628,27 @@ private: ////////////////////////////////////////////////////////////////////////// std::unique_ptr<UpstreamApply> -MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) +UpstreamApply::Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore) { - return std::make_unique<DefaultUpstreamApply>(Options, CasStore, CidStore); + return std::make_unique<UpstreamApplyImpl>(Options, CasStore, CidStore); } std::unique_ptr<UpstreamApplyEndpoint> -MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, - std::unique_ptr<CloudCacheTokenProvider> TokenProvider, - CasStore& CasStore, - CidStore& CidStore) +UpstreamApplyEndpoint::CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr) { - return std::make_unique<detail::HordeUpstreamApplyEndpoint>(Options, std::move(TokenProvider), CasStore, CidStore); + return std::make_unique<detail::HordeUpstreamApplyEndpoint>(ComputeOptions, + ComputeAuthConfig, + StorageOptions, + StorageAuthConfig, + CasStore, + CidStore, + Mgr); } } // namespace zen diff --git a/zenserver/upstream/upstreamapply.h b/zenserver/upstream/upstreamapply.h index c56a22ac3..44c08e30e 100644 --- a/zenserver/upstream/upstreamapply.h +++ b/zenserver/upstream/upstreamapply.h @@ -25,6 +25,8 @@ class CidStore; class ZenCacheStore; struct CloudCacheClientOptions; class CloudCacheTokenProvider; +struct UpstreamAuthConfig; +class AuthMgr; enum class UpstreamApplyState : int32_t { @@ -129,10 +131,18 @@ public: virtual PostUpstreamApplyResult PostApply(const UpstreamApplyRecord& ApplyRecord) = 0; virtual GetUpstreamApplyUpdatesResult GetUpdates() = 0; virtual UpstreamApplyEndpointStats& Stats() = 0; + + static std::unique_ptr<UpstreamApplyEndpoint> CreateHordeEndpoint(const CloudCacheClientOptions& ComputeOptions, + const UpstreamAuthConfig& ComputeAuthConfig, + const CloudCacheClientOptions& StorageOptions, + const UpstreamAuthConfig& StorageAuthConfig, + CasStore& CasStore, + CidStore& CidStore, + AuthMgr& Mgr); }; /** - * Manages one or more upstream cache endpoints. + * Manages one or more upstream compute endpoints. */ class UpstreamApply { @@ -157,14 +167,9 @@ public: virtual EnqueueResult EnqueueUpstream(UpstreamApplyRecord ApplyRecord) = 0; virtual StatusResult GetStatus(const IoHash& WorkerId, const IoHash& ActionId) = 0; virtual void GetStatus(CbObjectWriter& CbO) = 0; -}; -std::unique_ptr<UpstreamApply> MakeUpstreamApply(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore); - -std::unique_ptr<UpstreamApplyEndpoint> MakeHordeUpstreamEndpoint(const CloudCacheClientOptions& Options, - std::unique_ptr<CloudCacheTokenProvider> TokenProvider, - CasStore& CasStore, - CidStore& CidStore); + static std::unique_ptr<UpstreamApply> Create(const UpstreamApplyOptions& Options, CasStore& CasStore, CidStore& CidStore); +}; } // namespace zen |